Differential D4671 Diff 23273 python_modules/dagster/dagster_tests/scheduler_tests/test_scheduler_failure_recovery.py
python_modules/dagster/dagster_tests/scheduler_tests/test_scheduler_failure_recovery.py
import signal

import pendulum
import pytest

from dagster.core.instance import DagsterInstance
from dagster.core.scheduler import ScheduleTickStatus
from dagster.core.storage.pipeline_run import PipelineRunStatus
from dagster.core.storage.tags import PARTITION_NAME_TAG, SCHEDULED_EXECUTION_TIME_TAG
from dagster.scheduler.scheduler import get_default_scheduler_logger, launch_scheduled_runs
from dagster.seven import multiprocessing

from .test_scheduler_run import (
    cli_api_repo,
    grpc_repo,
    instance_with_schedules,
    validate_run_started,
    validate_tick,
    wait_for_all_runs_to_start,
)
def _test_launch_scheduled_runs_in_subprocess(instance_ref, execution_datetime, debug_crash_flags):
    with DagsterInstance.from_ref(instance_ref) as instance:
        with pendulum.test(execution_datetime):
            launch_scheduled_runs(
                instance,
                get_default_scheduler_logger(),
                pendulum.now("UTC"),
                debug_crash_flags=debug_crash_flags,
            )
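
The helper re-pins the clock inside the subprocess, since pendulum's test clock does not carry across process boundaries. A minimal sketch of the pendulum.test() behavior it relies on (assuming pendulum 2.x, where this API exists; it was removed in pendulum 3.x):

# Illustrative only -- not part of this changeset.
import pendulum

frozen = pendulum.datetime(2019, 2, 27, tz="UTC")
with pendulum.test(frozen):
    # Inside the block, now() returns the pinned datetime
    assert pendulum.now("UTC") == frozen
# Outside the block, now() reverts to the real clock
assert pendulum.now("UTC") != frozen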
@pytest.mark.parametrize("external_repo_context", [cli_api_repo, grpc_repo]) | @pytest.mark.parametrize("external_repo_context", [cli_api_repo, grpc_repo]) | ||||
@pytest.mark.parametrize("crash_location", ["TICK_CREATED", "TICK_HELD"]) | @pytest.mark.parametrize("crash_location", ["TICK_CREATED", "TICK_HELD"]) | ||||
@pytest.mark.parametrize("crash_signal", [signal.SIGKILL, signal.SIGINT]) | @pytest.mark.parametrize("crash_signal", [signal.SIGKILL, signal.SIGINT]) | ||||
def test_failure_recovery_before_run_created( | def test_failure_recovery_before_run_created( | ||||
external_repo_context, crash_location, crash_signal, capfd | external_repo_context, crash_location, crash_signal, capfd | ||||
): | ): | ||||
with central_timezone(): | |||||
# Verify that if the scheduler crashes or is interrupted before a run is created, | # Verify that if the scheduler crashes or is interrupted before a run is created, | ||||
# it will create exactly one tick/run when it is re-launched | # it will create exactly one tick/run when it is re-launched | ||||
with instance_with_schedules(external_repo_context) as (instance, external_repo): | with instance_with_schedules(external_repo_context) as (instance, external_repo): | ||||
initial_datetime = datetime( | initial_datetime = pendulum.datetime( | ||||
year=2019, month=2, day=27, hour=0, minute=0, second=0, tzinfo=get_utc_timezone(), | year=2019, month=2, day=27, hour=0, minute=0, second=0 | ||||
) | ).in_tz("US/Central") | ||||
frozen_datetime = initial_datetime.add() | |||||
external_schedule = external_repo.get_external_schedule("simple_schedule") | external_schedule = external_repo.get_external_schedule("simple_schedule") | ||||
with freeze_time(initial_datetime) as frozen_datetime: | with pendulum.test(frozen_datetime): | ||||
instance.start_schedule_and_update_storage_state(external_schedule) | instance.start_schedule_and_update_storage_state(external_schedule) | ||||
debug_crash_flags = {external_schedule.name: {crash_location: crash_signal}} | debug_crash_flags = {external_schedule.name: {crash_location: crash_signal}} | ||||
scheduler_process = multiprocessing.Process( | scheduler_process = multiprocessing.Process( | ||||
target=_test_launch_scheduled_runs_in_subprocess, | target=_test_launch_scheduled_runs_in_subprocess, | ||||
args=[instance.get_ref(), get_current_datetime_in_utc(), debug_crash_flags], | args=[instance.get_ref(), frozen_datetime, debug_crash_flags], | ||||
) | ) | ||||
scheduler_process.start() | scheduler_process.start() | ||||
scheduler_process.join(timeout=60) | scheduler_process.join(timeout=60) | ||||
assert scheduler_process.exitcode != 0 | assert scheduler_process.exitcode != 0 | ||||
captured = capfd.readouterr() | captured = capfd.readouterr() | ||||
assert ( | assert ( | ||||
captured.out | captured.out | ||||
== """2019-02-26 18:00:00 - dagster-scheduler - INFO - Checking for new runs for the following schedules: simple_schedule | == """2019-02-26 18:00:00 - dagster-scheduler - INFO - Checking for new runs for the following schedules: simple_schedule | ||||
2019-02-26 18:00:00 - dagster-scheduler - INFO - Launching run for simple_schedule at 2019-02-27 00:00:00+0000 | 2019-02-26 18:00:00 - dagster-scheduler - INFO - Launching run for simple_schedule at 2019-02-27 00:00:00+0000 | ||||
""" | """ | ||||
) | ) | ||||
ticks = instance.get_schedule_ticks(external_schedule.get_origin_id()) | ticks = instance.get_schedule_ticks(external_schedule.get_origin_id()) | ||||
assert len(ticks) == 1 | assert len(ticks) == 1 | ||||
assert ticks[0].status == ScheduleTickStatus.STARTED | assert ticks[0].status == ScheduleTickStatus.STARTED | ||||
assert instance.get_runs_count() == 0 | assert instance.get_runs_count() == 0 | ||||
frozen_datetime.tick(delta=timedelta(minutes=5)) | frozen_datetime = frozen_datetime.add(minutes=5) | ||||
with pendulum.test(frozen_datetime): | |||||
scheduler_process = multiprocessing.Process( | scheduler_process = multiprocessing.Process( | ||||
target=_test_launch_scheduled_runs_in_subprocess, | target=_test_launch_scheduled_runs_in_subprocess, | ||||
args=[instance.get_ref(), get_current_datetime_in_utc(), None], | args=[instance.get_ref(), frozen_datetime, None], | ||||
) | ) | ||||
scheduler_process.start() | scheduler_process.start() | ||||
scheduler_process.join(timeout=60) | scheduler_process.join(timeout=60) | ||||
assert scheduler_process.exitcode == 0 | assert scheduler_process.exitcode == 0 | ||||
assert instance.get_runs_count() == 1 | assert instance.get_runs_count() == 1 | ||||
wait_for_all_runs_to_start(instance) | wait_for_all_runs_to_start(instance) | ||||
validate_run_started(instance.get_runs()[0], initial_datetime, "2019-02-26") | validate_run_started(instance.get_runs()[0], initial_datetime, "2019-02-26") | ||||
ticks = instance.get_schedule_ticks(external_schedule.get_origin_id()) | ticks = instance.get_schedule_ticks(external_schedule.get_origin_id()) | ||||
assert len(ticks) == 1 | assert len(ticks) == 1 | ||||
validate_tick( | validate_tick( | ||||
ticks[0], | ticks[0], | ||||
external_schedule, | external_schedule, | ||||
initial_datetime, | initial_datetime, | ||||
ScheduleTickStatus.SUCCESS, | ScheduleTickStatus.SUCCESS, | ||||
instance.get_runs()[0].run_id, | instance.get_runs()[0].run_id, | ||||
) | ) | ||||
captured = capfd.readouterr() | captured = capfd.readouterr() | ||||
assert ( | assert ( | ||||
captured.out | captured.out | ||||
== """2019-02-26 18:05:00 - dagster-scheduler - INFO - Checking for new runs for the following schedules: simple_schedule | == """2019-02-26 18:05:00 - dagster-scheduler - INFO - Checking for new runs for the following schedules: simple_schedule | ||||
2019-02-26 18:05:00 - dagster-scheduler - INFO - Launching run for simple_schedule at 2019-02-27 00:00:00+0000 | 2019-02-26 18:05:00 - dagster-scheduler - INFO - Launching run for simple_schedule at 2019-02-27 00:00:00+0000 | ||||
2019-02-26 18:05:00 - dagster-scheduler - INFO - Resuming previously interrupted schedule execution | 2019-02-26 18:05:00 - dagster-scheduler - INFO - Resuming previously interrupted schedule execution | ||||
2019-02-26 18:05:00 - dagster-scheduler - INFO - Completed scheduled launch of run {run_id} for simple_schedule | 2019-02-26 18:05:00 - dagster-scheduler - INFO - Completed scheduled launch of run {run_id} for simple_schedule | ||||
""".format( | """.format( | ||||
run_id=instance.get_runs()[0].run_id | run_id=instance.get_runs()[0].run_id | ||||
) | ) | ||||
) | ) | ||||
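
Each test injects a crash by passing debug_crash_flags down to launch_scheduled_runs, keyed first by schedule name and then by crash location. A plausible shape for the check the scheduler would run at each named location (hypothetical helper; the real implementation lives in dagster/scheduler/scheduler.py and is not part of this diff):

# Hypothetical sketch of how the scheduler might consume the flags.
import os
import time

def _check_for_debug_crash(debug_crash_flags, key):
    # debug_crash_flags here is the per-schedule dict, e.g. {"TICK_CREATED": signal.SIGKILL}
    if not debug_crash_flags:
        return
    kill_signal = debug_crash_flags.get(key)
    if not kill_signal:
        return
    os.kill(os.getpid(), kill_signal)  # the scheduler process signals itself
    time.sleep(10)
    raise Exception("Process didn't terminate after sending crash signal")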
@pytest.mark.parametrize("external_repo_context", [cli_api_repo, grpc_repo])
@pytest.mark.parametrize("crash_location", ["RUN_CREATED", "RUN_LAUNCHED"])
@pytest.mark.parametrize("crash_signal", [signal.SIGKILL, signal.SIGINT])
def test_failure_recovery_after_run_created(
    external_repo_context, crash_location, crash_signal, capfd
):
    # Verify that if the scheduler crashes or is interrupted after a run is created,
    # it will just re-launch the already-created run when it runs again
    with instance_with_schedules(external_repo_context) as (instance, external_repo):
        initial_datetime = pendulum.datetime(year=2019, month=2, day=27, hour=0, minute=0, second=0)
        frozen_datetime = initial_datetime.add()
        external_schedule = external_repo.get_external_schedule("simple_schedule")
        with pendulum.test(frozen_datetime):
            instance.start_schedule_and_update_storage_state(external_schedule)

            debug_crash_flags = {external_schedule.name: {crash_location: crash_signal}}

            scheduler_process = multiprocessing.Process(
                target=_test_launch_scheduled_runs_in_subprocess,
                args=[instance.get_ref(), frozen_datetime, debug_crash_flags],
            )
            scheduler_process.start()
            scheduler_process.join(timeout=60)
            assert scheduler_process.exitcode != 0

            capfd.readouterr()

            ticks = instance.get_schedule_ticks(external_schedule.get_origin_id())
            assert len(ticks) == 1
            assert ticks[0].status == ScheduleTickStatus.STARTED

            assert instance.get_runs_count() == 1

            if crash_location == "RUN_CREATED":
                run = instance.get_runs()[0]
                # Run was created, but hasn't launched yet
                assert run.tags[SCHEDULED_EXECUTION_TIME_TAG] == frozen_datetime.isoformat()
                assert run.tags[PARTITION_NAME_TAG] == "2019-02-26"
                assert run.status == PipelineRunStatus.NOT_STARTED
            else:
                # The run was created and launched - running again should do nothing other
                # than move the tick to the SUCCESS state.
                #
                # Needing the wait below indicates a remaining theoretical race: if the
                # scheduler fails after launching a run and then runs again in the window
                # between the launch and the executor moving the run to STARTED, the same
                # run could be launched twice. Run queueing, or some other way to
                # immediately mark a run as launched, would eliminate the race. For now,
                # rule it out by waiting for the run to start before running the scheduler
                # again (see the polling sketch after this test).
                wait_for_all_runs_to_start(instance)
                run = instance.get_runs()[0]
                validate_run_started(instance.get_runs()[0], frozen_datetime, "2019-02-26")
                assert run.status in [PipelineRunStatus.STARTED, PipelineRunStatus.SUCCESS]

        frozen_datetime = frozen_datetime.add(minutes=5)
        with pendulum.test(frozen_datetime):
            # Running again just launches the existing run and marks the tick as success
            scheduler_process = multiprocessing.Process(
                target=_test_launch_scheduled_runs_in_subprocess,
                args=[instance.get_ref(), frozen_datetime, None],
            )
            scheduler_process.start()
            scheduler_process.join(timeout=60)
            assert scheduler_process.exitcode == 0

            assert instance.get_runs_count() == 1
            wait_for_all_runs_to_start(instance)
            validate_run_started(instance.get_runs()[0], initial_datetime, "2019-02-26")

# ... 27 lines not shown ...
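
wait_for_all_runs_to_start, imported from test_scheduler_run.py, closes the race described in the comment above by polling the instance until no run is still in NOT_STARTED. Its body sits outside this diff; a sketch of what such a polling helper might look like:

# Hypothetical sketch -- the real helper is defined in test_scheduler_run.py.
import time

from dagster.core.storage.pipeline_run import PipelineRunStatus

def wait_for_all_runs_to_start(instance, timeout=10):
    start_time = time.time()
    while True:
        if time.time() - start_time > timeout:
            raise Exception("Timed out waiting for runs to start")
        pending = [
            run for run in instance.get_runs()
            if run.status == PipelineRunStatus.NOT_STARTED
        ]
        if not pending:
            return
        time.sleep(0.5)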
@pytest.mark.parametrize("external_repo_context", [cli_api_repo, grpc_repo])
@pytest.mark.parametrize("crash_location", ["TICK_SUCCESS"])
@pytest.mark.parametrize("crash_signal", [signal.SIGKILL, signal.SIGINT])
def test_failure_recovery_after_tick_success(external_repo_context, crash_location, crash_signal):
    # Verify that if the scheduler crashes or is interrupted while marking the tick
    # successful (after the run has already launched), re-running marks the tick as
    # SUCCESS without launching a duplicate run
    with instance_with_schedules(external_repo_context) as (instance, external_repo):
        initial_datetime = pendulum.datetime(year=2019, month=2, day=27, hour=0, minute=0, second=0)
        frozen_datetime = initial_datetime.add()
        external_schedule = external_repo.get_external_schedule("simple_schedule")
        with pendulum.test(frozen_datetime):
            instance.start_schedule_and_update_storage_state(external_schedule)

            debug_crash_flags = {external_schedule.name: {crash_location: crash_signal}}

            scheduler_process = multiprocessing.Process(
                target=_test_launch_scheduled_runs_in_subprocess,
                args=[instance.get_ref(), frozen_datetime, debug_crash_flags],
            )
            scheduler_process.start()
            scheduler_process.join(timeout=60)
            assert scheduler_process.exitcode != 0

            # As above, there is a possible race if we crash right after the launch and
            # the re-run launches the same run twice

            # ... 14 lines not shown ...

            validate_tick(
                ticks[0],
                external_schedule,
                initial_datetime,
                ScheduleTickStatus.SUCCESS,
                instance.get_runs()[0].run_id,
            )

        frozen_datetime = frozen_datetime.add(minutes=1)
        with pendulum.test(frozen_datetime):
            # Running again just marks the tick as success since the run has already started
            scheduler_process = multiprocessing.Process(
                target=_test_launch_scheduled_runs_in_subprocess,
                args=[instance.get_ref(), frozen_datetime, None],
            )
            scheduler_process.start()
            scheduler_process.join(timeout=60)
            assert scheduler_process.exitcode == 0

            assert instance.get_runs_count() == 1
            validate_run_started(instance.get_runs()[0], initial_datetime, "2019-02-26")

# ... 9 lines not shown ...
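
The exitcode assertions in all three tests lean on documented multiprocessing behavior: a child terminated by a signal reports the negated signal number, and a SIGINT that surfaces as KeyboardInterrupt also exits nonzero, so every crash variant trips the "exitcode != 0" assertion. A standalone illustration:

# Standalone illustration of multiprocessing exitcode semantics (POSIX only).
import os
import signal
import time
from multiprocessing import Process

def _sleep_forever():
    time.sleep(30)

if __name__ == "__main__":
    p = Process(target=_sleep_forever)
    p.start()
    os.kill(p.pid, signal.SIGKILL)
    p.join()
    assert p.exitcode == -signal.SIGKILL  # killed by signal 9 -> exitcode -9
    # SIGINT instead raises KeyboardInterrupt in the child, which also exits nonzero.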