Differential D4671 Diff 23272 python_modules/dagster/dagster_tests/scheduler_tests/test_scheduler_run.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/scheduler_tests/test_scheduler_run.py
import os | import os | ||||
import sys | import sys | ||||
import time | import time | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
from datetime import datetime, timedelta | from datetime import datetime | ||||
import pendulum | |||||
import pytest | import pytest | ||||
from freezegun import freeze_time | |||||
from dagster import DagsterEventType, daily_schedule, hourly_schedule, pipeline, repository, solid | from dagster import DagsterEventType, daily_schedule, hourly_schedule, pipeline, repository, solid | ||||
from dagster.core.definitions.reconstructable import ReconstructableRepository | from dagster.core.definitions.reconstructable import ReconstructableRepository | ||||
from dagster.core.host_representation import ( | from dagster.core.host_representation import ( | ||||
PythonEnvRepositoryLocation, | PythonEnvRepositoryLocation, | ||||
RepositoryLocation, | RepositoryLocation, | ||||
RepositoryLocationHandle, | RepositoryLocationHandle, | ||||
) | ) | ||||
from dagster.core.scheduler import ScheduleState, ScheduleStatus, ScheduleTickStatus | from dagster.core.scheduler import ScheduleState, ScheduleStatus, ScheduleTickStatus | ||||
from dagster.core.storage.pipeline_run import PipelineRunStatus, PipelineRunsFilter | from dagster.core.storage.pipeline_run import PipelineRunStatus, PipelineRunsFilter | ||||
from dagster.core.storage.tags import PARTITION_NAME_TAG, SCHEDULED_EXECUTION_TIME_TAG | from dagster.core.storage.tags import PARTITION_NAME_TAG, SCHEDULED_EXECUTION_TIME_TAG | ||||
from dagster.core.test_utils import environ, instance_for_test | from dagster.core.test_utils import instance_for_test | ||||
from dagster.core.types.loadable_target_origin import LoadableTargetOrigin | from dagster.core.types.loadable_target_origin import LoadableTargetOrigin | ||||
from dagster.grpc.server import GrpcServerProcess | from dagster.grpc.server import GrpcServerProcess | ||||
from dagster.scheduler.scheduler import get_default_scheduler_logger, launch_scheduled_runs | from dagster.scheduler.scheduler import get_default_scheduler_logger, launch_scheduled_runs | ||||
from dagster.seven import ( | |||||
get_current_datetime_in_utc, | |||||
get_timestamp_from_utc_datetime, | |||||
get_utc_timezone, | |||||
) | |||||
from dagster.utils import merge_dicts | from dagster.utils import merge_dicts | ||||
_COUPLE_DAYS_AGO = datetime(year=2019, month=2, day=25) | _COUPLE_DAYS_AGO = datetime(year=2019, month=2, day=25) | ||||
def _throw(_context): | def _throw(_context): | ||||
raise Exception("bananas") | raise Exception("bananas") | ||||
▲ Show 20 Lines • Show All 124 Lines • ▼ Show 20 Lines | try: | ||||
port=api_client.port, socket=api_client.socket, host=api_client.host, | port=api_client.port, socket=api_client.socket, host=api_client.host, | ||||
) | ) | ||||
) | ) | ||||
finally: | finally: | ||||
server_process.wait() | server_process.wait() | ||||
@contextmanager | @contextmanager | ||||
def central_timezone(): | |||||
try: | |||||
with environ({"TZ": "US/Central"}): | |||||
time.tzset() | |||||
yield | |||||
finally: | |||||
time.tzset() | |||||
@contextmanager | |||||
def grpc_repo(): | def grpc_repo(): | ||||
with grpc_repo_location() as repo_location: | with grpc_repo_location() as repo_location: | ||||
yield repo_location.get_repository("the_repo") | yield repo_location.get_repository("the_repo") | ||||
@contextmanager | @contextmanager | ||||
def cli_api_repo(): | def cli_api_repo(): | ||||
loadable_target_origin = LoadableTargetOrigin( | loadable_target_origin = LoadableTargetOrigin( | ||||
Show All 14 Lines | def validate_tick( | ||||
expected_status, | expected_status, | ||||
expected_run_id, | expected_run_id, | ||||
expected_error=None, | expected_error=None, | ||||
): | ): | ||||
tick_data = tick.schedule_tick_data | tick_data = tick.schedule_tick_data | ||||
assert tick_data.schedule_origin_id == external_schedule.get_origin_id() | assert tick_data.schedule_origin_id == external_schedule.get_origin_id() | ||||
assert tick_data.schedule_name == external_schedule.name | assert tick_data.schedule_name == external_schedule.name | ||||
assert tick_data.cron_schedule == external_schedule.cron_schedule | assert tick_data.cron_schedule == external_schedule.cron_schedule | ||||
assert tick_data.timestamp == get_timestamp_from_utc_datetime(expected_datetime) | assert tick_data.timestamp == expected_datetime.float_timestamp | ||||
assert tick_data.status == expected_status | assert tick_data.status == expected_status | ||||
assert tick_data.run_id == expected_run_id | assert tick_data.run_id == expected_run_id | ||||
if expected_error: | if expected_error: | ||||
assert expected_error in tick_data.error.message | assert expected_error in tick_data.error.message | ||||
def validate_run_started(run, expected_datetime, expected_partition, expected_success=True): | def validate_run_started(run, expected_datetime, expected_partition, expected_success=True): | ||||
assert run.tags[SCHEDULED_EXECUTION_TIME_TAG] == expected_datetime.isoformat() | assert run.tags[SCHEDULED_EXECUTION_TIME_TAG] == expected_datetime.in_tz("UTC").isoformat() | ||||
assert run.tags[PARTITION_NAME_TAG] == expected_partition | assert run.tags[PARTITION_NAME_TAG] == expected_partition | ||||
if expected_success: | if expected_success: | ||||
assert run.status == PipelineRunStatus.STARTED or run.status == PipelineRunStatus.SUCCESS | assert run.status == PipelineRunStatus.STARTED or run.status == PipelineRunStatus.SUCCESS | ||||
else: | else: | ||||
assert run.status == PipelineRunStatus.FAILURE | assert run.status == PipelineRunStatus.FAILURE | ||||
Show All 11 Lines | while True: | ||||
if len(not_started_runs) == 0: | if len(not_started_runs) == 0: | ||||
break | break | ||||
@pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||
"external_repo_context", [cli_api_repo, grpc_repo], | "external_repo_context", [cli_api_repo, grpc_repo], | ||||
) | ) | ||||
def test_simple_schedule(external_repo_context, capfd): | def test_simple_schedule(external_repo_context, capfd): | ||||
with central_timezone(): | freeze_datetime = pendulum.datetime( | ||||
initial_datetime = datetime( | year=2019, month=2, day=27, hour=23, minute=59, second=59, | ||||
year=2019, month=2, day=27, hour=23, minute=59, second=59, tzinfo=get_utc_timezone(), | ).in_tz("US/Central") | ||||
) | |||||
with instance_with_schedules(external_repo_context) as (instance, external_repo): | with instance_with_schedules(external_repo_context) as (instance, external_repo): | ||||
with freeze_time(initial_datetime) as frozen_datetime: | with pendulum.test(freeze_datetime): | ||||
external_schedule = external_repo.get_external_schedule("simple_schedule") | external_schedule = external_repo.get_external_schedule("simple_schedule") | ||||
schedule_origin = external_schedule.get_origin() | schedule_origin = external_schedule.get_origin() | ||||
instance.start_schedule_and_update_storage_state(external_schedule) | instance.start_schedule_and_update_storage_state(external_schedule) | ||||
assert instance.get_runs_count() == 0 | assert instance.get_runs_count() == 0 | ||||
ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ||||
assert len(ticks) == 0 | assert len(ticks) == 0 | ||||
# launch_scheduled_runs does nothing before the first tick | # launch_scheduled_runs does nothing before the first tick | ||||
launch_scheduled_runs( | launch_scheduled_runs( | ||||
instance, get_default_scheduler_logger(), get_current_datetime_in_utc() | instance, get_default_scheduler_logger(), pendulum.now("UTC"), | ||||
) | ) | ||||
assert instance.get_runs_count() == 0 | assert instance.get_runs_count() == 0 | ||||
ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ||||
assert len(ticks) == 0 | assert len(ticks) == 0 | ||||
captured = capfd.readouterr() | captured = capfd.readouterr() | ||||
assert ( | assert ( | ||||
captured.out | captured.out | ||||
== """2019-02-27 17:59:59 - dagster-scheduler - INFO - Checking for new runs for the following schedules: simple_schedule | == """2019-02-27 17:59:59 - dagster-scheduler - INFO - Checking for new runs for the following schedules: simple_schedule | ||||
2019-02-27 17:59:59 - dagster-scheduler - INFO - No new runs for simple_schedule | 2019-02-27 17:59:59 - dagster-scheduler - INFO - No new runs for simple_schedule | ||||
""" | """ | ||||
) | ) | ||||
# Move forward in time so we're past a tick | freeze_datetime = freeze_datetime.add(seconds=2) | ||||
frozen_datetime.tick(delta=timedelta(seconds=2)) | with pendulum.test(freeze_datetime): | ||||
launch_scheduled_runs( | launch_scheduled_runs( | ||||
instance, get_default_scheduler_logger(), get_current_datetime_in_utc() | instance, get_default_scheduler_logger(), pendulum.now("UTC"), | ||||
) | ) | ||||
assert instance.get_runs_count() == 1 | assert instance.get_runs_count() == 1 | ||||
ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ||||
assert len(ticks) == 1 | assert len(ticks) == 1 | ||||
expected_datetime = datetime(year=2019, month=2, day=28, tzinfo=get_utc_timezone()) | expected_datetime = pendulum.datetime(year=2019, month=2, day=28) | ||||
validate_tick( | validate_tick( | ||||
ticks[0], | ticks[0], | ||||
external_schedule, | external_schedule, | ||||
expected_datetime, | expected_datetime, | ||||
ScheduleTickStatus.SUCCESS, | ScheduleTickStatus.SUCCESS, | ||||
instance.get_runs()[0].run_id, | instance.get_runs()[0].run_id, | ||||
) | ) | ||||
wait_for_all_runs_to_start(instance) | wait_for_all_runs_to_start(instance) | ||||
validate_run_started(instance.get_runs()[0], expected_datetime, "2019-02-27") | validate_run_started(instance.get_runs()[0], expected_datetime, "2019-02-27") | ||||
captured = capfd.readouterr() | captured = capfd.readouterr() | ||||
assert ( | assert ( | ||||
captured.out | captured.out | ||||
== """2019-02-27 18:00:01 - dagster-scheduler - INFO - Checking for new runs for the following schedules: simple_schedule | == """2019-02-27 18:00:01 - dagster-scheduler - INFO - Checking for new runs for the following schedules: simple_schedule | ||||
2019-02-27 18:00:01 - dagster-scheduler - INFO - Launching run for simple_schedule at 2019-02-28 00:00:00+0000 | 2019-02-27 18:00:01 - dagster-scheduler - INFO - Launching run for simple_schedule at 2019-02-28 00:00:00+0000 | ||||
2019-02-27 18:00:01 - dagster-scheduler - INFO - Completed scheduled launch of run {run_id} for simple_schedule | 2019-02-27 18:00:01 - dagster-scheduler - INFO - Completed scheduled launch of run {run_id} for simple_schedule | ||||
""".format( | """.format( | ||||
run_id=instance.get_runs()[0].run_id | run_id=instance.get_runs()[0].run_id | ||||
) | ) | ||||
) | ) | ||||
# Verify idempotence | # Verify idempotence | ||||
launch_scheduled_runs( | launch_scheduled_runs( | ||||
instance, get_default_scheduler_logger(), get_current_datetime_in_utc() | instance, get_default_scheduler_logger(), pendulum.now("UTC"), | ||||
) | ) | ||||
assert instance.get_runs_count() == 1 | assert instance.get_runs_count() == 1 | ||||
ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ||||
assert len(ticks) == 1 | assert len(ticks) == 1 | ||||
assert ticks[0].status == ScheduleTickStatus.SUCCESS | assert ticks[0].status == ScheduleTickStatus.SUCCESS | ||||
# Verify advancing in time but not going past a tick doesn't add any new runs | # Verify advancing in time but not going past a tick doesn't add any new runs | ||||
frozen_datetime.tick(delta=timedelta(seconds=2)) | freeze_datetime = freeze_datetime.add(seconds=2) | ||||
with pendulum.test(freeze_datetime): | |||||
launch_scheduled_runs( | launch_scheduled_runs( | ||||
instance, get_default_scheduler_logger(), get_current_datetime_in_utc() | instance, get_default_scheduler_logger(), pendulum.now("UTC"), | ||||
) | ) | ||||
assert instance.get_runs_count() == 1 | assert instance.get_runs_count() == 1 | ||||
ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ||||
assert len(ticks) == 1 | assert len(ticks) == 1 | ||||
assert ticks[0].status == ScheduleTickStatus.SUCCESS | assert ticks[0].status == ScheduleTickStatus.SUCCESS | ||||
freeze_datetime = freeze_datetime.add(days=2) | |||||
with pendulum.test(freeze_datetime): | |||||
capfd.readouterr() | capfd.readouterr() | ||||
# Traveling two more days in the future before running results in two new ticks | # Traveling two more days in the future before running results in two new ticks | ||||
frozen_datetime.tick(delta=timedelta(days=2)) | |||||
launch_scheduled_runs( | launch_scheduled_runs( | ||||
instance, get_default_scheduler_logger(), get_current_datetime_in_utc() | instance, get_default_scheduler_logger(), pendulum.now("UTC"), | ||||
) | ) | ||||
assert instance.get_runs_count() == 3 | assert instance.get_runs_count() == 3 | ||||
ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ||||
assert len(ticks) == 3 | assert len(ticks) == 3 | ||||
assert ( | assert len([tick for tick in ticks if tick.status == ScheduleTickStatus.SUCCESS]) == 3 | ||||
len([tick for tick in ticks if tick.status == ScheduleTickStatus.SUCCESS]) == 3 | |||||
) | |||||
runs_by_partition = { | runs_by_partition = {run.tags[PARTITION_NAME_TAG]: run for run in instance.get_runs()} | ||||
run.tags[PARTITION_NAME_TAG]: run for run in instance.get_runs() | |||||
} | |||||
assert "2019-02-28" in runs_by_partition | assert "2019-02-28" in runs_by_partition | ||||
assert "2019-03-01" in runs_by_partition | assert "2019-03-01" in runs_by_partition | ||||
captured = capfd.readouterr() | captured = capfd.readouterr() | ||||
assert ( | assert ( | ||||
captured.out | captured.out | ||||
== """2019-03-01 18:00:03 - dagster-scheduler - INFO - Checking for new runs for the following schedules: simple_schedule | == """2019-03-01 18:00:03 - dagster-scheduler - INFO - Checking for new runs for the following schedules: simple_schedule | ||||
2019-03-01 18:00:03 - dagster-scheduler - INFO - Launching 2 runs for simple_schedule at the following times: 2019-03-01 00:00:00+0000, 2019-03-02 00:00:00+0000 | 2019-03-01 18:00:03 - dagster-scheduler - INFO - Launching 2 runs for simple_schedule at the following times: 2019-03-01 00:00:00+0000, 2019-03-02 00:00:00+0000 | ||||
2019-03-01 18:00:03 - dagster-scheduler - INFO - Completed scheduled launch of run {first_run_id} for simple_schedule | 2019-03-01 18:00:03 - dagster-scheduler - INFO - Completed scheduled launch of run {first_run_id} for simple_schedule | ||||
2019-03-01 18:00:03 - dagster-scheduler - INFO - Completed scheduled launch of run {second_run_id} for simple_schedule | 2019-03-01 18:00:03 - dagster-scheduler - INFO - Completed scheduled launch of run {second_run_id} for simple_schedule | ||||
""".format( | """.format( | ||||
first_run_id=instance.get_runs()[1].run_id, | first_run_id=instance.get_runs()[1].run_id, | ||||
second_run_id=instance.get_runs()[0].run_id, | second_run_id=instance.get_runs()[0].run_id, | ||||
) | ) | ||||
) | ) | ||||
# Check idempotence again | # Check idempotence again | ||||
launch_scheduled_runs( | launch_scheduled_runs(instance, get_default_scheduler_logger(), pendulum.now("UTC")) | ||||
instance, get_default_scheduler_logger(), get_current_datetime_in_utc() | |||||
) | |||||
assert instance.get_runs_count() == 3 | assert instance.get_runs_count() == 3 | ||||
ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ||||
assert len(ticks) == 3 | assert len(ticks) == 3 | ||||
@pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||
"external_repo_context", [cli_api_repo, grpc_repo], | "external_repo_context", [cli_api_repo, grpc_repo], | ||||
) | ) | ||||
def test_bad_env_fn(external_repo_context, capfd): | def test_bad_env_fn(external_repo_context, capfd): | ||||
with instance_with_schedules(external_repo_context) as (instance, external_repo): | with instance_with_schedules(external_repo_context) as (instance, external_repo): | ||||
external_schedule = external_repo.get_external_schedule("bad_env_fn_schedule") | external_schedule = external_repo.get_external_schedule("bad_env_fn_schedule") | ||||
schedule_origin = external_schedule.get_origin() | schedule_origin = external_schedule.get_origin() | ||||
initial_datetime = datetime( | initial_datetime = pendulum.datetime(year=2019, month=2, day=27, hour=0, minute=0, second=0) | ||||
year=2019, month=2, day=27, hour=0, minute=0, second=0, tzinfo=get_utc_timezone(), | with pendulum.test(initial_datetime): | ||||
) | |||||
with freeze_time(initial_datetime): | |||||
instance.start_schedule_and_update_storage_state(external_schedule) | instance.start_schedule_and_update_storage_state(external_schedule) | ||||
launch_scheduled_runs( | launch_scheduled_runs(instance, get_default_scheduler_logger(), pendulum.now("UTC")) | ||||
instance, get_default_scheduler_logger(), get_current_datetime_in_utc() | |||||
) | |||||
assert instance.get_runs_count() == 0 | assert instance.get_runs_count() == 0 | ||||
ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ||||
assert len(ticks) == 1 | assert len(ticks) == 1 | ||||
validate_tick( | validate_tick( | ||||
ticks[0], | ticks[0], | ||||
external_schedule, | external_schedule, | ||||
Show All 16 Lines | |||||
@pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||
"external_repo_context", [cli_api_repo, grpc_repo], | "external_repo_context", [cli_api_repo, grpc_repo], | ||||
) | ) | ||||
def test_bad_should_execute(external_repo_context, capfd): | def test_bad_should_execute(external_repo_context, capfd): | ||||
with instance_with_schedules(external_repo_context) as (instance, external_repo): | with instance_with_schedules(external_repo_context) as (instance, external_repo): | ||||
external_schedule = external_repo.get_external_schedule("bad_should_execute_schedule") | external_schedule = external_repo.get_external_schedule("bad_should_execute_schedule") | ||||
schedule_origin = external_schedule.get_origin() | schedule_origin = external_schedule.get_origin() | ||||
initial_datetime = datetime( | initial_datetime = pendulum.datetime( | ||||
year=2019, month=2, day=27, hour=0, minute=0, second=0, tzinfo=get_utc_timezone(), | year=2019, month=2, day=27, hour=0, minute=0, second=0, | ||||
) | ) | ||||
with freeze_time(initial_datetime): | with pendulum.test(initial_datetime): | ||||
instance.start_schedule_and_update_storage_state(external_schedule) | instance.start_schedule_and_update_storage_state(external_schedule) | ||||
launch_scheduled_runs( | launch_scheduled_runs(instance, get_default_scheduler_logger(), pendulum.now("UTC")) | ||||
instance, get_default_scheduler_logger(), get_current_datetime_in_utc() | |||||
) | |||||
assert instance.get_runs_count() == 0 | assert instance.get_runs_count() == 0 | ||||
ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ||||
assert len(ticks) == 1 | assert len(ticks) == 1 | ||||
validate_tick( | validate_tick( | ||||
ticks[0], | ticks[0], | ||||
external_schedule, | external_schedule, | ||||
Show All 16 Lines | with instance_with_schedules(external_repo_context) as (instance, external_repo): | ||||
assert "Exception: bananas" in captured.out | assert "Exception: bananas" in captured.out | ||||
@pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||
"external_repo_context", [cli_api_repo, grpc_repo], | "external_repo_context", [cli_api_repo, grpc_repo], | ||||
) | ) | ||||
def test_skip(external_repo_context, capfd): | def test_skip(external_repo_context, capfd): | ||||
with central_timezone(): | |||||
with instance_with_schedules(external_repo_context) as (instance, external_repo): | with instance_with_schedules(external_repo_context) as (instance, external_repo): | ||||
external_schedule = external_repo.get_external_schedule("skip_schedule") | external_schedule = external_repo.get_external_schedule("skip_schedule") | ||||
schedule_origin = external_schedule.get_origin() | schedule_origin = external_schedule.get_origin() | ||||
initial_datetime = datetime( | initial_datetime = pendulum.datetime( | ||||
year=2019, month=2, day=27, hour=0, minute=0, second=0, tzinfo=get_utc_timezone(), | year=2019, month=2, day=27, hour=0, minute=0, second=0, | ||||
) | ).in_tz("US/Central") | ||||
with freeze_time(initial_datetime): | with pendulum.test(initial_datetime): | ||||
instance.start_schedule_and_update_storage_state(external_schedule) | instance.start_schedule_and_update_storage_state(external_schedule) | ||||
launch_scheduled_runs( | launch_scheduled_runs(instance, get_default_scheduler_logger(), pendulum.now("UTC")) | ||||
instance, get_default_scheduler_logger(), get_current_datetime_in_utc() | |||||
) | |||||
assert instance.get_runs_count() == 0 | assert instance.get_runs_count() == 0 | ||||
ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ||||
assert len(ticks) == 1 | assert len(ticks) == 1 | ||||
validate_tick( | validate_tick( | ||||
ticks[0], external_schedule, initial_datetime, ScheduleTickStatus.SKIPPED, None, | ticks[0], external_schedule, initial_datetime, ScheduleTickStatus.SKIPPED, None, | ||||
) | ) | ||||
captured = capfd.readouterr() | captured = capfd.readouterr() | ||||
assert ( | assert ( | ||||
captured.out | captured.out | ||||
== """2019-02-26 18:00:00 - dagster-scheduler - INFO - Checking for new runs for the following schedules: skip_schedule | == """2019-02-26 18:00:00 - dagster-scheduler - INFO - Checking for new runs for the following schedules: skip_schedule | ||||
2019-02-26 18:00:00 - dagster-scheduler - INFO - Launching run for skip_schedule at 2019-02-27 00:00:00+0000 | 2019-02-26 18:00:00 - dagster-scheduler - INFO - Launching run for skip_schedule at 2019-02-27 00:00:00+0000 | ||||
2019-02-26 18:00:00 - dagster-scheduler - INFO - should_execute returned False for skip_schedule, skipping | 2019-02-26 18:00:00 - dagster-scheduler - INFO - should_execute returned False for skip_schedule, skipping | ||||
""" | """ | ||||
) | ) | ||||
@pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||
"external_repo_context", [cli_api_repo, grpc_repo], | "external_repo_context", [cli_api_repo, grpc_repo], | ||||
) | ) | ||||
def test_wrong_config(external_repo_context, capfd): | def test_wrong_config(external_repo_context, capfd): | ||||
with instance_with_schedules(external_repo_context) as (instance, external_repo): | with instance_with_schedules(external_repo_context) as (instance, external_repo): | ||||
external_schedule = external_repo.get_external_schedule("wrong_config_schedule") | external_schedule = external_repo.get_external_schedule("wrong_config_schedule") | ||||
schedule_origin = external_schedule.get_origin() | schedule_origin = external_schedule.get_origin() | ||||
initial_datetime = datetime( | initial_datetime = pendulum.datetime(year=2019, month=2, day=27, hour=0, minute=0, second=0) | ||||
year=2019, month=2, day=27, hour=0, minute=0, second=0, tzinfo=get_utc_timezone(), | with pendulum.test(initial_datetime): | ||||
) | |||||
with freeze_time(initial_datetime): | |||||
instance.start_schedule_and_update_storage_state(external_schedule) | instance.start_schedule_and_update_storage_state(external_schedule) | ||||
launch_scheduled_runs( | launch_scheduled_runs(instance, get_default_scheduler_logger(), pendulum.now("UTC")) | ||||
instance, get_default_scheduler_logger(), get_current_datetime_in_utc() | |||||
) | |||||
assert instance.get_runs_count() == 1 | assert instance.get_runs_count() == 1 | ||||
wait_for_all_runs_to_start(instance) | wait_for_all_runs_to_start(instance) | ||||
run = instance.get_runs()[0] | run = instance.get_runs()[0] | ||||
validate_run_started(run, initial_datetime, "2019-02-26", expected_success=False) | validate_run_started(run, initial_datetime, "2019-02-26", expected_success=False) | ||||
Show All 38 Lines | def test_bad_schedule_mixed_with_good_schedule(external_repo_context): | ||||
with instance_with_schedules(external_repo_context) as (instance, external_repo): | with instance_with_schedules(external_repo_context) as (instance, external_repo): | ||||
good_schedule = external_repo.get_external_schedule("simple_schedule") | good_schedule = external_repo.get_external_schedule("simple_schedule") | ||||
bad_schedule = external_repo.get_external_schedule( | bad_schedule = external_repo.get_external_schedule( | ||||
"bad_should_execute_schedule_on_odd_days" | "bad_should_execute_schedule_on_odd_days" | ||||
) | ) | ||||
good_origin = good_schedule.get_origin() | good_origin = good_schedule.get_origin() | ||||
bad_origin = bad_schedule.get_origin() | bad_origin = bad_schedule.get_origin() | ||||
initial_datetime = datetime( | initial_datetime = pendulum.datetime( | ||||
year=2019, month=2, day=27, hour=0, minute=0, second=0, tzinfo=get_utc_timezone(), | year=2019, month=2, day=27, hour=0, minute=0, second=0, | ||||
) | ) | ||||
with freeze_time(initial_datetime) as frozen_datetime: | with pendulum.test(initial_datetime): | ||||
instance.start_schedule_and_update_storage_state(good_schedule) | instance.start_schedule_and_update_storage_state(good_schedule) | ||||
instance.start_schedule_and_update_storage_state(bad_schedule) | instance.start_schedule_and_update_storage_state(bad_schedule) | ||||
launch_scheduled_runs( | launch_scheduled_runs(instance, get_default_scheduler_logger(), pendulum.now("UTC")) | ||||
instance, get_default_scheduler_logger(), get_current_datetime_in_utc() | |||||
) | |||||
assert instance.get_runs_count() == 1 | assert instance.get_runs_count() == 1 | ||||
wait_for_all_runs_to_start(instance) | wait_for_all_runs_to_start(instance) | ||||
validate_run_started(instance.get_runs()[0], initial_datetime, "2019-02-26") | validate_run_started(instance.get_runs()[0], initial_datetime, "2019-02-26") | ||||
good_ticks = instance.get_schedule_ticks(good_origin.get_id()) | good_ticks = instance.get_schedule_ticks(good_origin.get_id()) | ||||
assert len(good_ticks) == 1 | assert len(good_ticks) == 1 | ||||
validate_tick( | validate_tick( | ||||
Show All 9 Lines | with instance_with_schedules(external_repo_context) as (instance, external_repo): | ||||
assert bad_ticks[0].status == ScheduleTickStatus.FAILURE | assert bad_ticks[0].status == ScheduleTickStatus.FAILURE | ||||
assert ( | assert ( | ||||
"Error occurred during the execution of should_execute " | "Error occurred during the execution of should_execute " | ||||
"for schedule bad_should_execute_schedule" in bad_ticks[0].error.message | "for schedule bad_should_execute_schedule" in bad_ticks[0].error.message | ||||
) | ) | ||||
frozen_datetime.tick(delta=timedelta(days=1)) | initial_datetime = initial_datetime.add(days=1) | ||||
with pendulum.test(initial_datetime): | |||||
new_now = get_current_datetime_in_utc() | new_now = pendulum.now("UTC") | ||||
launch_scheduled_runs(instance, get_default_scheduler_logger(), new_now) | |||||
launch_scheduled_runs( | |||||
instance, get_default_scheduler_logger(), get_current_datetime_in_utc() | |||||
) | |||||
assert instance.get_runs_count() == 3 | assert instance.get_runs_count() == 3 | ||||
wait_for_all_runs_to_start(instance) | wait_for_all_runs_to_start(instance) | ||||
good_schedule_runs = instance.get_runs( | good_schedule_runs = instance.get_runs( | ||||
filters=PipelineRunsFilter.for_schedule(good_schedule) | filters=PipelineRunsFilter.for_schedule(good_schedule) | ||||
) | ) | ||||
assert len(good_schedule_runs) == 2 | assert len(good_schedule_runs) == 2 | ||||
Show All 29 Lines | |||||
@pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||
"external_repo_context", [cli_api_repo, grpc_repo], | "external_repo_context", [cli_api_repo, grpc_repo], | ||||
) | ) | ||||
def test_run_scheduled_on_time_boundary(external_repo_context): | def test_run_scheduled_on_time_boundary(external_repo_context): | ||||
with instance_with_schedules(external_repo_context) as (instance, external_repo): | with instance_with_schedules(external_repo_context) as (instance, external_repo): | ||||
external_schedule = external_repo.get_external_schedule("simple_schedule") | external_schedule = external_repo.get_external_schedule("simple_schedule") | ||||
schedule_origin = external_schedule.get_origin() | schedule_origin = external_schedule.get_origin() | ||||
initial_datetime = datetime( | initial_datetime = pendulum.datetime( | ||||
year=2019, month=2, day=27, hour=0, minute=0, second=0, tzinfo=get_utc_timezone(), | year=2019, month=2, day=27, hour=0, minute=0, second=0, | ||||
) | ) | ||||
with freeze_time(initial_datetime): | with pendulum.test(initial_datetime): | ||||
# Start schedule exactly at midnight | # Start schedule exactly at midnight | ||||
instance.start_schedule_and_update_storage_state(external_schedule) | instance.start_schedule_and_update_storage_state(external_schedule) | ||||
launch_scheduled_runs( | launch_scheduled_runs(instance, get_default_scheduler_logger(), pendulum.now("UTC")) | ||||
instance, get_default_scheduler_logger(), get_current_datetime_in_utc() | |||||
) | |||||
assert instance.get_runs_count() == 1 | assert instance.get_runs_count() == 1 | ||||
ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ticks = instance.get_schedule_ticks(schedule_origin.get_id()) | ||||
assert len(ticks) == 1 | assert len(ticks) == 1 | ||||
assert ticks[0].status == ScheduleTickStatus.SUCCESS | assert ticks[0].status == ScheduleTickStatus.SUCCESS | ||||
def test_bad_load(capfd):
    """A schedule whose repository/schedule targets don't exist should fail cleanly.

    Registers a RUNNING schedule state pointing at a nonexistent repository
    attribute, then runs the scheduler twice (one second later, then a day
    later). Each pass must launch zero runs, record a FAILURE tick stamped
    with the frozen "now", and surface the load error on stdout.
    """
    with schedule_instance() as instance:
        working_directory = os.path.dirname(__file__)
        recon_repo = ReconstructableRepository.for_file(__file__, "doesnt_exist", working_directory)
        schedule = recon_repo.get_reconstructable_schedule("also_doesnt_exist")
        fake_origin = schedule.get_origin()
        initial_datetime = pendulum.datetime(
            year=2019, month=2, day=27, hour=23, minute=59, second=59,
        )
        with pendulum.test(initial_datetime):
            schedule_state = ScheduleState(
                fake_origin,
                ScheduleStatus.RUNNING,
                "0 0 * * *",
                pendulum.now("UTC").float_timestamp,
            )
            instance.add_schedule_state(schedule_state)

        # Advance one second past the cron boundary and run the scheduler.
        initial_datetime = initial_datetime.add(seconds=1)
        with pendulum.test(initial_datetime):
            launch_scheduled_runs(instance, get_default_scheduler_logger(), pendulum.now("UTC"))

            assert instance.get_runs_count() == 0

            ticks = instance.get_schedule_ticks(fake_origin.get_id())

            assert len(ticks) == 1
            assert ticks[0].status == ScheduleTickStatus.FAILURE
            assert ticks[0].timestamp == pendulum.now("UTC").float_timestamp
            assert "doesnt_exist not found at module scope in file" in ticks[0].error.message

            captured = capfd.readouterr()
            assert "Error launching scheduled run" in captured.out
            assert "doesnt_exist not found at module scope" in captured.out

        # A day later the scheduler should fail again (new tick, still no runs).
        initial_datetime = initial_datetime.add(days=1)
        with pendulum.test(initial_datetime):
            launch_scheduled_runs(instance, get_default_scheduler_logger(), pendulum.now("UTC"))
            assert instance.get_runs_count() == 0
            ticks = instance.get_schedule_ticks(fake_origin.get_id())
            assert len(ticks) == 2
            assert ticks[0].status == ScheduleTickStatus.FAILURE
            assert ticks[0].timestamp == pendulum.now("UTC").float_timestamp
            assert "doesnt_exist not found at module scope in file" in ticks[0].error.message

            captured = capfd.readouterr()
            assert "Error launching scheduled run" in captured.out
            assert "doesnt_exist not found at module scope" in captured.out
