Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/scheduler/sensor.py
import logging | |||||
import os | import os | ||||
import sys | import sys | ||||
import time | import time | ||||
import pendulum | import pendulum | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions.job import JobType | from dagster.core.definitions.job import JobType | ||||
from dagster.core.errors import DagsterSubprocessError | from dagster.core.errors import DagsterSubprocessError | ||||
from dagster.core.events import EngineEventData | from dagster.core.events import EngineEventData | ||||
from dagster.core.host_representation import ( | from dagster.core.host_representation import ( | ||||
ExternalPipeline, | ExternalPipeline, | ||||
PipelineSelector, | PipelineSelector, | ||||
RepositoryLocation, | RepositoryLocation, | ||||
RepositoryLocationHandle, | RepositoryLocationHandle, | ||||
) | ) | ||||
from dagster.core.host_representation.external_data import ( | from dagster.core.host_representation.external_data import ( | ||||
ExternalSensorExecutionData, | ExternalSensorExecutionData, | ||||
ExternalSensorExecutionErrorData, | ExternalSensorExecutionErrorData, | ||||
) | ) | ||||
from dagster.core.instance import DagsterInstance | from dagster.core.instance import DagsterInstance, is_memoized_run | ||||
from dagster.core.scheduler.job import JobStatus, JobTickData, JobTickStatus, SensorJobData | from dagster.core.scheduler.job import JobStatus, JobTickData, JobTickStatus, SensorJobData | ||||
from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter | from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter | ||||
from dagster.core.storage.tags import RUN_KEY_TAG, check_tags | from dagster.core.storage.tags import MEMOIZED_RUN_TAG, RUN_KEY_TAG, check_tags | ||||
from dagster.utils import merge_dicts | from dagster.utils import merge_dicts | ||||
from dagster.utils.error import serializable_error_info_from_exc_info | from dagster.utils.error import serializable_error_info_from_exc_info | ||||
RECORDED_TICK_STATES = [JobTickStatus.SUCCESS, JobTickStatus.FAILURE] | RECORDED_TICK_STATES = [JobTickStatus.SUCCESS, JobTickStatus.FAILURE] | ||||
FULFILLED_TICK_STATES = [JobTickStatus.SKIPPED, JobTickStatus.SUCCESS] | FULFILLED_TICK_STATES = [JobTickStatus.SKIPPED, JobTickStatus.SUCCESS] | ||||
class SensorLaunchContext: | class SensorLaunchContext: | ||||
▲ Show 20 Lines • Show All 290 Lines • ▼ Show 20 Lines | ): | ||||
except Exception as e: # pylint: disable=broad-except | except Exception as e: # pylint: disable=broad-except | ||||
execution_plan_errors.append(serializable_error_info_from_exc_info(sys.exc_info())) | execution_plan_errors.append(serializable_error_info_from_exc_info(sys.exc_info())) | ||||
pipeline_tags = external_pipeline.tags or {} | pipeline_tags = external_pipeline.tags or {} | ||||
check_tags(pipeline_tags, "pipeline_tags") | check_tags(pipeline_tags, "pipeline_tags") | ||||
tags = merge_dicts( | tags = merge_dicts( | ||||
merge_dicts(pipeline_tags, run_request.tags), PipelineRun.tags_for_sensor(external_sensor) | merge_dicts(pipeline_tags, run_request.tags), PipelineRun.tags_for_sensor(external_sensor) | ||||
) | ) | ||||
if is_memoized_run(tags): | |||||
logging.warning( | |||||
'Tag "{tag}" was found when initializing pipeline run, however, memoized ' | |||||
"execution is only supported from the command line. This pipeline will run, but " | |||||
"outputs from previous executions will be ignored.".format(tag=MEMOIZED_RUN_TAG) | |||||
) | |||||
alangenfeld: does this show up anywhere useful? | |||||
if run_request.run_key: | if run_request.run_key: | ||||
tags[RUN_KEY_TAG] = run_request.run_key | tags[RUN_KEY_TAG] = run_request.run_key | ||||
run = instance.create_run( | run = instance.create_run( | ||||
pipeline_name=external_sensor.pipeline_name, | pipeline_name=external_sensor.pipeline_name, | ||||
run_id=None, | run_id=None, | ||||
run_config=run_request.run_config, | run_config=run_request.run_config, | ||||
mode=external_sensor.mode, | mode=external_sensor.mode, | ||||
Show All 31 Lines |
does this show up anywhere useful?