refactor(matrix): unify startup + split monitor config flow

This commit is contained in:
Peter Steinberger
2026-03-02 00:37:05 +00:00
parent f696b64b51
commit dc816b84ea
9 changed files with 325 additions and 182 deletions

View File

@@ -1,4 +1,6 @@
import { createMatrixClient } from "./client.js";
import { LogService } from "@vector-im/matrix-bot-sdk";
import { createMatrixClient } from "./client/create-client.js";
import { startMatrixClientWithGrace } from "./client/startup.js";
type MatrixClientBootstrapAuth = {
homeserver: string;
@@ -34,6 +36,11 @@ export async function createPreparedMatrixClient(opts: {
// Ignore crypto prep failures for one-off requests.
}
}
await client.start();
await startMatrixClientWithGrace({
client,
onError: (err: unknown) => {
LogService.error("MatrixClientBootstrap", "client.start() error:", err);
},
});
return client;
}

View File

@@ -4,6 +4,7 @@ import { normalizeAccountId } from "openclaw/plugin-sdk/account-id";
import type { CoreConfig } from "../../types.js";
import { resolveMatrixAuth } from "./config.js";
import { createMatrixClient } from "./create-client.js";
import { startMatrixClientWithGrace } from "./startup.js";
import { DEFAULT_ACCOUNT_KEY } from "./storage.js";
import type { MatrixAuth } from "./types.js";
@@ -84,26 +85,13 @@ async function ensureSharedClientStarted(params: {
}
}
// bot-sdk start() returns a promise that never resolves on success
// (infinite sync loop), so we must not await it or startup hangs forever.
// However, it DOES reject on errors (bad token, unreachable homeserver).
// Strategy: race client.start() against a grace timer. If start() rejects
// during or after the window, mark the client as failed so subsequent
// resolveSharedMatrixClient() calls know to retry.
const startPromiseInner = client.start();
let settled = false;
let startError: unknown = undefined;
startPromiseInner.catch((err: unknown) => {
settled = true;
startError = err;
params.state.started = false;
LogService.error("MatrixClientLite", "client.start() error:", err);
await startMatrixClientWithGrace({
client,
onError: (err: unknown) => {
params.state.started = false;
LogService.error("MatrixClientLite", "client.start() error:", err);
},
});
// Give the sync loop a moment to initialize before marking ready
await new Promise((resolve) => setTimeout(resolve, 2000));
if (settled) {
throw startError;
}
params.state.started = true;
})();
sharedClientStartPromises.set(key, startPromise);

View File

@@ -0,0 +1,49 @@
import { describe, expect, it, vi } from "vitest";
import { MATRIX_CLIENT_STARTUP_GRACE_MS, startMatrixClientWithGrace } from "./startup.js";
describe("startMatrixClientWithGrace", () => {
it("resolves after grace when start loop keeps running", async () => {
vi.useFakeTimers();
const client = {
start: vi.fn().mockReturnValue(new Promise<void>(() => {})),
};
const startPromise = startMatrixClientWithGrace({ client });
await vi.advanceTimersByTimeAsync(MATRIX_CLIENT_STARTUP_GRACE_MS);
await expect(startPromise).resolves.toBeUndefined();
vi.useRealTimers();
});
it("rejects when startup fails during grace", async () => {
vi.useFakeTimers();
const startError = new Error("invalid token");
const client = {
start: vi.fn().mockRejectedValue(startError),
};
const startPromise = startMatrixClientWithGrace({ client });
const startupExpectation = expect(startPromise).rejects.toBe(startError);
await vi.advanceTimersByTimeAsync(MATRIX_CLIENT_STARTUP_GRACE_MS);
await startupExpectation;
vi.useRealTimers();
});
it("calls onError for late failures after startup returns", async () => {
vi.useFakeTimers();
const lateError = new Error("late disconnect");
let rejectStart: ((err: unknown) => void) | undefined;
const startLoop = new Promise<void>((_resolve, reject) => {
rejectStart = reject;
});
const onError = vi.fn();
const client = {
start: vi.fn().mockReturnValue(startLoop),
};
const startPromise = startMatrixClientWithGrace({ client, onError });
await vi.advanceTimersByTimeAsync(MATRIX_CLIENT_STARTUP_GRACE_MS);
await expect(startPromise).resolves.toBeUndefined();
rejectStart?.(lateError);
await Promise.resolve();
expect(onError).toHaveBeenCalledWith(lateError);
vi.useRealTimers();
});
});

View File

