Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/execution/results.py
from collections import defaultdict | from collections import defaultdict | ||||
import six | import six | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions import PipelineDefinition, Solid, SolidHandle | from dagster.core.definitions import GraphDefinition, PipelineDefinition, Solid, SolidHandle | ||||
from dagster.core.definitions.events import ObjectStoreOperation | from dagster.core.definitions.events import ObjectStoreOperation | ||||
from dagster.core.definitions.solid_container import IContainSolids | |||||
from dagster.core.definitions.utils import DEFAULT_OUTPUT | from dagster.core.definitions.utils import DEFAULT_OUTPUT | ||||
from dagster.core.errors import DagsterInvariantViolationError | from dagster.core.errors import DagsterInvariantViolationError | ||||
from dagster.core.events import DagsterEvent, DagsterEventType | from dagster.core.events import DagsterEvent, DagsterEventType | ||||
from dagster.core.execution.plan.objects import StepKind | from dagster.core.execution.plan.objects import StepKind | ||||
def _construct_events_by_step_key(event_list): | def _construct_events_by_step_key(event_list): | ||||
events_by_step_key = defaultdict(list) | events_by_step_key = defaultdict(list) | ||||
for event in event_list: | for event in event_list: | ||||
events_by_step_key[event.step_key].append(event) | events_by_step_key[event.step_key].append(event) | ||||
return dict(events_by_step_key) | return dict(events_by_step_key) | ||||
class IContainSolidsExecutionResult(object): | class GraphExecutionResult(object): | ||||
def __init__(self, container, event_list, reconstruct_context, handle=None): | def __init__(self, container, event_list, reconstruct_context, handle=None): | ||||
self.container = check.inst_param(container, "container", IContainSolids) | self.container = check.inst_param(container, "container", GraphDefinition) | ||||
self.event_list = check.list_param(event_list, "step_event_list", of_type=DagsterEvent) | self.event_list = check.list_param(event_list, "step_event_list", of_type=DagsterEvent) | ||||
self.reconstruct_context = check.callable_param(reconstruct_context, "reconstruct_context") | self.reconstruct_context = check.callable_param(reconstruct_context, "reconstruct_context") | ||||
self.handle = check.opt_inst_param(handle, "handle", SolidHandle) | self.handle = check.opt_inst_param(handle, "handle", SolidHandle) | ||||
self._events_by_step_key = _construct_events_by_step_key(event_list) | self._events_by_step_key = _construct_events_by_step_key(event_list) | ||||
@property | @property | ||||
def success(self): | def success(self): | ||||
"""bool: Whether all steps in the execution were successful.""" | """bool: Whether all steps in the execution were successful.""" | ||||
▲ Show 20 Lines • Show All 99 Lines • ▼ Show 20 Lines | def result_for_handle(self, handle): | ||||
else: | else: | ||||
check.inst_param(handle, "handle", SolidHandle) | check.inst_param(handle, "handle", SolidHandle) | ||||
solid = self.container.get_solid(handle) | solid = self.container.get_solid(handle) | ||||
return self._result_for_handle(solid, handle) | return self._result_for_handle(solid, handle) | ||||
class PipelineExecutionResult(IContainSolidsExecutionResult): | class PipelineExecutionResult(GraphExecutionResult): | ||||
"""The result of executing a pipeline. | """The result of executing a pipeline. | ||||
Returned by :py:func:`execute_pipeline`. Users should not instantiate this class. | Returned by :py:func:`execute_pipeline`. Users should not instantiate this class. | ||||
""" | """ | ||||
def __init__(self, pipeline_def, run_id, event_list, reconstruct_context): | def __init__(self, pipeline_def, run_id, event_list, reconstruct_context): | ||||
self.run_id = check.str_param(run_id, "run_id") | self.run_id = check.str_param(run_id, "run_id") | ||||
check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition) | check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition) | ||||
super(PipelineExecutionResult, self).__init__( | super(PipelineExecutionResult, self).__init__( | ||||
container=pipeline_def, event_list=event_list, reconstruct_context=reconstruct_context, | container=pipeline_def, event_list=event_list, reconstruct_context=reconstruct_context, | ||||
) | ) | ||||
@property | @property | ||||
def pipeline_def(self): | def pipeline_def(self): | ||||
return self.container | return self.container | ||||
class CompositeSolidExecutionResult(IContainSolidsExecutionResult): | class CompositeSolidExecutionResult(GraphExecutionResult): | ||||
"""Execution result for a composite solid in a pipeline. | """Execution result for a composite solid in a pipeline. | ||||
Users should not instantiate this class. | Users should not instantiate this class. | ||||
""" | """ | ||||
def __init__(self, solid, event_list, step_events_by_kind, reconstruct_context, handle=None): | def __init__(self, solid, event_list, step_events_by_kind, reconstruct_context, handle=None): | ||||
check.inst_param(solid, "solid", Solid) | check.inst_param(solid, "solid", Solid) | ||||
check.invariant( | check.invariant( | ||||
▲ Show 20 Lines • Show All 324 Lines • Show Last 20 Lines |