Skip to content

Commit db9bf45

Browse files
Essoz authored and claude committed
fix: resolve online checker crashes and add dynamic trace file detection
- Use .get() for args/kwargs (PRE) and return_values/exception (POST) when populating pt_map, since functions instrumented without argument capture (e.g. Adadelta.step) omit these fields — previously caused KeyError that silently dropped the record and prevented APIContainRelation from triggering - Add FolderCreationHandler that watches each trace folder and dynamically attaches a StreamLogHandler for any trace_*/proxy_log.json file created after the checker starts, fixing the checker getting stuck when training had not yet created trace files at startup time - Set float('inf') sentinel in read_time_map after _save_initial_content so files with no live updates don't block min_read_time indefinitely - Rename _get_api_args_map_to_check → all_needed_args_api in Checker_data and sort_inv_file for clarity Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 6ae7c8e commit db9bf45

2 files changed

Lines changed: 79 additions & 34 deletions

File tree

traincheck/onlinechecker/streamhandler_filesystem.py

Lines changed: 77 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import re
55
import time
66

7-
from watchdog.events import FileSystemEventHandler
7+
from watchdog.events import FileCreatedEvent, FileSystemEventHandler
88
from watchdog.observers.polling import PollingObserver
99

1010
from traincheck.config import config
@@ -43,7 +43,7 @@ def __init__(self, file_path, checker_data: Checker_data):
4343

4444
self.needed_vars = checker_data.needed_vars
4545
self.needed_apis = checker_data.needed_apis
46-
self._get_api_args_map_to_check = checker_data._get_api_args_map_to_check
46+
self.all_needed_args_api = checker_data.all_needed_args_api
4747

4848
self.min_read_time = checker_data.min_read_time
4949
self.lock = checker_data.lock
@@ -61,11 +61,22 @@ def _save_initial_content(self):
6161
self.logger.info(f"Processing initial content from {self.file_path}")
6262
self.fp.seek(0)
6363
lines = self.fp.readlines()
64-
if not lines:
65-
return
6664

67-
self._handle_line(lines)
68-
self.logger.info(f"Initial content from {self.file_path} processed.")
65+
if lines:
66+
self._handle_line(lines)
67+
self.logger.info(f"Initial content from {self.file_path} processed.")
68+
69+
# Mark this file as fully read so it doesn't block other files that
70+
# have records at later timestamps. Live on_modified updates override
71+
# this when new records actually arrive.
72+
with self.cond:
73+
self.checker_data.read_time_map[self.file_path] = float("inf")
74+
pre_min = self.checker_data.min_read_time
75+
self.checker_data.min_read_path, self.checker_data.min_read_time = min(
76+
self.checker_data.read_time_map.items(), default=(None, None)
77+
)
78+
if pre_min != self.checker_data.min_read_time:
79+
self.checker_data.cond.notify_all()
6980

7081
def on_modified(self, event):
7182
if os.path.abspath(event.src_path) != os.path.abspath(self.file_path):
@@ -181,33 +192,37 @@ def _set_func_map(self, trace_record):
181192
)
182193
if trace_type == TraceLineType.FUNC_CALL_PRE:
183194
self.pt_map[ptname][func_call_id].pre_record = trace_record
184-
self.pt_map[ptname][func_call_id].args = trace_record["args"]
185-
self.pt_map[ptname][func_call_id].kwargs = trace_record["kwargs"]
195+
self.pt_map[ptname][func_call_id].args = trace_record.get("args")
196+
self.pt_map[ptname][func_call_id].kwargs = trace_record.get(
197+
"kwargs"
198+
)
186199
elif trace_type == TraceLineType.FUNC_CALL_POST:
187200
assert self.pt_map[ptname][func_call_id].pre_record is not None
188201
self.pt_map[ptname][func_call_id].post_record = trace_record
189-
self.pt_map[ptname][func_call_id].return_values = trace_record[
202+
self.pt_map[ptname][func_call_id].return_values = trace_record.get(
190203
"return_values"
191-
]
204+
)
192205
elif trace_type == TraceLineType.FUNC_CALL_POST_EXCEPTION:
193206
self.pt_map[ptname][func_call_id].post_record = trace_record
194-
self.pt_map[ptname][func_call_id].exception = trace_record[
207+
self.pt_map[ptname][func_call_id].exception = trace_record.get(
195208
"exception"
196-
]
209+
)
197210

