Differential D8684 Diff 40813 examples/docs_snippets/docs_snippets/concepts/solids_pipelines/unit_tests.py
Changeset View
Changeset View
Standalone View
Standalone View
examples/docs_snippets/docs_snippets/concepts/solids_pipelines/unit_tests.py
Show All 10 Lines | from dagster import ( | ||||
execute_pipeline, | execute_pipeline, | ||||
execute_solid, | execute_solid, | ||||
pipeline, | pipeline, | ||||
solid, | solid, | ||||
) | ) | ||||
@solid( | @solid( | ||||
input_defs=[InputDefinition(name="num", dagster_type=int, default_value=1)], | input_defs=[ | ||||
InputDefinition(name="num", dagster_type=int, default_value=1) | |||||
], | |||||
) | ) | ||||
def add_one(num: int) -> int: | def add_one(num: int) -> int: | ||||
return num + 1 | return num + 1 | ||||
@solid( | @solid( | ||||
input_defs=[InputDefinition(name="num", dagster_type=int, default_value=1)], | input_defs=[ | ||||
InputDefinition(name="num", dagster_type=int, default_value=1) | |||||
], | |||||
) | ) | ||||
def add_two(num: int) -> int: | def add_two(num: int) -> int: | ||||
return num + 2 | return num + 2 | ||||
@solid | @solid | ||||
def subtract(left: int, right: int) -> int: | def subtract(left: int, right: int) -> int: | ||||
return left - right | return left - right | ||||
@pipeline | @pipeline | ||||
def do_math(): | def do_math(): | ||||
subtract(add_one(), add_two()) | subtract(add_one(), add_two()) | ||||
@solid( | @solid( | ||||
input_defs=[InputDefinition(name="input_num", dagster_type=int)], | input_defs=[InputDefinition(name="input_num", dagster_type=int)], | ||||
output_defs=[OutputDefinition(name="a_num", dagster_type=int)], | output_defs=[OutputDefinition(name="a_num", dagster_type=int)], | ||||
) | ) | ||||
def emit_events_solid(input_num): | def emit_events_solid(input_num): | ||||
a_num = input_num + 1 | a_num = input_num + 1 | ||||
yield ExpectationResult( | yield ExpectationResult( | ||||
success=a_num > 0, label="positive", description="A num must be positive" | success=a_num > 0, | ||||
label="positive", | |||||
description="A num must be positive", | |||||
) | ) | ||||
yield AssetMaterialization( | yield AssetMaterialization( | ||||
asset_key="persisted_string", description="Let us pretend we persisted the string somewhere" | asset_key="persisted_string", | ||||
description="Let us pretend we persisted the string somewhere", | |||||
) | ) | ||||
yield Output(value=a_num, output_name="a_num") | yield Output(value=a_num, output_name="a_num") | ||||
@pipeline | @pipeline | ||||
def emit_events_pipeline(): | def emit_events_pipeline(): | ||||
emit_events_solid() | emit_events_solid() | ||||
▲ Show 20 Lines • Show All 80 Lines • ▼ Show 20 Lines | |||||
# start_test_resource_def_marker | # start_test_resource_def_marker | ||||
@resource(config_schema={"my_str": str}) | @resource(config_schema={"my_str": str}) | ||||
def my_foo_resource(context): | def my_foo_resource(context): | ||||
return context.resource_config["my_str"] | return context.resource_config["my_str"] | ||||
def test_solid_resource_def(): | def test_solid_resource_def(): | ||||
context = build_solid_context(resources={"foo": my_foo_resource.configured({"my_str": "bar"})}) | context = build_solid_context( | ||||
resources={"foo": my_foo_resource.configured({"my_str": "bar"})} | |||||
) | |||||
assert solid_requires_foo(context) == "found bar" | assert solid_requires_foo(context) == "found bar" | ||||
# end_test_resource_def_marker | # end_test_resource_def_marker | ||||
# start_test_pipeline_with_config | # start_test_pipeline_with_config | ||||
def test_pipeline_with_config(): | def test_pipeline_with_config(): | ||||
result = execute_pipeline( | result = execute_pipeline( | ||||
do_math, | do_math, | ||||
run_config={ | run_config={ | ||||
"solids": {"add_one": {"inputs": {"num": 2}}, "add_two": {"inputs": {"num": 3}}} | "solids": { | ||||
"add_one": {"inputs": {"num": 2}}, | |||||
"add_two": {"inputs": {"num": 3}}, | |||||
} | |||||
}, | }, | ||||
) | ) | ||||
assert result.success | assert result.success | ||||
assert result.output_for_solid("add_one") == 3 | assert result.output_for_solid("add_one") == 3 | ||||
assert result.output_for_solid("add_two") == 5 | assert result.output_for_solid("add_two") == 5 | ||||
assert result.output_for_solid("subtract") == -2 | assert result.output_for_solid("subtract") == -2 | ||||
Show All 11 Lines | def test_subset_execution(): | ||||
) | ) | ||||
assert result.success | assert result.success | ||||
assert result.output_for_solid("add_one") == 2 | assert result.output_for_solid("add_one") == 2 | ||||
assert result.output_for_solid("add_two") == 3 | assert result.output_for_solid("add_two") == 3 | ||||
# solid_result_list returns List[SolidExecutionResult] | # solid_result_list returns List[SolidExecutionResult] | ||||
# this checks to see that only two were executed | # this checks to see that only two were executed | ||||
assert {solid_result.solid.name for solid_result in result.solid_result_list} == { | assert { | ||||
solid_result.solid.name for solid_result in result.solid_result_list | |||||
} == { | |||||
"add_one", | "add_one", | ||||
"add_two", | "add_two", | ||||
} | } | ||||
# end_test_subset_execution | # end_test_subset_execution | ||||
# start_test_event_stream | # start_test_event_stream | ||||
def test_event_stream(): | def test_event_stream(): | ||||
pipeline_result = execute_pipeline( | pipeline_result = execute_pipeline( | ||||
emit_events_pipeline, {"solids": {"emit_events_solid": {"inputs": {"input_num": 1}}}} | emit_events_pipeline, | ||||
{"solids": {"emit_events_solid": {"inputs": {"input_num": 1}}}}, | |||||
) | ) | ||||
assert pipeline_result.success | assert pipeline_result.success | ||||
solid_result = pipeline_result.result_for_solid("emit_events_solid") | solid_result = pipeline_result.result_for_solid("emit_events_solid") | ||||
assert isinstance(solid_result, SolidExecutionResult) | assert isinstance(solid_result, SolidExecutionResult) | ||||
# when one has multiple outputs, you need to specify output name | # when one has multiple outputs, you need to specify output name | ||||
Show All 16 Lines | ( | ||||
expectation_event, | expectation_event, | ||||
materialization_event, | materialization_event, | ||||
_num_output_event, | _num_output_event, | ||||
_num_handled_output_operation, | _num_handled_output_operation, | ||||
_success, | _success, | ||||
) = solid_result.step_events | ) = solid_result.step_events | ||||
# apologies for verboseness here! we can do better. | # apologies for verboseness here! we can do better. | ||||
expectation_result = expectation_event.event_specific_data.expectation_result | expectation_result = ( | ||||
expectation_event.event_specific_data.expectation_result | |||||
) | |||||
assert isinstance(expectation_result, ExpectationResult) | assert isinstance(expectation_result, ExpectationResult) | ||||
assert expectation_result.success | assert expectation_result.success | ||||
assert expectation_result.label == "positive" | assert expectation_result.label == "positive" | ||||
materialization = materialization_event.event_specific_data.materialization | materialization = ( | ||||
materialization_event.event_specific_data.materialization | |||||
) | |||||
assert isinstance(materialization, AssetMaterialization) | assert isinstance(materialization, AssetMaterialization) | ||||
assert materialization.label == "persisted_string" | assert materialization.label == "persisted_string" | ||||
# end_test_event_stream | # end_test_event_stream |