
RFC: skip pipeline steps during retry for which we have intermediates
AbandonedPublic

Authored by prha on Oct 16 2019, 12:02 AM.

Details

Reviewers
nate
alangenfeld
Summary

This diff moves the copy_intermediate_from_prev_run calls for retried pipelines from
the pipeline setup to the engine execution.

This is so that we can check the intermediates and skip steps for which we already have outputs.

Previously, we reused step outputs from the previous run, but still ran each step execution and
thus had to deal with any new failures.

Because we reconstitute the previous run's step outputs based on object_store operations, we also
need to emit object store operations for any events that are skipped (for future retries).
Moving the copy operations means that we transparently expose these operations as DagsterEvents
when we retry.
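
To make the mechanism concrete, here is a minimal sketch of the skip-and-re-emit behavior, assuming illustrative names (Step, Output, StepOutputHandle, and the event tuples are stand-ins, not the actual dagster APIs touched by this diff):

    from collections import namedtuple

    Step = namedtuple("Step", "key outputs")
    Output = namedtuple("Output", "name")
    StepOutputHandle = namedtuple("StepOutputHandle", "step_key output_name")


    def execute_step_with_memoization(step, prev_run_intermediates, execute_fn):
        """Skip a step whose outputs all exist in the previous run's intermediate store."""
        handles = [StepOutputHandle(step.key, out.name) for out in step.outputs]

        if handles and all(h in prev_run_intermediates for h in handles):
            # Re-emit object store copy operations for the reused outputs so that a
            # future retry of this retry can also discover them in the event log.
            for handle in handles:
                yield ("ObjectStoreOperation.CP_OBJECT", handle)
            yield ("StepEvent.SKIPPED", step.key)
            return

        # Otherwise run the step as usual and pass its events through.
        for event in execute_fn(step):
            yield event

Note the explicit guard on zero-output steps, which cannot be skipped based on intermediates alone.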

The downside of moving the copy operation to the engine is that every engine implementation must
now be aware of retries and handle this behavior explicitly.

This diff does not test for nor explicitly handle retries in the Dask engine.

Test Plan

created new pytest

Diff Detail

Repository
R1 dagster
Lint
Lint OK
Unit
No Unit Test Coverage

Event Timeline

prha created this revision.Oct 16 2019, 12:02 AM
prha planned changes to this revision.Oct 16 2019, 12:20 AM

need to look at these build failures

prha updated this revision to Diff 5868.Oct 16 2019, 5:19 AM

fix tests, handle 0 output case

prha updated this revision to Diff 5869.Oct 16 2019, 6:02 AM

change skip semantics

I like the overall approach, but it's worth spending some more time thinking through how to avoid burdening every new executor with this blob of complexity, whether that's shuffling things around or just refactoring it such that it can be a shared utility brought in to each executor. In the worst case, we'll have to figure out the product implications of not supporting it in every executor.

js_modules/dagit/src/schema.graphql
834

hmm always dubious of bool input args, wonder how else we could set this up

python_modules/dagster/dagster/core/engine/engine_multiprocess.py
117–126

I think it's worth considering framing this more abstractly. This same code could be used to prevent re-computation in cases that are not resume-retry: i.e., if I have some scheduled job and none of the inputs change for some subset, I can copy the last results over instead of recomputing. It might be too difficult to abstract this now, but it's worth thinking through what this is doing from first principles and considering renaming/restructuring it along those lines instead of around its current sole intended use case.
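
A hedged, first-principles sketch of that framing (not the diff's code): a step can be skipped whenever a keyed record of (step key, input fingerprint) already has stored outputs, which covers resume-retry as well as the "scheduled job whose inputs didn't change" case.

    import hashlib
    import json


    def step_fingerprint(step_key, input_values):
        """Hash the step identity together with its resolved input values."""
        payload = json.dumps({"step": step_key, "inputs": input_values}, sort_keys=True)
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()


    def should_reuse_previous_result(step_key, input_values, result_store):
        """True if some previous run already produced outputs for this fingerprint."""
        return step_fingerprint(step_key, input_values) in result_store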

141–143

huh, cute - I thought we would have had to deal with successes and failures in the previous run instead of just what intermediates are available. We should maybe test a solid that yields multiple outputs and then fails. Do we overwrite the copied intermediates when we rerun? Do we not copy those intermediates?
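
An illustrative pytest sketch of that scenario, in plain Python rather than a real dagster solid: a step yields two outputs and then fails, so the intermediates exist even though the step itself did not succeed.

    import pytest


    def multi_output_then_fail():
        yield ("out_one", 1)
        yield ("out_two", 2)
        raise Exception("failure after emitting both outputs")


    def test_failed_step_still_leaves_intermediates_behind():
        store = {}
        with pytest.raises(Exception):
            for name, value in multi_output_then_fail():
                store[name] = value

        # Both intermediates are present, which is exactly why skip-on-retry should
        # key off step success/failure rather than intermediate availability alone.
        assert store == {"out_one": 1, "out_two": 2}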

prha added a comment.Oct 16 2019, 5:10 PM

Yeah, maybe I'll take another pass at this, pulling out some sort of RetryConfig that does all of this outside of the engine and just passes in a set of steps to skip... This gives a surface area that is easier to reason about at the engine level, but gives some flexibility around examining the success/failure state of the steps from the previous run.

I still have some open questions around how we should think about side effects, materializations, etc.
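
One possible shape for the RetryConfig idea, with hypothetical names and fields: the config inspects the previous run outside the engine and hands the engine nothing but a set of step keys to skip.

    from collections import namedtuple

    RetryConfig = namedtuple("RetryConfig", "previous_run_id step_keys_to_skip")


    def build_retry_config(previous_run_id, previous_step_statuses):
        """previous_step_statuses: mapping of step key -> 'success' or 'failure'."""
        skipped = frozenset(
            key for key, status in previous_step_statuses.items() if status == "success"
        )
        return RetryConfig(previous_run_id=previous_run_id, step_keys_to_skip=skipped)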

python_modules/dagster/dagster/core/engine/engine_multiprocess.py
141–143

Good call, I'll add a test for this.

In general, there's a lot of subtlety in what the right behavior for retries is. You're right that it should not be based solely on the successful generation of all the outputs (we should take the general step success/failure into account). Should it also take materializations into account? There were a couple of test cases in our test suite that had solids performing undocumented side effects (see test_basic_solid_with_config in dagster_tests/core_tests/test_solid_with_config.py) that made me think about where we might be challenging some assumptions.

prha updated this revision to Diff 5878.Oct 17 2019, 12:38 AM

pull reexecution logic out of engine

prha updated this revision to Diff 5879.Oct 17 2019, 12:59 AM

update tests

prha updated this revision to Diff 5880.Oct 17 2019, 1:06 AM

update snapshot

prha added a comment.Oct 17 2019, 4:46 AM

Another RFC, but I moved all of the object store operations out of the engine implementation. This does mean that the engine API now takes in a list of steps to skip. The pipeline calculates this list based on the reexecution config and the execution plan.

I think it's important to distinguish between step_keys_to_execute and step_keys_to_skip (they're orthogonal), but I'm not sure of the best way to make that clear.

nate added inline comments.Oct 17 2019, 5:42 PM
python_modules/dagster/dagster/core/engine/engine_base.py
10

I wonder if step_keys_to_execute / step_keys_to_skip could be properties of the execution plan? I could imagine us annotating the execution plan with various other metadata about how we want things to be executed down in the engines; it seems like it'd be nice to encapsulate all of that in a single object and then hand it off to the engines to do with as they will.

thoughts?
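
A sketch of that suggestion, with a hypothetical shape (not the diff's code): hang the execution metadata off the plan itself, so engines receive only a context plus an annotated plan.

    from dataclasses import dataclass
    from typing import FrozenSet, List, Tuple


    @dataclass(frozen=True)
    class AnnotatedExecutionPlan:
        all_step_keys: Tuple[str, ...]
        step_keys_to_execute: FrozenSet[str] = frozenset()
        step_keys_to_skip: FrozenSet[str] = frozenset()

        def steps_to_run(self) -> List[str]:
            # Execute the requested subset, minus anything covered by memoization;
            # the two sets stay orthogonal, as discussed above.
            requested = self.step_keys_to_execute or frozenset(self.all_step_keys)
            runnable = requested - self.step_keys_to_skip
            return [key for key in self.all_step_keys if key in runnable]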

python_modules/dagster/dagster/core/execution/api.py
374

hmm, what happens here for a step key that's downstream of a failed task? I guess we're only skipping execution for steps that previously executed and therefore have their outputs covered?

python_modules/dagster/dagster/core/execution/config.py
130

maybe add check.inst_ calls here?
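
A sketch of what those calls might look like; the field names on ReexecutionConfig below are assumptions for illustration, not the actual signature in this diff.

    from dagster import check


    class StepSelection(object):
        """Hypothetical value object standing in for whatever gets validated."""

        def __init__(self, step_keys):
            self.step_keys = check.list_param(step_keys, "step_keys", of_type=str)


    class ReexecutionConfig(object):
        def __init__(self, previous_run_id, selection):
            self.previous_run_id = check.str_param(previous_run_id, "previous_run_id")
            self.selection = check.inst_param(selection, "selection", StepSelection)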

prha planned changes to this revision.Oct 17 2019, 6:59 PM
prha added inline comments.
python_modules/dagster/dagster/core/engine/engine_base.py
10

Yeah, I think my next pass will try to move these onto execution plan.

The engines should just take in pipeline context and the plan and execute that plan.

python_modules/dagster/dagster/core/execution/api.py
374

Yep, the assumption here is that ReexecutionConfig.retry_steps is all the non-successful steps from the previous run.
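
A hedged sketch of that assumption: derive retry_steps as every step from the previous run that did not record a success, which also sweeps in steps downstream of a failure that never ran at all. The event tuples are illustrative, not dagster's event log schema.

    def non_successful_steps(all_step_keys, previous_run_events):
        """previous_run_events: iterable of (step_key, event_type) pairs."""
        succeeded = {
            step_key
            for step_key, event_type in previous_run_events
            if event_type == "STEP_SUCCESS"
        }
        return [key for key in all_step_keys if key not in succeeded]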

prha updated this revision to Diff 5935.Oct 21 2019, 1:53 AM

change reexecution API to pass in force_reexecution_step_keys instead of a set of output handles, use memoization class to group retry logic

prha updated this revision to Diff 5936.Oct 21 2019, 2:17 AM

fix pylint, dagster-aws errors

prha updated this revision to Diff 5937.Oct 21 2019, 2:42 AM

pylint

prha added a comment.Oct 21 2019, 2:59 AM

kind of a large diff, but it's helpful to have everything be in one place. Once the approach looks good, I'll abandon this and break it up into smaller changes.

I like the overall direction here a lot, great work.

Throwing an idea out there: should we have a test that does:
A: normal run (that fails)
B: resume-retry of A (also fails)
C: single step against B
D: regular-retry of B

The purpose of such a test is to help feel out having enough information in PipelineRun to pull off a retry of a resume-retry.
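
A hedged sketch of that test, modeling only the run-lineage bookkeeping with a stand-in record (not dagster's PipelineRun): the point is that each retry must carry enough parent information to support retrying a resume-retry.

    from collections import namedtuple

    FakeRun = namedtuple("FakeRun", "run_id parent_run_id root_run_id status")


    def make_child_run(parent, run_id, status):
        root = parent.root_run_id or parent.run_id
        return FakeRun(run_id=run_id, parent_run_id=parent.run_id, root_run_id=root, status=status)


    def test_retry_of_a_resume_retry_keeps_lineage():
        run_a = FakeRun("A", None, None, "FAILURE")        # A: normal run (fails)
        run_b = make_child_run(run_a, "B", "FAILURE")      # B: resume-retry of A (also fails)
        run_c = make_child_run(run_b, "C", "SUCCESS")      # C: single step against B
        run_d = make_child_run(run_b, "D", "SUCCESS")      # D: regular retry of B

        # Both descendants of B still resolve back to the original run A...
        assert run_c.root_run_id == run_d.root_run_id == "A"
        # ...and B keeps its direct parent, so D can locate A's and B's event logs.
        assert run_b.parent_run_id == "A" and run_d.parent_run_id == "B"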

python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_execute_pipeline.py
416–422 ↗(On Diff #5937)

what is the exact relationship between stepKeys and forceReexecutionStepKeys? What would it look like to nuke reexecutionConfig and just express sufficient intent via executionParams?

python_modules/dagster/dagster/core/engine/engine_inprocess.py
66–67

this feels a bit off at a mental model level - maybe it's just the name generate_events on MemoizationStrategy. Is this just to generate the intermediate store operation events?

119–121

nice

python_modules/dagster/dagster/core/execution/api.py
354–355

might be better to set this up as a factory - especially if we derive the memoization subtype via the run_config

prha added inline comments.Oct 21 2019, 5:44 PM
python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_execute_pipeline.py
416–422 ↗(On Diff #5937)

I was thinking that stepKeys and forceReexecutionStepKeys could be different, in that stepKeys is the set of keys we want to show in the run (but which could be skipped due to memoization) and forceReexecutionStepKeys would be the set of keys that we want to compute regardless of the previous run's success state.

In reality, these two are generally kept in lock-step for single-step reexecution.

For resume/retries, we could merge the two concepts, but that would mean that we'd show the step status as not having run at all instead of being skipped. Thoughts?

python_modules/dagster/dagster/core/engine/engine_inprocess.py
66–67

Yeah, in order to have retries of retries, we need to generate the intermediate store operation events so that the next retry can pull the results.

We need the event logs from the previous run, and it felt bad to fetch that directly in the execution api / engine.

alangenfeld added inline comments.Oct 21 2019, 6:22 PM
python_modules/dagster/dagster/core/engine/engine_inprocess.py
66–67

ya just changing the names will make this feel better (to me):

for store_event in memoization_strategy.manage_intermediates(execution_plan):
    yield store_event
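
Expanding that two-line suggestion into a self-contained sketch; the class, method, and event names are stand-ins, not the diff's actual MemoizationStrategy API.

    class FilesystemMemoizationStrategy(object):
        def __init__(self, prev_run_intermediates, step_keys_to_skip):
            # prev_run_intermediates: dict of (step_key, output_name) -> stored value
            self._prev = dict(prev_run_intermediates)
            self._skip = set(step_keys_to_skip)

        def manage_intermediates(self, execution_plan_step_keys):
            """Copy forward intermediates for skipped steps, yielding the object
            store operation events so the next retry can find them in the log."""
            plan_keys = set(execution_plan_step_keys)
            for (step_key, output_name), value in self._prev.items():
                if step_key in self._skip and step_key in plan_keys:
                    yield ("ObjectStoreOperation.CP_OBJECT", step_key, output_name, value)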
alangenfeld added inline comments.Oct 21 2019, 6:23 PM
python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_execute_pipeline.py
416–422 ↗(On Diff #5937)

I like the goal of merging - it's unclear that the subtle behavior achieved by having them separate is worth the confusion

prha planned changes to this revision.Oct 21 2019, 7:57 PM

Okay, sounds good! Will try to kill reexecution config completely and consolidate / create some tests for clarity

prha abandoned this revision.Oct 31 2019, 10:01 PM