Differential D4920 Diff 24623 python_modules/dagster/dagster/core/definitions/decorators/pipeline.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/decorators/pipeline.py
import warnings | |||||
from functools import update_wrapper | from functools import update_wrapper | ||||
from dagster import check | from dagster import check | ||||
from dagster.utils.backcompat import experimental_arg_warning | |||||
from ..composition import enter_composition, exit_composition | |||||
from ..hook import HookDefinition | from ..hook import HookDefinition | ||||
from ..input import InputDefinition | |||||
from ..mode import ModeDefinition | from ..mode import ModeDefinition | ||||
from ..output import OutputDefinition | |||||
from ..pipeline import PipelineDefinition | from ..pipeline import PipelineDefinition | ||||
from ..preset import PresetDefinition | from ..preset import PresetDefinition | ||||
class _Pipeline(object): | class _Pipeline(object): | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
name=None, | name=None, | ||||
mode_defs=None, | mode_defs=None, | ||||
preset_defs=None, | preset_defs=None, | ||||
description=None, | description=None, | ||||
tags=None, | tags=None, | ||||
hook_defs=None, | hook_defs=None, | ||||
input_defs=None, | |||||
output_defs=None, | |||||
config_schema=None, | |||||
config_fn=None, | |||||
): | ): | ||||
self.name = check.opt_str_param(name, "name") | self.name = check.opt_str_param(name, "name") | ||||
self.mode_definitions = check.opt_list_param(mode_defs, "mode_defs", ModeDefinition) | self.mode_definitions = check.opt_list_param(mode_defs, "mode_defs", ModeDefinition) | ||||
self.preset_definitions = check.opt_list_param(preset_defs, "preset_defs", PresetDefinition) | self.preset_definitions = check.opt_list_param(preset_defs, "preset_defs", PresetDefinition) | ||||
self.description = check.opt_str_param(description, "description") | self.description = check.opt_str_param(description, "description") | ||||
self.tags = check.opt_dict_param(tags, "tags") | self.tags = check.opt_dict_param(tags, "tags") | ||||
self.hook_defs = check.opt_set_param(hook_defs, "hook_defs", of_type=HookDefinition) | self.hook_defs = check.opt_set_param(hook_defs, "hook_defs", of_type=HookDefinition) | ||||
self.input_defs = check.opt_nullable_list_param( | |||||
input_defs, "input_defs", of_type=InputDefinition | |||||
) | |||||
self.did_pass_outputs = output_defs is not None | |||||
self.output_defs = check.opt_nullable_list_param( | |||||
output_defs, "output_defs", of_type=OutputDefinition | |||||
) | |||||
self.config_schema = config_schema | |||||
self.config_fn = check.opt_callable_param(config_fn, "config_fn") | |||||
def __call__(self, fn): | def __call__(self, fn): | ||||
check.callable_param(fn, "fn") | check.callable_param(fn, "fn") | ||||
if not self.name: | if not self.name: | ||||
self.name = fn.__name__ | self.name = fn.__name__ | ||||
enter_composition(self.name, "@pipeline") | from dagster.core.definitions.decorators.composite_solid import do_composition | ||||
try: | |||||
output = fn() | |||||
if output is not None: | |||||
warnings.warn( | |||||
"You have returned a value out of a @pipeline-decorated function. " | |||||
"This currently has no effect on behavior, but will after 0.10.0 is " | |||||
"released. In order to preserve existing behavior to do not return " | |||||
"anything out of this function. Pipelines (and its successor, graphs) " | |||||
"will have meaningful outputs just like composite solids do today, " | |||||
"and the return value will be meaningful.", | |||||
stacklevel=2, | |||||
) | |||||
finally: | ( | ||||
context = exit_composition() | input_mappings, | ||||
output_mappings, | |||||
dependencies, | |||||
solid_defs, | |||||
config_mapping, | |||||
positional_inputs, | |||||
) = do_composition( | |||||
"@pipeline", | |||||
self.name, | |||||
fn, | |||||
self.input_defs, | |||||
self.output_defs, | |||||
self.config_schema, | |||||
self.config_fn, | |||||
ignore_output_from_composition_fn=not self.did_pass_outputs, | |||||
) | |||||
pipeline_def = PipelineDefinition( | pipeline_def = PipelineDefinition( | ||||
name=self.name, | name=self.name, | ||||
dependencies=context.dependencies, | dependencies=dependencies, | ||||
solid_defs=context.solid_defs, | solid_defs=solid_defs, | ||||
mode_defs=self.mode_definitions, | mode_defs=self.mode_definitions, | ||||
preset_defs=self.preset_definitions, | preset_defs=self.preset_definitions, | ||||
description=self.description, | description=self.description, | ||||
tags=self.tags, | tags=self.tags, | ||||
hook_defs=self.hook_defs, | hook_defs=self.hook_defs, | ||||
input_mappings=input_mappings, | |||||
output_mappings=output_mappings, | |||||
config_mapping=config_mapping, | |||||
positional_inputs=positional_inputs, | |||||
) | ) | ||||
update_wrapper(pipeline_def, fn) | update_wrapper(pipeline_def, fn) | ||||
return pipeline_def | return pipeline_def | ||||
def pipeline( | def pipeline( | ||||
name=None, description=None, mode_defs=None, preset_defs=None, tags=None, hook_defs=None | name=None, | ||||
description=None, | |||||
mode_defs=None, | |||||
preset_defs=None, | |||||
tags=None, | |||||
hook_defs=None, | |||||
input_defs=None, | |||||
output_defs=None, | |||||
config_schema=None, | |||||
config_fn=None, | |||||
): | ): | ||||
"""Create a pipeline with the specified parameters from the decorated composition function. | """Create a pipeline with the specified parameters from the decorated composition function. | ||||
Using this decorator allows you to build up the dependency graph of the pipeline by writing a | Using this decorator allows you to build up the dependency graph of the pipeline by writing a | ||||
function that invokes solids and passes the output to other solids. | function that invokes solids and passes the output to other solids. | ||||
Args: | Args: | ||||
name (Optional[str]): The name of the pipeline. Must be unique within any | name (Optional[str]): The name of the pipeline. Must be unique within any | ||||
Show All 37 Lines | Example: | ||||
@pipeline | @pipeline | ||||
def math_pipeline(): | def math_pipeline(): | ||||
two, four = emit_two_four() | two, four = emit_two_four() | ||||
add_one(two) | add_one(two) | ||||
mult_two(four) | mult_two(four) | ||||
""" | """ | ||||
if input_defs is not None: | |||||
experimental_arg_warning("input_defs", "pipeline") | |||||
if output_defs is not None: | |||||
experimental_arg_warning("output_defs", "pipeline") | |||||
if config_schema is not None: | |||||
experimental_arg_warning("config_schema", "pipeline") | |||||
if config_fn is not None: | |||||
experimental_arg_warning("config_fn", "pipeline") | |||||
if callable(name): | if callable(name): | ||||
check.invariant(description is None) | check.invariant(description is None) | ||||
return _Pipeline()(name) | return _Pipeline()(name) | ||||
return _Pipeline( | return _Pipeline( | ||||
name=name, | name=name, | ||||
mode_defs=mode_defs, | mode_defs=mode_defs, | ||||
preset_defs=preset_defs, | preset_defs=preset_defs, | ||||
description=description, | description=description, | ||||
tags=tags, | tags=tags, | ||||
hook_defs=hook_defs, | hook_defs=hook_defs, | ||||
input_defs=input_defs, | |||||
output_defs=output_defs, | |||||
config_schema=config_schema, | |||||
config_fn=config_fn, | |||||
) | ) |