Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/grpc/impl.py
"""Workhorse functions for individual API requests.""" | """Workhorse functions for individual API requests.""" | ||||
import os | import os | ||||
import sys | import sys | ||||
import pendulum | import pendulum | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions import ScheduleExecutionContext | from dagster.core.definitions import ScheduleExecutionContext | ||||
from dagster.core.definitions.reconstructable import ( | from dagster.core.definitions.reconstructable import ( | ||||
ReconstructablePipeline, | ReconstructablePipeline, | ||||
ReconstructableRepository, | ReconstructableRepository, | ||||
) | ) | ||||
from dagster.core.definitions.sensor import SensorExecutionContext, SensorRunParams, SensorSkipData | from dagster.core.definitions.sensor import RunRequest, SensorExecutionContext, SkipReason | ||||
from dagster.core.errors import ( | from dagster.core.errors import ( | ||||
DagsterInvalidSubsetError, | DagsterInvalidSubsetError, | ||||
DagsterRunNotFoundError, | DagsterRunNotFoundError, | ||||
DagsterSubprocessError, | DagsterSubprocessError, | ||||
PartitionExecutionError, | PartitionExecutionError, | ||||
ScheduleExecutionError, | ScheduleExecutionError, | ||||
SensorExecutionError, | SensorExecutionError, | ||||
user_code_error_boundary, | user_code_error_boundary, | ||||
▲ Show 20 Lines • Show All 245 Lines • ▼ Show 20 Lines | with DagsterInstance.from_ref(instance_ref) as instance: | ||||
try: | try: | ||||
with user_code_error_boundary( | with user_code_error_boundary( | ||||
SensorExecutionError, | SensorExecutionError, | ||||
lambda: "Error occurred during the execution of evaluation_fn for sensor " | lambda: "Error occurred during the execution of evaluation_fn for sensor " | ||||
"{sensor_name}".format(sensor_name=sensor_def.name), | "{sensor_name}".format(sensor_name=sensor_def.name), | ||||
): | ): | ||||
tick_data_list = sensor_def.get_tick_data(sensor_context) | tick_data_list = sensor_def.get_tick_data(sensor_context) | ||||
return ExternalSensorExecutionData( | return ExternalSensorExecutionData( | ||||
run_params=[ | run_requests=[tick for tick in tick_data_list if isinstance(tick, RunRequest)], | ||||
tick for tick in tick_data_list if isinstance(tick, SensorRunParams) | |||||
], | |||||
skip_message=tick_data_list[0].skip_message | skip_message=tick_data_list[0].skip_message | ||||
if tick_data_list and isinstance(tick_data_list[0], SensorSkipData) | if tick_data_list and isinstance(tick_data_list[0], SkipReason) | ||||
else None, | else None, | ||||
) | ) | ||||
except SensorExecutionError: | except SensorExecutionError: | ||||
return ExternalSensorExecutionErrorData( | return ExternalSensorExecutionErrorData( | ||||
serializable_error_info_from_exc_info(sys.exc_info()) | serializable_error_info_from_exc_info(sys.exc_info()) | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 126 Lines • Show Last 20 Lines |