Differential D6164 Diff 30546 python_modules/dagster-graphql/dagster_graphql/implementation/fetch_sensors.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/implementation/fetch_sensors.py
from dagster import check | from dagster import check | ||||
from dagster.core.definitions.job import JobType | from dagster.core.definitions.job import JobType | ||||
from dagster.core.host_representation import ( | from dagster.core.host_representation import ( | ||||
ExternalSensor, | ExternalSensor, | ||||
PipelineSelector, | PipelineSelector, | ||||
RepositorySelector, | RepositorySelector, | ||||
SensorSelector, | SensorSelector, | ||||
) | ) | ||||
from dagster.core.scheduler.job import JobStatus | from dagster.core.scheduler.job import JobStatus | ||||
from dagster.daemon.controller import SENSOR_DAEMON_INTERVAL | from dagster.daemon.controller import SENSOR_DAEMON_INTERVAL | ||||
from dagster.seven import get_current_datetime_in_utc, get_timestamp_from_utc_datetime | from dagster.seven import get_current_datetime_in_utc, get_timestamp_from_utc_datetime | ||||
from graphql.execution.base import ResolveInfo | from graphql.execution.base import ResolveInfo | ||||
from .utils import UserFacingGraphQLError, capture_dauphin_error | from .utils import UserFacingGraphQLError, capture_error | ||||
@capture_dauphin_error | @capture_error | ||||
def get_sensors_or_error(graphene_info, repository_selector): | def get_sensors_or_error(graphene_info, repository_selector): | ||||
from ..schema.sensors import GrapheneSensor, GrapheneSensors | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(repository_selector, "repository_selector", RepositorySelector) | check.inst_param(repository_selector, "repository_selector", RepositorySelector) | ||||
location = graphene_info.context.get_repository_location(repository_selector.location_name) | location = graphene_info.context.get_repository_location(repository_selector.location_name) | ||||
repository = location.get_repository(repository_selector.repository_name) | repository = location.get_repository(repository_selector.repository_name) | ||||
return graphene_info.schema.type_named("Sensors")( | return GrapheneSensors( | ||||
results=[ | results=[ | ||||
graphene_info.schema.type_named("Sensor")(graphene_info, sensor) | GrapheneSensor(graphene_info, sensor) for sensor in repository.get_external_sensors() | ||||
for sensor in repository.get_external_sensors() | |||||
] | ] | ||||
) | ) | ||||
@capture_dauphin_error | @capture_error | ||||
def get_sensor_or_error(graphene_info, selector): | def get_sensor_or_error(graphene_info, selector): | ||||
from ..schema.errors import GrapheneSensorNotFoundError | |||||
from ..schema.sensors import GrapheneSensor | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(selector, "selector", SensorSelector) | check.inst_param(selector, "selector", SensorSelector) | ||||
location = graphene_info.context.get_repository_location(selector.location_name) | location = graphene_info.context.get_repository_location(selector.location_name) | ||||
repository = location.get_repository(selector.repository_name) | repository = location.get_repository(selector.repository_name) | ||||
external_job = repository.get_external_job(selector.sensor_name) | external_job = repository.get_external_job(selector.sensor_name) | ||||
if not external_job or not isinstance(external_job, ExternalSensor): | if not external_job or not isinstance(external_job, ExternalSensor): | ||||
raise UserFacingGraphQLError( | raise UserFacingGraphQLError(GrapheneSensorNotFoundError(selector.sensor_name)) | ||||
graphene_info.schema.type_named("SensorNotFoundError")(selector.sensor_name) | |||||
) | |||||
return graphene_info.schema.type_named("Sensor")(graphene_info, external_job) | return GrapheneSensor(graphene_info, external_job) | ||||
@capture_dauphin_error | @capture_error | ||||
def start_sensor(graphene_info, sensor_selector): | def start_sensor(graphene_info, sensor_selector): | ||||
from ..schema.errors import GrapheneSensorNotFoundError | |||||
from ..schema.sensors import GrapheneSensor | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(sensor_selector, "sensor_selector", SensorSelector) | check.inst_param(sensor_selector, "sensor_selector", SensorSelector) | ||||
location = graphene_info.context.get_repository_location(sensor_selector.location_name) | location = graphene_info.context.get_repository_location(sensor_selector.location_name) | ||||
repository = location.get_repository(sensor_selector.repository_name) | repository = location.get_repository(sensor_selector.repository_name) | ||||
external_sensor = repository.get_external_job(sensor_selector.sensor_name) | external_sensor = repository.get_external_job(sensor_selector.sensor_name) | ||||
if not isinstance(external_sensor, ExternalSensor): | if not isinstance(external_sensor, ExternalSensor): | ||||
raise UserFacingGraphQLError( | raise UserFacingGraphQLError(GrapheneSensorNotFoundError(sensor_selector.sensor_name)) | ||||
graphene_info.schema.type_named("SensorNotFoundError")(sensor_selector.sensor_name) | |||||
) | |||||
graphene_info.context.instance.start_sensor(external_sensor) | graphene_info.context.instance.start_sensor(external_sensor) | ||||
return graphene_info.schema.type_named("Sensor")(graphene_info, external_sensor) | return GrapheneSensor(graphene_info, external_sensor) | ||||
@capture_dauphin_error | @capture_error | ||||
def stop_sensor(graphene_info, job_origin_id): | def stop_sensor(graphene_info, job_origin_id): | ||||
from ..schema.sensors import GrapheneStopSensorMutationResult | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.str_param(job_origin_id, "job_origin_id") | check.str_param(job_origin_id, "job_origin_id") | ||||
instance = graphene_info.context.instance | instance = graphene_info.context.instance | ||||
job_state = instance.get_job_state(job_origin_id) | job_state = instance.get_job_state(job_origin_id) | ||||
if not job_state: | if not job_state: | ||||
return graphene_info.schema.type_named("StopSensorMutationResult")(jobState=None) | return GrapheneStopSensorMutationResult(job_state=None) | ||||
instance.stop_sensor(job_origin_id) | instance.stop_sensor(job_origin_id) | ||||
return graphene_info.schema.type_named("StopSensorMutationResult")( | return GrapheneStopSensorMutationResult(job_state=job_state.with_status(JobStatus.STOPPED)) | ||||
job_state=job_state.with_status(JobStatus.STOPPED) | |||||
) | |||||
@capture_dauphin_error | @capture_error | ||||
def get_unloadable_sensor_states_or_error(graphene_info): | def get_unloadable_sensor_states_or_error(graphene_info): | ||||
from ..schema.jobs import GrapheneJobState, GrapheneJobStates | |||||
sensor_states = graphene_info.context.instance.all_stored_job_state(job_type=JobType.SENSOR) | sensor_states = graphene_info.context.instance.all_stored_job_state(job_type=JobType.SENSOR) | ||||
external_sensors = [ | external_sensors = [ | ||||
sensor | sensor | ||||
for repository_location in graphene_info.context.repository_locations | for repository_location in graphene_info.context.repository_locations | ||||
for repository in repository_location.get_repositories().values() | for repository in repository_location.get_repositories().values() | ||||
for sensor in repository.get_external_sensors() | for sensor in repository.get_external_sensors() | ||||
] | ] | ||||
sensor_origin_ids = { | sensor_origin_ids = { | ||||
external_sensor.get_external_origin_id() for external_sensor in external_sensors | external_sensor.get_external_origin_id() for external_sensor in external_sensors | ||||
} | } | ||||
unloadable_states = [ | unloadable_states = [ | ||||
sensor_state | sensor_state | ||||
for sensor_state in sensor_states | for sensor_state in sensor_states | ||||
if sensor_state.job_origin_id not in sensor_origin_ids | if sensor_state.job_origin_id not in sensor_origin_ids | ||||
] | ] | ||||
return graphene_info.schema.type_named("JobStates")( | return GrapheneJobStates( | ||||
results=[ | results=[GrapheneJobState(job_state=job_state) for job_state in unloadable_states] | ||||
graphene_info.schema.type_named("JobState")(job_state=job_state) | |||||
for job_state in unloadable_states | |||||
] | |||||
) | ) | ||||
def get_sensors_for_pipeline(graphene_info, pipeline_selector): | def get_sensors_for_pipeline(graphene_info, pipeline_selector): | ||||
from ..schema.sensors import GrapheneSensor | |||||
check.inst_param(graphene_info, "graphene_info", ResolveInfo) | check.inst_param(graphene_info, "graphene_info", ResolveInfo) | ||||
check.inst_param(pipeline_selector, "pipeline_selector", PipelineSelector) | check.inst_param(pipeline_selector, "pipeline_selector", PipelineSelector) | ||||
location = graphene_info.context.get_repository_location(pipeline_selector.location_name) | location = graphene_info.context.get_repository_location(pipeline_selector.location_name) | ||||
repository = location.get_repository(pipeline_selector.repository_name) | repository = location.get_repository(pipeline_selector.repository_name) | ||||
external_sensors = repository.get_external_sensors() | external_sensors = repository.get_external_sensors() | ||||
return [ | return [ | ||||
graphene_info.schema.type_named("Sensor")(graphene_info, external_sensor) | GrapheneSensor(graphene_info, external_sensor) | ||||
for external_sensor in external_sensors | for external_sensor in external_sensors | ||||
if external_sensor.pipeline_name == pipeline_selector.pipeline_name | if external_sensor.pipeline_name == pipeline_selector.pipeline_name | ||||
] | ] | ||||
def get_sensor_next_tick(graphene_info, sensor_state): | def get_sensor_next_tick(graphene_info, sensor_state): | ||||
from ..schema.jobs import GrapheneFutureJobTick | |||||
if sensor_state.status != JobStatus.RUNNING: | if sensor_state.status != JobStatus.RUNNING: | ||||
return None | return None | ||||
latest_tick = graphene_info.context.instance.get_latest_job_tick(sensor_state.job_origin_id) | latest_tick = graphene_info.context.instance.get_latest_job_tick(sensor_state.job_origin_id) | ||||
if not latest_tick: | if not latest_tick: | ||||
return None | return None | ||||
next_timestamp = latest_tick.timestamp + SENSOR_DAEMON_INTERVAL | next_timestamp = latest_tick.timestamp + SENSOR_DAEMON_INTERVAL | ||||
if next_timestamp < get_timestamp_from_utc_datetime(get_current_datetime_in_utc()): | if next_timestamp < get_timestamp_from_utc_datetime(get_current_datetime_in_utc()): | ||||
return None | return None | ||||
return graphene_info.schema.type_named("FutureJobTick")(sensor_state, next_timestamp) | return GrapheneFutureJobTick(sensor_state, next_timestamp) |