This adds a process, invoked via the dagster CLI, that polls the run storage for QUEUED runs and tries to launch them.
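A minimal sketch of the polling idea described above, not the code in this diff. `Run`, `InMemoryRunStorage`, `launch_queued_runs`, and `poll` are hypothetical names for illustration, not dagster APIs. Marking a run as failed when the launch raises is also an assumption.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict, List

QUEUED = "QUEUED"
STARTED = "STARTED"
FAILURE = "FAILURE"


@dataclass
class Run:
    run_id: str
    status: str


class InMemoryRunStorage:
    """Hypothetical stand-in for a run storage; not a dagster class."""

    def __init__(self) -> None:
        self._runs: Dict[str, Run] = {}

    def add_run(self, run: Run) -> None:
        self._runs[run.run_id] = run

    def get_runs(self, status: str) -> List[Run]:
        # Returns a new list, so callers may change run statuses while iterating.
        return [r for r in self._runs.values() if r.status == status]


def launch_queued_runs(storage: InMemoryRunStorage, launch: Callable[[Run], None]) -> int:
    """One polling iteration: try to launch every QUEUED run, return the number launched."""
    launched = 0
    for run in storage.get_runs(QUEUED):
        try:
            launch(run)
            run.status = STARTED
            launched += 1
        except Exception:
            # Assumption: a run that cannot be launched is marked failed
            # so that it does not stay in the queue forever.
            run.status = FAILURE
    return launched


def poll(storage: InMemoryRunStorage, launch: Callable[[Run], None],
         interval_seconds: float, max_iterations: int) -> None:
    """Repeat the polling iteration, sleeping between passes."""
    for _ in range(max_iterations):
        launch_queued_runs(storage, launch)
        time.sleep(interval_seconds)
```

In a real deployment the loop would run until interrupted. `max_iterations` is here only so the sketch terminates.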
Details
Diff Detail
- Repository: R1 dagster
- Branch: native-run-queue (branched from master)
- Lint: Lint OK
- Unit: No Unit Test Coverage
Event Timeline
python_modules/dagster/dagster_tests/queuing_tests/dequeuer_test.py | ||
---|---|---|
2 | Is there somewhere else I can get a handle |
seems very reasonable overall!
python_modules/dagster/dagster/cli/__init__.py | ||
---|---|---|
23 | nit maybe 'queue' or 'run_queue'? the 'ing' part feels a little out of place compared to the rest | |
python_modules/dagster/dagster/queuing/dequeuer.py | ||
59–62 | one thing to think about here is that in this scheme, hanging runs (e.g. the gRPC server crashes while it's executing and nothing finishes it) go from a minor annoyance to something that will potentially hold up the queue. We may need some kind of cleanup process that detects hanging runs for that reason (not blocking for this) | |
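One possible shape for the cleanup check mentioned in this comment, as a rough sketch only. `RunRecord`, `last_event_time`, and `find_hanging_runs` are hypothetical, and the idea of using a timeout since the last recorded event is an assumption, not something this diff implements.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class RunRecord:
    run_id: str
    status: str
    last_event_time: float  # seconds since the epoch (hypothetical field)


def find_hanging_runs(runs: Iterable[RunRecord], now: float, timeout_seconds: float) -> List[str]:
    """Return ids of STARTED runs that have been silent for longer than the timeout."""
    return [
        r.run_id
        for r in runs
        if r.status == "STARTED" and now - r.last_event_time > timeout_seconds
    ]
```

A cleanup process could call this periodically and mark the returned runs as failed so they stop holding up the queue.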
python_modules/dagster/dagster_tests/queuing_tests/dequeuer_test.py | ||
21 | you could also use overrides={"run_launcher": ...} on instance_for_test and sub in an InMemoryRunLauncher here (might need to pull it out of the test where it's currently used) |
did another pass here - the one thing I think we should clarify now is where this CLI command should go - i think it might make more sense alongside the scheduler in that group, to pave the way for deploying the scheduler and the dequeuer together?
dunno if @alangenfeld has other thoughts here too
python_modules/dagster/dagster/cli/__init__.py | ||
---|---|---|
21–24 | so the scheduler ended up with its own dagster-scheduler CLI entry (mostly because schedule vs. scheduler was confusing, plus the fact that it was a separate standing process). maybe this should end up in that dagster-scheduler group instead? esp. if we think they are going to end up being deployed together as part of the same process (along with triggers as well) | |
python_modules/dagster/dagster/core/instance/__init__.py | ||
786 | we are going to want to make this non-optional as part of this work, right? maybe not at first since it may be a breaking change, but as part of 0.10.0? | |
python_modules/dagster/dagster/core/storage/runs/sql_run_storage.py | ||
164 | rm | |
python_modules/dagster/dagster/queuing/dequeuer.py | ||
17 | i think we can just leave this out yeah, seems separate | |
51 | meh, is there a concrete benefit to this? | |
python_modules/dagster/dagster_tests/queuing_tests/dequeuer_test.py | ||
21 | ^^ still true, and probably better than mocking out functions |
Not sure what the plan with this RFC is, but I think the principled thing would be to pull out the pipeline_origin change at least into its own diff, and likely the queued state addition as well
overall I think this is a reasonable place to start
python_modules/dagster/dagster/cli/__init__.py | ||
---|---|---|
21–24 | ya agree we need to figure out something cohesive around this | |
python_modules/dagster/dagster/cli/queuing.py | ||
16 | what unit is this? | |
16–19 | set types for these options | |
python_modules/dagster/dagster/core/instance/__init__.py | ||
786 | hm i wonder if there are cases where it's tough to guarantee this is easily resolvable - in the near term the only assertion that matters is that QUEUED status requires an origin |
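The near-term assertion mentioned here could look roughly like the following. This is a sketch under assumptions: `check_queued_run_has_origin` is a hypothetical helper, and the real check would live wherever the instance creates runs.

```python
from typing import Optional


def check_queued_run_has_origin(status: str, pipeline_origin: Optional[object]) -> None:
    """Raise if a run is QUEUED without a pipeline origin; other statuses may omit it."""
    if status == "QUEUED" and pipeline_origin is None:
        raise ValueError("A run with QUEUED status requires a pipeline_origin")
```

Keeping the origin optional for every other status avoids a breaking change while still protecting the dequeuer, which needs the origin to launch the run.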
python_modules/dagster/dagster/cli/__init__.py | ||
---|---|---|
21–24 | That makes sense to me | |
python_modules/dagster/dagster/core/instance/__init__.py | ||
786 | Yes, will add a note | |
python_modules/dagster/dagster/queuing/dequeuer.py | ||
51 | Just to decrease the size of the payload, probably not a big deal. It would now need <run_id, pipeline_origin> |