Differential D4920 Diff 24623 python_modules/dagster/dagster_tests/core_tests/test_pipeline_execution.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/core_tests/test_pipeline_execution.py
Show All 22 Lines | from dagster import ( | ||||
lambda_solid, | lambda_solid, | ||||
pipeline, | pipeline, | ||||
reexecute_pipeline, | reexecute_pipeline, | ||||
solid, | solid, | ||||
) | ) | ||||
from dagster.cli.workspace.load import location_handle_from_python_file | from dagster.cli.workspace.load import location_handle_from_python_file | ||||
from dagster.core.definitions import Solid | from dagster.core.definitions import Solid | ||||
from dagster.core.definitions.dependency import DependencyStructure | from dagster.core.definitions.dependency import DependencyStructure | ||||
from dagster.core.definitions.solid_container import _create_adjacency_lists | from dagster.core.definitions.graph import _create_adjacency_lists | ||||
from dagster.core.errors import DagsterInvalidSubsetError, DagsterInvariantViolationError | from dagster.core.errors import DagsterInvalidSubsetError, DagsterInvariantViolationError | ||||
from dagster.core.execution.results import SolidExecutionResult | from dagster.core.execution.results import SolidExecutionResult | ||||
from dagster.core.host_representation import RepositoryLocation, UserProcessApi | from dagster.core.host_representation import RepositoryLocation, UserProcessApi | ||||
from dagster.core.instance import DagsterInstance | from dagster.core.instance import DagsterInstance | ||||
from dagster.core.test_utils import step_output_event_filter | from dagster.core.test_utils import step_output_event_filter | ||||
from dagster.core.utility_solids import ( | from dagster.core.utility_solids import ( | ||||
create_root_solid, | create_root_solid, | ||||
create_solid_with_deps, | create_solid_with_deps, | ||||
Show All 30 Lines | def compute(context, inputs): | ||||
result.extend(passed_rows) | result.extend(passed_rows) | ||||
result.append({context.solid.name: "compute_called"}) | result.append({context.solid.name: "compute_called"}) | ||||
return result | return result | ||||
return compute | return compute | ||||
def _do_construct(solids, dependencies): | def _do_construct(solids, dependencies): | ||||
solids = {s.name: Solid(name=s.name, definition=s) for s in solids} | pipeline_def = PipelineDefinition(name="test", solid_defs=solids, dependencies=dependencies) | ||||
solids = { | |||||
s.name: Solid(name=s.name, definition=s, graph_definition=pipeline_def) for s in solids | |||||
} | |||||
dependency_structure = DependencyStructure.from_definitions(solids, dependencies) | dependency_structure = DependencyStructure.from_definitions(solids, dependencies) | ||||
return _create_adjacency_lists(list(solids.values()), dependency_structure) | return _create_adjacency_lists(list(solids.values()), dependency_structure) | ||||
def test_empty_adjaceny_lists(): | def test_empty_adjaceny_lists(): | ||||
solids = [create_root_solid("a_node")] | solids = [create_root_solid("a_node")] | ||||
forward_edges, backwards_edges = _do_construct(solids, {}) | forward_edges, backwards_edges = _do_construct(solids, {}) | ||||
assert forward_edges == {"a_node": set()} | assert forward_edges == {"a_node": set()} | ||||
▲ Show 20 Lines • Show All 987 Lines • Show Last 20 Lines |