Differential D4920 Diff 24623 python_modules/dagster/dagster/core/system_config/composite_descent.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/system_config/composite_descent.py
from collections import namedtuple | from collections import namedtuple | ||||
from dagster import check | from dagster import check | ||||
from dagster.config.evaluate_value_result import EvaluateValueResult | from dagster.config.evaluate_value_result import EvaluateValueResult | ||||
from dagster.config.validate import process_config | from dagster.config.validate import process_config | ||||
from dagster.core.definitions.dependency import SolidHandle | from dagster.core.definitions.dependency import SolidHandle | ||||
from dagster.core.definitions.environment_configs import define_solid_dictionary_cls | from dagster.core.definitions.environment_configs import define_solid_dictionary_cls | ||||
from dagster.core.definitions.graph import GraphDefinition | |||||
from dagster.core.definitions.pipeline import PipelineDefinition | from dagster.core.definitions.pipeline import PipelineDefinition | ||||
from dagster.core.definitions.solid import CompositeSolidDefinition | from dagster.core.definitions.solid import SolidDefinition | ||||
from dagster.core.errors import ( | from dagster.core.errors import ( | ||||
DagsterConfigMappingFunctionError, | DagsterConfigMappingFunctionError, | ||||
DagsterInvalidConfigError, | DagsterInvalidConfigError, | ||||
user_code_error_boundary, | user_code_error_boundary, | ||||
) | ) | ||||
from dagster.core.system_config.objects import SolidConfig | from dagster.core.system_config.objects import SolidConfig | ||||
from dagster.utils.merger import merge_dicts | from dagster.utils.merger import merge_dicts | ||||
▲ Show 20 Lines • Show All 73 Lines • ▼ Show 20 Lines | def _composite_descent(parent_stack, solids_config_dict): | ||||
for solid in parent_stack.current_container.solids: | for solid in parent_stack.current_container.solids: | ||||
current_stack = parent_stack.descend(solid) | current_stack = parent_stack.descend(solid) | ||||
current_handle = current_stack.handle | current_handle = current_stack.handle | ||||
current_solid_config = solids_config_dict.get(solid.name, {}) | current_solid_config = solids_config_dict.get(solid.name, {}) | ||||
# the base case | # the base case | ||||
if not isinstance(solid.definition, CompositeSolidDefinition): | if isinstance(solid.definition, SolidDefinition): | ||||
config_mapped_solid_config = solid.definition.apply_config_mapping( | config_mapped_solid_config = solid.definition.apply_config_mapping( | ||||
{"config": current_solid_config.get("config")} | {"config": current_solid_config.get("config")} | ||||
) | ) | ||||
if not config_mapped_solid_config.success: | if not config_mapped_solid_config.success: | ||||
raise DagsterInvalidConfigError( | raise DagsterInvalidConfigError( | ||||
"Error in config for solid {}".format(solid.name), | "Error in config for solid {}".format(solid.name), | ||||
config_mapped_solid_config.errors, | config_mapped_solid_config.errors, | ||||
config_mapped_solid_config, | config_mapped_solid_config, | ||||
) | ) | ||||
complete_config_object = merge_dicts( | complete_config_object = merge_dicts( | ||||
current_solid_config, config_mapped_solid_config.value | current_solid_config, config_mapped_solid_config.value | ||||
) | ) | ||||
yield SolidConfigEntry(current_handle, SolidConfig.from_dict(complete_config_object)) | yield SolidConfigEntry(current_handle, SolidConfig.from_dict(complete_config_object)) | ||||
continue | continue | ||||
composite_def = check.inst(solid.definition, CompositeSolidDefinition) | graph_def = check.inst(solid.definition, GraphDefinition) | ||||
yield SolidConfigEntry( | yield SolidConfigEntry( | ||||
current_handle, | current_handle, | ||||
SolidConfig.from_dict( | SolidConfig.from_dict( | ||||
{ | { | ||||
"inputs": current_solid_config.get("inputs"), | "inputs": current_solid_config.get("inputs"), | ||||
"outputs": current_solid_config.get("outputs"), | "outputs": current_solid_config.get("outputs"), | ||||
} | } | ||||
), | ), | ||||
) | ) | ||||
# If there is a config mapping, invoke it and get the descendent solids | # If there is a config mapping, invoke it and get the descendent solids | ||||
# config that way. Else just grabs the solids entry of the current config | # config that way. Else just grabs the solids entry of the current config | ||||
solids_dict = ( | solids_dict = ( | ||||
_get_mapped_solids_dict(solid, composite_def, current_stack, current_solid_config) | _get_mapped_solids_dict(solid, graph_def, current_stack, current_solid_config) | ||||
if composite_def.config_mapping | if graph_def.config_mapping | ||||
else current_solid_config.get("solids", {}) | else current_solid_config.get("solids", {}) | ||||
) | ) | ||||
for sce in _composite_descent(current_stack, solids_dict): | for sce in _composite_descent(current_stack, solids_dict): | ||||
yield sce | yield sce | ||||
def _get_mapped_solids_dict(composite, composite_def, current_stack, current_solid_config): | def _get_mapped_solids_dict(composite, graph_def, current_stack, current_solid_config): | ||||
# the spec of the config mapping function is that it takes the dictionary at: | # the spec of the config mapping function is that it takes the dictionary at: | ||||
# solid_name: | # solid_name: | ||||
# config: {dict_passed_to_user} | # config: {dict_passed_to_user} | ||||
# and it returns the dictionary rooted at solids | # and it returns the dictionary rooted at solids | ||||
# solid_name: | # solid_name: | ||||
# solids: {return_value_of_config_fn} | # solids: {return_value_of_config_fn} | ||||
# We must call the config mapping function and then validate it against | # We must call the config mapping function and then validate it against | ||||
# the child schema. | # the child schema. | ||||
# apply @configured config mapping to the composite's incoming config before we get to the | # apply @configured config mapping to the composite's incoming config before we get to the | ||||
# composite's own config mapping process | # composite's own config mapping process | ||||
config_mapped_solid_config = composite_def.apply_config_mapping(current_solid_config) | config_mapped_solid_config = graph_def.apply_config_mapping(current_solid_config) | ||||
if not config_mapped_solid_config.success: | if not config_mapped_solid_config.success: | ||||
raise DagsterInvalidConfigError( | raise DagsterInvalidConfigError( | ||||
"Error in config for composite solid {}".format(composite.name), | "Error in config for composite solid {}".format(composite.name), | ||||
config_mapped_solid_config.errors, | config_mapped_solid_config.errors, | ||||
config_mapped_solid_config, | config_mapped_solid_config, | ||||
) | ) | ||||
with user_code_error_boundary( | with user_code_error_boundary( | ||||
DagsterConfigMappingFunctionError, _get_error_lambda(current_stack) | DagsterConfigMappingFunctionError, _get_error_lambda(current_stack) | ||||
): | ): | ||||
mapped_solids_config = composite_def.config_mapping.config_fn( | mapped_solids_config = graph_def.config_mapping.config_fn( | ||||
config_mapped_solid_config.value.get("config", {}) | config_mapped_solid_config.value.get("config", {}) | ||||
) | ) | ||||
# Dynamically construct the type that the output of the config mapping function will | # Dynamically construct the type that the output of the config mapping function will | ||||
# be evaluated against | # be evaluated against | ||||
type_to_evaluate_against = define_solid_dictionary_cls( | type_to_evaluate_against = define_solid_dictionary_cls( | ||||
composite_def.solids, composite_def.dependency_structure, current_stack.handle | graph_def.solids, graph_def.dependency_structure, current_stack.handle | ||||
) | ) | ||||
# process against that new type | # process against that new type | ||||
evr = process_config(type_to_evaluate_against, mapped_solids_config) | evr = process_config(type_to_evaluate_against, mapped_solids_config) | ||||
if not evr.success: | if not evr.success: | ||||
raise_composite_descent_config_error(current_stack, mapped_solids_config, evr) | raise_composite_descent_config_error(current_stack, mapped_solids_config, evr) | ||||
Show All 38 Lines |