refactor: move legacy sidecar imports to doctor

This commit is contained in:
Peter Steinberger
2026-05-08 13:12:13 +01:00
parent b5fdf9ec2a
commit 4e7d3975fa
10 changed files with 767 additions and 443 deletions

View File

@@ -133,9 +133,6 @@ const allowedExactPaths = new Set([
"extensions/qqbot/src/state-migrations.ts",
"extensions/telegram/src/state-migrations.ts",
"src/infra/state-migrations.ts",
"src/plugin-state/plugin-state-store.sqlite.ts",
"src/tasks/task-flow-registry.store.sqlite.ts",
"src/tasks/task-registry.store.sqlite.ts",
"src/trajectory/export.ts",
"src/trajectory/paths.ts",
]);

View File

@@ -0,0 +1,752 @@
import { existsSync, rmSync } from "node:fs";
import type { DatabaseSync } from "node:sqlite";
import type { Insertable, Selectable } from "kysely";
import { executeSqliteQuerySync, getNodeSqliteKysely } from "../infra/kysely-sync.js";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { resolveLegacyPluginStateSqlitePath } from "../plugin-state/plugin-state-store.paths.js";
import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js";
import { runOpenClawStateWriteTransaction } from "../state/openclaw-state-db.js";
import { resolveLegacyTaskFlowRegistrySqlitePath } from "../tasks/task-flow-registry.paths.js";
import type { TaskFlowRecord, TaskFlowSyncMode } from "../tasks/task-flow-registry.types.js";
import { resolveLegacyTaskRegistrySqlitePath } from "../tasks/task-registry.paths.js";
import type { TaskDeliveryState, TaskRecord } from "../tasks/task-registry.types.js";
import type { DeliveryContext } from "../utils/delivery-context.types.js";
const SQLITE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
type PluginStateEntriesTable = OpenClawStateKyselyDatabase["plugin_state_entries"];
type PluginStateRow = Selectable<PluginStateEntriesTable>;
type PluginStateDatabase = Pick<OpenClawStateKyselyDatabase, "plugin_state_entries">;
type TaskRunsTable = OpenClawStateKyselyDatabase["task_runs"];
type TaskDeliveryStateTable = OpenClawStateKyselyDatabase["task_delivery_state"];
type TaskRegistryDatabase = Pick<OpenClawStateKyselyDatabase, "task_delivery_state" | "task_runs">;
type TaskRegistryRow = Selectable<TaskRunsTable> & {
runtime: TaskRecord["runtime"];
scope_kind: TaskRecord["scopeKind"];
status: TaskRecord["status"];
delivery_status: TaskRecord["deliveryStatus"];
notify_policy: TaskRecord["notifyPolicy"];
terminal_outcome: TaskRecord["terminalOutcome"] | null;
};
type TaskDeliveryStateRow = Selectable<TaskDeliveryStateTable>;
type FlowRunsTable = OpenClawStateKyselyDatabase["flow_runs"];
type FlowRegistryDatabase = Pick<OpenClawStateKyselyDatabase, "flow_runs">;
type FlowRegistryRow = Selectable<FlowRunsTable> & {
sync_mode: TaskFlowSyncMode | null;
status: TaskFlowRecord["status"];
notify_policy: TaskFlowRecord["notifyPolicy"];
};
type TableInfoRow = {
name: string;
};
export type LegacyPluginStateSidecarImportResult = {
sourcePath: string;
importedEntries: number;
removedSource: boolean;
};
function normalizeNumber(value: number | bigint | null): number | undefined {
if (typeof value === "bigint") {
return Number(value);
}
return typeof value === "number" ? value : undefined;
}
function serializeJson(value: unknown): string | null {
return value === undefined ? null : JSON.stringify(value);
}
// oxlint-disable-next-line typescript/no-unnecessary-type-parameters -- Legacy JSON columns are typed by the receiving field.
function parseJsonValue<T>(raw: string | null): T | undefined {
if (!raw?.trim()) {
return undefined;
}
try {
return JSON.parse(raw) as T;
} catch {
return undefined;
}
}
function getPluginStateKysely(db: DatabaseSync) {
return getNodeSqliteKysely<PluginStateDatabase>(db);
}
function getTaskRegistryKysely(db: DatabaseSync) {
return getNodeSqliteKysely<TaskRegistryDatabase>(db);
}
function getFlowRegistryKysely(db: DatabaseSync) {
return getNodeSqliteKysely<FlowRegistryDatabase>(db);
}
function removeSqliteSidecars(pathname: string): boolean {
for (const suffix of SQLITE_SIDECAR_SUFFIXES) {
rmSync(`${pathname}${suffix}`, { force: true });
}
return !existsSync(pathname);
}
function selectPluginStateRows(db: DatabaseSync): PluginStateRow[] {
return executeSqliteQuerySync<PluginStateRow>(
db,
getPluginStateKysely(db)
.selectFrom("plugin_state_entries")
.select(["plugin_id", "namespace", "entry_key", "value_json", "created_at", "expires_at"])
.orderBy("plugin_id", "asc")
.orderBy("namespace", "asc")
.orderBy("entry_key", "asc"),
).rows;
}
function upsertPluginStateRow(db: DatabaseSync, row: Insertable<PluginStateEntriesTable>): void {
executeSqliteQuerySync(
db,
getPluginStateKysely(db)
.insertInto("plugin_state_entries")
.values(row)
.onConflict((conflict) =>
conflict.columns(["plugin_id", "namespace", "entry_key"]).doUpdateSet({
value_json: (eb) => eb.ref("excluded.value_json"),
created_at: (eb) => eb.ref("excluded.created_at"),
expires_at: (eb) => eb.ref("excluded.expires_at"),
}),
),
);
}
export function legacyPluginStateSidecarExists(env: NodeJS.ProcessEnv = process.env): boolean {
return existsSync(resolveLegacyPluginStateSqlitePath(env));
}
export function importLegacyPluginStateSidecarToSqlite(
env: NodeJS.ProcessEnv = process.env,
): LegacyPluginStateSidecarImportResult {
const sourcePath = resolveLegacyPluginStateSqlitePath(env);
if (!existsSync(sourcePath)) {
return {
sourcePath,
importedEntries: 0,
removedSource: false,
};
}
const { DatabaseSync } = requireNodeSqlite();
const legacyDb = new DatabaseSync(sourcePath);
let rows: PluginStateRow[];
try {
rows = selectPluginStateRows(legacyDb);
} finally {
legacyDb.close();
}
runOpenClawStateWriteTransaction(
(database) => {
for (const row of rows) {
upsertPluginStateRow(database.db, {
...row,
created_at: normalizeNumber(row.created_at) ?? 0,
expires_at: normalizeNumber(row.expires_at) ?? null,
});
}
},
{ env },
);
return {
sourcePath,
importedEntries: rows.length,
removedSource: removeSqliteSidecars(sourcePath),
};
}
function hasLegacyColumn(db: DatabaseSync, tableName: string, columnName: string): boolean {
const rows = db.prepare(`PRAGMA table_info(${tableName})`).all() as TableInfoRow[];
return rows.some((row) => row.name === columnName);
}
function migrateLegacyTaskOwnerColumns(db: DatabaseSync) {
if (!hasLegacyColumn(db, "task_runs", "owner_key")) {
db.exec(`ALTER TABLE task_runs ADD COLUMN owner_key TEXT;`);
}
if (!hasLegacyColumn(db, "task_runs", "requester_session_key")) {
db.exec(`ALTER TABLE task_runs ADD COLUMN requester_session_key TEXT;`);
}
if (!hasLegacyColumn(db, "task_runs", "scope_kind")) {
db.exec(`ALTER TABLE task_runs ADD COLUMN scope_kind TEXT NOT NULL DEFAULT 'session';`);
}
db.exec(`
UPDATE task_runs
SET owner_key = requester_session_key
WHERE owner_key IS NULL
`);
db.exec(`
UPDATE task_runs
SET owner_key = CASE
WHEN trim(COALESCE(owner_key, '')) <> '' THEN trim(owner_key)
ELSE 'system:' || runtime || ':' || COALESCE(NULLIF(source_id, ''), task_id)
END
`);
db.exec(`
UPDATE task_runs
SET scope_kind = CASE
WHEN scope_kind = 'system' THEN 'system'
WHEN owner_key LIKE 'system:%' THEN 'system'
ELSE 'session'
END
`);
db.exec(`
UPDATE task_runs
SET requester_session_key = CASE
WHEN scope_kind = 'system' THEN ''
WHEN trim(COALESCE(requester_session_key, '')) <> '' THEN trim(requester_session_key)
ELSE owner_key
END
`);
}
function ensureLegacyTaskRegistrySchema(db: DatabaseSync) {
db.exec(`
CREATE TABLE IF NOT EXISTS task_runs (
task_id TEXT NOT NULL PRIMARY KEY,
runtime TEXT NOT NULL,
task_kind TEXT,
source_id TEXT,
requester_session_key TEXT,
owner_key TEXT NOT NULL,
scope_kind TEXT NOT NULL,
child_session_key TEXT,
parent_flow_id TEXT,
parent_task_id TEXT,
agent_id TEXT,
run_id TEXT,
label TEXT,
task TEXT NOT NULL,
status TEXT NOT NULL,
delivery_status TEXT NOT NULL,
notify_policy TEXT NOT NULL,
created_at INTEGER NOT NULL,
started_at INTEGER,
ended_at INTEGER,
last_event_at INTEGER,
cleanup_after INTEGER,
error TEXT,
progress_summary TEXT,
terminal_summary TEXT,
terminal_outcome TEXT
);
`);
migrateLegacyTaskOwnerColumns(db);
for (const column of ["task_kind", "parent_flow_id"]) {
if (!hasLegacyColumn(db, "task_runs", column)) {
db.exec(`ALTER TABLE task_runs ADD COLUMN ${column} TEXT;`);
}
}
db.exec(`
CREATE TABLE IF NOT EXISTS task_delivery_state (
task_id TEXT NOT NULL PRIMARY KEY,
requester_origin_json TEXT,
last_notified_event_at INTEGER
);
`);
}
function rowToTaskRecord(row: TaskRegistryRow): TaskRecord {
const startedAt = normalizeNumber(row.started_at);
const endedAt = normalizeNumber(row.ended_at);
const lastEventAt = normalizeNumber(row.last_event_at);
const cleanupAfter = normalizeNumber(row.cleanup_after);
const requesterSessionKey =
row.scope_kind === "system" ? "" : row.requester_session_key?.trim() || row.owner_key;
return {
taskId: row.task_id,
runtime: row.runtime,
...(row.task_kind ? { taskKind: row.task_kind } : {}),
...(row.source_id ? { sourceId: row.source_id } : {}),
requesterSessionKey,
ownerKey: row.owner_key,
scopeKind: row.scope_kind,
...(row.child_session_key ? { childSessionKey: row.child_session_key } : {}),
...(row.parent_flow_id ? { parentFlowId: row.parent_flow_id } : {}),
...(row.parent_task_id ? { parentTaskId: row.parent_task_id } : {}),
...(row.agent_id ? { agentId: row.agent_id } : {}),
...(row.run_id ? { runId: row.run_id } : {}),
...(row.label ? { label: row.label } : {}),
task: row.task,
status: row.status,
deliveryStatus: row.delivery_status,
notifyPolicy: row.notify_policy,
createdAt: normalizeNumber(row.created_at) ?? 0,
...(startedAt != null ? { startedAt } : {}),
...(endedAt != null ? { endedAt } : {}),
...(lastEventAt != null ? { lastEventAt } : {}),
...(cleanupAfter != null ? { cleanupAfter } : {}),
...(row.error ? { error: row.error } : {}),
...(row.progress_summary ? { progressSummary: row.progress_summary } : {}),
...(row.terminal_summary ? { terminalSummary: row.terminal_summary } : {}),
...(row.terminal_outcome ? { terminalOutcome: row.terminal_outcome } : {}),
};
}
function rowToTaskDeliveryState(row: TaskDeliveryStateRow): TaskDeliveryState {
const requesterOrigin = parseJsonValue<DeliveryContext>(row.requester_origin_json);
const lastNotifiedEventAt = normalizeNumber(row.last_notified_event_at);
return {
taskId: row.task_id,
...(requesterOrigin ? { requesterOrigin } : {}),
...(lastNotifiedEventAt != null ? { lastNotifiedEventAt } : {}),
};
}
function bindTaskRecord(record: TaskRecord): Insertable<TaskRunsTable> {
return {
task_id: record.taskId,
runtime: record.runtime,
task_kind: record.taskKind ?? null,
source_id: record.sourceId ?? null,
requester_session_key: record.scopeKind === "system" ? "" : record.requesterSessionKey,
owner_key: record.ownerKey,
scope_kind: record.scopeKind,
child_session_key: record.childSessionKey ?? null,
parent_flow_id: record.parentFlowId ?? null,
parent_task_id: record.parentTaskId ?? null,
agent_id: record.agentId ?? null,
run_id: record.runId ?? null,
label: record.label ?? null,
task: record.task,
status: record.status,
delivery_status: record.deliveryStatus,
notify_policy: record.notifyPolicy,
created_at: record.createdAt,
started_at: record.startedAt ?? null,
ended_at: record.endedAt ?? null,
last_event_at: record.lastEventAt ?? null,
cleanup_after: record.cleanupAfter ?? null,
error: record.error ?? null,
progress_summary: record.progressSummary ?? null,
terminal_summary: record.terminalSummary ?? null,
terminal_outcome: record.terminalOutcome ?? null,
};
}
function bindTaskDeliveryState(state: TaskDeliveryState): Insertable<TaskDeliveryStateTable> {
return {
task_id: state.taskId,
requester_origin_json: serializeJson(state.requesterOrigin),
last_notified_event_at: state.lastNotifiedEventAt ?? null,
};
}
function selectTaskRows(db: DatabaseSync): TaskRegistryRow[] {
return executeSqliteQuerySync<TaskRegistryRow>(
db,
getTaskRegistryKysely(db)
.selectFrom("task_runs")
.select([
"task_id",
"runtime",
"task_kind",
"source_id",
"requester_session_key",
"owner_key",
"scope_kind",
"child_session_key",
"parent_flow_id",
"parent_task_id",
"agent_id",
"run_id",
"label",
"task",
"status",
"delivery_status",
"notify_policy",
"created_at",
"started_at",
"ended_at",
"last_event_at",
"cleanup_after",
"error",
"progress_summary",
"terminal_summary",
"terminal_outcome",
])
.orderBy("created_at", "asc")
.orderBy("task_id", "asc"),
).rows;
}
function selectTaskDeliveryStateRows(db: DatabaseSync): TaskDeliveryStateRow[] {
return executeSqliteQuerySync<TaskDeliveryStateRow>(
db,
getTaskRegistryKysely(db)
.selectFrom("task_delivery_state")
.select(["task_id", "requester_origin_json", "last_notified_event_at"])
.orderBy("task_id", "asc"),
).rows;
}
function upsertTaskRow(db: DatabaseSync, row: Insertable<TaskRunsTable>): void {
executeSqliteQuerySync(
db,
getTaskRegistryKysely(db)
.insertInto("task_runs")
.values(row)
.onConflict((conflict) =>
conflict.column("task_id").doUpdateSet({
runtime: (eb) => eb.ref("excluded.runtime"),
task_kind: (eb) => eb.ref("excluded.task_kind"),
source_id: (eb) => eb.ref("excluded.source_id"),
requester_session_key: (eb) => eb.ref("excluded.requester_session_key"),
owner_key: (eb) => eb.ref("excluded.owner_key"),
scope_kind: (eb) => eb.ref("excluded.scope_kind"),
child_session_key: (eb) => eb.ref("excluded.child_session_key"),
parent_flow_id: (eb) => eb.ref("excluded.parent_flow_id"),
parent_task_id: (eb) => eb.ref("excluded.parent_task_id"),
agent_id: (eb) => eb.ref("excluded.agent_id"),
run_id: (eb) => eb.ref("excluded.run_id"),
label: (eb) => eb.ref("excluded.label"),
task: (eb) => eb.ref("excluded.task"),
status: (eb) => eb.ref("excluded.status"),
delivery_status: (eb) => eb.ref("excluded.delivery_status"),
notify_policy: (eb) => eb.ref("excluded.notify_policy"),
created_at: (eb) => eb.ref("excluded.created_at"),
started_at: (eb) => eb.ref("excluded.started_at"),
ended_at: (eb) => eb.ref("excluded.ended_at"),
last_event_at: (eb) => eb.ref("excluded.last_event_at"),
cleanup_after: (eb) => eb.ref("excluded.cleanup_after"),
error: (eb) => eb.ref("excluded.error"),
progress_summary: (eb) => eb.ref("excluded.progress_summary"),
terminal_summary: (eb) => eb.ref("excluded.terminal_summary"),
terminal_outcome: (eb) => eb.ref("excluded.terminal_outcome"),
}),
),
);
}
function replaceTaskDeliveryStateRow(
db: DatabaseSync,
row: Insertable<TaskDeliveryStateTable>,
): void {
executeSqliteQuerySync(
db,
getTaskRegistryKysely(db)
.insertInto("task_delivery_state")
.values(row)
.onConflict((conflict) =>
conflict.column("task_id").doUpdateSet({
requester_origin_json: (eb) => eb.ref("excluded.requester_origin_json"),
last_notified_event_at: (eb) => eb.ref("excluded.last_notified_event_at"),
}),
),
);
}
export function legacyTaskRegistrySidecarExists(env: NodeJS.ProcessEnv = process.env): boolean {
return existsSync(resolveLegacyTaskRegistrySqlitePath(env));
}
export function importLegacyTaskRegistrySidecarToSqlite(env: NodeJS.ProcessEnv = process.env): {
importedTasks: number;
importedDeliveryStates: number;
removedSource: boolean;
sourcePath: string;
} {
const sourcePath = resolveLegacyTaskRegistrySqlitePath(env);
if (!existsSync(sourcePath)) {
return {
importedTasks: 0,
importedDeliveryStates: 0,
removedSource: false,
sourcePath,
};
}
const { DatabaseSync } = requireNodeSqlite();
const legacyDb = new DatabaseSync(sourcePath);
let tasks: TaskRecord[];
let deliveryStates: TaskDeliveryState[];
try {
ensureLegacyTaskRegistrySchema(legacyDb);
tasks = selectTaskRows(legacyDb).map(rowToTaskRecord);
deliveryStates = selectTaskDeliveryStateRows(legacyDb).map(rowToTaskDeliveryState);
} finally {
legacyDb.close();
}
runOpenClawStateWriteTransaction(
(database) => {
for (const task of tasks) {
upsertTaskRow(database.db, bindTaskRecord(task));
}
for (const deliveryState of deliveryStates) {
replaceTaskDeliveryStateRow(database.db, bindTaskDeliveryState(deliveryState));
}
},
{ env },
);
return {
importedTasks: tasks.length,
importedDeliveryStates: deliveryStates.length,
removedSource: removeSqliteSidecars(sourcePath),
sourcePath,
};
}
export function removeLegacyTaskRegistrySidecar(env: NodeJS.ProcessEnv = process.env): {
removedSource: boolean;
sourcePath: string;
} {
const sourcePath = resolveLegacyTaskRegistrySqlitePath(env);
return {
removedSource: removeSqliteSidecars(sourcePath),
sourcePath,
};
}
function ensureLegacyTaskFlowRegistrySchema(db: DatabaseSync) {
db.exec(`
CREATE TABLE IF NOT EXISTS flow_runs (
flow_id TEXT NOT NULL PRIMARY KEY,
shape TEXT,
sync_mode TEXT NOT NULL DEFAULT 'managed',
owner_key TEXT NOT NULL,
requester_origin_json TEXT,
controller_id TEXT,
revision INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL,
notify_policy TEXT NOT NULL,
goal TEXT NOT NULL,
current_step TEXT,
blocked_task_id TEXT,
blocked_summary TEXT,
state_json TEXT,
wait_json TEXT,
cancel_requested_at INTEGER,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
ended_at INTEGER
);
`);
if (
!hasLegacyColumn(db, "flow_runs", "owner_key") &&
hasLegacyColumn(db, "flow_runs", "owner_session_key")
) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN owner_key TEXT;`);
db.exec(`
UPDATE flow_runs
SET owner_key = owner_session_key
WHERE owner_key IS NULL
`);
}
const optionalColumns = [
["shape", "TEXT"],
["sync_mode", "TEXT"],
["controller_id", "TEXT"],
["revision", "INTEGER"],
["blocked_task_id", "TEXT"],
["blocked_summary", "TEXT"],
["state_json", "TEXT"],
["wait_json", "TEXT"],
["cancel_requested_at", "INTEGER"],
] as const;
for (const [column, type] of optionalColumns) {
if (!hasLegacyColumn(db, "flow_runs", column)) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN ${column} ${type};`);
}
}
db.exec(`
UPDATE flow_runs
SET sync_mode = CASE
WHEN shape = 'single_task' THEN 'task_mirrored'
ELSE COALESCE(sync_mode, 'managed')
END
WHERE sync_mode IS NULL
`);
db.exec(`
UPDATE flow_runs
SET controller_id = 'core/legacy-restored'
WHERE sync_mode = 'managed'
AND (controller_id IS NULL OR trim(controller_id) = '')
`);
db.exec(`
UPDATE flow_runs
SET revision = 0
WHERE revision IS NULL
`);
}
function rowToSyncMode(row: FlowRegistryRow): TaskFlowSyncMode {
if (row.sync_mode === "task_mirrored" || row.sync_mode === "managed") {
return row.sync_mode;
}
return row.shape === "single_task" ? "task_mirrored" : "managed";
}
function rowToFlowRecord(row: FlowRegistryRow): TaskFlowRecord {
const endedAt = normalizeNumber(row.ended_at);
const cancelRequestedAt = normalizeNumber(row.cancel_requested_at);
const requesterOrigin = parseJsonValue<DeliveryContext>(row.requester_origin_json);
const stateJson = parseJsonValue<TaskFlowRecord["stateJson"]>(row.state_json);
const waitJson = parseJsonValue<TaskFlowRecord["waitJson"]>(row.wait_json);
return {
flowId: row.flow_id,
syncMode: rowToSyncMode(row),
ownerKey: row.owner_key,
...(requesterOrigin ? { requesterOrigin } : {}),
...(row.controller_id ? { controllerId: row.controller_id } : {}),
revision: normalizeNumber(row.revision) ?? 0,
status: row.status,
notifyPolicy: row.notify_policy,
goal: row.goal,
...(row.current_step ? { currentStep: row.current_step } : {}),
...(row.blocked_task_id ? { blockedTaskId: row.blocked_task_id } : {}),
...(row.blocked_summary ? { blockedSummary: row.blocked_summary } : {}),
...(stateJson !== undefined ? { stateJson } : {}),
...(waitJson !== undefined ? { waitJson } : {}),
...(cancelRequestedAt != null ? { cancelRequestedAt } : {}),
createdAt: normalizeNumber(row.created_at) ?? 0,
updatedAt: normalizeNumber(row.updated_at) ?? 0,
...(endedAt != null ? { endedAt } : {}),
};
}
function bindFlowRecord(record: TaskFlowRecord): Insertable<FlowRunsTable> {
return {
flow_id: record.flowId,
sync_mode: record.syncMode,
shape: null,
owner_key: record.ownerKey,
requester_origin_json: serializeJson(record.requesterOrigin),
controller_id: record.controllerId ?? null,
revision: record.revision,
status: record.status,
notify_policy: record.notifyPolicy,
goal: record.goal,
current_step: record.currentStep ?? null,
blocked_task_id: record.blockedTaskId ?? null,
blocked_summary: record.blockedSummary ?? null,
state_json: serializeJson(record.stateJson),
wait_json: serializeJson(record.waitJson),
cancel_requested_at: record.cancelRequestedAt ?? null,
created_at: record.createdAt,
updated_at: record.updatedAt,
ended_at: record.endedAt ?? null,
};
}
function selectFlowRows(db: DatabaseSync): FlowRegistryRow[] {
return executeSqliteQuerySync<FlowRegistryRow>(
db,
getFlowRegistryKysely(db)
.selectFrom("flow_runs")
.select([
"flow_id",
"sync_mode",
"shape",
"owner_key",
"requester_origin_json",
"controller_id",
"revision",
"status",
"notify_policy",
"goal",
"current_step",
"blocked_task_id",
"blocked_summary",
"state_json",
"wait_json",
"cancel_requested_at",
"created_at",
"updated_at",
"ended_at",
])
.orderBy("created_at", "asc")
.orderBy("flow_id", "asc"),
).rows;
}
function upsertFlowRow(db: DatabaseSync, row: Insertable<FlowRunsTable>): void {
executeSqliteQuerySync(
db,
getFlowRegistryKysely(db)
.insertInto("flow_runs")
.values(row)
.onConflict((conflict) =>
conflict.column("flow_id").doUpdateSet({
sync_mode: (eb) => eb.ref("excluded.sync_mode"),
owner_key: (eb) => eb.ref("excluded.owner_key"),
requester_origin_json: (eb) => eb.ref("excluded.requester_origin_json"),
controller_id: (eb) => eb.ref("excluded.controller_id"),
revision: (eb) => eb.ref("excluded.revision"),
status: (eb) => eb.ref("excluded.status"),
notify_policy: (eb) => eb.ref("excluded.notify_policy"),
goal: (eb) => eb.ref("excluded.goal"),
current_step: (eb) => eb.ref("excluded.current_step"),
blocked_task_id: (eb) => eb.ref("excluded.blocked_task_id"),
blocked_summary: (eb) => eb.ref("excluded.blocked_summary"),
state_json: (eb) => eb.ref("excluded.state_json"),
wait_json: (eb) => eb.ref("excluded.wait_json"),
cancel_requested_at: (eb) => eb.ref("excluded.cancel_requested_at"),
created_at: (eb) => eb.ref("excluded.created_at"),
updated_at: (eb) => eb.ref("excluded.updated_at"),
ended_at: (eb) => eb.ref("excluded.ended_at"),
}),
),
);
}
export function legacyTaskFlowRegistrySidecarExists(env: NodeJS.ProcessEnv = process.env): boolean {
return existsSync(resolveLegacyTaskFlowRegistrySqlitePath(env));
}
export function importLegacyTaskFlowRegistrySidecarToSqlite(env: NodeJS.ProcessEnv = process.env): {
importedFlows: number;
removedSource: boolean;
sourcePath: string;
} {
const sourcePath = resolveLegacyTaskFlowRegistrySqlitePath(env);
if (!existsSync(sourcePath)) {
return {
importedFlows: 0,
removedSource: false,
sourcePath,
};
}
const { DatabaseSync } = requireNodeSqlite();
const legacyDb = new DatabaseSync(sourcePath);
let flows: TaskFlowRecord[];
try {
ensureLegacyTaskFlowRegistrySchema(legacyDb);
flows = selectFlowRows(legacyDb).map(rowToFlowRecord);
} finally {
legacyDb.close();
}
runOpenClawStateWriteTransaction(
(database) => {
for (const flow of flows) {
upsertFlowRow(database.db, bindFlowRecord(flow));
}
},
{ env },
);
return {
importedFlows: flows.length,
removedSource: removeSqliteSidecars(sourcePath),
sourcePath,
};
}
export function removeLegacyTaskFlowRegistrySidecar(env: NodeJS.ProcessEnv = process.env): {
removedSource: boolean;
sourcePath: string;
} {
const sourcePath = resolveLegacyTaskFlowRegistrySqlitePath(env);
return {
removedSource: removeSqliteSidecars(sourcePath),
sourcePath,
};
}

