Differential D6164 Diff 30381 python_modules/dagster-graphql/dagster_graphql/implementation/context.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/implementation/context.py
from dagster import check | from dagster import check | ||||
from dagster.core.host_representation import PipelineSelector, RepositoryLocation | from dagster.core.host_representation import PipelineSelector, RepositoryLocation | ||||
from dagster.core.host_representation.external import ExternalPipeline | from dagster.core.host_representation.external import ExternalPipeline | ||||
from dagster.core.host_representation.grpc_server_state_subscriber import ( | from dagster.core.host_representation.grpc_server_state_subscriber import ( | ||||
LocationStateChangeEventType, | LocationStateChangeEventType, | ||||
LocationStateSubscriber, | LocationStateSubscriber, | ||||
) | ) | ||||
from dagster.core.instance import DagsterInstance | from dagster.core.instance import DagsterInstance | ||||
from dagster_graphql.implementation.utils import UserFacingGraphQLError | |||||
from dagster_graphql.schema.errors import DauphinInvalidSubsetError | |||||
from dagster_graphql.schema.pipelines import DauphinPipeline | |||||
from rx.subjects import Subject | from rx.subjects import Subject | ||||
class DagsterGraphQLContext: | class DagsterGraphQLContext: | ||||
def __init__(self, instance, workspace, version=None): | def __init__(self, instance, workspace, version=None): | ||||
self._instance = check.inst_param(instance, "instance", DagsterInstance) | self._instance = check.inst_param(instance, "instance", DagsterInstance) | ||||
self._workspace = workspace | self._workspace = workspace | ||||
self._repository_locations = {} | self._repository_locations = {} | ||||
▲ Show 20 Lines • Show All 75 Lines • ▼ Show 20 Lines | def reload_repository_location(self, name): | ||||
new_handle.add_state_subscriber(self._location_state_subscriber) | new_handle.add_state_subscriber(self._location_state_subscriber) | ||||
new_location = RepositoryLocation.from_handle(new_handle) | new_location = RepositoryLocation.from_handle(new_handle) | ||||
check.invariant(new_location.name == name) | check.invariant(new_location.name == name) | ||||
self._repository_locations[name] = new_location | self._repository_locations[name] = new_location | ||||
elif name in self._repository_locations: | elif name in self._repository_locations: | ||||
del self._repository_locations[name] | del self._repository_locations[name] | ||||
def get_subset_external_pipeline(self, selector): | def get_subset_external_pipeline(self, selector): | ||||
from ..schema.pipelines.pipeline_errors import InvalidSubsetError | |||||
max: Throughout, we abandon fully-qualified imports in favor of relative imports, and in… | |||||
from ..schema.pipelines.pipeline import Pipeline | |||||
from .utils import UserFacingGraphQLError | |||||
check.inst_param(selector, "selector", PipelineSelector) | check.inst_param(selector, "selector", PipelineSelector) | ||||
# We have to grab the pipeline from the location instead of the repository directly | # We have to grab the pipeline from the location instead of the repository directly | ||||
# since we may have to request a subset we don't have in memory yet | # since we may have to request a subset we don't have in memory yet | ||||
repository_location = self._repository_locations[selector.location_name] | repository_location = self._repository_locations[selector.location_name] | ||||
external_repository = repository_location.get_repository(selector.repository_name) | external_repository = repository_location.get_repository(selector.repository_name) | ||||
subset_result = repository_location.get_subset_external_pipeline_result(selector) | subset_result = repository_location.get_subset_external_pipeline_result(selector) | ||||
if not subset_result.success: | if not subset_result.success: | ||||
error_info = subset_result.error | error_info = subset_result.error | ||||
raise UserFacingGraphQLError( | raise UserFacingGraphQLError( | ||||
DauphinInvalidSubsetError( | InvalidSubsetError( | ||||
message="{message}{cause_message}".format( | message="{message}{cause_message}".format( | ||||
message=error_info.message, | message=error_info.message, | ||||
cause_message="\n{}".format(error_info.cause.message) | cause_message="\n{}".format(error_info.cause.message) | ||||
if error_info.cause | if error_info.cause | ||||
else "", | else "", | ||||
), | ), | ||||
pipeline=DauphinPipeline(self.get_full_external_pipeline(selector)), | pipeline=Pipeline(self.get_full_external_pipeline(selector)), | ||||
) | ) | ||||
) | ) | ||||
return ExternalPipeline( | return ExternalPipeline( | ||||
subset_result.external_pipeline_data, repository_handle=external_repository.handle, | subset_result.external_pipeline_data, repository_handle=external_repository.handle, | ||||
) | ) | ||||
def has_external_pipeline(self, selector): | def has_external_pipeline(self, selector): | ||||
▲ Show 20 Lines • Show All 60 Lines • Show Last 20 Lines |
Throughout, we abandon fully-qualified imports in favor of relative imports, and in implementation we nest these inside the functions that make use of them.