Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/schema/sensors.py
import graphene | |||||
from dagster import check | from dagster import check | ||||
from dagster.core.host_representation import ExternalSensor, SensorSelector | from dagster.core.host_representation import ExternalSensor, SensorSelector | ||||
from dagster.core.scheduler.job import JobState | from dagster.core.scheduler.job import JobState | ||||
from dagster_graphql import dauphin | |||||
from dagster_graphql.implementation.fetch_sensors import ( | |||||
get_sensor_next_tick, | |||||
start_sensor, | |||||
stop_sensor, | |||||
) | |||||
from dagster_graphql.schema.errors import ( | |||||
DauphinPythonError, | |||||
DauphinRepositoryNotFoundError, | |||||
DauphinSensorNotFoundError, | |||||
) | |||||
from ..implementation.fetch_sensors import get_sensor_next_tick, start_sensor, stop_sensor | |||||
from .errors import ( | |||||
GraphenePythonError, | |||||
GrapheneRepositoryNotFoundError, | |||||
GrapheneSensorNotFoundError, | |||||
) | |||||
from .inputs import GrapheneSensorSelector | |||||
from .jobs import GrapheneFutureJobTick, GrapheneJobState | |||||
from .util import non_null_list | |||||
class GrapheneSensor(graphene.ObjectType): | |||||
id = graphene.NonNull(graphene.ID) | |||||
jobOriginId = graphene.NonNull(graphene.String) | |||||
name = graphene.NonNull(graphene.String) | |||||
pipelineName = graphene.NonNull(graphene.String) | |||||
solidSelection = graphene.List(graphene.String) | |||||
mode = graphene.NonNull(graphene.String) | |||||
sensorState = graphene.NonNull(GrapheneJobState) | |||||
nextTick = graphene.Field(GrapheneFutureJobTick) | |||||
class DauphinSensor(dauphin.ObjectType): | |||||
class Meta: | class Meta: | ||||
name = "Sensor" | name = "Sensor" | ||||
id = dauphin.NonNull(dauphin.ID) | |||||
jobOriginId = dauphin.NonNull(dauphin.String) | |||||
name = dauphin.NonNull(dauphin.String) | |||||
pipelineName = dauphin.NonNull(dauphin.String) | |||||
solidSelection = dauphin.List(dauphin.String) | |||||
mode = dauphin.NonNull(dauphin.String) | |||||
sensorState = dauphin.NonNull("JobState") | |||||
nextTick = dauphin.Field("FutureJobTick") | |||||
def resolve_id(self, _): | |||||
return "%s:%s" % (self.name, self.pipelineName) | |||||
def __init__(self, graphene_info, external_sensor): | def __init__(self, graphene_info, external_sensor): | ||||
self._external_sensor = check.inst_param(external_sensor, "external_sensor", ExternalSensor) | self._external_sensor = check.inst_param(external_sensor, "external_sensor", ExternalSensor) | ||||
self._sensor_state = graphene_info.context.instance.get_job_state( | self._sensor_state = graphene_info.context.instance.get_job_state( | ||||
self._external_sensor.get_external_origin_id() | self._external_sensor.get_external_origin_id() | ||||
) | ) | ||||
if not self._sensor_state: | if not self._sensor_state: | ||||
# Also include a SensorState for a stopped sensor that may not | # Also include a SensorState for a stopped sensor that may not | ||||
# have a stored database row yet | # have a stored database row yet | ||||
self._sensor_state = self._external_sensor.get_default_job_state( | self._sensor_state = self._external_sensor.get_default_job_state( | ||||
graphene_info.context.instance | graphene_info.context.instance | ||||
) | ) | ||||
super(DauphinSensor, self).__init__( | super().__init__( | ||||
name=external_sensor.name, | name=external_sensor.name, | ||||
jobOriginId=external_sensor.get_external_origin_id(), | jobOriginId=external_sensor.get_external_origin_id(), | ||||
pipelineName=external_sensor.pipeline_name, | pipelineName=external_sensor.pipeline_name, | ||||
solidSelection=external_sensor.solid_selection, | solidSelection=external_sensor.solid_selection, | ||||
mode=external_sensor.mode, | mode=external_sensor.mode, | ||||
) | ) | ||||
def resolve_sensorState(self, graphene_info): | def resolve_id(self, _): | ||||
return graphene_info.schema.type_named("JobState")(self._sensor_state) | return f"{self.name}:{self.pipelineName}" | ||||
def resolve_sensorState(self, _graphene_info): | |||||
return GrapheneJobState(self._sensor_state) | |||||
def resolve_nextTick(self, graphene_info): | def resolve_nextTick(self, graphene_info): | ||||
return get_sensor_next_tick(graphene_info, self._sensor_state) | return get_sensor_next_tick(graphene_info, self._sensor_state) | ||||
class DauphinSensorOrError(dauphin.Union): | class GrapheneSensorOrError(graphene.Union): | ||||
class Meta: | class Meta: | ||||
name = "SensorOrError" | |||||
types = ( | types = ( | ||||
"Sensor", | GrapheneSensor, | ||||
DauphinSensorNotFoundError, | GrapheneSensorNotFoundError, | ||||
DauphinPythonError, | GraphenePythonError, | ||||
) | ) | ||||
name = "SensorOrError" | |||||
class GrapheneSensors(graphene.ObjectType): | |||||
results = non_null_list(GrapheneSensor) | |||||
class DauphinSensors(dauphin.ObjectType): | |||||
class Meta: | class Meta: | ||||
name = "Sensors" | name = "Sensors" | ||||
results = dauphin.non_null_list("Sensor") | |||||
class GrapheneSensorsOrError(graphene.Union): | |||||
class DauphinSensorsOrError(dauphin.Union): | |||||
class Meta: | class Meta: | ||||
types = (GrapheneSensors, GrapheneRepositoryNotFoundError, GraphenePythonError) | |||||
name = "SensorsOrError" | name = "SensorsOrError" | ||||
types = (DauphinSensors, DauphinRepositoryNotFoundError, DauphinPythonError) | |||||
class DauphinStartSensorMutation(dauphin.Mutation): | class GrapheneStartSensorMutation(graphene.Mutation): | ||||
class Meta: | Output = graphene.NonNull(GrapheneSensorOrError) | ||||
name = "StartSensorMutation" | |||||
class Arguments: | class Arguments: | ||||
sensor_selector = dauphin.NonNull("SensorSelector") | sensor_selector = graphene.NonNull(GrapheneSensorSelector) | ||||
Output = dauphin.NonNull("SensorOrError") | class Meta: | ||||
name = "StartSensorMutation" | |||||
def mutate(self, graphene_info, sensor_selector): | def mutate(self, graphene_info, sensor_selector): | ||||
return start_sensor(graphene_info, SensorSelector.from_graphql_input(sensor_selector)) | return start_sensor(graphene_info, SensorSelector.from_graphql_input(sensor_selector)) | ||||
class DauphinStopSensorMutation(dauphin.Mutation): | class GrapheneStopSensorMutationResult(graphene.ObjectType): | ||||
class Meta: | jobState = graphene.Field(GrapheneJobState) | ||||
name = "StopSensorMutation" | |||||
class Arguments: | |||||
job_origin_id = dauphin.NonNull(dauphin.String) | |||||
Output = dauphin.NonNull("StopSensorMutationResultOrError") | |||||
def mutate(self, graphene_info, job_origin_id): | |||||
return stop_sensor(graphene_info, job_origin_id) | |||||
class DauphinStopSensorMutationResult(dauphin.ObjectType): | |||||
class Meta: | class Meta: | ||||
name = "StopSensorMutationResult" | name = "StopSensorMutationResult" | ||||
jobState = dauphin.Field("JobState") | |||||
def __init__(self, job_state): | def __init__(self, job_state): | ||||
super().__init__() | |||||
self._job_state = check.inst_param(job_state, "job_state", JobState) | self._job_state = check.inst_param(job_state, "job_state", JobState) | ||||
def resolve_jobState(self, graphene_info): | def resolve_jobState(self, _graphene_info): | ||||
if not self._job_state: | if not self._job_state: | ||||
return None | return None | ||||
return graphene_info.schema.type_named("JobState")(job_state=self._job_state) | return GrapheneJobState(job_state=self._job_state) | ||||
class DauphinStopSensorMutationResultOrError(dauphin.Union): | class GrapheneStopSensorMutationResultOrError(graphene.Union): | ||||
class Meta: | class Meta: | ||||
types = (GrapheneStopSensorMutationResult, GraphenePythonError) | |||||
name = "StopSensorMutationResultOrError" | name = "StopSensorMutationResultOrError" | ||||
types = ("StopSensorMutationResult", "PythonError") | |||||
class GrapheneStopSensorMutation(graphene.Mutation): | |||||
Output = graphene.NonNull(GrapheneStopSensorMutationResultOrError) | |||||
class Arguments: | |||||
job_origin_id = graphene.NonNull(graphene.String) | |||||
class Meta: | |||||
name = "StopSensorMutation" | |||||
def mutate(self, graphene_info, job_origin_id): | |||||
return stop_sensor(graphene_info, job_origin_id) | |||||
types = [ | |||||
GrapheneSensor, | |||||
GrapheneSensorOrError, | |||||
GrapheneSensors, | |||||
GrapheneSensorsOrError, | |||||
GrapheneStopSensorMutation, | |||||
GrapheneStopSensorMutationResult, | |||||
GrapheneStopSensorMutationResultOrError, | |||||
GrapheneStopSensorMutation, | |||||
] |