import type { DatabaseSync } from "node:sqlite";
import type { Insertable, Selectable } from "kysely";
import {
  executeSqliteQuerySync,
  executeSqliteQueryTakeFirstSync,
  getNodeSqliteKysely,
} from "../infra/kysely-sync.js";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js";
import {
  openOpenClawStateDatabase,
  type OpenClawStateDatabaseOptions,
  runOpenClawStateWriteTransaction,
} from "../state/openclaw-state-db.js";
import { resolvePluginStateSqlitePath } from "./plugin-state-store.paths.js";
import {
  PluginStateStoreError,
  type PluginStateEntry,
  type PluginStateStoreErrorCode,
  type PluginStateStoreOperation,
  type PluginStateStoreProbeResult,
  type PluginStateStoreProbeStep,
} from "./plugin-state-store.types.js";

/**
 * Byte budget for a serialized plugin state value.
 * NOTE(review): not enforced anywhere in this module — presumably checked by callers; confirm.
 */
export const MAX_PLUGIN_STATE_VALUE_BYTES = 65_536;

type PluginStateEntriesTable = OpenClawStateKyselyDatabase["plugin_state_entries"];
type PluginStateStoreDatabase = Pick<OpenClawStateKyselyDatabase, "plugin_state_entries">;
type PluginStateRow = Selectable<PluginStateEntriesTable>;

type CountRow = {
  count: number | bigint;
};

type PluginStateDatabase = {
  db: DatabaseSync;
  path: string;
};

type PluginStateSeedEntryForTests = {
  pluginId: string;
  namespace: string;
  key: string;
  valueJson: string;
  createdAt?: number;
  expiresAt?: number | null;
};

/** Most recently opened database handle, reused while its path matches and it is still open. */
let cachedDatabase: PluginStateDatabase | null = null;

/** Converts a numeric column value to `number`; `null` (or any non-numeric value) becomes `undefined`. */
function normalizeNumber(value: number | bigint | null): number | undefined {
  if (typeof value === "bigint") {
    return Number(value);
  }
  return typeof value === "number" ? value : undefined;
}

/** Builds a `PluginStateStoreError`, attaching `path` only when a non-empty one is supplied. */
function createPluginStateError(params: {
  code: PluginStateStoreErrorCode;
  operation: PluginStateStoreOperation;
  message: string;
  path?: string;
  cause?: unknown;
}): PluginStateStoreError {
  return new PluginStateStoreError(params.message, {
    code: params.code,
    operation: params.operation,
    ...(params.path ? { path: params.path } : {}),
    cause: params.cause,
  });
}

/**
 * Normalizes an arbitrary thrown value into a `PluginStateStoreError`.
 *
 * - An existing `PluginStateStoreError` is returned unchanged.
 * - An error whose message mentions "schema version" is reported as
 *   `PLUGIN_STATE_SCHEMA_UNSUPPORTED` under the `ensure-schema` operation, keeping the
 *   original message.
 * - Anything else is wrapped with `fallbackCode` and the supplied `message`.
 *
 * @param pathname Database path recorded on the error; defaults to the path resolved
 *   from `process.env`.
 */
function wrapPluginStateError(
  error: unknown,
  operation: PluginStateStoreOperation,
  fallbackCode: PluginStateStoreErrorCode,
  message: string,
  pathname = resolvePluginStateSqlitePath(process.env),
): PluginStateStoreError {
  if (error instanceof PluginStateStoreError) {
    return error;
  }
  const errorMessage = error instanceof Error ? error.message : String(error);
  if (errorMessage.includes("schema version")) {
    return createPluginStateError({
      code: "PLUGIN_STATE_SCHEMA_UNSUPPORTED",
      operation: "ensure-schema",
      message: errorMessage,
      path: pathname,
      cause: error,
    });
  }
  return createPluginStateError({
    code: fallbackCode,
    operation,
    message,
    path: pathname,
    cause: error,
  });
}

/**
 * Parses a stored JSON payload.
 *
 * @param env Environment used to resolve the database path reported on failure;
 *   defaults to `process.env`.
 * @throws PluginStateStoreError with code `PLUGIN_STATE_CORRUPT` when the payload is not valid JSON.
 */
