Differential D8684 Diff 40955 examples/docs_snippets/docs_snippets_tests/concepts_tests/partitions_schedules_sensors_tests/test_sensors.py
Changeset View
Changeset View
Standalone View
Standalone View
examples/docs_snippets/docs_snippets_tests/concepts_tests/partitions_schedules_sensors_tests/test_sensors.py
from contextlib import suppress | from contextlib import suppress | ||||
from dagster import build_sensor_context, execute_pipeline, pipeline, reconstructable, solid | from dagster import ( | ||||
build_sensor_context, | |||||
execute_pipeline, | |||||
pipeline, | |||||
reconstructable, | |||||
solid, | |||||
) | |||||
from dagster.core.definitions.run_request import RunRequest | from dagster.core.definitions.run_request import RunRequest | ||||
from dagster.core.test_utils import instance_for_test | from dagster.core.test_utils import instance_for_test | ||||
from docs_snippets.concepts.partitions_schedules_sensors.sensors.sensor_alert import ( | from docs_snippets.concepts.partitions_schedules_sensors.sensors.sensor_alert import ( | ||||
failure_alert_pipeline, | failure_alert_pipeline, | ||||
pipeline_failure_sensor, | pipeline_failure_sensor, | ||||
) | ) | ||||
from docs_snippets.concepts.partitions_schedules_sensors.sensors.sensors import ( | from docs_snippets.concepts.partitions_schedules_sensors.sensors.sensors import ( | ||||
isolated_run_request, | isolated_run_request, | ||||
Show All 19 Lines | |||||
def test_failure_alert_pipeline(): | def test_failure_alert_pipeline(): | ||||
result = execute_pipeline(failure_alert_pipeline, mode="test") | result = execute_pipeline(failure_alert_pipeline, mode="test") | ||||
assert result.success | assert result.success | ||||
def test_log_file_pipeline(): | def test_log_file_pipeline(): | ||||
result = execute_pipeline( | result = execute_pipeline( | ||||
log_file_pipeline, run_config={"solids": {"process_file": {"config": {"filename": "test"}}}} | log_file_pipeline, | ||||
run_config={ | |||||
"solids": {"process_file": {"config": {"filename": "test"}}} | |||||
}, | |||||
) | ) | ||||
assert result.success | assert result.success | ||||
def test_my_directory_sensor(): | def test_my_directory_sensor(): | ||||
# TODO: Actually test | # TODO: Actually test | ||||
assert my_directory_sensor | assert my_directory_sensor | ||||
▲ Show 20 Lines • Show All 43 Lines • Show Last 20 Lines |