diff --git a/CHANGELOG.md b/CHANGELOG.md index 8783efccf72..88afb0b7f0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Gateway/Restart: fix restart-loop edge cases by keeping `openclaw.mjs -> dist/entry.js` bootstrap detection explicit, reacquiring the gateway lock for in-process restart fallback paths, and tightening restart-loop regression coverage. (#23416) Thanks @jeffwnli. +- Channels/Dedupe: centralize plugin dedupe primitives in plugin SDK (memory + persistent), move Feishu inbound dedupe to a namespace-scoped persistent store, and reuse shared dedupe cache logic for Zalo webhook replay + Tlon processed-message tracking to reduce duplicate handling during reconnect/replay paths. - Security/Audit: add `openclaw security audit` detection for open group policies that expose runtime/filesystem tools without sandbox/workspace guards (`security.exposure.open_groups_with_runtime_or_fs`). - Security/Exec env: block request-scoped `HOME` and `ZDOTDIR` overrides in host exec env sanitizers (Node + macOS), preventing shell startup-file execution before allowlist-evaluated command bodies. This ships in the next npm release. Thanks @tdjackey for reporting. - Security/Gateway: emit a startup security warning when insecure/dangerous config flags are enabled (including `gateway.controlUi.dangerouslyDisableDeviceAuth=true`) and point operators to `openclaw security audit`. 
diff --git a/extensions/feishu/src/bot.ts b/extensions/feishu/src/bot.ts index bee417c5741..14d9219193a 100644 --- a/extensions/feishu/src/bot.ts +++ b/extensions/feishu/src/bot.ts @@ -9,7 +9,7 @@ import { } from "openclaw/plugin-sdk"; import { resolveFeishuAccount } from "./accounts.js"; import { createFeishuClient } from "./client.js"; -import { tryRecordMessage } from "./dedup.js"; +import { tryRecordMessagePersistent } from "./dedup.js"; import { maybeCreateDynamicAgent } from "./dynamic-agent.js"; import { normalizeFeishuExternalKey } from "./external-keys.js"; import { downloadMessageResourceFeishu } from "./media.js"; @@ -510,9 +510,9 @@ export async function handleFeishuMessage(params: { const log = runtime?.log ?? console.log; const error = runtime?.error ?? console.error; - // Dedup check: skip if this message was already processed + // Dedup check: skip if this message was already processed (memory + disk). const messageId = event.message.message_id; - if (!tryRecordMessage(messageId)) { + if (!(await tryRecordMessagePersistent(messageId, account.accountId, log))) { log(`feishu: skipping duplicate message ${messageId}`); return; } diff --git a/extensions/feishu/src/dedup.ts b/extensions/feishu/src/dedup.ts index 25677f628d5..84e4eb6634c 100644 --- a/extensions/feishu/src/dedup.ts +++ b/extensions/feishu/src/dedup.ts @@ -1,33 +1,54 @@ -// Prevent duplicate processing when WebSocket reconnects or Feishu redelivers messages. 
-const DEDUP_TTL_MS = 30 * 60 * 1000; // 30 minutes -const DEDUP_MAX_SIZE = 1_000; -const DEDUP_CLEANUP_INTERVAL_MS = 5 * 60 * 1000; // cleanup every 5 minutes -const processedMessageIds = new Map(); // messageId -> timestamp -let lastCleanupTime = Date.now(); +import os from "node:os"; +import path from "node:path"; +import { createDedupeCache, createPersistentDedupe } from "openclaw/plugin-sdk"; -export function tryRecordMessage(messageId: string): boolean { - const now = Date.now(); +// Persistent TTL: 24 hours — survives restarts & WebSocket reconnects. +const DEDUP_TTL_MS = 24 * 60 * 60 * 1000; +const MEMORY_MAX_SIZE = 1_000; +const FILE_MAX_ENTRIES = 10_000; - // Throttled cleanup: evict expired entries at most once per interval. - if (now - lastCleanupTime > DEDUP_CLEANUP_INTERVAL_MS) { - for (const [id, ts] of processedMessageIds) { - if (now - ts > DEDUP_TTL_MS) { - processedMessageIds.delete(id); - } - } - lastCleanupTime = now; +const memoryDedupe = createDedupeCache({ ttlMs: DEDUP_TTL_MS, maxSize: MEMORY_MAX_SIZE }); + +function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string { + const stateOverride = env.OPENCLAW_STATE_DIR?.trim() || env.CLAWDBOT_STATE_DIR?.trim(); + if (stateOverride) { + return stateOverride; } - - if (processedMessageIds.has(messageId)) { - return false; + if (env.VITEST || env.NODE_ENV === "test") { + return path.join(os.tmpdir(), `openclaw-vitest-${process.pid}`); } - - // Evict oldest entries if cache is full. 
- if (processedMessageIds.size >= DEDUP_MAX_SIZE) { - const first = processedMessageIds.keys().next().value!; - processedMessageIds.delete(first); - } - - processedMessageIds.set(messageId, now); - return true; + return path.join(os.homedir(), ".openclaw"); +} + +function resolveNamespaceFilePath(namespace: string): string { + const safe = namespace.replace(/[^a-zA-Z0-9_-]/g, "_"); + return path.join(resolveStateDirFromEnv(), "feishu", "dedup", `${safe}.json`); +} + +const persistentDedupe = createPersistentDedupe({ + ttlMs: DEDUP_TTL_MS, + memoryMaxSize: MEMORY_MAX_SIZE, + fileMaxEntries: FILE_MAX_ENTRIES, + resolveFilePath: resolveNamespaceFilePath, +}); + +/** + * Synchronous dedup — memory only. + * Kept for backward compatibility; prefer {@link tryRecordMessagePersistent}. + */ +export function tryRecordMessage(messageId: string): boolean { + return !memoryDedupe.check(messageId); +} + +export async function tryRecordMessagePersistent( + messageId: string, + namespace = "global", + log?: (...args: unknown[]) => void, +): Promise { + return persistentDedupe.checkAndRecord(messageId, { + namespace, + onDiskError: (error) => { + log?.(`feishu-dedup: disk error, falling back to memory: ${String(error)}`); + }, + }); } diff --git a/extensions/tlon/src/monitor/processed-messages.ts b/extensions/tlon/src/monitor/processed-messages.ts index dfae103f310..560db28575a 100644 --- a/extensions/tlon/src/monitor/processed-messages.ts +++ b/extensions/tlon/src/monitor/processed-messages.ts @@ -1,3 +1,5 @@ +import { createDedupeCache } from "openclaw/plugin-sdk"; + export type ProcessedMessageTracker = { mark: (id?: string | null) => boolean; has: (id?: string | null) => boolean; @@ -5,29 +7,14 @@ export type ProcessedMessageTracker = { }; export function createProcessedMessageTracker(limit = 2000): ProcessedMessageTracker { - const seen = new Set(); - const order: string[] = []; + const dedupe = createDedupeCache({ ttlMs: 0, maxSize: limit }); const mark = (id?: string | 
null) => { const trimmed = id?.trim(); if (!trimmed) { return true; } - if (seen.has(trimmed)) { - return false; - } - seen.add(trimmed); - order.push(trimmed); - if (order.length > limit) { - const overflow = order.length - limit; - for (let i = 0; i < overflow; i += 1) { - const oldest = order.shift(); - if (oldest) { - seen.delete(oldest); - } - } - } - return true; + return !dedupe.check(trimmed); }; const has = (id?: string | null) => { @@ -35,12 +22,12 @@ export function createProcessedMessageTracker(limit = 2000): ProcessedMessageTra if (!trimmed) { return false; } - return seen.has(trimmed); + return dedupe.peek(trimmed); }; return { mark, has, - size: () => seen.size, + size: () => dedupe.size(), }; } diff --git a/extensions/zalo/src/monitor.ts b/extensions/zalo/src/monitor.ts index 819a3afe831..6b253d3cd7b 100644 --- a/extensions/zalo/src/monitor.ts +++ b/extensions/zalo/src/monitor.ts @@ -2,6 +2,7 @@ import { timingSafeEqual } from "node:crypto"; import type { IncomingMessage, ServerResponse } from "node:http"; import type { OpenClawConfig, MarkdownTableMode } from "openclaw/plugin-sdk"; import { + createDedupeCache, createReplyPrefixOptions, readJsonBodyWithLimit, registerWebhookTarget, @@ -92,7 +93,10 @@ type WebhookTarget = { const webhookTargets = new Map(); const webhookRateLimits = new Map(); -const recentWebhookEvents = new Map(); +const recentWebhookEvents = createDedupeCache({ + ttlMs: ZALO_WEBHOOK_REPLAY_WINDOW_MS, + maxSize: 5000, +}); const webhookStatusCounters = new Map(); function isJsonContentType(value: string | string[] | undefined): boolean { @@ -141,22 +145,7 @@ function isReplayEvent(update: ZaloUpdate, nowMs: number): boolean { return false; } const key = `${update.event_name}:${messageId}`; - const seenAt = recentWebhookEvents.get(key); - recentWebhookEvents.set(key, nowMs); - - if (seenAt && nowMs - seenAt < ZALO_WEBHOOK_REPLAY_WINDOW_MS) { - return true; - } - - if (recentWebhookEvents.size > 5000) { - for (const [eventKey, 
timestamp] of recentWebhookEvents) { - if (nowMs - timestamp >= ZALO_WEBHOOK_REPLAY_WINDOW_MS) { - recentWebhookEvents.delete(eventKey); - } - } - } - - return false; + return recentWebhookEvents.check(key, nowMs); } function recordWebhookStatus( diff --git a/src/infra/dedupe.ts b/src/infra/dedupe.ts index ffb26d295c5..2103d74c19c 100644 --- a/src/infra/dedupe.ts +++ b/src/infra/dedupe.ts @@ -2,6 +2,7 @@ import { pruneMapToMaxSize } from "./map-size.js"; export type DedupeCache = { check: (key: string | undefined | null, now?: number) => boolean; + peek: (key: string | undefined | null, now?: number) => boolean; clear: () => void; size: () => number; }; @@ -37,20 +38,39 @@ export function createDedupeCache(options: DedupeCacheOptions): DedupeCache { pruneMapToMaxSize(cache, maxSize); }; + const hasUnexpired = (key: string, now: number, touchOnRead: boolean): boolean => { + const existing = cache.get(key); + if (existing === undefined) { + return false; + } + if (ttlMs > 0 && now - existing >= ttlMs) { + cache.delete(key); + return false; + } + if (touchOnRead) { + touch(key, now); + } + return true; + }; + return { check: (key, now = Date.now()) => { if (!key) { return false; } - const existing = cache.get(key); - if (existing !== undefined && (ttlMs <= 0 || now - existing < ttlMs)) { - touch(key, now); + if (hasUnexpired(key, now, true)) { return true; } touch(key, now); prune(now); return false; }, + peek: (key, now = Date.now()) => { + if (!key) { + return false; + } + return hasUnexpired(key, now, false); + }, clear: () => { cache.clear(); }, diff --git a/src/infra/infra-store.test.ts b/src/infra/infra-store.test.ts index 0f25a80594d..cd36e52dd44 100644 --- a/src/infra/infra-store.test.ts +++ b/src/infra/infra-store.test.ts @@ -227,5 +227,13 @@ describe("infra store", () => { expect(cache.check("c", 200)).toBe(false); expect(cache.size()).toBe(2); }); + + it("supports non-mutating existence checks via peek()", () => { + const cache = createDedupeCache({ ttlMs: 
1000, maxSize: 10 }); + expect(cache.peek("a", 100)).toBe(false); + expect(cache.check("a", 100)).toBe(false); + expect(cache.peek("a", 200)).toBe(true); + expect(cache.peek("a", 1201)).toBe(false); + }); }); }); diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index b23b52a072e..a3f58c034cc 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -182,6 +182,12 @@ export { } from "../infra/device-pairing.js"; export { createDedupeCache } from "../infra/dedupe.js"; export type { DedupeCache } from "../infra/dedupe.js"; +export { createPersistentDedupe } from "./persistent-dedupe.js"; +export type { + PersistentDedupe, + PersistentDedupeCheckOptions, + PersistentDedupeOptions, +} from "./persistent-dedupe.js"; export { formatErrorMessage } from "../infra/errors.js"; export { DEFAULT_WEBHOOK_BODY_TIMEOUT_MS, diff --git a/src/plugin-sdk/persistent-dedupe.test.ts b/src/plugin-sdk/persistent-dedupe.test.ts new file mode 100644 index 00000000000..e1a1e3faefa --- /dev/null +++ b/src/plugin-sdk/persistent-dedupe.test.ts @@ -0,0 +1,73 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { createPersistentDedupe } from "./persistent-dedupe.js"; + +const tmpRoots: string[] = []; + +async function makeTmpRoot(): Promise { + const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-dedupe-")); + tmpRoots.push(root); + return root; +} + +afterEach(async () => { + await Promise.all( + tmpRoots.splice(0).map((root) => fs.rm(root, { recursive: true, force: true })), + ); +}); + +describe("createPersistentDedupe", () => { + it("deduplicates keys and persists across instances", async () => { + const root = await makeTmpRoot(); + const resolveFilePath = (namespace: string) => path.join(root, `${namespace}.json`); + + const first = createPersistentDedupe({ + ttlMs: 24 * 60 * 60 * 1000, + memoryMaxSize: 100, + fileMaxEntries: 1000, + 
resolveFilePath, + }); + expect(await first.checkAndRecord("m1", { namespace: "a" })).toBe(true); + expect(await first.checkAndRecord("m1", { namespace: "a" })).toBe(false); + + const second = createPersistentDedupe({ + ttlMs: 24 * 60 * 60 * 1000, + memoryMaxSize: 100, + fileMaxEntries: 1000, + resolveFilePath, + }); + expect(await second.checkAndRecord("m1", { namespace: "a" })).toBe(false); + expect(await second.checkAndRecord("m1", { namespace: "b" })).toBe(true); + }); + + it("guards concurrent calls for the same key", async () => { + const root = await makeTmpRoot(); + const dedupe = createPersistentDedupe({ + ttlMs: 10_000, + memoryMaxSize: 100, + fileMaxEntries: 1000, + resolveFilePath: (namespace) => path.join(root, `${namespace}.json`), + }); + + const [first, second] = await Promise.all([ + dedupe.checkAndRecord("race-key", { namespace: "feishu" }), + dedupe.checkAndRecord("race-key", { namespace: "feishu" }), + ]); + expect(first).toBe(true); + expect(second).toBe(false); + }); + + it("falls back to memory-only behavior on disk errors", async () => { + const dedupe = createPersistentDedupe({ + ttlMs: 10_000, + memoryMaxSize: 100, + fileMaxEntries: 1000, + resolveFilePath: () => path.join("/dev/null", "dedupe.json"), + }); + + expect(await dedupe.checkAndRecord("memory-only", { namespace: "x" })).toBe(true); + expect(await dedupe.checkAndRecord("memory-only", { namespace: "x" })).toBe(false); + }); +}); diff --git a/src/plugin-sdk/persistent-dedupe.ts b/src/plugin-sdk/persistent-dedupe.ts new file mode 100644 index 00000000000..947217fda68 --- /dev/null +++ b/src/plugin-sdk/persistent-dedupe.ts @@ -0,0 +1,164 @@ +import { createDedupeCache } from "../infra/dedupe.js"; +import type { FileLockOptions } from "./file-lock.js"; +import { withFileLock } from "./file-lock.js"; +import { readJsonFileWithFallback, writeJsonFileAtomically } from "./json-store.js"; + +type PersistentDedupeData = Record; + +export type PersistentDedupeOptions = { + ttlMs: number; + 
memoryMaxSize: number; + fileMaxEntries: number; + resolveFilePath: (namespace: string) => string; + lockOptions?: Partial<FileLockOptions>; + onDiskError?: (error: unknown) => void; +}; + +export type PersistentDedupeCheckOptions = { + namespace?: string; + now?: number; + onDiskError?: (error: unknown) => void; +}; + +export type PersistentDedupe = { + checkAndRecord: (key: string, options?: PersistentDedupeCheckOptions) => Promise<boolean>; + clearMemory: () => void; + memorySize: () => number; +}; + +const DEFAULT_LOCK_OPTIONS: FileLockOptions = { + retries: { + retries: 6, + factor: 1.35, + minTimeout: 8, + maxTimeout: 180, + randomize: true, + }, + stale: 60_000, +}; + +function mergeLockOptions(overrides?: Partial<FileLockOptions>): FileLockOptions { + return { + stale: overrides?.stale ?? DEFAULT_LOCK_OPTIONS.stale, + retries: { + retries: overrides?.retries?.retries ?? DEFAULT_LOCK_OPTIONS.retries.retries, + factor: overrides?.retries?.factor ?? DEFAULT_LOCK_OPTIONS.retries.factor, + minTimeout: overrides?.retries?.minTimeout ?? DEFAULT_LOCK_OPTIONS.retries.minTimeout, + maxTimeout: overrides?.retries?.maxTimeout ?? DEFAULT_LOCK_OPTIONS.retries.maxTimeout, + randomize: overrides?.retries?.randomize ?? 
DEFAULT_LOCK_OPTIONS.retries.randomize, + }, + }; +} + +function sanitizeData(value: unknown): PersistentDedupeData { + if (!value || typeof value !== "object") { + return {}; + } + const out: PersistentDedupeData = {}; + for (const [key, ts] of Object.entries(value as Record<string, unknown>)) { + if (typeof ts === "number" && Number.isFinite(ts) && ts > 0) { + out[key] = ts; + } + } + return out; +} + +function pruneData( + data: PersistentDedupeData, + now: number, + ttlMs: number, + maxEntries: number, +): void { + if (ttlMs > 0) { + for (const [key, ts] of Object.entries(data)) { + if (now - ts >= ttlMs) { + delete data[key]; + } + } + } + + const keys = Object.keys(data); + if (keys.length <= maxEntries) { + return; + } + + keys + .toSorted((a, b) => data[a] - data[b]) + .slice(0, keys.length - maxEntries) + .forEach((key) => { + delete data[key]; + }); +} + +export function createPersistentDedupe(options: PersistentDedupeOptions): PersistentDedupe { + const ttlMs = Math.max(0, Math.floor(options.ttlMs)); + const memoryMaxSize = Math.max(0, Math.floor(options.memoryMaxSize)); + const fileMaxEntries = Math.max(1, Math.floor(options.fileMaxEntries)); + const lockOptions = mergeLockOptions(options.lockOptions); + const memory = createDedupeCache({ ttlMs, maxSize: memoryMaxSize }); + const inflight = new Map<string, Promise<boolean>>(); + + async function checkAndRecordInner( + key: string, + namespace: string, + scopedKey: string, + now: number, + onDiskError?: (error: unknown) => void, + ): Promise<boolean> { + if (memory.check(scopedKey, now)) { + return false; + } + + const path = options.resolveFilePath(namespace); + try { + const duplicate = await withFileLock(path, lockOptions, async () => { + const { value } = await readJsonFileWithFallback(path, {}); + const data = sanitizeData(value); + const seenAt = data[key]; + const isRecent = seenAt != null && (ttlMs <= 0 || now - seenAt < ttlMs); + if (isRecent) { + return true; + } + data[key] = now; + pruneData(data, now, ttlMs, fileMaxEntries); + await 
writeJsonFileAtomically(path, data); + return false; + }); + return !duplicate; + } catch (error) { + onDiskError?.(error); + return true; + } + } + + async function checkAndRecord( + key: string, + dedupeOptions?: PersistentDedupeCheckOptions, + ): Promise<boolean> { + const trimmed = key.trim(); + if (!trimmed) { + return true; + } + const namespace = dedupeOptions?.namespace?.trim() || "global"; + const scopedKey = `${namespace}:${trimmed}`; + if (inflight.has(scopedKey)) { + return false; + } + + const onDiskError = dedupeOptions?.onDiskError ?? options.onDiskError; + const now = dedupeOptions?.now ?? Date.now(); + const work = checkAndRecordInner(trimmed, namespace, scopedKey, now, onDiskError); + inflight.set(scopedKey, work); + try { + return await work; + } finally { + inflight.delete(scopedKey); + } + } + + return { + checkAndRecord, + clearMemory: () => memory.clear(), + memorySize: () => memory.size(), + }; +}