function parseStoredJson(
  raw: string,
  operation: PluginStateStoreOperation,
  env: NodeJS.ProcessEnv = process.env,
): unknown {
  try {
    return JSON.parse(raw) as unknown;
  } catch (error) {
    throw createPluginStateError({
      code: "PLUGIN_STATE_CORRUPT",
      operation,
      message: "Plugin state entry contains corrupt JSON.",
      path: resolvePluginStateSqlitePath(env),
      cause: error,
    });
  }
}

/**
 * Maps a database row to the public entry shape. `createdAt` falls back to 0 when the
 * column is not numeric; `expiresAt` is omitted when the column is null.
 */
function rowToEntry(
  row: PluginStateRow,
  operation: PluginStateStoreOperation,
  env: NodeJS.ProcessEnv = process.env,
): PluginStateEntry {
  const expiresAt = normalizeNumber(row.expires_at);
  return {
    key: row.entry_key,
    value: parseStoredJson(row.value_json, operation, env),
    createdAt: normalizeNumber(row.created_at) ?? 0,
    ...(expiresAt != null ? { expiresAt } : {}),
  };
}

/** Returns the Kysely query builder for `db`, typed to the plugin state table only. */
function getPluginStateKysely(db: DatabaseSync) {
  return getNodeSqliteKysely<PluginStateStoreDatabase>(db);
}

/** Translates camelCase entry fields into the snake_case column layout of `plugin_state_entries`. */
function bindPluginStateEntry(params: {
  pluginId: string;
  namespace: string;
  key: string;
  valueJson: string;
  createdAt: number;
  expiresAt: number | null;
}): Insertable<PluginStateEntriesTable> {
  return {
    plugin_id: params.pluginId,
    namespace: params.namespace,
    entry_key: params.key,
    value_json: params.valueJson,
    created_at: params.createdAt,
    expires_at: params.expiresAt,
  };
}

/**
 * Inserts `row`, or on a (plugin_id, namespace, entry_key) conflict overwrites the existing
 * row's value, creation time and expiry with the new values.
 */
function upsertPluginStateEntry(db: DatabaseSync, row: Insertable<PluginStateEntriesTable>): void {
  executeSqliteQuerySync(
    db,
    getPluginStateKysely(db)
      .insertInto("plugin_state_entries")
      .values(row)
      .onConflict((conflict) =>
        conflict.columns(["plugin_id", "namespace", "entry_key"]).doUpdateSet({
          value_json: (eb) => eb.ref("excluded.value_json"),
          created_at: (eb) => eb.ref("excluded.created_at"),
          expires_at: (eb) => eb.ref("excluded.expires_at"),
        }),
      ),
  );
}

/**
 * Inserts `row` with INSERT OR IGNORE semantics.
 *
 * @returns `true` when a row was written, `false` when the insert was ignored.
 */
function insertPluginStateEntryIfAbsent(
  db: DatabaseSync,
  row: Insertable<PluginStateEntriesTable>,
): boolean {
  const result = executeSqliteQuerySync(
    db,
    getPluginStateKysely(db).insertInto("plugin_state_entries").orIgnore().values(row),
  );
  return Number(result.numAffectedRows ?? 0) > 0;
}

/** Fetches a single entry that has no expiry or expires strictly after `now`. */
function selectPluginStateEntry(
  db: DatabaseSync,
  params: { pluginId: string; namespace: string; key: string; now: number },
): PluginStateRow | undefined {
  return executeSqliteQueryTakeFirstSync(
    db,
    getPluginStateKysely(db)
      .selectFrom("plugin_state_entries")
      .select(["plugin_id", "namespace", "entry_key", "value_json", "created_at", "expires_at"])
      .where("plugin_id", "=", params.pluginId)
      .where("namespace", "=", params.namespace)
      .where("entry_key", "=", params.key)
      .where((eb) => eb.or([eb("expires_at", "is", null), eb("expires_at", ">", params.now)])),
  );
}

