refactor: move session state migration to doctor

This commit is contained in:
Peter Steinberger
2026-05-07 22:00:59 +01:00
parent 3e7945e74d
commit 012a1e966b
44 changed files with 1746 additions and 1762 deletions

View File

@@ -2318,6 +2318,7 @@ async function mirrorTranscriptBestEffort(params: {
try {
await mirrorCodexAppServerTranscript({
sessionFile: params.params.sessionFile,
sessionId: params.params.sessionId,
agentId: params.agentId,
sessionKey: params.sessionKey,
messages: params.result.messagesSnapshot,

View File

@@ -13,7 +13,12 @@ import {
makeAgentAssistantMessage,
makeAgentUserMessage,
} from "openclaw/plugin-sdk/test-fixtures";
import { afterEach, describe, expect, it } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
loadSqliteSessionTranscriptEvents,
replaceSqliteSessionTranscriptEvents,
} from "../../../../src/config/sessions/transcript-store.sqlite.js";
import { closeOpenClawStateDatabaseForTest } from "../../../../src/state/openclaw-state-db.js";
import { attachCodexMirrorIdentity, mirrorCodexAppServerTranscript } from "./transcript-mirror.js";
type MirroredAgentMessage = Extract<AgentMessage, { role: "user" | "assistant" | "toolResult" }>;
@@ -29,6 +34,8 @@ const tempDirs: string[] = [];
afterEach(async () => {
resetGlobalHookRunner();
closeOpenClawStateDatabaseForTest();
vi.unstubAllEnvs();
for (const dir of tempDirs.splice(0)) {
await fs.rm(dir, { recursive: true, force: true });
}
@@ -37,23 +44,31 @@ afterEach(async () => {
async function createTempSessionFile() {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-codex-transcript-"));
tempDirs.push(dir);
vi.stubEnv("OPENCLAW_STATE_DIR", dir);
return path.join(dir, "session.jsonl");
}
async function makeRoot(prefix: string): Promise<string> {
const root = await fs.mkdtemp(path.join(os.tmpdir(), prefix));
tempDirs.push(root);
vi.stubEnv("OPENCLAW_STATE_DIR", root);
return root;
}
function parseJsonLines<T>(raw: string): T[] {
const records: T[] = [];
for (const line of raw.trim().split("\n")) {
if (line.length > 0) {
records.push(JSON.parse(line) as T);
}
}
return records;
function sessionIdFromFile(sessionFile: string): string {
return path.basename(sessionFile).replace(/\.jsonl$/i, "");
}
function readTranscriptEvents(sessionFile: string, sessionId = sessionIdFromFile(sessionFile)) {
return loadSqliteSessionTranscriptEvents({
agentId: "main",
sessionId,
}).map((entry) => entry.event);
}
function readTranscriptRaw(sessionFile: string, sessionId = sessionIdFromFile(sessionFile)) {
const lines = readTranscriptEvents(sessionFile, sessionId).map((event) => JSON.stringify(event));
return lines.length ? `${lines.join("\n")}\n` : "";
}
describe("mirrorCodexAppServerTranscript", () => {
@@ -88,7 +103,7 @@ describe("mirrorCodexAppServerTranscript", () => {
idempotencyScope: "scope-1",
});
const raw = await fs.readFile(sessionFile, "utf8");
const raw = readTranscriptRaw(sessionFile);
expect(raw).toContain('"role":"user"');
expect(raw).toContain('"content":[{"type":"text","text":"hello"}]');
expect(raw).toContain('"role":"assistant"');
@@ -121,7 +136,7 @@ describe("mirrorCodexAppServerTranscript", () => {
idempotencyScope: "scope-1",
});
const raw = await fs.readFile(sessionFile, "utf8");
const raw = readTranscriptRaw(sessionFile);
expect(raw).toContain('"role":"assistant"');
expect(raw).toContain('"content":[{"type":"text","text":"first mirror"}]');
});
@@ -152,9 +167,11 @@ describe("mirrorCodexAppServerTranscript", () => {
idempotencyScope: "scope-1",
});
const records = parseJsonLines<{ type?: string; message?: { role?: string } }>(
await fs.readFile(sessionFile, "utf8"),
);
const records = readTranscriptRaw(sessionFile)
.trim()
.split("\n")
.filter(Boolean)
.map((line) => JSON.parse(line) as { type?: string; message?: { role?: string } });
expect(records.slice(1)).toHaveLength(2);
});
@@ -185,7 +202,7 @@ describe("mirrorCodexAppServerTranscript", () => {
idempotencyScope: "scope-1",
});
const raw = await fs.readFile(sessionFile, "utf8");
const raw = readTranscriptRaw(sessionFile);
expect(raw).toContain('"content":[{"type":"text","text":"hello [hooked]"}]');
// The idempotency fingerprint is derived from the pre-hook message so a
// hook rewrite cannot bypass dedupe by reshaping content on every retry.
@@ -221,7 +238,7 @@ describe("mirrorCodexAppServerTranscript", () => {
idempotencyScope: "scope-1",
});
const raw = await fs.readFile(sessionFile, "utf8");
const raw = readTranscriptRaw(sessionFile);
expect(raw).toContain(
`"idempotencyKey":"scope-1:assistant:${expectedFingerprint(sourceMessage)}"`,
);
@@ -251,33 +268,36 @@ describe("mirrorCodexAppServerTranscript", () => {
idempotencyScope: "scope-1",
});
await expect(fs.readFile(sessionFile, "utf8")).rejects.toMatchObject({ code: "ENOENT" });
expect(readTranscriptRaw(sessionFile)).toBe("");
});
it("migrates small linear transcripts before mirroring", async () => {
const sessionFile = await createTempSessionFile();
await fs.writeFile(
sessionFile,
[
JSON.stringify({
replaceSqliteSessionTranscriptEvents({
agentId: "main",
sessionId: "linear-codex-session",
transcriptPath: sessionFile,
events: [
{
type: "session",
version: 3,
id: "linear-codex-session",
timestamp: new Date().toISOString(),
cwd: process.cwd(),
}),
JSON.stringify({
},
{
type: "message",
id: "legacy-user",
parentId: null,
timestamp: new Date().toISOString(),
message: { role: "user", content: "legacy user" },
}),
].join("\n") + "\n",
"utf8",
);
},
],
});
await mirrorCodexAppServerTranscript({
sessionFile,
sessionId: "linear-codex-session",
sessionKey: "session-1",
messages: [
makeAgentAssistantMessage({
@@ -288,7 +308,7 @@ describe("mirrorCodexAppServerTranscript", () => {
idempotencyScope: "scope-1",
});
const records = (await fs.readFile(sessionFile, "utf8"))
const records = readTranscriptRaw(sessionFile, "linear-codex-session")
.trim()
.split("\n")
.map(
@@ -373,9 +393,7 @@ describe("mirrorCodexAppServerTranscript", () => {
idempotencyScope: "codex-app-server:thread-X",
});
const messageTexts = readFileMessages(await fs.readFile(sessionFile, "utf8")).map(
(m) => m.text,
);
const messageTexts = readFileMessages(readTranscriptRaw(sessionFile)).map((m) => m.text);
expect(messageTexts).toEqual(["hello", "hi there", "[Codex reasoning] thinking"]);
});
@@ -427,7 +445,7 @@ describe("mirrorCodexAppServerTranscript", () => {
idempotencyScope: "codex-app-server:thread-X",
});
expect(readFileMessages(await fs.readFile(sessionFile, "utf8"))).toEqual([
expect(readFileMessages(readTranscriptRaw(sessionFile))).toEqual([
{ role: "user", text: "yes" },
{ role: "assistant", text: "ok 1" },
{ role: "user", text: "yes" },
@@ -487,7 +505,7 @@ describe("mirrorCodexAppServerTranscript", () => {
idempotencyScope: "codex-app-server:thread-X",
});
expect(readFileMessages(await fs.readFile(sessionFile, "utf8"))).toEqual([
expect(readFileMessages(readTranscriptRaw(sessionFile))).toEqual([
{ role: "user", text: "msg1" },
{ role: "assistant", text: "reply1" },
{ role: "user", text: "msg2" },
@@ -517,7 +535,7 @@ describe("mirrorCodexAppServerTranscript", () => {
idempotencyScope: "scope-1",
});
const raw = await fs.readFile(sessionFile, "utf8");
const raw = readTranscriptRaw(sessionFile);
expect(raw).toContain(`"idempotencyKey":"scope-1:user:${expectedFingerprint(userMessage)}"`);
expect(raw).toContain(
`"idempotencyKey":"scope-1:assistant:${expectedFingerprint(assistantMessage)}"`,

View File

@@ -1,15 +1,15 @@
import { createHash } from "node:crypto";
import fs from "node:fs/promises";
import path from "node:path";
import {
acquireSessionWriteLock,
appendSessionTranscriptMessage,
emitSessionTranscriptUpdate,
resolveSessionWriteLockAcquireTimeoutMs,
runAgentHarnessBeforeMessageWriteHook,
type AgentMessage,
type SessionWriteLockAcquireTimeoutConfig,
} from "openclaw/plugin-sdk/agent-harness-runtime";
const DEFAULT_AGENT_ID = "main";
type MirroredAgentMessage = Extract<AgentMessage, { role: "user" | "assistant" | "toolResult" }>;
const MIRROR_IDENTITY_META_KEY = "mirrorIdentity" as const;
@@ -67,6 +67,7 @@ function buildMirrorDedupeIdentity(message: MirroredAgentMessage): string {
export async function mirrorCodexAppServerTranscript(params: {
sessionFile: string;
sessionId?: string;
sessionKey?: string;
agentId?: string;
messages: AgentMessage[];
@@ -81,51 +82,46 @@ export async function mirrorCodexAppServerTranscript(params: {
return;
}
const lock = await acquireSessionWriteLock({
sessionFile: params.sessionFile,
timeoutMs: resolveSessionWriteLockAcquireTimeoutMs(params.config),
});
try {
const existingIdempotencyKeys = await readTranscriptIdempotencyKeys(params.sessionFile);
for (const message of messages) {
const dedupeIdentity = buildMirrorDedupeIdentity(message);
const idempotencyKey = params.idempotencyScope
? `${params.idempotencyScope}:${dedupeIdentity}`
: undefined;
if (idempotencyKey && existingIdempotencyKeys.has(idempotencyKey)) {
continue;
}
const transcriptMessage = {
...message,
...(idempotencyKey ? { idempotencyKey } : {}),
} as AgentMessage;
const nextMessage = runAgentHarnessBeforeMessageWriteHook({
message: transcriptMessage,
agentId: params.agentId,
sessionKey: params.sessionKey,
});
if (!nextMessage) {
continue;
}
const messageToAppend = (
idempotencyKey
? {
...(nextMessage as unknown as Record<string, unknown>),
idempotencyKey,
}
: nextMessage
) as AgentMessage;
await appendSessionTranscriptMessage({
transcriptPath: params.sessionFile,
message: messageToAppend,
config: params.config,
});
if (idempotencyKey) {
existingIdempotencyKeys.add(idempotencyKey);
}
const agentId = params.agentId?.trim() || DEFAULT_AGENT_ID;
const sessionId =
params.sessionId?.trim() ||
path
.basename(params.sessionFile)
.replace(/\.jsonl$/i, "")
.trim();
for (const message of messages) {
const dedupeIdentity = buildMirrorDedupeIdentity(message);
const idempotencyKey = params.idempotencyScope
? `${params.idempotencyScope}:${dedupeIdentity}`
: undefined;
const transcriptMessage = {
...message,
...(idempotencyKey ? { idempotencyKey } : {}),
} as AgentMessage;
const nextMessage = runAgentHarnessBeforeMessageWriteHook({
message: transcriptMessage,
agentId: params.agentId,
sessionKey: params.sessionKey,
});
if (!nextMessage) {
continue;
}
} finally {
await lock.release();
const messageToAppend = (
idempotencyKey
? {
...(nextMessage as unknown as Record<string, unknown>),
idempotencyKey,
}
: nextMessage
) as AgentMessage;
await appendSessionTranscriptMessage({
transcriptPath: params.sessionFile,
agentId,
sessionId,
message: messageToAppend,
config: params.config,
});
}
if (params.sessionKey) {
@@ -134,30 +130,3 @@ export async function mirrorCodexAppServerTranscript(params: {
emitSessionTranscriptUpdate(params.sessionFile);
}
}
async function readTranscriptIdempotencyKeys(sessionFile: string): Promise<Set<string>> {
const keys = new Set<string>();
let raw: string;
try {
raw = await fs.readFile(sessionFile, "utf8");
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
throw error;
}
return keys;
}
for (const line of raw.split(/\r?\n/)) {
if (!line.trim()) {
continue;
}
try {
const parsed = JSON.parse(line) as { message?: { idempotencyKey?: unknown } };
if (typeof parsed.message?.idempotencyKey === "string") {
keys.add(parsed.message.idempotencyKey);
}
} catch {
continue;
}
}
return keys;
}