Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/snap/solid.py
from typing import Any, Dict, List, NamedTuple, Optional, Union | from typing import Any, Dict, List, NamedTuple, Optional, Union | ||||
from dagster import check | from dagster import check | ||||
from dagster.config.snap import ConfigFieldSnap, snap_from_field | from dagster.config.snap import ConfigFieldSnap, snap_from_field | ||||
from dagster.core.definitions import ( | from dagster.core.definitions import ( | ||||
CompositeSolidDefinition, | CompositeSolidDefinition, | ||||
GraphDefinition, | |||||
InputDefinition, | InputDefinition, | ||||
InputMapping, | InputMapping, | ||||
OutputDefinition, | OutputDefinition, | ||||
OutputMapping, | OutputMapping, | ||||
PipelineDefinition, | PipelineDefinition, | ||||
SolidDefinition, | SolidDefinition, | ||||
) | ) | ||||
from dagster.serdes import whitelist_for_serdes | from dagster.serdes import register_serdes_tuple_fallbacks, whitelist_for_serdes | ||||
from .dep_snapshot import ( | from .dep_snapshot import DependencyStructureSnapshot, build_dep_structure_snapshot_from_graph | ||||
DependencyStructureSnapshot, | |||||
build_dep_structure_snapshot_from_icontains_solids, | |||||
) | |||||
@whitelist_for_serdes | @whitelist_for_serdes | ||||
class InputDefSnap( | class InputDefSnap( | ||||
NamedTuple( | NamedTuple( | ||||
"_InputDefSnap", | "_InputDefSnap", | ||||
[ | [ | ||||
("name", str), | ("name", str), | ||||
▲ Show 20 Lines • Show All 147 Lines • ▼ Show 20 Lines | return dict( | ||||
tags=check.dict_param(tags, "tags"), # validate using validate_tags? | tags=check.dict_param(tags, "tags"), # validate using validate_tags? | ||||
config_field_snap=check.opt_inst_param( | config_field_snap=check.opt_inst_param( | ||||
config_field_snap, "config_field_snap", ConfigFieldSnap | config_field_snap, "config_field_snap", ConfigFieldSnap | ||||
), | ), | ||||
) | ) | ||||
@whitelist_for_serdes | @whitelist_for_serdes | ||||
class CompositeSolidDefSnap( | class GraphDefSnap( | ||||
NamedTuple( | NamedTuple( | ||||
"_CompositeSolidDefSnap", | "_GraphDefSnap", | ||||
[ | [ | ||||
("name", str), | ("name", str), | ||||
("input_def_snaps", List[InputDefSnap]), | ("input_def_snaps", List[InputDefSnap]), | ||||
("output_def_snaps", List[OutputDefSnap]), | ("output_def_snaps", List[OutputDefSnap]), | ||||
("description", Optional[str]), | ("description", Optional[str]), | ||||
("tags", Dict[str, Any]), | ("tags", Dict[str, Any]), | ||||
("config_field_snap", Optional[ConfigFieldSnap]), | ("config_field_snap", Optional[ConfigFieldSnap]), | ||||
("dep_structure_snapshot", DependencyStructureSnapshot), | ("dep_structure_snapshot", DependencyStructureSnapshot), | ||||
Show All 9 Lines | def __new__( | ||||
output_def_snaps: List[OutputDefSnap], | output_def_snaps: List[OutputDefSnap], | ||||
description: Optional[str], | description: Optional[str], | ||||
tags: Dict[str, Any], | tags: Dict[str, Any], | ||||
config_field_snap: Optional[ConfigFieldSnap], | config_field_snap: Optional[ConfigFieldSnap], | ||||
dep_structure_snapshot: DependencyStructureSnapshot, | dep_structure_snapshot: DependencyStructureSnapshot, | ||||
input_mapping_snaps: List[InputMappingSnap], | input_mapping_snaps: List[InputMappingSnap], | ||||
output_mapping_snaps: List[OutputMappingSnap], | output_mapping_snaps: List[OutputMappingSnap], | ||||
): | ): | ||||
return super(CompositeSolidDefSnap, cls).__new__( | return super(GraphDefSnap, cls).__new__( | ||||
cls, | cls, | ||||
dep_structure_snapshot=check.inst_param( | dep_structure_snapshot=check.inst_param( | ||||
dep_structure_snapshot, "dep_structure_snapshot", DependencyStructureSnapshot | dep_structure_snapshot, "dep_structure_snapshot", DependencyStructureSnapshot | ||||
), | ), | ||||
input_mapping_snaps=check.list_param( | input_mapping_snaps=check.list_param( | ||||
input_mapping_snaps, "input_mapping_snaps", of_type=InputMappingSnap | input_mapping_snaps, "input_mapping_snaps", of_type=InputMappingSnap | ||||
), | ), | ||||
output_mapping_snaps=check.list_param( | output_mapping_snaps=check.list_param( | ||||
Show All 25 Lines | ): | ||||
def get_input_snap(self, name: str) -> InputDefSnap: | def get_input_snap(self, name: str) -> InputDefSnap: | ||||
return _get_input_snap(self, name) | return _get_input_snap(self, name) | ||||
def get_output_snap(self, name: str) -> OutputDefSnap: | def get_output_snap(self, name: str) -> OutputDefSnap: | ||||
return _get_output_snap(self, name) | return _get_output_snap(self, name) | ||||
register_serdes_tuple_fallbacks( | |||||
{ | |||||
# Composite Solids / Pipeline consolidated in to Graph | |||||
"CompositeSolidDefSnap": GraphDefSnap, | |||||
} | |||||
) | |||||
@whitelist_for_serdes | @whitelist_for_serdes | ||||
class SolidDefSnap( | class SolidDefSnap( | ||||
NamedTuple( | NamedTuple( | ||||
"_SolidDefMeta", | "_SolidDefMeta", | ||||
[ | [ | ||||
("name", str), | ("name", str), | ||||
("input_def_snaps", List[InputDefSnap]), | ("input_def_snaps", List[InputDefSnap]), | ||||
("output_def_snaps", List[OutputDefSnap]), | ("output_def_snaps", List[OutputDefSnap]), | ||||
Show All 37 Lines | |||||
@whitelist_for_serdes | @whitelist_for_serdes | ||||
class SolidDefinitionsSnapshot( | class SolidDefinitionsSnapshot( | ||||
NamedTuple( | NamedTuple( | ||||
"_SolidDefinitionsSnapshot", | "_SolidDefinitionsSnapshot", | ||||
[ | [ | ||||
("solid_def_snaps", List[SolidDefSnap]), | ("solid_def_snaps", List[SolidDefSnap]), | ||||
("composite_solid_def_snaps", List[CompositeSolidDefSnap]), | ("graph_def_snaps", List[GraphDefSnap]), | ||||
], | ], | ||||
) | ) | ||||
): | ): | ||||
def __new__( | def __new__( | ||||
cls, | cls, | ||||
solid_def_snaps: List[SolidDefSnap], | solid_def_snaps: List[SolidDefSnap], | ||||
composite_solid_def_snaps: List[CompositeSolidDefSnap], | # composite_solid_def_snaps -> graph_def_snaps | ||||
): | graph_def_snaps: Optional[List[GraphDefSnap]] = None, | ||||
composite_solid_def_snaps: Optional[List[GraphDefSnap]] = None, | |||||
): | |||||
if composite_solid_def_snaps is not None: | |||||
check.invariant( | |||||
graph_def_snaps is None, | |||||
"Unexpected graph_def_snaps and composite_solid_def_snaps set", | |||||
) | |||||
graph_def_snaps = composite_solid_def_snaps | |||||
return super(SolidDefinitionsSnapshot, cls).__new__( | return super(SolidDefinitionsSnapshot, cls).__new__( | ||||
cls, | cls, | ||||
solid_def_snaps=sorted( | solid_def_snaps=sorted( | ||||
check.list_param(solid_def_snaps, "solid_def_snaps", of_type=SolidDefSnap), | check.list_param(solid_def_snaps, "solid_def_snaps", of_type=SolidDefSnap), | ||||
key=lambda solid_def: solid_def.name, | key=lambda solid_def: solid_def.name, | ||||
), | ), | ||||
composite_solid_def_snaps=sorted( | graph_def_snaps=sorted( | ||||
check.list_param( | check.list_param( | ||||
composite_solid_def_snaps, | graph_def_snaps, | ||||
"composite_solid_def_snaps", | "graph_def_snaps", | ||||
of_type=CompositeSolidDefSnap, | of_type=GraphDefSnap, | ||||
), | ), | ||||
key=lambda comp_def: comp_def.name, | key=lambda comp_def: comp_def.name, | ||||
), | ), | ||||
) | ) | ||||
def build_solid_definitions_snapshot(pipeline_def: PipelineDefinition) -> SolidDefinitionsSnapshot: | def build_solid_definitions_snapshot(pipeline_def: PipelineDefinition) -> SolidDefinitionsSnapshot: | ||||
check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition) | check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition) | ||||
return SolidDefinitionsSnapshot( | solid_snaps = [] | ||||
solid_def_snaps=[ | graph_snaps = [] | ||||
build_core_solid_def_snap(solid_def) | |||||
for solid_def in pipeline_def.all_solid_defs | for node_def in pipeline_def.all_node_defs: # all_solid_defs needs to be renamed | ||||
if isinstance(solid_def, SolidDefinition) | if isinstance(node_def, SolidDefinition): | ||||
], | solid_snaps.append(build_core_solid_def_snap(node_def)) | ||||
composite_solid_def_snaps=[ | elif isinstance(node_def, GraphDefinition): | ||||
build_composite_solid_def_snap(solid_def) | graph_snaps.append(build_graph_def_snap(node_def)) | ||||
for solid_def in pipeline_def.all_solid_defs | else: | ||||
if isinstance(solid_def, CompositeSolidDefinition) | check.failed("Unexpected node {node_def}") | ||||
], | |||||
) | |||||
graph_snaps.append(build_graph_def_snap(pipeline_def.graph)) | |||||
def build_i_solid_def_snap( | return SolidDefinitionsSnapshot( | ||||
i_solid_def: Union[SolidDefinition, CompositeSolidDefinition] | solid_def_snaps=solid_snaps, | ||||
) -> Union[CompositeSolidDefSnap, SolidDefSnap]: | graph_def_snaps=graph_snaps, | ||||
return ( | |||||
build_composite_solid_def_snap(i_solid_def) | |||||
if isinstance(i_solid_def, CompositeSolidDefinition) | |||||
else build_core_solid_def_snap(i_solid_def) | |||||
) | ) | ||||
def build_composite_solid_def_snap(comp_solid_def): | def build_graph_def_snap(graph_def: GraphDefinition): | ||||
check.inst_param(comp_solid_def, "comp_solid_def", CompositeSolidDefinition) | check.inst_param(graph_def, "graph_def", GraphDefinition) | ||||
return CompositeSolidDefSnap( | return GraphDefSnap( | ||||
name=comp_solid_def.name, | name=graph_def.name, | ||||
input_def_snaps=list(map(build_input_def_snap, comp_solid_def.input_defs)), | input_def_snaps=list(map(build_input_def_snap, graph_def.input_defs)), | ||||
output_def_snaps=list(map(build_output_def_snap, comp_solid_def.output_defs)), | output_def_snaps=list(map(build_output_def_snap, graph_def.output_defs)), | ||||
description=comp_solid_def.description, | description=graph_def.description, | ||||
tags=comp_solid_def.tags, | tags=graph_def.tags, | ||||
config_field_snap=snap_from_field( | config_field_snap=snap_from_field( | ||||
"config", comp_solid_def.config_mapping.config_schema.as_field() | "config", graph_def.config_mapping.config_schema.as_field() | ||||
) | ) | ||||
if comp_solid_def.config_mapping | if graph_def.config_mapping | ||||
and comp_solid_def.config_mapping.config_schema | and graph_def.config_mapping.config_schema | ||||
and comp_solid_def.config_mapping.config_schema.as_field() | and graph_def.config_mapping.config_schema.as_field() | ||||
else None, | else None, | ||||
dep_structure_snapshot=build_dep_structure_snapshot_from_icontains_solids(comp_solid_def), | dep_structure_snapshot=build_dep_structure_snapshot_from_graph(graph_def), | ||||
input_mapping_snaps=list(map(build_input_mapping_snap, comp_solid_def.input_mappings)), | input_mapping_snaps=list(map(build_input_mapping_snap, graph_def.input_mappings)), | ||||
output_mapping_snaps=list(map(build_output_mapping_snap, comp_solid_def.output_mappings)), | output_mapping_snaps=list(map(build_output_mapping_snap, graph_def.output_mappings)), | ||||
) | ) | ||||
def build_core_solid_def_snap(solid_def): | def build_core_solid_def_snap(solid_def): | ||||
check.inst_param(solid_def, "solid_def", SolidDefinition) | check.inst_param(solid_def, "solid_def", SolidDefinition) | ||||
return SolidDefSnap( | return SolidDefSnap( | ||||
name=solid_def.name, | name=solid_def.name, | ||||
input_def_snaps=list(map(build_input_def_snap, solid_def.input_defs)), | input_def_snaps=list(map(build_input_def_snap, solid_def.input_defs)), | ||||
output_def_snaps=list(map(build_output_def_snap, solid_def.output_defs)), | output_def_snaps=list(map(build_output_def_snap, solid_def.output_defs)), | ||||
description=solid_def.description, | description=solid_def.description, | ||||
tags=solid_def.tags, | tags=solid_def.tags, | ||||
required_resource_keys=sorted(list(solid_def.required_resource_keys)), | required_resource_keys=sorted(list(solid_def.required_resource_keys)), | ||||
config_field_snap=snap_from_field("config", solid_def.config_field) | config_field_snap=snap_from_field("config", solid_def.config_field) | ||||
if solid_def.has_config_field | if solid_def.has_config_field | ||||
else None, | else None, | ||||
) | ) | ||||
# shared impl for CompositeSolidDefSnap and SolidDefSnap | # shared impl for GraphDefSnap and SolidDefSnap | ||||
def _get_input_snap( | def _get_input_snap(solid_def: Union[GraphDefSnap, SolidDefSnap], name: str) -> InputDefSnap: | ||||
solid_def: Union[CompositeSolidDefSnap, SolidDefSnap], name: str | |||||
) -> InputDefSnap: | |||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
for inp in solid_def.input_def_snaps: | for inp in solid_def.input_def_snaps: | ||||
if inp.name == name: | if inp.name == name: | ||||
return inp | return inp | ||||
check.failed( | check.failed( | ||||
"Could not find input {input_name} in solid def {solid_def_name}".format( | "Could not find input {input_name} in solid def {solid_def_name}".format( | ||||
input_name=name, solid_def_name=solid_def.name | input_name=name, solid_def_name=solid_def.name | ||||
) | ) | ||||
) | ) | ||||
# shared impl for CompositeSolidDefSnap and SolidDefSnap | # shared impl for GraphDefSnap and SolidDefSnap | ||||
def _get_output_snap( | def _get_output_snap(solid_def: Union[GraphDefSnap, SolidDefSnap], name: str) -> OutputDefSnap: | ||||
solid_def: Union[CompositeSolidDefSnap, SolidDefSnap], name: str | |||||
) -> OutputDefSnap: | |||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
for out in solid_def.output_def_snaps: | for out in solid_def.output_def_snaps: | ||||
if out.name == name: | if out.name == name: | ||||
return out | return out | ||||
check.failed( | check.failed( | ||||
"Could not find output {output_name} in solid def {solid_def_name}".format( | "Could not find output {output_name} in solid def {solid_def_name}".format( | ||||
output_name=name, solid_def_name=solid_def.name | output_name=name, solid_def_name=solid_def.name | ||||
) | ) | ||||
) | ) |