
Streaming API call over GRPC
ClosedPublic

Authored by max on Jun 26 2020, 6:36 PM.

Details

Summary

This moves execute_run to use GRPC. A follow-on diff will implement the (fairly complex) soft termination scheme.

Test Plan

Unit

Diff Detail

Repository
R1 dagster
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

max added a comment.Jun 30 2020, 1:35 PM

Parallel API

alangenfeld requested changes to this revision.Jul 1 2020, 4:10 PM

Back to you for discussion.

python_modules/dagster/dagster/core/launcher/grpc_run_launcher.py
19

Should we just modify CliApiRunLauncher to have the ability to launch grpc instead of cli direct? Gated by config, or by poking at the ExternalPipeline once we add a load_from variant / config that opts in to grpc? A bit wary of all this duplicated code.

72–100

If we stick with a process, does this code need to account for the intermediate process dying but not the grpc server process? It just seems like we shouldn't have a straight copy-paste of the cli api run launcher, given that this works a bit differently.

What isolation is already provided by the grpc server? If we have a crashy pipeline, what actually happens? Do we even need to worry about doing this cleanup, or will the grpc client in the intermediate process raise when the stream closes unexpectedly, eliminating the need for all this zombie-cleanup checking?
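
For context, a minimal grpcio sketch of the failure mode being asked about; `ExecuteRun`, `handle`, and `mark_run_failed` are illustrative placeholders, not dagster's actual API:

```python
import grpc

def handle(event):
    # Placeholder for real event handling.
    pass

def mark_run_failed():
    # Placeholder cleanup hook.
    pass

def drain_events(stub, request):
    try:
        # Server-streaming call: iterating the response blocks until the
        # server sends the next event or the stream terminates.
        for event in stub.ExecuteRun(request):
            handle(event)
    except grpc.RpcError as e:
        # If the server process dies, the channel breaks and iteration raises
        # an RpcError (typically StatusCode.UNAVAILABLE) rather than hanging,
        # which may make explicit zombie-checking redundant.
        if e.code() == grpc.StatusCode.UNAVAILABLE:
            mark_run_failed()
        else:
            raise
```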

117–118

[1]

120–131

why a process instead of a thread?

python_modules/dagster/dagster/grpc/client.py
132–146

Did you look into doing a bidirectional stream in GRPC and sending the termination request through that way?
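
A hedged sketch of what that bidirectional variant could look like in grpcio; the message types and the `ExecuteRun` stub method here are hypothetical, not the proto under review:

```python
import queue

class RunRequest:
    """Placeholder for a generated proto message."""

class TerminateRequest:
    """Placeholder for a generated proto message."""

def bidi_execute_run(stub, start_request):
    requests = queue.Queue()
    requests.put(start_request)

    def request_iter():
        # grpcio consumes this iterator on its own thread; blocking on the
        # queue is the standard way to feed a long-lived request stream.
        while True:
            msg = requests.get()
            if msg is None:  # sentinel closes the request side
                return
            yield msg

    # Bidirectional streaming: the stub takes an iterator of requests and
    # returns an iterator of responses (run events).
    responses = stub.ExecuteRun(request_iter())

    def terminate():
        requests.put(TerminateRequest())
        requests.put(None)

    return responses, terminate
```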

python_modules/dagster/dagster/serdes/ipc.py
197–200

Have you run this on Windows yet? Keep an eye on the Azure pipeline if not.

python_modules/dagster/dagster_tests/core_tests/launcher_tests/test_grpc_run_launcher.py
163–192

what happens at [1] in this test? could we just try/catch there and drop the zombie checker thread?

This revision now requires changes to proceed.Jul 1 2020, 4:10 PM
max planned changes to this revision.Jul 1 2020, 4:39 PM

Thanks for this; moving to threads will allow us to drop the zombie-checking machinery completely.

python_modules/dagster/dagster/core/launcher/grpc_run_launcher.py
19

Sure, we could do that. Then the cutover is obscured / out of user control, vs. requiring people to update their dagster.yaml to reflect the change. This seemed more explicit, and it avoids us having to keep track of two different families of subprocesses in one launcher, but happy to make this change if you think it's worthwhile.

python_modules/dagster/dagster/grpc/client.py
132–146

Yep, not an option if we want something that acts like an interrupt. The only solution for soft termination that I can see is roughly https://excalidraw.com/#json=5646104355930112,oYttHEA0LtS-GfJlpOzmLw

python_modules/dagster/dagster/serdes/ipc.py
197–200

This should work, since the pid is the process group id and we are now starting with the process group flags set correctly.
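
A minimal sketch of the process-group mechanics described here, assuming POSIX semantics (`start_new_session=True` gives the child a pgid equal to its pid); Windows would need CREATE_NEW_PROCESS_GROUP / CTRL_BREAK_EVENT instead, per the Azure pipeline caveat above. The command line is a placeholder:

```python
import os
import signal
import subprocess

proc = subprocess.Popen(
    ["python", "-m", "some_grpc_server"],  # placeholder command
    start_new_session=True,  # setsid(): child pid == process group id
)

def interrupt_process_group(pid):
    # killpg delivers SIGINT to every process in the group, including any
    # grandchildren the server may have spawned.
    os.killpg(pid, signal.SIGINT)
```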

max requested review of this revision.Jul 1 2020, 4:40 PM
max planned changes to this revision.
alangenfeld added inline comments.Jul 1 2020, 4:45 PM
python_modules/dagster/dagster/core/launcher/grpc_run_launcher.py
19

I'm not certain what path is best - something to hash out as we figure out the details of our cut-over / migration / opt-in strategy.

Also, container-based deployment is going to make us reconsider a lot of this, so that might influence the decision as well.

python_modules/dagster/dagster/grpc/client.py
132–146

Alright - looking at the diagram, it looks like [2] will happen in a subprocess, which feels especially right for long-running servers. Once that happens, maybe the bidirectional comms would make sense, but I'm not sure. Stuff to hash out later.

python_modules/dagster/dagster/grpc/server.py
155–161

[2]

max updated this revision to Diff 17939.Jul 2 2020, 7:45 PM

Multithreading scheme

max updated this revision to Diff 17941.Jul 2 2020, 7:58 PM

fix assert

max updated this revision to Diff 17942.Jul 2 2020, 8:00 PM

fix assertion

max updated this revision to Diff 17977.Jul 3 2020, 12:51 PM

py2 typecheck

max updated this revision to Diff 18016.Jul 6 2020, 12:12 PM

py2 instance check

max updated this revision to Diff 18035.Jul 6 2020, 2:47 PM

Rebase

max updated this revision to Diff 18038.Jul 6 2020, 2:57 PM

imports

max updated this revision to Diff 18039.Jul 6 2020, 3:35 PM

update snapshots

alangenfeld added inline comments.Jul 6 2020, 4:19 PM
python_modules/dagster/dagster/core/launcher/grpc_run_launcher.py
86

Did you also try sending over the instance ref and creating new instances from it in the threads?
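
For reference, a sketch of the pattern being asked about, using dagster's `DagsterInstance.get_ref` / `DagsterInstance.from_ref`; the thread target and run id are illustrative:

```python
import threading
from dagster import DagsterInstance

def run_target(instance_ref, run_id):
    # Rehydrate a fresh instance inside the thread instead of sharing the
    # parent's instance object (and its non-thread-safe resources).
    instance = DagsterInstance.from_ref(instance_ref)
    ...  # use `instance` to watch / report on `run_id`

instance = DagsterInstance.get()
t = threading.Thread(target=run_target, args=(instance.get_ref(), "some-run-id"))
t.start()
```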

python_modules/dagster/dagster/core/storage/event_log/sqlite/sqlite_event_log.py
109–114 ↗(On Diff #18039)

Oof, brutal - I wonder what else we can do about this.

If we are going to land this, we should at least filter down on the message like we did for the exceptions below, just to prevent hiding some other problem.
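
A hedged illustration of the message-filtering suggestion; the exception type and message string are assumptions, not the exact ones in sqlite_event_log.py:

```python
import sqlite3

def safe_watch(conn, sql):
    try:
        conn.execute(sql)
    except sqlite3.OperationalError as exc:
        # Only swallow the one expected error, matching on the message as is
        # done for the exceptions handled below this block in the real file;
        # anything else propagates instead of being silently hidden.
        if "no such table" not in str(exc):
            raise
```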

python_modules/dagster/dagster_tests/grpc_tests/test_ping.py
143 ↗(On Diff #18039)

?

alangenfeld requested changes to this revision.Jul 6 2020, 10:07 PM
alangenfeld added inline comments.
python_modules/dagster/dagster/api/execute_run.py
97–100

[1]

python_modules/dagster/dagster/core/launcher/grpc_run_launcher.py
86

Oh boy, ok - after a second pass I see what's happening now. Missing the instance_ref above was my bad, but the current control flow scheme here of calling back onto this class from [1] is pretty wild.

Could we instead manage the API client creation synchronously here, and then only hand over the execute_run invocation to a thread? I think that would clean up these odd control flow issues.

That said, why use a streaming request at all if we are ignoring the events? Should we just have unary start_run and terminate_run instead?

This revision now requires changes to proceed.Jul 6 2020, 10:07 PM
max added inline comments.Jul 7 2020, 1:50 PM
python_modules/dagster/dagster/core/launcher/grpc_run_launcher.py
86

Unfortunately, I think this whole scheme isn't viable -- we need to revert to the approach in https://dagster.phacility.com/D3662?id=17660:

  • we can't use threads, because we need the run launcher to be fire-and-forget to support backfills appropriately; the API clients need to be spawned as (non-daemon) subprocesses (see the sketch below)
  • the clients need to be streaming, because we aren't guaranteed that the run launcher will be around to manage the server lifecycle
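
A rough sketch of that fire-and-forget multiprocessing scheme; every name here is illustrative, not the launcher's actual code:

```python
import multiprocessing

def make_grpc_client():
    # Placeholder: the real launcher would build a channel + stub against
    # the already-running gRPC server.
    raise NotImplementedError

def stream_events(run_id):
    client = make_grpc_client()
    # Hold the server-streaming call open for the lifetime of the run; the
    # events themselves are ignored -- the stream exists so the watcher
    # notices when the server side goes away.
    for _event in client.execute_run(run_id):
        pass

def launch_run(run_id):
    # Non-daemon and never joined: the watcher outlives the launcher, which
    # is what makes launching fire-and-forget for backfills.
    proc = multiprocessing.Process(target=stream_events, args=(run_id,), daemon=False)
    proc.start()
    return proc.pid
```
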
python_modules/dagster/dagster_tests/grpc_tests/test_ping.py
143 ↗(On Diff #18039)

i.e., this got interrupted -- we asked for 100k responses but only got a few hundred

schrockn requested changes to this revision.Jul 7 2020, 2:20 PM
This comment was removed by schrockn.

Deleted previous comment because I wanted to expound on it more, but then TJ started making trouble and I had to go get him :-)

max updated this revision to Diff 18118.Jul 7 2020, 2:49 PM

revert to multiprocessing scheme

schrockn added inline comments.Jul 7 2020, 2:57 PM
python_modules/dagster/dagster/core/launcher/grpc_run_launcher.py
2

Ah, excellent - much more comfortable with this approach.

alangenfeld accepted this revision.Jul 7 2020, 3:01 PM

Alright, well, apologies for the long journey back to the start.

python_modules/dagster/dagster/core/launcher/grpc_run_launcher.py
19

Maybe a more specific name, since this is ephemeral / cli grpc and not more general.

Also, a little comment block describing it can't hurt.

49–55

I think this aspect should get dropped since marking the pipeline run failed if the watcher / client process exited unexpectedly doesn't quite make sense [2]

91–97

[2]

157–158

[2]

python_modules/dagster/dagster_tests/api_tests/test_api_execute_run.py
45–111

As recently pointed out by leor, these types of tests are brittle with respect to adding new events. This is better than the mystery-number indexing, but if you can think of a better approach it's probably worth making the change.

max updated this revision to Diff 18139.Jul 7 2020, 5:13 PM

respond to comments

schrockn accepted this revision.Jul 7 2020, 5:17 PM

Awesome, really excited to see this land. @alangenfeld should also probably sign off.

python_modules/dagster/dagster/api/execute_run.py
64

why cli prefix?

This revision is now accepted and ready to land.Jul 7 2020, 5:17 PM

Oh, alex already approved! Nvm, let's merge it.

max updated this revision to Diff 18272.Jul 8 2020, 1:46 PM

rebase

This revision was automatically updated to reflect the committed changes.