@@ -24,6 +24,14 @@ type RedisMessage =
2424
2525function sendToMcpServer ( sessionId : string , message : JSONRPCMessage , extra ?: { authInfo ?: AuthInfo ; } , options ?: TransportSendOptions ) : Promise < void > {
2626 const toServerChannel = getToServerChannel ( sessionId ) ;
27+
28+ logger . debug ( 'Sending message to MCP server via Redis' , {
29+ sessionId,
30+ channel : toServerChannel ,
31+ method : ( message as any ) . method ,
32+ id : ( message as any ) . id
33+ } ) ;
34+
2735 const redisMessage : RedisMessage = { type : 'mcp' , message, extra, options } ;
2836 return redisClient . publish ( toServerChannel , JSON . stringify ( redisMessage ) ) ;
2937}
@@ -51,6 +59,7 @@ function sendControlMessage(sessionId: string, action: 'SHUTDOWN' | 'PING' | 'ST
5159}
5260
5361export async function shutdownSession ( sessionId : string ) : Promise < void > {
62+ logger . info ( 'Sending shutdown control message' , { sessionId } ) ;
5463 return sendControlMessage ( sessionId , 'SHUTDOWN' ) ;
5564}
5665
@@ -61,6 +70,7 @@ export async function isLive(sessionId: string): Promise<boolean> {
6170}
6271
6372export async function setSessionOwner ( sessionId : string , userId : string ) : Promise < void > {
73+ logger . debug ( 'Setting session owner' , { sessionId, userId } ) ;
6474 await redisClient . set ( `session:${ sessionId } :owner` , userId ) ;
6575}
6676
@@ -76,30 +86,54 @@ export async function validateSessionOwnership(sessionId: string, userId: string
7686export async function isSessionOwnedBy ( sessionId : string , userId : string ) : Promise < boolean > {
7787 const isLiveSession = await isLive ( sessionId ) ;
7888 if ( ! isLiveSession ) {
89+ logger . debug ( 'Session not live' , { sessionId } ) ;
7990 return false ;
8091 }
81- return await validateSessionOwnership ( sessionId , userId ) ;
92+ const isOwned = await validateSessionOwnership ( sessionId , userId ) ;
93+ logger . debug ( 'Session ownership check' , { sessionId, userId, isOwned } ) ;
94+ return isOwned ;
8295}
8396
8497
8598export async function redisRelayToMcpServer ( sessionId : string , transport : Transport , isGetRequest : boolean = false ) : Promise < ( ) => Promise < void > > {
99+ logger . debug ( 'Setting up Redis relay to MCP server' , {
100+ sessionId,
101+ isGetRequest
102+ } ) ;
103+
86104 let redisCleanup : ( ( ) => Promise < void > ) | undefined = undefined ;
87105 const cleanup = async ( ) => {
88106 // TODO: solve race conditions where we call cleanup while the subscription is being created / before it is created
89107 if ( redisCleanup ) {
108+ logger . debug ( 'Cleaning up Redis relay' , { sessionId } ) ;
90109 await redisCleanup ( ) ;
91110 }
92111 }
93112
94113 const subscribe = async ( requestId : string ) => {
95114 const toClientChannel = getToClientChannel ( sessionId , requestId ) ;
115+
116+ logger . debug ( 'Subscribing to client channel' , {
117+ sessionId,
118+ requestId,
119+ channel : toClientChannel
120+ } ) ;
96121
97122 redisCleanup = await redisClient . createSubscription ( toClientChannel , async ( redisMessageJson ) => {
98123 const redisMessage = JSON . parse ( redisMessageJson ) as RedisMessage ;
99124 if ( redisMessage . type === 'mcp' ) {
125+ logger . debug ( 'Relaying message from Redis to client' , {
126+ sessionId,
127+ requestId,
128+ method : ( redisMessage . message as any ) . method
129+ } ) ;
100130 await transport . send ( redisMessage . message , redisMessage . options ) ;
101131 }
102132 } , ( error ) => {
133+ logger . error ( 'Error in Redis relay subscription' , error , {
134+ sessionId,
135+ channel : toClientChannel
136+ } ) ;
103137 transport . onerror ?.( error ) ;
104138 } ) ;
105139 }
@@ -111,6 +145,11 @@ export async function redisRelayToMcpServer(sessionId: string, transport: Transp
111145 transport . onmessage = async ( message , extra ) => {
112146 // First, set up response subscription if needed
113147 if ( "id" in message ) {
148+ logger . debug ( 'Setting up response subscription' , {
149+ sessionId,
150+ messageId : message . id ,
151+ method : ( message as any ) . method
152+ } ) ;
114153 await subscribe ( message . id . toString ( ) ) ;
115154 }
116155 // Now send the message to the MCP server
@@ -171,40 +210,78 @@ export class ServerRedisTransport implements Transport {
171210 }
172211
173212 async start ( ) : Promise < void > {
213+ logger . info ( 'Starting ServerRedisTransport' , {
214+ sessionId : this . _sessionId ,
215+ inactivityTimeoutMs : this . INACTIVITY_TIMEOUT_MS
216+ } ) ;
217+
174218 // Start inactivity timer
175219 this . resetInactivityTimer ( ) ;
176220
177221 // Subscribe to MCP messages from clients
178222 const serverChannel = getToServerChannel ( this . _sessionId ) ;
223+ logger . debug ( 'Subscribing to server channel' , {
224+ sessionId : this . _sessionId ,
225+ channel : serverChannel
226+ } ) ;
227+
179228 this . serverCleanup = await redisClient . createSubscription (
180229 serverChannel ,
181230 ( messageJson ) => {
182231 const redisMessage = JSON . parse ( messageJson ) as RedisMessage ;
183232 if ( redisMessage . type === 'mcp' ) {
184233 // Reset inactivity timer on each message from client
185234 this . resetInactivityTimer ( ) ;
235+
236+ logger . debug ( 'Received MCP message from client' , {
237+ sessionId : this . _sessionId ,
238+ method : ( redisMessage . message as any ) . method ,
239+ id : ( redisMessage . message as any ) . id
240+ } ) ;
241+
186242 this . onmessage ?.( redisMessage . message , redisMessage . extra ) ;
187243 }
188244 } ,
189245 ( error ) => {
246+ logger . error ( 'Error in server channel subscription' , error , {
247+ sessionId : this . _sessionId ,
248+ channel : serverChannel
249+ } ) ;
190250 this . onerror ?.( error ) ;
191251 }
192252 ) ;
193253
194254 // Subscribe to control messages for shutdown
195255 const controlChannel = getControlChannel ( this . _sessionId ) ;
256+ logger . debug ( 'Subscribing to control channel' , {
257+ sessionId : this . _sessionId ,
258+ channel : controlChannel
259+ } ) ;
260+
196261 this . controlCleanup = await redisClient . createSubscription (
197262 controlChannel ,
198263 ( messageJson ) => {
199264 const redisMessage = JSON . parse ( messageJson ) as RedisMessage ;
200265 if ( redisMessage . type === 'control' ) {
266+ logger . info ( 'Received control message' , {
267+ sessionId : this . _sessionId ,
268+ action : redisMessage . action
269+ } ) ;
270+
201271 if ( redisMessage . action === 'SHUTDOWN' ) {
272+ logger . info ( 'Shutting down transport due to control message' , {
273+ sessionId : this . _sessionId
274+ } ) ;
202275 this . shouldShutdown = true ;
203276 this . close ( ) ;
204277 }
205278 }
206279 } ,
207280 ( error ) => {
281+ logger . error ( 'Error in control channel subscription' , error , {
282+ sessionId : this . _sessionId ,
283+ channel : controlChannel
284+ } ) ;
208285 this . onerror ?.( error ) ;
209286 }
210287 ) ;
@@ -215,12 +292,25 @@ export class ServerRedisTransport implements Transport {
215292 const relatedRequestId = options ?. relatedRequestId ?. toString ( ) ?? ( "id" in message ? message . id ?. toString ( ) : notificationStreamId ) ;
216293 const channel = getToClientChannel ( this . _sessionId , relatedRequestId )
217294
295+ logger . debug ( 'Sending message to client' , {
296+ sessionId : this . _sessionId ,
297+ channel,
298+ method : ( message as any ) . method ,
299+ id : ( message as any ) . id ,
300+ relatedRequestId
301+ } ) ;
302+
218303 const redisMessage : RedisMessage = { type : 'mcp' , message, options } ;
219304 const messageStr = JSON . stringify ( redisMessage ) ;
220305 await redisClient . publish ( channel , messageStr ) ;
221306 }
222307
223308 async close ( ) : Promise < void > {
309+ logger . info ( 'Closing ServerRedisTransport' , {
310+ sessionId : this . _sessionId ,
311+ wasShutdown : this . shouldShutdown
312+ } ) ;
313+
224314 // Clear inactivity timer
225315 this . clearInactivityTimer ( ) ;
226316
@@ -241,6 +331,11 @@ export class ServerRedisTransport implements Transport {
241331}
242332
243333export async function getShttpTransport ( sessionId : string , onsessionclosed : ( sessionId : string ) => void | Promise < void > , isGetRequest : boolean = false ) : Promise < StreamableHTTPServerTransport > {
334+ logger . debug ( 'Getting StreamableHTTPServerTransport for existing session' , {
335+ sessionId,
336+ isGetRequest
337+ } ) ;
338+
244339 // Giving undefined here and setting the sessionId means the
245340 // transport wont try to create a new session.
246341 const shttpTransport = new StreamableHTTPServerTransport ( {
0 commit comments