Changeset View
Standalone View
python_modules/dagster-graphql/dagster_graphql/client/client.py
from typing import Any, Dict, Optional | from itertools import chain | |||||||||||||||
from typing import Any, Dict, Iterable, List, Optional | ||||||||||||||||
from dagster import check | from dagster import check | |||||||||||||||
from dagster.core.storage.pipeline_run import PipelineRunStatus | from dagster.core.storage.pipeline_run import PipelineRunStatus | |||||||||||||||
from dagster.utils.backcompat import experimental_class_warning | from dagster.utils.backcompat import experimental_class_warning | |||||||||||||||
from gql import Client, gql | from gql import Client, gql | |||||||||||||||
from gql.transport.requests import RequestsHTTPTransport | from gql.transport.requests import RequestsHTTPTransport | |||||||||||||||
from .client_queries import GET_PIPELINE_RUN_STATUS_QUERY, RELOAD_REPOSITORY_LOCATION_MUTATION | from .client_queries import ( | |||||||||||||||
CLIENT_GET_REPO_LOCATIONS_NAMES_AND_PIPELINES_QUERY, | ||||||||||||||||
CLIENT_SUBMIT_PIPELINE_RUN_MUTATION, | ||||||||||||||||
GET_PIPELINE_RUN_STATUS_QUERY, | ||||||||||||||||
RELOAD_REPOSITORY_LOCATION_MUTATION, | ||||||||||||||||
) | ||||||||||||||||
from .utils import ( | from .utils import ( | |||||||||||||||
DagsterGraphQLClientError, | DagsterGraphQLClientError, | |||||||||||||||
InvalidOutputErrorInfo, | ||||||||||||||||
PipelineInfo, | ||||||||||||||||
ReloadRepositoryLocationInfo, | ReloadRepositoryLocationInfo, | |||||||||||||||
ReloadRepositoryLocationStatus, | ReloadRepositoryLocationStatus, | |||||||||||||||
) | ) | |||||||||||||||
class DagsterGraphQLClient: | class DagsterGraphQLClient: | |||||||||||||||
"""Official Dagster Python Client for GraphQL | """Official Dagster Python Client for GraphQL | |||||||||||||||
Show All 17 Lines | def __init__(self, hostname: str, port_number: Optional[int] = None): | |||||||||||||||
self._url = ( | self._url = ( | |||||||||||||||
"http://" | "http://" | |||||||||||||||
+ (f"{self._hostname}:{self._port_number}" if self._port_number else self._hostname) | + (f"{self._hostname}:{self._port_number}" if self._port_number else self._hostname) | |||||||||||||||
+ "/graphql" | + "/graphql" | |||||||||||||||
) | ) | |||||||||||||||
self._transport = RequestsHTTPTransport(url=self._url, use_json=True) | self._transport = RequestsHTTPTransport(url=self._url, use_json=True) | |||||||||||||||
self._client = Client(transport=self._transport, fetch_schema_from_transport=True) | self._client = Client(transport=self._transport, fetch_schema_from_transport=True) | |||||||||||||||
def _execute(self, query: str, variables: Dict[str, Any]): | def _execute(self, query: str, variables: Optional[Dict[str, Any]] = None): | |||||||||||||||
rexledesma: nit: type - weird how this isn't caught by mypy? | ||||||||||||||||
Done Inline Actionsyeah that is pretty weird actually.... ? sidkmenon: yeah that is pretty weird actually.... ? | ||||||||||||||||
try: | try: | |||||||||||||||
return self._client.execute(gql(query), variable_values=variables) | return self._client.execute(gql(query), variable_values=variables) | |||||||||||||||
except Exception as exc: # catch generic Exception from the gql client | except Exception as exc: # catch generic Exception from the gql client | |||||||||||||||
Done Inline Actions
rexledesma: | ||||||||||||||||
raise DagsterGraphQLClientError( | raise DagsterGraphQLClientError( | |||||||||||||||
f"Query \n{query}\n with variables \n{variables}\n failed GraphQL validation" | f"Query \n{query}\n with variables \n{variables}\n failed GraphQL validation" | |||||||||||||||
) from exc | ) from exc | |||||||||||||||
def _get_repo_locations_and_names_with_pipeline(self, pipeline_name: str) -> List[PipelineInfo]: | ||||||||||||||||
res_data = self._execute(CLIENT_GET_REPO_LOCATIONS_NAMES_AND_PIPELINES_QUERY) | ||||||||||||||||
query_res = res_data["repositoriesOrError"] | ||||||||||||||||
repo_connection_status = query_res["__typename"] | ||||||||||||||||
if repo_connection_status == "RepositoryConnection": | ||||||||||||||||
valid_nodes: Iterable[PipelineInfo] = chain( | ||||||||||||||||
map(PipelineInfo.from_node, query_res["nodes"]) | ||||||||||||||||
) | ||||||||||||||||
return [ | ||||||||||||||||
repo_node_tuple | ||||||||||||||||
Done Inline Actions
this should be flattened for readability rexledesma: this should be flattened for readability | ||||||||||||||||
for repo_node_tuple_lst in valid_nodes | ||||||||||||||||
for repo_node_tuple in repo_node_tuple_lst | ||||||||||||||||
if repo_node_tuple.pipeline_name == pipeline_name | ||||||||||||||||
] | ||||||||||||||||
else: | ||||||||||||||||
raise DagsterGraphQLClientError(repo_connection_status, query_res["message"]) | ||||||||||||||||
def submit_pipeline_execution( | def submit_pipeline_execution( | |||||||||||||||
Not Done Inline ActionsAm I understanding correctly that the underlying GraphQL mutation is called launchPipelineExecution? If so, any reason not to call this method launch_pipeline_execution? sandyryza: Am I understanding correctly that the underlying GraphQL mutation is called… | ||||||||||||||||
Done Inline ActionsI think after a conversation with Alex, he recommended calling this submit_pipeline_execution? I think we have been doing a rename under the hood from launch_pipeline to submit_pipeline, I guess ostensibly because the submit change now sends the run to the daemon which launches it instead. From conversations with Johann, it seems a bit messy, so I'm happy to rename this to launch_pipeline_execution if you think that's better. sidkmenon: I think after a conversation with Alex, he recommended calling this `submit_pipeline_execution`? | ||||||||||||||||
Not Done Inline Actionscc @johann https://github.com/dagster-io/dagster/issues/3182 for context - when we introduced the RunCoordinator for run queueing we made launch from dagit / graphql actually do submit but didn't follow up on any renames in the product or GraphQL schema its not obvious whats the best path here, we could
alangenfeld: cc @johann
https://github.com/dagster-io/dagster/issues/3182
for context - when we introduced… | ||||||||||||||||
Not Done Inline ActionsAh - got it. How difficult would it be to just add submitPipelineExecution to the GraphQL schema? sandyryza: Ah - got it. How difficult would it be to just add submitPipelineExecution to the GraphQL… | ||||||||||||||||
Not Done Inline Actionsadding it - pretty easy alangenfeld: adding it - pretty easy
putting it under sufficient test - not complicated but will take time | ||||||||||||||||
Done Inline ActionsIdea - could we land this & then change the backing query on the client when submitPipelineExecution is ready? I have the backwards compat setup already handled so I think we should be good? sidkmenon: Idea - could we land this & then change the backing query on the client when… | ||||||||||||||||
self, | self, | |||||||||||||||
pipeline_name: str, | pipeline_name: str, | |||||||||||||||
repository_location_name: Optional[str] = None, | repository_location_name: Optional[str] = None, | |||||||||||||||
repository_name: Optional[str] = None, | repository_name: Optional[str] = None, | |||||||||||||||
run_config: Optional[Any] = None, | run_config: Optional[Any] = None, | |||||||||||||||
mode: Optional[str] = None, | mode: Optional[str] = None, | |||||||||||||||
preset: Optional[str] = None, | preset: Optional[str] = None, | |||||||||||||||
) -> str: | ) -> str: | |||||||||||||||
raise NotImplementedError("not yet implemented") | """Submits a Pipeline with attached configuration for execution. | |||||||||||||||
Args: | ||||||||||||||||
pipeline_name (str): The pipeline's name | ||||||||||||||||
repository_location_name (Optional[str], optional): The name of the repository location where | ||||||||||||||||
the pipeline is located. If omitted, the client will try to infer the repository location | ||||||||||||||||
from the available options on the Dagster deployment. Defaults to None. | ||||||||||||||||
repository_name (Optional[str], optional): The name of the repository where the pipeline is located. | ||||||||||||||||
If omitted, the client will try to infer the repository from the available options | ||||||||||||||||
on the Dagster deployment. Defaults to None. | ||||||||||||||||
run_config (Optional[Any], optional): This is the run config to execute the pipeline with. | ||||||||||||||||
Note that runConfigData is any-typed in the GraphQL type system. This type is used when passing in | ||||||||||||||||
an arbitrary object for run config. However, it must conform to the constraints of the config | ||||||||||||||||
schema for this pipeline. If it does not, the client will throw a DagsterGraphQLClientError with a message of | ||||||||||||||||
PipelineConfigValidationInvalid. Defaults to None. | ||||||||||||||||
mode (Optional[str], optional): The mode to run the pipeline with. If you have not | ||||||||||||||||
defined any custom modes for your pipeline, the default mode is "default". Defaults to None. | ||||||||||||||||
preset (Optional[str], optional): The name of a pre-defined preset to use instead of a | ||||||||||||||||
Done Inline Actions
unnecessary? rexledesma: unnecessary? | ||||||||||||||||
Done Inline Actionsthe extra optional seems to be part of the autogenerated doc strings. but it looks like I accidentally deleted the optionals or something on repository_location_name and repository_name, etc, so I added them back for consistency. sidkmenon: the extra `optional` seems to be part of the autogenerated doc strings. but it looks like I… | ||||||||||||||||
run config. Defaults to None. | ||||||||||||||||
Raises: | ||||||||||||||||
DagsterGraphQLClientError("InvalidStepError", invalid_step_key): the pipeline has an invalid step | ||||||||||||||||
DagsterGraphQLClientError("InvalidOutputError", body=error_object): some solid has an invalid output within the pipeline. | ||||||||||||||||
The error_object is of type dagster_graphql.InvalidOutputErrorInfo. | ||||||||||||||||
DagsterGraphQLClientError("ConflictingExecutionParamsError", invalid_step_key): a preset and a run_config & mode are present | ||||||||||||||||
that conflict with one another | ||||||||||||||||
DagsterGraphQLClientError("PresetNotFoundError", message): if the provided preset name is not found | ||||||||||||||||
DagsterGraphQLClientError("PipelineConfigurationInvalid", invalid_step_key): the run_config is not in the expected format | ||||||||||||||||
for the pipeline | ||||||||||||||||
DagsterGraphQLClientError("PipelineNotFoundError", message): the requested pipeline does not exist | ||||||||||||||||
DagsterGraphQLClientError("PythonError", message): an internal framework error occurred | ||||||||||||||||
Returns: | ||||||||||||||||
str: run id of the submitted pipeline run | ||||||||||||||||
""" | ||||||||||||||||
check.opt_str_param(repository_location_name, "repository_location_name") | ||||||||||||||||
check.opt_str_param(repository_name, "repository_name") | ||||||||||||||||
check.str_param(pipeline_name, "pipeline_name") | ||||||||||||||||
check.opt_str_param(mode, "mode") | ||||||||||||||||
check.opt_str_param(preset, "preset") | ||||||||||||||||
check.invariant( | ||||||||||||||||
(mode is not None and run_config is not None) or preset is not None, | ||||||||||||||||
"Either a mode and run_config or a preset must be specified in order to " | ||||||||||||||||
f"submit the pipeline {pipeline_name} for execution", | ||||||||||||||||
) | ||||||||||||||||
Done Inline Actions
spacing, and remove + for concat multiline strings rexledesma: spacing, and remove `+` for concat multiline strings | ||||||||||||||||
if not repository_location_name or not repository_name: | ||||||||||||||||
pipeline_info_lst = self._get_repo_locations_and_names_with_pipeline(pipeline_name) | ||||||||||||||||
if len(pipeline_info_lst) == 0: | ||||||||||||||||
raise DagsterGraphQLClientError( | ||||||||||||||||
"PipelineNotFoundError", f"No pipelines with the name `{pipeline_name}` exist" | ||||||||||||||||
) | ||||||||||||||||
elif len(pipeline_info_lst) == 1: | ||||||||||||||||
pipeline_info = pipeline_info_lst[0] | ||||||||||||||||
repository_location_name = pipeline_info.repository_location_name | ||||||||||||||||
repository_name = pipeline_info.repository_name | ||||||||||||||||
else: | ||||||||||||||||
raise DagsterGraphQLClientError( | ||||||||||||||||
"Must specify repository_location_name and repository_name" | ||||||||||||||||
f" since there are multiple pipelines with the name {pipeline_name}." | ||||||||||||||||
f"\n\tchoose one of: {pipeline_info_lst}" | ||||||||||||||||
) | ||||||||||||||||
variables = { | ||||||||||||||||
"executionParams": { | ||||||||||||||||
"selector": { | ||||||||||||||||
"repositoryLocationName": repository_location_name, | ||||||||||||||||
"repositoryName": repository_name, | ||||||||||||||||
"pipelineName": pipeline_name, | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
if preset is not None: | ||||||||||||||||
variables["executionParams"]["preset"] = preset | ||||||||||||||||
if mode is not None and run_config is not None: | ||||||||||||||||
variables["executionParams"] = { | ||||||||||||||||
**variables["executionParams"], | ||||||||||||||||
"runConfigData": run_config, | ||||||||||||||||
"mode": mode, | ||||||||||||||||
} | ||||||||||||||||
res_data: Dict[str, Any] = self._execute(CLIENT_SUBMIT_PIPELINE_RUN_MUTATION, variables) | ||||||||||||||||
query_result = res_data["launchPipelineExecution"] | ||||||||||||||||
query_result_type = query_result["__typename"] | ||||||||||||||||
if query_result_type == "LaunchPipelineRunSuccess": | ||||||||||||||||
return query_result["run"]["runId"] | ||||||||||||||||
elif query_result_type == "InvalidStepError": | ||||||||||||||||
raise DagsterGraphQLClientError(query_result_type, query_result["invalidStepKey"]) | ||||||||||||||||
elif query_result_type == "InvalidOutputError": | ||||||||||||||||
error_info = InvalidOutputErrorInfo( | ||||||||||||||||
step_key=query_result["stepKey"], | ||||||||||||||||
invalid_output_name=query_result["invalidOutputName"], | ||||||||||||||||
) | ||||||||||||||||
raise DagsterGraphQLClientError(query_result_type, body=error_info) | ||||||||||||||||
elif query_result_type == "PipelineConfigValidationInvalid": | ||||||||||||||||
raise DagsterGraphQLClientError(query_result_type, query_result["errors"]) | ||||||||||||||||
else: | ||||||||||||||||
# query_result_type is a ConflictingExecutionParamsError, a PresetNotFoundError | ||||||||||||||||
# or a PipelineNotFoundError, or a PythonError | ||||||||||||||||
raise DagsterGraphQLClientError(query_result_type, query_result["message"]) | ||||||||||||||||
def get_run_status(self, run_id: str) -> PipelineRunStatus: | def get_run_status(self, run_id: str) -> PipelineRunStatus: | |||||||||||||||
"""Get the status of a given Pipeline Run | """Get the status of a given Pipeline Run | |||||||||||||||
Args: | Args: | |||||||||||||||
run_id (str): run id of the requested pipeline run. | run_id (str): run id of the requested pipeline run. | |||||||||||||||
Raises: | Raises: | |||||||||||||||
▲ Show 20 Lines • Show All 49 Lines • Show Last 20 Lines |
nit: type - weird how this isn't caught by mypy?