Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster-graphql/dagster_graphql/schema/sensors.py
Show All 33 Lines | def __init__(self, graphene_info, external_sensor): | ||||
self._external_sensor = check.inst_param(external_sensor, "external_sensor", ExternalSensor) | self._external_sensor = check.inst_param(external_sensor, "external_sensor", ExternalSensor) | ||||
self._sensor_state = graphene_info.context.instance.get_job_state( | self._sensor_state = graphene_info.context.instance.get_job_state( | ||||
self._external_sensor.get_external_origin_id() | self._external_sensor.get_external_origin_id() | ||||
) | ) | ||||
if not self._sensor_state: | if not self._sensor_state: | ||||
# Also include a SensorState for a stopped sensor that may not | # Also include a SensorState for a stopped sensor that may not | ||||
# have a stored database row yet | # have a stored database row yet | ||||
self._sensor_state = self._external_sensor.get_default_job_state() | self._sensor_state = self._external_sensor.get_default_job_state( | ||||
graphene_info.context.instance | |||||
) | |||||
super(DauphinSensor, self).__init__( | super(DauphinSensor, self).__init__( | ||||
name=external_sensor.name, | name=external_sensor.name, | ||||
jobOriginId=external_sensor.get_external_origin_id(), | jobOriginId=external_sensor.get_external_origin_id(), | ||||
pipelineName=external_sensor.pipeline_name, | pipelineName=external_sensor.pipeline_name, | ||||
solidSelection=external_sensor.solid_selection, | solidSelection=external_sensor.solid_selection, | ||||
mode=external_sensor.mode, | mode=external_sensor.mode, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 77 Lines • Show Last 20 Lines |