Differential D8751 Diff 41254 python_modules/dagster/dagster/core/executor/step_delegating/step_handler/base.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/executor/step_delegating/step_handler/base.py
import abc | import abc | ||||
from typing import List, NamedTuple, Optional | from typing import List, Optional | ||||
from dagster import DagsterEvent, DagsterInstance, check | from dagster import DagsterEvent, DagsterInstance | ||||
from dagster.core.storage.pipeline_run import PipelineRun | from dagster.core.storage.pipeline_run import PipelineRun | ||||
from dagster.grpc.types import ExecuteStepArgs | from dagster.grpc.types import ExecuteStepArgs | ||||
from dagster.serdes.serdes import whitelist_for_serdes | |||||
@whitelist_for_serdes | |||||
class PersistedStepHandlerContext( | |||||
NamedTuple( | |||||
"_PersistedStepHandlerContext", | |||||
[ | |||||
("execute_step_args", ExecuteStepArgs), | |||||
], | |||||
) | |||||
): | |||||
def __new__(cls, execute_step_args: ExecuteStepArgs): | |||||
return super(PersistedStepHandlerContext, cls).__new__( | |||||
cls, check.inst_param(execute_step_args, "execute_step_args", ExecuteStepArgs) | |||||
) | |||||
class StepHandlerContext: | class StepHandlerContext: | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
instance: DagsterInstance, | instance: DagsterInstance, | ||||
execute_step_args: ExecuteStepArgs, | execute_step_args: ExecuteStepArgs, | ||||
pipeline_run: Optional[PipelineRun] = None, | pipeline_run: Optional[PipelineRun] = None, | ||||
Show All 14 Lines | def pipeline_run(self) -> PipelineRun: | ||||
self._pipeline_run = run | self._pipeline_run = run | ||||
return self._pipeline_run | return self._pipeline_run | ||||
@property | @property | ||||
def instance(self) -> DagsterInstance: | def instance(self) -> DagsterInstance: | ||||
return self._instance | return self._instance | ||||
def serialize(self) -> PersistedStepHandlerContext: | |||||
return PersistedStepHandlerContext(execute_step_args=self._execute_step_args) | |||||
@classmethod | |||||
def deserialize(cls, instance: DagsterInstance, ctx_tuple: PersistedStepHandlerContext): | |||||
return cls(instance=instance, execute_step_args=ctx_tuple.execute_step_args) | |||||
class StepHandler(abc.ABC): # pylint: disable=no-init | class StepHandler(abc.ABC): # pylint: disable=no-init | ||||
@abc.abstractproperty | @abc.abstractproperty | ||||
def name(self) -> str: | def name(self) -> str: | ||||
pass | pass | ||||
@abc.abstractmethod | @abc.abstractmethod | ||||
def launch_step(self, step_handler_context: StepHandlerContext) -> List[DagsterEvent]: | def launch_step(self, step_handler_context: StepHandlerContext) -> List[DagsterEvent]: | ||||
Show All 9 Lines |