Differential D4671 Diff 23273 python_modules/libraries/dagster-cron/dagster_cron_tests/test_cron_scheduler.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/libraries/dagster-cron/dagster_cron_tests/test_cron_scheduler.py
import os | import os | ||||
import re | import re | ||||
import subprocess | import subprocess | ||||
import sys | import sys | ||||
import pendulum | |||||
import pytest | import pytest | ||||
import yaml | import yaml | ||||
from dagster_cron import SystemCronScheduler | from dagster_cron import SystemCronScheduler | ||||
from freezegun import freeze_time | |||||
from dagster import ScheduleDefinition | from dagster import ScheduleDefinition | ||||
from dagster.core.definitions import lambda_solid, pipeline, repository | from dagster.core.definitions import lambda_solid, pipeline, repository | ||||
from dagster.core.host_representation import PythonEnvRepositoryLocation, RepositoryLocationHandle | from dagster.core.host_representation import PythonEnvRepositoryLocation, RepositoryLocationHandle | ||||
from dagster.core.instance import DagsterInstance, InstanceType | from dagster.core.instance import DagsterInstance, InstanceType | ||||
from dagster.core.launcher.sync_in_memory_run_launcher import SyncInMemoryRunLauncher | from dagster.core.launcher.sync_in_memory_run_launcher import SyncInMemoryRunLauncher | ||||
from dagster.core.scheduler import ScheduleState, ScheduleStatus | from dagster.core.scheduler import ScheduleState, ScheduleStatus | ||||
from dagster.core.scheduler.scheduler import ( | from dagster.core.scheduler.scheduler import ( | ||||
DagsterScheduleDoesNotExist, | DagsterScheduleDoesNotExist, | ||||
DagsterScheduleReconciliationError, | DagsterScheduleReconciliationError, | ||||
DagsterSchedulerError, | DagsterSchedulerError, | ||||
) | ) | ||||
from dagster.core.storage.event_log import InMemoryEventLogStorage | from dagster.core.storage.event_log import InMemoryEventLogStorage | ||||
from dagster.core.storage.noop_compute_log_manager import NoOpComputeLogManager | from dagster.core.storage.noop_compute_log_manager import NoOpComputeLogManager | ||||
from dagster.core.storage.pipeline_run import PipelineRunStatus | from dagster.core.storage.pipeline_run import PipelineRunStatus | ||||
from dagster.core.storage.root import LocalArtifactStorage | from dagster.core.storage.root import LocalArtifactStorage | ||||
from dagster.core.storage.runs import InMemoryRunStorage | from dagster.core.storage.runs import InMemoryRunStorage | ||||
from dagster.core.storage.schedules import SqliteScheduleStorage | from dagster.core.storage.schedules import SqliteScheduleStorage | ||||
from dagster.core.test_utils import environ | from dagster.core.test_utils import environ | ||||
from dagster.core.types.loadable_target_origin import LoadableTargetOrigin | from dagster.core.types.loadable_target_origin import LoadableTargetOrigin | ||||
from dagster.seven import ( | from dagster.seven import TemporaryDirectory | ||||
TemporaryDirectory, | |||||
get_current_datetime_in_utc, | |||||
get_timestamp_from_utc_datetime, | |||||
) | |||||
@pytest.fixture(scope="function") | @pytest.fixture(scope="function") | ||||
def restore_cron_tab(): | def restore_cron_tab(): | ||||
with TemporaryDirectory() as tempdir: | with TemporaryDirectory() as tempdir: | ||||
crontab_backup = os.path.join(tempdir, "crontab_backup.txt") | crontab_backup = os.path.join(tempdir, "crontab_backup.txt") | ||||
with open(crontab_backup, "wb+") as f: | with open(crontab_backup, "wb+") as f: | ||||
try: | try: | ||||
▲ Show 20 Lines • Show All 115 Lines • ▼ Show 20 Lines | with TemporaryDirectory() as tempdir: | ||||
instance.reconcile_scheduler_state(external_repository) | instance.reconcile_scheduler_state(external_repository) | ||||
# Check schedules are saved to disk | # Check schedules are saved to disk | ||||
assert "schedules" in os.listdir(tempdir) | assert "schedules" in os.listdir(tempdir) | ||||
assert instance.all_stored_schedule_state() | assert instance.all_stored_schedule_state() | ||||
@freeze_time("2019-02-27") | |||||
def test_re_init(restore_cron_tab): # pylint:disable=unused-argument,redefined-outer-name | def test_re_init(restore_cron_tab): # pylint:disable=unused-argument,redefined-outer-name | ||||
with TemporaryDirectory() as tempdir: | with TemporaryDirectory() as tempdir: | ||||
with pendulum.test(pendulum.datetime(year=2019, month=2, day=27)): | |||||
instance = define_scheduler_instance(tempdir) | instance = define_scheduler_instance(tempdir) | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
now = get_current_datetime_in_utc() | now = pendulum.now("UTC") | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
# Start schedule | # Start schedule | ||||
schedule_state = instance.start_schedule_and_update_storage_state( | schedule_state = instance.start_schedule_and_update_storage_state( | ||||
external_repo.get_external_schedule("no_config_pipeline_every_min_schedule") | external_repo.get_external_schedule("no_config_pipeline_every_min_schedule") | ||||
) | ) | ||||
assert schedule_state.start_timestamp == get_timestamp_from_utc_datetime(now) | assert schedule_state.start_timestamp == now.float_timestamp | ||||
# Re-initialize scheduler | # Re-initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
# Check schedules are saved to disk | # Check schedules are saved to disk | ||||
assert "schedules" in os.listdir(tempdir) | assert "schedules" in os.listdir(tempdir) | ||||
schedule_states = instance.all_stored_schedule_state() | schedule_states = instance.all_stored_schedule_state() | ||||
for state in schedule_states: | for state in schedule_states: | ||||
if state.name == "no_config_pipeline_every_min_schedule": | if state.name == "no_config_pipeline_every_min_schedule": | ||||
assert state == schedule_state | assert state == schedule_state | ||||
def test_start_and_stop_schedule( | def test_start_and_stop_schedule( | ||||
restore_cron_tab, | restore_cron_tab, | ||||
): # pylint:disable=unused-argument,redefined-outer-name | ): # pylint:disable=unused-argument,redefined-outer-name | ||||
with TemporaryDirectory() as tempdir: | with TemporaryDirectory() as tempdir: | ||||
instance = define_scheduler_instance(tempdir) | instance = define_scheduler_instance(tempdir) | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
▲ Show 20 Lines • Show All 552 Lines • ▼ Show 20 Lines | with TemporaryDirectory() as tempdir: | ||||
# Initialize scheduler | # Initialize scheduler | ||||
with pytest.raises( | with pytest.raises( | ||||
DagsterScheduleReconciliationError, | DagsterScheduleReconciliationError, | ||||
match="Error 1: Failed to stop\n Error 2: Failed to stop\n Error 3: Failed to stop", | match="Error 1: Failed to stop\n Error 2: Failed to stop\n Error 3: Failed to stop", | ||||
): | ): | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
@freeze_time("2019-02-27") | |||||
def test_reconcile_schedule_without_start_time(): | def test_reconcile_schedule_without_start_time(): | ||||
with TemporaryDirectory() as tempdir: | with TemporaryDirectory() as tempdir: | ||||
with pendulum.test(pendulum.datetime(year=2019, month=2, day=27)): | |||||
instance = define_scheduler_instance(tempdir) | instance = define_scheduler_instance(tempdir) | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
external_schedule = external_repo.get_external_schedule("no_config_pipeline_daily_schedule") | external_schedule = external_repo.get_external_schedule( | ||||
"no_config_pipeline_daily_schedule" | |||||
) | |||||
legacy_schedule_state = ScheduleState( | legacy_schedule_state = ScheduleState( | ||||
external_schedule.get_origin(), | external_schedule.get_origin(), | ||||
ScheduleStatus.RUNNING, | ScheduleStatus.RUNNING, | ||||
external_schedule.cron_schedule, | external_schedule.cron_schedule, | ||||
None, | None, | ||||
) | ) | ||||
instance.add_schedule_state(legacy_schedule_state) | instance.add_schedule_state(legacy_schedule_state) | ||||
instance.reconcile_scheduler_state(external_repository=external_repo) | instance.reconcile_scheduler_state(external_repository=external_repo) | ||||
reconciled_schedule_state = instance.get_schedule_state(external_schedule.get_origin_id()) | reconciled_schedule_state = instance.get_schedule_state( | ||||
external_schedule.get_origin_id() | |||||
) | |||||
assert reconciled_schedule_state.status == ScheduleStatus.RUNNING | assert reconciled_schedule_state.status == ScheduleStatus.RUNNING | ||||
assert reconciled_schedule_state.start_timestamp == get_timestamp_from_utc_datetime( | assert reconciled_schedule_state.start_timestamp == pendulum.now("UTC").float_timestamp | ||||
get_current_datetime_in_utc() | |||||
) | |||||
def test_reconcile_failure_when_deleting_schedule_def( | def test_reconcile_failure_when_deleting_schedule_def( | ||||
restore_cron_tab, | restore_cron_tab, | ||||
): # pylint:disable=unused-argument,redefined-outer-name | ): # pylint:disable=unused-argument,redefined-outer-name | ||||
with TemporaryDirectory() as tempdir: | with TemporaryDirectory() as tempdir: | ||||
instance = define_scheduler_instance(tempdir) | instance = define_scheduler_instance(tempdir) | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
Show All 17 Lines |