Channels: add global threading and directory contracts

This commit is contained in:
Vincent Koc
2026-03-16 08:39:17 -07:00
parent 750ce393bc
commit 0f013575f8
6 changed files with 476 additions and 0 deletions

View File

@@ -0,0 +1,12 @@
import { describe } from "vitest";
import { directoryContractRegistry } from "./registry.js";
import { installChannelDirectoryContractSuite } from "./suites.js";
for (const entry of directoryContractRegistry) {
describe(`${entry.id} directory contract`, () => {
installChannelDirectoryContractSuite({
plugin: entry.plugin,
invokeLookups: entry.invokeLookups,
});
});
}

View File

@@ -1,10 +1,12 @@
import { describe, expect, it } from "vitest";
import {
actionContractRegistry,
directoryContractRegistry,
pluginContractRegistry,
setupContractRegistry,
statusContractRegistry,
surfaceContractRegistry,
threadingContractRegistry,
type ChannelPluginSurface,
} from "./registry.js";
@@ -70,4 +72,26 @@ describe("channel contract registry", () => {
expect(statusSurfaceIds.has(entry.id)).toBe(true);
}
});
it("only installs deep threading coverage for plugins that declare threading", () => {
const threadingSurfaceIds = new Set(
surfaceContractRegistry
.filter((entry) => entry.surfaces.includes("threading"))
.map((entry) => entry.id),
);
for (const entry of threadingContractRegistry) {
expect(threadingSurfaceIds.has(entry.id)).toBe(true);
}
});
it("only installs deep directory coverage for plugins that declare directory", () => {
const directorySurfaceIds = new Set(
surfaceContractRegistry
.filter((entry) => entry.surfaces.includes("directory"))
.map((entry) => entry.id),
);
for (const entry of directoryContractRegistry) {
expect(directorySurfaceIds.has(entry.id)).toBe(true);
}
});
});

View File

@@ -84,6 +84,17 @@ type SurfaceContractEntry = {
surfaces: readonly ChannelPluginSurface[];
};
type ThreadingContractEntry = {
id: string;
plugin: Pick<ChannelPlugin, "id" | "threading">;
};
type DirectoryContractEntry = {
id: string;
plugin: Pick<ChannelPlugin, "id" | "directory">;
invokeLookups: boolean;
};
const telegramListActionsMock = vi.fn();
const telegramGetCapabilitiesMock = vi.fn();
const discordListActionsMock = vi.fn();
@@ -672,3 +683,20 @@ export const surfaceContractRegistry: SurfaceContractEntry[] = [
],
},
];
export const threadingContractRegistry: ThreadingContractEntry[] = surfaceContractRegistry
.filter((entry) => entry.surfaces.includes("threading"))
.map((entry) => ({
id: entry.id,
plugin: entry.plugin,
}));
const directoryShapeOnlyIds = new Set(["matrix", "whatsapp", "zalouser"]);
export const directoryContractRegistry: DirectoryContractEntry[] = surfaceContractRegistry
.filter((entry) => entry.surfaces.includes("directory"))
.map((entry) => ({
id: entry.id,
plugin: entry.plugin,
invokeLookups: !directoryShapeOnlyIds.has(entry.id),
}));

View File

