Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/repository.py
from dagster import check | from dagster import check | ||||
from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError | from dagster.core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError | ||||
from dagster.utils import merge_dicts | from dagster.utils import merge_dicts | ||||
from .executable import ExecutableDefinition | from .job import JobDefinition | ||||
from .partition import PartitionScheduleDefinition, PartitionSetDefinition | from .partition import PartitionScheduleDefinition, PartitionSetDefinition | ||||
from .pipeline import PipelineDefinition | from .pipeline import PipelineDefinition | ||||
from .schedule import ScheduleDefinition | from .schedule import ScheduleDefinition | ||||
from .utils import check_valid_name | from .utils import check_valid_name | ||||
VALID_REPOSITORY_DATA_DICT_KEYS = { | VALID_REPOSITORY_DATA_DICT_KEYS = { | ||||
"pipelines", | "pipelines", | ||||
"partition_sets", | "partition_sets", | ||||
"schedules", | "schedules", | ||||
"executables", | "jobs", | ||||
} | } | ||||
class _CacheingDefinitionIndex(object): | class _CacheingDefinitionIndex(object): | ||||
def __init__(self, definition_class, definition_class_name, definition_kind, definitions): | def __init__(self, definition_class, definition_class_name, definition_kind, definitions): | ||||
for key, definition in definitions.items(): | for key, definition in definitions.items(): | ||||
check.invariant( | check.invariant( | ||||
▲ Show 20 Lines • Show All 96 Lines • ▼ Show 20 Lines | class RepositoryData(object): | ||||
"""Contains definitions belonging to a repository. | """Contains definitions belonging to a repository. | ||||
Users should usually rely on the :py:func:`@repository <repository>` decorator to create new | Users should usually rely on the :py:func:`@repository <repository>` decorator to create new | ||||
repositories, which will in turn call the static constructors on this class. However, users may | repositories, which will in turn call the static constructors on this class. However, users may | ||||
subclass RepositoryData for fine-grained control over access to and lazy creation | subclass RepositoryData for fine-grained control over access to and lazy creation | ||||
of repository members. | of repository members. | ||||
""" | """ | ||||
def __init__(self, pipelines, partition_sets, schedules, executables): | def __init__(self, pipelines, partition_sets, schedules, jobs): | ||||
"""Constructs a new RepositoryData object. | """Constructs a new RepositoryData object. | ||||
You may pass pipeline, partition_set, and schedule definitions directly, or you may pass | You may pass pipeline, partition_set, and schedule definitions directly, or you may pass | ||||
callables with no arguments that will be invoked to lazily construct definitions when | callables with no arguments that will be invoked to lazily construct definitions when | ||||
accessed by name. This can be helpful for performance when there are many definitions in a | accessed by name. This can be helpful for performance when there are many definitions in a | ||||
repository, or when constructing the definitions is costly. | repository, or when constructing the definitions is costly. | ||||
Note that when lazily constructing a definition, the name of the definition must match its | Note that when lazily constructing a definition, the name of the definition must match its | ||||
key in its dictionary index, or a :py:class:`DagsterInvariantViolationError` will be thrown | key in its dictionary index, or a :py:class:`DagsterInvariantViolationError` will be thrown | ||||
at retrieval time. | at retrieval time. | ||||
Args: | Args: | ||||
pipelines (Dict[str, Union[PipelineDefinition, Callable[[], PipelineDefinition]]]): | pipelines (Dict[str, Union[PipelineDefinition, Callable[[], PipelineDefinition]]]): | ||||
The pipeline definitions belonging to the repository. | The pipeline definitions belonging to the repository. | ||||
partition_sets (Dict[str, Union[PartitionSetDefinition, Callable[[], PartitionSetDefinition]]]): | partition_sets (Dict[str, Union[PartitionSetDefinition, Callable[[], PartitionSetDefinition]]]): | ||||
The partition sets belonging to the repository. | The partition sets belonging to the repository. | ||||
schedules (Dict[str, Union[ScheduleDefinition, Callable[[], ScheduleDefinition]]]): | schedules (Dict[str, Union[ScheduleDefinition, Callable[[], ScheduleDefinition]]]): | ||||
The schedules belonging to the repository. | The schedules belonging to the repository. | ||||
executables (Dict[str, Union[ExecutableDefinition, Callable[[], ExecutableDefinition]]]): | jobs (Dict[str, Union[JobDefinition, Callable[[], JobDefinition]]]): | ||||
The predefined executables for a repository. | The predefined jobs for a repository. | ||||
""" | """ | ||||
check.dict_param(pipelines, "pipelines", key_type=str) | check.dict_param(pipelines, "pipelines", key_type=str) | ||||
check.dict_param(partition_sets, "partition_sets", key_type=str) | check.dict_param(partition_sets, "partition_sets", key_type=str) | ||||
check.dict_param(schedules, "schedules", key_type=str) | check.dict_param(schedules, "schedules", key_type=str) | ||||
check.dict_param(executables, "executables", key_type=str) | check.dict_param(jobs, "jobs", key_type=str) | ||||
self._pipelines = _CacheingDefinitionIndex( | self._pipelines = _CacheingDefinitionIndex( | ||||
PipelineDefinition, "PipelineDefinition", "pipeline", pipelines | PipelineDefinition, "PipelineDefinition", "pipeline", pipelines | ||||
) | ) | ||||
self._schedules = _CacheingDefinitionIndex( | self._schedules = _CacheingDefinitionIndex( | ||||
ScheduleDefinition, "ScheduleDefinition", "schedule", schedules | ScheduleDefinition, "ScheduleDefinition", "schedule", schedules | ||||
) | ) | ||||
schedule_partition_sets = [ | schedule_partition_sets = [ | ||||
schedule.get_partition_set() | schedule.get_partition_set() | ||||
for schedule in self._schedules.get_all_definitions() | for schedule in self._schedules.get_all_definitions() | ||||
if isinstance(schedule, PartitionScheduleDefinition) | if isinstance(schedule, PartitionScheduleDefinition) | ||||
] | ] | ||||
self._partition_sets = _CacheingDefinitionIndex( | self._partition_sets = _CacheingDefinitionIndex( | ||||
PartitionSetDefinition, | PartitionSetDefinition, | ||||
"PartitionSetDefinition", | "PartitionSetDefinition", | ||||
"partition set", | "partition set", | ||||
merge_dicts( | merge_dicts( | ||||
{partition_set.name: partition_set for partition_set in schedule_partition_sets}, | {partition_set.name: partition_set for partition_set in schedule_partition_sets}, | ||||
partition_sets, | partition_sets, | ||||
), | ), | ||||
) | ) | ||||
self._executables = _CacheingDefinitionIndex( | self._jobs = _CacheingDefinitionIndex(JobDefinition, "JobDefinition", "job", jobs,) | ||||
ExecutableDefinition, "ExecutableDefinition", "executable", executables, | |||||
) | |||||
self._all_pipelines = None | self._all_pipelines = None | ||||
self._solids = None | self._solids = None | ||||
self._all_solids = None | self._all_solids = None | ||||
@staticmethod | @staticmethod | ||||
def from_dict(repository_definitions): | def from_dict(repository_definitions): | ||||
"""Static constructor. | """Static constructor. | ||||
Show All 40 Lines | def from_list(cls, repository_definitions): | ||||
Args: | Args: | ||||
repository_definitions (List[Union[PipelineDefinition, PartitionSetDefinition, ScheduleDefinition]]): | repository_definitions (List[Union[PipelineDefinition, PartitionSetDefinition, ScheduleDefinition]]): | ||||
Use this constructor when you have no need to lazy load pipelines or other | Use this constructor when you have no need to lazy load pipelines or other | ||||
definitions. | definitions. | ||||
""" | """ | ||||
pipelines = {} | pipelines = {} | ||||
partition_sets = {} | partition_sets = {} | ||||
schedules = {} | schedules = {} | ||||
executables = {} | jobs = {} | ||||
for definition in repository_definitions: | for definition in repository_definitions: | ||||
if isinstance(definition, PipelineDefinition): | if isinstance(definition, PipelineDefinition): | ||||
if definition.name in pipelines: | if definition.name in pipelines: | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
"Duplicate pipeline definition found for pipeline {pipeline_name}".format( | "Duplicate pipeline definition found for pipeline {pipeline_name}".format( | ||||
pipeline_name=definition.name | pipeline_name=definition.name | ||||
) | ) | ||||
) | ) | ||||
Show All 19 Lines | def from_list(cls, repository_definitions): | ||||
partition_set_def.name in partition_sets | partition_set_def.name in partition_sets | ||||
and partition_set_def != partition_sets[partition_set_def.name] | and partition_set_def != partition_sets[partition_set_def.name] | ||||
): | ): | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
"Duplicate partition set definition found for partition set " | "Duplicate partition set definition found for partition set " | ||||
"{partition_set_name}".format(partition_set_name=partition_set_def.name) | "{partition_set_name}".format(partition_set_name=partition_set_def.name) | ||||
) | ) | ||||
partition_sets[partition_set_def.name] = partition_set_def | partition_sets[partition_set_def.name] = partition_set_def | ||||
elif isinstance(definition, ExecutableDefinition): | elif isinstance(definition, JobDefinition): | ||||
if definition.name in executables: | if definition.name in jobs: | ||||
raise DagsterInvalidDefinitionError( | raise DagsterInvalidDefinitionError( | ||||
"Duplicate executable definition found for executable {name}".format( | "Duplicate job definition found for job {name}".format(name=definition.name) | ||||
name=definition.name | |||||
) | |||||
) | ) | ||||
executables[definition.name] = definition | jobs[definition.name] = definition | ||||
return RepositoryData( | return RepositoryData( | ||||
pipelines=pipelines, | pipelines=pipelines, partition_sets=partition_sets, schedules=schedules, jobs=jobs, | ||||
partition_sets=partition_sets, | |||||
schedules=schedules, | |||||
executables=executables, | |||||
) | ) | ||||
def get_pipeline_names(self): | def get_pipeline_names(self): | ||||
"""Get the names of all pipelines in the repository. | """Get the names of all pipelines in the repository. | ||||
Returns: | Returns: | ||||
List[str] | List[str] | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 125 Lines • ▼ Show 20 Lines | def get_schedule(self, schedule_name): | ||||
return self._schedules.get_definition(schedule_name) | return self._schedules.get_definition(schedule_name) | ||||
def has_schedule(self, schedule_name): | def has_schedule(self, schedule_name): | ||||
check.str_param(schedule_name, "schedule_name") | check.str_param(schedule_name, "schedule_name") | ||||
return self._schedules.has_definition(schedule_name) | return self._schedules.has_definition(schedule_name) | ||||
def get_all_executables(self): | def get_all_jobs(self): | ||||
return self._executables.get_all_definitions() | return self._jobs.get_all_definitions() | ||||
def get_executable(self, name): | def get_job(self, name): | ||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
return self._executables.get_definition(name) | return self._jobs.get_definition(name) | ||||
def has_executable(self, name): | def has_job(self, name): | ||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
return self._executables.has_definition(name) | return self._jobs.has_definition(name) | ||||
def get_all_solid_defs(self): | def get_all_solid_defs(self): | ||||
if self._all_solids is not None: | if self._all_solids is not None: | ||||
return self._all_solids | return self._all_solids | ||||
self._all_solids = self._construct_solid_defs() | self._all_solids = self._construct_solid_defs() | ||||
return list(self._all_solids.values()) | return list(self._all_solids.values()) | ||||
▲ Show 20 Lines • Show All 154 Lines • ▼ Show 20 Lines | class RepositoryDefinition(object): | ||||
def get_schedule_def(self, name): | def get_schedule_def(self, name): | ||||
return self._repository_data.get_schedule(name) | return self._repository_data.get_schedule(name) | ||||
def has_schedule_def(self, name): | def has_schedule_def(self, name): | ||||
return self._repository_data.has_schedule(name) | return self._repository_data.has_schedule(name) | ||||
@property | @property | ||||
def executable_defs(self): | def job_defs(self): | ||||
return self._repository_data.get_all_executables() | return self._repository_data.get_all_jobs() | ||||
def get_executable_def(self, name): | def get_job_def(self, name): | ||||
return self._repository_data.get_executable(name) | return self._repository_data.get_job(name) | ||||
def has_executable_def(self, name): | def has_job_def(self, name): | ||||
return self._repository_data.has_executable(name) | return self._repository_data.has_job(name) |