Differential D8684 Diff 40813 examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/sensors/sensors.py
Changeset View
Changeset View
Standalone View
Standalone View
examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/sensors/sensors.py
Show All 28 Lines | |||||
@sensor(pipeline_name="log_file_pipeline") | @sensor(pipeline_name="log_file_pipeline") | ||||
def my_directory_sensor(): | def my_directory_sensor(): | ||||
for filename in os.listdir(MY_DIRECTORY): | for filename in os.listdir(MY_DIRECTORY): | ||||
filepath = os.path.join(MY_DIRECTORY, filename) | filepath = os.path.join(MY_DIRECTORY, filename) | ||||
if os.path.isfile(filepath): | if os.path.isfile(filepath): | ||||
yield RunRequest( | yield RunRequest( | ||||
run_key=filename, | run_key=filename, | ||||
run_config={"solids": {"process_file": {"config": {"filename": filename}}}}, | run_config={ | ||||
"solids": { | |||||
"process_file": {"config": {"filename": filename}} | |||||
} | |||||
}, | |||||
) | ) | ||||
# end_directory_sensor_marker | # end_directory_sensor_marker | ||||
# start_sensor_testing_no | # start_sensor_testing_no | ||||
from dagster import validate_run_config | from dagster import validate_run_config | ||||
@sensor(pipeline_name="log_file_pipeline") | @sensor(pipeline_name="log_file_pipeline") | ||||
def sensor_to_test(): | def sensor_to_test(): | ||||
yield RunRequest( | yield RunRequest( | ||||
run_key="foo", | run_key="foo", | ||||
run_config={"solids": {"process_file": {"config": {"filename": "foo"}}}}, | run_config={ | ||||
"solids": {"process_file": {"config": {"filename": "foo"}}} | |||||
}, | |||||
) | ) | ||||
def test_sensor(): | def test_sensor(): | ||||
for run_request in sensor_to_test(): | for run_request in sensor_to_test(): | ||||
assert validate_run_config(log_file_pipeline, run_request.run_config) | assert validate_run_config(log_file_pipeline, run_request.run_config) | ||||
# end_sensor_testing_no | # end_sensor_testing_no | ||||
def isolated_run_request(): | def isolated_run_request(): | ||||
filename = "placeholder" | filename = "placeholder" | ||||
# start_run_request_marker | # start_run_request_marker | ||||
yield RunRequest( | yield RunRequest( | ||||
run_key=filename, | run_key=filename, | ||||
run_config={"solids": {"process_file": {"config": {"filename": filename}}}}, | run_config={ | ||||
"solids": {"process_file": {"config": {"filename": filename}}} | |||||
}, | |||||
) | ) | ||||
# end_run_request_marker | # end_run_request_marker | ||||
# start_interval_sensors_maker | # start_interval_sensors_maker | ||||
Show All 21 Lines | for filename in os.listdir(MY_DIRECTORY): | ||||
if os.path.isfile(filepath): | if os.path.isfile(filepath): | ||||
fstats = os.stat(filepath) | fstats = os.stat(filepath) | ||||
file_mtime = fstats.st_mtime | file_mtime = fstats.st_mtime | ||||
if file_mtime <= last_mtime: | if file_mtime <= last_mtime: | ||||
continue | continue | ||||
# the run key should include mtime if we want to kick off new runs based on file modifications | # the run key should include mtime if we want to kick off new runs based on file modifications | ||||
run_key = f"{filename}:{str(file_mtime)}" | run_key = f"{filename}:{str(file_mtime)}" | ||||
run_config = {"solids": {"process_file": {"config": {"filename": filename}}}} | run_config = { | ||||
"solids": {"process_file": {"config": {"filename": filename}}} | |||||
} | |||||
yield RunRequest(run_key=run_key, run_config=run_config) | yield RunRequest(run_key=run_key, run_config=run_config) | ||||
max_mtime = max(max_mtime, file_mtime) | max_mtime = max(max_mtime, file_mtime) | ||||
context.update_cursor(str(max_mtime)) | context.update_cursor(str(max_mtime)) | ||||
# end_cursor_sensors_marker | # end_cursor_sensors_marker | ||||
Show All 14 Lines | |||||
@sensor(pipeline_name="log_file_pipeline") | @sensor(pipeline_name="log_file_pipeline") | ||||
def my_directory_sensor_with_skip_reasons(): | def my_directory_sensor_with_skip_reasons(): | ||||
has_files = False | has_files = False | ||||
for filename in os.listdir(MY_DIRECTORY): | for filename in os.listdir(MY_DIRECTORY): | ||||
filepath = os.path.join(MY_DIRECTORY, filename) | filepath = os.path.join(MY_DIRECTORY, filename) | ||||
if os.path.isfile(filepath): | if os.path.isfile(filepath): | ||||
yield RunRequest( | yield RunRequest( | ||||
run_key=filename, | run_key=filename, | ||||
run_config={"solids": {"process_file": {"config": {"filename": filename}}}}, | run_config={ | ||||
"solids": { | |||||
"process_file": {"config": {"filename": filename}} | |||||
} | |||||
}, | |||||
) | ) | ||||
has_files = True | has_files = True | ||||
if not has_files: | if not has_files: | ||||
yield SkipReason(f"No files found in {MY_DIRECTORY}.") | yield SkipReason(f"No files found in {MY_DIRECTORY}.") | ||||
# end_skip_sensors_marker | # end_skip_sensors_marker | ||||
# start_asset_sensors_marker | # start_asset_sensors_marker | ||||
from dagster import AssetKey | from dagster import AssetKey | ||||
@sensor(pipeline_name="my_pipeline") | @sensor(pipeline_name="my_pipeline") | ||||
def my_asset_sensor(context): | def my_asset_sensor(context): | ||||
events = context.instance.events_for_asset_key( | events = context.instance.events_for_asset_key( | ||||
AssetKey("my_table"), after_cursor=context.cursor, ascending=False, limit=1 | AssetKey("my_table"), | ||||
after_cursor=context.cursor, | |||||
ascending=False, | |||||
limit=1, | |||||
) | ) | ||||
if not events: | if not events: | ||||
return | return | ||||
record_id, event = events[0] # take the most recent materialization | record_id, event = events[0] # take the most recent materialization | ||||
yield RunRequest( | yield RunRequest( | ||||
run_key=str(record_id), run_config={}, tags={"source_pipeline": event.pipeline_name} | run_key=str(record_id), | ||||
run_config={}, | |||||
tags={"source_pipeline": event.pipeline_name}, | |||||
) | ) | ||||
context.update_cursor(str(record_id)) | context.update_cursor(str(record_id)) | ||||
# end_asset_sensors_marker | # end_asset_sensors_marker | ||||
# start_s3_sensors_marker | # start_s3_sensors_marker | ||||
Show All 15 Lines | |||||
@pipeline | @pipeline | ||||
def my_pipeline(): | def my_pipeline(): | ||||
pass | pass | ||||
@repository | @repository | ||||
def my_repository(): | def my_repository(): | ||||
return [my_pipeline, log_file_pipeline, my_directory_sensor, sensor_A, sensor_B] | return [ | ||||
my_pipeline, | |||||
log_file_pipeline, | |||||
my_directory_sensor, | |||||
sensor_A, | |||||
sensor_B, | |||||
] |