Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/scheduler/scheduler.py
import abc | import abc | ||||
import os | import os | ||||
from collections import namedtuple | from collections import namedtuple | ||||
from datetime import datetime | from datetime import datetime | ||||
from enum import Enum | from enum import Enum | ||||
import six | import six | ||||
from dagster import check | from dagster import Field, String, check | ||||
from dagster.core.errors import DagsterError | from dagster.core.errors import DagsterError | ||||
from dagster.core.host_representation import ExternalSchedule | from dagster.core.host_representation import ExternalSchedule | ||||
from dagster.core.instance import DagsterInstance | from dagster.core.instance import DagsterInstance | ||||
from dagster.core.origin import ScheduleOrigin | from dagster.core.origin import ScheduleOrigin | ||||
from dagster.serdes import ConfigurableClass, whitelist_for_serdes | from dagster.serdes import ConfigurableClass, whitelist_for_serdes | ||||
from dagster.seven import get_current_datetime_in_utc, get_timestamp_from_utc_datetime | from dagster.seven import get_current_datetime_in_utc, get_timestamp_from_utc_datetime | ||||
from dagster.utils import mkdir_p | from dagster.utils import mkdir_p | ||||
from dagster.utils.error import SerializableErrorInfo | from dagster.utils.error import SerializableErrorInfo | ||||
▲ Show 20 Lines • Show All 412 Lines • ▼ Show 20 Lines | def get_logs_path(self, instance, schedule_origin_id): | ||||
""" | """ | ||||
class DagsterCommandLineScheduler(Scheduler, ConfigurableClass): | class DagsterCommandLineScheduler(Scheduler, ConfigurableClass): | ||||
"""Scheduler implementation that launches runs from the `dagster scheduler run` | """Scheduler implementation that launches runs from the `dagster scheduler run` | ||||
long-lived process. | long-lived process. | ||||
""" | """ | ||||
def __init__( | def __init__(self, inst_data=None, default_timezone_str="UTC"): | ||||
self, inst_data=None, | |||||
): | |||||
self._inst_data = inst_data | self._inst_data = inst_data | ||||
self.default_timezone_str = default_timezone_str | |||||
@property | @property | ||||
def inst_data(self): | def inst_data(self): | ||||
return self._inst_data | return self._inst_data | ||||
@classmethod | @classmethod | ||||
def config_type(cls): | def config_type(cls): | ||||
return {} | return {"default_timezone": Field(String, is_required=False, default_value="UTC")} | ||||
@staticmethod | @staticmethod | ||||
def from_config_value(inst_data, config_value): | def from_config_value(inst_data, config_value): | ||||
return DagsterCommandLineScheduler(inst_data=inst_data) | default_timezone = config_value.get("default_timezone") | ||||
return DagsterCommandLineScheduler( | |||||
inst_data=inst_data, | |||||
default_timezone_str=default_timezone if default_timezone else "UTC", | |||||
) | |||||
def debug_info(self): | def debug_info(self): | ||||
return "" | return "" | ||||
def start_schedule(self, instance, external_schedule): | def start_schedule(self, instance, external_schedule): | ||||
# Automatically picked up by the `dagster scheduler run` command | # Automatically picked up by the `dagster scheduler run` command | ||||
pass | pass | ||||
▲ Show 20 Lines • Show All 200 Lines • Show Last 20 Lines |