From 442d8065ca0d7affd468a3d5fb0dba213d2408af Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 9 May 2026 07:34:20 +0100 Subject: [PATCH] refactor: keep compaction transcript handles out of session rows --- .../bot-native-commands.session-meta.test.ts | 21 ++-- .../telegram/src/bot-native-commands.ts | 12 +- .../reply/agent-runner-memory.test.ts | 113 +++++++++--------- src/auto-reply/reply/agent-runner-memory.ts | 69 ++++++----- .../reply/agent-runner-session-reset.test.ts | 12 +- .../reply/agent-runner-session-reset.ts | 14 +-- .../reply/session-updates.lifecycle.test.ts | 25 ++-- src/auto-reply/reply/session-updates.ts | 14 ++- src/gateway/session-transcript-paths.ts | 25 ++-- src/plugin-sdk/session-store-runtime.ts | 2 +- 10 files changed, 159 insertions(+), 148 deletions(-) diff --git a/extensions/telegram/src/bot-native-commands.session-meta.test.ts b/extensions/telegram/src/bot-native-commands.session-meta.test.ts index 76072bb355d..dd222f1fd79 100644 --- a/extensions/telegram/src/bot-native-commands.session-meta.test.ts +++ b/extensions/telegram/src/bot-native-commands.session-meta.test.ts @@ -53,7 +53,7 @@ const sessionMocks = vi.hoisted(() => { Object.entries(sessionStore.value).map(([sessionKey, entry]) => ({ sessionKey, entry })), ), recordSessionMetaFromInbound: vi.fn(), - resolveAndPersistSessionTranscriptLocator: vi.fn(), + resolveAndPersistSessionTranscriptIdentity: vi.fn(), sessionStore, }; }); @@ -160,8 +160,8 @@ vi.mock("openclaw/plugin-sdk/session-store-runtime", async () => { ...actual, getSessionEntry: sessionMocks.getSessionEntry, listSessionEntries: sessionMocks.listSessionEntries, - resolveAndPersistSessionTranscriptLocator: - sessionMocks.resolveAndPersistSessionTranscriptLocator, + resolveAndPersistSessionTranscriptIdentity: + sessionMocks.resolveAndPersistSessionTranscriptIdentity, }; }); vi.mock("openclaw/plugin-sdk/command-auth-native", async () => { @@ -484,17 +484,16 @@ describe("registerTelegramNativeCommands — session metadata", () => { })), ); sessionMocks.recordSessionMetaFromInbound.mockClear().mockResolvedValue(undefined); - sessionMocks.resolveAndPersistSessionTranscriptLocator + sessionMocks.resolveAndPersistSessionTranscriptIdentity .mockClear() .mockImplementation(async (params) => { - const transcriptLocator = - params.fallbackTranscriptLocator ?? `/tmp/openclaw-sessions/${params.sessionId}.jsonl`; + const topicSuffix = params.topicId === undefined ? "" : `?topic=${params.topicId}`; + const transcriptLocator = `sqlite-transcript://${params.agentId ?? "main"}/${params.sessionId}${topicSuffix}`; return { transcriptLocator, sessionEntry: { ...params.sessionEntry, sessionId: params.sessionId, - sessionFile: transcriptLocator, updatedAt: Date.now(), }, }; @@ -1163,7 +1162,7 @@ describe("registerTelegramNativeCommands — session metadata", () => { expectUnauthorizedNewCommandBlocked(sendMessage); }); - it("passes a persisted topic session file to plugin commands", async () => { + it("passes a persisted topic transcript locator to plugin commands", async () => { sessionMocks.sessionStore.value = { "agent:main:telegram:group:-1001234567890:topic:42": { sessionId: "sess-topic", @@ -1200,18 +1199,18 @@ describe("registerTelegramNativeCommands — session metadata", () => { createTelegramTopicCommandContext({ match: "bind --cwd /tmp/work", threadId: 42 }), ); - expect(sessionMocks.resolveAndPersistSessionTranscriptLocator).toHaveBeenCalledWith( + expect(sessionMocks.resolveAndPersistSessionTranscriptIdentity).toHaveBeenCalledWith( expect.objectContaining({ sessionId: "sess-topic", sessionKey: "agent:main:telegram:group:-1001234567890:topic:42", - fallbackTranscriptLocator: "sqlite-transcript://main/sess-topic-topic-42.jsonl", + topicId: 42, }), ); expect(pluginRuntimeMocks.executePluginCommand).toHaveBeenCalledWith( expect.objectContaining({ sessionKey: "agent:main:telegram:group:-1001234567890:topic:42", sessionId: "sess-topic", - sessionFile: "sqlite-transcript://main/sess-topic-topic-42.jsonl", + transcriptLocator: "sqlite-transcript://main/sess-topic?topic=42", messageThreadId: 42, }), ); diff --git a/extensions/telegram/src/bot-native-commands.ts b/extensions/telegram/src/bot-native-commands.ts index d41c1d36bc5..00b8085dc3b 100644 --- a/extensions/telegram/src/bot-native-commands.ts +++ b/extensions/telegram/src/bot-native-commands.ts @@ -36,10 +36,9 @@ import { danger, logVerbose } from "openclaw/plugin-sdk/runtime-env"; import { getChildLogger } from "openclaw/plugin-sdk/runtime-env"; import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; import { - createSqliteSessionTranscriptLocator, getSessionEntry, listSessionEntries, - resolveAndPersistSessionTranscriptLocator, + resolveAndPersistSessionTranscriptIdentity, resolveSessionRowEntry, } from "openclaw/plugin-sdk/session-store-runtime"; import { @@ -183,17 +182,12 @@ async function resolveTelegramCommandTranscriptLocator(params: { sessionKey, }); const sessionId = resolved.existing?.sessionId?.trim() || randomUUID(); - const fallbackTranscriptLocator = createSqliteSessionTranscriptLocator({ - sessionId, - agentId: params.agentId, - topicId: params.threadId, - }); - const persisted = await resolveAndPersistSessionTranscriptLocator({ + const persisted = await resolveAndPersistSessionTranscriptIdentity({ sessionId, sessionKey: resolved.normalizedKey, sessionEntry: resolved.existing, agentId: params.agentId, - fallbackTranscriptLocator, + topicId: params.threadId, }); return { sessionId, sessionFile: persisted.transcriptLocator }; } catch { diff --git a/src/auto-reply/reply/agent-runner-memory.test.ts b/src/auto-reply/reply/agent-runner-memory.test.ts index 85689849e7d..6b4acc92781 100644 --- a/src/auto-reply/reply/agent-runner-memory.test.ts +++ b/src/auto-reply/reply/agent-runner-memory.test.ts @@ -88,10 +88,10 @@ describe("runMemoryFlushIfNeeded", () => { }; if (typeof params.newSessionId === "string" && params.newSessionId) { nextEntry.sessionId = params.newSessionId; - if (typeof params.newSessionFile === "string" && params.newSessionFile) { - nextEntry.sessionFile = params.newSessionFile; + if (typeof params.newTranscriptLocator === "string" && params.newTranscriptLocator) { + nextEntry.transcriptLocator = params.newTranscriptLocator; } else { - nextEntry.sessionFile = resolveMainTranscriptPath(rootDir, params.newSessionId); + nextEntry.transcriptLocator = resolveMainTranscriptPath(rootDir, params.newSessionId); } } params.sessionStore[sessionKey] = nextEntry; @@ -187,11 +187,12 @@ describe("runMemoryFlushIfNeeded", () => { key: sessionKey, previousSessionId: "session", nextSessionId: "session-rotated", - nextSessionFile: expect.stringContaining("session-rotated.jsonl"), + nextTranscriptLocator: "sqlite-transcript://main/session-rotated", }); const persisted = readTestSessionRow(sessionKey); expect(persisted?.sessionId).toBe("session-rotated"); + expect(persisted?.transcriptLocator).toBeUndefined(); expect(persisted?.compactionCount).toBe(2); expect(persisted?.memoryFlushCompactionCount).toBe(1); expect(persisted?.memoryFlushAt).toBe(1_700_000_000_000); @@ -329,11 +330,11 @@ describe("runMemoryFlushIfNeeded", () => { }); it("passes runtime policy session key to preflight compaction sandbox resolution", async () => { - const sessionFile = resolveMainTranscriptPath(rootDir, "session"); + const transcriptLocator = resolveMainTranscriptPath(rootDir, "session"); appendSqliteSessionTranscriptEvent({ agentId: "main", sessionId: "session", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, event: { type: "message", id: "m1", @@ -350,7 +351,7 @@ describe("runMemoryFlushIfNeeded", () => { })); const sessionEntry: SessionEntry = { sessionId: "session", - sessionFile, + transcriptLocator, updatedAt: Date.now(), totalTokensFresh: false, }; @@ -359,7 +360,7 @@ describe("runMemoryFlushIfNeeded", () => { cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, followupRun: createTestFollowupRun({ sessionId: "session", - sessionFile, + transcriptLocator, sessionKey: "agent:main:main", runtimePolicySessionKey: "agent:main:telegram:default:direct:12345", }), @@ -382,12 +383,12 @@ describe("runMemoryFlushIfNeeded", () => { }); it("updates the active preflight run after transcript rotation", async () => { - const sessionFile = resolveMainTranscriptPath(rootDir, "session"); + const transcriptLocator = resolveMainTranscriptPath(rootDir, "session"); const successorFile = resolveMainTranscriptPath(rootDir, "session-rotated"); appendSqliteSessionTranscriptEvent({ agentId: "main", sessionId: "session", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, event: { type: "message", id: "m1", @@ -408,19 +409,19 @@ describe("runMemoryFlushIfNeeded", () => { result: { tokensAfter: 42, sessionId: "session-rotated", - sessionFile: successorFile, + transcriptLocator: successorFile, }, }); const sessionEntry: SessionEntry = { sessionId: "session", - sessionFile, + transcriptLocator, updatedAt: Date.now(), totalTokensFresh: false, }; const sessionStore = { "agent:main:main": sessionEntry }; const followupRun = createTestFollowupRun({ sessionId: "session", - sessionFile, + transcriptLocator, sessionKey: "agent:main:main", }); const updateSessionId = vi.fn(); @@ -443,24 +444,24 @@ describe("runMemoryFlushIfNeeded", () => { }); expect(entry?.sessionId).toBe("session-rotated"); - expect(entry?.sessionFile).toBe(successorFile); + expect(entry?.transcriptLocator).toBeUndefined(); expect(followupRun.run.sessionId).toBe("session-rotated"); - expect(followupRun.run.sessionFile).toBe(successorFile); + expect(followupRun.run.transcriptLocator).toBe(successorFile); expect(updateSessionId).toHaveBeenCalledWith("session-rotated"); expect(refreshQueuedFollowupSessionMock).toHaveBeenCalledWith({ key: "agent:main:main", previousSessionId: "session", nextSessionId: "session-rotated", - nextSessionFile: successorFile, + nextTranscriptLocator: successorFile, }); }); it("includes recent output tokens when deciding preflight compaction", async () => { - const sessionFile = resolveMainTranscriptPath(rootDir, "session"); + const transcriptLocator = resolveMainTranscriptPath(rootDir, "session"); appendSqliteSessionTranscriptEvent({ agentId: "main", sessionId: "session", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, event: { type: "message", id: "m1", @@ -481,7 +482,7 @@ describe("runMemoryFlushIfNeeded", () => { })); const sessionEntry: SessionEntry = { sessionId: "session", - sessionFile, + transcriptLocator, updatedAt: Date.now(), totalTokensFresh: false, }; @@ -490,7 +491,7 @@ describe("runMemoryFlushIfNeeded", () => { cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, followupRun: createTestFollowupRun({ sessionId: "session", - sessionFile, + transcriptLocator, sessionKey: "main", }), defaultModel: "anthropic/claude-opus-4-6", @@ -508,12 +509,12 @@ describe("runMemoryFlushIfNeeded", () => { expect(compactCall.currentTokenCount).toBeGreaterThanOrEqual(100_000); }); - it("uses the active run sessionFile when the session entry has no transcript path", async () => { - const sessionFile = resolveMainTranscriptPath(rootDir, "session"); + it("uses the active run transcriptLocator when the session entry has no transcript path", async () => { + const transcriptLocator = resolveMainTranscriptPath(rootDir, "session"); appendSqliteSessionTranscriptEvent({ agentId: "main", sessionId: "session", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, event: { type: "message", id: "m1", @@ -542,7 +543,7 @@ describe("runMemoryFlushIfNeeded", () => { cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, followupRun: createTestFollowupRun({ sessionId: "session", - sessionFile, + transcriptLocator, sessionKey: "main", }), defaultModel: "anthropic/claude-opus-4-6", @@ -557,17 +558,17 @@ describe("runMemoryFlushIfNeeded", () => { expect(compactEmbeddedPiSessionMock).toHaveBeenCalledWith( expect.objectContaining({ sessionId: "session", - sessionFile, + transcriptLocator, }), ); }); it("keeps preflight compaction conservative for content appended after latest usage", async () => { - const sessionFile = resolveMainTranscriptPath(rootDir, "session"); + const transcriptLocator = resolveMainTranscriptPath(rootDir, "session"); appendSqliteSessionTranscriptEvent({ agentId: "main", sessionId: "session", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, event: { type: "message", id: "m1", @@ -581,7 +582,7 @@ describe("runMemoryFlushIfNeeded", () => { appendSqliteSessionTranscriptEvent({ agentId: "main", sessionId: "session", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, event: { type: "message", id: "m2", @@ -601,7 +602,7 @@ describe("runMemoryFlushIfNeeded", () => { })); const sessionEntry: SessionEntry = { sessionId: "session", - sessionFile, + transcriptLocator, updatedAt: Date.now(), totalTokensFresh: false, }; @@ -610,7 +611,7 @@ describe("runMemoryFlushIfNeeded", () => { cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, followupRun: createTestFollowupRun({ sessionId: "session", - sessionFile, + transcriptLocator, sessionKey: "main", }), defaultModel: "anthropic/claude-opus-4-6", @@ -629,11 +630,11 @@ describe("runMemoryFlushIfNeeded", () => { }); it("combines latest usage with post-usage tail pressure for preflight compaction", async () => { - const sessionFile = resolveMainTranscriptPath(rootDir, "session"); + const transcriptLocator = resolveMainTranscriptPath(rootDir, "session"); appendSqliteSessionTranscriptEvent({ agentId: "main", sessionId: "session", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, event: { type: "message", id: "m1", @@ -647,7 +648,7 @@ describe("runMemoryFlushIfNeeded", () => { appendSqliteSessionTranscriptEvent({ agentId: "main", sessionId: "session", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, event: { type: "message", id: "m2", @@ -667,7 +668,7 @@ describe("runMemoryFlushIfNeeded", () => { })); const sessionEntry: SessionEntry = { sessionId: "session", - sessionFile, + transcriptLocator, updatedAt: Date.now(), totalTokensFresh: false, }; @@ -676,7 +677,7 @@ describe("runMemoryFlushIfNeeded", () => { cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, followupRun: createTestFollowupRun({ sessionId: "session", - sessionFile, + transcriptLocator, sessionKey: "main", }), defaultModel: "anthropic/claude-opus-4-6", @@ -695,11 +696,11 @@ describe("runMemoryFlushIfNeeded", () => { }); it("does not count bytes from a large latest usage record as post-usage tail pressure", async () => { - const sessionFile = resolveMainTranscriptPath(rootDir, "session"); + const transcriptLocator = resolveMainTranscriptPath(rootDir, "session"); appendSqliteSessionTranscriptEvent({ agentId: "main", sessionId: "session", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, event: { type: "session", id: "session", @@ -708,7 +709,7 @@ describe("runMemoryFlushIfNeeded", () => { appendSqliteSessionTranscriptEvent({ agentId: "main", sessionId: "session", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, event: { type: "message", id: "m1", @@ -729,7 +730,7 @@ describe("runMemoryFlushIfNeeded", () => { })); const sessionEntry: SessionEntry = { sessionId: "session", - sessionFile, + transcriptLocator, updatedAt: Date.now(), totalTokensFresh: false, }; @@ -738,7 +739,7 @@ describe("runMemoryFlushIfNeeded", () => { cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } }, followupRun: createTestFollowupRun({ sessionId: "session", - sessionFile, + transcriptLocator, sessionKey: "main", }), defaultModel: "anthropic/claude-opus-4-6", @@ -755,11 +756,11 @@ describe("runMemoryFlushIfNeeded", () => { }); it("does not treat raw transcript metadata bytes as token pressure", async () => { - const sessionFile = resolveMainTranscriptPath(rootDir, "session"); + const transcriptLocator = resolveMainTranscriptPath(rootDir, "session"); appendSqliteSessionTranscriptEvent({ agentId: "main", sessionId: "session", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, event: { type: "session", id: "session", @@ -768,7 +769,7 @@ describe("runMemoryFlushIfNeeded", () => { appendSqliteSessionTranscriptEvent({ agentId: "main", sessionId: "session", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, event: { type: "custom", payload: "x".repeat(450_000), @@ -777,7 +778,7 @@ describe("runMemoryFlushIfNeeded", () => { appendSqliteSessionTranscriptEvent({ agentId: "main", sessionId: "session", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, event: { type: "message", id: "m1", @@ -798,7 +799,7 @@ describe("runMemoryFlushIfNeeded", () => { })); const sessionEntry: SessionEntry = { sessionId: "session", - sessionFile, + transcriptLocator, updatedAt: Date.now(), totalTokensFresh: false, }; @@ -817,7 +818,7 @@ describe("runMemoryFlushIfNeeded", () => { }, followupRun: createTestFollowupRun({ sessionId: "session", - sessionFile, + transcriptLocator, sessionKey: "main", }), defaultModel: "anthropic/claude-opus-4-6", @@ -834,11 +835,11 @@ describe("runMemoryFlushIfNeeded", () => { }); it("triggers preflight compaction when the active transcript exceeds the configured byte threshold", async () => { - const sessionFile = resolveMainTranscriptPath(rootDir, "session"); + const transcriptLocator = resolveMainTranscriptPath(rootDir, "session"); appendSqliteSessionTranscriptEvent({ agentId: "main", sessionId: "session", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, event: { type: "message", id: "m1", @@ -847,7 +848,7 @@ describe("runMemoryFlushIfNeeded", () => { }); const sessionEntry: SessionEntry = { sessionId: "session", - sessionFile, + transcriptLocator, updatedAt: Date.now(), totalTokens: 10, totalTokensFresh: true, @@ -873,7 +874,7 @@ describe("runMemoryFlushIfNeeded", () => { }, followupRun: createTestFollowupRun({ sessionId: "session", - sessionFile, + transcriptLocator, sessionKey: "main", }), defaultModel: "anthropic/claude-opus-4-6", @@ -889,7 +890,7 @@ describe("runMemoryFlushIfNeeded", () => { expect(replyOperation.setPhase).toHaveBeenCalledWith("preflight_compacting"); const compactCall = compactEmbeddedPiSessionMock.mock.calls[0]?.[0] as { currentTokenCount?: number; - sessionFile?: string; + transcriptLocator?: string; sessionId?: string; trigger?: string; }; @@ -900,15 +901,15 @@ describe("runMemoryFlushIfNeeded", () => { currentTokenCount: 10, }), ); - expect(compactCall.sessionFile).toBe(sessionFile); + expect(compactCall.transcriptLocator).toBe(transcriptLocator); }); it("keeps the active transcript byte threshold inactive unless transcript rotation is enabled", async () => { - const sessionFile = resolveMainTranscriptPath(rootDir, "session"); + const transcriptLocator = resolveMainTranscriptPath(rootDir, "session"); appendSqliteSessionTranscriptEvent({ agentId: "main", sessionId: "session", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, event: { type: "message", id: "m1", @@ -917,7 +918,7 @@ describe("runMemoryFlushIfNeeded", () => { }); const sessionEntry: SessionEntry = { sessionId: "session", - sessionFile, + transcriptLocator, updatedAt: Date.now(), totalTokens: 10, totalTokensFresh: true, @@ -936,7 +937,7 @@ describe("runMemoryFlushIfNeeded", () => { }, followupRun: createTestFollowupRun({ sessionId: "session", - sessionFile, + transcriptLocator, sessionKey: "main", }), defaultModel: "anthropic/claude-opus-4-6", diff --git a/src/auto-reply/reply/agent-runner-memory.ts b/src/auto-reply/reply/agent-runner-memory.ts index c96d705541a..748894d5ba4 100644 --- a/src/auto-reply/reply/agent-runner-memory.ts +++ b/src/auto-reply/reply/agent-runner-memory.ts @@ -214,9 +214,10 @@ function resolveSessionLogPath( const transcriptPath = normalizeOptionalString( (sessionEntry as (SessionEntry & { transcriptPath?: string }) | undefined)?.transcriptPath, ); - const sessionFile = normalizeOptionalString(sessionEntry?.sessionFile) || transcriptPath; + const transcriptLocator = + normalizeOptionalString(sessionEntry?.transcriptLocator) || transcriptPath; const agentId = resolveAgentIdFromSessionKey(sessionKey); - void sessionFile; + void transcriptLocator; return createSqliteSessionTranscriptLocator({ agentId, sessionId }); } catch { return undefined; @@ -239,7 +240,7 @@ function resolveSqliteSessionTranscriptPath(params: { if (candidates.length === 0) { return undefined; } - const explicit = normalizeOptionalString(params.sessionEntry?.sessionFile); + const explicit = normalizeOptionalString(params.sessionEntry?.transcriptLocator); if (explicit) { const matched = candidates.find((entry) => entry.locator === explicit); if (matched) { @@ -313,7 +314,7 @@ async function readSessionLogSnapshot(params: { const scope = resolveSqliteSessionTranscriptScope({ agentId: params.sessionKey ? resolveAgentIdFromSessionKey(params.sessionKey) : undefined, sessionId, - transcriptPath: params.sessionEntry?.sessionFile, + transcriptPath: params.sessionEntry?.transcriptLocator, }); if (!scope) { return snapshot; @@ -363,17 +364,17 @@ async function estimatePromptTokensFromSessionTranscript(params: { sessionId?: string; sessionEntry?: SessionEntry; sessionKey?: string; - sessionFile?: string; + transcriptLocator?: string; }): Promise { const sessionId = normalizeOptionalString(params.sessionId); if (!sessionId) { return undefined; } - const fallbackTranscriptLocator = normalizeOptionalString(params.sessionFile); + const fallbackTranscriptLocator = normalizeOptionalString(params.transcriptLocator); const sessionEntryForTranscript = - params.sessionEntry?.sessionFile || !fallbackTranscriptLocator + params.sessionEntry?.transcriptLocator || !fallbackTranscriptLocator ? params.sessionEntry - : ({ ...params.sessionEntry, sessionFile: fallbackTranscriptLocator } as SessionEntry); + : ({ ...params.sessionEntry, transcriptLocator: fallbackTranscriptLocator } as SessionEntry); try { const snapshot = await readSessionLogSnapshot({ sessionId, @@ -402,7 +403,7 @@ async function estimatePromptTokensFromSessionTranscript(params: { } const messages = (await readSessionMessagesAsync( sessionId, - sessionEntryForTranscript?.sessionFile, + sessionEntryForTranscript?.transcriptLocator, { agentId: resolveAgentIdFromSessionKey(params.sessionKey), mode: "recent", @@ -479,9 +480,9 @@ export async function runPreflightCompactionIfNeeded(params: { ? await readSessionLogSnapshot({ sessionId: entry.sessionId, sessionEntry: - entry.sessionFile || !params.followupRun.run.sessionFile + entry.transcriptLocator || !params.followupRun.run.transcriptLocator ? entry - : { ...entry, sessionFile: params.followupRun.run.sessionFile }, + : { ...entry, transcriptLocator: params.followupRun.run.transcriptLocator }, sessionKey: params.sessionKey ?? params.followupRun.run.sessionKey, includeByteSize: true, includeUsage: false, @@ -506,7 +507,7 @@ export async function runPreflightCompactionIfNeeded(params: { sessionId: entry.sessionId, sessionEntry: entry, sessionKey: params.sessionKey ?? params.followupRun.run.sessionKey, - sessionFile: entry.sessionFile ?? params.followupRun.run.sessionFile, + transcriptLocator: entry.transcriptLocator ?? params.followupRun.run.transcriptLocator, }); const stalePersistedPromptTokens = hasPersistedTotalTokens ? Math.floor(persistedTotalTokens) @@ -572,18 +573,20 @@ export async function runPreflightCompactionIfNeeded(params: { ); params.replyOperation.setPhase("preflight_compacting"); - const sessionFile = + const transcriptLocator = resolveSqliteSessionTranscriptPath({ sessionId: entry.sessionId, sessionEntry: - entry.sessionFile || !params.followupRun.run.sessionFile + entry.transcriptLocator || !params.followupRun.run.transcriptLocator ? entry - : { ...entry, sessionFile: params.followupRun.run.sessionFile }, + : { ...entry, transcriptLocator: params.followupRun.run.transcriptLocator }, sessionKey: params.sessionKey ?? params.followupRun.run.sessionKey, }) ?? resolveSessionLogPath( entry.sessionId, - entry.sessionFile ? entry : { ...entry, sessionFile: params.followupRun.run.sessionFile }, + entry.transcriptLocator + ? entry + : { ...entry, transcriptLocator: params.followupRun.run.transcriptLocator }, params.sessionKey ?? params.followupRun.run.sessionKey, ); const result = await memoryDeps.compactEmbeddedPiSession({ @@ -599,7 +602,7 @@ export async function runPreflightCompactionIfNeeded(params: { senderName: params.followupRun.run.senderName, senderUsername: params.followupRun.run.senderUsername, senderE164: params.followupRun.run.senderE164, - sessionFile: sessionFile ?? params.followupRun.run.sessionFile, + transcriptLocator: transcriptLocator ?? params.followupRun.run.transcriptLocator, workspaceDir: params.followupRun.run.workspaceDir, agentDir: params.followupRun.run.agentDir, config: params.cfg, @@ -631,7 +634,7 @@ export async function runPreflightCompactionIfNeeded(params: { sessionKey: params.sessionKey, tokensAfter: result.result?.tokensAfter, newSessionId: result.result?.sessionId, - newSessionFile: result.result?.sessionFile, + newTranscriptLocator: result.result?.transcriptLocator, }); await appendPostCompactionRefreshPrompt({ cfg: params.cfg, @@ -642,8 +645,8 @@ export async function runPreflightCompactionIfNeeded(params: { const previousSessionId = params.followupRun.run.sessionId; params.followupRun.run.sessionId = entry.sessionId; params.replyOperation.updateSessionId(entry.sessionId); - if (entry.sessionFile) { - params.followupRun.run.sessionFile = entry.sessionFile; + if (result.result?.transcriptLocator) { + params.followupRun.run.transcriptLocator = result.result.transcriptLocator; } const queueKey = params.followupRun.run.sessionKey ?? params.sessionKey; if (queueKey) { @@ -651,7 +654,7 @@ export async function runPreflightCompactionIfNeeded(params: { key: queueKey, previousSessionId, nextSessionId: entry.sessionId, - nextSessionFile: entry.sessionFile, + nextTranscriptLocator: result.result?.transcriptLocator, }); } } @@ -891,7 +894,7 @@ export async function runMemoryFlushIfNeeded(params: { .filter(Boolean) .join("\n\n"); let postCompactionSessionId: string | undefined; - let postCompactionSessionFile: string | undefined; + let postCompactionTranscriptLocator: string | undefined; try { await memoryDeps.runWithModelFallback({ ...resolveMemoryFlushModelFallbackOptions( @@ -940,9 +943,15 @@ export async function runMemoryFlushIfNeeded(params: { }); if (result.meta?.agentMeta?.sessionId) { postCompactionSessionId = result.meta.agentMeta.sessionId; + postCompactionTranscriptLocator = + result.meta.agentMeta.transcriptLocator ?? + createSqliteSessionTranscriptLocator({ + agentId: params.followupRun.run.agentId, + sessionId: result.meta.agentMeta.sessionId, + }); } - if (result.meta?.agentMeta?.sessionFile) { - postCompactionSessionFile = result.meta.agentMeta.sessionFile; + if (result.meta?.agentMeta?.transcriptLocator) { + postCompactionTranscriptLocator = result.meta.agentMeta.transcriptLocator; } bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( result.meta?.systemPromptReport, @@ -962,15 +971,15 @@ export async function runMemoryFlushIfNeeded(params: { sessionStore: activeSessionStore, sessionKey: params.sessionKey, newSessionId: postCompactionSessionId, - newSessionFile: postCompactionSessionFile, + newTranscriptLocator: postCompactionTranscriptLocator, }); const updatedEntry = params.sessionKey ? activeSessionStore?.[params.sessionKey] : undefined; if (updatedEntry) { activeSessionEntry = updatedEntry; params.followupRun.run.sessionId = updatedEntry.sessionId; params.replyOperation.updateSessionId(updatedEntry.sessionId); - if (updatedEntry.sessionFile) { - params.followupRun.run.sessionFile = updatedEntry.sessionFile; + if (postCompactionTranscriptLocator) { + params.followupRun.run.transcriptLocator = postCompactionTranscriptLocator; } const queueKey = params.followupRun.run.sessionKey ?? params.sessionKey; if (queueKey) { @@ -978,7 +987,7 @@ export async function runMemoryFlushIfNeeded(params: { key: queueKey, previousSessionId, nextSessionId: updatedEntry.sessionId, - nextSessionFile: updatedEntry.sessionFile, + nextTranscriptLocator: postCompactionTranscriptLocator, }); } } @@ -998,8 +1007,8 @@ export async function runMemoryFlushIfNeeded(params: { activeSessionEntry = updatedEntry; params.followupRun.run.sessionId = updatedEntry.sessionId; params.replyOperation.updateSessionId(updatedEntry.sessionId); - if (updatedEntry.sessionFile) { - params.followupRun.run.sessionFile = updatedEntry.sessionFile; + if (postCompactionTranscriptLocator) { + params.followupRun.run.transcriptLocator = postCompactionTranscriptLocator; } } } catch (err) { diff --git a/src/auto-reply/reply/agent-runner-session-reset.test.ts b/src/auto-reply/reply/agent-runner-session-reset.test.ts index 9d5c0d53a93..5c56c1a49b7 100644 --- a/src/auto-reply/reply/agent-runner-session-reset.test.ts +++ b/src/auto-reply/reply/agent-runner-session-reset.test.ts @@ -53,7 +53,7 @@ describe("resetReplyRunSession", () => { const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: 1, - sessionFile: path.join(transcriptDir, "session.jsonl"), + transcriptLocator: path.join(transcriptDir, "session.jsonl"), modelProvider: "qwencode", model: "qwen", contextTokens: 123, @@ -108,12 +108,13 @@ describe("resetReplyRunSession", () => { key: "main", previousSessionId: "session", nextSessionId: activeSessionEntry?.sessionId, - nextSessionFile: activeSessionEntry?.sessionFile, + nextTranscriptLocator: followupRun.run.transcriptLocator, }); expect(errorMock).toHaveBeenCalledWith("reset 00000000-0000-0000-0000-000000000123"); const persisted = readTestSessionRow("main"); expect(persisted?.sessionId).toBe(activeSessionEntry?.sessionId); + expect(persisted?.transcriptLocator).toBeUndefined(); expect(persisted?.fallbackNoticeReason).toBeUndefined(); }); @@ -122,7 +123,7 @@ describe("resetReplyRunSession", () => { const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: 1, - sessionFile: path.join(transcriptDir, "session.jsonl"), + transcriptLocator: path.join(transcriptDir, "session.jsonl"), totalTokens: 42, compactionCount: 1, }; @@ -148,7 +149,10 @@ describe("resetReplyRunSession", () => { expect(activeSessionEntry?.sessionId).toBe("00000000-0000-0000-0000-000000000123"); expect(activeSessionEntry?.totalTokens).toBeUndefined(); expect(activeSessionEntry?.compactionCount).toBe(1); + expect(activeSessionEntry?.transcriptLocator).toBeUndefined(); expect(followupRun.run.sessionId).toBe(activeSessionEntry?.sessionId); - expect(readTestSessionRow("main")?.sessionId).toBe(activeSessionEntry?.sessionId); + const persisted = readTestSessionRow("main"); + expect(persisted?.sessionId).toBe(activeSessionEntry?.sessionId); + expect(persisted?.transcriptLocator).toBeUndefined(); }); }); diff --git a/src/auto-reply/reply/agent-runner-session-reset.ts b/src/auto-reply/reply/agent-runner-session-reset.ts index eaab3f50af3..753fdba607b 100644 --- a/src/auto-reply/reply/agent-runner-session-reset.ts +++ b/src/auto-reply/reply/agent-runner-session-reset.ts @@ -44,7 +44,7 @@ export async function resetReplyRunSession(params: { messageThreadId?: string; followupRun: FollowupRun; onActiveSessionEntry: (entry: SessionEntry) => void; - onNewSession: (newSessionId: string, nextSessionFile: string) => void; + onNewSession: (newSessionId: string, nextTranscriptLocator: string) => void; }): Promise { if (!params.sessionKey) { return false; @@ -86,11 +86,10 @@ export async function resetReplyRunSession(params: { fallbackNoticeActiveModel: undefined, fallbackNoticeReason: undefined, }; - const nextSessionFile = createSqliteSessionTranscriptLocator({ + const nextTranscriptLocator = createSqliteSessionTranscriptLocator({ sessionId: nextSessionId, agentId, }); - nextEntry.sessionFile = nextSessionFile; if (params.activeSessionStore) { params.activeSessionStore[params.sessionKey] = nextEntry; } @@ -112,21 +111,20 @@ export async function resetReplyRunSession(params: { await replayRecentUserAssistantMessages({ sourceAgentId: agentId, sourceSessionId: prevEntry.sessionId, - sourceTranscript: prevEntry.sessionFile, targetAgentId: agentId, - targetTranscript: nextSessionFile, + targetTranscript: nextTranscriptLocator, newSessionId: nextSessionId, }); params.followupRun.run.sessionId = nextSessionId; - params.followupRun.run.sessionFile = nextSessionFile; + params.followupRun.run.transcriptLocator = nextTranscriptLocator; deps.refreshQueuedFollowupSession({ key: params.queueKey, previousSessionId: prevEntry.sessionId, nextSessionId, - nextSessionFile, + nextTranscriptLocator, }); params.onActiveSessionEntry(nextEntry); - params.onNewSession(nextSessionId, nextSessionFile); + params.onNewSession(nextSessionId, nextTranscriptLocator); deps.error(params.options.buildLogMessage(nextSessionId)); return true; } diff --git a/src/auto-reply/reply/session-updates.lifecycle.test.ts b/src/auto-reply/reply/session-updates.lifecycle.test.ts index a6a5abf67f9..bfcb8976fe0 100644 --- a/src/auto-reply/reply/session-updates.lifecycle.test.ts +++ b/src/auto-reply/reply/session-updates.lifecycle.test.ts @@ -30,8 +30,7 @@ async function createFixture() { } process.env.OPENCLAW_STATE_DIR = root; const sessionKey = "agent:main:forum:direct:compaction"; - const transcriptPath = path.join(root, "transcripts", "s1.jsonl"); - await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); + const transcriptPath = createSqliteSessionTranscriptLocator({ agentId: "main", sessionId: "s1" }); replaceSqliteSessionTranscriptEvents({ agentId: "main", sessionId: "s1", @@ -40,7 +39,6 @@ async function createFixture() { }); const entry = { sessionId: "s1", - sessionFile: transcriptPath, updatedAt: Date.now(), compactionCount: 0, } as SessionEntry; @@ -111,7 +109,7 @@ describe("session-updates lifecycle hooks", () => { sessionKey, reason: "compaction", }); - expect(endEvent?.sessionFile).toBe( + expect(endEvent?.transcriptLocator).toBe( createSqliteSessionTranscriptLocator({ agentId: "main", sessionId: "s1" }), ); expect(endContext).toMatchObject({ @@ -132,7 +130,7 @@ describe("session-updates lifecycle hooks", () => { }); }); - it("keeps SQLite transcript locators virtual when compaction rotates topic sessions", async () => { + it("keeps topic compaction identity out of active session rows", async () => { const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-updates-sqlite-")); tempDirs.push(root); if (!previousStateDirCaptured) { @@ -141,7 +139,7 @@ describe("session-updates lifecycle hooks", () => { } process.env.OPENCLAW_STATE_DIR = root; const sessionKey = "agent:main:forum:direct:compaction:topic:456"; - const sessionFile = createSqliteSessionTranscriptLocator({ + const transcriptLocator = createSqliteSessionTranscriptLocator({ agentId: "main", sessionId: "s1", topicId: 456, @@ -149,12 +147,11 @@ describe("session-updates lifecycle hooks", () => { replaceSqliteSessionTranscriptEvents({ agentId: "main", sessionId: "s1", - transcriptPath: sessionFile, + transcriptPath: transcriptLocator, events: [{ type: "message" }], }); const entry = { sessionId: "s1", - sessionFile, updatedAt: Date.now(), compactionCount: 0, } as SessionEntry; @@ -172,15 +169,9 @@ describe("session-updates lifecycle hooks", () => { newSessionId: "s2", }); - const expectedNextFile = createSqliteSessionTranscriptLocator({ - agentId: "main", - sessionId: "s2", - topicId: 456, - }); - expect(sessionStore[sessionKey]?.sessionFile).toBe(expectedNextFile); - expect(sessionStore[sessionKey]?.sessionFile).toContain("sqlite-transcript://"); - expect(sessionStore[sessionKey]?.sessionFile).not.toMatch(/^sqlite-transcript:\/[^/]/u); + expect(sessionStore[sessionKey]?.sessionId).toBe("s2"); + expect(sessionStore[sessionKey]?.transcriptLocator).toBeUndefined(); const [endEvent] = hookRunnerMocks.runSessionEnd.mock.calls[0] ?? []; - expect(endEvent?.sessionFile).toBe(sessionFile); + expect(endEvent?.transcriptLocator).toBe(transcriptLocator); }); }); diff --git a/src/auto-reply/reply/session-updates.ts b/src/auto-reply/reply/session-updates.ts index c49ef8d4223..1ffdd22e6e4 100644 --- a/src/auto-reply/reply/session-updates.ts +++ b/src/auto-reply/reply/session-updates.ts @@ -15,6 +15,7 @@ import { type SessionEntry, upsertSessionEntry, } from "../../config/sessions.js"; +import { parseSessionThreadInfoFast } from "../../config/sessions/thread-info.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { resolveStableSessionEndTranscript } from "../../gateway/session-transcript-paths.js"; import { logVerbose } from "../../globals.js"; @@ -69,6 +70,7 @@ function emitCompactionSessionLifecycleHooks(params: { const transcript = resolveStableSessionEndTranscript({ sessionId: params.previousEntry.sessionId, agentId: resolveAgentIdFromSessionKey(params.sessionKey), + topicId: resolveCompactionTopicId(params.sessionKey), }); const payload = buildSessionEndHookPayload({ sessionId: params.previousEntry.sessionId, @@ -96,6 +98,15 @@ function emitCompactionSessionLifecycleHooks(params: { } } +function resolveCompactionTopicId(sessionKey: string): string | undefined { + const parsedThreadId = parseSessionThreadInfoFast(sessionKey).threadId; + if (parsedThreadId) { + return parsedThreadId; + } + const match = /(?:^|:)topic:([^:]+)/u.exec(sessionKey); + return match?.[1]; +} + function resolvePositiveTokenCount(value: number | undefined): number | undefined { return typeof value === "number" && Number.isFinite(value) && value > 0 ? Math.floor(value) @@ -274,8 +285,9 @@ export async function incrementCompactionCount(params: { updates.cacheRead = undefined; updates.cacheWrite = undefined; } + const { transcriptLocator: _derivedTranscriptLocator, ...entryWithoutLocator } = entry; sessionStore[sessionKey] = { - ...entry, + ...entryWithoutLocator, ...updates, }; const agentId = diff --git a/src/gateway/session-transcript-paths.ts b/src/gateway/session-transcript-paths.ts index fa9bb204731..8471e968907 100644 --- a/src/gateway/session-transcript-paths.ts +++ b/src/gateway/session-transcript-paths.ts @@ -10,8 +10,9 @@ function normalizeTranscriptLocator(value: string | undefined): string | undefin export function resolveSessionTranscriptCandidates( sessionId: string, - sessionFile?: string, + transcriptLocator?: string, agentId?: string, + topicId?: string | number, ): string[] { const candidates: string[] = []; const pushCandidate = (resolve: () => string): void => { @@ -22,13 +23,13 @@ export function resolveSessionTranscriptCandidates( } }; - const normalizedSessionFile = normalizeTranscriptLocator(sessionFile); - if (normalizedSessionFile) { - candidates.push(normalizedSessionFile); + const normalizedTranscriptLocator = normalizeTranscriptLocator(transcriptLocator); + if (normalizedTranscriptLocator) { + candidates.push(normalizedTranscriptLocator); } if (agentId) { - pushCandidate(() => createSqliteSessionTranscriptLocator({ sessionId, agentId })); + pushCandidate(() => createSqliteSessionTranscriptLocator({ sessionId, agentId, topicId })); } return Array.from(new Set(candidates)); @@ -36,18 +37,20 @@ export function resolveSessionTranscriptCandidates( export function resolveStableSessionEndTranscript(params: { sessionId: string; - sessionFile?: string; + transcriptLocator?: string; agentId?: string; -}): { sessionFile?: string } { - const stableLocator = normalizeTranscriptLocator(params.sessionFile); + topicId?: string | number; +}): { transcriptLocator?: string } { + const stableLocator = normalizeTranscriptLocator(params.transcriptLocator); if (stableLocator) { - return { sessionFile: stableLocator }; + return { transcriptLocator: stableLocator }; } const [candidate] = resolveSessionTranscriptCandidates( params.sessionId, - params.sessionFile, + params.transcriptLocator, params.agentId, + params.topicId, ); - return candidate ? { sessionFile: candidate } : {}; + return candidate ? { transcriptLocator: candidate } : {}; } diff --git a/src/plugin-sdk/session-store-runtime.ts b/src/plugin-sdk/session-store-runtime.ts index 785920d37b6..b2a021a87ea 100644 --- a/src/plugin-sdk/session-store-runtime.ts +++ b/src/plugin-sdk/session-store-runtime.ts @@ -7,7 +7,7 @@ export { export { openOpenClawStateDatabase } from "../state/openclaw-state-db.js"; export { resolveSessionRowEntry } from "../config/sessions/store-entry.js"; export { createSqliteSessionTranscriptLocator } from "../config/sessions/paths.js"; -export { resolveAndPersistSessionTranscriptLocator } from "../config/sessions/session-locator.js"; +export { resolveAndPersistSessionTranscriptIdentity } from "../config/sessions/session-locator.js"; export { resolveSessionKey } from "../config/sessions/session-key.js"; export { resolveGroupSessionKey } from "../config/sessions/group.js"; export { canonicalizeMainSessionAlias } from "../config/sessions/main-session.js";