mirror of
https://github.com/moltbot/moltbot.git
synced 2026-03-08 06:54:24 +00:00
fix: harden routing/session isolation for followups and heartbeat
This commit is contained in:
@@ -11,6 +11,9 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Routing/Session isolation: harden followup routing so explicit cross-channel origin replies never fall back to the active dispatcher on route failure, preserve queued overflow summary routing metadata (`channel`/`to`/`thread`) across followup drain, and prefer originating channel context over internal provider tags for embedded followup runs. This prevents webchat/control-ui context from hijacking Discord-targeted replies in shared sessions. (#25864) Thanks @Gamedesigner.
|
||||
- Messaging tool dedupe: treat originating channel metadata as authoritative for same-target `message.send` suppression in proactive runs (heartbeat/cron/exec-event), including synthetic-provider contexts, so `delivery-mirror` transcript entries no longer cause duplicate Telegram sends. (#25835) Thanks @jadeathena84-arch.
|
||||
- Cron/Heartbeat delivery: stop inheriting cached session `lastThreadId` for heartbeat-mode target resolution unless a thread/topic is explicitly requested, so announce-mode cron and heartbeat deliveries stay on top-level destinations instead of leaking into active conversation threads. (#25730) Thanks @markshields-tl.
|
||||
- Security/Sandbox media: restrict sandbox media tmp-path allowances to OpenClaw-managed tmp roots instead of broad host `os.tmpdir()` trust, and add outbound/channel guardrails (tmp-path lint + media-root smoke tests) to prevent regressions in local media attachment reads.
|
||||
- Config/Plugins: treat stale removed `google-antigravity-auth` plugin references as compatibility warnings (not hard validation errors) across `plugins.entries`, `plugins.allow`, `plugins.deny`, and `plugins.slots.memory`, so startup no longer fails after antigravity removal. (#25538, #25862) Thanks @chilu18.
|
||||
- Security/Message actions: enforce local media root checks for `sendAttachment` and `setGroupIcon` when `sandboxRoot` is unset, preventing attachment hydration from reading arbitrary host files via local absolute paths. This ships in the next npm release. Thanks @GCXWLP for reporting.
|
||||
|
||||
@@ -71,4 +71,18 @@ describe("buildReplyPayloads media filter integration", () => {
|
||||
expect(replyPayloads).toHaveLength(1);
|
||||
expect(replyPayloads[0]?.mediaUrl).toBe("file:///tmp/photo.jpg");
|
||||
});
|
||||
|
||||
it("suppresses same-target replies when messageProvider is synthetic but originatingChannel is set", () => {
|
||||
const { replyPayloads } = buildReplyPayloads({
|
||||
...baseParams,
|
||||
payloads: [{ text: "hello world!" }],
|
||||
messageProvider: "heartbeat",
|
||||
originatingChannel: "telegram",
|
||||
originatingTo: "268300329",
|
||||
messagingToolSentTexts: ["different message"],
|
||||
messagingToolSentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }],
|
||||
});
|
||||
|
||||
expect(replyPayloads).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -32,6 +32,7 @@ export function buildReplyPayloads(params: {
|
||||
messagingToolSentTargets?: Parameters<
|
||||
typeof shouldSuppressMessagingToolReplies
|
||||
>[0]["messagingToolSentTargets"];
|
||||
originatingChannel?: OriginatingChannelType;
|
||||
originatingTo?: string;
|
||||
accountId?: string;
|
||||
}): { replyPayloads: ReplyPayload[]; didLogHeartbeatStrip: boolean } {
|
||||
@@ -86,7 +87,7 @@ export function buildReplyPayloads(params: {
|
||||
const messagingToolSentTexts = params.messagingToolSentTexts ?? [];
|
||||
const messagingToolSentTargets = params.messagingToolSentTargets ?? [];
|
||||
const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({
|
||||
messageProvider: params.messageProvider,
|
||||
messageProvider: params.originatingChannel ?? params.messageProvider,
|
||||
messagingToolSentTargets,
|
||||
originatingTo: params.originatingTo,
|
||||
accountId: params.accountId,
|
||||
|
||||
@@ -149,4 +149,22 @@ describe("agent-runner-utils", () => {
|
||||
senderE164: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it("prefers OriginatingChannel over Provider for messageProvider", () => {
|
||||
const run = makeRun();
|
||||
|
||||
const resolved = buildEmbeddedRunContexts({
|
||||
run,
|
||||
sessionCtx: {
|
||||
Provider: "heartbeat",
|
||||
OriginatingChannel: "Telegram",
|
||||
OriginatingTo: "268300329",
|
||||
},
|
||||
hasRepliedRef: undefined,
|
||||
provider: "openai",
|
||||
});
|
||||
|
||||
expect(resolved.embeddedContext.messageProvider).toBe("telegram");
|
||||
expect(resolved.embeddedContext.messageTo).toBe("268300329");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -196,7 +196,10 @@ export function buildEmbeddedContextFromTemplate(params: {
|
||||
sessionId: params.run.sessionId,
|
||||
sessionKey: params.run.sessionKey,
|
||||
agentId: params.run.agentId,
|
||||
messageProvider: params.sessionCtx.Provider?.trim().toLowerCase() || undefined,
|
||||
messageProvider:
|
||||
params.sessionCtx.OriginatingChannel?.trim().toLowerCase() ||
|
||||
params.sessionCtx.Provider?.trim().toLowerCase() ||
|
||||
undefined,
|
||||
agentAccountId: params.sessionCtx.AccountId,
|
||||
messageTo: params.sessionCtx.OriginatingTo ?? params.sessionCtx.To,
|
||||
messageThreadId: params.sessionCtx.MessageThreadId ?? undefined,
|
||||
|
||||
@@ -514,6 +514,7 @@ export async function runReplyAgent(params: {
|
||||
messagingToolSentTexts: runResult.messagingToolSentTexts,
|
||||
messagingToolSentMediaUrls: runResult.messagingToolSentMediaUrls,
|
||||
messagingToolSentTargets: runResult.messagingToolSentTargets,
|
||||
originatingChannel: sessionCtx.OriginatingChannel,
|
||||
originatingTo: sessionCtx.OriginatingTo ?? sessionCtx.To,
|
||||
accountId: sessionCtx.AccountId,
|
||||
});
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
import fs from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { loadSessionStore, saveSessionStore, type SessionEntry } from "../../config/sessions.js";
|
||||
import type { FollowupRun } from "./queue.js";
|
||||
import { createMockTypingController } from "./test-helpers.js";
|
||||
|
||||
const runEmbeddedPiAgentMock = vi.fn();
|
||||
const routeReplyMock = vi.fn();
|
||||
|
||||
vi.mock(
|
||||
"../../agents/model-fallback.js",
|
||||
@@ -17,8 +18,21 @@ vi.mock("../../agents/pi-embedded.js", () => ({
|
||||
runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params),
|
||||
}));
|
||||
|
||||
vi.mock("./route-reply.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("./route-reply.js")>();
|
||||
return {
|
||||
...actual,
|
||||
routeReply: (...args: unknown[]) => routeReplyMock(...args),
|
||||
};
|
||||
});
|
||||
|
||||
import { createFollowupRunner } from "./followup-runner.js";
|
||||
|
||||
beforeEach(() => {
|
||||
routeReplyMock.mockReset();
|
||||
routeReplyMock.mockResolvedValue({ ok: true });
|
||||
});
|
||||
|
||||
const baseQueuedRun = (messageProvider = "whatsapp"): FollowupRun =>
|
||||
({
|
||||
prompt: "hello",
|
||||
@@ -204,6 +218,26 @@ describe("createFollowupRunner messaging tool dedupe", () => {
|
||||
expect(onBlockReply).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("suppresses replies when provider is synthetic but originating channel matches", async () => {
|
||||
const onBlockReply = vi.fn(async () => {});
|
||||
runEmbeddedPiAgentMock.mockResolvedValueOnce({
|
||||
payloads: [{ text: "hello world!" }],
|
||||
messagingToolSentTexts: ["different message"],
|
||||
messagingToolSentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }],
|
||||
meta: {},
|
||||
});
|
||||
|
||||
const runner = createMessagingDedupeRunner(onBlockReply);
|
||||
|
||||
await runner({
|
||||
...baseQueuedRun("heartbeat"),
|
||||
originatingChannel: "telegram",
|
||||
originatingTo: "268300329",
|
||||
} as FollowupRun);
|
||||
|
||||
expect(onBlockReply).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("drops media URL from payload when messaging tool already sent it", async () => {
|
||||
const onBlockReply = vi.fn(async () => {});
|
||||
runEmbeddedPiAgentMock.mockResolvedValueOnce({
|
||||
@@ -278,6 +312,29 @@ describe("createFollowupRunner messaging tool dedupe", () => {
|
||||
expect(store[sessionKey]?.inputTokens).toBe(1_000);
|
||||
expect(store[sessionKey]?.outputTokens).toBe(50);
|
||||
});
|
||||
|
||||
it("does not fall back to dispatcher when explicit origin routing fails", async () => {
|
||||
const onBlockReply = vi.fn(async () => {});
|
||||
runEmbeddedPiAgentMock.mockResolvedValueOnce({
|
||||
payloads: [{ text: "hello world!" }],
|
||||
meta: {},
|
||||
});
|
||||
routeReplyMock.mockResolvedValueOnce({
|
||||
ok: false,
|
||||
error: "forced route failure",
|
||||
});
|
||||
|
||||
const runner = createMessagingDedupeRunner(onBlockReply);
|
||||
|
||||
await runner({
|
||||
...baseQueuedRun("webchat"),
|
||||
originatingChannel: "discord",
|
||||
originatingTo: "channel:C1",
|
||||
} as FollowupRun);
|
||||
|
||||
expect(routeReplyMock).toHaveBeenCalled();
|
||||
expect(onBlockReply).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe("createFollowupRunner agentDir forwarding", () => {
|
||||
|
||||
@@ -98,13 +98,10 @@ export function createFollowupRunner(params: {
|
||||
cfg: queued.run.config,
|
||||
});
|
||||
if (!result.ok) {
|
||||
// Log error and fall back to dispatcher if available.
|
||||
// Keep origin isolation strict: do not fall back to the current
|
||||
// dispatcher when explicit origin routing failed.
|
||||
const errorMsg = result.error ?? "unknown error";
|
||||
logVerbose(`followup queue: route-reply failed: ${errorMsg}`);
|
||||
// Fallback: try the dispatcher if routing failed.
|
||||
if (opts?.onBlockReply) {
|
||||
await opts.onBlockReply(payload);
|
||||
}
|
||||
}
|
||||
} else if (opts?.onBlockReply) {
|
||||
await opts.onBlockReply(payload);
|
||||
@@ -259,10 +256,10 @@ export function createFollowupRunner(params: {
|
||||
sentMediaUrls: runResult.messagingToolSentMediaUrls ?? [],
|
||||
});
|
||||
const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({
|
||||
messageProvider: queued.run.messageProvider,
|
||||
messageProvider: queued.originatingChannel ?? queued.run.messageProvider,
|
||||
messagingToolSentTargets: runResult.messagingToolSentTargets,
|
||||
originatingTo: queued.originatingTo,
|
||||
accountId: queued.run.agentAccountId,
|
||||
accountId: queued.originatingAccountId ?? queued.run.agentAccountId,
|
||||
});
|
||||
const finalPayloads = suppressMessagingToolReplies ? [] : mediaFilteredPayloads;
|
||||
|
||||
|
||||
@@ -460,7 +460,10 @@ export async function runPreparedReply(
|
||||
agentDir,
|
||||
sessionId: sessionIdFinal,
|
||||
sessionKey,
|
||||
messageProvider: sessionCtx.Provider?.trim().toLowerCase() || undefined,
|
||||
messageProvider:
|
||||
sessionCtx.OriginatingChannel?.trim().toLowerCase() ||
|
||||
sessionCtx.Provider?.trim().toLowerCase() ||
|
||||
undefined,
|
||||
agentAccountId: sessionCtx.AccountId,
|
||||
groupId: resolveGroupSessionKey(sessionCtx)?.id ?? undefined,
|
||||
groupChannel: sessionCtx.GroupChannel?.trim() ?? sessionCtx.GroupSubject?.trim(),
|
||||
|
||||
@@ -111,11 +111,15 @@ export function scheduleFollowupDrain(
|
||||
break;
|
||||
}
|
||||
if (
|
||||
!(await drainNextQueueItem(queue.items, async () => {
|
||||
!(await drainNextQueueItem(queue.items, async (item) => {
|
||||
await runFollowup({
|
||||
prompt: summaryPrompt,
|
||||
run,
|
||||
enqueuedAt: Date.now(),
|
||||
originatingChannel: item.originatingChannel,
|
||||
originatingTo: item.originatingTo,
|
||||
originatingAccountId: item.originatingAccountId,
|
||||
originatingThreadId: item.originatingThreadId,
|
||||
});
|
||||
}))
|
||||
) {
|
||||
|
||||
@@ -1046,6 +1046,51 @@ describe("followup queue collect routing", () => {
|
||||
expect(calls[0]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap.");
|
||||
expect(calls[0]?.prompt).toContain("- first");
|
||||
});
|
||||
|
||||
it("preserves routing metadata on overflow summary followups", async () => {
|
||||
const key = `test-overflow-summary-routing-${Date.now()}`;
|
||||
const calls: FollowupRun[] = [];
|
||||
const done = createDeferred<void>();
|
||||
const runFollowup = async (run: FollowupRun) => {
|
||||
calls.push(run);
|
||||
done.resolve();
|
||||
};
|
||||
const settings: QueueSettings = {
|
||||
mode: "followup",
|
||||
debounceMs: 0,
|
||||
cap: 1,
|
||||
dropPolicy: "summarize",
|
||||
};
|
||||
|
||||
enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "first",
|
||||
originatingChannel: "discord",
|
||||
originatingTo: "channel:C1",
|
||||
originatingThreadId: "1739142736.000100",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
enqueueFollowupRun(
|
||||
key,
|
||||
createRun({
|
||||
prompt: "second",
|
||||
originatingChannel: "discord",
|
||||
originatingTo: "channel:C1",
|
||||
originatingThreadId: "1739142736.000100",
|
||||
}),
|
||||
settings,
|
||||
);
|
||||
|
||||
scheduleFollowupDrain(key, runFollowup);
|
||||
await done.promise;
|
||||
|
||||
expect(calls[0]?.originatingChannel).toBe("discord");
|
||||
expect(calls[0]?.originatingTo).toBe("channel:C1");
|
||||
expect(calls[0]?.originatingThreadId).toBe("1739142736.000100");
|
||||
expect(calls[0]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap.");
|
||||
});
|
||||
});
|
||||
|
||||
const emptyCfg = {} as OpenClawConfig;
|
||||
|
||||
@@ -591,6 +591,8 @@ describe("runHeartbeatOnce", () => {
|
||||
SessionKey: sessionKey,
|
||||
From: "+1555",
|
||||
To: "+1555",
|
||||
OriginatingChannel: "whatsapp",
|
||||
OriginatingTo: "+1555",
|
||||
Provider: "heartbeat",
|
||||
}),
|
||||
expect.objectContaining({ isHeartbeat: true, suppressToolErrorWarnings: false }),
|
||||
|
||||
@@ -663,6 +663,10 @@ export async function runHeartbeatOnce(opts: {
|
||||
Body: appendCronStyleCurrentTimeLine(prompt, cfg, startedAt),
|
||||
From: sender,
|
||||
To: sender,
|
||||
OriginatingChannel: delivery.channel !== "none" ? delivery.channel : undefined,
|
||||
OriginatingTo: delivery.to,
|
||||
AccountId: delivery.accountId,
|
||||
MessageThreadId: delivery.threadId,
|
||||
Provider: hasExecCompletion ? "exec-event" : hasCronEvents ? "cron-event" : "heartbeat",
|
||||
SessionKey: sessionKey,
|
||||
};
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { resolveOutboundTarget, resolveSessionDeliveryTarget } from "./targets.js";
|
||||
import {
|
||||
resolveHeartbeatDeliveryTarget,
|
||||
resolveOutboundTarget,
|
||||
resolveSessionDeliveryTarget,
|
||||
} from "./targets.js";
|
||||
import {
|
||||
installResolveOutboundTargetPluginRegistryHooks,
|
||||
runResolveOutboundTargetCoreTests,
|
||||
@@ -175,6 +179,22 @@ describe("resolveSessionDeliveryTarget", () => {
|
||||
expect(resolved.threadId).toBe(999);
|
||||
});
|
||||
|
||||
it("does not inherit lastThreadId in heartbeat mode", () => {
|
||||
const resolved = resolveSessionDeliveryTarget({
|
||||
entry: {
|
||||
sessionId: "sess-heartbeat-thread",
|
||||
updatedAt: 1,
|
||||
lastChannel: "slack",
|
||||
lastTo: "user:U123",
|
||||
lastThreadId: "1739142736.000100",
|
||||
},
|
||||
requestedChannel: "last",
|
||||
mode: "heartbeat",
|
||||
});
|
||||
|
||||
expect(resolved.threadId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("falls back to a provided channel when requested is unsupported", () => {
|
||||
const resolved = resolveSessionDeliveryTarget({
|
||||
entry: {
|
||||
@@ -280,4 +300,25 @@ describe("resolveSessionDeliveryTarget", () => {
|
||||
expect(resolved.threadId).toBe(42);
|
||||
expect(resolved.to).toBe("63448508");
|
||||
});
|
||||
|
||||
it("does not return inherited threadId from resolveHeartbeatDeliveryTarget", () => {
|
||||
const cfg: OpenClawConfig = {};
|
||||
const resolved = resolveHeartbeatDeliveryTarget({
|
||||
cfg,
|
||||
entry: {
|
||||
sessionId: "sess-heartbeat-outbound",
|
||||
updatedAt: 1,
|
||||
lastChannel: "slack",
|
||||
lastTo: "user:U123",
|
||||
lastThreadId: "1739142736.000100",
|
||||
},
|
||||
heartbeat: {
|
||||
target: "last",
|
||||
},
|
||||
});
|
||||
|
||||
expect(resolved.channel).toBe("slack");
|
||||
expect(resolved.to).toBe("user:U123");
|
||||
expect(resolved.threadId).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -115,9 +115,10 @@ export function resolveSessionDeliveryTarget(params: {
|
||||
}
|
||||
}
|
||||
|
||||
const accountId = channel && channel === lastChannel ? lastAccountId : undefined;
|
||||
const threadId = channel && channel === lastChannel ? lastThreadId : undefined;
|
||||
const mode = params.mode ?? (explicitTo ? "explicit" : "implicit");
|
||||
const accountId = channel && channel === lastChannel ? lastAccountId : undefined;
|
||||
const threadId =
|
||||
mode !== "heartbeat" && channel && channel === lastChannel ? lastThreadId : undefined;
|
||||
|
||||
const resolvedThreadId = explicitThreadId ?? threadId;
|
||||
return {
|
||||
|
||||
Reference in New Issue
Block a user