Skip to content

Commit 4d26ddd

Browse files
Copilot and lpcox authored
fix: reconnect HTTP backend sessions on expiry, increase server-side session timeout
When an MCP backend session expires (e.g. after 30+ minutes of inactivity on the gateway→safeoutputs connection), the gateway now:

1. Detects the "session not found" HTTP 404 response from the backend.
2. Transparently re-initialises a new session (reconnectPlainJSON).
3. Retries the original request exactly once with the new session.

For SDK-based transports (streamable HTTP / SSE), the same reconnect-and-retry pattern is applied via callSDKMethodWithReconnect whenever the SDK error message contains "session not found".

Additionally, the server-side idle SessionTimeout is increased from 30 minutes to 2 hours so that long-running agent workflows (lake build, etc.) do not cause the agent→gateway session to expire during periods of no MCP activity.

Fixes: MCP session expires mid-run, causing agent retry loops

Agent-Logs-Url: https://github.com/github/gh-aw-mcpg/sessions/a43c25fb-e9bc-4fc8-865e-19acf449fa21
Co-authored-by: lpcox <15877973+lpcox@users.noreply.github.com>
1 parent d291ea2 commit 4d26ddd

4 files changed

Lines changed: 392 additions & 32 deletions

File tree

internal/mcp/connection.go

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"net/http"
1212
"os/exec"
1313
"strings"
14+
"sync"
1415
"time"
1516

1617
"github.com/github/gh-aw-mcpg/internal/difc"
@@ -71,6 +72,9 @@ type Connection struct {
7172
httpClient *http.Client
7273
httpSessionID string // Session ID returned by the HTTP backend
7374
httpTransportType HTTPTransportType // Type of HTTP transport in use
75+
// reconnectMu serialises session-reconnect operations so that only one
76+
// goroutine performs the reconnect while others wait for it to finish.
77+
reconnectMu sync.Mutex
7478
}
7579

7680
// NewConnection creates a new MCP connection using the official SDK
@@ -255,6 +259,91 @@ func (c *Connection) GetHTTPHeaders() map[string]string {
255259
return c.headers
256260
}
257261

