Skip to content

Commit 36eeed9

Browse files
committed
Add LRU capacity to ValidatingCache, remove sentinel pattern, add storage Update
- Introduce pkg/cache.ValidatingCache[K,V]: a fully-typed, capacity-bounded LRU cache using mutex+map+container/list. Replaces the sessionmanager-internal RestorableCache and its sync.Map backing store. - Add cache.ErrExpired, cache.New(capacity, load, check, onEvict). A capacity of 0 keeps the existing unbounded behaviour; non-zero evicts the LRU entry (calling onEvict) when the limit is reached. - Eliminate terminatedSentinel / Peek(any) / Store(any) / CompareAndSwap(any): all cache operations are now fully typed. The no-resurrection invariant is pushed into the storage layer instead. - Add DataStorage.Update (SET XX semantics): overwrites metadata only if the key already exists, returning (false, nil) when absent. Implemented via Redis SetArgs{Mode:"XX"} and a mutex-protected Load+Store in the local in-memory backend. - Replace Terminate's sentinel+rollback dance with a simpler storage.Delete followed by cache.Delete. Concurrent RestoreSession calls fail naturally via storage.Load returning ErrSessionNotFound. - Replace updateMetadata's Load+Upsert+sentinel-check with a single storage.Update call, closing the TOCTOU resurrection window entirely. - Replace DecorateSession's storage.Upsert with storage.Update; a (false, nil) result rolls back the in-cache CAS instead of silently resurrecting a deleted session. - Add FactoryConfig.CacheCapacity to wire the limit through sessionmanager.New. - Remove sessionmanager/cache.go and cache_test.go; cache tests live in pkg/cache/validating_cache_test.go, covering LRU eviction order, capacity=1, unlimited capacity, singleflight deduplication, and liveness expiry. Closes: #4494
1 parent d04eb02 commit 36eeed9

10 files changed

Lines changed: 776 additions & 411 deletions

