Changeset View
Changeset View
Standalone View
Standalone View
docs/content/guides/dagster/re-execution.mdx
Show All 35 Lines | if random() < failure_rate: | ||||
raise Exception("blah") | raise Exception("blah") | ||||
@solid() | @solid() | ||||
def unreliable_end(_, num): | def unreliable_end(_, num): | ||||
return | return | ||||
@pipeline(mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager})]) | @pipeline( | ||||
mode_defs=[ModeDefinition(resource_defs={"io_manager": fs_io_manager})] | |||||
) | |||||
def unreliable_pipeline(): | def unreliable_pipeline(): | ||||
unreliable_end(unreliable(unreliable_start())) | unreliable_end(unreliable(unreliable_start())) | ||||
``` | ``` | ||||
Although very simple, there are inputs and outputs passed between tasks. With an IO manager, re-execution is able to handle inputs and outputs stored from the initial run. | Although very simple, there are inputs and outputs passed between tasks. With an IO manager, re-execution is able to handle inputs and outputs stored from the initial run. | ||||
Upon launching the execution in Dagit, you can find the re-execution option on the top right. | Upon launching the execution in Dagit, you can find the re-execution option on the top right. | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | |||||
from dagster import DagsterInstance, execute_pipeline, reexecute_pipeline | from dagster import DagsterInstance, execute_pipeline, reexecute_pipeline | ||||
from reexecution.unreliable_pipeline import unreliable_pipeline | from reexecution.unreliable_pipeline import unreliable_pipeline | ||||
def reexecution(): | def reexecution(): | ||||
instance = DagsterInstance.ephemeral() | instance = DagsterInstance.ephemeral() | ||||
# Initial execution | # Initial execution | ||||
pipeline_result_full = execute_pipeline(unreliable_pipeline, instance=instance) | pipeline_result_full = execute_pipeline( | ||||
unreliable_pipeline, instance=instance | |||||
) | |||||
if not pipeline_result_full.success: | if not pipeline_result_full.success: | ||||
# Re-execution: Entire pipeline | # Re-execution: Entire pipeline | ||||
reexecution_result_full = reexecute_pipeline( | reexecution_result_full = reexecute_pipeline( | ||||
unreliable_pipeline, | unreliable_pipeline, | ||||
parent_run_id=pipeline_result_full.run_id, | parent_run_id=pipeline_result_full.run_id, | ||||
instance=instance, | instance=instance, | ||||
) | ) | ||||
Show All 15 Lines |