
[crag] Add mode to the pipeline runs filter
Closed, Public

Authored by cdecarolis on Jun 30 2021, 7:50 PM.

Details

Summary

This diff adds mode to the PipelineRunsFilter object, adds a schema entry for mode in the runs table, and adds a data migration method that retrieves the mode off of each pipeline run and writes it to the corresponding row in the runs table.
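In rough terms, the filter change amounts to one more optional field on the filter object. A simplified sketch (not the actual dagster class definition; existing fields elided):

from typing import NamedTuple, Optional

# Simplified sketch, not the real dagster class: mode becomes one more
# optional field that the storage layer can translate into a SQL predicate.
# Existing fields (run_ids, statuses, tags, etc.) are elided.
class PipelineRunsFilter(NamedTuple):
    pipeline_name: Optional[str] = None
    mode: Optional[str] = None  # new field added by this diff

runs_filter = PipelineRunsFilter(pipeline_name="my_pipeline", mode="default")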

Test Plan

Added backcompat tests for pg and sqlite, as well as tests to the run storage battery.

Diff Detail

Repository
R1 dagster
Branch
pipelinerunfilter
Lint
Lint Passed
Unit
No Test Coverage

Event Timeline

cdecarolis retitled this revision from [crag] Add a mode filter to the pipeline runs filter to [crag] Add mode to the pipeline runs filter. Jun 30 2021, 7:51 PM
This revision now requires changes to proceed. Jun 30 2021, 8:18 PM

Add unit test for mode, add to run table schema

python_modules/dagster/dagster/core/storage/runs/schema.py
18

Do we want to add this without a DB migration to fill it in for historical runs? I feel like it's a very busted experience if the filter just ignores old runs.

Do we want to do the migration and add this column even though, in the future with Job, it seems likely we will not have mode?

python_modules/dagster/dagster/core/storage/runs/schema.py
18

Won't we always need this for backcompat, though? Or is the idea that pipeline name + job will be consolidated into one column by the time we get here?

python_modules/dagster/dagster/core/storage/runs/schema.py
18

I think we need both a schema migration and a data migration.

I *think* it might be okay to kick off the data migration inside of the schema migration, but I'm not sure. In the past, the line was whether it touches the event log or not.
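For context, the schema half of what is being discussed might look roughly like this in Alembic (an illustrative sketch, not the actual migration in the repo):

import sqlalchemy as sa
from alembic import op

# Illustrative Alembic sketch: add a nullable mode column so existing rows
# stay valid until a separate data migration backfills them.
def upgrade():
    op.add_column("runs", sa.Column("mode", sa.String, nullable=True))

def downgrade():
    op.drop_column("runs", "mode")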

python_modules/dagster/dagster/core/storage/runs/schema.py
18

We will need to make a large number of schema changes and migrations as we move from pipelines to jobs. It's hard to say exactly how we will want to handle backcompat in that future without making further progress on it. You could imagine that we fill a job_name column with pipeline_name:mode. You could also imagine other, more complex schemes.

To be clear, the add-mode migration, potentially followed by a remove-mode migration, may be the right path, but I think it's worth some discussion.

python_modules/dagster/dagster/core/storage/runs/schema.py
18

If we don't promote mode to be a column, the read perf of the query might be bad because we would be doing JSON serdes to find the qualified job, i.e. tuple(pipeline_name, mode).

I guess we would need both a schema migration (for perf) and a data migration (to fill mode for historical runs). Even though we know that in the future mode won't exist with Job, I could imagine the backcompat layer living for a while.

If we want to avoid a data migration, I think we would need to do backcompat in the storage layer to fill the mode after fetching pipeline runs from the DB.
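That storage-layer fallback might look roughly like this (an illustrative sketch; the row keys and the deserialize_run helper are assumptions):

# Illustrative sketch: rows written before the schema change have a NULL
# mode column, so fall back to the mode stored in the serialized run body.
# The row keys and the deserialize_run helper are assumptions.
def filter_rows_by_mode(rows, mode, deserialize_run):
    matching = []
    for row in rows:
        run = deserialize_run(row["run_body"])
        # Prefer the column value; old rows fall back to the run body.
        run_mode = row["mode"] if row["mode"] is not None else run.mode
        if run_mode == mode:
            matching.append(run)
    return matching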

To distill why I am hesitant here:

  • I think instance migrations are a very negative user experience
  • I think the utility of an efficient implementation of this feature is low, given that its most prominent surfacing is expected to be behind an opt-in flag

Examples of things that I could find persuasive:

  • even the inefficient solutions are not really feasible
  • users who do not attempt to use the mode filter will not be prompted to upgrade their instance

How bad is the perf hit we're likely to take? Is it worse than 2x? Do we not need to unpack these blobs already?

Evaluate non-schema based approach

Implement approach without schema change

python_modules/dagster/dagster/core/storage/runs/sql_run_storage.py
248–249

So the fundamental problem with this is the limit.

Let's say we have a (pipeline, mode) history like:

(A, 1)
(A, 1)
(A, 1)
(A, 2)
(A, 2)
(A, 2)

If I request pipeline_name=A, mode=2, limit=3, the DB query will return the first three (A, 1) rows, and then the in-memory filter will drop all three. I get 0 results back, and the front end interprets that as no more results, leaving the (A, 2) runs that are actually available hidden.
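A toy reproduction of the failure mode (self-contained, not the real storage code):

# The SQL limit is applied before the in-memory mode filter, so a page can
# come back empty even though matching runs exist further down the table.
history = [("A", 1), ("A", 1), ("A", 1), ("A", 2), ("A", 2), ("A", 2)]

def naive_fetch(pipeline_name, mode, limit):
    # The "DB" query filters on pipeline_name only and applies the limit...
    page = [r for r in history if r[0] == pipeline_name][:limit]
    # ...then the in-memory mode filter drops everything on this page.
    return [r for r in page if r[1] == mode]

print(naive_fetch("A", mode=2, limit=3))  # prints [] even though (A, 2) runs exist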

python_modules/dagster/dagster/core/storage/runs/sql_run_storage.py
248–249

Note that the front end likely interprets getting fewer results than the limit as a sign not to show a next-page button, so failing to meet the limit at all is problematic, not just the case where you get zero back.

A couple of approaches:

  • determine there is a non-DB filter, do not apply any limit in the DB query, and apply the limit when the in-memory filter runs
  • pass through the limit, but repeat the DB query (advancing the cursor and maybe the limit) until you hit the limit or exhaust the DB results (see the sketch below)
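A rough sketch of the second approach (db_fetch, its cursor semantics, and run_id are hypothetical stand-ins for the storage layer):

# Keep re-querying with an advancing cursor until the requested limit is
# met or the table is exhausted.
def fetch_filtered(db_fetch, in_memory_filter, limit):
    results = []
    cursor = None
    while len(results) < limit:
        page = db_fetch(cursor=cursor, limit=limit)
        if not page:
            break  # exhausted the DB results
        results.extend(in_memory_filter(page))
        cursor = page[-1].run_id  # advance past the last row we saw
    return results[:limit]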

Respin with dagit limit

python_modules/dagster/dagster/core/storage/runs/sql_run_storage.py
248–249

@alangenfeld so if I understand correctly, in reality we want to limit the number of rows dagit displays, but the implementation as it stands limits the number of rows actually queried? If so, that makes sense, and I will fold that into the approach.

Put up filter-based approach for posterity even though I'm about to get rid of it

This revision now requires changes to proceed. Jul 6 2021, 8:02 PM
python_modules/dagster/dagster/core/storage/runs/sql_run_storage.py
359–372

I think this should probably live only in the migration function, not as a method on the storage class. I don't imagine anyone ever calling this except to run the migration.

TEST PLAN
dagit

Can you be more specific?

python_modules/dagster/dagster/core/storage/runs/migration.py
38–41 ↗(On Diff #41044)

tqdm to show progress at least
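For example, the backfill loop could gate the progress bar on print_fn (a sketch; set_run_mode is a hypothetical stand-in for the real write):

from tqdm import tqdm

def migrate_run_modes(run_storage, print_fn=None):
    runs = run_storage.get_runs()
    # Only show a progress bar when the caller asked for output.
    iterator = tqdm(runs) if print_fn else runs
    for run in iterator:
        # Write the mode from the deserialized run into the new column;
        # set_run_mode is hypothetical, not a real RunStorage method.
        run_storage.set_run_mode(run.run_id, run.mode)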

python_modules/dagster/dagster/core/storage/runs/sql_run_storage.py
359–372

+

Move migration method off run storage

cdecarolis edited the test plan for this revision.

Add tqdm if print_fn during migration

Add to backcompat test to ensure that the migration errors only when using new functionality
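Roughly, that check might look like this (an illustrative pytest sketch; the old_instance fixture and the exception type are assumptions, not the test added in this diff):

import pytest
from dagster.core.storage.pipeline_run import PipelineRunsFilter

# old_instance is an assumed fixture serving an unmigrated DB snapshot;
# the exact exception type raised is also an assumption.
def test_mode_filter_requires_migration(old_instance):
    storage = old_instance.run_storage
    # Existing queries keep working on the unmigrated instance...
    storage.get_runs(PipelineRunsFilter(pipeline_name="foo"))
    # ...but the new mode filter should error until the migration runs.
    with pytest.raises(Exception):
        storage.get_runs(PipelineRunsFilter(pipeline_name="foo", mode="default"))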

This revision is now accepted and ready to land. Jul 7 2021, 10:36 PM

Ensure that pipelines run pre-migration