Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/client/client.py
Show All 9 Lines | |||||
from gql.transport import Transport | from gql.transport import Transport | ||||
from gql.transport.requests import RequestsHTTPTransport | from gql.transport.requests import RequestsHTTPTransport | ||||
from .client_queries import ( | from .client_queries import ( | ||||
CLIENT_GET_REPO_LOCATIONS_NAMES_AND_PIPELINES_QUERY, | CLIENT_GET_REPO_LOCATIONS_NAMES_AND_PIPELINES_QUERY, | ||||
CLIENT_SUBMIT_PIPELINE_RUN_MUTATION, | CLIENT_SUBMIT_PIPELINE_RUN_MUTATION, | ||||
GET_PIPELINE_RUN_STATUS_QUERY, | GET_PIPELINE_RUN_STATUS_QUERY, | ||||
RELOAD_REPOSITORY_LOCATION_MUTATION, | RELOAD_REPOSITORY_LOCATION_MUTATION, | ||||
SHUTDOWN_REPOSITORY_LOCATION_MUTATION, | |||||
) | ) | ||||
from .utils import ( | from .utils import ( | ||||
DagsterGraphQLClientError, | DagsterGraphQLClientError, | ||||
InvalidOutputErrorInfo, | InvalidOutputErrorInfo, | ||||
PipelineInfo, | PipelineInfo, | ||||
ReloadRepositoryLocationInfo, | ReloadRepositoryLocationInfo, | ||||
ReloadRepositoryLocationStatus, | ReloadRepositoryLocationStatus, | ||||
ShutdownRepositoryLocationInfo, | |||||
ShutdownRepositoryLocationStatus, | |||||
) | ) | ||||
class DagsterGraphQLClient: | class DagsterGraphQLClient: | ||||
"""Official Dagster Python Client for GraphQL | """Official Dagster Python Client for GraphQL | ||||
Utilizes the gql library to dispatch queries over HTTP to a remote Dagster GraphQL Server | Utilizes the gql library to dispatch queries over HTTP to a remote Dagster GraphQL Server | ||||
▲ Show 20 Lines • Show All 258 Lines • ▼ Show 20 Lines | ) -> ReloadRepositoryLocationInfo: | ||||
) | ) | ||||
else: | else: | ||||
# query_result_type is either ReloadNotSupported or RepositoryLocationNotFound | # query_result_type is either ReloadNotSupported or RepositoryLocationNotFound | ||||
return ReloadRepositoryLocationInfo( | return ReloadRepositoryLocationInfo( | ||||
status=ReloadRepositoryLocationStatus.FAILURE, | status=ReloadRepositoryLocationStatus.FAILURE, | ||||
failure_type=query_result_type, | failure_type=query_result_type, | ||||
message=query_result["message"], | message=query_result["message"], | ||||
) | ) | ||||
def shutdown_repository_location( | |||||
self, repository_location_name: str | |||||
) -> ShutdownRepositoryLocationInfo: | |||||
"""Shuts down the server that is serving metadata for the provided repository location. | |||||
This is primarily useful when you want the server to be restarted by the compute environment | |||||
in which it is running (for example, in Kubernetes, the pod in which the server is running | |||||
will automatically restart when the server is shut down, and the repository metadata will | |||||
be reloaded) | |||||
Args: | |||||
repository_location_name (str): The name of the repository location | |||||
Returns: | |||||
ShutdownRepositoryLocationInfo: Object with information about the result of the reload request | |||||
""" | |||||
check.str_param(repository_location_name, "repository_location_name") | |||||
res_data: Dict[str, Dict[str, Any]] = self._execute( | |||||
SHUTDOWN_REPOSITORY_LOCATION_MUTATION, | |||||
{"repositoryLocationName": repository_location_name}, | |||||
) | |||||
query_result: Dict[str, Any] = res_data["shutdownRepositoryLocation"] | |||||
query_result_type: str = query_result["__typename"] | |||||
if query_result_type == "ShutdownRepositoryLocationSuccess": | |||||
return ShutdownRepositoryLocationInfo(status=ShutdownRepositoryLocationStatus.SUCCESS) | |||||
elif ( | |||||
query_result_type == "RepositoryLocationNotFound" or query_result_type == "PythonError" | |||||
): | |||||
return ShutdownRepositoryLocationInfo( | |||||
status=ShutdownRepositoryLocationStatus.FAILURE, | |||||
message=query_result["message"], | |||||
) | |||||
else: | |||||
raise Exception(f"Unexpected query result type {query_result_type}") |