/** Lists the unexpired entries of a namespace, ordered by creation time and then by key. */
function selectPluginStateEntries(
  db: DatabaseSync,
  params: { pluginId: string; namespace: string; now: number },
): PluginStateRow[] {
  return executeSqliteQuerySync(
    db,
    getPluginStateKysely(db)
      .selectFrom("plugin_state_entries")
      .select(["plugin_id", "namespace", "entry_key", "value_json", "created_at", "expires_at"])
      .where("plugin_id", "=", params.pluginId)
      .where("namespace", "=", params.namespace)
      .where((eb) => eb.or([eb("expires_at", "is", null), eb("expires_at", ">", params.now)]))
      .orderBy("created_at", "asc")
      .orderBy("entry_key", "asc"),
  ).rows;
}

/** Deletes one entry regardless of expiry and returns the number of rows removed. */
function deletePluginStateEntry(
  db: DatabaseSync,
  params: { pluginId: string; namespace: string; key: string },
): number {
  const result = executeSqliteQuerySync(
    db,
    getPluginStateKysely(db)
      .deleteFrom("plugin_state_entries")
      .where("plugin_id", "=", params.pluginId)
      .where("namespace", "=", params.namespace)
      .where("entry_key", "=", params.key),
  );
  return Number(result.numAffectedRows ?? 0);
}

/** Removes entries of a single namespace whose expiry is at or before `now`. */
function deleteExpiredPluginStateNamespaceEntries(
  db: DatabaseSync,
  params: { pluginId: string; namespace: string; now: number },
): void {
  executeSqliteQuerySync(
    db,
    getPluginStateKysely(db)
      .deleteFrom("plugin_state_entries")
      .where("plugin_id", "=", params.pluginId)
      .where("namespace", "=", params.namespace)
      .where("expires_at", "is not", null)
      .where("expires_at", "<=", params.now),
  );
}

/** Counts the unexpired entries of a namespace. */
function countLivePluginStateNamespaceEntries(
  db: DatabaseSync,
  params: { pluginId: string; namespace: string; now: number },
): number {
  const row = executeSqliteQueryTakeFirstSync(
    db,
    getPluginStateKysely(db)
      .selectFrom("plugin_state_entries")
      .select((eb) => eb.fn.countAll<number | bigint>().as("count"))
      .where("plugin_id", "=", params.pluginId)
      .where("namespace", "=", params.namespace)
      .where((eb) => eb.or([eb("expires_at", "is", null), eb("expires_at", ">", params.now)])),
  );
  return countRow(row);
}

/**
 * Evicts up to `limit` of the oldest unexpired entries in a namespace (by creation time,
 * then key), never touching `protectedKey`.
 */
function deleteOldestPluginStateNamespaceEntries(
  db: DatabaseSync,
  params: { pluginId: string; namespace: string; protectedKey: string; now: number; limit: number },
): void {
  const keys = executeSqliteQuerySync<{ entry_key: string }>(
    db,
    getPluginStateKysely(db)
      .selectFrom("plugin_state_entries")
      .select(["entry_key"])
      .where("plugin_id", "=", params.pluginId)
      .where("namespace", "=", params.namespace)
      .where("entry_key", "!=", params.protectedKey)
      .where((eb) => eb.or([eb("expires_at", "is", null), eb("expires_at", ">", params.now)]))
      .orderBy("created_at", "asc")
      .orderBy("entry_key", "asc")
      .limit(params.limit),
  ).rows;
  for (const row of keys) {
    deletePluginStateEntry(db, {
      pluginId: params.pluginId,
      namespace: params.namespace,
      key: row.entry_key,
    });
  }
}

/** Deletes every entry, across all plugins and namespaces, that expired at or before `now`. */
function sweepExpiredPluginStateEntriesFromDatabase(db: DatabaseSync, now: number): number {
  const result = executeSqliteQuerySync(
    db,
    getPluginStateKysely(db)
      .deleteFrom("plugin_state_entries")
      .where("expires_at", "is not", null)
      .where("expires_at", "<=", now),
  );
  return Number(result.numAffectedRows ?? 0);
}

/**
 * Returns the plugin state database, reusing the cached handle when it is still open and
 * was opened for the same resolved path; otherwise opens it via `openOpenClawStateDatabase`
 * and caches the result.
 *
 * @throws PluginStateStoreError `PLUGIN_STATE_SCHEMA_UNSUPPORTED` when the failure message
 *   mentions "schema version", otherwise `PLUGIN_STATE_OPEN_FAILED`.
 */