@pytest.mark.parametrize(
    "external_repo_context", [cli_api_repo, grpc_repo],
)
def test_multiple_schedules_on_different_time_ranges(external_repo_context, capfd):
    """Daily and hourly schedules sharing one scheduler pass fire independently.

    Starts both schedules just before midnight (viewed from US/Central),
    advances past midnight so both are due, then one more hour so only the
    hourly schedule is due. Asserts run counts, tick statuses, and the exact
    scheduler log output (timestamps rendered in the US/Central test tz).
    """
    with instance_with_schedules(external_repo_context) as (instance, external_repo):
        external_schedule = external_repo.get_external_schedule("simple_schedule")
        external_hourly_schedule = external_repo.get_external_schedule("simple_hourly_schedule")
        # Frozen time is UTC under the hood; .in_tz only changes how logs render.
        initial_datetime = pendulum.datetime(
            year=2019, month=2, day=27, hour=23, minute=59, second=59,
        ).in_tz("US/Central")
        with pendulum.test(initial_datetime):
            instance.start_schedule_and_update_storage_state(external_schedule)
            instance.start_schedule_and_update_storage_state(external_hourly_schedule)

        # Cross midnight UTC: both the daily and hourly schedules become due.
        initial_datetime = initial_datetime.add(seconds=2)
        with pendulum.test(initial_datetime):
            launch_scheduled_runs(
                instance, get_default_scheduler_logger(), pendulum.now("UTC"),
            )

            assert instance.get_runs_count() == 2
            ticks = instance.get_schedule_ticks(external_schedule.get_origin_id())
            assert len(ticks) == 1
            assert ticks[0].status == ScheduleTickStatus.SUCCESS

            hourly_ticks = instance.get_schedule_ticks(external_hourly_schedule.get_origin_id())
            assert len(hourly_ticks) == 1
            assert hourly_ticks[0].status == ScheduleTickStatus.SUCCESS

            captured = capfd.readouterr()
            assert (
                captured.out
                == """2019-02-27 18:00:01 - dagster-scheduler - INFO - Checking for new runs for the following schedules: simple_hourly_schedule, simple_schedule
2019-02-27 18:00:01 - dagster-scheduler - INFO - Launching run for simple_hourly_schedule at 2019-02-28 00:00:00+0000
2019-02-27 18:00:01 - dagster-scheduler - INFO - Completed scheduled launch of run {first_run_id} for simple_hourly_schedule
2019-02-27 18:00:01 - dagster-scheduler - INFO - Launching run for simple_schedule at 2019-02-28 00:00:00+0000
2019-02-27 18:00:01 - dagster-scheduler - INFO - Completed scheduled launch of run {second_run_id} for simple_schedule
""".format(
                    first_run_id=instance.get_runs()[1].run_id,
                    second_run_id=instance.get_runs()[0].run_id,
                )
            )

        # One hour later: only the hourly schedule should fire again.
        initial_datetime = initial_datetime.add(hours=1)
        with pendulum.test(initial_datetime):
            launch_scheduled_runs(instance, get_default_scheduler_logger(), pendulum.now("UTC"))

            assert instance.get_runs_count() == 3

            ticks = instance.get_schedule_ticks(external_schedule.get_origin_id())
            assert len(ticks) == 1
            assert ticks[0].status == ScheduleTickStatus.SUCCESS

            hourly_ticks = instance.get_schedule_ticks(external_hourly_schedule.get_origin_id())
            assert len(hourly_ticks) == 2
            assert (
                len([tick for tick in hourly_ticks if tick.status == ScheduleTickStatus.SUCCESS])
                == 2
            )

            captured = capfd.readouterr()
            assert (
                captured.out
                == """2019-02-27 19:00:01 - dagster-scheduler - INFO - Checking for new runs for the following schedules: simple_hourly_schedule, simple_schedule
2019-02-27 19:00:01 - dagster-scheduler - INFO - Launching run for simple_hourly_schedule at 2019-02-28 01:00:00+0000
2019-02-27 19:00:01 - dagster-scheduler - INFO - Completed scheduled launch of run {third_run_id} for simple_hourly_schedule
2019-02-27 19:00:01 - dagster-scheduler - INFO - No new runs for simple_schedule
""".format(
                    third_run_id=instance.get_runs()[0].run_id
                )
            )
