Differential D5228 Diff 26378
python_modules/dagster/dagster_tests/scheduler_tests/test_sensor_run.py
 import os
 import sys
 import time
 from contextlib import contextmanager

 import pendulum
 import pytest
 from dagster import pipeline, repository, solid
 from dagster.core.definitions.decorators.sensor import sensor
 from dagster.core.definitions.job import JobType
-from dagster.core.definitions.sensor import SensorRunParams, SensorSkipData
+from dagster.core.definitions.sensor import RunRequest, SkipReason
 from dagster.core.host_representation import (
     ManagedGrpcPythonEnvRepositoryLocationOrigin,
     RepositoryLocation,
     RepositoryLocationHandle,
 )
 from dagster.core.scheduler.job import JobState, JobStatus, JobTickStatus
 from dagster.core.storage.pipeline_run import PipelineRunStatus
 from dagster.core.test_utils import instance_for_test
… 10 unchanged lines not shown …
 @pipeline
 def the_pipeline():
     the_solid()

 @sensor(pipeline_name="the_pipeline")
 def simple_sensor(context):
     if not context.last_completion_time or not int(context.last_completion_time) % 2:
-        return SensorSkipData()
-    return SensorRunParams(execution_key=None, run_config={}, tags={})
+        return SkipReason()
+    return RunRequest(run_key=None, run_config={}, tags={})

 @sensor(pipeline_name="the_pipeline")
 def always_on_sensor(_context):
-    return SensorRunParams(execution_key=None, run_config={}, tags={})
+    return RunRequest(run_key=None, run_config={}, tags={})

 @sensor(pipeline_name="the_pipeline")
-def execution_key_sensor(_context):
-    return SensorRunParams(execution_key="only_once", run_config={}, tags={})
+def run_key_sensor(_context):
+    return RunRequest(run_key="only_once", run_config={}, tags={})

 @sensor(pipeline_name="the_pipeline")
 def error_sensor(context):
     raise Exception("womp womp")

 @repository
 def the_repo():
-    return [the_pipeline, simple_sensor, error_sensor, always_on_sensor, execution_key_sensor]
+    return [the_pipeline, simple_sensor, error_sensor, always_on_sensor, run_key_sensor]

 @contextmanager
 def instance_with_sensors(external_repo_context, overrides=None):
     with instance_for_test(overrides) as instance:
         with external_repo_context() as external_repo:
             yield (instance, external_repo)
… 21 unchanged lines not shown …
 def validate_tick(
     tick,
     external_sensor,
     expected_datetime,
     expected_status,
     expected_run_id=None,
     expected_error=None,
-    expected_execution_key=None,
+    expected_run_key=None,
 ):
     tick_data = tick.job_tick_data
     assert tick_data.job_origin_id == external_sensor.get_external_origin_id()
     assert tick_data.job_name == external_sensor.name
     assert tick_data.job_type == JobType.SENSOR
     assert tick_data.status == expected_status
     assert tick_data.timestamp == expected_datetime.timestamp()
     assert tick_data.run_id == expected_run_id
-    assert tick_data.execution_key == expected_execution_key
+    assert tick_data.run_key == expected_run_key
     if expected_error:
         assert expected_error in tick_data.error.message

 def validate_run_started(run, expected_success=True):
     if expected_success:
         assert run.status == PipelineRunStatus.STARTED or run.status == PipelineRunStatus.SUCCESS
     else:
… 152 unchanged lines not shown …
 @pytest.mark.parametrize("external_repo_context", repos())
 def test_launch_once(external_repo_context, capfd):
     freeze_datetime = pendulum.datetime(
         year=2019, month=2, day=27, hour=23, minute=59, second=59,
     ).in_tz("US/Central")
     with instance_with_sensors(external_repo_context) as (instance, external_repo):
         with pendulum.test(freeze_datetime):
-            external_sensor = external_repo.get_external_sensor("execution_key_sensor")
+            external_sensor = external_repo.get_external_sensor("run_key_sensor")
             instance.add_job_state(
                 JobState(external_sensor.get_external_origin(), JobType.SENSOR, JobStatus.RUNNING)
             )
             assert instance.get_runs_count() == 0
             ticks = instance.get_job_ticks(external_sensor.get_external_origin_id())
             assert len(ticks) == 0
             execute_sensor_iteration(instance, get_default_daemon_logger("SensorDaemon"))
             wait_for_all_runs_to_start(instance)
             assert instance.get_runs_count() == 1
             run = instance.get_runs()[0]
             ticks = instance.get_job_ticks(external_sensor.get_external_origin_id())
             assert len(ticks) == 1
             validate_tick(
                 ticks[0],
                 external_sensor,
                 freeze_datetime,
                 JobTickStatus.SUCCESS,
                 expected_run_id=run.run_id,
-                expected_execution_key="only_once",
+                expected_run_key="only_once",
             )
             # run again, ensure
             execute_sensor_iteration(instance, get_default_daemon_logger("SensorDaemon"))
             assert instance.get_runs_count() == 1
             ticks = instance.get_job_ticks(external_sensor.get_external_origin_id())
             assert len(ticks) == 2
             validate_tick(
                 ticks[0],
                 external_sensor,
                 freeze_datetime,
                 JobTickStatus.SKIPPED,
-                expected_execution_key="only_once",
+                expected_run_key="only_once",
             )
             captured = capfd.readouterr()
             assert (
-                "Found existing run for sensor execution_key_sensor with execution_key `only_once`, skipping."
+                "Found existing run for sensor run_key_sensor with run_key `only_once`, skipping."
                 in captured.out
             )
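For reference, a minimal sketch (not part of this changeset) of the renamed sensor API the tests now exercise: RunRequest with run_key replaces SensorRunParams with execution_key, and SkipReason replaces SensorSkipData. The imports match the ones in this diff; the names noop, example_pipeline, example_sensor, and example_repo are illustrative only.

from dagster import pipeline, repository, solid
from dagster.core.definitions.decorators.sensor import sensor
from dagster.core.definitions.sensor import RunRequest, SkipReason


@solid
def noop(_):
    pass


@pipeline
def example_pipeline():
    noop()


@sensor(pipeline_name="example_pipeline")
def example_sensor(context):
    if not context.last_completion_time:
        # Returning a SkipReason records a skipped tick without launching a run.
        return SkipReason()
    # A stable run_key makes the request idempotent: the sensor daemon skips
    # the launch if a run with this key already exists, which is what
    # test_launch_once asserts above.
    return RunRequest(run_key="only_once", run_config={}, tags={})


@repository
def example_repo():
    return [example_pipeline, example_sensor]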