Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/decorators/op.py
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Union | from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Union | ||||
from dagster import check | from dagster import check | ||||
from dagster.utils.backcompat import experimental_decorator | |||||
from ....seven.typing import get_origin | |||||
from ....utils.backcompat import experimental_decorator | |||||
from ...errors import DagsterInvariantViolationError | |||||
from ..inference import infer_output_props | from ..inference import infer_output_props | ||||
from ..input import In, InputDefinition | from ..input import In, InputDefinition | ||||
from ..output import MultiOut, Out, OutputDefinition | from ..output import Out, OutputDefinition | ||||
from ..policy import RetryPolicy | from ..policy import RetryPolicy | ||||
from ..solid import SolidDefinition | from ..solid import SolidDefinition | ||||
from .solid import _Solid | from .solid import _Solid | ||||
class _Op: | class _Op: | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
name: Optional[str] = None, | name: Optional[str] = None, | ||||
input_defs: Optional[Sequence[InputDefinition]] = None, | input_defs: Optional[Sequence[InputDefinition]] = None, | ||||
output_defs: Optional[Sequence[OutputDefinition]] = None, | output_defs: Optional[Sequence[OutputDefinition]] = None, | ||||
description: Optional[str] = None, | description: Optional[str] = None, | ||||
required_resource_keys: Optional[Set[str]] = None, | required_resource_keys: Optional[Set[str]] = None, | ||||
config_schema: Optional[Union[Any, Dict[str, Any]]] = None, | config_schema: Optional[Union[Any, Dict[str, Any]]] = None, | ||||
tags: Optional[Dict[str, Any]] = None, | tags: Optional[Dict[str, Any]] = None, | ||||
version: Optional[str] = None, | version: Optional[str] = None, | ||||
decorator_takes_context: Optional[bool] = True, | decorator_takes_context: Optional[bool] = True, | ||||
retry_policy: Optional[RetryPolicy] = None, | retry_policy: Optional[RetryPolicy] = None, | ||||
ins: Optional[Dict[str, In]] = None, | ins: Optional[Dict[str, In]] = None, | ||||
out: Optional[Union[Out, MultiOut]] = None, | out: Optional[Union[Out, Dict[str, Out]]] = None, | ||||
): | ): | ||||
self.name = check.opt_str_param(name, "name") | self.name = check.opt_str_param(name, "name") | ||||
self.input_defs = input_defs | self.input_defs = input_defs | ||||
self.output_defs = output_defs | self.output_defs = output_defs | ||||
self.decorator_takes_context = check.bool_param( | self.decorator_takes_context = check.bool_param( | ||||
decorator_takes_context, "decorator_takes_context" | decorator_takes_context, "decorator_takes_context" | ||||
) | ) | ||||
Show All 19 Lines | def __call__(self, fn: Callable[..., Any]) -> SolidDefinition: | ||||
check.failed("Values cannot be provided for both the 'output_defs' and 'out' arguments") | check.failed("Values cannot be provided for both the 'output_defs' and 'out' arguments") | ||||
inferred_out = infer_output_props(fn) | inferred_out = infer_output_props(fn) | ||||
input_defs = [inp.to_definition(name) for name, inp in self.ins.items()] | input_defs = [inp.to_definition(name) for name, inp in self.ins.items()] | ||||
final_output_defs: Optional[Sequence[OutputDefinition]] = None | final_output_defs: Optional[Sequence[OutputDefinition]] = None | ||||
if self.out: | if self.out: | ||||
check.inst_param(self.out, "out", (Out, MultiOut)) | check.inst_param(self.out, "out", (Out, dict)) | ||||
if isinstance(self.out, Out): | if isinstance(self.out, Out): | ||||
final_output_defs = [self.out.to_definition(inferred_out)] | final_output_defs = [self.out.to_definition(inferred_out.annotation, name=None)] | ||||
elif isinstance(self.out, MultiOut): | else: | ||||
final_output_defs = self.out.to_definition_list(inferred_out) | final_output_defs = [] | ||||
# If only a single entry has been provided to the out dict, then slurp the | |||||
# annotation into the entry. | |||||
if len(self.out) == 1: | |||||
name = list(self.out.keys())[0] | |||||
out = list(self.out.values())[0] | |||||
final_output_defs.append(out.to_definition(inferred_out.annotation, name)) | |||||
else: | |||||
# Introspection on type annotations is experimental, so checking | |||||
# metaclass is the best we can do. | |||||
if inferred_out.annotation and not get_origin(inferred_out.annotation) == tuple: | |||||
raise DagsterInvariantViolationError( | |||||
"Expected Tuple annotation for multiple outputs, but received non-tuple annotation." | |||||
) | |||||
if inferred_out.annotation and not len(inferred_out.annotation.__args__) == len( | |||||
self.out | |||||
): | |||||
raise DagsterInvariantViolationError( | |||||
"Expected Tuple annotation to have number of entries matching the " | |||||
f"number of outputs for more than one output. Expected {len(self.out)} " | |||||
f"outputs but annotation has {len(inferred_out.annotation.__args__)}." | |||||
) | |||||
for idx, (name, out) in enumerate(self.out.items()): | |||||
annotation_type = ( | |||||
inferred_out.annotation.__args__[idx] | |||||
if inferred_out.annotation | |||||
else None | |||||
) | |||||
final_output_defs.append(out.to_definition(annotation_type, name=name)) | |||||
else: | else: | ||||
final_output_defs = self.output_defs | final_output_defs = self.output_defs | ||||
return _Solid( | return _Solid( | ||||
name=self.name, | name=self.name, | ||||
input_defs=self.input_defs or input_defs, | input_defs=self.input_defs or input_defs, | ||||
output_defs=final_output_defs, | output_defs=final_output_defs, | ||||
description=self.description, | description=self.description, | ||||
Show All 13 Lines | def op( | ||||
input_defs: Optional[List[InputDefinition]] = None, | input_defs: Optional[List[InputDefinition]] = None, | ||||
output_defs: Optional[List[OutputDefinition]] = None, | output_defs: Optional[List[OutputDefinition]] = None, | ||||
config_schema: Optional[Union[Any, Dict[str, Any]]] = None, | config_schema: Optional[Union[Any, Dict[str, Any]]] = None, | ||||
required_resource_keys: Optional[Set[str]] = None, | required_resource_keys: Optional[Set[str]] = None, | ||||
tags: Optional[Dict[str, Any]] = None, | tags: Optional[Dict[str, Any]] = None, | ||||
version: Optional[str] = None, | version: Optional[str] = None, | ||||
retry_policy: Optional[RetryPolicy] = None, | retry_policy: Optional[RetryPolicy] = None, | ||||
ins: Optional[Dict[str, In]] = None, | ins: Optional[Dict[str, In]] = None, | ||||
out: Optional[Union[Out, MultiOut]] = None, | out: Optional[Union[Out, Dict[str, Out]]] = None, | ||||
) -> Union[_Op, SolidDefinition]: | ) -> Union[_Op, SolidDefinition]: | ||||
"""Op is an experimental replacement for solid, intended to decrease verbosity of core API.""" | """Op is an experimental replacement for solid, intended to decrease verbosity of core API.""" | ||||
alangenfeld: maybe worth taking a first pass on this doc block to start iterating on how we are going to… | |||||
Not Done Inline Actionsbump: documenting here or in follow up alangenfeld: bump: documenting here or in follow up | |||||
# This case is for when decorator is used bare, without arguments. e.g. @op versus @op() | # This case is for when decorator is used bare, without arguments. e.g. @op versus @op() | ||||
if callable(name): | if callable(name): | ||||
check.invariant(input_defs is None) | check.invariant(input_defs is None) | ||||
check.invariant(output_defs is None) | check.invariant(output_defs is None) | ||||
check.invariant(description is None) | check.invariant(description is None) | ||||
check.invariant(config_schema is None) | check.invariant(config_schema is None) | ||||
check.invariant(required_resource_keys is None) | check.invariant(required_resource_keys is None) | ||||
check.invariant(tags is None) | check.invariant(tags is None) | ||||
Show All 17 Lines |
maybe worth taking a first pass on this doc block to start iterating on how we are going to communicate the behavior we are building here