diff --git a/package.json b/package.json index 5735729c70c..7b74cfee6b1 100644 --- a/package.json +++ b/package.json @@ -1695,7 +1695,7 @@ "@mariozechner/pi-tui": "0.73.0", "@modelcontextprotocol/sdk": "1.29.0", "@mozilla/readability": "^0.6.0", - "@openclaw/fs-safe": "^0.1.1", + "@openclaw/fs-safe": "github:openclaw/fs-safe#3c508734af0b", "@slack/bolt": "^4.7.2", "@slack/types": "^2.21.0", "@slack/web-api": "^7.15.2", @@ -1801,6 +1801,7 @@ "uuid": "14.0.0" }, "onlyBuiltDependencies": [ + "@openclaw/fs-safe", "@discordjs/opus", "@google/genai", "@lydell/node-pty", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 69fd57b8a2a..84261999e0f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -104,8 +104,8 @@ importers: specifier: ^0.6.0 version: 0.6.0 '@openclaw/fs-safe': - specifier: ^0.1.1 - version: 0.1.1 + specifier: github:openclaw/fs-safe#3c508734af0b + version: https://codeload.github.com/openclaw/fs-safe/tar.gz/3c508734af0baaad4f61cf70619bea98c783043b '@slack/bolt': specifier: ^4.7.2 version: 4.7.2(@types/express@5.0.6) @@ -3233,8 +3233,9 @@ packages: cpu: [x64] os: [win32] - '@openclaw/fs-safe@0.1.1': - resolution: {integrity: sha512-+50LBpW7nKWzu3wJb4C5X9BQAcAxmj3oNRd/ZqZK+YO1ZdiikOPZ6fy5ta631xsV13+m+Ap5s0Gb3zPSl+0kOQ==} + '@openclaw/fs-safe@https://codeload.github.com/openclaw/fs-safe/tar.gz/3c508734af0baaad4f61cf70619bea98c783043b': + resolution: {tarball: https://codeload.github.com/openclaw/fs-safe/tar.gz/3c508734af0baaad4f61cf70619bea98c783043b} + version: 0.1.2 engines: {node: '>=20.11'} '@opentelemetry/api-logs@0.216.0': @@ -10005,7 +10006,7 @@ snapshots: '@openai/codex@0.128.0-win32-x64': optional: true - '@openclaw/fs-safe@0.1.1': + '@openclaw/fs-safe@https://codeload.github.com/openclaw/fs-safe/tar.gz/3c508734af0baaad4f61cf70619bea98c783043b': optionalDependencies: jszip: 3.10.1 tar: 7.5.13 diff --git a/src/agents/pi-project-settings-snapshot.ts b/src/agents/pi-project-settings-snapshot.ts index c6a98f40147..0ca955abd7c 100644 --- a/src/agents/pi-project-settings-snapshot.ts +++ b/src/agents/pi-project-settings-snapshot.ts @@ -1,9 +1,8 @@ -import fs from "node:fs"; import path from "node:path"; import type { SettingsManager } from "@mariozechner/pi-coding-agent"; import { applyMergePatch } from "../config/merge-patch.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; -import { openRootFileSync } from "../infra/boundary-file-read.js"; +import { readRootJsonObjectSync } from "../infra/json-files.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import type { BundleMcpServerConfig } from "../plugins/bundle-mcp.js"; import { @@ -11,7 +10,6 @@ import { resolveEffectivePluginActivationState, } from "../plugins/config-policy.js"; import { loadPluginMetadataSnapshot } from "../plugins/plugin-metadata-snapshot.js"; -import { isRecord } from "../utils.js"; import { loadEmbeddedPiMcpConfig } from "./embedded-pi-mcp.js"; const log = createSubsystemLogger("embedded-pi-settings"); @@ -43,29 +41,21 @@ function loadBundleSettingsFile(params: { relativePath: string; }): PiSettingsSnapshot | null { const absolutePath = path.join(params.rootDir, params.relativePath); - const opened = openRootFileSync({ - absolutePath, - rootPath: params.rootDir, + const result = readRootJsonObjectSync({ + rootDir: params.rootDir, + relativePath: params.relativePath, boundaryLabel: "plugin root", rejectHardlinks: true, }); - if (!opened.ok) { + if (!result.ok && result.reason === "open") { log.warn(`skipping unsafe bundle settings file: ${absolutePath}`); return null; } - try { - const raw = JSON.parse(fs.readFileSync(opened.fd, "utf-8")) as unknown; - if (!isRecord(raw)) { - log.warn(`skipping bundle settings file with non-object JSON: ${absolutePath}`); - return null; - } - return sanitizePiSettingsSnapshot(raw as PiSettingsSnapshot); - } catch (error) { - log.warn(`failed to parse bundle settings file ${absolutePath}: ${String(error)}`); + if (!result.ok) { + log.warn(`${result.error}: ${absolutePath}`); return null; - } finally { - fs.closeSync(opened.fd); } + return sanitizePiSettingsSnapshot(result.value as PiSettingsSnapshot); } export function loadEnabledBundlePiSettingsSnapshot(params: { diff --git a/src/infra/detect-package-manager.ts b/src/infra/detect-package-manager.ts index 675d7fa4929..74dae795631 100644 --- a/src/infra/detect-package-manager.ts +++ b/src/infra/detect-package-manager.ts @@ -1,18 +1,12 @@ import fs from "node:fs/promises"; -import path from "node:path"; +import { readPackageManagerSpec } from "./package-json.js"; type DetectedPackageManager = "pnpm" | "bun" | "npm"; export async function detectPackageManager(root: string): Promise { - try { - const raw = await fs.readFile(path.join(root, "package.json"), "utf-8"); - const parsed = JSON.parse(raw) as { packageManager?: string }; - const pm = parsed?.packageManager?.split("@")[0]?.trim(); - if (pm === "pnpm" || pm === "bun" || pm === "npm") { - return pm; - } - } catch { - // ignore + const pm = (await readPackageManagerSpec(root))?.split("@")[0]?.trim(); + if (pm === "pnpm" || pm === "bun" || pm === "npm") { + return pm; } const files = await fs.readdir(root).catch((): string[] => []); diff --git a/src/infra/json-files.ts b/src/infra/json-files.ts index fda9494add1..ebeab29b0ed 100644 --- a/src/infra/json-files.ts +++ b/src/infra/json-files.ts @@ -6,6 +6,9 @@ export { readJsonIfExists, readJsonIfExists as readDurableJsonFile, readJsonSync, + readRootJsonObjectSync, + readRootJsonSync, + readRootStructuredFileSync, tryReadJson, tryReadJson as readJsonFile, tryReadJsonSync, diff --git a/src/infra/outbound/delivery-queue-storage.ts b/src/infra/outbound/delivery-queue-storage.ts index 419e7a79445..e613f497433 100644 --- a/src/infra/outbound/delivery-queue-storage.ts +++ b/src/infra/outbound/delivery-queue-storage.ts @@ -1,10 +1,18 @@ -import fs from "node:fs"; import path from "node:path"; +import { + ackJsonDurableQueueEntry, + ensureJsonDurableQueueDirs, + loadJsonDurableQueueEntry, + loadPendingJsonDurableQueueEntries, + moveJsonDurableQueueEntryToFailed, + readJsonDurableQueueEntry, + resolveJsonDurableQueueEntryPaths, + writeJsonDurableQueueEntry, +} from "@openclaw/fs-safe/store"; import type { ReplyPayload } from "../../auto-reply/types.js"; import type { RenderedMessageBatchPlanItem } from "../../channels/message/types.js"; import { resolveStateDir } from "../../config/paths.js"; import type { ReplyToMode } from "../../config/types.js"; -import { replaceFileAtomic } from "../replace-file.js"; import { generateSecureUuid } from "../secure-random.js"; import type { OutboundDeliveryFormattingOptions } from "./formatting.js"; import type { OutboundIdentity } from "./identity.js"; @@ -14,6 +22,7 @@ import type { OutboundChannel } from "./targets.js"; const QUEUE_DIRNAME = "delivery-queue"; const FAILED_DIRNAME = "failed"; +const QUEUE_TEMP_PREFIX = ".delivery-queue"; export type QueuedRenderedMessageBatchPlan = { payloadCount: number; @@ -80,38 +89,19 @@ function resolveQueueEntryPaths( jsonPath: string; deliveredPath: string; } { - const queueDir = resolveQueueDir(stateDir); - return { - jsonPath: path.join(queueDir, `${id}.json`), - deliveredPath: path.join(queueDir, `${id}.delivered`), - }; -} - -function getErrnoCode(err: unknown): string | null { - return err && typeof err === "object" && "code" in err - ? String((err as { code?: unknown }).code) - : null; -} - -async function unlinkBestEffort(filePath: string): Promise { - try { - await fs.promises.unlink(filePath); - } catch { - // Best-effort cleanup. - } + return resolveJsonDurableQueueEntryPaths(resolveQueueDir(stateDir), id); } async function writeQueueEntry(filePath: string, entry: QueuedDelivery): Promise { - await replaceFileAtomic({ + await writeJsonDurableQueueEntry({ filePath, - content: JSON.stringify(entry, null, 2), - mode: 0o600, - tempPrefix: ".delivery-queue", + entry, + tempPrefix: QUEUE_TEMP_PREFIX, }); } async function readQueueEntry(filePath: string): Promise { - return JSON.parse(await fs.promises.readFile(filePath, "utf-8")) as QueuedDelivery; + return await readJsonDurableQueueEntry(filePath); } function normalizeLegacyQueuedDeliveryEntry(entry: QueuedDelivery): { @@ -144,8 +134,10 @@ function normalizeLegacyQueuedDeliveryEntry(entry: QueuedDelivery): { /** Ensure the queue directory (and failed/ subdirectory) exist. */ export async function ensureQueueDir(stateDir?: string): Promise { const queueDir = resolveQueueDir(stateDir); - await fs.promises.mkdir(queueDir, { recursive: true, mode: 0o700 }); - await fs.promises.mkdir(resolveFailedDir(stateDir), { recursive: true, mode: 0o700 }); + await ensureJsonDurableQueueDirs({ + queueDir, + failedDir: resolveFailedDir(stateDir), + }); return queueDir; } @@ -191,22 +183,7 @@ export async function enqueueDelivery( * by {@link loadPendingDeliveries} on the next startup without re-sending. */ export async function ackDelivery(id: string, stateDir?: string): Promise { - const { jsonPath, deliveredPath } = resolveQueueEntryPaths(id, stateDir); - try { - // Phase 1: atomic rename marks the delivery as complete. - await fs.promises.rename(jsonPath, deliveredPath); - } catch (err) { - const code = getErrnoCode(err); - if (code === "ENOENT") { - // .json already gone — may have been renamed by a previous ack attempt. - // Try to clean up a leftover .delivered marker if present. - await unlinkBestEffort(deliveredPath); - return; - } - throw err; - } - // Phase 2: remove the marker file. - await unlinkBestEffort(deliveredPath); + await ackJsonDurableQueueEntry(resolveQueueEntryPaths(id, stateDir)); } /** Update a queue entry after a failed delivery attempt. */ @@ -246,76 +223,28 @@ export async function loadPendingDelivery( id: string, stateDir?: string, ): Promise { - const { jsonPath } = resolveQueueEntryPaths(id, stateDir); - try { - const stat = await fs.promises.stat(jsonPath); - if (!stat.isFile()) { - return null; - } - const { entry, migrated } = normalizeLegacyQueuedDeliveryEntry(await readQueueEntry(jsonPath)); - if (migrated) { - await writeQueueEntry(jsonPath, entry); - } - return entry; - } catch (err) { - if (getErrnoCode(err) === "ENOENT") { - return null; - } - throw err; - } + return await loadJsonDurableQueueEntry({ + paths: resolveQueueEntryPaths(id, stateDir), + tempPrefix: QUEUE_TEMP_PREFIX, + read: async (entry) => normalizeLegacyQueuedDeliveryEntry(entry), + }); } /** Load all pending delivery entries from the queue directory. */ export async function loadPendingDeliveries(stateDir?: string): Promise { const queueDir = resolveQueueDir(stateDir); - let files: string[]; - try { - files = await fs.promises.readdir(queueDir); - } catch (err) { - const code = getErrnoCode(err); - if (code === "ENOENT") { - return []; - } - throw err; - } - - // Clean up .delivered markers left by ackDelivery if the process crashed - // between the rename and the unlink. - for (const file of files) { - if (file.endsWith(".delivered")) { - await unlinkBestEffort(path.join(queueDir, file)); - } - } - - const entries: QueuedDelivery[] = []; - for (const file of files) { - if (!file.endsWith(".json")) { - continue; - } - const filePath = path.join(queueDir, file); - try { - const stat = await fs.promises.stat(filePath); - if (!stat.isFile()) { - continue; - } - const { entry, migrated } = normalizeLegacyQueuedDeliveryEntry( - await readQueueEntry(filePath), - ); - if (migrated) { - await writeQueueEntry(filePath, entry); - } - entries.push(entry); - } catch { - // Skip malformed or inaccessible entries. - } - } - return entries; + return await loadPendingJsonDurableQueueEntries({ + queueDir, + tempPrefix: QUEUE_TEMP_PREFIX, + read: async (entry) => normalizeLegacyQueuedDeliveryEntry(entry), + }); } /** Move a queue entry to the failed/ subdirectory. */ export async function moveToFailed(id: string, stateDir?: string): Promise { - const queueDir = resolveQueueDir(stateDir); - const failedDir = resolveFailedDir(stateDir); - await fs.promises.mkdir(failedDir, { recursive: true, mode: 0o700 }); - await fs.promises.rename(path.join(queueDir, `${id}.json`), path.join(failedDir, `${id}.json`)); + await moveJsonDurableQueueEntryToFailed({ + queueDir: resolveQueueDir(stateDir), + failedDir: resolveFailedDir(stateDir), + id, + }); } diff --git a/src/infra/package-json.test.ts b/src/infra/package-json.test.ts index a5ee8d864a6..76444b5a80e 100644 --- a/src/infra/package-json.test.ts +++ b/src/infra/package-json.test.ts @@ -2,7 +2,7 @@ import fs from "node:fs/promises"; import path from "node:path"; import { describe, expect, it } from "vitest"; import { withTempDir } from "../test-helpers/temp-dir.js"; -import { readPackageName, readPackageVersion } from "./package-json.js"; +import { readPackageManagerSpec, readPackageName, readPackageVersion } from "./package-json.js"; async function expectPackageMeta(params: { root: string; @@ -18,7 +18,11 @@ describe("package-json helpers", () => { await withTempDir({ prefix: "openclaw-package-json-" }, async (root) => { await fs.writeFile( path.join(root, "package.json"), - JSON.stringify({ version: " 1.2.3 ", name: " @openclaw/demo " }), + JSON.stringify({ + version: " 1.2.3 ", + name: " @openclaw/demo ", + packageManager: " pnpm@10.8.1 ", + }), "utf8", ); @@ -27,6 +31,7 @@ describe("package-json helpers", () => { expectedVersion: "1.2.3", expectedName: "@openclaw/demo", }); + await expect(readPackageManagerSpec(root)).resolves.toBe("pnpm@10.8.1"); }); }); diff --git a/src/infra/package-json.ts b/src/infra/package-json.ts index 8da5108938a..b6529912e3e 100644 --- a/src/infra/package-json.ts +++ b/src/infra/package-json.ts @@ -1,24 +1,31 @@ -import fs from "node:fs/promises"; import path from "node:path"; +import { tryReadJson } from "./json-files.js"; + +type PackageJson = { + name?: unknown; + packageManager?: unknown; + version?: unknown; +}; + +function normalizeString(value: unknown): string | null { + return typeof value === "string" && value.trim() ? value.trim() : null; +} + +export async function readPackageJson(root: string): Promise { + const parsed = await tryReadJson(path.join(root, "package.json")); + return parsed && typeof parsed === "object" && !Array.isArray(parsed) + ? (parsed as PackageJson) + : null; +} export async function readPackageVersion(root: string): Promise { - try { - const raw = await fs.readFile(path.join(root, "package.json"), "utf-8"); - const parsed = JSON.parse(raw) as { version?: string }; - const version = parsed?.version?.trim(); - return version ? version : null; - } catch { - return null; - } + return normalizeString((await readPackageJson(root))?.version); } export async function readPackageName(root: string): Promise { - try { - const raw = await fs.readFile(path.join(root, "package.json"), "utf-8"); - const parsed = JSON.parse(raw) as { name?: string }; - const name = parsed?.name?.trim(); - return name ? name : null; - } catch { - return null; - } + return normalizeString((await readPackageJson(root))?.name); +} + +export async function readPackageManagerSpec(root: string): Promise { + return normalizeString((await readPackageJson(root))?.packageManager); } diff --git a/src/infra/package-update-utils.ts b/src/infra/package-update-utils.ts index 2f2ac5067ea..47d21e8c23f 100644 --- a/src/infra/package-update-utils.ts +++ b/src/infra/package-update-utils.ts @@ -1,6 +1,6 @@ import fsSync from "node:fs"; import path from "node:path"; -import { openRootFileSync } from "./boundary-file-read.js"; +import { readRootJsonObjectSync } from "@openclaw/fs-safe/json"; export function expectedIntegrityForUpdate( spec: string | undefined, @@ -29,23 +29,12 @@ function isRecord(value: unknown): value is Record { } function readInstalledPackageManifest(dir: string): Record | undefined { - const manifestPath = path.join(dir, "package.json"); - const opened = openRootFileSync({ - absolutePath: manifestPath, - rootPath: dir, + const result = readRootJsonObjectSync({ + rootDir: dir, + relativePath: "package.json", boundaryLabel: "installed package directory", }); - if (!opened.ok) { - return undefined; - } - try { - const parsed = JSON.parse(fsSync.readFileSync(opened.fd, "utf-8")) as unknown; - return isRecord(parsed) ? parsed : undefined; - } catch { - return undefined; - } finally { - fsSync.closeSync(opened.fd); - } + return result.ok ? result.value : undefined; } export async function readInstalledPackageVersion(dir: string): Promise { diff --git a/src/infra/session-delivery-queue-storage.ts b/src/infra/session-delivery-queue-storage.ts index 0fc87752752..17f74a6600d 100644 --- a/src/infra/session-delivery-queue-storage.ts +++ b/src/infra/session-delivery-queue-storage.ts @@ -1,14 +1,24 @@ import { createHash } from "node:crypto"; -import fs from "node:fs"; import path from "node:path"; +import { + ackJsonDurableQueueEntry, + ensureJsonDurableQueueDirs, + jsonDurableQueueEntryExists, + loadJsonDurableQueueEntry, + loadPendingJsonDurableQueueEntries, + moveJsonDurableQueueEntryToFailed, + readJsonDurableQueueEntry, + resolveJsonDurableQueueEntryPaths, + writeJsonDurableQueueEntry, +} from "@openclaw/fs-safe/store"; import type { ChatType } from "../channels/chat-type.js"; import { resolveStateDir } from "../config/paths.js"; -import { replaceFileAtomic } from "./replace-file.js"; import { generateSecureUuid } from "./secure-random.js"; const QUEUE_DIRNAME = "session-delivery-queue"; const FAILED_DIRNAME = "failed"; const TMP_SWEEP_MAX_AGE_MS = 5_000; +const QUEUE_TEMP_PREFIX = ".session-delivery-queue"; type SessionDeliveryContext = { channel?: string; @@ -52,12 +62,6 @@ export type QueuedSessionDelivery = QueuedSessionDeliveryPayload & { lastError?: string; }; -function getErrnoCode(err: unknown): string | null { - return err && typeof err === "object" && "code" in err - ? String((err as { code?: unknown }).code) - : null; -} - function buildEntryId(idempotencyKey?: string): string { if (!idempotencyKey) { return generateSecureUuid(); @@ -65,38 +69,16 @@ function buildEntryId(idempotencyKey?: string): string { return createHash("sha256").update(idempotencyKey).digest("hex"); } -async function unlinkBestEffort(filePath: string): Promise { - await fs.promises.unlink(filePath).catch(() => undefined); -} - -async function unlinkStaleTmpBestEffort(filePath: string, now: number): Promise { - try { - const stat = await fs.promises.stat(filePath); - if (!stat.isFile()) { - return; - } - if (now - stat.mtimeMs < TMP_SWEEP_MAX_AGE_MS) { - return; - } - await unlinkBestEffort(filePath); - } catch (err) { - if (getErrnoCode(err) !== "ENOENT") { - throw err; - } - } -} - async function writeQueueEntry(filePath: string, entry: QueuedSessionDelivery): Promise { - await replaceFileAtomic({ + await writeJsonDurableQueueEntry({ filePath, - content: JSON.stringify(entry, null, 2), - mode: 0o600, - tempPrefix: ".session-delivery-queue", + entry, + tempPrefix: QUEUE_TEMP_PREFIX, }); } async function readQueueEntry(filePath: string): Promise { - return JSON.parse(await fs.promises.readFile(filePath, "utf-8")) as QueuedSessionDelivery; + return await readJsonDurableQueueEntry(filePath); } export function resolveSessionDeliveryQueueDir(stateDir?: string): string { @@ -115,17 +97,15 @@ function resolveQueueEntryPaths( jsonPath: string; deliveredPath: string; } { - const queueDir = resolveSessionDeliveryQueueDir(stateDir); - return { - jsonPath: path.join(queueDir, `${id}.json`), - deliveredPath: path.join(queueDir, `${id}.delivered`), - }; + return resolveJsonDurableQueueEntryPaths(resolveSessionDeliveryQueueDir(stateDir), id); } async function ensureSessionDeliveryQueueDir(stateDir?: string): Promise { const queueDir = resolveSessionDeliveryQueueDir(stateDir); - await fs.promises.mkdir(queueDir, { recursive: true, mode: 0o700 }); - await fs.promises.mkdir(resolveFailedDir(stateDir), { recursive: true, mode: 0o700 }); + await ensureJsonDurableQueueDirs({ + queueDir, + failedDir: resolveFailedDir(stateDir), + }); return queueDir; } @@ -138,15 +118,8 @@ export async function enqueueSessionDelivery( const filePath = path.join(queueDir, `${id}.json`); if (params.idempotencyKey) { - try { - const stat = await fs.promises.stat(filePath); - if (stat.isFile()) { - return id; - } - } catch (err) { - if (getErrnoCode(err) !== "ENOENT") { - throw err; - } + if (await jsonDurableQueueEntryExists(filePath)) { + return id; } } @@ -160,18 +133,7 @@ export async function enqueueSessionDelivery( } export async function ackSessionDelivery(id: string, stateDir?: string): Promise { - const { jsonPath, deliveredPath } = resolveQueueEntryPaths(id, stateDir); - try { - await fs.promises.rename(jsonPath, deliveredPath); - } catch (err) { - const code = getErrnoCode(err); - if (code === "ENOENT") { - await unlinkBestEffort(deliveredPath); - return; - } - throw err; - } - await unlinkBestEffort(deliveredPath); + await ackJsonDurableQueueEntry(resolveQueueEntryPaths(id, stateDir)); } export async function failSessionDelivery( @@ -191,66 +153,26 @@ export async function loadPendingSessionDelivery( id: string, stateDir?: string, ): Promise { - const { jsonPath } = resolveQueueEntryPaths(id, stateDir); - try { - const stat = await fs.promises.stat(jsonPath); - if (!stat.isFile()) { - return null; - } - return await readQueueEntry(jsonPath); - } catch (err) { - if (getErrnoCode(err) === "ENOENT") { - return null; - } - throw err; - } + return await loadJsonDurableQueueEntry({ + paths: resolveQueueEntryPaths(id, stateDir), + tempPrefix: QUEUE_TEMP_PREFIX, + }); } export async function loadPendingSessionDeliveries( stateDir?: string, ): Promise { - const queueDir = resolveSessionDeliveryQueueDir(stateDir); - let files: string[]; - try { - files = await fs.promises.readdir(queueDir); - } catch (err) { - if (getErrnoCode(err) === "ENOENT") { - return []; - } - throw err; - } - - const now = Date.now(); - for (const file of files) { - if (file.endsWith(".delivered")) { - await unlinkBestEffort(path.join(queueDir, file)); - } else if (file.endsWith(".tmp")) { - await unlinkStaleTmpBestEffort(path.join(queueDir, file), now); - } - } - - const entries: QueuedSessionDelivery[] = []; - for (const file of files) { - if (!file.endsWith(".json")) { - continue; - } - const filePath = path.join(queueDir, file); - try { - const stat = await fs.promises.stat(filePath); - if (!stat.isFile()) { - continue; - } - entries.push(await readQueueEntry(filePath)); - } catch { - continue; - } - } - return entries; + return await loadPendingJsonDurableQueueEntries({ + queueDir: resolveSessionDeliveryQueueDir(stateDir), + tempPrefix: QUEUE_TEMP_PREFIX, + cleanupTmpMaxAgeMs: TMP_SWEEP_MAX_AGE_MS, + }); } export async function moveSessionDeliveryToFailed(id: string, stateDir?: string): Promise { - const queueDir = resolveSessionDeliveryQueueDir(stateDir); - const failedDir = resolveFailedDir(stateDir); - await fs.promises.mkdir(failedDir, { recursive: true, mode: 0o700 }); - await fs.promises.rename(path.join(queueDir, `${id}.json`), path.join(failedDir, `${id}.json`)); + await moveJsonDurableQueueEntryToFailed({ + queueDir: resolveSessionDeliveryQueueDir(stateDir), + failedDir: resolveFailedDir(stateDir), + id, + }); } diff --git a/src/media/store.ts b/src/media/store.ts index a8b31c6bfef..d2d5be0f35f 100644 --- a/src/media/store.ts +++ b/src/media/store.ts @@ -152,14 +152,28 @@ export async function ensureMediaDir() { return mediaDir; } -function isMissingPathError(err: unknown): err is NodeJS.ErrnoException { - return err instanceof Error && "code" in err && err.code === "ENOENT"; +function findErrorWithCode(err: unknown, code: string): NodeJS.ErrnoException | undefined { + if (!(err instanceof Error)) { + return undefined; + } + if ("code" in err && err.code === code) { + return err as NodeJS.ErrnoException; + } + return findErrorWithCode(err.cause, code); +} + +function isMissingPathError(err: unknown): boolean { + return findErrorWithCode(err, "ENOENT") !== undefined; } async function retryAfterRecreatingDir(dir: string, run: () => Promise): Promise { try { return await run(); } catch (err) { + const noSpaceError = findErrorWithCode(err, "ENOSPC"); + if (noSpaceError) { + throw noSpaceError; + } if (!isMissingPathError(err)) { throw err; } diff --git a/src/plugins/bundle-commands.ts b/src/plugins/bundle-commands.ts index 366ac656c4f..2b6120490d2 100644 --- a/src/plugins/bundle-commands.ts +++ b/src/plugins/bundle-commands.ts @@ -1,7 +1,7 @@ import fs from "node:fs"; import path from "node:path"; import type { OpenClawConfig } from "../config/types.openclaw.js"; -import { openRootFileSync } from "../infra/boundary-file-read.js"; +import { readRootJsonObjectSync } from "../infra/json-files.js"; import { parseFrontmatterBlock } from "../markdown/frontmatter.js"; import { isPathInsideWithRealpath } from "../security/scan-paths.js"; import { @@ -55,26 +55,13 @@ function stripFrontmatter(content: string): string { } function readClaudeBundleManifest(rootDir: string): Record { - const manifestPath = path.join(rootDir, CLAUDE_BUNDLE_MANIFEST_RELATIVE_PATH); - const opened = openRootFileSync({ - absolutePath: manifestPath, - rootPath: rootDir, + const result = readRootJsonObjectSync({ + rootDir, + relativePath: CLAUDE_BUNDLE_MANIFEST_RELATIVE_PATH, boundaryLabel: "plugin root", rejectHardlinks: true, }); - if (!opened.ok) { - return {}; - } - try { - const raw = JSON.parse(fs.readFileSync(opened.fd, "utf-8")) as unknown; - return raw && typeof raw === "object" && !Array.isArray(raw) - ? (raw as Record) - : {}; - } catch { - return {}; - } finally { - fs.closeSync(opened.fd); - } + return result.ok ? result.value : {}; } function resolveClaudeCommandRootDirs(rootDir: string): string[] { diff --git a/src/plugins/bundle-config-shared.ts b/src/plugins/bundle-config-shared.ts index 8ec0ca4acc8..a07d8ab82a7 100644 --- a/src/plugins/bundle-config-shared.ts +++ b/src/plugins/bundle-config-shared.ts @@ -1,9 +1,7 @@ -import fs from "node:fs"; -import path from "node:path"; import { applyMergePatch } from "../config/merge-patch.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; -import { matchRootFileOpenFailure, openRootFileSync } from "../infra/boundary-file-read.js"; -import { isRecord } from "../utils.js"; +import { matchRootFileOpenFailure, type RootFileOpenFailure } from "../infra/boundary-file-read.js"; +import { readRootJsonObjectSync } from "../infra/json-files.js"; import { normalizePluginsConfig, resolveEffectivePluginActivationState } from "./config-state.js"; import type { PluginBundleFormat } from "./manifest-types.js"; import { loadPluginManifestRegistryForPluginRegistry } from "./plugin-registry.js"; @@ -22,35 +20,25 @@ export type BundleServerRuntimeSupport = { export function readBundleJsonObject(params: { rootDir: string; relativePath: string; - onOpenFailure?: ( - failure: Extract, { ok: false }>, - ) => ReadBundleJsonResult; + onOpenFailure?: (failure: RootFileOpenFailure) => ReadBundleJsonResult; }): ReadBundleJsonResult { - const absolutePath = path.join(params.rootDir, params.relativePath); - const opened = openRootFileSync({ - absolutePath, - rootPath: params.rootDir, + const result = readRootJsonObjectSync({ + rootDir: params.rootDir, + relativePath: params.relativePath, boundaryLabel: "plugin root", rejectHardlinks: true, }); - if (!opened.ok) { - return params.onOpenFailure?.(opened) ?? { ok: true, raw: {} }; + if (result.ok) { + return { ok: true, raw: result.value }; } - try { - const raw = JSON.parse(fs.readFileSync(opened.fd, "utf-8")) as unknown; - if (!isRecord(raw)) { - return { ok: false, error: `${params.relativePath} must contain a JSON object` }; - } - return { ok: true, raw }; - } catch (error) { - return { ok: false, error: `failed to parse ${params.relativePath}: ${String(error)}` }; - } finally { - fs.closeSync(opened.fd); + if (result.reason === "open") { + return params.onOpenFailure?.(result.failure) ?? { ok: true, raw: {} }; } + return { ok: false, error: result.error }; } export function resolveBundleJsonOpenFailure(params: { - failure: Extract, { ok: false }>; + failure: RootFileOpenFailure; relativePath: string; allowMissing?: boolean; }): ReadBundleJsonResult { diff --git a/src/plugins/bundle-lsp.ts b/src/plugins/bundle-lsp.ts index 2e66f1c3f00..5066be4f74d 100644 --- a/src/plugins/bundle-lsp.ts +++ b/src/plugins/bundle-lsp.ts @@ -2,7 +2,7 @@ import fs from "node:fs"; import path from "node:path"; import { applyMergePatch } from "../config/merge-patch.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; -import { openRootFileSync } from "../infra/boundary-file-read.js"; +import { readRootJsonObjectSync } from "../infra/json-files.js"; import { isRecord } from "../utils.js"; import { inspectBundleServerRuntimeSupport, @@ -60,30 +60,32 @@ function resolveBundleLspConfigPaths(params: { return mergeBundlePathLists(defaults, declared); } -function loadBundleLspConfigFile(params: { - rootDir: string; - relativePath: string; -}): BundleLspConfig { - const absolutePath = path.resolve(params.rootDir, params.relativePath); - const opened = openRootFileSync({ - absolutePath, - rootPath: params.rootDir, +function loadBundleLspConfigFile(params: { rootDir: string; relativePath: string }): { + config: BundleLspConfig; + diagnostics: string[]; +} { + const result = readRootJsonObjectSync({ + rootDir: params.rootDir, + relativePath: params.relativePath, boundaryLabel: "plugin root", rejectHardlinks: true, }); - if (!opened.ok) { - return { lspServers: {} }; - } - try { - const stat = fs.fstatSync(opened.fd); - if (!stat.isFile()) { - return { lspServers: {} }; + if (!result.ok) { + if (result.reason === "open") { + return { + config: { lspServers: {} }, + diagnostics: + result.failure.reason === "path" + ? [] + : [`unable to read ${params.relativePath}: ${result.failure.reason}`], + }; } - const raw = JSON.parse(fs.readFileSync(opened.fd, "utf-8")) as unknown; - return { lspServers: extractLspServerMap(raw) }; - } finally { - fs.closeSync(opened.fd); + return { + config: { lspServers: {} }, + diagnostics: [`unable to read ${params.relativePath}: ${result.error}`], + }; } + return { config: { lspServers: extractLspServerMap(result.value) }, diagnostics: [] }; } function loadBundleLspConfig(params: { @@ -109,17 +111,17 @@ function loadBundleLspConfig(params: { raw: manifestLoaded.raw, rootDir: params.rootDir, }); + const diagnostics: string[] = []; for (const relativePath of filePaths) { - merged = applyMergePatch( - merged, - loadBundleLspConfigFile({ - rootDir: params.rootDir, - relativePath, - }), - ) as BundleLspConfig; + const loaded = loadBundleLspConfigFile({ + rootDir: params.rootDir, + relativePath, + }); + diagnostics.push(...loaded.diagnostics); + merged = applyMergePatch(merged, loaded.config) as BundleLspConfig; } - return { config: merged, diagnostics: [] }; + return { config: merged, diagnostics }; } export function inspectBundleLspRuntimeSupport(params: { diff --git a/src/plugins/bundle-manifest.ts b/src/plugins/bundle-manifest.ts index 3234ddbe623..d6bd7a656bb 100644 --- a/src/plugins/bundle-manifest.ts +++ b/src/plugins/bundle-manifest.ts @@ -1,7 +1,8 @@ import fs from "node:fs"; import path from "node:path"; import JSON5 from "json5"; -import { matchRootFileOpenFailure, openRootFileSync } from "../infra/boundary-file-read.js"; +import { matchRootFileOpenFailure } from "../infra/boundary-file-read.js"; +import { readRootStructuredFileSync } from "../infra/json-files.js"; import { normalizeLowercaseStringOrEmpty, normalizeOptionalString, @@ -98,15 +99,17 @@ function loadBundleManifestFile(params: { allowMissing?: boolean; }): BundleManifestFileLoadResult { const manifestPath = path.join(params.rootDir, params.manifestRelativePath); - const opened = openRootFileSync({ - absolutePath: manifestPath, - rootPath: params.rootDir, + const result = readRootStructuredFileSync>({ + rootDir: params.rootDir, ...(params.rootRealPath !== undefined ? { rootRealPath: params.rootRealPath } : {}), + relativePath: params.manifestRelativePath, boundaryLabel: "plugin root", rejectHardlinks: params.rejectHardlinks, + parse: (raw) => JSON5.parse(raw), + validate: isRecord, }); - if (!opened.ok) { - return matchRootFileOpenFailure(opened, { + if (!result.ok && result.reason === "open") { + return matchRootFileOpenFailure(result.failure, { path: () => { if (params.allowMissing) { return { ok: true, raw: {}, manifestPath }; @@ -120,21 +123,17 @@ function loadBundleManifestFile(params: { }), }); } - try { - const raw = JSON5.parse(fs.readFileSync(opened.fd, "utf-8")) as unknown; - if (!isRecord(raw)) { - return { ok: false, error: "plugin manifest must be an object", manifestPath }; - } - return { ok: true, raw, manifestPath }; - } catch (err) { + if (!result.ok) { return { ok: false, - error: `failed to parse plugin manifest: ${String(err)}`, + error: + result.reason === "invalid" + ? "plugin manifest must be an object" + : `failed to parse plugin manifest: ${result.error}`, manifestPath, }; - } finally { - fs.closeSync(opened.fd); } + return { ok: true, raw: result.value, manifestPath }; } function resolveCodexSkillDirs(raw: Record, rootDir: string): string[] { diff --git a/src/plugins/bundle-mcp.test.ts b/src/plugins/bundle-mcp.test.ts index 42971d3ae49..5d5d6ec9d98 100644 --- a/src/plugins/bundle-mcp.test.ts +++ b/src/plugins/bundle-mcp.test.ts @@ -3,6 +3,7 @@ import path from "node:path"; import { afterEach, describe, expect, it } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; import { isRecord } from "../utils.js"; +import { loadEnabledBundleLspConfig } from "./bundle-lsp.js"; import { loadEnabledBundleMcpConfig } from "./bundle-mcp.js"; import { createEnabledPluginEntries, @@ -218,4 +219,66 @@ describe("loadEnabledBundleMcpConfig", () => { }, ); }); + + it("reports malformed file-backed MCP configs instead of silently dropping servers", async () => { + await withBundleHomeEnv( + tempHarness, + "openclaw-bundle-malformed-mcp", + async ({ homeDir, workspaceDir }) => { + const pluginRoot = await writeClaudeBundleManifest({ + homeDir, + pluginId: "malformed-mcp", + manifest: { + name: "malformed-mcp", + mcpServers: ".mcp.json", + }, + }); + await fs.writeFile(path.join(pluginRoot, ".mcp.json"), "{", "utf-8"); + + const loaded = loadEnabledBundleMcpConfig({ + workspaceDir, + cfg: createEnabledBundleConfig(["malformed-mcp"]), + }); + + expect(loaded.config.mcpServers).toEqual({}); + expect(loaded.diagnostics).toEqual([ + expect.objectContaining({ + pluginId: "malformed-mcp", + message: expect.stringContaining("unable to read .mcp.json"), + }), + ]); + }, + ); + }); + + it("reports malformed file-backed LSP configs instead of silently dropping servers", async () => { + await withBundleHomeEnv( + tempHarness, + "openclaw-bundle-malformed-lsp", + async ({ homeDir, workspaceDir }) => { + const pluginRoot = await writeClaudeBundleManifest({ + homeDir, + pluginId: "malformed-lsp", + manifest: { + name: "malformed-lsp", + lspServers: ".lsp.json", + }, + }); + await fs.writeFile(path.join(pluginRoot, ".lsp.json"), "{", "utf-8"); + + const loaded = loadEnabledBundleLspConfig({ + workspaceDir, + cfg: createEnabledBundleConfig(["malformed-lsp"]), + }); + + expect(loaded.config.lspServers).toEqual({}); + expect(loaded.diagnostics).toEqual([ + expect.objectContaining({ + pluginId: "malformed-lsp", + message: expect.stringContaining("unable to read .lsp.json"), + }), + ]); + }, + ); + }); }); diff --git a/src/plugins/bundle-mcp.ts b/src/plugins/bundle-mcp.ts index 1530c1ae25e..d6583644d90 100644 --- a/src/plugins/bundle-mcp.ts +++ b/src/plugins/bundle-mcp.ts @@ -2,7 +2,7 @@ import fs from "node:fs"; import path from "node:path"; import { applyMergePatch } from "../config/merge-patch.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; -import { openRootFileSync } from "../infra/boundary-file-read.js"; +import { readRootJsonObjectSync } from "../infra/json-files.js"; import { isRecord } from "../utils.js"; import { inspectBundleServerRuntimeSupport, @@ -162,40 +162,46 @@ function absolutizeBundleMcpServer(params: { return next; } -function loadBundleFileBackedMcpConfig(params: { - rootDir: string; - relativePath: string; -}): BundleMcpConfig { +function loadBundleFileBackedMcpConfig(params: { rootDir: string; relativePath: string }): { + config: BundleMcpConfig; + diagnostics: string[]; +} { const rootDir = normalizeBundlePath(params.rootDir); const absolutePath = path.resolve(rootDir, params.relativePath); - const opened = openRootFileSync({ - absolutePath, - rootPath: rootDir, + const result = readRootJsonObjectSync({ + rootDir, + relativePath: params.relativePath, boundaryLabel: "plugin root", rejectHardlinks: true, }); - if (!opened.ok) { - return { mcpServers: {} }; - } - try { - const stat = fs.fstatSync(opened.fd); - if (!stat.isFile()) { - return { mcpServers: {} }; + if (!result.ok) { + if (result.reason === "open") { + return { + config: { mcpServers: {} }, + diagnostics: + result.failure.reason === "path" + ? [] + : [`unable to read ${params.relativePath}: ${result.failure.reason}`], + }; } - const raw = JSON.parse(fs.readFileSync(opened.fd, "utf-8")) as unknown; - const servers = extractMcpServerMap(raw); - const baseDir = normalizeBundlePath(path.dirname(absolutePath)); return { + config: { mcpServers: {} }, + diagnostics: [`unable to read ${params.relativePath}: ${result.error}`], + }; + } + const servers = extractMcpServerMap(result.value); + const baseDir = normalizeBundlePath(path.dirname(absolutePath)); + return { + config: { mcpServers: Object.fromEntries( Object.entries(servers).map(([serverName, server]) => [ serverName, absolutizeBundleMcpServer({ rootDir, baseDir, server }), ]), ), - }; - } finally { - fs.closeSync(opened.fd); - } + }, + diagnostics: [], + }; } function loadBundleInlineMcpConfig(params: { @@ -243,14 +249,14 @@ function loadBundleMcpConfig(params: { rootDir: params.rootDir, bundleFormat: params.bundleFormat, }); + const diagnostics: string[] = []; for (const relativePath of filePaths) { - merged = applyMergePatch( - merged, - loadBundleFileBackedMcpConfig({ - rootDir: params.rootDir, - relativePath, - }), - ) as BundleMcpConfig; + const loaded = loadBundleFileBackedMcpConfig({ + rootDir: params.rootDir, + relativePath, + }); + diagnostics.push(...loaded.diagnostics); + merged = applyMergePatch(merged, loaded.config) as BundleMcpConfig; } merged = applyMergePatch( @@ -261,7 +267,7 @@ function loadBundleMcpConfig(params: { }), ) as BundleMcpConfig; - return { config: merged, diagnostics: [] }; + return { config: merged, diagnostics }; } export function inspectBundleMcpRuntimeSupport(params: { diff --git a/src/plugins/discovery.ts b/src/plugins/discovery.ts index ff434732da2..11e5091ad2b 100644 --- a/src/plugins/discovery.ts +++ b/src/plugins/discovery.ts @@ -1,7 +1,7 @@ import fs from "node:fs"; import path from "node:path"; import type { PluginInstallRecord } from "../config/types.plugins.js"; -import { openRootFileSync } from "../infra/boundary-file-read.js"; +import { readRootJsonObjectSync } from "../infra/json-files.js"; import { tryReadJsonSync } from "../infra/json-files.js"; import { normalizeLowercaseStringOrEmpty, @@ -415,25 +415,14 @@ function readPackageManifest( rejectHardlinks = true, rootRealPath?: string, ): PackageManifest | null { - const manifestPath = path.join(dir, "package.json"); - const opened = openRootFileSync({ - absolutePath: manifestPath, - rootPath: dir, + const result = readRootJsonObjectSync({ + rootDir: dir, ...(rootRealPath !== undefined ? { rootRealPath } : {}), + relativePath: "package.json", boundaryLabel: "plugin package directory", rejectHardlinks, }); - if (!opened.ok) { - return null; - } - try { - const raw = fs.readFileSync(opened.fd, "utf-8"); - return JSON.parse(raw) as PackageManifest; - } catch { - return null; - } finally { - fs.closeSync(opened.fd); - } + return result.ok ? (result.value as PackageManifest) : null; } function readTrustedPackageManifest(dir: string): PackageManifest | null {