refactor: store acp replay ledger in sqlite tables

This commit is contained in:
Peter Steinberger
2026-05-08 23:50:57 +01:00
parent 4287f1558e
commit 126c5bf194
7 changed files with 273 additions and 25 deletions

View File

@@ -698,8 +698,9 @@ Move these into agent databases:
the per-agent `cache_entries` table. Gateway-wide model caches stay in the
global database unless they become agent-specific.
- ACP parent stream logs. Done for runtime writes.
- ACP replay ledger sessions. Done for runtime writes; legacy
`acp/event-ledger.json` remains only as doctor/migrate input.
- ACP replay ledger sessions. Done for runtime writes via
`acp_replay_sessions` and `acp_replay_events`; legacy `acp/event-ledger.json`
remains only as doctor/migrate input.
- Trajectory sidecars when they are not explicit export files. Done for runtime
writes: trajectory capture writes agent-database `trajectory_runtime_events`
rows and mirrors run-scoped artifacts into SQLite. Legacy sidecars remain

View File

@@ -1,6 +1,7 @@
import fs from "node:fs/promises";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
import { withTempDir } from "../test-helpers/temp-dir.js";
import {
@@ -118,6 +119,45 @@ describe("ACP event ledger", () => {
});
});
it("stores SQLite replay state in relational tables instead of legacy kv blobs", async () => {
await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => {
const dbPath = path.join(dir, "openclaw-state.sqlite");
const ledger = createSqliteAcpEventLedger({ path: dbPath, now: () => 1000 });
await ledger.startSession({
sessionId: "session-1",
sessionKey: "agent:main:work",
cwd: "/work",
complete: true,
});
await ledger.recordUpdate({
sessionId: "session-1",
sessionKey: "agent:main:work",
runId: "run-1",
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "Answer" },
},
});
closeOpenClawStateDatabaseForTest();
const sqlite = requireNodeSqlite();
const db = new sqlite.DatabaseSync(dbPath);
try {
expect(db.prepare("SELECT COUNT(*) AS count FROM acp_replay_sessions").get()).toEqual({
count: 1,
});
expect(db.prepare("SELECT COUNT(*) AS count FROM acp_replay_events").get()).toEqual({
count: 1,
});
expect(
db.prepare("SELECT COUNT(*) AS count FROM kv WHERE scope = 'acp_event_ledger'").get(),
).toEqual({ count: 0 });
} finally {
db.close();
}
});
});
it("can replay a complete session by Gateway session key", async () => {
const ledger = createInMemoryAcpEventLedger({ now: () => 1000 });
await ledger.startSession({

View File

@@ -84,13 +84,35 @@ type LedgerOptions = {
now?: () => number;
};
type AcpEventLedgerKvDatabase = Pick<OpenClawStateKyselyDatabase, "kv">;
type AcpEventLedgerDatabase = Pick<
OpenClawStateKyselyDatabase,
"acp_replay_events" | "acp_replay_sessions" | "kv"
>;
type LedgerKvRow = {
key: string;
value_json: string;
};
type LedgerSessionRow = {
complete: number;
created_at: number;
cwd: string;
next_seq: number;
session_id: string;
session_key: string;
updated_at: number;
};
type LedgerEventRow = {
at: number;
run_id: string | null;
seq: number;
session_id: string;
session_key: string;
update_json: string;
};
type MutableLedgerState = {
store: LedgerStore;
maxSessions: number;
@@ -447,8 +469,8 @@ function dbOptionsFromParams(
};
}
function loadStoreFromSqliteDb(database: DatabaseSync): LedgerStore {
const db = getNodeSqliteKysely<AcpEventLedgerKvDatabase>(database);
function loadStoreFromLegacyKvRows(database: DatabaseSync): LedgerStore {
const db = getNodeSqliteKysely<AcpEventLedgerDatabase>(database);
const rows = executeSqliteQuerySync<LedgerKvRow>(
database,
db
@@ -473,30 +495,90 @@ function loadStoreFromSqliteDb(database: DatabaseSync): LedgerStore {
return { version: LEDGER_VERSION, sessions };
}
function loadStoreFromSqliteDb(database: DatabaseSync): LedgerStore {
const db = getNodeSqliteKysely<AcpEventLedgerDatabase>(database);
const sessionRows = executeSqliteQuerySync<LedgerSessionRow>(
database,
db
.selectFrom("acp_replay_sessions")
.select([
"session_id",
"session_key",
"cwd",
"complete",
"created_at",
"updated_at",
"next_seq",
])
.orderBy("updated_at", "desc")
.orderBy("session_id", "asc"),
).rows;
if (sessionRows.length === 0) {
return loadStoreFromLegacyKvRows(database);
}
const sessions: Record<string, LedgerSession> = {};
for (const row of sessionRows) {
sessions[row.session_id] = {
sessionId: row.session_id,
sessionKey: row.session_key,
cwd: row.cwd,
complete: row.complete === 1,
createdAt: row.created_at,
updatedAt: row.updated_at,
nextSeq: row.next_seq,
events: [],
};
}
const eventRows = executeSqliteQuerySync<LedgerEventRow>(
database,
db
.selectFrom("acp_replay_events")
.select(["session_id", "seq", "at", "session_key", "run_id", "update_json"])
.orderBy("session_id", "asc")
.orderBy("seq", "asc"),
).rows;
for (const row of eventRows) {
const session = sessions[row.session_id];
if (!session) {
continue;
}
try {
session.events.push({
seq: row.seq,
at: row.at,
sessionId: row.session_id,
sessionKey: row.session_key,
...(row.run_id ? { runId: row.run_id } : {}),
update: JSON.parse(row.update_json) as SessionUpdate,
});
} catch {
session.complete = false;
}
}
return { version: LEDGER_VERSION, sessions };
}
function writeStoreToSqliteDb(
database: DatabaseSync,
store: LedgerStore,
updatedAt: number,
options: { pruneMissing?: boolean } = {},
): void {
const db = getNodeSqliteKysely<AcpEventLedgerKvDatabase>(database);
const db = getNodeSqliteKysely<AcpEventLedgerDatabase>(database);
if (options.pruneMissing !== false) {
const existing = executeSqliteQuerySync<LedgerKvRow>(
const existing = executeSqliteQuerySync<{ session_id: string }>(
database,
db
.selectFrom("kv")
.select(["key", "value_json"])
.where("scope", "=", ACP_EVENT_LEDGER_KV_SCOPE),
db.selectFrom("acp_replay_sessions").select("session_id"),
).rows;
const retained = new Set(Object.keys(store.sessions));
for (const row of existing) {
if (!retained.has(row.key)) {
if (!retained.has(row.session_id)) {
executeSqliteQuerySync(
database,
db
.deleteFrom("kv")
.where("scope", "=", ACP_EVENT_LEDGER_KV_SCOPE)
.where("key", "=", row.key),
db.deleteFrom("acp_replay_sessions").where("session_id", "=", row.session_id),
);
}
}
@@ -505,21 +587,65 @@ function writeStoreToSqliteDb(
executeSqliteQuerySync(
database,
db
.insertInto("kv")
.insertInto("acp_replay_sessions")
.values({
scope: ACP_EVENT_LEDGER_KV_SCOPE,
key: session.sessionId,
value_json: JSON.stringify(session),
updated_at: updatedAt,
session_id: session.sessionId,
session_key: session.sessionKey,
cwd: session.cwd,
complete: session.complete ? 1 : 0,
created_at: session.createdAt,
updated_at: session.updatedAt || updatedAt,
next_seq: session.nextSeq,
})
.onConflict((conflict) =>
conflict.columns(["scope", "key"]).doUpdateSet({
value_json: JSON.stringify(session),
updated_at: updatedAt,
conflict.column("session_id").doUpdateSet({
session_key: session.sessionKey,
cwd: session.cwd,
complete: session.complete ? 1 : 0,
created_at: session.createdAt,
updated_at: session.updatedAt || updatedAt,
next_seq: session.nextSeq,
}),
),
);
executeSqliteQuerySync(
database,
db.deleteFrom("acp_replay_events").where("session_id", "=", session.sessionId),
);
if (session.events.length > 0) {
executeSqliteQuerySync(
database,
db.insertInto("acp_replay_events").values(
session.events.map((event) => ({
session_id: event.sessionId,
seq: event.seq,
at: event.at,
session_key: event.sessionKey,
run_id: event.runId ?? null,
update_json: JSON.stringify(event.update),
})),
),
);
}
}
executeSqliteQuerySync(
database,
db.deleteFrom("kv").where("scope", "=", ACP_EVENT_LEDGER_KV_SCOPE),
);
executeSqliteQuerySync(
database,
db
.deleteFrom("acp_replay_events")
.where(({ not, exists, selectFrom, ref }) =>
not(
exists(
selectFrom("acp_replay_sessions")
.select("session_id")
.whereRef("acp_replay_sessions.session_id", "=", ref("acp_replay_events.session_id")),
),
),
),
);
}
function writeStoreToSqlite(

View File

@@ -9,6 +9,25 @@ export type Generated<T> = T extends ColumnType<infer S, infer I, infer U>
? ColumnType<S, I | undefined, U>
: ColumnType<T, T | undefined, T>;
export interface AcpReplayEvents {
at: number;
run_id: string | null;
seq: number;
session_id: string;
session_key: string;
update_json: string;
}
export interface AcpReplaySessions {
complete: number;
created_at: number;
cwd: string;
next_seq: number;
session_id: string;
session_key: string;
updated_at: number;
}
export interface AgentDatabases {
agent_id: string;
last_seen_at: number;
@@ -256,6 +275,8 @@ export interface TranscriptFiles {
}
export interface DB {
acp_replay_events: AcpReplayEvents;
acp_replay_sessions: AcpReplaySessions;
agent_databases: AgentDatabases;
agents: Agents;
backup_runs: BackupRuns;

View File

@@ -17,7 +17,7 @@ import {
} from "./openclaw-state-db.paths.js";
import { OPENCLAW_STATE_SCHEMA_SQL } from "./openclaw-state-schema.generated.js";
const OPENCLAW_STATE_SCHEMA_VERSION = 17;
const OPENCLAW_STATE_SCHEMA_VERSION = 18;
export const OPENCLAW_SQLITE_BUSY_TIMEOUT_MS = 30_000;
const OPENCLAW_STATE_DIR_MODE = 0o700;
const OPENCLAW_STATE_FILE_MODE = 0o600;

View File

@@ -16,6 +16,36 @@ CREATE TABLE IF NOT EXISTS kv (
PRIMARY KEY (scope, key)
);
CREATE TABLE IF NOT EXISTS acp_replay_sessions (
session_id TEXT NOT NULL PRIMARY KEY,
session_key TEXT NOT NULL,
cwd TEXT NOT NULL,
complete INTEGER NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
next_seq INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_acp_replay_sessions_key_updated
ON acp_replay_sessions(session_key, complete, updated_at DESC, session_id);
CREATE INDEX IF NOT EXISTS idx_acp_replay_sessions_updated
ON acp_replay_sessions(updated_at DESC, session_id);
CREATE TABLE IF NOT EXISTS acp_replay_events (
session_id TEXT NOT NULL,
seq INTEGER NOT NULL,
at INTEGER NOT NULL,
session_key TEXT NOT NULL,
run_id TEXT,
update_json TEXT NOT NULL,
PRIMARY KEY (session_id, seq),
FOREIGN KEY (session_id) REFERENCES acp_replay_sessions(session_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_acp_replay_events_session_seq
ON acp_replay_events(session_id, seq);
CREATE TABLE IF NOT EXISTS agents (
agent_id TEXT NOT NULL PRIMARY KEY,
config_json TEXT NOT NULL,

View File

@@ -11,6 +11,36 @@ CREATE TABLE IF NOT EXISTS kv (
PRIMARY KEY (scope, key)
);
CREATE TABLE IF NOT EXISTS acp_replay_sessions (
session_id TEXT NOT NULL PRIMARY KEY,
session_key TEXT NOT NULL,
cwd TEXT NOT NULL,
complete INTEGER NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
next_seq INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_acp_replay_sessions_key_updated
ON acp_replay_sessions(session_key, complete, updated_at DESC, session_id);
CREATE INDEX IF NOT EXISTS idx_acp_replay_sessions_updated
ON acp_replay_sessions(updated_at DESC, session_id);
CREATE TABLE IF NOT EXISTS acp_replay_events (
session_id TEXT NOT NULL,
seq INTEGER NOT NULL,
at INTEGER NOT NULL,
session_key TEXT NOT NULL,
run_id TEXT,
update_json TEXT NOT NULL,
PRIMARY KEY (session_id, seq),
FOREIGN KEY (session_id) REFERENCES acp_replay_sessions(session_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_acp_replay_events_session_seq
ON acp_replay_events(session_id, seq);
CREATE TABLE IF NOT EXISTS agents (
agent_id TEXT NOT NULL PRIMARY KEY,
config_json TEXT NOT NULL,