
[RFC] Cancel queued runs
ClosedPublic

Authored by johann on Nov 25 2020, 3:13 PM.

Details

Summary
  • If queued, send a failed event
  • Otherwise, pass on to the run launcher
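A minimal sketch of that dispatch, under the caveat that every name here (FakeInstance, cancel_run, report_run_failed) is an illustrative stand-in and not the actual dagster API:

```python
from enum import Enum


class RunStatus(Enum):
    QUEUED = "QUEUED"
    STARTED = "STARTED"


class FakeInstance:
    """Toy stand-in for a dagster instance (assumption, not real API)."""

    def __init__(self, statuses):
        self.statuses = statuses  # run_id -> RunStatus
        self.failed = []          # runs we reported as failed

    def get_run_status(self, run_id):
        return self.statuses[run_id]

    def report_run_failed(self, run_id, message):
        self.failed.append((run_id, message))
        return True


def cancel_run(instance, terminate, run_id):
    """If the run is queued, emit a failed event; otherwise delegate
    termination to the run launcher."""
    if instance.get_run_status(run_id) == RunStatus.QUEUED:
        return instance.report_run_failed(run_id, "Canceled while queued")
    return terminate(run_id)
```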

Currently this lumps together cancel (remove from queue) and terminate (kill a launched run). I think this is OK for now (how often does the use case come up where you only want to cancel a run if it is queued?), and it doesn't seem like a difficult decision to walk back if we need to make the distinction later.

An oddity now is that pipelines go from QUEUED (can click terminate) to NOT_STARTED (can't click terminate) to STARTED (can click terminate). A possible solution is to guarantee that every run worker checks pipeline state before starting a run, so a NOT_STARTED run could be safely marked as FAILED.
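One way to picture that guarantee, as a hypothetical run-worker guard that re-checks status right before starting (statuses and function names are illustrative, not dagster internals):

```python
def start_run_if_not_canceled(statuses, run_id, execute):
    """Hypothetical run-worker guard: re-check the run's status right
    before starting, so a NOT_STARTED run that a cancel request already
    marked FAILED is never executed."""
    if statuses.get(run_id) != "NOT_STARTED":
        return False  # canceled (FAILED) or already started elsewhere
    statuses[run_id] = "STARTED"
    execute(run_id)
    return True
```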

Not in this diff:

  • Override can_terminate
  • Bulk deletion mutation
Test Plan

Integration

manually tried in dagit

TODO: add cases

Diff Detail

Repository
R1 dagster
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

johann retitled this revision from Cancel queued runs to [RFC] Cancel queued runs. Nov 25 2020, 3:53 PM
johann edited the summary of this revision.
johann edited the test plan for this revision.
johann added reviewers: alangenfeld, dgibson, prha.
johann edited the summary of this revision.

sqlite test

python_modules/dagster/dagster/core/run_coordinator/queued_run_coordinator.py
97

We see pipeline as queued -> dequeuer takes it off queue, launches it -> we mark pipeline as failed -> run worker marks pipeline as NOT_STARTED

The only solution I've thought of so far is to use full transactions between the event and run stores. E.g. something like

if run_state == QUEUED:
    add_event(RUN_CANCELLED)
else:
    abort

This involves combining event log and run store. (Or we could make run states not tied to event store, but that's a pretty core assumption so far)
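For illustration, a single conditional UPDATE gets the same atomicity as the pseudocode above without a multi-statement transaction. This is a sketch against a toy sqlite schema, not dagster's actual run storage:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE runs (run_id TEXT PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO runs VALUES (?, ?)",
    [("a", "QUEUED"), ("b", "STARTED")],
)


def cancel_if_queued(conn, run_id):
    # The WHERE clause makes the status check and the write a single
    # atomic step, so the dequeuer can't slip in between them.
    cur = conn.execute(
        "UPDATE runs SET status = 'CANCELED' "
        "WHERE run_id = ? AND status = 'QUEUED'",
        (run_id,),
    )
    conn.commit()
    return cur.rowcount == 1  # True only if we won the race
```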

python_modules/dagster/dagster/core/run_coordinator/queued_run_coordinator.py
97

Typo, should be

We see pipeline as queued -> dequeuer takes it off queue, places it in NOT_STARTED, launches it -> we mark pipeline as failed -> run worker marks pipeline as STARTED

Now the user might have seen the pipeline as cancelled in dagit, but it's continuing to execute. Dagit will see conflicting events in the log and have potentially unexpected behavior.

This seems reasonable to me - curious what @alangenfeld thinks and how this might interact with https://github.com/dagster-io/dagster/issues/3239 and/or https://github.com/dagster-io/dagster/issues/3236 ? Left some thoughts on the race condition inline.

python_modules/dagster/dagster/core/run_coordinator/queued_run_coordinator.py
97

yeah this is tricky. One thing we could do is have cancelling here just set a flag that we then expect the daemon and run worker to periodically check for. The race-y part of this is that the dequeuer overwrites the change we make here (the run status). If this cancel operation set some other flag that doesn't get overwritten, the race goes away, right?

python_modules/dagster/dagster/core/run_coordinator/queued_run_coordinator.py
97

and sorry the implication there would be that this process wouldn't actually do the status update as well - it would be the daemon and/or run worker that do the actual run status update, at the same time that execution is actually halted
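A toy sketch of that flag-based scheme. The cancel_requested flag is the proposal under discussion here, not an existing dagster column:

```python
class RunStorage:
    """Toy run storage with a separate cancel flag (hypothetical)."""

    def __init__(self):
        self.status = {}
        self.cancel_requested = set()

    def request_cancel(self, run_id):
        # Dagit only sets the flag; it never writes the status itself,
        # so the dequeuer's status writes can't clobber the request.
        self.cancel_requested.add(run_id)

    def worker_tick(self, run_id):
        # The daemon / run worker owns the actual status transition and
        # halts execution when it sees the flag.
        if run_id in self.cancel_requested:
            self.status[run_id] = "CANCELED"
            return True
        return False
```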

oof ya the race conditions here are really gnarly. Probably worth drawing up the full state machine diagram. I feel like we need a safer overall scheme like @dgibson is talking about. I think delete and mark-as-failed are emergency escape hatches that may put the system into a bad state if the user uses them at the wrong time. In all other cases there should be a clear "owner" for the run, and only that "owner" should manage transitions.


https://excalidraw.com/#json=5708906424696832,sAeHwFZOmqRS79v0HZ7gWQ

python_modules/dagster/dagster/core/run_coordinator/queued_run_coordinator.py
103–117

instance.report_run_failed

nit: reporting something that never started as "failed" feels weird to me - i think an explicit canceled state may be worth it

If we do that it should cover terminated too IMO

just throwing some ideas around -

  • we add a column to run storage: user_action, user_override, or something like that
  • add a new event type (not an engine event) that corresponds to it
  • run storage's handling of this new event does a smart column update so that the column is only written from the "origin host process" (i.e. dagit) and doesn't conflict with other writes
  • add a new API (which we could use for other reasons), instance.get_run_row_by_id, to allow fetching the full run row including this column
  • use this for cancel / terminate from the dequeuer / run worker
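One way the "smart column update" could behave: a write-once user_action column, so only the first (origin-process) write lands. Schema and column name are hypothetical, per the ideas above:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE runs (run_id TEXT PRIMARY KEY, status TEXT, user_action TEXT)"
)
conn.execute("INSERT INTO runs VALUES ('a', 'QUEUED', NULL)")


def record_user_action(conn, run_id, action):
    # Write-once semantics: the update only lands while the column is
    # still NULL, so later writers can't overwrite the origin's action.
    cur = conn.execute(
        "UPDATE runs SET user_action = ? "
        "WHERE run_id = ? AND user_action IS NULL",
        (action, run_id),
    )
    conn.commit()
    return cur.rowcount == 1
```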

I think we determined today that this is good to go for now (see alex's inline comments though)

This revision is now accepted and ready to land. Nov 30 2020, 11:18 PM

worth submitting a tracking issue and linking it in a comment too

report_run_failed and add tracking issue

This revision was landed with ongoing or failed builds. Dec 2 2020, 6:18 PM
This revision was automatically updated to reflect the committed changes.