Files
moltbot/src/plugin-state/plugin-state-store.sqlite.ts
2026-05-11 01:43:17 +01:00

716 lines
20 KiB
TypeScript

import type { DatabaseSync } from "node:sqlite";
import type { Insertable, Selectable } from "kysely";
import {
executeSqliteQuerySync,
executeSqliteQueryTakeFirstSync,
getNodeSqliteKysely,
} from "../infra/kysely-sync.js";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js";
import {
openOpenClawStateDatabase,
runOpenClawStateWriteTransaction,
} from "../state/openclaw-state-db.js";
import { resolvePluginStateSqlitePath } from "./plugin-state-store.paths.js";
import {
PluginStateStoreError,
type PluginStateEntry,
type PluginStateStoreErrorCode,
type PluginStateStoreOperation,
type PluginStateStoreProbeResult,
type PluginStateStoreProbeStep,
} from "./plugin-state-store.types.js";
export const MAX_PLUGIN_STATE_VALUE_BYTES = 65_536;
type PluginStateEntriesTable = OpenClawStateKyselyDatabase["plugin_state_entries"];
type PluginStateStoreDatabase = Pick<OpenClawStateKyselyDatabase, "plugin_state_entries">;
type PluginStateRow = Selectable<PluginStateEntriesTable>;
type CountRow = {
count: number | bigint;
};
type PluginStateDatabase = {
db: DatabaseSync;
path: string;
};
type PluginStateSeedEntryForTests = {
pluginId: string;
namespace: string;
key: string;
valueJson: string;
createdAt?: number;
expiresAt?: number | null;
};
let cachedDatabase: PluginStateDatabase | null = null;
function normalizeNumber(value: number | bigint | null): number | undefined {
if (typeof value === "bigint") {
return Number(value);
}
return typeof value === "number" ? value : undefined;
}
function createPluginStateError(params: {
code: PluginStateStoreErrorCode;
operation: PluginStateStoreOperation;
message: string;
path?: string;
cause?: unknown;
}): PluginStateStoreError {
return new PluginStateStoreError(params.message, {
code: params.code,
operation: params.operation,
...(params.path ? { path: params.path } : {}),
cause: params.cause,
});
}
function wrapPluginStateError(
error: unknown,
operation: PluginStateStoreOperation,
fallbackCode: PluginStateStoreErrorCode,
message: string,
pathname = resolvePluginStateSqlitePath(process.env),
): PluginStateStoreError {
if (error instanceof PluginStateStoreError) {
return error;
}
const errorMessage = error instanceof Error ? error.message : String(error);
if (errorMessage.includes("schema version")) {
return createPluginStateError({
code: "PLUGIN_STATE_SCHEMA_UNSUPPORTED",
operation: "ensure-schema",
message: errorMessage,
path: pathname,
cause: error,
});
}
return createPluginStateError({
code: fallbackCode,
operation,
message,
path: pathname,
cause: error,
});
}
function parseStoredJson(raw: string, operation: PluginStateStoreOperation): unknown {
try {
return JSON.parse(raw) as unknown;
} catch (error) {
throw createPluginStateError({
code: "PLUGIN_STATE_CORRUPT",
operation,
message: "Plugin state entry contains corrupt JSON.",
path: resolvePluginStateSqlitePath(process.env),
cause: error,
});
}
}
function rowToEntry(
row: PluginStateRow,
operation: PluginStateStoreOperation,
): PluginStateEntry<unknown> {
const expiresAt = normalizeNumber(row.expires_at);
return {
key: row.entry_key,
value: parseStoredJson(row.value_json, operation),
createdAt: normalizeNumber(row.created_at) ?? 0,
...(expiresAt != null ? { expiresAt } : {}),
};
}
function getPluginStateKysely(db: DatabaseSync) {
return getNodeSqliteKysely<PluginStateStoreDatabase>(db);
}
function bindPluginStateEntry(params: {
pluginId: string;
namespace: string;
key: string;
valueJson: string;
createdAt: number;
expiresAt: number | null;
}): Insertable<PluginStateEntriesTable> {
return {
plugin_id: params.pluginId,
namespace: params.namespace,
entry_key: params.key,
value_json: params.valueJson,
created_at: params.createdAt,
expires_at: params.expiresAt,
};
}
function upsertPluginStateEntry(db: DatabaseSync, row: Insertable<PluginStateEntriesTable>): void {
executeSqliteQuerySync(
db,
getPluginStateKysely(db)
.insertInto("plugin_state_entries")
.values(row)
.onConflict((conflict) =>
conflict.columns(["plugin_id", "namespace", "entry_key"]).doUpdateSet({
value_json: (eb) => eb.ref("excluded.value_json"),
created_at: (eb) => eb.ref("excluded.created_at"),
expires_at: (eb) => eb.ref("excluded.expires_at"),
}),
),
);
}
function insertPluginStateEntryIfAbsent(
db: DatabaseSync,
row: Insertable<PluginStateEntriesTable>,
): boolean {
const result = executeSqliteQuerySync(
db,
getPluginStateKysely(db).insertInto("plugin_state_entries").orIgnore().values(row),
);
return Number(result.numAffectedRows ?? 0) > 0;
}
function selectPluginStateEntry(
db: DatabaseSync,
params: { pluginId: string; namespace: string; key: string; now: number },
): PluginStateRow | undefined {
return executeSqliteQueryTakeFirstSync<PluginStateRow>(
db,
getPluginStateKysely(db)
.selectFrom("plugin_state_entries")
.select(["plugin_id", "namespace", "entry_key", "value_json", "created_at", "expires_at"])
.where("plugin_id", "=", params.pluginId)
.where("namespace", "=", params.namespace)
.where("entry_key", "=", params.key)
.where((eb) => eb.or([eb("expires_at", "is", null), eb("expires_at", ">", params.now)])),
);
}
function selectPluginStateEntries(
db: DatabaseSync,
params: { pluginId: string; namespace: string; now: number },
): PluginStateRow[] {
return executeSqliteQuerySync<PluginStateRow>(
db,
getPluginStateKysely(db)
.selectFrom("plugin_state_entries")
.select(["plugin_id", "namespace", "entry_key", "value_json", "created_at", "expires_at"])
.where("plugin_id", "=", params.pluginId)
.where("namespace", "=", params.namespace)
.where((eb) => eb.or([eb("expires_at", "is", null), eb("expires_at", ">", params.now)]))
.orderBy("created_at", "asc")
.orderBy("entry_key", "asc"),
).rows;
}
function deletePluginStateEntry(
db: DatabaseSync,
params: { pluginId: string; namespace: string; key: string },
): number {
const result = executeSqliteQuerySync(
db,
getPluginStateKysely(db)
.deleteFrom("plugin_state_entries")
.where("plugin_id", "=", params.pluginId)
.where("namespace", "=", params.namespace)
.where("entry_key", "=", params.key),
);
return Number(result.numAffectedRows ?? 0);
}
function deleteExpiredPluginStateNamespaceEntries(
db: DatabaseSync,
params: { pluginId: string; namespace: string; now: number },
): void {
executeSqliteQuerySync(
db,
getPluginStateKysely(db)
.deleteFrom("plugin_state_entries")
.where("plugin_id", "=", params.pluginId)
.where("namespace", "=", params.namespace)
.where("expires_at", "is not", null)
.where("expires_at", "<=", params.now),
);
}
function countLivePluginStateNamespaceEntries(
db: DatabaseSync,
params: { pluginId: string; namespace: string; now: number },
): number {
const row = executeSqliteQueryTakeFirstSync<CountRow>(
db,
getPluginStateKysely(db)
.selectFrom("plugin_state_entries")
.select((eb) => eb.fn.countAll<number | bigint>().as("count"))
.where("plugin_id", "=", params.pluginId)
.where("namespace", "=", params.namespace)
.where((eb) => eb.or([eb("expires_at", "is", null), eb("expires_at", ">", params.now)])),
);
return countRow(row);
}
function deleteOldestPluginStateNamespaceEntries(
db: DatabaseSync,
params: { pluginId: string; namespace: string; protectedKey: string; now: number; limit: number },
): void {
const keys = executeSqliteQuerySync<{ entry_key: string }>(
db,
getPluginStateKysely(db)
.selectFrom("plugin_state_entries")
.select(["entry_key"])
.where("plugin_id", "=", params.pluginId)
.where("namespace", "=", params.namespace)
.where("entry_key", "!=", params.protectedKey)
.where((eb) => eb.or([eb("expires_at", "is", null), eb("expires_at", ">", params.now)]))
.orderBy("created_at", "asc")
.orderBy("entry_key", "asc")
.limit(params.limit),
).rows;
for (const row of keys) {
deletePluginStateEntry(db, {
pluginId: params.pluginId,
namespace: params.namespace,
key: row.entry_key,
});
}
}
function sweepExpiredPluginStateEntriesFromDatabase(db: DatabaseSync, now: number): number {
const result = executeSqliteQuerySync(
db,
getPluginStateKysely(db)
.deleteFrom("plugin_state_entries")
.where("expires_at", "is not", null)
.where("expires_at", "<=", now),
);
return Number(result.numAffectedRows ?? 0);
}
function openPluginStateDatabase(
operation: PluginStateStoreOperation = "open",
): PluginStateDatabase {
const pathname = resolvePluginStateSqlitePath(process.env);
if (cachedDatabase && cachedDatabase.path === pathname && cachedDatabase.db.isOpen) {
return cachedDatabase;
}
if (cachedDatabase && !cachedDatabase.db.isOpen) {
cachedDatabase = null;
}
try {
const database = openOpenClawStateDatabase();
cachedDatabase = {
db: database.db,
path: database.path,
};
return cachedDatabase;
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (message.includes("schema version")) {
throw createPluginStateError({
code: "PLUGIN_STATE_SCHEMA_UNSUPPORTED",
operation: "ensure-schema",
message,
path: pathname,
cause: error,
});
}
throw wrapPluginStateError(
error,
operation,
"PLUGIN_STATE_OPEN_FAILED",
"Failed to open the plugin state database.",
pathname,
);
}
}
function countRow(row: CountRow | undefined): number {
const raw = row?.count ?? 0;
return typeof raw === "bigint" ? Number(raw) : raw;
}
function runWriteTransaction<T>(
operation: PluginStateStoreOperation,
write: (store: PluginStateDatabase) => T,
): T {
return runOpenClawStateWriteTransaction(() => {
const store = openPluginStateDatabase(operation);
const result = write(store);
return result;
});
}
function enforcePostRegisterLimits(params: {
store: PluginStateDatabase;
pluginId: string;
namespace: string;
maxEntries: number;
now: number;
protectedKey: string;
}): void {
const namespaceCount = countLivePluginStateNamespaceEntries(params.store.db, {
pluginId: params.pluginId,
namespace: params.namespace,
now: params.now,
});
if (namespaceCount > params.maxEntries) {
deleteOldestPluginStateNamespaceEntries(params.store.db, {
pluginId: params.pluginId,
namespace: params.namespace,
protectedKey: params.protectedKey,
now: params.now,
limit: namespaceCount - params.maxEntries,
});
}
}
export function pluginStateRegister(params: {
pluginId: string;
namespace: string;
key: string;
valueJson: string;
maxEntries: number;
ttlMs?: number;
}): void {
try {
runWriteTransaction("register", (store) => {
const now = Date.now();
const expiresAt = params.ttlMs == null ? null : now + params.ttlMs;
deleteExpiredPluginStateNamespaceEntries(store.db, {
pluginId: params.pluginId,
namespace: params.namespace,
now,
});
upsertPluginStateEntry(
store.db,
bindPluginStateEntry({
pluginId: params.pluginId,
namespace: params.namespace,
key: params.key,
valueJson: params.valueJson,
createdAt: now,
expiresAt,
}),
);
enforcePostRegisterLimits({
store,
pluginId: params.pluginId,
namespace: params.namespace,
maxEntries: params.maxEntries,
now,
protectedKey: params.key,
});
});
} catch (error) {
throw wrapPluginStateError(
error,
"register",
"PLUGIN_STATE_WRITE_FAILED",
"Failed to register plugin state entry.",
);
}
}
export function pluginStateRegisterIfAbsent(params: {
pluginId: string;
namespace: string;
key: string;
valueJson: string;
maxEntries: number;
ttlMs?: number;
}): boolean {
try {
return runWriteTransaction("register", (store) => {
const now = Date.now();
const expiresAt = params.ttlMs == null ? null : now + params.ttlMs;
deleteExpiredPluginStateNamespaceEntries(store.db, {
pluginId: params.pluginId,
namespace: params.namespace,
now,
});
const inserted = insertPluginStateEntryIfAbsent(
store.db,
bindPluginStateEntry({
pluginId: params.pluginId,
namespace: params.namespace,
key: params.key,
valueJson: params.valueJson,
createdAt: now,
expiresAt,
}),
);
if (!inserted) {
return false;
}
enforcePostRegisterLimits({
store,
pluginId: params.pluginId,
namespace: params.namespace,
maxEntries: params.maxEntries,
now,
protectedKey: params.key,
});
return true;
});
} catch (error) {
throw wrapPluginStateError(
error,
"register",
"PLUGIN_STATE_WRITE_FAILED",
"Failed to register plugin state entry.",
);
}
}
export function pluginStateLookup(params: {
pluginId: string;
namespace: string;
key: string;
}): unknown {
try {
const { db } = openPluginStateDatabase("lookup");
const row = selectPluginStateEntry(db, {
pluginId: params.pluginId,
namespace: params.namespace,
key: params.key,
now: Date.now(),
});
return row ? parseStoredJson(row.value_json, "lookup") : undefined;
} catch (error) {
throw wrapPluginStateError(
error,
"lookup",
"PLUGIN_STATE_READ_FAILED",
"Failed to read plugin state entry.",
);
}
}
export function pluginStateConsume(params: {
pluginId: string;
namespace: string;
key: string;
}): unknown {
try {
return runWriteTransaction("consume", (store) => {
const row = selectPluginStateEntry(store.db, {
pluginId: params.pluginId,
namespace: params.namespace,
key: params.key,
now: Date.now(),
});
if (!row) {
return undefined;
}
deletePluginStateEntry(store.db, params);
return parseStoredJson(row.value_json, "consume");
});
} catch (error) {
throw wrapPluginStateError(
error,
"consume",
"PLUGIN_STATE_READ_FAILED",
"Failed to consume plugin state entry.",
);
}
}
export function pluginStateDelete(params: {
pluginId: string;
namespace: string;
key: string;
}): boolean {
try {
return runWriteTransaction("delete", ({ db }) => {
return deletePluginStateEntry(db, params) > 0;
});
} catch (error) {
throw wrapPluginStateError(
error,
"delete",
"PLUGIN_STATE_WRITE_FAILED",
"Failed to delete plugin state entry.",
);
}
}
export function pluginStateEntries(params: {
pluginId: string;
namespace: string;
}): PluginStateEntry<unknown>[] {
try {
const { db } = openPluginStateDatabase("entries");
const rows = selectPluginStateEntries(db, {
pluginId: params.pluginId,
namespace: params.namespace,
now: Date.now(),
});
return rows.map((row) => rowToEntry(row, "entries"));
} catch (error) {
throw wrapPluginStateError(
error,
"entries",
"PLUGIN_STATE_READ_FAILED",
"Failed to list plugin state entries.",
);
}
}
export function pluginStateClear(params: { pluginId: string; namespace: string }): void {
try {
runWriteTransaction("clear", ({ db }) => {
executeSqliteQuerySync(
db,
getPluginStateKysely(db)
.deleteFrom("plugin_state_entries")
.where("plugin_id", "=", params.pluginId)
.where("namespace", "=", params.namespace),
);
});
} catch (error) {
throw wrapPluginStateError(
error,
"clear",
"PLUGIN_STATE_WRITE_FAILED",
"Failed to clear plugin state namespace.",
);
}
}
export function sweepExpiredPluginStateEntries(): number {
try {
return runWriteTransaction("sweep", ({ db }) =>
sweepExpiredPluginStateEntriesFromDatabase(db, Date.now()),
);
} catch (error) {
throw wrapPluginStateError(
error,
"sweep",
"PLUGIN_STATE_WRITE_FAILED",
"Failed to sweep expired plugin state entries.",
);
}
}
export function isPluginStateDatabaseOpen(): boolean {
return cachedDatabase?.db.isOpen === true;
}
export function clearPluginStateSqliteStoreForTests(): void {
const store = openPluginStateDatabase("clear");
store.db.exec("DELETE FROM plugin_state_entries;");
}
export function seedPluginStateSqliteEntriesForTests(
entries: readonly PluginStateSeedEntryForTests[],
): void {
if (entries.length === 0) {
return;
}
const now = Date.now();
runWriteTransaction("register", (store) => {
for (let index = 0; index < entries.length; index += 1) {
const entry = entries[index];
store.statements.upsertEntry.run({
plugin_id: entry.pluginId,
namespace: entry.namespace,
entry_key: entry.key,
value_json: entry.valueJson,
created_at: entry.createdAt ?? now + index,
expires_at: entry.expiresAt ?? null,
});
}
});
}
export function probePluginStateStore(): PluginStateStoreProbeResult {
const dbPath = resolvePluginStateSqlitePath(process.env);
const steps: PluginStateStoreProbeStep[] = [];
const wasOpen = cachedDatabase !== null;
const pushOk = (name: string) => steps.push({ name, ok: true });
const pushFailure = (name: string, error: unknown) => {
const wrapped =
error instanceof PluginStateStoreError
? error
: createPluginStateError({
code: "PLUGIN_STATE_OPEN_FAILED",
operation: "probe",
message: error instanceof Error ? error.message : String(error),
path: dbPath,
cause: error,
});
steps.push({ name, ok: false, code: wrapped.code, message: wrapped.message });
};
try {
requireNodeSqlite();
pushOk("load-sqlite");
} catch (error) {
pushFailure(
"load-sqlite",
createPluginStateError({
code: "PLUGIN_STATE_SQLITE_UNAVAILABLE",
operation: "load-sqlite",
message: "SQLite support is unavailable for plugin state storage.",
path: dbPath,
cause: error,
}),
);
return { ok: false, dbPath, steps };
}
try {
openPluginStateDatabase("probe");
pushOk("open");
pushOk("schema");
runWriteTransaction("probe", ({ db }) => {
const now = Date.now();
upsertPluginStateEntry(
db,
bindPluginStateEntry({
pluginId: "core:plugin-state-probe",
namespace: "diagnostics",
key: "probe",
valueJson: JSON.stringify({ ok: true }),
createdAt: now,
expiresAt: now + 60_000,
}),
);
selectPluginStateEntry(db, {
pluginId: "core:plugin-state-probe",
namespace: "diagnostics",
key: "probe",
now,
});
deletePluginStateEntry(db, {
pluginId: "core:plugin-state-probe",
namespace: "diagnostics",
key: "probe",
});
});
pushOk("write-read-delete");
openOpenClawStateDatabase().walMaintenance.checkpoint();
pushOk("checkpoint");
} catch (error) {
pushFailure("probe", error);
} finally {
if (!wasOpen) {
closePluginStateSqliteStore();
}
}
return { ok: steps.every((step) => step.ok), dbPath, steps };
}
export function closePluginStateSqliteStore(): void {
cachedDatabase = null;
}