Skip to content

Commit bb7d383

Browse files
feat: implement config templates and circuit breaker limits
- Added support for serial template merging (default -> specific -> local) - Introduced 'settings' schema for depth, file count, and size limits - Optimized directory traversal by pruning excluded paths in os.walk - Refactored core logic into modular, type-hinted functions
1 parent 3e871ac commit bb7d383

9 files changed

Lines changed: 335 additions & 174 deletions

File tree

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,6 @@ build/
1010

1111
# Virtual environments
1212
.venv
13+
14+
15+
buffer.md

annotator/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# __init__.py

annotator/cli.py

Lines changed: 70 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,86 @@
1-
import argparse
2-
import json
31
import os
42
import sys
3+
import json
4+
import argparse
5+
import pathspec
6+
from typing import Any, Dict, List, Optional
57
from .core import annotate_project
6-
from .defaults import DEFAULT_COMMENT_STYLES, DEFAULT_EXCLUDE_DIRS
78

8-
CONFIG_FILE = ".annotator.json"
9+
CONFIG_NAME: str = ".annotator.json"
10+
TEMPLATES_BASE: str = os.path.join(os.path.dirname(__file__), "templates")
911

1012

11-
def load_config(root):
12-
config_path = os.path.join(root, CONFIG_FILE)
13-
if not os.path.exists(config_path):
13+
def load_json(path: str) -> Dict[str, Any]:
14+
"""Safe JSON loader."""
15+
if not os.path.exists(path):
1416
return {}
1517
try:
16-
with open(config_path, "r", encoding="utf-8") as f:
17-
return json.load(f)
18-
except json.JSONDecodeError as e:
19-
print(f"[ERROR] Invalid JSON in {CONFIG_FILE}: {e}", file=sys.stderr)
20-
return {}
21-
except Exception as e:
22-
print(f"[ERROR] Could not read {CONFIG_FILE}: {e}", file=sys.stderr)
18+
with open(path, "r", encoding="utf-8") as f:
19+
data = json.load(f)
20+
return data if isinstance(data, dict) else {}
21+
except Exception:
2322
return {}
2423

2524

26-
def main():
27-
parser = argparse.ArgumentParser(description="Annotate files with relative paths.")
28-
parser.add_argument("path", nargs="?", default=".", help="Project root (default: current dir)")
25+
def merge_configs(base: Dict[str, Any], overlay: Dict[str, Any]) -> Dict[str, Any]:
26+
"""Merges dictionaries and combines lists uniquely."""
27+
result: Dict[str, Any] = base.copy()
28+
for key, value in overlay.items():
29+
if isinstance(value, dict) and key in result and isinstance(result[key], dict):
30+
result[key] = {**result[key], **value}
31+
elif (
32+
isinstance(value, list) and key in result and isinstance(result[key], list)
33+
):
34+
result[key] = list(set(result[key] + value))
35+
else:
36+
result[key] = value
37+
return result
38+
39+
40+
def get_config(root: str) -> Dict[str, Any]:
41+
"""Loads default, then templates, then local config."""
42+
user_config: Dict[str, Any] = load_json(os.path.join(root, CONFIG_NAME))
43+
44+
final_config: Dict[str, Any] = {}
45+
# Default is always the starting point
46+
template_names: List[str] = user_config.get("templates", ["default"])
47+
48+
for name in template_names:
49+
t_path: str = os.path.join(TEMPLATES_BASE, f"{name}.json")
50+
final_config = merge_configs(final_config, load_json(t_path))
51+
52+
# User's local .annotator.json overrides everything
53+
return merge_configs(final_config, user_config)
54+
55+
56+
def get_spec(root: str) -> Optional[pathspec.PathSpec]:
57+
"""Parses .gitignore using pathspec."""
58+
git_path: str = os.path.join(root, ".gitignore")
59+
if not os.path.exists(git_path):
60+
return None
61+
try:
62+
with open(git_path, "r") as f:
63+
return pathspec.PathSpec.from_lines("gitwildmatch", f)
64+
except Exception:
65+
return None
66+
67+
68+
def main() -> None:
69+
parser = argparse.ArgumentParser(description="Recursive File Annotator")
70+
parser.add_argument("path", nargs="?", default=".", help="Project root")
2971
args = parser.parse_args()
3072

31-
root = os.path.abspath(args.path)
32-
if not os.path.exists(root):
33-
print(f"[ERROR] Path does not exist: {root}", file=sys.stderr)
34-
sys.exit(1)
73+
root: str = os.path.abspath(args.path)
3574
if not os.path.isdir(root):
36-
print(f"[ERROR] Path is not a directory: {root}", file=sys.stderr)
75+
print(f"[ERROR] Directory not found: {root}")
3776
sys.exit(1)
3877

