From ccbeb332e096c1d0f64321379886bc2ebbe1d2ac Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 24 Feb 2026 23:13:51 +0000 Subject: [PATCH] fix: harden routing/session isolation for followups and heartbeat --- CHANGELOG.md | 3 + .../reply/agent-runner-payloads.test.ts | 14 +++++ src/auto-reply/reply/agent-runner-payloads.ts | 3 +- .../reply/agent-runner-utils.test.ts | 18 ++++++ src/auto-reply/reply/agent-runner-utils.ts | 5 +- src/auto-reply/reply/agent-runner.ts | 1 + src/auto-reply/reply/followup-runner.test.ts | 59 ++++++++++++++++++- src/auto-reply/reply/followup-runner.ts | 11 ++-- src/auto-reply/reply/get-reply-run.ts | 5 +- src/auto-reply/reply/queue/drain.ts | 6 +- src/auto-reply/reply/reply-flow.test.ts | 45 ++++++++++++++ ...tbeat-runner.returns-default-unset.test.ts | 2 + src/infra/heartbeat-runner.ts | 4 ++ src/infra/outbound/targets.test.ts | 43 +++++++++++++- src/infra/outbound/targets.ts | 5 +- 15 files changed, 209 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e352a5b6f22..fcbd4cb1e56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,9 @@ Docs: https://docs.openclaw.ai ### Fixes +- Routing/Session isolation: harden followup routing so explicit cross-channel origin replies never fall back to the active dispatcher on route failure, preserve queued overflow summary routing metadata (`channel`/`to`/`thread`) across followup drain, and prefer originating channel context over internal provider tags for embedded followup runs. This prevents webchat/control-ui context from hijacking Discord-targeted replies in shared sessions. (#25864) Thanks @Gamedesigner. +- Messaging tool dedupe: treat originating channel metadata as authoritative for same-target `message.send` suppression in proactive runs (heartbeat/cron/exec-event), including synthetic-provider contexts, so `delivery-mirror` transcript entries no longer cause duplicate Telegram sends. (#25835) Thanks @jadeathena84-arch. +- Cron/Heartbeat delivery: stop inheriting cached session `lastThreadId` for heartbeat-mode target resolution unless a thread/topic is explicitly requested, so announce-mode cron and heartbeat deliveries stay on top-level destinations instead of leaking into active conversation threads. (#25730) Thanks @markshields-tl. - Security/Sandbox media: restrict sandbox media tmp-path allowances to OpenClaw-managed tmp roots instead of broad host `os.tmpdir()` trust, and add outbound/channel guardrails (tmp-path lint + media-root smoke tests) to prevent regressions in local media attachment reads. - Config/Plugins: treat stale removed `google-antigravity-auth` plugin references as compatibility warnings (not hard validation errors) across `plugins.entries`, `plugins.allow`, `plugins.deny`, and `plugins.slots.memory`, so startup no longer fails after antigravity removal. (#25538, #25862) Thanks @chilu18. - Security/Message actions: enforce local media root checks for `sendAttachment` and `setGroupIcon` when `sandboxRoot` is unset, preventing attachment hydration from reading arbitrary host files via local absolute paths. This ships in the next npm release. Thanks @GCXWLP for reporting. diff --git a/src/auto-reply/reply/agent-runner-payloads.test.ts b/src/auto-reply/reply/agent-runner-payloads.test.ts index 88f7d41a4c9..40d0ae72ad3 100644 --- a/src/auto-reply/reply/agent-runner-payloads.test.ts +++ b/src/auto-reply/reply/agent-runner-payloads.test.ts @@ -71,4 +71,18 @@ describe("buildReplyPayloads media filter integration", () => { expect(replyPayloads).toHaveLength(1); expect(replyPayloads[0]?.mediaUrl).toBe("file:///tmp/photo.jpg"); }); + + it("suppresses same-target replies when messageProvider is synthetic but originatingChannel is set", () => { + const { replyPayloads } = buildReplyPayloads({ + ...baseParams, + payloads: [{ text: "hello world!" }], + messageProvider: "heartbeat", + originatingChannel: "telegram", + originatingTo: "268300329", + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }], + }); + + expect(replyPayloads).toHaveLength(0); + }); }); diff --git a/src/auto-reply/reply/agent-runner-payloads.ts b/src/auto-reply/reply/agent-runner-payloads.ts index a1de8c1d163..6aaa93aa633 100644 --- a/src/auto-reply/reply/agent-runner-payloads.ts +++ b/src/auto-reply/reply/agent-runner-payloads.ts @@ -32,6 +32,7 @@ export function buildReplyPayloads(params: { messagingToolSentTargets?: Parameters< typeof shouldSuppressMessagingToolReplies >[0]["messagingToolSentTargets"]; + originatingChannel?: OriginatingChannelType; originatingTo?: string; accountId?: string; }): { replyPayloads: ReplyPayload[]; didLogHeartbeatStrip: boolean } { @@ -86,7 +87,7 @@ export function buildReplyPayloads(params: { const messagingToolSentTexts = params.messagingToolSentTexts ?? []; const messagingToolSentTargets = params.messagingToolSentTargets ?? []; const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({ - messageProvider: params.messageProvider, + messageProvider: params.originatingChannel ?? params.messageProvider, messagingToolSentTargets, originatingTo: params.originatingTo, accountId: params.accountId, diff --git a/src/auto-reply/reply/agent-runner-utils.test.ts b/src/auto-reply/reply/agent-runner-utils.test.ts index 0650f5d6520..397cefcb82a 100644 --- a/src/auto-reply/reply/agent-runner-utils.test.ts +++ b/src/auto-reply/reply/agent-runner-utils.test.ts @@ -149,4 +149,22 @@ describe("agent-runner-utils", () => { senderE164: undefined, }); }); + + it("prefers OriginatingChannel over Provider for messageProvider", () => { + const run = makeRun(); + + const resolved = buildEmbeddedRunContexts({ + run, + sessionCtx: { + Provider: "heartbeat", + OriginatingChannel: "Telegram", + OriginatingTo: "268300329", + }, + hasRepliedRef: undefined, + provider: "openai", + }); + + expect(resolved.embeddedContext.messageProvider).toBe("telegram"); + expect(resolved.embeddedContext.messageTo).toBe("268300329"); + }); }); diff --git a/src/auto-reply/reply/agent-runner-utils.ts b/src/auto-reply/reply/agent-runner-utils.ts index 58cf1951227..c3d09877a4d 100644 --- a/src/auto-reply/reply/agent-runner-utils.ts +++ b/src/auto-reply/reply/agent-runner-utils.ts @@ -196,7 +196,10 @@ export function buildEmbeddedContextFromTemplate(params: { sessionId: params.run.sessionId, sessionKey: params.run.sessionKey, agentId: params.run.agentId, - messageProvider: params.sessionCtx.Provider?.trim().toLowerCase() || undefined, + messageProvider: + params.sessionCtx.OriginatingChannel?.trim().toLowerCase() || + params.sessionCtx.Provider?.trim().toLowerCase() || + undefined, agentAccountId: params.sessionCtx.AccountId, messageTo: params.sessionCtx.OriginatingTo ?? params.sessionCtx.To, messageThreadId: params.sessionCtx.MessageThreadId ?? undefined, diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index b00dcd969f8..e3f47246e7c 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -514,6 +514,7 @@ export async function runReplyAgent(params: { messagingToolSentTexts: runResult.messagingToolSentTexts, messagingToolSentMediaUrls: runResult.messagingToolSentMediaUrls, messagingToolSentTargets: runResult.messagingToolSentTargets, + originatingChannel: sessionCtx.OriginatingChannel, originatingTo: sessionCtx.OriginatingTo ?? sessionCtx.To, accountId: sessionCtx.AccountId, }); diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 0da9b1ff76d..a77bb0be44e 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -1,12 +1,13 @@ import fs from "node:fs/promises"; import { tmpdir } from "node:os"; import path from "node:path"; -import { describe, expect, it, vi } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; import { loadSessionStore, saveSessionStore, type SessionEntry } from "../../config/sessions.js"; import type { FollowupRun } from "./queue.js"; import { createMockTypingController } from "./test-helpers.js"; const runEmbeddedPiAgentMock = vi.fn(); +const routeReplyMock = vi.fn(); vi.mock( "../../agents/model-fallback.js", @@ -17,8 +18,21 @@ vi.mock("../../agents/pi-embedded.js", () => ({ runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params), })); +vi.mock("./route-reply.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + routeReply: (...args: unknown[]) => routeReplyMock(...args), + }; +}); + import { createFollowupRunner } from "./followup-runner.js"; +beforeEach(() => { + routeReplyMock.mockReset(); + routeReplyMock.mockResolvedValue({ ok: true }); +}); + const baseQueuedRun = (messageProvider = "whatsapp"): FollowupRun => ({ prompt: "hello", @@ -204,6 +218,26 @@ describe("createFollowupRunner messaging tool dedupe", () => { expect(onBlockReply).not.toHaveBeenCalled(); }); + it("suppresses replies when provider is synthetic but originating channel matches", async () => { + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + messagingToolSentTexts: ["different message"], + messagingToolSentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }], + meta: {}, + }); + + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("heartbeat"), + originatingChannel: "telegram", + originatingTo: "268300329", + } as FollowupRun); + + expect(onBlockReply).not.toHaveBeenCalled(); + }); + it("drops media URL from payload when messaging tool already sent it", async () => { const onBlockReply = vi.fn(async () => {}); runEmbeddedPiAgentMock.mockResolvedValueOnce({ @@ -278,6 +312,29 @@ describe("createFollowupRunner messaging tool dedupe", () => { expect(store[sessionKey]?.inputTokens).toBe(1_000); expect(store[sessionKey]?.outputTokens).toBe(50); }); + + it("does not fall back to dispatcher when explicit origin routing fails", async () => { + const onBlockReply = vi.fn(async () => {}); + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "hello world!" }], + meta: {}, + }); + routeReplyMock.mockResolvedValueOnce({ + ok: false, + error: "forced route failure", + }); + + const runner = createMessagingDedupeRunner(onBlockReply); + + await runner({ + ...baseQueuedRun("webchat"), + originatingChannel: "discord", + originatingTo: "channel:C1", + } as FollowupRun); + + expect(routeReplyMock).toHaveBeenCalled(); + expect(onBlockReply).not.toHaveBeenCalled(); + }); }); describe("createFollowupRunner agentDir forwarding", () => { diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index cdae8d014af..5c0ec491f56 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -98,13 +98,10 @@ export function createFollowupRunner(params: { cfg: queued.run.config, }); if (!result.ok) { - // Log error and fall back to dispatcher if available. + // Keep origin isolation strict: do not fall back to the current + // dispatcher when explicit origin routing failed. const errorMsg = result.error ?? "unknown error"; logVerbose(`followup queue: route-reply failed: ${errorMsg}`); - // Fallback: try the dispatcher if routing failed. - if (opts?.onBlockReply) { - await opts.onBlockReply(payload); - } } } else if (opts?.onBlockReply) { await opts.onBlockReply(payload); @@ -259,10 +256,10 @@ export function createFollowupRunner(params: { sentMediaUrls: runResult.messagingToolSentMediaUrls ?? [], }); const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({ - messageProvider: queued.run.messageProvider, + messageProvider: queued.originatingChannel ?? queued.run.messageProvider, messagingToolSentTargets: runResult.messagingToolSentTargets, originatingTo: queued.originatingTo, - accountId: queued.run.agentAccountId, + accountId: queued.originatingAccountId ?? queued.run.agentAccountId, }); const finalPayloads = suppressMessagingToolReplies ? [] : mediaFilteredPayloads; diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index 85f657b4815..edcf15b31c7 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -460,7 +460,10 @@ export async function runPreparedReply( agentDir, sessionId: sessionIdFinal, sessionKey, - messageProvider: sessionCtx.Provider?.trim().toLowerCase() || undefined, + messageProvider: + sessionCtx.OriginatingChannel?.trim().toLowerCase() || + sessionCtx.Provider?.trim().toLowerCase() || + undefined, agentAccountId: sessionCtx.AccountId, groupId: resolveGroupSessionKey(sessionCtx)?.id ?? undefined, groupChannel: sessionCtx.GroupChannel?.trim() ?? sessionCtx.GroupSubject?.trim(), diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index 75e6ffa07d8..b37c2b01f15 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -111,11 +111,15 @@ export function scheduleFollowupDrain( break; } if ( - !(await drainNextQueueItem(queue.items, async () => { + !(await drainNextQueueItem(queue.items, async (item) => { await runFollowup({ prompt: summaryPrompt, run, enqueuedAt: Date.now(), + originatingChannel: item.originatingChannel, + originatingTo: item.originatingTo, + originatingAccountId: item.originatingAccountId, + originatingThreadId: item.originatingThreadId, }); })) ) { diff --git a/src/auto-reply/reply/reply-flow.test.ts b/src/auto-reply/reply/reply-flow.test.ts index 3f79e3e6803..3b91cf52d40 100644 --- a/src/auto-reply/reply/reply-flow.test.ts +++ b/src/auto-reply/reply/reply-flow.test.ts @@ -1046,6 +1046,51 @@ describe("followup queue collect routing", () => { expect(calls[0]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap."); expect(calls[0]?.prompt).toContain("- first"); }); + + it("preserves routing metadata on overflow summary followups", async () => { + const key = `test-overflow-summary-routing-${Date.now()}`; + const calls: FollowupRun[] = []; + const done = createDeferred(); + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + done.resolve(); + }; + const settings: QueueSettings = { + mode: "followup", + debounceMs: 0, + cap: 1, + dropPolicy: "summarize", + }; + + enqueueFollowupRun( + key, + createRun({ + prompt: "first", + originatingChannel: "discord", + originatingTo: "channel:C1", + originatingThreadId: "1739142736.000100", + }), + settings, + ); + enqueueFollowupRun( + key, + createRun({ + prompt: "second", + originatingChannel: "discord", + originatingTo: "channel:C1", + originatingThreadId: "1739142736.000100", + }), + settings, + ); + + scheduleFollowupDrain(key, runFollowup); + await done.promise; + + expect(calls[0]?.originatingChannel).toBe("discord"); + expect(calls[0]?.originatingTo).toBe("channel:C1"); + expect(calls[0]?.originatingThreadId).toBe("1739142736.000100"); + expect(calls[0]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap."); + }); }); const emptyCfg = {} as OpenClawConfig; diff --git a/src/infra/heartbeat-runner.returns-default-unset.test.ts b/src/infra/heartbeat-runner.returns-default-unset.test.ts index d0d34a7bd75..908f45ebb52 100644 --- a/src/infra/heartbeat-runner.returns-default-unset.test.ts +++ b/src/infra/heartbeat-runner.returns-default-unset.test.ts @@ -591,6 +591,8 @@ describe("runHeartbeatOnce", () => { SessionKey: sessionKey, From: "+1555", To: "+1555", + OriginatingChannel: "whatsapp", + OriginatingTo: "+1555", Provider: "heartbeat", }), expect.objectContaining({ isHeartbeat: true, suppressToolErrorWarnings: false }), diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index a34ccfdb7e3..ad2c091f156 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -663,6 +663,10 @@ export async function runHeartbeatOnce(opts: { Body: appendCronStyleCurrentTimeLine(prompt, cfg, startedAt), From: sender, To: sender, + OriginatingChannel: delivery.channel !== "none" ? delivery.channel : undefined, + OriginatingTo: delivery.to, + AccountId: delivery.accountId, + MessageThreadId: delivery.threadId, Provider: hasExecCompletion ? "exec-event" : hasCronEvents ? "cron-event" : "heartbeat", SessionKey: sessionKey, }; diff --git a/src/infra/outbound/targets.test.ts b/src/infra/outbound/targets.test.ts index ac9fa08b1e7..9d7ec7dde5e 100644 --- a/src/infra/outbound/targets.test.ts +++ b/src/infra/outbound/targets.test.ts @@ -1,6 +1,10 @@ import { describe, expect, it } from "vitest"; import type { OpenClawConfig } from "../../config/config.js"; -import { resolveOutboundTarget, resolveSessionDeliveryTarget } from "./targets.js"; +import { + resolveHeartbeatDeliveryTarget, + resolveOutboundTarget, + resolveSessionDeliveryTarget, +} from "./targets.js"; import { installResolveOutboundTargetPluginRegistryHooks, runResolveOutboundTargetCoreTests, @@ -175,6 +179,22 @@ describe("resolveSessionDeliveryTarget", () => { expect(resolved.threadId).toBe(999); }); + it("does not inherit lastThreadId in heartbeat mode", () => { + const resolved = resolveSessionDeliveryTarget({ + entry: { + sessionId: "sess-heartbeat-thread", + updatedAt: 1, + lastChannel: "slack", + lastTo: "user:U123", + lastThreadId: "1739142736.000100", + }, + requestedChannel: "last", + mode: "heartbeat", + }); + + expect(resolved.threadId).toBeUndefined(); + }); + it("falls back to a provided channel when requested is unsupported", () => { const resolved = resolveSessionDeliveryTarget({ entry: { @@ -280,4 +300,25 @@ describe("resolveSessionDeliveryTarget", () => { expect(resolved.threadId).toBe(42); expect(resolved.to).toBe("63448508"); }); + + it("does not return inherited threadId from resolveHeartbeatDeliveryTarget", () => { + const cfg: OpenClawConfig = {}; + const resolved = resolveHeartbeatDeliveryTarget({ + cfg, + entry: { + sessionId: "sess-heartbeat-outbound", + updatedAt: 1, + lastChannel: "slack", + lastTo: "user:U123", + lastThreadId: "1739142736.000100", + }, + heartbeat: { + target: "last", + }, + }); + + expect(resolved.channel).toBe("slack"); + expect(resolved.to).toBe("user:U123"); + expect(resolved.threadId).toBeUndefined(); + }); }); diff --git a/src/infra/outbound/targets.ts b/src/infra/outbound/targets.ts index 608e62c6005..8a33353bb5a 100644 --- a/src/infra/outbound/targets.ts +++ b/src/infra/outbound/targets.ts @@ -115,9 +115,10 @@ export function resolveSessionDeliveryTarget(params: { } } - const accountId = channel && channel === lastChannel ? lastAccountId : undefined; - const threadId = channel && channel === lastChannel ? lastThreadId : undefined; const mode = params.mode ?? (explicitTo ? "explicit" : "implicit"); + const accountId = channel && channel === lastChannel ? lastAccountId : undefined; + const threadId = + mode !== "heartbeat" && channel && channel === lastChannel ? lastThreadId : undefined; const resolvedThreadId = explicitThreadId ?? threadId; return {