refactor(feishu): split monitor startup and transport concerns

This commit is contained in:
Peter Steinberger
2026-03-02 04:09:19 +00:00
parent c0bf42f2a8
commit f46bd2e0cc
10 changed files with 943 additions and 761 deletions

View File

@@ -0,0 +1,62 @@
const RACE_TIMEOUT = Symbol("race-timeout");
const RACE_ABORT = Symbol("race-abort");
export type RaceWithTimeoutAndAbortResult<T> =
| { status: "resolved"; value: T }
| { status: "timeout" }
| { status: "aborted" };
export async function raceWithTimeoutAndAbort<T>(
promise: Promise<T>,
options: {
timeoutMs?: number;
abortSignal?: AbortSignal;
} = {},
): Promise<RaceWithTimeoutAndAbortResult<T>> {
if (options.abortSignal?.aborted) {
return { status: "aborted" };
}
if (options.timeoutMs === undefined && !options.abortSignal) {
return { status: "resolved", value: await promise };
}
let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
let abortHandler: (() => void) | undefined;
const contenders: Array<Promise<T | typeof RACE_TIMEOUT | typeof RACE_ABORT>> = [promise];
if (options.timeoutMs !== undefined) {
contenders.push(
new Promise((resolve) => {
timeoutHandle = setTimeout(() => resolve(RACE_TIMEOUT), options.timeoutMs);
}),
);
}
if (options.abortSignal) {
contenders.push(
new Promise((resolve) => {
abortHandler = () => resolve(RACE_ABORT);
options.abortSignal?.addEventListener("abort", abortHandler, { once: true });
}),
);
}
try {
const result = await Promise.race(contenders);
if (result === RACE_TIMEOUT) {
return { status: "timeout" };
}
if (result === RACE_ABORT) {
return { status: "aborted" };
}
return { status: "resolved", value: result };
} finally {
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
if (abortHandler) {
options.abortSignal?.removeEventListener("abort", abortHandler);
}
}
}

View File

@@ -0,0 +1,286 @@
import * as crypto from "crypto";
import * as Lark from "@larksuiteoapi/node-sdk";
import type { ClawdbotConfig, RuntimeEnv, HistoryEntry } from "openclaw/plugin-sdk";
import { resolveFeishuAccount } from "./accounts.js";
import { raceWithTimeoutAndAbort } from "./async.js";
import { handleFeishuMessage, type FeishuMessageEvent, type FeishuBotAddedEvent } from "./bot.js";
import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-action.js";
import { createEventDispatcher } from "./client.js";
import { fetchBotOpenIdForMonitor } from "./monitor.startup.js";
import { botOpenIds } from "./monitor.state.js";
import { monitorWebhook, monitorWebSocket } from "./monitor.transport.js";
import { getMessageFeishu } from "./send.js";
import type { ResolvedFeishuAccount } from "./types.js";
const FEISHU_REACTION_VERIFY_TIMEOUT_MS = 1_500;
export type FeishuReactionCreatedEvent = {
message_id: string;
chat_id?: string;
chat_type?: "p2p" | "group";
reaction_type?: { emoji_type?: string };
operator_type?: string;
user_id?: { open_id?: string };
action_time?: string;
};
type ResolveReactionSyntheticEventParams = {
cfg: ClawdbotConfig;
accountId: string;
event: FeishuReactionCreatedEvent;
botOpenId?: string;
fetchMessage?: typeof getMessageFeishu;
verificationTimeoutMs?: number;
logger?: (message: string) => void;
uuid?: () => string;
};
export async function resolveReactionSyntheticEvent(
params: ResolveReactionSyntheticEventParams,
): Promise<FeishuMessageEvent | null> {
const {
cfg,
accountId,
event,
botOpenId,
fetchMessage = getMessageFeishu,
verificationTimeoutMs = FEISHU_REACTION_VERIFY_TIMEOUT_MS,
logger,
uuid = () => crypto.randomUUID(),
} = params;
const emoji = event.reaction_type?.emoji_type;
const messageId = event.message_id;
const senderId = event.user_id?.open_id;
if (!emoji || !messageId || !senderId) {
return null;
}
const account = resolveFeishuAccount({ cfg, accountId });
const reactionNotifications = account.config.reactionNotifications ?? "own";
if (reactionNotifications === "off") {
return null;
}
if (event.operator_type === "app" || senderId === botOpenId) {
return null;
}
if (emoji === "Typing") {
return null;
}
if (reactionNotifications === "own" && !botOpenId) {
logger?.(
`feishu[${accountId}]: bot open_id unavailable, skipping reaction ${emoji} on ${messageId}`,
);
return null;
}
const reactedMsg = await raceWithTimeoutAndAbort(fetchMessage({ cfg, messageId, accountId }), {
timeoutMs: verificationTimeoutMs,
})
.then((result) => (result.status === "resolved" ? result.value : null))
.catch(() => null);
const isBotMessage = reactedMsg?.senderType === "app" || reactedMsg?.senderOpenId === botOpenId;
if (!reactedMsg || (reactionNotifications === "own" && !isBotMessage)) {
logger?.(
`feishu[${accountId}]: ignoring reaction on non-bot/unverified message ${messageId} ` +
`(sender: ${reactedMsg?.senderOpenId ?? "unknown"})`,
);
return null;
}
const syntheticChatIdRaw = event.chat_id ?? reactedMsg.chatId;
const syntheticChatId = syntheticChatIdRaw?.trim() ? syntheticChatIdRaw : `p2p:${senderId}`;
const syntheticChatType: "p2p" | "group" = event.chat_type ?? "p2p";
return {
sender: {
sender_id: { open_id: senderId },
sender_type: "user",
},
message: {
message_id: `${messageId}:reaction:${emoji}:${uuid()}`,
chat_id: syntheticChatId,
chat_type: syntheticChatType,
message_type: "text",
content: JSON.stringify({
text: `[reacted with ${emoji} to message ${messageId}]`,
}),
},
};
}
type RegisterEventHandlersContext = {
cfg: ClawdbotConfig;
accountId: string;
runtime?: RuntimeEnv;
chatHistories: Map<string, HistoryEntry[]>;
fireAndForget?: boolean;
};
function registerEventHandlers(
eventDispatcher: Lark.EventDispatcher,
context: RegisterEventHandlersContext,
): void {
const { cfg, accountId, runtime, chatHistories, fireAndForget } = context;
const log = runtime?.log ?? console.log;
const error = runtime?.error ?? console.error;
eventDispatcher.register({
"im.message.receive_v1": async (data) => {
try {
const event = data as unknown as FeishuMessageEvent;
const promise = handleFeishuMessage({
cfg,
event,
botOpenId: botOpenIds.get(accountId),
runtime,
chatHistories,
accountId,
});
if (fireAndForget) {
promise.catch((err) => {
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
});
} else {
await promise;
}
} catch (err) {
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
}
},
"im.message.message_read_v1": async () => {
// Ignore read receipts
},
"im.chat.member.bot.added_v1": async (data) => {
try {
const event = data as unknown as FeishuBotAddedEvent;
log(`feishu[${accountId}]: bot added to chat ${event.chat_id}`);
} catch (err) {
error(`feishu[${accountId}]: error handling bot added event: ${String(err)}`);
}
},
"im.chat.member.bot.deleted_v1": async (data) => {
try {
const event = data as unknown as { chat_id: string };
log(`feishu[${accountId}]: bot removed from chat ${event.chat_id}`);
} catch (err) {
error(`feishu[${accountId}]: error handling bot removed event: ${String(err)}`);
}
},
"im.message.reaction.created_v1": async (data) => {
const processReaction = async () => {
const event = data as FeishuReactionCreatedEvent;
const myBotId = botOpenIds.get(accountId);
const syntheticEvent = await resolveReactionSyntheticEvent({
cfg,
accountId,
event,
botOpenId: myBotId,
logger: log,
});
if (!syntheticEvent) {
return;
}
const promise = handleFeishuMessage({
cfg,
event: syntheticEvent,
botOpenId: myBotId,
runtime,
chatHistories,
accountId,
});
if (fireAndForget) {
promise.catch((err) => {
error(`feishu[${accountId}]: error handling reaction: ${String(err)}`);
});
return;
}
await promise;
};
if (fireAndForget) {
void processReaction().catch((err) => {
error(`feishu[${accountId}]: error handling reaction event: ${String(err)}`);
});
return;
}
try {
await processReaction();
} catch (err) {
error(`feishu[${accountId}]: error handling reaction event: ${String(err)}`);
}
},
"im.message.reaction.deleted_v1": async () => {
// Ignore reaction removals
},
"card.action.trigger": async (data: unknown) => {
try {
const event = data as unknown as FeishuCardActionEvent;
const promise = handleFeishuCardAction({
cfg,
event,
botOpenId: botOpenIds.get(accountId),
runtime,
accountId,
});
if (fireAndForget) {
promise.catch((err) => {
error(`feishu[${accountId}]: error handling card action: ${String(err)}`);
});
} else {
await promise;
}
} catch (err) {
error(`feishu[${accountId}]: error handling card action: ${String(err)}`);
}
},
});
}
export type BotOpenIdSource = { kind: "prefetched"; botOpenId?: string } | { kind: "fetch" };
export type MonitorSingleAccountParams = {
cfg: ClawdbotConfig;
account: ResolvedFeishuAccount;
runtime?: RuntimeEnv;
abortSignal?: AbortSignal;
botOpenIdSource?: BotOpenIdSource;
};
export async function monitorSingleAccount(params: MonitorSingleAccountParams): Promise<void> {
const { cfg, account, runtime, abortSignal } = params;
const { accountId } = account;
const log = runtime?.log ?? console.log;
const botOpenIdSource = params.botOpenIdSource ?? { kind: "fetch" };
const botOpenId =
botOpenIdSource.kind === "prefetched"
? botOpenIdSource.botOpenId
: await fetchBotOpenIdForMonitor(account, { runtime, abortSignal });
botOpenIds.set(accountId, botOpenId ?? "");
log(`feishu[${accountId}]: bot open_id resolved: ${botOpenId ?? "unknown"}`);
const connectionMode = account.config.connectionMode ?? "websocket";
if (connectionMode === "webhook" && !account.verificationToken?.trim()) {
throw new Error(`Feishu account "${accountId}" webhook mode requires verificationToken`);
}
const eventDispatcher = createEventDispatcher(account);
const chatHistories = new Map<string, HistoryEntry[]>();
registerEventHandlers(eventDispatcher, {
cfg,
accountId,
runtime,
chatHistories,
fireAndForget: true,
});
if (connectionMode === "webhook") {
return monitorWebhook({ account, accountId, runtime, abortSignal, eventDispatcher });
}
return monitorWebSocket({ account, accountId, runtime, abortSignal, eventDispatcher });
}

