
refactor in_process engine core to plan_execution_iterator
Closed, Public

Authored by alangenfeld on Mar 18 2020, 10:12 PM.

Details

Summary

A refactor to remove the weird engine -> engine setup we currently have.

The new control flow looks like this:


https://excalidraw.com/#json=5715587808362496,jNlDGr4uOi7FyuTu_UfLoA

  • Retries are a new top-level argument to the execute_plan APIs
  • Changes celery to no longer go through GraphQL, since doing so didn't really make sense
Test Plan

buildkite

Diff Detail

Repository
R1 dagster
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

alangenfeld created this revision. Mar 18 2020, 10:12 PM
alangenfeld edited the summary of this revision. Mar 19 2020, 5:08 PM
alangenfeld edited the summary of this revision.
alangenfeld retitled this revision from [WIP] plan_execution_iterator to [RFC] plan_execution_iterator. Mar 19 2020, 7:58 PM
alangenfeld edited the summary of this revision.
alangenfeld added reviewers: max, schrockn, prha, nate.
alangenfeld edited the summary of this revision.

rebase

schrockn requested changes to this revision. Mar 20 2020, 1:24 PM


This is great.

In your chart:

  • It would be helpful to note the purpose of some of the functions, e.g. that execute_plan_iterator's job is to reconstruct the execution context and then call the inner plan execution.
  • I actually think it would be really helpful to show in your chart how ingress from the Airflow/Dask integrations works (GraphQL mutation --> execute_plan), showing that in those cases those systems take the role of the "engine" and call into a low-level layer in our libraries.

I think there is an opportunity here to define more precisely what the "Engine" represents and then write a nice docblock. It's subtle, especially since we "outsource" the engine to Dask/Airflow in those cases. The way I think about it, the Engine's job is to orchestrate a series of subplan executions in a way that satisfies the constraints of the entire plan being executed (e.g. DAG order). In the case of Airflow/Dask, we are actually using those *external* systems to satisfy those constraints. Precisely defining the responsibilities of internal and external engines would be really useful.
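The engine definition above can be made concrete with a small sketch, assuming a plan is just a mapping from each step key to the keys of its upstream steps. The names toposort_levels and in_process_engine are hypothetical, not dagster's API. The engine groups steps into dependency levels and hands each level to a subplan executor, so DAG order holds by construction. An external system such as Dask or Airflow would take over the role of in_process_engine: it receives the same dependency information and calls back into the subplan executor.

```python
from typing import Callable, Dict, List, Set


def toposort_levels(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group step keys into levels; all upstream steps sit in earlier levels.

    deps maps every step key to the set of step keys it depends on.
    """
    remaining = {key: set(upstream) for key, upstream in deps.items()}  # copy; don't mutate input
    levels: List[List[str]] = []
    while remaining:
        ready = sorted(key for key, upstream in remaining.items() if not upstream)
        if not ready:
            raise ValueError("dependency cycle or unknown upstream among: %s" % sorted(remaining))
        levels.append(ready)
        for key in ready:
            del remaining[key]
        for upstream in remaining.values():
            upstream.difference_update(ready)  # these dependencies are now satisfied
    return levels


def in_process_engine(deps: Dict[str, Set[str]], execute_subplan: Callable[[List[str]], None]) -> None:
    # The engine's only job: hand out subplans in an order that respects the DAG.
    # Actually running each subplan is delegated to execute_subplan.
    for level in toposort_levels(deps):
        execute_subplan(level)
```

For a diamond-shaped plan where b and c depend on a and d depends on both, the levels come out as [a], then [b, c], then [d].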

Also, should we just jump straight to a higher-level config object instead of Retries? Or just refactor when we get there?

Aside: This was actually pretty hard to review, I think partially because of decisions Differential was making rather than anything you did. In the future it could be useful to split something like this into two diffs if possible: one that just moves as much code around as you can without changing it.

python_modules/dagster/dagster/core/execution/api.py
297–305

document retries

python_modules/dagster/dagster/core/execution/plan/execute.py
49

most excellent

238

Total aside: I'm confused about how Differential is rendering this. It's a huge deletion block in the previous file but seems to show only modifications here, which makes it appear that you are deleting tons of code that is in fact only moved.

341–342

skeptical

use a proper api like type kind
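One reading of "use a proper api like type kind" is to branch on a kind that the type declares rather than inspecting the runtime value. A sketch with a hypothetical TypeKind enum and canonicalize_input helper, not dagster's actual type API. It also returns a new value instead of rebinding the input, which sidesteps the _input_value -> input_value pattern mentioned below.

```python
from enum import Enum
from typing import Any, List


class TypeKind(Enum):
    # Hypothetical kinds: the type declares its shape, so callers
    # never have to guess it from the runtime value.
    SCALAR = "scalar"
    LIST = "list"


def canonicalize_input(kind: TypeKind, value: Any) -> List[Any]:
    """Return the input as a list of items, branching on the declared kind."""
    if kind is TypeKind.LIST:
        return list(value)
    if kind is TypeKind.SCALAR:
        return [value]
    raise ValueError("unhandled kind: %r" % (kind,))
```

A string is iterable, so value-probing code can mistake "abc" for a list of three items. With a declared SCALAR kind it stays a single item.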

345–359

not a big fan of _input_value -> input_value naming pattern for canonicalization

if all((isinstance(x, ObjectStoreOperation) for x in _input_value)): seems super fragile too, although I think this isn't new in this diff
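The fragility is easy to demonstrate. all() over an empty iterable returns True, so an empty input passes the check, and a non-iterable value raises TypeError. A sketch with ObjectStoreOperation as a hypothetical stand-in class. The stricter variant's policy of rejecting empty lists is an assumption, to be adjusted to whatever the call site actually needs.

```python
from typing import Any, Iterable


class ObjectStoreOperation:
    """Hypothetical stand-in for the class named in the review comment."""


def looks_like_store_ops(value: Iterable[Any]) -> bool:
    # The pattern under review: vacuously True for [], and raises
    # TypeError if value is not iterable at all.
    return all(isinstance(x, ObjectStoreOperation) for x in value)


def is_store_op_list(value: Any) -> bool:
    # Stricter check: an actual, non-empty list whose items are all operations.
    return (
        isinstance(value, list)
        and len(value) > 0
        and all(isinstance(x, ObjectStoreOperation) for x in value)
    )
```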

370–381

The _ prefix is misleading, since it generally indicates that something is unused. I would declare this function in the global scope.
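A sketch of the suggested shape, using a hypothetical helper rather than the function actually under review. The helper is defined once at module scope with a descriptive name, so it is importable and testable on its own, and the caller simply uses it instead of defining a nested function on every call.

```python
from typing import Any, Dict, List


def group_values_by_step(outputs: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Module-level helper (hypothetical): collect output values per step key."""
    grouped: Dict[str, List[Any]] = {}
    for output in outputs:
        grouped.setdefault(output["step_key"], []).append(output["value"])
    return grouped


def count_values_per_step(outputs: List[Dict[str, Any]]) -> Dict[str, int]:
    # The caller uses the module-level helper; nothing is redefined per call.
    return {key: len(values) for key, values in group_values_by_step(outputs).items()}
```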

python_modules/dagster/dagster/core/execution/retries.py
16–18

without a crisp, higher-level explanation of terms like "orchestrator" and "engine", this is pretty hard to parse
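One possible sketch of the distinction this comment asks to spell out. It assumes retries can either be handled by the engine itself, be turned off, or be deferred to an external orchestrator such as Dask or Airflow. RetryMode and next_action are hypothetical names and do not reproduce the contents of retries.py.

```python
from enum import Enum


class RetryMode(Enum):
    # Hypothetical modes illustrating the engine/orchestrator split.
    ENABLED = "enabled"    # this engine re-runs failed steps itself
    DISABLED = "disabled"  # a failure is final
    DEFERRED = "deferred"  # report the failure; an external orchestrator decides


def next_action(mode: RetryMode, attempt: int, max_retries: int) -> str:
    """Decide what happens after a step fails on the given zero-based attempt."""
    if mode is RetryMode.ENABLED and attempt < max_retries:
        return "retry_in_engine"
    if mode is RetryMode.DEFERRED:
        return "report_for_orchestrator"
    return "fail"
```

Keeping the decision in one small function makes the responsibility explicit: only the ENABLED mode lets the in-process engine retry, while DEFERRED hands the choice to whatever system is orchestrating the subplans.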

46

lol

This revision now requires changes to proceed. Mar 20 2020, 1:24 PM
schrockn added inline comments. Mar 20 2020, 1:25 PM
python_modules/dagster/dagster/core/execution/plan/execute.py
342

I think all comments in this file predate this diff, but it would be cool to see this cleaned up a bit in a follow-up.

alangenfeld edited the summary of this revision. Mar 20 2020, 5:28 PM
alangenfeld edited the summary of this revision. Mar 20 2020, 6:13 PM

shuffle code around to prevent diffing

alangenfeld planned changes to this revision. Mar 20 2020, 8:21 PM
alangenfeld edited the summary of this revision. Mar 20 2020, 9:30 PM

engine comments

schrockn accepted this revision. Mar 20 2020, 9:41 PM

The chart is excellent. We should have that somewhere in the architecture docs. It might be time for an "advanced" section of our docs site targeted toward internal developers.

python_modules/dagster/dagster/core/execution/api.py
305

still need to document retries

python_modules/dagster/dagster/core/execution/plan/execute.py
516

check inst retries
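"check inst retries" presumably asks for a runtime type check on the new retries argument, in the style of dagster's check.inst_param. A self-contained sketch follows. The inst_param here is a minimal stand-in whose error message format is an assumption, and Retries and execute_plan_sketch are hypothetical names.

```python
from typing import Any, Type, TypeVar

T = TypeVar("T")


class Retries:
    """Hypothetical placeholder for the Retries argument added in this diff."""

    def __init__(self, max_retries: int = 0) -> None:
        self.max_retries = max_retries


def inst_param(obj: Any, param_name: str, ttype: Type[T]) -> T:
    """Minimal stand-in for a check.inst_param-style guard."""
    if not isinstance(obj, ttype):
        raise TypeError(
            "Param %r is not a %s. Got %r of type %s."
            % (param_name, ttype.__name__, obj, type(obj).__name__)
        )
    return obj


def execute_plan_sketch(execution_plan: Any, retries: Retries) -> int:
    # Validate at the API boundary so a bad value fails loudly here,
    # not deep inside step execution.
    retries = inst_param(retries, "retries", Retries)
    return retries.max_retries
```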

python_modules/libraries/dagster-celery/dagster_celery/tasks.py
20


oh we were doing graphql for inprocess wut

This revision is now accepted and ready to land. Mar 20 2020, 9:41 PM
alangenfeld added inline comments. Mar 20 2020, 9:48 PM
python_modules/dagster/dagster/core/execution/api.py
305

This is execute_pipeline_with_mode, which doesn't have it.

319–354

we currently don't doc the execute_plan methods

python_modules/libraries/dagster-celery/dagster_celery/tasks.py
20

We are producing serialized output [1], so it wasn't crazy at the outset, but since we're using serdes now it doesn't make sense.

61–62

[1]
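The change discussed here, calling the plan execution API directly from the celery task instead of routing through a GraphQL mutation, can be sketched as follows. json stands in for serdes, and the payload fields and function names are hypothetical. The celery task decorator is omitted so the sketch runs without a broker.

```python
import json
from typing import Any, Callable, Dict, List


def serialize_request(pipeline_name: str, step_keys: List[str], max_retries: int) -> str:
    # json stands in for serdes; the real payload shape may differ.
    return json.dumps(
        {"pipeline_name": pipeline_name, "step_keys": step_keys, "max_retries": max_retries}
    )


def execute_steps_task(
    payload: str, execute_plan: Callable[..., List[Dict[str, Any]]]
) -> str:
    """Hypothetical celery task body: deserialize, call the plan API, serialize events.

    No GraphQL document is built or parsed; the worker calls the Python API directly.
    """
    request = json.loads(payload)
    events = execute_plan(
        pipeline_name=request["pipeline_name"],
        step_keys=request["step_keys"],
        max_retries=request["max_retries"],
    )
    return json.dumps(events)
```

In a real deployment this body would sit under the Celery app's task decorator. The sketch only shows that the worker passes serialized plan arguments straight to Python code.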

alangenfeld retitled this revision from [RFC] plan_execution_iterator to refactor in_process engine core to plan_execution_iterator. Mar 20 2020, 9:50 PM