diff --git a/src/agents/command/session-store.ts b/src/agents/command/session-entry-updates.ts similarity index 87% rename from src/agents/command/session-store.ts rename to src/agents/command/session-entry-updates.ts index a5281539c95..8e50c425367 100644 --- a/src/agents/command/session-store.ts +++ b/src/agents/command/session-entry-updates.ts @@ -1,10 +1,12 @@ import { + getSessionEntry, mergeSessionEntry, setSessionRuntimeModel, type SessionEntry, - updateSessionStore, + upsertSessionEntry, } from "../../config/sessions.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; +import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js"; import { createLazyImportLoader } from "../../shared/lazy-promise.js"; import { normalizeOptionalString } from "../../shared/string-coerce.js"; import { clearCliSession, setCliSessionBinding, setCliSessionId } from "../cli-session.js"; @@ -45,12 +47,33 @@ function removeLifecycleStateFromMetadataPatch(entry: SessionEntry): SessionEntr return next; } -export async function updateSessionStoreAfterAgentRun(params: { +function persistMergedSessionEntry(params: { + sessionKey: string; + sessionStore: Record; + patch: SessionEntry; +}): SessionEntry { + const agentId = resolveAgentIdFromSessionKey(params.sessionKey); + if (!agentId) { + throw new Error( + `Session stores are SQLite-only; cannot resolve agent for ${params.sessionKey}`, + ); + } + const existing = getSessionEntry({ agentId, sessionKey: params.sessionKey }); + const merged = mergeSessionEntry(existing, params.patch); + upsertSessionEntry({ + agentId, + sessionKey: params.sessionKey, + entry: merged, + }); + params.sessionStore[params.sessionKey] = merged; + return merged; +} + +export async function updateSessionEntryAfterAgentRun(params: { cfg: OpenClawConfig; contextTokensOverride?: number; sessionId: string; sessionKey: string; - storePath: string; sessionStore: Record; defaultProvider: string; defaultModel: string; @@ -70,7 +93,6 @@ export async function updateSessionStoreAfterAgentRun(params: { cfg, sessionId, sessionKey, - storePath, sessionStore, defaultProvider, defaultModel, @@ -228,21 +250,19 @@ export async function updateSessionStoreAfterAgentRun(params: { next.compactionCount = (entry.compactionCount ?? 0) + compactionsThisRun; } const metadataPatch = removeLifecycleStateFromMetadataPatch(next); - const persisted = await updateSessionStore(storePath, (store) => { - const merged = mergeSessionEntry(store[sessionKey], metadataPatch); - store[sessionKey] = merged; - return merged; + persistMergedSessionEntry({ + sessionKey, + sessionStore, + patch: metadataPatch, }); - sessionStore[sessionKey] = persisted; } -export async function clearCliSessionInStore(params: { +export async function clearCliSessionEntry(params: { provider: string; sessionKey: string; sessionStore: Record; - storePath: string; }): Promise { - const { provider, sessionKey, sessionStore, storePath } = params; + const { provider, sessionKey, sessionStore } = params; const entry = sessionStore[sessionKey]; if (!entry) { return undefined; @@ -252,22 +272,19 @@ export async function clearCliSessionInStore(params: { clearCliSession(next, provider); next.updatedAt = Date.now(); - const persisted = await updateSessionStore(storePath, (store) => { - const merged = mergeSessionEntry(store[sessionKey], next); - store[sessionKey] = merged; - return merged; + return persistMergedSessionEntry({ + sessionKey, + sessionStore, + patch: next, }); - sessionStore[sessionKey] = persisted; - return persisted; } -export async function recordCliCompactionInStore(params: { +export async function recordCliCompactionInSessionEntry(params: { provider: string; sessionKey: string; sessionStore: Record; - storePath: string; }): Promise { - const { provider, sessionKey, sessionStore, storePath } = params; + const { provider, sessionKey, sessionStore } = params; const entry = sessionStore[sessionKey]; if (!entry) { return undefined; @@ -278,11 +295,9 @@ export async function recordCliCompactionInStore(params: { next.compactionCount = (entry.compactionCount ?? 0) + 1; next.updatedAt = Date.now(); - const persisted = await updateSessionStore(storePath, (store) => { - const merged = mergeSessionEntry(store[sessionKey], next); - store[sessionKey] = merged; - return merged; + return persistMergedSessionEntry({ + sessionKey, + sessionStore, + patch: next, }); - sessionStore[sessionKey] = persisted; - return persisted; } diff --git a/src/agents/command/session-store.runtime.ts b/src/agents/command/session-store.runtime.ts deleted file mode 100644 index c60601cba5b..00000000000 --- a/src/agents/command/session-store.runtime.ts +++ /dev/null @@ -1 +0,0 @@ -export { updateSessionStoreAfterAgentRun } from "./session-store.js"; diff --git a/src/agents/command/session-store.test.ts b/src/agents/command/session-store.test.ts deleted file mode 100644 index 9660481641e..00000000000 --- a/src/agents/command/session-store.test.ts +++ /dev/null @@ -1,1263 +0,0 @@ -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; -import { describe, expect, it, vi } from "vitest"; -import type { OpenClawConfig } from "../../config/config.js"; -import type { SessionEntry } from "../../config/sessions.js"; -import { loadSessionStore } from "../../config/sessions.js"; -import type { EmbeddedPiRunResult } from "../pi-embedded.js"; -import { clearCliSessionInStore, updateSessionStoreAfterAgentRun } from "./session-store.js"; -import { resolveSession } from "./session.js"; - -vi.mock("../model-selection.js", () => ({ - isCliProvider: (provider: string, cfg?: OpenClawConfig) => - Object.hasOwn(cfg?.agents?.defaults?.cliBackends ?? {}, provider), - normalizeProviderId: (provider: string) => provider.trim().toLowerCase(), -})); - -type MockCost = { - input?: number; - output?: number; -}; - -type MockProviderModel = { - id: string; - cost?: MockCost; -}; - -type MockUsageFormatConfig = { - models?: { - providers?: Record; - }; -}; - -vi.mock("../../utils/usage-format.js", () => ({ - estimateUsageCost: (params: { usage?: { input?: number; output?: number }; cost?: MockCost }) => { - if (!params.usage || !params.cost) { - return undefined; - } - const input = params.usage.input ?? 0; - const output = params.usage.output ?? 0; - const costInput = params.cost.input ?? 0; - const costOutput = params.cost.output ?? 0; - const total = input * costInput + output * costOutput; - if (!Number.isFinite(total)) { - return undefined; - } - return total / 1e6; - }, - resolveModelCostConfig: (params: { provider?: string; model?: string; config?: unknown }) => { - const providers = (params.config as MockUsageFormatConfig | undefined)?.models?.providers; - if (!providers) { - return undefined; - } - const model = providers[params.provider ?? ""]?.models?.find( - (entry) => entry.id === params.model, - ); - if (!model) { - return undefined; - } - return model.cost; - }, -})); - -vi.mock("../../config/sessions.js", async () => { - const fsSync = await import("node:fs"); - const fs = await import("node:fs/promises"); - const path = await import("node:path"); - const readStore = async (storePath: string): Promise> => { - try { - return JSON.parse(await fs.readFile(storePath, "utf8")) as Record; - } catch { - return {}; - } - }; - const writeStore = async (storePath: string, store: Record) => { - await fs.mkdir(path.dirname(storePath), { recursive: true }); - await fs.writeFile(storePath, JSON.stringify(store, null, 2), "utf8"); - }; - return { - mergeSessionEntry: (existing: SessionEntry | undefined, patch: Partial) => ({ - ...existing, - ...patch, - sessionId: patch.sessionId ?? existing?.sessionId ?? "mock-session", - updatedAt: Math.max(existing?.updatedAt ?? 0, patch.updatedAt ?? 0, Date.now()), - }), - setSessionRuntimeModel: (entry: SessionEntry, runtime: { provider: string; model: string }) => { - entry.modelProvider = runtime.provider; - entry.model = runtime.model; - return true; - }, - updateSessionStore: async ( - storePath: string, - mutator: (store: Record) => Promise | T, - ) => { - const store = await readStore(storePath); - const previousAcpByKey = new Map( - Object.entries(store) - .filter( - (entry): entry is [string, SessionEntry & { acp: NonNullable }] => - Boolean(entry[1]?.acp), - ) - .map(([key, entry]) => [key, entry.acp]), - ); - const result = await mutator(store); - for (const [key, acp] of previousAcpByKey) { - const next = store[key]; - if (next && !next.acp) { - next.acp = acp; - } - } - await writeStore(storePath, store); - return result; - }, - loadSessionStore: (storePath: string) => { - try { - return JSON.parse(fsSync.readFileSync(storePath, "utf8")) as Record; - } catch { - return {}; - } - }, - }; -}); - -function acpMeta() { - return { - backend: "acpx", - agent: "codex", - runtimeSessionName: "runtime-1", - mode: "persistent" as const, - state: "idle" as const, - lastActivityAt: Date.now(), - }; -} - -async function withTempSessionStore( - run: (params: { dir: string; storePath: string }) => Promise, -): Promise { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-store-")); - try { - return await run({ dir, storePath: path.join(dir, "sessions.json") }); - } finally { - await fs.rm(dir, { recursive: true, force: true }); - } -} - -describe("updateSessionStoreAfterAgentRun", () => { - it("persists the selected embedded harness id on the session", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = {} as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-harness-pin"; - const sessionId = "test-harness-pin-session"; - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: 1, - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - const result: EmbeddedPiRunResult = { - meta: { - durationMs: 1, - agentMeta: { - sessionId, - provider: "openai", - model: "gpt-5.4", - agentHarnessId: "codex", - }, - }, - }; - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "openai", - defaultModel: "gpt-5.4", - result, - }); - - expect(sessionStore[sessionKey]?.agentHarnessId).toBe("codex"); - expect(loadSessionStore(storePath)[sessionKey]?.agentHarnessId).toBe("codex"); - }); - }); - - it("uses the runtime context budget from agent metadata instead of cold fallback", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = {} as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-runtime-context"; - const sessionId = "test-runtime-context-session"; - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: 1, - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - const result: EmbeddedPiRunResult = { - meta: { - durationMs: 1, - agentMeta: { - sessionId, - provider: "openai-codex", - model: "gpt-5.5", - contextTokens: 400_000, - }, - }, - }; - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "openai-codex", - defaultModel: "gpt-5.5", - result, - }); - - expect(sessionStore[sessionKey]?.contextTokens).toBe(400_000); - expect(loadSessionStore(storePath)[sessionKey]?.contextTokens).toBe(400_000); - }); - }); - - it("clears the embedded harness pin after a CLI run", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = { - agents: { - defaults: { - cliBackends: { - "claude-cli": { - command: "claude", - }, - }, - }, - }, - } as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-harness-pin-cli"; - const sessionId = "test-harness-pin-cli-session"; - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: 1, - agentHarnessId: "codex", - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - const result: EmbeddedPiRunResult = { - meta: { - durationMs: 1, - executionTrace: { runner: "cli" }, - agentMeta: { - sessionId: "cli-session-123", - provider: "claude-cli", - model: "claude-sonnet-4-6", - }, - }, - }; - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "claude-cli", - defaultModel: "claude-sonnet-4-6", - result, - }); - - expect(sessionStore[sessionKey]?.agentHarnessId).toBeUndefined(); - expect(loadSessionStore(storePath)[sessionKey]?.agentHarnessId).toBeUndefined(); - }); - }); - - it("persists claude-cli session bindings when the backend is configured", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = { - agents: { - defaults: { - cliBackends: { - "claude-cli": { - command: "claude", - }, - }, - }, - }, - } as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-claude-cli"; - const sessionId = "test-openclaw-session"; - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: 1, - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - const result: EmbeddedPiRunResult = { - meta: { - durationMs: 1, - agentMeta: { - sessionId: "cli-session-123", - provider: "claude-cli", - model: "claude-sonnet-4-6", - cliSessionBinding: { - sessionId: "cli-session-123", - }, - }, - }, - }; - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - contextTokensOverride: 200_000, - defaultProvider: "claude-cli", - defaultModel: "claude-sonnet-4-6", - result, - }); - - expect(sessionStore[sessionKey]?.cliSessionBindings?.["claude-cli"]).toEqual({ - sessionId: "cli-session-123", - }); - expect(sessionStore[sessionKey]?.cliSessionIds?.["claude-cli"]).toBe("cli-session-123"); - expect(sessionStore[sessionKey]?.claudeCliSessionId).toBe("cli-session-123"); - - const persisted = loadSessionStore(storePath); - expect(persisted[sessionKey]?.cliSessionBindings?.["claude-cli"]).toEqual({ - sessionId: "cli-session-123", - }); - expect(persisted[sessionKey]?.cliSessionIds?.["claude-cli"]).toBe("cli-session-123"); - expect(persisted[sessionKey]?.claudeCliSessionId).toBe("cli-session-123"); - }); - }); - - it("preserves ACP metadata when caller has a stale session snapshot", async () => { - await withTempSessionStore(async ({ storePath }) => { - const sessionKey = "agent:codex:acp:test-acp-preserve"; - const sessionId = "test-acp-session"; - - const existing: SessionEntry = { - sessionId, - updatedAt: Date.now(), - acp: acpMeta(), - }; - await fs.writeFile(storePath, JSON.stringify({ [sessionKey]: existing }, null, 2), "utf8"); - - const staleInMemory: Record = { - [sessionKey]: { - sessionId, - updatedAt: Date.now(), - }, - }; - - await updateSessionStoreAfterAgentRun({ - cfg: {} as never, - sessionId, - sessionKey, - storePath, - sessionStore: staleInMemory, - contextTokensOverride: 200_000, - defaultProvider: "openai", - defaultModel: "gpt-5.4", - result: { - payloads: [], - meta: { - aborted: false, - agentMeta: { - provider: "openai", - model: "gpt-5.4", - }, - }, - } as never, - }); - - const persisted = loadSessionStore(storePath)[sessionKey]; - expect(persisted?.acp).toMatchObject({ - backend: "acpx", - agent: "codex", - runtimeSessionName: "runtime-1", - mode: "persistent", - state: "idle", - }); - expect(staleInMemory[sessionKey]?.acp).toEqual(persisted?.acp); - }); - }); - - it("preserves terminal lifecycle state when caller has a stale running snapshot", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = {} as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-lifecycle-preserve"; - const sessionId = "test-lifecycle-preserve-session"; - const terminalEntry: SessionEntry = { - sessionId, - updatedAt: 2_000, - status: "done", - startedAt: 1_000, - endedAt: 1_900, - runtimeMs: 900, - }; - await fs.writeFile(storePath, JSON.stringify({ [sessionKey]: terminalEntry }, null, 2)); - - const staleInMemory: Record = { - [sessionKey]: { - sessionId, - updatedAt: 1_100, - status: "running", - startedAt: 1_000, - }, - }; - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore: staleInMemory, - defaultProvider: "openai", - defaultModel: "gpt-5.4", - result: { - payloads: [], - meta: { - aborted: false, - agentMeta: { - provider: "openai", - model: "gpt-5.4", - }, - }, - } as never, - }); - - const persisted = loadSessionStore(storePath)[sessionKey]; - expect(persisted).toMatchObject({ - status: "done", - startedAt: 1_000, - endedAt: 1_900, - runtimeMs: 900, - modelProvider: "openai", - model: "gpt-5.4", - }); - expect(staleInMemory[sessionKey]?.status).toBe("done"); - }); - }); - - it("persists latest systemPromptReport for downstream warning dedupe", async () => { - await withTempSessionStore(async ({ storePath }) => { - const sessionKey = "agent:codex:report:test-system-prompt-report"; - const sessionId = "test-system-prompt-report-session"; - - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: Date.now(), - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2), "utf8"); - - const report = { - source: "run" as const, - generatedAt: Date.now(), - bootstrapTruncation: { - warningMode: "once" as const, - warningSignaturesSeen: ["sig-a", "sig-b"], - }, - systemPrompt: { - chars: 1, - projectContextChars: 1, - nonProjectContextChars: 0, - }, - injectedWorkspaceFiles: [], - skills: { promptChars: 0, entries: [] }, - tools: { listChars: 0, schemaChars: 0, entries: [] }, - }; - - await updateSessionStoreAfterAgentRun({ - cfg: {} as never, - sessionId, - sessionKey, - storePath, - sessionStore, - contextTokensOverride: 200_000, - defaultProvider: "openai", - defaultModel: "gpt-5.4", - result: { - payloads: [], - meta: { - agentMeta: { - provider: "openai", - model: "gpt-5.4", - }, - systemPromptReport: report, - }, - } as never, - }); - - const persisted = loadSessionStore(storePath)[sessionKey]; - expect(persisted?.systemPromptReport?.bootstrapTruncation?.warningSignaturesSeen).toEqual([ - "sig-a", - "sig-b", - ]); - expect(sessionStore[sessionKey]?.systemPromptReport?.bootstrapTruncation?.warningMode).toBe( - "once", - ); - }); - }); - - it("stores and reloads the runtime model for explicit session-id-only runs", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = { - session: { - store: storePath, - mainKey: "main", - }, - agents: { - defaults: { - cliBackends: { - "claude-cli": { command: "claude" }, - }, - }, - }, - } as never; - - const first = resolveSession({ - cfg, - sessionId: "explicit-session-123", - }); - - expect(first.sessionKey).toBe("agent:main:explicit:explicit-session-123"); - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId: first.sessionId, - sessionKey: first.sessionKey!, - storePath: first.storePath, - sessionStore: first.sessionStore!, - contextTokensOverride: 200_000, - defaultProvider: "claude-cli", - defaultModel: "claude-sonnet-4-6", - result: { - payloads: [], - meta: { - agentMeta: { - provider: "claude-cli", - model: "claude-sonnet-4-6", - sessionId: "claude-cli-session-1", - cliSessionBinding: { - sessionId: "claude-cli-session-1", - authEpoch: "auth-epoch-1", - }, - }, - }, - } as never, - }); - - const second = resolveSession({ - cfg, - sessionId: "explicit-session-123", - }); - - expect(second.sessionKey).toBe(first.sessionKey); - expect(second.sessionEntry?.cliSessionBindings?.["claude-cli"]).toEqual({ - sessionId: "claude-cli-session-1", - authEpoch: "auth-epoch-1", - }); - - const persisted = loadSessionStore(storePath)[first.sessionKey!]; - expect(persisted?.cliSessionBindings?.["claude-cli"]).toEqual({ - sessionId: "claude-cli-session-1", - authEpoch: "auth-epoch-1", - }); - }); - }); - - it("preserves previous totalTokens when provider returns no usage data (#67667)", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = {} as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-no-usage"; - const sessionId = "test-session"; - - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: 1, - totalTokens: 21225, - totalTokensFresh: true, - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - const result: EmbeddedPiRunResult = { - meta: { - durationMs: 500, - agentMeta: { - sessionId, - provider: "minimax", - model: "MiniMax-M2.7", - }, - }, - }; - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "minimax", - defaultModel: "MiniMax-M2.7", - result, - }); - - expect(sessionStore[sessionKey]?.totalTokens).toBe(21225); - expect(sessionStore[sessionKey]?.totalTokensFresh).toBe(false); - - const persisted = loadSessionStore(storePath); - expect(persisted[sessionKey]?.totalTokens).toBe(21225); - expect(persisted[sessionKey]?.totalTokensFresh).toBe(false); - }); - }); - - it("does not treat CLI cumulative usage as a fresh context snapshot", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = { - agents: { - defaults: { - cliBackends: { - "claude-cli": { command: "claude" }, - }, - }, - }, - } as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-cli-cumulative-usage"; - const sessionId = "test-cli-cumulative-usage-session"; - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: 1, - totalTokens: 95_000, - totalTokensFresh: true, - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - await updateSessionStoreAfterAgentRun({ - cfg, - contextTokensOverride: 1_000_000, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "claude-cli", - defaultModel: "claude-opus-4-7", - result: { - meta: { - durationMs: 1, - executionTrace: { runner: "cli" }, - agentMeta: { - sessionId, - provider: "claude-cli", - model: "claude-opus-4-7", - usage: { - input: 3_800_000, - output: 20_000, - total: 3_820_000, - }, - }, - }, - }, - }); - - expect(sessionStore[sessionKey]?.inputTokens).toBe(3_800_000); - expect(sessionStore[sessionKey]?.outputTokens).toBe(20_000); - expect(sessionStore[sessionKey]?.totalTokens).toBeUndefined(); - expect(sessionStore[sessionKey]?.totalTokensFresh).toBe(false); - }); - }); - - it("persists compaction tokensAfter when provider usage is unavailable", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = {} as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-compaction-tokens-after"; - const sessionId = "test-compaction-tokens-after-session"; - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: 1, - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - const result: EmbeddedPiRunResult = { - meta: { - durationMs: 500, - agentMeta: { - sessionId, - provider: "minimax", - model: "MiniMax-M2.7", - compactionCount: 1, - compactionTokensAfter: 21_225, - }, - }, - }; - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "minimax", - defaultModel: "MiniMax-M2.7", - result, - }); - - expect(sessionStore[sessionKey]?.totalTokens).toBe(21_225); - expect(sessionStore[sessionKey]?.totalTokensFresh).toBe(true); - expect(sessionStore[sessionKey]?.compactionCount).toBe(1); - - const persisted = loadSessionStore(storePath); - expect(persisted[sessionKey]?.totalTokens).toBe(21_225); - expect(persisted[sessionKey]?.totalTokensFresh).toBe(true); - }); - }); - - it("ignores non-finite compaction tokensAfter values", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = {} as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-compaction-tokens-after-invalid"; - const sessionId = "test-compaction-tokens-after-invalid-session"; - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: 1, - totalTokens: 12_000, - totalTokensFresh: true, - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "minimax", - defaultModel: "MiniMax-M2.7", - result: { - meta: { - durationMs: 500, - agentMeta: { - sessionId, - provider: "minimax", - model: "MiniMax-M2.7", - compactionCount: 1, - compactionTokensAfter: Number.POSITIVE_INFINITY, - }, - }, - }, - }); - - expect(sessionStore[sessionKey]?.totalTokens).toBe(12_000); - expect(sessionStore[sessionKey]?.totalTokensFresh).toBe(false); - }); - }); - - it("snapshots cost instead of accumulating (fixes #69347)", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = { - models: { - providers: { - openai: { - models: [ - { - id: "gpt-4", - cost: { - input: 10, - output: 30, - cacheRead: 0, - cacheWrite: 0, - }, - }, - ], - }, - }, - }, - } as unknown as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-cost-snapshot"; - const sessionId = "test-cost-snapshot-session"; - - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: 1, - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - // Simulate a run with 10k input + 5k output tokens - // Cost = (10000 * 10 + 5000 * 30) / 1e6 = $0.25 - const result: EmbeddedPiRunResult = { - meta: { - durationMs: 500, - agentMeta: { - sessionId, - provider: "openai", - model: "gpt-4", - usage: { - input: 10000, - output: 5000, - }, - }, - }, - }; - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "openai", - defaultModel: "gpt-4", - result, - }); - - // First run: cost should be $0.25 - expect(sessionStore[sessionKey]?.estimatedCostUsd).toBeCloseTo(0.25, 4); - - // Simulate a second persist with the SAME cumulative usage (e.g., from a heartbeat or - // redundant persist). Before the fix, this would double the cost. - // After the fix, cost should remain the same because it's snapshotted. - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "openai", - defaultModel: "gpt-4", - result, // Same usage again - }); - - // After second persist with same usage, cost should STILL be $0.25 (not $0.50) - expect(sessionStore[sessionKey]?.estimatedCostUsd).toBeCloseTo(0.25, 4); - - const persisted = loadSessionStore(storePath); - expect(persisted[sessionKey]?.estimatedCostUsd).toBeCloseTo(0.25, 4); - }); - }); - - it("preserves lastInteractionAt for non-interactive system runs", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = {} as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-system-run"; - const sessionId = "test-system-run-session"; - const lastInteractionAt = Date.now() - 60 * 60_000; - const sessionStartedAt = Date.now() - 2 * 60 * 60_000; - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: Date.now() - 10_000, - sessionStartedAt, - lastInteractionAt, - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "openai", - defaultModel: "gpt-5.4", - result: { - meta: { - durationMs: 1, - agentMeta: { - sessionId, - provider: "openai", - model: "gpt-5.4", - }, - }, - }, - touchInteraction: false, - }); - - expect(sessionStore[sessionKey]?.lastInteractionAt).toBe(lastInteractionAt); - expect(sessionStore[sessionKey]?.sessionStartedAt).toBe(sessionStartedAt); - expect(sessionStore[sessionKey]?.updatedAt).toBeGreaterThan(lastInteractionAt); - }); - }); - - it("advances lastInteractionAt for interactive runs", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = {} as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-user-run"; - const sessionId = "test-user-run-session"; - const lastInteractionAt = Date.now() - 60 * 60_000; - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: Date.now() - 10_000, - lastInteractionAt, - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "openai", - defaultModel: "gpt-5.4", - result: { - meta: { - durationMs: 1, - agentMeta: { - sessionId, - provider: "openai", - model: "gpt-5.4", - }, - }, - }, - }); - - expect(sessionStore[sessionKey]?.lastInteractionAt).toBeGreaterThan(lastInteractionAt); - }); - }); - - it("preserves runtime model and contextTokens when preserveRuntimeModel is true (heartbeat bleed fix)", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = {} as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-heartbeat-bleed"; - const sessionId = "test-heartbeat-bleed-session"; - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: 1, - modelProvider: "anthropic", - model: "claude-opus-4-6", - contextTokens: 1_000_000, - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - // Heartbeat turn uses a different model - const result: EmbeddedPiRunResult = { - meta: { - durationMs: 500, - agentMeta: { - sessionId, - provider: "ollama", - model: "llama3.2:1b", - contextTokens: 128_000, - }, - }, - }; - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "anthropic", - defaultModel: "claude-opus-4-6", - result, - preserveRuntimeModel: true, - }); - - // Runtime model and contextTokens should be preserved from the original entry - expect(sessionStore[sessionKey]?.model).toBe("claude-opus-4-6"); - expect(sessionStore[sessionKey]?.modelProvider).toBe("anthropic"); - expect(sessionStore[sessionKey]?.contextTokens).toBe(1_000_000); - - const persisted = loadSessionStore(storePath); - expect(persisted[sessionKey]?.model).toBe("claude-opus-4-6"); - expect(persisted[sessionKey]?.modelProvider).toBe("anthropic"); - expect(persisted[sessionKey]?.contextTokens).toBe(1_000_000); - }); - }); - - it("leaves contextTokens unset when entry has prior model but no contextTokens (heartbeat bleed guard)", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = {} as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-heartbeat-no-context-tokens"; - const sessionId = "test-heartbeat-no-context-tokens-session"; - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: 1, - modelProvider: "anthropic", - model: "claude-opus-4-6", - // contextTokens intentionally missing — older session without cached context - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - // Heartbeat turn uses a different, smaller model - const result: EmbeddedPiRunResult = { - meta: { - durationMs: 500, - agentMeta: { - sessionId, - provider: "ollama", - model: "llama3.2:1b", - contextTokens: 128_000, - }, - }, - }; - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "anthropic", - defaultModel: "claude-opus-4-6", - result, - preserveRuntimeModel: true, - }); - - // Runtime model should be preserved - expect(sessionStore[sessionKey]?.model).toBe("claude-opus-4-6"); - expect(sessionStore[sessionKey]?.modelProvider).toBe("anthropic"); - // contextTokens should NOT bleed from the heartbeat run's smaller window - expect(sessionStore[sessionKey]?.contextTokens).toBeUndefined(); - }); - }); - - it("does not set runtime model when preserveRuntimeModel is true and entry has no prior runtime model", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = {} as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-heartbeat-new-session"; - const sessionId = "test-heartbeat-new-session-id"; - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: 1, - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - const result: EmbeddedPiRunResult = { - meta: { - durationMs: 500, - agentMeta: { - sessionId, - provider: "ollama", - model: "llama3.2:1b", - contextTokens: 128_000, - }, - }, - }; - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "ollama", - defaultModel: "llama3.2:1b", - result, - preserveRuntimeModel: true, - }); - - // Heartbeat should NOT establish initial model state on an empty session - expect(sessionStore[sessionKey]?.model).toBeUndefined(); - expect(sessionStore[sessionKey]?.modelProvider).toBeUndefined(); - expect(sessionStore[sessionKey]?.contextTokens).toBeUndefined(); - }); - }); - - it("preserves model without borrowing heartbeat provider when entry has model but no modelProvider", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = {} as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-heartbeat-model-no-provider"; - const sessionId = "test-heartbeat-model-no-provider-session"; - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: 1, - model: "claude-opus-4-6", - // modelProvider intentionally missing - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - // Heartbeat turn uses a different provider - const result: EmbeddedPiRunResult = { - meta: { - durationMs: 500, - agentMeta: { - sessionId, - provider: "ollama", - model: "llama3.2:1b", - contextTokens: 128_000, - }, - }, - }; - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "anthropic", - defaultModel: "claude-opus-4-6", - result, - preserveRuntimeModel: true, - }); - - // Model preserved, provider NOT borrowed from heartbeat - expect(sessionStore[sessionKey]?.model).toBe("claude-opus-4-6"); - expect(sessionStore[sessionKey]?.modelProvider).toBeUndefined(); - - const persisted = loadSessionStore(storePath); - expect(persisted[sessionKey]?.model).toBe("claude-opus-4-6"); - expect(persisted[sessionKey]?.modelProvider).toBeUndefined(); - }); - }); - - it("overwrites runtime model when preserveRuntimeModel is false (default behavior)", async () => { - await withTempSessionStore(async ({ storePath }) => { - const cfg = {} as OpenClawConfig; - const sessionKey = "agent:main:explicit:test-normal-overwrite"; - const sessionId = "test-normal-overwrite-session"; - const sessionStore: Record = { - [sessionKey]: { - sessionId, - updatedAt: 1, - modelProvider: "anthropic", - model: "claude-opus-4-6", - contextTokens: 1_000_000, - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2)); - - const result: EmbeddedPiRunResult = { - meta: { - durationMs: 500, - agentMeta: { - sessionId, - provider: "openai", - model: "gpt-5.4", - contextTokens: 400_000, - }, - }, - }; - - await updateSessionStoreAfterAgentRun({ - cfg, - sessionId, - sessionKey, - storePath, - sessionStore, - defaultProvider: "openai", - defaultModel: "gpt-5.4", - result, - }); - - // Normal turn: runtime model is updated - expect(sessionStore[sessionKey]?.model).toBe("gpt-5.4"); - expect(sessionStore[sessionKey]?.modelProvider).toBe("openai"); - expect(sessionStore[sessionKey]?.contextTokens).toBe(400_000); - }); - }); -}); - -describe("clearCliSessionInStore", () => { - it("persists cleared Claude CLI bindings through session-store merge", async () => { - await withTempSessionStore(async ({ storePath }) => { - const sessionKey = "agent:main:explicit:test-clear-claude-cli"; - const entry: SessionEntry = { - sessionId: "openclaw-session-1", - updatedAt: 1, - cliSessionBindings: { - "claude-cli": { - sessionId: "claude-session-1", - authEpoch: "epoch-1", - }, - "codex-cli": { - sessionId: "codex-session-1", - }, - }, - cliSessionIds: { - "claude-cli": "claude-session-1", - "codex-cli": "codex-session-1", - }, - claudeCliSessionId: "claude-session-1", - }; - const sessionStore: Record = { [sessionKey]: entry }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2), "utf8"); - - const cleared = await clearCliSessionInStore({ - provider: "claude-cli", - sessionKey, - sessionStore, - storePath, - }); - - expect(cleared?.cliSessionBindings?.["claude-cli"]).toBeUndefined(); - expect(cleared?.cliSessionBindings?.["codex-cli"]).toEqual({ - sessionId: "codex-session-1", - }); - expect(cleared?.cliSessionIds?.["claude-cli"]).toBeUndefined(); - expect(cleared?.cliSessionIds?.["codex-cli"]).toBe("codex-session-1"); - expect(cleared?.claudeCliSessionId).toBeUndefined(); - expect(sessionStore[sessionKey]).toEqual(cleared); - - const persisted = loadSessionStore(storePath)[sessionKey]; - expect(persisted?.cliSessionBindings?.["claude-cli"]).toBeUndefined(); - expect(persisted?.cliSessionBindings?.["codex-cli"]).toEqual({ - sessionId: "codex-session-1", - }); - expect(persisted?.cliSessionIds?.["claude-cli"]).toBeUndefined(); - expect(persisted?.cliSessionIds?.["codex-cli"]).toBe("codex-session-1"); - expect(persisted?.claudeCliSessionId).toBeUndefined(); - }); - }); - - it("leaves the caller snapshot intact when the session entry is missing", async () => { - await withTempSessionStore(async ({ storePath }) => { - const existingKey = "agent:main:explicit:existing"; - const sessionStore: Record = { - [existingKey]: { - sessionId: "openclaw-session-1", - updatedAt: 1, - claudeCliSessionId: "claude-session-1", - }, - }; - await fs.writeFile(storePath, JSON.stringify(sessionStore, null, 2), "utf8"); - - const cleared = await clearCliSessionInStore({ - provider: "claude-cli", - sessionKey: "agent:main:explicit:missing", - sessionStore, - storePath, - }); - - expect(cleared).toBeUndefined(); - expect(sessionStore[existingKey]?.claudeCliSessionId).toBe("claude-session-1"); - expect(loadSessionStore(storePath)[existingKey]?.claudeCliSessionId).toBe("claude-session-1"); - }); - }); -}); diff --git a/src/agents/session-file-repair.test.ts b/src/agents/session-file-repair.test.ts deleted file mode 100644 index b68a9ba1140..00000000000 --- a/src/agents/session-file-repair.test.ts +++ /dev/null @@ -1,710 +0,0 @@ -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; -import { afterEach, describe, expect, it, vi } from "vitest"; -import { - exportSqliteSessionTranscriptJsonl, - replaceSqliteSessionTranscriptEvents, - resolveSqliteSessionTranscriptScopeForPath, -} from "../config/sessions/transcript-store.sqlite.js"; -import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js"; -import { BLANK_USER_FALLBACK_TEXT, repairSessionFileIfNeeded } from "./session-file-repair.js"; - -function buildSessionHeaderAndMessage() { - const header = { - type: "session", - version: 7, - id: "session-1", - timestamp: new Date().toISOString(), - cwd: "/tmp", - }; - const message = { - type: "message", - id: "msg-1", - parentId: null, - timestamp: new Date().toISOString(), - message: { role: "user", content: "hello" }, - }; - return { header, message }; -} - -const tempDirs: string[] = []; - -async function createTempSessionPath() { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-repair-")); - tempDirs.push(dir); - return { dir, file: path.join(dir, "session.jsonl") }; -} - -afterEach(async () => { - closeOpenClawStateDatabaseForTest(); - await Promise.all(tempDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true }))); -}); - -function writeTranscriptEvents(file: string, events: unknown[]) { - const sessionId = - events.find((event): event is { type: "session"; id: string } => - Boolean( - event && - typeof event === "object" && - (event as { type?: unknown }).type === "session" && - typeof (event as { id?: unknown }).id === "string", - ), - )?.id ?? path.basename(file, ".jsonl"); - replaceSqliteSessionTranscriptEvents({ - agentId: "main", - sessionId, - transcriptPath: file, - events, - }); -} - -async function readTranscriptJsonl(file: string): Promise { - const scope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath: file }); - return scope ? exportSqliteSessionTranscriptJsonl(scope) : ""; -} - -describe("repairSessionFileIfNeeded", () => { - it("rewrites session files that contain malformed lines", async () => { - const { file } = await createTempSessionPath(); - const { header, message } = buildSessionHeaderAndMessage(); - - writeTranscriptEvents(file, [ - header, - message, - { type: "message", id: "corrupt", message: { role: null, content: "bad" } }, - ]); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - expect(result.repaired).toBe(true); - expect(result.droppedLines).toBe(1); - const repaired = await readTranscriptJsonl(file); - expect(repaired.trim().split("\n")).toHaveLength(2); - }); - - it("does not drop CRLF-terminated JSONL lines", async () => { - const { file } = await createTempSessionPath(); - const { header, message } = buildSessionHeaderAndMessage(); - const content = `${JSON.stringify(header)}\r\n${JSON.stringify(message)}\r\n`; - await fs.writeFile(file, content, "utf-8"); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - expect(result.repaired).toBe(false); - expect(result.droppedLines).toBe(0); - }); - - it("warns and skips repair when the session header is invalid", async () => { - const { file } = await createTempSessionPath(); - const badHeader = { - type: "message", - id: "msg-1", - timestamp: new Date().toISOString(), - message: { role: "user", content: "hello" }, - }; - writeTranscriptEvents(file, [badHeader]); - - const warn = vi.fn(); - const result = await repairSessionFileIfNeeded({ sessionFile: file, warn }); - - expect(result.repaired).toBe(false); - expect(result.reason).toBe("invalid session header"); - expect(warn).toHaveBeenCalledTimes(1); - expect(warn.mock.calls[0]?.[0]).toContain("invalid session header"); - }); - - it("returns a detailed reason when read errors are not ENOENT", async () => { - const { dir } = await createTempSessionPath(); - const warn = vi.fn(); - - const result = await repairSessionFileIfNeeded({ sessionFile: dir, warn }); - - expect(result.repaired).toBe(false); - expect(result.reason).toBe("missing SQLite transcript"); - expect(warn).not.toHaveBeenCalled(); - }); - - it("rewrites persisted assistant messages with empty content arrays", async () => { - const { file } = await createTempSessionPath(); - const { header, message } = buildSessionHeaderAndMessage(); - const poisonedAssistantEntry = { - type: "message", - id: "msg-2", - parentId: null, - timestamp: new Date().toISOString(), - message: { - role: "assistant", - content: [], - api: "bedrock-converse-stream", - provider: "amazon-bedrock", - model: "anthropic.claude-3-haiku-20240307-v1:0", - usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 }, - stopReason: "error", - errorMessage: "transient stream failure", - }, - }; - // Follow-up keeps this case focused on empty error-turn repair. - const followUp = { - type: "message", - id: "msg-3", - parentId: null, - timestamp: new Date().toISOString(), - message: { role: "user", content: "retry" }, - }; - writeTranscriptEvents(file, [header, message, poisonedAssistantEntry, followUp]); - - const debug = vi.fn(); - const result = await repairSessionFileIfNeeded({ sessionFile: file, debug }); - - expect(result.repaired).toBe(true); - expect(result.droppedLines).toBe(0); - expect(result.rewrittenAssistantMessages).toBe(1); - expect(debug).toHaveBeenCalledTimes(1); - const debugMessage = debug.mock.calls[0]?.[0] as string; - expect(debugMessage).toContain("rewrote 1 assistant message(s)"); - expect(debugMessage).not.toContain("dropped"); - - const repaired = await readTranscriptJsonl(file); - const repairedLines = repaired.trim().split("\n"); - expect(repairedLines).toHaveLength(4); - const repairedEntry: { message: { content: { type: string; text: string }[] } } = JSON.parse( - repairedLines[2], - ); - expect(repairedEntry.message.content).toEqual([ - { type: "text", text: "[assistant turn failed before producing content]" }, - ]); - }); - - it("rewrites blank-only user text messages to synthetic placeholder instead of dropping", async () => { - const { file } = await createTempSessionPath(); - const { header, message } = buildSessionHeaderAndMessage(); - const blankUserEntry = { - type: "message", - id: "msg-blank", - parentId: null, - timestamp: new Date().toISOString(), - message: { - role: "user", - content: [{ type: "text", text: "" }], - }, - }; - writeTranscriptEvents(file, [header, blankUserEntry, message]); - - const debug = vi.fn(); - const result = await repairSessionFileIfNeeded({ sessionFile: file, debug }); - - expect(result.repaired).toBe(true); - expect(result.rewrittenUserMessages).toBe(1); - expect(result.droppedBlankUserMessages).toBe(0); - expect(debug.mock.calls[0]?.[0]).toContain("rewrote 1 user message(s)"); - - const repaired = await readTranscriptJsonl(file); - const repairedLines = repaired.trim().split("\n"); - expect(repairedLines).toHaveLength(3); - const rewrittenEntry = JSON.parse(repairedLines[1]); - expect(rewrittenEntry.id).toBe("msg-blank"); - expect(rewrittenEntry.message.content).toEqual([ - { type: "text", text: BLANK_USER_FALLBACK_TEXT }, - ]); - }); - - it("rewrites blank string-content user messages to placeholder", async () => { - const { file } = await createTempSessionPath(); - const { header, message } = buildSessionHeaderAndMessage(); - const blankStringUserEntry = { - type: "message", - id: "msg-blank-str", - parentId: null, - timestamp: new Date().toISOString(), - message: { - role: "user", - content: " ", - }, - }; - writeTranscriptEvents(file, [header, blankStringUserEntry, message]); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - - expect(result.repaired).toBe(true); - expect(result.rewrittenUserMessages).toBe(1); - - const repaired = await readTranscriptJsonl(file); - const repairedLines = repaired.trim().split("\n"); - expect(repairedLines).toHaveLength(3); - const rewrittenEntry = JSON.parse(repairedLines[1]); - expect(rewrittenEntry.message.content).toBe(BLANK_USER_FALLBACK_TEXT); - }); - - it("removes blank user text blocks while preserving media blocks", async () => { - const { file } = await createTempSessionPath(); - const { header } = buildSessionHeaderAndMessage(); - const mediaUserEntry = { - type: "message", - id: "msg-media", - parentId: null, - timestamp: new Date().toISOString(), - message: { - role: "user", - content: [ - { type: "text", text: " " }, - { type: "image", data: "AA==", mimeType: "image/png" }, - ], - }, - }; - writeTranscriptEvents(file, [header, mediaUserEntry]); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - - expect(result.repaired).toBe(true); - expect(result.rewrittenUserMessages).toBe(1); - const repaired = await readTranscriptJsonl(file); - const repairedEntry = JSON.parse(repaired.trim().split("\n")[1] ?? "{}"); - expect(repairedEntry.message.content).toEqual([ - { type: "image", data: "AA==", mimeType: "image/png" }, - ]); - }); - - it("reports both drops and rewrites in the debug message when both occur", async () => { - const { file } = await createTempSessionPath(); - const { header } = buildSessionHeaderAndMessage(); - const poisonedAssistantEntry = { - type: "message", - id: "msg-2", - parentId: null, - timestamp: new Date().toISOString(), - message: { - role: "assistant", - content: [], - api: "bedrock-converse-stream", - provider: "amazon-bedrock", - model: "anthropic.claude-3-haiku-20240307-v1:0", - usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 }, - stopReason: "error", - }, - }; - writeTranscriptEvents(file, [ - header, - poisonedAssistantEntry, - { type: "message", id: "corrupt", message: { role: null, content: "bad" } }, - ]); - - const debug = vi.fn(); - const result = await repairSessionFileIfNeeded({ sessionFile: file, debug }); - - expect(result.repaired).toBe(true); - expect(result.droppedLines).toBe(1); - expect(result.rewrittenAssistantMessages).toBe(1); - const debugMessage = debug.mock.calls[0]?.[0] as string; - expect(debugMessage).toContain("dropped 1 malformed line(s)"); - expect(debugMessage).toContain("rewrote 1 assistant message(s)"); - }); - - it("does not rewrite silent-reply turns (stopReason=stop, content=[]) on disk", async () => { - const { file } = await createTempSessionPath(); - const { header } = buildSessionHeaderAndMessage(); - const silentReplyEntry = { - type: "message", - id: "msg-2", - parentId: null, - timestamp: new Date().toISOString(), - message: { - role: "assistant", - content: [], - api: "openai-responses", - provider: "ollama", - model: "glm-5.1:cloud", - usage: { input: 100, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 100 }, - stopReason: "stop", - }, - }; - // Follow-up keeps this case focused on silent-reply preservation. - const followUp = { - type: "message", - id: "msg-3", - parentId: null, - timestamp: new Date().toISOString(), - message: { role: "user", content: "follow up" }, - }; - const original = `${JSON.stringify(header)}\n${JSON.stringify(silentReplyEntry)}\n${JSON.stringify(followUp)}\n`; - await fs.writeFile(file, original, "utf-8"); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - - expect(result.repaired).toBe(false); - expect(result.rewrittenAssistantMessages ?? 0).toBe(0); - const after = await fs.readFile(file, "utf-8"); - expect(after).toBe(original); - }); - - it("preserves delivered trailing assistant messages in the session file", async () => { - const { file } = await createTempSessionPath(); - const { header, message } = buildSessionHeaderAndMessage(); - const assistantEntry = { - type: "message", - id: "msg-asst", - parentId: null, - timestamp: new Date().toISOString(), - message: { - role: "assistant", - content: [{ type: "text", text: "stale answer" }], - stopReason: "stop", - }, - }; - const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(assistantEntry)}\n`; - await fs.writeFile(file, original, "utf-8"); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - - expect(result.repaired).toBe(false); - - const after = await fs.readFile(file, "utf-8"); - expect(after).toBe(original); - }); - - it("preserves multiple consecutive delivered trailing assistant messages", async () => { - const { file } = await createTempSessionPath(); - const { header, message } = buildSessionHeaderAndMessage(); - const assistantEntry1 = { - type: "message", - id: "msg-asst-1", - parentId: null, - timestamp: new Date().toISOString(), - message: { - role: "assistant", - content: [{ type: "text", text: "first" }], - stopReason: "stop", - }, - }; - const assistantEntry2 = { - type: "message", - id: "msg-asst-2", - parentId: null, - timestamp: new Date().toISOString(), - message: { - role: "assistant", - content: [{ type: "text", text: "second" }], - stopReason: "stop", - }, - }; - const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(assistantEntry1)}\n${JSON.stringify(assistantEntry2)}\n`; - await fs.writeFile(file, original, "utf-8"); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - - expect(result.repaired).toBe(false); - - const after = await fs.readFile(file, "utf-8"); - expect(after).toBe(original); - }); - - it("does not trim non-trailing assistant messages", async () => { - const { file } = await createTempSessionPath(); - const { header, message } = buildSessionHeaderAndMessage(); - const assistantEntry = { - type: "message", - id: "msg-asst", - parentId: null, - timestamp: new Date().toISOString(), - message: { - role: "assistant", - content: [{ type: "text", text: "answer" }], - stopReason: "stop", - }, - }; - const userFollowUp = { - type: "message", - id: "msg-user-2", - parentId: null, - timestamp: new Date().toISOString(), - message: { role: "user", content: "follow up" }, - }; - const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(assistantEntry)}\n${JSON.stringify(userFollowUp)}\n`; - await fs.writeFile(file, original, "utf-8"); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - - expect(result.repaired).toBe(false); - }); - - it("preserves trailing assistant messages that contain tool calls", async () => { - const { file } = await createTempSessionPath(); - const { header, message } = buildSessionHeaderAndMessage(); - const toolCallAssistant = { - type: "message", - id: "msg-asst-tc", - parentId: null, - timestamp: new Date().toISOString(), - message: { - role: "assistant", - content: [ - { type: "text", text: "Let me check that." }, - { type: "toolCall", id: "call_1", name: "read", input: { path: "/tmp/test" } }, - ], - stopReason: "toolUse", - }, - }; - const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(toolCallAssistant)}\n`; - await fs.writeFile(file, original, "utf-8"); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - - expect(result.repaired).toBe(false); - const after = await fs.readFile(file, "utf-8"); - expect(after).toBe(original); - }); - - it("preserves adjacent trailing tool-call and text assistant messages", async () => { - const { file } = await createTempSessionPath(); - const { header, message } = buildSessionHeaderAndMessage(); - const toolCallAssistant = { - type: "message", - id: "msg-asst-tc", - parentId: null, - timestamp: new Date().toISOString(), - message: { - role: "assistant", - content: [{ type: "toolUse", id: "call_1", name: "read" }], - stopReason: "toolUse", - }, - }; - const plainAssistant = { - type: "message", - id: "msg-asst-plain", - parentId: null, - timestamp: new Date().toISOString(), - message: { - role: "assistant", - content: [{ type: "text", text: "stale" }], - stopReason: "stop", - }, - }; - const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(toolCallAssistant)}\n${JSON.stringify(plainAssistant)}\n`; - await fs.writeFile(file, original, "utf-8"); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - - expect(result.repaired).toBe(false); - - const after = await fs.readFile(file, "utf-8"); - expect(after).toBe(original); - }); - - it("preserves final text assistant turn that follows a tool-call/tool-result pair", async () => { - // Regression: a trailing assistant message with stopReason "stop" that follows a - // tool-call turn and its matching tool-result must never be trimmed by the repair - // pass. This is the exact sequence produced by any agent run that calls at least - // one tool before returning a final text response, and it must survive intact so - // subsequent user messages are parented to the correct leaf node. - const { file } = await createTempSessionPath(); - const { header, message } = buildSessionHeaderAndMessage(); - const toolCallAssistant = { - type: "message", - id: "msg-asst-tc", - parentId: "msg-1", - timestamp: new Date().toISOString(), - message: { - role: "assistant", - content: [{ type: "toolCall", id: "call_1", name: "get_tasks", input: {} }], - stopReason: "toolUse", - }, - }; - const toolResult = { - type: "message", - id: "msg-tool-result", - parentId: "msg-asst-tc", - timestamp: new Date().toISOString(), - message: { - role: "toolResult", - toolCallId: "call_1", - toolName: "get_tasks", - content: [{ type: "text", text: "Task A, Task B" }], - isError: false, - }, - }; - const finalAssistant = { - type: "message", - id: "msg-asst-final", - parentId: "msg-tool-result", - timestamp: new Date().toISOString(), - message: { - role: "assistant", - content: [{ type: "text", text: "Here are your tasks: Task A, Task B." }], - stopReason: "stop", - }, - }; - const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(toolCallAssistant)}\n${JSON.stringify(toolResult)}\n${JSON.stringify(finalAssistant)}\n`; - await fs.writeFile(file, original, "utf-8"); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - - expect(result.repaired).toBe(false); - - const after = await fs.readFile(file, "utf-8"); - expect(after).toBe(original); - }); - - it("preserves assistant-only session history after the header", async () => { - const { file } = await createTempSessionPath(); - const { header } = buildSessionHeaderAndMessage(); - const assistantEntry = { - type: "message", - id: "msg-asst", - parentId: null, - timestamp: new Date().toISOString(), - message: { - role: "assistant", - content: [{ type: "text", text: "orphan" }], - stopReason: "stop", - }, - }; - const original = `${JSON.stringify(header)}\n${JSON.stringify(assistantEntry)}\n`; - await fs.writeFile(file, original, "utf-8"); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - - expect(result.repaired).toBe(false); - - const after = await fs.readFile(file, "utf-8"); - expect(after).toBe(original); - }); - - it("is a no-op on a session that was already repaired", async () => { - const { file } = await createTempSessionPath(); - const { header } = buildSessionHeaderAndMessage(); - const healedEntry = { - type: "message", - id: "msg-2", - parentId: null, - timestamp: new Date().toISOString(), - message: { - role: "assistant", - content: [{ type: "text", text: "[assistant turn failed before producing content]" }], - api: "bedrock-converse-stream", - provider: "amazon-bedrock", - model: "anthropic.claude-3-haiku-20240307-v1:0", - usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 }, - stopReason: "error", - }, - }; - // Follow-up keeps this case focused on idempotent empty error-turn repair. - const followUp = { - type: "message", - id: "msg-3", - parentId: null, - timestamp: new Date().toISOString(), - message: { role: "user", content: "follow up" }, - }; - const original = `${JSON.stringify(header)}\n${JSON.stringify(healedEntry)}\n${JSON.stringify(followUp)}\n`; - await fs.writeFile(file, original, "utf-8"); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - - expect(result.repaired).toBe(false); - expect(result.rewrittenAssistantMessages ?? 0).toBe(0); - const after = await fs.readFile(file, "utf-8"); - expect(after).toBe(original); - }); - - it("drops type:message entries with null role instead of preserving them through repair (#77228)", async () => { - const { file } = await createTempSessionPath(); - const { header, message } = buildSessionHeaderAndMessage(); - - const nullRoleEntry = { - type: "message", - id: "corrupt-1", - parentId: null, - timestamp: new Date().toISOString(), - message: { role: null, content: "ignored" }, - }; - const missingRoleEntry = { - type: "message", - id: "corrupt-2", - parentId: null, - timestamp: new Date().toISOString(), - message: { content: "no role at all" }, - }; - const emptyRoleEntry = { - type: "message", - id: "corrupt-3", - parentId: null, - timestamp: new Date().toISOString(), - message: { role: " ", content: "blank role" }, - }; - - writeTranscriptEvents(file, [header, message, nullRoleEntry, missingRoleEntry, emptyRoleEntry]); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - - expect(result.repaired).toBe(true); - expect(result.droppedLines).toBe(3); - const after = await readTranscriptJsonl(file); - const lines = after.trimEnd().split("\n"); - expect(lines).toHaveLength(2); - expect(JSON.parse(lines[0])).toEqual(header); - expect(JSON.parse(lines[1])).toEqual(message); - expect(after).not.toContain('"role":null'); - }); - - it("drops a type:message entry whose message field is missing or non-object", async () => { - const { file } = await createTempSessionPath(); - const { header, message } = buildSessionHeaderAndMessage(); - - const missingMessage = { - type: "message", - id: "corrupt-4", - parentId: null, - timestamp: new Date().toISOString(), - }; - const stringMessage = { - type: "message", - id: "corrupt-5", - parentId: null, - timestamp: new Date().toISOString(), - message: "not an object", - }; - - writeTranscriptEvents(file, [header, message, missingMessage, stringMessage]); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - - expect(result.repaired).toBe(true); - expect(result.droppedLines).toBe(2); - - const after = await readTranscriptJsonl(file); - const lines = after.trimEnd().split("\n"); - expect(lines).toHaveLength(2); - }); - - it("preserves non-`message` envelope types (e.g. compactionSummary, custom) without role inspection", async () => { - const { file } = await createTempSessionPath(); - const { header, message } = buildSessionHeaderAndMessage(); - - const summary = { - type: "summary", - id: "summary-1", - timestamp: new Date().toISOString(), - summary: "opaque summary blob", - }; - const custom = { - type: "custom", - id: "custom-1", - customType: "model-snapshot", - timestamp: new Date().toISOString(), - data: { provider: "openai", modelApi: "openai-responses", modelId: "gpt-5" }, - }; - - const content = [ - JSON.stringify(header), - JSON.stringify(message), - JSON.stringify(summary), - JSON.stringify(custom), - ].join("\n"); - await fs.writeFile(file, `${content}\n`, "utf-8"); - - const result = await repairSessionFileIfNeeded({ sessionFile: file }); - - expect(result.repaired).toBe(false); - expect(result.droppedLines).toBe(0); - const after = await fs.readFile(file, "utf-8"); - expect(after).toBe(`${content}\n`); - }); -}); diff --git a/src/agents/session-file-repair.ts b/src/agents/session-file-repair.ts deleted file mode 100644 index 8fba0236028..00000000000 --- a/src/agents/session-file-repair.ts +++ /dev/null @@ -1,296 +0,0 @@ -import path from "node:path"; -import { - loadSqliteSessionTranscriptEvents, - replaceSqliteSessionTranscriptEvents, - resolveSqliteSessionTranscriptScopeForPath, -} from "../config/sessions/transcript-store.sqlite.js"; -import { STREAM_ERROR_FALLBACK_TEXT } from "./stream-message-shared.js"; - -/** Placeholder for blank user messages — preserves the user turn so strict - * providers that require at least one user message don't reject the transcript. */ -export const BLANK_USER_FALLBACK_TEXT = "(continue)"; - -type RepairReport = { - repaired: boolean; - droppedLines: number; - rewrittenAssistantMessages?: number; - droppedBlankUserMessages?: number; - rewrittenUserMessages?: number; - backupPath?: string; - reason?: string; -}; - -// The sentinel text is shared with stream-message-shared.ts and -// replay-history.ts so a repaired entry is byte-identical to a live -// stream-error turn, keeping the repair pass idempotent. - -type SessionMessageEntry = { - type: "message"; - message: { role: string; content?: unknown } & Record; -} & Record; - -function isSessionHeader(entry: unknown): entry is { type: string; id: string } { - if (!entry || typeof entry !== "object") { - return false; - } - const record = entry as { type?: unknown; id?: unknown }; - return record.type === "session" && typeof record.id === "string" && record.id.length > 0; -} - -/** - * Detect a `type: "message"` entry whose `message.role` is missing, `null`, or - * not a non-empty string. Such entries surface in the wild as "null role" - * JSONL corruption (e.g. #77228 reported transcripts that contained 935+ - * entries with null roles after an earlier failure). They cannot be replayed - * to any provider — every provider router branches on `message.role` — and - * preserving them through repair just relocates the corruption from the - * original file into the post-repair file. Treat them as malformed lines: - * drop during repair so the cleaned transcript no longer carries them. - */ -function isStructurallyInvalidMessageEntry(entry: unknown): boolean { - if (!entry || typeof entry !== "object") { - return false; - } - const record = entry as { type?: unknown; message?: unknown }; - if (record.type !== "message") { - return false; - } - if (!record.message || typeof record.message !== "object") { - return true; - } - const role = (record.message as { role?: unknown }).role; - return typeof role !== "string" || role.trim().length === 0; -} - -function isAssistantEntryWithEmptyContent(entry: unknown): entry is SessionMessageEntry { - if (!entry || typeof entry !== "object") { - return false; - } - const record = entry as { type?: unknown; message?: unknown }; - if (record.type !== "message" || !record.message || typeof record.message !== "object") { - return false; - } - const message = record.message as { - role?: unknown; - content?: unknown; - stopReason?: unknown; - }; - if (message.role !== "assistant") { - return false; - } - if (!Array.isArray(message.content) || message.content.length !== 0) { - return false; - } - // Only error stops — clean stops with empty content (NO_REPLY path) are - // valid silent replies that must not be overwritten with synthetic text. - return message.stopReason === "error"; -} - -function rewriteAssistantEntryWithEmptyContent(entry: SessionMessageEntry): SessionMessageEntry { - return { - ...entry, - message: { - ...entry.message, - content: [{ type: "text", text: STREAM_ERROR_FALLBACK_TEXT }], - }, - }; -} - -type UserEntryRepair = - | { kind: "drop" } - | { kind: "rewrite"; entry: SessionMessageEntry } - | { kind: "keep" }; - -function repairUserEntryWithBlankTextContent(entry: SessionMessageEntry): UserEntryRepair { - const content = entry.message.content; - if (typeof content === "string") { - if (content.trim()) { - return { kind: "keep" }; - } - return { - kind: "rewrite", - entry: { - ...entry, - message: { - ...entry.message, - content: BLANK_USER_FALLBACK_TEXT, - }, - }, - }; - } - if (!Array.isArray(content)) { - return { kind: "keep" }; - } - - let touched = false; - const nextContent = content.filter((block) => { - if (!block || typeof block !== "object") { - return true; - } - if ((block as { type?: unknown }).type !== "text") { - return true; - } - const text = (block as { text?: unknown }).text; - if (typeof text !== "string" || text.trim().length > 0) { - return true; - } - touched = true; - return false; - }); - if (nextContent.length === 0) { - return { - kind: "rewrite", - entry: { - ...entry, - message: { - ...entry.message, - content: [{ type: "text", text: BLANK_USER_FALLBACK_TEXT }], - }, - }, - }; - } - if (!touched) { - return { kind: "keep" }; - } - return { - kind: "rewrite", - entry: { - ...entry, - message: { - ...entry.message, - content: nextContent, - }, - }, - }; -} - -function buildRepairSummaryParts(params: { - droppedLines: number; - rewrittenAssistantMessages: number; - droppedBlankUserMessages: number; - rewrittenUserMessages: number; -}): string { - const parts: string[] = []; - if (params.droppedLines > 0) { - parts.push(`dropped ${params.droppedLines} malformed line(s)`); - } - if (params.rewrittenAssistantMessages > 0) { - parts.push(`rewrote ${params.rewrittenAssistantMessages} assistant message(s)`); - } - if (params.droppedBlankUserMessages > 0) { - parts.push(`dropped ${params.droppedBlankUserMessages} blank user message(s)`); - } - if (params.rewrittenUserMessages > 0) { - parts.push(`rewrote ${params.rewrittenUserMessages} user message(s)`); - } - return parts.length > 0 ? parts.join(", ") : "no changes"; -} - -export async function repairSessionFileIfNeeded(params: { - sessionFile: string; - debug?: (message: string) => void; - warn?: (message: string) => void; -}): Promise { - const sessionFile = params.sessionFile.trim(); - if (!sessionFile) { - return { repaired: false, droppedLines: 0, reason: "missing session file" }; - } - - const scope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath: sessionFile }); - if (!scope) { - return { repaired: false, droppedLines: 0, reason: "missing SQLite transcript" }; - } - - const storedEntries = loadSqliteSessionTranscriptEvents(scope).map((entry) => entry.event); - const entries: unknown[] = []; - let droppedLines = 0; - let rewrittenAssistantMessages = 0; - let droppedBlankUserMessages = 0; - let rewrittenUserMessages = 0; - - for (const entry of storedEntries) { - if (isStructurallyInvalidMessageEntry(entry)) { - // Drop "null role" / missing-role message entries the same way the old - // JSONL repair dropped malformed lines: providers cannot replay them. - droppedLines += 1; - continue; - } - if (isAssistantEntryWithEmptyContent(entry)) { - entries.push(rewriteAssistantEntryWithEmptyContent(entry)); - rewrittenAssistantMessages += 1; - continue; - } - if ( - entry && - typeof entry === "object" && - (entry as { type?: unknown }).type === "message" && - typeof (entry as { message?: unknown }).message === "object" && - ((entry as { message: { role?: unknown } }).message?.role ?? undefined) === "user" - ) { - const repairedUser = repairUserEntryWithBlankTextContent(entry as SessionMessageEntry); - if (repairedUser.kind === "drop") { - droppedBlankUserMessages += 1; - continue; - } - if (repairedUser.kind === "rewrite") { - entries.push(repairedUser.entry); - rewrittenUserMessages += 1; - continue; - } - } - entries.push(entry); - } - - if (entries.length === 0) { - return { repaired: false, droppedLines, reason: "empty session file" }; - } - - if (!isSessionHeader(entries[0])) { - params.warn?.( - `session file repair skipped: invalid session header (${path.basename(sessionFile)})`, - ); - return { repaired: false, droppedLines, reason: "invalid session header" }; - } - - if ( - droppedLines === 0 && - rewrittenAssistantMessages === 0 && - droppedBlankUserMessages === 0 && - rewrittenUserMessages === 0 - ) { - return { repaired: false, droppedLines: 0 }; - } - - try { - replaceSqliteSessionTranscriptEvents({ - ...scope, - transcriptPath: sessionFile, - events: entries, - }); - } catch (err) { - return { - repaired: false, - droppedLines, - rewrittenAssistantMessages, - droppedBlankUserMessages, - rewrittenUserMessages, - reason: `repair failed: ${err instanceof Error ? err.message : "unknown error"}`, - }; - } - - params.debug?.( - `session file repaired: ${buildRepairSummaryParts({ - droppedLines, - rewrittenAssistantMessages, - droppedBlankUserMessages, - rewrittenUserMessages, - })} (${path.basename(sessionFile)})`, - ); - return { - repaired: true, - droppedLines, - rewrittenAssistantMessages, - droppedBlankUserMessages, - rewrittenUserMessages, - }; -} diff --git a/src/agents/session-write-lock-error.ts b/src/agents/session-write-lock-error.ts deleted file mode 100644 index 66db9d22b53..00000000000 --- a/src/agents/session-write-lock-error.ts +++ /dev/null @@ -1,29 +0,0 @@ -const SESSION_WRITE_LOCK_TIMEOUT_CODE = "OPENCLAW_SESSION_WRITE_LOCK_TIMEOUT"; - -export class SessionWriteLockTimeoutError extends Error { - readonly code = SESSION_WRITE_LOCK_TIMEOUT_CODE; - readonly timeoutMs: number; - readonly owner: string; - readonly lockPath: string; - - constructor(params: { timeoutMs: number; owner: string; lockPath: string }) { - super( - `session file locked (timeout ${params.timeoutMs}ms): ${params.owner} ${params.lockPath}`, - ); - this.name = "SessionWriteLockTimeoutError"; - this.timeoutMs = params.timeoutMs; - this.owner = params.owner; - this.lockPath = params.lockPath; - } -} - -export function isSessionWriteLockTimeoutError(err: unknown): boolean { - return ( - err instanceof SessionWriteLockTimeoutError || - Boolean( - err && - typeof err === "object" && - (err as { code?: unknown }).code === SESSION_WRITE_LOCK_TIMEOUT_CODE, - ) - ); -} diff --git a/src/agents/session-write-lock.test.ts b/src/agents/session-write-lock.test.ts deleted file mode 100644 index bc4e73d9884..00000000000 --- a/src/agents/session-write-lock.test.ts +++ /dev/null @@ -1,766 +0,0 @@ -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; -import { afterEach, beforeAll, describe, expect, it, vi } from "vitest"; - -const FAKE_STARTTIME = 12345; -let __testing: typeof import("./session-write-lock.js").__testing; -let acquireSessionWriteLock: typeof import("./session-write-lock.js").acquireSessionWriteLock; -let cleanStaleLockFiles: typeof import("./session-write-lock.js").cleanStaleLockFiles; -let resetSessionWriteLockStateForTest: typeof import("./session-write-lock.js").resetSessionWriteLockStateForTest; -let resolveSessionLockMaxHoldFromTimeout: typeof import("./session-write-lock.js").resolveSessionLockMaxHoldFromTimeout; -let resolveSessionWriteLockAcquireTimeoutMs: typeof import("./session-write-lock.js").resolveSessionWriteLockAcquireTimeoutMs; - -async function expectLockRemovedOnlyAfterFinalRelease(params: { - lockPath: string; - firstLock: { release: () => Promise }; - secondLock: { release: () => Promise }; -}) { - await expect(fs.access(params.lockPath)).resolves.toBeUndefined(); - await params.firstLock.release(); - await expect(fs.access(params.lockPath)).resolves.toBeUndefined(); - await params.secondLock.release(); - await expectPathMissing(params.lockPath); -} - -async function expectPathMissing(targetPath: string): Promise { - try { - await fs.access(targetPath); - } catch (error) { - expect((error as NodeJS.ErrnoException).code).toBe("ENOENT"); - return; - } - throw new Error(`Expected path to be missing: ${targetPath}`); -} - -async function expectCurrentPidOwnsLock(params: { - sessionFile: string; - timeoutMs: number; - staleMs?: number; -}) { - const { sessionFile, timeoutMs, staleMs } = params; - const lockPath = `${sessionFile}.lock`; - const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs, staleMs }); - const raw = await fs.readFile(lockPath, "utf8"); - const payload = JSON.parse(raw) as { pid: number }; - expect(payload.pid).toBe(process.pid); - await lock.release(); -} - -async function withTempSessionLockFile( - run: (params: { root: string; sessionFile: string; lockPath: string }) => Promise, -) { - const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-")); - try { - const sessionFile = path.join(root, "sessions.json"); - await run({ root, sessionFile, lockPath: `${sessionFile}.lock` }); - } finally { - await fs.rm(root, { recursive: true, force: true }); - } -} - -async function writeCurrentProcessLock(lockPath: string, extra?: Record) { - await fs.writeFile( - lockPath, - JSON.stringify({ - pid: process.pid, - createdAt: new Date().toISOString(), - ...extra, - }), - "utf8", - ); -} - -async function withSymlinkedSessionPaths( - run: (params: { - sessionReal: string; - sessionLink: string; - realLockPath: string; - linkLockPath: string; - }) => Promise, -) { - if (process.platform === "win32") { - return; - } - - const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-")); - try { - const realDir = path.join(root, "real"); - const linkDir = path.join(root, "link"); - await fs.mkdir(realDir, { recursive: true }); - await fs.symlink(realDir, linkDir); - - const sessionReal = path.join(realDir, "sessions.json"); - const sessionLink = path.join(linkDir, "sessions.json"); - await run({ - sessionReal, - sessionLink, - realLockPath: `${sessionReal}.lock`, - linkLockPath: `${sessionLink}.lock`, - }); - } finally { - await fs.rm(root, { recursive: true, force: true }); - } -} - -async function expectActiveInProcessLockIsNotReclaimed(params?: { - legacyStarttime?: unknown; -}): Promise { - await withTempSessionLockFile(async ({ sessionFile, lockPath }) => { - const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); - const lockPayload = { - pid: process.pid, - createdAt: new Date().toISOString(), - ...(params && "legacyStarttime" in params ? { starttime: params.legacyStarttime } : {}), - }; - await fs.writeFile(lockPath, JSON.stringify(lockPayload), "utf8"); - - await expect( - acquireSessionWriteLock({ - sessionFile, - timeoutMs: 5, - allowReentrant: false, - }), - ).rejects.toThrow(/session file locked/); - await lock.release(); - }); -} - -describe("acquireSessionWriteLock", () => { - beforeAll(async () => { - ({ - __testing, - acquireSessionWriteLock, - cleanStaleLockFiles, - resetSessionWriteLockStateForTest, - resolveSessionLockMaxHoldFromTimeout, - resolveSessionWriteLockAcquireTimeoutMs, - } = await import("./session-write-lock.js")); - }); - - afterEach(() => { - resetSessionWriteLockStateForTest(); - vi.clearAllMocks(); - }); - - function pinCurrentProcessStartTimeForTest(): void { - __testing.setProcessStartTimeResolverForTest((pid) => - pid === process.pid ? FAKE_STARTTIME : null, - ); - } - it("reuses locks across symlinked session paths", async () => { - await withSymlinkedSessionPaths( - async ({ sessionReal, sessionLink, realLockPath, linkLockPath }) => { - const lockA = await acquireSessionWriteLock({ - sessionFile: sessionReal, - timeoutMs: 500, - allowReentrant: true, - }); - const lockB = await acquireSessionWriteLock({ - sessionFile: sessionLink, - timeoutMs: 500, - allowReentrant: true, - }); - - await expect(fs.access(realLockPath)).resolves.toBeUndefined(); - await expect(fs.access(linkLockPath)).resolves.toBeUndefined(); - const [realCanonicalLockPath, linkCanonicalLockPath] = await Promise.all([ - fs.realpath(realLockPath), - fs.realpath(linkLockPath), - ]); - expect(linkCanonicalLockPath).toBe(realCanonicalLockPath); - await expectLockRemovedOnlyAfterFinalRelease({ - lockPath: realLockPath, - firstLock: lockA, - secondLock: lockB, - }); - }, - ); - }); - - it("keeps the lock file until the last release", async () => { - await withTempSessionLockFile(async ({ sessionFile, lockPath }) => { - const lockA = await acquireSessionWriteLock({ - sessionFile, - timeoutMs: 500, - allowReentrant: true, - }); - const lockB = await acquireSessionWriteLock({ - sessionFile, - timeoutMs: 500, - allowReentrant: true, - }); - - await expectLockRemovedOnlyAfterFinalRelease({ - lockPath, - firstLock: lockA, - secondLock: lockB, - }); - }); - }); - - it("does not reenter locks by default in the same process", async () => { - await withTempSessionLockFile(async ({ sessionFile }) => { - const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); - await expect( - acquireSessionWriteLock({ sessionFile, timeoutMs: 5, staleMs: 60_000 }), - ).rejects.toThrow(/session file locked/); - await lock.release(); - }); - }); - - it("does not reenter locks by default through symlinked session paths", async () => { - await withSymlinkedSessionPaths(async ({ sessionReal, sessionLink }) => { - const lock = await acquireSessionWriteLock({ sessionFile: sessionReal, timeoutMs: 500 }); - - await expect( - acquireSessionWriteLock({ sessionFile: sessionLink, timeoutMs: 5, staleMs: 60_000 }), - ).rejects.toThrow(/session file locked/); - - await lock.release(); - }); - }); - - it("allows a new default lock acquisition after the held lock is released", async () => { - await withTempSessionLockFile(async ({ sessionFile }) => { - const lockA = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); - await expect( - acquireSessionWriteLock({ sessionFile, timeoutMs: 5, staleMs: 60_000 }), - ).rejects.toThrow(/session file locked/); - await lockA.release(); - - const lockB = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); - await lockB.release(); - }); - }); - - it("reclaims stale lock files", async () => { - const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-")); - try { - const sessionFile = path.join(root, "sessions.json"); - const lockPath = `${sessionFile}.lock`; - await fs.writeFile( - lockPath, - JSON.stringify({ pid: 2 ** 30, createdAt: new Date(Date.now() - 60_000).toISOString() }), - "utf8", - ); - - await expectCurrentPidOwnsLock({ sessionFile, timeoutMs: 500, staleMs: 10 }); - } finally { - await fs.rm(root, { recursive: true, force: true }); - } - }); - - it("does not reclaim fresh malformed lock files during contention", async () => { - const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-")); - try { - const sessionFile = path.join(root, "sessions.json"); - const lockPath = `${sessionFile}.lock`; - await fs.writeFile(lockPath, "{}", "utf8"); - - await expect( - acquireSessionWriteLock({ sessionFile, timeoutMs: 5, staleMs: 60_000 }), - ).rejects.toThrow(/session file locked/); - await expect(fs.access(lockPath)).resolves.toBeUndefined(); - } finally { - await fs.rm(root, { recursive: true, force: true }); - } - }); - - it("reclaims payload-less orphan lock files after the short init grace", async () => { - await withTempSessionLockFile(async ({ sessionFile, lockPath }) => { - await fs.writeFile(lockPath, "", "utf8"); - const orphanDate = new Date(Date.now() - 10_000); - await fs.utimes(lockPath, orphanDate, orphanDate); - - const lock = await acquireSessionWriteLock({ - sessionFile, - timeoutMs: 10_000, - staleMs: 60_000, - }); - const raw = await fs.readFile(lockPath, "utf8"); - const payload = JSON.parse(raw) as { pid?: unknown }; - expect(payload.pid).toBe(process.pid); - await lock.release(); - }); - }); - - it("reclaims malformed lock files once they are old enough", async () => { - await withTempSessionLockFile(async ({ sessionFile, lockPath }) => { - await fs.writeFile(lockPath, "{}", "utf8"); - const staleDate = new Date(Date.now() - 2 * 60_000); - await fs.utimes(lockPath, staleDate, staleDate); - - const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500, staleMs: 10_000 }); - await lock.release(); - await expectPathMissing(lockPath); - }); - }); - - it("watchdog releases stale in-process locks", async () => { - const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-")); - const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true); - try { - const sessionFile = path.join(root, "session.jsonl"); - const lockPath = `${sessionFile}.lock`; - const lockA = await acquireSessionWriteLock({ - sessionFile, - timeoutMs: 500, - maxHoldMs: 1, - }); - - const released = await __testing.runLockWatchdogCheck(Date.now() + 1000); - expect(released).toBeGreaterThanOrEqual(1); - await expectPathMissing(lockPath); - - const lockB = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); - await expect(fs.access(lockPath)).resolves.toBeUndefined(); - - // Old release handle must not affect the new lock. - await expectLockRemovedOnlyAfterFinalRelease({ - lockPath, - firstLock: lockA, - secondLock: lockB, - }); - } finally { - stderrSpy.mockRestore(); - await fs.rm(root, { recursive: true, force: true }); - } - }); - - it("removes lock files during process-exit cleanup", async () => { - await withTempSessionLockFile(async ({ sessionFile, lockPath }) => { - const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); - - __testing.releaseAllLocksSync(); - - await expectPathMissing(lockPath); - await lock.release(); - }); - }); - - it("derives max hold from timeout plus grace", () => { - expect(resolveSessionLockMaxHoldFromTimeout({ timeoutMs: 600_000 })).toBe(720_000); - expect(resolveSessionLockMaxHoldFromTimeout({ timeoutMs: 1_000, minMs: 5_000 })).toBe(121_000); - }); - - it("resolves the session write-lock acquire timeout", () => { - expect(resolveSessionWriteLockAcquireTimeoutMs()).toBe(60_000); - expect( - resolveSessionWriteLockAcquireTimeoutMs({ - session: { writeLock: { acquireTimeoutMs: 90_000 } }, - }), - ).toBe(90_000); - expect( - resolveSessionWriteLockAcquireTimeoutMs({ - session: { writeLock: { acquireTimeoutMs: 0 } }, - }), - ).toBe(60_000); - }); - - it("clamps max hold for effectively no-timeout runs", () => { - expect( - resolveSessionLockMaxHoldFromTimeout({ - timeoutMs: 2_147_000_000, - }), - ).toBe(2_147_000_000); - }); - - it("cleans stale .jsonl lock files in sessions directories", async () => { - const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-")); - const sessionsDir = path.join(root, "sessions"); - await fs.mkdir(sessionsDir, { recursive: true }); - - const nowMs = Date.now(); - const staleDeadLock = path.join(sessionsDir, "dead.jsonl.lock"); - const staleAliveLock = path.join(sessionsDir, "old-live.jsonl.lock"); - const freshAliveLock = path.join(sessionsDir, "fresh-live.jsonl.lock"); - - try { - await fs.writeFile( - staleDeadLock, - JSON.stringify({ - pid: 999_999, - createdAt: new Date(nowMs - 120_000).toISOString(), - }), - "utf8", - ); - await fs.writeFile( - staleAliveLock, - JSON.stringify({ - pid: process.pid, - createdAt: new Date(nowMs - 120_000).toISOString(), - }), - "utf8", - ); - await fs.writeFile( - freshAliveLock, - JSON.stringify({ - pid: process.pid, - createdAt: new Date(nowMs - 1_000).toISOString(), - }), - "utf8", - ); - - const result = await cleanStaleLockFiles({ - sessionsDir, - staleMs: 30_000, - nowMs, - removeStale: true, - readOwnerProcessArgs: () => ["node", "/opt/openclaw/openclaw.mjs", "agent"], - }); - - expect(result.locks).toHaveLength(3); - expect(result.cleaned).toHaveLength(2); - expect(result.cleaned.map((entry) => path.basename(entry.lockPath)).toSorted()).toEqual([ - "dead.jsonl.lock", - "old-live.jsonl.lock", - ]); - - await expectPathMissing(staleDeadLock); - await expectPathMissing(staleAliveLock); - await expect(fs.access(freshAliveLock)).resolves.toBeUndefined(); - } finally { - await fs.rm(root, { recursive: true, force: true }); - } - }); - - it("cleans fresh live .jsonl lock files owned by a non-OpenClaw process", async () => { - const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-")); - const sessionsDir = path.join(root, "sessions"); - await fs.mkdir(sessionsDir, { recursive: true }); - - const nowMs = Date.now(); - const falseLiveLock = path.join(sessionsDir, "false-live.jsonl.lock"); - - try { - await fs.writeFile( - falseLiveLock, - JSON.stringify({ - pid: process.pid, - createdAt: new Date(nowMs).toISOString(), - }), - "utf8", - ); - - const result = await cleanStaleLockFiles({ - sessionsDir, - staleMs: 30_000, - nowMs, - removeStale: true, - readOwnerProcessArgs: () => ["python", "worker.py"], - }); - - expect(result.locks).toHaveLength(1); - expect(result.cleaned.map((entry) => path.basename(entry.lockPath))).toEqual([ - "false-live.jsonl.lock", - ]); - expect(result.cleaned[0]?.staleReasons).toContain("non-openclaw-owner"); - await expect(fs.access(falseLiveLock)).rejects.toThrow(); - } finally { - await fs.rm(root, { recursive: true, force: true }); - } - }); - - it("cleans fresh live .jsonl lock files owned by generic non-OpenClaw entrypoints", async () => { - const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-")); - const sessionsDir = path.join(root, "sessions"); - await fs.mkdir(sessionsDir, { recursive: true }); - - const nowMs = Date.now(); - const falseLiveLock = path.join(sessionsDir, "false-live-generic-entry.jsonl.lock"); - - try { - await fs.writeFile( - falseLiveLock, - JSON.stringify({ - pid: process.pid, - createdAt: new Date(nowMs).toISOString(), - }), - "utf8", - ); - - const result = await cleanStaleLockFiles({ - sessionsDir, - staleMs: 30_000, - nowMs, - removeStale: true, - readOwnerProcessArgs: () => ["node", "/srv/app/dist/index.js"], - }); - - expect(result.cleaned.map((entry) => path.basename(entry.lockPath))).toEqual([ - "false-live-generic-entry.jsonl.lock", - ]); - expect(result.cleaned[0]?.staleReasons).toContain("non-openclaw-owner"); - await expect(fs.access(falseLiveLock)).rejects.toThrow(); - } finally { - await fs.rm(root, { recursive: true, force: true }); - } - }); - - it("keeps fresh live .jsonl lock files with OpenClaw or unknown owners", async () => { - const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-")); - const sessionsDir = path.join(root, "sessions"); - await fs.mkdir(sessionsDir, { recursive: true }); - - const nowMs = Date.now(); - const openclawLock = path.join(sessionsDir, "openclaw-live.jsonl.lock"); - const gatewayLock = path.join(sessionsDir, "gateway-live.jsonl.lock"); - const unknownLock = path.join(sessionsDir, "unknown-live.jsonl.lock"); - - try { - await fs.writeFile( - openclawLock, - JSON.stringify({ - pid: process.pid, - createdAt: new Date(nowMs).toISOString(), - }), - "utf8", - ); - const openclawResult = await cleanStaleLockFiles({ - sessionsDir, - staleMs: 30_000, - nowMs, - removeStale: true, - readOwnerProcessArgs: () => ["node", "/opt/openclaw/openclaw.mjs", "agent"], - }); - - expect(openclawResult.cleaned).toEqual([]); - await expect(fs.access(openclawLock)).resolves.toBeUndefined(); - - await fs.rm(openclawLock, { force: true }); - await fs.writeFile( - gatewayLock, - JSON.stringify({ - pid: process.pid, - createdAt: new Date(nowMs).toISOString(), - }), - "utf8", - ); - const gatewayResult = await cleanStaleLockFiles({ - sessionsDir, - staleMs: 30_000, - nowMs, - removeStale: true, - readOwnerProcessArgs: () => ["node", "dist/index.js", "gateway", "run"], - }); - - expect(gatewayResult.cleaned).toEqual([]); - await expect(fs.access(gatewayLock)).resolves.toBeUndefined(); - - await fs.rm(gatewayLock, { force: true }); - await fs.writeFile( - unknownLock, - JSON.stringify({ - pid: process.pid, - createdAt: new Date(nowMs).toISOString(), - }), - "utf8", - ); - const unknownResult = await cleanStaleLockFiles({ - sessionsDir, - staleMs: 30_000, - nowMs, - removeStale: true, - readOwnerProcessArgs: () => null, - }); - - expect(unknownResult.cleaned).toEqual([]); - await expect(fs.access(unknownLock)).resolves.toBeUndefined(); - } finally { - await fs.rm(root, { recursive: true, force: true }); - } - }); - - it("cleans untracked current-process .jsonl lock files with matching starttime", async () => { - pinCurrentProcessStartTimeForTest(); - const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-")); - const sessionsDir = path.join(root, "sessions"); - await fs.mkdir(sessionsDir, { recursive: true }); - - const nowMs = Date.now(); - const orphanSelfLock = path.join(sessionsDir, "orphan-self.jsonl.lock"); - - try { - await fs.writeFile( - orphanSelfLock, - JSON.stringify({ - pid: process.pid, - createdAt: new Date(nowMs).toISOString(), - starttime: FAKE_STARTTIME, - }), - "utf8", - ); - - const result = await cleanStaleLockFiles({ - sessionsDir, - staleMs: 30_000, - nowMs, - removeStale: true, - }); - - expect(result.locks).toHaveLength(1); - expect(result.cleaned.map((entry) => path.basename(entry.lockPath))).toEqual([ - "orphan-self.jsonl.lock", - ]); - expect(result.cleaned[0]?.staleReasons).toContain("orphan-self-pid"); - await expectPathMissing(orphanSelfLock); - } finally { - await fs.rm(root, { recursive: true, force: true }); - } - }); - - it("removes held locks on termination signals", async () => { - const signals = ["SIGINT", "SIGTERM", "SIGQUIT", "SIGABRT"] as const; - const originalKill = process.kill.bind(process); - process.kill = ((_pid: number, _signal?: NodeJS.Signals) => true) as typeof process.kill; - try { - for (const signal of signals) { - const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-cleanup-")); - try { - const sessionFile = path.join(root, "sessions.json"); - const lockPath = `${sessionFile}.lock`; - await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); - const keepAlive = () => {}; - if (signal === "SIGINT") { - process.on(signal, keepAlive); - } - - __testing.handleTerminationSignal(signal); - - await expectPathMissing(lockPath); - if (signal === "SIGINT") { - process.off(signal, keepAlive); - } - } finally { - await fs.rm(root, { recursive: true, force: true }); - } - } - } finally { - process.kill = originalKill; - } - }); - - it("reclaims lock files with recycled PIDs", async () => { - if (process.platform !== "linux") { - return; - } - await withTempSessionLockFile(async ({ sessionFile, lockPath }) => { - pinCurrentProcessStartTimeForTest(); - // Write a lock with a live PID (current process) but a wrong starttime, - // simulating PID recycling: the PID is alive but belongs to a different - // process than the one that created the lock. - await writeCurrentProcessLock(lockPath, { starttime: 999_999_999 }); - - await expectCurrentPidOwnsLock({ sessionFile, timeoutMs: 500 }); - }); - }); - - it("reclaims orphan lock files without starttime when PID matches current process", async () => { - await withTempSessionLockFile(async ({ sessionFile, lockPath }) => { - // Simulate an old-format lock file left behind by a previous process - // instance that reused the same PID (common in containers). - await writeCurrentProcessLock(lockPath); - - await expectCurrentPidOwnsLock({ sessionFile, timeoutMs: 500 }); - }); - }); - - it("reclaims untracked current-process lock files with matching starttime", async () => { - await withTempSessionLockFile(async ({ sessionFile, lockPath }) => { - pinCurrentProcessStartTimeForTest(); - await writeCurrentProcessLock(lockPath, { starttime: FAKE_STARTTIME }); - - await expectCurrentPidOwnsLock({ sessionFile, timeoutMs: 500 }); - }); - }); - - it("does not reclaim active in-process lock files without starttime", async () => { - await expectActiveInProcessLockIsNotReclaimed(); - }); - - it("does not reclaim active in-process lock files with malformed starttime", async () => { - await expectActiveInProcessLockIsNotReclaimed({ legacyStarttime: 123.5 }); - }); - - it("does not reclaim active in-process lock files with matching starttime", async () => { - pinCurrentProcessStartTimeForTest(); - await expectActiveInProcessLockIsNotReclaimed({ legacyStarttime: FAKE_STARTTIME }); - }); - - it("registers cleanup for SIGQUIT and SIGABRT", () => { - expect(__testing.cleanupSignals).toContain("SIGQUIT"); - expect(__testing.cleanupSignals).toContain("SIGABRT"); - }); - it("cleans up locks on SIGINT without removing other handlers", async () => { - const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-")); - const originalKill = process.kill.bind(process); - const killCalls: Array = []; - let otherHandlerCalled = false; - - process.kill = ((pid: number, signal?: NodeJS.Signals) => { - killCalls.push(signal); - return true; - }) as typeof process.kill; - - const otherHandler = () => { - otherHandlerCalled = true; - }; - - process.on("SIGINT", otherHandler); - - try { - const sessionFile = path.join(root, "sessions.json"); - const lockPath = `${sessionFile}.lock`; - await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); - - __testing.handleTerminationSignal("SIGINT"); - - await expectPathMissing(lockPath); - expect(otherHandlerCalled).toBe(false); - expect(killCalls).toStrictEqual([]); - } finally { - process.off("SIGINT", otherHandler); - process.kill = originalKill; - await fs.rm(root, { recursive: true, force: true }); - } - }); - - it("cleans up locks on exit", async () => { - await withTempSessionLockFile(async ({ sessionFile, lockPath }) => { - await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); - - process.emit("exit", 0); - - await expectPathMissing(lockPath); - }); - }); - - it("does not accumulate exit listeners across reset cycles", async () => { - const baselineExitListeners = process.listenerCount("exit"); - - await withTempSessionLockFile(async ({ sessionFile }) => { - for (let i = 0; i < 3; i += 1) { - const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 }); - await lock.release(); - resetSessionWriteLockStateForTest(); - expect(process.listenerCount("exit")).toBe(baselineExitListeners); - } - }); - }); - - it("keeps other signal listeners registered", () => { - const keepAlive = () => {}; - const originalKill = process.kill.bind(process); - process.kill = ((_pid: number, _signal?: NodeJS.Signals) => true) as typeof process.kill; - process.on("SIGINT", keepAlive); - - try { - __testing.handleTerminationSignal("SIGINT"); - expect(process.listeners("SIGINT")).toContain(keepAlive); - } finally { - process.off("SIGINT", keepAlive); - process.kill = originalKill; - } - }); -}); diff --git a/src/agents/session-write-lock.ts b/src/agents/session-write-lock.ts deleted file mode 100644 index a548be75929..00000000000 --- a/src/agents/session-write-lock.ts +++ /dev/null @@ -1,710 +0,0 @@ -import "../infra/fs-safe-defaults.js"; -import type fsSync from "node:fs"; -import fs from "node:fs/promises"; -import path from "node:path"; -import { createFileLockManager } from "../infra/file-lock-manager.js"; -import { readGatewayProcessArgsSync as readProcessArgsSync } from "../infra/gateway-processes.js"; -import { getProcessStartTime, isPidAlive } from "../shared/pid-alive.js"; -import { SessionWriteLockTimeoutError } from "./session-write-lock-error.js"; - -type LockFilePayload = { - pid?: number; - createdAt?: string; - /** Process start time in clock ticks (from /proc/pid/stat field 22). */ - starttime?: number; -}; - -function isValidLockNumber(value: unknown): value is number { - return typeof value === "number" && Number.isInteger(value) && value >= 0; -} - -export type SessionLockInspection = { - lockPath: string; - pid: number | null; - pidAlive: boolean; - createdAt: string | null; - ageMs: number | null; - stale: boolean; - staleReasons: string[]; - removed: boolean; -}; - -export type SessionLockOwnerProcessArgsReader = (pid: number) => string[] | null; - -const CLEANUP_SIGNALS = ["SIGINT", "SIGTERM", "SIGQUIT", "SIGABRT"] as const; -type CleanupSignal = (typeof CLEANUP_SIGNALS)[number]; -const CLEANUP_STATE_KEY = Symbol.for("openclaw.sessionWriteLockCleanupState"); -const WATCHDOG_STATE_KEY = Symbol.for("openclaw.sessionWriteLockWatchdogState"); - -const DEFAULT_STALE_MS = 30 * 60 * 1000; -const DEFAULT_MAX_HOLD_MS = 5 * 60 * 1000; -export const DEFAULT_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS = 60_000; -const DEFAULT_WATCHDOG_INTERVAL_MS = 60_000; -const DEFAULT_TIMEOUT_GRACE_MS = 2 * 60 * 1000; -// A payload-less lock can be left behind if shutdown lands between open("wx") -// and the owner metadata write. Keep the grace short so 10s callers recover. -const ORPHAN_LOCK_PAYLOAD_GRACE_MS = 5_000; -const MAX_LOCK_HOLD_MS = 2_147_000_000; - -type CleanupState = { - registered: boolean; - exitHandler?: () => void; - cleanupHandlers: Map void>; -}; - -type WatchdogState = { - started: boolean; - intervalMs: number; - timer?: NodeJS.Timeout; -}; - -type LockInspectionDetails = Pick< - SessionLockInspection, - "pid" | "pidAlive" | "createdAt" | "ageMs" | "stale" | "staleReasons" ->; - -const SESSION_LOCKS = createFileLockManager("openclaw.session-write-lock"); -let resolveProcessStartTimeForLock = getProcessStartTime; - -function isFileLockError(error: unknown, code: string): boolean { - return (error as { code?: unknown } | null)?.code === code; -} - -export type SessionWriteLockAcquireTimeoutConfig = { - session?: { - writeLock?: { - acquireTimeoutMs?: number; - }; - }; -}; - -export function resolveSessionWriteLockAcquireTimeoutMs( - config?: SessionWriteLockAcquireTimeoutConfig, -): number { - return resolvePositiveMs( - config?.session?.writeLock?.acquireTimeoutMs, - DEFAULT_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS, - { allowInfinity: true }, - ); -} - -function resolveCleanupState(): CleanupState { - const proc = process as NodeJS.Process & { - [CLEANUP_STATE_KEY]?: CleanupState; - }; - if (!proc[CLEANUP_STATE_KEY]) { - proc[CLEANUP_STATE_KEY] = { - registered: false, - exitHandler: undefined, - cleanupHandlers: new Map void>(), - }; - } - return proc[CLEANUP_STATE_KEY]; -} - -function resolveWatchdogState(): WatchdogState { - const proc = process as NodeJS.Process & { - [WATCHDOG_STATE_KEY]?: WatchdogState; - }; - if (!proc[WATCHDOG_STATE_KEY]) { - proc[WATCHDOG_STATE_KEY] = { - started: false, - intervalMs: DEFAULT_WATCHDOG_INTERVAL_MS, - }; - } - return proc[WATCHDOG_STATE_KEY]; -} - -function resolvePositiveMs( - value: number | undefined, - fallback: number, - opts: { allowInfinity?: boolean } = {}, -): number { - if (typeof value !== "number" || Number.isNaN(value) || value <= 0) { - return fallback; - } - if (value === Number.POSITIVE_INFINITY) { - return opts.allowInfinity ? value : fallback; - } - if (!Number.isFinite(value)) { - return fallback; - } - return value; -} - -export function resolveSessionLockMaxHoldFromTimeout(params: { - timeoutMs: number; - graceMs?: number; - minMs?: number; -}): number { - const minMs = resolvePositiveMs(params.minMs, DEFAULT_MAX_HOLD_MS); - const timeoutMs = resolvePositiveMs(params.timeoutMs, minMs, { allowInfinity: true }); - if (timeoutMs === Number.POSITIVE_INFINITY) { - return MAX_LOCK_HOLD_MS; - } - const graceMs = resolvePositiveMs(params.graceMs, DEFAULT_TIMEOUT_GRACE_MS); - return Math.min(MAX_LOCK_HOLD_MS, Math.max(minMs, timeoutMs + graceMs)); -} - -/** - * Synchronously release all held locks. - * Used during process exit when async operations aren't reliable. - */ -function releaseAllLocksSync(): void { - SESSION_LOCKS.reset(); - stopWatchdogTimer(); -} - -async function runLockWatchdogCheck(nowMs = Date.now()): Promise { - let released = 0; - for (const held of SESSION_LOCKS.heldEntries()) { - const maxHoldMs = - typeof held.metadata.maxHoldMs === "number" ? held.metadata.maxHoldMs : DEFAULT_MAX_HOLD_MS; - const heldForMs = nowMs - held.acquiredAt; - if (heldForMs <= maxHoldMs) { - continue; - } - - process.stderr.write( - `[session-write-lock] releasing lock held for ${heldForMs}ms (max=${maxHoldMs}ms): ${held.lockPath}\n`, - ); - - const didRelease = await held.forceRelease(); - if (didRelease) { - released += 1; - } - } - return released; -} - -function stopWatchdogTimer(): void { - const watchdogState = resolveWatchdogState(); - if (watchdogState.timer) { - clearInterval(watchdogState.timer); - watchdogState.timer = undefined; - } - watchdogState.started = false; -} - -function shouldStartBackgroundWatchdog(): boolean { - return process.env.VITEST !== "true" || process.env.OPENCLAW_TEST_SESSION_LOCK_WATCHDOG === "1"; -} - -function ensureWatchdogStarted(intervalMs: number): void { - if (!shouldStartBackgroundWatchdog()) { - return; - } - const watchdogState = resolveWatchdogState(); - if (watchdogState.started) { - return; - } - watchdogState.started = true; - watchdogState.intervalMs = intervalMs; - watchdogState.timer = setInterval(() => { - void runLockWatchdogCheck().catch(() => { - // Ignore watchdog errors - best effort cleanup only. - }); - }, intervalMs); - watchdogState.timer.unref?.(); -} - -function handleTerminationSignal(signal: CleanupSignal): void { - releaseAllLocksSync(); - const cleanupState = resolveCleanupState(); - const shouldReraise = process.listenerCount(signal) === 1; - if (shouldReraise) { - const handler = cleanupState.cleanupHandlers.get(signal); - if (handler) { - process.off(signal, handler); - cleanupState.cleanupHandlers.delete(signal); - } - try { - process.kill(process.pid, signal); - } catch { - // Ignore errors during shutdown - } - } -} - -function registerCleanupHandlers(): void { - const cleanupState = resolveCleanupState(); - cleanupState.registered = true; - if (!cleanupState.exitHandler) { - // Cleanup on normal exit and process.exit() calls - cleanupState.exitHandler = () => { - releaseAllLocksSync(); - }; - process.on("exit", cleanupState.exitHandler); - } - - ensureWatchdogStarted(DEFAULT_WATCHDOG_INTERVAL_MS); - - // Handle termination signals - for (const signal of CLEANUP_SIGNALS) { - if (cleanupState.cleanupHandlers.has(signal)) { - continue; - } - try { - const handler = () => handleTerminationSignal(signal); - cleanupState.cleanupHandlers.set(signal, handler); - process.on(signal, handler); - } catch { - // Ignore unsupported signals on this platform. - } - } -} - -function unregisterCleanupHandlers(): void { - const cleanupState = resolveCleanupState(); - if (cleanupState.exitHandler) { - process.off("exit", cleanupState.exitHandler); - cleanupState.exitHandler = undefined; - } - for (const [signal, handler] of cleanupState.cleanupHandlers) { - process.off(signal, handler); - } - cleanupState.cleanupHandlers.clear(); - cleanupState.registered = false; -} - -async function readLockPayload(lockPath: string): Promise { - try { - const raw = await fs.readFile(lockPath, "utf8"); - const parsed = JSON.parse(raw) as Record; - const payload: LockFilePayload = {}; - if (isValidLockNumber(parsed.pid) && parsed.pid > 0) { - payload.pid = parsed.pid; - } - if (typeof parsed.createdAt === "string") { - payload.createdAt = parsed.createdAt; - } - if (isValidLockNumber(parsed.starttime)) { - payload.starttime = parsed.starttime; - } - return payload; - } catch { - return null; - } -} - -async function resolveNormalizedSessionFile(sessionFile: string): Promise { - const resolvedSessionFile = path.resolve(sessionFile); - const sessionDir = path.dirname(resolvedSessionFile); - try { - const normalizedDir = await fs.realpath(sessionDir); - return path.join(normalizedDir, path.basename(resolvedSessionFile)); - } catch { - return resolvedSessionFile; - } -} - -function normalizeOwnerProcessArg(arg: string): string { - return arg.trim().replaceAll("\\", "/").toLowerCase(); -} - -function isOpenClawSessionOwnerArgv(args: string[]): boolean { - const normalized = args.map(normalizeOwnerProcessArg).filter(Boolean); - if (normalized.length === 0) { - return false; - } - const exe = (normalized[0] ?? "").replace(/\.(bat|cmd|exe)$/i, ""); - if (exe === "openclaw" || exe.endsWith("/openclaw") || exe.endsWith("/openclaw-gateway")) { - return true; - } - if ( - normalized.some( - (arg) => - arg === "openclaw" || - arg.endsWith("/openclaw") || - arg === "openclaw.mjs" || - arg.endsWith("/openclaw.mjs"), - ) - ) { - return true; - } - - const entryCandidates = [ - "dist/index.js", - "dist/entry.js", - "scripts/run-node.mjs", - "src/entry.ts", - "src/index.ts", - ]; - const hasOpenClawCommandToken = normalized.some((arg) => arg === "gateway" || arg === "agent"); - return normalized.some( - (arg) => entryCandidates.some((entry) => arg.endsWith(entry)) && hasOpenClawCommandToken, - ); -} - -function readOwnerProcessArgs( - reader: SessionLockOwnerProcessArgsReader, - pid: number, -): string[] | null { - try { - const args = reader(pid); - return Array.isArray(args) ? args : null; - } catch { - return null; - } -} - -function inspectLockPayload( - payload: LockFilePayload | null, - staleMs: number, - nowMs: number, -): LockInspectionDetails { - const pid = isValidLockNumber(payload?.pid) && payload.pid > 0 ? payload.pid : null; - const pidAlive = pid !== null ? isPidAlive(pid) : false; - const createdAt = typeof payload?.createdAt === "string" ? payload.createdAt : null; - const createdAtMs = createdAt ? Date.parse(createdAt) : Number.NaN; - const ageMs = Number.isFinite(createdAtMs) ? Math.max(0, nowMs - createdAtMs) : null; - - // Detect PID recycling: if the PID is alive but its start time differs from - // what was recorded in the lock file, the original process died and the OS - // reassigned the same PID to a different process. - const storedStarttime = isValidLockNumber(payload?.starttime) ? payload.starttime : null; - const pidRecycled = - pidAlive && pid !== null && storedStarttime !== null - ? (() => { - const currentStarttime = resolveProcessStartTimeForLock(pid); - return currentStarttime !== null && currentStarttime !== storedStarttime; - })() - : false; - - const staleReasons: string[] = []; - if (pid === null) { - staleReasons.push("missing-pid"); - } else if (!pidAlive) { - staleReasons.push("dead-pid"); - } else if (pidRecycled) { - staleReasons.push("recycled-pid"); - } - if (ageMs === null) { - staleReasons.push("invalid-createdAt"); - } else if (ageMs > staleMs) { - staleReasons.push("too-old"); - } - - return { - pid, - pidAlive, - createdAt, - ageMs, - stale: staleReasons.length > 0, - staleReasons, - }; -} - -function shouldTreatAsNonOpenClawOwner(params: { - payload: LockFilePayload | null; - inspected: LockInspectionDetails; - heldByThisProcess: boolean; - readOwnerProcessArgs: SessionLockOwnerProcessArgsReader; -}): boolean { - if (params.inspected.stale || params.inspected.pid === null || !params.inspected.pidAlive) { - return false; - } - if (params.inspected.pid === process.pid && params.heldByThisProcess) { - return false; - } - if (!isValidLockNumber(params.payload?.pid) || params.payload.pid <= 0) { - return false; - } - - const args = readOwnerProcessArgs(params.readOwnerProcessArgs, params.payload.pid); - if (!args || args.every((arg) => !arg.trim())) { - return false; - } - return !isOpenClawSessionOwnerArgv(args); -} - -function lockInspectionNeedsMtimeStaleFallback(details: LockInspectionDetails): boolean { - return ( - details.stale && - details.staleReasons.every( - (reason) => reason === "missing-pid" || reason === "invalid-createdAt", - ) - ); -} - -async function shouldReclaimContendedLockFile( - lockPath: string, - details: LockInspectionDetails, - staleMs: number, - nowMs: number, -): Promise { - if (!details.stale) { - return false; - } - if (!lockInspectionNeedsMtimeStaleFallback(details)) { - return true; - } - try { - const stat = await fs.stat(lockPath); - const ageMs = Math.max(0, nowMs - stat.mtimeMs); - return ageMs > Math.min(staleMs, ORPHAN_LOCK_PAYLOAD_GRACE_MS); - } catch (error) { - const code = (error as { code?: string } | null)?.code; - return code !== "ENOENT"; - } -} - -function sessionLockHeldByThisProcess(normalizedSessionFile: string): boolean { - return SESSION_LOCKS.heldEntries().some( - (entry) => entry.normalizedTargetPath === normalizedSessionFile, - ); -} - -async function removeReportedStaleLockIfStillStale(params: { - lockPath: string; - normalizedSessionFile: string; - staleMs: number; - readOwnerProcessArgs?: SessionLockOwnerProcessArgsReader; -}): Promise { - const nowMs = Date.now(); - const payload = await readLockPayload(params.lockPath); - const inspected = inspectLockPayloadForSession({ - payload, - staleMs: params.staleMs, - nowMs, - heldByThisProcess: sessionLockHeldByThisProcess(params.normalizedSessionFile), - reclaimLockWithoutStarttime: true, - readOwnerProcessArgs: params.readOwnerProcessArgs ?? readProcessArgsSync, - }); - if (!(await shouldReclaimContendedLockFile(params.lockPath, inspected, params.staleMs, nowMs))) { - return false; - } - await fs.rm(params.lockPath, { force: true }); - return true; -} - -function shouldTreatAsOrphanSelfLock(params: { - payload: LockFilePayload | null; - heldByThisProcess: boolean; - reclaimLockWithoutStarttime: boolean; -}): boolean { - const pid = isValidLockNumber(params.payload?.pid) ? params.payload.pid : null; - if (pid !== process.pid) { - return false; - } - if (params.heldByThisProcess) { - return false; - } - - const storedStarttime = isValidLockNumber(params.payload?.starttime) - ? params.payload.starttime - : null; - if (storedStarttime === null) { - return params.reclaimLockWithoutStarttime; - } - - const currentStarttime = resolveProcessStartTimeForLock(process.pid); - return currentStarttime !== null && currentStarttime === storedStarttime; -} - -function inspectLockPayloadForSession(params: { - payload: LockFilePayload | null; - staleMs: number; - nowMs: number; - heldByThisProcess: boolean; - reclaimLockWithoutStarttime: boolean; - readOwnerProcessArgs: SessionLockOwnerProcessArgsReader; -}): LockInspectionDetails { - const inspected = inspectLockPayload(params.payload, params.staleMs, params.nowMs); - if ( - shouldTreatAsOrphanSelfLock({ - payload: params.payload, - heldByThisProcess: params.heldByThisProcess, - reclaimLockWithoutStarttime: params.reclaimLockWithoutStarttime, - }) - ) { - return { - ...inspected, - stale: true, - staleReasons: inspected.staleReasons.includes("orphan-self-pid") - ? inspected.staleReasons - : [...inspected.staleReasons, "orphan-self-pid"], - }; - } - - if ( - shouldTreatAsNonOpenClawOwner({ - payload: params.payload, - inspected, - heldByThisProcess: params.heldByThisProcess, - readOwnerProcessArgs: params.readOwnerProcessArgs, - }) - ) { - return { - ...inspected, - stale: true, - staleReasons: [...inspected.staleReasons, "non-openclaw-owner"], - }; - } - - return inspected; -} - -export async function cleanStaleLockFiles(params: { - sessionsDir: string; - staleMs?: number; - removeStale?: boolean; - nowMs?: number; - readOwnerProcessArgs?: SessionLockOwnerProcessArgsReader; - log?: { - warn?: (message: string) => void; - info?: (message: string) => void; - }; -}): Promise<{ locks: SessionLockInspection[]; cleaned: SessionLockInspection[] }> { - const sessionsDir = path.resolve(params.sessionsDir); - const staleMs = resolvePositiveMs(params.staleMs, DEFAULT_STALE_MS); - const removeStale = params.removeStale !== false; - const nowMs = params.nowMs ?? Date.now(); - const ownerProcessArgsReader = params.readOwnerProcessArgs ?? readProcessArgsSync; - - let entries: fsSync.Dirent[] = []; - try { - entries = await fs.readdir(sessionsDir, { withFileTypes: true }); - } catch (err) { - const code = (err as { code?: string }).code; - if (code === "ENOENT") { - return { locks: [], cleaned: [] }; - } - throw err; - } - - const locks: SessionLockInspection[] = []; - const cleaned: SessionLockInspection[] = []; - const lockEntries = entries - .filter((entry) => entry.name.endsWith(".jsonl.lock")) - .toSorted((a, b) => a.name.localeCompare(b.name)); - - for (const entry of lockEntries) { - const lockPath = path.join(sessionsDir, entry.name); - const payload = await readLockPayload(lockPath); - const inspected = inspectLockPayloadForSession({ - payload, - staleMs, - nowMs, - heldByThisProcess: false, - reclaimLockWithoutStarttime: false, - readOwnerProcessArgs: ownerProcessArgsReader, - }); - const lockInfo: SessionLockInspection = { - lockPath, - ...inspected, - removed: false, - }; - - if (lockInfo.stale && removeStale) { - await fs.rm(lockPath, { force: true }); - lockInfo.removed = true; - cleaned.push(lockInfo); - params.log?.warn?.( - `removed stale session lock: ${lockPath} (${lockInfo.staleReasons.join(", ") || "unknown"})`, - ); - } - - locks.push(lockInfo); - } - - return { locks, cleaned }; -} - -export async function acquireSessionWriteLock(params: { - sessionFile: string; - timeoutMs?: number; - staleMs?: number; - maxHoldMs?: number; - allowReentrant?: boolean; -}): Promise<{ - release: () => Promise; -}> { - registerCleanupHandlers(); - const allowReentrant = params.allowReentrant ?? false; - const timeoutMs = resolvePositiveMs(params.timeoutMs, resolveSessionWriteLockAcquireTimeoutMs(), { - allowInfinity: true, - }); - const staleMs = resolvePositiveMs(params.staleMs, DEFAULT_STALE_MS); - const maxHoldMs = resolvePositiveMs(params.maxHoldMs, DEFAULT_MAX_HOLD_MS); - const sessionFile = path.resolve(params.sessionFile); - const sessionDir = path.dirname(sessionFile); - const normalizedSessionFile = await resolveNormalizedSessionFile(sessionFile); - const lockPath = `${normalizedSessionFile}.lock`; - await fs.mkdir(sessionDir, { recursive: true }); - while (true) { - try { - const lock = await SESSION_LOCKS.acquire(sessionFile, { - staleMs, - timeoutMs, - retry: { minTimeout: 50, maxTimeout: 1000, factor: 1 }, - allowReentrant, - metadata: { maxHoldMs }, - payload: () => { - const createdAt = new Date().toISOString(); - const starttime = resolveProcessStartTimeForLock(process.pid); - const lockPayload: LockFilePayload = { pid: process.pid, createdAt }; - if (starttime !== null) { - lockPayload.starttime = starttime; - } - return lockPayload as Record; - }, - shouldReclaim: async ({ payload, nowMs, heldByThisProcess }) => { - const inspected = inspectLockPayloadForSession({ - payload: payload as LockFilePayload | null, - staleMs, - nowMs, - heldByThisProcess, - reclaimLockWithoutStarttime: true, - readOwnerProcessArgs: readProcessArgsSync, - }); - return await shouldReclaimContendedLockFile(lockPath, inspected, staleMs, nowMs); - }, - }); - return { release: lock.release }; - } catch (err) { - if (isFileLockError(err, "file_lock_stale")) { - const staleLockPath = (err as { lockPath?: string }).lockPath ?? lockPath; - if ( - await removeReportedStaleLockIfStillStale({ - lockPath: staleLockPath, - normalizedSessionFile, - staleMs, - }) - ) { - continue; - } - } - if (!isFileLockError(err, "file_lock_timeout")) { - throw err; - } - const timeoutLockPath = (err as { lockPath?: string }).lockPath ?? lockPath; - const payload = await readLockPayload(timeoutLockPath); - const owner = typeof payload?.pid === "number" ? `pid=${payload.pid}` : "unknown"; - throw new SessionWriteLockTimeoutError({ timeoutMs, owner, lockPath: timeoutLockPath }); - } - } -} - -export const __testing = { - cleanupSignals: [...CLEANUP_SIGNALS], - handleTerminationSignal, - releaseAllLocksSync, - runLockWatchdogCheck, - setProcessStartTimeResolverForTest(resolver: ((pid: number) => number | null) | null): void { - resolveProcessStartTimeForLock = resolver ?? getProcessStartTime; - }, -}; - -export async function drainSessionWriteLockStateForTest(): Promise { - await SESSION_LOCKS.drain(); - stopWatchdogTimer(); - unregisterCleanupHandlers(); -} - -export function resetSessionWriteLockStateForTest(): void { - releaseAllLocksSync(); - stopWatchdogTimer(); - unregisterCleanupHandlers(); - resolveProcessStartTimeForLock = getProcessStartTime; -} diff --git a/src/auto-reply/reply/commands-session-store.ts b/src/auto-reply/reply/commands-session-entry.ts similarity index 55% rename from src/auto-reply/reply/commands-session-store.ts rename to src/auto-reply/reply/commands-session-entry.ts index dd7e223d89b..62d86d61f89 100644 --- a/src/auto-reply/reply/commands-session-store.ts +++ b/src/auto-reply/reply/commands-session-entry.ts @@ -1,5 +1,9 @@ -import type { SessionEntry } from "../../config/sessions.js"; -import { updateSessionStore } from "../../config/sessions.js"; +import { + getSessionEntry, + resolveAgentIdFromSessionKey, + upsertSessionEntry, + type SessionEntry, +} from "../../config/sessions.js"; import { applyAbortCutoffToSessionEntry, type AbortCutoff } from "./abort-cutoff.js"; import type { CommandHandler } from "./commands-types.js"; @@ -11,11 +15,11 @@ export async function persistSessionEntry(params: CommandParams): Promise { - store[params.sessionKey] = params.sessionEntry as SessionEntry; - }); - } + upsertSessionEntry({ + agentId: resolveAgentIdFromSessionKey(params.sessionKey), + sessionKey: params.sessionKey, + entry: params.sessionEntry, + }); return true; } @@ -23,10 +27,9 @@ export async function persistAbortTargetEntry(params: { entry?: SessionEntry; key?: string; sessionStore?: Record; - storePath?: string; abortCutoff?: AbortCutoff; }): Promise { - const { entry, key, sessionStore, storePath, abortCutoff } = params; + const { entry, key, sessionStore, abortCutoff } = params; if (!entry || !key || !sessionStore) { return false; } @@ -36,18 +39,16 @@ export async function persistAbortTargetEntry(params: { entry.updatedAt = Date.now(); sessionStore[key] = entry; - if (storePath) { - await updateSessionStore(storePath, (store) => { - const nextEntry = store[key] ?? entry; - if (!nextEntry) { - return; - } - nextEntry.abortedLastRun = true; - applyAbortCutoffToSessionEntry(nextEntry, abortCutoff); - nextEntry.updatedAt = Date.now(); - store[key] = nextEntry; - }); - } + const agentId = resolveAgentIdFromSessionKey(key); + const nextEntry = getSessionEntry({ agentId, sessionKey: key }) ?? entry; + nextEntry.abortedLastRun = true; + applyAbortCutoffToSessionEntry(nextEntry, abortCutoff); + nextEntry.updatedAt = Date.now(); + upsertSessionEntry({ + agentId, + sessionKey: key, + entry: nextEntry, + }); return true; } diff --git a/src/commands/doctor-session-locks.test.ts b/src/commands/doctor-session-locks.test.ts deleted file mode 100644 index 4dd56a5b8c5..00000000000 --- a/src/commands/doctor-session-locks.test.ts +++ /dev/null @@ -1,123 +0,0 @@ -import fs from "node:fs/promises"; -import path from "node:path"; -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { - createOpenClawTestState, - type OpenClawTestState, -} from "../test-utils/openclaw-test-state.js"; - -const note = vi.hoisted(() => vi.fn()); - -vi.mock("../terminal/note.js", () => ({ - note, -})); - -import { noteSessionLockHealth } from "./doctor-session-locks.js"; - -async function expectPathMissing(targetPath: string): Promise { - try { - await fs.access(targetPath); - throw new Error(`expected missing path: ${targetPath}`); - } catch (error) { - expect((error as NodeJS.ErrnoException).code).toBe("ENOENT"); - } -} - -describe("noteSessionLockHealth", () => { - let state: OpenClawTestState; - - beforeEach(async () => { - note.mockClear(); - state = await createOpenClawTestState({ - layout: "state-only", - prefix: "openclaw-doctor-locks-", - }); - }); - - afterEach(async () => { - await state.cleanup(); - }); - - it("reports existing lock files with pid status and age", async () => { - const sessionsDir = state.sessionsDir(); - await fs.mkdir(sessionsDir, { recursive: true }); - const lockPath = path.join(sessionsDir, "active.jsonl.lock"); - await fs.writeFile( - lockPath, - JSON.stringify({ pid: process.pid, createdAt: new Date(Date.now() - 1500).toISOString() }), - "utf8", - ); - - await noteSessionLockHealth({ - shouldRepair: false, - staleMs: 60_000, - readOwnerProcessArgs: () => ["node", "/opt/openclaw/openclaw.mjs", "doctor"], - }); - - expect(note).toHaveBeenCalledTimes(1); - const [message, title] = note.mock.calls[0] as [string, string]; - expect(title).toBe("Session locks"); - expect(message).toContain("Found 1 session lock file"); - expect(message).toContain(`pid=${process.pid} (alive)`); - expect(message).toContain("stale=no"); - await expect(fs.access(lockPath)).resolves.toBeUndefined(); - }); - - it("removes stale locks in repair mode", async () => { - const sessionsDir = state.sessionsDir(); - await fs.mkdir(sessionsDir, { recursive: true }); - - const staleLock = path.join(sessionsDir, "stale.jsonl.lock"); - const freshLock = path.join(sessionsDir, "fresh.jsonl.lock"); - - await fs.writeFile( - staleLock, - JSON.stringify({ pid: -1, createdAt: new Date(Date.now() - 120_000).toISOString() }), - "utf8", - ); - await fs.writeFile( - freshLock, - JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }), - "utf8", - ); - - await noteSessionLockHealth({ - shouldRepair: true, - staleMs: 30_000, - readOwnerProcessArgs: () => ["node", "/opt/openclaw/openclaw.mjs", "doctor"], - }); - - expect(note).toHaveBeenCalledTimes(1); - const [message] = note.mock.calls[0] as [string, string]; - expect(message).toContain("[removed]"); - expect(message).toContain("Removed 1 stale session lock file"); - - await expectPathMissing(staleLock); - await expect(fs.access(freshLock)).resolves.toBeUndefined(); - }); - - it("removes fresh live locks when the owner is not an OpenClaw process", async () => { - const sessionsDir = state.sessionsDir(); - await fs.mkdir(sessionsDir, { recursive: true }); - - const falseLiveLock = path.join(sessionsDir, "false-live.jsonl.lock"); - await fs.writeFile( - falseLiveLock, - JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }), - "utf8", - ); - - await noteSessionLockHealth({ - shouldRepair: true, - staleMs: 60_000, - readOwnerProcessArgs: () => ["python", "worker.py"], - }); - - expect(note).toHaveBeenCalledTimes(1); - const [message] = note.mock.calls[0] as [string, string]; - expect(message).toContain("stale=yes (non-openclaw-owner)"); - expect(message).toContain("[removed]"); - expect(message).toContain("Removed 1 stale session lock file"); - await expect(fs.access(falseLiveLock)).rejects.toThrow(); - }); -}); diff --git a/src/commands/doctor-session-locks.ts b/src/commands/doctor-session-locks.ts deleted file mode 100644 index c7377783c3f..00000000000 --- a/src/commands/doctor-session-locks.ts +++ /dev/null @@ -1,94 +0,0 @@ -import { resolveAgentSessionDirs } from "../agents/session-dirs.js"; -import { - cleanStaleLockFiles, - type SessionLockInspection, - type SessionLockOwnerProcessArgsReader, -} from "../agents/session-write-lock.js"; -import { resolveStateDir } from "../config/paths.js"; -import { note } from "../terminal/note.js"; -import { shortenHomePath } from "../utils.js"; - -const DEFAULT_STALE_MS = 30 * 60 * 1000; - -function formatAge(ageMs: number | null): string { - if (ageMs === null) { - return "unknown"; - } - const seconds = Math.floor(ageMs / 1000); - if (seconds < 60) { - return `${seconds}s`; - } - const minutes = Math.floor(seconds / 60); - const remainingSeconds = seconds % 60; - if (minutes < 60) { - return `${minutes}m${remainingSeconds}s`; - } - const hours = Math.floor(minutes / 60); - const remainingMinutes = minutes % 60; - return `${hours}h${remainingMinutes}m`; -} - -function formatLockLine(lock: SessionLockInspection): string { - const pidStatus = - lock.pid === null ? "pid=missing" : `pid=${lock.pid} (${lock.pidAlive ? "alive" : "dead"})`; - const ageStatus = `age=${formatAge(lock.ageMs)}`; - const staleStatus = lock.stale - ? `stale=yes (${lock.staleReasons.join(", ") || "unknown"})` - : "stale=no"; - const removedStatus = lock.removed ? " [removed]" : ""; - return `- ${shortenHomePath(lock.lockPath)} ${pidStatus} ${ageStatus} ${staleStatus}${removedStatus}`; -} - -export async function noteSessionLockHealth(params?: { - shouldRepair?: boolean; - staleMs?: number; - readOwnerProcessArgs?: SessionLockOwnerProcessArgsReader; -}) { - const shouldRepair = params?.shouldRepair === true; - const staleMs = params?.staleMs ?? DEFAULT_STALE_MS; - let sessionDirs: string[] = []; - try { - sessionDirs = await resolveAgentSessionDirs(resolveStateDir(process.env)); - } catch (err) { - note(`- Failed to inspect session lock files: ${String(err)}`, "Session locks"); - return; - } - - if (sessionDirs.length === 0) { - return; - } - - const allLocks: SessionLockInspection[] = []; - for (const sessionsDir of sessionDirs) { - const result = await cleanStaleLockFiles({ - sessionsDir, - staleMs, - removeStale: shouldRepair, - readOwnerProcessArgs: params?.readOwnerProcessArgs, - }); - allLocks.push(...result.locks); - } - - if (allLocks.length === 0) { - return; - } - - const staleCount = allLocks.filter((lock) => lock.stale).length; - const removedCount = allLocks.filter((lock) => lock.removed).length; - const lines: string[] = [ - `- Found ${allLocks.length} session lock file${allLocks.length === 1 ? "" : "s"}.`, - ...allLocks.toSorted((a, b) => a.lockPath.localeCompare(b.lockPath)).map(formatLockLine), - ]; - - if (staleCount > 0 && !shouldRepair) { - lines.push(`- ${staleCount} lock file${staleCount === 1 ? " is" : "s are"} stale.`); - lines.push('- Run "openclaw doctor --fix" to remove stale lock files automatically.'); - } - if (shouldRepair && removedCount > 0) { - lines.push( - `- Removed ${removedCount} stale session lock file${removedCount === 1 ? "" : "s"}.`, - ); - } - - note(lines.join("\n"), "Session locks"); -} diff --git a/src/test-utils/session-write-lock-module-mock.ts b/src/test-utils/session-write-lock-module-mock.ts deleted file mode 100644 index 0d4013ae84e..00000000000 --- a/src/test-utils/session-write-lock-module-mock.ts +++ /dev/null @@ -1,14 +0,0 @@ -import type * as SessionWriteLockModule from "../agents/session-write-lock.js"; - -type SessionWriteLockModuleShape = typeof SessionWriteLockModule; - -export async function buildSessionWriteLockModuleMock( - loadActual: () => Promise, - acquireSessionWriteLock: SessionWriteLockModuleShape["acquireSessionWriteLock"], -): Promise { - const original = await loadActual(); - return { - ...original, - acquireSessionWriteLock, - }; -}