View File

@@ -0,0 +1,187 @@
import type { ClawdbotConfig } from "openclaw/plugin-sdk";
import { afterEach, describe, expect, it, vi } from "vitest";
const probeFeishuMock = vi.hoisted(() => vi.fn());
vi.mock("./probe.js", () => ({
probeFeishu: probeFeishuMock,
}));
vi.mock("./client.js", () => ({
createFeishuWSClient: vi.fn(() => ({ start: vi.fn() })),
createEventDispatcher: vi.fn(() => ({ register: vi.fn() })),
}));
import { monitorFeishuProvider, stopFeishuMonitor } from "./monitor.js";
function buildMultiAccountWebsocketConfig(accountIds: string[]): ClawdbotConfig {
return {
channels: {
feishu: {
enabled: true,
accounts: Object.fromEntries(
accountIds.map((accountId) => [
accountId,
{
enabled: true,
appId: `cli_${accountId}`,
appSecret: `secret_${accountId}`,
connectionMode: "websocket",
},
]),
),
},
},
} as ClawdbotConfig;
}
afterEach(() => {
stopFeishuMonitor();
});
describe("Feishu monitor startup preflight", () => {
it("starts account probes sequentially to avoid startup bursts", async () => {
let inFlight = 0;
let maxInFlight = 0;
const started: string[] = [];
let releaseProbes!: () => void;
const probesReleased = new Promise<void>((resolve) => {
releaseProbes = () => resolve();
});
probeFeishuMock.mockImplementation(async (account: { accountId: string }) => {
started.push(account.accountId);
inFlight += 1;
maxInFlight = Math.max(maxInFlight, inFlight);
await probesReleased;
inFlight -= 1;
return { ok: true, botOpenId: `bot_${account.accountId}` };
});
const abortController = new AbortController();
const monitorPromise = monitorFeishuProvider({
config: buildMultiAccountWebsocketConfig(["alpha", "beta", "gamma"]),
abortSignal: abortController.signal,
});
try {
await Promise.resolve();
await Promise.resolve();
expect(started).toEqual(["alpha"]);
expect(maxInFlight).toBe(1);
} finally {
releaseProbes();
abortController.abort();
await monitorPromise;
}
});
it("does not refetch bot info after a failed sequential preflight", async () => {
const started: string[] = [];
let releaseBetaProbe!: () => void;
const betaProbeReleased = new Promise<void>((resolve) => {
releaseBetaProbe = () => resolve();
});
probeFeishuMock.mockImplementation(async (account: { accountId: string }) => {
started.push(account.accountId);
if (account.accountId === "alpha") {
return { ok: false };
}
await betaProbeReleased;
return { ok: true, botOpenId: `bot_${account.accountId}` };
});
const abortController = new AbortController();
const monitorPromise = monitorFeishuProvider({
config: buildMultiAccountWebsocketConfig(["alpha", "beta"]),
abortSignal: abortController.signal,
});
try {
for (let i = 0; i < 10 && !started.includes("beta"); i += 1) {
await Promise.resolve();
}
expect(started).toEqual(["alpha", "beta"]);
expect(started.filter((accountId) => accountId === "alpha")).toHaveLength(1);
} finally {
releaseBetaProbe();
abortController.abort();
await monitorPromise;
}
});
it("continues startup when probe layer reports timeout", async () => {
const started: string[] = [];
let releaseBetaProbe!: () => void;
const betaProbeReleased = new Promise<void>((resolve) => {
releaseBetaProbe = () => resolve();
});
probeFeishuMock.mockImplementation((account: { accountId: string }) => {
started.push(account.accountId);
if (account.accountId === "alpha") {
return Promise.resolve({ ok: false, error: "probe timed out after 10000ms" });
}
return betaProbeReleased.then(() => ({ ok: true, botOpenId: `bot_${account.accountId}` }));
});
const abortController = new AbortController();
const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() };
const monitorPromise = monitorFeishuProvider({
config: buildMultiAccountWebsocketConfig(["alpha", "beta"]),
runtime,
abortSignal: abortController.signal,
});
try {
for (let i = 0; i < 10 && !started.includes("beta"); i += 1) {
await Promise.resolve();
}
expect(started).toEqual(["alpha", "beta"]);
expect(runtime.error).toHaveBeenCalledWith(
expect.stringContaining("bot info probe timed out"),
);
} finally {
releaseBetaProbe();
abortController.abort();
await monitorPromise;
}
});
it("stops sequential preflight when aborted during probe", async () => {
const started: string[] = [];
probeFeishuMock.mockImplementation(
(account: { accountId: string }, options: { abortSignal?: AbortSignal }) => {
started.push(account.accountId);
return new Promise((resolve) => {
options.abortSignal?.addEventListener(
"abort",
() => resolve({ ok: false, error: "probe aborted" }),
{ once: true },
);
});
},
);
const abortController = new AbortController();
const monitorPromise = monitorFeishuProvider({
config: buildMultiAccountWebsocketConfig(["alpha", "beta"]),
abortSignal: abortController.signal,
});
try {
await Promise.resolve();
expect(started).toEqual(["alpha"]);
abortController.abort();
await monitorPromise;
expect(started).toEqual(["alpha"]);
} finally {
abortController.abort();
}
});
});

