mirror of
https://github.com/moltbot/moltbot.git
synced 2026-03-07 22:44:16 +00:00
fix(telegram): route native topic commands to the active session (#38871)
* fix(telegram): resolve session entry for /stop in forum topics Fixes #38675 - Export normalizeStoreSessionKey from store.ts for reuse - Use it in resolveSessionEntryForKey so topic session keys (lowercase in store) are found when handling /stop - Add test for forum topic session key lookup * fix(telegram): share native topic routing with inbound messages * fix: land telegram topic routing follow-up (#38871) --------- Co-authored-by: xialonglee <li.xialong@xydigit.com>
This commit is contained in:
@@ -230,6 +230,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Gateway/Telegram polling health monitor: skip stale-socket restarts for Telegram long-polling channels and thread channel identity through shared health evaluation so polling connections are not restarted on the WebSocket stale-socket heuristic. (#38395) Thanks @ql-wade and @Takhoffman.
|
||||
- Daemon/systemd fresh-install probe: check for OpenClaw's managed user unit before running `systemctl --user is-enabled`, so first-time Linux installs no longer fail on generic missing-unit probe errors. (#38819) Thanks @adaHubble.
|
||||
- Gateway/Windows restart supervision: relaunch task-managed gateways through Scheduled Task with quoted helper-script command paths, distinguish restart-capable supervisors per platform, and stop orphaned Windows gateway children during self-restart. (#38825) Thanks @obviyus.
|
||||
- Telegram/native topic command routing: resolve forum-topic native commands through the same conversation route as inbound messages so topic `agentId` overrides and bound topic sessions target the active session instead of the default topic-parent session. (#38871) Thanks @obviyus.
|
||||
|
||||
## 2026.3.2
|
||||
|
||||
|
||||
@@ -356,6 +356,20 @@ describe("abort detection", () => {
|
||||
expect(resolveSessionEntryForKey(undefined, "session-1")).toEqual({});
|
||||
});
|
||||
|
||||
it("resolves Telegram forum topic session when lookup key has different casing than store", () => {
|
||||
// Store normalizes keys to lowercase; caller may pass mixed-case. /stop in topic must find entry.
|
||||
const storeKey = "agent:main:telegram:group:-1001234567890:topic:99";
|
||||
const lookupKey = "Agent:Main:Telegram:Group:-1001234567890:Topic:99";
|
||||
const store = {
|
||||
[storeKey]: { sessionId: "pi-topic-99", updatedAt: 0 },
|
||||
} as Record<string, { sessionId: string; updatedAt: number }>;
|
||||
// Direct lookup fails (store uses lowercase keys); normalization fallback must succeed.
|
||||
expect(store[lookupKey]).toBeUndefined();
|
||||
const result = resolveSessionEntryForKey(store, lookupKey);
|
||||
expect(result.entry?.sessionId).toBe("pi-topic-99");
|
||||
expect(result.key).toBe(storeKey);
|
||||
});
|
||||
|
||||
it("fast-aborts even when text commands are disabled", async () => {
|
||||
const { cfg } = await createAbortConfig({ commandsTextEnabled: false });
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ import {
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
resolveSessionStoreEntry,
|
||||
resolveStorePath,
|
||||
type SessionEntry,
|
||||
updateSessionStore,
|
||||
@@ -172,13 +173,22 @@ export function formatAbortReplyText(stoppedSubagents?: number): string {
|
||||
export function resolveSessionEntryForKey(
|
||||
store: Record<string, SessionEntry> | undefined,
|
||||
sessionKey: string | undefined,
|
||||
) {
|
||||
): { entry?: SessionEntry; key?: string; legacyKeys?: string[] } {
|
||||
if (!store || !sessionKey) {
|
||||
return {};
|
||||
}
|
||||
const direct = store[sessionKey];
|
||||
if (direct) {
|
||||
return { entry: direct, key: sessionKey };
|
||||
const resolved = resolveSessionStoreEntry({ store, sessionKey });
|
||||
if (resolved.existing) {
|
||||
return resolved.legacyKeys.length > 0
|
||||
? {
|
||||
entry: resolved.existing,
|
||||
key: resolved.normalizedKey,
|
||||
legacyKeys: resolved.legacyKeys,
|
||||
}
|
||||
: {
|
||||
entry: resolved.existing,
|
||||
key: resolved.normalizedKey,
|
||||
};
|
||||
}
|
||||
return {};
|
||||
}
|
||||
@@ -301,7 +311,7 @@ export async function tryFastAbortFromMessage(params: {
|
||||
if (targetKey) {
|
||||
const storePath = resolveStorePath(cfg.session?.store, { agentId });
|
||||
const store = loadSessionStore(storePath);
|
||||
const { entry, key } = resolveSessionEntryForKey(store, targetKey);
|
||||
const { entry, key, legacyKeys } = resolveSessionEntryForKey(store, targetKey);
|
||||
const resolvedTargetKey = key ?? targetKey;
|
||||
const acpManager = getAcpSessionManager();
|
||||
const acpResolution = acpManager.resolveSession({
|
||||
@@ -340,6 +350,11 @@ export async function tryFastAbortFromMessage(params: {
|
||||
applyAbortCutoffToSessionEntry(entry, abortCutoff);
|
||||
entry.updatedAt = Date.now();
|
||||
store[key] = entry;
|
||||
for (const legacyKey of legacyKeys ?? []) {
|
||||
if (legacyKey !== key) {
|
||||
delete store[legacyKey];
|
||||
}
|
||||
}
|
||||
await updateSessionStore(storePath, (nextStore) => {
|
||||
const nextEntry = nextStore[key] ?? entry;
|
||||
if (!nextEntry) {
|
||||
@@ -349,6 +364,11 @@ export async function tryFastAbortFromMessage(params: {
|
||||
applyAbortCutoffToSessionEntry(nextEntry, abortCutoff);
|
||||
nextEntry.updatedAt = Date.now();
|
||||
nextStore[key] = nextEntry;
|
||||
for (const legacyKey of legacyKeys ?? []) {
|
||||
if (legacyKey !== key) {
|
||||
delete nextStore[legacyKey];
|
||||
}
|
||||
}
|
||||
});
|
||||
} else if (abortKey) {
|
||||
setAbortMemory(abortKey, true);
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
|
||||
import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { loadSessionStore, resolveStorePath, type SessionEntry } from "../../config/sessions.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
resolveSessionStoreEntry,
|
||||
resolveStorePath,
|
||||
type SessionEntry,
|
||||
} from "../../config/sessions.js";
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { fireAndForgetHook } from "../../hooks/fire-and-forget.js";
|
||||
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
|
||||
@@ -65,7 +70,7 @@ const isInboundAudioContext = (ctx: FinalizedMsgContext): boolean => {
|
||||
return AUDIO_HEADER_RE.test(trimmed);
|
||||
};
|
||||
|
||||
const resolveSessionStoreEntry = (
|
||||
const resolveSessionStoreLookup = (
|
||||
ctx: FinalizedMsgContext,
|
||||
cfg: OpenClawConfig,
|
||||
): {
|
||||
@@ -84,7 +89,7 @@ const resolveSessionStoreEntry = (
|
||||
const store = loadSessionStore(storePath);
|
||||
return {
|
||||
sessionKey,
|
||||
entry: store[sessionKey.toLowerCase()] ?? store[sessionKey],
|
||||
entry: resolveSessionStoreEntry({ store, sessionKey }).existing,
|
||||
};
|
||||
} catch {
|
||||
return {
|
||||
@@ -164,7 +169,7 @@ export async function dispatchReplyFromConfig(params: {
|
||||
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
|
||||
}
|
||||
|
||||
const sessionStoreEntry = resolveSessionStoreEntry(ctx, cfg);
|
||||
const sessionStoreEntry = resolveSessionStoreLookup(ctx, cfg);
|
||||
const acpDispatchSessionKey = sessionStoreEntry.sessionKey ?? sessionKey;
|
||||
const inboundAudio = isInboundAudioContext(ctx);
|
||||
const sessionTtsAuto = normalizeTtsAutoMode(sessionStoreEntry.entry?.ttsAuto);
|
||||
|
||||
@@ -108,11 +108,11 @@ function removeThreadFromDeliveryContext(context?: DeliveryContext): DeliveryCon
|
||||
return next;
|
||||
}
|
||||
|
||||
function normalizeStoreSessionKey(sessionKey: string): string {
|
||||
export function normalizeStoreSessionKey(sessionKey: string): string {
|
||||
return sessionKey.trim().toLowerCase();
|
||||
}
|
||||
|
||||
function resolveStoreSessionEntry(params: {
|
||||
export function resolveSessionStoreEntry(params: {
|
||||
store: Record<string, SessionEntry>;
|
||||
sessionKey: string;
|
||||
}): {
|
||||
@@ -275,7 +275,7 @@ export function readSessionUpdatedAt(params: {
|
||||
}): number | undefined {
|
||||
try {
|
||||
const store = loadSessionStore(params.storePath);
|
||||
const resolved = resolveStoreSessionEntry({ store, sessionKey: params.sessionKey });
|
||||
const resolved = resolveSessionStoreEntry({ store, sessionKey: params.sessionKey });
|
||||
return resolved.existing?.updatedAt;
|
||||
} catch {
|
||||
return undefined;
|
||||
@@ -611,7 +611,7 @@ async function writeSessionStoreAtomic(params: {
|
||||
async function persistResolvedSessionEntry(params: {
|
||||
storePath: string;
|
||||
store: Record<string, SessionEntry>;
|
||||
resolved: ReturnType<typeof resolveStoreSessionEntry>;
|
||||
resolved: ReturnType<typeof resolveSessionStoreEntry>;
|
||||
next: SessionEntry;
|
||||
}): Promise<SessionEntry> {
|
||||
params.store[params.resolved.normalizedKey] = params.next;
|
||||
@@ -734,7 +734,7 @@ export async function updateSessionStoreEntry(params: {
|
||||
const { storePath, sessionKey, update } = params;
|
||||
return await withSessionStoreLock(storePath, async () => {
|
||||
const store = loadSessionStore(storePath, { skipCache: true });
|
||||
const resolved = resolveStoreSessionEntry({ store, sessionKey });
|
||||
const resolved = resolveSessionStoreEntry({ store, sessionKey });
|
||||
const existing = resolved.existing;
|
||||
if (!existing) {
|
||||
return null;
|
||||
@@ -765,7 +765,7 @@ export async function recordSessionMetaFromInbound(params: {
|
||||
return await updateSessionStore(
|
||||
storePath,
|
||||
(store) => {
|
||||
const resolved = resolveStoreSessionEntry({ store, sessionKey });
|
||||
const resolved = resolveSessionStoreEntry({ store, sessionKey });
|
||||
const existing = resolved.existing;
|
||||
const patch = deriveSessionMetaPatch({
|
||||
ctx,
|
||||
@@ -814,7 +814,7 @@ export async function updateLastRoute(params: {
|
||||
const { storePath, sessionKey, channel, to, accountId, threadId, ctx } = params;
|
||||
return await withSessionStoreLock(storePath, async () => {
|
||||
const store = loadSessionStore(storePath);
|
||||
const resolved = resolveStoreSessionEntry({ store, sessionKey });
|
||||
const resolved = resolveSessionStoreEntry({ store, sessionKey });
|
||||
const existing = resolved.existing;
|
||||
const now = Date.now();
|
||||
const explicitContext = normalizeDeliveryContext(params.deliveryContext);
|
||||
|
||||
@@ -16,7 +16,11 @@ import { shouldDebounceTextInbound } from "../channels/inbound-debounce-policy.j
|
||||
import { resolveChannelConfigWrites } from "../channels/plugins/config-writes.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { writeConfigFile } from "../config/io.js";
|
||||
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
resolveSessionStoreEntry,
|
||||
resolveStorePath,
|
||||
} from "../config/sessions.js";
|
||||
import type { DmPolicy } from "../config/types.base.js";
|
||||
import type {
|
||||
TelegramDirectConfig,
|
||||
@@ -50,6 +54,7 @@ import {
|
||||
resolveTelegramGroupAllowFromContext,
|
||||
} from "./bot/helpers.js";
|
||||
import type { TelegramContext } from "./bot/types.js";
|
||||
import { resolveTelegramConversationRoute } from "./conversation-route.js";
|
||||
import { enforceTelegramDmAccess } from "./dm-access.js";
|
||||
import {
|
||||
evaluateTelegramGroupBaseAccess,
|
||||
@@ -268,9 +273,10 @@ export const registerTelegramHandlers = ({
|
||||
isForum: boolean;
|
||||
messageThreadId?: number;
|
||||
resolvedThreadId?: number;
|
||||
senderId?: string | number;
|
||||
}): {
|
||||
agentId: string;
|
||||
sessionEntry: ReturnType<typeof loadSessionStore>[string];
|
||||
sessionEntry: ReturnType<typeof loadSessionStore>[string] | undefined;
|
||||
model?: string;
|
||||
} => {
|
||||
const resolvedThreadId =
|
||||
@@ -279,26 +285,20 @@ export const registerTelegramHandlers = ({
|
||||
isForum: params.isForum,
|
||||
messageThreadId: params.messageThreadId,
|
||||
});
|
||||
const peerId = params.isGroup
|
||||
? buildTelegramGroupPeerId(params.chatId, resolvedThreadId)
|
||||
: String(params.chatId);
|
||||
const parentPeer = buildTelegramParentPeer({
|
||||
const dmThreadId = !params.isGroup ? params.messageThreadId : undefined;
|
||||
const topicThreadId = resolvedThreadId ?? dmThreadId;
|
||||
const { topicConfig } = resolveTelegramGroupConfig(params.chatId, topicThreadId);
|
||||
const { route } = resolveTelegramConversationRoute({
|
||||
cfg,
|
||||
accountId,
|
||||
chatId: params.chatId,
|
||||
isGroup: params.isGroup,
|
||||
resolvedThreadId,
|
||||
chatId: params.chatId,
|
||||
});
|
||||
const route = resolveAgentRoute({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
accountId,
|
||||
peer: {
|
||||
kind: params.isGroup ? "group" : "direct",
|
||||
id: peerId,
|
||||
},
|
||||
parentPeer,
|
||||
replyThreadId: topicThreadId,
|
||||
senderId: params.senderId,
|
||||
topicAgentId: topicConfig?.agentId,
|
||||
});
|
||||
const baseSessionKey = route.sessionKey;
|
||||
const dmThreadId = !params.isGroup ? params.messageThreadId : undefined;
|
||||
const threadKeys =
|
||||
dmThreadId != null
|
||||
? resolveThreadSessionKeys({ baseSessionKey, threadId: `${params.chatId}:${dmThreadId}` })
|
||||
@@ -306,7 +306,7 @@ export const registerTelegramHandlers = ({
|
||||
const sessionKey = threadKeys?.sessionKey ?? baseSessionKey;
|
||||
const storePath = resolveStorePath(cfg.session?.store, { agentId: route.agentId });
|
||||
const store = loadSessionStore(storePath);
|
||||
const entry = store[sessionKey];
|
||||
const entry = resolveSessionStoreEntry({ store, sessionKey }).existing;
|
||||
const storedOverride = resolveStoredModelOverride({
|
||||
sessionEntry: entry,
|
||||
sessionStore: store,
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
import type { Bot } from "grammy";
|
||||
import {
|
||||
ensureConfiguredAcpRouteReady,
|
||||
resolveConfiguredAcpRoute,
|
||||
} from "../acp/persistent-bindings.route.js";
|
||||
import { ensureConfiguredAcpRouteReady } from "../acp/persistent-bindings.route.js";
|
||||
import { resolveAckReaction } from "../agents/identity.js";
|
||||
import {
|
||||
findModelInCatalog,
|
||||
@@ -42,19 +39,7 @@ import type {
|
||||
} from "../config/types.js";
|
||||
import { logVerbose, shouldLogVerbose } from "../globals.js";
|
||||
import { recordChannelActivity } from "../infra/channel-activity.js";
|
||||
import { getSessionBindingService } from "../infra/outbound/session-binding-service.js";
|
||||
import {
|
||||
buildAgentSessionKey,
|
||||
pickFirstExistingAgentId,
|
||||
resolveAgentRoute,
|
||||
type ResolvedAgentRoute,
|
||||
} from "../routing/resolve-route.js";
|
||||
import {
|
||||
DEFAULT_ACCOUNT_ID,
|
||||
buildAgentMainSessionKey,
|
||||
resolveAgentIdFromSessionKey,
|
||||
resolveThreadSessionKeys,
|
||||
} from "../routing/session-key.js";
|
||||
import { DEFAULT_ACCOUNT_ID, resolveThreadSessionKeys } from "../routing/session-key.js";
|
||||
import { resolvePinnedMainDmOwnerFromAllowlist } from "../security/dm-policy-shared.js";
|
||||
import { withTelegramApiErrorLogging } from "./api-logging.js";
|
||||
import {
|
||||
@@ -67,10 +52,8 @@ import {
|
||||
buildGroupLabel,
|
||||
buildSenderLabel,
|
||||
buildSenderName,
|
||||
resolveTelegramDirectPeerId,
|
||||
buildTelegramGroupFrom,
|
||||
buildTelegramGroupPeerId,
|
||||
buildTelegramParentPeer,
|
||||
buildTypingThreadParams,
|
||||
resolveTelegramMediaPlaceholder,
|
||||
expandTextLinks,
|
||||
@@ -81,6 +64,7 @@ import {
|
||||
resolveTelegramThreadSpec,
|
||||
} from "./bot/helpers.js";
|
||||
import type { StickerMetadata, TelegramContext } from "./bot/types.js";
|
||||
import { resolveTelegramConversationRoute } from "./conversation-route.js";
|
||||
import { enforceTelegramDmAccess } from "./dm-access.js";
|
||||
import { isTelegramForumServiceMessage } from "./forum-service-message.js";
|
||||
import { evaluateTelegramGroupBaseAccess } from "./group-access.js";
|
||||
@@ -209,89 +193,21 @@ export const buildTelegramMessageContext = async ({
|
||||
!isGroup && groupConfig && "dmPolicy" in groupConfig
|
||||
? (groupConfig.dmPolicy ?? dmPolicy)
|
||||
: dmPolicy;
|
||||
const peerId = isGroup
|
||||
? buildTelegramGroupPeerId(chatId, resolvedThreadId)
|
||||
: resolveTelegramDirectPeerId({ chatId, senderId });
|
||||
const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId });
|
||||
// Fresh config for bindings lookup; other routing inputs are payload-derived.
|
||||
const freshCfg = loadConfig();
|
||||
let route: ResolvedAgentRoute = resolveAgentRoute({
|
||||
let { route, configuredBinding, configuredBindingSessionKey } = resolveTelegramConversationRoute({
|
||||
cfg: freshCfg,
|
||||
channel: "telegram",
|
||||
accountId: account.accountId,
|
||||
peer: {
|
||||
kind: isGroup ? "group" : "direct",
|
||||
id: peerId,
|
||||
},
|
||||
parentPeer,
|
||||
chatId,
|
||||
isGroup,
|
||||
resolvedThreadId,
|
||||
replyThreadId,
|
||||
senderId,
|
||||
topicAgentId: topicConfig?.agentId,
|
||||
});
|
||||
// Per-topic agentId override: re-derive session key under the topic's agent.
|
||||
const rawTopicAgentId = topicConfig?.agentId?.trim();
|
||||
if (rawTopicAgentId) {
|
||||
// Validate agentId against configured agents; falls back to default if not found.
|
||||
const topicAgentId = pickFirstExistingAgentId(freshCfg, rawTopicAgentId);
|
||||
const overrideSessionKey = buildAgentSessionKey({
|
||||
agentId: topicAgentId,
|
||||
channel: "telegram",
|
||||
accountId: account.accountId,
|
||||
peer: { kind: isGroup ? "group" : "direct", id: peerId },
|
||||
dmScope: freshCfg.session?.dmScope,
|
||||
identityLinks: freshCfg.session?.identityLinks,
|
||||
}).toLowerCase();
|
||||
const overrideMainSessionKey = buildAgentMainSessionKey({
|
||||
agentId: topicAgentId,
|
||||
}).toLowerCase();
|
||||
route = {
|
||||
...route,
|
||||
agentId: topicAgentId,
|
||||
sessionKey: overrideSessionKey,
|
||||
mainSessionKey: overrideMainSessionKey,
|
||||
};
|
||||
logVerbose(
|
||||
`telegram: per-topic agent override: topic=${resolvedThreadId ?? dmThreadId} agent=${topicAgentId} sessionKey=${overrideSessionKey}`,
|
||||
);
|
||||
}
|
||||
const configuredRoute = resolveConfiguredAcpRoute({
|
||||
cfg: freshCfg,
|
||||
route,
|
||||
channel: "telegram",
|
||||
accountId: account.accountId,
|
||||
conversationId: peerId,
|
||||
parentConversationId: isGroup ? String(chatId) : undefined,
|
||||
});
|
||||
let configuredBinding = configuredRoute.configuredBinding;
|
||||
let configuredBindingSessionKey = configuredRoute.boundSessionKey ?? "";
|
||||
route = configuredRoute.route;
|
||||
const threadBindingConversationId =
|
||||
replyThreadId != null
|
||||
? `${chatId}:topic:${replyThreadId}`
|
||||
: !isGroup
|
||||
? String(chatId)
|
||||
: undefined;
|
||||
if (threadBindingConversationId) {
|
||||
const threadBinding = getSessionBindingService().resolveByConversation({
|
||||
channel: "telegram",
|
||||
accountId: account.accountId,
|
||||
conversationId: threadBindingConversationId,
|
||||
});
|
||||
const boundSessionKey = threadBinding?.targetSessionKey?.trim();
|
||||
if (threadBinding && boundSessionKey) {
|
||||
route = {
|
||||
...route,
|
||||
sessionKey: boundSessionKey,
|
||||
agentId: resolveAgentIdFromSessionKey(boundSessionKey),
|
||||
matchedBy: "binding.channel",
|
||||
};
|
||||
configuredBinding = null;
|
||||
configuredBindingSessionKey = "";
|
||||
getSessionBindingService().touch(threadBinding.bindingId);
|
||||
logVerbose(
|
||||
`telegram: routed via bound conversation ${threadBindingConversationId} -> ${boundSessionKey}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
const requiresExplicitAccountBinding = (candidate: ResolvedAgentRoute): boolean =>
|
||||
candidate.accountId !== DEFAULT_ACCOUNT_ID && candidate.matchedBy === "default";
|
||||
const requiresExplicitAccountBinding = (
|
||||
candidate: ReturnType<typeof resolveTelegramConversationRoute>["route"],
|
||||
): boolean => candidate.accountId !== DEFAULT_ACCOUNT_ID && candidate.matchedBy === "default";
|
||||
// Fail closed for named Telegram accounts when route resolution falls back to
|
||||
// default-agent routing. This prevents cross-account DM/session contamination.
|
||||
if (requiresExplicitAccountBinding(route)) {
|
||||
|
||||
@@ -30,10 +30,14 @@ vi.mock("./send.js", () => ({
|
||||
editMessageTelegram,
|
||||
}));
|
||||
|
||||
vi.mock("../config/sessions.js", async () => ({
|
||||
loadSessionStore,
|
||||
resolveStorePath,
|
||||
}));
|
||||
vi.mock("../config/sessions.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("../config/sessions.js")>();
|
||||
return {
|
||||
...actual,
|
||||
loadSessionStore,
|
||||
resolveStorePath,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("./sticker-cache.js", () => ({
|
||||
cacheSticker: vi.fn(),
|
||||
|
||||
@@ -15,7 +15,11 @@ import { logAckFailure, logTypingFailure } from "../channels/logging.js";
|
||||
import { createReplyPrefixOptions } from "../channels/reply-prefix.js";
|
||||
import { createTypingCallbacks } from "../channels/typing.js";
|
||||
import { resolveMarkdownTableMode } from "../config/markdown-tables.js";
|
||||
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
resolveSessionStoreEntry,
|
||||
resolveStorePath,
|
||||
} from "../config/sessions.js";
|
||||
import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../config/types.js";
|
||||
import { danger, logVerbose } from "../globals.js";
|
||||
import { getAgentScopedMediaLocalRoots } from "../media/local-roots.js";
|
||||
@@ -117,7 +121,7 @@ function resolveTelegramReasoningLevel(params: {
|
||||
try {
|
||||
const storePath = resolveStorePath(cfg.session?.store, { agentId });
|
||||
const store = loadSessionStore(storePath, { skipCache: true });
|
||||
const entry = store[sessionKey.toLowerCase()] ?? store[sessionKey];
|
||||
const entry = resolveSessionStoreEntry({ store, sessionKey }).existing;
|
||||
const level = entry?.reasoningLevel;
|
||||
if (level === "on" || level === "stream") {
|
||||
return level;
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { registerTelegramNativeCommands } from "./bot-native-commands.js";
|
||||
import {
|
||||
registerTelegramNativeCommands,
|
||||
type RegisterTelegramHandlerParams,
|
||||
} from "./bot-native-commands.js";
|
||||
import { createNativeCommandTestParams } from "./bot-native-commands.test-helpers.js";
|
||||
|
||||
// All mocks scoped to this file only — does not affect bot-native-commands.test.ts
|
||||
@@ -24,6 +27,12 @@ const sessionMocks = vi.hoisted(() => ({
|
||||
const replyMocks = vi.hoisted(() => ({
|
||||
dispatchReplyWithBufferedBlockDispatcher: vi.fn(async () => undefined),
|
||||
}));
|
||||
const sessionBindingMocks = vi.hoisted(() => ({
|
||||
resolveByConversation: vi.fn<
|
||||
(ref: unknown) => { bindingId: string; targetSessionKey: string } | null
|
||||
>(() => null),
|
||||
touch: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("../acp/persistent-bindings.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("../acp/persistent-bindings.js")>();
|
||||
@@ -49,6 +58,16 @@ vi.mock("../auto-reply/reply/provider-dispatcher.js", () => ({
|
||||
vi.mock("../channels/reply-prefix.js", () => ({
|
||||
createReplyPrefixOptions: vi.fn(() => ({ onModelSelected: () => {} })),
|
||||
}));
|
||||
vi.mock("../infra/outbound/session-binding-service.js", () => ({
|
||||
getSessionBindingService: () => ({
|
||||
bind: vi.fn(),
|
||||
getCapabilities: vi.fn(),
|
||||
listBySession: vi.fn(),
|
||||
resolveByConversation: (ref: unknown) => sessionBindingMocks.resolveByConversation(ref),
|
||||
touch: (bindingId: string, at?: number) => sessionBindingMocks.touch(bindingId, at),
|
||||
unbind: vi.fn(),
|
||||
}),
|
||||
}));
|
||||
vi.mock("../auto-reply/skill-commands.js", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("../auto-reply/skill-commands.js")>();
|
||||
return { ...actual, listSkillCommandsForAgents: vi.fn(() => []) };
|
||||
@@ -106,11 +125,12 @@ function registerAndResolveStatusHandler(params: {
|
||||
cfg: OpenClawConfig;
|
||||
allowFrom?: string[];
|
||||
groupAllowFrom?: string[];
|
||||
resolveTelegramGroupConfig?: RegisterTelegramHandlerParams["resolveTelegramGroupConfig"];
|
||||
}): {
|
||||
handler: TelegramCommandHandler;
|
||||
sendMessage: ReturnType<typeof vi.fn>;
|
||||
} {
|
||||
const { cfg, allowFrom, groupAllowFrom } = params;
|
||||
const { cfg, allowFrom, groupAllowFrom, resolveTelegramGroupConfig } = params;
|
||||
const commandHandlers = new Map<string, TelegramCommandHandler>();
|
||||
const sendMessage = vi.fn().mockResolvedValue(undefined);
|
||||
registerTelegramNativeCommands({
|
||||
@@ -127,6 +147,7 @@ function registerAndResolveStatusHandler(params: {
|
||||
cfg,
|
||||
allowFrom: allowFrom ?? ["*"],
|
||||
groupAllowFrom: groupAllowFrom ?? [],
|
||||
resolveTelegramGroupConfig,
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -141,11 +162,19 @@ function registerAndResolveCommandHandler(params: {
|
||||
allowFrom?: string[];
|
||||
groupAllowFrom?: string[];
|
||||
useAccessGroups?: boolean;
|
||||
resolveTelegramGroupConfig?: RegisterTelegramHandlerParams["resolveTelegramGroupConfig"];
|
||||
}): {
|
||||
handler: TelegramCommandHandler;
|
||||
sendMessage: ReturnType<typeof vi.fn>;
|
||||
} {
|
||||
const { commandName, cfg, allowFrom, groupAllowFrom, useAccessGroups } = params;
|
||||
const {
|
||||
commandName,
|
||||
cfg,
|
||||
allowFrom,
|
||||
groupAllowFrom,
|
||||
useAccessGroups,
|
||||
resolveTelegramGroupConfig,
|
||||
} = params;
|
||||
const commandHandlers = new Map<string, TelegramCommandHandler>();
|
||||
const sendMessage = vi.fn().mockResolvedValue(undefined);
|
||||
registerTelegramNativeCommands({
|
||||
@@ -163,6 +192,7 @@ function registerAndResolveCommandHandler(params: {
|
||||
allowFrom: allowFrom ?? [],
|
||||
groupAllowFrom: groupAllowFrom ?? [],
|
||||
useAccessGroups: useAccessGroups ?? true,
|
||||
resolveTelegramGroupConfig,
|
||||
}),
|
||||
});
|
||||
|
||||
@@ -183,6 +213,8 @@ describe("registerTelegramNativeCommands — session metadata", () => {
|
||||
sessionMocks.recordSessionMetaFromInbound.mockClear().mockResolvedValue(undefined);
|
||||
sessionMocks.resolveStorePath.mockClear().mockReturnValue("/tmp/openclaw-sessions.json");
|
||||
replyMocks.dispatchReplyWithBufferedBlockDispatcher.mockClear().mockResolvedValue(undefined);
|
||||
sessionBindingMocks.resolveByConversation.mockReset().mockReturnValue(null);
|
||||
sessionBindingMocks.touch.mockReset();
|
||||
});
|
||||
|
||||
it("calls recordSessionMetaFromInbound after a native slash command", async () => {
|
||||
@@ -273,6 +305,58 @@ describe("registerTelegramNativeCommands — session metadata", () => {
|
||||
expect(sessionMetaCall?.sessionKey).toBe("agent:codex:telegram:slash:200");
|
||||
});
|
||||
|
||||
it("routes Telegram native commands through topic-specific agent sessions", async () => {
|
||||
const { handler } = registerAndResolveStatusHandler({
|
||||
cfg: {},
|
||||
allowFrom: ["200"],
|
||||
groupAllowFrom: ["200"],
|
||||
resolveTelegramGroupConfig: () => ({
|
||||
groupConfig: { requireMention: false },
|
||||
topicConfig: { agentId: "zu" },
|
||||
}),
|
||||
});
|
||||
await handler(buildStatusTopicCommandContext());
|
||||
|
||||
const dispatchCall = (
|
||||
replyMocks.dispatchReplyWithBufferedBlockDispatcher.mock.calls as unknown as Array<
|
||||
[{ ctx?: { CommandTargetSessionKey?: string } }]
|
||||
>
|
||||
)[0]?.[0];
|
||||
expect(dispatchCall?.ctx?.CommandTargetSessionKey).toBe(
|
||||
"agent:zu:telegram:group:-1001234567890:topic:42",
|
||||
);
|
||||
});
|
||||
|
||||
it("routes Telegram native commands through bound topic sessions", async () => {
|
||||
sessionBindingMocks.resolveByConversation.mockReturnValue({
|
||||
bindingId: "default:-1001234567890:topic:42",
|
||||
targetSessionKey: "agent:codex-acp:session-1",
|
||||
});
|
||||
|
||||
const { handler } = registerAndResolveStatusHandler({
|
||||
cfg: {},
|
||||
allowFrom: ["200"],
|
||||
groupAllowFrom: ["200"],
|
||||
});
|
||||
await handler(buildStatusTopicCommandContext());
|
||||
|
||||
expect(sessionBindingMocks.resolveByConversation).toHaveBeenCalledWith({
|
||||
channel: "telegram",
|
||||
accountId: "default",
|
||||
conversationId: "-1001234567890:topic:42",
|
||||
});
|
||||
const dispatchCall = (
|
||||
replyMocks.dispatchReplyWithBufferedBlockDispatcher.mock.calls as unknown as Array<
|
||||
[{ ctx?: { CommandTargetSessionKey?: string } }]
|
||||
>
|
||||
)[0]?.[0];
|
||||
expect(dispatchCall?.ctx?.CommandTargetSessionKey).toBe("agent:codex-acp:session-1");
|
||||
expect(sessionBindingMocks.touch).toHaveBeenCalledWith(
|
||||
"default:-1001234567890:topic:42",
|
||||
undefined,
|
||||
);
|
||||
});
|
||||
|
||||
it("aborts native command dispatch when configured ACP topic binding cannot initialize", async () => {
|
||||
const boundSessionKey = "agent:codex:acp:binding:telegram:default:feedface";
|
||||
persistentBindingMocks.resolveConfiguredAcpBindingRecord.mockReturnValue({
|
||||
|
||||
@@ -19,6 +19,7 @@ export function createNativeCommandTestParams(params: {
|
||||
nativeEnabled?: boolean;
|
||||
nativeSkillsEnabled?: boolean;
|
||||
nativeDisabledExplicit?: boolean;
|
||||
resolveTelegramGroupConfig?: RegisterTelegramNativeCommandParams["resolveTelegramGroupConfig"];
|
||||
opts?: RegisterTelegramNativeCommandParams["opts"];
|
||||
}): RegisterTelegramNativeCommandParams {
|
||||
return {
|
||||
@@ -36,10 +37,12 @@ export function createNativeCommandTestParams(params: {
|
||||
nativeSkillsEnabled: params.nativeSkillsEnabled ?? true,
|
||||
nativeDisabledExplicit: params.nativeDisabledExplicit ?? false,
|
||||
resolveGroupPolicy: () => ({ allowlistEnabled: false, allowed: true }),
|
||||
resolveTelegramGroupConfig: () => ({
|
||||
groupConfig: undefined,
|
||||
topicConfig: undefined,
|
||||
}),
|
||||
resolveTelegramGroupConfig:
|
||||
params.resolveTelegramGroupConfig ??
|
||||
(() => ({
|
||||
groupConfig: undefined,
|
||||
topicConfig: undefined,
|
||||
})),
|
||||
shouldSkipUpdate: () => false,
|
||||
opts: params.opts ?? { token: "token" },
|
||||
};
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
import type { Bot, Context } from "grammy";
|
||||
import {
|
||||
ensureConfiguredAcpRouteReady,
|
||||
resolveConfiguredAcpRoute,
|
||||
} from "../acp/persistent-bindings.route.js";
|
||||
import { ensureConfiguredAcpRouteReady } from "../acp/persistent-bindings.route.js";
|
||||
import { resolveChunkMode } from "../auto-reply/chunk.js";
|
||||
import type { CommandArgs } from "../auto-reply/commands-registry.js";
|
||||
import {
|
||||
@@ -60,12 +57,11 @@ import {
|
||||
buildTelegramThreadParams,
|
||||
buildSenderName,
|
||||
buildTelegramGroupFrom,
|
||||
buildTelegramGroupPeerId,
|
||||
buildTelegramParentPeer,
|
||||
resolveTelegramGroupAllowFromContext,
|
||||
resolveTelegramThreadSpec,
|
||||
} from "./bot/helpers.js";
|
||||
import type { TelegramContext } from "./bot/types.js";
|
||||
import { resolveTelegramConversationRoute } from "./conversation-route.js";
|
||||
import {
|
||||
evaluateTelegramGroupBaseAccess,
|
||||
evaluateTelegramGroupPolicyAccess,
|
||||
@@ -424,15 +420,17 @@ export const registerTelegramNativeCommands = ({
|
||||
isGroup: boolean;
|
||||
isForum: boolean;
|
||||
resolvedThreadId?: number;
|
||||
senderId?: string;
|
||||
topicAgentId?: string;
|
||||
}): Promise<{
|
||||
chatId: number;
|
||||
threadSpec: ReturnType<typeof resolveTelegramThreadSpec>;
|
||||
route: ReturnType<typeof resolveAgentRoute>;
|
||||
route: ReturnType<typeof resolveTelegramConversationRoute>["route"];
|
||||
mediaLocalRoots: readonly string[] | undefined;
|
||||
tableMode: ReturnType<typeof resolveMarkdownTableMode>;
|
||||
chunkMode: ReturnType<typeof resolveChunkMode>;
|
||||
} | null> => {
|
||||
const { msg, isGroup, isForum, resolvedThreadId } = params;
|
||||
const { msg, isGroup, isForum, resolvedThreadId, senderId, topicAgentId } = params;
|
||||
const chatId = msg.chat.id;
|
||||
const messageThreadId = (msg as { message_thread_id?: number }).message_thread_id;
|
||||
const threadSpec = resolveTelegramThreadSpec({
|
||||
@@ -440,28 +438,16 @@ export const registerTelegramNativeCommands = ({
|
||||
isForum,
|
||||
messageThreadId,
|
||||
});
|
||||
const parentPeer = buildTelegramParentPeer({ isGroup, resolvedThreadId, chatId });
|
||||
const peerId = isGroup ? buildTelegramGroupPeerId(chatId, resolvedThreadId) : String(chatId);
|
||||
let route = resolveAgentRoute({
|
||||
let { route, configuredBinding } = resolveTelegramConversationRoute({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
accountId,
|
||||
peer: {
|
||||
kind: isGroup ? "group" : "direct",
|
||||
id: peerId,
|
||||
},
|
||||
parentPeer,
|
||||
chatId,
|
||||
isGroup,
|
||||
resolvedThreadId,
|
||||
replyThreadId: threadSpec.id,
|
||||
senderId,
|
||||
topicAgentId,
|
||||
});
|
||||
const configuredRoute = resolveConfiguredAcpRoute({
|
||||
cfg,
|
||||
route,
|
||||
channel: "telegram",
|
||||
accountId,
|
||||
conversationId: peerId,
|
||||
parentConversationId: isGroup ? String(chatId) : undefined,
|
||||
});
|
||||
const configuredBinding = configuredRoute.configuredBinding;
|
||||
route = configuredRoute.route;
|
||||
if (configuredBinding) {
|
||||
const ensured = await ensureConfiguredAcpRouteReady({
|
||||
cfg,
|
||||
@@ -562,6 +548,8 @@ export const registerTelegramNativeCommands = ({
|
||||
isGroup,
|
||||
isForum,
|
||||
resolvedThreadId,
|
||||
senderId,
|
||||
topicAgentId: topicConfig?.agentId,
|
||||
});
|
||||
if (!runtimeContext) {
|
||||
return;
|
||||
@@ -788,6 +776,8 @@ export const registerTelegramNativeCommands = ({
|
||||
isGroup,
|
||||
isForum,
|
||||
resolvedThreadId,
|
||||
senderId,
|
||||
topicAgentId: auth.topicConfig?.agentId,
|
||||
});
|
||||
if (!runtimeContext) {
|
||||
return;
|
||||
|
||||
122
src/telegram/conversation-route.ts
Normal file
122
src/telegram/conversation-route.ts
Normal file
@@ -0,0 +1,122 @@
|
||||
import { resolveConfiguredAcpRoute } from "../acp/persistent-bindings.route.js";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { logVerbose } from "../globals.js";
|
||||
import { getSessionBindingService } from "../infra/outbound/session-binding-service.js";
|
||||
import {
|
||||
buildAgentSessionKey,
|
||||
pickFirstExistingAgentId,
|
||||
resolveAgentRoute,
|
||||
} from "../routing/resolve-route.js";
|
||||
import { buildAgentMainSessionKey, resolveAgentIdFromSessionKey } from "../routing/session-key.js";
|
||||
import {
|
||||
buildTelegramGroupPeerId,
|
||||
buildTelegramParentPeer,
|
||||
resolveTelegramDirectPeerId,
|
||||
} from "./bot/helpers.js";
|
||||
|
||||
export function resolveTelegramConversationRoute(params: {
|
||||
cfg: OpenClawConfig;
|
||||
accountId: string;
|
||||
chatId: number | string;
|
||||
isGroup: boolean;
|
||||
resolvedThreadId?: number;
|
||||
replyThreadId?: number;
|
||||
senderId?: string | number | null;
|
||||
topicAgentId?: string | null;
|
||||
}): {
|
||||
route: ReturnType<typeof resolveAgentRoute>;
|
||||
configuredBinding: ReturnType<typeof resolveConfiguredAcpRoute>["configuredBinding"];
|
||||
configuredBindingSessionKey: string;
|
||||
} {
|
||||
const peerId = params.isGroup
|
||||
? buildTelegramGroupPeerId(params.chatId, params.resolvedThreadId)
|
||||
: resolveTelegramDirectPeerId({
|
||||
chatId: params.chatId,
|
||||
senderId: params.senderId,
|
||||
});
|
||||
const parentPeer = buildTelegramParentPeer({
|
||||
isGroup: params.isGroup,
|
||||
resolvedThreadId: params.resolvedThreadId,
|
||||
chatId: params.chatId,
|
||||
});
|
||||
let route = resolveAgentRoute({
|
||||
cfg: params.cfg,
|
||||
channel: "telegram",
|
||||
accountId: params.accountId,
|
||||
peer: {
|
||||
kind: params.isGroup ? "group" : "direct",
|
||||
id: peerId,
|
||||
},
|
||||
parentPeer,
|
||||
});
|
||||
|
||||
const rawTopicAgentId = params.topicAgentId?.trim();
|
||||
if (rawTopicAgentId) {
|
||||
const topicAgentId = pickFirstExistingAgentId(params.cfg, rawTopicAgentId);
|
||||
route = {
|
||||
...route,
|
||||
agentId: topicAgentId,
|
||||
sessionKey: buildAgentSessionKey({
|
||||
agentId: topicAgentId,
|
||||
channel: "telegram",
|
||||
accountId: params.accountId,
|
||||
peer: { kind: params.isGroup ? "group" : "direct", id: peerId },
|
||||
dmScope: params.cfg.session?.dmScope,
|
||||
identityLinks: params.cfg.session?.identityLinks,
|
||||
}).toLowerCase(),
|
||||
mainSessionKey: buildAgentMainSessionKey({
|
||||
agentId: topicAgentId,
|
||||
}).toLowerCase(),
|
||||
};
|
||||
logVerbose(
|
||||
`telegram: topic route override: topic=${params.resolvedThreadId ?? params.replyThreadId} agent=${topicAgentId} sessionKey=${route.sessionKey}`,
|
||||
);
|
||||
}
|
||||
|
||||
const configuredRoute = resolveConfiguredAcpRoute({
|
||||
cfg: params.cfg,
|
||||
route,
|
||||
channel: "telegram",
|
||||
accountId: params.accountId,
|
||||
conversationId: peerId,
|
||||
parentConversationId: params.isGroup ? String(params.chatId) : undefined,
|
||||
});
|
||||
let configuredBinding = configuredRoute.configuredBinding;
|
||||
let configuredBindingSessionKey = configuredRoute.boundSessionKey ?? "";
|
||||
route = configuredRoute.route;
|
||||
|
||||
const threadBindingConversationId =
|
||||
params.replyThreadId != null
|
||||
? `${params.chatId}:topic:${params.replyThreadId}`
|
||||
: !params.isGroup
|
||||
? String(params.chatId)
|
||||
: undefined;
|
||||
if (threadBindingConversationId) {
|
||||
const threadBinding = getSessionBindingService().resolveByConversation({
|
||||
channel: "telegram",
|
||||
accountId: params.accountId,
|
||||
conversationId: threadBindingConversationId,
|
||||
});
|
||||
const boundSessionKey = threadBinding?.targetSessionKey?.trim();
|
||||
if (threadBinding && boundSessionKey) {
|
||||
route = {
|
||||
...route,
|
||||
sessionKey: boundSessionKey,
|
||||
agentId: resolveAgentIdFromSessionKey(boundSessionKey),
|
||||
matchedBy: "binding.channel",
|
||||
};
|
||||
configuredBinding = null;
|
||||
configuredBindingSessionKey = "";
|
||||
getSessionBindingService().touch(threadBinding.bindingId);
|
||||
logVerbose(
|
||||
`telegram: routed via bound conversation ${threadBindingConversationId} -> ${boundSessionKey}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
route,
|
||||
configuredBinding,
|
||||
configuredBindingSessionKey,
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user