diff --git a/docs/refactor/database-first.md b/docs/refactor/database-first.md index 8556cfdd4a3..b561bd851d5 100644 --- a/docs/refactor/database-first.md +++ b/docs/refactor/database-first.md @@ -698,8 +698,9 @@ Move these into agent databases: the per-agent `cache_entries` table. Gateway-wide model caches stay in the global database unless they become agent-specific. - ACP parent stream logs. Done for runtime writes. -- ACP replay ledger sessions. Done for runtime writes; legacy - `acp/event-ledger.json` remains only as doctor/migrate input. +- ACP replay ledger sessions. Done for runtime writes via + `acp_replay_sessions` and `acp_replay_events`; legacy `acp/event-ledger.json` + remains only as doctor/migrate input. - Trajectory sidecars when they are not explicit export files. Done for runtime writes: trajectory capture writes agent-database `trajectory_runtime_events` rows and mirrors run-scoped artifacts into SQLite. Legacy sidecars remain diff --git a/src/acp/event-ledger.test.ts b/src/acp/event-ledger.test.ts index 2ba04bfb857..43abb47b11a 100644 --- a/src/acp/event-ledger.test.ts +++ b/src/acp/event-ledger.test.ts @@ -1,6 +1,7 @@ import fs from "node:fs/promises"; import path from "node:path"; import { afterEach, describe, expect, it } from "vitest"; +import { requireNodeSqlite } from "../infra/node-sqlite.js"; import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js"; import { withTempDir } from "../test-helpers/temp-dir.js"; import { @@ -118,6 +119,45 @@ describe("ACP event ledger", () => { }); }); + it("stores SQLite replay state in relational tables instead of legacy kv blobs", async () => { + await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => { + const dbPath = path.join(dir, "openclaw-state.sqlite"); + const ledger = createSqliteAcpEventLedger({ path: dbPath, now: () => 1000 }); + await ledger.startSession({ + sessionId: "session-1", + sessionKey: "agent:main:work", + cwd: "/work", + complete: true, + }); + await ledger.recordUpdate({ + sessionId: "session-1", + sessionKey: "agent:main:work", + runId: "run-1", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "Answer" }, + }, + }); + closeOpenClawStateDatabaseForTest(); + + const sqlite = requireNodeSqlite(); + const db = new sqlite.DatabaseSync(dbPath); + try { + expect(db.prepare("SELECT COUNT(*) AS count FROM acp_replay_sessions").get()).toEqual({ + count: 1, + }); + expect(db.prepare("SELECT COUNT(*) AS count FROM acp_replay_events").get()).toEqual({ + count: 1, + }); + expect( + db.prepare("SELECT COUNT(*) AS count FROM kv WHERE scope = 'acp_event_ledger'").get(), + ).toEqual({ count: 0 }); + } finally { + db.close(); + } + }); + }); + it("can replay a complete session by Gateway session key", async () => { const ledger = createInMemoryAcpEventLedger({ now: () => 1000 }); await ledger.startSession({ diff --git a/src/acp/event-ledger.ts b/src/acp/event-ledger.ts index 9ed3a557797..c75c6b9901c 100644 --- a/src/acp/event-ledger.ts +++ b/src/acp/event-ledger.ts @@ -84,13 +84,35 @@ type LedgerOptions = { now?: () => number; }; -type AcpEventLedgerKvDatabase = Pick; +type AcpEventLedgerDatabase = Pick< + OpenClawStateKyselyDatabase, + "acp_replay_events" | "acp_replay_sessions" | "kv" +>; type LedgerKvRow = { key: string; value_json: string; }; +type LedgerSessionRow = { + complete: number; + created_at: number; + cwd: string; + next_seq: number; + session_id: string; + session_key: string; + updated_at: number; +}; + +type LedgerEventRow = { + at: number; + run_id: string | null; + seq: number; + session_id: string; + session_key: string; + update_json: string; +}; + type MutableLedgerState = { store: LedgerStore; maxSessions: number; @@ -447,8 +469,8 @@ function dbOptionsFromParams( }; } -function loadStoreFromSqliteDb(database: DatabaseSync): LedgerStore { - const db = getNodeSqliteKysely(database); +function loadStoreFromLegacyKvRows(database: DatabaseSync): LedgerStore { + const db = getNodeSqliteKysely(database); const rows = executeSqliteQuerySync( database, db @@ -473,30 +495,90 @@ function loadStoreFromSqliteDb(database: DatabaseSync): LedgerStore { return { version: LEDGER_VERSION, sessions }; } +function loadStoreFromSqliteDb(database: DatabaseSync): LedgerStore { + const db = getNodeSqliteKysely(database); + const sessionRows = executeSqliteQuerySync( + database, + db + .selectFrom("acp_replay_sessions") + .select([ + "session_id", + "session_key", + "cwd", + "complete", + "created_at", + "updated_at", + "next_seq", + ]) + .orderBy("updated_at", "desc") + .orderBy("session_id", "asc"), + ).rows; + if (sessionRows.length === 0) { + return loadStoreFromLegacyKvRows(database); + } + + const sessions: Record = {}; + for (const row of sessionRows) { + sessions[row.session_id] = { + sessionId: row.session_id, + sessionKey: row.session_key, + cwd: row.cwd, + complete: row.complete === 1, + createdAt: row.created_at, + updatedAt: row.updated_at, + nextSeq: row.next_seq, + events: [], + }; + } + + const eventRows = executeSqliteQuerySync( + database, + db + .selectFrom("acp_replay_events") + .select(["session_id", "seq", "at", "session_key", "run_id", "update_json"]) + .orderBy("session_id", "asc") + .orderBy("seq", "asc"), + ).rows; + for (const row of eventRows) { + const session = sessions[row.session_id]; + if (!session) { + continue; + } + try { + session.events.push({ + seq: row.seq, + at: row.at, + sessionId: row.session_id, + sessionKey: row.session_key, + ...(row.run_id ? { runId: row.run_id } : {}), + update: JSON.parse(row.update_json) as SessionUpdate, + }); + } catch { + session.complete = false; + } + } + + return { version: LEDGER_VERSION, sessions }; +} + function writeStoreToSqliteDb( database: DatabaseSync, store: LedgerStore, updatedAt: number, options: { pruneMissing?: boolean } = {}, ): void { - const db = getNodeSqliteKysely(database); + const db = getNodeSqliteKysely(database); if (options.pruneMissing !== false) { - const existing = executeSqliteQuerySync( + const existing = executeSqliteQuerySync<{ session_id: string }>( database, - db - .selectFrom("kv") - .select(["key", "value_json"]) - .where("scope", "=", ACP_EVENT_LEDGER_KV_SCOPE), + db.selectFrom("acp_replay_sessions").select("session_id"), ).rows; const retained = new Set(Object.keys(store.sessions)); for (const row of existing) { - if (!retained.has(row.key)) { + if (!retained.has(row.session_id)) { executeSqliteQuerySync( database, - db - .deleteFrom("kv") - .where("scope", "=", ACP_EVENT_LEDGER_KV_SCOPE) - .where("key", "=", row.key), + db.deleteFrom("acp_replay_sessions").where("session_id", "=", row.session_id), ); } } @@ -505,21 +587,65 @@ function writeStoreToSqliteDb( executeSqliteQuerySync( database, db - .insertInto("kv") + .insertInto("acp_replay_sessions") .values({ - scope: ACP_EVENT_LEDGER_KV_SCOPE, - key: session.sessionId, - value_json: JSON.stringify(session), - updated_at: updatedAt, + session_id: session.sessionId, + session_key: session.sessionKey, + cwd: session.cwd, + complete: session.complete ? 1 : 0, + created_at: session.createdAt, + updated_at: session.updatedAt || updatedAt, + next_seq: session.nextSeq, }) .onConflict((conflict) => - conflict.columns(["scope", "key"]).doUpdateSet({ - value_json: JSON.stringify(session), - updated_at: updatedAt, + conflict.column("session_id").doUpdateSet({ + session_key: session.sessionKey, + cwd: session.cwd, + complete: session.complete ? 1 : 0, + created_at: session.createdAt, + updated_at: session.updatedAt || updatedAt, + next_seq: session.nextSeq, }), ), ); + executeSqliteQuerySync( + database, + db.deleteFrom("acp_replay_events").where("session_id", "=", session.sessionId), + ); + if (session.events.length > 0) { + executeSqliteQuerySync( + database, + db.insertInto("acp_replay_events").values( + session.events.map((event) => ({ + session_id: event.sessionId, + seq: event.seq, + at: event.at, + session_key: event.sessionKey, + run_id: event.runId ?? null, + update_json: JSON.stringify(event.update), + })), + ), + ); + } } + executeSqliteQuerySync( + database, + db.deleteFrom("kv").where("scope", "=", ACP_EVENT_LEDGER_KV_SCOPE), + ); + executeSqliteQuerySync( + database, + db + .deleteFrom("acp_replay_events") + .where(({ not, exists, selectFrom, ref }) => + not( + exists( + selectFrom("acp_replay_sessions") + .select("session_id") + .whereRef("acp_replay_sessions.session_id", "=", ref("acp_replay_events.session_id")), + ), + ), + ), + ); } function writeStoreToSqlite( diff --git a/src/state/openclaw-state-db.generated.d.ts b/src/state/openclaw-state-db.generated.d.ts index 58985b6fe51..48fcf848766 100644 --- a/src/state/openclaw-state-db.generated.d.ts +++ b/src/state/openclaw-state-db.generated.d.ts @@ -9,6 +9,25 @@ export type Generated = T extends ColumnType ? ColumnType : ColumnType; +export interface AcpReplayEvents { + at: number; + run_id: string | null; + seq: number; + session_id: string; + session_key: string; + update_json: string; +} + +export interface AcpReplaySessions { + complete: number; + created_at: number; + cwd: string; + next_seq: number; + session_id: string; + session_key: string; + updated_at: number; +} + export interface AgentDatabases { agent_id: string; last_seen_at: number; @@ -256,6 +275,8 @@ export interface TranscriptFiles { } export interface DB { + acp_replay_events: AcpReplayEvents; + acp_replay_sessions: AcpReplaySessions; agent_databases: AgentDatabases; agents: Agents; backup_runs: BackupRuns; diff --git a/src/state/openclaw-state-db.ts b/src/state/openclaw-state-db.ts index cdb1063b354..4ae5d19ca92 100644 --- a/src/state/openclaw-state-db.ts +++ b/src/state/openclaw-state-db.ts @@ -17,7 +17,7 @@ import { } from "./openclaw-state-db.paths.js"; import { OPENCLAW_STATE_SCHEMA_SQL } from "./openclaw-state-schema.generated.js"; -const OPENCLAW_STATE_SCHEMA_VERSION = 17; +const OPENCLAW_STATE_SCHEMA_VERSION = 18; export const OPENCLAW_SQLITE_BUSY_TIMEOUT_MS = 30_000; const OPENCLAW_STATE_DIR_MODE = 0o700; const OPENCLAW_STATE_FILE_MODE = 0o600; diff --git a/src/state/openclaw-state-schema.generated.ts b/src/state/openclaw-state-schema.generated.ts index e3b177bbd70..f84246ae0ce 100644 --- a/src/state/openclaw-state-schema.generated.ts +++ b/src/state/openclaw-state-schema.generated.ts @@ -16,6 +16,36 @@ CREATE TABLE IF NOT EXISTS kv ( PRIMARY KEY (scope, key) ); +CREATE TABLE IF NOT EXISTS acp_replay_sessions ( + session_id TEXT NOT NULL PRIMARY KEY, + session_key TEXT NOT NULL, + cwd TEXT NOT NULL, + complete INTEGER NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + next_seq INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_acp_replay_sessions_key_updated + ON acp_replay_sessions(session_key, complete, updated_at DESC, session_id); + +CREATE INDEX IF NOT EXISTS idx_acp_replay_sessions_updated + ON acp_replay_sessions(updated_at DESC, session_id); + +CREATE TABLE IF NOT EXISTS acp_replay_events ( + session_id TEXT NOT NULL, + seq INTEGER NOT NULL, + at INTEGER NOT NULL, + session_key TEXT NOT NULL, + run_id TEXT, + update_json TEXT NOT NULL, + PRIMARY KEY (session_id, seq), + FOREIGN KEY (session_id) REFERENCES acp_replay_sessions(session_id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_acp_replay_events_session_seq + ON acp_replay_events(session_id, seq); + CREATE TABLE IF NOT EXISTS agents ( agent_id TEXT NOT NULL PRIMARY KEY, config_json TEXT NOT NULL, diff --git a/src/state/openclaw-state-schema.sql b/src/state/openclaw-state-schema.sql index e4ca2d960bf..18bc8fd8ae7 100644 --- a/src/state/openclaw-state-schema.sql +++ b/src/state/openclaw-state-schema.sql @@ -11,6 +11,36 @@ CREATE TABLE IF NOT EXISTS kv ( PRIMARY KEY (scope, key) ); +CREATE TABLE IF NOT EXISTS acp_replay_sessions ( + session_id TEXT NOT NULL PRIMARY KEY, + session_key TEXT NOT NULL, + cwd TEXT NOT NULL, + complete INTEGER NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + next_seq INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_acp_replay_sessions_key_updated + ON acp_replay_sessions(session_key, complete, updated_at DESC, session_id); + +CREATE INDEX IF NOT EXISTS idx_acp_replay_sessions_updated + ON acp_replay_sessions(updated_at DESC, session_id); + +CREATE TABLE IF NOT EXISTS acp_replay_events ( + session_id TEXT NOT NULL, + seq INTEGER NOT NULL, + at INTEGER NOT NULL, + session_key TEXT NOT NULL, + run_id TEXT, + update_json TEXT NOT NULL, + PRIMARY KEY (session_id, seq), + FOREIGN KEY (session_id) REFERENCES acp_replay_sessions(session_id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_acp_replay_events_session_seq + ON acp_replay_events(session_id, seq); + CREATE TABLE IF NOT EXISTS agents ( agent_id TEXT NOT NULL PRIMARY KEY, config_json TEXT NOT NULL,