pkg/cache/validating_cache.go

Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
// Package cache provides a generic, capacity-bounded cache with singleflight
5+
// deduplication and per-hit liveness validation.
6+
package cache
7+
8+
import (
9+
"container/list"
10+
"errors"
11+
"fmt"
12+
"sync"
13+
14+
"golang.org/x/sync/singleflight"
15+
)
16+
17+
// ErrExpired is returned by the check function passed to New to signal that a
18+
// cached entry has definitively expired and should be evicted.
19+
var ErrExpired = errors.New("cache entry expired")
20+
21+
// ValidatingCache is a node-local write-through cache backed by a
22+
// capacity-bounded LRU map, with singleflight-deduplicated restore on cache
23+
// miss and lazy liveness validation on cache hit.
24+
//
25+
// Type parameter K is the key type (must be comparable).
26+
// Type parameter V is the cached value type.
27+
//
28+
// Every operation on V is fully typed; there are no sentinel markers or
29+
// untyped any parameters. The no-resurrection invariant (preventing a
30+
// concurrent restore from overwriting a deletion) is enforced at the storage
31+
// layer via a conditional-write Update call rather than by a cache-level
32+
// tombstone.
33+
type ValidatingCache[K comparable, V any] struct {
34+
mu sync.Mutex
35+
entries map[K]*cacheEntry[K, V]
36+
lru *list.List // front=MRU, back=LRU; elements hold *cacheEntry[K, V]
37+
capacity int // 0 means unlimited
38+
39+
flight singleflight.Group
40+
41+
// load is called on a cache miss. Return (value, nil) on success.
42+
// A successful result is stored in the cache before being returned.
43+
load func(key K) (V, error)
44+
45+
// check is called on every cache hit to confirm liveness. Returning nil
46+
// means the entry is alive. Returning ErrExpired means it has definitively
47+
// expired (the entry is evicted and onEvict is called). Any other error is
48+
// treated as a transient failure and the cached value is returned unchanged.
49+
check func(key K) error
50+
51+
// onEvict is called after a confirmed-expired or LRU-evicted entry has been
52+
// removed from the cache. It is always called outside the internal mutex.
53+
// The evicted key and value are passed to allow resource cleanup
54+
// (e.g. closing connections). May be nil.
55+
onEvict func(key K, v V)
56+
}
57+
58+
// cacheEntry holds a single key-value pair and its position in the LRU list.
59+
type cacheEntry[K comparable, V any] struct {
60+
key K
61+
val V
62+
elem *list.Element // pointer to this entry's position in ValidatingCache.lru
63+
}
64+
65+
// New creates a ValidatingCache with the given capacity and callbacks.
66+
//
67+
// capacity is the maximum number of entries. When the cache is full and a new
68+
// entry must be stored, the least-recently-used entry is evicted first.
69+
// A capacity of 0 disables the limit (the cache grows without bound).
70+
// Negative values panic.
71+
//
72+
// load is called on a cache miss to restore the value; it must not be nil.
73+
// check is called on every cache hit to confirm liveness; it must not be nil.
74+
// onEvict is called after any eviction (LRU or expiry); it may be nil.
75+
func New[K comparable, V any](
76+
capacity int,
77+
load func(K) (V, error),
78+
check func(K) error,
79+
onEvict func(K, V),
80+
) *ValidatingCache[K, V] {
81+
if capacity < 0 {
82+
panic(fmt.Sprintf("cache.New: capacity must be >= 0, got %d", capacity))
83+
}
84+
if load == nil {
85+
panic("cache.New: load must not be nil")
86+
}
87+
if check == nil {
88+
panic("cache.New: check must not be nil")
89+
}
90+
return &ValidatingCache[K, V]{
91+
entries: make(map[K]*cacheEntry[K, V]),
92+
lru: list.New(),
93+
capacity: capacity,
94+
load: load,
95+
check: check,
96+
onEvict: onEvict,
97+
}
98+
}
99+
100+
// getHit validates a known-present cache entry and returns its value.
101+
// If the entry has definitively expired it is evicted and (zero, false) is
102+
// returned. Transient check errors leave the entry in place and return the
103+
// cached value.
104+
func (c *ValidatingCache[K, V]) getHit(key K, e *cacheEntry[K, V]) (V, bool) {
105+
if err := c.check(key); err != nil {
106+
if errors.Is(err, ErrExpired) {
107+
var evicted bool
108+
c.mu.Lock()
109+
// Only evict if the entry hasn't been replaced by a concurrent writer.
110+
if cur, exists := c.entries[key]; exists && cur == e {
111+
c.evictEntryLocked(e)
112+
evicted = true
113+
}
114+
c.mu.Unlock()
115+
if evicted && c.onEvict != nil {
116+
c.onEvict(key, e.val)
117+
}
118+
var zero V
119+
return zero, false
120+
}
121+
}
122+
return e.val, true
123+
}
124+
125+
// Get returns the value for key, loading it on a cache miss. On a cache hit
126+
// the entry's liveness is validated via the check function provided to New:
127+
// ErrExpired evicts the entry and returns (zero, false); transient errors
128+
// return the cached value unchanged. On a cache miss, load is called under a
129+
// singleflight group so at most one restore runs concurrently per key.
130+
func (c *ValidatingCache[K, V]) Get(key K) (V, bool) {
131+
c.mu.Lock()
132+
e, hit := c.entries[key]
133+
if hit {
134+
c.lru.MoveToFront(e.elem)
135+
}
136+
c.mu.Unlock()
137+
138+
if hit {
139+
return c.getHit(key, e)
140+
}
141+
142+
// Cache miss: use singleflight to prevent concurrent restores for the same key.
143+
type result struct{ v V }
144+
raw, err, _ := c.flight.Do(fmt.Sprint(key), func() (any, error) {
145+
// Re-check the cache: a concurrent singleflight group may have stored
146+
// the value between our miss check above and acquiring this group.
147+
c.mu.Lock()
148+
if existing, ok := c.entries[key]; ok {
149+
c.lru.MoveToFront(existing.elem)
150+
v := existing.val
151+
c.mu.Unlock()
152+
return result{v: v}, nil
153+
}
154+
c.mu.Unlock()
155+
156+
v, loadErr := c.load(key)
157+
if loadErr != nil {
158+
return nil, loadErr
159+
}
160+
161+
// Guard against a concurrent Set/Delete that occurred while load() was
162+
// running. Store only if the key is still absent; if another writer got
163+
// in first, return their value and discard ours via onEvict.
164+
c.mu.Lock()
165+
if existing, exists := c.entries[key]; exists {
166+
winner := existing.val
167+
c.lru.MoveToFront(existing.elem)
168+
c.mu.Unlock()
169+
if c.onEvict != nil {
170+
c.onEvict(key, v)
171+
}
172+
return result{v: winner}, nil
173+
}
174+
evicted := c.storeLocked(key, v)
175+
c.mu.Unlock()
176+
177+
if evicted != nil && c.onEvict != nil {
178+
c.onEvict(evicted.key, evicted.val)
179+
}
180+
181+
return result{v: v}, nil
182+
})
183+
if err != nil {
184+
var zero V
185+
return zero, false
186+
}
187+
r, ok := raw.(result)
188+
return r.v, ok
189+
}
190+
191+
// Set stores value under key, moving the entry to the MRU position. If the
192+
// cache is at capacity, the least-recently-used entry is evicted first and
193+
// onEvict is called for it.
194+
func (c *ValidatingCache[K, V]) Set(key K, value V) {
195+
c.mu.Lock()
196+
// If key already exists, update in place without changing capacity.
197+
if e, ok := c.entries[key]; ok {
198+
e.val = value
199+
c.lru.MoveToFront(e.elem)
200+
c.mu.Unlock()
201+
return
202+
}
203+
evicted := c.storeLocked(key, value)
204+
c.mu.Unlock()
205+
206+
if evicted != nil && c.onEvict != nil {
207+
c.onEvict(evicted.key, evicted.val)
208+
}
209+
}
210+
211+
// Delete removes key from the cache.
212+
func (c *ValidatingCache[K, V]) Delete(key K) {
213+
c.mu.Lock()
214+
if e, ok := c.entries[key]; ok {
215+
c.evictEntryLocked(e)
216+
}
217+
c.mu.Unlock()
218+
}
219+
220+
// Peek returns the cached V value for key without updating the LRU order,
221+
// running a liveness check, or triggering a restore on a cache miss.
222+
// Used by callers that need to read the current in-memory value without
223+
// side effects (e.g. reading metadata for comparison inside a liveness check).
224+
func (c *ValidatingCache[K, V]) Peek(key K) (V, bool) {
225+
c.mu.Lock()
226+
e, ok := c.entries[key]
227+
c.mu.Unlock()
228+
if !ok {
229+
var zero V
230+
return zero, false
231+
}
232+
return e.val, true
233+
}
234+
235+
// CompareAndSwap atomically replaces the value stored under key from old to
236+
// replacement. The comparison uses == semantics: for interface types this
237+
// means pointer equality when the dynamic value is a pointer. Returns true if
238+
// the swap was performed. Returns false (without panicking) if V's dynamic
239+
// type is not comparable (e.g. slices, maps, funcs).
240+
func (c *ValidatingCache[K, V]) CompareAndSwap(key K, old, replacement V) (swapped bool) {
241+
c.mu.Lock()
242+
defer c.mu.Unlock()
243+
e, ok := c.entries[key]
244+
if !ok {
245+
return false
246+
}
247+
// Compare via any to use interface == semantics (pointer equality for
248+
// pointer-backed types, which is correct for the session manager use case).
249+
// Guard with recover in case V's dynamic type is not comparable.
250+
defer func() {
251+
if recover() != nil {
252+
swapped = false
253+
}
254+
}()
255+
if any(e.val) != any(old) {
256+
return false
257+
}
258+
e.val = replacement
259+
c.lru.MoveToFront(e.elem)
260+
return true
261+
}
262+
263+
// Len returns the number of entries currently in the cache.
264+
func (c *ValidatingCache[K, V]) Len() int {
265+
c.mu.Lock()
266+
defer c.mu.Unlock()
267+
return len(c.entries)
268+
}
269+
270+
// storeLocked inserts a new entry and returns the evicted LRU entry (or nil if
271+
// no eviction was needed). Must be called with c.mu held.
272+
func (c *ValidatingCache[K, V]) storeLocked(key K, value V) *cacheEntry[K, V] {
273+
var evicted *cacheEntry[K, V]
274+
if c.capacity > 0 && len(c.entries) >= c.capacity {
275+
evicted = c.evictLRULocked()
276+
}
277+
e := &cacheEntry[K, V]{key: key, val: value}
278+
e.elem = c.lru.PushFront(e)
279+
c.entries[key] = e
280+
return evicted
281+
}
282+
283+
// evictEntryLocked removes the given entry from the map and LRU list.
284+
// Must be called with c.mu held.
285+
func (c *ValidatingCache[K, V]) evictEntryLocked(e *cacheEntry[K, V]) {
286+
c.lru.Remove(e.elem)
287+
delete(c.entries, e.key)
288+
}
289+
290+
// evictLRULocked removes and returns the least-recently-used entry.
291+
// Returns nil if the cache is empty. Must be called with c.mu held.
292+
func (c *ValidatingCache[K, V]) evictLRULocked() *cacheEntry[K, V] {
293+
back := c.lru.Back()
294+
if back == nil {
295+
return nil
296+
}
297+
e := back.Value.(*cacheEntry[K, V])
298+
c.evictEntryLocked(e)
299+
return e
300+
}

0 commit comments

Comments
 (0)