From b41b6382afdfe0fb4eb5e224e76a55860fbbbf97 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 9 May 2026 00:11:11 +0100 Subject: [PATCH] refactor: store current conversation bindings in sqlite table --- docs/refactor/database-first.md | 9 +- .../outbound/current-conversation-bindings.ts | 193 ++++++++++++++---- src/infra/state-migrations.test.ts | 43 ++-- src/infra/state-migrations.ts | 43 +++- src/state/openclaw-state-db.generated.d.ts | 18 ++ src/state/openclaw-state-db.test.ts | 56 ++++- src/state/openclaw-state-db.ts | 116 ++++++++++- src/state/openclaw-state-schema.generated.ts | 22 ++ src/state/openclaw-state-schema.sql | 22 ++ 9 files changed, 451 insertions(+), 71 deletions(-) diff --git a/docs/refactor/database-first.md b/docs/refactor/database-first.md index 6e2a07c6aa6..4c5bf65e93c 100644 --- a/docs/refactor/database-first.md +++ b/docs/refactor/database-first.md @@ -104,8 +104,9 @@ The branch already has a real shared SQLite base: `plugin_state_entries`, `plugin_blob_entries`, `transcript_files`, `capture_sessions`, `capture_events`, `capture_blobs`, `sandbox_registry_entries`, `cron_run_logs`, `cron_jobs`, `commitments`, - `delivery_queue_entries`, `task_runs`, `task_delivery_state`, `flow_runs`, - `subagent_runs`, `migration_runs`, and `backup_runs`. + `delivery_queue_entries`, `current_conversation_bindings`, `task_runs`, + `task_delivery_state`, `flow_runs`, `subagent_runs`, `migration_runs`, and + `backup_runs`. - `src/state/openclaw-agent-db.ts` opens `agents//agent/openclaw-agent.sqlite`, registers the database in the global DB, and owns agent-local session, transcript, VFS, artifact, and cache @@ -115,6 +116,9 @@ The branch already has a real shared SQLite base: - Subagent run recovery state now lives in typed shared `subagent_runs` rows with indexed child, requester, and controller session keys. The old `subagent_runs` KV scope is migration input only. +- Current conversation bindings now live in typed shared + `current_conversation_bindings` rows keyed by normalized conversation id and + indexed by target session. The old KV scope is migration input only. - `src/agents/filesystem/virtual-agent-fs.sqlite.ts` implements a SQLite VFS over the agent database `vfs_entries` table. - `src/agents/runtime-worker.entry.ts` creates per-run SQLite VFS, tool artifact, @@ -529,6 +533,7 @@ task_runs(...) task_delivery_state(...) flow_runs(...) subagent_runs(run_id, child_session_key, requester_session_key, controller_session_key, created_at, ended_at, cleanup_handled, payload_json) +current_conversation_bindings(binding_key, binding_id, channel, account_id, conversation_id, target_session_key, status, bound_at, expires_at, record_json) plugin_state_entries(plugin_id, namespace, entry_key, value_json, created_at, expires_at) plugin_blob_entries(plugin_id, namespace, entry_key, metadata_json, blob, created_at, expires_at) media_blobs(subdir, id, content_type, size_bytes, blob, created_at, updated_at) diff --git a/src/infra/outbound/current-conversation-bindings.ts b/src/infra/outbound/current-conversation-bindings.ts index 9523c40d31f..3070b976712 100644 --- a/src/infra/outbound/current-conversation-bindings.ts +++ b/src/infra/outbound/current-conversation-bindings.ts @@ -1,12 +1,16 @@ +import type { DatabaseSync } from "node:sqlite"; +import type { Insertable, Selectable } from "kysely"; import { normalizeConversationText } from "../../acp/conversation-id.js"; import { normalizeAnyChannelId } from "../../channels/registry.js"; +import { executeSqliteQuerySync, getNodeSqliteKysely } from "../../infra/kysely-sync.js"; import { getActivePluginChannelRegistryFromState } from "../../plugins/runtime-channel-state.js"; import { normalizeOptionalLowercaseString } from "../../shared/string-coerce.js"; +import type { DB as OpenClawStateKyselyDatabase } from "../../state/openclaw-state-db.generated.js"; import { - deleteOpenClawStateKvJson, - listOpenClawStateKvJson, - writeOpenClawStateKvJson, -} from "../../state/openclaw-state-kv.js"; + type OpenClawStateDatabaseOptions, + openOpenClawStateDatabase, + runOpenClawStateWriteTransaction, +} from "../../state/openclaw-state-db.js"; import { normalizeConversationRef } from "./session-binding-normalization.js"; import type { ConversationRef, @@ -16,13 +20,14 @@ import type { SessionBindingUnbindInput, } from "./session-binding.types.js"; -type PersistedCurrentConversationBindingEntry = { - version: 1; - binding: SessionBindingRecord; -}; +type CurrentConversationBindingsTable = + OpenClawStateKyselyDatabase["current_conversation_bindings"]; +type CurrentConversationBindingRow = Selectable; +type CurrentConversationBindingsDatabase = Pick< + OpenClawStateKyselyDatabase, + "current_conversation_bindings" +>; -const CURRENT_BINDINGS_FILE_VERSION = 1; -const CURRENT_BINDINGS_KV_SCOPE = "current-conversation-bindings"; const CURRENT_BINDINGS_ID_PREFIX = "generic:"; let bindingsLoaded = false; @@ -48,19 +53,134 @@ function isBindingExpired(record: SessionBindingRecord, now = Date.now()): boole : false; } -function toPersistedEntry(record: SessionBindingRecord): PersistedCurrentConversationBindingEntry { +function sqliteOptionsForEnv(env: NodeJS.ProcessEnv = process.env): OpenClawStateDatabaseOptions { return { - version: CURRENT_BINDINGS_FILE_VERSION, - binding: record, + env, }; } -function parsePersistedBindingEntry(value: unknown): SessionBindingRecord | null { - if (!value || typeof value !== "object" || Array.isArray(value)) { +function getCurrentConversationBindingsKysely(db: DatabaseSync) { + return getNodeSqliteKysely(db); +} + +function serializeJson(value: unknown): string | null { + return value == null ? null : JSON.stringify(value); +} + +function parseJsonRecord(raw: string | null): Record | undefined { + if (!raw?.trim()) { + return undefined; + } + try { + const parsed = JSON.parse(raw) as unknown; + return parsed && typeof parsed === "object" && !Array.isArray(parsed) + ? (parsed as Record) + : undefined; + } catch { + return undefined; + } +} + +function normalizeNumber(value: number | bigint | null): number | undefined { + if (typeof value === "bigint") { + return Number(value); + } + return typeof value === "number" ? value : undefined; +} + +function recordToRow(record: SessionBindingRecord): Insertable { + const conversation = normalizeConversationRef(record.conversation); + const bindingKey = buildConversationKey(conversation); + const bindingId = buildBindingId(conversation); + const normalized: SessionBindingRecord = { + ...record, + bindingId, + conversation, + targetSessionKey: record.targetSessionKey.trim(), + }; + return { + binding_key: bindingKey, + binding_id: bindingId, + channel: conversation.channel, + account_id: conversation.accountId, + parent_conversation_id: conversation.parentConversationId ?? null, + conversation_id: conversation.conversationId, + target_session_key: normalized.targetSessionKey, + target_kind: normalized.targetKind, + status: normalized.status, + bound_at: normalized.boundAt, + expires_at: normalized.expiresAt ?? null, + metadata_json: serializeJson(normalized.metadata), + record_json: JSON.stringify(normalized), + updated_at: Date.now(), + }; +} + +function rowToRecord(row: CurrentConversationBindingRow): SessionBindingRecord | null { + const parsed = parseJsonRecord(row.record_json) ?? {}; + const conversation = normalizeConversationRef({ + channel: row.channel, + accountId: row.account_id, + conversationId: row.conversation_id, + ...(row.parent_conversation_id ? { parentConversationId: row.parent_conversation_id } : {}), + }); + const targetSessionKey = row.target_session_key.trim(); + if (!conversation.channel || !conversation.conversationId || !targetSessionKey) { return null; } - const record = value as PersistedCurrentConversationBindingEntry; - return record.version === CURRENT_BINDINGS_FILE_VERSION ? record.binding : null; + return { + ...(parsed as Partial), + bindingId: buildBindingId(conversation), + targetSessionKey, + targetKind: row.target_kind === "subagent" ? "subagent" : "session", + conversation, + status: + row.status === "ending" || row.status === "ended" || row.status === "active" + ? row.status + : "active", + boundAt: normalizeNumber(row.bound_at) ?? 0, + ...(row.expires_at != null ? { expiresAt: normalizeNumber(row.expires_at) } : {}), + metadata: parseJsonRecord(row.metadata_json), + }; +} + +function upsertBindingRow(record: SessionBindingRecord, env?: NodeJS.ProcessEnv): void { + runOpenClawStateWriteTransaction((stateDatabase) => { + const db = getCurrentConversationBindingsKysely(stateDatabase.db); + executeSqliteQuerySync( + stateDatabase.db, + db + .insertInto("current_conversation_bindings") + .values(recordToRow(record)) + .onConflict((conflict) => + conflict.column("binding_key").doUpdateSet({ + binding_id: (eb) => eb.ref("excluded.binding_id"), + channel: (eb) => eb.ref("excluded.channel"), + account_id: (eb) => eb.ref("excluded.account_id"), + parent_conversation_id: (eb) => eb.ref("excluded.parent_conversation_id"), + conversation_id: (eb) => eb.ref("excluded.conversation_id"), + target_session_key: (eb) => eb.ref("excluded.target_session_key"), + target_kind: (eb) => eb.ref("excluded.target_kind"), + status: (eb) => eb.ref("excluded.status"), + bound_at: (eb) => eb.ref("excluded.bound_at"), + expires_at: (eb) => eb.ref("excluded.expires_at"), + metadata_json: (eb) => eb.ref("excluded.metadata_json"), + record_json: (eb) => eb.ref("excluded.record_json"), + updated_at: (eb) => eb.ref("excluded.updated_at"), + }), + ), + ); + }, sqliteOptionsForEnv(env)); +} + +function deleteBindingRow(key: string, env?: NodeJS.ProcessEnv): void { + runOpenClawStateWriteTransaction((stateDatabase) => { + const db = getCurrentConversationBindingsKysely(stateDatabase.db); + executeSqliteQuerySync( + stateDatabase.db, + db.deleteFrom("current_conversation_bindings").where("binding_key", "=", key), + ); + }, sqliteOptionsForEnv(env)); } function loadBindingsIntoMemory(): void { @@ -69,12 +189,16 @@ function loadBindingsIntoMemory(): void { } bindingsLoaded = true; bindingsByConversationKey.clear(); - const entries = - listOpenClawStateKvJson(CURRENT_BINDINGS_KV_SCOPE); - for (const entry of entries) { - const record = parsePersistedBindingEntry(entry.value); + const stateDatabase = openOpenClawStateDatabase(); + const db = getCurrentConversationBindingsKysely(stateDatabase.db); + const rows = executeSqliteQuerySync( + stateDatabase.db, + db.selectFrom("current_conversation_bindings").selectAll().orderBy("updated_at", "asc"), + ).rows; + for (const row of rows) { + const record = rowToRecord(row); if (!record?.bindingId || !record?.conversation?.conversationId || isBindingExpired(record)) { - deleteOpenClawStateKvJson(CURRENT_BINDINGS_KV_SCOPE, entry.key); + deleteBindingRow(row.binding_key); continue; } const conversation = normalizeConversationRef(record.conversation); @@ -92,12 +216,11 @@ function loadBindingsIntoMemory(): void { } function persistBinding(record: SessionBindingRecord): void { - const key = buildConversationKey(record.conversation); - writeOpenClawStateKvJson(CURRENT_BINDINGS_KV_SCOPE, key, toPersistedEntry(record)); + upsertBindingRow(record); } function deletePersistedBinding(key: string): void { - deleteOpenClawStateKvJson(CURRENT_BINDINGS_KV_SCOPE, key); + deleteBindingRow(key); } function pruneExpiredBinding(key: string): SessionBindingRecord | null { @@ -273,11 +396,10 @@ export const __testing = { bindingsLoaded = false; bindingsByConversationKey.clear(); if (params?.deletePersistedFile) { - for (const entry of listOpenClawStateKvJson(CURRENT_BINDINGS_KV_SCOPE, { - env: params.env, - })) { - deleteOpenClawStateKvJson(CURRENT_BINDINGS_KV_SCOPE, entry.key, { env: params.env }); - } + runOpenClawStateWriteTransaction((stateDatabase) => { + const db = getCurrentConversationBindingsKysely(stateDatabase.db); + executeSqliteQuerySync(stateDatabase.db, db.deleteFrom("current_conversation_bindings")); + }, sqliteOptionsForEnv(params.env)); } }, persistBindingForTests(record: SessionBindingRecord, env?: NodeJS.ProcessEnv) { @@ -287,15 +409,6 @@ export const __testing = { bindingId: buildBindingId(conversation), conversation, }; - writeOpenClawStateKvJson( - CURRENT_BINDINGS_KV_SCOPE, - buildConversationKey(conversation), - { - version: CURRENT_BINDINGS_FILE_VERSION, - binding: normalized, - }, - { env }, - ); + upsertBindingRow(normalized, env); }, - currentBindingsKvScope: CURRENT_BINDINGS_KV_SCOPE, }; diff --git a/src/infra/state-migrations.test.ts b/src/infra/state-migrations.test.ts index 3b488198389..cee1f2b1858 100644 --- a/src/infra/state-migrations.test.ts +++ b/src/infra/state-migrations.test.ts @@ -21,6 +21,10 @@ import { detectLegacyStateMigrations, runLegacyStateMigrations } from "./state-m type DeliveryQueueTestDatabase = Pick; type KvTestDatabase = Pick; +type CurrentConversationBindingsTestDatabase = Pick< + OpenClawStateKyselyDatabase, + "current_conversation_bindings" +>; type PluginStateTestDatabase = Pick; type MigrationSourceRow = { migration_kind: string; @@ -588,7 +592,7 @@ describe("state migrations", () => { await expect(fs.stat(sourcePath)).rejects.toMatchObject({ code: "ENOENT" }); }); - it("imports legacy current conversation bindings into SQLite KV", async () => { + it("imports legacy current conversation bindings into SQLite rows", async () => { const { root, stateDir, env, cfg } = await createLegacyStateFixture(); const sourcePath = path.join(stateDir, "bindings", "current-conversations.json"); await fs.mkdir(path.dirname(sourcePath), { recursive: true }); @@ -637,26 +641,31 @@ describe("state migrations", () => { ); const stateDatabase = openOpenClawStateDatabase({ env }); - const db = getNodeSqliteKysely(stateDatabase.db); - const row = executeSqliteQuerySync<{ key: string; value_json: string }>( + const db = getNodeSqliteKysely(stateDatabase.db); + const row = executeSqliteQuerySync<{ + binding_key: string; + binding_id: string; + target_session_key: string; + record_json: string; + }>( stateDatabase.db, db - .selectFrom("kv") - .select(["key", "value_json"]) - .where("scope", "=", "current-conversation-bindings"), + .selectFrom("current_conversation_bindings") + .select(["binding_key", "binding_id", "target_session_key", "record_json"]), ).rows[0]; - expect(row?.key).toBe("forum\u241fdefault\u241f\u241f6098642967"); - expect(JSON.parse(row?.value_json ?? "{}")).toMatchObject({ - version: 1, - binding: { - bindingId: "generic:forum\u241fdefault\u241f\u241f6098642967", - targetSessionKey: "agent:worker-1:acp:forum-dm", - conversation: { - channel: "forum", - accountId: "default", - conversationId: "6098642967", - }, + expect(row).toMatchObject({ + binding_key: "forum\u241fdefault\u241f\u241f6098642967", + binding_id: "generic:forum\u241fdefault\u241f\u241f6098642967", + target_session_key: "agent:worker-1:acp:forum-dm", + }); + expect(JSON.parse(row?.record_json ?? "{}")).toMatchObject({ + bindingId: "generic:forum\u241fdefault\u241f\u241f6098642967", + targetSessionKey: "agent:worker-1:acp:forum-dm", + conversation: { + channel: "forum", + accountId: "default", + conversationId: "6098642967", }, }); await expect(fs.stat(sourcePath)).rejects.toMatchObject({ code: "ENOENT" }); diff --git a/src/infra/state-migrations.ts b/src/infra/state-migrations.ts index 9bba7f3720d..617d75afe0d 100644 --- a/src/infra/state-migrations.ts +++ b/src/infra/state-migrations.ts @@ -411,7 +411,6 @@ type LegacyCurrentConversationBindingsFile = { bindings?: unknown; }; -const CURRENT_CONVERSATION_BINDINGS_KV_SCOPE = "current-conversation-bindings"; const CURRENT_CONVERSATION_BINDINGS_ID_PREFIX = "generic:"; function readLegacyQueueJson(filePath: string, id: string, enqueuedAt: number): string { @@ -997,7 +996,9 @@ function importLegacyCurrentConversationBindingsToSqlite( let imported = 0; runOpenClawStateWriteTransaction( (stateDatabase) => { - const db = getNodeSqliteKysely(stateDatabase.db); + const db = getNodeSqliteKysely( + stateDatabase.db, + ); for (const candidate of bindings) { if (!candidate || typeof candidate !== "object" || Array.isArray(candidate)) { continue; @@ -1017,16 +1018,38 @@ function importLegacyCurrentConversationBindingsToSqlite( executeSqliteQuerySync( stateDatabase.db, db - .insertInto("kv") + .insertInto("current_conversation_bindings") .values({ - scope: CURRENT_CONVERSATION_BINDINGS_KV_SCOPE, - key, - value_json: JSON.stringify({ version: 1, binding: normalized }), + binding_key: key, + binding_id: normalized.bindingId, + channel: normalized.conversation.channel, + account_id: normalized.conversation.accountId, + parent_conversation_id: normalized.conversation.parentConversationId ?? null, + conversation_id: normalized.conversation.conversationId, + target_session_key: normalized.targetSessionKey, + target_kind: normalized.targetKind, + status: normalized.status, + bound_at: normalized.boundAt, + expires_at: normalized.expiresAt ?? null, + metadata_json: + normalized.metadata == null ? null : JSON.stringify(normalized.metadata), + record_json: JSON.stringify(normalized), updated_at: updatedAt, }) .onConflict((conflict) => - conflict.columns(["scope", "key"]).doUpdateSet({ - value_json: (eb) => eb.ref("excluded.value_json"), + conflict.column("binding_key").doUpdateSet({ + binding_id: (eb) => eb.ref("excluded.binding_id"), + channel: (eb) => eb.ref("excluded.channel"), + account_id: (eb) => eb.ref("excluded.account_id"), + parent_conversation_id: (eb) => eb.ref("excluded.parent_conversation_id"), + conversation_id: (eb) => eb.ref("excluded.conversation_id"), + target_session_key: (eb) => eb.ref("excluded.target_session_key"), + target_kind: (eb) => eb.ref("excluded.target_kind"), + status: (eb) => eb.ref("excluded.status"), + bound_at: (eb) => eb.ref("excluded.bound_at"), + expires_at: (eb) => eb.ref("excluded.expires_at"), + metadata_json: (eb) => eb.ref("excluded.metadata_json"), + record_json: (eb) => eb.ref("excluded.record_json"), updated_at: (eb) => eb.ref("excluded.updated_at"), }), ), @@ -1842,6 +1865,10 @@ type AgentToolArtifactMigrationDatabase = Pick; type DeliveryQueueMigrationDatabase = Pick; type KvMigrationDatabase = Pick; +type CurrentConversationBindingsMigrationDatabase = Pick< + OpenClawStateKyselyDatabase, + "current_conversation_bindings" +>; type PluginStateMigrationDatabase = Pick; const CLAWHUB_SKILL_STATE_OWNER_ID = "core:clawhub-skills"; diff --git a/src/state/openclaw-state-db.generated.d.ts b/src/state/openclaw-state-db.generated.d.ts index 77d4827f179..d02e0b2deaf 100644 --- a/src/state/openclaw-state-db.generated.d.ts +++ b/src/state/openclaw-state-db.generated.d.ts @@ -129,6 +129,23 @@ export interface CronRunLogs { ts: number; } +export interface CurrentConversationBindings { + account_id: string; + binding_id: string; + binding_key: string; + bound_at: number; + channel: string; + conversation_id: string; + expires_at: number | null; + metadata_json: string | null; + parent_conversation_id: string | null; + record_json: string; + status: string; + target_kind: string; + target_session_key: string; + updated_at: number; +} + export interface DeliveryQueueEntries { enqueued_at: number; entry_json: string; @@ -339,6 +356,7 @@ export interface DB { commitments: Commitments; cron_jobs: CronJobs; cron_run_logs: CronRunLogs; + current_conversation_bindings: CurrentConversationBindings; delivery_queue_entries: DeliveryQueueEntries; flow_runs: FlowRuns; kv: Kv; diff --git a/src/state/openclaw-state-db.test.ts b/src/state/openclaw-state-db.test.ts index d7a0d61f6f7..e31543b1d1b 100644 --- a/src/state/openclaw-state-db.test.ts +++ b/src/state/openclaw-state-db.test.ts @@ -157,7 +157,7 @@ describe("openclaw state database", () => { expect(columns.some((column) => column.name === "sort_order")).toBe(true); expect(index?.sql).toContain("sort_order ASC"); - expect(version.user_version).toBe(20); + expect(version.user_version).toBe(21); }); it("migrates legacy cron runtime state from kv into cron job columns", () => { @@ -215,7 +215,7 @@ describe("openclaw state database", () => { .prepare("SELECT COUNT(*) AS count FROM kv WHERE scope = ?") .get("cron.jobs.state"), ).toEqual({ count: 0 }); - expect(database.db.prepare("PRAGMA user_version").get()).toEqual({ user_version: 20 }); + expect(database.db.prepare("PRAGMA user_version").get()).toEqual({ user_version: 21 }); }); it("migrates persisted subagent runs from kv into subagent run rows", () => { @@ -264,7 +264,57 @@ describe("openclaw state database", () => { expect( database.db.prepare("SELECT COUNT(*) AS count FROM kv WHERE scope = ?").get("subagent_runs"), ).toEqual({ count: 0 }); - expect(database.db.prepare("PRAGMA user_version").get()).toEqual({ user_version: 20 }); + expect(database.db.prepare("PRAGMA user_version").get()).toEqual({ user_version: 21 }); + }); + + it("migrates current conversation bindings from kv into binding rows", () => { + const stateDir = createTempStateDir(); + const dbPath = resolveOpenClawStateSqlitePath({ OPENCLAW_STATE_DIR: stateDir }); + fs.mkdirSync(path.dirname(dbPath), { recursive: true }); + const sqlite = requireNodeSqlite(); + const oldDb = new sqlite.DatabaseSync(dbPath); + oldDb.exec(` + CREATE TABLE kv ( + scope TEXT NOT NULL, + key TEXT NOT NULL, + value_json TEXT NOT NULL, + updated_at INTEGER NOT NULL, + PRIMARY KEY (scope, key) + ); + INSERT INTO kv (scope, key, value_json, updated_at) + VALUES ( + 'current-conversation-bindings', + 'forum\u241fdefault\u241f6098642967\u241f6098642967', + '{"version":1,"binding":{"bindingId":"generic:forum\u241fdefault\u241f6098642967\u241f6098642967","targetSessionKey":" agent:worker-1:acp:forum-dm ","targetKind":"session","conversation":{"channel":"forum","accountId":"default","conversationId":"6098642967","parentConversationId":"6098642967"},"status":"active","boundAt":1234,"metadata":{"label":"forum-dm"}}}', + 5678 + ); + PRAGMA user_version = 20; + `); + oldDb.close(); + + const database = openOpenClawStateDatabase({ + env: { OPENCLAW_STATE_DIR: stateDir }, + }); + + expect( + database.db + .prepare( + "SELECT binding_key, binding_id, parent_conversation_id, target_session_key, metadata_json FROM current_conversation_bindings WHERE conversation_id = ?", + ) + .get("6098642967"), + ).toEqual({ + binding_key: "forum\u241fdefault\u241f\u241f6098642967", + binding_id: "generic:forum\u241fdefault\u241f\u241f6098642967", + parent_conversation_id: null, + target_session_key: "agent:worker-1:acp:forum-dm", + metadata_json: '{"label":"forum-dm"}', + }); + expect( + database.db + .prepare("SELECT COUNT(*) AS count FROM kv WHERE scope = ?") + .get("current-conversation-bindings"), + ).toEqual({ count: 0 }); + expect(database.db.prepare("PRAGMA user_version").get()).toEqual({ user_version: 21 }); }); it("upgrades task delivery state with task-run cascade integrity", () => { diff --git a/src/state/openclaw-state-db.ts b/src/state/openclaw-state-db.ts index f503facd796..c630c096840 100644 --- a/src/state/openclaw-state-db.ts +++ b/src/state/openclaw-state-db.ts @@ -17,7 +17,7 @@ import { } from "./openclaw-state-db.paths.js"; import { OPENCLAW_STATE_SCHEMA_SQL } from "./openclaw-state-schema.generated.js"; -const OPENCLAW_STATE_SCHEMA_VERSION = 20; +const OPENCLAW_STATE_SCHEMA_VERSION = 21; export const OPENCLAW_SQLITE_BUSY_TIMEOUT_MS = 30_000; const OPENCLAW_STATE_DIR_MODE = 0o700; const OPENCLAW_STATE_FILE_MODE = 0o600; @@ -346,6 +346,117 @@ function migrateSubagentRunsFromKv(db: DatabaseSync): void { db.prepare("DELETE FROM kv WHERE scope = 'subagent_runs'").run(); } +function migrateCurrentConversationBindingsFromKv(db: DatabaseSync): void { + const legacyRows = db + .prepare( + "SELECT key, value_json, updated_at FROM kv WHERE scope = 'current-conversation-bindings'", + ) + .all() as Array<{ key?: unknown; value_json?: unknown; updated_at?: unknown }>; + if (legacyRows.length === 0) { + return; + } + + const insert = db.prepare(` + INSERT OR REPLACE INTO current_conversation_bindings ( + binding_key, + binding_id, + channel, + account_id, + parent_conversation_id, + conversation_id, + target_session_key, + target_kind, + status, + bound_at, + expires_at, + metadata_json, + record_json, + updated_at + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `); + + for (const row of legacyRows) { + if (typeof row.key !== "string" || typeof row.value_json !== "string") { + continue; + } + let parsed: unknown; + try { + parsed = JSON.parse(row.value_json); + } catch { + continue; + } + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + continue; + } + const binding = + (parsed as { version?: unknown; binding?: unknown }).version === 1 + ? (parsed as { binding?: unknown }).binding + : parsed; + if (!binding || typeof binding !== "object" || Array.isArray(binding)) { + continue; + } + const record = binding as Record; + const conversation = record.conversation; + if (!conversation || typeof conversation !== "object" || Array.isArray(conversation)) { + continue; + } + const conversationRecord = conversation as Record; + const channel = readString(conversationRecord.channel)?.toLowerCase(); + const accountId = readString(conversationRecord.accountId) ?? "default"; + const conversationId = readString(conversationRecord.conversationId); + const parentConversationIdRaw = readString(conversationRecord.parentConversationId); + const parentConversationId = + parentConversationIdRaw && parentConversationIdRaw !== conversationId + ? parentConversationIdRaw + : null; + const targetSessionKey = readString(record.targetSessionKey); + if (!channel || !conversationId || !targetSessionKey) { + continue; + } + const bindingKey = [channel, accountId, parentConversationId ?? "", conversationId].join( + "\u241f", + ); + const bindingId = `generic:${bindingKey}`; + const targetKind = record.targetKind === "subagent" ? "subagent" : "session"; + const status = + record.status === "ending" || record.status === "ended" ? record.status : "active"; + const updatedAt = readFiniteNumber(row.updated_at) ?? Date.now(); + const normalized = { + ...record, + bindingId, + targetSessionKey, + targetKind, + status, + conversation: { + ...conversationRecord, + channel, + accountId, + conversationId, + ...(parentConversationId ? { parentConversationId } : {}), + }, + }; + insert.run( + bindingKey, + bindingId, + channel, + accountId, + parentConversationId, + conversationId, + targetSessionKey, + targetKind, + status, + readFiniteNumber(record.boundAt) ?? updatedAt, + readFiniteNumber(record.expiresAt), + readJsonText(record.metadata), + JSON.stringify(normalized), + updatedAt, + ); + } + + db.prepare("DELETE FROM kv WHERE scope = 'current-conversation-bindings'").run(); +} + function rebuildTaskDeliveryStateWithForeignKey(db: DatabaseSync): void { db.exec(` CREATE TABLE IF NOT EXISTS task_delivery_state_next ( @@ -425,6 +536,9 @@ function migrateStateSchema(db: DatabaseSync, fromVersion: number): void { if (fromVersion < 20) { migrateSubagentRunsFromKv(db); } + if (fromVersion < 21) { + migrateCurrentConversationBindingsFromKv(db); + } } function ensureSchema(db: DatabaseSync, pathname: string): void { diff --git a/src/state/openclaw-state-schema.generated.ts b/src/state/openclaw-state-schema.generated.ts index b39c314f35e..7f5324c2ad4 100644 --- a/src/state/openclaw-state-schema.generated.ts +++ b/src/state/openclaw-state-schema.generated.ts @@ -353,6 +353,28 @@ CREATE INDEX IF NOT EXISTS idx_subagent_runs_archive_at CREATE INDEX IF NOT EXISTS idx_subagent_runs_ended_cleanup ON subagent_runs(ended_at, cleanup_handled, run_id); +CREATE TABLE IF NOT EXISTS current_conversation_bindings ( + binding_key TEXT NOT NULL PRIMARY KEY, + binding_id TEXT NOT NULL, + channel TEXT NOT NULL, + account_id TEXT NOT NULL, + parent_conversation_id TEXT, + conversation_id TEXT NOT NULL, + target_session_key TEXT NOT NULL, + target_kind TEXT NOT NULL, + status TEXT NOT NULL, + bound_at INTEGER NOT NULL, + expires_at INTEGER, + metadata_json TEXT, + record_json TEXT NOT NULL, + updated_at INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_current_conversation_bindings_target + ON current_conversation_bindings(target_session_key, updated_at DESC, binding_key); +CREATE INDEX IF NOT EXISTS idx_current_conversation_bindings_expires + ON current_conversation_bindings(expires_at, binding_key); + CREATE TABLE IF NOT EXISTS task_delivery_state ( task_id TEXT NOT NULL PRIMARY KEY, requester_origin_json TEXT, diff --git a/src/state/openclaw-state-schema.sql b/src/state/openclaw-state-schema.sql index b83ca6639a7..922014eba89 100644 --- a/src/state/openclaw-state-schema.sql +++ b/src/state/openclaw-state-schema.sql @@ -348,6 +348,28 @@ CREATE INDEX IF NOT EXISTS idx_subagent_runs_archive_at CREATE INDEX IF NOT EXISTS idx_subagent_runs_ended_cleanup ON subagent_runs(ended_at, cleanup_handled, run_id); +CREATE TABLE IF NOT EXISTS current_conversation_bindings ( + binding_key TEXT NOT NULL PRIMARY KEY, + binding_id TEXT NOT NULL, + channel TEXT NOT NULL, + account_id TEXT NOT NULL, + parent_conversation_id TEXT, + conversation_id TEXT NOT NULL, + target_session_key TEXT NOT NULL, + target_kind TEXT NOT NULL, + status TEXT NOT NULL, + bound_at INTEGER NOT NULL, + expires_at INTEGER, + metadata_json TEXT, + record_json TEXT NOT NULL, + updated_at INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_current_conversation_bindings_target + ON current_conversation_bindings(target_session_key, updated_at DESC, binding_key); +CREATE INDEX IF NOT EXISTS idx_current_conversation_bindings_expires + ON current_conversation_bindings(expires_at, binding_key); + CREATE TABLE IF NOT EXISTS task_delivery_state ( task_id TEXT NOT NULL PRIMARY KEY, requester_origin_json TEXT,