Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/events/__init__.py
Show First 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | EventSpecificData = Union[ | ||||
StepInputData, | StepInputData, | ||||
"EngineEventData", | "EngineEventData", | ||||
"HookErroredData", | "HookErroredData", | ||||
StepRetryData, | StepRetryData, | ||||
"PipelineFailureData", | "PipelineFailureData", | ||||
"PipelineCanceledData", | "PipelineCanceledData", | ||||
"ObjectStoreOperationResultData", | "ObjectStoreOperationResultData", | ||||
"HandledOutputData", | "HandledOutputData", | ||||
"PipelineInitFailureData", | |||||
"LoadedInputData", | "LoadedInputData", | ||||
"ComputeLogsCaptureData", | "ComputeLogsCaptureData", | ||||
] | ] | ||||
class DagsterEventType(Enum): | class DagsterEventType(Enum): | ||||
"""The types of events that may be yielded by solid and pipeline execution.""" | """The types of events that may be yielded by solid and pipeline execution.""" | ||||
STEP_OUTPUT = "STEP_OUTPUT" | STEP_OUTPUT = "STEP_OUTPUT" | ||||
STEP_INPUT = "STEP_INPUT" | STEP_INPUT = "STEP_INPUT" | ||||
STEP_FAILURE = "STEP_FAILURE" | STEP_FAILURE = "STEP_FAILURE" | ||||
STEP_START = "STEP_START" | STEP_START = "STEP_START" | ||||
STEP_SUCCESS = "STEP_SUCCESS" | STEP_SUCCESS = "STEP_SUCCESS" | ||||
STEP_SKIPPED = "STEP_SKIPPED" | STEP_SKIPPED = "STEP_SKIPPED" | ||||
STEP_UP_FOR_RETRY = "STEP_UP_FOR_RETRY" # "failed" but want to retry | STEP_UP_FOR_RETRY = "STEP_UP_FOR_RETRY" # "failed" but want to retry | ||||
STEP_RESTARTED = "STEP_RESTARTED" | STEP_RESTARTED = "STEP_RESTARTED" | ||||
ASSET_MATERIALIZATION = "ASSET_MATERIALIZATION" | ASSET_MATERIALIZATION = "ASSET_MATERIALIZATION" | ||||
STEP_EXPECTATION_RESULT = "STEP_EXPECTATION_RESULT" | STEP_EXPECTATION_RESULT = "STEP_EXPECTATION_RESULT" | ||||
PIPELINE_INIT_FAILURE = "PIPELINE_INIT_FAILURE" | |||||
PIPELINE_ENQUEUED = "PIPELINE_ENQUEUED" | PIPELINE_ENQUEUED = "PIPELINE_ENQUEUED" | ||||
PIPELINE_DEQUEUED = "PIPELINE_DEQUEUED" | PIPELINE_DEQUEUED = "PIPELINE_DEQUEUED" | ||||
PIPELINE_STARTING = "PIPELINE_STARTING" # Launch is happening, execution hasn't started yet | PIPELINE_STARTING = "PIPELINE_STARTING" # Launch is happening, execution hasn't started yet | ||||
PIPELINE_START = "PIPELINE_START" # Execution has started | PIPELINE_START = "PIPELINE_START" # Execution has started | ||||
PIPELINE_SUCCESS = "PIPELINE_SUCCESS" | PIPELINE_SUCCESS = "PIPELINE_SUCCESS" | ||||
PIPELINE_FAILURE = "PIPELINE_FAILURE" | PIPELINE_FAILURE = "PIPELINE_FAILURE" | ||||
Show All 28 Lines | STEP_EVENTS = { | ||||
DagsterEventType.OBJECT_STORE_OPERATION, | DagsterEventType.OBJECT_STORE_OPERATION, | ||||
DagsterEventType.HANDLED_OUTPUT, | DagsterEventType.HANDLED_OUTPUT, | ||||
DagsterEventType.LOADED_INPUT, | DagsterEventType.LOADED_INPUT, | ||||
DagsterEventType.STEP_RESTARTED, | DagsterEventType.STEP_RESTARTED, | ||||
DagsterEventType.STEP_UP_FOR_RETRY, | DagsterEventType.STEP_UP_FOR_RETRY, | ||||
} | } | ||||
FAILURE_EVENTS = { | FAILURE_EVENTS = { | ||||
DagsterEventType.PIPELINE_INIT_FAILURE, | |||||
DagsterEventType.PIPELINE_FAILURE, | DagsterEventType.PIPELINE_FAILURE, | ||||
DagsterEventType.STEP_FAILURE, | DagsterEventType.STEP_FAILURE, | ||||
DagsterEventType.PIPELINE_CANCELED, | DagsterEventType.PIPELINE_CANCELED, | ||||
} | } | ||||
PIPELINE_EVENTS = { | PIPELINE_EVENTS = { | ||||
DagsterEventType.PIPELINE_ENQUEUED, | DagsterEventType.PIPELINE_ENQUEUED, | ||||
DagsterEventType.PIPELINE_DEQUEUED, | DagsterEventType.PIPELINE_DEQUEUED, | ||||
DagsterEventType.PIPELINE_STARTING, | DagsterEventType.PIPELINE_STARTING, | ||||
DagsterEventType.PIPELINE_START, | DagsterEventType.PIPELINE_START, | ||||
DagsterEventType.PIPELINE_SUCCESS, | DagsterEventType.PIPELINE_SUCCESS, | ||||
DagsterEventType.PIPELINE_INIT_FAILURE, | |||||
DagsterEventType.PIPELINE_FAILURE, | DagsterEventType.PIPELINE_FAILURE, | ||||
DagsterEventType.PIPELINE_CANCELING, | DagsterEventType.PIPELINE_CANCELING, | ||||
DagsterEventType.PIPELINE_CANCELED, | DagsterEventType.PIPELINE_CANCELED, | ||||
} | } | ||||
HOOK_EVENTS = { | HOOK_EVENTS = { | ||||
DagsterEventType.HOOK_COMPLETED, | DagsterEventType.HOOK_COMPLETED, | ||||
DagsterEventType.HOOK_ERRORED, | DagsterEventType.HOOK_ERRORED, | ||||
▲ Show 20 Lines • Show All 297 Lines • ▼ Show 20 Lines | ): | ||||
def is_pipeline_success(self) -> bool: | def is_pipeline_success(self) -> bool: | ||||
return self.event_type == DagsterEventType.PIPELINE_SUCCESS | return self.event_type == DagsterEventType.PIPELINE_SUCCESS | ||||
@property | @property | ||||
def is_pipeline_failure(self) -> bool: | def is_pipeline_failure(self) -> bool: | ||||
return self.event_type == DagsterEventType.PIPELINE_FAILURE | return self.event_type == DagsterEventType.PIPELINE_FAILURE | ||||
@property | @property | ||||
def is_pipeline_init_failure(self) -> bool: | |||||
return self.event_type == DagsterEventType.PIPELINE_INIT_FAILURE | |||||
@property | |||||
def is_failure(self) -> bool: | def is_failure(self) -> bool: | ||||
return self.event_type in FAILURE_EVENTS | return self.event_type in FAILURE_EVENTS | ||||
@property | @property | ||||
def is_pipeline_event(self) -> bool: | def is_pipeline_event(self) -> bool: | ||||
return self.event_type in PIPELINE_EVENTS | return self.event_type in PIPELINE_EVENTS | ||||
@property | @property | ||||
▲ Show 20 Lines • Show All 69 Lines • ▼ Show 20 Lines | def step_expectation_result_data(self) -> "StepExpectationResultData": | ||||
_assert_type( | _assert_type( | ||||
"step_expectation_result_data", | "step_expectation_result_data", | ||||
DagsterEventType.STEP_EXPECTATION_RESULT, | DagsterEventType.STEP_EXPECTATION_RESULT, | ||||
self.event_type, | self.event_type, | ||||
) | ) | ||||
return cast(StepExpectationResultData, self.event_specific_data) | return cast(StepExpectationResultData, self.event_specific_data) | ||||
@property | @property | ||||
def pipeline_init_failure_data(self) -> "PipelineInitFailureData": | |||||
_assert_type( | |||||
"pipeline_init_failure_data", DagsterEventType.PIPELINE_INIT_FAILURE, self.event_type | |||||
) | |||||
return cast(PipelineInitFailureData, self.event_specific_data) | |||||
@property | |||||
def pipeline_failure_data(self) -> "PipelineFailureData": | def pipeline_failure_data(self) -> "PipelineFailureData": | ||||
_assert_type("pipeline_failure_data", DagsterEventType.PIPELINE_FAILURE, self.event_type) | _assert_type("pipeline_failure_data", DagsterEventType.PIPELINE_FAILURE, self.event_type) | ||||
return cast(PipelineFailureData, self.event_specific_data) | return cast(PipelineFailureData, self.event_specific_data) | ||||
@property | @property | ||||
def engine_event_data(self) -> "EngineEventData": | def engine_event_data(self) -> "EngineEventData": | ||||
_assert_type("engine_event_data", DagsterEventType.ENGINE_EVENT, self.event_type) | _assert_type("engine_event_data", DagsterEventType.ENGINE_EVENT, self.event_type) | ||||
return cast(EngineEventData, self.event_specific_data) | return cast(EngineEventData, self.event_specific_data) | ||||
▲ Show 20 Lines • Show All 198 Lines • ▼ Show 20 Lines | def pipeline_success(pipeline_context: IPlanContext) -> "DagsterEvent": | ||||
pipeline_context, | pipeline_context, | ||||
message='Finished execution of pipeline "{pipeline_name}".'.format( | message='Finished execution of pipeline "{pipeline_name}".'.format( | ||||
pipeline_name=pipeline_context.pipeline_name | pipeline_name=pipeline_context.pipeline_name | ||||
), | ), | ||||
) | ) | ||||
@staticmethod | @staticmethod | ||||
def pipeline_failure( | def pipeline_failure( | ||||
pipeline_context: IPlanContext, | pipeline_context_or_name: Union[IPlanContext, str], | ||||
context_msg: str, | context_msg: str, | ||||
error_info: Optional[SerializableErrorInfo] = None, | error_info: Optional[SerializableErrorInfo] = None, | ||||
) -> "DagsterEvent": | ) -> "DagsterEvent": | ||||
check.str_param(context_msg, "context_msg") | |||||
if isinstance(pipeline_context_or_name, IPlanContext): | |||||
return DagsterEvent.from_pipeline( | return DagsterEvent.from_pipeline( | ||||
DagsterEventType.PIPELINE_FAILURE, | DagsterEventType.PIPELINE_FAILURE, | ||||
pipeline_context, | pipeline_context_or_name, | ||||
message='Execution of pipeline "{pipeline_name}" failed. {context_msg}'.format( | message='Execution of pipeline "{pipeline_name}" failed. {context_msg}'.format( | ||||
pipeline_name=pipeline_context.pipeline_name, | pipeline_name=pipeline_context_or_name.pipeline_name, | ||||
context_msg=context_msg, | context_msg=context_msg, | ||||
), | ), | ||||
event_specific_data=PipelineFailureData(error_info), | event_specific_data=PipelineFailureData(error_info), | ||||
) | ) | ||||
else: | |||||
# when the failure happens trying to bring up context, the pipeline_context hasn't been | |||||
# built and so can't use from_pipeline | |||||
check.str_param(pipeline_context_or_name, "pipeline_name") | |||||
event = DagsterEvent( | |||||
event_type_value=DagsterEventType.PIPELINE_FAILURE.value, | |||||
pipeline_name=pipeline_context_or_name, | |||||
event_specific_data=PipelineFailureData(error_info), | |||||
message='Execution of pipeline "{pipeline_name}" failed. {context_msg}'.format( | |||||
pipeline_name=pipeline_context_or_name, | |||||
context_msg=context_msg, | |||||
), | |||||
alangenfeld: can we just use the `context_msg` like above for the init failure case instead of encoding… | |||||
pid=os.getpid(), | |||||
) | |||||
return event | |||||
@staticmethod | @staticmethod | ||||
def pipeline_canceled( | def pipeline_canceled( | ||||
pipeline_context: IPlanContext, error_info: Optional[SerializableErrorInfo] = None | pipeline_context: IPlanContext, error_info: Optional[SerializableErrorInfo] = None | ||||
) -> "DagsterEvent": | ) -> "DagsterEvent": | ||||
return DagsterEvent.from_pipeline( | return DagsterEvent.from_pipeline( | ||||
DagsterEventType.PIPELINE_CANCELED, | DagsterEventType.PIPELINE_CANCELED, | ||||
pipeline_context, | pipeline_context, | ||||
▲ Show 20 Lines • Show All 94 Lines • ▼ Show 20 Lines | ) -> "DagsterEvent": | ||||
metadata_entries=[], | metadata_entries=[], | ||||
marker_start=None, | marker_start=None, | ||||
marker_end=None, | marker_end=None, | ||||
error=error, | error=error, | ||||
), | ), | ||||
) | ) | ||||
@staticmethod | @staticmethod | ||||
def pipeline_init_failure( | |||||
pipeline_name: str, failure_data: "PipelineInitFailureData", log_manager: DagsterLogManager | |||||
) -> "DagsterEvent": | |||||
# this failure happens trying to bring up context so can't use from_pipeline | |||||
event = DagsterEvent( | |||||
event_type_value=DagsterEventType.PIPELINE_INIT_FAILURE.value, | |||||
pipeline_name=pipeline_name, | |||||
event_specific_data=failure_data, | |||||
message=( | |||||
'Pipeline failure during initialization for pipeline "{pipeline_name}". ' | |||||
"This may be due to a failure in initializing the executor or one of the loggers." | |||||
).format(pipeline_name=pipeline_name), | |||||
pid=os.getpid(), | |||||
) | |||||
log_manager.error( | |||||
event.message | |||||
or "{event_type} for pipeline {pipeline_name}".format( | |||||
event_type=DagsterEventType.PIPELINE_INIT_FAILURE, pipeline_name=pipeline_name | |||||
), | |||||
dagster_event=event, | |||||
pipeline_name=pipeline_name, | |||||
) | |||||
return event | |||||
@staticmethod | |||||
def engine_event( | def engine_event( | ||||
pipeline_context: IPlanContext, | pipeline_context: IPlanContext, | ||||
message: str, | message: str, | ||||
event_specific_data: Optional["EngineEventData"] = None, | event_specific_data: Optional["EngineEventData"] = None, | ||||
step_handle: Optional[StepHandle] = None, | step_handle: Optional[StepHandle] = None, | ||||
) -> "DagsterEvent": | ) -> "DagsterEvent": | ||||
return DagsterEvent.from_pipeline( | return DagsterEvent.from_pipeline( | ||||
DagsterEventType.ENGINE_EVENT, | DagsterEventType.ENGINE_EVENT, | ||||
▲ Show 20 Lines • Show All 401 Lines • ▼ Show 20 Lines | def interrupted(steps_interrupted: List[str]) -> "EngineEventData": | ||||
) | ) | ||||
@staticmethod | @staticmethod | ||||
def engine_error(error: SerializableErrorInfo) -> "EngineEventData": | def engine_error(error: SerializableErrorInfo) -> "EngineEventData": | ||||
return EngineEventData(metadata_entries=[], error=error) | return EngineEventData(metadata_entries=[], error=error) | ||||
@whitelist_for_serdes | @whitelist_for_serdes | ||||
class PipelineInitFailureData( | |||||
NamedTuple( | |||||
"_PipelineInitFailureData", | |||||
[ | |||||
("error", SerializableErrorInfo), | |||||
], | |||||
) | |||||
): | |||||
def __new__(cls, error: SerializableErrorInfo): | |||||
return super(PipelineInitFailureData, cls).__new__( | |||||
cls, error=check.inst_param(error, "error", SerializableErrorInfo) | |||||
) | |||||
@whitelist_for_serdes | |||||
class PipelineFailureData( | class PipelineFailureData( | ||||
NamedTuple( | NamedTuple( | ||||
"_PipelineFailureData", | "_PipelineFailureData", | ||||
[ | [ | ||||
("error", Optional[SerializableErrorInfo]), | ("error", Optional[SerializableErrorInfo]), | ||||
], | ], | ||||
) | ) | ||||
): | ): | ||||
▲ Show 20 Lines • Show All 101 Lines • ▼ Show 20 Lines | ): | ||||
def __new__(cls, log_key, step_keys): | def __new__(cls, log_key, step_keys): | ||||
return super(ComputeLogsCaptureData, cls).__new__( | return super(ComputeLogsCaptureData, cls).__new__( | ||||
cls, | cls, | ||||
log_key=check.str_param(log_key, "log_key"), | log_key=check.str_param(log_key, "log_key"), | ||||
step_keys=check.opt_list_param(step_keys, "step_keys", of_type=str), | step_keys=check.opt_list_param(step_keys, "step_keys", of_type=str), | ||||
) | ) | ||||
################################################################################################### | ################################################################################################### | ||||
# THE GRAVEYARD | # THE GRAVEYARD | ||||
Not Done • Inline Actions • max: FOR WHOM THE BELL TOLLS | |||||
# | # | ||||
# -|- -|- | # -|- -|- -|- | ||||
# | | | # | | | | ||||
# _-'~~~~~`-_ . _-'~~~~~`-_ | # _-'~~~~~`-_ . _-'~~~~~`-_ _-'~~~~~`-_ | ||||
# .' '. .' '. | # .' '. .' '. .' '. | ||||
# | R I P | | R I P | | # | R I P | | R I P | | R I P | | ||||
# | | | | | # | | | | | | | ||||
# | Synthetic | | Asset | | # | Synthetic | | Asset | | Pipeline | | ||||
# | Process | | Store | | # | Process | | Store | | Init | | ||||
# | Events | | Operations | | # | Events | | Operations | | Failures | | ||||
# | | | | | # | | | | | | | ||||
################################################################################################### | ################################################################################################### | ||||
Done • Inline Actions • alangenfeld: don't forget to add a tombstone | |||||
# Keep these around to prevent issues like https://github.com/dagster-io/dagster/issues/3533 | # Keep these around to prevent issues like https://github.com/dagster-io/dagster/issues/3533 | ||||
@whitelist_for_serdes | @whitelist_for_serdes | ||||
class AssetStoreOperationData(NamedTuple): | class AssetStoreOperationData(NamedTuple): | ||||
op: str | op: str | ||||
step_key: str | step_key: str | ||||
output_name: str | output_name: str | ||||
asset_store_key: str | asset_store_key: str | ||||
@whitelist_for_serdes | @whitelist_for_serdes | ||||
class AssetStoreOperationType(Enum): | class AssetStoreOperationType(Enum): | ||||
SET_ASSET = "SET_ASSET" | SET_ASSET = "SET_ASSET" | ||||
GET_ASSET = "GET_ASSET" | GET_ASSET = "GET_ASSET" | ||||
@whitelist_for_serdes | |||||
class PipelineInitFailureData(NamedTuple): | |||||
error: SerializableErrorInfo | |||||
def _handle_back_compat(event_type_value, event_specific_data): | def _handle_back_compat(event_type_value, event_specific_data): | ||||
# transform old specific process events in to engine events | # transform old specific process events in to engine events | ||||
if event_type_value == "PIPELINE_PROCESS_START": | if event_type_value == "PIPELINE_PROCESS_START": | ||||
return DagsterEventType.ENGINE_EVENT.value, EngineEventData([]) | return DagsterEventType.ENGINE_EVENT.value, EngineEventData([]) | ||||
elif event_type_value == "PIPELINE_PROCESS_STARTED": | elif event_type_value == "PIPELINE_PROCESS_STARTED": | ||||
return DagsterEventType.ENGINE_EVENT.value, EngineEventData([]) | return DagsterEventType.ENGINE_EVENT.value, EngineEventData([]) | ||||
elif event_type_value == "PIPELINE_PROCESS_EXITED": | elif event_type_value == "PIPELINE_PROCESS_EXITED": | ||||
return DagsterEventType.ENGINE_EVENT.value, EngineEventData([]) | return DagsterEventType.ENGINE_EVENT.value, EngineEventData([]) | ||||
Show All 14 Lines | elif event_type_value == "ASSET_STORE_OPERATION": | ||||
event_specific_data.output_name, event_specific_data.asset_store_key, [] | event_specific_data.output_name, event_specific_data.asset_store_key, [] | ||||
), | ), | ||||
) | ) | ||||
# previous name for ASSET_MATERIALIZATION was STEP_MATERIALIZATION | # previous name for ASSET_MATERIALIZATION was STEP_MATERIALIZATION | ||||
if event_type_value == "STEP_MATERIALIZATION": | if event_type_value == "STEP_MATERIALIZATION": | ||||
return DagsterEventType.ASSET_MATERIALIZATION.value, event_specific_data | return DagsterEventType.ASSET_MATERIALIZATION.value, event_specific_data | ||||
else: | # transform PIPELINE_INIT_FAILURE to PIPELINE_FAILURE | ||||
if event_type_value == "PIPELINE_INIT_FAILURE": | |||||
return DagsterEventType.PIPELINE_FAILURE.value, PipelineFailureData( | |||||
event_specific_data.error | |||||
) | |||||
return event_type_value, event_specific_data | return event_type_value, event_specific_data | ||||
register_serdes_tuple_fallbacks( | register_serdes_tuple_fallbacks( | ||||
{ | { | ||||
"PipelineProcessStartedData": None, | "PipelineProcessStartedData": None, | ||||
"PipelineProcessExitedData": None, | "PipelineProcessExitedData": None, | ||||
"PipelineProcessStartData": None, | "PipelineProcessStartData": None, | ||||
} | } | ||||
) | ) |
Can we just use the `context_msg` like above for the init failure case instead of encoding these assumptions about use? I think it's better to dupe at call sites than to have this message show up unexpectedly in some future case.