diff --git a/extensions/bluebubbles/src/config-schema.test.ts b/extensions/bluebubbles/src/config-schema.test.ts new file mode 100644 index 00000000000..be32c8f96b0 --- /dev/null +++ b/extensions/bluebubbles/src/config-schema.test.ts @@ -0,0 +1,55 @@ +import { describe, expect, it } from "vitest"; +import { BlueBubblesConfigSchema } from "./config-schema.js"; + +describe("BlueBubblesConfigSchema", () => { + it("accepts account config when serverUrl and password are both set", () => { + const parsed = BlueBubblesConfigSchema.safeParse({ + serverUrl: "http://localhost:1234", + password: "secret", + }); + expect(parsed.success).toBe(true); + }); + + it("requires password when top-level serverUrl is configured", () => { + const parsed = BlueBubblesConfigSchema.safeParse({ + serverUrl: "http://localhost:1234", + }); + expect(parsed.success).toBe(false); + if (parsed.success) { + return; + } + expect(parsed.error.issues[0]?.path).toEqual(["password"]); + expect(parsed.error.issues[0]?.message).toBe( + "password is required when serverUrl is configured", + ); + }); + + it("requires password when account serverUrl is configured", () => { + const parsed = BlueBubblesConfigSchema.safeParse({ + accounts: { + work: { + serverUrl: "http://localhost:1234", + }, + }, + }); + expect(parsed.success).toBe(false); + if (parsed.success) { + return; + } + expect(parsed.error.issues[0]?.path).toEqual(["accounts", "work", "password"]); + expect(parsed.error.issues[0]?.message).toBe( + "password is required when serverUrl is configured", + ); + }); + + it("allows password omission when serverUrl is not configured", () => { + const parsed = BlueBubblesConfigSchema.safeParse({ + accounts: { + work: { + name: "Work iMessage", + }, + }, + }); + expect(parsed.success).toBe(true); + }); +}); diff --git a/extensions/bluebubbles/src/config-schema.ts b/extensions/bluebubbles/src/config-schema.ts index 097071757c3..b575ab85fe1 100644 --- a/extensions/bluebubbles/src/config-schema.ts +++ b/extensions/bluebubbles/src/config-schema.ts @@ -24,27 +24,39 @@ const bluebubblesGroupConfigSchema = z.object({ tools: ToolPolicySchema, }); -const bluebubblesAccountSchema = z.object({ - name: z.string().optional(), - enabled: z.boolean().optional(), - markdown: MarkdownConfigSchema, - serverUrl: z.string().optional(), - password: z.string().optional(), - webhookPath: z.string().optional(), - dmPolicy: z.enum(["pairing", "allowlist", "open", "disabled"]).optional(), - allowFrom: z.array(allowFromEntry).optional(), - groupAllowFrom: z.array(allowFromEntry).optional(), - groupPolicy: z.enum(["open", "disabled", "allowlist"]).optional(), - historyLimit: z.number().int().min(0).optional(), - dmHistoryLimit: z.number().int().min(0).optional(), - textChunkLimit: z.number().int().positive().optional(), - chunkMode: z.enum(["length", "newline"]).optional(), - mediaMaxMb: z.number().int().positive().optional(), - mediaLocalRoots: z.array(z.string()).optional(), - sendReadReceipts: z.boolean().optional(), - blockStreaming: z.boolean().optional(), - groups: z.object({}).catchall(bluebubblesGroupConfigSchema).optional(), -}); +const bluebubblesAccountSchema = z + .object({ + name: z.string().optional(), + enabled: z.boolean().optional(), + markdown: MarkdownConfigSchema, + serverUrl: z.string().optional(), + password: z.string().optional(), + webhookPath: z.string().optional(), + dmPolicy: z.enum(["pairing", "allowlist", "open", "disabled"]).optional(), + allowFrom: z.array(allowFromEntry).optional(), + groupAllowFrom: z.array(allowFromEntry).optional(), + groupPolicy: z.enum(["open", "disabled", "allowlist"]).optional(), + historyLimit: z.number().int().min(0).optional(), + dmHistoryLimit: z.number().int().min(0).optional(), + textChunkLimit: z.number().int().positive().optional(), + chunkMode: z.enum(["length", "newline"]).optional(), + mediaMaxMb: z.number().int().positive().optional(), + mediaLocalRoots: z.array(z.string()).optional(), + sendReadReceipts: z.boolean().optional(), + blockStreaming: z.boolean().optional(), + groups: z.object({}).catchall(bluebubblesGroupConfigSchema).optional(), + }) + .superRefine((value, ctx) => { + const serverUrl = value.serverUrl?.trim() ?? ""; + const password = value.password?.trim() ?? ""; + if (serverUrl && !password) { + ctx.addIssue({ + code: z.ZodIssueCode.custom, + path: ["password"], + message: "password is required when serverUrl is configured", + }); + } + }); export const BlueBubblesConfigSchema = bluebubblesAccountSchema.extend({ accounts: z.object({}).catchall(bluebubblesAccountSchema).optional(), diff --git a/extensions/bluebubbles/src/monitor.test.ts b/extensions/bluebubbles/src/monitor.test.ts index 7b4d95d7667..1ebd9455830 100644 --- a/extensions/bluebubbles/src/monitor.test.ts +++ b/extensions/bluebubbles/src/monitor.test.ts @@ -452,6 +452,45 @@ describe("BlueBubbles webhook monitor", () => { expect(res.statusCode).toBe(400); }); + it("accepts URL-encoded payload wrappers", async () => { + const account = createMockAccount(); + const config: OpenClawConfig = {}; + const core = createMockRuntime(); + setBlueBubblesRuntime(core); + + unregister = registerBlueBubblesWebhookTarget({ + account, + config, + runtime: { log: vi.fn(), error: vi.fn() }, + core, + path: "/bluebubbles-webhook", + }); + + const payload = { + type: "new-message", + data: { + text: "hello", + handle: { address: "+15551234567" }, + isGroup: false, + isFromMe: false, + guid: "msg-1", + date: Date.now(), + }, + }; + const encodedBody = new URLSearchParams({ + payload: JSON.stringify(payload), + }).toString(); + + const req = createMockRequest("POST", "/bluebubbles-webhook", encodedBody); + const res = createMockResponse(); + + const handled = await handleBlueBubblesWebhookRequest(req, res); + + expect(handled).toBe(true); + expect(res.statusCode).toBe(200); + expect(res.body).toBe("ok"); + }); + it("returns 408 when request body times out (Slow-Loris protection)", async () => { vi.useFakeTimers(); try { diff --git a/extensions/bluebubbles/src/monitor.ts b/extensions/bluebubbles/src/monitor.ts index 367f095b8bc..fa148e5dd20 100644 --- a/extensions/bluebubbles/src/monitor.ts +++ b/extensions/bluebubbles/src/monitor.ts @@ -2,8 +2,12 @@ import { timingSafeEqual } from "node:crypto"; import type { IncomingMessage, ServerResponse } from "node:http"; import type { OpenClawConfig } from "openclaw/plugin-sdk"; import { + isRequestBodyLimitError, + readRequestBodyWithLimit, registerWebhookTarget, rejectNonPostWebhookRequest, + requestBodyErrorToText, + resolveSingleWebhookTarget, resolveWebhookTargets, } from "openclaw/plugin-sdk"; import { @@ -231,12 +235,6 @@ function removeDebouncer(target: WebhookTarget): void { } export function registerBlueBubblesWebhookTarget(target: WebhookTarget): () => void { - const webhookPassword = target.account.config.password?.trim() ?? ""; - if (!webhookPassword) { - target.runtime.error?.( - `[${target.account.accountId}] BlueBubbles webhook auth requires channels.bluebubbles.password. Configure a password and include it in the webhook URL.`, - ); - } const registered = registerWebhookTarget(webhookTargets, target); return () => { registered.unregister(); @@ -245,64 +243,61 @@ export function registerBlueBubblesWebhookTarget(target: WebhookTarget): () => v }; } -async function readJsonBody(req: IncomingMessage, maxBytes: number, timeoutMs = 30_000) { - const chunks: Buffer[] = []; - let total = 0; - return await new Promise<{ ok: boolean; value?: unknown; error?: string }>((resolve) => { - let done = false; - const finish = (result: { ok: boolean; value?: unknown; error?: string }) => { - if (done) { - return; - } - done = true; - clearTimeout(timer); - resolve(result); +type ReadBlueBubblesWebhookBodyResult = + | { ok: true; value: unknown } + | { ok: false; statusCode: number; error: string }; + +function parseBlueBubblesWebhookPayload( + rawBody: string, +): { ok: true; value: unknown } | { ok: false; error: string } { + const trimmed = rawBody.trim(); + if (!trimmed) { + return { ok: false, error: "empty payload" }; + } + try { + return { ok: true, value: JSON.parse(trimmed) as unknown }; + } catch { + const params = new URLSearchParams(rawBody); + const payload = params.get("payload") ?? params.get("data") ?? params.get("message"); + if (!payload) { + return { ok: false, error: "invalid json" }; + } + try { + return { ok: true, value: JSON.parse(payload) as unknown }; + } catch (error) { + return { ok: false, error: error instanceof Error ? error.message : String(error) }; + } + } +} + +async function readBlueBubblesWebhookBody( + req: IncomingMessage, + maxBytes: number, +): Promise { + try { + const rawBody = await readRequestBodyWithLimit(req, { + maxBytes, + timeoutMs: 30_000, + }); + const parsed = parseBlueBubblesWebhookPayload(rawBody); + if (!parsed.ok) { + return { ok: false, statusCode: 400, error: parsed.error }; + } + return parsed; + } catch (error) { + if (isRequestBodyLimitError(error)) { + return { + ok: false, + statusCode: error.statusCode, + error: requestBodyErrorToText(error.code), + }; + } + return { + ok: false, + statusCode: 400, + error: error instanceof Error ? error.message : String(error), }; - - const timer = setTimeout(() => { - finish({ ok: false, error: "request body timeout" }); - req.destroy(); - }, timeoutMs); - - req.on("data", (chunk: Buffer) => { - total += chunk.length; - if (total > maxBytes) { - finish({ ok: false, error: "payload too large" }); - req.destroy(); - return; - } - chunks.push(chunk); - }); - req.on("end", () => { - try { - const raw = Buffer.concat(chunks).toString("utf8"); - if (!raw.trim()) { - finish({ ok: false, error: "empty payload" }); - return; - } - try { - finish({ ok: true, value: JSON.parse(raw) as unknown }); - return; - } catch { - const params = new URLSearchParams(raw); - const payload = params.get("payload") ?? params.get("data") ?? params.get("message"); - if (payload) { - finish({ ok: true, value: JSON.parse(payload) as unknown }); - return; - } - throw new Error("invalid json"); - } - } catch (err) { - finish({ ok: false, error: err instanceof Error ? err.message : String(err) }); - } - }); - req.on("error", (err) => { - finish({ ok: false, error: err instanceof Error ? err.message : String(err) }); - }); - req.on("close", () => { - finish({ ok: false, error: "connection closed" }); - }); - }); + } } function asRecord(value: unknown): Record | null { @@ -343,26 +338,6 @@ function safeEqualSecret(aRaw: string, bRaw: string): boolean { return timingSafeEqual(bufA, bufB); } -function resolveAuthenticatedWebhookTargets( - targets: WebhookTarget[], - presentedToken: string, -): WebhookTarget[] { - const matches: WebhookTarget[] = []; - for (const target of targets) { - const token = target.account.config.password?.trim() ?? ""; - if (!token) { - continue; - } - if (safeEqualSecret(presentedToken, token)) { - matches.push(target); - if (matches.length > 1) { - break; - } - } - } - return matches; -} - export async function handleBlueBubblesWebhookRequest( req: IncomingMessage, res: ServerResponse, @@ -378,15 +353,9 @@ export async function handleBlueBubblesWebhookRequest( return true; } - const body = await readJsonBody(req, 1024 * 1024); + const body = await readBlueBubblesWebhookBody(req, 1024 * 1024); if (!body.ok) { - if (body.error === "payload too large") { - res.statusCode = 413; - } else if (body.error === "request body timeout") { - res.statusCode = 408; - } else { - res.statusCode = 400; - } + res.statusCode = body.statusCode; res.end(body.error ?? "invalid payload"); console.warn(`[bluebubbles] webhook rejected: ${body.error ?? "invalid payload"}`); return true; @@ -450,9 +419,12 @@ export async function handleBlueBubblesWebhookRequest( req.headers["x-bluebubbles-guid"] ?? req.headers["authorization"]; const guid = (Array.isArray(headerToken) ? headerToken[0] : headerToken) ?? guidParam ?? ""; - const matching = resolveAuthenticatedWebhookTargets(targets, guid); + const matchedTarget = resolveSingleWebhookTarget(targets, (target) => { + const token = target.account.config.password?.trim() ?? ""; + return safeEqualSecret(guid, token); + }); - if (matching.length === 0) { + if (matchedTarget.kind === "none") { res.statusCode = 401; res.end("unauthorized"); console.warn( @@ -461,14 +433,14 @@ export async function handleBlueBubblesWebhookRequest( return true; } - if (matching.length > 1) { + if (matchedTarget.kind === "ambiguous") { res.statusCode = 401; res.end("ambiguous webhook target"); console.warn(`[bluebubbles] webhook rejected: ambiguous target match path=${path}`); return true; } - const target = matching[0]; + const target = matchedTarget.target; target.statusSink?.({ lastInboundAt: Date.now() }); if (reaction) { processReaction(reaction, target).catch((err) => { diff --git a/extensions/googlechat/src/monitor.ts b/extensions/googlechat/src/monitor.ts index 272f3abc833..9cdcbc070fb 100644 --- a/extensions/googlechat/src/monitor.ts +++ b/extensions/googlechat/src/monitor.ts @@ -5,6 +5,7 @@ import { readJsonBodyWithLimit, registerWebhookTarget, rejectNonPostWebhookRequest, + resolveSingleWebhookTargetAsync, resolveWebhookPath, resolveWebhookTargets, requestBodyErrorToText, @@ -208,8 +209,7 @@ export async function handleGoogleChatWebhookRequest( ? authHeaderNow.slice("bearer ".length) : bearer; - const matchedTargets: WebhookTarget[] = []; - for (const target of targets) { + const matchedTarget = await resolveSingleWebhookTargetAsync(targets, async (target) => { const audienceType = target.audienceType; const audience = target.audience; const verification = await verifyGoogleChatRequest({ @@ -217,27 +217,22 @@ export async function handleGoogleChatWebhookRequest( audienceType, audience, }); - if (verification.ok) { - matchedTargets.push(target); - if (matchedTargets.length > 1) { - break; - } - } - } + return verification.ok; + }); - if (matchedTargets.length === 0) { + if (matchedTarget.kind === "none") { res.statusCode = 401; res.end("unauthorized"); return true; } - if (matchedTargets.length > 1) { + if (matchedTarget.kind === "ambiguous") { res.statusCode = 401; res.end("ambiguous webhook target"); return true; } - const selected = matchedTargets[0]; + const selected = matchedTarget.target; selected.statusSink?.({ lastInboundAt: Date.now() }); processGoogleChatEvent(event, selected).catch((err) => { selected?.runtime.error?.( diff --git a/extensions/zalo/src/monitor.ts b/extensions/zalo/src/monitor.ts index 8a28927f6ad..3e1b3256f72 100644 --- a/extensions/zalo/src/monitor.ts +++ b/extensions/zalo/src/monitor.ts @@ -6,6 +6,7 @@ import { readJsonBodyWithLimit, registerWebhookTarget, rejectNonPostWebhookRequest, + resolveSingleWebhookTarget, resolveSenderCommandAuthorization, resolveWebhookPath, resolveWebhookTargets, @@ -195,20 +196,22 @@ export async function handleZaloWebhookRequest( } const headerToken = String(req.headers["x-bot-api-secret-token"] ?? ""); - const matching = targets.filter((entry) => timingSafeEquals(entry.secret, headerToken)); - if (matching.length === 0) { + const matchedTarget = resolveSingleWebhookTarget(targets, (entry) => + timingSafeEquals(entry.secret, headerToken), + ); + if (matchedTarget.kind === "none") { res.statusCode = 401; res.end("unauthorized"); recordWebhookStatus(targets[0]?.runtime, req.url ?? "", res.statusCode); return true; } - if (matching.length > 1) { + if (matchedTarget.kind === "ambiguous") { res.statusCode = 401; res.end("ambiguous webhook target"); recordWebhookStatus(targets[0]?.runtime, req.url ?? "", res.statusCode); return true; } - const target = matching[0]; + const target = matchedTarget.target; const path = req.url ?? ""; const rateLimitKey = `${path}:${req.socket.remoteAddress ?? "unknown"}`; const nowMs = Date.now(); diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index c18892c6fa9..f70fc1419c9 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -88,8 +88,11 @@ export { normalizeWebhookPath, resolveWebhookPath } from "./webhook-path.js"; export { registerWebhookTarget, rejectNonPostWebhookRequest, + resolveSingleWebhookTarget, + resolveSingleWebhookTargetAsync, resolveWebhookTargets, } from "./webhook-targets.js"; +export type { WebhookTargetMatchResult } from "./webhook-targets.js"; export type { AgentMediaPayload } from "./agent-media-payload.js"; export { buildAgentMediaPayload } from "./agent-media-payload.js"; export { diff --git a/src/plugin-sdk/webhook-targets.test.ts b/src/plugin-sdk/webhook-targets.test.ts new file mode 100644 index 00000000000..5c4255533da --- /dev/null +++ b/src/plugin-sdk/webhook-targets.test.ts @@ -0,0 +1,120 @@ +import { EventEmitter } from "node:events"; +import type { IncomingMessage, ServerResponse } from "node:http"; +import { describe, expect, it, vi } from "vitest"; +import { + registerWebhookTarget, + rejectNonPostWebhookRequest, + resolveSingleWebhookTarget, + resolveSingleWebhookTargetAsync, + resolveWebhookTargets, +} from "./webhook-targets.js"; + +function createRequest(method: string, url: string): IncomingMessage { + const req = new EventEmitter() as IncomingMessage; + req.method = method; + req.url = url; + req.headers = {}; + return req; +} + +describe("registerWebhookTarget", () => { + it("normalizes the path and unregisters cleanly", () => { + const targets = new Map>(); + const registered = registerWebhookTarget(targets, { + path: "hook", + id: "A", + }); + + expect(registered.target.path).toBe("/hook"); + expect(targets.get("/hook")).toEqual([registered.target]); + + registered.unregister(); + expect(targets.has("/hook")).toBe(false); + }); +}); + +describe("resolveWebhookTargets", () => { + it("resolves normalized path targets", () => { + const targets = new Map>(); + targets.set("/hook", [{ id: "A" }]); + + expect(resolveWebhookTargets(createRequest("POST", "/hook/"), targets)).toEqual({ + path: "/hook", + targets: [{ id: "A" }], + }); + }); + + it("returns null when path has no targets", () => { + const targets = new Map>(); + expect(resolveWebhookTargets(createRequest("POST", "/missing"), targets)).toBeNull(); + }); +}); + +describe("rejectNonPostWebhookRequest", () => { + it("sets 405 for non-POST requests", () => { + const setHeaderMock = vi.fn(); + const endMock = vi.fn(); + const res = { + statusCode: 200, + setHeader: setHeaderMock, + end: endMock, + } as unknown as ServerResponse; + + const rejected = rejectNonPostWebhookRequest(createRequest("GET", "/hook"), res); + + expect(rejected).toBe(true); + expect(res.statusCode).toBe(405); + expect(setHeaderMock).toHaveBeenCalledWith("Allow", "POST"); + expect(endMock).toHaveBeenCalledWith("Method Not Allowed"); + }); +}); + +describe("resolveSingleWebhookTarget", () => { + it("returns none when no target matches", () => { + const result = resolveSingleWebhookTarget(["a", "b"], (value) => value === "c"); + expect(result).toEqual({ kind: "none" }); + }); + + it("returns the single match", () => { + const result = resolveSingleWebhookTarget(["a", "b"], (value) => value === "b"); + expect(result).toEqual({ kind: "single", target: "b" }); + }); + + it("returns ambiguous after second match", () => { + const calls: string[] = []; + const result = resolveSingleWebhookTarget(["a", "b", "c"], (value) => { + calls.push(value); + return value === "a" || value === "b"; + }); + expect(result).toEqual({ kind: "ambiguous" }); + expect(calls).toEqual(["a", "b"]); + }); +}); + +describe("resolveSingleWebhookTargetAsync", () => { + it("returns none when no target matches", async () => { + const result = await resolveSingleWebhookTargetAsync( + ["a", "b"], + async (value) => value === "c", + ); + expect(result).toEqual({ kind: "none" }); + }); + + it("returns the single async match", async () => { + const result = await resolveSingleWebhookTargetAsync( + ["a", "b"], + async (value) => value === "b", + ); + expect(result).toEqual({ kind: "single", target: "b" }); + }); + + it("returns ambiguous after second async match", async () => { + const calls: string[] = []; + const result = await resolveSingleWebhookTargetAsync(["a", "b", "c"], async (value) => { + calls.push(value); + return value === "a" || value === "b"; + }); + expect(result).toEqual({ kind: "ambiguous" }); + expect(calls).toEqual(["a", "b"]); + }); +}); diff --git a/src/plugin-sdk/webhook-targets.ts b/src/plugin-sdk/webhook-targets.ts index 81747c89412..1a7cd40accf 100644 --- a/src/plugin-sdk/webhook-targets.ts +++ b/src/plugin-sdk/webhook-targets.ts @@ -38,6 +38,51 @@ export function resolveWebhookTargets( return { path, targets }; } +export type WebhookTargetMatchResult = + | { kind: "none" } + | { kind: "single"; target: T } + | { kind: "ambiguous" }; + +export function resolveSingleWebhookTarget( + targets: readonly T[], + isMatch: (target: T) => boolean, +): WebhookTargetMatchResult { + let matched: T | undefined; + for (const target of targets) { + if (!isMatch(target)) { + continue; + } + if (matched) { + return { kind: "ambiguous" }; + } + matched = target; + } + if (!matched) { + return { kind: "none" }; + } + return { kind: "single", target: matched }; +} + +export async function resolveSingleWebhookTargetAsync( + targets: readonly T[], + isMatch: (target: T) => Promise, +): Promise> { + let matched: T | undefined; + for (const target of targets) { + if (!(await isMatch(target))) { + continue; + } + if (matched) { + return { kind: "ambiguous" }; + } + matched = target; + } + if (!matched) { + return { kind: "none" }; + } + return { kind: "single", target: matched }; +} + export function rejectNonPostWebhookRequest(req: IncomingMessage, res: ServerResponse): boolean { if (req.method === "POST") { return false;