From 9b85a8221286ba464386d5f642d39b6978341e80 Mon Sep 17 00:00:00 2001 From: theomonnom Date: Mon, 11 May 2026 16:25:34 +0000 Subject: [PATCH 1/5] fix(agents): require session management grant for remote sessions --- agents/src/voice/index.ts | 1 + agents/src/voice/remote_session.ts | 278 ++++++++++++++++++----------- 2 files changed, 179 insertions(+), 100 deletions(-) diff --git a/agents/src/voice/index.ts b/agents/src/voice/index.ts index 808ac88c1..a8f826385 100644 --- a/agents/src/voice/index.ts +++ b/agents/src/voice/index.ts @@ -12,6 +12,7 @@ export { export * from './avatar/index.js'; export * from './background_audio.js'; export { + type IncomingMessage, type TextInputCallback, type TextInputEvent, RemoteSession, diff --git a/agents/src/voice/remote_session.ts b/agents/src/voice/remote_session.ts index b8cb359a8..476c243a7 100644 --- a/agents/src/voice/remote_session.ts +++ b/agents/src/voice/remote_session.ts @@ -83,27 +83,29 @@ export type RemoteSessionCallbacks = { export abstract class SessionTransport { async start(): Promise {} - abstract sendMessage(msg: pb.AgentSessionMessage): Promise; + abstract sendMessage( + msg: pb.AgentSessionMessage, + opts?: { destinationIdentity?: string }, + ): Promise; abstract close(): Promise; - abstract [Symbol.asyncIterator](): AsyncIterator; + abstract [Symbol.asyncIterator](): AsyncIterator; +} + +export interface IncomingMessage { + message: pb.AgentSessionMessage; + senderIdentity?: string; } export class RoomSessionTransport extends SessionTransport { private readonly room: Room; private handlerRegistered = false; private closed = false; - private pendingMessages: pb.AgentSessionMessage[] = []; - private waitingResolve: ((value: IteratorResult) => void) | null = null; - private roomIO: RoomIO; + private pendingMessages: IncomingMessage[] = []; + private waitingResolve: ((value: IteratorResult) => void) | null = null; - constructor(room: Room, roomIO: RoomIO) { + constructor(room: Room, _roomIO?: RoomIO) { super(); this.room = room; - this.roomIO = roomIO; - } - - private getRemoteIdentity() { - return this.roomIO.linkedParticipant?.identity; } override async start(): Promise { @@ -113,15 +115,35 @@ export class RoomSessionTransport extends SessionTransport { } private onByteStream = (reader: ByteStreamReader, participantInfo: { identity: string }) => { - if (this.getRemoteIdentity() && participantInfo.identity !== this.getRemoteIdentity()) { + if (!this.canManage(participantInfo.identity)) { + log().debug( + { participant: participantInfo.identity }, + 'ignoring session message from participant without canManageAgentSession grant', + ); return; } - this.readStream(reader).catch((e) => { + this.readStream(reader, participantInfo.identity).catch((e) => { log().warn({ error: e }, 'failed to read binary stream message'); }); }; - private async readStream(reader: ByteStreamReader): Promise { + private canManage(identity: string): boolean { + return ( + this.room.remoteParticipants.get(identity)?.info.permission?.canManageAgentSession === true + ); + } + + private authorizedIdentities(): string[] { + const identities: string[] = []; + for (const [identity, participant] of this.room.remoteParticipants.entries()) { + if (participant.info.permission?.canManageAgentSession === true) { + identities.push(identity); + } + } + return identities; + } + + private async readStream(reader: ByteStreamReader, senderIdentity: string): Promise { try { const chunks = await reader.readAll(); let totalLength = 0; @@ -135,7 +157,7 @@ export class RoomSessionTransport extends SessionTransport { offset += chunk.length; } const msg = pb.AgentSessionMessage.fromBinary(data); - this.enqueue(msg); + this.enqueue({ message: msg, senderIdentity }); } catch (e) { if (!this.closed) { log().warn({ error: e }, 'failed to parse binary stream message'); @@ -143,20 +165,27 @@ export class RoomSessionTransport extends SessionTransport { } } - override async sendMessage(msg: pb.AgentSessionMessage): Promise { + override async sendMessage( + msg: pb.AgentSessionMessage, + opts: { destinationIdentity?: string } = {}, + ): Promise { if (this.closed || !this.room.isConnected) return; + const destinationIdentities = opts.destinationIdentity + ? this.canManage(opts.destinationIdentity) + ? [opts.destinationIdentity] + : [] + : this.authorizedIdentities(); + if (destinationIdentities.length === 0) return; + try { const data = msg.toBinary(); - const opts: Record = { + const streamOpts: Record = { topic: TOPIC_SESSION_MESSAGES, name: shortuuid('AS_'), + destinationIdentities, }; - const remoteIdentity = this.getRemoteIdentity(); - if (remoteIdentity) { - opts.destinationIdentities = [remoteIdentity]; - } - const writer = await this.room.localParticipant!.streamBytes(opts); + const writer = await this.room.localParticipant!.streamBytes(streamOpts); await writer.write(new Uint8Array(data)); await writer.close(); } catch (e) { @@ -179,14 +208,14 @@ export class RoomSessionTransport extends SessionTransport { if (this.waitingResolve) { this.waitingResolve({ - value: undefined as unknown as pb.AgentSessionMessage, + value: undefined as unknown as IncomingMessage, done: true, }); this.waitingResolve = null; } } - private enqueue(msg: pb.AgentSessionMessage): void { + private enqueue(msg: IncomingMessage): void { if (this.closed) return; if (this.waitingResolve) { @@ -198,12 +227,12 @@ export class RoomSessionTransport extends SessionTransport { } } - override [Symbol.asyncIterator](): AsyncIterator { + override [Symbol.asyncIterator](): AsyncIterator { return { - next: (): Promise> => { + next: (): Promise> => { if (this.closed && this.pendingMessages.length === 0) { return ThrowsPromise.resolve({ - value: undefined as unknown as pb.AgentSessionMessage, + value: undefined as unknown as IncomingMessage, done: true, }); } @@ -213,14 +242,14 @@ export class RoomSessionTransport extends SessionTransport { return ThrowsPromise.resolve({ value: pending, done: false }); } - return new ThrowsPromise, never>((resolve) => { + return new ThrowsPromise, never>((resolve) => { this.waitingResolve = resolve; }); }, - return: (): Promise> => { + return: (): Promise> => { this.close(); return ThrowsPromise.resolve({ - value: undefined as unknown as pb.AgentSessionMessage, + value: undefined as unknown as IncomingMessage, done: true, }); }, @@ -559,12 +588,11 @@ export class SessionHost { private async recvLoop(): Promise { try { - for await (const msg of this.transport) { + for await (const incoming of this.transport) { + const msg = incoming.message; if (msg.message.case === 'request') { if (this.session) { - this.trackTask( - Task.from(async () => this.handleRequestSafe(msg.message.value as pb.SessionRequest)), - ); + this.trackTask(Task.from(async () => this.handleRequestSafe(incoming))); } } } @@ -726,9 +754,10 @@ export class SessionHost { }); } - private async handleRequestSafe(req: pb.SessionRequest): Promise { + private async handleRequestSafe(incoming: IncomingMessage): Promise { + const req = incoming.message.message.value as pb.SessionRequest; try { - await this.handleRequest(req); + await this.handleRequest(req, incoming.senderIdentity); } catch (e) { log().warn({ error: e, requestId: req.requestId }, 'error handling session request'); try { @@ -741,75 +770,104 @@ export class SessionHost { }), }, }); - await this.transport.sendMessage(resp); + await this.transport.sendMessage(resp, { destinationIdentity: incoming.senderIdentity }); } catch (e) { log().debug({ error: e }, 'failed to send error response'); } } } - private async handleRequest(req: pb.SessionRequest): Promise { + private async handleRequest(req: pb.SessionRequest, destinationIdentity?: string): Promise { if (!this.session) return; switch (req.request.case) { case 'ping': - return this.sendResponse(req.requestId, { - case: 'pong', - value: new pb.SessionResponse_Pong(), - }); + return this.sendResponse( + req.requestId, + { + case: 'pong', + value: new pb.SessionResponse_Pong(), + }, + undefined, + destinationIdentity, + ); case 'getChatHistory': - return this.handleGetChatHistory(req.requestId); + return this.handleGetChatHistory(req.requestId, destinationIdentity); case 'getAgentInfo': - return this.handleGetAgentInfo(req.requestId); + return this.handleGetAgentInfo(req.requestId, destinationIdentity); case 'runInput': - return this.handleRunInput(req.requestId, req.request.value); + return this.handleRunInput(req.requestId, req.request.value, destinationIdentity); case 'getSessionState': - return this.handleGetSessionState(req.requestId); + return this.handleGetSessionState(req.requestId, destinationIdentity); case 'getRtcStats': - return this.sendResponse(req.requestId, { - case: 'getRtcStats', - value: new pb.SessionResponse_GetRTCStatsResponse({ - publisherStats: [], - subscriberStats: [], - }), - }); + return this.sendResponse( + req.requestId, + { + case: 'getRtcStats', + value: new pb.SessionResponse_GetRTCStatsResponse({ + publisherStats: [], + subscriberStats: [], + }), + }, + undefined, + destinationIdentity, + ); case 'getSessionUsage': - return this.handleGetSessionUsage(req.requestId); + return this.handleGetSessionUsage(req.requestId, destinationIdentity); case 'getFrameworkInfo': - return this.sendResponse(req.requestId, { - case: 'getFrameworkInfo', - value: new pb.SessionResponse_GetFrameworkInfoResponse({ - sdk: 'js', - sdkVersion: version, - }), - }); + return this.sendResponse( + req.requestId, + { + case: 'getFrameworkInfo', + value: new pb.SessionResponse_GetFrameworkInfoResponse({ + sdk: 'js', + sdkVersion: version, + }), + }, + undefined, + destinationIdentity, + ); } } - private async handleGetChatHistory(requestId: string): Promise { + private async handleGetChatHistory( + requestId: string, + destinationIdentity?: string, + ): Promise { const items = chatItemsToProto(this.session!.history.items); - return this.sendResponse(requestId, { - case: 'getChatHistory', - value: new pb.SessionResponse_GetChatHistoryResponse({ items }), - }); + return this.sendResponse( + requestId, + { + case: 'getChatHistory', + value: new pb.SessionResponse_GetChatHistoryResponse({ items }), + }, + undefined, + destinationIdentity, + ); } - private async handleGetAgentInfo(requestId: string): Promise { + private async handleGetAgentInfo(requestId: string, destinationIdentity?: string): Promise { const agent = this.session!.currentAgent; - return this.sendResponse(requestId, { - case: 'getAgentInfo', - value: new pb.SessionResponse_GetAgentInfoResponse({ - id: agent.id, - instructions: agent.instructions, - tools: toolNames(agent.toolCtx), - chatCtx: chatItemsToProto(agent.chatCtx.items), - }), - }); + return this.sendResponse( + requestId, + { + case: 'getAgentInfo', + value: new pb.SessionResponse_GetAgentInfoResponse({ + id: agent.id, + instructions: agent.instructions, + tools: toolNames(agent.toolCtx), + chatCtx: chatItemsToProto(agent.chatCtx.items), + }), + }, + undefined, + destinationIdentity, + ); } private async handleRunInput( requestId: string, input: pb.SessionRequest_RunInput, + destinationIdentity?: string, ): Promise { const text = input.text; let items: pb.ChatContext_ChatItem[] = []; @@ -845,43 +903,61 @@ export class SessionHost { value: new pb.SessionResponse_RunInputResponse({ items }), }, error, + destinationIdentity, ); } - private async handleGetSessionState(requestId: string): Promise { + private async handleGetSessionState( + requestId: string, + destinationIdentity?: string, + ): Promise { const agent = this.session!.currentAgent; const startedAt = this.session!._startedAt ?? Date.now(); - return this.sendResponse(requestId, { - case: 'getSessionState', - value: new pb.SessionResponse_GetSessionStateResponse({ - agentState: AGENT_STATE_MAP[this.session!.agentState], - userState: USER_STATE_MAP[this.session!.userState], - agentId: agent.id, - options: protoSerializeOptions({ - turnHandling: this.session!.sessionOptions.turnHandling, - maxToolSteps: this.session!.sessionOptions.maxToolSteps, - userAwayTimeout: this.session!.sessionOptions.userAwayTimeout, - useTtsAlignedTranscript: this.session!.sessionOptions.useTtsAlignedTranscript, + return this.sendResponse( + requestId, + { + case: 'getSessionState', + value: new pb.SessionResponse_GetSessionStateResponse({ + agentState: AGENT_STATE_MAP[this.session!.agentState], + userState: USER_STATE_MAP[this.session!.userState], + agentId: agent.id, + options: protoSerializeOptions({ + turnHandling: this.session!.sessionOptions.turnHandling, + maxToolSteps: this.session!.sessionOptions.maxToolSteps, + userAwayTimeout: this.session!.sessionOptions.userAwayTimeout, + useTtsAlignedTranscript: this.session!.sessionOptions.useTtsAlignedTranscript, + }), + createdAt: msToTimestamp(startedAt), }), - createdAt: msToTimestamp(startedAt), - }), - }); + }, + undefined, + destinationIdentity, + ); } - private async handleGetSessionUsage(requestId: string): Promise { - return this.sendResponse(requestId, { - case: 'getSessionUsage', - value: new pb.SessionResponse_GetSessionUsageResponse({ - usage: sessionUsageToProto(this.session!.usage), - createdAt: nowTimestamp(), - }), - }); + private async handleGetSessionUsage( + requestId: string, + destinationIdentity?: string, + ): Promise { + return this.sendResponse( + requestId, + { + case: 'getSessionUsage', + value: new pb.SessionResponse_GetSessionUsageResponse({ + usage: sessionUsageToProto(this.session!.usage), + createdAt: nowTimestamp(), + }), + }, + undefined, + destinationIdentity, + ); } private async sendResponse( requestId: string, response: pb.SessionResponse['response'], error?: string, + destinationIdentity?: string, ): Promise { await this.transport.sendMessage( new pb.AgentSessionMessage({ @@ -890,6 +966,7 @@ export class SessionHost { value: new pb.SessionResponse({ requestId, response, error }), }, }), + { destinationIdentity }, ); } @@ -955,7 +1032,8 @@ export class RemoteSession extends (EventEmitter as new () => TypedEventEmitter< private async recvLoop(): Promise { try { - for await (const msg of this.transport) { + for await (const incoming of this.transport) { + const msg = incoming.message; switch (msg.message.case) { case 'event': this.dispatchEvent(msg.message.value); From 4f4348c71ea374bbbdba23025e417ed730774086 Mon Sep 17 00:00:00 2001 From: theomonnom Date: Mon, 11 May 2026 16:53:01 +0000 Subject: [PATCH 2/5] fix(agents): preserve remote session client routing --- agents/src/voice/agent_session.ts | 4 ++- agents/src/voice/remote_session.ts | 48 ++++++++++++++++++++++++------ 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 46901b4c0..841b17121 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -448,7 +448,9 @@ export class AgentSession< this._roomIO.start(); - const transport = new RoomSessionTransport(room, this._roomIO); + const transport = new RoomSessionTransport(room, this._roomIO, { + requireCanManageAgentSession: true, + }); this.sessionHost = new SessionHost(transport); this.sessionHost.registerSession(this); if (inputOptions?.textEnabled !== false) { diff --git a/agents/src/voice/remote_session.ts b/agents/src/voice/remote_session.ts index 476c243a7..479b6b352 100644 --- a/agents/src/voice/remote_session.ts +++ b/agents/src/voice/remote_session.ts @@ -96,16 +96,24 @@ export interface IncomingMessage { senderIdentity?: string; } +interface RoomSessionTransportOptions { + requireCanManageAgentSession?: boolean; +} + export class RoomSessionTransport extends SessionTransport { private readonly room: Room; + private readonly roomIO?: RoomIO; + private readonly requireCanManageAgentSession: boolean; private handlerRegistered = false; private closed = false; private pendingMessages: IncomingMessage[] = []; private waitingResolve: ((value: IteratorResult) => void) | null = null; - constructor(room: Room, _roomIO?: RoomIO) { + constructor(room: Room, roomIO?: RoomIO, opts: RoomSessionTransportOptions = {}) { super(); this.room = room; + this.roomIO = roomIO; + this.requireCanManageAgentSession = opts.requireCanManageAgentSession ?? false; } override async start(): Promise { @@ -115,18 +123,30 @@ export class RoomSessionTransport extends SessionTransport { } private onByteStream = (reader: ByteStreamReader, participantInfo: { identity: string }) => { - if (!this.canManage(participantInfo.identity)) { + if (this.requireCanManageAgentSession && !this.canManage(participantInfo.identity)) { log().debug( { participant: participantInfo.identity }, 'ignoring session message from participant without canManageAgentSession grant', ); return; } + const remoteIdentity = this.getRemoteIdentity(); + if ( + !this.requireCanManageAgentSession && + remoteIdentity && + participantInfo.identity !== remoteIdentity + ) { + return; + } this.readStream(reader, participantInfo.identity).catch((e) => { log().warn({ error: e }, 'failed to read binary stream message'); }); }; + private getRemoteIdentity(): string | undefined { + return this.roomIO?.linkedParticipant?.identity; + } + private canManage(identity: string): boolean { return ( this.room.remoteParticipants.get(identity)?.info.permission?.canManageAgentSession === true @@ -171,20 +191,18 @@ export class RoomSessionTransport extends SessionTransport { ): Promise { if (this.closed || !this.room.isConnected) return; - const destinationIdentities = opts.destinationIdentity - ? this.canManage(opts.destinationIdentity) - ? [opts.destinationIdentity] - : [] - : this.authorizedIdentities(); - if (destinationIdentities.length === 0) return; + const destinationIdentities = this.getDestinationIdentities(opts.destinationIdentity); + if (destinationIdentities?.length === 0) return; try { const data = msg.toBinary(); const streamOpts: Record = { topic: TOPIC_SESSION_MESSAGES, name: shortuuid('AS_'), - destinationIdentities, }; + if (destinationIdentities) { + streamOpts.destinationIdentities = destinationIdentities; + } const writer = await this.room.localParticipant!.streamBytes(streamOpts); await writer.write(new Uint8Array(data)); await writer.close(); @@ -193,6 +211,18 @@ export class RoomSessionTransport extends SessionTransport { } } + private getDestinationIdentities(destinationIdentity?: string): string[] | undefined { + if (this.requireCanManageAgentSession) { + if (destinationIdentity) { + return this.canManage(destinationIdentity) ? [destinationIdentity] : []; + } + return this.authorizedIdentities(); + } + + const remoteIdentity = destinationIdentity ?? this.getRemoteIdentity(); + return remoteIdentity ? [remoteIdentity] : undefined; + } + override async close(): Promise { if (this.closed) return; this.closed = true; From d8376ab44f163bde397dfc35d6cc4db756e92e0c Mon Sep 17 00:00:00 2001 From: theomonnom Date: Mon, 11 May 2026 16:55:44 +0000 Subject: [PATCH 3/5] refactor(agents): simplify room session transport flag --- agents/src/voice/agent_session.ts | 4 +--- agents/src/voice/remote_session.ts | 8 ++------ 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 841b17121..81e4d95d8 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -448,9 +448,7 @@ export class AgentSession< this._roomIO.start(); - const transport = new RoomSessionTransport(room, this._roomIO, { - requireCanManageAgentSession: true, - }); + const transport = new RoomSessionTransport(room, this._roomIO, true); this.sessionHost = new SessionHost(transport); this.sessionHost.registerSession(this); if (inputOptions?.textEnabled !== false) { diff --git a/agents/src/voice/remote_session.ts b/agents/src/voice/remote_session.ts index 479b6b352..e1c954e4c 100644 --- a/agents/src/voice/remote_session.ts +++ b/agents/src/voice/remote_session.ts @@ -96,10 +96,6 @@ export interface IncomingMessage { senderIdentity?: string; } -interface RoomSessionTransportOptions { - requireCanManageAgentSession?: boolean; -} - export class RoomSessionTransport extends SessionTransport { private readonly room: Room; private readonly roomIO?: RoomIO; @@ -109,11 +105,11 @@ export class RoomSessionTransport extends SessionTransport { private pendingMessages: IncomingMessage[] = []; private waitingResolve: ((value: IteratorResult) => void) | null = null; - constructor(room: Room, roomIO?: RoomIO, opts: RoomSessionTransportOptions = {}) { + constructor(room: Room, roomIO?: RoomIO, requireCanManageAgentSession = false) { super(); this.room = room; this.roomIO = roomIO; - this.requireCanManageAgentSession = opts.requireCanManageAgentSession ?? false; + this.requireCanManageAgentSession = requireCanManageAgentSession; } override async start(): Promise { From 859f75db41ef1c4e748e755967eff2029b421459 Mon Sep 17 00:00:00 2001 From: theomonnom Date: Mon, 11 May 2026 16:59:07 +0000 Subject: [PATCH 4/5] refactor(agents): split remote session room transports --- agents/src/voice/agent_session.ts | 2 +- agents/src/voice/remote_session.ts | 69 +++++++++++++++++------------- 2 files changed, 40 insertions(+), 31 deletions(-) diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 81e4d95d8..4cdb22578 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -448,7 +448,7 @@ export class AgentSession< this._roomIO.start(); - const transport = new RoomSessionTransport(room, this._roomIO, true); + const transport = new RoomSessionTransport(room); this.sessionHost = new SessionHost(transport); this.sessionHost.registerSession(this); if (inputOptions?.textEnabled !== false) { diff --git a/agents/src/voice/remote_session.ts b/agents/src/voice/remote_session.ts index e1c954e4c..40f5e5379 100644 --- a/agents/src/voice/remote_session.ts +++ b/agents/src/voice/remote_session.ts @@ -98,18 +98,14 @@ export interface IncomingMessage { export class RoomSessionTransport extends SessionTransport { private readonly room: Room; - private readonly roomIO?: RoomIO; - private readonly requireCanManageAgentSession: boolean; private handlerRegistered = false; private closed = false; private pendingMessages: IncomingMessage[] = []; private waitingResolve: ((value: IteratorResult) => void) | null = null; - constructor(room: Room, roomIO?: RoomIO, requireCanManageAgentSession = false) { + constructor(room: Room, _roomIO?: RoomIO) { super(); this.room = room; - this.roomIO = roomIO; - this.requireCanManageAgentSession = requireCanManageAgentSession; } override async start(): Promise { @@ -119,19 +115,7 @@ export class RoomSessionTransport extends SessionTransport { } private onByteStream = (reader: ByteStreamReader, participantInfo: { identity: string }) => { - if (this.requireCanManageAgentSession && !this.canManage(participantInfo.identity)) { - log().debug( - { participant: participantInfo.identity }, - 'ignoring session message from participant without canManageAgentSession grant', - ); - return; - } - const remoteIdentity = this.getRemoteIdentity(); - if ( - !this.requireCanManageAgentSession && - remoteIdentity && - participantInfo.identity !== remoteIdentity - ) { + if (!this.shouldAcceptMessage(participantInfo.identity)) { return; } this.readStream(reader, participantInfo.identity).catch((e) => { @@ -139,8 +123,15 @@ export class RoomSessionTransport extends SessionTransport { }); }; - private getRemoteIdentity(): string | undefined { - return this.roomIO?.linkedParticipant?.identity; + protected shouldAcceptMessage(identity: string): boolean { + if (this.canManage(identity)) { + return true; + } + log().debug( + { participant: identity }, + 'ignoring session message from participant without canManageAgentSession grant', + ); + return false; } private canManage(identity: string): boolean { @@ -207,16 +198,11 @@ export class RoomSessionTransport extends SessionTransport { } } - private getDestinationIdentities(destinationIdentity?: string): string[] | undefined { - if (this.requireCanManageAgentSession) { - if (destinationIdentity) { - return this.canManage(destinationIdentity) ? [destinationIdentity] : []; - } - return this.authorizedIdentities(); + protected getDestinationIdentities(destinationIdentity?: string): string[] | undefined { + if (destinationIdentity) { + return this.canManage(destinationIdentity) ? [destinationIdentity] : []; } - - const remoteIdentity = destinationIdentity ?? this.getRemoteIdentity(); - return remoteIdentity ? [remoteIdentity] : undefined; + return this.authorizedIdentities(); } override async close(): Promise { @@ -283,6 +269,29 @@ export class RoomSessionTransport extends SessionTransport { } } +class LinkedParticipantSessionTransport extends RoomSessionTransport { + private readonly roomIO: RoomIO; + + constructor(room: Room, roomIO: RoomIO) { + super(room); + this.roomIO = roomIO; + } + + protected override shouldAcceptMessage(identity: string): boolean { + const remoteIdentity = this.getRemoteIdentity(); + return !remoteIdentity || identity === remoteIdentity; + } + + protected override getDestinationIdentities(destinationIdentity?: string): string[] | undefined { + const remoteIdentity = destinationIdentity ?? this.getRemoteIdentity(); + return remoteIdentity ? [remoteIdentity] : undefined; + } + + private getRemoteIdentity(): string | undefined { + return this.roomIO.linkedParticipant?.identity; + } +} + // =========================================================================== // Enum maps // =========================================================================== @@ -1024,7 +1033,7 @@ export class RemoteSession extends (EventEmitter as new () => TypedEventEmitter< } static fromRoom(room: Room, roomIO: RoomIO): RemoteSession { - const transport = new RoomSessionTransport(room, roomIO); + const transport = new LinkedParticipantSessionTransport(room, roomIO); return new RemoteSession(transport); } From ee6c5d444d49ae345861a974577c530de96a48a2 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Mon, 11 May 2026 11:16:49 -0700 Subject: [PATCH 5/5] Create blue-kids-cough.md --- .changeset/blue-kids-cough.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/blue-kids-cough.md diff --git a/.changeset/blue-kids-cough.md b/.changeset/blue-kids-cough.md new file mode 100644 index 000000000..8c6d4d01a --- /dev/null +++ b/.changeset/blue-kids-cough.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": patch +--- + +Require session management grant for remote sessions