Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/core_tests/test_composites.py
Show All 25 Lines | from dagster.core.utility_solids import ( | ||||
create_root_solid, | create_root_solid, | ||||
create_solid_with_deps, | create_solid_with_deps, | ||||
define_stub_solid, | define_stub_solid, | ||||
input_set, | input_set, | ||||
) | ) | ||||
from dagster.utils.test import get_temp_dir | from dagster.utils.test import get_temp_dir | ||||
def test_composite_basic_execution(): | def test_pipeline_in_pipeline(): | ||||
@solid | |||||
def a_solid(_): | |||||
pass | |||||
@pipeline | |||||
def inner(): | |||||
a_solid() | |||||
@pipeline | |||||
def outer(): | |||||
inner() | |||||
assert execute_pipeline(outer).success | |||||
@pytest.mark.parametrize( | |||||
"composition_decorator", | |||||
[pytest.param(composite_solid, id="composite_solid"), pytest.param(pipeline, id="pipeline")], | |||||
) | |||||
def test_composite_basic_execution(composition_decorator): | |||||
a_source = define_stub_solid("A_source", [input_set("A_input")]) | a_source = define_stub_solid("A_source", [input_set("A_input")]) | ||||
node_a = create_root_solid("A") | node_a = create_root_solid("A") | ||||
node_b = create_solid_with_deps("B", node_a) | node_b = create_solid_with_deps("B", node_a) | ||||
node_c = create_solid_with_deps("C", node_a) | node_c = create_solid_with_deps("C", node_a) | ||||
node_d = create_solid_with_deps("D", node_b, node_c) | node_d = create_solid_with_deps("D", node_b, node_c) | ||||
@composite_solid | @composition_decorator | ||||
def diamond_composite(): | def diamond_composite(): | ||||
a = node_a(a_source()) | a = node_a(a_source()) | ||||
node_d(B=node_b(a), C=node_c(a)) | node_d(B=node_b(a), C=node_c(a)) | ||||
res = execute_solid(diamond_composite) | res = execute_solid(diamond_composite) | ||||
assert res.success | assert res.success | ||||
@pipeline | @pipeline | ||||
def test_pipeline_double(): | def test_pipeline_double(): | ||||
diamond_composite.alias("D1")() | diamond_composite.alias("D1")() | ||||
diamond_composite.alias("D2")() | diamond_composite.alias("D2")() | ||||
result = execute_pipeline(test_pipeline_double) | result = execute_pipeline(test_pipeline_double) | ||||
assert result.success | assert result.success | ||||
@composite_solid | @composition_decorator | ||||
def wrapped_composite(): | def wrapped_composite(): | ||||
diamond_composite() | diamond_composite() | ||||
@pipeline | @pipeline | ||||
def test_pipeline_mixed(): | def test_pipeline_mixed(): | ||||
diamond_composite() | diamond_composite() | ||||
wrapped_composite() | wrapped_composite() | ||||
result = execute_pipeline(test_pipeline_mixed) | result = execute_pipeline(test_pipeline_mixed) | ||||
assert result.success | assert result.success | ||||
@composite_solid | @composition_decorator | ||||
def empty(): | def empty(): | ||||
pass | pass | ||||
@pipeline | @pipeline | ||||
def test_pipeline_empty(): | def test_pipeline_empty(): | ||||
empty() | empty() | ||||
result = execute_pipeline(test_pipeline_empty) | result = execute_pipeline(test_pipeline_empty) | ||||
assert result.success | assert result.success | ||||
nested_pipeline = nesting_composite_pipeline(6, 2) | nested_pipeline = nesting_composite_pipeline(6, 2) | ||||
result = execute_pipeline(nested_pipeline) | result = execute_pipeline(nested_pipeline) | ||||
assert result.success | assert result.success | ||||
def test_composite_config(): | @pytest.mark.parametrize( | ||||
"composition_decorator", | |||||
[pytest.param(composite_solid, id="composite_solid"), pytest.param(pipeline, id="pipeline")], | |||||
) | |||||
def test_composite_config(composition_decorator): | |||||
called = {} | called = {} | ||||
@solid(config_schema=Field(String)) | @solid(config_schema=Field(String)) | ||||
def configured(context): | def configured(context): | ||||
called["configured"] = True | called["configured"] = True | ||||
assert context.solid_config == "yes" | assert context.solid_config == "yes" | ||||
@composite_solid | @composition_decorator | ||||
def inner(): | def inner(): | ||||
configured() | configured() | ||||
@composite_solid | @composition_decorator | ||||
def outer(): | def outer(): | ||||
inner() | inner() | ||||
@pipeline | @pipeline | ||||
def composites_pipeline(): | def composites_pipeline(): | ||||
outer() | outer() | ||||
result = execute_pipeline( | result = execute_pipeline( | ||||
composites_pipeline, | composites_pipeline, | ||||
{"solids": {"outer": {"solids": {"inner": {"solids": {"configured": {"config": "yes"}}}}}}}, | {"solids": {"outer": {"solids": {"inner": {"solids": {"configured": {"config": "yes"}}}}}}}, | ||||
) | ) | ||||
assert result.success | assert result.success | ||||
assert called["configured"] | assert called["configured"] | ||||
def test_composite_config_input(): | @pytest.mark.parametrize( | ||||
"composition_decorator", | |||||
[pytest.param(composite_solid, id="composite_solid"), pytest.param(pipeline, id="pipeline")], | |||||
) | |||||
def test_composite_config_input(composition_decorator): | |||||
called = {} | called = {} | ||||
@solid(input_defs=[InputDefinition("one")]) | @solid(input_defs=[InputDefinition("one")]) | ||||
def node_a(_context, one): | def node_a(_context, one): | ||||
called["node_a"] = True | called["node_a"] = True | ||||
assert one == 1 | assert one == 1 | ||||
@composite_solid | @composition_decorator | ||||
def inner(): | def inner(): | ||||
node_a() | node_a() | ||||
@composite_solid | @composition_decorator | ||||
def outer(): | def outer(): | ||||
inner() | inner() | ||||
@pipeline | @pipeline | ||||
def composites_pipeline(): | def composites_pipeline(): | ||||
outer() | outer() | ||||
result = execute_pipeline( | result = execute_pipeline( | ||||
composites_pipeline, | composites_pipeline, | ||||
{ | { | ||||
"solids": { | "solids": { | ||||
"outer": { | "outer": { | ||||
"solids": {"inner": {"solids": {"node_a": {"inputs": {"one": {"value": 1}}}}}} | "solids": {"inner": {"solids": {"node_a": {"inputs": {"one": {"value": 1}}}}}} | ||||
} | } | ||||
} | } | ||||
}, | }, | ||||
) | ) | ||||
assert result.success | assert result.success | ||||
assert called["node_a"] | assert called["node_a"] | ||||
def test_mapped_composite_config_input(): | @pytest.mark.parametrize( | ||||
"composition_class", | |||||
[ | |||||
pytest.param(CompositeSolidDefinition, id="composite_solid"), | |||||
pytest.param(PipelineDefinition, id="pipeline"), | |||||
], | |||||
) | |||||
def test_mapped_composite_config_input(composition_class): | |||||
called = {} | called = {} | ||||
@solid(input_defs=[InputDefinition("one")]) | @solid(input_defs=[InputDefinition("one")]) | ||||
def node_a(_context, one): | def node_a(_context, one): | ||||
called["node_a"] = True | called["node_a"] = True | ||||
assert one == 1 | assert one == 1 | ||||
assert node_a.has_config_entry is True | |||||
assert not node_a.input_has_default("one") | |||||
@composite_solid | @composite_solid | ||||
def inner(inner_one): | def inner(inner_one): | ||||
node_a(inner_one) | node_a(inner_one) | ||||
outer = CompositeSolidDefinition( | assert inner.has_config_entry | ||||
assert not inner.has_config_mapping | |||||
outer = composition_class( | |||||
name="outer", | name="outer", | ||||
solid_defs=[inner], | solid_defs=[inner], | ||||
input_mappings=[InputDefinition("outer_one").mapping_to("inner", "inner_one")], | input_mappings=[InputDefinition("outer_one").mapping_to("inner", "inner_one")], | ||||
) | ) | ||||
assert not outer.input_has_default("outer_one") | |||||
assert outer.has_config_entry | |||||
assert not outer.has_config_mapping | |||||
pipe = PipelineDefinition(name="composites_pipeline", solid_defs=[outer]) | pipe = PipelineDefinition(name="composites_pipeline", solid_defs=[outer]) | ||||
result = execute_pipeline(pipe, {"solids": {"outer": {"inputs": {"outer_one": {"value": 1}}}}}) | result = execute_pipeline(pipe, {"solids": {"outer": {"inputs": {"outer_one": {"value": 1}}}}}) | ||||
assert result.success | assert result.success | ||||
assert called["node_a"] | assert called["node_a"] | ||||
def test_composite_io_mapping(): | @pytest.mark.parametrize( | ||||
"composition_class", | |||||
[ | |||||
pytest.param(CompositeSolidDefinition, id="composite_solid"), | |||||
pytest.param(PipelineDefinition, id="pipeline"), | |||||
], | |||||
) | |||||
def test_composite_io_mapping(composition_class): | |||||
a_source = define_stub_solid("A_source", [input_set("A_input")]) | a_source = define_stub_solid("A_source", [input_set("A_input")]) | ||||
node_a = create_root_solid("A") | node_a = create_root_solid("A") | ||||
node_b = create_solid_with_deps("B", node_a) | node_b = create_solid_with_deps("B", node_a) | ||||
node_c = create_solid_with_deps("C", node_b) | node_c = create_solid_with_deps("C", node_b) | ||||
comp_a_inner = CompositeSolidDefinition( | comp_a_inner = composition_class( | ||||
name="comp_a_inner", | name="comp_a_inner", | ||||
solid_defs=[a_source, node_a], | solid_defs=[a_source, node_a], | ||||
dependencies={"A": {"A_input": DependencyDefinition("A_source")}}, | dependencies={"A": {"A_input": DependencyDefinition("A_source")}}, | ||||
output_mappings=[OutputDefinition().mapping_from("A")], | output_mappings=[OutputDefinition().mapping_from("A")], | ||||
) | ) | ||||
comp_a_outer = CompositeSolidDefinition( | comp_a_outer = composition_class( | ||||
name="comp_a_outer", | name="comp_a_outer", | ||||
solid_defs=[comp_a_inner], | solid_defs=[comp_a_inner], | ||||
output_mappings=[OutputDefinition().mapping_from("comp_a_inner")], | output_mappings=[OutputDefinition().mapping_from("comp_a_inner")], | ||||
) | ) | ||||
comp_bc_inner = CompositeSolidDefinition( | comp_bc_inner = composition_class( | ||||
name="comp_bc_inner", | name="comp_bc_inner", | ||||
solid_defs=[node_b, node_c], | solid_defs=[node_b, node_c], | ||||
dependencies={"C": {"B": DependencyDefinition("B")}}, | dependencies={"C": {"B": DependencyDefinition("B")}}, | ||||
input_mappings=[ | input_mappings=[ | ||||
InputDefinition(name="inner_B_in").mapping_to(solid_name="B", input_name="A") | InputDefinition(name="inner_B_in").mapping_to(solid_name="B", input_name="A") | ||||
], | ], | ||||
) | ) | ||||
comp_bc_outer = CompositeSolidDefinition( | comp_bc_outer = composition_class( | ||||
name="comp_bc_outer", | name="comp_bc_outer", | ||||
solid_defs=[comp_bc_inner], | solid_defs=[comp_bc_inner], | ||||
dependencies={}, | dependencies={}, | ||||
input_mappings=[ | input_mappings=[ | ||||
InputDefinition(name="outer_B_in").mapping_to( | InputDefinition(name="outer_B_in").mapping_to( | ||||
solid_name="comp_bc_inner", input_name="inner_B_in" | solid_name="comp_bc_inner", input_name="inner_B_in" | ||||
) | ) | ||||
], | ], | ||||
) | ) | ||||
@pipeline | @pipeline | ||||
def wrapped_io(): | def wrapped_io(): | ||||
comp_bc_outer(comp_a_outer()) | comp_bc_outer(comp_a_outer()) | ||||
result = execute_pipeline(wrapped_io) | result = execute_pipeline(wrapped_io) | ||||
assert result.success | assert result.success | ||||
def test_io_error_is_decent(): | @pytest.mark.parametrize( | ||||
"composition_class", | |||||
[ | |||||
pytest.param(CompositeSolidDefinition, id="composite_solid"), | |||||
pytest.param(PipelineDefinition, id="pipeline"), | |||||
], | |||||
) | |||||
def test_io_error_is_decent(composition_class): | |||||
with pytest.raises(DagsterInvalidDefinitionError, match="mapping_to"): | with pytest.raises(DagsterInvalidDefinitionError, match="mapping_to"): | ||||
CompositeSolidDefinition( | composition_class( | ||||
name="comp_a_outer", solid_defs=[], input_mappings=[InputDefinition("should_be_mapped")] | name="comp_a_outer", solid_defs=[], input_mappings=[InputDefinition("should_be_mapped")] | ||||
) | ) | ||||
with pytest.raises(DagsterInvalidDefinitionError, match="mapping_from"): | with pytest.raises(DagsterInvalidDefinitionError, match="mapping_from"): | ||||
CompositeSolidDefinition( | composition_class(name="comp_a_outer", solid_defs=[], output_mappings=[OutputDefinition()]) | ||||
name="comp_a_outer", solid_defs=[], output_mappings=[OutputDefinition()] | |||||
) | |||||
def test_types_descent(): | @pytest.mark.parametrize( | ||||
"composition_decorator", | |||||
[pytest.param(composite_solid, id="composite_solid"), pytest.param(pipeline, id="pipeline")], | |||||
) | |||||
def test_types_descent(composition_decorator): | |||||
@usable_as_dagster_type | @usable_as_dagster_type | ||||
class Foo(object): | class Foo(object): | ||||
pass | pass | ||||
@solid(output_defs=[OutputDefinition(Foo)]) | @solid(output_defs=[OutputDefinition(Foo)]) | ||||
def inner_solid(_context): | def inner_solid(_context): | ||||
return Foo() | return Foo() | ||||
@composite_solid | @composition_decorator | ||||
def middle_solid(): | def middle_solid(): | ||||
inner_solid() | inner_solid() | ||||
@composite_solid | @composition_decorator | ||||
def outer_solid(): | def outer_solid(): | ||||
middle_solid() | middle_solid() | ||||
@pipeline | @pipeline | ||||
def layered_types(): | def layered_types(): | ||||
outer_solid() | outer_solid() | ||||
assert layered_types.has_dagster_type("Foo") | assert layered_types.has_dagster_type("Foo") | ||||
def test_deep_mapping(): | @pytest.mark.parametrize( | ||||
"composition_decorator", | |||||
[pytest.param(composite_solid, id="composite_solid"), pytest.param(pipeline, id="pipeline")], | |||||
) | |||||
def test_deep_mapping(composition_decorator): | |||||
@lambda_solid(output_def=OutputDefinition(String)) | @lambda_solid(output_def=OutputDefinition(String)) | ||||
def echo(blah): | def echo(blah): | ||||
return blah | return blah | ||||
@lambda_solid(output_def=OutputDefinition(String)) | @lambda_solid(output_def=OutputDefinition(String)) | ||||
def emit_foo(): | def emit_foo(): | ||||
return "foo" | return "foo" | ||||
@composite_solid(output_defs=[OutputDefinition(String, "z")]) | @composition_decorator(output_defs=[OutputDefinition(String, "z")]) | ||||
def az(a): | def az(a): | ||||
return echo(a) | return echo(a) | ||||
@composite_solid(output_defs=[OutputDefinition(String, "y")]) | @composition_decorator(output_defs=[OutputDefinition(String, "y")]) | ||||
def by(b): | def by(b): | ||||
return az(b) | return az(b) | ||||
@composite_solid(output_defs=[OutputDefinition(String, "x")]) | @composition_decorator(output_defs=[OutputDefinition(String, "x")]) | ||||
def cx(c): | def cx(c): | ||||
return by(c) | return by(c) | ||||
@pipeline | @pipeline | ||||
def nested(): | def nested(): | ||||
echo(cx(emit_foo())) | echo(cx(emit_foo())) | ||||
result = execute_pipeline(nested) | result = execute_pipeline(nested) | ||||
assert result.result_for_solid("echo").output_value() == "foo" | assert result.result_for_solid("echo").output_value() == "foo" | ||||
def test_mapping_parrallel_composite(): | @pytest.mark.parametrize( | ||||
"composition_decorator", | |||||
[pytest.param(composite_solid, id="composite_solid"), pytest.param(pipeline, id="pipeline")], | |||||
) | |||||
def test_mapping_parallel_composite(composition_decorator): | |||||
@lambda_solid(output_def=OutputDefinition(int)) | @lambda_solid(output_def=OutputDefinition(int)) | ||||
def one(): | def one(): | ||||
return 1 | return 1 | ||||
@lambda_solid(output_def=OutputDefinition(int)) | @lambda_solid(output_def=OutputDefinition(int)) | ||||
def two(): | def two(): | ||||
return 2 | return 2 | ||||
@lambda_solid( | @lambda_solid( | ||||
input_defs=[ | input_defs=[ | ||||
InputDefinition(dagster_type=int, name="a"), | InputDefinition(dagster_type=int, name="a"), | ||||
InputDefinition(dagster_type=int, name="b"), | InputDefinition(dagster_type=int, name="b"), | ||||
], | ], | ||||
output_def=OutputDefinition(int), | output_def=OutputDefinition(int), | ||||
) | ) | ||||
def adder(a, b): | def adder(a, b): | ||||
return a + b | return a + b | ||||
@composite_solid( | @composition_decorator( | ||||
output_defs=[ | output_defs=[ | ||||
OutputDefinition(dagster_type=int, name="two"), | OutputDefinition(dagster_type=int, name="two"), | ||||
OutputDefinition(dagster_type=int, name="four"), | OutputDefinition(dagster_type=int, name="four"), | ||||
] | ] | ||||
) | ) | ||||
def composite_adder(): | def composite_adder(): | ||||
result_one = one() | result_one = one() | ||||
result_two = two() | result_two = two() | ||||
Show All 19 Lines | def recreate_issue_pipeline(): | ||||
result = composite_adder() | result = composite_adder() | ||||
assert_two(result.two) # pylint: disable=no-member | assert_two(result.two) # pylint: disable=no-member | ||||
assert_four(result.four) # pylint: disable=no-member | assert_four(result.four) # pylint: disable=no-member | ||||
assert execute_pipeline(recreate_issue_pipeline).success | assert execute_pipeline(recreate_issue_pipeline).success | ||||
def test_composite_config_driven_materialization(): | @pytest.mark.parametrize( | ||||
"composition_decorator", | |||||
[pytest.param(composite_solid, id="composite_solid"), pytest.param(pipeline, id="pipeline")], | |||||
) | |||||
def test_composite_config_driven_materialization(composition_decorator): | |||||
@lambda_solid | @lambda_solid | ||||
def one(): | def one(): | ||||
return 1 | return 1 | ||||
@composite_solid | @composition_decorator(output_defs=[OutputDefinition()]) | ||||
def wrap_one(): | def wrap_one(): | ||||
return one() | return one() | ||||
@pipeline | @pipeline | ||||
def composite_config_driven_materialization_pipeline(): | def composite_config_driven_materialization_pipeline(): | ||||
wrap_one() | wrap_one() | ||||
with get_temp_dir() as write_directory: | with get_temp_dir() as write_directory: | ||||
write_location = os.path.join(write_directory, "wrap_one.json") | write_location = os.path.join(write_directory, "wrap_one.json") | ||||
execute_pipeline( | execute_pipeline( | ||||
composite_config_driven_materialization_pipeline, | composite_config_driven_materialization_pipeline, | ||||
run_config={ | run_config={ | ||||
"solids": { | "solids": { | ||||
"wrap_one": {"outputs": [{"result": {"json": {"path": write_location}}}]} | "wrap_one": {"outputs": [{"result": {"json": {"path": write_location}}}]} | ||||
} | } | ||||
}, | }, | ||||
) | ) | ||||
assert os.path.exists(write_location) | assert os.path.exists(write_location) | ||||
def test_mapping_errors(): | @pytest.mark.parametrize( | ||||
"composition_class", | |||||
[ | |||||
pytest.param(CompositeSolidDefinition, id="composite_solid"), | |||||
pytest.param(PipelineDefinition, id="pipeline"), | |||||
], | |||||
) | |||||
def test_mapping_errors(composition_class): | |||||
@lambda_solid | @lambda_solid | ||||
def echo(foo): | def echo(foo): | ||||
return foo | return foo | ||||
with pytest.raises( | with pytest.raises( | ||||
DagsterInvalidDefinitionError, match="references solid 'inner' which it does not contain" | DagsterInvalidDefinitionError, match="references solid 'inner' which it does not contain" | ||||
): | ): | ||||
CompositeSolidDefinition( | composition_class( | ||||
name="bad", | name="bad", | ||||
solid_defs=[echo], | solid_defs=[echo], | ||||
input_mappings=[InputDefinition("mismatch").mapping_to("inner", "foo")], | input_mappings=[InputDefinition("mismatch").mapping_to("inner", "foo")], | ||||
) | ) | ||||
with pytest.raises(DagsterInvalidDefinitionError, match="no input named 'bar'"): | with pytest.raises(DagsterInvalidDefinitionError, match="no input named 'bar'"): | ||||
CompositeSolidDefinition( | composition_class( | ||||
name="bad", | name="bad", | ||||
solid_defs=[echo], | solid_defs=[echo], | ||||
input_mappings=[InputDefinition("mismatch").mapping_to("echo", "bar")], | input_mappings=[InputDefinition("mismatch").mapping_to("echo", "bar")], | ||||
) | ) | ||||
with pytest.raises( | with pytest.raises( | ||||
DagsterInvalidDefinitionError, | DagsterInvalidDefinitionError, | ||||
match="InputMapping source and destination must have the same type", | match="InputMapping source and destination must have the same type", | ||||
): | ): | ||||
CompositeSolidDefinition( | composition_class( | ||||
name="bad", | name="bad", | ||||
solid_defs=[echo], | solid_defs=[echo], | ||||
input_mappings=[InputDefinition("mismatch", str).mapping_to("echo", "foo")], | input_mappings=[InputDefinition("mismatch", str).mapping_to("echo", "foo")], | ||||
) | ) | ||||
with pytest.raises( | with pytest.raises( | ||||
DagsterInvalidDefinitionError, | DagsterInvalidDefinitionError, | ||||
match="mappings with same definition name but different definitions", | match="mappings with same definition name but different definitions", | ||||
): | ): | ||||
CompositeSolidDefinition( | composition_class( | ||||
name="bad", | name="bad", | ||||
solid_defs=[echo], | solid_defs=[echo], | ||||
input_mappings=[ | input_mappings=[ | ||||
InputDefinition("mismatch").mapping_to("echo", "foo"), | InputDefinition("mismatch").mapping_to("echo", "foo"), | ||||
InputDefinition("mismatch").mapping_to("echo_2", "foo"), | InputDefinition("mismatch").mapping_to("echo_2", "foo"), | ||||
], | ], | ||||
) | ) | ||||
with pytest.raises( | with pytest.raises( | ||||
DagsterInvalidDefinitionError, match="references solid 'inner' which it does not contain" | DagsterInvalidDefinitionError, match="references solid 'inner' which it does not contain" | ||||
): | ): | ||||
CompositeSolidDefinition( | composition_class( | ||||
name="bad", | name="bad", | ||||
solid_defs=[echo], | solid_defs=[echo], | ||||
output_mappings=[OutputDefinition().mapping_from("inner", "result")], | output_mappings=[OutputDefinition().mapping_from("inner", "result")], | ||||
) | ) | ||||
with pytest.raises(DagsterInvalidDefinitionError, match="no output named 'return'"): | with pytest.raises(DagsterInvalidDefinitionError, match="no output named 'return'"): | ||||
CompositeSolidDefinition( | composition_class( | ||||
name="bad", | name="bad", | ||||
solid_defs=[echo], | solid_defs=[echo], | ||||
output_mappings=[OutputDefinition().mapping_from("echo", "return")], | output_mappings=[OutputDefinition().mapping_from("echo", "return")], | ||||
) | ) | ||||
with pytest.raises( | with pytest.raises( | ||||
DagsterInvalidDefinitionError, | DagsterInvalidDefinitionError, | ||||
match="OutputMapping source and destination must have the same type", | match="OutputMapping source and destination must have the same type", | ||||
): | ): | ||||
CompositeSolidDefinition( | composition_class( | ||||
name="bad", | name="bad", | ||||
solid_defs=[echo], | solid_defs=[echo], | ||||
output_mappings=[OutputDefinition(str).mapping_from("echo", "result")], | output_mappings=[OutputDefinition(str).mapping_from("echo", "result")], | ||||
) | ) | ||||
def test_composite_skippable_output_result(): | @pytest.mark.parametrize( | ||||
"composition_decorator", | |||||
[pytest.param(composite_solid, id="composite_solid"), pytest.param(pipeline, id="pipeline")], | |||||
) | |||||
def test_composite_skippable_output_result(composition_decorator): | |||||
@lambda_solid(output_def=OutputDefinition(int)) | @lambda_solid(output_def=OutputDefinition(int)) | ||||
def emit_one(): | def emit_one(): | ||||
return 1 | return 1 | ||||
@lambda_solid(output_def=OutputDefinition(Optional[float])) | @lambda_solid(output_def=OutputDefinition(Optional[float])) | ||||
def echo(x): | def echo(x): | ||||
return x | return x | ||||
@solid(output_defs=[OutputDefinition(Optional[float], name="foo_output", is_required=False)]) | @solid(output_defs=[OutputDefinition(Optional[float], name="foo_output", is_required=False)]) | ||||
def foo_solid(_, condition=False): | def foo_solid(_, condition=False): | ||||
if condition: | if condition: | ||||
yield Output(1.0, "foo_output") | yield Output(1.0, "foo_output") | ||||
@composite_solid( | @composition_decorator( | ||||
output_defs=[ | output_defs=[ | ||||
OutputDefinition(Optional[float], name="foo_output", is_required=False), | OutputDefinition(Optional[float], name="foo_output", is_required=False), | ||||
OutputDefinition(int, name="one_output"), | OutputDefinition(int, name="one_output"), | ||||
] | ] | ||||
) | ) | ||||
def foo_composite(): | def foo_composite(): | ||||
return {"foo_output": foo_solid(), "one_output": emit_one()} | return {"foo_output": foo_solid(), "one_output": emit_one()} | ||||
@composite_solid( | @composition_decorator( | ||||
output_defs=[OutputDefinition(Optional[float], name="foo_output", is_required=False),] | output_defs=[OutputDefinition(Optional[float], name="foo_output", is_required=False),] | ||||
) | ) | ||||
def baz_composite(): | def baz_composite(): | ||||
return {"foo_output": echo(foo_solid())} | return {"foo_output": echo(foo_solid())} | ||||
result = execute_solid(foo_composite) | result = execute_solid(foo_composite) | ||||
assert result.output_values == {"one_output": 1} | assert result.output_values == {"one_output": 1} | ||||
result = execute_solid(baz_composite) | result = execute_solid(baz_composite) | ||||
assert result.output_values == {} | assert result.output_values == {} |