Differential D4918 Diff 25032 python_modules/dagster/dagster/core/host_representation/repository_location.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/host_representation/repository_location.py
Show All 19 Lines | |||||
from dagster.api.snapshot_repository import ( | from dagster.api.snapshot_repository import ( | ||||
sync_get_external_repositories, | sync_get_external_repositories, | ||||
sync_get_streaming_external_repositories_grpc, | sync_get_streaming_external_repositories_grpc, | ||||
) | ) | ||||
from dagster.api.snapshot_schedule import ( | from dagster.api.snapshot_schedule import ( | ||||
sync_get_external_schedule_execution_data, | sync_get_external_schedule_execution_data, | ||||
sync_get_external_schedule_execution_data_grpc, | sync_get_external_schedule_execution_data_grpc, | ||||
) | ) | ||||
from dagster.core.definitions.reconstructable import ReconstructableRepository | |||||
from dagster.core.execution.api import create_execution_plan | from dagster.core.execution.api import create_execution_plan | ||||
from dagster.core.host_representation import ( | from dagster.core.host_representation import ( | ||||
ExternalExecutionPlan, | ExternalExecutionPlan, | ||||
ExternalPipeline, | ExternalPipeline, | ||||
GrpcServerRepositoryLocationHandle, | GrpcServerRepositoryLocationHandle, | ||||
InProcessRepositoryLocationHandle, | InProcessRepositoryLocationHandle, | ||||
ManagedGrpcPythonEnvRepositoryLocationHandle, | ManagedGrpcPythonEnvRepositoryLocationHandle, | ||||
PipelineHandle, | PipelineHandle, | ||||
▲ Show 20 Lines • Show All 117 Lines • ▼ Show 20 Lines | class RepositoryLocation(six.with_metaclass(ABCMeta)): | ||||
def is_reload_supported(self): | def is_reload_supported(self): | ||||
pass | pass | ||||
@staticmethod | @staticmethod | ||||
def from_handle(repository_location_handle): | def from_handle(repository_location_handle): | ||||
check.inst_param( | check.inst_param( | ||||
repository_location_handle, "repository_location_handle", RepositoryLocationHandle | repository_location_handle, "repository_location_handle", RepositoryLocationHandle | ||||
) | ) | ||||
if isinstance(repository_location_handle, InProcessRepositoryLocationHandle): | if isinstance(repository_location_handle, InProcessRepositoryLocationHandle): | ||||
check.invariant(len(repository_location_handle.repository_code_pointer_dict) == 1) | return InProcessRepositoryLocation(repository_location_handle) | ||||
pointer = next(iter(repository_location_handle.repository_code_pointer_dict.values())) | |||||
return InProcessRepositoryLocation(ReconstructableRepository(pointer)) | |||||
elif isinstance(repository_location_handle, PythonEnvRepositoryLocationHandle): | elif isinstance(repository_location_handle, PythonEnvRepositoryLocationHandle): | ||||
return PythonEnvRepositoryLocation(repository_location_handle) | return PythonEnvRepositoryLocation(repository_location_handle) | ||||
elif isinstance( | elif isinstance( | ||||
repository_location_handle, GrpcServerRepositoryLocationHandle | repository_location_handle, GrpcServerRepositoryLocationHandle | ||||
) or isinstance(repository_location_handle, ManagedGrpcPythonEnvRepositoryLocationHandle): | ) or isinstance(repository_location_handle, ManagedGrpcPythonEnvRepositoryLocationHandle): | ||||
return GrpcServerRepositoryLocation(repository_location_handle) | return GrpcServerRepositoryLocation(repository_location_handle) | ||||
else: | else: | ||||
check.failed("Unsupported handle: {}".format(repository_location_handle)) | check.failed("Unsupported handle: {}".format(repository_location_handle)) | ||||
def create_reloaded_repository_location(self): | def create_reloaded_repository_location(self): | ||||
return RepositoryLocation.from_handle(self.location_handle.create_reloaded_handle()) | return RepositoryLocation.from_handle(self.location_handle.create_reloaded_handle()) | ||||
class InProcessRepositoryLocation(RepositoryLocation): | class InProcessRepositoryLocation(RepositoryLocation): | ||||
def __init__(self, recon_repo): | def __init__(self, handle): | ||||
self._recon_repo = check.inst_param(recon_repo, "recon_repo", ReconstructableRepository) | self._handle = check.inst_param(handle, "handle", InProcessRepositoryLocationHandle,) | ||||
self._handle = RepositoryLocationHandle.create_in_process_location(recon_repo.pointer) | |||||
recon_repo = self._handle.origin.recon_repo | |||||
repo_def = recon_repo.get_definition() | repo_def = recon_repo.get_definition() | ||||
def_name = repo_def.name | def_name = repo_def.name | ||||
self._external_repo = external_repo_from_def( | self._external_repo = external_repo_from_def( | ||||
repo_def, | repo_def, | ||||
RepositoryHandle(repository_name=def_name, repository_location_handle=self._handle), | RepositoryHandle(repository_name=def_name, repository_location_handle=self._handle), | ||||
) | ) | ||||
self._repositories = {self._external_repo.name: self._external_repo} | self._repositories = {self._external_repo.name: self._external_repo} | ||||
@property | @property | ||||
def is_reload_supported(self): | def is_reload_supported(self): | ||||
return False | return False | ||||
def get_reconstructable_pipeline(self, name): | def get_reconstructable_pipeline(self, name): | ||||
return self._recon_repo.get_reconstructable_pipeline(name) | return self.get_reconstructable_repository().get_reconstructable_pipeline(name) | ||||
def get_reconstructable_repository(self): | def get_reconstructable_repository(self): | ||||
return self._recon_repo | return self._handle.origin.recon_repo | ||||
@property | @property | ||||
def name(self): | def name(self): | ||||
return self._handle.location_name | return self._handle.location_name | ||||
@property | @property | ||||
def location_handle(self): | def location_handle(self): | ||||
return self._handle | return self._handle | ||||
▲ Show 20 Lines • Show All 439 Lines • Show Last 20 Lines |