262+
// reconnectPlainJSON re-initialises the plain JSON-RPC session with the HTTP backend.
263+
// It is safe for concurrent callers: only one reconnect runs at a time, and the updated
264+
// session ID is available to all callers once the mutex is released.
265+
func (c *Connection) reconnectPlainJSON() error {
266+
c.reconnectMu.Lock()
267+
defer c.reconnectMu.Unlock()
268+
269+
logConn.Printf("Session expired, reconnecting plain JSON-RPC for serverID=%s", c.serverID)
270+
logger.LogWarn("backend", "MCP session expired for %s, attempting to reconnect...", c.serverID)
271+
272+
sessionID, err := c.initializeHTTPSession()
273+
if err != nil {
274+
logger.LogError("backend", "Session reconnect failed for %s: %v", c.serverID, err)
275+
return fmt.Errorf("session reconnect failed: %w", err)
276+
}
277+
278+
c.httpSessionID = sessionID
279+
logConn.Printf("Reconnected plain JSON-RPC session for serverID=%s, new sessionID=%s", c.serverID, sessionID)
280+
logger.LogInfo("backend", "Session successfully reconnected for %s", c.serverID)
281+
return nil
282+
}
283+
284+
// reconnectSDKTransport re-establishes the SDK session for streamable or SSE transports.
285+
// It is safe for concurrent callers: only one reconnect runs at a time.
286+
func (c *Connection) reconnectSDKTransport() error {
287+
c.reconnectMu.Lock()
288+
defer c.reconnectMu.Unlock()
289+
290+
logConn.Printf("Session expired, reconnecting SDK transport for serverID=%s, type=%s", c.serverID, c.httpTransportType)
291+
logger.LogWarn("backend", "MCP session expired for %s, attempting to reconnect...", c.serverID)
292+
293+
// Close the existing session gracefully (ignore error – it's already dead).
294+
if c.session != nil {
295+
_ = c.session.Close()
296+
}
297+
298+
// Build the appropriate transport.
299+
client := newMCPClient(logConn)
300+
var transport sdk.Transport
301+
switch c.httpTransportType {
302+
case HTTPTransportStreamable:
303+
transport = &sdk.StreamableClientTransport{
304+
Endpoint: c.httpURL,
305+
HTTPClient: c.httpClient,
306+
MaxRetries: 0,
307+
}
308+
case HTTPTransportSSE:
309+
transport = &sdk.SSEClientTransport{
310+
Endpoint: c.httpURL,
311+
HTTPClient: c.httpClient,
312+
}
313+
default:
314+
return fmt.Errorf("cannot reconnect: unsupported transport type %s", c.httpTransportType)
315+
}
316+
317+
connectCtx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
318+
defer cancel()
319+
320+
session, err := client.Connect(connectCtx, transport, nil)
321+
if err != nil {
322+
logger.LogError("backend", "Session reconnect failed for %s: %v", c.serverID, err)
323+
return fmt.Errorf("session reconnect failed: %w", err)
324+
}
325+
326+
c.client = client
327+
c.session = session
328+
329+
logConn.Printf("Reconnected SDK session for serverID=%s", c.serverID)
330+
logger.LogInfo("backend", "Session successfully reconnected for %s", c.serverID)
331+
return nil
332+
}
333+
334+
// callSDKMethodWithReconnect calls the SDK method and, if the session has expired,
335+
// reconnects and retries exactly once before propagating the error.
336+
func (c *Connection) callSDKMethodWithReconnect(method string, params interface{}) (*Response, error) {
337+
result, err := c.callSDKMethod(method, params)
338+
if err != nil && isSessionNotFoundError(err) {
339+
logConn.Printf("Session not found error from SDK (serverID=%s), attempting reconnect", c.serverID)
340+
if reconnErr := c.reconnectSDKTransport(); reconnErr == nil {
341+
result, err = c.callSDKMethod(method, params)
342+
}
343+
}
344+
return result, err
345+
}
346+
258347
// SendRequest sends a JSON-RPC request and waits for the response
259348
// The serverID parameter is used for logging to associate the request with a backend server
260349
func (c *Connection) SendRequest(method string, params interface{}) (*Response, error) {
@@ -301,7 +390,7 @@ func (c *Connection) SendRequestWithServerID(ctx context.Context, method string,
301390
}
302391

303392
// For streamable and SSE transports, use SDK session methods
304-
result, err = c.callSDKMethod(method, params)
393+
result, err = c.callSDKMethodWithReconnect(method, params)
305394
// Log the response from backend server
306395
var responsePayload []byte
307396
if result != nil {

internal/mcp/http_transport.go

Lines changed: 76 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,24 @@ func isHTTPConnectionError(err error) bool {
6262
return false
6363
}
6464

65+
// isSessionNotFoundError reports whether err's message contains the phrase
// "session not found", compared case-insensitively. A nil error yields false.
// Callers use it to decide whether a backend session needs to be re-established.
func isSessionNotFoundError(err error) bool {
	if err != nil {
		message := strings.ToLower(err.Error())
		return strings.Contains(message, "session not found")
	}
	return false
}
73+
74+
// isSessionNotFoundHTTPResponse reports whether an HTTP response signals a missing
// backend session: the status code must be 404 (http.StatusNotFound) and the body
// must contain "session not found", compared case-insensitively.
// Any other status code yields false regardless of the body.
func isSessionNotFoundHTTPResponse(statusCode int, body []byte) bool {
	if statusCode == http.StatusNotFound {
		lowered := strings.ToLower(string(body))
		return strings.Contains(lowered, "session not found")
	}
	return false
}
82+
6583
// parseSSEResponse extracts JSON data from SSE-formatted response
6684
// SSE format: "event: message\ndata: {json}\n\n"
6785
func parseSSEResponse(body []byte) ([]byte, error) {
@@ -436,58 +454,45 @@ func (c *Connection) initializeHTTPSession() (string, error) {
436454
return sessionID, nil
437455
}
438456

439-
// sendHTTPRequest sends a JSON-RPC request to an HTTP MCP server
440-
// The ctx parameter is used to extract session ID for the Mcp-Session-Id header
441-
func (c *Connection) sendHTTPRequest(ctx context.Context, method string, params interface{}) (*Response, error) {
442-
// Generate unique request ID using atomic counter
443-
requestID := atomic.AddUint64(&requestIDCounter, 1)
444-
445-
// For tools/call, ensure arguments field always exists (MCP protocol requirement)
446-
if method == "tools/call" {
447-
params = ensureToolCallArguments(params)
448-
}
449-
450-
logConn.Printf("Sending HTTP request to %s: method=%s, id=%d", c.httpURL, method, requestID)
451-
452-
// Execute HTTP request with custom header modification for session ID
453-
result, err := c.executeHTTPRequest(ctx, method, params, requestID, func(httpReq *http.Request) {
454-
// Add Mcp-Session-Id header with priority:
455-
// 1) Context session ID (if explicitly provided for this request)
456-
// 2) Stored httpSessionID from initialization
457+
// buildSessionHeaderModifier returns a header modifier function that adds the Mcp-Session-Id header.
458+
// Priority: context session ID > stored connection session ID.
459+
// The returned function reads c.httpSessionID at call time, so it picks up any reconnected session.
460+
func (c *Connection) buildSessionHeaderModifier(ctx context.Context) func(*http.Request) {
461+
// Capture any context-provided session ID once (it never changes for this request).
462+
ctxSessionID, _ := ctx.Value(SessionIDContextKey).(string)
463+
return func(httpReq *http.Request) {
457464
var sessionID string
458-
if ctxSessionID, ok := ctx.Value(SessionIDContextKey).(string); ok && ctxSessionID != "" {
465+
if ctxSessionID != "" {
459466
sessionID = ctxSessionID
460467
logConn.Printf("Using session ID from context: %s", sessionID)
461468
} else if c.httpSessionID != "" {
462469
sessionID = c.httpSessionID
463470
logConn.Printf("Using stored session ID from initialization: %s", sessionID)
464471
}
465-
466472
if sessionID != "" {
467473
httpReq.Header.Set("Mcp-Session-Id", sessionID)
468474
} else {
469475
logConn.Printf("No session ID available (backend may not require session management)")
470476
}
471-
})
472-
if err != nil {
473-
return nil, err
474477
}
478+
}
475479

476-
logConn.Printf("Received HTTP response: status=%d, body_len=%d", result.StatusCode, len(result.ResponseBody))
477-
478-
// Parse JSON-RPC response
479-
// The response might be in SSE format (event: message\ndata: {...})
480+
// parseHTTPResult converts a raw httpRequestResult into a JSON-RPC Response, handling non-OK
481+
// HTTP status codes by synthesising a JSON-RPC error when the server did not provide one.
482+
func parseHTTPResult(result *httpRequestResult) (*Response, error) {
483+
// Parse JSON-RPC response.
484+
// The response might be in SSE format (event: message\ndata: {...}).
480485
rpcResponse, err := parseJSONRPCResponseWithSSE(result.ResponseBody, result.StatusCode, "JSON-RPC response")
481486
if err != nil {
482487
return nil, err
483488
}
484489

485-
// Check for HTTP errors after parsing
490+
// Check for HTTP errors after parsing.
486491
// If we have a non-OK status but successfully parsed a JSON-RPC response,
487-
// pass it through (it may already contain an error field)
492+
// pass it through (it may already contain an error field).
488493
if result.StatusCode != http.StatusOK {
489494
logConn.Printf("HTTP error status=%d with valid JSON-RPC response, passing through", result.StatusCode)
490-
// If the response doesn't already have an error, construct one
495+
// If the response doesn't already have an error, construct one.
491496
if rpcResponse.Error == nil {
492497
rpcResponse.Error = &ResponseError{
493498
Code: -32603, // Internal error
@@ -499,3 +504,44 @@ func (c *Connection) sendHTTPRequest(ctx context.Context, method string, params
499504

500505
return rpcResponse, nil
501506
}
507+
508+
// sendHTTPRequest sends a JSON-RPC request to an HTTP MCP server.
509+
// The ctx parameter is used to extract session ID for the Mcp-Session-Id header.
510+
// If the backend returns a "session not found" (HTTP 404) response, it attempts a one-time
511+
// session reconnect and retries the request transparently.
512+
func (c *Connection) sendHTTPRequest(ctx context.Context, method string, params interface{}) (*Response, error) {
513+
// For tools/call, ensure arguments field always exists (MCP protocol requirement)
514+
if method == "tools/call" {
515+
params = ensureToolCallArguments(params)
516+
}
517+
518+
headerModifier := c.buildSessionHeaderModifier(ctx)
519+
520+
requestID := atomic.AddUint64(&requestIDCounter, 1)
521+
logConn.Printf("Sending HTTP request to %s: method=%s, id=%d", c.httpURL, method, requestID)
522+
523+
result, err := c.executeHTTPRequest(ctx, method, params, requestID, headerModifier)
524+
if err != nil {
525+
return nil, err
526+
}
527+
528+
logConn.Printf("Received HTTP response: status=%d, body_len=%d", result.StatusCode, len(result.ResponseBody))
529+
530+
// If the backend reported that the session has expired, reconnect and retry once.
531+
if isSessionNotFoundHTTPResponse(result.StatusCode, result.ResponseBody) {
532+
logConn.Printf("Session not found from %s (serverID=%s), attempting reconnect", c.httpURL, c.serverID)
533+
if reconnErr := c.reconnectPlainJSON(); reconnErr == nil {
534+
requestID = atomic.AddUint64(&requestIDCounter, 1)
535+
logConn.Printf("Retrying HTTP request after reconnect: method=%s, id=%d", method, requestID)
536+
result, err = c.executeHTTPRequest(ctx, method, params, requestID, headerModifier)
537+
if err != nil {
538+
return nil, err
539+
}
540+
logConn.Printf("Retry HTTP response: status=%d, body_len=%d", result.StatusCode, len(result.ResponseBody))
541+
} else {
542+
logConn.Printf("Session reconnect failed (%v), returning original session-not-found error", reconnErr)
543+
}
544+
}
545+
546+
return parseHTTPResult(result)
547+
}

0 commit comments

Comments
 (0)