
RunStatusSensorDefinition and @run_status_sensor
Closed (Public)

Authored by yuhan on Jul 14 2021, 7:56 PM.

Details

Summary

Depends on D8896.

Introduces a generic pipeline_sensor so users can define sensors that react to pipeline run statuses such as starting, started, and success.
API:

@pipeline_sensor(pipeline_run_status=PipelineRunStatus.STARTED)
def my_pipeline_started_sensor(_):
    ...

It takes a pipeline_run_status argument, which is mapped internally to a Dagster event type (via the lookup added in D8896).
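To make the status-to-event idea concrete, here is a minimal sketch of such a lookup. It is not the table from D8896: `RunStatus`, `RUN_STATUS_TO_EVENT_TYPE`, and `event_type_for` are hypothetical names, and only four statuses are covered. The event-type strings are modeled on Dagster's `DagsterEventType` names.

```python
from enum import Enum
from typing import Dict


class RunStatus(Enum):
    """Hypothetical stand-in for a subset of Dagster's PipelineRunStatus."""

    STARTING = "STARTING"
    STARTED = "STARTED"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"


# Hypothetical lookup table; names are modeled on DagsterEventType values,
# but the real mapping lives in D8896 and may differ.
RUN_STATUS_TO_EVENT_TYPE: Dict[RunStatus, str] = {
    RunStatus.STARTING: "PIPELINE_STARTING",
    RunStatus.STARTED: "PIPELINE_START",
    RunStatus.SUCCESS: "PIPELINE_SUCCESS",
    RunStatus.FAILURE: "PIPELINE_FAILURE",
}


def event_type_for(status: RunStatus) -> str:
    """Return the event type a status sensor would watch for the given run status."""
    if status not in RUN_STATUS_TO_EVENT_TYPE:
        raise ValueError(f"no event type is mapped to run status {status.name}")
    return RUN_STATUS_TO_EVENT_TYPE[status]
```

For example, `event_type_for(RunStatus.STARTED)` returns `"PIPELINE_START"`, so a STARTED sensor only has to scan the event log for that single event type.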

Implementation changes:

  • created PipelineSensorDefinition
  • built pipeline_failure_sensor on top of this generic sensor definition
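The layering described in the second bullet (a specialized failure sensor built on the generic one) can be sketched as a decorator factory with the status fixed by `functools.partial`. This is an illustration only: `StatusSensorDef`, `status_sensor`, `failure_sensor`, and `REGISTRY` are hypothetical names, not the definitions in this diff.

```python
import functools
from dataclasses import dataclass
from typing import Callable, List


@dataclass(frozen=True)
class StatusSensorDef:
    """Hypothetical stand-in for a run status sensor definition."""

    name: str
    run_status: str
    fn: Callable[[object], None]


# Every sensor created by the decorators below is collected here.
REGISTRY: List[StatusSensorDef] = []


def status_sensor(run_status: str):
    """Generic decorator: wrap `fn` in a definition that reacts to `run_status`."""

    def decorator(fn: Callable[[object], None]) -> StatusSensorDef:
        sensor_def = StatusSensorDef(name=fn.__name__, run_status=run_status, fn=fn)
        REGISTRY.append(sensor_def)
        return sensor_def

    return decorator


# The failure sensor is the generic decorator with the status fixed to FAILURE.
failure_sensor = functools.partial(status_sensor, "FAILURE")


@failure_sensor()
def alert_on_failure(context: object) -> None:
    print(f"run failed: {context}")
```

The advantage of this shape is that the failure sensor adds no evaluation logic of its own; any fix to the generic sensor applies to it automatically.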
Test Plan

bk (Buildkite)

Diff Detail

Repository
R1 dagster
Branch
yuhan/pipeline-sensor
Lint
Lint Passed
Unit
No Test Coverage

Event Timeline

Harbormaster returned this revision to the author for changes because remote builds failed. (Jul 14 2021, 8:20 PM)
Harbormaster failed remote builds in B33719: Diff 41637!

The run-status-to-event mapping feels a little weird to me, but it may result in the desired behavior.

For started events, is it expected that the sensor fires even though, by the time it fires, the run may already have changed state?

Thoughts on calling this pipeline_event_sensor? pipeline_sensor could be interpreted as a sensor that kicks off a pipeline.


Yes, because it looks at the events rather than the pipeline statuses, which I think is the desired behavior. If we looked at the run status instead, the sensor could miss firing whenever the state changes faster than the tick interval.
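A small simulation shows why reading events is more reliable than polling the current status. The timeline, tick times, and function names are hypothetical. The run passes through STARTED between two ticks, so a sensor that only reads the current status never sees STARTED, while a sensor that reads all events since its cursor sees every transition.

```python
from typing import List, Tuple

# Hypothetical timeline of one run: (time in seconds, status it moved to).
# The run is STARTED at t=1 and already SUCCESS at t=3.
TIMELINE: List[Tuple[int, str]] = [(0, "STARTING"), (1, "STARTED"), (3, "SUCCESS")]


def status_at(now: int) -> str:
    """The run's current status at time `now` (all a status-polling sensor sees)."""
    current = "NOT_STARTED"
    for ts, status in TIMELINE:
        if ts <= now:
            current = status
    return current


def polling_sensor(tick_times: List[int]) -> List[str]:
    """Record whatever status the run is in on each tick."""
    return [status_at(t) for t in tick_times]


def event_sensor(tick_times: List[int]) -> List[str]:
    """On each tick, read every event written since the cursor, then advance it."""
    cursor, seen = 0, []
    for now in tick_times:
        written = [status for ts, status in TIMELINE if ts <= now]
        seen.extend(written[cursor:])
        cursor = len(written)
    return seen


TICKS = [0, 5]  # a 5-second tick interval is longer than the STARTED window
print(polling_sensor(TICKS))  # ['STARTING', 'SUCCESS']: STARTED is never observed
print(event_sensor(TICKS))  # ['STARTING', 'STARTED', 'SUCCESS']
```

The event-based sensor reports STARTED at t=5 even though the run is already SUCCESS by then, which is exactly the trade-off raised in the question about started events.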


Re: pipeline_event_sensor: yes, or one of:

  • run_status_sensor
  • pipeline_status_sensor

If we go with pipeline_event_sensor, it may make more sense to take a dagster_event argument instead of pipeline_run_status.

It gets a little hairy if we allow arbitrary Dagster event types, because the current implementation (with the RunShardedEventsCursor) implicitly assumes a one-to-one mapping between events and runs.

(This reminds me that our tests should exercise both the sharded and unsharded SQLite implementations.)
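The sharded-vs-unsharded distinction matters for cursors. The following sketch assumes, as we understand the run-sharded SQLite layout, that each run's events live in their own database with an independent ID sequence. Under that assumption, a single "max storage ID seen" cursor silently skips events from runs that start later. All names here are hypothetical, and the per-run cursor is just one simple alternative for illustration; it is not how RunShardedEventsCursor works.

```python
from typing import Dict, List, NamedTuple


class Event(NamedTuple):
    run_id: str
    storage_id: int  # per-run sequence if sharded, global sequence if not
    event_type: str


def make_events(sharded: bool) -> List[Event]:
    """Run A writes two events, then run B writes two; IDs depend on the layout."""
    writes = [("A", "PIPELINE_START"), ("A", "PIPELINE_SUCCESS"),
              ("B", "PIPELINE_START"), ("B", "PIPELINE_SUCCESS")]
    per_run: Dict[str, int] = {}
    events = []
    for global_id, (run_id, event_type) in enumerate(writes, start=1):
        per_run[run_id] = per_run.get(run_id, 0) + 1
        storage_id = per_run[run_id] if sharded else global_id
        events.append(Event(run_id, storage_id, event_type))
    return events


def global_cursor_sensor(ticks: List[List[Event]]) -> List[Event]:
    """Naive cursor: one max storage_id shared by all runs."""
    last_id, seen = 0, []
    for batch in ticks:
        seen.extend(e for e in batch if e.storage_id > last_id)
        last_id = max([last_id] + [e.storage_id for e in batch])
    return seen


def per_run_cursor_sensor(ticks: List[List[Event]]) -> List[Event]:
    """Alternative cursor: remember the last storage_id separately for each run."""
    last_ids: Dict[str, int] = {}
    seen = []
    for batch in ticks:
        for e in batch:
            if e.storage_id > last_ids.get(e.run_id, 0):
                seen.append(e)
                last_ids[e.run_id] = e.storage_id
    return seen


for sharded in (False, True):
    events = make_events(sharded)
    ticks = [events[:2], events[2:]]  # tick 1 sees run A, tick 2 sees run B
    print(sharded, len(global_cursor_sensor(ticks)), len(per_run_cursor_sensor(ticks)))
# False 4 4
# True 2 4
```

Only the sharded layout exposes the bug, which is why a test suite run against unsharded storage alone would miss it. A per-run map grows with the number of runs, so this only demonstrates the failure mode; it is not a production cursor.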

run_status_sensor or something like that makes sense to me.

  • renamed to run_status_sensor
  • added a test covering the interleaved-runs case for both run-sharded and non-run-sharded storage
yuhan retitled this revision from PipelineSensorDefinition and @pipeline_sensor to RunStatusSensorDefinition and @run_status_sensor. (Jul 19 2021, 9:12 PM)
yuhan added a reviewer: alangenfeld.
python_modules/dagster/dagster/core/definitions/pipeline_sensor.py, line 56

Would it make sense to call this RunStatusSensorContext?

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py, line 56

Oops, will update.

PipelineSensor -> RunStatusSensor

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py, line 26

Should this be RunStatusSensorCursor?

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py, line 26

This is an internal representation, and it gets persisted in the job ticks. I'll do a backcompat follow-up to rename it so we won't break users.

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py, line 26

What's the reason for not renaming it in this diff?

PipelineSensorCursor -> RunStatusSensorCursor + backcompat fallback
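A "backcompat fallback" for a renamed, persisted class usually means accepting the old class name when loading stored payloads. The following is a minimal JSON-based sketch of that idea. It does not reproduce Dagster's serialization layer, and the cursor fields (`record_id`, `update_timestamp`) and helper names are hypothetical.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class RunStatusSensorCursor:
    # Hypothetical fields; the real cursor's fields are not shown in this review.
    record_id: int
    update_timestamp: str


# Old persisted class names mapped to their current names.
LEGACY_NAMES = {"PipelineSensorCursor": "RunStatusSensorCursor"}
CURSOR_CLASSES = {"RunStatusSensorCursor": RunStatusSensorCursor}


def serialize(cursor: RunStatusSensorCursor) -> str:
    """Persist the cursor as JSON tagged with its class name (as a tick would store it)."""
    return json.dumps({"__class__": type(cursor).__name__, **asdict(cursor)})


def deserialize(raw: str) -> RunStatusSensorCursor:
    """Load a cursor, accepting payloads written under the old class name."""
    data = json.loads(raw)
    name = data.pop("__class__")
    name = LEGACY_NAMES.get(name, name)  # backcompat fallback
    if name not in CURSOR_CLASSES:
        raise ValueError(f"unknown cursor class: {name}")
    return CURSOR_CLASSES[name](**data)


old_payload = '{"__class__": "PipelineSensorCursor", "record_id": 7, "update_timestamp": "2021-07-21T00:00:00"}'
print(deserialize(old_payload))
# RunStatusSensorCursor(record_id=7, update_timestamp='2021-07-21T00:00:00')
```

New ticks are written under the new name, while ticks stored before the rename still load. This is why the rename can land in the same diff without breaking existing sensors.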

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py, line 26

Never mind, I updated the diff to include it.

This revision is now accepted and ready to land. (Jul 22 2021, 12:41 AM)