Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/scheduler/scheduler.py
import datetime | import datetime | ||||
import logging | |||||
import os | import os | ||||
import sys | import sys | ||||
import time | import time | ||||
import pendulum | import pendulum | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.errors import DagsterSubprocessError | from dagster.core.errors import DagsterSubprocessError | ||||
from dagster.core.events import EngineEventData | from dagster.core.events import EngineEventData | ||||
from dagster.core.host_representation import ( | from dagster.core.host_representation import ( | ||||
ExternalPipeline, | ExternalPipeline, | ||||
ExternalScheduleExecutionErrorData, | ExternalScheduleExecutionErrorData, | ||||
PipelineSelector, | PipelineSelector, | ||||
RepositoryLocation, | RepositoryLocation, | ||||
RepositoryLocationHandle, | RepositoryLocationHandle, | ||||
) | ) | ||||
from dagster.core.instance import DagsterInstance | from dagster.core.instance import DagsterInstance, is_memoized_run | ||||
from dagster.core.scheduler.job import JobState, JobStatus, JobTickData, JobTickStatus, JobType | from dagster.core.scheduler.job import JobState, JobStatus, JobTickData, JobTickStatus, JobType | ||||
from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter | from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter | ||||
from dagster.core.storage.tags import SCHEDULED_EXECUTION_TIME_TAG, check_tags | from dagster.core.storage.tags import MEMOIZED_RUN_TAG, SCHEDULED_EXECUTION_TIME_TAG, check_tags | ||||
from dagster.utils import merge_dicts | from dagster.utils import merge_dicts | ||||
from dagster.utils.error import serializable_error_info_from_exc_info | from dagster.utils.error import serializable_error_info_from_exc_info | ||||
class ScheduleTickHolder: | class ScheduleTickHolder: | ||||
def __init__(self, tick, instance, logger): | def __init__(self, tick, instance, logger): | ||||
self._tick = tick | self._tick = tick | ||||
self._instance = instance | self._instance = instance | ||||
▲ Show 20 Lines • Show All 368 Lines • ▼ Show 20 Lines | except DagsterSubprocessError as e: | ||||
execution_plan_errors.extend(e.subprocess_error_infos) | execution_plan_errors.extend(e.subprocess_error_infos) | ||||
except Exception as e: # pylint: disable=broad-except | except Exception as e: # pylint: disable=broad-except | ||||
execution_plan_errors.append(serializable_error_info_from_exc_info(sys.exc_info())) | execution_plan_errors.append(serializable_error_info_from_exc_info(sys.exc_info())) | ||||
pipeline_tags = external_pipeline.tags or {} | pipeline_tags = external_pipeline.tags or {} | ||||
check_tags(pipeline_tags, "pipeline_tags") | check_tags(pipeline_tags, "pipeline_tags") | ||||
tags = merge_dicts(pipeline_tags, schedule_tags) | tags = merge_dicts(pipeline_tags, schedule_tags) | ||||
if is_memoized_run(tags): | |||||
logging.warning( | |||||
'Tag "{tag}" was found when initializing pipeline run, however, memoized ' | |||||
"execution is only supported from the command line. This pipeline will run, but " | |||||
"outputs from previous executions will be ignored.".format(tag=MEMOIZED_RUN_TAG) | |||||
) | |||||
tags[SCHEDULED_EXECUTION_TIME_TAG] = schedule_time.in_tz("UTC").isoformat() | tags[SCHEDULED_EXECUTION_TIME_TAG] = schedule_time.in_tz("UTC").isoformat() | ||||
# If the run was scheduled correctly but there was an error creating its | # If the run was scheduled correctly but there was an error creating its | ||||
# run config, enter it into the run DB with a FAILURE status | # run config, enter it into the run DB with a FAILURE status | ||||
possibly_invalid_pipeline_run = instance.create_run( | possibly_invalid_pipeline_run = instance.create_run( | ||||
pipeline_name=external_schedule.pipeline_name, | pipeline_name=external_schedule.pipeline_name, | ||||
run_id=None, | run_id=None, | ||||
run_config=run_config, | run_config=run_config, | ||||
Show All 31 Lines |