Differential D4743 Diff 24684 integration_tests/test_suites/k8s-integration-test-suite/test_integration.py
Changeset View
Changeset View
Standalone View
Standalone View
integration_tests/test_suites/k8s-integration-test-suite/test_integration.py
import datetime | import datetime | ||||
import os | import os | ||||
import time | import time | ||||
import pytest | import pytest | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.storage.pipeline_run import PipelineRunStatus | from dagster.core.storage.pipeline_run import PipelineRunStatus | ||||
from dagster.core.test_utils import create_run_for_test | from dagster.core.test_utils import create_run_for_test | ||||
from dagster.seven import get_current_datetime_in_utc | |||||
from dagster.utils import load_yaml_from_path | from dagster.utils import load_yaml_from_path | ||||
from dagster_k8s.client import DagsterKubernetesClient | from dagster_k8s.client import DagsterKubernetesClient | ||||
from dagster_k8s.launcher import K8sRunLauncher | from dagster_k8s.launcher import K8sRunLauncher | ||||
from dagster_k8s.test import wait_for_job_and_get_raw_logs | from dagster_k8s.test import wait_for_job_and_get_raw_logs | ||||
from dagster_k8s.utils import wait_for_job | from dagster_k8s.utils import wait_for_job | ||||
from dagster_test.test_project import ( | from dagster_test.test_project import ( | ||||
ReOriginatedExternalPipelineForTest, | ReOriginatedExternalPipelineForTest, | ||||
get_test_project_external_pipeline, | get_test_project_external_pipeline, | ||||
▲ Show 20 Lines • Show All 84 Lines • ▼ Show 20 Lines | dagster_instance_for_k8s_run_launcher.launch_run( | ||||
ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ||||
) | ) | ||||
wait_for_job( | wait_for_job( | ||||
job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace_for_k8s_run_launcher | job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace_for_k8s_run_launcher | ||||
) | ) | ||||
timeout = datetime.timedelta(0, 30) | timeout = datetime.timedelta(0, 30) | ||||
start_time = datetime.datetime.now() | start_time = get_current_datetime_in_utc() | ||||
while datetime.datetime.now() < start_time + timeout: | while get_current_datetime_in_utc() < start_time + timeout: | ||||
if dagster_instance_for_k8s_run_launcher.run_launcher.can_terminate(run_id=run.run_id): | if dagster_instance_for_k8s_run_launcher.run_launcher.can_terminate(run_id=run.run_id): | ||||
break | break | ||||
time.sleep(5) | time.sleep(5) | ||||
assert dagster_instance_for_k8s_run_launcher.run_launcher.can_terminate(run_id=run.run_id) | assert dagster_instance_for_k8s_run_launcher.run_launcher.can_terminate(run_id=run.run_id) | ||||
assert dagster_instance_for_k8s_run_launcher.run_launcher.terminate(run_id=run.run_id) | assert dagster_instance_for_k8s_run_launcher.run_launcher.terminate(run_id=run.run_id) | ||||
start_time = datetime.datetime.now() | start_time = get_current_datetime_in_utc() | ||||
pipeline_run = None | pipeline_run = None | ||||
while datetime.datetime.now() < start_time + timeout: | while get_current_datetime_in_utc() < start_time + timeout: | ||||
pipeline_run = dagster_instance_for_k8s_run_launcher.get_run_by_id(run.run_id) | pipeline_run = dagster_instance_for_k8s_run_launcher.get_run_by_id(run.run_id) | ||||
if pipeline_run.status == PipelineRunStatus.FAILURE: | if pipeline_run.status == PipelineRunStatus.FAILURE: | ||||
break | break | ||||
time.sleep(5) | time.sleep(5) | ||||
assert pipeline_run.status == PipelineRunStatus.FAILURE | assert pipeline_run.status == PipelineRunStatus.FAILURE | ||||
assert not dagster_instance_for_k8s_run_launcher.run_launcher.terminate(run_id=run.run_id) | assert not dagster_instance_for_k8s_run_launcher.run_launcher.terminate(run_id=run.run_id) |