docs/content/concepts/partitions-schedules-sensors/sensors.mdx
```python
@sensor(pipeline_name="log_file_pipeline")
def my_directory_sensor():
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            yield RunRequest(
                run_key=filename,
                run_config={"solids": {"process_file": {"config": {"filename": filename}}}},
            )
```
This sensor iterates through all the files in `MY_DIRECTORY` and yields a <PyObject object="RunRequest"/> for each file.

Once `my_directory_sensor` is added to a <PyObject object="repository"/> with `log_file_pipeline`, it can be enabled and used.

## Idempotence and Cursors

When instigating runs based on external events, you usually want to launch exactly one pipeline run for each event. There are two ways to define your sensors to avoid creating duplicate runs for your events: using `run_key` and using a cursor.

### Idempotence using run keys

In the example sensor above, the <PyObject object="RunRequest"/> is constructed with a `run_key`.
```python file=concepts/partitions_schedules_sensors/sensors/sensors.py startafter=start_run_request_marker endbefore=end_run_request_marker
yield RunRequest(
    run_key=filename,
    run_config={"solids": {"process_file": {"config": {"filename": filename}}}},
)
```
Dagster guarantees that for a given sensor, at most one run is created for each <PyObject object="RunRequest"/> with a unique `run_key`. If a sensor yields a new run request with a previously used `run_key`, Dagster skips processing the new run request.
In the example, a <PyObject object="RunRequest"/> is requested for each file during _every_ sensor evaluation. Therefore, for a given sensor evaluation, there already exists a `RunRequest` with a `run_key` for any file that existed during the previous sensor evaluation. Dagster skips processing duplicate run requests, so Dagster launches runs for only the files added since the last sensor evaluation. The result is exactly one run per file.
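This deduplication can be sketched in plain Python. The sketch below is a simplified model of the guarantee, not Dagster's actual implementation: the instance remembers every `run_key` it has processed for the sensor, and a request whose key was already seen is skipped.

```python
# Simplified model of run_key deduplication -- an illustration of the
# at-most-one-run-per-key guarantee, not Dagster's real implementation.
def launch_new_runs(seen_run_keys, run_requests):
    launched = []
    for request in run_requests:
        run_key = request["run_key"]
        if run_key in seen_run_keys:
            continue  # a run for this key already exists: skip it
        seen_run_keys.add(run_key)
        launched.append(run_key)
    return launched


seen = set()
# first evaluation: the directory contains a.log
print(launch_new_runs(seen, [{"run_key": "a.log"}]))  # ['a.log']
# second evaluation: a.log is still present, b.log was added --
# only the new file produces a run
print(launch_new_runs(seen, [{"run_key": "a.log"}, {"run_key": "b.log"}]))  # ['b.log']
```

Because the dedup state persists across evaluations, re-yielding the same requests on every tick is harmless, which is what makes the declarative style described below workable.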
Run keys allow you to write sensor evaluation functions that declaratively describe what pipeline runs should exist, and help you avoid more complex logic that manages state. However, when dealing with high-volume external events, some state-tracking optimizations might be necessary.
```python
@sensor(pipeline_name="log_file_pipeline")
def my_directory_sensor_cursor(context):
    # the cursor stores the max mtime seen by the previous evaluation
    last_mtime = float(context.cursor) if context.cursor else 0

    max_mtime = last_mtime
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            fstats = os.stat(filepath)
            file_mtime = fstats.st_mtime
            if file_mtime <= last_mtime:
                continue

            # the run key should include mtime if we want to kick off new runs based on file modifications
            run_key = f"{filename}:{file_mtime}"
            run_config = {"solids": {"process_file": {"config": {"filename": filename}}}}
            yield RunRequest(run_key=run_key, run_config=run_config)
            max_mtime = max(max_mtime, file_mtime)

    context.update_cursor(str(max_mtime))
```
For sensors that consume multiple event streams, you may need to serialize and deserialize a more complex data structure in and out of the cursor string to keep track of the sensor's progress over the multiple streams.
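One simple approach is to JSON-encode a dict of per-stream offsets into the cursor string. The sketch below shows the round trip; the stream names and offset values are hypothetical, and in a real sensor the encoded string would be passed to `context.update_cursor` and read back from `context.cursor`.

```python
import json


def encode_cursor(offsets):
    # serialize per-stream offsets, e.g. {"orders": 42, "clicks": 17},
    # into a single cursor string
    return json.dumps(offsets)


def decode_cursor(cursor):
    # an unset cursor (first evaluation) decodes to empty offsets
    return json.loads(cursor) if cursor else {}


cursor = encode_cursor({"orders": 42, "clicks": 17})
offsets = decode_cursor(cursor)
offsets["orders"] = 43  # advance one stream independently of the others
new_cursor = encode_cursor(offsets)
```

Keeping the cursor a flat JSON object like this makes it easy to add or drop streams later without invalidating the stored state.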
```python
@sensor(pipeline_name="log_file_pipeline")
def my_directory_sensor_with_skip_reasons():
    has_files = False
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            yield RunRequest(
                run_key=filename,
                run_config={"solids": {"process_file": {"config": {"filename": filename}}}},
            )
            has_files = True
    if not has_files:
        yield SkipReason(f"No files found in {MY_DIRECTORY}.")
```
## Testing sensors

To quickly preview what an existing sensor would generate when evaluated, you can run the CLI command `dagster sensor preview my_sensor_name`.

To unit test sensors, you can invoke the sensor directly. This returns all the run requests yielded by the sensor. The config obtained from these can be validated using the <PyObject object="validate_run_config" /> function.
```python file=concepts/partitions_schedules_sensors/sensors/sensors.py startafter=start_sensor_testing endbefore=end_sensor_testing
from dagster import validate_run_config


@sensor(pipeline_name="log_file_pipeline")
def sensor_to_test():
    yield RunRequest(
        run_key="foo",
        run_config={"solids": {"process_file": {"config": {"filename": "foo"}}}},
    )


def test_sensor():
    for run_request in sensor_to_test():
        assert validate_run_config(log_file_pipeline, run_request.run_config)
```
```python
@sensor(pipeline_name="log_file_pipeline")
def my_directory_sensor_cursor(context):
    # the cursor stores the max mtime seen by the previous evaluation
    last_mtime = float(context.cursor) if context.cursor else 0

    max_mtime = last_mtime
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            fstats = os.stat(filepath)
            file_mtime = fstats.st_mtime
            if file_mtime <= last_mtime:
                continue

            # the run key should include mtime if we want to kick off new runs based on file modifications
            run_key = f"{filename}:{file_mtime}"
            run_config = {"solids": {"process_file": {"config": {"filename": filename}}}}
            yield RunRequest(run_key=run_key, run_config=run_config)
            max_mtime = max(max_mtime, file_mtime)

    context.update_cursor(str(max_mtime))
```
This sensor makes use of the `context` argument, so to invoke it, we need to provide one.
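Dagster provides a `build_sensor_context` helper for constructing such a context in tests. As a rough illustration of what the sensor needs from it, a hand-rolled stub might look like the following (the class name is illustrative, and this stub is not a substitute for the real context in production code):

```python
class FakeSensorContext:
    """Minimal stand-in for a sensor context: holds a cursor string."""

    def __init__(self, cursor=None):
        self.cursor = cursor

    def update_cursor(self, cursor):
        # the sensor calls this to persist its progress for the next evaluation
        self.cursor = cursor


# invoke the cursor-based sensor with a preset cursor, e.g.:
#   context = FakeSensorContext(cursor="0")
#   run_requests = list(my_directory_sensor_cursor(context))
# after evaluation, context.cursor holds the max mtime the sensor saw
```

The stub shows why the sensor can be unit tested at all: everything it reads and writes goes through `cursor` and `update_cursor`, so a test can preset the cursor, run the sensor, and assert on both the yielded run requests and the updated cursor.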
```python
@solid(required_resource_keys={"slack"})
def slack_message_on_failure_solid(context):
    message = f"Solid {context.solid.name} failed"
    context.resources.slack.chat.post_message(channel="#foo", text=message)


@pipeline(
    mode_defs=[
        ModeDefinition(name="test", resource_defs={"slack": ResourceDefinition.mock_resource()}),
        ModeDefinition(name="prod", resource_defs={"slack": slack_resource}),
    ]
)
def failure_alert_pipeline():
    slack_message_on_failure_solid()
```
Then, you can define a sensor that fetches the failed runs from the runs table via `context.instance`, and instigates a `failure_alert_pipeline` run for every failed run. Note that we use the failed run's id as the `run_key` to prevent sending an alert twice for the same pipeline run.