Changeset View
Standalone View
python_modules/dagster-graphql/dagster_graphql/client/client.py
from typing import Any, Dict, Optional | from typing import Any, Dict, List, Optional | |||||||||||||||
from dagster import check | from dagster import check | |||||||||||||||
from dagster.core.storage.pipeline_run import PipelineRunStatus | from dagster.core.storage.pipeline_run import PipelineRunStatus | |||||||||||||||
from gql import Client, gql | from gql import Client, gql | |||||||||||||||
from gql.transport.requests import RequestsHTTPTransport | from gql.transport.requests import RequestsHTTPTransport | |||||||||||||||
from .client_queries import GET_PIPELINE_RUN_STATUS_QUERY, RELOAD_REPOSITORY_LOCATION_MUTATION | from .client_queries import ( | |||||||||||||||
CLIENT_GET_REPO_LOCATIONS_AND_NAMES_QUERY, | ||||||||||||||||
CLIENT_SUBMIT_PIPELINE_RUN_MUTATION, | ||||||||||||||||
GET_PIPELINE_RUN_STATUS_QUERY, | ||||||||||||||||
RELOAD_REPOSITORY_LOCATION_MUTATION, | ||||||||||||||||
) | ||||||||||||||||
from .utils import ( | from .utils import ( | |||||||||||||||
DagsterGraphQLClientError, | DagsterGraphQLClientError, | |||||||||||||||
ReloadRepositoryLocationInfo, | ReloadRepositoryLocationInfo, | |||||||||||||||
ReloadRepositoryLocationStatus, | ReloadRepositoryLocationStatus, | |||||||||||||||
RepositoryNode, | ||||||||||||||||
) | ) | |||||||||||||||
class DagsterGraphQLClient: | class DagsterGraphQLClient: | |||||||||||||||
"""Official Dagster Python Client for GraphQL | """Official Dagster Python Client for GraphQL | |||||||||||||||
Utilizes the gql library to dispatch queries over HTTP to a remote Dagster GraphQL Server | Utilizes the gql library to dispatch queries over HTTP to a remote Dagster GraphQL Server | |||||||||||||||
Show All 15 Lines | def __init__(self, hostname: str, port_number: Optional[int] = None): | |||||||||||||||
"http://" | "http://" | |||||||||||||||
+ (f"{self._hostname}:{self._port_number}" if self._port_number else self._hostname) | + (f"{self._hostname}:{self._port_number}" if self._port_number else self._hostname) | |||||||||||||||
+ "/graphql" | + "/graphql" | |||||||||||||||
) | ) | |||||||||||||||
self._transport = RequestsHTTPTransport(url=self._url, use_json=True) | self._transport = RequestsHTTPTransport(url=self._url, use_json=True) | |||||||||||||||
self._client = Client(transport=self._transport, fetch_schema_from_transport=True) | self._client = Client(transport=self._transport, fetch_schema_from_transport=True) | |||||||||||||||
super(DagsterGraphQLClient, self).__init__() | super(DagsterGraphQLClient, self).__init__() | |||||||||||||||
def _execute(self, query: str, variables: Dict[str, Any]): | def _execute(self, query: str, variables: Dict[str, Any] = None): | |||||||||||||||
rexledesma: nit: type - weird how this isn't caught by mypy? | ||||||||||||||||
Done Inline Actionsyeah that is pretty weird actually.... ? sidkmenon: yeah that is pretty weird actually.... ? | ||||||||||||||||
try: | try: | |||||||||||||||
if variables is None: | ||||||||||||||||
return self._client.execute(gql(query)) | ||||||||||||||||
else: | ||||||||||||||||
return self._client.execute(gql(query), variable_values=variables) | return self._client.execute(gql(query), variable_values=variables) | |||||||||||||||
except Exception as exc: | except Exception as exc: | |||||||||||||||
Done Inline Actions
rexledesma: | ||||||||||||||||
raise DagsterGraphQLClientError( | raise DagsterGraphQLClientError( | |||||||||||||||
f"Query \n{query}\n with variables \n{variables}\n failed GraphQL validation" | f"Query \n{query}\n with variables \n{variables}\n failed GraphQL validation" | |||||||||||||||
) from exc | ) from exc | |||||||||||||||
def _get_repo_locations_and_names(self) -> List[RepositoryNode]: | ||||||||||||||||
res_data = self._execute(CLIENT_GET_REPO_LOCATIONS_AND_NAMES_QUERY) | ||||||||||||||||
query_res = res_data["repositoryLocationsOrError"] | ||||||||||||||||
repo_location_connection_status = query_res["__typename"] | ||||||||||||||||
if repo_location_connection_status == "RepositoryLocationConnection": | ||||||||||||||||
valid_nodes: List[RepositoryNode] = [ | ||||||||||||||||
RepositoryNode.from_dict(node) | ||||||||||||||||
for node in query_res["nodes"] | ||||||||||||||||
if node["__typename"] == "RepositoryLocation" | ||||||||||||||||
] | ||||||||||||||||
Done Inline Actions
this should be flattened for readability rexledesma: this should be flattened for readability | ||||||||||||||||
return valid_nodes | ||||||||||||||||
else: | ||||||||||||||||
raise DagsterGraphQLClientError(repo_location_connection_status, query_res["message"]) | ||||||||||||||||
def submit_pipeline_execution( | def submit_pipeline_execution( | |||||||||||||||
Not Done Inline ActionsAm I understanding correctly that the underlying GraphQL mutation is called launchPipelineExecution? If so, any reason not to call this method launch_pipeline_execution? sandyryza: Am I understanding correctly that the underlying GraphQL mutation is called… | ||||||||||||||||
Done Inline ActionsI think after a conversation with Alex, he recommended calling this submit_pipeline_execution? I think we have been doing a rename under the hood from launch_pipeline to submit_pipeline, I guess ostensibly because the submit change now sends the run to the daemon which launches it instead. From conversations with Johann, it seems a bit messy, so I'm happy to rename this to launch_pipeline_execution if you think that's better. sidkmenon: I think after a conversation with Alex, he recommended calling this `submit_pipeline_execution`? | ||||||||||||||||
Not Done Inline Actionscc @johann https://github.com/dagster-io/dagster/issues/3182 for context - when we introduced the RunCoordinator for run queueing we made launch from dagit / graphql actually do submit but didn't follow up on any renames in the product or GraphQL schema its not obvious whats the best path here, we could
alangenfeld: cc @johann
https://github.com/dagster-io/dagster/issues/3182
for context - when we introduced… | ||||||||||||||||
Not Done Inline ActionsAh - got it. How difficult would it be to just add submitPipelineExecution to the GraphQL schema? sandyryza: Ah - got it. How difficult would it be to just add submitPipelineExecution to the GraphQL… | ||||||||||||||||
Not Done Inline Actionsadding it - pretty easy alangenfeld: adding it - pretty easy
putting it under sufficient test - not complicated but will take time | ||||||||||||||||
Done Inline ActionsIdea - could we land this & then change the backing query on the client when submitPipelineExecution is ready? I have the backwards compat setup already handled so I think we should be good? sidkmenon: Idea - could we land this & then change the backing query on the client when… | ||||||||||||||||
self, | self, | |||||||||||||||
pipeline_name: str, | pipeline_name: str, | |||||||||||||||
repository_location_name: Optional[str] = None, | repository_location_name: Optional[str] = None, | |||||||||||||||
repository_name: Optional[str] = None, | repository_name: Optional[str] = None, | |||||||||||||||
run_config_data: Optional[Any] = None, | run_config_data: Optional[Any] = None, | |||||||||||||||
mode: Optional[str] = None, | mode: Optional[str] = "default", | |||||||||||||||
preset_name: Optional[str] = None, | preset_name: Optional[str] = None, | |||||||||||||||
) -> str: | ) -> str: | |||||||||||||||
raise NotImplementedError("not yet implemented") | """Submits a Pipeline with attached configuration to the RunCoordinator for execution. | |||||||||||||||
Args: | ||||||||||||||||
pipeline_name (str): The pipeline's name | ||||||||||||||||
repository_location_name (Optional[str]): The name of the repository location where | ||||||||||||||||
the pipeline is located. If omitted, the client will try to infer the repository location | ||||||||||||||||
from the available options on the Dagster deployment. Defaults to None. | ||||||||||||||||
repository_name (Optional[str]): The name of the repository where the pipeline is located. | ||||||||||||||||
If omitted, the client will try to infer the repository from the available options | ||||||||||||||||
on the Dagster deployment. Defaults to None. | ||||||||||||||||
run_config_data (Optional[Any], optional): This is the run config to execute the pipeline with. | ||||||||||||||||
Note that runConfigData is any-typed in the GraphQL type system. This type is used when passing in | ||||||||||||||||
an arbitrary object for run config. However, it must conform to the constraints of the config | ||||||||||||||||
schema for this pipeline. If it does not, the client will throw a DagsterGraphQLClientError with a message of | ||||||||||||||||
PipelineConfigValidationInvalid. Defaults to None. | ||||||||||||||||
mode (Optional[str], optional): The mode to run the pipeline with. If you have not | ||||||||||||||||
defined any custom modes for your pipeline, the default mode is "default". Defaults to "default" | ||||||||||||||||
preset_name (Optional[str], optional): The name of a pre-defined preset to use instead of a | ||||||||||||||||
Done Inline Actions
unnecessary? rexledesma: unnecessary? | ||||||||||||||||
Done Inline Actionsthe extra optional seems to be part of the autogenerated doc strings. but it looks like I accidentally deleted the optionals or something on repository_location_name and repository_name, etc, so I added them back for consistency. sidkmenon: the extra `optional` seems to be part of the autogenerated doc strings. but it looks like I… | ||||||||||||||||
run config. Defaults to None. | ||||||||||||||||
Raises: | ||||||||||||||||
DagsterGraphQLClientError: client errors can happen in several ways, including: | ||||||||||||||||
1. An error from submitted params | ||||||||||||||||
2. An error from the response: | ||||||||||||||||
a. InvalidStepError - the pipeline has an invalid step | ||||||||||||||||
b. InvalidOutputError - some solid has an invalid output within the pipeline | ||||||||||||||||
c. ConflictingExecutionParamsError - if a preset & a run_config + mode are present | ||||||||||||||||
that conflict with one another | ||||||||||||||||
d. PresetNotFoundError - if the provided preset name is not found | ||||||||||||||||
e. PipelineConfigurationInvalid - the run_config_data is not in the expected format | ||||||||||||||||
for the pipeline | ||||||||||||||||
f. PipelineNotFoundError - the requested pipeline does not exist | ||||||||||||||||
g. PythonError - an internal framework error | ||||||||||||||||
Returns: | ||||||||||||||||
str: run id of the submitted pipeline run | ||||||||||||||||
""" | ||||||||||||||||
check.opt_str_param(repository_location_name, "repository_location_name") | ||||||||||||||||
check.opt_str_param(repository_name, "repository_name") | ||||||||||||||||
check.str_param(pipeline_name, "pipeline_name") | ||||||||||||||||
check.opt_str_param(mode, "mode") | ||||||||||||||||
check.opt_str_param(preset_name, "preset_name") | ||||||||||||||||
check.invariant( | ||||||||||||||||
(mode is not None and run_config_data is not None) or preset_name is not None, | ||||||||||||||||
"Either a mode and run_config_data or a preset must be specified in order to" | ||||||||||||||||
+ f"submit the pipeline {pipeline_name} for execution", | ||||||||||||||||
Done Inline Actions
spacing, and remove + for concat multiline strings rexledesma: spacing, and remove `+` for concat multiline strings | ||||||||||||||||
) | ||||||||||||||||
if not repository_location_name or not repository_name: | ||||||||||||||||
repo_node_lst = self._get_repo_locations_and_names() | ||||||||||||||||
if len(repo_node_lst) == 1: | ||||||||||||||||
repo_node = repo_node_lst[0] | ||||||||||||||||
if len(repo_node.repository_names) == 1: | ||||||||||||||||
repository_location_name = repo_node.repository_location_name | ||||||||||||||||
repository_name = repo_node.repository_names[0] | ||||||||||||||||
variables = { | ||||||||||||||||
"executionParams": { | ||||||||||||||||
"selector": { | ||||||||||||||||
"repositoryLocationName": repository_location_name, | ||||||||||||||||
"repositoryName": repository_name, | ||||||||||||||||
"pipelineName": pipeline_name, | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
if preset_name is not None: | ||||||||||||||||
variables["executionParams"]["preset"] = preset_name | ||||||||||||||||
if mode is not None and run_config_data is not None: | ||||||||||||||||
variables["executionParams"] = { | ||||||||||||||||
**variables["executionParams"], | ||||||||||||||||
"runConfigData": run_config_data, | ||||||||||||||||
"mode": mode, | ||||||||||||||||
} | ||||||||||||||||
res_data: Dict[str, Any] = self._execute(CLIENT_SUBMIT_PIPELINE_RUN_MUTATION, variables) | ||||||||||||||||
query_result = res_data["launchPipelineExecution"] | ||||||||||||||||
query_result_type = query_result["__typename"] | ||||||||||||||||
if query_result_type == "LaunchPipelineRunSuccess": | ||||||||||||||||
return query_result["run"]["runId"] | ||||||||||||||||
elif query_result_type == "InvalidStepError": | ||||||||||||||||
raise DagsterGraphQLClientError(query_result_type, query_result["invalidStepKey"]) | ||||||||||||||||
elif query_result_type == "InvalidOutputError": | ||||||||||||||||
relevant_error_properties = frozenset({"stepKey", "invalidOutputName"}) | ||||||||||||||||
raise DagsterGraphQLClientError( | ||||||||||||||||
query_result_type, | ||||||||||||||||
{key: query_result.get(key, None) for key in relevant_error_properties}, | ||||||||||||||||
) | ||||||||||||||||
elif query_result_type == "PipelineConfigurationInvalid": | ||||||||||||||||
raise DagsterGraphQLClientError(query_result_type, query_result["errors"]) | ||||||||||||||||
else: | ||||||||||||||||
# query_result_type is a ConflictingExecutionParamsError, a PresetNotFoundError | ||||||||||||||||
# or a PipelineNotFoundError, or a PythonError | ||||||||||||||||
raise DagsterGraphQLClientError(query_result_type, query_result["message"]) | ||||||||||||||||
def get_run_status(self, run_id: str) -> PipelineRunStatus: | def get_run_status(self, run_id: str) -> PipelineRunStatus: | |||||||||||||||
"""Get the status of a given Pipeline Run | """Get the status of a given Pipeline Run | |||||||||||||||
Args: | Args: | |||||||||||||||
run_id (str): run id of the requested pipeline run. | run_id (str): run id of the requested pipeline run. | |||||||||||||||
Raises: | Raises: | |||||||||||||||
▲ Show 20 Lines • Show All 50 Lines • Show Last 20 Lines |
nit: type - weird how this isn't caught by mypy?