39-
config = load_config(root)
40-
41-
valid_keys = {"comment_styles", "exclude_extensions", "exclude_dirs", "exclude_files"}
42-
for key in config.keys():
43-
if key not in valid_keys:
44-
print(f"[WARN] Unknown key in {CONFIG_FILE}: '{key}'", file=sys.stderr)
45-
46-
comment_styles = DEFAULT_COMMENT_STYLES.copy()
47-
if "comment_styles" in config and isinstance(config["comment_styles"], dict):
48-
comment_styles.update(config["comment_styles"])
49-
50-
exclude_exts = set(config.get("exclude_extensions", []))
51-
exclude_dirs = set(config.get("exclude_dirs", []))
52-
exclude_files = set(config.get("exclude_files", []))
53-
54-
print(f"[INFO] Annotating project at {root}")
55-
annotate_project(
56-
root,
57-
comment_styles,
58-
exclude_exts,
59-
exclude_dirs,
60-
exclude_files
61-
)
62-
print("[INFO] Done.")
78+
config: Dict[str, Any] = get_config(root)
79+
spec: Optional[pathspec.PathSpec] = get_spec(root)
80+
81+
print(f"[START] Root: {root}")
82+
annotate_project(root, config, spec)
83+
84+
85+
if __name__ == "__main__":
86+
main()

annotator/core.py

Lines changed: 84 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,89 +1,116 @@
11
import os
2-
from .defaults import DEFAULT_COMMENT_STYLES, DEFAULT_EXCLUDE_DIRS
2+
from typing import Any, Dict, Optional, Set
33

44

5-
def get_full_extension(filename: str) -> str:
6-
"""Return everything after the first dot, e.g., 'unit.test.js' -> '.test.js'."""
7-
if "." not in filename:
8-
return ""
9-
return filename[filename.index("."):] # keeps the dot
5+
def get_extension(filename: str) -> str:
6+
"""Return the extension after the last dot."""
7+
return os.path.splitext(filename)[1]
108

119

12-
def is_excluded_dir(filepath: str, root: str, exclude_dirs: set) -> bool:
13-
"""Check if file lives in an excluded directory (relative to root)."""
14-
rel_path = os.path.relpath(filepath, root)
15-
parts = rel_path.split(os.sep)
16-
return any(part in exclude_dirs for part in parts)
10+
def is_too_deep(subdir: str, root: str, max_depth: int) -> bool:
11+
"""Check if the current directory exceeds recursion limits."""
12+
rel_path: str = os.path.relpath(subdir, root)
13+
if rel_path == ".":
14+
return False
15+
return len(rel_path.split(os.sep)) > max_depth
1716

1817

19-
def is_excluded_file(filename: str, exclude_files: set) -> bool:
20-
"""Check if filename matches exactly one of the excluded files."""
21-
return filename in exclude_files
18+
def is_too_large(filepath: str, max_kb: int) -> bool:
19+
"""Check if file size exceeds the limit."""
20+
try:
21+
return (os.path.getsize(filepath) / 1024) > max_kb
22+
except OSError:
23+
return True
24+
25+
26+
def is_git_ignored(filepath: str, root: str, spec: Any) -> bool:
27+
"""Check if path matches gitignore rules."""
28+
if not spec:
29+
return False
30+
rel_path: str = os.path.relpath(filepath, root)
31+
return spec.match_file(rel_path)
2232

2333

24-
def is_excluded_ext(filename: str, exclude_exts: set) -> bool:
25-
"""Check if full extension matches excluded extensions."""
26-
ext = get_full_extension(filename)
27-
return ext in exclude_exts
34+
def is_excluded_file(filename: str, config: Dict[str, Any]) -> bool:
35+
"""Check if filename or extension is in the exclusion lists."""
36+
if filename in config.get("exclude_files", []):
37+
return True
38+
if get_extension(filename) in config.get("exclude_extensions", []):
39+
return True
40+
return False
2841

2942

30-
def annotate_file(root, filepath, comment_styles):
31-
rel_path = os.path.relpath(filepath, root)
32-
ext = get_full_extension(os.path.basename(filepath))
43+
def get_prefix(filename: str, config: Dict[str, Any]) -> str:
44+
"""Determine comment style from config; fallback to '#'."""
45+
styles: Dict[str, str] = config.get("comment_styles", {})
46+
ext: str = get_extension(filename)
47+
# Priority: Filename > Extension > Fallback
48+
return styles.get(filename) or styles.get(ext) or "#"
3349

