Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/grpc/impl.py
"""Workhorse functions for individual API requests.""" | """Workhorse functions for individual API requests.""" | ||||
import os | import os | ||||
import sys | import sys | ||||
import pendulum | import pendulum | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions import ScheduleExecutionContext | from dagster.core.definitions import ScheduleExecutionContext | ||||
from dagster.core.definitions.reconstructable import ( | from dagster.core.definitions.reconstructable import ( | ||||
ReconstructablePipeline, | ReconstructablePipeline, | ||||
ReconstructableRepository, | ReconstructableRepository, | ||||
) | ) | ||||
from dagster.core.definitions.sensor import SensorExecutionContext | |||||
from dagster.core.errors import ( | from dagster.core.errors import ( | ||||
DagsterInvalidSubsetError, | DagsterInvalidSubsetError, | ||||
DagsterRunNotFoundError, | DagsterRunNotFoundError, | ||||
DagsterSubprocessError, | DagsterSubprocessError, | ||||
PartitionExecutionError, | PartitionExecutionError, | ||||
ScheduleExecutionError, | ScheduleExecutionError, | ||||
SensorExecutionError, | |||||
user_code_error_boundary, | user_code_error_boundary, | ||||
) | ) | ||||
from dagster.core.events import EngineEventData | from dagster.core.events import EngineEventData | ||||
from dagster.core.execution.api import create_execution_plan, execute_run_iterator | from dagster.core.execution.api import create_execution_plan, execute_run_iterator | ||||
from dagster.core.host_representation import external_pipeline_data_from_def | from dagster.core.host_representation import external_pipeline_data_from_def | ||||
from dagster.core.host_representation.external_data import ( | from dagster.core.host_representation.external_data import ( | ||||
ExternalPartitionConfigData, | ExternalPartitionConfigData, | ||||
ExternalPartitionExecutionErrorData, | ExternalPartitionExecutionErrorData, | ||||
ExternalPartitionExecutionParamData, | ExternalPartitionExecutionParamData, | ||||
ExternalPartitionNamesData, | ExternalPartitionNamesData, | ||||
ExternalPartitionSetExecutionParamData, | ExternalPartitionSetExecutionParamData, | ||||
ExternalPartitionTagsData, | ExternalPartitionTagsData, | ||||
ExternalPipelineSubsetResult, | ExternalPipelineSubsetResult, | ||||
ExternalScheduleExecutionData, | ExternalScheduleExecutionData, | ||||
ExternalScheduleExecutionErrorData, | ExternalScheduleExecutionErrorData, | ||||
ExternalSensorExecutionData, | |||||
ExternalSensorExecutionErrorData, | |||||
) | ) | ||||
from dagster.core.instance import DagsterInstance | from dagster.core.instance import DagsterInstance | ||||
from dagster.core.snap.execution_plan_snapshot import ( | from dagster.core.snap.execution_plan_snapshot import ( | ||||
ExecutionPlanSnapshotErrorData, | ExecutionPlanSnapshotErrorData, | ||||
snapshot_from_execution_plan, | snapshot_from_execution_plan, | ||||
) | ) | ||||
from dagster.core.storage.pipeline_run import PipelineRun | from dagster.core.storage.pipeline_run import PipelineRun | ||||
from dagster.grpc.types import ExecutionPlanSnapshotArgs | from dagster.grpc.types import ExecutionPlanSnapshotArgs | ||||
▲ Show 20 Lines • Show All 200 Lines • ▼ Show 20 Lines | with DagsterInstance.from_ref(instance_ref) as instance: | ||||
run_config=run_config, tags=tags, should_execute=True | run_config=run_config, tags=tags, should_execute=True | ||||
) | ) | ||||
except ScheduleExecutionError: | except ScheduleExecutionError: | ||||
return ExternalScheduleExecutionErrorData( | return ExternalScheduleExecutionErrorData( | ||||
serializable_error_info_from_exc_info(sys.exc_info()) | serializable_error_info_from_exc_info(sys.exc_info()) | ||||
) | ) | ||||
def get_external_sensor_execution(recon_repo, instance_ref, sensor_name, last_evaluation_timestamp): | |||||
check.inst_param( | |||||
recon_repo, "recon_repo", ReconstructableRepository, | |||||
) | |||||
definition = recon_repo.get_definition() | |||||
sensor_def = definition.get_sensor_def(sensor_name) | |||||
with DagsterInstance.from_ref(instance_ref) as instance: | |||||
sensor_context = SensorExecutionContext( | |||||
instance, last_evaluation_time=last_evaluation_timestamp | |||||
) | |||||
try: | |||||
with user_code_error_boundary( | |||||
SensorExecutionError, | |||||
lambda: "Error occurred during the execution of should_execute for sensor " | |||||
"{sensor_name}".format(sensor_name=sensor_def.name), | |||||
): | |||||
if not sensor_def.should_execute(sensor_context): | |||||
return ExternalSensorExecutionData( | |||||
should_execute=False, run_config=None, tags=None | |||||
) | |||||
with user_code_error_boundary( | |||||
SensorExecutionError, | |||||
lambda: "Error occurred during the execution of run_config_fn for sensor " | |||||
"{sensor_name}".format(sensor_name=sensor_def.name), | |||||
): | |||||
run_config = sensor_def.get_run_config(sensor_context) | |||||
with user_code_error_boundary( | |||||
SensorExecutionError, | |||||
lambda: "Error occurred during the execution of tags_fn for sensor " | |||||
"{sensor_name}".format(sensor_name=sensor_def.name), | |||||
): | |||||
tags = sensor_def.get_tags(sensor_context) | |||||
return ExternalSensorExecutionData( | |||||
should_execute=True, run_config=run_config, tags=tags | |||||
) | |||||
except SensorExecutionError: | |||||
return ExternalSensorExecutionErrorData( | |||||
serializable_error_info_from_exc_info(sys.exc_info()) | |||||
) | |||||
def get_partition_config(recon_repo, partition_set_name, partition_name): | def get_partition_config(recon_repo, partition_set_name, partition_name): | ||||
definition = recon_repo.get_definition() | definition = recon_repo.get_definition() | ||||
partition_set_def = definition.get_partition_set_def(partition_set_name) | partition_set_def = definition.get_partition_set_def(partition_set_name) | ||||
partition = partition_set_def.get_partition(partition_name) | partition = partition_set_def.get_partition(partition_name) | ||||
try: | try: | ||||
with user_code_error_boundary( | with user_code_error_boundary( | ||||
PartitionExecutionError, | PartitionExecutionError, | ||||
lambda: "Error occurred during the evaluation of the `run_config_for_partition` " | lambda: "Error occurred during the evaluation of the `run_config_for_partition` " | ||||
▲ Show 20 Lines • Show All 117 Lines • Show Last 20 Lines |