Differential D4787 Diff 24117 python_modules/dagster/dagster/core/definitions/decorators/schedule.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/decorators/schedule.py
Show First 20 Lines • Show All 143 Lines • ▼ Show 20 Lines | ): | ||||
mode = check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME) | mode = check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME) | ||||
check.opt_callable_param(should_execute, "should_execute") | check.opt_callable_param(should_execute, "should_execute") | ||||
check.opt_dict_param(environment_vars, "environment_vars", key_type=str, value_type=str) | check.opt_dict_param(environment_vars, "environment_vars", key_type=str, value_type=str) | ||||
check.str_param(pipeline_name, "pipeline_name") | check.str_param(pipeline_name, "pipeline_name") | ||||
check.int_param(execution_day_of_month, "execution_day") | check.int_param(execution_day_of_month, "execution_day") | ||||
check.inst_param(execution_time, "execution_time", datetime.time) | check.inst_param(execution_time, "execution_time", datetime.time) | ||||
check.opt_str_param(execution_timezone, "execution_timezone") | check.opt_str_param(execution_timezone, "execution_timezone") | ||||
if ( | |||||
start_date.day != 1 | |||||
or start_date.hour != 0 | |||||
or start_date.minute != 0 | |||||
or start_date.second != 0 | |||||
): | |||||
raise DagsterInvalidDefinitionError( | |||||
"`start_date` must be at the beginning of the first day of the month for a monthly " | |||||
"schedule. Use `execution_day_of_month` and `execution_time` to execute the schedule " | |||||
"at a specific time within the month. For example, to run the schedule at 3AM on the " | |||||
"23rd of each month starting in October, your schedule definition would look like:" | |||||
""" | |||||
@monthly_schedule( | |||||
start_date=datetime.datetime(2020, 10, 1), | |||||
execution_day_of_month=23, | |||||
execution_time=datetime.time(3, 0) | |||||
): | |||||
def my_schedule_definition(_): | |||||
... | |||||
""" | |||||
) | |||||
if execution_day_of_month <= 0 or execution_day_of_month > 31: | if execution_day_of_month <= 0 or execution_day_of_month > 31: | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
"`execution_day_of_month={}` is not valid for monthly schedule. Execution day must be " | "`execution_day_of_month={}` is not valid for monthly schedule. Execution day must be " | ||||
"between 1 and 31".format(execution_day_of_month) | "between 1 and 31".format(execution_day_of_month) | ||||
) | ) | ||||
cron_schedule = "{minute} {hour} {day} * *".format( | cron_schedule = "{minute} {hour} {day} * *".format( | ||||
minute=execution_time.minute, hour=execution_time.hour, day=execution_day_of_month | minute=execution_time.minute, hour=execution_time.hour, day=execution_day_of_month | ||||
▲ Show 20 Lines • Show All 92 Lines • ▼ Show 20 Lines | ): | ||||
mode = check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME) | mode = check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME) | ||||
check.opt_callable_param(should_execute, "should_execute") | check.opt_callable_param(should_execute, "should_execute") | ||||
check.opt_dict_param(environment_vars, "environment_vars", key_type=str, value_type=str) | check.opt_dict_param(environment_vars, "environment_vars", key_type=str, value_type=str) | ||||
check.str_param(pipeline_name, "pipeline_name") | check.str_param(pipeline_name, "pipeline_name") | ||||
check.int_param(execution_day_of_week, "execution_day_of_week") | check.int_param(execution_day_of_week, "execution_day_of_week") | ||||
check.inst_param(execution_time, "execution_time", datetime.time) | check.inst_param(execution_time, "execution_time", datetime.time) | ||||
check.opt_str_param(execution_timezone, "execution_timezone") | check.opt_str_param(execution_timezone, "execution_timezone") | ||||
if start_date.hour != 0 or start_date.minute != 0 or start_date.second != 0: | |||||
raise DagsterInvalidDefinitionError( | |||||
"`start_date` must be at the beginning of a day for a weekly schedule. " | |||||
"Use `execution_time` to execute the schedule at a specific time of day. For example, " | |||||
"to run the schedule at 3AM each Tuesday starting on 10/20/2020, your schedule " | |||||
"definition would look like:" | |||||
""" | |||||
@weekly_schedule( | |||||
start_date=datetime.datetime(2020, 10, 20), | |||||
execution_day_of_week=1, | |||||
execution_time=datetime.time(3, 0) | |||||
): | |||||
def my_schedule_definition(_): | |||||
... | |||||
""" | |||||
) | |||||
if execution_day_of_week < 0 or execution_day_of_week >= 7: | if execution_day_of_week < 0 or execution_day_of_week >= 7: | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
"`execution_day_of_week={}` is not valid for weekly schedule. Execution day must be " | "`execution_day_of_week={}` is not valid for weekly schedule. Execution day must be " | ||||
"between 0 [Sunday] and 6 [Saturday]".format(execution_day_of_week) | "between 0 [Sunday] and 6 [Saturday]".format(execution_day_of_week) | ||||
) | ) | ||||
cron_schedule = "{minute} {hour} * * {day}".format( | cron_schedule = "{minute} {hour} * * {day}".format( | ||||
minute=execution_time.minute, hour=execution_time.hour, day=execution_day_of_week | minute=execution_time.minute, hour=execution_time.hour, day=execution_day_of_week | ||||
▲ Show 20 Lines • Show All 91 Lines • ▼ Show 20 Lines | ): | ||||
check.opt_inst_param(end_date, "end_date", datetime.datetime) | check.opt_inst_param(end_date, "end_date", datetime.datetime) | ||||
check.opt_callable_param(tags_fn_for_date, "tags_fn_for_date") | check.opt_callable_param(tags_fn_for_date, "tags_fn_for_date") | ||||
check.opt_nullable_list_param(solid_selection, "solid_selection", of_type=str) | check.opt_nullable_list_param(solid_selection, "solid_selection", of_type=str) | ||||
mode = check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME) | mode = check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME) | ||||
check.opt_callable_param(should_execute, "should_execute") | check.opt_callable_param(should_execute, "should_execute") | ||||
check.opt_dict_param(environment_vars, "environment_vars", key_type=str, value_type=str) | check.opt_dict_param(environment_vars, "environment_vars", key_type=str, value_type=str) | ||||
check.opt_str_param(execution_timezone, "execution_timezone") | check.opt_str_param(execution_timezone, "execution_timezone") | ||||
if start_date.hour != 0 or start_date.minute != 0 or start_date.second != 0: | |||||
raise DagsterInvalidDefinitionError( | |||||
"`start_date` must be at the beginning of a day for a daily schedule. " | |||||
"Use `execution_time` to execute the schedule at a specific time of day. For example, " | |||||
sashank: Nit: "at execute the schedule at a specified time in the day"? Same for the other messages. | |||||
Done Inline ActionsSounds good, will add the example (minus the timezones since that isn't ready yet until the new scheduler is documented and stuff) dgibson: Sounds good, will add the example (minus the timezones since that isn't ready yet until the new… | |||||
"to run the schedule at 3AM each day starting on 10/20/2020, your schedule " | |||||
"definition would look like:" | |||||
""" | |||||
@daily_schedule( | |||||
start_date=datetime.datetime(2020, 10, 20), | |||||
execution_time=datetime.time(3, 0) | |||||
): | |||||
def my_schedule_definition(_): | |||||
... | |||||
""" | |||||
) | |||||
cron_schedule = "{minute} {hour} * * *".format( | cron_schedule = "{minute} {hour} * * *".format( | ||||
minute=execution_time.minute, hour=execution_time.hour | minute=execution_time.minute, hour=execution_time.hour | ||||
) | ) | ||||
fmt = DEFAULT_DATE_FORMAT | fmt = DEFAULT_DATE_FORMAT | ||||
partition_fn = date_partition_range( | partition_fn = date_partition_range( | ||||
start_date, end=end_date, delta_range="days", timezone=execution_timezone, | start_date, end=end_date, delta_range="days", timezone=execution_timezone, | ||||
▲ Show 20 Lines • Show All 84 Lines • ▼ Show 20 Lines | ): | ||||
check.opt_nullable_list_param(solid_selection, "solid_selection", of_type=str) | check.opt_nullable_list_param(solid_selection, "solid_selection", of_type=str) | ||||
mode = check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME) | mode = check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME) | ||||
check.opt_callable_param(should_execute, "should_execute") | check.opt_callable_param(should_execute, "should_execute") | ||||
check.opt_dict_param(environment_vars, "environment_vars", key_type=str, value_type=str) | check.opt_dict_param(environment_vars, "environment_vars", key_type=str, value_type=str) | ||||
check.str_param(pipeline_name, "pipeline_name") | check.str_param(pipeline_name, "pipeline_name") | ||||
check.inst_param(execution_time, "execution_time", datetime.time) | check.inst_param(execution_time, "execution_time", datetime.time) | ||||
check.opt_str_param(execution_timezone, "execution_timezone") | check.opt_str_param(execution_timezone, "execution_timezone") | ||||
if start_date.minute != 0 or start_date.second != 0: | |||||
raise DagsterInvalidDefinitionError( | |||||
"`start_date` must be at the beginning of the hour for an hourly schedule. " | |||||
"Use `execution_time` to execute the schedule at a specific time within the hour. For " | |||||
"example, to run the schedule each hour at 15 minutes past the hour starting at 3AM " | |||||
"on 10/20/2020, your schedule definition would look like:" | |||||
""" | |||||
@hourly_schedule( | |||||
start_date=datetime.datetime(2020, 10, 20, 3), | |||||
execution_time=datetime.time(0, 15) | |||||
): | |||||
def my_schedule_definition(_): | |||||
... | |||||
""" | |||||
) | |||||
if execution_time.hour != 0: | if execution_time.hour != 0: | ||||
warnings.warn( | warnings.warn( | ||||
"Hourly schedule {schedule_name} created with:\n" | "Hourly schedule {schedule_name} created with:\n" | ||||
"\tschedule_time=datetime.time(hour={hour}, minute={minute}, ...)." | "\tschedule_time=datetime.time(hour={hour}, minute={minute}, ...)." | ||||
"Since this is a hourly schedule, the hour parameter will be ignored and the schedule " | "Since this is an hourly schedule, the hour parameter will be ignored and the schedule " | ||||
"will run on the {minute} mark for the previous hour interval. Replace " | "will run on the {minute} mark for the previous hour interval. Replace " | ||||
"datetime.time(hour={hour}, minute={minute}, ...) with " | "datetime.time(hour={hour}, minute={minute}, ...) with " | ||||
"datetime.time(minute={minute}, ...) to fix this warning." | "datetime.time(minute={minute}, ...) to fix this warning." | ||||
) | ) | ||||
cron_schedule = "{minute} * * * *".format(minute=execution_time.minute) | cron_schedule = "{minute} * * * *".format(minute=execution_time.minute) | ||||
fmt = ( | fmt = ( | ||||
▲ Show 20 Lines • Show All 43 Lines • Show Last 20 Lines |
Nit: "at execute the schedule at a specified time in the day"? Same for the other messages.
It might be a good idea to have a tiny example right here in these error messages, because I don't think it's entirely intuitive, but an example would immediately clarify what's wrong.
Something along the lines of: