
[Run-queue-3] LaunchImmediateRunsCoordinator
ClosedPublic

Authored by johann on Oct 23 2020, 11:31 PM.

Details

Summary

Add a run coordinator that simply wraps the run launcher. This will allow instance methods to always go through the coordinator, instead of conditionally using the coordinator if enabled and the launcher otherwise.
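A minimal sketch of the wrapping described in the summary, not the code from this diff. Every class and method name below (`FakeRunLauncher`, `LaunchImmediateCoordinatorSketch`, `InstanceSketch`, `submit_run`, `launch_run`) is a hypothetical stand-in chosen for illustration.

```python
class FakeRunLauncher:
    """Hypothetical stand-in for a run launcher; records what it launches."""

    def __init__(self):
        self.launched = []

    def launch_run(self, run_id):
        self.launched.append(run_id)
        return run_id


class LaunchImmediateCoordinatorSketch:
    """Hypothetical coordinator that does no queuing and delegates at once."""

    def __init__(self, launcher):
        self._launcher = launcher

    def submit_run(self, run_id):
        # No queue: hand the run straight to the wrapped launcher.
        return self._launcher.launch_run(run_id)


class InstanceSketch:
    """Hypothetical instance that always goes through the coordinator."""

    def __init__(self, coordinator):
        self._coordinator = coordinator

    def submit_run(self, run_id):
        # No "is a coordinator configured?" branch is needed here.
        return self._coordinator.submit_run(run_id)
```

With this shape, swapping in a queuing coordinator later would not change the instance method, which is the point the summary makes.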

Test Plan

unit

Diff Detail

Repository
R1 dagster
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

johann marked an inline comment as not done. Oct 26 2020, 3:27 PM
johann added inline comments.
python_modules/dagster/dagster/utils/external.py
12

Is there a better place for this to live? Both potential call sites that I'm aware of are in instigation but it doesn't necessarily seem specific to that

TODO (later diff): migrate the scheduler to use this util

python_modules/dagster/dagster_tests/utils_tests/test_external.py
17

Should we just check equality of everything? Other cases to test?

dgibson added a subscriber: schrockn.

as per offline convo think it's worth considering a new public enqueue_run on DagsterInstance that:

  • creates pipeline_run without putting it in storage
  • passes run to queuer
  • queuer is responsible for making any needed changes to the pipeline run for the chosen queuing strategy and putting it in run storage

That would accommodate the current world, the new run queuer world, and maybe even someday other queuers that don't use the run storage as the source of truth. Plus the crazy list of arguments ends up only on one DagsterInstance method instead of on every queuer.
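The three steps proposed above could be sketched as follows. This is an illustration under assumptions, not the DagsterInstance API: `RunSketch`, `InMemoryRunStorage`, `StorageBackedQueuer`, `InstanceSketch`, and the status strings are all hypothetical.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class RunSketch:
    """Hypothetical, heavily reduced pipeline run."""

    run_id: str
    status: str = "NOT_STARTED"


class InMemoryRunStorage:
    """Hypothetical run storage backed by a dict."""

    def __init__(self):
        self.runs = {}

    def add_run(self, run):
        self.runs[run.run_id] = run
        return run


class StorageBackedQueuer:
    """Hypothetical queuer that owns both the status change and the storage write."""

    def __init__(self, storage):
        self._storage = storage

    def enqueue_run(self, run):
        # Adjust the run for this queuing strategy, then persist it.
        return self._storage.add_run(replace(run, status="QUEUED"))


class InstanceSketch:
    """Hypothetical instance exposing a single public enqueue_run."""

    def __init__(self, queuer):
        self._queuer = queuer

    def enqueue_run(self, run_id):
        run = RunSketch(run_id=run_id)  # created, but not put in storage here
        return self._queuer.enqueue_run(run)
```

A queuer that does not treat run storage as the source of truth could implement `enqueue_run` differently without changing the instance method.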

curious if others (hi @alangenfeld @schrockn) think that seems reasonable

python_modules/dagster/dagster/core/queuer/base.py
11–12 ↗(On Diff #24440)

this is slightly underselling the number of args :)

python_modules/dagster/dagster_tests/cli_tests/command_tests/test_cli_commands.py
152 ↗(On Diff #24440)

this was already moved to MockedRunLauncher in core.test_utils if you rebase

Update queuer.enqueue_run to take an unstored pipeline run

Re Dan's comments: the suggestions for the RunQueuer:enqueue_run method are now reflected in this diff. The instance methods will be in the next diff, https://dagster.phacility.com/D4903

passes run to queuer

how does this work for the case of the process based run queuer exactly?

python_modules/dagster/dagster/core/queuer/base.py
31–44 ↗(On Diff #24478)

is terminate the right semantic? should we delete things to drop them from the queue ?

how does this work for the case of the process based run queuer exactly?

Do you mean the dequeuer running in a separate process? The latter two steps are actually just one, once the run is in the run store as QUEUED it will eventually be picked up by the dequeuer
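As a rough sketch of that hand-off: the enqueuing side only writes a QUEUED run to the store, and a separate dequeuer pass picks it up later. The function `dequeue_once`, the dict-based store, and the "STARTED" status written after launch are assumptions made for illustration, not the dequeuer in this stack.

```python
def dequeue_once(run_store, launch):
    """Launch every run currently marked QUEUED and return the launched ids.

    run_store is a hypothetical dict mapping run_id -> status string;
    launch is any callable that accepts a run_id.
    """
    launched = []
    for run_id, status in list(run_store.items()):
        if status == "QUEUED":
            run_store[run_id] = "STARTED"  # assumed post-launch status
            launch(run_id)
            launched.append(run_id)
    return launched
```

A real dequeuer would call something like this in a loop from its own process; the loop and any concurrency limits are left out here.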

python_modules/dagster/dagster/core/queuer/base.py
31–44 ↗(On Diff #24478)

Good point, at least the comment should be updated ("Returns true if the run was alive and was successfully terminated or removed from the queue")

I think it makes sense that you can call this method on either a queued or launched run (STARTED/NOT_STARTED). It can check if the item is in the queue (if so, delete it), otherwise it can hand off to the launcher. There is potential for a race condition however, since the canceled run could have just been read by the dequeuer. Even if the run is launched though, once the run status is 'FAILED' the run coordinator/step jobs can abort.

Maybe call it cancel_run()?
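A sketch of the combined semantic described above, assuming a hypothetical in-memory queue and a hypothetical launcher with a `terminate` method. It deliberately does not handle the race mentioned above, where the dequeuer has just read the run being canceled.

```python
class FakeLauncher:
    """Hypothetical launcher that tracks which launched runs are alive."""

    def __init__(self, alive):
        self.alive = set(alive)

    def terminate(self, run_id):
        if run_id in self.alive:
            self.alive.remove(run_id)
            return True
        return False


class CancelSketchCoordinator:
    """Hypothetical coordinator with a single cancel_run entry point."""

    def __init__(self, launcher):
        self._queue = []  # ids of runs that have not been launched yet
        self._launcher = launcher

    def submit_run(self, run_id):
        self._queue.append(run_id)

    def cancel_run(self, run_id):
        """Return True if the run was removed from the queue or terminated."""
        if run_id in self._queue:
            self._queue.remove(run_id)
            return True
        # Not queued: assume it was launched and hand off to the launcher.
        return self._launcher.terminate(run_id)
```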

does it make sense to vary the queuer and dequeuer independently? What are other queuer implementations that we expect? is the plan to move this on the instance?

vary the queuer and dequeuer independently

In general no. One case that we could end up supporting is an "Instant launch", where most runs are using the default queuer (which sends to the external process), but some runs can be launched directly from dagit. I don't think we have a clear use case for that though.

The two cases that are enabled by this stack are 1. use the instant queuer, and that's it. 2. use the default queuer, and also run dagster-instigator dequeuer run somewhere. In the latter, we could have the dequeuer warn if a different queuer is configured. Conversely, we will need a way to surface a warning in Dagit when the default queuer is enabled but we're not getting heartbeats from the dequeuer (future work).

Is the plan to move this on the instance?

https://dagster.phacility.com/D4903

johann retitled this revision from [Run-queue-3] Default run queuer to [Run-queue-3] Instant run queuer. Oct 27 2020, 5:22 PM

refactor queuer to take external pipeline rather than getting it from origin

I think we should consider having the instance component be a more holistic understanding of the selected queueing process. I think - in general - we are at a point where we wish we had fewer bigger instance components than the fine grained break down we have now - and I would like to avoid that with this subsystem

python_modules/dagster/dagster/core/queuer/instant_queuer.py
8–24 ↗(On Diff #24555)

this reads pretty confusing to me - this is called a RunQueuer - but it expects the run to already be in a queued state and launches the run in its enqueue method

to your queue - re: discussions of changing the ontology here to make things more clear

This revision now requires changes to proceed. Oct 28 2020, 3:39 PM
johann retitled this revision from [Run-queue-3] Instant run queuer to [Run-queue-3] Local Runs Coordinator. Oct 29 2020, 5:15 PM
johann edited the summary of this revision.
python_modules/dagster/dagster/core/runs_coordinator/base.py
15–49

naming time continued

submit_run -> feels good with the two existing cases - i think

unsubmit_run -> if it only works pre-launch
cancel_pending_run -> ^
cancel_run -> removes it from queue or terminates if already launched

interesting design decision on the latter - should it be two explicit separate actions so that it's clear if you are canceling something before vs terminating something that's going - or should we hide that

if we are going to introduce this as a peer to run launcher at first - that may affect our short term choices. It might be reasonable to get to both capabilities (separate cancel_pending & terminate versus combined) later

python_modules/dagster/dagster/core/runs_coordinator/local_runs_coordinator.py
8–10 ↗(On Diff #24770)

LaunchImmediateRunsCoordinator

python_modules/dagster/dagster/core/runs_coordinator/base.py
15–49

What would be a case where you would want to cancel a run only if it had yet to be launched? I suppose I can imagine wanting to cancel all pending runs

johann retitled this revision from [Run-queue-3] Local Runs Coordinator to [Run-queue-3] LaunchImmediateRunsCoordinator. Oct 29 2020, 7:18 PM
johann marked an inline comment as done.
python_modules/dagster/dagster/core/runs_coordinator/base.py
15–49

ya I don't know what the right call is - but i think it's worth thinking through

other than the naming stuff this is looking good to me, and we can do a rename pass at the end of the stack too. @alangenfeld did you have any blockers left here?

python_modules/dagster/dagster/core/runs_coordinator/base.py
7

should the instance param be a weakref to avoid even the possibility of circular dependencies preventing GC? That's what all the run launchers end up doing with it
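One way the weakref suggestion could look, using the standard-library `weakref` module. The class names and the `initialize` hook are hypothetical placeholders, not the base class in this diff.

```python
import weakref


class InstanceSketch:
    """Hypothetical instance; any ordinary class instance supports weak references."""


class CoordinatorSketch:
    """Hypothetical coordinator that holds its instance only weakly."""

    def __init__(self):
        self._instance_ref = None

    def initialize(self, instance):
        # Store a weak reference so coordinator -> instance never keeps
        # the instance alive, even if the instance also holds the coordinator.
        self._instance_ref = weakref.ref(instance)

    @property
    def instance(self):
        # A dead or missing reference yields None.
        return self._instance_ref() if self._instance_ref is not None else None
```

Callers then read `coordinator.instance` and must be prepared for `None` once the instance has been collected.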

16

let's take off instance here since you get it in initialize - the only reason runlauncher has that param is for backwards compatibility

python_modules/dagster/dagster/utils/external.py
12

I don't really see how the scheduler would use this? there's no run yet in the scheduler. I think this method is actually fairly specific to the run coordinator.

external_pipeline_from_origin could be shared though

python_modules/dagster/dagster_tests/core_tests/runs_coordinator_tests/test_launch_immediate_runs_coordinator.py
25

nit why q?

dgibson added inline comments.
python_modules/dagster/dagster/core/runs_coordinator/base.py
21

rm

python_modules/dagster/dagster/core/runs_coordinator/launch_immediate_runs_coordinator.py
10

this should be the Default, but we can do a naming pass at the end

34

this can be return False rather than NotImplemented
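A small illustration of why a plain boolean is easier on callers. The method name `cancel_run` below is a guess at what sits on that line, and the class is hypothetical. `NotImplemented` is a sentinel meant for binary operator methods; it is not `False`, and using it in a boolean context has been deprecated since Python 3.9.

```python
class ImmediateCoordinatorSketch:
    """Hypothetical coordinator with nothing queued, so nothing to cancel."""

    def cancel_run(self, run_id):
        # Nothing is ever queued here, so report "not canceled" explicitly.
        return False


def try_cancel(coordinator, run_id):
    """Caller-side check that relies on a real boolean result."""
    if coordinator.cancel_run(run_id):
        return "canceled"
    return "not canceled"
```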

python_modules/dagster/dagster/utils/external.py
17

this will change a bit after D4994, but only a bit

johann marked 4 inline comments as done.

nah im good - we can do a rename pass at the end

This revision is now accepted and ready to land. Mon, Nov 2, 4:41 PM
This revision was landed with ongoing or failed builds. Tue, Nov 3, 1:40 AM
This revision was automatically updated to reflect the committed changes.