mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-19 12:32:59 +00:00
refactor: resolve session keys by transcript locator
This commit is contained in:
@@ -7,7 +7,7 @@ import type {
|
||||
SessionEventSubscriberRegistry,
|
||||
SessionMessageSubscriberRegistry,
|
||||
} from "./server-chat.js";
|
||||
import { resolveSessionKeyForTranscriptFile } from "./session-transcript-key.js";
|
||||
import { resolveSessionKeyForTranscriptLocator } from "./session-transcript-key.js";
|
||||
import {
|
||||
attachOpenClawTranscriptMeta,
|
||||
loadGatewaySessionRow,
|
||||
@@ -105,7 +105,7 @@ async function handleTranscriptUpdateBroadcast(
|
||||
},
|
||||
update: SessionTranscriptUpdate,
|
||||
): Promise<void> {
|
||||
const sessionKey = update.sessionKey ?? resolveSessionKeyForTranscriptFile(update.sessionFile);
|
||||
const sessionKey = update.sessionKey ?? resolveSessionKeyForTranscriptLocator(update.sessionFile);
|
||||
if (!sessionKey || update.message === undefined) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterAll, afterEach, beforeAll, describe, expect, test, vi } from "vitest";
|
||||
import { createSqliteSessionTranscriptLocator } from "../config/sessions/paths.js";
|
||||
import {
|
||||
exportSqliteSessionTranscriptJsonl,
|
||||
replaceSqliteSessionTranscriptEvents,
|
||||
@@ -12,7 +13,7 @@ import * as transcriptEvents from "../sessions/transcript-events.js";
|
||||
import { emitSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
import { closeOpenClawAgentDatabasesForTest } from "../state/openclaw-agent-db.js";
|
||||
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
|
||||
import { resolveSessionKeyForTranscriptFile } from "./session-transcript-key.js";
|
||||
import { resolveSessionKeyForTranscriptLocator } from "./session-transcript-key.js";
|
||||
import {
|
||||
connectOk,
|
||||
createGatewaySuiteHarness,
|
||||
@@ -712,26 +713,29 @@ describe("session.message websocket events", () => {
|
||||
}
|
||||
});
|
||||
|
||||
test("routes transcript-only updates to the freshest session owner when different sessionIds share a transcript path", async () => {
|
||||
const transcriptDir = await createTranscriptFixtureDir();
|
||||
const transcriptPath = path.join(transcriptDir, "shared.jsonl");
|
||||
test("routes transcript-only updates to the freshest session owner when different sessionIds share a transcript locator", async () => {
|
||||
await createTranscriptFixtureDir();
|
||||
const transcriptLocator = createSqliteSessionTranscriptLocator({
|
||||
agentId: "main",
|
||||
sessionId: "shared",
|
||||
});
|
||||
await seedGatewaySessionEntries({
|
||||
entries: {
|
||||
older: {
|
||||
sessionId: "sess-old",
|
||||
sessionFile: transcriptPath,
|
||||
sessionFile: transcriptLocator,
|
||||
updatedAt: Date.now(),
|
||||
},
|
||||
newer: {
|
||||
sessionId: "sess-new",
|
||||
sessionFile: transcriptPath,
|
||||
sessionFile: transcriptLocator,
|
||||
updatedAt: Date.now() + 10,
|
||||
},
|
||||
},
|
||||
});
|
||||
replaceTranscriptEvents({
|
||||
sessionId: "sess-new",
|
||||
transcriptPath,
|
||||
transcriptPath: transcriptLocator,
|
||||
events: [
|
||||
{ type: "session", version: 1, id: "sess-new" },
|
||||
{
|
||||
@@ -744,13 +748,13 @@ describe("session.message websocket events", () => {
|
||||
},
|
||||
],
|
||||
});
|
||||
expect(resolveSessionKeyForTranscriptFile(transcriptPath)).toBe("agent:main:newer");
|
||||
expect(resolveSessionKeyForTranscriptLocator(transcriptLocator)).toBe("agent:main:newer");
|
||||
|
||||
await withOperatorSessionSubscriber(async (ws) => {
|
||||
const messageEventPromise = waitForSessionMessageEvent(ws, "agent:main:newer");
|
||||
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile: transcriptPath,
|
||||
sessionFile: transcriptLocator,
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "shared transcript update" }],
|
||||
|
||||
@@ -26,10 +26,10 @@ vi.mock("./session-utils.js", () => ({
|
||||
|
||||
import {
|
||||
clearSessionTranscriptKeyCacheForTests,
|
||||
resolveSessionKeyForTranscriptFile,
|
||||
resolveSessionKeyForTranscriptLocator,
|
||||
} from "./session-transcript-key.js";
|
||||
|
||||
describe("resolveSessionKeyForTranscriptFile", () => {
|
||||
describe("resolveSessionKeyForTranscriptLocator", () => {
|
||||
const now = 1_700_000_000_000;
|
||||
const locator = (sessionId: string, agentId = "main") =>
|
||||
createSqliteSessionTranscriptLocator({ agentId, sessionId });
|
||||
@@ -67,10 +67,10 @@ describe("resolveSessionKeyForTranscriptFile", () => {
|
||||
return [];
|
||||
});
|
||||
|
||||
expect(resolveSessionKeyForTranscriptFile(locator("sess-2"))).toBe("agent:main:two");
|
||||
expect(resolveSessionKeyForTranscriptLocator(locator("sess-2"))).toBe("agent:main:two");
|
||||
expect(resolveSessionTranscriptCandidatesMock).toHaveBeenCalledTimes(2);
|
||||
|
||||
expect(resolveSessionKeyForTranscriptFile(locator("sess-2"))).toBe("agent:main:two");
|
||||
expect(resolveSessionKeyForTranscriptLocator(locator("sess-2"))).toBe("agent:main:two");
|
||||
expect(resolveSessionTranscriptCandidatesMock).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
|
||||
@@ -98,7 +98,7 @@ describe("resolveSessionKeyForTranscriptFile", () => {
|
||||
},
|
||||
);
|
||||
|
||||
expect(resolveSessionKeyForTranscriptFile(locator("shared"))).toBe("agent:main:beta");
|
||||
expect(resolveSessionKeyForTranscriptLocator(locator("shared"))).toBe("agent:main:beta");
|
||||
|
||||
store = {
|
||||
"agent:main:alpha": { sessionId: "sess-alpha-2", updatedAt: now + 1 },
|
||||
@@ -109,11 +109,11 @@ describe("resolveSessionKeyForTranscriptFile", () => {
|
||||
},
|
||||
};
|
||||
|
||||
expect(resolveSessionKeyForTranscriptFile(locator("shared"))).toBe("agent:main:alpha");
|
||||
expect(resolveSessionKeyForTranscriptLocator(locator("shared"))).toBe("agent:main:alpha");
|
||||
});
|
||||
|
||||
it("returns undefined for blank transcript paths", () => {
|
||||
expect(resolveSessionKeyForTranscriptFile(" ")).toBeUndefined();
|
||||
it("returns undefined for blank transcript locators", () => {
|
||||
expect(resolveSessionKeyForTranscriptLocator(" ")).toBeUndefined();
|
||||
expect(loadCombinedSessionEntriesForGatewayMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
@@ -128,10 +128,12 @@ describe("resolveSessionKeyForTranscriptFile", () => {
|
||||
});
|
||||
resolveSessionTranscriptCandidatesMock.mockReturnValue([locator("run-dup")]);
|
||||
|
||||
expect(resolveSessionKeyForTranscriptFile(locator("run-dup"))).toBe("agent:main:acp:run-dup");
|
||||
expect(resolveSessionKeyForTranscriptLocator(locator("run-dup"))).toBe(
|
||||
"agent:main:acp:run-dup",
|
||||
);
|
||||
});
|
||||
|
||||
it("prefers the freshest matching session when different sessionIds share a transcript path", () => {
|
||||
it("prefers the freshest matching session when different sessionIds share a transcript locator", () => {
|
||||
const store = {
|
||||
"agent:main:older": { sessionId: "sess-old", updatedAt: now },
|
||||
"agent:main:newer": { sessionId: "sess-new", updatedAt: now + 10 },
|
||||
@@ -142,7 +144,7 @@ describe("resolveSessionKeyForTranscriptFile", () => {
|
||||
});
|
||||
resolveSessionTranscriptCandidatesMock.mockReturnValue([locator("shared")]);
|
||||
|
||||
expect(resolveSessionKeyForTranscriptFile(locator("shared"))).toBe("agent:main:newer");
|
||||
expect(resolveSessionKeyForTranscriptLocator(locator("shared"))).toBe("agent:main:newer");
|
||||
});
|
||||
|
||||
it("evicts oldest entry when cache exceeds 256 entries (#63643)", () => {
|
||||
@@ -158,7 +160,7 @@ describe("resolveSessionKeyForTranscriptFile", () => {
|
||||
entries: store,
|
||||
});
|
||||
resolveSessionTranscriptCandidatesMock.mockReturnValue([transcriptPath]);
|
||||
resolveSessionKeyForTranscriptFile(transcriptPath);
|
||||
resolveSessionKeyForTranscriptLocator(transcriptPath);
|
||||
}
|
||||
|
||||
// Now add the 257th — should evict session-0
|
||||
@@ -172,7 +174,7 @@ describe("resolveSessionKeyForTranscriptFile", () => {
|
||||
entries: overflowStore,
|
||||
});
|
||||
resolveSessionTranscriptCandidatesMock.mockReturnValue([overflowPath]);
|
||||
expect(resolveSessionKeyForTranscriptFile(overflowPath)).toBe(overflowKey);
|
||||
expect(resolveSessionKeyForTranscriptLocator(overflowPath)).toBe(overflowKey);
|
||||
|
||||
// session-0 should have been evicted from cache — next lookup will
|
||||
// re-resolve from the store (returns undefined since store was mocked
|
||||
@@ -182,6 +184,6 @@ describe("resolveSessionKeyForTranscriptFile", () => {
|
||||
entries: overflowStore,
|
||||
});
|
||||
resolveSessionTranscriptCandidatesMock.mockReturnValue([]);
|
||||
expect(resolveSessionKeyForTranscriptFile(locator("session-0"))).toBeUndefined();
|
||||
expect(resolveSessionKeyForTranscriptLocator(locator("session-0"))).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -21,11 +21,11 @@ function resolveTranscriptIdentityForComparison(value: string | undefined): stri
|
||||
return trimmed;
|
||||
}
|
||||
|
||||
function sessionKeyMatchesTranscriptPath(params: {
|
||||
function sessionKeyMatchesTranscriptLocator(params: {
|
||||
cfg: OpenClawConfig;
|
||||
store: Record<string, SessionEntry>;
|
||||
key: string;
|
||||
targetPath: string;
|
||||
targetLocator: string;
|
||||
}): boolean {
|
||||
const entry = params.store[params.key];
|
||||
if (!entry?.sessionId) {
|
||||
@@ -42,29 +42,29 @@ function sessionKeyMatchesTranscriptPath(params: {
|
||||
entry.sessionId,
|
||||
entry.sessionFile,
|
||||
sessionAgentId,
|
||||
).some((candidate) => resolveTranscriptIdentityForComparison(candidate) === params.targetPath);
|
||||
).some((candidate) => resolveTranscriptIdentityForComparison(candidate) === params.targetLocator);
|
||||
}
|
||||
|
||||
export function clearSessionTranscriptKeyCacheForTests(): void {
|
||||
TRANSCRIPT_SESSION_KEY_CACHE.clear();
|
||||
}
|
||||
|
||||
export function resolveSessionKeyForTranscriptFile(sessionFile: string): string | undefined {
|
||||
const targetPath = resolveTranscriptIdentityForComparison(sessionFile);
|
||||
if (!targetPath) {
|
||||
export function resolveSessionKeyForTranscriptLocator(locator: string): string | undefined {
|
||||
const targetLocator = resolveTranscriptIdentityForComparison(locator);
|
||||
if (!targetLocator) {
|
||||
return undefined;
|
||||
}
|
||||
const cfg = getRuntimeConfig();
|
||||
const { entries: store } = loadCombinedSessionEntriesForGateway(cfg);
|
||||
|
||||
const cachedKey = TRANSCRIPT_SESSION_KEY_CACHE.get(targetPath);
|
||||
const cachedKey = TRANSCRIPT_SESSION_KEY_CACHE.get(targetLocator);
|
||||
if (
|
||||
cachedKey &&
|
||||
sessionKeyMatchesTranscriptPath({
|
||||
sessionKeyMatchesTranscriptLocator({
|
||||
cfg,
|
||||
store,
|
||||
key: cachedKey,
|
||||
targetPath,
|
||||
targetLocator,
|
||||
})
|
||||
) {
|
||||
return cachedKey;
|
||||
@@ -76,11 +76,11 @@ export function resolveSessionKeyForTranscriptFile(sessionFile: string): string
|
||||
continue;
|
||||
}
|
||||
if (
|
||||
sessionKeyMatchesTranscriptPath({
|
||||
sessionKeyMatchesTranscriptLocator({
|
||||
cfg,
|
||||
store,
|
||||
key,
|
||||
targetPath,
|
||||
targetLocator,
|
||||
})
|
||||
) {
|
||||
matchingEntries.push([key, entry]);
|
||||
@@ -131,7 +131,7 @@ export function resolveSessionKeyForTranscriptFile(sessionFile: string): string
|
||||
if (resolvedKey) {
|
||||
// Evict oldest-inserted entry when cache exceeds size cap (FIFO bound).
|
||||
if (
|
||||
!TRANSCRIPT_SESSION_KEY_CACHE.has(targetPath) &&
|
||||
!TRANSCRIPT_SESSION_KEY_CACHE.has(targetLocator) &&
|
||||
TRANSCRIPT_SESSION_KEY_CACHE.size >= TRANSCRIPT_SESSION_KEY_CACHE_MAX
|
||||
) {
|
||||
const oldest = TRANSCRIPT_SESSION_KEY_CACHE.keys().next().value;
|
||||
@@ -139,11 +139,11 @@ export function resolveSessionKeyForTranscriptFile(sessionFile: string): string
|
||||
TRANSCRIPT_SESSION_KEY_CACHE.delete(oldest);
|
||||
}
|
||||
}
|
||||
TRANSCRIPT_SESSION_KEY_CACHE.set(targetPath, resolvedKey);
|
||||
TRANSCRIPT_SESSION_KEY_CACHE.set(targetLocator, resolvedKey);
|
||||
return resolvedKey;
|
||||
}
|
||||
}
|
||||
|
||||
TRANSCRIPT_SESSION_KEY_CACHE.delete(targetPath);
|
||||
TRANSCRIPT_SESSION_KEY_CACHE.delete(targetLocator);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user