Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/events/__init__.py
Show All 21 Lines | from dagster.core.execution.context.system import ( | ||||
IStepContext, | IStepContext, | ||||
PlanExecutionContext, | PlanExecutionContext, | ||||
PlanOrchestrationContext, | PlanOrchestrationContext, | ||||
StepExecutionContext, | StepExecutionContext, | ||||
) | ) | ||||
from dagster.core.execution.plan.handle import ResolvedFromDynamicStepHandle, StepHandle | from dagster.core.execution.plan.handle import ResolvedFromDynamicStepHandle, StepHandle | ||||
from dagster.core.execution.plan.outputs import StepOutputData | from dagster.core.execution.plan.outputs import StepOutputData | ||||
from dagster.core.log_manager import DagsterLogManager | from dagster.core.log_manager import DagsterLogManager | ||||
from dagster.core.storage.pipeline_run import PipelineRun | |||||
from dagster.serdes import register_serdes_tuple_fallbacks, whitelist_for_serdes | from dagster.serdes import register_serdes_tuple_fallbacks, whitelist_for_serdes | ||||
from dagster.utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info | from dagster.utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info | ||||
from dagster.utils.timing import format_duration | from dagster.utils.timing import format_duration | ||||
if TYPE_CHECKING: | if TYPE_CHECKING: | ||||
from dagster.core.execution.plan.plan import ExecutionPlan | from dagster.core.execution.plan.plan import ExecutionPlan | ||||
from dagster.core.execution.plan.step import ExecutionStep, StepKind | from dagster.core.execution.plan.step import ExecutionStep, StepKind | ||||
from dagster.core.execution.plan.inputs import StepInputData | from dagster.core.execution.plan.inputs import StepInputData | ||||
▲ Show 20 Lines • Show All 222 Lines • ▼ Show 20 Lines | Attributes: | ||||
step_kind_value (str): Value for a StepKind. | step_kind_value (str): Value for a StepKind. | ||||
logging_tags (Dict[str, str]) | logging_tags (Dict[str, str]) | ||||
event_specific_data (Any): Type must correspond to event_type_value. | event_specific_data (Any): Type must correspond to event_type_value. | ||||
message (str) | message (str) | ||||
pid (int) | pid (int) | ||||
step_key (Optional[str]): DEPRECATED | step_key (Optional[str]): DEPRECATED | ||||
""" | """ | ||||
@staticmethod | @staticmethod | ||||
def from_step( | def from_step( | ||||
event_type: "DagsterEventType", | event_type: "DagsterEventType", | ||||
step_context: IStepContext, | step_context: IStepContext, | ||||
event_specific_data: Optional["EventSpecificData"] = None, | event_specific_data: Optional["EventSpecificData"] = None, | ||||
message: Optional[str] = None, | message: Optional[str] = None, | ||||
) -> "DagsterEvent": | ) -> "DagsterEvent": | ||||
event = DagsterEvent( | event = DagsterEvent( | ||||
event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value, | event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value, | ||||
pipeline_name=step_context.pipeline_name, | pipeline_name=step_context.pipeline_name, | ||||
step_handle=step_context.step.handle, | step_handle=step_context.step.handle, | ||||
solid_handle=step_context.step.solid_handle, | solid_handle=step_context.step.solid_handle, | ||||
step_kind_value=step_context.step.kind.value, | step_kind_value=step_context.step.kind.value, | ||||
logging_tags=step_context.logging_tags, | logging_tags=step_context.logging_tags, | ||||
event_specific_data=_validate_event_specific_data(event_type, event_specific_data), | event_specific_data=_validate_event_specific_data(event_type, event_specific_data), | ||||
message=check.opt_str_param(message, "message"), | message=check.opt_str_param(message, "message"), | ||||
pid=os.getpid(), | pid=os.getpid(), | ||||
) | ) | ||||
log_step_event(step_context, event) | log_step_event(step_context, event) | ||||
return event | return event | ||||
@staticmethod | @staticmethod | ||||
def from_pipeline( | def from_pipeline( | ||||
event_type: DagsterEventType, | event_type: DagsterEventType, | ||||
pipeline_context: IPlanContext, | pipeline_context: IPlanContext, | ||||
message: Optional[str] = None, | message: Optional[str] = None, | ||||
event_specific_data: Optional["EventSpecificData"] = None, | event_specific_data: Optional["EventSpecificData"] = None, | ||||
step_handle: Optional[Union[StepHandle, ResolvedFromDynamicStepHandle]] = None, | step_handle: Optional[Union[StepHandle, ResolvedFromDynamicStepHandle]] = None, | ||||
) -> "DagsterEvent": | ) -> "DagsterEvent": | ||||
check.opt_inst_param( | check.opt_inst_param( | ||||
step_handle, "step_handle", (StepHandle, ResolvedFromDynamicStepHandle) | step_handle, "step_handle", (StepHandle, ResolvedFromDynamicStepHandle) | ||||
) | ) | ||||
event = DagsterEvent( | event = DagsterEvent( | ||||
event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value, | event_type_value=check.inst_param(event_type, "event_type", DagsterEventType).value, | ||||
pipeline_name=pipeline_context.pipeline_name, | pipeline_name=pipeline_context.pipeline_name, | ||||
message=check.opt_str_param(message, "message"), | message=check.opt_str_param(message, "message"), | ||||
event_specific_data=_validate_event_specific_data(event_type, event_specific_data), | event_specific_data=_validate_event_specific_data(event_type, event_specific_data), | ||||
step_handle=step_handle, | step_handle=step_handle, | ||||
pid=os.getpid(), | pid=os.getpid(), | ||||
) | ) | ||||
step_key = step_handle.to_key() if step_handle else None | step_key = step_handle.to_key() if step_handle else None | ||||
log_pipeline_event(pipeline_context, event, step_key) | log_pipeline_event(pipeline_context, event, step_key) | ||||
return event | return event | ||||
@staticmethod | @staticmethod | ||||
def from_resource( | def from_resource( | ||||
pipeline_name: str, | pipeline_name: str, | ||||
execution_plan: "ExecutionPlan", | execution_plan: "ExecutionPlan", | ||||
log_manager: DagsterLogManager, | log_manager: DagsterLogManager, | ||||
message: Optional[str] = None, | message: Optional[str] = None, | ||||
event_specific_data: Optional["EngineEventData"] = None, | event_specific_data: Optional["EngineEventData"] = None, | ||||
) -> "DagsterEvent": | ) -> "DagsterEvent": | ||||
event = DagsterEvent( | event = DagsterEvent( | ||||
DagsterEventType.ENGINE_EVENT.value, | DagsterEventType.ENGINE_EVENT.value, | ||||
pipeline_name=pipeline_name, | pipeline_name=pipeline_name, | ||||
message=check.opt_str_param(message, "message"), | message=check.opt_str_param(message, "message"), | ||||
event_specific_data=_validate_event_specific_data( | event_specific_data=_validate_event_specific_data( | ||||
DagsterEventType.ENGINE_EVENT, event_specific_data | DagsterEventType.ENGINE_EVENT, event_specific_data | ||||
), | ), | ||||
step_handle=execution_plan.step_handle_for_single_step_plans(), | step_handle=execution_plan.step_handle_for_single_step_plans(), | ||||
pid=os.getpid(), | pid=os.getpid(), | ||||
) | ) | ||||
log_resource_event(log_manager, pipeline_name, event) | log_resource_event(log_manager, pipeline_name, event) | ||||
return event | return event | ||||
alangenfeld: not for this diff - but we should rename these for clarity
* `report_and_log_from_xxx`
*… | |||||
def __new__( | def __new__( | ||||
cls, | cls, | ||||
event_type_value: str, | event_type_value: str, | ||||
pipeline_name: str, | pipeline_name: str, | ||||
step_handle: Optional[Union[StepHandle, ResolvedFromDynamicStepHandle]] = None, | step_handle: Optional[Union[StepHandle, ResolvedFromDynamicStepHandle]] = None, | ||||
solid_handle: Optional[SolidHandle] = None, | solid_handle: Optional[SolidHandle] = None, | ||||
step_kind_value: Optional[str] = None, | step_kind_value: Optional[str] = None, | ||||
logging_tags: Optional[Dict[str, str]] = None, | logging_tags: Optional[Dict[str, str]] = None, | ||||
▲ Show 20 Lines • Show All 752 Lines • ▼ Show 20 Lines | ) -> "DagsterEvent": | ||||
event.message, | event.message, | ||||
dagster_event=event, | dagster_event=event, | ||||
pipeline_name=step_context.pipeline_name, | pipeline_name=step_context.pipeline_name, | ||||
) | ) | ||||
return event | return event | ||||
@staticmethod | @staticmethod | ||||
def capture_logs(pipeline_context: IPlanContext, log_key: str, steps: List["ExecutionStep"]): | def capture_logs( | ||||
pipeline_context: Optional[IPlanContext], | |||||
log_key: str, | |||||
steps: List["ExecutionStep"], | |||||
pipeline_run: Optional[PipelineRun] = None, | |||||
): | |||||
step_keys = [step.key for step in steps] | step_keys = [step.key for step in steps] | ||||
if len(step_keys) == 1: | if len(step_keys) == 1: | ||||
message = f"Started capturing logs for solid: {step_keys[0]}." | message = f"Started capturing logs for solid: {step_keys[0]}." | ||||
else: | else: | ||||
message = f"Started capturing logs in process (pid: {os.getpid()})." | message = f"Started capturing logs in process (pid: {os.getpid()})." | ||||
if isinstance(pipeline_context, StepExecutionContext): | if isinstance(pipeline_context, StepExecutionContext): | ||||
return DagsterEvent.from_step( | return DagsterEvent.from_step( | ||||
DagsterEventType.LOGS_CAPTURED, | DagsterEventType.LOGS_CAPTURED, | ||||
pipeline_context, | pipeline_context, | ||||
message=message, | message=message, | ||||
event_specific_data=ComputeLogsCaptureData( | event_specific_data=ComputeLogsCaptureData( | ||||
step_keys=step_keys, | step_keys=step_keys, | ||||
log_key=log_key, | log_key=log_key, | ||||
), | ), | ||||
) | ) | ||||
elif isinstance(pipeline_context, IPlanContext): | |||||
return DagsterEvent.from_pipeline( | return DagsterEvent.from_pipeline( | ||||
DagsterEventType.LOGS_CAPTURED, | DagsterEventType.LOGS_CAPTURED, | ||||
pipeline_context, | pipeline_context, | ||||
message=message, | message=message, | ||||
event_specific_data=ComputeLogsCaptureData( | event_specific_data=ComputeLogsCaptureData( | ||||
step_keys=step_keys, | step_keys=step_keys, | ||||
log_key=log_key, | log_key=log_key, | ||||
), | ), | ||||
) | ) | ||||
elif pipeline_run: | |||||
check.inst_param(pipeline_run, "pipeline_run", PipelineRun) | |||||
return DagsterEvent( | |||||
event_type_value=DagsterEventType.LOGS_CAPTURED.value, | |||||
pipeline_name=pipeline_run.pipeline_name or "", | |||||
message=message, | |||||
event_specific_data=ComputeLogsCaptureData( | |||||
step_keys=step_keys, | |||||
log_key=log_key, | |||||
), | |||||
) | |||||
else: | |||||
check.failed("DagsterEvent.capture_logs called without a IPlanContext or a PipelineRun") | |||||
Not Done Inline Actionsthese first two log directly, and this last case doesn't, should probably report directly here instead of doing it at the callsite [1] alangenfeld: these first two log directly, and this last case doesn't, should probably report directly here… | |||||
def get_step_output_event( | def get_step_output_event( | ||||
events: List[DagsterEvent], step_key: str, output_name: Optional[str] = "result" | events: List[DagsterEvent], step_key: str, output_name: Optional[str] = "result" | ||||
) -> Optional["DagsterEvent"]: | ) -> Optional["DagsterEvent"]: | ||||
check.list_param(events, "events", of_type=DagsterEvent) | check.list_param(events, "events", of_type=DagsterEvent) | ||||
check.str_param(step_key, "step_key") | check.str_param(step_key, "step_key") | ||||
check.str_param(output_name, "output_name") | check.str_param(output_name, "output_name") | ||||
for event in events: | for event in events: | ||||
▲ Show 20 Lines • Show All 359 Lines • Show Last 20 Lines |
not for this diff - but we should rename these for clarity