Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
import os | import os | ||||
import random | import random | ||||
import string | import string | ||||
import sys | import sys | ||||
import time | import time | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
import pendulum | import pendulum | ||||
import pytest | import pytest | ||||
from dagster import Any, Field, pipeline, repository, solid | from dagster import Any, Field, pipeline, repository, solid | ||||
from dagster.core.definitions import PartitionSetDefinition | from dagster.core.definitions import Partition, PartitionSetDefinition | ||||
from dagster.core.definitions.reconstructable import ReconstructableRepository | from dagster.core.definitions.reconstructable import ReconstructableRepository | ||||
from dagster.core.execution.backfill import BulkActionStatus, PartitionBackfill | from dagster.core.execution.backfill import BulkActionStatus, PartitionBackfill | ||||
from dagster.core.host_representation import ( | from dagster.core.host_representation import ( | ||||
ExternalRepositoryOrigin, | ExternalRepositoryOrigin, | ||||
InProcessRepositoryLocationOrigin, | InProcessRepositoryLocationOrigin, | ||||
ManagedGrpcPythonEnvRepositoryLocationOrigin, | ManagedGrpcPythonEnvRepositoryLocationOrigin, | ||||
) | ) | ||||
from dagster.core.host_representation.grpc_server_registry import ProcessGrpcServerRegistry | from dagster.core.host_representation.grpc_server_registry import ProcessGrpcServerRegistry | ||||
▲ Show 20 Lines • Show All 66 Lines • ▼ Show 20 Lines | |||||
@pipeline | @pipeline | ||||
def config_pipeline(): | def config_pipeline(): | ||||
config_solid() | config_solid() | ||||
simple_partition_set = PartitionSetDefinition( | simple_partition_set = PartitionSetDefinition( | ||||
name="simple_partition_set", | name="simple_partition_set", | ||||
pipeline_name="the_pipeline", | pipeline_name="the_pipeline", | ||||
partition_fn=lambda: ["one", "two", "three"], | partition_fn=lambda: [Partition("one"), Partition("two"), Partition("three")], | ||||
run_config_fn_for_partition=lambda _partition: {"intermediate_storage": {"filesystem": {}}}, | run_config_fn_for_partition=lambda _partition: {"intermediate_storage": {"filesystem": {}}}, | ||||
) | ) | ||||
conditionally_fail_partition_set = PartitionSetDefinition( | conditionally_fail_partition_set = PartitionSetDefinition( | ||||
name="conditionally_fail_partition_set", | name="conditionally_fail_partition_set", | ||||
pipeline_name="conditional_failure_pipeline", | pipeline_name="conditional_failure_pipeline", | ||||
partition_fn=lambda: ["one", "two", "three"], | partition_fn=lambda: ["one", "two", "three"], | ||||
run_config_fn_for_partition=lambda _partition: {"intermediate_storage": {"filesystem": {}}}, | run_config_fn_for_partition=lambda _partition: {"intermediate_storage": {"filesystem": {}}}, | ||||
) | ) | ||||
partial_partition_set = PartitionSetDefinition( | partial_partition_set = PartitionSetDefinition( | ||||
name="partial_partition_set", | name="partial_partition_set", | ||||
pipeline_name="partial_pipeline", | pipeline_name="partial_pipeline", | ||||
partition_fn=lambda: ["one", "two", "three"], | partition_fn=lambda: [Partition("one"), Partition("two"), Partition("three")], | ||||
run_config_fn_for_partition=lambda _partition: {"intermediate_storage": {"filesystem": {}}}, | run_config_fn_for_partition=lambda _partition: {"intermediate_storage": {"filesystem": {}}}, | ||||
) | ) | ||||
def _large_partition_config(_): | def _large_partition_config(_): | ||||
REQUEST_CONFIG_COUNT = 50000 | REQUEST_CONFIG_COUNT = 50000 | ||||
def _random_string(length): | def _random_string(length): | ||||
Show All 10 Lines | return { | ||||
} | } | ||||
} | } | ||||
} | } | ||||
large_partition_set = PartitionSetDefinition( | large_partition_set = PartitionSetDefinition( | ||||
name="large_partition_set", | name="large_partition_set", | ||||
pipeline_name="config_pipeline", | pipeline_name="config_pipeline", | ||||
partition_fn=lambda: ["one", "two", "three"], | partition_fn=lambda: [Partition("one"), Partition("two"), Partition("three")], | ||||
run_config_fn_for_partition=_large_partition_config, | run_config_fn_for_partition=_large_partition_config, | ||||
) | ) | ||||
def _unloadable_partition_set_origin(): | def _unloadable_partition_set_origin(): | ||||
working_directory = os.path.dirname(__file__) | working_directory = os.path.dirname(__file__) | ||||
recon_repo = ReconstructableRepository.for_file(__file__, "doesnt_exist", working_directory) | recon_repo = ReconstructableRepository.for_file(__file__, "doesnt_exist", working_directory) | ||||
return ExternalRepositoryOrigin( | return ExternalRepositoryOrigin( | ||||
▲ Show 20 Lines • Show All 393 Lines • Show Last 20 Lines |