Differential D5228 Diff 26378 python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_failure_recovery.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_failure_recovery.py
import pendulum | import pendulum | ||||
import pytest | import pytest | ||||
from dagster.core.definitions.job import JobType | from dagster.core.definitions.job import JobType | ||||
from dagster.core.instance import DagsterInstance | from dagster.core.instance import DagsterInstance | ||||
from dagster.core.scheduler.job import JobState, JobStatus, JobTickStatus | from dagster.core.scheduler.job import JobState, JobStatus, JobTickStatus | ||||
from dagster.core.storage.pipeline_run import PipelineRunStatus | from dagster.core.storage.pipeline_run import PipelineRunStatus | ||||
from dagster.core.storage.tags import EXECUTION_KEY_TAG, SENSOR_NAME_TAG | from dagster.core.storage.tags import RUN_KEY_TAG, SENSOR_NAME_TAG | ||||
from dagster.core.test_utils import cleanup_test_instance, get_crash_signals | from dagster.core.test_utils import cleanup_test_instance, get_crash_signals | ||||
from dagster.daemon import get_default_daemon_logger | from dagster.daemon import get_default_daemon_logger | ||||
from dagster.scheduler.sensor import execute_sensor_iteration | from dagster.scheduler.sensor import execute_sensor_iteration | ||||
from dagster.seven import IS_WINDOWS, multiprocessing | from dagster.seven import IS_WINDOWS, multiprocessing | ||||
from .test_sensor_run import instance_with_sensors, repos, wait_for_all_runs_to_start | from .test_sensor_run import instance_with_sensors, repos, wait_for_all_runs_to_start | ||||
▲ Show 20 Lines • Show All 108 Lines • ▼ Show 20 Lines | |||||
def test_failure_after_run_created_before_run_launched(
    external_repo_context, crash_location, crash_signal, capfd
):
    """Crash a sensor iteration after the run is created, then verify recovery.

    A first subprocess runs the sensor iteration with debug crash flags set for
    ``crash_location`` / ``crash_signal`` and is expected to exit abnormally,
    leaving a single ``STARTED`` tick and a single ``NOT_STARTED`` run tagged
    with the run key ``only_once``.  A second subprocess, started one second
    later (in frozen time) without crash flags, must exit cleanly, must not
    create a second run, must log that the run already exists, and must move
    the tick to ``SUCCESS``.

    Args:
        external_repo_context: Context manager factory handed to
            ``instance_with_sensors``.
        crash_location: Name of the crash point placed in the debug crash flags.
        crash_signal: Signal placed in the debug crash flags for that crash point.
        capfd: pytest fixture used to capture the daemon's stdout/stderr.
    """
    frozen_datetime = pendulum.datetime(
        year=2019, month=2, day=28, hour=0, minute=0, second=0,
    ).in_tz("US/Central")
    with instance_with_sensors(external_repo_context) as (instance, external_repo):
        with pendulum.test(frozen_datetime):
            external_sensor = external_repo.get_external_sensor("run_key_sensor")
            instance.add_job_state(
                JobState(external_sensor.get_external_origin(), JobType.SENSOR, JobStatus.RUNNING)
            )

            # create a starting tick, but crash
            debug_crash_flags = {external_sensor.name: {crash_location: crash_signal}}
            launch_process = multiprocessing.Process(
                target=_test_launch_sensor_runs_in_subprocess,
                args=[instance.get_ref(), frozen_datetime, debug_crash_flags],
            )
            launch_process.start()
            launch_process.join(timeout=60)

            # join() returns silently on timeout and leaves exitcode as None, which
            # would satisfy `!= 0` without the process ever having crashed.
            assert (
                launch_process.exitcode is not None
            ), "sensor subprocess did not terminate within 60 seconds"
            assert launch_process.exitcode != 0

            ticks = instance.get_job_ticks(external_sensor.get_external_origin_id())

            assert len(ticks) == 1
            assert ticks[0].status == JobTickStatus.STARTED

            assert instance.get_runs_count() == 1

            run = instance.get_runs()[0]
            # Run was created, but hasn't launched yet
            assert run.status == PipelineRunStatus.NOT_STARTED
            assert run.tags.get(SENSOR_NAME_TAG) == "run_key_sensor"
            assert run.tags.get(RUN_KEY_TAG) == "only_once"

            # clear output
            capfd.readouterr()

            # Second iteration: no crash flags, one second later in frozen time.
            launch_process = multiprocessing.Process(
                target=_test_launch_sensor_runs_in_subprocess,
                args=[instance.get_ref(), frozen_datetime.add(seconds=1), None],
            )
            launch_process.start()
            launch_process.join(timeout=60)

            assert launch_process.exitcode == 0
            wait_for_all_runs_to_start(instance)

            # The existing run must be reused rather than duplicated.
            assert instance.get_runs_count() == 1
            run = instance.get_runs()[0]
            captured = capfd.readouterr()

            assert (
                f"Run {run.run_id} already created with the run key `only_once` for run_key_sensor"
                in captured.out
            )

            ticks = instance.get_job_ticks(external_sensor.get_external_origin_id())
            assert len(ticks) == 1
            assert ticks[0].status == JobTickStatus.SUCCESS
