Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/schema/external.py
import graphene | |||||
from dagster import check | from dagster import check | ||||
from dagster.core.host_representation import ( | from dagster.core.host_representation import ( | ||||
ExternalRepository, | ExternalRepository, | ||||
ExternalRepositoryOrigin, | |||||
GrpcServerRepositoryLocationHandle, | GrpcServerRepositoryLocationHandle, | ||||
ManagedGrpcPythonEnvRepositoryLocationHandle, | ManagedGrpcPythonEnvRepositoryLocationHandle, | ||||
RepositoryLocation, | RepositoryLocation, | ||||
) | ) | ||||
from dagster.core.host_representation.grpc_server_state_subscriber import ( | from dagster.core.host_representation.grpc_server_state_subscriber import ( | ||||
LocationStateChangeEventType, | LocationStateChangeEventType, | ||||
) | ) | ||||
from dagster.utils.error import SerializableErrorInfo | from dagster.utils.error import SerializableErrorInfo | ||||
from dagster_graphql import dauphin | |||||
from dagster_graphql.implementation.fetch_solids import get_solid, get_solids | from dagster_graphql.implementation.fetch_solids import get_solid, get_solids | ||||
from dagster_graphql.schema.errors import DauphinPythonError | |||||
DauphinLocationStateChangeEventType = dauphin.Enum.from_enum(LocationStateChangeEventType) | from .errors import GraphenePythonError, GrapheneRepositoryNotFoundError | ||||
from .partition_sets import GraphenePartitionSet | |||||
from .pipelines.pipeline import GraphenePipeline | |||||
from .repository_origin import GrapheneRepositoryOrigin | |||||
from .schedules import GrapheneSchedule | |||||
from .sensors import GrapheneSensor | |||||
from .used_solid import GrapheneUsedSolid | |||||
from .util import non_null_list | |||||
class GrapheneLocationStateChangeEventType(graphene.Enum): | |||||
LOCATION_UPDATED = "LOCATION_UPDATED" | |||||
LOCATION_DISCONNECTED = "LOCATION_DISCONNECTED" | |||||
LOCATION_RECONNECTED = "LOCATION_RECONNECTED" | |||||
LOCATION_ERROR = "LOCATION_ERROR" | |||||
class Meta: | |||||
name = "LocationStateChangeEventType" | |||||
class GrapheneRepositoryLocation(graphene.ObjectType): | |||||
id = graphene.NonNull(graphene.ID) | |||||
name = graphene.NonNull(graphene.String) | |||||
is_reload_supported = graphene.NonNull(graphene.Boolean) | |||||
environment_path = graphene.String() | |||||
repositories = non_null_list(lambda: GrapheneRepository) | |||||
server_id = graphene.String() | |||||
class Meta: | |||||
name = "RepositoryLocation" | |||||
def __init__(self, location): | |||||
self._location = check.inst_param(location, "location", RepositoryLocation) | |||||
environment_path = ( | |||||
location.location_handle.executable_path | |||||
if isinstance(location.location_handle, ManagedGrpcPythonEnvRepositoryLocationHandle) | |||||
else None | |||||
) | |||||
server_id = ( | |||||
location.location_handle.server_id | |||||
if isinstance(location.location_handle, GrpcServerRepositoryLocationHandle) | |||||
else None | |||||
) | |||||
check.invariant(location.name is not None) | |||||
super().__init__( | |||||
name=location.name, | |||||
environment_path=environment_path, | |||||
is_reload_supported=location.is_reload_supported, | |||||
server_id=server_id, | |||||
) | |||||
def resolve_id(self, _): | |||||
return self.name | |||||
def resolve_repositories(self, _graphene_info): | |||||
return [ | |||||
GrapheneRepository(repository, self._location) | |||||
for repository in self._location.get_repositories().values() | |||||
] | |||||
class GrapheneRepository(graphene.ObjectType): | |||||
id = graphene.NonNull(graphene.ID) | |||||
name = graphene.NonNull(graphene.String) | |||||
location = graphene.NonNull(GrapheneRepositoryLocation) | |||||
pipelines = non_null_list(GraphenePipeline) | |||||
usedSolids = graphene.Field(non_null_list(GrapheneUsedSolid)) | |||||
usedSolid = graphene.Field(GrapheneUsedSolid, name=graphene.NonNull(graphene.String)) | |||||
origin = graphene.NonNull(GrapheneRepositoryOrigin) | |||||
partitionSets = non_null_list(GraphenePartitionSet) | |||||
schedules = non_null_list(GrapheneSchedule) | |||||
sensors = non_null_list(GrapheneSensor) | |||||
class DauphinRepository(dauphin.ObjectType): | |||||
class Meta: | class Meta: | ||||
name = "Repository" | name = "Repository" | ||||
def __init__(self, repository, repository_location): | def __init__(self, repository, repository_location): | ||||
self._repository = check.inst_param(repository, "repository", ExternalRepository) | self._repository = check.inst_param(repository, "repository", ExternalRepository) | ||||
self._repository_location = check.inst_param( | self._repository_location = check.inst_param( | ||||
repository_location, "repository_location", RepositoryLocation | repository_location, "repository_location", RepositoryLocation | ||||
) | ) | ||||
super(DauphinRepository, self).__init__(name=repository.name) | super().__init__(name=repository.name) | ||||
id = dauphin.NonNull(dauphin.ID) | |||||
name = dauphin.NonNull(dauphin.String) | |||||
location = dauphin.NonNull("RepositoryLocation") | |||||
pipelines = dauphin.non_null_list("Pipeline") | |||||
usedSolids = dauphin.Field(dauphin.non_null_list("UsedSolid")) | |||||
usedSolid = dauphin.Field("UsedSolid", name=dauphin.NonNull(dauphin.String)) | |||||
origin = dauphin.NonNull("RepositoryOrigin") | |||||
partitionSets = dauphin.non_null_list("PartitionSet") | |||||
schedules = dauphin.non_null_list("Schedule") | |||||
sensors = dauphin.non_null_list("Sensor") | |||||
def resolve_id(self, _graphene_info): | def resolve_id(self, _graphene_info): | ||||
return self._repository.get_external_origin_id() | return self._repository.get_external_origin_id() | ||||
def resolve_origin(self, graphene_info): | def resolve_origin(self, _graphene_info): | ||||
origin = self._repository.get_external_origin() | origin = self._repository.get_external_origin() | ||||
return graphene_info.schema.type_named("RepositoryOrigin")(origin) | return GrapheneRepositoryOrigin(origin) | ||||
def resolve_location(self, graphene_info): | def resolve_location(self, _graphene_info): | ||||
return graphene_info.schema.type_named("RepositoryLocation")(self._repository_location) | return GrapheneRepositoryLocation(self._repository_location) | ||||
def resolve_schedules(self, graphene_info): | def resolve_schedules(self, graphene_info): | ||||
schedules = self._repository.get_external_schedules() | schedules = self._repository.get_external_schedules() | ||||
return sorted( | return sorted( | ||||
[ | [GrapheneSchedule(graphene_info, schedule) for schedule in schedules], | ||||
graphene_info.schema.type_named("Schedule")(graphene_info, schedule) | |||||
for schedule in schedules | |||||
], | |||||
key=lambda schedule: schedule.name, | key=lambda schedule: schedule.name, | ||||
) | ) | ||||
def resolve_sensors(self, graphene_info): | def resolve_sensors(self, graphene_info): | ||||
sensors = self._repository.get_external_sensors() | sensors = self._repository.get_external_sensors() | ||||
return sorted( | return sorted( | ||||
[ | [GrapheneSensor(graphene_info, sensor) for sensor in sensors], | ||||
graphene_info.schema.type_named("Sensor")(graphene_info, sensor) | |||||
for sensor in sensors | |||||
], | |||||
key=lambda sensor: sensor.name, | key=lambda sensor: sensor.name, | ||||
) | ) | ||||
def resolve_pipelines(self, graphene_info): | def resolve_pipelines(self, _graphene_info): | ||||
return sorted( | return [ | ||||
[ | GraphenePipeline(pipeline) | ||||
graphene_info.schema.type_named("Pipeline")(pipeline) | for pipeline in sorted( | ||||
for pipeline in self._repository.get_all_external_pipelines() | self._repository.get_all_external_pipelines(), key=lambda pipeline: pipeline.name | ||||
], | |||||
key=lambda pipeline: pipeline.name, | |||||
) | ) | ||||
] | |||||
def resolve_usedSolid(self, _graphene_info, name): | def resolve_usedSolid(self, _graphene_info, name): | ||||
return get_solid(self._repository, name) | return get_solid(self._repository, name) | ||||
def resolve_usedSolids(self, _graphene_info): | def resolve_usedSolids(self, _graphene_info): | ||||
return get_solids(self._repository) | return get_solids(self._repository) | ||||
def resolve_partitionSets(self, graphene_info): | def resolve_partitionSets(self, _graphene_info): | ||||
return ( | return ( | ||||
graphene_info.schema.type_named("PartitionSet")(self._repository.handle, partition_set) | GraphenePartitionSet(self._repository.handle, partition_set) | ||||
for partition_set in self._repository.get_external_partition_sets() | for partition_set in self._repository.get_external_partition_sets() | ||||
) | ) | ||||
class DauphinRepositoryOrigin(dauphin.ObjectType): | class GrapheneRepositoryLocationLoadFailure(graphene.ObjectType): | ||||
class Meta: | id = graphene.NonNull(graphene.ID) | ||||
name = "RepositoryOrigin" | name = graphene.NonNull(graphene.String) | ||||
error = graphene.NonNull(GraphenePythonError) | |||||
repository_location_name = dauphin.NonNull(dauphin.String) | |||||
repository_name = dauphin.NonNull(dauphin.String) | |||||
repository_location_metadata = dauphin.non_null_list("RepositoryMetadata") | |||||
def __init__(self, origin): | |||||
self._origin = check.inst_param(origin, "origin", ExternalRepositoryOrigin) | |||||
def resolve_repository_location_name(self, _graphene_info): | |||||
return self._origin.repository_location_origin.location_name | |||||
def resolve_repository_name(self, _graphene_info): | |||||
return self._origin.repository_name | |||||
def resolve_repository_location_metadata(self, graphene_info): | |||||
metadata = self._origin.repository_location_origin.get_display_metadata() | |||||
return [ | |||||
graphene_info.schema.type_named("RepositoryMetadata")(key=key, value=value) | |||||
for key, value in metadata.items() | |||||
if value is not None | |||||
] | |||||
class DauphinRepositoryMetadata(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "RepositoryMetadata" | |||||
key = dauphin.NonNull(dauphin.String) | |||||
value = dauphin.NonNull(dauphin.String) | |||||
class DauphinRepositoryLocationOrLoadFailure(dauphin.Union): | |||||
class Meta: | |||||
name = "RepositoryLocationOrLoadFailure" | |||||
types = ("RepositoryLocation", "RepositoryLocationLoadFailure") | |||||
class DauphinRepositoryLocation(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "RepositoryLocation" | |||||
id = dauphin.NonNull(dauphin.ID) | |||||
name = dauphin.NonNull(dauphin.String) | |||||
is_reload_supported = dauphin.NonNull(dauphin.Boolean) | |||||
environment_path = dauphin.String() | |||||
repositories = dauphin.non_null_list("Repository") | |||||
server_id = dauphin.String() | |||||
def __init__(self, location): | |||||
self._location = check.inst_param(location, "location", RepositoryLocation) | |||||
environment_path = ( | |||||
location.location_handle.executable_path | |||||
if isinstance(location.location_handle, ManagedGrpcPythonEnvRepositoryLocationHandle) | |||||
else None | |||||
) | |||||
server_id = ( | |||||
location.location_handle.server_id | |||||
if isinstance(location.location_handle, GrpcServerRepositoryLocationHandle) | |||||
else None | |||||
) | |||||
check.invariant(location.name is not None) | |||||
super(DauphinRepositoryLocation, self).__init__( | |||||
name=location.name, | |||||
environment_path=environment_path, | |||||
is_reload_supported=location.is_reload_supported, | |||||
server_id=server_id, | |||||
) | |||||
def resolve_id(self, _): | |||||
return self.name | |||||
def resolve_repositories(self, graphene_info): | |||||
return [ | |||||
graphene_info.schema.type_named("Repository")(repository, self._location) | |||||
for repository in self._location.get_repositories().values() | |||||
] | |||||
class DauphinRepositoryLocationLoadFailure(dauphin.ObjectType): | |||||
class Meta: | class Meta: | ||||
name = "RepositoryLocationLoadFailure" | name = "RepositoryLocationLoadFailure" | ||||
id = dauphin.NonNull(dauphin.ID) | |||||
name = dauphin.NonNull(dauphin.String) | |||||
error = dauphin.NonNull("PythonError") | |||||
def __init__(self, name, error): | def __init__(self, name, error): | ||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
check.inst_param(error, "error", SerializableErrorInfo) | check.inst_param(error, "error", SerializableErrorInfo) | ||||
super(DauphinRepositoryLocationLoadFailure, self).__init__( | super().__init__(name=name, error=GraphenePythonError(error)) | ||||
name=name, error=DauphinPythonError(error) | |||||
) | |||||
def resolve_id(self, _): | def resolve_id(self, _): | ||||
return self.name | return self.name | ||||
class DauphinRepositoryConnection(dauphin.ObjectType): | class GrapheneRepositoryLocationOrLoadFailure(graphene.Union): | ||||
class Meta: | class Meta: | ||||
name = "RepositoryConnection" | types = (GrapheneRepositoryLocation, GrapheneRepositoryLocationLoadFailure) | ||||
name = "RepositoryLocationOrLoadFailure" | |||||
nodes = dauphin.non_null_list("Repository") | |||||
class GrapheneRepositoryConnection(graphene.ObjectType): | |||||
nodes = non_null_list(GrapheneRepository) | |||||
class DauphinRepositoryLocationConnection(dauphin.ObjectType): | |||||
class Meta: | class Meta: | ||||
name = "RepositoryLocationConnection" | name = "RepositoryConnection" | ||||
nodes = dauphin.non_null_list("RepositoryLocationOrLoadFailure") | |||||
class GrapheneRepositoryLocationConnection(graphene.ObjectType): | |||||
nodes = non_null_list(GrapheneRepositoryLocationOrLoadFailure) | |||||
class DauphinLocationStateChangeSubscription(dauphin.ObjectType): | class Meta: | ||||
class Meta(object): | name = "RepositoryLocationConnection" | ||||
name = "LocationStateChangeSubscription" | |||||
event = dauphin.Field(dauphin.NonNull("LocationStateChangeEvent")) | |||||
class GrapheneLocationStateChangeEvent(graphene.ObjectType): | |||||
event_type = graphene.NonNull(GrapheneLocationStateChangeEventType) | |||||
message = graphene.NonNull(graphene.String) | |||||
location_name = graphene.NonNull(graphene.String) | |||||
server_id = graphene.Field(graphene.String) | |||||
class DauphinLocationStateChangeEvent(dauphin.ObjectType): | class Meta: | ||||
class Meta(object): | |||||
name = "LocationStateChangeEvent" | name = "LocationStateChangeEvent" | ||||
event_type = dauphin.NonNull("LocationStateChangeEventType") | |||||
message = dauphin.NonNull(dauphin.String) | class GrapheneLocationStateChangeSubscription(graphene.ObjectType): | ||||
location_name = dauphin.NonNull(dauphin.String) | event = graphene.Field(graphene.NonNull(GrapheneLocationStateChangeEvent)) | ||||
server_id = dauphin.Field(dauphin.String) | |||||
class Meta: | |||||
name = "LocationStateChangeSubscription" | |||||
def get_location_state_change_observable(graphene_info): | def get_location_state_change_observable(graphene_info): | ||||
context = graphene_info.context | context = graphene_info.context | ||||
return context.location_state_events.map( | return context.location_state_events.map( | ||||
lambda event: graphene_info.schema.type_named("LocationStateChangeSubscription")( | lambda event: GrapheneLocationStateChangeSubscription( | ||||
event=graphene_info.schema.type_named("LocationStateChangeEvent")( | event=GrapheneLocationStateChangeEvent( | ||||
event_type=event.event_type, | event_type=event.event_type, | ||||
location_name=event.location_name, | location_name=event.location_name, | ||||
message=event.message, | message=event.message, | ||||
server_id=event.server_id, | server_id=event.server_id, | ||||
), | ), | ||||
) | ) | ||||
) | ) | ||||
class GrapheneRepositoriesOrError(graphene.Union): | |||||
class Meta: | |||||
types = (GrapheneRepositoryConnection, GraphenePythonError) | |||||
name = "RepositoriesOrError" | |||||
class GrapheneRepositoryLocationsOrError(graphene.Union): | |||||
class Meta: | |||||
types = (GrapheneRepositoryLocationConnection, GraphenePythonError) | |||||
name = "RepositoryLocationsOrError" | |||||
class GrapheneRepositoryOrError(graphene.Union): | |||||
class Meta: | |||||
types = (GraphenePythonError, GrapheneRepository, GrapheneRepositoryNotFoundError) | |||||
name = "RepositoryOrError" | |||||
types = [ | |||||
GrapheneLocationStateChangeEvent, | |||||
GrapheneLocationStateChangeEventType, | |||||
GrapheneLocationStateChangeSubscription, | |||||
GrapheneRepositoriesOrError, | |||||
GrapheneRepository, | |||||
GrapheneRepositoryConnection, | |||||
GrapheneRepositoryLocation, | |||||
GrapheneRepositoryLocationConnection, | |||||
GrapheneRepositoryLocationLoadFailure, | |||||
GrapheneRepositoryLocationOrLoadFailure, | |||||
GrapheneRepositoryLocationsOrError, | |||||
GrapheneRepositoryOrError, | |||||
] |