diff --git a/src/agents/pi-embedded-runner.guard.waitforidle-before-flush.test.ts b/src/agents/pi-embedded-runner.guard.waitforidle-before-flush.test.ts
new file mode 100644
index 00000000000..7ed7c04ef91
--- /dev/null
+++ b/src/agents/pi-embedded-runner.guard.waitforidle-before-flush.test.ts
@@ -0,0 +1,112 @@
+import type { AgentMessage } from "@mariozechner/pi-agent-core";
+import { SessionManager } from "@mariozechner/pi-coding-agent";
+import { afterEach, describe, expect, it, vi } from "vitest";
+import { flushPendingToolResultsAfterIdle } from "./pi-embedded-runner/wait-for-idle-before-flush.js";
+import { guardSessionManager } from "./session-tool-result-guard-wrapper.js";
+
+function assistantToolCall(id: string): AgentMessage {
+  return {
+    role: "assistant",
+    content: [{ type: "toolCall", id, name: "exec", arguments: {} }],
+    stopReason: "toolUse",
+  } as AgentMessage;
+}
+
+function toolResult(id: string, text: string): AgentMessage {
+  return {
+    role: "toolResult",
+    toolCallId: id,
+    content: [{ type: "text", text }],
+    isError: false,
+  } as AgentMessage;
+}
+
+function deferred<T>() {
+  let resolve!: (value: T | PromiseLike<T>) => void;
+  const promise = new Promise<T>((r) => {
+    resolve = r;
+  });
+  return { promise, resolve };
+}
+
+function getMessages(sm: ReturnType<typeof guardSessionManager>): AgentMessage[] {
+  return sm
+    .getEntries()
+    .filter((e) => e.type === "message")
+    .map((e) => (e as { message: AgentMessage }).message);
+}
+
+describe("flushPendingToolResultsAfterIdle", () => {
+  afterEach(() => {
+    vi.useRealTimers();
+  });
+
+  it("waits for idle so real tool results can land before flush", async () => {
+    const sm = guardSessionManager(SessionManager.inMemory());
+    const idle = deferred<void>();
+    const agent = { waitForIdle: () => idle.promise };
+
+    sm.appendMessage(assistantToolCall("call_retry_1"));
+    const flushPromise = flushPendingToolResultsAfterIdle({
+      agent,
+      sessionManager: sm,
+      timeoutMs: 1_000,
+    });
+
+    // Flush is waiting for idle; synthetic result must not appear yet.
+    await Promise.resolve();
+    expect(getMessages(sm).map((m) => m.role)).toEqual(["assistant"]);
+
+    // Tool completes before idle wait finishes.
+    sm.appendMessage(toolResult("call_retry_1", "command output here"));
+    idle.resolve();
+    await flushPromise;
+
+    const messages = getMessages(sm);
+    expect(messages.map((m) => m.role)).toEqual(["assistant", "toolResult"]);
+    expect((messages[1] as { isError?: boolean }).isError).not.toBe(true);
+    expect((messages[1] as { content?: Array<{ text?: string }> }).content?.[0]?.text).toBe(
+      "command output here",
+    );
+  });
+
+  it("flushes pending tool call after timeout when idle never resolves", async () => {
+    const sm = guardSessionManager(SessionManager.inMemory());
+    vi.useFakeTimers();
+    const agent = { waitForIdle: () => new Promise<void>(() => {}) };
+
+    sm.appendMessage(assistantToolCall("call_orphan_1"));
+
+    const flushPromise = flushPendingToolResultsAfterIdle({
+      agent,
+      sessionManager: sm,
+      timeoutMs: 30,
+    });
+    await vi.advanceTimersByTimeAsync(30);
+    await flushPromise;
+
+    const entries = getMessages(sm);
+
+    expect(entries.length).toBe(2);
+    expect(entries[1].role).toBe("toolResult");
+    expect((entries[1] as { isError?: boolean }).isError).toBe(true);
+    expect((entries[1] as { content?: Array<{ text?: string }> }).content?.[0]?.text).toContain(
+      "missing tool result",
+    );
+  });
+
+  it("clears timeout handle when waitForIdle resolves first", async () => {
+    const sm = guardSessionManager(SessionManager.inMemory());
+    vi.useFakeTimers();
+    const agent = {
+      waitForIdle: async () => {},
+    };
+
+    await flushPendingToolResultsAfterIdle({
+      agent,
+      sessionManager: sm,
+      timeoutMs: 30_000,
+    });
+    expect(vi.getTimerCount()).toBe(0);
+  });
+});
diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts
index 84a0c616618..0eec28249ce 100644
--- a/src/agents/pi-embedded-runner/compact.ts
+++ b/src/agents/pi-embedded-runner/compact.ts
@@ -74,6 +74,7 @@ import {
 } from "./system-prompt.js";
 import { splitSdkTools } from "./tool-split.js";
 import { describeUnknownError, mapThinkingLevel, resolveExecToolDefaults } from "./utils.js";
