Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/scheduler/scheduler.py
import abc | import abc | ||||
import os | import os | ||||
from collections import namedtuple | from collections import namedtuple | ||||
from datetime import datetime | |||||
from enum import Enum | from enum import Enum | ||||
import pendulum | |||||
import six | import six | ||||
from dagster import check | from dagster import Field, String, check | ||||
from dagster.core.errors import DagsterError | from dagster.core.errors import DagsterError | ||||
from dagster.core.host_representation import ExternalSchedule | from dagster.core.host_representation import ExternalSchedule | ||||
from dagster.core.instance import DagsterInstance | from dagster.core.instance import DagsterInstance | ||||
from dagster.core.origin import ScheduleOrigin | from dagster.core.origin import ScheduleOrigin | ||||
from dagster.serdes import ConfigurableClass, whitelist_for_serdes | from dagster.serdes import ConfigurableClass, whitelist_for_serdes | ||||
from dagster.seven import get_current_datetime_in_utc, get_timestamp_from_utc_datetime | |||||
from dagster.utils import mkdir_p | from dagster.utils import mkdir_p | ||||
from dagster.utils.error import SerializableErrorInfo | from dagster.utils.error import SerializableErrorInfo | ||||
class DagsterSchedulerError(DagsterError): | class DagsterSchedulerError(DagsterError): | ||||
"""Base class for all Dagster Scheduler errors""" | """Base class for all Dagster Scheduler errors""" | ||||
▲ Show 20 Lines • Show All 126 Lines • ▼ Show 20 Lines | def schedule_origin_id(self): | ||||
return self.origin.get_id() | return self.origin.get_id() | ||||
@property | @property | ||||
def repository_origin_id(self): | def repository_origin_id(self): | ||||
return self.origin.repository_origin.get_id() | return self.origin.repository_origin.get_id() | ||||
def with_status(self, status, start_time_utc=None): | def with_status(self, status, start_time_utc=None): | ||||
check.inst_param(status, "status", ScheduleStatus) | check.inst_param(status, "status", ScheduleStatus) | ||||
check.opt_inst_param(start_time_utc, "start_time_utc", datetime) | check.opt_inst_param(start_time_utc, "start_time_utc", pendulum.DateTime) | ||||
check.invariant( | check.invariant( | ||||
(status == ScheduleStatus.RUNNING) == (start_time_utc != None), | (status == ScheduleStatus.RUNNING) == (start_time_utc != None), | ||||
"start_time_utc must be set if and only if the schedule is being started", | "start_time_utc must be set if and only if the schedule is being started", | ||||
) | ) | ||||
return ScheduleState( | return ScheduleState( | ||||
self.origin, | self.origin, | ||||
status=status, | status=status, | ||||
cron_schedule=self.cron_schedule, | cron_schedule=self.cron_schedule, | ||||
start_timestamp=get_timestamp_from_utc_datetime(start_time_utc) | start_timestamp=start_time_utc.float_timestamp if start_time_utc else None, | ||||
if start_time_utc | |||||
else None, | |||||
) | ) | ||||
class Scheduler(six.with_metaclass(abc.ABCMeta)): | class Scheduler(six.with_metaclass(abc.ABCMeta)): | ||||
"""Abstract base class for a scheduler. This component is responsible for interfacing with | """Abstract base class for a scheduler. This component is responsible for interfacing with | ||||
an external system such as cron to ensure scheduled repeated execution according. | an external system such as cron to ensure scheduled repeated execution according. | ||||
""" | """ | ||||
Show All 29 Lines | def reconcile_scheduler_state(self, instance, external_repository): | ||||
for external_schedule in external_repository.get_external_schedules(): | for external_schedule in external_repository.get_external_schedules(): | ||||
# If a schedule already exists for schedule_def, overwrite bash script and | # If a schedule already exists for schedule_def, overwrite bash script and | ||||
# metadata file | # metadata file | ||||
existing_schedule_state = instance.get_schedule_state(external_schedule.get_origin_id()) | existing_schedule_state = instance.get_schedule_state(external_schedule.get_origin_id()) | ||||
if existing_schedule_state: | if existing_schedule_state: | ||||
new_timestamp = existing_schedule_state.start_timestamp | new_timestamp = existing_schedule_state.start_timestamp | ||||
if not new_timestamp and existing_schedule_state.status == ScheduleStatus.RUNNING: | if not new_timestamp and existing_schedule_state.status == ScheduleStatus.RUNNING: | ||||
new_timestamp = get_timestamp_from_utc_datetime(get_current_datetime_in_utc()) | new_timestamp = pendulum.now("UTC").float_timestamp | ||||
# Keep the status, update target and cron schedule | # Keep the status, update target and cron schedule | ||||
schedule_state = ScheduleState( | schedule_state = ScheduleState( | ||||
external_schedule.get_origin(), | external_schedule.get_origin(), | ||||
existing_schedule_state.status, | existing_schedule_state.status, | ||||
external_schedule.cron_schedule, | external_schedule.cron_schedule, | ||||
new_timestamp, | new_timestamp, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 72 Lines • ▼ Show 20 Lines | def start_schedule_and_update_storage_state(self, instance, external_schedule): | ||||
raise DagsterSchedulerError( | raise DagsterSchedulerError( | ||||
"You have attempted to start schedule {name}, but it is already running".format( | "You have attempted to start schedule {name}, but it is already running".format( | ||||
name=external_schedule.name | name=external_schedule.name | ||||
) | ) | ||||
) | ) | ||||
self.start_schedule(instance, external_schedule) | self.start_schedule(instance, external_schedule) | ||||
started_schedule = schedule_state.with_status( | started_schedule = schedule_state.with_status( | ||||
ScheduleStatus.RUNNING, start_time_utc=get_current_datetime_in_utc() | ScheduleStatus.RUNNING, start_time_utc=pendulum.now("UTC"), | ||||
) | ) | ||||
instance.update_schedule_state(started_schedule) | instance.update_schedule_state(started_schedule) | ||||
return started_schedule | return started_schedule | ||||
def stop_schedule_and_update_storage_state(self, instance, schedule_origin_id): | def stop_schedule_and_update_storage_state(self, instance, schedule_origin_id): | ||||
""" | """ | ||||
Updates the status of the given schedule to `ScheduleStatus.STOPPED` in schedule storage, | Updates the status of the given schedule to `ScheduleStatus.STOPPED` in schedule storage, | ||||
then calls `stop_schedule`. | then calls `stop_schedule`. | ||||
▲ Show 20 Lines • Show All 115 Lines • ▼ Show 20 Lines | def get_logs_path(self, instance, schedule_origin_id): | ||||
""" | """ | ||||
class DagsterCommandLineScheduler(Scheduler, ConfigurableClass): | class DagsterCommandLineScheduler(Scheduler, ConfigurableClass): | ||||
"""Scheduler implementation that launches runs from the `dagster scheduler run` | """Scheduler implementation that launches runs from the `dagster scheduler run` | ||||
long-lived process. | long-lived process. | ||||
""" | """ | ||||
def __init__( | def __init__(self, inst_data=None, default_timezone_str="UTC"): | ||||
self, inst_data=None, | |||||
): | |||||
self._inst_data = inst_data | self._inst_data = inst_data | ||||
self.default_timezone_str = default_timezone_str | |||||
@property | @property | ||||
def inst_data(self): | def inst_data(self): | ||||
return self._inst_data | return self._inst_data | ||||
@classmethod | @classmethod | ||||
def config_type(cls): | def config_type(cls): | ||||
return {} | return {"default_timezone": Field(String, is_required=False, default_value="UTC")} | ||||
@staticmethod | @staticmethod | ||||
def from_config_value(inst_data, config_value): | def from_config_value(inst_data, config_value): | ||||
return DagsterCommandLineScheduler(inst_data=inst_data) | default_timezone = config_value.get("default_timezone") | ||||
return DagsterCommandLineScheduler( | |||||
inst_data=inst_data, | |||||
default_timezone_str=default_timezone if default_timezone else "UTC", | |||||
) | |||||
def debug_info(self): | def debug_info(self): | ||||
return "" | return "" | ||||
def start_schedule(self, instance, external_schedule): | def start_schedule(self, instance, external_schedule): | ||||
# Automatically picked up by the `dagster scheduler run` command | # Automatically picked up by the `dagster scheduler run` command | ||||
pass | pass | ||||
▲ Show 20 Lines • Show All 200 Lines • Show Last 20 Lines |