Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/output.py
from collections import namedtuple | from collections import namedtuple | ||||
from typing import Any, Dict, List, NamedTuple, Optional, Set, TypeVar | from typing import Any, Callable, Dict, NamedTuple, Optional, Set, Type, TypeVar, Union | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions.events import AssetKey | from dagster.core.definitions.events import AssetKey | ||||
from dagster.core.errors import DagsterError, DagsterInvalidDefinitionError | from dagster.core.errors import DagsterError, DagsterInvalidDefinitionError | ||||
from dagster.core.types.dagster_type import DagsterType, resolve_dagster_type | from dagster.core.types.dagster_type import DagsterType, resolve_dagster_type | ||||
from dagster.utils.backcompat import experimental_arg_warning | from dagster.utils.backcompat import experimental_arg_warning | ||||
from .inference import InferredOutputProps | from .inference import InferredOutputProps | ||||
▲ Show 20 Lines • Show All 274 Lines • ▼ Show 20 Lines | class OutputMapping(namedtuple("_OutputMapping", "definition maps_from")): | ||||
def __new__(cls, definition, maps_from): | def __new__(cls, definition, maps_from): | ||||
return super(OutputMapping, cls).__new__( | return super(OutputMapping, cls).__new__( | ||||
cls, | cls, | ||||
check.inst_param(definition, "definition", OutputDefinition), | check.inst_param(definition, "definition", OutputDefinition), | ||||
check.inst_param(maps_from, "maps_from", OutputPointer), | check.inst_param(maps_from, "maps_from", OutputPointer), | ||||
) | ) | ||||
class Out( | class Out( | ||||
namedtuple( | NamedTuple( | ||||
"_Out", | "_Out", | ||||
"dagster_type name description is_required io_manager_key metadata asset_key " | [ | ||||
"asset_partitions", | ("dagster_type", Union[DagsterType, Type[NoValueSentinel]]), | ||||
("description", Optional[str]), | |||||
("is_required", Optional[bool]), | |||||
("io_manager_key", Optional[str]), | |||||
("metadata", Optional[Dict[str, Any]]), | |||||
("asset_key", Optional[Union[AssetKey, Callable[..., AssetKey]]]), | |||||
("asset_partitions", Optional[Union[Set[str], Callable[..., Set[str]]]]), | |||||
], | |||||
) | ) | ||||
): | ): | ||||
alangenfeld: nit: `typing.NamedTuple` | |||||
"""Experimental replacement for OutputDefinition intended to decrease verbosity.""" | """Experimental replacement for OutputDefinition intended to decrease verbosity.""" | ||||
def __new__( | def __new__( | ||||
cls, | cls, | ||||
dagster_type=NoValueSentinel, | dagster_type=NoValueSentinel, | ||||
name=None, | |||||
description=None, | description=None, | ||||
is_required=None, | is_required=None, | ||||
io_manager_key=None, | io_manager_key=None, | ||||
metadata=None, | metadata=None, | ||||
asset_key=None, | asset_key=None, | ||||
asset_partitions=None, | asset_partitions=None, | ||||
# make sure new parameters are updated in combine_with_inferred below | # make sure new parameters are updated in combine_with_inferred below | ||||
): | ): | ||||
return super(Out, cls).__new__( | return super(Out, cls).__new__( | ||||
cls, | cls, | ||||
dagster_type=dagster_type, | dagster_type=dagster_type, | ||||
name=name, | |||||
description=description, | description=description, | ||||
is_required=is_required, | is_required=is_required, | ||||
io_manager_key=io_manager_key, | io_manager_key=io_manager_key, | ||||
metadata=metadata, | metadata=metadata, | ||||
asset_key=asset_key, | asset_key=asset_key, | ||||
asset_partitions=asset_partitions, | asset_partitions=asset_partitions, | ||||
) | ) | ||||
def to_definition(self, inferred: InferredOutputProps) -> "OutputDefinition": | def to_definition(self, annotation_type: type, name: Optional[str]) -> "OutputDefinition": | ||||
dagster_type = ( | dagster_type = ( | ||||
self.dagster_type if self.dagster_type is not NoValueSentinel else inferred.annotation | self.dagster_type if self.dagster_type is not NoValueSentinel else annotation_type | ||||
) | ) | ||||
return OutputDefinition( | return OutputDefinition( | ||||
dagster_type=dagster_type, | dagster_type=dagster_type, | ||||
name=self.name, | name=name, | ||||
description=self.description, | description=self.description, | ||||
is_required=self.is_required, | is_required=self.is_required, | ||||
io_manager_key=self.io_manager_key, | io_manager_key=self.io_manager_key, | ||||
metadata=self.metadata, | metadata=self.metadata, | ||||
asset_key=self.asset_key, | asset_key=self.asset_key, | ||||
asset_partitions=self.asset_partitions, | asset_partitions=self.asset_partitions, | ||||
) | ) | ||||
class MultiOut(NamedTuple("_MultiOut", [("outs", List[Out])])): | |||||
"""Experimental replacement for providing a list of output definitions, to decrease verbosity.""" | |||||
def __new__(cls, outs: Dict[str, Out]): | |||||
out_list = [ | |||||
Out( | |||||
dagster_type=out.dagster_type, | |||||
name=key, | |||||
description=out.description, | |||||
is_required=out.is_required, | |||||
io_manager_key=out.io_manager_key, | |||||
metadata=out.metadata, | |||||
asset_key=out.asset_key, | |||||
asset_partitions=out.asset_partitions, | |||||
) | |||||
for key, out in outs.items() | |||||
] | |||||
return super(MultiOut, cls).__new__( | |||||
cls, | |||||
check.list_param(out_list, "outs", Out), | |||||
) | |||||
def to_definition_list(self, inferred: InferredOutputProps) -> List[OutputDefinition]: | |||||
output_defs = [] | |||||
for idx, out in enumerate(self.outs): | |||||
annotation_type = inferred.annotation.__args__[idx] if inferred.annotation else None | |||||
output_defs.append( | |||||
OutputDefinition( | |||||
dagster_type=annotation_type, | |||||
name=out.name, | |||||
description=out.description, | |||||
is_required=out.is_required, | |||||
io_manager_key=out.io_manager_key, | |||||
metadata=out.metadata, | |||||
asset_key=out.asset_key, | |||||
asset_partitions=out.asset_partitions, | |||||
) | |||||
) | |||||
return output_defs |
nit: typing.NamedTuple