
pipeline sensor - fetch events without run id + sqlite trick

Authored by yuhan on Thu, Jun 3, 9:14 AM.



depends on D7476
address the clock skew issue by updating the pipeline sensor query to use context.instance.event_log_storage.get_event_rows:

  • for non-sqlite event storage: it cursors on record_id (auto-incremented by the db, which eliminates the clock skew risk)
  • for sqlite event storage: it cursors on update_timestamp. SqliteEventLogStorage overrides get_event_rows to work around run sharding: it gets a list of run ids from the run storage first and then opens run-based connections one at a time, i.e. it costs 1 run storage query plus N per-run event queries.
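The two strategies above can be sketched roughly as follows. All class and method names here are illustrative stand-ins for this discussion, not the actual dagster storage APIs:

```python
from typing import Dict, List, NamedTuple

class EventRow(NamedTuple):
    record_id: int          # auto-incremented by the db
    update_timestamp: float
    run_id: str

class NonSqliteEventStorage:
    """Single global table: one query, cursored on record_id (no clock skew risk)."""
    def __init__(self, rows: List[EventRow]):
        self._rows = rows

    def get_event_rows(self, after_record_id: int) -> List[EventRow]:
        return [r for r in self._rows if r.record_id > after_record_id]

class SqliteEventStorage:
    """Run-sharded: one db per run, so record_id is not globally ordered."""
    def __init__(self, shards: Dict[str, List[EventRow]]):
        self._shards = shards  # run_id -> that run's event rows

    def run_ids_updated_after(self, ts: float) -> List[str]:
        # 1 run storage query
        return [run_id for run_id, rows in self._shards.items()
                if any(r.update_timestamp > ts for r in rows)]

    def get_event_rows(self, after_timestamp: float) -> List[EventRow]:
        rows: List[EventRow] = []
        # N per-run queries, opening one run-based connection at a time
        for run_id in self.run_ids_updated_after(after_timestamp):
            rows.extend(r for r in self._shards[run_id]
                        if r.update_timestamp > after_timestamp)
        return rows
```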

how we init the cursor:

  • created a serde-able PipelineFailureSensorCursor which includes record_id and update_timestamp, and handles backcompat
  • the cursor will be initialized to (record_id=most_recent_event_id_from_event_storage, update_timestamp=curr_timestamp) so we only alert on future runs
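The initialization described above can be sketched like this (a minimal illustration; the real class carries serdes and backcompat handling, and init_cursor is a hypothetical helper name):

```python
import time
from typing import NamedTuple

class PipelineFailureSensorCursor(NamedTuple):
    record_id: int
    update_timestamp: str

def init_cursor(most_recent_event_id: int) -> PipelineFailureSensorCursor:
    # Start at the latest existing event id and the current wall-clock time,
    # so the sensor only alerts on runs that fail after this point.
    return PipelineFailureSensorCursor(
        record_id=most_recent_event_id,
        update_timestamp=str(time.time()),
    )
```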
Test Plan
  • bk
  • local dagit+slack alerts using ConsolidatedSqliteEventLogStorage and SqliteEventLogStorage

Diff Detail

R1 dagster
Lint Passed
No Test Coverage

Event Timeline

Harbormaster returned this revision to the author for changes because remote builds failed. Thu, Jun 3, 9:51 AM
Harbormaster failed remote builds in B31583: Diff 38875!
yuhan requested review of this revision. Thu, Jun 3, 6:10 PM

This is looking close.


Could we use typing.NamedTuple to make this easier to read?

from json import JSONDecodeError
from typing import NamedTuple

from dagster.serdes import (
    deserialize_json_to_dagster_namedtuple,
    serialize_dagster_namedtuple,
)

class PipelineFailureSensorCursor(NamedTuple):
    record_id: int
    update_timestamp: str

    @staticmethod
    def is_valid(json_str):
        if json_str is None:
            return False
        try:
            obj = deserialize_json_to_dagster_namedtuple(json_str)
            return isinstance(obj, PipelineFailureSensorCursor)
        except JSONDecodeError:
            return False

    def to_json(self):
        return serialize_dagster_namedtuple(self)

    @staticmethod
    def from_json(json_str):
        return deserialize_json_to_dagster_namedtuple(json_str)

recend -> recent


Small thing, but it's a little bit dangerous to include a comment about how a function is implemented at a call site for that function - the implementation could change and the comment would end up inaccurate.


I don't think the parentheses around record_id, event_record are required.


It's a little bit weird to just ignore this parameter. I know it puts us in a tough spot, because if we honor it, we face the clock skew issue. Not sure what the best solution is - maybe the event log should expose some sort of boolean that expresses whether it can be queried by cursor?

yuhan marked 3 inline comments as done.



typing.NamedTuple doesn't check types at runtime. we need a runtime type check for is_valid: for example, if we change the type of update_timestamp, we would want is_valid to return False (dagster.check will error and therefore the func will return False, but typing.NamedTuple will pass and we would miss this backcompat check)
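The point about typing.NamedTuple can be demonstrated with plain stdlib code (this is an illustration of the tradeoff, not the dagster.check implementation):

```python
from typing import NamedTuple

class Cursor(NamedTuple):
    record_id: int
    update_timestamp: str

# typing.NamedTuple annotations are NOT enforced at runtime:
# this constructs without error despite both fields having the wrong type.
bad = Cursor(record_id="oops", update_timestamp=123)

# So a backcompat guard like is_valid needs an explicit runtime check:
def is_valid(obj) -> bool:
    return (
        isinstance(obj, Cursor)
        and isinstance(obj.record_id, int)
        and isinstance(obj.update_timestamp, str)
    )
```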


good point. a similar comment is also included in SqliteEventLogStorage.get_event_rows. will refer to that instead of expanding here


how about making it _run_updated_after, so that when someone passes it in, they really mean to use it.

alternatively, we could warn that run_updated_after is ignored - but the downside is that the warning would show up basically every time, because sqlite is the rare case.

going with the private arg for now.
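The private-argument convention being discussed could look something like this sketch (the signature and return values are hypothetical, for illustration only):

```python
# An underscore-prefixed kwarg signals "internal/opt-in": callers who pass
# _run_updated_after explicitly accept that only some storages honor it.
def get_event_records(after_cursor=None, _run_updated_after=None):
    if _run_updated_after is not None:
        # run-sharded sqlite path: filter by run update timestamp
        return ("by_timestamp", _run_updated_after)
    # default path: cursor on the auto-incremented record id
    return ("by_record_id", after_cursor)
```

This avoids emitting a noisy warning on every call while still making the unusual behavior visible at the call site.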


Ah I see. That will raise an error though, not False, right? Because check errors are not JSONDecodeErrors?


Agree on the downside about that warning. @alangenfeld do you have an opinion on the private arg? I don't love it, but I'd be ok with it if you're OK with it.


ya i updated the except to capture all Exception. open to suggestion tho


should still feed mypy types using the less-nice NamedTuple set-up


what do you think about generalizing the cursor and allowing it to flow through in after_cursor (and probably before_cursor)? EventsCursor, GlobalEventsCursor, AllEventsCursor

then each impl can unpack and discard/use what it wants, and it's contained within the impl


i think that's a brilliant idea! will do

yuhan marked 2 inline comments as done.
  • EventsCursor(id, run_updated_after)
  • remove before_cursor because we are not using it, and implementing it in sqlite would require a breaking change to run storage (run storage now only allows updated_after). if we need it later, we can add it back and update PipelineRunsFilter to take run_updated_before.
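The generalized EventsCursor(id, run_updated_after) shape could look roughly like this (a minimal sketch; the unpacking helpers are hypothetical names, not the actual storage methods):

```python
from typing import NamedTuple, Optional

class EventsCursor(NamedTuple):
    id: int
    run_updated_after: Optional[float] = None

# A non-sqlite storage unpacks only the record id and discards the rest:
def non_sqlite_after_clause(cursor: EventsCursor) -> int:
    return cursor.id

# The run-sharded sqlite storage uses the run-update timestamp instead:
def sqlite_after_clause(cursor: EventsCursor) -> float:
    assert cursor.run_updated_after is not None
    return cursor.run_updated_after
```

Each implementation's choice of which field to honor stays contained within that implementation, which is the point of the wrapper.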



It's best to be as specific as possible with excepts. I think except (JSONDecodeError, CheckError) would cover all cases?

This revision is now accepted and ready to land. Tue, Jun 8, 3:58 PM

nit: mypy the new functions


should we have both this and EventsCursor? they seem kind of redundant (if we made EventsCursor serdes)

I think except (JSONDecodeError, CheckError) would cover all cases?

instead of CheckError I recently added a DeserializationError in the serdes module

yuhan marked an inline comment as done.
  • depends on D8299
  • specific except
  • mypy

PipelineFailureSensorCursor is the sensor's cursor while EventsCursor is a wrapper for the storage cursor. At the moment they are kinda redundant (PipelineFailureSensorCursor does the serdes plus the validation and init), but I'd prefer to keep these two separate to avoid future confusion.

If later we think they will be the same, we can drop this wrapper as it's just an internal representation.