-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathanalyzer.py
More file actions
481 lines (405 loc) · 18.8 KB
/
analyzer.py
File metadata and controls
481 lines (405 loc) · 18.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
import os
import json
import time
import traceback
import math
from pathlib import Path
from typing import Optional, Dict, Any, List
import concurrent.futures
import threading
from db.operations import store_file, needs_reindex, set_project_metadata_batch, get_project_metadata
from db.vector_operations import (
connect_db as _connect_db,
load_sqlite_vector_extension as _load_sqlite_vector_extension,
ensure_chunks_and_meta as _ensure_chunks_and_meta,
insert_chunk_vector_with_retry as _insert_chunk_vector_with_retry,
get_chunk_text as _get_chunk_text,
)
from .openai import call_coding_api
from .llama_embeddings import OpenAICompatibleEmbedding
from .llama_chunker import chunk_with_llama_index
from llama_index.core import Document
from utils.logger import get_logger
from utils import compute_file_hash, norm, cosine
import logging
# reduce noise from httpx used by external libs
logging.getLogger("httpx").setLevel(logging.WARNING)
# language detection by extension: maps a file suffix to the language name
# stored alongside the file and passed to the chunker.
EXT_LANG = {
    ".py": "python",
    ".js": "javascript",
    ".ts": "typescript",
    ".java": "java",
    ".go": "go",
    ".rs": "rust",
    ".c": "c",
    ".cpp": "cpp",
    ".h": "c",
    ".html": "html",
    ".css": "css",
    ".md": "markdown",
}
# Chunking parameters (tunable)
CHUNK_SIZE = 800  # characters per chunk
CHUNK_OVERLAP = 100  # overlapping characters between chunks
# Maximum number of embedding API calls in flight at once (semaphore bound).
EMBEDDING_CONCURRENCY = 4
# Increase batch size for parallel processing
EMBEDDING_BATCH_SIZE = 16  # Process embeddings in batches for better throughput
PROGRESS_LOG_INTERVAL = 10  # Log progress every N completed files
# Timeout for future.result() must account for retries: (max_retries + 1) × SDK_timeout + buffer
# With SDK timeout of 15s and max_retries=2, this allows 3 × 15s = 45s + 15s buffer = 60s
EMBEDDING_TIMEOUT = 60  # Timeout in seconds for each embedding API call (including retries)
FILE_PROCESSING_TIMEOUT = 300  # Timeout in seconds for processing a single file (5 minutes)
_FILE_EXECUTOR_WORKERS = 4
_EMBEDDING_EXECUTOR_WORKERS = 4
# Shared, module-lifetime thread pools: one for per-file tasks, one for
# per-chunk embedding calls. Never shut down here; they live as long as the process.
_FILE_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=_FILE_EXECUTOR_WORKERS)
_EMBEDDING_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=_EMBEDDING_EXECUTOR_WORKERS)
logger = get_logger(__name__)
# Initialize llama-index embedding client (shared across all worker threads)
_embedding_client = OpenAICompatibleEmbedding()
# Thread-local storage to track execution state inside futures
# (used for diagnostics when an embedding future times out)
_thread_state = threading.local()
def _get_embedding_with_semaphore(semaphore: threading.Semaphore, text: str, file_path: str = "<unknown>", chunk_index: int = 0, model: Optional[str] = None):
    """
    Compute an embedding for *text*, bounded by *semaphore*.

    The semaphore is acquired here, inside the executor worker thread, not in
    the submitting thread — acquiring before submission could deadlock the pool.
    Progress stages are recorded in the thread-local ``_thread_state`` so that
    timeout diagnostics can tell how far a stuck worker got.

    Args:
        semaphore: Caps the number of concurrent embedding API calls.
        text: Chunk text to embed.
        file_path: Source file path — used only for logging/diagnostics.
        chunk_index: Chunk index within the file — diagnostics only.
        model: Accepted but currently unused; the module-level
            ``_embedding_client`` decides the model.
            NOTE(review): confirm whether per-call model selection is intended.

    Returns:
        The embedding produced by the client for *text*.

    Raises:
        Re-raises any exception from the embedding client after logging it.
    """
    # Record per-stage progress for timeout debugging.
    _thread_state.stage = "acquiring_semaphore"
    _thread_state.file_path = file_path
    _thread_state.chunk_index = chunk_index
    _thread_state.start_time = time.time()
    semaphore.acquire()
    try:
        _thread_state.stage = "calling_embed_text"
        # Use llama-index embedding client.
        # NOTE(review): relies on the client's private _get_text_embedding hook.
        result = _embedding_client._get_text_embedding(text)
        _thread_state.stage = "completed"
        return result
    except Exception as e:
        _thread_state.stage = f"exception: {type(e).__name__}"
        _thread_state.exception = str(e)
        logger.error(f"Worker thread exception in embed_text for {file_path} chunk {chunk_index}: {e}")
        raise
    finally:
        # Release even on failure so the concurrency budget is never leaked.
        _thread_state.stage = "releasing_semaphore"
        semaphore.release()
        _thread_state.stage = "finished"
def detect_language(path: str):
    """
    Map a file path to a language name, or "text" for non-code files.

    Certain well-known generated/vendored files (license, virtualenv shims,
    editable-install stubs) are forced to "text" so the indexer skips them.
    Otherwise the extension is looked up in EXT_LANG, defaulting to "text".
    """
    skip_markers = ("LICENSE.md", "__editable__", "_virtualenv.py", "activate_this.py")
    if any(marker in path for marker in skip_markers):
        return "text"
    return EXT_LANG.get(Path(path).suffix.lower(), "text")
# Main synchronous processing for a single file
def _process_file_sync(
    semaphore: threading.Semaphore,
    database_path: str,
    full_path: str,
    rel_path: str,
    cfg: Optional[Dict[str, Any]],
    incremental: bool = True,
    file_num: int = 0,
    total_files: int = 0,
):
    """
    Synchronous per-file pipeline: read, store, chunk, embed, persist vectors.

    Intended to run on a ThreadPoolExecutor worker thread.

    Args:
        semaphore: Bounds concurrent embedding API calls; it is acquired
            inside the embedding worker (not here) to avoid pool deadlock.
        database_path: Path to the SQLite database.
        full_path: Absolute on-disk path of the file.
        rel_path: Path relative to the project root; used as the stored key.
        cfg: Optional config dict; only "embedding_model" is read.
        incremental: When True, files unchanged by mtime + content hash
            are skipped.
        file_num: The current file number being processed (1-indexed);
            informational only.
        total_files: Total number of files to process; informational only.

    Returns:
        dict: {"stored": bool, "embedded": bool, "skipped": bool}
    """
    try:
        # Read file content; unreadable or empty files are reported as not stored.
        try:
            with open(full_path, "r", encoding="utf-8", errors="ignore") as fh:
                content = fh.read()
            # Modification time feeds incremental change detection.
            mtime = os.path.getmtime(full_path)
        except Exception:
            return {"stored": False, "embedded": False, "skipped": False}
        if not content:
            return {"stored": False, "embedded": False, "skipped": False}
        lang = detect_language(rel_path)
        if lang == "text":
            # Plain-text / unsupported files are not indexed.
            return {"stored": False, "embedded": False, "skipped": False}
        # Content hash for change detection.
        file_hash = compute_file_hash(content)
        # Incremental mode: skip files whose mtime and hash are unchanged.
        if incremental and not needs_reindex(database_path, rel_path, mtime, file_hash):
            return {"stored": False, "embedded": False, "skipped": True}
        # Store file (synchronous DB writer) with metadata.
        try:
            fid = store_file(database_path, rel_path, content, lang, mtime, file_hash)
        except Exception:
            logger.exception("Failed to store file %s", rel_path)
            return {"stored": False, "embedded": False, "skipped": False}
        embedding_model = None
        if isinstance(cfg, dict):
            embedding_model = cfg.get("embedding_model")
        # Use llama-index chunking; fall back to one whole-file chunk if the
        # chunker returns nothing (e.g. very small files).
        chunks = chunk_with_llama_index(content, language=lang, chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
        if not chunks:
            chunks = [content]
        # Ensure the vector extension loads and the tables exist before any
        # chunk-vector insert is attempted.
        conn_test = _connect_db(database_path)
        try:
            _load_sqlite_vector_extension(conn_test)
            _ensure_chunks_and_meta(conn_test)
        finally:
            conn_test.close()
        embedded_any = False
        # Collect all chunks up front for batch processing.
        chunk_tasks = []
        for idx, chunk in enumerate(chunks):
            chunk_doc = Document(text=chunk, extra_info={"path": rel_path, "lang": lang, "chunk_index": idx, "chunk_count": len(chunks)})
            chunk_tasks.append((idx, chunk_doc))
        # Process embeddings in parallel batches so one large file cannot
        # monopolize the embedding executor.
        for batch_start in range(0, len(chunk_tasks), EMBEDDING_BATCH_SIZE):
            batch = chunk_tasks[batch_start:batch_start + EMBEDDING_BATCH_SIZE]
            embedding_futures = []
            for idx, chunk_doc in batch:
                # Submit task to executor; semaphore is acquired inside the worker.
                embedding_start_time = time.time()
                future = _EMBEDDING_EXECUTOR.submit(_get_embedding_with_semaphore, semaphore, chunk_doc.text, rel_path, idx, embedding_model)
                embedding_futures.append((idx, chunk_doc, future, embedding_start_time))
            # Wait for the batch to complete and persist results.
            saved_count = 0
            failed_count = 0
            for idx, chunk_doc, future, embedding_start_time in embedding_futures:
                try:
                    emb = future.result(timeout=EMBEDDING_TIMEOUT)  # prevent hanging indefinitely
                except concurrent.futures.TimeoutError:
                    elapsed = time.time() - embedding_start_time
                    # Try to pull exception info off the future if available.
                    future_exception = None
                    try:
                        future_exception = future.exception(timeout=0.1)
                    except concurrent.futures.TimeoutError:
                        future_exception = None  # worker still running
                    except Exception as e:
                        future_exception = e
                    # Build diagnostic information about the stuck future.
                    diagnostic_info = [
                        f"Future timeout ({EMBEDDING_TIMEOUT}s) for {rel_path} chunk {idx}:",
                        f"  - Elapsed time: {elapsed:.2f}s",
                        f"  - Future state: {future._state if hasattr(future, '_state') else 'unknown'}",
                    ]
                    if future_exception:
                        diagnostic_info.append(f"  - Future exception: {type(future_exception).__name__}: {future_exception}")
                    else:
                        diagnostic_info.append("  - Future exception: None (still running or completed)")
                    if future.running():
                        diagnostic_info.append("  - Future.running(): True - worker thread is still executing")
                    elif future.done():
                        diagnostic_info.append("  - Future.done(): True - worker thread completed but future.result() timed out retrieving result")
                    else:
                        diagnostic_info.append("  - Future status: Pending/Unknown")
                    # FIX: these diagnostics were previously built but never
                    # emitted, making embedding timeouts invisible in the logs.
                    logger.error("\n".join(diagnostic_info))
                    emb = None
                    failed_count += 1
                except Exception as e:
                    logger.exception("Embedding retrieval failed for %s chunk %d: %s", rel_path, idx, e)
                    emb = None
                    failed_count += 1
                if emb:
                    try:
                        conn2 = _connect_db(database_path)
                        try:
                            _load_sqlite_vector_extension(conn2)
                            _insert_chunk_vector_with_retry(conn2, fid, rel_path, idx, emb)
                            saved_count += 1
                        finally:
                            conn2.close()
                        embedded_any = True
                    except Exception as e:
                        failed_count += 1
                        logger.error(f"Failed to insert chunk vector for {rel_path} chunk {idx}: {e}")
            # Surface per-batch outcomes when something went wrong.
            if failed_count:
                logger.debug("Batch for %s: %d vectors saved, %d failed", rel_path, saved_count, failed_count)
        return {"stored": True, "embedded": embedded_any, "skipped": False}
    except Exception:
        logger.exception("Failed to process file %s", rel_path)
        return {"stored": False, "embedded": False, "skipped": False}
def analyze_local_path_sync(
    local_path: str,
    database_path: str,
    venv_path: Optional[str] = None,
    max_file_size: int = 200000,
    cfg: Optional[dict] = None,
    incremental: bool = True,
):
    """
    Synchronous implementation of the analysis pipeline.

    Walks *local_path*, submits one `_process_file_sync` task per eligible
    file to the shared `_FILE_EXECUTOR` (in chunks of 256 to bound the number
    of outstanding futures), aggregates per-file results, and records summary
    metadata in the project database.

    Args:
        local_path: Root directory of the project to index.
        database_path: Path to the SQLite database.
        venv_path: Accepted but not used in this function body.
        max_file_size: Files larger than this (bytes) are skipped entirely.
        cfg: Optional config dict forwarded to per-file processing.
        incremental: When True, unchanged files are skipped (see
            `_process_file_sync`).

    Never raises: all failures are logged and swallowed.
    """
    from db.operations import set_project_metadata
    # Single semaphore shared by every file task to cap total concurrent
    # embedding API calls across the whole run.
    semaphore = threading.Semaphore(EMBEDDING_CONCURRENCY)
    start_time = time.time()
    # Store project path in metadata for filesystem access.
    try:
        set_project_metadata(database_path, "project_path", local_path)
        logger.info(f"Starting indexing for project at: {local_path}")
    except Exception as e:
        logger.warning(f"Failed to store project path in metadata: {e}")
    try:
        file_count = 0
        emb_count = 0
        skipped_count = 0
        file_paths: List[Dict[str, str]] = []
        # Collect files to process, skipping unreadable or oversized ones.
        logger.info("Collecting files to index...")
        for root, dirs, files in os.walk(local_path):
            for fname in files:
                full = os.path.join(root, fname)
                rel = os.path.relpath(full, local_path)
                try:
                    size = os.path.getsize(full)
                    if size > max_file_size:
                        continue
                except Exception:
                    continue
                file_paths.append({"full": full, "rel": rel})
        total_files = len(file_paths)
        logger.info(f"Found {total_files} files to process")
        # Thread-safe counters: [submitted_count, completed_count, lock]
        counters = [0, 0, threading.Lock()]
        # Process files in chunks to avoid too many futures at once.
        CHUNK_SUBMIT = 256
        for chunk_start in range(0, len(file_paths), CHUNK_SUBMIT):
            chunk = file_paths[chunk_start : chunk_start + CHUNK_SUBMIT]
            futures = []
            for f in chunk:
                # Increment the submitted counter before starting file processing.
                with counters[2]:
                    counters[0] += 1
                    file_num = counters[0]
                fut = _FILE_EXECUTOR.submit(
                    _process_file_sync,
                    semaphore,
                    database_path,
                    f["full"],
                    f["rel"],
                    cfg,
                    incremental,
                    file_num,
                    total_files,
                )
                futures.append(fut)
            # Consume results for this submission chunk as they finish.
            for fut in concurrent.futures.as_completed(futures):
                try:
                    r = fut.result(timeout=FILE_PROCESSING_TIMEOUT)
                    # Increment completed counter and decide on periodic logging.
                    with counters[2]:
                        counters[1] += 1
                        completed_count = counters[1]
                        should_log = completed_count % PROGRESS_LOG_INTERVAL == 0
                    if isinstance(r, dict):
                        if r.get("stored"):
                            file_count += 1
                        if r.get("embedded"):
                            emb_count += 1
                        if r.get("skipped"):
                            skipped_count += 1
                    # Log periodic progress updates (every PROGRESS_LOG_INTERVAL files).
                    if should_log:
                        logger.info(f"Progress: {completed_count}/{total_files} files processed ({file_count} stored, {emb_count} with embeddings, {skipped_count} skipped)")
                except concurrent.futures.TimeoutError:
                    logger.error(f"File processing timeout ({FILE_PROCESSING_TIMEOUT}s exceeded)")
                    with counters[2]:
                        counters[1] += 1
                except Exception:
                    logger.exception("A per-file task failed")
        # Store indexing metadata.
        end_time = time.time()
        duration = end_time - start_time
        # Log summary.
        logger.info(f"Indexing completed: {file_count} files processed, {emb_count} embeddings created, {skipped_count} files skipped in {duration:.2f}s")
        try:
            # Use batch update for efficiency - single database transaction.
            # Store total_files for performance (avoid re-scanning directory on every request).
            set_project_metadata_batch(database_path, {
                "last_indexed_at": time.strftime("%Y-%m-%d %H:%M:%S"),
                "last_index_duration": str(duration),
                "files_indexed": str(file_count),
                "files_skipped": str(skipped_count),
                "total_files": str(total_files)  # Store total files found during indexing
            })
        except Exception:
            logger.exception("Failed to store indexing metadata")
        # Store uv_detected.json metadata if possible.
        # NOTE(review): uv_info reduces to local_path itself; presumably a
        # placeholder for real uv environment detection — confirm intent.
        uv_info = None
        try:
            uv_info = None if local_path is None else local_path
        except Exception:
            uv_info = None
        try:
            # Metadata storage is non-critical, ignore return value.
            # NOTE(review): store_file is called with 4 args here vs 6 in
            # _process_file_sync — assumes mtime/hash have defaults; verify.
            _ = store_file(
                database_path,
                "uv_detected.json",
                json.dumps(uv_info, indent=2),
                "meta",
            )
        except Exception:
            pass
    except Exception:
        logger.exception("Analysis failed for %s", local_path)
def analyze_local_path_background(local_path: str, database_path: str, venv_path: Optional[str] = None, max_file_size: int = 200000, cfg: Optional[dict] = None):
    """
    Background-task entry point for the analyzer.

    Intended to be scheduled by FastAPI BackgroundTasks:
        background_tasks.add_task(analyze_local_path_background, local_path, database_path, venv_path, max_file_size, cfg)

    Delegates to analyze_local_path_sync and logs (rather than propagates)
    any failure, since no caller is waiting on the task's result.
    """
    try:
        analyze_local_path_sync(
            local_path,
            database_path,
            venv_path=venv_path,
            max_file_size=max_file_size,
            cfg=cfg,
        )
    except Exception:
        logger.exception("Background analysis worker failed for %s", local_path)
def search_semantic(query: str, database_path: str, top_k: int = 5):
    """
    Retrieve the best-matching chunks via llama-index over the sqlite-vector backend.

    Chunk content is always included, since the coding model needs it as context.

    Args:
        query: Search query text.
        database_path: Path to the SQLite database.
        top_k: Number of results to return.

    Returns:
        List of dicts with file_id, path, chunk_index, score, and content.

    Raises:
        Re-raises any underlying search failure after logging it.
    """
    try:
        from .llama_integration import llama_index_search

        def _as_result(doc):
            # Translate one llama-index document into the API result shape.
            md = doc.metadata or {}
            return {
                "file_id": md.get("file_id", 0),
                "path": md.get("path", ""),
                "chunk_index": md.get("chunk_index", 0),
                "score": md.get("score", 0.0),
                "content": doc.text or "",  # always included for LLM context
            }

        hits = [_as_result(doc) for doc in llama_index_search(query, database_path, top_k=top_k)]
        logger.info(f"llama-index search returned {len(hits)} results")
        return hits
    except Exception as e:
        logger.exception(f"Semantic search failed: {e}")
        raise
def call_coding_model(prompt: str, context: str = ""):
    """Send *prompt* to the coding API, prepending *context* when provided."""
    if context:
        payload = f"Context:\n{context}\n\nPrompt:\n{prompt}"
    else:
        payload = prompt
    return call_coding_api(payload)