
pipeline sensor - fetch events without run id + sqlite trick
ClosedPublic

Authored by yuhan on Jun 3 2021, 9:14 AM.

Details

Summary

depends on D7476
addresses the clock skew issue by switching the pipeline sensor query to context.instance.event_log_storage.get_event_rows

  • for non-sqlite event storage: it cursors on record_id (auto-incremented by the db, which eliminates the clock skew risk)
  • for sqlite event storage: it cursors on update_timestamp. SqliteEventLogStorage overrides get_event_rows to work around the run sharding: it gets a list of run ids from the run storage first and then opens run-based connections one at a time, i.e. it costs 1 run storage query and N per-run event queries.
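
The two cursoring strategies above can be sketched roughly as follows. This is a minimal illustration under assumptions, not the code in this diff: the in-memory lists and dicts, the `EventRow` type, and the function names `fetch_after_record_id` and `fetch_sharded_after_timestamp` are all hypothetical stand-ins for the real storage layer.

```python
from typing import Dict, List, NamedTuple


class EventRow(NamedTuple):
    record_id: int  # auto-incremented by the database (hypothetical row shape)
    run_id: str
    message: str


def fetch_after_record_id(rows: List[EventRow], after_id: int) -> List[EventRow]:
    """Non-sqlite case: one table, cursored on the monotonically increasing id."""
    return [row for row in rows if row.record_id > after_id]


def fetch_sharded_after_timestamp(
    run_update_times: Dict[str, float],
    shards: Dict[str, List[EventRow]],
    updated_after: float,
) -> List[EventRow]:
    """Sqlite case: events are sharded per run, so ids are not globally ordered."""
    # 1 run storage query: which runs were updated after the cursor?
    run_ids = [rid for rid, ts in run_update_times.items() if ts > updated_after]
    events: List[EventRow] = []
    # N per-run queries: visit each matching run's shard one at a time.
    for run_id in run_ids:
        events.extend(shards[run_id])
    return events
```

The second function makes the cost visible: one lookup over runs, then one shard read per matching run.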

how we init the cursor:

  • created a serde-able PipelineFailureSensorCursor which includes record_id and update_timestamp, and handles backcompat
  • the cursor will be init to (record_id=most_recent_event_id_from_event_storage, update_timestamp=curr_timestamp) so we would only alert on future runs
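
A rough sketch of the init and backcompat behavior described above, using plain `json` rather than dagster's serdes. The names `SensorCursor` and `load_or_init_cursor` are hypothetical, and the assumption that an old-format cursor is a bare timestamp string is for illustration only.

```python
import json
from typing import NamedTuple, Optional


class SensorCursor(NamedTuple):
    record_id: int
    update_timestamp: str

    def to_json(self) -> str:
        return json.dumps(self._asdict())


def load_or_init_cursor(
    raw: Optional[str], latest_record_id: int, now_iso: str
) -> SensorCursor:
    """Parse a stored cursor; fall back to a fresh one for missing or old-format values."""
    try:
        data = json.loads(raw) if raw is not None else None
        if (
            isinstance(data, dict)
            and isinstance(data.get("record_id"), int)
            and isinstance(data.get("update_timestamp"), str)
        ):
            return SensorCursor(data["record_id"], data["update_timestamp"])
    except json.JSONDecodeError:
        pass  # not JSON at all, e.g. an older cursor format
    # Start at "now" so only runs that fail after this point are reported.
    return SensorCursor(latest_record_id, now_iso)
```

Here a missing or unreadable cursor is treated the same way: the sensor starts from the most recent event id and the current time instead of replaying history.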

Test Plan

  • bk
  • local dagit+slack alerts using ConsolidatedSqliteEventLogStorage and SqliteEventLogStorage

Diff Detail

Repository
R1 dagster
Lint
Lint Not Applicable
Unit
Tests Not Applicable

Event Timeline

Harbormaster returned this revision to the author for changes because remote builds failed. Jun 3 2021, 9:51 AM
Harbormaster failed remote builds in B31583: Diff 38875!

This is looking close.

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py
43

Could we use typing.NamedTuple to make this easier to read?

class PipelineFailureSensorCursor(NamedTuple):
    record_id: int
    update_timestamp: str

    @staticmethod
    def is_valid(json_str):
        if json_str is None:
            return False
        try:
            obj = deserialize_json_to_dagster_namedtuple(json_str)
            return isinstance(obj, PipelineFailureSensorCursor)
        except JSONDecodeError:
            return False

    def to_json(self):
        return serialize_dagster_namedtuple(self)

    @staticmethod
    def from_json(json_str):
        return deserialize_json_to_dagster_namedtuple(json_str)

114

recend -> recent

136

Small thing, but it's a little bit dangerous to include a comment about how a function is implemented at a call site for that function - the implementation could change and the comment would end up inaccurate.

145–148

I don't think the parentheses around record_id, event_record are required.

python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py
577

It's a little bit weird to just ignore this parameter. I know it puts us in a tough spot, because if we honor it, we face the clock skew issue. Not sure what the best solution is - maybe the event log should expose some sort of boolean that expresses whether it can be queried by cursor?

yuhan marked 3 inline comments as done.

feedback

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py
43

typing.NamedTuple doesn't check types at runtime. we need a runtime type check for is_valid: for example, if we change the type of update_timestamp, we would want is_valid to return False. dagster.check will error and therefore the func will return False, but typing.NamedTuple will pass and we would miss this backcompat check.
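
The point about runtime checks can be seen in a small standalone example. `TypedCursor` and `checked_cursor` are hypothetical names, and the hand-rolled `isinstance` checks only stand in for the kind of validation dagster.check would perform.

```python
from typing import NamedTuple


class TypedCursor(NamedTuple):
    record_id: int
    update_timestamp: str


# Annotations are not enforced at runtime: this constructs without error.
cursor = TypedCursor(record_id="not-an-int", update_timestamp=123)


def checked_cursor(record_id, update_timestamp) -> TypedCursor:
    """Explicit runtime validation, so a type change is caught when loading."""
    if not isinstance(record_id, int):
        raise TypeError("record_id must be an int")
    if not isinstance(update_timestamp, str):
        raise TypeError("update_timestamp must be a str")
    return TypedCursor(record_id, update_timestamp)
```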

136

good point. a similar comment is also included in SqliteEventLogStorage.get_event_rows. will refer to that instead of expanding here

python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py
577

how about making it _run_updated_after and therefore when someone passes it in, they really mean to use it.

alternatively, we could warn that run_updated_after is ignored - but the downside is basically the warning shows up every time bc sqlite is the rare case.

going with the private arg for now.

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py
43

Ah I see. That will raise an error though, not False, right? Because check errors are not JSONDecodeErrors?

python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py
577

Agree on the downside about that warning. @alangenfeld do you have an opinion on the private arg? I don't love it, but I'd be ok with it if you're OK with it.

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py
43

ya i updated the except to capture all Exception. open to suggestion tho

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py
44

should still feed mypy types using the less-nice NamedTuple set-up

python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py
577

what do you think about generalizing the cursor and allowing it to flow through in after_cursor (and probably before_cursor)? EventsCursor, GlobalEventsCursor, AllEventsCursor

then each impl can unpack and discard/use what it wants and it's contained within the impl

python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py
577

i think that's a brilliant idea! will do

yuhan marked 2 inline comments as done.
  • EventsCursor(id, run_updated_after)
  • remove before_cursor bc we are not using it and implementing it in sqlite requires run storage breaking change (run storage now only allows updated_after). if we need it later, we can add it back and update the PipelineRunsFilter to take run_updated_before.
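
The generalized cursor idea can be sketched like this. Only the `EventsCursor(id, run_updated_after)` shape comes from the update above; the field types, the two storage classes, and their in-memory data are assumptions made for illustration.

```python
from typing import Dict, List, NamedTuple, Optional, Tuple


class EventsCursor(NamedTuple):
    id: Optional[int] = None
    run_updated_after: Optional[float] = None  # type assumed for this sketch


class SingleTableStorage:
    """Hypothetical non-sharded storage: uses only the id half of the cursor."""

    def __init__(self, rows: List[Tuple[int, str]]):
        self.rows = rows

    def get_event_rows(self, after_cursor: EventsCursor) -> List[Tuple[int, str]]:
        after_id = after_cursor.id if after_cursor.id is not None else -1
        return [row for row in self.rows if row[0] > after_id]


class RunShardedStorage:
    """Hypothetical run-sharded storage: uses only run_updated_after."""

    def __init__(self, runs: Dict[str, Tuple[float, List[str]]]):
        self.runs = runs  # run_id -> (update time, events for that run)

    def get_event_rows(self, after_cursor: EventsCursor) -> List[str]:
        threshold = after_cursor.run_updated_after
        events: List[str] = []
        for updated_at, run_events in self.runs.values():
            if threshold is None or updated_at > threshold:
                events.extend(run_events)
        return events
```

The caller passes the same cursor object to either storage, and the choice of which field to honor stays inside each implementation.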

LGTM!

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py
43

It's best to be as specific as possible with excepts. I think except (JSONDecodeError, CheckError) would cover all cases?
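
For illustration, a narrow except clause in an is_valid-style helper might look like the sketch below. The `CheckError` class here is a local stand-in defined for the example, and `parse_cursor` is a hypothetical helper, not the deserialization function used in the diff.

```python
import json
from json import JSONDecodeError


class CheckError(Exception):
    """Local stand-in for a validation error raised during deserialization."""


def parse_cursor(json_str: str) -> dict:
    data = json.loads(json_str)  # may raise JSONDecodeError
    if not isinstance(data, dict) or not isinstance(data.get("record_id"), int):
        raise CheckError("unexpected cursor shape")
    return data


def is_valid(json_str) -> bool:
    if json_str is None:
        return False
    try:
        parse_cursor(json_str)
        return True
    except (JSONDecodeError, CheckError):
        # Only the two expected failure modes are swallowed; anything else propagates.
        return False
```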

This revision is now accepted and ready to land. Jun 8 2021, 3:58 PM

nit: mypy the new functions

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py
43

should we have both this and EventsCursor? they seem kind of redundant (if we made EventsCursor serdes)

> I think except (JSONDecodeError, CheckError) would cover all cases?

instead of CheckError I recently added a DeserializationError in the serdes module

yuhan marked an inline comment as done.
  • depends on D8299
  • specific except
  • mypy

python_modules/dagster/dagster/core/definitions/pipeline_sensor.py
43

PipelineFailureSensorCursor is the sensor's cursor, while EventsCursor is a wrapper for the storage cursor. At the moment they are kinda redundant (PipelineFailureSensorCursor does the serdes plus the validation and init), but I'd prefer to keep the two separate to avoid future confusion.

If later we think they will be the same, we can drop this wrapper as it's just an internal representation.

This revision was landed with ongoing or failed builds. Jun 18 2021, 5:07 PM
This revision was automatically updated to reflect the committed changes.