Differential D6164 Diff 30546 python_modules/dagster-graphql/dagster_graphql/implementation/execution/launch_execution.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/implementation/execution/launch_execution.py
from dagster import check | from dagster import check | ||||
from graphql.execution.base import ResolveInfo | from graphql.execution.base import ResolveInfo | ||||
from ..external import get_external_pipeline_or_raise | from ..external import get_external_pipeline_or_raise | ||||
from ..utils import ExecutionMetadata, ExecutionParams, capture_dauphin_error | from ..utils import ExecutionMetadata, ExecutionParams, capture_error | ||||
from .run_lifecycle import create_valid_pipeline_run | from .run_lifecycle import create_valid_pipeline_run | ||||
@capture_dauphin_error | @capture_error | ||||
def launch_pipeline_reexecution(graphene_info, execution_params): | def launch_pipeline_reexecution(graphene_info, execution_params): | ||||
return _launch_pipeline_execution(graphene_info, execution_params, is_reexecuted=True) | return _launch_pipeline_execution(graphene_info, execution_params, is_reexecuted=True) | ||||
@capture_dauphin_error | @capture_error | ||||
def launch_pipeline_execution(graphene_info, execution_params): | def launch_pipeline_execution(graphene_info, execution_params): | ||||
return _launch_pipeline_execution(graphene_info, execution_params) | return _launch_pipeline_execution(graphene_info, execution_params) | ||||
def do_launch(graphene_info, execution_params, is_reexecuted=False): | def do_launch(graphene_info, execution_params, is_reexecuted=False): | ||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(execution_params, "execution_params", ExecutionParams) | check.inst_param(execution_params, "execution_params", ExecutionParams) | ||||
check.bool_param(is_reexecuted, "is_reexecuted") | check.bool_param(is_reexecuted, "is_reexecuted") | ||||
Show All 11 Lines | def do_launch(graphene_info, execution_params, is_reexecuted=False): | ||||
pipeline_run = create_valid_pipeline_run(graphene_info, external_pipeline, execution_params) | pipeline_run = create_valid_pipeline_run(graphene_info, external_pipeline, execution_params) | ||||
return graphene_info.context.instance.submit_run( | return graphene_info.context.instance.submit_run( | ||||
pipeline_run.run_id, external_pipeline=external_pipeline | pipeline_run.run_id, external_pipeline=external_pipeline | ||||
) | ) | ||||
def _launch_pipeline_execution(graphene_info, execution_params, is_reexecuted=False): | def _launch_pipeline_execution(graphene_info, execution_params, is_reexecuted=False): | ||||
from ...schema.pipelines.pipeline import GraphenePipelineRun | |||||
from ...schema.runs import GrapheneLaunchPipelineRunSuccess | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(execution_params, "execution_params", ExecutionParams) | check.inst_param(execution_params, "execution_params", ExecutionParams) | ||||
check.bool_param(is_reexecuted, "is_reexecuted") | check.bool_param(is_reexecuted, "is_reexecuted") | ||||
run = do_launch(graphene_info, execution_params, is_reexecuted) | run = do_launch(graphene_info, execution_params, is_reexecuted) | ||||
return graphene_info.schema.type_named("LaunchPipelineRunSuccess")( | return GrapheneLaunchPipelineRunSuccess(run=GraphenePipelineRun(run)) | ||||
run=graphene_info.schema.type_named("PipelineRun")(run) | |||||
) |