Differential D5721 Diff 28324 python_modules/dagster/dagster_tests/core_tests/dynamic_tests/test_not_allowed.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/core_tests/dynamic_tests/test_not_allowed.py
import pytest | import pytest | ||||
from dagster import ( | from dagster import DagsterInvalidDefinitionError, composite_solid, pipeline, solid | ||||
DagsterInvalidDefinitionError, | |||||
OutputDefinition, | |||||
composite_solid, | |||||
pipeline, | |||||
solid, | |||||
) | |||||
from dagster.experimental import DynamicOutput, DynamicOutputDefinition | from dagster.experimental import DynamicOutput, DynamicOutputDefinition | ||||
@solid(output_defs=[DynamicOutputDefinition()]) | @solid(output_defs=[DynamicOutputDefinition()]) | ||||
def dynamic_solid(_): | def dynamic_solid(_): | ||||
yield DynamicOutput(1, mapping_key="1") | yield DynamicOutput(1, mapping_key="1") | ||||
yield DynamicOutput(2, mapping_key="2") | yield DynamicOutput(2, mapping_key="2") | ||||
@solid | @solid | ||||
def echo(_, x): | def echo(_, x): | ||||
return x | return x | ||||
@solid | @solid | ||||
def add(_, x, y): | def add(_, x, y): | ||||
return x + y | return x + y | ||||
def test_composite(): | # preserved for conversation purposes - this pattern cant really be expressed without "map" or a direct unpacking function | ||||
with pytest.raises( | # | ||||
DagsterInvalidDefinitionError, match="Definition types must align", | # def test_composite(): | ||||
): | # with pytest.raises( | ||||
# DagsterInvalidDefinitionError, match="Definition types must align", | |||||
@composite_solid(output_defs=[OutputDefinition()]) | # ): | ||||
def _should_fail(): | |||||
return dynamic_solid() | # @composite_solid(output_defs=[OutputDefinition()]) | ||||
# def _should_fail(): | |||||
with pytest.raises( | # return dynamic_solid() | ||||
DagsterInvalidDefinitionError, | |||||
match=" must be a DynamicOutputDefinition since it is downstream of dynamic output", | # with pytest.raises( | ||||
): | # DagsterInvalidDefinitionError, | ||||
# match=" must be a DynamicOutputDefinition since it is downstream of dynamic output", | |||||
@composite_solid(output_defs=[OutputDefinition()]) | # ): | ||||
def _should_fail(): | |||||
return echo(dynamic_solid()) | # @composite_solid(output_defs=[OutputDefinition()]) | ||||
# def _should_fail(): | |||||
# return echo(dynamic_solid()) | |||||
alangenfeld: [1] | |||||
def test_fan_in(): | def test_fan_in(): | ||||
with pytest.raises( | with pytest.raises( | ||||
DagsterInvalidDefinitionError, | DagsterInvalidDefinitionError, | ||||
match='Problematic dependency on dynamic output "dynamic_solid:result"', | match='Problematic dependency on dynamic output "dynamic_solid:result"', | ||||
): | ): | ||||
@pipeline | @pipeline | ||||
def _should_fail(): | def _should_fail(): | ||||
echo([dynamic_solid()]) | numbers = [] | ||||
# pylint: disable=no-member | |||||
dynamic_solid().forEach(numbers.append) | |||||
echo(numbers) | |||||
def test_multi_direct(): | def test_multi_direct(): | ||||
with pytest.raises( | with pytest.raises( | ||||
DagsterInvalidDefinitionError, match="cannot be downstream of more than one dynamic output", | DagsterInvalidDefinitionError, match="cannot be downstream of more than one dynamic output", | ||||
): | ): | ||||
@pipeline | @pipeline | ||||
def _should_fail(): | def _should_fail(): | ||||
add(dynamic_solid(), dynamic_solid()) | def _add(x): | ||||
# pylint: disable=no-member | |||||
dynamic_solid().forEach(lambda y: add(x, y)) | |||||
# pylint: disable=no-member | |||||
def test_multi_indirect(): | dynamic_solid().forEach(_add) | ||||
with pytest.raises( | |||||
DagsterInvalidDefinitionError, match="cannot be downstream of more than one dynamic output", | |||||
): | |||||
@pipeline | |||||
def _should_fail(): | |||||
x = echo(dynamic_solid()) | |||||
add(dynamic_solid(), x) | |||||
def test_multi_composite_out(): | def test_multi_indirect(): | ||||
with pytest.raises( | with pytest.raises( | ||||
DagsterInvalidDefinitionError, match="cannot be downstream of more than one dynamic output", | DagsterInvalidDefinitionError, match="cannot be downstream of more than one dynamic output", | ||||
): | ): | ||||
@composite_solid(output_defs=[DynamicOutputDefinition()]) | |||||
def composed_echo(): | |||||
return echo(dynamic_solid()) | |||||
@pipeline | @pipeline | ||||
def _should_fail(): | def _should_fail(): | ||||
add(composed_echo(), dynamic_solid()) | def _add(x): | ||||
# pylint: disable=no-member | |||||
dynamic_solid().forEach(lambda y: add(x, y)) | |||||
# pylint: disable=no-member | |||||
dynamic_solid().forEach(lambda z: _add(echo(z))) | |||||
# same as commented out case above | |||||
# | |||||
# def test_multi_composite_out(): | |||||
# with pytest.raises( | |||||
# DagsterInvalidDefinitionError, match="cannot be downstream of more than one dynamic output", | |||||
# ): | |||||
# @composite_solid(output_defs=[DynamicOutputDefinition()]) | |||||
# def composed_echo(): | |||||
# return echo(dynamic_solid()) | |||||
# @pipeline | |||||
# def _should_fail(): | |||||
# add(composed_echo(), dynamic_solid()) | |||||
alangenfeldAuthorUnsubmitted Done Inline Actions[1] alangenfeld: [1] | |||||
def test_multi_composite_in(): | def test_multi_composite_in(): | ||||
with pytest.raises( | with pytest.raises( | ||||
DagsterInvalidDefinitionError, | DagsterInvalidDefinitionError, | ||||
match='cannot be downstream of dynamic output "dynamic_solid:result" since input "a" maps to a solid that is already downstream of another dynamic output', | match='cannot be downstream of dynamic output "dynamic_solid:result" since input "a" maps to a solid that is already downstream of another dynamic output', | ||||
): | ): | ||||
@composite_solid | @composite_solid | ||||
def composed_add(a): | def composed_add(a): | ||||
add(a, dynamic_solid()) | # pylint: disable=no-member | ||||
dynamic_solid().forEach(lambda b: add(a, b)) | |||||
@pipeline | @pipeline | ||||
def _should_fail(): | def _should_fail(): | ||||
y = echo(dynamic_solid()) | # pylint: disable=no-member | ||||
composed_add(y) | dynamic_solid().forEach(lambda x: composed_add(echo(x))) | ||||
def test_direct_dep(): | def test_direct_dep(): | ||||
@solid(output_defs=[DynamicOutputDefinition()]) | @solid(output_defs=[DynamicOutputDefinition()]) | ||||
def dynamic_add(_, x): | def dynamic_add(_, x): | ||||
yield DynamicOutput(x + 1, mapping_key="1") | yield DynamicOutput(x + 1, mapping_key="1") | ||||
yield DynamicOutput(x + 2, mapping_key="2") | yield DynamicOutput(x + 2, mapping_key="2") | ||||
@pipeline | @pipeline | ||||
def _is_fine(): | def _is_fine(): | ||||
dynamic_add(dynamic_solid()) | # pylint: disable=no-member | ||||
dynamic_solid().forEach(dynamic_add) | |||||
with pytest.raises( | with pytest.raises( | ||||
DagsterInvalidDefinitionError, match="cannot be downstream of more than one dynamic output", | DagsterInvalidDefinitionError, match="cannot be downstream of more than one dynamic output", | ||||
): | ): | ||||
@pipeline | @pipeline | ||||
def _should_fail(): | def _should_fail(): | ||||
echo(dynamic_add(dynamic_solid())) | # pylint: disable=no-member | ||||
dynamic_solid().forEach(lambda x: dynamic_add(x).forEach(echo)) |
[1]