
pipeline hook prototype
Needs Review · Public

Authored by yuhan on Fri, Apr 16, 12:08 AM.

Details

Summary

V0: sensor daemon + dummy pipeline run: https://dagster.phacility.com/D7476?id=35632

v1: sensor daemon + invoke hooks in sensor's evaluation fn

  • #"system hook sensor defs" = #hook_def x #pipeline_def
  • no extra dummy pipeline runs
  • hook execution logs back to the original pipeline run via engine events

here's a demo where we have slack_on_pipeline_start and slack_on_pipeline_failure, and the alerts are "near-real-time" (lagged by minimum_interval_seconds)

Test Plan

local dagit + local daemon + slack message

Diff Detail

Repository
R1 dagster
Branch
yuhan/pipeline-hook-sensor
Lint
Lint Passed
Unit
No Test Coverage

Event Timeline

Harbormaster returned this revision to the author for changes because remote builds failed. Fri, Apr 16, 12:28 AM
Harbormaster failed remote builds in B28974: Diff 35558!

@pipeline_hook(dagster_event_type_value, pipeline_name, resource_defs)

up

python_modules/dagster/dagster/core/definitions/pipeline_hook/repo.py
26–56

user code: slack_on_pipeline_failure
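For reference, a rough sketch of what that user code might look like, based on the decorator signature above (the import path, slack resource, context attributes, and slack call are illustrative assumptions, not the actual contents of this diff):

# illustrative sketch only -- argument names follow the @pipeline_hook signature above;
# the import path and the slack details are assumptions, not code from this diff
from dagster import resource
from dagster.core.definitions.pipeline_hook.pipeline_hook import pipeline_hook  # assumed path


@resource
def slack_resource(_):
    ...  # return a configured slack client


@pipeline_hook(
    dagster_event_type_value="PIPELINE_FAILURE",
    pipeline_name="my_pipeline",
    resource_defs={"slack": slack_resource},
)
def slack_on_pipeline_failure(context):
    # invoked by the system hook sensor shortly after the failure event is observed,
    # lagged by up to minimum_interval_seconds
    context.resources.slack.chat_postMessage(channel="#alerts", text="my_pipeline failed")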

yuhan published this revision for review. Fri, Apr 16, 11:11 PM
yuhan edited the summary of this revision.

What do you think about invoking the hook_fn directly inside the sensor instead of launching a pipeline run to invoke it? A few potential advantages (a rough sketch of the idea follows the list):

  • If the run launcher is broken or k8s is full, the hook can still execute.
  • If the run queue is full, the hook can still execute.
  • If we made some changes to the sensor API to allow it to not target a pipeline, that would address the awkwardness of the pipeline_hook API needing to return multiple definitions.
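Roughly, that idea might look like the following sketch (helper names are placeholders, not code in this diff; note the sensor still has to name a target pipeline under the current API, which is the awkwardness the last bullet refers to):

# hypothetical sketch: the hook body runs inline in the sensor's evaluation function
# rather than in a launched pipeline run; get_new_failure_events and
# slack_on_pipeline_failure_hook are placeholder names
from dagster import SkipReason, sensor


@sensor(pipeline_name="my_pipeline", minimum_interval_seconds=30)
def slack_on_pipeline_failure_sensor(context):
    events = get_new_failure_events(context)  # placeholder: read new failure events from the event log
    if not events:
        yield SkipReason("no new pipeline failures")
        return
    for event in events:
        # executes here in the daemon, so it still runs even if the run launcher is
        # broken or the run queue is full
        slack_on_pipeline_failure_hook(event)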

@sandyryza

  • If the run launcher is broken or k8s is full, the hook can still execute.

agreed

  • If the run queue is full, the hook can still execute
  • If we made some changes to the sensor API to allow it to not target a pipeline, that would address the awkwardness of the pipeline_hook API needing to return multiple definitions.

i agree that invoking the hook_fn inside the sensor's evaluation_fn would make things much easier, but it would also cause some issues that are currently taken care of by the pipeline execution process:

  • sensors execute serially: 1) hook_fns would have to run sequentially, while pipeline runs can be concurrent; 2) if one hook_fn takes long, it would slow down all other sensors (not only the hook ones) in the instance.
  • sensors don't init resources

^ not sure if we want to solve these by changing the sensor API or deferring to a pipeline run + introducing some "dummy" pipeline run concept (i.e. hiding these from the users)

cc @prha in case my understanding of the sensor implementation is off

I strongly recommend not actually using sensors for this implementation. Rather than contort sensor definitions to make them fit this use case, we should consider just creating a different daemon that won't have the same limitations.

working but needs a lot of cleanup
sensor daemon + invoke hooks in sensor's evaluation_fn

yuhan removed a reviewer: sandyryza.

no need to yield PipelineHookRunSuccess

yuhan retitled this revision from "WIP convert hook_fn to a dummy pipeline and a sensor" to "pipeline hook prototype". Wed, May 5, 7:21 AM
yuhan edited the summary of this revision.
python_modules/dagster/dagster/core/definitions/pipeline_hook/repo.py
12

I think I prefer having explicit pipeline_failure_hook and pipeline_success_hook.

If we do expose a more generic event-based pipeline_hook, we should probably use DagsterEventType instead of a string
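For example (pipeline_hook being the prototype decorator from this diff; only the enum usage is the suggestion here):

from dagster import DagsterEventType

# pass the enum member rather than the raw string "PIPELINE_FAILURE"
@pipeline_hook(DagsterEventType.PIPELINE_FAILURE, "my_pipeline", resource_defs={})
def slack_on_pipeline_failure(context):
    ...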

74–75

Should we not be able to apply the pipeline level hook in a similar fashion to the solid-level hooks?

e.g.

@slack_on_pipeline_failure
@slack_on_pipeline_start 
@pipeline(mode_defs=[...])
def my_pipeline():
    this_solid_will_fail(this_solid_will_succeed())
yuhan edited the summary of this revision.

up

python_modules/dagster/dagster/core/definitions/pipeline_hook/repo.py
12

👍

74–75

i think we could either

  1. only allow pipeline hooks to decorate a pipeline def, bc it feels more natural and intuitive
  2. allow both, bc a) when the user decorates a pipeline with both solid hooks and pipeline hooks, i'd expect them to be aware of both mechanisms, and b) if not, we can guard against it and warn users in the decorator layer.

i was thinking (and planning to do it in a follow-up after we are happy with the arg) that

@slack_on_pipeline_failure
@slack_on_pipeline_start 
@pipeline(mode_defs=[...])
def my_pipeline():
    this_solid_will_fail(this_solid_will_succeed())

would be equivalent to

@pipeline(pipeline_hook_name_pending_defs=[slack_on_pipeline_failure, slack_on_pipeline_start], mode_defs=[...])
def my_pipeline():
    this_solid_will_fail(this_solid_will_succeed())

I'm excited to see progress on this. The main top-level feedback I have here is that I think we should strongly consider making hooks standalone objects on the repository, like sensor defs, rather than living on pipeline definitions. (A rough sketch follows the list of advantages below.)

Here are some of the advantages:

  • It’s easy to forget to include a hook after defining a new pipeline. Many users would prefer to define a hook that consumes events from all of their pipelines. (That's what we would have done at KeepTruckin if both options were available).
  • People will want to receive failure notifications if a schedule or sensor function hits an error, and scoping the hook to the pipeline level makes it awkward for it to act on that.
  • Including the hook on the pipeline implies that it will run whenever the pipeline runs, which won't be the case if the daemon isn't running. I know we've discussed naming the parameter very carefully to try to avoid this misconception, and that might work.
  • In many cases, it might be desirable to put the hook in a separate repository from the pipeline. We're going to be constantly forking a new Python process to execute the hook, and someone might prefer that process not load a bunch of Python dependencies it doesn't need.
  • In many cases, it might be desirable to get a notification when something does *not* happen, rather than when something does happen. I.e. notify me if this asset has not been materialized by 1 pm.
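To make that concrete, a hedged sketch of a standalone, repository-level hook (pipeline_failure_hook, its arguments, the context attribute, my_pipeline, and send_slack_message are all hypothetical placeholders, not an existing API):

# hypothetical sketch: a failure hook defined as a repository-level object, sibling to
# pipelines and sensors, instead of being attached to any one pipeline definition
from dagster import repository


@pipeline_failure_hook(pipeline_selection="*")  # hypothetical: consume events from all pipelines
def slack_on_any_failure(context):
    send_slack_message(f"run {context.origin_run_id} failed")  # placeholder helper


@repository
def my_repo():
    # today's repositories hold pipelines/schedules/sensors; a standalone hook would be
    # another sibling entry here
    return [my_pipeline, slack_on_any_failure]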

I think we should strongly consider making hooks standalone objects on the repository, like sensor defs, rather than living on pipeline definitions.

this was my intuitive reaction - attaching to the pipeline feels off to me.

Rather than contort sensor definitions to make them fit this use case, we should consider just creating a different daemon that won't have the same limitations.

what exactly are the limitations/constraints of sensors that we would not want to apply for these "monitoring" cases?

My thoughts around this - stemming from thinking it makes sense for these to be repo-level entities, siblings to pipelines:
Should these not just be sensors? Very related to the constraints question above.

  • If I kick off a pipeline because I sense another pipeline yielded an asset - I think currently we view that as a sensor
  • If I kick off a pipeline because I sense another pipeline completed successfully - should that be in a sensor?
  • If I kick off a pipeline because I sense the pipeline failed and I want to resume-retry it - should that be in a sensor?
  • If I notify slack AND kick off a pipeline run in response to a success/failure - should that be in a sensor?

I understand we would have to change / broaden what a sensor is to move in this direction - but I *think* it could make sense

If there are good reasons to have these be distinct from sensors, my pitch would be to call them monitors

I think I was originally thinking of the sensor definition as frozen, which meant wrapping each "hook" function in a surrounding pipeline definition, hiding that from users, and figuring out how that would look in the existing run storage.

That seemed much worse than what I think you're proposing, which is broadening what the outputs of sensors are and relaxing the constraints of what they can kick off (which would strictly be a pipeline run)

That seemed much worse than what I think you're proposing, which is broadening what the outputs of sensors are and relaxing the constraints of what they can kick off (which would strictly be a pipeline run)

cool - well here are the properties i think we need for this idea to work, whether that ends up being available to all sensors, something sibling to sensors, or a special subclass of sensor

  • origin / attribution - this variant *can* run scoped to an originating run_id. This allows this variant to report back to the origin's event log, providing events that bridge to runs that have been kicked off in response, or just confirming that actions have been taken. Examples where this is useful:
    • kicked off retry-from-failure run in response to failure - links to new run
    • sent slack messages on failure - can see in origin run that action was taken
    • failed to send slack messages - can at least see in the origin run that an exception occurred in my hook, and that's why i didn't get notified
    • kicked off downstream pipeline when asset was produced - link to run
    • kicked off downstream pipeline on success - link to run
  • multiple targets - one "retry_on_failures" sensor-variant can kick off retry runs for any pipeline in that same repository. RunRequest can select target
  • execution control - current sensors fire continuously on some fixed interval and check some external state. This variant of sensor is mostly about things happening in dagster - so i think some amount of the "checking" process should be moved up into our system code so we can manage it efficiently. If I want to set up a "failure" sensor - it would be nice for the scope of code executed in user space to just be fired on the failure event (and given the already loaded failure event). This is very tied to the origin / attribution property. (Rough sketch below.)
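A rough sketch of how those properties could surface in user code (every name here is a placeholder; nothing below exists yet):

# hypothetical sketch of the execution-control + origin/attribution properties: the
# system does the event-log checking and invokes user code with the already-loaded
# failure event, scoped to the originating run_id; failure_monitor and the context
# attributes are placeholder names
from dagster import RunRequest


@failure_monitor(pipeline_selection="*")
def retry_on_failure(context, failure_event):
    # kick off a retry-from-failure run; the origin_run_id link lets the system report
    # this action back into the origin run's event log
    yield RunRequest(run_key=f"retry-{context.origin_run_id}")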

This allows this variant to report back to the origin's event log

How would this work in a world where we encourage people to print (or whatever) instead of using context.log?

How would this work in a world where we encourage people to print (or whatever) instead of using context.log?

i think either way, we can thread the message and report it back to the original run like [1]

python_modules/dagster/dagster/core/definitions/pipeline_hook/pipeline_hook.py
72

[1]

How would this work in a world where we encourage people to print (or whatever) instead of using context.log?

I was thinking of a single dagster-created event based on what occurred in the whole sensor-variant run (i.e. what runs were kicked off via RunRequests), not necessarily individual user-provided entries. Though I do think a variant of instance.report_event(origin_run_id, ...) could exist if we believe that was required. My head is at accepting a metadata bundle from the sensor-variant that would go in the singular event before that, though.

I like the direction alex is proposing. this is my mental model of this direction:

[image.png: diagram of the sensor flow described below]

a sensor's flow could look like that^ where

  • the input could be either
    • external states (e.g. s3 file)
    • dagster-aware states (e.g. pipeline failure, pipeline success, dagster-aware assets)

( ^ we currently don't differentiate these two and for sensor-variant i don't think we need to either. but in terms of execution control, we could eventually separate the evaluation based on this and make the evaluation/execution/triggering more efficient and robust.)

  • the output could be one of the following:
    • SkipReason to skip further actions (existing sensor behavior)
    • RunRequest to originate a new PipelineRun (existing sensor behavior)
    • MonitorRequest(origin_run_id, ...) to do something (i.e. run hook_fn) and report back to the existing run whose id is origin_run_id (new; fits the hook scenarios - see the sketch at the end of this comment)

re: the properties

  • origin / attribution: can carry this info inside the new output type like MonitorRequest(origin_run_id)
  • multiple targets: should be straightforward to allow multiple "outputs"
  • execution control: i think this could be a next step to figure out, as i'm not sure we have enough confidence to pre-optimize the control flow.
    • but an interesting discussion to have right now is "where would the run hook_fn piece happen" - this diff puts it inside the sensor's evaluation_fn, and i think it's ok to do that for now, esp given D7613 will allow us to advance a "cursor" without requesting a run

note: all names/nouns are pending :)
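A sketch of the output side of this mental model (MonitorRequest, the hook_fn wiring, and the event-lookup helper are placeholder names, per the note above; SkipReason and RunRequest are the existing sensor outputs):

from dagster import RunRequest, SkipReason, sensor


@sensor(pipeline_name="downstream_pipeline")
def monitoring_sensor(context):
    failures = get_new_pipeline_failures(context)  # placeholder event-log lookup
    if not failures:
        yield SkipReason("nothing to act on")
        return
    for event in failures:
        # existing behavior: originate a new pipeline run
        yield RunRequest(run_key=event.run_id)
        # proposed: do something (e.g. run a hook_fn) and report back to the run
        # identified by origin_run_id
        yield MonitorRequest(origin_run_id=event.run_id, hook_fn=slack_on_pipeline_failure)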

@sandyryza re: capturing logs, one of the changes I'm making to compute logs is to loosen the hard run_id/step_key <=> compute logs binding that currently exists.

I could see the reporting mechanism insert some event in the logs of the originating run id which would point to the location of the compute logs of the hook, even though it's captured outside of the pipeline run context.

reporting mechanism insert some event in the logs of the originating run id which would point to the location of the compute logs of the hook, even though it's captured outside of the pipeline run context.

something similar to instance.report_event(origin_run_id, ...) sounds like a plan
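A sketch of what that could look like (report_event taking an origin_run_id is the proposed variant discussed above, not an existing API; the signature and metadata are illustrative):

# hypothetical sketch: after the hook executes outside any pipeline-run context, insert
# an event into the originating run's event log that points at the externally captured
# compute logs
def report_hook_result(instance, origin_run_id, hook_name, compute_log_key):
    instance.report_event(  # proposed variant, not an existing DagsterInstance method
        origin_run_id,
        message=f"hook {hook_name} executed outside of the run",
        metadata={"compute_logs": compute_log_key},
    )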