refactor: remove session file locking stores

This commit is contained in:
Peter Steinberger
2026-05-09 16:36:51 +01:00
parent f7cca9a0d7
commit 42e5bdb3cc
12 changed files with 64 additions and 4054 deletions

View File

@@ -1,10 +1,12 @@
import {
getSessionEntry,
mergeSessionEntry,
setSessionRuntimeModel,
type SessionEntry,
updateSessionStore,
upsertSessionEntry,
} from "../../config/sessions.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import { clearCliSession, setCliSessionBinding, setCliSessionId } from "../cli-session.js";
@@ -45,12 +47,33 @@ function removeLifecycleStateFromMetadataPatch(entry: SessionEntry): SessionEntr
return next;
}
export async function updateSessionStoreAfterAgentRun(params: {
function persistMergedSessionEntry(params: {
sessionKey: string;
sessionStore: Record<string, SessionEntry>;
patch: SessionEntry;
}): SessionEntry {
const agentId = resolveAgentIdFromSessionKey(params.sessionKey);
if (!agentId) {
throw new Error(
`Session stores are SQLite-only; cannot resolve agent for ${params.sessionKey}`,
);
}
const existing = getSessionEntry({ agentId, sessionKey: params.sessionKey });
const merged = mergeSessionEntry(existing, params.patch);
upsertSessionEntry({
agentId,
sessionKey: params.sessionKey,
entry: merged,
});
params.sessionStore[params.sessionKey] = merged;
return merged;
}
export async function updateSessionEntryAfterAgentRun(params: {
cfg: OpenClawConfig;
contextTokensOverride?: number;
sessionId: string;
sessionKey: string;
storePath: string;
sessionStore: Record<string, SessionEntry>;
defaultProvider: string;
defaultModel: string;
@@ -70,7 +93,6 @@ export async function updateSessionStoreAfterAgentRun(params: {
cfg,
sessionId,
sessionKey,
storePath,
sessionStore,
defaultProvider,
defaultModel,
@@ -228,21 +250,19 @@ export async function updateSessionStoreAfterAgentRun(params: {
next.compactionCount = (entry.compactionCount ?? 0) + compactionsThisRun;
}
const metadataPatch = removeLifecycleStateFromMetadataPatch(next);
const persisted = await updateSessionStore(storePath, (store) => {
const merged = mergeSessionEntry(store[sessionKey], metadataPatch);
store[sessionKey] = merged;
return merged;
persistMergedSessionEntry({
sessionKey,
sessionStore,
patch: metadataPatch,
});
sessionStore[sessionKey] = persisted;
}
export async function clearCliSessionInStore(params: {
export async function clearCliSessionEntry(params: {
provider: string;
sessionKey: string;
sessionStore: Record<string, SessionEntry>;
storePath: string;
}): Promise<SessionEntry | undefined> {
const { provider, sessionKey, sessionStore, storePath } = params;
const { provider, sessionKey, sessionStore } = params;
const entry = sessionStore[sessionKey];
if (!entry) {
return undefined;
@@ -252,22 +272,19 @@ export async function clearCliSessionInStore(params: {
clearCliSession(next, provider);
next.updatedAt = Date.now();
const persisted = await updateSessionStore(storePath, (store) => {
const merged = mergeSessionEntry(store[sessionKey], next);
store[sessionKey] = merged;
return merged;
return persistMergedSessionEntry({
sessionKey,
sessionStore,
patch: next,
});
sessionStore[sessionKey] = persisted;
return persisted;
}
export async function recordCliCompactionInStore(params: {
export async function recordCliCompactionInSessionEntry(params: {
provider: string;
sessionKey: string;
sessionStore: Record<string, SessionEntry>;
storePath: string;
}): Promise<SessionEntry | undefined> {
const { provider, sessionKey, sessionStore, storePath } = params;
const { provider, sessionKey, sessionStore } = params;
const entry = sessionStore[sessionKey];
if (!entry) {
return undefined;
@@ -278,11 +295,9 @@ export async function recordCliCompactionInStore(params: {
next.compactionCount = (entry.compactionCount ?? 0) + 1;
next.updatedAt = Date.now();
const persisted = await updateSessionStore(storePath, (store) => {
const merged = mergeSessionEntry(store[sessionKey], next);
store[sessionKey] = merged;
return merged;
return persistMergedSessionEntry({
sessionKey,
sessionStore,
patch: next,
});
sessionStore[sessionKey] = persisted;
return persisted;
}

View File

@@ -1 +0,0 @@
export { updateSessionStoreAfterAgentRun } from "./session-store.js";

File diff suppressed because it is too large Load Diff

View File

@@ -1,710 +0,0 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
exportSqliteSessionTranscriptJsonl,
replaceSqliteSessionTranscriptEvents,
resolveSqliteSessionTranscriptScopeForPath,
} from "../config/sessions/transcript-store.sqlite.js";
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
import { BLANK_USER_FALLBACK_TEXT, repairSessionFileIfNeeded } from "./session-file-repair.js";
function buildSessionHeaderAndMessage() {
const header = {
type: "session",
version: 7,
id: "session-1",
timestamp: new Date().toISOString(),
cwd: "/tmp",
};
const message = {
type: "message",
id: "msg-1",
parentId: null,
timestamp: new Date().toISOString(),
message: { role: "user", content: "hello" },
};
return { header, message };
}
const tempDirs: string[] = [];
async function createTempSessionPath() {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-repair-"));
tempDirs.push(dir);
return { dir, file: path.join(dir, "session.jsonl") };
}
afterEach(async () => {
closeOpenClawStateDatabaseForTest();
await Promise.all(tempDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true })));
});
function writeTranscriptEvents(file: string, events: unknown[]) {
const sessionId =
events.find((event): event is { type: "session"; id: string } =>
Boolean(
event &&
typeof event === "object" &&
(event as { type?: unknown }).type === "session" &&
typeof (event as { id?: unknown }).id === "string",
),
)?.id ?? path.basename(file, ".jsonl");
replaceSqliteSessionTranscriptEvents({
agentId: "main",
sessionId,
transcriptPath: file,
events,
});
}
async function readTranscriptJsonl(file: string): Promise<string> {
const scope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath: file });
return scope ? exportSqliteSessionTranscriptJsonl(scope) : "";
}
describe("repairSessionFileIfNeeded", () => {
it("rewrites session files that contain malformed lines", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
writeTranscriptEvents(file, [
header,
message,
{ type: "message", id: "corrupt", message: { role: null, content: "bad" } },
]);
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(true);
expect(result.droppedLines).toBe(1);
const repaired = await readTranscriptJsonl(file);
expect(repaired.trim().split("\n")).toHaveLength(2);
});
it("does not drop CRLF-terminated JSONL lines", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const content = `${JSON.stringify(header)}\r\n${JSON.stringify(message)}\r\n`;
await fs.writeFile(file, content, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(false);
expect(result.droppedLines).toBe(0);
});
it("warns and skips repair when the session header is invalid", async () => {
const { file } = await createTempSessionPath();
const badHeader = {
type: "message",
id: "msg-1",
timestamp: new Date().toISOString(),
message: { role: "user", content: "hello" },
};
writeTranscriptEvents(file, [badHeader]);
const warn = vi.fn();
const result = await repairSessionFileIfNeeded({ sessionFile: file, warn });
expect(result.repaired).toBe(false);
expect(result.reason).toBe("invalid session header");
expect(warn).toHaveBeenCalledTimes(1);
expect(warn.mock.calls[0]?.[0]).toContain("invalid session header");
});
it("returns a detailed reason when read errors are not ENOENT", async () => {
const { dir } = await createTempSessionPath();
const warn = vi.fn();
const result = await repairSessionFileIfNeeded({ sessionFile: dir, warn });
expect(result.repaired).toBe(false);
expect(result.reason).toBe("missing SQLite transcript");
expect(warn).not.toHaveBeenCalled();
});
it("rewrites persisted assistant messages with empty content arrays", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const poisonedAssistantEntry = {
type: "message",
id: "msg-2",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [],
api: "bedrock-converse-stream",
provider: "amazon-bedrock",
model: "anthropic.claude-3-haiku-20240307-v1:0",
usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 },
stopReason: "error",
errorMessage: "transient stream failure",
},
};
// Follow-up keeps this case focused on empty error-turn repair.
const followUp = {
type: "message",
id: "msg-3",
parentId: null,
timestamp: new Date().toISOString(),
message: { role: "user", content: "retry" },
};
writeTranscriptEvents(file, [header, message, poisonedAssistantEntry, followUp]);
const debug = vi.fn();
const result = await repairSessionFileIfNeeded({ sessionFile: file, debug });
expect(result.repaired).toBe(true);
expect(result.droppedLines).toBe(0);
expect(result.rewrittenAssistantMessages).toBe(1);
expect(debug).toHaveBeenCalledTimes(1);
const debugMessage = debug.mock.calls[0]?.[0] as string;
expect(debugMessage).toContain("rewrote 1 assistant message(s)");
expect(debugMessage).not.toContain("dropped");
const repaired = await readTranscriptJsonl(file);
const repairedLines = repaired.trim().split("\n");
expect(repairedLines).toHaveLength(4);
const repairedEntry: { message: { content: { type: string; text: string }[] } } = JSON.parse(
repairedLines[2],
);
expect(repairedEntry.message.content).toEqual([
{ type: "text", text: "[assistant turn failed before producing content]" },
]);
});
it("rewrites blank-only user text messages to synthetic placeholder instead of dropping", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const blankUserEntry = {
type: "message",
id: "msg-blank",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "user",
content: [{ type: "text", text: "" }],
},
};
writeTranscriptEvents(file, [header, blankUserEntry, message]);
const debug = vi.fn();
const result = await repairSessionFileIfNeeded({ sessionFile: file, debug });
expect(result.repaired).toBe(true);
expect(result.rewrittenUserMessages).toBe(1);
expect(result.droppedBlankUserMessages).toBe(0);
expect(debug.mock.calls[0]?.[0]).toContain("rewrote 1 user message(s)");
const repaired = await readTranscriptJsonl(file);
const repairedLines = repaired.trim().split("\n");
expect(repairedLines).toHaveLength(3);
const rewrittenEntry = JSON.parse(repairedLines[1]);
expect(rewrittenEntry.id).toBe("msg-blank");
expect(rewrittenEntry.message.content).toEqual([
{ type: "text", text: BLANK_USER_FALLBACK_TEXT },
]);
});
it("rewrites blank string-content user messages to placeholder", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const blankStringUserEntry = {
type: "message",
id: "msg-blank-str",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "user",
content: " ",
},
};
writeTranscriptEvents(file, [header, blankStringUserEntry, message]);
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(true);
expect(result.rewrittenUserMessages).toBe(1);
const repaired = await readTranscriptJsonl(file);
const repairedLines = repaired.trim().split("\n");
expect(repairedLines).toHaveLength(3);
const rewrittenEntry = JSON.parse(repairedLines[1]);
expect(rewrittenEntry.message.content).toBe(BLANK_USER_FALLBACK_TEXT);
});
it("removes blank user text blocks while preserving media blocks", async () => {
const { file } = await createTempSessionPath();
const { header } = buildSessionHeaderAndMessage();
const mediaUserEntry = {
type: "message",
id: "msg-media",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "user",
content: [
{ type: "text", text: " " },
{ type: "image", data: "AA==", mimeType: "image/png" },
],
},
};
writeTranscriptEvents(file, [header, mediaUserEntry]);
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(true);
expect(result.rewrittenUserMessages).toBe(1);
const repaired = await readTranscriptJsonl(file);
const repairedEntry = JSON.parse(repaired.trim().split("\n")[1] ?? "{}");
expect(repairedEntry.message.content).toEqual([
{ type: "image", data: "AA==", mimeType: "image/png" },
]);
});
it("reports both drops and rewrites in the debug message when both occur", async () => {
const { file } = await createTempSessionPath();
const { header } = buildSessionHeaderAndMessage();
const poisonedAssistantEntry = {
type: "message",
id: "msg-2",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [],
api: "bedrock-converse-stream",
provider: "amazon-bedrock",
model: "anthropic.claude-3-haiku-20240307-v1:0",
usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 },
stopReason: "error",
},
};
writeTranscriptEvents(file, [
header,
poisonedAssistantEntry,
{ type: "message", id: "corrupt", message: { role: null, content: "bad" } },
]);
const debug = vi.fn();
const result = await repairSessionFileIfNeeded({ sessionFile: file, debug });
expect(result.repaired).toBe(true);
expect(result.droppedLines).toBe(1);
expect(result.rewrittenAssistantMessages).toBe(1);
const debugMessage = debug.mock.calls[0]?.[0] as string;
expect(debugMessage).toContain("dropped 1 malformed line(s)");
expect(debugMessage).toContain("rewrote 1 assistant message(s)");
});
it("does not rewrite silent-reply turns (stopReason=stop, content=[]) on disk", async () => {
const { file } = await createTempSessionPath();
const { header } = buildSessionHeaderAndMessage();
const silentReplyEntry = {
type: "message",
id: "msg-2",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [],
api: "openai-responses",
provider: "ollama",
model: "glm-5.1:cloud",
usage: { input: 100, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 100 },
stopReason: "stop",
},
};
// Follow-up keeps this case focused on silent-reply preservation.
const followUp = {
type: "message",
id: "msg-3",
parentId: null,
timestamp: new Date().toISOString(),
message: { role: "user", content: "follow up" },
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(silentReplyEntry)}\n${JSON.stringify(followUp)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(false);
expect(result.rewrittenAssistantMessages ?? 0).toBe(0);
const after = await fs.readFile(file, "utf-8");
expect(after).toBe(original);
});
it("preserves delivered trailing assistant messages in the session file", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const assistantEntry = {
type: "message",
id: "msg-asst",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "text", text: "stale answer" }],
stopReason: "stop",
},
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(assistantEntry)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(false);
const after = await fs.readFile(file, "utf-8");
expect(after).toBe(original);
});
it("preserves multiple consecutive delivered trailing assistant messages", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const assistantEntry1 = {
type: "message",
id: "msg-asst-1",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "text", text: "first" }],
stopReason: "stop",
},
};
const assistantEntry2 = {
type: "message",
id: "msg-asst-2",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "text", text: "second" }],
stopReason: "stop",
},
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(assistantEntry1)}\n${JSON.stringify(assistantEntry2)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(false);
const after = await fs.readFile(file, "utf-8");
expect(after).toBe(original);
});
it("does not trim non-trailing assistant messages", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const assistantEntry = {
type: "message",
id: "msg-asst",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "text", text: "answer" }],
stopReason: "stop",
},
};
const userFollowUp = {
type: "message",
id: "msg-user-2",
parentId: null,
timestamp: new Date().toISOString(),
message: { role: "user", content: "follow up" },
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(assistantEntry)}\n${JSON.stringify(userFollowUp)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(false);
});
it("preserves trailing assistant messages that contain tool calls", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const toolCallAssistant = {
type: "message",
id: "msg-asst-tc",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [
{ type: "text", text: "Let me check that." },
{ type: "toolCall", id: "call_1", name: "read", input: { path: "/tmp/test" } },
],
stopReason: "toolUse",
},
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(toolCallAssistant)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(false);
const after = await fs.readFile(file, "utf-8");
expect(after).toBe(original);
});
it("preserves adjacent trailing tool-call and text assistant messages", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const toolCallAssistant = {
type: "message",
id: "msg-asst-tc",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "toolUse", id: "call_1", name: "read" }],
stopReason: "toolUse",
},
};
const plainAssistant = {
type: "message",
id: "msg-asst-plain",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "text", text: "stale" }],
stopReason: "stop",
},
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(toolCallAssistant)}\n${JSON.stringify(plainAssistant)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(false);
const after = await fs.readFile(file, "utf-8");
expect(after).toBe(original);
});
it("preserves final text assistant turn that follows a tool-call/tool-result pair", async () => {
// Regression: a trailing assistant message with stopReason "stop" that follows a
// tool-call turn and its matching tool-result must never be trimmed by the repair
// pass. This is the exact sequence produced by any agent run that calls at least
// one tool before returning a final text response, and it must survive intact so
// subsequent user messages are parented to the correct leaf node.
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const toolCallAssistant = {
type: "message",
id: "msg-asst-tc",
parentId: "msg-1",
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "toolCall", id: "call_1", name: "get_tasks", input: {} }],
stopReason: "toolUse",
},
};
const toolResult = {
type: "message",
id: "msg-tool-result",
parentId: "msg-asst-tc",
timestamp: new Date().toISOString(),
message: {
role: "toolResult",
toolCallId: "call_1",
toolName: "get_tasks",
content: [{ type: "text", text: "Task A, Task B" }],
isError: false,
},
};
const finalAssistant = {
type: "message",
id: "msg-asst-final",
parentId: "msg-tool-result",
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "text", text: "Here are your tasks: Task A, Task B." }],
stopReason: "stop",
},
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(toolCallAssistant)}\n${JSON.stringify(toolResult)}\n${JSON.stringify(finalAssistant)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(false);
const after = await fs.readFile(file, "utf-8");
expect(after).toBe(original);
});
it("preserves assistant-only session history after the header", async () => {
const { file } = await createTempSessionPath();
const { header } = buildSessionHeaderAndMessage();
const assistantEntry = {
type: "message",
id: "msg-asst",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "text", text: "orphan" }],
stopReason: "stop",
},
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(assistantEntry)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(false);
const after = await fs.readFile(file, "utf-8");
expect(after).toBe(original);
});
it("is a no-op on a session that was already repaired", async () => {
const { file } = await createTempSessionPath();
const { header } = buildSessionHeaderAndMessage();
const healedEntry = {
type: "message",
id: "msg-2",
parentId: null,
timestamp: new Date().toISOString(),
message: {
role: "assistant",
content: [{ type: "text", text: "[assistant turn failed before producing content]" }],
api: "bedrock-converse-stream",
provider: "amazon-bedrock",
model: "anthropic.claude-3-haiku-20240307-v1:0",
usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, totalTokens: 0 },
stopReason: "error",
},
};
// Follow-up keeps this case focused on idempotent empty error-turn repair.
const followUp = {
type: "message",
id: "msg-3",
parentId: null,
timestamp: new Date().toISOString(),
message: { role: "user", content: "follow up" },
};
const original = `${JSON.stringify(header)}\n${JSON.stringify(healedEntry)}\n${JSON.stringify(followUp)}\n`;
await fs.writeFile(file, original, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(false);
expect(result.rewrittenAssistantMessages ?? 0).toBe(0);
const after = await fs.readFile(file, "utf-8");
expect(after).toBe(original);
});
it("drops type:message entries with null role instead of preserving them through repair (#77228)", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const nullRoleEntry = {
type: "message",
id: "corrupt-1",
parentId: null,
timestamp: new Date().toISOString(),
message: { role: null, content: "ignored" },
};
const missingRoleEntry = {
type: "message",
id: "corrupt-2",
parentId: null,
timestamp: new Date().toISOString(),
message: { content: "no role at all" },
};
const emptyRoleEntry = {
type: "message",
id: "corrupt-3",
parentId: null,
timestamp: new Date().toISOString(),
message: { role: " ", content: "blank role" },
};
writeTranscriptEvents(file, [header, message, nullRoleEntry, missingRoleEntry, emptyRoleEntry]);
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(true);
expect(result.droppedLines).toBe(3);
const after = await readTranscriptJsonl(file);
const lines = after.trimEnd().split("\n");
expect(lines).toHaveLength(2);
expect(JSON.parse(lines[0])).toEqual(header);
expect(JSON.parse(lines[1])).toEqual(message);
expect(after).not.toContain('"role":null');
});
it("drops a type:message entry whose message field is missing or non-object", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const missingMessage = {
type: "message",
id: "corrupt-4",
parentId: null,
timestamp: new Date().toISOString(),
};
const stringMessage = {
type: "message",
id: "corrupt-5",
parentId: null,
timestamp: new Date().toISOString(),
message: "not an object",
};
writeTranscriptEvents(file, [header, message, missingMessage, stringMessage]);
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(true);
expect(result.droppedLines).toBe(2);
const after = await readTranscriptJsonl(file);
const lines = after.trimEnd().split("\n");
expect(lines).toHaveLength(2);
});
it("preserves non-`message` envelope types (e.g. compactionSummary, custom) without role inspection", async () => {
const { file } = await createTempSessionPath();
const { header, message } = buildSessionHeaderAndMessage();
const summary = {
type: "summary",
id: "summary-1",
timestamp: new Date().toISOString(),
summary: "opaque summary blob",
};
const custom = {
type: "custom",
id: "custom-1",
customType: "model-snapshot",
timestamp: new Date().toISOString(),
data: { provider: "openai", modelApi: "openai-responses", modelId: "gpt-5" },
};
const content = [
JSON.stringify(header),
JSON.stringify(message),
JSON.stringify(summary),
JSON.stringify(custom),
].join("\n");
await fs.writeFile(file, `${content}\n`, "utf-8");
const result = await repairSessionFileIfNeeded({ sessionFile: file });
expect(result.repaired).toBe(false);
expect(result.droppedLines).toBe(0);
const after = await fs.readFile(file, "utf-8");
expect(after).toBe(`${content}\n`);
});
});

