Differential D4920 Diff 24623 python_modules/dagster/dagster/core/definitions/decorators/composite_solid.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/decorators/composite_solid.py
from functools import update_wrapper | from functools import update_wrapper | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.errors import DagsterInvalidDefinitionError | |||||
from ..composition import ( | from ..composition import do_composition | ||||
InputMappingNode, | |||||
composite_mapping_from_output, | |||||
enter_composition, | |||||
exit_composition, | |||||
) | |||||
from ..config import ConfigMapping | |||||
from ..inference import ( | |||||
has_explicit_return_type, | |||||
infer_input_definitions_for_composite_solid, | |||||
infer_output_definitions, | |||||
) | |||||
from ..input import InputDefinition | from ..input import InputDefinition | ||||
from ..output import OutputDefinition | from ..output import OutputDefinition | ||||
from ..solid import CompositeSolidDefinition | from ..solid import CompositeSolidDefinition | ||||
from .solid import validate_solid_fn | |||||
class _CompositeSolid(object): | class _CompositeSolid(object): | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
name=None, | name=None, | ||||
input_defs=None, | input_defs=None, | ||||
output_defs=None, | output_defs=None, | ||||
Show All 13 Lines | ): | ||||
self.config_fn = check.opt_callable_param(config_fn, "config_fn") | self.config_fn = check.opt_callable_param(config_fn, "config_fn") | ||||
def __call__(self, fn): | def __call__(self, fn): | ||||
check.callable_param(fn, "fn") | check.callable_param(fn, "fn") | ||||
if not self.name: | if not self.name: | ||||
self.name = fn.__name__ | self.name = fn.__name__ | ||||
input_defs = ( | ( | ||||
self.input_defs | input_mappings, | ||||
if self.input_defs is not None | output_mappings, | ||||
else infer_input_definitions_for_composite_solid(self.name, fn) | dependencies, | ||||
) | solid_defs, | ||||
config_mapping, | |||||
explicit_outputs = False | positional_inputs, | ||||
if self.output_defs is not None: | ) = do_composition( | ||||
explicit_outputs = True | "@composite_solid", | ||||
output_defs = self.output_defs | self.name, | ||||
else: | fn, | ||||
explicit_outputs = has_explicit_return_type(fn) | self.input_defs, | ||||
output_defs = infer_output_definitions("@composite_solid", self.name, fn) | self.output_defs, | ||||
self.config_schema, | |||||
positional_inputs = validate_solid_fn( | self.config_fn, | ||||
"@composite_solid", self.name, fn, input_defs, exclude_nothing=False | ignore_output_from_composition_fn=False, | ||||
) | |||||
kwargs = {input_def.name: InputMappingNode(input_def) for input_def in input_defs} | |||||
output = None | |||||
mapping = None | |||||
enter_composition(self.name, "@composite_solid") | |||||
try: | |||||
output = fn(**kwargs) | |||||
mapping = composite_mapping_from_output(output, output_defs, self.name) | |||||
finally: | |||||
context = exit_composition(mapping) | |||||
check.invariant( | |||||
context.name == self.name, | |||||
"Composition context stack desync: received context for " | |||||
'"{context.name}" expected "{self.name}"'.format(context=context, self=self), | |||||
) | |||||
# line up mappings in definition order | |||||
input_mappings = [] | |||||
for defn in input_defs: | |||||
mappings = [ | |||||
mapping | |||||
for mapping in context.input_mappings | |||||
if mapping.definition.name == defn.name | |||||
] | |||||
if len(mappings) == 0: | |||||
raise DagsterInvalidDefinitionError( | |||||
"@composite_solid '{solid_name}' has unmapped input '{input_name}'. " | |||||
"Remove it or pass it to the appropriate solid invocation.".format( | |||||
solid_name=self.name, input_name=defn.name | |||||
) | |||||
) | |||||
input_mappings += mappings | |||||
output_mappings = [] | |||||
for defn in output_defs: | |||||
mapping = context.output_mapping_dict.get(defn.name) | |||||
if mapping is None: | |||||
# if we inferred output_defs we will be flexible and either take a mapping or not | |||||
if not explicit_outputs: | |||||
continue | |||||
raise DagsterInvalidDefinitionError( | |||||
"@composite_solid '{solid_name}' has unmapped output '{output_name}'. " | |||||
"Remove it or return a value from the appropriate solid invocation.".format( | |||||
solid_name=self.name, output_name=defn.name | |||||
) | |||||
) | |||||
output_mappings.append(mapping) | |||||
config_mapping = _get_validated_config_mapping( | |||||
self.name, self.config_schema, self.config_fn | |||||
) | ) | ||||
composite_def = CompositeSolidDefinition( | composite_def = CompositeSolidDefinition( | ||||
name=self.name, | name=self.name, | ||||
input_mappings=input_mappings, | input_mappings=input_mappings, | ||||
output_mappings=output_mappings, | output_mappings=output_mappings, | ||||
dependencies=context.dependencies, | dependencies=dependencies, | ||||
solid_defs=context.solid_defs, | solid_defs=solid_defs, | ||||
description=self.description, | description=self.description, | ||||
config_mapping=config_mapping, | config_mapping=config_mapping, | ||||
positional_inputs=positional_inputs, | positional_inputs=positional_inputs, | ||||
) | ) | ||||
update_wrapper(composite_def, fn) | update_wrapper(composite_def, fn) | ||||
return composite_def | return composite_def | ||||
def _get_validated_config_mapping(name, config_schema, config_fn): | |||||
"""Config mapping must set composite config_schema and config_fn or neither. | |||||
""" | |||||
if config_fn is None and config_schema is None: | |||||
return None | |||||
elif config_fn is not None and config_schema is not None: | |||||
return ConfigMapping(config_fn=config_fn, config_schema=config_schema) | |||||
else: | |||||
if config_fn is not None: | |||||
raise DagsterInvalidDefinitionError( | |||||
"@composite_solid '{solid_name}' defines a configuration function {config_fn} " | |||||
"but does not define a configuration schema. If you intend this composite to take " | |||||
"no config_schema, you must explicitly specify config_schema={{}}.".format( | |||||
solid_name=name, config_fn=config_fn.__name__ | |||||
) | |||||
) | |||||
else: | |||||
raise DagsterInvalidDefinitionError( | |||||
"@composite_solid '{solid_name}' defines a configuration schema but does not " | |||||
"define a configuration function.".format(solid_name=name) | |||||
) | |||||
def composite_solid( | def composite_solid( | ||||
name=None, | name=None, | ||||
input_defs=None, | input_defs=None, | ||||
output_defs=None, | output_defs=None, | ||||
description=None, | description=None, | ||||
config_schema=None, | config_schema=None, | ||||
config_fn=None, | config_fn=None, | ||||
): | ): | ||||
▲ Show 20 Lines • Show All 71 Lines • Show Last 20 Lines |