Differential D4761 Diff 23634 python_modules/dagster/dagster/core/host_representation/repository_location.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/host_representation/repository_location.py
Show First 20 Lines • Show All 136 Lines • ▼ Show 20 Lines | class RepositoryLocation(six.with_metaclass(ABCMeta)): | ||||
): | ): | ||||
pass | pass | ||||
@abstractmethod | @abstractmethod | ||||
def get_subset_external_pipeline_result(self, selector): | def get_subset_external_pipeline_result(self, selector): | ||||
pass | pass | ||||
@abstractmethod | @abstractmethod | ||||
def get_external_partition_config(self, repository_handle, partition_set_name, partition_name): | def get_external_partition_config( | ||||
self, instance, repository_handle, partition_set_name, partition_name | |||||
): | |||||
pass | pass | ||||
@abstractmethod | @abstractmethod | ||||
def get_external_partition_tags(self, repository_handle, partition_set_name, partition_name): | def get_external_partition_tags( | ||||
self, instance, repository_handle, partition_set_name, partition_name | |||||
): | |||||
pass | pass | ||||
@abstractmethod | @abstractmethod | ||||
def get_external_partition_names(self, repository_handle, partition_set_name): | def get_external_partition_names(self, instance, repository_handle, partition_set_name): | ||||
pass | pass | ||||
@abstractmethod | @abstractmethod | ||||
def get_external_partition_set_execution_param_data( | def get_external_partition_set_execution_param_data( | ||||
self, repository_handle, partition_set_name, partition_names | self, instance, repository_handle, partition_set_name, partition_names | ||||
): | ): | ||||
pass | pass | ||||
@abstractmethod | @abstractmethod | ||||
def get_external_schedule_execution_data( | def get_external_schedule_execution_data( | ||||
self, | self, | ||||
instance, | instance, | ||||
repository_handle, | repository_handle, | ||||
▲ Show 20 Lines • Show All 156 Lines • ▼ Show 20 Lines | ): | ||||
pipeline = self.get_reconstructable_pipeline( | pipeline = self.get_reconstructable_pipeline( | ||||
external_pipeline.name | external_pipeline.name | ||||
).subset_for_execution_from_existing_pipeline(external_pipeline.solids_to_execute) | ).subset_for_execution_from_existing_pipeline(external_pipeline.solids_to_execute) | ||||
execution_result = execute_run(pipeline, pipeline_run, instance) | execution_result = execute_run(pipeline, pipeline_run, instance) | ||||
return ExternalPipelineExecutionResult(event_list=execution_result.event_list) | return ExternalPipelineExecutionResult(event_list=execution_result.event_list) | ||||
def get_external_partition_config(self, repository_handle, partition_set_name, partition_name): | def get_external_partition_config( | ||||
self, instance, repository_handle, partition_set_name, partition_name | |||||
): | |||||
check.inst_param(instance, "instance", DagsterInstance) | |||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(partition_set_name, "partition_set_name") | check.str_param(partition_set_name, "partition_set_name") | ||||
check.str_param(partition_name, "partition_name") | check.str_param(partition_name, "partition_name") | ||||
args = PartitionArgs( | args = PartitionArgs( | ||||
repository_origin=repository_handle.get_origin(), | repository_origin=repository_handle.get_origin(), | ||||
partition_set_name=partition_set_name, | partition_set_name=partition_set_name, | ||||
partition_name=partition_name, | partition_name=partition_name, | ||||
instance_ref=instance.get_ref(), | |||||
) | ) | ||||
return get_partition_config(args) | return get_partition_config(args) | ||||
def get_external_partition_tags(self, repository_handle, partition_set_name, partition_name): | def get_external_partition_tags( | ||||
self, instance, repository_handle, partition_set_name, partition_name | |||||
): | |||||
check.inst_param(instance, "instance", DagsterInstance) | |||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(partition_set_name, "partition_set_name") | check.str_param(partition_set_name, "partition_set_name") | ||||
check.str_param(partition_name, "partition_name") | check.str_param(partition_name, "partition_name") | ||||
args = PartitionArgs( | args = PartitionArgs( | ||||
repository_origin=repository_handle.get_origin(), | repository_origin=repository_handle.get_origin(), | ||||
partition_set_name=partition_set_name, | partition_set_name=partition_set_name, | ||||
partition_name=partition_name, | partition_name=partition_name, | ||||
instance_ref=instance.get_ref(), | |||||
) | ) | ||||
return get_partition_tags(args) | return get_partition_tags(args) | ||||
def get_external_partition_names(self, repository_handle, partition_set_name): | def get_external_partition_names(self, instance, repository_handle, partition_set_name): | ||||
check.inst_param(instance, "instance", DagsterInstance) | |||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(partition_set_name, "partition_set_name") | check.str_param(partition_set_name, "partition_set_name") | ||||
args = PartitionNamesArgs( | args = PartitionNamesArgs( | ||||
repository_origin=repository_handle.get_origin(), partition_set_name=partition_set_name | repository_origin=repository_handle.get_origin(), | ||||
partition_set_name=partition_set_name, | |||||
instance_ref=instance.get_ref(), | |||||
) | ) | ||||
return get_partition_names(args) | return get_partition_names(args) | ||||
def get_external_schedule_execution_data( | def get_external_schedule_execution_data( | ||||
self, | self, | ||||
instance, | instance, | ||||
repository_handle, | repository_handle, | ||||
schedule_name, | schedule_name, | ||||
Show All 34 Lines | def get_external_executable_params(self, instance, repository_handle, name): | ||||
repo_origin = repository_handle.get_origin() | repo_origin = repository_handle.get_origin() | ||||
recon_repo = recon_repository_from_origin(repo_origin) | recon_repo = recon_repository_from_origin(repo_origin) | ||||
args = ExternalExecutableArgs( | args = ExternalExecutableArgs( | ||||
instance_ref=instance.get_ref(), repository_origin=repo_origin, name=name, | instance_ref=instance.get_ref(), repository_origin=repo_origin, name=name, | ||||
) | ) | ||||
return get_external_executable_params(recon_repo, args) | return get_external_executable_params(recon_repo, args) | ||||
def get_external_partition_set_execution_param_data( | def get_external_partition_set_execution_param_data( | ||||
self, repository_handle, partition_set_name, partition_names | self, instance, repository_handle, partition_set_name, partition_names | ||||
): | ): | ||||
check.inst_param(instance, "instance", DagsterInstance) | |||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(partition_set_name, "partition_set_name") | check.str_param(partition_set_name, "partition_set_name") | ||||
check.list_param(partition_names, "partition_names", of_type=str) | check.list_param(partition_names, "partition_names", of_type=str) | ||||
args = PartitionSetExecutionParamArgs( | args = PartitionSetExecutionParamArgs( | ||||
repository_origin=repository_handle.get_origin(), | repository_origin=repository_handle.get_origin(), | ||||
partition_set_name=partition_set_name, | partition_set_name=partition_set_name, | ||||
partition_names=partition_names, | partition_names=partition_names, | ||||
instance_ref=instance.get_ref(), | |||||
) | ) | ||||
return get_partition_set_execution_param_data(args) | return get_partition_set_execution_param_data(args) | ||||
class GrpcServerRepositoryLocation(RepositoryLocation): | class GrpcServerRepositoryLocation(RepositoryLocation): | ||||
def __init__(self, repository_location_handle): | def __init__(self, repository_location_handle): | ||||
check.param_invariant( | check.param_invariant( | ||||
isinstance(repository_location_handle, GrpcServerRepositoryLocationHandle) | isinstance(repository_location_handle, GrpcServerRepositoryLocationHandle) | ||||
▲ Show 20 Lines • Show All 91 Lines • ▼ Show 20 Lines | def get_subset_external_pipeline_result(self, selector): | ||||
) | ) | ||||
external_repository = self.external_repositories[selector.repository_name] | external_repository = self.external_repositories[selector.repository_name] | ||||
pipeline_handle = PipelineHandle(selector.pipeline_name, external_repository.handle) | pipeline_handle = PipelineHandle(selector.pipeline_name, external_repository.handle) | ||||
return sync_get_external_pipeline_subset_grpc( | return sync_get_external_pipeline_subset_grpc( | ||||
self._handle.client, pipeline_handle.get_origin(), selector.solid_selection | self._handle.client, pipeline_handle.get_origin(), selector.solid_selection | ||||
) | ) | ||||
def get_external_partition_config(self, repository_handle, partition_set_name, partition_name): | def get_external_partition_config( | ||||
self, instance, repository_handle, partition_set_name, partition_name | |||||
): | |||||
check.inst_param(instance, "instance", DagsterInstance) | |||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(partition_set_name, "partition_set_name") | check.str_param(partition_set_name, "partition_set_name") | ||||
check.str_param(partition_name, "partition_name") | check.str_param(partition_name, "partition_name") | ||||
return sync_get_external_partition_config_grpc( | return sync_get_external_partition_config_grpc( | ||||
self._handle.client, repository_handle, partition_set_name, partition_name | self._handle.client, instance, repository_handle, partition_set_name, partition_name | ||||
) | ) | ||||
def get_external_partition_tags(self, repository_handle, partition_set_name, partition_name): | def get_external_partition_tags( | ||||
self, instance, repository_handle, partition_set_name, partition_name | |||||
): | |||||
check.inst_param(instance, "instance", DagsterInstance) | |||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(partition_set_name, "partition_set_name") | check.str_param(partition_set_name, "partition_set_name") | ||||
check.str_param(partition_name, "partition_name") | check.str_param(partition_name, "partition_name") | ||||
return sync_get_external_partition_tags_grpc( | return sync_get_external_partition_tags_grpc( | ||||
self._handle.client, repository_handle, partition_set_name, partition_name | self._handle.client, instance, repository_handle, partition_set_name, partition_name | ||||
) | ) | ||||
def get_external_partition_names(self, repository_handle, partition_set_name): | def get_external_partition_names(self, instance, repository_handle, partition_set_name): | ||||
check.inst_param(instance, "instance", DagsterInstance) | |||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(partition_set_name, "partition_set_name") | check.str_param(partition_set_name, "partition_set_name") | ||||
return sync_get_external_partition_names_grpc( | return sync_get_external_partition_names_grpc( | ||||
self._handle.client, repository_handle, partition_set_name | self._handle.client, instance, repository_handle, partition_set_name | ||||
) | ) | ||||
def get_external_schedule_execution_data( | def get_external_schedule_execution_data( | ||||
self, | self, | ||||
instance, | instance, | ||||
repository_handle, | repository_handle, | ||||
schedule_name, | schedule_name, | ||||
schedule_execution_data_mode, | schedule_execution_data_mode, | ||||
Show All 22 Lines | def get_external_executable_params(self, instance, repository_handle, name): | ||||
check.inst_param(instance, "instance", DagsterInstance) | check.inst_param(instance, "instance", DagsterInstance) | ||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
return sync_get_external_executable_params_grpc( | return sync_get_external_executable_params_grpc( | ||||
self._handle.client, instance, repository_handle, name | self._handle.client, instance, repository_handle, name | ||||
) | ) | ||||
def get_external_partition_set_execution_param_data( | def get_external_partition_set_execution_param_data( | ||||
self, repository_handle, partition_set_name, partition_names | self, instance, repository_handle, partition_set_name, partition_names | ||||
): | ): | ||||
check.inst_param(instance, "instance", DagsterInstance) | |||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(partition_set_name, "partition_set_name") | check.str_param(partition_set_name, "partition_set_name") | ||||
check.list_param(partition_names, "partition_names", of_type=str) | check.list_param(partition_names, "partition_names", of_type=str) | ||||
return sync_get_external_partition_set_execution_param_data_grpc( | return sync_get_external_partition_set_execution_param_data_grpc( | ||||
self._handle.client, repository_handle, partition_set_name, partition_names | self._handle.client, instance, repository_handle, partition_set_name, partition_names, | ||||
) | ) | ||||
class PythonEnvRepositoryLocation(RepositoryLocation): | class PythonEnvRepositoryLocation(RepositoryLocation): | ||||
def __init__(self, repository_location_handle): | def __init__(self, repository_location_handle): | ||||
self._handle = check.inst_param( | self._handle = check.inst_param( | ||||
repository_location_handle, | repository_location_handle, | ||||
"repository_location_handle", | "repository_location_handle", | ||||
▲ Show 20 Lines • Show All 94 Lines • ▼ Show 20 Lines | def get_subset_external_pipeline_result(self, selector): | ||||
) | ) | ||||
external_repository = self.external_repositories[selector.repository_name] | external_repository = self.external_repositories[selector.repository_name] | ||||
pipeline_handle = PipelineHandle(selector.pipeline_name, external_repository.handle) | pipeline_handle = PipelineHandle(selector.pipeline_name, external_repository.handle) | ||||
return sync_get_external_pipeline_subset( | return sync_get_external_pipeline_subset( | ||||
pipeline_handle.get_origin(), selector.solid_selection | pipeline_handle.get_origin(), selector.solid_selection | ||||
) | ) | ||||
def get_external_partition_config(self, repository_handle, partition_set_name, partition_name): | def get_external_partition_config( | ||||
self, instance, repository_handle, partition_set_name, partition_name | |||||
): | |||||
check.inst_param(instance, "instance", DagsterInstance) | |||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(partition_set_name, "partition_set_name") | check.str_param(partition_set_name, "partition_set_name") | ||||
check.str_param(partition_name, "partition_name") | check.str_param(partition_name, "partition_name") | ||||
return sync_get_external_partition_config( | return sync_get_external_partition_config( | ||||
repository_handle, partition_set_name, partition_name | instance, repository_handle, partition_set_name, partition_name | ||||
) | ) | ||||
def get_external_partition_tags(self, repository_handle, partition_set_name, partition_name): | def get_external_partition_tags( | ||||
self, instance, repository_handle, partition_set_name, partition_name | |||||
): | |||||
check.inst_param(instance, "instacne", DagsterInstance) | |||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(partition_set_name, "partition_set_name") | check.str_param(partition_set_name, "partition_set_name") | ||||
check.str_param(partition_name, "partition_name") | check.str_param(partition_name, "partition_name") | ||||
return sync_get_external_partition_tags( | return sync_get_external_partition_tags( | ||||
repository_handle, partition_set_name, partition_name | instance, repository_handle, partition_set_name, partition_name | ||||
) | ) | ||||
def get_external_partition_names(self, repository_handle, partition_set_name): | def get_external_partition_names(self, instance, repository_handle, partition_set_name): | ||||
check.inst_param(instance, "instance", DagsterInstance) | |||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(partition_set_name, "partition_set_name") | check.str_param(partition_set_name, "partition_set_name") | ||||
return sync_get_external_partition_names(repository_handle, partition_set_name) | return sync_get_external_partition_names(instance, repository_handle, partition_set_name) | ||||
def get_external_schedule_execution_data( | def get_external_schedule_execution_data( | ||||
self, | self, | ||||
instance, | instance, | ||||
repository_handle, | repository_handle, | ||||
schedule_name, | schedule_name, | ||||
schedule_execution_data_mode, | schedule_execution_data_mode, | ||||
scheduled_execution_time, | scheduled_execution_time, | ||||
Show All 18 Lines | class PythonEnvRepositoryLocation(RepositoryLocation): | ||||
def get_external_executable_params(self, instance, repository_handle, name): | def get_external_executable_params(self, instance, repository_handle, name): | ||||
check.inst_param(instance, "instance", DagsterInstance) | check.inst_param(instance, "instance", DagsterInstance) | ||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
return sync_get_external_executable_params(instance, repository_handle, name) | return sync_get_external_executable_params(instance, repository_handle, name) | ||||
def get_external_partition_set_execution_param_data( | def get_external_partition_set_execution_param_data( | ||||
self, repository_handle, partition_set_name, partition_names | self, instance, repository_handle, partition_set_name, partition_names | ||||
): | ): | ||||
check.inst_param(instance, "instance", DagsterInstance) | |||||
check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | check.inst_param(repository_handle, "repository_handle", RepositoryHandle) | ||||
check.str_param(partition_set_name, "partition_set_name") | check.str_param(partition_set_name, "partition_set_name") | ||||
check.list_param(partition_names, "partition_names", of_type=str) | check.list_param(partition_names, "partition_names", of_type=str) | ||||
return sync_get_external_partition_set_execution_param_data( | return sync_get_external_partition_set_execution_param_data( | ||||
repository_handle, partition_set_name, partition_names | instance, repository_handle, partition_set_name, partition_names, | ||||
) | ) |