View File

@@ -1,296 +0,0 @@
import path from "node:path";
import {
loadSqliteSessionTranscriptEvents,
replaceSqliteSessionTranscriptEvents,
resolveSqliteSessionTranscriptScopeForPath,
} from "../config/sessions/transcript-store.sqlite.js";
import { STREAM_ERROR_FALLBACK_TEXT } from "./stream-message-shared.js";
/** Placeholder for blank user messages — preserves the user turn so strict
* providers that require at least one user message don't reject the transcript. */
export const BLANK_USER_FALLBACK_TEXT = "(continue)";
type RepairReport = {
repaired: boolean;
droppedLines: number;
rewrittenAssistantMessages?: number;
droppedBlankUserMessages?: number;
rewrittenUserMessages?: number;
backupPath?: string;
reason?: string;
};
// The sentinel text is shared with stream-message-shared.ts and
// replay-history.ts so a repaired entry is byte-identical to a live
// stream-error turn, keeping the repair pass idempotent.
type SessionMessageEntry = {
type: "message";
message: { role: string; content?: unknown } & Record<string, unknown>;
} & Record<string, unknown>;
function isSessionHeader(entry: unknown): entry is { type: string; id: string } {
if (!entry || typeof entry !== "object") {
return false;
}
const record = entry as { type?: unknown; id?: unknown };
return record.type === "session" && typeof record.id === "string" && record.id.length > 0;
}
/**
* Detect a `type: "message"` entry whose `message.role` is missing, `null`, or
* not a non-empty string. Such entries surface in the wild as "null role"
* JSONL corruption (e.g. #77228 reported transcripts that contained 935+
* entries with null roles after an earlier failure). They cannot be replayed
* to any provider — every provider router branches on `message.role` — and
* preserving them through repair just relocates the corruption from the
* original file into the post-repair file. Treat them as malformed lines:
* drop during repair so the cleaned transcript no longer carries them.
*/
function isStructurallyInvalidMessageEntry(entry: unknown): boolean {
if (!entry || typeof entry !== "object") {
return false;
}
const record = entry as { type?: unknown; message?: unknown };
if (record.type !== "message") {
return false;
}
if (!record.message || typeof record.message !== "object") {
return true;
}
const role = (record.message as { role?: unknown }).role;
return typeof role !== "string" || role.trim().length === 0;
}
function isAssistantEntryWithEmptyContent(entry: unknown): entry is SessionMessageEntry {
if (!entry || typeof entry !== "object") {
return false;
}
const record = entry as { type?: unknown; message?: unknown };
if (record.type !== "message" || !record.message || typeof record.message !== "object") {
return false;
}
const message = record.message as {
role?: unknown;
content?: unknown;
stopReason?: unknown;
};
if (message.role !== "assistant") {
return false;
}
if (!Array.isArray(message.content) || message.content.length !== 0) {
return false;
}
// Only error stops — clean stops with empty content (NO_REPLY path) are
// valid silent replies that must not be overwritten with synthetic text.
return message.stopReason === "error";
}
function rewriteAssistantEntryWithEmptyContent(entry: SessionMessageEntry): SessionMessageEntry {
return {
...entry,
message: {
...entry.message,
content: [{ type: "text", text: STREAM_ERROR_FALLBACK_TEXT }],
},
};
}
type UserEntryRepair =
| { kind: "drop" }
| { kind: "rewrite"; entry: SessionMessageEntry }
| { kind: "keep" };
function repairUserEntryWithBlankTextContent(entry: SessionMessageEntry): UserEntryRepair {
const content = entry.message.content;
if (typeof content === "string") {
if (content.trim()) {
return { kind: "keep" };
}
return {
kind: "rewrite",
entry: {
...entry,
message: {
...entry.message,
content: BLANK_USER_FALLBACK_TEXT,
},
},
};
}
if (!Array.isArray(content)) {
return { kind: "keep" };
}
let touched = false;
const nextContent = content.filter((block) => {
if (!block || typeof block !== "object") {
return true;
}
if ((block as { type?: unknown }).type !== "text") {
return true;
}
const text = (block as { text?: unknown }).text;
if (typeof text !== "string" || text.trim().length > 0) {
return true;
}
touched = true;
return false;
});
if (nextContent.length === 0) {
return {
kind: "rewrite",
entry: {
...entry,
message: {
...entry.message,
content: [{ type: "text", text: BLANK_USER_FALLBACK_TEXT }],
},
},
};
}
if (!touched) {
return { kind: "keep" };
}
return {
kind: "rewrite",
entry: {
...entry,
message: {
...entry.message,
content: nextContent,
},
},
};
}
function buildRepairSummaryParts(params: {
droppedLines: number;
rewrittenAssistantMessages: number;
droppedBlankUserMessages: number;
rewrittenUserMessages: number;
}): string {
const parts: string[] = [];
if (params.droppedLines > 0) {
parts.push(`dropped ${params.droppedLines} malformed line(s)`);
}
if (params.rewrittenAssistantMessages > 0) {
parts.push(`rewrote ${params.rewrittenAssistantMessages} assistant message(s)`);
}
if (params.droppedBlankUserMessages > 0) {
parts.push(`dropped ${params.droppedBlankUserMessages} blank user message(s)`);
}
if (params.rewrittenUserMessages > 0) {
parts.push(`rewrote ${params.rewrittenUserMessages} user message(s)`);
}
return parts.length > 0 ? parts.join(", ") : "no changes";
}
export async function repairSessionFileIfNeeded(params: {
sessionFile: string;
debug?: (message: string) => void;
warn?: (message: string) => void;
}): Promise<RepairReport> {
const sessionFile = params.sessionFile.trim();
if (!sessionFile) {
return { repaired: false, droppedLines: 0, reason: "missing session file" };
}
const scope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath: sessionFile });
if (!scope) {
return { repaired: false, droppedLines: 0, reason: "missing SQLite transcript" };
}
const storedEntries = loadSqliteSessionTranscriptEvents(scope).map((entry) => entry.event);
const entries: unknown[] = [];
let droppedLines = 0;
let rewrittenAssistantMessages = 0;
let droppedBlankUserMessages = 0;
let rewrittenUserMessages = 0;
for (const entry of storedEntries) {
if (isStructurallyInvalidMessageEntry(entry)) {
// Drop "null role" / missing-role message entries the same way the old
// JSONL repair dropped malformed lines: providers cannot replay them.
droppedLines += 1;
continue;
}
if (isAssistantEntryWithEmptyContent(entry)) {
entries.push(rewriteAssistantEntryWithEmptyContent(entry));
rewrittenAssistantMessages += 1;
continue;
}
if (
entry &&
typeof entry === "object" &&
(entry as { type?: unknown }).type === "message" &&
typeof (entry as { message?: unknown }).message === "object" &&
((entry as { message: { role?: unknown } }).message?.role ?? undefined) === "user"
) {
const repairedUser = repairUserEntryWithBlankTextContent(entry as SessionMessageEntry);
if (repairedUser.kind === "drop") {
droppedBlankUserMessages += 1;
continue;
}
if (repairedUser.kind === "rewrite") {
entries.push(repairedUser.entry);
rewrittenUserMessages += 1;
continue;
}
}
entries.push(entry);
}
if (entries.length === 0) {
return { repaired: false, droppedLines, reason: "empty session file" };
}
if (!isSessionHeader(entries[0])) {
params.warn?.(
`session file repair skipped: invalid session header (${path.basename(sessionFile)})`,
);
return { repaired: false, droppedLines, reason: "invalid session header" };
}
if (
droppedLines === 0 &&
rewrittenAssistantMessages === 0 &&
droppedBlankUserMessages === 0 &&
rewrittenUserMessages === 0
) {
return { repaired: false, droppedLines: 0 };
}
try {
replaceSqliteSessionTranscriptEvents({
...scope,
transcriptPath: sessionFile,
events: entries,
});
} catch (err) {
return {
repaired: false,
droppedLines,
rewrittenAssistantMessages,
droppedBlankUserMessages,
rewrittenUserMessages,
reason: `repair failed: ${err instanceof Error ? err.message : "unknown error"}`,
};
}
params.debug?.(
`session file repaired: ${buildRepairSummaryParts({
droppedLines,
rewrittenAssistantMessages,
droppedBlankUserMessages,
rewrittenUserMessages,
})} (${path.basename(sessionFile)})`,
);
return {
repaired: true,
droppedLines,
rewrittenAssistantMessages,
droppedBlankUserMessages,
rewrittenUserMessages,
};
}

