Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/solid_container.py
from abc import ABCMeta, abstractmethod, abstractproperty | from abc import ABCMeta, abstractmethod, abstractproperty | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import six | import six | ||||
from toposort import CircularDependencyError | |||||
from dagster import check | from dagster import check | ||||
from dagster.core.errors import DagsterInvalidDefinitionError | from dagster.core.errors import DagsterInvalidDefinitionError | ||||
from dagster.core.types.dagster_type import DagsterTypeKind | from dagster.core.types.dagster_type import DagsterTypeKind | ||||
from dagster.core.utils import toposort_flatten | |||||
from .dependency import DependencyStructure, IDependencyDefinition, Solid, SolidInvocation | from .dependency import DependencyStructure, IDependencyDefinition, Solid, SolidInvocation | ||||
class IContainSolids(six.with_metaclass(ABCMeta)): # pylint: disable=no-init | class IContainSolids(six.with_metaclass(ABCMeta)): # pylint: disable=no-init | ||||
@abstractproperty | @abstractproperty | ||||
def solids(self): | def solids(self): | ||||
"""List[Solid]: Top-level solids in the container.""" | """List[Solid]: Top-level solids in the container.""" | ||||
@abstractproperty | @abstractproperty | ||||
def dependency_structure(self): | def dependency_structure(self): | ||||
"""DependencyStructure: The dependencies between top-level solids in the container.""" | """DependencyStructure: The dependencies between top-level solids in the container.""" | ||||
@abstractmethod | @abstractmethod | ||||
def solid_named(self, name): | def solid_named(self, name): | ||||
"""Return the (top-level) solid with a given name. | """Return the (top-level) solid with a given name. | ||||
Args: | Args: | ||||
name (str): The name of the top level solid. | name (str): The name of the top level solid. | ||||
Returns: | Returns: | ||||
Solid: The solid with the given name | Solid: The solid with the given name | ||||
""" | """ | ||||
def _solids_in_topological_order(self): | |||||
_forward_edges, backward_edges = _create_adjacency_lists( | |||||
self.solids, self.dependency_structure | |||||
) | |||||
try: | |||||
order = toposort_flatten(backward_edges) | |||||
except CircularDependencyError as err: | |||||
six.raise_from( | |||||
DagsterInvalidDefinitionError(str(err)), err, | |||||
) | |||||
return [self.solid_named(solid_name) for solid_name in order] | |||||
def _create_adjacency_lists(solids, dep_structure): | |||||
check.list_param(solids, "solids", Solid) | |||||
check.inst_param(dep_structure, "dep_structure", DependencyStructure) | |||||
visit_dict = {s.name: False for s in solids} | |||||
forward_edges = {s.name: set() for s in solids} | |||||
backward_edges = {s.name: set() for s in solids} | |||||
def visit(solid_name): | |||||
if visit_dict[solid_name]: | |||||
return | |||||
visit_dict[solid_name] = True | |||||
for output_handle in dep_structure.all_upstream_outputs_from_solid(solid_name): | |||||
forward_node = output_handle.solid.name | |||||
backward_node = solid_name | |||||
if forward_node in forward_edges: | |||||
forward_edges[forward_node].add(backward_node) | |||||
backward_edges[backward_node].add(forward_node) | |||||
visit(forward_node) | |||||
for s in solids: | |||||
visit(s.name) | |||||
return (forward_edges, backward_edges) | |||||
def validate_dependency_dict(dependencies): | def validate_dependency_dict(dependencies): | ||||
prelude = ( | prelude = ( | ||||
'The expected type for "dependencies" is dict[Union[str, SolidInvocation], dict[str, ' | 'The expected type for "dependencies" is dict[Union[str, SolidInvocation], dict[str, ' | ||||
"DependencyDefinition]]. " | "DependencyDefinition]]. " | ||||
) | ) | ||||
if dependencies is None: | if dependencies is None: | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | for key, dep_dict in dependencies.items(): | ||||
"Received value {val} of type {type}.".format( | "Received value {val} of type {type}.".format( | ||||
key=key, input_key=input_key, val=dep, type=type(dep) | key=key, input_key=input_key, val=dep, type=type(dep) | ||||
) | ) | ||||
) | ) | ||||
return dependencies | return dependencies | ||||
def create_execution_structure(solid_defs, dependencies_dict, container_definition): | def create_execution_structure(solid_defs, dependencies_dict, graph_definition): | ||||
"""This builder takes the dependencies dictionary specified during creation of the | """This builder takes the dependencies dictionary specified during creation of the | ||||
PipelineDefinition object and builds (1) the execution structure and (2) a solid dependency | PipelineDefinition object and builds (1) the execution structure and (2) a solid dependency | ||||
dictionary. | dictionary. | ||||
For example, for the following dependencies: | For example, for the following dependencies: | ||||
dep_dict = { | dep_dict = { | ||||
SolidInvocation('giver'): {}, | SolidInvocation('giver'): {}, | ||||
Show All 25 Lines | pipeline_solid_dict = { | ||||
'sleeper_2': <dagster.core.definitions.dependency.Solid object>, | 'sleeper_2': <dagster.core.definitions.dependency.Solid object>, | ||||
'sleeper_3': <dagster.core.definitions.dependency.Solid object>, | 'sleeper_3': <dagster.core.definitions.dependency.Solid object>, | ||||
'sleeper_4': <dagster.core.definitions.dependency.Solid object>, | 'sleeper_4': <dagster.core.definitions.dependency.Solid object>, | ||||
'total': <dagster.core.definitions.dependency.Solid object> | 'total': <dagster.core.definitions.dependency.Solid object> | ||||
} | } | ||||
as well as a dagster.core.definitions.dependency.DependencyStructure object. | as well as a dagster.core.definitions.dependency.DependencyStructure object. | ||||
""" | """ | ||||
from .solid import ISolidDefinition, CompositeSolidDefinition | from .solid import NodeDefinition | ||||
from .graph import GraphDefinition | |||||
check.list_param(solid_defs, "solid_defs", of_type=ISolidDefinition) | check.list_param(solid_defs, "solid_defs", of_type=NodeDefinition) | ||||
check.dict_param( | check.dict_param( | ||||
dependencies_dict, | dependencies_dict, | ||||
"dependencies_dict", | "dependencies_dict", | ||||
key_type=six.string_types + (SolidInvocation,), | key_type=six.string_types + (SolidInvocation,), | ||||
value_type=dict, | value_type=dict, | ||||
) | ) | ||||
# container_definition is none in the context of a pipeline | # graph_definition is none in the context of a pipeline | ||||
check.opt_inst_param(container_definition, "container_definition", CompositeSolidDefinition) | check.inst_param(graph_definition, "graph_definition", GraphDefinition) | ||||
# Same as dep_dict but with SolidInvocation replaced by alias string | # Same as dep_dict but with SolidInvocation replaced by alias string | ||||
aliased_dependencies_dict = {} | aliased_dependencies_dict = {} | ||||
# Keep track of solid name -> all aliases used and alias -> name | # Keep track of solid name -> all aliases used and alias -> name | ||||
name_to_aliases = defaultdict(set) | name_to_aliases = defaultdict(set) | ||||
alias_to_solid_instance = {} | alias_to_solid_instance = {} | ||||
alias_to_name = {} | alias_to_name = {} | ||||
Show All 11 Lines | for solid_key, input_dep_dict in dependencies_dict.items(): | ||||
alias_to_name[alias] = solid_key.name | alias_to_name[alias] = solid_key.name | ||||
aliased_dependencies_dict[alias] = input_dep_dict | aliased_dependencies_dict[alias] = input_dep_dict | ||||
for dependency in input_dep_dict.values(): | for dependency in input_dep_dict.values(): | ||||
for dep in dependency.get_definitions(): | for dep in dependency.get_definitions(): | ||||
name_to_aliases[dep.solid].add(dep.solid) | name_to_aliases[dep.solid].add(dep.solid) | ||||
pipeline_solid_dict = _build_pipeline_solid_dict( | pipeline_solid_dict = _build_pipeline_solid_dict( | ||||
solid_defs, name_to_aliases, alias_to_solid_instance, container_definition | solid_defs, name_to_aliases, alias_to_solid_instance, graph_definition | ||||
) | ) | ||||
_validate_dependencies(aliased_dependencies_dict, pipeline_solid_dict, alias_to_name) | _validate_dependencies(aliased_dependencies_dict, pipeline_solid_dict, alias_to_name) | ||||
dependency_structure = DependencyStructure.from_definitions( | dependency_structure = DependencyStructure.from_definitions( | ||||
pipeline_solid_dict, aliased_dependencies_dict | pipeline_solid_dict, aliased_dependencies_dict | ||||
) | ) | ||||
return dependency_structure, pipeline_solid_dict | return dependency_structure, pipeline_solid_dict | ||||
def _build_pipeline_solid_dict( | def _build_pipeline_solid_dict( | ||||
solid_defs, name_to_aliases, alias_to_solid_instance, container_definition | solid_defs, name_to_aliases, alias_to_solid_instance, graph_definition | ||||
): | ): | ||||
pipeline_solids = [] | pipeline_solids = [] | ||||
for solid_def in solid_defs: | for solid_def in solid_defs: | ||||
uses_of_solid = name_to_aliases.get(solid_def.name, {solid_def.name}) | uses_of_solid = name_to_aliases.get(solid_def.name, {solid_def.name}) | ||||
for alias in uses_of_solid: | for alias in uses_of_solid: | ||||
solid_instance = alias_to_solid_instance.get(alias) | solid_instance = alias_to_solid_instance.get(alias) | ||||
solid_instance_tags = solid_instance.tags if solid_instance else {} | solid_instance_tags = solid_instance.tags if solid_instance else {} | ||||
hook_defs = solid_instance.hook_defs if solid_instance else frozenset() | hook_defs = solid_instance.hook_defs if solid_instance else frozenset() | ||||
pipeline_solids.append( | pipeline_solids.append( | ||||
Solid( | Solid( | ||||
name=alias, | name=alias, | ||||
definition=solid_def, | definition=solid_def, | ||||
container_definition=container_definition, | graph_definition=graph_definition, | ||||
tags=solid_instance_tags, | tags=solid_instance_tags, | ||||
hook_defs=hook_defs, | hook_defs=hook_defs, | ||||
) | ) | ||||
) | ) | ||||
return {ps.name: ps for ps in pipeline_solids} | return {ps.name: ps for ps in pipeline_solids} | ||||
▲ Show 20 Lines • Show All 83 Lines • Show Last 20 Lines |