Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/partition.py
import inspect | import inspect | ||||
from abc import ABC, abstractmethod | from abc import ABC, abstractmethod | ||||
from collections import namedtuple | from collections import namedtuple | ||||
from datetime import datetime, time | from datetime import datetime, time | ||||
from enum import Enum | from enum import Enum | ||||
from typing import Callable, List, NamedTuple, Optional, cast | from typing import Any, Callable, Dict, Generic, List, NamedTuple, Optional, TypeVar, cast | ||||
import pendulum | import pendulum | ||||
from dagster import check | from dagster import check | ||||
from ...seven.compat.pendulum import PendulumDateTime, to_timezone | from ...seven.compat.pendulum import PendulumDateTime, to_timezone | ||||
from ...utils import frozenlist, merge_dicts | from ...utils import frozenlist, merge_dicts | ||||
from ...utils.schedules import schedule_execution_time_iterator | from ...utils.schedules import schedule_execution_time_iterator | ||||
from ..decorator_utils import get_function_params | from ..decorator_utils import get_function_params | ||||
from ..errors import ( | from ..errors import ( | ||||
DagsterInvalidDefinitionError, | DagsterInvalidDefinitionError, | ||||
DagsterInvalidInvocationError, | DagsterInvalidInvocationError, | ||||
DagsterInvariantViolationError, | DagsterInvariantViolationError, | ||||
ScheduleExecutionError, | ScheduleExecutionError, | ||||
user_code_error_boundary, | user_code_error_boundary, | ||||
) | ) | ||||
from ..storage.pipeline_run import PipelineRun | from ..storage.pipeline_run import PipelineRun | ||||
from ..storage.tags import check_tags | from ..storage.tags import check_tags | ||||
from .mode import DEFAULT_MODE_NAME | from .mode import DEFAULT_MODE_NAME | ||||
from .run_request import RunRequest, SkipReason | from .run_request import RunRequest, SkipReason | ||||
from .schedule import ScheduleDefinition, ScheduleEvaluationContext | from .schedule import ScheduleDefinition, ScheduleEvaluationContext | ||||
from .utils import check_valid_name | from .utils import check_valid_name | ||||
DEFAULT_DATE_FORMAT = "%Y-%m-%d" | DEFAULT_DATE_FORMAT = "%Y-%m-%d" | ||||
T = TypeVar("T") | |||||
class Partition(namedtuple("_Partition", ("value name"))): | |||||
class Partition(Generic[T]): | |||||
""" | """ | ||||
Partition is the representation of a logical slice across an axis of a pipeline's work | Partition is the representation of a logical slice across an axis of a pipeline's work | ||||
Args: | Args: | ||||
value (Any): The object for this partition | value (Any): The object for this partition | ||||
name (str): Name for this partition | name (str): Name for this partition | ||||
""" | """ | ||||
def __new__(cls, value=None, name=None): | def __init__(self, value: T, name: Optional[str] = None): | ||||
return super(Partition, cls).__new__( | self._value = value | ||||
cls, name=check.opt_str_param(name, "name", str(value)), value=value | self._name = cast(str, check.opt_str_param(name, "name", str(value))) | ||||
) | |||||
@property | |||||
def value(self) -> T: | |||||
return self._value | |||||
@property | |||||
def name(self) -> str: | |||||
return self._name | |||||
def schedule_partition_range( | def schedule_partition_range( | ||||
start, | start: datetime, | ||||
end, | end: Optional[datetime], | ||||
cron_schedule, | cron_schedule: str, | ||||
fmt, | fmt: str, | ||||
timezone, | timezone: Optional[str], | ||||
execution_time_to_partition_fn, | execution_time_to_partition_fn: Callable, | ||||
): | current_time: Optional[datetime], | ||||
check.inst_param(start, "start", datetime) | ) -> List[Partition[datetime]]: | ||||
check.opt_inst_param(end, "end", datetime) | |||||
check.str_param(cron_schedule, "cron_schedule") | |||||
check.str_param(fmt, "fmt") | |||||
check.opt_str_param(timezone, "timezone") | |||||
check.callable_param(execution_time_to_partition_fn, "execution_time_to_partition_fn") | |||||
if end and start > end: | if end and start > end: | ||||
raise DagsterInvariantViolationError( | raise DagsterInvariantViolationError( | ||||
'Selected date range start "{start}" is after date range end "{end}'.format( | 'Selected date range start "{start}" is after date range end "{end}'.format( | ||||
start=start.strftime(fmt), | start=start.strftime(fmt), | ||||
end=end.strftime(fmt), | end=end.strftime(fmt), | ||||
) | ) | ||||
) | ) | ||||
def _get_schedule_range_partitions(current_time=None): | |||||
check.opt_inst_param(current_time, "current_time", datetime) | |||||
tz = timezone if timezone else "UTC" | tz = timezone if timezone else "UTC" | ||||
_current_time = current_time if current_time else pendulum.now(tz) | _current_time = current_time if current_time else pendulum.now(tz) | ||||
# Coerce to the definition timezone | # Coerce to the definition timezone | ||||
_start = ( | _start = ( | ||||
to_timezone(start, tz) | to_timezone(start, tz) | ||||
if isinstance(start, PendulumDateTime) | if isinstance(start, PendulumDateTime) | ||||
else pendulum.instance(start, tz=tz) | else pendulum.instance(start, tz=tz) | ||||
) | ) | ||||
_current_time = ( | _current_time = ( | ||||
to_timezone(_current_time, tz) | to_timezone(_current_time, tz) | ||||
if isinstance(_current_time, PendulumDateTime) | if isinstance(_current_time, PendulumDateTime) | ||||
else pendulum.instance(_current_time, tz=tz) | else pendulum.instance(_current_time, tz=tz) | ||||
) | ) | ||||
# The end partition time should be before the last partition that | # The end partition time should be before the last partition that | ||||
# executes before the current time | # executes before the current time | ||||
end_partition_time = execution_time_to_partition_fn(_current_time) | end_partition_time = execution_time_to_partition_fn(_current_time) | ||||
# The partition set has an explicit end time that represents the end of the partition range | # The partition set has an explicit end time that represents the end of the partition range | ||||
if end: | if end: | ||||
_end = ( | _end = ( | ||||
to_timezone(end, tz) | to_timezone(end, tz) | ||||
if isinstance(end, PendulumDateTime) | if isinstance(end, PendulumDateTime) | ||||
else pendulum.instance(end, tz=tz) | else pendulum.instance(end, tz=tz) | ||||
) | ) | ||||
# If the explicit end time is before the last partition time, | # If the explicit end time is before the last partition time, | ||||
# update the end partition time | # update the end partition time | ||||
end_partition_time = min(_end, end_partition_time) | end_partition_time = min(_end, end_partition_time) | ||||
end_timestamp = end_partition_time.timestamp() | end_timestamp = end_partition_time.timestamp() | ||||
partitions = [] | partitions: List[Partition[datetime]] = [] | ||||
for next_time in schedule_execution_time_iterator(_start.timestamp(), cron_schedule, tz): | for next_time in schedule_execution_time_iterator(_start.timestamp(), cron_schedule, tz): | ||||
partition_time = execution_time_to_partition_fn(next_time) | partition_time = execution_time_to_partition_fn(next_time) | ||||
if partition_time.timestamp() > end_timestamp: | if partition_time.timestamp() > end_timestamp: | ||||
break | break | ||||
if partition_time.timestamp() < _start.timestamp(): | if partition_time.timestamp() < _start.timestamp(): | ||||
continue | continue | ||||
partitions.append(Partition(value=partition_time, name=partition_time.strftime(fmt))) | partitions.append(Partition(value=partition_time, name=partition_time.strftime(fmt))) | ||||
return partitions | return partitions | ||||
return _get_schedule_range_partitions | |||||
class ScheduleType(Enum): | class ScheduleType(Enum): | ||||
HOURLY = "HOURLY" | HOURLY = "HOURLY" | ||||
DAILY = "DAILY" | DAILY = "DAILY" | ||||
WEEKLY = "WEEKLY" | WEEKLY = "WEEKLY" | ||||
MONTHLY = "MONTHLY" | MONTHLY = "MONTHLY" | ||||
class PartitionParams(ABC): | class Partitions(ABC, Generic[T]): | ||||
dgibson: PartitionsFactory? :)
Calling `get_partitions()` on `Partitions` feels weird.
@abstractmethod | @abstractmethod | ||||
def get_partitions(self, current_time: Optional[datetime] = None) -> List[Partition]: | def get_partitions(self, current_time: Optional[datetime] = None) -> List[Partition[T]]: | ||||
... | ... | ||||
class StaticPartitionParams( | class StaticPartitions(Partitions[T]): # pylint: disable=unsubscriptable-object | ||||
PartitionParams, NamedTuple("_StaticPartitionParams", [("partitions", List[Partition])]) | def __init__(self, partitions: List[Partition[T]]): | ||||
): | self._partitions = partitions | ||||
Not Done · Inline Actions · rexledesma: It would probably be nice to have a check here.
def __new__(cls, partitions: List[Partition]): | |||||
return super(StaticPartitionParams, cls).__new__( | def get_partitions( | ||||
cls, check.list_param(partitions, "partitions", of_type=Partition) | self, current_time: Optional[datetime] = None # pylint: disable=unused-argument | ||||
) | ) -> List[Partition[T]]: | ||||
return self._partitions | |||||
def get_partitions(self, current_time: Optional[datetime] = None) -> List[Partition]: | |||||
return self.partitions | |||||
class TimeBasedPartitions( | |||||
class TimeBasedPartitionParams( | Partitions[datetime], # pylint: disable=unsubscriptable-object | ||||
PartitionParams, | |||||
NamedTuple( | NamedTuple( | ||||
"_TimeBasedPartitionParams", | "_TimeBasedPartitions", | ||||
[ | [ | ||||
("schedule_type", ScheduleType), | ("schedule_type", ScheduleType), | ||||
("start", datetime), | ("start", datetime), | ||||
("execution_time", time), | ("execution_time", time), | ||||
("execution_day", Optional[int]), | ("execution_day", Optional[int]), | ||||
("end", Optional[datetime]), | ("end", Optional[datetime]), | ||||
("fmt", Optional[str]), | ("fmt", str), | ||||
("timezone", Optional[str]), | ("timezone", Optional[str]), | ||||
("offset", Optional[int]), | ("offset", Optional[int]), | ||||
], | ], | ||||
), | ), | ||||
): | ): | ||||
def __new__( | def __new__( | ||||
cls, | cls, | ||||
schedule_type: ScheduleType, | schedule_type: ScheduleType, | ||||
Show All 29 Lines | ): | ||||
elif schedule_type is ScheduleType.MONTHLY: | elif schedule_type is ScheduleType.MONTHLY: | ||||
execution_day = execution_day if execution_day is not None else 1 | execution_day = execution_day if execution_day is not None else 1 | ||||
check.invariant( | check.invariant( | ||||
execution_day is not None and 1 <= execution_day <= 31, | execution_day is not None and 1 <= execution_day <= 31, | ||||
f'Execution day "{execution_day}" must be between 1 and 31 for ' | f'Execution day "{execution_day}" must be between 1 and 31 for ' | ||||
f'schedule type "{schedule_type}"', | f'schedule type "{schedule_type}"', | ||||
) | ) | ||||
return super(TimeBasedPartitionParams, cls).__new__( | return super(TimeBasedPartitions, cls).__new__( | ||||
cls, | cls, | ||||
check.inst_param(schedule_type, "schedule_type", ScheduleType), | check.inst_param(schedule_type, "schedule_type", ScheduleType), | ||||
check.inst_param(start, "start", datetime), | check.inst_param(start, "start", datetime), | ||||
check.opt_inst_param(execution_time, "execution_time", time, time(0, 0)), | check.opt_inst_param(execution_time, "execution_time", time, time(0, 0)), | ||||
check.opt_int_param( | check.opt_int_param( | ||||
execution_day, | execution_day, | ||||
"execution_day", | "execution_day", | ||||
), | ), | ||||
check.opt_inst_param(end, "end", datetime), | check.opt_inst_param(end, "end", datetime), | ||||
check.opt_str_param(fmt, "fmt", default=DEFAULT_DATE_FORMAT), | cast(str, check.opt_str_param(fmt, "fmt", default=DEFAULT_DATE_FORMAT)), | ||||
check.opt_str_param(timezone, "timezone", default="UTC"), | check.opt_str_param(timezone, "timezone", default="UTC"), | ||||
check.opt_int_param(offset, "offset", default=1), | check.opt_int_param(offset, "offset", default=1), | ||||
) | ) | ||||
def get_partitions(self, current_time: Optional[datetime] = None) -> List[Partition]: | def get_partitions(self, current_time: Optional[datetime] = None) -> List[Partition[datetime]]: | ||||
check.opt_inst_param(current_time, "current_time", datetime) | check.opt_inst_param(current_time, "current_time", datetime) | ||||
partition_fn = schedule_partition_range( | return schedule_partition_range( | ||||
start=self.start, | start=self.start, | ||||
end=self.end, | end=self.end, | ||||
cron_schedule=self.get_cron_schedule(), | cron_schedule=self.get_cron_schedule(), | ||||
fmt=self.fmt, | fmt=self.fmt, | ||||
timezone=self.timezone, | timezone=self.timezone, | ||||
execution_time_to_partition_fn=self.get_execution_time_to_partition_fn(), | execution_time_to_partition_fn=self.get_execution_time_to_partition_fn(), | ||||
current_time=current_time, | |||||
) | ) | ||||
return partition_fn(current_time=current_time) | |||||
def get_cron_schedule(self) -> str: | def get_cron_schedule(self) -> str: | ||||
minute = self.execution_time.minute | minute = self.execution_time.minute | ||||
hour = self.execution_time.hour | hour = self.execution_time.hour | ||||
day = self.execution_day | day = self.execution_day | ||||
if self.schedule_type is ScheduleType.HOURLY: | if self.schedule_type is ScheduleType.HOURLY: | ||||
return f"{minute} * * * *" | return f"{minute} * * * *" | ||||
elif self.schedule_type is ScheduleType.DAILY: | elif self.schedule_type is ScheduleType.DAILY: | ||||
Show All 22 Lines | def get_execution_time_to_partition_fn(self) -> Callable[[datetime], datetime]: | ||||
execution_day = cast(int, self.execution_day) | execution_day = cast(int, self.execution_day) | ||||
return lambda d: pendulum.instance(d).subtract( | return lambda d: pendulum.instance(d).subtract( | ||||
months=self.offset, days=execution_day - 1, hours=d.hour, minutes=d.minute | months=self.offset, days=execution_day - 1, hours=d.hour, minutes=d.minute | ||||
) | ) | ||||
else: | else: | ||||
check.assert_never(self.schedule_type) | check.assert_never(self.schedule_type) | ||||
class DynamicPartitionParams( | class DynamicPartitions( | ||||
PartitionParams, | Partitions, | ||||
NamedTuple( | NamedTuple( | ||||
"_DynamicPartitionParams", | "_DynamicPartitions", | ||||
[("partition_fn", Callable[[Optional[datetime]], List[Partition]])], | [("partition_fn", Callable[[Optional[datetime]], List[Partition]])], | ||||
), | ), | ||||
): | ): | ||||
def __new__(cls, partition_fn: Callable[[Optional[datetime]], List[Partition]]): | def __new__(cls, partition_fn: Callable[[Optional[datetime]], List[Partition]]): | ||||
return super(DynamicPartitionParams, cls).__new__( | return super(DynamicPartitions, cls).__new__( | ||||
cls, check.callable_param(partition_fn, "partition_fn") | cls, check.callable_param(partition_fn, "partition_fn") | ||||
) | ) | ||||
def get_partitions(self, current_time: Optional[datetime] = None) -> List[Partition]: | def get_partitions(self, current_time: Optional[datetime] = None) -> List[Partition]: | ||||
return self.partition_fn(current_time) | return self.partition_fn(current_time) | ||||
class PartitionSetDefinition( | class PartitionSetDefinition( | ||||
namedtuple( | namedtuple( | ||||
"_PartitionSetDefinition", | "_PartitionSetDefinition", | ||||
( | ( | ||||
"name pipeline_name partition_fn solid_selection mode " | "name pipeline_name partition_fn solid_selection mode " | ||||
"user_defined_run_config_fn_for_partition user_defined_tags_fn_for_partition " | "user_defined_run_config_fn_for_partition user_defined_tags_fn_for_partition " | ||||
"partition_params" | "partitions" | ||||
), | ), | ||||
) | ) | ||||
): | ): | ||||
""" | """ | ||||
Defines a partition set, representing the set of slices making up an axis of a pipeline | Defines a partition set, representing the set of slices making up an axis of a pipeline | ||||
Args: | Args: | ||||
name (str): Name for this partition set | name (str): Name for this partition set | ||||
pipeline_name (str): The name of the pipeline definition | pipeline_name (str): The name of the pipeline definition | ||||
partition_fn (Optional[Callable[void, List[Partition]]]): User-provided function to define | partition_fn (Optional[Callable[void, List[Partition]]]): User-provided function to define | ||||
the set of valid partition objects. | the set of valid partition objects. | ||||
solid_selection (Optional[List[str]]): A list of solid subselection (including single | solid_selection (Optional[List[str]]): A list of solid subselection (including single | ||||
solid names) to execute with this partition. e.g. ``['*some_solid+', 'other_solid']`` | solid names) to execute with this partition. e.g. ``['*some_solid+', 'other_solid']`` | ||||
mode (Optional[str]): The mode to apply when executing this partition. (default: 'default') | mode (Optional[str]): The mode to apply when executing this partition. (default: 'default') | ||||
run_config_fn_for_partition (Callable[[Partition], [Any]]): A | run_config_fn_for_partition (Callable[[Partition], Any]): A | ||||
function that takes a :py:class:`~dagster.Partition` and returns the run | function that takes a :py:class:`~dagster.Partition` and returns the run | ||||
configuration that parameterizes the execution for this partition. | configuration that parameterizes the execution for this partition. | ||||
tags_fn_for_partition (Callable[[Partition], Optional[dict[str, str]]]): A function that | tags_fn_for_partition (Callable[[Partition], Optional[dict[str, str]]]): A function that | ||||
takes a :py:class:`~dagster.Partition` and returns a list of key value pairs that will | takes a :py:class:`~dagster.Partition` and returns a list of key value pairs that will | ||||
be added to the generated run for this partition. | be added to the generated run for this partition. | ||||
partition_params (Optional[PartitionParams]): A set of parameters used to construct the set | partitions (Optional[Partitions]): A set of parameters used to construct the set | ||||
Not Done · Inline Actions · dgibson: We could probably do a better job explaining here the invariant below: that one of `partition_fn` or `partitions` must be supplied.
of valid partition objects. | of valid partition objects. | ||||
""" | """ | ||||
def __new__( | def __new__( | ||||
cls, | cls, | ||||
name, | name: str, | ||||
pipeline_name, | pipeline_name: str, | ||||
partition_fn=None, | partition_fn: Optional[Callable[[], List[Partition]]] = None, | ||||
solid_selection=None, | solid_selection: Optional[List[str]] = None, | ||||
mode=None, | mode: Optional[str] = None, | ||||
run_config_fn_for_partition=lambda _partition: {}, | run_config_fn_for_partition: Callable[[Partition], Any] = lambda _partition: {}, | ||||
tags_fn_for_partition=lambda _partition: {}, | tags_fn_for_partition: Callable[ | ||||
partition_params=None, | [Partition], Optional[Dict[str, str]] | ||||
] = lambda _partition: {}, | |||||
partitions: Optional[Partitions] = None, | |||||
): | ): | ||||
check.invariant( | check.invariant( | ||||
partition_fn is not None or partition_params is not None, | partition_fn is not None or partitions is not None, | ||||
"One of `partition_fn` or `partition_params` must be supplied.", | "One of `partition_fn` or `partitions` must be supplied.", | ||||
) | ) | ||||
check.invariant( | check.invariant( | ||||
not (partition_fn and partition_params), | not (partition_fn and partitions), | ||||
"Only one of `partition_fn` or `partition_params` must be supplied.", | "Only one of `partition_fn` or `partitions` must be supplied.", | ||||
) | ) | ||||
_wrap_partition_fn = None | _wrap_partition_fn = None | ||||
if partition_fn is not None: | if partition_fn is not None: | ||||
partition_fn_param_count = len(inspect.signature(partition_fn).parameters) | partition_fn_param_count = len(inspect.signature(partition_fn).parameters) | ||||
def _wrap_partition(x): | def _wrap_partition(x): | ||||
Show All 28 Lines | ): | ||||
), | ), | ||||
mode=check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME), | mode=check.opt_str_param(mode, "mode", DEFAULT_MODE_NAME), | ||||
user_defined_run_config_fn_for_partition=check.callable_param( | user_defined_run_config_fn_for_partition=check.callable_param( | ||||
run_config_fn_for_partition, "run_config_fn_for_partition" | run_config_fn_for_partition, "run_config_fn_for_partition" | ||||
), | ), | ||||
user_defined_tags_fn_for_partition=check.callable_param( | user_defined_tags_fn_for_partition=check.callable_param( | ||||
tags_fn_for_partition, "tags_fn_for_partition" | tags_fn_for_partition, "tags_fn_for_partition" | ||||
), | ), | ||||
partition_params=check.opt_inst_param( | partitions=check.opt_inst_param( | ||||
partition_params, | partitions, | ||||
"partition_params", | "partitions", | ||||
PartitionParams, | Partitions, | ||||
default=DynamicPartitionParams(partition_fn=_wrap_partition_fn) | default=DynamicPartitions(partition_fn=_wrap_partition_fn) | ||||
if partition_fn is not None | if partition_fn is not None | ||||
else None, | else None, | ||||
), | ), | ||||
) | ) | ||||
def run_config_for_partition(self, partition): | def run_config_for_partition(self, partition: Partition) -> Dict[str, Any]: | ||||
return self.user_defined_run_config_fn_for_partition(partition) | return self.user_defined_run_config_fn_for_partition(partition) | ||||
def tags_for_partition(self, partition): | def tags_for_partition(self, partition: Partition) -> Dict[str, str]: | ||||
user_tags = self.user_defined_tags_fn_for_partition(partition) | user_tags = self.user_defined_tags_fn_for_partition(partition) | ||||
check_tags(user_tags, "user_tags") | check_tags(user_tags, "user_tags") | ||||
tags = merge_dicts(user_tags, PipelineRun.tags_for_partition_set(self, partition)) | tags = merge_dicts(user_tags, PipelineRun.tags_for_partition_set(self, partition)) | ||||
return tags | return tags | ||||
def get_partitions(self, current_time=None): | def get_partitions(self, current_time: Optional[datetime] = None) -> List[Partition]: | ||||
"""Return the set of known partitions. | """Return the set of known partitions. | ||||
Arguments: | Arguments: | ||||
current_time (Optional[datetime]): The evaluation time for the partition function, which | current_time (Optional[datetime]): The evaluation time for the partition function, which | ||||
is passed through to the ``partition_fn`` (if it accepts a parameter). Defaults to | is passed through to the ``partition_fn`` (if it accepts a parameter). Defaults to | ||||
the current time in UTC. | the current time in UTC. | ||||
""" | """ | ||||
return self.partition_params.get_partitions(current_time) | return self.partitions.get_partitions(current_time) | ||||
def get_partition(self, name): | def get_partition(self, name: str) -> Partition: | ||||
for partition in self.get_partitions(): | for partition in self.get_partitions(): | ||||
if partition.name == name: | if partition.name == name: | ||||
return partition | return partition | ||||
check.failed("Partition name {} not found!".format(name)) | check.failed("Partition name {} not found!".format(name)) | ||||
def get_partition_names(self, current_time=None): | def get_partition_names(self, current_time: Optional[datetime] = None) -> List[str]: | ||||
return [part.name for part in self.get_partitions(current_time)] | return [part.name for part in self.get_partitions(current_time)] | ||||
def create_schedule_definition( | def create_schedule_definition( | ||||
self, | self, | ||||
schedule_name, | schedule_name, | ||||
cron_schedule, | cron_schedule, | ||||
partition_selector, | partition_selector, | ||||
should_execute=None, | should_execute=None, | ||||
environment_vars=None, | environment_vars=None, | ||||
execution_timezone=None, | execution_timezone=None, | ||||
description=None, | description=None, | ||||
decorated_fn=None, | decorated_fn=None, | ||||
job=None, | job=None, | ||||
): | ): | ||||
"""Create a ScheduleDefinition from a PartitionSetDefinition. | """Create a ScheduleDefinition from a PartitionSetDefinition. | ||||
Arguments: | Arguments: | ||||
schedule_name (str): The name of the schedule. | schedule_name (str): The name of the schedule. | ||||
cron_schedule (str): A valid cron string for the schedule | cron_schedule (str): A valid cron string for the schedule | ||||
partition_selector (Callable[ScheduleEvaluationContext, PartitionSetDefinition], Union[Partition, List[Partition]]): | partition_selector (Callable[ScheduleEvaluationContext, PartitionSetDefinition], Union[Partition, List[Partition]]): | ||||
Function that determines the partition to use at a given execution time. Can return | Function that determines the partition to use at a given execution time. Can return | ||||
either a single Partition or a list of Partitions. For time-based partition sets, | either a single Partition or a list of Partitions. For time-based partition sets, | ||||
Not Done · Inline Actions · dgibson: Is this line right? I thought it was still returning either a single thing or a list of things, but the thing in both was the same.
Done · Inline Actions · sandyryza: Codemod error. Will fix.
| |||||
will likely be either `identity_partition_selector` or a selector returned by | will likely be either `identity_partition_selector` or a selector returned by | ||||
`create_offset_partition_selector`. | `create_offset_partition_selector`. | ||||
should_execute (Optional[function]): Function that runs at schedule execution time that | should_execute (Optional[function]): Function that runs at schedule execution time that | ||||
determines whether a schedule should execute. Defaults to a function that always returns | determines whether a schedule should execute. Defaults to a function that always returns | ||||
``True``. | ``True``. | ||||
environment_vars (Optional[dict]): The environment variables to set for the schedule. | environment_vars (Optional[dict]): The environment variables to set for the schedule. | ||||
execution_timezone (Optional[str]): Timezone in which the schedule should run. Only works | execution_timezone (Optional[str]): Timezone in which the schedule should run. Only works | ||||
with DagsterDaemonScheduler, and must be set when using that scheduler. | with DagsterDaemonScheduler, and must be set when using that scheduler. | ||||
▲ Show 20 Lines • Show All 174 Lines • Show Last 20 Lines |
PartitionsFactory? :)
get_partitions() on Partitions feels weird