View File

@@ -1,29 +0,0 @@
const SESSION_WRITE_LOCK_TIMEOUT_CODE = "OPENCLAW_SESSION_WRITE_LOCK_TIMEOUT";
export class SessionWriteLockTimeoutError extends Error {
readonly code = SESSION_WRITE_LOCK_TIMEOUT_CODE;
readonly timeoutMs: number;
readonly owner: string;
readonly lockPath: string;
constructor(params: { timeoutMs: number; owner: string; lockPath: string }) {
super(
`session file locked (timeout ${params.timeoutMs}ms): ${params.owner} ${params.lockPath}`,
);
this.name = "SessionWriteLockTimeoutError";
this.timeoutMs = params.timeoutMs;
this.owner = params.owner;
this.lockPath = params.lockPath;
}
}
export function isSessionWriteLockTimeoutError(err: unknown): boolean {
return (
err instanceof SessionWriteLockTimeoutError ||
Boolean(
err &&
typeof err === "object" &&
(err as { code?: unknown }).code === SESSION_WRITE_LOCK_TIMEOUT_CODE,
)
);
}

View File

@@ -1,766 +0,0 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeAll, describe, expect, it, vi } from "vitest";
const FAKE_STARTTIME = 12345;
let __testing: typeof import("./session-write-lock.js").__testing;
let acquireSessionWriteLock: typeof import("./session-write-lock.js").acquireSessionWriteLock;
let cleanStaleLockFiles: typeof import("./session-write-lock.js").cleanStaleLockFiles;
let resetSessionWriteLockStateForTest: typeof import("./session-write-lock.js").resetSessionWriteLockStateForTest;
let resolveSessionLockMaxHoldFromTimeout: typeof import("./session-write-lock.js").resolveSessionLockMaxHoldFromTimeout;
let resolveSessionWriteLockAcquireTimeoutMs: typeof import("./session-write-lock.js").resolveSessionWriteLockAcquireTimeoutMs;
async function expectLockRemovedOnlyAfterFinalRelease(params: {
lockPath: string;
firstLock: { release: () => Promise<void> };
secondLock: { release: () => Promise<void> };
}) {
await expect(fs.access(params.lockPath)).resolves.toBeUndefined();
await params.firstLock.release();
await expect(fs.access(params.lockPath)).resolves.toBeUndefined();
await params.secondLock.release();
await expectPathMissing(params.lockPath);
}
async function expectPathMissing(targetPath: string): Promise<void> {
try {
await fs.access(targetPath);
} catch (error) {
expect((error as NodeJS.ErrnoException).code).toBe("ENOENT");
return;
}
throw new Error(`Expected path to be missing: ${targetPath}`);
}
async function expectCurrentPidOwnsLock(params: {
sessionFile: string;
timeoutMs: number;
staleMs?: number;
}) {
const { sessionFile, timeoutMs, staleMs } = params;
const lockPath = `${sessionFile}.lock`;
const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs, staleMs });
const raw = await fs.readFile(lockPath, "utf8");
const payload = JSON.parse(raw) as { pid: number };
expect(payload.pid).toBe(process.pid);
await lock.release();
}
async function withTempSessionLockFile(
run: (params: { root: string; sessionFile: string; lockPath: string }) => Promise<void>,
) {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
try {
const sessionFile = path.join(root, "sessions.json");
await run({ root, sessionFile, lockPath: `${sessionFile}.lock` });
} finally {
await fs.rm(root, { recursive: true, force: true });
}
}
async function writeCurrentProcessLock(lockPath: string, extra?: Record<string, unknown>) {
await fs.writeFile(
lockPath,
JSON.stringify({
pid: process.pid,
createdAt: new Date().toISOString(),
...extra,
}),
"utf8",
);
}
async function withSymlinkedSessionPaths(
run: (params: {
sessionReal: string;
sessionLink: string;
realLockPath: string;
linkLockPath: string;
}) => Promise<void>,
) {
if (process.platform === "win32") {
return;
}
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
try {
const realDir = path.join(root, "real");
const linkDir = path.join(root, "link");
await fs.mkdir(realDir, { recursive: true });
await fs.symlink(realDir, linkDir);
const sessionReal = path.join(realDir, "sessions.json");
const sessionLink = path.join(linkDir, "sessions.json");
await run({
sessionReal,
sessionLink,
realLockPath: `${sessionReal}.lock`,
linkLockPath: `${sessionLink}.lock`,
});
} finally {
await fs.rm(root, { recursive: true, force: true });
}
}
async function expectActiveInProcessLockIsNotReclaimed(params?: {
legacyStarttime?: unknown;
}): Promise<void> {
await withTempSessionLockFile(async ({ sessionFile, lockPath }) => {
const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
const lockPayload = {
pid: process.pid,
createdAt: new Date().toISOString(),
...(params && "legacyStarttime" in params ? { starttime: params.legacyStarttime } : {}),
};
await fs.writeFile(lockPath, JSON.stringify(lockPayload), "utf8");
await expect(
acquireSessionWriteLock({
sessionFile,
timeoutMs: 5,
allowReentrant: false,
}),
).rejects.toThrow(/session file locked/);
await lock.release();
});
}
describe("acquireSessionWriteLock", () => {
beforeAll(async () => {
({
__testing,
acquireSessionWriteLock,
cleanStaleLockFiles,
resetSessionWriteLockStateForTest,
resolveSessionLockMaxHoldFromTimeout,
resolveSessionWriteLockAcquireTimeoutMs,
} = await import("./session-write-lock.js"));
});
afterEach(() => {
resetSessionWriteLockStateForTest();
vi.clearAllMocks();
});
function pinCurrentProcessStartTimeForTest(): void {
__testing.setProcessStartTimeResolverForTest((pid) =>
pid === process.pid ? FAKE_STARTTIME : null,
);
}
it("reuses locks across symlinked session paths", async () => {
await withSymlinkedSessionPaths(
async ({ sessionReal, sessionLink, realLockPath, linkLockPath }) => {
const lockA = await acquireSessionWriteLock({
sessionFile: sessionReal,
timeoutMs: 500,
allowReentrant: true,
});
const lockB = await acquireSessionWriteLock({
sessionFile: sessionLink,
timeoutMs: 500,
allowReentrant: true,
});
await expect(fs.access(realLockPath)).resolves.toBeUndefined();
await expect(fs.access(linkLockPath)).resolves.toBeUndefined();
const [realCanonicalLockPath, linkCanonicalLockPath] = await Promise.all([
fs.realpath(realLockPath),
fs.realpath(linkLockPath),
]);
expect(linkCanonicalLockPath).toBe(realCanonicalLockPath);
await expectLockRemovedOnlyAfterFinalRelease({
lockPath: realLockPath,
firstLock: lockA,
secondLock: lockB,
});
},
);
});
it("keeps the lock file until the last release", async () => {
await withTempSessionLockFile(async ({ sessionFile, lockPath }) => {
const lockA = await acquireSessionWriteLock({
sessionFile,
timeoutMs: 500,
allowReentrant: true,
});
const lockB = await acquireSessionWriteLock({
sessionFile,
timeoutMs: 500,
allowReentrant: true,
});
await expectLockRemovedOnlyAfterFinalRelease({
lockPath,
firstLock: lockA,
secondLock: lockB,
});
});
});
it("does not reenter locks by default in the same process", async () => {
await withTempSessionLockFile(async ({ sessionFile }) => {
const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
await expect(
acquireSessionWriteLock({ sessionFile, timeoutMs: 5, staleMs: 60_000 }),
).rejects.toThrow(/session file locked/);
await lock.release();
});
});
it("does not reenter locks by default through symlinked session paths", async () => {
await withSymlinkedSessionPaths(async ({ sessionReal, sessionLink }) => {
const lock = await acquireSessionWriteLock({ sessionFile: sessionReal, timeoutMs: 500 });
await expect(
acquireSessionWriteLock({ sessionFile: sessionLink, timeoutMs: 5, staleMs: 60_000 }),
).rejects.toThrow(/session file locked/);
await lock.release();
});
});
it("allows a new default lock acquisition after the held lock is released", async () => {
await withTempSessionLockFile(async ({ sessionFile }) => {
const lockA = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
await expect(
acquireSessionWriteLock({ sessionFile, timeoutMs: 5, staleMs: 60_000 }),
).rejects.toThrow(/session file locked/);
await lockA.release();
const lockB = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
await lockB.release();
});
});
it("reclaims stale lock files", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
try {
const sessionFile = path.join(root, "sessions.json");
const lockPath = `${sessionFile}.lock`;
await fs.writeFile(
lockPath,
JSON.stringify({ pid: 2 ** 30, createdAt: new Date(Date.now() - 60_000).toISOString() }),
"utf8",
);
await expectCurrentPidOwnsLock({ sessionFile, timeoutMs: 500, staleMs: 10 });
} finally {
await fs.rm(root, { recursive: true, force: true });
}
});
it("does not reclaim fresh malformed lock files during contention", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
try {
const sessionFile = path.join(root, "sessions.json");
const lockPath = `${sessionFile}.lock`;
await fs.writeFile(lockPath, "{}", "utf8");
await expect(
acquireSessionWriteLock({ sessionFile, timeoutMs: 5, staleMs: 60_000 }),
).rejects.toThrow(/session file locked/);
await expect(fs.access(lockPath)).resolves.toBeUndefined();
} finally {
await fs.rm(root, { recursive: true, force: true });
}
});
it("reclaims payload-less orphan lock files after the short init grace", async () => {
await withTempSessionLockFile(async ({ sessionFile, lockPath }) => {
await fs.writeFile(lockPath, "", "utf8");
const orphanDate = new Date(Date.now() - 10_000);
await fs.utimes(lockPath, orphanDate, orphanDate);
const lock = await acquireSessionWriteLock({
sessionFile,
timeoutMs: 10_000,
staleMs: 60_000,
});
const raw = await fs.readFile(lockPath, "utf8");
const payload = JSON.parse(raw) as { pid?: unknown };
expect(payload.pid).toBe(process.pid);
await lock.release();
});
});
it("reclaims malformed lock files once they are old enough", async () => {
await withTempSessionLockFile(async ({ sessionFile, lockPath }) => {
await fs.writeFile(lockPath, "{}", "utf8");
const staleDate = new Date(Date.now() - 2 * 60_000);
await fs.utimes(lockPath, staleDate, staleDate);
const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500, staleMs: 10_000 });
await lock.release();
await expectPathMissing(lockPath);
});
});
it("watchdog releases stale in-process locks", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
try {
const sessionFile = path.join(root, "session.jsonl");
const lockPath = `${sessionFile}.lock`;
const lockA = await acquireSessionWriteLock({
sessionFile,
timeoutMs: 500,
maxHoldMs: 1,
});
const released = await __testing.runLockWatchdogCheck(Date.now() + 1000);
expect(released).toBeGreaterThanOrEqual(1);
await expectPathMissing(lockPath);
const lockB = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
await expect(fs.access(lockPath)).resolves.toBeUndefined();
// Old release handle must not affect the new lock.
await expectLockRemovedOnlyAfterFinalRelease({
lockPath,
firstLock: lockA,
secondLock: lockB,
});
} finally {
stderrSpy.mockRestore();
await fs.rm(root, { recursive: true, force: true });
}
});
it("removes lock files during process-exit cleanup", async () => {
await withTempSessionLockFile(async ({ sessionFile, lockPath }) => {
const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
__testing.releaseAllLocksSync();
await expectPathMissing(lockPath);
await lock.release();
});
});
it("derives max hold from timeout plus grace", () => {
expect(resolveSessionLockMaxHoldFromTimeout({ timeoutMs: 600_000 })).toBe(720_000);
expect(resolveSessionLockMaxHoldFromTimeout({ timeoutMs: 1_000, minMs: 5_000 })).toBe(121_000);
});
it("resolves the session write-lock acquire timeout", () => {
expect(resolveSessionWriteLockAcquireTimeoutMs()).toBe(60_000);
expect(
resolveSessionWriteLockAcquireTimeoutMs({
session: { writeLock: { acquireTimeoutMs: 90_000 } },
}),
).toBe(90_000);
expect(
resolveSessionWriteLockAcquireTimeoutMs({
session: { writeLock: { acquireTimeoutMs: 0 } },
}),
).toBe(60_000);
});
it("clamps max hold for effectively no-timeout runs", () => {
expect(
resolveSessionLockMaxHoldFromTimeout({
timeoutMs: 2_147_000_000,
}),
).toBe(2_147_000_000);
});
it("cleans stale .jsonl lock files in sessions directories", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
const sessionsDir = path.join(root, "sessions");
await fs.mkdir(sessionsDir, { recursive: true });
const nowMs = Date.now();
const staleDeadLock = path.join(sessionsDir, "dead.jsonl.lock");
const staleAliveLock = path.join(sessionsDir, "old-live.jsonl.lock");
const freshAliveLock = path.join(sessionsDir, "fresh-live.jsonl.lock");
try {
await fs.writeFile(
staleDeadLock,
JSON.stringify({
pid: 999_999,
createdAt: new Date(nowMs - 120_000).toISOString(),
}),
"utf8",
);
await fs.writeFile(
staleAliveLock,
JSON.stringify({
pid: process.pid,
createdAt: new Date(nowMs - 120_000).toISOString(),
}),
"utf8",
);
await fs.writeFile(
freshAliveLock,
JSON.stringify({
pid: process.pid,
createdAt: new Date(nowMs - 1_000).toISOString(),
}),
"utf8",
);
const result = await cleanStaleLockFiles({
sessionsDir,
staleMs: 30_000,
nowMs,
removeStale: true,
readOwnerProcessArgs: () => ["node", "/opt/openclaw/openclaw.mjs", "agent"],
});
expect(result.locks).toHaveLength(3);
expect(result.cleaned).toHaveLength(2);
expect(result.cleaned.map((entry) => path.basename(entry.lockPath)).toSorted()).toEqual([
"dead.jsonl.lock",
"old-live.jsonl.lock",
]);
await expectPathMissing(staleDeadLock);
await expectPathMissing(staleAliveLock);
await expect(fs.access(freshAliveLock)).resolves.toBeUndefined();
} finally {
await fs.rm(root, { recursive: true, force: true });
}
});
it("cleans fresh live .jsonl lock files owned by a non-OpenClaw process", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
const sessionsDir = path.join(root, "sessions");
await fs.mkdir(sessionsDir, { recursive: true });
const nowMs = Date.now();
const falseLiveLock = path.join(sessionsDir, "false-live.jsonl.lock");
try {
await fs.writeFile(
falseLiveLock,
JSON.stringify({
pid: process.pid,
createdAt: new Date(nowMs).toISOString(),
}),
"utf8",
);
const result = await cleanStaleLockFiles({
sessionsDir,
staleMs: 30_000,
nowMs,
removeStale: true,
readOwnerProcessArgs: () => ["python", "worker.py"],
});
expect(result.locks).toHaveLength(1);
expect(result.cleaned.map((entry) => path.basename(entry.lockPath))).toEqual([
"false-live.jsonl.lock",
]);
expect(result.cleaned[0]?.staleReasons).toContain("non-openclaw-owner");
await expect(fs.access(falseLiveLock)).rejects.toThrow();
} finally {
await fs.rm(root, { recursive: true, force: true });
}
});
it("cleans fresh live .jsonl lock files owned by generic non-OpenClaw entrypoints", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
const sessionsDir = path.join(root, "sessions");
await fs.mkdir(sessionsDir, { recursive: true });
const nowMs = Date.now();
const falseLiveLock = path.join(sessionsDir, "false-live-generic-entry.jsonl.lock");
try {
await fs.writeFile(
falseLiveLock,
JSON.stringify({
pid: process.pid,
createdAt: new Date(nowMs).toISOString(),
}),
"utf8",
);
const result = await cleanStaleLockFiles({
sessionsDir,
staleMs: 30_000,
nowMs,
removeStale: true,
readOwnerProcessArgs: () => ["node", "/srv/app/dist/index.js"],
});
expect(result.cleaned.map((entry) => path.basename(entry.lockPath))).toEqual([
"false-live-generic-entry.jsonl.lock",
]);
expect(result.cleaned[0]?.staleReasons).toContain("non-openclaw-owner");
await expect(fs.access(falseLiveLock)).rejects.toThrow();
} finally {
await fs.rm(root, { recursive: true, force: true });
}
});
it("keeps fresh live .jsonl lock files with OpenClaw or unknown owners", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
const sessionsDir = path.join(root, "sessions");
await fs.mkdir(sessionsDir, { recursive: true });
const nowMs = Date.now();
const openclawLock = path.join(sessionsDir, "openclaw-live.jsonl.lock");
const gatewayLock = path.join(sessionsDir, "gateway-live.jsonl.lock");
const unknownLock = path.join(sessionsDir, "unknown-live.jsonl.lock");
try {
await fs.writeFile(
openclawLock,
JSON.stringify({
pid: process.pid,
createdAt: new Date(nowMs).toISOString(),
}),
"utf8",
);
const openclawResult = await cleanStaleLockFiles({
sessionsDir,
staleMs: 30_000,
nowMs,
removeStale: true,
readOwnerProcessArgs: () => ["node", "/opt/openclaw/openclaw.mjs", "agent"],
});
expect(openclawResult.cleaned).toEqual([]);
await expect(fs.access(openclawLock)).resolves.toBeUndefined();
await fs.rm(openclawLock, { force: true });
await fs.writeFile(
gatewayLock,
JSON.stringify({
pid: process.pid,
createdAt: new Date(nowMs).toISOString(),
}),
"utf8",
);
const gatewayResult = await cleanStaleLockFiles({
sessionsDir,
staleMs: 30_000,
nowMs,
removeStale: true,
readOwnerProcessArgs: () => ["node", "dist/index.js", "gateway", "run"],
});
expect(gatewayResult.cleaned).toEqual([]);
await expect(fs.access(gatewayLock)).resolves.toBeUndefined();
await fs.rm(gatewayLock, { force: true });
await fs.writeFile(
unknownLock,
JSON.stringify({
pid: process.pid,
createdAt: new Date(nowMs).toISOString(),
}),
"utf8",
);
const unknownResult = await cleanStaleLockFiles({
sessionsDir,
staleMs: 30_000,
nowMs,
removeStale: true,
readOwnerProcessArgs: () => null,
});
expect(unknownResult.cleaned).toEqual([]);
await expect(fs.access(unknownLock)).resolves.toBeUndefined();
} finally {
await fs.rm(root, { recursive: true, force: true });
}
});
it("cleans untracked current-process .jsonl lock files with matching starttime", async () => {
pinCurrentProcessStartTimeForTest();
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
const sessionsDir = path.join(root, "sessions");
await fs.mkdir(sessionsDir, { recursive: true });
const nowMs = Date.now();
const orphanSelfLock = path.join(sessionsDir, "orphan-self.jsonl.lock");
try {
await fs.writeFile(
orphanSelfLock,
JSON.stringify({
pid: process.pid,
createdAt: new Date(nowMs).toISOString(),
starttime: FAKE_STARTTIME,
}),
"utf8",
);
const result = await cleanStaleLockFiles({
sessionsDir,
staleMs: 30_000,
nowMs,
removeStale: true,
});
expect(result.locks).toHaveLength(1);
expect(result.cleaned.map((entry) => path.basename(entry.lockPath))).toEqual([
"orphan-self.jsonl.lock",
]);
expect(result.cleaned[0]?.staleReasons).toContain("orphan-self-pid");
await expectPathMissing(orphanSelfLock);
} finally {
await fs.rm(root, { recursive: true, force: true });
}
});
it("removes held locks on termination signals", async () => {
const signals = ["SIGINT", "SIGTERM", "SIGQUIT", "SIGABRT"] as const;
const originalKill = process.kill.bind(process);
process.kill = ((_pid: number, _signal?: NodeJS.Signals) => true) as typeof process.kill;
try {
for (const signal of signals) {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-cleanup-"));
try {
const sessionFile = path.join(root, "sessions.json");
const lockPath = `${sessionFile}.lock`;
await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
const keepAlive = () => {};
if (signal === "SIGINT") {
process.on(signal, keepAlive);
}
__testing.handleTerminationSignal(signal);
await expectPathMissing(lockPath);
if (signal === "SIGINT") {
process.off(signal, keepAlive);
}
} finally {
await fs.rm(root, { recursive: true, force: true });
}
}
} finally {
process.kill = originalKill;
}
});
it("reclaims lock files with recycled PIDs", async () => {
if (process.platform !== "linux") {
return;
}
await withTempSessionLockFile(async ({ sessionFile, lockPath }) => {
pinCurrentProcessStartTimeForTest();
// Write a lock with a live PID (current process) but a wrong starttime,
// simulating PID recycling: the PID is alive but belongs to a different
// process than the one that created the lock.
await writeCurrentProcessLock(lockPath, { starttime: 999_999_999 });
await expectCurrentPidOwnsLock({ sessionFile, timeoutMs: 500 });
});
});
it("reclaims orphan lock files without starttime when PID matches current process", async () => {
await withTempSessionLockFile(async ({ sessionFile, lockPath }) => {
// Simulate an old-format lock file left behind by a previous process
// instance that reused the same PID (common in containers).
await writeCurrentProcessLock(lockPath);
await expectCurrentPidOwnsLock({ sessionFile, timeoutMs: 500 });
});
});
it("reclaims untracked current-process lock files with matching starttime", async () => {
await withTempSessionLockFile(async ({ sessionFile, lockPath }) => {
pinCurrentProcessStartTimeForTest();
await writeCurrentProcessLock(lockPath, { starttime: FAKE_STARTTIME });
await expectCurrentPidOwnsLock({ sessionFile, timeoutMs: 500 });
});
});
it("does not reclaim active in-process lock files without starttime", async () => {
await expectActiveInProcessLockIsNotReclaimed();
});
it("does not reclaim active in-process lock files with malformed starttime", async () => {
await expectActiveInProcessLockIsNotReclaimed({ legacyStarttime: 123.5 });
});
it("does not reclaim active in-process lock files with matching starttime", async () => {
pinCurrentProcessStartTimeForTest();
await expectActiveInProcessLockIsNotReclaimed({ legacyStarttime: FAKE_STARTTIME });
});
it("registers cleanup for SIGQUIT and SIGABRT", () => {
expect(__testing.cleanupSignals).toContain("SIGQUIT");
expect(__testing.cleanupSignals).toContain("SIGABRT");
});
it("cleans up locks on SIGINT without removing other handlers", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
const originalKill = process.kill.bind(process);
const killCalls: Array<NodeJS.Signals | undefined> = [];
let otherHandlerCalled = false;
process.kill = ((pid: number, signal?: NodeJS.Signals) => {
killCalls.push(signal);
return true;
}) as typeof process.kill;
const otherHandler = () => {
otherHandlerCalled = true;
};
process.on("SIGINT", otherHandler);
try {
const sessionFile = path.join(root, "sessions.json");
const lockPath = `${sessionFile}.lock`;
await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
__testing.handleTerminationSignal("SIGINT");
await expectPathMissing(lockPath);
expect(otherHandlerCalled).toBe(false);
expect(killCalls).toStrictEqual([]);
} finally {
process.off("SIGINT", otherHandler);
process.kill = originalKill;
await fs.rm(root, { recursive: true, force: true });
}
});
it("cleans up locks on exit", async () => {
await withTempSessionLockFile(async ({ sessionFile, lockPath }) => {
await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
process.emit("exit", 0);
await expectPathMissing(lockPath);
});
});
it("does not accumulate exit listeners across reset cycles", async () => {
const baselineExitListeners = process.listenerCount("exit");
await withTempSessionLockFile(async ({ sessionFile }) => {
for (let i = 0; i < 3; i += 1) {
const lock = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
await lock.release();
resetSessionWriteLockStateForTest();
expect(process.listenerCount("exit")).toBe(baselineExitListeners);
}
});
});
it("keeps other signal listeners registered", () => {
const keepAlive = () => {};
const originalKill = process.kill.bind(process);
process.kill = ((_pid: number, _signal?: NodeJS.Signals) => true) as typeof process.kill;
process.on("SIGINT", keepAlive);
try {
__testing.handleTerminationSignal("SIGINT");
expect(process.listeners("SIGINT")).toContain(keepAlive);
} finally {
process.off("SIGINT", keepAlive);
process.kill = originalKill;
}
});
});

