Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/composition.py
Show First 20 Lines • Show All 266 Lines • ▼ Show 20 Lines | def __call__(self, *args, **kwargs): | ||||
resolved_node_name = current_context().observe_invocation( | resolved_node_name = current_context().observe_invocation( | ||||
self.given_alias, self.node_def, input_bindings, self.tags, self.hook_defs, | self.given_alias, self.node_def, input_bindings, self.tags, self.hook_defs, | ||||
) | ) | ||||
if len(self.node_def.output_defs) == 0: | if len(self.node_def.output_defs) == 0: | ||||
return None | return None | ||||
if len(self.node_def.output_defs) == 1: | if len(self.node_def.output_defs) == 1: | ||||
output_name = self.node_def.output_defs[0].name | output_def = self.node_def.output_defs[0] | ||||
output_name = output_def.name | |||||
if output_def.is_dynamic: | |||||
return InvokedSolidDynamicOutputWrapper(resolved_node_name, output_name) | |||||
else: | |||||
return InvokedSolidOutputHandle(resolved_node_name, output_name) | return InvokedSolidOutputHandle(resolved_node_name, output_name) | ||||
outputs = [output_def.name for output_def in self.node_def.output_defs] | outputs = [output_def for output_def in self.node_def.output_defs] | ||||
return namedtuple("_{node_def}_outputs".format(node_def=self.node_def.name), outputs)( | invoked_output_handles = {} | ||||
**{output: InvokedSolidOutputHandle(resolved_node_name, output) for output in outputs} | for output_def in outputs: | ||||
if output_def.is_dynamic: | |||||
invoked_output_handles[output_def.name] = InvokedSolidDynamicOutputWrapper( | |||||
resolved_node_name, output_def.name | |||||
) | |||||
else: | |||||
invoked_output_handles[output_def.name] = InvokedSolidOutputHandle( | |||||
resolved_node_name, output_def.name | |||||
) | ) | ||||
return namedtuple( | |||||
"_{node_def}_outputs".format(node_def=self.node_def.name), | |||||
" ".join([output_def.name for output_def in outputs]), | |||||
)(**invoked_output_handles) | |||||
def _process_argument_node(self, solid_name, output_node, input_name, input_bindings, arg_desc): | def _process_argument_node(self, solid_name, output_node, input_name, input_bindings, arg_desc): | ||||
if isinstance(output_node, (InvokedSolidOutputHandle, InputMappingNode)): | if isinstance(output_node, (InvokedSolidOutputHandle, InputMappingNode)): | ||||
input_bindings[input_name] = output_node | input_bindings[input_name] = output_node | ||||
elif isinstance(output_node, list): | elif isinstance(output_node, list): | ||||
input_bindings[input_name] = [] | input_bindings[input_name] = [] | ||||
for idx, fanned_in_node in enumerate(output_node): | for idx, fanned_in_node in enumerate(output_node): | ||||
Show All 26 Lines | def _process_argument_node(self, solid_name, output_node, input_name, input_bindings, arg_desc): | ||||
source=current_context().source, | source=current_context().source, | ||||
name=current_context().name, | name=current_context().name, | ||||
arg_desc=arg_desc, | arg_desc=arg_desc, | ||||
input_name=input_name, | input_name=input_name, | ||||
solid_name=solid_name, | solid_name=solid_name, | ||||
options=output_node._fields, | options=output_node._fields, | ||||
) | ) | ||||
) | ) | ||||
elif isinstance(output_node, InvokedSolidDynamicOutputWrapper): | |||||
raise DagsterInvalidDefinitionError( | |||||
f"In {current_context().source} {current_context().name}, received the dynamic output " | |||||
f"{output_node.output_name} from solid {output_node.solid_name} directly. Dynamic " | |||||
"output must be unpacked by invoking forEach." | |||||
) | |||||
elif isinstance(output_node, CallableNode) or isinstance(output_node, NodeDefinition): | elif isinstance(output_node, CallableNode) or isinstance(output_node, NodeDefinition): | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
"In {source} {name}, received an un-invoked solid for input " | "In {source} {name}, received an un-invoked solid for input " | ||||
'"{input_name}" {arg_desc} in solid invocation "{solid_name}". ' | '"{input_name}" {arg_desc} in solid invocation "{solid_name}". ' | ||||
"Did you forget parentheses?".format( | "Did you forget parentheses?".format( | ||||
source=current_context().source, | source=current_context().source, | ||||
name=current_context().name, | name=current_context().name, | ||||
arg_desc=arg_desc, | arg_desc=arg_desc, | ||||
▲ Show 20 Lines • Show All 97 Lines • ▼ Show 20 Lines | def with_hooks(self, _): | ||||
name=current_context().name, | name=current_context().name, | ||||
cls=self.__class__.__name__, | cls=self.__class__.__name__, | ||||
solid=self.solid_name, | solid=self.solid_name, | ||||
out=self.output_name, | out=self.output_name, | ||||
) | ) | ||||
) | ) | ||||
class InvokedSolidDynamicOutputWrapper: | |||||
""" | |||||
The return value for a dynamic output when invoking a solid in a composition function. | |||||
Must be unwrapped by invoking forEach. | |||||
""" | |||||
def __init__(self, solid_name, output_name): | |||||
self.solid_name = check.str_param(solid_name, "solid_name") | |||||
self.output_name = check.str_param(output_name, "output_name") | |||||
def forEach(self, fn): | |||||
check.is_callable(fn) | |||||
fn(InvokedSolidOutputHandle(self.solid_name, self.output_name)) | |||||
def __iter__(self): | |||||
raise DagsterInvariantViolationError( | |||||
'Attempted to iterate over an {cls}. This object represents the dynamic output "{out}" ' | |||||
'from the solid "{solid}". Use the "forEach" method on this object to create ' | |||||
"downstream dependencies that will be cloned for each DynamicOutput " | |||||
"that is resolved at runtime.".format( | |||||
cls=self.__class__.__name__, out=self.output_name, solid=self.solid_name | |||||
) | |||||
) | |||||
def __getitem__(self, idx): | |||||
raise DagsterInvariantViolationError( | |||||
'Attempted to index in to an {cls}. This object represents the dynamic output "{out}" ' | |||||
'from the solid "{solid}". Use the "forEach" method on this object to create ' | |||||
"downstream dependencies that will be cloned for each DynamicOutput " | |||||
"that is resolved at runtime.".format( | |||||
cls=self.__class__.__name__, out=self.output_name, solid=self.solid_name | |||||
) | |||||
) | |||||
def alias(self, _): | |||||
raise DagsterInvariantViolationError( | |||||
"In {source} {name}, attempted to call alias method for {cls}. This object " | |||||
'represents the dynamic output "{out}" from the already invoked solid "{solid}". Consider ' | |||||
"checking the location of parentheses.".format( | |||||
source=current_context().source, | |||||
name=current_context().name, | |||||
cls=self.__class__.__name__, | |||||
solid=self.solid_name, | |||||
out=self.output_name, | |||||
) | |||||
) | |||||
def with_hooks(self, _): | |||||
raise DagsterInvariantViolationError( | |||||
"In {source} {name}, attempted to call hook method for {cls}. This object " | |||||
'represents the dynamic output "{out}" from the already invoked solid "{solid}". Consider ' | |||||
"checking the location of parentheses.".format( | |||||
source=current_context().source, | |||||
name=current_context().name, | |||||
cls=self.__class__.__name__, | |||||
solid=self.solid_name, | |||||
out=self.output_name, | |||||
) | |||||
) | |||||
class InputMappingNode: | class InputMappingNode: | ||||
def __init__(self, input_def): | def __init__(self, input_def): | ||||
self.input_def = input_def | self.input_def = input_def | ||||
def composite_mapping_from_output(output, output_defs, solid_name): | def composite_mapping_from_output(output, output_defs, solid_name): | ||||
# output can be different types | # output can be different types | ||||
check.list_param(output_defs, "output_defs", OutputDefinition) | check.list_param(output_defs, "output_defs", OutputDefinition) | ||||
▲ Show 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | if isinstance(output, dict): | ||||
) | ) | ||||
output_mapping_dict[name] = output_def_dict[name].mapping_from( | output_mapping_dict[name] = output_def_dict[name].mapping_from( | ||||
handle.solid_name, handle.output_name | handle.solid_name, handle.output_name | ||||
) | ) | ||||
return output_mapping_dict | return output_mapping_dict | ||||
elif isinstance(output, InvokedSolidDynamicOutputWrapper): | |||||
raise DagsterInvalidDefinitionError( | |||||
f"In @composite_solid {solid_name}, received the dynamic output " | |||||
f"{output.output_name} from solid {output.solid_name} directly. Dynamic " | |||||
"output must be unpacked by invoking PLACEHOLDER before mapping." | |||||
) | |||||
# error | # error | ||||
if output is not None: | if output is not None: | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
"@composite_solid {name} returned problematic value " | "@composite_solid {name} returned problematic value " | ||||
"of type {type}. Expected return value from invoked solid or dict mapping " | "of type {type}. Expected return value from invoked solid or dict mapping " | ||||
"output name to return values from invoked solids".format( | "output name to return values from invoked solids".format( | ||||
name=solid_name, type=type(output) | name=solid_name, type=type(output) | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 154 Lines • Show Last 20 Lines |