Differential D4847 Diff 24311 python_modules/dagster/dagster/core/definitions/environment_configs.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/environment_configs.py
from collections import namedtuple | from collections import namedtuple | ||||
from dagster.config import Field, Selector | from dagster.config import Field, Selector | ||||
from dagster.config.config_type import ALL_CONFIG_BUILTINS, Array, ConfigType | from dagster.config.config_type import ALL_CONFIG_BUILTINS, Array, ConfigType | ||||
from dagster.config.field import check_opt_field_param | from dagster.config.field import check_opt_field_param | ||||
from dagster.config.field_utils import FIELD_NO_DEFAULT_PROVIDED, Shape, all_optional_type | from dagster.config.field_utils import FIELD_NO_DEFAULT_PROVIDED, Shape, all_optional_type | ||||
from dagster.config.iterate_types import iterate_config_types | from dagster.config.iterate_types import iterate_config_types | ||||
from dagster.core.errors import DagsterInvalidDefinitionError | from dagster.core.errors import DagsterInvalidDefinitionError | ||||
from dagster.core.storage.system_storage import ( | from dagster.core.storage.system_storage import ( | ||||
default_intermediate_storage_defs, | default_intermediate_storage_defs, | ||||
default_system_storage_defs, | default_system_storage_defs, | ||||
) | ) | ||||
from dagster.core.types.dagster_type import ALL_RUNTIME_BUILTINS, construct_dagster_type_dictionary | from dagster.core.types.dagster_type import ALL_RUNTIME_BUILTINS, construct_dagster_type_dictionary | ||||
from dagster.utils import check, ensure_single_item | from dagster.utils import check, ensure_single_item | ||||
from .dependency import DependencyStructure, Solid, SolidHandle, SolidInputHandle | from .dependency import DependencyStructure, Solid, SolidHandle, SolidInputHandle | ||||
from .graph import GraphDefinition | |||||
from .logger import LoggerDefinition | from .logger import LoggerDefinition | ||||
from .mode import ModeDefinition | from .mode import ModeDefinition | ||||
from .resource import ResourceDefinition | from .resource import ResourceDefinition | ||||
from .solid import CompositeSolidDefinition, ISolidDefinition, SolidDefinition | from .solid import ISolidDefinition, SolidDefinition | ||||
def _is_selector_field_optional(config_type): | def _is_selector_field_optional(config_type): | ||||
check.inst_param(config_type, "config_type", ConfigType) | check.inst_param(config_type, "config_type", ConfigType) | ||||
if len(config_type.fields) > 1: | if len(config_type.fields) > 1: | ||||
return False | return False | ||||
else: | else: | ||||
_name, field = ensure_single_item(config_type.fields) | _name, field = ensure_single_item(config_type.fields) | ||||
▲ Show 20 Lines • Show All 249 Lines • ▼ Show 20 Lines | def define_isolid_field(solid, handle, dependency_structure): | ||||
# 4) `configured` composite with field mapping: a 'config' key with the config_schema that was | # 4) `configured` composite with field mapping: a 'config' key with the config_schema that was | ||||
# provided when `configured` was called (via CompositeSolidDefinition#config_schema) | # provided when `configured` was called (via CompositeSolidDefinition#config_schema) | ||||
if isinstance(solid.definition, SolidDefinition): | if isinstance(solid.definition, SolidDefinition): | ||||
return construct_leaf_solid_config( | return construct_leaf_solid_config( | ||||
solid, handle, dependency_structure, solid.definition.config_schema | solid, handle, dependency_structure, solid.definition.config_schema | ||||
) | ) | ||||
composite_def = check.inst(solid.definition, CompositeSolidDefinition) | graph_def = check.inst(solid.definition, GraphDefinition) | ||||
if composite_def.has_config_mapping: | if graph_def.has_config_mapping: | ||||
# has_config_mapping covers cases 2 & 4 from above (only config mapped composite solids can | # has_config_mapping covers cases 2 & 4 from above (only config mapped composite solids can | ||||
# be `configured`)... | # be `configured`)... | ||||
return construct_leaf_solid_config( | return construct_leaf_solid_config( | ||||
solid, | solid, | ||||
handle, | handle, | ||||
dependency_structure, | dependency_structure, | ||||
# ...and in both cases, the correct schema for 'config' key is exposed by this property: | # ...and in both cases, the correct schema for 'config' key is exposed by this property: | ||||
composite_def.config_schema, | graph_def.config_schema, | ||||
) | ) | ||||
# This case omits a 'solids' key, thus if a composite solid is `configured` or has a field | # This case omits a 'solids' key, thus if a composite solid is `configured` or has a field | ||||
# mapping, the user cannot stub any config, inputs, or outputs for inner (child) solids. | # mapping, the user cannot stub any config, inputs, or outputs for inner (child) solids. | ||||
else: | else: | ||||
return filtered_system_dict( | return filtered_system_dict( | ||||
{ | { | ||||
"inputs": get_inputs_field(solid, handle, dependency_structure), | "inputs": get_inputs_field(solid, handle, dependency_structure), | ||||
"outputs": get_outputs_field(solid, handle), | "outputs": get_outputs_field(solid, handle), | ||||
"solids": Field( | "solids": Field( | ||||
define_solid_dictionary_cls( | define_solid_dictionary_cls( | ||||
composite_def.solids, composite_def.dependency_structure, handle, | graph_def.solids, graph_def.dependency_structure, handle, | ||||
) | ) | ||||
), | ), | ||||
} | } | ||||
) | ) | ||||
def define_solid_dictionary_cls(solids, dependency_structure, parent_handle=None): | def define_solid_dictionary_cls(solids, dependency_structure, parent_handle=None): | ||||
check.list_param(solids, "solids", of_type=Solid) | check.list_param(solids, "solids", of_type=Solid) | ||||
Show All 11 Lines | |||||
def iterate_solid_def_config_types(solid_def): | def iterate_solid_def_config_types(solid_def): | ||||
if isinstance(solid_def, SolidDefinition): | if isinstance(solid_def, SolidDefinition): | ||||
if solid_def.config_schema: | if solid_def.config_schema: | ||||
for config_type in iterate_config_types(solid_def.config_schema.config_type): | for config_type in iterate_config_types(solid_def.config_schema.config_type): | ||||
yield config_type | yield config_type | ||||
elif isinstance(solid_def, CompositeSolidDefinition): | elif isinstance(solid_def, GraphDefinition): | ||||
for solid in solid_def.solids: | for solid in solid_def.solids: | ||||
for config_type in iterate_solid_def_config_types(solid.definition): | for config_type in iterate_solid_def_config_types(solid.definition): | ||||
yield config_type | yield config_type | ||||
else: | else: | ||||
check.invariant("Unexpected ISolidDefinition type {type}".format(type=type(solid_def))) | check.invariant("Unexpected ISolidDefinition type {type}".format(type=type(solid_def))) | ||||
▲ Show 20 Lines • Show All 49 Lines • Show Last 20 Lines |