Differential D8675 Diff 40779 python_modules/dagster/dagster_tests/core_tests/snap_tests/test_pipeline_snap.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/core_tests/snap_tests/test_pipeline_snap.py
Show All 17 Lines | from dagster.core.snap import ( | ||||
PipelineSnapshot, | PipelineSnapshot, | ||||
SolidInvocationSnap, | SolidInvocationSnap, | ||||
create_pipeline_snapshot_id, | create_pipeline_snapshot_id, | ||||
snap_from_config_type, | snap_from_config_type, | ||||
) | ) | ||||
from dagster.core.snap.dep_snapshot import ( | from dagster.core.snap.dep_snapshot import ( | ||||
InputHandle, | InputHandle, | ||||
OutputHandleSnap, | OutputHandleSnap, | ||||
build_dep_structure_snapshot_from_icontains_solids, | build_dep_structure_snapshot_from_graph, | ||||
) | ) | ||||
from dagster.serdes import ( | from dagster.serdes import ( | ||||
deserialize_json_to_dagster_namedtuple, | deserialize_json_to_dagster_namedtuple, | ||||
serialize_dagster_namedtuple, | serialize_dagster_namedtuple, | ||||
serialize_pp, | serialize_pp, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 56 Lines • ▼ Show 20 Lines | def test_noop_deps_snap(): | ||||
@solid | @solid | ||||
def noop_solid(_): | def noop_solid(_): | ||||
pass | pass | ||||
@pipeline | @pipeline | ||||
def noop_pipeline(): | def noop_pipeline(): | ||||
noop_solid() | noop_solid() | ||||
invocations = build_dep_structure_snapshot_from_icontains_solids( | invocations = build_dep_structure_snapshot_from_graph( | ||||
noop_pipeline.graph | noop_pipeline.graph | ||||
).solid_invocation_snaps | ).solid_invocation_snaps | ||||
assert len(invocations) == 1 | assert len(invocations) == 1 | ||||
assert isinstance(invocations[0], SolidInvocationSnap) | assert isinstance(invocations[0], SolidInvocationSnap) | ||||
def test_two_invocations_deps_snap(snapshot): | def test_two_invocations_deps_snap(snapshot): | ||||
@solid | @solid | ||||
def noop_solid(_): | def noop_solid(_): | ||||
pass | pass | ||||
@pipeline | @pipeline | ||||
def two_solid_pipeline(): | def two_solid_pipeline(): | ||||
noop_solid.alias("one")() | noop_solid.alias("one")() | ||||
noop_solid.alias("two")() | noop_solid.alias("two")() | ||||
index = DependencyStructureIndex( | index = DependencyStructureIndex( | ||||
build_dep_structure_snapshot_from_icontains_solids(two_solid_pipeline.graph) | build_dep_structure_snapshot_from_graph(two_solid_pipeline.graph) | ||||
) | ) | ||||
assert index.get_invocation("one") | assert index.get_invocation("one") | ||||
assert index.get_invocation("two") | assert index.get_invocation("two") | ||||
pipeline_snapshot = PipelineSnapshot.from_pipeline_def(two_solid_pipeline) | pipeline_snapshot = PipelineSnapshot.from_pipeline_def(two_solid_pipeline) | ||||
assert pipeline_snapshot == serialize_rt(pipeline_snapshot) | assert pipeline_snapshot == serialize_rt(pipeline_snapshot) | ||||
snapshot.assert_match(serialize_pp(pipeline_snapshot)) | snapshot.assert_match(serialize_pp(pipeline_snapshot)) | ||||
Show All 9 Lines | def test_basic_dep(): | ||||
def passthrough(_, value): | def passthrough(_, value): | ||||
return value | return value | ||||
@pipeline | @pipeline | ||||
def single_dep_pipeline(): | def single_dep_pipeline(): | ||||
passthrough(return_one()) | passthrough(return_one()) | ||||
index = DependencyStructureIndex( | index = DependencyStructureIndex( | ||||
build_dep_structure_snapshot_from_icontains_solids(single_dep_pipeline.graph) | build_dep_structure_snapshot_from_graph(single_dep_pipeline.graph) | ||||
) | ) | ||||
assert index.get_invocation("return_one") | assert index.get_invocation("return_one") | ||||
assert index.get_invocation("passthrough") | assert index.get_invocation("passthrough") | ||||
outputs = index.get_upstream_outputs("passthrough", "value") | outputs = index.get_upstream_outputs("passthrough", "value") | ||||
assert len(outputs) == 1 | assert len(outputs) == 1 | ||||
assert outputs[0].solid_name == "return_one" | assert outputs[0].solid_name == "return_one" | ||||
Show All 10 Lines | def passthrough(_, value): | ||||
return value | return value | ||||
@pipeline | @pipeline | ||||
def single_dep_pipeline(): | def single_dep_pipeline(): | ||||
return_one_result = return_one() | return_one_result = return_one() | ||||
passthrough.alias("passone")(return_one_result) | passthrough.alias("passone")(return_one_result) | ||||
passthrough.alias("passtwo")(return_one_result) | passthrough.alias("passtwo")(return_one_result) | ||||
dep_structure_snapshot = build_dep_structure_snapshot_from_icontains_solids( | dep_structure_snapshot = build_dep_structure_snapshot_from_graph(single_dep_pipeline.graph) | ||||
single_dep_pipeline.graph | |||||
) | |||||
index = DependencyStructureIndex(dep_structure_snapshot) | index = DependencyStructureIndex(dep_structure_snapshot) | ||||
assert index.get_invocation("return_one") | assert index.get_invocation("return_one") | ||||
assert index.get_invocation("passone") | assert index.get_invocation("passone") | ||||
assert index.get_invocation("passtwo") | assert index.get_invocation("passtwo") | ||||
assert index.get_upstream_output("passone", "value") == OutputHandleSnap("return_one", "result") | assert index.get_upstream_output("passone", "value") == OutputHandleSnap("return_one", "result") | ||||
assert index.get_upstream_output("passtwo", "value") == OutputHandleSnap("return_one", "result") | assert index.get_upstream_output("passtwo", "value") == OutputHandleSnap("return_one", "result") | ||||
Show All 27 Lines | def take_nothings(_): | ||||
return None | return None | ||||
@pipeline | @pipeline | ||||
def fan_in_test(): | def fan_in_test(): | ||||
take_nothings( | take_nothings( | ||||
[return_nothing.alias("nothing_one")(), return_nothing.alias("nothing_two")()] | [return_nothing.alias("nothing_one")(), return_nothing.alias("nothing_two")()] | ||||
) | ) | ||||
dep_structure_snapshot = build_dep_structure_snapshot_from_icontains_solids(fan_in_test.graph) | dep_structure_snapshot = build_dep_structure_snapshot_from_graph(fan_in_test.graph) | ||||
index = DependencyStructureIndex(dep_structure_snapshot) | index = DependencyStructureIndex(dep_structure_snapshot) | ||||
assert index.get_invocation("nothing_one") | assert index.get_invocation("nothing_one") | ||||
assert index.get_invocation("take_nothings") | assert index.get_invocation("take_nothings") | ||||
assert index.get_upstream_outputs("take_nothings", "nothing") == [ | assert index.get_upstream_outputs("take_nothings", "nothing") == [ | ||||
OutputHandleSnap("nothing_one", "result"), | OutputHandleSnap("nothing_one", "result"), | ||||
OutputHandleSnap("nothing_two", "result"), | OutputHandleSnap("nothing_two", "result"), | ||||
▲ Show 20 Lines • Show All 254 Lines • Show Last 20 Lines |