Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-test/dagster_test/toys/retries.py
import time | import time | ||||
from dagster import PresetDefinition, RetryRequested, lambda_solid, pipeline, solid | from dagster import RetryRequested, graph, op | ||||
@lambda_solid | @op | ||||
def echo(x): | def echo(x): | ||||
return x | return x | ||||
@solid(config_schema={"max_retries": int, "delay": float, "work_on_attempt": int}) | @op(config_schema={"max_retries": int, "delay": float, "work_on_attempt": int}) | ||||
def retry_solid(context): | def retry_solid(context) -> str: | ||||
time.sleep(0.1) | time.sleep(0.1) | ||||
if (context.retry_number + 1) >= context.solid_config["work_on_attempt"]: | if (context.retry_number + 1) >= context.solid_config["work_on_attempt"]: | ||||
return "success" | return "success" | ||||
else: | else: | ||||
raise RetryRequested( | raise RetryRequested( | ||||
max_retries=context.solid_config["max_retries"], | max_retries=context.solid_config["max_retries"], | ||||
seconds_to_wait=context.solid_config["delay"], | seconds_to_wait=context.solid_config["delay"], | ||||
) | ) | ||||
@pipeline( | @graph | ||||
preset_defs=[ | def retry_graph(): | ||||
PresetDefinition( | echo(retry_solid()) | ||||
name="pass_after_retry", | |||||
run_config={ | |||||
retry_job = retry_graph.to_job( | |||||
name="pass_after_retry_job", | |||||
default_config={ | |||||
"solids": { | "solids": { | ||||
"retry_solid": { | "retry_solid": { | ||||
"config": { | "config": { | ||||
"delay": 0.2, | "delay": 0.2, | ||||
"work_on_attempt": 2, | "work_on_attempt": 2, | ||||
"max_retries": 1, | "max_retries": 1, | ||||
} | } | ||||
} | } | ||||
} | } | ||||
}, | }, | ||||
) | ) | ||||
] | |||||
) | |||||
def retry_pipeline(): | |||||
echo(retry_solid()) |