View File

@@ -1,710 +0,0 @@
import "../infra/fs-safe-defaults.js";
import type fsSync from "node:fs";
import fs from "node:fs/promises";
import path from "node:path";
import { createFileLockManager } from "../infra/file-lock-manager.js";
import { readGatewayProcessArgsSync as readProcessArgsSync } from "../infra/gateway-processes.js";
import { getProcessStartTime, isPidAlive } from "../shared/pid-alive.js";
import { SessionWriteLockTimeoutError } from "./session-write-lock-error.js";
type LockFilePayload = {
pid?: number;
createdAt?: string;
/** Process start time in clock ticks (from /proc/pid/stat field 22). */
starttime?: number;
};
function isValidLockNumber(value: unknown): value is number {
return typeof value === "number" && Number.isInteger(value) && value >= 0;
}
export type SessionLockInspection = {
lockPath: string;
pid: number | null;
pidAlive: boolean;
createdAt: string | null;
ageMs: number | null;
stale: boolean;
staleReasons: string[];
removed: boolean;
};
export type SessionLockOwnerProcessArgsReader = (pid: number) => string[] | null;
const CLEANUP_SIGNALS = ["SIGINT", "SIGTERM", "SIGQUIT", "SIGABRT"] as const;
type CleanupSignal = (typeof CLEANUP_SIGNALS)[number];
const CLEANUP_STATE_KEY = Symbol.for("openclaw.sessionWriteLockCleanupState");
const WATCHDOG_STATE_KEY = Symbol.for("openclaw.sessionWriteLockWatchdogState");
const DEFAULT_STALE_MS = 30 * 60 * 1000;
const DEFAULT_MAX_HOLD_MS = 5 * 60 * 1000;
export const DEFAULT_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS = 60_000;
const DEFAULT_WATCHDOG_INTERVAL_MS = 60_000;
const DEFAULT_TIMEOUT_GRACE_MS = 2 * 60 * 1000;
// A payload-less lock can be left behind if shutdown lands between open("wx")
// and the owner metadata write. Keep the grace short so 10s callers recover.
const ORPHAN_LOCK_PAYLOAD_GRACE_MS = 5_000;
const MAX_LOCK_HOLD_MS = 2_147_000_000;
type CleanupState = {
registered: boolean;
exitHandler?: () => void;
cleanupHandlers: Map<CleanupSignal, () => void>;
};
type WatchdogState = {
started: boolean;
intervalMs: number;
timer?: NodeJS.Timeout;
};
type LockInspectionDetails = Pick<
SessionLockInspection,
"pid" | "pidAlive" | "createdAt" | "ageMs" | "stale" | "staleReasons"
>;
const SESSION_LOCKS = createFileLockManager("openclaw.session-write-lock");
let resolveProcessStartTimeForLock = getProcessStartTime;
function isFileLockError(error: unknown, code: string): boolean {
return (error as { code?: unknown } | null)?.code === code;
}
export type SessionWriteLockAcquireTimeoutConfig = {
session?: {
writeLock?: {
acquireTimeoutMs?: number;
};
};
};
export function resolveSessionWriteLockAcquireTimeoutMs(
config?: SessionWriteLockAcquireTimeoutConfig,
): number {
return resolvePositiveMs(
config?.session?.writeLock?.acquireTimeoutMs,
DEFAULT_SESSION_WRITE_LOCK_ACQUIRE_TIMEOUT_MS,
{ allowInfinity: true },
);
}
function resolveCleanupState(): CleanupState {
const proc = process as NodeJS.Process & {
[CLEANUP_STATE_KEY]?: CleanupState;
};
if (!proc[CLEANUP_STATE_KEY]) {
proc[CLEANUP_STATE_KEY] = {
registered: false,
exitHandler: undefined,
cleanupHandlers: new Map<CleanupSignal, () => void>(),
};
}
return proc[CLEANUP_STATE_KEY];
}
function resolveWatchdogState(): WatchdogState {
const proc = process as NodeJS.Process & {
[WATCHDOG_STATE_KEY]?: WatchdogState;
};
if (!proc[WATCHDOG_STATE_KEY]) {
proc[WATCHDOG_STATE_KEY] = {
started: false,
intervalMs: DEFAULT_WATCHDOG_INTERVAL_MS,
};
}
return proc[WATCHDOG_STATE_KEY];
}
function resolvePositiveMs(
value: number | undefined,
fallback: number,
opts: { allowInfinity?: boolean } = {},
): number {
if (typeof value !== "number" || Number.isNaN(value) || value <= 0) {
return fallback;
}
if (value === Number.POSITIVE_INFINITY) {
return opts.allowInfinity ? value : fallback;
}
if (!Number.isFinite(value)) {
return fallback;
}
return value;
}
export function resolveSessionLockMaxHoldFromTimeout(params: {
timeoutMs: number;
graceMs?: number;
minMs?: number;
}): number {
const minMs = resolvePositiveMs(params.minMs, DEFAULT_MAX_HOLD_MS);
const timeoutMs = resolvePositiveMs(params.timeoutMs, minMs, { allowInfinity: true });
if (timeoutMs === Number.POSITIVE_INFINITY) {
return MAX_LOCK_HOLD_MS;
}
const graceMs = resolvePositiveMs(params.graceMs, DEFAULT_TIMEOUT_GRACE_MS);
return Math.min(MAX_LOCK_HOLD_MS, Math.max(minMs, timeoutMs + graceMs));
}
/**
* Synchronously release all held locks.
* Used during process exit when async operations aren't reliable.
*/
function releaseAllLocksSync(): void {
SESSION_LOCKS.reset();
stopWatchdogTimer();
}
async function runLockWatchdogCheck(nowMs = Date.now()): Promise<number> {
let released = 0;
for (const held of SESSION_LOCKS.heldEntries()) {
const maxHoldMs =
typeof held.metadata.maxHoldMs === "number" ? held.metadata.maxHoldMs : DEFAULT_MAX_HOLD_MS;
const heldForMs = nowMs - held.acquiredAt;
if (heldForMs <= maxHoldMs) {
continue;
}
process.stderr.write(
`[session-write-lock] releasing lock held for ${heldForMs}ms (max=${maxHoldMs}ms): ${held.lockPath}\n`,
);
const didRelease = await held.forceRelease();
if (didRelease) {
released += 1;
}
}
return released;
}
function stopWatchdogTimer(): void {
const watchdogState = resolveWatchdogState();
if (watchdogState.timer) {
clearInterval(watchdogState.timer);
watchdogState.timer = undefined;
}
watchdogState.started = false;
}
function shouldStartBackgroundWatchdog(): boolean {
return process.env.VITEST !== "true" || process.env.OPENCLAW_TEST_SESSION_LOCK_WATCHDOG === "1";
}
function ensureWatchdogStarted(intervalMs: number): void {
if (!shouldStartBackgroundWatchdog()) {
return;
}
const watchdogState = resolveWatchdogState();
if (watchdogState.started) {
return;
}
watchdogState.started = true;
watchdogState.intervalMs = intervalMs;
watchdogState.timer = setInterval(() => {
void runLockWatchdogCheck().catch(() => {
// Ignore watchdog errors - best effort cleanup only.
});
}, intervalMs);
watchdogState.timer.unref?.();
}
function handleTerminationSignal(signal: CleanupSignal): void {
releaseAllLocksSync();
const cleanupState = resolveCleanupState();
const shouldReraise = process.listenerCount(signal) === 1;
if (shouldReraise) {
const handler = cleanupState.cleanupHandlers.get(signal);
if (handler) {
process.off(signal, handler);
cleanupState.cleanupHandlers.delete(signal);
}
try {
process.kill(process.pid, signal);
} catch {
// Ignore errors during shutdown
}
}
}
function registerCleanupHandlers(): void {
const cleanupState = resolveCleanupState();
cleanupState.registered = true;
if (!cleanupState.exitHandler) {
// Cleanup on normal exit and process.exit() calls
cleanupState.exitHandler = () => {
releaseAllLocksSync();
};
process.on("exit", cleanupState.exitHandler);
}
ensureWatchdogStarted(DEFAULT_WATCHDOG_INTERVAL_MS);
// Handle termination signals
for (const signal of CLEANUP_SIGNALS) {
if (cleanupState.cleanupHandlers.has(signal)) {
continue;
}
try {
const handler = () => handleTerminationSignal(signal);
cleanupState.cleanupHandlers.set(signal, handler);
process.on(signal, handler);
} catch {
// Ignore unsupported signals on this platform.
}
}
}
function unregisterCleanupHandlers(): void {
const cleanupState = resolveCleanupState();
if (cleanupState.exitHandler) {
process.off("exit", cleanupState.exitHandler);
cleanupState.exitHandler = undefined;
}
for (const [signal, handler] of cleanupState.cleanupHandlers) {
process.off(signal, handler);
}
cleanupState.cleanupHandlers.clear();
cleanupState.registered = false;
}
async function readLockPayload(lockPath: string): Promise<LockFilePayload | null> {
try {
const raw = await fs.readFile(lockPath, "utf8");
const parsed = JSON.parse(raw) as Record<string, unknown>;
const payload: LockFilePayload = {};
if (isValidLockNumber(parsed.pid) && parsed.pid > 0) {
payload.pid = parsed.pid;
}
if (typeof parsed.createdAt === "string") {
payload.createdAt = parsed.createdAt;
}
if (isValidLockNumber(parsed.starttime)) {
payload.starttime = parsed.starttime;
}
return payload;
} catch {
return null;
}
}
async function resolveNormalizedSessionFile(sessionFile: string): Promise<string> {
const resolvedSessionFile = path.resolve(sessionFile);
const sessionDir = path.dirname(resolvedSessionFile);
try {
const normalizedDir = await fs.realpath(sessionDir);
return path.join(normalizedDir, path.basename(resolvedSessionFile));
} catch {
return resolvedSessionFile;
}
}
function normalizeOwnerProcessArg(arg: string): string {
return arg.trim().replaceAll("\\", "/").toLowerCase();
}
function isOpenClawSessionOwnerArgv(args: string[]): boolean {
const normalized = args.map(normalizeOwnerProcessArg).filter(Boolean);
if (normalized.length === 0) {
return false;
}
const exe = (normalized[0] ?? "").replace(/\.(bat|cmd|exe)$/i, "");
if (exe === "openclaw" || exe.endsWith("/openclaw") || exe.endsWith("/openclaw-gateway")) {
return true;
}
if (
normalized.some(
(arg) =>
arg === "openclaw" ||
arg.endsWith("/openclaw") ||
arg === "openclaw.mjs" ||
arg.endsWith("/openclaw.mjs"),
)
) {
return true;
}
const entryCandidates = [
"dist/index.js",
"dist/entry.js",
"scripts/run-node.mjs",
"src/entry.ts",
"src/index.ts",
];
const hasOpenClawCommandToken = normalized.some((arg) => arg === "gateway" || arg === "agent");
return normalized.some(
(arg) => entryCandidates.some((entry) => arg.endsWith(entry)) && hasOpenClawCommandToken,
);
}
function readOwnerProcessArgs(
reader: SessionLockOwnerProcessArgsReader,
pid: number,
): string[] | null {
try {
const args = reader(pid);
return Array.isArray(args) ? args : null;
} catch {
return null;
}
}
function inspectLockPayload(
payload: LockFilePayload | null,
staleMs: number,
nowMs: number,
): LockInspectionDetails {
const pid = isValidLockNumber(payload?.pid) && payload.pid > 0 ? payload.pid : null;
const pidAlive = pid !== null ? isPidAlive(pid) : false;
const createdAt = typeof payload?.createdAt === "string" ? payload.createdAt : null;
const createdAtMs = createdAt ? Date.parse(createdAt) : Number.NaN;
const ageMs = Number.isFinite(createdAtMs) ? Math.max(0, nowMs - createdAtMs) : null;
// Detect PID recycling: if the PID is alive but its start time differs from
// what was recorded in the lock file, the original process died and the OS
// reassigned the same PID to a different process.
const storedStarttime = isValidLockNumber(payload?.starttime) ? payload.starttime : null;
const pidRecycled =
pidAlive && pid !== null && storedStarttime !== null
? (() => {
const currentStarttime = resolveProcessStartTimeForLock(pid);
return currentStarttime !== null && currentStarttime !== storedStarttime;
})()
: false;
const staleReasons: string[] = [];
if (pid === null) {
staleReasons.push("missing-pid");
} else if (!pidAlive) {
staleReasons.push("dead-pid");
} else if (pidRecycled) {
staleReasons.push("recycled-pid");
}
if (ageMs === null) {
staleReasons.push("invalid-createdAt");
} else if (ageMs > staleMs) {
staleReasons.push("too-old");
}
return {
pid,
pidAlive,
createdAt,
ageMs,
stale: staleReasons.length > 0,
staleReasons,
};
}
function shouldTreatAsNonOpenClawOwner(params: {
payload: LockFilePayload | null;
inspected: LockInspectionDetails;
heldByThisProcess: boolean;
readOwnerProcessArgs: SessionLockOwnerProcessArgsReader;
}): boolean {
if (params.inspected.stale || params.inspected.pid === null || !params.inspected.pidAlive) {
return false;
}
if (params.inspected.pid === process.pid && params.heldByThisProcess) {
return false;
}
if (!isValidLockNumber(params.payload?.pid) || params.payload.pid <= 0) {
return false;
}
const args = readOwnerProcessArgs(params.readOwnerProcessArgs, params.payload.pid);
if (!args || args.every((arg) => !arg.trim())) {
return false;
}
return !isOpenClawSessionOwnerArgv(args);
}
function lockInspectionNeedsMtimeStaleFallback(details: LockInspectionDetails): boolean {
return (
details.stale &&
details.staleReasons.every(
(reason) => reason === "missing-pid" || reason === "invalid-createdAt",
)
);
}
async function shouldReclaimContendedLockFile(
lockPath: string,
details: LockInspectionDetails,
staleMs: number,
nowMs: number,
): Promise<boolean> {
if (!details.stale) {
return false;
}
if (!lockInspectionNeedsMtimeStaleFallback(details)) {
return true;
}
try {
const stat = await fs.stat(lockPath);
const ageMs = Math.max(0, nowMs - stat.mtimeMs);
return ageMs > Math.min(staleMs, ORPHAN_LOCK_PAYLOAD_GRACE_MS);
} catch (error) {
const code = (error as { code?: string } | null)?.code;
return code !== "ENOENT";
}
}
function sessionLockHeldByThisProcess(normalizedSessionFile: string): boolean {
return SESSION_LOCKS.heldEntries().some(
(entry) => entry.normalizedTargetPath === normalizedSessionFile,
);
}
async function removeReportedStaleLockIfStillStale(params: {
lockPath: string;
normalizedSessionFile: string;
staleMs: number;
readOwnerProcessArgs?: SessionLockOwnerProcessArgsReader;
}): Promise<boolean> {
const nowMs = Date.now();
const payload = await readLockPayload(params.lockPath);
const inspected = inspectLockPayloadForSession({
payload,
staleMs: params.staleMs,
nowMs,
heldByThisProcess: sessionLockHeldByThisProcess(params.normalizedSessionFile),
reclaimLockWithoutStarttime: true,
readOwnerProcessArgs: params.readOwnerProcessArgs ?? readProcessArgsSync,
});
if (!(await shouldReclaimContendedLockFile(params.lockPath, inspected, params.staleMs, nowMs))) {
return false;
}
await fs.rm(params.lockPath, { force: true });
return true;
}
function shouldTreatAsOrphanSelfLock(params: {
payload: LockFilePayload | null;
heldByThisProcess: boolean;
reclaimLockWithoutStarttime: boolean;
}): boolean {
const pid = isValidLockNumber(params.payload?.pid) ? params.payload.pid : null;
if (pid !== process.pid) {
return false;
}
if (params.heldByThisProcess) {
return false;
}
const storedStarttime = isValidLockNumber(params.payload?.starttime)
? params.payload.starttime
: null;
if (storedStarttime === null) {
return params.reclaimLockWithoutStarttime;
}
const currentStarttime = resolveProcessStartTimeForLock(process.pid);
return currentStarttime !== null && currentStarttime === storedStarttime;
}
function inspectLockPayloadForSession(params: {
payload: LockFilePayload | null;
staleMs: number;
nowMs: number;
heldByThisProcess: boolean;
reclaimLockWithoutStarttime: boolean;
readOwnerProcessArgs: SessionLockOwnerProcessArgsReader;
}): LockInspectionDetails {
const inspected = inspectLockPayload(params.payload, params.staleMs, params.nowMs);
if (
shouldTreatAsOrphanSelfLock({
payload: params.payload,
heldByThisProcess: params.heldByThisProcess,
reclaimLockWithoutStarttime: params.reclaimLockWithoutStarttime,
})
) {
return {
...inspected,
stale: true,
staleReasons: inspected.staleReasons.includes("orphan-self-pid")
? inspected.staleReasons
: [...inspected.staleReasons, "orphan-self-pid"],
};
}
if (
shouldTreatAsNonOpenClawOwner({
payload: params.payload,
inspected,
heldByThisProcess: params.heldByThisProcess,
readOwnerProcessArgs: params.readOwnerProcessArgs,
})
) {
return {
...inspected,
stale: true,
staleReasons: [...inspected.staleReasons, "non-openclaw-owner"],
};
}
return inspected;
}
export async function cleanStaleLockFiles(params: {
sessionsDir: string;
staleMs?: number;
removeStale?: boolean;
nowMs?: number;
readOwnerProcessArgs?: SessionLockOwnerProcessArgsReader;
log?: {
warn?: (message: string) => void;
info?: (message: string) => void;
};
}): Promise<{ locks: SessionLockInspection[]; cleaned: SessionLockInspection[] }> {
const sessionsDir = path.resolve(params.sessionsDir);
const staleMs = resolvePositiveMs(params.staleMs, DEFAULT_STALE_MS);
const removeStale = params.removeStale !== false;
const nowMs = params.nowMs ?? Date.now();
const ownerProcessArgsReader = params.readOwnerProcessArgs ?? readProcessArgsSync;
let entries: fsSync.Dirent[] = [];
try {
entries = await fs.readdir(sessionsDir, { withFileTypes: true });
} catch (err) {
const code = (err as { code?: string }).code;
if (code === "ENOENT") {
return { locks: [], cleaned: [] };
}
throw err;
}
const locks: SessionLockInspection[] = [];
const cleaned: SessionLockInspection[] = [];
const lockEntries = entries
.filter((entry) => entry.name.endsWith(".jsonl.lock"))
.toSorted((a, b) => a.name.localeCompare(b.name));
for (const entry of lockEntries) {
const lockPath = path.join(sessionsDir, entry.name);
const payload = await readLockPayload(lockPath);
const inspected = inspectLockPayloadForSession({
payload,
staleMs,
nowMs,
heldByThisProcess: false,
reclaimLockWithoutStarttime: false,
readOwnerProcessArgs: ownerProcessArgsReader,
});
const lockInfo: SessionLockInspection = {
lockPath,
...inspected,
removed: false,
};
if (lockInfo.stale && removeStale) {
await fs.rm(lockPath, { force: true });
lockInfo.removed = true;
cleaned.push(lockInfo);
params.log?.warn?.(
`removed stale session lock: ${lockPath} (${lockInfo.staleReasons.join(", ") || "unknown"})`,
);
}
locks.push(lockInfo);
}
return { locks, cleaned };
}
export async function acquireSessionWriteLock(params: {
sessionFile: string;
timeoutMs?: number;
staleMs?: number;
maxHoldMs?: number;
allowReentrant?: boolean;
}): Promise<{
release: () => Promise<void>;
}> {
registerCleanupHandlers();
const allowReentrant = params.allowReentrant ?? false;
const timeoutMs = resolvePositiveMs(params.timeoutMs, resolveSessionWriteLockAcquireTimeoutMs(), {
allowInfinity: true,
});
const staleMs = resolvePositiveMs(params.staleMs, DEFAULT_STALE_MS);
const maxHoldMs = resolvePositiveMs(params.maxHoldMs, DEFAULT_MAX_HOLD_MS);
const sessionFile = path.resolve(params.sessionFile);
const sessionDir = path.dirname(sessionFile);
const normalizedSessionFile = await resolveNormalizedSessionFile(sessionFile);
const lockPath = `${normalizedSessionFile}.lock`;
await fs.mkdir(sessionDir, { recursive: true });
while (true) {
try {
const lock = await SESSION_LOCKS.acquire(sessionFile, {
staleMs,
timeoutMs,
retry: { minTimeout: 50, maxTimeout: 1000, factor: 1 },
allowReentrant,
metadata: { maxHoldMs },
payload: () => {
const createdAt = new Date().toISOString();
const starttime = resolveProcessStartTimeForLock(process.pid);
const lockPayload: LockFilePayload = { pid: process.pid, createdAt };
if (starttime !== null) {
lockPayload.starttime = starttime;
}
return lockPayload as Record<string, unknown>;
},
shouldReclaim: async ({ payload, nowMs, heldByThisProcess }) => {
const inspected = inspectLockPayloadForSession({
payload: payload as LockFilePayload | null,
staleMs,
nowMs,
heldByThisProcess,
reclaimLockWithoutStarttime: true,
readOwnerProcessArgs: readProcessArgsSync,
});
return await shouldReclaimContendedLockFile(lockPath, inspected, staleMs, nowMs);
},
});
return { release: lock.release };
} catch (err) {
if (isFileLockError(err, "file_lock_stale")) {
const staleLockPath = (err as { lockPath?: string }).lockPath ?? lockPath;
if (
await removeReportedStaleLockIfStillStale({
lockPath: staleLockPath,
normalizedSessionFile,
staleMs,
})
) {
continue;
}
}
if (!isFileLockError(err, "file_lock_timeout")) {
throw err;
}
const timeoutLockPath = (err as { lockPath?: string }).lockPath ?? lockPath;
const payload = await readLockPayload(timeoutLockPath);
const owner = typeof payload?.pid === "number" ? `pid=${payload.pid}` : "unknown";
throw new SessionWriteLockTimeoutError({ timeoutMs, owner, lockPath: timeoutLockPath });
}
}
}
export const __testing = {
cleanupSignals: [...CLEANUP_SIGNALS],
handleTerminationSignal,
releaseAllLocksSync,
runLockWatchdogCheck,
setProcessStartTimeResolverForTest(resolver: ((pid: number) => number | null) | null): void {
resolveProcessStartTimeForLock = resolver ?? getProcessStartTime;
},
};
export async function drainSessionWriteLockStateForTest(): Promise<void> {
await SESSION_LOCKS.drain();
stopWatchdogTimer();
unregisterCleanupHandlers();
}
export function resetSessionWriteLockStateForTest(): void {
releaseAllLocksSync();
stopWatchdogTimer();
unregisterCleanupHandlers();
resolveProcessStartTimeForLock = getProcessStartTime;
}

