Differential D4671 Diff 23887 python_modules/dagster/dagster/core/definitions/decorators/schedule.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/decorators/schedule.py
Show First 20 Lines • Show All 142 Lines • ▼ Show 20 Lines | ): | ||||
cron_schedule = "{minute} {hour} {day} * *".format( | cron_schedule = "{minute} {hour} {day} * *".format( | ||||
minute=execution_time.minute, hour=execution_time.hour, day=execution_day_of_month | minute=execution_time.minute, hour=execution_time.hour, day=execution_day_of_month | ||||
) | ) | ||||
fmt = "%Y-%m" | fmt = "%Y-%m" | ||||
delta = relativedelta(months=1) | delta = relativedelta(months=1) | ||||
execution_offset = relativedelta( | |||||
days=execution_day_of_month - 1, hours=execution_time.hour, minutes=execution_time.minute, | |||||
) | |||||
partition_fn = date_partition_range(start_date, end=end_date, delta=delta, fmt=fmt) | partition_fn = date_partition_range(start_date, end=end_date, delta=delta, fmt=fmt) | ||||
def inner(fn): | def inner(fn): | ||||
check.callable_param(fn, "fn") | check.callable_param(fn, "fn") | ||||
schedule_name = name or fn.__name__ | schedule_name = name or fn.__name__ | ||||
tags_fn_for_partition_value = lambda partition: {} | tags_fn_for_partition_value = lambda partition: {} | ||||
Show All 10 Lines | def inner(fn): | ||||
mode=mode, | mode=mode, | ||||
) | ) | ||||
return partition_set.create_schedule_definition( | return partition_set.create_schedule_definition( | ||||
schedule_name, | schedule_name, | ||||
cron_schedule, | cron_schedule, | ||||
should_execute=should_execute, | should_execute=should_execute, | ||||
environment_vars=environment_vars, | environment_vars=environment_vars, | ||||
partition_selector=create_default_partition_selector_fn(delta + execution_offset, fmt), | partition_selector=create_default_partition_selector_fn(delta, fmt), | ||||
) | ) | ||||
return inner | return inner | ||||
def weekly_schedule( | def weekly_schedule( | ||||
pipeline_name, | pipeline_name, | ||||
start_date, | start_date, | ||||
▲ Show 20 Lines • Show All 59 Lines • ▼ Show 20 Lines | cron_schedule = "{minute} {hour} * * {day}".format( | ||||
minute=execution_time.minute, hour=execution_time.hour, day=execution_day_of_week | minute=execution_time.minute, hour=execution_time.hour, day=execution_day_of_week | ||||
) | ) | ||||
fmt = "%Y-%m-%d" | fmt = "%Y-%m-%d" | ||||
delta = relativedelta(weeks=1) | delta = relativedelta(weeks=1) | ||||
day_difference = (execution_day_of_week - start_date.weekday()) % 7 | day_difference = (execution_day_of_week - start_date.weekday()) % 7 | ||||
execution_offset = relativedelta( | execution_offset = relativedelta(days=day_difference) | ||||
sashank: Oh wow this was just completely wrong before, wasn't it | |||||
Done Inline ActionsI don't think it was wrong when everything was in UTC - it may not even be wrong now, but its no longer needed to get the time exactly right as long as you're on the right day and will produce the right partition name dgibson: I don't think it was wrong when everything was in UTC - it may not even be wrong now, but its… | |||||
Done Inline Actionsonce you're no longer in UTC it's no longer true that, say, going back 3 hours at 3AM is no will always reach midnight, which you can count on in UTC dgibson: once you're no longer in UTC it's no longer true that, say, going back 3 hours at 3AM is no… | |||||
days=day_difference, hours=execution_time.hour, minutes=execution_time.minute | |||||
) | |||||
partition_fn = date_partition_range(start_date, end=end_date, delta=delta, fmt=fmt) | partition_fn = date_partition_range(start_date, end=end_date, delta=delta, fmt=fmt) | ||||
def inner(fn): | def inner(fn): | ||||
check.callable_param(fn, "fn") | check.callable_param(fn, "fn") | ||||
schedule_name = name or fn.__name__ | schedule_name = name or fn.__name__ | ||||
▲ Show 20 Lines • Show All 73 Lines • ▼ Show 20 Lines | ): | ||||
check.str_param(pipeline_name, "pipeline_name") | check.str_param(pipeline_name, "pipeline_name") | ||||
check.inst_param(execution_time, "execution_time", datetime.time) | check.inst_param(execution_time, "execution_time", datetime.time) | ||||
cron_schedule = "{minute} {hour} * * *".format( | cron_schedule = "{minute} {hour} * * *".format( | ||||
minute=execution_time.minute, hour=execution_time.hour | minute=execution_time.minute, hour=execution_time.hour | ||||
) | ) | ||||
delta = datetime.timedelta(days=1) | delta = datetime.timedelta(days=1) | ||||
fmt = "%Y-%m-%d" | |||||
partition_fn = date_partition_range(start_date, end=end_date, delta=delta) | partition_fn = date_partition_range(start_date, end=end_date, delta=delta) | ||||
execution_offset = datetime.timedelta(hours=execution_time.hour, minutes=execution_time.minute) | |||||
def inner(fn): | def inner(fn): | ||||
check.callable_param(fn, "fn") | check.callable_param(fn, "fn") | ||||
schedule_name = name or fn.__name__ | schedule_name = name or fn.__name__ | ||||
tags_fn_for_partition_value = lambda partition: {} | tags_fn_for_partition_value = lambda partition: {} | ||||
if tags_fn_for_date: | if tags_fn_for_date: | ||||
tags_fn_for_partition_value = lambda partition: tags_fn_for_date(partition.value) | tags_fn_for_partition_value = lambda partition: tags_fn_for_date(partition.value) | ||||
partition_set = PartitionSetDefinition( | partition_set = PartitionSetDefinition( | ||||
name="{}_partitions".format(schedule_name), | name="{}_partitions".format(schedule_name), | ||||
pipeline_name=pipeline_name, | pipeline_name=pipeline_name, | ||||
partition_fn=partition_fn, | partition_fn=partition_fn, | ||||
run_config_fn_for_partition=lambda partition: fn(partition.value), | run_config_fn_for_partition=lambda partition: fn(partition.value), | ||||
solid_selection=solid_selection, | solid_selection=solid_selection, | ||||
tags_fn_for_partition=tags_fn_for_partition_value, | tags_fn_for_partition=tags_fn_for_partition_value, | ||||
mode=mode, | mode=mode, | ||||
) | ) | ||||
return partition_set.create_schedule_definition( | return partition_set.create_schedule_definition( | ||||
schedule_name, | schedule_name, | ||||
cron_schedule, | cron_schedule, | ||||
should_execute=should_execute, | should_execute=should_execute, | ||||
environment_vars=environment_vars, | environment_vars=environment_vars, | ||||
partition_selector=create_default_partition_selector_fn(delta + execution_offset), | partition_selector=create_default_partition_selector_fn(fmt=fmt, delta=delta), | ||||
) | ) | ||||
return inner | return inner | ||||
def hourly_schedule( | def hourly_schedule( | ||||
pipeline_name, | pipeline_name, | ||||
start_date, | start_date, | ||||
▲ Show 20 Lines • Show All 85 Lines • ▼ Show 20 Lines | def inner(fn): | ||||
mode=mode, | mode=mode, | ||||
) | ) | ||||
return partition_set.create_schedule_definition( | return partition_set.create_schedule_definition( | ||||
schedule_name, | schedule_name, | ||||
cron_schedule, | cron_schedule, | ||||
should_execute=should_execute, | should_execute=should_execute, | ||||
environment_vars=environment_vars, | environment_vars=environment_vars, | ||||
partition_selector=create_default_partition_selector_fn(delta + execution_offset, fmt), | partition_selector=create_default_partition_selector_fn( | ||||
delta + execution_offset, | |||||
fmt, | |||||
# Express hourly partitions in UTC so that they don't change | |||||
# depending on what timezone the schedule runs in | |||||
partition_in_utc=True, | |||||
), | |||||
) | ) | ||||
return inner | return inner |
Oh wow this was just completely wrong before, wasn't it