Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,20 @@ APP_HOST=0.0.0.0
APP_PORT=8000
APP_RELOAD=true
APP_ORIGIN=http://localhost:5173
ENABLE_LIVE_LOGS=false
LIVE_LOGS_MAX_ROWS=5000
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_LOGS_TOPIC=application-logs
KAFKA_GROUP_ID=ai-observability-agent
GROQ_API_KEY=
GOOGLE_API_KEY=
PINECONE_API_KEY=
PINECONE_INDEX_NAME=ai-observability-agent
PINECONE_NAMESPACE=observability-docs
LANGCHAIN_API_KEY=
LANGCHAIN_TRACING_V2=true
LANGCHAIN_PROJECT=ai-observability-agent
EMBEDDING_MODEL=sentence-transformers/all-MiniLM-L6-v2
LLM_PROVIDER=groq
GROQ_MODEL=llama-3.1-8b-instant
GOOGLE_MODEL=gemini-3-flash-preview
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ __pycache__/
node_modules/
dist/
docs
logs/
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,25 @@ Small agentic AI project scaffold for learning:
uvicorn backend.main:app --reload --app-dir .
```

The backend now writes structured JSON logs to `logs/backend.log` and mirrors them to stdout.
Each HTTP response also includes an `X-Request-ID` header, and chat responses include observability metadata such as intent, tools used, retrieval source, and per-stage latency.

To switch LLM providers, configure `.env` like this:

```bash
LLM_PROVIDER=google
GOOGLE_API_KEY=your_google_api_key
GOOGLE_MODEL=gemini-3-flash-preview
```

Groq remains available with:

```bash
LLM_PROVIDER=groq
GROQ_API_KEY=your_groq_api_key
GROQ_MODEL=llama-3.1-8b-instant
```

5. Ingest documents:

```bash
Expand Down Expand Up @@ -85,8 +104,31 @@ Then verify:

```bash
curl http://localhost:8000/api/health
curl http://localhost:8000/api/debug/status
```

### Runtime modes

Optional environment flags:

- `MOCK_MODE=true` disables live LLM usage and keeps responses deterministic for demos.
- `USE_PINECONE=false` forces local document fallback retrieval.
- `ENABLE_MCP=false` disables the MCP server startup path.

### Evaluation dataset

Sample regression-style prompts live in:

- [evaluation/sample_queries.json](/Users/suraj/Documents/Codex/ai-observability-agent/evaluation/sample_queries.json)

They capture expected intent, tool path, and evidence type so you can measure changes over time.

### MCP config

A starter MCP client configuration is included at:

- [mcp.json](/Users/suraj/Documents/Codex/ai-observability-agent/mcp.json)

### CI

