Differential D4742 Diff 24115 python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs.py
import copy | import copy | ||||
from dagster_graphql.test.utils import ( | from dagster_graphql.test.utils import ( | ||||
define_context_for_file, | define_out_of_process_context, | ||||
execute_dagster_graphql, | execute_dagster_graphql, | ||||
infer_pipeline_selector, | infer_pipeline_selector, | ||||
) | ) | ||||
from dagster_graphql_tests.graphql.graphql_context_test_suite import ( | from dagster_graphql_tests.graphql.graphql_context_test_suite import ( | ||||
ExecutingGraphQLContextTestMatrix, | ExecutingGraphQLContextTestMatrix, | ||||
) | ) | ||||
from dagster import execute_pipeline, lambda_solid, pipeline, repository, seven | from dagster import execute_pipeline, lambda_solid, pipeline, repository, seven | ||||
▲ Show 20 Lines • Show All 307 Lines • ▼ Show 20 Lines | with seven.TemporaryDirectory() as temp_dir: | ||||
repo_1.get_pipeline("evolving_pipeline").get_pipeline_subset_def({"solid_A"}), | repo_1.get_pipeline("evolving_pipeline").get_pipeline_subset_def({"solid_A"}), | ||||
instance=instance, | instance=instance, | ||||
).run_id | ).run_id | ||||
evolve_b_run_id = execute_pipeline( | evolve_b_run_id = execute_pipeline( | ||||
repo_1.get_pipeline("evolving_pipeline").get_pipeline_subset_def({"solid_B"}), | repo_1.get_pipeline("evolving_pipeline").get_pipeline_subset_def({"solid_B"}), | ||||
instance=instance, | instance=instance, | ||||
).run_id | ).run_id | ||||
context_at_time_1 = define_context_for_file(__file__, "get_repo_at_time_1", instance) | context_at_time_1 = define_out_of_process_context(__file__, "get_repo_at_time_1", instance) | ||||
result = execute_dagster_graphql(context_at_time_1, ALL_RUNS_QUERY) | result = execute_dagster_graphql(context_at_time_1, ALL_RUNS_QUERY) | ||||
assert result.data | assert result.data | ||||
t1_runs = {run["runId"]: run for run in result.data["pipelineRunsOrError"]["results"]} | t1_runs = {run["runId"]: run for run in result.data["pipelineRunsOrError"]["results"]} | ||||
assert t1_runs[full_evolve_run_id]["pipeline"] == { | assert t1_runs[full_evolve_run_id]["pipeline"] == { | ||||
"__typename": "PipelineSnapshot", | "__typename": "PipelineSnapshot", | ||||
Show All 14 Lines | with seven.TemporaryDirectory() as temp_dir: | ||||
} | } | ||||
assert t1_runs[evolve_b_run_id]["pipeline"] == { | assert t1_runs[evolve_b_run_id]["pipeline"] == { | ||||
"__typename": "PipelineSnapshot", | "__typename": "PipelineSnapshot", | ||||
"name": "evolving_pipeline", | "name": "evolving_pipeline", | ||||
"solidSelection": ["solid_B"], | "solidSelection": ["solid_B"], | ||||
} | } | ||||
context_at_time_2 = define_context_for_file(__file__, "get_repo_at_time_2", instance) | context_at_time_2 = define_out_of_process_context(__file__, "get_repo_at_time_2", instance) | ||||
result = execute_dagster_graphql(context_at_time_2, ALL_RUNS_QUERY) | result = execute_dagster_graphql(context_at_time_2, ALL_RUNS_QUERY) | ||||
assert result.data | assert result.data | ||||
t2_runs = {run["runId"]: run for run in result.data["pipelineRunsOrError"]["results"]} | t2_runs = {run["runId"]: run for run in result.data["pipelineRunsOrError"]["results"]} | ||||
assert t2_runs[full_evolve_run_id]["pipeline"] == { | assert t2_runs[full_evolve_run_id]["pipeline"] == { | ||||
"__typename": "PipelineSnapshot", | "__typename": "PipelineSnapshot", | ||||
Show All 34 Lines | with seven.TemporaryDirectory() as tempdir: | ||||
repo_1.get_pipeline("evolving_pipeline").get_pipeline_subset_def({"solid_A"}), | repo_1.get_pipeline("evolving_pipeline").get_pipeline_subset_def({"solid_A"}), | ||||
instance=instance, | instance=instance, | ||||
).run_id | ).run_id | ||||
evolve_b_run_id = execute_pipeline( | evolve_b_run_id = execute_pipeline( | ||||
repo_1.get_pipeline("evolving_pipeline").get_pipeline_subset_def({"solid_B"}), | repo_1.get_pipeline("evolving_pipeline").get_pipeline_subset_def({"solid_B"}), | ||||
instance=instance, | instance=instance, | ||||
).run_id | ).run_id | ||||
context_at_time_1 = define_context_for_file(__file__, "get_repo_at_time_1", instance) | context_at_time_1 = define_out_of_process_context(__file__, "get_repo_at_time_1", instance) | ||||
result = execute_dagster_graphql(context_at_time_1, ALL_RUN_GROUPS_QUERY) | result = execute_dagster_graphql(context_at_time_1, ALL_RUN_GROUPS_QUERY) | ||||
assert result.data | assert result.data | ||||
assert "runGroupsOrError" in result.data | assert "runGroupsOrError" in result.data | ||||
assert "results" in result.data["runGroupsOrError"] | assert "results" in result.data["runGroupsOrError"] | ||||
assert len(result.data["runGroupsOrError"]["results"]) == 4 | assert len(result.data["runGroupsOrError"]["results"]) == 4 | ||||
t1_runs = { | t1_runs = { | ||||
Show All 26 Lines | with seven.TemporaryDirectory() as tempdir: | ||||
# test evolve_b_run_id | # test evolve_b_run_id | ||||
assert t1_runs[evolve_b_run_id]["pipeline"] == { | assert t1_runs[evolve_b_run_id]["pipeline"] == { | ||||
"__typename": "PipelineSnapshot", | "__typename": "PipelineSnapshot", | ||||
"name": "evolving_pipeline", | "name": "evolving_pipeline", | ||||
"solidSelection": ["solid_B"], | "solidSelection": ["solid_B"], | ||||
} | } | ||||
context_at_time_2 = define_context_for_file(__file__, "get_repo_at_time_2", instance) | context_at_time_2 = define_out_of_process_context(__file__, "get_repo_at_time_2", instance) | ||||
result = execute_dagster_graphql(context_at_time_2, ALL_RUN_GROUPS_QUERY) | result = execute_dagster_graphql(context_at_time_2, ALL_RUN_GROUPS_QUERY) | ||||
assert "runGroupsOrError" in result.data | assert "runGroupsOrError" in result.data | ||||
assert "results" in result.data["runGroupsOrError"] | assert "results" in result.data["runGroupsOrError"] | ||||
assert len(result.data["runGroupsOrError"]["results"]) == 4 | assert len(result.data["runGroupsOrError"]["results"]) == 4 | ||||
t2_runs = { | t2_runs = { | ||||
run["runId"]: run | run["runId"]: run | ||||
▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | with seven.TemporaryDirectory() as temp_dir: | ||||
instance = DagsterInstance.local_temp(temp_dir) | instance = DagsterInstance.local_temp(temp_dir) | ||||
repo = get_repo_at_time_1() | repo = get_repo_at_time_1() | ||||
run_id_1 = execute_pipeline( | run_id_1 = execute_pipeline( | ||||
repo.get_pipeline("foo_pipeline"), instance=instance, tags={"run": "one"} | repo.get_pipeline("foo_pipeline"), instance=instance, tags={"run": "one"} | ||||
).run_id | ).run_id | ||||
_run_id_2 = execute_pipeline( | _run_id_2 = execute_pipeline( | ||||
repo.get_pipeline("foo_pipeline"), instance=instance, tags={"run": "two"} | repo.get_pipeline("foo_pipeline"), instance=instance, tags={"run": "two"} | ||||
).run_id | ).run_id | ||||
context = define_context_for_file(__file__, "get_repo_at_time_1", instance) | context = define_out_of_process_context(__file__, "get_repo_at_time_1", instance) | ||||
result = execute_dagster_graphql( | result = execute_dagster_graphql( | ||||
context, FILTERED_RUN_QUERY, variables={"filter": {"runId": run_id_1}} | context, FILTERED_RUN_QUERY, variables={"filter": {"runId": run_id_1}} | ||||
) | ) | ||||
assert result.data | assert result.data | ||||
run_ids = [run["runId"] for run in result.data["pipelineRunsOrError"]["results"]] | run_ids = [run["runId"] for run in result.data["pipelineRunsOrError"]["results"]] | ||||
assert len(run_ids) == 1 | assert len(run_ids) == 1 | ||||
assert run_ids[0] == run_id_1 | assert run_ids[0] == run_id_1 | ||||
Show All 21 Lines | with seven.TemporaryDirectory() as temp_dir: | ||||
foo_pipeline, | foo_pipeline, | ||||
parent_run_id=root_run_id, | parent_run_id=root_run_id, | ||||
root_run_id=root_run_id, | root_run_id=root_run_id, | ||||
tags={PARENT_RUN_ID_TAG: root_run_id, ROOT_RUN_ID_TAG: root_run_id}, | tags={PARENT_RUN_ID_TAG: root_run_id, ROOT_RUN_ID_TAG: root_run_id}, | ||||
) | ) | ||||
execute_run(InMemoryPipeline(foo_pipeline), run, instance) | execute_run(InMemoryPipeline(foo_pipeline), run, instance) | ||||
runs.append(run) | runs.append(run) | ||||
context_at_time_1 = define_context_for_file(__file__, "get_repo_at_time_1", instance) | context_at_time_1 = define_out_of_process_context(__file__, "get_repo_at_time_1", instance) | ||||
result_one = execute_dagster_graphql( | result_one = execute_dagster_graphql( | ||||
context_at_time_1, RUN_GROUP_QUERY, variables={"runId": root_run_id}, | context_at_time_1, RUN_GROUP_QUERY, variables={"runId": root_run_id}, | ||||
) | ) | ||||
assert result_one.data["runGroupOrError"]["__typename"] == "RunGroup" | assert result_one.data["runGroupOrError"]["__typename"] == "RunGroup" | ||||
assert len(result_one.data["runGroupOrError"]["runs"]) == 4 | assert len(result_one.data["runGroupOrError"]["runs"]) == 4 | ||||
Show All 10 Lines | with seven.TemporaryDirectory() as temp_dir: | ||||
assert ( | assert ( | ||||
result_one.data["runGroupOrError"]["runs"] == result_two.data["runGroupOrError"]["runs"] | result_one.data["runGroupOrError"]["runs"] == result_two.data["runGroupOrError"]["runs"] | ||||
) | ) | ||||
def test_run_group_not_found(): | def test_run_group_not_found(): | ||||
with seven.TemporaryDirectory() as temp_dir: | with seven.TemporaryDirectory() as temp_dir: | ||||
instance = DagsterInstance.local_temp(temp_dir) | instance = DagsterInstance.local_temp(temp_dir) | ||||
context_at_time_1 = define_context_for_file(__file__, "get_repo_at_time_1", instance) | context_at_time_1 = define_out_of_process_context(__file__, "get_repo_at_time_1", instance) | ||||
result = execute_dagster_graphql( | result = execute_dagster_graphql( | ||||
context_at_time_1, RUN_GROUP_QUERY, variables={"runId": "foo"}, | context_at_time_1, RUN_GROUP_QUERY, variables={"runId": "foo"}, | ||||
) | ) | ||||
assert result.data | assert result.data | ||||
assert result.data["runGroupOrError"] | assert result.data["runGroupOrError"] | ||||
assert result.data["runGroupOrError"]["__typename"] == "RunGroupNotFoundError" | assert result.data["runGroupOrError"]["__typename"] == "RunGroupNotFoundError" | ||||
assert result.data["runGroupOrError"]["runId"] == "foo" | assert result.data["runGroupOrError"]["runId"] == "foo" | ||||
Show All 14 Lines | with seven.TemporaryDirectory() as tempdir: | ||||
for _ in range(5): | for _ in range(5): | ||||
for root_run_id in root_run_ids: | for root_run_id in root_run_ids: | ||||
execute_pipeline( | execute_pipeline( | ||||
foo_pipeline, | foo_pipeline, | ||||
tags={PARENT_RUN_ID_TAG: root_run_id, ROOT_RUN_ID_TAG: root_run_id}, | tags={PARENT_RUN_ID_TAG: root_run_id, ROOT_RUN_ID_TAG: root_run_id}, | ||||
instance=instance, | instance=instance, | ||||
) | ) | ||||
context_at_time_1 = define_context_for_file(__file__, "get_repo_at_time_1", instance) | context_at_time_1 = define_out_of_process_context(__file__, "get_repo_at_time_1", instance) | ||||
result = execute_dagster_graphql(context_at_time_1, ALL_RUN_GROUPS_QUERY) | result = execute_dagster_graphql(context_at_time_1, ALL_RUN_GROUPS_QUERY) | ||||
assert result.data | assert result.data | ||||
assert "runGroupsOrError" in result.data | assert "runGroupsOrError" in result.data | ||||
assert "results" in result.data["runGroupsOrError"] | assert "results" in result.data["runGroupsOrError"] | ||||
assert len(result.data["runGroupsOrError"]["results"]) == 3 | assert len(result.data["runGroupsOrError"]["results"]) == 3 | ||||
for run_group in result.data["runGroupsOrError"]["results"]: | for run_group in result.data["runGroupsOrError"]["results"]: | ||||
assert run_group["rootRunId"] in root_run_ids | assert run_group["rootRunId"] in root_run_ids | ||||
assert len(run_group["runs"]) == 6 | assert len(run_group["runs"]) == 6 |