From 1d8968c8a821ff1a05c294a1846b3bcb6f343794 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 22 Feb 2026 23:25:11 +0100 Subject: [PATCH] fix(voice-call): harden media stream pre-start websocket handling --- CHANGELOG.md | 1 + docs/plugins/voice-call.md | 9 + extensions/voice-call/README.md | 11 ++ extensions/voice-call/src/config.test.ts | 4 + extensions/voice-call/src/config.ts | 15 ++ .../voice-call/src/media-stream.test.ts | 175 ++++++++++++++++++ extensions/voice-call/src/media-stream.ts | 110 +++++++++++ extensions/voice-call/src/webhook.ts | 18 +- 8 files changed, 340 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ba6aef977db..5eaf10faf49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Security/Voice Call: harden media stream WebSocket handling against pre-auth idle-connection DoS by adding strict pre-start timeouts, pending/per-IP connection limits, and total connection caps for streaming endpoints. This ships in the next npm release. Thanks @jiseoung for reporting. - Telegram/Discord extensions: propagate trusted `mediaLocalRoots` through extension outbound `sendMedia` options so extension direct-send media paths honor agent-scoped local-media allowlists. (#20029, #21903, #23227) - Exec/Background: stop applying the default exec timeout to background sessions (`background: true` or explicit `yieldMs`) when no explicit timeout is set, so long-running background jobs are no longer terminated at the default timeout boundary. (#23303) - Plugins/Media sandbox: propagate trusted `mediaLocalRoots` through plugin action dispatch (including Discord/Telegram action adapters) so plugin send paths enforce the same agent-scoped local-media sandbox roots as core outbound sends. (#20258, #22718) diff --git a/docs/plugins/voice-call.md b/docs/plugins/voice-call.md index aba63555026..8637685bbe9 100644 --- a/docs/plugins/voice-call.md +++ b/docs/plugins/voice-call.md @@ -107,6 +107,10 @@ Set config under `plugins.entries.voice-call.config`: streaming: { enabled: true, streamPath: "/voice/stream", + preStartTimeoutMs: 5000, + maxPendingConnections: 32, + maxPendingConnectionsPerIp: 4, + maxConnections: 128, }, }, }, @@ -125,6 +129,11 @@ Notes: - If you use ngrok free tier, set `publicUrl` to the exact ngrok URL; signature verification is always enforced. - `tunnel.allowNgrokFreeTierLoopbackBypass: true` allows Twilio webhooks with invalid signatures **only** when `tunnel.provider="ngrok"` and `serve.bind` is loopback (ngrok local agent). Use for local dev only. - Ngrok free tier URLs can change or add interstitial behavior; if `publicUrl` drifts, Twilio signatures will fail. For production, prefer a stable domain or Tailscale funnel. +- Streaming security defaults: + - `streaming.preStartTimeoutMs` closes sockets that never send a valid `start` frame. + - `streaming.maxPendingConnections` caps total unauthenticated pre-start sockets. + - `streaming.maxPendingConnectionsPerIp` caps unauthenticated pre-start sockets per source IP. + - `streaming.maxConnections` caps total open media stream sockets (pending + active). ## Stale call reaper diff --git a/extensions/voice-call/README.md b/extensions/voice-call/README.md index 88328b6a339..f278c22cb74 100644 --- a/extensions/voice-call/README.md +++ b/extensions/voice-call/README.md @@ -76,6 +76,10 @@ Put under `plugins.entries.voice-call.config`: streaming: { enabled: true, streamPath: "/voice/stream", + preStartTimeoutMs: 5000, + maxPendingConnections: 32, + maxPendingConnectionsPerIp: 4, + maxConnections: 128, }, } ``` @@ -87,6 +91,13 @@ Notes: - Telnyx requires `telnyx.publicKey` (or `TELNYX_PUBLIC_KEY`) unless `skipSignatureVerification` is true. - `tunnel.allowNgrokFreeTierLoopbackBypass: true` allows Twilio webhooks with invalid signatures **only** when `tunnel.provider="ngrok"` and `serve.bind` is loopback (ngrok local agent). Use for local dev only. +Streaming security defaults: + +- `streaming.preStartTimeoutMs` closes sockets that never send a valid `start` frame. +- `streaming.maxPendingConnections` caps total unauthenticated pre-start sockets. +- `streaming.maxPendingConnectionsPerIp` caps unauthenticated pre-start sockets per source IP. +- `streaming.maxConnections` caps total open media stream sockets (pending + active). + ## Stale call reaper Use `staleCallReaperSeconds` to end calls that never receive a terminal webhook diff --git a/extensions/voice-call/src/config.test.ts b/extensions/voice-call/src/config.test.ts index 893e7868d47..ba1889edb4f 100644 --- a/extensions/voice-call/src/config.test.ts +++ b/extensions/voice-call/src/config.test.ts @@ -30,6 +30,10 @@ function createBaseConfig(provider: "telnyx" | "twilio" | "plivo" | "mock"): Voi silenceDurationMs: 800, vadThreshold: 0.5, streamPath: "/voice/stream", + preStartTimeoutMs: 5000, + maxPendingConnections: 32, + maxPendingConnectionsPerIp: 4, + maxConnections: 128, }, skipSignatureVerification: false, stt: { provider: "openai", model: "whisper-1" }, diff --git a/extensions/voice-call/src/config.ts b/extensions/voice-call/src/config.ts index 68b197c09bb..36b77778e9f 100644 --- a/extensions/voice-call/src/config.ts +++ b/extensions/voice-call/src/config.ts @@ -219,6 +219,17 @@ export const VoiceCallStreamingConfigSchema = z vadThreshold: z.number().min(0).max(1).default(0.5), /** WebSocket path for media stream connections */ streamPath: z.string().min(1).default("/voice/stream"), + /** + * Close unauthenticated media stream sockets if no valid `start` frame arrives in time. + * Protects against pre-auth idle connection hold attacks. + */ + preStartTimeoutMs: z.number().int().positive().default(5000), + /** Maximum number of concurrently pending (pre-start) media stream sockets. */ + maxPendingConnections: z.number().int().positive().default(32), + /** Maximum pending media stream sockets per source IP. */ + maxPendingConnectionsPerIp: z.number().int().positive().default(4), + /** Hard cap for all open media stream sockets (pending + active). */ + maxConnections: z.number().int().positive().default(128), }) .strict() .default({ @@ -228,6 +239,10 @@ export const VoiceCallStreamingConfigSchema = z silenceDurationMs: 800, vadThreshold: 0.5, streamPath: "/voice/stream", + preStartTimeoutMs: 5000, + maxPendingConnections: 32, + maxPendingConnectionsPerIp: 4, + maxConnections: 128, }); export type VoiceCallStreamingConfig = z.infer; diff --git a/extensions/voice-call/src/media-stream.test.ts b/extensions/voice-call/src/media-stream.test.ts index ac2c5e53733..ecd4727318c 100644 --- a/extensions/voice-call/src/media-stream.test.ts +++ b/extensions/voice-call/src/media-stream.test.ts @@ -1,4 +1,7 @@ +import { once } from "node:events"; +import http from "node:http"; import { describe, expect, it } from "vitest"; +import { WebSocket } from "ws"; import { MediaStreamHandler } from "./media-stream.js"; import type { OpenAIRealtimeSTTProvider, @@ -34,6 +37,70 @@ const waitForAbort = (signal: AbortSignal): Promise => signal.addEventListener("abort", () => resolve(), { once: true }); }); +const withTimeout = async (promise: Promise, timeoutMs = 2000): Promise => { + let timer: ReturnType | null = null; + const timeout = new Promise((_, reject) => { + timer = setTimeout(() => reject(new Error(`Timed out after ${timeoutMs}ms`)), timeoutMs); + }); + + try { + return await Promise.race([promise, timeout]); + } finally { + if (timer) { + clearTimeout(timer); + } + } +}; + +const startWsServer = async ( + handler: MediaStreamHandler, +): Promise<{ + url: string; + close: () => Promise; +}> => { + const server = http.createServer(); + server.on("upgrade", (request, socket, head) => { + handler.handleUpgrade(request, socket, head); + }); + + await new Promise((resolve) => { + server.listen(0, "127.0.0.1", resolve); + }); + + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("Failed to resolve test server address"); + } + + return { + url: `ws://127.0.0.1:${address.port}/voice/stream`, + close: async () => { + await new Promise((resolve, reject) => { + server.close((err) => (err ? reject(err) : resolve())); + }); + }, + }; +}; + +const connectWs = async (url: string): Promise => { + const ws = new WebSocket(url); + await withTimeout(once(ws, "open") as Promise<[unknown]>); + return ws; +}; + +const waitForClose = async ( + ws: WebSocket, +): Promise<{ + code: number; + reason: string; +}> => { + const [code, reason] = (await withTimeout(once(ws, "close") as Promise<[number, Buffer]>)) ?? []; + return { + code, + reason: Buffer.isBuffer(reason) ? reason.toString() : String(reason || ""), + }; +}; + describe("MediaStreamHandler TTS queue", () => { it("serializes TTS playback and resolves in order", async () => { const handler = new MediaStreamHandler({ @@ -94,3 +161,111 @@ describe("MediaStreamHandler TTS queue", () => { expect(queuedRan).toBe(false); }); }); + +describe("MediaStreamHandler security hardening", () => { + it("closes idle pre-start connections after timeout", async () => { + const shouldAcceptStreamCalls: Array<{ callId: string; streamSid: string; token?: string }> = + []; + const handler = new MediaStreamHandler({ + sttProvider: createStubSttProvider(), + preStartTimeoutMs: 40, + shouldAcceptStream: (params) => { + shouldAcceptStreamCalls.push(params); + return true; + }, + }); + const server = await startWsServer(handler); + + try { + const ws = await connectWs(server.url); + const closed = await waitForClose(ws); + + expect(closed.code).toBe(1008); + expect(closed.reason).toBe("Start timeout"); + expect(shouldAcceptStreamCalls).toEqual([]); + } finally { + await server.close(); + } + }); + + it("enforces pending connection limits", async () => { + const handler = new MediaStreamHandler({ + sttProvider: createStubSttProvider(), + preStartTimeoutMs: 5_000, + maxPendingConnections: 1, + maxPendingConnectionsPerIp: 1, + }); + const server = await startWsServer(handler); + + try { + const first = await connectWs(server.url); + const second = await connectWs(server.url); + const secondClosed = await waitForClose(second); + + expect(secondClosed.code).toBe(1013); + expect(secondClosed.reason).toContain("Too many pending"); + expect(first.readyState).toBe(WebSocket.OPEN); + + first.close(); + await waitForClose(first); + } finally { + await server.close(); + } + }); + + it("rejects upgrades when max connection cap is reached", async () => { + const handler = new MediaStreamHandler({ + sttProvider: createStubSttProvider(), + preStartTimeoutMs: 5_000, + maxConnections: 1, + maxPendingConnections: 10, + maxPendingConnectionsPerIp: 10, + }); + const server = await startWsServer(handler); + + try { + const first = await connectWs(server.url); + const secondError = await withTimeout( + new Promise((resolve) => { + const ws = new WebSocket(server.url); + ws.once("error", (err) => resolve(err as Error)); + }), + ); + + expect(secondError.message).toContain("Unexpected server response: 503"); + + first.close(); + await waitForClose(first); + } finally { + await server.close(); + } + }); + + it("clears pending state after valid start", async () => { + const handler = new MediaStreamHandler({ + sttProvider: createStubSttProvider(), + preStartTimeoutMs: 40, + shouldAcceptStream: () => true, + }); + const server = await startWsServer(handler); + + try { + const ws = await connectWs(server.url); + ws.send( + JSON.stringify({ + event: "start", + streamSid: "MZ123", + start: { callSid: "CA123", customParameters: { token: "token-123" } }, + }), + ); + + await new Promise((resolve) => setTimeout(resolve, 80)); + expect(ws.readyState).toBe(WebSocket.OPEN); + + ws.close(); + await waitForClose(ws); + } finally { + await server.close(); + } + }); +}); diff --git a/extensions/voice-call/src/media-stream.ts b/extensions/voice-call/src/media-stream.ts index ebb0ed9d844..11fa0109c12 100644 --- a/extensions/voice-call/src/media-stream.ts +++ b/extensions/voice-call/src/media-stream.ts @@ -21,6 +21,14 @@ import type { export interface MediaStreamConfig { /** STT provider for transcription */ sttProvider: OpenAIRealtimeSTTProvider; + /** Close sockets that never send a valid `start` frame within this window. */ + preStartTimeoutMs?: number; + /** Max concurrent pre-start sockets. */ + maxPendingConnections?: number; + /** Max concurrent pre-start sockets from a single source IP. */ + maxPendingConnectionsPerIp?: number; + /** Max total open sockets (pending + active sessions). */ + maxConnections?: number; /** Validate whether to accept a media stream for the given call ID */ shouldAcceptStream?: (params: { callId: string; streamSid: string; token?: string }) => boolean; /** Callback when transcript is received */ @@ -52,6 +60,16 @@ type TtsQueueEntry = { reject: (error: unknown) => void; }; +type PendingConnection = { + ip: string; + timeout: ReturnType; +}; + +const DEFAULT_PRE_START_TIMEOUT_MS = 5000; +const DEFAULT_MAX_PENDING_CONNECTIONS = 32; +const DEFAULT_MAX_PENDING_CONNECTIONS_PER_IP = 4; +const DEFAULT_MAX_CONNECTIONS = 128; + /** * Manages WebSocket connections for Twilio media streams. */ @@ -59,6 +77,14 @@ export class MediaStreamHandler { private wss: WebSocketServer | null = null; private sessions = new Map(); private config: MediaStreamConfig; + /** Pending sockets that have upgraded but not yet sent an accepted `start` frame. */ + private pendingConnections = new Map(); + /** Pending socket count per remote IP for pre-auth throttling. */ + private pendingByIp = new Map(); + private preStartTimeoutMs: number; + private maxPendingConnections: number; + private maxPendingConnectionsPerIp: number; + private maxConnections: number; /** TTS playback queues per stream (serialize audio to prevent overlap) */ private ttsQueues = new Map(); /** Whether TTS is currently playing per stream */ @@ -68,6 +94,11 @@ export class MediaStreamHandler { constructor(config: MediaStreamConfig) { this.config = config; + this.preStartTimeoutMs = config.preStartTimeoutMs ?? DEFAULT_PRE_START_TIMEOUT_MS; + this.maxPendingConnections = config.maxPendingConnections ?? DEFAULT_MAX_PENDING_CONNECTIONS; + this.maxPendingConnectionsPerIp = + config.maxPendingConnectionsPerIp ?? DEFAULT_MAX_PENDING_CONNECTIONS_PER_IP; + this.maxConnections = config.maxConnections ?? DEFAULT_MAX_CONNECTIONS; } /** @@ -79,6 +110,12 @@ export class MediaStreamHandler { this.wss.on("connection", (ws, req) => this.handleConnection(ws, req)); } + const currentConnections = this.wss.clients.size; + if (currentConnections >= this.maxConnections) { + this.rejectUpgrade(socket, 503, "Too many media stream connections"); + return; + } + this.wss.handleUpgrade(request, socket, head, (ws) => { this.wss?.emit("connection", ws, request); }); @@ -90,6 +127,12 @@ export class MediaStreamHandler { private async handleConnection(ws: WebSocket, _request: IncomingMessage): Promise { let session: StreamSession | null = null; const streamToken = this.getStreamToken(_request); + const ip = this.getClientIp(_request); + + if (!this.registerPendingConnection(ws, ip)) { + ws.close(1013, "Too many pending media stream connections"); + return; + } ws.on("message", async (data: Buffer) => { try { @@ -102,6 +145,9 @@ export class MediaStreamHandler { case "start": session = await this.handleStart(ws, message, streamToken); + if (session) { + this.clearPendingConnection(ws); + } break; case "media": @@ -125,6 +171,7 @@ export class MediaStreamHandler { }); ws.on("close", () => { + this.clearPendingConnection(ws); if (session) { this.handleStop(session); } @@ -226,6 +273,69 @@ export class MediaStreamHandler { } } + private getClientIp(request: IncomingMessage): string { + return request.socket.remoteAddress || "unknown"; + } + + private registerPendingConnection(ws: WebSocket, ip: string): boolean { + if (this.pendingConnections.size >= this.maxPendingConnections) { + console.warn("[MediaStream] Rejecting connection: pending connection limit reached"); + return false; + } + + const pendingForIp = this.pendingByIp.get(ip) ?? 0; + if (pendingForIp >= this.maxPendingConnectionsPerIp) { + console.warn(`[MediaStream] Rejecting connection: pending per-IP limit reached (${ip})`); + return false; + } + + const timeout = setTimeout(() => { + if (!this.pendingConnections.has(ws)) { + return; + } + console.warn( + `[MediaStream] Closing pre-start idle connection after ${this.preStartTimeoutMs}ms (${ip})`, + ); + ws.close(1008, "Start timeout"); + }, this.preStartTimeoutMs); + + timeout.unref?.(); + this.pendingConnections.set(ws, { ip, timeout }); + this.pendingByIp.set(ip, pendingForIp + 1); + return true; + } + + private clearPendingConnection(ws: WebSocket): void { + const pending = this.pendingConnections.get(ws); + if (!pending) { + return; + } + + clearTimeout(pending.timeout); + this.pendingConnections.delete(ws); + + const current = this.pendingByIp.get(pending.ip) ?? 0; + if (current <= 1) { + this.pendingByIp.delete(pending.ip); + return; + } + this.pendingByIp.set(pending.ip, current - 1); + } + + private rejectUpgrade(socket: Duplex, statusCode: 429 | 503, message: string): void { + const statusText = statusCode === 429 ? "Too Many Requests" : "Service Unavailable"; + const body = `${message}\n`; + socket.write( + `HTTP/1.1 ${statusCode} ${statusText}\r\n` + + "Connection: close\r\n" + + "Content-Type: text/plain; charset=utf-8\r\n" + + `Content-Length: ${Buffer.byteLength(body)}\r\n` + + "\r\n" + + body, + ); + socket.destroy(); + } + /** * Get an active session with an open WebSocket, or undefined if unavailable. */ diff --git a/extensions/voice-call/src/webhook.ts b/extensions/voice-call/src/webhook.ts index f9e18a9dacf..ec052342285 100644 --- a/extensions/voice-call/src/webhook.ts +++ b/extensions/voice-call/src/webhook.ts @@ -77,6 +77,10 @@ export class VoiceCallWebhookServer { const streamConfig: MediaStreamConfig = { sttProvider, + preStartTimeoutMs: this.config.streaming?.preStartTimeoutMs, + maxPendingConnections: this.config.streaming?.maxPendingConnections, + maxPendingConnectionsPerIp: this.config.streaming?.maxPendingConnectionsPerIp, + maxConnections: this.config.streaming?.maxConnections, shouldAcceptStream: ({ callId, token }) => { const call = this.manager.getCallByProviderCallId(callId); if (!call) { @@ -192,9 +196,8 @@ export class VoiceCallWebhookServer { // Handle WebSocket upgrades for media streams if (this.mediaStreamHandler) { this.server.on("upgrade", (request, socket, head) => { - const url = new URL(request.url || "/", `http://${request.headers.host}`); - - if (url.pathname === streamPath) { + const path = this.getUpgradePathname(request); + if (path === streamPath) { console.log("[voice-call] WebSocket upgrade for media stream"); this.mediaStreamHandler?.handleUpgrade(request, socket, head); } else { @@ -269,6 +272,15 @@ export class VoiceCallWebhookServer { }); } + private getUpgradePathname(request: http.IncomingMessage): string | null { + try { + const host = request.headers.host || "localhost"; + return new URL(request.url || "/", `http://${host}`).pathname; + } catch { + return null; + } + } + /** * Handle incoming HTTP request. */