Differential D4920 Diff 24623 python_modules/dagster/dagster/core/definitions/environment_configs.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/environment_configs.py
from collections import namedtuple | from collections import namedtuple | ||||
from dagster.config import Field, Selector | from dagster.config import Field, Selector | ||||
from dagster.config.config_type import ALL_CONFIG_BUILTINS, Array, ConfigType | from dagster.config.config_type import ALL_CONFIG_BUILTINS, Array, ConfigType | ||||
from dagster.config.field import check_opt_field_param | from dagster.config.field import check_opt_field_param | ||||
from dagster.config.field_utils import FIELD_NO_DEFAULT_PROVIDED, Shape, all_optional_type | from dagster.config.field_utils import FIELD_NO_DEFAULT_PROVIDED, Shape, all_optional_type | ||||
from dagster.config.iterate_types import iterate_config_types | from dagster.config.iterate_types import iterate_config_types | ||||
from dagster.core.errors import DagsterInvalidDefinitionError | from dagster.core.errors import DagsterInvalidDefinitionError | ||||
from dagster.core.storage.system_storage import ( | from dagster.core.storage.system_storage import ( | ||||
default_intermediate_storage_defs, | default_intermediate_storage_defs, | ||||
default_system_storage_defs, | default_system_storage_defs, | ||||
) | ) | ||||
from dagster.core.types.dagster_type import ALL_RUNTIME_BUILTINS, construct_dagster_type_dictionary | from dagster.core.types.dagster_type import ALL_RUNTIME_BUILTINS, construct_dagster_type_dictionary | ||||
from dagster.utils import check, ensure_single_item | from dagster.utils import check, ensure_single_item | ||||
from .dependency import DependencyStructure, Solid, SolidHandle, SolidInputHandle | from .dependency import DependencyStructure, Solid, SolidHandle, SolidInputHandle | ||||
from .graph import GraphDefinition | |||||
from .logger import LoggerDefinition | from .logger import LoggerDefinition | ||||
from .mode import ModeDefinition | from .mode import ModeDefinition | ||||
from .resource import ResourceDefinition | from .resource import ResourceDefinition | ||||
from .solid import CompositeSolidDefinition, ISolidDefinition, SolidDefinition | from .solid import NodeDefinition, SolidDefinition | ||||
def _is_selector_field_optional(config_type): | def _is_selector_field_optional(config_type): | ||||
check.inst_param(config_type, "config_type", ConfigType) | check.inst_param(config_type, "config_type", ConfigType) | ||||
if len(config_type.fields) > 1: | if len(config_type.fields) > 1: | ||||
return False | return False | ||||
else: | else: | ||||
_name, field = ensure_single_item(config_type.fields) | _name, field = ensure_single_item(config_type.fields) | ||||
▲ Show 20 Lines • Show All 249 Lines • ▼ Show 20 Lines | def define_isolid_field(solid, handle, dependency_structure): | ||||
# 4) `configured` composite with field mapping: a 'config' key with the config_schema that was | # 4) `configured` composite with field mapping: a 'config' key with the config_schema that was | ||||
# provided when `configured` was called (via CompositeSolidDefinition#config_schema) | # provided when `configured` was called (via CompositeSolidDefinition#config_schema) | ||||
if isinstance(solid.definition, SolidDefinition): | if isinstance(solid.definition, SolidDefinition): | ||||
return construct_leaf_solid_config( | return construct_leaf_solid_config( | ||||
solid, handle, dependency_structure, solid.definition.config_schema | solid, handle, dependency_structure, solid.definition.config_schema | ||||
) | ) | ||||
composite_def = check.inst(solid.definition, CompositeSolidDefinition) | graph_def = check.inst(solid.definition, GraphDefinition) | ||||
if composite_def.has_config_mapping: | if graph_def.has_config_mapping: | ||||
# has_config_mapping covers cases 2 & 4 from above (only config mapped composite solids can | # has_config_mapping covers cases 2 & 4 from above (only config mapped composite solids can | ||||
# be `configured`)... | # be `configured`)... | ||||
return construct_leaf_solid_config( | return construct_leaf_solid_config( | ||||
solid, | solid, | ||||
handle, | handle, | ||||
dependency_structure, | dependency_structure, | ||||
# ...and in both cases, the correct schema for 'config' key is exposed by this property: | # ...and in both cases, the correct schema for 'config' key is exposed by this property: | ||||
composite_def.config_schema, | graph_def.config_schema, | ||||
) | ) | ||||
# This case omits a 'solids' key, thus if a composite solid is `configured` or has a field | # This case omits a 'solids' key, thus if a composite solid is `configured` or has a field | ||||
# mapping, the user cannot stub any config, inputs, or outputs for inner (child) solids. | # mapping, the user cannot stub any config, inputs, or outputs for inner (child) solids. | ||||
else: | else: | ||||
return filtered_system_dict( | return filtered_system_dict( | ||||
{ | { | ||||
"inputs": get_inputs_field(solid, handle, dependency_structure), | "inputs": get_inputs_field(solid, handle, dependency_structure), | ||||
"outputs": get_outputs_field(solid, handle), | "outputs": get_outputs_field(solid, handle), | ||||
"solids": Field( | "solids": Field( | ||||
define_solid_dictionary_cls( | define_solid_dictionary_cls( | ||||
composite_def.solids, composite_def.dependency_structure, handle, | graph_def.solids, graph_def.dependency_structure, handle, | ||||
) | ) | ||||
), | ), | ||||
} | } | ||||
) | ) | ||||
def define_solid_dictionary_cls(solids, dependency_structure, parent_handle=None): | def define_solid_dictionary_cls(solids, dependency_structure, parent_handle=None): | ||||
check.list_param(solids, "solids", of_type=Solid) | check.list_param(solids, "solids", of_type=Solid) | ||||
check.inst_param(dependency_structure, "dependency_structure", DependencyStructure) | check.inst_param(dependency_structure, "dependency_structure", DependencyStructure) | ||||
check.opt_inst_param(parent_handle, "parent_handle", SolidHandle) | check.opt_inst_param(parent_handle, "parent_handle", SolidHandle) | ||||
fields = {} | fields = {} | ||||
for solid in solids: | for solid in solids: | ||||
if solid.definition.has_config_entry: | if solid.definition.has_config_entry: | ||||
fields[solid.name] = define_isolid_field( | fields[solid.name] = define_isolid_field( | ||||
solid, SolidHandle(solid.name, parent_handle), dependency_structure, | solid, SolidHandle(solid.name, parent_handle), dependency_structure, | ||||
) | ) | ||||
return Shape(fields) | return Shape(fields) | ||||
def iterate_solid_def_config_types(solid_def): | def iterate_node_def_config_types(node_def): | ||||
check.inst_param(node_def, "node_def", NodeDefinition) | |||||
if isinstance(solid_def, SolidDefinition): | if isinstance(node_def, SolidDefinition): | ||||
if solid_def.config_schema: | if node_def.config_schema: | ||||
for config_type in iterate_config_types(solid_def.config_schema.config_type): | for config_type in iterate_config_types(node_def.config_schema.config_type): | ||||
yield config_type | yield config_type | ||||
elif isinstance(solid_def, CompositeSolidDefinition): | elif isinstance(node_def, GraphDefinition): | ||||
for solid in solid_def.solids: | for solid in node_def.solids: | ||||
for config_type in iterate_solid_def_config_types(solid.definition): | for config_type in iterate_node_def_config_types(solid.definition): | ||||
yield config_type | yield config_type | ||||
else: | else: | ||||
check.invariant("Unexpected ISolidDefinition type {type}".format(type=type(solid_def))) | check.invariant("Unexpected NodeDefinition type {type}".format(type=type(node_def))) | ||||
def _gather_all_schemas(solid_defs): | def _gather_all_schemas(node_defs): | ||||
dagster_types = construct_dagster_type_dictionary(solid_defs) | dagster_types = construct_dagster_type_dictionary(node_defs) | ||||
for dagster_type in list(dagster_types.values()) + list(ALL_RUNTIME_BUILTINS): | for dagster_type in list(dagster_types.values()) + list(ALL_RUNTIME_BUILTINS): | ||||
if dagster_type.loader: | if dagster_type.loader: | ||||
for ct in iterate_config_types(dagster_type.loader.schema_type): | for ct in iterate_config_types(dagster_type.loader.schema_type): | ||||
yield ct | yield ct | ||||
if dagster_type.materializer: | if dagster_type.materializer: | ||||
for ct in iterate_config_types(dagster_type.materializer.schema_type): | for ct in iterate_config_types(dagster_type.materializer.schema_type): | ||||
yield ct | yield ct | ||||
def _gather_all_config_types(solid_defs, environment_type): | def _gather_all_config_types(node_defs, environment_type): | ||||
check.list_param(solid_defs, "solid_defs", ISolidDefinition) | check.list_param(node_defs, "node_defs", NodeDefinition) | ||||
check.inst_param(environment_type, "environment_type", ConfigType) | check.inst_param(environment_type, "environment_type", ConfigType) | ||||
for solid_def in solid_defs: | for node_def in node_defs: | ||||
for config_type in iterate_solid_def_config_types(solid_def): | for config_type in iterate_node_def_config_types(node_def): | ||||
yield config_type | yield config_type | ||||
for config_type in iterate_config_types(environment_type): | for config_type in iterate_config_types(environment_type): | ||||
yield config_type | yield config_type | ||||
def construct_config_type_dictionary(solid_defs, environment_type): | def construct_config_type_dictionary(node_defs, environment_type): | ||||
check.list_param(solid_defs, "solid_defs", ISolidDefinition) | check.list_param(node_defs, "node_defs", NodeDefinition) | ||||
check.inst_param(environment_type, "environment_type", ConfigType) | check.inst_param(environment_type, "environment_type", ConfigType) | ||||
type_dict_by_name = {t.given_name: t for t in ALL_CONFIG_BUILTINS if t.given_name} | type_dict_by_name = {t.given_name: t for t in ALL_CONFIG_BUILTINS if t.given_name} | ||||
type_dict_by_key = {t.key: t for t in ALL_CONFIG_BUILTINS} | type_dict_by_key = {t.key: t for t in ALL_CONFIG_BUILTINS} | ||||
all_types = list(_gather_all_config_types(solid_defs, environment_type)) + list( | all_types = list(_gather_all_config_types(node_defs, environment_type)) + list( | ||||
_gather_all_schemas(solid_defs) | _gather_all_schemas(node_defs) | ||||
) | ) | ||||
for config_type in all_types: | for config_type in all_types: | ||||
name = config_type.given_name | name = config_type.given_name | ||||
if name and name in type_dict_by_name: | if name and name in type_dict_by_name: | ||||
if type(config_type) is not type(type_dict_by_name[name]): | if type(config_type) is not type(type_dict_by_name[name]): | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
( | ( | ||||
Show All 10 Lines |