diff --git a/extensions/feishu/src/async.ts b/extensions/feishu/src/async.ts new file mode 100644 index 00000000000..7ad6ce792f4 --- /dev/null +++ b/extensions/feishu/src/async.ts @@ -0,0 +1,62 @@ +const RACE_TIMEOUT = Symbol("race-timeout"); +const RACE_ABORT = Symbol("race-abort"); + +export type RaceWithTimeoutAndAbortResult = + | { status: "resolved"; value: T } + | { status: "timeout" } + | { status: "aborted" }; + +export async function raceWithTimeoutAndAbort( + promise: Promise, + options: { + timeoutMs?: number; + abortSignal?: AbortSignal; + } = {}, +): Promise> { + if (options.abortSignal?.aborted) { + return { status: "aborted" }; + } + + if (options.timeoutMs === undefined && !options.abortSignal) { + return { status: "resolved", value: await promise }; + } + + let timeoutHandle: ReturnType | undefined; + let abortHandler: (() => void) | undefined; + const contenders: Array> = [promise]; + + if (options.timeoutMs !== undefined) { + contenders.push( + new Promise((resolve) => { + timeoutHandle = setTimeout(() => resolve(RACE_TIMEOUT), options.timeoutMs); + }), + ); + } + + if (options.abortSignal) { + contenders.push( + new Promise((resolve) => { + abortHandler = () => resolve(RACE_ABORT); + options.abortSignal?.addEventListener("abort", abortHandler, { once: true }); + }), + ); + } + + try { + const result = await Promise.race(contenders); + if (result === RACE_TIMEOUT) { + return { status: "timeout" }; + } + if (result === RACE_ABORT) { + return { status: "aborted" }; + } + return { status: "resolved", value: result }; + } finally { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + if (abortHandler) { + options.abortSignal?.removeEventListener("abort", abortHandler); + } + } +} diff --git a/extensions/feishu/src/monitor.account.ts b/extensions/feishu/src/monitor.account.ts new file mode 100644 index 00000000000..77dbf44dea9 --- /dev/null +++ b/extensions/feishu/src/monitor.account.ts @@ -0,0 +1,286 @@ +import * as crypto from "crypto"; +import * as Lark from "@larksuiteoapi/node-sdk"; +import type { ClawdbotConfig, RuntimeEnv, HistoryEntry } from "openclaw/plugin-sdk"; +import { resolveFeishuAccount } from "./accounts.js"; +import { raceWithTimeoutAndAbort } from "./async.js"; +import { handleFeishuMessage, type FeishuMessageEvent, type FeishuBotAddedEvent } from "./bot.js"; +import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-action.js"; +import { createEventDispatcher } from "./client.js"; +import { fetchBotOpenIdForMonitor } from "./monitor.startup.js"; +import { botOpenIds } from "./monitor.state.js"; +import { monitorWebhook, monitorWebSocket } from "./monitor.transport.js"; +import { getMessageFeishu } from "./send.js"; +import type { ResolvedFeishuAccount } from "./types.js"; + +const FEISHU_REACTION_VERIFY_TIMEOUT_MS = 1_500; + +export type FeishuReactionCreatedEvent = { + message_id: string; + chat_id?: string; + chat_type?: "p2p" | "group"; + reaction_type?: { emoji_type?: string }; + operator_type?: string; + user_id?: { open_id?: string }; + action_time?: string; +}; + +type ResolveReactionSyntheticEventParams = { + cfg: ClawdbotConfig; + accountId: string; + event: FeishuReactionCreatedEvent; + botOpenId?: string; + fetchMessage?: typeof getMessageFeishu; + verificationTimeoutMs?: number; + logger?: (message: string) => void; + uuid?: () => string; +}; + +export async function resolveReactionSyntheticEvent( + params: ResolveReactionSyntheticEventParams, +): Promise { + const { + cfg, + accountId, + event, + botOpenId, + fetchMessage = getMessageFeishu, + verificationTimeoutMs = FEISHU_REACTION_VERIFY_TIMEOUT_MS, + logger, + uuid = () => crypto.randomUUID(), + } = params; + + const emoji = event.reaction_type?.emoji_type; + const messageId = event.message_id; + const senderId = event.user_id?.open_id; + if (!emoji || !messageId || !senderId) { + return null; + } + + const account = resolveFeishuAccount({ cfg, accountId }); + const reactionNotifications = account.config.reactionNotifications ?? "own"; + if (reactionNotifications === "off") { + return null; + } + + if (event.operator_type === "app" || senderId === botOpenId) { + return null; + } + + if (emoji === "Typing") { + return null; + } + + if (reactionNotifications === "own" && !botOpenId) { + logger?.( + `feishu[${accountId}]: bot open_id unavailable, skipping reaction ${emoji} on ${messageId}`, + ); + return null; + } + + const reactedMsg = await raceWithTimeoutAndAbort(fetchMessage({ cfg, messageId, accountId }), { + timeoutMs: verificationTimeoutMs, + }) + .then((result) => (result.status === "resolved" ? result.value : null)) + .catch(() => null); + const isBotMessage = reactedMsg?.senderType === "app" || reactedMsg?.senderOpenId === botOpenId; + if (!reactedMsg || (reactionNotifications === "own" && !isBotMessage)) { + logger?.( + `feishu[${accountId}]: ignoring reaction on non-bot/unverified message ${messageId} ` + + `(sender: ${reactedMsg?.senderOpenId ?? "unknown"})`, + ); + return null; + } + + const syntheticChatIdRaw = event.chat_id ?? reactedMsg.chatId; + const syntheticChatId = syntheticChatIdRaw?.trim() ? syntheticChatIdRaw : `p2p:${senderId}`; + const syntheticChatType: "p2p" | "group" = event.chat_type ?? "p2p"; + return { + sender: { + sender_id: { open_id: senderId }, + sender_type: "user", + }, + message: { + message_id: `${messageId}:reaction:${emoji}:${uuid()}`, + chat_id: syntheticChatId, + chat_type: syntheticChatType, + message_type: "text", + content: JSON.stringify({ + text: `[reacted with ${emoji} to message ${messageId}]`, + }), + }, + }; +} + +type RegisterEventHandlersContext = { + cfg: ClawdbotConfig; + accountId: string; + runtime?: RuntimeEnv; + chatHistories: Map; + fireAndForget?: boolean; +}; + +function registerEventHandlers( + eventDispatcher: Lark.EventDispatcher, + context: RegisterEventHandlersContext, +): void { + const { cfg, accountId, runtime, chatHistories, fireAndForget } = context; + const log = runtime?.log ?? console.log; + const error = runtime?.error ?? console.error; + + eventDispatcher.register({ + "im.message.receive_v1": async (data) => { + try { + const event = data as unknown as FeishuMessageEvent; + const promise = handleFeishuMessage({ + cfg, + event, + botOpenId: botOpenIds.get(accountId), + runtime, + chatHistories, + accountId, + }); + if (fireAndForget) { + promise.catch((err) => { + error(`feishu[${accountId}]: error handling message: ${String(err)}`); + }); + } else { + await promise; + } + } catch (err) { + error(`feishu[${accountId}]: error handling message: ${String(err)}`); + } + }, + "im.message.message_read_v1": async () => { + // Ignore read receipts + }, + "im.chat.member.bot.added_v1": async (data) => { + try { + const event = data as unknown as FeishuBotAddedEvent; + log(`feishu[${accountId}]: bot added to chat ${event.chat_id}`); + } catch (err) { + error(`feishu[${accountId}]: error handling bot added event: ${String(err)}`); + } + }, + "im.chat.member.bot.deleted_v1": async (data) => { + try { + const event = data as unknown as { chat_id: string }; + log(`feishu[${accountId}]: bot removed from chat ${event.chat_id}`); + } catch (err) { + error(`feishu[${accountId}]: error handling bot removed event: ${String(err)}`); + } + }, + "im.message.reaction.created_v1": async (data) => { + const processReaction = async () => { + const event = data as FeishuReactionCreatedEvent; + const myBotId = botOpenIds.get(accountId); + const syntheticEvent = await resolveReactionSyntheticEvent({ + cfg, + accountId, + event, + botOpenId: myBotId, + logger: log, + }); + if (!syntheticEvent) { + return; + } + const promise = handleFeishuMessage({ + cfg, + event: syntheticEvent, + botOpenId: myBotId, + runtime, + chatHistories, + accountId, + }); + if (fireAndForget) { + promise.catch((err) => { + error(`feishu[${accountId}]: error handling reaction: ${String(err)}`); + }); + return; + } + await promise; + }; + + if (fireAndForget) { + void processReaction().catch((err) => { + error(`feishu[${accountId}]: error handling reaction event: ${String(err)}`); + }); + return; + } + + try { + await processReaction(); + } catch (err) { + error(`feishu[${accountId}]: error handling reaction event: ${String(err)}`); + } + }, + "im.message.reaction.deleted_v1": async () => { + // Ignore reaction removals + }, + "card.action.trigger": async (data: unknown) => { + try { + const event = data as unknown as FeishuCardActionEvent; + const promise = handleFeishuCardAction({ + cfg, + event, + botOpenId: botOpenIds.get(accountId), + runtime, + accountId, + }); + if (fireAndForget) { + promise.catch((err) => { + error(`feishu[${accountId}]: error handling card action: ${String(err)}`); + }); + } else { + await promise; + } + } catch (err) { + error(`feishu[${accountId}]: error handling card action: ${String(err)}`); + } + }, + }); +} + +export type BotOpenIdSource = { kind: "prefetched"; botOpenId?: string } | { kind: "fetch" }; + +export type MonitorSingleAccountParams = { + cfg: ClawdbotConfig; + account: ResolvedFeishuAccount; + runtime?: RuntimeEnv; + abortSignal?: AbortSignal; + botOpenIdSource?: BotOpenIdSource; +}; + +export async function monitorSingleAccount(params: MonitorSingleAccountParams): Promise { + const { cfg, account, runtime, abortSignal } = params; + const { accountId } = account; + const log = runtime?.log ?? console.log; + + const botOpenIdSource = params.botOpenIdSource ?? { kind: "fetch" }; + const botOpenId = + botOpenIdSource.kind === "prefetched" + ? botOpenIdSource.botOpenId + : await fetchBotOpenIdForMonitor(account, { runtime, abortSignal }); + botOpenIds.set(accountId, botOpenId ?? ""); + log(`feishu[${accountId}]: bot open_id resolved: ${botOpenId ?? "unknown"}`); + + const connectionMode = account.config.connectionMode ?? "websocket"; + if (connectionMode === "webhook" && !account.verificationToken?.trim()) { + throw new Error(`Feishu account "${accountId}" webhook mode requires verificationToken`); + } + + const eventDispatcher = createEventDispatcher(account); + const chatHistories = new Map(); + + registerEventHandlers(eventDispatcher, { + cfg, + accountId, + runtime, + chatHistories, + fireAndForget: true, + }); + + if (connectionMode === "webhook") { + return monitorWebhook({ account, accountId, runtime, abortSignal, eventDispatcher }); + } + return monitorWebSocket({ account, accountId, runtime, abortSignal, eventDispatcher }); +} diff --git a/extensions/feishu/src/monitor.startup.test.ts b/extensions/feishu/src/monitor.startup.test.ts new file mode 100644 index 00000000000..5abd61cc5b7 --- /dev/null +++ b/extensions/feishu/src/monitor.startup.test.ts @@ -0,0 +1,187 @@ +import type { ClawdbotConfig } from "openclaw/plugin-sdk"; +import { afterEach, describe, expect, it, vi } from "vitest"; + +const probeFeishuMock = vi.hoisted(() => vi.fn()); + +vi.mock("./probe.js", () => ({ + probeFeishu: probeFeishuMock, +})); + +vi.mock("./client.js", () => ({ + createFeishuWSClient: vi.fn(() => ({ start: vi.fn() })), + createEventDispatcher: vi.fn(() => ({ register: vi.fn() })), +})); + +import { monitorFeishuProvider, stopFeishuMonitor } from "./monitor.js"; + +function buildMultiAccountWebsocketConfig(accountIds: string[]): ClawdbotConfig { + return { + channels: { + feishu: { + enabled: true, + accounts: Object.fromEntries( + accountIds.map((accountId) => [ + accountId, + { + enabled: true, + appId: `cli_${accountId}`, + appSecret: `secret_${accountId}`, + connectionMode: "websocket", + }, + ]), + ), + }, + }, + } as ClawdbotConfig; +} + +afterEach(() => { + stopFeishuMonitor(); +}); + +describe("Feishu monitor startup preflight", () => { + it("starts account probes sequentially to avoid startup bursts", async () => { + let inFlight = 0; + let maxInFlight = 0; + const started: string[] = []; + let releaseProbes!: () => void; + const probesReleased = new Promise((resolve) => { + releaseProbes = () => resolve(); + }); + probeFeishuMock.mockImplementation(async (account: { accountId: string }) => { + started.push(account.accountId); + inFlight += 1; + maxInFlight = Math.max(maxInFlight, inFlight); + await probesReleased; + inFlight -= 1; + return { ok: true, botOpenId: `bot_${account.accountId}` }; + }); + + const abortController = new AbortController(); + const monitorPromise = monitorFeishuProvider({ + config: buildMultiAccountWebsocketConfig(["alpha", "beta", "gamma"]), + abortSignal: abortController.signal, + }); + + try { + await Promise.resolve(); + await Promise.resolve(); + + expect(started).toEqual(["alpha"]); + expect(maxInFlight).toBe(1); + } finally { + releaseProbes(); + abortController.abort(); + await monitorPromise; + } + }); + + it("does not refetch bot info after a failed sequential preflight", async () => { + const started: string[] = []; + let releaseBetaProbe!: () => void; + const betaProbeReleased = new Promise((resolve) => { + releaseBetaProbe = () => resolve(); + }); + + probeFeishuMock.mockImplementation(async (account: { accountId: string }) => { + started.push(account.accountId); + if (account.accountId === "alpha") { + return { ok: false }; + } + await betaProbeReleased; + return { ok: true, botOpenId: `bot_${account.accountId}` }; + }); + + const abortController = new AbortController(); + const monitorPromise = monitorFeishuProvider({ + config: buildMultiAccountWebsocketConfig(["alpha", "beta"]), + abortSignal: abortController.signal, + }); + + try { + for (let i = 0; i < 10 && !started.includes("beta"); i += 1) { + await Promise.resolve(); + } + + expect(started).toEqual(["alpha", "beta"]); + expect(started.filter((accountId) => accountId === "alpha")).toHaveLength(1); + } finally { + releaseBetaProbe(); + abortController.abort(); + await monitorPromise; + } + }); + + it("continues startup when probe layer reports timeout", async () => { + const started: string[] = []; + let releaseBetaProbe!: () => void; + const betaProbeReleased = new Promise((resolve) => { + releaseBetaProbe = () => resolve(); + }); + + probeFeishuMock.mockImplementation((account: { accountId: string }) => { + started.push(account.accountId); + if (account.accountId === "alpha") { + return Promise.resolve({ ok: false, error: "probe timed out after 10000ms" }); + } + return betaProbeReleased.then(() => ({ ok: true, botOpenId: `bot_${account.accountId}` })); + }); + + const abortController = new AbortController(); + const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() }; + const monitorPromise = monitorFeishuProvider({ + config: buildMultiAccountWebsocketConfig(["alpha", "beta"]), + runtime, + abortSignal: abortController.signal, + }); + + try { + for (let i = 0; i < 10 && !started.includes("beta"); i += 1) { + await Promise.resolve(); + } + + expect(started).toEqual(["alpha", "beta"]); + expect(runtime.error).toHaveBeenCalledWith( + expect.stringContaining("bot info probe timed out"), + ); + } finally { + releaseBetaProbe(); + abortController.abort(); + await monitorPromise; + } + }); + + it("stops sequential preflight when aborted during probe", async () => { + const started: string[] = []; + probeFeishuMock.mockImplementation( + (account: { accountId: string }, options: { abortSignal?: AbortSignal }) => { + started.push(account.accountId); + return new Promise((resolve) => { + options.abortSignal?.addEventListener( + "abort", + () => resolve({ ok: false, error: "probe aborted" }), + { once: true }, + ); + }); + }, + ); + + const abortController = new AbortController(); + const monitorPromise = monitorFeishuProvider({ + config: buildMultiAccountWebsocketConfig(["alpha", "beta"]), + abortSignal: abortController.signal, + }); + + try { + await Promise.resolve(); + expect(started).toEqual(["alpha"]); + + abortController.abort(); + await monitorPromise; + + expect(started).toEqual(["alpha"]); + } finally { + abortController.abort(); + } + }); +}); diff --git a/extensions/feishu/src/monitor.startup.ts b/extensions/feishu/src/monitor.startup.ts new file mode 100644 index 00000000000..aab61bca933 --- /dev/null +++ b/extensions/feishu/src/monitor.startup.ts @@ -0,0 +1,51 @@ +import type { RuntimeEnv } from "openclaw/plugin-sdk"; +import { probeFeishu } from "./probe.js"; +import type { ResolvedFeishuAccount } from "./types.js"; + +export const FEISHU_STARTUP_BOT_INFO_TIMEOUT_MS = 10_000; + +type FetchBotOpenIdOptions = { + runtime?: RuntimeEnv; + abortSignal?: AbortSignal; + timeoutMs?: number; +}; + +function isTimeoutErrorMessage(message: string | undefined): boolean { + return message?.toLowerCase().includes("timeout") || message?.toLowerCase().includes("timed out") + ? true + : false; +} + +function isAbortErrorMessage(message: string | undefined): boolean { + return message?.toLowerCase().includes("aborted") ?? false; +} + +export async function fetchBotOpenIdForMonitor( + account: ResolvedFeishuAccount, + options: FetchBotOpenIdOptions = {}, +): Promise { + if (options.abortSignal?.aborted) { + return undefined; + } + + const timeoutMs = options.timeoutMs ?? FEISHU_STARTUP_BOT_INFO_TIMEOUT_MS; + const result = await probeFeishu(account, { + timeoutMs, + abortSignal: options.abortSignal, + }); + if (result.ok) { + return result.botOpenId; + } + + if (options.abortSignal?.aborted || isAbortErrorMessage(result.error)) { + return undefined; + } + + if (isTimeoutErrorMessage(result.error)) { + const error = options.runtime?.error ?? console.error; + error( + `feishu[${account.accountId}]: bot info probe timed out after ${timeoutMs}ms; continuing startup`, + ); + } + return undefined; +} diff --git a/extensions/feishu/src/monitor.state.ts b/extensions/feishu/src/monitor.state.ts new file mode 100644 index 00000000000..95a0beb3bf4 --- /dev/null +++ b/extensions/feishu/src/monitor.state.ts @@ -0,0 +1,76 @@ +import * as http from "http"; +import * as Lark from "@larksuiteoapi/node-sdk"; +import { + createFixedWindowRateLimiter, + createWebhookAnomalyTracker, + type RuntimeEnv, + WEBHOOK_ANOMALY_COUNTER_DEFAULTS, + WEBHOOK_RATE_LIMIT_DEFAULTS, +} from "openclaw/plugin-sdk"; + +export const wsClients = new Map(); +export const httpServers = new Map(); +export const botOpenIds = new Map(); + +export const FEISHU_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024; +export const FEISHU_WEBHOOK_BODY_TIMEOUT_MS = 30_000; + +export const feishuWebhookRateLimiter = createFixedWindowRateLimiter({ + windowMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs, + maxRequests: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests, + maxTrackedKeys: WEBHOOK_RATE_LIMIT_DEFAULTS.maxTrackedKeys, +}); + +const feishuWebhookAnomalyTracker = createWebhookAnomalyTracker({ + maxTrackedKeys: WEBHOOK_ANOMALY_COUNTER_DEFAULTS.maxTrackedKeys, + ttlMs: WEBHOOK_ANOMALY_COUNTER_DEFAULTS.ttlMs, + logEvery: WEBHOOK_ANOMALY_COUNTER_DEFAULTS.logEvery, +}); + +export function clearFeishuWebhookRateLimitStateForTest(): void { + feishuWebhookRateLimiter.clear(); + feishuWebhookAnomalyTracker.clear(); +} + +export function getFeishuWebhookRateLimitStateSizeForTest(): number { + return feishuWebhookRateLimiter.size(); +} + +export function isWebhookRateLimitedForTest(key: string, nowMs: number): boolean { + return feishuWebhookRateLimiter.isRateLimited(key, nowMs); +} + +export function recordWebhookStatus( + runtime: RuntimeEnv | undefined, + accountId: string, + path: string, + statusCode: number, +): void { + feishuWebhookAnomalyTracker.record({ + key: `${accountId}:${path}:${statusCode}`, + statusCode, + log: runtime?.log ?? console.log, + message: (count) => + `feishu[${accountId}]: webhook anomaly path=${path} status=${statusCode} count=${count}`, + }); +} + +export function stopFeishuMonitorState(accountId?: string): void { + if (accountId) { + wsClients.delete(accountId); + const server = httpServers.get(accountId); + if (server) { + server.close(); + httpServers.delete(accountId); + } + botOpenIds.delete(accountId); + return; + } + + wsClients.clear(); + for (const server of httpServers.values()) { + server.close(); + } + httpServers.clear(); + botOpenIds.clear(); +} diff --git a/extensions/feishu/src/monitor.transport.ts b/extensions/feishu/src/monitor.transport.ts new file mode 100644 index 00000000000..9fcb2783f39 --- /dev/null +++ b/extensions/feishu/src/monitor.transport.ts @@ -0,0 +1,163 @@ +import * as http from "http"; +import * as Lark from "@larksuiteoapi/node-sdk"; +import { + applyBasicWebhookRequestGuards, + type RuntimeEnv, + installRequestBodyLimitGuard, +} from "openclaw/plugin-sdk"; +import { createFeishuWSClient } from "./client.js"; +import { + botOpenIds, + FEISHU_WEBHOOK_BODY_TIMEOUT_MS, + FEISHU_WEBHOOK_MAX_BODY_BYTES, + feishuWebhookRateLimiter, + httpServers, + recordWebhookStatus, + wsClients, +} from "./monitor.state.js"; +import type { ResolvedFeishuAccount } from "./types.js"; + +export type MonitorTransportParams = { + account: ResolvedFeishuAccount; + accountId: string; + runtime?: RuntimeEnv; + abortSignal?: AbortSignal; + eventDispatcher: Lark.EventDispatcher; +}; + +export async function monitorWebSocket({ + account, + accountId, + runtime, + abortSignal, + eventDispatcher, +}: MonitorTransportParams): Promise { + const log = runtime?.log ?? console.log; + log(`feishu[${accountId}]: starting WebSocket connection...`); + + const wsClient = createFeishuWSClient(account); + wsClients.set(accountId, wsClient); + + return new Promise((resolve, reject) => { + const cleanup = () => { + wsClients.delete(accountId); + botOpenIds.delete(accountId); + }; + + const handleAbort = () => { + log(`feishu[${accountId}]: abort signal received, stopping`); + cleanup(); + resolve(); + }; + + if (abortSignal?.aborted) { + cleanup(); + resolve(); + return; + } + + abortSignal?.addEventListener("abort", handleAbort, { once: true }); + + try { + wsClient.start({ eventDispatcher }); + log(`feishu[${accountId}]: WebSocket client started`); + } catch (err) { + cleanup(); + abortSignal?.removeEventListener("abort", handleAbort); + reject(err); + } + }); +} + +export async function monitorWebhook({ + account, + accountId, + runtime, + abortSignal, + eventDispatcher, +}: MonitorTransportParams): Promise { + const log = runtime?.log ?? console.log; + const error = runtime?.error ?? console.error; + + const port = account.config.webhookPort ?? 3000; + const path = account.config.webhookPath ?? "/feishu/events"; + const host = account.config.webhookHost ?? "127.0.0.1"; + + log(`feishu[${accountId}]: starting Webhook server on ${host}:${port}, path ${path}...`); + + const server = http.createServer(); + const webhookHandler = Lark.adaptDefault(path, eventDispatcher, { autoChallenge: true }); + + server.on("request", (req, res) => { + res.on("finish", () => { + recordWebhookStatus(runtime, accountId, path, res.statusCode); + }); + + const rateLimitKey = `${accountId}:${path}:${req.socket.remoteAddress ?? "unknown"}`; + if ( + !applyBasicWebhookRequestGuards({ + req, + res, + rateLimiter: feishuWebhookRateLimiter, + rateLimitKey, + nowMs: Date.now(), + requireJsonContentType: true, + }) + ) { + return; + } + + const guard = installRequestBodyLimitGuard(req, res, { + maxBytes: FEISHU_WEBHOOK_MAX_BODY_BYTES, + timeoutMs: FEISHU_WEBHOOK_BODY_TIMEOUT_MS, + responseFormat: "text", + }); + if (guard.isTripped()) { + return; + } + + void Promise.resolve(webhookHandler(req, res)) + .catch((err) => { + if (!guard.isTripped()) { + error(`feishu[${accountId}]: webhook handler error: ${String(err)}`); + } + }) + .finally(() => { + guard.dispose(); + }); + }); + + httpServers.set(accountId, server); + + return new Promise((resolve, reject) => { + const cleanup = () => { + server.close(); + httpServers.delete(accountId); + botOpenIds.delete(accountId); + }; + + const handleAbort = () => { + log(`feishu[${accountId}]: abort signal received, stopping Webhook server`); + cleanup(); + resolve(); + }; + + if (abortSignal?.aborted) { + cleanup(); + resolve(); + return; + } + + abortSignal?.addEventListener("abort", handleAbort, { once: true }); + + server.listen(port, host, () => { + log(`feishu[${accountId}]: Webhook server listening on ${host}:${port}`); + }); + + server.on("error", (err) => { + error(`feishu[${accountId}]: Webhook server error: ${err}`); + abortSignal?.removeEventListener("abort", handleAbort); + reject(err); + }); + }); +} diff --git a/extensions/feishu/src/monitor.ts b/extensions/feishu/src/monitor.ts index 7e9a9d90c1c..b7156fd238d 100644 --- a/extensions/feishu/src/monitor.ts +++ b/extensions/feishu/src/monitor.ts @@ -1,24 +1,17 @@ -import * as crypto from "crypto"; -import * as http from "http"; -import * as Lark from "@larksuiteoapi/node-sdk"; +import type { ClawdbotConfig, RuntimeEnv } from "openclaw/plugin-sdk"; +import { listEnabledFeishuAccounts, resolveFeishuAccount } from "./accounts.js"; import { - applyBasicWebhookRequestGuards, - type ClawdbotConfig, - createFixedWindowRateLimiter, - createWebhookAnomalyTracker, - type RuntimeEnv, - type HistoryEntry, - installRequestBodyLimitGuard, - WEBHOOK_ANOMALY_COUNTER_DEFAULTS, - WEBHOOK_RATE_LIMIT_DEFAULTS, -} from "openclaw/plugin-sdk"; -import { resolveFeishuAccount, listEnabledFeishuAccounts } from "./accounts.js"; -import { handleFeishuMessage, type FeishuMessageEvent, type FeishuBotAddedEvent } from "./bot.js"; -import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-action.js"; -import { createFeishuWSClient, createEventDispatcher } from "./client.js"; -import { probeFeishu } from "./probe.js"; -import { getMessageFeishu } from "./send.js"; -import type { ResolvedFeishuAccount } from "./types.js"; + monitorSingleAccount, + resolveReactionSyntheticEvent, + type FeishuReactionCreatedEvent, +} from "./monitor.account.js"; +import { fetchBotOpenIdForMonitor } from "./monitor.startup.js"; +import { + clearFeishuWebhookRateLimitStateForTest, + getFeishuWebhookRateLimitStateSizeForTest, + isWebhookRateLimitedForTest, + stopFeishuMonitorState, +} from "./monitor.state.js"; export type MonitorFeishuOpts = { config?: ClawdbotConfig; @@ -27,556 +20,14 @@ export type MonitorFeishuOpts = { accountId?: string; }; -// Per-account WebSocket clients, HTTP servers, and bot info -const wsClients = new Map(); -const httpServers = new Map(); -const botOpenIds = new Map(); -const FEISHU_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024; -const FEISHU_WEBHOOK_BODY_TIMEOUT_MS = 30_000; -const FEISHU_REACTION_VERIFY_TIMEOUT_MS = 1_500; -const FEISHU_STARTUP_BOT_INFO_TIMEOUT_MS = 10_000; -const FEISHU_BOT_INFO_FETCH_ABORTED = Symbol("feishu-bot-info-fetch-aborted"); -const FEISHU_BOT_INFO_FETCH_TIMED_OUT = Symbol("feishu-bot-info-fetch-timed-out"); - -export type FeishuReactionCreatedEvent = { - message_id: string; - chat_id?: string; - chat_type?: "p2p" | "group"; - reaction_type?: { emoji_type?: string }; - operator_type?: string; - user_id?: { open_id?: string }; - action_time?: string; +export { + clearFeishuWebhookRateLimitStateForTest, + getFeishuWebhookRateLimitStateSizeForTest, + isWebhookRateLimitedForTest, + resolveReactionSyntheticEvent, }; +export type { FeishuReactionCreatedEvent }; -type ResolveReactionSyntheticEventParams = { - cfg: ClawdbotConfig; - accountId: string; - event: FeishuReactionCreatedEvent; - botOpenId?: string; - fetchMessage?: typeof getMessageFeishu; - verificationTimeoutMs?: number; - logger?: (message: string) => void; - uuid?: () => string; -}; - -const feishuWebhookRateLimiter = createFixedWindowRateLimiter({ - windowMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs, - maxRequests: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests, - maxTrackedKeys: WEBHOOK_RATE_LIMIT_DEFAULTS.maxTrackedKeys, -}); -const feishuWebhookAnomalyTracker = createWebhookAnomalyTracker({ - maxTrackedKeys: WEBHOOK_ANOMALY_COUNTER_DEFAULTS.maxTrackedKeys, - ttlMs: WEBHOOK_ANOMALY_COUNTER_DEFAULTS.ttlMs, - logEvery: WEBHOOK_ANOMALY_COUNTER_DEFAULTS.logEvery, -}); - -export function clearFeishuWebhookRateLimitStateForTest(): void { - feishuWebhookRateLimiter.clear(); - feishuWebhookAnomalyTracker.clear(); -} - -export function getFeishuWebhookRateLimitStateSizeForTest(): number { - return feishuWebhookRateLimiter.size(); -} - -export function isWebhookRateLimitedForTest(key: string, nowMs: number): boolean { - return feishuWebhookRateLimiter.isRateLimited(key, nowMs); -} - -function recordWebhookStatus( - runtime: RuntimeEnv | undefined, - accountId: string, - path: string, - statusCode: number, -): void { - feishuWebhookAnomalyTracker.record({ - key: `${accountId}:${path}:${statusCode}`, - statusCode, - log: runtime?.log ?? console.log, - message: (count) => - `feishu[${accountId}]: webhook anomaly path=${path} status=${statusCode} count=${count}`, - }); -} - -async function withTimeout(promise: Promise, timeoutMs: number): Promise { - let timeoutId: NodeJS.Timeout | undefined; - try { - return await Promise.race([ - promise, - new Promise((resolve) => { - timeoutId = setTimeout(() => resolve(null), timeoutMs); - }), - ]); - } finally { - if (timeoutId) { - clearTimeout(timeoutId); - } - } -} - -export async function resolveReactionSyntheticEvent( - params: ResolveReactionSyntheticEventParams, -): Promise { - const { - cfg, - accountId, - event, - botOpenId, - fetchMessage = getMessageFeishu, - verificationTimeoutMs = FEISHU_REACTION_VERIFY_TIMEOUT_MS, - logger, - uuid = () => crypto.randomUUID(), - } = params; - - const emoji = event.reaction_type?.emoji_type; - const messageId = event.message_id; - const senderId = event.user_id?.open_id; - if (!emoji || !messageId || !senderId) { - return null; - } - - const account = resolveFeishuAccount({ cfg, accountId }); - const reactionNotifications = account.config.reactionNotifications ?? "own"; - if (reactionNotifications === "off") { - return null; - } - - // Skip bot self-reactions - if (event.operator_type === "app" || senderId === botOpenId) { - return null; - } - - // Skip typing indicator emoji - if (emoji === "Typing") { - return null; - } - - if (reactionNotifications === "own" && !botOpenId) { - logger?.( - `feishu[${accountId}]: bot open_id unavailable, skipping reaction ${emoji} on ${messageId}`, - ); - return null; - } - - const reactedMsg = await withTimeout( - fetchMessage({ cfg, messageId, accountId }), - verificationTimeoutMs, - ).catch(() => null); - const isBotMessage = reactedMsg?.senderType === "app" || reactedMsg?.senderOpenId === botOpenId; - if (!reactedMsg || (reactionNotifications === "own" && !isBotMessage)) { - logger?.( - `feishu[${accountId}]: ignoring reaction on non-bot/unverified message ${messageId} ` + - `(sender: ${reactedMsg?.senderOpenId ?? "unknown"})`, - ); - return null; - } - - const syntheticChatIdRaw = event.chat_id ?? reactedMsg.chatId; - const syntheticChatId = syntheticChatIdRaw?.trim() ? syntheticChatIdRaw : `p2p:${senderId}`; - const syntheticChatType: "p2p" | "group" = event.chat_type ?? "p2p"; - return { - sender: { - sender_id: { open_id: senderId }, - sender_type: "user", - }, - message: { - message_id: `${messageId}:reaction:${emoji}:${uuid()}`, - chat_id: syntheticChatId, - chat_type: syntheticChatType, - message_type: "text", - content: JSON.stringify({ - text: `[reacted with ${emoji} to message ${messageId}]`, - }), - }, - }; -} - -type FetchBotOpenIdOptions = { - runtime?: RuntimeEnv; - abortSignal?: AbortSignal; - timeoutMs?: number; -}; - -async function fetchBotOpenId( - account: ResolvedFeishuAccount, - options: FetchBotOpenIdOptions = {}, -): Promise { - if (options.abortSignal?.aborted) { - return undefined; - } - - const timeoutMs = options.timeoutMs ?? FEISHU_STARTUP_BOT_INFO_TIMEOUT_MS; - let timeoutHandle: ReturnType | undefined; - let abortHandler: (() => void) | undefined; - try { - const contenders: Array< - Promise< - | string - | undefined - | typeof FEISHU_BOT_INFO_FETCH_ABORTED - | typeof FEISHU_BOT_INFO_FETCH_TIMED_OUT - > - > = [ - probeFeishu(account) - .then((result) => (result.ok ? result.botOpenId : undefined)) - .catch(() => undefined), - new Promise((resolve) => { - timeoutHandle = setTimeout(() => resolve(FEISHU_BOT_INFO_FETCH_TIMED_OUT), timeoutMs); - }), - ]; - if (options.abortSignal) { - contenders.push( - new Promise((resolve) => { - abortHandler = () => resolve(FEISHU_BOT_INFO_FETCH_ABORTED); - options.abortSignal?.addEventListener("abort", abortHandler, { once: true }); - }), - ); - } - const outcome = await Promise.race(contenders); - if (outcome === FEISHU_BOT_INFO_FETCH_ABORTED) { - return undefined; - } - if (outcome === FEISHU_BOT_INFO_FETCH_TIMED_OUT) { - const error = options.runtime?.error ?? console.error; - error( - `feishu[${account.accountId}]: bot info probe timed out after ${timeoutMs}ms; continuing startup`, - ); - return undefined; - } - return outcome; - } catch { - return undefined; - } finally { - if (timeoutHandle) { - clearTimeout(timeoutHandle); - } - if (abortHandler) { - options.abortSignal?.removeEventListener("abort", abortHandler); - } - } -} - -/** - * Register common event handlers on an EventDispatcher. - * When fireAndForget is true, message handling is not awaited to avoid blocking - * event processing (Lark webhooks require <3s response). - */ -function registerEventHandlers( - eventDispatcher: Lark.EventDispatcher, - context: { - cfg: ClawdbotConfig; - accountId: string; - runtime?: RuntimeEnv; - chatHistories: Map; - fireAndForget?: boolean; - }, -) { - const { cfg, accountId, runtime, chatHistories, fireAndForget } = context; - const log = runtime?.log ?? console.log; - const error = runtime?.error ?? console.error; - - eventDispatcher.register({ - "im.message.receive_v1": async (data) => { - try { - const event = data as unknown as FeishuMessageEvent; - const promise = handleFeishuMessage({ - cfg, - event, - botOpenId: botOpenIds.get(accountId), - runtime, - chatHistories, - accountId, - }); - if (fireAndForget) { - promise.catch((err) => { - error(`feishu[${accountId}]: error handling message: ${String(err)}`); - }); - } else { - await promise; - } - } catch (err) { - error(`feishu[${accountId}]: error handling message: ${String(err)}`); - } - }, - "im.message.message_read_v1": async () => { - // Ignore read receipts - }, - "im.chat.member.bot.added_v1": async (data) => { - try { - const event = data as unknown as FeishuBotAddedEvent; - log(`feishu[${accountId}]: bot added to chat ${event.chat_id}`); - } catch (err) { - error(`feishu[${accountId}]: error handling bot added event: ${String(err)}`); - } - }, - "im.chat.member.bot.deleted_v1": async (data) => { - try { - const event = data as unknown as { chat_id: string }; - log(`feishu[${accountId}]: bot removed from chat ${event.chat_id}`); - } catch (err) { - error(`feishu[${accountId}]: error handling bot removed event: ${String(err)}`); - } - }, - "im.message.reaction.created_v1": async (data) => { - const processReaction = async () => { - const event = data as FeishuReactionCreatedEvent; - const myBotId = botOpenIds.get(accountId); - const syntheticEvent = await resolveReactionSyntheticEvent({ - cfg, - accountId, - event, - botOpenId: myBotId, - logger: log, - }); - if (!syntheticEvent) { - return; - } - const promise = handleFeishuMessage({ - cfg, - event: syntheticEvent, - botOpenId: myBotId, - runtime, - chatHistories, - accountId, - }); - if (fireAndForget) { - promise.catch((err) => { - error(`feishu[${accountId}]: error handling reaction: ${String(err)}`); - }); - return; - } - await promise; - }; - - if (fireAndForget) { - void processReaction().catch((err) => { - error(`feishu[${accountId}]: error handling reaction event: ${String(err)}`); - }); - return; - } - - try { - await processReaction(); - } catch (err) { - error(`feishu[${accountId}]: error handling reaction event: ${String(err)}`); - } - }, - "im.message.reaction.deleted_v1": async () => { - // Ignore reaction removals - }, - "card.action.trigger": async (data: unknown) => { - try { - const event = data as unknown as FeishuCardActionEvent; - const promise = handleFeishuCardAction({ - cfg, - event, - botOpenId: botOpenIds.get(accountId), - runtime, - accountId, - }); - if (fireAndForget) { - promise.catch((err) => { - error(`feishu[${accountId}]: error handling card action: ${String(err)}`); - }); - } else { - await promise; - } - } catch (err) { - error(`feishu[${accountId}]: error handling card action: ${String(err)}`); - } - }, - }); -} - -type MonitorAccountParams = { - cfg: ClawdbotConfig; - account: ResolvedFeishuAccount; - runtime?: RuntimeEnv; - abortSignal?: AbortSignal; - botOpenId?: string; - botOpenIdPrefetched?: boolean; -}; - -/** - * Monitor a single Feishu account. - */ -async function monitorSingleAccount(params: MonitorAccountParams): Promise { - const { cfg, account, runtime, abortSignal } = params; - const { accountId } = account; - const log = runtime?.log ?? console.log; - - // Fetch bot open_id - const botOpenId = params.botOpenIdPrefetched - ? params.botOpenId - : await fetchBotOpenId(account, { runtime, abortSignal }); - botOpenIds.set(accountId, botOpenId ?? ""); - log(`feishu[${accountId}]: bot open_id resolved: ${botOpenId ?? "unknown"}`); - - const connectionMode = account.config.connectionMode ?? "websocket"; - if (connectionMode === "webhook" && !account.verificationToken?.trim()) { - throw new Error(`Feishu account "${accountId}" webhook mode requires verificationToken`); - } - const eventDispatcher = createEventDispatcher(account); - const chatHistories = new Map(); - - registerEventHandlers(eventDispatcher, { - cfg, - accountId, - runtime, - chatHistories, - fireAndForget: true, - }); - - if (connectionMode === "webhook") { - return monitorWebhook({ params, accountId, eventDispatcher }); - } - - return monitorWebSocket({ params, accountId, eventDispatcher }); -} - -type ConnectionParams = { - params: MonitorAccountParams; - accountId: string; - eventDispatcher: Lark.EventDispatcher; -}; - -async function monitorWebSocket({ - params, - accountId, - eventDispatcher, -}: ConnectionParams): Promise { - const { account, runtime, abortSignal } = params; - const log = runtime?.log ?? console.log; - const error = runtime?.error ?? console.error; - - log(`feishu[${accountId}]: starting WebSocket connection...`); - - const wsClient = createFeishuWSClient(account); - wsClients.set(accountId, wsClient); - - return new Promise((resolve, reject) => { - const cleanup = () => { - wsClients.delete(accountId); - botOpenIds.delete(accountId); - }; - - const handleAbort = () => { - log(`feishu[${accountId}]: abort signal received, stopping`); - cleanup(); - resolve(); - }; - - if (abortSignal?.aborted) { - cleanup(); - resolve(); - return; - } - - abortSignal?.addEventListener("abort", handleAbort, { once: true }); - - try { - wsClient.start({ eventDispatcher }); - log(`feishu[${accountId}]: WebSocket client started`); - } catch (err) { - cleanup(); - abortSignal?.removeEventListener("abort", handleAbort); - reject(err); - } - }); -} - -async function monitorWebhook({ - params, - accountId, - eventDispatcher, -}: ConnectionParams): Promise { - const { account, runtime, abortSignal } = params; - const log = runtime?.log ?? console.log; - const error = runtime?.error ?? console.error; - - const port = account.config.webhookPort ?? 3000; - const path = account.config.webhookPath ?? "/feishu/events"; - const host = account.config.webhookHost ?? "127.0.0.1"; - - log(`feishu[${accountId}]: starting Webhook server on ${host}:${port}, path ${path}...`); - - const server = http.createServer(); - const webhookHandler = Lark.adaptDefault(path, eventDispatcher, { autoChallenge: true }); - server.on("request", (req, res) => { - res.on("finish", () => { - recordWebhookStatus(runtime, accountId, path, res.statusCode); - }); - - const rateLimitKey = `${accountId}:${path}:${req.socket.remoteAddress ?? "unknown"}`; - if ( - !applyBasicWebhookRequestGuards({ - req, - res, - rateLimiter: feishuWebhookRateLimiter, - rateLimitKey, - nowMs: Date.now(), - requireJsonContentType: true, - }) - ) { - return; - } - - const guard = installRequestBodyLimitGuard(req, res, { - maxBytes: FEISHU_WEBHOOK_MAX_BODY_BYTES, - timeoutMs: FEISHU_WEBHOOK_BODY_TIMEOUT_MS, - responseFormat: "text", - }); - if (guard.isTripped()) { - return; - } - void Promise.resolve(webhookHandler(req, res)) - .catch((err) => { - if (!guard.isTripped()) { - error(`feishu[${accountId}]: webhook handler error: ${String(err)}`); - } - }) - .finally(() => { - guard.dispose(); - }); - }); - httpServers.set(accountId, server); - - return new Promise((resolve, reject) => { - const cleanup = () => { - server.close(); - httpServers.delete(accountId); - botOpenIds.delete(accountId); - }; - - const handleAbort = () => { - log(`feishu[${accountId}]: abort signal received, stopping Webhook server`); - cleanup(); - resolve(); - }; - - if (abortSignal?.aborted) { - cleanup(); - resolve(); - return; - } - - abortSignal?.addEventListener("abort", handleAbort, { once: true }); - - server.listen(port, host, () => { - log(`feishu[${accountId}]: Webhook server listening on ${host}:${port}`); - }); - - server.on("error", (err) => { - error(`feishu[${accountId}]: Webhook server error: ${err}`); - abortSignal?.removeEventListener("abort", handleAbort); - reject(err); - }); - }); -} - -/** - * Main entry: start monitoring for all enabled accounts. - */ export async function monitorFeishuProvider(opts: MonitorFeishuOpts = {}): Promise { const cfg = opts.config; if (!cfg) { @@ -585,7 +36,6 @@ export async function monitorFeishuProvider(opts: MonitorFeishuOpts = {}): Promi const log = opts.runtime?.log ?? console.log; - // If accountId is specified, only monitor that account if (opts.accountId) { const account = resolveFeishuAccount({ cfg, accountId: opts.accountId }); if (!account.enabled || !account.configured) { @@ -599,7 +49,6 @@ export async function monitorFeishuProvider(opts: MonitorFeishuOpts = {}): Promi }); } - // Otherwise, start all enabled accounts const accounts = listEnabledFeishuAccounts(cfg); if (accounts.length === 0) { throw new Error("No enabled Feishu accounts configured"); @@ -615,47 +64,32 @@ export async function monitorFeishuProvider(opts: MonitorFeishuOpts = {}): Promi log("feishu: abort signal received during startup preflight; stopping startup"); break; } + // Probe sequentially so large multi-account startups do not burst Feishu's bot-info endpoint. - const botOpenId = await fetchBotOpenId(account, { + const botOpenId = await fetchBotOpenIdForMonitor(account, { runtime: opts.runtime, abortSignal: opts.abortSignal, }); + if (opts.abortSignal?.aborted) { log("feishu: abort signal received during startup preflight; stopping startup"); break; } + monitorPromises.push( monitorSingleAccount({ cfg, account, runtime: opts.runtime, abortSignal: opts.abortSignal, - botOpenId, - botOpenIdPrefetched: true, + botOpenIdSource: { kind: "prefetched", botOpenId }, }), ); } + await Promise.all(monitorPromises); } -/** - * Stop monitoring for a specific account or all accounts. - */ export function stopFeishuMonitor(accountId?: string): void { - if (accountId) { - wsClients.delete(accountId); - const server = httpServers.get(accountId); - if (server) { - server.close(); - httpServers.delete(accountId); - } - botOpenIds.delete(accountId); - } else { - wsClients.clear(); - for (const server of httpServers.values()) { - server.close(); - } - httpServers.clear(); - botOpenIds.clear(); - } + stopFeishuMonitorState(accountId); } diff --git a/extensions/feishu/src/monitor.webhook-security.test.ts b/extensions/feishu/src/monitor.webhook-security.test.ts index fe27ef755e6..9da288032de 100644 --- a/extensions/feishu/src/monitor.webhook-security.test.ts +++ b/extensions/feishu/src/monitor.webhook-security.test.ts @@ -84,27 +84,6 @@ function buildConfig(params: { } as ClawdbotConfig; } -function buildMultiAccountWebsocketConfig(accountIds: string[]): ClawdbotConfig { - return { - channels: { - feishu: { - enabled: true, - accounts: Object.fromEntries( - accountIds.map((accountId) => [ - accountId, - { - enabled: true, - appId: `cli_${accountId}`, - appSecret: `secret_${accountId}`, - connectionMode: "websocket", - }, - ]), - ), - }, - }, - } as ClawdbotConfig; -} - async function withRunningWebhookMonitor( params: { accountId: string; @@ -227,145 +206,4 @@ describe("Feishu webhook security hardening", () => { isWebhookRateLimitedForTest("/feishu-rate-limit-stale:fresh", now + 60_001); expect(getFeishuWebhookRateLimitStateSizeForTest()).toBe(1); }); - - it("starts account probes sequentially to avoid startup bursts", async () => { - let inFlight = 0; - let maxInFlight = 0; - const started: string[] = []; - let releaseProbes!: () => void; - const probesReleased = new Promise((resolve) => { - releaseProbes = () => resolve(); - }); - probeFeishuMock.mockImplementation(async (account: { accountId: string }) => { - started.push(account.accountId); - inFlight += 1; - maxInFlight = Math.max(maxInFlight, inFlight); - await probesReleased; - inFlight -= 1; - return { ok: true, botOpenId: `bot_${account.accountId}` }; - }); - - const abortController = new AbortController(); - const monitorPromise = monitorFeishuProvider({ - config: buildMultiAccountWebsocketConfig(["alpha", "beta", "gamma"]), - abortSignal: abortController.signal, - }); - - try { - await Promise.resolve(); - await Promise.resolve(); - - expect(started).toEqual(["alpha"]); - expect(maxInFlight).toBe(1); - } finally { - releaseProbes(); - abortController.abort(); - await monitorPromise; - } - }); - - it("does not refetch bot info after a failed sequential preflight", async () => { - const started: string[] = []; - let releaseBetaProbe!: () => void; - const betaProbeReleased = new Promise((resolve) => { - releaseBetaProbe = () => resolve(); - }); - - probeFeishuMock.mockImplementation(async (account: { accountId: string }) => { - started.push(account.accountId); - if (account.accountId === "alpha") { - return { ok: false }; - } - await betaProbeReleased; - return { ok: true, botOpenId: `bot_${account.accountId}` }; - }); - - const abortController = new AbortController(); - const monitorPromise = monitorFeishuProvider({ - config: buildMultiAccountWebsocketConfig(["alpha", "beta"]), - abortSignal: abortController.signal, - }); - - try { - for (let i = 0; i < 10 && !started.includes("beta"); i += 1) { - await Promise.resolve(); - } - - expect(started).toEqual(["alpha", "beta"]); - expect(started.filter((accountId) => accountId === "alpha")).toHaveLength(1); - } finally { - releaseBetaProbe(); - abortController.abort(); - await monitorPromise; - } - }); - - it("continues startup when a sequential preflight probe times out", async () => { - vi.useFakeTimers(); - const started: string[] = []; - let releaseBetaProbe!: () => void; - const betaProbeReleased = new Promise((resolve) => { - releaseBetaProbe = () => resolve(); - }); - - probeFeishuMock.mockImplementation((account: { accountId: string }) => { - started.push(account.accountId); - if (account.accountId === "alpha") { - return new Promise(() => {}); - } - return betaProbeReleased.then(() => ({ ok: true, botOpenId: `bot_${account.accountId}` })); - }); - - const abortController = new AbortController(); - const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() }; - const monitorPromise = monitorFeishuProvider({ - config: buildMultiAccountWebsocketConfig(["alpha", "beta"]), - runtime, - abortSignal: abortController.signal, - }); - - try { - await Promise.resolve(); - expect(started).toEqual(["alpha"]); - - await vi.advanceTimersByTimeAsync(10_000); - await Promise.resolve(); - - expect(started).toEqual(["alpha", "beta"]); - expect(runtime.error).toHaveBeenCalledWith( - expect.stringContaining("bot info probe timed out"), - ); - } finally { - releaseBetaProbe(); - abortController.abort(); - await monitorPromise; - vi.useRealTimers(); - } - }); - - it("stops sequential preflight when aborted during a stuck probe", async () => { - const started: string[] = []; - probeFeishuMock.mockImplementation((account: { accountId: string }) => { - started.push(account.accountId); - return new Promise(() => {}); - }); - - const abortController = new AbortController(); - const monitorPromise = monitorFeishuProvider({ - config: buildMultiAccountWebsocketConfig(["alpha", "beta"]), - abortSignal: abortController.signal, - }); - - try { - await Promise.resolve(); - expect(started).toEqual(["alpha"]); - - abortController.abort(); - await monitorPromise; - - expect(started).toEqual(["alpha"]); - } finally { - abortController.abort(); - } - }); }); diff --git a/extensions/feishu/src/probe.test.ts b/extensions/feishu/src/probe.test.ts index 23b45c21165..521b0b4d6d1 100644 --- a/extensions/feishu/src/probe.test.ts +++ b/extensions/feishu/src/probe.test.ts @@ -76,6 +76,36 @@ describe("probeFeishu", () => { ); }); + it("returns timeout error when request exceeds timeout", async () => { + vi.useFakeTimers(); + try { + const requestFn = vi.fn().mockImplementation(() => new Promise(() => {})); + createFeishuClientMock.mockReturnValue({ request: requestFn }); + + const promise = probeFeishu({ appId: "cli_123", appSecret: "secret" }, { timeoutMs: 1_000 }); + await vi.advanceTimersByTimeAsync(1_000); + const result = await promise; + + expect(result).toMatchObject({ ok: false, error: "probe timed out after 1000ms" }); + } finally { + vi.useRealTimers(); + } + }); + + it("returns aborted when abort signal is already aborted", async () => { + createFeishuClientMock.mockClear(); + const abortController = new AbortController(); + abortController.abort(); + + const result = await probeFeishu( + { appId: "cli_123", appSecret: "secret" }, + { abortSignal: abortController.signal }, + ); + + expect(result).toMatchObject({ ok: false, error: "probe aborted" }); + expect(createFeishuClientMock).not.toHaveBeenCalled(); + }); + it("returns cached result on subsequent calls within TTL", async () => { const requestFn = setupClient({ code: 0, diff --git a/extensions/feishu/src/probe.ts b/extensions/feishu/src/probe.ts index a5efacd6a74..31da461f80a 100644 --- a/extensions/feishu/src/probe.ts +++ b/extensions/feishu/src/probe.ts @@ -1,3 +1,4 @@ +import { raceWithTimeoutAndAbort } from "./async.js"; import { createFeishuClient, type FeishuClientCredentials } from "./client.js"; import type { FeishuProbeResult } from "./types.js"; @@ -10,13 +11,37 @@ const PROBE_CACHE_TTL_MS = 10 * 60 * 1000; // 10 minutes const MAX_PROBE_CACHE_SIZE = 64; export const FEISHU_PROBE_REQUEST_TIMEOUT_MS = 10_000; -export async function probeFeishu(creds?: FeishuClientCredentials): Promise { +export type ProbeFeishuOptions = { + timeoutMs?: number; + abortSignal?: AbortSignal; +}; + +type FeishuBotInfoResponse = { + code: number; + msg?: string; + bot?: { bot_name?: string; open_id?: string }; + data?: { bot?: { bot_name?: string; open_id?: string } }; +}; + +export async function probeFeishu( + creds?: FeishuClientCredentials, + options: ProbeFeishuOptions = {}, +): Promise { if (!creds?.appId || !creds?.appSecret) { return { ok: false, error: "missing credentials (appId, appSecret)", }; } + if (options.abortSignal?.aborted) { + return { + ok: false, + appId: creds.appId, + error: "probe aborted", + }; + } + + const timeoutMs = options.timeoutMs ?? FEISHU_PROBE_REQUEST_TIMEOUT_MS; // Return cached result if still valid. // Use accountId when available; otherwise include appSecret prefix so two @@ -32,12 +57,42 @@ export async function probeFeishu(creds?: FeishuClientCredentials): Promise( + (client as any).request({ + method: "GET", + url: "/open-apis/bot/v3/info", + data: {}, + timeout: timeoutMs, + }) as Promise, + { + timeoutMs, + abortSignal: options.abortSignal, + }, + ); + + if (responseResult.status === "aborted") { + return { + ok: false, + appId: creds.appId, + error: "probe aborted", + }; + } + if (responseResult.status === "timeout") { + return { + ok: false, + appId: creds.appId, + error: `probe timed out after ${timeoutMs}ms`, + }; + } + + const response = responseResult.value; + if (options.abortSignal?.aborted) { + return { + ok: false, + appId: creds.appId, + error: "probe aborted", + }; + } if (response.code !== 0) { return {