198211
if trace_type == TraceLineType.FUNC_CALL_PRE:
199-
if function_name in self.checker_data._get_api_args_map_to_check:
200-
if "args" in trace_record:
201-
if "meta_vars.step" not in trace_record:
202-
trace_record["meta_vars.step"] = -1
203-
step = trace_record["meta_vars.step"]
204-
if function_name not in self.args_map:
205-
self.args_map[function_name] = {}
206-
if step not in self.args_map[function_name]:
207-
self.args_map[function_name][step] = {}
208-
if ptid not in self.args_map[function_name][step]:
209-
self.args_map[function_name][step][ptid] = []
210-
self.args_map[function_name][step][ptid].append(trace_record)
212+
if function_name in self.checker_data.all_needed_args_api:
213+
assert (
214+
"args" in trace_record and "kwargs" in trace_record
215+
), f"Trace record for function call {function_name} does not contain args or kwargs: {trace_record}"
216+
if "meta_vars.step" not in trace_record:
217+
trace_record["meta_vars.step"] = -1
218+
step = trace_record["meta_vars.step"]
219+
if function_name not in self.args_map:
220+
self.args_map[function_name] = {}
221+
if step not in self.args_map[function_name]:
222+
self.args_map[function_name][step] = {}
223+
if ptid not in self.args_map[function_name][step]:
224+
self.args_map[function_name][step][ptid] = []
225+
self.args_map[function_name][step][ptid].append(trace_record)
211226

212227
if (
213228
".__enter__" in function_name
@@ -310,29 +325,59 @@ def _set_read_time(self, trace_record):
310325
self.checker_data.cond.notify_all()
311326

312327

328+
class FolderCreationHandler(FileSystemEventHandler):
329+
"""Watches a trace folder and dynamically attaches StreamLogHandler for new trace files."""
330+
331+
def __init__(self, trace_folder, checker_data: Checker_data, observer):
332+
self.trace_folder = os.path.abspath(trace_folder)
333+
self.checker_data = checker_data
334+
self.observer = observer
335+
self.seen_files: set[str] = set()
336+
self.logger = logging.getLogger(__name__)
337+
338+
def _is_trace_file(self, filename: str) -> bool:
339+
return filename.startswith("trace_") or filename.endswith("proxy_log.json")
340+
341+
def attach(self, file_path: str):
342+
"""Create and schedule a StreamLogHandler for a newly discovered file."""
343+
if file_path in self.seen_files:
344+
return
345+
self.seen_files.add(file_path)
346+
self.logger.info(f"New trace file detected, watching: {file_path}")
347+
handler = StreamLogHandler(file_path, self.checker_data)
348+
self.observer.schedule(
349+
handler, path=os.path.dirname(file_path), recursive=False
350+
)
351+
352+
def on_created(self, event):
353+
if isinstance(event, FileCreatedEvent):
354+
filename = os.path.basename(event.src_path)
355+
if self._is_trace_file(filename):
356+
self.attach(os.path.abspath(event.src_path))
357+
358+
313359
def run_stream_monitor(traces, trace_folders, checker_data: Checker_data):
314360
"""Run the stream monitor to watch the trace files and folders."""
315361
logger = logging.getLogger(__name__)
316362
observer = PollingObserver()
317-
handlers = []
318363
if traces is not None:
319364
file_path = os.path.abspath(traces[0])
320365
handler = StreamLogHandler(file_path, checker_data)
321-
handlers.append(handler)
322366
watch_dir = os.path.dirname(file_path)
323367
observer.schedule(handler, path=watch_dir, recursive=False)
324368
logger.info(f"Watching: {file_path}")
325369

326370
if trace_folders is not None:
327371
for trace_folder in trace_folders:
372+
folder_abs = os.path.abspath(trace_folder)
373+
creation_handler = FolderCreationHandler(folder_abs, checker_data, observer)
374+
observer.schedule(creation_handler, path=folder_abs, recursive=False)
375+
376+
# Pick up any trace files that already exist in the folder.
328377
for file in sorted(os.listdir(trace_folder)):
329378
if file.startswith("trace_") or file.endswith("proxy_log.json"):
330-
file_path = os.path.join(trace_folder, file)
331-
handler = StreamLogHandler(file_path, checker_data)
332-
handlers.append(handler)
333-
watch_dir = os.path.dirname(file_path)
334-
observer.schedule(handler, path=watch_dir, recursive=False)
335-
logger.info(f"Watching: {file_path}")
379+
file_path = os.path.join(folder_abs, file)
380+
creation_handler.attach(file_path)
336381

337382
observer.start()
338383
return observer

traincheck/onlinechecker/utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ class Checker_data:
99
"""Data structure for online checker threads. Holds the needed data and the queue for processing."""
1010

1111
def __init__(self, needed_data):
12-
needed_vars, needed_apis, _get_api_args_map_to_check = needed_data
12+
needed_vars, needed_apis, all_needed_args_api = needed_data
1313
self.needed_vars = needed_vars
1414
self.needed_apis = needed_apis
15-
self._get_api_args_map_to_check = _get_api_args_map_to_check
15+
self.all_needed_args_api = all_needed_args_api
1616

1717
self.check_queue = queue.Queue()
1818
self.varid_map = {}

0 commit comments

Comments (0)