From 74d0c39b3274f4312b7ed5829cd98fa35852fb76 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 16 Mar 2026 00:40:32 -0700 Subject: [PATCH] refactor: move session lifecycle and outbound fallbacks into plugins --- .../reply/commands-session-lifecycle.test.ts | 54 +++++--- src/auto-reply/reply/commands-session.ts | 37 ++--- ...sessions.gateway-server-sessions-a.test.ts | 20 ++- src/gateway/session-reset-service.ts | 5 +- src/infra/exec-approval-forwarder.ts | 12 +- .../outbound/built-in-channel-adapters.ts | 127 ------------------ .../outbound/built-in-channel-messaging.ts | 41 ------ src/infra/outbound/channel-adapters.ts | 5 +- src/infra/outbound/targets.test.ts | 6 +- src/infra/outbound/targets.ts | 8 +- src/plugins/runtime/runtime-discord.ts | 20 +++ src/plugins/runtime/runtime-telegram.ts | 8 ++ src/plugins/runtime/types-channel.ts | 14 ++ 13 files changed, 114 insertions(+), 243 deletions(-) delete mode 100644 src/infra/outbound/built-in-channel-adapters.ts delete mode 100644 src/infra/outbound/built-in-channel-messaging.ts diff --git a/src/auto-reply/reply/commands-session-lifecycle.test.ts b/src/auto-reply/reply/commands-session-lifecycle.test.ts index 92812abaae6..bb56ef82bd9 100644 --- a/src/auto-reply/reply/commands-session-lifecycle.test.ts +++ b/src/auto-reply/reply/commands-session-lifecycle.test.ts @@ -1,6 +1,9 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; +import { telegramPlugin } from "../../../extensions/telegram/src/channel.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { SessionBindingRecord } from "../../infra/outbound/session-binding-service.js"; +import { setActivePluginRegistry } from "../../plugins/runtime.js"; +import { createTestRegistry } from "../../test-utils/channel-plugins.js"; const hoisted = vi.hoisted(() => { const getThreadBindingManagerMock = vi.fn(); @@ -19,28 +22,34 @@ const hoisted = vi.hoisted(() => { }; }); -vi.mock("../../../extensions/discord/src/monitor/thread-bindings.js", async (importOriginal) => { - const actual = - await importOriginal< - typeof import("../../../extensions/discord/src/monitor/thread-bindings.js") - >(); +vi.mock("../../plugins/runtime/index.js", async () => { + const discordThreadBindings = await vi.importActual< + typeof import("../../../extensions/discord/src/monitor/thread-bindings.js") + >("../../../extensions/discord/src/monitor/thread-bindings.js"); return { - ...actual, - getThreadBindingManager: hoisted.getThreadBindingManagerMock, - setThreadBindingIdleTimeoutBySessionKey: hoisted.setThreadBindingIdleTimeoutBySessionKeyMock, - setThreadBindingMaxAgeBySessionKey: hoisted.setThreadBindingMaxAgeBySessionKeyMock, - }; -}); - -vi.mock("../../../extensions/telegram/src/thread-bindings.js", async (importOriginal) => { - const actual = - await importOriginal(); - return { - ...actual, - setTelegramThreadBindingIdleTimeoutBySessionKey: - hoisted.setTelegramThreadBindingIdleTimeoutBySessionKeyMock, - setTelegramThreadBindingMaxAgeBySessionKey: - hoisted.setTelegramThreadBindingMaxAgeBySessionKeyMock, + createPluginRuntime: () => ({ + channel: { + discord: { + threadBindings: { + getManager: hoisted.getThreadBindingManagerMock, + resolveIdleTimeoutMs: discordThreadBindings.resolveThreadBindingIdleTimeoutMs, + resolveInactivityExpiresAt: + discordThreadBindings.resolveThreadBindingInactivityExpiresAt, + resolveMaxAgeMs: discordThreadBindings.resolveThreadBindingMaxAgeMs, + resolveMaxAgeExpiresAt: discordThreadBindings.resolveThreadBindingMaxAgeExpiresAt, + setIdleTimeoutBySessionKey: hoisted.setThreadBindingIdleTimeoutBySessionKeyMock, + setMaxAgeBySessionKey: hoisted.setThreadBindingMaxAgeBySessionKeyMock, + unbindBySessionKey: vi.fn(), + }, + }, + telegram: { + threadBindings: { + setIdleTimeoutBySessionKey: hoisted.setTelegramThreadBindingIdleTimeoutBySessionKeyMock, + setMaxAgeBySessionKey: hoisted.setTelegramThreadBindingMaxAgeBySessionKeyMock, + }, + }, + }, + }), }; }); @@ -168,6 +177,9 @@ function createFakeThreadBindingManager(binding: FakeBinding | null) { describe("/session idle and /session max-age", () => { beforeEach(() => { + setActivePluginRegistry( + createTestRegistry([{ pluginId: "telegram", source: "test", plugin: telegramPlugin }]), + ); hoisted.getThreadBindingManagerMock.mockReset(); hoisted.setThreadBindingIdleTimeoutBySessionKeyMock.mockReset(); hoisted.setThreadBindingMaxAgeBySessionKeyMock.mockReset(); diff --git a/src/auto-reply/reply/commands-session.ts b/src/auto-reply/reply/commands-session.ts index b04d5112345..07f984eb92e 100644 --- a/src/auto-reply/reply/commands-session.ts +++ b/src/auto-reply/reply/commands-session.ts @@ -1,18 +1,5 @@ -import { - formatThreadBindingDurationLabel, - getThreadBindingManager, - resolveThreadBindingIdleTimeoutMs, - resolveThreadBindingInactivityExpiresAt, - resolveThreadBindingMaxAgeExpiresAt, - resolveThreadBindingMaxAgeMs, - setThreadBindingIdleTimeoutBySessionKey, - setThreadBindingMaxAgeBySessionKey, -} from "../../../extensions/discord/src/monitor/thread-bindings.js"; -import { - setTelegramThreadBindingIdleTimeoutBySessionKey, - setTelegramThreadBindingMaxAgeBySessionKey, -} from "../../../extensions/telegram/src/thread-bindings.js"; import { resolveFastModeState } from "../../agents/fast-mode.js"; +import { formatThreadBindingDurationLabel } from "../../channels/thread-bindings-messages.js"; import { parseDurationMs } from "../../cli/parse-duration.js"; import { isRestartEnabled } from "../../config/commands.js"; import { logVerbose } from "../../globals.js"; @@ -20,6 +7,7 @@ import { getSessionBindingService } from "../../infra/outbound/session-binding-s import type { SessionBindingRecord } from "../../infra/outbound/session-binding-service.js"; import { scheduleGatewaySigusr1Restart, triggerOpenClawRestart } from "../../infra/restart.js"; import { loadCostUsageSummary, loadSessionCostSummary } from "../../infra/session-cost-usage.js"; +import { createPluginRuntime } from "../../plugins/runtime/index.js"; import { formatTokenCount, formatUsd } from "../../utils/usage-format.js"; import { parseActivationCommand } from "../group-activation.js"; import { parseSendPolicyCommand } from "../send-policy.js"; @@ -34,6 +22,7 @@ const SESSION_COMMAND_PREFIX = "/session"; const SESSION_DURATION_OFF_VALUES = new Set(["off", "disable", "disabled", "none", "0"]); const SESSION_ACTION_IDLE = "idle"; const SESSION_ACTION_MAX_AGE = "max-age"; +const channelRuntime = createPluginRuntime().channel; function resolveSessionCommandUsage() { return "Usage: /session idle | /session max-age (example: /session idle 24h)"; @@ -385,7 +374,9 @@ export const handleSessionCommand: CommandHandler = async (params, allowTextComm params.ctx.MessageThreadId != null ? String(params.ctx.MessageThreadId).trim() : ""; const telegramConversationId = onTelegram ? resolveTelegramConversationId(params) : undefined; - const discordManager = onDiscord ? getThreadBindingManager(accountId) : null; + const discordManager = onDiscord + ? channelRuntime.discord.threadBindings.getManager(accountId) + : null; if (onDiscord && !discordManager) { return { shouldContinue: false, @@ -433,13 +424,13 @@ export const handleSessionCommand: CommandHandler = async (params, allowTextComm } const idleTimeoutMs = onDiscord - ? resolveThreadBindingIdleTimeoutMs({ + ? channelRuntime.discord.threadBindings.resolveIdleTimeoutMs({ record: discordBinding!, defaultIdleTimeoutMs: discordManager!.getIdleTimeoutMs(), }) : resolveTelegramBindingDurationMs(telegramBinding!, "idleTimeoutMs", 24 * 60 * 60 * 1000); const idleExpiresAt = onDiscord - ? resolveThreadBindingInactivityExpiresAt({ + ? channelRuntime.discord.threadBindings.resolveInactivityExpiresAt({ record: discordBinding!, defaultIdleTimeoutMs: discordManager!.getIdleTimeoutMs(), }) @@ -447,13 +438,13 @@ export const handleSessionCommand: CommandHandler = async (params, allowTextComm ? resolveTelegramBindingLastActivityAt(telegramBinding!) + idleTimeoutMs : undefined; const maxAgeMs = onDiscord - ? resolveThreadBindingMaxAgeMs({ + ? channelRuntime.discord.threadBindings.resolveMaxAgeMs({ record: discordBinding!, defaultMaxAgeMs: discordManager!.getMaxAgeMs(), }) : resolveTelegramBindingDurationMs(telegramBinding!, "maxAgeMs", 0); const maxAgeExpiresAt = onDiscord - ? resolveThreadBindingMaxAgeExpiresAt({ + ? channelRuntime.discord.threadBindings.resolveMaxAgeExpiresAt({ record: discordBinding!, defaultMaxAgeMs: discordManager!.getMaxAgeMs(), }) @@ -528,24 +519,24 @@ export const handleSessionCommand: CommandHandler = async (params, allowTextComm const updatedBindings = (() => { if (onDiscord) { return action === SESSION_ACTION_IDLE - ? setThreadBindingIdleTimeoutBySessionKey({ + ? channelRuntime.discord.threadBindings.setIdleTimeoutBySessionKey({ targetSessionKey: discordBinding!.targetSessionKey, accountId, idleTimeoutMs: durationMs, }) - : setThreadBindingMaxAgeBySessionKey({ + : channelRuntime.discord.threadBindings.setMaxAgeBySessionKey({ targetSessionKey: discordBinding!.targetSessionKey, accountId, maxAgeMs: durationMs, }); } return action === SESSION_ACTION_IDLE - ? setTelegramThreadBindingIdleTimeoutBySessionKey({ + ? channelRuntime.telegram.threadBindings.setIdleTimeoutBySessionKey({ targetSessionKey: telegramBinding!.targetSessionKey, accountId, idleTimeoutMs: durationMs, }) - : setTelegramThreadBindingMaxAgeBySessionKey({ + : channelRuntime.telegram.threadBindings.setMaxAgeBySessionKey({ targetSessionKey: telegramBinding!.targetSessionKey, accountId, maxAgeMs: durationMs, diff --git a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts index fdce44e33f4..903a52592a3 100644 --- a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts +++ b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts @@ -102,15 +102,21 @@ vi.mock("../plugins/hook-runner-global.js", async (importOriginal) => { }; }); -vi.mock("../../extensions/discord/src/monitor/thread-bindings.js", async (importOriginal) => { - const actual = - await importOriginal< - typeof import("../../extensions/discord/src/monitor/thread-bindings.js") - >(); +vi.mock("../plugins/runtime/runtime-discord.js", async (importOriginal) => { + const actual = await importOriginal(); return { ...actual, - unbindThreadBindingsBySessionKey: (params: unknown) => - threadBindingMocks.unbindThreadBindingsBySessionKey(params), + createRuntimeDiscord: () => { + const runtime = actual.createRuntimeDiscord(); + return { + ...runtime, + threadBindings: { + ...runtime.threadBindings, + unbindBySessionKey: (params: unknown) => + threadBindingMocks.unbindThreadBindingsBySessionKey(params), + }, + }; + }, }; }); diff --git a/src/gateway/session-reset-service.ts b/src/gateway/session-reset-service.ts index b07bf0095dd..d662d60841b 100644 --- a/src/gateway/session-reset-service.ts +++ b/src/gateway/session-reset-service.ts @@ -1,5 +1,4 @@ import { randomUUID } from "node:crypto"; -import { unbindThreadBindingsBySessionKey } from "../../extensions/discord/src/monitor/thread-bindings.js"; import { getAcpSessionManager } from "../acp/control-plane/manager.js"; import { resolveDefaultAgentId } from "../agents/agent-scope.js"; import { clearBootstrapSnapshot } from "../agents/bootstrap-cache.js"; @@ -16,6 +15,7 @@ import { import { logVerbose } from "../globals.js"; import { createInternalHookEvent, triggerInternalHook } from "../hooks/internal-hooks.js"; import { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; +import { createPluginRuntime } from "../plugins/runtime/index.js"; import { isSubagentSessionKey, normalizeAgentId, @@ -31,6 +31,7 @@ import { } from "./session-utils.js"; const ACP_RUNTIME_CLEANUP_TIMEOUT_MS = 15_000; +const channelRuntime = createPluginRuntime().channel; function stripRuntimeModelState(entry?: SessionEntry): SessionEntry | undefined { if (!entry) { @@ -70,7 +71,7 @@ export async function emitSessionUnboundLifecycleEvent(params: { emitHooks?: boolean; }) { const targetKind = isSubagentSessionKey(params.targetSessionKey) ? "subagent" : "acp"; - unbindThreadBindingsBySessionKey({ + channelRuntime.discord.threadBindings.unbindBySessionKey({ targetSessionKey: params.targetSessionKey, targetKind, reason: params.reason, diff --git a/src/infra/exec-approval-forwarder.ts b/src/infra/exec-approval-forwarder.ts index eba329f57ff..93a53d445c7 100644 --- a/src/infra/exec-approval-forwarder.ts +++ b/src/infra/exec-approval-forwarder.ts @@ -22,7 +22,6 @@ import type { ExecApprovalRequest, ExecApprovalResolved, } from "./exec-approvals.js"; -import { resolveBuiltInExecApprovalAdapter } from "./outbound/built-in-channel-adapters.js"; import { deliverOutboundPayloads } from "./outbound/deliver.js"; const log = createSubsystemLogger("gateway/exec-approvals"); @@ -119,8 +118,7 @@ function shouldSkipForwardingFallback(params: { if (!channel) { return false; } - const adapter = - getChannelPlugin(channel)?.execApprovals ?? resolveBuiltInExecApprovalAdapter(channel); + const adapter = getChannelPlugin(channel)?.execApprovals; return ( adapter?.shouldSuppressForwardingFallback?.({ cfg: params.cfg, @@ -278,9 +276,7 @@ function buildRequestPayloadForTarget( ): ReplyPayload { const channel = normalizeMessageChannel(target.channel) ?? target.channel; const pluginPayload = channel - ? ( - getChannelPlugin(channel)?.execApprovals ?? resolveBuiltInExecApprovalAdapter(channel) - )?.buildPendingPayload?.({ + ? getChannelPlugin(channel)?.execApprovals?.buildPendingPayload?.({ cfg, request, target, @@ -415,9 +411,7 @@ export function createExecApprovalForwarder( if (!channel) { return; } - await ( - getChannelPlugin(channel)?.execApprovals ?? resolveBuiltInExecApprovalAdapter(channel) - )?.beforeDeliverPending?.({ + await getChannelPlugin(channel)?.execApprovals?.beforeDeliverPending?.({ cfg, target, payload, diff --git a/src/infra/outbound/built-in-channel-adapters.ts b/src/infra/outbound/built-in-channel-adapters.ts deleted file mode 100644 index 335457dc48c..00000000000 --- a/src/infra/outbound/built-in-channel-adapters.ts +++ /dev/null @@ -1,127 +0,0 @@ -import { Separator, TextDisplay } from "@buape/carbon"; -import { - listDiscordAccountIds, - resolveDiscordAccount, -} from "../../../extensions/discord/src/accounts.js"; -import { isDiscordExecApprovalClientEnabled } from "../../../extensions/discord/src/exec-approvals.js"; -import { DiscordUiContainer } from "../../../extensions/discord/src/ui.js"; -import { listTelegramAccountIds } from "../../../extensions/telegram/src/accounts.js"; -import { buildTelegramExecApprovalButtons } from "../../../extensions/telegram/src/approval-buttons.js"; -import { - isTelegramExecApprovalClientEnabled, - resolveTelegramExecApprovalTarget, -} from "../../../extensions/telegram/src/exec-approvals.js"; -import type { ChannelExecApprovalAdapter } from "../../channels/plugins/types.adapters.js"; -import type { ChannelCrossContextComponentsFactory } from "../../channels/plugins/types.core.js"; -import type { ChannelId } from "../../channels/plugins/types.js"; -import type { OpenClawConfig } from "../../config/config.js"; -import { normalizeMessageChannel } from "../../utils/message-channel.js"; -import { resolveExecApprovalCommandDisplay } from "../exec-approval-command-display.js"; -import { buildExecApprovalPendingReplyPayload } from "../exec-approval-reply.js"; - -const BUILT_IN_DISCORD_CROSS_CONTEXT_COMPONENTS: ChannelCrossContextComponentsFactory = ( - params, -) => { - const trimmed = params.message.trim(); - const components: Array = []; - if (trimmed) { - components.push(new TextDisplay(params.message)); - components.push(new Separator({ divider: true, spacing: "small" })); - } - components.push(new TextDisplay(`*From ${params.originLabel}*`)); - return [new DiscordUiContainer({ cfg: params.cfg, accountId: params.accountId, components })]; -}; - -function hasDiscordExecApprovalDmRoute(cfg: OpenClawConfig): boolean { - return listDiscordAccountIds(cfg).some((accountId) => { - const execApprovals = resolveDiscordAccount({ cfg, accountId }).config.execApprovals; - if (!execApprovals?.enabled || (execApprovals.approvers?.length ?? 0) === 0) { - return false; - } - const target = execApprovals.target ?? "dm"; - return target === "dm" || target === "both"; - }); -} - -function hasTelegramExecApprovalDmRoute(cfg: OpenClawConfig): boolean { - return listTelegramAccountIds(cfg).some((accountId) => { - if (!isTelegramExecApprovalClientEnabled({ cfg, accountId })) { - return false; - } - const target = resolveTelegramExecApprovalTarget({ cfg, accountId }); - return target === "dm" || target === "both"; - }); -} - -const BUILT_IN_DISCORD_EXEC_APPROVALS: ChannelExecApprovalAdapter = { - getInitiatingSurfaceState: ({ cfg, accountId }) => - isDiscordExecApprovalClientEnabled({ cfg, accountId }) - ? { kind: "enabled" } - : { kind: "disabled" }, - hasConfiguredDmRoute: ({ cfg }) => hasDiscordExecApprovalDmRoute(cfg), - shouldSuppressForwardingFallback: ({ cfg, target }) => - (normalizeMessageChannel(target.channel) ?? target.channel) === "discord" && - isDiscordExecApprovalClientEnabled({ cfg, accountId: target.accountId }), -}; - -const BUILT_IN_TELEGRAM_EXEC_APPROVALS: ChannelExecApprovalAdapter = { - getInitiatingSurfaceState: ({ cfg, accountId }) => - isTelegramExecApprovalClientEnabled({ cfg, accountId }) - ? { kind: "enabled" } - : { kind: "disabled" }, - hasConfiguredDmRoute: ({ cfg }) => hasTelegramExecApprovalDmRoute(cfg), - shouldSuppressForwardingFallback: ({ cfg, target, request }) => { - const channel = normalizeMessageChannel(target.channel) ?? target.channel; - if (channel !== "telegram") { - return false; - } - const requestChannel = normalizeMessageChannel(request.request.turnSourceChannel ?? ""); - if (requestChannel !== "telegram") { - return false; - } - const accountId = target.accountId?.trim() || request.request.turnSourceAccountId?.trim(); - return isTelegramExecApprovalClientEnabled({ cfg, accountId }); - }, - buildPendingPayload: ({ request, nowMs }) => { - const payload = buildExecApprovalPendingReplyPayload({ - approvalId: request.id, - approvalSlug: request.id.slice(0, 8), - approvalCommandId: request.id, - command: resolveExecApprovalCommandDisplay(request.request).commandText, - cwd: request.request.cwd ?? undefined, - host: request.request.host === "node" ? "node" : "gateway", - nodeId: request.request.nodeId ?? undefined, - expiresAtMs: request.expiresAtMs, - nowMs, - }); - const buttons = buildTelegramExecApprovalButtons(request.id); - if (!buttons) { - return payload; - } - return { - ...payload, - channelData: { - ...payload.channelData, - telegram: { buttons }, - }, - }; - }, -}; - -export function resolveBuiltInCrossContextComponentsFactory( - channel: ChannelId, -): ChannelCrossContextComponentsFactory | undefined { - return channel === "discord" ? BUILT_IN_DISCORD_CROSS_CONTEXT_COMPONENTS : undefined; -} - -export function resolveBuiltInExecApprovalAdapter( - channel: ChannelId, -): ChannelExecApprovalAdapter | undefined { - if (channel === "discord") { - return BUILT_IN_DISCORD_EXEC_APPROVALS; - } - if (channel === "telegram") { - return BUILT_IN_TELEGRAM_EXEC_APPROVALS; - } - return undefined; -} diff --git a/src/infra/outbound/built-in-channel-messaging.ts b/src/infra/outbound/built-in-channel-messaging.ts deleted file mode 100644 index b06b6c41bc3..00000000000 --- a/src/infra/outbound/built-in-channel-messaging.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { parseTelegramTarget } from "../../../extensions/telegram/src/targets.js"; -import type { ChatType } from "../../channels/chat-type.js"; -import type { ChannelId } from "../../channels/plugins/types.js"; -import { isWhatsAppGroupJid, normalizeWhatsAppTarget } from "../../whatsapp/normalize.js"; - -export type BuiltInExplicitTarget = { - to: string; - threadId?: string | number; - chatType?: ChatType; -}; - -export function resolveBuiltInExplicitTarget( - channel: ChannelId, - raw: string, -): BuiltInExplicitTarget | null { - if (channel === "telegram") { - const target = parseTelegramTarget(raw); - return { - to: target.chatId, - threadId: target.messageThreadId, - chatType: target.chatType === "unknown" ? undefined : target.chatType, - }; - } - - if (channel === "whatsapp") { - const normalized = normalizeWhatsAppTarget(raw); - if (!normalized) { - return null; - } - return { - to: normalized, - chatType: isWhatsAppGroupJid(normalized) ? "group" : "direct", - }; - } - - return null; -} - -export function resolveBuiltInTargetChatType(channel: ChannelId, to: string): ChatType | undefined { - return resolveBuiltInExplicitTarget(channel, to)?.chatType; -} diff --git a/src/infra/outbound/channel-adapters.ts b/src/infra/outbound/channel-adapters.ts index 4f541bf0f84..0c752854e8d 100644 --- a/src/infra/outbound/channel-adapters.ts +++ b/src/infra/outbound/channel-adapters.ts @@ -2,7 +2,6 @@ import type { TopLevelComponents } from "@buape/carbon"; import { getChannelPlugin } from "../../channels/plugins/index.js"; import type { ChannelId } from "../../channels/plugins/types.js"; import type { OpenClawConfig } from "../../config/config.js"; -import { resolveBuiltInCrossContextComponentsFactory } from "./built-in-channel-adapters.js"; export type CrossContextComponentsBuilder = (message: string) => TopLevelComponents[]; @@ -23,9 +22,7 @@ const DEFAULT_ADAPTER: ChannelMessageAdapter = { }; export function getChannelMessageAdapter(channel: ChannelId): ChannelMessageAdapter { - const adapter = - getChannelPlugin(channel)?.messaging?.buildCrossContextComponents ?? - resolveBuiltInCrossContextComponentsFactory(channel); + const adapter = getChannelPlugin(channel)?.messaging?.buildCrossContextComponents; if (adapter) { return { supportsComponentsV2: true, diff --git a/src/infra/outbound/targets.test.ts b/src/infra/outbound/targets.test.ts index cfe8d06bd25..2ac707a804d 100644 --- a/src/infra/outbound/targets.test.ts +++ b/src/infra/outbound/targets.test.ts @@ -387,7 +387,7 @@ describe("resolveSessionDeliveryTarget", () => { expect(resolved.threadId).toBeUndefined(); }); - it("keeps :topic: parsing when the telegram plugin registry is unavailable", () => { + it("keeps raw :topic: targets when the telegram plugin registry is unavailable", () => { setActivePluginRegistry(createTestRegistry([])); const resolved = resolveSessionDeliveryTarget({ @@ -401,8 +401,8 @@ describe("resolveSessionDeliveryTarget", () => { explicitTo: "63448508:topic:1008013", }); - expect(resolved.to).toBe("63448508"); - expect(resolved.threadId).toBe(1008013); + expect(resolved.to).toBe("63448508:topic:1008013"); + expect(resolved.threadId).toBeUndefined(); }); it("explicitThreadId takes priority over :topic: parsed value", () => { diff --git a/src/infra/outbound/targets.ts b/src/infra/outbound/targets.ts index 313db3cf636..b15dfb881b2 100644 --- a/src/infra/outbound/targets.ts +++ b/src/infra/outbound/targets.ts @@ -16,10 +16,6 @@ import { isDeliverableMessageChannel, normalizeMessageChannel, } from "../../utils/message-channel.js"; -import { - resolveBuiltInExplicitTarget, - resolveBuiltInTargetChatType, -} from "./built-in-channel-messaging.js"; import { normalizeDeliverableOutboundChannel, resolveOutboundChannelPlugin, @@ -78,7 +74,7 @@ function parseExplicitTargetWithPlugin(params: { return ( resolveOutboundChannelPlugin({ channel: provider })?.messaging?.parseExplicitTarget?.({ raw, - }) ?? resolveBuiltInExplicitTarget(provider, raw) + }) ?? null ); } @@ -422,7 +418,7 @@ function inferChatTypeFromTarget(params: { return ( resolveOutboundChannelPlugin({ channel: params.channel, - })?.messaging?.inferTargetChatType?.({ to }) ?? resolveBuiltInTargetChatType(params.channel, to) + })?.messaging?.inferTargetChatType?.({ to }) ?? undefined ); } diff --git a/src/plugins/runtime/runtime-discord.ts b/src/plugins/runtime/runtime-discord.ts index 6f827f27504..1ca7b32bf9b 100644 --- a/src/plugins/runtime/runtime-discord.ts +++ b/src/plugins/runtime/runtime-discord.ts @@ -4,6 +4,16 @@ import { listDiscordDirectoryPeersLive, } from "../../../extensions/discord/src/directory-live.js"; import { monitorDiscordProvider } from "../../../extensions/discord/src/monitor.js"; +import { + getThreadBindingManager, + resolveThreadBindingIdleTimeoutMs, + resolveThreadBindingInactivityExpiresAt, + resolveThreadBindingMaxAgeExpiresAt, + resolveThreadBindingMaxAgeMs, + setThreadBindingIdleTimeoutBySessionKey, + setThreadBindingMaxAgeBySessionKey, + unbindThreadBindingsBySessionKey, +} from "../../../extensions/discord/src/monitor/thread-bindings.js"; import { probeDiscord } from "../../../extensions/discord/src/probe.js"; import { resolveDiscordChannelAllowlist } from "../../../extensions/discord/src/resolve-channels.js"; import { resolveDiscordUserAllowlist } from "../../../extensions/discord/src/resolve-users.js"; @@ -36,6 +46,16 @@ export function createRuntimeDiscord(): PluginRuntimeChannel["discord"] { sendMessageDiscord, sendPollDiscord, monitorDiscordProvider, + threadBindings: { + getManager: getThreadBindingManager, + resolveIdleTimeoutMs: resolveThreadBindingIdleTimeoutMs, + resolveInactivityExpiresAt: resolveThreadBindingInactivityExpiresAt, + resolveMaxAgeMs: resolveThreadBindingMaxAgeMs, + resolveMaxAgeExpiresAt: resolveThreadBindingMaxAgeExpiresAt, + setIdleTimeoutBySessionKey: setThreadBindingIdleTimeoutBySessionKey, + setMaxAgeBySessionKey: setThreadBindingMaxAgeBySessionKey, + unbindBySessionKey: unbindThreadBindingsBySessionKey, + }, typing: { pulse: sendTypingDiscord, start: async ({ channelId, accountId, cfg, intervalMs }) => diff --git a/src/plugins/runtime/runtime-telegram.ts b/src/plugins/runtime/runtime-telegram.ts index 864761480ff..1e6993ef489 100644 --- a/src/plugins/runtime/runtime-telegram.ts +++ b/src/plugins/runtime/runtime-telegram.ts @@ -15,6 +15,10 @@ import { sendTypingTelegram, unpinMessageTelegram, } from "../../../extensions/telegram/src/send.js"; +import { + setTelegramThreadBindingIdleTimeoutBySessionKey, + setTelegramThreadBindingMaxAgeBySessionKey, +} from "../../../extensions/telegram/src/thread-bindings.js"; import { resolveTelegramToken } from "../../../extensions/telegram/src/token.js"; import { telegramMessageActions } from "../../channels/plugins/actions/telegram.js"; import { createTelegramTypingLease } from "./runtime-telegram-typing.js"; @@ -30,6 +34,10 @@ export function createRuntimeTelegram(): PluginRuntimeChannel["telegram"] { sendPollTelegram, monitorTelegramProvider, messageActions: telegramMessageActions, + threadBindings: { + setIdleTimeoutBySessionKey: setTelegramThreadBindingIdleTimeoutBySessionKey, + setMaxAgeBySessionKey: setTelegramThreadBindingMaxAgeBySessionKey, + }, typing: { pulse: sendTypingTelegram, start: async ({ to, accountId, cfg, intervalMs, messageThreadId }) => diff --git a/src/plugins/runtime/types-channel.ts b/src/plugins/runtime/types-channel.ts index f2e775b7275..b0b5941f24d 100644 --- a/src/plugins/runtime/types-channel.ts +++ b/src/plugins/runtime/types-channel.ts @@ -98,6 +98,16 @@ export type PluginRuntimeChannel = { sendMessageDiscord: typeof import("../../../extensions/discord/src/send.js").sendMessageDiscord; sendPollDiscord: typeof import("../../../extensions/discord/src/send.js").sendPollDiscord; monitorDiscordProvider: typeof import("../../../extensions/discord/src/monitor.js").monitorDiscordProvider; + threadBindings: { + getManager: typeof import("../../../extensions/discord/src/monitor/thread-bindings.js").getThreadBindingManager; + resolveIdleTimeoutMs: typeof import("../../../extensions/discord/src/monitor/thread-bindings.js").resolveThreadBindingIdleTimeoutMs; + resolveInactivityExpiresAt: typeof import("../../../extensions/discord/src/monitor/thread-bindings.js").resolveThreadBindingInactivityExpiresAt; + resolveMaxAgeMs: typeof import("../../../extensions/discord/src/monitor/thread-bindings.js").resolveThreadBindingMaxAgeMs; + resolveMaxAgeExpiresAt: typeof import("../../../extensions/discord/src/monitor/thread-bindings.js").resolveThreadBindingMaxAgeExpiresAt; + setIdleTimeoutBySessionKey: typeof import("../../../extensions/discord/src/monitor/thread-bindings.js").setThreadBindingIdleTimeoutBySessionKey; + setMaxAgeBySessionKey: typeof import("../../../extensions/discord/src/monitor/thread-bindings.js").setThreadBindingMaxAgeBySessionKey; + unbindBySessionKey: typeof import("../../../extensions/discord/src/monitor/thread-bindings.js").unbindThreadBindingsBySessionKey; + }; typing: { pulse: typeof import("../../../extensions/discord/src/send.js").sendTypingDiscord; start: (params: { @@ -138,6 +148,10 @@ export type PluginRuntimeChannel = { sendPollTelegram: typeof import("../../../extensions/telegram/src/send.js").sendPollTelegram; monitorTelegramProvider: typeof import("../../../extensions/telegram/src/monitor.js").monitorTelegramProvider; messageActions: typeof import("../../channels/plugins/actions/telegram.js").telegramMessageActions; + threadBindings: { + setIdleTimeoutBySessionKey: typeof import("../../../extensions/telegram/src/thread-bindings.js").setTelegramThreadBindingIdleTimeoutBySessionKey; + setMaxAgeBySessionKey: typeof import("../../../extensions/telegram/src/thread-bindings.js").setTelegramThreadBindingMaxAgeBySessionKey; + }; typing: { pulse: typeof import("../../../extensions/telegram/src/send.js").sendTypingTelegram; start: (params: {