View File

@@ -1,5 +1,9 @@
import type { SessionEntry } from "../../config/sessions.js";
import { updateSessionStore } from "../../config/sessions.js";
import {
getSessionEntry,
resolveAgentIdFromSessionKey,
upsertSessionEntry,
type SessionEntry,
} from "../../config/sessions.js";
import { applyAbortCutoffToSessionEntry, type AbortCutoff } from "./abort-cutoff.js";
import type { CommandHandler } from "./commands-types.js";
@@ -11,11 +15,11 @@ export async function persistSessionEntry(params: CommandParams): Promise<boolea
}
params.sessionEntry.updatedAt = Date.now();
params.sessionStore[params.sessionKey] = params.sessionEntry;
if (params.storePath) {
await updateSessionStore(params.storePath, (store) => {
store[params.sessionKey] = params.sessionEntry as SessionEntry;
});
}
upsertSessionEntry({
agentId: resolveAgentIdFromSessionKey(params.sessionKey),
sessionKey: params.sessionKey,
entry: params.sessionEntry,
});
return true;
}
@@ -23,10 +27,9 @@ export async function persistAbortTargetEntry(params: {
entry?: SessionEntry;
key?: string;
sessionStore?: Record<string, SessionEntry>;
storePath?: string;
abortCutoff?: AbortCutoff;
}): Promise<boolean> {
const { entry, key, sessionStore, storePath, abortCutoff } = params;
const { entry, key, sessionStore, abortCutoff } = params;
if (!entry || !key || !sessionStore) {
return false;
}
@@ -36,18 +39,16 @@ export async function persistAbortTargetEntry(params: {
entry.updatedAt = Date.now();
sessionStore[key] = entry;
if (storePath) {
await updateSessionStore(storePath, (store) => {
const nextEntry = store[key] ?? entry;
if (!nextEntry) {
return;
}
nextEntry.abortedLastRun = true;
applyAbortCutoffToSessionEntry(nextEntry, abortCutoff);
nextEntry.updatedAt = Date.now();
store[key] = nextEntry;
});
}
const agentId = resolveAgentIdFromSessionKey(key);
const nextEntry = getSessionEntry({ agentId, sessionKey: key }) ?? entry;
nextEntry.abortedLastRun = true;
applyAbortCutoffToSessionEntry(nextEntry, abortCutoff);
nextEntry.updatedAt = Date.now();
upsertSessionEntry({
agentId,
sessionKey: key,
entry: nextEntry,
});
return true;
}

View File

@@ -1,123 +0,0 @@
import fs from "node:fs/promises";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
createOpenClawTestState,
type OpenClawTestState,
} from "../test-utils/openclaw-test-state.js";
const note = vi.hoisted(() => vi.fn());
vi.mock("../terminal/note.js", () => ({
note,
}));
import { noteSessionLockHealth } from "./doctor-session-locks.js";
async function expectPathMissing(targetPath: string): Promise<void> {
try {
await fs.access(targetPath);
throw new Error(`expected missing path: ${targetPath}`);
} catch (error) {
expect((error as NodeJS.ErrnoException).code).toBe("ENOENT");
}
}
describe("noteSessionLockHealth", () => {
let state: OpenClawTestState;
beforeEach(async () => {
note.mockClear();
state = await createOpenClawTestState({
layout: "state-only",
prefix: "openclaw-doctor-locks-",
});
});
afterEach(async () => {
await state.cleanup();
});
it("reports existing lock files with pid status and age", async () => {
const sessionsDir = state.sessionsDir();
await fs.mkdir(sessionsDir, { recursive: true });
const lockPath = path.join(sessionsDir, "active.jsonl.lock");
await fs.writeFile(
lockPath,
JSON.stringify({ pid: process.pid, createdAt: new Date(Date.now() - 1500).toISOString() }),
"utf8",
);
await noteSessionLockHealth({
shouldRepair: false,
staleMs: 60_000,
readOwnerProcessArgs: () => ["node", "/opt/openclaw/openclaw.mjs", "doctor"],
});
expect(note).toHaveBeenCalledTimes(1);
const [message, title] = note.mock.calls[0] as [string, string];
expect(title).toBe("Session locks");
expect(message).toContain("Found 1 session lock file");
expect(message).toContain(`pid=${process.pid} (alive)`);
expect(message).toContain("stale=no");
await expect(fs.access(lockPath)).resolves.toBeUndefined();
});
it("removes stale locks in repair mode", async () => {
const sessionsDir = state.sessionsDir();
await fs.mkdir(sessionsDir, { recursive: true });
const staleLock = path.join(sessionsDir, "stale.jsonl.lock");
const freshLock = path.join(sessionsDir, "fresh.jsonl.lock");
await fs.writeFile(
staleLock,
JSON.stringify({ pid: -1, createdAt: new Date(Date.now() - 120_000).toISOString() }),
"utf8",
);
await fs.writeFile(
freshLock,
JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }),
"utf8",
);
await noteSessionLockHealth({
shouldRepair: true,
staleMs: 30_000,
readOwnerProcessArgs: () => ["node", "/opt/openclaw/openclaw.mjs", "doctor"],
});
expect(note).toHaveBeenCalledTimes(1);
const [message] = note.mock.calls[0] as [string, string];
expect(message).toContain("[removed]");
expect(message).toContain("Removed 1 stale session lock file");
await expectPathMissing(staleLock);
await expect(fs.access(freshLock)).resolves.toBeUndefined();
});
it("removes fresh live locks when the owner is not an OpenClaw process", async () => {
const sessionsDir = state.sessionsDir();
await fs.mkdir(sessionsDir, { recursive: true });
const falseLiveLock = path.join(sessionsDir, "false-live.jsonl.lock");
await fs.writeFile(
falseLiveLock,
JSON.stringify({ pid: process.pid, createdAt: new Date().toISOString() }),
"utf8",
);
await noteSessionLockHealth({
shouldRepair: true,
staleMs: 60_000,
readOwnerProcessArgs: () => ["python", "worker.py"],
});
expect(note).toHaveBeenCalledTimes(1);
const [message] = note.mock.calls[0] as [string, string];
expect(message).toContain("stale=yes (non-openclaw-owner)");
expect(message).toContain("[removed]");
expect(message).toContain("Removed 1 stale session lock file");
await expect(fs.access(falseLiveLock)).rejects.toThrow();
});
});

