
[sensors-3] add sensor cli, stored state
Closed, Public

Authored by prha on Tue, Nov 3, 9:56 PM.

Details

Summary

Adds generic jobs and job ticks storage tables, structured the same as the schedules / schedule ticks tables.

Currently, schedules are still written only to the schedules table. The plan is to write both schedules and sensors to a unified jobs table, and then drop the schedule-only tables.
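A minimal sketch of the unified layout described above. It uses the standard-library `sqlite3` module rather than dagster's storage layer. The table and column names (`jobs`, `job_ticks`, `job_origin_id`, `job_type`, `job_body`, `tick_body`), the `SCHEDULE`/`SENSOR` type values, and the helpers `add_job` and `origin_ids_by_type` are illustrative assumptions, not the schema that landed.

```python
import sqlite3

# Hypothetical, simplified schema: one row per job (schedule or sensor),
# distinguished by job_type, plus a ticks table shaped like schedule_ticks.
SCHEMA = """
CREATE TABLE jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_origin_id TEXT NOT NULL UNIQUE,
    job_type TEXT NOT NULL,          -- e.g. 'SCHEDULE' or 'SENSOR'
    status TEXT NOT NULL,            -- e.g. 'RUNNING' or 'STOPPED'
    job_body TEXT                    -- serialized job state
);
CREATE INDEX idx_jobs_type ON jobs (job_type);

CREATE TABLE job_ticks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    job_origin_id TEXT NOT NULL,
    status TEXT NOT NULL,
    type TEXT NOT NULL,
    timestamp REAL NOT NULL,
    tick_body TEXT                   -- serialized tick data
);
"""


def make_store():
    """Create an in-memory store with both tables."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    return conn


def add_job(conn, origin_id, job_type, status="STOPPED"):
    conn.execute(
        "INSERT INTO jobs (job_origin_id, job_type, status) VALUES (?, ?, ?)",
        (origin_id, job_type, status),
    )


def origin_ids_by_type(conn, job_type):
    """Schedules and sensors share one table, so callers filter on job_type."""
    rows = conn.execute(
        "SELECT job_origin_id FROM jobs WHERE job_type = ? ORDER BY id",
        (job_type,),
    )
    return [row[0] for row in rows]


conn = make_store()
add_job(conn, "origin-a", "SCHEDULE")
add_job(conn, "origin-b", "SENSOR")
add_job(conn, "origin-c", "SCHEDULE")
print(origin_ids_by_type(conn, "SCHEDULE"))  # ['origin-a', 'origin-c']
```

Keeping the type as a column, rather than as separate tables, is what lets the schedule-only tables be dropped later. It is also why filtering by type comes up in review.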

Test Plan

bk

Diff Detail

Repository
R1 dagster
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

prha requested review of this revision. Tue, Nov 3, 10:15 PM
dgibson requested changes to this revision. Wed, Nov 4, 2:31 AM

if we're doing a new table, can we have it keyed by external_origin_id rather than origin_id? i know it leaves the schedule and jobs tables out of sync, but the plan is to move the schedules table to use external_origin_id as well, see D4941

python_modules/dagster/dagster/core/origin.py
226–241 (On Diff #25157)

can we make ExternalJobOrigin instead?

python_modules/dagster/dagster/core/scheduler/job.py
118

we discussed this a bit offline, but the tricky thing with schedules is that the key here is really more of a semantic thing (like a partition key) than the time at which execution started

Like, if your scheduler goes down for a day and then catches up on the full previous day, each schedule run it launches has a timestamp spaced an hour apart from the others; they don't all share the same timestamp, so this description won't be accurate
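The catch-up scenario above can be made concrete with a small sketch, assuming an hourly schedule. `missed_tick_times` is a hypothetical helper, not a dagster API. It returns the scheduled (logical) time of each missed tick, and a catch-up loop would launch one run per time. All of those runs start at roughly the same wall-clock moment, yet each tick is identified by its own hourly timestamp.

```python
from datetime import datetime, timedelta, timezone


def missed_tick_times(last_tick, now, interval=timedelta(hours=1)):
    """Return the logical (scheduled) times of every tick after last_tick, up to now.

    Hypothetical helper: each returned time identifies one tick, even though a
    catch-up loop would execute all of them back to back.
    """
    times = []
    next_time = last_tick + interval
    while next_time <= now:
        times.append(next_time)
        next_time += interval
    return times


last = datetime(2020, 11, 3, 0, 0, tzinfo=timezone.utc)
now = datetime(2020, 11, 4, 0, 30, tzinfo=timezone.utc)  # scheduler was down for a day
ticks = missed_tick_times(last, now)
print(len(ticks))  # 24
print(ticks[0].isoformat())  # 2020-11-03T01:00:00+00:00
```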

Do jobs have any kind of identifier string that could go here as well? How do you tell two executions of the same job apart if they executed at the same time?

This revision now requires changes to proceed. Wed, Nov 4, 2:31 AM
prha edited the summary of this revision.
  • changed origin => external origin
  • added execution_key string

cool, this looks good (other than the black failure). What's the testing plan, though, since there's a whole bunch of new untested code here? Is that going to be elsewhere in the stack?

python_modules/dagster/dagster/core/storage/schedules/schema.py
39

index? we do lots of filters of this by type right?

52

index?

53

index?

59–61

oh wait there are indexes down here? what does the index=True above do then?
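For context on the question above: in SQLAlchemy, `Column(..., index=True)` creates a separate single-column index on that column. `Index(...)` objects declared with the table define additional named indexes, which may span several columns. The two coexist rather than replace each other. A composite index can also serve filters on its leading column alone, which is one reason a standalone single-column index on that column can be redundant. The sketch below shows that leading-column behavior with `sqlite3` and `EXPLAIN QUERY PLAN`. The table, its columns, and the `idx_job_tick_status` name are assumptions, and other databases' planners may choose differently.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE job_ticks (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        job_origin_id TEXT NOT NULL,
        status TEXT NOT NULL,
        timestamp REAL NOT NULL
    );
    -- Hypothetical composite index whose leading column is job_origin_id.
    CREATE INDEX idx_job_tick_status ON job_ticks (job_origin_id, status);
    """
)


def query_plan(sql, params=()):
    """Join the 'detail' column (last field) of each EXPLAIN QUERY PLAN row."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    return " | ".join(str(row[-1]) for row in rows)


# Filtering on the leading column alone can use the composite index...
print(query_plan("SELECT * FROM job_ticks WHERE job_origin_id = ?", ("a",)))
# ...and so can filtering on both indexed columns.
print(query_plan(
    "SELECT * FROM job_ticks WHERE job_origin_id = ? AND status = ?",
    ("a", "SUCCESS"),
))
```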

prha planned changes to this revision. Thu, Nov 5, 4:05 PM

Will probably have another round of edits as I add tests to this and subsequent diffs.

python_modules/dagster/dagster/core/storage/schedules/schema.py
39

good idea

59–61

oh, interesting, missed the index=True.

I think we still want the multiple column indexes, but can drop idx_job_tick_origin_key.

added cli tests, schedule storage tests
changed sql indexes
renamed timestamp => cursor in table
tracked last_cursor in job state

dgibson requested changes to this revision. Tue, Nov 17, 2:33 PM

I think this is looking great. The one thing I want to discuss before it lands, since it's a semi-permanent schema change, is whether we need both execution_key and cursor. If you anticipate people will ever use them differently, then I buy it, but if cursors aren't part of the user-facing API for sensors, I think that's unlikely? Will see how that plays out later in the stack.

Less importantly (could be done separately from this diff), I think we don't need the 'up' command for sensors if we make a few small changes to what happens when we start a sensor (like we just did with schedules).

python_modules/dagster/dagster/cli/sensor.py
22

wondering if we can move straight to a place where you never need to run this command? That's where we are with schedules in the new scheduler now

188–199

yeah, maybe let's just take this out? And apply the changes from https://github.com/dagster-io/dagster/commit/09a38373a2c060f9348f309bb667dc847835561b to the sensors DB (pretty much just creating a row, if none exists, when starting a sensor)
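The suggested change (create the state row on start when none exists, so no separate `up` command is needed) might look roughly like the following. This is a `sqlite3` sketch, not the code from the linked commit. `start_sensor`, `stop_sensor`, and the `jobs` columns are hypothetical. The sketch does not coordinate two concurrent first starts. If both try to insert, the primary key makes the second insert fail rather than create a duplicate row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE jobs ("
    " job_origin_id TEXT PRIMARY KEY,"
    " job_type TEXT NOT NULL,"
    " status TEXT NOT NULL)"
)


def start_sensor(conn, origin_id):
    """Hypothetical start handler: update the existing row, or create it if missing."""
    with conn:  # commits on success, rolls back on error
        updated = conn.execute(
            "UPDATE jobs SET status = 'RUNNING' WHERE job_origin_id = ?",
            (origin_id,),
        ).rowcount
        if updated == 0:
            # First start for this sensor: no row yet, so create one.
            conn.execute(
                "INSERT INTO jobs (job_origin_id, job_type, status)"
                " VALUES (?, 'SENSOR', 'RUNNING')",
                (origin_id,),
            )


def stop_sensor(conn, origin_id):
    with conn:
        conn.execute(
            "UPDATE jobs SET status = 'STOPPED' WHERE job_origin_id = ?",
            (origin_id,),
        )


start_sensor(conn, "sensor-origin")  # no row yet: inserts one
stop_sensor(conn, "sensor-origin")
start_sensor(conn, "sensor-origin")  # row exists: updates it in place
print(conn.execute("SELECT COUNT(*), MAX(status) FROM jobs").fetchone())  # (1, 'RUNNING')
```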

python_modules/dagster/dagster/core/host_representation/handle.py
357

is it worth including the job type here as well or no?

python_modules/dagster/dagster/core/host_representation/origin.py
228–231

yay

python_modules/dagster/dagster/core/scheduler/job.py
28

last_completed_cursor? or is that not accurate

115

yay (although not positive about the name, i have no helpful alternate suggestions)

132–145

i've kind of lost track of where we landed in the back and forth about whether a single tick can have multiple runs? appears from this that it cannot, at least at this part of the stack?

python_modules/dagster/dagster/core/storage/schedules/schema.py
52–53

i'm happy with the inclusion of execution_key - but do you think cursor will ever differ from execution_key?

python_modules/dagster/dagster_tests/cli_tests/command_tests/test_cli_commands.py
271–289

these are actually the same now; I just removed this in D5167. New tests can just use scheduler_instance, no need for managed_grpc_scheduler_instance

325–331

remove me

334

remove this mark (it is going away)

This revision now requires changes to proceed. Tue, Nov 17, 2:33 PM
prha planned changes to this revision. Wed, Nov 18, 3:36 PM

Fixed a bad rebase from before; renamed cursor back to timestamp.

Previously, we were using update_timestamp to fetch the latest tick in get_latest_job_tick, but that's insufficient for re-running historical ticks. Also, update_timestamp is the book-keeping timestamp rather than the logical timestamp.
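The ordering problem can be sketched as follows. Re-running a historical tick bumps its book-keeping timestamp, so sorting on that column reports the old tick as the latest one. Sorting on the logical `timestamp` still finds the newest tick. The schema and the `latest_tick_timestamp` helper are assumptions for illustration, built on `sqlite3`; this is not dagster's `get_latest_job_tick`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE job_ticks (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        job_origin_id TEXT NOT NULL,
        status TEXT NOT NULL,
        timestamp REAL NOT NULL,        -- logical time the tick represents
        update_timestamp REAL NOT NULL  -- book-keeping: when the row last changed
    )
    """
)

# Two ticks for one job, recorded in logical order.
conn.executemany(
    "INSERT INTO job_ticks (job_origin_id, status, timestamp, update_timestamp)"
    " VALUES (?, ?, ?, ?)",
    [("a", "SUCCESS", 100.0, 101.0), ("a", "SUCCESS", 200.0, 201.0)],
)
# Re-running the historical tick at logical time 100 bumps only its update_timestamp.
conn.execute(
    "UPDATE job_ticks SET update_timestamp = ? WHERE job_origin_id = ? AND timestamp = ?",
    (500.0, "a", 100.0),
)


def latest_tick_timestamp(conn, origin_id, order_column):
    """Return the logical timestamp of the 'latest' tick under the given ordering."""
    if order_column not in ("timestamp", "update_timestamp"):
        raise ValueError(f"unexpected column: {order_column}")
    row = conn.execute(
        "SELECT timestamp FROM job_ticks WHERE job_origin_id = ?"
        f" ORDER BY {order_column} DESC LIMIT 1",
        (origin_id,),
    ).fetchone()
    return row[0]


print(latest_tick_timestamp(conn, "a", "update_timestamp"))  # 100.0, the re-run old tick
print(latest_tick_timestamp(conn, "a", "timestamp"))  # 200.0, the logically latest tick
```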

Having both timestamp and execution_key in the table is valuable for enforcing idempotency with schedules when a tick generates multiple runs. The timestamp/execution_key tuple becomes the unique execution identifier for schedules.
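One way to enforce this kind of idempotency is a unique constraint over the identifying tuple. With it, a retried tick cannot record the same execution twice, while a single tick can still record several executions with different keys. This `sqlite3` sketch assumes the constraint also includes the job's origin id. It uses hypothetical names (`record_execution`, the `partition-*` keys) and is not the schema that landed.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE job_ticks (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        job_origin_id TEXT NOT NULL,
        timestamp REAL NOT NULL,
        execution_key TEXT NOT NULL,
        -- Assumed constraint: one row per (job, logical time, execution key).
        UNIQUE (job_origin_id, timestamp, execution_key)
    )
    """
)


def record_execution(conn, origin_id, timestamp, execution_key):
    """Insert an execution record; return False if it was already recorded."""
    try:
        with conn:  # rolls back and re-raises on IntegrityError
            conn.execute(
                "INSERT INTO job_ticks (job_origin_id, timestamp, execution_key)"
                " VALUES (?, ?, ?)",
                (origin_id, timestamp, execution_key),
            )
        return True
    except sqlite3.IntegrityError:
        return False


# One tick (logical time 3600.0) that generates two runs, e.g. one per partition.
print(record_execution(conn, "sched", 3600.0, "partition-a"))  # True
print(record_execution(conn, "sched", 3600.0, "partition-b"))  # True
# Retrying the same tick does not create a duplicate execution.
print(record_execution(conn, "sched", 3600.0, "partition-a"))  # False
```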

dgibson added inline comments.
python_modules/dagster/dagster/core/host_representation/external.py
454–457

eeeeexcellent

python_modules/dagster/dagster/core/host_representation/origin.py
243–244

remove this if unused?

This revision is now accepted and ready to land. Wed, Nov 18, 7:46 PM
This revision was automatically updated to reflect the committed changes.