From b443729f37500c16043dfaf01717ca74cf07fcbc Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 8 May 2026 18:01:22 +0100 Subject: [PATCH] refactor: resolve session keys by transcript locator --- src/gateway/server-session-events.ts | 4 +-- src/gateway/session-message-events.test.ts | 22 +++++++++------- src/gateway/session-transcript-key.test.ts | 30 ++++++++++++---------- src/gateway/session-transcript-key.ts | 28 ++++++++++---------- 4 files changed, 45 insertions(+), 39 deletions(-) diff --git a/src/gateway/server-session-events.ts b/src/gateway/server-session-events.ts index 5bf31a9a872..d7316a8ec31 100644 --- a/src/gateway/server-session-events.ts +++ b/src/gateway/server-session-events.ts @@ -7,7 +7,7 @@ import type { SessionEventSubscriberRegistry, SessionMessageSubscriberRegistry, } from "./server-chat.js"; -import { resolveSessionKeyForTranscriptFile } from "./session-transcript-key.js"; +import { resolveSessionKeyForTranscriptLocator } from "./session-transcript-key.js"; import { attachOpenClawTranscriptMeta, loadGatewaySessionRow, @@ -105,7 +105,7 @@ async function handleTranscriptUpdateBroadcast( }, update: SessionTranscriptUpdate, ): Promise { - const sessionKey = update.sessionKey ?? resolveSessionKeyForTranscriptFile(update.sessionFile); + const sessionKey = update.sessionKey ?? resolveSessionKeyForTranscriptLocator(update.sessionFile); if (!sessionKey || update.message === undefined) { return; } diff --git a/src/gateway/session-message-events.test.ts b/src/gateway/session-message-events.test.ts index 9a5713e1c4f..a8f8a92f399 100644 --- a/src/gateway/session-message-events.test.ts +++ b/src/gateway/session-message-events.test.ts @@ -2,6 +2,7 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterAll, afterEach, beforeAll, describe, expect, test, vi } from "vitest"; +import { createSqliteSessionTranscriptLocator } from "../config/sessions/paths.js"; import { exportSqliteSessionTranscriptJsonl, replaceSqliteSessionTranscriptEvents, @@ -12,7 +13,7 @@ import * as transcriptEvents from "../sessions/transcript-events.js"; import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js"; import { closeOpenClawAgentDatabasesForTest } from "../state/openclaw-agent-db.js"; import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js"; -import { resolveSessionKeyForTranscriptFile } from "./session-transcript-key.js"; +import { resolveSessionKeyForTranscriptLocator } from "./session-transcript-key.js"; import { connectOk, createGatewaySuiteHarness, @@ -712,26 +713,29 @@ describe("session.message websocket events", () => { } }); - test("routes transcript-only updates to the freshest session owner when different sessionIds share a transcript path", async () => { - const transcriptDir = await createTranscriptFixtureDir(); - const transcriptPath = path.join(transcriptDir, "shared.jsonl"); + test("routes transcript-only updates to the freshest session owner when different sessionIds share a transcript locator", async () => { + await createTranscriptFixtureDir(); + const transcriptLocator = createSqliteSessionTranscriptLocator({ + agentId: "main", + sessionId: "shared", + }); await seedGatewaySessionEntries({ entries: { older: { sessionId: "sess-old", - sessionFile: transcriptPath, + sessionFile: transcriptLocator, updatedAt: Date.now(), }, newer: { sessionId: "sess-new", - sessionFile: transcriptPath, + sessionFile: transcriptLocator, updatedAt: Date.now() + 10, }, }, }); replaceTranscriptEvents({ sessionId: "sess-new", - transcriptPath, + transcriptPath: transcriptLocator, events: [ { type: "session", version: 1, id: "sess-new" }, { @@ -744,13 +748,13 @@ describe("session.message websocket events", () => { }, ], }); - expect(resolveSessionKeyForTranscriptFile(transcriptPath)).toBe("agent:main:newer"); + expect(resolveSessionKeyForTranscriptLocator(transcriptLocator)).toBe("agent:main:newer"); await withOperatorSessionSubscriber(async (ws) => { const messageEventPromise = waitForSessionMessageEvent(ws, "agent:main:newer"); emitSessionTranscriptUpdate({ - sessionFile: transcriptPath, + sessionFile: transcriptLocator, message: { role: "assistant", content: [{ type: "text", text: "shared transcript update" }], diff --git a/src/gateway/session-transcript-key.test.ts b/src/gateway/session-transcript-key.test.ts index 136c914385b..3962e483aa3 100644 --- a/src/gateway/session-transcript-key.test.ts +++ b/src/gateway/session-transcript-key.test.ts @@ -26,10 +26,10 @@ vi.mock("./session-utils.js", () => ({ import { clearSessionTranscriptKeyCacheForTests, - resolveSessionKeyForTranscriptFile, + resolveSessionKeyForTranscriptLocator, } from "./session-transcript-key.js"; -describe("resolveSessionKeyForTranscriptFile", () => { +describe("resolveSessionKeyForTranscriptLocator", () => { const now = 1_700_000_000_000; const locator = (sessionId: string, agentId = "main") => createSqliteSessionTranscriptLocator({ agentId, sessionId }); @@ -67,10 +67,10 @@ describe("resolveSessionKeyForTranscriptFile", () => { return []; }); - expect(resolveSessionKeyForTranscriptFile(locator("sess-2"))).toBe("agent:main:two"); + expect(resolveSessionKeyForTranscriptLocator(locator("sess-2"))).toBe("agent:main:two"); expect(resolveSessionTranscriptCandidatesMock).toHaveBeenCalledTimes(2); - expect(resolveSessionKeyForTranscriptFile(locator("sess-2"))).toBe("agent:main:two"); + expect(resolveSessionKeyForTranscriptLocator(locator("sess-2"))).toBe("agent:main:two"); expect(resolveSessionTranscriptCandidatesMock).toHaveBeenCalledTimes(3); }); @@ -98,7 +98,7 @@ describe("resolveSessionKeyForTranscriptFile", () => { }, ); - expect(resolveSessionKeyForTranscriptFile(locator("shared"))).toBe("agent:main:beta"); + expect(resolveSessionKeyForTranscriptLocator(locator("shared"))).toBe("agent:main:beta"); store = { "agent:main:alpha": { sessionId: "sess-alpha-2", updatedAt: now + 1 }, @@ -109,11 +109,11 @@ describe("resolveSessionKeyForTranscriptFile", () => { }, }; - expect(resolveSessionKeyForTranscriptFile(locator("shared"))).toBe("agent:main:alpha"); + expect(resolveSessionKeyForTranscriptLocator(locator("shared"))).toBe("agent:main:alpha"); }); - it("returns undefined for blank transcript paths", () => { - expect(resolveSessionKeyForTranscriptFile(" ")).toBeUndefined(); + it("returns undefined for blank transcript locators", () => { + expect(resolveSessionKeyForTranscriptLocator(" ")).toBeUndefined(); expect(loadCombinedSessionEntriesForGatewayMock).not.toHaveBeenCalled(); }); @@ -128,10 +128,12 @@ describe("resolveSessionKeyForTranscriptFile", () => { }); resolveSessionTranscriptCandidatesMock.mockReturnValue([locator("run-dup")]); - expect(resolveSessionKeyForTranscriptFile(locator("run-dup"))).toBe("agent:main:acp:run-dup"); + expect(resolveSessionKeyForTranscriptLocator(locator("run-dup"))).toBe( + "agent:main:acp:run-dup", + ); }); - it("prefers the freshest matching session when different sessionIds share a transcript path", () => { + it("prefers the freshest matching session when different sessionIds share a transcript locator", () => { const store = { "agent:main:older": { sessionId: "sess-old", updatedAt: now }, "agent:main:newer": { sessionId: "sess-new", updatedAt: now + 10 }, @@ -142,7 +144,7 @@ describe("resolveSessionKeyForTranscriptFile", () => { }); resolveSessionTranscriptCandidatesMock.mockReturnValue([locator("shared")]); - expect(resolveSessionKeyForTranscriptFile(locator("shared"))).toBe("agent:main:newer"); + expect(resolveSessionKeyForTranscriptLocator(locator("shared"))).toBe("agent:main:newer"); }); it("evicts oldest entry when cache exceeds 256 entries (#63643)", () => { @@ -158,7 +160,7 @@ describe("resolveSessionKeyForTranscriptFile", () => { entries: store, }); resolveSessionTranscriptCandidatesMock.mockReturnValue([transcriptPath]); - resolveSessionKeyForTranscriptFile(transcriptPath); + resolveSessionKeyForTranscriptLocator(transcriptPath); } // Now add the 257th — should evict session-0 @@ -172,7 +174,7 @@ describe("resolveSessionKeyForTranscriptFile", () => { entries: overflowStore, }); resolveSessionTranscriptCandidatesMock.mockReturnValue([overflowPath]); - expect(resolveSessionKeyForTranscriptFile(overflowPath)).toBe(overflowKey); + expect(resolveSessionKeyForTranscriptLocator(overflowPath)).toBe(overflowKey); // session-0 should have been evicted from cache — next lookup will // re-resolve from the store (returns undefined since store was mocked @@ -182,6 +184,6 @@ describe("resolveSessionKeyForTranscriptFile", () => { entries: overflowStore, }); resolveSessionTranscriptCandidatesMock.mockReturnValue([]); - expect(resolveSessionKeyForTranscriptFile(locator("session-0"))).toBeUndefined(); + expect(resolveSessionKeyForTranscriptLocator(locator("session-0"))).toBeUndefined(); }); }); diff --git a/src/gateway/session-transcript-key.ts b/src/gateway/session-transcript-key.ts index 40d061952f3..00c1f0c96ac 100644 --- a/src/gateway/session-transcript-key.ts +++ b/src/gateway/session-transcript-key.ts @@ -21,11 +21,11 @@ function resolveTranscriptIdentityForComparison(value: string | undefined): stri return trimmed; } -function sessionKeyMatchesTranscriptPath(params: { +function sessionKeyMatchesTranscriptLocator(params: { cfg: OpenClawConfig; store: Record; key: string; - targetPath: string; + targetLocator: string; }): boolean { const entry = params.store[params.key]; if (!entry?.sessionId) { @@ -42,29 +42,29 @@ function sessionKeyMatchesTranscriptPath(params: { entry.sessionId, entry.sessionFile, sessionAgentId, - ).some((candidate) => resolveTranscriptIdentityForComparison(candidate) === params.targetPath); + ).some((candidate) => resolveTranscriptIdentityForComparison(candidate) === params.targetLocator); } export function clearSessionTranscriptKeyCacheForTests(): void { TRANSCRIPT_SESSION_KEY_CACHE.clear(); } -export function resolveSessionKeyForTranscriptFile(sessionFile: string): string | undefined { - const targetPath = resolveTranscriptIdentityForComparison(sessionFile); - if (!targetPath) { +export function resolveSessionKeyForTranscriptLocator(locator: string): string | undefined { + const targetLocator = resolveTranscriptIdentityForComparison(locator); + if (!targetLocator) { return undefined; } const cfg = getRuntimeConfig(); const { entries: store } = loadCombinedSessionEntriesForGateway(cfg); - const cachedKey = TRANSCRIPT_SESSION_KEY_CACHE.get(targetPath); + const cachedKey = TRANSCRIPT_SESSION_KEY_CACHE.get(targetLocator); if ( cachedKey && - sessionKeyMatchesTranscriptPath({ + sessionKeyMatchesTranscriptLocator({ cfg, store, key: cachedKey, - targetPath, + targetLocator, }) ) { return cachedKey; @@ -76,11 +76,11 @@ export function resolveSessionKeyForTranscriptFile(sessionFile: string): string continue; } if ( - sessionKeyMatchesTranscriptPath({ + sessionKeyMatchesTranscriptLocator({ cfg, store, key, - targetPath, + targetLocator, }) ) { matchingEntries.push([key, entry]); @@ -131,7 +131,7 @@ export function resolveSessionKeyForTranscriptFile(sessionFile: string): string if (resolvedKey) { // Evict oldest-inserted entry when cache exceeds size cap (FIFO bound). if ( - !TRANSCRIPT_SESSION_KEY_CACHE.has(targetPath) && + !TRANSCRIPT_SESSION_KEY_CACHE.has(targetLocator) && TRANSCRIPT_SESSION_KEY_CACHE.size >= TRANSCRIPT_SESSION_KEY_CACHE_MAX ) { const oldest = TRANSCRIPT_SESSION_KEY_CACHE.keys().next().value; @@ -139,11 +139,11 @@ export function resolveSessionKeyForTranscriptFile(sessionFile: string): string TRANSCRIPT_SESSION_KEY_CACHE.delete(oldest); } } - TRANSCRIPT_SESSION_KEY_CACHE.set(targetPath, resolvedKey); + TRANSCRIPT_SESSION_KEY_CACHE.set(targetLocator, resolvedKey); return resolvedKey; } } - TRANSCRIPT_SESSION_KEY_CACHE.delete(targetPath); + TRANSCRIPT_SESSION_KEY_CACHE.delete(targetLocator); return undefined; }