diff --git a/src/gateway/session-compaction-checkpoints.test.ts b/src/gateway/session-compaction-checkpoints.test.ts index bf2755a1963..47f3c819058 100644 --- a/src/gateway/session-compaction-checkpoints.test.ts +++ b/src/gateway/session-compaction-checkpoints.test.ts @@ -5,6 +5,10 @@ import { afterEach, describe, expect, test, vi } from "vitest"; import type { AssistantMessage } from "../agents/pi-ai-contract.js"; import { SessionManager } from "../agents/transcript/session-transcript-contract.js"; import { getSessionEntry, upsertSessionEntry } from "../config/sessions.js"; +import { + createSqliteSessionTranscriptLocator, + isSqliteSessionTranscriptLocator, +} from "../config/sessions/paths.js"; import { exportSqliteSessionTranscriptJsonl, hasSqliteSessionTranscriptEvents, @@ -167,6 +171,56 @@ describe("session-compaction-checkpoints", () => { } }); + test("async capture keeps checkpoint transcript locators virtual for SQLite sources", async () => { + const sourceSessionId = "source-capture-virtual"; + const sourceFile = createSqliteSessionTranscriptLocator({ + agentId: DEFAULT_AGENT_ID, + sessionId: sourceSessionId, + }); + replaceSqliteSessionTranscriptEvents({ + agentId: DEFAULT_AGENT_ID, + sessionId: sourceSessionId, + transcriptPath: sourceFile, + events: [ + { + type: "session", + id: sourceSessionId, + timestamp: new Date(0).toISOString(), + cwd: "/tmp/openclaw-virtual-capture", + }, + { + type: "message", + id: "capture-leaf", + role: "user", + content: "virtual checkpoint source", + }, + ], + }); + + const snapshot = await captureCompactionCheckpointSnapshotAsync({ + sessionFile: sourceFile, + }); + + expect(snapshot).not.toBeNull(); + expect(snapshot?.leafId).toBe("capture-leaf"); + expect(snapshot?.sessionFile).toBeTruthy(); + expect(isSqliteSessionTranscriptLocator(snapshot?.sessionFile)).toBe(true); + expect(snapshot?.sessionFile).toContain("sqlite-transcript://"); + expect(snapshot?.sessionFile).not.toMatch(/^sqlite-transcript:\/[^/]/u); + expect( + hasSqliteSessionTranscriptSnapshot({ + agentId: DEFAULT_AGENT_ID, + sessionId: sourceSessionId, + snapshotId: snapshot!.sessionId, + }), + ).toBe(true); + expect(readSqliteTranscriptEvents(snapshot!.sessionId)[0]).toMatchObject({ + type: "session", + id: snapshot!.sessionId, + parentSession: sourceFile, + }); + }); + test("async capture skips oversized pre-compaction transcripts without sync copy", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-async-oversized-")); tempDirs.push(dir); @@ -245,6 +299,59 @@ describe("session-compaction-checkpoints", () => { ); }); + test("async fork keeps transcript locators virtual for SQLite sources", async () => { + const sourceSessionId = "source-fork-virtual"; + const sourceFile = createSqliteSessionTranscriptLocator({ + agentId: DEFAULT_AGENT_ID, + sessionId: sourceSessionId, + }); + replaceSqliteSessionTranscriptEvents({ + agentId: DEFAULT_AGENT_ID, + sessionId: sourceSessionId, + transcriptPath: sourceFile, + events: [ + { + type: "session", + id: sourceSessionId, + timestamp: new Date(0).toISOString(), + cwd: "/tmp/openclaw-virtual-fork", + }, + { + type: "message", + id: "fork-leaf", + role: "assistant", + content: "virtual fork source", + }, + ], + }); + + const forked = await forkCompactionCheckpointTranscriptAsync({ + sourceFile, + }); + + expect(forked).not.toBeNull(); + expect(forked?.sessionId).toBeTruthy(); + expect(isSqliteSessionTranscriptLocator(forked?.sessionFile)).toBe(true); + expect(forked?.sessionFile).toContain("sqlite-transcript://"); + expect(forked?.sessionFile).not.toMatch(/^sqlite-transcript:\/[^/]/u); + const forkedEntries = readSqliteTranscriptEvents(forked!.sessionId); + expect(forkedEntries[0]).toMatchObject({ + type: "session", + id: forked!.sessionId, + cwd: "/tmp/openclaw-virtual-fork", + parentSession: sourceFile, + }); + expect(forkedEntries[1]).toMatchObject({ + type: "message", + role: "assistant", + content: "virtual fork source", + }); + expect(readSqliteTranscriptEvents(sourceSessionId)[1]).toMatchObject({ + type: "message", + id: "fork-leaf", + }); + }); + test("async fork ignores legacy checkpoint files that doctor has not imported", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-legacy-fork-")); tempDirs.push(dir); diff --git a/src/gateway/session-compaction-checkpoints.ts b/src/gateway/session-compaction-checkpoints.ts index 56c4186cb9a..7f04ba81875 100644 --- a/src/gateway/session-compaction-checkpoints.ts +++ b/src/gateway/session-compaction-checkpoints.ts @@ -13,6 +13,10 @@ import type { SessionCompactionCheckpointReason, SessionEntry, } from "../config/sessions.js"; +import { + createSqliteSessionTranscriptLocator, + isSqliteSessionTranscriptLocator, +} from "../config/sessions/paths.js"; import { deleteSqliteSessionTranscript, deleteSqliteSessionTranscriptSnapshot, @@ -142,6 +146,13 @@ function createCheckpointVirtualTranscriptPath(params: { if (!sourceFile) { return undefined; } + if (isSqliteSessionTranscriptLocator(sourceFile)) { + const scope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath: sourceFile }); + return createSqliteSessionTranscriptLocator({ + agentId: scope?.agentId ?? DEFAULT_AGENT_ID, + sessionId: params.checkpointId, + }); + } const parsed = path.parse(sourceFile); return path.join( parsed.dir, @@ -183,11 +194,21 @@ export async function forkCompactionCheckpointTranscriptAsync(params: { migrateSessionEntries(entries); const targetCwd = params.targetCwd ?? sourceHeader.cwd ?? process.cwd(); - const sessionDir = params.sessionDir ?? (sourceFile ? path.dirname(sourceFile) : process.cwd()); const sessionId = randomUUID(); const timestamp = new Date().toISOString(); - const fileTimestamp = timestamp.replace(/[:.]/g, "-"); - const sessionFile = path.join(sessionDir, `${fileTimestamp}_${sessionId}.jsonl`); + const sourceScope = sourceFile + ? resolveSqliteSessionTranscriptScopeForPath({ transcriptPath: sourceFile }) + : undefined; + const agentId = params.agentId?.trim() || sourceScope?.agentId || DEFAULT_AGENT_ID; + const sessionFile = + sourceFile && isSqliteSessionTranscriptLocator(sourceFile) + ? createSqliteSessionTranscriptLocator({ agentId, sessionId }) + : (() => { + const sessionDir = + params.sessionDir ?? (sourceFile ? path.dirname(sourceFile) : process.cwd()); + const fileTimestamp = timestamp.replace(/[:.]/g, "-"); + return path.join(sessionDir, `${fileTimestamp}_${sessionId}.jsonl`); + })(); const header = { type: "session", version: CURRENT_SESSION_VERSION, @@ -199,7 +220,7 @@ export async function forkCompactionCheckpointTranscriptAsync(params: { try { replaceSqliteSessionTranscriptEvents({ - agentId: params.agentId?.trim() || DEFAULT_AGENT_ID, + agentId, sessionId, transcriptPath: sessionFile, events: [