From d3b2135f862fcc9597dec89f80f41d05c7da6f59 Mon Sep 17 00:00:00 2001 From: rodbland2021 <86267410+rodbland2021@users.noreply.github.com> Date: Sat, 14 Feb 2026 06:35:43 +1100 Subject: [PATCH] fix(agents): wait for agent idle before flushing pending tool results (#13746) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(agents): wait for agent idle before flushing pending tool results When pi-agent-core's auto-retry mechanism handles overloaded/rate-limit errors, it resolves waitForRetry() on assistant message receipt — before tool execution completes in the retried agent loop. This causes the attempt's finally block to call flushPendingToolResults() while tools are still executing, inserting synthetic 'missing tool result' errors and causing silent agent failures. The fix adds a waitForIdle() call before the flush to ensure the agent's retry loop (including tool execution) has fully completed. Evidence from real session: tool call and synthetic error were only 53ms apart — the tool never had a chance to execute before being flushed. Root cause is in pi-agent-core's _resolveRetry() firing on message_end instead of agent_end, but this workaround in OpenClaw prevents the symptom without requiring an upstream fix. Fixes #8643 Fixes #13351 Refs #6682, #12595 * test: add tests for tool result flush race condition Validates that: - Real tool results are not replaced by synthetic errors when they arrive in time - Flush correctly inserts synthetic errors for genuinely orphaned tool calls - Flush is a no-op after real tool results have already been received Refs #8643, #13748 * fix(agents): add waitForIdle to all flushPendingToolResults call sites The original fix only covered the main run finally block, but there are two additional call sites that can trigger flushPendingToolResults while tools are still executing: 1. The catch block in attempt.ts (session setup error handler) 2. The finally block in compact.ts (compaction teardown) Both now await agent.waitForIdle() with a 30s timeout before flushing, matching the pattern already applied to the main finally block. Production testing on VPS with debug logging confirmed these additional paths can fire during sub-agent runs, producing spurious synthetic 'missing tool result' errors. * fix(agents): centralize idle-wait flush and clear timeout handle --------- Co-authored-by: Renue Development Co-authored-by: Peter Steinberger --- ...ner.guard.waitforidle-before-flush.test.ts | 112 ++++++++++++++++++ src/agents/pi-embedded-runner/compact.ts | 6 +- src/agents/pi-embedded-runner/run/attempt.ts | 18 ++- .../wait-for-idle-before-flush.ts | 45 +++++++ 4 files changed, 178 insertions(+), 3 deletions(-) create mode 100644 src/agents/pi-embedded-runner.guard.waitforidle-before-flush.test.ts create mode 100644 src/agents/pi-embedded-runner/wait-for-idle-before-flush.ts diff --git a/src/agents/pi-embedded-runner.guard.waitforidle-before-flush.test.ts b/src/agents/pi-embedded-runner.guard.waitforidle-before-flush.test.ts new file mode 100644 index 00000000000..7ed7c04ef91 --- /dev/null +++ b/src/agents/pi-embedded-runner.guard.waitforidle-before-flush.test.ts @@ -0,0 +1,112 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import { SessionManager } from "@mariozechner/pi-coding-agent"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { flushPendingToolResultsAfterIdle } from "./pi-embedded-runner/wait-for-idle-before-flush.js"; +import { guardSessionManager } from "./session-tool-result-guard-wrapper.js"; + +function assistantToolCall(id: string): AgentMessage { + return { + role: "assistant", + content: [{ type: "toolCall", id, name: "exec", arguments: {} }], + stopReason: "toolUse", + } as AgentMessage; +} + +function toolResult(id: string, text: string): AgentMessage { + return { + role: "toolResult", + toolCallId: id, + content: [{ type: "text", text }], + isError: false, + } as AgentMessage; +} + +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + const promise = new Promise((r) => { + resolve = r; + }); + return { promise, resolve }; +} + +function getMessages(sm: ReturnType): AgentMessage[] { + return sm + .getEntries() + .filter((e) => e.type === "message") + .map((e) => (e as { message: AgentMessage }).message); +} + +describe("flushPendingToolResultsAfterIdle", () => { + afterEach(() => { + vi.useRealTimers(); + }); + + it("waits for idle so real tool results can land before flush", async () => { + const sm = guardSessionManager(SessionManager.inMemory()); + const idle = deferred(); + const agent = { waitForIdle: () => idle.promise }; + + sm.appendMessage(assistantToolCall("call_retry_1")); + const flushPromise = flushPendingToolResultsAfterIdle({ + agent, + sessionManager: sm, + timeoutMs: 1_000, + }); + + // Flush is waiting for idle; synthetic result must not appear yet. + await Promise.resolve(); + expect(getMessages(sm).map((m) => m.role)).toEqual(["assistant"]); + + // Tool completes before idle wait finishes. + sm.appendMessage(toolResult("call_retry_1", "command output here")); + idle.resolve(); + await flushPromise; + + const messages = getMessages(sm); + expect(messages.map((m) => m.role)).toEqual(["assistant", "toolResult"]); + expect((messages[1] as { isError?: boolean }).isError).not.toBe(true); + expect((messages[1] as { content?: Array<{ text?: string }> }).content?.[0]?.text).toBe( + "command output here", + ); + }); + + it("flushes pending tool call after timeout when idle never resolves", async () => { + const sm = guardSessionManager(SessionManager.inMemory()); + vi.useFakeTimers(); + const agent = { waitForIdle: () => new Promise(() => {}) }; + + sm.appendMessage(assistantToolCall("call_orphan_1")); + + const flushPromise = flushPendingToolResultsAfterIdle({ + agent, + sessionManager: sm, + timeoutMs: 30, + }); + await vi.advanceTimersByTimeAsync(30); + await flushPromise; + + const entries = getMessages(sm); + + expect(entries.length).toBe(2); + expect(entries[1].role).toBe("toolResult"); + expect((entries[1] as { isError?: boolean }).isError).toBe(true); + expect((entries[1] as { content?: Array<{ text?: string }> }).content?.[0]?.text).toContain( + "missing tool result", + ); + }); + + it("clears timeout handle when waitForIdle resolves first", async () => { + const sm = guardSessionManager(SessionManager.inMemory()); + vi.useFakeTimers(); + const agent = { + waitForIdle: async () => {}, + }; + + await flushPendingToolResultsAfterIdle({ + agent, + sessionManager: sm, + timeoutMs: 30_000, + }); + expect(vi.getTimerCount()).toBe(0); + }); +}); diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 84a0c616618..0eec28249ce 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -74,6 +74,7 @@ import { } from "./system-prompt.js"; import { splitSdkTools } from "./tool-split.js"; import { describeUnknownError, mapThinkingLevel, resolveExecToolDefaults } from "./utils.js"; +import { flushPendingToolResultsAfterIdle } from "./wait-for-idle-before-flush.js"; export type CompactEmbeddedPiSessionParams = { sessionId: string; @@ -471,7 +472,10 @@ export async function compactEmbeddedPiSessionDirect( }, }; } finally { - sessionManager.flushPendingToolResults?.(); + await flushPendingToolResultsAfterIdle({ + agent: session?.agent, + sessionManager, + }); session.dispose(); } } finally { diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 41123de1474..425a30a506d 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -89,6 +89,7 @@ import { } from "../system-prompt.js"; import { splitSdkTools } from "../tool-split.js"; import { describeUnknownError, mapThinkingLevel } from "../utils.js"; +import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js"; import { detectAndLoadPromptImages } from "./images.js"; export function injectHistoryImagesIntoMessages( @@ -577,7 +578,10 @@ export async function runEmbeddedAttempt( activeSession.agent.replaceMessages(limited); } } catch (err) { - sessionManager.flushPendingToolResults?.(); + await flushPendingToolResultsAfterIdle({ + agent: activeSession?.agent, + sessionManager, + }); activeSession.dispose(); throw err; } @@ -940,7 +944,17 @@ export async function runEmbeddedAttempt( }; } finally { // Always tear down the session (and release the lock) before we leave this attempt. - sessionManager?.flushPendingToolResults?.(); + // + // BUGFIX: Wait for the agent to be truly idle before flushing pending tool results. + // pi-agent-core's auto-retry resolves waitForRetry() on assistant message receipt, + // *before* tool execution completes in the retried agent loop. Without this wait, + // flushPendingToolResults() fires while tools are still executing, inserting + // synthetic "missing tool result" errors and causing silent agent failures. + // See: https://github.com/openclaw/openclaw/issues/8643 + await flushPendingToolResultsAfterIdle({ + agent: session?.agent, + sessionManager, + }); session?.dispose(); await sessionLock.release(); } diff --git a/src/agents/pi-embedded-runner/wait-for-idle-before-flush.ts b/src/agents/pi-embedded-runner/wait-for-idle-before-flush.ts new file mode 100644 index 00000000000..c3cefd7d17e --- /dev/null +++ b/src/agents/pi-embedded-runner/wait-for-idle-before-flush.ts @@ -0,0 +1,45 @@ +type IdleAwareAgent = { + waitForIdle?: (() => Promise) | undefined; +}; + +type ToolResultFlushManager = { + flushPendingToolResults?: (() => void) | undefined; +}; + +export const DEFAULT_WAIT_FOR_IDLE_TIMEOUT_MS = 30_000; + +async function waitForAgentIdleBestEffort( + agent: IdleAwareAgent | null | undefined, + timeoutMs: number, +): Promise { + const waitForIdle = agent?.waitForIdle; + if (typeof waitForIdle !== "function") { + return; + } + + let timeoutHandle: ReturnType | undefined; + try { + await Promise.race([ + waitForIdle.call(agent), + new Promise((resolve) => { + timeoutHandle = setTimeout(resolve, timeoutMs); + timeoutHandle.unref?.(); + }), + ]); + } catch { + // Best-effort during cleanup. + } finally { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + } +} + +export async function flushPendingToolResultsAfterIdle(opts: { + agent: IdleAwareAgent | null | undefined; + sessionManager: ToolResultFlushManager | null | undefined; + timeoutMs?: number; +}): Promise { + await waitForAgentIdleBestEffort(opts.agent, opts.timeoutMs ?? DEFAULT_WAIT_FOR_IDLE_TIMEOUT_MS); + opts.sessionManager?.flushPendingToolResults?.(); +}