refactor: store current conversation bindings in sqlite table

This commit is contained in:
Peter Steinberger
2026-05-09 00:11:11 +01:00
parent bbca94041c
commit b41b6382af
9 changed files with 451 additions and 71 deletions

View File

@@ -104,8 +104,9 @@ The branch already has a real shared SQLite base:
`plugin_state_entries`, `plugin_blob_entries`, `transcript_files`,
`capture_sessions`, `capture_events`, `capture_blobs`,
`sandbox_registry_entries`, `cron_run_logs`, `cron_jobs`, `commitments`,
`delivery_queue_entries`, `task_runs`, `task_delivery_state`, `flow_runs`,
`subagent_runs`, `migration_runs`, and `backup_runs`.
`delivery_queue_entries`, `current_conversation_bindings`, `task_runs`,
`task_delivery_state`, `flow_runs`, `subagent_runs`, `migration_runs`, and
`backup_runs`.
- `src/state/openclaw-agent-db.ts` opens
`agents/<agentId>/agent/openclaw-agent.sqlite`, registers the database in the
global DB, and owns agent-local session, transcript, VFS, artifact, and cache
@@ -115,6 +116,9 @@ The branch already has a real shared SQLite base:
- Subagent run recovery state now lives in typed shared `subagent_runs` rows
with indexed child, requester, and controller session keys. The old
`subagent_runs` KV scope is migration input only.
- Current conversation bindings now live in typed shared
`current_conversation_bindings` rows keyed by normalized conversation id and
indexed by target session. The old KV scope is migration input only.
- `src/agents/filesystem/virtual-agent-fs.sqlite.ts` implements a SQLite VFS
over the agent database `vfs_entries` table.
- `src/agents/runtime-worker.entry.ts` creates per-run SQLite VFS, tool artifact,
@@ -529,6 +533,7 @@ task_runs(...)
task_delivery_state(...)
flow_runs(...)
subagent_runs(run_id, child_session_key, requester_session_key, controller_session_key, created_at, ended_at, cleanup_handled, payload_json)
current_conversation_bindings(binding_key, binding_id, channel, account_id, conversation_id, target_session_key, status, bound_at, expires_at, record_json)
plugin_state_entries(plugin_id, namespace, entry_key, value_json, created_at, expires_at)
plugin_blob_entries(plugin_id, namespace, entry_key, metadata_json, blob, created_at, expires_at)
media_blobs(subdir, id, content_type, size_bytes, blob, created_at, updated_at)

View File

