refactor: keep compaction transcript handles out of session rows

This commit is contained in:
Peter Steinberger
2026-05-09 07:34:20 +01:00
parent b958c509c3
commit 442d8065ca
10 changed files with 159 additions and 148 deletions

View File

@@ -53,7 +53,7 @@ const sessionMocks = vi.hoisted(() => {
Object.entries(sessionStore.value).map(([sessionKey, entry]) => ({ sessionKey, entry })),
),
recordSessionMetaFromInbound: vi.fn(),
resolveAndPersistSessionTranscriptLocator: vi.fn(),
resolveAndPersistSessionTranscriptIdentity: vi.fn(),
sessionStore,
};
});
@@ -160,8 +160,8 @@ vi.mock("openclaw/plugin-sdk/session-store-runtime", async () => {
...actual,
getSessionEntry: sessionMocks.getSessionEntry,
listSessionEntries: sessionMocks.listSessionEntries,
resolveAndPersistSessionTranscriptLocator:
sessionMocks.resolveAndPersistSessionTranscriptLocator,
resolveAndPersistSessionTranscriptIdentity:
sessionMocks.resolveAndPersistSessionTranscriptIdentity,
};
});
vi.mock("openclaw/plugin-sdk/command-auth-native", async () => {
@@ -484,17 +484,16 @@ describe("registerTelegramNativeCommands — session metadata", () => {
})),
);
sessionMocks.recordSessionMetaFromInbound.mockClear().mockResolvedValue(undefined);
sessionMocks.resolveAndPersistSessionTranscriptLocator
sessionMocks.resolveAndPersistSessionTranscriptIdentity
.mockClear()
.mockImplementation(async (params) => {
const transcriptLocator =
params.fallbackTranscriptLocator ?? `/tmp/openclaw-sessions/${params.sessionId}.jsonl`;
const topicSuffix = params.topicId === undefined ? "" : `?topic=${params.topicId}`;
const transcriptLocator = `sqlite-transcript://${params.agentId ?? "main"}/${params.sessionId}${topicSuffix}`;
return {
transcriptLocator,
sessionEntry: {
...params.sessionEntry,
sessionId: params.sessionId,
sessionFile: transcriptLocator,
updatedAt: Date.now(),
},
};
@@ -1163,7 +1162,7 @@ describe("registerTelegramNativeCommands — session metadata", () => {
expectUnauthorizedNewCommandBlocked(sendMessage);
});
it("passes a persisted topic session file to plugin commands", async () => {
it("passes a persisted topic transcript locator to plugin commands", async () => {
sessionMocks.sessionStore.value = {
"agent:main:telegram:group:-1001234567890:topic:42": {
sessionId: "sess-topic",
@@ -1200,18 +1199,18 @@ describe("registerTelegramNativeCommands — session metadata", () => {
createTelegramTopicCommandContext({ match: "bind --cwd /tmp/work", threadId: 42 }),
);
expect(sessionMocks.resolveAndPersistSessionTranscriptLocator).toHaveBeenCalledWith(
expect(sessionMocks.resolveAndPersistSessionTranscriptIdentity).toHaveBeenCalledWith(
expect.objectContaining({
sessionId: "sess-topic",
sessionKey: "agent:main:telegram:group:-1001234567890:topic:42",
fallbackTranscriptLocator: "sqlite-transcript://main/sess-topic-topic-42.jsonl",
topicId: 42,
}),
);
expect(pluginRuntimeMocks.executePluginCommand).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:main:telegram:group:-1001234567890:topic:42",
sessionId: "sess-topic",
sessionFile: "sqlite-transcript://main/sess-topic-topic-42.jsonl",
transcriptLocator: "sqlite-transcript://main/sess-topic?topic=42",
messageThreadId: 42,
}),
);

View File

@@ -36,10 +36,9 @@ import { danger, logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { getChildLogger } from "openclaw/plugin-sdk/runtime-env";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import {
createSqliteSessionTranscriptLocator,
getSessionEntry,
listSessionEntries,
resolveAndPersistSessionTranscriptLocator,
resolveAndPersistSessionTranscriptIdentity,
resolveSessionRowEntry,
} from "openclaw/plugin-sdk/session-store-runtime";
import {
@@ -183,17 +182,12 @@ async function resolveTelegramCommandTranscriptLocator(params: {
sessionKey,
});
const sessionId = resolved.existing?.sessionId?.trim() || randomUUID();
const fallbackTranscriptLocator = createSqliteSessionTranscriptLocator({
sessionId,
agentId: params.agentId,
topicId: params.threadId,
});
const persisted = await resolveAndPersistSessionTranscriptLocator({
const persisted = await resolveAndPersistSessionTranscriptIdentity({
sessionId,
sessionKey: resolved.normalizedKey,
sessionEntry: resolved.existing,
agentId: params.agentId,
fallbackTranscriptLocator,
topicId: params.threadId,
});
return { sessionId, sessionFile: persisted.transcriptLocator };
} catch {

View File

@@ -88,10 +88,10 @@ describe("runMemoryFlushIfNeeded", () => {
};
if (typeof params.newSessionId === "string" && params.newSessionId) {
nextEntry.sessionId = params.newSessionId;
if (typeof params.newSessionFile === "string" && params.newSessionFile) {
nextEntry.sessionFile = params.newSessionFile;
if (typeof params.newTranscriptLocator === "string" && params.newTranscriptLocator) {
nextEntry.transcriptLocator = params.newTranscriptLocator;
} else {
nextEntry.sessionFile = resolveMainTranscriptPath(rootDir, params.newSessionId);
nextEntry.transcriptLocator = resolveMainTranscriptPath(rootDir, params.newSessionId);
}
}
params.sessionStore[sessionKey] = nextEntry;
@@ -187,11 +187,12 @@ describe("runMemoryFlushIfNeeded", () => {
key: sessionKey,
previousSessionId: "session",
nextSessionId: "session-rotated",
nextSessionFile: expect.stringContaining("session-rotated.jsonl"),
nextTranscriptLocator: "sqlite-transcript://main/session-rotated",
});
const persisted = readTestSessionRow(sessionKey);
expect(persisted?.sessionId).toBe("session-rotated");
expect(persisted?.transcriptLocator).toBeUndefined();
expect(persisted?.compactionCount).toBe(2);
expect(persisted?.memoryFlushCompactionCount).toBe(1);
expect(persisted?.memoryFlushAt).toBe(1_700_000_000_000);
@@ -329,11 +330,11 @@ describe("runMemoryFlushIfNeeded", () => {
});
it("passes runtime policy session key to preflight compaction sandbox resolution", async () => {
const sessionFile = resolveMainTranscriptPath(rootDir, "session");
const transcriptLocator = resolveMainTranscriptPath(rootDir, "session");
appendSqliteSessionTranscriptEvent({
agentId: "main",
sessionId: "session",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
event: {
type: "message",
id: "m1",
@@ -350,7 +351,7 @@ describe("runMemoryFlushIfNeeded", () => {
}));
const sessionEntry: SessionEntry = {
sessionId: "session",
sessionFile,
transcriptLocator,
updatedAt: Date.now(),
totalTokensFresh: false,
};
@@ -359,7 +360,7 @@ describe("runMemoryFlushIfNeeded", () => {
cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
followupRun: createTestFollowupRun({
sessionId: "session",
sessionFile,
transcriptLocator,
sessionKey: "agent:main:main",
runtimePolicySessionKey: "agent:main:telegram:default:direct:12345",
}),
@@ -382,12 +383,12 @@ describe("runMemoryFlushIfNeeded", () => {
});
it("updates the active preflight run after transcript rotation", async () => {
const sessionFile = resolveMainTranscriptPath(rootDir, "session");
const transcriptLocator = resolveMainTranscriptPath(rootDir, "session");
const successorFile = resolveMainTranscriptPath(rootDir, "session-rotated");
appendSqliteSessionTranscriptEvent({
agentId: "main",
sessionId: "session",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
event: {
type: "message",
id: "m1",
@@ -408,19 +409,19 @@ describe("runMemoryFlushIfNeeded", () => {
result: {
tokensAfter: 42,
sessionId: "session-rotated",
sessionFile: successorFile,
transcriptLocator: successorFile,
},
});
const sessionEntry: SessionEntry = {
sessionId: "session",
sessionFile,
transcriptLocator,
updatedAt: Date.now(),
totalTokensFresh: false,
};
const sessionStore = { "agent:main:main": sessionEntry };
const followupRun = createTestFollowupRun({
sessionId: "session",
sessionFile,
transcriptLocator,
sessionKey: "agent:main:main",
});
const updateSessionId = vi.fn();
@@ -443,24 +444,24 @@ describe("runMemoryFlushIfNeeded", () => {
});
expect(entry?.sessionId).toBe("session-rotated");
expect(entry?.sessionFile).toBe(successorFile);
expect(entry?.transcriptLocator).toBeUndefined();
expect(followupRun.run.sessionId).toBe("session-rotated");
expect(followupRun.run.sessionFile).toBe(successorFile);
expect(followupRun.run.transcriptLocator).toBe(successorFile);
expect(updateSessionId).toHaveBeenCalledWith("session-rotated");
expect(refreshQueuedFollowupSessionMock).toHaveBeenCalledWith({
key: "agent:main:main",
previousSessionId: "session",
nextSessionId: "session-rotated",
nextSessionFile: successorFile,
nextTranscriptLocator: successorFile,
});
});
it("includes recent output tokens when deciding preflight compaction", async () => {
const sessionFile = resolveMainTranscriptPath(rootDir, "session");
const transcriptLocator = resolveMainTranscriptPath(rootDir, "session");
appendSqliteSessionTranscriptEvent({
agentId: "main",
sessionId: "session",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
event: {
type: "message",
id: "m1",
@@ -481,7 +482,7 @@ describe("runMemoryFlushIfNeeded", () => {
}));
const sessionEntry: SessionEntry = {
sessionId: "session",
sessionFile,
transcriptLocator,
updatedAt: Date.now(),
totalTokensFresh: false,
};
@@ -490,7 +491,7 @@ describe("runMemoryFlushIfNeeded", () => {
cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
followupRun: createTestFollowupRun({
sessionId: "session",
sessionFile,
transcriptLocator,
sessionKey: "main",
}),
defaultModel: "anthropic/claude-opus-4-6",
@@ -508,12 +509,12 @@ describe("runMemoryFlushIfNeeded", () => {
expect(compactCall.currentTokenCount).toBeGreaterThanOrEqual(100_000);
});
it("uses the active run sessionFile when the session entry has no transcript path", async () => {
const sessionFile = resolveMainTranscriptPath(rootDir, "session");
it("uses the active run transcriptLocator when the session entry has no transcript path", async () => {
const transcriptLocator = resolveMainTranscriptPath(rootDir, "session");
appendSqliteSessionTranscriptEvent({
agentId: "main",
sessionId: "session",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
event: {
type: "message",
id: "m1",
@@ -542,7 +543,7 @@ describe("runMemoryFlushIfNeeded", () => {
cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
followupRun: createTestFollowupRun({
sessionId: "session",
sessionFile,
transcriptLocator,
sessionKey: "main",
}),
defaultModel: "anthropic/claude-opus-4-6",
@@ -557,17 +558,17 @@ describe("runMemoryFlushIfNeeded", () => {
expect(compactEmbeddedPiSessionMock).toHaveBeenCalledWith(
expect.objectContaining({
sessionId: "session",
sessionFile,
transcriptLocator,
}),
);
});
it("keeps preflight compaction conservative for content appended after latest usage", async () => {
const sessionFile = resolveMainTranscriptPath(rootDir, "session");
const transcriptLocator = resolveMainTranscriptPath(rootDir, "session");
appendSqliteSessionTranscriptEvent({
agentId: "main",
sessionId: "session",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
event: {
type: "message",
id: "m1",
@@ -581,7 +582,7 @@ describe("runMemoryFlushIfNeeded", () => {
appendSqliteSessionTranscriptEvent({
agentId: "main",
sessionId: "session",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
event: {
type: "message",
id: "m2",
@@ -601,7 +602,7 @@ describe("runMemoryFlushIfNeeded", () => {
}));
const sessionEntry: SessionEntry = {
sessionId: "session",
sessionFile,
transcriptLocator,
updatedAt: Date.now(),
totalTokensFresh: false,
};
@@ -610,7 +611,7 @@ describe("runMemoryFlushIfNeeded", () => {
cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
followupRun: createTestFollowupRun({
sessionId: "session",
sessionFile,
transcriptLocator,
sessionKey: "main",
}),
defaultModel: "anthropic/claude-opus-4-6",
@@ -629,11 +630,11 @@ describe("runMemoryFlushIfNeeded", () => {
});
it("combines latest usage with post-usage tail pressure for preflight compaction", async () => {
const sessionFile = resolveMainTranscriptPath(rootDir, "session");
const transcriptLocator = resolveMainTranscriptPath(rootDir, "session");
appendSqliteSessionTranscriptEvent({
agentId: "main",
sessionId: "session",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
event: {
type: "message",
id: "m1",
@@ -647,7 +648,7 @@ describe("runMemoryFlushIfNeeded", () => {
appendSqliteSessionTranscriptEvent({
agentId: "main",
sessionId: "session",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
event: {
type: "message",
id: "m2",
@@ -667,7 +668,7 @@ describe("runMemoryFlushIfNeeded", () => {
}));
const sessionEntry: SessionEntry = {
sessionId: "session",
sessionFile,
transcriptLocator,
updatedAt: Date.now(),
totalTokensFresh: false,
};
@@ -676,7 +677,7 @@ describe("runMemoryFlushIfNeeded", () => {
cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
followupRun: createTestFollowupRun({
sessionId: "session",
sessionFile,
transcriptLocator,
sessionKey: "main",
}),
defaultModel: "anthropic/claude-opus-4-6",
@@ -695,11 +696,11 @@ describe("runMemoryFlushIfNeeded", () => {
});
it("does not count bytes from a large latest usage record as post-usage tail pressure", async () => {
const sessionFile = resolveMainTranscriptPath(rootDir, "session");
const transcriptLocator = resolveMainTranscriptPath(rootDir, "session");
appendSqliteSessionTranscriptEvent({
agentId: "main",
sessionId: "session",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
event: {
type: "session",
id: "session",
@@ -708,7 +709,7 @@ describe("runMemoryFlushIfNeeded", () => {
appendSqliteSessionTranscriptEvent({
agentId: "main",
sessionId: "session",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
event: {
type: "message",
id: "m1",
@@ -729,7 +730,7 @@ describe("runMemoryFlushIfNeeded", () => {
}));
const sessionEntry: SessionEntry = {
sessionId: "session",
sessionFile,
transcriptLocator,
updatedAt: Date.now(),
totalTokensFresh: false,
};
@@ -738,7 +739,7 @@ describe("runMemoryFlushIfNeeded", () => {
cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
followupRun: createTestFollowupRun({
sessionId: "session",
sessionFile,
transcriptLocator,
sessionKey: "main",
}),
defaultModel: "anthropic/claude-opus-4-6",
@@ -755,11 +756,11 @@ describe("runMemoryFlushIfNeeded", () => {
});
it("does not treat raw transcript metadata bytes as token pressure", async () => {
const sessionFile = resolveMainTranscriptPath(rootDir, "session");
const transcriptLocator = resolveMainTranscriptPath(rootDir, "session");
appendSqliteSessionTranscriptEvent({
agentId: "main",
sessionId: "session",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
event: {
type: "session",
id: "session",
@@ -768,7 +769,7 @@ describe("runMemoryFlushIfNeeded", () => {
appendSqliteSessionTranscriptEvent({
agentId: "main",
sessionId: "session",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
event: {
type: "custom",
payload: "x".repeat(450_000),
@@ -777,7 +778,7 @@ describe("runMemoryFlushIfNeeded", () => {
appendSqliteSessionTranscriptEvent({
agentId: "main",
sessionId: "session",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
event: {
type: "message",
id: "m1",
@@ -798,7 +799,7 @@ describe("runMemoryFlushIfNeeded", () => {
}));
const sessionEntry: SessionEntry = {
sessionId: "session",
sessionFile,
transcriptLocator,
updatedAt: Date.now(),
totalTokensFresh: false,
};
@@ -817,7 +818,7 @@ describe("runMemoryFlushIfNeeded", () => {
},
followupRun: createTestFollowupRun({
sessionId: "session",
sessionFile,
transcriptLocator,
sessionKey: "main",
}),
defaultModel: "anthropic/claude-opus-4-6",
@@ -834,11 +835,11 @@ describe("runMemoryFlushIfNeeded", () => {
});
it("triggers preflight compaction when the active transcript exceeds the configured byte threshold", async () => {
const sessionFile = resolveMainTranscriptPath(rootDir, "session");
const transcriptLocator = resolveMainTranscriptPath(rootDir, "session");
appendSqliteSessionTranscriptEvent({
agentId: "main",
sessionId: "session",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
event: {
type: "message",
id: "m1",
@@ -847,7 +848,7 @@ describe("runMemoryFlushIfNeeded", () => {
});
const sessionEntry: SessionEntry = {
sessionId: "session",
sessionFile,
transcriptLocator,
updatedAt: Date.now(),
totalTokens: 10,
totalTokensFresh: true,
@@ -873,7 +874,7 @@ describe("runMemoryFlushIfNeeded", () => {
},
followupRun: createTestFollowupRun({
sessionId: "session",
sessionFile,
transcriptLocator,
sessionKey: "main",
}),
defaultModel: "anthropic/claude-opus-4-6",
@@ -889,7 +890,7 @@ describe("runMemoryFlushIfNeeded", () => {
expect(replyOperation.setPhase).toHaveBeenCalledWith("preflight_compacting");
const compactCall = compactEmbeddedPiSessionMock.mock.calls[0]?.[0] as {
currentTokenCount?: number;
sessionFile?: string;
transcriptLocator?: string;
sessionId?: string;
trigger?: string;
};
@@ -900,15 +901,15 @@ describe("runMemoryFlushIfNeeded", () => {
currentTokenCount: 10,
}),
);
expect(compactCall.sessionFile).toBe(sessionFile);
expect(compactCall.transcriptLocator).toBe(transcriptLocator);
});
it("keeps the active transcript byte threshold inactive unless transcript rotation is enabled", async () => {
const sessionFile = resolveMainTranscriptPath(rootDir, "session");
const transcriptLocator = resolveMainTranscriptPath(rootDir, "session");
appendSqliteSessionTranscriptEvent({
agentId: "main",
sessionId: "session",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
event: {
type: "message",
id: "m1",
@@ -917,7 +918,7 @@ describe("runMemoryFlushIfNeeded", () => {
});
const sessionEntry: SessionEntry = {
sessionId: "session",
sessionFile,
transcriptLocator,
updatedAt: Date.now(),
totalTokens: 10,
totalTokensFresh: true,
@@ -936,7 +937,7 @@ describe("runMemoryFlushIfNeeded", () => {
},
followupRun: createTestFollowupRun({
sessionId: "session",
sessionFile,
transcriptLocator,
sessionKey: "main",
}),
defaultModel: "anthropic/claude-opus-4-6",

View File

@@ -214,9 +214,10 @@ function resolveSessionLogPath(
const transcriptPath = normalizeOptionalString(
(sessionEntry as (SessionEntry & { transcriptPath?: string }) | undefined)?.transcriptPath,
);
const sessionFile = normalizeOptionalString(sessionEntry?.sessionFile) || transcriptPath;
const transcriptLocator =
normalizeOptionalString(sessionEntry?.transcriptLocator) || transcriptPath;
const agentId = resolveAgentIdFromSessionKey(sessionKey);
void sessionFile;
void transcriptLocator;
return createSqliteSessionTranscriptLocator({ agentId, sessionId });
} catch {
return undefined;
@@ -239,7 +240,7 @@ function resolveSqliteSessionTranscriptPath(params: {
if (candidates.length === 0) {
return undefined;
}
const explicit = normalizeOptionalString(params.sessionEntry?.sessionFile);
const explicit = normalizeOptionalString(params.sessionEntry?.transcriptLocator);
if (explicit) {
const matched = candidates.find((entry) => entry.locator === explicit);
if (matched) {
@@ -313,7 +314,7 @@ async function readSessionLogSnapshot(params: {
const scope = resolveSqliteSessionTranscriptScope({
agentId: params.sessionKey ? resolveAgentIdFromSessionKey(params.sessionKey) : undefined,
sessionId,
transcriptPath: params.sessionEntry?.sessionFile,
transcriptPath: params.sessionEntry?.transcriptLocator,
});
if (!scope) {
return snapshot;
@@ -363,17 +364,17 @@ async function estimatePromptTokensFromSessionTranscript(params: {
sessionId?: string;
sessionEntry?: SessionEntry;
sessionKey?: string;
sessionFile?: string;
transcriptLocator?: string;
}): Promise<TranscriptTokenEstimate | undefined> {
const sessionId = normalizeOptionalString(params.sessionId);
if (!sessionId) {
return undefined;
}
const fallbackTranscriptLocator = normalizeOptionalString(params.sessionFile);
const fallbackTranscriptLocator = normalizeOptionalString(params.transcriptLocator);
const sessionEntryForTranscript =
params.sessionEntry?.sessionFile || !fallbackTranscriptLocator
params.sessionEntry?.transcriptLocator || !fallbackTranscriptLocator
? params.sessionEntry
: ({ ...params.sessionEntry, sessionFile: fallbackTranscriptLocator } as SessionEntry);
: ({ ...params.sessionEntry, transcriptLocator: fallbackTranscriptLocator } as SessionEntry);
try {
const snapshot = await readSessionLogSnapshot({
sessionId,
@@ -402,7 +403,7 @@ async function estimatePromptTokensFromSessionTranscript(params: {
}
const messages = (await readSessionMessagesAsync(
sessionId,
sessionEntryForTranscript?.sessionFile,
sessionEntryForTranscript?.transcriptLocator,
{
agentId: resolveAgentIdFromSessionKey(params.sessionKey),
mode: "recent",
@@ -479,9 +480,9 @@ export async function runPreflightCompactionIfNeeded(params: {
? await readSessionLogSnapshot({
sessionId: entry.sessionId,
sessionEntry:
entry.sessionFile || !params.followupRun.run.sessionFile
entry.transcriptLocator || !params.followupRun.run.transcriptLocator
? entry
: { ...entry, sessionFile: params.followupRun.run.sessionFile },
: { ...entry, transcriptLocator: params.followupRun.run.transcriptLocator },
sessionKey: params.sessionKey ?? params.followupRun.run.sessionKey,
includeByteSize: true,
includeUsage: false,
@@ -506,7 +507,7 @@ export async function runPreflightCompactionIfNeeded(params: {
sessionId: entry.sessionId,
sessionEntry: entry,
sessionKey: params.sessionKey ?? params.followupRun.run.sessionKey,
sessionFile: entry.sessionFile ?? params.followupRun.run.sessionFile,
transcriptLocator: entry.transcriptLocator ?? params.followupRun.run.transcriptLocator,
});
const stalePersistedPromptTokens = hasPersistedTotalTokens
? Math.floor(persistedTotalTokens)
@@ -572,18 +573,20 @@ export async function runPreflightCompactionIfNeeded(params: {
);
params.replyOperation.setPhase("preflight_compacting");
const sessionFile =
const transcriptLocator =
resolveSqliteSessionTranscriptPath({
sessionId: entry.sessionId,
sessionEntry:
entry.sessionFile || !params.followupRun.run.sessionFile
entry.transcriptLocator || !params.followupRun.run.transcriptLocator
? entry
: { ...entry, sessionFile: params.followupRun.run.sessionFile },
: { ...entry, transcriptLocator: params.followupRun.run.transcriptLocator },
sessionKey: params.sessionKey ?? params.followupRun.run.sessionKey,
}) ??
resolveSessionLogPath(
entry.sessionId,
entry.sessionFile ? entry : { ...entry, sessionFile: params.followupRun.run.sessionFile },
entry.transcriptLocator
? entry
: { ...entry, transcriptLocator: params.followupRun.run.transcriptLocator },
params.sessionKey ?? params.followupRun.run.sessionKey,
);
const result = await memoryDeps.compactEmbeddedPiSession({
@@ -599,7 +602,7 @@ export async function runPreflightCompactionIfNeeded(params: {
senderName: params.followupRun.run.senderName,
senderUsername: params.followupRun.run.senderUsername,
senderE164: params.followupRun.run.senderE164,
sessionFile: sessionFile ?? params.followupRun.run.sessionFile,
transcriptLocator: transcriptLocator ?? params.followupRun.run.transcriptLocator,
workspaceDir: params.followupRun.run.workspaceDir,
agentDir: params.followupRun.run.agentDir,
config: params.cfg,
@@ -631,7 +634,7 @@ export async function runPreflightCompactionIfNeeded(params: {
sessionKey: params.sessionKey,
tokensAfter: result.result?.tokensAfter,
newSessionId: result.result?.sessionId,
newSessionFile: result.result?.sessionFile,
newTranscriptLocator: result.result?.transcriptLocator,
});
await appendPostCompactionRefreshPrompt({
cfg: params.cfg,
@@ -642,8 +645,8 @@ export async function runPreflightCompactionIfNeeded(params: {
const previousSessionId = params.followupRun.run.sessionId;
params.followupRun.run.sessionId = entry.sessionId;
params.replyOperation.updateSessionId(entry.sessionId);
if (entry.sessionFile) {
params.followupRun.run.sessionFile = entry.sessionFile;
if (result.result?.transcriptLocator) {
params.followupRun.run.transcriptLocator = result.result.transcriptLocator;
}
const queueKey = params.followupRun.run.sessionKey ?? params.sessionKey;
if (queueKey) {
@@ -651,7 +654,7 @@ export async function runPreflightCompactionIfNeeded(params: {
key: queueKey,
previousSessionId,
nextSessionId: entry.sessionId,
nextSessionFile: entry.sessionFile,
nextTranscriptLocator: result.result?.transcriptLocator,
});
}
}
@@ -891,7 +894,7 @@ export async function runMemoryFlushIfNeeded(params: {
.filter(Boolean)
.join("\n\n");
let postCompactionSessionId: string | undefined;
let postCompactionSessionFile: string | undefined;
let postCompactionTranscriptLocator: string | undefined;
try {
await memoryDeps.runWithModelFallback({
...resolveMemoryFlushModelFallbackOptions(
@@ -940,9 +943,15 @@ export async function runMemoryFlushIfNeeded(params: {
});
if (result.meta?.agentMeta?.sessionId) {
postCompactionSessionId = result.meta.agentMeta.sessionId;
postCompactionTranscriptLocator =
result.meta.agentMeta.transcriptLocator ??
createSqliteSessionTranscriptLocator({
agentId: params.followupRun.run.agentId,
sessionId: result.meta.agentMeta.sessionId,
});
}
if (result.meta?.agentMeta?.sessionFile) {
postCompactionSessionFile = result.meta.agentMeta.sessionFile;
if (result.meta?.agentMeta?.transcriptLocator) {
postCompactionTranscriptLocator = result.meta.agentMeta.transcriptLocator;
}
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
result.meta?.systemPromptReport,
@@ -962,15 +971,15 @@ export async function runMemoryFlushIfNeeded(params: {
sessionStore: activeSessionStore,
sessionKey: params.sessionKey,
newSessionId: postCompactionSessionId,
newSessionFile: postCompactionSessionFile,
newTranscriptLocator: postCompactionTranscriptLocator,
});
const updatedEntry = params.sessionKey ? activeSessionStore?.[params.sessionKey] : undefined;
if (updatedEntry) {
activeSessionEntry = updatedEntry;
params.followupRun.run.sessionId = updatedEntry.sessionId;
params.replyOperation.updateSessionId(updatedEntry.sessionId);
if (updatedEntry.sessionFile) {
params.followupRun.run.sessionFile = updatedEntry.sessionFile;
if (postCompactionTranscriptLocator) {
params.followupRun.run.transcriptLocator = postCompactionTranscriptLocator;
}
const queueKey = params.followupRun.run.sessionKey ?? params.sessionKey;
if (queueKey) {
@@ -978,7 +987,7 @@ export async function runMemoryFlushIfNeeded(params: {
key: queueKey,
previousSessionId,
nextSessionId: updatedEntry.sessionId,
nextSessionFile: updatedEntry.sessionFile,
nextTranscriptLocator: postCompactionTranscriptLocator,
});
}
}
@@ -998,8 +1007,8 @@ export async function runMemoryFlushIfNeeded(params: {
activeSessionEntry = updatedEntry;
params.followupRun.run.sessionId = updatedEntry.sessionId;
params.replyOperation.updateSessionId(updatedEntry.sessionId);
if (updatedEntry.sessionFile) {
params.followupRun.run.sessionFile = updatedEntry.sessionFile;
if (postCompactionTranscriptLocator) {
params.followupRun.run.transcriptLocator = postCompactionTranscriptLocator;
}
}
} catch (err) {

View File

@@ -53,7 +53,7 @@ describe("resetReplyRunSession", () => {
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: 1,
sessionFile: path.join(transcriptDir, "session.jsonl"),
transcriptLocator: path.join(transcriptDir, "session.jsonl"),
modelProvider: "qwencode",
model: "qwen",
contextTokens: 123,
@@ -108,12 +108,13 @@ describe("resetReplyRunSession", () => {
key: "main",
previousSessionId: "session",
nextSessionId: activeSessionEntry?.sessionId,
nextSessionFile: activeSessionEntry?.sessionFile,
nextTranscriptLocator: followupRun.run.transcriptLocator,
});
expect(errorMock).toHaveBeenCalledWith("reset 00000000-0000-0000-0000-000000000123");
const persisted = readTestSessionRow("main");
expect(persisted?.sessionId).toBe(activeSessionEntry?.sessionId);
expect(persisted?.transcriptLocator).toBeUndefined();
expect(persisted?.fallbackNoticeReason).toBeUndefined();
});
@@ -122,7 +123,7 @@ describe("resetReplyRunSession", () => {
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: 1,
sessionFile: path.join(transcriptDir, "session.jsonl"),
transcriptLocator: path.join(transcriptDir, "session.jsonl"),
totalTokens: 42,
compactionCount: 1,
};
@@ -148,7 +149,10 @@ describe("resetReplyRunSession", () => {
expect(activeSessionEntry?.sessionId).toBe("00000000-0000-0000-0000-000000000123");
expect(activeSessionEntry?.totalTokens).toBeUndefined();
expect(activeSessionEntry?.compactionCount).toBe(1);
expect(activeSessionEntry?.transcriptLocator).toBeUndefined();
expect(followupRun.run.sessionId).toBe(activeSessionEntry?.sessionId);
expect(readTestSessionRow("main")?.sessionId).toBe(activeSessionEntry?.sessionId);
const persisted = readTestSessionRow("main");
expect(persisted?.sessionId).toBe(activeSessionEntry?.sessionId);
expect(persisted?.transcriptLocator).toBeUndefined();
});
});

View File

@@ -44,7 +44,7 @@ export async function resetReplyRunSession(params: {
messageThreadId?: string;
followupRun: FollowupRun;
onActiveSessionEntry: (entry: SessionEntry) => void;
onNewSession: (newSessionId: string, nextSessionFile: string) => void;
onNewSession: (newSessionId: string, nextTranscriptLocator: string) => void;
}): Promise<boolean> {
if (!params.sessionKey) {
return false;
@@ -86,11 +86,10 @@ export async function resetReplyRunSession(params: {
fallbackNoticeActiveModel: undefined,
fallbackNoticeReason: undefined,
};
const nextSessionFile = createSqliteSessionTranscriptLocator({
const nextTranscriptLocator = createSqliteSessionTranscriptLocator({
sessionId: nextSessionId,
agentId,
});
nextEntry.sessionFile = nextSessionFile;
if (params.activeSessionStore) {
params.activeSessionStore[params.sessionKey] = nextEntry;
}
@@ -112,21 +111,20 @@ export async function resetReplyRunSession(params: {
await replayRecentUserAssistantMessages({
sourceAgentId: agentId,
sourceSessionId: prevEntry.sessionId,
sourceTranscript: prevEntry.sessionFile,
targetAgentId: agentId,
targetTranscript: nextSessionFile,
targetTranscript: nextTranscriptLocator,
newSessionId: nextSessionId,
});
params.followupRun.run.sessionId = nextSessionId;
params.followupRun.run.sessionFile = nextSessionFile;
params.followupRun.run.transcriptLocator = nextTranscriptLocator;
deps.refreshQueuedFollowupSession({
key: params.queueKey,
previousSessionId: prevEntry.sessionId,
nextSessionId,
nextSessionFile,
nextTranscriptLocator,
});
params.onActiveSessionEntry(nextEntry);
params.onNewSession(nextSessionId, nextSessionFile);
params.onNewSession(nextSessionId, nextTranscriptLocator);
deps.error(params.options.buildLogMessage(nextSessionId));
return true;
}

View File

@@ -30,8 +30,7 @@ async function createFixture() {
}
process.env.OPENCLAW_STATE_DIR = root;
const sessionKey = "agent:main:forum:direct:compaction";
const transcriptPath = path.join(root, "transcripts", "s1.jsonl");
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
const transcriptPath = createSqliteSessionTranscriptLocator({ agentId: "main", sessionId: "s1" });
replaceSqliteSessionTranscriptEvents({
agentId: "main",
sessionId: "s1",
@@ -40,7 +39,6 @@ async function createFixture() {
});
const entry = {
sessionId: "s1",
sessionFile: transcriptPath,
updatedAt: Date.now(),
compactionCount: 0,
} as SessionEntry;
@@ -111,7 +109,7 @@ describe("session-updates lifecycle hooks", () => {
sessionKey,
reason: "compaction",
});
expect(endEvent?.sessionFile).toBe(
expect(endEvent?.transcriptLocator).toBe(
createSqliteSessionTranscriptLocator({ agentId: "main", sessionId: "s1" }),
);
expect(endContext).toMatchObject({
@@ -132,7 +130,7 @@ describe("session-updates lifecycle hooks", () => {
});
});
it("keeps SQLite transcript locators virtual when compaction rotates topic sessions", async () => {
it("keeps topic compaction identity out of active session rows", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-updates-sqlite-"));
tempDirs.push(root);
if (!previousStateDirCaptured) {
@@ -141,7 +139,7 @@ describe("session-updates lifecycle hooks", () => {
}
process.env.OPENCLAW_STATE_DIR = root;
const sessionKey = "agent:main:forum:direct:compaction:topic:456";
const sessionFile = createSqliteSessionTranscriptLocator({
const transcriptLocator = createSqliteSessionTranscriptLocator({
agentId: "main",
sessionId: "s1",
topicId: 456,
@@ -149,12 +147,11 @@ describe("session-updates lifecycle hooks", () => {
replaceSqliteSessionTranscriptEvents({
agentId: "main",
sessionId: "s1",
transcriptPath: sessionFile,
transcriptPath: transcriptLocator,
events: [{ type: "message" }],
});
const entry = {
sessionId: "s1",
sessionFile,
updatedAt: Date.now(),
compactionCount: 0,
} as SessionEntry;
@@ -172,15 +169,9 @@ describe("session-updates lifecycle hooks", () => {
newSessionId: "s2",
});
const expectedNextFile = createSqliteSessionTranscriptLocator({
agentId: "main",
sessionId: "s2",
topicId: 456,
});
expect(sessionStore[sessionKey]?.sessionFile).toBe(expectedNextFile);
expect(sessionStore[sessionKey]?.sessionFile).toContain("sqlite-transcript://");
expect(sessionStore[sessionKey]?.sessionFile).not.toMatch(/^sqlite-transcript:\/[^/]/u);
expect(sessionStore[sessionKey]?.sessionId).toBe("s2");
expect(sessionStore[sessionKey]?.transcriptLocator).toBeUndefined();
const [endEvent] = hookRunnerMocks.runSessionEnd.mock.calls[0] ?? [];
expect(endEvent?.sessionFile).toBe(sessionFile);
expect(endEvent?.transcriptLocator).toBe(transcriptLocator);
});
});

View File

@@ -15,6 +15,7 @@ import {
type SessionEntry,
upsertSessionEntry,
} from "../../config/sessions.js";
import { parseSessionThreadInfoFast } from "../../config/sessions/thread-info.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { resolveStableSessionEndTranscript } from "../../gateway/session-transcript-paths.js";
import { logVerbose } from "../../globals.js";
@@ -69,6 +70,7 @@ function emitCompactionSessionLifecycleHooks(params: {
const transcript = resolveStableSessionEndTranscript({
sessionId: params.previousEntry.sessionId,
agentId: resolveAgentIdFromSessionKey(params.sessionKey),
topicId: resolveCompactionTopicId(params.sessionKey),
});
const payload = buildSessionEndHookPayload({
sessionId: params.previousEntry.sessionId,
@@ -96,6 +98,15 @@ function emitCompactionSessionLifecycleHooks(params: {
}
}
function resolveCompactionTopicId(sessionKey: string): string | undefined {
const parsedThreadId = parseSessionThreadInfoFast(sessionKey).threadId;
if (parsedThreadId) {
return parsedThreadId;
}
const match = /(?:^|:)topic:([^:]+)/u.exec(sessionKey);
return match?.[1];
}
function resolvePositiveTokenCount(value: number | undefined): number | undefined {
return typeof value === "number" && Number.isFinite(value) && value > 0
? Math.floor(value)
@@ -274,8 +285,9 @@ export async function incrementCompactionCount(params: {
updates.cacheRead = undefined;
updates.cacheWrite = undefined;
}
const { transcriptLocator: _derivedTranscriptLocator, ...entryWithoutLocator } = entry;
sessionStore[sessionKey] = {
...entry,
...entryWithoutLocator,
...updates,
};
const agentId =

View File

@@ -10,8 +10,9 @@ function normalizeTranscriptLocator(value: string | undefined): string | undefin
export function resolveSessionTranscriptCandidates(
sessionId: string,
sessionFile?: string,
transcriptLocator?: string,
agentId?: string,
topicId?: string | number,
): string[] {
const candidates: string[] = [];
const pushCandidate = (resolve: () => string): void => {
@@ -22,13 +23,13 @@ export function resolveSessionTranscriptCandidates(
}
};
const normalizedSessionFile = normalizeTranscriptLocator(sessionFile);
if (normalizedSessionFile) {
candidates.push(normalizedSessionFile);
const normalizedTranscriptLocator = normalizeTranscriptLocator(transcriptLocator);
if (normalizedTranscriptLocator) {
candidates.push(normalizedTranscriptLocator);
}
if (agentId) {
pushCandidate(() => createSqliteSessionTranscriptLocator({ sessionId, agentId }));
pushCandidate(() => createSqliteSessionTranscriptLocator({ sessionId, agentId, topicId }));
}
return Array.from(new Set(candidates));
@@ -36,18 +37,20 @@ export function resolveSessionTranscriptCandidates(
export function resolveStableSessionEndTranscript(params: {
sessionId: string;
sessionFile?: string;
transcriptLocator?: string;
agentId?: string;
}): { sessionFile?: string } {
const stableLocator = normalizeTranscriptLocator(params.sessionFile);
topicId?: string | number;
}): { transcriptLocator?: string } {
const stableLocator = normalizeTranscriptLocator(params.transcriptLocator);
if (stableLocator) {
return { sessionFile: stableLocator };
return { transcriptLocator: stableLocator };
}
const [candidate] = resolveSessionTranscriptCandidates(
params.sessionId,
params.sessionFile,
params.transcriptLocator,
params.agentId,
params.topicId,
);
return candidate ? { sessionFile: candidate } : {};
return candidate ? { transcriptLocator: candidate } : {};
}

View File

@@ -7,7 +7,7 @@ export {
export { openOpenClawStateDatabase } from "../state/openclaw-state-db.js";
export { resolveSessionRowEntry } from "../config/sessions/store-entry.js";
export { createSqliteSessionTranscriptLocator } from "../config/sessions/paths.js";
export { resolveAndPersistSessionTranscriptLocator } from "../config/sessions/session-locator.js";
export { resolveAndPersistSessionTranscriptIdentity } from "../config/sessions/session-locator.js";
export { resolveSessionKey } from "../config/sessions/session-key.js";
export { resolveGroupSessionKey } from "../config/sessions/group.js";
export { canonicalizeMainSessionAlias } from "../config/sessions/main-session.js";