Version: 0.3.0 · Status: Living document · Last Updated: 2026-03-31 · License: Apache-2.0
- Introduction
- System Context & Data Flow
- Architecture Overview
- Module Design
- Data Model
- Query Architecture
- Evaluation Framework
- LLM Execution Strategy
- Async/Sync Boundary Design
- Extensibility & Plugin Points
- Error Handling Philosophy
- Testing Strategy
- Security Considerations
- Future Directions
The BigQuery Agent Analytics SDK is the consumption layer for agent observability at scale. It reads agent execution traces stored in BigQuery and provides a Python toolkit for trace reconstruction, deterministic and semantic evaluation, drift detection, insights generation, anomaly detection, and long-horizon agent memory.
AI agents built on the Agent Development Kit (ADK) generate rich telemetry — LLM calls, tool invocations, state changes, errors, and latency data — that is logged to BigQuery via the BigQuery Agent Analytics Plugin. However, raw event rows in BigQuery are insufficient for:
- Debugging: Reconstructing the causal chain of events within a session
- Evaluation: Systematically assessing agent quality against deterministic and semantic criteria
- Monitoring: Detecting production drift, anomalies, and regression
- Curation: Building and managing evaluation datasets with lifecycle governance
This SDK bridges the gap between raw telemetry and actionable agent analytics.
| Principle | Manifestation |
|---|---|
| BigQuery-native | Pushes computation to BigQuery via SQL wherever possible; minimizes data transfer |
| Graceful degradation | Optional dependencies (google-genai, bigframes) degrade features, never crash |
| Async-first, sync-friendly | Core pipeline is async for concurrency; Client exposes sync methods for simplicity |
| Zero infrastructure | No additional services needed — BigQuery is the only backend |
| Injection over inheritance | BigQuery client is injectable for testing; no subclass-based extension |
| Builder over config | Fluent method chaining over complex configuration objects |
┌──────────────────────────────────────────────────────────┐
│ Agent Runtime (ADK) │
│ │
│ LlmAgent ─── Runner ─── SessionService │
│ │ │
│ BigQueryAgentAnalyticsPlugin │
│ (production layer) │
│ │ │
│ BQ Storage Write API ─── PyArrow serialization │
└─────────────────┬────────────────────────────────────────┘
│ Writes events
▼
┌──────────────────────────────────────────────────────────┐
│ BigQuery (agent_events) │
│ │
│ Partitioned by DATE(timestamp) │
│ Clustered by event_type, agent, user_id │
└─────────────────┬────────────────────────────────────────┘
│ Reads events
▼
┌──────────────────────────────────────────────────────────┐
│ BigQuery Agent Analytics SDK │
│ (consumption layer) │
│ │
│ Client ─── Trace ─── Evaluators ─── Insights │
│ Memory ─── AI/ML ─── Feedback ─── EvalSuite │
└──────────────────────────────────────────────────────────┘
The SDK does not write agent events to BigQuery. That responsibility belongs entirely to the ADK plugin. The SDK reads event data, and optionally writes derived artifacts (evaluation results, embeddings indexes) to separate tables.
The BigQueryAgentAnalyticsPlugin (part of google-adk, not this SDK) captures agent lifecycle events and writes them to BigQuery asynchronously:
- Transport: BigQuery Storage Write API with PyArrow serialization for high-throughput, low-latency writes
- Batching: Configurable `batch_size` and `batch_flush_interval` with an async queue (`queue_max_size` up to 10,000)
- Tracing: OpenTelemetry integration populates `trace_id`, `span_id`, and `parent_span_id` when a `TracerProvider` is configured; falls back to internal UUIDs otherwise
- Content handling: `HybridContentParser` + `GCSOffloader` offload large payloads (>500KB default) to GCS, storing an `ObjectRef` in `content_parts`
- Safety: `_safe_callback` decorator wraps every callback to swallow plugin errors, preventing observability failures from affecting agent execution
- Filtering: `event_allowlist`/`event_denylist` for selective logging; `content_formatter` for redaction
Event types logged:
| Category | Events |
|---|---|
| User interaction | USER_MESSAGE_RECEIVED |
| Agent lifecycle | AGENT_STARTING, AGENT_COMPLETED, INVOCATION_STARTING, INVOCATION_COMPLETED |
| LLM operations | LLM_REQUEST, LLM_RESPONSE, LLM_ERROR |
| Tool operations | TOOL_STARTING, TOOL_COMPLETED, TOOL_ERROR |
| State management | STATE_DELTA |
The SDK reads events through the standard BigQuery client library (google-cloud-bigquery) using parameterized SQL queries. It performs the following operations:
BigQuery
│
┌───────────┼───────────────────────┐
│ │ │
Standard SQL AI.GENERATE ML.DETECT_ANOMALIES
(trace, eval) (LLM judge, (ARIMA, Autoencoder)
facets, insights)
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────┐
│ SDK Python Processing │
│ │
│ Trace reconstruction ─── DAG tree building │
│ Score computation ─── Report aggregation │
│ Trajectory matching ─── Drift analysis │
│ Context selection ─── Memory ranking │
└─────────────────────────────────────────────┘
As demonstrated in the e2e demo:
Phase 1 — Trace Generation:
- An ADK `LlmAgent` with tools is created and wired to a `Runner`
- `BigQueryAgentAnalyticsPlugin` is attached as a plugin
- User messages are sent through `runner.run_async()`
- The plugin captures every event callback and writes to BigQuery
- `plugin.flush()` ensures all buffered events are written
Phase 2 — Evaluation:
- `Client.get_trace()` retrieves all events for a session
- `SystemEvaluator` preset factories assess latency, turn count, error rate, and token efficiency
- `PerformanceEvaluator` evaluates trajectory and LLM-judged performance metrics
Phase 3 — Insights:
- `Client.insights()` triggers the multi-stage pipeline
- Session metadata is aggregated from BigQuery
- Per-session facets are extracted via `AI.GENERATE` with structured output
- Seven analysis prompts generate specialized reports
- An executive summary synthesizes all findings
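Taken together, the consumption side of this flow fits in a few lines. A minimal sketch, assuming the package import path (hypothetical here) and the `Client` constructor described later:

```python
from bigquery_agent_analytics import Client  # hypothetical import path

client = Client(project_id="my-project", dataset_id="agent_analytics")

# Phase 2 — Evaluation: reconstruct and inspect one session's trace.
trace = client.get_trace(session_id="sess-001")
print(trace.render())

# Phase 3 — Insights: run the multi-stage analysis pipeline over all sessions.
report = client.insights()
print(report.executive_summary)
```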
┌─────────┐
│ Client │ (entry point)
└────┬────┘
┌─────────┬───┴───┬──────────┐
▼ ▼ ▼ ▼
┌────────┐ ┌──────┐ ┌────────┐ ┌──────────┐
│ trace │ │eval- │ │feedback│ │ insights │
│ │ │uators│ │ │ │ │
└────────┘ └──┬───┘ └────────┘ └────┬─────┘
│ │
┌────┘ │
▼ ▼
┌──────────────┐ ┌────────────┐
│grader_pipeline│ │ evaluators │
└──────────────┘ └────────────┘
┌────────────────┐ ┌────────────┐ ┌──────────────┐
│trace_evaluator │ │ eval_suite │ │eval_validator│
└───────┬────────┘ └──────┬─────┘ └──────┬───────┘
│ │ │
▼ │ ▼
┌──────────────┐ │ ┌──────────┐
│ multi_trial │ └──────────►│eval_suite│
└──────────────┘            └──────────┘
┌──────────────────┐ ┌───────────────────┐ ┌─────────────────────┐
│ memory_service │ │ ai_ml_integration │ │bigframes_evaluator │
│ (requires ADK) │ │ │ │(requires bigframes) │
└──────────────────┘ └───────────────────┘ └─────────────────────┘
┌──────────────────┐ ┌───────────────────┐ ┌──────────────────────┐
│event_semantics │ │ views │ │ context_graph │
│(canonical helpers│ │(per-event BQ views│ │(Property Graph, GQL, │
└──────────────────┘ └───────────────────┘ │ world-change detect) │
└──────────────────────┘
┌──────────────────────┐ ┌──────────────────────┐ ┌──────────────────┐
│ categorical_evaluator│ │ ontology_* (6 modules)│ │ cli │
│ categorical_views │ │ (YAML → AI.GENERATE → │ │ (Typer commands) │
│ (label evaluation) │ │ tables → PG → GQL) │ │ │
└──────────────────────┘ └──────────────────────┘ └──────────────────┘
┌──────────────────┐ ┌───────────────────┐
│ udf_kernels │ │ serialization │
│ udf_sql_templates│ │ formatter │
└──────────────────┘ └───────────────────┘
| Layer | Modules | Responsibility |
|---|---|---|
| Entry Point | `client.py` | High-level sync API, BigQuery query orchestration |
| Core Data | `trace.py` | Trace/Span reconstruction, DAG rendering, filtering |
| Evaluation Engine | `system_evaluator.py`, `performance_evaluator.py`, `multi_trial_performance_evaluator.py`, `aggregate_grader.py` | Deterministic metrics, LLM-as-judge, trajectory matching, multi-trial statistics, grader composition |
| Categorical Evaluation | `categorical_evaluator.py`, `categorical_views.py` | User-defined categorical classification with AI.GENERATE + Gemini fallback, dashboard views with dedup |
| Eval Governance | `eval_suite.py`, `eval_validator.py` | Task lifecycle management, static quality validation |
| Feedback & Insights | `feedback.py`, `insights.py` | Drift detection, question distribution, multi-stage analysis pipeline |
| AI/ML | `ai_ml_integration.py`, `bigframes_evaluator.py` | BigQuery AI.GENERATE, embeddings, anomaly detection, DataFrame API |
| Memory | `memory_service.py` | Cross-session context retrieval, semantic search, user profiling |
| Context Graph (V2/V3) | `context_graph.py` | Property Graph linking traces to business entities, BizNode extraction via AI.GENERATE, GQL traversal, world-change detection, decision semantics |
| Ontology Graph (V4) | `ontology_models.py`, `ontology_schema_compiler.py`, `ontology_graph.py`, `ontology_materializer.py`, `ontology_property_graph.py`, `ontology_orchestrator.py` | Configuration-driven graph pipeline: YAML spec loading, AI extraction, physical table routing, Property Graph DDL transpilation, GQL showcase |
| CLI | `cli.py` | Command-line interface: traces, evaluation, categorical evaluation, insights, drift, views, ontology pipeline |
| Interfaces | `serialization.py`, `formatter.py` | JSON serialization for CLI/Remote Function boundaries, output formatting (JSON/text/table) |
| UDF Kernels | `udf_kernels.py`, `udf_sql_templates.py` | Pure analytical kernels for BigQuery Python UDFs, SQL templates for UDF registration |
| Utilities | `event_semantics.py`, `views.py` | Canonical event type predicates, per-event-type BigQuery view management |
| Package | `__init__.py` | Graceful optional import aggregation |
Decision 1: BigQuery as the sole backend. All data lives in BigQuery. The SDK does not introduce Redis, PostgreSQL, or any other storage. This keeps the operational footprint minimal — if you have BigQuery, you have everything you need.
Decision 2: SQL-first computation.
Aggregations, filtering, joins, and even LLM evaluation (via AI.GENERATE) are pushed down to BigQuery. Python-side processing is reserved for tasks that cannot be expressed in SQL: tree reconstruction, trajectory matching algorithms, report formatting, and metric composition.
Decision 3: Three-tier LLM execution.
LLM-based evaluation can run via (1) BigQuery AI.GENERATE, (2) legacy BigQuery ML ML.GENERATE_TEXT, or (3) the Gemini API directly. This maximizes compatibility across different GCP configurations.
Decision 4: Composition over inheritance.
The AggregateGrader composes SystemEvaluator, PerformanceEvaluator, and custom functions via a builder pattern rather than requiring them to share a common base class. The BigQueryMemoryService composes four internal services rather than extending a single monolithic class.
The Client class is the primary interface for users who want a batteries-included experience. It encapsulates BigQuery connection management and provides synchronous methods that internally orchestrate async evaluation pipelines.
Constructor parameters:
```python
Client(
    project_id: str,                     # GCP project
    dataset_id: str,                     # BigQuery dataset
    table_id: str = "agent_events",
    location: str | None = None,         # BQ location; None lets the client auto-detect
    gcs_bucket_name: str | None = None,  # For GCS-offloaded payload access
    verify_schema: bool = True,          # Schema validation on init
    endpoint: str | None = None,         # AI.GENERATE endpoint
    connection_id: str | None = None,    # BQ connection for AI functions
    bq_client=None,                      # Injectable for testing
)
```

Key methods:
| Method | Returns | SQL Template | Description |
|---|---|---|---|
| `get_trace(session_id)` | `Trace` | `_SESSION_EVENTS_QUERY` | Fetches all events for a session, constructs Span tree |
| `list_traces(filter_criteria)` | `list[Trace]` | `_LIST_SESSIONS_QUERY` + per-session fetch | Discovers sessions matching `TraceFilter`, fetches each |
| `evaluate(evaluator, filters)` | `EvaluationReport` | `SESSION_SUMMARY_QUERY` or `AI_GENERATE_JUDGE_BATCH_QUERY` | Runs code or LLM evaluation over matching sessions |
| `drift_detection(golden_dataset, filters)` | `DriftReport` | Production + golden question queries | Compares golden vs. production question coverage |
| `insights(filters, config)` | `InsightsReport` | Multi-stage pipeline (5 SQL templates) | Generates comprehensive analysis report |
| `deep_analysis(filters, configuration)` | `QuestionDistribution` | Frequency or semantic grouping queries | Question distribution analysis |
Sync-to-async bridge:
```python
def evaluate(self, evaluator, filters=None):
    # ... build SQL ...
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(self._run_evaluation(...))
    finally:
        loop.close()
```

This pattern allows the Client to be used in Jupyter notebooks, synchronous scripts, and async applications alike.
Legacy model detection:
```python
def _is_legacy_model_ref(self, ref: str) -> bool:
    return ref.count(".") >= 2  # e.g., "project.dataset.model_name"
```

This heuristic determines whether to use AI.GENERATE (modern endpoint name like "gemini-2.5-flash") or ML.GENERATE_TEXT (fully-qualified BQML model reference like "project.dataset.gemini_model").
Core data structures:
```python
@dataclass
class Span:
    event_type: str
    agent: str | None
    timestamp: datetime
    content: Any  # Parsed JSON payload
    span_id: str | None
    parent_span_id: str | None
    invocation_id: str | None
    latency_ms: float | None
    status: str | None
    error_message: str | None
    attributes: dict | None
    children: list[Span]  # Populated by tree builder
```

`Span.from_bigquery_row(row)` factory:
Handles polymorphic content extraction:
- Attempts `json.loads(row.get("content", "{}"))` for JSON columns
- Falls back to string content
- Parses `latency_ms` from JSON (`{"total_ms": ...}`) or a direct numeric value
- Parses `attributes` from a JSON string or dict
Trace class:
```python
@dataclass
class Trace:
    session_id: str
    spans: list[Span]
    _roots: list[Span]  # Top-level spans (no parent)
```

DAG reconstruction algorithm (`_build_tree`):
Two-pass O(n) algorithm:
- Index pass: Build a `{span_id: span}` lookup dictionary
- Link pass: For each span, if `parent_span_id` exists in the index, append the span to its parent's `children`; otherwise add it to the `_roots` list
This produces a forest of trees rooted at top-level events (typically USER_MESSAGE_RECEIVED or INVOCATION_STARTING).
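A minimal sketch of the two-pass linking, using the `Span` dataclass above:

```python
def _build_tree(spans: list[Span]) -> list[Span]:
    # Pass 1 (index): map every span that has an ID.
    by_id = {s.span_id: s for s in spans if s.span_id}
    # Pass 2 (link): attach children to parents; unparented spans become roots.
    roots: list[Span] = []
    for span in spans:
        parent = by_id.get(span.parent_span_id) if span.parent_span_id else None
        if parent is not None and parent is not span:
            parent.children.append(span)
        else:
            roots.append(span)
    return roots
```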
render() output:
Recursive DFS tree rendering with Unicode box-drawing characters:
Session: sess-001 (12 events, 3420ms)
├── USER_MESSAGE_RECEIVED: "What is the weather?"
│ └── AGENT_STARTING: weather_agent
│ ├── LLM_REQUEST → LLM_RESPONSE (320ms)
│ ├── TOOL_STARTING: get_weather(city="NYC")
│ │ └── TOOL_COMPLETED: {"temp": 72} (1200ms)
│ └── AGENT_COMPLETED: "The weather is 72F."
TraceFilter dataclass:
Converts filter criteria to parameterized SQL:
```python
@dataclass
class TraceFilter:
    start_time: datetime | None
    end_time: datetime | None
    agent_id: str | None
    user_id: str | None
    session_ids: list[str] | None
    has_error: bool | None
    min_latency_ms: float | None
    max_latency_ms: float | None
    event_types: list[str] | None

    def to_sql_conditions(self) -> tuple[str, list[QueryParameter]]:
        # Returns (WHERE clause fragment, parameter list)
        ...
```

Each field generates a separate AND condition with a corresponding `bigquery.ScalarQueryParameter` or `bigquery.ArrayQueryParameter`. This is the only dynamic SQL in the SDK — everything else uses static templates.
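An illustrative sketch of the condition building (only three of the fields shown; the exact column expressions are assumptions):

```python
from google.cloud import bigquery

def to_sql_conditions(f: "TraceFilter") -> tuple[str, list]:
    # Each populated field contributes one parameterized AND condition;
    # user-controlled values never enter the SQL string itself.
    conditions, params = [], []
    if f.agent_id is not None:
        conditions.append("agent = @agent_id")
        params.append(bigquery.ScalarQueryParameter("agent_id", "STRING", f.agent_id))
    if f.session_ids:
        conditions.append("session_id IN UNNEST(@session_ids)")
        params.append(bigquery.ArrayQueryParameter("session_ids", "STRING", f.session_ids))
    if f.start_time is not None:
        conditions.append("timestamp >= @start_time")
        params.append(bigquery.ScalarQueryParameter("start_time", "TIMESTAMP", f.start_time))
    # Remaining fields follow the same pattern.
    return " AND ".join(conditions) or "TRUE", params
```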
Deterministic evaluation using code-defined metric functions.
Internal storage:
```python
_metrics: list[dict]  # [{"name": str, "fn": Callable[[dict], float], "threshold": float}]
```

Each metric function receives a session summary dict (aggregated from BigQuery) and returns a score in [0.0, 1.0].
Pre-built factory methods (Python path — raw-budget binary gates):
Since v0.2.2 the Python prebuilts compare the observed metric directly
against the user-supplied budget: score = 1.0 if observed <= budget else 0.0, with threshold = 1.0. A session fails iff the observed
value strictly exceeds the budget.
| Factory | Observed Value | Pass Condition |
|---|---|---|
| `latency(threshold_ms)` | `avg_latency_ms` | `observed <= threshold_ms` |
| `turn_count(max_turns)` | `turn_count` | `observed <= max_turns` |
| `error_rate(max_error_rate)` | `tool_errors / tool_calls` (0 when no calls) | `observed <= max_error_rate` |
| `token_efficiency(max_tokens)` | `total_tokens` | `observed <= max_tokens` |
| `ttft(threshold_ms)` | `avg_ttft_ms` | `observed <= threshold_ms` |
| `cost_per_session(max_cost_usd, ...)` | `(input_tokens/1K)*input_rate + (output_tokens/1K)*output_rate` | `observed <= max_cost_usd` |
Prior to v0.2.2 these factories used a normalized score `1.0 - min(observed / budget, 1.0)` with a `0.5` pass cutoff, which effectively fired every gate at roughly half the budget the user typed (e.g. `latency(threshold_ms=5000)` failed at `avg_latency_ms > 2500`). See `CHANGELOG.md` for the migration note.
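A worked comparison of the two schemes, for `latency(threshold_ms=5000)` on a session with `avg_latency_ms == 3000`:

```python
# Pre-v0.2.2 normalized score: 1.0 - min(3000 / 5000, 1.0) = 0.4,
# below the 0.5 cutoff -> the gate FAILED at only 60% of budget.
old_score = 1.0 - min(3000 / 5000, 1.0)   # 0.4

# v0.2.2+ binary gate: observed <= budget -> score 1.0 -> PASSES.
new_score = 1.0 if 3000 <= 5000 else 0.0  # 1.0
```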
SQL-native UDF path (`udf_kernels.score_*`, used by `udf_sql_templates.py`): Unchanged — it keeps the normalized `1.0 - min(observed / budget, 1.0)` score because BigQuery SQL callers (e.g. scorecard dashboards) may already interpret the normalized value. The divergence from the Python path is intentional: Python prebuilts are binary CI gates; SQL UDFs are normalized metrics for analytical workloads.
`evaluate_session(session_summary) -> SessionScore`: Iterates all metrics, computes each score, and determines `passed = all(score >= threshold)` across metrics.
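A sketch of that loop, assuming the `_metrics` layout above and the `SessionScore` model from the Data Model section:

```python
def evaluate_session(self, session_summary: dict) -> SessionScore:
    scores: dict[str, float] = {}
    passed = True
    for metric in self._metrics:
        try:
            score = float(metric["fn"](session_summary))
        except Exception:
            score = 0.0  # single-metric failures score 0.0 (see Error Handling)
        scores[metric["name"]] = score
        passed = passed and score >= metric["threshold"]
    return SessionScore(session_id=session_summary.get("session_id", ""),
                        scores=scores, passed=passed)
```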
SQL template (SESSION_SUMMARY_QUERY):
Aggregates per-session statistics from raw events:
- `COUNT(*)` as `event_count`
- `COUNT(CASE event_type = 'TOOL_STARTING')` as `tool_calls`
- `COUNT(CASE status = 'ERROR')` as `tool_errors`
- `AVG(JSON_VALUE(latency_ms, '$.total_ms'))` as `avg_latency_ms`
- `SUM(JSON_VALUE(content, '$.usage.total'))` as `total_tokens`
- Turn count from `USER_MESSAGE_RECEIVED` events
Evaluates performance metrics by leveraging agent-generated traces/responses and, optionally, golden traces/responses.
evaluate_session() flow:
1. Fetch trace from BigQuery (_SESSION_TRACE_QUERY)
2. Parse into SessionTrace (tool_calls, events, final_response)
3. Extract actual ToolCall sequence
4. Compute trajectory score (based on MatchType)
5. Compute step efficiency
6. Run LLM-based metrics to evaluate correctness, hallucination, sentiment, and efficiency.
7. Determine pass/fail against thresholds
8. Return EvaluationResult
evaluate_batch() flow:
1. Create asyncio.Semaphore(concurrency)
2. For each task in eval_dataset:
- Acquire semaphore
- evaluate_session(task)
- Release semaphore
3. Gather all results
Three matching algorithms:
`compute_exact_match(actual, expected) -> float`:
- Requires identical sequence length and order
- Each position: compares `tool_name` equality; if they match and both have `args`, compares args equality
- Score = matching_positions / max(len(actual), len(expected))

`compute_in_order_match(actual, expected) -> float`:
- Expected tools must appear in order within the actual sequence
- Extra tools between expected steps are allowed
- Uses a greedy forward scan: for each expected step, scan forward in actual (sketched below)
- Score = matched_expected / len(expected)

`compute_any_order_match(actual, expected) -> float`:
- All expected tools must be present; order doesn't matter
- Uses set-like matching on `tool_name`
- Score = matched_expected / len(expected)

`compute_step_efficiency(actual_steps, expected_steps) -> float`:
- Measures whether the agent used minimal steps
- Score = min(expected / actual, 1.0) if actual > 0, else 0.0
- Penalizes extra steps, doesn't reward fewer
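An illustrative sketch of the greedy in-order scan (assuming `ToolCall` objects exposing `.tool_name`; the empty-expected behavior is an assumption):

```python
def compute_in_order_match(actual: list, expected: list) -> float:
    if not expected:
        return 1.0  # assumption: nothing expected counts as a full match
    pos, matched = 0, 0
    for step in expected:
        # Scan forward in the actual sequence for the next expected tool.
        while pos < len(actual) and actual[pos].tool_name != step.tool_name:
            pos += 1
        if pos < len(actual):
            matched += 1
            pos += 1  # consume the matched call
    return matched / len(expected)
```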
Deterministic replay for debugging and comparison:
- `replay_session(session_id, replay_mode, step_callback)`: Fetches the trace and replays events in order. Modes: `"full"` (all events), `"step"` (with a callback per event), `"tool_only"` (only tool events)
- `compare_replays(session_a, session_b)`: Replays both sessions, diffs tool sequences and response similarity
Agents are non-deterministic. A single evaluation run is not statistically meaningful. MultiTrialPerformanceEvaluator addresses this.
MultiTrialPerformanceEvaluator(evaluator, num_trials, concurrency):
Runs N trials of the same evaluation task with bounded concurrency via asyncio.Semaphore.
Key metrics:
```python
import math

def compute_pass_at_k(n: int, c: int) -> float:
    """Probability that at least 1 of k trials passes.

    Formula: 1 - C(n-c, k) / C(n, k)
    where n=total trials, c=passed trials, k=n
    """
    if c == n:
        return 1.0
    if c == 0:
        return 0.0
    return 1.0 - math.comb(n - c, n) / math.comb(n, n)

def compute_pass_pow_k(n: int, c: int) -> float:
    """Probability that all k trials pass.

    Formula: (c/n)^n
    """
    return (c / n) ** n if n > 0 else 0.0
```

`MultiTrialReport`:
```python
class MultiTrialReport(BaseModel):
    session_id: str
    num_trials: int
    num_passed: int
    pass_at_k: float
    pass_pow_k: float
    per_trial_pass_rate: float
    mean_scores: dict[str, float]
    score_std_dev: dict[str, float]
    trial_results: list[TrialResult]
```

Combines heterogeneous evaluators into a unified verdict using a strategy pattern.
Architecture:
┌──────────────────┐
│ AggregateGrader │
│ │
│ strategy: ──────┼──► ScoringStrategy
│ graders: ───────┼──► list[_GraderEntry]
└────────┬─────────┘
│
┌──────────────┼─────────────────┐
▼ ▼ ▼
SystemEvaluator PerformanceEvaluator Custom Fn
(sync) (async) (sync)
│ │ │
▼ ▼ ▼
GraderResult GraderResult GraderResult
│ │ │
└──────────────┼─────────────────┘
▼
┌────────────────┐
│ScoringStrategy │
│ .aggregate() │
└───────┬────────┘
▼
AggregateVerdict
Scoring strategies:
| Strategy | Aggregation | Pass Condition |
|---|---|---|
| `WeightedStrategy(weights, threshold)` | Weighted average of grader scores | Average >= threshold |
| `BinaryStrategy()` | Average of all scores | ALL graders must pass individually |
| `MajorityStrategy()` | Average of all scores | Majority (>50%) of graders must pass |
Manages collections of evaluation tasks with lifecycle governance.
Task lifecycle:
add_task()
│
▼
┌──────────────┐
│ CAPABILITY │ ◄── New tasks start here
│ (active dev) │
└──────┬───────┘
│ auto_graduate() or graduate_to_regression()
│ (requires consistent passing over threshold_runs)
▼
┌──────────────┐
│ REGRESSION │ ◄── Stable, must-pass tests
│ (locked) │
└──────────────┘
Health monitoring (check_health()):
Detects:
- Balance issues: Positive/negative case ratio outside 30-70%
- Saturation: Tasks at 100% pass rate (may need difficulty increase)
- Missing expectations: Tasks missing both `expected_trajectory` and `expected_response`
auto_graduate(pass_history, threshold_runs=10):
A task graduates to REGRESSION if `all(pass_history[task_id][-threshold_runs:])` — i.e., it has passed all of the last N runs.
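A sketch of that rule (requiring at least `threshold_runs` recorded runs is an assumption):

```python
def auto_graduate(pass_history: dict[str, list[bool]], threshold_runs: int = 10) -> list[str]:
    graduated = []
    for task_id, history in pass_history.items():
        recent = history[-threshold_runs:]
        # Graduate only with a full window of consecutive passes.
        if len(recent) >= threshold_runs and all(recent):
            graduated.append(task_id)
    return graduated
```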
Five static checks that run without executing any evaluations:
| Check | Detects | Severity |
|---|---|---|
| `check_ambiguity` | Tasks missing both expected trajectory and response | warning |
| `check_balance` | Positive/negative ratio outside 30-70% | warning |
| `check_threshold_consistency` | Thresholds at 0.0 (always pass) or 1.0 (require perfection) | warning |
| `check_duplicate_sessions` | Multiple tasks using the same session_id | info |
| `check_saturation` | Tasks at 100% pass rate (last 5+ runs) | info |
Compares golden (curated Q&A) against production traffic:
- Load golden questions from a dedicated BigQuery table
- Load production questions from `USER_MESSAGE_RECEIVED` events (with `TraceFilter`)
- Perform keyword overlap matching: `set(q.lower().strip())`
- Report coverage %, covered/uncovered/new questions
Four analysis modes:
| Mode | Approach |
|---|---|
| `frequently_asked` | SQL `GROUP BY question ORDER BY COUNT(*) DESC` |
| `frequently_unanswered` | Join with error sessions, count unanswered |
| `auto_group_using_semantics` | `AI.GENERATE` classifies each question into categories |
| `custom` | `AI.GENERATE` with user-provided category list |
The most complex module. Implements a six-stage pipeline:
Stage 1: Session Metadata Extraction
│ SQL: _SESSION_METADATA_QUERY
│ → list[SessionMetadata]
▼
Stage 2: Session Facet Extraction
│ SQL: _AI_GENERATE_FACET_EXTRACTION_QUERY (or legacy/API fallback)
│ → list[SessionFacet]
│
│ Per-session structured analysis:
│ goal_categories, outcome, satisfaction, friction_types,
│ session_type, agent_effectiveness, primary_success,
│ key_topics, summary
▼
Stage 3: Aggregation
│ Python: aggregate_facets()
│ → AggregatedInsights (counters, distributions, averages)
▼
Stage 4: Multi-Prompt Analysis
│ 7 specialized prompts via AI.GENERATE or API:
│ task_areas, interaction_patterns, what_works_well,
│ friction_analysis, tool_usage, suggestions, trends
│ → list[AnalysisSection]
▼
Stage 5: Executive Summary
│ Synthesizes all 7 sections into 4-6 sentence overview
│ → str
▼
Stage 6: Report Assembly
→ InsightsReport
Facet extraction prompt design:
The facet extraction prompt constrains LLM output to predefined vocabularies:
- `GOAL_CATEGORIES`: 14 categories (question_answering, data_retrieval, task_automation, ...)
- `OUTCOMES`: 5 levels (success, partial_success, failure, abandoned, unclear)
- `SATISFACTION_LEVELS`: 6 levels
- `FRICTION_TYPES`: 12 types (tool_error, slow_response, wrong_answer, ...)
- `SESSION_TYPES`: 5 types
When using AI.GENERATE, structured output_schema enforces type safety. When using the API fallback, parse_facet_response() validates against these vocabularies and applies defaults for invalid values.
Analysis context computation (build_analysis_context()):
Computes derived statistics from aggregated facets and metadata to feed into analysis prompts:
- Success goals vs. failure goals (by category)
- Tool error rates, underused tools (used in <=5% of sessions)
- Low success rate goals (<50%)
- Time range of analyzed data
Implements the ADK BaseMemoryService interface, enabling agents to access cross-session context stored in BigQuery.
Composed sub-services:
BigQueryMemoryService (ADK BaseMemoryService)
├── BigQuerySessionMemory — Recent session context retrieval
├── BigQueryEpisodicMemory — Semantic similarity search
├── ContextManager — Token-budget-aware memory selection
└── UserProfileBuilder — User behavior profiling via LLM
Semantic search strategy:
1. Try vector similarity search (if embedding_model configured)
- Generate query embedding via genai.embed_content()
- Run ML.DISTANCE(embedding, @query_embedding, 'COSINE')
- Return top-k by distance ASC
2. Fall back to keyword overlap search
- Fetch recent USER_MESSAGE_RECEIVED events
- Score: |query_words ∩ message_words| / |query_words|
- Return top-k by score DESC
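The fallback score in step 2 is a one-liner; a sketch:

```python
def keyword_overlap_score(query: str, message: str) -> float:
    # |query_words ∩ message_words| / |query_words|
    query_words = set(query.lower().split())
    message_words = set(message.lower().split())
    # Empty-query handling is an assumption.
    return len(query_words & message_words) / len(query_words) if query_words else 0.0
```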
Context selection algorithm (ContextManager.select_relevant_context()):
```python
score = relevance_weight * relevance + recency_weight * recency
# relevance: word overlap between current task and memory content
# recency: exponential decay with 24-hour half-life: 2^(-age_hours/24)

# Greedy selection within token budget:
for memory in sorted_by_score_desc:
    tokens = len(memory_text) // 4 + 10  # rough estimate
    if current_tokens + tokens <= max_context_tokens:
        select(memory)
```

Direct wrappers around BigQuery's native ML capabilities:
| Class | BigQuery Feature | Use Case |
|---|---|---|
| `BigQueryAIClient` | `AI.GENERATE` | Text generation, trace analysis |
| `EmbeddingSearchClient` | `AI.EMBED` (scalar); legacy `ML.GENERATE_EMBEDDING` | Semantic search over traces |
| `AnomalyDetector` | `AI.DETECT_ANOMALIES`, `AI.FORECAST` (TimesFM); legacy `ML.DETECT_ANOMALIES`, `ML.FORECAST` (ARIMA_PLUS); `ML.DETECT_ANOMALIES` (AUTOENCODER) | Latency anomalies, latency forecasting, behavioral anomalies |
| `BatchEvaluator` | `AI.GENERATE` with `output_schema` | Batch session evaluation with persistence |
AI.FORECAST latency (primary — no model training needed):
```sql
SELECT *
FROM AI.FORECAST(
  (SELECT TIMESTAMP_TRUNC(timestamp, HOUR) AS hour,
          AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) AS avg_latency
   FROM `{table}`
   WHERE event_type = 'LLM_RESPONSE'
     AND timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL @training_days DAY)
   GROUP BY hour),
  horizon => 24,
  confidence_level => 0.95,
  timestamp_col => 'hour',
  data_col => 'avg_latency'
)
```

Legacy ARIMA latency model (requires `use_legacy_anomaly_model=True`):
```sql
CREATE OR REPLACE MODEL `{dataset}.latency_anomaly_model`
OPTIONS(
  model_type = 'ARIMA_PLUS',
  time_series_timestamp_col = 'hour',
  time_series_data_col = 'avg_latency',
  auto_arima = TRUE,
  data_frequency = 'HOURLY'
) AS
SELECT TIMESTAMP_TRUNC(timestamp, HOUR) AS hour,
       AVG(CAST(JSON_VALUE(latency_ms, '$.total_ms') AS FLOAT64)) AS avg_latency
FROM `{table}`
WHERE event_type = 'LLM_RESPONSE'
  AND timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL @training_days DAY)
GROUP BY hour
```

Autoencoder behavioral model:
```sql
CREATE OR REPLACE MODEL `{dataset}.behavior_anomaly_model`
OPTIONS(
  model_type = 'AUTOENCODER',
  activation_fn = 'RELU',
  hidden_units = [16, 8, 16],  -- symmetric encoder-decoder
  l2_reg = 0.0001,
  learn_rate = 0.001
) AS
SELECT total_events, tool_calls, tool_errors, llm_calls,
       avg_latency, session_duration
FROM `{dataset}.session_features`
```

Detection uses `STRUCT(0.01 AS contamination)` as the anomaly threshold, flagging ~1% of sessions as anomalous.
Notebook-friendly alternative using BigFrames (pandas-compatible API backed by BigQuery):
```python
# Instead of SQL strings, uses BigFrames operations:
import bigframes.pandas as bpd
import bigframes.bigquery as bbq

df = bpd.read_gbq(query)
df["prompt"] = prompt_prefix + df["trace_text"]
result = bbq.ai.generate(
    df["prompt"],
    endpoint=endpoint,
    output_schema={"score": "INT64", "justification": "STRING"},
)
```

Returns a `bigframes.DataFrame` that can be displayed directly in Jupyter notebooks.
Builds a BigQuery Property Graph that cross-links technical execution traces to business-domain entities. The module is organized around a 4-pillar architecture:
Pillar 1: TechNodes — The existing agent_events table serves as the technical graph. Each row is a node with span_id, parent_span_id, event_type, etc.
Pillar 2: BizNodes — Business entities extracted from trace payloads via AI.GENERATE with structured output_schema. Stored in context_graph_biz_nodes with composite key biz_node_id = span_id:node_type:node_value.
Pillar 3: Caused edges — Implicit edges from TechNode to BizNode via the shared span_id foreign key.
Pillar 4: Evaluated cross-links — Explicit edges stored in context_graph_cross_links, connecting BizNodes to their evaluation context. Carries artifact_uri and link_type.
Key design decisions:
| Decision | Rationale |
|---|---|
| `output_schema` in AI.GENERATE | Forces structured JSON output; eliminates post-hoc parsing failures |
| Composite `biz_node_id` | Prevents key collisions when the same span produces multiple entities |
| MERGE with DELETE | `WHEN NOT MATCHED BY SOURCE ... DELETE` cleans stale nodes on re-extraction |
| Fail-closed world-change | Query or callback errors → `check_failed=True`, `is_safe_to_approve=False` |
| Legacy endpoint rejection | `project.dataset.model` refs raise `ValueError` instead of silently producing bad URLs |
Key methods on ContextGraphManager:
| Method | Description |
|---|---|
| `build_context_graph(session_ids)` | End-to-end pipeline: extract → cross-link → create graph |
| `extract_biz_nodes(session_ids)` | Extract business entities via AI.GENERATE or client-side |
| `store_biz_nodes(nodes)` | Persist BizNodes to BigQuery |
| `create_cross_links(session_ids)` | Create Evaluated edges between BizNodes and TechNodes |
| `create_property_graph()` | Execute CREATE PROPERTY GRAPH DDL |
| `detect_world_changes(session_id, fn)` | Check if business entities have drifted since evaluation |
| `reconstruct_trace_gql(session_id)` | GQL-based trace reconstruction with quantified-path traversal |
| `explain_decision(event_type, entity)` | Get the reasoning chain for a specific decision |
| `get_biz_nodes_for_session(session_id)` | Read BizNodes for a session |
Classifies agent sessions into user-defined categories (e.g., GOOD / NEEDS_IMPROVEMENT / CRITICAL) using BigQuery AI.GENERATE with automatic Gemini API fallback.
Key design decisions:
| Decision | Rationale |
|---|---|
| BigQuery-first with Gemini fallback | AI.GENERATE processes batches server-side; the Gemini API provides universal access when BigQuery ML is unavailable |
| `execution_mode` tracking | Every result records whether it used `bq_ai_generate`, `api_fallback`, or `api_direct`, enabling operational monitoring |
| Configurable `prompt_version` | Allows A/B testing of prompt strategies with per-version result tracking |
| Persistence to `categorical_results` | Append-only streaming insert; dedup is handled at the view layer |
Key classes:
- `CategoricalEvaluationConfig` — metric definitions with categories, thresholds, and prompt templates
- `CategoricalEvaluator` — orchestrates batch evaluation with `evaluate()` and `evaluate_and_persist()`
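A usage sketch; the constructor parameters shown are illustrative assumptions, not the exact signatures:

```python
config = CategoricalEvaluationConfig(
    metric_name="session_quality",                         # hypothetical field
    categories=["GOOD", "NEEDS_IMPROVEMENT", "CRITICAL"],
    prompt_template="Classify this session transcript: {transcript}",
)
evaluator = CategoricalEvaluator(
    project_id="my-project", dataset_id="agent_analytics", config=config,
)
# Persist results (append-only); dashboards read the dedup views below.
results = evaluator.evaluate_and_persist(session_ids=["sess-001", "sess-002"])
```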
Provides pre-aggregated BigQuery views over the categorical_results table with deduplication at read time.
Views:
| View | Purpose |
|---|---|
| `categorical_results_latest` | Base dedup view using `ROW_NUMBER() OVER (PARTITION BY session_id, metric_name, prompt_version)` |
| `categorical_daily_counts` | Daily category distribution by metric |
| `categorical_hourly_counts` | Rolling hourly counts for near-real-time dashboards |
| `categorical_operational_metrics` | Parse error rate + fallback rate by day and endpoint |
All downstream views query from the dedup base, not the raw append-only table.
Six modules that together implement a fully YAML-driven graph extraction and materialization pipeline. See Ontology Graph V4 Design for the full specification.
Module responsibilities:
| Module | Class / Function | Role |
|---|---|---|
| `ontology_models.py` | `GraphSpec`, `EntitySpec`, `load_graph_spec()` | YAML parsing, `{{ env }}` resolution, Pydantic validation |
| `ontology_schema_compiler.py` | `compile_extraction_prompt()`, `compile_output_schema()` | Deterministic prompt + JSON schema generation from spec |
| `ontology_graph.py` | `OntologyGraphManager` | AI.GENERATE extraction → ExtractedGraph hydration |
| `ontology_materializer.py` | `OntologyMaterializer` | Table creation + streaming insert with delete-then-insert idempotency |
| `ontology_property_graph.py` | `OntologyPropertyGraphCompiler` | YAML → CREATE PROPERTY GRAPH DDL transpilation |
| `ontology_orchestrator.py` | `build_ontology_graph()`, `compile_showcase_gql()` | One-shot pipeline + GQL query generation |
Key design decisions:
| Decision | Rationale |
|---|---|
| YAML over code | Domain experts define ontologies without Python; spec changes don't require redeployment |
| Label-only inheritance | extends adds graph labels, not property/binding inheritance — keeps resolution deterministic |
| Session-scoped node identity | session_id in every KEY prevents cross-session entity collisions |
| All columns in PROPERTIES | BigQuery Property Graph KEY columns are not queryable in GQL unless also listed in PROPERTIES |
Typer-based CLI exposing SDK functionality for CI/CD pipelines, cron jobs, and ad-hoc use.
Command groups:
| Command | Description |
|---|---|
| `doctor` | Validate BigQuery connectivity and table schema |
| `traces list` / `traces get` | List and inspect agent traces |
| `evaluate` | Run code or LLM evaluation |
| `categorical-eval` | Run categorical evaluation with optional persistence |
| `categorical-views` | Create dashboard views over categorical results |
| `insights` | Generate multi-stage analysis reports |
| `drift` | Compare production traces against golden datasets |
| `distribution` | Analyze question distribution patterns |
| `views create-all` | Create per-event-type BigQuery views |
| `ontology-build` | Run the full ontology graph pipeline |
| `ontology-showcase-gql` | Generate GQL traversal queries from spec |
All commands support --format (json/text/table) output via the formatter.py module.
Pure analytical functions designed to run inside BigQuery Python UDFs. These kernels are deterministic, side-effect free, and depend only on the Python standard library.
Available kernels: `classify_event_family`, `extract_tool_outcome`, `compute_latency_bucket`, `is_error_event`, `extract_response_text`.
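An illustrative example of a kernel in this style — pure, deterministic, stdlib-only (the bucket boundaries here are invented for the sketch):

```python
def compute_latency_bucket(total_ms: float | None) -> str:
    if total_ms is None:
        return "unknown"
    for bound, label in ((100, "fast"), (1000, "normal"), (5000, "slow")):
        if total_ms < bound:
            return label
    return "very_slow"
```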
udf_sql_templates.py generates the CREATE FUNCTION SQL for registering these kernels as BigQuery UDFs.
The canonical schema written by the ADK plugin and read by this SDK:
| Column | Type | Mode | Description |
|---|---|---|---|
| `timestamp` | TIMESTAMP | REQUIRED | UTC event creation time (microsecond precision) |
| `event_type` | STRING | NULLABLE | Event category (e.g., LLM_REQUEST, TOOL_COMPLETED) |
| `agent` | STRING | NULLABLE | Agent name |
| `session_id` | STRING | NULLABLE | Persistent conversation thread identifier |
| `invocation_id` | STRING | NULLABLE | Single execution turn identifier |
| `user_id` | STRING | NULLABLE | User identifier |
| `trace_id` | STRING | NULLABLE | OpenTelemetry trace ID (32-char hex) |
| `span_id` | STRING | NULLABLE | OpenTelemetry span ID (16-char hex) |
| `parent_span_id` | STRING | NULLABLE | Parent span ID for hierarchy reconstruction |
| `content` | JSON | NULLABLE | Polymorphic event payload |
| `content_parts` | RECORD | REPEATED | Multimodal segments (text, image, GCS refs) |
| `attributes` | JSON | NULLABLE | Metadata: model info, token usage, custom tags |
| `latency_ms` | JSON | NULLABLE | Performance metrics (total_ms, time_to_first_token_ms) |
| `status` | STRING | NULLABLE | OK or ERROR |
| `error_message` | STRING | NULLABLE | Exception message when status is ERROR |
| `is_truncated` | BOOLEAN | NULLABLE | True if content exceeded 10MB cell limit |
Partitioning: PARTITION BY DATE(timestamp)
Clustering: CLUSTER BY event_type, agent, user_id
The content JSON column is polymorphic — its structure depends on event_type:
USER_MESSAGE_RECEIVED:
{"text_summary": "What is the weather in NYC?"}
AGENT_STARTING:
"You are a helpful assistant..." (system prompt as string)
LLM_REQUEST:
{"system_prompt": "...", "prompt": [{"role": "user", "content": "..."}]}
LLM_RESPONSE:
{"response": "The weather is...", "usage": {"completion": 19, "prompt": 10129, "total": 10148}}
TOOL_STARTING:
{"tool": "get_weather", "args": {"city": "NYC"}}
TOOL_COMPLETED:
{"tool": "get_weather", "result": {"temp": 72, "condition": "sunny"}}
TOOL_ERROR / LLM_ERROR:
(content may be present; error_message column has the error)
STATE_DELTA:
(attributes.state_delta has the state change)
The content_parts REPEATED RECORD stores multimodal content segments:
```sql
ARRAY<STRUCT<
  mime_type STRING,        -- "text/plain", "image/png", etc.
  uri STRING,              -- Direct URI if applicable
  object_ref STRUCT<       -- GCS offloaded reference
    uri STRING,            -- "gs://bucket/path/file.txt"
    version STRING,
    authorizer STRING,     -- BQ connection for signed URL generation
    details JSON
  >,
  text STRING,             -- Inline text content
  part_index INT64,        -- Position in original content
  part_attributes STRING,  -- Additional metadata
  storage_mode STRING      -- "INLINE" or "GCS_REFERENCE"
>>
```

The SDK may create additional tables for derived data:
| Table | Created By | Purpose |
|---|---|---|
| `trace_embeddings` | `EmbeddingSearchClient.build_embeddings_index()` | Vector embeddings for semantic search |
| `session_features` | `AnomalyDetector.train_behavior_model()` | Aggregated session features for autoencoder |
| `session_evaluations` | `BatchEvaluator.store_evaluation_results()` | Persisted evaluation scores |
| `latency_arima_model` | `AnomalyDetector.train_latency_model()` | ARIMA time-series model |
| `behavior_autoencoder_model` | `AnomalyDetector.train_behavior_model()` | Autoencoder anomaly detection model |
| `context_graph_biz_nodes` | `ContextGraphManager.extract_biz_nodes()` | Business entities extracted from traces via AI.GENERATE |
| `context_graph_cross_links` | `ContextGraphManager.create_cross_links()` | Cross-link edges connecting BizNodes to TechNodes |
| `agent_context_graph` | `ContextGraphManager.create_property_graph()` | BigQuery Property Graph (DDL) with TechNode, BizNode, Caused, Evaluated |
Key SDK data models (all Pydantic v2 BaseModel):
SessionScore
├── session_id: str
├── scores: dict[str, float] # metric_name -> normalized score [0,1]
├── passed: bool
└── llm_feedback: dict | None
EvaluationReport
├── evaluator_name: str
├── session_scores: list[SessionScore]
├── aggregate_scores: dict[str, float] # normalized metrics only
├── details: dict[str, Any] # operational metadata (parse_errors, etc.)
├── pass_rate: float # computed property
└── summary() -> str
EvaluationResult
├── session_id: str
├── eval_status: EvalStatus # PASSED | FAILED | ERROR
├── scores: dict[str, float]
├── overall_score: float
├── details: dict
└── error: str | None
InsightsReport
├── session_facets: list[SessionFacet]
├── session_metadata: list[SessionMetadata]
├── aggregated: AggregatedInsights
├── analysis_sections: list[AnalysisSection]
├── executive_summary: str
└── summary() -> str
BizNode (dataclass)
├── span_id: str
├── session_id: str
├── node_type: str # "Product", "Targeting", "Campaign", "Budget"
├── node_value: str
├── confidence: float # 0.0-1.0
├── evaluated_at: datetime | None
├── artifact_uri: str | None # GCS URI for persisted artifacts
└── metadata: dict
WorldChangeReport (Pydantic)
├── session_id: str
├── alerts: list[WorldChangeAlert]
├── total_entities_checked: int
├── stale_entities: int
├── is_safe_to_approve: bool
├── check_failed: bool # fail-closed on query/callback errors
├── checked_at: datetime
└── summary() -> str
All queries follow this pattern to prevent SQL injection:
```python
query = """
    SELECT session_id, COUNT(*) AS event_count
    FROM `{project}.{dataset}.{table}`
    WHERE {where_clause}
    GROUP BY session_id
"""

# Table references: Python string formatting (not user-controlled)
formatted = query.format(
    project=self.project_id,
    dataset=self.dataset_id,
    table=self.table_id,
    where_clause=where_sql,
)

# User-controlled values: BigQuery parameters
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter("agent_id", "STRING", agent_id),
        bigquery.ArrayQueryParameter("session_ids", "STRING", session_ids),
    ]
)
results = client.query(formatted, job_config=job_config)
```

Important distinction:
- Table names (`project.dataset.table`) are interpolated via Python string formatting — these come from constructor parameters, not user input
- Filter values (`agent_id`, `session_ids`, timestamps) use BigQuery `@param` syntax with typed `QueryParameter` objects
| Module | Template | Purpose |
|---|---|---|
| `client.py` | `_SESSION_EVENTS_QUERY` | Fetch all events for a session |
| `client.py` | `_LIST_SESSIONS_QUERY` | Discover sessions matching filter |
| `system_evaluator.py` | `SESSION_SUMMARY_QUERY` | Aggregate session metrics for code evaluation |
| `system_evaluator.py` | `AI_GENERATE_JUDGE_BATCH_QUERY` | Batch LLM-as-judge via AI.GENERATE |
| `system_evaluator.py` | `LLM_JUDGE_BATCH_QUERY` | Legacy batch evaluation via ML.GENERATE_TEXT |
| `performance_evaluator.py` | `_SESSION_TRACE_QUERY` | Fetch trace for trajectory matching |
| `insights.py` | `_SESSION_METADATA_QUERY` | Aggregate session metadata |
| `insights.py` | `_SESSION_TRANSCRIPT_QUERY` | Build session transcripts |
| `insights.py` | `_AI_GENERATE_FACET_EXTRACTION_QUERY` | Extract structured facets via AI.GENERATE |
| `insights.py` | `_AI_GENERATE_ANALYSIS_QUERY` | Run analysis prompt via AI.GENERATE |
| `insights.py` | `_LEGACY_FACET_EXTRACTION_QUERY` | Legacy facet extraction via ML.GENERATE_TEXT |
| `insights.py` | `_LEGACY_ANALYSIS_QUERY` | Legacy analysis via ML.GENERATE_TEXT |
| `feedback.py` | `_PRODUCTION_QUESTIONS_QUERY` | Extract production questions |
| `feedback.py` | `_GOLDEN_QUESTIONS_QUERY` | Load golden dataset questions |
| `feedback.py` | `_SEMANTIC_DRIFT_QUERY` | Embedding-based drift detection |
| `feedback.py` | `_FREQUENTLY_ASKED_QUERY` | Question frequency analysis |
| `feedback.py` | `_FREQUENTLY_UNANSWERED_QUERY` | Unanswered question analysis |
| `feedback.py` | `_AI_GENERATE_SEMANTIC_GROUPING_QUERY` | Question classification via AI.GENERATE |
| `feedback.py` | `_LEGACY_SEMANTIC_GROUPING_QUERY` | Legacy question classification |
| `memory_service.py` | `_RECENT_CONTEXT_QUERY` | Recent session history for a user |
| `memory_service.py` | `_SIMILARITY_SEARCH_QUERY` | Keyword-based memory search |
| `memory_service.py` | `_VECTOR_SEARCH_QUERY` | Vector similarity memory search |
| `memory_service.py` | `_USER_STATS_QUERY` | User statistics for profile building |
| `memory_service.py` | `_USER_MESSAGES_QUERY` | Recent user messages for profiling |
| `ai_ml_integration.py` | `_AI_GENERATE_QUERY` | Direct text generation |
| `ai_ml_integration.py` | `_VECTOR_SEARCH_QUERY` | Embedding vector search |
| `ai_ml_integration.py` | `_CREATE_EMBEDDINGS_TABLE_QUERY` | Create embeddings table DDL |
| `ai_ml_integration.py` | `_INDEX_EMBEDDINGS_QUERY` | Build/refresh embeddings index |
| `ai_ml_integration.py` | `_CREATE_LATENCY_MODEL_QUERY` | ARIMA model training DDL |
| `ai_ml_integration.py` | `_DETECT_LATENCY_ANOMALIES_QUERY` | ARIMA anomaly detection |
| `ai_ml_integration.py` | `_CREATE_BEHAVIOR_MODEL_QUERY` | Autoencoder model training DDL |
| `ai_ml_integration.py` | `_BATCH_EVALUATION_QUERY` | Batch evaluation via AI.GENERATE |
Evaluation
├── Deterministic (SystemEvaluator)
│ ├── Latency
│ ├── Turn count
│ ├── Error rate
│ ├── Token efficiency
│ ├── Time to first token
│ ├── Cost per session
│ └── Custom metric functions
│
├── Performance (PerformanceEvaluator)
│ ├── Correctness
│ ├── Hallucination
│ ├── Efficiency
│ ├── Sentiment
│ ├── Trajectory matching (exact, in-order, any-order, step efficiency)
│ └── Custom criteria with prompt templates
│
├── Composite (AggregateGrader)
│ ├── Weighted average
│ ├── Binary (all-pass)
│ └── Majority vote
│
└── Statistical (MultiTrialPerformanceEvaluator)
├── pass@k
├── pass^k
└── Per-trial pass rate
All evaluation scores in the SDK are normalized to [0.0, 1.0]:
| Source | Raw Range | Normalization |
|---|---|---|
| Code metrics | Already [0, 1] | None needed |
| LLM judge (API) | Integer 1-10 | Divide by 10.0 |
| LLM judge (AI.GENERATE) | INT64 1-10 | Divide by 10.0 |
| Trajectory match | Already [0, 1] | None needed |
| Step efficiency | Already [0, 1] | None needed |
| Anomaly severity | Already [0, 1] | None needed |
| Mode | Evaluator | Where Computation Runs |
|---|---|---|
| Single session (sync) | `SystemEvaluator.evaluate_session()` | Python |
| Single session (async) | `PerformanceEvaluator.evaluate_session()` | Python, Gemini API |
| Batch via Client | `Client.evaluate()` | BigQuery (SQL + AI.GENERATE) |
| Trajectory matching | `PerformanceEvaluator.evaluate_session()` | BigQuery (fetch) + Python (matching) |
| Multi-trial | `MultiTrialPerformanceEvaluator.run_trials()` | BigQuery (fetch) + Python (N iterations) |
| Pipeline | `AggregateGrader.evaluate()` | Mixed (code=Python, LLM=API/BQ) |
| DataFrame | `BigFramesEvaluator.evaluate_sessions()` | BigQuery (BigFrames + AI.GENERATE) |
The SDK supports three paths for LLM-based operations, selected automatically:
Tier 1: AI.GENERATE (preferred)
├── Modern BigQuery SQL function
├── Endpoint: model name (e.g., "gemini-2.5-flash")
├── Supports output_schema for typed structured output
├── Zero data movement — computation runs in BigQuery
└── No pre-created model required
Tier 2: ML.GENERATE_TEXT (legacy)
├── Classic BigQuery ML function
├── Endpoint: fully-qualified model ref (e.g., "project.dataset.model_name")
├── Requires pre-created BQML remote model
├── Output is unstructured text (requires JSON parsing)
└── Detected by: endpoint.count(".") >= 2
Tier 3: Gemini API (fallback)
├── Direct API call via google-genai library
├── Used when BQ AI functions unavailable
├── Requires google-genai optional dependency
├── Data transfers to/from API endpoint
└── Used by: insights API fallback, memory service, grader pipeline
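A sketch of the selection order, reusing the `_is_legacy_model_ref` heuristic from the Client section (the function name and return labels are illustrative):

```python
def choose_llm_path(endpoint: str | None, bq_ai_available: bool) -> str:
    if endpoint and endpoint.count(".") >= 2:
        return "ML.GENERATE_TEXT"  # Tier 2: fully-qualified BQML model ref
    if bq_ai_available:
        return "AI.GENERATE"       # Tier 1: modern endpoint name
    return "gemini_api"            # Tier 3: google-genai fallback
```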
When using Tier 1, the SDK leverages output_schema for type-safe LLM output:
```sql
AI.GENERATE(
  (prompt_text),
  endpoint => 'gemini-2.5-flash',
  output_schema => STRUCT<
    score INT64,
    justification STRING
  >(...),
  temperature => 0.3
)
```

This eliminates JSON parsing fragility — BigQuery returns typed columns directly.
When structured output is not available, multi-strategy JSON extraction is used:
````python
import json
import re

def _parse_json_from_text(text: str) -> dict | None:
    # Strategy 1: Extract from markdown code fence
    match = re.search(r'```json\s*(.*?)\s*```', text, re.DOTALL)
    if match:
        return json.loads(match.group(1))
    # Strategy 2: Find JSON object in raw text
    match = re.search(r'\{.*\}', text, re.DOTALL)
    if match:
        return json.loads(match.group(0))
    # Strategy 3: Regex extraction of specific fields
    # ... pattern-specific extraction ...
    return None
````

The SDK faces a tension: BigQuery operations are I/O-bound (network calls) and benefit from async concurrency, but many users (especially in notebooks) expect synchronous APIs.
Solution: Async-first internals with synchronous Client wrapper.
Synchronous (user-facing):
├── Client.get_trace()
├── Client.list_traces()
├── Client.evaluate()
├── Client.drift_detection()
├── Client.insights()
├── Client.deep_analysis()
├── SystemEvaluator.evaluate_session()
├── EvalSuite.*
├── EvalValidator.*
└── BigFramesEvaluator.*
Async (internal / advanced users):
├── PerformanceEvaluator.evaluate_session()
├── PerformanceEvaluator.evaluate_batch()
├── MultiTrialPerformanceEvaluator.run_trials()
├── MultiTrialPerformanceEvaluator.run_trials_batch()
├── AggregateGrader.evaluate()
├── BigQueryMemoryService.search_memory()
├── BigQueryMemoryService.get_session_context()
├── compute_drift()
├── compute_question_distribution()
├── All ai_ml_integration functions
└── All insights pipeline stages
```python
# Client sync-to-async bridge pattern:
def evaluate(self, evaluator, filters=None):
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(
            self._run_evaluation(evaluator, filters)
        )
    finally:
        loop.close()

# BigQuery call in async context (blocking I/O wrapped in executor):
async def _execute_query(self, query, params):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(
        None,
        lambda: self.client.query(query, job_config=config).result()
    )
```

`MultiTrialPerformanceEvaluator` and `PerformanceEvaluator.evaluate_batch()` use `asyncio.Semaphore` for bounded concurrency:
```python
semaphore = asyncio.Semaphore(concurrency)

async def _run_one(task):
    async with semaphore:
        return await evaluator.evaluate_session(**task)

results = await asyncio.gather(*[_run_one(t) for t in tasks])
```

Custom metric functions (`SystemEvaluator`):

```python
evaluator = SystemEvaluator(name="custom").add_metric(
    name="business_metric",
    fn=lambda session: your_scoring_logic(session),
    threshold=0.7,
)
```

The metric function receives the full session summary dict and returns a float in [0, 1].
Custom LLM rubrics (`PerformanceEvaluator`):

```python
evaluator = PerformanceEvaluator(
    project_id="my-project",
    dataset_id="agent_analytics",
)

# Register a custom semantic evaluation rubric with a pass/fail threshold
evaluator.add_rubric(
    name="brand_alignment",
    prompt_template="Does the agent explicitly mention the company name and remain positive? Rate 1 to 5.",
    score_key="brand_alignment",
    threshold=4.0,
)
```

Custom graders (`AggregateGrader`):

```python
def my_grader(context: dict) -> GraderResult:
    # context has: session_summary, trace_text, final_response
    return GraderResult(
        grader_name="my_grader",
        scores={"metric": 0.9},
        passed=True,
    )

pipeline.add_custom_grader("my_grader", my_grader)
```

Custom insights prompts:

```python
config = InsightsConfig(
    analysis_prompts=["custom_prompt_1", "custom_prompt_2"],
)
```

Every class that uses BigQuery accepts an optional client parameter:

```python
Client(project_id="...", dataset_id="...", bq_client=custom_client)
BigQueryTraceEvaluator(..., bq_client=mock_client)  # -> PerformanceEvaluator(..., bq_client=mock_client)
BigQueryAIClient(..., client=mock_client)
```

- Never crash on observability failures. The SDK is an analytics tool — it should degrade gracefully, not block the user.
- Log warnings, return defaults. When a component fails (BQ query, LLM call, JSON parsing), log at WARNING level and return a safe default (empty list, zero score, etc.).
- Individual failures don't poison batches. A single session evaluation failure in a batch produces a failed result entry; the rest of the batch continues.
| Layer | Strategy | Default |
|---|---|---|
| BigQuery query failure | Catch `Exception`, log WARNING | Empty result set |
| Schema verification failure | Log WARNING, continue | No enforcement |
| LLM call failure | Catch `Exception`, log WARNING | Score 0.0, `passed=False` |
| JSON parse failure | Multi-strategy parser, then default | `SessionFacet` with default values |
| Import error (optional deps) | Catch `ImportError` in `__init__.py` | Feature unavailable, DEBUG log |
| Single metric failure | Catch in metric loop | Score 0.0 for that metric |
| Single grader failure | Catch in grader loop | Failed `GraderResult`, continue |
| Facet validation error | Clamp/default invalid values | Validated `SessionFacet` |
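The optional-dependency row in practice — a minimal sketch of the graceful-import pattern in `__init__.py` (the `_HAS_GENAI` flag name is illustrative):

```python
import logging

logger = logging.getLogger(__name__)

try:
    from google import genai  # optional dependency (google-genai)
    _HAS_GENAI = True
except ImportError:
    genai = None
    _HAS_GENAI = False
    logger.debug("google-genai not installed; LLM API fallback disabled")
```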
When AI.GENERATE returns values outside the expected vocabulary:
```python
def parse_facet_from_ai_generate_row(session_id, row):
    outcome = row.get("outcome", "unclear")
    if outcome not in OUTCOMES:
        outcome = "unclear"  # Default to safe value

    effectiveness = row.get("agent_effectiveness", 5.0)
    effectiveness = max(1.0, min(10.0, effectiveness))  # Clamp to [1, 10]

    key_topics = row.get("key_topics", [])[:5]          # Truncate to max 5
    summary = (row.get("summary", "") or "")[:200]      # Truncate to 200 chars
```

All tests mock BigQuery — no GCP credentials or live BigQuery access is needed.
tests/
├── test_sdk_client.py # Client integration tests
├── test_sdk_evaluators.py # SystemEvaluator + PerformanceEvaluator
├── test_sdk_trace.py # Trace/Span reconstruction
├── test_sdk_feedback.py # Drift detection
├── test_sdk_insights.py # Insights pipeline
├── test_trace_evaluator.py # Trajectory matching
├── test_multi_trial.py # Multi-trial runner
├── test_grader_pipeline.py # Grader composition
├── test_eval_suite.py # Eval suite lifecycle
├── test_eval_validator.py # Static validation
├── test_memory_service.py # Memory service
├── test_ai_ml_integration.py # AI/ML integration
├── test_bigframes_evaluator.py # BigFrames evaluator
├── test_categorical_evaluator.py # Categorical evaluation engine
├── test_categorical_views.py # Dashboard view SQL generation
├── test_context_graph.py # Context Graph V2/V3
├── test_ontology_models.py # YAML spec parsing + validation
├── test_ontology_schema_compiler.py # Schema + prompt compilation
├── test_ontology_graph.py # Ontology extraction + hydration
├── test_ontology_materializer.py # Table creation + materialization
├── test_ontology_property_graph.py # DDL transpilation
├── test_ontology_orchestrator.py # End-to-end orchestrator + GQL
├── test_cli.py # CLI command tests
├── test_event_semantics.py # Event semantic layer
├── test_views.py # BigQuery view management
├── test_formatter.py # Output formatting
├── test_serialization.py # JSON serialization
├── test_udf_kernels.py # UDF kernel functions
└── test_udf_sql_templates.py # UDF SQL generation
1297 tests as of v0.3.0, all running without GCP credentials.
BigQuery client mocking via dependency injection:
```python
from unittest.mock import MagicMock

mock_bq = MagicMock()
mock_job = MagicMock()
mock_job.result.return_value = [mock_row_1, mock_row_2]
mock_bq.query.return_value = mock_job

client = Client(
    project_id="test-project",
    dataset_id="test-dataset",
    verify_schema=False,  # Skip schema verification query
    bq_client=mock_bq,
)
```

Mock row protocol:
BigQuery rows must implement a dict-like interface. Mock rows are constructed with:
```python
from unittest.mock import MagicMock, Mock

def _make_mock_row(data: dict):
    row = MagicMock()
    row.__iter__ = Mock(return_value=iter(data.keys()))
    row.get = data.get
    row.keys = data.keys
    row.values = data.values
    row.items = data.items
    row.__getitem__ = data.__getitem__
    return row
```

pytest-asyncio with `asyncio_mode = "auto"` enables async tests without decorators:
```python
class TestTraceEvaluator:
    async def test_evaluate_session(self):
        # No @pytest.mark.asyncio needed
        result = await evaluator.evaluate_session(...)
        assert result.eval_status == EvalStatus.PASSED
```

- Table references: Interpolated from constructor parameters (developer-controlled, not user input)
- Filter values: Always use `bigquery.QueryParameter` objects with typed parameters
- Dynamic WHERE clauses: Generated by `TraceFilter.to_sql_conditions()`, which only produces parameterized conditions

- The ADK plugin supports `content_formatter` for redaction before logging
- The SDK reads content as-is; redaction is the producer's responsibility
- `event_allowlist`/`event_denylist` on the plugin side controls what gets logged

- Uses Google Cloud Application Default Credentials (ADC)
- No credential storage or management in the SDK
- IAM roles required: `bigquery.jobUser` (project), `bigquery.dataViewer` (table)
bigframes_evaluator.py constructs SQL with inline f-string interpolation for session IDs rather than parameterized queries. This is a known limitation — the values come from the SDK's own query results, not user input, but it deviates from the parameterized query pattern used elsewhere.
The following capabilities have been delivered since the original design:
Context Graph V2/V3 (v0.2):
- 4-Pillar Property Graph: TechNode + BizNode + Caused edges + Evaluated cross-links
- BizNode extraction via `AI.GENERATE` with structured `output_schema`
- GQL trace reconstruction replacing recursive CTEs with quantified-path traversal
- World-change detection with fail-closed semantics for HITL safety
- Decision Semantics: DecisionPoint + Candidate node types with rejection rationale tracking
Ontology Graph V4 (v0.3):
- Configuration-driven graph pipeline: YAML spec → AI extraction → materialization → Property Graph
- Fully generic — any business ontology can be modeled without code changes
- `build_ontology_graph()` one-shot orchestrator and `compile_showcase_gql()` query generator
- See Ontology Graph V4 Design
Categorical Evaluation (v0.3):
- User-defined categorical classification with configurable categories and prompt templates
- BigQuery `AI.GENERATE` with automatic Gemini API fallback
- Dashboard views: daily/hourly counts, operational metrics (parse error rate, fallback rate)
- CLI exposure via `categorical-eval` and `categorical-views` commands
CLI & Interfaces (v0.2):
- Typer-based CLI with 12+ commands covering traces, evaluation, insights, drift, views, ontology
- JSON/text/table output formatting
- Serialization layer for Remote Function boundaries
Python UDF Kernels (v0.2):
- Pure analytical kernels for BigQuery Python UDFs
- SQL template generation for UDF registration
Streaming / near-real-time evaluation:
The batch evaluation model works for post-hoc analysis. Micro-batch evaluation (e.g., `TraceFilter(last="5m")` + cron) provides near-real-time coverage. True streaming evaluation via BigQuery subscriptions or Pub/Sub remains a future option.
Cross-session entity resolution: The Ontology Graph V4 creates session-scoped nodes by design. Deduplicating business entities across sessions (e.g., "Yahoo Homepage" appearing in multiple campaigns) would enable cross-session graph analytics.
Graph-based anomaly detection: GQL pattern matching over Property Graphs could detect unusual execution paths (e.g., a tool call sequence that never appeared in golden datasets).
Embedding-based drift detection:
The SQL template for semantic drift (_SEMANTIC_DRIFT_QUERY) exists but is not wired into the execution path. Activating vector-based drift detection would improve coverage detection accuracy.
Multi-agent trace support: Multi-agent orchestration (where an agent delegates to sub-agents) would benefit from cross-session trace correlation and agent-specific evaluation.
Cost attribution: Deeper cost attribution by tool, agent, and prompt version would enable cost optimization at the component level beyond the current per-session estimates.