Differential D8612 Diff 40567 python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/core_tests/partition_tests/test_partition.py
from datetime import datetime, time | from datetime import datetime, time | ||||
from typing import Callable, List, Optional | from typing import Callable, List, Optional | ||||
import pendulum | import pendulum | ||||
import pytest | import pytest | ||||
from dagster.check import CheckError | from dagster.check import CheckError | ||||
from dagster.core.definitions.partition import ( | from dagster.core.definitions.partition import ( | ||||
DynamicPartitionParams, | DynamicPartitions, | ||||
Partition, | Partition, | ||||
ScheduleType, | ScheduleType, | ||||
StaticPartitionParams, | StaticPartitions, | ||||
TimeBasedPartitionParams, | TimeBasedPartitions, | ||||
) | ) | ||||
from dagster.seven.compat.pendulum import create_pendulum_time | from dagster.seven.compat.pendulum import create_pendulum_time | ||||
from dagster.utils.partitions import DEFAULT_HOURLY_FORMAT_WITH_TIMEZONE | from dagster.utils.partitions import DEFAULT_HOURLY_FORMAT_WITH_TIMEZONE | ||||
def assert_expected_partitions(
    generated_partitions: List[Partition], expected_partitions: List[str]
):
    """Check generated partitions against a list of expected partition names.

    Asserts, in order, that every generated item is a ``Partition``, that the
    two sequences have the same length, and that the generated names equal the
    expected names position by position.

    Args:
        generated_partitions: Partitions produced by the code under test.
        expected_partitions: Partition names expected, in order.

    Raises:
        AssertionError: If any of the three checks fails.
    """
    for candidate in generated_partitions:
        assert isinstance(candidate, Partition)

    assert len(generated_partitions) == len(expected_partitions)

    # Lengths already match, so comparing the name lists is the same as the
    # pairwise comparison.
    generated_names = [candidate.name for candidate in generated_partitions]
    assert generated_names == list(expected_partitions)
@pytest.mark.parametrize(
    argnames=["partitions"],
    argvalues=[([Partition("a_partition")],), ([Partition(x) for x in range(10)],)],
)
def test_static_partitions(partitions: List[Partition]):
    """StaticPartitions must return the partitions it was constructed with.

    Args:
        partitions: Parametrized list of partitions handed to StaticPartitions.
    """
    # Keep the parametrized input under its own name. Rebinding `partitions` to
    # the StaticPartitions object would make the assertion compare
    # get_partitions() against that object rather than the input list.
    static_partitions = StaticPartitions(partitions)

    # Compare (name, value) pairs, the same way test_dynamic_partitions does.
    assert [(p.name, p.value) for p in static_partitions.get_partitions()] == [
        (p.name, p.value) for p in partitions
    ]
@pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||
argnames=["schedule_type", "start", "execution_day", "end", "error_message_regex"], | argnames=["schedule_type", "start", "execution_day", "end", "error_message_regex"], | ||||
ids=[ | ids=[ | ||||
"start should be before end", | "start should be before end", | ||||
"hourly partitions, execution day should not be provided", | "hourly partitions, execution day should not be provided", | ||||
"daily partitions, execution day should not be provided", | "daily partitions, execution day should not be provided", | ||||
Show All 33 Lines | argvalues=[ | ||||
ScheduleType.MONTHLY, | ScheduleType.MONTHLY, | ||||
datetime(year=2021, month=1, day=1), | datetime(year=2021, month=1, day=1), | ||||
0, | 0, | ||||
datetime(year=2021, month=2, day=1), | datetime(year=2021, month=2, day=1), | ||||
"Execution day .* must be between 1 and 31", | "Execution day .* must be between 1 and 31", | ||||
), | ), | ||||
], | ], | ||||
) | ) | ||||
def test_time_based_partition_params_invariants( | def test_time_based_partitions_invariants( | ||||
schedule_type: ScheduleType, | schedule_type: ScheduleType, | ||||
start: datetime, | start: datetime, | ||||
execution_day: Optional[int], | execution_day: Optional[int], | ||||
end: Optional[datetime], | end: Optional[datetime], | ||||
error_message_regex: str, | error_message_regex: str, | ||||
): | ): | ||||
with pytest.raises(CheckError, match=error_message_regex): | with pytest.raises(CheckError, match=error_message_regex): | ||||
TimeBasedPartitionParams( | TimeBasedPartitions( | ||||
schedule_type=schedule_type, | schedule_type=schedule_type, | ||||
start=start, | start=start, | ||||
execution_day=execution_day, | execution_day=execution_day, | ||||
execution_time=None, | execution_time=None, | ||||
end=end, | end=end, | ||||
fmt=None, | fmt=None, | ||||
timezone=None, | timezone=None, | ||||
offset=None, | offset=None, | ||||
▲ Show 20 Lines • Show All 116 Lines • ▼ Show 20 Lines | argvalues=[ | ||||
time(1, 20), | time(1, 20), | ||||
datetime(year=2021, month=3, day=3), | datetime(year=2021, month=3, day=3), | ||||
0, | 0, | ||||
create_pendulum_time(2021, 3, 3, 1, 20), | create_pendulum_time(2021, 3, 3, 1, 20), | ||||
["2021-02-28", "2021-03-01", "2021-03-02", "2021-03-03"], | ["2021-02-28", "2021-03-01", "2021-03-02", "2021-03-03"], | ||||
), | ), | ||||
], | ], | ||||
) | ) | ||||
def test_time_partitions_daily_partitions(
    start: datetime,
    execution_time: time,
    end: datetime,
    partition_days_offset: Optional[int],
    current_time,
    expected_partitions: List[str],
):
    """A DAILY TimeBasedPartitions yields the expected partition names.

    Args:
        start: Start of the partition range.
        execution_time: Time of day passed as ``execution_time``.
        end: End of the partition range.
        partition_days_offset: Value passed as ``offset``.
        current_time: Value handed to ``pendulum.test`` for the duration of the test.
        expected_partitions: Partition names expected, in order.
    """
    with pendulum.test(current_time):
        daily_partitions = TimeBasedPartitions(
            schedule_type=ScheduleType.DAILY,
            start=start,
            execution_time=execution_time,
            end=end,
            offset=partition_days_offset,
        )

        # NOTE(review): placed inside the pendulum.test block so the pinned clock
        # applies while partitions are generated -- confirm against the original layout.
        generated = daily_partitions.get_partitions()
        assert_expected_partitions(generated, expected_partitions)
@pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||
argnames=[ | argnames=[ | ||||
"start", | "start", | ||||
"end", | "end", | ||||
"partition_months_offset", | "partition_months_offset", | ||||
"current_time", | "current_time", | ||||
▲ Show 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | argvalues=[ | ||||
datetime(year=2021, month=1, day=3), | datetime(year=2021, month=1, day=3), | ||||
datetime(year=2021, month=1, day=31), | datetime(year=2021, month=1, day=31), | ||||
0, | 0, | ||||
create_pendulum_time(2021, 1, 31), | create_pendulum_time(2021, 1, 31), | ||||
[], | [], | ||||
), | ), | ||||
], | ], | ||||
) | ) | ||||
def test_time_partitions_monthly_partitions(
    start: datetime,
    end: datetime,
    partition_months_offset: Optional[int],
    current_time,
    expected_partitions: List[str],
):
    """A MONTHLY TimeBasedPartitions yields the expected partition names.

    The schedule is fixed to execution day 1 at 01:20; only the range, the
    offset and the pinned clock vary per case.

    Args:
        start: Start of the partition range.
        end: End of the partition range.
        partition_months_offset: Value passed as ``offset``.
        current_time: Value handed to ``pendulum.test`` for the duration of the test.
        expected_partitions: Partition names expected, in order.
    """
    with pendulum.test(current_time):
        monthly_partitions = TimeBasedPartitions(
            schedule_type=ScheduleType.MONTHLY,
            start=start,
            execution_time=time(1, 20),
            execution_day=1,
            end=end,
            offset=partition_months_offset,
        )

        # NOTE(review): placed inside the pendulum.test block so the pinned clock
        # applies while partitions are generated -- confirm against the original layout.
        generated = monthly_partitions.get_partitions()
        assert_expected_partitions(generated, expected_partitions)
@pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||
argnames=[ | argnames=[ | ||||
"start", | "start", | ||||
"end", | "end", | ||||
"partition_weeks_offset", | "partition_weeks_offset", | ||||
"current_time", | "current_time", | ||||
▲ Show 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | argvalues=[ | ||||
datetime(year=2021, month=1, day=4), | datetime(year=2021, month=1, day=4), | ||||
datetime(year=2021, month=1, day=9), | datetime(year=2021, month=1, day=9), | ||||
0, | 0, | ||||
create_pendulum_time(2021, 1, 9), | create_pendulum_time(2021, 1, 9), | ||||
[], | [], | ||||
), | ), | ||||
], | ], | ||||
) | ) | ||||
def test_time_partitions_weekly_partitions(
    start: datetime,
    end: datetime,
    partition_weeks_offset: Optional[int],
    current_time,
    expected_partitions: List[str],
):
    """A WEEKLY TimeBasedPartitions yields the expected partition names.

    The schedule is fixed to execution day 0 at 01:20; only the range, the
    offset and the pinned clock vary per case.

    Args:
        start: Start of the partition range.
        end: End of the partition range.
        partition_weeks_offset: Value passed as ``offset``.
        current_time: Value handed to ``pendulum.test`` for the duration of the test.
        expected_partitions: Partition names expected, in order.
    """
    with pendulum.test(current_time):
        weekly_partitions = TimeBasedPartitions(
            schedule_type=ScheduleType.WEEKLY,
            start=start,
            execution_time=time(1, 20),
            execution_day=0,
            end=end,
            offset=partition_weeks_offset,
        )

        # NOTE(review): placed inside the pendulum.test block so the pinned clock
        # applies while partitions are generated -- confirm against the original layout.
        generated = weekly_partitions.get_partitions()
        assert_expected_partitions(generated, expected_partitions)
@pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||
argnames=[ | argnames=[ | ||||
"start", | "start", | ||||
"end", | "end", | ||||
"timezone", | "timezone", | ||||
"partition_hours_offset", | "partition_hours_offset", | ||||
▲ Show 20 Lines • Show All 170 Lines • ▼ Show 20 Lines | argvalues=[ | ||||
"2021-11-07-01:00-0600", | "2021-11-07-01:00-0600", | ||||
"2021-11-07-02:00-0600", | "2021-11-07-02:00-0600", | ||||
"2021-11-07-03:00-0600", | "2021-11-07-03:00-0600", | ||||
"2021-11-07-04:00-0600", | "2021-11-07-04:00-0600", | ||||
], | ], | ||||
), | ), | ||||
], | ], | ||||
) | ) | ||||
def test_time_partitions_hourly_partitions(
    start: datetime,
    end: datetime,
    timezone: Optional[str],
    partition_hours_offset: int,
    current_time,
    expected_partitions: List[str],
):
    """An HOURLY TimeBasedPartitions yields the expected partition names.

    Partition names are rendered with DEFAULT_HOURLY_FORMAT_WITH_TIMEZONE and the
    schedule uses a fixed execution time of 00:01.

    Args:
        start: Start of the partition range.
        end: End of the partition range.
        timezone: Value passed as ``timezone``; may be None.
        partition_hours_offset: Value passed as ``offset``.
        current_time: Value handed to ``pendulum.test`` for the duration of the test.
        expected_partitions: Partition names expected, in order.
    """
    with pendulum.test(current_time):
        hourly_partitions = TimeBasedPartitions(
            schedule_type=ScheduleType.HOURLY,
            start=start,
            execution_time=time(0, 1),
            end=end,
            timezone=timezone,
            fmt=DEFAULT_HOURLY_FORMAT_WITH_TIMEZONE,
            offset=partition_hours_offset,
        )

        # NOTE(review): placed inside the pendulum.test block so the pinned clock
        # applies while partitions are generated -- confirm against the original layout.
        generated = hourly_partitions.get_partitions()
        assert_expected_partitions(generated, expected_partitions)
@pytest.mark.parametrize(
    argnames=["partition_fn"],
    argvalues=[
        (lambda _current_time: [Partition("a_partition")],),
        (lambda _current_time: [Partition(x) for x in range(10)],),
    ],
)
def test_dynamic_partitions(partition_fn: Callable[[Optional[datetime]], List[Partition]]):
    """DynamicPartitions must produce what its partition function produces.

    The partitions returned by ``get_partitions()`` are compared with a direct
    ``partition_fn(None)`` call by their (name, value) pairs.

    Args:
        partition_fn: Parametrized callable that builds the partition list.
    """
    dynamic_partitions = DynamicPartitions(partition_fn)

    generated_pairs = [
        (partition.name, partition.value) for partition in dynamic_partitions.get_partitions()
    ]
    expected_pairs = [(partition.name, partition.value) for partition in partition_fn(None)]

    assert generated_pairs == expected_pairs