test(gateway): share transcript event waiters

This commit is contained in:
Vincent Koc
2026-04-12 18:33:47 +01:00
parent 9c2b094f3f
commit 9259e593e6

View File

@@ -78,6 +78,45 @@ function waitForSessionMessageEvent(
);
}
function waitForSessionsChangedMessagePhase(
ws: Awaited<ReturnType<Awaited<ReturnType<typeof createGatewaySuiteHarness>>["openWs"]>>,
sessionKey: string,
) {
return onceMessage(
ws,
(message) =>
message.type === "event" &&
message.event === "sessions.changed" &&
(message.payload as { phase?: string; sessionKey?: string } | undefined)?.phase ===
"message" &&
(message.payload as { sessionKey?: string } | undefined)?.sessionKey === sessionKey,
);
}
async function emitTranscriptUpdateAndCollectEvents(params: {
ws: Awaited<ReturnType<Awaited<ReturnType<typeof createGatewaySuiteHarness>>["openWs"]>>;
sessionKey: string;
sessionFile: string;
message: Record<string, unknown>;
messageId: string;
}) {
const messageEventPromise = waitForSessionMessageEvent(params.ws, params.sessionKey);
const changedEventPromise = waitForSessionsChangedMessagePhase(params.ws, params.sessionKey);
emitSessionTranscriptUpdate({
sessionFile: params.sessionFile,
sessionKey: params.sessionKey,
message: params.message,
messageId: params.messageId,
});
const [messageEvent, changedEvent] = await Promise.all([
messageEventPromise,
changedEventPromise,
]);
return { messageEvent, changedEvent };
}
async function expectNoMessageWithin(params: {
action?: () => Promise<void> | void;
watch: () => Promise<unknown>;
@@ -289,29 +328,13 @@ describe("session.message websocket events", () => {
);
await withOperatorSessionSubscriber(harness, async (ws) => {
const messageEventPromise = waitForSessionMessageEvent(ws, "agent:main:main");
const changedEventPromise = onceMessage(
const { messageEvent, changedEvent } = await emitTranscriptUpdateAndCollectEvents({
ws,
(message) =>
message.type === "event" &&
message.event === "sessions.changed" &&
(message.payload as { phase?: string; sessionKey?: string } | undefined)?.phase ===
"message" &&
(message.payload as { sessionKey?: string } | undefined)?.sessionKey ===
"agent:main:main",
);
emitSessionTranscriptUpdate({
sessionFile: transcriptPath,
sessionKey: "agent:main:main",
sessionFile: transcriptPath,
message: transcriptMessage,
messageId: "msg-usage",
});
const [messageEvent, changedEvent] = await Promise.all([
messageEventPromise,
changedEventPromise,
]);
expect(messageEvent.payload).toMatchObject({
sessionKey: "agent:main:main",
messageId: "msg-usage",
@@ -465,29 +488,13 @@ describe("session.message websocket events", () => {
);
await withOperatorSessionSubscriber(harness, async (ws) => {
const messageEventPromise = waitForSessionMessageEvent(ws, "agent:main:main");
const changedEventPromise = onceMessage(
const { messageEvent, changedEvent } = await emitTranscriptUpdateAndCollectEvents({
ws,
(message) =>
message.type === "event" &&
message.event === "sessions.changed" &&
(message.payload as { phase?: string; sessionKey?: string } | undefined)?.phase ===
"message" &&
(message.payload as { sessionKey?: string } | undefined)?.sessionKey ===
"agent:main:main",
);
emitSessionTranscriptUpdate({
sessionFile: transcriptPath,
sessionKey: "agent:main:main",
sessionFile: transcriptPath,
message: transcriptMessage,
messageId: "msg-thread",
});
const [messageEvent, changedEvent] = await Promise.all([
messageEventPromise,
changedEventPromise,
]);
expect(messageEvent.payload).toMatchObject({
sessionKey: "agent:main:main",
lastChannel: "telegram",