diff --git a/CHANGELOG.md b/CHANGELOG.md index 47fda0a2858..cc32ac16cd4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -256,6 +256,7 @@ Docs: https://docs.openclaw.ai - Synology Chat/reply delivery: resolve webhook usernames to Chat API `user_id` values for outbound chatbot replies, avoiding mismatches between webhook user IDs and `method=chatbot` recipient IDs in multi-account setups. (#23709) Thanks @druide67. - Slack/thread context payloads: only inject thread starter/history text on first thread turn for new sessions while preserving thread metadata, reducing repeated context-token bloat on long-lived thread sessions. (#32133) Thanks @sourman. - Slack/session routing: keep top-level channel messages in one shared session when `replyToMode=off`, while preserving thread-scoped keys for true thread replies and non-off modes. (#32193) Thanks @bmendonca3. +- Slack/app_mention dedupe race handling: keep seen-message dedupe to prevent duplicate replies while allowing a one-time app_mention retry when the paired message event was dropped pre-dispatch, so requireMention channels do not lose mentions under Slack event reordering. (#34937) Thanks @littleben. - Voice-call/webhook routing: require exact webhook path matches (instead of prefix matches) so lookalike paths cannot reach provider verification/dispatch logic. (#31930) Thanks @afurm. - Zalo/Pairing auth tests: add webhook regression coverage asserting DM pairing-store reads/writes remain account-scoped, preventing cross-account authorization bleed in multi-account setups. (#26121) Thanks @bmendonca3. - Zalouser/Pairing auth tests: add account-scoped DM pairing-store regression coverage (`monitor.account-scope.test.ts`) to prevent cross-account allowlist bleed in multi-account setups. (#26672) Thanks @bmendonca3. diff --git a/src/slack/monitor/message-handler.app-mention-race.test.ts b/src/slack/monitor/message-handler.app-mention-race.test.ts new file mode 100644 index 00000000000..cfb44c8496e --- /dev/null +++ b/src/slack/monitor/message-handler.app-mention-race.test.ts @@ -0,0 +1,157 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const prepareSlackMessageMock = + vi.fn< + (params: { + opts: { source: "message" | "app_mention"; wasMentioned?: boolean }; + }) => Promise + >(); +const dispatchPreparedSlackMessageMock = vi.fn<(prepared: unknown) => Promise>(); + +vi.mock("../../channels/inbound-debounce-policy.js", () => ({ + shouldDebounceTextInbound: () => false, + createChannelInboundDebouncer: (params: { + onFlush: ( + entries: Array<{ + message: Record; + opts: { source: "message" | "app_mention"; wasMentioned?: boolean }; + }>, + ) => Promise; + }) => ({ + debounceMs: 0, + debouncer: { + enqueue: async (entry: { + message: Record; + opts: { source: "message" | "app_mention"; wasMentioned?: boolean }; + }) => { + await params.onFlush([entry]); + }, + flushKey: async (_key: string) => {}, + }, + }), +})); + +vi.mock("./thread-resolution.js", () => ({ + createSlackThreadTsResolver: () => ({ + resolve: async ({ message }: { message: Record }) => message, + }), +})); + +vi.mock("./message-handler/prepare.js", () => ({ + prepareSlackMessage: ( + params: Parameters[0], + ): ReturnType => prepareSlackMessageMock(params), +})); + +vi.mock("./message-handler/dispatch.js", () => ({ + dispatchPreparedSlackMessage: ( + prepared: Parameters[0], + ): ReturnType => + dispatchPreparedSlackMessageMock(prepared), +})); + +import { createSlackMessageHandler } from "./message-handler.js"; + +function createMarkMessageSeen() { + const seen = new Set(); + return (channel: string | undefined, ts: string | undefined) => { + if (!channel || !ts) { + return false; + } + const key = `${channel}:${ts}`; + if (seen.has(key)) { + return true; + } + seen.add(key); + return false; + }; +} + +describe("createSlackMessageHandler app_mention race handling", () => { + beforeEach(() => { + prepareSlackMessageMock.mockReset(); + dispatchPreparedSlackMessageMock.mockReset(); + }); + + it("allows a single app_mention retry when message event was dropped before dispatch", async () => { + prepareSlackMessageMock.mockImplementation(async ({ opts }) => { + if (opts.source === "message") { + return null; + } + return { ctxPayload: {} }; + }); + + const handler = createSlackMessageHandler({ + ctx: { + cfg: {}, + accountId: "default", + app: { client: {} }, + runtime: {}, + markMessageSeen: createMarkMessageSeen(), + } as Parameters[0]["ctx"], + account: { accountId: "default" } as Parameters< + typeof createSlackMessageHandler + >[0]["account"], + }); + + await handler( + { type: "message", channel: "C1", ts: "1700000000.000100", text: "hello" } as never, + { source: "message" }, + ); + await handler( + { + type: "app_mention", + channel: "C1", + ts: "1700000000.000100", + text: "<@U_BOT> hello", + } as never, + { source: "app_mention", wasMentioned: true }, + ); + await handler( + { + type: "app_mention", + channel: "C1", + ts: "1700000000.000100", + text: "<@U_BOT> hello", + } as never, + { source: "app_mention", wasMentioned: true }, + ); + + expect(prepareSlackMessageMock).toHaveBeenCalledTimes(2); + expect(dispatchPreparedSlackMessageMock).toHaveBeenCalledTimes(1); + }); + + it("keeps app_mention deduped when message event already dispatched", async () => { + prepareSlackMessageMock.mockResolvedValue({ ctxPayload: {} }); + + const handler = createSlackMessageHandler({ + ctx: { + cfg: {}, + accountId: "default", + app: { client: {} }, + runtime: {}, + markMessageSeen: createMarkMessageSeen(), + } as Parameters[0]["ctx"], + account: { accountId: "default" } as Parameters< + typeof createSlackMessageHandler + >[0]["account"], + }); + + await handler( + { type: "message", channel: "C1", ts: "1700000000.000200", text: "hello" } as never, + { source: "message" }, + ); + await handler( + { + type: "app_mention", + channel: "C1", + ts: "1700000000.000200", + text: "<@U_BOT> hello", + } as never, + { source: "app_mention", wasMentioned: true }, + ); + + expect(prepareSlackMessageMock).toHaveBeenCalledTimes(1); + expect(dispatchPreparedSlackMessageMock).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/slack/monitor/message-handler.ts b/src/slack/monitor/message-handler.ts index 647c9a62c53..7ad7d792bc1 100644 --- a/src/slack/monitor/message-handler.ts +++ b/src/slack/monitor/message-handler.ts @@ -15,6 +15,8 @@ export type SlackMessageHandler = ( opts: { source: "message" | "app_mention"; wasMentioned?: boolean }, ) => Promise; +const APP_MENTION_RETRY_TTL_MS = 60_000; + function resolveSlackSenderId(message: SlackMessageEvent): string | null { return message.user ?? message.bot_id ?? null; } @@ -51,6 +53,13 @@ function shouldDebounceSlackMessage(message: SlackMessageEvent, cfg: SlackMonito }); } +function buildSeenMessageKey(channelId: string | undefined, ts: string | undefined): string | null { + if (!channelId || !ts) { + return null; + } + return `${channelId}:${ts}`; +} + /** * Build a debounce key that isolates messages by thread (or by message timestamp * for top-level non-DM channel messages). Without per-message scoping, concurrent @@ -133,9 +142,18 @@ export function createSlackMessageHandler(params: { wasMentioned: combinedMentioned || last.opts.wasMentioned, }, }); + const seenMessageKey = buildSeenMessageKey(last.message.channel, last.message.ts); if (!prepared) { + const hasMessageSource = entries.some((entry) => entry.opts.source === "message"); + const hasAppMentionSource = entries.some((entry) => entry.opts.source === "app_mention"); + if (seenMessageKey && hasMessageSource && !hasAppMentionSource) { + rememberAppMentionRetryKey(seenMessageKey); + } return; } + if (seenMessageKey) { + appMentionRetryKeys.delete(seenMessageKey); + } if (entries.length > 1) { const ids = entries.map((entry) => entry.message.ts).filter(Boolean) as string[]; if (ids.length > 0) { @@ -152,6 +170,31 @@ export function createSlackMessageHandler(params: { }); const threadTsResolver = createSlackThreadTsResolver({ client: ctx.app.client }); const pendingTopLevelDebounceKeys = new Map>(); + const appMentionRetryKeys = new Map(); + + const pruneAppMentionRetryKeys = (now: number) => { + for (const [key, expiresAt] of appMentionRetryKeys) { + if (expiresAt <= now) { + appMentionRetryKeys.delete(key); + } + } + }; + + const rememberAppMentionRetryKey = (key: string) => { + const now = Date.now(); + pruneAppMentionRetryKeys(now); + appMentionRetryKeys.set(key, now + APP_MENTION_RETRY_TTL_MS); + }; + + const consumeAppMentionRetryKey = (key: string) => { + const now = Date.now(); + pruneAppMentionRetryKeys(now); + if (!appMentionRetryKeys.has(key)) { + return false; + } + appMentionRetryKeys.delete(key); + return true; + }; return async (message, opts) => { if (opts.source === "message" && message.type !== "message") { @@ -165,8 +208,13 @@ export function createSlackMessageHandler(params: { ) { return; } - if (ctx.markMessageSeen(message.channel, message.ts)) { - return; + const seenMessageKey = buildSeenMessageKey(message.channel, message.ts); + if (seenMessageKey && ctx.markMessageSeen(message.channel, message.ts)) { + // Allow exactly one app_mention retry if the same ts was previously dropped + // from the message stream before it reached dispatch. + if (opts.source !== "app_mention" || !consumeAppMentionRetryKey(seenMessageKey)) { + return; + } } trackEvent?.(); const resolvedMessage = await threadTsResolver.resolve({ message, source: opts.source });