
Multiprocessing and Dask execution through Dagit
Closed (Public)

Authored by max on Aug 6 2019, 10:10 PM.

Details

Summary
Test Plan

Unit, manual:

Execute the sleepy_pipeline from Dagit with config as follows:

storage:
  filesystem:
execution:
  multiprocess:
    config:
      max_concurrent: 4
solids:
  giver:
    config:
      - 1
      - 5
      - 10
      - 5
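To see what `max_concurrent: 4` should buy in this test plan, here is a small timing model. It assumes (this is not confirmed by the revision) that each `giver` value is a sleep duration in seconds for one independent downstream step. It uses greedy list scheduling with at most `max_concurrent` steps running at once; it models wall-clock time only and is not Dagster's multiprocess executor.

```python
import heapq
from typing import List


def simulated_wall_time(durations: List[float], max_concurrent: int) -> float:
    """Wall-clock time to run independent sleep steps, at most max_concurrent at a time.

    Steps start in list order as soon as a slot frees up (greedy list scheduling).
    This is a timing model only, not Dagster's executor.
    """
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")
    # Min-heap of the times at which each busy slot becomes free.
    free_at: List[float] = []
    for duration in durations:
        if len(free_at) < max_concurrent:
            start = 0.0  # an idle slot is available from the beginning
        else:
            start = heapq.heappop(free_at)  # wait for the earliest slot to free up
        heapq.heappush(free_at, start + duration)
    return max(free_at, default=0.0)


if __name__ == "__main__":
    sleeps = [1, 5, 10, 5]  # the giver config from the test plan
    for n in (1, 2, 4):
        print(n, simulated_wall_time(sleeps, n))
```

Under this model the run takes about 10 s with four slots, versus about 21 s when the steps run one at a time, which is the kind of difference the manual test is meant to make visible.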

Execute the hammer_pipeline from Dagit with config as follows:

storage:
  filesystem:

execution:
  dask:

(logs do not stream, but progress is visible on the command line)
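Both test configs choose an executor by naming exactly one key under `execution:`: `multiprocess` with a nested `config`, or a bare `dask:`, which YAML parses as null. The helper below is a hypothetical sketch of that selection rule over the parsed config; its name, the `in_process` fallback, and the error message are assumptions, not Dagster's implementation.

```python
from typing import Any, Dict, Optional, Tuple

# Python equivalents of the two YAML run configs above; a bare `dask:` parses to None.
MULTIPROCESS_RUN = {
    "storage": {"filesystem": None},
    "execution": {"multiprocess": {"config": {"max_concurrent": 4}}},
    "solids": {"giver": {"config": [1, 5, 10, 5]}},
}
DASK_RUN = {"storage": {"filesystem": None}, "execution": {"dask": None}}


def select_executor(
    run_config: Dict[str, Any], default: str = "in_process"
) -> Tuple[str, Dict[str, Any]]:
    """Return (executor_name, executor_config) from the `execution` block.

    Hypothetical helper: an absent or empty block falls back to `default`,
    and more than one executor key is rejected.
    """
    execution: Optional[Dict[str, Any]] = run_config.get("execution")
    if not execution:
        return default, {}
    if len(execution) != 1:
        raise ValueError(
            "expected exactly one executor under 'execution', got {}".format(sorted(execution))
        )
    name, body = next(iter(execution.items()))
    # `dask:` with nothing under it parses to None; treat that as empty config.
    executor_config = (body or {}).get("config") or {}
    return name, executor_config
```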

Diff Detail

Repository: R1 dagster
Lint: Automatic diff as part of commit; lint not applicable.
Unit: Automatic diff as part of commit; unit tests not applicable.

Event Timeline

max created this revision. Aug 6 2019, 10:10 PM
max updated this revision to Diff 3473. Aug 6 2019, 10:16 PM

Fix sort, black

max retitled this revision from "Wip" to "Multiprocessing execution through Dagit". Aug 6 2019, 11:59 PM
max edited the test plan for this revision.
max edited the test plan for this revision. Aug 7 2019, 12:12 AM
max updated this revision to Diff 3478. Aug 7 2019, 12:35 AM

Fix tests

max updated this revision to Diff 3479. Aug 7 2019, 4:06 PM

Dask

max updated this revision to Diff 3480. Aug 7 2019, 4:13 PM

Name collision

max updated this revision to Diff 3481. Aug 7 2019, 4:16 PM

Properly nest config

max updated this revision to Diff 3482. Aug 7 2019, 4:20 PM

Fixup config

max updated this revision to Diff 3484. Aug 7 2019, 4:39 PM

Multiproc fixes

max updated this revision to Diff 3485. Aug 7 2019, 5:13 PM

Nits

max retitled this revision from "Multiprocessing execution through Dagit" to "Multiprocessing and Dask execution through Dagit". Aug 7 2019, 5:39 PM
max edited the test plan for this revision.
max added reviewers: alangenfeld, natekupp, schrockn, Restricted Project.
max edited the summary of this revision.

this is fantastic, thanks for taking this on! will let @alangenfeld take a look also but lgtm

python_modules/dagster-graphql/dagster_graphql/implementation/pipeline_execution_manager.py
336–337

meant to remove these commented lines?

python_modules/dagster-graphql/dagster_graphql/implementation/pipeline_run_storage.py
139 (On Diff #3485)

print

python_modules/dagster/dagster/core/definitions/executor.py
74

maybe we just (by default) derive the name from the name of the function vs. specifying explicitly?
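A minimal sketch of the suggestion, using a hypothetical `executor` decorator and `ExecutorDefinition` container (these are not Dagster's actual signatures): the name defaults to the decorated function's `__name__`, and an explicit `name=` still wins.

```python
from typing import Callable, NamedTuple, Optional


class ExecutorDefinition(NamedTuple):  # hypothetical stand-in
    name: str
    executor_creation_fn: Callable


def executor(fn: Optional[Callable] = None, *, name: Optional[str] = None):
    """Usable as @executor or @executor(name="..."); the name defaults to fn.__name__."""

    def wrap(f: Callable) -> ExecutorDefinition:
        return ExecutorDefinition(name=name or f.__name__, executor_creation_fn=f)

    if fn is not None:  # bare @executor
        return wrap(fn)
    return wrap  # @executor(name=...)


@executor
def multiprocess(init_context):
    return init_context


@executor(name="dask")
def dask_executor(init_context):
    return init_context
```

The trade-off is that renaming the function silently renames the executor, and that name is the key users type under `execution:` in the run config.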

max added inline comments. Aug 7 2019, 6:19 PM
python_modules/dagster-graphql/dagster_graphql/implementation/pipeline_execution_manager.py
336–337

thx

python_modules/dagster-graphql/dagster_graphql/implementation/pipeline_run_storage.py
139 (On Diff #3485)

thx

max updated this revision to Diff 3486. Aug 7 2019, 6:25 PM

Undebug

alangenfeld added inline comments.
python_modules/dagster/dagster/check/__init__.py
29–47

End the previous string with a space so that additional_message doesn't have to be prefixed with one

python_modules/dagster/dagster/core/definitions/executor.py
15–16

we should try to be consistent with how definitions are set up - i think @sashank changed most of them to use _-prefixed internal vars with @property accessors to keep them "read only"
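A minimal sketch of that convention applied to an executor definition. The field names (`name`, `config_field`, `executor_creation_fn`) are assumptions for illustration, not the actual contents of `executor.py`.

```python
from typing import Any, Callable, Optional


class ExecutorDefinition:
    """Hypothetical definition object: state lives in _-prefixed attributes and is
    exposed through read-only properties, so callers cannot reassign it."""

    def __init__(self, name: str, config_field: Optional[Any], executor_creation_fn: Callable):
        if not isinstance(name, str):
            raise TypeError("name must be a str")
        self._name = name
        self._config_field = config_field
        self._executor_creation_fn = executor_creation_fn

    @property
    def name(self) -> str:
        return self._name

    @property
    def config_field(self) -> Optional[Any]:
        return self._config_field

    @property
    def executor_creation_fn(self) -> Callable:
        return self._executor_creation_fn
```

Properties without setters only guard against accidental reassignment; Python still lets a determined caller write `_name` directly.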

35–37

what exactly is the purpose of this class? name is vague and without context it seems like ExecutorConfig could flow through without this wrapper
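A sketch of the alternative being suggested, with hypothetical names throughout: the executor's creation function returns the executor-specific config object directly, and the engine reads fields off it, with no intermediate wrapper class.

```python
from typing import Any, Dict, NamedTuple, Optional


class MultiprocessExecutorConfig(NamedTuple):  # hypothetical
    max_concurrent: Optional[int] = None


def create_multiprocess_executor(executor_config: Dict[str, Any]) -> MultiprocessExecutorConfig:
    # The parsed `execution.multiprocess.config` block maps straight onto the config object.
    return MultiprocessExecutorConfig(max_concurrent=executor_config.get("max_concurrent"))


def describe_run(config: MultiprocessExecutorConfig) -> str:
    # The engine consumes the config object as-is; None means the run config left it unset.
    limit = "unset" if config.max_concurrent is None else str(config.max_concurrent)
    return "multiprocess, max_concurrent={}".format(limit)
```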

python_modules/dagster/dagster/core/definitions/mode.py
19–27

hmm - I wonder how often some of these will be the same for all modes

This is excellent. I'll leave it to @alangenfeld to approve once this feedback is addressed.

python_modules/dagster-graphql/dagster_graphql/implementation/pipeline_execution_manager.py
142

why not just execute_pipeline here? seems the same

python_modules/dagster/dagster/core/definitions/executor.py
17

yeah what alex said

max added inline comments. Aug 7 2019, 8:15 PM
python_modules/dagster-graphql/dagster_graphql/implementation/pipeline_execution_manager.py
142

yeah, it's tricky and i'll add a comment. execute_pipeline would insert spurious pipeline start and pipeline success or pipeline failure events into the stream.
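A toy model of the problem, using stand-in event names and hypothetical generators rather than the dagster-graphql code: a top-level execute call brackets the step events with pipeline start and success/failure events, so calling it inside a manager that already emits its own pipeline-level events duplicates them.

```python
from enum import Enum
from typing import Callable, Iterator, List


class EventType(Enum):  # stand-ins for the real event types
    PIPELINE_START = "PIPELINE_START"
    PIPELINE_SUCCESS = "PIPELINE_SUCCESS"
    PIPELINE_FAILURE = "PIPELINE_FAILURE"
    STEP_SUCCESS = "STEP_SUCCESS"
    STEP_FAILURE = "STEP_FAILURE"


def run_steps(step_outcomes: List[bool]) -> Iterator[EventType]:
    """Hypothetical step-level execution: yields only step events."""
    for ok in step_outcomes:
        yield EventType.STEP_SUCCESS if ok else EventType.STEP_FAILURE


def with_pipeline_events(steps: Callable[[], Iterator[EventType]]) -> Iterator[EventType]:
    """What a top-level execute call adds around the step events."""
    yield EventType.PIPELINE_START
    failed = False
    for event in steps():
        failed = failed or event is EventType.STEP_FAILURE
        yield event
    yield EventType.PIPELINE_FAILURE if failed else EventType.PIPELINE_SUCCESS


def manager_stream(step_outcomes: List[bool], inner_adds_pipeline_events: bool) -> List[EventType]:
    """The manager emits its own pipeline-level events around whatever it runs."""

    def body() -> Iterator[EventType]:
        if inner_adds_pipeline_events:
            # Like calling a top-level execute function: it brackets the steps itself.
            return with_pipeline_events(lambda: run_steps(step_outcomes))
        # Step-level execution only: just the step events.
        return run_steps(step_outcomes)

    return list(with_pipeline_events(body))
```

With `inner_adds_pipeline_events=True` the stream contains two pipeline start events and two terminal events, which is the spurious output described above.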

python_modules/dagster/dagster/core/definitions/executor.py
17

yep

35–37

yes, it certainly could -- i am really cargo culting here

python_modules/dagster/dagster/core/definitions/mode.py
19–27

I am tempted to make this an append-only thing with built-in defaults, as for logging. See the infelicity at, e.g., hammer.py l. 27
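A sketch of the append-only idea, with hypothetical names: a mode always starts from built-in executors (assumed here to be `in_process` and `multiprocess`), and user-supplied definitions are appended rather than replacing the list, so a mode that adds `dask` does not have to re-list the defaults.

```python
from typing import Dict, List, NamedTuple, Sequence


class ExecutorDef(NamedTuple):  # hypothetical stand-in
    name: str


DEFAULT_EXECUTORS: List[ExecutorDef] = [ExecutorDef("in_process"), ExecutorDef("multiprocess")]


class ModeDefinition:
    def __init__(self, name: str = "default", executor_defs: Sequence[ExecutorDef] = ()):
        self.name = name
        # Start from the built-ins; dicts keep insertion order, so defaults come first.
        merged: Dict[str, ExecutorDef] = {d.name: d for d in DEFAULT_EXECUTORS}
        for d in executor_defs:
            if d.name in merged:
                # Append-only: user definitions may add executors but not shadow existing ones.
                raise ValueError("executor {!r} is already defined".format(d.name))
            merged[d.name] = d
        self._executor_defs = merged

    @property
    def executor_names(self) -> List[str]:
        return list(self._executor_defs)
```

Rejecting duplicates is one possible policy; letting a user definition override a default of the same name is the other obvious choice.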

alangenfeld requested changes to this revision. Aug 7 2019, 8:18 PM

to your queue for the existing comments - i'll take a final pass after and this should be g2g

This revision now requires changes to proceed. Aug 7 2019, 8:18 PM
max marked 5 inline comments as done. Aug 8 2019, 2:42 PM
max updated this revision to Diff 3513. Aug 8 2019, 2:45 PM

Respond to comments

max updated this revision to Diff 3517. Aug 8 2019, 2:48 PM

Fix import

max updated this revision to Diff 3518. Aug 8 2019, 2:50 PM

Fix import

schrockn accepted this revision. Aug 8 2019, 3:04 PM
schrockn added inline comments.
examples/dagster_examples/toys/resources.py
8

i think we should generally keep these in sorted order as sed/vi/unix would do it

python_modules/dagster-dask/dagster_dask/executor.py
11

some comments could be nice
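For context on what a Dask-based executor ultimately hands to Dask: Dask's low-level graph spec is a dict mapping keys to tasks, where a task is a tuple whose first element is a callable and whose remaining elements are arguments, some of which may be other keys. The toy evaluator below resolves such a graph serially to show that shape. It is not Dask's scheduler, and how `dagster_dask/executor.py` actually builds its work (graph dict, `dask.delayed`, or otherwise) is not shown in this review; the graph keys below are hypothetical.

```python
import operator
from typing import Any, Dict, Hashable, Optional


def _is_key(graph: Dict[Hashable, Any], obj: Any) -> bool:
    try:
        return obj in graph
    except TypeError:  # unhashable arguments (e.g. lists) cannot be keys
        return False


def toy_get(graph: Dict[Hashable, Any], key: Hashable, cache: Optional[Dict[Hashable, Any]] = None) -> Any:
    """Serially compute `key` from a Dask-style task graph (toy, no parallelism)."""
    if cache is None:
        cache = {}
    if key in cache:
        return cache[key]
    task = graph[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        # Arguments that name other keys are computed first (the data dependencies).
        values = [toy_get(graph, a, cache) if _is_key(graph, a) else a for a in args]
        result = func(*values)
    else:
        result = task  # literal data
    cache[key] = result
    return result


def give():
    return [1, 5, 10, 5]


graph = {
    "giver": (give,),
    "total": (sum, "giver"),
    "offset": 100,
    "shifted": (operator.add, "total", "offset"),
}
```

With Dask installed, `dask.threaded.get(graph, "total")` should compute the same value from the same dict, using threads instead of this serial recursion.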

python_modules/dagster/dagster/core/engine/init.py
14

you can get rid of parens and make this a single string
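The two spellings below produce the same string: Python joins adjacent string literals at compile time, so the parentheses only exist to wrap a long literal across lines. The message text is made up for illustration.

```python
# Adjacent literals inside parentheses are concatenated at compile time.
WRAPPED = (
    "Execution engine could not be determined "
    "from the run config."
)
# Equivalent single literal, once it fits on one line.
SINGLE = "Execution engine could not be determined from the run config."
```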

python_modules/dagster/dagster/core/execution/config.py
114–122

Can you file an issue for this or make the action item otherwise more obvious? Or shall I say, less gnomic? :-)

max updated this revision to Diff 3526. Aug 8 2019, 5:01 PM
max marked 2 inline comments as done.

Respond to feedback

max added inline comments. Aug 8 2019, 6:10 PM
examples/dagster_examples/toys/resources.py
8

addressing this separately -- we should add some tooling
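One shape that tooling could take, as a hypothetical sketch: a check that a block of lines is in plain code-point order, which for ASCII text matches what `LC_ALL=C sort` produces and what Python's `sorted()` does. Which lines of `resources.py` should be checked is left open here.

```python
from typing import List, Sequence


def first_unsorted_line(lines: Sequence[str]) -> int:
    """Return the 0-based index of the first line that breaks sorted order, or -1."""
    for i in range(1, len(lines)):
        if lines[i] < lines[i - 1]:
            return i
    return -1


def sort_block(lines: Sequence[str]) -> List[str]:
    # str comparison is by code point, matching `LC_ALL=C sort` for ASCII text
    # (uppercase letters sort before lowercase ones).
    return sorted(lines)
```

`sort` under a non-C locale may order case and punctuation differently, so a tool like this should pin one convention.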

python_modules/dagster-dask/dagster_dask/executor.py
11

yep

python_modules/dagster/dagster/core/engine/init.py
14

ugh black

This revision is now accepted and ready to land. Aug 8 2019, 7:00 PM