Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/instance/__init__.py
import inspect | import inspect | ||||
import logging | import logging | ||||
import os | import os | ||||
import sys | import sys | ||||
import tempfile | import tempfile | ||||
import time | import time | ||||
import warnings | import warnings | ||||
import weakref | import weakref | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from enum import Enum | from enum import Enum | ||||
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, cast | from typing import ( | ||||
TYPE_CHECKING, | |||||
Any, | |||||
Callable, | |||||
Dict, | |||||
Iterable, | |||||
List, | |||||
Optional, | |||||
Tuple, | |||||
Type, | |||||
Union, | |||||
cast, | |||||
) | |||||
import yaml | import yaml | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions.events import AssetKey | from dagster.core.definitions.events import AssetKey | ||||
from dagster.core.definitions.pipeline import PipelineDefinition, PipelineSubsetDefinition | from dagster.core.definitions.pipeline import PipelineDefinition, PipelineSubsetDefinition | ||||
from dagster.core.definitions.pipeline_base import InMemoryPipeline | from dagster.core.definitions.pipeline_base import InMemoryPipeline | ||||
from dagster.core.errors import ( | from dagster.core.errors import ( | ||||
DagsterHomeNotSetError, | DagsterHomeNotSetError, | ||||
Show All 24 Lines | |||||
# 'execution_date' used in Airflow operator execution and 'is_airflow_ingest_pipeline' determines | # 'execution_date' used in Airflow operator execution and 'is_airflow_ingest_pipeline' determines | ||||
# whether 'airflow_execution_date' is needed. | # whether 'airflow_execution_date' is needed. | ||||
# https://github.com/dagster-io/dagster/issues/2403 | # https://github.com/dagster-io/dagster/issues/2403 | ||||
AIRFLOW_EXECUTION_DATE_STR = "airflow_execution_date" | AIRFLOW_EXECUTION_DATE_STR = "airflow_execution_date" | ||||
IS_AIRFLOW_INGEST_PIPELINE_STR = "is_airflow_ingest_pipeline" | IS_AIRFLOW_INGEST_PIPELINE_STR = "is_airflow_ingest_pipeline" | ||||
if TYPE_CHECKING: | if TYPE_CHECKING: | ||||
from dagster.core.events import DagsterEvent, DagsterEventType | from dagster.core.events import DagsterEvent, DagsterEventType, EngineEventData | ||||
from dagster.core.host_representation import HistoricalPipeline | from dagster.core.host_representation import HistoricalPipeline | ||||
from dagster.core.snap import PipelineSnapshot, ExecutionPlanSnapshot | from dagster.core.snap import PipelineSnapshot, ExecutionPlanSnapshot | ||||
from dagster.core.storage.event_log.base import EventRecordsFilter, EventLogRecord | from dagster.core.storage.event_log.base import EventRecordsFilter, EventLogRecord | ||||
from dagster.core.workspace.workspace import IWorkspace | from dagster.core.workspace.workspace import IWorkspace | ||||
from dagster.daemon.types import DaemonHeartbeat | from dagster.daemon.types import DaemonHeartbeat | ||||
from dagster.core.storage.compute_log_manager import ComputeLogManager | from dagster.core.storage.compute_log_manager import ComputeLogManager | ||||
from dagster.core.storage.event_log import EventLogStorage | from dagster.core.storage.event_log import EventLogStorage | ||||
from dagster.core.storage.root import LocalArtifactStorage | from dagster.core.storage.root import LocalArtifactStorage | ||||
▲ Show 20 Lines • Show All 1,098 Lines • ▼ Show 20 Lines | def handle_new_event(self, event): | ||||
for sub in self._subscribers[run_id]: | for sub in self._subscribers[run_id]: | ||||
sub(event) | sub(event) | ||||
def add_event_listener(self, run_id: str, cb: Callable):
    """
    Register a callback to be invoked for new events belonging to a run.

    Args:
        run_id (str): The id of the run whose events the callback should receive.
        cb (Callable): Callable appended to the subscriber list for ``run_id``.
    """
    self._subscribers[run_id].append(cb)
def report_engine_event(
    self,
    message: str,
    pipeline_run: PipelineRun,
    engine_event_data: Optional["EngineEventData"] = None,
    cls: Optional[Type] = None,
    step_key: Optional[str] = None,
):
    """
    Report an EngineEvent that occurred outside of a pipeline execution context.

    Args:
        message (str): Text of the event.
        pipeline_run (PipelineRun): The run the event is reported against.
        engine_event_data (Optional[EngineEventData]): Event payload; an empty
            ``EngineEventData([])`` is used when omitted.
        cls (Optional[Type]): When given, its ``__name__`` is prepended to the
            message in square brackets.
        step_key (Optional[str]): Step key attached to the event, if any.

    Returns:
        DagsterEvent: The engine event that was built and reported.
    """
    from dagster.core.events import EngineEventData, DagsterEvent, DagsterEventType

    # ``cls`` is optional (defaults to None), so only validate it when supplied.
    if cls is not None:
        check.class_param(cls, "cls")
    check.str_param(message, "message")
    check.inst_param(pipeline_run, "pipeline_run", PipelineRun)
    engine_event_data = check.opt_inst_param(
        engine_event_data,
        "engine_event_data",
        EngineEventData,
        EngineEventData([]),
    )

    if cls:
        message = "[{}] {}".format(cls.__name__, message)

    # Events carrying error information are logged at ERROR, everything else at INFO.
    log_level = logging.INFO
    if engine_event_data and engine_event_data.error:
        log_level = logging.ERROR

    dagster_event = DagsterEvent(
        event_type_value=DagsterEventType.ENGINE_EVENT.value,
        pipeline_name=pipeline_run.pipeline_name,
        message=message,
        event_specific_data=engine_event_data,
        step_key=step_key,
    )
    self.report_dagster_event(dagster_event, pipeline_run, log_level)
    return dagster_event
def report_dagster_event(
    self,
    dagster_event: "DagsterEvent",
    pipeline_run: PipelineRun,
    log_level: Union[str, int] = logging.INFO,
):
    """
    Takes a DagsterEvent and stores it in persistent storage for the corresponding PipelineRun

    The event is wrapped in an EventLogEntry stamped with the current time and
    passed to ``handle_new_event``.

    Args:
        dagster_event (DagsterEvent): The event to record.
        pipeline_run (PipelineRun): The run that supplies the pipeline name and run id.
        log_level (Union[str, int]): Level recorded on the log entry; defaults to
            ``logging.INFO``.

    Returns:
        DagsterEvent: The same ``dagster_event`` that was passed in.
    """
    from dagster.core.events.log import EventLogEntry

    # The log entry's message fields fall back to "" when the event has no message.
    event_message = dagster_event.message or ""
    event_record = EventLogEntry(
        message=event_message,
        user_message=event_message,
        level=log_level,
        pipeline_name=pipeline_run.pipeline_name,
        run_id=pipeline_run.run_id,
        error_info=None,
        timestamp=time.time(),
        step_key=dagster_event.step_key,
        dagster_event=dagster_event,
    )
    self.handle_new_event(event_record)
    return dagster_event
def report_run_canceling(self, run, message=None):
    """
    Report a PIPELINE_CANCELING event for the given run.

    Args:
        run (PipelineRun): The run being canceled.
        message (Optional[str]): Event message; defaults to
            "Sending pipeline termination request.".

    Returns:
        The value returned by ``report_dagster_event`` for the canceling event.
    """
    from dagster.core.events import DagsterEvent, DagsterEventType

    check.inst_param(run, "run", PipelineRun)
    message = check.opt_str_param(
        message,
        "message",
        "Sending pipeline termination request.",
    )

    canceling_event = DagsterEvent(
        event_type_value=DagsterEventType.PIPELINE_CANCELING.value,
        pipeline_name=run.pipeline_name,
        message=message,
    )
    return self.report_dagster_event(canceling_event, run)
def report_run_canceled(
    self,
    pipeline_run,
    message=None,
):
    """
    Report a PIPELINE_CANCELED event for the given run, logged at ERROR level.

    Args:
        pipeline_run (PipelineRun): The run that was canceled.
        message (Optional[str]): Event message; a default explaining that the run
            was marked as canceled from outside the execution context is used
            when omitted.

    Returns:
        The value returned by ``report_dagster_event`` for the canceled event.
    """
    from dagster.core.events import DagsterEvent, DagsterEventType

    check.inst_param(pipeline_run, "pipeline_run", PipelineRun)
    # The parameter name was previously misspelled "mesage"; it is what a failed
    # check reports, so it must match the real argument name.
    message = check.opt_str_param(
        message,
        "message",
        "This pipeline run has been marked as canceled from outside the execution context.",
    )

    dagster_event = DagsterEvent(
        event_type_value=DagsterEventType.PIPELINE_CANCELED.value,
        pipeline_name=pipeline_run.pipeline_name,
        message=message,
    )
    return self.report_dagster_event(dagster_event, pipeline_run, log_level=logging.ERROR)
def report_run_failed(self, pipeline_run, message=None):
    """
    Report a PIPELINE_FAILURE event for the given run, logged at ERROR level.

    Args:
        pipeline_run (PipelineRun): The run that failed.
        message (Optional[str]): Event message; a default explaining that the run
            was marked as failed from outside the execution context is used
            when omitted.

    Returns:
        The value returned by ``report_dagster_event`` for the failure event.
    """
    from dagster.core.events import DagsterEvent, DagsterEventType

    check.inst_param(pipeline_run, "pipeline_run", PipelineRun)
    message = check.opt_str_param(
        message,
        "message",
        "This pipeline run has been marked as failed from outside the execution context.",
    )

    dagster_event = DagsterEvent(
        event_type_value=DagsterEventType.PIPELINE_FAILURE.value,
        pipeline_name=pipeline_run.pipeline_name,
        message=message,
    )
    return self.report_dagster_event(dagster_event, pipeline_run, log_level=logging.ERROR)
# directories | # directories | ||||
def file_manager_directory(self, run_id):
    """
    Return the file manager directory for a run.

    Delegates to ``file_manager_dir`` on the instance's local artifact storage.

    Args:
        run_id: The id of the run.
    """
    return self._local_artifact_storage.file_manager_dir(run_id)
def intermediates_directory(self, run_id):
    """
    Return the intermediates directory for a run.

    Delegates to ``intermediates_dir`` on the instance's local artifact storage.

    Args:
        run_id: The id of the run.
    """
    return self._local_artifact_storage.intermediates_dir(run_id)
▲ Show 20 Lines • Show All 69 Lines • ▼ Show 20 Lines | def launch_run(self, run_id: str, workspace: "IWorkspace"): | ||||
Args: | Args: | ||||
run_id (str): The id of the run the launch. | run_id (str): The id of the run the launch. | ||||
""" | """ | ||||
from dagster.core.launcher import LaunchRunContext | from dagster.core.launcher import LaunchRunContext | ||||
run = self.get_run_by_id(run_id) | run = self.get_run_by_id(run_id) | ||||
from dagster.core.events import EngineEventData, DagsterEvent, DagsterEventType | from dagster.core.events import EngineEventData, DagsterEvent, DagsterEventType | ||||
from dagster.core.events.log import EventLogEntry | |||||
launch_started_event = DagsterEvent( | launch_started_event = DagsterEvent( | ||||
event_type_value=DagsterEventType.PIPELINE_STARTING.value, | event_type_value=DagsterEventType.PIPELINE_STARTING.value, | ||||
pipeline_name=run.pipeline_name, | pipeline_name=run.pipeline_name, | ||||
) | ) | ||||
event_record = EventLogEntry( | self.report_dagster_event(launch_started_event, run) | ||||
message="", | |||||
user_message="", | |||||
level=logging.INFO, | |||||
pipeline_name=run.pipeline_name, | |||||
run_id=run.run_id, | |||||
error_info=None, | |||||
timestamp=time.time(), | |||||
dagster_event=launch_started_event, | |||||
) | |||||
self.handle_new_event(event_record) | |||||
run = self.get_run_by_id(run_id) | run = self.get_run_by_id(run_id) | ||||
try: | try: | ||||
self._run_launcher.launch_run(LaunchRunContext(pipeline_run=run, workspace=workspace)) | self._run_launcher.launch_run(LaunchRunContext(pipeline_run=run, workspace=workspace)) | ||||
except: | except: | ||||
error = serializable_error_info_from_exc_info(sys.exc_info()) | error = serializable_error_info_from_exc_info(sys.exc_info()) | ||||
self.report_engine_event( | self.report_engine_event( | ||||
▲ Show 20 Lines • Show All 211 Lines • Show Last 20 Lines |
mypy