Differential D4761 Diff 23634 python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_decorators.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster_tests/core_tests/definitions_tests/test_decorators.py
Show First 20 Lines • Show All 451 Lines • ▼ Show 20 Lines | ): | ||||
pipeline_name="foo_pipeline", | pipeline_name="foo_pipeline", | ||||
start_date=datetime(year=2019, month=1, day=1), | start_date=datetime(year=2019, month=1, day=1), | ||||
execution_timezone="MadeUpTimeZone", | execution_timezone="MadeUpTimeZone", | ||||
) | ) | ||||
def invalid_timezone_foo_schedule(): | def invalid_timezone_foo_schedule(): | ||||
return {} | return {} | ||||
def _check_partitions(partition_schedule_def, expected_num_partitions, expected_relative_delta): | def _check_partitions( | ||||
partitions = partition_schedule_def.get_partition_set().partition_fn() | instance, partition_schedule_def, expected_num_partitions, expected_relative_delta | ||||
): | |||||
partitions = partition_schedule_def.get_partition_set().partition_fn(instance) | |||||
assert len(partitions) == expected_num_partitions | assert len(partitions) == expected_num_partitions | ||||
for index, partition in enumerate(partitions): | for index, partition in enumerate(partitions): | ||||
assert partition.value == partitions[0].value + (index * expected_relative_delta) | assert partition.value == partitions[0].value + (index * expected_relative_delta) | ||||
@freeze_time("2019-02-27 00:01:01") | @freeze_time("2019-02-27 00:01:01") | ||||
def test_partitions_for_hourly_schedule_decorators(): | def test_partitions_for_hourly_schedule_decorators(): | ||||
with instance_for_test() as instance: | with instance_for_test() as instance: | ||||
context_without_time = ScheduleExecutionContext(instance, None) | context_without_time = ScheduleExecutionContext(instance, None) | ||||
@hourly_schedule( | @hourly_schedule( | ||||
pipeline_name="foo_pipeline", | pipeline_name="foo_pipeline", | ||||
start_date=datetime(year=2019, month=1, day=1, minute=1), | start_date=datetime(year=2019, month=1, day=1, minute=1), | ||||
execution_time=time(hour=0, minute=25), | execution_time=time(hour=0, minute=25), | ||||
) | ) | ||||
def hourly_foo_schedule(hourly_time): | def hourly_foo_schedule(hourly_time): | ||||
return {"hourly_time": hourly_time.isoformat()} | return {"hourly_time": hourly_time.isoformat()} | ||||
_check_partitions(hourly_foo_schedule, 24 * (31 + 26), relativedelta(hours=1)) | _check_partitions(instance, hourly_foo_schedule, 24 * (31 + 26), relativedelta(hours=1)) | ||||
assert hourly_foo_schedule.get_run_config(context_without_time) == { | assert hourly_foo_schedule.get_run_config(context_without_time) == { | ||||
"hourly_time": datetime(year=2019, month=2, day=26, hour=23, minute=1).isoformat() | "hourly_time": datetime(year=2019, month=2, day=26, hour=23, minute=1).isoformat() | ||||
} | } | ||||
assert hourly_foo_schedule.should_execute(context_without_time) | assert hourly_foo_schedule.should_execute(context_without_time) | ||||
# time that's invalid since it corresponds to a partition that hasn't happened yet | # time that's invalid since it corresponds to a partition that hasn't happened yet | ||||
# should not execute and should throw if it tries to generate run config | # should not execute and should throw if it tries to generate run config | ||||
Show All 31 Lines | with instance_for_test() as instance: | ||||
@daily_schedule( | @daily_schedule( | ||||
pipeline_name="foo_pipeline", | pipeline_name="foo_pipeline", | ||||
start_date=datetime(year=2019, month=1, day=1), | start_date=datetime(year=2019, month=1, day=1), | ||||
execution_time=time(hour=9, minute=30), | execution_time=time(hour=9, minute=30), | ||||
) | ) | ||||
def daily_foo_schedule(daily_time): | def daily_foo_schedule(daily_time): | ||||
return {"daily_time": daily_time.isoformat()} | return {"daily_time": daily_time.isoformat()} | ||||
_check_partitions(daily_foo_schedule, (31 + 26), relativedelta(days=1)) | _check_partitions(instance, daily_foo_schedule, (31 + 26), relativedelta(days=1)) | ||||
valid_daily_time = datetime(year=2019, month=1, day=27, hour=9, minute=30) | valid_daily_time = datetime(year=2019, month=1, day=27, hour=9, minute=30) | ||||
context_with_valid_time = ScheduleExecutionContext(instance, valid_daily_time) | context_with_valid_time = ScheduleExecutionContext(instance, valid_daily_time) | ||||
assert daily_foo_schedule.get_run_config(context_with_valid_time) == { | assert daily_foo_schedule.get_run_config(context_with_valid_time) == { | ||||
"daily_time": datetime(year=2019, month=1, day=26).isoformat() | "daily_time": datetime(year=2019, month=1, day=26).isoformat() | ||||
} | } | ||||
assert daily_foo_schedule.should_execute(context_with_valid_time) | assert daily_foo_schedule.should_execute(context_with_valid_time) | ||||
Show All 27 Lines | with instance_for_test() as instance: | ||||
assert weekly_foo_schedule.should_execute(context_with_valid_time) | assert weekly_foo_schedule.should_execute(context_with_valid_time) | ||||
assert weekly_foo_schedule.get_run_config(context_without_time) == { | assert weekly_foo_schedule.get_run_config(context_without_time) == { | ||||
"weekly_time": datetime(year=2019, month=2, day=19).isoformat() | "weekly_time": datetime(year=2019, month=2, day=19).isoformat() | ||||
} | } | ||||
assert weekly_foo_schedule.should_execute(context_without_time) | assert weekly_foo_schedule.should_execute(context_without_time) | ||||
_check_partitions(weekly_foo_schedule, 8, relativedelta(weeks=1)) | _check_partitions(instance, weekly_foo_schedule, 8, relativedelta(weeks=1)) | ||||
@freeze_time("2019-02-27 00:01:01") | @freeze_time("2019-02-27 00:01:01") | ||||
def test_partitions_for_monthly_schedule_decorators(): | def test_partitions_for_monthly_schedule_decorators(): | ||||
with instance_for_test() as instance: | with instance_for_test() as instance: | ||||
context_without_time = ScheduleExecutionContext(instance, None) | context_without_time = ScheduleExecutionContext(instance, None) | ||||
@monthly_schedule( | @monthly_schedule( | ||||
Show All 13 Lines | with instance_for_test() as instance: | ||||
) == {"monthly_time": datetime(year=2019, month=1, day=1).isoformat()} | ) == {"monthly_time": datetime(year=2019, month=1, day=1).isoformat()} | ||||
assert monthly_foo_schedule.should_execute(context_with_valid_time) | assert monthly_foo_schedule.should_execute(context_with_valid_time) | ||||
assert monthly_foo_schedule.get_run_config(context_without_time) == { | assert monthly_foo_schedule.get_run_config(context_without_time) == { | ||||
"monthly_time": datetime(year=2019, month=1, day=1).isoformat() | "monthly_time": datetime(year=2019, month=1, day=1).isoformat() | ||||
} | } | ||||
assert monthly_foo_schedule.should_execute(context_without_time) | assert monthly_foo_schedule.should_execute(context_without_time) | ||||
_check_partitions(monthly_foo_schedule, 1, relativedelta(months=1)) | _check_partitions(instance, monthly_foo_schedule, 1, relativedelta(months=1)) | ||||
def test_schedule_decorators_bad(): | def test_schedule_decorators_bad(): | ||||
@solid | @solid | ||||
def do_nothing(_): | def do_nothing(_): | ||||
pass | pass | ||||
@pipeline | @pipeline | ||||
▲ Show 20 Lines • Show All 166 Lines • Show Last 20 Lines |