View File

@@ -62,10 +62,6 @@ import {
importLegacyChannelPairingFilesToSqlite,
legacyChannelPairingFilesExist,
} from "../pairing/pairing-store.js";
import {
importLegacyPluginStateSidecarToSqlite,
legacyPluginStateSidecarExists,
} from "../plugin-state/plugin-state-store.js";
import {
importLegacyPluginBindingApprovalFileToSqlite,
legacyPluginBindingApprovalFileExists,
@@ -74,20 +70,20 @@ import {
importLegacyInstalledPluginIndexFileToSqlite,
legacyInstalledPluginIndexFileExists,
} from "../plugins/installed-plugin-index-store.js";
import {
importLegacyTaskFlowRegistrySidecarToSqlite,
legacyTaskFlowRegistrySidecarExists,
} from "../tasks/task-flow-registry.store.sqlite.js";
import {
importLegacyTaskRegistrySidecarToSqlite,
legacyTaskRegistrySidecarExists,
} from "../tasks/task-registry.store.sqlite.js";
import { note } from "../terminal/note.js";
import {
importLegacyTuiLastSessionStoreToSqlite,
legacyTuiLastSessionFileExists,
} from "../tui/tui-last-session.js";
import type { DoctorPrompter } from "./doctor-prompter.js";
import {
importLegacyPluginStateSidecarToSqlite,
importLegacyTaskFlowRegistrySidecarToSqlite,
importLegacyTaskRegistrySidecarToSqlite,
legacyPluginStateSidecarExists,
legacyTaskFlowRegistrySidecarExists,
legacyTaskRegistrySidecarExists,
} from "./doctor-sqlite-sidecars.js";
type LegacyStateProbe = {
deviceIdentity: boolean;

View File

@@ -1,4 +1,3 @@
import { existsSync, rmSync } from "node:fs";
import type { DatabaseSync } from "node:sqlite";
import type { Insertable, Selectable } from "kysely";
import {
@@ -12,10 +11,7 @@ import {
openOpenClawStateDatabase,
runOpenClawStateWriteTransaction,
} from "../state/openclaw-state-db.js";
import {
resolveLegacyPluginStateSqlitePath,
resolvePluginStateSqlitePath,
} from "./plugin-state-store.paths.js";
import { resolvePluginStateSqlitePath } from "./plugin-state-store.paths.js";
import {
PluginStateStoreError,
type PluginStateEntry,
@@ -25,8 +21,6 @@ import {
type PluginStateStoreProbeStep,
} from "./plugin-state-store.types.js";
const LEGACY_PLUGIN_STATE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
export const MAX_PLUGIN_STATE_VALUE_BYTES = 65_536;
type PluginStateEntriesTable = OpenClawStateKyselyDatabase["plugin_state_entries"];
@@ -682,75 +676,3 @@ export function probePluginStateStore(): PluginStateStoreProbeResult {
export function closePluginStateSqliteStore(): void {
cachedDatabase = null;
}
export type LegacyPluginStateSidecarImportResult = {
sourcePath: string;
importedEntries: number;
removedSource: boolean;
};
function removeLegacyPluginStateSidecarFiles(sourcePath: string): boolean {
let removed = false;
for (const suffix of LEGACY_PLUGIN_STATE_SIDECAR_SUFFIXES) {
const candidate = `${sourcePath}${suffix}`;
if (!existsSync(candidate)) {
continue;
}
rmSync(candidate, { force: true });
removed = true;
}
return removed;
}
export function legacyPluginStateSidecarExists(env: NodeJS.ProcessEnv = process.env): boolean {
return existsSync(resolveLegacyPluginStateSqlitePath(env));
}
export function importLegacyPluginStateSidecarToSqlite(
env: NodeJS.ProcessEnv = process.env,
): LegacyPluginStateSidecarImportResult {
const sourcePath = resolveLegacyPluginStateSqlitePath(env);
if (!existsSync(sourcePath)) {
return {
sourcePath,
importedEntries: 0,
removedSource: false,
};
}
const { DatabaseSync } = requireNodeSqlite();
const legacyDb = new DatabaseSync(sourcePath);
let rows: PluginStateRow[];
try {
rows = executeSqliteQuerySync<PluginStateRow>(
legacyDb,
getPluginStateKysely(legacyDb)
.selectFrom("plugin_state_entries")
.select(["plugin_id", "namespace", "entry_key", "value_json", "created_at", "expires_at"])
.orderBy("plugin_id", "asc")
.orderBy("namespace", "asc")
.orderBy("entry_key", "asc"),
).rows;
} finally {
legacyDb.close();
}
runOpenClawStateWriteTransaction(
(database) => {
for (const row of rows) {
upsertPluginStateEntry(database.db, {
...row,
created_at: normalizeNumber(row.created_at) ?? 0,
expires_at: normalizeNumber(row.expires_at) ?? null,
});
}
},
{ env },
);
return {
sourcePath,
importedEntries: rows.length,
removedSource: removeLegacyPluginStateSidecarFiles(sourcePath),
};
}

View File

@@ -1,5 +1,9 @@
import { mkdirSync, statSync } from "node:fs";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
importLegacyPluginStateSidecarToSqlite,
legacyPluginStateSidecarExists,
} from "../commands/doctor-sqlite-sidecars.js";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import {
closeOpenClawStateDatabaseForTest,
@@ -10,8 +14,6 @@ import {
closePluginStateSqliteStore,
createCorePluginStateKeyedStore,
createPluginStateKeyedStore,
importLegacyPluginStateSidecarToSqlite,
legacyPluginStateSidecarExists,
PluginStateStoreError,
probePluginStateStore,
resetPluginStateStoreForTests,

View File

@@ -32,9 +32,7 @@ export type {
export { PluginStateStoreError } from "./plugin-state-store.types.js";
export {
closePluginStateSqliteStore,
importLegacyPluginStateSidecarToSqlite,
isPluginStateDatabaseOpen,
legacyPluginStateSidecarExists,
probePluginStateStore,
sweepExpiredPluginStateEntries,
} from "./plugin-state-store.sqlite.js";

View File

@@ -1,15 +1,12 @@
import { existsSync, rmSync } from "node:fs";
import type { DatabaseSync } from "node:sqlite";
import type { Insertable, Selectable } from "kysely";
import { executeSqliteQuerySync, getNodeSqliteKysely } from "../infra/kysely-sync.js";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js";
import {
openOpenClawStateDatabase,
runOpenClawStateWriteTransaction,
} from "../state/openclaw-state-db.js";
import type { DeliveryContext } from "../utils/delivery-context.types.js";
import { resolveLegacyTaskFlowRegistrySqlitePath } from "./task-flow-registry.paths.js";
import type { TaskFlowRegistryStoreSnapshot } from "./task-flow-registry.store.types.js";
import {
parseOptionalTaskFlowSyncMode,
@@ -35,7 +32,6 @@ type FlowRegistryDatabase = {
};
let cachedDatabase: FlowRegistryDatabase | null = null;
const SQLITE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
function normalizeNumber(value: number | bigint | null): number | undefined {
if (typeof value === "bigint") {
@@ -181,105 +177,6 @@ function upsertFlowRow(db: DatabaseSync, row: Insertable<FlowRunsTable>): void {
);
}
function hasLegacyFlowRunsColumn(db: DatabaseSync, columnName: string): boolean {
const rows = db.prepare(`PRAGMA table_info(flow_runs)`).all() as Array<{ name?: string }>;
return rows.some((row) => row.name === columnName);
}
function ensureLegacyTaskFlowRegistrySchema(db: DatabaseSync) {
db.exec(`
CREATE TABLE IF NOT EXISTS flow_runs (
flow_id TEXT NOT NULL PRIMARY KEY,
shape TEXT,
sync_mode TEXT NOT NULL DEFAULT 'managed',
owner_key TEXT NOT NULL,
requester_origin_json TEXT,
controller_id TEXT,
revision INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL,
notify_policy TEXT NOT NULL,
goal TEXT NOT NULL,
current_step TEXT,
blocked_task_id TEXT,
blocked_summary TEXT,
state_json TEXT,
wait_json TEXT,
cancel_requested_at INTEGER,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
ended_at INTEGER
);
`);
if (
!hasLegacyFlowRunsColumn(db, "owner_key") &&
hasLegacyFlowRunsColumn(db, "owner_session_key")
) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN owner_key TEXT;`);
db.exec(`
UPDATE flow_runs
SET owner_key = owner_session_key
WHERE owner_key IS NULL
`);
}
if (!hasLegacyFlowRunsColumn(db, "shape")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN shape TEXT;`);
}
if (!hasLegacyFlowRunsColumn(db, "sync_mode")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN sync_mode TEXT;`);
if (hasLegacyFlowRunsColumn(db, "shape")) {
db.exec(`
UPDATE flow_runs
SET sync_mode = CASE
WHEN shape = 'single_task' THEN 'task_mirrored'
ELSE 'managed'
END
WHERE sync_mode IS NULL
`);
} else {
db.exec(`
UPDATE flow_runs
SET sync_mode = 'managed'
WHERE sync_mode IS NULL
`);
}
}
if (!hasLegacyFlowRunsColumn(db, "controller_id")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN controller_id TEXT;`);
}
db.exec(`
UPDATE flow_runs
SET controller_id = 'core/legacy-restored'
WHERE sync_mode = 'managed'
AND (controller_id IS NULL OR trim(controller_id) = '')
`);
if (!hasLegacyFlowRunsColumn(db, "revision")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN revision INTEGER;`);
db.exec(`
UPDATE flow_runs
SET revision = 0
WHERE revision IS NULL
`);
}
if (!hasLegacyFlowRunsColumn(db, "blocked_task_id")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN blocked_task_id TEXT;`);
}
if (!hasLegacyFlowRunsColumn(db, "blocked_summary")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN blocked_summary TEXT;`);
}
if (!hasLegacyFlowRunsColumn(db, "state_json")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN state_json TEXT;`);
}
if (!hasLegacyFlowRunsColumn(db, "wait_json")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN wait_json TEXT;`);
}
if (!hasLegacyFlowRunsColumn(db, "cancel_requested_at")) {
db.exec(`ALTER TABLE flow_runs ADD COLUMN cancel_requested_at INTEGER;`);
}
db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_status ON flow_runs(status);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_owner_key ON flow_runs(owner_key);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_flow_runs_updated_at ON flow_runs(updated_at);`);
}
function openFlowRegistryDatabase(): FlowRegistryDatabase {
const database = openOpenClawStateDatabase();
const pathname = database.path;
@@ -335,62 +232,3 @@ export function deleteTaskFlowRegistryRecordFromSqlite(flowId: string) {
export function closeTaskFlowRegistrySqliteStore() {
cachedDatabase = null;
}
export function legacyTaskFlowRegistrySidecarExists(env: NodeJS.ProcessEnv = process.env): boolean {
return existsSync(resolveLegacyTaskFlowRegistrySqlitePath(env));
}
function removeSqliteSidecars(pathname: string): boolean {
for (const suffix of SQLITE_SIDECAR_SUFFIXES) {
rmSync(`${pathname}${suffix}`, { force: true });
}
return !existsSync(pathname);
}
export function importLegacyTaskFlowRegistrySidecarToSqlite(env: NodeJS.ProcessEnv = process.env): {
importedFlows: number;
removedSource: boolean;
sourcePath: string;
} {
const sourcePath = resolveLegacyTaskFlowRegistrySqlitePath(env);
if (!existsSync(sourcePath)) {
return {
importedFlows: 0,
removedSource: false,
sourcePath,
};
}
const { DatabaseSync } = requireNodeSqlite();
const legacyDb = new DatabaseSync(sourcePath);
let importedFlows = 0;
try {
ensureLegacyTaskFlowRegistrySchema(legacyDb);
const rows = selectFlowRows(legacyDb);
const flows = rows.map(rowToFlowRecord);
withWriteTransaction(({ db }) => {
for (const flow of flows) {
upsertFlowRow(db, bindFlowRecord(flow));
}
});
importedFlows = flows.length;
} finally {
legacyDb.close();
}
return {
importedFlows,
removedSource: removeSqliteSidecars(sourcePath),
sourcePath,
};
}
export function removeLegacyTaskFlowRegistrySidecar(env: NodeJS.ProcessEnv = process.env): {
removedSource: boolean;
sourcePath: string;
} {
const sourcePath = resolveLegacyTaskFlowRegistrySqlitePath(env);
return {
removedSource: removeSqliteSidecars(sourcePath),
sourcePath,
};
}

View File

@@ -1,6 +1,7 @@
import { existsSync, mkdirSync, statSync } from "node:fs";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { importLegacyTaskFlowRegistrySidecarToSqlite } from "../commands/doctor-sqlite-sidecars.js";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js";
import {
@@ -16,7 +17,6 @@ import {
resolveTaskFlowRegistrySqlitePath,
} from "./task-flow-registry.paths.js";
import { configureTaskFlowRegistryRuntime } from "./task-flow-registry.store.js";
import { importLegacyTaskFlowRegistrySidecarToSqlite } from "./task-flow-registry.store.sqlite.js";
import type { TaskFlowRecord } from "./task-flow-registry.types.js";
function createStoredFlow(): TaskFlowRecord {

View File

@@ -1,15 +1,12 @@
import { existsSync, rmSync } from "node:fs";
import type { DatabaseSync } from "node:sqlite";
import type { Insertable, Selectable } from "kysely";
import { executeSqliteQuerySync, getNodeSqliteKysely } from "../infra/kysely-sync.js";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js";
import {
openOpenClawStateDatabase,
runOpenClawStateWriteTransaction,
} from "../state/openclaw-state-db.js";
import type { DeliveryContext } from "../utils/delivery-context.types.js";
import { resolveLegacyTaskRegistrySqlitePath } from "./task-registry.paths.js";
import type { TaskRegistryStoreSnapshot } from "./task-registry.store.types.js";
import {
parseOptionalTaskTerminalOutcome,
@@ -40,17 +37,12 @@ type TaskRegistryRow = Selectable<TaskRunsTable> & {
type TaskDeliveryStateRow = Selectable<TaskDeliveryStateTable>;
type TableInfoRow = {
name: string;
};
type TaskRegistryDatabase = {
db: DatabaseSync;
path: string;
};
let cachedDatabase: TaskRegistryDatabase | null = null;
const SQLITE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
function normalizeNumber(value: number | bigint | null): number | undefined {
if (typeof value === "bigint") {
@@ -267,110 +259,6 @@ function replaceTaskDeliveryStateRow(
);
}
function hasLegacyTaskRunsColumn(db: DatabaseSync, columnName: string): boolean {
const rows = db.prepare(`PRAGMA table_info(task_runs)`).all() as TableInfoRow[];
return rows.some((row) => row.name === columnName);
}
function migrateLegacyOwnerColumns(db: DatabaseSync) {
if (!hasLegacyTaskRunsColumn(db, "owner_key")) {
db.exec(`ALTER TABLE task_runs ADD COLUMN owner_key TEXT;`);
}
if (!hasLegacyTaskRunsColumn(db, "requester_session_key")) {
db.exec(`ALTER TABLE task_runs ADD COLUMN requester_session_key TEXT;`);
}
if (!hasLegacyTaskRunsColumn(db, "scope_kind")) {
db.exec(`ALTER TABLE task_runs ADD COLUMN scope_kind TEXT NOT NULL DEFAULT 'session';`);
}
if (hasLegacyTaskRunsColumn(db, "requester_session_key")) {
db.exec(`
UPDATE task_runs
SET owner_key = requester_session_key
WHERE owner_key IS NULL
`);
}
db.exec(`
UPDATE task_runs
SET owner_key = CASE
WHEN trim(COALESCE(owner_key, '')) <> '' THEN trim(owner_key)
ELSE 'system:' || runtime || ':' || COALESCE(NULLIF(source_id, ''), task_id)
END
`);
db.exec(`
UPDATE task_runs
SET scope_kind = CASE
WHEN scope_kind = 'system' THEN 'system'
WHEN owner_key LIKE 'system:%' THEN 'system'
ELSE 'session'
END
`);
db.exec(`
UPDATE task_runs
SET requester_session_key = CASE
WHEN scope_kind = 'system' THEN ''
WHEN trim(COALESCE(requester_session_key, '')) <> '' THEN trim(requester_session_key)
ELSE owner_key
END
`);
}
function ensureLegacyTaskRegistrySchema(db: DatabaseSync) {
db.exec(`
CREATE TABLE IF NOT EXISTS task_runs (
task_id TEXT NOT NULL PRIMARY KEY,
runtime TEXT NOT NULL,
task_kind TEXT,
source_id TEXT,
requester_session_key TEXT,
owner_key TEXT NOT NULL,
scope_kind TEXT NOT NULL,
child_session_key TEXT,
parent_flow_id TEXT,
parent_task_id TEXT,
agent_id TEXT,
run_id TEXT,
label TEXT,
task TEXT NOT NULL,
status TEXT NOT NULL,
delivery_status TEXT NOT NULL,
notify_policy TEXT NOT NULL,
created_at INTEGER NOT NULL,
started_at INTEGER,
ended_at INTEGER,
last_event_at INTEGER,
cleanup_after INTEGER,
error TEXT,
progress_summary TEXT,
terminal_summary TEXT,
terminal_outcome TEXT
);
`);
migrateLegacyOwnerColumns(db);
if (!hasLegacyTaskRunsColumn(db, "task_kind")) {
db.exec(`ALTER TABLE task_runs ADD COLUMN task_kind TEXT;`);
}
if (!hasLegacyTaskRunsColumn(db, "parent_flow_id")) {
db.exec(`ALTER TABLE task_runs ADD COLUMN parent_flow_id TEXT;`);
}
db.exec(`
CREATE TABLE IF NOT EXISTS task_delivery_state (
task_id TEXT NOT NULL PRIMARY KEY,
requester_origin_json TEXT,
last_notified_event_at INTEGER
);
`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_run_id ON task_runs(run_id);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_status ON task_runs(status);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_runtime_status ON task_runs(runtime, status);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_cleanup_after ON task_runs(cleanup_after);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_last_event_at ON task_runs(last_event_at);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_owner_key ON task_runs(owner_key);`);
db.exec(`CREATE INDEX IF NOT EXISTS idx_task_runs_parent_flow_id ON task_runs(parent_flow_id);`);
db.exec(
`CREATE INDEX IF NOT EXISTS idx_task_runs_child_session_key ON task_runs(child_session_key);`,
);
}
function openTaskRegistryDatabase(): TaskRegistryDatabase {
const database = openOpenClawStateDatabase();
const pathname = database.path;
@@ -480,72 +368,3 @@ export function deleteTaskDeliveryStateFromSqlite(taskId: string) {
export function closeTaskRegistrySqliteStore() {
cachedDatabase = null;
}
export function legacyTaskRegistrySidecarExists(env: NodeJS.ProcessEnv = process.env): boolean {
return existsSync(resolveLegacyTaskRegistrySqlitePath(env));
}
function removeSqliteSidecars(pathname: string): boolean {
for (const suffix of SQLITE_SIDECAR_SUFFIXES) {
rmSync(`${pathname}${suffix}`, { force: true });
}
return !existsSync(pathname);
}
export function importLegacyTaskRegistrySidecarToSqlite(env: NodeJS.ProcessEnv = process.env): {
importedTasks: number;
importedDeliveryStates: number;
removedSource: boolean;
sourcePath: string;
} {
const sourcePath = resolveLegacyTaskRegistrySqlitePath(env);
if (!existsSync(sourcePath)) {
return {
importedTasks: 0,
importedDeliveryStates: 0,
removedSource: false,
sourcePath,
};
}
const { DatabaseSync } = requireNodeSqlite();
const legacyDb = new DatabaseSync(sourcePath);
let importedTasks = 0;
let importedDeliveryStates = 0;
try {
ensureLegacyTaskRegistrySchema(legacyDb);
const taskRows = selectTaskRows(legacyDb);
const deliveryRows = selectTaskDeliveryStateRows(legacyDb);
const tasks = taskRows.map(rowToTaskRecord);
const deliveryStates = deliveryRows.map(rowToTaskDeliveryState);
withWriteTransaction(({ db }) => {
for (const task of tasks) {
upsertTaskRow(db, bindTaskRecordBase(task));
}
for (const deliveryState of deliveryStates) {
replaceTaskDeliveryStateRow(db, bindTaskDeliveryState(deliveryState));
}
});
importedTasks = tasks.length;
importedDeliveryStates = deliveryStates.length;
} finally {
legacyDb.close();
}
return {
importedTasks,
importedDeliveryStates,
removedSource: removeSqliteSidecars(sourcePath),
sourcePath,
};
}
export function removeLegacyTaskRegistrySidecar(env: NodeJS.ProcessEnv = process.env): {
removedSource: boolean;
sourcePath: string;
} {
const sourcePath = resolveLegacyTaskRegistrySqlitePath(env);
return {
removedSource: removeSqliteSidecars(sourcePath),
sourcePath,
};
}

View File

@@ -1,6 +1,7 @@
import { existsSync, mkdirSync, statSync } from "node:fs";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { importLegacyTaskRegistrySidecarToSqlite } from "../commands/doctor-sqlite-sidecars.js";
import { requireNodeSqlite } from "../infra/node-sqlite.js";
import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js";
import { createManagedTaskFlow, resetTaskFlowRegistryForTests } from "./task-flow-registry.js";
@@ -21,7 +22,6 @@ import {
configureTaskRegistryRuntime,
type TaskRegistryObserverEvent,
} from "./task-registry.store.js";
import { importLegacyTaskRegistrySidecarToSqlite } from "./task-registry.store.sqlite.js";
import type { TaskRecord } from "./task-registry.types.js";
const ORIGINAL_STATE_DIR = process.env.OPENCLAW_STATE_DIR;