Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/core/definitions/decorators/sensor.py
import inspect | import inspect | ||||
from dagster import check | from dagster import check | ||||
from dagster.core.definitions.sensor import SensorDefinition, SensorRunParams, SensorSkipData | from dagster.core.definitions.sensor import RunRequest, SensorDefinition, SkipReason | ||||
from dagster.core.errors import DagsterInvariantViolationError | from dagster.core.errors import DagsterInvariantViolationError | ||||
from dagster.utils.backcompat import experimental | from dagster.utils.backcompat import experimental | ||||
@experimental | @experimental | ||||
def sensor(pipeline_name, name=None, solid_selection=None, mode=None): | def sensor(pipeline_name, name=None, solid_selection=None, mode=None): | ||||
""" | """ | ||||
Creates a sensor where the decorated function is used as the sensor's evaluation function. The | Creates a sensor where the decorated function is used as the sensor's evaluation function. The | ||||
decorated function may: | decorated function may: | ||||
1. Return a `SensorRunParams` object. | 1. Return a `RunRequest` object. | ||||
2. Yield a number of `SensorRunParams` objects. | 2. Yield multiple of `RunRequest` objects. | ||||
3. Return a `SensorSkipData` object, providing a descriptive message of why no runs were | 3. Return a `SkipReason` object, providing a descriptive message of why no runs were requested. | ||||
requested. | |||||
4. Yield nothing (skipping without providing a reason) | 4. Yield nothing (skipping without providing a reason) | ||||
Takes a :py:class:`~dagster.SensorExecutionContext`. | Takes a :py:class:`~dagster.SensorExecutionContext`. | ||||
Args: | Args: | ||||
name (str): The name of this sensor | name (str): The name of this sensor | ||||
solid_selection (Optional[List[str]]): A list of solid subselection (including single | solid_selection (Optional[List[str]]): A list of solid subselection (including single | ||||
solid names) to execute for runs for this sensor e.g. | solid names) to execute for runs for this sensor e.g. | ||||
``['*some_solid+', 'other_solid']`` | ``['*some_solid+', 'other_solid']`` | ||||
mode (Optional[str]): The mode to apply when executing runs for this sensor. | mode (Optional[str]): The mode to apply when executing runs for this sensor. | ||||
(default: 'default') | (default: 'default') | ||||
""" | """ | ||||
check.opt_str_param(name, "name") | check.opt_str_param(name, "name") | ||||
def inner(fn): | def inner(fn): | ||||
check.callable_param(fn, "fn") | check.callable_param(fn, "fn") | ||||
sensor_name = name or fn.__name__ | sensor_name = name or fn.__name__ | ||||
def _wrapped_fn(context): | def _wrapped_fn(context): | ||||
result = fn(context) | result = fn(context) | ||||
if inspect.isgenerator(result): | if inspect.isgenerator(result): | ||||
for item in result: | for item in result: | ||||
yield item | yield item | ||||
elif isinstance(result, (SensorSkipData, SensorRunParams)): | elif isinstance(result, (SkipReason, RunRequest)): | ||||
yield result | yield result | ||||
elif result is not None: | elif result is not None: | ||||
raise DagsterInvariantViolationError( | raise DagsterInvariantViolationError( | ||||
( | ( | ||||
"Error in sensor {sensor_name}: Sensor unexpectedly returned output " | "Error in sensor {sensor_name}: Sensor unexpectedly returned output " | ||||
"{result} of type {type_}. Should only return SensorSkipData or " | "{result} of type {type_}. Should only return SkipReason or " | ||||
"SensorRunParams objects." | "RunRequest objects." | ||||
).format(sensor_name=sensor_name, result=result, type_=type(result)) | ).format(sensor_name=sensor_name, result=result, type_=type(result)) | ||||
) | ) | ||||
return SensorDefinition( | return SensorDefinition( | ||||
name=sensor_name, | name=sensor_name, | ||||
pipeline_name=pipeline_name, | pipeline_name=pipeline_name, | ||||
evaluation_fn=_wrapped_fn, | evaluation_fn=_wrapped_fn, | ||||
solid_selection=solid_selection, | solid_selection=solid_selection, | ||||
mode=mode, | mode=mode, | ||||
) | ) | ||||
return inner | return inner |