Skip to content

Commit b8208ba

Browse files
committed
perf: harden prompt cache and add llm_call observability
1 parent ae1ed98 commit b8208ba

13 files changed

Lines changed: 527 additions & 24 deletions

internal/agent/cache_monitor.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package agent
2+
3+
import (
4+
"hash/fnv"
5+
"sort"
6+
7+
"github.com/voocel/agentcore"
8+
"github.com/voocel/codebot/internal/storage"
9+
)
10+
11+
// cacheSnapshot is the small per-call record compared between consecutive LLM
// requests when looking for a prompt-cache break. It carries two 64-bit
// fingerprints of the request inputs and the observed cache_read figure; the
// request itself is never stored.
type cacheSnapshot struct {
	// SystemHash is the fingerprint of the system prompt blocks.
	SystemHash uint64
	// ToolsHash is the fingerprint of the tool set.
	ToolsHash uint64
	// CacheReadTokens is the cache_read token count observed for the call.
	CacheReadTokens int
	// Valid is false before the first turn; detectCacheBreak reports nothing
	// while the previous snapshot is not Valid.
	Valid bool
}
20+
21+
// Thresholds that a cache_read drop between two consecutive turns must reach
// before it is treated as a cache break. detectCacheBreak requires both.
const (
	// breakDropFraction is the minimum relative drop (drop / previous
	// cache_read). Mirrors Claude Code's 5% heuristic.
	breakDropFraction = 0.05

	// breakDropAbsolute is the minimum drop in tokens, which keeps small
	// contexts from producing false positives. Mirrors Claude Code's
	// 2000-token threshold.
	breakDropAbsolute = 2000
)
28+
29+
// hashSystemBlocks returns a stable fingerprint for the current system prompt.
30+
// The fingerprint covers every block's text; cache_control metadata is ignored
31+
// because flipping a cache-control TTL should not look like a content change.
32+
func hashSystemBlocks(blocks []agentcore.SystemBlock) uint64 {
33+
h := fnv.New64a()
34+
for _, b := range blocks {
35+
h.Write([]byte{0}) // block separator
36+
h.Write([]byte(b.Text))
37+
}
38+
return h.Sum64()
39+
}
40+
41+
// hashTools returns a stable fingerprint for the current tool set as the LLM
42+
// sees it: sorted by name, each entry contributes name + description. Schema
43+
// is excluded — changing it means breaking cache, and we want the fingerprint
44+
// to reflect "same name, same caller contract" rather than incidental schema
45+
// reorderings.
46+
func hashTools(tools []agentcore.Tool) uint64 {
47+
names := make([]string, 0, len(tools))
48+
byName := make(map[string]agentcore.Tool, len(tools))
49+
for _, t := range tools {
50+
n := t.Name()
51+
if _, seen := byName[n]; seen {
52+
continue
53+
}
54+
byName[n] = t
55+
names = append(names, n)
56+
}
57+
sort.Strings(names)
58+
59+
h := fnv.New64a()
60+
for _, n := range names {
61+
h.Write([]byte{0})
62+
h.Write([]byte(n))
63+
h.Write([]byte{1})
64+
h.Write([]byte(byName[n].Description()))
65+
}
66+
return h.Sum64()
67+
}
68+
69+
// detectCacheBreak decides whether the observed cache_read drop between
70+
// snapshots is large enough to record, and if so produces a structured
71+
// explanation. Returns nil when no break is detected.
72+
func detectCacheBreak(prev, curr cacheSnapshot) *storage.CacheBreakInfo {
73+
if !prev.Valid {
74+
return nil
75+
}
76+
// A turn with zero previous cache_read has no baseline to drop from.
77+
if prev.CacheReadTokens <= 0 {
78+
return nil
79+
}
80+
dropAbs := prev.CacheReadTokens - curr.CacheReadTokens
81+
if dropAbs < breakDropAbsolute {
82+
return nil
83+
}
84+
frac := float64(dropAbs) / float64(prev.CacheReadTokens)
85+
if frac < breakDropFraction {
86+
return nil
87+
}
88+
89+
info := &storage.CacheBreakInfo{
90+
PrevCacheReadTokens: prev.CacheReadTokens,
91+
CurrCacheReadTokens: curr.CacheReadTokens,
92+
DropAbsolute: dropAbs,
93+
DropFraction: frac,
94+
SystemChanged: prev.SystemHash != curr.SystemHash,
95+
ToolsChanged: prev.ToolsHash != curr.ToolsHash,
96+
}
97+
switch {
98+
case info.SystemChanged && info.ToolsChanged:
99+
info.Note = "system prompt and tool set both changed"
100+
case info.SystemChanged:
101+
info.Note = "system prompt changed between turns"
102+
case info.ToolsChanged:
103+
info.Note = "tool set changed between turns"
104+
default:
105+
info.Note = "no input change detected (TTL expiry, provider-side miss, or cache_control not honored by provider)"
106+
}
107+
return info
108+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package agent
2+
3+
import (
4+
"testing"
5+
6+
"github.com/voocel/agentcore"
7+
)
8+
9+
func TestHashSystemBlocksStableAcrossCallOrder(t *testing.T) {
10+
t.Parallel()
11+
12+
a := []agentcore.SystemBlock{{Text: "identity"}, {Text: "instructions"}}
13+
b := []agentcore.SystemBlock{{Text: "identity"}, {Text: "instructions"}}
14+
if hashSystemBlocks(a) != hashSystemBlocks(b) {
15+
t.Fatalf("identical blocks must hash to the same value")
16+
}
17+
18+
c := []agentcore.SystemBlock{{Text: "identity"}, {Text: "instructions v2"}}
19+
if hashSystemBlocks(a) == hashSystemBlocks(c) {
20+
t.Fatalf("differing text must change the hash")
21+
}
22+
}
23+
24+
func TestHashSystemBlocksIgnoresCacheControl(t *testing.T) {
25+
t.Parallel()
26+
27+
a := []agentcore.SystemBlock{{Text: "identity", CacheControl: "ephemeral"}}
28+
b := []agentcore.SystemBlock{{Text: "identity"}}
29+
if hashSystemBlocks(a) != hashSystemBlocks(b) {
30+
t.Fatalf("cache_control metadata must not influence the fingerprint")
31+
}
32+
}
33+
34+
func TestHashToolsSortsByName(t *testing.T) {
35+
t.Parallel()
36+
37+
forward := []agentcore.Tool{&stubTool{name: "read", desc: "read files"}, &stubTool{name: "bash", desc: "run shell"}}
38+
reverse := []agentcore.Tool{&stubTool{name: "bash", desc: "run shell"}, &stubTool{name: "read", desc: "read files"}}
39+
if hashTools(forward) != hashTools(reverse) {
40+
t.Fatalf("tool ordering must not affect the fingerprint")
41+
}
42+
}
43+
44+
func TestDetectCacheBreakNilWhenNoBaseline(t *testing.T) {
45+
t.Parallel()
46+
47+
info := detectCacheBreak(cacheSnapshot{Valid: false}, cacheSnapshot{Valid: true, CacheReadTokens: 5000})
48+
if info != nil {
49+
t.Fatalf("no baseline should suppress detection, got %+v", info)
50+
}
51+
}
52+
53+
func TestDetectCacheBreakIgnoresSmallDrops(t *testing.T) {
54+
t.Parallel()
55+
56+
// 4% drop, well below breakDropFraction.
57+
prev := cacheSnapshot{Valid: true, CacheReadTokens: 100000}
58+
curr := cacheSnapshot{Valid: true, CacheReadTokens: 96000, SystemHash: prev.SystemHash}
59+
if info := detectCacheBreak(prev, curr); info != nil {
60+
t.Fatalf("small drop should not trigger, got %+v", info)
61+
}
62+
63+
// 10% drop but absolute delta below 2000.
64+
prev = cacheSnapshot{Valid: true, CacheReadTokens: 10000}
65+
curr = cacheSnapshot{Valid: true, CacheReadTokens: 9000}
66+
if info := detectCacheBreak(prev, curr); info != nil {
67+
t.Fatalf("absolute drop below threshold should not trigger, got %+v", info)
68+
}
69+
}
70+
71+
func TestDetectCacheBreakReportsSystemChange(t *testing.T) {
72+
t.Parallel()
73+
74+
prev := cacheSnapshot{Valid: true, SystemHash: 1, ToolsHash: 2, CacheReadTokens: 50000}
75+
curr := cacheSnapshot{Valid: true, SystemHash: 9, ToolsHash: 2, CacheReadTokens: 0}
76+
info := detectCacheBreak(prev, curr)
77+
if info == nil {
78+
t.Fatal("expected a cache break to be reported")
79+
}
80+
if !info.SystemChanged || info.ToolsChanged {
81+
t.Fatalf("expected system-only change, got %+v", info)
82+
}
83+
if info.DropAbsolute != 50000 {
84+
t.Fatalf("drop_absolute = %d, want 50000", info.DropAbsolute)
85+
}
86+
if info.Note == "" {
87+
t.Fatal("note should be populated so logs are self-describing")
88+
}
89+
}
90+
91+
func TestDetectCacheBreakReportsUnknownWhenHashesMatch(t *testing.T) {
92+
t.Parallel()
93+
94+
prev := cacheSnapshot{Valid: true, SystemHash: 1, ToolsHash: 2, CacheReadTokens: 80000}
95+
curr := cacheSnapshot{Valid: true, SystemHash: 1, ToolsHash: 2, CacheReadTokens: 0}
96+
info := detectCacheBreak(prev, curr)
97+
if info == nil {
98+
t.Fatal("expected a break to be reported even without hash diffs")
99+
}
100+
if info.SystemChanged || info.ToolsChanged {
101+
t.Fatalf("expected neither hash to flip, got %+v", info)
102+
}
103+
}
104+
105+
func TestCompactionEventInvalidatesCacheBaseline(t *testing.T) {
106+
t.Parallel()
107+
108+
s := &Session{cacheSnap: cacheSnapshot{Valid: true, CacheReadTokens: 50000, SystemHash: 1}}
109+
110+
// An "unchanged" compaction must not reset the baseline — no rewrite happened.
111+
s.emit(SessionEvent{Type: SEAutoCompactionEnd, CompactionChanged: false})
112+
if !s.cacheSnap.Valid || s.cacheSnap.CacheReadTokens != 50000 {
113+
t.Fatalf("unchanged compaction should preserve baseline, got %+v", s.cacheSnap)
114+
}
115+
116+
// A real compaction rewrites the prefix; the baseline must be dropped so
117+
// the next turn's expected cache_read drop is not misreported.
118+
s.emit(SessionEvent{Type: SEAutoCompactionEnd, CompactionChanged: true})
119+
if s.cacheSnap.Valid || s.cacheSnap.CacheReadTokens != 0 {
120+
t.Fatalf("changed compaction should invalidate baseline, got %+v", s.cacheSnap)
121+
}
122+
}

internal/agent/session.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ type Session struct {
106106
lazyPersist bool
107107
pendingUserMsg []agentcore.Message
108108
autoNamed bool
109+
lastAssistantStart time.Time // set at EventMessageStart (assistant), consumed at EventMessageEnd for latency_ms
110+
cacheSnap cacheSnapshot // previous turn's system/tools fingerprint + cache_read, updated after every LLM call
109111
pendingToolCalls map[string]pendingToolCall
110112
recentToolCalls []toolCallFingerprint
111113
pendingReminderContinue bool

internal/agent/session_prompt.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,12 @@ func (m *sessionPromptManager) rebuildPrompt() {
207207
m.session.mu.Lock()
208208
orderedSkills := skill.OrderForPrompt(m.session.skills, m.session.cwd, m.session.skillUsageScoresLocked())
209209
m.session.staticReminders = config.BuildReminders(m.session.contextFiles, orderedSkills)
210+
// Refresh cache-break fingerprints. CacheReadTokens / Valid are owned by
211+
// persistLLMCall — only the input hashes are updated here, so a prompt
212+
// rebuild mid-session leaves the "previous observed cache_read" intact
213+
// and the next turn can still detect a drop.
214+
m.session.cacheSnap.SystemHash = hashSystemBlocks(blocks)
215+
m.session.cacheSnap.ToolsHash = hashTools(m.session.activeTools)
210216
m.session.mu.Unlock()
211217
}
212218

internal/agent/session_state.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/voocel/agentcore"
1111
"github.com/voocel/codebot/internal/skill"
12+
"github.com/voocel/codebot/internal/storage"
1213
)
1314

1415
type sessionPersistence struct {
@@ -42,6 +43,14 @@ func (s *Session) Subscribe(fn func(SessionEvent)) func() {
4243
func (s *Session) handleAgentEvent(ev agentcore.Event) {
4344
s.runtime.handleEvent(ev)
4445

46+
if ev.Type == agentcore.EventMessageStart {
47+
if msg, ok := ev.Message.(agentcore.Message); ok && msg.Role == agentcore.RoleAssistant {
48+
s.mu.Lock()
49+
s.lastAssistantStart = time.Now()
50+
s.mu.Unlock()
51+
}
52+
}
53+
4554
if ev.Type == agentcore.EventMessageEnd {
4655
if msg, ok := ev.Message.(agentcore.Message); ok {
4756
s.persistence.handleMessageEnd(msg)
@@ -96,6 +105,16 @@ func (s *Session) emit(ev SessionEvent) {
96105
if ev.AgentEvent != nil && ev.AgentEvent.Type == agentcore.EventError {
97106
s.recordErrorDiagnostic(ev.AgentEvent.Err)
98107
}
108+
case SEAutoCompactionEnd:
109+
if ev.CompactionChanged {
110+
// A compaction rewrites the prompt prefix: the next turn's
111+
// cache_read drop is expected, not a bug. Invalidate the cache
112+
// baseline so detectCacheBreak does not flag it as a break.
113+
s.mu.Lock()
114+
s.cacheSnap.CacheReadTokens = 0
115+
s.cacheSnap.Valid = false
116+
s.mu.Unlock()
117+
}
99118
}
100119

101120
s.mu.Lock()
@@ -238,10 +257,69 @@ func (p *sessionPersistence) handleMessageEnd(msg agentcore.Message) {
238257
p.persistMessage(msg)
239258

240259
if msg.Role == agentcore.RoleAssistant {
260+
p.persistLLMCall(msg)
241261
p.tryAutoName()
242262
}
243263
}
244264

265+
// persistLLMCall writes a per-turn observability record for the just-finished
266+
// assistant response. Non-fatal: logging failures are surfaced via SEError but
267+
// never block the session. Skipped when usage is empty (e.g. recovered message).
268+
func (p *sessionPersistence) persistLLMCall(msg agentcore.Message) {
269+
if msg.Usage == nil {
270+
return
271+
}
272+
u := msg.Usage
273+
if u.Input == 0 && u.Output == 0 && u.TotalTokens == 0 {
274+
return
275+
}
276+
277+
p.session.mu.Lock()
278+
store := p.session.store
279+
start := p.session.lastAssistantStart
280+
p.session.lastAssistantStart = time.Time{}
281+
provider := p.session.provider
282+
model := p.session.modelName
283+
thinking := p.session.settings.ThinkingLevel
284+
prevSnap := p.session.cacheSnap
285+
currSnap := cacheSnapshot{
286+
SystemHash: p.session.cacheSnap.SystemHash,
287+
ToolsHash: p.session.cacheSnap.ToolsHash,
288+
CacheReadTokens: u.CacheRead,
289+
Valid: true,
290+
}
291+
p.session.cacheSnap = currSnap
292+
p.session.mu.Unlock()
293+
294+
if store == nil {
295+
return
296+
}
297+
298+
var latencyMs int64
299+
if !start.IsZero() {
300+
latencyMs = time.Since(start).Milliseconds()
301+
}
302+
entry := storage.LLMCallEntry{
303+
Provider: provider,
304+
Model: model,
305+
InputTokens: u.Input,
306+
OutputTokens: u.Output,
307+
CacheReadTokens: u.CacheRead,
308+
CacheCreationTokens: u.CacheWrite,
309+
TotalTokens: u.TotalTokens,
310+
LatencyMs: latencyMs,
311+
StopReason: string(msg.StopReason),
312+
ThinkingLevel: thinking,
313+
CacheBreak: detectCacheBreak(prevSnap, currSnap),
314+
}
315+
if err := store.AppendLLMCall(entry); err != nil {
316+
p.session.emit(SessionEvent{
317+
Type: SEError,
318+
Error: fmt.Errorf("persist llm_call: %w", err),
319+
})
320+
}
321+
}
322+
245323
func (p *sessionPersistence) persistMessage(msg agentcore.Message) {
246324
p.session.mu.Lock()
247325
store := p.session.store

0 commit comments

Comments
 (0)