From 92d41e2135ccee53bd841a3fef5827e26be6ed54 Mon Sep 17 00:00:00 2001 From: Alex Knight Date: Mon, 4 May 2026 15:48:07 +1000 Subject: [PATCH] feat(plugin-state): add atomic dedupe claims --- CHANGELOG.md | 1 + docs/channels/bluebubbles.md | 18 ++ docs/plugins/sdk-runtime.md | 2 +- extensions/bluebubbles/openclaw.plugin.json | 4 +- .../bluebubbles/src/inbound-dedupe.test.ts | 118 ++++++++++- extensions/bluebubbles/src/inbound-dedupe.ts | 191 ++++++++++++++++++ extensions/bluebubbles/src/runtime.ts | 4 + .../plugin-state-store.e2e.test.ts | 10 +- src/plugin-state/plugin-state-store.sqlite.ts | 13 +- src/plugin-state/plugin-state-store.test.ts | 35 ++++ src/plugin-state/plugin-state-store.ts | 32 ++- src/plugin-state/plugin-state-store.types.ts | 1 + 12 files changed, 412 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fe843cb3db6..f4070f4e73c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai ### Changes +- BlueBubbles: add an experimental opt-in that stores inbound replay dedupe claims in the SQLite-backed plugin runtime state store while preserving the existing file-backed dedupe default. Thanks @amknight. - Control UI/header: show the active agent name in dashboard breadcrumbs without adding the current session key, keeping non-chat views oriented without crowding the topbar. - Control UI/cron: make the New Job sidebar collapsible so the jobs list can reclaim space while keeping the form one click away. Thanks @BunsDev. - Gateway/startup: keep model-catalog test helpers, run-session lookup code, QR pairing helpers, and TypeBox memory-tool schema construction out of hot startup import paths, reducing default gateway benchmark plugin-load and memory pressure. diff --git a/docs/channels/bluebubbles.md b/docs/channels/bluebubbles.md index f67ddf32d80..f694a7481fb 100644 --- a/docs/channels/bluebubbles.md +++ b/docs/channels/bluebubbles.md @@ -575,6 +575,24 @@ Full configuration: [Configuration](/gateway/configuration) - `channels.bluebubbles.dmHistoryLimit`: DM history limit. - `channels.bluebubbles.replyContextApiFallback`: When an inbound reply lands without `replyToBody`/`replyToSender` and the in-memory reply-context cache misses, fetch the original message from the BlueBubbles HTTP API as a best-effort fallback (default: `false`). Useful for multi-instance deployments sharing one BlueBubbles account, after process restarts, or after long-lived TTL/LRU cache eviction. The fetch is SSRF-guarded by the same policy as every other BlueBubbles client request, never throws, and populates the cache so subsequent replies amortize. Per-account override: `channels.bluebubbles.accounts..replyContextApiFallback`. A channel-level setting propagates to accounts that omit the flag. + + + - `plugins.entries.bluebubbles.config.experimentalPersistentState`: Opt in to the experimental SQLite-backed plugin runtime state store for inbound webhook dedupe. Default: `false` (the existing file-backed dedupe remains the safe default). When enabled, BlueBubbles uses an atomic durable claim for each inbound GUID, keeps the same 7-day TTL, and requests a 50,000-row plugin-state capacity so high-volume replay dedupe is not reduced by the runtime state's default 1,000-row cap. + + ```json5 + { + plugins: { + entries: { + bluebubbles: { + config: { + experimentalPersistentState: true, + }, + }, + }, + }, + } + ``` + - `channels.bluebubbles.actions`: Enable/disable specific actions. diff --git a/docs/plugins/sdk-runtime.md b/docs/plugins/sdk-runtime.md index bd4835df097..6ff1e1506fc 100644 --- a/docs/plugins/sdk-runtime.md +++ b/docs/plugins/sdk-runtime.md @@ -423,7 +423,7 @@ Provider and channel execution paths must use the active runtime config snapshot await store.clear(); ``` - Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Use `registerIfAbsent(...)` for atomic dedupe claims: it returns `true` when the key was missing or expired and registered, or `false` when a live value already exists without overwriting its value, creation time, or TTL. Limits: `maxEntries` per namespace, 1,000 live rows per plugin, JSON values under 64KB, and optional TTL expiry. + Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Use `registerIfAbsent(...)` for atomic dedupe claims: it returns `true` when the key was missing or expired and registered, or `false` when a live value already exists without overwriting its value, creation time, or TTL. Limits: `maxEntries` per namespace, 1,000 live rows per plugin by default, an optional bounded `maxPluginEntries` override for high-volume stores, JSON values under 64KB, and optional TTL expiry. Bundled plugins only in this release. diff --git a/extensions/bluebubbles/openclaw.plugin.json b/extensions/bluebubbles/openclaw.plugin.json index 9e6b15892bc..c087c097f18 100644 --- a/extensions/bluebubbles/openclaw.plugin.json +++ b/extensions/bluebubbles/openclaw.plugin.json @@ -7,6 +7,8 @@ "configSchema": { "type": "object", "additionalProperties": false, - "properties": {} + "properties": { + "experimentalPersistentState": { "type": "boolean" } + } } } diff --git a/extensions/bluebubbles/src/inbound-dedupe.test.ts b/extensions/bluebubbles/src/inbound-dedupe.test.ts index 190ea06d7b3..a755bae6412 100644 --- a/extensions/bluebubbles/src/inbound-dedupe.test.ts +++ b/extensions/bluebubbles/src/inbound-dedupe.test.ts @@ -1,10 +1,74 @@ -import { beforeEach, describe, expect, it } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; import { _resetBlueBubblesInboundDedupForTest, claimBlueBubblesInboundMessage, commitBlueBubblesCoalescedMessageIds, resolveBlueBubblesInboundDedupeKey, } from "./inbound-dedupe.js"; +import type { PluginRuntime } from "./runtime-api.js"; +import { clearBlueBubblesRuntime, setBlueBubblesRuntime } from "./runtime.js"; + +type RuntimeStateRecord = { status: "claimed" | "committed"; at: number }; + +type RuntimeStateStore = { + register: (key: string, value: RuntimeStateRecord, opts?: { ttlMs?: number }) => Promise; + registerIfAbsent: ( + key: string, + value: RuntimeStateRecord, + opts?: { ttlMs?: number }, + ) => Promise; + lookup: (key: string) => Promise; + delete: (key: string) => Promise; + entries: () => Promise; + clear: () => Promise; +}; + +function createMemoryRuntimeStateStore(): RuntimeStateStore { + const entries = new Map(); + return { + async register(key, value) { + entries.set(key, value); + }, + async registerIfAbsent(key, value) { + if (entries.has(key)) { + return false; + } + entries.set(key, value); + return true; + }, + async lookup(key) { + return entries.get(key); + }, + async delete(key) { + return entries.delete(key); + }, + async entries() { + return [...entries.entries()]; + }, + async clear() { + entries.clear(); + }, + }; +} + +function installRuntimeStateStub(enabled: boolean, store = createMemoryRuntimeStateStore()) { + const openKeyedStore = vi.fn(() => store); + setBlueBubblesRuntime({ + config: { + current: () => ({ + plugins: { + entries: { + bluebubbles: { + config: { experimentalPersistentState: enabled }, + }, + }, + }, + }), + }, + state: { openKeyedStore }, + } as unknown as PluginRuntime); + return { openKeyedStore, store }; +} async function claimAndFinalize(guid: string | undefined, accountId: string): Promise { const claim = await claimBlueBubblesInboundMessage({ guid, accountId }); @@ -16,6 +80,7 @@ async function claimAndFinalize(guid: string | undefined, accountId: string): Pr describe("claimBlueBubblesInboundMessage", () => { beforeEach(() => { + clearBlueBubblesRuntime(); _resetBlueBubblesInboundDedupForTest(); }); @@ -24,6 +89,56 @@ describe("claimBlueBubblesInboundMessage", () => { expect(await claimAndFinalize("g1", "acc")).toBe("duplicate"); }); + it("keeps file-backed dedupe as the default when persistent state is not opted in", async () => { + const { openKeyedStore } = installRuntimeStateStub(false); + + expect(await claimAndFinalize("g-default", "acc")).toBe("claimed"); + expect(await claimAndFinalize("g-default", "acc")).toBe("duplicate"); + expect(openKeyedStore).not.toHaveBeenCalled(); + }); + + it("uses runtime state registerIfAbsent when persistent state is opted in", async () => { + const { openKeyedStore, store } = installRuntimeStateStub(true); + const registerIfAbsent = vi.spyOn(store, "registerIfAbsent"); + + const first = await claimBlueBubblesInboundMessage({ guid: "g-sqlite", accountId: "acc" }); + expect(first.kind).toBe("claimed"); + const second = await claimBlueBubblesInboundMessage({ guid: "g-sqlite", accountId: "acc" }); + expect(second.kind).toBe("inflight"); + if (first.kind === "claimed") { + await first.finalize(); + } + + expect( + (await claimBlueBubblesInboundMessage({ guid: "g-sqlite", accountId: "acc" })).kind, + ).toBe("duplicate"); + expect(openKeyedStore).toHaveBeenCalledWith( + expect.objectContaining({ + namespace: expect.stringMatching(/^inbound-dedupe\./), + maxEntries: 50_000, + maxPluginEntries: 50_000, + }), + ); + expect(registerIfAbsent).toHaveBeenCalledWith( + "g-sqlite", + expect.objectContaining({ status: "claimed" }), + { ttlMs: 7 * 24 * 60 * 60 * 1_000 }, + ); + }); + + it("releases opted-in runtime state claims so later replays can retry", async () => { + installRuntimeStateStub(true); + + const first = await claimBlueBubblesInboundMessage({ guid: "g-release", accountId: "acc" }); + expect(first.kind).toBe("claimed"); + if (first.kind === "claimed") { + first.release(); + } + expect( + (await claimBlueBubblesInboundMessage({ guid: "g-release", accountId: "acc" })).kind, + ).toBe("claimed"); + }); + it("scopes dedupe per account", async () => { expect(await claimAndFinalize("g1", "a")).toBe("claimed"); expect(await claimAndFinalize("g1", "b")).toBe("claimed"); @@ -61,6 +176,7 @@ describe("claimBlueBubblesInboundMessage", () => { describe("commitBlueBubblesCoalescedMessageIds", () => { beforeEach(() => { + clearBlueBubblesRuntime(); _resetBlueBubblesInboundDedupForTest(); }); diff --git a/extensions/bluebubbles/src/inbound-dedupe.ts b/extensions/bluebubbles/src/inbound-dedupe.ts index b94db99b08d..ccb60e94288 100644 --- a/extensions/bluebubbles/src/inbound-dedupe.ts +++ b/extensions/bluebubbles/src/inbound-dedupe.ts @@ -5,6 +5,7 @@ import { type ClaimableDedupe, createClaimableDedupe } from "openclaw/plugin-sdk import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path"; import type { NormalizedWebhookMessage } from "./monitor-normalize.js"; +import { tryGetBlueBubblesRuntime } from "./runtime.js"; // BlueBubbles has no sequence/ack in its webhook protocol, and its // MessagePoller replays its ~1-week lookback window as `new-message` events @@ -17,6 +18,7 @@ import type { NormalizedWebhookMessage } from "./monitor-normalize.js"; const DEDUP_TTL_MS = 7 * 24 * 60 * 60 * 1_000; const MEMORY_MAX_SIZE = 5_000; const FILE_MAX_ENTRIES = 50_000; +const SQLITE_NAMESPACE_PREFIX = "inbound-dedupe"; // Cap GUID length so a malformed or hostile payload can't bloat the on-disk // dedupe file. Real BB GUIDs are short (<64 chars); 512 is generous. const MAX_GUID_CHARS = 512; @@ -100,8 +102,89 @@ function buildMemoryOnlyImpl(): ClaimableDedupe { }); } +type BlueBubblesSqliteDedupeRecord = { + status: "claimed" | "committed"; + at: number; +}; + +type BlueBubblesSqliteDedupeStore = { + register( + key: string, + value: BlueBubblesSqliteDedupeRecord, + opts?: { ttlMs?: number }, + ): Promise; + registerIfAbsent( + key: string, + value: BlueBubblesSqliteDedupeRecord, + opts?: { ttlMs?: number }, + ): Promise; + lookup(key: string): Promise; + delete(key: string): Promise; +}; + +const sqliteInflightClaims = new Set(); +const sqliteStores = new Map(); let impl: ClaimableDedupe = buildPersistentImpl(); +function isRecord(value: unknown): value is Record { + return Boolean(value && typeof value === "object"); +} + +function isExperimentalPersistentStateEnabled(): boolean { + const runtime = tryGetBlueBubblesRuntime(); + if (!runtime) { + return false; + } + const cfg = runtime.config.current() as unknown; + if (!isRecord(cfg)) { + return false; + } + const plugins = cfg.plugins; + if (!isRecord(plugins)) { + return false; + } + const entries = plugins.entries; + if (!isRecord(entries)) { + return false; + } + const entry = entries.bluebubbles; + if (!isRecord(entry)) { + return false; + } + const pluginConfig = entry.config; + return isRecord(pluginConfig) && pluginConfig.experimentalPersistentState === true; +} + +function resolveSqliteNamespace(accountId: string): string { + const safePrefix = accountId.replace(/[^a-zA-Z0-9_-]/g, "_").slice(0, 48) || "account"; + const hash = createHash("sha256").update(accountId, "utf8").digest("hex").slice(0, 12); + return `${SQLITE_NAMESPACE_PREFIX}.${safePrefix}_${hash}`; +} + +function getSqliteStore(accountId: string): BlueBubblesSqliteDedupeStore { + const namespace = resolveSqliteNamespace(accountId); + const existing = sqliteStores.get(namespace); + if (existing) { + return existing; + } + const runtime = tryGetBlueBubblesRuntime(); + if (!runtime) { + throw new Error("BlueBubbles runtime not initialized"); + } + const store = runtime.state.openKeyedStore({ + namespace, + maxEntries: FILE_MAX_ENTRIES, + maxPluginEntries: FILE_MAX_ENTRIES, + defaultTtlMs: DEDUP_TTL_MS, + }); + sqliteStores.set(namespace, store); + return store; +} + +function resolveSqliteScopedClaimKey(accountId: string, guid: string): string { + return `${accountId}\0${guid}`; +} + function sanitizeGuid(guid: string | undefined | null): string | null { const trimmed = guid?.trim(); if (!trimmed) { @@ -186,6 +269,14 @@ export async function claimBlueBubblesInboundMessage(params: { if (!normalized) { return { kind: "skip" }; } + if (isExperimentalPersistentStateEnabled()) { + return claimBlueBubblesSqliteInboundMessage({ + guid: normalized, + accountId: params.accountId, + onDiskError: params.onDiskError, + }); + } + const claim = await impl.claim(normalized, { namespace: params.accountId, onDiskError: params.onDiskError, @@ -210,6 +301,87 @@ export async function claimBlueBubblesInboundMessage(params: { }; } +async function claimBlueBubblesSqliteInboundMessage(params: { + guid: string; + accountId: string; + onDiskError?: (error: unknown) => void; +}): Promise { + const scopedClaimKey = resolveSqliteScopedClaimKey(params.accountId, params.guid); + if (sqliteInflightClaims.has(scopedClaimKey)) { + return { kind: "inflight" }; + } + sqliteInflightClaims.add(scopedClaimKey); + try { + const store = getSqliteStore(params.accountId); + const inserted = await store.registerIfAbsent( + params.guid, + { status: "claimed", at: Date.now() }, + { ttlMs: DEDUP_TTL_MS }, + ); + if (!inserted) { + sqliteInflightClaims.delete(scopedClaimKey); + return { kind: "duplicate" }; + } + return { + kind: "claimed", + finalize: async () => { + try { + await store.register( + params.guid, + { status: "committed", at: Date.now() }, + { ttlMs: DEDUP_TTL_MS }, + ); + } catch (error) { + params.onDiskError?.(error); + } finally { + sqliteInflightClaims.delete(scopedClaimKey); + } + }, + release: () => { + sqliteInflightClaims.delete(scopedClaimKey); + void store.delete(params.guid).catch(params.onDiskError); + }, + }; + } catch (error) { + sqliteInflightClaims.delete(scopedClaimKey); + params.onDiskError?.(error); + return claimBlueBubblesFileBackedInboundMessage({ + guid: params.guid, + accountId: params.accountId, + onDiskError: params.onDiskError, + }); + } +} + +async function claimBlueBubblesFileBackedInboundMessage(params: { + guid: string; + accountId: string; + onDiskError?: (error: unknown) => void; +}): Promise { + const claim = await impl.claim(params.guid, { + namespace: params.accountId, + onDiskError: params.onDiskError, + }); + if (claim.kind === "duplicate") { + return { kind: "duplicate" }; + } + if (claim.kind === "inflight") { + return { kind: "inflight" }; + } + return { + kind: "claimed", + finalize: async () => { + await impl.commit(params.guid, { + namespace: params.accountId, + onDiskError: params.onDiskError, + }); + }, + release: () => { + impl.release(params.guid, { namespace: params.accountId }); + }, + }; +} + /** * Mark a set of source messageIds as already processed, without going through * the `claim()` protocol. Intended for the coalesced-batch case: when the @@ -232,6 +404,19 @@ export async function commitBlueBubblesCoalescedMessageIds(params: { if (!normalized) { continue; } + if (isExperimentalPersistentStateEnabled()) { + try { + const store = getSqliteStore(params.accountId); + await store.registerIfAbsent( + normalized, + { status: "committed", at: Date.now() }, + { ttlMs: DEDUP_TTL_MS }, + ); + continue; + } catch (error) { + params.onDiskError?.(error); + } + } await impl.commit(normalized, { namespace: params.accountId, onDiskError: params.onDiskError, @@ -246,6 +431,10 @@ export async function commitBlueBubblesCoalescedMessageIds(params: { * file-naming convention changed between versions. */ export async function warmupBlueBubblesInboundDedupe(accountId: string): Promise { + if (isExperimentalPersistentStateEnabled()) { + getSqliteStore(accountId); + return; + } // Trigger the migration side-effect inside resolveNamespaceFilePath. resolveNamespaceFilePath(accountId); await impl.warmup(accountId); @@ -258,4 +447,6 @@ export async function warmupBlueBubblesInboundDedupe(accountId: string): Promise */ export function _resetBlueBubblesInboundDedupForTest(): void { impl = buildMemoryOnlyImpl(); + sqliteInflightClaims.clear(); + sqliteStores.clear(); } diff --git a/extensions/bluebubbles/src/runtime.ts b/extensions/bluebubbles/src/runtime.ts index f8b1098ec1a..05c0564c90d 100644 --- a/extensions/bluebubbles/src/runtime.ts +++ b/extensions/bluebubbles/src/runtime.ts @@ -16,6 +16,10 @@ export function getBlueBubblesRuntime(): PluginRuntime { return runtimeStore.getRuntime(); } +export function tryGetBlueBubblesRuntime(): PluginRuntime | null { + return runtimeStore.tryGetRuntime(); +} + export function warnBlueBubbles(message: string): void { const formatted = `[bluebubbles] ${message}`; // Backward-compatible with tests/legacy injections that pass { log }. diff --git a/src/plugin-state/plugin-state-store.e2e.test.ts b/src/plugin-state/plugin-state-store.e2e.test.ts index d6ab66a72de..b9a769d28ca 100644 --- a/src/plugin-state/plugin-state-store.e2e.test.ts +++ b/src/plugin-state/plugin-state-store.e2e.test.ts @@ -11,7 +11,7 @@ import { sweepExpiredPluginStateEntries, } from "./plugin-state-store.js"; import { resolvePluginStateDir, resolvePluginStateSqlitePath } from "./plugin-state-store.paths.js"; -import { MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN } from "./plugin-state-store.sqlite.js"; +import { DEFAULT_MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN } from "./plugin-state-store.sqlite.js"; import { seedPluginStateEntriesForTests } from "./plugin-state-store.test-helpers.js"; afterEach(() => { @@ -196,12 +196,12 @@ describe("limits", () => { it("enforces the per-plugin live-row cap", async () => { await withOpenClawTestState({ label: "e2e-limit-plugin" }, async () => { - // Spread MAX_ENTRIES_PER_PLUGIN rows across several namespaces so - // namespace eviction never fires (each namespace has generous room). + // Spread the default plugin cap across several namespaces so namespace + // eviction never fires (each namespace has generous room). const nsCount = 10; - const perNs = MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN / nsCount; // 100 + const perNs = DEFAULT_MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN / nsCount; // 100 seedPluginStateEntriesForTests( - Array.from({ length: MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN }, (_, index) => { + Array.from({ length: DEFAULT_MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN }, (_, index) => { const ns = Math.floor(index / perNs); const k = index % perNs; return { diff --git a/src/plugin-state/plugin-state-store.sqlite.ts b/src/plugin-state/plugin-state-store.sqlite.ts index 38125dceb7e..2ed93d2a364 100644 --- a/src/plugin-state/plugin-state-store.sqlite.ts +++ b/src/plugin-state/plugin-state-store.sqlite.ts @@ -19,7 +19,8 @@ const PLUGIN_STATE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const; const MAX_ENTRIES_PER_PLUGIN = 1_000; export const MAX_PLUGIN_STATE_VALUE_BYTES = 65_536; -export const MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN = MAX_ENTRIES_PER_PLUGIN; +export const DEFAULT_MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN = MAX_ENTRIES_PER_PLUGIN; +export const MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN = 50_000; type PluginStateRow = { plugin_id: string; @@ -386,6 +387,7 @@ function enforcePostRegisterLimits(params: { pluginId: string; namespace: string; maxEntries: number; + maxPluginEntries?: number; now: number; }): void { const namespaceCount = countRow( @@ -404,16 +406,17 @@ function enforcePostRegisterLimits(params: { ); } + const maxPluginEntries = params.maxPluginEntries ?? DEFAULT_MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN; const pluginCount = countRow( params.store.statements.countLivePlugin.get(params.pluginId, params.now) as | CountRow | undefined, ); - if (pluginCount > MAX_ENTRIES_PER_PLUGIN) { + if (pluginCount > maxPluginEntries) { throw createPluginStateError({ code: "PLUGIN_STATE_LIMIT_EXCEEDED", operation: "register", - message: `Plugin state for ${params.pluginId} exceeds the ${MAX_ENTRIES_PER_PLUGIN} live row limit.`, + message: `Plugin state for ${params.pluginId} exceeds the ${maxPluginEntries} live row limit.`, path: params.store.path, }); } @@ -425,6 +428,7 @@ export function pluginStateRegister(params: { key: string; valueJson: string; maxEntries: number; + maxPluginEntries?: number; ttlMs?: number; }): void { try { @@ -445,6 +449,7 @@ export function pluginStateRegister(params: { pluginId: params.pluginId, namespace: params.namespace, maxEntries: params.maxEntries, + maxPluginEntries: params.maxPluginEntries, now, }); }); @@ -464,6 +469,7 @@ export function pluginStateRegisterIfAbsent(params: { key: string; valueJson: string; maxEntries: number; + maxPluginEntries?: number; ttlMs?: number; }): boolean { try { @@ -487,6 +493,7 @@ export function pluginStateRegisterIfAbsent(params: { pluginId: params.pluginId, namespace: params.namespace, maxEntries: params.maxEntries, + maxPluginEntries: params.maxPluginEntries, now, }); return true; diff --git a/src/plugin-state/plugin-state-store.test.ts b/src/plugin-state/plugin-state-store.test.ts index 94521be7af1..1f1780a453b 100644 --- a/src/plugin-state/plugin-state-store.test.ts +++ b/src/plugin-state/plugin-state-store.test.ts @@ -301,6 +301,27 @@ describe("plugin state keyed store", () => { }); }); + it("allows a bounded per-plugin live row ceiling increase", async () => { + await withOpenClawTestState({ label: "plugin-state-plugin-limit-override" }, async () => { + seedPluginStateEntriesForTests([ + ...Array.from({ length: 1_000 }, (_, entryIndex) => ({ + pluginId: "bluebubbles", + namespace: "dedupe", + key: `k-${entryIndex}`, + value: { entryIndex }, + })), + ]); + + const store = createPluginStateKeyedStore("bluebubbles", { + namespace: "dedupe", + maxEntries: 1_001, + maxPluginEntries: 1_001, + }); + await expect(store.registerIfAbsent("overflow", { overflow: true })).resolves.toBe(true); + await expect(store.lookup("overflow")).resolves.toEqual({ overflow: true }); + }); + }); + it("segregates plugins sharing a namespace and key", async () => { await withOpenClawTestState({ label: "plugin-state-segregation" }, async () => { const discord = createPluginStateKeyedStore("discord", { namespace: "same", maxEntries: 10 }); @@ -325,6 +346,20 @@ describe("plugin state keyed store", () => { expect(() => createPluginStateKeyedStore("discord", { namespace: "bad-max", maxEntries: 0 }), ).toThrow(PluginStateStoreError); + expect(() => + createPluginStateKeyedStore("discord", { + namespace: "bad-plugin-max", + maxEntries: 10, + maxPluginEntries: 9, + }), + ).toThrow(PluginStateStoreError); + expect(() => + createPluginStateKeyedStore("discord", { + namespace: "too-large-plugin-max", + maxEntries: 10, + maxPluginEntries: 50_001, + }), + ).toThrow(PluginStateStoreError); const store = createPluginStateKeyedStore("discord", { namespace: "valid", maxEntries: 10 }); await expect(store.register(" ", { ok: true })).rejects.toThrow(PluginStateStoreError); diff --git a/src/plugin-state/plugin-state-store.ts b/src/plugin-state/plugin-state-store.ts index 5ae358cedb9..9c887be8806 100644 --- a/src/plugin-state/plugin-state-store.ts +++ b/src/plugin-state/plugin-state-store.ts @@ -1,5 +1,6 @@ import { closePluginStateSqliteStore, + MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN, MAX_PLUGIN_STATE_VALUE_BYTES, pluginStateClear, pluginStateConsume, @@ -42,6 +43,7 @@ const MAX_JSON_DEPTH = 64; type StoreOptionSignature = { maxEntries: number; defaultTtlMs?: number; + maxPluginEntries?: number; }; const namespaceOptionSignatures = new Map(); @@ -93,7 +95,8 @@ function validateMaxEntries(value: number): number { return value; } -function validateOptionalTtlMs( +function validateOptionalPositiveInteger( + label: string, value: number | undefined, operation: PluginStateStoreOperation = "register", ): number | undefined { @@ -101,7 +104,7 @@ function validateOptionalTtlMs( return undefined; } if (!Number.isInteger(value) || value < 1) { - throw invalidInput("plugin state ttlMs must be a positive integer", operation); + throw invalidInput(`plugin state ${label} must be a positive integer`, operation); } return value; } @@ -200,7 +203,8 @@ function assertConsistentOptions( } if ( existing.maxEntries !== signature.maxEntries || - existing.defaultTtlMs !== signature.defaultTtlMs + existing.defaultTtlMs !== signature.defaultTtlMs || + existing.maxPluginEntries !== signature.maxPluginEntries ) { throw invalidInput( `plugin state namespace ${namespace} for ${pluginId} was reopened with incompatible options`, @@ -215,8 +219,22 @@ function createKeyedStoreForPluginId( ): PluginStateKeyedStore { const namespace = validateNamespace(options.namespace); const maxEntries = validateMaxEntries(options.maxEntries); - const defaultTtlMs = validateOptionalTtlMs(options.defaultTtlMs); - assertConsistentOptions(pluginId, namespace, { maxEntries, defaultTtlMs }); + const defaultTtlMs = validateOptionalPositiveInteger("ttlMs", options.defaultTtlMs); + const maxPluginEntries = validateOptionalPositiveInteger( + "maxPluginEntries", + options.maxPluginEntries, + "open", + ); + if (maxPluginEntries != null && maxPluginEntries < maxEntries) { + throw invalidInput("plugin state maxPluginEntries must be >= maxEntries", "open"); + } + if (maxPluginEntries != null && maxPluginEntries > MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN) { + throw invalidInput( + `plugin state maxPluginEntries must be <= ${MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN}`, + "open", + ); + } + assertConsistentOptions(pluginId, namespace, { maxEntries, defaultTtlMs, maxPluginEntries }); const prepareRegisterParams = ( key: string, @@ -227,7 +245,7 @@ function createKeyedStoreForPluginId( assertJsonSerializable(value); const json = JSON.stringify(value); assertValueSize(json); - const ttlMs = validateOptionalTtlMs(opts?.ttlMs, "register") ?? defaultTtlMs; + const ttlMs = validateOptionalPositiveInteger("ttlMs", opts?.ttlMs, "register") ?? defaultTtlMs; return { key: normalizedKey, valueJson: json, @@ -244,6 +262,7 @@ function createKeyedStoreForPluginId( key: params.key, valueJson: params.valueJson, maxEntries, + ...(maxPluginEntries != null ? { maxPluginEntries } : {}), ...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}), }); }, @@ -255,6 +274,7 @@ function createKeyedStoreForPluginId( key: params.key, valueJson: params.valueJson, maxEntries, + ...(maxPluginEntries != null ? { maxPluginEntries } : {}), ...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}), }); }, diff --git a/src/plugin-state/plugin-state-store.types.ts b/src/plugin-state/plugin-state-store.types.ts index 9c07e95a7b3..3e12cb7cd04 100644 --- a/src/plugin-state/plugin-state-store.types.ts +++ b/src/plugin-state/plugin-state-store.types.ts @@ -19,6 +19,7 @@ export type OpenKeyedStoreOptions = { namespace: string; maxEntries: number; defaultTtlMs?: number; + maxPluginEntries?: number; }; export type PluginStateStoreErrorCode =