Skip to content

Commit b586c2d

Browse files
rustyconover and claude committed
Add access log enrichment: request_data, stream_id, auth, error_message
Enrich vgi_rpc.access log entries with full call context:

- request_data: base64 Arrow IPC bytes of the request batch, captured in _read_request() before deserialization. Contains all call parameters.
- stream_id: UUID generated per stream dispatch, carried in the HTTP state token (v3 format) for cross-request correlation.
- authenticated: bool from AuthContext (was missing).
- claims: dict from AuthContext JWT/OAuth claims (was missing).
- error_message: str(exc)[:500] on error entries (was missing).
- server_version: static string passed to RpcServer at construction.
- request_state/response_state: base64 state token bytes for HTTP exchange streams (inbound and outbound).

Infrastructure:

- _current_request_batch and _current_stream_id contextvars in _common.py
- _AccessLogContextMiddleware resets contextvars between HTTP requests
- Token format bumped v2→v3 to carry stream_id
- _produce_stream_response returns error_message in 3-tuple
- access_log_conformance.py: standalone CLI validator for cross-language conformance testing of access log output

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 596e2aa commit b586c2d

5 files changed

Lines changed: 345 additions & 20 deletions

File tree

vgi_rpc/access_log_conformance.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
"""Access log conformance validator for VGI servers.
2+
3+
Reads JSON log lines from ``vgi_rpc.access``, applies per-method-type
4+
rules, and reports violations. Language-agnostic — any VGI server
5+
implementation dumps its access logs, this tool checks them.
6+
7+
Usage::
8+
9+
python -m vgi_rpc.access_log_conformance /tmp/vgi-http-test-server.log
10+
cat server.log | python -m vgi_rpc.access_log_conformance
11+
12+
Exit code 0 if all entries pass, 1 if any violations are found.
13+
"""
14+
15+
from __future__ import annotations
16+
17+
import json
18+
import sys
19+
from dataclasses import dataclass
20+
from pathlib import Path
21+
22+
23+
@dataclass(frozen=True)
class Violation:
    """One conformance rule broken by one access log entry.

    Instances are immutable and compare by value.
    """

    # Position of the offending entry in the list that was validated.
    entry_index: int
    # Value of the entry's "method" key (empty string when absent).
    method: str
    # Name of the log field the rule is about.
    field: str
    # Human-readable description of the rule that was not satisfied.
    rule: str
31+
32+
33+
# ---------------------------------------------------------------------------
34+
# Rules
35+
# ---------------------------------------------------------------------------
36+
37+
# Fields required on every access log entry regardless of method.
38+
_ALWAYS_REQUIRED = ["server_version", "authenticated"]
39+
40+
# Fields required on entries that carry request parameters.
41+
# These methods go through _read_request() which stashes the raw batch.
42+
_REQUEST_DATA_METHODS = {"bind", "init", "table_function_cardinality"}
43+
44+
# Stream methods that must have a stream_id.
45+
_STREAM_METHODS = {"init"}
46+
47+
48+
def _is_catalog_method(method: str) -> bool:
49+
return method.startswith("catalog_")
50+
51+
52+
def validate_access_logs(entries: list[dict[str, object]]) -> list[Violation]:
    """Check parsed access log entries against the conformance rules.

    Args:
        entries: Parsed JSON dicts from ``vgi_rpc.access`` log lines.

    Returns:
        Every violation found, ordered by entry index and, within one entry,
        by the order the rules are applied below. Empty when all entries
        conform.

    """
    found: list[Violation] = []

    for index, record in enumerate(entries):
        name = str(record.get("method", ""))
        outcome = str(record.get("status", ""))
        kind = str(record.get("method_type", ""))

        is_catalog = _is_catalog_method(name)
        is_stream_start = name in _STREAM_METHODS
        lacks_stream_id = not record.get("stream_id")

        # Rule 1: keys that must be present on every entry. Presence is
        # tested rather than truthiness, so a value such as False is accepted.
        found.extend(
            Violation(index, name, key, "required on all entries")
            for key in _ALWAYS_REQUIRED
            if key not in record
        )

        # Rule 2: dispatch methods must carry a non-empty request_data.
        if (name in _REQUEST_DATA_METHODS or is_catalog) and not record.get("request_data"):
            found.append(Violation(index, name, "request_data", "required on dispatch methods"))

        # Rule 3: the stream-opening methods must carry a stream_id.
        if is_stream_start and lacks_stream_id:
            found.append(Violation(index, name, "stream_id", "required on stream methods"))

        # Rule 4: HTTP exchange/produce continuations, i.e. stream-typed
        # entries that are neither a stream-opening method nor a catalog
        # method. They should have stream_id from the state token.
        is_continuation = kind == "stream" and not (is_stream_start or is_catalog)
        if is_continuation and lacks_stream_id:
            found.append(
                Violation(index, name, "stream_id", "required on stream continuations (from state token)")
            )

        # Rule 5: entries with status "error" must explain the failure.
        if outcome == "error" and not record.get("error_message"):
            found.append(Violation(index, name, "error_message", "required on error entries"))

    return found
