
Celery run queue
Abandoned · Public

Authored by johann on Sep 8 2020, 8:14 PM.

Details

Summary

A new run launcher that enqueues a Celery task to create the run coordinator job. This enables concurrency limits on pipeline runs.

These tasks should always go to a separate Celery worker pool from the step jobs, to avoid deadlock in Celery (if all workers were busy with run coordinators). This should also help avoid deadlock at the k8s level when no step jobs can be scheduled, since you can limit the number of run coordinators.

To enable:
In configmap-instance.yaml, replace CeleryK8sRunLauncher with CeleryK8sQueuedRunLauncher.
In values.yaml, add

celery:
  extraWorkerQueues:
    - name: "dagster-run-coordinators"
      replicaCount: 1

TODO:

  • priorities
Test Plan

manual

Flower shows the coordinator going to the special worker, and the steps executing on the other.

TODO: unit and integration

Diff Detail

Repository
R1 dagster
Branch
run-queue (branched from master)
Lint
Lint OK
Unit
No Unit Test Coverage

Event Timeline

integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm_values.py
9

nit: enable_user_deployments, enable_run_launcher_queue?

19

nit: get_dagit_config(...), get_celery_config(...), get_scheduler_config(...)

75

nit: mix of plural and singular in "dagster-run-launchers" and "run_launcher_queued"

python_modules/libraries/dagster-celery-k8s/dagster_celery_k8s/launcher.py
94–96

I prefer "enable_run_coordinator_queue" and "run_coordinator_queue" instead. I think the use of "default" in "default_run_queue" can be confusing. Also, is "queue_runs" necessary? Can we just check whether "default_run_queue" is set?

296

I think it would be better to instantiate the celery executor and then call app_args() here

403

what if DAGSTER_CELERY_QUEUE_TAG and default_run_queue are both not set?

440

check.inst_param

441

check.inst_param

442

check.str_param since check.is_str returns a bool

443

check.str_param

444

check.str_param

445

check.inst_param

447

check.str_param

448

check.inst_param

497

is Exception type compatible with text?

546

inst_param

552

str_param

553

str_param

554

str_param

555

inst_param

562

str_param

563

inst_param

590

I think we don't need to specify EngineEventData([]), since it's the default. Should we add some more engine data here?

python_modules/libraries/dagster-k8s/dagster_k8s/client.py
201

should we have a check_pipeline_run_not_started for the run coordinators?

integration_tests/test_suites/celery-k8s-integration-test-suite/test_run_launcher_queued.py
38

can we test that the run master job gets queued in the right queue via the celery api, and that we can't have two run masters going through this queue at the same time?

integration_tests/test_suites/celery-k8s-integration-test-suite/test_run_launcher_queued.py
38

that we can't have two run masters going through this queue at the same time?

Do you mean testing that # concurrent runs is not above # of celery worker processes?

integration_tests/test_suites/celery-k8s-integration-test-suite/test_run_launcher_queued.py
38

I was thinking of testing the use case where a user utilizes this feature to run backfills, so we can kick off a celery worker with concurrency = 1 and then run a backfill to sanity-check that the pipelines run in sequence without overlap

johann added inline comments.
python_modules/libraries/dagster-celery-k8s/dagster_celery_k8s/launcher.py
94–96

If default_run_queue is None but queue_runs is True, then you can use pipeline tags to selectively queue runs. That's also why I used the name "default". Are enable_run_coordinator_queue and default_run_coordinator_queue good?

403

The run won't be queued. I think this is useful for now for enqueuing only backfill jobs
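The behavior described in this thread (per-run tag overrides the instance default; when both are unset, the run launches directly without queuing) can be sketched as below. The helper name and the exact tag value are assumptions for illustration, not the actual Dagster code.

```python
# Assumed tag key; DAGSTER_CELERY_QUEUE_TAG is referenced in the review above,
# its exact value here is an illustrative guess.
DAGSTER_CELERY_QUEUE_TAG = "dagster-celery/queue"

def resolve_run_queue(run_tags, default_run_coordinator_queue=None):
    # Per-run tag wins; otherwise fall back to the instance-level default.
    # A result of None means: don't enqueue, launch the run immediately.
    return run_tags.get(DAGSTER_CELERY_QUEUE_TAG, default_run_coordinator_queue)
```

This is why `default_run_queue=None` with queuing enabled is still meaningful: only runs that carry the tag (e.g. backfills) get queued.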

563

The only difference is in the error message, right? Here and above it's not checking a param; pipeline_run_packed is what's passed

johann added inline comments.
integration_tests/test_suites/celery-k8s-integration-test-suite/test_run_launcher_queued.py
38

Got it. I think that test will be blocked on https://github.com/dagster-io/dagster/issues/2671, since tests get very flaky right now if you have multiple workers. If you put everything in the same queue then you can't verify that behavior

integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/cluster.py
226

dagster_instance_for_run_coordinator_queue?

integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm.py
68

helm_namespace_for_run_coordinator_queue?

integration_tests/python_modules/dagster-k8s-test-infra/dagster_k8s_test_infra/helm_values.py
10

given the update in the constructor, should we use enable_for_run_coordinator_queue?

python_modules/libraries/dagster-celery-k8s/dagster_celery_k8s/launcher.py
574

I think this naming could be a bit misleading, since "launch" in our system usually refers to fire-and-forget

590

what if wait_for_job_success raises an error?

python_modules/libraries/dagster-k8s/helm/dagster/templates/deployment-celery.yaml
43 ↗(On Diff #22652)

should we have the worker listen to both queues for this release and deprecate the dagster queue in a later release? Otherwise we will break k8s users in a frustrating way

python_modules/libraries/dagster-k8s/helm/dagster/values.yaml
270 ↗(On Diff #22712)

runCoordinatorQueue for consistency? or runCoordinatorQueueConfig?

272 ↗(On Diff #22712)

defaultRunCoordinatorQueue for consistency?

273 ↗(On Diff #22712)

I think we should split out this comment so that we have a description per field, i.e. "If not enabled, by default runs will launch without queuing." could go above the "enabled" field

johann marked 11 inline comments as done.

various renamings

python_modules/libraries/dagster-k8s/dagster_k8s/client.py
201

I be

@catherinewu I think this has addressed most concerns.

Outstanding comments:

  • Should we check_pipeline_run_not_started for the run coordinators? Seems reasonable; that would apply to both the queued and current code paths though, so I think it would make sense in a different diff
  • Checking celery queues in the integration test: maybe I'm overthinking this, but it seems tough to do robustly. I think this would be done with Celery.app.control.inspect and checking for tasks with the monitoring? Comments here imply that it can be a bit tricky, and it might also require some rabbitmq-specific checks. It seems like the better way to go would be using the Celery mock utils. That could also go in a different diff, since using mocked celery would apply to testing the executor as well
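The mocked-Celery approach mentioned above can be sketched with `unittest.mock`: rather than inspecting live workers, patch the task object and assert that the run coordinator was sent to the expected queue. `send_run_coordinator` is a hypothetical stand-in for the launcher's enqueue call, not the actual Dagster API.

```python
from unittest import mock

def send_run_coordinator(task, run_id, queue):
    # Hypothetical stand-in for the launcher enqueuing the run coordinator.
    task.apply_async(args=[run_id], queue=queue)

# In a test, a Mock replaces the real Celery task, so no broker is needed:
task = mock.Mock()
send_run_coordinator(task, "run-123", "dagster-run-coordinators")
task.apply_async.assert_called_once_with(
    args=["run-123"], queue="dagster-run-coordinators"
)
```

This verifies the routing decision without the flakiness of polling live workers or broker-specific (e.g. rabbitmq) inspection.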
helm/dagster/values.yaml
274

typo: can

integration_tests/test_suites/celery-k8s-integration-test-suite/test_run_launcher_queued.py
73

can we check for the engine event "Sending run coordinator task to Celery" to make sure it goes down the code path we expect?

and for "Run coordinator finished, Celery exiting"
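The check being requested could look something like the sketch below: scan the run's event log messages for the two coordinator engine events. The helper is hypothetical; the actual integration test would pull messages from the Dagster instance's event log.

```python
def assert_engine_events(event_messages, expected):
    # Fail if any expected engine-event message is absent from the log.
    missing = [m for m in expected if not any(m in e for e in event_messages)]
    assert not missing, f"Missing engine events: {missing}"

# Example usage with a fake event log:
assert_engine_events(
    [
        "Sending run coordinator task to Celery",
        "Step some_step started",
        "Run coordinator finished, Celery exiting",
    ],
    [
        "Sending run coordinator task to Celery",
        "Run coordinator finished, Celery exiting",
    ],
)
```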

python_modules/libraries/dagster-celery-k8s/dagster_celery_k8s/launcher.py
575

I think we should catch the case where wait_for_job_success throws an exception
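One shape that error handling could take (all names hypothetical): surface a failed wait as an engine event instead of letting the exception kill the Celery task silently.

```python
def wait_and_report(wait_for_job_success, job_name, report_engine_event):
    # Sketch only: wrap the blocking wait so a failed run-coordinator job
    # is reported as an engine event rather than an unhandled exception.
    try:
        wait_for_job_success(job_name)
    except Exception as exc:
        report_engine_event(f"Run coordinator failed waiting on {job_name}: {exc}")
        return False
    report_engine_event("Run coordinator finished, Celery exiting")
    return True
```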

Approving, conditional on checking for the engine events (i.e. "Sending run coordinator task to Celery" + "Run coordinator finished, Celery exiting") in tests, and some error handling for when wait_for_job_success raises an exception


This revision is now accepted and ready to land. · Tue, Oct 6, 4:59 PM