Skip to content

Commit 94c5403

Browse files
refactor(core): generic dispatch/outbound middleware; Tasks as opt-in middleware
Dispatcher gains `use(mw: DispatchMiddleware)` and McpServer/Client/Protocol gain `useOutbound(mw: OutboundMiddleware)`. TaskManager is no longer a bound host vtable: `attachTo(d, hooks)` registers itself via `d.use()`, installs the `tasks/*` handlers, and returns the OutboundMiddleware the caller registers. McpServer/StreamDriver/Client no longer import TaskManager-specific types; the `_dispatchYielders`/`_dispatchOutboundId` state and the McpServer dispatch override are gone (the per-dispatch sideQueue lives inside the middleware via `TaskContext.sendOnResponseStream`). Renames (no aliases; rebuild-only names): `OutboundChannel`->`Outbound`, `OutboundInterceptor`->`OutboundMiddleware`, `attachPipeTransport`-> `attachChannelTransport`, `isPipeTransport`->`isChannelTransport`, `pipeAsClientTransport`->`channelAsClientTransport`. `DispatchEnv` moved to context.ts and renamed `RequestEnv` (deprecated alias kept) so transport.ts no longer type-imports from dispatcher.ts.
1 parent 1a658e2 commit 94c5403

16 files changed

Lines changed: 531 additions & 622 deletions

File tree

packages/client/src/client/client.ts