98+
99+
100+
# ---------------------------------------------------------------------------
101+
# CLI
102+
# ---------------------------------------------------------------------------
103+
104+
105+
def _parse_json_log_lines(source: list[str]) -> list[dict[str, object]]:
106+
"""Parse JSON log lines, skipping non-JSON lines."""
107+
entries: list[dict[str, object]] = []
108+
for line in source:
109+
line = line.strip()
110+
if not line:
111+
continue
112+
try:
113+
obj = json.loads(line)
114+
if isinstance(obj, dict):
115+
entries.append(obj)
116+
except json.JSONDecodeError:
117+
continue
118+
return entries
119+
120+
121+
def _filter_access_logs(entries: list[dict[str, object]]) -> list[dict[str, object]]:
122+
"""Keep only vgi_rpc.access entries."""
123+
return [e for e in entries if e.get("logger") == "vgi_rpc.access"]
124+
125+
126+
def main(argv: list[str] | None = None) -> int:
    """Run the conformance validator from the command line.

    Args:
        argv: Command-line arguments without the program name. When ``None``,
            ``sys.argv[1:]`` is used. The first argument is the log file to
            read; ``-`` or no argument reads standard input; ``--help`` or
            ``-h`` prints the module docstring.

    Returns:
        ``0`` when every access log entry passes or help was requested;
        ``1`` when violations are found, when the input holds no
        ``vgi_rpc.access`` entries, or when the input file cannot be read.

    """
    args = argv if argv is not None else sys.argv[1:]
    target = args[0] if args else "-"

    if target in ("--help", "-h"):
        print(__doc__ or "")
        return 0

    if target == "-":
        lines = sys.stdin.read().splitlines()
    else:
        path = Path(target)
        try:
            # JSON logs are UTF-8; do not depend on the locale's default
            # encoding. Undecodable bytes are replaced rather than raised so
            # that one bad non-JSON line cannot abort the whole run.
            text = path.read_text(encoding="utf-8", errors="replace")
        except FileNotFoundError:
            print(f"ERROR: File not found: {path}", file=sys.stderr)
            return 1
        except OSError as exc:
            # Directory, permission problem, etc.: report it instead of
            # crashing with a traceback.
            print(f"ERROR: Cannot read {path}: {exc}", file=sys.stderr)
            return 1
        lines = text.splitlines()

    all_entries = _parse_json_log_lines(lines)
    access_logs = _filter_access_logs(all_entries)

    if not access_logs:
        # An empty run is treated as a failure so a misconfigured logger
        # cannot pass the check vacuously.
        print("WARNING: No vgi_rpc.access entries found in input.", file=sys.stderr)
        return 1

    violations = validate_access_logs(access_logs)

    if violations:
        print(f"FAIL: {len(access_logs)} entries validated, {len(violations)} violations")
        for v in violations:
            # entry_index comes from enumerating access_logs, so it is
            # always a valid index into that list.
            entry = access_logs[v.entry_index]
            status = ", status=error" if entry.get("status") == "error" else ""
            print(f"  entry {v.entry_index} (method={v.method}{status}): {v.rule} — missing '{v.field}'")
        return 1

    print(f"PASS: {len(access_logs)} entries validated, 0 violations")
    return 0
165+
166+
167+
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())

0 commit comments

Comments
 (0)