Differential D8684 Diff 40813 examples/docs_snippets/docs_snippets_tests/concepts_tests/solids_pipelines_tests/test_hooks.py
Changeset View
Changeset View
Standalone View
Standalone View
examples/docs_snippets/docs_snippets_tests/concepts_tests/solids_pipelines_tests/test_hooks.py
from unittest import mock | from unittest import mock | ||||
from dagster import DagsterEventType, ModeDefinition, ResourceDefinition, execute_pipeline, pipeline | from dagster import ( | ||||
DagsterEventType, | |||||
ModeDefinition, | |||||
ResourceDefinition, | |||||
execute_pipeline, | |||||
pipeline, | |||||
) | |||||
from dagster.core.definitions import solid | from dagster.core.definitions import solid | ||||
from docs_snippets.concepts.solids_pipelines.solid_hooks import ( | from docs_snippets.concepts.solids_pipelines.solid_hooks import ( | ||||
a, | a, | ||||
notif_all, | notif_all, | ||||
selective_notif, | selective_notif, | ||||
slack_message_on_failure, | slack_message_on_failure, | ||||
slack_message_on_success, | slack_message_on_success, | ||||
test_my_success_hook, | test_my_success_hook, | ||||
) | ) | ||||
from docs_snippets.concepts.solids_pipelines.solid_hooks_context import my_failure_hook | from docs_snippets.concepts.solids_pipelines.solid_hooks_context import ( | ||||
my_failure_hook, | |||||
) | |||||
def test_notif_all_pipeline(): | def test_notif_all_pipeline(): | ||||
result = execute_pipeline(notif_all, mode="dev", raise_on_error=False) | result = execute_pipeline(notif_all, mode="dev", raise_on_error=False) | ||||
assert not result.success | assert not result.success | ||||
for event in result.event_list: | for event in result.event_list: | ||||
if event.is_hook_event: | if event.is_hook_event: | ||||
if event.event_type == DagsterEventType.HOOK_SKIPPED: | if event.event_type == DagsterEventType.HOOK_SKIPPED: | ||||
assert event.step_key == "a" | assert event.step_key == "a" | ||||
if event.event_type == DagsterEventType.HOOK_COMPLETED: | if event.event_type == DagsterEventType.HOOK_COMPLETED: | ||||
assert event.step_key == "b" | assert event.step_key == "b" | ||||
def test_selective_notif_pipeline(): | def test_selective_notif_pipeline(): | ||||
result = execute_pipeline(selective_notif, mode="dev", raise_on_error=False) | result = execute_pipeline( | ||||
selective_notif, mode="dev", raise_on_error=False | |||||
) | |||||
assert not result.success | assert not result.success | ||||
for event in result.event_list: | for event in result.event_list: | ||||
if event.is_hook_event: | if event.is_hook_event: | ||||
if event.event_type == DagsterEventType.HOOK_SKIPPED: | if event.event_type == DagsterEventType.HOOK_SKIPPED: | ||||
assert event.step_key == "a" | assert event.step_key == "a" | ||||
if event.event_type == DagsterEventType.HOOK_COMPLETED: | if event.event_type == DagsterEventType.HOOK_COMPLETED: | ||||
assert event.step_key == "a" | assert event.step_key == "a" | ||||
def test_hook_resource(): | def test_hook_resource(): | ||||
slack_mock = mock.MagicMock() | slack_mock = mock.MagicMock() | ||||
@pipeline( | @pipeline( | ||||
mode_defs=[ | mode_defs=[ | ||||
ModeDefinition( | ModeDefinition( | ||||
"unittest", | "unittest", | ||||
resource_defs={"slack": ResourceDefinition.hardcoded_resource(slack_mock)}, | resource_defs={ | ||||
"slack": ResourceDefinition.hardcoded_resource(slack_mock) | |||||
}, | |||||
), | ), | ||||
] | ] | ||||
) | ) | ||||
def foo(): | def foo(): | ||||
a.with_hooks({slack_message_on_success, slack_message_on_failure})() | a.with_hooks({slack_message_on_success, slack_message_on_failure})() | ||||
execute_pipeline(foo) | execute_pipeline(foo) | ||||
assert slack_mock.chat.post_message.call_count == 1 | assert slack_mock.chat.post_message.call_count == 1 | ||||
Show All 18 Lines |