Skip to content

Commit b7fae72

Browse files
committed
Persist restored session metadata to Redis in loadSession
After a successful RestoreSession call, write the restored session's metadata back to Redis so per-backend session IDs stay current. Backends that ignore Mcp-Session-Id hints (e.g. SSE transports) assign a fresh ID on every restore; without this write the stale IDs persisted in Redis indefinitely. checkSession deliberately keeps the MetadataKeyBackendIDs-only comparison (introduced in #4724): each pod's RestoreSession legitimately negotiates its own per-backend session IDs, so comparing the full metadata map would let each pod's write-back evict every other pod's cached sessions, creating a cross-pod eviction storm. Closes #4836
1 parent 2fd2d3b commit b7fae72

2 files changed

Lines changed: 246 additions & 9 deletions

File tree

pkg/vmcp/server/sessionmanager/session_manager.go

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,18 @@ const createSessionStorageTimeout = 5 * time.Second
182182
// timeout bounds how long a slow or unreachable Redis can stall a request goroutine.
183183
const validateTimeout = 3 * time.Second
184184

185-
// restoreStorageTimeout bounds the storage.Load call in the GetMultiSession
186-
// cache-miss restore path. The operation is a single Redis GETEX, so 3 s is
187-
// generous.
185+
// restoreStorageTimeout bounds storage.Load calls (GETEX) in the
186+
// GetMultiSession restore path (loadSession) and in the checkSession liveness
187+
// check. Both are single-key Redis reads; 3 s is generous.
188188
const restoreStorageTimeout = 3 * time.Second
189189

190+
// restoreMetadataWriteTimeout bounds the storage.Update call that persists
191+
// the restored session's metadata back to Redis after a successful
192+
// RestoreSession. Single-key Redis SET XX operation; 5 s is consistent with
193+
// other write timeouts (createSessionStorageTimeout, terminateTimeout,
194+
// decorateTimeout, notifyBackendExpiredTimeout).
195+
const restoreMetadataWriteTimeout = 5 * time.Second
196+
190197
// restoreSessionTimeout bounds factory.RestoreSession in the GetMultiSession
191198
// cache-miss path. RestoreSession opens HTTP connections to each backend, so
192199
// we allow more time than a simple storage read. Aligned with discoveryTimeout
@@ -658,12 +665,13 @@ func (sm *Manager) checkSession(sessionID string, sess vmcpsession.MultiSession)
658665
//
659666
// We intentionally compare only MetadataKeyBackendIDs rather than the full
660667
// metadata map. Per-backend session IDs (MetadataKeyBackendSessionPrefix+*)
661-
// are the session IDs negotiated by the restoring pod's backend connections.
662-
// RestoreSession sends the stored IDs as Mcp-Session-Id hints, so a backend
663-
// that honors session resumption will return the same ID — but not all backends
664-
// do (e.g. SSE transports have no session ID at all). Comparing the full map
665-
// would evict on every cross-pod cache hit whenever any backend assigns a
666-
// fresh ID, preventing tools from ever being served.
668+
// are the session IDs negotiated by each pod's independent RestoreSession call.
669+
// Backends that do not honor Mcp-Session-Id hints (e.g. SSE transports, some
670+
// StreamableHTTP backends) assign a fresh ID on every restore, so different pods
671+
// legitimately hold different per-backend IDs for the same session. Comparing
672+
// the full map would cause each pod's loadSession write-back to invalidate all
673+
// other pods' cached sessions, creating an infinite eviction storm that prevents
674+
// tools from ever being served in multi-pod deployments.
667675
sessBackendIDs := sess.GetMetadata()[vmcpsession.MetadataKeyBackendIDs]
668676
if sessBackendIDs != metadata[vmcpsession.MetadataKeyBackendIDs] {
669677
return cache.ErrExpired
@@ -715,6 +723,37 @@ func (sm *Manager) loadSession(sessionID string) (vmcpsession.MultiSession, erro
715723
return nil, restoreErr
716724
}
717725

726+
// Persist the restored session's metadata back to Redis so that
727+
// per-backend session IDs are kept current. Backends that do not honor
728+
// Mcp-Session-Id hints (e.g. SSE transports) assign a fresh ID on every
729+
// restore; without this write the stale IDs would persist in Redis
730+
// indefinitely.
731+
//
732+
// We use Update (SET XX) rather than Upsert so we never resurrect a key
733+
// that was concurrently deleted (Terminate / TTL expiry). A (false, nil)
734+
// result means the key is already gone — treat it as not found so the
735+
// cache never serves a session that no longer exists in storage.
736+
updateCtx, updateCancel := context.WithTimeout(context.Background(), restoreMetadataWriteTimeout)
737+
defer updateCancel()
738+
updated, updateErr := sm.storage.Update(updateCtx, sessionID, restored.GetMetadata())
739+
if updateErr != nil {
740+
slog.Warn("Manager.loadSession: failed to persist restored session metadata",
741+
"session_id", sessionID, "error", updateErr)
742+
// Non-fatal: the session is still usable on this pod; a later
743+
// restore (after a cache miss or eviction) will retry the write.
744+
// Note checkSession compares only MetadataKeyBackendIDs, so stale
745a+
// per-backend session IDs in Redis do not by themselves cause eviction.
745+
} else if !updated {
746+
// Session was concurrently deleted (Terminate / TTL expiry) between
747+
// RestoreSession and this write — do not cache the restored session.
748+
slog.Debug("Manager.loadSession: session already gone before metadata could be persisted; treating as not found",
749+
"session_id", sessionID)
750+
if closeErr := restored.Close(); closeErr != nil {
751+
slog.Warn("Manager.loadSession: failed to close restored session after concurrent deletion",
752+
"session_id", sessionID, "error", closeErr)
753+
}
754+
return nil, transportsession.ErrSessionNotFound
755+
}
756+
718757
slog.Debug("Manager.loadSession: restored session from storage", "session_id", sessionID)
719758
return restored, nil
720759
}

