mirror of
https://github.com/moltbot/moltbot.git
synced 2026-03-08 06:54:24 +00:00
fix(agents): wait for agent idle before flushing pending tool results (#13746)
* fix(agents): wait for agent idle before flushing pending tool results When pi-agent-core's auto-retry mechanism handles overloaded/rate-limit errors, it resolves waitForRetry() on assistant message receipt — before tool execution completes in the retried agent loop. This causes the attempt's finally block to call flushPendingToolResults() while tools are still executing, inserting synthetic 'missing tool result' errors and causing silent agent failures. The fix adds a waitForIdle() call before the flush to ensure the agent's retry loop (including tool execution) has fully completed. Evidence from real session: tool call and synthetic error were only 53ms apart — the tool never had a chance to execute before being flushed. Root cause is in pi-agent-core's _resolveRetry() firing on message_end instead of agent_end, but this workaround in OpenClaw prevents the symptom without requiring an upstream fix. Fixes #8643 Fixes #13351 Refs #6682, #12595 * test: add tests for tool result flush race condition Validates that: - Real tool results are not replaced by synthetic errors when they arrive in time - Flush correctly inserts synthetic errors for genuinely orphaned tool calls - Flush is a no-op after real tool results have already been received Refs #8643, #13748 * fix(agents): add waitForIdle to all flushPendingToolResults call sites The original fix only covered the main run finally block, but there are two additional call sites that can trigger flushPendingToolResults while tools are still executing: 1. The catch block in attempt.ts (session setup error handler) 2. The finally block in compact.ts (compaction teardown) Both now await agent.waitForIdle() with a 30s timeout before flushing, matching the pattern already applied to the main finally block. Production testing on VPS with debug logging confirmed these additional paths can fire during sub-agent runs, producing spurious synthetic 'missing tool result' errors. * fix(agents): centralize idle-wait flush and clear timeout handle --------- Co-authored-by: Renue Development <dev@renuebyscience.com> Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
@@ -0,0 +1,112 @@
|
||||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import { SessionManager } from "@mariozechner/pi-coding-agent";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { flushPendingToolResultsAfterIdle } from "./pi-embedded-runner/wait-for-idle-before-flush.js";
|
||||
import { guardSessionManager } from "./session-tool-result-guard-wrapper.js";
|
||||
|
||||
function assistantToolCall(id: string): AgentMessage {
|
||||
return {
|
||||
role: "assistant",
|
||||
content: [{ type: "toolCall", id, name: "exec", arguments: {} }],
|
||||
stopReason: "toolUse",
|
||||
} as AgentMessage;
|
||||
}
|
||||
|
||||
function toolResult(id: string, text: string): AgentMessage {
|
||||
return {
|
||||
role: "toolResult",
|
||||
toolCallId: id,
|
||||
content: [{ type: "text", text }],
|
||||
isError: false,
|
||||
} as AgentMessage;
|
||||
}
|
||||
|
||||
function deferred<T>() {
|
||||
let resolve!: (value: T | PromiseLike<T>) => void;
|
||||
const promise = new Promise<T>((r) => {
|
||||
resolve = r;
|
||||
});
|
||||
return { promise, resolve };
|
||||
}
|
||||
|
||||
function getMessages(sm: ReturnType<typeof guardSessionManager>): AgentMessage[] {
|
||||
return sm
|
||||
.getEntries()
|
||||
.filter((e) => e.type === "message")
|
||||
.map((e) => (e as { message: AgentMessage }).message);
|
||||
}
|
||||
|
||||
describe("flushPendingToolResultsAfterIdle", () => {
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("waits for idle so real tool results can land before flush", async () => {
|
||||
const sm = guardSessionManager(SessionManager.inMemory());
|
||||
const idle = deferred<void>();
|
||||
const agent = { waitForIdle: () => idle.promise };
|
||||
|
||||
sm.appendMessage(assistantToolCall("call_retry_1"));
|
||||
const flushPromise = flushPendingToolResultsAfterIdle({
|
||||
agent,
|
||||
sessionManager: sm,
|
||||
timeoutMs: 1_000,
|
||||
});
|
||||
|
||||
// Flush is waiting for idle; synthetic result must not appear yet.
|
||||
await Promise.resolve();
|
||||
expect(getMessages(sm).map((m) => m.role)).toEqual(["assistant"]);
|
||||
|
||||
// Tool completes before idle wait finishes.
|
||||
sm.appendMessage(toolResult("call_retry_1", "command output here"));
|
||||
idle.resolve();
|
||||
await flushPromise;
|
||||
|
||||
const messages = getMessages(sm);
|
||||
expect(messages.map((m) => m.role)).toEqual(["assistant", "toolResult"]);
|
||||
expect((messages[1] as { isError?: boolean }).isError).not.toBe(true);
|
||||
expect((messages[1] as { content?: Array<{ text?: string }> }).content?.[0]?.text).toBe(
|
||||
"command output here",
|
||||
);
|
||||
});
|
||||
|
||||
it("flushes pending tool call after timeout when idle never resolves", async () => {
|
||||
const sm = guardSessionManager(SessionManager.inMemory());
|
||||
vi.useFakeTimers();
|
||||
const agent = { waitForIdle: () => new Promise<void>(() => {}) };
|
||||
|
||||
sm.appendMessage(assistantToolCall("call_orphan_1"));
|
||||
|
||||
const flushPromise = flushPendingToolResultsAfterIdle({
|
||||
agent,
|
||||
sessionManager: sm,
|
||||
timeoutMs: 30,
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(30);
|
||||
await flushPromise;
|
||||
|
||||
const entries = getMessages(sm);
|
||||
|
||||
expect(entries.length).toBe(2);
|
||||
expect(entries[1].role).toBe("toolResult");
|
||||
expect((entries[1] as { isError?: boolean }).isError).toBe(true);
|
||||
expect((entries[1] as { content?: Array<{ text?: string }> }).content?.[0]?.text).toContain(
|
||||
"missing tool result",
|
||||
);
|
||||
});
|
||||
|
||||
it("clears timeout handle when waitForIdle resolves first", async () => {
|
||||
const sm = guardSessionManager(SessionManager.inMemory());
|
||||
vi.useFakeTimers();
|
||||
const agent = {
|
||||
waitForIdle: async () => {},
|
||||
};
|
||||
|
||||
await flushPendingToolResultsAfterIdle({
|
||||
agent,
|
||||
sessionManager: sm,
|
||||
timeoutMs: 30_000,
|
||||
});
|
||||
expect(vi.getTimerCount()).toBe(0);
|
||||
});
|
||||
});
|
||||
@@ -74,6 +74,7 @@ import {
|
||||
} from "./system-prompt.js";
|
||||
import { splitSdkTools } from "./tool-split.js";
|
||||
import { describeUnknownError, mapThinkingLevel, resolveExecToolDefaults } from "./utils.js";
|
||||
import { flushPendingToolResultsAfterIdle } from "./wait-for-idle-before-flush.js";
|
||||
|
||||
export type CompactEmbeddedPiSessionParams = {
|
||||
sessionId: string;
|
||||
@@ -471,7 +472,10 @@ export async function compactEmbeddedPiSessionDirect(
|
||||
},
|
||||
};
|
||||
} finally {
|
||||
sessionManager.flushPendingToolResults?.();
|
||||
await flushPendingToolResultsAfterIdle({
|
||||
agent: session?.agent,
|
||||
sessionManager,
|
||||
});
|
||||
session.dispose();
|
||||
}
|
||||
} finally {
|
||||
|
||||
@@ -89,6 +89,7 @@ import {
|
||||
} from "../system-prompt.js";
|
||||
import { splitSdkTools } from "../tool-split.js";
|
||||
import { describeUnknownError, mapThinkingLevel } from "../utils.js";
|
||||
import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js";
|
||||
import { detectAndLoadPromptImages } from "./images.js";
|
||||
|
||||
export function injectHistoryImagesIntoMessages(
|
||||
@@ -577,7 +578,10 @@ export async function runEmbeddedAttempt(
|
||||
activeSession.agent.replaceMessages(limited);
|
||||
}
|
||||
} catch (err) {
|
||||
sessionManager.flushPendingToolResults?.();
|
||||
await flushPendingToolResultsAfterIdle({
|
||||
agent: activeSession?.agent,
|
||||
sessionManager,
|
||||
});
|
||||
activeSession.dispose();
|
||||
throw err;
|
||||
}
|
||||
@@ -940,7 +944,17 @@ export async function runEmbeddedAttempt(
|
||||
};
|
||||
} finally {
|
||||
// Always tear down the session (and release the lock) before we leave this attempt.
|
||||
sessionManager?.flushPendingToolResults?.();
|
||||
//
|
||||
// BUGFIX: Wait for the agent to be truly idle before flushing pending tool results.
|
||||
// pi-agent-core's auto-retry resolves waitForRetry() on assistant message receipt,
|
||||
// *before* tool execution completes in the retried agent loop. Without this wait,
|
||||
// flushPendingToolResults() fires while tools are still executing, inserting
|
||||
// synthetic "missing tool result" errors and causing silent agent failures.
|
||||
// See: https://github.com/openclaw/openclaw/issues/8643
|
||||
await flushPendingToolResultsAfterIdle({
|
||||
agent: session?.agent,
|
||||
sessionManager,
|
||||
});
|
||||
session?.dispose();
|
||||
await sessionLock.release();
|
||||
}
|
||||
|
||||
45
src/agents/pi-embedded-runner/wait-for-idle-before-flush.ts
Normal file
45
src/agents/pi-embedded-runner/wait-for-idle-before-flush.ts
Normal file
@@ -0,0 +1,45 @@
|
||||
type IdleAwareAgent = {
|
||||
waitForIdle?: (() => Promise<void>) | undefined;
|
||||
};
|
||||
|
||||
type ToolResultFlushManager = {
|
||||
flushPendingToolResults?: (() => void) | undefined;
|
||||
};
|
||||
|
||||
export const DEFAULT_WAIT_FOR_IDLE_TIMEOUT_MS = 30_000;
|
||||
|
||||
async function waitForAgentIdleBestEffort(
|
||||
agent: IdleAwareAgent | null | undefined,
|
||||
timeoutMs: number,
|
||||
): Promise<void> {
|
||||
const waitForIdle = agent?.waitForIdle;
|
||||
if (typeof waitForIdle !== "function") {
|
||||
return;
|
||||
}
|
||||
|
||||
let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
|
||||
try {
|
||||
await Promise.race([
|
||||
waitForIdle.call(agent),
|
||||
new Promise<void>((resolve) => {
|
||||
timeoutHandle = setTimeout(resolve, timeoutMs);
|
||||
timeoutHandle.unref?.();
|
||||
}),
|
||||
]);
|
||||
} catch {
|
||||
// Best-effort during cleanup.
|
||||
} finally {
|
||||
if (timeoutHandle) {
|
||||
clearTimeout(timeoutHandle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function flushPendingToolResultsAfterIdle(opts: {
|
||||
agent: IdleAwareAgent | null | undefined;
|
||||
sessionManager: ToolResultFlushManager | null | undefined;
|
||||
timeoutMs?: number;
|
||||
}): Promise<void> {
|
||||
await waitForAgentIdleBestEffort(opts.agent, opts.timeoutMs ?? DEFAULT_WAIT_FOR_IDLE_TIMEOUT_MS);
|
||||
opts.sessionManager?.flushPendingToolResults?.();
|
||||
}
|
||||
Reference in New Issue
Block a user