refactor(acp): extract generic current conversation binding store

This commit is contained in:
Peter Steinberger
2026-03-28 05:52:06 +00:00
parent bde5bae69f
commit 83135c31c9
13 changed files with 473 additions and 244 deletions

View File

@@ -100,6 +100,9 @@ export const bluebubblesPlugin: ChannelPlugin<ResolvedBlueBubblesAccount, BlueBu
isConfigured: (account) => account.configured,
describeAccount: (account): ChannelAccountSnapshot => describeBlueBubblesAccount(account),
},
conversationBindings: {
supportsCurrentConversationBinding: true,
},
actions: bluebubblesMessageActions,
bindings: {
compileConfiguredBinding: ({ conversationId }) =>

View File

@@ -0,0 +1,2 @@
export { googlechatPlugin } from "./src/channel.js";
export { setGoogleChatRuntime } from "./src/runtime.js";

View File

@@ -134,6 +134,9 @@ export const imessagePlugin: ChannelPlugin<ResolvedIMessageAccount, IMessageProb
resolveRequireMention: resolveIMessageGroupRequireMention,
resolveToolPolicy: resolveIMessageGroupToolPolicy,
},
conversationBindings: {
supportsCurrentConversationBinding: true,
},
bindings: {
compileConfiguredBinding: ({ conversationId }) =>
normalizeIMessageAcpConversationId(conversationId),

View File

@@ -293,6 +293,9 @@ export const matrixPlugin: ChannelPlugin<ResolvedMatrixAccount, MatrixProbe> =
resolveRequireMention: resolveMatrixGroupRequireMention,
resolveToolPolicy: resolveMatrixGroupToolPolicy,
},
conversationBindings: {
supportsCurrentConversationBinding: true,
},
messaging: {
normalizeTarget: normalizeMatrixMessagingTarget,
resolveOutboundSessionRoute: (params) => resolveMatrixOutboundSessionRoute(params),

View File

@@ -1,49 +1,30 @@
import { normalizeConversationText } from "../../../acp/conversation-id.js";
import { resolveConversationBindingContext } from "../../../channels/conversation-binding-context.js";
import type { HandleCommandsParams } from "../commands-types.js";
import {
resolveConversationBindingAccountIdFromMessage,
resolveConversationBindingChannelFromMessage,
resolveConversationBindingContextFromAcpCommand,
resolveConversationBindingThreadIdFromMessage,
} from "../conversation-binding-input.js";
export function resolveAcpCommandChannel(params: HandleCommandsParams): string {
const raw =
params.ctx.OriginatingChannel ??
params.command.channel ??
params.ctx.Surface ??
params.ctx.Provider;
return normalizeConversationText(raw).toLowerCase();
const resolved = resolveConversationBindingChannelFromMessage(params.ctx, params.command.channel);
return normalizeConversationText(resolved).toLowerCase();
}
export function resolveAcpCommandAccountId(params: HandleCommandsParams): string {
const accountId = normalizeConversationText(params.ctx.AccountId);
return accountId || "default";
return resolveConversationBindingAccountIdFromMessage(params.ctx);
}
export function resolveAcpCommandThreadId(params: HandleCommandsParams): string | undefined {
const threadId =
params.ctx.MessageThreadId != null
? normalizeConversationText(String(params.ctx.MessageThreadId))
: "";
return threadId || undefined;
return resolveConversationBindingThreadIdFromMessage(params.ctx);
}
function resolveAcpCommandConversationRef(params: HandleCommandsParams): {
conversationId: string;
parentConversationId?: string;
} | null {
const resolved = resolveConversationBindingContext({
cfg: params.cfg,
channel: resolveAcpCommandChannel(params),
accountId: resolveAcpCommandAccountId(params),
chatType: params.ctx.ChatType,
threadId: resolveAcpCommandThreadId(params),
threadParentId: params.ctx.ThreadParentId,
senderId: params.command.senderId ?? params.ctx.SenderId,
sessionKey: params.sessionKey,
parentSessionKey: params.ctx.ParentSessionKey,
originatingTo: params.ctx.OriginatingTo,
commandTo: params.command.to,
fallbackTo: params.ctx.To,
from: params.ctx.From,
nativeChannelId: params.ctx.NativeChannelId,
});
const resolved = resolveConversationBindingContextFromAcpCommand(params);
if (!resolved) {
return null;
}

View File

@@ -0,0 +1,94 @@
import { normalizeConversationText } from "../../acp/conversation-id.js";
import { resolveConversationBindingContext } from "../../channels/conversation-binding-context.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { MsgContext } from "../templating.js";
import type { HandleCommandsParams } from "./commands-types.js";
type BindingMsgContext = Pick<
MsgContext,
| "OriginatingChannel"
| "Surface"
| "Provider"
| "AccountId"
| "ChatType"
| "MessageThreadId"
| "ThreadParentId"
| "SenderId"
| "SessionKey"
| "ParentSessionKey"
| "OriginatingTo"
| "To"
| "From"
| "NativeChannelId"
>;
function resolveBindingChannel(ctx: BindingMsgContext, commandChannel?: string | null): string {
const raw = ctx.OriginatingChannel ?? commandChannel ?? ctx.Surface ?? ctx.Provider;
return normalizeConversationText(raw).toLowerCase();
}
function resolveBindingAccountId(ctx: BindingMsgContext): string {
const accountId = normalizeConversationText(ctx.AccountId);
return accountId || "default";
}
function resolveBindingThreadId(threadId: string | number | null | undefined): string | undefined {
const normalized = threadId != null ? normalizeConversationText(String(threadId)) : undefined;
return normalized || undefined;
}
export function resolveConversationBindingContextFromMessage(params: {
cfg: OpenClawConfig;
ctx: BindingMsgContext;
senderId?: string | null;
sessionKey?: string | null;
parentSessionKey?: string | null;
commandTo?: string | null;
}): ReturnType<typeof resolveConversationBindingContext> {
return resolveConversationBindingContext({
cfg: params.cfg,
channel: resolveBindingChannel(params.ctx),
accountId: resolveBindingAccountId(params.ctx),
chatType: params.ctx.ChatType,
threadId: resolveBindingThreadId(params.ctx.MessageThreadId),
threadParentId: params.ctx.ThreadParentId,
senderId: params.senderId ?? params.ctx.SenderId,
sessionKey: params.sessionKey ?? params.ctx.SessionKey,
parentSessionKey: params.parentSessionKey ?? params.ctx.ParentSessionKey,
originatingTo: params.ctx.OriginatingTo,
commandTo: params.commandTo,
fallbackTo: params.ctx.To,
from: params.ctx.From,
nativeChannelId: params.ctx.NativeChannelId,
});
}
export function resolveConversationBindingContextFromAcpCommand(
params: HandleCommandsParams,
): ReturnType<typeof resolveConversationBindingContext> {
return resolveConversationBindingContextFromMessage({
cfg: params.cfg,
ctx: params.ctx,
senderId: params.command.senderId,
sessionKey: params.sessionKey,
parentSessionKey: params.ctx.ParentSessionKey,
commandTo: params.command.to,
});
}
export function resolveConversationBindingChannelFromMessage(
ctx: BindingMsgContext,
commandChannel?: string | null,
): string {
return resolveBindingChannel(ctx, commandChannel);
}
export function resolveConversationBindingAccountIdFromMessage(ctx: BindingMsgContext): string {
return resolveBindingAccountId(ctx);
}
export function resolveConversationBindingThreadIdFromMessage(
ctx: Pick<BindingMsgContext, "MessageThreadId">,
): string | undefined {
return resolveBindingThreadId(ctx.MessageThreadId);
}

View File

@@ -4,7 +4,6 @@ import { normalizeConversationText } from "../../acp/conversation-id.js";
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
import { clearBootstrapSnapshotOnSessionRollover } from "../../agents/bootstrap-cache.js";
import { normalizeChatType } from "../../channels/chat-type.js";
import { resolveConversationBindingContext } from "../../channels/conversation-binding-context.js";
import type { OpenClawConfig } from "../../config/config.js";
import { resolveGroupSessionKey } from "../../config/sessions/group.js";
import { deriveSessionMetaPatch } from "../../config/sessions/metadata.js";
@@ -36,6 +35,7 @@ import { isInternalMessageChannel } from "../../utils/message-channel.js";
import { resolveCommandAuthorization } from "../command-auth.js";
import type { MsgContext, TemplateContext } from "../templating.js";
import { resolveEffectiveResetTargetSessionKey } from "./acp-reset-target.js";
import { resolveConversationBindingContextFromMessage } from "./conversation-binding-input.js";
import { normalizeInboundTextNewlines } from "./inbound-text.js";
import { stripMentions, stripStructuralPrefixes } from "./mentions.js";
import {
@@ -81,7 +81,7 @@ function isResetAuthorizedForContext(params: {
commandAuthorized: boolean;
}): boolean {
const auth = resolveCommandAuthorization(params);
if (!auth.isAuthorizedSender) {
if (!params.commandAuthorized && !auth.isAuthorizedSender) {
return false;
}
const provider = params.ctx.Provider;
@@ -98,7 +98,7 @@ function isResetAuthorizedForContext(params: {
return scopes.includes("operator.admin");
}
function resolveAcpResetBindingContext(
function resolveSessionConversationBindingContext(
cfg: OpenClawConfig,
ctx: MsgContext,
): {
@@ -107,20 +107,9 @@ function resolveAcpResetBindingContext(
conversationId: string;
parentConversationId?: string;
} | null {
const bindingContext = resolveConversationBindingContext({
const bindingContext = resolveConversationBindingContextFromMessage({
cfg,
channel: ctx.OriginatingChannel ?? ctx.Surface ?? ctx.Provider,
accountId: ctx.AccountId,
chatType: ctx.ChatType,
threadId: ctx.MessageThreadId,
threadParentId: ctx.ThreadParentId,
senderId: ctx.SenderId,
sessionKey: ctx.SessionKey,
parentSessionKey: ctx.ParentSessionKey,
originatingTo: ctx.OriginatingTo,
fallbackTo: ctx.To,
from: ctx.From,
nativeChannelId: ctx.NativeChannelId,
ctx,
});
if (!bindingContext) {
return null;
@@ -138,9 +127,16 @@ function resolveAcpResetBindingContext(
function resolveBoundAcpSessionForReset(params: {
cfg: OpenClawConfig;
ctx: MsgContext;
bindingContext?: {
channel: string;
accountId: string;
conversationId: string;
parentConversationId?: string;
} | null;
}): string | undefined {
const activeSessionKey = normalizeConversationText(params.ctx.SessionKey);
const bindingContext = resolveAcpResetBindingContext(params.cfg, params.ctx);
const bindingContext =
params.bindingContext ?? resolveSessionConversationBindingContext(params.cfg, params.ctx);
return resolveEffectiveResetTargetSessionKey({
cfg: params.cfg,
channel: bindingContext?.channel,
@@ -157,22 +153,15 @@ function resolveBoundAcpSessionForReset(params: {
function resolveBoundConversationSessionKey(params: {
cfg: OpenClawConfig;
ctx: MsgContext;
bindingContext?: {
channel: string;
accountId: string;
conversationId: string;
parentConversationId?: string;
} | null;
}): string | undefined {
const bindingContext = resolveConversationBindingContext({
cfg: params.cfg,
channel: params.ctx.OriginatingChannel ?? params.ctx.Surface ?? params.ctx.Provider,
accountId: params.ctx.AccountId,
chatType: params.ctx.ChatType,
threadId: params.ctx.MessageThreadId,
threadParentId: params.ctx.ThreadParentId,
senderId: params.ctx.SenderId,
sessionKey: params.ctx.SessionKey,
parentSessionKey: params.ctx.ParentSessionKey,
originatingTo: params.ctx.OriginatingTo,
fallbackTo: params.ctx.To,
from: params.ctx.From,
nativeChannelId: params.ctx.NativeChannelId,
});
const bindingContext =
params.bindingContext ?? resolveSessionConversationBindingContext(params.cfg, params.ctx);
if (!bindingContext) {
return undefined;
}
@@ -197,6 +186,7 @@ export async function initSessionState(params: {
commandAuthorized: boolean;
}): Promise<SessionInitResult> {
const { ctx, cfg, commandAuthorized } = params;
const conversationBindingContext = resolveSessionConversationBindingContext(cfg, ctx);
// Native slash commands (Telegram/Discord/Slack) are delivered on a separate
// "slash session" key, but should mutate the target chat session.
const commandTargetSessionKey =
@@ -205,6 +195,7 @@ export async function initSessionState(params: {
resolveBoundConversationSessionKey({
cfg,
ctx,
bindingContext: conversationBindingContext,
}) ?? commandTargetSessionKey;
const sessionCtxForState =
targetSessionKey && targetSessionKey !== ctx.SessionKey
@@ -300,6 +291,7 @@ export async function initSessionState(params: {
resolveBoundAcpSessionForReset({
cfg,
ctx: sessionCtxForState,
bindingContext: conversationBindingContext,
}),
);
const shouldBypassAcpResetForTrigger = (triggerLower: string): boolean =>

View File

@@ -64,6 +64,18 @@ function resolveChannelTargetId(params: {
return undefined;
}
const lower = target.toLowerCase();
const channelPrefix = `${params.channel}:`;
if (lower.startsWith(channelPrefix)) {
return resolveChannelTargetId({
channel: params.channel,
target: target.slice(channelPrefix.length),
});
}
if (CANONICAL_TARGET_PREFIXES.some((prefix) => lower.startsWith(prefix))) {
return target;
}
const parsed = parseExplicitTargetForChannel(params.channel, target);
const parsedTarget = normalizeText(parsed?.to);
if (parsedTarget) {
@@ -74,15 +86,6 @@ function resolveChannelTargetId(params: {
);
}
const lower = target.toLowerCase();
const channelPrefix = `${params.channel}:`;
if (lower.startsWith(channelPrefix)) {
return normalizeText(target.slice(channelPrefix.length));
}
if (CANONICAL_TARGET_PREFIXES.some((prefix) => lower.startsWith(prefix))) {
return target;
}
const explicitConversationId = resolveConversationIdFromTargets({
targets: [target],
});

View File

@@ -603,6 +603,10 @@ export type ChannelConfiguredBindingProvider = {
) => ChannelConfiguredBindingConversationRef | null;
};
export type ChannelConversationBindingSupport = {
supportsCurrentConversationBinding?: boolean;
};
export type ChannelSecurityAdapter<ResolvedAccount = unknown> = {
resolveDmPolicy?: (
ctx: ChannelSecurityContext<ResolvedAccount>,

View File

@@ -3,6 +3,7 @@ import type {
ChannelAuthAdapter,
ChannelCommandAdapter,
ChannelConfigAdapter,
ChannelConversationBindingSupport,
ChannelDirectoryAdapter,
ChannelExecApprovalAdapter,
ChannelResolverAdapter,
@@ -102,6 +103,7 @@ export type ChannelPlugin<ResolvedAccount = any, Probe = unknown, Audit = unknow
execApprovals?: ChannelExecApprovalAdapter;
allowlist?: ChannelAllowlistAdapter;
bindings?: ChannelConfiguredBindingProvider;
conversationBindings?: ChannelConversationBindingSupport;
streaming?: ChannelStreamingAdapter;
threading?: ChannelThreadingAdapter;
messaging?: ChannelMessagingAdapter;

View File

@@ -0,0 +1,280 @@
import fs from "node:fs";
import path from "node:path";
import { normalizeConversationText } from "../../acp/conversation-id.js";
import { bundledChannelPlugins } from "../../channels/plugins/bundled.js";
import { normalizeAnyChannelId } from "../../channels/registry.js";
import { resolveStateDir } from "../../config/paths.js";
import { loadJsonFile } from "../../infra/json-file.js";
import { writeJsonFileAtomically } from "../../plugin-sdk/json-store.js";
import { getActivePluginChannelRegistry } from "../../plugins/runtime.js";
import { normalizeAccountId } from "../../routing/session-key.js";
import type {
ConversationRef,
SessionBindingBindInput,
SessionBindingCapabilities,
SessionBindingRecord,
SessionBindingUnbindInput,
} from "./session-binding-service.js";
type PersistedCurrentConversationBindingsFile = {
version: 1;
bindings: SessionBindingRecord[];
};
const CURRENT_BINDINGS_FILE_VERSION = 1;
const CURRENT_BINDINGS_ID_PREFIX = "generic:";
let bindingsLoaded = false;
let persistPromise: Promise<void> = Promise.resolve();
const bindingsByConversationKey = new Map<string, SessionBindingRecord>();
function normalizeConversationRef(ref: ConversationRef): ConversationRef {
return {
channel: ref.channel.trim().toLowerCase(),
accountId: normalizeAccountId(ref.accountId),
conversationId: ref.conversationId.trim(),
parentConversationId: ref.parentConversationId?.trim() || undefined,
};
}
function buildConversationKey(ref: ConversationRef): string {
const normalized = normalizeConversationRef(ref);
return [
normalized.channel,
normalized.accountId,
normalized.parentConversationId ?? "",
normalized.conversationId,
].join("\u241f");
}
function buildBindingId(ref: ConversationRef): string {
return `${CURRENT_BINDINGS_ID_PREFIX}${buildConversationKey(ref)}`;
}
function resolveBindingsFilePath(env: NodeJS.ProcessEnv = process.env): string {
return path.join(resolveStateDir(env), "bindings", "current-conversations.json");
}
function isBindingExpired(record: SessionBindingRecord, now = Date.now()): boolean {
return typeof record.expiresAt === "number" && Number.isFinite(record.expiresAt)
? record.expiresAt <= now
: false;
}
function toPersistedFile(): PersistedCurrentConversationBindingsFile {
const bindings = [...bindingsByConversationKey.values()]
.filter((record) => !isBindingExpired(record))
.toSorted((a, b) => a.bindingId.localeCompare(b.bindingId));
return {
version: CURRENT_BINDINGS_FILE_VERSION,
bindings,
};
}
function loadBindingsIntoMemory(): void {
if (bindingsLoaded) {
return;
}
bindingsLoaded = true;
bindingsByConversationKey.clear();
const parsed = loadJsonFile(resolveBindingsFilePath()) as
| PersistedCurrentConversationBindingsFile
| undefined;
const bindings = parsed?.version === CURRENT_BINDINGS_FILE_VERSION ? parsed.bindings : [];
for (const record of bindings ?? []) {
if (!record?.bindingId || !record?.conversation?.conversationId || isBindingExpired(record)) {
continue;
}
bindingsByConversationKey.set(buildConversationKey(record.conversation), {
...record,
conversation: normalizeConversationRef(record.conversation),
});
}
}
async function persistBindingsToDisk(): Promise<void> {
await writeJsonFileAtomically(resolveBindingsFilePath(), toPersistedFile());
}
function enqueuePersist(): Promise<void> {
persistPromise = persistPromise
.catch(() => {})
.then(async () => {
await persistBindingsToDisk();
});
return persistPromise;
}
function pruneExpiredBinding(key: string): SessionBindingRecord | null {
loadBindingsIntoMemory();
const record = bindingsByConversationKey.get(key) ?? null;
if (!record) {
return null;
}
if (!isBindingExpired(record)) {
return record;
}
bindingsByConversationKey.delete(key);
void enqueuePersist();
return null;
}
function resolveChannelSupportsCurrentConversationBinding(channel: string): boolean {
const normalized =
normalizeAnyChannelId(channel) ?? normalizeConversationText(channel)?.trim().toLowerCase();
if (!normalized) {
return false;
}
const matchesPluginId = (plugin: { id: string; meta?: { aliases?: string[] } }) =>
plugin.id === normalized ||
(plugin.meta?.aliases ?? []).some((alias) => alias.trim().toLowerCase() === normalized);
const plugin =
getActivePluginChannelRegistry()?.channels.find((entry) => matchesPluginId(entry.plugin))
?.plugin ?? bundledChannelPlugins.find((entry) => matchesPluginId(entry));
return plugin?.conversationBindings?.supportsCurrentConversationBinding === true;
}
export function getGenericCurrentConversationBindingCapabilities(params: {
channel: string;
accountId: string;
}): SessionBindingCapabilities | null {
void params.accountId;
if (!resolveChannelSupportsCurrentConversationBinding(params.channel)) {
return null;
}
return {
adapterAvailable: true,
bindSupported: true,
unbindSupported: true,
placements: ["current"],
};
}
export async function bindGenericCurrentConversation(
input: SessionBindingBindInput,
): Promise<SessionBindingRecord | null> {
const conversation = normalizeConversationRef(input.conversation);
const targetSessionKey = input.targetSessionKey.trim();
if (!conversation.channel || !conversation.conversationId || !targetSessionKey) {
return null;
}
loadBindingsIntoMemory();
const now = Date.now();
const ttlMs =
typeof input.ttlMs === "number" && Number.isFinite(input.ttlMs)
? Math.max(0, Math.floor(input.ttlMs))
: undefined;
const key = buildConversationKey(conversation);
const existing = pruneExpiredBinding(key);
const record: SessionBindingRecord = {
bindingId: buildBindingId(conversation),
targetSessionKey,
targetKind: input.targetKind,
conversation,
status: "active",
boundAt: now,
...(ttlMs != null ? { expiresAt: now + ttlMs } : {}),
metadata: {
...existing?.metadata,
...input.metadata,
lastActivityAt: now,
},
};
bindingsByConversationKey.set(key, record);
await enqueuePersist();
return record;
}
export function resolveGenericCurrentConversationBinding(
ref: ConversationRef,
): SessionBindingRecord | null {
return pruneExpiredBinding(buildConversationKey(ref));
}
export function listGenericCurrentConversationBindingsBySession(
targetSessionKey: string,
): SessionBindingRecord[] {
loadBindingsIntoMemory();
const results: SessionBindingRecord[] = [];
for (const key of bindingsByConversationKey.keys()) {
const record = pruneExpiredBinding(key);
if (!record || record.targetSessionKey !== targetSessionKey) {
continue;
}
results.push(record);
}
return results;
}
export function touchGenericCurrentConversationBinding(bindingId: string, at = Date.now()): void {
loadBindingsIntoMemory();
if (!bindingId.startsWith(CURRENT_BINDINGS_ID_PREFIX)) {
return;
}
const key = bindingId.slice(CURRENT_BINDINGS_ID_PREFIX.length);
const record = pruneExpiredBinding(key);
if (!record) {
return;
}
bindingsByConversationKey.set(key, {
...record,
metadata: {
...record.metadata,
lastActivityAt: at,
},
});
}
export async function unbindGenericCurrentConversationBindings(
input: SessionBindingUnbindInput,
): Promise<SessionBindingRecord[]> {
loadBindingsIntoMemory();
const removed: SessionBindingRecord[] = [];
const normalizedBindingId = input.bindingId?.trim();
const normalizedTargetSessionKey = input.targetSessionKey?.trim();
if (normalizedBindingId?.startsWith(CURRENT_BINDINGS_ID_PREFIX)) {
const key = normalizedBindingId.slice(CURRENT_BINDINGS_ID_PREFIX.length);
const record = pruneExpiredBinding(key);
if (record) {
bindingsByConversationKey.delete(key);
removed.push(record);
await enqueuePersist();
}
return removed;
}
if (!normalizedTargetSessionKey) {
return removed;
}
for (const key of bindingsByConversationKey.keys()) {
const record = pruneExpiredBinding(key);
if (!record || record.targetSessionKey !== normalizedTargetSessionKey) {
continue;
}
bindingsByConversationKey.delete(key);
removed.push(record);
}
if (removed.length > 0) {
await enqueuePersist();
}
return removed;
}
export const __testing = {
resetCurrentConversationBindingsForTests(params?: {
deletePersistedFile?: boolean;
env?: NodeJS.ProcessEnv;
}) {
bindingsLoaded = false;
bindingsByConversationKey.clear();
persistPromise = Promise.resolve();
if (params?.deletePersistedFile) {
const filePath = resolveBindingsFilePath(params.env);
try {
fs.rmSync(filePath, { force: true });
} catch {
// ignore test cleanup failures
}
}
},
resolveBindingsFilePath,
};

View File

@@ -1,7 +1,14 @@
import { normalizeChannelId } from "../../channels/registry.js";
import { getActivePluginChannelRegistry } from "../../plugins/runtime.js";
import { normalizeAccountId } from "../../routing/session-key.js";
import { resolveGlobalMap } from "../../shared/global-singleton.js";
import {
__testing as genericCurrentConversationBindingTesting,
bindGenericCurrentConversation,
getGenericCurrentConversationBindingCapabilities,
listGenericCurrentConversationBindingsBySession,
resolveGenericCurrentConversationBinding,
touchGenericCurrentConversationBinding,
unbindGenericCurrentConversationBindings,
} from "./current-conversation-bindings.js";
export type BindingTargetKind = "subagent" | "session";
export type BindingStatus = "active" | "ending" | "ended";
@@ -159,14 +166,6 @@ const ADAPTERS_BY_CHANNEL_ACCOUNT = resolveGlobalMap<string, SessionBindingAdapt
SESSION_BINDING_ADAPTERS_KEY,
);
const GENERIC_SESSION_BINDINGS_KEY = Symbol.for("openclaw.sessionBinding.genericBindings");
const GENERIC_BINDINGS_BY_CONVERSATION = resolveGlobalMap<string, SessionBindingRecord>(
GENERIC_SESSION_BINDINGS_KEY,
);
const GENERIC_BINDING_ID_PREFIX = "generic:";
function getActiveAdapterForKey(key: string): SessionBindingAdapter | null {
const registrations = ADAPTERS_BY_CHANNEL_ACCOUNT.get(key);
return registrations?.[0]?.normalizedAdapter ?? null;
@@ -239,150 +238,6 @@ function resolveAdapterForChannelAccount(params: {
return getActiveAdapterForKey(key);
}
function supportsGenericCurrentConversationBindings(params: {
channel: string;
accountId: string;
}): boolean {
void params.accountId;
const normalizedChannel = params.channel.trim().toLowerCase();
return Boolean(
normalizeChannelId(params.channel) ||
getActivePluginChannelRegistry()?.channels.some(
(entry) =>
entry.plugin.id.trim().toLowerCase() === normalizedChannel ||
(entry.plugin.meta?.aliases ?? []).some(
(alias) => alias.trim().toLowerCase() === normalizedChannel,
),
),
);
}
function buildGenericConversationKey(ref: ConversationRef): string {
const normalized = normalizeConversationRef(ref);
return [
normalized.channel,
normalized.accountId,
normalized.parentConversationId ?? "",
normalized.conversationId,
].join("\u241f");
}
function buildGenericBindingId(ref: ConversationRef): string {
return `${GENERIC_BINDING_ID_PREFIX}${buildGenericConversationKey(ref)}`;
}
function isGenericBindingExpired(record: SessionBindingRecord, now = Date.now()): boolean {
return typeof record.expiresAt === "number" && Number.isFinite(record.expiresAt)
? record.expiresAt <= now
: false;
}
function pruneExpiredGenericBinding(key: string): SessionBindingRecord | null {
const record = GENERIC_BINDINGS_BY_CONVERSATION.get(key) ?? null;
if (!record) {
return null;
}
if (!isGenericBindingExpired(record)) {
return record;
}
GENERIC_BINDINGS_BY_CONVERSATION.delete(key);
return null;
}
function bindGenericConversation(input: SessionBindingBindInput): SessionBindingRecord | null {
const conversation = normalizeConversationRef(input.conversation);
const targetSessionKey = input.targetSessionKey.trim();
if (!conversation.channel || !conversation.conversationId || !targetSessionKey) {
return null;
}
const now = Date.now();
const key = buildGenericConversationKey(conversation);
const existing = pruneExpiredGenericBinding(key);
const ttlMs =
typeof input.ttlMs === "number" && Number.isFinite(input.ttlMs)
? Math.max(0, Math.floor(input.ttlMs))
: undefined;
const metadata = {
...existing?.metadata,
...input.metadata,
lastActivityAt: now,
};
const record: SessionBindingRecord = {
bindingId: buildGenericBindingId(conversation),
targetSessionKey,
targetKind: input.targetKind,
conversation,
status: "active",
boundAt: now,
...(ttlMs != null ? { expiresAt: now + ttlMs } : {}),
metadata,
};
GENERIC_BINDINGS_BY_CONVERSATION.set(key, record);
return record;
}
function listGenericBindingsBySession(targetSessionKey: string): SessionBindingRecord[] {
const results: SessionBindingRecord[] = [];
for (const key of GENERIC_BINDINGS_BY_CONVERSATION.keys()) {
const active = pruneExpiredGenericBinding(key);
if (!active || active.targetSessionKey !== targetSessionKey) {
continue;
}
results.push(active);
}
return results;
}
function resolveGenericBindingByConversation(ref: ConversationRef): SessionBindingRecord | null {
const key = buildGenericConversationKey(ref);
return pruneExpiredGenericBinding(key);
}
function touchGenericBinding(bindingId: string, at = Date.now()): void {
if (!bindingId.startsWith(GENERIC_BINDING_ID_PREFIX)) {
return;
}
const key = bindingId.slice(GENERIC_BINDING_ID_PREFIX.length);
const record = pruneExpiredGenericBinding(key);
if (!record) {
return;
}
GENERIC_BINDINGS_BY_CONVERSATION.set(key, {
...record,
metadata: {
...record.metadata,
lastActivityAt: at,
},
});
}
function unbindGenericBindings(input: SessionBindingUnbindInput): SessionBindingRecord[] {
const removed: SessionBindingRecord[] = [];
const normalizedBindingId = input.bindingId?.trim();
const normalizedTargetSessionKey = input.targetSessionKey?.trim();
if (normalizedBindingId?.startsWith(GENERIC_BINDING_ID_PREFIX)) {
const key = normalizedBindingId.slice(GENERIC_BINDING_ID_PREFIX.length);
const record = pruneExpiredGenericBinding(key);
if (record) {
GENERIC_BINDINGS_BY_CONVERSATION.delete(key);
removed.push(record);
}
return removed;
}
if (!normalizedTargetSessionKey) {
return removed;
}
for (const key of GENERIC_BINDINGS_BY_CONVERSATION.keys()) {
const active = pruneExpiredGenericBinding(key);
if (!active || active.targetSessionKey !== normalizedTargetSessionKey) {
continue;
}
GENERIC_BINDINGS_BY_CONVERSATION.delete(key);
removed.push(active);
}
return removed;
}
function getActiveRegisteredAdapters(): SessionBindingAdapter[] {
return [...ADAPTERS_BY_CHANNEL_ACCOUNT.values()]
.map((registrations) => registrations[0]?.normalizedAdapter ?? null)
@@ -406,12 +261,11 @@ function createDefaultSessionBindingService(): SessionBindingService {
const normalizedConversation = normalizeConversationRef(input.conversation);
const adapter = resolveAdapterForConversation(normalizedConversation);
if (!adapter) {
if (
supportsGenericCurrentConversationBindings({
channel: normalizedConversation.channel,
accountId: normalizedConversation.accountId,
})
) {
const genericCapabilities = getGenericCurrentConversationBindingCapabilities({
channel: normalizedConversation.channel,
accountId: normalizedConversation.accountId,
});
if (genericCapabilities?.bindSupported) {
const placement =
normalizePlacement(input.placement) ?? inferDefaultPlacement(normalizedConversation);
if (placement !== "current") {
@@ -425,7 +279,7 @@ function createDefaultSessionBindingService(): SessionBindingService {
},
);
}
const bound = bindGenericConversation({
const bound = await bindGenericCurrentConversation({
...input,
conversation: normalizedConversation,
placement,
@@ -499,13 +353,15 @@ function createDefaultSessionBindingService(): SessionBindingService {
channel: params.channel,
accountId: params.accountId,
});
if (!adapter && supportsGenericCurrentConversationBindings(params)) {
return {
adapterAvailable: true,
bindSupported: true,
unbindSupported: true,
placements: ["current"],
};
if (!adapter) {
return (
getGenericCurrentConversationBindingCapabilities(params) ?? {
adapterAvailable: false,
bindSupported: false,
unbindSupported: false,
placements: [],
}
);
}
return resolveAdapterCapabilities(adapter);
},
@@ -521,7 +377,7 @@ function createDefaultSessionBindingService(): SessionBindingService {
results.push(...entries);
}
}
results.push(...listGenericBindingsBySession(key));
results.push(...listGenericCurrentConversationBindingsBySession(key));
return dedupeBindings(results);
},
resolveByConversation: (ref) => {
@@ -531,7 +387,7 @@ function createDefaultSessionBindingService(): SessionBindingService {
}
const adapter = resolveAdapterForConversation(normalized);
if (!adapter) {
return resolveGenericBindingByConversation(normalized);
return resolveGenericCurrentConversationBinding(normalized);
}
return adapter.resolveByConversation(normalized);
},
@@ -543,7 +399,7 @@ function createDefaultSessionBindingService(): SessionBindingService {
for (const adapter of getActiveRegisteredAdapters()) {
adapter.touch?.(normalizedBindingId, at);
}
touchGenericBinding(normalizedBindingId, at);
touchGenericCurrentConversationBinding(normalizedBindingId, at);
},
unbind: async (input) => {
const removed: SessionBindingRecord[] = [];
@@ -556,7 +412,7 @@ function createDefaultSessionBindingService(): SessionBindingService {
removed.push(...entries);
}
}
removed.push(...unbindGenericBindings(input));
removed.push(...(await unbindGenericCurrentConversationBindings(input)));
return dedupeBindings(removed);
},
};
@@ -571,7 +427,9 @@ export function getSessionBindingService(): SessionBindingService {
export const __testing = {
resetSessionBindingAdaptersForTests() {
ADAPTERS_BY_CHANNEL_ACCOUNT.clear();
GENERIC_BINDINGS_BY_CONVERSATION.clear();
genericCurrentConversationBindingTesting.resetCurrentConversationBindingsForTests({
deletePersistedFile: true,
});
},
getRegisteredAdapterKeys() {
return [...ADAPTERS_BY_CHANNEL_ACCOUNT.keys()];

View File

@@ -519,6 +519,10 @@ export function createChatChannelPlugin<
}): ChannelPlugin<TResolvedAccount, Probe, Audit> {
return {
...params.base,
conversationBindings: {
supportsCurrentConversationBinding: true,
...params.base.conversationBindings,
},
...(params.security ? { security: resolveChatChannelSecurity(params.security) } : {}),
...(params.pairing ? { pairing: resolveChatChannelPairing(params.pairing) } : {}),
...(params.threading ? { threading: resolveChatChannelThreading(params.threading) } : {}),