
Only allow grpc run termination in user code
ClosedPublic

Authored by dgibson on Sep 16 2020, 3:41 PM.

Details

Summary

We previously introduced delay_interrupts to prevent exceptions in bad places from leaving us in a bad state during termination, and started applying it in whack-a-mole fashion whenever a test broke. However, it's not really possible to safely guard against every bad KeyboardInterrupt that way, and it would be super brittle even if we could.

Instead, stick a guard at the very beginning of gRPC execution to prevent interrupts from breaking system code, and re-enable them during step execution (also add a checkpoint before each step to handle any interrupts that did happen to come in during system code, so they don't have to wait until the end of execution).
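To make the mechanism concrete, here is a minimal sketch of what such a delayed-interrupt guard and its helpers could look like. This is illustrative only, not the actual Dagster implementation; the function names mirror the discussion below, but the bodies are assumptions.

```python
import signal
from contextlib import contextmanager

# Process-global flag recording whether a SIGINT arrived while delayed.
_received_interrupt = {"value": False}

@contextmanager
def delay_interrupts():
    # Swap in a handler that records SIGINT instead of raising KeyboardInterrupt.
    def _handler(_signum, _frame):
        _received_interrupt["value"] = True

    previous_handler = signal.signal(signal.SIGINT, _handler)
    try:
        yield
    finally:
        signal.signal(signal.SIGINT, previous_handler)
        if _received_interrupt["value"]:
            _received_interrupt["value"] = False
            raise KeyboardInterrupt

def raise_delayed_interrupts():
    # Checkpoint: surface any interrupt that arrived while we were in system code.
    if _received_interrupt["value"]:
        _received_interrupt["value"] = False
        raise KeyboardInterrupt

@contextmanager
def raise_interrupts_immediately():
    # Re-enable normal ctrl-c behavior around code known to be interrupt-safe
    # (e.g. user step code), raising right away if one is already pending.
    raise_delayed_interrupts()
    previous_handler = signal.signal(signal.SIGINT, signal.default_int_handler)
    try:
        yield
    finally:
        signal.signal(signal.SIGINT, previous_handler)
```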

Test Plan

existing termination tests, new BK test for the new functions

Diff Detail

Repository
R1 dagster
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline


do we have test coverage for the multiprocess executor? Since the step execution won't happen in the "run coordinator" process this guard won't be set.

probably worth also doing some manual validation of ctrl-c-ing out of dagit in a terminal to validate what happens

python_modules/dagster/dagster/grpc/impl.py
179–200

I think we need to think about where to put these to work in cases where the step execution isn't happening in the root / run coordinator process

python_modules/dagster/dagster/utils/__init__.py
351–354

nit: "check" seems more like a return bool api, maybe raise_delayed_interrupts or something even more verbose?

Add checkpoints in the other executors as well so that we can still terminate on hanging steps (as well as a check after each step completes, independent of the executor). This part needs some more test coverage though

Re-enable test_interrupt_multiproc (wrapped in a delay_interrupts call now)

Make the multiproc interrupt test sleep for 60 seconds instead of running forever, so that if it does go flaky again it's less painful

python_modules/dagster/dagster/core/executor/child_process_executor.py
142–145

do we need a copy of this in every executor then? seems like trying to put it in ActiveExecution may be better

148

it's not obvious to me / I don't recall, but based on what you are doing above, must we yield Exception objects directly here?

my general concern is with how these are scattered about, and what will help us ensure we don't miss adding/moving them over time. It would be good to find fewer, more central locations to put these

I hear you on them being scattered about and the desire to have a more central location, but I don't have a great alternative if we want to only wrap the 'user code' parts.

We do have one central location that gives us an OK base behavior if you make no changes to any executors - in _core_execution_iterator, we check for any interrupts after each step finishes (and before we go into any executors). That gives us a default behavior where you can terminate and it will work after the next event comes in.

However, that won't help you if the next event takes a really long time (or hangs forever, at which point your interrupt would never get signalled). To handle that, you need to add checkpoints deeper down in the code in the individual executors (or restore the original signal handler around code that you know is interrupt-safe).

I could add some comments to make it clearer what's going on in each of these cases?
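As a rough sketch of that central checkpoint (the iterator name comes from this thread; the signature and body are assumed, not the real code):

```python
def _core_execution_iterator(pipeline_context, execution_plan, executor):
    # Default behavior for every execution: after each event the executor yields,
    # surface any SIGINT that arrived while we were inside system code.
    for event in executor.execute(pipeline_context, execution_plan):
        yield event
        raise_delayed_interrupts()
```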

python_modules/dagster/dagster/core/executor/child_process_executor.py
142–145

we don't need an exact copy of this, but if you want to be able to terminate inside of a step you need to either have a call to raise_delayed_interrupts in a loop that doesn't block on user code (as we do here and in the celery executor), or wrap the call to user code in raise_interrupts_immediately() (as we do in the dask executor, and in _dagster_event_sequence_for_step, which is called by the single processor)

I don't see where we could put it on ActiveExecution that would make it able to terminate a hanging solid
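A sketch of those two options, using hypothetical helper names to stand in for the real executor internals:

```python
def host_process_executor_loop(active_futures):
    # Option 1: a loop that never blocks on user code can simply poll the
    # checkpoint on each iteration (the child-process and celery executors).
    while active_futures:
        raise_delayed_interrupts()
        poll_for_completed_steps(active_futures)  # hypothetical helper

def execute_step_in_this_process(step, step_context):
    # Option 2: when the call into user code happens in this process, wrap just
    # that call so ctrl-c works even if the solid hangs (dask / single-process).
    with raise_interrupts_immediately():
        return step.compute_fn(step_context)
```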

maybe the PipelineContextManager or somewhere around there for with delayed_interrupts()? The current setup only covers the default & gRPC run launcher entry points.

In any process where we are actually executing user code for steps, [1] should have us covered. Then the remaining problem is the host-process executors and how they will deal with their delegated steps if asked to stop. Scattering with raise_interrupts_immediately(): blocks around feels very dubious to me.

One idea is to introduce a "stopping" state to ActiveExecution which could provide a reasonable default behavior of at least not starting any new steps. Then some way to override this if an executor is capable of forwarding / stopping its delegated step computations.

python_modules/dagster/dagster/core/execution/api.py
646–653

what led you to add this one?

python_modules/dagster/dagster/core/execution/plan/execute_step.py
415–416

[1]

python_modules/dagster/dagster/core/execution/plan/external_step.py
60–65

what would actually happen here if you got an interrupt while waiting on a subprocess.call?

python_modules/libraries/dagster-dask/dagster_dask/executor.py
255–256

did you test this? what state does this actually leave things in?

python_modules/libraries/dagster-databricks/dagster_databricks/databricks_pyspark_step_launcher.py
126–130

is this actually desired if we don't forward the interrupt to databricks?

"maybe the PipelineContextManager or somewhere around there for with delayed_interrupts()" - i think the problem with this is that we really do want to immediately override the signal handler as soon as we enter the new process, because in general Python code is not interrupt-safe (for example, an interrupt during any destructor will just not get interrupted).

"One idea is to introduce a "stopping" state to ActiveExecution which could provide a reasonable default behavior of at least not starting any new steps. " - this is essentially what we have now by having raise_delayed_interrupts after each event yield in _core_execution_iterator (see my response to your inline comments there) - that's more like 'dont start any new events' than 'dont start any new steps', but has the benefit of applying to every execution, whereas not all executions currently use ActiveExecution. I'm not totally following how this ActiveExecution proposal would work for other executors, like dask or celery.

python_modules/dagster/dagster/core/execution/api.py
646–653

to provide a reasonable default behavior (checking for interrupts after each step) for all executors, even custom ones built by people who don't want to mess around with interrupts

python_modules/dagster/dagster/core/execution/plan/external_step.py
60–65

I would expect it to terminate the subprocess when the parent process dies (as it does today)

python_modules/libraries/dagster-dask/dagster_dask/executor.py
255–256

the short answer is 'no worse than things were before :)'. The nice thing about this change is that since we're only narrowing the places where interrupts can get thrown from 'literally anywhere' to a whitelisted set of places, it's unlikely that we're going to introduce new bad behavior from receiving an interrupt - the set of places where you can be interrupted can only shrink after this change.

But no, I did not specifically test it yet; I will if we have consensus about this approach.

python_modules/libraries/dagster-databricks/dagster_databricks/databricks_pyspark_step_launcher.py
126–130

as above, it certainly won't make anything worse than it is now. I'm open to making these non-interruptible if it's not something we can actually guarantee.

Just to be clear, I think this is shaping up to be a huge improvement to the system, so all of this discussion is just an effort to make it the best big step forward it can be.

I think the problem with this is that we really do want to immediately override the signal handler as soon as we enter the new process, because in general Python code is not interrupt-safe (for example, an interrupt during any destructor will just not get handled).

Hmm, the gap between when we start and when we open the context manager is pretty small, so I'm still a little skeptical. We could do something where we ensure the guard has been installed in the manager though, so that we don't miss new entry points? Or set it up if it isn't set up yet?

this is essentially what we have now by having raise_delayed_interrupts after each event yield in _core_execution_iterator

I guess to clarify my proposal, this new default would be to not start any new steps while continuing to allow all currently outstanding steps to run to completion. The raise in _core_execution_iterator is going to cause the run coordinator process to stop immediately, abandoning all currently-executing steps. Currently the way this manifests is the run coordinator process reporting the run as failed and exiting while the abandoned step processes continue to add events to the event log. If we move the check into ActiveExecution somehow, we could at least emit an engine event saying "hey i got an interrupt, steps [X, Y, Z] are still in flight but ima just die now", if not do something better.

I'm not totally following how this ActiveExecution proposal would work for other executors, like dask or celery.

Celery uses ActiveExecution: https://dagster.phacility.com/source/dagster/browse/master/python_modules/libraries/dagster-celery/dagster_celery/core_execution_loop.py$55
Our dask integration is very different from the rest: we submit the whole DAG and then sit back [2]. I think the right answer for dask is a function of how their framework / futures handle the interrupt. What you currently have might be right.
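For reference, a rough sketch of what the proposed "stopping" state on ActiveExecution could look like; this is a hypothetical shape, not the existing API:

```python
class ActiveExecutionSketch:
    def __init__(self, steps):
        self._pending = list(steps)
        self._in_flight = set()
        self._stopping = False

    def mark_interrupted(self):
        # Called when the executor loop detects a delayed interrupt.
        self._stopping = True

    def get_next_step(self):
        # Once stopping, hand out no new steps but let in-flight ones finish.
        if self._stopping or not self._pending:
            return None
        step = self._pending.pop(0)
        self._in_flight.add(step)
        return step

    def handle_step_complete(self, step):
        self._in_flight.discard(step)

    def steps_still_in_flight(self):
        # Useful for an engine event like "interrupted; steps [...] still running".
        return set(self._in_flight)
```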

python_modules/dagster/dagster/core/execution/api.py
646–653

could this end up conflicting when executors are trying to handle it themselves? Does it create a very small window, while an event is being yielded, in which this raise would cause immediate termination instead of allowing the executor to do its cleanup?

python_modules/dagster/dagster/utils/__init__.py
406–408

is this actually equivalent to the default sigint handler? just want to make sure it can pierce sleep
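For what it's worth, a quick self-contained check (CPython on Unix) suggesting that a Python-level handler which raises KeyboardInterrupt does pierce time.sleep, just like the default handler:

```python
import os
import signal
import threading
import time

def _raise_keyboard_interrupt(_signum, _frame):
    raise KeyboardInterrupt

signal.signal(signal.SIGINT, _raise_keyboard_interrupt)

# Deliver SIGINT to ourselves after one second from a helper thread.
threading.Timer(1.0, lambda: os.kill(os.getpid(), signal.SIGINT)).start()

try:
    time.sleep(60)  # interrupted after ~1 second, not the full 60
except KeyboardInterrupt:
    print("sleep was interrupted")
```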

python_modules/libraries/dagster-dask/dagster_dask/executor.py
203–206

[2]

alex's feedback:

  • add an additional delay_interrupts() call so we can ensure that all executions go through one (you can add one at the very top of the stack if you want to be sure). We could also have it throw if you haven't delayed interrupts and audit top-level callsites :/
  • Take out my default behavior where it checked after every event
  • Add more state to ActiveExecution to detect interrupts, use it in the multiprocess executor and celery executor. Dask is unchanged, as is the single-process executor.

Need to figure out how to test this, esp. the celery piece
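One possible shape for that kind of test, with a hypothetical harness module standing in for an actual pipeline launch: start the run in a subprocess, interrupt it mid-step, and assert it doesn't hang or report success.

```python
import signal
import subprocess
import time

def test_interrupt_mid_run(tmp_path):
    proc = subprocess.Popen(
        ["python", "-m", "my_test_harness", "--sleepy-pipeline"],  # hypothetical harness
        cwd=str(tmp_path),
    )
    time.sleep(5)  # give the long-running step time to start
    proc.send_signal(signal.SIGINT)
    proc.wait(timeout=60)
    # A clean interrupt should terminate the process with a non-zero exit code.
    assert proc.returncode != 0
```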

python_modules/dagster/dagster/core/execution/plan/execute_step.py
416–421

This test failure is really breaking my brain.

We essentially have a raise_interrupts_immediately contextmanager nested inside a delay_interrupts contextmanager (the one in _ExecuteRunWithPlanIterable.iter).

If an exception is thrown inside the compute_fn here (like a user code error), what I'm seeing is that the finally branch in the OUTER contextmanager (delay_interrupts) is executing BEFORE the finally branch in the INNER contextmanager (raise_interrupts_immediately), causing all kinds of bad things to happen. I cannot come up with any reason that would happen.

If there's no user code exception the two contextmanagers unwind in the expected order.

Don't ever yield while in the raise_interrupts_immediately context
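A minimal standalone reproduction of that ordering (not Dagster code): when the inner context manager lives inside a generator frame and the generator yields while it is open, the inner finally only runs when the generator is finalized, which can be after the outer context manager in the consumer has already unwound.

```python
from contextlib import contextmanager

@contextmanager
def outer():  # stands in for delay_interrupts
    try:
        yield
    finally:
        print("outer finally")

@contextmanager
def inner():  # stands in for raise_interrupts_immediately
    try:
        yield
    finally:
        print("inner finally")

def event_gen():
    with inner():
        yield "event"  # suspends with inner() still open in the generator frame

def consume():
    with outer():
        for _ in event_gen():
            raise Exception("user code error")

try:
    consume()
except Exception:
    pass
# Typically prints "outer finally" first: the exception unwinds the consumer's
# with-block immediately, while the suspended generator (and inner()'s finally)
# is only finalized later, when the generator object is closed / collected.
```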

TEST PLAN
existing termination tests, new BK test for the new functions

Did you do any other manual testing? Just to make sure the ctrl-c-from-terminal experience is still good for some different configurations (which is hard to capture in BK tests)?

I think this is looking good; let's try to get this in Friday/Monday so it has some soak time before release?

python_modules/dagster/dagster/core/execution/context_creation_pipeline.py
189

lol

python_modules/dagster/dagster/utils/__init__.py
348

add a comment
maybe name it _process_received_interrupt to help clarify the use of the global?

375–379

not sure if there is anything that can be done about it - but just thinking about the case where this was somehow already changed from the "true original" sigint handler
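A small sketch of that concern: capture whatever handler is actually installed at guard time and restore that, rather than assuming signal.default_int_handler is the right thing to put back (names here are illustrative).

```python
import signal

def install_delayed_handler(delayed_handler):
    # signal.getsignal may return a custom handler if something else (e.g. a
    # framework or test runner) already replaced the "true original" one.
    previous_handler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, delayed_handler)
    return previous_handler  # restore this later instead of default_int_handler
```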

python_modules/libraries/dagster-celery/dagster_celery/core_execution_loop.py
69 ↗(On Diff #22516)

did you consider setting terminate=True on revoke here? Definitely want some more substantial testing for this one

alangenfeld added inline comments.
python_modules/libraries/dagster-celery/dagster_celery/core_execution_loop.py
69 ↗(On Diff #22516)

cc @catherinewu did you experiment with revoke when you were adding celery-k8s termination support?

python_modules/libraries/dagster-celery/dagster_celery/core_execution_loop.py
69 ↗(On Diff #22516)

I didn't experiment with it, but I am reasonably convinced that it is preferable to the status quo https://github.com/dagster-io/dagster/issues/2780
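For reference, a hedged sketch of what terminate=True would look like with celery's public revoke API; whether signalling the worker is actually safe for in-flight dagster steps is exactly the open question here.

```python
from celery.result import AsyncResult

def terminate_step_task(app, task_id):
    # revoke() alone only prevents a task that hasn't started from running;
    # terminate=True additionally signals the worker process executing it.
    AsyncResult(task_id, app=app).revoke(terminate=True, signal="SIGTERM")
```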

Back to your queue for the celery test / test plan updates

This revision now requires changes to proceed. Tue, Sep 29, 4:00 PM

rebased, added diffs on top of this that test multiprocessing, celery, and dask

depending on how confident you are feeling - wait until after the release to land

This revision is now accepted and ready to land. Thu, Oct 1, 3:07 PM

don't mark interrupted tasks as failed, so that we can take advantage of the IncompleteExecutionPlan exception

tweak interrupted log language