Differential D7970 Diff 38053 python_modules/libraries/dagstermill/dagstermill_tests/test_event_callback.py
Changeset View
Changeset View
Standalone View
Standalone View
python_modules/libraries/dagstermill/dagstermill_tests/test_event_callback.py
Show All 16 Lines | def _event_callback(record): | ||||
events[record.dagster_event.event_type].append(record) | events[record.dagster_event.event_type].append(record) | ||||
pipeline = ReconstructablePipeline.for_module( | pipeline = ReconstructablePipeline.for_module( | ||||
"dagstermill.examples.repository", | "dagstermill.examples.repository", | ||||
"hello_logging_pipeline", | "hello_logging_pipeline", | ||||
) | ) | ||||
pipeline_def = pipeline.get_definition() | pipeline_def = pipeline.get_definition() | ||||
with instance_for_test() as instance: | with instance_for_test() as instance: | ||||
pipeline_run = instance.create_run_for_pipeline(pipeline_def) | pipeline_run = instance.create_run_for_pipeline(pipeline_def) | ||||
instance.watch_event_logs(pipeline_run.run_id, -1, _event_callback) | instance.watch_event_logs(pipeline_run.run_id, -1, _event_callback) | ||||
execute_run( | res = execute_run( | ||||
pipeline, | pipeline, | ||||
pipeline_run, | pipeline_run, | ||||
instance, | instance, | ||||
) | ) | ||||
assert res.success | |||||
passed_before_timeout = False | passed_before_timeout = False | ||||
retries = 5 | retries = 5 | ||||
while retries > 0: | while retries > 0: | ||||
time.sleep(0.333) | time.sleep(0.333) | ||||
if DagsterEventType.PIPELINE_FAILURE in events.keys(): | if DagsterEventType.PIPELINE_FAILURE in events.keys(): | ||||
break | break | ||||
if DagsterEventType.PIPELINE_SUCCESS in events.keys(): | if DagsterEventType.PIPELINE_SUCCESS in events.keys(): | ||||
passed_before_timeout = True | passed_before_timeout = True | ||||
break | break | ||||
retries -= 1 | retries -= 1 | ||||
assert passed_before_timeout | assert passed_before_timeout |