Skip to content

Commit d32e7b1

Browse files
committed
add: parallelize trace file discovery
1 parent 7d64ac3 commit d32e7b1

1 file changed

Lines changed: 22 additions & 4 deletions

File tree

traincheck/onlinechecker/streamhandler_filesystem.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import os
44
import re
55
import time
6+
from concurrent.futures import ThreadPoolExecutor, as_completed
67

78
from watchdog.events import FileCreatedEvent, FileSystemEventHandler
89
from watchdog.observers.polling import PollingObserver
@@ -380,10 +381,27 @@ def run_stream_monitor(traces, trace_folders, checker_data: Checker_data):
380381
observer.schedule(creation_handler, path=folder_abs, recursive=False)
381382

382383
# Pick up any trace files that already exist in the folder.
383-
for file in sorted(os.listdir(trace_folder)):
384-
if file.startswith("trace_") or file.endswith("proxy_log.json"):
385-
file_path = os.path.join(folder_abs, file)
386-
creation_handler.attach(file_path)
384+
existing_files = [
385+
os.path.join(folder_abs, file)
386+
for file in sorted(os.listdir(trace_folder))
387+
if file.startswith("trace_") or file.endswith("proxy_log.json")
388+
]
389+
390+
# Create handlers in parallel — each reads and processes the file's
391+
# initial content, which is the slow part (can be minutes per file).
392+
def _make_handler(fp):
393+
return fp, StreamLogHandler(fp, checker_data)
394+
395+
with ThreadPoolExecutor() as executor:
396+
futures = {
397+
executor.submit(_make_handler, fp): fp for fp in existing_files
398+
}
399+
for future in as_completed(futures):
400+
fp, handler = future.result()
401+
creation_handler.seen_files.add(fp)
402+
observer.schedule(
403+
handler, path=os.path.dirname(fp), recursive=False
404+
)
387405

388406
observer.start()
389407
return observer

0 commit comments

Comments
 (0)