CI runs on pull requests and pushes to `main`:
Expand Down
15 changes: 15 additions & 0 deletions backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,18 @@ python scripts/test_mcp_client.py --list-tools
python scripts/test_mcp_client.py --tool search_logs --args '{"query":"error","limit":3}'
python scripts/test_mcp_client.py --tool get_metrics --args '{"service_name":"checkout-service"}'
```

## MCP client config

This repo now includes a starter [mcp.json](/Users/suraj/Documents/Codex/ai-observability-agent/mcp.json) file at the repository root.
It shows one way an MCP-aware client can launch the server with:

```bash
python -m backend.mcp.server
```

If you do not want the MCP server enabled, set:

```bash
ENABLE_MCP=false
```
53 changes: 46 additions & 7 deletions backend/agent/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

from time import perf_counter
from typing import Any, TypedDict

from langchain_groq import ChatGroq
Expand All @@ -10,23 +11,58 @@
from backend.agent.nodes import classify_query, execute_tools, generate_answer, retrieve_context
from backend.config import get_settings

try:
from langchain_google_genai import ChatGoogleGenerativeAI
except ImportError: # pragma: no cover
ChatGoogleGenerativeAI = None


class AgentState(TypedDict, total=False):
query: str
intent: str
retrieved_context: str
tool_context: str
answer: str
tools_used: list[str]
retrieval_source: str
retrieval_hit: bool
evidence_found: bool
llm_enabled: bool
error: str | None
stage_latencies_ms: dict[str, float]
total_latency_ms: float


def _build_llm() -> ChatGroq | None:
def _build_llm() -> Any:
settings = get_settings()
if not settings.groq_api_key:
if settings.mock_mode:
return None
return ChatGroq(
api_key=settings.groq_api_key,
model=settings.groq_model,
temperature=0.2,

provider = settings.llm_provider.lower()
if provider == "groq":
if not settings.groq_api_key:
return None
return ChatGroq(
api_key=settings.groq_api_key,
model=settings.groq_model,
temperature=0.2,
)

if provider == "google":
if ChatGoogleGenerativeAI is None:
raise RuntimeError(
"Install `langchain-google-genai` to use Gemini models."
)
if not settings.google_api_key:
return None
return ChatGoogleGenerativeAI(
google_api_key=settings.google_api_key,
model=settings.google_model,
temperature=0.2,
)

raise RuntimeError(
f"Unsupported LLM_PROVIDER `{settings.llm_provider}`. Use `groq` or `google`."
)


Expand All @@ -51,5 +87,8 @@ def build_graph() -> Any:

def run_agent(query: str) -> dict[str, Any]:
"""Run the full agent graph for a user query."""
started_at = perf_counter()
app = build_graph()
return app.invoke({"query": query})
result = app.invoke({"query": query})
result["total_latency_ms"] = round((perf_counter() - started_at) * 1000, 2)
return result
88 changes: 82 additions & 6 deletions backend/agent/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,54 @@

from __future__ import annotations

from time import perf_counter
from typing import Any

from langchain_core.messages import HumanMessage, SystemMessage

from backend.agent.prompts import ANSWER_PROMPT, SYSTEM_PROMPT
from backend.rag.retriever import get_retrieval_source
from backend.tools.doc_tool import retrieve_docs_tool
from backend.tools.log_tool import analyze_logs_tool, search_logs_tool
from backend.tools.metrics_tool import get_metrics_tool


def _updated_latencies(state: dict[str, Any], stage_name: str, started_at: float) -> dict[str, float]:
latencies = dict(state.get("stage_latencies_ms", {}))
latencies[stage_name] = round((perf_counter() - started_at) * 1000, 2)
return latencies


def _normalize_llm_content(content: Any) -> str:
"""Flatten provider-specific content blocks into plain text."""
if isinstance(content, str):
return content

if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, str):
parts.append(item)
elif isinstance(item, dict):
if item.get("type") == "text":
parts.append(str(item.get("text", "")))
else:
parts.append(str(item))
else:
text_value = getattr(item, "text", None)
parts.append(str(text_value if text_value is not None else item))
return "\n".join(part for part in parts if part).strip()

text_value = getattr(content, "text", None)
if text_value is not None:
return str(text_value)

return str(content)


def classify_query(state: dict[str, Any]) -> dict[str, Any]:
"""Route queries to the right tooling using lightweight heuristics."""
started_at = perf_counter()
query = state["query"].lower()
if any(
keyword in query
Expand All @@ -34,43 +70,79 @@ def classify_query(state: dict[str, Any]) -> dict[str, Any]:
intent = "metrics"
else:
intent = "docs"
return {"intent": intent}
return {
"intent": intent,
"stage_latencies_ms": _updated_latencies(state, "classify_query", started_at),
}


def retrieve_context(state: dict[str, Any]) -> dict[str, Any]:
"""Fetch top matching RAG documents."""
started_at = perf_counter()
query = state["query"]
docs = retrieve_docs_tool.invoke({"query": query, "k": 4})
return {"retrieved_context": docs}
retrieval_source = get_retrieval_source()
retrieval_hit = "No matching observability documentation found." not in docs
return {
"retrieved_context": docs,
"retrieval_source": retrieval_source,
"retrieval_hit": retrieval_hit,
"stage_latencies_ms": _updated_latencies(state, "retrieve_context", started_at),
}


def execute_tools(state: dict[str, Any]) -> dict[str, Any]:
"""Run intent-specific tools."""
started_at = perf_counter()
query = state["query"]
intent = state["intent"]
tools_used: list[str]

if intent == "logs":
matches = search_logs_tool.invoke({"query": query, "limit": 5})
tool_output = analyze_logs_tool.invoke({"log_text": matches})
tools_used = ["search_logs", "analyze_logs"]
elif intent == "metrics":
tool_output = get_metrics_tool.invoke({"service_name": "checkout-service"})
tools_used = ["get_metrics"]
else:
tool_output = "Documentation-first query. No runtime tool needed."
tools_used = []

evidence_missing_markers = (
"No sample log file found.",
"No matching log lines found.",
"No matching observability documentation found.",
"No runtime tool needed.",
)
evidence_found = not any(marker in tool_output for marker in evidence_missing_markers)

return {"tool_context": tool_output}
return {
"tool_context": tool_output,
"tools_used": tools_used,
"evidence_found": evidence_found,
"stage_latencies_ms": _updated_latencies(state, "execute_tools", started_at),
}


def generate_answer(state: dict[str, Any], llm: Any) -> dict[str, Any]:
"""Use the LLM to produce the final answer."""
started_at = perf_counter()
if llm is None:
answer = (
f"Diagnosis for `{state['query']}`\n\n"
f"Intent: {state['intent']}\n"
f"Tools used: {', '.join(state.get('tools_used', [])) or 'None'}\n"
f"Retrieval source: {state.get('retrieval_source', 'unknown')}\n"
f"Evidence: {state.get('tool_context', 'No tool output.')}\n"
f"Docs: {state.get('retrieved_context', 'No docs found.')}\n\n"
"Next actions: configure `GROQ_API_KEY` to enable the full LLM reasoning step."
)
return {"answer": answer}
return {
"answer": answer,
"llm_enabled": False,
"stage_latencies_ms": _updated_latencies(state, "generate_answer", started_at),
}

prompt = ANSWER_PROMPT.format(
query=state["query"],
Expand All @@ -84,5 +156,9 @@ def generate_answer(state: dict[str, Any], llm: Any) -> dict[str, Any]:
HumanMessage(content=prompt),
]
)
content = getattr(response, "content", str(response))
return {"answer": content}
content = _normalize_llm_content(getattr(response, "content", str(response)))
return {
"answer": content,
"llm_enabled": True,
"stage_latencies_ms": _updated_latencies(state, "generate_answer", started_at),
}
Loading
Loading