depends on D7476
Addresses the clock skew issue by switching the pipeline failure sensor query to context.instance.event_log_storage.get_event_rows:
- for non-sqlite event storage: it cursors on record_id (auto-incremented by the DB, which eliminates the clock skew risk)
- for sqlite event storage: it cursors on update_timestamp. SqliteEventLogStorage overrides get_event_rows to work around run sharding: it first gets a list of run ids from the run storage, then opens one run-based connection at a time, i.e. it costs 1 run storage query plus N per-run event queries.
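A rough sketch of the sharded sqlite path and its 1 + N query cost. This is not the SqliteEventLogStorage code: `EventRow`, `fetch_rows_sharded`, and the two callables are hypothetical stand-ins for the run storage query and the per-run event query.

```python
from typing import Callable, Iterable, List, NamedTuple, Tuple


class EventRow(NamedTuple):
    # Hypothetical row shape, reduced to what the sketch needs.
    run_id: str
    update_timestamp: float
    event_type: str


def fetch_rows_sharded(
    list_run_ids: Callable[[float], Iterable[str]],
    fetch_run_events: Callable[[str], Iterable[EventRow]],
    after_timestamp: float,
) -> Tuple[List[EventRow], int]:
    """Collect rows newer than the cursor across per-run shards.

    Returns the rows plus the number of storage queries issued.
    """
    rows: List[EventRow] = []
    query_count = 1  # the single run storage query
    for run_id in list_run_ids(after_timestamp):
        query_count += 1  # one event query per run shard
        rows.extend(
            row
            for row in fetch_run_events(run_id)
            if row.update_timestamp > after_timestamp
        )
    # Shards are read one at a time, so order the merged result by timestamp.
    rows.sort(key=lambda row: row.update_timestamp)
    return rows, query_count
```

With two run shards this issues three queries in total, which is why the sqlite path stays on update_timestamp instead of a global record_id.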
how we init the cursor:
- created a serde-able PipelineFailureSensorCursor which includes `record_id` and `update_timestamp`, and handles backcompat
- the cursor will be init to (record_id=most_recent_event_id_from_event_storage, update_timestamp=curr_timestamp) so we only alert on future runs
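A minimal sketch of the cursor shape described above. It is not the actual PipelineFailureSensorCursor: the class name `FailureSensorCursor`, the JSON encoding, and the assumption that a legacy cursor is a bare timestamp string are all illustrative guesses.

```python
import json
from typing import NamedTuple, Optional


class FailureSensorCursor(NamedTuple):
    # Hypothetical stand-in for PipelineFailureSensorCursor.
    record_id: Optional[int]
    update_timestamp: Optional[str]

    def to_json(self) -> str:
        return json.dumps(
            {"record_id": self.record_id, "update_timestamp": self.update_timestamp}
        )

    @staticmethod
    def from_json(raw: str) -> "FailureSensorCursor":
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            data = None
        if isinstance(data, dict):
            return FailureSensorCursor(
                data.get("record_id"), data.get("update_timestamp")
            )
        # Backcompat (assumed): anything else is treated as an old-style
        # cursor that held only a timestamp string.
        return FailureSensorCursor(None, raw)


def initial_cursor(latest_record_id: int, now: str) -> FailureSensorCursor:
    # Start at the newest stored event and the current time, so only runs
    # that fail after the sensor starts produce alerts.
    return FailureSensorCursor(record_id=latest_record_id, update_timestamp=now)
```

Carrying both fields lets the same cursor serve either storage backend: non-sqlite reads record_id, sqlite reads update_timestamp.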