pkg/vmcp/server/sessionmanager/session_manager_test.go

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,39 @@ func (s *configurableFailDataStorage) Delete(ctx context.Context, id string) err
132132
return s.DataStorage.Delete(ctx, id)
133133
}
134134

135+
// deleteBeforeUpdateStorage wraps a real DataStorage and deletes the key
136+
// from the underlying store on the first Update call, simulating a concurrent
137+
// Terminate / TTL expiry that races with loadSession's metadata write-back.
138+
// The Update then returns (false, nil) because the key no longer exists.
139+
type deleteBeforeUpdateStorage struct {
140+
transportsession.DataStorage
141+
deleted bool
142+
}
143+
144+
func (s *deleteBeforeUpdateStorage) Update(ctx context.Context, id string, metadata map[string]string) (bool, error) {
145+
if !s.deleted {
146+
s.deleted = true
147+
_ = s.Delete(ctx, id)
148+
}
149+
return s.DataStorage.Update(ctx, id, metadata)
150+
}
151+
152+
// errorOnUpdateStorage wraps a real DataStorage and returns an error on the
153+
// first Update call, simulating a transient Redis write failure during
154+
// loadSession's metadata write-back.
155+
type errorOnUpdateStorage struct {
156+
transportsession.DataStorage
157+
errored bool
158+
}
159+
160+
func (s *errorOnUpdateStorage) Update(_ context.Context, _ string, _ map[string]string) (bool, error) {
161+
if !s.errored {
162+
s.errored = true
163+
return false, errors.New("injected Update failure")
164+
}
165+
return true, nil
166+
}
167+
135168
// fakeBackendRegistry is a simple BackendRegistry for tests.
136169
type fakeBackendRegistry struct {
137170
backends []vmcp.Backend
@@ -933,6 +966,137 @@ func TestSessionManager_GetMultiSession(t *testing.T) {
933966
require.NotNil(t, multiSess)
934967
assert.Equal(t, sessionID, multiSess.ID())
935968
})
969+
970+
t.Run("restore path: restored metadata is persisted back to storage", func(t *testing.T) {
971+
t.Parallel()
972+
973+
// Simulate a backend that doesn't honor Mcp-Session-Id hints (e.g. SSE
974+
// transport): RestoreSession assigns a fresh per-backend session ID.
975+
// loadSession must write the restored session's metadata back to Redis so
976+
// that stale per-backend session IDs do not persist indefinitely in storage.
977+
ctrl := gomock.NewController(t)
978+
factory := sessionfactorymocks.NewMockMultiSessionFactory(ctrl)
979+
factory.EXPECT().MakeSessionWithID(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
980+
Times(0)
981+
982+
sessionID := "restore-metadata-persist-session"
983+
984+
// The restored session returns fresh per-backend session metadata.
985+
freshMeta := map[string]string{
986+
sessiontypes.MetadataKeyTokenHash: "",
987+
vmcpsession.MetadataKeyBackendIDs: "backend-a",
988+
vmcpsession.MetadataKeyBackendSessionPrefix + "backend-a": "fresh-session-id",
989+
}
990+
restored := sessionmocks.NewMockMultiSession(ctrl)
991+
restored.EXPECT().ID().Return(sessionID).AnyTimes()
992+
restored.EXPECT().GetMetadata().Return(freshMeta).AnyTimes()
993+
994+
factory.EXPECT().
995+
RestoreSession(gomock.Any(), sessionID, gomock.Any(), gomock.Any()).
996+
Return(restored, nil).Times(1)
997+
998+
sm, storage := newTestSessionManager(t, factory, newFakeRegistry())
999+
1000+
// Seed storage with stale per-backend session ID.
1001+
staleMeta := map[string]string{
1002+
sessiontypes.MetadataKeyTokenHash: "",
1003+
vmcpsession.MetadataKeyBackendIDs: "backend-a",
1004+
vmcpsession.MetadataKeyBackendSessionPrefix + "backend-a": "stale-session-id",
1005+
}
1006+
_, err := sm.storage.Create(context.Background(), sessionID, staleMeta)
1007+
require.NoError(t, err)
1008+
1009+
multiSess, ok := sm.GetMultiSession(sessionID)
1010+
require.True(t, ok, "session must be restored")
1011+
require.NotNil(t, multiSess)
1012+
1013+
// Verify storage now contains the fresh metadata written by loadSession.
1014+
storedMeta, loadErr := storage.Load(context.Background(), sessionID)
1015+
require.NoError(t, loadErr)
1016+
assert.Equal(t, freshMeta, storedMeta,
1017+
"loadSession must persist restored session metadata back to storage")
1018+
})
1019+
1020+
t.Run("restore path: concurrent delete between RestoreSession and Update returns ErrSessionNotFound", func(t *testing.T) {
1021+
t.Parallel()
1022+
1023+
// Simulate a Terminate / TTL expiry that races with loadSession's
1024+
// metadata write-back: deleteBeforeUpdateStorage deletes the key just
1025+
// before the first Update, so Update returns (false, nil).
1026+
// loadSession must treat this as ErrSessionNotFound and NOT cache the
1027+
// restored session.
1028+
ctrl := gomock.NewController(t)
1029+
factory := sessionfactorymocks.NewMockMultiSessionFactory(ctrl)
1030+
factory.EXPECT().MakeSessionWithID(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
1031+
Times(0)
1032+
1033+
sessionID := "restore-concurrent-delete-session"
1034+
restored := sessionmocks.NewMockMultiSession(ctrl)
1035+
restored.EXPECT().ID().Return(sessionID).AnyTimes()
1036+
restored.EXPECT().GetMetadata().Return(map[string]string{
1037+
sessiontypes.MetadataKeyTokenHash: "",
1038+
}).AnyTimes()
1039+
1040+
factory.EXPECT().
1041+
RestoreSession(gomock.Any(), sessionID, gomock.Any(), gomock.Any()).
1042+
Return(restored, nil).Times(1)
1043+
1044+
// Build Manager with the wrapping storage.
1045+
innerStorage := newTestSessionDataStorage(t)
1046+
racyStorage := &deleteBeforeUpdateStorage{DataStorage: innerStorage}
1047+
sm, cleanup, err := New(racyStorage, &FactoryConfig{Base: factory, CacheCapacity: 1000}, newFakeRegistry())
1048+
require.NoError(t, err)
1049+
t.Cleanup(func() { _ = cleanup(context.Background()) })
1050+
1051+
// Seed the inner storage with a valid session record.
1052+
_, err = innerStorage.Create(context.Background(), sessionID, map[string]string{
1053+
sessiontypes.MetadataKeyTokenHash: "",
1054+
})
1055+
require.NoError(t, err)
1056+
1057+
// GetMultiSession triggers loadSession; the racing delete causes
1058+
// Update to return (false, nil) → ErrSessionNotFound → (nil, false).
1059+
multiSess, ok := sm.GetMultiSession(sessionID)
1060+
assert.False(t, ok, "session deleted before metadata write-back must not be cached")
1061+
assert.Nil(t, multiSess)
1062+
})
1063+
1064+
t.Run("restore path: transient Update error is non-fatal, session is still returned", func(t *testing.T) {
1065+
t.Parallel()
1066+
1067+
// A transient Redis write failure during loadSession's metadata write-back
1068+
// must not prevent the restored session from being cached and served.
1069+
// The session is still usable on this pod; checkSession will detect any
1070+
// metadata drift on the next liveness check and evict if necessary.
1071+
ctrl := gomock.NewController(t)
1072+
factory := sessionfactorymocks.NewMockMultiSessionFactory(ctrl)
1073+
factory.EXPECT().MakeSessionWithID(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
1074+
Times(0)
1075+
1076+
sessionID := "restore-update-error-session"
1077+
restored := newMockSession(t, ctrl, sessionID, nil)
1078+
1079+
factory.EXPECT().
1080+
RestoreSession(gomock.Any(), sessionID, gomock.Any(), gomock.Any()).
1081+
Return(restored, nil).Times(1)
1082+
1083+
innerStorage := newTestSessionDataStorage(t)
1084+
faultyStorage := &errorOnUpdateStorage{DataStorage: innerStorage}
1085+
sm, cleanup, err := New(faultyStorage, &FactoryConfig{Base: factory, CacheCapacity: 1000}, newFakeRegistry())
1086+
require.NoError(t, err)
1087+
t.Cleanup(func() { _ = cleanup(context.Background()) })
1088+
1089+
_, err = innerStorage.Create(context.Background(), sessionID, map[string]string{
1090+
sessiontypes.MetadataKeyTokenHash: "",
1091+
})
1092+
require.NoError(t, err)
1093+
1094+
// Write failure must be non-fatal: session is still returned and cached.
1095+
multiSess, ok := sm.GetMultiSession(sessionID)
1096+
assert.True(t, ok, "transient Update error must not prevent session from being served")
1097+
assert.NotNil(t, multiSess)
1098+
assert.Equal(t, sessionID, multiSess.ID())
1099+
})
9361100
}
9371101

9381102
// ---------------------------------------------------------------------------
@@ -2087,6 +2251,40 @@ func TestSessionManager_CheckSession(t *testing.T) {
20872251
err = sm.checkSession(sessionID, cached)
20882252
assert.NoError(t, err, "matching empty metadata must not cause eviction")
20892253
})
2254+
2255+
t.Run("differing per-backend session IDs do not evict", func(t *testing.T) {
2256+
t.Parallel()
2257+
// In multi-pod deployments, each pod's RestoreSession independently
2258+
// negotiates its own per-backend session IDs with backends that do not
2259+
// honor Mcp-Session-Id hints (e.g. SSE transports). Each pod then
2260+
// writes its own IDs back to Redis via loadSession. checkSession must
2261+
// NOT evict when only per-backend session IDs differ — only when the
2262+
// backend ID list (MetadataKeyBackendIDs) changes. Evicting on per-
2263+
// backend ID drift would cause each pod's write-back to invalidate all
2264+
// other pods' sessions, creating an infinite eviction storm.
2265+
sm, storage := newTestSessionManager(t, makeFactory(t), newFakeRegistry())
2266+
sessionID := "multi-pod-per-backend-ids"
2267+
2268+
// Storage holds IDs written by another pod's RestoreSession.
2269+
_, err := storage.Create(context.Background(), sessionID, map[string]string{
2270+
vmcpsession.MetadataKeyBackendIDs: "backend-a",
2271+
vmcpsession.MetadataKeyBackendSessionPrefix + "backend-a": "pod-a-session-id",
2272+
})
2273+
require.NoError(t, err)
2274+
2275+
// This pod cached different per-backend IDs from its own RestoreSession.
2276+
ctrl := gomock.NewController(t)
2277+
cached := sessionmocks.NewMockMultiSession(ctrl)
2278+
cached.EXPECT().GetMetadata().Return(map[string]string{
2279+
vmcpsession.MetadataKeyBackendIDs: "backend-a",
2280+
vmcpsession.MetadataKeyBackendSessionPrefix + "backend-a": "pod-b-session-id",
2281+
}).AnyTimes()
2282+
sm.sessions.Set(sessionID, cached)
2283+
2284+
err = sm.checkSession(sessionID, cached)
2285+
assert.NoError(t, err,
2286+
"differing per-backend session IDs must not evict to avoid cross-pod eviction storms")
2287+
})
20902288
}
20912289

20922290
// ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)