Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/storage/init.py
from collections import namedtuple | from collections import namedtuple | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions import ( | from dagster.core.definitions import ( | ||||
IntermediateStorageDefinition, | IntermediateStorageDefinition, | ||||
ModeDefinition, | ModeDefinition, | ||||
PipelineDefinition, | PipelineDefinition, | ||||
SystemStorageDefinition, | SystemStorageDefinition, | ||||
) | ) | ||||
from dagster.core.execution.plan.objects import StepOutputHandle | |||||
from dagster.core.instance import DagsterInstance | from dagster.core.instance import DagsterInstance | ||||
from dagster.core.storage.pipeline_run import PipelineRun | from dagster.core.storage.pipeline_run import PipelineRun | ||||
from dagster.core.storage.type_storage import TypeStoragePluginRegistry | from dagster.core.storage.type_storage import TypeStoragePluginRegistry | ||||
from dagster.core.system_config.objects import EnvironmentConfig | from dagster.core.system_config.objects import EnvironmentConfig | ||||
class InitSystemStorageContext( | class InitSystemStorageContext( | ||||
namedtuple( | namedtuple( | ||||
"InitSystemStorageContext", | "InitSystemStorageContext", | ||||
( | ( | ||||
"pipeline_def mode_def system_storage_def pipeline_run instance environment_config " | "pipeline_def mode_def system_storage_def pipeline_run instance environment_config " | ||||
"type_storage_plugin_registry resources system_storage_config" | "type_storage_plugin_registry resources system_storage_config external_intermediates" | ||||
), | ), | ||||
) | ) | ||||
): | ): | ||||
"""System storage-specific initialization context. | """System storage-specific initialization context. | ||||
Attributes: | Attributes: | ||||
pipeline_def (PipelineDefinition): The definition of the pipeline in context. | pipeline_def (PipelineDefinition): The definition of the pipeline in context. | ||||
mode_def (ModeDefinition): The definition of the mode in contxt. | mode_def (ModeDefinition): The definition of the mode in contxt. | ||||
system_storage_def (SystemStorageDefinition): The definition of the system storage to be | system_storage_def (SystemStorageDefinition): The definition of the system storage to be | ||||
constructed. | constructed. | ||||
pipeline_run (PipelineRun): The pipeline run in context. | pipeline_run (PipelineRun): The pipeline run in context. | ||||
instance (DagsterInstance): The instance. | instance (DagsterInstance): The instance. | ||||
environment_config (EnvironmentConfig): The environment config. | environment_config (EnvironmentConfig): The environment config. | ||||
type_storage_plugin_registry (TypeStoragePluginRegistry): Registry containing custom type | type_storage_plugin_registry (TypeStoragePluginRegistry): Registry containing custom type | ||||
storage plugins. | storage plugins. | ||||
resources (Any): Resources available in context. | resources (Any): Resources available in context. | ||||
system_storage_config (Dict[str, Any]): The system storage-specific configuration data | system_storage_config (Dict[str, Any]): The system storage-specific configuration data | ||||
provided by the environment config. The schema for this data is defined by the | provided by the environment config. The schema for this data is defined by the | ||||
``config_field`` argument to :py:class:`SystemStorageDefinition`. | ``config_field`` argument to :py:class:`SystemStorageDefinition`. | ||||
external_intermediates (Dict[StepOutputHandle, Address]): The mapping from step output to | |||||
address which tracks the intermediates that have been stored outside the system | |||||
intermediate directory. | |||||
""" | """ | ||||
def __new__( | def __new__( | ||||
cls, | cls, | ||||
pipeline_def, | pipeline_def, | ||||
mode_def, | mode_def, | ||||
system_storage_def, | system_storage_def, | ||||
pipeline_run, | pipeline_run, | ||||
instance, | instance, | ||||
environment_config, | environment_config, | ||||
type_storage_plugin_registry, | type_storage_plugin_registry, | ||||
resources, | resources, | ||||
system_storage_config, | system_storage_config, | ||||
external_intermediates, | |||||
): | ): | ||||
return super(InitSystemStorageContext, cls).__new__( | return super(InitSystemStorageContext, cls).__new__( | ||||
cls, | cls, | ||||
pipeline_def=check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition), | pipeline_def=check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition), | ||||
mode_def=check.inst_param(mode_def, "mode_def", ModeDefinition), | mode_def=check.inst_param(mode_def, "mode_def", ModeDefinition), | ||||
system_storage_def=check.inst_param( | system_storage_def=check.inst_param( | ||||
system_storage_def, "system_storage_def", SystemStorageDefinition | system_storage_def, "system_storage_def", SystemStorageDefinition | ||||
), | ), | ||||
pipeline_run=check.inst_param(pipeline_run, "pipeline_run", PipelineRun), | pipeline_run=check.inst_param(pipeline_run, "pipeline_run", PipelineRun), | ||||
instance=check.inst_param(instance, "instance", DagsterInstance), | instance=check.inst_param(instance, "instance", DagsterInstance), | ||||
environment_config=check.inst_param( | environment_config=check.inst_param( | ||||
environment_config, "environment_config", EnvironmentConfig | environment_config, "environment_config", EnvironmentConfig | ||||
), | ), | ||||
type_storage_plugin_registry=check.inst_param( | type_storage_plugin_registry=check.inst_param( | ||||
type_storage_plugin_registry, | type_storage_plugin_registry, | ||||
"type_storage_plugin_registry", | "type_storage_plugin_registry", | ||||
TypeStoragePluginRegistry, | TypeStoragePluginRegistry, | ||||
), | ), | ||||
resources=check.not_none_param(resources, "resources"), | resources=check.not_none_param(resources, "resources"), | ||||
system_storage_config=check.dict_param( | system_storage_config=check.dict_param( | ||||
system_storage_config, system_storage_config, key_type=str | system_storage_config, system_storage_config, key_type=str | ||||
), | ), | ||||
external_intermediates=check.opt_dict_param( | |||||
external_intermediates, "external_intermediates", key_type=StepOutputHandle | |||||
), | |||||
) | ) | ||||
class InitIntermediateStorageContext( | class InitIntermediateStorageContext( | ||||
namedtuple( | namedtuple( | ||||
"InitIntermediateStorageContext", | "InitIntermediateStorageContext", | ||||
( | ( | ||||
"pipeline_def mode_def intermediate_storage_def pipeline_run instance environment_config " | "pipeline_def mode_def intermediate_storage_def pipeline_run instance environment_config " | ||||
"type_storage_plugin_registry resources intermediate_storage_config" | "type_storage_plugin_registry resources intermediate_storage_config external_intermediates" | ||||
), | ), | ||||
) | ) | ||||
): | ): | ||||
"""Intermediate storage-specific initialization context. | """Intermediate storage-specific initialization context. | ||||
Attributes: | Attributes: | ||||
pipeline_def (PipelineDefinition): The definition of the pipeline in context. | pipeline_def (PipelineDefinition): The definition of the pipeline in context. | ||||
mode_def (ModeDefinition): The definition of the mode in contxt. | mode_def (ModeDefinition): The definition of the mode in contxt. | ||||
intermediate_storage_def (IntermediateStorageDefinition): The definition of the intermediate storage to be | intermediate_storage_def (IntermediateStorageDefinition): The definition of the intermediate storage to be | ||||
constructed. | constructed. | ||||
pipeline_run (PipelineRun): The pipeline run in context. | pipeline_run (PipelineRun): The pipeline run in context. | ||||
instance (DagsterInstance): The instance. | instance (DagsterInstance): The instance. | ||||
environment_config (EnvironmentConfig): The environment config. | environment_config (EnvironmentConfig): The environment config. | ||||
type_storage_plugin_registry (TypeStoragePluginRegistry): Registry containing custom type | type_storage_plugin_registry (TypeStoragePluginRegistry): Registry containing custom type | ||||
storage plugins. | storage plugins. | ||||
resources (Any): Resources available in context. | resources (Any): Resources available in context. | ||||
intermediate_storage_config (Dict[str, Any]): The intermediate storage-specific configuration data | intermediate_storage_config (Dict[str, Any]): The intermediate storage-specific configuration data | ||||
provided by the environment config. The schema for this data is defined by the | provided by the environment config. The schema for this data is defined by the | ||||
``config_field`` argument to :py:class:`IntermediateStorageDefinition`. | ``config_field`` argument to :py:class:`IntermediateStorageDefinition`. | ||||
external_intermediates (Dict[StepOutputHandle, Address]): The mapping from step output to | |||||
address which tracks the intermediates that have been stored outside the system | |||||
intermediate directory. | |||||
""" | """ | ||||
def __new__( | def __new__( | ||||
cls, | cls, | ||||
pipeline_def, | pipeline_def, | ||||
mode_def, | mode_def, | ||||
intermediate_storage_def, | intermediate_storage_def, | ||||
pipeline_run, | pipeline_run, | ||||
instance, | instance, | ||||
environment_config, | environment_config, | ||||
type_storage_plugin_registry, | type_storage_plugin_registry, | ||||
resources, | resources, | ||||
intermediate_storage_config, | intermediate_storage_config, | ||||
external_intermediates, | |||||
): | ): | ||||
return super(InitIntermediateStorageContext, cls).__new__( | return super(InitIntermediateStorageContext, cls).__new__( | ||||
cls, | cls, | ||||
pipeline_def=check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition), | pipeline_def=check.inst_param(pipeline_def, "pipeline_def", PipelineDefinition), | ||||
mode_def=check.inst_param(mode_def, "mode_def", ModeDefinition), | mode_def=check.inst_param(mode_def, "mode_def", ModeDefinition), | ||||
intermediate_storage_def=check.inst_param( | intermediate_storage_def=check.inst_param( | ||||
intermediate_storage_def, "intermediate_storage_def", IntermediateStorageDefinition | intermediate_storage_def, "intermediate_storage_def", IntermediateStorageDefinition | ||||
), | ), | ||||
pipeline_run=check.inst_param(pipeline_run, "pipeline_run", PipelineRun), | pipeline_run=check.inst_param(pipeline_run, "pipeline_run", PipelineRun), | ||||
instance=check.inst_param(instance, "instance", DagsterInstance), | instance=check.inst_param(instance, "instance", DagsterInstance), | ||||
environment_config=check.inst_param( | environment_config=check.inst_param( | ||||
environment_config, "environment_config", EnvironmentConfig | environment_config, "environment_config", EnvironmentConfig | ||||
), | ), | ||||
type_storage_plugin_registry=check.inst_param( | type_storage_plugin_registry=check.inst_param( | ||||
type_storage_plugin_registry, | type_storage_plugin_registry, | ||||
"type_storage_plugin_registry", | "type_storage_plugin_registry", | ||||
TypeStoragePluginRegistry, | TypeStoragePluginRegistry, | ||||
), | ), | ||||
resources=check.not_none_param(resources, "resources"), | resources=check.not_none_param(resources, "resources"), | ||||
intermediate_storage_config=check.dict_param( | intermediate_storage_config=check.dict_param( | ||||
intermediate_storage_config, intermediate_storage_config, key_type=str | intermediate_storage_config, intermediate_storage_config, key_type=str | ||||
), | ), | ||||
external_intermediates=check.opt_dict_param( | |||||
external_intermediates, "external_intermediates", key_type=StepOutputHandle | |||||
), | |||||
) | ) |