
add toy asset sensor
ClosedPublic

Authored by prha on Dec 1 2020, 4:18 PM.

Details

Test Plan

ran longitudinal pipeline, saw log_asset_pipeline run

Diff Detail

Repository
R1 dagster
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

prha requested review of this revision. Dec 1 2020, 4:39 PM

Overall this makes sense to me, leaving for @sandyryza to take a look too

python_modules/dagster-test/dagster_test/toys/sensors.py
25–29

is raising an exception if something unexpected happens not part of this? Or is that implied?

26

drop the 'of'

56

exception?

63–77

is this going to be shared among all wrapped sensors? worth pulling out into a util?
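
Roughly the kind of helper I'm picturing, purely as a sketch (the factory name and argument list are made up; it just leans on the same events_for_asset_key / last_run_key pattern used elsewhere in this thread):

from dagster import RunRequest, sensor

def make_asset_sensor(name, pipeline_name, asset_key, run_config_fn):
    # hypothetical factory: keeps the cursor bookkeeping in one place so each toy
    # sensor only supplies an asset key and a run_config builder
    @sensor(name=name, pipeline_name=pipeline_name)
    def _sensor(context):
        events = context.instance.events_for_asset_key(
            asset_key, cursor=context.last_run_key, ascending=True
        )
        for record_id, event in events:
            yield RunRequest(
                run_key=str(record_id),
                run_config=run_config_fn(event),
            )

    return _sensor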

python_modules/dagster/dagster/core/storage/event_log/sql_event_log.py
588–589

does this need a test case

python_modules/dagster-test/dagster_test/toys/sensors.py
54

similar feedback to the other one. I was expecting "last_run_key"

schrockn requested changes to this revision. Dec 1 2020, 6:51 PM

same question/request around last_run_key. would be interested in seeing this implemented in terms of that

This revision now requires changes to proceed. Dec 1 2020, 6:51 PM

update... I think this pattern actually makes more sense using time rather than run key

I'm generally happy if @schrockn is happy. Agree time makes more sense for this one - I'm actually kind of struggling to come up with a concrete use case that would use the last_asset_key case and not the time case, although I have also advocated for it in the past...

python_modules/dagster-test/dagster_test/toys/sensors.py
19–23

should we mention the Exception case here as well? or is that implied?

54

should this case have a SkipReason? or does 'no new events since last check' not justify one? Seems like it would still be useful to see the message in the UI?
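
Something along these lines inside the sensor body, maybe (SkipReason is the existing dagster export; the message text here is just illustrative):

if not events:
    # surface the "nothing new since the last tick" case as a SkipReason so the
    # message shows up on the sensor tick in the UI instead of silently returning
    yield SkipReason("No new materializations for AssetKey(['model']) since the last tick.")
    return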

58

is there a reason to not launch one for all of them?

python_modules/dagster-test/dagster_test/toys/sensors.py
58

My thinking was that the asset use case is one where the asset is something like a data table, and we want to kick off dependent jobs based on that asset changing. It matters less, then, that the asset changed multiple times; what matters is that the changes get consumed somehow. But again, that kind of depends on what the asset is and what the pipeline is doing.

python_modules/dagster-test/dagster_test/toys/sensors.py
52

Should we trust our ability to compare timestamps in this way? The timestamps logged for the asset events may be coming from different nodes than the timestamp used for last_completion_time. We can't expect their clocks to be 100% synced.

schrockn requested changes to this revision. Dec 2 2020, 4:23 PM

still concerned about basing this off of time

python_modules/dagster-test/dagster_test/toys/sensors.py
52

Yeah what @sandyryza says. I'm still concerned about the reliance on time here. This might just be a question of last_completion_time being poorly named, but to me an asset-based sensor should be keying off some sort of value persisted in a database/source of truth.

For example a sensor could have the policy of kicking off a run request for every unique asset materialization (we may need to start generating IDs for them). That is much more reliable than relying on time.
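
A sketch of that policy, reusing the events_for_asset_key / last_run_key pieces from this diff (the sensor name is made up, and it assumes the event log hands back a stable id per materialization that we can use as both cursor and run_key):

from dagster import AssetKey, RunRequest, sensor

@sensor(pipeline_name="log_asset_pipeline")
def every_materialization_sensor(context):
    # fetch only materializations recorded after the last one we turned into a run
    events = context.instance.events_for_asset_key(
        AssetKey(["model"]), cursor=context.last_run_key, ascending=True
    )
    for record_id, event in events:
        # one RunRequest per unique materialization; the stable record id doubles as
        # an idempotent run_key, so nothing here depends on wall-clock time
        yield RunRequest(run_key=str(record_id))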

This revision now requires changes to proceed. Dec 2 2020, 4:23 PM

At a minimum, I'd like an explanation of how this would work in a world with clock skew on the different nodes involved.

update to rely on run_key, cursor instead of timestamp

This is awesome.

My main remaining comment is on how we present this. I'm imagining a user asking in Slack "how do I set up a cross-dag dependency". It would be nice to be able to point them to this. In its current form, I suspect it would take them a while to grok it, because they need to wrap their head around the decorator definition.

What do you think about cutting down to something like this:

from dagster import AssetKey, RunRequest, sensor

@sensor(pipeline_name="log_asset_pipeline")
def toy_asset_sensor(context):
    asset_key = AssetKey(["model"])
    events = context.instance.events_for_asset_key(
        asset_key, cursor=context.last_run_key, ascending=True
    )
    
    if not events:
        return

    run_key, event = events[0]  # take the first new materialization after the cursor
    from_pipeline = event.pipeline_name

    yield RunRequest(
        run_key=run_key,
        run_config={
            "solids": {
                "read_materialization": {
                    "config": {"asset_key": ["model"], "pipeline": from_pipeline}
                }
            }
        }
    )

get rid of extra asset_sensor decorator in favor of simpler example

This really is lovely. Excited to see it land.

This revision is now accepted and ready to land. Jan 6 2021, 6:57 PM
This revision was automatically updated to reflect the committed changes.