function openPluginStateDatabase(
  operation: PluginStateStoreOperation = "open",
  options: OpenClawStateDatabaseOptions = {},
): PluginStateDatabase {
  const env = options.env ?? process.env;
  const pathname = resolvePluginStateSqlitePath(env);
  if (cachedDatabase && cachedDatabase.path === pathname && cachedDatabase.db.isOpen) {
    return cachedDatabase;
  }
  // Drop a stale reference to a handle that has been closed elsewhere.
  if (cachedDatabase && !cachedDatabase.db.isOpen) {
    cachedDatabase = null;
  }
  try {
    const database = openOpenClawStateDatabase(options);
    cachedDatabase = {
      db: database.db,
      path: database.path,
    };
    return cachedDatabase;
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    if (message.includes("schema version")) {
      throw createPluginStateError({
        code: "PLUGIN_STATE_SCHEMA_UNSUPPORTED",
        operation: "ensure-schema",
        message,
        path: pathname,
        cause: error,
      });
    }
    throw wrapPluginStateError(
      error,
      operation,
      "PLUGIN_STATE_OPEN_FAILED",
      "Failed to open the plugin state database.",
      pathname,
    );
  }
}

/** Extracts the count from a count query row, treating a missing row as 0. */
function countRow(row: CountRow | undefined): number {
  const raw = row?.count ?? 0;
  return typeof raw === "bigint" ? Number(raw) : raw;
}

/** Wraps an optional environment into database options, omitting the key when absent. */
function envOptions(env?: NodeJS.ProcessEnv): OpenClawStateDatabaseOptions {
  return env ? { env } : {};
}

/**
 * Runs `write` through `runOpenClawStateWriteTransaction`, opening (or reusing) the plugin
 * state database inside the transaction callback, and returns whatever `write` returns.
 */
function runWriteTransaction<T>(
  operation: PluginStateStoreOperation,
  write: (store: PluginStateDatabase) => T,
  options: OpenClawStateDatabaseOptions = {},
): T {
  return runOpenClawStateWriteTransaction(() => {
    const store = openPluginStateDatabase(operation, options);
    return write(store);
  }, options);
}

/**
 * After a write, trims the namespace back to `maxEntries` live entries by evicting the
 * oldest ones; the entry identified by `protectedKey` is never evicted.
 */
function enforcePostRegisterLimits(params: {
  store: PluginStateDatabase;
  pluginId: string;
  namespace: string;
  maxEntries: number;
  now: number;
  protectedKey: string;
}): void {
  const namespaceCount = countLivePluginStateNamespaceEntries(params.store.db, {
    pluginId: params.pluginId,
    namespace: params.namespace,
    now: params.now,
  });
  if (namespaceCount > params.maxEntries) {
    deleteOldestPluginStateNamespaceEntries(params.store.db, {
      pluginId: params.pluginId,
      namespace: params.namespace,
      protectedKey: params.protectedKey,
      now: params.now,
      limit: namespaceCount - params.maxEntries,
    });
  }
}

/**
 * Creates or replaces an entry. Within one write transaction this purges the namespace's
 * expired entries, upserts the entry (resetting its creation time to now), and then evicts
 * the oldest entries beyond `maxEntries`.
 *
 * @param params.ttlMs Optional time-to-live in milliseconds; when omitted the entry does not expire.
 * @throws PluginStateStoreError (`PLUGIN_STATE_WRITE_FAILED` unless a more specific code applies).
 */
export function pluginStateRegister(params: {
  pluginId: string;
  namespace: string;
  key: string;
  valueJson: string;
  maxEntries: number;
  ttlMs?: number;
  env?: NodeJS.ProcessEnv;
}): void {
  try {
    runWriteTransaction(
      "register",
      (store) => {
        const now = Date.now();
        const expiresAt = params.ttlMs == null ? null : now + params.ttlMs;
        deleteExpiredPluginStateNamespaceEntries(store.db, {
          pluginId: params.pluginId,
          namespace: params.namespace,
          now,
        });
        upsertPluginStateEntry(
          store.db,
          bindPluginStateEntry({
            pluginId: params.pluginId,
            namespace: params.namespace,
            key: params.key,
            valueJson: params.valueJson,
            createdAt: now,
            expiresAt,
          }),
        );
        enforcePostRegisterLimits({
          store,
          pluginId: params.pluginId,
          namespace: params.namespace,
          maxEntries: params.maxEntries,
          now,
          protectedKey: params.key,
        });
      },
      envOptions(params.env),
    );
  } catch (error) {
    throw wrapPluginStateError(
      error,
      "register",
      "PLUGIN_STATE_WRITE_FAILED",
      "Failed to register plugin state entry.",
      // Report the database the caller actually targeted, not the process default.
      resolvePluginStateSqlitePath(params.env ?? process.env),
    );
  }
}

