Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/utils/test/schedule_storage.py
import sys | import sys | ||||
import time | import time | ||||
import pendulum | |||||
import pytest | import pytest | ||||
from dagster import DagsterInvariantViolationError | from dagster import DagsterInvariantViolationError | ||||
from dagster.core.code_pointer import ModuleCodePointer | from dagster.core.code_pointer import ModuleCodePointer | ||||
from dagster.core.origin import RepositoryPythonOrigin, SchedulePythonOrigin | from dagster.core.origin import RepositoryPythonOrigin, SchedulePythonOrigin | ||||
from dagster.core.scheduler import ScheduleState, ScheduleStatus | from dagster.core.scheduler import ScheduleState, ScheduleStatus | ||||
from dagster.core.scheduler.scheduler import ScheduleTickData, ScheduleTickStatus | from dagster.core.scheduler.scheduler import ScheduleTickData, ScheduleTickStatus | ||||
from dagster.seven import get_current_datetime_in_utc, get_timestamp_from_utc_datetime | |||||
from dagster.utils.error import SerializableErrorInfo | from dagster.utils.error import SerializableErrorInfo | ||||
class TestScheduleStorage: | class TestScheduleStorage: | ||||
""" | """ | ||||
You can extend this class to easily run these set of tests on any schedule storage. When extending, | You can extend this class to easily run these set of tests on any schedule storage. When extending, | ||||
you simply need to override the `schedule_storage` fixture and return your implementation of | you simply need to override the `schedule_storage` fixture and return your implementation of | ||||
`ScheduleStorage`. | `ScheduleStorage`. | ||||
▲ Show 20 Lines • Show All 79 Lines • ▼ Show 20 Lines | def test_get_schedule_state_not_found(self, storage): | ||||
assert schedule is None | assert schedule is None | ||||
def test_update_schedule(self, storage): | def test_update_schedule(self, storage): | ||||
assert storage | assert storage | ||||
schedule = self.build_schedule("my_schedule", "* * * * *") | schedule = self.build_schedule("my_schedule", "* * * * *") | ||||
storage.add_schedule_state(schedule) | storage.add_schedule_state(schedule) | ||||
now_time = get_current_datetime_in_utc() | now_time = pendulum.now("UTC") | ||||
new_schedule = schedule.with_status(ScheduleStatus.RUNNING, start_time_utc=now_time) | new_schedule = schedule.with_status(ScheduleStatus.RUNNING, start_time_utc=now_time) | ||||
storage.update_schedule_state(new_schedule) | storage.update_schedule_state(new_schedule) | ||||
schedules = storage.all_stored_schedule_state(self.fake_repo_target().get_id()) | schedules = storage.all_stored_schedule_state(self.fake_repo_target().get_id()) | ||||
assert len(schedules) == 1 | assert len(schedules) == 1 | ||||
schedule = schedules[0] | schedule = schedules[0] | ||||
assert schedule.name == "my_schedule" | assert schedule.name == "my_schedule" | ||||
assert schedule.status == ScheduleStatus.RUNNING | assert schedule.status == ScheduleStatus.RUNNING | ||||
assert schedule.start_timestamp == get_timestamp_from_utc_datetime(now_time) | assert schedule.start_timestamp == now_time.float_timestamp | ||||
stopped_schedule = schedule.with_status(ScheduleStatus.STOPPED) | stopped_schedule = schedule.with_status(ScheduleStatus.STOPPED) | ||||
storage.update_schedule_state(stopped_schedule) | storage.update_schedule_state(stopped_schedule) | ||||
schedules = storage.all_stored_schedule_state(self.fake_repo_target().get_id()) | schedules = storage.all_stored_schedule_state(self.fake_repo_target().get_id()) | ||||
assert len(schedules) == 1 | assert len(schedules) == 1 | ||||
schedule = schedules[0] | schedule = schedules[0] | ||||
▲ Show 20 Lines • Show All 160 Lines • Show Last 20 Lines |