mirror of
https://github.com/moltbot/moltbot.git
synced 2026-03-07 22:44:16 +00:00
fix(voice-call): harden media stream pre-start websocket handling
This commit is contained in:
@@ -32,6 +32,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Security/Voice Call: harden media stream WebSocket handling against pre-auth idle-connection DoS by adding strict pre-start timeouts, pending/per-IP connection limits, and total connection caps for streaming endpoints. This ships in the next npm release. Thanks @jiseoung for reporting.
|
||||
- Telegram/Discord extensions: propagate trusted `mediaLocalRoots` through extension outbound `sendMedia` options so extension direct-send media paths honor agent-scoped local-media allowlists. (#20029, #21903, #23227)
|
||||
- Exec/Background: stop applying the default exec timeout to background sessions (`background: true` or explicit `yieldMs`) when no explicit timeout is set, so long-running background jobs are no longer terminated at the default timeout boundary. (#23303)
|
||||
- Plugins/Media sandbox: propagate trusted `mediaLocalRoots` through plugin action dispatch (including Discord/Telegram action adapters) so plugin send paths enforce the same agent-scoped local-media sandbox roots as core outbound sends. (#20258, #22718)
|
||||
|
||||
@@ -107,6 +107,10 @@ Set config under `plugins.entries.voice-call.config`:
|
||||
streaming: {
|
||||
enabled: true,
|
||||
streamPath: "/voice/stream",
|
||||
preStartTimeoutMs: 5000,
|
||||
maxPendingConnections: 32,
|
||||
maxPendingConnectionsPerIp: 4,
|
||||
maxConnections: 128,
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -125,6 +129,11 @@ Notes:
|
||||
- If you use ngrok free tier, set `publicUrl` to the exact ngrok URL; signature verification is always enforced.
|
||||
- `tunnel.allowNgrokFreeTierLoopbackBypass: true` allows Twilio webhooks with invalid signatures **only** when `tunnel.provider="ngrok"` and `serve.bind` is loopback (ngrok local agent). Use for local dev only.
|
||||
- Ngrok free tier URLs can change or add interstitial behavior; if `publicUrl` drifts, Twilio signatures will fail. For production, prefer a stable domain or Tailscale funnel.
|
||||
- Streaming security defaults:
|
||||
- `streaming.preStartTimeoutMs` (default `5000`) closes sockets that never send a valid `start` frame within that many milliseconds.
|
||||
- `streaming.maxPendingConnections` (default `32`) caps total unauthenticated pre-start sockets.
|
||||
- `streaming.maxPendingConnectionsPerIp` (default `4`) caps unauthenticated pre-start sockets per source IP.
|
||||
- `streaming.maxConnections` (default `128`) caps total open media stream sockets (pending + active).
|
||||
|
||||
## Stale call reaper
|
||||
|
||||
|
||||
@@ -76,6 +76,10 @@ Put under `plugins.entries.voice-call.config`:
|
||||
streaming: {
|
||||
enabled: true,
|
||||
streamPath: "/voice/stream",
|
||||
preStartTimeoutMs: 5000,
|
||||
maxPendingConnections: 32,
|
||||
maxPendingConnectionsPerIp: 4,
|
||||
maxConnections: 128,
|
||||
},
|
||||
}
|
||||
```
|
||||
@@ -87,6 +91,13 @@ Notes:
|
||||
- Telnyx requires `telnyx.publicKey` (or `TELNYX_PUBLIC_KEY`) unless `skipSignatureVerification` is true.
|
||||
- `tunnel.allowNgrokFreeTierLoopbackBypass: true` allows Twilio webhooks with invalid signatures **only** when `tunnel.provider="ngrok"` and `serve.bind` is loopback (ngrok local agent). Use for local dev only.
|
||||
|
||||
Streaming security defaults:
|
||||
|
||||
- `streaming.preStartTimeoutMs` (default `5000`) closes sockets that never send a valid `start` frame within that many milliseconds.
|
||||
- `streaming.maxPendingConnections` (default `32`) caps total unauthenticated pre-start sockets.
|
||||
- `streaming.maxPendingConnectionsPerIp` (default `4`) caps unauthenticated pre-start sockets per source IP.
|
||||
- `streaming.maxConnections` (default `128`) caps total open media stream sockets (pending + active).
|
||||
|
||||
## Stale call reaper
|
||||
|
||||
Use `staleCallReaperSeconds` to end calls that never receive a terminal webhook
|
||||
|
||||
@@ -30,6 +30,10 @@ function createBaseConfig(provider: "telnyx" | "twilio" | "plivo" | "mock"): Voi
|
||||
silenceDurationMs: 800,
|
||||
vadThreshold: 0.5,
|
||||
streamPath: "/voice/stream",
|
||||
preStartTimeoutMs: 5000,
|
||||
maxPendingConnections: 32,
|
||||
maxPendingConnectionsPerIp: 4,
|
||||
maxConnections: 128,
|
||||
},
|
||||
skipSignatureVerification: false,
|
||||
stt: { provider: "openai", model: "whisper-1" },
|
||||
|
||||
@@ -219,6 +219,17 @@ export const VoiceCallStreamingConfigSchema = z
|
||||
vadThreshold: z.number().min(0).max(1).default(0.5),
|
||||
/** WebSocket path for media stream connections */
|
||||
streamPath: z.string().min(1).default("/voice/stream"),
|
||||
/**
|
||||
* Close unauthenticated media stream sockets if no valid `start` frame arrives in time.
|
||||
* Protects against pre-auth idle connection hold attacks.
|
||||
*/
|
||||
preStartTimeoutMs: z.number().int().positive().default(5000),
|
||||
/** Maximum number of concurrently pending (pre-start) media stream sockets. */
|
||||
maxPendingConnections: z.number().int().positive().default(32),
|
||||
/** Maximum pending media stream sockets per source IP. */
|
||||
maxPendingConnectionsPerIp: z.number().int().positive().default(4),
|
||||
/** Hard cap for all open media stream sockets (pending + active). */
|
||||
maxConnections: z.number().int().positive().default(128),
|
||||
})
|
||||
.strict()
|
||||
.default({
|
||||
@@ -228,6 +239,10 @@ export const VoiceCallStreamingConfigSchema = z
|
||||
silenceDurationMs: 800,
|
||||
vadThreshold: 0.5,
|
||||
streamPath: "/voice/stream",
|
||||
preStartTimeoutMs: 5000,
|
||||
maxPendingConnections: 32,
|
||||
maxPendingConnectionsPerIp: 4,
|
||||
maxConnections: 128,
|
||||
});
|
||||
export type VoiceCallStreamingConfig = z.infer<typeof VoiceCallStreamingConfigSchema>;
|
||||
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
import { once } from "node:events";
|
||||
import http from "node:http";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { WebSocket } from "ws";
|
||||
import { MediaStreamHandler } from "./media-stream.js";
|
||||
import type {
|
||||
OpenAIRealtimeSTTProvider,
|
||||
@@ -34,6 +37,70 @@ const waitForAbort = (signal: AbortSignal): Promise<void> =>
|
||||
signal.addEventListener("abort", () => resolve(), { once: true });
|
||||
});
|
||||
|
||||
/**
 * Race `promise` against a deadline so a hung socket cannot stall a test forever.
 *
 * @param promise - The operation to wait for.
 * @param timeoutMs - Deadline in milliseconds (defaults to 2000).
 * @returns Whatever `promise` resolves to, if it settles before the deadline.
 * @throws Error with message `Timed out after <timeoutMs>ms` when the deadline wins;
 *   rejections of `promise` itself are propagated unchanged.
 */
const withTimeout = async <T>(promise: Promise<T>, timeoutMs = 2000): Promise<T> => {
  let deadline: ReturnType<typeof setTimeout> | null = null;

  const expiry = new Promise<never>((_resolve, reject) => {
    deadline = setTimeout(() => {
      reject(new Error(`Timed out after ${timeoutMs}ms`));
    }, timeoutMs);
  });

  try {
    const settled = await Promise.race([promise, expiry]);
    return settled;
  } finally {
    // Always release the timer so a fast result does not leave a live handle behind.
    if (deadline) {
      clearTimeout(deadline);
    }
  }
};
|
||||
|
||||
/**
 * Start a throwaway HTTP server on an ephemeral loopback port that forwards every
 * WebSocket upgrade request to `handler.handleUpgrade`.
 *
 * @param handler - Media stream handler under test.
 * @returns `url` is the `ws://` address clients should dial; `close` tears the server down.
 * @throws Error if the server fails to listen or its bound address cannot be resolved.
 */
const startWsServer = async (
  handler: MediaStreamHandler,
): Promise<{
  url: string;
  close: () => Promise<void>;
}> => {
  const server = http.createServer();

  // Track raw sockets: `server.close()` only stops accepting new connections and
  // waits for existing ones to end, so a client left open by a failed assertion
  // would otherwise hang teardown and hide the real failure behind a timeout.
  const openSockets = new Set<{ destroy(): unknown }>();
  server.on("connection", (socket) => {
    openSockets.add(socket);
    socket.once("close", () => openSockets.delete(socket));
  });

  server.on("upgrade", (request, socket, head) => {
    handler.handleUpgrade(request, socket, head);
  });

  await new Promise<void>((resolve, reject) => {
    // Surface listen failures instead of leaving the promise pending forever.
    server.once("error", reject);
    server.listen(0, "127.0.0.1", () => {
      server.off("error", reject);
      resolve();
    });
  });

  const address = server.address();
  if (!address || typeof address === "string") {
    server.close();
    throw new Error("Failed to resolve test server address");
  }

  return {
    url: `ws://127.0.0.1:${address.port}/voice/stream`,
    close: async () => {
      const closed = new Promise<void>((resolve, reject) => {
        server.close((err) => (err ? reject(err) : resolve()));
      });
      // Drop any connection still open so the close callback can fire.
      for (const socket of openSockets) {
        socket.destroy();
      }
      await closed;
    },
  };
};
|
||||
|
||||
/**
 * Open a WebSocket client to `url` and wait, bounded by `withTimeout`'s default
 * deadline, for its `open` event.
 *
 * @param url - `ws://` address to connect to.
 * @returns The connected client socket.
 */
const connectWs = async (url: string): Promise<WebSocket> => {
  const client = new WebSocket(url);
  const opened = once(client, "open") as Promise<[unknown]>;
  await withTimeout(opened);
  return client;
};
|
||||
|
||||
/**
 * Wait, bounded by `withTimeout`'s default deadline, for `ws` to emit `close`.
 *
 * @param ws - Client socket to observe.
 * @returns The close code and the close reason decoded to a string.
 */
const waitForClose = async (
  ws: WebSocket,
): Promise<{
  code: number;
  reason: string;
}> => {
  const closeEvent = once(ws, "close") as Promise<[number, Buffer]>;
  const [code, rawReason] = (await withTimeout(closeEvent)) ?? [];

  // The reason is cast as a Buffer above; fall back to string coercion otherwise.
  const reason = Buffer.isBuffer(rawReason) ? rawReason.toString() : String(rawReason || "");

  return { code, reason };
};
|
||||
|
||||
describe("MediaStreamHandler TTS queue", () => {
|
||||
it("serializes TTS playback and resolves in order", async () => {
|
||||
const handler = new MediaStreamHandler({
|
||||
@@ -94,3 +161,111 @@ describe("MediaStreamHandler TTS queue", () => {
|
||||
expect(queuedRan).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
// Exercises the pre-start hardening: idle timeout, pending limits, and the total connection cap.
describe("MediaStreamHandler security hardening", () => {
  it("closes idle pre-start connections after timeout", async () => {
    // Records every authorization attempt; an idle socket must never reach it.
    const acceptCalls: Array<{ callId: string; streamSid: string; token?: string }> = [];
    const handler = new MediaStreamHandler({
      sttProvider: createStubSttProvider(),
      preStartTimeoutMs: 40,
      shouldAcceptStream: (params) => {
        acceptCalls.push(params);
        return true;
      },
    });
    const server = await startWsServer(handler);

    try {
      const idleClient = await connectWs(server.url);
      const { code, reason } = await waitForClose(idleClient);

      expect(code).toBe(1008);
      expect(reason).toBe("Start timeout");
      expect(acceptCalls).toEqual([]);
    } finally {
      await server.close();
    }
  });

  it("enforces pending connection limits", async () => {
    const handler = new MediaStreamHandler({
      sttProvider: createStubSttProvider(),
      preStartTimeoutMs: 5000,
      maxPendingConnections: 1,
      maxPendingConnectionsPerIp: 1,
    });
    const server = await startWsServer(handler);

    try {
      // The first socket occupies the only pending slot; the second must be turned away.
      const holder = await connectWs(server.url);
      const overflow = await connectWs(server.url);
      const { code, reason } = await waitForClose(overflow);

      expect(code).toBe(1013);
      expect(reason).toContain("Too many pending");
      expect(holder.readyState).toBe(WebSocket.OPEN);

      holder.close();
      await waitForClose(holder);
    } finally {
      await server.close();
    }
  });

  it("rejects upgrades when max connection cap is reached", async () => {
    const handler = new MediaStreamHandler({
      sttProvider: createStubSttProvider(),
      preStartTimeoutMs: 5000,
      maxConnections: 1,
      maxPendingConnections: 10,
      maxPendingConnectionsPerIp: 10,
    });
    const server = await startWsServer(handler);

    try {
      const holder = await connectWs(server.url);

      // The second dial is refused at the HTTP layer, so the client reports an error, not `open`.
      const refusal = new Promise<Error>((resolve) => {
        const rejected = new WebSocket(server.url);
        rejected.once("error", (err) => resolve(err as Error));
      });
      const upgradeError = await withTimeout(refusal);

      expect(upgradeError.message).toContain("Unexpected server response: 503");

      holder.close();
      await waitForClose(holder);
    } finally {
      await server.close();
    }
  });

  it("clears pending state after valid start", async () => {
    const handler = new MediaStreamHandler({
      sttProvider: createStubSttProvider(),
      preStartTimeoutMs: 40,
      shouldAcceptStream: () => true,
    });
    const server = await startWsServer(handler);

    try {
      const client = await connectWs(server.url);
      const startFrame = {
        event: "start",
        streamSid: "MZ123",
        start: { callSid: "CA123", customParameters: { token: "token-123" } },
      };
      client.send(JSON.stringify(startFrame));

      // Wait past the 40ms pre-start window; an accepted stream must stay open.
      await new Promise((resolve) => setTimeout(resolve, 80));
      expect(client.readyState).toBe(WebSocket.OPEN);

      client.close();
      await waitForClose(client);
    } finally {
      await server.close();
    }
  });
});
|
||||
|
||||
@@ -21,6 +21,14 @@ import type {
|
||||
export interface MediaStreamConfig {
|
||||
/** STT provider for transcription */
|
||||
sttProvider: OpenAIRealtimeSTTProvider;
|
||||
/** Close sockets that never send a valid `start` frame within this window. */
|
||||
preStartTimeoutMs?: number;
|
||||
/** Max concurrent pre-start sockets. */
|
||||
maxPendingConnections?: number;
|
||||
/** Max concurrent pre-start sockets from a single source IP. */
|
||||
maxPendingConnectionsPerIp?: number;
|
||||
/** Max total open sockets (pending + active sessions). */
|
||||
maxConnections?: number;
|
||||
/** Validate whether to accept a media stream for the given call ID */
|
||||
shouldAcceptStream?: (params: { callId: string; streamSid: string; token?: string }) => boolean;
|
||||
/** Callback when transcript is received */
|
||||
@@ -52,6 +60,16 @@ type TtsQueueEntry = {
|
||||
reject: (error: unknown) => void;
|
||||
};
|
||||
|
||||
/** Bookkeeping kept for each pending (pre-start) socket. */
type PendingConnection = {
  /** Remote address the socket is counted against for the per-IP limit. */
  ip: string;
  /** Timer that closes the socket if no `start` frame is accepted in time. */
  timeout: ReturnType<typeof setTimeout>;
};

/** Fallback pre-start timeout in milliseconds when the config omits one. */
const DEFAULT_PRE_START_TIMEOUT_MS = 5_000;
/** Fallback cap on concurrently pending sockets. */
const DEFAULT_MAX_PENDING_CONNECTIONS = 32;
/** Fallback cap on concurrently pending sockets per source IP. */
const DEFAULT_MAX_PENDING_CONNECTIONS_PER_IP = 4;
/** Fallback cap on all open sockets (pending + active). */
const DEFAULT_MAX_CONNECTIONS = 128;
|
||||
|
||||
/**
|
||||
* Manages WebSocket connections for Twilio media streams.
|
||||
*/
|
||||
@@ -59,6 +77,14 @@ export class MediaStreamHandler {
|
||||
private wss: WebSocketServer | null = null;
|
||||
private sessions = new Map<string, StreamSession>();
|
||||
private config: MediaStreamConfig;
|
||||
/** Pending sockets that have upgraded but not yet sent an accepted `start` frame. */
|
||||
private pendingConnections = new Map<WebSocket, PendingConnection>();
|
||||
/** Pending socket count per remote IP for pre-auth throttling. */
|
||||
private pendingByIp = new Map<string, number>();
|
||||
private preStartTimeoutMs: number;
|
||||
private maxPendingConnections: number;
|
||||
private maxPendingConnectionsPerIp: number;
|
||||
private maxConnections: number;
|
||||
/** TTS playback queues per stream (serialize audio to prevent overlap) */
|
||||
private ttsQueues = new Map<string, TtsQueueEntry[]>();
|
||||
/** Whether TTS is currently playing per stream */
|
||||
@@ -68,6 +94,11 @@ export class MediaStreamHandler {
|
||||
|
||||
/**
 * Keep the caller's config and resolve each connection limit, falling back to the
 * module defaults for any limit that is null or undefined.
 *
 * @param config - Handler configuration; the four limit fields are optional.
 */
constructor(config: MediaStreamConfig) {
  const {
    preStartTimeoutMs,
    maxPendingConnections,
    maxPendingConnectionsPerIp,
    maxConnections,
  } = config;

  this.config = config;
  this.preStartTimeoutMs = preStartTimeoutMs ?? DEFAULT_PRE_START_TIMEOUT_MS;
  this.maxPendingConnections = maxPendingConnections ?? DEFAULT_MAX_PENDING_CONNECTIONS;
  this.maxPendingConnectionsPerIp =
    maxPendingConnectionsPerIp ?? DEFAULT_MAX_PENDING_CONNECTIONS_PER_IP;
  this.maxConnections = maxConnections ?? DEFAULT_MAX_CONNECTIONS;
}
|
||||
|
||||
/**
|
||||
@@ -79,6 +110,12 @@ export class MediaStreamHandler {
|
||||
this.wss.on("connection", (ws, req) => this.handleConnection(ws, req));
|
||||
}
|
||||
|
||||
const currentConnections = this.wss.clients.size;
|
||||
if (currentConnections >= this.maxConnections) {
|
||||
this.rejectUpgrade(socket, 503, "Too many media stream connections");
|
||||
return;
|
||||
}
|
||||
|
||||
this.wss.handleUpgrade(request, socket, head, (ws) => {
|
||||
this.wss?.emit("connection", ws, request);
|
||||
});
|
||||
@@ -90,6 +127,12 @@ export class MediaStreamHandler {
|
||||
private async handleConnection(ws: WebSocket, _request: IncomingMessage): Promise<void> {
|
||||
let session: StreamSession | null = null;
|
||||
const streamToken = this.getStreamToken(_request);
|
||||
const ip = this.getClientIp(_request);
|
||||
|
||||
if (!this.registerPendingConnection(ws, ip)) {
|
||||
ws.close(1013, "Too many pending media stream connections");
|
||||
return;
|
||||
}
|
||||
|
||||
ws.on("message", async (data: Buffer) => {
|
||||
try {
|
||||
@@ -102,6 +145,9 @@ export class MediaStreamHandler {
|
||||
|
||||
case "start":
|
||||
session = await this.handleStart(ws, message, streamToken);
|
||||
if (session) {
|
||||
this.clearPendingConnection(ws);
|
||||
}
|
||||
break;
|
||||
|
||||
case "media":
|
||||
@@ -125,6 +171,7 @@ export class MediaStreamHandler {
|
||||
});
|
||||
|
||||
ws.on("close", () => {
|
||||
this.clearPendingConnection(ws);
|
||||
if (session) {
|
||||
this.handleStop(session);
|
||||
}
|
||||
@@ -226,6 +273,69 @@ export class MediaStreamHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Resolve the key used for per-IP pending-connection accounting.
 *
 * NOTE(review): this is the transport-level peer address; behind a reverse proxy or
 * tunnel it is presumably the proxy's address rather than the caller's — confirm.
 *
 * @param request - The HTTP upgrade request for the socket.
 * @returns The socket's remote address, or `"unknown"` when it is missing or empty.
 */
private getClientIp(request: IncomingMessage): string {
  const { remoteAddress } = request.socket;
  return remoteAddress || "unknown";
}
|
||||
|
||||
/**
 * Admit a freshly upgraded socket into the pending (pre-start) set and arm its
 * start timeout.
 *
 * @param ws - The upgraded socket that has not yet sent an accepted `start` frame.
 * @param ip - Source address the socket is counted against.
 * @returns `true` when registered; `false` when the global or per-IP pending limit
 *   is already reached (nothing is recorded and the caller should close the socket).
 */
private registerPendingConnection(ws: WebSocket, ip: string): boolean {
  // How long a timed-out peer gets to complete the close handshake before the
  // socket is torn down forcibly.
  const closeGraceMs = 1000;

  if (this.pendingConnections.size >= this.maxPendingConnections) {
    console.warn("[MediaStream] Rejecting connection: pending connection limit reached");
    return false;
  }

  const pendingForIp = this.pendingByIp.get(ip) ?? 0;
  if (pendingForIp >= this.maxPendingConnectionsPerIp) {
    console.warn(`[MediaStream] Rejecting connection: pending per-IP limit reached (${ip})`);
    return false;
  }

  const timeout = setTimeout(() => {
    // The socket may have started or closed between scheduling and firing.
    if (!this.pendingConnections.has(ws)) {
      return;
    }
    console.warn(
      `[MediaStream] Closing pre-start idle connection after ${this.preStartTimeoutMs}ms (${ip})`,
    );
    ws.close(1008, "Start timeout");

    // `close()` waits for the peer's close frame. A peer that never answers would
    // keep its pending slot until the library's own close timer fires, so force
    // the socket down after a short grace period.
    const forceClose = setTimeout(() => {
      if (ws.readyState !== ws.CLOSED) {
        ws.terminate();
      }
    }, closeGraceMs);
    forceClose.unref?.();
  }, this.preStartTimeoutMs);

  // Do not let a pending-socket timer keep the process alive on its own.
  timeout.unref?.();
  this.pendingConnections.set(ws, { ip, timeout });
  this.pendingByIp.set(ip, pendingForIp + 1);
  return true;
}
|
||||
|
||||
/**
 * Remove a socket from the pending set: cancel its start timeout and decrement its
 * IP's pending count. Does nothing if the socket is not currently pending.
 *
 * @param ws - The socket that either sent an accepted `start` frame or closed.
 */
private clearPendingConnection(ws: WebSocket): void {
  const entry = this.pendingConnections.get(ws);
  if (!entry) {
    return;
  }

  clearTimeout(entry.timeout);
  this.pendingConnections.delete(ws);

  // Drop the per-IP entry entirely once its last pending socket is gone.
  const remaining = (this.pendingByIp.get(entry.ip) ?? 0) - 1;
  if (remaining > 0) {
    this.pendingByIp.set(entry.ip, remaining);
  } else {
    this.pendingByIp.delete(entry.ip);
  }
}
|
||||
|
||||
/**
 * Refuse a WebSocket upgrade by writing a minimal plain-text HTTP error response on
 * the raw socket and then closing it.
 *
 * @param socket - The raw socket handed to the `upgrade` listener.
 * @param statusCode - HTTP status to send (429 or 503).
 * @param message - Text sent as the response body, followed by a newline.
 */
private rejectUpgrade(socket: Duplex, statusCode: 429 | 503, message: string): void {
  const statusText = statusCode === 429 ? "Too Many Requests" : "Service Unavailable";
  const body = `${message}\n`;
  const response =
    `HTTP/1.1 ${statusCode} ${statusText}\r\n` +
    "Connection: close\r\n" +
    "Content-Type: text/plain; charset=utf-8\r\n" +
    `Content-Length: ${Buffer.byteLength(body)}\r\n` +
    "\r\n" +
    body;

  // A peer that already went away must not surface as an unhandled `error` event.
  socket.on("error", () => {
    socket.destroy();
  });

  // `destroy()` right after `write()` can drop data that has not been flushed yet,
  // so end the stream with the response and destroy only once it has finished.
  socket.end(response, () => {
    socket.destroy();
  });
}
|
||||
|
||||
/**
|
||||
* Get an active session with an open WebSocket, or undefined if unavailable.
|
||||
*/
|
||||
|
||||
@@ -77,6 +77,10 @@ export class VoiceCallWebhookServer {
|
||||
|
||||
const streamConfig: MediaStreamConfig = {
|
||||
sttProvider,
|
||||
preStartTimeoutMs: this.config.streaming?.preStartTimeoutMs,
|
||||
maxPendingConnections: this.config.streaming?.maxPendingConnections,
|
||||
maxPendingConnectionsPerIp: this.config.streaming?.maxPendingConnectionsPerIp,
|
||||
maxConnections: this.config.streaming?.maxConnections,
|
||||
shouldAcceptStream: ({ callId, token }) => {
|
||||
const call = this.manager.getCallByProviderCallId(callId);
|
||||
if (!call) {
|
||||
@@ -192,9 +196,8 @@ export class VoiceCallWebhookServer {
|
||||
// Handle WebSocket upgrades for media streams
|
||||
if (this.mediaStreamHandler) {
|
||||
this.server.on("upgrade", (request, socket, head) => {
|
||||
const url = new URL(request.url || "/", `http://${request.headers.host}`);
|
||||
|
||||
if (url.pathname === streamPath) {
|
||||
const path = this.getUpgradePathname(request);
|
||||
if (path === streamPath) {
|
||||
console.log("[voice-call] WebSocket upgrade for media stream");
|
||||
this.mediaStreamHandler?.handleUpgrade(request, socket, head);
|
||||
} else {
|
||||
@@ -269,6 +272,15 @@ export class VoiceCallWebhookServer {
|
||||
});
|
||||
}
|
||||
|
||||
/**
 * Extract the pathname of an upgrade request without letting a malformed URL or
 * Host header throw.
 *
 * @param request - The incoming upgrade request.
 * @returns The parsed pathname, or `null` when the URL cannot be parsed.
 */
private getUpgradePathname(request: http.IncomingMessage): string | null {
  try {
    // A missing Host header falls back to "localhost"; only the pathname is used.
    const base = `http://${request.headers.host || "localhost"}`;
    const target = request.url || "/";
    const { pathname } = new URL(target, base);
    return pathname;
  } catch {
    return null;
  }
}
|
||||
|
||||
/**
|
||||
* Handle incoming HTTP request.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user