
solid-hook-1 @hook and HookDefinition
ClosedPublic

Authored by yuhan on Jul 22 2020, 9:10 PM.

Details

Summary

breaking down D3812 into stacked diffs

The problems that hooks are trying to solve:

  • A pipeline is a shared responsibility, so users want to alert only the solid's author rather than being alerted whenever any solid execution fails.
  • Only alert on high-priority solid executions.
  • Solids are reusable in a shared library, so different people want to apply different hooks to the same solid definition.

Also users should be able to

  • Turn on/off hooks in dev.

Decision:

  • Enable a generic hook with a list of dagster events in its context so users can write their own event filter logic
  • Also provide hooks on success and hooks on failure for the most common use cases, e.g. solid-level success/failure alerting
  • Because users may want to apply different sets of hooks to a shared solid definition, hooks should not be tied to the solid definition; they should be applied on solid invocation instead.
  • Hooks will always run. If a user doesn't want to send the real slack message or run the prod hook code in a certain environment, they can "turn off" the hook by mocking out the hook's resources.
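
The last decision can be illustrated with a minimal, framework-free sketch. `SlackResource`, `MockSlackResource`, and `make_context` are hypothetical stand-ins rather than dagster APIs; the only assumption is that a hook reaches external systems through `context.resources`, so swapping the resource changes the effect without touching the hook.

```python
from types import SimpleNamespace


class SlackResource:
    """Hypothetical prod resource: would call the real Slack API."""

    def send_message(self, message):
        raise RuntimeError("would hit the real Slack API")


class MockSlackResource:
    """Hypothetical dev resource: records messages instead of sending them."""

    def __init__(self):
        self.sent = []

    def send_message(self, message):
        self.sent.append(message)


def slack_hook(context):
    # The hook body is identical in every environment.
    message = "solid {} succeeded".format(context.solid_name)
    context.resources.slack.send_message(message)


def make_context(solid_name, slack):
    # Stand-in for the context object a framework would build (assumption).
    return SimpleNamespace(
        solid_name=solid_name, resources=SimpleNamespace(slack=slack)
    )


# In dev, the hook still runs, but the mocked resource swallows the message.
dev_slack = MockSlackResource()
slack_hook(make_context("a_solid", dev_slack))
```

The hook always executes; only the resource behind `context.resources.slack` decides whether anything leaves the process.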

this diff builds

  • @success_hook: hooks that will run when the execution of the associated solid succeeds
@success_hook
def do_something_hook(context):
    do_something()

@success_hook(required_resource_keys={'slack'})
def slack_hook(context):
    message = 'solid {} succeeded'.format(context.solid.name)
    context.resources.slack.send_message(message)
  • @failure_hook: hooks that will run when the execution of the associated solid fails
@failure_hook
def do_something_hook(context):
    do_something()

@failure_hook(required_resource_keys={'slack'})
def slack_hook(context):
    message = 'solid {} failed'.format(context.solid.name)
    context.resources.slack.send_message(message)
  • hook events

  • HOOK_COMPLETED when the user-defined function finishes successfully
  • HOOK_ERRORED when the user-defined function runs into errors
  • HOOK_SKIPPED when the user-defined function is not triggered
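
As a rough illustration of how the three hook events relate to the outcome of the user-defined function, here is a toy sketch. `run_hook` and the boolean return convention are assumptions made for this example only; event names are plain strings rather than `DagsterEventType` members.

```python
def run_hook(hook_fn, context, event_list):
    """Run a hook and report which hook event a framework could emit.

    Assumption: hook_fn returns True when the user code was triggered and
    False when it decided not to run for this event list.
    """
    try:
        triggered = hook_fn(context, event_list)
    except Exception:
        # User code raised: report the error instead of failing the step.
        return "HOOK_ERRORED"
    return "HOOK_COMPLETED" if triggered else "HOOK_SKIPPED"


def notify_on_success(context, event_list):
    # Only act when the step succeeded; otherwise signal "not triggered".
    if "STEP_SUCCESS" not in event_list:
        return False
    context["notified"] = True
    return True


def broken_hook(context, event_list):
    raise ValueError("user code failed")


succeeded = ["STEP_START", "STEP_OUTPUT", "STEP_SUCCESS"]
failed = ["STEP_START", "STEP_FAILURE"]

completed = run_hook(notify_on_success, {}, succeeded)
skipped = run_hook(notify_on_success, {}, failed)
errored = run_hook(broken_hook, {}, succeeded)
```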

next diff: hooks will be applied on solid invocation as:

@pipeline
def a_pipeline():
    a_solid.hook(hook_defs={a_hook})()
    a_solid.alias('solid_with_hook').hook(hook_defs={a_hook})()
    a_solid.alias('solid_without_hook')()
Test Plan

bk

Diff Detail

Repository
R1 dagster
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

There are a very large number of changes, so older changes are hidden.
schrockn added inline comments.Jul 28 2020, 8:37 PM
python_modules/dagster/dagster/core/definitions/decorators/hook.py
24–28

All hooks should be "generic" at this point in the stack. From the standpoint of the _SolidHook the success/failure hooks should be just vanilla hooks

119–150

I think this could be much more simple if you just implemented this in terms of @solid_hook instead of _SolidHook. I think it would make the layering more clear and would eliminate the special-casing you currently have inside of _SolidHook

def solid_failure_hook(...):
    @hook(...)
    def _on_failure_hook(context, event_stream):
        for event in event_stream:
            if event.event_type == DagsterEventType.STEP_FAILURE:
                fn(context)
                break
    return _on_failure_hook
This revision now requires changes to proceed.Jul 28 2020, 8:37 PM
prha added inline comments.Jul 28 2020, 9:25 PM
python_modules/dagster/dagster/core/execution/plan/execute_plan.py
89–141

I think that's only for finally/yield... without the finally, it should exit fine.

yuhan updated this revision to Diff 19747.Jul 29 2020, 8:13 PM
yuhan marked 6 inline comments as done.

streaming behavior in execute_plan

python_modules/dagster/dagster/core/definitions/decorators/hook.py
119–150

👍 will do

python_modules/dagster/dagster/core/execution/plan/execute_plan.py
89–91

tested the interleave behavior and got

order = ['enter-hook', 'consumed-mat-one', 'enter-solid', 'consumed-mat-two', 'reentered-after-yielded-asset-one', 'consumed-output', 'reentered-after-yielded-asset-two', 'reentered-after-output']

but when i moved the yield hook_event after the yield step_event, it started consuming the entire event stream synchronously.

not quite sure why these two behave differently

yuhan updated this revision to Diff 19764.Jul 30 2020, 12:57 AM
yuhan marked 8 inline comments as done.
  • implement success/failure hook on top of @solid_hook
  • better test case for streaming behavior
yuhan added inline comments.Jul 30 2020, 1:03 AM
python_modules/dagster/dagster/core/execution/plan/execute_plan.py
89–91

tests done:

  1. see test test_hook_event_stream
  2. also executed a solid that interleaves sleep calls with yielded events: https://dagster.phacility.com/P67

python_modules/dagster/dagster/core/execution/plan/plan.py
313

hook_defs is Set[HookDefinition] so here we will need to union sets instead of a simple set comprehension.

yuhan updated this revision to Diff 19791.Jul 30 2020, 5:24 AM

rename solid_hook -> hook

yuhan retitled this revision from solid-hook-1 @solid_hook and HookDefinition to solid-hook-1 @hook and HookDefinition.Jul 30 2020, 5:24 AM
yuhan updated this revision to Diff 19794.Jul 30 2020, 5:37 AM

revert to Diff 10 -- D19792 should be in next diff

schrockn requested changes to this revision.Jul 30 2020, 12:28 PM

Ok this is getting very very close.

I think one additional step that we need is a quick meeting with Alex and me to do a code walkthrough of the core changes in the runtime. This is very tricky and subtle stuff and I want to make sure that a few of us understand this codepath in case difficult bugs come up, and then I think we can also use that meeting to generate some nice explanatory comments.

python_modules/dagster/dagster/core/definitions/decorators/hook.py
11–12

I believe we can totally delete special_fn_wrapper now that the layering of the success/failure hook has been done

39

I believe _hook_fn_wrapper is a no-op and can be eliminated

116

same note as below re: dynamic import

123–130

lovely

176

could do event.is_step_failure here and avoid the dynamic import.

Also import top level dagster within dagster is quite dangerous. Highly recommend importing directly from dagster.core.whatever

190–197

I think this is a no-op

python_modules/dagster/dagster/core/execution/plan/execute_plan.py
100

Oh this is super confusing as you are overwriting the passed in parameter. This threw me. Let's name this variable differently

110

Ok this is obviously working, but I don't fully understand *why* it is working. What does the ensure_gen call do exactly? The contract of the hook function as currently defined just consumes the event stream and returns nothing. So AFAIK this always returns None and therefore will always just get wrapped in function and yields None. But then we do nothing with the returned function.

python_modules/dagster/dagster/core/execution/plan/plan.py
313

hmmm can you explain further? pretty sure all comprehensions make a copy if that is why this is needed.

it's not that big of deal though

python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_hooks.py
84

i don't think we need these sleeps in order for this test to work so we probably shouldn't check it in for test perf

114–126

Let's add a test confirming that the order of *dagster* events is exactly what we expect (effectively encoding the video you posted in an automated test)

257–263

let's do the same (test for dagster event order) in all of these as well

This revision now requires changes to proceed.Jul 30 2020, 12:28 PM

The other further extension we may want to consider (shouldn't block this diff) is the ability of the hook functions themselves to yield events.

yuhan added inline comments.Jul 30 2020, 6:58 PM
python_modules/dagster/dagster/core/definitions/decorators/hook.py
11–12

my bad. will remove

python_modules/dagster/dagster/core/execution/plan/execute_plan.py
110

oh, forgot to remove this. i was thinking that if we expect the hook func to return something and want to yield it (for example, allowing the hook func to yield events), we would need it to ensure we return a generator (similar to https://dagster.phacility.com/source/dagster/browse/master/python_modules/dagster/dagster/core/execution/resources_init.py$168).
we are not doing anything with the result now, so i should remove it.
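
For readers unfamiliar with the pattern under discussion, here is a minimal sketch of a helper that guarantees a generator. `ensure_generator` is a hypothetical name chosen for this example and is not claimed to match dagster's `ensure_gen` line for line.

```python
import inspect


def ensure_generator(thing_or_gen):
    """Return thing_or_gen if it is a generator, else a generator yielding it once."""
    if inspect.isgenerator(thing_or_gen):
        return thing_or_gen

    def _gen_thing():
        yield thing_or_gen

    return _gen_thing()


def hook_returning_nothing():
    # Matches the current hook contract: consume events, return nothing.
    return None


def hook_yielding_events():
    # What a future hook that yields its own events might look like.
    yield "EVENT_A"
    yield "EVENT_B"


wrapped_none = list(ensure_generator(hook_returning_nothing()))
wrapped_events = list(ensure_generator(hook_yielding_events()))
```

With a hook that returns nothing, the wrapper only ever yields a single `None`, which is why the call has no effect unless hooks are allowed to yield events.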

python_modules/dagster/dagster/core/execution/plan/plan.py
313

step.hook_defs = {hook_def_1, hook_def_2, ...}. an ExecutionPlan will have multiple steps which means multiple sets of hook_defs. here we are flattening the set of sets so we can easily go over all hooks in a pipeline
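
The difference between collecting the per-step sets and flattening them can be sketched as follows. `Step` is a hypothetical stand-in for an execution step, and hook definitions are represented by plain strings.

```python
from collections import namedtuple

# Hypothetical stand-in for an execution step; only hook_defs matters here.
Step = namedtuple("Step", "key hook_defs")

steps = [
    Step("a", frozenset({"slack_hook", "pager_hook"})),
    Step("b", frozenset({"slack_hook"})),
    Step("c", frozenset()),
]

# A plain set comprehension over steps yields a set of sets, not a flat set.
nested = {step.hook_defs for step in steps}

# Unioning the per-step sets flattens them and de-duplicates shared hooks.
all_hook_defs = set().union(*(step.hook_defs for step in steps))
```

A nested comprehension, `{h for step in steps for h in step.hook_defs}`, would produce the same flat set.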

yuhan marked 7 inline comments as done.Jul 30 2020, 11:16 PM

two things to note:
[1] AFAIK event_list can't get the hook events and step events in their actual order. for example, in this test case, the actual order (events in log and console) is

DagsterEventType.STEP_START,
DagsterEventType.STEP_MATERIALIZATION,
DagsterEventType.STEP_MATERIALIZATION,
DagsterEventType.STEP_OUTPUT,
# different order
DagsterEventType.HOOK_SUCCESS,
DagsterEventType.STEP_SUCCESS

but the result.event_list which is generated from inner_plan_execution_iterator will have the order as

DagsterEventType.STEP_START,
# different order
DagsterEventType.HOOK_SUCCESS,
DagsterEventType.STEP_MATERIALIZATION,
DagsterEventType.STEP_MATERIALIZATION,
DagsterEventType.STEP_OUTPUT,
DagsterEventType.STEP_SUCCESS

this is because in inner_plan_execution_iterator we did

for step_event, hook_event in zip_longest(event_stream_to_yield, hook_events):
    if step_event is not None:
        yield step_event
    if hook_event is not None:
        yield hook_event

it interleaves the step events [STEP_START, STEP_MATERIALIZATION, STEP_MATERIALIZATION, STEP_OUTPUT, ..] and hook events [HOOK_SUCCESS] to yield one of each at a time and ensure the async behavior. it means when we have multiple hooks on a solid, in the event_list we will get something like below but the console/dagit/logger will still get the events in their actual order (i.e. HOOK_SUCCESS will be fired once the hook stops consuming the stream)

STEP_START
HOOK_SUCCESS
STEP_MATERIALIZATION
HOOK_SUCCESS
STEP_MATERIALIZATION
HOOK_SUCCESS
STEP_OUTPUT
HOOK_SUCCESS
STEP_SUCCESS
HOOK_SUCCESS
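
Both orderings above can be reproduced with a small standalone sketch of the interleaving loop. It uses plain lists and string event names instead of the lazy event streams in execute_plan, so it only demonstrates the ordering of the combined list, not the asynchronous consumption.

```python
from itertools import zip_longest


def interleave(step_events, hook_events):
    """Yield one step event, then one hook event, until both are exhausted."""
    for step_event, hook_event in zip_longest(step_events, hook_events):
        if step_event is not None:
            yield step_event
        if hook_event is not None:
            yield hook_event


step_events = [
    "STEP_START",
    "STEP_MATERIALIZATION",
    "STEP_MATERIALIZATION",
    "STEP_OUTPUT",
    "STEP_SUCCESS",
]

# One hook: its single event lands right after the first step event.
one_hook = list(interleave(step_events, ["HOOK_SUCCESS"]))

# Five hooks: hook events alternate with step events in the combined list.
five_hooks = list(interleave(step_events, ["HOOK_SUCCESS"] * 5))
```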

[2] when there are multiple hooks configured on a solid, the hooks will run sequentially for that solid. i think we can't execute hooks for a solid truly asynchronously until we drop python 2.

python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_hooks.py
114–126

the video is testing the same behavior as the order does. they both test if hook is asynchronously consuming the event stream.

im adding another test to test the dagster event through logger callback. can't check result.event_list directly bc of [1]

yuhan updated this revision to Diff 19844.Jul 31 2020, 12:58 AM
yuhan marked 2 inline comments as done.

add more tests for event order

alangenfeld added inline comments.Jul 31 2020, 8:27 PM
python_modules/dagster/dagster/core/execution/plan/execute_plan.py
89–141

scaredbaby

python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_hooks.py
467

I think we need another test for the hook throwing in the middle of iteration - just to make sure our crazy iterator manipulation code handles that case correctly and the hook crashing doesn't affect the solid

yuhan planned changes to this revision.Jul 31 2020, 10:31 PM

per offline conversation, we are going to defer the decision of whether to provide event stream to a generic hook.

  • because we currently don't have a good tradeoff assessment on introducing complexity in a critical code path (the iterator manipulation in execute_plan) VS limiting hooks to wait until the step finishes and get the full list of step events

so we are going to

  • revert the event streaming behavior back to passing a full list of step events.
  • roll out @success_hook and @failure_hook to users and keep the generic @hook internal, so we can dogfood the hook signature and better understand the user value of event_list vs event_stream. experimenting with the api internally also lets us easily switch to streaming if needed.
yuhan updated this revision to Diff 19923.Aug 1 2020, 1:02 AM
  • event_stream -> event_list
  • HOOK_SUCCESS -> HOOK_COMPLETED, HOOK_FAILURE -> HOOK_ERRORED
  • new HOOK_SKIPPED event when user-defined fn is not triggered
yuhan edited the summary of this revision.Aug 3 2020, 5:40 PM
yuhan updated this revision to Diff 19966.Aug 3 2020, 6:06 PM

rebase

yuhan updated this revision to Diff 19972.Aug 3 2020, 8:18 PM

update docstring

schrockn requested changes to this revision.Aug 4 2020, 3:56 PM

ok almost there. most critically we definitely don't want to be mutating the context.

python_modules/dagster/dagster/core/definitions/decorators/hook.py
43

I think we should not name this @hook and instead reserve that name. Let's make success_hook and failure_hook the only public APIs for now. Maybe event_list_hook

python_modules/dagster/dagster/core/events/__init__.py
54–56

much more clear with these names

python_modules/dagster/dagster/core/execution/context/system.py
357–359

Really feel strongly against mutating the context in this manner. Let's instead make the contract of the hook function return something that indicates a skip

python_modules/dagster/dagster/core/execution/plan/execute_plan.py
114

yeah have something return here that indicates a skip

This revision now requires changes to proceed.Aug 4 2020, 3:56 PM
schrockn added inline comments.Aug 4 2020, 4:12 PM
python_modules/dagster/dagster/__init__.py
14

I actually don't think we should make this public for now. Let's only support the more constrained failure_hook and success_hook as an API we'll stand behind. I think we want the flexibility to change HookDefinition

yuhan updated this revision to Diff 20077.Aug 4 2020, 7:14 PM
yuhan marked 4 inline comments as done.

HookExecutionResult

schrockn accepted this revision.Aug 4 2020, 8:18 PM

verygood

Great stuff. Just a couple minor comments but excited to land this

python_modules/dagster/dagster/core/definitions/dependency.py
85

why not frozenset here?

python_modules/dagster/dagster/core/execution/plan/execute_plan.py
121–126

I say let's just hard enforce that this returns a HookExecutionResult and not allow None

This revision is now accepted and ready to land.Aug 4 2020, 8:18 PM
yuhan updated this revision to Diff 20094.Aug 4 2020, 10:46 PM
yuhan marked an inline comment as done.

enforce that event_list_hook should return a HookExecutionResult
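
The contract agreed on in this thread (report a skip through the return value rather than by mutating the context, and reject anything that is not a result object) can be sketched as below. The field names on `HookExecutionResult` and the `run_and_check` helper are assumptions for illustration; the landed definition may differ.

```python
from collections import namedtuple

# Hypothetical shape; the real HookExecutionResult fields may differ.
HookExecutionResult = namedtuple("HookExecutionResult", "hook_name is_skipped")


def success_hook_fn(context, event_list):
    """Report a skip through the return value instead of mutating the context."""
    if "STEP_SUCCESS" not in event_list:
        return HookExecutionResult(hook_name="success_hook_fn", is_skipped=True)
    return HookExecutionResult(hook_name="success_hook_fn", is_skipped=False)


def run_and_check(hook_fn, context, event_list):
    result = hook_fn(context, event_list)
    # Hard-enforce the contract: None or any other type is rejected.
    if not isinstance(result, HookExecutionResult):
        raise TypeError(
            "hook must return HookExecutionResult, got {}".format(
                type(result).__name__
            )
        )
    return result
```

Because the result is an immutable value, the caller can decide which hook event to emit without inspecting or changing shared context state.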

yuhan added inline comments.Aug 4 2020, 10:48 PM
python_modules/dagster/dagster/core/definitions/dependency.py
85

the input arg should already be frozenset

This revision was landed with ongoing or failed builds.Aug 5 2020, 6:34 PM
This revision was automatically updated to reflect the committed changes.