From c13b35b83d5b9b372b913c92e7ef479ca8dfd7a5 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Mon, 2 Mar 2026 09:06:10 +0530 Subject: [PATCH] feat(telegram): improve DM topics support (#30579) (thanks @kesor) --- CHANGELOG.md | 1 + src/auto-reply/reply/inbound-meta.ts | 1 + src/auto-reply/templating.ts | 2 + src/config/types.telegram.ts | 22 +++++++ src/config/zod-schema.providers-core.ts | 15 +++++ src/gateway/origin-check.test.ts | 2 +- src/infra/outbound/outbound-session.ts | 17 +++++- src/infra/outbound/outbound.test.ts | 13 ++++ src/telegram/bot-handlers.ts | 65 +++++++++++++++++--- src/telegram/bot-message-context.ts | 52 +++++++++++++--- src/telegram/bot-native-commands.ts | 22 ++++++- src/telegram/bot.test.ts | 81 +++++++++++++++++++++++++ src/telegram/bot.ts | 25 +++++--- src/telegram/bot/helpers.ts | 25 ++++++-- src/telegram/group-access.ts | 28 +++++++-- src/telegram/group-config-helpers.ts | 8 ++- 16 files changed, 335 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index be073477e14..9e9ea26cf6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai - Feishu/Chat tooling: add `feishu_chat` tool actions for chat info and member queries, with configurable enablement under `channels.feishu.tools.chat`. (#14674) Thanks @liuweifly. - Feishu/Doc permissions: support optional owner permission grant fields on `feishu_doc` create and report permission metadata only when the grant call succeeds, with regression coverage for success/failure/omitted-owner paths. (#28295) Thanks @zhoulongchao77. - Memory/LanceDB: support custom OpenAI `baseUrl` and embedding dimensions for LanceDB memory. (#17874) Thanks @rish2jain and @vincentkoc. +- Telegram/DM topics: add per-DM `direct` + topic config (allowlists, `dmPolicy`, `skills`, `systemPrompt`, `requireTopic`), route DM topics as distinct inbound/outbound sessions, and enforce topic-aware authorization/debounce for messages, callbacks, commands, and reactions. Landed from contributor PR #30579 by @kesor. Thanks @kesor. - ACP/ACPX streaming: pin ACPX plugin support to `0.1.15`, add configurable ACPX command/version probing, and streamline ACP stream delivery (`final_only` default + reduced tool-event noise) with matching runtime and test updates. (#30036) Thanks @osolmaz. - Cron/Heartbeat light bootstrap context: add opt-in lightweight bootstrap mode for automation runs (`--light-context` for cron agent turns and `agents.*.heartbeat.lightContext` for heartbeat), keeping only `HEARTBEAT.md` for heartbeat runs and skipping bootstrap-file injection for cron lightweight runs. (#26064) Thanks @jose-velez. diff --git a/src/auto-reply/reply/inbound-meta.ts b/src/auto-reply/reply/inbound-meta.ts index 8b517a5d9c7..4d417e5e0fa 100644 --- a/src/auto-reply/reply/inbound-meta.ts +++ b/src/auto-reply/reply/inbound-meta.ts @@ -107,6 +107,7 @@ export function buildInboundUserContextPrefix(ctx: TemplateContext): string { group_channel: safeTrim(ctx.GroupChannel), group_space: safeTrim(ctx.GroupSpace), thread_label: safeTrim(ctx.ThreadLabel), + topic_id: ctx.MessageThreadId != null ? String(ctx.MessageThreadId) : undefined, is_forum: ctx.IsForum === true ? true : undefined, is_group_chat: !isDirect ? true : undefined, was_mentioned: ctx.WasMentioned === true ? true : undefined, diff --git a/src/auto-reply/templating.ts b/src/auto-reply/templating.ts index 0910addc162..f0934279c80 100644 --- a/src/auto-reply/templating.ts +++ b/src/auto-reply/templating.ts @@ -139,6 +139,8 @@ export type MsgContext = { MessageThreadId?: string | number; /** Telegram forum supergroup marker. */ IsForum?: boolean; + /** Warning: DM has topics enabled but this message is not in a topic. */ + TopicRequiredButMissing?: boolean; /** * Originating channel for reply routing. * When set, replies should be routed back to this provider diff --git a/src/config/types.telegram.ts b/src/config/types.telegram.ts index 3417cbb496e..408e7906ed5 100644 --- a/src/config/types.telegram.ts +++ b/src/config/types.telegram.ts @@ -79,6 +79,8 @@ export type TelegramAccountConfig = { /** Control reply threading when reply tags are present (off|first|all). */ replyToMode?: ReplyToMode; groups?: Record; + /** Per-DM configuration for Telegram DM topics (key is chat ID). */ + direct?: Record; /** DM allowlist (numeric Telegram user IDs). Onboarding can resolve @username to IDs. */ allowFrom?: Array; /** Default delivery target for CLI `--deliver` when no explicit `--reply-to` is provided. */ @@ -204,6 +206,26 @@ export type TelegramGroupConfig = { systemPrompt?: string; }; +export type TelegramDirectConfig = { + /** Per-DM override for DM message policy (open|disabled|allowlist). */ + dmPolicy?: DmPolicy; + /** Optional tool policy overrides for this DM. */ + tools?: GroupToolPolicyConfig; + toolsBySender?: GroupToolPolicyBySenderConfig; + /** If specified, only load these skills for this DM (when no topic). Omit = all skills; empty = no skills. */ + skills?: string[]; + /** Per-topic configuration for DM topics (key is message_thread_id as string) */ + topics?: Record; + /** If false, disable the bot for this DM (and its topics). */ + enabled?: boolean; + /** If true, require messages to be from a topic when topics are enabled. */ + requireTopic?: boolean; + /** Optional allowlist for DM senders (numeric Telegram user IDs). */ + allowFrom?: Array; + /** Optional system prompt snippet for this DM. */ + systemPrompt?: string; +}; + export type TelegramConfig = { /** Optional per-account Telegram configuration (multi-account). */ accounts?: Record; diff --git a/src/config/zod-schema.providers-core.ts b/src/config/zod-schema.providers-core.ts index 260e202bc04..1ca8aad7888 100644 --- a/src/config/zod-schema.providers-core.ts +++ b/src/config/zod-schema.providers-core.ts @@ -79,6 +79,20 @@ export const TelegramGroupSchema = z }) .strict(); +export const TelegramDirectSchema = z + .object({ + dmPolicy: DmPolicySchema.optional(), + tools: ToolPolicySchema, + toolsBySender: ToolPolicyBySenderSchema, + skills: z.array(z.string()).optional(), + enabled: z.boolean().optional(), + allowFrom: z.array(z.union([z.string(), z.number()])).optional(), + systemPrompt: z.string().optional(), + topics: z.record(z.string(), TelegramTopicSchema.optional()).optional(), + requireTopic: z.boolean().optional(), + }) + .strict(); + const TelegramCustomCommandSchema = z .object({ command: z.string().transform(normalizeTelegramCommandName), @@ -148,6 +162,7 @@ export const TelegramAccountSchemaBase = z historyLimit: z.number().int().min(0).optional(), dmHistoryLimit: z.number().int().min(0).optional(), dms: z.record(z.string(), DmConfigSchema.optional()).optional(), + direct: z.record(z.string(), TelegramDirectSchema.optional()).optional(), textChunkLimit: z.number().int().positive().optional(), chunkMode: z.enum(["length", "newline"]).optional(), streaming: z.union([z.boolean(), z.enum(["off", "partial", "block", "progress"])]).optional(), diff --git a/src/gateway/origin-check.test.ts b/src/gateway/origin-check.test.ts index 0797d6c4066..a239e7e6f78 100644 --- a/src/gateway/origin-check.test.ts +++ b/src/gateway/origin-check.test.ts @@ -79,7 +79,7 @@ describe("checkBrowserOrigin", () => { expect(result.ok).toBe(true); }); - it('accepts wildcard entries with surrounding whitespace', () => { + it("accepts wildcard entries with surrounding whitespace", () => { const result = checkBrowserOrigin({ requestHost: "100.86.79.37:18789", origin: "https://100.86.79.37:18789", diff --git a/src/infra/outbound/outbound-session.ts b/src/infra/outbound/outbound-session.ts index b310f454923..3655c6e69ff 100644 --- a/src/infra/outbound/outbound-session.ts +++ b/src/infra/outbound/outbound-session.ts @@ -293,7 +293,9 @@ function resolveTelegramSession( (chatType === "unknown" && params.resolvedTarget?.kind && params.resolvedTarget.kind !== "user"); - const peerId = isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : chatId; + // For groups: include thread ID in peerId. For DMs: use simple chatId (thread handled via suffix). + const peerId = + isGroup && resolvedThreadId ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : chatId; const peer: RoutePeer = { kind: isGroup ? "group" : "direct", id: peerId, @@ -305,12 +307,21 @@ function resolveTelegramSession( accountId: params.accountId, peer, }); + // Use thread suffix for DM topics to match inbound session key format + const threadKeys = + resolvedThreadId && !isGroup + ? { sessionKey: `${baseSessionKey}:thread:${resolvedThreadId}` } + : null; return { - sessionKey: baseSessionKey, + sessionKey: threadKeys?.sessionKey ?? baseSessionKey, baseSessionKey, peer, chatType: isGroup ? "group" : "direct", - from: isGroup ? `telegram:group:${peerId}` : `telegram:${chatId}`, + from: isGroup + ? `telegram:group:${peerId}` + : resolvedThreadId + ? `telegram:${chatId}:topic:${resolvedThreadId}` + : `telegram:${chatId}`, to: `telegram:${chatId}`, threadId: resolvedThreadId, }; diff --git a/src/infra/outbound/outbound.test.ts b/src/infra/outbound/outbound.test.ts index 4ce6afd0618..08e6876a751 100644 --- a/src/infra/outbound/outbound.test.ts +++ b/src/infra/outbound/outbound.test.ts @@ -925,6 +925,19 @@ describe("resolveOutboundSessionRoute", () => { threadId: 42, }, }, + { + name: "Telegram DM with topic", + cfg: perChannelPeerCfg, + channel: "telegram", + target: "123456789:topic:99", + expected: { + sessionKey: "agent:main:telegram:direct:123456789:thread:99", + from: "telegram:123456789:topic:99", + to: "telegram:123456789", + threadId: 99, + chatType: "direct", + }, + }, { name: "Telegram unresolved username DM", cfg: perChannelPeerCfg, diff --git a/src/telegram/bot-handlers.ts b/src/telegram/bot-handlers.ts index 59e84ab0f28..17ba2a29ac3 100644 --- a/src/telegram/bot-handlers.ts +++ b/src/telegram/bot-handlers.ts @@ -18,7 +18,11 @@ import { loadConfig } from "../config/config.js"; import { writeConfigFile } from "../config/io.js"; import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; import type { DmPolicy } from "../config/types.base.js"; -import type { TelegramGroupConfig, TelegramTopicConfig } from "../config/types.js"; +import type { + TelegramDirectConfig, + TelegramGroupConfig, + TelegramTopicConfig, +} from "../config/types.js"; import { danger, logVerbose, warn } from "../globals.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { MediaFetchError } from "../media/fetch.js"; @@ -608,22 +612,30 @@ export const registerTelegramHandlers = ({ const resolveTelegramEventAuthorizationContext = async (params: { chatId: number; + isGroup: boolean; isForum: boolean; messageThreadId?: number; groupAllowContext?: TelegramGroupAllowContext; }): Promise => { - const dmPolicy = telegramCfg.dmPolicy ?? "pairing"; const groupAllowContext = params.groupAllowContext ?? (await resolveTelegramGroupAllowFromContext({ chatId: params.chatId, accountId, + isGroup: params.isGroup, isForum: params.isForum, messageThreadId: params.messageThreadId, groupAllowFrom, resolveTelegramGroupConfig, })); - return { dmPolicy, ...groupAllowContext }; + // Use direct config dmPolicy override if available for DMs + const effectiveDmPolicy = + !params.isGroup && + groupAllowContext.groupConfig && + "dmPolicy" in groupAllowContext.groupConfig + ? (groupAllowContext.groupConfig.dmPolicy ?? telegramCfg.dmPolicy ?? "pairing") + : (telegramCfg.dmPolicy ?? "pairing"); + return { dmPolicy: effectiveDmPolicy, ...groupAllowContext }; }; const authorizeTelegramEventSender = (params: { @@ -642,6 +654,7 @@ export const registerTelegramHandlers = ({ storeAllowFrom, groupConfig, topicConfig, + groupAllowOverride, effectiveGroupAllow, hasGroupAllowOverride, } = context; @@ -677,8 +690,10 @@ export const registerTelegramHandlers = ({ return { allowed: false, reason: "direct-disabled" }; } if (dmPolicy !== "open") { + // For DMs, prefer per-DM/topic allowFrom (groupAllowOverride) over account-level allowFrom + const dmAllowFrom = groupAllowOverride ?? allowFrom; const effectiveDmAllow = normalizeDmAllowFromWithStore({ - allowFrom, + allowFrom: dmAllowFrom, storeAllowFrom, dmPolicy, }); @@ -729,6 +744,7 @@ export const registerTelegramHandlers = ({ } const eventAuthContext = await resolveTelegramEventAuthorizationContext({ chatId, + isGroup, isForum, }); const senderAuthorization = authorizeTelegramEventSender({ @@ -744,6 +760,20 @@ export const registerTelegramHandlers = ({ return; } + // Enforce requireTopic for DM reactions: since Telegram doesn't provide messageThreadId + // for reactions, we cannot determine if the reaction came from a topic, so block all + // reactions if requireTopic is enabled for this DM. + if (!isGroup) { + const requireTopic = (eventAuthContext.groupConfig as TelegramDirectConfig | undefined) + ?.requireTopic; + if (requireTopic === true) { + logVerbose( + `Blocked telegram reaction in DM ${chatId}: requireTopic=true but topic unknown for reactions`, + ); + return; + } + } + // Detect added reactions. const oldEmojis = new Set( reaction.old_reaction @@ -811,6 +841,7 @@ export const registerTelegramHandlers = ({ msg: Message; chatId: number; resolvedThreadId?: number; + dmThreadId?: number; storeAllowFrom: string[]; sendOversizeWarning: boolean; oversizeLogMessage: string; @@ -820,6 +851,7 @@ export const registerTelegramHandlers = ({ msg, chatId, resolvedThreadId, + dmThreadId, storeAllowFrom, sendOversizeWarning, oversizeLogMessage, @@ -832,7 +864,9 @@ export const registerTelegramHandlers = ({ if (text && !isCommandLike) { const nowMs = Date.now(); const senderId = msg.from?.id != null ? String(msg.from.id) : "unknown"; - const key = `text:${chatId}:${resolvedThreadId ?? "main"}:${senderId}`; + // Use resolvedThreadId for forum groups, dmThreadId for DM topics + const threadId = resolvedThreadId ?? dmThreadId; + const key = `text:${chatId}:${threadId ?? "main"}:${senderId}`; const existing = textFragmentBuffer.get(key); if (existing) { @@ -970,8 +1004,9 @@ export const registerTelegramHandlers = ({ ] : []; const senderId = msg.from?.id ? String(msg.from.id) : ""; + const conversationThreadId = resolvedThreadId ?? dmThreadId; const conversationKey = - resolvedThreadId != null ? `${chatId}:topic:${resolvedThreadId}` : String(chatId); + conversationThreadId != null ? `${chatId}:topic:${conversationThreadId}` : String(chatId); const debounceLane = resolveTelegramDebounceLane(msg); const debounceKey = senderId ? `telegram:${accountId ?? "default"}:${conversationKey}:${senderId}:${debounceLane}` @@ -1065,10 +1100,18 @@ export const registerTelegramHandlers = ({ const isForum = callbackMessage.chat.is_forum === true; const eventAuthContext = await resolveTelegramEventAuthorizationContext({ chatId, + isGroup, isForum, messageThreadId, }); - const { resolvedThreadId, storeAllowFrom } = eventAuthContext; + const { resolvedThreadId, dmThreadId, storeAllowFrom, groupConfig } = eventAuthContext; + const requireTopic = (groupConfig as { requireTopic?: boolean } | undefined)?.requireTopic; + if (!isGroup && requireTopic === true && dmThreadId == null) { + logVerbose( + `Blocked telegram callback in DM ${chatId}: requireTopic=true but no topic present`, + ); + return; + } const senderId = callback.from?.id ? String(callback.from.id) : ""; const senderUsername = callback.from?.username ?? ""; const authorizationMode: TelegramEventAuthorizationMode = @@ -1323,20 +1366,25 @@ export const registerTelegramHandlers = ({ } const eventAuthContext = await resolveTelegramEventAuthorizationContext({ chatId: event.chatId, + isGroup: event.isGroup, isForum: event.isForum, messageThreadId: event.messageThreadId, }); const { dmPolicy, resolvedThreadId, + dmThreadId, storeAllowFrom, groupConfig, topicConfig, + groupAllowOverride, effectiveGroupAllow, hasGroupAllowOverride, } = eventAuthContext; + // For DMs, prefer per-DM/topic allowFrom (groupAllowOverride) over account-level allowFrom + const dmAllowFrom = groupAllowOverride ?? allowFrom; const effectiveDmAllow = normalizeDmAllowFromWithStore({ - allowFrom, + allowFrom: dmAllowFrom, storeAllowFrom, dmPolicy, }); @@ -1384,6 +1432,7 @@ export const registerTelegramHandlers = ({ msg: event.msg, chatId: event.chatId, resolvedThreadId, + dmThreadId, storeAllowFrom, sendOversizeWarning: event.sendOversizeWarning, oversizeLogMessage: event.oversizeLogMessage, diff --git a/src/telegram/bot-message-context.ts b/src/telegram/bot-message-context.ts index 10aa207c7f2..7db6f7838fa 100644 --- a/src/telegram/bot-message-context.ts +++ b/src/telegram/bot-message-context.ts @@ -30,7 +30,12 @@ import { import type { OpenClawConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js"; import { readSessionUpdatedAt, resolveStorePath } from "../config/sessions.js"; -import type { DmPolicy, TelegramGroupConfig, TelegramTopicConfig } from "../config/types.js"; +import type { + DmPolicy, + TelegramDirectConfig, + TelegramGroupConfig, + TelegramTopicConfig, +} from "../config/types.js"; import { logVerbose, shouldLogVerbose } from "../globals.js"; import { recordChannelActivity } from "../infra/channel-activity.js"; import { resolveAgentRoute } from "../routing/resolve-route.js"; @@ -87,7 +92,10 @@ type TelegramLogger = { type ResolveTelegramGroupConfig = ( chatId: string | number, messageThreadId?: number, -) => { groupConfig?: TelegramGroupConfig; topicConfig?: TelegramTopicConfig }; +) => { + groupConfig?: TelegramGroupConfig | TelegramDirectConfig; + topicConfig?: TelegramTopicConfig; +}; type ResolveGroupActivation = (params: { chatId: string | number; @@ -174,7 +182,14 @@ export const buildTelegramMessageContext = async ({ }); const resolvedThreadId = threadSpec.scope === "forum" ? threadSpec.id : undefined; const replyThreadId = threadSpec.id; - const { groupConfig, topicConfig } = resolveTelegramGroupConfig(chatId, resolvedThreadId); + const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined; + const threadIdForConfig = resolvedThreadId ?? dmThreadId; + const { groupConfig, topicConfig } = resolveTelegramGroupConfig(chatId, threadIdForConfig); + // Use direct config dmPolicy override if available for DMs + const effectiveDmPolicy = + !isGroup && groupConfig && "dmPolicy" in groupConfig + ? (groupConfig.dmPolicy ?? dmPolicy) + : dmPolicy; const peerId = isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId); const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId }); // Fresh config for bindings lookup; other routing inputs are payload-derived. @@ -200,16 +215,22 @@ export const buildTelegramMessageContext = async ({ return null; } const baseSessionKey = route.sessionKey; - // DMs: use raw messageThreadId for thread sessions (not forum topic ids) - const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined; + // DMs: use thread suffix for session isolation (works regardless of dmScope) const threadKeys = dmThreadId != null ? resolveThreadSessionKeys({ baseSessionKey, threadId: `${chatId}:${dmThreadId}` }) : null; const sessionKey = threadKeys?.sessionKey ?? baseSessionKey; const mentionRegexes = buildMentionRegexes(cfg, route.agentId); - const effectiveDmAllow = normalizeDmAllowFromWithStore({ allowFrom, storeAllowFrom, dmPolicy }); + // Calculate groupAllowOverride first - it's needed for both DM and group allowlist checks const groupAllowOverride = firstDefined(topicConfig?.allowFrom, groupConfig?.allowFrom); + // For DMs, prefer per-DM/topic allowFrom (groupAllowOverride) over account-level allowFrom + const dmAllowFrom = groupAllowOverride ?? allowFrom; + const effectiveDmAllow = normalizeDmAllowFromWithStore({ + allowFrom: dmAllowFrom, + storeAllowFrom, + dmPolicy: effectiveDmPolicy, + }); // Group sender checks are explicit and must not inherit DM pairing-store entries. const effectiveGroupAllow = normalizeAllowFrom(groupAllowOverride ?? groupAllowFrom); const hasGroupAllowOverride = typeof groupAllowOverride !== "undefined"; @@ -237,7 +258,11 @@ export const buildTelegramMessageContext = async ({ ); return null; } - logVerbose(`Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)`); + logVerbose( + isGroup + ? `Blocked telegram group sender ${senderId || "unknown"} (group allowFrom override)` + : `Blocked telegram DM sender ${senderId || "unknown"} (DM allowFrom override)`, + ); return null; } @@ -252,10 +277,17 @@ export const buildTelegramMessageContext = async ({ const requireMention = firstDefined( activationOverride, topicConfig?.requireMention, - groupConfig?.requireMention, + (groupConfig as TelegramGroupConfig | undefined)?.requireMention, baseRequireMention, ); + const requireTopic = (groupConfig as TelegramDirectConfig | undefined)?.requireTopic; + const topicRequiredButMissing = !isGroup && requireTopic === true && dmThreadId == null; + if (topicRequiredButMissing) { + logVerbose(`Blocked telegram DM ${chatId}: requireTopic=true but no topic present`); + return null; + } + const sendTyping = async () => { await withTelegramApiErrorLogging({ operation: "sendChatAction", @@ -287,7 +319,7 @@ export const buildTelegramMessageContext = async ({ if ( !(await enforceTelegramDmAccess({ isGroup, - dmPolicy, + dmPolicy: effectiveDmPolicy, msg, chatId, effectiveDmAllow, @@ -669,7 +701,7 @@ export const buildTelegramMessageContext = async ({ ChatType: isGroup ? "group" : "direct", ConversationLabel: conversationLabel, GroupSubject: isGroup ? (msg.chat.title ?? undefined) : undefined, - GroupSystemPrompt: isGroup ? groupSystemPrompt : undefined, + GroupSystemPrompt: isGroup || (!isGroup && groupConfig) ? groupSystemPrompt : undefined, SenderName: senderName, SenderId: senderId || undefined, SenderUsername: senderUsername || undefined, diff --git a/src/telegram/bot-native-commands.ts b/src/telegram/bot-native-commands.ts index cc9846cbcd5..0f07fc363da 100644 --- a/src/telegram/bot-native-commands.ts +++ b/src/telegram/bot-native-commands.ts @@ -26,6 +26,7 @@ import { import type { ReplyToMode, TelegramAccountConfig, + TelegramDirectConfig, TelegramGroupConfig, TelegramTopicConfig, } from "../config/types.js"; @@ -172,6 +173,7 @@ async function resolveTelegramCommandAuth(params: { const groupAllowContext = await resolveTelegramGroupAllowFromContext({ chatId, accountId, + isGroup, isForum, messageThreadId, groupAllowFrom, @@ -179,12 +181,26 @@ async function resolveTelegramCommandAuth(params: { }); const { resolvedThreadId, + dmThreadId, storeAllowFrom, groupConfig, topicConfig, + groupAllowOverride, effectiveGroupAllow, hasGroupAllowOverride, } = groupAllowContext; + // Use direct config dmPolicy override if available for DMs + const effectiveDmPolicy = + !isGroup && groupConfig && "dmPolicy" in groupConfig + ? (groupConfig.dmPolicy ?? telegramCfg.dmPolicy ?? "pairing") + : (telegramCfg.dmPolicy ?? "pairing"); + const requireTopic = (groupConfig as TelegramDirectConfig | undefined)?.requireTopic; + if (!isGroup && requireTopic === true && dmThreadId == null) { + logVerbose(`Blocked telegram command in DM ${chatId}: requireTopic=true but no topic present`); + return null; + } + // For DMs, prefer per-DM/topic allowFrom (groupAllowOverride) over account-level allowFrom + const dmAllowFrom = groupAllowOverride ?? allowFrom; const senderId = msg.from?.id ? String(msg.from.id) : ""; const senderUsername = msg.from?.username ?? ""; @@ -254,9 +270,9 @@ async function resolveTelegramCommandAuth(params: { } const dmAllow = normalizeDmAllowFromWithStore({ - allowFrom: allowFrom, + allowFrom: dmAllowFrom, storeAllowFrom: isGroup ? [] : storeAllowFrom, - dmPolicy: telegramCfg.dmPolicy ?? "pairing", + dmPolicy: effectiveDmPolicy, }); const senderAllowed = isSenderAllowed({ allow: dmAllow, @@ -575,7 +591,7 @@ export const registerTelegramNativeCommands = ({ ChatType: isGroup ? "group" : "direct", ConversationLabel: conversationLabel, GroupSubject: isGroup ? (msg.chat.title ?? undefined) : undefined, - GroupSystemPrompt: isGroup ? groupSystemPrompt : undefined, + GroupSystemPrompt: isGroup || (!isGroup && groupConfig) ? groupSystemPrompt : undefined, SenderName: buildSenderName(msg), SenderId: senderId || undefined, SenderUsername: senderUsername || undefined, diff --git a/src/telegram/bot.test.ts b/src/telegram/bot.test.ts index bbd506982bf..e667b3a60f4 100644 --- a/src/telegram/bot.test.ts +++ b/src/telegram/bot.test.ts @@ -588,6 +588,87 @@ describe("createTelegramBot", () => { } }); + it("isolates inbound debounce by DM topic thread id", async () => { + const DEBOUNCE_MS = 4321; + onSpy.mockClear(); + replySpy.mockClear(); + loadConfig.mockReturnValue({ + agents: { + defaults: { + envelopeTimezone: "utc", + }, + }, + messages: { + inbound: { + debounceMs: DEBOUNCE_MS, + }, + }, + channels: { + telegram: { + dmPolicy: "open", + allowFrom: ["*"], + }, + }, + }); + + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + try { + createTelegramBot({ token: "tok" }); + const handler = getOnHandler("message") as (ctx: Record) => Promise; + + await handler({ + message: { + chat: { id: 7, type: "private" }, + text: "topic-100", + date: 1736380800, + message_id: 201, + message_thread_id: 100, + from: { id: 42, first_name: "Ada" }, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({}), + }); + await handler({ + message: { + chat: { id: 7, type: "private" }, + text: "topic-200", + date: 1736380801, + message_id: 202, + message_thread_id: 200, + from: { id: 42, first_name: "Ada" }, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({}), + }); + + expect(replySpy).not.toHaveBeenCalled(); + + const debounceTimerIndexes = setTimeoutSpy.mock.calls + .map((call, index) => ({ index, delay: call[1] })) + .filter((entry) => entry.delay === DEBOUNCE_MS) + .map((entry) => entry.index); + expect(debounceTimerIndexes.length).toBeGreaterThanOrEqual(2); + + for (const index of debounceTimerIndexes) { + clearTimeout(setTimeoutSpy.mock.results[index]?.value as ReturnType); + } + for (const index of debounceTimerIndexes) { + const flushTimer = setTimeoutSpy.mock.calls[index]?.[0] as (() => unknown) | undefined; + await flushTimer?.(); + } + + await vi.waitFor(() => { + expect(replySpy).toHaveBeenCalledTimes(2); + }); + const threadIds = replySpy.mock.calls + .map((call) => (call[0] as { MessageThreadId?: number }).MessageThreadId) + .toSorted((a, b) => (a ?? 0) - (b ?? 0)); + expect(threadIds).toEqual([100, 200]); + } finally { + setTimeoutSpy.mockRestore(); + } + }); + it("handles quote-only replies without reply metadata", async () => { onSpy.mockClear(); sendMessageSpy.mockClear(); diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index a501be23206..1c06da199c5 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -270,12 +270,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { const dmPolicy = telegramCfg.dmPolicy ?? "pairing"; const allowFrom = opts.allowFrom ?? telegramCfg.allowFrom; const groupAllowFrom = - opts.groupAllowFrom ?? - telegramCfg.groupAllowFrom ?? - (telegramCfg.allowFrom && telegramCfg.allowFrom.length > 0 - ? telegramCfg.allowFrom - : undefined) ?? - (opts.allowFrom && opts.allowFrom.length > 0 ? opts.allowFrom : undefined); + opts.groupAllowFrom ?? telegramCfg.groupAllowFrom ?? telegramCfg.allowFrom ?? allowFrom; const replyToMode = opts.replyToMode ?? telegramCfg.replyToMode ?? "off"; const nativeEnabled = resolveNativeCommandsEnabled({ providerId: "telegram", @@ -339,11 +334,25 @@ export function createTelegramBot(opts: TelegramBotOptions) { }); const resolveTelegramGroupConfig = (chatId: string | number, messageThreadId?: number) => { const groups = telegramCfg.groups; + const direct = telegramCfg.direct; + const chatIdStr = String(chatId); + const isDm = !chatIdStr.startsWith("-"); + + if (isDm) { + const directConfig = direct?.[chatIdStr] ?? direct?.["*"]; + if (directConfig) { + const topicConfig = + messageThreadId != null ? directConfig.topics?.[String(messageThreadId)] : undefined; + return { groupConfig: directConfig, topicConfig }; + } + // DMs without direct config: don't fall through to groups lookup + return { groupConfig: undefined, topicConfig: undefined }; + } + if (!groups) { return { groupConfig: undefined, topicConfig: undefined }; } - const groupKey = String(chatId); - const groupConfig = groups[groupKey] ?? groups["*"]; + const groupConfig = groups[chatIdStr] ?? groups["*"]; const topicConfig = messageThreadId != null ? groupConfig?.topics?.[String(messageThreadId)] : undefined; return { groupConfig, topicConfig }; diff --git a/src/telegram/bot/helpers.ts b/src/telegram/bot/helpers.ts index 11d9798e262..24e2ba47e70 100644 --- a/src/telegram/bot/helpers.ts +++ b/src/telegram/bot/helpers.ts @@ -1,7 +1,11 @@ import type { Chat, Message, MessageOrigin, User } from "@grammyjs/types"; import { formatLocationText, type NormalizedLocation } from "../../channels/location.js"; import { resolveTelegramPreviewStreamMode } from "../../config/discord-preview-streaming.js"; -import type { TelegramGroupConfig, TelegramTopicConfig } from "../../config/types.js"; +import type { + TelegramDirectConfig, + TelegramGroupConfig, + TelegramTopicConfig, +} from "../../config/types.js"; import { readChannelAllowFromStore } from "../../pairing/pairing-store.js"; import { normalizeAccountId } from "../../routing/session-key.js"; import { firstDefined, normalizeAllowFrom, type NormalizedAllowFrom } from "../bot-access.js"; @@ -17,33 +21,43 @@ export type TelegramThreadSpec = { export async function resolveTelegramGroupAllowFromContext(params: { chatId: string | number; accountId?: string; + isGroup?: boolean; isForum?: boolean; messageThreadId?: number | null; groupAllowFrom?: Array; resolveTelegramGroupConfig: ( chatId: string | number, messageThreadId?: number, - ) => { groupConfig?: TelegramGroupConfig; topicConfig?: TelegramTopicConfig }; + ) => { + groupConfig?: TelegramGroupConfig | TelegramDirectConfig; + topicConfig?: TelegramTopicConfig; + }; }): Promise<{ resolvedThreadId?: number; + dmThreadId?: number; storeAllowFrom: string[]; - groupConfig?: TelegramGroupConfig; + groupConfig?: TelegramGroupConfig | TelegramDirectConfig; topicConfig?: TelegramTopicConfig; groupAllowOverride?: Array; effectiveGroupAllow: NormalizedAllowFrom; hasGroupAllowOverride: boolean; }> { const accountId = normalizeAccountId(params.accountId); - const resolvedThreadId = resolveTelegramForumThreadId({ + // Use resolveTelegramThreadSpec to handle both forum groups AND DM topics + const threadSpec = resolveTelegramThreadSpec({ + isGroup: params.isGroup ?? false, isForum: params.isForum, messageThreadId: params.messageThreadId, }); + const resolvedThreadId = threadSpec.scope === "forum" ? threadSpec.id : undefined; + const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined; + const threadIdForConfig = resolvedThreadId ?? dmThreadId; const storeAllowFrom = await readChannelAllowFromStore("telegram", process.env, accountId).catch( () => [], ); const { groupConfig, topicConfig } = params.resolveTelegramGroupConfig( params.chatId, - resolvedThreadId, + threadIdForConfig, ); const groupAllowOverride = firstDefined(topicConfig?.allowFrom, groupConfig?.allowFrom); // Group sender access must remain explicit (groupAllowFrom/per-group allowFrom only). @@ -52,6 +66,7 @@ export async function resolveTelegramGroupAllowFromContext(params: { const hasGroupAllowOverride = typeof groupAllowOverride !== "undefined"; return { resolvedThreadId, + dmThreadId, storeAllowFrom, groupConfig, topicConfig, diff --git a/src/telegram/group-access.ts b/src/telegram/group-access.ts index 3eaa28731fb..363c7d490d5 100644 --- a/src/telegram/group-access.ts +++ b/src/telegram/group-access.ts @@ -3,6 +3,7 @@ import type { ChannelGroupPolicy } from "../config/group-policy.js"; import { resolveOpenProviderRuntimeGroupPolicy } from "../config/runtime-group-policy.js"; import type { TelegramAccountConfig, + TelegramDirectConfig, TelegramGroupConfig, TelegramTopicConfig, } from "../config/types.js"; @@ -20,7 +21,7 @@ export type TelegramGroupBaseAccessResult = export const evaluateTelegramGroupBaseAccess = (params: { isGroup: boolean; - groupConfig?: TelegramGroupConfig; + groupConfig?: TelegramGroupConfig | TelegramDirectConfig; topicConfig?: TelegramTopicConfig; hasGroupAllowOverride: boolean; effectiveGroupAllow: NormalizedAllowFrom; @@ -29,15 +30,34 @@ export const evaluateTelegramGroupBaseAccess = (params: { enforceAllowOverride: boolean; requireSenderForAllowOverride: boolean; }): TelegramGroupBaseAccessResult => { - if (!params.isGroup) { - return { allowed: true }; - } + // Check enabled flags for both groups and DMs if (params.groupConfig?.enabled === false) { return { allowed: false, reason: "group-disabled" }; } if (params.topicConfig?.enabled === false) { return { allowed: false, reason: "topic-disabled" }; } + if (!params.isGroup) { + // For DMs, check allowFrom override if present + if (params.enforceAllowOverride && params.hasGroupAllowOverride) { + if (!params.effectiveGroupAllow.hasEntries) { + return { allowed: false, reason: "group-override-unauthorized" }; + } + const senderId = params.senderId ?? ""; + if (params.requireSenderForAllowOverride && !senderId) { + return { allowed: false, reason: "group-override-unauthorized" }; + } + const allowed = isSenderAllowed({ + allow: params.effectiveGroupAllow, + senderId, + senderUsername: params.senderUsername ?? "", + }); + if (!allowed) { + return { allowed: false, reason: "group-override-unauthorized" }; + } + } + return { allowed: true }; + } if (!params.enforceAllowOverride || !params.hasGroupAllowOverride) { return { allowed: true }; } diff --git a/src/telegram/group-config-helpers.ts b/src/telegram/group-config-helpers.ts index 15f74e3dcd1..523f1df57e0 100644 --- a/src/telegram/group-config-helpers.ts +++ b/src/telegram/group-config-helpers.ts @@ -1,8 +1,12 @@ -import type { TelegramGroupConfig, TelegramTopicConfig } from "../config/types.js"; +import type { + TelegramDirectConfig, + TelegramGroupConfig, + TelegramTopicConfig, +} from "../config/types.js"; import { firstDefined } from "./bot-access.js"; export function resolveTelegramGroupPromptSettings(params: { - groupConfig?: TelegramGroupConfig; + groupConfig?: TelegramGroupConfig | TelegramDirectConfig; topicConfig?: TelegramTopicConfig; }): { skillFilter: string[] | undefined;