Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/utils/test/run_storage.py
Show First 20 Lines • Show All 261 Lines • ▼ Show 20 Lines | def test_fetch_by_filter(self, storage): | ||||
some_runs = storage.get_runs(PipelineRunsFilter(pipeline_name="some_pipeline")) | some_runs = storage.get_runs(PipelineRunsFilter(pipeline_name="some_pipeline")) | ||||
count = storage.get_runs_count(PipelineRunsFilter(pipeline_name="some_pipeline")) | count = storage.get_runs_count(PipelineRunsFilter(pipeline_name="some_pipeline")) | ||||
assert len(some_runs) == 2 | assert len(some_runs) == 2 | ||||
assert count == 2 | assert count == 2 | ||||
assert some_runs[0].run_id == two | assert some_runs[0].run_id == two | ||||
assert some_runs[1].run_id == one | assert some_runs[1].run_id == one | ||||
some_runs = storage.get_runs(PipelineRunsFilter(status=PipelineRunStatus.SUCCESS)) | some_runs = storage.get_runs(PipelineRunsFilter(statuses=[PipelineRunStatus.SUCCESS])) | ||||
count = storage.get_runs_count(PipelineRunsFilter(status=PipelineRunStatus.SUCCESS)) | count = storage.get_runs_count(PipelineRunsFilter(statuses=[PipelineRunStatus.SUCCESS])) | ||||
assert len(some_runs) == 2 | assert len(some_runs) == 2 | ||||
assert count == 2 | assert count == 2 | ||||
assert some_runs[0].run_id == three | assert some_runs[0].run_id == three | ||||
assert some_runs[1].run_id == one | assert some_runs[1].run_id == one | ||||
some_runs = storage.get_runs(PipelineRunsFilter(tags={"tag": "hello"})) | some_runs = storage.get_runs(PipelineRunsFilter(tags={"tag": "hello"})) | ||||
count = storage.get_runs_count(PipelineRunsFilter(tags={"tag": "hello"})) | count = storage.get_runs_count(PipelineRunsFilter(tags={"tag": "hello"})) | ||||
assert len(some_runs) == 2 | assert len(some_runs) == 2 | ||||
Show All 17 Lines | def test_fetch_by_filter(self, storage): | ||||
assert count == 2 | assert count == 2 | ||||
assert some_runs[0].run_id == two | assert some_runs[0].run_id == two | ||||
assert some_runs[1].run_id == one | assert some_runs[1].run_id == one | ||||
some_runs = storage.get_runs( | some_runs = storage.get_runs( | ||||
PipelineRunsFilter( | PipelineRunsFilter( | ||||
pipeline_name="some_pipeline", | pipeline_name="some_pipeline", | ||||
tags={"tag": "hello"}, | tags={"tag": "hello"}, | ||||
status=PipelineRunStatus.SUCCESS, | statuses=[PipelineRunStatus.SUCCESS], | ||||
) | ) | ||||
) | ) | ||||
count = storage.get_runs_count( | count = storage.get_runs_count( | ||||
PipelineRunsFilter( | PipelineRunsFilter( | ||||
pipeline_name="some_pipeline", | pipeline_name="some_pipeline", | ||||
tags={"tag": "hello"}, | tags={"tag": "hello"}, | ||||
status=PipelineRunStatus.SUCCESS, | statuses=[PipelineRunStatus.SUCCESS], | ||||
) | ) | ||||
) | ) | ||||
assert len(some_runs) == 1 | assert len(some_runs) == 1 | ||||
assert count == 1 | assert count == 1 | ||||
assert some_runs[0].run_id == one | assert some_runs[0].run_id == one | ||||
# All filters | # All filters | ||||
some_runs = storage.get_runs( | some_runs = storage.get_runs( | ||||
PipelineRunsFilter( | PipelineRunsFilter( | ||||
run_ids=[one], | run_ids=[one], | ||||
pipeline_name="some_pipeline", | pipeline_name="some_pipeline", | ||||
tags={"tag": "hello"}, | tags={"tag": "hello"}, | ||||
status=PipelineRunStatus.SUCCESS, | statuses=[PipelineRunStatus.SUCCESS], | ||||
) | ) | ||||
) | ) | ||||
count = storage.get_runs_count( | count = storage.get_runs_count( | ||||
PipelineRunsFilter( | PipelineRunsFilter( | ||||
run_ids=[one], | run_ids=[one], | ||||
pipeline_name="some_pipeline", | pipeline_name="some_pipeline", | ||||
tags={"tag": "hello"}, | tags={"tag": "hello"}, | ||||
status=PipelineRunStatus.SUCCESS, | statuses=[PipelineRunStatus.SUCCESS], | ||||
) | ) | ||||
) | ) | ||||
assert len(some_runs) == 1 | assert len(some_runs) == 1 | ||||
assert count == 1 | assert count == 1 | ||||
assert some_runs[0].run_id == one | assert some_runs[0].run_id == one | ||||
some_runs = storage.get_runs(PipelineRunsFilter()) | some_runs = storage.get_runs(PipelineRunsFilter()) | ||||
count = storage.get_runs_count(PipelineRunsFilter()) | count = storage.get_runs_count(PipelineRunsFilter()) | ||||
▲ Show 20 Lines • Show All 135 Lines • ▼ Show 20 Lines | def test_fetch_by_status(self, storage): | ||||
storage.add_run( | storage.add_run( | ||||
TestRunStorage.build_run( | TestRunStorage.build_run( | ||||
run_id=four, pipeline_name="some_pipeline", status=PipelineRunStatus.FAILURE | run_id=four, pipeline_name="some_pipeline", status=PipelineRunStatus.FAILURE | ||||
) | ) | ||||
) | ) | ||||
assert { | assert { | ||||
run.run_id | run.run_id | ||||
for run in storage.get_runs(PipelineRunsFilter(status=PipelineRunStatus.NOT_STARTED)) | for run in storage.get_runs( | ||||
PipelineRunsFilter(statuses=[PipelineRunStatus.NOT_STARTED]) | |||||
) | |||||
} == {one} | } == {one} | ||||
assert { | assert { | ||||
run.run_id | run.run_id | ||||
for run in storage.get_runs(PipelineRunsFilter(status=PipelineRunStatus.STARTED)) | for run in storage.get_runs(PipelineRunsFilter(statuses=[PipelineRunStatus.STARTED])) | ||||
} == {two, three,} | } == {two, three,} | ||||
assert { | assert { | ||||
run.run_id | run.run_id | ||||
for run in storage.get_runs(PipelineRunsFilter(status=PipelineRunStatus.FAILURE)) | for run in storage.get_runs(PipelineRunsFilter(statuses=[PipelineRunStatus.FAILURE])) | ||||
} == {four} | } == {four} | ||||
assert { | assert { | ||||
run.run_id | run.run_id | ||||
for run in storage.get_runs(PipelineRunsFilter(status=PipelineRunStatus.SUCCESS)) | for run in storage.get_runs(PipelineRunsFilter(statuses=[PipelineRunStatus.SUCCESS])) | ||||
} == set() | } == set() | ||||
def test_fetch_by_status_cursored(self, storage): | def test_fetch_by_status_cursored(self, storage): | ||||
assert storage | assert storage | ||||
one = make_new_run_id() | one = make_new_run_id() | ||||
two = make_new_run_id() | two = make_new_run_id() | ||||
three = make_new_run_id() | three = make_new_run_id() | ||||
four = make_new_run_id() | four = make_new_run_id() | ||||
Show All 14 Lines | def test_fetch_by_status_cursored(self, storage): | ||||
) | ) | ||||
storage.add_run( | storage.add_run( | ||||
TestRunStorage.build_run( | TestRunStorage.build_run( | ||||
run_id=four, pipeline_name="some_pipeline", status=PipelineRunStatus.STARTED | run_id=four, pipeline_name="some_pipeline", status=PipelineRunStatus.STARTED | ||||
) | ) | ||||
) | ) | ||||
cursor_four_runs = storage.get_runs( | cursor_four_runs = storage.get_runs( | ||||
PipelineRunsFilter(status=PipelineRunStatus.STARTED), cursor=four | PipelineRunsFilter(statuses=[PipelineRunStatus.STARTED]), cursor=four | ||||
) | ) | ||||
assert len(cursor_four_runs) == 2 | assert len(cursor_four_runs) == 2 | ||||
assert {run.run_id for run in cursor_four_runs} == {one, two} | assert {run.run_id for run in cursor_four_runs} == {one, two} | ||||
cursor_two_runs = storage.get_runs( | cursor_two_runs = storage.get_runs( | ||||
PipelineRunsFilter(status=PipelineRunStatus.STARTED), cursor=two | PipelineRunsFilter(statuses=[PipelineRunStatus.STARTED]), cursor=two | ||||
) | ) | ||||
assert len(cursor_two_runs) == 1 | assert len(cursor_two_runs) == 1 | ||||
assert {run.run_id for run in cursor_two_runs} == {one} | assert {run.run_id for run in cursor_two_runs} == {one} | ||||
cursor_one_runs = storage.get_runs( | cursor_one_runs = storage.get_runs( | ||||
PipelineRunsFilter(status=PipelineRunStatus.STARTED), cursor=one | PipelineRunsFilter(statuses=[PipelineRunStatus.STARTED]), cursor=one | ||||
) | ) | ||||
assert not cursor_one_runs | assert not cursor_one_runs | ||||
cursor_four_limit_one = storage.get_runs( | cursor_four_limit_one = storage.get_runs( | ||||
PipelineRunsFilter(status=PipelineRunStatus.STARTED), cursor=four, limit=1 | PipelineRunsFilter(statuses=[PipelineRunStatus.STARTED]), cursor=four, limit=1 | ||||
) | ) | ||||
assert len(cursor_four_limit_one) == 1 | assert len(cursor_four_limit_one) == 1 | ||||
assert cursor_four_limit_one[0].run_id == two | assert cursor_four_limit_one[0].run_id == two | ||||
def test_delete(self, storage): | def test_delete(self, storage): | ||||
assert storage | assert storage | ||||
run_id = make_new_run_id() | run_id = make_new_run_id() | ||||
storage.add_run(TestRunStorage.build_run(run_id=run_id, pipeline_name="some_pipeline")) | storage.add_run(TestRunStorage.build_run(run_id=run_id, pipeline_name="some_pipeline")) | ||||
▲ Show 20 Lines • Show All 262 Lines • ▼ Show 20 Lines | def test_fetch_run_groups_filter(self, storage): | ||||
tags={PARENT_RUN_ID_TAG: failed_run_id, ROOT_RUN_ID_TAG: root_run.run_id}, | tags={PARENT_RUN_ID_TAG: failed_run_id, ROOT_RUN_ID_TAG: root_run.run_id}, | ||||
) | ) | ||||
) | ) | ||||
for run in runs: | for run in runs: | ||||
storage.add_run(run) | storage.add_run(run) | ||||
run_groups = storage.get_run_groups( | run_groups = storage.get_run_groups( | ||||
limit=5, filters=PipelineRunsFilter(status=PipelineRunStatus.FAILURE) | limit=5, filters=PipelineRunsFilter(statuses=[PipelineRunStatus.FAILURE]) | ||||
) | ) | ||||
assert len(run_groups) == 3 | assert len(run_groups) == 3 | ||||
for root_run_id in run_groups: | for root_run_id in run_groups: | ||||
assert len(run_groups[root_run_id]["runs"]) == 2 | assert len(run_groups[root_run_id]["runs"]) == 2 | ||||
assert run_groups[root_run_id]["count"] == 5 | assert run_groups[root_run_id]["count"] == 5 | ||||
▲ Show 20 Lines • Show All 41 Lines • Show Last 20 Lines |