View File

@@ -1,94 +0,0 @@
import { resolveAgentSessionDirs } from "../agents/session-dirs.js";
import {
cleanStaleLockFiles,
type SessionLockInspection,
type SessionLockOwnerProcessArgsReader,
} from "../agents/session-write-lock.js";
import { resolveStateDir } from "../config/paths.js";
import { note } from "../terminal/note.js";
import { shortenHomePath } from "../utils.js";
const DEFAULT_STALE_MS = 30 * 60 * 1000;
function formatAge(ageMs: number | null): string {
if (ageMs === null) {
return "unknown";
}
const seconds = Math.floor(ageMs / 1000);
if (seconds < 60) {
return `${seconds}s`;
}
const minutes = Math.floor(seconds / 60);
const remainingSeconds = seconds % 60;
if (minutes < 60) {
return `${minutes}m${remainingSeconds}s`;
}
const hours = Math.floor(minutes / 60);
const remainingMinutes = minutes % 60;
return `${hours}h${remainingMinutes}m`;
}
function formatLockLine(lock: SessionLockInspection): string {
const pidStatus =
lock.pid === null ? "pid=missing" : `pid=${lock.pid} (${lock.pidAlive ? "alive" : "dead"})`;
const ageStatus = `age=${formatAge(lock.ageMs)}`;
const staleStatus = lock.stale
? `stale=yes (${lock.staleReasons.join(", ") || "unknown"})`
: "stale=no";
const removedStatus = lock.removed ? " [removed]" : "";
return `- ${shortenHomePath(lock.lockPath)} ${pidStatus} ${ageStatus} ${staleStatus}${removedStatus}`;
}
export async function noteSessionLockHealth(params?: {
shouldRepair?: boolean;
staleMs?: number;
readOwnerProcessArgs?: SessionLockOwnerProcessArgsReader;
}) {
const shouldRepair = params?.shouldRepair === true;
const staleMs = params?.staleMs ?? DEFAULT_STALE_MS;
let sessionDirs: string[] = [];
try {
sessionDirs = await resolveAgentSessionDirs(resolveStateDir(process.env));
} catch (err) {
note(`- Failed to inspect session lock files: ${String(err)}`, "Session locks");
return;
}
if (sessionDirs.length === 0) {
return;
}
const allLocks: SessionLockInspection[] = [];
for (const sessionsDir of sessionDirs) {
const result = await cleanStaleLockFiles({
sessionsDir,
staleMs,
removeStale: shouldRepair,
readOwnerProcessArgs: params?.readOwnerProcessArgs,
});
allLocks.push(...result.locks);
}
if (allLocks.length === 0) {
return;
}
const staleCount = allLocks.filter((lock) => lock.stale).length;
const removedCount = allLocks.filter((lock) => lock.removed).length;
const lines: string[] = [
`- Found ${allLocks.length} session lock file${allLocks.length === 1 ? "" : "s"}.`,
...allLocks.toSorted((a, b) => a.lockPath.localeCompare(b.lockPath)).map(formatLockLine),
];
if (staleCount > 0 && !shouldRepair) {
lines.push(`- ${staleCount} lock file${staleCount === 1 ? " is" : "s are"} stale.`);
lines.push('- Run "openclaw doctor --fix" to remove stale lock files automatically.');
}
if (shouldRepair && removedCount > 0) {
lines.push(
`- Removed ${removedCount} stale session lock file${removedCount === 1 ? "" : "s"}.`,
);
}
note(lines.join("\n"), "Session locks");
}

View File

@@ -1,14 +0,0 @@
import type * as SessionWriteLockModule from "../agents/session-write-lock.js";
type SessionWriteLockModuleShape = typeof SessionWriteLockModule;
export async function buildSessionWriteLockModuleMock(
loadActual: () => Promise<SessionWriteLockModuleShape>,
acquireSessionWriteLock: SessionWriteLockModuleShape["acquireSessionWriteLock"],
): Promise<SessionWriteLockModuleShape> {
const original = await loadActual();
return {
...original,
acquireSessionWriteLock,
};
}