Skip to content

Commit 32669c3

Browse files
fixup! simplify: remove OutboundMiddleware/useOutbound; StreamDriver calls TaskManager directly
Invert the dependency per review: StreamDriver is now fully task-agnostic. - StreamDriver exposes generic per-request `intercept` hook (RequestOptions) and per-connection `onresponse` tap returning {consumed, preserveProgress}. - TaskManager.sendRequest/sendNotification helpers wrap Outbound.request/notification, threading processOutboundRequest through `intercept`. - Protocol/McpServer/Client wire TaskManager via these helpers + the onresponse tap. - McpServer request-shaped Outbound also honours `intercept`.
1 parent 934f6e5 commit 32669c3

7 files changed

Lines changed: 123 additions & 76 deletions

File tree

packages/client/src/client/client.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,11 +282,14 @@ export class Client extends Dispatcher<ClientContext> {
282282
if (isChannelTransport(transport)) {
283283
const driverOpts: StreamDriverOptions = {
284284
supportedProtocolVersions: this._supportedProtocolVersions,
285-
debouncedNotificationMethods: this._options?.debouncedNotificationMethods,
286-
taskManager: this._taskManager
285+
debouncedNotificationMethods: this._options?.debouncedNotificationMethods
287286
};
288287
this._ct = channelAsClientTransport(transport, this, driverOpts);
289-
this._ct.driver!.onclose = () => this.onclose?.();
288+
this._ct.driver!.onresponse = (r, id) => this._taskManager.processInboundResponse(r, id);
289+
this._ct.driver!.onclose = () => {
290+
this._taskManager.onClose();
291+
this.onclose?.();
292+
};
290293
this._ct.driver!.onerror = e => this.onerror?.(e);
291294
const skipInit = transport.sessionId !== undefined;
292295
if (skipInit) {

packages/core/src/shared/context.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import type {
88
ElicitRequestURLParams,
99
ElicitResult,
1010
JSONRPCMessage,
11+
JSONRPCRequest,
12+
JSONRPCResultResponse,
1113
LoggingLevel,
1214
Notification,
1315
Progress,
@@ -148,6 +150,20 @@ export type RequestOptions = {
148150
* If provided, associates this request with a related task.
149151
*/
150152
relatedTask?: RelatedTaskMetadata;
153+
154+
/**
155+
* @internal Called by the channel adapter after the wire request is built
156+
* (id assigned, `_meta.progressToken` set, response/progress handlers registered)
157+
* but before send. Return `true` to skip the send; the caller takes ownership of
158+
* delivering `wire` later. The registered handlers stay live. `settle` injects
159+
* a result/error into the registered response handler out-of-band.
160+
*/
161+
intercept?: (
162+
wire: JSONRPCRequest,
163+
messageId: number,
164+
settle: (response: JSONRPCResultResponse | Error) => void,
165+
onError: (error: unknown) => void
166+
) => boolean;
151167
} & TransportSendOptions;
152168

153169
/**

packages/core/src/shared/protocol.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,13 @@ export abstract class Protocol<ContextT extends BaseContext> {
162162
const driver = new StreamDriver(this._dispatcher, transport, {
163163
supportedProtocolVersions: this._supportedProtocolVersions,
164164
debouncedNotificationMethods: this._options?.debouncedNotificationMethods,
165-
buildEnv: (extra, base) => ({ ...base, _transportExtra: extra }),
166-
taskManager: this._ownTaskManager
165+
buildEnv: (extra, base) => ({ ...base, _transportExtra: extra })
167166
});
168167
this._outbound = driver;
168+
driver.onresponse = (r, id) => this._ownTaskManager.processInboundResponse(r, id);
169169
driver.onclose = () => {
170170
if (this._outbound === driver) this._outbound = undefined;
171+
this._ownTaskManager.onClose();
171172
this.onclose?.();
172173
};
173174
driver.onerror = error => this.onerror?.(error);
@@ -215,7 +216,7 @@ export abstract class Protocol<ContextT extends BaseContext> {
215216
if (this._options?.enforceStrictCapabilities === true) {
216217
this.assertCapabilityForMethod(request.method as RequestMethod);
217218
}
218-
return this._outbound.request(request, resultSchema, options);
219+
return this._ownTaskManager.sendRequest(request, resultSchema, options, this._outbound);
219220
}
220221

221222
/**
@@ -226,7 +227,7 @@ export abstract class Protocol<ContextT extends BaseContext> {
226227
throw new SdkError(SdkErrorCode.NotConnected, 'Not connected');
227228
}
228229
this.assertNotificationCapability(notification.method as NotificationMethod);
229-
return this._outbound.notification(notification, options);
230+
return this._ownTaskManager.sendNotification(notification, options, this._outbound);
230231
}
231232

232233
// ───────────────────────────────────────────────────────────────────────

packages/core/src/shared/streamDriver.ts

Lines changed: 26 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import { parseSchema } from '../util/schema.js';
2828
import type { NotificationOptions, Outbound, ProgressCallback, RequestEnv, RequestOptions } from './context.js';
2929
import { DEFAULT_REQUEST_TIMEOUT_MSEC } from './context.js';
3030
import type { Dispatcher } from './dispatcher.js';
31-
import type { TaskManager } from './taskManager.js';
3231
import type { AttachOptions, Transport } from './transport.js';
3332

3433
type TimeoutInfo = {
@@ -48,13 +47,6 @@ export type StreamDriverOptions = {
4847
* {@linkcode MessageExtraInfo} (e.g. auth, http req).
4948
*/
5049
buildEnv?: (extra: MessageExtraInfo | undefined, base: RequestEnv) => RequestEnv;
51-
/**
52-
* Optional {@linkcode TaskManager}. When provided, the driver calls its
53-
* `processOutbound*` / `processInboundResponse` / `onClose` hooks at the
54-
* request-correlation seam. Inbound task processing happens via the
55-
* TaskManager's dispatch middleware (registered by the caller), not here.
56-
*/
57-
taskManager?: TaskManager;
5850
};
5951

6052
/**
@@ -73,10 +65,18 @@ export class StreamDriver implements Outbound {
7365
private _pendingDebouncedNotifications = new Set<string>();
7466
private _closed = false;
7567
private _supportedProtocolVersions: string[];
76-
private _taskManager?: TaskManager;
7768

7869
onclose?: () => void;
7970
onerror?: (error: Error) => void;
71+
/**
72+
* Tap for every inbound response. Return `consumed: true` to claim it (suppresses the
73+
* matched-handler dispatch / unknown-id error). Return `preserveProgress: true` to keep
74+
* the progress handler registered after the matched handler runs. Set by the owner.
75+
*/
76+
onresponse?: (
77+
response: JSONRPCResponse | JSONRPCErrorResponse,
78+
messageId: number
79+
) => { consumed: boolean; preserveProgress?: boolean };
8080

8181
constructor(
8282
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- driver is context-agnostic; subclass owns ContextT
@@ -85,7 +85,6 @@ export class StreamDriver implements Outbound {
8585
private _options: StreamDriverOptions = {}
8686
) {
8787
this._supportedProtocolVersions = _options.supportedProtocolVersions ?? SUPPORTED_PROTOCOL_VERSIONS;
88-
this._taskManager = _options.taskManager;
8988
}
9089

9190
/** {@linkcode Outbound.removeProgressHandler}. */
@@ -208,31 +207,19 @@ export class StreamDriver implements Outbound {
208207
options?.resetTimeoutOnProgress ?? false
209208
);
210209

211-
let queued = false;
212-
if (this._taskManager) {
213-
const sideChannelResponse = (resp: JSONRPCResultResponse | Error) => {
214-
const h = this._responseHandlers.get(messageId);
215-
if (h) h(resp);
216-
else this._onerror(new Error(`Response handler missing for side-channeled request ${messageId}`));
217-
};
218-
try {
219-
queued = this._taskManager.processOutboundRequest(jsonrpcRequest, options, messageId, sideChannelResponse, error => {
220-
this._progressHandlers.delete(messageId);
221-
reject(error);
222-
}).queued;
223-
} catch (error) {
210+
if (options?.intercept) {
211+
const settle = (r: JSONRPCResultResponse | Error) => this._responseHandlers.get(messageId)?.(r);
212+
const onError = (e: unknown) => {
224213
this._progressHandlers.delete(messageId);
225-
reject(error);
226-
return;
227-
}
214+
reject(e);
215+
};
216+
if (options.intercept(jsonrpcRequest, messageId, settle, onError)) return;
228217
}
229218

230-
if (!queued) {
231-
this.pipe.send(jsonrpcRequest, { relatedRequestId, resumptionToken, onresumptiontoken }).catch(error => {
232-
this._progressHandlers.delete(messageId);
233-
reject(error);
234-
});
235-
}
219+
this.pipe.send(jsonrpcRequest, { relatedRequestId, resumptionToken, onresumptiontoken }).catch(error => {
220+
this._progressHandlers.delete(messageId);
221+
reject(error);
222+
});
236223
}).finally(() => {
237224
if (onAbort) options?.signal?.removeEventListener('abort', onAbort);
238225
if (cleanupId !== undefined) {
@@ -246,9 +233,8 @@ export class StreamDriver implements Outbound {
246233
* Sends a notification over the pipe. Supports debouncing per the constructor option.
247234
*/
248235
async notification(notification: Notification, options?: NotificationOptions): Promise<void> {
249-
const taskResult = await this._taskManager?.processOutboundNotification(notification, options);
250-
if (taskResult?.queued || this._closed) return;
251-
const jsonrpc: JSONRPCNotification = taskResult?.jsonrpcNotification ?? {
236+
if (this._closed) return;
237+
const jsonrpc: JSONRPCNotification = {
252238
jsonrpc: '2.0',
253239
method: notification.method,
254240
params: notification.params
@@ -345,19 +331,16 @@ export class StreamDriver implements Outbound {
345331

346332
private _onresponse(response: JSONRPCResponse | JSONRPCErrorResponse): void {
347333
const messageId = Number(response.id);
348-
const taskResult = this._taskManager?.processInboundResponse(response, messageId);
349-
if (taskResult?.consumed) return;
350-
334+
const tap = this.onresponse?.(response, messageId);
335+
if (tap?.consumed) return;
351336
const handler = this._responseHandlers.get(messageId);
352337
if (handler === undefined) {
353338
this._onerror(new Error(`Received a response for an unknown message ID: ${JSON.stringify(response)}`));
354339
return;
355340
}
356341
this._responseHandlers.delete(messageId);
357342
this._cleanupTimeout(messageId);
358-
if (!taskResult?.preserveProgress) {
359-
this._progressHandlers.delete(messageId);
360-
}
343+
if (!tap?.preserveProgress) this._progressHandlers.delete(messageId);
361344
if (isJSONRPCResultResponse(response)) {
362345
handler(response);
363346
} else {
@@ -370,7 +353,6 @@ export class StreamDriver implements Outbound {
370353
const responseHandlers = this._responseHandlers;
371354
this._responseHandlers = new Map();
372355
this._progressHandlers.clear();
373-
this._taskManager?.onClose();
374356
this._pendingDebouncedNotifications.clear();
375357
for (const info of this._timeoutInfo.values()) clearTimeout(info.timeoutId);
376358
this._timeoutInfo.clear();
@@ -439,12 +421,12 @@ export async function attachChannelTransport(
439421
const driver = new StreamDriver(dispatcher, pipe, {
440422
supportedProtocolVersions: options?.supportedProtocolVersions,
441423
debouncedNotificationMethods: options?.debouncedNotificationMethods,
442-
taskManager: options?.taskManager,
443424
buildEnv: options?.buildEnv
444425
});
445-
if (options?.onclose || options?.onerror) {
426+
if (options?.onclose || options?.onerror || options?.onresponse) {
446427
driver.onclose = options.onclose;
447428
driver.onerror = options.onerror;
429+
driver.onresponse = options.onresponse;
448430
}
449431
await driver.start();
450432
return driver;

packages/core/src/shared/taskManager.ts

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ export class TaskManager {
384384
private _outboundRequest<T extends AnySchema>(req: Request, schema: T, opts?: RequestOptions): Promise<SchemaOutput<T>> {
385385
const ch = this._requireHooks.channel();
386386
if (!ch) throw new ProtocolError(ProtocolErrorCode.InternalError, 'Not connected');
387-
return ch.request(req, schema, opts);
387+
return this.sendRequest(req, schema, opts, ch);
388388
}
389389

390390
// -- Public API (client-facing) --
@@ -804,7 +804,44 @@ export class TaskManager {
804804
};
805805
}
806806

807-
// -- Outbound-seam lifecycle methods (called directly by StreamDriver) --
807+
// -- Outbound helpers (called by McpServer/Client/Protocol before delegating to Outbound) --
808+
809+
/**
810+
* Task-aware request send: routes through {@linkcode RequestOptions.intercept} so the
811+
* channel adapter builds the wire (id/progressToken/handlers) and TaskManager decides
812+
* whether to queue it. Use this where instance-level outbound requests are made
813+
* (Protocol/McpServer/Client), so the channel adapter stays task-agnostic.
814+
*/
815+
sendRequest<T extends AnySchema>(
816+
request: Request,
817+
resultSchema: T,
818+
options: RequestOptions | undefined,
819+
outbound: Outbound
820+
): Promise<SchemaOutput<T>> {
821+
if (!options?.relatedTask && !options?.task) {
822+
return outbound.request(request, resultSchema, options);
823+
}
824+
return outbound.request(request, resultSchema, {
825+
...options,
826+
intercept: (wire, messageId, settle, onError) =>
827+
this.processOutboundRequest(wire, options, messageId, settle, onError).queued
828+
});
829+
}
830+
831+
/**
832+
* Task-aware notification send: queues when `options.relatedTask` is set, otherwise
833+
* delegates to `outbound.notification()` with related-task metadata attached.
834+
*/
835+
async sendNotification(notification: Notification, options: NotificationOptions | undefined, outbound: Outbound): Promise<void> {
836+
const result = await this.processOutboundNotification(notification, options);
837+
if (result.queued) return;
838+
await outbound.notification(
839+
result.jsonrpcNotification
840+
? { method: result.jsonrpcNotification.method, params: result.jsonrpcNotification.params }
841+
: notification,
842+
options
843+
);
844+
}
808845

809846
processOutboundRequest(
810847
jsonrpcRequest: JSONRPCRequest,

packages/core/src/shared/transport.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import type {
88
RequestId
99
} from '../types/index.js';
1010
import type { RequestEnv } from './context.js';
11-
import type { TaskManager } from './taskManager.js';
1211

1312
export type FetchLike = (url: string | URL, init?: RequestInit) => Promise<Response>;
1413

@@ -163,10 +162,14 @@ export type Transport = ChannelTransport;
163162
export type AttachOptions = {
164163
supportedProtocolVersions?: string[];
165164
debouncedNotificationMethods?: string[];
166-
taskManager?: TaskManager;
167165
buildEnv?: (extra: MessageExtraInfo | undefined, base: RequestEnv) => RequestEnv;
168166
onclose?: () => void;
169167
onerror?: (error: Error) => void;
168+
/** Tap for every inbound response. See {@linkcode StreamDriver.onresponse}. */
169+
onresponse?: (
170+
response: JSONRPCResultResponse | JSONRPCErrorResponse,
171+
messageId: number
172+
) => { consumed: boolean; preserveProgress?: boolean };
170173
};
171174

172175
/**

packages/server/src/server/mcpServer.ts

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import type {
2020
JSONRPCErrorResponse,
2121
JSONRPCMessage,
2222
JSONRPCNotification,
23+
JSONRPCResultResponse,
2324
JSONRPCRequest,
2425
JSONRPCResponse,
2526
JsonSchemaType,
@@ -311,36 +312,40 @@ export class McpServer extends Dispatcher<ServerContext> implements RegistriesHo
311312
outbound = {
312313
close: () => transport.close(),
313314
notification: transport.notify
314-
? async (n, opts) => {
315-
const out = await this._taskManager.processOutboundNotification(n, opts);
316-
if (!out.queued)
317-
await transport.notify!(out.jsonrpcNotification ?? ({ jsonrpc: '2.0', ...n } as JSONRPCNotification));
318-
}
315+
? async (n, _opts) => transport.notify!({ jsonrpc: '2.0', ...n } as JSONRPCNotification)
319316
: noOutbound('notifications'),
320317
request: transport.request
321-
? async (r, schema, _opts) => {
322-
const id = this._nextOutboundId++;
323-
const resp = await transport.request!({
324-
jsonrpc: '2.0',
325-
id,
326-
method: r.method,
327-
params: r.params
328-
} as JSONRPCRequest);
329-
if ('error' in resp) throw ProtocolError.fromError(resp.error.code, resp.error.message, resp.error.data);
330-
const parsed = parseSchema(schema, resp.result);
331-
if (!parsed.success) throw parsed.error;
332-
return parsed.data as SchemaOutput<typeof schema>;
333-
}
318+
? (r, schema, opts) =>
319+
new Promise((resolve, reject) => {
320+
const id = this._nextOutboundId++;
321+
const wire = { jsonrpc: '2.0', id, method: r.method, params: r.params } as JSONRPCRequest;
322+
const finish = (resp: JSONRPCResultResponse | Error) => {
323+
if (resp instanceof Error) return reject(resp);
324+
const parsed = parseSchema(schema, resp.result);
325+
if (!parsed.success) return reject(parsed.error);
326+
resolve(parsed.data as SchemaOutput<typeof schema>);
327+
};
328+
if (opts?.intercept?.(wire, id, finish, reject)) return;
329+
transport
330+
.request!(wire)
331+
.then(resp =>
332+
'error' in resp
333+
? reject(ProtocolError.fromError(resp.error.code, resp.error.message, resp.error.data))
334+
: finish(resp)
335+
)
336+
.catch(reject);
337+
})
334338
: noOutbound('requests')
335339
};
336340
} else {
337341
outbound = await attachChannelTransport(transport, this, {
338342
supportedProtocolVersions: this._supportedProtocolVersions,
339343
debouncedNotificationMethods: this._options?.debouncedNotificationMethods,
340344
buildEnv: (extra, base) => ({ ...base, _transportExtra: extra }),
341-
taskManager: this._taskManager,
345+
onresponse: (r, id) => this._taskManager.processInboundResponse(r, id),
342346
onclose: () => {
343347
if (this._outbound === outbound) this._outbound = undefined;
348+
this._taskManager.onClose();
344349
this.onclose?.();
345350
},
346351
onerror: e => this.onerror?.(e)
@@ -594,7 +599,7 @@ export class McpServer extends Dispatcher<ServerContext> implements RegistriesHo
594599
if (this._options?.enforceStrictCapabilities === true) {
595600
assertCapabilityForMethod(req.method as RequestMethod, this._clientCapabilities);
596601
}
597-
return this._requireOutbound().request(req, schema as never, options) as Promise<T>;
602+
return this._taskManager.sendRequest(req, schema as never, options, this._requireOutbound()) as Promise<T>;
598603
}
599604

600605
/**
@@ -748,7 +753,7 @@ export class McpServer extends Dispatcher<ServerContext> implements RegistriesHo
748753
*/
749754
async notification(notification: Notification, options?: NotificationOptions): Promise<void> {
750755
assertNotificationCapability(notification.method as NotificationMethod, this._capabilities, this._clientCapabilities);
751-
await this._outbound?.notification(notification, options);
756+
if (this._outbound) await this._taskManager.sendNotification(notification, options, this._outbound);
752757
}
753758

754759
async sendLoggingMessage(params: LoggingMessageNotification['params'], sessionId?: string): Promise<void> {

0 commit comments

Comments
 (0)