Differential D4718 Diff 24113 integration_tests/test_suites/k8s-integration-test-suite/test_integration.py
Changeset View
Changeset View
Standalone View
Standalone View
integration_tests/test_suites/k8s-integration-test-suite/test_integration.py
Show All 11 Lines | |||||
) | ) | ||||
from dagster.core.storage.pipeline_run import PipelineRunStatus | from dagster.core.storage.pipeline_run import PipelineRunStatus | ||||
from dagster.core.test_utils import create_run_for_test | from dagster.core.test_utils import create_run_for_test | ||||
from dagster.utils import load_yaml_from_path | from dagster.utils import load_yaml_from_path | ||||
@pytest.mark.integration | @pytest.mark.integration | ||||
def test_k8s_run_launcher_default(dagster_instance, helm_namespace): | def test_k8s_run_launcher_default( | ||||
run_config = load_yaml_from_path(os.path.join(test_project_environments_path(), "env.yaml")) | dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher | ||||
pipeline_name = "demo_pipeline" | ): | ||||
tags = {"key": "value"} | run_config = load_yaml_from_path(os.path.join(test_project_environments_path(), 'env.yaml')) | ||||
pipeline_name = 'demo_pipeline' | |||||
tags = {'key': 'value'} | |||||
run = create_run_for_test( | run = create_run_for_test( | ||||
dagster_instance, | dagster_instance_for_k8s_run_launcher, | ||||
pipeline_name=pipeline_name, | pipeline_name=pipeline_name, | ||||
run_config=run_config, | run_config=run_config, | ||||
tags=tags, | tags=tags, | ||||
mode="default", | mode="default", | ||||
) | ) | ||||
dagster_instance.launch_run( | dagster_instance_for_k8s_run_launcher.launch_run( | ||||
run.run_id, | run.run_id, | ||||
ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ||||
) | ) | ||||
result = wait_for_job_and_get_raw_logs( | result = wait_for_job_and_get_raw_logs( | ||||
job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace | job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher | ||||
) | ) | ||||
assert "PIPELINE_SUCCESS" in result, "no match, result: {}".format(result) | assert "PIPELINE_SUCCESS" in result, "no match, result: {}".format(result) | ||||
@pytest.mark.integration | @pytest.mark.integration | ||||
def test_failing_k8s_run_launcher(dagster_instance, helm_namespace): | def test_failing_k8s_run_launcher( | ||||
run_config = {"blah blah this is wrong": {}} | dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher | ||||
pipeline_name = "demo_pipeline" | ): | ||||
run = create_run_for_test(dagster_instance, pipeline_name=pipeline_name, run_config=run_config) | run_config = {'blah blah this is wrong': {}} | ||||
pipeline_name = 'demo_pipeline' | |||||
run = create_run_for_test( | |||||
dagster_instance_for_k8s_run_launcher, pipeline_name=pipeline_name, run_config=run_config | |||||
) | |||||
dagster_instance.launch_run( | dagster_instance_for_k8s_run_launcher.launch_run( | ||||
run.run_id, | run.run_id, | ||||
ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ||||
) | ) | ||||
result = wait_for_job_and_get_raw_logs( | result = wait_for_job_and_get_raw_logs( | ||||
job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace | job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher | ||||
) | ) | ||||
assert "PIPELINE_SUCCESS" not in result, "no match, result: {}".format(result) | assert "PIPELINE_SUCCESS" not in result, "no match, result: {}".format(result) | ||||
event_records = dagster_instance.all_logs(run.run_id) | event_records = dagster_instance_for_k8s_run_launcher.all_logs(run.run_id) | ||||
assert any( | assert any( | ||||
['Undefined field "blah blah this is wrong"' in str(event) for event in event_records] | ['Undefined field "blah blah this is wrong"' in str(event) for event in event_records] | ||||
) | ) | ||||
assert any(['Missing required field "solids"' in str(event) for event in event_records]) | assert any(['Missing required field "solids"' in str(event) for event in event_records]) | ||||
@pytest.mark.integration | @pytest.mark.integration | ||||
def test_k8s_run_launcher_terminate(dagster_instance, helm_namespace): | def test_k8s_run_launcher_terminate( | ||||
pipeline_name = "slow_pipeline" | dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher | ||||
): | |||||
pipeline_name = 'slow_pipeline' | |||||
tags = {"key": "value"} | tags = {"key": "value"} | ||||
run = create_run_for_test( | run = create_run_for_test( | ||||
dagster_instance, pipeline_name=pipeline_name, run_config=None, tags=tags, mode="default", | dagster_instance_for_k8s_run_launcher, | ||||
pipeline_name=pipeline_name, | |||||
run_config=None, | |||||
tags=tags, | |||||
mode='default', | |||||
) | ) | ||||
dagster_instance.launch_run( | dagster_instance_for_k8s_run_launcher.launch_run( | ||||
run.run_id, | run.run_id, | ||||
ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ||||
) | ) | ||||
wait_for_job(job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace) | wait_for_job( | ||||
job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher | |||||
) | |||||
timeout = datetime.timedelta(0, 30) | timeout = datetime.timedelta(0, 30) | ||||
start_time = datetime.datetime.now() | start_time = datetime.datetime.now() | ||||
while datetime.datetime.now() < start_time + timeout: | while datetime.datetime.now() < start_time + timeout: | ||||
if dagster_instance.run_launcher.can_terminate(run_id=run.run_id): | if dagster_instance_for_k8s_run_launcher.run_launcher.can_terminate(run_id=run.run_id): | ||||
break | break | ||||
time.sleep(5) | time.sleep(5) | ||||
assert dagster_instance.run_launcher.can_terminate(run_id=run.run_id) | assert dagster_instance_for_k8s_run_launcher.run_launcher.can_terminate(run_id=run.run_id) | ||||
assert dagster_instance.run_launcher.terminate(run_id=run.run_id) | assert dagster_instance_for_k8s_run_launcher.run_launcher.terminate(run_id=run.run_id) | ||||
start_time = datetime.datetime.now() | start_time = datetime.datetime.now() | ||||
pipeline_run = None | pipeline_run = None | ||||
while datetime.datetime.now() < start_time + timeout: | while datetime.datetime.now() < start_time + timeout: | ||||
pipeline_run = dagster_instance.get_run_by_id(run.run_id) | pipeline_run = dagster_instance_for_k8s_run_launcher.get_run_by_id(run.run_id) | ||||
if pipeline_run.status == PipelineRunStatus.FAILURE: | if pipeline_run.status == PipelineRunStatus.FAILURE: | ||||
break | break | ||||
time.sleep(5) | time.sleep(5) | ||||
assert pipeline_run.status == PipelineRunStatus.FAILURE | assert pipeline_run.status == PipelineRunStatus.FAILURE | ||||
assert not dagster_instance.run_launcher.terminate(run_id=run.run_id) | assert not dagster_instance_for_k8s_run_launcher.run_launcher.terminate(run_id=run.run_id) |