/**
 * Creates an entry only when no live entry exists for the key. Expired entries in the
 * namespace are purged first, so an expired entry under the same key does not block the insert.
 *
 * @returns `true` when the entry was inserted, `false` when an entry already existed.
 * @throws PluginStateStoreError (`PLUGIN_STATE_WRITE_FAILED` unless a more specific code applies).
 */
export function pluginStateRegisterIfAbsent(params: {
  pluginId: string;
  namespace: string;
  key: string;
  valueJson: string;
  maxEntries: number;
  ttlMs?: number;
  env?: NodeJS.ProcessEnv;
}): boolean {
  try {
    return runWriteTransaction(
      "register",
      (store) => {
        const now = Date.now();
        const expiresAt = params.ttlMs == null ? null : now + params.ttlMs;
        deleteExpiredPluginStateNamespaceEntries(store.db, {
          pluginId: params.pluginId,
          namespace: params.namespace,
          now,
        });
        const inserted = insertPluginStateEntryIfAbsent(
          store.db,
          bindPluginStateEntry({
            pluginId: params.pluginId,
            namespace: params.namespace,
            key: params.key,
            valueJson: params.valueJson,
            createdAt: now,
            expiresAt,
          }),
        );
        if (!inserted) {
          return false;
        }
        enforcePostRegisterLimits({
          store,
          pluginId: params.pluginId,
          namespace: params.namespace,
          maxEntries: params.maxEntries,
          now,
          protectedKey: params.key,
        });
        return true;
      },
      envOptions(params.env),
    );
  } catch (error) {
    throw wrapPluginStateError(
      error,
      "register",
      "PLUGIN_STATE_WRITE_FAILED",
      "Failed to register plugin state entry.",
      resolvePluginStateSqlitePath(params.env ?? process.env),
    );
  }
}

/**
 * Reads the parsed value of a live entry without modifying it.
 *
 * @returns The parsed JSON value, or `undefined` when the entry is missing or expired.
 * @throws PluginStateStoreError (`PLUGIN_STATE_READ_FAILED`, or `PLUGIN_STATE_CORRUPT` for invalid JSON).
 */
export function pluginStateLookup(params: {
  pluginId: string;
  namespace: string;
  key: string;
  env?: NodeJS.ProcessEnv;
}): unknown {
  try {
    const { db } = openPluginStateDatabase("lookup", envOptions(params.env));
    const row = selectPluginStateEntry(db, {
      pluginId: params.pluginId,
      namespace: params.namespace,
      key: params.key,
      now: Date.now(),
    });
    return row ? parseStoredJson(row.value_json, "lookup", params.env) : undefined;
  } catch (error) {
    throw wrapPluginStateError(
      error,
      "lookup",
      "PLUGIN_STATE_READ_FAILED",
      "Failed to read plugin state entry.",
      resolvePluginStateSqlitePath(params.env ?? process.env),
    );
  }
}

/**
 * Reads a live entry and deletes it within the same write transaction.
 *
 * @returns The parsed JSON value, or `undefined` when the entry is missing or expired.
 * @throws PluginStateStoreError (`PLUGIN_STATE_READ_FAILED`, or `PLUGIN_STATE_CORRUPT` for invalid JSON).
 */
export function pluginStateConsume(params: {
  pluginId: string;
  namespace: string;
  key: string;
  env?: NodeJS.ProcessEnv;
}): unknown {
  try {
    return runWriteTransaction(
      "consume",
      (store) => {
        const row = selectPluginStateEntry(store.db, {
          pluginId: params.pluginId,
          namespace: params.namespace,
          key: params.key,
          now: Date.now(),
        });
        if (!row) {
          return undefined;
        }
        // The delete is issued before parsing, exactly as the read-then-remove order requires.
        deletePluginStateEntry(store.db, params);
        return parseStoredJson(row.value_json, "consume", params.env);
      },
      envOptions(params.env),
    );
  } catch (error) {
    throw wrapPluginStateError(
      error,
      "consume",
      "PLUGIN_STATE_READ_FAILED",
      "Failed to consume plugin state entry.",
      resolvePluginStateSqlitePath(params.env ?? process.env),
    );
  }
}

