Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/schema/sensors.py
from datetime import datetime | from datetime import datetime | ||||
import graphene | |||||
from dagster import check | from dagster import check | ||||
from dagster.core.host_representation import ExternalSensor, SensorSelector | from dagster.core.host_representation import ExternalSensor | ||||
from dagster.core.scheduler.job import JobState, JobStatus | from dagster.core.host_representation import SensorSelector as DagsterSensorSelector | ||||
from dagster.core.scheduler.job import JobState as DagsterJobState | |||||
from dagster.core.scheduler.job import JobStatus | |||||
from dagster.utils import datetime_as_float | from dagster.utils import datetime_as_float | ||||
from dagster_graphql import dauphin | |||||
from dagster_graphql.implementation.fetch_sensors import start_sensor, stop_sensor | |||||
from dagster_graphql.schema.errors import ( | |||||
DauphinPythonError, | |||||
DauphinRepositoryNotFoundError, | |||||
DauphinSensorNotFoundError, | |||||
) | |||||
SENSOR_DAEMON_INTERVAL = 30 | from ..implementation.fetch_sensors import start_sensor, stop_sensor | ||||
from .errors import PythonError, RepositoryNotFoundError, SensorNotFoundError | |||||
from .inputs import SensorSelector | |||||
from .jobs import FutureJobTick, JobState | |||||
from .util import non_null_list | |||||
SENSOR_DAEMON_INTERVAL = 30 | |||||
class DauphinSensor(dauphin.ObjectType): | |||||
class Meta: | |||||
name = "Sensor" | |||||
id = dauphin.NonNull(dauphin.ID) | class Sensor(graphene.ObjectType): | ||||
jobOriginId = dauphin.NonNull(dauphin.String) | id = graphene.NonNull(graphene.ID) | ||||
name = dauphin.NonNull(dauphin.String) | jobOriginId = graphene.NonNull(graphene.String) | ||||
pipelineName = dauphin.NonNull(dauphin.String) | name = graphene.NonNull(graphene.String) | ||||
solidSelection = dauphin.List(dauphin.String) | pipelineName = graphene.NonNull(graphene.String) | ||||
mode = dauphin.NonNull(dauphin.String) | solidSelection = graphene.List(graphene.String) | ||||
sensorState = dauphin.NonNull("JobState") | mode = graphene.NonNull(graphene.String) | ||||
nextTick = dauphin.Field("FutureJobTick") | sensorState = graphene.NonNull(JobState) | ||||
nextTick = graphene.Field(FutureJobTick) | |||||
def resolve_id(self, _): | def resolve_id(self, _): | ||||
return "%s:%s" % (self.name, self.pipelineName) | return f"{self.name}:{self.pipelineName}" | ||||
def __init__(self, graphene_info, external_sensor): | def __init__(self, graphene_info, external_sensor): | ||||
self._external_sensor = check.inst_param(external_sensor, "external_sensor", ExternalSensor) | self._external_sensor = check.inst_param(external_sensor, "external_sensor", ExternalSensor) | ||||
self._sensor_state = graphene_info.context.instance.get_job_state( | self._sensor_state = graphene_info.context.instance.get_job_state( | ||||
self._external_sensor.get_external_origin_id() | self._external_sensor.get_external_origin_id() | ||||
) | ) | ||||
if not self._sensor_state: | if not self._sensor_state: | ||||
# Also include a SensorState for a stopped sensor that may not | # Also include a SensorState for a stopped sensor that may not | ||||
# have a stored database row yet | # have a stored database row yet | ||||
self._sensor_state = self._external_sensor.get_default_job_state() | self._sensor_state = self._external_sensor.get_default_job_state() | ||||
super(DauphinSensor, self).__init__( | super(Sensor, self).__init__( | ||||
name=external_sensor.name, | name=external_sensor.name, | ||||
jobOriginId=external_sensor.get_external_origin_id(), | jobOriginId=external_sensor.get_external_origin_id(), | ||||
pipelineName=external_sensor.pipeline_name, | pipelineName=external_sensor.pipeline_name, | ||||
solidSelection=external_sensor.solid_selection, | solidSelection=external_sensor.solid_selection, | ||||
mode=external_sensor.mode, | mode=external_sensor.mode, | ||||
) | ) | ||||
def resolve_sensorState(self, graphene_info): | def resolve_sensorState(self, _graphene_info): | ||||
return graphene_info.schema.type_named("JobState")(self._sensor_state) | return JobState(self._sensor_state) | ||||
def resolve_nextTick(self, graphene_info): | def resolve_nextTick(self, graphene_info): | ||||
if self._sensor_state.status != JobStatus.RUNNING: | if self._sensor_state.status != JobStatus.RUNNING: | ||||
return None | return None | ||||
latest_tick = graphene_info.context.instance.get_latest_job_tick( | latest_tick = graphene_info.context.instance.get_latest_job_tick( | ||||
self._sensor_state.job_origin_id | self._sensor_state.job_origin_id | ||||
) | ) | ||||
if not latest_tick: | if not latest_tick: | ||||
return None | return None | ||||
next_timestamp = latest_tick.timestamp + SENSOR_DAEMON_INTERVAL | next_timestamp = latest_tick.timestamp + SENSOR_DAEMON_INTERVAL | ||||
if next_timestamp < datetime_as_float(datetime.now()): | if next_timestamp < datetime_as_float(datetime.now()): | ||||
return None | return None | ||||
return graphene_info.schema.type_named("FutureJobTick")(next_timestamp) | return FutureJobTick(next_timestamp) | ||||
class DauphinSensorOrError(dauphin.Union): | class SensorOrError(graphene.Union): | ||||
class Meta: | class Meta: | ||||
name = "SensorOrError" | |||||
types = ( | types = ( | ||||
"Sensor", | Sensor, | ||||
DauphinSensorNotFoundError, | SensorNotFoundError, | ||||
DauphinPythonError, | PythonError, | ||||
) | ) | ||||
class DauphinSensors(dauphin.ObjectType): | class Sensors(graphene.ObjectType): | ||||
class Meta: | results = non_null_list(Sensor) | ||||
name = "Sensors" | |||||
results = dauphin.non_null_list("Sensor") | |||||
class DauphinSensorsOrError(dauphin.Union): | class SensorsOrError(graphene.Union): | ||||
class Meta: | class Meta: | ||||
name = "SensorsOrError" | types = (Sensors, RepositoryNotFoundError, PythonError) | ||||
types = (DauphinSensors, DauphinRepositoryNotFoundError, DauphinPythonError) | |||||
class DauphinStartSensorMutation(dauphin.Mutation): | class StartSensorMutation(graphene.Mutation): | ||||
class Meta: | |||||
name = "StartSensorMutation" | |||||
class Arguments: | class Arguments: | ||||
sensor_selector = dauphin.NonNull("SensorSelector") | sensor_selector = graphene.NonNull(SensorSelector) | ||||
Output = dauphin.NonNull("SensorOrError") | Output = graphene.NonNull(SensorOrError) | ||||
def mutate(self, graphene_info, sensor_selector): | def mutate(self, graphene_info, sensor_selector): | ||||
return start_sensor(graphene_info, SensorSelector.from_graphql_input(sensor_selector)) | return start_sensor( | ||||
graphene_info, DagsterSensorSelector.from_graphql_input(sensor_selector) | |||||
) | |||||
class DauphinStopSensorMutation(dauphin.Mutation): | class StopSensorMutationResult(graphene.ObjectType): | ||||
class Meta: | jobState = graphene.Field(JobState) | ||||
name = "StopSensorMutation" | |||||
class Arguments: | def __init__(self, job_state): | ||||
job_origin_id = dauphin.NonNull(dauphin.String) | super().__init__() | ||||
self._job_state = check.inst_param(job_state, "job_state", DagsterJobState) | |||||
Output = dauphin.NonNull("StopSensorMutationResultOrError") | def resolve_jobState(self, _graphene_info): | ||||
if not self._job_state: | |||||
return None | |||||
def mutate(self, graphene_info, job_origin_id): | return JobState(job_state=self._job_state) | ||||
return stop_sensor(graphene_info, job_origin_id) | |||||
class DauphinStopSensorMutationResult(dauphin.ObjectType): | class StopSensorMutationResultOrError(graphene.Union): | ||||
class Meta: | class Meta: | ||||
name = "StopSensorMutationResult" | types = (StopSensorMutationResult, PythonError) | ||||
jobState = dauphin.Field("JobState") | |||||
def __init__(self, job_state): | class StopSensorMutation(graphene.Mutation): | ||||
self._job_state = check.inst_param(job_state, "job_state", JobState) | class Arguments: | ||||
job_origin_id = graphene.NonNull(graphene.String) | |||||
def resolve_jobState(self, graphene_info): | Output = graphene.NonNull(StopSensorMutationResultOrError) | ||||
if not self._job_state: | |||||
return None | |||||
return graphene_info.schema.type_named("JobState")(job_state=self._job_state) | def mutate(self, graphene_info, job_origin_id): | ||||
return stop_sensor(graphene_info, job_origin_id) | |||||
class DauphinStopSensorMutationResultOrError(dauphin.Union): | types = [ | ||||
class Meta: | Sensor, | ||||
name = "StopSensorMutationResultOrError" | SensorOrError, | ||||
types = ("StopSensorMutationResult", "PythonError") | Sensors, | ||||
SensorsOrError, | |||||
StopSensorMutation, | |||||
StopSensorMutationResult, | |||||
StopSensorMutationResultOrError, | |||||
StopSensorMutation, | |||||
] |