refactor: dedupe channel outbound and monitor tests

This commit is contained in:
Peter Steinberger
2026-03-03 00:14:52 +00:00
parent 6a42d09129
commit d7dda4dd1a
18 changed files with 301 additions and 450 deletions

View File

@@ -20,6 +20,51 @@ type DirectSendFn<TOpts extends Record<string, unknown>, TResult extends DirectS
opts: TOpts,
) => Promise<TResult>;
type SendPayloadContext = Parameters<NonNullable<ChannelOutboundAdapter["sendPayload"]>>[0];
type SendPayloadResult = Awaited<ReturnType<NonNullable<ChannelOutboundAdapter["sendPayload"]>>>;
type SendPayloadAdapter = Pick<
ChannelOutboundAdapter,
"sendMedia" | "sendText" | "chunker" | "textChunkLimit"
>;
export async function sendTextMediaPayload(params: {
channel: string;
ctx: SendPayloadContext;
adapter: SendPayloadAdapter;
}): Promise<SendPayloadResult> {
const text = params.ctx.payload.text ?? "";
const urls = params.ctx.payload.mediaUrls?.length
? params.ctx.payload.mediaUrls
: params.ctx.payload.mediaUrl
? [params.ctx.payload.mediaUrl]
: [];
if (!text && urls.length === 0) {
return { channel: params.channel, messageId: "" };
}
if (urls.length > 0) {
let lastResult = await params.adapter.sendMedia!({
...params.ctx,
text,
mediaUrl: urls[0],
});
for (let i = 1; i < urls.length; i++) {
lastResult = await params.adapter.sendMedia!({
...params.ctx,
text: "",
mediaUrl: urls[i],
});
}
return lastResult;
}
const limit = params.adapter.textChunkLimit;
const chunks = limit && params.adapter.chunker ? params.adapter.chunker(text, limit) : [text];
let lastResult: Awaited<ReturnType<NonNullable<typeof params.adapter.sendText>>>;
for (const chunk of chunks) {
lastResult = await params.adapter.sendText!({ ...params.ctx, text: chunk });
}
return lastResult!;
}
export function resolveScopedChannelMediaMaxBytes(params: {
cfg: OpenClawConfig;
accountId?: string | null;
@@ -91,39 +136,8 @@ export function createDirectTextMediaOutbound<
chunker: chunkText,
chunkerMode: "text",
textChunkLimit: 4000,
sendPayload: async (ctx) => {
const text = ctx.payload.text ?? "";
const urls = ctx.payload.mediaUrls?.length
? ctx.payload.mediaUrls
: ctx.payload.mediaUrl
? [ctx.payload.mediaUrl]
: [];
if (!text && urls.length === 0) {
return { channel: params.channel, messageId: "" };
}
if (urls.length > 0) {
let lastResult = await outbound.sendMedia!({
...ctx,
text,
mediaUrl: urls[0],
});
for (let i = 1; i < urls.length; i++) {
lastResult = await outbound.sendMedia!({
...ctx,
text: "",
mediaUrl: urls[i],
});
}
return lastResult;
}
const limit = outbound.textChunkLimit;
const chunks = limit && outbound.chunker ? outbound.chunker(text, limit) : [text];
let lastResult: Awaited<ReturnType<NonNullable<typeof outbound.sendText>>>;
for (const chunk of chunks) {
lastResult = await outbound.sendText!({ ...ctx, text: chunk });
}
return lastResult!;
},
sendPayload: async (ctx) =>
await sendTextMediaPayload({ channel: params.channel, ctx, adapter: outbound }),
sendText: async ({ cfg, to, text, accountId, deps, replyToId }) => {
return await sendDirect({
cfg,

View File

@@ -10,6 +10,7 @@ import {
import type { OutboundIdentity } from "../../../infra/outbound/identity.js";
import { normalizeDiscordOutboundTarget } from "../normalize/discord.js";
import type { ChannelOutboundAdapter } from "../types.js";
import { sendTextMediaPayload } from "./direct-text-media.js";
function resolveDiscordOutboundTarget(params: {
to: string;
@@ -80,39 +81,8 @@ export const discordOutbound: ChannelOutboundAdapter = {
textChunkLimit: 2000,
pollMaxOptions: 10,
resolveTarget: ({ to }) => normalizeDiscordOutboundTarget(to),
sendPayload: async (ctx) => {
const text = ctx.payload.text ?? "";
const urls = ctx.payload.mediaUrls?.length
? ctx.payload.mediaUrls
: ctx.payload.mediaUrl
? [ctx.payload.mediaUrl]
: [];
if (!text && urls.length === 0) {
return { channel: "discord", messageId: "" };
}
if (urls.length > 0) {
let lastResult = await discordOutbound.sendMedia!({
...ctx,
text,
mediaUrl: urls[0],
});
for (let i = 1; i < urls.length; i++) {
lastResult = await discordOutbound.sendMedia!({
...ctx,
text: "",
mediaUrl: urls[i],
});
}
return lastResult;
}
const limit = discordOutbound.textChunkLimit;
const chunks = limit && discordOutbound.chunker ? discordOutbound.chunker(text, limit) : [text];
let lastResult: Awaited<ReturnType<NonNullable<typeof discordOutbound.sendText>>>;
for (const chunk of chunks) {
lastResult = await discordOutbound.sendText!({ ...ctx, text: chunk });
}
return lastResult!;
},
sendPayload: async (ctx) =>
await sendTextMediaPayload({ channel: "discord", ctx, adapter: discordOutbound }),
sendText: async ({ to, text, accountId, deps, replyToId, threadId, identity, silent }) => {
if (!silent) {
const webhookResult = await maybeSendDiscordWebhookText({

View File

@@ -2,6 +2,7 @@ import type { OutboundIdentity } from "../../../infra/outbound/identity.js";
import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js";
import { sendMessageSlack, type SlackSendIdentity } from "../../../slack/send.js";
import type { ChannelOutboundAdapter } from "../types.js";
import { sendTextMediaPayload } from "./direct-text-media.js";
function resolveSlackSendIdentity(identity?: OutboundIdentity): SlackSendIdentity | undefined {
if (!identity) {
@@ -93,39 +94,8 @@ export const slackOutbound: ChannelOutboundAdapter = {
deliveryMode: "direct",
chunker: null,
textChunkLimit: 4000,
sendPayload: async (ctx) => {
const text = ctx.payload.text ?? "";
const urls = ctx.payload.mediaUrls?.length
? ctx.payload.mediaUrls
: ctx.payload.mediaUrl
? [ctx.payload.mediaUrl]
: [];
if (!text && urls.length === 0) {
return { channel: "slack", messageId: "" };
}
if (urls.length > 0) {
let lastResult = await slackOutbound.sendMedia!({
...ctx,
text,
mediaUrl: urls[0],
});
for (let i = 1; i < urls.length; i++) {
lastResult = await slackOutbound.sendMedia!({
...ctx,
text: "",
mediaUrl: urls[i],
});
}
return lastResult;
}
const limit = slackOutbound.textChunkLimit;
const chunks = limit && slackOutbound.chunker ? slackOutbound.chunker(text, limit) : [text];
let lastResult: Awaited<ReturnType<NonNullable<typeof slackOutbound.sendText>>>;
for (const chunk of chunks) {
lastResult = await slackOutbound.sendText!({ ...ctx, text: chunk });
}
return lastResult!;
},
sendPayload: async (ctx) =>
await sendTextMediaPayload({ channel: "slack", ctx, adapter: slackOutbound }),
sendText: async ({ to, text, accountId, deps, replyToId, threadId, identity }) => {
return await sendSlackOutboundMessage({
to,

View File

@@ -3,6 +3,7 @@ import { shouldLogVerbose } from "../../../globals.js";
import { sendPollWhatsApp } from "../../../web/outbound.js";
import { resolveWhatsAppOutboundTarget } from "../../../whatsapp/resolve-outbound-target.js";
import type { ChannelOutboundAdapter } from "../types.js";
import { sendTextMediaPayload } from "./direct-text-media.js";
export const whatsappOutbound: ChannelOutboundAdapter = {
deliveryMode: "gateway",
@@ -12,40 +13,8 @@ export const whatsappOutbound: ChannelOutboundAdapter = {
pollMaxOptions: 12,
resolveTarget: ({ to, allowFrom, mode }) =>
resolveWhatsAppOutboundTarget({ to, allowFrom, mode }),
sendPayload: async (ctx) => {
const text = ctx.payload.text ?? "";
const urls = ctx.payload.mediaUrls?.length
? ctx.payload.mediaUrls
: ctx.payload.mediaUrl
? [ctx.payload.mediaUrl]
: [];
if (!text && urls.length === 0) {
return { channel: "whatsapp", messageId: "" };
}
if (urls.length > 0) {
let lastResult = await whatsappOutbound.sendMedia!({
...ctx,
text,
mediaUrl: urls[0],
});
for (let i = 1; i < urls.length; i++) {
lastResult = await whatsappOutbound.sendMedia!({
...ctx,
text: "",
mediaUrl: urls[i],
});
}
return lastResult;
}
const limit = whatsappOutbound.textChunkLimit;
const chunks =
limit && whatsappOutbound.chunker ? whatsappOutbound.chunker(text, limit) : [text];
let lastResult: Awaited<ReturnType<NonNullable<typeof whatsappOutbound.sendText>>>;
for (const chunk of chunks) {
lastResult = await whatsappOutbound.sendText!({ ...ctx, text: chunk });
}
return lastResult!;
},
sendPayload: async (ctx) =>
await sendTextMediaPayload({ channel: "whatsapp", ctx, adapter: whatsappOutbound }),
sendText: async ({ to, text, accountId, deps, gifPlayback }) => {
const send =
deps?.sendWhatsApp ?? (await import("../../../web/outbound.js")).sendMessageWhatsApp;

View File

@@ -25,10 +25,18 @@ vi.mock("../auto-reply/dispatch.js", async (importOriginal) => {
};
});
vi.mock("../pairing/pairing-store.js", () => ({
readChannelAllowFromStore: (...args: unknown[]) => readAllowFromStoreMock(...args),
upsertChannelPairingRequest: (...args: unknown[]) => upsertPairingRequestMock(...args),
}));
function createPairingStoreMocks() {
return {
readChannelAllowFromStore(...args: unknown[]) {
return readAllowFromStoreMock(...args);
},
upsertChannelPairingRequest(...args: unknown[]) {
return upsertPairingRequestMock(...args);
},
};
}
vi.mock("../pairing/pairing-store.js", () => createPairingStoreMocks());
vi.mock("../config/sessions.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../config/sessions.js")>();

View File

@@ -43,8 +43,12 @@ type DiscordReactionEvent = Parameters<MessageReactionAddListener["handle"]>[0];
type DiscordReactionListenerParams = {
cfg: LoadedConfig;
accountId: string;
runtime: RuntimeEnv;
logger: Logger;
onEvent?: () => void;
} & DiscordReactionRoutingParams;
type DiscordReactionRoutingParams = {
botUserId?: string;
dmEnabled: boolean;
groupDmEnabled: boolean;
@@ -54,8 +58,6 @@ type DiscordReactionListenerParams = {
groupPolicy: "open" | "allowlist" | "disabled";
allowNameMatching: boolean;
guildEntries?: Record<string, import("./allow-list.js").DiscordGuildEntryResolved>;
logger: Logger;
onEvent?: () => void;
};
const DISCORD_SLOW_LISTENER_THRESHOLD_MS = 30_000;
@@ -315,23 +317,15 @@ async function authorizeDiscordReactionIngress(
return { allowed: true };
}
async function handleDiscordReactionEvent(params: {
data: DiscordReactionEvent;
client: Client;
action: "added" | "removed";
cfg: LoadedConfig;
accountId: string;
botUserId?: string;
dmEnabled: boolean;
groupDmEnabled: boolean;
groupDmChannels: string[];
dmPolicy: "open" | "pairing" | "allowlist" | "disabled";
allowFrom: string[];
groupPolicy: "open" | "allowlist" | "disabled";
allowNameMatching: boolean;
guildEntries?: Record<string, import("./allow-list.js").DiscordGuildEntryResolved>;
logger: Logger;
}) {
async function handleDiscordReactionEvent(
params: {
data: DiscordReactionEvent;
client: Client;
action: "added" | "removed";
cfg: LoadedConfig;
logger: Logger;
} & DiscordReactionRoutingParams,
) {
try {
const { data, client, action, botUserId, guildEntries } = params;
if (!("user" in data)) {

View File

@@ -120,6 +120,19 @@ const { processDiscordMessage } = await import("./message-handler.process.js");
const createBaseContext = createBaseDiscordMessageContext;
function mockDispatchSingleBlockReply(payload: { text: string; isReasoning?: boolean }) {
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
await params?.dispatcher.sendBlockReply(payload);
return { queuedFinal: false, counts: { final: 0, tool: 0, block: 1 } };
});
}
async function processStreamOffDiscordMessage() {
const ctx = await createBaseContext({ discordConfig: { streamMode: "off" } });
// oxlint-disable-next-line typescript/no-explicit-any
await processDiscordMessage(ctx as any);
}
beforeEach(() => {
vi.useRealTimers();
sendMocks.reactMessageDiscord.mockClear();
@@ -463,15 +476,8 @@ describe("processDiscordMessage draft streaming", () => {
});
it("suppresses reasoning payload delivery to Discord", async () => {
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
await params?.dispatcher.sendBlockReply({ text: "thinking...", isReasoning: true });
return { queuedFinal: false, counts: { final: 0, tool: 0, block: 1 } };
});
const ctx = await createBaseContext({ discordConfig: { streamMode: "off" } });
// oxlint-disable-next-line typescript/no-explicit-any
await processDiscordMessage(ctx as any);
mockDispatchSingleBlockReply({ text: "thinking...", isReasoning: true });
await processStreamOffDiscordMessage();
expect(deliverDiscordReply).not.toHaveBeenCalled();
});
@@ -495,15 +501,8 @@ describe("processDiscordMessage draft streaming", () => {
});
it("delivers non-reasoning block payloads to Discord", async () => {
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
await params?.dispatcher.sendBlockReply({ text: "hello from block stream" });
return { queuedFinal: false, counts: { final: 0, tool: 0, block: 1 } };
});
const ctx = await createBaseContext({ discordConfig: { streamMode: "off" } });
// oxlint-disable-next-line typescript/no-explicit-any
await processDiscordMessage(ctx as any);
mockDispatchSingleBlockReply({ text: "hello from block stream" });
await processStreamOffDiscordMessage();
expect(deliverDiscordReply).toHaveBeenCalledTimes(1);
});

View File

@@ -210,8 +210,10 @@ function createBoundThreadBindingManager(params: {
targetSessionKey: string;
agentId: string;
}): ThreadBindingManager {
const baseManager = createNoopThreadBindingManager(params.accountId);
const now = Date.now();
return {
accountId: params.accountId,
...baseManager,
getIdleTimeoutMs: () => 24 * 60 * 60 * 1000,
getMaxAgeMs: () => 0,
getByThreadId: (threadId: string) =>
@@ -224,20 +226,12 @@ function createBoundThreadBindingManager(params: {
targetSessionKey: params.targetSessionKey,
agentId: params.agentId,
boundBy: "system",
boundAt: Date.now(),
lastActivityAt: Date.now(),
boundAt: now,
lastActivityAt: now,
idleTimeoutMs: 24 * 60 * 60 * 1000,
maxAgeMs: 0,
}
: undefined,
getBySessionKey: () => undefined,
listBySessionKey: () => [],
listBindings: () => [],
touchThread: () => null,
bindTarget: async () => null,
unbindThread: () => null,
unbindBySessionKey: () => [],
stop: () => {},
: baseManager.getByThreadId(threadId),
};
}

View File

@@ -258,6 +258,14 @@ describe("monitorDiscordProvider", () => {
},
}) as OpenClawConfig;
const getConstructedEventQueue = (): { listenerTimeout?: number } | undefined => {
expect(clientConstructorOptionsMock).toHaveBeenCalledTimes(1);
const opts = clientConstructorOptionsMock.mock.calls[0]?.[0] as {
eventQueue?: { listenerTimeout?: number };
};
return opts.eventQueue;
};
beforeEach(() => {
clientConstructorOptionsMock.mockClear();
clientFetchUserMock.mockClear().mockResolvedValue({ id: "bot-1" });
@@ -349,12 +357,9 @@ describe("monitorDiscordProvider", () => {
runtime: baseRuntime(),
});
expect(clientConstructorOptionsMock).toHaveBeenCalledTimes(1);
const opts = clientConstructorOptionsMock.mock.calls[0]?.[0] as {
eventQueue?: { listenerTimeout?: number };
};
expect(opts.eventQueue).toBeDefined();
expect(opts.eventQueue?.listenerTimeout).toBe(120_000);
const eventQueue = getConstructedEventQueue();
expect(eventQueue).toBeDefined();
expect(eventQueue?.listenerTimeout).toBe(120_000);
});
it("forwards custom eventQueue config from discord config to Carbon Client", async () => {
@@ -377,10 +382,7 @@ describe("monitorDiscordProvider", () => {
runtime: baseRuntime(),
});
expect(clientConstructorOptionsMock).toHaveBeenCalledTimes(1);
const opts = clientConstructorOptionsMock.mock.calls[0]?.[0] as {
eventQueue?: { listenerTimeout?: number };
};
expect(opts.eventQueue?.listenerTimeout).toBe(300_000);
const eventQueue = getConstructedEventQueue();
expect(eventQueue?.listenerTimeout).toBe(300_000);
});
});

View File

@@ -60,6 +60,13 @@ function expectResolveSlackMediaCalledWithDefaults() {
});
}
function mockSuccessfulMediaDownload(client: ReturnType<typeof createClient>) {
client.files.info.mockResolvedValueOnce({
file: makeSlackFileInfo(),
});
resolveSlackMedia.mockResolvedValueOnce([makeResolvedSlackMedia()]);
}
describe("downloadSlackFile", () => {
beforeEach(() => {
resolveSlackMedia.mockReset();
@@ -86,10 +93,7 @@ describe("downloadSlackFile", () => {
it("downloads via resolveSlackMedia using fresh files.info metadata", async () => {
const client = createClient();
client.files.info.mockResolvedValueOnce({
file: makeSlackFileInfo(),
});
resolveSlackMedia.mockResolvedValueOnce([makeResolvedSlackMedia()]);
mockSuccessfulMediaDownload(client);
const result = await downloadSlackFile("F123", {
client,
@@ -143,10 +147,7 @@ describe("downloadSlackFile", () => {
it("keeps legacy behavior when file metadata does not expose channel/thread shares", async () => {
const client = createClient();
client.files.info.mockResolvedValueOnce({
file: makeSlackFileInfo(),
});
resolveSlackMedia.mockResolvedValueOnce([makeResolvedSlackMedia()]);
mockSuccessfulMediaDownload(client);
const result = await downloadSlackFile("F123", {
client,

View File

@@ -110,6 +110,18 @@ function setupTransientGetFileRetry() {
return getFile;
}
function mockPdfFetchAndSave(fileName: string | undefined) {
fetchRemoteMedia.mockResolvedValueOnce({
buffer: Buffer.from("pdf-data"),
contentType: "application/pdf",
fileName,
});
saveMediaBuffer.mockResolvedValueOnce({
path: "/tmp/file_42---uuid.pdf",
contentType: "application/pdf",
});
}
function createFileTooBigError(): Error {
return new Error("GrammyError: Call to 'getFile' failed! (400: Bad Request: file is too big)");
}
@@ -321,15 +333,7 @@ describe("resolveMedia original filename preservation", () => {
it("falls back to fetched.fileName when telegram file_name is absent", async () => {
const getFile = vi.fn().mockResolvedValue({ file_path: "documents/file_42.pdf" });
fetchRemoteMedia.mockResolvedValueOnce({
buffer: Buffer.from("pdf-data"),
contentType: "application/pdf",
fileName: "file_42.pdf",
});
saveMediaBuffer.mockResolvedValueOnce({
path: "/tmp/file_42---uuid.pdf",
contentType: "application/pdf",
});
mockPdfFetchAndSave("file_42.pdf");
const ctx = makeCtx("document", getFile);
const result = await resolveMedia(ctx, MAX_MEDIA_BYTES, BOT_TOKEN);
@@ -346,15 +350,7 @@ describe("resolveMedia original filename preservation", () => {
it("falls back to filePath when neither telegram nor fetched fileName is available", async () => {
const getFile = vi.fn().mockResolvedValue({ file_path: "documents/file_42.pdf" });
fetchRemoteMedia.mockResolvedValueOnce({
buffer: Buffer.from("pdf-data"),
contentType: "application/pdf",
fileName: undefined,
});
saveMediaBuffer.mockResolvedValueOnce({
path: "/tmp/file_42---uuid.pdf",
contentType: "application/pdf",
});
mockPdfFetchAndSave(undefined);
const ctx = makeCtx("document", getFile);
const result = await resolveMedia(ctx, MAX_MEDIA_BYTES, BOT_TOKEN);

View File

@@ -44,6 +44,14 @@ async function expectInitialForumSend(
);
}
function expectDmMessagePreviewViaSendMessage(
api: ReturnType<typeof createMockDraftApi>,
text = "Hello",
): void {
expect(api.sendMessage).toHaveBeenCalledWith(123, text, { message_thread_id: 42 });
expect(api.editMessageText).not.toHaveBeenCalled();
}
function createForceNewMessageHarness(params: { throttleMs?: number } = {}) {
const api = createMockDraftApi();
api.sendMessage
@@ -135,9 +143,8 @@ describe("createTelegramDraftStream", () => {
stream.update("Hello");
await stream.flush();
expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { message_thread_id: 42 });
expectDmMessagePreviewViaSendMessage(api);
expect(api.sendMessageDraft).not.toHaveBeenCalled();
expect(api.editMessageText).not.toHaveBeenCalled();
});
it("falls back to message transport when sendMessageDraft is unavailable", async () => {
@@ -153,8 +160,7 @@ describe("createTelegramDraftStream", () => {
stream.update("Hello");
await stream.flush();
expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { message_thread_id: 42 });
expect(api.editMessageText).not.toHaveBeenCalled();
expectDmMessagePreviewViaSendMessage(api);
expect(warn).toHaveBeenCalledWith(
"telegram stream preview: sendMessageDraft unavailable; falling back to sendMessage/editMessageText",
);
@@ -392,6 +398,14 @@ describe("draft stream initial message debounce", () => {
deleteMessage: vi.fn().mockResolvedValue(true),
});
function createDebouncedStream(api: ReturnType<typeof createMockApi>, minInitialChars = 30) {
return createTelegramDraftStream({
api: api as unknown as Bot["api"],
chatId: 123,
minInitialChars,
});
}
beforeEach(() => {
vi.useFakeTimers();
});
@@ -403,11 +417,7 @@ describe("draft stream initial message debounce", () => {
describe("isFinal has highest priority", () => {
it("sends immediately on stop() even with 1 character", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
api: api as unknown as Bot["api"],
chatId: 123,
minInitialChars: 30,
});
const stream = createDebouncedStream(api);
stream.update("Y");
await stream.stop();
@@ -418,11 +428,7 @@ describe("draft stream initial message debounce", () => {
it("sends immediately on stop() with short sentence", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
api: api as unknown as Bot["api"],
chatId: 123,
minInitialChars: 30,
});
const stream = createDebouncedStream(api);
stream.update("Ok.");
await stream.stop();
@@ -435,11 +441,7 @@ describe("draft stream initial message debounce", () => {
describe("minInitialChars threshold", () => {
it("does not send first message below threshold", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
api: api as unknown as Bot["api"],
chatId: 123,
minInitialChars: 30,
});
const stream = createDebouncedStream(api);
stream.update("Processing"); // 10 chars, below 30
await stream.flush();
@@ -449,11 +451,7 @@ describe("draft stream initial message debounce", () => {
it("sends first message when reaching threshold", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
api: api as unknown as Bot["api"],
chatId: 123,
minInitialChars: 30,
});
const stream = createDebouncedStream(api);
// Exactly 30 chars
stream.update("I am processing your request..");
@@ -464,11 +462,7 @@ describe("draft stream initial message debounce", () => {
it("works with longer text above threshold", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
api: api as unknown as Bot["api"],
chatId: 123,
minInitialChars: 30,
});
const stream = createDebouncedStream(api);
stream.update("I am processing your request, please wait a moment"); // 50 chars
await stream.flush();
@@ -480,11 +474,7 @@ describe("draft stream initial message debounce", () => {
describe("subsequent updates after first message", () => {
it("edits normally after first message is sent", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
api: api as unknown as Bot["api"],
chatId: 123,
minInitialChars: 30,
});
const stream = createDebouncedStream(api);
// First message at threshold (30 chars)
stream.update("I am processing your request..");

View File

@@ -1,6 +1,6 @@
import { createHash } from "node:crypto";
import { once } from "node:events";
import { request } from "node:http";
import { request, type IncomingMessage } from "node:http";
import { setTimeout as sleep } from "node:timers/promises";
import { describe, expect, it, vi } from "vitest";
import { startTelegramWebhook } from "./webhook.js";
@@ -24,6 +24,22 @@ const TELEGRAM_TOKEN = "tok";
const TELEGRAM_SECRET = "secret";
const TELEGRAM_WEBHOOK_PATH = "/hook";
function collectResponseBody(
res: IncomingMessage,
onDone: (payload: { statusCode: number; body: string }) => void,
): void {
const chunks: Buffer[] = [];
res.on("data", (chunk: Buffer | string) => {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
});
res.on("end", () => {
onDone({
statusCode: res.statusCode ?? 0,
body: Buffer.concat(chunks).toString("utf-8"),
});
});
}
vi.mock("grammy", async (importOriginal) => {
const actual = await importOriginal<typeof import("grammy")>();
return {
@@ -124,16 +140,7 @@ async function postWebhookPayloadWithChunkPlan(params: {
},
},
(res) => {
const chunks: Buffer[] = [];
res.on("data", (chunk: Buffer | string) => {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
});
res.on("end", () => {
finishResolve({
statusCode: res.statusCode ?? 0,
body: Buffer.concat(chunks).toString("utf-8"),
});
});
collectResponseBody(res, finishResolve);
},
);
@@ -555,16 +562,8 @@ describe("startTelegramWebhook", () => {
},
},
(res) => {
const chunks: Buffer[] = [];
res.on("data", (chunk: Buffer | string) => {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
});
res.on("end", () => {
resolve({
kind: "response",
statusCode: res.statusCode ?? 0,
body: Buffer.concat(chunks).toString("utf-8"),
});
collectResponseBody(res, (payload) => {
resolve({ kind: "response", ...payload });
});
},
);

View File

@@ -26,10 +26,16 @@ vi.mock("../config/config.js", async (importOriginal) => {
};
});
vi.mock("../pairing/pairing-store.js", () => ({
readChannelAllowFromStore: (...args: unknown[]) => readAllowFromStoreMock(...args),
upsertChannelPairingRequest: (...args: unknown[]) => upsertPairingRequestMock(...args),
}));
vi.mock("../pairing/pairing-store.js", () => {
return {
readChannelAllowFromStore(...args: unknown[]) {
return readAllowFromStoreMock(...args);
},
upsertChannelPairingRequest(...args: unknown[]) {
return upsertPairingRequestMock(...args);
},
};
});
vi.mock("../media/store.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../media/store.js")>();

View File

@@ -3,6 +3,7 @@ import { describe, expect, it, vi } from "vitest";
import { monitorWebInbox } from "./inbound.js";
import {
DEFAULT_ACCOUNT_ID,
expectPairingPromptSent,
getAuthDir,
getSock,
installWebMonitorInboxUnitTestHooks,
@@ -182,13 +183,7 @@ describe("web monitor inbox", () => {
sock.ev.emit("messages.upsert", upsertBlocked);
await new Promise((resolve) => setImmediate(resolve));
expect(onMessage).not.toHaveBeenCalled();
expect(sock.sendMessage).toHaveBeenCalledTimes(1);
expect(sock.sendMessage).toHaveBeenCalledWith("999@s.whatsapp.net", {
text: expect.stringContaining("Your WhatsApp phone number: +999"),
});
expect(sock.sendMessage).toHaveBeenCalledWith("999@s.whatsapp.net", {
text: expect.stringContaining("Pairing code: PAIRCODE"),
});
expectPairingPromptSent(sock, "999@s.whatsapp.net", "+999");
const upsertBlockedAgain = {
type: "notify",

View File

@@ -3,6 +3,7 @@ import { describe, expect, it, vi } from "vitest";
import { monitorWebInbox } from "./inbound.js";
import {
DEFAULT_ACCOUNT_ID,
expectPairingPromptSent,
getAuthDir,
getSock,
installWebMonitorInboxUnitTestHooks,
@@ -116,13 +117,7 @@ describe("web monitor inbox", () => {
expect(onMessage).not.toHaveBeenCalled();
// Should NOT send read receipts for blocked senders (privacy + avoids Baileys Bad MAC churn).
expect(sock.readMessages).not.toHaveBeenCalled();
expect(sock.sendMessage).toHaveBeenCalledTimes(1);
expect(sock.sendMessage).toHaveBeenCalledWith("999@s.whatsapp.net", {
text: expect.stringContaining("Your WhatsApp phone number: +999"),
});
expect(sock.sendMessage).toHaveBeenCalledWith("999@s.whatsapp.net", {
text: expect.stringContaining("Pairing code: PAIRCODE"),
});
expectPairingPromptSent(sock, "999@s.whatsapp.net", "+999");
await listener.close();
});

View File

@@ -2,7 +2,7 @@ import { EventEmitter } from "node:events";
import fsSync from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, vi } from "vitest";
import { afterEach, beforeEach, expect, vi } from "vitest";
import { resetLogger, setLoggerOverride } from "../logging.js";
// Avoid exporting vitest mock types (TS2742 under pnpm + d.ts emit).
@@ -47,14 +47,18 @@ export type MockSock = {
user: { id: string };
};
function createResolvedMock() {
return vi.fn().mockResolvedValue(undefined);
}
function createMockSock(): MockSock {
const ev = new EventEmitter();
return {
ev,
ws: { close: vi.fn() },
sendPresenceUpdate: vi.fn().mockResolvedValue(undefined),
sendMessage: vi.fn().mockResolvedValue(undefined),
readMessages: vi.fn().mockResolvedValue(undefined),
sendPresenceUpdate: createResolvedMock(),
sendMessage: createResolvedMock(),
readMessages: createResolvedMock(),
updateMediaMessage: vi.fn(),
logger: {},
signalRepository: {
@@ -66,6 +70,15 @@ function createMockSock(): MockSock {
};
}
function getPairingStoreMocks() {
const readChannelAllowFromStore = (...args: unknown[]) => readAllowFromStoreMock(...args);
const upsertChannelPairingRequest = (...args: unknown[]) => upsertPairingRequestMock(...args);
return {
readChannelAllowFromStore,
upsertChannelPairingRequest,
};
}
const sock: MockSock = createMockSock();
vi.mock("../media/store.js", () => ({
@@ -85,10 +98,7 @@ vi.mock("../config/config.js", async (importOriginal) => {
};
});
vi.mock("../pairing/pairing-store.js", () => ({
readChannelAllowFromStore: (...args: unknown[]) => readAllowFromStoreMock(...args),
upsertChannelPairingRequest: (...args: unknown[]) => upsertPairingRequestMock(...args),
}));
vi.mock("../pairing/pairing-store.js", () => getPairingStoreMocks());
vi.mock("./session.js", () => ({
createWaSocket: vi.fn().mockResolvedValue(sock),
@@ -100,6 +110,16 @@ export function getSock(): MockSock {
return sock;
}
export function expectPairingPromptSent(sock: MockSock, jid: string, senderE164: string) {
expect(sock.sendMessage).toHaveBeenCalledTimes(1);
expect(sock.sendMessage).toHaveBeenCalledWith(jid, {
text: expect.stringContaining(`Your WhatsApp phone number: ${senderE164}`),
});
expect(sock.sendMessage).toHaveBeenCalledWith(jid, {
text: expect.stringContaining("Pairing code: PAIRCODE"),
});
}
let authDir: string | undefined;
export function getAuthDir(): string {

View File

@@ -8,6 +8,8 @@ vi.mock("../infra/outbound/target-errors.js", () => ({
}));
type ResolveParams = Parameters<typeof resolveWhatsAppOutboundTarget>[0];
const PRIMARY_TARGET = "+11234567890";
const SECONDARY_TARGET = "+19876543210";
function expectResolutionError(params: ResolveParams) {
const result = resolveWhatsAppOutboundTarget(params);
@@ -23,6 +25,42 @@ function expectResolutionOk(params: ResolveParams, expectedTarget: string) {
expect(result).toEqual({ ok: true, to: expectedTarget });
}
function mockNormalizedDirectMessage(...values: Array<string | null>) {
const normalizeMock = vi.mocked(normalize.normalizeWhatsAppTarget);
for (const value of values) {
normalizeMock.mockReturnValueOnce(value);
}
vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false);
}
function expectAllowedForTarget(params: {
allowFrom: ResolveParams["allowFrom"];
mode: ResolveParams["mode"];
to?: string;
}) {
const to = params.to ?? PRIMARY_TARGET;
expectResolutionOk(
{
to,
allowFrom: params.allowFrom,
mode: params.mode,
},
to,
);
}
function expectDeniedForTarget(params: {
allowFrom: ResolveParams["allowFrom"];
mode: ResolveParams["mode"];
to?: string;
}) {
expectResolutionError({
to: params.to ?? PRIMARY_TARGET,
allowFrom: params.allowFrom,
mode: params.mode,
});
}
describe("resolveWhatsAppOutboundTarget", () => {
beforeEach(() => {
vi.resetAllMocks();
@@ -82,64 +120,23 @@ describe("resolveWhatsAppOutboundTarget", () => {
describe("implicit/heartbeat mode with allowList", () => {
it("allows message when wildcard is present", () => {
vi.mocked(normalize.normalizeWhatsAppTarget)
.mockReturnValueOnce("+11234567890")
.mockReturnValueOnce("+11234567890");
vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false);
expectResolutionOk(
{
to: "+11234567890",
allowFrom: ["*"],
mode: "implicit",
},
"+11234567890",
);
mockNormalizedDirectMessage(PRIMARY_TARGET, PRIMARY_TARGET);
expectAllowedForTarget({ allowFrom: ["*"], mode: "implicit" });
});
it("allows message when allowList is empty", () => {
vi.mocked(normalize.normalizeWhatsAppTarget)
.mockReturnValueOnce("+11234567890")
.mockReturnValueOnce("+11234567890");
vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false);
expectResolutionOk(
{
to: "+11234567890",
allowFrom: [],
mode: "implicit",
},
"+11234567890",
);
mockNormalizedDirectMessage(PRIMARY_TARGET, PRIMARY_TARGET);
expectAllowedForTarget({ allowFrom: [], mode: "implicit" });
});
it("allows message when target is in allowList", () => {
vi.mocked(normalize.normalizeWhatsAppTarget)
.mockReturnValueOnce("+11234567890")
.mockReturnValueOnce("+11234567890");
vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false);
expectResolutionOk(
{
to: "+11234567890",
allowFrom: ["+11234567890"],
mode: "implicit",
},
"+11234567890",
);
mockNormalizedDirectMessage(PRIMARY_TARGET, PRIMARY_TARGET);
expectAllowedForTarget({ allowFrom: [PRIMARY_TARGET], mode: "implicit" });
});
it("denies message when target is not in allowList", () => {
vi.mocked(normalize.normalizeWhatsAppTarget)
.mockReturnValueOnce("+11234567890")
.mockReturnValueOnce("+19876543210");
vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false);
expectResolutionError({
to: "+11234567890",
allowFrom: ["+19876543210"],
mode: "implicit",
});
mockNormalizedDirectMessage(PRIMARY_TARGET, SECONDARY_TARGET);
expectDeniedForTarget({ allowFrom: [SECONDARY_TARGET], mode: "implicit" });
});
it("handles mixed numeric and string allowList entries", () => {
@@ -149,14 +146,10 @@ describe("resolveWhatsAppOutboundTarget", () => {
.mockReturnValueOnce("+11234567890"); // for allowFrom[1]
vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false);
expectResolutionOk(
{
to: "+11234567890",
allowFrom: [1234567890, "+11234567890"],
mode: "implicit",
},
"+11234567890",
);
expectAllowedForTarget({
allowFrom: [1234567890, PRIMARY_TARGET],
mode: "implicit",
});
});
it("filters out invalid normalized entries from allowList", () => {
@@ -166,136 +159,72 @@ describe("resolveWhatsAppOutboundTarget", () => {
.mockReturnValueOnce("+11234567890"); // for 'to' param (processed last)
vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false);
expectResolutionOk(
{
to: "+11234567890",
allowFrom: ["invalid", "+11234567890"],
mode: "implicit",
},
"+11234567890",
);
expectAllowedForTarget({
allowFrom: ["invalid", PRIMARY_TARGET],
mode: "implicit",
});
});
});
describe("heartbeat mode", () => {
it("allows message when target is in allowList in heartbeat mode", () => {
vi.mocked(normalize.normalizeWhatsAppTarget)
.mockReturnValueOnce("+11234567890")
.mockReturnValueOnce("+11234567890");
vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false);
expectResolutionOk(
{
to: "+11234567890",
allowFrom: ["+11234567890"],
mode: "heartbeat",
},
"+11234567890",
);
mockNormalizedDirectMessage(PRIMARY_TARGET, PRIMARY_TARGET);
expectAllowedForTarget({ allowFrom: [PRIMARY_TARGET], mode: "heartbeat" });
});
it("denies message when target is not in allowList in heartbeat mode", () => {
vi.mocked(normalize.normalizeWhatsAppTarget)
.mockReturnValueOnce("+11234567890")
.mockReturnValueOnce("+19876543210");
vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false);
expectResolutionError({
to: "+11234567890",
allowFrom: ["+19876543210"],
mode: "heartbeat",
});
mockNormalizedDirectMessage(PRIMARY_TARGET, SECONDARY_TARGET);
expectDeniedForTarget({ allowFrom: [SECONDARY_TARGET], mode: "heartbeat" });
});
});
describe("explicit/custom modes", () => {
it("allows message in null mode when allowList is not set", () => {
vi.mocked(normalize.normalizeWhatsAppTarget).mockReturnValueOnce("+11234567890");
vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false);
expectResolutionOk(
{
to: "+11234567890",
allowFrom: undefined,
mode: null,
},
"+11234567890",
);
mockNormalizedDirectMessage(PRIMARY_TARGET);
expectAllowedForTarget({ allowFrom: undefined, mode: null });
});
it("allows message in undefined mode when allowList is not set", () => {
vi.mocked(normalize.normalizeWhatsAppTarget).mockReturnValueOnce("+11234567890");
vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false);
expectResolutionOk(
{
to: "+11234567890",
allowFrom: undefined,
mode: undefined,
},
"+11234567890",
);
mockNormalizedDirectMessage(PRIMARY_TARGET);
expectAllowedForTarget({ allowFrom: undefined, mode: undefined });
});
it("enforces allowList in custom mode string", () => {
vi.mocked(normalize.normalizeWhatsAppTarget)
.mockReturnValueOnce("+19876543210") // for allowFrom[0] (happens first!)
.mockReturnValueOnce("+11234567890"); // for 'to' param (happens second)
vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false);
expectResolutionError({
to: "+11234567890",
allowFrom: ["+19876543210"],
mode: "broadcast",
});
mockNormalizedDirectMessage(SECONDARY_TARGET, PRIMARY_TARGET);
expectDeniedForTarget({ allowFrom: [SECONDARY_TARGET], mode: "broadcast" });
});
it("allows message in custom mode string when target is in allowList", () => {
vi.mocked(normalize.normalizeWhatsAppTarget)
.mockReturnValueOnce("+11234567890") // for allowFrom[0]
.mockReturnValueOnce("+11234567890"); // for 'to' param
vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false);
expectResolutionOk(
{
to: "+11234567890",
allowFrom: ["+11234567890"],
mode: "broadcast",
},
"+11234567890",
);
mockNormalizedDirectMessage(PRIMARY_TARGET, PRIMARY_TARGET);
expectAllowedForTarget({ allowFrom: [PRIMARY_TARGET], mode: "broadcast" });
});
});
describe("whitespace handling", () => {
it("trims whitespace from to parameter", () => {
vi.mocked(normalize.normalizeWhatsAppTarget).mockReturnValueOnce("+11234567890");
vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false);
mockNormalizedDirectMessage(PRIMARY_TARGET);
expectResolutionOk(
{
to: " +11234567890 ",
to: ` ${PRIMARY_TARGET} `,
allowFrom: undefined,
mode: undefined,
},
"+11234567890",
PRIMARY_TARGET,
);
expect(vi.mocked(normalize.normalizeWhatsAppTarget)).toHaveBeenCalledWith("+11234567890");
expect(vi.mocked(normalize.normalizeWhatsAppTarget)).toHaveBeenCalledWith(PRIMARY_TARGET);
});
it("trims whitespace from allowList entries", () => {
vi.mocked(normalize.normalizeWhatsAppTarget)
.mockReturnValueOnce("+11234567890")
.mockReturnValueOnce("+11234567890");
vi.mocked(normalize.isWhatsAppGroupJid).mockReturnValueOnce(false);
mockNormalizedDirectMessage(PRIMARY_TARGET, PRIMARY_TARGET);
resolveWhatsAppOutboundTarget({
to: "+11234567890",
allowFrom: [" +11234567890 "],
to: PRIMARY_TARGET,
allowFrom: [` ${PRIMARY_TARGET} `],
mode: undefined,
});
expect(vi.mocked(normalize.normalizeWhatsAppTarget)).toHaveBeenCalledWith("+11234567890");
expect(vi.mocked(normalize.normalizeWhatsAppTarget)).toHaveBeenCalledWith(PRIMARY_TARGET);
});
});
});