View File

@@ -0,0 +1,51 @@
import type { RuntimeEnv } from "openclaw/plugin-sdk";
import { probeFeishu } from "./probe.js";
import type { ResolvedFeishuAccount } from "./types.js";
export const FEISHU_STARTUP_BOT_INFO_TIMEOUT_MS = 10_000;
type FetchBotOpenIdOptions = {
runtime?: RuntimeEnv;
abortSignal?: AbortSignal;
timeoutMs?: number;
};
function isTimeoutErrorMessage(message: string | undefined): boolean {
return message?.toLowerCase().includes("timeout") || message?.toLowerCase().includes("timed out")
? true
: false;
}
function isAbortErrorMessage(message: string | undefined): boolean {
return message?.toLowerCase().includes("aborted") ?? false;
}
export async function fetchBotOpenIdForMonitor(
account: ResolvedFeishuAccount,
options: FetchBotOpenIdOptions = {},
): Promise<string | undefined> {
if (options.abortSignal?.aborted) {
return undefined;
}
const timeoutMs = options.timeoutMs ?? FEISHU_STARTUP_BOT_INFO_TIMEOUT_MS;
const result = await probeFeishu(account, {
timeoutMs,
abortSignal: options.abortSignal,
});
if (result.ok) {
return result.botOpenId;
}
if (options.abortSignal?.aborted || isAbortErrorMessage(result.error)) {
return undefined;
}
if (isTimeoutErrorMessage(result.error)) {
const error = options.runtime?.error ?? console.error;
error(
`feishu[${account.accountId}]: bot info probe timed out after ${timeoutMs}ms; continuing startup`,
);
}
return undefined;
}

View File

@@ -0,0 +1,76 @@
import * as http from "http";
import * as Lark from "@larksuiteoapi/node-sdk";
import {
createFixedWindowRateLimiter,
createWebhookAnomalyTracker,
type RuntimeEnv,
WEBHOOK_ANOMALY_COUNTER_DEFAULTS,
WEBHOOK_RATE_LIMIT_DEFAULTS,
} from "openclaw/plugin-sdk";
export const wsClients = new Map<string, Lark.WSClient>();
export const httpServers = new Map<string, http.Server>();
export const botOpenIds = new Map<string, string>();
export const FEISHU_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024;
export const FEISHU_WEBHOOK_BODY_TIMEOUT_MS = 30_000;
export const feishuWebhookRateLimiter = createFixedWindowRateLimiter({
windowMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs,
maxRequests: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests,
maxTrackedKeys: WEBHOOK_RATE_LIMIT_DEFAULTS.maxTrackedKeys,
});
const feishuWebhookAnomalyTracker = createWebhookAnomalyTracker({
maxTrackedKeys: WEBHOOK_ANOMALY_COUNTER_DEFAULTS.maxTrackedKeys,
ttlMs: WEBHOOK_ANOMALY_COUNTER_DEFAULTS.ttlMs,
logEvery: WEBHOOK_ANOMALY_COUNTER_DEFAULTS.logEvery,
});
export function clearFeishuWebhookRateLimitStateForTest(): void {
feishuWebhookRateLimiter.clear();
feishuWebhookAnomalyTracker.clear();
}
export function getFeishuWebhookRateLimitStateSizeForTest(): number {
return feishuWebhookRateLimiter.size();
}
export function isWebhookRateLimitedForTest(key: string, nowMs: number): boolean {
return feishuWebhookRateLimiter.isRateLimited(key, nowMs);
}
export function recordWebhookStatus(
runtime: RuntimeEnv | undefined,
accountId: string,
path: string,
statusCode: number,
): void {
feishuWebhookAnomalyTracker.record({
key: `${accountId}:${path}:${statusCode}`,
statusCode,
log: runtime?.log ?? console.log,
message: (count) =>
`feishu[${accountId}]: webhook anomaly path=${path} status=${statusCode} count=${count}`,
});
}
export function stopFeishuMonitorState(accountId?: string): void {
if (accountId) {
wsClients.delete(accountId);
const server = httpServers.get(accountId);
if (server) {
server.close();
httpServers.delete(accountId);
}
botOpenIds.delete(accountId);
return;
}
wsClients.clear();
for (const server of httpServers.values()) {
server.close();
}
httpServers.clear();
botOpenIds.clear();
}