/**
 * Deletes an entry, whether or not it has expired.
 *
 * @returns `true` when a row was removed.
 * @throws PluginStateStoreError (`PLUGIN_STATE_WRITE_FAILED` unless a more specific code applies).
 */
export function pluginStateDelete(params: {
  pluginId: string;
  namespace: string;
  key: string;
  env?: NodeJS.ProcessEnv;
}): boolean {
  try {
    return runWriteTransaction(
      "delete",
      ({ db }) => deletePluginStateEntry(db, params) > 0,
      envOptions(params.env),
    );
  } catch (error) {
    throw wrapPluginStateError(
      error,
      "delete",
      "PLUGIN_STATE_WRITE_FAILED",
      "Failed to delete plugin state entry.",
      resolvePluginStateSqlitePath(params.env ?? process.env),
    );
  }
}

/**
 * Lists the live entries of a namespace, oldest first (ties broken by key).
 *
 * @throws PluginStateStoreError (`PLUGIN_STATE_READ_FAILED`, or `PLUGIN_STATE_CORRUPT` for invalid JSON).
 */
export function pluginStateEntries(params: {
  pluginId: string;
  namespace: string;
  env?: NodeJS.ProcessEnv;
}): PluginStateEntry[] {
  try {
    const { db } = openPluginStateDatabase("entries", envOptions(params.env));
    const rows = selectPluginStateEntries(db, {
      pluginId: params.pluginId,
      namespace: params.namespace,
      now: Date.now(),
    });
    return rows.map((row) => rowToEntry(row, "entries", params.env));
  } catch (error) {
    throw wrapPluginStateError(
      error,
      "entries",
      "PLUGIN_STATE_READ_FAILED",
      "Failed to list plugin state entries.",
      resolvePluginStateSqlitePath(params.env ?? process.env),
    );
  }
}

/**
 * Deletes every entry of a namespace, expired or not.
 *
 * @throws PluginStateStoreError (`PLUGIN_STATE_WRITE_FAILED` unless a more specific code applies).
 */
export function pluginStateClear(params: {
  pluginId: string;
  namespace: string;
  env?: NodeJS.ProcessEnv;
}): void {
  try {
    runWriteTransaction(
      "clear",
      ({ db }) => {
        executeSqliteQuerySync(
          db,
          getPluginStateKysely(db)
            .deleteFrom("plugin_state_entries")
            .where("plugin_id", "=", params.pluginId)
            .where("namespace", "=", params.namespace),
        );
      },
      envOptions(params.env),
    );
  } catch (error) {
    throw wrapPluginStateError(
      error,
      "clear",
      "PLUGIN_STATE_WRITE_FAILED",
      "Failed to clear plugin state namespace.",
      resolvePluginStateSqlitePath(params.env ?? process.env),
    );
  }
}

/**
 * Deletes all expired entries across every plugin and namespace.
 *
 * @returns The number of rows removed.
 * @throws PluginStateStoreError (`PLUGIN_STATE_WRITE_FAILED` unless a more specific code applies).
 */
export function sweepExpiredPluginStateEntries(): number {
  try {
    return runWriteTransaction("sweep", ({ db }) =>
      sweepExpiredPluginStateEntriesFromDatabase(db, Date.now()),
    );
  } catch (error) {
    throw wrapPluginStateError(
      error,
      "sweep",
      "PLUGIN_STATE_WRITE_FAILED",
      "Failed to sweep expired plugin state entries.",
    );
  }
}

/** Reports whether this module currently holds a cached database handle that is open. */
export function isPluginStateDatabaseOpen(): boolean {
  return cachedDatabase?.db.isOpen === true;
}

/** Test helper: removes every row from `plugin_state_entries`. */
export function clearPluginStateSqliteStoreForTests(): void {
  const store = openPluginStateDatabase("clear");
  store.db.exec("DELETE FROM plugin_state_entries;");
}

