From 3cbcba10cf30c2ffb898f0d8c7dfb929f15f8930 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 13 Feb 2026 19:14:36 +0100 Subject: [PATCH] fix(security): enforce bounded webhook body handling --- extensions/bluebubbles/src/monitor.ts | 97 +++-- extensions/feishu/src/monitor.ts | 30 +- extensions/googlechat/src/monitor.ts | 62 +--- extensions/msteams/src/monitor.ts | 12 +- .../src/monitor.read-body.test.ts | 38 ++ extensions/nextcloud-talk/src/monitor.ts | 44 ++- extensions/nextcloud-talk/src/types.ts | 1 + extensions/nostr/src/nostr-profile-http.ts | 65 +--- extensions/voice-call/src/webhook.ts | 49 +-- extensions/zalo/src/monitor.ts | 52 +-- src/gateway/hooks.ts | 57 +-- src/gateway/http-common.ts | 12 + src/gateway/server-http.ts | 7 +- src/infra/http-body.test.ts | 116 ++++++ src/infra/http-body.ts | 347 ++++++++++++++++++ src/line/monitor.read-body.test.ts | 38 ++ src/line/monitor.ts | 35 +- src/plugin-sdk/index.ts | 10 + src/slack/monitor/provider.ts | 23 +- src/telegram/webhook.ts | 20 + 20 files changed, 834 insertions(+), 281 deletions(-) create mode 100644 extensions/nextcloud-talk/src/monitor.read-body.test.ts create mode 100644 src/infra/http-body.test.ts create mode 100644 src/infra/http-body.ts create mode 100644 src/line/monitor.read-body.test.ts diff --git a/extensions/bluebubbles/src/monitor.ts b/extensions/bluebubbles/src/monitor.ts index bc325b48dab..cc69bc48246 100644 --- a/extensions/bluebubbles/src/monitor.ts +++ b/extensions/bluebubbles/src/monitor.ts @@ -2,11 +2,14 @@ import type { IncomingMessage, ServerResponse } from "node:http"; import type { OpenClawConfig } from "openclaw/plugin-sdk"; import { createReplyPrefixOptions, + isRequestBodyLimitError, logAckFailure, logInboundDrop, logTypingFailure, + readRequestBodyWithLimit, resolveAckReaction, resolveControlCommandGate, + requestBodyErrorToText, } from "openclaw/plugin-sdk"; import type { ResolvedBlueBubblesAccount } from "./accounts.js"; import type { BlueBubblesAccountConfig, BlueBubblesAttachment } from "./types.js"; @@ -511,63 +514,40 @@ export function registerBlueBubblesWebhookTarget(target: WebhookTarget): () => v } async function readJsonBody(req: IncomingMessage, maxBytes: number, timeoutMs = 30_000) { - const chunks: Buffer[] = []; - let total = 0; - return await new Promise<{ ok: boolean; value?: unknown; error?: string }>((resolve) => { - let done = false; - const finish = (result: { ok: boolean; value?: unknown; error?: string }) => { - if (done) { - return; - } - done = true; - clearTimeout(timer); - resolve(result); - }; + let rawBody = ""; + try { + rawBody = await readRequestBodyWithLimit(req, { maxBytes, timeoutMs }); + } catch (error) { + if (isRequestBodyLimitError(error, "PAYLOAD_TOO_LARGE")) { + return { ok: false, error: "payload too large" }; + } + if (isRequestBodyLimitError(error, "REQUEST_BODY_TIMEOUT")) { + return { ok: false, error: requestBodyErrorToText("REQUEST_BODY_TIMEOUT") }; + } + if (isRequestBodyLimitError(error, "CONNECTION_CLOSED")) { + return { ok: false, error: requestBodyErrorToText("CONNECTION_CLOSED") }; + } + return { ok: false, error: error instanceof Error ? error.message : String(error) }; + } - const timer = setTimeout(() => { - finish({ ok: false, error: "request body timeout" }); - req.destroy(); - }, timeoutMs); - - req.on("data", (chunk: Buffer) => { - total += chunk.length; - if (total > maxBytes) { - finish({ ok: false, error: "payload too large" }); - req.destroy(); - return; + try { + const raw = rawBody.toString(); + if (!raw.trim()) { + return { ok: false, error: "empty payload" }; + } + try { + return { ok: true, value: JSON.parse(raw) as unknown }; + } catch { + const params = new URLSearchParams(raw); + const payload = params.get("payload") ?? params.get("data") ?? params.get("message"); + if (payload) { + return { ok: true, value: JSON.parse(payload) as unknown }; } - chunks.push(chunk); - }); - req.on("end", () => { - try { - const raw = Buffer.concat(chunks).toString("utf8"); - if (!raw.trim()) { - finish({ ok: false, error: "empty payload" }); - return; - } - try { - finish({ ok: true, value: JSON.parse(raw) as unknown }); - return; - } catch { - const params = new URLSearchParams(raw); - const payload = params.get("payload") ?? params.get("data") ?? params.get("message"); - if (payload) { - finish({ ok: true, value: JSON.parse(payload) as unknown }); - return; - } - throw new Error("invalid json"); - } - } catch (err) { - finish({ ok: false, error: err instanceof Error ? err.message : String(err) }); - } - }); - req.on("error", (err) => { - finish({ ok: false, error: err instanceof Error ? err.message : String(err) }); - }); - req.on("close", () => { - finish({ ok: false, error: "connection closed" }); - }); - }); + throw new Error("invalid json"); + } + } catch (error) { + return { ok: false, error: error instanceof Error ? error.message : String(error) }; + } } function asRecord(value: unknown): Record | null { @@ -1461,7 +1441,12 @@ export async function handleBlueBubblesWebhookRequest( const body = await readJsonBody(req, 1024 * 1024); if (!body.ok) { - res.statusCode = body.error === "payload too large" ? 413 : 400; + res.statusCode = + body.error === "payload too large" + ? 413 + : body.error === requestBodyErrorToText("REQUEST_BODY_TIMEOUT") + ? 408 + : 400; res.end(body.error ?? "invalid payload"); console.warn(`[bluebubbles] webhook rejected: ${body.error ?? "invalid payload"}`); return true; diff --git a/extensions/feishu/src/monitor.ts b/extensions/feishu/src/monitor.ts index 31a890c2f92..51af5a4aeb4 100644 --- a/extensions/feishu/src/monitor.ts +++ b/extensions/feishu/src/monitor.ts @@ -1,6 +1,11 @@ -import type { ClawdbotConfig, RuntimeEnv, HistoryEntry } from "openclaw/plugin-sdk"; import * as Lark from "@larksuiteoapi/node-sdk"; import * as http from "http"; +import { + type ClawdbotConfig, + type RuntimeEnv, + type HistoryEntry, + installRequestBodyLimitGuard, +} from "openclaw/plugin-sdk"; import type { ResolvedFeishuAccount } from "./types.js"; import { resolveFeishuAccount, listEnabledFeishuAccounts } from "./accounts.js"; import { handleFeishuMessage, type FeishuMessageEvent, type FeishuBotAddedEvent } from "./bot.js"; @@ -18,6 +23,8 @@ export type MonitorFeishuOpts = { const wsClients = new Map(); const httpServers = new Map(); const botOpenIds = new Map(); +const FEISHU_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024; +const FEISHU_WEBHOOK_BODY_TIMEOUT_MS = 30_000; async function fetchBotOpenId(account: ResolvedFeishuAccount): Promise { try { @@ -197,7 +204,26 @@ async function monitorWebhook({ log(`feishu[${accountId}]: starting Webhook server on port ${port}, path ${path}...`); const server = http.createServer(); - server.on("request", Lark.adaptDefault(path, eventDispatcher, { autoChallenge: true })); + const webhookHandler = Lark.adaptDefault(path, eventDispatcher, { autoChallenge: true }); + server.on("request", (req, res) => { + const guard = installRequestBodyLimitGuard(req, res, { + maxBytes: FEISHU_WEBHOOK_MAX_BODY_BYTES, + timeoutMs: FEISHU_WEBHOOK_BODY_TIMEOUT_MS, + responseFormat: "text", + }); + if (guard.isTripped()) { + return; + } + void Promise.resolve(webhookHandler(req, res)) + .catch((err) => { + if (!guard.isTripped()) { + error(`feishu[${accountId}]: webhook handler error: ${String(err)}`); + } + }) + .finally(() => { + guard.dispose(); + }); + }); httpServers.set(accountId, server); return new Promise((resolve, reject) => { diff --git a/extensions/googlechat/src/monitor.ts b/extensions/googlechat/src/monitor.ts index fe8eeef68ba..4ca340e845c 100644 --- a/extensions/googlechat/src/monitor.ts +++ b/extensions/googlechat/src/monitor.ts @@ -1,6 +1,11 @@ import type { IncomingMessage, ServerResponse } from "node:http"; import type { OpenClawConfig } from "openclaw/plugin-sdk"; -import { createReplyPrefixOptions, resolveMentionGatingWithBypass } from "openclaw/plugin-sdk"; +import { + createReplyPrefixOptions, + readJsonBodyWithLimit, + requestBodyErrorToText, + resolveMentionGatingWithBypass, +} from "openclaw/plugin-sdk"; import type { GoogleChatAnnotation, GoogleChatAttachment, @@ -84,46 +89,6 @@ function resolveWebhookPath(webhookPath?: string, webhookUrl?: string): string | return "/googlechat"; } -async function readJsonBody(req: IncomingMessage, maxBytes: number) { - const chunks: Buffer[] = []; - let total = 0; - return await new Promise<{ ok: boolean; value?: unknown; error?: string }>((resolve) => { - let resolved = false; - const doResolve = (value: { ok: boolean; value?: unknown; error?: string }) => { - if (resolved) { - return; - } - resolved = true; - req.removeAllListeners(); - resolve(value); - }; - req.on("data", (chunk: Buffer) => { - total += chunk.length; - if (total > maxBytes) { - doResolve({ ok: false, error: "payload too large" }); - req.destroy(); - return; - } - chunks.push(chunk); - }); - req.on("end", () => { - try { - const raw = Buffer.concat(chunks).toString("utf8"); - if (!raw.trim()) { - doResolve({ ok: false, error: "empty payload" }); - return; - } - doResolve({ ok: true, value: JSON.parse(raw) as unknown }); - } catch (err) { - doResolve({ ok: false, error: err instanceof Error ? err.message : String(err) }); - } - }); - req.on("error", (err) => { - doResolve({ ok: false, error: err instanceof Error ? err.message : String(err) }); - }); - }); -} - export function registerGoogleChatWebhookTarget(target: WebhookTarget): () => void { const key = normalizeWebhookPath(target.path); const normalizedTarget = { ...target, path: key }; @@ -178,10 +143,19 @@ export async function handleGoogleChatWebhookRequest( ? authHeader.slice("bearer ".length) : ""; - const body = await readJsonBody(req, 1024 * 1024); + const body = await readJsonBodyWithLimit(req, { + maxBytes: 1024 * 1024, + timeoutMs: 30_000, + emptyObjectOnEmpty: false, + }); if (!body.ok) { - res.statusCode = body.error === "payload too large" ? 413 : 400; - res.end(body.error ?? "invalid payload"); + res.statusCode = + body.code === "PAYLOAD_TOO_LARGE" ? 413 : body.code === "REQUEST_BODY_TIMEOUT" ? 408 : 400; + res.end( + body.code === "REQUEST_BODY_TIMEOUT" + ? requestBodyErrorToText("REQUEST_BODY_TIMEOUT") + : body.error, + ); return true; } diff --git a/extensions/msteams/src/monitor.ts b/extensions/msteams/src/monitor.ts index 6c97d3c25b4..f26c8018eda 100644 --- a/extensions/msteams/src/monitor.ts +++ b/extensions/msteams/src/monitor.ts @@ -1,5 +1,6 @@ import type { Request, Response } from "express"; import { + DEFAULT_WEBHOOK_MAX_BODY_BYTES, mergeAllowlist, summarizeMapping, type OpenClawConfig, @@ -32,6 +33,8 @@ export type MonitorMSTeamsResult = { shutdown: () => Promise; }; +const MSTEAMS_WEBHOOK_MAX_BODY_BYTES = DEFAULT_WEBHOOK_MAX_BODY_BYTES; + export async function monitorMSTeamsProvider( opts: MonitorMSTeamsOpts, ): Promise { @@ -239,7 +242,14 @@ export async function monitorMSTeamsProvider( // Create Express server const expressApp = express.default(); - expressApp.use(express.json()); + expressApp.use(express.json({ limit: MSTEAMS_WEBHOOK_MAX_BODY_BYTES })); + expressApp.use((err: unknown, _req: Request, res: Response, next: (err?: unknown) => void) => { + if (err && typeof err === "object" && "status" in err && err.status === 413) { + res.status(413).json({ error: "Payload too large" }); + return; + } + next(err); + }); expressApp.use(authorizeJWT(authConfig)); // Set up the messages endpoint - use configured path and /api/messages as fallback diff --git a/extensions/nextcloud-talk/src/monitor.read-body.test.ts b/extensions/nextcloud-talk/src/monitor.read-body.test.ts new file mode 100644 index 00000000000..c54096a65d9 --- /dev/null +++ b/extensions/nextcloud-talk/src/monitor.read-body.test.ts @@ -0,0 +1,38 @@ +import type { IncomingMessage } from "node:http"; +import { EventEmitter } from "node:events"; +import { describe, expect, it } from "vitest"; +import { readNextcloudTalkWebhookBody } from "./monitor.js"; + +function createMockRequest(chunks: string[]): IncomingMessage { + const req = new EventEmitter() as IncomingMessage & { destroyed?: boolean; destroy: () => void }; + req.destroyed = false; + req.headers = {}; + req.destroy = () => { + req.destroyed = true; + }; + + void Promise.resolve().then(() => { + for (const chunk of chunks) { + req.emit("data", Buffer.from(chunk, "utf-8")); + if (req.destroyed) { + return; + } + } + req.emit("end"); + }); + + return req; +} + +describe("readNextcloudTalkWebhookBody", () => { + it("reads valid body within max bytes", async () => { + const req = createMockRequest(['{"type":"Create"}']); + const body = await readNextcloudTalkWebhookBody(req, 1024); + expect(body).toBe('{"type":"Create"}'); + }); + + it("rejects when payload exceeds max bytes", async () => { + const req = createMockRequest(["x".repeat(300)]); + await expect(readNextcloudTalkWebhookBody(req, 128)).rejects.toThrow("PayloadTooLarge"); + }); +}); diff --git a/extensions/nextcloud-talk/src/monitor.ts b/extensions/nextcloud-talk/src/monitor.ts index 877313fa19a..f0d87dea103 100644 --- a/extensions/nextcloud-talk/src/monitor.ts +++ b/extensions/nextcloud-talk/src/monitor.ts @@ -1,5 +1,10 @@ -import type { RuntimeEnv } from "openclaw/plugin-sdk"; import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http"; +import { + type RuntimeEnv, + isRequestBodyLimitError, + readRequestBodyWithLimit, + requestBodyErrorToText, +} from "openclaw/plugin-sdk"; import type { CoreConfig, NextcloudTalkInboundMessage, @@ -14,6 +19,8 @@ import { extractNextcloudTalkHeaders, verifyNextcloudTalkSignature } from "./sig const DEFAULT_WEBHOOK_PORT = 8788; const DEFAULT_WEBHOOK_HOST = "0.0.0.0"; const DEFAULT_WEBHOOK_PATH = "/nextcloud-talk-webhook"; +const DEFAULT_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024; +const DEFAULT_WEBHOOK_BODY_TIMEOUT_MS = 30_000; const HEALTH_PATH = "/healthz"; function formatError(err: unknown): string { @@ -62,12 +69,13 @@ function payloadToInboundMessage( }; } -function readBody(req: IncomingMessage): Promise { - return new Promise((resolve, reject) => { - const chunks: Buffer[] = []; - req.on("data", (chunk: Buffer) => chunks.push(chunk)); - req.on("end", () => resolve(Buffer.concat(chunks).toString("utf-8"))); - req.on("error", reject); +export function readNextcloudTalkWebhookBody( + req: IncomingMessage, + maxBodyBytes: number, +): Promise { + return readRequestBodyWithLimit(req, { + maxBytes: maxBodyBytes, + timeoutMs: DEFAULT_WEBHOOK_BODY_TIMEOUT_MS, }); } @@ -77,6 +85,12 @@ export function createNextcloudTalkWebhookServer(opts: NextcloudTalkWebhookServe stop: () => void; } { const { port, host, path, secret, onMessage, onError, abortSignal } = opts; + const maxBodyBytes = + typeof opts.maxBodyBytes === "number" && + Number.isFinite(opts.maxBodyBytes) && + opts.maxBodyBytes > 0 + ? Math.floor(opts.maxBodyBytes) + : DEFAULT_WEBHOOK_MAX_BODY_BYTES; const server = createServer(async (req: IncomingMessage, res: ServerResponse) => { if (req.url === HEALTH_PATH) { @@ -92,7 +106,7 @@ export function createNextcloudTalkWebhookServer(opts: NextcloudTalkWebhookServe } try { - const body = await readBody(req); + const body = await readNextcloudTalkWebhookBody(req, maxBodyBytes); const headers = extractNextcloudTalkHeaders( req.headers as Record, @@ -140,6 +154,20 @@ export function createNextcloudTalkWebhookServer(opts: NextcloudTalkWebhookServe onError?.(err instanceof Error ? err : new Error(formatError(err))); } } catch (err) { + if (isRequestBodyLimitError(err, "PAYLOAD_TOO_LARGE")) { + if (!res.headersSent) { + res.writeHead(413, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ error: "Payload too large" })); + } + return; + } + if (isRequestBodyLimitError(err, "REQUEST_BODY_TIMEOUT")) { + if (!res.headersSent) { + res.writeHead(408, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ error: requestBodyErrorToText("REQUEST_BODY_TIMEOUT") })); + } + return; + } const error = err instanceof Error ? err : new Error(formatError(err)); onError?.(error); if (!res.headersSent) { diff --git a/extensions/nextcloud-talk/src/types.ts b/extensions/nextcloud-talk/src/types.ts index 9d851b39bc6..ecdbe8437ae 100644 --- a/extensions/nextcloud-talk/src/types.ts +++ b/extensions/nextcloud-talk/src/types.ts @@ -168,6 +168,7 @@ export type NextcloudTalkWebhookServerOptions = { host: string; path: string; secret: string; + maxBodyBytes?: number; onMessage: (message: NextcloudTalkInboundMessage) => void | Promise; onError?: (error: Error) => void; abortSignal?: AbortSignal; diff --git a/extensions/nostr/src/nostr-profile-http.ts b/extensions/nostr/src/nostr-profile-http.ts index ebb98e885d7..57098fd7f47 100644 --- a/extensions/nostr/src/nostr-profile-http.ts +++ b/extensions/nostr/src/nostr-profile-http.ts @@ -8,6 +8,7 @@ */ import type { IncomingMessage, ServerResponse } from "node:http"; +import { readJsonBodyWithLimit, requestBodyErrorToText } from "openclaw/plugin-sdk"; import { z } from "zod"; import { publishNostrProfile, getNostrProfileState } from "./channel.js"; import { NostrProfileSchema, type NostrProfile } from "./config-schema.js"; @@ -234,54 +235,24 @@ async function readJsonBody( maxBytes = 64 * 1024, timeoutMs = 30_000, ): Promise { - return new Promise((resolve, reject) => { - let done = false; - const finish = (fn: () => void) => { - if (done) { - return; - } - done = true; - clearTimeout(timer); - fn(); - }; - - const timer = setTimeout(() => { - finish(() => { - const err = new Error("Request body timeout"); - req.destroy(err); - reject(err); - }); - }, timeoutMs); - - const chunks: Buffer[] = []; - let totalBytes = 0; - - req.on("data", (chunk: Buffer) => { - totalBytes += chunk.length; - if (totalBytes > maxBytes) { - finish(() => { - reject(new Error("Request body too large")); - req.destroy(); - }); - return; - } - chunks.push(chunk); - }); - - req.on("end", () => { - finish(() => { - try { - const body = Buffer.concat(chunks).toString("utf-8"); - resolve(body ? JSON.parse(body) : {}); - } catch { - reject(new Error("Invalid JSON")); - } - }); - }); - - req.on("error", (err) => finish(() => reject(err))); - req.on("close", () => finish(() => reject(new Error("Connection closed")))); + const result = await readJsonBodyWithLimit(req, { + maxBytes, + timeoutMs, + emptyObjectOnEmpty: true, }); + if (result.ok) { + return result.value; + } + if (result.code === "PAYLOAD_TOO_LARGE") { + throw new Error("Request body too large"); + } + if (result.code === "REQUEST_BODY_TIMEOUT") { + throw new Error(requestBodyErrorToText("REQUEST_BODY_TIMEOUT")); + } + if (result.code === "CONNECTION_CLOSED") { + throw new Error(requestBodyErrorToText("CONNECTION_CLOSED")); + } + throw new Error(result.code === "INVALID_JSON" ? "Invalid JSON" : result.error); } function parseAccountIdFromPath(pathname: string): string | null { diff --git a/extensions/voice-call/src/webhook.ts b/extensions/voice-call/src/webhook.ts index 99f14a4680f..79ecc843cd4 100644 --- a/extensions/voice-call/src/webhook.ts +++ b/extensions/voice-call/src/webhook.ts @@ -1,6 +1,11 @@ import { spawn } from "node:child_process"; import http from "node:http"; import { URL } from "node:url"; +import { + isRequestBodyLimitError, + readRequestBodyWithLimit, + requestBodyErrorToText, +} from "openclaw/plugin-sdk"; import type { VoiceCallConfig } from "./config.js"; import type { CoreConfig } from "./core-bridge.js"; import type { CallManager } from "./manager.js"; @@ -244,11 +249,16 @@ export class VoiceCallWebhookServer { try { body = await this.readBody(req, MAX_WEBHOOK_BODY_BYTES); } catch (err) { - if (err instanceof Error && err.message === "PayloadTooLarge") { + if (isRequestBodyLimitError(err, "PAYLOAD_TOO_LARGE")) { res.statusCode = 413; res.end("Payload Too Large"); return; } + if (isRequestBodyLimitError(err, "REQUEST_BODY_TIMEOUT")) { + res.statusCode = 408; + res.end(requestBodyErrorToText("REQUEST_BODY_TIMEOUT")); + return; + } throw err; } @@ -303,42 +313,7 @@ export class VoiceCallWebhookServer { maxBytes: number, timeoutMs = 30_000, ): Promise { - return new Promise((resolve, reject) => { - let done = false; - const finish = (fn: () => void) => { - if (done) { - return; - } - done = true; - clearTimeout(timer); - fn(); - }; - - const timer = setTimeout(() => { - finish(() => { - const err = new Error("Request body timeout"); - req.destroy(err); - reject(err); - }); - }, timeoutMs); - - const chunks: Buffer[] = []; - let totalBytes = 0; - req.on("data", (chunk: Buffer) => { - totalBytes += chunk.length; - if (totalBytes > maxBytes) { - finish(() => { - req.destroy(); - reject(new Error("PayloadTooLarge")); - }); - return; - } - chunks.push(chunk); - }); - req.on("end", () => finish(() => resolve(Buffer.concat(chunks).toString("utf-8")))); - req.on("error", (err) => finish(() => reject(err))); - req.on("close", () => finish(() => reject(new Error("Connection closed")))); - }); + return readRequestBodyWithLimit(req, { maxBytes, timeoutMs }); } /** diff --git a/extensions/zalo/src/monitor.ts b/extensions/zalo/src/monitor.ts index 1847cc217ea..171033b75e3 100644 --- a/extensions/zalo/src/monitor.ts +++ b/extensions/zalo/src/monitor.ts @@ -1,6 +1,10 @@ import type { IncomingMessage, ServerResponse } from "node:http"; import type { OpenClawConfig, MarkdownTableMode } from "openclaw/plugin-sdk"; -import { createReplyPrefixOptions } from "openclaw/plugin-sdk"; +import { + createReplyPrefixOptions, + readJsonBodyWithLimit, + requestBodyErrorToText, +} from "openclaw/plugin-sdk"; import type { ResolvedZaloAccount } from "./accounts.js"; import { ZaloApiError, @@ -61,37 +65,6 @@ function isSenderAllowed(senderId: string, allowFrom: string[]): boolean { }); } -async function readJsonBody(req: IncomingMessage, maxBytes: number) { - const chunks: Buffer[] = []; - let total = 0; - return await new Promise<{ ok: boolean; value?: unknown; error?: string }>((resolve) => { - req.on("data", (chunk: Buffer) => { - total += chunk.length; - if (total > maxBytes) { - resolve({ ok: false, error: "payload too large" }); - req.destroy(); - return; - } - chunks.push(chunk); - }); - req.on("end", () => { - try { - const raw = Buffer.concat(chunks).toString("utf8"); - if (!raw.trim()) { - resolve({ ok: false, error: "empty payload" }); - return; - } - resolve({ ok: true, value: JSON.parse(raw) as unknown }); - } catch (err) { - resolve({ ok: false, error: err instanceof Error ? err.message : String(err) }); - } - }); - req.on("error", (err) => { - resolve({ ok: false, error: err instanceof Error ? err.message : String(err) }); - }); - }); -} - type WebhookTarget = { token: string; account: ResolvedZaloAccount; @@ -177,10 +150,19 @@ export async function handleZaloWebhookRequest( return true; } - const body = await readJsonBody(req, 1024 * 1024); + const body = await readJsonBodyWithLimit(req, { + maxBytes: 1024 * 1024, + timeoutMs: 30_000, + emptyObjectOnEmpty: false, + }); if (!body.ok) { - res.statusCode = body.error === "payload too large" ? 413 : 400; - res.end(body.error ?? "invalid payload"); + res.statusCode = + body.code === "PAYLOAD_TOO_LARGE" ? 413 : body.code === "REQUEST_BODY_TIMEOUT" ? 408 : 400; + res.end( + body.code === "REQUEST_BODY_TIMEOUT" + ? requestBodyErrorToText("REQUEST_BODY_TIMEOUT") + : body.error, + ); return true; } diff --git a/src/gateway/hooks.ts b/src/gateway/hooks.ts index 1069b209177..56b6a39835e 100644 --- a/src/gateway/hooks.ts +++ b/src/gateway/hooks.ts @@ -4,6 +4,7 @@ import type { ChannelId } from "../channels/plugins/types.js"; import type { OpenClawConfig } from "../config/config.js"; import { listAgentIds, resolveDefaultAgentId } from "../agents/agent-scope.js"; import { listChannelPlugins } from "../channels/plugins/index.js"; +import { readJsonBodyWithLimit, requestBodyErrorToText } from "../infra/http-body.js"; import { normalizeAgentId } from "../routing/session-key.js"; import { normalizeMessageChannel } from "../utils/message-channel.js"; import { type HookMappingResolved, resolveHookMappings } from "./hooks-mapping.js"; @@ -177,48 +178,20 @@ export async function readJsonBody( req: IncomingMessage, maxBytes: number, ): Promise<{ ok: true; value: unknown } | { ok: false; error: string }> { - return await new Promise((resolve) => { - let done = false; - let total = 0; - const chunks: Buffer[] = []; - req.on("data", (chunk: Buffer) => { - if (done) { - return; - } - total += chunk.length; - if (total > maxBytes) { - done = true; - resolve({ ok: false, error: "payload too large" }); - req.destroy(); - return; - } - chunks.push(chunk); - }); - req.on("end", () => { - if (done) { - return; - } - done = true; - const raw = Buffer.concat(chunks).toString("utf-8").trim(); - if (!raw) { - resolve({ ok: true, value: {} }); - return; - } - try { - const parsed = JSON.parse(raw) as unknown; - resolve({ ok: true, value: parsed }); - } catch (err) { - resolve({ ok: false, error: String(err) }); - } - }); - req.on("error", (err) => { - if (done) { - return; - } - done = true; - resolve({ ok: false, error: String(err) }); - }); - }); + const result = await readJsonBodyWithLimit(req, { maxBytes, emptyObjectOnEmpty: true }); + if (result.ok) { + return result; + } + if (result.code === "PAYLOAD_TOO_LARGE") { + return { ok: false, error: "payload too large" }; + } + if (result.code === "REQUEST_BODY_TIMEOUT") { + return { ok: false, error: "request body timeout" }; + } + if (result.code === "CONNECTION_CLOSED") { + return { ok: false, error: requestBodyErrorToText("CONNECTION_CLOSED") }; + } + return { ok: false, error: result.error }; } export function normalizeHookHeaders(req: IncomingMessage) { diff --git a/src/gateway/http-common.ts b/src/gateway/http-common.ts index b9788861808..22e09254fdc 100644 --- a/src/gateway/http-common.ts +++ b/src/gateway/http-common.ts @@ -58,6 +58,18 @@ export async function readJsonBodyOrError( ): Promise { const body = await readJsonBody(req, maxBytes); if (!body.ok) { + if (body.error === "payload too large") { + sendJson(res, 413, { + error: { message: "Payload too large", type: "invalid_request_error" }, + }); + return undefined; + } + if (body.error === "request body timeout") { + sendJson(res, 408, { + error: { message: "Request body timeout", type: "invalid_request_error" }, + }); + return undefined; + } sendInvalidRequest(res, body.error); return undefined; } diff --git a/src/gateway/server-http.ts b/src/gateway/server-http.ts index feb71a3ee12..7b5630d1a11 100644 --- a/src/gateway/server-http.ts +++ b/src/gateway/server-http.ts @@ -287,7 +287,12 @@ export function createHooksRequestHandler( const body = await readJsonBody(req, hooksConfig.maxBodyBytes); if (!body.ok) { - const status = body.error === "payload too large" ? 413 : 400; + const status = + body.error === "payload too large" + ? 413 + : body.error === "request body timeout" + ? 408 + : 400; sendJson(res, status, { ok: false, error: body.error }); return true; } diff --git a/src/infra/http-body.test.ts b/src/infra/http-body.test.ts new file mode 100644 index 00000000000..93302c7bae6 --- /dev/null +++ b/src/infra/http-body.test.ts @@ -0,0 +1,116 @@ +import type { IncomingMessage, ServerResponse } from "node:http"; +import { EventEmitter } from "node:events"; +import { describe, expect, it } from "vitest"; +import { + installRequestBodyLimitGuard, + isRequestBodyLimitError, + readJsonBodyWithLimit, + readRequestBodyWithLimit, +} from "./http-body.js"; + +function createMockRequest(params: { + chunks?: string[]; + headers?: Record; + emitEnd?: boolean; +}): IncomingMessage { + const req = new EventEmitter() as IncomingMessage & { destroyed?: boolean; destroy: () => void }; + req.destroyed = false; + req.headers = params.headers ?? {}; + req.destroy = () => { + req.destroyed = true; + }; + + if (params.chunks) { + void Promise.resolve().then(() => { + for (const chunk of params.chunks ?? []) { + req.emit("data", Buffer.from(chunk, "utf-8")); + if (req.destroyed) { + return; + } + } + if (params.emitEnd !== false) { + req.emit("end"); + } + }); + } + + return req; +} + +function createMockResponse(): ServerResponse & { body?: string } { + const headers: Record = {}; + const res = { + headersSent: false, + statusCode: 200, + setHeader: (key: string, value: string) => { + headers[key.toLowerCase()] = value; + return res; + }, + end: (body?: string) => { + res.headersSent = true; + res.body = body; + return res; + }, + } as unknown as ServerResponse & { body?: string }; + return res; +} + +describe("http body limits", () => { + it("reads body within max bytes", async () => { + const req = createMockRequest({ chunks: ['{"ok":true}'] }); + await expect(readRequestBodyWithLimit(req, { maxBytes: 1024 })).resolves.toBe('{"ok":true}'); + }); + + it("rejects oversized body", async () => { + const req = createMockRequest({ chunks: ["x".repeat(512)] }); + await expect(readRequestBodyWithLimit(req, { maxBytes: 64 })).rejects.toMatchObject({ + message: "PayloadTooLarge", + }); + }); + + it("returns json parse error when body is invalid", async () => { + const req = createMockRequest({ chunks: ["{bad json"] }); + const result = await readJsonBodyWithLimit(req, { maxBytes: 1024, emptyObjectOnEmpty: false }); + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.code).toBe("INVALID_JSON"); + } + }); + + it("returns payload-too-large for json body", async () => { + const req = createMockRequest({ chunks: ["x".repeat(1024)] }); + const result = await readJsonBodyWithLimit(req, { maxBytes: 10 }); + expect(result).toEqual({ ok: false, code: "PAYLOAD_TOO_LARGE", error: "Payload too large" }); + }); + + it("guard rejects oversized declared content-length", () => { + const req = createMockRequest({ + headers: { "content-length": "9999" }, + emitEnd: false, + }); + const res = createMockResponse(); + const guard = installRequestBodyLimitGuard(req, res, { maxBytes: 128 }); + expect(guard.isTripped()).toBe(true); + expect(guard.code()).toBe("PAYLOAD_TOO_LARGE"); + expect(res.statusCode).toBe(413); + }); + + it("guard rejects streamed oversized body", async () => { + const req = createMockRequest({ chunks: ["small", "x".repeat(256)], emitEnd: false }); + const res = createMockResponse(); + const guard = installRequestBodyLimitGuard(req, res, { maxBytes: 128, responseFormat: "text" }); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(guard.isTripped()).toBe(true); + expect(guard.code()).toBe("PAYLOAD_TOO_LARGE"); + expect(res.statusCode).toBe(413); + expect(res.body).toBe("Payload too large"); + }); + + it("timeout surfaces typed error", async () => { + const req = createMockRequest({ emitEnd: false }); + const promise = readRequestBodyWithLimit(req, { maxBytes: 128, timeoutMs: 10 }); + await expect(promise).rejects.toSatisfy((error: unknown) => + isRequestBodyLimitError(error, "REQUEST_BODY_TIMEOUT"), + ); + }); +}); diff --git a/src/infra/http-body.ts b/src/infra/http-body.ts new file mode 100644 index 00000000000..e296f00be44 --- /dev/null +++ b/src/infra/http-body.ts @@ -0,0 +1,347 @@ +import type { IncomingMessage, ServerResponse } from "node:http"; + +export const DEFAULT_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024; +export const DEFAULT_WEBHOOK_BODY_TIMEOUT_MS = 30_000; + +export type RequestBodyLimitErrorCode = + | "PAYLOAD_TOO_LARGE" + | "REQUEST_BODY_TIMEOUT" + | "CONNECTION_CLOSED"; + +type RequestBodyLimitErrorInit = { + code: RequestBodyLimitErrorCode; + message?: string; +}; + +const DEFAULT_ERROR_MESSAGE: Record = { + PAYLOAD_TOO_LARGE: "PayloadTooLarge", + REQUEST_BODY_TIMEOUT: "RequestBodyTimeout", + CONNECTION_CLOSED: "RequestBodyConnectionClosed", +}; + +const DEFAULT_ERROR_STATUS_CODE: Record = { + PAYLOAD_TOO_LARGE: 413, + REQUEST_BODY_TIMEOUT: 408, + CONNECTION_CLOSED: 400, +}; + +const DEFAULT_RESPONSE_MESSAGE: Record = { + PAYLOAD_TOO_LARGE: "Payload too large", + REQUEST_BODY_TIMEOUT: "Request body timeout", + CONNECTION_CLOSED: "Connection closed", +}; + +export class RequestBodyLimitError extends Error { + readonly code: RequestBodyLimitErrorCode; + readonly statusCode: number; + + constructor(init: RequestBodyLimitErrorInit) { + super(init.message ?? DEFAULT_ERROR_MESSAGE[init.code]); + this.name = "RequestBodyLimitError"; + this.code = init.code; + this.statusCode = DEFAULT_ERROR_STATUS_CODE[init.code]; + } +} + +export function isRequestBodyLimitError( + error: unknown, + code?: RequestBodyLimitErrorCode, +): error is RequestBodyLimitError { + if (!(error instanceof RequestBodyLimitError)) { + return false; + } + if (!code) { + return true; + } + return error.code === code; +} + +export function requestBodyErrorToText(code: RequestBodyLimitErrorCode): string { + return DEFAULT_RESPONSE_MESSAGE[code]; +} + +function parseContentLengthHeader(req: IncomingMessage): number | null { + const header = req.headers["content-length"]; + const raw = Array.isArray(header) ? header[0] : header; + if (typeof raw !== "string") { + return null; + } + const parsed = Number.parseInt(raw, 10); + if (!Number.isFinite(parsed) || parsed < 0) { + return null; + } + return parsed; +} + +export type ReadRequestBodyOptions = { + maxBytes: number; + timeoutMs?: number; + encoding?: BufferEncoding; +}; + +export async function readRequestBodyWithLimit( + req: IncomingMessage, + options: ReadRequestBodyOptions, +): Promise { + const maxBytes = Number.isFinite(options.maxBytes) + ? Math.max(1, Math.floor(options.maxBytes)) + : 1; + const timeoutMs = + typeof options.timeoutMs === "number" && Number.isFinite(options.timeoutMs) + ? Math.max(1, Math.floor(options.timeoutMs)) + : DEFAULT_WEBHOOK_BODY_TIMEOUT_MS; + const encoding = options.encoding ?? "utf-8"; + + const declaredLength = parseContentLengthHeader(req); + if (declaredLength !== null && declaredLength > maxBytes) { + const error = new RequestBodyLimitError({ code: "PAYLOAD_TOO_LARGE" }); + if (!req.destroyed) { + req.destroy(error); + } + throw error; + } + + return await new Promise((resolve, reject) => { + let done = false; + let ended = false; + let totalBytes = 0; + const chunks: Buffer[] = []; + + const cleanup = () => { + req.removeListener("data", onData); + req.removeListener("end", onEnd); + req.removeListener("error", onError); + req.removeListener("close", onClose); + clearTimeout(timer); + }; + + const finish = (cb: () => void) => { + if (done) { + return; + } + done = true; + cleanup(); + cb(); + }; + + const fail = (error: RequestBodyLimitError | Error) => { + finish(() => reject(error)); + }; + + const timer = setTimeout(() => { + const error = new RequestBodyLimitError({ code: "REQUEST_BODY_TIMEOUT" }); + if (!req.destroyed) { + req.destroy(error); + } + fail(error); + }, timeoutMs); + + const onData = (chunk: Buffer | string) => { + if (done) { + return; + } + const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); + totalBytes += buffer.length; + if (totalBytes > maxBytes) { + const error = new RequestBodyLimitError({ code: "PAYLOAD_TOO_LARGE" }); + if (!req.destroyed) { + req.destroy(error); + } + fail(error); + return; + } + chunks.push(buffer); + }; + + const onEnd = () => { + ended = true; + finish(() => resolve(Buffer.concat(chunks).toString(encoding))); + }; + + const onError = (error: Error) => { + if (done) { + return; + } + fail(error); + }; + + const onClose = () => { + if (done || ended) { + return; + } + fail(new RequestBodyLimitError({ code: "CONNECTION_CLOSED" })); + }; + + req.on("data", onData); + req.on("end", onEnd); + req.on("error", onError); + req.on("close", onClose); + }); +} + +export type ReadJsonBodyResult = + | { ok: true; value: unknown } + | { ok: false; error: string; code: RequestBodyLimitErrorCode | "INVALID_JSON" }; + +export type ReadJsonBodyOptions = ReadRequestBodyOptions & { + emptyObjectOnEmpty?: boolean; +}; + +export async function readJsonBodyWithLimit( + req: IncomingMessage, + options: ReadJsonBodyOptions, +): Promise { + try { + const raw = await readRequestBodyWithLimit(req, options); + const trimmed = raw.trim(); + if (!trimmed) { + if (options.emptyObjectOnEmpty === false) { + return { ok: false, code: "INVALID_JSON", error: "empty payload" }; + } + return { ok: true, value: {} }; + } + try { + return { ok: true, value: JSON.parse(trimmed) as unknown }; + } catch (error) { + return { + ok: false, + code: "INVALID_JSON", + error: error instanceof Error ? error.message : String(error), + }; + } + } catch (error) { + if (isRequestBodyLimitError(error)) { + return { ok: false, code: error.code, error: requestBodyErrorToText(error.code) }; + } + return { + ok: false, + code: "INVALID_JSON", + error: error instanceof Error ? error.message : String(error), + }; + } +} + +export type RequestBodyLimitGuard = { + dispose: () => void; + isTripped: () => boolean; + code: () => RequestBodyLimitErrorCode | null; +}; + +export type RequestBodyLimitGuardOptions = { + maxBytes: number; + timeoutMs?: number; + responseFormat?: "json" | "text"; + responseText?: Partial>; +}; + +export function installRequestBodyLimitGuard( + req: IncomingMessage, + res: ServerResponse, + options: RequestBodyLimitGuardOptions, +): RequestBodyLimitGuard { + const maxBytes = Number.isFinite(options.maxBytes) + ? Math.max(1, Math.floor(options.maxBytes)) + : 1; + const timeoutMs = + typeof options.timeoutMs === "number" && Number.isFinite(options.timeoutMs) + ? Math.max(1, Math.floor(options.timeoutMs)) + : DEFAULT_WEBHOOK_BODY_TIMEOUT_MS; + const responseFormat = options.responseFormat ?? "json"; + const customText = options.responseText ?? {}; + + let tripped = false; + let reason: RequestBodyLimitErrorCode | null = null; + let done = false; + let ended = false; + let totalBytes = 0; + + const cleanup = () => { + req.removeListener("data", onData); + req.removeListener("end", onEnd); + req.removeListener("close", onClose); + req.removeListener("error", onError); + clearTimeout(timer); + }; + + const finish = () => { + if (done) { + return; + } + done = true; + cleanup(); + }; + + const respond = (error: RequestBodyLimitError) => { + const text = customText[error.code] ?? requestBodyErrorToText(error.code); + if (!res.headersSent) { + res.statusCode = error.statusCode; + if (responseFormat === "text") { + res.setHeader("Content-Type", "text/plain; charset=utf-8"); + res.end(text); + } else { + res.setHeader("Content-Type", "application/json; charset=utf-8"); + res.end(JSON.stringify({ error: text })); + } + } + }; + + const trip = (error: RequestBodyLimitError) => { + if (tripped) { + return; + } + tripped = true; + reason = error.code; + finish(); + respond(error); + if (!req.destroyed) { + req.destroy(error); + } + }; + + const onData = (chunk: Buffer | string) => { + if (done) { + return; + } + const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); + totalBytes += buffer.length; + if (totalBytes > maxBytes) { + trip(new RequestBodyLimitError({ code: "PAYLOAD_TOO_LARGE" })); + } + }; + + const onEnd = () => { + ended = true; + finish(); + }; + + const onClose = () => { + if (done || ended) { + return; + } + finish(); + }; + + const onError = () => { + finish(); + }; + + const timer = setTimeout(() => { + trip(new RequestBodyLimitError({ code: "REQUEST_BODY_TIMEOUT" })); + }, timeoutMs); + + req.on("data", onData); + req.on("end", onEnd); + req.on("close", onClose); + req.on("error", onError); + + const declaredLength = parseContentLengthHeader(req); + if (declaredLength !== null && declaredLength > maxBytes) { + trip(new RequestBodyLimitError({ code: "PAYLOAD_TOO_LARGE" })); + } + + return { + dispose: finish, + isTripped: () => tripped, + code: () => reason, + }; +} diff --git a/src/line/monitor.read-body.test.ts b/src/line/monitor.read-body.test.ts new file mode 100644 index 00000000000..1c2e53544bb --- /dev/null +++ b/src/line/monitor.read-body.test.ts @@ -0,0 +1,38 @@ +import type { IncomingMessage } from "node:http"; +import { EventEmitter } from "node:events"; +import { describe, expect, it } from "vitest"; +import { readLineWebhookRequestBody } from "./monitor.js"; + +function createMockRequest(chunks: string[]): IncomingMessage { + const req = new EventEmitter() as IncomingMessage & { destroyed?: boolean; destroy: () => void }; + req.destroyed = false; + req.headers = {}; + req.destroy = () => { + req.destroyed = true; + }; + + void Promise.resolve().then(() => { + for (const chunk of chunks) { + req.emit("data", Buffer.from(chunk, "utf-8")); + if (req.destroyed) { + return; + } + } + req.emit("end"); + }); + + return req; +} + +describe("readLineWebhookRequestBody", () => { + it("reads body within limit", async () => { + const req = createMockRequest(['{"events":[{"type":"message"}]}']); + const body = await readLineWebhookRequestBody(req, 1024); + expect(body).toContain('"events"'); + }); + + it("rejects oversized body", async () => { + const req = createMockRequest(["x".repeat(2048)]); + await expect(readLineWebhookRequestBody(req, 128)).rejects.toThrow("PayloadTooLarge"); + }); +}); diff --git a/src/line/monitor.ts b/src/line/monitor.ts index 170225c7498..821cb7b37ec 100644 --- a/src/line/monitor.ts +++ b/src/line/monitor.ts @@ -7,6 +7,11 @@ import { chunkMarkdownText } from "../auto-reply/chunk.js"; import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; import { createReplyPrefixOptions } from "../channels/reply-prefix.js"; import { danger, logVerbose } from "../globals.js"; +import { + isRequestBodyLimitError, + readRequestBodyWithLimit, + requestBodyErrorToText, +} from "../infra/http-body.js"; import { normalizePluginHttpPath } from "../plugins/http-path.js"; import { registerPluginHttpRoute } from "../plugins/http-registry.js"; import { deliverLineAutoReply } from "./auto-reply-delivery.js"; @@ -46,6 +51,9 @@ export interface LineProviderMonitor { stop: () => void; } +const LINE_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024; +const LINE_WEBHOOK_BODY_TIMEOUT_MS = 30_000; + // Track runtime state in memory (simplified version) const runtimeState = new Map< string, @@ -85,12 +93,13 @@ export function getLineRuntimeState(accountId: string) { return runtimeState.get(`line:${accountId}`); } -async function readRequestBody(req: IncomingMessage): Promise { - return new Promise((resolve, reject) => { - const chunks: Buffer[] = []; - req.on("data", (chunk) => chunks.push(chunk)); - req.on("end", () => resolve(Buffer.concat(chunks).toString("utf-8"))); - req.on("error", reject); +export async function readLineWebhookRequestBody( + req: IncomingMessage, + maxBytes = LINE_WEBHOOK_MAX_BODY_BYTES, +): Promise { + return await readRequestBodyWithLimit(req, { + maxBytes, + timeoutMs: LINE_WEBHOOK_BODY_TIMEOUT_MS, }); } @@ -310,7 +319,7 @@ export async function monitorLineProvider( } try { - const rawBody = await readRequestBody(req); + const rawBody = await readLineWebhookRequestBody(req, LINE_WEBHOOK_MAX_BODY_BYTES); const signature = req.headers["x-line-signature"]; // Validate signature @@ -346,6 +355,18 @@ export async function monitorLineProvider( }); } } catch (err) { + if (isRequestBodyLimitError(err, "PAYLOAD_TOO_LARGE")) { + res.statusCode = 413; + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ error: "Payload too large" })); + return; + } + if (isRequestBodyLimitError(err, "REQUEST_BODY_TIMEOUT")) { + res.statusCode = 408; + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ error: requestBodyErrorToText("REQUEST_BODY_TIMEOUT") })); + return; + } runtime.error?.(danger(`line webhook error: ${String(err)}`)); if (!res.headersSent) { res.statusCode = 500; diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index 5355d933e5c..23d232d62d6 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -136,6 +136,16 @@ export { rejectDevicePairing, } from "../infra/device-pairing.js"; export { formatErrorMessage } from "../infra/errors.js"; +export { + DEFAULT_WEBHOOK_BODY_TIMEOUT_MS, + DEFAULT_WEBHOOK_MAX_BODY_BYTES, + RequestBodyLimitError, + installRequestBodyLimitGuard, + isRequestBodyLimitError, + readJsonBodyWithLimit, + readRequestBodyWithLimit, + requestBodyErrorToText, +} from "../infra/http-body.js"; export { isWSLSync, isWSL2Sync, isWSLEnv } from "../infra/wsl.js"; export { isTruthyEnvValue } from "../infra/env.js"; export { resolveToolsBySender } from "../config/group-policy.js"; diff --git a/src/slack/monitor/provider.ts b/src/slack/monitor/provider.ts index 4db17c533d3..6c544655cca 100644 --- a/src/slack/monitor/provider.ts +++ b/src/slack/monitor/provider.ts @@ -8,6 +8,7 @@ import { DEFAULT_GROUP_HISTORY_LIMIT } from "../../auto-reply/reply/history.js"; import { mergeAllowlist, summarizeMapping } from "../../channels/allowlists/resolve-utils.js"; import { loadConfig } from "../../config/config.js"; import { warn } from "../../globals.js"; +import { installRequestBodyLimitGuard } from "../../infra/http-body.js"; import { normalizeMainKey } from "../../routing/session-key.js"; import { resolveSlackAccount } from "../accounts.js"; import { resolveSlackWebClientOptions } from "../client.js"; @@ -30,6 +31,10 @@ const slackBoltModule = SlackBolt as typeof import("@slack/bolt") & { const slackBolt = (slackBoltModule.App ? slackBoltModule : slackBoltModule.default) ?? slackBoltModule; const { App, HTTPReceiver } = slackBolt; + +const SLACK_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024; +const SLACK_WEBHOOK_BODY_TIMEOUT_MS = 30_000; + function parseApiAppIdFromAppToken(raw?: string) { const token = raw?.trim(); if (!token) { @@ -146,7 +151,23 @@ export async function monitorSlackProvider(opts: MonitorSlackOpts = {}) { const slackHttpHandler = slackMode === "http" && receiver ? async (req: IncomingMessage, res: ServerResponse) => { - await Promise.resolve(receiver.requestListener(req, res)); + const guard = installRequestBodyLimitGuard(req, res, { + maxBytes: SLACK_WEBHOOK_MAX_BODY_BYTES, + timeoutMs: SLACK_WEBHOOK_BODY_TIMEOUT_MS, + responseFormat: "text", + }); + if (guard.isTripped()) { + return; + } + try { + await Promise.resolve(receiver.requestListener(req, res)); + } catch (err) { + if (!guard.isTripped()) { + throw err; + } + } finally { + guard.dispose(); + } } : null; let unregisterHttpHandler: (() => void) | null = null; diff --git a/src/telegram/webhook.ts b/src/telegram/webhook.ts index 83c6f9afc7c..85b5806935a 100644 --- a/src/telegram/webhook.ts +++ b/src/telegram/webhook.ts @@ -4,6 +4,7 @@ import type { OpenClawConfig } from "../config/config.js"; import type { RuntimeEnv } from "../runtime.js"; import { isDiagnosticsEnabled } from "../infra/diagnostic-events.js"; import { formatErrorMessage } from "../infra/errors.js"; +import { installRequestBodyLimitGuard } from "../infra/http-body.js"; import { logWebhookError, logWebhookProcessed, @@ -16,6 +17,9 @@ import { resolveTelegramAllowedUpdates } from "./allowed-updates.js"; import { withTelegramApiErrorLogging } from "./api-logging.js"; import { createTelegramBot } from "./bot.js"; +const TELEGRAM_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024; +const TELEGRAM_WEBHOOK_BODY_TIMEOUT_MS = 30_000; + export async function startTelegramWebhook(opts: { token: string; accountId?: string; @@ -66,6 +70,14 @@ export async function startTelegramWebhook(opts: { if (diagnosticsEnabled) { logWebhookReceived({ channel: "telegram", updateType: "telegram-post" }); } + const guard = installRequestBodyLimitGuard(req, res, { + maxBytes: TELEGRAM_WEBHOOK_MAX_BODY_BYTES, + timeoutMs: TELEGRAM_WEBHOOK_BODY_TIMEOUT_MS, + responseFormat: "text", + }); + if (guard.isTripped()) { + return; + } const handled = handler(req, res); if (handled && typeof handled.catch === "function") { void handled @@ -79,6 +91,9 @@ export async function startTelegramWebhook(opts: { } }) .catch((err) => { + if (guard.isTripped()) { + return; + } const errMsg = formatErrorMessage(err); if (diagnosticsEnabled) { logWebhookError({ @@ -92,8 +107,13 @@ export async function startTelegramWebhook(opts: { res.writeHead(500); } res.end(); + }) + .finally(() => { + guard.dispose(); }); + return; } + guard.dispose(); }); const publicUrl =