Add a run coordinator that simply wraps the run launcher. This will allow instance methods to always go through the coordinator, instead of conditionally using the coordinator if enabled and the launcher otherwise.
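A minimal sketch of the pass-through idea, assuming hypothetical stand-in classes (RecordingLauncher, PassThroughRunCoordinator, and Instance are illustrative names, not the classes in this diff). It only shows the instance always calling the coordinator, which forwards to the launcher:

```python
class RecordingLauncher:
    """Hypothetical stand-in for a run launcher; records what it launches."""

    def __init__(self):
        self.launched = []

    def launch_run(self, run_id):
        self.launched.append(run_id)
        return run_id


class PassThroughRunCoordinator:
    """Hypothetical coordinator that simply forwards to the wrapped launcher."""

    def __init__(self, launcher):
        self._launcher = launcher

    def submit_run(self, run_id):
        return self._launcher.launch_run(run_id)


class Instance:
    """Hypothetical instance: no 'if coordinator enabled' branch is needed."""

    def __init__(self, coordinator):
        self._coordinator = coordinator

    def submit_run(self, run_id):
        # Always go through the coordinator, never the launcher directly.
        return self._coordinator.submit_run(run_id)


launcher = RecordingLauncher()
instance = Instance(PassThroughRunCoordinator(launcher))
instance.submit_run("run-1")
```

With this shape, swapping in a queuing coordinator later would not require changes to the instance methods.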
Is there a better place for this to live? Both potential call sites that I'm aware of are in instigation, but it doesn't necessarily seem specific to that.
TODO (later diff): migrate the scheduler to use this util
Should we just check equality of everything? Other cases to test?
As per our offline convo, I think it's worth considering a new public enqueue_run on DagsterInstance that:
- creates pipeline_run without putting it in storage
- passes run to queuer
- queuer is responsible for making any needed changes to the pipeline run for the chosen queuing strategy and putting it in run storage
That would accommodate the current world, the new run queuer world, and maybe even someday other queuers that don't use the run storage as the source of truth. Plus the crazy list of arguments ends up only on one DagsterInstance method instead of on every queuer.
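The proposed flow could look roughly like the sketch below. Everything here is an assumption for illustration: Run, InMemoryRunStorage, QueuedRunQueuer, and the one-argument enqueue_run are hypothetical, and the real method would take the full argument list mentioned above.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Run:
    """Hypothetical, heavily simplified pipeline run."""

    run_id: str
    status: str = "NOT_STARTED"


class InMemoryRunStorage:
    """Hypothetical run storage backed by a dict."""

    def __init__(self):
        self.runs = {}

    def add_run(self, run):
        self.runs[run.run_id] = run
        return run


class QueuedRunQueuer:
    """Hypothetical queuer: marks the run QUEUED and stores it itself."""

    def __init__(self, storage):
        self._storage = storage

    def enqueue_run(self, run):
        # The queuer, not the instance, adjusts the run and persists it.
        return self._storage.add_run(replace(run, status="QUEUED"))


class Instance:
    """Hypothetical instance exposing a single public enqueue_run."""

    def __init__(self, queuer):
        self._queuer = queuer

    def enqueue_run(self, run_id):
        run = Run(run_id=run_id)  # created, but not put in storage here
        return self._queuer.enqueue_run(run)


storage = InMemoryRunStorage()
instance = Instance(QueuedRunQueuer(storage))
queued = instance.enqueue_run("run-1")
```

A queuer that does not treat run storage as the source of truth could implement the same enqueue_run without touching storage at all.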
Lines 11–12 (on Diff #24440):
this is slightly underselling the number of args :)
Line 152 (on Diff #24440):
this was already moved to MockedRunLauncher in core.test_utils if you rebase
how does this work for the case of the process based run queuer exactly?
Do you mean the dequeuer running in a separate process? The latter two steps are actually just one, once the run is in the run store as QUEUED it will eventually be picked up by the dequeuer
Lines 31–44 (on Diff #24478):
Good point, at least the comment should be updated ("Returns true if the run was alive and was successfully terminated or removed from the queue")
I think it makes sense that you can call this method on either a queued or launched run (STARTED/NOT_STARTED). It can check if the item is in the queue (if so, delete it), otherwise it can hand off to the launcher. There is potential for a race condition however, since the canceled run could have just been read by the dequeuer. Even if the run is launched though, once the run status is 'FAILED' the run coordinator/step jobs can abort.
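A rough sketch of that behavior, assuming hypothetical Launcher and Coordinator classes with an in-memory queue (none of these names come from the diff). It does not address the race with the dequeuer described above:

```python
class Launcher:
    """Hypothetical launcher that tracks running run ids."""

    def __init__(self):
        self.running = set()

    def terminate(self, run_id):
        if run_id in self.running:
            self.running.remove(run_id)
            return True
        return False


class Coordinator:
    """Hypothetical coordinator with an in-memory queue of run ids."""

    def __init__(self, launcher):
        self._launcher = launcher
        self.queue = []

    def cancel_run(self, run_id):
        """Return True if the run was removed from the queue or terminated."""
        try:
            # Still queued: drop it before it is ever launched.
            self.queue.remove(run_id)
            return True
        except ValueError:
            # Not in the queue: hand off to the launcher.
            return self._launcher.terminate(run_id)


launcher = Launcher()
coordinator = Coordinator(launcher)
coordinator.queue.append("queued-run")
launcher.running.add("started-run")
```

Splitting this into separate cancel-pending and terminate methods would make the caller's intent explicit, at the cost of a larger interface.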
Maybe call it cancel_run()?
vary the queuer and dequeuer independently
In general, no. One case that we could end up supporting is an "instant launch", where most runs use the default queuer (which sends to the external process), but some runs can be launched directly from dagit. I don't think we have a clear use case for that though.
The two cases that are enabled by this stack are: (1) use the instant queuer, and that's it; (2) use the default queuer, and also run dagster-instigator dequeuer run somewhere. In the latter, we could have the dequeuer warn if a different queuer is configured. Conversely, we will need a way to surface a warning in Dagit when the default queuer is enabled but we're not getting heartbeats from the dequeuer (future work).
Is the plan to move this on the instance?
I think we should consider having the instance component be a more holistic understanding of the selected queueing process. I think - in general - we are at a point where we wish we had fewer, bigger instance components than the fine-grained breakdown we have now - and I would like to avoid that with this subsystem
Lines 8–24 (on Diff #24555):
this reads pretty confusing to me - this is called a RunQueuer - but it expects the run to already be in a queued state and launches the run in its enqueue method
naming time continued
submit_run -> feels good with the two existing cases - i think
unsubmit_run -> if it only works pre-launch
interesting design decision on the latter - should it be two explicit separate actions so that it's clear if you are canceling something before launch vs terminating something that's already going - or should we hide that
if we are going to introduce this as a peer to run launcher at first - that may affect our short term choices. It might be reasonable to get to both capabilities (separate cancel_pending & terminate versus combined) later
Lines 8–10 (on Diff #24770):
What would be a case where you would want to cancel a run only if it had yet to be launched? I suppose I can imagine wanting to cancel all pending runs
other than the naming stuff this is looking good to me, and we can do a rename pass at the end of the stack too. @alangenfeld did you have any blockers left here?
should the instance param be a weakref to avoid even the possibility of circular dependencies preventing GC? That's what all the run launchers end up doing with it
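For reference, a minimal sketch of the weakref pattern being suggested, using hypothetical Instance and Coordinator classes (the initialize hook and instance property are illustrative, not the actual signatures in this diff):

```python
import gc
import weakref


class Coordinator:
    """Hypothetical coordinator that holds its instance only weakly."""

    def __init__(self):
        self._instance_ref = None

    def initialize(self, instance):
        # Store a weak reference so the coordinator never keeps the instance alive.
        self._instance_ref = weakref.ref(instance)

    @property
    def instance(self):
        # Returns None if not initialized or if the instance was collected.
        return self._instance_ref() if self._instance_ref is not None else None


class Instance:
    """Hypothetical instance that owns the coordinator (strong reference)."""

    def __init__(self, coordinator):
        self.coordinator = coordinator
        coordinator.initialize(self)


coordinator = Coordinator()
instance = Instance(coordinator)
resolved_while_alive = coordinator.instance is instance

del instance
gc.collect()
resolved_after_delete = coordinator.instance
```

Because the only strong reference runs from the instance to the coordinator, there is no reference cycle for the garbage collector to untangle.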
let's take off instance here since you get it in initialize - the only reason runlauncher has that param is for backwards compatibility
I don't really see how the scheduler would use this? there's no run yet in the scheduler. I think this method is actually fairly specific to the run coordinator.
external_pipeline_from_origin could be shared though
nit why q?
this should be the Default, but we can do a naming pass at the end
this can be return False rather than NotImplemented
this will change a bit after D4994, but only a bit