+import { flushPendingToolResultsAfterIdle } from "./wait-for-idle-before-flush.js";
 
 export type CompactEmbeddedPiSessionParams = {
   sessionId: string;
@@ -471,7 +472,10 @@ export async function compactEmbeddedPiSessionDirect(
           },
         };
       } finally {
-        sessionManager.flushPendingToolResults?.();
+        await flushPendingToolResultsAfterIdle({
+          agent: session?.agent,
+          sessionManager,
+        });
         session.dispose();
       }
     } finally {
diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts
index 41123de1474..425a30a506d 100644
--- a/src/agents/pi-embedded-runner/run/attempt.ts
+++ b/src/agents/pi-embedded-runner/run/attempt.ts
@@ -89,6 +89,7 @@ import {
 } from "../system-prompt.js";
 import { splitSdkTools } from "../tool-split.js";
 import { describeUnknownError, mapThinkingLevel } from "../utils.js";
+import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js";
 import { detectAndLoadPromptImages } from "./images.js";
 
 export function injectHistoryImagesIntoMessages(
@@ -577,7 +578,10 @@ export async function runEmbeddedAttempt(
           activeSession.agent.replaceMessages(limited);
         }
       } catch (err) {
-        sessionManager.flushPendingToolResults?.();
+        await flushPendingToolResultsAfterIdle({
+          agent: activeSession?.agent,
+          sessionManager,
+        });
         activeSession.dispose();
         throw err;
       }
@@ -940,7 +944,17 @@ export async function runEmbeddedAttempt(
       };
     } finally {
       // Always tear down the session (and release the lock) before we leave this attempt.
-      sessionManager?.flushPendingToolResults?.();
+      //
+      // BUGFIX: Wait for the agent to be truly idle before flushing pending tool results.
+      // pi-agent-core's auto-retry resolves waitForRetry() on assistant message receipt,
+      // *before* tool execution completes in the retried agent loop. Without this wait,
+      // flushPendingToolResults() fires while tools are still executing, inserting
+      // synthetic "missing tool result" errors and causing silent agent failures.
+      // See: https://github.com/openclaw/openclaw/issues/8643
+      await flushPendingToolResultsAfterIdle({
+        agent: session?.agent,
+        sessionManager,
+      });
       session?.dispose();
       await sessionLock.release();
     }
diff --git a/src/agents/pi-embedded-runner/wait-for-idle-before-flush.ts b/src/agents/pi-embedded-runner/wait-for-idle-before-flush.ts
new file mode 100644
index 00000000000..c3cefd7d17e
--- /dev/null
+++ b/src/agents/pi-embedded-runner/wait-for-idle-before-flush.ts
@@ -0,0 +1,45 @@
+type IdleAwareAgent = {
+  waitForIdle?: (() => Promise<void>) | undefined;
+};
+
+type ToolResultFlushManager = {
+  flushPendingToolResults?: (() => void) | undefined;
+};
+
+export const DEFAULT_WAIT_FOR_IDLE_TIMEOUT_MS = 30_000;
+
+async function waitForAgentIdleBestEffort(
+  agent: IdleAwareAgent | null | undefined,
+  timeoutMs: number,
+): Promise<void> {
+  const waitForIdle = agent?.waitForIdle;
+  if (typeof waitForIdle !== "function") {
+    return;
+  }
+
+  let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
+  try {
+    await Promise.race([
+      waitForIdle.call(agent),
+      new Promise<void>((resolve) => {
+        timeoutHandle = setTimeout(resolve, timeoutMs);
+        timeoutHandle.unref?.();
+      }),
+    ]);
+  } catch {
+    // Best-effort during cleanup.
+  } finally {
+    if (timeoutHandle) {
+      clearTimeout(timeoutHandle);
+    }
+  }
+}
+
+export async function flushPendingToolResultsAfterIdle(opts: {
+  agent: IdleAwareAgent | null | undefined;
+  sessionManager: ToolResultFlushManager | null | undefined;
+  timeoutMs?: number;
+}): Promise<void> {
+  await waitForAgentIdleBestEffort(opts.agent, opts.timeoutMs ?? DEFAULT_WAIT_FOR_IDLE_TIMEOUT_MS);
+  opts.sessionManager?.flushPendingToolResults?.();
+}