Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/pipeline.py
import uuid | import uuid | ||||
import warnings | import warnings | ||||
import six | import six | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions.solid import NodeDefinition | |||||
from dagster.core.errors import ( | from dagster.core.errors import ( | ||||
DagsterInvalidDefinitionError, | DagsterInvalidDefinitionError, | ||||
DagsterInvalidSubsetError, | DagsterInvalidSubsetError, | ||||
DagsterInvariantViolationError, | DagsterInvariantViolationError, | ||||
) | ) | ||||
from dagster.core.types.dagster_type import DagsterTypeKind, construct_dagster_type_dictionary | from dagster.core.types.dagster_type import DagsterTypeKind, construct_dagster_type_dictionary | ||||
from dagster.core.utils import str_format_set | from dagster.core.utils import str_format_set | ||||
from dagster.utils.backcompat import experimental_arg_warning | |||||
from .dependency import ( | from .dependency import ( | ||||
DependencyDefinition, | DependencyDefinition, | ||||
MultiDependencyDefinition, | MultiDependencyDefinition, | ||||
SolidHandle, | SolidHandle, | ||||
SolidInvocation, | SolidInvocation, | ||||
) | ) | ||||
from .graph import GraphDefinition | |||||
from .hook import HookDefinition | from .hook import HookDefinition | ||||
from .mode import ModeDefinition | from .mode import ModeDefinition | ||||
from .preset import PresetDefinition | from .preset import PresetDefinition | ||||
from .solid import ISolidDefinition | from .solid import NodeDefinition | ||||
from .solid_container import IContainSolids, create_execution_structure, validate_dependency_dict | from .utils import validate_tags | ||||
from .utils import check_valid_name, validate_tags | |||||
def _check_solids_arg(pipeline_name, solid_defs): | |||||
if not isinstance(solid_defs, list): | |||||
raise DagsterInvalidDefinitionError( | |||||
'"solids" arg to pipeline "{name}" is not a list. Got {val}.'.format( | |||||
name=pipeline_name, val=repr(solid_defs) | |||||
) | |||||
) | |||||
for solid_def in solid_defs: | |||||
if isinstance(solid_def, ISolidDefinition): | |||||
continue | |||||
elif callable(solid_def): | |||||
raise DagsterInvalidDefinitionError( | |||||
"""You have passed a lambda or function {func} into pipeline {name} that is | |||||
not a solid. You have likely forgetten to annotate this function with | |||||
an @solid or @lambda_solid decorator.' | |||||
""".format( | |||||
name=pipeline_name, func=solid_def.__name__ | |||||
) | |||||
) | |||||
else: | |||||
raise DagsterInvalidDefinitionError( | |||||
"Invalid item in solid list: {item}".format(item=repr(solid_def)) | |||||
) | |||||
return solid_defs | |||||
def _anonymous_pipeline_name(): | def _anonymous_pipeline_name(): | ||||
return "__pipeline__" + str(uuid.uuid4()).replace("-", "") | return "__pipeline__" + str(uuid.uuid4()).replace("-", "") | ||||
class PipelineDefinition(IContainSolids): | class PipelineDefinition(GraphDefinition): | ||||
"""Defines a Dagster pipeline. | """Defines a Dagster pipeline. | ||||
A pipeline is made up of | A pipeline is made up of | ||||
- Solids, each of which is a single functional unit of data computation. | - Solids, each of which is a single functional unit of data computation. | ||||
- Dependencies, which determine how the values produced by solids as their outputs flow from | - Dependencies, which determine how the values produced by solids as their outputs flow from | ||||
one solid to another. This tells Dagster how to arrange solids, and potentially multiple | one solid to another. This tells Dagster how to arrange solids, and potentially multiple | ||||
aliased instances of solids, into a directed, acyclic graph (DAG) of compute. | aliased instances of solids, into a directed, acyclic graph (DAG) of compute. | ||||
▲ Show 20 Lines • Show All 79 Lines • ▼ Show 20 Lines | def __init__( | ||||
solid_defs, | solid_defs, | ||||
name=None, | name=None, | ||||
description=None, | description=None, | ||||
dependencies=None, | dependencies=None, | ||||
mode_defs=None, | mode_defs=None, | ||||
preset_defs=None, | preset_defs=None, | ||||
tags=None, | tags=None, | ||||
hook_defs=None, | hook_defs=None, | ||||
input_mappings=None, | |||||
output_mappings=None, | |||||
config_mapping=None, | |||||
positional_inputs=None, | |||||
_parent_pipeline_def=None, # https://github.com/dagster-io/dagster/issues/2115 | _parent_pipeline_def=None, # https://github.com/dagster-io/dagster/issues/2115 | ||||
_configured_config_mapping_fn=None, | |||||
_configured_config_schema=None, | |||||
): | ): | ||||
if not name: | if not name: | ||||
warnings.warn( | warnings.warn( | ||||
"Pipeline must have a name. Names will be required starting in 0.10.0 or later." | "Pipeline must have a name. Names will be required starting in 0.10.0 or later." | ||||
) | ) | ||||
name = _anonymous_pipeline_name() | |||||
# For these warnings they check truthiness because they get changed to [] higher | |||||
# in the stack for the decorator case | |||||
self._name = check_valid_name(name) if name else _anonymous_pipeline_name() | if input_mappings: | ||||
experimental_arg_warning("input_mappings", "PipelineDefinition") | |||||
self._description = check.opt_str_param(description, "description") | if output_mappings: | ||||
experimental_arg_warning("output_mappings", "PipelineDefinition") | |||||
if config_mapping is not None: | |||||
experimental_arg_warning("config_mapping", "PipelineDefinition") | |||||
if positional_inputs: | |||||
experimental_arg_warning("positional_inputs", "PipelineDefinition") | |||||
super(PipelineDefinition, self).__init__( | |||||
name=name, | |||||
description=description, | |||||
dependencies=dependencies, | |||||
node_defs=solid_defs, | |||||
tags=check.opt_dict_param(tags, "tags", key_type=str), | |||||
positional_inputs=positional_inputs, | |||||
input_mappings=input_mappings, | |||||
output_mappings=output_mappings, | |||||
config_mapping=config_mapping, | |||||
_configured_config_mapping_fn=_configured_config_mapping_fn, | |||||
_configured_config_schema=_configured_config_schema, | |||||
) | |||||
self._current_level_node_defs = solid_defs | |||||
self._tags = validate_tags(tags) | |||||
mode_definitions = check.opt_list_param(mode_defs, "mode_defs", of_type=ModeDefinition) | mode_definitions = check.opt_list_param(mode_defs, "mode_defs", of_type=ModeDefinition) | ||||
if not mode_definitions: | if not mode_definitions: | ||||
mode_definitions = [ModeDefinition()] | mode_definitions = [ModeDefinition()] | ||||
self._mode_definitions = mode_definitions | self._mode_definitions = mode_definitions | ||||
self._current_level_solid_defs = check.list_param( | |||||
_check_solids_arg(self._name, solid_defs), "solid_defs", of_type=ISolidDefinition | |||||
) | |||||
self._tags = validate_tags(tags) | |||||
seen_modes = set() | seen_modes = set() | ||||
for mode_def in mode_definitions: | for mode_def in mode_definitions: | ||||
if mode_def.name in seen_modes: | if mode_def.name in seen_modes: | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
( | ( | ||||
'Two modes seen with the name "{mode_name}" in "{pipeline_name}". ' | 'Two modes seen with the name "{mode_name}" in "{pipeline_name}". ' | ||||
"Modes must have unique names." | "Modes must have unique names." | ||||
).format(mode_name=mode_def.name, pipeline_name=self._name) | ).format(mode_name=mode_def.name, pipeline_name=self._name) | ||||
) | ) | ||||
seen_modes.add(mode_def.name) | seen_modes.add(mode_def.name) | ||||
self._dependencies = validate_dependency_dict(dependencies) | self._dagster_type_dict = construct_dagster_type_dictionary(self._current_level_node_defs) | ||||
dependency_structure, solid_dict = create_execution_structure( | |||||
self._current_level_solid_defs, self._dependencies, container_definition=None | |||||
) | |||||
self._solid_dict = solid_dict | |||||
self._dependency_structure = dependency_structure | |||||
# eager toposort solids to detect cycles | |||||
self.solids_in_topological_order = self._solids_in_topological_order() | |||||
self._dagster_type_dict = construct_dagster_type_dictionary(self._current_level_solid_defs) | |||||
self._hook_defs = check.opt_set_param(hook_defs, "hook_defs", of_type=HookDefinition) | self._hook_defs = check.opt_set_param(hook_defs, "hook_defs", of_type=HookDefinition) | ||||
self._preset_defs = check.opt_list_param(preset_defs, "preset_defs", PresetDefinition) | self._preset_defs = check.opt_list_param(preset_defs, "preset_defs", PresetDefinition) | ||||
self._preset_dict = {} | self._preset_dict = {} | ||||
for preset in self._preset_defs: | for preset in self._preset_defs: | ||||
if preset.name in self._preset_dict: | if preset.name in self._preset_dict: | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
Show All 9 Lines | ): | ||||
'references mode "{mode}" which is not defined.' | 'references mode "{mode}" which is not defined.' | ||||
).format(name=preset.name, pipeline_name=self._name, mode=preset.mode) | ).format(name=preset.name, pipeline_name=self._name, mode=preset.mode) | ||||
) | ) | ||||
self._preset_dict[preset.name] = preset | self._preset_dict[preset.name] = preset | ||||
# Validate solid resource dependencies | # Validate solid resource dependencies | ||||
_validate_resource_dependencies( | _validate_resource_dependencies( | ||||
self._mode_definitions, | self._mode_definitions, | ||||
self._current_level_solid_defs, | self._current_level_node_defs, | ||||
self._solid_dict, | self._solid_dict, | ||||
self._hook_defs, | self._hook_defs, | ||||
) | ) | ||||
# Validate unsatisfied inputs can be materialized from config | # Validate unsatisfied inputs can be materialized from config | ||||
_validate_inputs(self._dependency_structure, self._solid_dict) | _validate_inputs(self._dependency_structure, self._solid_dict) | ||||
self._all_solid_defs = _build_all_solid_defs(self._current_level_solid_defs) | # Recursively explore all nodes in the this pipeline | ||||
self._all_node_defs = _build_all_node_defs(self._current_level_node_defs) | |||||
self._parent_pipeline_def = check.opt_inst_param( | self._parent_pipeline_def = check.opt_inst_param( | ||||
_parent_pipeline_def, "_parent_pipeline_def", PipelineDefinition | _parent_pipeline_def, "_parent_pipeline_def", PipelineDefinition | ||||
) | ) | ||||
self._cached_run_config_schemas = {} | self._cached_run_config_schemas = {} | ||||
self._cached_external_pipeline = None | self._cached_external_pipeline = None | ||||
def construct_configured_copy( | |||||
self, | |||||
new_name, | |||||
new_description, | |||||
new_configured_config_schema, | |||||
new_configured_config_mapping_fn, | |||||
): | |||||
return PipelineDefinition( | |||||
solid_defs=self._solid_defs, | |||||
name=new_name, | |||||
description=new_description, | |||||
dependencies=self._dependencies, | |||||
mode_defs=self._mode_definitions, | |||||
preset_defs=self.preset_defs, | |||||
hook_defs=self.hook_defs, | |||||
input_mappings=self._input_mappings, | |||||
output_mappings=self._output_mappings, | |||||
config_mapping=self._config_mapping, | |||||
positional_inputs=self.positional_inputs, | |||||
_parent_pipeline_def=self._parent_pipeline_def, | |||||
_configured_config_schema=new_configured_config_schema, | |||||
_configured_config_mapping_fn=new_configured_config_mapping_fn, | |||||
) | |||||
def get_run_config_schema(self, mode=None): | def get_run_config_schema(self, mode=None): | ||||
check.str_param(mode, "mode") | check.str_param(mode, "mode") | ||||
mode_def = self.get_mode_definition(mode) | mode_def = self.get_mode_definition(mode) | ||||
if mode_def.name in self._cached_run_config_schemas: | if mode_def.name in self._cached_run_config_schemas: | ||||
return self._cached_run_config_schemas[mode_def.name] | return self._cached_run_config_schemas[mode_def.name] | ||||
self._cached_run_config_schemas[mode_def.name] = _create_run_config_schema(self, mode_def) | self._cached_run_config_schemas[mode_def.name] = _create_run_config_schema(self, mode_def) | ||||
return self._cached_run_config_schemas[mode_def.name] | return self._cached_run_config_schemas[mode_def.name] | ||||
@property | @property | ||||
def name(self): | |||||
return self._name | |||||
@property | |||||
def description(self): | |||||
return self._description | |||||
@property | |||||
def mode_definitions(self): | def mode_definitions(self): | ||||
return self._mode_definitions | return self._mode_definitions | ||||
@property | @property | ||||
def dependencies(self): | |||||
return self._dependencies | |||||
@property | |||||
def preset_defs(self): | def preset_defs(self): | ||||
return self._preset_defs | return self._preset_defs | ||||
def _get_mode_definition(self, mode): | def _get_mode_definition(self, mode): | ||||
check.str_param(mode, "mode") | check.str_param(mode, "mode") | ||||
for mode_definition in self._mode_definitions: | for mode_definition in self._mode_definitions: | ||||
if mode_definition.name == mode: | if mode_definition.name == mode: | ||||
return mode_definition | return mode_definition | ||||
▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | def display_name(self): | ||||
is unnamed the method will return "<<unnamed>>". | is unnamed the method will return "<<unnamed>>". | ||||
""" | """ | ||||
return self._name if self._name else "<<unnamed>>" | return self._name if self._name else "<<unnamed>>" | ||||
@property | @property | ||||
def tags(self): | def tags(self): | ||||
return self._tags | return self._tags | ||||
@property | |||||
def solids(self): | |||||
"""List[Solid]: Top-level solids in the pipeline. | |||||
""" | |||||
return list(set(self._solid_dict.values())) | |||||
def has_solid_named(self, name): | |||||
"""Return whether or not there is a top level solid with this name in the pipeline | |||||
Args: | |||||
name (str): Name of solid | |||||
Returns: | |||||
bool: True if the solid is in the pipeline | |||||
""" | |||||
check.str_param(name, "name") | |||||
return name in self._solid_dict | |||||
def solid_named(self, name): | |||||
"""Return the top level solid named "name". Throws if it does not exist. | |||||
Args: | |||||
name (str): Name of solid | |||||
Returns: | |||||
Solid: | |||||
""" | |||||
check.str_param(name, "name") | |||||
check.invariant( | |||||
name in self._solid_dict, | |||||
"Pipeline {pipeline_name} has no solid named {name}.".format( | |||||
pipeline_name=self._name, name=name | |||||
), | |||||
) | |||||
return self._solid_dict[name] | |||||
def get_solid(self, handle): | |||||
"""Return the solid contained anywhere within the pipeline via its handle. | |||||
Args: | |||||
handle (SolidHandle): The solid's handle | |||||
Returns: | |||||
Solid: | |||||
""" | |||||
check.inst_param(handle, "handle", SolidHandle) | |||||
current = handle | |||||
lineage = [] | |||||
while current: | |||||
lineage.append(current.name) | |||||
current = current.parent | |||||
name = lineage.pop() | |||||
solid = self.solid_named(name) | |||||
while lineage: | |||||
name = lineage.pop() | |||||
solid = solid.definition.solid_named(name) | |||||
return solid | |||||
@property | |||||
def dependency_structure(self): | |||||
return self._dependency_structure | |||||
def has_dagster_type(self, name): | def has_dagster_type(self, name): | ||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
return name in self._dagster_type_dict | return name in self._dagster_type_dict | ||||
def dagster_type_named(self, name): | def dagster_type_named(self, name): | ||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
return self._dagster_type_dict[name] | return self._dagster_type_dict[name] | ||||
def all_dagster_types(self): | def all_dagster_types(self): | ||||
return self._dagster_type_dict.values() | return self._dagster_type_dict.values() | ||||
@property | @property | ||||
def all_solid_defs(self): | def all_solid_defs(self): | ||||
return list(self._all_solid_defs.values()) | return list(self._all_node_defs.values()) | ||||
@property | @property | ||||
def top_level_solid_defs(self): | def top_level_solid_defs(self): | ||||
return self._current_level_solid_defs | return self._current_level_node_defs | ||||
def solid_def_named(self, name): | def solid_def_named(self, name): | ||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
check.invariant(name in self._all_solid_defs, "{} not found".format(name)) | check.invariant(name in self._all_node_defs, "{} not found".format(name)) | ||||
return self._all_solid_defs[name] | return self._all_node_defs[name] | ||||
def has_solid_def(self, name): | def has_solid_def(self, name): | ||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
return name in self._all_solid_defs | return name in self._all_node_defs | ||||
def get_pipeline_subset_def(self, solids_to_execute): | def get_pipeline_subset_def(self, solids_to_execute): | ||||
return ( | return ( | ||||
self if solids_to_execute is None else _get_pipeline_subset_def(self, solids_to_execute) | self if solids_to_execute is None else _get_pipeline_subset_def(self, solids_to_execute) | ||||
) | ) | ||||
def get_presets(self): | def get_presets(self): | ||||
return list(self._preset_dict.values()) | return list(self._preset_dict.values()) | ||||
▲ Show 20 Lines • Show All 201 Lines • ▼ Show 20 Lines | except DagsterInvalidDefinitionError as exc: | ||||
solids_to_execute=str_format_set(solids_to_execute), | solids_to_execute=str_format_set(solids_to_execute), | ||||
pipeline_name=pipeline_def.name, | pipeline_name=pipeline_def.name, | ||||
) | ) | ||||
), | ), | ||||
exc, | exc, | ||||
) | ) | ||||
def _validate_resource_dependencies(mode_definitions, solid_defs, solid_dict, pipeline_hook_defs): | def _validate_resource_dependencies(mode_definitions, node_defs, solid_dict, pipeline_hook_defs): | ||||
"""This validation ensures that each pipeline context provides the resources that are required | """This validation ensures that each pipeline context provides the resources that are required | ||||
by each solid. | by each solid. | ||||
""" | """ | ||||
check.list_param(mode_definitions, "mode_definitions", of_type=ModeDefinition) | check.list_param(mode_definitions, "mode_definitions", of_type=ModeDefinition) | ||||
check.list_param(solid_defs, "solid_defs", of_type=ISolidDefinition) | check.list_param(node_defs, "node_defs", of_type=NodeDefinition) | ||||
check.set_param(pipeline_hook_defs, "pipeline_hook_defs", of_type=HookDefinition) | check.set_param(pipeline_hook_defs, "pipeline_hook_defs", of_type=HookDefinition) | ||||
for mode_def in mode_definitions: | for mode_def in mode_definitions: | ||||
mode_resources = set(mode_def.resource_defs.keys()) | mode_resources = set(mode_def.resource_defs.keys()) | ||||
for solid_def in solid_defs: | for node_def in node_defs: | ||||
for required_resource in solid_def.required_resource_keys: | for required_resource in node_def.required_resource_keys: | ||||
if required_resource not in mode_resources: | if required_resource not in mode_resources: | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
( | ( | ||||
'Resource "{resource}" is required by solid def {solid_def_name}, but is not ' | 'Resource "{resource}" is required by solid def {node_def_name}, but is not ' | ||||
'provided by mode "{mode_name}".' | 'provided by mode "{mode_name}".' | ||||
).format( | ).format( | ||||
resource=required_resource, | resource=required_resource, | ||||
solid_def_name=solid_def.name, | node_def_name=node_def.name, | ||||
mode_name=mode_def.name, | mode_name=mode_def.name, | ||||
) | ) | ||||
) | ) | ||||
for system_storage_def in mode_def.system_storage_defs: | for system_storage_def in mode_def.system_storage_defs: | ||||
for required_resource in system_storage_def.required_resource_keys: | for required_resource in system_storage_def.required_resource_keys: | ||||
if required_resource not in mode_resources: | if required_resource not in mode_resources: | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
( | ( | ||||
▲ Show 20 Lines • Show All 65 Lines • ▼ Show 20 Lines | for solid in solid_dict.values(): | ||||
' * connect "{input_name}" to the output of another solid\n'.format( | ' * connect "{input_name}" to the output of another solid\n'.format( | ||||
solid_name=solid.name, | solid_name=solid.name, | ||||
input_name=handle.input_def.name, | input_name=handle.input_def.name, | ||||
dagster_type=handle.input_def.dagster_type.display_name, | dagster_type=handle.input_def.dagster_type.display_name, | ||||
) | ) | ||||
) | ) | ||||
def _build_all_solid_defs(solid_defs): | def _build_all_node_defs(node_defs): | ||||
all_defs = {} | all_defs = {} | ||||
for current_level_solid_def in solid_defs: | for current_level_node_def in node_defs: | ||||
for solid_def in current_level_solid_def.iterate_solid_defs(): | for node_def in current_level_node_def.iterate_node_defs(): | ||||
if solid_def.name in all_defs: | if node_def.name in all_defs: | ||||
if all_defs[solid_def.name] != solid_def: | if all_defs[node_def.name] != node_def: | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
'Detected conflicting solid definitions with the same name "{name}"'.format( | 'Detected conflicting solid definitions with the same name "{name}"'.format( | ||||
name=solid_def.name | name=node_def.name | ||||
) | ) | ||||
) | ) | ||||
else: | else: | ||||
all_defs[solid_def.name] = solid_def | all_defs[node_def.name] = node_def | ||||
return all_defs | return all_defs | ||||
def _create_run_config_schema(pipeline_def, mode_definition): | def _create_run_config_schema(pipeline_def, mode_definition): | ||||
from .environment_configs import ( | from .environment_configs import ( | ||||
EnvironmentClassCreationData, | EnvironmentClassCreationData, | ||||
construct_config_type_dictionary, | construct_config_type_dictionary, | ||||
Show All 23 Lines |