Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/scheduler/scheduler.py
import datetime | import datetime | ||||
import logging | import logging | ||||
import os | import os | ||||
import sys | import sys | ||||
import time | import time | ||||
import click | import click | ||||
import pendulum | |||||
from croniter import croniter_range | from croniter import croniter_range | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.errors import DagsterSubprocessError | from dagster.core.errors import DagsterSubprocessError | ||||
from dagster.core.events import EngineEventData | from dagster.core.events import EngineEventData | ||||
from dagster.core.host_representation import ( | from dagster.core.host_representation import ( | ||||
ExternalPipeline, | ExternalPipeline, | ||||
ExternalScheduleExecutionErrorData, | ExternalScheduleExecutionErrorData, | ||||
PipelineSelector, | PipelineSelector, | ||||
RepositoryLocation, | RepositoryLocation, | ||||
RepositoryLocationHandle, | RepositoryLocationHandle, | ||||
) | ) | ||||
from dagster.core.instance import DagsterInstance | from dagster.core.instance import DagsterInstance | ||||
from dagster.core.scheduler import ( | from dagster.core.scheduler import ( | ||||
DagsterCommandLineScheduler, | |||||
ScheduleState, | ScheduleState, | ||||
ScheduleStatus, | ScheduleStatus, | ||||
ScheduleTickData, | ScheduleTickData, | ||||
ScheduleTickStatus, | ScheduleTickStatus, | ||||
) | ) | ||||
from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter | from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter | ||||
from dagster.core.storage.tags import SCHEDULED_EXECUTION_TIME_TAG, check_tags | from dagster.core.storage.tags import SCHEDULED_EXECUTION_TIME_TAG, check_tags | ||||
from dagster.grpc.types import ScheduleExecutionDataMode | from dagster.grpc.types import ScheduleExecutionDataMode | ||||
from dagster.seven import ( | |||||
get_current_datetime_in_utc, | |||||
get_timestamp_from_utc_datetime, | |||||
get_utc_timezone, | |||||
) | |||||
from dagster.utils import merge_dicts | from dagster.utils import merge_dicts | ||||
from dagster.utils.error import serializable_error_info_from_exc_info | from dagster.utils.error import serializable_error_info_from_exc_info | ||||
from dagster.utils.log import default_format_string | from dagster.utils.log import default_format_string | ||||
class ScheduleTickHolder: | class ScheduleTickHolder: | ||||
def __init__(self, tick, instance, logger): | def __init__(self, tick, instance, logger): | ||||
self._tick = tick | self._tick = tick | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | @click.option( | ||||
"--max-catchup-runs", | "--max-catchup-runs", | ||||
help="Max number of past runs since the schedule was started that we should execute", | help="Max number of past runs since the schedule was started that we should execute", | ||||
default=_DEFAULT_MAX_CATCHUP_RUNS, | default=_DEFAULT_MAX_CATCHUP_RUNS, | ||||
) | ) | ||||
def scheduler_run_command(interval, max_catchup_runs): | def scheduler_run_command(interval, max_catchup_runs): | ||||
execute_scheduler_command(interval, max_catchup_runs) | execute_scheduler_command(interval, max_catchup_runs) | ||||
def _mockable_localtime(_): | |||||
now_time = pendulum.now() | |||||
return now_time.timetuple() | |||||
def get_default_scheduler_logger(): | def get_default_scheduler_logger(): | ||||
handler = logging.StreamHandler(sys.stdout) | handler = logging.StreamHandler(sys.stdout) | ||||
logger = logging.getLogger("dagster-scheduler") | logger = logging.getLogger("dagster-scheduler") | ||||
logger.setLevel(logging.INFO) | logger.setLevel(logging.INFO) | ||||
logger.handlers = [handler] | logger.handlers = [handler] | ||||
formatter = logging.Formatter(default_format_string(), "%Y-%m-%d %H:%M:%S") | formatter = logging.Formatter(default_format_string(), "%Y-%m-%d %H:%M:%S") | ||||
formatter.converter = time.localtime | formatter.converter = _mockable_localtime | ||||
handler.setFormatter(formatter) | handler.setFormatter(formatter) | ||||
return logger | return logger | ||||
def execute_scheduler_command(interval, max_catchup_runs): | def execute_scheduler_command(interval, max_catchup_runs): | ||||
logger = get_default_scheduler_logger() | logger = get_default_scheduler_logger() | ||||
while True: | while True: | ||||
with DagsterInstance.get() as instance: | with DagsterInstance.get() as instance: | ||||
end_datetime_utc = get_current_datetime_in_utc() | end_datetime_utc = pendulum.now("UTC") | ||||
launch_scheduled_runs(instance, logger, end_datetime_utc, max_catchup_runs) | launch_scheduled_runs(instance, logger, end_datetime_utc, max_catchup_runs) | ||||
time_left = interval - (get_current_datetime_in_utc() - end_datetime_utc).seconds | time_left = interval - (pendulum.now("UTC") - end_datetime_utc).seconds | ||||
if time_left > 0: | if time_left > 0: | ||||
time.sleep(time_left) | time.sleep(time_left) | ||||
def launch_scheduled_runs( | def launch_scheduled_runs( | ||||
instance, | instance, | ||||
logger, | logger, | ||||
Show All 24 Lines | |||||
def launch_scheduled_runs_for_schedule( | def launch_scheduled_runs_for_schedule( | ||||
instance, logger, schedule_state, end_datetime_utc, max_catchup_runs, debug_crash_flags=None | instance, logger, schedule_state, end_datetime_utc, max_catchup_runs, debug_crash_flags=None | ||||
): | ): | ||||
check.inst_param(instance, "instance", DagsterInstance) | check.inst_param(instance, "instance", DagsterInstance) | ||||
check.inst_param(schedule_state, "schedule_state", ScheduleState) | check.inst_param(schedule_state, "schedule_state", ScheduleState) | ||||
check.inst_param(end_datetime_utc, "end_datetime_utc", datetime.datetime) | check.inst_param(end_datetime_utc, "end_datetime_utc", datetime.datetime) | ||||
scheduler = instance.scheduler | |||||
check.invariant(isinstance(scheduler, DagsterCommandLineScheduler)) | |||||
sashank: Nit: We might want to have an error message here, so people know why this line is failing. | |||||
latest_tick = instance.get_latest_tick(schedule_state.schedule_origin_id) | latest_tick = instance.get_latest_tick(schedule_state.schedule_origin_id) | ||||
if not latest_tick: | if not latest_tick: | ||||
start_timestamp_utc = schedule_state.start_timestamp | start_timestamp_utc = schedule_state.start_timestamp | ||||
elif latest_tick.status == ScheduleTickStatus.STARTED: | elif latest_tick.status == ScheduleTickStatus.STARTED: | ||||
# Scheduler was interrupted while performing this tick, re-do it | # Scheduler was interrupted while performing this tick, re-do it | ||||
start_timestamp_utc = latest_tick.timestamp | start_timestamp_utc = latest_tick.timestamp | ||||
else: | else: | ||||
start_timestamp_utc = latest_tick.timestamp + 1 | start_timestamp_utc = latest_tick.timestamp + 1 | ||||
start_datetime_utc = datetime.datetime.fromtimestamp(start_timestamp_utc, tz=get_utc_timezone()) | timezone_str = scheduler.default_timezone_str | ||||
tick_times = list( | end_datetime = end_datetime_utc.in_tz(timezone_str) | ||||
croniter_range(start_datetime_utc, end_datetime_utc, schedule_state.cron_schedule) | start_datetime = pendulum.from_timestamp(start_timestamp_utc, tz=timezone_str) | ||||
check.invariant(start_datetime.timezone.name == end_datetime.timezone.name) | |||||
tick_times = ( | |||||
list(croniter_range(start_datetime, end_datetime, schedule_state.cron_schedule)) | |||||
if end_datetime >= start_datetime | |||||
else [] | |||||
) | ) | ||||
if not tick_times: | if not tick_times: | ||||
logger.info("No new runs for {schedule_name}".format(schedule_name=schedule_state.name)) | logger.info("No new runs for {schedule_name}".format(schedule_name=schedule_state.name)) | ||||
return | return | ||||
if len(tick_times) > max_catchup_runs: | if len(tick_times) > max_catchup_runs: | ||||
logger.warn( | logger.warn( | ||||
"{schedule_name} has fallen behind, only launching {max_catchup_runs} runs".format( | "{schedule_name} has fallen behind, only launching {max_catchup_runs} runs".format( | ||||
schedule_name=schedule_state.name, max_catchup_runs=max_catchup_runs | schedule_name=schedule_state.name, max_catchup_runs=max_catchup_runs | ||||
) | ) | ||||
) | ) | ||||
tick_times = tick_times[-max_catchup_runs:] | tick_times = tick_times[-max_catchup_runs:] | ||||
if len(tick_times) == 1: | if len(tick_times) == 1: | ||||
logger.info( | logger.info( | ||||
"Launching run for {schedule_name} at {time}".format( | "Launching run for {schedule_name} at {time}".format( | ||||
schedule_name=schedule_state.name, | schedule_name=schedule_state.name, | ||||
time=tick_times[0].strftime(_SCHEDULER_DATETIME_FORMAT), | time=tick_times[0].strftime(_SCHEDULER_DATETIME_FORMAT), | ||||
Not Done Inline Actions👍 🎉 sashank: 👍 🎉 | |||||
Not Done Inline ActionsIt might be helpful to add that example you showed me inline for future readers to know exactly what you're talking about sashank: It might be helpful to add that example you showed me inline for future readers to know exactly… | |||||
) | ) | ||||
) | ) | ||||
else: | else: | ||||
logger.info( | logger.info( | ||||
"Launching {num_runs} runs for {schedule_name} at the following times: {times}".format( | "Launching {num_runs} runs for {schedule_name} at the following times: {times}".format( | ||||
num_runs=len(tick_times), | num_runs=len(tick_times), | ||||
schedule_name=schedule_state.name, | schedule_name=schedule_state.name, | ||||
times=", ".join([time.strftime(_SCHEDULER_DATETIME_FORMAT) for time in tick_times]), | times=", ".join([time.strftime(_SCHEDULER_DATETIME_FORMAT) for time in tick_times]), | ||||
) | ) | ||||
) | ) | ||||
for schedule_time_utc in tick_times: | for schedule_time in tick_times: | ||||
schedule_timestamp = get_timestamp_from_utc_datetime(schedule_time_utc) | schedule_time_utc = pendulum.instance(schedule_time).in_tz("UTC") | ||||
schedule_timestamp = schedule_time_utc.float_timestamp | |||||
if latest_tick and latest_tick.timestamp == schedule_timestamp: | if latest_tick and latest_tick.timestamp == schedule_timestamp: | ||||
tick = latest_tick | tick = latest_tick | ||||
logger.info("Resuming previously interrupted schedule execution") | logger.info("Resuming previously interrupted schedule execution") | ||||
else: | else: | ||||
tick = instance.create_schedule_tick( | tick = instance.create_schedule_tick( | ||||
ScheduleTickData( | ScheduleTickData( | ||||
schedule_origin_id=schedule_state.schedule_origin_id, | schedule_origin_id=schedule_state.schedule_origin_id, | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | def _schedule_run_at_time( | ||||
repo_location, | repo_location, | ||||
schedule_state, | schedule_state, | ||||
schedule_time_utc, | schedule_time_utc, | ||||
tick_holder, | tick_holder, | ||||
debug_crash_flags, | debug_crash_flags, | ||||
): | ): | ||||
schedule_name = schedule_state.name | schedule_name = schedule_state.name | ||||
check.invariant(schedule_time_utc.timezone_name == "UTC") | |||||
repo_dict = repo_location.get_repositories() | repo_dict = repo_location.get_repositories() | ||||
check.invariant( | check.invariant( | ||||
len(repo_dict) == 1, "Reconstructed repository location should have exactly one repository", | len(repo_dict) == 1, "Reconstructed repository location should have exactly one repository", | ||||
) | ) | ||||
external_repo = next(iter(repo_dict.values())) | external_repo = next(iter(repo_dict.values())) | ||||
external_schedule = external_repo.get_external_schedule(schedule_name) | external_schedule = external_repo.get_external_schedule(schedule_name) | ||||
▲ Show 20 Lines • Show All 146 Lines • ▼ Show 20 Lines | except DagsterSubprocessError as e: | ||||
execution_plan_errors.extend(e.subprocess_error_infos) | execution_plan_errors.extend(e.subprocess_error_infos) | ||||
except Exception as e: # pylint: disable=broad-except | except Exception as e: # pylint: disable=broad-except | ||||
execution_plan_errors.append(serializable_error_info_from_exc_info(sys.exc_info())) | execution_plan_errors.append(serializable_error_info_from_exc_info(sys.exc_info())) | ||||
pipeline_tags = external_pipeline.tags or {} | pipeline_tags = external_pipeline.tags or {} | ||||
check_tags(pipeline_tags, "pipeline_tags") | check_tags(pipeline_tags, "pipeline_tags") | ||||
tags = merge_dicts(pipeline_tags, schedule_tags) | tags = merge_dicts(pipeline_tags, schedule_tags) | ||||
tags[SCHEDULED_EXECUTION_TIME_TAG] = schedule_time_utc.isoformat() | tags[SCHEDULED_EXECUTION_TIME_TAG] = schedule_time_utc.isoformat() | ||||
Not Done Inline ActionsShould we add another tag that's in the local execution time? To help people debug? sashank: Should we add another tag that's in the local execution time? To help people debug? | |||||
# If the run was scheduled correctly but there was an error creating its | # If the run was scheduled correctly but there was an error creating its | ||||
# run config, enter it into the run DB with a FAILURE status | # run config, enter it into the run DB with a FAILURE status | ||||
possibly_invalid_pipeline_run = instance.create_run( | possibly_invalid_pipeline_run = instance.create_run( | ||||
pipeline_name=external_schedule.pipeline_name, | pipeline_name=external_schedule.pipeline_name, | ||||
run_id=None, | run_id=None, | ||||
run_config=run_config, | run_config=run_config, | ||||
mode=external_schedule.mode, | mode=external_schedule.mode, | ||||
Show All 38 Lines |
Nit: We might want to have an error message here, so people know why this line is failing.
Something along the lines of: