Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/scheduler/scheduler.py
Show First 20 Lines • Show All 171 Lines • ▼ Show 20 Lines | def launch_scheduled_runs_for_schedule( | ||||
max_catchup_runs, | max_catchup_runs, | ||||
debug_crash_flags=None, | debug_crash_flags=None, | ||||
): | ): | ||||
check.inst_param(instance, "instance", DagsterInstance) | check.inst_param(instance, "instance", DagsterInstance) | ||||
check.inst_param(schedule_state, "schedule_state", ScheduleState) | check.inst_param(schedule_state, "schedule_state", ScheduleState) | ||||
check.inst_param(end_datetime_utc, "end_datetime_utc", datetime.datetime) | check.inst_param(end_datetime_utc, "end_datetime_utc", datetime.datetime) | ||||
check.inst_param(repo_location, "repo_location", RepositoryLocation) | check.inst_param(repo_location, "repo_location", RepositoryLocation) | ||||
scheduler = instance.scheduler | |||||
latest_tick = instance.get_latest_tick(schedule_state.schedule_origin_id) | latest_tick = instance.get_latest_tick(schedule_state.schedule_origin_id) | ||||
if not latest_tick: | if not latest_tick: | ||||
start_timestamp_utc = schedule_state.start_timestamp | start_timestamp_utc = schedule_state.start_timestamp | ||||
elif latest_tick.status == ScheduleTickStatus.STARTED: | elif latest_tick.status == ScheduleTickStatus.STARTED: | ||||
# Scheduler was interrupted while performing this tick, re-do it | # Scheduler was interrupted while performing this tick, re-do it | ||||
start_timestamp_utc = latest_tick.timestamp | start_timestamp_utc = latest_tick.timestamp | ||||
else: | else: | ||||
start_timestamp_utc = latest_tick.timestamp + 1 | start_timestamp_utc = latest_tick.timestamp + 1 | ||||
schedule_name = schedule_state.name | schedule_name = schedule_state.name | ||||
repo_dict = repo_location.get_repositories() | repo_dict = repo_location.get_repositories() | ||||
check.invariant( | check.invariant( | ||||
len(repo_dict) == 1, "Reconstructed repository location should have exactly one repository", | len(repo_dict) == 1, "Reconstructed repository location should have exactly one repository", | ||||
) | ) | ||||
external_repo = next(iter(repo_dict.values())) | external_repo = next(iter(repo_dict.values())) | ||||
external_schedule = external_repo.get_external_schedule(schedule_name) | external_schedule = external_repo.get_external_schedule(schedule_name) | ||||
timezone_str = ( | timezone_str = ( | ||||
external_schedule.execution_timezone | external_schedule.execution_timezone | ||||
if external_schedule.execution_timezone | if external_schedule.execution_timezone | ||||
else scheduler.default_timezone_str | else instance.default_schedule_timezone | ||||
) | ) | ||||
end_datetime = end_datetime_utc.in_tz(timezone_str) | end_datetime = end_datetime_utc.in_tz(timezone_str) | ||||
start_datetime = pendulum.from_timestamp(start_timestamp_utc, tz=timezone_str) | start_datetime = pendulum.from_timestamp(start_timestamp_utc, tz=timezone_str) | ||||
date_iter = croniter(external_schedule.cron_schedule, start_datetime) | date_iter = croniter(external_schedule.cron_schedule, start_datetime) | ||||
tick_times = [] | tick_times = [] | ||||
▲ Show 20 Lines • Show All 305 Lines • Show Last 20 Lines |