Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-test/dagster_test/toys/error_monster.py
from typing import cast | |||||
from dagster import ( | from dagster import ( | ||||
Failure, | Failure, | ||||
Field, | Field, | ||||
IOManager, | IOManager, | ||||
InputDefinition, | |||||
Int, | |||||
ModeDefinition, | |||||
OutputDefinition, | |||||
PresetDefinition, | |||||
ResourceDefinition, | ResourceDefinition, | ||||
RetryRequested, | RetryRequested, | ||||
String, | graph, | ||||
execute_pipeline, | |||||
io_manager, | io_manager, | ||||
pipeline, | op, | ||||
solid, | |||||
) | ) | ||||
from dagster.core.definitions.utils import config_from_pkg_resources | |||||
from dagster.utils import segfault | from dagster.utils import segfault | ||||
class ExampleException(Exception): | class ExampleException(Exception): | ||||
pass | pass | ||||
class ErrorableIOManager(IOManager): | class ErrorableIOManager(IOManager): | ||||
▲ Show 20 Lines • Show All 72 Lines • ▼ Show 20 Lines | if solid_config["failure_in_solid"]: | ||||
}, | }, | ||||
) from e | ) from e | ||||
elif solid_config["throw_in_solid"]: | elif solid_config["throw_in_solid"]: | ||||
raise ExampleException("I threw up") | raise ExampleException("I threw up") | ||||
elif solid_config["request_retry"]: | elif solid_config["request_retry"]: | ||||
raise RetryRequested() | raise RetryRequested() | ||||
@solid( | @op( | ||||
output_defs=[OutputDefinition(Int)], | |||||
config_schema=solid_throw_config, | config_schema=solid_throw_config, | ||||
required_resource_keys={"errorable_resource"}, | required_resource_keys={"errorable_resource"}, | ||||
) | ) | ||||
def emit_num(context): | def emit_num(context) -> int: | ||||
_act_on_config(context.solid_config) | _act_on_config(context.solid_config) | ||||
if context.solid_config["return_wrong_type"]: | if context.solid_config["return_wrong_type"]: | ||||
return "wow" | return cast(int, "wow") | ||||
return 13 | return 13 | ||||
@solid( | @op( | ||||
input_defs=[InputDefinition("num", Int)], | |||||
output_defs=[OutputDefinition(String)], | |||||
config_schema=solid_throw_config, | config_schema=solid_throw_config, | ||||
required_resource_keys={"errorable_resource"}, | required_resource_keys={"errorable_resource"}, | ||||
) | ) | ||||
def num_to_str(context, num): | def num_to_str(context, num: int) -> str: | ||||
_act_on_config(context.solid_config) | _act_on_config(context.solid_config) | ||||
if context.solid_config["return_wrong_type"]: | if context.solid_config["return_wrong_type"]: | ||||
return num + num | return cast(str, num + num) | ||||
alangenfeld: nit: mypy ignore instead? since this is meant to trigger an error | |||||
return str(num) | return str(num) | ||||
@solid( | @op( | ||||
input_defs=[InputDefinition("string", String)], | |||||
output_defs=[OutputDefinition(Int)], | |||||
config_schema=solid_throw_config, | config_schema=solid_throw_config, | ||||
required_resource_keys={"errorable_resource"}, | required_resource_keys={"errorable_resource"}, | ||||
) | ) | ||||
def str_to_num(context, string): | def str_to_num(context, string: str) -> int: | ||||
_act_on_config(context.solid_config) | _act_on_config(context.solid_config) | ||||
if context.solid_config["return_wrong_type"]: | if context.solid_config["return_wrong_type"]: | ||||
return string + string | return cast(int, string + string) | ||||
Not Done Inline Actions^ alangenfeld: ^ | |||||
return int(string) | return int(string) | ||||
@pipeline( | @graph( | ||||
description=( | description=( | ||||
"Demo pipeline that enables configurable types of errors thrown during pipeline execution, " | "Demo pipeline that enables configurable types of errors thrown during pipeline execution, " | ||||
"including solid execution errors, type errors, and resource initialization errors." | "including solid execution errors, type errors, and resource initialization errors." | ||||
), | |||||
mode_defs=[ | |||||
ModeDefinition( | |||||
name="errorable_mode", | |||||
resource_defs={ | |||||
"errorable_resource": define_errorable_resource(), | |||||
"io_manager": errorable_io_manager, | |||||
}, | |||||
), | |||||
], | |||||
preset_defs=[ | |||||
PresetDefinition.from_pkg_resources( | |||||
"passing", | |||||
pkg_resource_defs=[("dagster_test.toys.environments", "error.yaml")], | |||||
mode="errorable_mode", | |||||
) | ) | ||||
], | |||||
tags={"monster": "error"}, | |||||
) | ) | ||||
def error_monster(): | def error_monster_graph(): | ||||
start = emit_num.alias("start")() | start = emit_num.alias("start")() | ||||
middle = num_to_str.alias("middle")(num=start) | middle = num_to_str.alias("middle")(num=start) | ||||
str_to_num.alias("end")(string=middle) | str_to_num.alias("end")(string=middle) | ||||
if __name__ == "__main__": | error_monster_job = error_monster_graph.to_job( | ||||
result = execute_pipeline( | name="error_monster_job", | ||||
error_monster, | resource_defs={ | ||||
{ | "errorable_resource": define_errorable_resource(), | ||||
"solids": { | "io_manager": errorable_io_manager, | ||||
"start": {"config": {"throw_in_solid": False, "return_wrong_type": False}}, | |||||
"middle": {"config": {"throw_in_solid": False, "return_wrong_type": True}}, | |||||
"end": {"config": {"throw_in_solid": False, "return_wrong_type": False}}, | |||||
}, | |||||
"resources": {"errorable_resource": {"config": {"throw_on_resource_init": False}}}, | |||||
}, | }, | ||||
default_config=config_from_pkg_resources([("dagster_test.toys.environments", "error.yaml")]), | |||||
) | ) |
nit: mypy ignore instead? since this is meant to trigger an error