Differential D6164 Diff 30546 python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py
python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py
from dagster import check
from dagster.core.host_representation import RepositorySelector
from dagster.core.host_representation.external import ExternalPartitionSet
from dagster.core.host_representation.external_data import (
    ExternalPartitionExecutionErrorData,
    ExternalPartitionExecutionParamData,
    ExternalPartitionSetExecutionParamData,
)
from dagster.core.host_representation.selector import PipelineSelector
from dagster.core.instance import DagsterInstance
from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter
from dagster.core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG, RESUME_RETRY_TAG
from dagster.core.utils import make_new_backfill_id
from dagster.utils import merge_dicts
-from ..utils import ExecutionMetadata, ExecutionParams, capture_dauphin_error
+from ..utils import ExecutionMetadata, ExecutionParams, capture_error
from .run_lifecycle import create_valid_pipeline_run
-@capture_dauphin_error
+@capture_error
Inline comment (max): Rename
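For readers outside this codebase: the decorator being renamed here wraps the GraphQL resolver so that failures come back as typed error objects instead of escaping as raw exceptions. The snippet below is only a minimal sketch of that general pattern, with placeholder names (PythonErrorPlaceholder, capture_error_sketch); it is not the dagster_graphql implementation of capture_error.

import sys
import traceback
from functools import wraps


class PythonErrorPlaceholder:
    """Stand-in for a Graphene error type such as GraphenePythonError (illustrative only)."""

    def __init__(self, message, stack):
        self.message = message
        self.stack = stack


def capture_error_sketch(fn):
    """Toy version of an error-capturing resolver decorator."""

    @wraps(fn)
    def inner(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            exc_type, exc_value, exc_tb = sys.exc_info()
            # Turn the failure into data the GraphQL layer can serialize,
            # instead of letting the exception escape the resolver.
            return PythonErrorPlaceholder(
                message=str(exc_value),
                stack=traceback.format_exception(exc_type, exc_value, exc_tb),
            )

    return inner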
def create_and_launch_partition_backfill(graphene_info, backfill_params):
+    from ...schema.backfill import GraphenePartitionBackfillSuccess
+    from ...schema.errors import GraphenePartitionSetNotFoundError, GraphenePythonError
    partition_set_selector = backfill_params.get("selector")
    partition_set_name = partition_set_selector.get("partitionSetName")
    repository_selector = RepositorySelector.from_graphql_input(
        partition_set_selector.get("repositorySelector")
    )
    location = graphene_info.context.get_repository_location(repository_selector.location_name)
    repository = location.get_repository(repository_selector.repository_name)
    matches = [
        partition_set
        for partition_set in repository.get_external_partition_sets()
        if partition_set.name == partition_set_selector.get("partitionSetName")
    ]
    if not matches:
-        return graphene_info.schema.type_named("PartitionSetNotFoundError")(partition_set_name)
+        return GraphenePartitionSetNotFoundError(partition_set_name)
    check.invariant(
        len(matches) == 1,
        "Partition set names must be unique: found {num} matches for {partition_set_name}".format(
            num=len(matches), partition_set_name=partition_set_name
        ),
    )
    (9 unchanged lines not shown; still inside create_and_launch_partition_backfill)
    partition_names = backfill_params.get("partitionNames")
    backfill_id = make_new_backfill_id()
    result = graphene_info.context.get_external_partition_set_execution_param_data(
        repository.handle, partition_set_name, partition_names
    )
    if isinstance(result, ExternalPartitionExecutionErrorData):
-        return graphene_info.schema.type_named("PythonError")(result.error)
+        return GraphenePythonError(result.error)
    assert isinstance(result, ExternalPartitionSetExecutionParamData)
    launched_run_ids = []
    execution_param_list = _build_execution_param_list_for_backfill(
        graphene_info.context.instance,
        result.partition_data,
        backfill_id,
        backfill_params,
        pipeline_selector,
        external_partition_set,
    )
    for execution_params in execution_param_list:
        pipeline_run = create_valid_pipeline_run(graphene_info, external_pipeline, execution_params)
        graphene_info.context.instance.submit_run(pipeline_run.run_id, external_pipeline)
        launched_run_ids.append(pipeline_run.run_id)
-    return graphene_info.schema.type_named("PartitionBackfillSuccess")(
+    return GraphenePartitionBackfillSuccess(
        backfill_id=backfill_id, launched_run_ids=launched_run_ids
    )
Inline comment (max): No need for this circumlocution
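What the comment is pointing at: the old code resolved result types through graphene_info.schema.type_named("..."), a string lookup, whereas the new code imports the Graphene class and constructs it directly. Below is a self-contained toy comparison of the two styles; SchemaStub and the bare PartitionBackfillSuccess class are illustrative stand-ins, not dagster or graphene APIs.

class PartitionBackfillSuccess:
    def __init__(self, backfill_id, launched_run_ids):
        self.backfill_id = backfill_id
        self.launched_run_ids = launched_run_ids


class SchemaStub:
    """Stand-in for graphene_info.schema and its string-keyed type_named() lookup."""

    def __init__(self, types):
        self._types = {cls.__name__: cls for cls in types}

    def type_named(self, name):
        return self._types[name]


schema = SchemaStub([PartitionBackfillSuccess])

# Old style in this file: look the type up by string, then instantiate it.
old = schema.type_named("PartitionBackfillSuccess")(backfill_id="b1", launched_run_ids=[])

# New style after this diff: call the imported class directly, so a typo fails at
# import time and the reference is visible to grep, linters, and IDEs.
new = PartitionBackfillSuccess(backfill_id="b1", launched_run_ids=[])

assert type(old) is type(new)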
def _build_execution_param_list_for_backfill(
    instance,
    partition_data_list,
    backfill_id,
    backfill_params,
    pipeline_selector,
    (remaining 94 lines of the file not shown)
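As a reading aid for the resolver above: judging purely from the keys it reads (selector, partitionNames, and the nested partitionSetName and repositorySelector), the GraphQL input arrives as a dict shaped roughly like the sketch below. The values and the inner repositorySelector field names are assumptions, not taken from this diff.

# Hypothetical backfill_params payload, reconstructed only from the .get() calls
# visible in create_and_launch_partition_backfill above.
backfill_params = {
    "selector": {
        "repositorySelector": {
            "repositoryLocationName": "my_location",  # assumed field name
            "repositoryName": "my_repository",        # assumed field name
        },
        "partitionSetName": "my_partition_set",
    },
    "partitionNames": ["2020-01-01", "2020-01-02"],
}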