Page MenuHomeElementl

monitor sensor prototype
ClosedPublic

Authored by yuhan on Apr 16 2021, 12:08 AM.

Details

Summary

define a sensor that listens to the event logs and invokes user_fn on pipeline failure events:

@pipeline_failure_sensor
def slack_on_pipeline_failure(context: PipelineFailureSensorContext):
    user_fn()

add slack_on_pipeline_failure the same way as a regular sensor:

@repository
def my_repo():
    return [slack_on_pipeline_failure]

next steps: https://elementl.quip.com/0B9wAdCZAb3m/Pipeline-Sensors#VFTACAyPhp3

Test Plan

local dagit + local daemon + slack message

Screen Shot 2021-05-12 at 6.53.36 PM.png (2×5 px, 2 MB)

special cases:

  1. when there are interleaving runs, i.e. pipeline1 is running (starts before pipeline2 and will eventually fail), and pipeline2 has failed, it'd alert on pipeline 2 first and then pipeline 1 when it fails
  2. when cursor is none (the first time the user runs the sensor), it'd init the cursor to be the current timestamp and alert going forward:
    Screen Shot 2021-05-24 at 10.14.12 PM.png (345×1 px, 41 KB)
  3. when the daemon is down and back, it'd backfill alerts to where the cursor was at
  4. skip a monitor request:
    Screen Shot 2021-05-24 at 10.14.18 PM.png (331×1 px, 35 KB)

Diff Detail

Repository
R1 dagster
Branch
yuhan/pipeline-hook-sensor
Lint
Lint Passed
Unit
No Test Coverage

Event Timeline