View File

@@ -0,0 +1,163 @@
import * as http from "http";
import * as Lark from "@larksuiteoapi/node-sdk";
import {
applyBasicWebhookRequestGuards,
type RuntimeEnv,
installRequestBodyLimitGuard,
} from "openclaw/plugin-sdk";
import { createFeishuWSClient } from "./client.js";
import {
botOpenIds,
FEISHU_WEBHOOK_BODY_TIMEOUT_MS,
FEISHU_WEBHOOK_MAX_BODY_BYTES,
feishuWebhookRateLimiter,
httpServers,
recordWebhookStatus,
wsClients,
} from "./monitor.state.js";
import type { ResolvedFeishuAccount } from "./types.js";
export type MonitorTransportParams = {
account: ResolvedFeishuAccount;
accountId: string;
runtime?: RuntimeEnv;
abortSignal?: AbortSignal;
eventDispatcher: Lark.EventDispatcher;
};
export async function monitorWebSocket({
account,
accountId,
runtime,
abortSignal,
eventDispatcher,
}: MonitorTransportParams): Promise<void> {
const log = runtime?.log ?? console.log;
log(`feishu[${accountId}]: starting WebSocket connection...`);
const wsClient = createFeishuWSClient(account);
wsClients.set(accountId, wsClient);
return new Promise((resolve, reject) => {
const cleanup = () => {
wsClients.delete(accountId);
botOpenIds.delete(accountId);
};
const handleAbort = () => {
log(`feishu[${accountId}]: abort signal received, stopping`);
cleanup();
resolve();
};
if (abortSignal?.aborted) {
cleanup();
resolve();
return;
}
abortSignal?.addEventListener("abort", handleAbort, { once: true });
try {
wsClient.start({ eventDispatcher });
log(`feishu[${accountId}]: WebSocket client started`);
} catch (err) {
cleanup();
abortSignal?.removeEventListener("abort", handleAbort);
reject(err);
}
});
}
export async function monitorWebhook({
account,
accountId,
runtime,
abortSignal,
eventDispatcher,
}: MonitorTransportParams): Promise<void> {
const log = runtime?.log ?? console.log;
const error = runtime?.error ?? console.error;
const port = account.config.webhookPort ?? 3000;
const path = account.config.webhookPath ?? "/feishu/events";
const host = account.config.webhookHost ?? "127.0.0.1";
log(`feishu[${accountId}]: starting Webhook server on ${host}:${port}, path ${path}...`);
const server = http.createServer();
const webhookHandler = Lark.adaptDefault(path, eventDispatcher, { autoChallenge: true });
server.on("request", (req, res) => {
res.on("finish", () => {
recordWebhookStatus(runtime, accountId, path, res.statusCode);
});
const rateLimitKey = `${accountId}:${path}:${req.socket.remoteAddress ?? "unknown"}`;
if (
!applyBasicWebhookRequestGuards({
req,
res,
rateLimiter: feishuWebhookRateLimiter,
rateLimitKey,
nowMs: Date.now(),
requireJsonContentType: true,
})
) {
return;
}
const guard = installRequestBodyLimitGuard(req, res, {
maxBytes: FEISHU_WEBHOOK_MAX_BODY_BYTES,
timeoutMs: FEISHU_WEBHOOK_BODY_TIMEOUT_MS,
responseFormat: "text",
});
if (guard.isTripped()) {
return;
}
void Promise.resolve(webhookHandler(req, res))
.catch((err) => {
if (!guard.isTripped()) {
error(`feishu[${accountId}]: webhook handler error: ${String(err)}`);
}
})
.finally(() => {
guard.dispose();
});
});
httpServers.set(accountId, server);
return new Promise((resolve, reject) => {
const cleanup = () => {
server.close();
httpServers.delete(accountId);
botOpenIds.delete(accountId);
};
const handleAbort = () => {
log(`feishu[${accountId}]: abort signal received, stopping Webhook server`);
cleanup();
resolve();
};
if (abortSignal?.aborted) {
cleanup();
resolve();
return;
}
abortSignal?.addEventListener("abort", handleAbort, { once: true });
server.listen(port, host, () => {
log(`feishu[${accountId}]: Webhook server listening on ${host}:${port}`);
});
server.on("error", (err) => {
error(`feishu[${accountId}]: Webhook server error: ${err}`);
abortSignal?.removeEventListener("abort", handleAbort);
reject(err);
});
});
}

View File

