
[sensors-2] sensor def
ClosedPublic

Authored by prha on Tue, Nov 3, 9:43 PM.

Details

Summary

Sets up a sensor definition, inheriting from the abstract job type.

Currently, all jobs are defined by:

  • pipeline_name
  • solid_selection
  • mode

At evaluation time, sensors return a should_execute boolean, a run_config dict, and a tags dict.
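As a rough, self-contained sketch of the shape the summary describes (the class and field names follow the summary; the evaluation signature and the `last_checked_time` parameter are assumptions, not the committed API):

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional, Tuple


@dataclass
class SensorDefinition:
    """Sketch of a sensor as a job targeting a pipeline, mode, and solid selection."""

    name: str
    pipeline_name: str
    # Evaluation returns (should_execute, run_config, tags), per the summary.
    evaluation_fn: Callable[[Optional[float]], Tuple[bool, Dict, Dict]]
    solid_selection: Optional[list] = None
    mode: str = "default"

    def evaluate(self, last_checked_time: Optional[float] = None):
        return self.evaluation_fn(last_checked_time)


# Example: a sensor that always fires with a fixed run config and tag.
my_sensor = SensorDefinition(
    name="my_sensor",
    pipeline_name="my_pipeline",
    evaluation_fn=lambda last_checked_time: (
        True,
        {"solids": {"my_solid": {"config": {"param": 1}}}},
        {"source": "sensor"},
    ),
)
```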

An alternative implementation has a sensor defined by a single job params function, which returns a list of JobParams (containing a run config dict and a tags dict). Will send out in a separate diff for comparison.
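The alternative shape might look roughly like this (`JobParams` comes from the summary; the function name and signature are placeholders for illustration):

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class JobParams:
    """Per the summary: a run config dict plus a tags dict."""

    run_config: Dict
    tags: Dict


def example_job_params_fn(last_checked_time) -> List[JobParams]:
    # Returning an empty list would mean "nothing to do"; each
    # JobParams entry would map to one requested run.
    return [JobParams(run_config={"key": "value"}, tags={"kind": "sensor"})]
```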

Test Plan

bk

Diff Detail

Repository
R1 dagster
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

prha requested review of this revision. Tue, Nov 3, 10:02 PM

This seems very reasonable overall. The part I like the least is the name 'last_checked_time', which kind of implies non-idempotency (which I guess is more true for sensors than it is for schedules). Just trying to think ahead about how I'll fit schedules into this model if we combine them more.

python_modules/dagster/dagster/api/snapshot_sensor.py
11

So for this to work with schedules, it wouldn't so much be last_checked_time as last_executed_time? Not sure if that matters.

last_checkpoint_time?

python_modules/dagster/dagster/core/definitions/decorators/sensor.py
16

This is out of date, right?

python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py
13–15

Can this be

with instance_for_test() as instance:
    with get_bar_repo_handle() as repository_handle:

I'm switching up this pattern in the other tests right now, trying to get us to use instance_for_test everywhere

19–20

The test should probably exercise should_execute and tags; you could make the test sensor a little more interesting and have it sometimes return false based on the time passed in, or something?
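A toy evaluation function along those lines, as a hedged sketch (the (should_execute, run_config, tags) return shape follows the summary; everything else is illustrative), so a test can exercise both branches and the tags:

```python
def evaluate_tick(current_time: float):
    """Toy sensor evaluation: only fire on even-numbered seconds, and tag
    the run with the tick time so a test can assert on the tags dict."""
    should_execute = int(current_time) % 2 == 0
    run_config = {"solids": {}}
    tags = {"tick_time": str(int(current_time))}
    return should_execute, run_config, tags
```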

This revision is now accepted and ready to land. Wed, Nov 4, 2:10 AM

Here are some of the use cases that I had in mind for sensors:

  • Whenever file X is updated, I want to launch a run.
  • Whenever pipeline X completes a run, I want to kick off a run for pipeline Y.
  • Whenever an object shows up in S3 directory X, I want to launch a run that processes that file.
  • Whenever pipeline X completes writing a new partition to table A, I want pipeline Y to process that partition.
  • All the solids in pipeline X have a manually assigned version attached to them, and pipeline runs are tagged with the version for each solid. Whenever the version for a solid does not match the version of the most recent run, launch a run of that solid and all its downstreams.

I have a clear idea of how I'd implement the first two with this API, and a less clear idea for the last three. Are there ways of implementing them that I'm missing? Or are sensors not meant to cover them?

@sandyryza I think either sensor API should be able to accomplish all of the scenarios you've outlined... But that's a good list.

Next up on my plate is to create a suite of "interesting" sensors to implement.

python_modules/dagster/dagster/api/snapshot_sensor.py
11

I initially had last_execution_time but execution makes me think of a pipeline run.

What do you think about last_evaluation_time?

This revision was automatically updated to reflect the committed changes.