Differential D4718 Diff 23670 integration_tests/test_suites/k8s-integration-test-suite/test_job_spec.py
Changeset View
Changeset View
Standalone View
Standalone View
integration_tests/test_suites/k8s-integration-test-suite/test_job_spec.py
Show First 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | spec: | ||||
- executeRunInProcess | - executeRunInProcess | ||||
- -v | - -v | ||||
- '{{"runId": "{run_id}"}}' | - '{{"runId": "{run_id}"}}' | ||||
command: | command: | ||||
- dagster-graphql | - dagster-graphql | ||||
env: | env: | ||||
- name: DAGSTER_HOME | - name: DAGSTER_HOME | ||||
value: /opt/dagster/dagster_home | value: /opt/dagster/dagster_home | ||||
- name: DAGSTER_K8S_INSTANCE_CONFIG_MAP | |||||
value: dagster-instance | |||||
- name: DAGSTER_K8S_PG_PASSWORD_SECRET | |||||
value: dagster-postgresql-secret | |||||
- name: DAGSTER_PG_PASSWORD | - name: DAGSTER_PG_PASSWORD | ||||
value_from: | value_from: | ||||
secret_key_ref: | secret_key_ref: | ||||
key: postgresql-password | key: postgresql-password | ||||
name: dagster-postgresql-secret | name: dagster-postgresql-secret | ||||
env_from: | env_from: | ||||
- config_map_ref: | - config_map_ref: | ||||
name: dagster-pipeline-env | name: dagster-pipeline-env | ||||
▲ Show 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | spec: | ||||
- executeRunInProcess | - executeRunInProcess | ||||
- -v | - -v | ||||
- '{{"runId": "{run_id}"}}' | - '{{"runId": "{run_id}"}}' | ||||
command: | command: | ||||
- dagster-graphql | - dagster-graphql | ||||
env: | env: | ||||
- name: DAGSTER_HOME | - name: DAGSTER_HOME | ||||
value: /opt/dagster/dagster_home | value: /opt/dagster/dagster_home | ||||
- name: DAGSTER_K8S_INSTANCE_CONFIG_MAP | |||||
value: dagster-instance | |||||
- name: DAGSTER_K8S_PG_PASSWORD_SECRET | |||||
value: dagster-postgresql-secret | |||||
- name: DAGSTER_PG_PASSWORD | - name: DAGSTER_PG_PASSWORD | ||||
value_from: | value_from: | ||||
secret_key_ref: | secret_key_ref: | ||||
key: postgresql-password | key: postgresql-password | ||||
name: dagster-postgresql-secret | name: dagster-postgresql-secret | ||||
env_from: | env_from: | ||||
- config_map_ref: | - config_map_ref: | ||||
name: dagster-pipeline-env | name: dagster-pipeline-env | ||||
▲ Show 20 Lines • Show All 181 Lines • ▼ Show 20 Lines | assert ( | ||||
operator: In | operator: In | ||||
values: | values: | ||||
- e2e-az1 | - e2e-az1 | ||||
- e2e-az2''', | - e2e-az2''', | ||||
).strip() | ).strip() | ||||
) | ) | ||||
def test_k8s_run_launcher(dagster_instance, helm_namespace): | def test_k8s_run_launcher( | ||||
dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher | |||||
): | |||||
run_config = load_yaml_from_path(os.path.join(test_project_environments_path(), 'env.yaml')) | run_config = load_yaml_from_path(os.path.join(test_project_environments_path(), 'env.yaml')) | ||||
pipeline_name = 'demo_pipeline' | pipeline_name = 'demo_pipeline' | ||||
run = create_run_for_test( | run = create_run_for_test( | ||||
dagster_instance, pipeline_name=pipeline_name, run_config=run_config, mode='default', | dagster_instance_for_k8s_run_launcher, | ||||
pipeline_name=pipeline_name, | |||||
run_config=run_config, | |||||
mode='default', | |||||
) | ) | ||||
dagster_instance.launch_run( | dagster_instance_for_k8s_run_launcher.launch_run( | ||||
run.run_id, | run.run_id, | ||||
ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ||||
) | ) | ||||
result = wait_for_job_and_get_raw_logs( | result = wait_for_job_and_get_raw_logs( | ||||
job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace | job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher | ||||
) | ) | ||||
assert 'PIPELINE_SUCCESS' in result, 'no match, result: {}'.format(result) | assert 'PIPELINE_SUCCESS' in result, 'no match, result: {}'.format(result) | ||||
def test_failing_k8s_run_launcher(dagster_instance, helm_namespace): | def test_failing_k8s_run_launcher( | ||||
dagster_instance_for_k8s_run_launcher, helm_namespace_for_k8s_run_launcher | |||||
): | |||||
run_config = {'blah blah this is wrong': {}} | run_config = {'blah blah this is wrong': {}} | ||||
pipeline_name = 'demo_pipeline' | pipeline_name = 'demo_pipeline' | ||||
run = create_run_for_test(dagster_instance, pipeline_name=pipeline_name, run_config=run_config) | run = create_run_for_test( | ||||
dagster_instance.launch_run( | dagster_instance_for_k8s_run_launcher, pipeline_name=pipeline_name, run_config=run_config | ||||
) | |||||
dagster_instance_for_k8s_run_launcher.launch_run( | |||||
run.run_id, | run.run_id, | ||||
ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ReOriginatedExternalPipelineForTest(get_test_project_external_pipeline(pipeline_name)), | ||||
) | ) | ||||
result = wait_for_job_and_get_raw_logs( | result = wait_for_job_and_get_raw_logs( | ||||
job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace | job_name='dagster-run-%s' % run.run_id, namespace=helm_namespace_for_k8s_run_launcher | ||||
) | ) | ||||
assert 'PIPELINE_SUCCESS' not in result, 'no match, result: {}'.format(result) | assert 'PIPELINE_SUCCESS' not in result, 'no match, result: {}'.format(result) | ||||
event_records = dagster_instance.all_logs(run.run_id) | event_records = dagster_instance_for_k8s_run_launcher.all_logs(run.run_id) | ||||
assert any( | assert any( | ||||
['Undefined field "blah blah this is wrong"' in str(event) for event in event_records] | ['Undefined field "blah blah this is wrong"' in str(event) for event in event_records] | ||||
) | ) | ||||
assert any(['Missing required field "solids"' in str(event) for event in event_records]) | assert any(['Missing required field "solids"' in str(event) for event in event_records]) |