Skip to content

Commit c823def

Browse files
simplify: remove OutboundMiddleware/useOutbound; StreamDriver calls TaskManager directly
OutboundMiddleware was a 4-hook record (request/notification/response/close) with first-claim-wins composition, registered via useOutbound() on McpServer/ Client/Protocol. Its only consumer was TaskManager, and it created a second, incompatible middleware pattern alongside Dispatcher.use(). Now: StreamDriver takes `taskManager?: TaskManager` and calls processOutbound*/ processInboundResponse/onClose at explicit call sites. McpServer/Client/Protocol pass their TaskManager via attach options. TaskManager.attachTo() returns void (inbound use() registration + handler install only). Dispatcher.use() (the standard (next)=>fn inbound pattern) is unchanged.
1 parent fc9885b commit c823def

7 files changed

Lines changed: 44 additions & 175 deletions

File tree

packages/client/src/client/client.ts

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import type {
3232
Notification,
3333
NotificationMethod,
3434
NotificationOptions,
35-
OutboundMiddleware,
3635
ProtocolOptions,
3736
ReadResourceRequest,
3837
Request,
@@ -55,7 +54,6 @@ import {
5554
CallToolResultSchema,
5655
CancelTaskResultSchema,
5756
CompleteResultSchema,
58-
composeOutboundMiddleware,
5957
CreateMessageRequestSchema,
6058
CreateMessageResultSchema,
6159
CreateMessageResultWithToolsSchema,
@@ -230,7 +228,6 @@ export class Client extends Dispatcher<ClientContext> {
230228
private _experimental?: { tasks: ExperimentalClientTasks };
231229
private _listChangedDebounceTimers: Map<string, ReturnType<typeof setTimeout>> = new Map();
232230
private _taskManager: TaskManager;
233-
private readonly _outboundMw: OutboundMiddleware[] = [];
234231

235232
onclose?: () => void;
236233
onerror?: (error: Error) => void;
@@ -249,7 +246,7 @@ export class Client extends Dispatcher<ClientContext> {
249246

250247
const tasksOpts = extractTaskManagerOptions(_options?.capabilities?.tasks);
251248
this._taskManager = tasksOpts ? new TaskManager(tasksOpts) : new NullTaskManager();
252-
const tasksOutbound = this._taskManager.attachTo(this, {
249+
this._taskManager.attachTo(this, {
253250
channel: () =>
254251
this._ct
255252
? {
@@ -264,7 +261,6 @@ export class Client extends Dispatcher<ClientContext> {
264261
assertTaskCapability: () => {},
265262
assertTaskHandlerCapability: () => {}
266263
});
267-
this.useOutbound(tasksOutbound);
268264

269265
// Strip runtime-only fields from advertised capabilities
270266
if (_options?.capabilities?.tasks) {
@@ -277,14 +273,6 @@ export class Client extends Dispatcher<ClientContext> {
277273
super.setRequestHandler('ping', async () => ({}));
278274
}
279275

280-
/**
281-
* Register an {@linkcode OutboundMiddleware} applied at the request-correlation seam.
282-
*/
283-
useOutbound(mw: OutboundMiddleware): this {
284-
this._outboundMw.push(mw);
285-
return this;
286-
}
287-
288276
/**
289277
* Connects to a server. Accepts either a {@linkcode ClientTransport}
290278
* (2026-06-native, request-shaped) or a legacy pipe {@linkcode Transport}
@@ -297,7 +285,7 @@ export class Client extends Dispatcher<ClientContext> {
297285
const driverOpts: StreamDriverOptions = {
298286
supportedProtocolVersions: this._supportedProtocolVersions,
299287
debouncedNotificationMethods: this._options?.debouncedNotificationMethods,
300-
outboundMw: this._outboundMw
288+
taskManager: this._taskManager
301289
};
302290
this._ct = channelAsClientTransport(transport, this, driverOpts);
303291
this._ct.driver!.onclose = () => this.onclose?.();
@@ -354,8 +342,7 @@ export class Client extends Dispatcher<ClientContext> {
354342
return resp ?? { jsonrpc: '2.0', id: r.id, error: { code: -32_601, message: 'Method not found' } };
355343
},
356344
onresponse: r => {
357-
const mw = composeOutboundMiddleware(this._outboundMw);
358-
const consumed = mw.response?.(r, Number(r.id))?.consumed ?? false;
345+
const consumed = this._taskManager.processInboundResponse(r, Number(r.id)).consumed;
359346
if (!consumed) this.onerror?.(new Error(`Unmatched response on standalone stream: ${JSON.stringify(r)}`));
360347
}
361348
});
@@ -699,8 +686,7 @@ export class Client extends Dispatcher<ClientContext> {
699686
onresumptiontoken: options?.onresumptiontoken,
700687
onnotification: n => void this.dispatchNotification(n).catch(error => this.onerror?.(error)),
701688
onresponse: r => {
702-
const mw = composeOutboundMiddleware(this._outboundMw);
703-
const consumed = mw.response?.(r, Number(r.id))?.consumed ?? false;
689+
const consumed = this._taskManager.processInboundResponse(r, Number(r.id)).consumed;
704690
if (!consumed) this.onerror?.(new Error(`Unmatched response on stream: ${JSON.stringify(r)}`));
705691
},
706692
onrequest: async r => {

packages/core/src/shared/context.ts

Lines changed: 0 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,7 @@ import type {
77
ElicitRequestFormParams,
88
ElicitRequestURLParams,
99
ElicitResult,
10-
JSONRPCErrorResponse,
1110
JSONRPCMessage,
12-
JSONRPCNotification,
13-
JSONRPCRequest,
14-
JSONRPCResponse,
15-
JSONRPCResultResponse,
1611
LoggingLevel,
1712
Notification,
1813
Progress,
@@ -194,76 +189,6 @@ export interface Outbound {
194189
sendRaw?(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise<void>;
195190
}
196191

197-
/**
198-
* Middleware around the request-correlation seam of an {@linkcode Outbound}.
199-
* Registered via `useOutbound()` on {@linkcode McpServer} / {@linkcode Client};
200-
* the transport adapter (e.g. {@linkcode StreamDriver}) calls each hook without
201-
* knowing why a message is queued or consumed.
202-
*
203-
* Unlike {@linkcode DispatchMiddleware} (a single function wrapping `next`), the
204-
* outbound seam has four distinct call sites, so this is a record of optional hooks.
205-
* Multiple middleware are composed with {@linkcode composeOutboundMiddleware} —
206-
* first to claim wins for `request`/`notification`/`response`; all `close` run.
207-
*/
208-
export interface OutboundMiddleware {
209-
/** Called before each outbound request hits the wire. Return `queued: true` to suppress the send (caller resolves via `settle`). */
210-
request?(
211-
jr: JSONRPCRequest,
212-
options: RequestOptions | undefined,
213-
messageId: number,
214-
settle: (r: JSONRPCResultResponse | Error) => void,
215-
reject: (e: unknown) => void
216-
): { queued: boolean };
217-
/** Called before each outbound notification. May suppress and/or rewrite. */
218-
notification?(
219-
n: Notification,
220-
options: NotificationOptions | undefined
221-
): Promise<{ queued: boolean; jsonrpcNotification?: JSONRPCNotification }>;
222-
/** Called for each inbound response before correlation. `consumed: true` swallows it. */
223-
response?(r: JSONRPCResponse | JSONRPCErrorResponse, messageId: number): { consumed: boolean; preserveProgress?: boolean };
224-
/** Called on connection close. */
225-
close?(): void;
226-
}
227-
228-
/**
229-
* Composes a list of {@linkcode OutboundMiddleware} into one, registration-order.
230-
* For `request`/`notification`/`response` the first middleware to claim (queued/consumed)
231-
* short-circuits the rest; `close` runs all.
232-
*/
233-
export function composeOutboundMiddleware(mws: OutboundMiddleware[]): OutboundMiddleware {
234-
if (mws.length <= 1) return mws[0] ?? {};
235-
return {
236-
request(jr, opts, id, settle, reject) {
237-
for (const mw of mws) {
238-
const r = mw.request?.(jr, opts, id, settle, reject);
239-
if (r?.queued) return r;
240-
}
241-
return { queued: false };
242-
},
243-
async notification(n, opts) {
244-
let rewritten: JSONRPCNotification | undefined;
245-
for (const mw of mws) {
246-
const r = await mw.notification?.(n, opts);
247-
if (r?.queued) return r;
248-
if (r?.jsonrpcNotification) rewritten = r.jsonrpcNotification;
249-
}
250-
return { queued: false, jsonrpcNotification: rewritten };
251-
},
252-
response(r, id) {
253-
let preserveProgress = false;
254-
for (const mw of mws) {
255-
const out = mw.response?.(r, id);
256-
if (out?.consumed) return out;
257-
if (out?.preserveProgress) preserveProgress = true;
258-
}
259-
return { consumed: false, preserveProgress };
260-
},
261-
close() {
262-
for (const mw of mws) mw.close?.();
263-
}
264-
};
265-
}
266-
267192
/**
268193
* Base context provided to all request handlers.
269194
*/

packages/core/src/shared/protocol.ts

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,7 @@ import type {
2020
} from '../types/index.js';
2121
import { getResultSchema, SUPPORTED_PROTOCOL_VERSIONS } from '../types/index.js';
2222
import type { AnySchema, SchemaOutput } from '../util/schema.js';
23-
import type {
24-
BaseContext,
25-
NotificationOptions,
26-
Outbound,
27-
OutboundMiddleware,
28-
ProtocolOptions,
29-
RequestEnv,
30-
RequestOptions
31-
} from './context.js';
23+
import type { BaseContext, NotificationOptions, Outbound, ProtocolOptions, RequestEnv, RequestOptions } from './context.js';
3224
import type { DispatchMiddleware } from './dispatcher.js';
3325
import { Dispatcher } from './dispatcher.js';
3426
import { StreamDriver } from './streamDriver.js';
@@ -73,31 +65,23 @@ export abstract class Protocol<ContextT extends BaseContext> {
7365
})();
7466
this._supportedProtocolVersions = _options?.supportedProtocolVersions ?? SUPPORTED_PROTOCOL_VERSIONS;
7567
this._ownTaskManager = _options?.tasks ? new TaskManager(_options.tasks) : new NullTaskManager();
76-
const omw = this._ownTaskManager.attachTo(this._dispatcher, {
68+
this._ownTaskManager.attachTo(this._dispatcher, {
7769
channel: () => this._outbound,
7870
reportError: e => this.onerror?.(e),
7971
enforceStrictCapabilities: this._options?.enforceStrictCapabilities === true,
8072
assertTaskCapability: m => this.assertTaskCapability(m),
8173
assertTaskHandlerCapability: m => this.assertTaskHandlerCapability(m)
8274
});
83-
this._outboundMw.push(omw);
8475
}
8576

8677
private readonly _ownTaskManager: TaskManager;
87-
private readonly _outboundMw: OutboundMiddleware[] = [];
8878

8979
/** Register a {@linkcode DispatchMiddleware} on the inner dispatcher. */
9080
use(mw: DispatchMiddleware): this {
9181
this._dispatcher.use(mw);
9282
return this;
9383
}
9484

95-
/** Register an {@linkcode OutboundMiddleware} applied at the request-correlation seam. */
96-
useOutbound(mw: OutboundMiddleware): this {
97-
this._outboundMw.push(mw);
98-
return this;
99-
}
100-
10185
// ───────────────────────────────────────────────────────────────────────
10286
// Subclass hooks (v1 signatures)
10387
// ───────────────────────────────────────────────────────────────────────
@@ -179,7 +163,7 @@ export abstract class Protocol<ContextT extends BaseContext> {
179163
supportedProtocolVersions: this._supportedProtocolVersions,
180164
debouncedNotificationMethods: this._options?.debouncedNotificationMethods,
181165
buildEnv: (extra, base) => ({ ...base, _transportExtra: extra }),
182-
outboundMw: this._outboundMw
166+
taskManager: this._ownTaskManager
183167
});
184168
this._outbound = driver;
185169
driver.onclose = () => {

packages/core/src/shared/streamDriver.ts

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ import {
2525
} from '../types/index.js';
2626
import type { AnySchema, SchemaOutput } from '../util/schema.js';
2727
import { parseSchema } from '../util/schema.js';
28-
import type { NotificationOptions, Outbound, OutboundMiddleware, ProgressCallback, RequestEnv, RequestOptions } from './context.js';
29-
import { composeOutboundMiddleware, DEFAULT_REQUEST_TIMEOUT_MSEC } from './context.js';
28+
import type { NotificationOptions, Outbound, ProgressCallback, RequestEnv, RequestOptions } from './context.js';
29+
import { DEFAULT_REQUEST_TIMEOUT_MSEC } from './context.js';
3030
import type { Dispatcher } from './dispatcher.js';
31+
import type { TaskManager } from './taskManager.js';
3132
import type { AttachOptions, Transport } from './transport.js';
3233

3334
type TimeoutInfo = {
@@ -48,10 +49,12 @@ export type StreamDriverOptions = {
4849
*/
4950
buildEnv?: (extra: MessageExtraInfo | undefined, base: RequestEnv) => RequestEnv;
5051
/**
51-
* {@linkcode OutboundMiddleware} hooks invoked at the request-correlation seam.
52-
* Composed in registration order via {@linkcode composeOutboundMiddleware}.
52+
* Optional {@linkcode TaskManager}. When provided, the driver calls its
53+
* `processOutbound*` / `processInboundResponse` / `onClose` hooks at the
54+
* request-correlation seam. Inbound task processing happens via the
55+
* TaskManager's dispatch middleware (registered by the caller), not here.
5356
*/
54-
outboundMw?: OutboundMiddleware[];
57+
taskManager?: TaskManager;
5558
};
5659

5760
/**
@@ -70,7 +73,7 @@ export class StreamDriver implements Outbound {
7073
private _pendingDebouncedNotifications = new Set<string>();
7174
private _closed = false;
7275
private _supportedProtocolVersions: string[];
73-
private _mw: OutboundMiddleware;
76+
private _taskManager?: TaskManager;
7477

7578
onclose?: () => void;
7679
onerror?: (error: Error) => void;
@@ -82,7 +85,7 @@ export class StreamDriver implements Outbound {
8285
private _options: StreamDriverOptions = {}
8386
) {
8487
this._supportedProtocolVersions = _options.supportedProtocolVersions ?? SUPPORTED_PROTOCOL_VERSIONS;
85-
this._mw = composeOutboundMiddleware(_options.outboundMw ?? []);
88+
this._taskManager = _options.taskManager;
8689
}
8790

8891
/** {@linkcode Outbound.removeProgressHandler}. */
@@ -206,14 +209,14 @@ export class StreamDriver implements Outbound {
206209
);
207210

208211
let queued = false;
209-
if (this._mw.request) {
212+
if (this._taskManager) {
210213
const sideChannelResponse = (resp: JSONRPCResultResponse | Error) => {
211214
const h = this._responseHandlers.get(messageId);
212215
if (h) h(resp);
213216
else this._onerror(new Error(`Response handler missing for side-channeled request ${messageId}`));
214217
};
215218
try {
216-
queued = this._mw.request(jsonrpcRequest, options, messageId, sideChannelResponse, error => {
219+
queued = this._taskManager.processOutboundRequest(jsonrpcRequest, options, messageId, sideChannelResponse, error => {
217220
this._progressHandlers.delete(messageId);
218221
reject(error);
219222
}).queued;
@@ -243,9 +246,9 @@ export class StreamDriver implements Outbound {
243246
* Sends a notification over the pipe. Supports debouncing per the constructor option.
244247
*/
245248
async notification(notification: Notification, options?: NotificationOptions): Promise<void> {
246-
const intercepted = await this._mw.notification?.(notification, options);
247-
if (intercepted?.queued || this._closed) return;
248-
const jsonrpc: JSONRPCNotification = intercepted?.jsonrpcNotification ?? {
249+
const taskResult = await this._taskManager?.processOutboundNotification(notification, options);
250+
if (taskResult?.queued || this._closed) return;
251+
const jsonrpc: JSONRPCNotification = taskResult?.jsonrpcNotification ?? {
249252
jsonrpc: '2.0',
250253
method: notification.method,
251254
params: notification.params
@@ -342,8 +345,8 @@ export class StreamDriver implements Outbound {
342345

343346
private _onresponse(response: JSONRPCResponse | JSONRPCErrorResponse): void {
344347
const messageId = Number(response.id);
345-
const intercepted = this._mw.response?.(response, messageId);
346-
if (intercepted?.consumed) return;
348+
const taskResult = this._taskManager?.processInboundResponse(response, messageId);
349+
if (taskResult?.consumed) return;
347350

348351
const handler = this._responseHandlers.get(messageId);
349352
if (handler === undefined) {
@@ -352,7 +355,7 @@ export class StreamDriver implements Outbound {
352355
}
353356
this._responseHandlers.delete(messageId);
354357
this._cleanupTimeout(messageId);
355-
if (!intercepted?.preserveProgress) {
358+
if (!taskResult?.preserveProgress) {
356359
this._progressHandlers.delete(messageId);
357360
}
358361
if (isJSONRPCResultResponse(response)) {
@@ -367,7 +370,7 @@ export class StreamDriver implements Outbound {
367370
const responseHandlers = this._responseHandlers;
368371
this._responseHandlers = new Map();
369372
this._progressHandlers.clear();
370-
this._mw.close?.();
373+
this._taskManager?.onClose();
371374
this._pendingDebouncedNotifications.clear();
372375
for (const info of this._timeoutInfo.values()) clearTimeout(info.timeoutId);
373376
this._timeoutInfo.clear();
@@ -436,7 +439,7 @@ export async function attachChannelTransport(
436439
const driver = new StreamDriver(dispatcher, pipe, {
437440
supportedProtocolVersions: options?.supportedProtocolVersions,
438441
debouncedNotificationMethods: options?.debouncedNotificationMethods,
439-
outboundMw: options?.outboundMw,
442+
taskManager: options?.taskManager,
440443
buildEnv: options?.buildEnv
441444
});
442445
if (options?.onclose || options?.onerror) {

0 commit comments

Comments
 (0)