@@ -0,0 +1,151 @@
import { beforeEach, describe, expect } from "vitest";
import {
__testing as feishuThreadBindingTesting,
createFeishuThreadBindingManager,
} from "../../../../extensions/feishu/src/thread-bindings.js";
import {
__testing as telegramThreadBindingTesting,
createTelegramThreadBindingManager,
} from "../../../../extensions/telegram/src/thread-bindings.js";
import type { OpenClawConfig } from "../../../config/config.js";
import {
__testing as sessionBindingTesting,
getSessionBindingService,
} from "../../../infra/outbound/session-binding-service.js";
import { installSessionBindingContractSuite } from "./suites.js";
const baseCfg = {
session: { mainKey: "main", scope: "per-sender" },
} satisfies OpenClawConfig;
beforeEach(() => {
sessionBindingTesting.resetSessionBindingAdaptersForTests();
feishuThreadBindingTesting.resetFeishuThreadBindingsForTests();
telegramThreadBindingTesting.resetTelegramThreadBindingsForTests();
});
describe("feishu session binding contract", () => {
installSessionBindingContractSuite({
expectedCapabilities: {
adapterAvailable: true,
bindSupported: true,
unbindSupported: true,
placements: ["current"],
},
getCapabilities: () => {
createFeishuThreadBindingManager({ cfg: baseCfg, accountId: "default" });
return getSessionBindingService().getCapabilities({
channel: "feishu",
accountId: "default",
});
},
bindAndResolve: async () => {
createFeishuThreadBindingManager({ cfg: baseCfg, accountId: "default" });
const service = getSessionBindingService();
const binding = await service.bind({
targetSessionKey: "agent:codex:acp:binding:feishu:default:abc123",
targetKind: "session",
conversation: {
channel: "feishu",
accountId: "default",
conversationId: "oc_group_chat:topic:om_topic_root",
parentConversationId: "oc_group_chat",
},
placement: "current",
metadata: {
agentId: "codex",
label: "codex-main",
},
});
expect(
service.resolveByConversation({
channel: "feishu",
accountId: "default",
conversationId: "oc_group_chat:topic:om_topic_root",
}),
)?.toMatchObject({
targetSessionKey: "agent:codex:acp:binding:feishu:default:abc123",
});
return binding;
},
cleanup: async () => {
const manager = createFeishuThreadBindingManager({ cfg: baseCfg, accountId: "default" });
manager.stop();
expect(
getSessionBindingService().resolveByConversation({
channel: "feishu",
accountId: "default",
conversationId: "oc_group_chat:topic:om_topic_root",
}),
).toBeNull();
},
});
});
describe("telegram session binding contract", () => {
installSessionBindingContractSuite({
expectedCapabilities: {
adapterAvailable: true,
bindSupported: true,
unbindSupported: true,
placements: ["current"],
},
getCapabilities: () => {
createTelegramThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: false,
});
return getSessionBindingService().getCapabilities({
channel: "telegram",
accountId: "default",
});
},
bindAndResolve: async () => {
createTelegramThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: false,
});
const service = getSessionBindingService();
const binding = await service.bind({
targetSessionKey: "agent:main:subagent:child-1",
targetKind: "subagent",
conversation: {
channel: "telegram",
accountId: "default",
conversationId: "-100200300:topic:77",
},
placement: "current",
metadata: {
boundBy: "user-1",
},
});
expect(
service.resolveByConversation({
channel: "telegram",
accountId: "default",
conversationId: "-100200300:topic:77",
}),
)?.toMatchObject({
targetSessionKey: "agent:main:subagent:child-1",
});
return binding;
},
cleanup: async () => {
const manager = createTelegramThreadBindingManager({
accountId: "default",
persist: false,
enableSweeper: false,
});
manager.stop();
expect(
getSessionBindingService().resolveByConversation({
channel: "telegram",
accountId: "default",
conversationId: "-100200300:topic:77",
}),
).toBeNull();
},
});
});

View File

