fix(matrix): align DM room session routing (#61373)

Merged via squash.

Prepared head SHA: 9529d2e161
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
Gustavo Madeira Santana
2026-04-05 15:15:46 -04:00
committed by GitHub
parent 55192e2d51
commit dcd0cf9f98
25 changed files with 1501 additions and 50 deletions

View File

@@ -200,6 +200,7 @@ Docs: https://docs.openclaw.ai
- Agents/video generation: accept `agents.defaults.videoGenerationModel` in strict config validation and `openclaw config set/get`, so gateways using `video_generate` no longer fail to boot after enabling a video model.
- Discord/image generation: persist volatile workspace-generated media into durable outbound media before final reply delivery so generated image replies stop failing with missing local workspace paths.
- Matrix: move legacy top-level `avatarUrl` into the default account during multi-account promotion and keep env-backed account setup avatar config persisted. (#61437) Thanks @gumadeiras.
- Matrix/DM sessions: add `channels.matrix.dm.sessionScope`, shared-session collision notices, and aligned outbound session reuse so separate Matrix DM rooms can keep distinct context when configured. (#61373) Thanks @gumadeiras.
## 2026.4.2

View File

@@ -152,6 +152,7 @@ This is a practical baseline config with DM pairing, room allowlist, and E2EE en
dm: {
policy: "pairing",
sessionScope: "per-room",
threadReplies: "off",
},
@@ -522,12 +523,17 @@ The repair flow does not delete old rooms automatically. It only picks the healt
Matrix supports native Matrix threads for both automatic replies and message-tool sends.
- `dm.sessionScope: "per-user"` (default) keeps Matrix DM routing sender-scoped, so multiple DM rooms can share one session when they resolve to the same peer.
- `dm.sessionScope: "per-room"` isolates each Matrix DM room into its own session key while still using normal DM auth and allowlist checks.
- Explicit Matrix conversation bindings still win over `dm.sessionScope`, so bound rooms and threads keep their chosen target session.
- `threadReplies: "off"` keeps replies top-level and keeps inbound threaded messages on the parent session.
- `threadReplies: "inbound"` replies inside a thread only when the inbound message was already in that thread.
- `threadReplies: "always"` keeps room replies in a thread rooted at the triggering message and routes that conversation through the matching thread-scoped session from the first triggering message.
- `dm.threadReplies` overrides the top-level setting for DMs only. For example, you can keep room threads isolated while keeping DMs flat.
- Inbound threaded messages include the thread root message as extra agent context.
- Message-tool sends now auto-inherit the current Matrix thread when the target is the same room, or the same DM user target, unless an explicit `threadId` is provided.
- Same-session DM user-target reuse only kicks in when the current session metadata proves the same DM peer on the same Matrix account; otherwise OpenClaw falls back to normal user-scoped routing.
- When OpenClaw sees a Matrix DM room collide with another DM room on the same shared Matrix DM session, it posts a one-time `m.notice` in that room with the `/focus` escape hatch when thread bindings are enabled and the `dm.sessionScope` hint.
- Runtime thread bindings are supported for Matrix. `/focus`, `/unfocus`, `/agents`, `/session idle`, `/session max-age`, and thread-bound `/acp spawn` now work in Matrix rooms and DMs.
- Top-level Matrix room/DM `/focus` creates a new Matrix thread and binds it to the target session when `threadBindings.spawnSubagentSessions=true`.
- Running `/focus` or `/acp spawn --thread here` inside an existing Matrix thread binds that current thread instead.
@@ -842,8 +848,9 @@ Live directory lookup uses the logged-in Matrix account:
- `mediaMaxMb`: media size cap in MB for Matrix media handling. It applies to outbound sends and inbound media processing.
- `autoJoin`: invite auto-join policy (`always`, `allowlist`, `off`). Default: `off`.
- `autoJoinAllowlist`: rooms/aliases allowed when `autoJoin` is `allowlist`. Alias entries are resolved to room IDs during invite handling; OpenClaw does not trust alias state claimed by the invited room.
- `dm`: DM policy block (`enabled`, `policy`, `allowFrom`, `threadReplies`).
- `dm`: DM policy block (`enabled`, `policy`, `allowFrom`, `sessionScope`, `threadReplies`).
- `dm.allowFrom` entries should be full Matrix user IDs unless you already resolved them through live directory lookup.
- `dm.sessionScope`: `per-user` (default) or `per-room`. Use `per-room` when you want each Matrix DM room to keep separate context even if the peer is the same.
- `dm.threadReplies`: DM-only thread policy override (`off`, `inbound`, `always`). It overrides the top-level `threadReplies` setting for both reply placement and session isolation in DMs.
- `execApprovals`: Matrix-native exec approval delivery (`enabled`, `approvers`, `target`, `agentFilter`, `sessionFilter`).
- `execApprovals.approvers`: Matrix user IDs allowed to approve exec requests. Optional when `dm.allowFrom` already identifies the approvers.

View File

@@ -655,6 +655,7 @@ Matrix is extension-backed and configured under `channels.matrix`.
- `sessionFilter`: optional session key patterns (substring or regex).
- `target`: where to send approval prompts. `"dm"` (default), `"channel"` (originating room), or `"both"`.
- Per-account overrides: `channels.matrix.accounts.<id>.execApprovals`.
- `channels.matrix.dm.sessionScope` controls how Matrix DMs group into sessions: `per-user` (default) shares by routed peer, while `per-room` isolates each DM room.
- Matrix status probes and live directory lookups use the same proxy policy as runtime traffic.
- Full Matrix configuration, targeting rules, and setup examples are documented in [Matrix](/channels/matrix).

View File

@@ -31,6 +31,18 @@ describe("MatrixConfigSchema SecretInput", () => {
expect(result.success).toBe(true);
});
it("accepts dm sessionScope overrides", () => {
const result = MatrixConfigSchema.safeParse({
homeserver: "https://matrix.example.org",
accessToken: "token",
dm: {
policy: "pairing",
sessionScope: "per-room",
},
});
expect(result.success).toBe(true);
});
it("accepts room-level account assignments", () => {
const result = MatrixConfigSchema.safeParse({
homeserver: "https://matrix.example.org",

View File

@@ -104,6 +104,7 @@ export const MatrixConfigSchema = z.object({
autoJoinAllowlist: AllowFromListSchema,
groupAllowFrom: AllowFromListSchema,
dm: buildNestedDmConfigSchema({
sessionScope: z.enum(["per-user", "per-room"]).optional(),
threadReplies: z.enum(["off", "inbound", "always"]).optional(),
}),
execApprovals: matrixExecApprovalsSchema,

View File

@@ -31,6 +31,7 @@ type MatrixHandlerTestHarnessOptions = {
replyToMode?: ReplyToMode;
threadReplies?: "off" | "inbound" | "always";
dmThreadReplies?: "off" | "inbound" | "always";
dmSessionScope?: "per-user" | "per-room";
streaming?: "partial" | "off";
blockStreamingEnabled?: boolean;
dmEnabled?: boolean;
@@ -214,6 +215,7 @@ export function createMatrixHandlerTestHarness(
replyToMode: options.replyToMode ?? "off",
threadReplies: options.threadReplies ?? "inbound",
dmThreadReplies: options.dmThreadReplies,
dmSessionScope: options.dmSessionScope,
streaming: options.streaming ?? "off",
blockStreamingEnabled: options.blockStreamingEnabled ?? false,
dmEnabled: options.dmEnabled ?? true,

View File

@@ -1,3 +1,7 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { recordSessionMetaFromInbound } from "openclaw/plugin-sdk/config-runtime";
import {
__testing as sessionBindingTesting,
registerSessionBindingAdapter,
@@ -682,6 +686,423 @@ describe("matrix monitor handler pairing account scope", () => {
);
});
it("posts a one-time notice when another Matrix DM room already owns the shared DM session", async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-shared-notice-"));
const storePath = path.join(tempDir, "sessions.json");
const sendNotice = vi.fn(async () => "$notice");
try {
await recordSessionMetaFromInbound({
storePath,
sessionKey: "agent:ops:main",
ctx: {
SessionKey: "agent:ops:main",
AccountId: "ops",
ChatType: "direct",
Provider: "matrix",
Surface: "matrix",
From: "matrix:@user:example.org",
To: "room:!other:example.org",
NativeChannelId: "!other:example.org",
OriginatingChannel: "matrix",
OriginatingTo: "room:!other:example.org",
},
});
const { handler } = createMatrixHandlerTestHarness({
isDirectMessage: true,
resolveStorePath: () => storePath,
client: {
sendMessage: sendNotice,
},
});
await handler(
"!dm:example.org",
createMatrixTextMessageEvent({
eventId: "$dm1",
body: "follow up",
}),
);
expect(sendNotice).toHaveBeenCalledWith(
"!dm:example.org",
expect.objectContaining({
msgtype: "m.notice",
body: expect.stringContaining("channels.matrix.dm.sessionScope"),
}),
);
await handler(
"!dm:example.org",
createMatrixTextMessageEvent({
eventId: "$dm2",
body: "again",
}),
);
expect(sendNotice).toHaveBeenCalledTimes(1);
} finally {
fs.rmSync(tempDir, { recursive: true, force: true });
}
});
it("checks flat DM collision notices against the current DM session key", async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-flat-notice-"));
const storePath = path.join(tempDir, "sessions.json");
const sendNotice = vi.fn(async () => "$notice");
try {
await recordSessionMetaFromInbound({
storePath,
sessionKey: "agent:ops:matrix:direct:@user:example.org",
ctx: {
SessionKey: "agent:ops:matrix:direct:@user:example.org",
AccountId: "ops",
ChatType: "direct",
Provider: "matrix",
Surface: "matrix",
From: "matrix:@user:example.org",
To: "room:!other:example.org",
NativeChannelId: "!other:example.org",
OriginatingChannel: "matrix",
OriginatingTo: "room:!other:example.org",
},
});
const { handler } = createMatrixHandlerTestHarness({
isDirectMessage: true,
resolveStorePath: () => storePath,
resolveAgentRoute: () => ({
agentId: "ops",
channel: "matrix",
accountId: "ops",
sessionKey: "agent:ops:matrix:direct:@user:example.org",
mainSessionKey: "agent:ops:main",
matchedBy: "binding.account" as const,
}),
client: {
sendMessage: sendNotice,
},
});
await handler(
"!dm:example.org",
createMatrixTextMessageEvent({
eventId: "$dm-flat-1",
body: "follow up",
}),
);
expect(sendNotice).toHaveBeenCalledWith(
"!dm:example.org",
expect.objectContaining({
msgtype: "m.notice",
body: expect.stringContaining("channels.matrix.dm.sessionScope"),
}),
);
} finally {
fs.rmSync(tempDir, { recursive: true, force: true });
}
});
it("checks threaded DM collision notices against the parent DM session", async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-thread-notice-"));
const storePath = path.join(tempDir, "sessions.json");
const sendNotice = vi.fn(async () => "$notice");
try {
await recordSessionMetaFromInbound({
storePath,
sessionKey: "agent:ops:main",
ctx: {
SessionKey: "agent:ops:main",
AccountId: "ops",
ChatType: "direct",
Provider: "matrix",
Surface: "matrix",
From: "matrix:@user:example.org",
To: "room:!other:example.org",
NativeChannelId: "!other:example.org",
OriginatingChannel: "matrix",
OriginatingTo: "room:!other:example.org",
},
});
const { handler } = createMatrixHandlerTestHarness({
isDirectMessage: true,
threadReplies: "always",
resolveStorePath: () => storePath,
client: {
sendMessage: sendNotice,
getEvent: async (_roomId, eventId) =>
eventId === "$root"
? createMatrixTextMessageEvent({
eventId: "$root",
sender: "@alice:example.org",
body: "Root topic",
})
: ({ sender: "@bot:example.org" } as never),
},
getMemberDisplayName: async (_roomId, userId) =>
userId === "@alice:example.org" ? "Alice" : "sender",
});
await handler(
"!dm:example.org",
createMatrixTextMessageEvent({
eventId: "$reply1",
body: "follow up",
relatesTo: {
rel_type: "m.thread",
event_id: "$root",
"m.in_reply_to": { event_id: "$root" },
},
}),
);
expect(sendNotice).toHaveBeenCalledWith(
"!dm:example.org",
expect.objectContaining({
msgtype: "m.notice",
body: expect.stringContaining("channels.matrix.dm.sessionScope"),
}),
);
} finally {
fs.rmSync(tempDir, { recursive: true, force: true });
}
});
it("keeps the shared-session notice after user-target outbound metadata overwrites latest room fields", async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-shared-notice-stable-"));
const storePath = path.join(tempDir, "sessions.json");
const sendNotice = vi.fn(async () => "$notice");
try {
await recordSessionMetaFromInbound({
storePath,
sessionKey: "agent:ops:main",
ctx: {
SessionKey: "agent:ops:main",
AccountId: "ops",
ChatType: "direct",
Provider: "matrix",
Surface: "matrix",
From: "matrix:@user:example.org",
To: "room:!other:example.org",
NativeChannelId: "!other:example.org",
OriginatingChannel: "matrix",
OriginatingTo: "room:!other:example.org",
},
});
await recordSessionMetaFromInbound({
storePath,
sessionKey: "agent:ops:main",
ctx: {
SessionKey: "agent:ops:main",
AccountId: "ops",
ChatType: "direct",
Provider: "matrix",
Surface: "matrix",
From: "matrix:@other:example.org",
To: "room:@other:example.org",
NativeDirectUserId: "@user:example.org",
OriginatingChannel: "matrix",
OriginatingTo: "room:@other:example.org",
},
});
const { handler } = createMatrixHandlerTestHarness({
isDirectMessage: true,
resolveStorePath: () => storePath,
client: {
sendMessage: sendNotice,
},
});
await handler(
"!dm:example.org",
createMatrixTextMessageEvent({
eventId: "$dm1",
body: "follow up",
}),
);
expect(sendNotice).toHaveBeenCalledWith(
"!dm:example.org",
expect.objectContaining({
msgtype: "m.notice",
body: expect.stringContaining("channels.matrix.dm.sessionScope"),
}),
);
} finally {
fs.rmSync(tempDir, { recursive: true, force: true });
}
});
it("skips the shared-session notice when the prior Matrix session metadata is not a DM", async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-shared-notice-room-"));
const storePath = path.join(tempDir, "sessions.json");
const sendNotice = vi.fn(async () => "$notice");
try {
await recordSessionMetaFromInbound({
storePath,
sessionKey: "agent:ops:main",
ctx: {
SessionKey: "agent:ops:main",
AccountId: "ops",
ChatType: "group",
Provider: "matrix",
Surface: "matrix",
From: "matrix:channel:!group:example.org",
To: "room:!group:example.org",
NativeChannelId: "!group:example.org",
OriginatingChannel: "matrix",
OriginatingTo: "room:!group:example.org",
},
});
const { handler } = createMatrixHandlerTestHarness({
isDirectMessage: true,
resolveStorePath: () => storePath,
client: {
sendMessage: sendNotice,
},
});
await handler(
"!dm:example.org",
createMatrixTextMessageEvent({
eventId: "$dm1",
body: "follow up",
}),
);
expect(sendNotice).not.toHaveBeenCalled();
} finally {
fs.rmSync(tempDir, { recursive: true, force: true });
}
});
it("skips the shared-session notice when Matrix DMs are isolated per room", async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-room-scope-"));
const storePath = path.join(tempDir, "sessions.json");
fs.writeFileSync(
storePath,
JSON.stringify({
"agent:ops:main": {
sessionId: "sess-main",
updatedAt: Date.now(),
deliveryContext: {
channel: "matrix",
to: "room:!other:example.org",
accountId: "ops",
},
},
}),
"utf8",
);
const sendNotice = vi.fn(async () => "$notice");
try {
const { handler, recordInboundSession } = createMatrixHandlerTestHarness({
isDirectMessage: true,
dmSessionScope: "per-room",
resolveStorePath: () => storePath,
client: {
sendMessage: sendNotice,
},
});
await handler(
"!dm:example.org",
createMatrixTextMessageEvent({
eventId: "$dm1",
body: "follow up",
}),
);
expect(sendNotice).not.toHaveBeenCalled();
expect(recordInboundSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey: "agent:ops:matrix:channel:!dm:example.org",
}),
);
} finally {
fs.rmSync(tempDir, { recursive: true, force: true });
}
});
it("skips the shared-session notice when a Matrix DM is explicitly bound", async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-dm-bound-notice-"));
const storePath = path.join(tempDir, "sessions.json");
fs.writeFileSync(
storePath,
JSON.stringify({
"agent:bound:session-1": {
sessionId: "sess-bound",
updatedAt: Date.now(),
deliveryContext: {
channel: "matrix",
to: "room:!other:example.org",
accountId: "ops",
},
},
}),
"utf8",
);
const sendNotice = vi.fn(async () => "$notice");
const touch = vi.fn();
registerSessionBindingAdapter({
channel: "matrix",
accountId: "ops",
listBySession: () => [],
resolveByConversation: (ref) =>
ref.conversationId === "!dm:example.org"
? {
bindingId: "ops:!dm:example.org",
targetSessionKey: "agent:bound:session-1",
targetKind: "session",
conversation: {
channel: "matrix",
accountId: "ops",
conversationId: "!dm:example.org",
},
status: "active",
boundAt: Date.now(),
metadata: {
boundBy: "user-1",
},
}
: null,
touch,
});
try {
const { handler } = createMatrixHandlerTestHarness({
isDirectMessage: true,
resolveStorePath: () => storePath,
client: {
sendMessage: sendNotice,
},
});
await handler(
"!dm:example.org",
createMatrixTextMessageEvent({
eventId: "$dm-bound-1",
body: "follow up",
}),
);
expect(sendNotice).not.toHaveBeenCalled();
expect(touch).toHaveBeenCalledOnce();
} finally {
fs.rmSync(tempDir, { recursive: true, force: true });
}
});
it("uses stable room ids instead of room-declared aliases in group context", async () => {
const { handler, finalizeInboundContext } = createMatrixHandlerTestHarness({
isDirectMessage: false,

View File

@@ -1,5 +1,9 @@
import { resolveControlCommandGate } from "openclaw/plugin-sdk/command-auth";
import { resolveChannelContextVisibilityMode } from "openclaw/plugin-sdk/config-runtime";
import {
loadSessionStore,
resolveChannelContextVisibilityMode,
resolveSessionStoreEntry,
} from "openclaw/plugin-sdk/config-runtime";
import { getSessionBindingService } from "openclaw/plugin-sdk/conversation-runtime";
import { evaluateSupplementalContextVisibility } from "openclaw/plugin-sdk/security-runtime";
import type { CoreConfig, MatrixRoomConfig, ReplyToMode } from "../../types.js";
@@ -27,6 +31,7 @@ import {
sendReadReceiptMatrix,
sendTypingMatrix,
} from "../send.js";
import { resolveMatrixStoredSessionMeta } from "../session-store-metadata.js";
import { resolveMatrixMonitorAccessState } from "./access-state.js";
import { resolveMatrixAckReactionConfig } from "./ack-config.js";
import { resolveMatrixAllowListMatch } from "./allowlist.js";
@@ -68,6 +73,7 @@ import { isMatrixVerificationRoomMessage } from "./verification-utils.js";
const ALLOW_FROM_STORE_CACHE_TTL_MS = 30_000;
const PAIRING_REPLY_COOLDOWN_MS = 5 * 60_000;
const MAX_TRACKED_PAIRING_REPLY_SENDERS = 512;
const MAX_TRACKED_SHARED_DM_CONTEXT_NOTICES = 512;
type MatrixAllowBotsMode = "off" | "mentions" | "all";
export type MatrixMonitorHandlerParams = {
@@ -88,6 +94,8 @@ export type MatrixMonitorHandlerParams = {
threadReplies: "off" | "inbound" | "always";
/** DM-specific threadReplies override. Falls back to threadReplies when absent. */
dmThreadReplies?: "off" | "inbound" | "always";
/** DM session grouping behavior. */
dmSessionScope?: "per-user" | "per-room";
streaming: "partial" | "off";
blockStreamingEnabled: boolean;
dmEnabled: boolean;
@@ -163,6 +171,73 @@ function resolveMatrixInboundBodyText(params: {
});
}
function markTrackedRoomIfFirst(set: Set<string>, roomId: string): boolean {
if (set.has(roomId)) {
return false;
}
set.add(roomId);
if (set.size > MAX_TRACKED_SHARED_DM_CONTEXT_NOTICES) {
const oldest = set.keys().next().value;
if (typeof oldest === "string") {
set.delete(oldest);
}
}
return true;
}
function resolveMatrixSharedDmContextNotice(params: {
storePath: string;
sessionKey: string;
roomId: string;
accountId: string;
dmSessionScope?: "per-user" | "per-room";
sentRooms: Set<string>;
logVerboseMessage: (message: string) => void;
}): string | null {
if ((params.dmSessionScope ?? "per-user") === "per-room") {
return null;
}
if (params.sentRooms.has(params.roomId)) {
return null;
}
try {
const store = loadSessionStore(params.storePath);
const currentSession = resolveMatrixStoredSessionMeta(
resolveSessionStoreEntry({
store,
sessionKey: params.sessionKey,
}).existing,
);
if (!currentSession) {
return null;
}
if (currentSession.channel && currentSession.channel !== "matrix") {
return null;
}
if (currentSession.accountId && currentSession.accountId !== params.accountId) {
return null;
}
if (!currentSession.directUserId) {
return null;
}
if (!currentSession.roomId || currentSession.roomId === params.roomId) {
return null;
}
return [
"This Matrix DM is sharing a session with another Matrix DM room.",
"Use /focus here for a one-off isolated thread session when thread bindings are enabled, or set",
"channels.matrix.dm.sessionScope to per-room to isolate each Matrix DM room.",
].join(" ");
} catch (err) {
params.logVerboseMessage(
`matrix: failed checking shared DM session notice room=${params.roomId} (${String(err)})`,
);
return null;
}
}
function resolveMatrixPendingHistoryText(params: {
mentionPrecheckText: string;
content: RoomMessageEventContent;
@@ -214,6 +289,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
replyToMode,
threadReplies,
dmThreadReplies,
dmSessionScope,
streaming,
blockStreamingEnabled,
dmEnabled,
@@ -252,6 +328,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
});
const roomHistoryTracker = createRoomHistoryTracker();
const roomIngressTails = new Map<string, Promise<void>>();
const sharedDmContextNoticeRooms = new Set<string>();
const readStoreAllowFrom = async (): Promise<string[]> => {
const now = Date.now();
@@ -672,10 +749,12 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
roomId,
senderId,
isDirectMessage,
dmSessionScope,
threadId: thread.threadId,
eventTs: eventTs ?? undefined,
resolveAgentRoute: core.channel.routing.resolveAgentRoute,
});
const hasExplicitSessionBinding = _configuredBinding !== null || _runtimeBindingId !== null;
const agentMentionRegexes = core.channel.mentions.buildMentionRegexes(cfg, _route.agentId);
const selfDisplayName = content.formatted_body
? await getMemberDisplayName(roomId, selfUserId).catch(() => undefined)
@@ -870,6 +949,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
return {
route: _route,
hasExplicitSessionBinding,
roomConfig,
isDirectMessage,
isRoom,
@@ -922,6 +1002,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
const {
route: _route,
hasExplicitSessionBinding,
roomConfig,
isDirectMessage,
isRoom,
@@ -1023,6 +1104,22 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
storePath,
sessionKey: _route.sessionKey,
});
const sharedDmNoticeSessionKey = threadTarget
? _route.mainSessionKey || _route.sessionKey
: _route.sessionKey;
const sharedDmContextNotice = isDirectMessage
? hasExplicitSessionBinding
? null
: resolveMatrixSharedDmContextNotice({
storePath,
sessionKey: sharedDmNoticeSessionKey,
roomId,
accountId: _route.accountId,
dmSessionScope,
sentRooms: sharedDmContextNoticeRooms,
logVerboseMessage,
})
: null;
const body = core.channel.reply.formatAgentEnvelope({
channel: "Matrix",
from: envelopeFrom,
@@ -1065,6 +1162,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
...locationPayload?.context,
CommandAuthorized: commandAuthorized,
CommandSource: "text" as const,
NativeChannelId: roomId,
NativeDirectUserId: isDirectMessage ? senderId : undefined,
OriginatingChannel: "matrix" as const,
OriginatingTo: `room:${roomId}`,
});
@@ -1090,6 +1189,19 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
},
});
if (sharedDmContextNotice && markTrackedRoomIfFirst(sharedDmContextNoticeRooms, roomId)) {
client
.sendMessage(roomId, {
msgtype: "m.notice",
body: sharedDmContextNotice,
})
.catch((err) => {
logVerboseMessage(
`matrix: failed sending shared DM session notice room=${roomId}: ${String(err)}`,
);
});
}
const preview = bodyText.slice(0, 200).replace(/\n/g, "\\n");
logVerboseMessage(`matrix inbound: room=${roomId} from=${senderId} preview="${preview}"`);

View File

@@ -207,6 +207,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
const dmEnabled = dmConfig?.enabled ?? true;
const dmPolicyRaw = dmConfig?.policy ?? "pairing";
const dmPolicy = allowlistOnly && dmPolicyRaw !== "disabled" ? "allowlist" : dmPolicyRaw;
const dmSessionScope = dmConfig?.sessionScope ?? "per-user";
const textLimit = core.channel.text.resolveTextChunkLimit(cfg, "matrix", effectiveAccountId);
const globalGroupChatHistoryLimit = (
cfg.messages as { groupChat?: { historyLimit?: number } } | undefined
@@ -271,6 +272,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
replyToMode,
threadReplies,
dmThreadReplies,
dmSessionScope,
streaming,
blockStreamingEnabled,
dmEnabled,

View File

@@ -169,6 +169,7 @@ export async function handleInboundMatrixReaction(params: {
roomId: params.roomId,
senderId: params.senderId,
isDirectMessage: params.isDirectMessage,
dmSessionScope: accountConfig.dm?.sessionScope ?? "per-user",
threadId: thread.threadId,
eventTs: params.event.origin_server_ts,
resolveAgentRoute: params.core.channel.routing.resolveAgentRoute,

View File

@@ -17,13 +17,19 @@ const baseCfg = {
},
} satisfies OpenClawConfig;
function resolveDmRoute(cfg: OpenClawConfig) {
function resolveDmRoute(
cfg: OpenClawConfig,
opts: {
dmSessionScope?: "per-user" | "per-room";
} = {},
) {
return resolveMatrixInboundRoute({
cfg,
accountId: "ops",
roomId: "!dm:example.org",
senderId: "@alice:example.org",
isDirectMessage: true,
dmSessionScope: opts.dmSessionScope,
resolveAgentRoute,
});
}
@@ -97,6 +103,33 @@ describe("resolveMatrixInboundRoute", () => {
expect(route.sessionKey).toBe("agent:room-agent:main");
});
it("can isolate Matrix DMs per room without changing agent selection", () => {
const cfg = {
...baseCfg,
bindings: [
{
agentId: "sender-agent",
match: {
channel: "matrix",
accountId: "ops",
peer: { kind: "direct", id: "@alice:example.org" },
},
},
],
} satisfies OpenClawConfig;
const { route, configuredBinding } = resolveDmRoute(cfg, {
dmSessionScope: "per-room",
});
expect(configuredBinding).toBeNull();
expect(route.agentId).toBe("sender-agent");
expect(route.matchedBy).toBe("binding.peer");
expect(route.sessionKey).toBe("agent:sender-agent:matrix:channel:!dm:example.org");
expect(route.mainSessionKey).toBe("agent:sender-agent:main");
expect(route.lastRoutePolicy).toBe("session");
});
it("lets configured ACP room bindings override DM parent-peer routing", () => {
const cfg = {
...baseCfg,
@@ -130,6 +163,42 @@ describe("resolveMatrixInboundRoute", () => {
expect(route.lastRoutePolicy).toBe("session");
});
it("keeps configured ACP room bindings ahead of per-room DM session scope", () => {
const cfg = {
...baseCfg,
bindings: [
{
agentId: "room-agent",
match: {
channel: "matrix",
accountId: "ops",
peer: { kind: "channel", id: "!dm:example.org" },
},
},
{
type: "acp",
agentId: "acp-agent",
match: {
channel: "matrix",
accountId: "ops",
peer: { kind: "channel", id: "!dm:example.org" },
},
},
],
} satisfies OpenClawConfig;
const { route, configuredBinding } = resolveDmRoute(cfg, {
dmSessionScope: "per-room",
});
expect(configuredBinding?.spec.agentId).toBe("acp-agent");
expect(route.agentId).toBe("acp-agent");
expect(route.matchedBy).toBe("binding.channel");
expect(route.sessionKey).toContain("agent:acp-agent:acp:binding:matrix:ops:");
expect(route.sessionKey).not.toBe("agent:acp-agent:matrix:channel:!dm:example.org");
expect(route.lastRoutePolicy).toBe("session");
});
it("lets runtime conversation bindings override both sender and room route matches", () => {
const touch = vi.fn();
registerSessionBindingAdapter({

View File

@@ -1,4 +1,4 @@
import { deriveLastRoutePolicy } from "openclaw/plugin-sdk/routing";
import { buildAgentSessionKey, deriveLastRoutePolicy } from "openclaw/plugin-sdk/routing";
import {
getSessionBindingService,
resolveAgentIdFromSessionKey,
@@ -10,12 +10,41 @@ import { resolveMatrixThreadSessionKeys } from "./threads.js";
type MatrixResolvedRoute = ReturnType<PluginRuntime["channel"]["routing"]["resolveAgentRoute"]>;
function resolveMatrixDmSessionKey(params: {
accountId: string;
agentId: string;
roomId: string;
dmSessionScope?: "per-user" | "per-room";
fallbackSessionKey: string;
}): string {
if (params.dmSessionScope !== "per-room") {
return params.fallbackSessionKey;
}
return buildAgentSessionKey({
agentId: params.agentId,
channel: "matrix",
accountId: params.accountId,
peer: {
kind: "channel",
id: params.roomId,
},
}).toLowerCase();
}
function shouldApplyMatrixPerRoomDmSessionScope(params: {
isDirectMessage: boolean;
configuredSessionKey?: string;
}): boolean {
return params.isDirectMessage && !params.configuredSessionKey;
}
export function resolveMatrixInboundRoute(params: {
cfg: CoreConfig;
accountId: string;
roomId: string;
senderId: string;
isDirectMessage: boolean;
dmSessionScope?: "per-user" | "per-room";
threadId?: string;
eventTs?: number;
resolveAgentRoute: PluginRuntime["channel"]["routing"]["resolveAgentRoute"];
@@ -98,21 +127,42 @@ export function resolveMatrixInboundRoute(params: {
}
: baseRoute;
const dmSessionKey = shouldApplyMatrixPerRoomDmSessionScope({
isDirectMessage: params.isDirectMessage,
configuredSessionKey,
})
? resolveMatrixDmSessionKey({
accountId: params.accountId,
agentId: effectiveRoute.agentId,
roomId: params.roomId,
dmSessionScope: params.dmSessionScope,
fallbackSessionKey: effectiveRoute.sessionKey,
})
: effectiveRoute.sessionKey;
const routeWithDmScope =
dmSessionKey === effectiveRoute.sessionKey
? effectiveRoute
: {
...effectiveRoute,
sessionKey: dmSessionKey,
lastRoutePolicy: "session" as const,
};
// When no binding overrides the session key, isolate threads into their own sessions.
if (!configuredBinding && !configuredSessionKey && params.threadId) {
const threadKeys = resolveMatrixThreadSessionKeys({
baseSessionKey: effectiveRoute.sessionKey,
baseSessionKey: routeWithDmScope.sessionKey,
threadId: params.threadId,
parentSessionKey: effectiveRoute.sessionKey,
parentSessionKey: routeWithDmScope.sessionKey,
});
return {
route: {
...effectiveRoute,
...routeWithDmScope,
sessionKey: threadKeys.sessionKey,
mainSessionKey: threadKeys.parentSessionKey ?? effectiveRoute.sessionKey,
mainSessionKey: threadKeys.parentSessionKey ?? routeWithDmScope.sessionKey,
lastRoutePolicy: deriveLastRoutePolicy({
sessionKey: threadKeys.sessionKey,
mainSessionKey: threadKeys.parentSessionKey ?? effectiveRoute.sessionKey,
mainSessionKey: threadKeys.parentSessionKey ?? routeWithDmScope.sessionKey,
}),
},
configuredBinding,
@@ -121,7 +171,7 @@ export function resolveMatrixInboundRoute(params: {
}
return {
route: effectiveRoute,
route: routeWithDmScope,
configuredBinding,
runtimeBindingId: null,
};

View File

@@ -0,0 +1,108 @@
import { normalizeAccountId } from "openclaw/plugin-sdk/account-id";
import { resolveMatrixDirectUserId, resolveMatrixTargetIdentity } from "./target-ids.js";
export function trimMaybeString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed.length > 0 ? trimmed : undefined;
}
function resolveMatrixRoomTargetId(value: unknown): string | undefined {
const trimmed = trimMaybeString(value);
if (!trimmed) {
return undefined;
}
const target = resolveMatrixTargetIdentity(trimmed);
return target?.kind === "room" && target.id.startsWith("!") ? target.id : undefined;
}
export function resolveMatrixSessionAccountId(value: unknown): string | undefined {
const trimmed = trimMaybeString(value);
return trimmed ? normalizeAccountId(trimmed) : undefined;
}
export function resolveMatrixStoredRoomId(params: {
deliveryTo?: unknown;
lastTo?: unknown;
originNativeChannelId?: unknown;
originTo?: unknown;
}): string | undefined {
return (
resolveMatrixRoomTargetId(params.deliveryTo) ??
resolveMatrixRoomTargetId(params.lastTo) ??
resolveMatrixRoomTargetId(params.originNativeChannelId) ??
resolveMatrixRoomTargetId(params.originTo)
);
}
type MatrixStoredSessionEntryLike = {
deliveryContext?: {
channel?: unknown;
to?: unknown;
accountId?: unknown;
};
origin?: {
provider?: unknown;
from?: unknown;
to?: unknown;
nativeChannelId?: unknown;
nativeDirectUserId?: unknown;
accountId?: unknown;
chatType?: unknown;
};
lastChannel?: unknown;
lastTo?: unknown;
lastAccountId?: unknown;
chatType?: unknown;
};
export function resolveMatrixStoredSessionMeta(entry?: MatrixStoredSessionEntryLike): {
channel?: string;
accountId?: string;
roomId?: string;
directUserId?: string;
} | null {
if (!entry) {
return null;
}
const channel =
trimMaybeString(entry.deliveryContext?.channel) ??
trimMaybeString(entry.lastChannel) ??
trimMaybeString(entry.origin?.provider);
const accountId =
resolveMatrixSessionAccountId(
entry.deliveryContext?.accountId ?? entry.lastAccountId ?? entry.origin?.accountId,
) ?? undefined;
const roomId = resolveMatrixStoredRoomId({
deliveryTo: entry.deliveryContext?.to,
lastTo: entry.lastTo,
originNativeChannelId: entry.origin?.nativeChannelId,
originTo: entry.origin?.to,
});
const chatType =
trimMaybeString(entry.origin?.chatType) ?? trimMaybeString(entry.chatType) ?? undefined;
const directUserId =
chatType === "direct"
? (trimMaybeString(entry.origin?.nativeDirectUserId) ??
resolveMatrixDirectUserId({
from: trimMaybeString(entry.origin?.from),
to:
(roomId ? `room:${roomId}` : undefined) ??
trimMaybeString(entry.deliveryContext?.to) ??
trimMaybeString(entry.lastTo) ??
trimMaybeString(entry.origin?.to),
chatType,
}))
: undefined;
if (!channel && !accountId && !roomId && !directUserId) {
return null;
}
return {
...(channel ? { channel } : {}),
...(accountId ? { accountId } : {}),
...(roomId ? { roomId } : {}),
...(directUserId ? { directUserId } : {}),
};
}

View File

@@ -0,0 +1,483 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import type { OpenClawConfig } from "./runtime-api.js";
import { resolveMatrixOutboundSessionRoute } from "./session-route.js";
const tempDirs = new Set<string>();
function createTempStore(entries: Record<string, unknown>): string {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-session-route-"));
tempDirs.add(tempDir);
const storePath = path.join(tempDir, "sessions.json");
fs.writeFileSync(storePath, JSON.stringify(entries), "utf8");
return storePath;
}
afterEach(() => {
for (const tempDir of tempDirs) {
fs.rmSync(tempDir, { recursive: true, force: true });
}
tempDirs.clear();
});
describe("resolveMatrixOutboundSessionRoute", () => {
it("reuses the current DM room session for same-user sends when Matrix DMs are per-room", () => {
const storePath = createTempStore({
"agent:main:matrix:channel:!dm:example.org": {
sessionId: "sess-1",
updatedAt: Date.now(),
chatType: "direct",
origin: {
chatType: "direct",
from: "matrix:@alice:example.org",
to: "room:!dm:example.org",
accountId: "ops",
},
deliveryContext: {
channel: "matrix",
to: "room:!dm:example.org",
accountId: "ops",
},
},
});
const cfg = {
session: {
store: storePath,
},
channels: {
matrix: {
dm: {
sessionScope: "per-room",
},
},
},
} satisfies OpenClawConfig;
const route = resolveMatrixOutboundSessionRoute({
cfg,
agentId: "main",
accountId: "ops",
currentSessionKey: "agent:main:matrix:channel:!dm:example.org",
target: "@alice:example.org",
resolvedTarget: {
to: "@alice:example.org",
kind: "user",
source: "normalized",
},
});
expect(route).toMatchObject({
sessionKey: "agent:main:matrix:channel:!dm:example.org",
baseSessionKey: "agent:main:matrix:channel:!dm:example.org",
peer: { kind: "channel", id: "!dm:example.org" },
chatType: "direct",
from: "matrix:@alice:example.org",
to: "room:!dm:example.org",
});
});
it("falls back to user-scoped routing when the current session is for another DM peer", () => {
const storePath = createTempStore({
"agent:main:matrix:channel:!dm:example.org": {
sessionId: "sess-1",
updatedAt: Date.now(),
chatType: "direct",
origin: {
chatType: "direct",
from: "matrix:@bob:example.org",
to: "room:!dm:example.org",
accountId: "ops",
},
deliveryContext: {
channel: "matrix",
to: "room:!dm:example.org",
accountId: "ops",
},
},
});
const cfg = {
session: {
store: storePath,
},
channels: {
matrix: {
dm: {
sessionScope: "per-room",
},
},
},
} satisfies OpenClawConfig;
const route = resolveMatrixOutboundSessionRoute({
cfg,
agentId: "main",
accountId: "ops",
currentSessionKey: "agent:main:matrix:channel:!dm:example.org",
target: "@alice:example.org",
resolvedTarget: {
to: "@alice:example.org",
kind: "user",
source: "normalized",
},
});
expect(route).toMatchObject({
sessionKey: "agent:main:main",
baseSessionKey: "agent:main:main",
peer: { kind: "direct", id: "@alice:example.org" },
chatType: "direct",
from: "matrix:@alice:example.org",
to: "room:@alice:example.org",
});
});
it("falls back to user-scoped routing when the current session belongs to another Matrix account", () => {
const storePath = createTempStore({
"agent:main:matrix:channel:!dm:example.org": {
sessionId: "sess-1",
updatedAt: Date.now(),
chatType: "direct",
origin: {
chatType: "direct",
from: "matrix:@alice:example.org",
to: "room:!dm:example.org",
accountId: "ops",
},
deliveryContext: {
channel: "matrix",
to: "room:!dm:example.org",
accountId: "ops",
},
},
});
const cfg = {
session: {
store: storePath,
},
channels: {
matrix: {
dm: {
sessionScope: "per-room",
},
},
},
} satisfies OpenClawConfig;
const route = resolveMatrixOutboundSessionRoute({
cfg,
agentId: "main",
accountId: "support",
currentSessionKey: "agent:main:matrix:channel:!dm:example.org",
target: "@alice:example.org",
resolvedTarget: {
to: "@alice:example.org",
kind: "user",
source: "normalized",
},
});
expect(route).toMatchObject({
sessionKey: "agent:main:main",
baseSessionKey: "agent:main:main",
peer: { kind: "direct", id: "@alice:example.org" },
chatType: "direct",
from: "matrix:@alice:example.org",
to: "room:@alice:example.org",
});
});
it("reuses the canonical DM room after user-target outbound metadata overwrites latest to fields", () => {
const storePath = createTempStore({
"agent:main:matrix:channel:!dm:example.org": {
sessionId: "sess-1",
updatedAt: Date.now(),
chatType: "direct",
origin: {
chatType: "direct",
from: "matrix:@bob:example.org",
to: "room:@bob:example.org",
nativeChannelId: "!dm:example.org",
nativeDirectUserId: "@alice:example.org",
accountId: "ops",
},
deliveryContext: {
channel: "matrix",
to: "room:@bob:example.org",
accountId: "ops",
},
lastTo: "room:@bob:example.org",
lastAccountId: "ops",
},
});
const cfg = {
session: {
store: storePath,
},
channels: {
matrix: {
dm: {
sessionScope: "per-room",
},
},
},
} satisfies OpenClawConfig;
const route = resolveMatrixOutboundSessionRoute({
cfg,
agentId: "main",
accountId: "ops",
currentSessionKey: "agent:main:matrix:channel:!dm:example.org",
target: "@alice:example.org",
resolvedTarget: {
to: "@alice:example.org",
kind: "user",
source: "normalized",
},
});
expect(route).toMatchObject({
sessionKey: "agent:main:matrix:channel:!dm:example.org",
baseSessionKey: "agent:main:matrix:channel:!dm:example.org",
peer: { kind: "channel", id: "!dm:example.org" },
chatType: "direct",
from: "matrix:@alice:example.org",
to: "room:!dm:example.org",
});
});
it("does not reuse the canonical DM room for a different Matrix user after latest metadata drift", () => {
const storePath = createTempStore({
"agent:main:matrix:channel:!dm:example.org": {
sessionId: "sess-1",
updatedAt: Date.now(),
chatType: "direct",
origin: {
chatType: "direct",
from: "matrix:@bob:example.org",
to: "room:@bob:example.org",
nativeChannelId: "!dm:example.org",
nativeDirectUserId: "@alice:example.org",
accountId: "ops",
},
deliveryContext: {
channel: "matrix",
to: "room:@bob:example.org",
accountId: "ops",
},
lastTo: "room:@bob:example.org",
lastAccountId: "ops",
},
});
const cfg = {
session: {
store: storePath,
},
channels: {
matrix: {
dm: {
sessionScope: "per-room",
},
},
},
} satisfies OpenClawConfig;
const route = resolveMatrixOutboundSessionRoute({
cfg,
agentId: "main",
accountId: "ops",
currentSessionKey: "agent:main:matrix:channel:!dm:example.org",
target: "@bob:example.org",
resolvedTarget: {
to: "@bob:example.org",
kind: "user",
source: "normalized",
},
});
expect(route).toMatchObject({
sessionKey: "agent:main:main",
baseSessionKey: "agent:main:main",
peer: { kind: "direct", id: "@bob:example.org" },
chatType: "direct",
from: "matrix:@bob:example.org",
to: "room:@bob:example.org",
});
});
it("does not reuse a room after the session metadata was overwritten by a non-DM Matrix send", () => {
const storePath = createTempStore({
"agent:main:matrix:channel:!dm:example.org": {
sessionId: "sess-1",
updatedAt: Date.now(),
chatType: "channel",
origin: {
chatType: "channel",
from: "matrix:channel:!ops:example.org",
to: "room:!ops:example.org",
nativeChannelId: "!ops:example.org",
nativeDirectUserId: "@alice:example.org",
accountId: "ops",
},
deliveryContext: {
channel: "matrix",
to: "room:!ops:example.org",
accountId: "ops",
},
lastTo: "room:!ops:example.org",
lastAccountId: "ops",
},
});
const cfg = {
session: {
store: storePath,
},
channels: {
matrix: {
dm: {
sessionScope: "per-room",
},
},
},
} satisfies OpenClawConfig;
const route = resolveMatrixOutboundSessionRoute({
cfg,
agentId: "main",
accountId: "ops",
currentSessionKey: "agent:main:matrix:channel:!dm:example.org",
target: "@alice:example.org",
resolvedTarget: {
to: "@alice:example.org",
kind: "user",
source: "normalized",
},
});
expect(route).toMatchObject({
sessionKey: "agent:main:main",
baseSessionKey: "agent:main:main",
peer: { kind: "direct", id: "@alice:example.org" },
chatType: "direct",
from: "matrix:@alice:example.org",
to: "room:@alice:example.org",
});
});
it("uses the effective default Matrix account when accountId is omitted", () => {
const storePath = createTempStore({
"agent:main:matrix:channel:!dm:example.org": {
sessionId: "sess-1",
updatedAt: Date.now(),
chatType: "direct",
origin: {
chatType: "direct",
from: "matrix:@alice:example.org",
to: "room:!dm:example.org",
accountId: "ops",
},
deliveryContext: {
channel: "matrix",
to: "room:!dm:example.org",
accountId: "ops",
},
},
});
const cfg = {
session: {
store: storePath,
},
channels: {
matrix: {
defaultAccount: "ops",
accounts: {
ops: {
dm: {
sessionScope: "per-room",
},
},
},
},
},
} satisfies OpenClawConfig;
const route = resolveMatrixOutboundSessionRoute({
cfg,
agentId: "main",
currentSessionKey: "agent:main:matrix:channel:!dm:example.org",
target: "@alice:example.org",
resolvedTarget: {
to: "@alice:example.org",
kind: "user",
source: "normalized",
},
});
expect(route).toMatchObject({
sessionKey: "agent:main:matrix:channel:!dm:example.org",
baseSessionKey: "agent:main:matrix:channel:!dm:example.org",
peer: { kind: "channel", id: "!dm:example.org" },
chatType: "direct",
from: "matrix:@alice:example.org",
to: "room:!dm:example.org",
});
});
it("reuses the current DM room when stored account metadata is missing", () => {
const storePath = createTempStore({
"agent:main:matrix:channel:!dm:example.org": {
sessionId: "sess-1",
updatedAt: Date.now(),
chatType: "direct",
origin: {
chatType: "direct",
from: "matrix:@alice:example.org",
to: "room:!dm:example.org",
},
deliveryContext: {
channel: "matrix",
to: "room:!dm:example.org",
},
},
});
const cfg = {
session: {
store: storePath,
},
channels: {
matrix: {
defaultAccount: "ops",
accounts: {
ops: {
dm: {
sessionScope: "per-room",
},
},
},
},
},
} satisfies OpenClawConfig;
const route = resolveMatrixOutboundSessionRoute({
cfg,
agentId: "main",
currentSessionKey: "agent:main:matrix:channel:!dm:example.org",
target: "@alice:example.org",
resolvedTarget: {
to: "@alice:example.org",
kind: "user",
source: "normalized",
},
});
expect(route).toMatchObject({
sessionKey: "agent:main:matrix:channel:!dm:example.org",
baseSessionKey: "agent:main:matrix:channel:!dm:example.org",
peer: { kind: "channel", id: "!dm:example.org" },
chatType: "direct",
from: "matrix:@alice:example.org",
to: "room:!dm:example.org",
});
});
});

View File

@@ -1,29 +1,113 @@
import { normalizeAccountId } from "openclaw/plugin-sdk/account-id";
import {
buildChannelOutboundSessionRoute,
stripChannelTargetPrefix,
stripTargetKindPrefix,
type ChannelOutboundSessionRouteParams,
} from "openclaw/plugin-sdk/channel-core";
import {
loadSessionStore,
resolveSessionStoreEntry,
resolveStorePath,
} from "openclaw/plugin-sdk/config-runtime";
import { resolveMatrixAccountConfig } from "./matrix/account-config.js";
import { resolveDefaultMatrixAccountId } from "./matrix/accounts.js";
import { resolveMatrixStoredSessionMeta } from "./matrix/session-store-metadata.js";
import { resolveMatrixTargetIdentity } from "./matrix/target-ids.js";
function resolveEffectiveMatrixAccountId(
params: Pick<ChannelOutboundSessionRouteParams, "cfg" | "accountId">,
): string {
return normalizeAccountId(params.accountId ?? resolveDefaultMatrixAccountId(params.cfg));
}
function resolveMatrixDmSessionScope(params: {
cfg: ChannelOutboundSessionRouteParams["cfg"];
accountId: string;
}): "per-user" | "per-room" {
return (
resolveMatrixAccountConfig({
cfg: params.cfg,
accountId: params.accountId,
}).dm?.sessionScope ?? "per-user"
);
}
function resolveMatrixCurrentDmRoomId(params: {
cfg: ChannelOutboundSessionRouteParams["cfg"];
agentId: string;
accountId: string;
currentSessionKey?: string;
targetUserId: string;
}): string | undefined {
const sessionKey = params.currentSessionKey?.trim();
if (!sessionKey) {
return undefined;
}
try {
const storePath = resolveStorePath(params.cfg.session?.store, {
agentId: params.agentId,
});
const store = loadSessionStore(storePath);
const existing = resolveSessionStoreEntry({
store,
sessionKey,
}).existing;
const currentSession = resolveMatrixStoredSessionMeta(existing);
if (!currentSession) {
return undefined;
}
if (currentSession.accountId && currentSession.accountId !== params.accountId) {
return undefined;
}
if (!currentSession.directUserId || currentSession.directUserId !== params.targetUserId) {
return undefined;
}
return currentSession.roomId;
} catch {
return undefined;
}
}
export function resolveMatrixOutboundSessionRoute(params: ChannelOutboundSessionRouteParams) {
const stripped = stripChannelTargetPrefix(params.target, "matrix");
const isUser =
params.resolvedTarget?.kind === "user" || stripped.startsWith("@") || /^user:/i.test(stripped);
const rawId = stripTargetKindPrefix(stripped);
if (!rawId) {
const target =
resolveMatrixTargetIdentity(params.resolvedTarget?.to ?? params.target) ??
resolveMatrixTargetIdentity(params.target);
if (!target) {
return null;
}
const effectiveAccountId = resolveEffectiveMatrixAccountId(params);
const roomScopedDmId =
target.kind === "user" &&
resolveMatrixDmSessionScope({
cfg: params.cfg,
accountId: effectiveAccountId,
}) === "per-room"
? resolveMatrixCurrentDmRoomId({
cfg: params.cfg,
agentId: params.agentId,
accountId: effectiveAccountId,
currentSessionKey: params.currentSessionKey,
targetUserId: target.id,
})
: undefined;
const peer =
roomScopedDmId !== undefined
? { kind: "channel" as const, id: roomScopedDmId }
: {
kind: target.kind === "user" ? ("direct" as const) : ("channel" as const),
id: target.id,
};
const chatType = target.kind === "user" ? "direct" : "channel";
const from = target.kind === "user" ? `matrix:${target.id}` : `matrix:channel:${target.id}`;
const to = `room:${roomScopedDmId ?? target.id}`;
return buildChannelOutboundSessionRoute({
cfg: params.cfg,
agentId: params.agentId,
channel: "matrix",
accountId: params.accountId,
peer: {
kind: isUser ? "direct" : "channel",
id: rawId,
},
chatType: isUser ? "direct" : "channel",
from: isUser ? `matrix:${rawId}` : `matrix:channel:${rawId}`,
to: `room:${rawId}`,
accountId: effectiveAccountId,
peer,
chatType,
from,
to,
});
}

View File

@@ -16,6 +16,12 @@ export type MatrixDmConfig = {
policy?: DmPolicy;
/** Allowlist for DM senders (matrix user IDs or "*"). */
allowFrom?: Array<string | number>;
/**
* How Matrix DMs map to sessions.
* - `per-user` (default): all DM rooms with the same routed peer share one DM session.
* - `per-room`: each Matrix DM room gets its own session key.
*/
sessionScope?: "per-user" | "per-room";
/** Per-DM thread reply behavior override (off|inbound|always). Overrides top-level threadReplies for direct messages. */
threadReplies?: "off" | "inbound" | "always";
};

View File

@@ -161,6 +161,8 @@ export type MsgContext = {
MessageThreadId?: string | number;
/** Platform-native channel/conversation id (e.g. Slack DM channel "D…" id). */
NativeChannelId?: string;
/** Stable provider-native direct-peer id when a DM room/user mapping must survive later writes. */
NativeDirectUserId?: string;
/** Telegram forum supergroup marker. */
IsForum?: boolean;
/** Warning: DM has topics enabled but this message is not in a topic. */

View File

@@ -534,6 +534,7 @@ export type ChannelMessagingAdapter = {
agentId: string;
accountId?: string | null;
target: string;
currentSessionKey?: string;
resolvedTarget?: {
to: string;
kind: ChannelDirectoryEntryKind | "channel";

View File

@@ -32,6 +32,12 @@ const mergeOrigin = (
if (next?.to) {
merged.to = next.to;
}
if (next?.nativeChannelId) {
merged.nativeChannelId = next.nativeChannelId;
}
if (next?.nativeDirectUserId) {
merged.nativeDirectUserId = next.nativeDirectUserId;
}
if (next?.accountId) {
merged.accountId = next.accountId;
}
@@ -53,6 +59,8 @@ export function deriveSessionOrigin(ctx: MsgContext): SessionOrigin | undefined
const from = ctx.From?.trim();
const to =
(typeof ctx.OriginatingTo === "string" ? ctx.OriginatingTo : ctx.To)?.trim() ?? undefined;
const nativeChannelId = ctx.NativeChannelId?.trim();
const nativeDirectUserId = ctx.NativeDirectUserId?.trim();
const accountId = ctx.AccountId?.trim();
const threadId = ctx.MessageThreadId ?? undefined;
@@ -75,6 +83,12 @@ export function deriveSessionOrigin(ctx: MsgContext): SessionOrigin | undefined
if (to) {
origin.to = to;
}
if (nativeChannelId) {
origin.nativeChannelId = nativeChannelId;
}
if (nativeDirectUserId) {
origin.nativeDirectUserId = nativeDirectUserId;
}
if (accountId) {
origin.accountId = accountId;
}

View File

@@ -18,6 +18,8 @@ export type SessionOrigin = {
chatType?: SessionChatType;
from?: string;
to?: string;
nativeChannelId?: string;
nativeDirectUserId?: string;
accountId?: string;
threadId?: string | number;
};

View File

@@ -597,6 +597,74 @@ describe("gateway send mirroring", () => {
});
});
it("still resolves outbound routing metadata when a sessionKey is provided", async () => {
mockDeliverySuccess("m-matrix-session-route");
mocks.resolveOutboundSessionRoute.mockResolvedValueOnce({
sessionKey: "agent:main:matrix:channel:!dm:example.org",
baseSessionKey: "agent:main:matrix:channel:!dm:example.org",
peer: { kind: "channel", id: "!dm:example.org" },
chatType: "direct",
from: "matrix:@alice:example.org",
to: "room:!dm:example.org",
});
await runSend({
to: "@alice:example.org",
message: "hello",
channel: "matrix",
sessionKey: "agent:main:matrix:channel:!dm:example.org",
idempotencyKey: "idem-matrix-session-route",
});
expect(mocks.resolveOutboundSessionRoute).toHaveBeenCalledWith(
expect.objectContaining({
channel: "matrix",
target: "resolved",
currentSessionKey: "agent:main:matrix:channel:!dm:example.org",
}),
);
expect(mocks.ensureOutboundSessionEntry).toHaveBeenCalledWith(
expect.objectContaining({
route: expect.objectContaining({
sessionKey: "agent:main:matrix:channel:!dm:example.org",
baseSessionKey: "agent:main:matrix:channel:!dm:example.org",
to: "room:!dm:example.org",
}),
}),
);
expectDeliverySessionMirror({
agentId: "main",
sessionKey: "agent:main:matrix:channel:!dm:example.org",
});
});
it("falls back to the provided sessionKey when outbound route lookup returns null", async () => {
mockDeliverySuccess("m-session-fallback");
mocks.resolveOutboundSessionRoute.mockResolvedValueOnce(null);
await runSend({
to: "channel:C1",
message: "hello",
channel: "slack",
sessionKey: "agent:work:slack:channel:c1",
idempotencyKey: "idem-session-fallback",
});
expect(mocks.ensureOutboundSessionEntry).not.toHaveBeenCalled();
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
session: expect.objectContaining({
agentId: "work",
key: "agent:work:slack:channel:c1",
}),
mirror: expect.objectContaining({
sessionKey: "agent:work:slack:channel:c1",
agentId: "work",
}),
}),
);
});
it("prefers explicit agentId over sessionKey agent for delivery and mirror", async () => {
mockDeliverySuccess("m-agent-precedence");

View File

@@ -231,31 +231,39 @@ export const sendHandlers: GatewayRequestHandlers = {
: undefined;
const defaultAgentId = resolveSessionAgentId({ config: cfg });
const effectiveAgentId = explicitAgentId ?? sessionAgentId ?? defaultAgentId;
// If callers omit sessionKey, derive a target session key from the outbound route.
const derivedRoute = !providedSessionKey
? await resolveOutboundSessionRoute({
cfg,
channel,
agentId: effectiveAgentId,
accountId,
target: deliveryTarget,
resolvedTarget: idLikeTarget,
threadId,
})
const derivedRoute = await resolveOutboundSessionRoute({
cfg,
channel,
agentId: effectiveAgentId,
accountId,
target: deliveryTarget,
currentSessionKey: providedSessionKey,
resolvedTarget: idLikeTarget,
threadId,
});
const outboundRoute = derivedRoute
? providedSessionKey
? {
...derivedRoute,
sessionKey: providedSessionKey,
baseSessionKey: providedSessionKey,
}
: derivedRoute
: null;
if (derivedRoute) {
if (outboundRoute) {
await ensureOutboundSessionEntry({
cfg,
agentId: effectiveAgentId,
channel,
accountId,
route: derivedRoute,
route: outboundRoute,
});
}
const outboundSessionKey = outboundRoute?.sessionKey ?? providedSessionKey;
const outboundSession = buildOutboundSessionContext({
cfg,
agentId: effectiveAgentId,
sessionKey: providedSessionKey ?? derivedRoute?.sessionKey,
sessionKey: outboundSessionKey,
});
const results = await deliverOutboundPayloads({
cfg,
@@ -268,23 +276,15 @@ export const sendHandlers: GatewayRequestHandlers = {
threadId: threadId ?? null,
deps: outboundDeps,
gatewayClientScopes: client?.connect?.scopes ?? [],
mirror: providedSessionKey
mirror: outboundSessionKey
? {
sessionKey: providedSessionKey,
sessionKey: outboundSessionKey,
agentId: effectiveAgentId,
text: mirrorText || message,
mediaUrls: mirrorMediaUrls.length > 0 ? mirrorMediaUrls : undefined,
idempotencyKey: idem,
}
: derivedRoute
? {
sessionKey: derivedRoute.sessionKey,
agentId: effectiveAgentId,
text: mirrorText || message,
mediaUrls: mirrorMediaUrls.length > 0 ? mirrorMediaUrls : undefined,
idempotencyKey: idem,
}
: undefined,
: undefined,
});
const result = results.at(-1);

View File

@@ -502,6 +502,7 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise<MessageActi
accountId,
toolContext: input.toolContext,
agentId,
currentSessionKey: input.sessionKey,
dryRun,
resolvedTarget,
resolveAutoThreadId: getChannelPlugin(channel)?.threading?.resolveAutoThreadId,

View File

@@ -47,6 +47,7 @@ export async function prepareOutboundMirrorRoute(params: {
accountId?: string | null;
toolContext?: ChannelThreadingToolContext;
agentId?: string;
currentSessionKey?: string;
dryRun?: boolean;
resolvedTarget?: ResolvedMessagingTarget;
resolveAutoThreadId?: ResolveAutoThreadId;
@@ -80,6 +81,7 @@ export async function prepareOutboundMirrorRoute(params: {
agentId: params.agentId,
accountId: params.accountId,
target: params.to,
currentSessionKey: params.currentSessionKey,
resolvedTarget: params.resolvedTarget,
replyToId,
threadId: resolvedThreadId,

View File

@@ -26,6 +26,7 @@ export type ResolveOutboundSessionRouteParams = {
agentId: string;
accountId?: string | null;
target: string;
currentSessionKey?: string;
resolvedTarget?: ResolvedMessagingTarget;
replyToId?: string | null;
threadId?: string | number | null;