@pytest.mark.parametrize(
    "external_repo_context", [cli_api_repo, grpc_repo],
)
def test_launch_failure(external_repo_context, capfd):
    """A run-launcher failure still records the run and a SUCCESS tick.

    Swaps in ExplodingRunLauncher so every launch raises. The scheduler must
    still create the run (validated as started-but-failed), mark the tick
    SUCCESS (the tick tracks run creation, not launch), and log the
    "created successfully but failed to launch" error.
    """
    with instance_with_schedules(
        external_repo_context,
        overrides={
            "run_launcher": {"module": "dagster.core.test_utils", "class": "ExplodingRunLauncher",},
        },
    ) as (instance, external_repo):
        external_schedule = external_repo.get_external_schedule("simple_schedule")
        schedule_origin = external_schedule.get_origin()
        initial_datetime = pendulum.datetime(
            year=2019, month=2, day=27, hour=0, minute=0, second=0,
        ).in_tz("US/Central")

        with pendulum.test(initial_datetime):
            instance.start_schedule_and_update_storage_state(external_schedule)

            launch_scheduled_runs(instance, get_default_scheduler_logger(), pendulum.now("UTC"))

            assert instance.get_runs_count() == 1

            run = instance.get_runs()[0]

            validate_run_started(run, initial_datetime, "2019-02-26", expected_success=False)

            ticks = instance.get_schedule_ticks(schedule_origin.get_id())
            assert len(ticks) == 1
            validate_tick(
                ticks[0],
                external_schedule,
                initial_datetime,
                ScheduleTickStatus.SUCCESS,
                run.run_id,
            )

            captured = capfd.readouterr()
            assert (
                captured.out
                == """2019-02-26 18:00:00 - dagster-scheduler - INFO - Checking for new runs for the following schedules: simple_schedule
2019-02-26 18:00:00 - dagster-scheduler - INFO - Launching run for simple_schedule at 2019-02-27 00:00:00+0000
2019-02-26 18:00:00 - dagster-scheduler - ERROR - Run {run_id} created successfully but failed to launch.
""".format(
                    run_id=instance.get_runs()[0].run_id
                )
            )
