fix(channels,sandbox): land hard breakage cluster from reviewed PR bases

Lands reviewed fixes based on #25839 (@pewallin), #25841 (@joshjhall), and #25737/@25713 (@DennisGoldfinger/@peteragility), with additional hardening + regression tests for queue cleanup and shell script safety.

Fixes #25836
Fixes #25840
Fixes #25824
Fixes #25868

Co-authored-by: Peter Wallin <pwallin@gmail.com>
Co-authored-by: Joshua Hall <josh@yaplabs.com>
Co-authored-by: Dennis Goldfinger <dennisgoldfinger@gmail.com>
Co-authored-by: peteragility <peteragility@users.noreply.github.com>
This commit is contained in:
Peter Steinberger
2026-02-24 23:27:12 +00:00
parent 5552f9073f
commit e7a5f9f4d8
11 changed files with 380 additions and 115 deletions

View File

@@ -15,6 +15,9 @@ Docs: https://docs.openclaw.ai
### Fixes
- Discord/Block streaming: restore block-streamed reply delivery by suppressing only reasoning payloads (instead of all `block` payloads), fixing missing Discord replies in `channels.discord.streaming=block` mode. (#25839, #25836, #25792) Thanks @pewallin.
- Matrix/Read receipts: send read receipts as soon as Matrix messages arrive (before handler pipeline work), so clients no longer show long-lived unread/sent states while replies are processing. (#25841, #25840) Thanks @joshjhall.
- Sandbox/FS bridge: build canonical-path shell scripts with newline separators (not `; ` joins) to avoid POSIX `sh` `do;` syntax errors that broke sandbox file/image read-write operations. (#25737, #25824, #25868) Thanks @DennisGoldfinger and @peteragility.
- Routing/Session isolation: harden followup routing so explicit cross-channel origin replies never fall back to the active dispatcher on route failure, preserve queued overflow summary routing metadata (`channel`/`to`/`thread`) across followup drain, and prefer originating channel context over internal provider tags for embedded followup runs. This prevents webchat/control-ui context from hijacking Discord-targeted replies in shared sessions. (#25864) Thanks @Gamedesigner.
- Messaging tool dedupe: treat originating channel metadata as authoritative for same-target `message.send` suppression in proactive runs (heartbeat/cron/exec-event), including synthetic-provider contexts, so `delivery-mirror` transcript entries no longer cause duplicate Telegram sends. (#25835) Thanks @jadeathena84-arch.
- Cron/Heartbeat delivery: stop inheriting cached session `lastThreadId` for heartbeat-mode target resolution unless a thread/topic is explicitly requested, so announce-mode cron and heartbeat deliveries stay on top-level destinations instead of leaking into active conversation threads. (#25730) Thanks @markshields-tl.

View File

@@ -0,0 +1,96 @@
import type { MatrixClient } from "@vector-im/matrix-bot-sdk";
import type { PluginRuntime, RuntimeLogger } from "openclaw/plugin-sdk";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { MatrixAuth } from "../client.js";
import { registerMatrixMonitorEvents } from "./events.js";
import type { MatrixRawEvent } from "./types.js";
const sendReadReceiptMatrixMock = vi.hoisted(() => vi.fn().mockResolvedValue(undefined));
vi.mock("../send.js", () => ({
sendReadReceiptMatrix: (...args: unknown[]) => sendReadReceiptMatrixMock(...args),
}));
describe("registerMatrixMonitorEvents", () => {
beforeEach(() => {
sendReadReceiptMatrixMock.mockClear();
});
function createHarness() {
const handlers = new Map<string, (...args: unknown[]) => void>();
const client = {
on: vi.fn((event: string, handler: (...args: unknown[]) => void) => {
handlers.set(event, handler);
}),
getUserId: vi.fn().mockResolvedValue("@bot:example.org"),
crypto: undefined,
} as unknown as MatrixClient;
const onRoomMessage = vi.fn();
const logVerboseMessage = vi.fn();
const logger = {
warn: vi.fn(),
} as unknown as RuntimeLogger;
registerMatrixMonitorEvents({
client,
auth: { encryption: false } as MatrixAuth,
logVerboseMessage,
warnedEncryptedRooms: new Set<string>(),
warnedCryptoMissingRooms: new Set<string>(),
logger,
formatNativeDependencyHint: (() =>
"") as PluginRuntime["system"]["formatNativeDependencyHint"],
onRoomMessage,
});
const roomMessageHandler = handlers.get("room.message");
if (!roomMessageHandler) {
throw new Error("missing room.message handler");
}
return { client, onRoomMessage, roomMessageHandler };
}
it("sends read receipt immediately for non-self messages", async () => {
const { client, onRoomMessage, roomMessageHandler } = createHarness();
const event = {
event_id: "$e1",
sender: "@alice:example.org",
} as MatrixRawEvent;
roomMessageHandler("!room:example.org", event);
expect(onRoomMessage).toHaveBeenCalledWith("!room:example.org", event);
await vi.waitFor(() => {
expect(sendReadReceiptMatrixMock).toHaveBeenCalledWith("!room:example.org", "$e1", client);
});
});
it("does not send read receipts for self messages", async () => {
const { onRoomMessage, roomMessageHandler } = createHarness();
const event = {
event_id: "$e2",
sender: "@bot:example.org",
} as MatrixRawEvent;
roomMessageHandler("!room:example.org", event);
await vi.waitFor(() => {
expect(onRoomMessage).toHaveBeenCalledWith("!room:example.org", event);
});
expect(sendReadReceiptMatrixMock).not.toHaveBeenCalled();
});
it("skips receipt when message lacks sender or event id", async () => {
const { onRoomMessage, roomMessageHandler } = createHarness();
const event = {
sender: "@alice:example.org",
} as MatrixRawEvent;
roomMessageHandler("!room:example.org", event);
await vi.waitFor(() => {
expect(onRoomMessage).toHaveBeenCalledWith("!room:example.org", event);
});
expect(sendReadReceiptMatrixMock).not.toHaveBeenCalled();
});
});

View File

@@ -1,6 +1,7 @@
import type { MatrixClient } from "@vector-im/matrix-bot-sdk";
import type { PluginRuntime, RuntimeLogger } from "openclaw/plugin-sdk";
import type { MatrixAuth } from "../client.js";
import { sendReadReceiptMatrix } from "../send.js";
import type { MatrixRawEvent } from "./types.js";
import { EventType } from "./types.js";
@@ -25,7 +26,32 @@ export function registerMatrixMonitorEvents(params: {
onRoomMessage,
} = params;
client.on("room.message", onRoomMessage);
let selfUserId: string | undefined;
client.on("room.message", (roomId: string, event: MatrixRawEvent) => {
const eventId = event?.event_id;
const senderId = event?.sender;
if (eventId && senderId) {
void (async () => {
if (!selfUserId) {
try {
selfUserId = await client.getUserId();
} catch {
return;
}
}
if (senderId === selfUserId) {
return;
}
await sendReadReceiptMatrix(roomId, eventId, client).catch((err) => {
logVerboseMessage(
`matrix: early read receipt failed room=${roomId} id=${eventId}: ${String(err)}`,
);
});
})();
}
onRoomMessage(roomId, event);
});
client.on("room.encrypted_event", (roomId: string, event: MatrixRawEvent) => {
const eventId = event?.event_id ?? "unknown";

View File

@@ -18,12 +18,7 @@ import {
parsePollStartContent,
type PollStartContent,
} from "../poll-types.js";
import {
reactMatrixMessage,
sendMessageMatrix,
sendReadReceiptMatrix,
sendTypingMatrix,
} from "../send.js";
import { reactMatrixMessage, sendMessageMatrix, sendTypingMatrix } from "../send.js";
import {
normalizeMatrixAllowList,
resolveMatrixAllowListMatch,
@@ -602,14 +597,6 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
return;
}
if (messageId) {
sendReadReceiptMatrix(roomId, messageId, client).catch((err) => {
logVerboseMessage(
`matrix: read receipt failed room=${roomId} id=${messageId}: ${String(err)}`,
);
});
}
let didSendReply = false;
const tableMode = core.channel.text.resolveMarkdownTableMode({
cfg,

View File

@@ -0,0 +1,89 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { enqueueSend } from "./send-queue.js";
function deferred<T>() {
let resolve!: (value: T | PromiseLike<T>) => void;
let reject!: (reason?: unknown) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}
describe("enqueueSend", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it("serializes sends per room", async () => {
const gate = deferred<void>();
const events: string[] = [];
const first = enqueueSend("!room:example.org", async () => {
events.push("start1");
await gate.promise;
events.push("end1");
return "one";
});
const second = enqueueSend("!room:example.org", async () => {
events.push("start2");
events.push("end2");
return "two";
});
await vi.advanceTimersByTimeAsync(150);
expect(events).toEqual(["start1"]);
await vi.advanceTimersByTimeAsync(300);
expect(events).toEqual(["start1"]);
gate.resolve();
await first;
await vi.advanceTimersByTimeAsync(149);
expect(events).toEqual(["start1", "end1"]);
await vi.advanceTimersByTimeAsync(1);
await second;
expect(events).toEqual(["start1", "end1", "start2", "end2"]);
});
it("does not serialize across different rooms", async () => {
const events: string[] = [];
const a = enqueueSend("!a:example.org", async () => {
events.push("a");
return "a";
});
const b = enqueueSend("!b:example.org", async () => {
events.push("b");
return "b";
});
await vi.advanceTimersByTimeAsync(150);
await Promise.all([a, b]);
expect(events.sort()).toEqual(["a", "b"]);
});
it("continues queue after failures", async () => {
const first = enqueueSend("!room:example.org", async () => {
throw new Error("boom");
}).then(
() => ({ ok: true as const }),
(error) => ({ ok: false as const, error }),
);
await vi.advanceTimersByTimeAsync(150);
const firstResult = await first;
expect(firstResult.ok).toBe(false);
expect(firstResult.error).toBeInstanceOf(Error);
expect((firstResult.error as Error).message).toBe("boom");
const second = enqueueSend("!room:example.org", async () => "ok");
await vi.advanceTimersByTimeAsync(150);
await expect(second).resolves.toBe("ok");
});
});

View File

@@ -0,0 +1,33 @@
const SEND_GAP_MS = 150;
// Serialize sends per room to preserve Matrix delivery order.
const roomQueues = new Map<string, Promise<void>>();
export async function enqueueSend<T>(roomId: string, fn: () => Promise<T>): Promise<T> {
const previous = roomQueues.get(roomId) ?? Promise.resolve();
const next = previous
.catch(() => {})
.then(async () => {
await delay(SEND_GAP_MS);
return await fn();
});
const queueMarker = next.then(
() => {},
() => {},
);
roomQueues.set(roomId, queueMarker);
queueMarker.finally(() => {
if (roomQueues.get(roomId) === queueMarker) {
roomQueues.delete(roomId);
}
});
return await next;
}
function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

View File

@@ -2,6 +2,7 @@ import type { MatrixClient } from "@vector-im/matrix-bot-sdk";
import type { PollInput } from "openclaw/plugin-sdk";
import { getMatrixRuntime } from "../runtime.js";
import { buildPollStartContent, M_POLL_START } from "./poll-types.js";
import { enqueueSend } from "./send-queue.js";
import { resolveMatrixClient, resolveMediaMaxBytes } from "./send/client.js";
import {
buildReplyRelation,
@@ -49,103 +50,105 @@ export async function sendMessageMatrix(
});
try {
const roomId = await resolveMatrixRoomId(client, to);
const cfg = getCore().config.loadConfig();
const tableMode = getCore().channel.text.resolveMarkdownTableMode({
cfg,
channel: "matrix",
accountId: opts.accountId,
});
const convertedMessage = getCore().channel.text.convertMarkdownTables(
trimmedMessage,
tableMode,
);
const textLimit = getCore().channel.text.resolveTextChunkLimit(cfg, "matrix");
const chunkLimit = Math.min(textLimit, MATRIX_TEXT_LIMIT);
const chunkMode = getCore().channel.text.resolveChunkMode(cfg, "matrix", opts.accountId);
const chunks = getCore().channel.text.chunkMarkdownTextWithMode(
convertedMessage,
chunkLimit,
chunkMode,
);
const threadId = normalizeThreadId(opts.threadId);
const relation = threadId
? buildThreadRelation(threadId, opts.replyToId)
: buildReplyRelation(opts.replyToId);
const sendContent = async (content: MatrixOutboundContent) => {
// @vector-im/matrix-bot-sdk uses sendMessage differently
const eventId = await client.sendMessage(roomId, content);
return eventId;
};
return await enqueueSend(roomId, async () => {
const cfg = getCore().config.loadConfig();
const tableMode = getCore().channel.text.resolveMarkdownTableMode({
cfg,
channel: "matrix",
accountId: opts.accountId,
});
const convertedMessage = getCore().channel.text.convertMarkdownTables(
trimmedMessage,
tableMode,
);
const textLimit = getCore().channel.text.resolveTextChunkLimit(cfg, "matrix");
const chunkLimit = Math.min(textLimit, MATRIX_TEXT_LIMIT);
const chunkMode = getCore().channel.text.resolveChunkMode(cfg, "matrix", opts.accountId);
const chunks = getCore().channel.text.chunkMarkdownTextWithMode(
convertedMessage,
chunkLimit,
chunkMode,
);
const threadId = normalizeThreadId(opts.threadId);
const relation = threadId
? buildThreadRelation(threadId, opts.replyToId)
: buildReplyRelation(opts.replyToId);
const sendContent = async (content: MatrixOutboundContent) => {
// @vector-im/matrix-bot-sdk uses sendMessage differently
const eventId = await client.sendMessage(roomId, content);
return eventId;
};
let lastMessageId = "";
if (opts.mediaUrl) {
const maxBytes = resolveMediaMaxBytes(opts.accountId);
const media = await getCore().media.loadWebMedia(opts.mediaUrl, maxBytes);
const uploaded = await uploadMediaMaybeEncrypted(client, roomId, media.buffer, {
contentType: media.contentType,
filename: media.fileName,
});
const durationMs = await resolveMediaDurationMs({
buffer: media.buffer,
contentType: media.contentType,
fileName: media.fileName,
kind: media.kind,
});
const baseMsgType = resolveMatrixMsgType(media.contentType, media.fileName);
const { useVoice } = resolveMatrixVoiceDecision({
wantsVoice: opts.audioAsVoice === true,
contentType: media.contentType,
fileName: media.fileName,
});
const msgtype = useVoice ? MsgType.Audio : baseMsgType;
const isImage = msgtype === MsgType.Image;
const imageInfo = isImage
? await prepareImageInfo({ buffer: media.buffer, client })
: undefined;
const [firstChunk, ...rest] = chunks;
const body = useVoice ? "Voice message" : (firstChunk ?? media.fileName ?? "(file)");
const content = buildMediaContent({
msgtype,
body,
url: uploaded.url,
file: uploaded.file,
filename: media.fileName,
mimetype: media.contentType,
size: media.buffer.byteLength,
durationMs,
relation,
isVoice: useVoice,
imageInfo,
});
const eventId = await sendContent(content);
lastMessageId = eventId ?? lastMessageId;
const textChunks = useVoice ? chunks : rest;
const followupRelation = threadId ? relation : undefined;
for (const chunk of textChunks) {
const text = chunk.trim();
if (!text) {
continue;
}
const followup = buildTextContent(text, followupRelation);
const followupEventId = await sendContent(followup);
lastMessageId = followupEventId ?? lastMessageId;
}
} else {
for (const chunk of chunks.length ? chunks : [""]) {
const text = chunk.trim();
if (!text) {
continue;
}
const content = buildTextContent(text, relation);
let lastMessageId = "";
if (opts.mediaUrl) {
const maxBytes = resolveMediaMaxBytes(opts.accountId);
const media = await getCore().media.loadWebMedia(opts.mediaUrl, maxBytes);
const uploaded = await uploadMediaMaybeEncrypted(client, roomId, media.buffer, {
contentType: media.contentType,
filename: media.fileName,
});
const durationMs = await resolveMediaDurationMs({
buffer: media.buffer,
contentType: media.contentType,
fileName: media.fileName,
kind: media.kind,
});
const baseMsgType = resolveMatrixMsgType(media.contentType, media.fileName);
const { useVoice } = resolveMatrixVoiceDecision({
wantsVoice: opts.audioAsVoice === true,
contentType: media.contentType,
fileName: media.fileName,
});
const msgtype = useVoice ? MsgType.Audio : baseMsgType;
const isImage = msgtype === MsgType.Image;
const imageInfo = isImage
? await prepareImageInfo({ buffer: media.buffer, client })
: undefined;
const [firstChunk, ...rest] = chunks;
const body = useVoice ? "Voice message" : (firstChunk ?? media.fileName ?? "(file)");
const content = buildMediaContent({
msgtype,
body,
url: uploaded.url,
file: uploaded.file,
filename: media.fileName,
mimetype: media.contentType,
size: media.buffer.byteLength,
durationMs,
relation,
isVoice: useVoice,
imageInfo,
});
const eventId = await sendContent(content);
lastMessageId = eventId ?? lastMessageId;
const textChunks = useVoice ? chunks : rest;
const followupRelation = threadId ? relation : undefined;
for (const chunk of textChunks) {
const text = chunk.trim();
if (!text) {
continue;
}
const followup = buildTextContent(text, followupRelation);
const followupEventId = await sendContent(followup);
lastMessageId = followupEventId ?? lastMessageId;
}
} else {
for (const chunk of chunks.length ? chunks : [""]) {
const text = chunk.trim();
if (!text) {
continue;
}
const content = buildTextContent(text, relation);
const eventId = await sendContent(content);
lastMessageId = eventId ?? lastMessageId;
}
}
}
return {
messageId: lastMessageId || "unknown",
roomId,
};
return {
messageId: lastMessageId || "unknown",
roomId,
};
});
} finally {
if (stopOnDone) {
client.stop();

View File

@@ -77,10 +77,22 @@ describe("sandbox fs bridge shell compatibility", () => {
const executables = mockedExecDockerRaw.mock.calls.map(([args]) => args[3] ?? "");
expect(executables.every((shell) => shell === "sh")).toBe(true);
expect(scripts.every((script) => script.includes("set -eu;"))).toBe(true);
expect(scripts.every((script) => /set -eu[;\n]/.test(script))).toBe(true);
expect(scripts.some((script) => script.includes("pipefail"))).toBe(false);
});
it("resolveCanonicalContainerPath script is valid POSIX sh (no do; token)", async () => {
const bridge = createSandboxFsBridge({ sandbox: createSandbox() });
await bridge.readFile({ filePath: "a.txt" });
const scripts = mockedExecDockerRaw.mock.calls.map(([args]) => args[5] ?? "");
const canonicalScript = scripts.find((script) => script.includes("allow_final"));
expect(canonicalScript).toBeDefined();
// "; " joining can create "do; cmd", which is invalid in POSIX sh.
expect(canonicalScript).not.toMatch(/\bdo;/);
});
it("resolves bind-mounted absolute container paths for reads", async () => {
const sandbox = createSandbox({
docker: {

View File

@@ -305,7 +305,7 @@ class SandboxFsBridgeImpl implements SandboxFsBridge {
"done",
'canonical=$(readlink -f -- "$cursor")',
'printf "%s%s\\n" "$canonical" "$suffix"',
].join("; ");
].join("\n");
const result = await this.runCommand(script, {
args: [params.containerPath, params.allowFinalSymlink ? "1" : "0"],
});

View File

@@ -31,7 +31,10 @@ const deliverDiscordReply = deliveryMocks.deliverDiscordReply;
const createDiscordDraftStream = deliveryMocks.createDiscordDraftStream;
type DispatchInboundParams = {
dispatcher: {
sendBlockReply: (payload: { text?: string }) => boolean | Promise<boolean>;
sendBlockReply: (payload: {
text?: string;
isReasoning?: boolean;
}) => boolean | Promise<boolean>;
sendFinalReply: (payload: { text?: string }) => boolean | Promise<boolean>;
};
replyOptions?: {
@@ -427,9 +430,9 @@ describe("processDiscordMessage draft streaming", () => {
expect(deliverDiscordReply).toHaveBeenCalledTimes(1);
});
it("suppresses block-kind payload delivery to Discord", async () => {
it("suppresses reasoning payload delivery to Discord", async () => {
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
await params?.dispatcher.sendBlockReply({ text: "thinking..." });
await params?.dispatcher.sendBlockReply({ text: "thinking...", isReasoning: true });
return { queuedFinal: false, counts: { final: 0, tool: 0, block: 1 } };
});
@@ -441,6 +444,20 @@ describe("processDiscordMessage draft streaming", () => {
expect(deliverDiscordReply).not.toHaveBeenCalled();
});
it("delivers non-reasoning block payloads to Discord", async () => {
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
await params?.dispatcher.sendBlockReply({ text: "hello from block stream" });
return { queuedFinal: false, counts: { final: 0, tool: 0, block: 1 } };
});
const ctx = await createBaseContext({ discordConfig: { streamMode: "off" } });
// oxlint-disable-next-line typescript/no-explicit-any
await processDiscordMessage(ctx as any);
expect(deliverDiscordReply).toHaveBeenCalledTimes(1);
});
it("streams block previews using draft chunking", async () => {
const draftStream = createMockDraftStream();
createDiscordDraftStream.mockReturnValueOnce(draftStream);

View File

@@ -564,9 +564,8 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
deliver: async (payload: ReplyPayload, info) => {
const isFinal = info.kind === "final";
if (info.kind === "block") {
// Block payloads carry reasoning/thinking content that should not be
// delivered to external channels. Skip them regardless of streamMode.
if (payload.isReasoning) {
// Reasoning/thinking payloads should not be delivered to Discord.
return;
}
if (draftStream && isFinal) {