mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-23 06:41:44 +00:00
fix: prefer freshest transcript session owners
This commit is contained in:
@@ -386,4 +386,78 @@ describe("session.message websocket events", () => {
|
||||
await harness.close();
|
||||
}
|
||||
});
|
||||
|
||||
test("routes transcript-only updates to the freshest session owner when different sessionIds share a transcript path", async () => {
|
||||
const storePath = await createSessionStoreFile();
|
||||
const transcriptPath = path.join(path.dirname(storePath), "shared.jsonl");
|
||||
await writeSessionStore({
|
||||
entries: {
|
||||
older: {
|
||||
sessionId: "sess-old",
|
||||
sessionFile: transcriptPath,
|
||||
updatedAt: Date.now(),
|
||||
},
|
||||
newer: {
|
||||
sessionId: "sess-new",
|
||||
sessionFile: transcriptPath,
|
||||
updatedAt: Date.now() + 10,
|
||||
},
|
||||
},
|
||||
storePath,
|
||||
});
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
[
|
||||
JSON.stringify({ type: "session", version: 1, id: "sess-new" }),
|
||||
JSON.stringify({
|
||||
id: "msg-shared",
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "shared transcript update" }],
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
}),
|
||||
].join("\n"),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const harness = await createGatewaySuiteHarness();
|
||||
try {
|
||||
const ws = await harness.openWs();
|
||||
try {
|
||||
await connectOk(ws, { scopes: ["operator.read"] });
|
||||
await rpcReq(ws, "sessions.subscribe");
|
||||
|
||||
const messageEventPromise = onceMessage(
|
||||
ws,
|
||||
(message) =>
|
||||
message.type === "event" &&
|
||||
message.event === "session.message" &&
|
||||
(message.payload as { sessionKey?: string } | undefined)?.sessionKey ===
|
||||
"agent:main:newer",
|
||||
);
|
||||
|
||||
emitSessionTranscriptUpdate({
|
||||
sessionFile: transcriptPath,
|
||||
message: {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "shared transcript update" }],
|
||||
timestamp: Date.now(),
|
||||
},
|
||||
messageId: "msg-shared",
|
||||
});
|
||||
|
||||
const messageEvent = await messageEventPromise;
|
||||
expect(messageEvent.payload).toMatchObject({
|
||||
sessionKey: "agent:main:newer",
|
||||
messageId: "msg-shared",
|
||||
messageSeq: 1,
|
||||
});
|
||||
} finally {
|
||||
ws.close();
|
||||
}
|
||||
} finally {
|
||||
await harness.close();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -127,4 +127,18 @@ describe("resolveSessionKeyForTranscriptFile", () => {
|
||||
|
||||
expect(resolveSessionKeyForTranscriptFile("/tmp/shared.jsonl")).toBe("agent:main:acp:run-dup");
|
||||
});
|
||||
|
||||
it("prefers the freshest matching session when different sessionIds share a transcript path", () => {
|
||||
const store = {
|
||||
"agent:main:older": { sessionId: "sess-old", updatedAt: now },
|
||||
"agent:main:newer": { sessionId: "sess-new", updatedAt: now + 10 },
|
||||
} satisfies Record<string, SessionEntry>;
|
||||
loadCombinedSessionStoreForGatewayMock.mockReturnValue({
|
||||
storePath: "(multiple)",
|
||||
store,
|
||||
});
|
||||
resolveSessionTranscriptCandidatesMock.mockReturnValue(["/tmp/shared.jsonl"]);
|
||||
|
||||
expect(resolveSessionKeyForTranscriptFile("/tmp/shared.jsonl")).toBe("agent:main:newer");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -93,13 +93,46 @@ export function resolveSessionKeyForTranscriptFile(sessionFile: string): string
|
||||
}
|
||||
|
||||
if (matchingEntries.length > 0) {
|
||||
const firstSessionId = matchingEntries[0]?.[1].sessionId;
|
||||
const sameSessionMatches = matchingEntries.filter(
|
||||
(entry): entry is [string, SessionEntry] => entry[1].sessionId === firstSessionId,
|
||||
const matchesBySessionId = new Map<string, Array<[string, SessionEntry]>>();
|
||||
for (const entry of matchingEntries) {
|
||||
const sessionId = entry[1].sessionId;
|
||||
if (!sessionId) {
|
||||
continue;
|
||||
}
|
||||
const group = matchesBySessionId.get(sessionId);
|
||||
if (group) {
|
||||
group.push(entry);
|
||||
} else {
|
||||
matchesBySessionId.set(sessionId, [entry]);
|
||||
}
|
||||
}
|
||||
|
||||
const resolvedMatches = Array.from(matchesBySessionId.entries())
|
||||
.map(([sessionId, matches]) => {
|
||||
const resolvedKey =
|
||||
resolvePreferredSessionKeyForSessionIdMatches(matches, sessionId) ?? matches[0]?.[0];
|
||||
const resolvedEntry = resolvedKey
|
||||
? matches.find(([key]) => key === resolvedKey)?.[1]
|
||||
: undefined;
|
||||
return resolvedKey && resolvedEntry
|
||||
? {
|
||||
key: resolvedKey,
|
||||
updatedAt: resolvedEntry.updatedAt ?? 0,
|
||||
}
|
||||
: undefined;
|
||||
})
|
||||
.filter((match): match is { key: string; updatedAt: number } => match !== undefined);
|
||||
|
||||
const sortedResolvedMatches = [...resolvedMatches].toSorted(
|
||||
(a, b) => b.updatedAt - a.updatedAt,
|
||||
);
|
||||
const [freshestMatch, secondFreshestMatch] = sortedResolvedMatches;
|
||||
const resolvedKey =
|
||||
resolvePreferredSessionKeyForSessionIdMatches(sameSessionMatches, firstSessionId) ??
|
||||
matchingEntries[0]?.[0];
|
||||
resolvedMatches.length === 1
|
||||
? freshestMatch?.key
|
||||
: (freshestMatch?.updatedAt ?? 0) > (secondFreshestMatch?.updatedAt ?? 0)
|
||||
? freshestMatch?.key
|
||||
: undefined;
|
||||
if (resolvedKey) {
|
||||
TRANSCRIPT_SESSION_KEY_CACHE.set(targetPath, resolvedKey);
|
||||
return resolvedKey;
|
||||
|
||||
Reference in New Issue
Block a user