Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/composition.py
import warnings | import warnings | ||||
from collections import namedtuple | from collections import namedtuple | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError | from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError | ||||
from dagster.utils import frozentags | from dagster.utils import frozentags | ||||
from .config import ConfigMapping | |||||
from .decorators.solid import validate_solid_fn | |||||
from .dependency import DependencyDefinition, MultiDependencyDefinition, SolidInvocation | from .dependency import DependencyDefinition, MultiDependencyDefinition, SolidInvocation | ||||
from .hook import HookDefinition | from .hook import HookDefinition | ||||
from .inference import ( | |||||
has_explicit_return_type, | |||||
infer_input_definitions_for_graph, | |||||
infer_output_definitions, | |||||
) | |||||
from .output import OutputDefinition | from .output import OutputDefinition | ||||
from .solid import ISolidDefinition | from .solid import NodeDefinition | ||||
from .utils import validate_tags | from .utils import validate_tags | ||||
_composition_stack = [] | _composition_stack = [] | ||||
def _not_invoked_warning(solid, context_source, context_name): | def _not_invoked_warning(solid, context_source, context_name): | ||||
check.inst_param(solid, "solid", CallableSolidNode) | check.inst_param(solid, "solid", CallableNode) | ||||
warning_message = ( | warning_message = ( | ||||
"While in {context} context '{name}', received an uninvoked solid '{solid_name}'.\n" | "While in {context} context '{name}', received an uninvoked solid '{solid_name}'.\n" | ||||
) | ) | ||||
if solid.given_alias: | if solid.given_alias: | ||||
warning_message += "'{solid_name}' was aliased as '{given_alias}'.\n" | warning_message += "'{solid_name}' was aliased as '{given_alias}'.\n" | ||||
if solid.tags: | if solid.tags: | ||||
warning_message += "Provided tags: {tags}.\n" | warning_message += "Provided tags: {tags}.\n" | ||||
if solid.hook_defs: | if solid.hook_defs: | ||||
warning_message += "Provided hook definitions: {hooks}.\n" | warning_message += "Provided hook definitions: {hooks}.\n" | ||||
warning_message = warning_message.format( | warning_message = warning_message.format( | ||||
context=context_source, | context=context_source, | ||||
name=context_name, | name=context_name, | ||||
solid_name=solid.solid_def.name, | solid_name=solid.node_def.name, | ||||
given_alias=solid.given_alias, | given_alias=solid.given_alias, | ||||
tags=solid.tags, | tags=solid.tags, | ||||
hooks=[hook.name for hook in solid.hook_defs], | hooks=[hook.name for hook in solid.hook_defs], | ||||
) | ) | ||||
warnings.warn(warning_message.strip()) | warnings.warn(warning_message.strip()) | ||||
Show All 30 Lines | class InProgressCompositionContext(object): | ||||
def __init__(self, name, source): | def __init__(self, name, source): | ||||
self.name = check.str_param(name, "name") | self.name = check.str_param(name, "name") | ||||
self.source = check.str_param(source, "source") | self.source = check.str_param(source, "source") | ||||
self._invocations = {} | self._invocations = {} | ||||
self._collisions = {} | self._collisions = {} | ||||
self._pending_invocations = {} | self._pending_invocations = {} | ||||
def observe_invocation( | def observe_invocation( | ||||
self, given_alias, solid_def, input_bindings, input_mappings, tags=None, hook_defs=None | self, given_alias, node_def, input_bindings, input_mappings, tags=None, hook_defs=None | ||||
): | ): | ||||
if given_alias is None: | if given_alias is None: | ||||
solid_name = solid_def.name | node_name = node_def.name | ||||
self._pending_invocations.pop(solid_name, None) | self._pending_invocations.pop(node_name, None) | ||||
if self._collisions.get(solid_name): | if self._collisions.get(node_name): | ||||
self._collisions[solid_name] += 1 | self._collisions[node_name] += 1 | ||||
solid_name = "{solid_name}_{n}".format( | node_name = "{node_name}_{n}".format( | ||||
solid_name=solid_name, n=self._collisions[solid_name] | node_name=node_name, n=self._collisions[node_name] | ||||
) | ) | ||||
else: | else: | ||||
self._collisions[solid_name] = 1 | self._collisions[node_name] = 1 | ||||
else: | else: | ||||
solid_name = given_alias | node_name = given_alias | ||||
self._pending_invocations.pop(solid_name, None) | self._pending_invocations.pop(node_name, None) | ||||
if self._invocations.get(solid_name): | if self._invocations.get(node_name): | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
"{source} {name} invoked the same solid ({solid_name}) twice without aliasing.".format( | "{source} {name} invoked the same node ({node_name}) twice without aliasing.".format( | ||||
source=self.source, name=self.name, solid_name=solid_name | source=self.source, name=self.name, node_name=node_name | ||||
) | ) | ||||
) | ) | ||||
self._invocations[solid_name] = InvokedSolidNode( | self._invocations[node_name] = InvokedNode( | ||||
solid_name, solid_def, input_bindings, input_mappings, tags, hook_defs | node_name, node_def, input_bindings, input_mappings, tags, hook_defs | ||||
) | ) | ||||
return solid_name | return node_name | ||||
def add_pending_invocation(self, solid): | def add_pending_invocation(self, solid): | ||||
solid = check.opt_inst_param(solid, "solid", CallableSolidNode) | solid = check.opt_inst_param(solid, "solid", CallableNode) | ||||
solid_name = solid.given_alias if solid.given_alias else solid.solid_def.name | solid_name = solid.given_alias if solid.given_alias else solid.node_def.name | ||||
self._pending_invocations[solid_name] = solid | self._pending_invocations[solid_name] = solid | ||||
def complete(self, output): | def complete(self, output): | ||||
return CompleteCompositionContext( | return CompleteCompositionContext( | ||||
self.name, | self.name, | ||||
self.source, | self.source, | ||||
self._invocations, | self._invocations, | ||||
check.opt_dict_param(output, "output"), | check.opt_dict_param(output, "output"), | ||||
self._pending_invocations, | self._pending_invocations, | ||||
) | ) | ||||
class CompleteCompositionContext( | class CompleteCompositionContext( | ||||
namedtuple( | namedtuple( | ||||
"_CompositionContext", "name solid_defs dependencies input_mappings output_mapping_dict" | "_CompositionContext", "name solid_defs dependencies input_mappings output_mapping_dict" | ||||
) | ) | ||||
): | ): | ||||
"""The processed information from capturing solid invocations during a composition function. | """The processed information from capturing solid invocations during a composition function. | ||||
""" | """ | ||||
def __new__(cls, name, source, invocations, output_mapping_dict, pending_invocations): | def __new__(cls, name, source, invocations, output_mapping_dict, pending_invocations): | ||||
dep_dict = {} | dep_dict = {} | ||||
solid_def_dict = {} | node_def_dict = {} | ||||
input_mappings = [] | input_mappings = [] | ||||
for solid in pending_invocations.values(): | for solid in pending_invocations.values(): | ||||
_not_invoked_warning(solid, source, name) | _not_invoked_warning(solid, source, name) | ||||
for invocation in invocations.values(): | for invocation in invocations.values(): | ||||
def_name = invocation.solid_def.name | def_name = invocation.node_def.name | ||||
if def_name in solid_def_dict and solid_def_dict[def_name] is not invocation.solid_def: | if def_name in node_def_dict and node_def_dict[def_name] is not invocation.node_def: | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
'Detected conflicting solid definitions with the same name "{name}"'.format( | 'Detected conflicting solid definitions with the same name "{name}"'.format( | ||||
name=def_name | name=def_name | ||||
) | ) | ||||
) | ) | ||||
solid_def_dict[def_name] = invocation.solid_def | node_def_dict[def_name] = invocation.node_def | ||||
deps = {} | deps = {} | ||||
for input_name, node in invocation.input_bindings.items(): | for input_name, node in invocation.input_bindings.items(): | ||||
if isinstance(node, InvokedSolidOutputHandle): | if isinstance(node, InvokedSolidOutputHandle): | ||||
deps[input_name] = DependencyDefinition(node.solid_name, node.output_name) | deps[input_name] = DependencyDefinition(node.solid_name, node.output_name) | ||||
elif isinstance(node, list) and all( | elif isinstance(node, list) and all( | ||||
map(lambda item: isinstance(item, InvokedSolidOutputHandle), node) | map(lambda item: isinstance(item, InvokedSolidOutputHandle), node) | ||||
): | ): | ||||
deps[input_name] = MultiDependencyDefinition( | deps[input_name] = MultiDependencyDefinition( | ||||
[DependencyDefinition(call.solid_name, call.output_name) for call in node] | [DependencyDefinition(call.solid_name, call.output_name) for call in node] | ||||
) | ) | ||||
else: | else: | ||||
check.failed("Unexpected input binding - got {node}".format(node=node)) | check.failed("Unexpected input binding - got {node}".format(node=node)) | ||||
dep_dict[ | dep_dict[ | ||||
SolidInvocation( | SolidInvocation( | ||||
invocation.solid_def.name, | invocation.node_def.name, | ||||
invocation.solid_name, | invocation.node_name, | ||||
tags=invocation.tags, | tags=invocation.tags, | ||||
hook_defs=invocation.hook_defs, | hook_defs=invocation.hook_defs, | ||||
) | ) | ||||
] = deps | ] = deps | ||||
for input_name, node in invocation.input_mappings.items(): | for input_name, node in invocation.input_mappings.items(): | ||||
input_mappings.append(node.input_def.mapping_to(invocation.solid_name, input_name)) | input_mappings.append(node.input_def.mapping_to(invocation.node_name, input_name)) | ||||
return super(cls, CompleteCompositionContext).__new__( | return super(cls, CompleteCompositionContext).__new__( | ||||
cls, name, list(solid_def_dict.values()), dep_dict, input_mappings, output_mapping_dict | cls, name, list(node_def_dict.values()), dep_dict, input_mappings, output_mapping_dict | ||||
) | ) | ||||
class CallableSolidNode(object): | class CallableNode(object): | ||||
"""An intermediate object in solid composition to allow for binding information such as | """An intermediate object in composition to allow for binding information such as | ||||
an alias before invoking. | an alias before invoking. | ||||
""" | """ | ||||
def __init__(self, solid_def, given_alias=None, tags=None, hook_defs=None): | def __init__(self, node_def, given_alias=None, tags=None, hook_defs=None): | ||||
self.solid_def = solid_def | self.node_def = check.inst_param(node_def, "node_def", NodeDefinition) | ||||
self.given_alias = check.opt_str_param(given_alias, "given_alias") | self.given_alias = check.opt_str_param(given_alias, "given_alias") | ||||
self.tags = check.opt_inst_param(tags, "tags", frozentags) | self.tags = check.opt_inst_param(tags, "tags", frozentags) | ||||
self.hook_defs = check.opt_set_param(hook_defs, "hook_defs", HookDefinition) | self.hook_defs = check.opt_set_param(hook_defs, "hook_defs", HookDefinition) | ||||
if _is_in_composition(): | if _is_in_composition(): | ||||
current_context().add_pending_invocation(self) | current_context().add_pending_invocation(self) | ||||
def __call__(self, *args, **kwargs): | def __call__(self, *args, **kwargs): | ||||
solid_name = self.given_alias if self.given_alias else self.solid_def.name | node_name = self.given_alias if self.given_alias else self.node_def.name | ||||
assert_in_composition(solid_name) | assert_in_composition(node_name) | ||||
input_bindings = {} | input_bindings = {} | ||||
input_mappings = {} | input_mappings = {} | ||||
# handle *args | # handle *args | ||||
for idx, output_node in enumerate(args): | for idx, output_node in enumerate(args): | ||||
if idx >= len(self.solid_def.input_defs): | if idx >= len(self.node_def.input_defs): | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
"In {source} {name}, received too many inputs for solid " | "In {source} {name}, received too many inputs for " | ||||
"invocation {solid_name}. Only {def_num} defined, received {arg_num}".format( | "invocation {node_name}. Only {def_num} defined, received {arg_num}".format( | ||||
source=current_context().source, | source=current_context().source, | ||||
name=current_context().name, | name=current_context().name, | ||||
solid_name=solid_name, | node_name=node_name, | ||||
def_num=len(self.solid_def.input_defs), | def_num=len(self.node_def.input_defs), | ||||
arg_num=len(args), | arg_num=len(args), | ||||
) | ) | ||||
) | ) | ||||
input_name = self.solid_def.resolve_input_name_at_position(idx) | input_name = self.node_def.resolve_input_name_at_position(idx) | ||||
if input_name is None: | if input_name is None: | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
"In {source} {name}, could not resolve input based on position at " | "In {source} {name}, could not resolve input based on position at " | ||||
"index {idx} for solid invocation {solid_name}. Use keyword args instead, " | "index {idx} for invocation {node_name}. Use keyword args instead, " | ||||
"available inputs are: {inputs}".format( | "available inputs are: {inputs}".format( | ||||
idx=idx, | idx=idx, | ||||
source=current_context().source, | source=current_context().source, | ||||
name=current_context().name, | name=current_context().name, | ||||
solid_name=solid_name, | node_name=node_name, | ||||
inputs=list(map(lambda inp: inp.name, self.solid_def.input_defs)), | inputs=list(map(lambda inp: inp.name, self.node_def.input_defs)), | ||||
) | ) | ||||
) | ) | ||||
self._process_argument_node( | self._process_argument_node( | ||||
solid_name, | node_name, | ||||
output_node, | output_node, | ||||
input_name, | input_name, | ||||
input_mappings, | input_mappings, | ||||
input_bindings, | input_bindings, | ||||
"(at position {idx})".format(idx=idx), | "(at position {idx})".format(idx=idx), | ||||
) | ) | ||||
# then **kwargs | # then **kwargs | ||||
for input_name, output_node in kwargs.items(): | for input_name, output_node in kwargs.items(): | ||||
self._process_argument_node( | self._process_argument_node( | ||||
solid_name, | node_name, | ||||
output_node, | output_node, | ||||
input_name, | input_name, | ||||
input_mappings, | input_mappings, | ||||
input_bindings, | input_bindings, | ||||
"(passed by keyword)", | "(passed by keyword)", | ||||
) | ) | ||||
solid_name = current_context().observe_invocation( | # the node name is potentially reassigned for aliasing | ||||
resolved_node_name = current_context().observe_invocation( | |||||
self.given_alias, | self.given_alias, | ||||
self.solid_def, | self.node_def, | ||||
input_bindings, | input_bindings, | ||||
input_mappings, | input_mappings, | ||||
self.tags, | self.tags, | ||||
self.hook_defs, | self.hook_defs, | ||||
) | ) | ||||
if len(self.solid_def.output_defs) == 0: | if len(self.node_def.output_defs) == 0: | ||||
return None | return None | ||||
if len(self.solid_def.output_defs) == 1: | if len(self.node_def.output_defs) == 1: | ||||
output_name = self.solid_def.output_defs[0].name | output_name = self.node_def.output_defs[0].name | ||||
return InvokedSolidOutputHandle(solid_name, output_name) | return InvokedSolidOutputHandle(resolved_node_name, output_name) | ||||
outputs = [output_def.name for output_def in self.solid_def.output_defs] | outputs = [output_def.name for output_def in self.node_def.output_defs] | ||||
return namedtuple("_{solid_def}_outputs".format(solid_def=self.solid_def.name), outputs)( | return namedtuple("_{node_def}_outputs".format(node_def=self.node_def.name), outputs)( | ||||
**{output: InvokedSolidOutputHandle(solid_name, output) for output in outputs} | **{output: InvokedSolidOutputHandle(resolved_node_name, output) for output in outputs} | ||||
) | ) | ||||
def _process_argument_node( | def _process_argument_node( | ||||
self, solid_name, output_node, input_name, input_mappings, input_bindings, arg_desc | self, solid_name, output_node, input_name, input_mappings, input_bindings, arg_desc | ||||
): | ): | ||||
if isinstance(output_node, InvokedSolidOutputHandle): | if isinstance(output_node, InvokedSolidOutputHandle): | ||||
input_bindings[input_name] = output_node | input_bindings[input_name] = output_node | ||||
Show All 26 Lines | ): | ||||
source=current_context().source, | source=current_context().source, | ||||
name=current_context().name, | name=current_context().name, | ||||
arg_desc=arg_desc, | arg_desc=arg_desc, | ||||
input_name=input_name, | input_name=input_name, | ||||
solid_name=solid_name, | solid_name=solid_name, | ||||
options=output_node._fields, | options=output_node._fields, | ||||
) | ) | ||||
) | ) | ||||
elif isinstance(output_node, CallableSolidNode) or isinstance( | elif isinstance(output_node, CallableNode) or isinstance(output_node, NodeDefinition): | ||||
output_node, ISolidDefinition | |||||
): | |||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
"In {source} {name}, received an un-invoked solid for input " | "In {source} {name}, received an un-invoked solid for input " | ||||
'"{input_name}" {arg_desc} in solid invocation "{solid_name}". ' | '"{input_name}" {arg_desc} in solid invocation "{solid_name}". ' | ||||
"Did you forget parentheses?".format( | "Did you forget parentheses?".format( | ||||
source=current_context().source, | source=current_context().source, | ||||
name=current_context().name, | name=current_context().name, | ||||
arg_desc=arg_desc, | arg_desc=arg_desc, | ||||
input_name=input_name, | input_name=input_name, | ||||
Show All 11 Lines | ): | ||||
type=type(output_node), | type=type(output_node), | ||||
arg_desc=arg_desc, | arg_desc=arg_desc, | ||||
input_name=input_name, | input_name=input_name, | ||||
solid_name=solid_name, | solid_name=solid_name, | ||||
) | ) | ||||
) | ) | ||||
def alias(self, name): | def alias(self, name): | ||||
return CallableSolidNode(self.solid_def, name, self.tags) | return CallableNode(self.node_def, name, self.tags) | ||||
def tag(self, tags): | def tag(self, tags): | ||||
tags = validate_tags(tags) | tags = validate_tags(tags) | ||||
return CallableSolidNode( | return CallableNode( | ||||
self.solid_def, | self.node_def, | ||||
self.given_alias, | self.given_alias, | ||||
frozentags(tags) if self.tags is None else self.tags.updated_with(tags), | frozentags(tags) if self.tags is None else self.tags.updated_with(tags), | ||||
) | ) | ||||
def with_hooks(self, hook_defs): | def with_hooks(self, hook_defs): | ||||
hook_defs = check.set_param(hook_defs, "hook_defs", of_type=HookDefinition) | hook_defs = check.set_param(hook_defs, "hook_defs", of_type=HookDefinition) | ||||
return CallableSolidNode( | return CallableNode( | ||||
self.solid_def, self.given_alias, self.tags, hook_defs.union(self.hook_defs) | self.node_def, self.given_alias, self.tags, hook_defs.union(self.hook_defs) | ||||
) | ) | ||||
class InvokedSolidNode( | class InvokedNode( | ||||
namedtuple( | namedtuple("_InvokedNode", "node_name, node_def input_bindings input_mappings tags hook_defs") | ||||
"_InvokedSolidNode", "solid_name solid_def input_bindings input_mappings tags hook_defs" | |||||
) | |||||
): | ): | ||||
"""The metadata about a solid invocation saved by the current composition context. | """The metadata about a solid invocation saved by the current composition context. | ||||
""" | """ | ||||
def __new__( | def __new__( | ||||
cls, solid_name, solid_def, input_bindings, input_mappings, tags=None, hook_defs=None | cls, node_name, node_def, input_bindings, input_mappings, tags=None, hook_defs=None | ||||
): | ): | ||||
return super(cls, InvokedSolidNode).__new__( | return super(cls, InvokedNode).__new__( | ||||
cls, | cls, | ||||
check.str_param(solid_name, "solid_name"), | check.str_param(node_name, "node_name"), | ||||
check.inst_param(solid_def, "solid_def", ISolidDefinition), | check.inst_param(node_def, "node_def", NodeDefinition), | ||||
check.dict_param(input_bindings, "input_bindings", key_type=str), | check.dict_param(input_bindings, "input_bindings", key_type=str), | ||||
check.dict_param( | check.dict_param( | ||||
input_mappings, "input_mappings", key_type=str, value_type=InputMappingNode | input_mappings, "input_mappings", key_type=str, value_type=InputMappingNode | ||||
), | ), | ||||
check.opt_inst_param(tags, "tags", frozentags), | check.opt_inst_param(tags, "tags", frozentags), | ||||
check.opt_set_param(hook_defs, "hook_defs", HookDefinition), | check.opt_set_param(hook_defs, "hook_defs", HookDefinition), | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 129 Lines • ▼ Show 20 Lines | def composite_mapping_from_output(output, output_defs, solid_name): | ||||
if output is not None: | if output is not None: | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
"@composite_solid {name} returned problematic value " | "@composite_solid {name} returned problematic value " | ||||
"of type {type}. Expected return value from invoked solid or dict mapping " | "of type {type}. Expected return value from invoked solid or dict mapping " | ||||
"output name to return values from invoked solids".format( | "output name to return values from invoked solids".format( | ||||
name=solid_name, type=type(output) | name=solid_name, type=type(output) | ||||
) | ) | ||||
) | ) | ||||
def do_composition( | |||||
decorator_name, | |||||
graph_name, | |||||
fn, | |||||
provided_input_defs, | |||||
provided_output_defs, | |||||
config_schema, | |||||
config_fn, | |||||
ignore_output_from_composition_fn, | |||||
): | |||||
""" | |||||
This a function used by both @pipeline and @composite_solid to implement their composition | |||||
function which is our DSL for constructing a dependency graph. | |||||
Args: | |||||
decorator_name (str): Name of the calling decorator. e.g. "@pipeline", | |||||
"@composite_solid", "@graph" | |||||
graph_name (str): User-defined name of the definition being constructed | |||||
fn (Callable): The composition function to be called. | |||||
provided_input_defs(List[InputDefinition]): List of input definitions | |||||
explicitly provided to the decorator by the user. | |||||
provided_output_defs(List[OutputDefinition]): List of output definitions | |||||
explicitly provided to the decorator by the user. | |||||
config_schema(Any): Config schema provided to decorator by user. | |||||
config_fn(Callable): Config fn provided to decorator by user. | |||||
ignore_output_from_composite_fn(Bool): Because of backwards compatibility | |||||
issues, pipelines ignore the return value out of the mapping if | |||||
the user has not explicitly provided the output definitions. | |||||
This should be removed in 0.10.0. | |||||
""" | |||||
actual_input_defs = ( | |||||
provided_input_defs | |||||
if provided_input_defs is not None | |||||
else infer_input_definitions_for_graph(decorator_name, graph_name, fn) | |||||
) | |||||
actual_output_defs, outputs_are_explicit = ( | |||||
(provided_output_defs, True) | |||||
if provided_output_defs is not None | |||||
else ( | |||||
infer_output_definitions(decorator_name, graph_name, fn), | |||||
has_explicit_return_type(fn), | |||||
) | |||||
) | |||||
positional_inputs = validate_solid_fn( | |||||
decorator_name, graph_name, fn, actual_input_defs, exclude_nothing=False | |||||
) | |||||
kwargs = {input_def.name: InputMappingNode(input_def) for input_def in actual_input_defs} | |||||
output = None | |||||
returned_mapping = None | |||||
enter_composition(graph_name, decorator_name) | |||||
try: | |||||
output = fn(**kwargs) | |||||
if ignore_output_from_composition_fn: | |||||
if output is not None: | |||||
warnings.warn( | |||||
"You have returned a value out of a @pipeline-decorated function. " | |||||
"This currently has no effect on behavior, but will after 0.10.0 is " | |||||
"released. In order to preserve existing behavior to do not return " | |||||
"anything out of this function. Pipelines (and its successor, graphs) " | |||||
"will have meaningful outputs just like composite solids do today, " | |||||
"and the return value will be meaningful.", | |||||
stacklevel=3, | |||||
) | |||||
output = None | |||||
returned_mapping = composite_mapping_from_output(output, actual_output_defs, graph_name) | |||||
finally: | |||||
context = exit_composition(returned_mapping) | |||||
check.invariant( | |||||
context.name == graph_name, | |||||
"Composition context stack desync: received context for " | |||||
'"{context.name}" expected "{graph_name}"'.format(context=context, graph_name=graph_name), | |||||
) | |||||
# line up mappings in definition order | |||||
input_mappings = [] | |||||
for defn in actual_input_defs: | |||||
mappings = [ | |||||
mapping for mapping in context.input_mappings if mapping.definition.name == defn.name | |||||
] | |||||
if len(mappings) == 0: | |||||
raise DagsterInvalidDefinitionError( | |||||
"{decorator_name} '{graph_name}' has unmapped input '{input_name}'. " | |||||
"Remove it or pass it to the appropriate solid invocation.".format( | |||||
decorator_name=decorator_name, graph_name=graph_name, input_name=defn.name | |||||
) | |||||
) | |||||
input_mappings += mappings | |||||
output_mappings = [] | |||||
for defn in actual_output_defs: | |||||
mapping = context.output_mapping_dict.get(defn.name) | |||||
if mapping is None: | |||||
# if we inferred output_defs we will be flexible and either take a mapping or not | |||||
if not outputs_are_explicit: | |||||
continue | |||||
raise DagsterInvalidDefinitionError( | |||||
"{decorator_name} '{graph_name}' has unmapped output '{output_name}'. " | |||||
"Remove it or return a value from the appropriate solid invocation.".format( | |||||
decorator_name=decorator_name, graph_name=graph_name, output_name=defn.name | |||||
) | |||||
) | |||||
output_mappings.append(mapping) | |||||
config_mapping = _get_validated_config_mapping(graph_name, config_schema, config_fn) | |||||
return ( | |||||
input_mappings, | |||||
output_mappings, | |||||
context.dependencies, | |||||
context.solid_defs, | |||||
config_mapping, | |||||
positional_inputs, | |||||
) | |||||
def _get_validated_config_mapping(name, config_schema, config_fn): | |||||
"""Config mapping must set composite config_schema and config_fn or neither. | |||||
""" | |||||
if config_fn is None and config_schema is None: | |||||
return None | |||||
elif config_fn is not None and config_schema is not None: | |||||
return ConfigMapping(config_fn=config_fn, config_schema=config_schema) | |||||
else: | |||||
if config_fn is not None: | |||||
raise DagsterInvalidDefinitionError( | |||||
"@composite_solid '{solid_name}' defines a configuration function {config_fn} " | |||||
"but does not define a configuration schema. If you intend this composite to take " | |||||
"no config_schema, you must explicitly specify config_schema={{}}.".format( | |||||
solid_name=name, config_fn=config_fn.__name__ | |||||
) | |||||
) | |||||
else: | |||||
raise DagsterInvalidDefinitionError( | |||||
"@composite_solid '{solid_name}' defines a configuration schema but does not " | |||||
"define a configuration function.".format(solid_name=name) | |||||
) |