mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-27 00:17:29 +00:00
fix(security): enforce bounded webhook body handling
This commit is contained in:
@@ -2,11 +2,14 @@ import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk";
|
||||
import {
|
||||
createReplyPrefixOptions,
|
||||
isRequestBodyLimitError,
|
||||
logAckFailure,
|
||||
logInboundDrop,
|
||||
logTypingFailure,
|
||||
readRequestBodyWithLimit,
|
||||
resolveAckReaction,
|
||||
resolveControlCommandGate,
|
||||
requestBodyErrorToText,
|
||||
} from "openclaw/plugin-sdk";
|
||||
import type { ResolvedBlueBubblesAccount } from "./accounts.js";
|
||||
import type { BlueBubblesAccountConfig, BlueBubblesAttachment } from "./types.js";
|
||||
@@ -511,63 +514,40 @@ export function registerBlueBubblesWebhookTarget(target: WebhookTarget): () => v
|
||||
}
|
||||
|
||||
async function readJsonBody(req: IncomingMessage, maxBytes: number, timeoutMs = 30_000) {
|
||||
const chunks: Buffer[] = [];
|
||||
let total = 0;
|
||||
return await new Promise<{ ok: boolean; value?: unknown; error?: string }>((resolve) => {
|
||||
let done = false;
|
||||
const finish = (result: { ok: boolean; value?: unknown; error?: string }) => {
|
||||
if (done) {
|
||||
return;
|
||||
}
|
||||
done = true;
|
||||
clearTimeout(timer);
|
||||
resolve(result);
|
||||
};
|
||||
let rawBody = "";
|
||||
try {
|
||||
rawBody = await readRequestBodyWithLimit(req, { maxBytes, timeoutMs });
|
||||
} catch (error) {
|
||||
if (isRequestBodyLimitError(error, "PAYLOAD_TOO_LARGE")) {
|
||||
return { ok: false, error: "payload too large" };
|
||||
}
|
||||
if (isRequestBodyLimitError(error, "REQUEST_BODY_TIMEOUT")) {
|
||||
return { ok: false, error: requestBodyErrorToText("REQUEST_BODY_TIMEOUT") };
|
||||
}
|
||||
if (isRequestBodyLimitError(error, "CONNECTION_CLOSED")) {
|
||||
return { ok: false, error: requestBodyErrorToText("CONNECTION_CLOSED") };
|
||||
}
|
||||
return { ok: false, error: error instanceof Error ? error.message : String(error) };
|
||||
}
|
||||
|
||||
const timer = setTimeout(() => {
|
||||
finish({ ok: false, error: "request body timeout" });
|
||||
req.destroy();
|
||||
}, timeoutMs);
|
||||
|
||||
req.on("data", (chunk: Buffer) => {
|
||||
total += chunk.length;
|
||||
if (total > maxBytes) {
|
||||
finish({ ok: false, error: "payload too large" });
|
||||
req.destroy();
|
||||
return;
|
||||
try {
|
||||
const raw = rawBody.toString();
|
||||
if (!raw.trim()) {
|
||||
return { ok: false, error: "empty payload" };
|
||||
}
|
||||
try {
|
||||
return { ok: true, value: JSON.parse(raw) as unknown };
|
||||
} catch {
|
||||
const params = new URLSearchParams(raw);
|
||||
const payload = params.get("payload") ?? params.get("data") ?? params.get("message");
|
||||
if (payload) {
|
||||
return { ok: true, value: JSON.parse(payload) as unknown };
|
||||
}
|
||||
chunks.push(chunk);
|
||||
});
|
||||
req.on("end", () => {
|
||||
try {
|
||||
const raw = Buffer.concat(chunks).toString("utf8");
|
||||
if (!raw.trim()) {
|
||||
finish({ ok: false, error: "empty payload" });
|
||||
return;
|
||||
}
|
||||
try {
|
||||
finish({ ok: true, value: JSON.parse(raw) as unknown });
|
||||
return;
|
||||
} catch {
|
||||
const params = new URLSearchParams(raw);
|
||||
const payload = params.get("payload") ?? params.get("data") ?? params.get("message");
|
||||
if (payload) {
|
||||
finish({ ok: true, value: JSON.parse(payload) as unknown });
|
||||
return;
|
||||
}
|
||||
throw new Error("invalid json");
|
||||
}
|
||||
} catch (err) {
|
||||
finish({ ok: false, error: err instanceof Error ? err.message : String(err) });
|
||||
}
|
||||
});
|
||||
req.on("error", (err) => {
|
||||
finish({ ok: false, error: err instanceof Error ? err.message : String(err) });
|
||||
});
|
||||
req.on("close", () => {
|
||||
finish({ ok: false, error: "connection closed" });
|
||||
});
|
||||
});
|
||||
throw new Error("invalid json");
|
||||
}
|
||||
} catch (error) {
|
||||
return { ok: false, error: error instanceof Error ? error.message : String(error) };
|
||||
}
|
||||
}
|
||||
|
||||
function asRecord(value: unknown): Record<string, unknown> | null {
|
||||
@@ -1461,7 +1441,12 @@ export async function handleBlueBubblesWebhookRequest(
|
||||
|
||||
const body = await readJsonBody(req, 1024 * 1024);
|
||||
if (!body.ok) {
|
||||
res.statusCode = body.error === "payload too large" ? 413 : 400;
|
||||
res.statusCode =
|
||||
body.error === "payload too large"
|
||||
? 413
|
||||
: body.error === requestBodyErrorToText("REQUEST_BODY_TIMEOUT")
|
||||
? 408
|
||||
: 400;
|
||||
res.end(body.error ?? "invalid payload");
|
||||
console.warn(`[bluebubbles] webhook rejected: ${body.error ?? "invalid payload"}`);
|
||||
return true;
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
import type { ClawdbotConfig, RuntimeEnv, HistoryEntry } from "openclaw/plugin-sdk";
|
||||
import * as Lark from "@larksuiteoapi/node-sdk";
|
||||
import * as http from "http";
|
||||
import {
|
||||
type ClawdbotConfig,
|
||||
type RuntimeEnv,
|
||||
type HistoryEntry,
|
||||
installRequestBodyLimitGuard,
|
||||
} from "openclaw/plugin-sdk";
|
||||
import type { ResolvedFeishuAccount } from "./types.js";
|
||||
import { resolveFeishuAccount, listEnabledFeishuAccounts } from "./accounts.js";
|
||||
import { handleFeishuMessage, type FeishuMessageEvent, type FeishuBotAddedEvent } from "./bot.js";
|
||||
@@ -18,6 +23,8 @@ export type MonitorFeishuOpts = {
|
||||
const wsClients = new Map<string, Lark.WSClient>();
|
||||
const httpServers = new Map<string, http.Server>();
|
||||
const botOpenIds = new Map<string, string>();
|
||||
const FEISHU_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024;
|
||||
const FEISHU_WEBHOOK_BODY_TIMEOUT_MS = 30_000;
|
||||
|
||||
async function fetchBotOpenId(account: ResolvedFeishuAccount): Promise<string | undefined> {
|
||||
try {
|
||||
@@ -197,7 +204,26 @@ async function monitorWebhook({
|
||||
log(`feishu[${accountId}]: starting Webhook server on port ${port}, path ${path}...`);
|
||||
|
||||
const server = http.createServer();
|
||||
server.on("request", Lark.adaptDefault(path, eventDispatcher, { autoChallenge: true }));
|
||||
const webhookHandler = Lark.adaptDefault(path, eventDispatcher, { autoChallenge: true });
|
||||
server.on("request", (req, res) => {
|
||||
const guard = installRequestBodyLimitGuard(req, res, {
|
||||
maxBytes: FEISHU_WEBHOOK_MAX_BODY_BYTES,
|
||||
timeoutMs: FEISHU_WEBHOOK_BODY_TIMEOUT_MS,
|
||||
responseFormat: "text",
|
||||
});
|
||||
if (guard.isTripped()) {
|
||||
return;
|
||||
}
|
||||
void Promise.resolve(webhookHandler(req, res))
|
||||
.catch((err) => {
|
||||
if (!guard.isTripped()) {
|
||||
error(`feishu[${accountId}]: webhook handler error: ${String(err)}`);
|
||||
}
|
||||
})
|
||||
.finally(() => {
|
||||
guard.dispose();
|
||||
});
|
||||
});
|
||||
httpServers.set(accountId, server);
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk";
|
||||
import { createReplyPrefixOptions, resolveMentionGatingWithBypass } from "openclaw/plugin-sdk";
|
||||
import {
|
||||
createReplyPrefixOptions,
|
||||
readJsonBodyWithLimit,
|
||||
requestBodyErrorToText,
|
||||
resolveMentionGatingWithBypass,
|
||||
} from "openclaw/plugin-sdk";
|
||||
import type {
|
||||
GoogleChatAnnotation,
|
||||
GoogleChatAttachment,
|
||||
@@ -84,46 +89,6 @@ function resolveWebhookPath(webhookPath?: string, webhookUrl?: string): string |
|
||||
return "/googlechat";
|
||||
}
|
||||
|
||||
async function readJsonBody(req: IncomingMessage, maxBytes: number) {
|
||||
const chunks: Buffer[] = [];
|
||||
let total = 0;
|
||||
return await new Promise<{ ok: boolean; value?: unknown; error?: string }>((resolve) => {
|
||||
let resolved = false;
|
||||
const doResolve = (value: { ok: boolean; value?: unknown; error?: string }) => {
|
||||
if (resolved) {
|
||||
return;
|
||||
}
|
||||
resolved = true;
|
||||
req.removeAllListeners();
|
||||
resolve(value);
|
||||
};
|
||||
req.on("data", (chunk: Buffer) => {
|
||||
total += chunk.length;
|
||||
if (total > maxBytes) {
|
||||
doResolve({ ok: false, error: "payload too large" });
|
||||
req.destroy();
|
||||
return;
|
||||
}
|
||||
chunks.push(chunk);
|
||||
});
|
||||
req.on("end", () => {
|
||||
try {
|
||||
const raw = Buffer.concat(chunks).toString("utf8");
|
||||
if (!raw.trim()) {
|
||||
doResolve({ ok: false, error: "empty payload" });
|
||||
return;
|
||||
}
|
||||
doResolve({ ok: true, value: JSON.parse(raw) as unknown });
|
||||
} catch (err) {
|
||||
doResolve({ ok: false, error: err instanceof Error ? err.message : String(err) });
|
||||
}
|
||||
});
|
||||
req.on("error", (err) => {
|
||||
doResolve({ ok: false, error: err instanceof Error ? err.message : String(err) });
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export function registerGoogleChatWebhookTarget(target: WebhookTarget): () => void {
|
||||
const key = normalizeWebhookPath(target.path);
|
||||
const normalizedTarget = { ...target, path: key };
|
||||
@@ -178,10 +143,19 @@ export async function handleGoogleChatWebhookRequest(
|
||||
? authHeader.slice("bearer ".length)
|
||||
: "";
|
||||
|
||||
const body = await readJsonBody(req, 1024 * 1024);
|
||||
const body = await readJsonBodyWithLimit(req, {
|
||||
maxBytes: 1024 * 1024,
|
||||
timeoutMs: 30_000,
|
||||
emptyObjectOnEmpty: false,
|
||||
});
|
||||
if (!body.ok) {
|
||||
res.statusCode = body.error === "payload too large" ? 413 : 400;
|
||||
res.end(body.error ?? "invalid payload");
|
||||
res.statusCode =
|
||||
body.code === "PAYLOAD_TOO_LARGE" ? 413 : body.code === "REQUEST_BODY_TIMEOUT" ? 408 : 400;
|
||||
res.end(
|
||||
body.code === "REQUEST_BODY_TIMEOUT"
|
||||
? requestBodyErrorToText("REQUEST_BODY_TIMEOUT")
|
||||
: body.error,
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { Request, Response } from "express";
|
||||
import {
|
||||
DEFAULT_WEBHOOK_MAX_BODY_BYTES,
|
||||
mergeAllowlist,
|
||||
summarizeMapping,
|
||||
type OpenClawConfig,
|
||||
@@ -32,6 +33,8 @@ export type MonitorMSTeamsResult = {
|
||||
shutdown: () => Promise<void>;
|
||||
};
|
||||
|
||||
const MSTEAMS_WEBHOOK_MAX_BODY_BYTES = DEFAULT_WEBHOOK_MAX_BODY_BYTES;
|
||||
|
||||
export async function monitorMSTeamsProvider(
|
||||
opts: MonitorMSTeamsOpts,
|
||||
): Promise<MonitorMSTeamsResult> {
|
||||
@@ -239,7 +242,14 @@ export async function monitorMSTeamsProvider(
|
||||
|
||||
// Create Express server
|
||||
const expressApp = express.default();
|
||||
expressApp.use(express.json());
|
||||
expressApp.use(express.json({ limit: MSTEAMS_WEBHOOK_MAX_BODY_BYTES }));
|
||||
expressApp.use((err: unknown, _req: Request, res: Response, next: (err?: unknown) => void) => {
|
||||
if (err && typeof err === "object" && "status" in err && err.status === 413) {
|
||||
res.status(413).json({ error: "Payload too large" });
|
||||
return;
|
||||
}
|
||||
next(err);
|
||||
});
|
||||
expressApp.use(authorizeJWT(authConfig));
|
||||
|
||||
// Set up the messages endpoint - use configured path and /api/messages as fallback
|
||||
|
||||
38
extensions/nextcloud-talk/src/monitor.read-body.test.ts
Normal file
38
extensions/nextcloud-talk/src/monitor.read-body.test.ts
Normal file
@@ -0,0 +1,38 @@
|
||||
import type { IncomingMessage } from "node:http";
|
||||
import { EventEmitter } from "node:events";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { readNextcloudTalkWebhookBody } from "./monitor.js";
|
||||
|
||||
function createMockRequest(chunks: string[]): IncomingMessage {
|
||||
const req = new EventEmitter() as IncomingMessage & { destroyed?: boolean; destroy: () => void };
|
||||
req.destroyed = false;
|
||||
req.headers = {};
|
||||
req.destroy = () => {
|
||||
req.destroyed = true;
|
||||
};
|
||||
|
||||
void Promise.resolve().then(() => {
|
||||
for (const chunk of chunks) {
|
||||
req.emit("data", Buffer.from(chunk, "utf-8"));
|
||||
if (req.destroyed) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
req.emit("end");
|
||||
});
|
||||
|
||||
return req;
|
||||
}
|
||||
|
||||
describe("readNextcloudTalkWebhookBody", () => {
|
||||
it("reads valid body within max bytes", async () => {
|
||||
const req = createMockRequest(['{"type":"Create"}']);
|
||||
const body = await readNextcloudTalkWebhookBody(req, 1024);
|
||||
expect(body).toBe('{"type":"Create"}');
|
||||
});
|
||||
|
||||
it("rejects when payload exceeds max bytes", async () => {
|
||||
const req = createMockRequest(["x".repeat(300)]);
|
||||
await expect(readNextcloudTalkWebhookBody(req, 128)).rejects.toThrow("PayloadTooLarge");
|
||||
});
|
||||
});
|
||||
@@ -1,5 +1,10 @@
|
||||
import type { RuntimeEnv } from "openclaw/plugin-sdk";
|
||||
import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http";
|
||||
import {
|
||||
type RuntimeEnv,
|
||||
isRequestBodyLimitError,
|
||||
readRequestBodyWithLimit,
|
||||
requestBodyErrorToText,
|
||||
} from "openclaw/plugin-sdk";
|
||||
import type {
|
||||
CoreConfig,
|
||||
NextcloudTalkInboundMessage,
|
||||
@@ -14,6 +19,8 @@ import { extractNextcloudTalkHeaders, verifyNextcloudTalkSignature } from "./sig
|
||||
const DEFAULT_WEBHOOK_PORT = 8788;
|
||||
const DEFAULT_WEBHOOK_HOST = "0.0.0.0";
|
||||
const DEFAULT_WEBHOOK_PATH = "/nextcloud-talk-webhook";
|
||||
const DEFAULT_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024;
|
||||
const DEFAULT_WEBHOOK_BODY_TIMEOUT_MS = 30_000;
|
||||
const HEALTH_PATH = "/healthz";
|
||||
|
||||
function formatError(err: unknown): string {
|
||||
@@ -62,12 +69,13 @@ function payloadToInboundMessage(
|
||||
};
|
||||
}
|
||||
|
||||
function readBody(req: IncomingMessage): Promise<string> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const chunks: Buffer[] = [];
|
||||
req.on("data", (chunk: Buffer) => chunks.push(chunk));
|
||||
req.on("end", () => resolve(Buffer.concat(chunks).toString("utf-8")));
|
||||
req.on("error", reject);
|
||||
export function readNextcloudTalkWebhookBody(
|
||||
req: IncomingMessage,
|
||||
maxBodyBytes: number,
|
||||
): Promise<string> {
|
||||
return readRequestBodyWithLimit(req, {
|
||||
maxBytes: maxBodyBytes,
|
||||
timeoutMs: DEFAULT_WEBHOOK_BODY_TIMEOUT_MS,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -77,6 +85,12 @@ export function createNextcloudTalkWebhookServer(opts: NextcloudTalkWebhookServe
|
||||
stop: () => void;
|
||||
} {
|
||||
const { port, host, path, secret, onMessage, onError, abortSignal } = opts;
|
||||
const maxBodyBytes =
|
||||
typeof opts.maxBodyBytes === "number" &&
|
||||
Number.isFinite(opts.maxBodyBytes) &&
|
||||
opts.maxBodyBytes > 0
|
||||
? Math.floor(opts.maxBodyBytes)
|
||||
: DEFAULT_WEBHOOK_MAX_BODY_BYTES;
|
||||
|
||||
const server = createServer(async (req: IncomingMessage, res: ServerResponse) => {
|
||||
if (req.url === HEALTH_PATH) {
|
||||
@@ -92,7 +106,7 @@ export function createNextcloudTalkWebhookServer(opts: NextcloudTalkWebhookServe
|
||||
}
|
||||
|
||||
try {
|
||||
const body = await readBody(req);
|
||||
const body = await readNextcloudTalkWebhookBody(req, maxBodyBytes);
|
||||
|
||||
const headers = extractNextcloudTalkHeaders(
|
||||
req.headers as Record<string, string | string[] | undefined>,
|
||||
@@ -140,6 +154,20 @@ export function createNextcloudTalkWebhookServer(opts: NextcloudTalkWebhookServe
|
||||
onError?.(err instanceof Error ? err : new Error(formatError(err)));
|
||||
}
|
||||
} catch (err) {
|
||||
if (isRequestBodyLimitError(err, "PAYLOAD_TOO_LARGE")) {
|
||||
if (!res.headersSent) {
|
||||
res.writeHead(413, { "Content-Type": "application/json" });
|
||||
res.end(JSON.stringify({ error: "Payload too large" }));
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (isRequestBodyLimitError(err, "REQUEST_BODY_TIMEOUT")) {
|
||||
if (!res.headersSent) {
|
||||
res.writeHead(408, { "Content-Type": "application/json" });
|
||||
res.end(JSON.stringify({ error: requestBodyErrorToText("REQUEST_BODY_TIMEOUT") }));
|
||||
}
|
||||
return;
|
||||
}
|
||||
const error = err instanceof Error ? err : new Error(formatError(err));
|
||||
onError?.(error);
|
||||
if (!res.headersSent) {
|
||||
|
||||
@@ -168,6 +168,7 @@ export type NextcloudTalkWebhookServerOptions = {
|
||||
host: string;
|
||||
path: string;
|
||||
secret: string;
|
||||
maxBodyBytes?: number;
|
||||
onMessage: (message: NextcloudTalkInboundMessage) => void | Promise<void>;
|
||||
onError?: (error: Error) => void;
|
||||
abortSignal?: AbortSignal;
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
*/
|
||||
|
||||
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
import { readJsonBodyWithLimit, requestBodyErrorToText } from "openclaw/plugin-sdk";
|
||||
import { z } from "zod";
|
||||
import { publishNostrProfile, getNostrProfileState } from "./channel.js";
|
||||
import { NostrProfileSchema, type NostrProfile } from "./config-schema.js";
|
||||
@@ -234,54 +235,24 @@ async function readJsonBody(
|
||||
maxBytes = 64 * 1024,
|
||||
timeoutMs = 30_000,
|
||||
): Promise<unknown> {
|
||||
return new Promise((resolve, reject) => {
|
||||
let done = false;
|
||||
const finish = (fn: () => void) => {
|
||||
if (done) {
|
||||
return;
|
||||
}
|
||||
done = true;
|
||||
clearTimeout(timer);
|
||||
fn();
|
||||
};
|
||||
|
||||
const timer = setTimeout(() => {
|
||||
finish(() => {
|
||||
const err = new Error("Request body timeout");
|
||||
req.destroy(err);
|
||||
reject(err);
|
||||
});
|
||||
}, timeoutMs);
|
||||
|
||||
const chunks: Buffer[] = [];
|
||||
let totalBytes = 0;
|
||||
|
||||
req.on("data", (chunk: Buffer) => {
|
||||
totalBytes += chunk.length;
|
||||
if (totalBytes > maxBytes) {
|
||||
finish(() => {
|
||||
reject(new Error("Request body too large"));
|
||||
req.destroy();
|
||||
});
|
||||
return;
|
||||
}
|
||||
chunks.push(chunk);
|
||||
});
|
||||
|
||||
req.on("end", () => {
|
||||
finish(() => {
|
||||
try {
|
||||
const body = Buffer.concat(chunks).toString("utf-8");
|
||||
resolve(body ? JSON.parse(body) : {});
|
||||
} catch {
|
||||
reject(new Error("Invalid JSON"));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
req.on("error", (err) => finish(() => reject(err)));
|
||||
req.on("close", () => finish(() => reject(new Error("Connection closed"))));
|
||||
const result = await readJsonBodyWithLimit(req, {
|
||||
maxBytes,
|
||||
timeoutMs,
|
||||
emptyObjectOnEmpty: true,
|
||||
});
|
||||
if (result.ok) {
|
||||
return result.value;
|
||||
}
|
||||
if (result.code === "PAYLOAD_TOO_LARGE") {
|
||||
throw new Error("Request body too large");
|
||||
}
|
||||
if (result.code === "REQUEST_BODY_TIMEOUT") {
|
||||
throw new Error(requestBodyErrorToText("REQUEST_BODY_TIMEOUT"));
|
||||
}
|
||||
if (result.code === "CONNECTION_CLOSED") {
|
||||
throw new Error(requestBodyErrorToText("CONNECTION_CLOSED"));
|
||||
}
|
||||
throw new Error(result.code === "INVALID_JSON" ? "Invalid JSON" : result.error);
|
||||
}
|
||||
|
||||
function parseAccountIdFromPath(pathname: string): string | null {
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
import { spawn } from "node:child_process";
|
||||
import http from "node:http";
|
||||
import { URL } from "node:url";
|
||||
import {
|
||||
isRequestBodyLimitError,
|
||||
readRequestBodyWithLimit,
|
||||
requestBodyErrorToText,
|
||||
} from "openclaw/plugin-sdk";
|
||||
import type { VoiceCallConfig } from "./config.js";
|
||||
import type { CoreConfig } from "./core-bridge.js";
|
||||
import type { CallManager } from "./manager.js";
|
||||
@@ -244,11 +249,16 @@ export class VoiceCallWebhookServer {
|
||||
try {
|
||||
body = await this.readBody(req, MAX_WEBHOOK_BODY_BYTES);
|
||||
} catch (err) {
|
||||
if (err instanceof Error && err.message === "PayloadTooLarge") {
|
||||
if (isRequestBodyLimitError(err, "PAYLOAD_TOO_LARGE")) {
|
||||
res.statusCode = 413;
|
||||
res.end("Payload Too Large");
|
||||
return;
|
||||
}
|
||||
if (isRequestBodyLimitError(err, "REQUEST_BODY_TIMEOUT")) {
|
||||
res.statusCode = 408;
|
||||
res.end(requestBodyErrorToText("REQUEST_BODY_TIMEOUT"));
|
||||
return;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
|
||||
@@ -303,42 +313,7 @@ export class VoiceCallWebhookServer {
|
||||
maxBytes: number,
|
||||
timeoutMs = 30_000,
|
||||
): Promise<string> {
|
||||
return new Promise((resolve, reject) => {
|
||||
let done = false;
|
||||
const finish = (fn: () => void) => {
|
||||
if (done) {
|
||||
return;
|
||||
}
|
||||
done = true;
|
||||
clearTimeout(timer);
|
||||
fn();
|
||||
};
|
||||
|
||||
const timer = setTimeout(() => {
|
||||
finish(() => {
|
||||
const err = new Error("Request body timeout");
|
||||
req.destroy(err);
|
||||
reject(err);
|
||||
});
|
||||
}, timeoutMs);
|
||||
|
||||
const chunks: Buffer[] = [];
|
||||
let totalBytes = 0;
|
||||
req.on("data", (chunk: Buffer) => {
|
||||
totalBytes += chunk.length;
|
||||
if (totalBytes > maxBytes) {
|
||||
finish(() => {
|
||||
req.destroy();
|
||||
reject(new Error("PayloadTooLarge"));
|
||||
});
|
||||
return;
|
||||
}
|
||||
chunks.push(chunk);
|
||||
});
|
||||
req.on("end", () => finish(() => resolve(Buffer.concat(chunks).toString("utf-8"))));
|
||||
req.on("error", (err) => finish(() => reject(err)));
|
||||
req.on("close", () => finish(() => reject(new Error("Connection closed"))));
|
||||
});
|
||||
return readRequestBodyWithLimit(req, { maxBytes, timeoutMs });
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
import type { IncomingMessage, ServerResponse } from "node:http";
|
||||
import type { OpenClawConfig, MarkdownTableMode } from "openclaw/plugin-sdk";
|
||||
import { createReplyPrefixOptions } from "openclaw/plugin-sdk";
|
||||
import {
|
||||
createReplyPrefixOptions,
|
||||
readJsonBodyWithLimit,
|
||||
requestBodyErrorToText,
|
||||
} from "openclaw/plugin-sdk";
|
||||
import type { ResolvedZaloAccount } from "./accounts.js";
|
||||
import {
|
||||
ZaloApiError,
|
||||
@@ -61,37 +65,6 @@ function isSenderAllowed(senderId: string, allowFrom: string[]): boolean {
|
||||
});
|
||||
}
|
||||
|
||||
async function readJsonBody(req: IncomingMessage, maxBytes: number) {
|
||||
const chunks: Buffer[] = [];
|
||||
let total = 0;
|
||||
return await new Promise<{ ok: boolean; value?: unknown; error?: string }>((resolve) => {
|
||||
req.on("data", (chunk: Buffer) => {
|
||||
total += chunk.length;
|
||||
if (total > maxBytes) {
|
||||
resolve({ ok: false, error: "payload too large" });
|
||||
req.destroy();
|
||||
return;
|
||||
}
|
||||
chunks.push(chunk);
|
||||
});
|
||||
req.on("end", () => {
|
||||
try {
|
||||
const raw = Buffer.concat(chunks).toString("utf8");
|
||||
if (!raw.trim()) {
|
||||
resolve({ ok: false, error: "empty payload" });
|
||||
return;
|
||||
}
|
||||
resolve({ ok: true, value: JSON.parse(raw) as unknown });
|
||||
} catch (err) {
|
||||
resolve({ ok: false, error: err instanceof Error ? err.message : String(err) });
|
||||
}
|
||||
});
|
||||
req.on("error", (err) => {
|
||||
resolve({ ok: false, error: err instanceof Error ? err.message : String(err) });
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
type WebhookTarget = {
|
||||
token: string;
|
||||
account: ResolvedZaloAccount;
|
||||
@@ -177,10 +150,19 @@ export async function handleZaloWebhookRequest(
|
||||
return true;
|
||||
}
|
||||
|
||||
const body = await readJsonBody(req, 1024 * 1024);
|
||||
const body = await readJsonBodyWithLimit(req, {
|
||||
maxBytes: 1024 * 1024,
|
||||
timeoutMs: 30_000,
|
||||
emptyObjectOnEmpty: false,
|
||||
});
|
||||
if (!body.ok) {
|
||||
res.statusCode = body.error === "payload too large" ? 413 : 400;
|
||||
res.end(body.error ?? "invalid payload");
|
||||
res.statusCode =
|
||||
body.code === "PAYLOAD_TOO_LARGE" ? 413 : body.code === "REQUEST_BODY_TIMEOUT" ? 408 : 400;
|
||||
res.end(
|
||||
body.code === "REQUEST_BODY_TIMEOUT"
|
||||
? requestBodyErrorToText("REQUEST_BODY_TIMEOUT")
|
||||
: body.error,
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user