diff --git a/src/gateway/server-methods/talk-session.ts b/src/gateway/server-methods/talk-session.ts index b67206d00a2..49a4591dfa7 100644 --- a/src/gateway/server-methods/talk-session.ts +++ b/src/gateway/server-methods/talk-session.ts @@ -24,6 +24,7 @@ import { cancelTalkHandoffTurn, createTalkHandoff, endTalkHandoffTurn, + getTalkHandoff, joinTalkHandoff, revokeTalkHandoff, startTalkHandoffTurn, @@ -89,6 +90,32 @@ function normalizeTalkSessionBrain(params: { mode: TalkMode; brain?: string }): return params.mode === "transcription" ? "none" : "agent-consult"; } +function isActiveManagedRoomClient( + session: { handoffId: string }, + connId: string | undefined, +): boolean { + if (!connId) { + return false; + } + const handoff = getTalkHandoff(session.handoffId); + return handoff?.room.activeClientId === connId; +} + +function canCloseManagedRoomSession( + session: { handoffId: string }, + connId: string | undefined, +): boolean { + const handoff = getTalkHandoff(session.handoffId); + return !handoff?.room.activeClientId || handoff.room.activeClientId === connId; +} + +function managedRoomOwnershipError(action: string) { + return errorShape( + ErrorCodes.INVALID_REQUEST, + `talk.session.${action} requires the active managed-room connection`, + ); +} + export const talkSessionHandlers: GatewayRequestHandlers = { "talk.session.create": async ({ params, respond, context, client }) => { if (!validateTalkSessionCreateParams(params)) { @@ -413,6 +440,10 @@ export const talkSessionHandlers: GatewayRequestHandlers = { ); return; } + if (!isActiveManagedRoomClient(session, client?.connId)) { + respond(false, undefined, managedRoomOwnershipError("startTurn")); + return; + } const result = startTalkHandoffTurn(session.handoffId, session.token, { turnId: params.turnId, clientId: client?.connId, @@ -438,7 +469,7 @@ export const talkSessionHandlers: GatewayRequestHandlers = { respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err))); } }, - "talk.session.endTurn": async ({ params, respond, context }) => { + "talk.session.endTurn": async ({ params, respond, client, context }) => { if (!validateTalkSessionTurnParams(params)) { respond( false, @@ -460,6 +491,10 @@ export const talkSessionHandlers: GatewayRequestHandlers = { ); return; } + if (!isActiveManagedRoomClient(session, client?.connId)) { + respond(false, undefined, managedRoomOwnershipError("endTurn")); + return; + } const result = endTalkHandoffTurn(session.handoffId, session.token, { turnId: params.turnId, }); @@ -515,6 +550,10 @@ export const talkSessionHandlers: GatewayRequestHandlers = { respond(true, { ok: true }, undefined); return; } + if (!isActiveManagedRoomClient(session, client?.connId)) { + respond(false, undefined, managedRoomOwnershipError("cancelTurn")); + return; + } const result = cancelTalkHandoffTurn(session.handoffId, session.token, { turnId: params.turnId, reason: params.reason, @@ -613,7 +652,7 @@ export const talkSessionHandlers: GatewayRequestHandlers = { respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err))); } }, - "talk.session.close": async ({ params, respond, client }) => { + "talk.session.close": async ({ params, respond, client, context }) => { if (!validateTalkSessionCloseParams(params)) { respond( false, @@ -637,7 +676,16 @@ export const talkSessionHandlers: GatewayRequestHandlers = { connId, }); } else { - revokeTalkHandoff(session.handoffId); + if (!canCloseManagedRoomSession(session, client?.connId)) { + respond(false, undefined, managedRoomOwnershipError("close")); + return; + } + const result = revokeTalkHandoff(session.handoffId); + broadcastTalkRoomEvents(context, result.activeClientId, { + handoffId: session.handoffId, + roomId: session.roomId, + events: result.events, + }); } forgetUnifiedTalkSession(params.sessionId); respond(true, { ok: true }, undefined); diff --git a/src/gateway/server-methods/talk.test.ts b/src/gateway/server-methods/talk.test.ts index 6d6e8be65b5..fa0d7510ad2 100644 --- a/src/gateway/server-methods/talk.test.ts +++ b/src/gateway/server-methods/talk.test.ts @@ -718,9 +718,40 @@ describe("talk.session unified handlers", () => { }, }); + const joinRespond = vi.fn(); + await talkHandlers["talk.session.join"]({ + req: { type: "req", id: "2", method: "talk.session.join" }, + params: { sessionId: session.sessionId, token: session.token }, + client: { connId: "conn-1" } as never, + isWebchatConnect: () => false, + respond: joinRespond as never, + context: { + broadcastToConnIds, + } as never, + }); + expect(joinRespond).toHaveBeenCalledWith( + true, + expect.objectContaining({ + id: session.sessionId, + room: expect.objectContaining({ + activeClientId: "conn-1", + }), + }), + undefined, + ); + expect(broadcastToConnIds).toHaveBeenCalledWith( + "talk.event", + expect.objectContaining({ + handoffId: session.sessionId, + talkEvent: expect.objectContaining({ type: "session.ready" }), + }), + new Set(["conn-1"]), + { dropIfSlow: true }, + ); + const startRespond = vi.fn(); await talkHandlers["talk.session.startTurn"]({ - req: { type: "req", id: "2", method: "talk.session.startTurn" }, + req: { type: "req", id: "3", method: "talk.session.startTurn" }, params: { sessionId: session.sessionId, turnId: "turn-1" }, client: { connId: "conn-1" } as never, isWebchatConnect: () => false, @@ -752,14 +783,162 @@ describe("talk.session unified handlers", () => { const closeRespond = vi.fn(); await talkHandlers["talk.session.close"]({ - req: { type: "req", id: "3", method: "talk.session.close" }, + req: { type: "req", id: "4", method: "talk.session.close" }, params: { sessionId: session.sessionId }, client: { connId: "conn-1" } as never, isWebchatConnect: () => false, respond: closeRespond as never, - context: {} as never, + context: { + broadcastToConnIds, + } as never, }); expect(closeRespond).toHaveBeenCalledWith(true, { ok: true }, undefined); + expect(broadcastToConnIds).toHaveBeenCalledWith( + "talk.event", + expect.objectContaining({ + handoffId: session.sessionId, + talkEvent: expect.objectContaining({ type: "session.closed", final: true }), + }), + new Set(["conn-1"]), + { dropIfSlow: true }, + ); + }); + + it("requires managed-room ownership before turn control", async () => { + const broadcastToConnIds = vi.fn(); + const createRespond = vi.fn(); + await talkHandlers["talk.session.create"]({ + req: { type: "req", id: "1", method: "talk.session.create" }, + params: { + mode: "stt-tts", + transport: "managed-room", + sessionKey: "session:main", + }, + client: { connId: "creator" } as never, + isWebchatConnect: () => false, + respond: createRespond as never, + context: { + getRuntimeConfig: () => ({}) as OpenClawConfig, + } as never, + }); + const session = createRespond.mock.calls[0]?.[1] as { sessionId: string; token: string }; + + const unjoinedStartRespond = vi.fn(); + await talkHandlers["talk.session.startTurn"]({ + req: { type: "req", id: "2", method: "talk.session.startTurn" }, + params: { sessionId: session.sessionId, turnId: "turn-1" }, + client: { connId: "creator" } as never, + isWebchatConnect: () => false, + respond: unjoinedStartRespond as never, + context: { broadcastToConnIds } as never, + }); + expect(unjoinedStartRespond).toHaveBeenCalledWith( + false, + undefined, + expect.objectContaining({ + code: ErrorCodes.INVALID_REQUEST, + message: "talk.session.startTurn requires the active managed-room connection", + }), + ); + + await talkHandlers["talk.session.join"]({ + req: { type: "req", id: "3", method: "talk.session.join" }, + params: { sessionId: session.sessionId, token: session.token }, + client: { connId: "conn-1" } as never, + isWebchatConnect: () => false, + respond: vi.fn() as never, + context: { broadcastToConnIds } as never, + }); + + const staleStartRespond = vi.fn(); + await talkHandlers["talk.session.startTurn"]({ + req: { type: "req", id: "4", method: "talk.session.startTurn" }, + params: { sessionId: session.sessionId, turnId: "turn-1" }, + client: { connId: "conn-2" } as never, + isWebchatConnect: () => false, + respond: staleStartRespond as never, + context: { broadcastToConnIds } as never, + }); + expect(staleStartRespond).toHaveBeenCalledWith( + false, + undefined, + expect.objectContaining({ + code: ErrorCodes.INVALID_REQUEST, + message: "talk.session.startTurn requires the active managed-room connection", + }), + ); + + await talkHandlers["talk.session.startTurn"]({ + req: { type: "req", id: "5", method: "talk.session.startTurn" }, + params: { sessionId: session.sessionId, turnId: "turn-1" }, + client: { connId: "conn-1" } as never, + isWebchatConnect: () => false, + respond: vi.fn() as never, + context: { broadcastToConnIds } as never, + }); + + const staleEndRespond = vi.fn(); + await talkHandlers["talk.session.endTurn"]({ + req: { type: "req", id: "6", method: "talk.session.endTurn" }, + params: { sessionId: session.sessionId, turnId: "turn-1" }, + client: { connId: "conn-2" } as never, + isWebchatConnect: () => false, + respond: staleEndRespond as never, + context: { broadcastToConnIds } as never, + }); + expect(staleEndRespond).toHaveBeenCalledWith( + false, + undefined, + expect.objectContaining({ + code: ErrorCodes.INVALID_REQUEST, + message: "talk.session.endTurn requires the active managed-room connection", + }), + ); + + const staleCancelRespond = vi.fn(); + await talkHandlers["talk.session.cancelTurn"]({ + req: { type: "req", id: "7", method: "talk.session.cancelTurn" }, + params: { sessionId: session.sessionId, turnId: "turn-1" }, + client: { connId: "conn-2" } as never, + isWebchatConnect: () => false, + respond: staleCancelRespond as never, + context: { broadcastToConnIds } as never, + }); + expect(staleCancelRespond).toHaveBeenCalledWith( + false, + undefined, + expect.objectContaining({ + code: ErrorCodes.INVALID_REQUEST, + message: "talk.session.cancelTurn requires the active managed-room connection", + }), + ); + + const staleCloseRespond = vi.fn(); + await talkHandlers["talk.session.close"]({ + req: { type: "req", id: "8", method: "talk.session.close" }, + params: { sessionId: session.sessionId }, + client: { connId: "conn-2" } as never, + isWebchatConnect: () => false, + respond: staleCloseRespond as never, + context: { broadcastToConnIds } as never, + }); + expect(staleCloseRespond).toHaveBeenCalledWith( + false, + undefined, + expect.objectContaining({ + code: ErrorCodes.INVALID_REQUEST, + message: "talk.session.close requires the active managed-room connection", + }), + ); + + await talkHandlers["talk.session.close"]({ + req: { type: "req", id: "9", method: "talk.session.close" }, + params: { sessionId: session.sessionId }, + client: { connId: "conn-1" } as never, + isWebchatConnect: () => false, + respond: vi.fn() as never, + context: { broadcastToConnIds } as never, + }); }); it("keeps direct-tools managed-room sessions behind admin scope", async () => {