mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-19 04:28:22 +00:00
322 lines
10 KiB
TypeScript
322 lines
10 KiB
TypeScript
// Runtime helpers for migration providers that need filesystem side effects.
|
|
|
|
import crypto from "node:crypto";
|
|
import fs from "node:fs/promises";
|
|
import path from "node:path";
|
|
import { pathExists } from "../infra/fs-safe.js";
|
|
import { executeSqliteQuerySync, getNodeSqliteKysely } from "../infra/kysely-sync.js";
|
|
import type {
|
|
MigrationApplyResult,
|
|
MigrationItem,
|
|
MigrationProviderContext,
|
|
} from "../plugins/types.js";
|
|
import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js";
|
|
import { runOpenClawStateWriteTransaction } from "../state/openclaw-state-db.js";
|
|
import {
|
|
MIGRATION_REASON_MISSING_SOURCE_OR_TARGET,
|
|
MIGRATION_REASON_TARGET_EXISTS,
|
|
markMigrationItemConflict,
|
|
markMigrationItemError,
|
|
redactMigrationPlan,
|
|
} from "./migration.js";
|
|
|
|
export type { MigrationApplyResult, MigrationItem } from "../plugins/types.js";
|
|
|
|
type PluginMigrationDatabase = Pick<
|
|
OpenClawStateKyselyDatabase,
|
|
"plugin_blob_entries" | "plugin_state_entries"
|
|
>;
|
|
|
|
export function upsertPluginStateMigrationEntry(params: {
|
|
pluginId: string;
|
|
namespace: string;
|
|
key: string;
|
|
value: unknown;
|
|
createdAt: number;
|
|
expiresAt?: number | null;
|
|
env?: NodeJS.ProcessEnv;
|
|
}): void {
|
|
const valueJson = JSON.stringify(params.value);
|
|
runOpenClawStateWriteTransaction(
|
|
(stateDatabase) => {
|
|
const db = getNodeSqliteKysely<PluginMigrationDatabase>(stateDatabase.db);
|
|
executeSqliteQuerySync(
|
|
stateDatabase.db,
|
|
db
|
|
.insertInto("plugin_state_entries")
|
|
.values({
|
|
plugin_id: params.pluginId,
|
|
namespace: params.namespace,
|
|
entry_key: params.key,
|
|
value_json: valueJson,
|
|
created_at: params.createdAt,
|
|
expires_at: params.expiresAt ?? null,
|
|
})
|
|
.onConflict((conflict) =>
|
|
conflict.columns(["plugin_id", "namespace", "entry_key"]).doUpdateSet({
|
|
value_json: (eb) => eb.ref("excluded.value_json"),
|
|
created_at: (eb) => eb.ref("excluded.created_at"),
|
|
expires_at: (eb) => eb.ref("excluded.expires_at"),
|
|
}),
|
|
),
|
|
);
|
|
},
|
|
{ env: params.env },
|
|
);
|
|
}
|
|
|
|
export function upsertPluginBlobMigrationEntry(params: {
|
|
pluginId: string;
|
|
namespace: string;
|
|
key: string;
|
|
metadata: unknown;
|
|
blob: Buffer;
|
|
createdAt: number;
|
|
expiresAt?: number | null;
|
|
env?: NodeJS.ProcessEnv;
|
|
}): void {
|
|
const metadataJson = JSON.stringify(params.metadata);
|
|
runOpenClawStateWriteTransaction(
|
|
(stateDatabase) => {
|
|
const db = getNodeSqliteKysely<PluginMigrationDatabase>(stateDatabase.db);
|
|
executeSqliteQuerySync(
|
|
stateDatabase.db,
|
|
db
|
|
.insertInto("plugin_blob_entries")
|
|
.values({
|
|
plugin_id: params.pluginId,
|
|
namespace: params.namespace,
|
|
entry_key: params.key,
|
|
metadata_json: metadataJson,
|
|
blob: params.blob,
|
|
created_at: params.createdAt,
|
|
expires_at: params.expiresAt ?? null,
|
|
})
|
|
.onConflict((conflict) =>
|
|
conflict.columns(["plugin_id", "namespace", "entry_key"]).doUpdateSet({
|
|
metadata_json: (eb) => eb.ref("excluded.metadata_json"),
|
|
blob: (eb) => eb.ref("excluded.blob"),
|
|
created_at: (eb) => eb.ref("excluded.created_at"),
|
|
expires_at: (eb) => eb.ref("excluded.expires_at"),
|
|
}),
|
|
),
|
|
);
|
|
},
|
|
{ env: params.env },
|
|
);
|
|
}
|
|
|
|
export function withCachedMigrationConfigRuntime(
|
|
runtime: MigrationProviderContext["runtime"] | undefined,
|
|
fallbackConfig: MigrationProviderContext["config"],
|
|
): MigrationProviderContext["runtime"] | undefined {
|
|
if (!runtime) {
|
|
return undefined;
|
|
}
|
|
const configApi = runtime.config;
|
|
if (!configApi?.current || !configApi.mutateConfigFile) {
|
|
return runtime;
|
|
}
|
|
let cachedConfig: MigrationProviderContext["config"] | undefined;
|
|
const current = (): ReturnType<typeof configApi.current> => {
|
|
cachedConfig ??= structuredClone(
|
|
(configApi.current() ?? fallbackConfig) as MigrationProviderContext["config"],
|
|
);
|
|
return cachedConfig;
|
|
};
|
|
return {
|
|
...runtime,
|
|
config: {
|
|
...runtime.config,
|
|
current,
|
|
mutateConfigFile: async (params) => {
|
|
const result = await configApi.mutateConfigFile({
|
|
...params,
|
|
mutate: async (draft, context) => {
|
|
const mutationResult = await params.mutate(draft, context);
|
|
cachedConfig = structuredClone(draft);
|
|
return mutationResult;
|
|
},
|
|
});
|
|
cachedConfig = structuredClone(result.nextConfig);
|
|
return result;
|
|
},
|
|
...(configApi.replaceConfigFile
|
|
? {
|
|
replaceConfigFile: async (params) => {
|
|
const result = await configApi.replaceConfigFile(params);
|
|
cachedConfig = structuredClone(result.nextConfig);
|
|
return result;
|
|
},
|
|
}
|
|
: {}),
|
|
},
|
|
};
|
|
}
|
|
|
|
async function exists(filePath: string): Promise<boolean> {
|
|
return await pathExists(filePath);
|
|
}
|
|
|
|
async function backupExistingMigrationTarget(
|
|
target: string,
|
|
reportDir: string,
|
|
): Promise<string | undefined> {
|
|
if (!(await exists(target))) {
|
|
return undefined;
|
|
}
|
|
const backupRoot = path.join(reportDir, "item-backups");
|
|
await fs.mkdir(backupRoot, { recursive: true });
|
|
const targetHash = crypto
|
|
.createHash("sha256")
|
|
.update(path.resolve(target))
|
|
.digest("hex")
|
|
.slice(0, 12);
|
|
const backupDir = await fs.mkdtemp(
|
|
path.join(backupRoot, `${Date.now()}-${targetHash}-${path.basename(target)}-`),
|
|
);
|
|
const backupPath = path.join(backupDir, path.basename(target));
|
|
await fs.cp(target, backupPath, { recursive: true, force: true });
|
|
return backupPath;
|
|
}
|
|
|
|
function isFileAlreadyExistsError(err: unknown): boolean {
|
|
return Boolean(
|
|
err &&
|
|
typeof err === "object" &&
|
|
"code" in err &&
|
|
((err as { code?: unknown }).code === "ERR_FS_CP_EEXIST" ||
|
|
(err as { code?: unknown }).code === "EEXIST"),
|
|
);
|
|
}
|
|
|
|
function readArchiveRelativePath(item: MigrationItem): string {
|
|
const detailPath = item.details?.archiveRelativePath;
|
|
const raw = typeof detailPath === "string" && detailPath.trim() ? detailPath : undefined;
|
|
const fallback = item.source ? path.basename(item.source) : item.id;
|
|
const normalized = path
|
|
.normalize(raw ?? fallback)
|
|
.split(path.sep)
|
|
.filter((part) => part && part !== "." && part !== "..")
|
|
.join(path.sep);
|
|
return normalized || "item";
|
|
}
|
|
|
|
async function resolveUniqueArchivePath(
|
|
archiveRoot: string,
|
|
relativePath: string,
|
|
): Promise<string> {
|
|
const parsed = path.parse(relativePath);
|
|
let candidate = path.join(archiveRoot, relativePath);
|
|
let index = 2;
|
|
while (await exists(candidate)) {
|
|
const filename = `${parsed.name}-${index}${parsed.ext}`;
|
|
candidate = path.join(archiveRoot, parsed.dir, filename);
|
|
index += 1;
|
|
}
|
|
return candidate;
|
|
}
|
|
|
|
export async function archiveMigrationItem(
|
|
item: MigrationItem,
|
|
reportDir: string,
|
|
): Promise<MigrationItem> {
|
|
if (!item.source) {
|
|
return markMigrationItemError(item, MIGRATION_REASON_MISSING_SOURCE_OR_TARGET);
|
|
}
|
|
try {
|
|
const sourceStat = await fs.lstat(item.source);
|
|
if (sourceStat.isSymbolicLink()) {
|
|
return markMigrationItemError(item, "archive source is a symlink");
|
|
}
|
|
const archiveRoot = path.join(reportDir, "archive");
|
|
const relativePath = readArchiveRelativePath(item);
|
|
const archivePath = await resolveUniqueArchivePath(archiveRoot, relativePath);
|
|
await fs.mkdir(path.dirname(archivePath), { recursive: true });
|
|
await fs.cp(item.source, archivePath, {
|
|
recursive: true,
|
|
force: false,
|
|
errorOnExist: true,
|
|
verbatimSymlinks: true,
|
|
});
|
|
return {
|
|
...item,
|
|
status: "migrated",
|
|
target: archivePath,
|
|
details: { ...item.details, archivePath, archiveRelativePath: relativePath },
|
|
};
|
|
} catch (err) {
|
|
if (isFileAlreadyExistsError(err)) {
|
|
return markMigrationItemConflict(item, MIGRATION_REASON_TARGET_EXISTS);
|
|
}
|
|
return markMigrationItemError(item, err instanceof Error ? err.message : String(err));
|
|
}
|
|
}
|
|
|
|
export async function copyMigrationFileItem(
|
|
item: MigrationItem,
|
|
reportDir: string,
|
|
opts: { overwrite?: boolean } = {},
|
|
): Promise<MigrationItem> {
|
|
if (!item.source || !item.target) {
|
|
return markMigrationItemError(item, MIGRATION_REASON_MISSING_SOURCE_OR_TARGET);
|
|
}
|
|
try {
|
|
const targetExists = await exists(item.target);
|
|
if (targetExists && !opts.overwrite) {
|
|
return markMigrationItemConflict(item, MIGRATION_REASON_TARGET_EXISTS);
|
|
}
|
|
const backupPath = opts.overwrite
|
|
? await backupExistingMigrationTarget(item.target, reportDir)
|
|
: undefined;
|
|
await fs.mkdir(path.dirname(item.target), { recursive: true });
|
|
await fs.cp(item.source, item.target, {
|
|
recursive: true,
|
|
force: Boolean(opts.overwrite),
|
|
errorOnExist: !opts.overwrite,
|
|
});
|
|
return {
|
|
...item,
|
|
status: "migrated",
|
|
details: { ...item.details, ...(backupPath ? { backupPath } : {}) },
|
|
};
|
|
} catch (err) {
|
|
if (isFileAlreadyExistsError(err)) {
|
|
return markMigrationItemConflict(item, MIGRATION_REASON_TARGET_EXISTS);
|
|
}
|
|
return markMigrationItemError(item, err instanceof Error ? err.message : String(err));
|
|
}
|
|
}
|
|
|
|
export async function writeMigrationReport(
|
|
result: MigrationApplyResult,
|
|
opts: { title?: string } = {},
|
|
): Promise<void> {
|
|
if (!result.reportDir) {
|
|
return;
|
|
}
|
|
await fs.mkdir(result.reportDir, { recursive: true });
|
|
await fs.writeFile(
|
|
path.join(result.reportDir, "report.json"),
|
|
`${JSON.stringify(redactMigrationPlan(result), null, 2)}\n`,
|
|
"utf8",
|
|
);
|
|
const lines = [
|
|
`# ${opts.title ?? "Migration Report"}`,
|
|
"",
|
|
`Source: ${result.source}`,
|
|
result.target ? `Target: ${result.target}` : undefined,
|
|
result.backupPath ? `Backup: ${result.backupPath}` : undefined,
|
|
"",
|
|
`Migrated: ${result.summary.migrated}`,
|
|
`Skipped: ${result.summary.skipped}`,
|
|
`Conflicts: ${result.summary.conflicts}`,
|
|
`Errors: ${result.summary.errors}`,
|
|
"",
|
|
...result.items.map(
|
|
(item) => `- ${item.status}: ${item.id}${item.reason ? ` (${item.reason})` : ""}`,
|
|
),
|
|
].filter((line): line is string => typeof line === "string");
|
|
await fs.writeFile(path.join(result.reportDir, "summary.md"), `${lines.join("\n")}\n`, "utf8");
|
|
}
|