python_modules/dagster/dagster/core/executor/multiprocess.py (lines prefixed with + are added by this change, - are removed, unprefixed lines are unchanged context)
 import os
 import sys
+from contextlib import ExitStack
 from dagster import EventMetadataEntry, check
 from dagster.core.errors import DagsterExecutionInterruptedError, DagsterSubprocessError
 from dagster.core.events import DagsterEvent, EngineEventData
 from dagster.core.execution.api import create_execution_plan, execute_plan_iterator
+from dagster.core.execution.compute_logs import compute_log_key_for_steps
 from dagster.core.execution.context.system import PlanOrchestrationContext
 from dagster.core.execution.plan.objects import StepFailureData
 from dagster.core.execution.plan.plan import ExecutionPlan
 from dagster.core.execution.retries import RetryMode
 from dagster.core.executor.base import Executor
 from dagster.core.instance import DagsterInstance
+from dagster.core.storage.captured_log_manager import CapturedLogManager
 from dagster.seven import multiprocessing
 from dagster.utils import start_termination_thread
 from dagster.utils.error import serializable_error_info_from_exc_info
 from dagster.utils.timing import format_duration, time_execution_scope
 from .child_process_executor import (
     ChildProcessCommand,
     ChildProcessCrashException,
(23 unchanged lines collapsed)
     ):
         self.instance_ref = instance_ref
         self.term_event = term_event
         self.recon_pipeline = recon_pipeline
         self.retry_mode = retry_mode
         self.known_state = known_state

     def execute(self):
         pipeline = self.recon_pipeline
-        with DagsterInstance.from_ref(self.instance_ref) as instance:
+        with ExitStack() as stack:
+            instance = stack.enter_context(DagsterInstance.from_ref(self.instance_ref))
             start_termination_thread(self.term_event)
             execution_plan = create_execution_plan(
                 pipeline=pipeline,
                 run_config=self.run_config,
                 mode=self.pipeline_run.mode,
                 step_keys_to_execute=[self.step_key],
                 known_state=self.known_state,
             )
             yield instance.report_engine_event(
                 "Executing step {} in subprocess".format(self.step_key),
                 self.pipeline_run,
                 EngineEventData(
                     [
                         EventMetadataEntry.text(str(os.getpid()), "pid"),
                         EventMetadataEntry.text(self.step_key, "step_key"),
                     ],
                     marker_end=DELEGATE_MARKER,
                 ),
                 MultiprocessExecutor,
                 self.step_key,
             )
+            if (
+                isinstance(instance.compute_log_manager, CapturedLogManager)
+                and not instance.compute_log_manager.should_capture_run_by_step()
+            ):
+                steps = execution_plan.get_steps_to_execute_in_topo_order()
+                log_key = compute_log_key_for_steps(steps)
+                stack.enter_context(
+                    instance.compute_log_manager.capture_logs(self.pipeline_run.run_id, log_key)
+                )
+                # need to manually provide a pipeline run since we are generating this event outside
+                # of a pipeline context
+                capture_event = DagsterEvent.capture_logs(
+                    pipeline_context=None,
+                    log_key=log_key,
+                    steps=steps,
+                    pipeline_run=self.pipeline_run,
+                )
+                # without a pipeline context and corresponding log manager, we need to manually
+                # report this event on the instance
+                instance.report_dagster_event(capture_event)
+                yield capture_event
             yield from execute_plan_iterator(
                 execution_plan,
                 pipeline,
                 self.pipeline_run,
                 run_config=self.run_config,
                 retry_mode=self.retry_mode.for_inner_plan(),
                 instance=instance,
             )
(191 more lines not shown)
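The switch from a plain "with DagsterInstance.from_ref(...) as instance:" block to ExitStack is what lets the child process enter the log-capture context only conditionally: when the compute log manager is a CapturedLogManager configured to capture logs for the whole run rather than per step, capture_logs(...) is pushed onto the stack; otherwise nothing extra is entered. Below is a minimal sketch of that conditional-context pattern using only the standard library; capture_to_file and run_steps are illustrative placeholders, not Dagster APIs.

from contextlib import ExitStack, contextmanager

@contextmanager
def capture_to_file(path):
    # Hypothetical stand-in for a log-capture context manager; not the
    # Dagster CapturedLogManager.capture_logs API.
    handle = open(path, "w")
    try:
        yield handle
    finally:
        handle.close()

def run_steps(steps, capture_whole_run):
    with ExitStack() as stack:
        # Enter the capture context only when requested; otherwise the stack
        # simply has nothing extra to unwind when the block exits.
        if capture_whole_run:
            stack.enter_context(capture_to_file("run.log"))
        for step in steps:
            print("executing", step)  # placeholder for real step execution

Because ExitStack owns both the DagsterInstance context and the optional capture context, they are unwound in reverse order when execute() returns or raises.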
Inline review comment from alangenfeld on the DagsterEvent.capture_logs call: kwargs for clarity (mostly for what is None)
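The comment refers to passing every argument of the new DagsterEvent.capture_logs call by keyword, so the reader can see at the call site which parameter is deliberately None. A generic illustration of that style point; the capture_logs function here is a hypothetical stand-in, not the Dagster API.

def capture_logs(pipeline_context, log_key, steps, pipeline_run):
    # Hypothetical stand-in; only the call style matters for this example.
    return (pipeline_context, log_key, steps, pipeline_run)

log_key, steps, pipeline_run = "run_logs", ["step_one"], "example_run"

# Positional call: hard to tell at a glance which argument is intentionally None.
capture_logs(None, log_key, steps, pipeline_run)

# Keyword call: the None is labelled, which is what the review comment asks for.
capture_logs(
    pipeline_context=None,
    log_key=log_key,
    steps=steps,
    pipeline_run=pipeline_run,
)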