diff --git a/src/channels/plugins/outbound/direct-text-media.ts b/src/channels/plugins/outbound/direct-text-media.ts index 32e4ed5e5aa..3949963dfe8 100644 --- a/src/channels/plugins/outbound/direct-text-media.ts +++ b/src/channels/plugins/outbound/direct-text-media.ts @@ -20,6 +20,51 @@ type DirectSendFn, TResult extends DirectS opts: TOpts, ) => Promise; +type SendPayloadContext = Parameters>[0]; +type SendPayloadResult = Awaited>>; +type SendPayloadAdapter = Pick< + ChannelOutboundAdapter, + "sendMedia" | "sendText" | "chunker" | "textChunkLimit" +>; + +export async function sendTextMediaPayload(params: { + channel: string; + ctx: SendPayloadContext; + adapter: SendPayloadAdapter; +}): Promise { + const text = params.ctx.payload.text ?? ""; + const urls = params.ctx.payload.mediaUrls?.length + ? params.ctx.payload.mediaUrls + : params.ctx.payload.mediaUrl + ? [params.ctx.payload.mediaUrl] + : []; + if (!text && urls.length === 0) { + return { channel: params.channel, messageId: "" }; + } + if (urls.length > 0) { + let lastResult = await params.adapter.sendMedia!({ + ...params.ctx, + text, + mediaUrl: urls[0], + }); + for (let i = 1; i < urls.length; i++) { + lastResult = await params.adapter.sendMedia!({ + ...params.ctx, + text: "", + mediaUrl: urls[i], + }); + } + return lastResult; + } + const limit = params.adapter.textChunkLimit; + const chunks = limit && params.adapter.chunker ? params.adapter.chunker(text, limit) : [text]; + let lastResult: Awaited>>; + for (const chunk of chunks) { + lastResult = await params.adapter.sendText!({ ...params.ctx, text: chunk }); + } + return lastResult!; +} + export function resolveScopedChannelMediaMaxBytes(params: { cfg: OpenClawConfig; accountId?: string | null; @@ -91,39 +136,8 @@ export function createDirectTextMediaOutbound< chunker: chunkText, chunkerMode: "text", textChunkLimit: 4000, - sendPayload: async (ctx) => { - const text = ctx.payload.text ?? ""; - const urls = ctx.payload.mediaUrls?.length - ? ctx.payload.mediaUrls - : ctx.payload.mediaUrl - ? [ctx.payload.mediaUrl] - : []; - if (!text && urls.length === 0) { - return { channel: params.channel, messageId: "" }; - } - if (urls.length > 0) { - let lastResult = await outbound.sendMedia!({ - ...ctx, - text, - mediaUrl: urls[0], - }); - for (let i = 1; i < urls.length; i++) { - lastResult = await outbound.sendMedia!({ - ...ctx, - text: "", - mediaUrl: urls[i], - }); - } - return lastResult; - } - const limit = outbound.textChunkLimit; - const chunks = limit && outbound.chunker ? outbound.chunker(text, limit) : [text]; - let lastResult: Awaited>>; - for (const chunk of chunks) { - lastResult = await outbound.sendText!({ ...ctx, text: chunk }); - } - return lastResult!; - }, + sendPayload: async (ctx) => + await sendTextMediaPayload({ channel: params.channel, ctx, adapter: outbound }), sendText: async ({ cfg, to, text, accountId, deps, replyToId }) => { return await sendDirect({ cfg, diff --git a/src/channels/plugins/outbound/discord.ts b/src/channels/plugins/outbound/discord.ts index 9c416c590bb..4f959d23e38 100644 --- a/src/channels/plugins/outbound/discord.ts +++ b/src/channels/plugins/outbound/discord.ts @@ -10,6 +10,7 @@ import { import type { OutboundIdentity } from "../../../infra/outbound/identity.js"; import { normalizeDiscordOutboundTarget } from "../normalize/discord.js"; import type { ChannelOutboundAdapter } from "../types.js"; +import { sendTextMediaPayload } from "./direct-text-media.js"; function resolveDiscordOutboundTarget(params: { to: string; @@ -80,39 +81,8 @@ export const discordOutbound: ChannelOutboundAdapter = { textChunkLimit: 2000, pollMaxOptions: 10, resolveTarget: ({ to }) => normalizeDiscordOutboundTarget(to), - sendPayload: async (ctx) => { - const text = ctx.payload.text ?? ""; - const urls = ctx.payload.mediaUrls?.length - ? ctx.payload.mediaUrls - : ctx.payload.mediaUrl - ? [ctx.payload.mediaUrl] - : []; - if (!text && urls.length === 0) { - return { channel: "discord", messageId: "" }; - } - if (urls.length > 0) { - let lastResult = await discordOutbound.sendMedia!({ - ...ctx, - text, - mediaUrl: urls[0], - }); - for (let i = 1; i < urls.length; i++) { - lastResult = await discordOutbound.sendMedia!({ - ...ctx, - text: "", - mediaUrl: urls[i], - }); - } - return lastResult; - } - const limit = discordOutbound.textChunkLimit; - const chunks = limit && discordOutbound.chunker ? discordOutbound.chunker(text, limit) : [text]; - let lastResult: Awaited>>; - for (const chunk of chunks) { - lastResult = await discordOutbound.sendText!({ ...ctx, text: chunk }); - } - return lastResult!; - }, + sendPayload: async (ctx) => + await sendTextMediaPayload({ channel: "discord", ctx, adapter: discordOutbound }), sendText: async ({ to, text, accountId, deps, replyToId, threadId, identity, silent }) => { if (!silent) { const webhookResult = await maybeSendDiscordWebhookText({ diff --git a/src/channels/plugins/outbound/slack.ts b/src/channels/plugins/outbound/slack.ts index 3828eaff3e1..562336776c9 100644 --- a/src/channels/plugins/outbound/slack.ts +++ b/src/channels/plugins/outbound/slack.ts @@ -2,6 +2,7 @@ import type { OutboundIdentity } from "../../../infra/outbound/identity.js"; import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js"; import { sendMessageSlack, type SlackSendIdentity } from "../../../slack/send.js"; import type { ChannelOutboundAdapter } from "../types.js"; +import { sendTextMediaPayload } from "./direct-text-media.js"; function resolveSlackSendIdentity(identity?: OutboundIdentity): SlackSendIdentity | undefined { if (!identity) { @@ -93,39 +94,8 @@ export const slackOutbound: ChannelOutboundAdapter = { deliveryMode: "direct", chunker: null, textChunkLimit: 4000, - sendPayload: async (ctx) => { - const text = ctx.payload.text ?? ""; - const urls = ctx.payload.mediaUrls?.length - ? ctx.payload.mediaUrls - : ctx.payload.mediaUrl - ? [ctx.payload.mediaUrl] - : []; - if (!text && urls.length === 0) { - return { channel: "slack", messageId: "" }; - } - if (urls.length > 0) { - let lastResult = await slackOutbound.sendMedia!({ - ...ctx, - text, - mediaUrl: urls[0], - }); - for (let i = 1; i < urls.length; i++) { - lastResult = await slackOutbound.sendMedia!({ - ...ctx, - text: "", - mediaUrl: urls[i], - }); - } - return lastResult; - } - const limit = slackOutbound.textChunkLimit; - const chunks = limit && slackOutbound.chunker ? slackOutbound.chunker(text, limit) : [text]; - let lastResult: Awaited>>; - for (const chunk of chunks) { - lastResult = await slackOutbound.sendText!({ ...ctx, text: chunk }); - } - return lastResult!; - }, + sendPayload: async (ctx) => + await sendTextMediaPayload({ channel: "slack", ctx, adapter: slackOutbound }), sendText: async ({ to, text, accountId, deps, replyToId, threadId, identity }) => { return await sendSlackOutboundMessage({ to, diff --git a/src/channels/plugins/outbound/whatsapp.ts b/src/channels/plugins/outbound/whatsapp.ts index daa47e3324f..a314b372e70 100644 --- a/src/channels/plugins/outbound/whatsapp.ts +++ b/src/channels/plugins/outbound/whatsapp.ts @@ -3,6 +3,7 @@ import { shouldLogVerbose } from "../../../globals.js"; import { sendPollWhatsApp } from "../../../web/outbound.js"; import { resolveWhatsAppOutboundTarget } from "../../../whatsapp/resolve-outbound-target.js"; import type { ChannelOutboundAdapter } from "../types.js"; +import { sendTextMediaPayload } from "./direct-text-media.js"; export const whatsappOutbound: ChannelOutboundAdapter = { deliveryMode: "gateway", @@ -12,40 +13,8 @@ export const whatsappOutbound: ChannelOutboundAdapter = { pollMaxOptions: 12, resolveTarget: ({ to, allowFrom, mode }) => resolveWhatsAppOutboundTarget({ to, allowFrom, mode }), - sendPayload: async (ctx) => { - const text = ctx.payload.text ?? ""; - const urls = ctx.payload.mediaUrls?.length - ? ctx.payload.mediaUrls - : ctx.payload.mediaUrl - ? [ctx.payload.mediaUrl] - : []; - if (!text && urls.length === 0) { - return { channel: "whatsapp", messageId: "" }; - } - if (urls.length > 0) { - let lastResult = await whatsappOutbound.sendMedia!({ - ...ctx, - text, - mediaUrl: urls[0], - }); - for (let i = 1; i < urls.length; i++) { - lastResult = await whatsappOutbound.sendMedia!({ - ...ctx, - text: "", - mediaUrl: urls[i], - }); - } - return lastResult; - } - const limit = whatsappOutbound.textChunkLimit; - const chunks = - limit && whatsappOutbound.chunker ? whatsappOutbound.chunker(text, limit) : [text]; - let lastResult: Awaited>>; - for (const chunk of chunks) { - lastResult = await whatsappOutbound.sendText!({ ...ctx, text: chunk }); - } - return lastResult!; - }, + sendPayload: async (ctx) => + await sendTextMediaPayload({ channel: "whatsapp", ctx, adapter: whatsappOutbound }), sendText: async ({ to, text, accountId, deps, gifPlayback }) => { const send = deps?.sendWhatsApp ?? (await import("../../../web/outbound.js")).sendMessageWhatsApp; diff --git a/src/discord/monitor.tool-result.test-harness.ts b/src/discord/monitor.tool-result.test-harness.ts index bdea448526b..0d4596b3281 100644 --- a/src/discord/monitor.tool-result.test-harness.ts +++ b/src/discord/monitor.tool-result.test-harness.ts @@ -25,10 +25,18 @@ vi.mock("../auto-reply/dispatch.js", async (importOriginal) => { }; }); -vi.mock("../pairing/pairing-store.js", () => ({ - readChannelAllowFromStore: (...args: unknown[]) => readAllowFromStoreMock(...args), - upsertChannelPairingRequest: (...args: unknown[]) => upsertPairingRequestMock(...args), -})); +function createPairingStoreMocks() { + return { + readChannelAllowFromStore(...args: unknown[]) { + return readAllowFromStoreMock(...args); + }, + upsertChannelPairingRequest(...args: unknown[]) { + return upsertPairingRequestMock(...args); + }, + }; +} + +vi.mock("../pairing/pairing-store.js", () => createPairingStoreMocks()); vi.mock("../config/sessions.js", async (importOriginal) => { const actual = await importOriginal(); diff --git a/src/discord/monitor/listeners.ts b/src/discord/monitor/listeners.ts index e8b1cf40cf9..f9427e2be04 100644 --- a/src/discord/monitor/listeners.ts +++ b/src/discord/monitor/listeners.ts @@ -43,8 +43,12 @@ type DiscordReactionEvent = Parameters[0]; type DiscordReactionListenerParams = { cfg: LoadedConfig; - accountId: string; runtime: RuntimeEnv; + logger: Logger; + onEvent?: () => void; +} & DiscordReactionRoutingParams; + +type DiscordReactionRoutingParams = { botUserId?: string; dmEnabled: boolean; groupDmEnabled: boolean; @@ -54,8 +58,6 @@ type DiscordReactionListenerParams = { groupPolicy: "open" | "allowlist" | "disabled"; allowNameMatching: boolean; guildEntries?: Record; - logger: Logger; - onEvent?: () => void; }; const DISCORD_SLOW_LISTENER_THRESHOLD_MS = 30_000; @@ -315,23 +317,15 @@ async function authorizeDiscordReactionIngress( return { allowed: true }; } -async function handleDiscordReactionEvent(params: { - data: DiscordReactionEvent; - client: Client; - action: "added" | "removed"; - cfg: LoadedConfig; - accountId: string; - botUserId?: string; - dmEnabled: boolean; - groupDmEnabled: boolean; - groupDmChannels: string[]; - dmPolicy: "open" | "pairing" | "allowlist" | "disabled"; - allowFrom: string[]; - groupPolicy: "open" | "allowlist" | "disabled"; - allowNameMatching: boolean; - guildEntries?: Record; - logger: Logger; -}) { +async function handleDiscordReactionEvent( + params: { + data: DiscordReactionEvent; + client: Client; + action: "added" | "removed"; + cfg: LoadedConfig; + logger: Logger; + } & DiscordReactionRoutingParams, +) { try { const { data, client, action, botUserId, guildEntries } = params; if (!("user" in data)) { diff --git a/src/discord/monitor/message-handler.process.test.ts b/src/discord/monitor/message-handler.process.test.ts index 6284509073f..c453a003b17 100644 --- a/src/discord/monitor/message-handler.process.test.ts +++ b/src/discord/monitor/message-handler.process.test.ts @@ -120,6 +120,19 @@ const { processDiscordMessage } = await import("./message-handler.process.js"); const createBaseContext = createBaseDiscordMessageContext; +function mockDispatchSingleBlockReply(payload: { text: string; isReasoning?: boolean }) { + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.dispatcher.sendBlockReply(payload); + return { queuedFinal: false, counts: { final: 0, tool: 0, block: 1 } }; + }); +} + +async function processStreamOffDiscordMessage() { + const ctx = await createBaseContext({ discordConfig: { streamMode: "off" } }); + // oxlint-disable-next-line typescript/no-explicit-any + await processDiscordMessage(ctx as any); +} + beforeEach(() => { vi.useRealTimers(); sendMocks.reactMessageDiscord.mockClear(); @@ -463,15 +476,8 @@ describe("processDiscordMessage draft streaming", () => { }); it("suppresses reasoning payload delivery to Discord", async () => { - dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { - await params?.dispatcher.sendBlockReply({ text: "thinking...", isReasoning: true }); - return { queuedFinal: false, counts: { final: 0, tool: 0, block: 1 } }; - }); - - const ctx = await createBaseContext({ discordConfig: { streamMode: "off" } }); - - // oxlint-disable-next-line typescript/no-explicit-any - await processDiscordMessage(ctx as any); + mockDispatchSingleBlockReply({ text: "thinking...", isReasoning: true }); + await processStreamOffDiscordMessage(); expect(deliverDiscordReply).not.toHaveBeenCalled(); }); @@ -495,15 +501,8 @@ describe("processDiscordMessage draft streaming", () => { }); it("delivers non-reasoning block payloads to Discord", async () => { - dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { - await params?.dispatcher.sendBlockReply({ text: "hello from block stream" }); - return { queuedFinal: false, counts: { final: 0, tool: 0, block: 1 } }; - }); - - const ctx = await createBaseContext({ discordConfig: { streamMode: "off" } }); - - // oxlint-disable-next-line typescript/no-explicit-any - await processDiscordMessage(ctx as any); + mockDispatchSingleBlockReply({ text: "hello from block stream" }); + await processStreamOffDiscordMessage(); expect(deliverDiscordReply).toHaveBeenCalledTimes(1); }); diff --git a/src/discord/monitor/native-command.model-picker.test.ts b/src/discord/monitor/native-command.model-picker.test.ts index 2932dc9dbf5..22d9fd94730 100644 --- a/src/discord/monitor/native-command.model-picker.test.ts +++ b/src/discord/monitor/native-command.model-picker.test.ts @@ -210,8 +210,10 @@ function createBoundThreadBindingManager(params: { targetSessionKey: string; agentId: string; }): ThreadBindingManager { + const baseManager = createNoopThreadBindingManager(params.accountId); + const now = Date.now(); return { - accountId: params.accountId, + ...baseManager, getIdleTimeoutMs: () => 24 * 60 * 60 * 1000, getMaxAgeMs: () => 0, getByThreadId: (threadId: string) => @@ -224,20 +226,12 @@ function createBoundThreadBindingManager(params: { targetSessionKey: params.targetSessionKey, agentId: params.agentId, boundBy: "system", - boundAt: Date.now(), - lastActivityAt: Date.now(), + boundAt: now, + lastActivityAt: now, idleTimeoutMs: 24 * 60 * 60 * 1000, maxAgeMs: 0, } - : undefined, - getBySessionKey: () => undefined, - listBySessionKey: () => [], - listBindings: () => [], - touchThread: () => null, - bindTarget: async () => null, - unbindThread: () => null, - unbindBySessionKey: () => [], - stop: () => {}, + : baseManager.getByThreadId(threadId), }; } diff --git a/src/discord/monitor/provider.test.ts b/src/discord/monitor/provider.test.ts index e41fa45ae76..8e597e8dca6 100644 --- a/src/discord/monitor/provider.test.ts +++ b/src/discord/monitor/provider.test.ts @@ -258,6 +258,14 @@ describe("monitorDiscordProvider", () => { }, }) as OpenClawConfig; + const getConstructedEventQueue = (): { listenerTimeout?: number } | undefined => { + expect(clientConstructorOptionsMock).toHaveBeenCalledTimes(1); + const opts = clientConstructorOptionsMock.mock.calls[0]?.[0] as { + eventQueue?: { listenerTimeout?: number }; + }; + return opts.eventQueue; + }; + beforeEach(() => { clientConstructorOptionsMock.mockClear(); clientFetchUserMock.mockClear().mockResolvedValue({ id: "bot-1" }); @@ -349,12 +357,9 @@ describe("monitorDiscordProvider", () => { runtime: baseRuntime(), }); - expect(clientConstructorOptionsMock).toHaveBeenCalledTimes(1); - const opts = clientConstructorOptionsMock.mock.calls[0]?.[0] as { - eventQueue?: { listenerTimeout?: number }; - }; - expect(opts.eventQueue).toBeDefined(); - expect(opts.eventQueue?.listenerTimeout).toBe(120_000); + const eventQueue = getConstructedEventQueue(); + expect(eventQueue).toBeDefined(); + expect(eventQueue?.listenerTimeout).toBe(120_000); }); it("forwards custom eventQueue config from discord config to Carbon Client", async () => { @@ -377,10 +382,7 @@ describe("monitorDiscordProvider", () => { runtime: baseRuntime(), }); - expect(clientConstructorOptionsMock).toHaveBeenCalledTimes(1); - const opts = clientConstructorOptionsMock.mock.calls[0]?.[0] as { - eventQueue?: { listenerTimeout?: number }; - }; - expect(opts.eventQueue?.listenerTimeout).toBe(300_000); + const eventQueue = getConstructedEventQueue(); + expect(eventQueue?.listenerTimeout).toBe(300_000); }); }); diff --git a/src/slack/actions.download-file.test.ts b/src/slack/actions.download-file.test.ts index d75330435ad..a4ac167a7b5 100644 --- a/src/slack/actions.download-file.test.ts +++ b/src/slack/actions.download-file.test.ts @@ -60,6 +60,13 @@ function expectResolveSlackMediaCalledWithDefaults() { }); } +function mockSuccessfulMediaDownload(client: ReturnType) { + client.files.info.mockResolvedValueOnce({ + file: makeSlackFileInfo(), + }); + resolveSlackMedia.mockResolvedValueOnce([makeResolvedSlackMedia()]); +} + describe("downloadSlackFile", () => { beforeEach(() => { resolveSlackMedia.mockReset(); @@ -86,10 +93,7 @@ describe("downloadSlackFile", () => { it("downloads via resolveSlackMedia using fresh files.info metadata", async () => { const client = createClient(); - client.files.info.mockResolvedValueOnce({ - file: makeSlackFileInfo(), - }); - resolveSlackMedia.mockResolvedValueOnce([makeResolvedSlackMedia()]); + mockSuccessfulMediaDownload(client); const result = await downloadSlackFile("F123", { client, @@ -143,10 +147,7 @@ describe("downloadSlackFile", () => { it("keeps legacy behavior when file metadata does not expose channel/thread shares", async () => { const client = createClient(); - client.files.info.mockResolvedValueOnce({ - file: makeSlackFileInfo(), - }); - resolveSlackMedia.mockResolvedValueOnce([makeResolvedSlackMedia()]); + mockSuccessfulMediaDownload(client); const result = await downloadSlackFile("F123", { client, diff --git a/src/telegram/bot/delivery.resolve-media-retry.test.ts b/src/telegram/bot/delivery.resolve-media-retry.test.ts index bc0774ff3b1..e265d265d70 100644 --- a/src/telegram/bot/delivery.resolve-media-retry.test.ts +++ b/src/telegram/bot/delivery.resolve-media-retry.test.ts @@ -110,6 +110,18 @@ function setupTransientGetFileRetry() { return getFile; } +function mockPdfFetchAndSave(fileName: string | undefined) { + fetchRemoteMedia.mockResolvedValueOnce({ + buffer: Buffer.from("pdf-data"), + contentType: "application/pdf", + fileName, + }); + saveMediaBuffer.mockResolvedValueOnce({ + path: "/tmp/file_42---uuid.pdf", + contentType: "application/pdf", + }); +} + function createFileTooBigError(): Error { return new Error("GrammyError: Call to 'getFile' failed! (400: Bad Request: file is too big)"); } @@ -321,15 +333,7 @@ describe("resolveMedia original filename preservation", () => { it("falls back to fetched.fileName when telegram file_name is absent", async () => { const getFile = vi.fn().mockResolvedValue({ file_path: "documents/file_42.pdf" }); - fetchRemoteMedia.mockResolvedValueOnce({ - buffer: Buffer.from("pdf-data"), - contentType: "application/pdf", - fileName: "file_42.pdf", - }); - saveMediaBuffer.mockResolvedValueOnce({ - path: "/tmp/file_42---uuid.pdf", - contentType: "application/pdf", - }); + mockPdfFetchAndSave("file_42.pdf"); const ctx = makeCtx("document", getFile); const result = await resolveMedia(ctx, MAX_MEDIA_BYTES, BOT_TOKEN); @@ -346,15 +350,7 @@ describe("resolveMedia original filename preservation", () => { it("falls back to filePath when neither telegram nor fetched fileName is available", async () => { const getFile = vi.fn().mockResolvedValue({ file_path: "documents/file_42.pdf" }); - fetchRemoteMedia.mockResolvedValueOnce({ - buffer: Buffer.from("pdf-data"), - contentType: "application/pdf", - fileName: undefined, - }); - saveMediaBuffer.mockResolvedValueOnce({ - path: "/tmp/file_42---uuid.pdf", - contentType: "application/pdf", - }); + mockPdfFetchAndSave(undefined); const ctx = makeCtx("document", getFile); const result = await resolveMedia(ctx, MAX_MEDIA_BYTES, BOT_TOKEN); diff --git a/src/telegram/draft-stream.test.ts b/src/telegram/draft-stream.test.ts index 22bafa37feb..594b5df9693 100644 --- a/src/telegram/draft-stream.test.ts +++ b/src/telegram/draft-stream.test.ts @@ -44,6 +44,14 @@ async function expectInitialForumSend( ); } +function expectDmMessagePreviewViaSendMessage( + api: ReturnType, + text = "Hello", +): void { + expect(api.sendMessage).toHaveBeenCalledWith(123, text, { message_thread_id: 42 }); + expect(api.editMessageText).not.toHaveBeenCalled(); +} + function createForceNewMessageHarness(params: { throttleMs?: number } = {}) { const api = createMockDraftApi(); api.sendMessage @@ -135,9 +143,8 @@ describe("createTelegramDraftStream", () => { stream.update("Hello"); await stream.flush(); - expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { message_thread_id: 42 }); + expectDmMessagePreviewViaSendMessage(api); expect(api.sendMessageDraft).not.toHaveBeenCalled(); - expect(api.editMessageText).not.toHaveBeenCalled(); }); it("falls back to message transport when sendMessageDraft is unavailable", async () => { @@ -153,8 +160,7 @@ describe("createTelegramDraftStream", () => { stream.update("Hello"); await stream.flush(); - expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { message_thread_id: 42 }); - expect(api.editMessageText).not.toHaveBeenCalled(); + expectDmMessagePreviewViaSendMessage(api); expect(warn).toHaveBeenCalledWith( "telegram stream preview: sendMessageDraft unavailable; falling back to sendMessage/editMessageText", ); @@ -392,6 +398,14 @@ describe("draft stream initial message debounce", () => { deleteMessage: vi.fn().mockResolvedValue(true), }); + function createDebouncedStream(api: ReturnType, minInitialChars = 30) { + return createTelegramDraftStream({ + api: api as unknown as Bot["api"], + chatId: 123, + minInitialChars, + }); + } + beforeEach(() => { vi.useFakeTimers(); }); @@ -403,11 +417,7 @@ describe("draft stream initial message debounce", () => { describe("isFinal has highest priority", () => { it("sends immediately on stop() even with 1 character", async () => { const api = createMockApi(); - const stream = createTelegramDraftStream({ - api: api as unknown as Bot["api"], - chatId: 123, - minInitialChars: 30, - }); + const stream = createDebouncedStream(api); stream.update("Y"); await stream.stop(); @@ -418,11 +428,7 @@ describe("draft stream initial message debounce", () => { it("sends immediately on stop() with short sentence", async () => { const api = createMockApi(); - const stream = createTelegramDraftStream({ - api: api as unknown as Bot["api"], - chatId: 123, - minInitialChars: 30, - }); + const stream = createDebouncedStream(api); stream.update("Ok."); await stream.stop(); @@ -435,11 +441,7 @@ describe("draft stream initial message debounce", () => { describe("minInitialChars threshold", () => { it("does not send first message below threshold", async () => { const api = createMockApi(); - const stream = createTelegramDraftStream({ - api: api as unknown as Bot["api"], - chatId: 123, - minInitialChars: 30, - }); + const stream = createDebouncedStream(api); stream.update("Processing"); // 10 chars, below 30 await stream.flush(); @@ -449,11 +451,7 @@ describe("draft stream initial message debounce", () => { it("sends first message when reaching threshold", async () => { const api = createMockApi(); - const stream = createTelegramDraftStream({ - api: api as unknown as Bot["api"], - chatId: 123, - minInitialChars: 30, - }); + const stream = createDebouncedStream(api); // Exactly 30 chars stream.update("I am processing your request.."); @@ -464,11 +462,7 @@ describe("draft stream initial message debounce", () => { it("works with longer text above threshold", async () => { const api = createMockApi(); - const stream = createTelegramDraftStream({ - api: api as unknown as Bot["api"], - chatId: 123, - minInitialChars: 30, - }); + const stream = createDebouncedStream(api); stream.update("I am processing your request, please wait a moment"); // 50 chars await stream.flush(); @@ -480,11 +474,7 @@ describe("draft stream initial message debounce", () => { describe("subsequent updates after first message", () => { it("edits normally after first message is sent", async () => { const api = createMockApi(); - const stream = createTelegramDraftStream({ - api: api as unknown as Bot["api"], - chatId: 123, - minInitialChars: 30, - }); + const stream = createDebouncedStream(api); // First message at threshold (30 chars) stream.update("I am processing your request.."); diff --git a/src/telegram/webhook.test.ts b/src/telegram/webhook.test.ts index f4c7c404803..b2863a11dbb 100644 --- a/src/telegram/webhook.test.ts +++ b/src/telegram/webhook.test.ts @@ -1,6 +1,6 @@ import { createHash } from "node:crypto"; import { once } from "node:events"; -import { request } from "node:http"; +import { request, type IncomingMessage } from "node:http"; import { setTimeout as sleep } from "node:timers/promises"; import { describe, expect, it, vi } from "vitest"; import { startTelegramWebhook } from "./webhook.js"; @@ -24,6 +24,22 @@ const TELEGRAM_TOKEN = "tok"; const TELEGRAM_SECRET = "secret"; const TELEGRAM_WEBHOOK_PATH = "/hook"; +function collectResponseBody( + res: IncomingMessage, + onDone: (payload: { statusCode: number; body: string }) => void, +): void { + const chunks: Buffer[] = []; + res.on("data", (chunk: Buffer | string) => { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + }); + res.on("end", () => { + onDone({ + statusCode: res.statusCode ?? 0, + body: Buffer.concat(chunks).toString("utf-8"), + }); + }); +} + vi.mock("grammy", async (importOriginal) => { const actual = await importOriginal(); return { @@ -124,16 +140,7 @@ async function postWebhookPayloadWithChunkPlan(params: { }, }, (res) => { - const chunks: Buffer[] = []; - res.on("data", (chunk: Buffer | string) => { - chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); - }); - res.on("end", () => { - finishResolve({ - statusCode: res.statusCode ?? 0, - body: Buffer.concat(chunks).toString("utf-8"), - }); - }); + collectResponseBody(res, finishResolve); }, ); @@ -555,16 +562,8 @@ describe("startTelegramWebhook", () => { }, }, (res) => { - const chunks: Buffer[] = []; - res.on("data", (chunk: Buffer | string) => { - chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); - }); - res.on("end", () => { - resolve({ - kind: "response", - statusCode: res.statusCode ?? 0, - body: Buffer.concat(chunks).toString("utf-8"), - }); + collectResponseBody(res, (payload) => { + resolve({ kind: "response", ...payload }); }); }, ); diff --git a/src/web/inbound.media.test.ts b/src/web/inbound.media.test.ts index fe835be6a66..82cc0fb83d0 100644 --- a/src/web/inbound.media.test.ts +++ b/src/web/inbound.media.test.ts @@ -26,10 +26,16 @@ vi.mock("../config/config.js", async (importOriginal) => { }; }); -vi.mock("../pairing/pairing-store.js", () => ({ - readChannelAllowFromStore: (...args: unknown[]) => readAllowFromStoreMock(...args), - upsertChannelPairingRequest: (...args: unknown[]) => upsertPairingRequestMock(...args), -})); +vi.mock("../pairing/pairing-store.js", () => { + return { + readChannelAllowFromStore(...args: unknown[]) { + return readAllowFromStoreMock(...args); + }, + upsertChannelPairingRequest(...args: unknown[]) { + return upsertPairingRequestMock(...args); + }, + }; +}); vi.mock("../media/store.js", async (importOriginal) => { const actual = await importOriginal(); diff --git a/src/web/monitor-inbox.allows-messages-from-senders-allowfrom-list.test.ts b/src/web/monitor-inbox.allows-messages-from-senders-allowfrom-list.test.ts index 828236a2e74..545a010ed50 100644 --- a/src/web/monitor-inbox.allows-messages-from-senders-allowfrom-list.test.ts +++ b/src/web/monitor-inbox.allows-messages-from-senders-allowfrom-list.test.ts @@ -3,6 +3,7 @@ import { describe, expect, it, vi } from "vitest"; import { monitorWebInbox } from "./inbound.js"; import { DEFAULT_ACCOUNT_ID, + expectPairingPromptSent, getAuthDir, getSock, installWebMonitorInboxUnitTestHooks, @@ -182,13 +183,7 @@ describe("web monitor inbox", () => { sock.ev.emit("messages.upsert", upsertBlocked); await new Promise((resolve) => setImmediate(resolve)); expect(onMessage).not.toHaveBeenCalled(); - expect(sock.sendMessage).toHaveBeenCalledTimes(1); - expect(sock.sendMessage).toHaveBeenCalledWith("999@s.whatsapp.net", { - text: expect.stringContaining("Your WhatsApp phone number: +999"), - }); - expect(sock.sendMessage).toHaveBeenCalledWith("999@s.whatsapp.net", { - text: expect.stringContaining("Pairing code: PAIRCODE"), - }); + expectPairingPromptSent(sock, "999@s.whatsapp.net", "+999"); const upsertBlockedAgain = { type: "notify", diff --git a/src/web/monitor-inbox.blocks-messages-from-unauthorized-senders-not-allowfrom.test.ts b/src/web/monitor-inbox.blocks-messages-from-unauthorized-senders-not-allowfrom.test.ts index ca7e8656508..586df46a527 100644 --- a/src/web/monitor-inbox.blocks-messages-from-unauthorized-senders-not-allowfrom.test.ts +++ b/src/web/monitor-inbox.blocks-messages-from-unauthorized-senders-not-allowfrom.test.ts @@ -3,6 +3,7 @@ import { describe, expect, it, vi } from "vitest"; import { monitorWebInbox } from "./inbound.js"; import { DEFAULT_ACCOUNT_ID, + expectPairingPromptSent, getAuthDir, getSock, installWebMonitorInboxUnitTestHooks, @@ -116,13 +117,7 @@ describe("web monitor inbox", () => { expect(onMessage).not.toHaveBeenCalled(); // Should NOT send read receipts for blocked senders (privacy + avoids Baileys Bad MAC churn). expect(sock.readMessages).not.toHaveBeenCalled(); - expect(sock.sendMessage).toHaveBeenCalledTimes(1); - expect(sock.sendMessage).toHaveBeenCalledWith("999@s.whatsapp.net", { - text: expect.stringContaining("Your WhatsApp phone number: +999"), - }); - expect(sock.sendMessage).toHaveBeenCalledWith("999@s.whatsapp.net", { - text: expect.stringContaining("Pairing code: PAIRCODE"), - }); + expectPairingPromptSent(sock, "999@s.whatsapp.net", "+999"); await listener.close(); }); diff --git a/src/web/monitor-inbox.test-harness.ts b/src/web/monitor-inbox.test-harness.ts index 5d5eeed9052..a4e9f62f92b 100644 --- a/src/web/monitor-inbox.test-harness.ts +++ b/src/web/monitor-inbox.test-harness.ts @@ -2,7 +2,7 @@ import { EventEmitter } from "node:events"; import fsSync from "node:fs"; import os from "node:os"; import path from "node:path"; -import { afterEach, beforeEach, vi } from "vitest"; +import { afterEach, beforeEach, expect, vi } from "vitest"; import { resetLogger, setLoggerOverride } from "../logging.js"; // Avoid exporting vitest mock types (TS2742 under pnpm + d.ts emit). @@ -47,14 +47,18 @@ export type MockSock = { user: { id: string }; }; +function createResolvedMock() { + return vi.fn().mockResolvedValue(undefined); +} + function createMockSock(): MockSock { const ev = new EventEmitter(); return { ev, ws: { close: vi.fn() }, - sendPresenceUpdate: vi.fn().mockResolvedValue(undefined), - sendMessage: vi.fn().mockResolvedValue(undefined), - readMessages: vi.fn().mockResolvedValue(undefined), + sendPresenceUpdate: createResolvedMock(), + sendMessage: createResolvedMock(), + readMessages: createResolvedMock(), updateMediaMessage: vi.fn(), logger: {}, signalRepository: { @@ -66,6 +70,15 @@ function createMockSock(): MockSock { }; } +function getPairingStoreMocks() { + const readChannelAllowFromStore = (...args: unknown[]) => readAllowFromStoreMock(...args); + const upsertChannelPairingRequest = (...args: unknown[]) => upsertPairingRequestMock(...args); + return { + readChannelAllowFromStore, + upsertChannelPairingRequest, + }; +} + const sock: MockSock = createMockSock(); vi.mock("../media/store.js", () => ({ @@ -85,10 +98,7 @@ vi.mock("../config/config.js", async (importOriginal) => { }; }); -vi.mock("../pairing/pairing-store.js", () => ({ - readChannelAllowFromStore: (...args: unknown[]) => readAllowFromStoreMock(...args), - upsertChannelPairingRequest: (...args: unknown[]) => upsertPairingRequestMock(...args), -})); +vi.mock("../pairing/pairing-store.js", () => getPairingStoreMocks()); vi.mock("./session.js", () => ({ createWaSocket: vi.fn().mockResolvedValue(sock), @@ -100,6 +110,16 @@ export function getSock(): MockSock { return sock; } +export function expectPairingPromptSent(sock: MockSock, jid: string, senderE164: string) { + expect(sock.sendMessage).toHaveBeenCalledTimes(1); + expect(sock.sendMessage).toHaveBeenCalledWith(jid, { + text: expect.stringContaining(`Your WhatsApp phone number: ${senderE164}`), + }); + expect(sock.sendMessage).toHaveBeenCalledWith(jid, { + text: expect.stringContaining("Pairing code: PAIRCODE"), + }); +} + let authDir: string | undefined; export function getAuthDir(): string { diff --git a/src/whatsapp/resolve-outbound-target.test.ts b/src/whatsapp/resolve-outbound-target.test.ts index b97f5646cd8..5c4495053b2 100644 --- a/src/whatsapp/resolve-outbound-target.test.ts +++ b/src/whatsapp/resolve-outbound-target.test.ts @@ -8,6 +8,8 @@ vi.mock("../infra/outbound/target-errors.js", () => ({ })); type ResolveParams = Parameters[0]; +const PRIMARY_TARGET = "+11234567890"; +const SECONDARY_TARGET = "+19876543210"; function expectResolutionError(params: ResolveParams) { const result = resolveWhatsAppOutboundTarget(params); @@ -23,6 +25,42 @@ function expectResolutionOk(params: ResolveParams, expectedTarget: string) { expect(result).toEqual({ ok: true, to: expectedTarget }); } +function mockNormalizedDirectMessage(...values: Array) { + const normalizeMock = vi.mocked(normalize.normalizeWhatsAppTarget); + for (const value of values) { + normalizeMock.mockReturnValueOnce(value); + } + vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false); +} + +function expectAllowedForTarget(params: { + allowFrom: ResolveParams["allowFrom"]; + mode: ResolveParams["mode"]; + to?: string; +}) { + const to = params.to ?? PRIMARY_TARGET; + expectResolutionOk( + { + to, + allowFrom: params.allowFrom, + mode: params.mode, + }, + to, + ); +} + +function expectDeniedForTarget(params: { + allowFrom: ResolveParams["allowFrom"]; + mode: ResolveParams["mode"]; + to?: string; +}) { + expectResolutionError({ + to: params.to ?? PRIMARY_TARGET, + allowFrom: params.allowFrom, + mode: params.mode, + }); +} + describe("resolveWhatsAppOutboundTarget", () => { beforeEach(() => { vi.resetAllMocks(); @@ -82,64 +120,23 @@ describe("resolveWhatsAppOutboundTarget", () => { describe("implicit/heartbeat mode with allowList", () => { it("allows message when wildcard is present", () => { - vi.mocked(normalize.normalizeWhatsAppTarget) - .mockReturnValueOnce("+11234567890") - .mockReturnValueOnce("+11234567890"); - vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false); - - expectResolutionOk( - { - to: "+11234567890", - allowFrom: ["*"], - mode: "implicit", - }, - "+11234567890", - ); + mockNormalizedDirectMessage(PRIMARY_TARGET, PRIMARY_TARGET); + expectAllowedForTarget({ allowFrom: ["*"], mode: "implicit" }); }); it("allows message when allowList is empty", () => { - vi.mocked(normalize.normalizeWhatsAppTarget) - .mockReturnValueOnce("+11234567890") - .mockReturnValueOnce("+11234567890"); - vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false); - - expectResolutionOk( - { - to: "+11234567890", - allowFrom: [], - mode: "implicit", - }, - "+11234567890", - ); + mockNormalizedDirectMessage(PRIMARY_TARGET, PRIMARY_TARGET); + expectAllowedForTarget({ allowFrom: [], mode: "implicit" }); }); it("allows message when target is in allowList", () => { - vi.mocked(normalize.normalizeWhatsAppTarget) - .mockReturnValueOnce("+11234567890") - .mockReturnValueOnce("+11234567890"); - vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false); - - expectResolutionOk( - { - to: "+11234567890", - allowFrom: ["+11234567890"], - mode: "implicit", - }, - "+11234567890", - ); + mockNormalizedDirectMessage(PRIMARY_TARGET, PRIMARY_TARGET); + expectAllowedForTarget({ allowFrom: [PRIMARY_TARGET], mode: "implicit" }); }); it("denies message when target is not in allowList", () => { - vi.mocked(normalize.normalizeWhatsAppTarget) - .mockReturnValueOnce("+11234567890") - .mockReturnValueOnce("+19876543210"); - vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false); - - expectResolutionError({ - to: "+11234567890", - allowFrom: ["+19876543210"], - mode: "implicit", - }); + mockNormalizedDirectMessage(PRIMARY_TARGET, SECONDARY_TARGET); + expectDeniedForTarget({ allowFrom: [SECONDARY_TARGET], mode: "implicit" }); }); it("handles mixed numeric and string allowList entries", () => { @@ -149,14 +146,10 @@ describe("resolveWhatsAppOutboundTarget", () => { .mockReturnValueOnce("+11234567890"); // for allowFrom[1] vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false); - expectResolutionOk( - { - to: "+11234567890", - allowFrom: [1234567890, "+11234567890"], - mode: "implicit", - }, - "+11234567890", - ); + expectAllowedForTarget({ + allowFrom: [1234567890, PRIMARY_TARGET], + mode: "implicit", + }); }); it("filters out invalid normalized entries from allowList", () => { @@ -166,136 +159,72 @@ describe("resolveWhatsAppOutboundTarget", () => { .mockReturnValueOnce("+11234567890"); // for 'to' param (processed last) vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false); - expectResolutionOk( - { - to: "+11234567890", - allowFrom: ["invalid", "+11234567890"], - mode: "implicit", - }, - "+11234567890", - ); + expectAllowedForTarget({ + allowFrom: ["invalid", PRIMARY_TARGET], + mode: "implicit", + }); }); }); describe("heartbeat mode", () => { it("allows message when target is in allowList in heartbeat mode", () => { - vi.mocked(normalize.normalizeWhatsAppTarget) - .mockReturnValueOnce("+11234567890") - .mockReturnValueOnce("+11234567890"); - vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false); - - expectResolutionOk( - { - to: "+11234567890", - allowFrom: ["+11234567890"], - mode: "heartbeat", - }, - "+11234567890", - ); + mockNormalizedDirectMessage(PRIMARY_TARGET, PRIMARY_TARGET); + expectAllowedForTarget({ allowFrom: [PRIMARY_TARGET], mode: "heartbeat" }); }); it("denies message when target is not in allowList in heartbeat mode", () => { - vi.mocked(normalize.normalizeWhatsAppTarget) - .mockReturnValueOnce("+11234567890") - .mockReturnValueOnce("+19876543210"); - vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false); - - expectResolutionError({ - to: "+11234567890", - allowFrom: ["+19876543210"], - mode: "heartbeat", - }); + mockNormalizedDirectMessage(PRIMARY_TARGET, SECONDARY_TARGET); + expectDeniedForTarget({ allowFrom: [SECONDARY_TARGET], mode: "heartbeat" }); }); }); describe("explicit/custom modes", () => { it("allows message in null mode when allowList is not set", () => { - vi.mocked(normalize.normalizeWhatsAppTarget).mockReturnValueOnce("+11234567890"); - vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false); - - expectResolutionOk( - { - to: "+11234567890", - allowFrom: undefined, - mode: null, - }, - "+11234567890", - ); + mockNormalizedDirectMessage(PRIMARY_TARGET); + expectAllowedForTarget({ allowFrom: undefined, mode: null }); }); it("allows message in undefined mode when allowList is not set", () => { - vi.mocked(normalize.normalizeWhatsAppTarget).mockReturnValueOnce("+11234567890"); - vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false); - - expectResolutionOk( - { - to: "+11234567890", - allowFrom: undefined, - mode: undefined, - }, - "+11234567890", - ); + mockNormalizedDirectMessage(PRIMARY_TARGET); + expectAllowedForTarget({ allowFrom: undefined, mode: undefined }); }); it("enforces allowList in custom mode string", () => { - vi.mocked(normalize.normalizeWhatsAppTarget) - .mockReturnValueOnce("+19876543210") // for allowFrom[0] (happens first!) - .mockReturnValueOnce("+11234567890"); // for 'to' param (happens second) - vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false); - - expectResolutionError({ - to: "+11234567890", - allowFrom: ["+19876543210"], - mode: "broadcast", - }); + mockNormalizedDirectMessage(SECONDARY_TARGET, PRIMARY_TARGET); + expectDeniedForTarget({ allowFrom: [SECONDARY_TARGET], mode: "broadcast" }); }); it("allows message in custom mode string when target is in allowList", () => { - vi.mocked(normalize.normalizeWhatsAppTarget) - .mockReturnValueOnce("+11234567890") // for allowFrom[0] - .mockReturnValueOnce("+11234567890"); // for 'to' param - vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false); - - expectResolutionOk( - { - to: "+11234567890", - allowFrom: ["+11234567890"], - mode: "broadcast", - }, - "+11234567890", - ); + mockNormalizedDirectMessage(PRIMARY_TARGET, PRIMARY_TARGET); + expectAllowedForTarget({ allowFrom: [PRIMARY_TARGET], mode: "broadcast" }); }); }); describe("whitespace handling", () => { it("trims whitespace from to parameter", () => { - vi.mocked(normalize.normalizeWhatsAppTarget).mockReturnValueOnce("+11234567890"); - vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false); + mockNormalizedDirectMessage(PRIMARY_TARGET); expectResolutionOk( { - to: " +11234567890 ", + to: ` ${PRIMARY_TARGET} `, allowFrom: undefined, mode: undefined, }, - "+11234567890", + PRIMARY_TARGET, ); - expect(vi.mocked(normalize.normalizeWhatsAppTarget)).toHaveBeenCalledWith("+11234567890"); + expect(vi.mocked(normalize.normalizeWhatsAppTarget)).toHaveBeenCalledWith(PRIMARY_TARGET); }); it("trims whitespace from allowList entries", () => { - vi.mocked(normalize.normalizeWhatsAppTarget) - .mockReturnValueOnce("+11234567890") - .mockReturnValueOnce("+11234567890"); - vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false); + mockNormalizedDirectMessage(PRIMARY_TARGET, PRIMARY_TARGET); resolveWhatsAppOutboundTarget({ - to: "+11234567890", - allowFrom: [" +11234567890 "], + to: PRIMARY_TARGET, + allowFrom: [` ${PRIMARY_TARGET} `], mode: undefined, }); - expect(vi.mocked(normalize.normalizeWhatsAppTarget)).toHaveBeenCalledWith("+11234567890"); + expect(vi.mocked(normalize.normalizeWhatsAppTarget)).toHaveBeenCalledWith(PRIMARY_TARGET); }); }); });