Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/execution/plan/execute_plan.py
import sys | import sys | ||||
from contextlib import ExitStack | |||||
from typing import Iterator, List, cast | from typing import Iterator, List, cast | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions import Failure, HookExecutionResult, RetryRequested | from dagster.core.definitions import Failure, HookExecutionResult, RetryRequested | ||||
from dagster.core.errors import ( | from dagster.core.errors import ( | ||||
DagsterError, | DagsterError, | ||||
DagsterExecutionInterruptedError, | DagsterExecutionInterruptedError, | ||||
DagsterUserCodeExecutionError, | DagsterUserCodeExecutionError, | ||||
HookExecutionError, | HookExecutionError, | ||||
user_code_error_boundary, | user_code_error_boundary, | ||||
) | ) | ||||
from dagster.core.events import DagsterEvent | from dagster.core.events import DagsterEvent | ||||
from dagster.core.execution.context.system import PlanExecutionContext, StepExecutionContext | from dagster.core.execution.context.system import PlanExecutionContext, StepExecutionContext | ||||
from dagster.core.execution.plan.execute_step import core_dagster_event_sequence_for_step | from dagster.core.execution.plan.execute_step import core_dagster_event_sequence_for_step | ||||
from dagster.core.execution.plan.objects import ( | from dagster.core.execution.plan.objects import ( | ||||
ErrorSource, | ErrorSource, | ||||
StepFailureData, | StepFailureData, | ||||
StepRetryData, | StepRetryData, | ||||
UserFailureData, | UserFailureData, | ||||
step_failure_event_from_exc_info, | step_failure_event_from_exc_info, | ||||
) | ) | ||||
from dagster.core.execution.plan.plan import ExecutionPlan | from dagster.core.execution.plan.plan import ExecutionPlan | ||||
from dagster.core.storage.captured_log_manager import CapturedLogManager | |||||
from dagster.utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info | from dagster.utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info | ||||
def inner_plan_execution_iterator( | def inner_plan_execution_iterator( | ||||
pipeline_context: PlanExecutionContext, execution_plan: ExecutionPlan | pipeline_context: PlanExecutionContext, execution_plan: ExecutionPlan | ||||
) -> Iterator[DagsterEvent]: | ) -> Iterator[DagsterEvent]: | ||||
check.inst_param(pipeline_context, "pipeline_context", PlanExecutionContext) | check.inst_param(pipeline_context, "pipeline_context", PlanExecutionContext) | ||||
check.inst_param(execution_plan, "execution_plan", ExecutionPlan) | check.inst_param(execution_plan, "execution_plan", ExecutionPlan) | ||||
Show All 22 Lines | with execution_plan.start(retry_mode=pipeline_context.retry_mode) as active_execution: | ||||
len(missing_resources) == 0, | len(missing_resources) == 0, | ||||
( | ( | ||||
"Expected step context for solid {solid_name} to have all required resources, but " | "Expected step context for solid {solid_name} to have all required resources, but " | ||||
"missing {missing_resources}." | "missing {missing_resources}." | ||||
).format(solid_name=step_context.solid.name, missing_resources=missing_resources), | ).format(solid_name=step_context.solid.name, missing_resources=missing_resources), | ||||
) | ) | ||||
# capture all of the logs for this step | # capture all of the logs for this step | ||||
with pipeline_context.instance.compute_log_manager.watch( | with ExitStack() as stack: | ||||
step_context.pipeline_run, step_context.step.key | log_manager = pipeline_context.instance.compute_log_manager | ||||
): | if isinstance(log_manager, CapturedLogManager): | ||||
stack.enter_context( | |||||
log_manager.capture_logs( | |||||
log_key=step_context.step.key, | |||||
namespace=step_context.pipeline_run.run_id, | |||||
) | |||||
) | |||||
else: | |||||
stack.enter_context( | |||||
log_manager.watch(step_context.pipeline_run, step_context.step.key) | |||||
) | |||||
yield DagsterEvent.capture_logs( | yield DagsterEvent.capture_logs( | ||||
step_context, log_key=step_context.step.key, steps=[step_context.step] | step_context, log_key=step_context.step.key, steps=[step_context.step] | ||||
) | ) | ||||
for step_event in check.generator(_dagster_event_sequence_for_step(step_context)): | for step_event in check.generator(_dagster_event_sequence_for_step(step_context)): | ||||
check.inst(step_event, DagsterEvent) | check.inst(step_event, DagsterEvent) | ||||
step_event_list.append(step_event) | step_event_list.append(step_event) | ||||
yield step_event | yield step_event | ||||
▲ Show 20 Lines • Show All 220 Lines • Show Last 20 Lines |