There are a very large number of changes, so older changes are hidden. Show Older Changes
python_modules/dagster-test/dagster_test/toys/sensors.py
129 ↗(On Diff #37440)

ok will defer it

python_modules/dagster/dagster/core/definitions/monitor_sensor.py
48 ↗(On Diff #37440)

updated the diff just now - im imagining yielding MonitorRequest to differentiate the output from a regular sensor's RunRequest. this is also to thread info like origin_run_ids so

  • the sensor machinery can log meaningful info like "a monitor request has succeeded"
  • the sensor job state and tick can be updated with that info specific to this kind of sensors
  • dagit can show relevant runs where the runs are the originating runs rather than the target runs
yuhan marked an inline comment as done.

base decorator

naming spitballing:

  • pipeline_failure_sensor / pipeline_sensor which means it watches pipelines - accurate, no new noun
  • pipeline_failure_monitor / pipeline_monitor which creates a new noun "monitor" but the name "monitor" is more intuitive as it describes the use case not the machinery

as im writing, i think im leaning towards pipeline_failure_sensor / pipeline_sensor so we wont be introducing new nouns too early


If a monitor ultimately boils down to a sensor, is it worth introducing a new noun? I.e. should we just call this pipeline_failure_sensor?

i was thinking of subclassing the sensor def to a PipelineSensorDefinition later in a diff, and we can have a @pipeline_sensor to address a generic sensing pipeline (i.e. monitoring) case.
it'd take arbitrary dagster event types and a set of pipeline pointer (e.g. pipeline_name), react to events and yield MonitorRequest. something like:

@pipeline_sensor(
   dagster_event_types=[DagsterEventType.PIPELINE_START],
   pipeline_names=["my_pipeline"], # pipeline_names=None means it watches all pipelines
) 
def alert_on_pipeline_start:
   ...

We probably will want to add a version that monitors / senses failures for a particular pipeline or set of pipelines. How do we imagine the name of that fitting in with the name of this API?

@pipeline_failure_sensor(pipeline_names=["my_pipeline"])
def alert_on_pipeline_failure_whitelisted:
   ...

This is looking pretty good to me. Not sure about the naming, but don't have great alternatives (pipeline_failure_watcher? watch_pipeline_failure?).

As an aside, wondering if the user function could optionally return some metadata that gets reported back in the engine event.

What about @pipeline_failure_sensor? @alangenfeld might also have opinions.

python_modules/dagster/dagster/core/definitions/monitor_sensor.py
75 ↗(On Diff #37499)

Actually, how does this work.... Wouldn't this set the cursor beyond a run that doesn't fail immediately?

I'm ambivalent about pipeline_failure_sensor. I think it's nice that it exposes the underlying implementation, but not sure if we want to lock in the expectation that we support the same signature for the user function as you would a regular sensor evaluation function.

I'm ambivalent about pipeline_failure_sensor. I think it's nice that it exposes the underlying implementation, but not sure if we want to lock in the expectation that we support the same signature for the user function as you would a regular sensor evaluation function.

Yeah, I suppose it partly depends on how closely we want to connect the two concepts. Are there two separate user-facing concepts that share an underlying implementation, or is there a single flexible concept? When someone goes to view it in Dagit, should it show up in the list of sensors, or in a separate list of "monitors"? If someone wants to understand the object that's returned by the pipeline_failure_x decorator, do we direct them to the SensorDefinition API doc, or do we direct them to API doc for MonitorDefinition? If we later want to enable someone to define something that launches retries on pipeline failure, would we want to add a pipeline(s) param to the pipeline_failure_x, or would we want to expose a separate decorator and type?

bumping my early comment in case you missed it.

In D7476#206864, @yuhan wrote:

naming spitballing:

  • pipeline_failure_sensor / pipeline_sensor which means it watches pipelines - accurate, no new noun
  • pipeline_failure_monitor / pipeline_monitor which creates a new noun "monitor" but the name "monitor" is more intuitive as it describes the use case not the machinery

as im writing, i think im leaning towards pipeline_failure_sensor / pipeline_sensor so we wont be introducing new nouns too early


If a monitor ultimately boils down to a sensor, is it worth introducing a new noun? I.e. should we just call this pipeline_failure_sensor?

i was thinking of subclassing the sensor def to a PipelineSensorDefinition later in a diff, and we can have a @pipeline_sensor to address a generic sensing pipeline (i.e. monitoring) case.
it'd take arbitrary dagster event types and a set of pipeline pointer (e.g. pipeline_name), react to events and yield MonitorRequest. something like:

@pipeline_sensor(
   dagster_event_types=[DagsterEventType.PIPELINE_START],
   pipeline_names=["my_pipeline"], # pipeline_names=None means it watches all pipelines
) 
def alert_on_pipeline_start:
   ...

We probably will want to add a version that monitors / senses failures for a particular pipeline or set of pipelines. How do we imagine the name of that fitting in with the name of this API?

@pipeline_failure_sensor(pipeline_names=["my_pipeline"])
def alert_on_pipeline_failure_whitelisted:
   ...

i share all the concerns you two raised. i'm leaning towards pipeline_failure_sensor because

  • it accurately describes what it is without introducing new nouns.
  • one con of this is that it exposes the underlying implementation rather connecting to the use cases directly. but imo it's too early to decide a new noun as there are a few open questions that affect the decision.

here are a few next steps that will help us understand whether we need a new noun and what it would be called (sequenced by priority):

user facing wise:

  • in dagit, how would a "pipeline failure sensor" be represented? currently in mvp, it makes sense to share the sensor page ui. going forward, if we think we need to lift it up to be a new job type (for example, we would have 3 separate pages sensor, schedule, and monitor in dagit).
  • in python api, what a more generic pipeline sensor would look like? see the proposal above
  • in python api, do we return a sensor def or something else like a subclass like MonitorDefinition? - i think the name of the returned type should be consistent with the decorator name, i.e. @pipeline_failure_sensor returns SensorDefinition or @pipeline_failure_monitor returns MonitorDefinition.

internal representation wise, would this underlying implementation eventually diverge from SensorDefinition and be another definition that's at the same level of sensor/schedule?

im proposing naming it pipeline_failure_sensor for now without exporting it, prototyping the above steps and see if we need a new noun

python_modules/dagster/dagster/core/definitions/monitor_sensor.py
75 ↗(On Diff #37499)

good catch. this logic assumes runs dont interleave. will switch to scan the event table and cursor on record_id

im proposing naming it pipeline_failure_sensor for now without exporting it, prototyping the above steps and see if we need a new noun

That sounds like a good direction to me. Thanks for your detailed response. I hadn't seen that comment you linked - sounds like you had thought through a bunch of this stuff already.

fixing db query logic

python_modules/dagster/dagster/core/definitions/monitor_sensor.py
75 ↗(On Diff #37499)

was changing the evaluation logic to scan the event table and move the cursor to the curr record id on the event log table. but when hit the issue that i cant scan event logs without run id in sqlite. prototype here: https://dagster.phacility.com/D7929 - seems a dead end

will need to figure out another way to 1) move the cursor forward while 2) support the "runs not failing immediate" case. - also thinking of separating this query fix from this diff so to isolate the problem...


the "runs not failing immediate" case is:

when an evaluation starts and at the time:

  • pipeline1 is running (will eventually fail)
  • pipeline2 has failed
  • pipeline1 started before pipeline2

the machinery will alert on pipeline2 and move the the cursor to the latest origin_run_id which will be pipeline2's run id. so the later evaluations will query after pipeline2. it means when pipeline1 eventually fails, we won't alert on it, because pipeline1 record in the runs db existed before pipeline2.

fix query: add get_runs_by_timestamp

pipeline_failure_monitor -> pipeline_failure_sensor
MonitorSensor -> PipelineSensor

python_modules/dagster/dagster/core/storage/runs/in_memory.py
19–32 ↗(On Diff #38280)

we could change this out to be sqlite that opens in memory only DB copy using ":memory:" https://sqlite.org/inmemorydb.html

145–146 ↗(On Diff #38280)

nothing enforces that this method will only be called by sensors - i think an error message like this would age poorly. It should just report that its not implemented here because the in memory run storage doesnt (yet) track times

python_modules/dagster/dagster/core/storage/runs/sql_run_storage.py
264–291 ↗(On Diff #38280)

I think an alternate approach to this would be

  • add get_run_rows as an alternate to get_runs that returns the whole row
  • add a filter for updated after
  • add an orderby arg to get_runs / get_run_rows

I just think its weird to have this partial specialization that takes the same filters that get_runs does but is a separate function

python_modules/dagster-test/dagster_test/toys/sensors.py
110 ↗(On Diff #38280)

I saw a comment that said pipeline_failure_monitor -> pipeline_failure_sensor, but still seeing pipeline_failure_monitor here.

yuhan marked an inline comment as done.

get_run_rows(self, filters=None, limit=None, order_by=None, ascending=False) returns the entire rows as a list of dicts

yuhan added inline comments.
python_modules/dagster-test/dagster_test/toys/sensors.py
110 ↗(On Diff #38280)

updated. note that im keeping MonitorRequest to differentiate the RunRequest (PipelineRequest sounds odd). it's currently internal only so imo it's fine to keep the "monitor" name there until we set on a better name

python_modules/dagster/dagster/core/storage/runs/in_memory.py
19–32 ↗(On Diff #38280)

sounds good. will do a follow up diff for it

145–146 ↗(On Diff #38280)

will implement it the in mem sqlite follow up

python_modules/dagster/dagster/core/storage/runs/sql_run_storage.py
264–291 ↗(On Diff #38280)

get_run_rows then

python_modules/dagster-test/dagster_test/toys/sensors.py
110 ↗(On Diff #38280)

got it - sounds good

This is looking close!

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py
19 ↗(On Diff #38398)

Consider using a NamedTuple.

26 ↗(On Diff #38398)

Type annotations would be helpful.

30 ↗(On Diff #38398)

Requiring users to do events[0] is a little ugly. It's also not entirely clear to me what events refers to - e.g. is it all events inside the pipeline? Thoughts on making this PipelineFailureSensorContext and having this called failure_event?

42 ↗(On Diff #38398)

monitor -> sensor?

45 ↗(On Diff #38398)

Leave out this mention of MonitorRequest?

54 ↗(On Diff #38398)

monitor -> sensor?

110 ↗(On Diff #38398)

If we choose to later transition to a record_id-based approach, will need to think about what back-compat looks like.

134 ↗(On Diff #38398)

Might be worth mentioning that a sensor was involved. E.g. 'Sensor "{sensor_name}" processed failure of this run".

python_modules/dagster/dagster/core/definitions/run_request.py
59 ↗(On Diff #38398)

We chatted in person about this name. Not super high stakes because we're not surfacing it to users, but I do think it's worth picking a name that will make sense to internal readers. Maybe something like PipelineRunEventReaction? "Communicates that the sensor acted on an event from a particular pipeline run".

This revision now requires changes to proceed.Fri, May 28, 6:45 PM
yuhan added inline comments.
python_modules/dagster/dagster/core/definitions/pipeline_sensor.py
30 ↗(On Diff #38398)

see discussion on https://elementl.quip.com/0B9wAdCZAb3m/Pipeline-Sensors#VFTACA1GYf4 i think it makes sense for now to make the context be PipelineFailureSensorContext and expose failure_event so we don't have to worry about potential generic extension atm

42 ↗(On Diff #38398)

oops yes

110 ↗(On Diff #38398)

yes and i plan to land the stacked diffs all together. in terms of potential back-compat, we can have a backcompat cursor value check at the beginning if needed.

python_modules/dagster/dagster/core/definitions/run_request.py
59 ↗(On Diff #38398)

this really is just a metadata record for passing some values to the UI/logs. just spitballing: how about some generic form like:

  • SensorMetadata
  • SensorTickResult
  • EvaluationResult
yuhan marked 7 inline comments as done.
  • PipelineFailureSensorContext.failure_event
  • MonitorRequest -> PipelineRunReaction
  • move instance.report_engine_event from sensor's evaluation to sensor's machinery (daemon/sensor.py)
python_modules/dagster/dagster/core/definitions/run_request.py
59 ↗(On Diff #38398)

PipelineRunReaction then. and move the instance.report_engine_event piece into the sensor's machinery so the "reaction request" actually does something other than passing data.

This revision is now accepted and ready to land.Tue, Jun 1, 7:51 PM
yuhan edited the summary of this revision. (Show Details)

rebase

yuhan added a child revision: Restricted Differential Revision.

add updated_after to GraphenePipelineRunsFilter

This revision was automatically updated to reflect the committed changes.
yuhan removed a child revision: Restricted Differential Revision.