Differential D8684 Diff 40955 examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/sensors/sensors.py
Changeset View
Changeset View
Standalone View
Standalone View
examples/docs_snippets/docs_snippets/concepts/partitions_schedules_sensors/sensors/sensors.py
Show All 28 Lines | |||||
@sensor(pipeline_name="log_file_pipeline") | @sensor(pipeline_name="log_file_pipeline") | ||||
def my_directory_sensor(): | def my_directory_sensor(): | ||||
for filename in os.listdir(MY_DIRECTORY): | for filename in os.listdir(MY_DIRECTORY): | ||||
filepath = os.path.join(MY_DIRECTORY, filename) | filepath = os.path.join(MY_DIRECTORY, filename) | ||||
if os.path.isfile(filepath): | if os.path.isfile(filepath): | ||||
yield RunRequest( | yield RunRequest( | ||||
run_key=filename, | run_key=filename, | ||||
run_config={"solids": {"process_file": {"config": {"filename": filename}}}}, | run_config={ | ||||
"solids": { | |||||
"process_file": {"config": {"filename": filename}} | |||||
} | |||||
}, | |||||
) | ) | ||||
# end_directory_sensor_marker | # end_directory_sensor_marker | ||||
# start_sensor_testing_no | # start_sensor_testing_no | ||||
from dagster import validate_run_config | from dagster import validate_run_config | ||||
@sensor(pipeline_name="log_file_pipeline") | @sensor(pipeline_name="log_file_pipeline") | ||||
def sensor_to_test(): | def sensor_to_test(): | ||||
yield RunRequest( | yield RunRequest( | ||||
run_key="foo", | run_key="foo", | ||||
run_config={"solids": {"process_file": {"config": {"filename": "foo"}}}}, | run_config={ | ||||
"solids": {"process_file": {"config": {"filename": "foo"}}} | |||||
}, | |||||
) | ) | ||||
def test_sensor(): | def test_sensor(): | ||||
for run_request in sensor_to_test(): | for run_request in sensor_to_test(): | ||||
assert validate_run_config(log_file_pipeline, run_request.run_config) | assert validate_run_config(log_file_pipeline, run_request.run_config) | ||||
# end_sensor_testing_no | # end_sensor_testing_no | ||||
def isolated_run_request(): | def isolated_run_request(): | ||||
filename = "placeholder" | filename = "placeholder" | ||||
# start_run_request_marker | # start_run_request_marker | ||||
yield RunRequest( | yield RunRequest( | ||||
run_key=filename, | run_key=filename, | ||||
run_config={"solids": {"process_file": {"config": {"filename": filename}}}}, | run_config={ | ||||
"solids": {"process_file": {"config": {"filename": filename}}} | |||||
}, | |||||
) | ) | ||||
# end_run_request_marker | # end_run_request_marker | ||||
# start_interval_sensors_maker | # start_interval_sensors_maker | ||||
Show All 21 Lines | for filename in os.listdir(MY_DIRECTORY): | ||||
if os.path.isfile(filepath): | if os.path.isfile(filepath): | ||||
fstats = os.stat(filepath) | fstats = os.stat(filepath) | ||||
file_mtime = fstats.st_mtime | file_mtime = fstats.st_mtime | ||||
if file_mtime <= last_mtime: | if file_mtime <= last_mtime: | ||||
continue | continue | ||||
# the run key should include mtime if we want to kick off new runs based on file modifications | # the run key should include mtime if we want to kick off new runs based on file modifications | ||||
run_key = f"{filename}:{str(file_mtime)}" | run_key = f"{filename}:{str(file_mtime)}" | ||||
run_config = {"solids": {"process_file": {"config": {"filename": filename}}}} | run_config = { | ||||
"solids": {"process_file": {"config": {"filename": filename}}} | |||||
} | |||||
yield RunRequest(run_key=run_key, run_config=run_config) | yield RunRequest(run_key=run_key, run_config=run_config) | ||||
max_mtime = max(max_mtime, file_mtime) | max_mtime = max(max_mtime, file_mtime) | ||||
context.update_cursor(str(max_mtime)) | context.update_cursor(str(max_mtime)) | ||||
# end_cursor_sensors_marker | # end_cursor_sensors_marker | ||||
Show All 14 Lines | |||||
@sensor(pipeline_name="log_file_pipeline") | @sensor(pipeline_name="log_file_pipeline") | ||||
def my_directory_sensor_with_skip_reasons(): | def my_directory_sensor_with_skip_reasons(): | ||||
has_files = False | has_files = False | ||||
for filename in os.listdir(MY_DIRECTORY): | for filename in os.listdir(MY_DIRECTORY): | ||||
filepath = os.path.join(MY_DIRECTORY, filename) | filepath = os.path.join(MY_DIRECTORY, filename) | ||||
if os.path.isfile(filepath): | if os.path.isfile(filepath): | ||||
yield RunRequest( | yield RunRequest( | ||||
run_key=filename, | run_key=filename, | ||||
run_config={"solids": {"process_file": {"config": {"filename": filename}}}}, | run_config={ | ||||
"solids": { | |||||
"process_file": {"config": {"filename": filename}} | |||||
} | |||||
}, | |||||
) | ) | ||||
has_files = True | has_files = True | ||||
if not has_files: | if not has_files: | ||||
yield SkipReason(f"No files found in {MY_DIRECTORY}.") | yield SkipReason(f"No files found in {MY_DIRECTORY}.") | ||||
# end_skip_sensors_marker | # end_skip_sensors_marker | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | |||||
@pipeline | @pipeline | ||||
def my_pipeline(): | def my_pipeline(): | ||||
pass | pass | ||||
@repository | @repository | ||||
def my_repository(): | def my_repository(): | ||||
return [my_pipeline, log_file_pipeline, my_directory_sensor, sensor_A, sensor_B] | return [ | ||||
my_pipeline, | |||||
log_file_pipeline, | |||||
my_directory_sensor, | |||||
sensor_A, | |||||
sensor_B, | |||||
] |