/**
 * Test helper: upserts the given entries in one write transaction. Entries without an
 * explicit `createdAt` receive `now + index` so their relative order is deterministic;
 * entries without `expiresAt` never expire.
 */
export function seedPluginStateSqliteEntriesForTests(
  entries: readonly PluginStateSeedEntryForTests[],
): void {
  if (entries.length === 0) {
    return;
  }
  const now = Date.now();
  runWriteTransaction("register", (store) => {
    for (const [index, entry] of entries.entries()) {
      // PluginStateDatabase exposes only `db` and `path` (there is no prepared-statement
      // cache), so seeding goes through the same upsert helper as the production writes.
      upsertPluginStateEntry(
        store.db,
        bindPluginStateEntry({
          pluginId: entry.pluginId,
          namespace: entry.namespace,
          key: entry.key,
          valueJson: entry.valueJson,
          createdAt: entry.createdAt ?? now + index,
          expiresAt: entry.expiresAt ?? null,
        }),
      );
    }
  });
}

/**
 * Runs a health check of the store and records each step's outcome: loading SQLite,
 * opening the database, a write/read/delete round trip on a diagnostics entry, and a WAL
 * checkpoint. Failures are captured in the returned steps rather than thrown.
 *
 * If no handle was cached before the probe started, the cached handle is dropped again
 * afterwards.
 */
export function probePluginStateStore(): PluginStateStoreProbeResult {
  const dbPath = resolvePluginStateSqlitePath(process.env);
  const steps: PluginStateStoreProbeStep[] = [];
  const wasOpen = cachedDatabase !== null;
  const pushOk = (name: string): void => {
    steps.push({ name, ok: true });
  };
  const pushFailure = (name: string, error: unknown): void => {
    const wrapped =
      error instanceof PluginStateStoreError
        ? error
        : createPluginStateError({
            code: "PLUGIN_STATE_OPEN_FAILED",
            operation: "probe",
            message: error instanceof Error ? error.message : String(error),
            path: dbPath,
            cause: error,
          });
    steps.push({ name, ok: false, code: wrapped.code, message: wrapped.message });
  };

  try {
    requireNodeSqlite();
    pushOk("load-sqlite");
  } catch (error) {
    pushFailure(
      "load-sqlite",
      createPluginStateError({
        code: "PLUGIN_STATE_SQLITE_UNAVAILABLE",
        operation: "load-sqlite",
        message: "SQLite support is unavailable for plugin state storage.",
        path: dbPath,
        cause: error,
      }),
    );
    // Nothing else can be probed without SQLite.
    return { ok: false, dbPath, steps };
  }

  try {
    openPluginStateDatabase("probe");
    // A successful open is recorded as both the "open" and the "schema" step.
    pushOk("open");
    pushOk("schema");
    runWriteTransaction("probe", ({ db }) => {
      const now = Date.now();
      upsertPluginStateEntry(
        db,
        bindPluginStateEntry({
          pluginId: "core:plugin-state-probe",
          namespace: "diagnostics",
          key: "probe",
          valueJson: JSON.stringify({ ok: true }),
          createdAt: now,
          expiresAt: now + 60_000,
        }),
      );
      // The read only has to execute without throwing; its result is not inspected.
      selectPluginStateEntry(db, {
        pluginId: "core:plugin-state-probe",
        namespace: "diagnostics",
        key: "probe",
        now,
      });
      deletePluginStateEntry(db, {
        pluginId: "core:plugin-state-probe",
        namespace: "diagnostics",
        key: "probe",
      });
    });
    pushOk("write-read-delete");
    openOpenClawStateDatabase().walMaintenance.checkpoint();
    pushOk("checkpoint");
  } catch (error) {
    // Any failure after SQLite has loaded is reported under the single "probe" step name.
    pushFailure("probe", error);
  } finally {
    if (!wasOpen) {
      closePluginStateSqliteStore();
    }
  }

  return { ok: steps.every((step) => step.ok), dbPath, steps };
}

/**
 * Forgets the cached database handle so the next operation re-opens it. This function only
 * clears the module-level reference; it does not itself call `close()` on the handle.
 */
export function closePluginStateSqliteStore(): void {
  cachedDatabase = null;
}