From 4e7d3975faf701c77d271c429e6829bd2f0ddd5c Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 8 May 2026 13:12:13 +0100 Subject: [PATCH] refactor: move legacy sidecar imports to doctor --- .../check-database-first-legacy-stores.mjs | 3 - src/commands/doctor-sqlite-sidecars.ts | 752 ++++++++++++++++++ src/commands/doctor-sqlite-state.ts | 20 +- src/plugin-state/plugin-state-store.sqlite.ts | 80 +- src/plugin-state/plugin-state-store.test.ts | 6 +- src/plugin-state/plugin-state-store.ts | 2 - src/tasks/task-flow-registry.store.sqlite.ts | 162 ---- src/tasks/task-flow-registry.store.test.ts | 2 +- src/tasks/task-registry.store.sqlite.ts | 181 ----- src/tasks/task-registry.store.test.ts | 2 +- 10 files changed, 767 insertions(+), 443 deletions(-) create mode 100644 src/commands/doctor-sqlite-sidecars.ts diff --git a/scripts/check-database-first-legacy-stores.mjs b/scripts/check-database-first-legacy-stores.mjs index 3a4fb972db7..f6bb5fa26c8 100644 --- a/scripts/check-database-first-legacy-stores.mjs +++ b/scripts/check-database-first-legacy-stores.mjs @@ -133,9 +133,6 @@ const allowedExactPaths = new Set([ "extensions/qqbot/src/state-migrations.ts", "extensions/telegram/src/state-migrations.ts", "src/infra/state-migrations.ts", - "src/plugin-state/plugin-state-store.sqlite.ts", - "src/tasks/task-flow-registry.store.sqlite.ts", - "src/tasks/task-registry.store.sqlite.ts", "src/trajectory/export.ts", "src/trajectory/paths.ts", ]); diff --git a/src/commands/doctor-sqlite-sidecars.ts b/src/commands/doctor-sqlite-sidecars.ts new file mode 100644 index 00000000000..563c8c3eb18 --- /dev/null +++ b/src/commands/doctor-sqlite-sidecars.ts @@ -0,0 +1,752 @@ +import { existsSync, rmSync } from "node:fs"; +import type { DatabaseSync } from "node:sqlite"; +import type { Insertable, Selectable } from "kysely"; +import { executeSqliteQuerySync, getNodeSqliteKysely } from "../infra/kysely-sync.js"; +import { requireNodeSqlite } from "../infra/node-sqlite.js"; +import { resolveLegacyPluginStateSqlitePath } from "../plugin-state/plugin-state-store.paths.js"; +import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js"; +import { runOpenClawStateWriteTransaction } from "../state/openclaw-state-db.js"; +import { resolveLegacyTaskFlowRegistrySqlitePath } from "../tasks/task-flow-registry.paths.js"; +import type { TaskFlowRecord, TaskFlowSyncMode } from "../tasks/task-flow-registry.types.js"; +import { resolveLegacyTaskRegistrySqlitePath } from "../tasks/task-registry.paths.js"; +import type { TaskDeliveryState, TaskRecord } from "../tasks/task-registry.types.js"; +import type { DeliveryContext } from "../utils/delivery-context.types.js"; + +const SQLITE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const; + +type PluginStateEntriesTable = OpenClawStateKyselyDatabase["plugin_state_entries"]; +type PluginStateRow = Selectable; +type PluginStateDatabase = Pick; + +type TaskRunsTable = OpenClawStateKyselyDatabase["task_runs"]; +type TaskDeliveryStateTable = OpenClawStateKyselyDatabase["task_delivery_state"]; +type TaskRegistryDatabase = Pick; +type TaskRegistryRow = Selectable & { + runtime: TaskRecord["runtime"]; + scope_kind: TaskRecord["scopeKind"]; + status: TaskRecord["status"]; + delivery_status: TaskRecord["deliveryStatus"]; + notify_policy: TaskRecord["notifyPolicy"]; + terminal_outcome: TaskRecord["terminalOutcome"] | null; +}; +type TaskDeliveryStateRow = Selectable; + +type FlowRunsTable = OpenClawStateKyselyDatabase["flow_runs"]; +type FlowRegistryDatabase = Pick; +type FlowRegistryRow = Selectable & { + sync_mode: TaskFlowSyncMode | null; + status: TaskFlowRecord["status"]; + notify_policy: TaskFlowRecord["notifyPolicy"]; +}; + +type TableInfoRow = { + name: string; +}; + +export type LegacyPluginStateSidecarImportResult = { + sourcePath: string; + importedEntries: number; + removedSource: boolean; +}; + +function normalizeNumber(value: number | bigint | null): number | undefined { + if (typeof value === "bigint") { + return Number(value); + } + return typeof value === "number" ? value : undefined; +} + +function serializeJson(value: unknown): string | null { + return value === undefined ? null : JSON.stringify(value); +} + +// oxlint-disable-next-line typescript/no-unnecessary-type-parameters -- Legacy JSON columns are typed by the receiving field. +function parseJsonValue(raw: string | null): T | undefined { + if (!raw?.trim()) { + return undefined; + } + try { + return JSON.parse(raw) as T; + } catch { + return undefined; + } +} + +function getPluginStateKysely(db: DatabaseSync) { + return getNodeSqliteKysely(db); +} + +function getTaskRegistryKysely(db: DatabaseSync) { + return getNodeSqliteKysely(db); +} + +function getFlowRegistryKysely(db: DatabaseSync) { + return getNodeSqliteKysely(db); +} + +function removeSqliteSidecars(pathname: string): boolean { + for (const suffix of SQLITE_SIDECAR_SUFFIXES) { + rmSync(`${pathname}${suffix}`, { force: true }); + } + return !existsSync(pathname); +} + +function selectPluginStateRows(db: DatabaseSync): PluginStateRow[] { + return executeSqliteQuerySync( + db, + getPluginStateKysely(db) + .selectFrom("plugin_state_entries") + .select(["plugin_id", "namespace", "entry_key", "value_json", "created_at", "expires_at"]) + .orderBy("plugin_id", "asc") + .orderBy("namespace", "asc") + .orderBy("entry_key", "asc"), + ).rows; +} + +function upsertPluginStateRow(db: DatabaseSync, row: Insertable): void { + executeSqliteQuerySync( + db, + getPluginStateKysely(db) + .insertInto("plugin_state_entries") + .values(row) + .onConflict((conflict) => + conflict.columns(["plugin_id", "namespace", "entry_key"]).doUpdateSet({ + value_json: (eb) => eb.ref("excluded.value_json"), + created_at: (eb) => eb.ref("excluded.created_at"), + expires_at: (eb) => eb.ref("excluded.expires_at"), + }), + ), + ); +} + +export function legacyPluginStateSidecarExists(env: NodeJS.ProcessEnv = process.env): boolean { + return existsSync(resolveLegacyPluginStateSqlitePath(env)); +} + +export function importLegacyPluginStateSidecarToSqlite( + env: NodeJS.ProcessEnv = process.env, +): LegacyPluginStateSidecarImportResult { + const sourcePath = resolveLegacyPluginStateSqlitePath(env); + if (!existsSync(sourcePath)) { + return { + sourcePath, + importedEntries: 0, + removedSource: false, + }; + } + + const { DatabaseSync } = requireNodeSqlite(); + const legacyDb = new DatabaseSync(sourcePath); + let rows: PluginStateRow[]; + try { + rows = selectPluginStateRows(legacyDb); + } finally { + legacyDb.close(); + } + + runOpenClawStateWriteTransaction( + (database) => { + for (const row of rows) { + upsertPluginStateRow(database.db, { + ...row, + created_at: normalizeNumber(row.created_at) ?? 0, + expires_at: normalizeNumber(row.expires_at) ?? null, + }); + } + }, + { env }, + ); + + return { + sourcePath, + importedEntries: rows.length, + removedSource: removeSqliteSidecars(sourcePath), + }; +} + +function hasLegacyColumn(db: DatabaseSync, tableName: string, columnName: string): boolean { + const rows = db.prepare(`PRAGMA table_info(${tableName})`).all() as TableInfoRow[]; + return rows.some((row) => row.name === columnName); +} + +function migrateLegacyTaskOwnerColumns(db: DatabaseSync) { + if (!hasLegacyColumn(db, "task_runs", "owner_key")) { + db.exec(`ALTER TABLE task_runs ADD COLUMN owner_key TEXT;`); + } + if (!hasLegacyColumn(db, "task_runs", "requester_session_key")) { + db.exec(`ALTER TABLE task_runs ADD COLUMN requester_session_key TEXT;`); + } + if (!hasLegacyColumn(db, "task_runs", "scope_kind")) { + db.exec(`ALTER TABLE task_runs ADD COLUMN scope_kind TEXT NOT NULL DEFAULT 'session';`); + } + db.exec(` + UPDATE task_runs + SET owner_key = requester_session_key + WHERE owner_key IS NULL + `); + db.exec(` + UPDATE task_runs + SET owner_key = CASE + WHEN trim(COALESCE(owner_key, '')) <> '' THEN trim(owner_key) + ELSE 'system:' || runtime || ':' || COALESCE(NULLIF(source_id, ''), task_id) + END + `); + db.exec(` + UPDATE task_runs + SET scope_kind = CASE + WHEN scope_kind = 'system' THEN 'system' + WHEN owner_key LIKE 'system:%' THEN 'system' + ELSE 'session' + END + `); + db.exec(` + UPDATE task_runs + SET requester_session_key = CASE + WHEN scope_kind = 'system' THEN '' + WHEN trim(COALESCE(requester_session_key, '')) <> '' THEN trim(requester_session_key) + ELSE owner_key + END + `); +} + +function ensureLegacyTaskRegistrySchema(db: DatabaseSync) { + db.exec(` + CREATE TABLE IF NOT EXISTS task_runs ( + task_id TEXT NOT NULL PRIMARY KEY, + runtime TEXT NOT NULL, + task_kind TEXT, + source_id TEXT, + requester_session_key TEXT, + owner_key TEXT NOT NULL, + scope_kind TEXT NOT NULL, + child_session_key TEXT, + parent_flow_id TEXT, + parent_task_id TEXT, + agent_id TEXT, + run_id TEXT, + label TEXT, + task TEXT NOT NULL, + status TEXT NOT NULL, + delivery_status TEXT NOT NULL, + notify_policy TEXT NOT NULL, + created_at INTEGER NOT NULL, + started_at INTEGER, + ended_at INTEGER, + last_event_at INTEGER, + cleanup_after INTEGER, + error TEXT, + progress_summary TEXT, + terminal_summary TEXT, + terminal_outcome TEXT + ); + `); + migrateLegacyTaskOwnerColumns(db); + for (const column of ["task_kind", "parent_flow_id"]) { + if (!hasLegacyColumn(db, "task_runs", column)) { + db.exec(`ALTER TABLE task_runs ADD COLUMN ${column} TEXT;`); + } + } + db.exec(` + CREATE TABLE IF NOT EXISTS task_delivery_state ( + task_id TEXT NOT NULL PRIMARY KEY, + requester_origin_json TEXT, + last_notified_event_at INTEGER + ); + `); +} + +function rowToTaskRecord(row: TaskRegistryRow): TaskRecord { + const startedAt = normalizeNumber(row.started_at); + const endedAt = normalizeNumber(row.ended_at); + const lastEventAt = normalizeNumber(row.last_event_at); + const cleanupAfter = normalizeNumber(row.cleanup_after); + const requesterSessionKey = + row.scope_kind === "system" ? "" : row.requester_session_key?.trim() || row.owner_key; + return { + taskId: row.task_id, + runtime: row.runtime, + ...(row.task_kind ? { taskKind: row.task_kind } : {}), + ...(row.source_id ? { sourceId: row.source_id } : {}), + requesterSessionKey, + ownerKey: row.owner_key, + scopeKind: row.scope_kind, + ...(row.child_session_key ? { childSessionKey: row.child_session_key } : {}), + ...(row.parent_flow_id ? { parentFlowId: row.parent_flow_id } : {}), + ...(row.parent_task_id ? { parentTaskId: row.parent_task_id } : {}), + ...(row.agent_id ? { agentId: row.agent_id } : {}), + ...(row.run_id ? { runId: row.run_id } : {}), + ...(row.label ? { label: row.label } : {}), + task: row.task, + status: row.status, + deliveryStatus: row.delivery_status, + notifyPolicy: row.notify_policy, + createdAt: normalizeNumber(row.created_at) ?? 0, + ...(startedAt != null ? { startedAt } : {}), + ...(endedAt != null ? { endedAt } : {}), + ...(lastEventAt != null ? { lastEventAt } : {}), + ...(cleanupAfter != null ? { cleanupAfter } : {}), + ...(row.error ? { error: row.error } : {}), + ...(row.progress_summary ? { progressSummary: row.progress_summary } : {}), + ...(row.terminal_summary ? { terminalSummary: row.terminal_summary } : {}), + ...(row.terminal_outcome ? { terminalOutcome: row.terminal_outcome } : {}), + }; +} + +function rowToTaskDeliveryState(row: TaskDeliveryStateRow): TaskDeliveryState { + const requesterOrigin = parseJsonValue(row.requester_origin_json); + const lastNotifiedEventAt = normalizeNumber(row.last_notified_event_at); + return { + taskId: row.task_id, + ...(requesterOrigin ? { requesterOrigin } : {}), + ...(lastNotifiedEventAt != null ? { lastNotifiedEventAt } : {}), + }; +} + +function bindTaskRecord(record: TaskRecord): Insertable { + return { + task_id: record.taskId, + runtime: record.runtime, + task_kind: record.taskKind ?? null, + source_id: record.sourceId ?? null, + requester_session_key: record.scopeKind === "system" ? "" : record.requesterSessionKey, + owner_key: record.ownerKey, + scope_kind: record.scopeKind, + child_session_key: record.childSessionKey ?? null, + parent_flow_id: record.parentFlowId ?? null, + parent_task_id: record.parentTaskId ?? null, + agent_id: record.agentId ?? null, + run_id: record.runId ?? null, + label: record.label ?? null, + task: record.task, + status: record.status, + delivery_status: record.deliveryStatus, + notify_policy: record.notifyPolicy, + created_at: record.createdAt, + started_at: record.startedAt ?? null, + ended_at: record.endedAt ?? null, + last_event_at: record.lastEventAt ?? null, + cleanup_after: record.cleanupAfter ?? null, + error: record.error ?? null, + progress_summary: record.progressSummary ?? null, + terminal_summary: record.terminalSummary ?? null, + terminal_outcome: record.terminalOutcome ?? null, + }; +} + +function bindTaskDeliveryState(state: TaskDeliveryState): Insertable { + return { + task_id: state.taskId, + requester_origin_json: serializeJson(state.requesterOrigin), + last_notified_event_at: state.lastNotifiedEventAt ?? null, + }; +} + +function selectTaskRows(db: DatabaseSync): TaskRegistryRow[] { + return executeSqliteQuerySync( + db, + getTaskRegistryKysely(db) + .selectFrom("task_runs") + .select([ + "task_id", + "runtime", + "task_kind", + "source_id", + "requester_session_key", + "owner_key", + "scope_kind", + "child_session_key", + "parent_flow_id", + "parent_task_id", + "agent_id", + "run_id", + "label", + "task", + "status", + "delivery_status", + "notify_policy", + "created_at", + "started_at", + "ended_at", + "last_event_at", + "cleanup_after", + "error", + "progress_summary", + "terminal_summary", + "terminal_outcome", + ]) + .orderBy("created_at", "asc") + .orderBy("task_id", "asc"), + ).rows; +} + +function selectTaskDeliveryStateRows(db: DatabaseSync): TaskDeliveryStateRow[] { + return executeSqliteQuerySync( + db, + getTaskRegistryKysely(db) + .selectFrom("task_delivery_state") + .select(["task_id", "requester_origin_json", "last_notified_event_at"]) + .orderBy("task_id", "asc"), + ).rows; +} + +function upsertTaskRow(db: DatabaseSync, row: Insertable): void { + executeSqliteQuerySync( + db, + getTaskRegistryKysely(db) + .insertInto("task_runs") + .values(row) + .onConflict((conflict) => + conflict.column("task_id").doUpdateSet({ + runtime: (eb) => eb.ref("excluded.runtime"), + task_kind: (eb) => eb.ref("excluded.task_kind"), + source_id: (eb) => eb.ref("excluded.source_id"), + requester_session_key: (eb) => eb.ref("excluded.requester_session_key"), + owner_key: (eb) => eb.ref("excluded.owner_key"), + scope_kind: (eb) => eb.ref("excluded.scope_kind"), + child_session_key: (eb) => eb.ref("excluded.child_session_key"), + parent_flow_id: (eb) => eb.ref("excluded.parent_flow_id"), + parent_task_id: (eb) => eb.ref("excluded.parent_task_id"), + agent_id: (eb) => eb.ref("excluded.agent_id"), + run_id: (eb) => eb.ref("excluded.run_id"), + label: (eb) => eb.ref("excluded.label"), + task: (eb) => eb.ref("excluded.task"), + status: (eb) => eb.ref("excluded.status"), + delivery_status: (eb) => eb.ref("excluded.delivery_status"), + notify_policy: (eb) => eb.ref("excluded.notify_policy"), + created_at: (eb) => eb.ref("excluded.created_at"), + started_at: (eb) => eb.ref("excluded.started_at"), + ended_at: (eb) => eb.ref("excluded.ended_at"), + last_event_at: (eb) => eb.ref("excluded.last_event_at"), + cleanup_after: (eb) => eb.ref("excluded.cleanup_after"), + error: (eb) => eb.ref("excluded.error"), + progress_summary: (eb) => eb.ref("excluded.progress_summary"), + terminal_summary: (eb) => eb.ref("excluded.terminal_summary"), + terminal_outcome: (eb) => eb.ref("excluded.terminal_outcome"), + }), + ), + ); +} + +function replaceTaskDeliveryStateRow( + db: DatabaseSync, + row: Insertable, +): void { + executeSqliteQuerySync( + db, + getTaskRegistryKysely(db) + .insertInto("task_delivery_state") + .values(row) + .onConflict((conflict) => + conflict.column("task_id").doUpdateSet({ + requester_origin_json: (eb) => eb.ref("excluded.requester_origin_json"), + last_notified_event_at: (eb) => eb.ref("excluded.last_notified_event_at"), + }), + ), + ); +} + +export function legacyTaskRegistrySidecarExists(env: NodeJS.ProcessEnv = process.env): boolean { + return existsSync(resolveLegacyTaskRegistrySqlitePath(env)); +} + +export function importLegacyTaskRegistrySidecarToSqlite(env: NodeJS.ProcessEnv = process.env): { + importedTasks: number; + importedDeliveryStates: number; + removedSource: boolean; + sourcePath: string; +} { + const sourcePath = resolveLegacyTaskRegistrySqlitePath(env); + if (!existsSync(sourcePath)) { + return { + importedTasks: 0, + importedDeliveryStates: 0, + removedSource: false, + sourcePath, + }; + } + + const { DatabaseSync } = requireNodeSqlite(); + const legacyDb = new DatabaseSync(sourcePath); + let tasks: TaskRecord[]; + let deliveryStates: TaskDeliveryState[]; + try { + ensureLegacyTaskRegistrySchema(legacyDb); + tasks = selectTaskRows(legacyDb).map(rowToTaskRecord); + deliveryStates = selectTaskDeliveryStateRows(legacyDb).map(rowToTaskDeliveryState); + } finally { + legacyDb.close(); + } + runOpenClawStateWriteTransaction( + (database) => { + for (const task of tasks) { + upsertTaskRow(database.db, bindTaskRecord(task)); + } + for (const deliveryState of deliveryStates) { + replaceTaskDeliveryStateRow(database.db, bindTaskDeliveryState(deliveryState)); + } + }, + { env }, + ); + return { + importedTasks: tasks.length, + importedDeliveryStates: deliveryStates.length, + removedSource: removeSqliteSidecars(sourcePath), + sourcePath, + }; +} + +export function removeLegacyTaskRegistrySidecar(env: NodeJS.ProcessEnv = process.env): { + removedSource: boolean; + sourcePath: string; +} { + const sourcePath = resolveLegacyTaskRegistrySqlitePath(env); + return { + removedSource: removeSqliteSidecars(sourcePath), + sourcePath, + }; +} + +function ensureLegacyTaskFlowRegistrySchema(db: DatabaseSync) { + db.exec(` + CREATE TABLE IF NOT EXISTS flow_runs ( + flow_id TEXT NOT NULL PRIMARY KEY, + shape TEXT, + sync_mode TEXT NOT NULL DEFAULT 'managed', + owner_key TEXT NOT NULL, + requester_origin_json TEXT, + controller_id TEXT, + revision INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL, + notify_policy TEXT NOT NULL, + goal TEXT NOT NULL, + current_step TEXT, + blocked_task_id TEXT, + blocked_summary TEXT, + state_json TEXT, + wait_json TEXT, + cancel_requested_at INTEGER, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + ended_at INTEGER + ); + `); + if ( + !hasLegacyColumn(db, "flow_runs", "owner_key") && + hasLegacyColumn(db, "flow_runs", "owner_session_key") + ) { + db.exec(`ALTER TABLE flow_runs ADD COLUMN owner_key TEXT;`); + db.exec(` + UPDATE flow_runs + SET owner_key = owner_session_key + WHERE owner_key IS NULL + `); + } + const optionalColumns = [ + ["shape", "TEXT"], + ["sync_mode", "TEXT"], + ["controller_id", "TEXT"], + ["revision", "INTEGER"], + ["blocked_task_id", "TEXT"], + ["blocked_summary", "TEXT"], + ["state_json", "TEXT"], + ["wait_json", "TEXT"], + ["cancel_requested_at", "INTEGER"], + ] as const; + for (const [column, type] of optionalColumns) { + if (!hasLegacyColumn(db, "flow_runs", column)) { + db.exec(`ALTER TABLE flow_runs ADD COLUMN ${column} ${type};`); + } + } + db.exec(` + UPDATE flow_runs + SET sync_mode = CASE + WHEN shape = 'single_task' THEN 'task_mirrored' + ELSE COALESCE(sync_mode, 'managed') + END + WHERE sync_mode IS NULL + `); + db.exec(` + UPDATE flow_runs + SET controller_id = 'core/legacy-restored' + WHERE sync_mode = 'managed' + AND (controller_id IS NULL OR trim(controller_id) = '') + `); + db.exec(` + UPDATE flow_runs + SET revision = 0 + WHERE revision IS NULL + `); +} + +function rowToSyncMode(row: FlowRegistryRow): TaskFlowSyncMode { + if (row.sync_mode === "task_mirrored" || row.sync_mode === "managed") { + return row.sync_mode; + } + return row.shape === "single_task" ? "task_mirrored" : "managed"; +} + +function rowToFlowRecord(row: FlowRegistryRow): TaskFlowRecord { + const endedAt = normalizeNumber(row.ended_at); + const cancelRequestedAt = normalizeNumber(row.cancel_requested_at); + const requesterOrigin = parseJsonValue(row.requester_origin_json); + const stateJson = parseJsonValue(row.state_json); + const waitJson = parseJsonValue(row.wait_json); + return { + flowId: row.flow_id, + syncMode: rowToSyncMode(row), + ownerKey: row.owner_key, + ...(requesterOrigin ? { requesterOrigin } : {}), + ...(row.controller_id ? { controllerId: row.controller_id } : {}), + revision: normalizeNumber(row.revision) ?? 0, + status: row.status, + notifyPolicy: row.notify_policy, + goal: row.goal, + ...(row.current_step ? { currentStep: row.current_step } : {}), + ...(row.blocked_task_id ? { blockedTaskId: row.blocked_task_id } : {}), + ...(row.blocked_summary ? { blockedSummary: row.blocked_summary } : {}), + ...(stateJson !== undefined ? { stateJson } : {}), + ...(waitJson !== undefined ? { waitJson } : {}), + ...(cancelRequestedAt != null ? { cancelRequestedAt } : {}), + createdAt: normalizeNumber(row.created_at) ?? 0, + updatedAt: normalizeNumber(row.updated_at) ?? 0, + ...(endedAt != null ? { endedAt } : {}), + }; +} + +function bindFlowRecord(record: TaskFlowRecord): Insertable { + return { + flow_id: record.flowId, + sync_mode: record.syncMode, + shape: null, + owner_key: record.ownerKey, + requester_origin_json: serializeJson(record.requesterOrigin), + controller_id: record.controllerId ?? null, + revision: record.revision, + status: record.status, + notify_policy: record.notifyPolicy, + goal: record.goal, + current_step: record.currentStep ?? null, + blocked_task_id: record.blockedTaskId ?? null, + blocked_summary: record.blockedSummary ?? null, + state_json: serializeJson(record.stateJson), + wait_json: serializeJson(record.waitJson), + cancel_requested_at: record.cancelRequestedAt ?? null, + created_at: record.createdAt, + updated_at: record.updatedAt, + ended_at: record.endedAt ?? null, + }; +} + +function selectFlowRows(db: DatabaseSync): FlowRegistryRow[] { + return executeSqliteQuerySync( + db, + getFlowRegistryKysely(db) + .selectFrom("flow_runs") + .select([ + "flow_id", + "sync_mode", + "shape", + "owner_key", + "requester_origin_json", + "controller_id", + "revision", + "status", + "notify_policy", + "goal", + "current_step", + "blocked_task_id", + "blocked_summary", + "state_json", + "wait_json", + "cancel_requested_at", + "created_at", + "updated_at", + "ended_at", + ]) + .orderBy("created_at", "asc") + .orderBy("flow_id", "asc"), + ).rows; +} + +function upsertFlowRow(db: DatabaseSync, row: Insertable): void { + executeSqliteQuerySync( + db, + getFlowRegistryKysely(db) + .insertInto("flow_runs") + .values(row) + .onConflict((conflict) => + conflict.column("flow_id").doUpdateSet({ + sync_mode: (eb) => eb.ref("excluded.sync_mode"), + owner_key: (eb) => eb.ref("excluded.owner_key"), + requester_origin_json: (eb) => eb.ref("excluded.requester_origin_json"), + controller_id: (eb) => eb.ref("excluded.controller_id"), + revision: (eb) => eb.ref("excluded.revision"), + status: (eb) => eb.ref("excluded.status"), + notify_policy: (eb) => eb.ref("excluded.notify_policy"), + goal: (eb) => eb.ref("excluded.goal"), + current_step: (eb) => eb.ref("excluded.current_step"), + blocked_task_id: (eb) => eb.ref("excluded.blocked_task_id"), + blocked_summary: (eb) => eb.ref("excluded.blocked_summary"), + state_json: (eb) => eb.ref("excluded.state_json"), + wait_json: (eb) => eb.ref("excluded.wait_json"), + cancel_requested_at: (eb) => eb.ref("excluded.cancel_requested_at"), + created_at: (eb) => eb.ref("excluded.created_at"), + updated_at: (eb) => eb.ref("excluded.updated_at"), + ended_at: (eb) => eb.ref("excluded.ended_at"), + }), + ), + ); +} + +export function legacyTaskFlowRegistrySidecarExists(env: NodeJS.ProcessEnv = process.env): boolean { + return existsSync(resolveLegacyTaskFlowRegistrySqlitePath(env)); +} + +export function importLegacyTaskFlowRegistrySidecarToSqlite(env: NodeJS.ProcessEnv = process.env): { + importedFlows: number; + removedSource: boolean; + sourcePath: string; +} { + const sourcePath = resolveLegacyTaskFlowRegistrySqlitePath(env); + if (!existsSync(sourcePath)) { + return { + importedFlows: 0, + removedSource: false, + sourcePath, + }; + } + + const { DatabaseSync } = requireNodeSqlite(); + const legacyDb = new DatabaseSync(sourcePath); + let flows: TaskFlowRecord[]; + try { + ensureLegacyTaskFlowRegistrySchema(legacyDb); + flows = selectFlowRows(legacyDb).map(rowToFlowRecord); + } finally { + legacyDb.close(); + } + runOpenClawStateWriteTransaction( + (database) => { + for (const flow of flows) { + upsertFlowRow(database.db, bindFlowRecord(flow)); + } + }, + { env }, + ); + return { + importedFlows: flows.length, + removedSource: removeSqliteSidecars(sourcePath), + sourcePath, + }; +} + +export function removeLegacyTaskFlowRegistrySidecar(env: NodeJS.ProcessEnv = process.env): { + removedSource: boolean; + sourcePath: string; +} { + const sourcePath = resolveLegacyTaskFlowRegistrySqlitePath(env); + return { + removedSource: removeSqliteSidecars(sourcePath), + sourcePath, + }; +} diff --git a/src/commands/doctor-sqlite-state.ts b/src/commands/doctor-sqlite-state.ts index 065086216be..40c8acbdf3b 100644 --- a/src/commands/doctor-sqlite-state.ts +++ b/src/commands/doctor-sqlite-state.ts @@ -62,10 +62,6 @@ import { importLegacyChannelPairingFilesToSqlite, legacyChannelPairingFilesExist, } from "../pairing/pairing-store.js"; -import { - importLegacyPluginStateSidecarToSqlite, - legacyPluginStateSidecarExists, -} from "../plugin-state/plugin-state-store.js"; import { importLegacyPluginBindingApprovalFileToSqlite, legacyPluginBindingApprovalFileExists, @@ -74,20 +70,20 @@ import { importLegacyInstalledPluginIndexFileToSqlite, legacyInstalledPluginIndexFileExists, } from "../plugins/installed-plugin-index-store.js"; -import { - importLegacyTaskFlowRegistrySidecarToSqlite, - legacyTaskFlowRegistrySidecarExists, -} from "../tasks/task-flow-registry.store.sqlite.js"; -import { - importLegacyTaskRegistrySidecarToSqlite, - legacyTaskRegistrySidecarExists, -} from "../tasks/task-registry.store.sqlite.js"; import { note } from "../terminal/note.js"; import { importLegacyTuiLastSessionStoreToSqlite, legacyTuiLastSessionFileExists, } from "../tui/tui-last-session.js"; import type { DoctorPrompter } from "./doctor-prompter.js"; +import { + importLegacyPluginStateSidecarToSqlite, + importLegacyTaskFlowRegistrySidecarToSqlite, + importLegacyTaskRegistrySidecarToSqlite, + legacyPluginStateSidecarExists, + legacyTaskFlowRegistrySidecarExists, + legacyTaskRegistrySidecarExists, +} from "./doctor-sqlite-sidecars.js"; type LegacyStateProbe = { deviceIdentity: boolean; diff --git a/src/plugin-state/plugin-state-store.sqlite.ts b/src/plugin-state/plugin-state-store.sqlite.ts index 9980fb5e14f..915620790d9 100644 --- a/src/plugin-state/plugin-state-store.sqlite.ts +++ b/src/plugin-state/plugin-state-store.sqlite.ts @@ -1,4 +1,3 @@ -import { existsSync, rmSync } from "node:fs"; import type { DatabaseSync } from "node:sqlite"; import type { Insertable, Selectable } from "kysely"; import { @@ -12,10 +11,7 @@ import { openOpenClawStateDatabase, runOpenClawStateWriteTransaction, } from "../state/openclaw-state-db.js"; -import { - resolveLegacyPluginStateSqlitePath, - resolvePluginStateSqlitePath, -} from "./plugin-state-store.paths.js"; +import { resolvePluginStateSqlitePath } from "./plugin-state-store.paths.js"; import { PluginStateStoreError, type PluginStateEntry, @@ -25,8 +21,6 @@ import { type PluginStateStoreProbeStep, } from "./plugin-state-store.types.js"; -const LEGACY_PLUGIN_STATE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const; - export const MAX_PLUGIN_STATE_VALUE_BYTES = 65_536; type PluginStateEntriesTable = OpenClawStateKyselyDatabase["plugin_state_entries"]; @@ -682,75 +676,3 @@ export function probePluginStateStore(): PluginStateStoreProbeResult { export function closePluginStateSqliteStore(): void { cachedDatabase = null; } - -export type LegacyPluginStateSidecarImportResult = { - sourcePath: string; - importedEntries: number; - removedSource: boolean; -}; - -function removeLegacyPluginStateSidecarFiles(sourcePath: string): boolean { - let removed = false; - for (const suffix of LEGACY_PLUGIN_STATE_SIDECAR_SUFFIXES) { - const candidate = `${sourcePath}${suffix}`; - if (!existsSync(candidate)) { - continue; - } - rmSync(candidate, { force: true }); - removed = true; - } - return removed; -} - -export function legacyPluginStateSidecarExists(env: NodeJS.ProcessEnv = process.env): boolean { - return existsSync(resolveLegacyPluginStateSqlitePath(env)); -} - -export function importLegacyPluginStateSidecarToSqlite( - env: NodeJS.ProcessEnv = process.env, -): LegacyPluginStateSidecarImportResult { - const sourcePath = resolveLegacyPluginStateSqlitePath(env); - if (!existsSync(sourcePath)) { - return { - sourcePath, - importedEntries: 0, - removedSource: false, - }; - } - - const { DatabaseSync } = requireNodeSqlite(); - const legacyDb = new DatabaseSync(sourcePath); - let rows: PluginStateRow[]; - try { - rows = executeSqliteQuerySync( - legacyDb, - getPluginStateKysely(legacyDb) - .selectFrom("plugin_state_entries") - .select(["plugin_id", "namespace", "entry_key", "value_json", "created_at", "expires_at"]) - .orderBy("plugin_id", "asc") - .orderBy("namespace", "asc") - .orderBy("entry_key", "asc"), - ).rows; - } finally { - legacyDb.close(); - } - - runOpenClawStateWriteTransaction( - (database) => { - for (const row of rows) { - upsertPluginStateEntry(database.db, { - ...row, - created_at: normalizeNumber(row.created_at) ?? 0, - expires_at: normalizeNumber(row.expires_at) ?? null, - }); - } - }, - { env }, - ); - - return { - sourcePath, - importedEntries: rows.length, - removedSource: removeLegacyPluginStateSidecarFiles(sourcePath), - }; -} diff --git a/src/plugin-state/plugin-state-store.test.ts b/src/plugin-state/plugin-state-store.test.ts index 2ac2b376326..4df7bae8b95 100644 --- a/src/plugin-state/plugin-state-store.test.ts +++ b/src/plugin-state/plugin-state-store.test.ts @@ -1,5 +1,9 @@ import { mkdirSync, statSync } from "node:fs"; import { afterEach, describe, expect, it, vi } from "vitest"; +import { + importLegacyPluginStateSidecarToSqlite, + legacyPluginStateSidecarExists, +} from "../commands/doctor-sqlite-sidecars.js"; import { requireNodeSqlite } from "../infra/node-sqlite.js"; import { closeOpenClawStateDatabaseForTest, @@ -10,8 +14,6 @@ import { closePluginStateSqliteStore, createCorePluginStateKeyedStore, createPluginStateKeyedStore, - importLegacyPluginStateSidecarToSqlite, - legacyPluginStateSidecarExists, PluginStateStoreError, probePluginStateStore, resetPluginStateStoreForTests, diff --git a/src/plugin-state/plugin-state-store.ts b/src/plugin-state/plugin-state-store.ts index dff168dab4e..eba2020db16 100644 --- a/src/plugin-state/plugin-state-store.ts +++ b/src/plugin-state/plugin-state-store.ts @@ -32,9 +32,7 @@ export type { export { PluginStateStoreError } from "./plugin-state-store.types.js"; export { closePluginStateSqliteStore, - importLegacyPluginStateSidecarToSqlite, isPluginStateDatabaseOpen, - legacyPluginStateSidecarExists, probePluginStateStore, sweepExpiredPluginStateEntries, } from "./plugin-state-store.sqlite.js"; diff --git a/src/tasks/task-flow-registry.store.sqlite.ts b/src/tasks/task-flow-registry.store.sqlite.ts index e3461c6f2fe..aad06f76832 100644 --- a/src/tasks/task-flow-registry.store.sqlite.ts +++ b/src/tasks/task-flow-registry.store.sqlite.ts @@ -1,15 +1,12 @@ -import { existsSync, rmSync } from "node:fs"; import type { DatabaseSync } from "node:sqlite"; import type { Insertable, Selectable } from "kysely"; import { executeSqliteQuerySync, getNodeSqliteKysely } from "../infra/kysely-sync.js"; -import { requireNodeSqlite } from "../infra/node-sqlite.js"; import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js"; import { openOpenClawStateDatabase, runOpenClawStateWriteTransaction, } from "../state/openclaw-state-db.js"; import type { DeliveryContext } from "../utils/delivery-context.types.js"; -import { resolveLegacyTaskFlowRegistrySqlitePath } from "./task-flow-registry.paths.js"; import type { TaskFlowRegistryStoreSnapshot } from "./task-flow-registry.store.types.js"; import { parseOptionalTaskFlowSyncMode, @@ -35,7 +32,6 @@ type FlowRegistryDatabase = { }; let cachedDatabase: FlowRegistryDatabase | null = null; -const SQLITE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const; function normalizeNumber(value: number | bigint | null): number | undefined { if (typeof value === "bigint") { @@ -181,105 +177,6 @@ function upsertFlowRow(db: DatabaseSync, row: Insertable): void { ); } -function hasLegacyFlowRunsColumn(db: DatabaseSync, columnName: string): boolean { - const rows = db.prepare(`PRAGMA table_info(flow_runs)`).all() as Array<{ name?: string }>; - return rows.some((row) => row.name === columnName); -} - -function ensureLegacyTaskFlowRegistrySchema(db: DatabaseSync) { - db.exec(` - CREATE TABLE IF NOT EXISTS flow_runs ( - flow_id TEXT NOT NULL PRIMARY KEY, - shape TEXT, - sync_mode TEXT NOT NULL DEFAULT 'managed', - owner_key TEXT NOT NULL, - requester_origin_json TEXT, - controller_id TEXT, - revision INTEGER NOT NULL DEFAULT 0, - status TEXT NOT NULL, - notify_policy TEXT NOT NULL, - goal TEXT NOT NULL, - current_step TEXT, - blocked_task_id TEXT, - blocked_summary TEXT, - state_json TEXT, - wait_json TEXT, - cancel_requested_at INTEGER, - created_at INTEGER NOT NULL, - updated_at INTEGER NOT NULL, - ended_at INTEGER - ); - `); - if ( - !hasLegacyFlowRunsColumn(db, "owner_key") && - hasLegacyFlowRunsColumn(db, "owner_session_key") - ) { - db.exec(`ALTER TABLE flow_runs ADD COLUMN owner_key TEXT;`); - db.exec(` - UPDATE flow_runs - SET owner_key = owner_session_key - WHERE owner_key IS NULL - `); - } - if (!hasLegacyFlowRunsColumn(db, "shape")) { - db.exec(`ALTER TABLE flow_runs ADD COLUMN shape TEXT;`); - } - if (!hasLegacyFlowRunsColumn(db, "sync_mode")) { - db.exec(`ALTER TABLE flow_runs ADD COLUMN sync_mode TEXT;`); - if (hasLegacyFlowRunsColumn(db, "shape")) { - db.exec(` - UPDATE flow_runs - SET sync_mode = CASE - WHEN shape = 'single_task' THEN 'task_mirrored' - ELSE 'managed' - END - WHERE sync_mode IS NULL - `); - } else { - db.exec(` - UPDATE flow_runs - SET sync_mode = 'managed' - WHERE sync_mode IS NULL - `); - } - } - if (!hasLegacyFlowRunsColumn(db, "controller_id")) { - db.exec(`ALTER TABLE flow_runs ADD COLUMN controller_id TEXT;`); - } - db.exec(` - UPDATE flow_runs - SET controller_id = 'core/legacy-restored' - WHERE sync_mode = 'managed' - AND (controller_id IS NULL OR trim(controller_id) = '') - `); - if (!hasLegacyFlowRunsColumn(db, "revision")) { - db.exec(`ALTER TABLE flow_runs ADD COLUMN revision INTEGER;`); - db.exec(` - UPDATE flow_runs - SET revision = 0 - WHERE revision IS NULL - `); - } - if (!hasLegacyFlowRunsColumn(db, "blocked_task_id")) { - db.exec(`ALTER TABLE flow_runs ADD COLUMN blocked_task_id TEXT;`); - } - if (!hasLegacyFlowRunsColumn(db, "blocked_summary")) { - db.exec(`ALTER TABLE flow_runs ADD COLUMN blocked_summary TEXT;`); - } - if (!hasLegacyFlowRunsColumn(db, "state_json")) { - db.exec(`ALTER TABLE flow_runs ADD COLUMN state_json TEXT;`); - } - if (!hasLegacyFlowRunsColumn(db, "wait_json")) { - db.exec(`ALTER TABLE flow_runs ADD COLUMN wait_json TEXT;`); - } - if (!hasLegacyFlowRunsColumn(db, "cancel_requested_at")) { - db.exec(`ALTER TABLE flow_runs ADD COLUMN cancel_requested_at INTEGER;`); - } - db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_status ON flow_runs(status);`); - db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_owner_key ON flow_runs(owner_key);`); - db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_updated_at ON flow_runs(updated_at);`); -} - function openFlowRegistryDatabase(): FlowRegistryDatabase { const database = openOpenClawStateDatabase(); const pathname = database.path; @@ -335,62 +232,3 @@ export function deleteTaskFlowRegistryRecordFromSqlite(flowId: string) { export function closeTaskFlowRegistrySqliteStore() { cachedDatabase = null; } - -export function legacyTaskFlowRegistrySidecarExists(env: NodeJS.ProcessEnv = process.env): boolean { - return existsSync(resolveLegacyTaskFlowRegistrySqlitePath(env)); -} - -function removeSqliteSidecars(pathname: string): boolean { - for (const suffix of SQLITE_SIDECAR_SUFFIXES) { - rmSync(`${pathname}${suffix}`, { force: true }); - } - return !existsSync(pathname); -} - -export function importLegacyTaskFlowRegistrySidecarToSqlite(env: NodeJS.ProcessEnv = process.env): { - importedFlows: number; - removedSource: boolean; - sourcePath: string; -} { - const sourcePath = resolveLegacyTaskFlowRegistrySqlitePath(env); - if (!existsSync(sourcePath)) { - return { - importedFlows: 0, - removedSource: false, - sourcePath, - }; - } - - const { DatabaseSync } = requireNodeSqlite(); - const legacyDb = new DatabaseSync(sourcePath); - let importedFlows = 0; - try { - ensureLegacyTaskFlowRegistrySchema(legacyDb); - const rows = selectFlowRows(legacyDb); - const flows = rows.map(rowToFlowRecord); - withWriteTransaction(({ db }) => { - for (const flow of flows) { - upsertFlowRow(db, bindFlowRecord(flow)); - } - }); - importedFlows = flows.length; - } finally { - legacyDb.close(); - } - return { - importedFlows, - removedSource: removeSqliteSidecars(sourcePath), - sourcePath, - }; -} - -export function removeLegacyTaskFlowRegistrySidecar(env: NodeJS.ProcessEnv = process.env): { - removedSource: boolean; - sourcePath: string; -} { - const sourcePath = resolveLegacyTaskFlowRegistrySqlitePath(env); - return { - removedSource: removeSqliteSidecars(sourcePath), - sourcePath, - }; -} diff --git a/src/tasks/task-flow-registry.store.test.ts b/src/tasks/task-flow-registry.store.test.ts index f3ea9b26495..3c13dbfb34c 100644 --- a/src/tasks/task-flow-registry.store.test.ts +++ b/src/tasks/task-flow-registry.store.test.ts @@ -1,6 +1,7 @@ import { existsSync, mkdirSync, statSync } from "node:fs"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { importLegacyTaskFlowRegistrySidecarToSqlite } from "../commands/doctor-sqlite-sidecars.js"; import { requireNodeSqlite } from "../infra/node-sqlite.js"; import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js"; import { @@ -16,7 +17,6 @@ import { resolveTaskFlowRegistrySqlitePath, } from "./task-flow-registry.paths.js"; import { configureTaskFlowRegistryRuntime } from "./task-flow-registry.store.js"; -import { importLegacyTaskFlowRegistrySidecarToSqlite } from "./task-flow-registry.store.sqlite.js"; import type { TaskFlowRecord } from "./task-flow-registry.types.js"; function createStoredFlow(): TaskFlowRecord { diff --git a/src/tasks/task-registry.store.sqlite.ts b/src/tasks/task-registry.store.sqlite.ts index 91ddd938a2e..6471ea515b3 100644 --- a/src/tasks/task-registry.store.sqlite.ts +++ b/src/tasks/task-registry.store.sqlite.ts @@ -1,15 +1,12 @@ -import { existsSync, rmSync } from "node:fs"; import type { DatabaseSync } from "node:sqlite"; import type { Insertable, Selectable } from "kysely"; import { executeSqliteQuerySync, getNodeSqliteKysely } from "../infra/kysely-sync.js"; -import { requireNodeSqlite } from "../infra/node-sqlite.js"; import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js"; import { openOpenClawStateDatabase, runOpenClawStateWriteTransaction, } from "../state/openclaw-state-db.js"; import type { DeliveryContext } from "../utils/delivery-context.types.js"; -import { resolveLegacyTaskRegistrySqlitePath } from "./task-registry.paths.js"; import type { TaskRegistryStoreSnapshot } from "./task-registry.store.types.js"; import { parseOptionalTaskTerminalOutcome, @@ -40,17 +37,12 @@ type TaskRegistryRow = Selectable & { type TaskDeliveryStateRow = Selectable; -type TableInfoRow = { - name: string; -}; - type TaskRegistryDatabase = { db: DatabaseSync; path: string; }; let cachedDatabase: TaskRegistryDatabase | null = null; -const SQLITE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const; function normalizeNumber(value: number | bigint | null): number | undefined { if (typeof value === "bigint") { @@ -267,110 +259,6 @@ function replaceTaskDeliveryStateRow( ); } -function hasLegacyTaskRunsColumn(db: DatabaseSync, columnName: string): boolean { - const rows = db.prepare(`PRAGMA table_info(task_runs)`).all() as TableInfoRow[]; - return rows.some((row) => row.name === columnName); -} - -function migrateLegacyOwnerColumns(db: DatabaseSync) { - if (!hasLegacyTaskRunsColumn(db, "owner_key")) { - db.exec(`ALTER TABLE task_runs ADD COLUMN owner_key TEXT;`); - } - if (!hasLegacyTaskRunsColumn(db, "requester_session_key")) { - db.exec(`ALTER TABLE task_runs ADD COLUMN requester_session_key TEXT;`); - } - if (!hasLegacyTaskRunsColumn(db, "scope_kind")) { - db.exec(`ALTER TABLE task_runs ADD COLUMN scope_kind TEXT NOT NULL DEFAULT 'session';`); - } - if (hasLegacyTaskRunsColumn(db, "requester_session_key")) { - db.exec(` - UPDATE task_runs - SET owner_key = requester_session_key - WHERE owner_key IS NULL - `); - } - db.exec(` - UPDATE task_runs - SET owner_key = CASE - WHEN trim(COALESCE(owner_key, '')) <> '' THEN trim(owner_key) - ELSE 'system:' || runtime || ':' || COALESCE(NULLIF(source_id, ''), task_id) - END - `); - db.exec(` - UPDATE task_runs - SET scope_kind = CASE - WHEN scope_kind = 'system' THEN 'system' - WHEN owner_key LIKE 'system:%' THEN 'system' - ELSE 'session' - END - `); - db.exec(` - UPDATE task_runs - SET requester_session_key = CASE - WHEN scope_kind = 'system' THEN '' - WHEN trim(COALESCE(requester_session_key, '')) <> '' THEN trim(requester_session_key) - ELSE owner_key - END - `); -} - -function ensureLegacyTaskRegistrySchema(db: DatabaseSync) { - db.exec(` - CREATE TABLE IF NOT EXISTS task_runs ( - task_id TEXT NOT NULL PRIMARY KEY, - runtime TEXT NOT NULL, - task_kind TEXT, - source_id TEXT, - requester_session_key TEXT, - owner_key TEXT NOT NULL, - scope_kind TEXT NOT NULL, - child_session_key TEXT, - parent_flow_id TEXT, - parent_task_id TEXT, - agent_id TEXT, - run_id TEXT, - label TEXT, - task TEXT NOT NULL, - status TEXT NOT NULL, - delivery_status TEXT NOT NULL, - notify_policy TEXT NOT NULL, - created_at INTEGER NOT NULL, - started_at INTEGER, - ended_at INTEGER, - last_event_at INTEGER, - cleanup_after INTEGER, - error TEXT, - progress_summary TEXT, - terminal_summary TEXT, - terminal_outcome TEXT - ); - `); - migrateLegacyOwnerColumns(db); - if (!hasLegacyTaskRunsColumn(db, "task_kind")) { - db.exec(`ALTER TABLE task_runs ADD COLUMN task_kind TEXT;`); - } - if (!hasLegacyTaskRunsColumn(db, "parent_flow_id")) { - db.exec(`ALTER TABLE task_runs ADD COLUMN parent_flow_id TEXT;`); - } - db.exec(` - CREATE TABLE IF NOT EXISTS task_delivery_state ( - task_id TEXT NOT NULL PRIMARY KEY, - requester_origin_json TEXT, - last_notified_event_at INTEGER - ); - `); - db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_run_id ON task_runs(run_id);`); - db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_status ON task_runs(status);`); - db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_runtime_status ON task_runs(runtime, status);`); - db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_cleanup_after ON task_runs(cleanup_after);`); - db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_last_event_at ON task_runs(last_event_at);`); - db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_owner_key ON task_runs(owner_key);`); - db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_parent_flow_id ON task_runs(parent_flow_id);`); - db.exec( - `CREATE INDEX IF NOT EXISTS idx_task_runs_child_session_key ON task_runs(child_session_key);`, - ); -} - function openTaskRegistryDatabase(): TaskRegistryDatabase { const database = openOpenClawStateDatabase(); const pathname = database.path; @@ -480,72 +368,3 @@ export function deleteTaskDeliveryStateFromSqlite(taskId: string) { export function closeTaskRegistrySqliteStore() { cachedDatabase = null; } - -export function legacyTaskRegistrySidecarExists(env: NodeJS.ProcessEnv = process.env): boolean { - return existsSync(resolveLegacyTaskRegistrySqlitePath(env)); -} - -function removeSqliteSidecars(pathname: string): boolean { - for (const suffix of SQLITE_SIDECAR_SUFFIXES) { - rmSync(`${pathname}${suffix}`, { force: true }); - } - return !existsSync(pathname); -} - -export function importLegacyTaskRegistrySidecarToSqlite(env: NodeJS.ProcessEnv = process.env): { - importedTasks: number; - importedDeliveryStates: number; - removedSource: boolean; - sourcePath: string; -} { - const sourcePath = resolveLegacyTaskRegistrySqlitePath(env); - if (!existsSync(sourcePath)) { - return { - importedTasks: 0, - importedDeliveryStates: 0, - removedSource: false, - sourcePath, - }; - } - - const { DatabaseSync } = requireNodeSqlite(); - const legacyDb = new DatabaseSync(sourcePath); - let importedTasks = 0; - let importedDeliveryStates = 0; - try { - ensureLegacyTaskRegistrySchema(legacyDb); - const taskRows = selectTaskRows(legacyDb); - const deliveryRows = selectTaskDeliveryStateRows(legacyDb); - const tasks = taskRows.map(rowToTaskRecord); - const deliveryStates = deliveryRows.map(rowToTaskDeliveryState); - withWriteTransaction(({ db }) => { - for (const task of tasks) { - upsertTaskRow(db, bindTaskRecordBase(task)); - } - for (const deliveryState of deliveryStates) { - replaceTaskDeliveryStateRow(db, bindTaskDeliveryState(deliveryState)); - } - }); - importedTasks = tasks.length; - importedDeliveryStates = deliveryStates.length; - } finally { - legacyDb.close(); - } - return { - importedTasks, - importedDeliveryStates, - removedSource: removeSqliteSidecars(sourcePath), - sourcePath, - }; -} - -export function removeLegacyTaskRegistrySidecar(env: NodeJS.ProcessEnv = process.env): { - removedSource: boolean; - sourcePath: string; -} { - const sourcePath = resolveLegacyTaskRegistrySqlitePath(env); - return { - removedSource: removeSqliteSidecars(sourcePath), - sourcePath, - }; -} diff --git a/src/tasks/task-registry.store.test.ts b/src/tasks/task-registry.store.test.ts index 14b5dddd457..7635c8f5911 100644 --- a/src/tasks/task-registry.store.test.ts +++ b/src/tasks/task-registry.store.test.ts @@ -1,6 +1,7 @@ import { existsSync, mkdirSync, statSync } from "node:fs"; import path from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; +import { importLegacyTaskRegistrySidecarToSqlite } from "../commands/doctor-sqlite-sidecars.js"; import { requireNodeSqlite } from "../infra/node-sqlite.js"; import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js"; import { createManagedTaskFlow, resetTaskFlowRegistryForTests } from "./task-flow-registry.js"; @@ -21,7 +22,6 @@ import { configureTaskRegistryRuntime, type TaskRegistryObserverEvent, } from "./task-registry.store.js"; -import { importLegacyTaskRegistrySidecarToSqlite } from "./task-registry.store.sqlite.js"; import type { TaskRecord } from "./task-registry.types.js"; const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;