test: move streamed tool result ordering off runReplyAgent e2e

This commit is contained in:
Peter Steinberger
2026-04-09 05:44:03 +01:00
parent c9e969c1a6
commit 19cf9a5326
2 changed files with 93 additions and 87 deletions

View File

@@ -333,6 +333,99 @@ describe("runAgentTurnWithFallback", () => {
expect(onToolResult).toHaveBeenCalledWith({ text: "The user is saying hello" });
});
it("continues delivering later streamed tool results after an earlier delivery failure", async () => {
  // Collects the text of every tool result whose delivery callback did not throw.
  const succeeded: string[] = [];
  const onToolResult = vi.fn(async ({ text }: { text?: string }) => {
    // Only the "first" payload fails; any other payload is recorded as delivered.
    if (text !== "first") {
      succeeded.push(text ?? "");
      return;
    }
    throw new Error("simulated delivery failure");
  });

  // The mocked agent emits two tool results back to back without awaiting either one.
  state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
    for (const text of ["first", "second"]) {
      params.onToolResult?.({ text, mediaUrls: [] });
    }
    return { payloads: [{ text: "final" }], meta: {} };
  });

  const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
  const pendingToolTasks = new Set<Promise<void>>();
  const sessionCtx = { Provider: "whatsapp", MessageSid: "msg" } as unknown as TemplateContext;
  const opts = { onToolResult } satisfies GetReplyOptions;

  const result = await runAgentTurnWithFallback({
    commandBody: "hello",
    followupRun: createFollowupRun(),
    sessionCtx,
    opts,
    typingSignals: createMockTypingSignaler(),
    blockReplyPipeline: null,
    blockStreamingEnabled: false,
    resolvedBlockStreamingBreak: "message_end",
    applyReplyToMode: (payload) => payload,
    shouldEmitToolResult: () => true,
    shouldEmitToolOutput: () => false,
    pendingToolTasks,
    resetSessionAfterCompactionFailure: async () => false,
    resetSessionAfterRoleOrderingConflict: async () => false,
    isHeartbeat: false,
    sessionKey: "main",
    getActiveSessionEntry: () => undefined,
    resolvedVerboseLevel: "off",
  });

  // Let every task left in pendingToolTasks settle before inspecting the outcome.
  await Promise.all([...pendingToolTasks]);

  expect(result.kind).toBe("success");
  expect(onToolResult).toHaveBeenCalledTimes(2);
  // The failing "first" payload never reaches the list; "second" must still arrive.
  expect(succeeded).toEqual(["second"]);
});
it("delivers streamed tool results in callback order even when dispatch latency differs", async () => {
  // Payload texts in the order their delivery callbacks finished.
  const completed: string[] = [];
  const onToolResult = vi.fn(async ({ text }: { text?: string }) => {
    // "first" takes longer to deliver than any other payload.
    let waitMs = 1;
    if (text === "first") {
      waitMs = 5;
    }
    await new Promise((resolve) => setTimeout(resolve, waitMs));
    completed.push(text ?? "");
  });

  // The mocked agent emits two tool results back to back without awaiting either one.
  state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
    for (const text of ["first", "second"]) {
      params.onToolResult?.({ text, mediaUrls: [] });
    }
    return { payloads: [{ text: "final" }], meta: {} };
  });

  const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
  const pendingToolTasks = new Set<Promise<void>>();
  const sessionCtx = { Provider: "whatsapp", MessageSid: "msg" } as unknown as TemplateContext;
  const opts = { onToolResult } satisfies GetReplyOptions;

  const result = await runAgentTurnWithFallback({
    commandBody: "hello",
    followupRun: createFollowupRun(),
    sessionCtx,
    opts,
    typingSignals: createMockTypingSignaler(),
    blockReplyPipeline: null,
    blockStreamingEnabled: false,
    resolvedBlockStreamingBreak: "message_end",
    applyReplyToMode: (payload) => payload,
    shouldEmitToolResult: () => true,
    shouldEmitToolOutput: () => false,
    pendingToolTasks,
    resetSessionAfterCompactionFailure: async () => false,
    resetSessionAfterRoleOrderingConflict: async () => false,
    isHeartbeat: false,
    sessionKey: "main",
    getActiveSessionEntry: () => undefined,
    resolvedVerboseLevel: "off",
  });

  // Let every task left in pendingToolTasks settle before inspecting the outcome.
  await Promise.all([...pendingToolTasks]);

  expect(result.kind).toBe("success");
  expect(onToolResult).toHaveBeenCalledTimes(2);
  // Although "first" is the slower delivery, it must still complete before "second".
  expect(completed).toEqual(["first", "second"]);
});
it("forwards item lifecycle events to reply options", async () => {
const onItemEvent = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {

View File

@@ -642,93 +642,6 @@ describe("runReplyAgent typing (heartbeat)", () => {
vi.useRealTimers();
});
it("delivers tool results in order even when dispatched concurrently", async () => {
  // Payload texts in the order their delivery callbacks finished.
  const completed: string[] = [];
  const onToolResult = vi.fn(async ({ text }: { text?: string }) => {
    // Simulated network latency: "first" is slower than any other payload.
    let waitMs = 1;
    if (text === "first") {
      waitMs = 5;
    }
    await new Promise((r) => setTimeout(r, waitMs));
    completed.push(text ?? "");
  });

  state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
    // Both tool results are dispatched before either is awaited; they are awaited together.
    await Promise.all([
      params.onToolResult?.({ text: "first", mediaUrls: [] }),
      params.onToolResult?.({ text: "second", mediaUrls: [] }),
    ]);
    return { payloads: [{ text: "final" }], meta: {} };
  });

  const minimalRun = createMinimalRun({
    typingMode: "message",
    opts: { onToolResult },
  });
  await minimalRun.run();

  expect(onToolResult).toHaveBeenCalledTimes(2);
  // The slower "first" result must still be delivered ahead of "second".
  expect(completed).toEqual(["first", "second"]);
});
it("continues delivering later tool results after an earlier tool result fails", async () => {
  // Collects the text of every tool result whose delivery callback did not throw.
  const succeeded: string[] = [];
  const onToolResult = vi.fn(async ({ text }: { text?: string }) => {
    // Only the "first" payload fails; any other payload is recorded as delivered.
    if (text !== "first") {
      succeeded.push(text ?? "");
      return;
    }
    throw new Error("simulated delivery failure");
  });

  state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
    // allSettled keeps a rejected delivery from rejecting the mocked agent run itself.
    await Promise.allSettled([
      params.onToolResult?.({ text: "first", mediaUrls: [] }),
      params.onToolResult?.({ text: "second", mediaUrls: [] }),
    ]);
    return { payloads: [{ text: "final" }], meta: {} };
  });

  const minimalRun = createMinimalRun({
    typingMode: "message",
    opts: { onToolResult },
  });
  await minimalRun.run();

  expect(onToolResult).toHaveBeenCalledTimes(2);
  // The failing "first" payload never reaches the list; "second" must still arrive.
  expect(succeeded).toEqual(["second"]);
});
it("refreshes queued followups when auto-compaction rotates the session", async () => {
  await withTempStateDir(async (tempDir) => {
    const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: Date.now() };
    const storePath = path.join(tempDir, "sessions", "sessions.json");

    // The mocked agent reports a different session id together with a compaction count.
    const agentMeta = {
      sessionId: "session-rotated",
      compactionCount: 1,
    };
    state.runEmbeddedPiAgentMock.mockResolvedValueOnce({
      payloads: [{ text: "final" }],
      meta: { agentMeta },
    });

    const minimalRun = createMinimalRun({
      sessionEntry,
      sessionStore: { main: sessionEntry },
      sessionKey: "main",
      storePath,
    });
    await minimalRun.run();

    // The refresh must be called with the previous id, the new id, and a matching session file.
    const refreshMock = vi.mocked(refreshQueuedFollowupSession);
    expect(refreshMock).toHaveBeenCalledWith({
      key: "main",
      previousSessionId: "session",
      nextSessionId: "session-rotated",
      nextSessionFile: expect.stringContaining("session-rotated.jsonl"),
    });
  });
});
it("announces model fallback only when verbose mode is enabled", async () => {
const cases = [
{ name: "verbose on", verbose: "on" as const, expectNotice: true },