Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/executor/in_process.py
import os | import os | ||||
from contextlib import ExitStack | |||||
from dagster import check | from dagster import check | ||||
from dagster.core.events import DagsterEvent, EngineEventData | from dagster.core.events import DagsterEvent, EngineEventData | ||||
from dagster.core.execution.api import ExecuteRunWithPlanIterable | from dagster.core.execution.api import ExecuteRunWithPlanIterable | ||||
from dagster.core.execution.compute_logs import compute_log_key_for_steps | |||||
from dagster.core.execution.context.system import PlanOrchestrationContext | from dagster.core.execution.context.system import PlanOrchestrationContext | ||||
from dagster.core.execution.context_creation_pipeline import PlanExecutionContextManager | from dagster.core.execution.context_creation_pipeline import PlanExecutionContextManager | ||||
from dagster.core.execution.plan.execute_plan import inner_plan_execution_iterator | from dagster.core.execution.plan.execute_plan import inner_plan_execution_iterator | ||||
from dagster.core.execution.plan.plan import ExecutionPlan | from dagster.core.execution.plan.plan import ExecutionPlan | ||||
from dagster.core.execution.retries import RetryMode | from dagster.core.execution.retries import RetryMode | ||||
from dagster.core.storage.captured_log_manager import CapturedLogManager | |||||
from dagster.utils.timing import format_duration, time_execution_scope | from dagster.utils.timing import format_duration, time_execution_scope | ||||
from .base import Executor | from .base import Executor | ||||
class InProcessExecutor(Executor): | class InProcessExecutor(Executor): | ||||
def __init__(self, retries, marker_to_close): | def __init__(self, retries, marker_to_close): | ||||
self._retries = check.inst_param(retries, "retries", RetryMode) | self._retries = check.inst_param(retries, "retries", RetryMode) | ||||
Show All 10 Lines | def execute(self, pipeline_context, execution_plan): | ||||
step_keys_to_execute = execution_plan.step_keys_to_execute | step_keys_to_execute = execution_plan.step_keys_to_execute | ||||
yield DagsterEvent.engine_event( | yield DagsterEvent.engine_event( | ||||
pipeline_context, | pipeline_context, | ||||
"Executing steps in process (pid: {pid})".format(pid=os.getpid()), | "Executing steps in process (pid: {pid})".format(pid=os.getpid()), | ||||
event_specific_data=EngineEventData.in_process(os.getpid(), step_keys_to_execute), | event_specific_data=EngineEventData.in_process(os.getpid(), step_keys_to_execute), | ||||
) | ) | ||||
with time_execution_scope() as timer_result: | with ExitStack() as stack: | ||||
timer_result = stack.enter_context(time_execution_scope()) | |||||
log_manager = pipeline_context.instance.compute_log_manager | |||||
if ( | |||||
isinstance(log_manager, CapturedLogManager) | |||||
and not log_manager.should_capture_run_by_step() | |||||
): | |||||
plan_steps = execution_plan.get_steps_to_execute_in_topo_order() | |||||
log_key = compute_log_key_for_steps(plan_steps) | |||||
stack.enter_context( | |||||
pipeline_context.instance.compute_log_manager.capture_logs( | |||||
pipeline_context.pipeline_run.run_id, log_key | |||||
) | |||||
) | |||||
yield DagsterEvent.capture_logs( | |||||
pipeline_context, log_key, plan_steps, pipeline_context.pipeline_run | |||||
) | |||||
yield from iter( | yield from iter( | ||||
ExecuteRunWithPlanIterable( | ExecuteRunWithPlanIterable( | ||||
execution_plan=pipeline_context.execution_plan, | execution_plan=pipeline_context.execution_plan, | ||||
iterator=inner_plan_execution_iterator, | iterator=inner_plan_execution_iterator, | ||||
execution_context_manager=PlanExecutionContextManager( | execution_context_manager=PlanExecutionContextManager( | ||||
pipeline=pipeline_context.pipeline, | pipeline=pipeline_context.pipeline, | ||||
retry_mode=pipeline_context.retry_mode, | retry_mode=pipeline_context.retry_mode, | ||||
execution_plan=pipeline_context.execution_plan, | execution_plan=pipeline_context.execution_plan, | ||||
Show All 16 Lines |