34-
prefix = comment_styles.get(ext)
35-
if not prefix:
36-
return
3750

38-
annotation = f"{prefix} {rel_path}\n"
51+
def apply_annotation(filepath: str, root: str, config: Dict[str, Any]) -> bool:
52+
"""Reads, checks for existing header, and writes the new annotation."""
53+
rel_path: str = os.path.relpath(filepath, root)
54+
prefix: str = get_prefix(os.path.basename(filepath), config)
55+
56+
annotation: str
3957
if prefix in ["<!--", "/*"]:
40-
annotation = f"{prefix} {rel_path} {'-->' if prefix == '<!--' else '*/'}\n"
58+
suffix: str = " -->" if prefix == "<!--" else " */"
59+
annotation = f"{prefix} {rel_path}{suffix}\n"
60+
else:
61+
annotation = f"{prefix} {rel_path}\n"
4162

4263
try:
4364
with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
44-
lines = f.readlines()
45-
except Exception as e:
46-
print(f"[WARN] Skipping {filepath}: {e}")
47-
return
65+
content: str = f.read()
4866

49-
if lines and rel_path in lines[0]:
50-
return
67+
if content.startswith(annotation.strip()):
68+
return False
5169

52-
new_content = [annotation] + lines
53-
try:
5470
with open(filepath, "w", encoding="utf-8") as f:
55-
f.writelines(new_content)
56-
print(f"[OK] Annotated {rel_path}")
71+
f.write(annotation + content)
72+
return True
5773
except Exception as e:
58-
print(f"[ERROR] Failed to write {filepath}: {e}")
59-
74+
print(f"[ERROR] Failed {rel_path}: {e}")
75+
return False
6076

61-
def annotate_project(root=".", comment_styles=None,
62-
exclude_exts=None, exclude_dirs=None, exclude_files=None):
63-
if comment_styles is None:
64-
comment_styles = DEFAULT_COMMENT_STYLES
65-
if exclude_exts is None:
66-
exclude_exts = set()
67-
if exclude_dirs is None:
68-
exclude_dirs = set()
69-
if exclude_files is None:
70-
exclude_files = set()
7177

72-
effective_exclude_dirs = set(DEFAULT_EXCLUDE_DIRS) | set(exclude_dirs)
78+
def annotate_project(
79+
root: str, config: Dict[str, Any], spec: Optional[Any] = None
80+
) -> None:
81+
"""Traverse and annotate based on template/config settings."""
82+
settings: Dict[str, Any] = config.get("settings", {})
83+
max_depth: int = settings.get("max_recursive_depth", 10)
84+
max_files: int = settings.get("max_num_of_files", 1000)
85+
max_kb: int = settings.get("max_file_size_kb", 512)
86+
ex_dirs: Set[str] = set(config.get("exclude_dirs", []))
7387

88+
annotated_count: int = 0
7489
for subdir, dirs, files in os.walk(root):
75-
# skip excluded dirs at traversal level
76-
dirs[:] = [d for d in dirs if d not in effective_exclude_dirs]
90+
# 1. Prune depth
91+
if is_too_deep(subdir, root, max_depth):
92+
dirs[:] = []
93+
continue
94+
95+
# 2. Prune excluded directories (Efficient path exclusion)
96+
dirs[:] = [d for d in dirs if d not in ex_dirs]
7797

7898
for file in files:
79-
filepath = os.path.join(subdir, file)
99+
if annotated_count >= max_files:
100+
print(f"[HALT] Reached file limit: {max_files}")
101+
return
80102

81-
# precedence rules
82-
if is_excluded_dir(filepath, root, effective_exclude_dirs):
103+
filepath: str = os.path.join(subdir, file)
104+
105+
if is_git_ignored(filepath, root, spec):
83106
continue
84-
if is_excluded_file(file, exclude_files):
107+
if is_excluded_file(file, config):
85108
continue
86-
if is_excluded_ext(file, exclude_exts):
109+
if is_too_large(filepath, max_kb):
87110
continue
88111

89-
annotate_file(root, filepath, comment_styles)
112+
if apply_annotation(filepath, root, config):
113+
annotated_count += 1
114+
print(f"[OK] {os.path.relpath(filepath, root)}")
115+
116+
print(f"[DONE] Processed {annotated_count} files.")

annotator/defaults.py

Lines changed: 0 additions & 69 deletions
This file was deleted.

0 commit comments

Comments
 (0)