diff --git a/CHANGELOG.md b/CHANGELOG.md index 62beb85b694..914f5db6c97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,9 @@ Docs: https://docs.openclaw.ai ### Fixes +- Discord/Block streaming: restore block-streamed reply delivery by suppressing only reasoning payloads (instead of all `block` payloads), fixing missing Discord replies in `channels.discord.streaming=block` mode. (#25839, #25836, #25792) Thanks @pewallin. +- Matrix/Read receipts: send read receipts as soon as Matrix messages arrive (before handler pipeline work), so clients no longer show long-lived unread/sent states while replies are processing. (#25841, #25840) Thanks @joshjhall. +- Sandbox/FS bridge: build canonical-path shell scripts with newline separators (not `; ` joins) to avoid POSIX `sh` `do;` syntax errors that broke sandbox file/image read-write operations. (#25737, #25824, #25868) Thanks @DennisGoldfinger and @peteragility. - Routing/Session isolation: harden followup routing so explicit cross-channel origin replies never fall back to the active dispatcher on route failure, preserve queued overflow summary routing metadata (`channel`/`to`/`thread`) across followup drain, and prefer originating channel context over internal provider tags for embedded followup runs. This prevents webchat/control-ui context from hijacking Discord-targeted replies in shared sessions. (#25864) Thanks @Gamedesigner. - Messaging tool dedupe: treat originating channel metadata as authoritative for same-target `message.send` suppression in proactive runs (heartbeat/cron/exec-event), including synthetic-provider contexts, so `delivery-mirror` transcript entries no longer cause duplicate Telegram sends. (#25835) Thanks @jadeathena84-arch. - Cron/Heartbeat delivery: stop inheriting cached session `lastThreadId` for heartbeat-mode target resolution unless a thread/topic is explicitly requested, so announce-mode cron and heartbeat deliveries stay on top-level destinations instead of leaking into active conversation threads. (#25730) Thanks @markshields-tl. diff --git a/extensions/matrix/src/matrix/monitor/events.test.ts b/extensions/matrix/src/matrix/monitor/events.test.ts new file mode 100644 index 00000000000..dbd2245046d --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/events.test.ts @@ -0,0 +1,96 @@ +import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; +import type { PluginRuntime, RuntimeLogger } from "openclaw/plugin-sdk"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { MatrixAuth } from "../client.js"; +import { registerMatrixMonitorEvents } from "./events.js"; +import type { MatrixRawEvent } from "./types.js"; + +const sendReadReceiptMatrixMock = vi.hoisted(() => vi.fn().mockResolvedValue(undefined)); + +vi.mock("../send.js", () => ({ + sendReadReceiptMatrix: (...args: unknown[]) => sendReadReceiptMatrixMock(...args), +})); + +describe("registerMatrixMonitorEvents", () => { + beforeEach(() => { + sendReadReceiptMatrixMock.mockClear(); + }); + + function createHarness() { + const handlers = new Map void>(); + const client = { + on: vi.fn((event: string, handler: (...args: unknown[]) => void) => { + handlers.set(event, handler); + }), + getUserId: vi.fn().mockResolvedValue("@bot:example.org"), + crypto: undefined, + } as unknown as MatrixClient; + + const onRoomMessage = vi.fn(); + const logVerboseMessage = vi.fn(); + const logger = { + warn: vi.fn(), + } as unknown as RuntimeLogger; + + registerMatrixMonitorEvents({ + client, + auth: { encryption: false } as MatrixAuth, + logVerboseMessage, + warnedEncryptedRooms: new Set(), + warnedCryptoMissingRooms: new Set(), + logger, + formatNativeDependencyHint: (() => + "") as PluginRuntime["system"]["formatNativeDependencyHint"], + onRoomMessage, + }); + + const roomMessageHandler = handlers.get("room.message"); + if (!roomMessageHandler) { + throw new Error("missing room.message handler"); + } + + return { client, onRoomMessage, roomMessageHandler }; + } + + it("sends read receipt immediately for non-self messages", async () => { + const { client, onRoomMessage, roomMessageHandler } = createHarness(); + const event = { + event_id: "$e1", + sender: "@alice:example.org", + } as MatrixRawEvent; + + roomMessageHandler("!room:example.org", event); + + expect(onRoomMessage).toHaveBeenCalledWith("!room:example.org", event); + await vi.waitFor(() => { + expect(sendReadReceiptMatrixMock).toHaveBeenCalledWith("!room:example.org", "$e1", client); + }); + }); + + it("does not send read receipts for self messages", async () => { + const { onRoomMessage, roomMessageHandler } = createHarness(); + const event = { + event_id: "$e2", + sender: "@bot:example.org", + } as MatrixRawEvent; + + roomMessageHandler("!room:example.org", event); + await vi.waitFor(() => { + expect(onRoomMessage).toHaveBeenCalledWith("!room:example.org", event); + }); + expect(sendReadReceiptMatrixMock).not.toHaveBeenCalled(); + }); + + it("skips receipt when message lacks sender or event id", async () => { + const { onRoomMessage, roomMessageHandler } = createHarness(); + const event = { + sender: "@alice:example.org", + } as MatrixRawEvent; + + roomMessageHandler("!room:example.org", event); + await vi.waitFor(() => { + expect(onRoomMessage).toHaveBeenCalledWith("!room:example.org", event); + }); + expect(sendReadReceiptMatrixMock).not.toHaveBeenCalled(); + }); +}); diff --git a/extensions/matrix/src/matrix/monitor/events.ts b/extensions/matrix/src/matrix/monitor/events.ts index 60bbe574add..ab548ef18c2 100644 --- a/extensions/matrix/src/matrix/monitor/events.ts +++ b/extensions/matrix/src/matrix/monitor/events.ts @@ -1,6 +1,7 @@ import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; import type { PluginRuntime, RuntimeLogger } from "openclaw/plugin-sdk"; import type { MatrixAuth } from "../client.js"; +import { sendReadReceiptMatrix } from "../send.js"; import type { MatrixRawEvent } from "./types.js"; import { EventType } from "./types.js"; @@ -25,7 +26,32 @@ export function registerMatrixMonitorEvents(params: { onRoomMessage, } = params; - client.on("room.message", onRoomMessage); + let selfUserId: string | undefined; + client.on("room.message", (roomId: string, event: MatrixRawEvent) => { + const eventId = event?.event_id; + const senderId = event?.sender; + if (eventId && senderId) { + void (async () => { + if (!selfUserId) { + try { + selfUserId = await client.getUserId(); + } catch { + return; + } + } + if (senderId === selfUserId) { + return; + } + await sendReadReceiptMatrix(roomId, eventId, client).catch((err) => { + logVerboseMessage( + `matrix: early read receipt failed room=${roomId} id=${eventId}: ${String(err)}`, + ); + }); + })(); + } + + onRoomMessage(roomId, event); + }); client.on("room.encrypted_event", (roomId: string, event: MatrixRawEvent) => { const eventId = event?.event_id ?? "unknown"; diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index d884879001e..c1df46fec2a 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -18,12 +18,7 @@ import { parsePollStartContent, type PollStartContent, } from "../poll-types.js"; -import { - reactMatrixMessage, - sendMessageMatrix, - sendReadReceiptMatrix, - sendTypingMatrix, -} from "../send.js"; +import { reactMatrixMessage, sendMessageMatrix, sendTypingMatrix } from "../send.js"; import { normalizeMatrixAllowList, resolveMatrixAllowListMatch, @@ -602,14 +597,6 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam return; } - if (messageId) { - sendReadReceiptMatrix(roomId, messageId, client).catch((err) => { - logVerboseMessage( - `matrix: read receipt failed room=${roomId} id=${messageId}: ${String(err)}`, - ); - }); - } - let didSendReply = false; const tableMode = core.channel.text.resolveMarkdownTableMode({ cfg, diff --git a/extensions/matrix/src/matrix/send-queue.test.ts b/extensions/matrix/src/matrix/send-queue.test.ts new file mode 100644 index 00000000000..34e6e30166c --- /dev/null +++ b/extensions/matrix/src/matrix/send-queue.test.ts @@ -0,0 +1,89 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { enqueueSend } from "./send-queue.js"; + +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +describe("enqueueSend", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("serializes sends per room", async () => { + const gate = deferred(); + const events: string[] = []; + + const first = enqueueSend("!room:example.org", async () => { + events.push("start1"); + await gate.promise; + events.push("end1"); + return "one"; + }); + const second = enqueueSend("!room:example.org", async () => { + events.push("start2"); + events.push("end2"); + return "two"; + }); + + await vi.advanceTimersByTimeAsync(150); + expect(events).toEqual(["start1"]); + + await vi.advanceTimersByTimeAsync(300); + expect(events).toEqual(["start1"]); + + gate.resolve(); + await first; + await vi.advanceTimersByTimeAsync(149); + expect(events).toEqual(["start1", "end1"]); + await vi.advanceTimersByTimeAsync(1); + await second; + expect(events).toEqual(["start1", "end1", "start2", "end2"]); + }); + + it("does not serialize across different rooms", async () => { + const events: string[] = []; + + const a = enqueueSend("!a:example.org", async () => { + events.push("a"); + return "a"; + }); + const b = enqueueSend("!b:example.org", async () => { + events.push("b"); + return "b"; + }); + + await vi.advanceTimersByTimeAsync(150); + await Promise.all([a, b]); + expect(events.sort()).toEqual(["a", "b"]); + }); + + it("continues queue after failures", async () => { + const first = enqueueSend("!room:example.org", async () => { + throw new Error("boom"); + }).then( + () => ({ ok: true as const }), + (error) => ({ ok: false as const, error }), + ); + + await vi.advanceTimersByTimeAsync(150); + const firstResult = await first; + expect(firstResult.ok).toBe(false); + expect(firstResult.error).toBeInstanceOf(Error); + expect((firstResult.error as Error).message).toBe("boom"); + + const second = enqueueSend("!room:example.org", async () => "ok"); + await vi.advanceTimersByTimeAsync(150); + await expect(second).resolves.toBe("ok"); + }); +}); diff --git a/extensions/matrix/src/matrix/send-queue.ts b/extensions/matrix/src/matrix/send-queue.ts new file mode 100644 index 00000000000..0d5e43b40e2 --- /dev/null +++ b/extensions/matrix/src/matrix/send-queue.ts @@ -0,0 +1,33 @@ +const SEND_GAP_MS = 150; + +// Serialize sends per room to preserve Matrix delivery order. +const roomQueues = new Map>(); + +export async function enqueueSend(roomId: string, fn: () => Promise): Promise { + const previous = roomQueues.get(roomId) ?? Promise.resolve(); + + const next = previous + .catch(() => {}) + .then(async () => { + await delay(SEND_GAP_MS); + return await fn(); + }); + + const queueMarker = next.then( + () => {}, + () => {}, + ); + roomQueues.set(roomId, queueMarker); + + queueMarker.finally(() => { + if (roomQueues.get(roomId) === queueMarker) { + roomQueues.delete(roomId); + } + }); + + return await next; +} + +function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/extensions/matrix/src/matrix/send.ts b/extensions/matrix/src/matrix/send.ts index b531b55dcda..dd72ec2883b 100644 --- a/extensions/matrix/src/matrix/send.ts +++ b/extensions/matrix/src/matrix/send.ts @@ -2,6 +2,7 @@ import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; import type { PollInput } from "openclaw/plugin-sdk"; import { getMatrixRuntime } from "../runtime.js"; import { buildPollStartContent, M_POLL_START } from "./poll-types.js"; +import { enqueueSend } from "./send-queue.js"; import { resolveMatrixClient, resolveMediaMaxBytes } from "./send/client.js"; import { buildReplyRelation, @@ -49,103 +50,105 @@ export async function sendMessageMatrix( }); try { const roomId = await resolveMatrixRoomId(client, to); - const cfg = getCore().config.loadConfig(); - const tableMode = getCore().channel.text.resolveMarkdownTableMode({ - cfg, - channel: "matrix", - accountId: opts.accountId, - }); - const convertedMessage = getCore().channel.text.convertMarkdownTables( - trimmedMessage, - tableMode, - ); - const textLimit = getCore().channel.text.resolveTextChunkLimit(cfg, "matrix"); - const chunkLimit = Math.min(textLimit, MATRIX_TEXT_LIMIT); - const chunkMode = getCore().channel.text.resolveChunkMode(cfg, "matrix", opts.accountId); - const chunks = getCore().channel.text.chunkMarkdownTextWithMode( - convertedMessage, - chunkLimit, - chunkMode, - ); - const threadId = normalizeThreadId(opts.threadId); - const relation = threadId - ? buildThreadRelation(threadId, opts.replyToId) - : buildReplyRelation(opts.replyToId); - const sendContent = async (content: MatrixOutboundContent) => { - // @vector-im/matrix-bot-sdk uses sendMessage differently - const eventId = await client.sendMessage(roomId, content); - return eventId; - }; + return await enqueueSend(roomId, async () => { + const cfg = getCore().config.loadConfig(); + const tableMode = getCore().channel.text.resolveMarkdownTableMode({ + cfg, + channel: "matrix", + accountId: opts.accountId, + }); + const convertedMessage = getCore().channel.text.convertMarkdownTables( + trimmedMessage, + tableMode, + ); + const textLimit = getCore().channel.text.resolveTextChunkLimit(cfg, "matrix"); + const chunkLimit = Math.min(textLimit, MATRIX_TEXT_LIMIT); + const chunkMode = getCore().channel.text.resolveChunkMode(cfg, "matrix", opts.accountId); + const chunks = getCore().channel.text.chunkMarkdownTextWithMode( + convertedMessage, + chunkLimit, + chunkMode, + ); + const threadId = normalizeThreadId(opts.threadId); + const relation = threadId + ? buildThreadRelation(threadId, opts.replyToId) + : buildReplyRelation(opts.replyToId); + const sendContent = async (content: MatrixOutboundContent) => { + // @vector-im/matrix-bot-sdk uses sendMessage differently + const eventId = await client.sendMessage(roomId, content); + return eventId; + }; - let lastMessageId = ""; - if (opts.mediaUrl) { - const maxBytes = resolveMediaMaxBytes(opts.accountId); - const media = await getCore().media.loadWebMedia(opts.mediaUrl, maxBytes); - const uploaded = await uploadMediaMaybeEncrypted(client, roomId, media.buffer, { - contentType: media.contentType, - filename: media.fileName, - }); - const durationMs = await resolveMediaDurationMs({ - buffer: media.buffer, - contentType: media.contentType, - fileName: media.fileName, - kind: media.kind, - }); - const baseMsgType = resolveMatrixMsgType(media.contentType, media.fileName); - const { useVoice } = resolveMatrixVoiceDecision({ - wantsVoice: opts.audioAsVoice === true, - contentType: media.contentType, - fileName: media.fileName, - }); - const msgtype = useVoice ? MsgType.Audio : baseMsgType; - const isImage = msgtype === MsgType.Image; - const imageInfo = isImage - ? await prepareImageInfo({ buffer: media.buffer, client }) - : undefined; - const [firstChunk, ...rest] = chunks; - const body = useVoice ? "Voice message" : (firstChunk ?? media.fileName ?? "(file)"); - const content = buildMediaContent({ - msgtype, - body, - url: uploaded.url, - file: uploaded.file, - filename: media.fileName, - mimetype: media.contentType, - size: media.buffer.byteLength, - durationMs, - relation, - isVoice: useVoice, - imageInfo, - }); - const eventId = await sendContent(content); - lastMessageId = eventId ?? lastMessageId; - const textChunks = useVoice ? chunks : rest; - const followupRelation = threadId ? relation : undefined; - for (const chunk of textChunks) { - const text = chunk.trim(); - if (!text) { - continue; - } - const followup = buildTextContent(text, followupRelation); - const followupEventId = await sendContent(followup); - lastMessageId = followupEventId ?? lastMessageId; - } - } else { - for (const chunk of chunks.length ? chunks : [""]) { - const text = chunk.trim(); - if (!text) { - continue; - } - const content = buildTextContent(text, relation); + let lastMessageId = ""; + if (opts.mediaUrl) { + const maxBytes = resolveMediaMaxBytes(opts.accountId); + const media = await getCore().media.loadWebMedia(opts.mediaUrl, maxBytes); + const uploaded = await uploadMediaMaybeEncrypted(client, roomId, media.buffer, { + contentType: media.contentType, + filename: media.fileName, + }); + const durationMs = await resolveMediaDurationMs({ + buffer: media.buffer, + contentType: media.contentType, + fileName: media.fileName, + kind: media.kind, + }); + const baseMsgType = resolveMatrixMsgType(media.contentType, media.fileName); + const { useVoice } = resolveMatrixVoiceDecision({ + wantsVoice: opts.audioAsVoice === true, + contentType: media.contentType, + fileName: media.fileName, + }); + const msgtype = useVoice ? MsgType.Audio : baseMsgType; + const isImage = msgtype === MsgType.Image; + const imageInfo = isImage + ? await prepareImageInfo({ buffer: media.buffer, client }) + : undefined; + const [firstChunk, ...rest] = chunks; + const body = useVoice ? "Voice message" : (firstChunk ?? media.fileName ?? "(file)"); + const content = buildMediaContent({ + msgtype, + body, + url: uploaded.url, + file: uploaded.file, + filename: media.fileName, + mimetype: media.contentType, + size: media.buffer.byteLength, + durationMs, + relation, + isVoice: useVoice, + imageInfo, + }); const eventId = await sendContent(content); lastMessageId = eventId ?? lastMessageId; + const textChunks = useVoice ? chunks : rest; + const followupRelation = threadId ? relation : undefined; + for (const chunk of textChunks) { + const text = chunk.trim(); + if (!text) { + continue; + } + const followup = buildTextContent(text, followupRelation); + const followupEventId = await sendContent(followup); + lastMessageId = followupEventId ?? lastMessageId; + } + } else { + for (const chunk of chunks.length ? chunks : [""]) { + const text = chunk.trim(); + if (!text) { + continue; + } + const content = buildTextContent(text, relation); + const eventId = await sendContent(content); + lastMessageId = eventId ?? lastMessageId; + } } - } - return { - messageId: lastMessageId || "unknown", - roomId, - }; + return { + messageId: lastMessageId || "unknown", + roomId, + }; + }); } finally { if (stopOnDone) { client.stop(); diff --git a/src/agents/sandbox/fs-bridge.test.ts b/src/agents/sandbox/fs-bridge.test.ts index f1d72be03b6..982d3cbf6a5 100644 --- a/src/agents/sandbox/fs-bridge.test.ts +++ b/src/agents/sandbox/fs-bridge.test.ts @@ -77,10 +77,22 @@ describe("sandbox fs bridge shell compatibility", () => { const executables = mockedExecDockerRaw.mock.calls.map(([args]) => args[3] ?? ""); expect(executables.every((shell) => shell === "sh")).toBe(true); - expect(scripts.every((script) => script.includes("set -eu;"))).toBe(true); + expect(scripts.every((script) => /set -eu[;\n]/.test(script))).toBe(true); expect(scripts.some((script) => script.includes("pipefail"))).toBe(false); }); + it("resolveCanonicalContainerPath script is valid POSIX sh (no do; token)", async () => { + const bridge = createSandboxFsBridge({ sandbox: createSandbox() }); + + await bridge.readFile({ filePath: "a.txt" }); + + const scripts = mockedExecDockerRaw.mock.calls.map(([args]) => args[5] ?? ""); + const canonicalScript = scripts.find((script) => script.includes("allow_final")); + expect(canonicalScript).toBeDefined(); + // "; " joining can create "do; cmd", which is invalid in POSIX sh. + expect(canonicalScript).not.toMatch(/\bdo;/); + }); + it("resolves bind-mounted absolute container paths for reads", async () => { const sandbox = createSandbox({ docker: { diff --git a/src/agents/sandbox/fs-bridge.ts b/src/agents/sandbox/fs-bridge.ts index fdcaf0cc46c..dee44e1b237 100644 --- a/src/agents/sandbox/fs-bridge.ts +++ b/src/agents/sandbox/fs-bridge.ts @@ -305,7 +305,7 @@ class SandboxFsBridgeImpl implements SandboxFsBridge { "done", 'canonical=$(readlink -f -- "$cursor")', 'printf "%s%s\\n" "$canonical" "$suffix"', - ].join("; "); + ].join("\n"); const result = await this.runCommand(script, { args: [params.containerPath, params.allowFinalSymlink ? "1" : "0"], }); diff --git a/src/discord/monitor/message-handler.process.test.ts b/src/discord/monitor/message-handler.process.test.ts index 482f61cfc3f..79af5ffa477 100644 --- a/src/discord/monitor/message-handler.process.test.ts +++ b/src/discord/monitor/message-handler.process.test.ts @@ -31,7 +31,10 @@ const deliverDiscordReply = deliveryMocks.deliverDiscordReply; const createDiscordDraftStream = deliveryMocks.createDiscordDraftStream; type DispatchInboundParams = { dispatcher: { - sendBlockReply: (payload: { text?: string }) => boolean | Promise; + sendBlockReply: (payload: { + text?: string; + isReasoning?: boolean; + }) => boolean | Promise; sendFinalReply: (payload: { text?: string }) => boolean | Promise; }; replyOptions?: { @@ -427,9 +430,9 @@ describe("processDiscordMessage draft streaming", () => { expect(deliverDiscordReply).toHaveBeenCalledTimes(1); }); - it("suppresses block-kind payload delivery to Discord", async () => { + it("suppresses reasoning payload delivery to Discord", async () => { dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { - await params?.dispatcher.sendBlockReply({ text: "thinking..." }); + await params?.dispatcher.sendBlockReply({ text: "thinking...", isReasoning: true }); return { queuedFinal: false, counts: { final: 0, tool: 0, block: 1 } }; }); @@ -441,6 +444,20 @@ describe("processDiscordMessage draft streaming", () => { expect(deliverDiscordReply).not.toHaveBeenCalled(); }); + it("delivers non-reasoning block payloads to Discord", async () => { + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.dispatcher.sendBlockReply({ text: "hello from block stream" }); + return { queuedFinal: false, counts: { final: 0, tool: 0, block: 1 } }; + }); + + const ctx = await createBaseContext({ discordConfig: { streamMode: "off" } }); + + // oxlint-disable-next-line typescript/no-explicit-any + await processDiscordMessage(ctx as any); + + expect(deliverDiscordReply).toHaveBeenCalledTimes(1); + }); + it("streams block previews using draft chunking", async () => { const draftStream = createMockDraftStream(); createDiscordDraftStream.mockReturnValueOnce(draftStream); diff --git a/src/discord/monitor/message-handler.process.ts b/src/discord/monitor/message-handler.process.ts index 60966cff3cc..4dd357d656f 100644 --- a/src/discord/monitor/message-handler.process.ts +++ b/src/discord/monitor/message-handler.process.ts @@ -564,9 +564,8 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) humanDelay: resolveHumanDelayConfig(cfg, route.agentId), deliver: async (payload: ReplyPayload, info) => { const isFinal = info.kind === "final"; - if (info.kind === "block") { - // Block payloads carry reasoning/thinking content that should not be - // delivered to external channels. Skip them regardless of streamMode. + if (payload.isReasoning) { + // Reasoning/thinking payloads should not be delivered to Discord. return; } if (draftStream && isFinal) {