Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-test/dagster_test/toys/unreliable.py
from random import random | from random import random | ||||
from dagster import Field, pipeline, solid | from dagster import Field, graph, op | ||||
DEFAULT_EXCEPTION_RATE = 0.3 | DEFAULT_EXCEPTION_RATE = 0.3 | ||||
@solid | @op | ||||
def unreliable_start(_): | def unreliable_start() -> int: | ||||
return 1 | return 1 | ||||
@solid( | @op(config_schema={"rate": Field(float, is_required=False, default_value=DEFAULT_EXCEPTION_RATE)}) | ||||
config_schema={"rate": Field(float, is_required=False, default_value=DEFAULT_EXCEPTION_RATE)} | def unreliable(context, num: int) -> int: | ||||
) | |||||
def unreliable(context, num): | |||||
if random() < context.solid_config["rate"]: | if random() < context.solid_config["rate"]: | ||||
raise Exception("blah") | raise Exception("blah") | ||||
return num | return num | ||||
@pipeline(description="Demo pipeline of chained solids that fail with a configurable probability.") | @graph(description="Demo pipeline of chained solids that fail with a configurable probability.") | ||||
def unreliable_pipeline(): | def unreliable_graph(): | ||||
one = unreliable.alias("one") | one = unreliable.alias("one") | ||||
two = unreliable.alias("two") | two = unreliable.alias("two") | ||||
three = unreliable.alias("three") | three = unreliable.alias("three") | ||||
four = unreliable.alias("four") | four = unreliable.alias("four") | ||||
five = unreliable.alias("five") | five = unreliable.alias("five") | ||||
six = unreliable.alias("six") | six = unreliable.alias("six") | ||||
seven = unreliable.alias("seven") | seven = unreliable.alias("seven") | ||||
seven(six(five(four(three(two(one(unreliable_start()))))))) | seven(six(five(four(three(two(one(unreliable_start()))))))) | ||||
unreliable_job = unreliable_graph.to_job(name="unreliable_graph") |