@pytest.mark.skipif(
    IS_WINDOWS, reason="Windows keeps resources open after termination in a flaky way"
)
@pytest.mark.parametrize("external_repo_context", repos())
@pytest.mark.parametrize("crash_location", ["RUN_LAUNCHED"])
@pytest.mark.parametrize("crash_signal", get_crash_signals())
def test_failure_after_run_launched(external_repo_context, crash_location, crash_signal, capfd):
    """Crash a sensor iteration after the run is launched, then verify recovery.

    A first subprocess runs the sensor iteration with debug crash flags set for
    ``crash_location`` / ``crash_signal`` and is expected to exit abnormally,
    leaving a single ``STARTED`` tick and a single run tagged with the run key
    ``only_once``.  A second subprocess, started one second later (in frozen
    time) without crash flags, must exit cleanly, must not create a second run,
    must log that the run already completed, and must move the tick to
    ``SUCCESS``.

    Args:
        external_repo_context: Context manager factory handed to
            ``instance_with_sensors``.
        crash_location: Name of the crash point placed in the debug crash flags.
        crash_signal: Signal placed in the debug crash flags for that crash point.
        capfd: pytest fixture used to capture the daemon's stdout/stderr.
    """
    frozen_datetime = pendulum.datetime(
        year=2019, month=2, day=28, hour=0, minute=0, second=0,
    ).in_tz("US/Central")
    with instance_with_sensors(external_repo_context) as (instance, external_repo):
        with pendulum.test(frozen_datetime):
            external_sensor = external_repo.get_external_sensor("run_key_sensor")
            instance.add_job_state(
                JobState(external_sensor.get_external_origin(), JobType.SENSOR, JobStatus.RUNNING)
            )

            # create a run, launch but crash
            debug_crash_flags = {external_sensor.name: {crash_location: crash_signal}}
            launch_process = multiprocessing.Process(
                target=_test_launch_sensor_runs_in_subprocess,
                args=[instance.get_ref(), frozen_datetime, debug_crash_flags],
            )
            launch_process.start()
            launch_process.join(timeout=60)

            # join() returns silently on timeout and leaves exitcode as None, which
            # would satisfy `!= 0` without the process ever having crashed.
            assert (
                launch_process.exitcode is not None
            ), "sensor subprocess did not terminate within 60 seconds"
            assert launch_process.exitcode != 0

            ticks = instance.get_job_ticks(external_sensor.get_external_origin_id())

            assert len(ticks) == 1
            assert ticks[0].status == JobTickStatus.STARTED

            assert instance.get_runs_count() == 1

            run = instance.get_runs()[0]
            wait_for_all_runs_to_start(instance)

            assert run.tags.get(SENSOR_NAME_TAG) == "run_key_sensor"
            assert run.tags.get(RUN_KEY_TAG) == "only_once"

            # Discard output captured so far so only the second iteration is inspected.
            capfd.readouterr()

            # Second iteration: no crash flags, one second later in frozen time.
            launch_process = multiprocessing.Process(
                target=_test_launch_sensor_runs_in_subprocess,
                args=[instance.get_ref(), frozen_datetime.add(seconds=1), None],
            )
            launch_process.start()
            launch_process.join(timeout=60)

            assert launch_process.exitcode == 0
            wait_for_all_runs_to_start(instance)

            # The existing run must be reused rather than duplicated.
            assert instance.get_runs_count() == 1
            run = instance.get_runs()[0]
            captured = capfd.readouterr()

            assert (
                f"Run {run.run_id} already completed with the run key `only_once` for run_key_sensor"
                in captured.out
            )

            ticks = instance.get_job_ticks(external_sensor.get_external_origin_id())
            assert len(ticks) == 1
            assert ticks[0].status == JobTickStatus.SUCCESS