Differential D4718 Diff 24113 integration_tests/test_suites/k8s-integration-test-suite/test_scheduler.py
Changeset View
Changeset View
Standalone View
Standalone View
integration_tests/test_suites/k8s-integration-test-suite/test_scheduler.py
Show First 20 Lines • Show All 89 Lines • ▼ Show 20 Lines | |||||
def get_smaller_external_repo(): | def get_smaller_external_repo(): | ||||
with environ({"DAGSTER_TEST_SMALL_REPO": "1"}): | with environ({"DAGSTER_TEST_SMALL_REPO": "1"}): | ||||
return get_test_external_repo() | return get_test_external_repo() | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_init( | def test_init( | ||||
dagster_instance_with_k8s_scheduler, schedule_tempdir, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, | ||||
schedule_tempdir, | |||||
helm_namespace_for_k8s_run_launcher, | |||||
restore_k8s_cron_tab, | |||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repository = get_test_external_repo() | external_repository = get_test_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repository) | instance.reconcile_scheduler_state(external_repository) | ||||
# Check schedules are saved to disk | # Check schedules are saved to disk | ||||
assert "schedules" in os.listdir(schedule_tempdir) | assert "schedules" in os.listdir(schedule_tempdir) | ||||
assert len(instance.all_stored_schedule_state()) == 3 | assert len(instance.all_stored_schedule_state()) == 3 | ||||
schedules = instance.all_stored_schedule_state() | schedules = instance.all_stored_schedule_state() | ||||
for schedule in schedules: | for schedule in schedules: | ||||
assert schedule.status == ScheduleStatus.STOPPED | assert schedule.status == ScheduleStatus.STOPPED | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_re_init( | def test_re_init( | ||||
dagster_instance_with_k8s_scheduler, schedule_tempdir, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, | ||||
schedule_tempdir, | |||||
helm_namespace_for_k8s_run_launcher, | |||||
restore_k8s_cron_tab, | |||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
# Start schedule | # Start schedule | ||||
Show All 11 Lines | ): # pylint:disable=unused-argument | ||||
for state in schedule_states: | for state in schedule_states: | ||||
if state.name == "no_config_pipeline_every_min_schedule": | if state.name == "no_config_pipeline_every_min_schedule": | ||||
assert state == schedule_state | assert state == schedule_state | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_start_and_stop_schedule( | def test_start_and_stop_schedule( | ||||
dagster_instance_with_k8s_scheduler, schedule_tempdir, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, | ||||
schedule_tempdir, | |||||
helm_namespace_for_k8s_run_launcher, | |||||
restore_k8s_cron_tab, | |||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
schedule = external_repo.get_external_schedule( | schedule = external_repo.get_external_schedule( | ||||
schedule_name="no_config_pipeline_every_min_schedule" | schedule_name="no_config_pipeline_every_min_schedule" | ||||
) | ) | ||||
schedule_origin_id = schedule.get_origin_id() | schedule_origin_id = schedule.get_origin_id() | ||||
instance.start_schedule_and_update_storage_state(external_schedule=schedule) | instance.start_schedule_and_update_storage_state(external_schedule=schedule) | ||||
assert "schedules" in os.listdir(schedule_tempdir) | assert "schedules" in os.listdir(schedule_tempdir) | ||||
assert instance.scheduler.get_cron_job(schedule_origin_id=schedule_origin_id) | assert instance.scheduler.get_cron_job(schedule_origin_id=schedule_origin_id) | ||||
instance.stop_schedule_and_update_storage_state(schedule_origin_id=schedule_origin_id) | instance.stop_schedule_and_update_storage_state(schedule_origin_id=schedule_origin_id) | ||||
assert not instance.scheduler.get_cron_job(schedule_origin_id=schedule_origin_id) | assert not instance.scheduler.get_cron_job(schedule_origin_id=schedule_origin_id) | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_start_non_existent_schedule( | def test_start_non_existent_schedule( | ||||
dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, | ||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
with pytest.raises(DagsterScheduleDoesNotExist): | with pytest.raises(DagsterScheduleDoesNotExist): | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.stop_schedule_and_update_storage_state("asdf") | instance.stop_schedule_and_update_storage_state("asdf") | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_start_schedule_cron_job( | def test_start_schedule_cron_job( | ||||
dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, | ||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
instance.start_schedule_and_update_storage_state( | instance.start_schedule_and_update_storage_state( | ||||
Show All 30 Lines | for cron_job in cron_jobs: | ||||
"launch_scheduled_execution", | "launch_scheduled_execution", | ||||
"/tmp/launch_scheduled_execution_output", | "/tmp/launch_scheduled_execution_output", | ||||
"--schedule_name", | "--schedule_name", | ||||
] | ] | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_remove_schedule_def( | def test_remove_schedule_def( | ||||
dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, | ||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
assert len(instance.all_stored_schedule_state()) == 3 | assert len(instance.all_stored_schedule_state()) == 3 | ||||
instance.reconcile_scheduler_state(get_smaller_external_repo()) | instance.reconcile_scheduler_state(get_smaller_external_repo()) | ||||
assert len(instance.all_stored_schedule_state()) == 2 | assert len(instance.all_stored_schedule_state()) == 2 | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_add_schedule_def( | def test_add_schedule_def( | ||||
dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, | ||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repo = get_smaller_external_repo() | external_repo = get_smaller_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(get_smaller_external_repo()) | instance.reconcile_scheduler_state(get_smaller_external_repo()) | ||||
# Start all schedule and verify cron tab, schedule storage, and errors | # Start all schedule and verify cron tab, schedule storage, and errors | ||||
Show All 23 Lines | ): # pylint:disable=unused-argument | ||||
assert len(instance.all_stored_schedule_state()) == 3 | assert len(instance.all_stored_schedule_state()) == 3 | ||||
assert len(instance.scheduler.get_all_cron_jobs()) == 3 | assert len(instance.scheduler.get_all_cron_jobs()) == 3 | ||||
assert len(instance.scheduler_debug_info().errors) == 0 | assert len(instance.scheduler_debug_info().errors) == 0 | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_start_and_stop_schedule_cron_tab( | def test_start_and_stop_schedule_cron_tab( | ||||
dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, | ||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
# Start schedule | # Start schedule | ||||
instance.start_schedule_and_update_storage_state( | instance.start_schedule_and_update_storage_state( | ||||
▲ Show 20 Lines • Show All 73 Lines • ▼ Show 20 Lines | ): # pylint:disable=unused-argument | ||||
# Reconcile schedule state, should be in the same state | # Reconcile schedule state, should be in the same state | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
cron_jobs = instance.scheduler.get_all_cron_jobs() | cron_jobs = instance.scheduler.get_all_cron_jobs() | ||||
assert len(cron_jobs) == 0 | assert len(cron_jobs) == 0 | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_script_execution( | def test_script_execution( | ||||
dagster_instance_with_k8s_scheduler, unset_dagster_home, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, | ||||
unset_dagster_home, | |||||
helm_namespace_for_k8s_run_launcher, | |||||
restore_k8s_cron_tab, | |||||
): # pylint:disable=unused-argument,redefined-outer-name | ): # pylint:disable=unused-argument,redefined-outer-name | ||||
with seven.TemporaryDirectory() as tempdir: | with seven.TemporaryDirectory() as tempdir: | ||||
with environ({"DAGSTER_HOME": tempdir}): | with environ({"DAGSTER_HOME": tempdir}): | ||||
local_instance = DagsterInstance.get() | local_instance = DagsterInstance.get() | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
dagster_instance_with_k8s_scheduler.reconcile_scheduler_state(external_repo) | dagster_instance_with_k8s_scheduler.reconcile_scheduler_state(external_repo) | ||||
dagster_instance_with_k8s_scheduler.start_schedule_and_update_storage_state( | dagster_instance_with_k8s_scheduler.start_schedule_and_update_storage_state( | ||||
external_repo.get_external_schedule("no_config_pipeline_every_min_schedule") | external_repo.get_external_schedule("no_config_pipeline_every_min_schedule") | ||||
) | ) | ||||
local_runs = local_instance.get_runs() | local_runs = local_instance.get_runs() | ||||
assert len(local_runs) == 0 | assert len(local_runs) == 0 | ||||
cron_job_name = external_repo.get_external_schedule( | cron_job_name = external_repo.get_external_schedule( | ||||
"no_config_pipeline_every_min_schedule" | "no_config_pipeline_every_min_schedule" | ||||
).get_origin_id() | ).get_origin_id() | ||||
batch_v1beta1_api = kubernetes.client.BatchV1beta1Api() | batch_v1beta1_api = kubernetes.client.BatchV1beta1Api() | ||||
cron_job = batch_v1beta1_api.read_namespaced_cron_job(cron_job_name, helm_namespace) | cron_job = batch_v1beta1_api.read_namespaced_cron_job( | ||||
cron_job_name, helm_namespace_for_k8s_run_launcher | |||||
) | |||||
container = cron_job.spec.job_template.spec.template.spec.containers[0] | container = cron_job.spec.job_template.spec.template.spec.containers[0] | ||||
command = container.command | command = container.command | ||||
args = container.args | args = container.args | ||||
cli_cmd = [sys.executable, "-m"] + command + args | cli_cmd = [sys.executable, "-m"] + command + args | ||||
p = subprocess.Popen( | p = subprocess.Popen( | ||||
cli_cmd, | cli_cmd, | ||||
stdin=subprocess.PIPE, | stdin=subprocess.PIPE, | ||||
Show All 17 Lines | with seven.TemporaryDirectory() as tempdir: | ||||
pipeline_run = local_instance.get_run_by_id(run_id) | pipeline_run = local_instance.get_run_by_id(run_id) | ||||
assert pipeline_run | assert pipeline_run | ||||
assert pipeline_run.status == PipelineRunStatus.SUCCESS | assert pipeline_run.status == PipelineRunStatus.SUCCESS | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_start_schedule_fails( | def test_start_schedule_fails( | ||||
dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, | ||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
def raises(*args, **kwargs): | def raises(*args, **kwargs): | ||||
Show All 9 Lines | schedule = instance.get_schedule_state( | ||||
external_repo.get_external_schedule("no_config_pipeline_every_min_schedule").get_origin_id() | external_repo.get_external_schedule("no_config_pipeline_every_min_schedule").get_origin_id() | ||||
) | ) | ||||
assert schedule.status == ScheduleStatus.STOPPED | assert schedule.status == ScheduleStatus.STOPPED | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_start_schedule_unsuccessful( | def test_start_schedule_unsuccessful( | ||||
dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, | ||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
def do_nothing(**_): | def do_nothing(**_): | ||||
Show All 11 Lines | ): # pylint:disable=unused-argument | ||||
): | ): | ||||
instance.start_schedule_and_update_storage_state( | instance.start_schedule_and_update_storage_state( | ||||
external_repo.get_external_schedule("no_config_pipeline_every_min_schedule") | external_repo.get_external_schedule("no_config_pipeline_every_min_schedule") | ||||
) | ) | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_start_schedule_manual_delete_debug( | def test_start_schedule_manual_delete_debug( | ||||
dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, | ||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
instance.start_schedule_and_update_storage_state( | instance.start_schedule_and_update_storage_state( | ||||
Show All 16 Lines | ): # pylint:disable=unused-argument | ||||
# Reconcile should fix error | # Reconcile should fix error | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
debug_info = instance.scheduler_debug_info() | debug_info = instance.scheduler_debug_info() | ||||
assert len(debug_info.errors) == 0 | assert len(debug_info.errors) == 0 | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_start_schedule_manual_add_debug( | def test_start_schedule_manual_add_debug( | ||||
dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, | ||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
# Manually add the schedule from to the crontab | # Manually add the schedule from to the crontab | ||||
instance.scheduler._start_cron_job( # pylint: disable=protected-access | instance.scheduler._start_cron_job( # pylint: disable=protected-access | ||||
external_repo.get_external_schedule("no_config_pipeline_every_min_schedule") | external_repo.get_external_schedule("no_config_pipeline_every_min_schedule") | ||||
) | ) | ||||
# Check debug command | # Check debug command | ||||
debug_info = instance.scheduler_debug_info() | debug_info = instance.scheduler_debug_info() | ||||
assert len(debug_info.errors) == 1 | assert len(debug_info.errors) == 1 | ||||
# Reconcile should fix error | # Reconcile should fix error | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
debug_info = instance.scheduler_debug_info() | debug_info = instance.scheduler_debug_info() | ||||
assert len(debug_info.errors) == 0 | assert len(debug_info.errors) == 0 | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_stop_schedule_fails( | def test_stop_schedule_fails( | ||||
dagster_instance_with_k8s_scheduler, schedule_tempdir, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, | ||||
schedule_tempdir, | |||||
helm_namespace_for_k8s_run_launcher, | |||||
restore_k8s_cron_tab, | |||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
external_schedule = external_repo.get_external_schedule("no_config_pipeline_every_min_schedule") | external_schedule = external_repo.get_external_schedule("no_config_pipeline_every_min_schedule") | ||||
Show All 14 Lines | ): # pylint:disable=unused-argument | ||||
schedule = instance.get_schedule_state(schedule_origin_id) | schedule = instance.get_schedule_state(schedule_origin_id) | ||||
assert schedule.status == ScheduleStatus.RUNNING | assert schedule.status == ScheduleStatus.RUNNING | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_stop_schedule_unsuccessful( | def test_stop_schedule_unsuccessful( | ||||
dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, | ||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
def do_nothing(**_): | def do_nothing(**_): | ||||
Show All 15 Lines | ): | ||||
external_repo.get_external_schedule( | external_repo.get_external_schedule( | ||||
"no_config_pipeline_every_min_schedule" | "no_config_pipeline_every_min_schedule" | ||||
).get_origin_id() | ).get_origin_id() | ||||
) | ) | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_wipe( | def test_wipe( | ||||
dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab | dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab | ||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
# Start schedule | # Start schedule | ||||
instance.start_schedule_and_update_storage_state( | instance.start_schedule_and_update_storage_state( | ||||
external_repo.get_external_schedule("no_config_pipeline_every_min_schedule") | external_repo.get_external_schedule("no_config_pipeline_every_min_schedule") | ||||
) | ) | ||||
# Wipe scheduler | # Wipe scheduler | ||||
instance.wipe_all_schedules() | instance.wipe_all_schedules() | ||||
# Check schedules are wiped | # Check schedules are wiped | ||||
assert instance.all_stored_schedule_state() == [] | assert instance.all_stored_schedule_state() == [] | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_reconcile_failure( | def test_reconcile_failure( | ||||
dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, | ||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
instance.start_schedule_and_update_storage_state( | instance.start_schedule_and_update_storage_state( | ||||
external_repo.get_external_schedule("no_config_pipeline_every_min_schedule") | external_repo.get_external_schedule("no_config_pipeline_every_min_schedule") | ||||
) | ) | ||||
Show All 15 Lines | with pytest.raises( | ||||
DagsterScheduleReconciliationError, | DagsterScheduleReconciliationError, | ||||
match="Error 1: Failed to stop\n Error 2: Failed to stop\n Error 3: Failed to refresh", | match="Error 1: Failed to stop\n Error 2: Failed to stop\n Error 3: Failed to refresh", | ||||
): | ): | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
@mark_scheduler | @mark_scheduler | ||||
def test_reconcile_failure_when_deleting_schedule_def( | def test_reconcile_failure_when_deleting_schedule_def( | ||||
dagster_instance_with_k8s_scheduler, helm_namespace, restore_k8s_cron_tab, | dagster_instance_with_k8s_scheduler, helm_namespace_for_k8s_run_launcher, restore_k8s_cron_tab, | ||||
): # pylint:disable=unused-argument | ): # pylint:disable=unused-argument | ||||
instance = dagster_instance_with_k8s_scheduler | instance = dagster_instance_with_k8s_scheduler | ||||
external_repo = get_test_external_repo() | external_repo = get_test_external_repo() | ||||
# Initialize scheduler | # Initialize scheduler | ||||
instance.reconcile_scheduler_state(external_repo) | instance.reconcile_scheduler_state(external_repo) | ||||
assert len(instance.all_stored_schedule_state()) == 3 | assert len(instance.all_stored_schedule_state()) == 3 | ||||
Show All 12 Lines |