Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/schema/external.py
import graphene | |||||
from dagster import check | from dagster import check | ||||
from dagster.core.host_representation import ( | from dagster.core.host_representation import ( | ||||
ExternalRepository, | ExternalRepository, | ||||
ExternalRepositoryOrigin, | ExternalRepositoryOrigin, | ||||
GrpcServerRepositoryLocationHandle, | GrpcServerRepositoryLocationHandle, | ||||
ManagedGrpcPythonEnvRepositoryLocationHandle, | ManagedGrpcPythonEnvRepositoryLocationHandle, | ||||
RepositoryLocation, | |||||
) | ) | ||||
from dagster.core.host_representation import RepositoryLocation as DagsterRepositoryLocation | |||||
from dagster.core.host_representation.grpc_server_state_subscriber import ( | from dagster.core.host_representation.grpc_server_state_subscriber import ( | ||||
LocationStateChangeEventType, | LocationStateChangeEventType, | ||||
) | ) | ||||
from dagster.utils.error import SerializableErrorInfo | from dagster.utils.error import SerializableErrorInfo | ||||
from dagster_graphql import dauphin | |||||
from dagster_graphql.implementation.fetch_solids import get_solid, get_solids | from dagster_graphql.implementation.fetch_solids import get_solid, get_solids | ||||
from dagster_graphql.schema.errors import DauphinPythonError | |||||
DauphinLocationStateChangeEventType = dauphin.Enum.from_enum(LocationStateChangeEventType) | from .errors import PythonError, RepositoryNotFoundError | ||||
from .partition_sets import PartitionSet | |||||
from .pipelines.pipeline import Pipeline | |||||
from .repository_origin import RepositoryOrigin | |||||
from .schedules import Schedule | |||||
from .sensors import Sensor | |||||
from .used_solid import UsedSolid | |||||
from .util import non_null_list | |||||
LocationStateChangeEventType = graphene.Enum.from_enum(LocationStateChangeEventType) | |||||
class RepositoryLocation(graphene.ObjectType): | |||||
id = graphene.NonNull(graphene.ID) | |||||
name = graphene.NonNull(graphene.String) | |||||
is_reload_supported = graphene.NonNull(graphene.Boolean) | |||||
environment_path = graphene.String() | |||||
repositories = non_null_list(lambda: Repository) | |||||
server_id = graphene.String() | |||||
def __init__(self, location): | |||||
self._location = check.inst_param(location, "location", RepositoryLocation) | |||||
environment_path = ( | |||||
location.location_handle.executable_path | |||||
if isinstance(location.location_handle, ManagedGrpcPythonEnvRepositoryLocationHandle) | |||||
else None | |||||
) | |||||
class DauphinRepository(dauphin.ObjectType): | server_id = ( | ||||
class Meta: | location.location_handle.server_id | ||||
name = "Repository" | if isinstance(location.location_handle, GrpcServerRepositoryLocationHandle) | ||||
else None | |||||
) | |||||
check.invariant(location.name is not None) | |||||
super(RepositoryLocation, self).__init__( | |||||
name=location.name, | |||||
environment_path=environment_path, | |||||
is_reload_supported=location.is_reload_supported, | |||||
server_id=server_id, | |||||
) | |||||
def resolve_id(self, _): | |||||
return self.name | |||||
def resolve_repositories(self, _graphene_info): | |||||
return [ | |||||
Repository(repository, self._location) | |||||
for repository in self._location.get_repositories().values() | |||||
] | |||||
class Repository(graphene.ObjectType): | |||||
def __init__(self, repository, repository_location): | def __init__(self, repository, repository_location): | ||||
self._repository = check.inst_param(repository, "repository", ExternalRepository) | self._repository = check.inst_param(repository, "repository", ExternalRepository) | ||||
self._repository_location = check.inst_param( | self._repository_location = check.inst_param( | ||||
repository_location, "repository_location", RepositoryLocation | repository_location, "repository_location", DagsterRepositoryLocation | ||||
) | ) | ||||
super(DauphinRepository, self).__init__(name=repository.name) | super(Repository, self).__init__(name=repository.name) | ||||
id = dauphin.NonNull(dauphin.ID) | id = graphene.NonNull(graphene.ID) | ||||
name = dauphin.NonNull(dauphin.String) | name = graphene.NonNull(graphene.String) | ||||
location = dauphin.NonNull("RepositoryLocation") | location = graphene.NonNull(RepositoryLocation) | ||||
pipelines = dauphin.non_null_list("Pipeline") | pipelines = non_null_list(Pipeline) | ||||
usedSolids = dauphin.Field(dauphin.non_null_list("UsedSolid")) | usedSolids = graphene.Field(non_null_list(UsedSolid)) | ||||
usedSolid = dauphin.Field("UsedSolid", name=dauphin.NonNull(dauphin.String)) | usedSolid = graphene.Field(UsedSolid, name=graphene.NonNull(graphene.String)) | ||||
origin = dauphin.NonNull("RepositoryOrigin") | origin = graphene.NonNull(RepositoryOrigin) | ||||
partitionSets = dauphin.non_null_list("PartitionSet") | partitionSets = non_null_list(PartitionSet) | ||||
schedules = dauphin.non_null_list("Schedule") | schedules = non_null_list(Schedule) | ||||
sensors = dauphin.non_null_list("Sensor") | sensors = non_null_list(Sensor) | ||||
def resolve_id(self, _graphene_info): | def resolve_id(self, _graphene_info): | ||||
return self._repository.get_external_origin_id() | return self._repository.get_external_origin_id() | ||||
def resolve_origin(self, graphene_info): | def resolve_origin(self, _graphene_info): | ||||
origin = self._repository.get_external_origin() | origin = self._repository.get_external_origin() | ||||
return graphene_info.schema.type_named("RepositoryOrigin")(origin) | return RepositoryOrigin(origin) | ||||
def resolve_location(self, graphene_info): | def resolve_location(self, _graphene_info): | ||||
return graphene_info.schema.type_named("RepositoryLocation")(self._repository_location) | return RepositoryLocation(self._repository_location) | ||||
def resolve_schedules(self, graphene_info): | def resolve_schedules(self, graphene_info): | ||||
schedules = self._repository.get_external_schedules() | schedules = self._repository.get_external_schedules() | ||||
return sorted( | return sorted( | ||||
[ | [Schedule(graphene_info, schedule) for schedule in schedules], | ||||
graphene_info.schema.type_named("Schedule")(graphene_info, schedule) | |||||
for schedule in schedules | |||||
], | |||||
key=lambda schedule: schedule.name, | key=lambda schedule: schedule.name, | ||||
) | ) | ||||
def resolve_sensors(self, graphene_info): | def resolve_sensors(self, graphene_info): | ||||
sensors = self._repository.get_external_sensors() | sensors = self._repository.get_external_sensors() | ||||
return sorted( | return sorted( | ||||
[ | [Sensor(graphene_info, sensor) for sensor in sensors], key=lambda sensor: sensor.name, | ||||
graphene_info.schema.type_named("Sensor")(graphene_info, sensor) | |||||
for sensor in sensors | |||||
], | |||||
key=lambda sensor: sensor.name, | |||||
) | ) | ||||
def resolve_pipelines(self, graphene_info): | def resolve_pipelines(self, _graphene_info): | ||||
return sorted( | return sorted( | ||||
[ | [Pipeline(pipeline) for pipeline in self._repository.get_all_external_pipelines()], | ||||
graphene_info.schema.type_named("Pipeline")(pipeline) | |||||
for pipeline in self._repository.get_all_external_pipelines() | |||||
], | |||||
key=lambda pipeline: pipeline.name, | key=lambda pipeline: pipeline.name, | ||||
) | ) | ||||
def resolve_usedSolid(self, _graphene_info, name): | def resolve_usedSolid(self, _graphene_info, name): | ||||
return get_solid(self._repository, name) | return get_solid(self._repository, name) | ||||
def resolve_usedSolids(self, _graphene_info): | def resolve_usedSolids(self, _graphene_info): | ||||
return get_solids(self._repository) | return get_solids(self._repository) | ||||
def resolve_partitionSets(self, graphene_info): | def resolve_partitionSets(self, _graphene_info): | ||||
return ( | return ( | ||||
graphene_info.schema.type_named("PartitionSet")(self._repository.handle, partition_set) | PartitionSet(self._repository.handle, partition_set) | ||||
for partition_set in self._repository.get_external_partition_sets() | for partition_set in self._repository.get_external_partition_sets() | ||||
) | ) | ||||
class DauphinRepositoryOrigin(dauphin.ObjectType): | class RepositoryLocationLoadFailure(graphene.ObjectType): | ||||
class Meta: | id = graphene.NonNull(graphene.ID) | ||||
name = "RepositoryOrigin" | name = graphene.NonNull(graphene.String) | ||||
error = graphene.NonNull(PythonError) | |||||
repository_location_name = dauphin.NonNull(dauphin.String) | |||||
repository_name = dauphin.NonNull(dauphin.String) | |||||
repository_location_metadata = dauphin.non_null_list("RepositoryMetadata") | |||||
def __init__(self, origin): | |||||
self._origin = check.inst_param(origin, "origin", ExternalRepositoryOrigin) | |||||
def resolve_repository_location_name(self, _graphene_info): | |||||
return self._origin.repository_location_origin.location_name | |||||
def resolve_repository_name(self, _graphene_info): | |||||
return self._origin.repository_name | |||||
def resolve_repository_location_metadata(self, graphene_info): | |||||
metadata = self._origin.repository_location_origin.get_display_metadata() | |||||
return [ | |||||
graphene_info.schema.type_named("RepositoryMetadata")(key=key, value=value) | |||||
for key, value in metadata.items() | |||||
if value is not None | |||||
] | |||||
class DauphinRepositoryMetadata(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "RepositoryMetadata" | |||||
key = dauphin.NonNull(dauphin.String) | |||||
value = dauphin.NonNull(dauphin.String) | |||||
class DauphinRepositoryLocationOrLoadFailure(dauphin.Union): | |||||
class Meta: | |||||
name = "RepositoryLocationOrLoadFailure" | |||||
types = ("RepositoryLocation", "RepositoryLocationLoadFailure") | |||||
class DauphinRepositoryLocation(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "RepositoryLocation" | |||||
id = dauphin.NonNull(dauphin.ID) | |||||
name = dauphin.NonNull(dauphin.String) | |||||
is_reload_supported = dauphin.NonNull(dauphin.Boolean) | |||||
environment_path = dauphin.String() | |||||
repositories = dauphin.non_null_list("Repository") | |||||
server_id = dauphin.String() | |||||
def __init__(self, location): | |||||
self._location = check.inst_param(location, "location", RepositoryLocation) | |||||
environment_path = ( | |||||
location.location_handle.executable_path | |||||
if isinstance(location.location_handle, ManagedGrpcPythonEnvRepositoryLocationHandle) | |||||
else None | |||||
) | |||||
server_id = ( | |||||
location.location_handle.server_id | |||||
if isinstance(location.location_handle, GrpcServerRepositoryLocationHandle) | |||||
else None | |||||
) | |||||
check.invariant(location.name is not None) | |||||
super(DauphinRepositoryLocation, self).__init__( | |||||
name=location.name, | |||||
environment_path=environment_path, | |||||
is_reload_supported=location.is_reload_supported, | |||||
server_id=server_id, | |||||
) | |||||
def resolve_id(self, _): | |||||
return self.name | |||||
def resolve_repositories(self, graphene_info): | |||||
return [ | |||||
graphene_info.schema.type_named("Repository")(repository, self._location) | |||||
for repository in self._location.get_repositories().values() | |||||
] | |||||
class DauphinRepositoryLocationLoadFailure(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "RepositoryLocationLoadFailure" | |||||
id = dauphin.NonNull(dauphin.ID) | |||||
name = dauphin.NonNull(dauphin.String) | |||||
error = dauphin.NonNull("PythonError") | |||||
def __init__(self, name, error): | def __init__(self, name, error): | ||||
check.str_param(name, "name") | check.str_param(name, "name") | ||||
check.inst_param(error, "error", SerializableErrorInfo) | check.inst_param(error, "error", SerializableErrorInfo) | ||||
super(DauphinRepositoryLocationLoadFailure, self).__init__( | super(RepositoryLocationLoadFailure, self).__init__(name=name, error=PythonError(error)) | ||||
name=name, error=DauphinPythonError(error) | |||||
) | |||||
def resolve_id(self, _): | def resolve_id(self, _): | ||||
return self.name | return self.name | ||||
class DauphinRepositoryConnection(dauphin.ObjectType): | class RepositoryLocationOrLoadFailure(graphene.Union): | ||||
class Meta: | class Meta: | ||||
name = "RepositoryConnection" | types = (RepositoryLocation, RepositoryLocationLoadFailure) | ||||
nodes = dauphin.non_null_list("Repository") | |||||
class DauphinRepositoryLocationConnection(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "RepositoryLocationConnection" | |||||
nodes = dauphin.non_null_list("RepositoryLocationOrLoadFailure") | class RepositoryConnection(graphene.ObjectType): | ||||
nodes = non_null_list(Repository) | |||||
class DauphinLocationStateChangeSubscription(dauphin.ObjectType): | class RepositoryLocationConnection(graphene.ObjectType): | ||||
class Meta(object): | nodes = non_null_list(RepositoryLocationOrLoadFailure) | ||||
name = "LocationStateChangeSubscription" | |||||
event = dauphin.Field(dauphin.NonNull("LocationStateChangeEvent")) | |||||
class LocationStateChangeEvent(graphene.ObjectType): | |||||
event_type = graphene.NonNull(LocationStateChangeEventType) | |||||
message = graphene.NonNull(graphene.String) | |||||
location_name = graphene.NonNull(graphene.String) | |||||
server_id = graphene.Field(graphene.String) | |||||
class DauphinLocationStateChangeEvent(dauphin.ObjectType): | |||||
class Meta(object): | |||||
name = "LocationStateChangeEvent" | |||||
event_type = dauphin.NonNull("LocationStateChangeEventType") | class LocationStateChangeSubscription(graphene.ObjectType): | ||||
message = dauphin.NonNull(dauphin.String) | event = graphene.Field(graphene.NonNull(LocationStateChangeEvent)) | ||||
location_name = dauphin.NonNull(dauphin.String) | |||||
server_id = dauphin.Field(dauphin.String) | |||||
def get_location_state_change_observable(graphene_info): | def get_location_state_change_observable(graphene_info): | ||||
context = graphene_info.context | context = graphene_info.context | ||||
return context.location_state_events.map( | return context.location_state_events.map( | ||||
lambda event: graphene_info.schema.type_named("LocationStateChangeSubscription")( | lambda event: LocationStateChangeSubscription( | ||||
event=graphene_info.schema.type_named("LocationStateChangeEvent")( | event=LocationStateChangeEvent( | ||||
event_type=event.event_type, | event_type=event.event_type, | ||||
location_name=event.location_name, | location_name=event.location_name, | ||||
message=event.message, | message=event.message, | ||||
server_id=event.server_id, | server_id=event.server_id, | ||||
), | ), | ||||
) | ) | ||||
) | ) | ||||
class RepositoriesOrError(graphene.Union): | |||||
class Meta: | |||||
types = (RepositoryConnection, PythonError) | |||||
class RepositoryLocationsOrError(graphene.Union): | |||||
class Meta: | |||||
types = (RepositoryLocationConnection, PythonError) | |||||
class RepositoryOrError(graphene.Union): | |||||
class Meta: | |||||
types = (PythonError, Repository, RepositoryNotFoundError) | |||||
types = [ | |||||
LocationStateChangeEvent, | |||||
LocationStateChangeSubscription, | |||||
RepositoriesOrError, | |||||
Repository, | |||||
RepositoryConnection, | |||||
RepositoryLocation, | |||||
RepositoryLocationConnection, | |||||
RepositoryLocationLoadFailure, | |||||
RepositoryLocationOrLoadFailure, | |||||
RepositoryLocationsOrError, | |||||
RepositoryOrError, | |||||
] | |||||