@@ -5,13 +5,21 @@ import type {
ResolveProviderRuntimeGroupPolicyParams,
RuntimeGroupPolicyResolution,
} from "../../../config/runtime-group-policy.js";
import type {
SessionBindingCapabilities,
SessionBindingRecord,
} from "../../../infra/outbound/session-binding-service.js";
import { normalizeChatType } from "../../chat-type.js";
import { resolveConversationLabel } from "../../conversation-label.js";
import { validateSenderIdentity } from "../../sender-identity.js";
import type {
ChannelAccountSnapshot,
ChannelAccountState,
ChannelDirectoryEntry,
ChannelFocusedBindingContext,
ChannelReplyTransport,
ChannelSetupInput,
ChannelThreadingToolContext,
} from "../types.core.js";
import type {
ChannelMessageActionName,
@@ -23,6 +31,68 @@ function sortStrings(values: readonly string[]) {
return [...values].toSorted((left, right) => left.localeCompare(right));
}
function expectDirectoryEntryShape(entry: ChannelDirectoryEntry) {
expect(["user", "group", "channel"]).toContain(entry.kind);
expect(typeof entry.id).toBe("string");
expect(entry.id.trim()).not.toBe("");
if (entry.name !== undefined) {
expect(typeof entry.name).toBe("string");
}
if (entry.handle !== undefined) {
expect(typeof entry.handle).toBe("string");
}
if (entry.avatarUrl !== undefined) {
expect(typeof entry.avatarUrl).toBe("string");
}
if (entry.rank !== undefined) {
expect(typeof entry.rank).toBe("number");
}
}
function expectThreadingToolContextShape(context: ChannelThreadingToolContext) {
if (context.currentChannelId !== undefined) {
expect(typeof context.currentChannelId).toBe("string");
}
if (context.currentChannelProvider !== undefined) {
expect(typeof context.currentChannelProvider).toBe("string");
}
if (context.currentThreadTs !== undefined) {
expect(typeof context.currentThreadTs).toBe("string");
}
if (context.currentMessageId !== undefined) {
expect(["string", "number"]).toContain(typeof context.currentMessageId);
}
if (context.replyToMode !== undefined) {
expect(["off", "first", "all"]).toContain(context.replyToMode);
}
if (context.hasRepliedRef !== undefined) {
expect(typeof context.hasRepliedRef).toBe("object");
}
if (context.skipCrossContextDecoration !== undefined) {
expect(typeof context.skipCrossContextDecoration).toBe("boolean");
}
}
function expectReplyTransportShape(transport: ChannelReplyTransport) {
if (transport.replyToId !== undefined && transport.replyToId !== null) {
expect(typeof transport.replyToId).toBe("string");
}
if (transport.threadId !== undefined && transport.threadId !== null) {
expect(["string", "number"]).toContain(typeof transport.threadId);
}
}
function expectFocusedBindingShape(binding: ChannelFocusedBindingContext) {
expect(typeof binding.conversationId).toBe("string");
expect(binding.conversationId.trim()).not.toBe("");
if (binding.parentConversationId !== undefined) {
expect(typeof binding.parentConversationId).toBe("string");
}
expect(["current", "child"]).toContain(binding.placement);
expect(typeof binding.labelNoun).toBe("string");
expect(binding.labelNoun.trim()).not.toBe("");
}
export function installChannelPluginContractSuite(params: {
plugin: Pick<ChannelPlugin, "id" | "meta" | "capabilities" | "config">;
}) {
@@ -228,6 +298,186 @@ export function installChannelSurfaceContractSuite(params: {
});
}
export function installChannelThreadingContractSuite(params: {
plugin: Pick<ChannelPlugin, "id" | "threading">;
}) {
it("exposes the base threading contract", () => {
expect(params.plugin.threading).toBeDefined();
});
it("keeps threading return values normalized", () => {
const threading = params.plugin.threading;
expect(threading).toBeDefined();
if (threading?.resolveReplyToMode) {
expect(
["off", "first", "all"].includes(
threading.resolveReplyToMode({
cfg: {} as OpenClawConfig,
accountId: "default",
chatType: "group",
}),
),
).toBe(true);
}
const repliedRef = { value: false };
const toolContext = threading?.buildToolContext?.({
cfg: {} as OpenClawConfig,
accountId: "default",
context: {
Channel: "group:test",
From: "user:test",
To: "group:test",
ChatType: "group",
CurrentMessageId: "msg-1",
ReplyToId: "msg-0",
ReplyToIdFull: "thread-0",
MessageThreadId: "thread-0",
NativeChannelId: "native:test",
},
hasRepliedRef: repliedRef,
});
if (toolContext) {
expectThreadingToolContextShape(toolContext);
if (toolContext.hasRepliedRef) {
expect(toolContext.hasRepliedRef).toBe(repliedRef);
}
}
const autoThreadId = threading?.resolveAutoThreadId?.({
cfg: {} as OpenClawConfig,
accountId: "default",
to: "group:test",
toolContext,
replyToId: null,
});
if (autoThreadId !== undefined) {
expect(typeof autoThreadId).toBe("string");
expect(autoThreadId.trim()).not.toBe("");
}
const replyTransport = threading?.resolveReplyTransport?.({
cfg: {} as OpenClawConfig,
accountId: "default",
threadId: "thread-0",
replyToId: "msg-0",
});
if (replyTransport) {
expectReplyTransportShape(replyTransport);
}
const focusedBinding = threading?.resolveFocusedBinding?.({
cfg: {} as OpenClawConfig,
accountId: "default",
context: {
Channel: "group:test",
From: "user:test",
To: "group:test",
ChatType: "group",
CurrentMessageId: "msg-1",
ReplyToId: "msg-0",
ReplyToIdFull: "thread-0",
MessageThreadId: "thread-0",
NativeChannelId: "native:test",
},
});
if (focusedBinding) {
expectFocusedBindingShape(focusedBinding);
}
});
}
export function installChannelDirectoryContractSuite(params: {
plugin: Pick<ChannelPlugin, "id" | "directory">;
invokeLookups?: boolean;
}) {
it("exposes the base directory contract", async () => {
const directory = params.plugin.directory;
expect(directory).toBeDefined();
if (params.invokeLookups === false) {
return;
}
const self = await directory?.self?.({
cfg: {} as OpenClawConfig,
accountId: "default",
});
if (self) {
expectDirectoryEntryShape(self);
}
const peers =
(await directory?.listPeers?.({
cfg: {} as OpenClawConfig,
accountId: "default",
query: "",
limit: 5,
})) ?? [];
expect(Array.isArray(peers)).toBe(true);
for (const peer of peers) {
expectDirectoryEntryShape(peer);
}
const groups =
(await directory?.listGroups?.({
cfg: {} as OpenClawConfig,
accountId: "default",
query: "",
limit: 5,
})) ?? [];
expect(Array.isArray(groups)).toBe(true);
for (const group of groups) {
expectDirectoryEntryShape(group);
}
if (directory?.listGroupMembers && groups[0]?.id) {
const members = await directory.listGroupMembers({
cfg: {} as OpenClawConfig,
accountId: "default",
groupId: groups[0].id,
query: "",
limit: 5,
});
expect(Array.isArray(members)).toBe(true);
for (const member of members) {
expectDirectoryEntryShape(member);
}
}
});
}
export function installSessionBindingContractSuite(params: {
getCapabilities: () => SessionBindingCapabilities;
bindAndResolve: () => Promise<SessionBindingRecord>;
cleanup: () => Promise<void> | void;
expectedCapabilities: SessionBindingCapabilities;
}) {
it("registers the expected session binding capabilities", () => {
expect(params.getCapabilities()).toEqual(params.expectedCapabilities);
});
it("binds and resolves a session binding through the shared service", async () => {
const binding = await params.bindAndResolve();
expect(typeof binding.bindingId).toBe("string");
expect(binding.bindingId.trim()).not.toBe("");
expect(typeof binding.targetSessionKey).toBe("string");
expect(binding.targetSessionKey.trim()).not.toBe("");
expect(["session", "subagent"]).toContain(binding.targetKind);
expect(typeof binding.conversation.channel).toBe("string");
expect(typeof binding.conversation.accountId).toBe("string");
expect(typeof binding.conversation.conversationId).toBe("string");
expect(["active", "ending", "ended"]).toContain(binding.status);
expect(typeof binding.boundAt).toBe("number");
});
it("cleans up registered bindings", async () => {
await params.cleanup();
});
}
type ChannelSetupContractCase<ResolvedAccount> = {
name: string;
cfg: OpenClawConfig;

View File

@@ -0,0 +1,11 @@
import { describe } from "vitest";
import { threadingContractRegistry } from "./registry.js";
import { installChannelThreadingContractSuite } from "./suites.js";
for (const entry of threadingContractRegistry) {
describe(`${entry.id} threading contract`, () => {
installChannelThreadingContractSuite({
plugin: entry.plugin,
});
});
}