Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/scheduler/sensor.py
import logging | |||||
Lint: Unused Import: Unused import logging | |||||
import os | import os | ||||
import sys | import sys | ||||
import time | import time | ||||
import pendulum | import pendulum | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions.job import JobType | from dagster.core.definitions.job import JobType | ||||
from dagster.core.errors import DagsterSubprocessError | from dagster.core.errors import DagsterSubprocessError | ||||
from dagster.core.events import EngineEventData | from dagster.core.events import EngineEventData | ||||
from dagster.core.host_representation import ( | from dagster.core.host_representation import ( | ||||
ExternalPipeline, | ExternalPipeline, | ||||
PipelineSelector, | PipelineSelector, | ||||
RepositoryLocation, | RepositoryLocation, | ||||
RepositoryLocationHandle, | RepositoryLocationHandle, | ||||
) | ) | ||||
from dagster.core.host_representation.external_data import ( | from dagster.core.host_representation.external_data import ( | ||||
ExternalSensorExecutionData, | ExternalSensorExecutionData, | ||||
ExternalSensorExecutionErrorData, | ExternalSensorExecutionErrorData, | ||||
) | ) | ||||
from dagster.core.instance import DagsterInstance | from dagster.core.instance import DagsterInstance | ||||
from dagster.core.scheduler.job import JobStatus, JobTickData, JobTickStatus, SensorJobData | from dagster.core.scheduler.job import JobStatus, JobTickData, JobTickStatus, SensorJobData | ||||
from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter | from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter | ||||
from dagster.core.storage.tags import RUN_KEY_TAG, check_tags | from dagster.core.storage.tags import MEMOIZED_RUN_TAG, RUN_KEY_TAG, check_tags | ||||
Lint: Unused Import Unused MEMOIZED_RUN_TAG imported from dagster.core.storage.tags Lint: Unused Import: Unused MEMOIZED_RUN_TAG imported from dagster.core.storage.tags | |||||
from dagster.utils import merge_dicts | from dagster.utils import merge_dicts | ||||
from dagster.utils.error import serializable_error_info_from_exc_info | from dagster.utils.error import serializable_error_info_from_exc_info | ||||
RECORDED_TICK_STATES = [JobTickStatus.SUCCESS, JobTickStatus.FAILURE] | RECORDED_TICK_STATES = [JobTickStatus.SUCCESS, JobTickStatus.FAILURE] | ||||
FULFILLED_TICK_STATES = [JobTickStatus.SKIPPED, JobTickStatus.SUCCESS] | FULFILLED_TICK_STATES = [JobTickStatus.SKIPPED, JobTickStatus.SUCCESS] | ||||
class SensorLaunchContext: | class SensorLaunchContext: | ||||
▲ Show 20 Lines • Show All 287 Lines • ▼ Show 20 Lines | try: | ||||
execution_plan_snapshot = external_execution_plan.execution_plan_snapshot | execution_plan_snapshot = external_execution_plan.execution_plan_snapshot | ||||
except DagsterSubprocessError as e: | except DagsterSubprocessError as e: | ||||
execution_plan_errors.extend(e.subprocess_error_infos) | execution_plan_errors.extend(e.subprocess_error_infos) | ||||
except Exception as e: # pylint: disable=broad-except | except Exception as e: # pylint: disable=broad-except | ||||
execution_plan_errors.append(serializable_error_info_from_exc_info(sys.exc_info())) | execution_plan_errors.append(serializable_error_info_from_exc_info(sys.exc_info())) | ||||
pipeline_tags = external_pipeline.tags or {} | pipeline_tags = external_pipeline.tags or {} | ||||
check_tags(pipeline_tags, "pipeline_tags") | check_tags(pipeline_tags, "pipeline_tags") | ||||
tags = merge_dicts( | |||||
merge_dicts(pipeline_tags, run_request.tags), PipelineRun.tags_for_sensor(external_sensor) | |||||
) | |||||
if run_request.run_key: | if run_request.run_key: | ||||
tags[RUN_KEY_TAG] = run_request.run_key | tags[RUN_KEY_TAG] = run_request.run_key | ||||
Lint: Undefined Variable Undefined variable 'tags' Lint: Undefined Variable: Undefined variable 'tags' | |||||
run = instance.create_run( | run = instance.create_run( | ||||
pipeline_name=external_sensor.pipeline_name, | pipeline_name=external_sensor.pipeline_name, | ||||
run_id=None, | run_id=None, | ||||
Not Done Inline Actionsdoes this show up anywhere useful? alangenfeld: does this show up anywhere useful? | |||||
run_config=run_request.run_config, | run_config=run_request.run_config, | ||||
mode=external_sensor.mode, | mode=external_sensor.mode, | ||||
solids_to_execute=external_pipeline.solids_to_execute, | solids_to_execute=external_pipeline.solids_to_execute, | ||||
step_keys_to_execute=None, | step_keys_to_execute=None, | ||||
solid_selection=external_sensor.solid_selection, | solid_selection=external_sensor.solid_selection, | ||||
status=( | status=( | ||||
PipelineRunStatus.FAILURE | PipelineRunStatus.FAILURE | ||||
if len(execution_plan_errors) > 0 | if len(execution_plan_errors) > 0 | ||||
else PipelineRunStatus.NOT_STARTED | else PipelineRunStatus.NOT_STARTED | ||||
), | ), | ||||
root_run_id=None, | root_run_id=None, | ||||
parent_run_id=None, | parent_run_id=None, | ||||
tags=tags, | tags=merge_dicts( | ||||
merge_dicts(pipeline_tags, run_request.tags), | |||||
PipelineRun.tags_for_sensor(external_sensor), | |||||
), | |||||
pipeline_snapshot=external_pipeline.pipeline_snapshot, | pipeline_snapshot=external_pipeline.pipeline_snapshot, | ||||
execution_plan_snapshot=execution_plan_snapshot, | execution_plan_snapshot=execution_plan_snapshot, | ||||
parent_pipeline_snapshot=external_pipeline.parent_pipeline_snapshot, | parent_pipeline_snapshot=external_pipeline.parent_pipeline_snapshot, | ||||
external_pipeline_origin=external_pipeline.get_external_origin(), | external_pipeline_origin=external_pipeline.get_external_origin(), | ||||
) | ) | ||||
if len(execution_plan_errors) > 0: | if len(execution_plan_errors) > 0: | ||||
for error in execution_plan_errors: | for error in execution_plan_errors: | ||||
Show All 12 Lines |
Unused import logging