Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/utils/partitions.py
import datetime | import datetime | ||||
import warnings | import warnings | ||||
import pendulum | import pendulum | ||||
from dateutil.relativedelta import relativedelta | from dateutil.relativedelta import relativedelta | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.errors import DagsterInvariantViolationError | from dagster.core.errors import DagsterInvariantViolationError | ||||
from dagster.seven import get_utc_timezone | |||||
DEFAULT_MONTHLY_FORMAT = "%Y-%m" | DEFAULT_MONTHLY_FORMAT = "%Y-%m" | ||||
DEFAULT_DATE_FORMAT = "%Y-%m-%d" | DEFAULT_DATE_FORMAT = "%Y-%m-%d" | ||||
DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE = "%Y-%m-%d-%H:%M" | DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE = "%Y-%m-%d-%H:%M" | ||||
DEFAULT_HOURLY_FORMAT_WITH_TIMEZONE = DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE + "%z" | DEFAULT_HOURLY_FORMAT_WITH_TIMEZONE = DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE + "%z" | ||||
# Remove this when the 'delta' param to date_partition_range is removed | # Remove this when the 'delta' param to date_partition_range is removed | ||||
Show All 30 Lines | |||||
def date_partition_range( | def date_partition_range( | ||||
start, end=None, delta=None, delta_range="days", fmt=None, inclusive=False, timezone=None, | start, end=None, delta=None, delta_range="days", fmt=None, inclusive=False, timezone=None, | ||||
): | ): | ||||
""" Utility function that returns a partition generating function to be used in creating a | """ Utility function that returns a partition generating function to be used in creating a | ||||
`PartitionSet` definition. | `PartitionSet` definition. | ||||
Args: | Args: | ||||
start (datetime): Datetime capturing the start of the time range. | start (datetime): Datetime capturing the start of the time range. If no timezone is | ||||
specified, this will be interpreted as UTC. | |||||
end (Optional(datetime)): Datetime capturing the end of the partition. By default, the | end (Optional(datetime)): Datetime capturing the end of the partition. By default, the | ||||
current time is used. The range is not inclusive of the end | current time is used. The range is not inclusive of the end value. If no timezone is | ||||
value. | specified, this will be interpreted as UTC. | ||||
delta (Optional(timedelta)): Timedelta representing the time duration of each partition. | delta (Optional(timedelta)): Timedelta representing the time duration of each partition. | ||||
dgibson: This is the desired end state and where i'm planning that we get to in 0.10.0 (see https… | |||||
DEPRECATED: use 'delta_range' instead, which handles timezone transitions correctly. | DEPRECATED: use 'delta_range' instead, which handles timezone transitions correctly. | ||||
delta_range (Optional(str)): string representing the time duration of each partition. | delta_range (Optional(str)): string representing the time duration of each partition. | ||||
Must be a valid argument to pendulum.period.range ("days", "hours", "months", etc.). | Must be a valid argument to pendulum.period.range ("days", "hours", "months", etc.). | ||||
fmt (Optional(str)): Format string to represent each partition by its start time | fmt (Optional(str)): Format string to represent each partition by its start time | ||||
inclusive (Optional(bool)): By default, the partition set only contains date interval | inclusive (Optional(bool)): By default, the partition set only contains date interval | ||||
partitions for which the end time of the interval is less than current time. In other | partitions for which the end time of the interval is less than current time. In other | ||||
words, the partition set contains date interval partitions that are completely in the | words, the partition set contains date interval partitions that are completely in the | ||||
past. If inclusive is set to True, then the partition set will include all date | past. If inclusive is set to True, then the partition set will include all date | ||||
Show All 21 Lines | if delta: | ||||
"passed in delta=timedelta(days=1), pass in delta_range='days' instead. The 'delta' " | "passed in delta=timedelta(days=1), pass in delta_range='days' instead. The 'delta' " | ||||
"argument will be removed in the dagster 0.10.0 release." | "argument will be removed in the dagster 0.10.0 release." | ||||
) | ) | ||||
delta_range, delta_amount = _delta_to_delta_range(delta) | delta_range, delta_amount = _delta_to_delta_range(delta) | ||||
else: | else: | ||||
check.invariant(delta_range, "Must include either a 'delta' or 'delta_range' parameter") | check.invariant(delta_range, "Must include either a 'delta' or 'delta_range' parameter") | ||||
delta_amount = 1 | delta_amount = 1 | ||||
if start.tzinfo is None: | |||||
start.replace(tzinfo=get_utc_timezone()) | |||||
if end and end.tzinfo is None: | |||||
end.replace(tzinfo=get_utc_timezone()) | |||||
if end and start > end: | if end and start > end: | ||||
raise DagsterInvariantViolationError( | raise DagsterInvariantViolationError( | ||||
'Selected date range start "{start}" is after date range end "{end}'.format( | 'Selected date range start "{start}" is after date range end "{end}'.format( | ||||
start=start.strftime(fmt), end=end.strftime(fmt), | start=start.strftime(fmt), end=end.strftime(fmt), | ||||
) | ) | ||||
) | ) | ||||
def get_date_range_partitions(): | def get_date_range_partitions(): | ||||
tz = timezone if timezone else pendulum.now().timezone.name | tz = timezone if timezone else pendulum.now().timezone.name | ||||
_start = ( | _start = ( | ||||
start.in_tz(tz) | start.in_tz(tz) | ||||
if isinstance(start, pendulum.Pendulum) | if isinstance(start, pendulum.Pendulum) | ||||
else pendulum.instance(start, tz=tz) | else pendulum.instance(start, tz=tz) | ||||
) | ) | ||||
if not end: | if not end: | ||||
_end = pendulum.now(tz) | _end = pendulum.now(tz) | ||||
Show All 20 Lines |
This is the desired end state and where i'm planning that we get to in 0.10.0 (see https://github.com/dagster-io/dagster/issues/3128 which tracks adding a requirement that you specify a timezone), but changing it right now could break existing users' schedules, if they're assuming that the datetime they specify is in the system timezone
I think it's also not true as currently written? on line 112 we still set the timezone to the system timezone if one isn't currently set (via pendulum.now().timezone.name)