@@ -1,12 +1,16 @@
import type { DatabaseSync } from "node:sqlite";
import type { Insertable, Selectable } from "kysely";
import { normalizeConversationText } from "../../acp/conversation-id.js";
import { normalizeAnyChannelId } from "../../channels/registry.js";
import { executeSqliteQuerySync, getNodeSqliteKysely } from "../../infra/kysely-sync.js";
import { getActivePluginChannelRegistryFromState } from "../../plugins/runtime-channel-state.js";
import { normalizeOptionalLowercaseString } from "../../shared/string-coerce.js";
import type { DB as OpenClawStateKyselyDatabase } from "../../state/openclaw-state-db.generated.js";
import {
deleteOpenClawStateKvJson,
listOpenClawStateKvJson,
writeOpenClawStateKvJson,
} from "../../state/openclaw-state-kv.js";
type OpenClawStateDatabaseOptions,
openOpenClawStateDatabase,
runOpenClawStateWriteTransaction,
} from "../../state/openclaw-state-db.js";
import { normalizeConversationRef } from "./session-binding-normalization.js";
import type {
ConversationRef,
@@ -16,13 +20,14 @@ import type {
SessionBindingUnbindInput,
} from "./session-binding.types.js";
type PersistedCurrentConversationBindingEntry = {
version: 1;
binding: SessionBindingRecord;
};
type CurrentConversationBindingsTable =
OpenClawStateKyselyDatabase["current_conversation_bindings"];
type CurrentConversationBindingRow = Selectable<CurrentConversationBindingsTable>;
type CurrentConversationBindingsDatabase = Pick<
OpenClawStateKyselyDatabase,
"current_conversation_bindings"
>;
const CURRENT_BINDINGS_FILE_VERSION = 1;
const CURRENT_BINDINGS_KV_SCOPE = "current-conversation-bindings";
const CURRENT_BINDINGS_ID_PREFIX = "generic:";
let bindingsLoaded = false;
@@ -48,19 +53,134 @@ function isBindingExpired(record: SessionBindingRecord, now = Date.now()): boole
: false;
}
function toPersistedEntry(record: SessionBindingRecord): PersistedCurrentConversationBindingEntry {
function sqliteOptionsForEnv(env: NodeJS.ProcessEnv = process.env): OpenClawStateDatabaseOptions {
return {
version: CURRENT_BINDINGS_FILE_VERSION,
binding: record,
env,
};
}
function parsePersistedBindingEntry(value: unknown): SessionBindingRecord | null {
if (!value || typeof value !== "object" || Array.isArray(value)) {
function getCurrentConversationBindingsKysely(db: DatabaseSync) {
return getNodeSqliteKysely<CurrentConversationBindingsDatabase>(db);
}
function serializeJson(value: unknown): string | null {
return value == null ? null : JSON.stringify(value);
}
function parseJsonRecord(raw: string | null): Record<string, unknown> | undefined {
if (!raw?.trim()) {
return undefined;
}
try {
const parsed = JSON.parse(raw) as unknown;
return parsed && typeof parsed === "object" && !Array.isArray(parsed)
? (parsed as Record<string, unknown>)
: undefined;
} catch {
return undefined;
}
}
function normalizeNumber(value: number | bigint | null): number | undefined {
if (typeof value === "bigint") {
return Number(value);
}
return typeof value === "number" ? value : undefined;
}
function recordToRow(record: SessionBindingRecord): Insertable<CurrentConversationBindingsTable> {
const conversation = normalizeConversationRef(record.conversation);
const bindingKey = buildConversationKey(conversation);
const bindingId = buildBindingId(conversation);
const normalized: SessionBindingRecord = {
...record,
bindingId,
conversation,
targetSessionKey: record.targetSessionKey.trim(),
};
return {
binding_key: bindingKey,
binding_id: bindingId,
channel: conversation.channel,
account_id: conversation.accountId,
parent_conversation_id: conversation.parentConversationId ?? null,
conversation_id: conversation.conversationId,
target_session_key: normalized.targetSessionKey,
target_kind: normalized.targetKind,
status: normalized.status,
bound_at: normalized.boundAt,
expires_at: normalized.expiresAt ?? null,
metadata_json: serializeJson(normalized.metadata),
record_json: JSON.stringify(normalized),
updated_at: Date.now(),
};
}
function rowToRecord(row: CurrentConversationBindingRow): SessionBindingRecord | null {
const parsed = parseJsonRecord(row.record_json) ?? {};
const conversation = normalizeConversationRef({
channel: row.channel,
accountId: row.account_id,
conversationId: row.conversation_id,
...(row.parent_conversation_id ? { parentConversationId: row.parent_conversation_id } : {}),
});
const targetSessionKey = row.target_session_key.trim();
if (!conversation.channel || !conversation.conversationId || !targetSessionKey) {
return null;
}
const record = value as PersistedCurrentConversationBindingEntry;
return record.version === CURRENT_BINDINGS_FILE_VERSION ? record.binding : null;
return {
...(parsed as Partial<SessionBindingRecord>),
bindingId: buildBindingId(conversation),
targetSessionKey,
targetKind: row.target_kind === "subagent" ? "subagent" : "session",
conversation,
status:
row.status === "ending" || row.status === "ended" || row.status === "active"
? row.status
: "active",
boundAt: normalizeNumber(row.bound_at) ?? 0,
...(row.expires_at != null ? { expiresAt: normalizeNumber(row.expires_at) } : {}),
metadata: parseJsonRecord(row.metadata_json),
};
}
function upsertBindingRow(record: SessionBindingRecord, env?: NodeJS.ProcessEnv): void {
runOpenClawStateWriteTransaction((stateDatabase) => {
const db = getCurrentConversationBindingsKysely(stateDatabase.db);
executeSqliteQuerySync(
stateDatabase.db,
db
.insertInto("current_conversation_bindings")
.values(recordToRow(record))
.onConflict((conflict) =>
conflict.column("binding_key").doUpdateSet({
binding_id: (eb) => eb.ref("excluded.binding_id"),
channel: (eb) => eb.ref("excluded.channel"),
account_id: (eb) => eb.ref("excluded.account_id"),
parent_conversation_id: (eb) => eb.ref("excluded.parent_conversation_id"),
conversation_id: (eb) => eb.ref("excluded.conversation_id"),
target_session_key: (eb) => eb.ref("excluded.target_session_key"),
target_kind: (eb) => eb.ref("excluded.target_kind"),
status: (eb) => eb.ref("excluded.status"),
bound_at: (eb) => eb.ref("excluded.bound_at"),
expires_at: (eb) => eb.ref("excluded.expires_at"),
metadata_json: (eb) => eb.ref("excluded.metadata_json"),
record_json: (eb) => eb.ref("excluded.record_json"),
updated_at: (eb) => eb.ref("excluded.updated_at"),
}),
),
);
}, sqliteOptionsForEnv(env));
}
function deleteBindingRow(key: string, env?: NodeJS.ProcessEnv): void {
runOpenClawStateWriteTransaction((stateDatabase) => {
const db = getCurrentConversationBindingsKysely(stateDatabase.db);
executeSqliteQuerySync(
stateDatabase.db,
db.deleteFrom("current_conversation_bindings").where("binding_key", "=", key),
);
}, sqliteOptionsForEnv(env));
}
function loadBindingsIntoMemory(): void {
@@ -69,12 +189,16 @@ function loadBindingsIntoMemory(): void {
}
bindingsLoaded = true;
bindingsByConversationKey.clear();
const entries =
listOpenClawStateKvJson<PersistedCurrentConversationBindingEntry>(CURRENT_BINDINGS_KV_SCOPE);
for (const entry of entries) {
const record = parsePersistedBindingEntry(entry.value);
const stateDatabase = openOpenClawStateDatabase();
const db = getCurrentConversationBindingsKysely(stateDatabase.db);
const rows = executeSqliteQuerySync<CurrentConversationBindingRow>(
stateDatabase.db,
db.selectFrom("current_conversation_bindings").selectAll().orderBy("updated_at", "asc"),
).rows;
for (const row of rows) {
const record = rowToRecord(row);
if (!record?.bindingId || !record?.conversation?.conversationId || isBindingExpired(record)) {
deleteOpenClawStateKvJson(CURRENT_BINDINGS_KV_SCOPE, entry.key);
deleteBindingRow(row.binding_key);
continue;
}
const conversation = normalizeConversationRef(record.conversation);
@@ -92,12 +216,11 @@ function loadBindingsIntoMemory(): void {
}
function persistBinding(record: SessionBindingRecord): void {
const key = buildConversationKey(record.conversation);
writeOpenClawStateKvJson(CURRENT_BINDINGS_KV_SCOPE, key, toPersistedEntry(record));
upsertBindingRow(record);
}
function deletePersistedBinding(key: string): void {
deleteOpenClawStateKvJson(CURRENT_BINDINGS_KV_SCOPE, key);
deleteBindingRow(key);
}
function pruneExpiredBinding(key: string): SessionBindingRecord | null {
@@ -273,11 +396,10 @@ export const __testing = {
bindingsLoaded = false;
bindingsByConversationKey.clear();
if (params?.deletePersistedFile) {
for (const entry of listOpenClawStateKvJson(CURRENT_BINDINGS_KV_SCOPE, {
env: params.env,
})) {
deleteOpenClawStateKvJson(CURRENT_BINDINGS_KV_SCOPE, entry.key, { env: params.env });
}
runOpenClawStateWriteTransaction((stateDatabase) => {
const db = getCurrentConversationBindingsKysely(stateDatabase.db);
executeSqliteQuerySync(stateDatabase.db, db.deleteFrom("current_conversation_bindings"));
}, sqliteOptionsForEnv(params.env));
}
},
persistBindingForTests(record: SessionBindingRecord, env?: NodeJS.ProcessEnv) {
@@ -287,15 +409,6 @@ export const __testing = {
bindingId: buildBindingId(conversation),
conversation,
};
writeOpenClawStateKvJson(
CURRENT_BINDINGS_KV_SCOPE,
buildConversationKey(conversation),
{
version: CURRENT_BINDINGS_FILE_VERSION,
binding: normalized,
},
{ env },
);
upsertBindingRow(normalized, env);
},
currentBindingsKvScope: CURRENT_BINDINGS_KV_SCOPE,
};

View File

@@ -21,6 +21,10 @@ import { detectLegacyStateMigrations, runLegacyStateMigrations } from "./state-m
type DeliveryQueueTestDatabase = Pick<OpenClawStateKyselyDatabase, "delivery_queue_entries">;
type KvTestDatabase = Pick<OpenClawStateKyselyDatabase, "kv">;
type CurrentConversationBindingsTestDatabase = Pick<
OpenClawStateKyselyDatabase,
"current_conversation_bindings"
>;
type PluginStateTestDatabase = Pick<OpenClawStateKyselyDatabase, "plugin_state_entries">;
type MigrationSourceRow = {
migration_kind: string;
@@ -588,7 +592,7 @@ describe("state migrations", () => {
await expect(fs.stat(sourcePath)).rejects.toMatchObject({ code: "ENOENT" });
});
it("imports legacy current conversation bindings into SQLite KV", async () => {
it("imports legacy current conversation bindings into SQLite rows", async () => {
const { root, stateDir, env, cfg } = await createLegacyStateFixture();
const sourcePath = path.join(stateDir, "bindings", "current-conversations.json");
await fs.mkdir(path.dirname(sourcePath), { recursive: true });
@@ -637,26 +641,31 @@ describe("state migrations", () => {
);
const stateDatabase = openOpenClawStateDatabase({ env });
const db = getNodeSqliteKysely<KvTestDatabase>(stateDatabase.db);
const row = executeSqliteQuerySync<{ key: string; value_json: string }>(
const db = getNodeSqliteKysely<CurrentConversationBindingsTestDatabase>(stateDatabase.db);
const row = executeSqliteQuerySync<{
binding_key: string;
binding_id: string;
target_session_key: string;
record_json: string;
}>(
stateDatabase.db,
db
.selectFrom("kv")
.select(["key", "value_json"])
.where("scope", "=", "current-conversation-bindings"),
.selectFrom("current_conversation_bindings")
.select(["binding_key", "binding_id", "target_session_key", "record_json"]),
).rows[0];
expect(row?.key).toBe("forum\u241fdefault\u241f\u241f6098642967");
expect(JSON.parse(row?.value_json ?? "{}")).toMatchObject({
version: 1,
binding: {
bindingId: "generic:forum\u241fdefault\u241f\u241f6098642967",
targetSessionKey: "agent:worker-1:acp:forum-dm",
conversation: {
channel: "forum",
accountId: "default",
conversationId: "6098642967",
},
expect(row).toMatchObject({
binding_key: "forum\u241fdefault\u241f\u241f6098642967",
binding_id: "generic:forum\u241fdefault\u241f\u241f6098642967",
target_session_key: "agent:worker-1:acp:forum-dm",
});
expect(JSON.parse(row?.record_json ?? "{}")).toMatchObject({
bindingId: "generic:forum\u241fdefault\u241f\u241f6098642967",
targetSessionKey: "agent:worker-1:acp:forum-dm",
conversation: {
channel: "forum",
accountId: "default",
conversationId: "6098642967",
},
});
await expect(fs.stat(sourcePath)).rejects.toMatchObject({ code: "ENOENT" });

View File

@@ -411,7 +411,6 @@ type LegacyCurrentConversationBindingsFile = {
bindings?: unknown;
};
const CURRENT_CONVERSATION_BINDINGS_KV_SCOPE = "current-conversation-bindings";
const CURRENT_CONVERSATION_BINDINGS_ID_PREFIX = "generic:";
function readLegacyQueueJson(filePath: string, id: string, enqueuedAt: number): string {
@@ -997,7 +996,9 @@ function importLegacyCurrentConversationBindingsToSqlite(
let imported = 0;
runOpenClawStateWriteTransaction(
(stateDatabase) => {
const db = getNodeSqliteKysely<KvMigrationDatabase>(stateDatabase.db);
const db = getNodeSqliteKysely<CurrentConversationBindingsMigrationDatabase>(
stateDatabase.db,
);
for (const candidate of bindings) {
if (!candidate || typeof candidate !== "object" || Array.isArray(candidate)) {
continue;
@@ -1017,16 +1018,38 @@ function importLegacyCurrentConversationBindingsToSqlite(
executeSqliteQuerySync(
stateDatabase.db,
db
.insertInto("kv")
.insertInto("current_conversation_bindings")
.values({
scope: CURRENT_CONVERSATION_BINDINGS_KV_SCOPE,
key,
value_json: JSON.stringify({ version: 1, binding: normalized }),
binding_key: key,
binding_id: normalized.bindingId,
channel: normalized.conversation.channel,
account_id: normalized.conversation.accountId,
parent_conversation_id: normalized.conversation.parentConversationId ?? null,
conversation_id: normalized.conversation.conversationId,
target_session_key: normalized.targetSessionKey,
target_kind: normalized.targetKind,
status: normalized.status,
bound_at: normalized.boundAt,
expires_at: normalized.expiresAt ?? null,
metadata_json:
normalized.metadata == null ? null : JSON.stringify(normalized.metadata),
record_json: JSON.stringify(normalized),
updated_at: updatedAt,
})
.onConflict((conflict) =>
conflict.columns(["scope", "key"]).doUpdateSet({
value_json: (eb) => eb.ref("excluded.value_json"),
conflict.column("binding_key").doUpdateSet({
binding_id: (eb) => eb.ref("excluded.binding_id"),
channel: (eb) => eb.ref("excluded.channel"),
account_id: (eb) => eb.ref("excluded.account_id"),
parent_conversation_id: (eb) => eb.ref("excluded.parent_conversation_id"),
conversation_id: (eb) => eb.ref("excluded.conversation_id"),
target_session_key: (eb) => eb.ref("excluded.target_session_key"),
target_kind: (eb) => eb.ref("excluded.target_kind"),
status: (eb) => eb.ref("excluded.status"),
bound_at: (eb) => eb.ref("excluded.bound_at"),
expires_at: (eb) => eb.ref("excluded.expires_at"),
metadata_json: (eb) => eb.ref("excluded.metadata_json"),
record_json: (eb) => eb.ref("excluded.record_json"),
updated_at: (eb) => eb.ref("excluded.updated_at"),
}),
),
@@ -1842,6 +1865,10 @@ type AgentToolArtifactMigrationDatabase = Pick<OpenClawAgentKyselyDatabase, "too
type LegacyGlobalAgentOwnedMigrationDatabase = Pick<OpenClawStateKyselyDatabase, never>;
type DeliveryQueueMigrationDatabase = Pick<OpenClawStateKyselyDatabase, "delivery_queue_entries">;
type KvMigrationDatabase = Pick<OpenClawStateKyselyDatabase, "kv">;
type CurrentConversationBindingsMigrationDatabase = Pick<
OpenClawStateKyselyDatabase,
"current_conversation_bindings"
>;
type PluginStateMigrationDatabase = Pick<OpenClawStateKyselyDatabase, "plugin_state_entries">;
const CLAWHUB_SKILL_STATE_OWNER_ID = "core:clawhub-skills";

View File

@@ -129,6 +129,23 @@ export interface CronRunLogs {
ts: number;
}
export interface CurrentConversationBindings {
account_id: string;
binding_id: string;
binding_key: string;
bound_at: number;
channel: string;
conversation_id: string;
expires_at: number | null;
metadata_json: string | null;
parent_conversation_id: string | null;
record_json: string;
status: string;
target_kind: string;
target_session_key: string;
updated_at: number;
}
export interface DeliveryQueueEntries {
enqueued_at: number;
entry_json: string;
@@ -339,6 +356,7 @@ export interface DB {
commitments: Commitments;
cron_jobs: CronJobs;
cron_run_logs: CronRunLogs;
current_conversation_bindings: CurrentConversationBindings;
delivery_queue_entries: DeliveryQueueEntries;
flow_runs: FlowRuns;
kv: Kv;

View File

@@ -157,7 +157,7 @@ describe("openclaw state database", () => {
expect(columns.some((column) => column.name === "sort_order")).toBe(true);
expect(index?.sql).toContain("sort_order ASC");
expect(version.user_version).toBe(20);
expect(version.user_version).toBe(21);
});
it("migrates legacy cron runtime state from kv into cron job columns", () => {
@@ -215,7 +215,7 @@ describe("openclaw state database", () => {
.prepare("SELECT COUNT(*) AS count FROM kv WHERE scope = ?")
.get("cron.jobs.state"),
).toEqual({ count: 0 });
expect(database.db.prepare("PRAGMA user_version").get()).toEqual({ user_version: 20 });
expect(database.db.prepare("PRAGMA user_version").get()).toEqual({ user_version: 21 });
});
it("migrates persisted subagent runs from kv into subagent run rows", () => {
@@ -264,7 +264,57 @@ describe("openclaw state database", () => {
expect(
database.db.prepare("SELECT COUNT(*) AS count FROM kv WHERE scope = ?").get("subagent_runs"),
).toEqual({ count: 0 });
expect(database.db.prepare("PRAGMA user_version").get()).toEqual({ user_version: 20 });
expect(database.db.prepare("PRAGMA user_version").get()).toEqual({ user_version: 21 });
});
it("migrates current conversation bindings from kv into binding rows", () => {
const stateDir = createTempStateDir();
const dbPath = resolveOpenClawStateSqlitePath({ OPENCLAW_STATE_DIR: stateDir });
fs.mkdirSync(path.dirname(dbPath), { recursive: true });
const sqlite = requireNodeSqlite();
const oldDb = new sqlite.DatabaseSync(dbPath);
oldDb.exec(`
CREATE TABLE kv (
scope TEXT NOT NULL,
key TEXT NOT NULL,
value_json TEXT NOT NULL,
updated_at INTEGER NOT NULL,
PRIMARY KEY (scope, key)
);
INSERT INTO kv (scope, key, value_json, updated_at)
VALUES (
'current-conversation-bindings',
'forum\u241fdefault\u241f6098642967\u241f6098642967',
'{"version":1,"binding":{"bindingId":"generic:forum\u241fdefault\u241f6098642967\u241f6098642967","targetSessionKey":" agent:worker-1:acp:forum-dm ","targetKind":"session","conversation":{"channel":"forum","accountId":"default","conversationId":"6098642967","parentConversationId":"6098642967"},"status":"active","boundAt":1234,"metadata":{"label":"forum-dm"}}}',
5678
);
PRAGMA user_version = 20;
`);
oldDb.close();
const database = openOpenClawStateDatabase({
env: { OPENCLAW_STATE_DIR: stateDir },
});
expect(
database.db
.prepare(
"SELECT binding_key, binding_id, parent_conversation_id, target_session_key, metadata_json FROM current_conversation_bindings WHERE conversation_id = ?",
)
.get("6098642967"),
).toEqual({
binding_key: "forum\u241fdefault\u241f\u241f6098642967",
binding_id: "generic:forum\u241fdefault\u241f\u241f6098642967",
parent_conversation_id: null,
target_session_key: "agent:worker-1:acp:forum-dm",
metadata_json: '{"label":"forum-dm"}',
});
expect(
database.db
.prepare("SELECT COUNT(*) AS count FROM kv WHERE scope = ?")
.get("current-conversation-bindings"),
).toEqual({ count: 0 });
expect(database.db.prepare("PRAGMA user_version").get()).toEqual({ user_version: 21 });
});
it("upgrades task delivery state with task-run cascade integrity", () => {

View File

@@ -17,7 +17,7 @@ import {
} from "./openclaw-state-db.paths.js";
import { OPENCLAW_STATE_SCHEMA_SQL } from "./openclaw-state-schema.generated.js";
const OPENCLAW_STATE_SCHEMA_VERSION = 20;
const OPENCLAW_STATE_SCHEMA_VERSION = 21;
export const OPENCLAW_SQLITE_BUSY_TIMEOUT_MS = 30_000;
const OPENCLAW_STATE_DIR_MODE = 0o700;
const OPENCLAW_STATE_FILE_MODE = 0o600;
@@ -346,6 +346,117 @@ function migrateSubagentRunsFromKv(db: DatabaseSync): void {
db.prepare("DELETE FROM kv WHERE scope = 'subagent_runs'").run();
}
function migrateCurrentConversationBindingsFromKv(db: DatabaseSync): void {
const legacyRows = db
.prepare(
"SELECT key, value_json, updated_at FROM kv WHERE scope = 'current-conversation-bindings'",
)
.all() as Array<{ key?: unknown; value_json?: unknown; updated_at?: unknown }>;
if (legacyRows.length === 0) {
return;
}
const insert = db.prepare(`
INSERT OR REPLACE INTO current_conversation_bindings (
binding_key,
binding_id,
channel,
account_id,
parent_conversation_id,
conversation_id,
target_session_key,
target_kind,
status,
bound_at,
expires_at,
metadata_json,
record_json,
updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
for (const row of legacyRows) {
if (typeof row.key !== "string" || typeof row.value_json !== "string") {
continue;
}
let parsed: unknown;
try {
parsed = JSON.parse(row.value_json);
} catch {
continue;
}
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
continue;
}
const binding =
(parsed as { version?: unknown; binding?: unknown }).version === 1
? (parsed as { binding?: unknown }).binding
: parsed;
if (!binding || typeof binding !== "object" || Array.isArray(binding)) {
continue;
}
const record = binding as Record<string, unknown>;
const conversation = record.conversation;
if (!conversation || typeof conversation !== "object" || Array.isArray(conversation)) {
continue;
}
const conversationRecord = conversation as Record<string, unknown>;
const channel = readString(conversationRecord.channel)?.toLowerCase();
const accountId = readString(conversationRecord.accountId) ?? "default";
const conversationId = readString(conversationRecord.conversationId);
const parentConversationIdRaw = readString(conversationRecord.parentConversationId);
const parentConversationId =
parentConversationIdRaw && parentConversationIdRaw !== conversationId
? parentConversationIdRaw
: null;
const targetSessionKey = readString(record.targetSessionKey);
if (!channel || !conversationId || !targetSessionKey) {
continue;
}
const bindingKey = [channel, accountId, parentConversationId ?? "", conversationId].join(
"\u241f",
);
const bindingId = `generic:${bindingKey}`;
const targetKind = record.targetKind === "subagent" ? "subagent" : "session";
const status =
record.status === "ending" || record.status === "ended" ? record.status : "active";
const updatedAt = readFiniteNumber(row.updated_at) ?? Date.now();
const normalized = {
...record,
bindingId,
targetSessionKey,
targetKind,
status,
conversation: {
...conversationRecord,
channel,
accountId,
conversationId,
...(parentConversationId ? { parentConversationId } : {}),
},
};
insert.run(
bindingKey,
bindingId,
channel,
accountId,
parentConversationId,
conversationId,
targetSessionKey,
targetKind,
status,
readFiniteNumber(record.boundAt) ?? updatedAt,
readFiniteNumber(record.expiresAt),
readJsonText(record.metadata),
JSON.stringify(normalized),
updatedAt,
);
}
db.prepare("DELETE FROM kv WHERE scope = 'current-conversation-bindings'").run();
}
function rebuildTaskDeliveryStateWithForeignKey(db: DatabaseSync): void {
db.exec(`
CREATE TABLE IF NOT EXISTS task_delivery_state_next (
@@ -425,6 +536,9 @@ function migrateStateSchema(db: DatabaseSync, fromVersion: number): void {
if (fromVersion < 20) {
migrateSubagentRunsFromKv(db);
}
if (fromVersion < 21) {
migrateCurrentConversationBindingsFromKv(db);
}
}
function ensureSchema(db: DatabaseSync, pathname: string): void {

View File

@@ -353,6 +353,28 @@ CREATE INDEX IF NOT EXISTS idx_subagent_runs_archive_at
CREATE INDEX IF NOT EXISTS idx_subagent_runs_ended_cleanup
ON subagent_runs(ended_at, cleanup_handled, run_id);
CREATE TABLE IF NOT EXISTS current_conversation_bindings (
binding_key TEXT NOT NULL PRIMARY KEY,
binding_id TEXT NOT NULL,
channel TEXT NOT NULL,
account_id TEXT NOT NULL,
parent_conversation_id TEXT,
conversation_id TEXT NOT NULL,
target_session_key TEXT NOT NULL,
target_kind TEXT NOT NULL,
status TEXT NOT NULL,
bound_at INTEGER NOT NULL,
expires_at INTEGER,
metadata_json TEXT,
record_json TEXT NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_current_conversation_bindings_target
ON current_conversation_bindings(target_session_key, updated_at DESC, binding_key);
CREATE INDEX IF NOT EXISTS idx_current_conversation_bindings_expires
ON current_conversation_bindings(expires_at, binding_key);
CREATE TABLE IF NOT EXISTS task_delivery_state (
task_id TEXT NOT NULL PRIMARY KEY,
requester_origin_json TEXT,

View File

@@ -348,6 +348,28 @@ CREATE INDEX IF NOT EXISTS idx_subagent_runs_archive_at
CREATE INDEX IF NOT EXISTS idx_subagent_runs_ended_cleanup
ON subagent_runs(ended_at, cleanup_handled, run_id);
CREATE TABLE IF NOT EXISTS current_conversation_bindings (
binding_key TEXT NOT NULL PRIMARY KEY,
binding_id TEXT NOT NULL,
channel TEXT NOT NULL,
account_id TEXT NOT NULL,
parent_conversation_id TEXT,
conversation_id TEXT NOT NULL,
target_session_key TEXT NOT NULL,
target_kind TEXT NOT NULL,
status TEXT NOT NULL,
bound_at INTEGER NOT NULL,
expires_at INTEGER,
metadata_json TEXT,
record_json TEXT NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_current_conversation_bindings_target
ON current_conversation_bindings(target_session_key, updated_at DESC, binding_key);
CREATE INDEX IF NOT EXISTS idx_current_conversation_bindings_expires
ON current_conversation_bindings(expires_at, binding_key);
CREATE TABLE IF NOT EXISTS task_delivery_state (
task_id TEXT NOT NULL PRIMARY KEY,
requester_origin_json TEXT,