@@ -1,24 +1,17 @@
import * as crypto from "crypto";
import * as http from "http";
import * as Lark from "@larksuiteoapi/node-sdk";
import type { ClawdbotConfig, RuntimeEnv } from "openclaw/plugin-sdk";
import { listEnabledFeishuAccounts, resolveFeishuAccount } from "./accounts.js";
import {
applyBasicWebhookRequestGuards,
type ClawdbotConfig,
createFixedWindowRateLimiter,
createWebhookAnomalyTracker,
type RuntimeEnv,
type HistoryEntry,
installRequestBodyLimitGuard,
WEBHOOK_ANOMALY_COUNTER_DEFAULTS,
WEBHOOK_RATE_LIMIT_DEFAULTS,
} from "openclaw/plugin-sdk";
import { resolveFeishuAccount, listEnabledFeishuAccounts } from "./accounts.js";
import { handleFeishuMessage, type FeishuMessageEvent, type FeishuBotAddedEvent } from "./bot.js";
import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-action.js";
import { createFeishuWSClient, createEventDispatcher } from "./client.js";
import { probeFeishu } from "./probe.js";
import { getMessageFeishu } from "./send.js";
import type { ResolvedFeishuAccount } from "./types.js";
monitorSingleAccount,
resolveReactionSyntheticEvent,
type FeishuReactionCreatedEvent,
} from "./monitor.account.js";
import { fetchBotOpenIdForMonitor } from "./monitor.startup.js";
import {
clearFeishuWebhookRateLimitStateForTest,
getFeishuWebhookRateLimitStateSizeForTest,
isWebhookRateLimitedForTest,
stopFeishuMonitorState,
} from "./monitor.state.js";
export type MonitorFeishuOpts = {
config?: ClawdbotConfig;
@@ -27,556 +20,14 @@ export type MonitorFeishuOpts = {
accountId?: string;
};
// Per-account WebSocket clients, HTTP servers, and bot info
const wsClients = new Map<string, Lark.WSClient>();
const httpServers = new Map<string, http.Server>();
const botOpenIds = new Map<string, string>();
const FEISHU_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024;
const FEISHU_WEBHOOK_BODY_TIMEOUT_MS = 30_000;
const FEISHU_REACTION_VERIFY_TIMEOUT_MS = 1_500;
const FEISHU_STARTUP_BOT_INFO_TIMEOUT_MS = 10_000;
const FEISHU_BOT_INFO_FETCH_ABORTED = Symbol("feishu-bot-info-fetch-aborted");
const FEISHU_BOT_INFO_FETCH_TIMED_OUT = Symbol("feishu-bot-info-fetch-timed-out");
export type FeishuReactionCreatedEvent = {
message_id: string;
chat_id?: string;
chat_type?: "p2p" | "group";
reaction_type?: { emoji_type?: string };
operator_type?: string;
user_id?: { open_id?: string };
action_time?: string;
export {
clearFeishuWebhookRateLimitStateForTest,
getFeishuWebhookRateLimitStateSizeForTest,
isWebhookRateLimitedForTest,
resolveReactionSyntheticEvent,
};
export type { FeishuReactionCreatedEvent };
type ResolveReactionSyntheticEventParams = {
cfg: ClawdbotConfig;
accountId: string;
event: FeishuReactionCreatedEvent;
botOpenId?: string;
fetchMessage?: typeof getMessageFeishu;
verificationTimeoutMs?: number;
logger?: (message: string) => void;
uuid?: () => string;
};
const feishuWebhookRateLimiter = createFixedWindowRateLimiter({
windowMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs,
maxRequests: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests,
maxTrackedKeys: WEBHOOK_RATE_LIMIT_DEFAULTS.maxTrackedKeys,
});
const feishuWebhookAnomalyTracker = createWebhookAnomalyTracker({
maxTrackedKeys: WEBHOOK_ANOMALY_COUNTER_DEFAULTS.maxTrackedKeys,
ttlMs: WEBHOOK_ANOMALY_COUNTER_DEFAULTS.ttlMs,
logEvery: WEBHOOK_ANOMALY_COUNTER_DEFAULTS.logEvery,
});
export function clearFeishuWebhookRateLimitStateForTest(): void {
feishuWebhookRateLimiter.clear();
feishuWebhookAnomalyTracker.clear();
}
export function getFeishuWebhookRateLimitStateSizeForTest(): number {
return feishuWebhookRateLimiter.size();
}
export function isWebhookRateLimitedForTest(key: string, nowMs: number): boolean {
return feishuWebhookRateLimiter.isRateLimited(key, nowMs);
}
function recordWebhookStatus(
runtime: RuntimeEnv | undefined,
accountId: string,
path: string,
statusCode: number,
): void {
feishuWebhookAnomalyTracker.record({
key: `${accountId}:${path}:${statusCode}`,
statusCode,
log: runtime?.log ?? console.log,
message: (count) =>
`feishu[${accountId}]: webhook anomaly path=${path} status=${statusCode} count=${count}`,
});
}
async function withTimeout<T>(promise: Promise<T>, timeoutMs: number): Promise<T | null> {
let timeoutId: NodeJS.Timeout | undefined;
try {
return await Promise.race<T | null>([
promise,
new Promise<null>((resolve) => {
timeoutId = setTimeout(() => resolve(null), timeoutMs);
}),
]);
} finally {
if (timeoutId) {
clearTimeout(timeoutId);
}
}
}
export async function resolveReactionSyntheticEvent(
params: ResolveReactionSyntheticEventParams,
): Promise<FeishuMessageEvent | null> {
const {
cfg,
accountId,
event,
botOpenId,
fetchMessage = getMessageFeishu,
verificationTimeoutMs = FEISHU_REACTION_VERIFY_TIMEOUT_MS,
logger,
uuid = () => crypto.randomUUID(),
} = params;
const emoji = event.reaction_type?.emoji_type;
const messageId = event.message_id;
const senderId = event.user_id?.open_id;
if (!emoji || !messageId || !senderId) {
return null;
}
const account = resolveFeishuAccount({ cfg, accountId });
const reactionNotifications = account.config.reactionNotifications ?? "own";
if (reactionNotifications === "off") {
return null;
}
// Skip bot self-reactions
if (event.operator_type === "app" || senderId === botOpenId) {
return null;
}
// Skip typing indicator emoji
if (emoji === "Typing") {
return null;
}
if (reactionNotifications === "own" && !botOpenId) {
logger?.(
`feishu[${accountId}]: bot open_id unavailable, skipping reaction ${emoji} on ${messageId}`,
);
return null;
}
const reactedMsg = await withTimeout(
fetchMessage({ cfg, messageId, accountId }),
verificationTimeoutMs,
).catch(() => null);
const isBotMessage = reactedMsg?.senderType === "app" || reactedMsg?.senderOpenId === botOpenId;
if (!reactedMsg || (reactionNotifications === "own" && !isBotMessage)) {
logger?.(
`feishu[${accountId}]: ignoring reaction on non-bot/unverified message ${messageId} ` +
`(sender: ${reactedMsg?.senderOpenId ?? "unknown"})`,
);
return null;
}
const syntheticChatIdRaw = event.chat_id ?? reactedMsg.chatId;
const syntheticChatId = syntheticChatIdRaw?.trim() ? syntheticChatIdRaw : `p2p:${senderId}`;
const syntheticChatType: "p2p" | "group" = event.chat_type ?? "p2p";
return {
sender: {
sender_id: { open_id: senderId },
sender_type: "user",
},
message: {
message_id: `${messageId}:reaction:${emoji}:${uuid()}`,
chat_id: syntheticChatId,
chat_type: syntheticChatType,
message_type: "text",
content: JSON.stringify({
text: `[reacted with ${emoji} to message ${messageId}]`,
}),
},
};
}
type FetchBotOpenIdOptions = {
runtime?: RuntimeEnv;
abortSignal?: AbortSignal;
timeoutMs?: number;
};
async function fetchBotOpenId(
account: ResolvedFeishuAccount,
options: FetchBotOpenIdOptions = {},
): Promise<string | undefined> {
if (options.abortSignal?.aborted) {
return undefined;
}
const timeoutMs = options.timeoutMs ?? FEISHU_STARTUP_BOT_INFO_TIMEOUT_MS;
let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
let abortHandler: (() => void) | undefined;
try {
const contenders: Array<
Promise<
| string
| undefined
| typeof FEISHU_BOT_INFO_FETCH_ABORTED
| typeof FEISHU_BOT_INFO_FETCH_TIMED_OUT
>
> = [
probeFeishu(account)
.then((result) => (result.ok ? result.botOpenId : undefined))
.catch(() => undefined),
new Promise((resolve) => {
timeoutHandle = setTimeout(() => resolve(FEISHU_BOT_INFO_FETCH_TIMED_OUT), timeoutMs);
}),
];
if (options.abortSignal) {
contenders.push(
new Promise((resolve) => {
abortHandler = () => resolve(FEISHU_BOT_INFO_FETCH_ABORTED);
options.abortSignal?.addEventListener("abort", abortHandler, { once: true });
}),
);
}
const outcome = await Promise.race(contenders);
if (outcome === FEISHU_BOT_INFO_FETCH_ABORTED) {
return undefined;
}
if (outcome === FEISHU_BOT_INFO_FETCH_TIMED_OUT) {
const error = options.runtime?.error ?? console.error;
error(
`feishu[${account.accountId}]: bot info probe timed out after ${timeoutMs}ms; continuing startup`,
);
return undefined;
}
return outcome;
} catch {
return undefined;
} finally {
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
if (abortHandler) {
options.abortSignal?.removeEventListener("abort", abortHandler);
}
}
}
/**
* Register common event handlers on an EventDispatcher.
* When fireAndForget is true, message handling is not awaited to avoid blocking
* event processing (Lark webhooks require <3s response).
*/
function registerEventHandlers(
eventDispatcher: Lark.EventDispatcher,
context: {
cfg: ClawdbotConfig;
accountId: string;
runtime?: RuntimeEnv;
chatHistories: Map<string, HistoryEntry[]>;
fireAndForget?: boolean;
},
) {
const { cfg, accountId, runtime, chatHistories, fireAndForget } = context;
const log = runtime?.log ?? console.log;
const error = runtime?.error ?? console.error;
eventDispatcher.register({
"im.message.receive_v1": async (data) => {
try {
const event = data as unknown as FeishuMessageEvent;
const promise = handleFeishuMessage({
cfg,
event,
botOpenId: botOpenIds.get(accountId),
runtime,
chatHistories,
accountId,
});
if (fireAndForget) {
promise.catch((err) => {
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
});
} else {
await promise;
}
} catch (err) {
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
}
},
"im.message.message_read_v1": async () => {
// Ignore read receipts
},
"im.chat.member.bot.added_v1": async (data) => {
try {
const event = data as unknown as FeishuBotAddedEvent;
log(`feishu[${accountId}]: bot added to chat ${event.chat_id}`);
} catch (err) {
error(`feishu[${accountId}]: error handling bot added event: ${String(err)}`);
}
},
"im.chat.member.bot.deleted_v1": async (data) => {
try {
const event = data as unknown as { chat_id: string };
log(`feishu[${accountId}]: bot removed from chat ${event.chat_id}`);
} catch (err) {
error(`feishu[${accountId}]: error handling bot removed event: ${String(err)}`);
}
},
"im.message.reaction.created_v1": async (data) => {
const processReaction = async () => {
const event = data as FeishuReactionCreatedEvent;
const myBotId = botOpenIds.get(accountId);
const syntheticEvent = await resolveReactionSyntheticEvent({
cfg,
accountId,
event,
botOpenId: myBotId,
logger: log,
});
if (!syntheticEvent) {
return;
}
const promise = handleFeishuMessage({
cfg,
event: syntheticEvent,
botOpenId: myBotId,
runtime,
chatHistories,
accountId,
});
if (fireAndForget) {
promise.catch((err) => {
error(`feishu[${accountId}]: error handling reaction: ${String(err)}`);
});
return;
}
await promise;
};
if (fireAndForget) {
void processReaction().catch((err) => {
error(`feishu[${accountId}]: error handling reaction event: ${String(err)}`);
});
return;
}
try {
await processReaction();
} catch (err) {
error(`feishu[${accountId}]: error handling reaction event: ${String(err)}`);
}
},
"im.message.reaction.deleted_v1": async () => {
// Ignore reaction removals
},
"card.action.trigger": async (data: unknown) => {
try {
const event = data as unknown as FeishuCardActionEvent;
const promise = handleFeishuCardAction({
cfg,
event,
botOpenId: botOpenIds.get(accountId),
runtime,
accountId,
});
if (fireAndForget) {
promise.catch((err) => {
error(`feishu[${accountId}]: error handling card action: ${String(err)}`);
});
} else {
await promise;
}
} catch (err) {
error(`feishu[${accountId}]: error handling card action: ${String(err)}`);
}
},
});
}
type MonitorAccountParams = {
cfg: ClawdbotConfig;
account: ResolvedFeishuAccount;
runtime?: RuntimeEnv;
abortSignal?: AbortSignal;
botOpenId?: string;
botOpenIdPrefetched?: boolean;
};
/**
* Monitor a single Feishu account.
*/
async function monitorSingleAccount(params: MonitorAccountParams): Promise<void> {
const { cfg, account, runtime, abortSignal } = params;
const { accountId } = account;
const log = runtime?.log ?? console.log;
// Fetch bot open_id
const botOpenId = params.botOpenIdPrefetched
? params.botOpenId
: await fetchBotOpenId(account, { runtime, abortSignal });
botOpenIds.set(accountId, botOpenId ?? "");
log(`feishu[${accountId}]: bot open_id resolved: ${botOpenId ?? "unknown"}`);
const connectionMode = account.config.connectionMode ?? "websocket";
if (connectionMode === "webhook" && !account.verificationToken?.trim()) {
throw new Error(`Feishu account "${accountId}" webhook mode requires verificationToken`);
}
const eventDispatcher = createEventDispatcher(account);
const chatHistories = new Map<string, HistoryEntry[]>();
registerEventHandlers(eventDispatcher, {
cfg,
accountId,
runtime,
chatHistories,
fireAndForget: true,
});
if (connectionMode === "webhook") {
return monitorWebhook({ params, accountId, eventDispatcher });
}
return monitorWebSocket({ params, accountId, eventDispatcher });
}
type ConnectionParams = {
params: MonitorAccountParams;
accountId: string;
eventDispatcher: Lark.EventDispatcher;
};
async function monitorWebSocket({
params,
accountId,
eventDispatcher,
}: ConnectionParams): Promise<void> {
const { account, runtime, abortSignal } = params;
const log = runtime?.log ?? console.log;
const error = runtime?.error ?? console.error;
log(`feishu[${accountId}]: starting WebSocket connection...`);
const wsClient = createFeishuWSClient(account);
wsClients.set(accountId, wsClient);
return new Promise((resolve, reject) => {
const cleanup = () => {
wsClients.delete(accountId);
botOpenIds.delete(accountId);
};
const handleAbort = () => {
log(`feishu[${accountId}]: abort signal received, stopping`);
cleanup();
resolve();
};
if (abortSignal?.aborted) {
cleanup();
resolve();
return;
}
abortSignal?.addEventListener("abort", handleAbort, { once: true });
try {
wsClient.start({ eventDispatcher });
log(`feishu[${accountId}]: WebSocket client started`);
} catch (err) {
cleanup();
abortSignal?.removeEventListener("abort", handleAbort);
reject(err);
}
});
}
async function monitorWebhook({
params,
accountId,
eventDispatcher,
}: ConnectionParams): Promise<void> {
const { account, runtime, abortSignal } = params;
const log = runtime?.log ?? console.log;
const error = runtime?.error ?? console.error;
const port = account.config.webhookPort ?? 3000;
const path = account.config.webhookPath ?? "/feishu/events";
const host = account.config.webhookHost ?? "127.0.0.1";
log(`feishu[${accountId}]: starting Webhook server on ${host}:${port}, path ${path}...`);
const server = http.createServer();
const webhookHandler = Lark.adaptDefault(path, eventDispatcher, { autoChallenge: true });
server.on("request", (req, res) => {
res.on("finish", () => {
recordWebhookStatus(runtime, accountId, path, res.statusCode);
});
const rateLimitKey = `${accountId}:${path}:${req.socket.remoteAddress ?? "unknown"}`;
if (
!applyBasicWebhookRequestGuards({
req,
res,
rateLimiter: feishuWebhookRateLimiter,
rateLimitKey,
nowMs: Date.now(),
requireJsonContentType: true,
})
) {
return;
}
const guard = installRequestBodyLimitGuard(req, res, {
maxBytes: FEISHU_WEBHOOK_MAX_BODY_BYTES,
timeoutMs: FEISHU_WEBHOOK_BODY_TIMEOUT_MS,
responseFormat: "text",
});
if (guard.isTripped()) {
return;
}
void Promise.resolve(webhookHandler(req, res))
.catch((err) => {
if (!guard.isTripped()) {
error(`feishu[${accountId}]: webhook handler error: ${String(err)}`);
}
})
.finally(() => {
guard.dispose();
});
});
httpServers.set(accountId, server);
return new Promise((resolve, reject) => {
const cleanup = () => {
server.close();
httpServers.delete(accountId);
botOpenIds.delete(accountId);
};
const handleAbort = () => {
log(`feishu[${accountId}]: abort signal received, stopping Webhook server`);
cleanup();
resolve();
};
if (abortSignal?.aborted) {
cleanup();
resolve();
return;
}
abortSignal?.addEventListener("abort", handleAbort, { once: true });
server.listen(port, host, () => {
log(`feishu[${accountId}]: Webhook server listening on ${host}:${port}`);
});
server.on("error", (err) => {
error(`feishu[${accountId}]: Webhook server error: ${err}`);
abortSignal?.removeEventListener("abort", handleAbort);
reject(err);
});
});
}
/**
* Main entry: start monitoring for all enabled accounts.
*/
export async function monitorFeishuProvider(opts: MonitorFeishuOpts = {}): Promise<void> {
const cfg = opts.config;
if (!cfg) {
@@ -585,7 +36,6 @@ export async function monitorFeishuProvider(opts: MonitorFeishuOpts = {}): Promi
const log = opts.runtime?.log ?? console.log;
// If accountId is specified, only monitor that account
if (opts.accountId) {
const account = resolveFeishuAccount({ cfg, accountId: opts.accountId });
if (!account.enabled || !account.configured) {
@@ -599,7 +49,6 @@ export async function monitorFeishuProvider(opts: MonitorFeishuOpts = {}): Promi
});
}
// Otherwise, start all enabled accounts
const accounts = listEnabledFeishuAccounts(cfg);
if (accounts.length === 0) {
throw new Error("No enabled Feishu accounts configured");
@@ -615,47 +64,32 @@ export async function monitorFeishuProvider(opts: MonitorFeishuOpts = {}): Promi
log("feishu: abort signal received during startup preflight; stopping startup");
break;
}
// Probe sequentially so large multi-account startups do not burst Feishu's bot-info endpoint.
const botOpenId = await fetchBotOpenId(account, {
const botOpenId = await fetchBotOpenIdForMonitor(account, {
runtime: opts.runtime,
abortSignal: opts.abortSignal,
});
if (opts.abortSignal?.aborted) {
log("feishu: abort signal received during startup preflight; stopping startup");
break;
}
monitorPromises.push(
monitorSingleAccount({
cfg,
account,
runtime: opts.runtime,
abortSignal: opts.abortSignal,
botOpenId,
botOpenIdPrefetched: true,
botOpenIdSource: { kind: "prefetched", botOpenId },
}),
);
}
await Promise.all(monitorPromises);
}
/**
* Stop monitoring for a specific account or all accounts.
*/
export function stopFeishuMonitor(accountId?: string): void {
if (accountId) {
wsClients.delete(accountId);
const server = httpServers.get(accountId);
if (server) {
server.close();
httpServers.delete(accountId);
}
botOpenIds.delete(accountId);
} else {
wsClients.clear();
for (const server of httpServers.values()) {
server.close();
}
httpServers.clear();
botOpenIds.clear();
}
stopFeishuMonitorState(accountId);
}

View File

@@ -84,27 +84,6 @@ function buildConfig(params: {
} as ClawdbotConfig;
}
function buildMultiAccountWebsocketConfig(accountIds: string[]): ClawdbotConfig {
return {
channels: {
feishu: {
enabled: true,
accounts: Object.fromEntries(
accountIds.map((accountId) => [
accountId,
{
enabled: true,
appId: `cli_${accountId}`,
appSecret: `secret_${accountId}`,
connectionMode: "websocket",
},
]),
),
},
},
} as ClawdbotConfig;
}
async function withRunningWebhookMonitor(
params: {
accountId: string;
@@ -227,145 +206,4 @@ describe("Feishu webhook security hardening", () => {
isWebhookRateLimitedForTest("/feishu-rate-limit-stale:fresh", now + 60_001);
expect(getFeishuWebhookRateLimitStateSizeForTest()).toBe(1);
});
it("starts account probes sequentially to avoid startup bursts", async () => {
let inFlight = 0;
let maxInFlight = 0;
const started: string[] = [];
let releaseProbes!: () => void;
const probesReleased = new Promise<void>((resolve) => {
releaseProbes = () => resolve();
});
probeFeishuMock.mockImplementation(async (account: { accountId: string }) => {
started.push(account.accountId);
inFlight += 1;
maxInFlight = Math.max(maxInFlight, inFlight);
await probesReleased;
inFlight -= 1;
return { ok: true, botOpenId: `bot_${account.accountId}` };
});
const abortController = new AbortController();
const monitorPromise = monitorFeishuProvider({
config: buildMultiAccountWebsocketConfig(["alpha", "beta", "gamma"]),
abortSignal: abortController.signal,
});
try {
await Promise.resolve();
await Promise.resolve();
expect(started).toEqual(["alpha"]);
expect(maxInFlight).toBe(1);
} finally {
releaseProbes();
abortController.abort();
await monitorPromise;
}
});
it("does not refetch bot info after a failed sequential preflight", async () => {
const started: string[] = [];
let releaseBetaProbe!: () => void;
const betaProbeReleased = new Promise<void>((resolve) => {
releaseBetaProbe = () => resolve();
});
probeFeishuMock.mockImplementation(async (account: { accountId: string }) => {
started.push(account.accountId);
if (account.accountId === "alpha") {
return { ok: false };
}
await betaProbeReleased;
return { ok: true, botOpenId: `bot_${account.accountId}` };
});
const abortController = new AbortController();
const monitorPromise = monitorFeishuProvider({
config: buildMultiAccountWebsocketConfig(["alpha", "beta"]),
abortSignal: abortController.signal,
});
try {
for (let i = 0; i < 10 && !started.includes("beta"); i += 1) {
await Promise.resolve();
}
expect(started).toEqual(["alpha", "beta"]);
expect(started.filter((accountId) => accountId === "alpha")).toHaveLength(1);
} finally {
releaseBetaProbe();
abortController.abort();
await monitorPromise;
}
});
it("continues startup when a sequential preflight probe times out", async () => {
vi.useFakeTimers();
const started: string[] = [];
let releaseBetaProbe!: () => void;
const betaProbeReleased = new Promise<void>((resolve) => {
releaseBetaProbe = () => resolve();
});
probeFeishuMock.mockImplementation((account: { accountId: string }) => {
started.push(account.accountId);
if (account.accountId === "alpha") {
return new Promise<never>(() => {});
}
return betaProbeReleased.then(() => ({ ok: true, botOpenId: `bot_${account.accountId}` }));
});
const abortController = new AbortController();
const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() };
const monitorPromise = monitorFeishuProvider({
config: buildMultiAccountWebsocketConfig(["alpha", "beta"]),
runtime,
abortSignal: abortController.signal,
});
try {
await Promise.resolve();
expect(started).toEqual(["alpha"]);
await vi.advanceTimersByTimeAsync(10_000);
await Promise.resolve();
expect(started).toEqual(["alpha", "beta"]);
expect(runtime.error).toHaveBeenCalledWith(
expect.stringContaining("bot info probe timed out"),
);
} finally {
releaseBetaProbe();
abortController.abort();
await monitorPromise;
vi.useRealTimers();
}
});
it("stops sequential preflight when aborted during a stuck probe", async () => {
const started: string[] = [];
probeFeishuMock.mockImplementation((account: { accountId: string }) => {
started.push(account.accountId);
return new Promise<never>(() => {});
});
const abortController = new AbortController();
const monitorPromise = monitorFeishuProvider({
config: buildMultiAccountWebsocketConfig(["alpha", "beta"]),
abortSignal: abortController.signal,
});
try {
await Promise.resolve();
expect(started).toEqual(["alpha"]);
abortController.abort();
await monitorPromise;
expect(started).toEqual(["alpha"]);
} finally {
abortController.abort();
}
});
});

View File

@@ -76,6 +76,36 @@ describe("probeFeishu", () => {
);
});
it("returns timeout error when request exceeds timeout", async () => {
vi.useFakeTimers();
try {
const requestFn = vi.fn().mockImplementation(() => new Promise(() => {}));
createFeishuClientMock.mockReturnValue({ request: requestFn });
const promise = probeFeishu({ appId: "cli_123", appSecret: "secret" }, { timeoutMs: 1_000 });
await vi.advanceTimersByTimeAsync(1_000);
const result = await promise;
expect(result).toMatchObject({ ok: false, error: "probe timed out after 1000ms" });
} finally {
vi.useRealTimers();
}
});
it("returns aborted when abort signal is already aborted", async () => {
createFeishuClientMock.mockClear();
const abortController = new AbortController();
abortController.abort();
const result = await probeFeishu(
{ appId: "cli_123", appSecret: "secret" },
{ abortSignal: abortController.signal },
);
expect(result).toMatchObject({ ok: false, error: "probe aborted" });
expect(createFeishuClientMock).not.toHaveBeenCalled();
});
it("returns cached result on subsequent calls within TTL", async () => {
const requestFn = setupClient({
code: 0,

View File

@@ -1,3 +1,4 @@
import { raceWithTimeoutAndAbort } from "./async.js";
import { createFeishuClient, type FeishuClientCredentials } from "./client.js";
import type { FeishuProbeResult } from "./types.js";
@@ -10,13 +11,37 @@ const PROBE_CACHE_TTL_MS = 10 * 60 * 1000; // 10 minutes
const MAX_PROBE_CACHE_SIZE = 64;
export const FEISHU_PROBE_REQUEST_TIMEOUT_MS = 10_000;
export async function probeFeishu(creds?: FeishuClientCredentials): Promise<FeishuProbeResult> {
export type ProbeFeishuOptions = {
timeoutMs?: number;
abortSignal?: AbortSignal;
};
type FeishuBotInfoResponse = {
code: number;
msg?: string;
bot?: { bot_name?: string; open_id?: string };
data?: { bot?: { bot_name?: string; open_id?: string } };
};
export async function probeFeishu(
creds?: FeishuClientCredentials,
options: ProbeFeishuOptions = {},
): Promise<FeishuProbeResult> {
if (!creds?.appId || !creds?.appSecret) {
return {
ok: false,
error: "missing credentials (appId, appSecret)",
};
}
if (options.abortSignal?.aborted) {
return {
ok: false,
appId: creds.appId,
error: "probe aborted",
};
}
const timeoutMs = options.timeoutMs ?? FEISHU_PROBE_REQUEST_TIMEOUT_MS;
// Return cached result if still valid.
// Use accountId when available; otherwise include appSecret prefix so two
@@ -32,12 +57,42 @@ export async function probeFeishu(creds?: FeishuClientCredentials): Promise<Feis
const client = createFeishuClient(creds);
// Use bot/v3/info API to get bot information
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- SDK generic request method
const response = await (client as any).request({
method: "GET",
url: "/open-apis/bot/v3/info",
data: {},
timeout: FEISHU_PROBE_REQUEST_TIMEOUT_MS,
});
const responseResult = await raceWithTimeoutAndAbort<FeishuBotInfoResponse>(
(client as any).request({
method: "GET",
url: "/open-apis/bot/v3/info",
data: {},
timeout: timeoutMs,
}) as Promise<FeishuBotInfoResponse>,
{
timeoutMs,
abortSignal: options.abortSignal,
},
);
if (responseResult.status === "aborted") {
return {
ok: false,
appId: creds.appId,
error: "probe aborted",
};
}
if (responseResult.status === "timeout") {
return {
ok: false,
appId: creds.appId,
error: `probe timed out after ${timeoutMs}ms`,
};
}
const response = responseResult.value;
if (options.abortSignal?.aborted) {
return {
ok: false,
appId: creds.appId,
error: "probe aborted",
};
}
if (response.code !== 0) {
return {