Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/pipeline.py
import uuid | import uuid | ||||
import warnings | import warnings | ||||
import six | import six | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions.solid import ISolidDefinition | |||||
from dagster.core.errors import ( | from dagster.core.errors import ( | ||||
DagsterInvalidDefinitionError, | DagsterInvalidDefinitionError, | ||||
DagsterInvalidSubsetError, | DagsterInvalidSubsetError, | ||||
DagsterInvariantViolationError, | DagsterInvariantViolationError, | ||||
) | ) | ||||
from dagster.core.types.dagster_type import DagsterTypeKind, construct_dagster_type_dictionary | from dagster.core.types.dagster_type import DagsterTypeKind, construct_dagster_type_dictionary | ||||
from dagster.core.utils import str_format_set | from dagster.core.utils import str_format_set | ||||
from .dependency import ( | from .dependency import ( | ||||
DependencyDefinition, | DependencyDefinition, | ||||
MultiDependencyDefinition, | MultiDependencyDefinition, | ||||
SolidHandle, | SolidHandle, | ||||
SolidInvocation, | SolidInvocation, | ||||
) | ) | ||||
from .graph import GraphDefinition | |||||
from .hook import HookDefinition | from .hook import HookDefinition | ||||
from .mode import ModeDefinition | from .mode import ModeDefinition | ||||
from .preset import PresetDefinition | from .preset import PresetDefinition | ||||
from .solid import ISolidDefinition | from .solid import ISolidDefinition | ||||
from .solid_container import IContainSolids, create_execution_structure, validate_dependency_dict | from .utils import validate_tags | ||||
from .utils import check_valid_name, validate_tags | |||||
def _check_solids_arg(pipeline_name, solid_defs): | def _check_solids_arg(pipeline_name, solid_defs): | ||||
if not isinstance(solid_defs, list): | if not isinstance(solid_defs, list): | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
'"solids" arg to pipeline "{name}" is not a list. Got {val}.'.format( | '"solids" arg to pipeline "{name}" is not a list. Got {val}.'.format( | ||||
name=pipeline_name, val=repr(solid_defs) | name=pipeline_name, val=repr(solid_defs) | ||||
) | ) | ||||
Show All 17 Lines | def _check_solids_arg(pipeline_name, solid_defs): | ||||
return solid_defs | return solid_defs | ||||
def _anonymous_pipeline_name(): | def _anonymous_pipeline_name(): | ||||
return "__pipeline__" + str(uuid.uuid4()).replace("-", "") | return "__pipeline__" + str(uuid.uuid4()).replace("-", "") | ||||
class PipelineDefinition(IContainSolids): | class PipelineDefinition(GraphDefinition): | ||||
"""Defines a Dagster pipeline. | """Defines a Dagster pipeline. | ||||
A pipeline is made up of | A pipeline is made up of | ||||
- Solids, each of which is a single functional unit of data computation. | - Solids, each of which is a single functional unit of data computation. | ||||
- Dependencies, which determine how the values produced by solids as their outputs flow from | - Dependencies, which determine how the values produced by solids as their outputs flow from | ||||
one solid to another. This tells Dagster how to arrange solids, and potentially multiple | one solid to another. This tells Dagster how to arrange solids, and potentially multiple | ||||
aliased instances of solids, into a directed, acyclic graph (DAG) of compute. | aliased instances of solids, into a directed, acyclic graph (DAG) of compute. | ||||
▲ Show 20 Lines • Show All 79 Lines • ▼ Show 20 Lines | def __init__( | ||||
solid_defs, | solid_defs, | ||||
name=None, | name=None, | ||||
description=None, | description=None, | ||||
dependencies=None, | dependencies=None, | ||||
mode_defs=None, | mode_defs=None, | ||||
preset_defs=None, | preset_defs=None, | ||||
tags=None, | tags=None, | ||||
hook_defs=None, | hook_defs=None, | ||||
input_mappings=None, | |||||
output_mappings=None, | |||||
config_mapping=None, | |||||
positional_inputs=None, | |||||
_parent_pipeline_def=None, # https://github.com/dagster-io/dagster/issues/2115 | _parent_pipeline_def=None, # https://github.com/dagster-io/dagster/issues/2115 | ||||
_configured_config_mapping_fn=None, | |||||
_configured_config_schema=None, | |||||
): | ): | ||||
if not name: | if not name: | ||||
warnings.warn( | warnings.warn( | ||||
"Pipeline must have a name. Names will be required starting in 0.10.0 or later." | "Pipeline must have a name. Names will be required starting in 0.10.0 or later." | ||||
) | ) | ||||
name = _anonymous_pipeline_name() | |||||
self._name = check_valid_name(name) if name else _anonymous_pipeline_name() | super(PipelineDefinition, self).__init__( | ||||
name=name, | |||||
description=description, | |||||
dependencies=dependencies, | |||||
solid_defs=solid_defs, | |||||
tags=check.opt_dict_param(tags, "tags", key_type=str), | |||||
positional_inputs=positional_inputs, | |||||
input_mappings=input_mappings, | |||||
output_mappings=output_mappings, | |||||
config_mapping=config_mapping, | |||||
_configured_config_mapping_fn=_configured_config_mapping_fn, | |||||
_configured_config_schema=_configured_config_schema, | |||||
) | |||||
self._description = check.opt_str_param(description, "description") | self._current_level_solid_defs = check.list_param( | ||||
_check_solids_arg(self._name, solid_defs), "solid_defs", of_type=ISolidDefinition | |||||
) | |||||
self._tags = validate_tags(tags) | |||||
mode_definitions = check.opt_list_param(mode_defs, "mode_defs", of_type=ModeDefinition) | mode_definitions = check.opt_list_param(mode_defs, "mode_defs", of_type=ModeDefinition) | ||||
if not mode_definitions: | if not mode_definitions: | ||||
mode_definitions = [ModeDefinition()] | mode_definitions = [ModeDefinition()] | ||||
self._mode_definitions = mode_definitions | self._mode_definitions = mode_definitions | ||||
self._current_level_solid_defs = check.list_param( | |||||
_check_solids_arg(self._name, solid_defs), "solid_defs", of_type=ISolidDefinition | |||||
) | |||||
self._tags = validate_tags(tags) | |||||
seen_modes = set() | seen_modes = set() | ||||
for mode_def in mode_definitions: | for mode_def in mode_definitions: | ||||
if mode_def.name in seen_modes: | if mode_def.name in seen_modes: | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
( | ( | ||||
'Two modes seen with the name "{mode_name}" in "{pipeline_name}". ' | 'Two modes seen with the name "{mode_name}" in "{pipeline_name}". ' | ||||
"Modes must have unique names." | "Modes must have unique names." | ||||
).format(mode_name=mode_def.name, pipeline_name=self._name) | ).format(mode_name=mode_def.name, pipeline_name=self._name) | ||||
) | ) | ||||
seen_modes.add(mode_def.name) | seen_modes.add(mode_def.name) | ||||
self._dependencies = validate_dependency_dict(dependencies) | |||||
dependency_structure, solid_dict = create_execution_structure( | |||||
self._current_level_solid_defs, self._dependencies, container_definition=None | |||||
) | |||||
self._solid_dict = solid_dict | |||||
self._dependency_structure = dependency_structure | |||||
# eager toposort solids to detect cycles | |||||
self.solids_in_topological_order = self._solids_in_topological_order() | |||||
self._dagster_type_dict = construct_dagster_type_dictionary(self._current_level_solid_defs) | self._dagster_type_dict = construct_dagster_type_dictionary(self._current_level_solid_defs) | ||||
self._hook_defs = check.opt_set_param(hook_defs, "hook_defs", of_type=HookDefinition) | self._hook_defs = check.opt_set_param(hook_defs, "hook_defs", of_type=HookDefinition) | ||||
self._preset_defs = check.opt_list_param(preset_defs, "preset_defs", PresetDefinition) | self._preset_defs = check.opt_list_param(preset_defs, "preset_defs", PresetDefinition) | ||||
self._preset_dict = {} | self._preset_dict = {} | ||||
for preset in self._preset_defs: | for preset in self._preset_defs: | ||||
if preset.name in self._preset_dict: | if preset.name in self._preset_dict: | ||||
Show All 25 Lines | ): | ||||
self._all_solid_defs = _build_all_solid_defs(self._current_level_solid_defs) | self._all_solid_defs = _build_all_solid_defs(self._current_level_solid_defs) | ||||
self._parent_pipeline_def = check.opt_inst_param( | self._parent_pipeline_def = check.opt_inst_param( | ||||
_parent_pipeline_def, "_parent_pipeline_def", PipelineDefinition | _parent_pipeline_def, "_parent_pipeline_def", PipelineDefinition | ||||
) | ) | ||||
self._cached_run_config_schemas = {} | self._cached_run_config_schemas = {} | ||||
self._cached_external_pipeline = None | self._cached_external_pipeline = None | ||||
def construct_configured_copy( | |||||
self, | |||||
new_name, | |||||
new_description, | |||||
new_configured_config_schema, | |||||
new_configured_config_mapping_fn, | |||||
): | |||||
return PipelineDefinition( | |||||
solid_defs=self._solid_defs, | |||||
name=new_name, | |||||
description=new_description, | |||||
dependencies=self._dependencies, | |||||
mode_defs=self._mode_definitions, | |||||
preset_defs=self.preset_defs, | |||||
hook_defs=self.hook_defs, | |||||
input_mappings=self._input_mappings, | |||||
output_mappings=self._output_mappings, | |||||
config_mapping=self._config_mapping, | |||||
positional_inputs=self.positional_inputs, | |||||
_parent_pipeline_def=self._parent_pipeline_def, | |||||
_configured_config_schema=new_configured_config_schema, | |||||
_configured_config_mapping_fn=new_configured_config_mapping_fn, | |||||
schrockn: me too | |||||
) | |||||
Not Done | Inline comment | alangenfeld: always worry about these types of call sites missing new args when they get added | |||||
def get_run_config_schema(self, mode=None): | def get_run_config_schema(self, mode=None): | ||||
check.str_param(mode, "mode") | check.str_param(mode, "mode") | ||||
mode_def = self.get_mode_definition(mode) | mode_def = self.get_mode_definition(mode) | ||||
if mode_def.name in self._cached_run_config_schemas: | if mode_def.name in self._cached_run_config_schemas: | ||||
return self._cached_run_config_schemas[mode_def.name] | return self._cached_run_config_schemas[mode_def.name] | ||||
self._cached_run_config_schemas[mode_def.name] = _create_run_config_schema(self, mode_def) | self._cached_run_config_schemas[mode_def.name] = _create_run_config_schema(self, mode_def) | ||||
return self._cached_run_config_schemas[mode_def.name] | return self._cached_run_config_schemas[mode_def.name] | ||||
@property | @property | ||||
def name(self): | |||||
return self._name | |||||
@property | |||||
def description(self): | |||||
return self._description | |||||
@property | |||||
def mode_definitions(self): | def mode_definitions(self): | ||||
return self._mode_definitions | return self._mode_definitions | ||||
@property | @property | ||||
def dependencies(self): | |||||
return self._dependencies | |||||
@property | |||||
def preset_defs(self): | def preset_defs(self): | ||||
return self._preset_defs | return self._preset_defs | ||||
def _get_mode_definition(self, mode): | def _get_mode_definition(self, mode): | ||||
check.str_param(mode, "mode") | check.str_param(mode, "mode") | ||||
for mode_definition in self._mode_definitions: | for mode_definition in self._mode_definitions: | ||||
if mode_definition.name == mode: | if mode_definition.name == mode: | ||||
return mode_definition | return mode_definition | ||||
▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | def display_name(self): | ||||
is unnamed the method will return "<<unnamed>>". | is unnamed the method will return "<<unnamed>>". | ||||
""" | """ | ||||
return self._name if self._name else "<<unnamed>>" | return self._name if self._name else "<<unnamed>>" | ||||
@property | @property | ||||
def tags(self): | def tags(self): | ||||
return self._tags | return self._tags | ||||
@property | |||||
def solids(self): | |||||
"""List[Solid]: Top-level solids in the pipeline. | |||||
""" | |||||
return list(set(self._solid_dict.values())) | |||||
def has_solid_named(self, name): | |||||
"""Return whether or not there is a top level solid with this name in the pipeline | |||||
Args: | |||||
name (str): Name of solid | |||||
Returns: | |||||
bool: True if the solid is in the pipeline | |||||
""" | |||||
check.str_param(name, "name") | |||||
return name in self._solid_dict | |||||
def solid_named(self, name): | |||||
"""Return the top level solid named "name". Throws if it does not exist. | |||||
Args: | |||||
name (str): Name of solid | |||||
Returns: | |||||
Solid: | |||||
""" | |||||
check.str_param(name, "name") | |||||
check.invariant( | |||||
name in self._solid_dict, | |||||
"Pipeline {pipeline_name} has no solid named {name}.".format( | |||||
pipeline_name=self._name, name=name | |||||
), | |||||
) | |||||
return self._solid_dict[name] | |||||
def get_solid(self, handle): | |||||
"""Return the solid contained anywhere within the pipeline via its handle. | |||||
Args: | |||||
handle (SolidHandle): The solid's handle | |||||
Returns: | |||||
Solid: | |||||
""" | |||||
check.inst_param(handle, "handle", SolidHandle) | |||||
current = handle | |||||
lineage = [] | |||||
while current: | |||||
lineage.append(current.name) | |||||
current = current.parent | |||||
name = lineage.pop() | |||||
solid = self.solid_named(name) | |||||
while lineage: | |||||
name = lineage.pop() | |||||
solid = solid.definition.solid_named(name) | |||||
return solid | |||||
@property | |||||
def dependency_structure(self): | |||||
return self._dependency_structure | |||||
def has_dagster_type(self, name): | def has_dagster_type(self, name): | ||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
return name in self._dagster_type_dict | return name in self._dagster_type_dict | ||||
def dagster_type_named(self, name): | def dagster_type_named(self, name): | ||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
return self._dagster_type_dict[name] | return self._dagster_type_dict[name] | ||||
▲ Show 20 Lines • Show All 369 Lines • Show Last 20 Lines |
me too