Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/execution/api.py
import sys | import sys | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions import IPipeline, PipelineDefinition, SystemStorageData | from dagster.core.definitions import IPipeline, PipelineDefinition, SystemStorageData | ||||
from dagster.core.definitions.pipeline import PipelineSubsetDefinition | from dagster.core.definitions.pipeline import PipelineSubsetDefinition | ||||
from dagster.core.definitions.pipeline_base import InMemoryPipeline | from dagster.core.definitions.pipeline_base import InMemoryPipeline | ||||
from dagster.core.errors import DagsterInvariantViolationError | from dagster.core.errors import DagsterInvariantViolationError | ||||
from dagster.core.events import DagsterEvent | from dagster.core.events import DagsterEvent | ||||
from dagster.core.execution.context.system import SystemPipelineExecutionContext | from dagster.core.execution.context.system import SystemPipelineExecutionContext | ||||
from dagster.core.execution.plan.execute_plan import inner_plan_execution_iterator | from dagster.core.execution.plan.execute_plan import inner_plan_execution_iterator | ||||
from dagster.core.execution.plan.plan import ExecutionPlan | from dagster.core.execution.plan.plan import ExecutionPlan | ||||
from dagster.core.execution.resolve_versions import resolve_memoized_execution_plan | |||||
from dagster.core.execution.retries import Retries | from dagster.core.execution.retries import Retries | ||||
from dagster.core.instance import DagsterInstance, is_memoized_run | from dagster.core.instance import DagsterInstance, is_memoized_run | ||||
from dagster.core.selector import parse_step_selection | from dagster.core.selector import parse_step_selection | ||||
from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus | from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus | ||||
from dagster.core.system_config.objects import EnvironmentConfig | from dagster.core.system_config.objects import EnvironmentConfig | ||||
from dagster.core.telemetry import log_repo_stats, telemetry_wrapper | from dagster.core.telemetry import log_repo_stats, telemetry_wrapper | ||||
from dagster.core.utils import str_format_set | from dagster.core.utils import str_format_set | ||||
from dagster.utils import delay_interrupts, merge_dicts | from dagster.utils import delay_interrupts, merge_dicts | ||||
▲ Show 20 Lines • Show All 133 Lines • ▼ Show 20 Lines | def execute_run(pipeline, pipeline_run, instance, raise_on_error=False): | ||||
execution_plan = create_execution_plan( | execution_plan = create_execution_plan( | ||||
pipeline, | pipeline, | ||||
run_config=pipeline_run.run_config, | run_config=pipeline_run.run_config, | ||||
mode=pipeline_run.mode, | mode=pipeline_run.mode, | ||||
step_keys_to_execute=pipeline_run.step_keys_to_execute, | step_keys_to_execute=pipeline_run.step_keys_to_execute, | ||||
) | ) | ||||
if is_memoized_run(pipeline_run.tags): | if is_memoized_run(pipeline_run.tags): | ||||
execution_plan = instance.resolve_memoized_execution_plan(execution_plan) | execution_plan = resolve_memoized_execution_plan(execution_plan) | ||||
_execute_run_iterable = _ExecuteRunWithPlanIterable( | _execute_run_iterable = _ExecuteRunWithPlanIterable( | ||||
execution_plan=execution_plan, | execution_plan=execution_plan, | ||||
iterator=_pipeline_execution_iterator, | iterator=_pipeline_execution_iterator, | ||||
execution_context_manager=PipelineExecutionContextManager( | execution_context_manager=PipelineExecutionContextManager( | ||||
execution_plan=execution_plan, | execution_plan=execution_plan, | ||||
pipeline_run=pipeline_run, | pipeline_run=pipeline_run, | ||||
instance=instance, | instance=instance, | ||||
▲ Show 20 Lines • Show All 655 Lines • Show Last 20 Lines |