mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-06 23:55:12 +00:00
feat(plugin-state): add atomic dedupe claims
This commit is contained in:
@@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Changes
|
||||
|
||||
- BlueBubbles: add an experimental opt-in that stores inbound replay dedupe claims in the SQLite-backed plugin runtime state store while preserving the existing file-backed dedupe default. Thanks @amknight.
|
||||
- Control UI/header: show the active agent name in dashboard breadcrumbs without adding the current session key, keeping non-chat views oriented without crowding the topbar.
|
||||
- Control UI/cron: make the New Job sidebar collapsible so the jobs list can reclaim space while keeping the form one click away. Thanks @BunsDev.
|
||||
- Gateway/startup: keep model-catalog test helpers, run-session lookup code, QR pairing helpers, and TypeBox memory-tool schema construction out of hot startup import paths, reducing default gateway benchmark plugin-load and memory pressure.
|
||||
|
||||
@@ -575,6 +575,24 @@ Full configuration: [Configuration](/gateway/configuration)
|
||||
- `channels.bluebubbles.dmHistoryLimit`: DM history limit.
|
||||
- `channels.bluebubbles.replyContextApiFallback`: When an inbound reply lands without `replyToBody`/`replyToSender` and the in-memory reply-context cache misses, fetch the original message from the BlueBubbles HTTP API as a best-effort fallback (default: `false`). Useful for multi-instance deployments sharing one BlueBubbles account, after process restarts, or after long-lived TTL/LRU cache eviction. The fetch is SSRF-guarded by the same policy as every other BlueBubbles client request, never throws, and populates the cache so subsequent replies amortize. Per-account override: `channels.bluebubbles.accounts.<accountId>.replyContextApiFallback`. A channel-level setting propagates to accounts that omit the flag.
|
||||
|
||||
</Accordion>
|
||||
<Accordion title="Experimental plugin state">
|
||||
- `plugins.entries.bluebubbles.config.experimentalPersistentState`: Opt in to the experimental SQLite-backed plugin runtime state store for inbound webhook dedupe. Default: `false` (the existing file-backed dedupe remains the safe default). When enabled, BlueBubbles uses an atomic durable claim for each inbound GUID, keeps the same 7-day TTL, and requests a 50,000-row plugin-state capacity so high-volume replay dedupe is not reduced by the runtime state's default 1,000-row cap.
|
||||
|
||||
```json5
|
||||
{
|
||||
plugins: {
|
||||
entries: {
|
||||
bluebubbles: {
|
||||
config: {
|
||||
experimentalPersistentState: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
```
|
||||
|
||||
</Accordion>
|
||||
<Accordion title="Actions and accounts">
|
||||
- `channels.bluebubbles.actions`: Enable/disable specific actions.
|
||||
|
||||
@@ -423,7 +423,7 @@ Provider and channel execution paths must use the active runtime config snapshot
|
||||
await store.clear();
|
||||
```
|
||||
|
||||
Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Use `registerIfAbsent(...)` for atomic dedupe claims: it returns `true` when the key was missing or expired and registered, or `false` when a live value already exists without overwriting its value, creation time, or TTL. Limits: `maxEntries` per namespace, 1,000 live rows per plugin, JSON values under 64KB, and optional TTL expiry.
|
||||
Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Use `registerIfAbsent(...)` for atomic dedupe claims: it returns `true` when the key was missing or expired and registered, or `false` when a live value already exists without overwriting its value, creation time, or TTL. Limits: `maxEntries` per namespace, 1,000 live rows per plugin by default, an optional bounded `maxPluginEntries` override for high-volume stores, JSON values under 64KB, and optional TTL expiry.
|
||||
|
||||
<Warning>
|
||||
Bundled plugins only in this release.
|
||||
|
||||
@@ -7,6 +7,8 @@
|
||||
"configSchema": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"properties": {}
|
||||
"properties": {
|
||||
"experimentalPersistentState": { "type": "boolean" }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,74 @@
|
||||
import { beforeEach, describe, expect, it } from "vitest";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
_resetBlueBubblesInboundDedupForTest,
|
||||
claimBlueBubblesInboundMessage,
|
||||
commitBlueBubblesCoalescedMessageIds,
|
||||
resolveBlueBubblesInboundDedupeKey,
|
||||
} from "./inbound-dedupe.js";
|
||||
import type { PluginRuntime } from "./runtime-api.js";
|
||||
import { clearBlueBubblesRuntime, setBlueBubblesRuntime } from "./runtime.js";
|
||||
|
||||
type RuntimeStateRecord = { status: "claimed" | "committed"; at: number };
|
||||
|
||||
type RuntimeStateStore = {
|
||||
register: (key: string, value: RuntimeStateRecord, opts?: { ttlMs?: number }) => Promise<void>;
|
||||
registerIfAbsent: (
|
||||
key: string,
|
||||
value: RuntimeStateRecord,
|
||||
opts?: { ttlMs?: number },
|
||||
) => Promise<boolean>;
|
||||
lookup: (key: string) => Promise<RuntimeStateRecord | undefined>;
|
||||
delete: (key: string) => Promise<boolean>;
|
||||
entries: () => Promise<unknown[]>;
|
||||
clear: () => Promise<void>;
|
||||
};
|
||||
|
||||
function createMemoryRuntimeStateStore(): RuntimeStateStore {
|
||||
const entries = new Map<string, RuntimeStateRecord>();
|
||||
return {
|
||||
async register(key, value) {
|
||||
entries.set(key, value);
|
||||
},
|
||||
async registerIfAbsent(key, value) {
|
||||
if (entries.has(key)) {
|
||||
return false;
|
||||
}
|
||||
entries.set(key, value);
|
||||
return true;
|
||||
},
|
||||
async lookup(key) {
|
||||
return entries.get(key);
|
||||
},
|
||||
async delete(key) {
|
||||
return entries.delete(key);
|
||||
},
|
||||
async entries() {
|
||||
return [...entries.entries()];
|
||||
},
|
||||
async clear() {
|
||||
entries.clear();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function installRuntimeStateStub(enabled: boolean, store = createMemoryRuntimeStateStore()) {
|
||||
const openKeyedStore = vi.fn(() => store);
|
||||
setBlueBubblesRuntime({
|
||||
config: {
|
||||
current: () => ({
|
||||
plugins: {
|
||||
entries: {
|
||||
bluebubbles: {
|
||||
config: { experimentalPersistentState: enabled },
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
},
|
||||
state: { openKeyedStore },
|
||||
} as unknown as PluginRuntime);
|
||||
return { openKeyedStore, store };
|
||||
}
|
||||
|
||||
async function claimAndFinalize(guid: string | undefined, accountId: string): Promise<string> {
|
||||
const claim = await claimBlueBubblesInboundMessage({ guid, accountId });
|
||||
@@ -16,6 +80,7 @@ async function claimAndFinalize(guid: string | undefined, accountId: string): Pr
|
||||
|
||||
describe("claimBlueBubblesInboundMessage", () => {
|
||||
beforeEach(() => {
|
||||
clearBlueBubblesRuntime();
|
||||
_resetBlueBubblesInboundDedupForTest();
|
||||
});
|
||||
|
||||
@@ -24,6 +89,56 @@ describe("claimBlueBubblesInboundMessage", () => {
|
||||
expect(await claimAndFinalize("g1", "acc")).toBe("duplicate");
|
||||
});
|
||||
|
||||
it("keeps file-backed dedupe as the default when persistent state is not opted in", async () => {
|
||||
const { openKeyedStore } = installRuntimeStateStub(false);
|
||||
|
||||
expect(await claimAndFinalize("g-default", "acc")).toBe("claimed");
|
||||
expect(await claimAndFinalize("g-default", "acc")).toBe("duplicate");
|
||||
expect(openKeyedStore).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("uses runtime state registerIfAbsent when persistent state is opted in", async () => {
|
||||
const { openKeyedStore, store } = installRuntimeStateStub(true);
|
||||
const registerIfAbsent = vi.spyOn(store, "registerIfAbsent");
|
||||
|
||||
const first = await claimBlueBubblesInboundMessage({ guid: "g-sqlite", accountId: "acc" });
|
||||
expect(first.kind).toBe("claimed");
|
||||
const second = await claimBlueBubblesInboundMessage({ guid: "g-sqlite", accountId: "acc" });
|
||||
expect(second.kind).toBe("inflight");
|
||||
if (first.kind === "claimed") {
|
||||
await first.finalize();
|
||||
}
|
||||
|
||||
expect(
|
||||
(await claimBlueBubblesInboundMessage({ guid: "g-sqlite", accountId: "acc" })).kind,
|
||||
).toBe("duplicate");
|
||||
expect(openKeyedStore).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
namespace: expect.stringMatching(/^inbound-dedupe\./),
|
||||
maxEntries: 50_000,
|
||||
maxPluginEntries: 50_000,
|
||||
}),
|
||||
);
|
||||
expect(registerIfAbsent).toHaveBeenCalledWith(
|
||||
"g-sqlite",
|
||||
expect.objectContaining({ status: "claimed" }),
|
||||
{ ttlMs: 7 * 24 * 60 * 60 * 1_000 },
|
||||
);
|
||||
});
|
||||
|
||||
it("releases opted-in runtime state claims so later replays can retry", async () => {
|
||||
installRuntimeStateStub(true);
|
||||
|
||||
const first = await claimBlueBubblesInboundMessage({ guid: "g-release", accountId: "acc" });
|
||||
expect(first.kind).toBe("claimed");
|
||||
if (first.kind === "claimed") {
|
||||
first.release();
|
||||
}
|
||||
expect(
|
||||
(await claimBlueBubblesInboundMessage({ guid: "g-release", accountId: "acc" })).kind,
|
||||
).toBe("claimed");
|
||||
});
|
||||
|
||||
it("scopes dedupe per account", async () => {
|
||||
expect(await claimAndFinalize("g1", "a")).toBe("claimed");
|
||||
expect(await claimAndFinalize("g1", "b")).toBe("claimed");
|
||||
@@ -61,6 +176,7 @@ describe("claimBlueBubblesInboundMessage", () => {
|
||||
|
||||
describe("commitBlueBubblesCoalescedMessageIds", () => {
|
||||
beforeEach(() => {
|
||||
clearBlueBubblesRuntime();
|
||||
_resetBlueBubblesInboundDedupForTest();
|
||||
});
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import { type ClaimableDedupe, createClaimableDedupe } from "openclaw/plugin-sdk
|
||||
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
|
||||
import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path";
|
||||
import type { NormalizedWebhookMessage } from "./monitor-normalize.js";
|
||||
import { tryGetBlueBubblesRuntime } from "./runtime.js";
|
||||
|
||||
// BlueBubbles has no sequence/ack in its webhook protocol, and its
|
||||
// MessagePoller replays its ~1-week lookback window as `new-message` events
|
||||
@@ -17,6 +18,7 @@ import type { NormalizedWebhookMessage } from "./monitor-normalize.js";
|
||||
const DEDUP_TTL_MS = 7 * 24 * 60 * 60 * 1_000;
|
||||
const MEMORY_MAX_SIZE = 5_000;
|
||||
const FILE_MAX_ENTRIES = 50_000;
|
||||
const SQLITE_NAMESPACE_PREFIX = "inbound-dedupe";
|
||||
// Cap GUID length so a malformed or hostile payload can't bloat the on-disk
|
||||
// dedupe file. Real BB GUIDs are short (<64 chars); 512 is generous.
|
||||
const MAX_GUID_CHARS = 512;
|
||||
@@ -100,8 +102,89 @@ function buildMemoryOnlyImpl(): ClaimableDedupe {
|
||||
});
|
||||
}
|
||||
|
||||
type BlueBubblesSqliteDedupeRecord = {
|
||||
status: "claimed" | "committed";
|
||||
at: number;
|
||||
};
|
||||
|
||||
type BlueBubblesSqliteDedupeStore = {
|
||||
register(
|
||||
key: string,
|
||||
value: BlueBubblesSqliteDedupeRecord,
|
||||
opts?: { ttlMs?: number },
|
||||
): Promise<void>;
|
||||
registerIfAbsent(
|
||||
key: string,
|
||||
value: BlueBubblesSqliteDedupeRecord,
|
||||
opts?: { ttlMs?: number },
|
||||
): Promise<boolean>;
|
||||
lookup(key: string): Promise<BlueBubblesSqliteDedupeRecord | undefined>;
|
||||
delete(key: string): Promise<boolean>;
|
||||
};
|
||||
|
||||
const sqliteInflightClaims = new Set<string>();
|
||||
const sqliteStores = new Map<string, BlueBubblesSqliteDedupeStore>();
|
||||
let impl: ClaimableDedupe = buildPersistentImpl();
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
return Boolean(value && typeof value === "object");
|
||||
}
|
||||
|
||||
function isExperimentalPersistentStateEnabled(): boolean {
|
||||
const runtime = tryGetBlueBubblesRuntime();
|
||||
if (!runtime) {
|
||||
return false;
|
||||
}
|
||||
const cfg = runtime.config.current() as unknown;
|
||||
if (!isRecord(cfg)) {
|
||||
return false;
|
||||
}
|
||||
const plugins = cfg.plugins;
|
||||
if (!isRecord(plugins)) {
|
||||
return false;
|
||||
}
|
||||
const entries = plugins.entries;
|
||||
if (!isRecord(entries)) {
|
||||
return false;
|
||||
}
|
||||
const entry = entries.bluebubbles;
|
||||
if (!isRecord(entry)) {
|
||||
return false;
|
||||
}
|
||||
const pluginConfig = entry.config;
|
||||
return isRecord(pluginConfig) && pluginConfig.experimentalPersistentState === true;
|
||||
}
|
||||
|
||||
function resolveSqliteNamespace(accountId: string): string {
|
||||
const safePrefix = accountId.replace(/[^a-zA-Z0-9_-]/g, "_").slice(0, 48) || "account";
|
||||
const hash = createHash("sha256").update(accountId, "utf8").digest("hex").slice(0, 12);
|
||||
return `${SQLITE_NAMESPACE_PREFIX}.${safePrefix}_${hash}`;
|
||||
}
|
||||
|
||||
function getSqliteStore(accountId: string): BlueBubblesSqliteDedupeStore {
|
||||
const namespace = resolveSqliteNamespace(accountId);
|
||||
const existing = sqliteStores.get(namespace);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const runtime = tryGetBlueBubblesRuntime();
|
||||
if (!runtime) {
|
||||
throw new Error("BlueBubbles runtime not initialized");
|
||||
}
|
||||
const store = runtime.state.openKeyedStore<BlueBubblesSqliteDedupeRecord>({
|
||||
namespace,
|
||||
maxEntries: FILE_MAX_ENTRIES,
|
||||
maxPluginEntries: FILE_MAX_ENTRIES,
|
||||
defaultTtlMs: DEDUP_TTL_MS,
|
||||
});
|
||||
sqliteStores.set(namespace, store);
|
||||
return store;
|
||||
}
|
||||
|
||||
function resolveSqliteScopedClaimKey(accountId: string, guid: string): string {
|
||||
return `${accountId}\0${guid}`;
|
||||
}
|
||||
|
||||
function sanitizeGuid(guid: string | undefined | null): string | null {
|
||||
const trimmed = guid?.trim();
|
||||
if (!trimmed) {
|
||||
@@ -186,6 +269,14 @@ export async function claimBlueBubblesInboundMessage(params: {
|
||||
if (!normalized) {
|
||||
return { kind: "skip" };
|
||||
}
|
||||
if (isExperimentalPersistentStateEnabled()) {
|
||||
return claimBlueBubblesSqliteInboundMessage({
|
||||
guid: normalized,
|
||||
accountId: params.accountId,
|
||||
onDiskError: params.onDiskError,
|
||||
});
|
||||
}
|
||||
|
||||
const claim = await impl.claim(normalized, {
|
||||
namespace: params.accountId,
|
||||
onDiskError: params.onDiskError,
|
||||
@@ -210,6 +301,87 @@ export async function claimBlueBubblesInboundMessage(params: {
|
||||
};
|
||||
}
|
||||
|
||||
async function claimBlueBubblesSqliteInboundMessage(params: {
|
||||
guid: string;
|
||||
accountId: string;
|
||||
onDiskError?: (error: unknown) => void;
|
||||
}): Promise<InboundDedupeClaim> {
|
||||
const scopedClaimKey = resolveSqliteScopedClaimKey(params.accountId, params.guid);
|
||||
if (sqliteInflightClaims.has(scopedClaimKey)) {
|
||||
return { kind: "inflight" };
|
||||
}
|
||||
sqliteInflightClaims.add(scopedClaimKey);
|
||||
try {
|
||||
const store = getSqliteStore(params.accountId);
|
||||
const inserted = await store.registerIfAbsent(
|
||||
params.guid,
|
||||
{ status: "claimed", at: Date.now() },
|
||||
{ ttlMs: DEDUP_TTL_MS },
|
||||
);
|
||||
if (!inserted) {
|
||||
sqliteInflightClaims.delete(scopedClaimKey);
|
||||
return { kind: "duplicate" };
|
||||
}
|
||||
return {
|
||||
kind: "claimed",
|
||||
finalize: async () => {
|
||||
try {
|
||||
await store.register(
|
||||
params.guid,
|
||||
{ status: "committed", at: Date.now() },
|
||||
{ ttlMs: DEDUP_TTL_MS },
|
||||
);
|
||||
} catch (error) {
|
||||
params.onDiskError?.(error);
|
||||
} finally {
|
||||
sqliteInflightClaims.delete(scopedClaimKey);
|
||||
}
|
||||
},
|
||||
release: () => {
|
||||
sqliteInflightClaims.delete(scopedClaimKey);
|
||||
void store.delete(params.guid).catch(params.onDiskError);
|
||||
},
|
||||
};
|
||||
} catch (error) {
|
||||
sqliteInflightClaims.delete(scopedClaimKey);
|
||||
params.onDiskError?.(error);
|
||||
return claimBlueBubblesFileBackedInboundMessage({
|
||||
guid: params.guid,
|
||||
accountId: params.accountId,
|
||||
onDiskError: params.onDiskError,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async function claimBlueBubblesFileBackedInboundMessage(params: {
|
||||
guid: string;
|
||||
accountId: string;
|
||||
onDiskError?: (error: unknown) => void;
|
||||
}): Promise<InboundDedupeClaim> {
|
||||
const claim = await impl.claim(params.guid, {
|
||||
namespace: params.accountId,
|
||||
onDiskError: params.onDiskError,
|
||||
});
|
||||
if (claim.kind === "duplicate") {
|
||||
return { kind: "duplicate" };
|
||||
}
|
||||
if (claim.kind === "inflight") {
|
||||
return { kind: "inflight" };
|
||||
}
|
||||
return {
|
||||
kind: "claimed",
|
||||
finalize: async () => {
|
||||
await impl.commit(params.guid, {
|
||||
namespace: params.accountId,
|
||||
onDiskError: params.onDiskError,
|
||||
});
|
||||
},
|
||||
release: () => {
|
||||
impl.release(params.guid, { namespace: params.accountId });
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a set of source messageIds as already processed, without going through
|
||||
* the `claim()` protocol. Intended for the coalesced-batch case: when the
|
||||
@@ -232,6 +404,19 @@ export async function commitBlueBubblesCoalescedMessageIds(params: {
|
||||
if (!normalized) {
|
||||
continue;
|
||||
}
|
||||
if (isExperimentalPersistentStateEnabled()) {
|
||||
try {
|
||||
const store = getSqliteStore(params.accountId);
|
||||
await store.registerIfAbsent(
|
||||
normalized,
|
||||
{ status: "committed", at: Date.now() },
|
||||
{ ttlMs: DEDUP_TTL_MS },
|
||||
);
|
||||
continue;
|
||||
} catch (error) {
|
||||
params.onDiskError?.(error);
|
||||
}
|
||||
}
|
||||
await impl.commit(normalized, {
|
||||
namespace: params.accountId,
|
||||
onDiskError: params.onDiskError,
|
||||
@@ -246,6 +431,10 @@ export async function commitBlueBubblesCoalescedMessageIds(params: {
|
||||
* file-naming convention changed between versions.
|
||||
*/
|
||||
export async function warmupBlueBubblesInboundDedupe(accountId: string): Promise<void> {
|
||||
if (isExperimentalPersistentStateEnabled()) {
|
||||
getSqliteStore(accountId);
|
||||
return;
|
||||
}
|
||||
// Trigger the migration side-effect inside resolveNamespaceFilePath.
|
||||
resolveNamespaceFilePath(accountId);
|
||||
await impl.warmup(accountId);
|
||||
@@ -258,4 +447,6 @@ export async function warmupBlueBubblesInboundDedupe(accountId: string): Promise
|
||||
*/
|
||||
export function _resetBlueBubblesInboundDedupForTest(): void {
|
||||
impl = buildMemoryOnlyImpl();
|
||||
sqliteInflightClaims.clear();
|
||||
sqliteStores.clear();
|
||||
}
|
||||
|
||||
@@ -16,6 +16,10 @@ export function getBlueBubblesRuntime(): PluginRuntime {
|
||||
return runtimeStore.getRuntime();
|
||||
}
|
||||
|
||||
export function tryGetBlueBubblesRuntime(): PluginRuntime | null {
|
||||
return runtimeStore.tryGetRuntime();
|
||||
}
|
||||
|
||||
export function warnBlueBubbles(message: string): void {
|
||||
const formatted = `[bluebubbles] ${message}`;
|
||||
// Backward-compatible with tests/legacy injections that pass { log }.
|
||||
|
||||
@@ -11,7 +11,7 @@ import {
|
||||
sweepExpiredPluginStateEntries,
|
||||
} from "./plugin-state-store.js";
|
||||
import { resolvePluginStateDir, resolvePluginStateSqlitePath } from "./plugin-state-store.paths.js";
|
||||
import { MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN } from "./plugin-state-store.sqlite.js";
|
||||
import { DEFAULT_MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN } from "./plugin-state-store.sqlite.js";
|
||||
import { seedPluginStateEntriesForTests } from "./plugin-state-store.test-helpers.js";
|
||||
|
||||
afterEach(() => {
|
||||
@@ -196,12 +196,12 @@ describe("limits", () => {
|
||||
|
||||
it("enforces the per-plugin live-row cap", async () => {
|
||||
await withOpenClawTestState({ label: "e2e-limit-plugin" }, async () => {
|
||||
// Spread MAX_ENTRIES_PER_PLUGIN rows across several namespaces so
|
||||
// namespace eviction never fires (each namespace has generous room).
|
||||
// Spread the default plugin cap across several namespaces so namespace
|
||||
// eviction never fires (each namespace has generous room).
|
||||
const nsCount = 10;
|
||||
const perNs = MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN / nsCount; // 100
|
||||
const perNs = DEFAULT_MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN / nsCount; // 100
|
||||
seedPluginStateEntriesForTests(
|
||||
Array.from({ length: MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN }, (_, index) => {
|
||||
Array.from({ length: DEFAULT_MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN }, (_, index) => {
|
||||
const ns = Math.floor(index / perNs);
|
||||
const k = index % perNs;
|
||||
return {
|
||||
|
||||
@@ -19,7 +19,8 @@ const PLUGIN_STATE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
|
||||
const MAX_ENTRIES_PER_PLUGIN = 1_000;
|
||||
|
||||
export const MAX_PLUGIN_STATE_VALUE_BYTES = 65_536;
|
||||
export const MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN = MAX_ENTRIES_PER_PLUGIN;
|
||||
export const DEFAULT_MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN = MAX_ENTRIES_PER_PLUGIN;
|
||||
export const MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN = 50_000;
|
||||
|
||||
type PluginStateRow = {
|
||||
plugin_id: string;
|
||||
@@ -386,6 +387,7 @@ function enforcePostRegisterLimits(params: {
|
||||
pluginId: string;
|
||||
namespace: string;
|
||||
maxEntries: number;
|
||||
maxPluginEntries?: number;
|
||||
now: number;
|
||||
}): void {
|
||||
const namespaceCount = countRow(
|
||||
@@ -404,16 +406,17 @@ function enforcePostRegisterLimits(params: {
|
||||
);
|
||||
}
|
||||
|
||||
const maxPluginEntries = params.maxPluginEntries ?? DEFAULT_MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN;
|
||||
const pluginCount = countRow(
|
||||
params.store.statements.countLivePlugin.get(params.pluginId, params.now) as
|
||||
| CountRow
|
||||
| undefined,
|
||||
);
|
||||
if (pluginCount > MAX_ENTRIES_PER_PLUGIN) {
|
||||
if (pluginCount > maxPluginEntries) {
|
||||
throw createPluginStateError({
|
||||
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
|
||||
operation: "register",
|
||||
message: `Plugin state for ${params.pluginId} exceeds the ${MAX_ENTRIES_PER_PLUGIN} live row limit.`,
|
||||
message: `Plugin state for ${params.pluginId} exceeds the ${maxPluginEntries} live row limit.`,
|
||||
path: params.store.path,
|
||||
});
|
||||
}
|
||||
@@ -425,6 +428,7 @@ export function pluginStateRegister(params: {
|
||||
key: string;
|
||||
valueJson: string;
|
||||
maxEntries: number;
|
||||
maxPluginEntries?: number;
|
||||
ttlMs?: number;
|
||||
}): void {
|
||||
try {
|
||||
@@ -445,6 +449,7 @@ export function pluginStateRegister(params: {
|
||||
pluginId: params.pluginId,
|
||||
namespace: params.namespace,
|
||||
maxEntries: params.maxEntries,
|
||||
maxPluginEntries: params.maxPluginEntries,
|
||||
now,
|
||||
});
|
||||
});
|
||||
@@ -464,6 +469,7 @@ export function pluginStateRegisterIfAbsent(params: {
|
||||
key: string;
|
||||
valueJson: string;
|
||||
maxEntries: number;
|
||||
maxPluginEntries?: number;
|
||||
ttlMs?: number;
|
||||
}): boolean {
|
||||
try {
|
||||
@@ -487,6 +493,7 @@ export function pluginStateRegisterIfAbsent(params: {
|
||||
pluginId: params.pluginId,
|
||||
namespace: params.namespace,
|
||||
maxEntries: params.maxEntries,
|
||||
maxPluginEntries: params.maxPluginEntries,
|
||||
now,
|
||||
});
|
||||
return true;
|
||||
|
||||
@@ -301,6 +301,27 @@ describe("plugin state keyed store", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("allows a bounded per-plugin live row ceiling increase", async () => {
|
||||
await withOpenClawTestState({ label: "plugin-state-plugin-limit-override" }, async () => {
|
||||
seedPluginStateEntriesForTests([
|
||||
...Array.from({ length: 1_000 }, (_, entryIndex) => ({
|
||||
pluginId: "bluebubbles",
|
||||
namespace: "dedupe",
|
||||
key: `k-${entryIndex}`,
|
||||
value: { entryIndex },
|
||||
})),
|
||||
]);
|
||||
|
||||
const store = createPluginStateKeyedStore("bluebubbles", {
|
||||
namespace: "dedupe",
|
||||
maxEntries: 1_001,
|
||||
maxPluginEntries: 1_001,
|
||||
});
|
||||
await expect(store.registerIfAbsent("overflow", { overflow: true })).resolves.toBe(true);
|
||||
await expect(store.lookup("overflow")).resolves.toEqual({ overflow: true });
|
||||
});
|
||||
});
|
||||
|
||||
it("segregates plugins sharing a namespace and key", async () => {
|
||||
await withOpenClawTestState({ label: "plugin-state-segregation" }, async () => {
|
||||
const discord = createPluginStateKeyedStore("discord", { namespace: "same", maxEntries: 10 });
|
||||
@@ -325,6 +346,20 @@ describe("plugin state keyed store", () => {
|
||||
expect(() =>
|
||||
createPluginStateKeyedStore("discord", { namespace: "bad-max", maxEntries: 0 }),
|
||||
).toThrow(PluginStateStoreError);
|
||||
expect(() =>
|
||||
createPluginStateKeyedStore("discord", {
|
||||
namespace: "bad-plugin-max",
|
||||
maxEntries: 10,
|
||||
maxPluginEntries: 9,
|
||||
}),
|
||||
).toThrow(PluginStateStoreError);
|
||||
expect(() =>
|
||||
createPluginStateKeyedStore("discord", {
|
||||
namespace: "too-large-plugin-max",
|
||||
maxEntries: 10,
|
||||
maxPluginEntries: 50_001,
|
||||
}),
|
||||
).toThrow(PluginStateStoreError);
|
||||
|
||||
const store = createPluginStateKeyedStore("discord", { namespace: "valid", maxEntries: 10 });
|
||||
await expect(store.register(" ", { ok: true })).rejects.toThrow(PluginStateStoreError);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import {
|
||||
closePluginStateSqliteStore,
|
||||
MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN,
|
||||
MAX_PLUGIN_STATE_VALUE_BYTES,
|
||||
pluginStateClear,
|
||||
pluginStateConsume,
|
||||
@@ -42,6 +43,7 @@ const MAX_JSON_DEPTH = 64;
|
||||
type StoreOptionSignature = {
|
||||
maxEntries: number;
|
||||
defaultTtlMs?: number;
|
||||
maxPluginEntries?: number;
|
||||
};
|
||||
|
||||
const namespaceOptionSignatures = new Map<string, StoreOptionSignature>();
|
||||
@@ -93,7 +95,8 @@ function validateMaxEntries(value: number): number {
|
||||
return value;
|
||||
}
|
||||
|
||||
function validateOptionalTtlMs(
|
||||
function validateOptionalPositiveInteger(
|
||||
label: string,
|
||||
value: number | undefined,
|
||||
operation: PluginStateStoreOperation = "register",
|
||||
): number | undefined {
|
||||
@@ -101,7 +104,7 @@ function validateOptionalTtlMs(
|
||||
return undefined;
|
||||
}
|
||||
if (!Number.isInteger(value) || value < 1) {
|
||||
throw invalidInput("plugin state ttlMs must be a positive integer", operation);
|
||||
throw invalidInput(`plugin state ${label} must be a positive integer`, operation);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
@@ -200,7 +203,8 @@ function assertConsistentOptions(
|
||||
}
|
||||
if (
|
||||
existing.maxEntries !== signature.maxEntries ||
|
||||
existing.defaultTtlMs !== signature.defaultTtlMs
|
||||
existing.defaultTtlMs !== signature.defaultTtlMs ||
|
||||
existing.maxPluginEntries !== signature.maxPluginEntries
|
||||
) {
|
||||
throw invalidInput(
|
||||
`plugin state namespace ${namespace} for ${pluginId} was reopened with incompatible options`,
|
||||
@@ -215,8 +219,22 @@ function createKeyedStoreForPluginId<T>(
|
||||
): PluginStateKeyedStore<T> {
|
||||
const namespace = validateNamespace(options.namespace);
|
||||
const maxEntries = validateMaxEntries(options.maxEntries);
|
||||
const defaultTtlMs = validateOptionalTtlMs(options.defaultTtlMs);
|
||||
assertConsistentOptions(pluginId, namespace, { maxEntries, defaultTtlMs });
|
||||
const defaultTtlMs = validateOptionalPositiveInteger("ttlMs", options.defaultTtlMs);
|
||||
const maxPluginEntries = validateOptionalPositiveInteger(
|
||||
"maxPluginEntries",
|
||||
options.maxPluginEntries,
|
||||
"open",
|
||||
);
|
||||
if (maxPluginEntries != null && maxPluginEntries < maxEntries) {
|
||||
throw invalidInput("plugin state maxPluginEntries must be >= maxEntries", "open");
|
||||
}
|
||||
if (maxPluginEntries != null && maxPluginEntries > MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN) {
|
||||
throw invalidInput(
|
||||
`plugin state maxPluginEntries must be <= ${MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN}`,
|
||||
"open",
|
||||
);
|
||||
}
|
||||
assertConsistentOptions(pluginId, namespace, { maxEntries, defaultTtlMs, maxPluginEntries });
|
||||
|
||||
const prepareRegisterParams = (
|
||||
key: string,
|
||||
@@ -227,7 +245,7 @@ function createKeyedStoreForPluginId<T>(
|
||||
assertJsonSerializable(value);
|
||||
const json = JSON.stringify(value);
|
||||
assertValueSize(json);
|
||||
const ttlMs = validateOptionalTtlMs(opts?.ttlMs, "register") ?? defaultTtlMs;
|
||||
const ttlMs = validateOptionalPositiveInteger("ttlMs", opts?.ttlMs, "register") ?? defaultTtlMs;
|
||||
return {
|
||||
key: normalizedKey,
|
||||
valueJson: json,
|
||||
@@ -244,6 +262,7 @@ function createKeyedStoreForPluginId<T>(
|
||||
key: params.key,
|
||||
valueJson: params.valueJson,
|
||||
maxEntries,
|
||||
...(maxPluginEntries != null ? { maxPluginEntries } : {}),
|
||||
...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}),
|
||||
});
|
||||
},
|
||||
@@ -255,6 +274,7 @@ function createKeyedStoreForPluginId<T>(
|
||||
key: params.key,
|
||||
valueJson: params.valueJson,
|
||||
maxEntries,
|
||||
...(maxPluginEntries != null ? { maxPluginEntries } : {}),
|
||||
...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}),
|
||||
});
|
||||
},
|
||||
|
||||
@@ -19,6 +19,7 @@ export type OpenKeyedStoreOptions = {
|
||||
namespace: string;
|
||||
maxEntries: number;
|
||||
defaultTtlMs?: number;
|
||||
maxPluginEntries?: number;
|
||||
};
|
||||
|
||||
export type PluginStateStoreErrorCode =
|
||||
|
||||
Reference in New Issue
Block a user