Differential D5083 Diff 26697 python_modules/dagster-graphql/dagster_graphql/implementation/execution/run_lifecycle.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/implementation/execution/run_lifecycle.py
import logging | |||||
from dagster.core.instance import is_memoized_run | |||||
from dagster.core.storage.pipeline_run import PipelineRunStatus | from dagster.core.storage.pipeline_run import PipelineRunStatus | ||||
from dagster.core.storage.tags import MEMOIZED_RUN_TAG | |||||
from dagster.core.utils import make_new_run_id | from dagster.core.utils import make_new_run_id | ||||
from dagster.utils import merge_dicts | from dagster.utils import merge_dicts | ||||
from ..external import ensure_valid_config, get_external_execution_plan_or_raise | from ..external import ensure_valid_config, get_external_execution_plan_or_raise | ||||
from ..resume_retry import compute_step_keys_to_execute | from ..resume_retry import compute_step_keys_to_execute | ||||
def create_valid_pipeline_run(graphene_info, external_pipeline, execution_params): | def create_valid_pipeline_run(graphene_info, external_pipeline, execution_params): | ||||
ensure_valid_config(external_pipeline, execution_params.mode, execution_params.run_config) | ensure_valid_config(external_pipeline, execution_params.mode, execution_params.run_config) | ||||
step_keys_to_execute = compute_step_keys_to_execute( | step_keys_to_execute = compute_step_keys_to_execute( | ||||
graphene_info, external_pipeline, execution_params | graphene_info, external_pipeline, execution_params | ||||
) | ) | ||||
external_execution_plan = get_external_execution_plan_or_raise( | external_execution_plan = get_external_execution_plan_or_raise( | ||||
graphene_info=graphene_info, | graphene_info=graphene_info, | ||||
external_pipeline=external_pipeline, | external_pipeline=external_pipeline, | ||||
mode=execution_params.mode, | mode=execution_params.mode, | ||||
run_config=execution_params.run_config, | run_config=execution_params.run_config, | ||||
step_keys_to_execute=step_keys_to_execute, | step_keys_to_execute=step_keys_to_execute, | ||||
) | ) | ||||
tags = merge_dicts(external_pipeline.tags, execution_params.execution_metadata.tags) | |||||
if is_memoized_run(tags): | |||||
logging.warning( | |||||
'Tag "{tag}" was found when initializing pipeline run, however, memoized ' | |||||
"execution is only supported from the command line. This pipeline will run, but " | |||||
"outputs from previous executions will be ignored.".format(tag=MEMOIZED_RUN_TAG) | |||||
) | |||||
alangenfeld: this wont be visible to anyone really - will just be in the stdout logs of the webserver. I… | |||||
Done Inline Actionsthat makes sense, will do. cdecarolis: that makes sense, will do. | |||||
return graphene_info.context.instance.create_run( | return graphene_info.context.instance.create_run( | ||||
pipeline_snapshot=external_pipeline.pipeline_snapshot, | pipeline_snapshot=external_pipeline.pipeline_snapshot, | ||||
execution_plan_snapshot=external_execution_plan.execution_plan_snapshot, | execution_plan_snapshot=external_execution_plan.execution_plan_snapshot, | ||||
parent_pipeline_snapshot=external_pipeline.parent_pipeline_snapshot, | parent_pipeline_snapshot=external_pipeline.parent_pipeline_snapshot, | ||||
pipeline_name=execution_params.selector.pipeline_name, | pipeline_name=execution_params.selector.pipeline_name, | ||||
run_id=execution_params.execution_metadata.run_id | run_id=execution_params.execution_metadata.run_id | ||||
if execution_params.execution_metadata.run_id | if execution_params.execution_metadata.run_id | ||||
else make_new_run_id(), | else make_new_run_id(), | ||||
solids_to_execute=frozenset(execution_params.selector.solid_selection) | solids_to_execute=frozenset(execution_params.selector.solid_selection) | ||||
if execution_params.selector.solid_selection | if execution_params.selector.solid_selection | ||||
else None, | else None, | ||||
run_config=execution_params.run_config, | run_config=execution_params.run_config, | ||||
mode=execution_params.mode, | mode=execution_params.mode, | ||||
step_keys_to_execute=step_keys_to_execute, | step_keys_to_execute=step_keys_to_execute, | ||||
tags=merge_dicts(external_pipeline.tags, execution_params.execution_metadata.tags), | tags=tags, | ||||
root_run_id=execution_params.execution_metadata.root_run_id, | root_run_id=execution_params.execution_metadata.root_run_id, | ||||
parent_run_id=execution_params.execution_metadata.parent_run_id, | parent_run_id=execution_params.execution_metadata.parent_run_id, | ||||
status=PipelineRunStatus.NOT_STARTED, | status=PipelineRunStatus.NOT_STARTED, | ||||
external_pipeline_origin=external_pipeline.get_external_origin(), | external_pipeline_origin=external_pipeline.get_external_origin(), | ||||
) | ) | ||||
Not Done Inline Actionsnit: link to issue alangenfeld: nit: link to issue |
this wont be visible to anyone really - will just be in the stdout logs of the webserver. I think you need to report an engine event after the run is created