@@ -0,0 +1,29 @@
import type { MatrixClient } from "@vector-im/matrix-bot-sdk";
export const MATRIX_CLIENT_STARTUP_GRACE_MS = 2000;
export async function startMatrixClientWithGrace(params: {
client: Pick<MatrixClient, "start">;
graceMs?: number;
onError?: (err: unknown) => void;
}): Promise<void> {
const graceMs = params.graceMs ?? MATRIX_CLIENT_STARTUP_GRACE_MS;
let startFailed = false;
let startError: unknown = undefined;
let startPromise: Promise<unknown>;
try {
startPromise = params.client.start();
} catch (err) {
params.onError?.(err);
throw err;
}
void startPromise.catch((err: unknown) => {
startFailed = true;
startError = err;
params.onError?.(err);
});
await new Promise((resolve) => setTimeout(resolve, graceMs));
if (startFailed) {
throw startError;
}
}

View File

@@ -42,13 +42,15 @@ describe("createDirectRoomTracker", () => {
});
it("does not classify 2-member rooms as DMs without direct flags", async () => {
const tracker = createDirectRoomTracker(createMockClient({ isDm: false }));
const client = createMockClient({ isDm: false });
const tracker = createDirectRoomTracker(client);
await expect(
tracker.isDirectMessage({
roomId: "!room:example.org",
senderId: "@alice:example.org",
}),
).resolves.toBe(false);
expect(client.getJoinedRoomMembers).not.toHaveBeenCalled();
});
it("uses is_direct member flags when present", async () => {

View File

@@ -8,15 +8,19 @@ type DirectMessageCheck = {
type DirectRoomTrackerOptions = {
log?: (message: string) => void;
includeMemberCountInLogs?: boolean;
};
const DM_CACHE_TTL_MS = 30_000;
export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTrackerOptions = {}) {
const log = opts.log ?? (() => {});
const includeMemberCountInLogs = opts.includeMemberCountInLogs === true;
let lastDmUpdateMs = 0;
let cachedSelfUserId: string | null = null;
const memberCountCache = new Map<string, { count: number; ts: number }>();
const memberCountCache = includeMemberCountInLogs
? new Map<string, { count: number; ts: number }>()
: undefined;
const ensureSelfUserId = async (): Promise<string | null> => {
if (cachedSelfUserId) {
@@ -44,6 +48,9 @@ export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTr
};
const resolveMemberCount = async (roomId: string): Promise<number | null> => {
if (!memberCountCache) {
return null;
}
const cached = memberCountCache.get(roomId);
const now = Date.now();
if (cached && now - cached.ts < DM_CACHE_TTL_MS) {
@@ -98,6 +105,10 @@ export function createDirectRoomTracker(client: MatrixClient, opts: DirectRoomTr
// were being misclassified as DMs, causing messages to be routed through
// DM policy instead of group policy and silently dropped.
// See: https://github.com/openclaw/openclaw/issues/20145
if (!includeMemberCountInLogs) {
log(`matrix: dm check room=${roomId} result=group`);
return false;
}
const memberCount = await resolveMemberCount(roomId);
log(`matrix: dm check room=${roomId} result=group members=${memberCount ?? "unknown"}`);
return false;

View File

@@ -160,16 +160,13 @@ describe("registerMatrixMonitorEvents", () => {
"") as PluginRuntime["system"]["formatNativeDependencyHint"],
onRoomMessage: vi.fn(),
};
const errorSpy = vi.spyOn(console, "error").mockImplementation(() => {});
registerMatrixMonitorEvents(params);
const initialCallCount = onMock.mock.calls.length;
registerMatrixMonitorEvents(params);
expect(onMock).toHaveBeenCalledTimes(initialCallCount);
expect(errorSpy).toHaveBeenCalledWith(
"[matrix] skipping duplicate listener registration for client",
expect(params.logVerboseMessage).toHaveBeenCalledWith(
"matrix: skipping duplicate listener registration for client",
);
errorSpy.mockRestore();
});
});

View File

@@ -5,11 +5,20 @@ import { sendReadReceiptMatrix } from "../send.js";
import type { MatrixRawEvent } from "./types.js";
import { EventType } from "./types.js";
// Track which clients have had monitor events registered to prevent
// duplicate listener registration when the plugin loads twice
// (e.g. bundled channel + extension both try to start).
// See: https://github.com/openclaw/openclaw/issues/18330
const registeredClients = new WeakSet<object>();
const matrixMonitorListenerRegistry = (() => {
// Prevent duplicate listener registration when both bundled and extension
// paths attempt to start monitors against the same shared client.
const registeredClients = new WeakSet<object>();
return {
tryRegister(client: object): boolean {
if (registeredClients.has(client)) {
return false;
}
registeredClients.add(client);
return true;
},
};
})();
function createSelfUserIdResolver(client: Pick<MatrixClient, "getUserId">) {
let selfUserId: string | undefined;
@@ -47,11 +56,10 @@ export function registerMatrixMonitorEvents(params: {
formatNativeDependencyHint: PluginRuntime["system"]["formatNativeDependencyHint"];
onRoomMessage: (roomId: string, event: MatrixRawEvent) => void | Promise<void>;
}): void {
if (registeredClients.has(params.client)) {
console.error("[matrix] skipping duplicate listener registration for client");
if (!matrixMonitorListenerRegistry.tryRegister(params.client)) {
params.logVerboseMessage("matrix: skipping duplicate listener registration for client");
return;
}
registeredClients.add(params.client);
const {
client,

View File

@@ -10,7 +10,7 @@ import {
} from "openclaw/plugin-sdk";
import { resolveMatrixTargets } from "../../resolve-targets.js";
import { getMatrixRuntime } from "../../runtime.js";
import type { CoreConfig, ReplyToMode } from "../../types.js";
import type { CoreConfig, MatrixConfig, MatrixRoomConfig, ReplyToMode } from "../../types.js";
import { resolveMatrixAccount } from "../accounts.js";
import { setActiveMatrixClient } from "../active-client.js";
import {
@@ -42,6 +42,194 @@ export function isConfiguredMatrixRoomEntry(entry: string): boolean {
return entry.startsWith("!") || (entry.startsWith("#") && entry.includes(":"));
}
function normalizeMatrixUserEntry(raw: string): string {
return raw
.replace(/^matrix:/i, "")
.replace(/^user:/i, "")
.trim();
}
function normalizeMatrixRoomEntry(raw: string): string {
return raw
.replace(/^matrix:/i, "")
.replace(/^(room|channel):/i, "")
.trim();
}
function isMatrixUserId(value: string): boolean {
return value.startsWith("@") && value.includes(":");
}
async function resolveMatrixUserAllowlist(params: {
cfg: CoreConfig;
runtime: RuntimeEnv;
label: string;
list?: Array<string | number>;
}): Promise<string[]> {
let allowList = params.list ?? [];
if (allowList.length === 0) {
return allowList.map(String);
}
const entries = allowList
.map((entry) => normalizeMatrixUserEntry(String(entry)))
.filter((entry) => entry && entry !== "*");
if (entries.length === 0) {
return allowList.map(String);
}
const mapping: string[] = [];
const unresolved: string[] = [];
const additions: string[] = [];
const pending: string[] = [];
for (const entry of entries) {
if (isMatrixUserId(entry)) {
additions.push(normalizeMatrixUserId(entry));
continue;
}
pending.push(entry);
}
if (pending.length > 0) {
const resolved = await resolveMatrixTargets({
cfg: params.cfg,
inputs: pending,
kind: "user",
runtime: params.runtime,
});
for (const entry of resolved) {
if (entry.resolved && entry.id) {
const normalizedId = normalizeMatrixUserId(entry.id);
additions.push(normalizedId);
mapping.push(`${entry.input}${normalizedId}`);
} else {
unresolved.push(entry.input);
}
}
}
allowList = mergeAllowlist({ existing: allowList, additions });
summarizeMapping(params.label, mapping, unresolved, params.runtime);
if (unresolved.length > 0) {
params.runtime.log?.(
`${params.label} entries must be full Matrix IDs (example: @user:server). Unresolved entries are ignored.`,
);
}
return allowList.map(String);
}
async function resolveMatrixRoomsConfig(params: {
cfg: CoreConfig;
runtime: RuntimeEnv;
roomsConfig?: Record<string, MatrixRoomConfig>;
}): Promise<Record<string, MatrixRoomConfig> | undefined> {
let roomsConfig = params.roomsConfig;
if (!roomsConfig || Object.keys(roomsConfig).length === 0) {
return roomsConfig;
}
const mapping: string[] = [];
const unresolved: string[] = [];
const nextRooms: Record<string, MatrixRoomConfig> = {};
if (roomsConfig["*"]) {
nextRooms["*"] = roomsConfig["*"];
}
const pending: Array<{ input: string; query: string; config: MatrixRoomConfig }> = [];
for (const [entry, roomConfig] of Object.entries(roomsConfig)) {
if (entry === "*") {
continue;
}
const trimmed = entry.trim();
if (!trimmed) {
continue;
}
const cleaned = normalizeMatrixRoomEntry(trimmed);
if (isConfiguredMatrixRoomEntry(cleaned)) {
if (!nextRooms[cleaned]) {
nextRooms[cleaned] = roomConfig;
}
if (cleaned !== entry) {
mapping.push(`${entry}${cleaned}`);
}
continue;
}
pending.push({ input: entry, query: trimmed, config: roomConfig });
}
if (pending.length > 0) {
const resolved = await resolveMatrixTargets({
cfg: params.cfg,
inputs: pending.map((entry) => entry.query),
kind: "group",
runtime: params.runtime,
});
resolved.forEach((entry, index) => {
const source = pending[index];
if (!source) {
return;
}
if (entry.resolved && entry.id) {
if (!nextRooms[entry.id]) {
nextRooms[entry.id] = source.config;
}
mapping.push(`${source.input}${entry.id}`);
} else {
unresolved.push(source.input);
}
});
}
roomsConfig = nextRooms;
summarizeMapping("matrix rooms", mapping, unresolved, params.runtime);
if (unresolved.length > 0) {
params.runtime.log?.(
"matrix rooms must be room IDs or aliases (example: !room:server or #alias:server). Unresolved entries are ignored.",
);
}
if (Object.keys(roomsConfig).length === 0) {
return roomsConfig;
}
const nextRoomsWithUsers = { ...roomsConfig };
for (const [roomKey, roomConfig] of Object.entries(roomsConfig)) {
const users = roomConfig?.users ?? [];
if (users.length === 0) {
continue;
}
const resolvedUsers = await resolveMatrixUserAllowlist({
cfg: params.cfg,
runtime: params.runtime,
label: `matrix room users (${roomKey})`,
list: users,
});
if (resolvedUsers !== users) {
nextRoomsWithUsers[roomKey] = { ...roomConfig, users: resolvedUsers };
}
}
return nextRoomsWithUsers;
}
async function resolveMatrixMonitorConfig(params: {
cfg: CoreConfig;
runtime: RuntimeEnv;
accountConfig: MatrixConfig;
}): Promise<{
allowFrom: string[];
groupAllowFrom: string[];
roomsConfig?: Record<string, MatrixRoomConfig>;
}> {
const allowFrom = await resolveMatrixUserAllowlist({
cfg: params.cfg,
runtime: params.runtime,
label: "matrix dm allowlist",
list: params.accountConfig.dm?.allowFrom ?? [],
});
const groupAllowFrom = await resolveMatrixUserAllowlist({
cfg: params.cfg,
runtime: params.runtime,
label: "matrix group allowlist",
list: params.accountConfig.groupAllowFrom ?? [],
});
const roomsConfig = await resolveMatrixRoomsConfig({
cfg: params.cfg,
runtime: params.runtime,
roomsConfig: params.accountConfig.groups ?? params.accountConfig.rooms,
});
return { allowFrom, groupAllowFrom, roomsConfig };
}
export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promise<void> {
if (isBunRuntime()) {
throw new Error("Matrix provider requires Node (bun runtime not supported)");
@@ -65,154 +253,15 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
logger.debug?.(message);
};
const normalizeUserEntry = (raw: string) =>
raw
.replace(/^matrix:/i, "")
.replace(/^user:/i, "")
.trim();
const normalizeRoomEntry = (raw: string) =>
raw
.replace(/^matrix:/i, "")
.replace(/^(room|channel):/i, "")
.trim();
const isMatrixUserId = (value: string) => value.startsWith("@") && value.includes(":");
const resolveUserAllowlist = async (
label: string,
list?: Array<string | number>,
): Promise<string[]> => {
let allowList = list ?? [];
if (allowList.length === 0) {
return allowList.map(String);
}
const entries = allowList
.map((entry) => normalizeUserEntry(String(entry)))
.filter((entry) => entry && entry !== "*");
if (entries.length === 0) {
return allowList.map(String);
}
const mapping: string[] = [];
const unresolved: string[] = [];
const additions: string[] = [];
const pending: string[] = [];
for (const entry of entries) {
if (isMatrixUserId(entry)) {
additions.push(normalizeMatrixUserId(entry));
continue;
}
pending.push(entry);
}
if (pending.length > 0) {
const resolved = await resolveMatrixTargets({
cfg,
inputs: pending,
kind: "user",
runtime,
});
for (const entry of resolved) {
if (entry.resolved && entry.id) {
const normalizedId = normalizeMatrixUserId(entry.id);
additions.push(normalizedId);
mapping.push(`${entry.input}${normalizedId}`);
} else {
unresolved.push(entry.input);
}
}
}
allowList = mergeAllowlist({ existing: allowList, additions });
summarizeMapping(label, mapping, unresolved, runtime);
if (unresolved.length > 0) {
runtime.log?.(
`${label} entries must be full Matrix IDs (example: @user:server). Unresolved entries are ignored.`,
);
}
return allowList.map(String);
};
// Resolve account-specific config for multi-account support
const account = resolveMatrixAccount({ cfg, accountId: opts.accountId });
const accountConfig = account.config;
const allowlistOnly = accountConfig.allowlistOnly === true;
let allowFrom: string[] = (accountConfig.dm?.allowFrom ?? []).map(String);
let groupAllowFrom: string[] = (accountConfig.groupAllowFrom ?? []).map(String);
let roomsConfig = accountConfig.groups ?? accountConfig.rooms;
allowFrom = await resolveUserAllowlist("matrix dm allowlist", allowFrom);
groupAllowFrom = await resolveUserAllowlist("matrix group allowlist", groupAllowFrom);
if (roomsConfig && Object.keys(roomsConfig).length > 0) {
const mapping: string[] = [];
const unresolved: string[] = [];
const nextRooms: Record<string, (typeof roomsConfig)[string]> = {};
if (roomsConfig["*"]) {
nextRooms["*"] = roomsConfig["*"];
}
const pending: Array<{ input: string; query: string; config: (typeof roomsConfig)[string] }> =
[];
for (const [entry, roomConfig] of Object.entries(roomsConfig)) {
if (entry === "*") {
continue;
}
const trimmed = entry.trim();
if (!trimmed) {
continue;
}
const cleaned = normalizeRoomEntry(trimmed);
if (isConfiguredMatrixRoomEntry(cleaned)) {
if (!nextRooms[cleaned]) {
nextRooms[cleaned] = roomConfig;
}
if (cleaned !== entry) {
mapping.push(`${entry}${cleaned}`);
}
continue;
}
pending.push({ input: entry, query: trimmed, config: roomConfig });
}
if (pending.length > 0) {
const resolved = await resolveMatrixTargets({
cfg,
inputs: pending.map((entry) => entry.query),
kind: "group",
runtime,
});
resolved.forEach((entry, index) => {
const source = pending[index];
if (!source) {
return;
}
if (entry.resolved && entry.id) {
if (!nextRooms[entry.id]) {
nextRooms[entry.id] = source.config;
}
mapping.push(`${source.input}${entry.id}`);
} else {
unresolved.push(source.input);
}
});
}
roomsConfig = nextRooms;
summarizeMapping("matrix rooms", mapping, unresolved, runtime);
if (unresolved.length > 0) {
runtime.log?.(
"matrix rooms must be room IDs or aliases (example: !room:server or #alias:server). Unresolved entries are ignored.",
);
}
}
if (roomsConfig && Object.keys(roomsConfig).length > 0) {
const nextRooms = { ...roomsConfig };
for (const [roomKey, roomConfig] of Object.entries(roomsConfig)) {
const users = roomConfig?.users ?? [];
if (users.length === 0) {
continue;
}
const resolvedUsers = await resolveUserAllowlist(`matrix room users (${roomKey})`, users);
if (resolvedUsers !== users) {
nextRooms[roomKey] = { ...roomConfig, users: resolvedUsers };
}
}
roomsConfig = nextRooms;
}
const { allowFrom, groupAllowFrom, roomsConfig } = await resolveMatrixMonitorConfig({
cfg,
runtime,
accountConfig,
});
cfg = {
...cfg,
@@ -274,7 +323,10 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
const mediaMaxBytes = Math.max(1, mediaMaxMb) * 1024 * 1024;
const startupMs = Date.now();
const startupGraceMs = DEFAULT_STARTUP_GRACE_MS;
const directTracker = createDirectRoomTracker(client, { log: logVerboseMessage });
const directTracker = createDirectRoomTracker(client, {
log: logVerboseMessage,
includeMemberCountInLogs: core.logging.shouldLogVerbose(),
});
registerMatrixAutoJoin({ client, cfg, runtime });
const warnedEncryptedRooms = new Set<string>();
const warnedCryptoMissingRooms = new Set<string>();