Differential D4743 Diff 24684 integration_tests/test_suites/celery-k8s-integration-test-suite/test_integration.py
Changeset View
Changeset View
Standalone View
Standalone View
integration_tests/test_suites/celery-k8s-integration-test-suite/test_integration.py
# pylint doesn't know about pytest fixtures | # pylint doesn't know about pytest fixtures | ||||
# pylint: disable=unused-argument | # pylint: disable=unused-argument | ||||
import datetime | import datetime | ||||
import os | import os | ||||
import sys | import sys | ||||
import time | import time | ||||
import boto3 | import boto3 | ||||
import pytest | import pytest | ||||
from dagster import DagsterEventType | from dagster import DagsterEventType | ||||
from dagster.core.storage.pipeline_run import PipelineRunStatus | from dagster.core.storage.pipeline_run import PipelineRunStatus | ||||
from dagster.core.test_utils import create_run_for_test | from dagster.core.test_utils import create_run_for_test | ||||
from dagster.seven import get_current_datetime_in_utc | |||||
from dagster.utils import merge_dicts | from dagster.utils import merge_dicts | ||||
from dagster.utils.yaml_utils import merge_yamls | from dagster.utils.yaml_utils import merge_yamls | ||||
from dagster_celery_k8s.launcher import CeleryK8sRunLauncher | from dagster_celery_k8s.launcher import CeleryK8sRunLauncher | ||||
from dagster_k8s.test import wait_for_job_and_get_raw_logs | from dagster_k8s.test import wait_for_job_and_get_raw_logs | ||||
from dagster_test.test_project import ( | from dagster_test.test_project import ( | ||||
ReOriginatedExternalPipelineForTest, | ReOriginatedExternalPipelineForTest, | ||||
get_test_project_external_pipeline, | get_test_project_external_pipeline, | ||||
test_project_environments_path, | test_project_environments_path, | ||||
▲ Show 20 Lines • Show All 181 Lines • ▼ Show 20 Lines | def _test_termination(dagster_instance, run_config): | ||||
dagster_instance.launch_run( | dagster_instance.launch_run( | ||||
run.run_id, | run.run_id, | ||||
ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ||||
) | ) | ||||
assert isinstance(dagster_instance.run_launcher, CeleryK8sRunLauncher) | assert isinstance(dagster_instance.run_launcher, CeleryK8sRunLauncher) | ||||
# Wait for pipeline run to start | # Wait for pipeline run to start | ||||
timeout = datetime.timedelta(0, 120) | timeout = datetime.timedelta(0, 120) | ||||
start_time = datetime.datetime.now() | start_time = get_current_datetime_in_utc() | ||||
can_terminate = False | can_terminate = False | ||||
while datetime.datetime.now() < start_time + timeout: | while get_current_datetime_in_utc() < start_time + timeout: | ||||
if dagster_instance.run_launcher.can_terminate(run_id=run.run_id): | if dagster_instance.run_launcher.can_terminate(run_id=run.run_id): | ||||
can_terminate = True | can_terminate = True | ||||
break | break | ||||
time.sleep(5) | time.sleep(5) | ||||
assert can_terminate | assert can_terminate | ||||
# Wait for step to start | # Wait for step to start | ||||
step_start_found = False | step_start_found = False | ||||
start_time = datetime.datetime.now() | start_time = get_current_datetime_in_utc() | ||||
while datetime.datetime.now() < start_time + timeout: | while get_current_datetime_in_utc() < start_time + timeout: | ||||
event_records = dagster_instance.all_logs(run.run_id) | event_records = dagster_instance.all_logs(run.run_id) | ||||
for event_record in event_records: | for event_record in event_records: | ||||
if ( | if ( | ||||
event_record.dagster_event | event_record.dagster_event | ||||
and event_record.dagster_event.event_type == DagsterEventType.STEP_START | and event_record.dagster_event.event_type == DagsterEventType.STEP_START | ||||
): | ): | ||||
step_start_found = True | step_start_found = True | ||||
break | break | ||||
time.sleep(5) | time.sleep(5) | ||||
assert step_start_found | assert step_start_found | ||||
# Terminate run | # Terminate run | ||||
assert dagster_instance.run_launcher.can_terminate(run_id=run.run_id) | assert dagster_instance.run_launcher.can_terminate(run_id=run.run_id) | ||||
assert dagster_instance.run_launcher.terminate(run_id=run.run_id) | assert dagster_instance.run_launcher.terminate(run_id=run.run_id) | ||||
# Check that pipeline run is marked as failed | # Check that pipeline run is marked as failed | ||||
pipeline_run_status_failure = False | pipeline_run_status_failure = False | ||||
start_time = datetime.datetime.now() | start_time = get_current_datetime_in_utc() | ||||
while datetime.datetime.now() < start_time + timeout: | while get_current_datetime_in_utc() < start_time + timeout: | ||||
pipeline_run = dagster_instance.get_run_by_id(run.run_id) | pipeline_run = dagster_instance.get_run_by_id(run.run_id) | ||||
if pipeline_run.status == PipelineRunStatus.FAILURE: | if pipeline_run.status == PipelineRunStatus.FAILURE: | ||||
pipeline_run_status_failure = True | pipeline_run_status_failure = True | ||||
break | break | ||||
time.sleep(5) | time.sleep(5) | ||||
assert pipeline_run_status_failure | assert pipeline_run_status_failure | ||||
# Check that terminate cannot be called again | # Check that terminate cannot be called again | ||||
assert not dagster_instance.run_launcher.can_terminate(run_id=run.run_id) | assert not dagster_instance.run_launcher.can_terminate(run_id=run.run_id) | ||||
assert not dagster_instance.run_launcher.terminate(run_id=run.run_id) | assert not dagster_instance.run_launcher.terminate(run_id=run.run_id) | ||||
# Check for step failure and resource tear down | # Check for step failure and resource tear down | ||||
expected_events_found = False | expected_events_found = False | ||||
start_time = datetime.datetime.now() | start_time = get_current_datetime_in_utc() | ||||
while datetime.datetime.now() < start_time + timeout: | while get_current_datetime_in_utc() < start_time + timeout: | ||||
step_failures_count = 0 | step_failures_count = 0 | ||||
resource_tear_down_count = 0 | resource_tear_down_count = 0 | ||||
resource_init_count = 0 | resource_init_count = 0 | ||||
termination_request_count = 0 | termination_request_count = 0 | ||||
termination_success_count = 0 | termination_success_count = 0 | ||||
event_records = dagster_instance.all_logs(run.run_id) | event_records = dagster_instance.all_logs(run.run_id) | ||||
for event_record in event_records: | for event_record in event_records: | ||||
if event_record.dagster_event: | if event_record.dagster_event: | ||||
▲ Show 20 Lines • Show All 96 Lines • ▼ Show 20 Lines | ): | ||||
dagster_instance.launch_run( | dagster_instance.launch_run( | ||||
run.run_id, | run.run_id, | ||||
ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ||||
) | ) | ||||
assert isinstance(dagster_instance.run_launcher, CeleryK8sRunLauncher) | assert isinstance(dagster_instance.run_launcher, CeleryK8sRunLauncher) | ||||
# Check that pipeline run is marked as failed | # Check that pipeline run is marked as failed | ||||
pipeline_run_status_failure = False | pipeline_run_status_failure = False | ||||
start_time = datetime.datetime.now() | start_time = get_current_datetime_in_utc() | ||||
timeout = datetime.timedelta(0, 120) | timeout = datetime.timedelta(0, 120) | ||||
while datetime.datetime.now() < start_time + timeout: | while get_current_datetime_in_utc() < start_time + timeout: | ||||
pipeline_run = dagster_instance.get_run_by_id(run.run_id) | pipeline_run = dagster_instance.get_run_by_id(run.run_id) | ||||
if pipeline_run.status == PipelineRunStatus.FAILURE: | if pipeline_run.status == PipelineRunStatus.FAILURE: | ||||
pipeline_run_status_failure = True | pipeline_run_status_failure = True | ||||
break | break | ||||
time.sleep(5) | time.sleep(5) | ||||
assert pipeline_run_status_failure | assert pipeline_run_status_failure | ||||
# Check for step failure for hard_fail_or_0.compute | # Check for step failure for hard_fail_or_0.compute | ||||
start_time = datetime.datetime.now() | start_time = get_current_datetime_in_utc() | ||||
step_failure_found = False | step_failure_found = False | ||||
while datetime.datetime.now() < start_time + timeout: | while get_current_datetime_in_utc() < start_time + timeout: | ||||
event_records = dagster_instance.all_logs(run.run_id) | event_records = dagster_instance.all_logs(run.run_id) | ||||
for event_record in event_records: | for event_record in event_records: | ||||
if event_record.dagster_event: | if event_record.dagster_event: | ||||
if ( | if ( | ||||
event_record.dagster_event.event_type == DagsterEventType.STEP_FAILURE | event_record.dagster_event.event_type == DagsterEventType.STEP_FAILURE | ||||
and event_record.dagster_event.step_key == "hard_fail_or_0.compute" | and event_record.dagster_event.step_key == "hard_fail_or_0.compute" | ||||
): | ): | ||||
step_failure_found = True | step_failure_found = True | ||||
break | break | ||||
time.sleep(5) | time.sleep(5) | ||||
assert step_failure_found | assert step_failure_found |