refactor: adopt chat plugin builder in matrix

This commit is contained in:
Peter Steinberger
2026-03-22 22:21:38 +00:00
parent 5a8f77aa6a
commit 7709aa33d8

View File

@@ -13,6 +13,7 @@ import {
projectWarningCollector,
} from "openclaw/plugin-sdk/channel-policy";
import { createScopedAccountReplyToModeResolver } from "openclaw/plugin-sdk/conversation-runtime";
import { createChatChannelPlugin } from "openclaw/plugin-sdk/core";
import {
createChannelDirectoryAdapter,
createRuntimeDirectoryLiveAdapter,
@@ -35,6 +36,7 @@ import {
type ResolvedMatrixAccount,
} from "./matrix/accounts.js";
import { normalizeMatrixAllowList, normalizeMatrixUserId } from "./matrix/monitor/allowlist.js";
import type { MatrixProbe } from "./matrix/probe.js";
import {
normalizeMatrixMessagingTarget,
resolveMatrixDirectUserId,
@@ -192,286 +194,293 @@ function matchMatrixAcpConversation(params: {
return null;
}
export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount> = {
id: "matrix",
meta,
pairing: createTextPairingAdapter({
idLabel: "matrixUserId",
message: PAIRING_APPROVED_MESSAGE,
normalizeAllowEntry: createPairingPrefixStripper(/^matrix:/i),
notify: async ({ id, message, accountId }) => {
const { sendMessageMatrix } = await loadMatrixChannelRuntime();
await sendMessageMatrix(`user:${id}`, message, {
...(accountId ? { accountId } : {}),
});
},
}),
capabilities: {
chatTypes: ["direct", "group", "thread"],
polls: true,
reactions: true,
threads: true,
media: true,
},
reload: { configPrefixes: ["channels.matrix"] },
configSchema: buildChannelConfigSchema(MatrixConfigSchema),
config: {
...matrixConfigAdapter,
isConfigured: (account) => account.configured,
describeAccount: (account) =>
describeAccountSnapshot({
account,
configured: account.configured,
extra: {
baseUrl: account.homeserver,
export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount, MatrixProbe> =
createChatChannelPlugin({
base: {
id: "matrix",
meta,
capabilities: {
chatTypes: ["direct", "group", "thread"],
polls: true,
reactions: true,
threads: true,
media: true,
},
reload: { configPrefixes: ["channels.matrix"] },
configSchema: buildChannelConfigSchema(MatrixConfigSchema),
config: {
...matrixConfigAdapter,
isConfigured: (account) => account.configured,
describeAccount: (account) =>
describeAccountSnapshot({
account,
configured: account.configured,
extra: {
baseUrl: account.homeserver,
},
}),
},
groups: {
resolveRequireMention: resolveMatrixGroupRequireMention,
resolveToolPolicy: resolveMatrixGroupToolPolicy,
},
messaging: {
normalizeTarget: normalizeMatrixMessagingTarget,
resolveOutboundSessionRoute: (params) => resolveMatrixOutboundSessionRoute(params),
targetResolver: {
looksLikeId: (raw) => {
const trimmed = raw.trim();
if (!trimmed) {
return false;
}
if (/^(matrix:)?[!#@]/i.test(trimmed)) {
return true;
}
return trimmed.includes(":");
},
hint: "<room|alias|user>",
},
}),
},
security: {
resolveDmPolicy: resolveMatrixDmPolicy,
collectWarnings: projectWarningCollector(
({ account, cfg }: { account: ResolvedMatrixAccount; cfg: unknown }) => ({
account,
cfg: cfg as CoreConfig,
}),
collectMatrixSecurityWarningsForAccount,
),
},
groups: {
resolveRequireMention: resolveMatrixGroupRequireMention,
resolveToolPolicy: resolveMatrixGroupToolPolicy,
},
threading: {
resolveReplyToMode: createScopedAccountReplyToModeResolver<
ReturnType<typeof resolveMatrixAccountConfig>
>({
resolveAccount: adaptScopedAccountAccessor(resolveMatrixAccountConfig),
resolveReplyToMode: (account) => account.replyToMode,
}),
buildToolContext: ({ context, hasRepliedRef }) => {
const currentTarget = context.To;
return {
currentChannelId: currentTarget?.trim() || undefined,
currentThreadTs:
context.MessageThreadId != null ? String(context.MessageThreadId) : undefined,
currentDirectUserId: resolveMatrixDirectUserId({
from: context.From,
to: context.To,
chatType: context.ChatType,
},
directory: createChannelDirectoryAdapter({
listPeers: async (params) => {
const entries = listResolvedDirectoryEntriesFromSources<ResolvedMatrixAccount>({
...params,
kind: "user",
resolveAccount: adaptScopedAccountAccessor(resolveMatrixAccount),
resolveSources: (account) => [
account.config.dm?.allowFrom ?? [],
account.config.groupAllowFrom ?? [],
...Object.values(account.config.groups ?? account.config.rooms ?? {}).map(
(room) => room.users ?? [],
),
],
normalizeId: (entry) => {
const raw = entry.replace(/^matrix:/i, "").trim();
if (!raw || raw === "*") {
return null;
}
const lowered = raw.toLowerCase();
const cleaned = lowered.startsWith("user:") ? raw.slice("user:".length).trim() : raw;
return cleaned.startsWith("@") ? `user:${cleaned}` : cleaned;
},
});
return entries.map((entry) => {
const raw = entry.id.startsWith("user:") ? entry.id.slice("user:".length) : entry.id;
const incomplete = !raw.startsWith("@") || !raw.includes(":");
return incomplete ? { ...entry, name: "incomplete id; expected @user:server" } : entry;
});
},
listGroups: async (params) =>
listResolvedDirectoryEntriesFromSources<ResolvedMatrixAccount>({
...params,
kind: "group",
resolveAccount: adaptScopedAccountAccessor(resolveMatrixAccount),
resolveSources: (account) => [
Object.keys(account.config.groups ?? account.config.rooms ?? {}),
],
normalizeId: (entry) => {
const raw = entry.replace(/^matrix:/i, "").trim();
if (!raw || raw === "*") {
return null;
}
const lowered = raw.toLowerCase();
if (lowered.startsWith("room:") || lowered.startsWith("channel:")) {
return raw;
}
return raw.startsWith("!") ? `room:${raw}` : raw;
},
}),
...createRuntimeDirectoryLiveAdapter({
getRuntime: loadMatrixChannelRuntime,
listPeersLive: (runtime) => runtime.listMatrixDirectoryPeersLive,
listGroupsLive: (runtime) => runtime.listMatrixDirectoryGroupsLive,
}),
hasRepliedRef,
};
},
},
messaging: {
normalizeTarget: normalizeMatrixMessagingTarget,
resolveOutboundSessionRoute: (params) => resolveMatrixOutboundSessionRoute(params),
targetResolver: {
looksLikeId: (raw) => {
const trimmed = raw.trim();
if (!trimmed) {
return false;
}
if (/^(matrix:)?[!#@]/i.test(trimmed)) {
return true;
}
return trimmed.includes(":");
}),
resolver: {
resolveTargets: async ({ cfg, accountId, inputs, kind, runtime }) =>
(await loadMatrixChannelRuntime()).resolveMatrixTargets({
cfg,
accountId,
inputs,
kind,
runtime,
}),
},
hint: "<room|alias|user>",
},
},
directory: createChannelDirectoryAdapter({
listPeers: async (params) => {
const entries = listResolvedDirectoryEntriesFromSources<ResolvedMatrixAccount>({
...params,
kind: "user",
resolveAccount: adaptScopedAccountAccessor(resolveMatrixAccount),
resolveSources: (account) => [
account.config.dm?.allowFrom ?? [],
account.config.groupAllowFrom ?? [],
...Object.values(account.config.groups ?? account.config.rooms ?? {}).map(
(room) => room.users ?? [],
actions: matrixMessageActions,
setup: matrixSetupAdapter,
bindings: {
compileConfiguredBinding: ({ conversationId }) =>
normalizeMatrixAcpConversationId(conversationId),
matchInboundConversation: ({ compiledBinding, conversationId, parentConversationId }) =>
matchMatrixAcpConversation({
bindingConversationId: compiledBinding.conversationId,
conversationId,
parentConversationId,
}),
},
status: {
defaultRuntime: {
accountId: DEFAULT_ACCOUNT_ID,
running: false,
lastStartAt: null,
lastStopAt: null,
lastError: null,
},
collectStatusIssues: (accounts) => collectStatusIssuesFromLastError("matrix", accounts),
buildChannelSummary: ({ snapshot }) =>
buildProbeChannelStatusSummary(snapshot, { baseUrl: snapshot.baseUrl ?? null }),
probeAccount: async ({ account, timeoutMs, cfg }) => {
try {
const { probeMatrix, resolveMatrixAuth } = await loadMatrixChannelRuntime();
const auth = await resolveMatrixAuth({
cfg: cfg as CoreConfig,
accountId: account.accountId,
});
return await probeMatrix({
homeserver: auth.homeserver,
accessToken: auth.accessToken,
userId: auth.userId,
timeoutMs,
accountId: account.accountId,
allowPrivateNetwork: auth.allowPrivateNetwork,
ssrfPolicy: auth.ssrfPolicy,
});
} catch (err) {
return {
ok: false,
error: err instanceof Error ? err.message : String(err),
elapsedMs: 0,
};
}
},
buildAccountSnapshot: ({ account, runtime, probe }) =>
buildComputedAccountStatusSnapshot(
{
accountId: account.accountId,
name: account.name,
enabled: account.enabled,
configured: account.configured,
runtime,
probe,
},
{
baseUrl: account.homeserver,
lastProbeAt: runtime?.lastProbeAt ?? null,
...buildTrafficStatusSummary(runtime),
},
),
],
normalizeId: (entry) => {
const raw = entry.replace(/^matrix:/i, "").trim();
if (!raw || raw === "*") {
return null;
},
gateway: {
startAccount: async (ctx) => {
const account = ctx.account;
ctx.setStatus({
accountId: account.accountId,
baseUrl: account.homeserver,
});
ctx.log?.info(
`[${account.accountId}] starting provider (${account.homeserver ?? "matrix"})`,
);
// Serialize startup: wait for any previous startup to complete import phase.
// This works around a race condition with concurrent dynamic imports.
//
// INVARIANT: The import() below cannot hang because:
// 1. It only loads local ESM modules with no circular awaits
// 2. Module initialization is synchronous (no top-level await in ./matrix/monitor/index.js)
// 3. The lock only serializes the import phase, not the provider startup
const previousLock = matrixStartupLock;
let releaseLock: () => void = () => {};
matrixStartupLock = new Promise<void>((resolve) => {
releaseLock = resolve;
});
await previousLock;
// Lazy import: the monitor pulls the reply pipeline; avoid ESM init cycles.
// Wrap in try/finally to ensure lock is released even if import fails.
let monitorMatrixProvider: typeof import("./matrix/monitor/index.js").monitorMatrixProvider;
try {
const module = await import("./matrix/monitor/index.js");
monitorMatrixProvider = module.monitorMatrixProvider;
} finally {
// Release lock after import completes or fails
releaseLock();
}
const lowered = raw.toLowerCase();
const cleaned = lowered.startsWith("user:") ? raw.slice("user:".length).trim() : raw;
return cleaned.startsWith("@") ? `user:${cleaned}` : cleaned;
return monitorMatrixProvider({
runtime: ctx.runtime,
abortSignal: ctx.abortSignal,
mediaMaxMb: account.config.mediaMaxMb,
initialSyncLimit: account.config.initialSyncLimit,
replyToMode: account.config.replyToMode,
accountId: account.accountId,
});
},
});
return entries.map((entry) => {
const raw = entry.id.startsWith("user:") ? entry.id.slice("user:".length) : entry.id;
const incomplete = !raw.startsWith("@") || !raw.includes(":");
return incomplete ? { ...entry, name: "incomplete id; expected @user:server" } : entry;
});
},
},
listGroups: async (params) =>
listResolvedDirectoryEntriesFromSources<ResolvedMatrixAccount>({
...params,
kind: "group",
resolveAccount: adaptScopedAccountAccessor(resolveMatrixAccount),
resolveSources: (account) => [
Object.keys(account.config.groups ?? account.config.rooms ?? {}),
],
normalizeId: (entry) => {
const raw = entry.replace(/^matrix:/i, "").trim();
if (!raw || raw === "*") {
return null;
}
const lowered = raw.toLowerCase();
if (lowered.startsWith("room:") || lowered.startsWith("channel:")) {
return raw;
}
return raw.startsWith("!") ? `room:${raw}` : raw;
},
}),
...createRuntimeDirectoryLiveAdapter({
getRuntime: loadMatrixChannelRuntime,
listPeersLive: (runtime) => runtime.listMatrixDirectoryPeersLive,
listGroupsLive: (runtime) => runtime.listMatrixDirectoryGroupsLive,
}),
}),
resolver: {
resolveTargets: async ({ cfg, accountId, inputs, kind, runtime }) =>
(await loadMatrixChannelRuntime()).resolveMatrixTargets({
cfg,
accountId,
inputs,
kind,
runtime,
}),
},
actions: matrixMessageActions,
setup: matrixSetupAdapter,
outbound: {
deliveryMode: "direct",
chunker: (text, limit) => getMatrixRuntime().channel.text.chunkMarkdownText!(text, limit),
chunkerMode: "markdown",
textChunkLimit: 4000,
...createRuntimeOutboundDelegates({
getRuntime: loadMatrixChannelRuntime,
sendText: {
resolve: (runtime) => runtime.matrixOutbound.sendText,
unavailableMessage: "Matrix outbound text delivery is unavailable",
},
sendMedia: {
resolve: (runtime) => runtime.matrixOutbound.sendMedia,
unavailableMessage: "Matrix outbound media delivery is unavailable",
},
sendPoll: {
resolve: (runtime) => runtime.matrixOutbound.sendPoll,
unavailableMessage: "Matrix outbound poll delivery is unavailable",
},
}),
},
bindings: {
compileConfiguredBinding: ({ conversationId }) =>
normalizeMatrixAcpConversationId(conversationId),
matchInboundConversation: ({ compiledBinding, conversationId, parentConversationId }) =>
matchMatrixAcpConversation({
bindingConversationId: compiledBinding.conversationId,
conversationId,
parentConversationId,
}),
},
status: {
defaultRuntime: {
accountId: DEFAULT_ACCOUNT_ID,
running: false,
lastStartAt: null,
lastStopAt: null,
lastError: null,
},
collectStatusIssues: (accounts) => collectStatusIssuesFromLastError("matrix", accounts),
buildChannelSummary: ({ snapshot }) =>
buildProbeChannelStatusSummary(snapshot, { baseUrl: snapshot.baseUrl ?? null }),
probeAccount: async ({ account, timeoutMs, cfg }) => {
try {
const { probeMatrix, resolveMatrixAuth } = await loadMatrixChannelRuntime();
const auth = await resolveMatrixAuth({
security: {
resolveDmPolicy: resolveMatrixDmPolicy,
collectWarnings: projectWarningCollector(
({ account, cfg }: { account: ResolvedMatrixAccount; cfg: unknown }) => ({
account,
cfg: cfg as CoreConfig,
accountId: account.accountId,
});
return await probeMatrix({
homeserver: auth.homeserver,
accessToken: auth.accessToken,
userId: auth.userId,
timeoutMs,
accountId: account.accountId,
allowPrivateNetwork: auth.allowPrivateNetwork,
ssrfPolicy: auth.ssrfPolicy,
});
} catch (err) {
return {
ok: false,
error: err instanceof Error ? err.message : String(err),
elapsedMs: 0,
};
}
},
buildAccountSnapshot: ({ account, runtime, probe }) =>
buildComputedAccountStatusSnapshot(
{
accountId: account.accountId,
name: account.name,
enabled: account.enabled,
configured: account.configured,
runtime,
probe,
},
{
baseUrl: account.homeserver,
lastProbeAt: runtime?.lastProbeAt ?? null,
...buildTrafficStatusSummary(runtime),
},
}),
collectMatrixSecurityWarningsForAccount,
),
},
gateway: {
startAccount: async (ctx) => {
const account = ctx.account;
ctx.setStatus({
accountId: account.accountId,
baseUrl: account.homeserver,
});
ctx.log?.info(`[${account.accountId}] starting provider (${account.homeserver ?? "matrix"})`);
// Serialize startup: wait for any previous startup to complete import phase.
// This works around a race condition with concurrent dynamic imports.
//
// INVARIANT: The import() below cannot hang because:
// 1. It only loads local ESM modules with no circular awaits
// 2. Module initialization is synchronous (no top-level await in ./matrix/monitor/index.js)
// 3. The lock only serializes the import phase, not the provider startup
const previousLock = matrixStartupLock;
let releaseLock: () => void = () => {};
matrixStartupLock = new Promise<void>((resolve) => {
releaseLock = resolve;
});
await previousLock;
// Lazy import: the monitor pulls the reply pipeline; avoid ESM init cycles.
// Wrap in try/finally to ensure lock is released even if import fails.
let monitorMatrixProvider: typeof import("./matrix/monitor/index.js").monitorMatrixProvider;
try {
const module = await import("./matrix/monitor/index.js");
monitorMatrixProvider = module.monitorMatrixProvider;
} finally {
// Release lock after import completes or fails
releaseLock();
}
return monitorMatrixProvider({
runtime: ctx.runtime,
abortSignal: ctx.abortSignal,
mediaMaxMb: account.config.mediaMaxMb,
initialSyncLimit: account.config.initialSyncLimit,
replyToMode: account.config.replyToMode,
accountId: account.accountId,
});
},
},
};
pairing: {
text: {
idLabel: "matrixUserId",
message: PAIRING_APPROVED_MESSAGE,
normalizeAllowEntry: createPairingPrefixStripper(/^matrix:/i),
notify: async ({ id, message, accountId }) => {
const { sendMessageMatrix } = await loadMatrixChannelRuntime();
await sendMessageMatrix(`user:${id}`, message, {
...(accountId ? { accountId } : {}),
});
},
},
},
threading: {
resolveReplyToMode: createScopedAccountReplyToModeResolver<
ReturnType<typeof resolveMatrixAccountConfig>
>({
resolveAccount: adaptScopedAccountAccessor(resolveMatrixAccountConfig),
resolveReplyToMode: (account) => account.replyToMode,
}),
buildToolContext: ({ context, hasRepliedRef }) => {
const currentTarget = context.To;
return {
currentChannelId: currentTarget?.trim() || undefined,
currentThreadTs:
context.MessageThreadId != null ? String(context.MessageThreadId) : undefined,
currentDirectUserId: resolveMatrixDirectUserId({
from: context.From,
to: context.To,
chatType: context.ChatType,
}),
hasRepliedRef,
};
},
},
outbound: {
deliveryMode: "direct",
chunker: (text, limit) => getMatrixRuntime().channel.text.chunkMarkdownText!(text, limit),
chunkerMode: "markdown",
textChunkLimit: 4000,
...createRuntimeOutboundDelegates({
getRuntime: loadMatrixChannelRuntime,
sendText: {
resolve: (runtime) => runtime.matrixOutbound.sendText,
unavailableMessage: "Matrix outbound text delivery is unavailable",
},
sendMedia: {
resolve: (runtime) => runtime.matrixOutbound.sendMedia,
unavailableMessage: "Matrix outbound media delivery is unavailable",
},
sendPoll: {
resolve: (runtime) => runtime.matrixOutbound.sendPoll,
unavailableMessage: "Matrix outbound poll delivery is unavailable",
},
}),
},
});