diff --git a/extensions/matrix/src/matrix/client-bootstrap.ts b/extensions/matrix/src/matrix/client-bootstrap.ts index 66512291945..b2744d50039 100644 --- a/extensions/matrix/src/matrix/client-bootstrap.ts +++ b/extensions/matrix/src/matrix/client-bootstrap.ts @@ -1,4 +1,6 @@ -import { createMatrixClient } from "./client.js"; +import { LogService } from "@vector-im/matrix-bot-sdk"; +import { createMatrixClient } from "./client/create-client.js"; +import { startMatrixClientWithGrace } from "./client/startup.js"; type MatrixClientBootstrapAuth = { homeserver: string; @@ -34,6 +36,11 @@ export async function createPreparedMatrixClient(opts: { // Ignore crypto prep failures for one-off requests. } } - await client.start(); + await startMatrixClientWithGrace({ + client, + onError: (err: unknown) => { + LogService.error("MatrixClientBootstrap", "client.start() error:", err); + }, + }); return client; } diff --git a/extensions/matrix/src/matrix/client/shared.ts b/extensions/matrix/src/matrix/client/shared.ts index 29cac45d6b4..d64b61ee083 100644 --- a/extensions/matrix/src/matrix/client/shared.ts +++ b/extensions/matrix/src/matrix/client/shared.ts @@ -4,6 +4,7 @@ import { normalizeAccountId } from "openclaw/plugin-sdk/account-id"; import type { CoreConfig } from "../../types.js"; import { resolveMatrixAuth } from "./config.js"; import { createMatrixClient } from "./create-client.js"; +import { startMatrixClientWithGrace } from "./startup.js"; import { DEFAULT_ACCOUNT_KEY } from "./storage.js"; import type { MatrixAuth } from "./types.js"; @@ -84,26 +85,13 @@ async function ensureSharedClientStarted(params: { } } - // bot-sdk start() returns a promise that never resolves on success - // (infinite sync loop), so we must not await it or startup hangs forever. - // However, it DOES reject on errors (bad token, unreachable homeserver). - // Strategy: race client.start() against a grace timer. If start() rejects - // during or after the window, mark the client as failed so subsequent - // resolveSharedMatrixClient() calls know to retry. - const startPromiseInner = client.start(); - let settled = false; - let startError: unknown = undefined; - startPromiseInner.catch((err: unknown) => { - settled = true; - startError = err; - params.state.started = false; - LogService.error("MatrixClientLite", "client.start() error:", err); + await startMatrixClientWithGrace({ + client, + onError: (err: unknown) => { + params.state.started = false; + LogService.error("MatrixClientLite", "client.start() error:", err); + }, }); - // Give the sync loop a moment to initialize before marking ready - await new Promise((resolve) => setTimeout(resolve, 2000)); - if (settled) { - throw startError; - } params.state.started = true; })(); sharedClientStartPromises.set(key, startPromise); diff --git a/extensions/matrix/src/matrix/client/startup.test.ts b/extensions/matrix/src/matrix/client/startup.test.ts new file mode 100644 index 00000000000..c7135a012f5 --- /dev/null +++ b/extensions/matrix/src/matrix/client/startup.test.ts @@ -0,0 +1,49 @@ +import { describe, expect, it, vi } from "vitest"; +import { MATRIX_CLIENT_STARTUP_GRACE_MS, startMatrixClientWithGrace } from "./startup.js"; + +describe("startMatrixClientWithGrace", () => { + it("resolves after grace when start loop keeps running", async () => { + vi.useFakeTimers(); + const client = { + start: vi.fn().mockReturnValue(new Promise(() => {})), + }; + const startPromise = startMatrixClientWithGrace({ client }); + await vi.advanceTimersByTimeAsync(MATRIX_CLIENT_STARTUP_GRACE_MS); + await expect(startPromise).resolves.toBeUndefined(); + vi.useRealTimers(); + }); + + it("rejects when startup fails during grace", async () => { + vi.useFakeTimers(); + const startError = new Error("invalid token"); + const client = { + start: vi.fn().mockRejectedValue(startError), + }; + const startPromise = startMatrixClientWithGrace({ client }); + const startupExpectation = expect(startPromise).rejects.toBe(startError); + await vi.advanceTimersByTimeAsync(MATRIX_CLIENT_STARTUP_GRACE_MS); + await startupExpectation; + vi.useRealTimers(); + }); + + it("calls onError for late failures after startup returns", async () => { + vi.useFakeTimers(); + const lateError = new Error("late disconnect"); + let rejectStart: ((err: unknown) => void) | undefined; + const startLoop = new Promise((_resolve, reject) => { + rejectStart = reject; + }); + const onError = vi.fn(); + const client = { + start: vi.fn().mockReturnValue(startLoop), + }; + const startPromise = startMatrixClientWithGrace({ client, onError }); + await vi.advanceTimersByTimeAsync(MATRIX_CLIENT_STARTUP_GRACE_MS); + await expect(startPromise).resolves.toBeUndefined(); + + rejectStart?.(lateError); + await Promise.resolve(); + expect(onError).toHaveBeenCalledWith(lateError); + vi.useRealTimers(); + }); +}); diff --git a/extensions/matrix/src/matrix/client/startup.ts b/extensions/matrix/src/matrix/client/startup.ts new file mode 100644 index 00000000000..4ae8cd64733 --- /dev/null +++ b/extensions/matrix/src/matrix/client/startup.ts @@ -0,0 +1,29 @@ +import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; + +export const MATRIX_CLIENT_STARTUP_GRACE_MS = 2000; + +export async function startMatrixClientWithGrace(params: { + client: Pick; + graceMs?: number; + onError?: (err: unknown) => void; +}): Promise { + const graceMs = params.graceMs ?? MATRIX_CLIENT_STARTUP_GRACE_MS; + let startFailed = false; + let startError: unknown = undefined; + let startPromise: Promise; + try { + startPromise = params.client.start(); + } catch (err) { + params.onError?.(err); + throw err; + } + void startPromise.catch((err: unknown) => { + startFailed = true; + startError = err; + params.onError?.(err); + }); + await new Promise((resolve) => setTimeout(resolve, graceMs)); + if (startFailed) { + throw startError; + } +} diff --git a/extensions/matrix/src/matrix/monitor/direct.test.ts b/extensions/matrix/src/matrix/monitor/direct.test.ts index 3c5ee878065..2f6471f4be3 100644 --- a/extensions/matrix/src/matrix/monitor/direct.test.ts +++ b/extensions/matrix/src/matrix/monitor/direct.test.ts @@ -42,13 +42,15 @@ describe("createDirectRoomTracker", () => { }); it("does not classify 2-member rooms as DMs without direct flags", async () => { - const tracker = createDirectRoomTracker(createMockClient({ isDm: false })); + const client = createMockClient({ isDm: false }); + const tracker = createDirectRoomTracker(client); await expect( tracker.isDirectMessage({ roomId: "!room:example.org", senderId: "@alice:example.org", }), ).resolves.toBe(false); + expect(client.getJoinedRoomMembers).not.toHaveBeenCalled(); }); it("uses is_direct member flags when present", async () => { diff --git a/extensions/matrix/src/matrix/monitor/direct.ts b/extensions/matrix/src/matrix/monitor/direct.ts index 4506bd7d80e..d938c57b4e5 100644 --- a/extensions/matrix/src/matrix/monitor/direct.ts +++ b/extensions/matrix/src/matrix/monitor/direct.ts @@ -8,15 +8,19 @@ type DirectMessageCheck = { type DirectRoomTrackerOptions = { log?: (message: string) => void; + includeMemberCountInLogs?: boolean; }; const DM_CACHE_TTL_MS = 30_000; export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTrackerOptions = {}) { const log = opts.log ?? (() => {}); + const includeMemberCountInLogs = opts.includeMemberCountInLogs === true; let lastDmUpdateMs = 0; let cachedSelfUserId: string | null = null; - const memberCountCache = new Map(); + const memberCountCache = includeMemberCountInLogs + ? new Map() + : undefined; const ensureSelfUserId = async (): Promise => { if (cachedSelfUserId) { @@ -44,6 +48,9 @@ export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTr }; const resolveMemberCount = async (roomId: string): Promise => { + if (!memberCountCache) { + return null; + } const cached = memberCountCache.get(roomId); const now = Date.now(); if (cached && now - cached.ts < DM_CACHE_TTL_MS) { @@ -98,6 +105,10 @@ export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTr // were being misclassified as DMs, causing messages to be routed through // DM policy instead of group policy and silently dropped. // See: https://github.com/openclaw/openclaw/issues/20145 + if (!includeMemberCountInLogs) { + log(`matrix: dm check room=${roomId} result=group`); + return false; + } const memberCount = await resolveMemberCount(roomId); log(`matrix: dm check room=${roomId} result=group members=${memberCount ?? "unknown"}`); return false; diff --git a/extensions/matrix/src/matrix/monitor/events.test.ts b/extensions/matrix/src/matrix/monitor/events.test.ts index f0fa9eba8e9..eeedb8195c6 100644 --- a/extensions/matrix/src/matrix/monitor/events.test.ts +++ b/extensions/matrix/src/matrix/monitor/events.test.ts @@ -160,16 +160,13 @@ describe("registerMatrixMonitorEvents", () => { "") as PluginRuntime["system"]["formatNativeDependencyHint"], onRoomMessage: vi.fn(), }; - const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {}); - registerMatrixMonitorEvents(params); const initialCallCount = onMock.mock.calls.length; registerMatrixMonitorEvents(params); expect(onMock).toHaveBeenCalledTimes(initialCallCount); - expect(errorSpy).toHaveBeenCalledWith( - "[matrix] skipping duplicate listener registration for client", + expect(params.logVerboseMessage).toHaveBeenCalledWith( + "matrix: skipping duplicate listener registration for client", ); - errorSpy.mockRestore(); }); }); diff --git a/extensions/matrix/src/matrix/monitor/events.ts b/extensions/matrix/src/matrix/monitor/events.ts index a8519dcb734..76d2168a14d 100644 --- a/extensions/matrix/src/matrix/monitor/events.ts +++ b/extensions/matrix/src/matrix/monitor/events.ts @@ -5,11 +5,20 @@ import { sendReadReceiptMatrix } from "../send.js"; import type { MatrixRawEvent } from "./types.js"; import { EventType } from "./types.js"; -// Track which clients have had monitor events registered to prevent -// duplicate listener registration when the plugin loads twice -// (e.g. bundled channel + extension both try to start). -// See: https://github.com/openclaw/openclaw/issues/18330 -const registeredClients = new WeakSet(); +const matrixMonitorListenerRegistry = (() => { + // Prevent duplicate listener registration when both bundled and extension + // paths attempt to start monitors against the same shared client. + const registeredClients = new WeakSet(); + return { + tryRegister(client: object): boolean { + if (registeredClients.has(client)) { + return false; + } + registeredClients.add(client); + return true; + }, + }; +})(); function createSelfUserIdResolver(client: Pick) { let selfUserId: string | undefined; @@ -47,11 +56,10 @@ export function registerMatrixMonitorEvents(params: { formatNativeDependencyHint: PluginRuntime["system"]["formatNativeDependencyHint"]; onRoomMessage: (roomId: string, event: MatrixRawEvent) => void | Promise; }): void { - if (registeredClients.has(params.client)) { - console.error("[matrix] skipping duplicate listener registration for client"); + if (!matrixMonitorListenerRegistry.tryRegister(params.client)) { + params.logVerboseMessage("matrix: skipping duplicate listener registration for client"); return; } - registeredClients.add(params.client); const { client, diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index a2e118a0ace..4f7df2a7a08 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -10,7 +10,7 @@ import { } from "openclaw/plugin-sdk"; import { resolveMatrixTargets } from "../../resolve-targets.js"; import { getMatrixRuntime } from "../../runtime.js"; -import type { CoreConfig, ReplyToMode } from "../../types.js"; +import type { CoreConfig, MatrixConfig, MatrixRoomConfig, ReplyToMode } from "../../types.js"; import { resolveMatrixAccount } from "../accounts.js"; import { setActiveMatrixClient } from "../active-client.js"; import { @@ -42,6 +42,194 @@ export function isConfiguredMatrixRoomEntry(entry: string): boolean { return entry.startsWith("!") || (entry.startsWith("#") && entry.includes(":")); } +function normalizeMatrixUserEntry(raw: string): string { + return raw + .replace(/^matrix:/i, "") + .replace(/^user:/i, "") + .trim(); +} + +function normalizeMatrixRoomEntry(raw: string): string { + return raw + .replace(/^matrix:/i, "") + .replace(/^(room|channel):/i, "") + .trim(); +} + +function isMatrixUserId(value: string): boolean { + return value.startsWith("@") && value.includes(":"); +} + +async function resolveMatrixUserAllowlist(params: { + cfg: CoreConfig; + runtime: RuntimeEnv; + label: string; + list?: Array; +}): Promise { + let allowList = params.list ?? []; + if (allowList.length === 0) { + return allowList.map(String); + } + const entries = allowList + .map((entry) => normalizeMatrixUserEntry(String(entry))) + .filter((entry) => entry && entry !== "*"); + if (entries.length === 0) { + return allowList.map(String); + } + const mapping: string[] = []; + const unresolved: string[] = []; + const additions: string[] = []; + const pending: string[] = []; + for (const entry of entries) { + if (isMatrixUserId(entry)) { + additions.push(normalizeMatrixUserId(entry)); + continue; + } + pending.push(entry); + } + if (pending.length > 0) { + const resolved = await resolveMatrixTargets({ + cfg: params.cfg, + inputs: pending, + kind: "user", + runtime: params.runtime, + }); + for (const entry of resolved) { + if (entry.resolved && entry.id) { + const normalizedId = normalizeMatrixUserId(entry.id); + additions.push(normalizedId); + mapping.push(`${entry.input}→${normalizedId}`); + } else { + unresolved.push(entry.input); + } + } + } + allowList = mergeAllowlist({ existing: allowList, additions }); + summarizeMapping(params.label, mapping, unresolved, params.runtime); + if (unresolved.length > 0) { + params.runtime.log?.( + `${params.label} entries must be full Matrix IDs (example: @user:server). Unresolved entries are ignored.`, + ); + } + return allowList.map(String); +} + +async function resolveMatrixRoomsConfig(params: { + cfg: CoreConfig; + runtime: RuntimeEnv; + roomsConfig?: Record; +}): Promise | undefined> { + let roomsConfig = params.roomsConfig; + if (!roomsConfig || Object.keys(roomsConfig).length === 0) { + return roomsConfig; + } + const mapping: string[] = []; + const unresolved: string[] = []; + const nextRooms: Record = {}; + if (roomsConfig["*"]) { + nextRooms["*"] = roomsConfig["*"]; + } + const pending: Array<{ input: string; query: string; config: MatrixRoomConfig }> = []; + for (const [entry, roomConfig] of Object.entries(roomsConfig)) { + if (entry === "*") { + continue; + } + const trimmed = entry.trim(); + if (!trimmed) { + continue; + } + const cleaned = normalizeMatrixRoomEntry(trimmed); + if (isConfiguredMatrixRoomEntry(cleaned)) { + if (!nextRooms[cleaned]) { + nextRooms[cleaned] = roomConfig; + } + if (cleaned !== entry) { + mapping.push(`${entry}→${cleaned}`); + } + continue; + } + pending.push({ input: entry, query: trimmed, config: roomConfig }); + } + if (pending.length > 0) { + const resolved = await resolveMatrixTargets({ + cfg: params.cfg, + inputs: pending.map((entry) => entry.query), + kind: "group", + runtime: params.runtime, + }); + resolved.forEach((entry, index) => { + const source = pending[index]; + if (!source) { + return; + } + if (entry.resolved && entry.id) { + if (!nextRooms[entry.id]) { + nextRooms[entry.id] = source.config; + } + mapping.push(`${source.input}→${entry.id}`); + } else { + unresolved.push(source.input); + } + }); + } + roomsConfig = nextRooms; + summarizeMapping("matrix rooms", mapping, unresolved, params.runtime); + if (unresolved.length > 0) { + params.runtime.log?.( + "matrix rooms must be room IDs or aliases (example: !room:server or #alias:server). Unresolved entries are ignored.", + ); + } + if (Object.keys(roomsConfig).length === 0) { + return roomsConfig; + } + const nextRoomsWithUsers = { ...roomsConfig }; + for (const [roomKey, roomConfig] of Object.entries(roomsConfig)) { + const users = roomConfig?.users ?? []; + if (users.length === 0) { + continue; + } + const resolvedUsers = await resolveMatrixUserAllowlist({ + cfg: params.cfg, + runtime: params.runtime, + label: `matrix room users (${roomKey})`, + list: users, + }); + if (resolvedUsers !== users) { + nextRoomsWithUsers[roomKey] = { ...roomConfig, users: resolvedUsers }; + } + } + return nextRoomsWithUsers; +} + +async function resolveMatrixMonitorConfig(params: { + cfg: CoreConfig; + runtime: RuntimeEnv; + accountConfig: MatrixConfig; +}): Promise<{ + allowFrom: string[]; + groupAllowFrom: string[]; + roomsConfig?: Record; +}> { + const allowFrom = await resolveMatrixUserAllowlist({ + cfg: params.cfg, + runtime: params.runtime, + label: "matrix dm allowlist", + list: params.accountConfig.dm?.allowFrom ?? [], + }); + const groupAllowFrom = await resolveMatrixUserAllowlist({ + cfg: params.cfg, + runtime: params.runtime, + label: "matrix group allowlist", + list: params.accountConfig.groupAllowFrom ?? [], + }); + const roomsConfig = await resolveMatrixRoomsConfig({ + cfg: params.cfg, + runtime: params.runtime, + roomsConfig: params.accountConfig.groups ?? params.accountConfig.rooms, + }); + return { allowFrom, groupAllowFrom, roomsConfig }; +} + export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promise { if (isBunRuntime()) { throw new Error("Matrix provider requires Node (bun runtime not supported)"); @@ -65,154 +253,15 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi logger.debug?.(message); }; - const normalizeUserEntry = (raw: string) => - raw - .replace(/^matrix:/i, "") - .replace(/^user:/i, "") - .trim(); - const normalizeRoomEntry = (raw: string) => - raw - .replace(/^matrix:/i, "") - .replace(/^(room|channel):/i, "") - .trim(); - const isMatrixUserId = (value: string) => value.startsWith("@") && value.includes(":"); - const resolveUserAllowlist = async ( - label: string, - list?: Array, - ): Promise => { - let allowList = list ?? []; - if (allowList.length === 0) { - return allowList.map(String); - } - const entries = allowList - .map((entry) => normalizeUserEntry(String(entry))) - .filter((entry) => entry && entry !== "*"); - if (entries.length === 0) { - return allowList.map(String); - } - const mapping: string[] = []; - const unresolved: string[] = []; - const additions: string[] = []; - const pending: string[] = []; - for (const entry of entries) { - if (isMatrixUserId(entry)) { - additions.push(normalizeMatrixUserId(entry)); - continue; - } - pending.push(entry); - } - if (pending.length > 0) { - const resolved = await resolveMatrixTargets({ - cfg, - inputs: pending, - kind: "user", - runtime, - }); - for (const entry of resolved) { - if (entry.resolved && entry.id) { - const normalizedId = normalizeMatrixUserId(entry.id); - additions.push(normalizedId); - mapping.push(`${entry.input}→${normalizedId}`); - } else { - unresolved.push(entry.input); - } - } - } - allowList = mergeAllowlist({ existing: allowList, additions }); - summarizeMapping(label, mapping, unresolved, runtime); - if (unresolved.length > 0) { - runtime.log?.( - `${label} entries must be full Matrix IDs (example: @user:server). Unresolved entries are ignored.`, - ); - } - return allowList.map(String); - }; - // Resolve account-specific config for multi-account support const account = resolveMatrixAccount({ cfg, accountId: opts.accountId }); const accountConfig = account.config; - const allowlistOnly = accountConfig.allowlistOnly === true; - let allowFrom: string[] = (accountConfig.dm?.allowFrom ?? []).map(String); - let groupAllowFrom: string[] = (accountConfig.groupAllowFrom ?? []).map(String); - let roomsConfig = accountConfig.groups ?? accountConfig.rooms; - - allowFrom = await resolveUserAllowlist("matrix dm allowlist", allowFrom); - groupAllowFrom = await resolveUserAllowlist("matrix group allowlist", groupAllowFrom); - - if (roomsConfig && Object.keys(roomsConfig).length > 0) { - const mapping: string[] = []; - const unresolved: string[] = []; - const nextRooms: Record = {}; - if (roomsConfig["*"]) { - nextRooms["*"] = roomsConfig["*"]; - } - const pending: Array<{ input: string; query: string; config: (typeof roomsConfig)[string] }> = - []; - for (const [entry, roomConfig] of Object.entries(roomsConfig)) { - if (entry === "*") { - continue; - } - const trimmed = entry.trim(); - if (!trimmed) { - continue; - } - const cleaned = normalizeRoomEntry(trimmed); - if (isConfiguredMatrixRoomEntry(cleaned)) { - if (!nextRooms[cleaned]) { - nextRooms[cleaned] = roomConfig; - } - if (cleaned !== entry) { - mapping.push(`${entry}→${cleaned}`); - } - continue; - } - pending.push({ input: entry, query: trimmed, config: roomConfig }); - } - if (pending.length > 0) { - const resolved = await resolveMatrixTargets({ - cfg, - inputs: pending.map((entry) => entry.query), - kind: "group", - runtime, - }); - resolved.forEach((entry, index) => { - const source = pending[index]; - if (!source) { - return; - } - if (entry.resolved && entry.id) { - if (!nextRooms[entry.id]) { - nextRooms[entry.id] = source.config; - } - mapping.push(`${source.input}→${entry.id}`); - } else { - unresolved.push(source.input); - } - }); - } - roomsConfig = nextRooms; - summarizeMapping("matrix rooms", mapping, unresolved, runtime); - if (unresolved.length > 0) { - runtime.log?.( - "matrix rooms must be room IDs or aliases (example: !room:server or #alias:server). Unresolved entries are ignored.", - ); - } - } - if (roomsConfig && Object.keys(roomsConfig).length > 0) { - const nextRooms = { ...roomsConfig }; - for (const [roomKey, roomConfig] of Object.entries(roomsConfig)) { - const users = roomConfig?.users ?? []; - if (users.length === 0) { - continue; - } - const resolvedUsers = await resolveUserAllowlist(`matrix room users (${roomKey})`, users); - if (resolvedUsers !== users) { - nextRooms[roomKey] = { ...roomConfig, users: resolvedUsers }; - } - } - roomsConfig = nextRooms; - } + const { allowFrom, groupAllowFrom, roomsConfig } = await resolveMatrixMonitorConfig({ + cfg, + runtime, + accountConfig, + }); cfg = { ...cfg, @@ -274,7 +323,10 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi const mediaMaxBytes = Math.max(1, mediaMaxMb) * 1024 * 1024; const startupMs = Date.now(); const startupGraceMs = DEFAULT_STARTUP_GRACE_MS; - const directTracker = createDirectRoomTracker(client, { log: logVerboseMessage }); + const directTracker = createDirectRoomTracker(client, { + log: logVerboseMessage, + includeMemberCountInLogs: core.logging.shouldLogVerbose(), + }); registerMatrixAutoJoin({ client, cfg, runtime }); const warnedEncryptedRooms = new Set(); const warnedCryptoMissingRooms = new Set();