From 747902a26a635f1bc82d6c8b5c4390f0f9abda1c Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 2 Mar 2026 17:23:08 -0800 Subject: [PATCH] fix(hooks): propagate run/tool IDs for tool hook correlation (#32360) * Plugin SDK: add run and tool call fields to tool hooks * Agents: propagate runId and toolCallId in before_tool_call * Agents: thread runId through tool wrapper context * Runner: pass runId into tool hook context * Compaction: pass runId into tool hook context * Agents: scope after_tool_call start data by run * Tests: cover run and tool IDs in before_tool_call hooks * Tests: add run-scoped after_tool_call collision coverage * Hooks: scope adjusted tool params by run * Tests: cover run-scoped adjusted param collisions * Hooks: preserve active tool start metadata until end * Changelog: add tool-hook correlation note --- CHANGELOG.md | 1 + src/agents/pi-embedded-runner/compact.ts | 1 + src/agents/pi-embedded-runner/run/attempt.ts | 2 + .../pi-embedded-subscribe.handlers.tools.ts | 28 ++++-- ...s.before-tool-call.integration.e2e.test.ts | 44 ++++++++- src/agents/pi-tools.before-tool-call.ts | 36 ++++++-- src/agents/pi-tools.ts | 3 + src/plugins/types.ts | 12 +++ .../wired-hooks-after-tool-call.e2e.test.ts | 92 ++++++++++++++++++- 9 files changed, 200 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 55db01b4669..d0bb16bcdef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -99,6 +99,7 @@ Docs: https://docs.openclaw.ai - Agents/tool-result guard: always clear pending tool-call state on interruptions even when synthetic tool results are disabled, preventing orphaned tool-use transcripts that cause follow-up provider request failures. (#32120) Thanks @jnMetaCode. - Hooks/after_tool_call: include embedded session context (`sessionKey`, `agentId`) and fire the hook exactly once per tool execution by removing duplicate adapter-path dispatch in embedded runs. (#32201) Thanks @jbeno, @scoootscooob, @vincentkoc. - Hooks/session-scoped memory context: expose ephemeral `sessionId` in embedded plugin tool contexts and `before_tool_call`/`after_tool_call` hook contexts (including compaction and client-tool wiring) so plugins can isolate per-conversation state across `/new` and `/reset`. Related #31253 and #31304. Thanks @Sid-Qin and @Servo-AIpex. +- Hooks/tool-call correlation: include `runId` and `toolCallId` in plugin tool hook payloads/context and scope tool start/adjusted-param tracking by run to prevent cross-run collisions in `before_tool_call` and `after_tool_call`. (#32360) Thanks @vincentkoc. - Webchat/stream finalization: persist streamed assistant text when final events omit `message`, while keeping final payload precedence and skipping empty stream buffers to prevent disappearing replies after tool turns. (#31920) Thanks @Sid-Qin. - Cron/store migration: normalize legacy cron jobs with string `schedule` and top-level `command`/`timeout` fields into canonical schedule/payload/session-target shape on load, preventing schedule-error loops on old persisted stores. (#31926) Thanks @bmendonca3. - Gateway/Heartbeat model reload: treat `models.*` and `agents.defaults.model` config updates as heartbeat hot-reload triggers so heartbeat picks up model changes without a full gateway restart. (#32046) Thanks @stakeswky. diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 361d5f8593a..f65df4d4290 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -371,6 +371,7 @@ export async function compactEmbeddedPiSessionDirect( agentAccountId: params.agentAccountId, sessionKey: sandboxSessionKey, sessionId: params.sessionId, + runId: params.runId, groupId: params.groupId, groupChannel: params.groupChannel, groupSpace: params.groupSpace, diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 77b466b5730..64a8f2fd2cf 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -645,6 +645,7 @@ export async function runEmbeddedAttempt( senderIsOwner: params.senderIsOwner, sessionKey: sandboxSessionKey, sessionId: params.sessionId, + runId: params.runId, agentDir, workspaceDir: effectiveWorkspace, config: params.config, @@ -920,6 +921,7 @@ export async function runEmbeddedAttempt( agentId: sessionAgentId, sessionKey: sandboxSessionKey, sessionId: params.sessionId, + runId: params.runId, loopDetection: clientToolLoopDetection, }, ) diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.ts b/src/agents/pi-embedded-subscribe.handlers.tools.ts index 87e165c9c77..8abd9469bbc 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.ts @@ -22,8 +22,17 @@ import { consumeAdjustedParamsForToolCall } from "./pi-tools.before-tool-call.js import { buildToolMutationState, isSameToolMutationAction } from "./tool-mutation.js"; import { normalizeToolName } from "./tool-policy.js"; -/** Track tool execution start times and args for after_tool_call hook */ -const toolStartData = new Map(); +type ToolStartRecord = { + startTime: number; + args: unknown; +}; + +/** Track tool execution start data for after_tool_call hook. */ +const toolStartData = new Map(); + +function buildToolStartKey(runId: string, toolCallId: string): string { + return `${runId}:${toolCallId}`; +} function isCronAddAction(args: unknown): boolean { if (!args || typeof args !== "object") { @@ -182,9 +191,10 @@ export async function handleToolExecutionStart( const toolName = normalizeToolName(rawToolName); const toolCallId = String(evt.toolCallId); const args = evt.args; + const runId = ctx.params.runId; // Track start time and args for after_tool_call hook - toolStartData.set(toolCallId, { startTime: Date.now(), args }); + toolStartData.set(buildToolStartKey(runId, toolCallId), { startTime: Date.now(), args }); if (toolName === "read") { const record = args && typeof args === "object" ? (args as Record) : {}; @@ -302,12 +312,14 @@ export async function handleToolExecutionEnd( ) { const toolName = normalizeToolName(String(evt.toolName)); const toolCallId = String(evt.toolCallId); + const runId = ctx.params.runId; const isError = Boolean(evt.isError); const result = evt.result; const isToolError = isError || isToolResultError(result); const sanitizedResult = sanitizeToolResult(result); - const startData = toolStartData.get(toolCallId); - toolStartData.delete(toolCallId); + const toolStartKey = buildToolStartKey(runId, toolCallId); + const startData = toolStartData.get(toolStartKey); + toolStartData.delete(toolStartKey); const callSummary = ctx.state.toolMetaById.get(toolCallId); const meta = callSummary?.meta; ctx.state.toolMetas.push({ toolName, meta }); @@ -364,7 +376,7 @@ export async function handleToolExecutionEnd( startData?.args && typeof startData.args === "object" ? (startData.args as Record) : {}; - const adjustedArgs = consumeAdjustedParamsForToolCall(toolCallId); + const adjustedArgs = consumeAdjustedParamsForToolCall(toolCallId, runId); const afterToolCallArgs = adjustedArgs && typeof adjustedArgs === "object" ? (adjustedArgs as Record) @@ -424,6 +436,8 @@ export async function handleToolExecutionEnd( const hookEvent: PluginHookAfterToolCallEvent = { toolName, params: afterToolCallArgs, + runId, + toolCallId, result: sanitizedResult, error: isToolError ? extractToolErrorMessage(sanitizedResult) : undefined, durationMs, @@ -434,6 +448,8 @@ export async function handleToolExecutionEnd( agentId: ctx.params.agentId, sessionKey: ctx.params.sessionKey, sessionId: ctx.params.sessionId, + runId, + toolCallId, }) .catch((err) => { ctx.log.warn(`after_tool_call hook failed: tool=${toolName} error=${String(err)}`); diff --git a/src/agents/pi-tools.before-tool-call.integration.e2e.test.ts b/src/agents/pi-tools.before-tool-call.integration.e2e.test.ts index ec1d8c5d4f1..d6a86e00a2f 100644 --- a/src/agents/pi-tools.before-tool-call.integration.e2e.test.ts +++ b/src/agents/pi-tools.before-tool-call.integration.e2e.test.ts @@ -3,7 +3,11 @@ import { resetDiagnosticSessionStateForTest } from "../logging/diagnostic-sessio import { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; import { toClientToolDefinitions, toToolDefinitions } from "./pi-tool-definition-adapter.js"; import { wrapToolWithAbortSignal } from "./pi-tools.abort.js"; -import { wrapToolWithBeforeToolCallHook } from "./pi-tools.before-tool-call.js"; +import { + __testing as beforeToolCallTesting, + consumeAdjustedParamsForToolCall, + wrapToolWithBeforeToolCallHook, +} from "./pi-tools.before-tool-call.js"; vi.mock("../plugins/hook-runner-global.js"); @@ -37,6 +41,7 @@ describe("before_tool_call hook integration", () => { beforeEach(() => { resetDiagnosticSessionStateForTest(); + beforeToolCallTesting.adjustedParamsByToolCallId.clear(); hookRunner = installMockHookRunner(); }); @@ -123,6 +128,7 @@ describe("before_tool_call hook integration", () => { agentId: "main", sessionKey: "main", sessionId: "ephemeral-main", + runId: "run-main", }); const extensionContext = {} as Parameters[3]; @@ -132,15 +138,51 @@ describe("before_tool_call hook integration", () => { { toolName: "read", params: {}, + runId: "run-main", + toolCallId: "call-5", }, { toolName: "read", agentId: "main", sessionKey: "main", sessionId: "ephemeral-main", + runId: "run-main", + toolCallId: "call-5", }, ); }); + + it("keeps adjusted params isolated per run when toolCallId collides", async () => { + hookRunner.hasHooks.mockReturnValue(true); + hookRunner.runBeforeToolCall + .mockResolvedValueOnce({ params: { marker: "A" } }) + .mockResolvedValueOnce({ params: { marker: "B" } }); + const execute = vi.fn().mockResolvedValue({ content: [], details: { ok: true } }); + // oxlint-disable-next-line typescript/no-explicit-any + const toolA = wrapToolWithBeforeToolCallHook({ name: "Read", execute } as any, { + runId: "run-a", + }); + // oxlint-disable-next-line typescript/no-explicit-any + const toolB = wrapToolWithBeforeToolCallHook({ name: "Read", execute } as any, { + runId: "run-b", + }); + const extensionContextA = {} as Parameters[3]; + const extensionContextB = {} as Parameters[3]; + const sharedToolCallId = "shared-call"; + + await toolA.execute(sharedToolCallId, { path: "/tmp/a.txt" }, undefined, extensionContextA); + await toolB.execute(sharedToolCallId, { path: "/tmp/b.txt" }, undefined, extensionContextB); + + expect(consumeAdjustedParamsForToolCall(sharedToolCallId, "run-a")).toEqual({ + path: "/tmp/a.txt", + marker: "A", + }); + expect(consumeAdjustedParamsForToolCall(sharedToolCallId, "run-b")).toEqual({ + path: "/tmp/b.txt", + marker: "B", + }); + expect(consumeAdjustedParamsForToolCall(sharedToolCallId, "run-a")).toBeUndefined(); + }); }); describe("before_tool_call hook deduplication (#15502)", () => { diff --git a/src/agents/pi-tools.before-tool-call.ts b/src/agents/pi-tools.before-tool-call.ts index 45e48df02eb..c1435c92de8 100644 --- a/src/agents/pi-tools.before-tool-call.ts +++ b/src/agents/pi-tools.before-tool-call.ts @@ -11,6 +11,7 @@ export type HookContext = { sessionKey?: string; /** Ephemeral session UUID — regenerated on /new and /reset. */ sessionId?: string; + runId?: string; loopDetection?: ToolLoopDetectionConfig; }; @@ -23,6 +24,13 @@ const MAX_TRACKED_ADJUSTED_PARAMS = 1024; const LOOP_WARNING_BUCKET_SIZE = 10; const MAX_LOOP_WARNING_KEYS = 256; +function buildAdjustedParamsKey(params: { runId?: string; toolCallId: string }): string { + if (params.runId && params.runId.trim()) { + return `${params.runId}:${params.toolCallId}`; + } + return params.toolCallId; +} + function shouldEmitLoopWarning(state: SessionState, warningKey: string, count: number): boolean { if (!state.toolLoopWarningBuckets) { state.toolLoopWarningBuckets = new Map(); @@ -141,17 +149,22 @@ export async function runBeforeToolCallHook(args: { try { const normalizedParams = isPlainObject(params) ? params : {}; + const toolContext = { + toolName, + ...(args.ctx?.agentId ? { agentId: args.ctx.agentId } : {}), + ...(args.ctx?.sessionKey ? { sessionKey: args.ctx.sessionKey } : {}), + ...(args.ctx?.sessionId ? { sessionId: args.ctx.sessionId } : {}), + ...(args.ctx?.runId ? { runId: args.ctx.runId } : {}), + ...(args.toolCallId ? { toolCallId: args.toolCallId } : {}), + }; const hookResult = await hookRunner.runBeforeToolCall( { toolName, params: normalizedParams, + ...(args.ctx?.runId ? { runId: args.ctx.runId } : {}), + ...(args.toolCallId ? { toolCallId: args.toolCallId } : {}), }, - { - toolName, - agentId: args.ctx?.agentId, - sessionKey: args.ctx?.sessionKey, - sessionId: args.ctx?.sessionId, - }, + toolContext, ); if (hookResult?.block) { @@ -197,7 +210,8 @@ export function wrapToolWithBeforeToolCallHook( throw new Error(outcome.reason); } if (toolCallId) { - adjustedParamsByToolCallId.set(toolCallId, outcome.params); + const adjustedParamsKey = buildAdjustedParamsKey({ runId: ctx?.runId, toolCallId }); + adjustedParamsByToolCallId.set(adjustedParamsKey, outcome.params); if (adjustedParamsByToolCallId.size > MAX_TRACKED_ADJUSTED_PARAMS) { const oldest = adjustedParamsByToolCallId.keys().next().value; if (oldest) { @@ -240,14 +254,16 @@ export function isToolWrappedWithBeforeToolCallHook(tool: AnyAgentTool): boolean return taggedTool[BEFORE_TOOL_CALL_WRAPPED] === true; } -export function consumeAdjustedParamsForToolCall(toolCallId: string): unknown { - const params = adjustedParamsByToolCallId.get(toolCallId); - adjustedParamsByToolCallId.delete(toolCallId); +export function consumeAdjustedParamsForToolCall(toolCallId: string, runId?: string): unknown { + const adjustedParamsKey = buildAdjustedParamsKey({ runId, toolCallId }); + const params = adjustedParamsByToolCallId.get(adjustedParamsKey); + adjustedParamsByToolCallId.delete(adjustedParamsKey); return params; } export const __testing = { BEFORE_TOOL_CALL_WRAPPED, + buildAdjustedParamsKey, adjustedParamsByToolCallId, runBeforeToolCallHook, isPlainObject, diff --git a/src/agents/pi-tools.ts b/src/agents/pi-tools.ts index 3f038e9aadf..7d6fdf1c140 100644 --- a/src/agents/pi-tools.ts +++ b/src/agents/pi-tools.ts @@ -190,6 +190,8 @@ export function createOpenClawCodingTools(options?: { sessionKey?: string; /** Ephemeral session UUID — regenerated on /new and /reset. */ sessionId?: string; + /** Stable run identifier for this agent invocation. */ + runId?: string; agentDir?: string; workspaceDir?: string; config?: OpenClawConfig; @@ -537,6 +539,7 @@ export function createOpenClawCodingTools(options?: { agentId, sessionKey: options?.sessionKey, sessionId: options?.sessionId, + runId: options?.runId, loopDetection: resolveToolLoopDetectionConfig({ cfg: options?.config, agentId }), }), ); diff --git a/src/plugins/types.ts b/src/plugins/types.ts index 96fd41f555e..2dbb0a23bfc 100644 --- a/src/plugins/types.ts +++ b/src/plugins/types.ts @@ -486,13 +486,21 @@ export type PluginHookToolContext = { sessionKey?: string; /** Ephemeral session UUID — regenerated on /new and /reset. */ sessionId?: string; + /** Stable run identifier for this agent invocation. */ + runId?: string; toolName: string; + /** Provider-specific tool call ID when available. */ + toolCallId?: string; }; // before_tool_call hook export type PluginHookBeforeToolCallEvent = { toolName: string; params: Record; + /** Stable run identifier for this agent invocation. */ + runId?: string; + /** Provider-specific tool call ID when available. */ + toolCallId?: string; }; export type PluginHookBeforeToolCallResult = { @@ -505,6 +513,10 @@ export type PluginHookBeforeToolCallResult = { export type PluginHookAfterToolCallEvent = { toolName: string; params: Record; + /** Stable run identifier for this agent invocation. */ + runId?: string; + /** Provider-specific tool call ID when available. */ + toolCallId?: string; result?: unknown; error?: string; durationMs?: number; diff --git a/src/plugins/wired-hooks-after-tool-call.e2e.test.ts b/src/plugins/wired-hooks-after-tool-call.e2e.test.ts index a84c1ad492e..ad04cd80f44 100644 --- a/src/plugins/wired-hooks-after-tool-call.e2e.test.ts +++ b/src/plugins/wired-hooks-after-tool-call.e2e.test.ts @@ -115,10 +115,24 @@ describe("after_tool_call hook wiring", () => { const firstCall = (hookMocks.runner.runAfterToolCall as ReturnType).mock.calls[0]; expect(firstCall).toBeDefined(); const event = firstCall?.[0] as - | { toolName?: string; params?: unknown; error?: unknown; durationMs?: unknown } + | { + toolName?: string; + params?: unknown; + error?: unknown; + durationMs?: unknown; + runId?: string; + toolCallId?: string; + } | undefined; const context = firstCall?.[1] as - | { toolName?: string; agentId?: string; sessionKey?: string; sessionId?: string } + | { + toolName?: string; + agentId?: string; + sessionKey?: string; + sessionId?: string; + runId?: string; + toolCallId?: string; + } | undefined; expect(event).toBeDefined(); expect(context).toBeDefined(); @@ -129,10 +143,14 @@ describe("after_tool_call hook wiring", () => { expect(event.params).toEqual({ path: "/tmp/file.txt" }); expect(event.error).toBeUndefined(); expect(typeof event.durationMs).toBe("number"); + expect(event.runId).toBe("test-run-1"); + expect(event.toolCallId).toBe("wired-hook-call-1"); expect(context.toolName).toBe("read"); expect(context.agentId).toBe("main"); expect(context.sessionKey).toBe("test-session"); expect(context.sessionId).toBe("test-ephemeral-session"); + expect(context.runId).toBe("test-run-1"); + expect(context.toolCallId).toBe("wired-hook-call-1"); }); it("includes error in after_tool_call event on tool failure", async () => { @@ -195,4 +213,74 @@ describe("after_tool_call hook wiring", () => { expect(hookMocks.runner.runAfterToolCall).not.toHaveBeenCalled(); }); + + it("keeps start args isolated per run when toolCallId collides", async () => { + hookMocks.runner.hasHooks.mockReturnValue(true); + const sharedToolCallId = "shared-tool-call-id"; + + const ctxA = createToolHandlerCtx({ + runId: "run-a", + sessionKey: "session-a", + sessionId: "ephemeral-a", + agentId: "agent-a", + }); + const ctxB = createToolHandlerCtx({ + runId: "run-b", + sessionKey: "session-b", + sessionId: "ephemeral-b", + agentId: "agent-b", + }); + + await handleToolExecutionStart( + ctxA as never, + { + type: "tool_execution_start", + toolName: "read", + toolCallId: sharedToolCallId, + args: { path: "/tmp/path-a.txt" }, + } as never, + ); + await handleToolExecutionStart( + ctxB as never, + { + type: "tool_execution_start", + toolName: "read", + toolCallId: sharedToolCallId, + args: { path: "/tmp/path-b.txt" }, + } as never, + ); + + await handleToolExecutionEnd( + ctxA as never, + { + type: "tool_execution_end", + toolName: "read", + toolCallId: sharedToolCallId, + isError: false, + result: { content: [{ type: "text", text: "done-a" }] }, + } as never, + ); + await handleToolExecutionEnd( + ctxB as never, + { + type: "tool_execution_end", + toolName: "read", + toolCallId: sharedToolCallId, + isError: false, + result: { content: [{ type: "text", text: "done-b" }] }, + } as never, + ); + + expect(hookMocks.runner.runAfterToolCall).toHaveBeenCalledTimes(2); + + const callA = (hookMocks.runner.runAfterToolCall as ReturnType).mock.calls[0]; + const callB = (hookMocks.runner.runAfterToolCall as ReturnType).mock.calls[1]; + const eventA = callA?.[0] as { params?: unknown; runId?: string } | undefined; + const eventB = callB?.[0] as { params?: unknown; runId?: string } | undefined; + + expect(eventA?.runId).toBe("run-a"); + expect(eventA?.params).toEqual({ path: "/tmp/path-a.txt" }); + expect(eventB?.runId).toBe("run-b"); + expect(eventB?.params).toEqual({ path: "/tmp/path-b.txt" }); + }); });