@@ -81,7 +81,7 @@ export async function isSessionOwnedBy(sessionId: string, userId: string): Promi
8181}
8282
8383
84- export function redisRelayToMcpServer ( sessionId : string , transport : Transport ) : ( ) => Promise < void > {
84+ export async function redisRelayToMcpServer ( sessionId : string , transport : Transport , isGetRequest : boolean = false ) : Promise < ( ) => Promise < void > > {
8585 let redisCleanup : ( ( ) => Promise < void > ) | undefined = undefined ;
8686 const cleanup = async ( ) => {
8787 // TODO: solve race conditions where we call cleanup while the subscription is being created / before it is created
@@ -90,33 +90,39 @@ export function redisRelayToMcpServer(sessionId: string, transport: Transport):
9090 }
9191 }
9292
93- const messagePromise = new Promise < JSONRPCMessage > ( ( resolve ) => {
94- transport . onmessage = async ( message , extra ) => {
95- // First, set up response subscription if needed
96- if ( "id" in message ) {
97- const toClientChannel = getToClientChannel ( sessionId , message . id . toString ( ) ) ;
93+ const subscribe = async ( requestId : string ) => {
94+ const toClientChannel = getToClientChannel ( sessionId , requestId ) ;
9895
99- redisCleanup = await redisClient . createSubscription ( toClientChannel , async ( redisMessageJson ) => {
100- const redisMessage = JSON . parse ( redisMessageJson ) as RedisMessage ;
101- if ( redisMessage . type === 'mcp' ) {
102- await transport . send ( redisMessage . message , redisMessage . options ) ;
103- }
104- } , ( error ) => {
105- transport . onerror ?.( error ) ;
106- } ) ;
96+ redisCleanup = await redisClient . createSubscription ( toClientChannel , async ( redisMessageJson ) => {
97+ const redisMessage = JSON . parse ( redisMessageJson ) as RedisMessage ;
98+ if ( redisMessage . type === 'mcp' ) {
99+ await transport . send ( redisMessage . message , redisMessage . options ) ;
107100 }
108-
109- // Now send the message to the MCP server
110- await sendToMcpServer ( sessionId , message , extra ) ;
111- resolve ( message ) ;
112- }
113- } ) ;
114-
115- messagePromise . catch ( ( error ) => {
116- transport . onerror ?.( error ) ;
117- cleanup ( ) ;
118- } ) ;
101+ } , ( error ) => {
102+ transport . onerror ?.( error ) ;
103+ } ) ;
104+ }
119105
106+ if ( isGetRequest ) {
107+ await subscribe ( notificationStreamId ) ;
108+ } else {
109+ const messagePromise = new Promise < JSONRPCMessage > ( ( resolve ) => {
110+ transport . onmessage = async ( message , extra ) => {
111+ // First, set up response subscription if needed
112+ if ( "id" in message ) {
113+ await subscribe ( message . id . toString ( ) ) ;
114+ }
115+ // Now send the message to the MCP server
116+ await sendToMcpServer ( sessionId , message , extra ) ;
117+ resolve ( message ) ;
118+ }
119+ } ) ;
120+
121+ messagePromise . catch ( ( error ) => {
122+ transport . onerror ?.( error ) ;
123+ cleanup ( ) ;
124+ } ) ;
125+ }
120126 return cleanup ;
121127}
122128
@@ -202,7 +208,7 @@ export class ServerRedisTransport implements Transport {
202208 }
203209}
204210
205- export async function getShttpTransport ( sessionId : string , onsessionclosed : ( sessionId : string ) => void | Promise < void > ) : Promise < StreamableHTTPServerTransport > {
211+ export async function getShttpTransport ( sessionId : string , onsessionclosed : ( sessionId : string ) => void | Promise < void > , isGetRequest : boolean = false ) : Promise < StreamableHTTPServerTransport > {
206212 // Giving undefined here and setting the sessionId means the
207213 // transport wont try to create a new session.
208214 const shttpTransport = new StreamableHTTPServerTransport ( {
@@ -212,7 +218,7 @@ export async function getShttpTransport(sessionId: string, onsessionclosed: (ses
212218 shttpTransport . sessionId = sessionId ;
213219
214220 // Use the new request-id based relay approach
215- const cleanup = redisRelayToMcpServer ( sessionId , shttpTransport ) ;
221+ const cleanup = await redisRelayToMcpServer ( sessionId , shttpTransport , isGetRequest ) ;
216222 shttpTransport . onclose = cleanup ;
217223 return shttpTransport ;
218224}
0 commit comments