Changeset View
Changeset View
Standalone View
Standalone View
docs/content/concepts/partitions-schedules-sensors/schedules.mdx
Show First 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | |||||
```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_partition_based_schedule endbefore=end_partition_based_schedule | ```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_partition_based_schedule endbefore=end_partition_based_schedule | ||||
@daily_schedule( | @daily_schedule( | ||||
pipeline_name="my_pipeline", | pipeline_name="my_pipeline", | ||||
start_date=datetime.datetime(2021, 1, 1), | start_date=datetime.datetime(2021, 1, 1), | ||||
execution_time=datetime.time(11, 0), | execution_time=datetime.time(11, 0), | ||||
execution_timezone="US/Central", | execution_timezone="US/Central", | ||||
) | ) | ||||
def my_daily_schedule(date): | def my_daily_schedule(date): | ||||
return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}} | return { | ||||
"solids": { | |||||
"process_data_for_date": { | |||||
"config": {"date": date.strftime("%Y-%m-%d")} | |||||
} | |||||
} | |||||
} | |||||
``` | ``` | ||||
The decorated schedule function should accept a `datetime` and return the run config for the pipeline run associated with that partition. The scheduler will ensure that the schedule function is evaluated exactly once for every partition to generate run config, which is then used to create and launch a pipeline run. | The decorated schedule function should accept a `datetime` and return the run config for the pipeline run associated with that partition. The scheduler will ensure that the schedule function is evaluated exactly once for every partition to generate run config, which is then used to create and launch a pipeline run. | ||||
Partition-based schedules require a `start_date` that indicates when the set of partitions begins. The scheduler will only kick off runs for times after both the `start_date` and the time when the schedule was turned on. You can kick off runs for times between the `start_date` and when the schedule is turned on by launching a [backfill](/concepts/partitions-schedules-sensors/backfills). | Partition-based schedules require a `start_date` that indicates when the set of partitions begins. The scheduler will only kick off runs for times after both the `start_date` and the time when the schedule was turned on. You can kick off runs for times between the `start_date` and when the schedule is turned on by launching a [backfill](/concepts/partitions-schedules-sensors/backfills). | ||||
By default, the partition that is used for the run will be one partition earlier than the partition that includes the current time, to capture a common ETL use case - for example, a daily schedule will fill in the previous day's partition, and a monthly schedule will fill in last month's partition. You can customize this behavior by changing the `partition_days_offset` parameter for a daily schedule. The default value of this parameter is 1, which means that the scheduler goes back one day to determine the partition. Setting the value to 0 will cause the schedule to fill in the partition that corresponds to the current day, and increasing it above 1 will cause it to fill in an even earlier partition. A similarly-named parameter also exists for the other execution intervals. | By default, the partition that is used for the run will be one partition earlier than the partition that includes the current time, to capture a common ETL use case - for example, a daily schedule will fill in the previous day's partition, and a monthly schedule will fill in last month's partition. You can customize this behavior by changing the `partition_days_offset` parameter for a daily schedule. The default value of this parameter is 1, which means that the scheduler goes back one day to determine the partition. Setting the value to 0 will cause the schedule to fill in the partition that corresponds to the current day, and increasing it above 1 will cause it to fill in an even earlier partition. A similarly-named parameter also exists for the other execution intervals. | ||||
### Non-partition-based schedules | ### Non-partition-based schedules | ||||
When you want to run a schedule on a fixed interval and don't need partitions, you can use the <PyObject object="schedule" decorator /> decorator to define your schedule. | When you want to run a schedule on a fixed interval and don't need partitions, you can use the <PyObject object="schedule" decorator /> decorator to define your schedule. | ||||
For example, this schedule runs at 1:00 AM in US/Central time every day: | For example, this schedule runs at 1:00 AM in US/Central time every day: | ||||
```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_non_partition_based_schedule endbefore=end_non_partition_based_schedule | ```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_non_partition_based_schedule endbefore=end_non_partition_based_schedule | ||||
@schedule(cron_schedule="0 1 * * *", pipeline_name="my_pipeline", execution_timezone="US/Central") | @schedule( | ||||
cron_schedule="0 1 * * *", | |||||
pipeline_name="my_pipeline", | |||||
execution_timezone="US/Central", | |||||
) | |||||
def my_schedule(_context): | def my_schedule(_context): | ||||
return {"solids": {"process_data": {"config": {"dataset_name": "my_dataset"}}}} | return { | ||||
"solids": {"process_data": {"config": {"dataset_name": "my_dataset"}}} | |||||
} | |||||
``` | ``` | ||||
You can also access the execution time fron the passed-in context: | You can also access the execution time fron the passed-in context: | ||||
```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_execution_time endbefore=end_execution_time | ```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_execution_time endbefore=end_execution_time | ||||
@schedule(cron_schedule="0 1 * * *", pipeline_name="my_pipeline", execution_timezone="US/Central") | @schedule( | ||||
cron_schedule="0 1 * * *", | |||||
pipeline_name="my_pipeline", | |||||
execution_timezone="US/Central", | |||||
) | |||||
def my_execution_time_schedule(context): | def my_execution_time_schedule(context): | ||||
date = context.scheduled_execution_time.strftime("%Y-%m-%d") | date = context.scheduled_execution_time.strftime("%Y-%m-%d") | ||||
return { | return { | ||||
"solids": { | "solids": { | ||||
"process_data": {"config": {"dataset_name": "my_dataset", "execution_date": date}} | "process_data": { | ||||
"config": { | |||||
"dataset_name": "my_dataset", | |||||
"execution_date": date, | |||||
} | |||||
} | |||||
} | } | ||||
} | } | ||||
``` | ``` | ||||
### Timezones | ### Timezones | ||||
You can customize the timezone in which your schedule executes by setting the `execution_timezone` parameter on your schedule to any [tz timezone](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones). Schedules with no timezone set run in UTC. | You can customize the timezone in which your schedule executes by setting the `execution_timezone` parameter on your schedule to any [tz timezone](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones). Schedules with no timezone set run in UTC. | ||||
For example, the following schedule executes daily at 9AM in US/Pacific time: | For example, the following schedule executes daily at 9AM in US/Pacific time: | ||||
```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_timezone endbefore=end_timezone | ```python file=concepts/partitions_schedules_sensors/schedules/schedules.py startafter=start_timezone endbefore=end_timezone | ||||
@daily_schedule( | @daily_schedule( | ||||
pipeline_name="my_data_pipeline", | pipeline_name="my_data_pipeline", | ||||
start_date=datetime.datetime(2020, 1, 1), | start_date=datetime.datetime(2020, 1, 1), | ||||
execution_time=datetime.time(9, 0), | execution_time=datetime.time(9, 0), | ||||
execution_timezone="US/Pacific", | execution_timezone="US/Pacific", | ||||
) | ) | ||||
def my_timezone_schedule(date): | def my_timezone_schedule(date): | ||||
return { | return { | ||||
"solids": { | "solids": { | ||||
"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d %H:%M:%S")}} | "process_data_for_date": { | ||||
"config": {"date": date.strftime("%Y-%m-%d %H:%M:%S")} | |||||
} | |||||
} | } | ||||
} | } | ||||
``` | ``` | ||||
### Daylight Savings Time | ### Daylight Savings Time | ||||
Because of Daylight Savings Time transitions, it's possible to specify an execution time that does not exist for every scheduled interval. For example, say you have a daily schedule with an execution time of 2:30 AM in the US/Eastern timezone. On 2019/03/10, the time jumps from 2:00 AM to 3:00 AM when Daylight Savings Time begins. Therefore, the time of 2:30 AM did not exist for the day. | Because of Daylight Savings Time transitions, it's possible to specify an execution time that does not exist for every scheduled interval. For example, say you have a daily schedule with an execution time of 2:30 AM in the US/Eastern timezone. On 2019/03/10, the time jumps from 2:00 AM to 3:00 AM when Daylight Savings Time begins. Therefore, the time of 2:30 AM did not exist for the day. | ||||
▲ Show 20 Lines • Show All 74 Lines • ▼ Show 20 Lines | return { | ||||
} | } | ||||
} | } | ||||
from dagster import build_schedule_context, validate_run_config | from dagster import build_schedule_context, validate_run_config | ||||
def test_my_cron_schedule_with_context(): | def test_my_cron_schedule_with_context(): | ||||
context = build_schedule_context(scheduled_execution_time=datetime.datetime(2020, 1, 1)) | context = build_schedule_context( | ||||
scheduled_execution_time=datetime.datetime(2020, 1, 1) | |||||
) | |||||
run_config = my_schedule_uses_context(context) | run_config = my_schedule_uses_context(context) | ||||
assert validate_run_config(pipeline_for_test, run_config) | assert validate_run_config(pipeline_for_test, run_config) | ||||
``` | ``` | ||||
## Examples | ## Examples | ||||
### Hourly partition-based schedule | ### Hourly partition-based schedule | ||||
```python file=concepts/partitions_schedules_sensors/schedules/schedule_examples.py startafter=start_hourly_schedule endbefore=end_hourly_schedule | ```python file=concepts/partitions_schedules_sensors/schedules/schedule_examples.py startafter=start_hourly_schedule endbefore=end_hourly_schedule | ||||
@hourly_schedule( | @hourly_schedule( | ||||
pipeline_name="my_pipeline", | pipeline_name="my_pipeline", | ||||
start_date=datetime.datetime(2020, 1, 1), | start_date=datetime.datetime(2020, 1, 1), | ||||
execution_time=datetime.time(hour=0, minute=25), | execution_time=datetime.time(hour=0, minute=25), | ||||
execution_timezone="US/Central", | execution_timezone="US/Central", | ||||
) | ) | ||||
def my_hourly_schedule(date): | def my_hourly_schedule(date): | ||||
return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d %H")}}}} | return { | ||||
"solids": { | |||||
"process_data_for_date": { | |||||
"config": {"date": date.strftime("%Y-%m-%d %H")} | |||||
} | |||||
} | |||||
} | |||||
``` | ``` | ||||
### Daily partition-based schedule | ### Daily partition-based schedule | ||||
```python file=concepts/partitions_schedules_sensors/schedules/schedule_examples.py startafter=start_daily_schedule endbefore=end_daily_schedule | ```python file=concepts/partitions_schedules_sensors/schedules/schedule_examples.py startafter=start_daily_schedule endbefore=end_daily_schedule | ||||
@daily_schedule( | @daily_schedule( | ||||
pipeline_name="my_pipeline", | pipeline_name="my_pipeline", | ||||
start_date=datetime.datetime(2020, 1, 1), | start_date=datetime.datetime(2020, 1, 1), | ||||
execution_time=datetime.time(hour=9, minute=0), | execution_time=datetime.time(hour=9, minute=0), | ||||
execution_timezone="US/Central", | execution_timezone="US/Central", | ||||
) | ) | ||||
def my_daily_schedule(date): | def my_daily_schedule(date): | ||||
return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}} | return { | ||||
"solids": { | |||||
"process_data_for_date": { | |||||
"config": {"date": date.strftime("%Y-%m-%d")} | |||||
} | |||||
} | |||||
} | |||||
``` | ``` | ||||
### Weekly partition-based schedule | ### Weekly partition-based schedule | ||||
```python file=concepts/partitions_schedules_sensors/schedules/schedule_examples.py startafter=start_weekly_schedule endbefore=end_weekly_schedule | ```python file=concepts/partitions_schedules_sensors/schedules/schedule_examples.py startafter=start_weekly_schedule endbefore=end_weekly_schedule | ||||
@weekly_schedule( | @weekly_schedule( | ||||
pipeline_name="my_pipeline", | pipeline_name="my_pipeline", | ||||
start_date=datetime.datetime(2020, 1, 1), | start_date=datetime.datetime(2020, 1, 1), | ||||
execution_day_of_week=1, # Monday | execution_day_of_week=1, # Monday | ||||
execution_timezone="US/Central", | execution_timezone="US/Central", | ||||
) | ) | ||||
def my_weekly_schedule(date): | def my_weekly_schedule(date): | ||||
return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m-%d")}}}} | return { | ||||
"solids": { | |||||
"process_data_for_date": { | |||||
"config": {"date": date.strftime("%Y-%m-%d")} | |||||
} | |||||
} | |||||
} | |||||
``` | ``` | ||||
### Monthly partition-based schedule | ### Monthly partition-based schedule | ||||
```python file=concepts/partitions_schedules_sensors/schedules/schedule_examples.py startafter=start_monthly_schedule endbefore=end_monthly_schedule | ```python file=concepts/partitions_schedules_sensors/schedules/schedule_examples.py startafter=start_monthly_schedule endbefore=end_monthly_schedule | ||||
@monthly_schedule( | @monthly_schedule( | ||||
pipeline_name="my_pipeline", | pipeline_name="my_pipeline", | ||||
start_date=datetime.datetime(2020, 1, 1), | start_date=datetime.datetime(2020, 1, 1), | ||||
execution_timezone="US/Central", | execution_timezone="US/Central", | ||||
execution_day_of_month=15, | execution_day_of_month=15, | ||||
execution_time=datetime.time(hour=9, minute=0), | execution_time=datetime.time(hour=9, minute=0), | ||||
) | ) | ||||
def my_monthly_schedule(date): | def my_monthly_schedule(date): | ||||
return {"solids": {"process_data_for_date": {"config": {"date": date.strftime("%Y-%m")}}}} | return { | ||||
"solids": { | |||||
"process_data_for_date": { | |||||
"config": {"date": date.strftime("%Y-%m")} | |||||
} | |||||
} | |||||
} | |||||
``` | ``` | ||||
## Patterns | ## Patterns | ||||
### Using a preset in a schedule definition | ### Using a preset in a schedule definition | ||||
If you already have a preset defined for a pipeline you want to schedule, you can extract the necessary attributes from the preset and pass them to the schedule decorator. | If you already have a preset defined for a pipeline you want to schedule, you can extract the necessary attributes from the preset and pass them to the schedule decorator. | ||||
Show All 21 Lines | @daily_schedule( | ||||
start_date=datetime.datetime(2020, 1, 1), | start_date=datetime.datetime(2020, 1, 1), | ||||
pipeline_name="my_pipeline", | pipeline_name="my_pipeline", | ||||
solid_selection=preset.solid_selection, | solid_selection=preset.solid_selection, | ||||
mode=preset.mode, | mode=preset.mode, | ||||
tags_fn_for_date=lambda _: preset.tags, | tags_fn_for_date=lambda _: preset.tags, | ||||
) | ) | ||||
def my_modified_preset_schedule(date): | def my_modified_preset_schedule(date): | ||||
modified_run_config = copy.deepcopy(preset.run_config) | modified_run_config = copy.deepcopy(preset.run_config) | ||||
modified_run_config["solids"]["process_data_for_date"]["config"]["date"] = date.strftime( | modified_run_config["solids"]["process_data_for_date"]["config"][ | ||||
"%Y-%m-%d" | "date" | ||||
) | ] = date.strftime("%Y-%m-%d") | ||||
return modified_run_config | return modified_run_config | ||||
``` | ``` | ||||
If you find yourself using presets to generate schedule definitions frequently, you can use a helper function similar to this one to take a preset and return a schedule. | If you find yourself using presets to generate schedule definitions frequently, you can use a helper function similar to this one to take a preset and return a schedule. | ||||
```python file=concepts/partitions_schedules_sensors/schedules/preset_helper.py startafter=start_preset_helper endbefore=end_preset_helper | ```python file=concepts/partitions_schedules_sensors/schedules/preset_helper.py startafter=start_preset_helper endbefore=end_preset_helper | ||||
def daily_schedule_definition_from_pipeline_preset(pipeline, preset_name, start_date): | def daily_schedule_definition_from_pipeline_preset( | ||||
pipeline, preset_name, start_date | |||||
): | |||||
preset = pipeline.get_preset(preset_name) | preset = pipeline.get_preset(preset_name) | ||||
if not preset: | if not preset: | ||||
raise Exception( | raise Exception( | ||||
"Preset {preset_name} was not found " | "Preset {preset_name} was not found " | ||||
"on pipeline {pipeline_name}".format( | "on pipeline {pipeline_name}".format( | ||||
preset_name=preset_name, pipeline_name=pipeline.name | preset_name=preset_name, pipeline_name=pipeline.name | ||||
) | ) | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 43 Lines • Show Last 20 Lines |