Differential D8612 Diff 40567 python_modules/dagster/dagster/core/definitions/decorators/schedule.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/decorators/schedule.py
import datetime | import datetime | ||||
import warnings | import warnings | ||||
from functools import update_wrapper | from functools import update_wrapper | ||||
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, cast | from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, cast | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions.partition import ( | from dagster.core.definitions.partition import ( | ||||
PartitionScheduleDefinition, | PartitionScheduleDefinition, | ||||
PartitionSetDefinition, | PartitionSetDefinition, | ||||
ScheduleType, | ScheduleType, | ||||
TimeBasedPartitionParams, | TimeBasedPartitions, | ||||
) | ) | ||||
from dagster.core.definitions.pipeline import PipelineDefinition | from dagster.core.definitions.pipeline import PipelineDefinition | ||||
from dagster.core.errors import DagsterInvalidDefinitionError | from dagster.core.errors import DagsterInvalidDefinitionError | ||||
from dagster.utils.partitions import ( | from dagster.utils.partitions import ( | ||||
DEFAULT_DATE_FORMAT, | DEFAULT_DATE_FORMAT, | ||||
DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE, | DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE, | ||||
DEFAULT_HOURLY_FORMAT_WITH_TIMEZONE, | DEFAULT_HOURLY_FORMAT_WITH_TIMEZONE, | ||||
DEFAULT_MONTHLY_FORMAT, | DEFAULT_MONTHLY_FORMAT, | ||||
▲ Show 20 Lines • Show All 198 Lines • ▼ Show 20 Lines | def inner(fn: Callable[[datetime.datetime], Dict[str, Any]]) -> PartitionScheduleDefinition: | ||||
if tags_fn_for_date: | if tags_fn_for_date: | ||||
tags_fn = cast( | tags_fn = cast( | ||||
Callable[[datetime.datetime], Optional[Dict[str, str]]], tags_fn_for_date | Callable[[datetime.datetime], Optional[Dict[str, str]]], tags_fn_for_date | ||||
) | ) | ||||
tags_fn_for_partition_value = lambda partition: tags_fn(partition.value) | tags_fn_for_partition_value = lambda partition: tags_fn(partition.value) | ||||
fmt = DEFAULT_MONTHLY_FORMAT | fmt = DEFAULT_MONTHLY_FORMAT | ||||
partition_params = TimeBasedPartitionParams( | partitions = TimeBasedPartitions( | ||||
schedule_type=ScheduleType.MONTHLY, | schedule_type=ScheduleType.MONTHLY, | ||||
start=start_date, | start=start_date, | ||||
execution_day=execution_day_of_month, | execution_day=execution_day_of_month, | ||||
execution_time=execution_time, | execution_time=execution_time, | ||||
end=end_date, | end=end_date, | ||||
fmt=fmt, | fmt=fmt, | ||||
timezone=execution_timezone, | timezone=execution_timezone, | ||||
offset=partition_months_offset, | offset=partition_months_offset, | ||||
) | ) | ||||
partition_set = PartitionSetDefinition( | partition_set = PartitionSetDefinition( | ||||
name="{}_partitions".format(schedule_name), | name="{}_partitions".format(schedule_name), | ||||
pipeline_name=pipeline_name, | pipeline_name=pipeline_name, # type: ignore[arg-type] | ||||
run_config_fn_for_partition=lambda partition: fn(partition.value), | run_config_fn_for_partition=lambda partition: fn(partition.value), | ||||
solid_selection=solid_selection, | solid_selection=solid_selection, | ||||
tags_fn_for_partition=tags_fn_for_partition_value, | tags_fn_for_partition=tags_fn_for_partition_value, | ||||
mode=mode, | mode=mode, | ||||
partition_params=partition_params, | partitions=partitions, | ||||
) | ) | ||||
schedule_def = partition_set.create_schedule_definition( | schedule_def = partition_set.create_schedule_definition( | ||||
schedule_name, | schedule_name, | ||||
partition_params.get_cron_schedule(), | partitions.get_cron_schedule(), | ||||
should_execute=should_execute, | should_execute=should_execute, | ||||
environment_vars=environment_vars, | environment_vars=environment_vars, | ||||
partition_selector=create_offset_partition_selector( | partition_selector=create_offset_partition_selector( | ||||
execution_time_to_partition_fn=partition_params.get_execution_time_to_partition_fn() | execution_time_to_partition_fn=partitions.get_execution_time_to_partition_fn() | ||||
), | ), | ||||
execution_timezone=execution_timezone, | execution_timezone=execution_timezone, | ||||
description=description, | description=description, | ||||
decorated_fn=fn, | decorated_fn=fn, | ||||
job=job, | job=job, | ||||
) | ) | ||||
update_wrapper(schedule_def, wrapped=fn) | update_wrapper(schedule_def, wrapped=fn) | ||||
▲ Show 20 Lines • Show All 108 Lines • ▼ Show 20 Lines | def inner(fn: Callable[[datetime.datetime], Dict[str, Any]]) -> PartitionScheduleDefinition: | ||||
if tags_fn_for_date: | if tags_fn_for_date: | ||||
tags_fn = cast( | tags_fn = cast( | ||||
Callable[[datetime.datetime], Optional[Dict[str, str]]], tags_fn_for_date | Callable[[datetime.datetime], Optional[Dict[str, str]]], tags_fn_for_date | ||||
) | ) | ||||
tags_fn_for_partition_value = lambda partition: tags_fn(partition.value) | tags_fn_for_partition_value = lambda partition: tags_fn(partition.value) | ||||
fmt = DEFAULT_DATE_FORMAT | fmt = DEFAULT_DATE_FORMAT | ||||
partition_params = TimeBasedPartitionParams( | partitions = TimeBasedPartitions( | ||||
schedule_type=ScheduleType.WEEKLY, | schedule_type=ScheduleType.WEEKLY, | ||||
start=start_date, | start=start_date, | ||||
execution_time=execution_time, | execution_time=execution_time, | ||||
execution_day=execution_day_of_week, | execution_day=execution_day_of_week, | ||||
end=end_date, | end=end_date, | ||||
fmt=fmt, | fmt=fmt, | ||||
timezone=execution_timezone, | timezone=execution_timezone, | ||||
offset=partition_weeks_offset, | offset=partition_weeks_offset, | ||||
) | ) | ||||
partition_set = PartitionSetDefinition( | partition_set = PartitionSetDefinition( | ||||
name="{}_partitions".format(schedule_name), | name="{}_partitions".format(schedule_name), | ||||
pipeline_name=pipeline_name, | pipeline_name=pipeline_name, # type: ignore[arg-type] | ||||
run_config_fn_for_partition=lambda partition: fn(partition.value), | run_config_fn_for_partition=lambda partition: fn(partition.value), | ||||
solid_selection=solid_selection, | solid_selection=solid_selection, | ||||
tags_fn_for_partition=tags_fn_for_partition_value, | tags_fn_for_partition=tags_fn_for_partition_value, | ||||
mode=mode, | mode=mode, | ||||
partition_params=partition_params, | partitions=partitions, | ||||
) | ) | ||||
schedule_def = partition_set.create_schedule_definition( | schedule_def = partition_set.create_schedule_definition( | ||||
schedule_name, | schedule_name, | ||||
partition_params.get_cron_schedule(), | partitions.get_cron_schedule(), | ||||
should_execute=should_execute, | should_execute=should_execute, | ||||
environment_vars=environment_vars, | environment_vars=environment_vars, | ||||
partition_selector=create_offset_partition_selector( | partition_selector=create_offset_partition_selector( | ||||
execution_time_to_partition_fn=partition_params.get_execution_time_to_partition_fn(), | execution_time_to_partition_fn=partitions.get_execution_time_to_partition_fn(), | ||||
), | ), | ||||
execution_timezone=execution_timezone, | execution_timezone=execution_timezone, | ||||
description=description, | description=description, | ||||
decorated_fn=fn, | decorated_fn=fn, | ||||
job=job, | job=job, | ||||
) | ) | ||||
update_wrapper(schedule_def, wrapped=fn) | update_wrapper(schedule_def, wrapped=fn) | ||||
▲ Show 20 Lines • Show All 98 Lines • ▼ Show 20 Lines | def inner(fn: Callable[[datetime.datetime], Dict[str, Any]]) -> PartitionScheduleDefinition: | ||||
["Partition"], Optional[Dict[str, str]] | ["Partition"], Optional[Dict[str, str]] | ||||
] = lambda partition: {} | ] = lambda partition: {} | ||||
if tags_fn_for_date: | if tags_fn_for_date: | ||||
tags_fn = cast( | tags_fn = cast( | ||||
Callable[[datetime.datetime], Optional[Dict[str, str]]], tags_fn_for_date | Callable[[datetime.datetime], Optional[Dict[str, str]]], tags_fn_for_date | ||||
) | ) | ||||
tags_fn_for_partition_value = lambda partition: tags_fn(partition.value) | tags_fn_for_partition_value = lambda partition: tags_fn(partition.value) | ||||
partition_params = TimeBasedPartitionParams( | partitions = TimeBasedPartitions( | ||||
schedule_type=ScheduleType.DAILY, | schedule_type=ScheduleType.DAILY, | ||||
start=start_date, | start=start_date, | ||||
execution_time=execution_time, | execution_time=execution_time, | ||||
end=end_date, | end=end_date, | ||||
fmt=fmt, | fmt=fmt, | ||||
timezone=execution_timezone, | timezone=execution_timezone, | ||||
offset=partition_days_offset, | offset=partition_days_offset, | ||||
) | ) | ||||
partition_set = PartitionSetDefinition( | partition_set = PartitionSetDefinition( | ||||
name="{}_partitions".format(schedule_name), | name="{}_partitions".format(schedule_name), | ||||
pipeline_name=pipeline_name, | pipeline_name=pipeline_name, # type: ignore[arg-type] | ||||
run_config_fn_for_partition=lambda partition: fn(partition.value), | run_config_fn_for_partition=lambda partition: fn(partition.value), | ||||
solid_selection=solid_selection, | solid_selection=solid_selection, | ||||
tags_fn_for_partition=tags_fn_for_partition_value, | tags_fn_for_partition=tags_fn_for_partition_value, | ||||
mode=mode, | mode=mode, | ||||
partition_params=partition_params, | partitions=partitions, | ||||
) | ) | ||||
schedule_def = partition_set.create_schedule_definition( | schedule_def = partition_set.create_schedule_definition( | ||||
schedule_name, | schedule_name, | ||||
partition_params.get_cron_schedule(), | partitions.get_cron_schedule(), | ||||
should_execute=should_execute, | should_execute=should_execute, | ||||
environment_vars=environment_vars, | environment_vars=environment_vars, | ||||
partition_selector=create_offset_partition_selector( | partition_selector=create_offset_partition_selector( | ||||
execution_time_to_partition_fn=partition_params.get_execution_time_to_partition_fn(), | execution_time_to_partition_fn=partitions.get_execution_time_to_partition_fn(), | ||||
), | ), | ||||
execution_timezone=execution_timezone, | execution_timezone=execution_timezone, | ||||
description=description, | description=description, | ||||
decorated_fn=fn, | decorated_fn=fn, | ||||
job=job, | job=job, | ||||
) | ) | ||||
update_wrapper(schedule_def, wrapped=fn) | update_wrapper(schedule_def, wrapped=fn) | ||||
return schedule_def | return schedule_def | ||||
▲ Show 20 Lines • Show All 113 Lines • ▼ Show 20 Lines | def inner(fn: Callable[[datetime.datetime], Dict[str, Any]]) -> PartitionScheduleDefinition: | ||||
tags_fn_for_partition_value = lambda partition: tags_fn(partition.value) | tags_fn_for_partition_value = lambda partition: tags_fn(partition.value) | ||||
fmt = ( | fmt = ( | ||||
DEFAULT_HOURLY_FORMAT_WITH_TIMEZONE | DEFAULT_HOURLY_FORMAT_WITH_TIMEZONE | ||||
if execution_timezone | if execution_timezone | ||||
else DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE | else DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE | ||||
) | ) | ||||
partition_params = TimeBasedPartitionParams( | partitions = TimeBasedPartitions( | ||||
schedule_type=ScheduleType.HOURLY, | schedule_type=ScheduleType.HOURLY, | ||||
start=start_date, | start=start_date, | ||||
execution_time=execution_time, | execution_time=execution_time, | ||||
end=end_date, | end=end_date, | ||||
fmt=fmt, | fmt=fmt, | ||||
timezone=execution_timezone, | timezone=execution_timezone, | ||||
offset=partition_hours_offset, | offset=partition_hours_offset, | ||||
) | ) | ||||
partition_set = PartitionSetDefinition( | partition_set = PartitionSetDefinition( | ||||
name="{}_partitions".format(schedule_name), | name="{}_partitions".format(schedule_name), | ||||
pipeline_name=pipeline_name, | pipeline_name=pipeline_name, # type: ignore[arg-type] | ||||
run_config_fn_for_partition=lambda partition: fn(partition.value), | run_config_fn_for_partition=lambda partition: fn(partition.value), | ||||
solid_selection=solid_selection, | solid_selection=solid_selection, | ||||
tags_fn_for_partition=tags_fn_for_partition_value, | tags_fn_for_partition=tags_fn_for_partition_value, | ||||
mode=mode, | mode=mode, | ||||
partition_params=partition_params, | partitions=partitions, | ||||
) | ) | ||||
schedule_def = partition_set.create_schedule_definition( | schedule_def = partition_set.create_schedule_definition( | ||||
schedule_name, | schedule_name, | ||||
partition_params.get_cron_schedule(), | partitions.get_cron_schedule(), | ||||
should_execute=should_execute, | should_execute=should_execute, | ||||
environment_vars=environment_vars, | environment_vars=environment_vars, | ||||
partition_selector=create_offset_partition_selector( | partition_selector=create_offset_partition_selector( | ||||
execution_time_to_partition_fn=partition_params.get_execution_time_to_partition_fn(), | execution_time_to_partition_fn=partitions.get_execution_time_to_partition_fn(), | ||||
), | ), | ||||
execution_timezone=execution_timezone, | execution_timezone=execution_timezone, | ||||
description=description, | description=description, | ||||
decorated_fn=fn, | decorated_fn=fn, | ||||
job=job, | job=job, | ||||
) | ) | ||||
update_wrapper(schedule_def, wrapped=fn) | update_wrapper(schedule_def, wrapped=fn) | ||||
return schedule_def | return schedule_def | ||||
return inner | return inner |