Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/cli/workspace/context.py
Show First 20 Lines • Show All 87 Lines • ▼ Show 20 Lines | def get_load_status(self, name: str) -> WorkspaceLocationLoadStatus: | ||||
return self.workspace_snapshot[name].load_status | return self.workspace_snapshot[name].load_status | ||||
def has_repository_location_error(self, name: str) -> bool: | def has_repository_location_error(self, name: str) -> bool: | ||||
return self.get_repository_location_error(name) != None | return self.get_repository_location_error(name) != None | ||||
def get_repository_location_error(self, name: str) -> Optional[SerializableErrorInfo]: | def get_repository_location_error(self, name: str) -> Optional[SerializableErrorInfo]: | ||||
return self.workspace_snapshot[name].load_error | return self.workspace_snapshot[name].load_error | ||||
def has_repository_location_name(self, name: str) -> bool: | |||||
return bool(self.workspace_snapshot.get(name)) | |||||
def has_repository_location(self, name: str) -> bool: | def has_repository_location(self, name: str) -> bool: | ||||
location_entry = self.workspace_snapshot.get(name) | location_entry = self.workspace_snapshot.get(name) | ||||
return bool(location_entry and location_entry.repository_location != None) | return bool(location_entry and location_entry.repository_location != None) | ||||
def is_reload_supported(self, name: str) -> bool: | def is_reload_supported(self, name: str) -> bool: | ||||
return self.workspace_snapshot[name].origin.is_reload_supported | return self.workspace_snapshot[name].origin.is_reload_supported | ||||
def is_shutdown_supported(self, name: str) -> bool: | |||||
return self.workspace_snapshot[name].origin.is_shutdown_supported | |||||
def reload_repository_location(self, name: str) -> "IWorkspaceRequestContext": | def reload_repository_location(self, name: str) -> "IWorkspaceRequestContext": | ||||
# This method reloads the location on the process context, and returns a new | # This method reloads the location on the process context, and returns a new | ||||
# request context created from the updated process context | # request context created from the updated process context | ||||
self.process_context.reload_repository_location(name) | self.process_context.reload_repository_location(name) | ||||
return self.process_context.create_request_context() | return self.process_context.create_request_context() | ||||
def shutdown_repository_location(self, name: str): | |||||
self.process_context.shutdown_repository_location(name) | |||||
def reload_workspace(self) -> "IWorkspaceRequestContext": | def reload_workspace(self) -> "IWorkspaceRequestContext": | ||||
self.process_context.reload_workspace() | self.process_context.reload_workspace() | ||||
return self.process_context.create_request_context() | return self.process_context.create_request_context() | ||||
def has_external_pipeline(self, selector: PipelineSelector) -> bool: | def has_external_pipeline(self, selector: PipelineSelector) -> bool: | ||||
check.inst_param(selector, "selector", PipelineSelector) | check.inst_param(selector, "selector", PipelineSelector) | ||||
loc = self.get_repository_location(selector.location_name) | loc = self.get_repository_location(selector.location_name) | ||||
return ( | return ( | ||||
▲ Show 20 Lines • Show All 129 Lines • ▼ Show 20 Lines | class IWorkspaceProcessContext(ABC): | ||||
@abstractproperty | @abstractproperty | ||||
def location_state_events(self) -> "Subject": | def location_state_events(self) -> "Subject": | ||||
pass | pass | ||||
@abstractmethod | @abstractmethod | ||||
def reload_repository_location(self, name: str) -> None: | def reload_repository_location(self, name: str) -> None: | ||||
pass | pass | ||||
def shutdown_repository_location(self, name: str) -> None: | |||||
raise NotImplementedError | |||||
@abstractmethod | @abstractmethod | ||||
def reload_workspace(self) -> None: | def reload_workspace(self) -> None: | ||||
pass | pass | ||||
@abstractproperty | @abstractproperty | ||||
def instance(self): | def instance(self): | ||||
pass | pass | ||||
▲ Show 20 Lines • Show All 82 Lines • ▼ Show 20 Lines | def _location_state_events_handler(self, event: LocationStateChangeEvent) -> None: | ||||
# with the correct error messages | # with the correct error messages | ||||
self.reload_repository_location(event.location_name) | self.reload_repository_location(event.location_name) | ||||
self._location_state_events.on_next(event) | self._location_state_events.on_next(event) | ||||
def reload_repository_location(self, name: str) -> None: | def reload_repository_location(self, name: str) -> None: | ||||
self._workspace.reload_repository_location(name) | self._workspace.reload_repository_location(name) | ||||
def shutdown_repository_location(self, name: str) -> None: | |||||
self._workspace.shutdown_repository_location(name) | |||||
def reload_workspace(self) -> None: | def reload_workspace(self) -> None: | ||||
self._workspace.reload_workspace() | self._workspace.reload_workspace() | ||||
self._set_state_subscribers() | self._set_state_subscribers() |