mirror of
https://github.com/moltbot/moltbot.git
synced 2026-03-08 06:54:24 +00:00
fix(slack): preserve dedupe while recovering dropped app_mention (#34937)
This PR fixes Slack mention loss without reintroducing duplicate dispatches. - Preserve seen-message dedupe at ingress to prevent duplicate processing. - Allow a one-time app_mention retry only when the paired message event was previously dropped before dispatch. - Add targeted race tests for both recovery and duplicate-prevention paths. Co-authored-by: littleben <1573829+littleben@users.noreply.github.com> Co-authored-by: OpenClaw Agent <agent@openclaw.ai> Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -256,6 +256,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Synology Chat/reply delivery: resolve webhook usernames to Chat API `user_id` values for outbound chatbot replies, avoiding mismatches between webhook user IDs and `method=chatbot` recipient IDs in multi-account setups. (#23709) Thanks @druide67.
|
||||
- Slack/thread context payloads: only inject thread starter/history text on first thread turn for new sessions while preserving thread metadata, reducing repeated context-token bloat on long-lived thread sessions. (#32133) Thanks @sourman.
|
||||
- Slack/session routing: keep top-level channel messages in one shared session when `replyToMode=off`, while preserving thread-scoped keys for true thread replies and non-off modes. (#32193) Thanks @bmendonca3.
|
||||
- Slack/app_mention dedupe race handling: keep seen-message dedupe to prevent duplicate replies while allowing a one-time app_mention retry when the paired message event was dropped pre-dispatch, so requireMention channels do not lose mentions under Slack event reordering. (#34937) Thanks @littleben.
|
||||
- Voice-call/webhook routing: require exact webhook path matches (instead of prefix matches) so lookalike paths cannot reach provider verification/dispatch logic. (#31930) Thanks @afurm.
|
||||
- Zalo/Pairing auth tests: add webhook regression coverage asserting DM pairing-store reads/writes remain account-scoped, preventing cross-account authorization bleed in multi-account setups. (#26121) Thanks @bmendonca3.
|
||||
- Zalouser/Pairing auth tests: add account-scoped DM pairing-store regression coverage (`monitor.account-scope.test.ts`) to prevent cross-account allowlist bleed in multi-account setups. (#26672) Thanks @bmendonca3.
|
||||
|
||||
157
src/slack/monitor/message-handler.app-mention-race.test.ts
Normal file
157
src/slack/monitor/message-handler.app-mention-race.test.ts
Normal file
@@ -0,0 +1,157 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const prepareSlackMessageMock =
|
||||
vi.fn<
|
||||
(params: {
|
||||
opts: { source: "message" | "app_mention"; wasMentioned?: boolean };
|
||||
}) => Promise<unknown>
|
||||
>();
|
||||
const dispatchPreparedSlackMessageMock = vi.fn<(prepared: unknown) => Promise<void>>();
|
||||
|
||||
vi.mock("../../channels/inbound-debounce-policy.js", () => ({
|
||||
shouldDebounceTextInbound: () => false,
|
||||
createChannelInboundDebouncer: (params: {
|
||||
onFlush: (
|
||||
entries: Array<{
|
||||
message: Record<string, unknown>;
|
||||
opts: { source: "message" | "app_mention"; wasMentioned?: boolean };
|
||||
}>,
|
||||
) => Promise<void>;
|
||||
}) => ({
|
||||
debounceMs: 0,
|
||||
debouncer: {
|
||||
enqueue: async (entry: {
|
||||
message: Record<string, unknown>;
|
||||
opts: { source: "message" | "app_mention"; wasMentioned?: boolean };
|
||||
}) => {
|
||||
await params.onFlush([entry]);
|
||||
},
|
||||
flushKey: async (_key: string) => {},
|
||||
},
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("./thread-resolution.js", () => ({
|
||||
createSlackThreadTsResolver: () => ({
|
||||
resolve: async ({ message }: { message: Record<string, unknown> }) => message,
|
||||
}),
|
||||
}));
|
||||
|
||||
vi.mock("./message-handler/prepare.js", () => ({
|
||||
prepareSlackMessage: (
|
||||
params: Parameters<typeof prepareSlackMessageMock>[0],
|
||||
): ReturnType<typeof prepareSlackMessageMock> => prepareSlackMessageMock(params),
|
||||
}));
|
||||
|
||||
vi.mock("./message-handler/dispatch.js", () => ({
|
||||
dispatchPreparedSlackMessage: (
|
||||
prepared: Parameters<typeof dispatchPreparedSlackMessageMock>[0],
|
||||
): ReturnType<typeof dispatchPreparedSlackMessageMock> =>
|
||||
dispatchPreparedSlackMessageMock(prepared),
|
||||
}));
|
||||
|
||||
import { createSlackMessageHandler } from "./message-handler.js";
|
||||
|
||||
function createMarkMessageSeen() {
|
||||
const seen = new Set<string>();
|
||||
return (channel: string | undefined, ts: string | undefined) => {
|
||||
if (!channel || !ts) {
|
||||
return false;
|
||||
}
|
||||
const key = `${channel}:${ts}`;
|
||||
if (seen.has(key)) {
|
||||
return true;
|
||||
}
|
||||
seen.add(key);
|
||||
return false;
|
||||
};
|
||||
}
|
||||
|
||||
describe("createSlackMessageHandler app_mention race handling", () => {
|
||||
beforeEach(() => {
|
||||
prepareSlackMessageMock.mockReset();
|
||||
dispatchPreparedSlackMessageMock.mockReset();
|
||||
});
|
||||
|
||||
it("allows a single app_mention retry when message event was dropped before dispatch", async () => {
|
||||
prepareSlackMessageMock.mockImplementation(async ({ opts }) => {
|
||||
if (opts.source === "message") {
|
||||
return null;
|
||||
}
|
||||
return { ctxPayload: {} };
|
||||
});
|
||||
|
||||
const handler = createSlackMessageHandler({
|
||||
ctx: {
|
||||
cfg: {},
|
||||
accountId: "default",
|
||||
app: { client: {} },
|
||||
runtime: {},
|
||||
markMessageSeen: createMarkMessageSeen(),
|
||||
} as Parameters<typeof createSlackMessageHandler>[0]["ctx"],
|
||||
account: { accountId: "default" } as Parameters<
|
||||
typeof createSlackMessageHandler
|
||||
>[0]["account"],
|
||||
});
|
||||
|
||||
await handler(
|
||||
{ type: "message", channel: "C1", ts: "1700000000.000100", text: "hello" } as never,
|
||||
{ source: "message" },
|
||||
);
|
||||
await handler(
|
||||
{
|
||||
type: "app_mention",
|
||||
channel: "C1",
|
||||
ts: "1700000000.000100",
|
||||
text: "<@U_BOT> hello",
|
||||
} as never,
|
||||
{ source: "app_mention", wasMentioned: true },
|
||||
);
|
||||
await handler(
|
||||
{
|
||||
type: "app_mention",
|
||||
channel: "C1",
|
||||
ts: "1700000000.000100",
|
||||
text: "<@U_BOT> hello",
|
||||
} as never,
|
||||
{ source: "app_mention", wasMentioned: true },
|
||||
);
|
||||
|
||||
expect(prepareSlackMessageMock).toHaveBeenCalledTimes(2);
|
||||
expect(dispatchPreparedSlackMessageMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("keeps app_mention deduped when message event already dispatched", async () => {
|
||||
prepareSlackMessageMock.mockResolvedValue({ ctxPayload: {} });
|
||||
|
||||
const handler = createSlackMessageHandler({
|
||||
ctx: {
|
||||
cfg: {},
|
||||
accountId: "default",
|
||||
app: { client: {} },
|
||||
runtime: {},
|
||||
markMessageSeen: createMarkMessageSeen(),
|
||||
} as Parameters<typeof createSlackMessageHandler>[0]["ctx"],
|
||||
account: { accountId: "default" } as Parameters<
|
||||
typeof createSlackMessageHandler
|
||||
>[0]["account"],
|
||||
});
|
||||
|
||||
await handler(
|
||||
{ type: "message", channel: "C1", ts: "1700000000.000200", text: "hello" } as never,
|
||||
{ source: "message" },
|
||||
);
|
||||
await handler(
|
||||
{
|
||||
type: "app_mention",
|
||||
channel: "C1",
|
||||
ts: "1700000000.000200",
|
||||
text: "<@U_BOT> hello",
|
||||
} as never,
|
||||
{ source: "app_mention", wasMentioned: true },
|
||||
);
|
||||
|
||||
expect(prepareSlackMessageMock).toHaveBeenCalledTimes(1);
|
||||
expect(dispatchPreparedSlackMessageMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
@@ -15,6 +15,8 @@ export type SlackMessageHandler = (
|
||||
opts: { source: "message" | "app_mention"; wasMentioned?: boolean },
|
||||
) => Promise<void>;
|
||||
|
||||
const APP_MENTION_RETRY_TTL_MS = 60_000;
|
||||
|
||||
function resolveSlackSenderId(message: SlackMessageEvent): string | null {
|
||||
return message.user ?? message.bot_id ?? null;
|
||||
}
|
||||
@@ -51,6 +53,13 @@ function shouldDebounceSlackMessage(message: SlackMessageEvent, cfg: SlackMonito
|
||||
});
|
||||
}
|
||||
|
||||
function buildSeenMessageKey(channelId: string | undefined, ts: string | undefined): string | null {
|
||||
if (!channelId || !ts) {
|
||||
return null;
|
||||
}
|
||||
return `${channelId}:${ts}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a debounce key that isolates messages by thread (or by message timestamp
|
||||
* for top-level non-DM channel messages). Without per-message scoping, concurrent
|
||||
@@ -133,9 +142,18 @@ export function createSlackMessageHandler(params: {
|
||||
wasMentioned: combinedMentioned || last.opts.wasMentioned,
|
||||
},
|
||||
});
|
||||
const seenMessageKey = buildSeenMessageKey(last.message.channel, last.message.ts);
|
||||
if (!prepared) {
|
||||
const hasMessageSource = entries.some((entry) => entry.opts.source === "message");
|
||||
const hasAppMentionSource = entries.some((entry) => entry.opts.source === "app_mention");
|
||||
if (seenMessageKey && hasMessageSource && !hasAppMentionSource) {
|
||||
rememberAppMentionRetryKey(seenMessageKey);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (seenMessageKey) {
|
||||
appMentionRetryKeys.delete(seenMessageKey);
|
||||
}
|
||||
if (entries.length > 1) {
|
||||
const ids = entries.map((entry) => entry.message.ts).filter(Boolean) as string[];
|
||||
if (ids.length > 0) {
|
||||
@@ -152,6 +170,31 @@ export function createSlackMessageHandler(params: {
|
||||
});
|
||||
const threadTsResolver = createSlackThreadTsResolver({ client: ctx.app.client });
|
||||
const pendingTopLevelDebounceKeys = new Map<string, Set<string>>();
|
||||
const appMentionRetryKeys = new Map<string, number>();
|
||||
|
||||
const pruneAppMentionRetryKeys = (now: number) => {
|
||||
for (const [key, expiresAt] of appMentionRetryKeys) {
|
||||
if (expiresAt <= now) {
|
||||
appMentionRetryKeys.delete(key);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const rememberAppMentionRetryKey = (key: string) => {
|
||||
const now = Date.now();
|
||||
pruneAppMentionRetryKeys(now);
|
||||
appMentionRetryKeys.set(key, now + APP_MENTION_RETRY_TTL_MS);
|
||||
};
|
||||
|
||||
const consumeAppMentionRetryKey = (key: string) => {
|
||||
const now = Date.now();
|
||||
pruneAppMentionRetryKeys(now);
|
||||
if (!appMentionRetryKeys.has(key)) {
|
||||
return false;
|
||||
}
|
||||
appMentionRetryKeys.delete(key);
|
||||
return true;
|
||||
};
|
||||
|
||||
return async (message, opts) => {
|
||||
if (opts.source === "message" && message.type !== "message") {
|
||||
@@ -165,8 +208,13 @@ export function createSlackMessageHandler(params: {
|
||||
) {
|
||||
return;
|
||||
}
|
||||
if (ctx.markMessageSeen(message.channel, message.ts)) {
|
||||
return;
|
||||
const seenMessageKey = buildSeenMessageKey(message.channel, message.ts);
|
||||
if (seenMessageKey && ctx.markMessageSeen(message.channel, message.ts)) {
|
||||
// Allow exactly one app_mention retry if the same ts was previously dropped
|
||||
// from the message stream before it reached dispatch.
|
||||
if (opts.source !== "app_mention" || !consumeAppMentionRetryKey(seenMessageKey)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
trackEvent?.();
|
||||
const resolvedMessage = await threadTsResolver.resolve({ message, source: opts.source });
|
||||
|
||||
Reference in New Issue
Block a user