Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-test/dagster_test/toys/schedules.py
Show All 23 Lines | def backfilling_partition_selector( | ||||
context: ScheduleExecutionContext, partition_set_def: PartitionSetDefinition, retry_failed=False | context: ScheduleExecutionContext, partition_set_def: PartitionSetDefinition, retry_failed=False | ||||
): | ): | ||||
status_filters = [PipelineRunStatus.SUCCESS] if retry_failed else None | status_filters = [PipelineRunStatus.SUCCESS] if retry_failed else None | ||||
runs_by_partition = _fetch_runs_by_partition( | runs_by_partition = _fetch_runs_by_partition( | ||||
context.instance, partition_set_def, status_filters | context.instance, partition_set_def, status_filters | ||||
) | ) | ||||
selected = None | selected = None | ||||
for partition in partition_set_def.get_partitions(): | for partition in partition_set_def.get_partitions(context.instance): | ||||
runs = runs_by_partition[partition.name] | runs = runs_by_partition[partition.name] | ||||
selected = partition | selected = partition | ||||
# break when we find the first empty partition | # break when we find the first empty partition | ||||
if len(runs) == 0: | if len(runs) == 0: | ||||
break | break | ||||
Show All 9 Lines | runs_by_partition = _fetch_runs_by_partition( | ||||
context.instance, partition_set_def, status_filters | context.instance, partition_set_def, status_filters | ||||
) | ) | ||||
for runs in runs_by_partition.values(): | for runs in runs_by_partition.values(): | ||||
for run in runs: | for run in runs: | ||||
# if any active runs - don't start a new one | # if any active runs - don't start a new one | ||||
if run.status == PipelineRunStatus.STARTED: | if run.status == PipelineRunStatus.STARTED: | ||||
return False # would be nice to return a reason here | return False # would be nice to return a reason here | ||||
available_partitions = set([partition.name for partition in partition_set_def.get_partitions()]) | available_partitions = set( | ||||
[partition.name for partition in partition_set_def.get_partitions(context.instance)] | |||||
) | |||||
satisfied_partitions = set(runs_by_partition.keys()) | satisfied_partitions = set(runs_by_partition.keys()) | ||||
is_remaining_partitions = bool(available_partitions.difference(satisfied_partitions)) | is_remaining_partitions = bool(available_partitions.difference(satisfied_partitions)) | ||||
return is_remaining_partitions | return is_remaining_partitions | ||||
def backfill_test_schedule(): | def backfill_test_schedule(): | ||||
schedule_name = "backfill_unreliable_weekly" | schedule_name = "backfill_unreliable_weekly" | ||||
# create weekly partition set | # create weekly partition set | ||||
▲ Show 20 Lines • Show All 115 Lines • Show Last 20 Lines |