Lines changed: 37 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,10 @@ import type {
1111
ClientResult,
1212
CompleteRequest,
1313
CreateTaskResult,
14-
DispatchEnv,
15-
DispatchOutput,
1614
GetPromptRequest,
1715
GetTaskRequest,
1816
GetTaskResult,
1917
Implementation,
20-
InboundContext,
2118
JSONRPCErrorResponse,
2219
JSONRPCRequest,
2320
JSONRPCResultResponse,
@@ -35,6 +32,7 @@ import type {
3532
Notification,
3633
NotificationMethod,
3734
NotificationOptions,
35+
OutboundMiddleware,
3836
ProtocolOptions,
3937
ReadResourceRequest,
4038
Request,
@@ -48,7 +46,6 @@ import type {
4846
StandardSchemaV1,
4947
StreamDriverOptions,
5048
SubscribeRequest,
51-
TaskManagerHost,
5249
TaskManagerOptions,
5350
Tool,
5451
Transport,
@@ -58,6 +55,7 @@ import {
5855
CallToolResultSchema,
5956
CancelTaskResultSchema,
6057
CompleteResultSchema,
58+
composeOutboundMiddleware,
6159
CreateMessageRequestSchema,
6260
CreateMessageResultSchema,
6361
CreateMessageResultWithToolsSchema,
@@ -93,7 +91,7 @@ import {
9391

9492
import { ExperimentalClientTasks } from '../experimental/tasks/client.js';
9593
import type { ClientFetchOptions, ClientTransport } from './clientTransport.js';
96-
import { isJSONRPCErrorResponse, isPipeTransport, pipeAsClientTransport } from './clientTransport.js';
94+
import { channelAsClientTransport, isChannelTransport, isJSONRPCErrorResponse } from './clientTransport.js';
9795

9896
/**
9997
* Elicitation default application helper. Applies defaults to the `data` based on the `schema`.
@@ -231,8 +229,8 @@ export class Client extends Dispatcher<ClientContext> {
231229
private _pendingListChangedConfig?: ListChangedHandlers;
232230
private _experimental?: { tasks: ExperimentalClientTasks };
233231
private _listChangedDebounceTimers: Map<string, ReturnType<typeof setTimeout>> = new Map();
234-
private _tasksOptions?: TaskManagerOptions;
235-
private _taskManager?: TaskManager;
232+
private _taskManager: TaskManager;
233+
private readonly _outboundMw: OutboundMiddleware[] = [];
236234

237235
onclose?: () => void;
238236
onerror?: (error: Error) => void;
@@ -248,7 +246,25 @@ export class Client extends Dispatcher<ClientContext> {
248246
this._enforceStrictCapabilities = _options?.enforceStrictCapabilities ?? false;
249247
this._mrtrMaxRounds = _options?.mrtrMaxRounds ?? DEFAULT_MRTR_MAX_ROUNDS;
250248
this._pendingListChangedConfig = _options?.listChanged;
251-
this._tasksOptions = extractTaskManagerOptions(_options?.capabilities?.tasks);
249+
250+
const tasksOpts = extractTaskManagerOptions(_options?.capabilities?.tasks);
251+
this._taskManager = tasksOpts ? new TaskManager(tasksOpts) : new NullTaskManager();
252+
const tasksOutbound = this._taskManager.attachTo(this, {
253+
channel: () =>
254+
this._ct
255+
? {
256+
request: (r, schema, opts) => this._request(r, schema, opts),
257+
notification: (n, opts) => this.notification(n, opts),
258+
close: () => this.close(),
259+
removeProgressHandler: t => this._ct?.driver?.removeProgressHandler(t)
260+
}
261+
: undefined,
262+
reportError: e => this.onerror?.(e),
263+
enforceStrictCapabilities: this._enforceStrictCapabilities,
264+
assertTaskCapability: () => {},
265+
assertTaskHandlerCapability: () => {}
266+
});
267+
this.useOutbound(tasksOutbound);
252268

253269
// Strip runtime-only fields from advertised capabilities
254270
if (_options?.capabilities?.tasks) {
@@ -262,79 +278,28 @@ export class Client extends Dispatcher<ClientContext> {
262278
}
263279

264280
/**
265-
* Task-aware dispatch for inbound server-initiated requests (sampling,
266-
* elicitation, roots, ping). Threads {@linkcode TaskManager.processInboundRequest}
267-
* so task-augmented requests and queueing work over both pipe and request-shaped paths.
281+
* Register an {@linkcode OutboundMiddleware} applied at the request-correlation seam.
268282
*/
269-
override async *dispatch(request: JSONRPCRequest, env: DispatchEnv = {}): AsyncGenerator<DispatchOutput, void, void> {
270-
const tm = this._taskManager;
271-
if (!tm) {
272-
yield* super.dispatch(request, env);
273-
return;
274-
}
275-
const inboundCtx: InboundContext = {
276-
sessionId: env.sessionId,
277-
sendNotification: (n, opts) => this.notification(n, { ...opts, relatedRequestId: request.id }),
278-
sendRequest: (r, schema, opts) => this._request(r, schema, { ...opts, relatedRequestId: request.id })
279-
};
280-
const tr = tm.processInboundRequest(request, inboundCtx);
281-
if (tr.validateInbound) {
282-
try {
283-
tr.validateInbound();
284-
} catch (error) {
285-
const e = error as { code?: number; message?: string; data?: unknown };
286-
yield {
287-
kind: 'response',
288-
message: {
289-
jsonrpc: '2.0',
290-
id: request.id,
291-
error: {
292-
code: Number.isSafeInteger(e?.code) ? (e.code as number) : ProtocolErrorCode.InternalError,
293-
message: e?.message ?? 'Internal error',
294-
...(e?.data !== undefined && { data: e.data })
295-
}
296-
}
297-
};
298-
return;
299-
}
300-
}
301-
const taskEnv: DispatchEnv = {
302-
...env,
303-
task: tr.taskContext ?? env.task,
304-
send: (r, opts) => tr.sendRequest(r, getResultSchema(r.method as RequestMethod), opts) as Promise<Result>
305-
};
306-
for await (const out of super.dispatch(request, taskEnv)) {
307-
if (out.kind === 'response') {
308-
const routed = await tr.routeResponse(out.message);
309-
if (!routed) yield out;
310-
} else {
311-
await tr.sendNotification({ method: out.message.method, params: out.message.params });
312-
}
313-
}
283+
useOutbound(mw: OutboundMiddleware): this {
284+
this._outboundMw.push(mw);
285+
return this;
314286
}
315287

316288
/**
317289
* Connects to a server. Accepts either a {@linkcode ClientTransport}
318290
* (2026-06-native, request-shaped) or a legacy pipe {@linkcode Transport}
319291
* (stdio, SSE, the v1 SHTTP class). Pipe transports are adapted via
320-
* {@linkcode pipeAsClientTransport} and the 2025-11 initialize handshake
292+
* {@linkcode channelAsClientTransport} and the 2025-11 initialize handshake
321293
* is performed.
322294
*/
323295
async connect(transport: Transport | ClientTransport, options?: RequestOptions): Promise<void> {
324-
this._bindTaskManager();
325-
if (isPipeTransport(transport)) {
326-
const tm = this._taskManager!;
296+
if (isChannelTransport(transport)) {
327297
const driverOpts: StreamDriverOptions = {
328298
supportedProtocolVersions: this._supportedProtocolVersions,
329299
debouncedNotificationMethods: this._options?.debouncedNotificationMethods,
330-
interceptor: {
331-
request: (jr, opts, id, settle, reject) => tm.processOutboundRequest(jr, opts, id, settle, reject),
332-
notification: (n, opts) => tm.processOutboundNotification(n, opts),
333-
response: (r, id) => tm.processInboundResponse(r, id),
334-
close: () => tm.onClose()
335-
}
300+
outboundMw: this._outboundMw
336301
};
337-
this._ct = pipeAsClientTransport(transport, this, driverOpts);
302+
this._ct = channelAsClientTransport(transport, this, driverOpts);
338303
this._ct.driver!.onclose = () => this.onclose?.();
339304
this._ct.driver!.onerror = e => this.onerror?.(e);
340305
const skipInit = transport.sessionId !== undefined;
@@ -389,7 +354,8 @@ export class Client extends Dispatcher<ClientContext> {
389354
return resp ?? { jsonrpc: '2.0', id: r.id, error: { code: -32_601, message: 'Method not found' } };
390355
},
391356
onresponse: r => {
392-
const consumed = this.taskManager.processInboundResponse(r, Number(r.id)).consumed;
357+
const mw = composeOutboundMiddleware(this._outboundMw);
358+
const consumed = mw.response?.(r, Number(r.id))?.consumed ?? false;
393359
if (!consumed) this.onerror?.(new Error(`Unmatched response on standalone stream: ${JSON.stringify(r)}`));
394360
}
395361
});
@@ -402,30 +368,6 @@ export class Client extends Dispatcher<ClientContext> {
402368
})();
403369
}
404370

405-
/**
406-
* Construct and bind this client's {@linkcode TaskManager}. Owned by the client
407-
* (not the transport adapter); the pipe-shaped path threads it via
408-
* {@linkcode StreamDriverOptions.interceptor}.
409-
*/
410-
private _bindTaskManager(): void {
411-
const tm = this._tasksOptions ? new TaskManager(this._tasksOptions) : new NullTaskManager();
412-
const host: TaskManagerHost = {
413-
request: (r, schema, opts) => this._request(r, schema, opts),
414-
notification: (n, opts) => this.notification(n, opts),
415-
reportError: e => this.onerror?.(e),
416-
removeProgressHandler: t => this._ct?.driver?.removeProgressHandler(t),
417-
registerHandler: (method, handler) => this.setRawRequestHandler(method, handler),
418-
sendOnResponseStream: async () => {
419-
throw new SdkError(SdkErrorCode.NotConnected, 'sendOnResponseStream is server-side only');
420-
},
421-
enforceStrictCapabilities: this._enforceStrictCapabilities,
422-
assertTaskCapability: () => {},
423-
assertTaskHandlerCapability: () => {}
424-
};
425-
tm.bind(host);
426-
this._taskManager = tm;
427-
}
428-
429371
async close(): Promise<void> {
430372
const ct = this._ct;
431373
this._ct = undefined;
@@ -660,9 +602,6 @@ export class Client extends Dispatcher<ClientContext> {
660602
* This client's {@linkcode TaskManager}. Owned here (not by the transport adapter).
661603
*/
662604
get taskManager(): TaskManager {
663-
if (!this._taskManager) {
664-
throw new SdkError(SdkErrorCode.NotConnected, 'taskManager is unavailable: call connect() first.');
665-
}
666605
return this._taskManager;
667606
}
668607

@@ -760,7 +699,8 @@ export class Client extends Dispatcher<ClientContext> {
760699
onresumptiontoken: options?.onresumptiontoken,
761700
onnotification: n => void this.dispatchNotification(n).catch(error => this.onerror?.(error)),
762701
onresponse: r => {
763-
const consumed = this.taskManager.processInboundResponse(r, Number(r.id)).consumed;
702+
const mw = composeOutboundMiddleware(this._outboundMw);
703+
const consumed = mw.response?.(r, Number(r.id))?.consumed ?? false;
764704
if (!consumed) this.onerror?.(new Error(`Unmatched response on stream: ${JSON.stringify(r)}`));
765705
},
766706
onrequest: async r => {
@@ -1099,4 +1039,4 @@ function formatErr(e: unknown): string {
10991039
}
11001040

11011041
export type { ClientFetchOptions, ClientTransport } from './clientTransport.js';
1102-
export { isPipeTransport, pipeAsClientTransport } from './clientTransport.js';
1042+
export { channelAsClientTransport, isChannelTransport } from './clientTransport.js';

packages/client/src/client/clientTransport.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ export type ClientFetchOptions = {
6262
* version) but the contract is per-call.
6363
*
6464
* This is the 2026-06-native shape. The legacy pipe {@linkcode Transport}
65-
* interface is adapted via {@linkcode pipeAsClientTransport}.
65+
* interface is adapted via {@linkcode channelAsClientTransport}.
6666
*/
6767
export interface ClientTransport {
6868
/**
@@ -103,7 +103,7 @@ export interface ClientTransport {
103103
* {@linkcode ClientTransport} so {@linkcode Client.connect} uses the
104104
* request-shaped path.
105105
*/
106-
export function isPipeTransport(t: Transport | ClientTransport): t is Transport {
106+
export function isChannelTransport(t: Transport | ClientTransport): t is Transport {
107107
if (typeof (t as ClientTransport).fetch === 'function') return false;
108108
return typeof (t as Transport).start === 'function' && typeof (t as Transport).send === 'function';
109109
}
@@ -117,7 +117,7 @@ export function isPipeTransport(t: Transport | ClientTransport): t is Transport
117117
* server-initiated requests (sampling, elicitation, roots) that arrive on the pipe.
118118
*/
119119
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- adapter is context-agnostic; the caller's Dispatcher subclass owns ContextT
120-
export function pipeAsClientTransport(pipe: Transport, dispatcher: Dispatcher<any>, options?: StreamDriverOptions): ClientTransport {
120+
export function channelAsClientTransport(pipe: Transport, dispatcher: Dispatcher<any>, options?: StreamDriverOptions): ClientTransport {
121121
const driver = new StreamDriver(dispatcher, pipe, options);
122122
let started = false;
123123
const subscribers: Set<(n: JSONRPCNotification) => void> = new Set();

packages/client/test/client/client.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { InMemoryTransport, LATEST_PROTOCOL_VERSION, ProtocolError, ProtocolErro
1111
import { describe, expect, it, vi } from 'vitest';
1212

1313
import type { ClientFetchOptions, ClientTransport } from '../../src/client/clientTransport.js';
14-
import { isPipeTransport } from '../../src/client/clientTransport.js';
14+
import { isChannelTransport } from '../../src/client/clientTransport.js';
1515
import { Client } from '../../src/client/client.js';
1616

1717
type FetchResp = JSONRPCResultResponse | JSONRPCErrorResponse;
@@ -79,11 +79,11 @@ describe('Client (V2)', () => {
7979
expect(c.getServerVersion()?.name).toBe('d');
8080
});
8181

82-
it('isPipeTransport correctly distinguishes the two shapes', () => {
82+
it('isChannelTransport correctly distinguishes the two shapes', () => {
8383
const [a] = InMemoryTransport.createLinkedPair();
8484
const { ct } = mockTransport(r => ok(r.id, {}));
85-
expect(isPipeTransport(a)).toBe(true);
86-
expect(isPipeTransport(ct)).toBe(false);
85+
expect(isChannelTransport(a)).toBe(true);
86+
expect(isChannelTransport(ct)).toBe(false);
8787
});
8888
});
8989

packages/core/src/index.ts

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,7 @@ export * from './shared/protocol.js';
88
export * from './shared/responseMessage.js';
99
export * from './shared/stdio.js';
1010
export * from './shared/streamDriver.js';
11-
export type {
12-
InboundContext,
13-
InboundResult,
14-
RequestTaskStore,
15-
TaskContext,
16-
TaskManagerHost,
17-
TaskManagerOptions,
18-
TaskRequestOptions
19-
} from './shared/taskManager.js';
11+
export type { RequestTaskStore, TaskAttachHooks, TaskContext, TaskManagerOptions, TaskRequestOptions } from './shared/taskManager.js';
2012
export { extractTaskManagerOptions, NullTaskManager, TaskManager } from './shared/taskManager.js';
2113
export * from './shared/toolNameValidation.js';
2214
export * from './shared/transport.js';

0 commit comments

Comments
 (0)