Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/partition.py
from collections import namedtuple | from collections import namedtuple | ||||
from datetime import timedelta | from datetime import timedelta | ||||
import pendulum | |||||
from dateutil.relativedelta import relativedelta | from dateutil.relativedelta import relativedelta | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions.schedule import ScheduleDefinition, ScheduleExecutionContext | from dagster.core.definitions.schedule import ScheduleDefinition, ScheduleExecutionContext | ||||
from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError | from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError | ||||
from dagster.core.instance import DagsterInstance | |||||
from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter | from dagster.core.storage.pipeline_run import PipelineRun, PipelineRunStatus, PipelineRunsFilter | ||||
from dagster.core.storage.tags import check_tags | from dagster.core.storage.tags import check_tags | ||||
from dagster.seven import get_args | |||||
from dagster.utils import merge_dicts | from dagster.utils import merge_dicts | ||||
from dagster.utils.partitions import DEFAULT_DATE_FORMAT | from dagster.utils.partitions import DEFAULT_DATE_FORMAT | ||||
from .mode import DEFAULT_MODE_NAME | from .mode import DEFAULT_MODE_NAME | ||||
from .utils import check_for_invalid_name_and_warn | from .utils import check_for_invalid_name_and_warn | ||||
def by_name(partition): | def by_name(partition): | ||||
Show All 10 Lines | class Partition(namedtuple("_Partition", ("value name"))): | ||||
""" | """ | ||||
def __new__(cls, value=None, name=None): | def __new__(cls, value=None, name=None): | ||||
return super(Partition, cls).__new__( | return super(Partition, cls).__new__( | ||||
cls, name=check.opt_str_param(name, "name", str(value)), value=value | cls, name=check.opt_str_param(name, "name", str(value)), value=value | ||||
) | ) | ||||
def create_default_partition_selector_fn( | def create_default_partition_selector_fn(delta=timedelta(days=1), fmt=DEFAULT_DATE_FORMAT): | ||||
delta=timedelta(days=1), fmt=DEFAULT_DATE_FORMAT, partition_in_utc=False | |||||
): | |||||
check.inst_param(delta, "timedelta", (timedelta, relativedelta)) | check.inst_param(delta, "timedelta", (timedelta, relativedelta)) | ||||
check.str_param(fmt, "fmt") | check.str_param(fmt, "fmt") | ||||
def default_partition_selector(context, partition_set_def): | def default_partition_selector(context, partition_set_def): | ||||
check.inst_param(context, "context", ScheduleExecutionContext) | check.inst_param(context, "context", ScheduleExecutionContext) | ||||
check.inst_param(partition_set_def, "partition_set_def", PartitionSetDefinition) | check.inst_param(partition_set_def, "partition_set_def", PartitionSetDefinition) | ||||
if not context.scheduled_execution_time: | if not context.scheduled_execution_time: | ||||
return last_partition(context, partition_set_def) | return last_partition(context, partition_set_def) | ||||
# The tick at a given datetime corresponds to the time for the previous partition | # The tick at a given datetime corresponds to the time for the previous partition | ||||
# e.g. midnight on 12/31 is actually the 12/30 partition | # e.g. midnight on 12/31 is actually the 12/30 partition | ||||
if partition_in_utc: | |||||
partition_time = ( | |||||
pendulum.instance(context.scheduled_execution_time).in_tz("UTC") - delta | |||||
) | |||||
else: | |||||
partition_time = context.scheduled_execution_time - delta | partition_time = context.scheduled_execution_time - delta | ||||
partition_name = partition_time.strftime(fmt) | partition_name = partition_time.strftime(fmt) | ||||
if not partition_name in partition_set_def.get_partition_names(): | if not partition_name in partition_set_def.get_partition_names(context.instance): | ||||
return None | return None | ||||
return partition_set_def.get_partition(partition_name) | return partition_set_def.get_partition(partition_name, context.instance) | ||||
return default_partition_selector | return default_partition_selector | ||||
def last_partition(context, partition_set_def): | def last_partition(context, partition_set_def): | ||||
check.inst_param(context, "context", ScheduleExecutionContext) | check.inst_param(context, "context", ScheduleExecutionContext) | ||||
check.inst_param(partition_set_def, "partition_set_def", PartitionSetDefinition) | check.inst_param(partition_set_def, "partition_set_def", PartitionSetDefinition) | ||||
partitions = partition_set_def.get_partitions() | partitions = partition_set_def.get_partitions(context.instance) | ||||
if not partitions: | if not partitions: | ||||
return None | return None | ||||
return partitions[-1] | return partitions[-1] | ||||
def last_empty_partition(context, partition_set_def): | def last_empty_partition(context, partition_set_def): | ||||
check.inst_param(context, "context", ScheduleExecutionContext) | check.inst_param(context, "context", ScheduleExecutionContext) | ||||
partition_set_def = check.inst_param( | partition_set_def = check.inst_param( | ||||
partition_set_def, "partition_set_def", PartitionSetDefinition | partition_set_def, "partition_set_def", PartitionSetDefinition | ||||
) | ) | ||||
partitions = partition_set_def.get_partitions() | partitions = partition_set_def.get_partitions(context.instance) | ||||
if not partitions: | if not partitions: | ||||
return None | return None | ||||
selected = None | selected = None | ||||
for partition in reversed(partitions): | for partition in reversed(partitions): | ||||
filters = PipelineRunsFilter.for_partition(partition_set_def, partition) | filters = PipelineRunsFilter.for_partition(partition_set_def, partition) | ||||
matching = context.instance.get_runs(filters) | matching = context.instance.get_runs(filters) | ||||
if not any(run.status == PipelineRunStatus.SUCCESS for run in matching): | if not any(run.status == PipelineRunStatus.SUCCESS for run in matching): | ||||
selected = partition | selected = partition | ||||
break | break | ||||
return selected | return selected | ||||
def first_partition(context, partition_set_def=None): | def first_partition(context, partition_set_def=None): | ||||
check.inst_param(context, "context", ScheduleExecutionContext) | check.inst_param(context, "context", ScheduleExecutionContext) | ||||
partition_set_def = check.inst_param( | partition_set_def = check.inst_param( | ||||
partition_set_def, "partition_set_def", PartitionSetDefinition | partition_set_def, "partition_set_def", PartitionSetDefinition | ||||
) | ) | ||||
partitions = partition_set_def.get_partitions() | partitions = partition_set_def.get_partitions(context.instance) | ||||
if not partitions: | if not partitions: | ||||
return None | return None | ||||
return partitions[0] | return partitions[0] | ||||
class PartitionSetDefinition( | class PartitionSetDefinition( | ||||
namedtuple( | namedtuple( | ||||
Show All 37 Lines | ): | ||||
if isinstance(x, Partition): | if isinstance(x, Partition): | ||||
return x | return x | ||||
if isinstance(x, str): | if isinstance(x, str): | ||||
return Partition(x) | return Partition(x) | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
"Expected <Partition> | <str>, received {type}".format(type=type(x)) | "Expected <Partition> | <str>, received {type}".format(type=type(x)) | ||||
) | ) | ||||
# partition fn may take in an instance or no parameters | |||||
if len(get_args(partition_fn)) > 0: | |||||
wrapper_partition_fn = lambda instance: [ | |||||
_wrap(x) for x in check.callable_param(partition_fn, "partition_fn")(instance) | |||||
] | |||||
else: | |||||
wrapper_partition_fn = lambda instance: [ | |||||
_wrap(x) for x in check.callable_param(partition_fn, "partition_fn")() | |||||
] | |||||
return super(PartitionSetDefinition, cls).__new__( | return super(PartitionSetDefinition, cls).__new__( | ||||
cls, | cls, | ||||
name=check_for_invalid_name_and_warn(name), | name=check_for_invalid_name_and_warn(name), | ||||
pipeline_name=check.str_param(pipeline_name, "pipeline_name"), | pipeline_name=check.str_param(pipeline_name, "pipeline_name"), | ||||
partition_fn=lambda: [ | partition_fn=wrapper_partition_fn, | ||||
_wrap(x) for x in check.callable_param(partition_fn, "partition_fn")() | |||||
], | |||||
solid_selection=check.opt_nullable_list_param( | solid_selection=check.opt_nullable_list_param( | ||||
solid_selection, "solid_selection", of_type=str | solid_selection, "solid_selection", of_type=str | ||||
), | ), | ||||
mode=check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME), | mode=check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME), | ||||
user_defined_run_config_fn_for_partition=check.callable_param( | user_defined_run_config_fn_for_partition=check.callable_param( | ||||
run_config_fn_for_partition, "run_config_fn_for_partition" | run_config_fn_for_partition, "run_config_fn_for_partition" | ||||
), | ), | ||||
user_defined_tags_fn_for_partition=check.callable_param( | user_defined_tags_fn_for_partition=check.callable_param( | ||||
tags_fn_for_partition, "tags_fn_for_partition" | tags_fn_for_partition, "tags_fn_for_partition" | ||||
), | ), | ||||
) | ) | ||||
def run_config_for_partition(self, partition): | def run_config_for_partition(self, partition): | ||||
return self.user_defined_run_config_fn_for_partition(partition) | return self.user_defined_run_config_fn_for_partition(partition) | ||||
def tags_for_partition(self, partition): | def tags_for_partition(self, partition): | ||||
user_tags = self.user_defined_tags_fn_for_partition(partition) | user_tags = self.user_defined_tags_fn_for_partition(partition) | ||||
check_tags(user_tags, "user_tags") | check_tags(user_tags, "user_tags") | ||||
tags = merge_dicts(user_tags, PipelineRun.tags_for_partition_set(self, partition)) | tags = merge_dicts(user_tags, PipelineRun.tags_for_partition_set(self, partition)) | ||||
return tags | return tags | ||||
def get_partitions(self): | def get_partitions(self, instance): | ||||
return self.partition_fn() | check.inst_param(instance, "instance", DagsterInstance) | ||||
return self.partition_fn(instance) | |||||
def get_partition(self, name): | def get_partition(self, name, instance): | ||||
for partition in self.get_partitions(): | for partition in self.get_partitions(instance): | ||||
if partition.name == name: | if partition.name == name: | ||||
return partition | return partition | ||||
check.failed("Partition name {} not found!".format(name)) | check.failed("Partition name {} not found!".format(name)) | ||||
def get_partition_names(self): | def get_partition_names(self, instance): | ||||
return [part.name for part in self.get_partitions()] | return [part.name for part in self.get_partitions(instance)] | ||||
def create_schedule_definition( | def create_schedule_definition( | ||||
self, | self, | ||||
schedule_name, | schedule_name, | ||||
cron_schedule, | cron_schedule, | ||||
should_execute=None, | should_execute=None, | ||||
partition_selector=last_partition, | partition_selector=last_partition, | ||||
environment_vars=None, | environment_vars=None, | ||||
Show All 23 Lines | ): | ||||
check.opt_dict_param(environment_vars, "environment_vars", key_type=str, value_type=str) | check.opt_dict_param(environment_vars, "environment_vars", key_type=str, value_type=str) | ||||
check.callable_param(partition_selector, "partition_selector") | check.callable_param(partition_selector, "partition_selector") | ||||
check.opt_str_param(execution_timezone, "execution_timezone") | check.opt_str_param(execution_timezone, "execution_timezone") | ||||
def _should_execute_wrapper(context): | def _should_execute_wrapper(context): | ||||
check.inst_param(context, "context", ScheduleExecutionContext) | check.inst_param(context, "context", ScheduleExecutionContext) | ||||
selected_partition = partition_selector(context, self) | selected_partition = partition_selector(context, self) | ||||
if not selected_partition or not selected_partition.name in self.get_partition_names(): | if not selected_partition or not selected_partition.name in self.get_partition_names( | ||||
context.instance | |||||
): | |||||
return False | return False | ||||
elif not should_execute: | elif not should_execute: | ||||
return True | return True | ||||
else: | else: | ||||
return should_execute(context) | return should_execute(context) | ||||
def _run_config_fn_wrapper(context): | def _run_config_fn_wrapper(context): | ||||
check.inst_param(context, "context", ScheduleExecutionContext) | check.inst_param(context, "context", ScheduleExecutionContext) | ||||
selected_partition = partition_selector(context, self) | selected_partition = partition_selector(context, self) | ||||
if not selected_partition or not selected_partition.name in self.get_partition_names(): | if not selected_partition or not selected_partition.name in self.get_partition_names( | ||||
context.instance | |||||
): | |||||
raise DagsterInvariantViolationError( | raise DagsterInvariantViolationError( | ||||
"The partition selection function `{selector}` did not return " | "The partition selection function `{selector}` did not return " | ||||
"a partition from PartitionSet {partition_set}".format( | "a partition from PartitionSet {partition_set}".format( | ||||
selector=getattr(partition_selector, "__name__", repr(partition_selector)), | selector=getattr(partition_selector, "__name__", repr(partition_selector)), | ||||
partition_set=self.name, | partition_set=self.name, | ||||
) | ) | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 66 Lines • Show Last 20 Lines |