def test_max_catchup_runs(capfd):
    """A schedule that falls days behind only backfills `max_catchup_runs` runs.

    Starts a daily schedule, jumps the clock forward five days, and runs the
    scheduler with max_catchup_runs=2. Only the two most recent execution
    times (March 3 and March 4) should be launched, with a "fallen behind"
    warning in the log output.
    """
    initial_datetime = pendulum.datetime(
        year=2019, month=2, day=27, hour=23, minute=59, second=59
    ).in_tz("US/Central")
    with instance_with_schedules(grpc_repo) as (instance, external_repo):
        with pendulum.test(initial_datetime):
            external_schedule = external_repo.get_external_schedule("simple_schedule")
            schedule_origin = external_schedule.get_origin()
            instance.start_schedule_and_update_storage_state(external_schedule)

        initial_datetime = initial_datetime.add(days=5)
        with pendulum.test(initial_datetime):
            # Day is now March 4 at 11:59PM
            launch_scheduled_runs(
                instance, get_default_scheduler_logger(), pendulum.now("UTC"), max_catchup_runs=2,
            )

            assert instance.get_runs_count() == 2
            ticks = instance.get_schedule_ticks(schedule_origin.get_id())
            assert len(ticks) == 2

            # Most recent tick first: the March 4 execution (partition 2019-03-03).
            first_datetime = pendulum.datetime(year=2019, month=3, day=4)

            wait_for_all_runs_to_start(instance)

            validate_tick(
                ticks[0],
                external_schedule,
                first_datetime,
                ScheduleTickStatus.SUCCESS,
                instance.get_runs()[0].run_id,
            )
            validate_run_started(instance.get_runs()[0], first_datetime, "2019-03-03")

            second_datetime = pendulum.datetime(year=2019, month=3, day=3)

            validate_tick(
                ticks[1],
                external_schedule,
                second_datetime,
                ScheduleTickStatus.SUCCESS,
                instance.get_runs()[1].run_id,
            )

            validate_run_started(instance.get_runs()[1], second_datetime, "2019-03-02")

            captured = capfd.readouterr()
            assert (
                captured.out
                == """2019-03-04 17:59:59 - dagster-scheduler - INFO - Checking for new runs for the following schedules: simple_schedule
2019-03-04 17:59:59 - dagster-scheduler - WARNING - simple_schedule has fallen behind, only launching 2 runs
2019-03-04 17:59:59 - dagster-scheduler - INFO - Launching 2 runs for simple_schedule at the following times: 2019-03-03 00:00:00+0000, 2019-03-04 00:00:00+0000
2019-03-04 17:59:59 - dagster-scheduler - INFO - Completed scheduled launch of run {first_run_id} for simple_schedule
2019-03-04 17:59:59 - dagster-scheduler - INFO - Completed scheduled launch of run {second_run_id} for simple_schedule
""".format(
                    first_run_id=instance.get_runs()[1].run_id,
                    second_run_id=instance.get_runs()[0].run_id,
                )
            )