
Add sensor cursor (independent from run_key), to skip unnecessary sensor evaluation
Closed · Public

Authored by prha on Apr 26 2021, 11:00 PM.
Tags
None

Details

Summary

Currently there's no way to reduce the search space of a sensor evaluation function without
requesting a run. This is because there's no way to pass a run_key to advance a cursor when you
want to skip. This diff separates the idempotence mechanism (run_key) from the sensor
evaluation checkpointing (cursor).
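To make the separation concrete, here is a minimal pure-Python sketch, not dagster code: `RunRequest`, `SkipReason`, and `evaluate_sensor` are hypothetical stand-ins, and events are assumed to be `(id, kind)` pairs with integer ids. The run_key only identifies runs for de-duplication, while the returned cursor records how far the sensor has scanned, including on ticks that skip.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union


@dataclass(frozen=True)
class RunRequest:
    # Hypothetical stand-in: run_key only de-duplicates launched runs.
    run_key: str


@dataclass(frozen=True)
class SkipReason:
    # Hypothetical stand-in: explains why this tick requested no run.
    message: str


Event = Tuple[int, str]  # (event id, event kind)


def evaluate_sensor(
    events: List[Event], cursor: Optional[int]
) -> Tuple[List[Union[RunRequest, SkipReason]], Optional[int]]:
    """Scan only events newer than the cursor and request a run per 'new_file'.

    Returns the sensor output plus the cursor to store for the next tick. The
    cursor moves past every scanned event whether or not a run is requested,
    so a tick that skips still shrinks the next tick's search space.
    """
    last_seen = -1 if cursor is None else cursor
    scanned = [(eid, kind) for eid, kind in events if eid > last_seen]
    requests: List[Union[RunRequest, SkipReason]] = [
        RunRequest(run_key=f"file-{eid}") for eid, kind in scanned if kind == "new_file"
    ]
    # With nothing new to scan, keep the previous cursor unchanged.
    next_cursor = max((eid for eid, _ in scanned), default=cursor)
    if requests:
        return requests, next_cursor
    return [SkipReason(f"no new files in {len(scanned)} scanned events")], next_cursor
```

On a first tick over two heartbeat events this returns a SkipReason and a cursor of 2, so the next tick never rescans them; advancing on a skip is what a run_key alone could not express.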

Test Plan

bk

Diff Detail

Repository
R1 dagster
Lint
Lint Not Applicable
Unit
Tests Not Applicable

Event Timeline

Harbormaster returned this revision to the author for changes because remote builds failed. (Apr 26 2021, 11:21 PM)
Harbormaster failed remote builds in B29516: Diff 36235!

add CLI tool to set / wipe sensor cursor

A couple thoughts:

A sensor might rely on multiple cursors. For example, maybe a pipeline consumes data that's produced by two upstream pipelines, and the pipeline should only trigger when both of them have updated an asset. With the proposed solution, users could come up with a scheme for mushing these cursors into a single string, but that could get ugly.

Is it meaningful to attach the cursor update to the RunRequest / SkipReason, rather than yielding some other event? I'm imagining a situation like the following:

  • I want to launch pipeline Y every time there's a failed run of pipeline X.
  • My cursor is a cursor into the entire event log. When my sensor runs, I query for all events since my cursor.
  • One of the times this happens, I uncover an event that's a pipeline run failure for pipeline X. After that event, there are also a bunch of events I scan that I determine are not pipeline run failures for pipeline X. I want to update my cursor to be at the end of all those events, as well as kick off a pipeline run corresponding to that particular failure of pipeline X.
  • Should I put my cursor on the RunRequest? Should I yield both a RunRequest and a SkipReason?
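The scenario in these bullets can be sketched as a pure function, under assumptions: `LogEvent`, the `"PIPELINE_FAILURE"` string, the `y-after-...` run-key format, and `scan_for_failures` are all hypothetical, not dagster's event-log API. It shows that the position the cursor should move to (the last scanned event) is generally not the position of any event that produced a run request.

```python
from typing import List, NamedTuple, Optional, Tuple


class LogEvent(NamedTuple):
    # Hypothetical event-log record; not dagster's event log entry type.
    event_id: int
    pipeline_name: str
    event_type: str
    run_id: str


def scan_for_failures(
    log: List[LogEvent], cursor: Optional[int], watched_pipeline: str
) -> Tuple[List[str], Optional[int]]:
    """Return run keys for pipeline Y (one per failed run of watched_pipeline)
    and the new cursor position.

    The cursor lands on the last *scanned* event, not on the failure that
    triggered a run, so trailing non-matching events are never re-read.
    """
    start = -1 if cursor is None else cursor
    scanned = [e for e in log if e.event_id > start]
    run_keys = [
        f"y-after-{e.run_id}"
        for e in scanned
        if e.pipeline_name == watched_pipeline and e.event_type == "PIPELINE_FAILURE"
    ]
    new_cursor = max((e.event_id for e in scanned), default=cursor)
    return run_keys, new_cursor
```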

Re: string cursor... I don't see why we wouldn't have manual construction of the string cursor. With a generic sensor evaluation function, we have no other way of representing intermediate state. Even if we are using a sensor to detect the state of two distinct assets, it's still a single evaluation function and we need some serialized way of representing the progress. At the time that we provide an atomic asset sensor (for a single asset) and we want to provide a way of combining them into a multi-asset sensor, maybe we need to provide some encoding/decoding utility functions, but this feels like the right API for the generic case. Feel free to propose alternatives if you're thinking of something specific.
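A minimal sketch of the kind of encoding/decoding helper described here, applied to the two-upstream example from the first comment: the composite cursor is assumed to be a JSON object mapping each upstream asset to the last event id consumed. `encode_cursor`, `decode_cursor`, and `both_updated` are hypothetical names, not dagster utilities.

```python
import json
from typing import Dict, Optional, Tuple


def decode_cursor(cursor: Optional[str]) -> Dict[str, int]:
    """Decode a composite cursor; None (first tick) means nothing seen yet."""
    return json.loads(cursor) if cursor else {}


def encode_cursor(positions: Dict[str, int]) -> str:
    """Encode positions; sort_keys makes the string deterministic."""
    return json.dumps(positions, sort_keys=True)


def both_updated(cursor: Optional[str], latest: Dict[str, int]) -> Tuple[bool, str]:
    """Return (should_request_run, cursor_to_store).

    `latest` maps each upstream asset to its newest event id. A run is
    requested only when every asset has advanced past the recorded position;
    the cursor then moves to the latest positions. Otherwise the old
    positions are kept, so a partial update is not treated as consumed.
    """
    seen = decode_cursor(cursor)
    if all(latest[name] > seen.get(name, -1) for name in latest):
        return True, encode_cursor(latest)
    return False, encode_cursor(seen)
```

A partial update (only one asset advanced) leaves the stored cursor unchanged, so the sensor keeps waiting for the other asset.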

Re: SkipReason/RunRequest vs. some extra object just to carry the cursor... I've been thinking of the RunRequest as the basic output of the sensor evaluation function. We iterate through the run requests in order and update the recorded cursor as we submit runs. The sensor evaluation function can yield a SkipReason if and only if it yields no RunRequests (otherwise we generate an error). While yielding a separate UpdateCursor event (or something like that) might make it clearer how to update the cursor, I think it might muddy the API of the sensor evaluation function as primarily requesting runs (or explicitly skipping). I'm curious how you came to the conclusion that you'd need to yield both a RunRequest and a SkipReason? I don't have the freshest pair of eyes on this, but the API doesn't seem that confusing to me.
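The tick handling described in this comment, for the interim design in which `RunRequest` and `SkipReason` each carry an optional cursor, might look roughly like this. It is a hypothetical sketch rather than the daemon's actual code: the classes, `SensorOutputError`, and `process_tick` are stand-ins, and launched run keys are kept in an in-memory set.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Set, Tuple, Union


@dataclass(frozen=True)
class RunRequest:
    # Hypothetical stand-in for the interim design: the cursor rides on the request.
    run_key: str
    cursor: Optional[str] = None


@dataclass(frozen=True)
class SkipReason:
    # Hypothetical stand-in: a skip may also carry a cursor.
    message: str
    cursor: Optional[str] = None


class SensorOutputError(Exception):
    """Raised when one evaluation yields both RunRequests and a SkipReason."""


def process_tick(
    outputs: Iterable[Union[RunRequest, SkipReason]],
    stored_cursor: Optional[str],
    launched: Set[str],
) -> Tuple[List[str], Optional[str]]:
    """Validate one evaluation's output, submit its runs in order, and return
    (newly submitted run keys, cursor to record)."""
    items = list(outputs)
    requests = [o for o in items if isinstance(o, RunRequest)]
    skips = [o for o in items if isinstance(o, SkipReason)]
    if requests and skips:
        raise SensorOutputError("a SkipReason is only allowed when no RunRequests are yielded")

    cursor = stored_cursor
    for skip in skips:
        if skip.cursor is not None:
            cursor = skip.cursor

    submitted: List[str] = []
    for request in requests:
        if request.run_key not in launched:  # run_key provides idempotence
            launched.add(request.run_key)
            submitted.append(request.run_key)
        if request.cursor is not None:  # cursor follows the requests in order
            cursor = request.cursor
    return submitted, cursor
```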

> Feel free to propose alternatives if you're thinking of something specific.

I guess the alternative would be some sort of key/value thing. E.g.

yield RunRequest(..., cursors={"snowflake.orders_table": latest_orders_id, "snowflake.users_table": latest_users_event_id})

Or

context.update_cursor("snowflake.orders_table", latest_orders_id)
context.update_cursor("snowflake.users_table", latest_users_event_id)

One advantage is that we could display them more nicely in Dagit, if we wanted to.
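The key/value proposal could be prototyped on top of a single stored string, which would keep storage unchanged while letting a UI list each named position separately. `KeyedCursorContext`, `get_cursor`, and `serialize` are hypothetical; the two-argument `update_cursor(key, value)` mirrors the proposal in this comment, not an existing API, and values are assumed to be strings.

```python
import json
from typing import Dict, Optional


class KeyedCursorContext:
    """Hypothetical context for the key/value proposal; not a dagster class.

    Named cursors live in a dict during evaluation and are persisted as one
    JSON string, so each key can be updated without touching the others.
    """

    def __init__(self, stored: Optional[str] = None):
        self._cursors: Dict[str, str] = json.loads(stored) if stored else {}

    def get_cursor(self, key: str) -> Optional[str]:
        """Return the position recorded for `key`, or None if never set."""
        return self._cursors.get(key)

    def update_cursor(self, key: str, value: str) -> None:
        """Record a new position for one named stream."""
        self._cursors[key] = value

    def serialize(self) -> str:
        """Produce the single string that would actually be stored."""
        return json.dumps(self._cursors, sort_keys=True)
```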

> I'm curious how you came to the conclusion that you'd need to yield both a RunRequest and a SkipReason?

I think I came to it sort of like this:

  • A cursor is kind of an abstraction over the event stream.
  • If I process a single event and want to kick off a run based on it, then I yield a RunRequest with that event ID as the cursor.
  • If I process a single event and choose not to kick off a run based on it, then I yield a SkipReason with that event ID as the cursor.
  • If I process two events and choose to kick off a run based on one of them, but not kick off a run based on the other, shouldn't I yield the union of what I'd be yielding in the above cases?

I agree that it doesn't really make sense if I try to think more deeply about it, but I think the confusion is a bit of a symptom of putting the cursor on the RunRequest / SkipReason - it makes it seem like they must be related.

Oh, I see.

I guess cursor might have a specific connotation with a database cursor or a cursor for a single event stream. I tend to think of it more generically as meaning more of a "checkpoint" and only having semantics towards the sensor itself, and not necessarily any underlying data that the sensor might be inspecting.

> I guess cursor might have a specific connotation with a database cursor or a cursor for a single event stream. I tend to think of it more generically as meaning more of a "checkpoint" and only having semantics towards the sensor itself, and not necessarily any underlying data that the sensor might be inspecting.

Yeah, I guess it depends on what metaphor / mental model we want people to have for their sensors. I could imagine a couple directions:

  • An "opinionated" mental model where we expect users to think of sensors as stream listeners. We could explain a sensor as consuming multiple streams of events and name the parameter something that communicates that it points to a position in a stream.
  • An "un-opinionated" mental model where we don't try to impose any particular metaphor. In that case, maybe we just call it something like "persistent_state"?

Maybe worth an in-person discussion.

  • change context to add update_cursor
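With this update the cursor moves off `RunRequest`/`SkipReason` and onto the evaluation context, which decouples "what runs to request" from "how far the sensor has scanned". The sketch below imitates that shape with a hypothetical `SensorContext` exposing a `cursor` property and `update_cursor`; it is not dagster's context class, whose exact signatures may differ, and the orders example, `orders_sensor`, and `tick` are illustrative assumptions.

```python
from dataclasses import dataclass
from typing import Iterator, List, Optional, Tuple


@dataclass(frozen=True)
class RunRequest:
    # Hypothetical stand-in for the sensor's run request.
    run_key: str


class SensorContext:
    """Hypothetical evaluation context: exposes the stored cursor and lets the
    sensor body replace it, independently of whatever it yields."""

    def __init__(self, cursor: Optional[str] = None):
        self._cursor = cursor

    @property
    def cursor(self) -> Optional[str]:
        return self._cursor

    def update_cursor(self, cursor: Optional[str]) -> None:
        self._cursor = cursor


Order = Tuple[int, bool]  # (order id, is_test_order)


def orders_sensor(context: SensorContext, orders: List[Order]) -> Iterator[RunRequest]:
    """Request one run covering new real orders; ignore test orders."""
    last_seen = int(context.cursor) if context.cursor is not None else 0
    scanned = sorted(o for o in orders if o[0] > last_seen)
    real_ids = [oid for oid, is_test in scanned if not is_test]
    if real_ids:
        yield RunRequest(run_key=f"orders-{real_ids[0]}-{real_ids[-1]}")
    if scanned:
        # Advance past test orders too, even on ticks that yield nothing.
        context.update_cursor(str(scanned[-1][0]))


def tick(stored_cursor: Optional[str], orders: List[Order]) -> Tuple[List[RunRequest], Optional[str]]:
    """Run one evaluation and return its requests plus the cursor to persist."""
    context = SensorContext(stored_cursor)
    requests = list(orders_sensor(context, orders))
    return requests, context.cursor
```

A tick that sees only test orders yields nothing yet still advances the cursor, which is exactly the "advance without requesting a run" case that motivated this diff.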
python_modules/dagster/dagster/core/definitions/sensor.py:80

A cursor is used to represent a position in a stream of events that the sensor is listening to. This allows code inside the sensor to query using the cursor and avoid processing the same events repeatedly.

For sensors that consume data from multiple streams - e.g. that make the decision of whether to submit a run based on updates to multiple assets - the cursor might represent a composite of the positions in each stream. 

Thoughts?

python_modules/dagster/dagster/core/definitions/sensor.py:80

I just don't want to be prescriptive in defining what an event stream is. How about we add the example of multiple streams the first time we get a question about it in #dagster-support?

An optional string value that can be used as a cursor when the sensor is processing an event stream. This allows the sensor to keep track of progress and avoid duplicate work across sensor evaluations. One common use of this might be to encode/decode serializable state data between sensor evaluations.

@sandyryza does evaluation_cursor seem better or more descriptive?

Sorry for being slow to review this. This isn't exactly new, but my broad concern remains that:

  • I think the multiple-upstream case is a first-class use case, not an edge case that we need to wait to discover is important.
  • I don't think what's provided here is friendly to users who want to implement that.

A few of the ways we could make it friendlier:

  • Orienting the API around some sort of collection.
  • Talking about it in the API doc.
  • Adding some examples.

It doesn't necessarily need to be in this diff, but IMO it's worth doing at least one of those before considering this "complete".

I know that you and I see things differently. I'm OK with this getting merged, but I think it would be worthwhile to solicit an opinion from at least one other person on the team.

> @sandyryza does evaluation_cursor seem better or more descriptive?

I think I prefer cursor to evaluation_cursor.

sandyryza added a subscriber: schrockn.

FYI @schrockn because public API change

This revision is now accepted and ready to land. (May 11 2021, 11:16 PM)
This revision was landed with ongoing or failed builds. (May 13 2021, 5:43 PM)
This revision was automatically updated to reflect the committed changes.