Changeset View
Changeset View
Standalone View
Standalone View
python_modules/dagster/dagster/serdes/ipc.py
Show First 20 Lines • Show All 132 Lines • ▼ Show 20 Lines | def ipc_read_event_stream(file_path, timeout=30): | ||||
sleep_interval = 0.1 | sleep_interval = 0.1 | ||||
elapsed_time = 0 | elapsed_time = 0 | ||||
while elapsed_time < timeout and not os.path.exists(file_path): | while elapsed_time < timeout and not os.path.exists(file_path): | ||||
elapsed_time += sleep_interval | elapsed_time += sleep_interval | ||||
sleep(sleep_interval) | sleep(sleep_interval) | ||||
if not os.path.exists(file_path): | if not os.path.exists(file_path): | ||||
raise DagsterIPCProtocolError( | raise DagsterIPCProtocolError( | ||||
"Timeout: read stream has not received any data in {timeout} seconds" | "Timeout: read stream has not received any data in {timeout} seconds".format( | ||||
timeout=timeout | |||||
) | |||||
) | ) | ||||
with open(os.path.abspath(file_path), "r") as file_pointer: | with open(os.path.abspath(file_path), "r") as file_pointer: | ||||
message = _process_line(file_pointer) | message = _process_line(file_pointer) | ||||
while elapsed_time < timeout and message == None: | while elapsed_time < timeout and message == None: | ||||
elapsed_time += sleep_interval | elapsed_time += sleep_interval | ||||
sleep(sleep_interval) | sleep(sleep_interval) | ||||
message = _process_line(file_pointer) | message = _process_line(file_pointer) | ||||
▲ Show 20 Lines • Show All 45 Lines • Show Last 20 Lines |