Differential D4718 Diff 24113 integration_tests/test_suites/k8s-integration-test-suite/test_job_spec.py
Changeset View
Changeset View
Standalone View
Standalone View
integration_tests/test_suites/k8s-integration-test-suite/test_job_spec.py
Show First 20 Lines • Show All 309 Lines • ▼ Show 20 Lines | assert ( | ||||
operator: In | operator: In | ||||
values: | values: | ||||
- e2e-az1 | - e2e-az1 | ||||
- e2e-az2""", | - e2e-az2""", | ||||
).strip() | ).strip() | ||||
) | ) | ||||
def test_k8s_run_launcher(dagster_instance, helm_namespace): | def test_k8s_run_launcher( | ||||
run_config = load_yaml_from_path(os.path.join(test_project_environments_path(), "env.yaml")) | dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher | ||||
pipeline_name = "demo_pipeline" | ): | ||||
run_config = load_yaml_from_path(os.path.join(test_project_environments_path(), 'env.yaml')) | |||||
pipeline_name = 'demo_pipeline' | |||||
run = create_run_for_test( | run = create_run_for_test( | ||||
dagster_instance, pipeline_name=pipeline_name, run_config=run_config, mode="default", | dagster_instance_for_k8s_run_launcher, | ||||
pipeline_name=pipeline_name, | |||||
run_config=run_config, | |||||
mode='default', | |||||
) | ) | ||||
dagster_instance.launch_run( | dagster_instance_for_k8s_run_launcher.launch_run( | ||||
run.run_id, | run.run_id, | ||||
ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ||||
) | ) | ||||
result = wait_for_job_and_get_raw_logs( | result = wait_for_job_and_get_raw_logs( | ||||
job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace | job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher | ||||
) | ) | ||||
assert "PIPELINE_SUCCESS" in result, "no match, result: {}".format(result) | assert "PIPELINE_SUCCESS" in result, "no match, result: {}".format(result) | ||||
def test_failing_k8s_run_launcher(dagster_instance, helm_namespace): | def test_failing_k8s_run_launcher( | ||||
run_config = {"blah blah this is wrong": {}} | dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher | ||||
pipeline_name = "demo_pipeline" | ): | ||||
run = create_run_for_test(dagster_instance, pipeline_name=pipeline_name, run_config=run_config) | run_config = {'blah blah this is wrong': {}} | ||||
dagster_instance.launch_run( | pipeline_name = 'demo_pipeline' | ||||
run = create_run_for_test( | |||||
dagster_instance_for_k8s_run_launcher, pipeline_name=pipeline_name, run_config=run_config | |||||
) | |||||
dagster_instance_for_k8s_run_launcher.launch_run( | |||||
run.run_id, | run.run_id, | ||||
ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ||||
) | ) | ||||
result = wait_for_job_and_get_raw_logs( | result = wait_for_job_and_get_raw_logs( | ||||
job_name="dagster-run-%s" % run.run_id, namespace=helm_namespace | job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher | ||||
) | ) | ||||
assert "PIPELINE_SUCCESS" not in result, "no match, result: {}".format(result) | assert "PIPELINE_SUCCESS" not in result, "no match, result: {}".format(result) | ||||
event_records = dagster_instance.all_logs(run.run_id) | event_records = dagster_instance_for_k8s_run_launcher.all_logs(run.run_id) | ||||
assert any( | assert any( | ||||
['Undefined field "blah blah this is wrong"' in str(event) for event in event_records] | ['Undefined field "blah blah this is wrong"' in str(event) for event in event_records] | ||||
) | ) | ||||
assert any(['Missing required field "solids"' in str(event) for event in event_records]) | assert any(['Missing required field "solids"' in str(event) for event in event_records]) |