
RunStatusSensorDefinition and @run_status_sensor
Closed · Public

Authored by yuhan on Jul 14 2021, 7:56 PM.

Details

Summary

Depends on D8896.

Introduces a generic pipeline_sensor so users can define sensors that react to pipeline run statuses such as starting, started, and success.
API:

@pipeline_sensor(pipeline_run_status=PipelineRunStatus.STARTED)
def my_pipeline_started_sensor(_):
    ...

It takes a pipeline_run_status argument, which is mapped internally to a Dagster event type (see the lookup in D8896).
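A minimal sketch of the kind of status-to-event lookup described above, assuming a plain dict keyed by run status. The enums are local stand-ins named after members of Dagster's PipelineRunStatus and DagsterEventType; they are not imported from Dagster, and the real table lives in D8896.

```python
from enum import Enum


class PipelineRunStatus(Enum):
    # Stand-in for dagster's PipelineRunStatus (subset only).
    STARTING = "STARTING"
    STARTED = "STARTED"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"


class DagsterEventType(Enum):
    # Stand-in for dagster's DagsterEventType (subset only).
    PIPELINE_STARTING = "PIPELINE_STARTING"
    PIPELINE_START = "PIPELINE_START"
    PIPELINE_SUCCESS = "PIPELINE_SUCCESS"
    PIPELINE_FAILURE = "PIPELINE_FAILURE"


# Hypothetical lookup: each run status a sensor can target maps to exactly
# one event type, and that event type is what the sensor actually watches for.
RUN_STATUS_TO_EVENT_TYPE = {
    PipelineRunStatus.STARTING: DagsterEventType.PIPELINE_STARTING,
    PipelineRunStatus.STARTED: DagsterEventType.PIPELINE_START,
    PipelineRunStatus.SUCCESS: DagsterEventType.PIPELINE_SUCCESS,
    PipelineRunStatus.FAILURE: DagsterEventType.PIPELINE_FAILURE,
}


def event_type_for(status: PipelineRunStatus) -> DagsterEventType:
    """Resolve the event type a run-status sensor should listen for."""
    return RUN_STATUS_TO_EVENT_TYPE[status]
```

Keeping the mapping one-to-one means the sensor's public argument can stay a run status while the evaluation loop only ever scans for a single event type.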

Implementation changes:

  • created PipelineSensorDefinition
  • built pipeline_failure_sensor on top of this generic sensor definition
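The layering in the second bullet can be pictured with a toy decorator pair. Everything here is hypothetical (toy_pipeline_sensor, toy_pipeline_failure_sensor, ToySensorDefinition, RunStatus); it only shows the shape of "a specialized decorator is the generic one with the status pinned", not the diff's actual code.

```python
from enum import Enum
from typing import Callable, NamedTuple


class RunStatus(Enum):
    # Hypothetical stand-in for dagster's PipelineRunStatus (subset only).
    STARTED = "STARTED"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"


class ToySensorDefinition(NamedTuple):
    # Hypothetical stand-in for PipelineSensorDefinition: a name, the run
    # status it reacts to, and the user function to call when it fires.
    name: str
    run_status: RunStatus
    fn: Callable[[dict], None]


def toy_pipeline_sensor(pipeline_run_status: RunStatus):
    """Generic decorator: turn fn into a sensor that reacts to one run status."""

    def decorator(fn: Callable[[dict], None]) -> ToySensorDefinition:
        return ToySensorDefinition(name=fn.__name__, run_status=pipeline_run_status, fn=fn)

    return decorator


def toy_pipeline_failure_sensor(fn: Callable[[dict], None]) -> ToySensorDefinition:
    """Specialized decorator: the generic one with the status pinned to FAILURE."""
    return toy_pipeline_sensor(pipeline_run_status=RunStatus.FAILURE)(fn)


@toy_pipeline_failure_sensor
def alert_on_failure(context: dict) -> None:
    print(f"run {context['run_id']} failed")
```

With this shape, any future status-specific sensor is a one-line wrapper, and all evaluation logic stays in the generic definition.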
Test Plan

bk (Buildkite)

Diff Detail

Repository
R1 dagster
Lint
Lint Not Applicable
Unit
Tests Not Applicable

Event Timeline

Harbormaster returned this revision to the author for changes because remote builds failed. Jul 14 2021, 8:20 PM
Harbormaster failed remote builds in B33719: Diff 41637!

The run status => event mapping feels a little weird to me, but it may result in the desired behavior.

For started events, is it expected that the sensor fires even though, by the time it fires, the run may already have changed state?

Thoughts on calling this pipeline_event_sensor? pipeline_sensor could be interpreted as a sensor that kicks off a pipeline.

> For started events, is it expected that the sensor fires even though, by the time it fires, the run may already have changed state?

Yes, because it looks at the events rather than the pipeline statuses, which I think is the desired behavior. If we looked at the run status instead, the evaluation could miss firing whenever the state changes faster than the tick interval.
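To illustrate the tick-interval argument, here is a toy simulation in plain Python (no Dagster APIs; the transition times and tick times are made up). The run passes through STARTED entirely between two sensor ticks: a status poller never observes it, while an event scanner that advances a cursor over the log still finds the STARTED event.

```python
from typing import List, Optional, Tuple

# Made-up transitions for one run: (timestamp in seconds, status entered).
TRANSITIONS: List[Tuple[float, str]] = [
    (0.0, "STARTING"),
    (1.0, "STARTED"),
    (5.0, "SUCCESS"),
]


def status_at(t: float) -> Optional[str]:
    """Current run status at time t: the latest transition at or before t."""
    current = None
    for ts, status in TRANSITIONS:
        if ts <= t:
            current = status
    return current


def poll_status(tick_times: List[float], target: str) -> int:
    """Status polling: fire when a tick sees the run newly in `target`."""
    fired, last_seen = 0, None
    for t in tick_times:
        seen = status_at(t)
        if seen == target and last_seen != target:
            fired += 1
        last_seen = seen
    return fired


def scan_events(tick_times: List[float], target: str) -> int:
    """Event scanning: each tick reads every event after the cursor, up to now."""
    fired, cursor = 0, float("-inf")
    for t in tick_times:
        for ts, status in TRANSITIONS:
            if cursor < ts <= t:
                if status == target:
                    fired += 1
                cursor = ts  # advance past the event just processed
    return fired


# Ticks at 0.5s and 6.0s straddle the whole STARTED window (1.0s to 5.0s).
ticks = [0.5, 6.0]
print(poll_status(ticks, "STARTED"))  # 0: no tick ever observes STARTED
print(scan_events(ticks, "STARTED"))  # 1: the STARTED event is still in the log
```

The trade-off raised in the review is visible here too: the event scanner fires for STARTED even though the run has already reached SUCCESS by the time the tick runs.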


Re pipeline_event_sensor: yes, or alternatively:

  • run_status_sensor
  • pipeline_status_sensor

If we go with pipeline_event_sensor, it may make more sense to take dagster_event as the argument instead of pipeline_run_status.

It gets a little hairy if we allow arbitrary Dagster event types, because the current implementation (with the RunShardedEventsCursor) implicitly assumes a one-to-one mapping between events and runs.

(This reminds me that our tests should exercise both the sharded and unsharded SQLite implementations.)

run_status_sensor or something like that makes sense to me.

  • renamed to run_status_sensor
  • added a test covering the interleaved-runs case for both run-sharded and non-run-sharded storage
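The shape of an interleaved-runs test over both storage layouts might look like the toy below. UnshardedLog, ShardedLog, tick, and run_interleave_case are all hypothetical, and the "seen runs" cursor is a deliberate simplification that leans on the one-event-per-run assumption discussed above; it is not how RunShardedEventsCursor works.

```python
from typing import Dict, List, Tuple

# Interleaved writes from two runs: run_b finishes before run_a does.
WRITES: List[Tuple[str, str]] = [
    ("run_a", "PIPELINE_START"),
    ("run_b", "PIPELINE_START"),
    ("run_b", "PIPELINE_SUCCESS"),
    ("run_a", "PIPELINE_SUCCESS"),
]


class UnshardedLog:
    """Toy non-run-sharded storage: a single table for every run."""

    def __init__(self) -> None:
        self.rows: List[Tuple[str, str]] = []

    def append(self, run_id: str, event_type: str) -> None:
        self.rows.append((run_id, event_type))

    def runs_with(self, event_type: str) -> List[str]:
        return [run_id for run_id, et in self.rows if et == event_type]


class ShardedLog:
    """Toy run-sharded storage: one table per run."""

    def __init__(self) -> None:
        self.shards: Dict[str, List[str]] = {}

    def append(self, run_id: str, event_type: str) -> None:
        self.shards.setdefault(run_id, []).append(event_type)

    def runs_with(self, event_type: str) -> List[str]:
        return [run_id for run_id, ets in self.shards.items() if event_type in ets]


def tick(log, event_type: str, seen: frozenset) -> Tuple[List[str], frozenset]:
    """One sensor evaluation: report runs not yet seen, return updated cursor."""
    new_runs = [run_id for run_id in log.runs_with(event_type) if run_id not in seen]
    return new_runs, seen | set(new_runs)


def run_interleave_case(log) -> List[str]:
    """Tick after every write, then once more; collect every run reported."""
    seen: frozenset = frozenset()
    reported: List[str] = []
    for run_id, event_type in WRITES:
        log.append(run_id, event_type)
        new_runs, seen = tick(log, "PIPELINE_SUCCESS", seen)
        reported.extend(new_runs)
    new_runs, seen = tick(log, "PIPELINE_SUCCESS", seen)
    reported.extend(new_runs)
    return reported


for storage in (UnshardedLog(), ShardedLog()):
    # Each run is reported exactly once, in the order its success landed.
    assert run_interleave_case(storage) == ["run_b", "run_a"]
```

Running the same scenario against both layouts is the point: a cursor that only works with globally increasing record ids would pass on the unsharded log and fail on the sharded one.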
yuhan retitled this revision from PipelineSensorDefinition and @pipeline_sensor to RunStatusSensorDefinition and @run_status_sensor. Jul 19 2021, 9:12 PM
yuhan added a reviewer: alangenfeld.
python_modules/dagster/dagster/core/definitions/pipeline_sensor.py:56

Would it make sense to call this RunStatusSensorContext?

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py:56

Oops, will update.

PipelineSensor -> RunStatusSensor

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py:26

Should this be RunStatusSensorCursor?

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py:26

This is an internal representation, and it gets persisted in the job ticks. I'll do a backcompat follow-up to rename it so we won't break users.

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py:26

What’s the reason for renaming it in this diff?

PipelineSensorCursor -> RunStatusSensorCursor + backcompat fallback
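One way to picture a rename with a backcompat fallback for a cursor that is persisted in job ticks: when deserializing, accept both the new and the old class name. This is a hypothetical sketch using plain JSON; the field names (record_id, update_timestamp) and the "__class__" key are assumptions, not the diff's actual serialization format.

```python
import json
from typing import NamedTuple

# Names accepted on read: the new name, plus the pre-rename name found in old ticks.
ACCEPTED_CLASS_NAMES = ("RunStatusSensorCursor", "PipelineSensorCursor")


class RunStatusSensorCursor(NamedTuple):
    # Hypothetical fields for illustration only.
    record_id: int
    update_timestamp: str

    def to_json(self) -> str:
        # New writes always use the new class name.
        return json.dumps({"__class__": "RunStatusSensorCursor", **self._asdict()})

    @staticmethod
    def from_json(raw: str) -> "RunStatusSensorCursor":
        data = json.loads(raw)
        class_name = data.pop("__class__", None)
        # Backcompat fallback: ticks persisted before the rename carry the old name.
        if class_name not in ACCEPTED_CLASS_NAMES:
            raise ValueError(f"unexpected cursor class: {class_name!r}")
        return RunStatusSensorCursor(**data)


old_tick_cursor = json.dumps(
    {"__class__": "PipelineSensorCursor", "record_id": 7, "update_timestamp": "2021-07-21T00:00:00"}
)
cursor = RunStatusSensorCursor.from_json(old_tick_cursor)
print(cursor.record_id)  # 7
```

Reading both names while writing only the new one lets existing ticks keep working, and the old name can be dropped once no persisted cursor still uses it.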

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py:26

Never mind, I updated the diff to include it.

This revision is now accepted and ready to land. Jul 22 2021, 12:41 AM