From afd7339bd2ec2ab299a6723436480bee9e1e72df Mon Sep 17 00:00:00 2001 From: VACInc <3279061+VACInc@users.noreply.github.com> Date: Mon, 11 May 2026 22:41:17 -0400 Subject: [PATCH] fix telegram context session start boundary --- .../telegram/src/bot-handlers.runtime.ts | 68 +++++++++- .../telegram/src/bot-message-context.types.ts | 1 + extensions/telegram/src/bot-updates.ts | 1 + extensions/telegram/src/bot.test.ts | 122 ++++++++++++++++++ extensions/telegram/src/message-cache.test.ts | 88 +++++++++++++ extensions/telegram/src/message-cache.ts | 24 ++++ 6 files changed, 301 insertions(+), 3 deletions(-) diff --git a/extensions/telegram/src/bot-handlers.runtime.ts b/extensions/telegram/src/bot-handlers.runtime.ts index 3b8ac73a4af..5533a10b2f7 100644 --- a/extensions/telegram/src/bot-handlers.runtime.ts +++ b/extensions/telegram/src/bot-handlers.runtime.ts @@ -179,6 +179,7 @@ export const registerTelegramHandlers = ({ key: string; threadId?: number; messages: Array<{ msg: Message; ctx: TelegramContext; receivedAtMs: number }>; + promptContextMinTimestampMs?: number; timer: ReturnType; }; const textFragmentBuffer = new Map(); @@ -197,6 +198,22 @@ export const registerTelegramHandlers = ({ debounceLane: TelegramDebounceLane; botUsername?: string; threadId?: number; + promptContextMinTimestampMs?: number; + }; + const normalizePromptContextMinTimestampMs = (timestampMs?: number) => + typeof timestampMs === "number" && Number.isFinite(timestampMs) ? timestampMs : undefined; + const latestPromptContextMinTimestampMs = ( + ...timestamps: Array + ): number | undefined => { + let latest: number | undefined; + for (const timestampMs of timestamps) { + const normalized = normalizePromptContextMinTimestampMs(timestampMs); + if (normalized === undefined) { + continue; + } + latest = latest === undefined ? normalized : Math.max(latest, normalized); + } + return latest; }; const resolveTelegramDebounceLane = (msg: Message): TelegramDebounceLane => { const forwardMeta = msg as { @@ -394,9 +411,13 @@ export const registerTelegramHandlers = ({ return; } if (entries.length === 1) { + const promptContextMinTimestampMs = normalizePromptContextMinTimestampMs( + last.promptContextMinTimestampMs, + ); await processMessageWithReplyChain(last.ctx, last.msg, last.allMedia, last.storeAllowFrom, { receivedAtMs: last.receivedAtMs, ingressBuffer: "inbound-debounce", + ...(promptContextMinTimestampMs !== undefined ? { promptContextMinTimestampMs } : {}), }); return; } @@ -409,6 +430,9 @@ export const registerTelegramHandlers = ({ return; } const first = entries[0]; + const promptContextMinTimestampMs = latestPromptContextMinTimestampMs( + ...entries.map((entry) => entry.promptContextMinTimestampMs), + ); const baseCtx = first.ctx; const syntheticMessage = buildSyntheticTextMessage({ base: first.msg, @@ -426,6 +450,7 @@ export const registerTelegramHandlers = ({ ...(messageIdOverride ? { messageIdOverride } : {}), receivedAtMs: first.receivedAtMs, ingressBuffer: "inbound-debounce", + ...(promptContextMinTimestampMs !== undefined ? { promptContextMinTimestampMs } : {}), }, ); }, @@ -454,13 +479,14 @@ export const registerTelegramHandlers = ({ messageThreadId?: number; resolvedThreadId?: number; senderId?: string | number; + runtimeCfg?: OpenClawConfig; }): { agentId: string; sessionEntry: ReturnType["existing"]; sessionKey: string; model?: string; } => { - const runtimeCfg = telegramDeps.getRuntimeConfig(); + const runtimeCfg = params.runtimeCfg ?? telegramDeps.getRuntimeConfig(); const resolvedThreadId = params.resolvedThreadId ?? resolveTelegramForumThreadId({ @@ -505,7 +531,7 @@ export const registerTelegramHandlers = ({ const storePath = telegramDeps.resolveStorePath(runtimeCfg.session?.store, { agentId: route.agentId, }); - const store = loadSessionStore(storePath); + const store = (telegramDeps.loadSessionStore ?? loadSessionStore)(storePath); const entry = resolveSessionStoreEntry({ store, sessionKey }).existing; const storedOverride = resolveStoredModelOverride({ sessionEntry: entry, @@ -585,6 +611,9 @@ export const registerTelegramHandlers = ({ primaryEntry.msg, allMedia, storeAllowFrom, + entry.promptContextMinTimestampMs !== undefined + ? { promptContextMinTimestampMs: entry.promptContextMinTimestampMs } + : undefined, ); } catch (err) { runtime.error?.(danger(`media group handler failed: ${String(err)}`)); @@ -620,6 +649,9 @@ export const registerTelegramHandlers = ({ messageIdOverride: String(last.msg.message_id), receivedAtMs: first.receivedAtMs, ingressBuffer: "text-fragment", + ...(entry.promptContextMinTimestampMs !== undefined + ? { promptContextMinTimestampMs: entry.promptContextMinTimestampMs } + : {}), }); } catch (err) { runtime.error?.(danger(`text fragment handler failed: ${String(err)}`)); @@ -698,6 +730,7 @@ export const registerTelegramHandlers = ({ const buildPromptContextForMessage = ( msg: Message, replyChainNodes: TelegramCachedMessageNode[], + options?: TelegramMessageContextOptions, ): TelegramPromptContextEntry[] => { const messageId = typeof msg.message_id === "number" ? String(msg.message_id) : undefined; const currentNode = messageCache.get({ @@ -715,6 +748,9 @@ export const registerTelegramHandlers = ({ replyChainNodes, recentLimit: 10, replyTargetWindowSize: 2, + ...(options?.promptContextMinTimestampMs !== undefined + ? { minTimestampMs: options.promptContextMinTimestampMs } + : {}), }); return conversationContext.length > 0 ? [ @@ -785,7 +821,7 @@ export const registerTelegramHandlers = ({ ) => { const replyChainNodes = buildReplyChainForMessage(msg); const { replyMedia, replyChain } = await resolveReplyMediaForChain(ctx, replyChainNodes); - const promptContext = buildPromptContextForMessage(msg, replyChainNodes); + const promptContext = buildPromptContextForMessage(msg, replyChainNodes, options); await processMessage( ctx, allMedia, @@ -1267,6 +1303,7 @@ export const registerTelegramHandlers = ({ storeAllowFrom: string[]; sendOversizeWarning: boolean; oversizeLogMessage: string; + promptContextMinTimestampMs?: number; }) => { const { ctx, @@ -1277,6 +1314,7 @@ export const registerTelegramHandlers = ({ storeAllowFrom, sendOversizeWarning, oversizeLogMessage, + promptContextMinTimestampMs, } = params; // Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars). @@ -1314,6 +1352,10 @@ export const registerTelegramHandlers = ({ nextTotalChars <= TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS ) { existing.messages.push({ msg, ctx, receivedAtMs: nowMs }); + existing.promptContextMinTimestampMs = latestPromptContextMinTimestampMs( + existing.promptContextMinTimestampMs, + promptContextMinTimestampMs, + ); scheduleTextFragmentFlush(existing); return; } @@ -1335,6 +1377,7 @@ export const registerTelegramHandlers = ({ const entry: TextFragmentEntry = { key, messages: [{ msg, ctx, receivedAtMs: nowMs }], + ...(promptContextMinTimestampMs !== undefined ? { promptContextMinTimestampMs } : {}), timer: setTimeout(() => {}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS), }; textFragmentBuffer.set(key, entry); @@ -1350,6 +1393,10 @@ export const registerTelegramHandlers = ({ if (existing) { clearTimeout(existing.timer); existing.messages.push({ msg, ctx }); + existing.promptContextMinTimestampMs = latestPromptContextMinTimestampMs( + existing.promptContextMinTimestampMs, + promptContextMinTimestampMs, + ); existing.timer = setTimeout(async () => { mediaGroupBuffer.delete(mediaGroupId); mediaGroupProcessing = mediaGroupProcessing @@ -1362,6 +1409,7 @@ export const registerTelegramHandlers = ({ } else { const entry: MediaGroupEntry = { messages: [{ msg, ctx }], + ...(promptContextMinTimestampMs !== undefined ? { promptContextMinTimestampMs } : {}), timer: setTimeout(async () => { mediaGroupBuffer.delete(mediaGroupId); mediaGroupProcessing = mediaGroupProcessing @@ -1458,6 +1506,7 @@ export const registerTelegramHandlers = ({ debounceKey, debounceLane, botUsername: ctx.me?.username, + ...(promptContextMinTimestampMs !== undefined ? { promptContextMinTimestampMs } : {}), }); }; bot.on("callback_query", async (ctx) => { @@ -2326,6 +2375,18 @@ export const registerTelegramHandlers = ({ } } + const promptContextMinTimestampMs = normalizePromptContextMinTimestampMs( + resolveTelegramSessionState({ + chatId: event.chatId, + isGroup: event.isGroup, + isForum: event.isForum, + messageThreadId: event.messageThreadId, + resolvedThreadId, + senderId: event.senderId, + runtimeCfg: cfg, + }).sessionEntry?.sessionStartedAt, + ); + recordMessageForReplyChain(event.msg, resolvedThreadId ?? dmThreadId); await processInboundMessage({ ctx: event.ctx, @@ -2336,6 +2397,7 @@ export const registerTelegramHandlers = ({ storeAllowFrom, sendOversizeWarning: event.sendOversizeWarning, oversizeLogMessage: event.oversizeLogMessage, + ...(promptContextMinTimestampMs !== undefined ? { promptContextMinTimestampMs } : {}), }); } catch (err) { runtime.error?.(danger(`${event.errorMessage}: ${String(err)}`)); diff --git a/extensions/telegram/src/bot-message-context.types.ts b/extensions/telegram/src/bot-message-context.types.ts index bb328b41191..af6d59a7874 100644 --- a/extensions/telegram/src/bot-message-context.types.ts +++ b/extensions/telegram/src/bot-message-context.types.ts @@ -23,6 +23,7 @@ export type TelegramMessageContextOptions = { messageIdOverride?: string; receivedAtMs?: number; ingressBuffer?: "inbound-debounce" | "text-fragment"; + promptContextMinTimestampMs?: number; }; export type TelegramPromptContextEntry = NonNullable< diff --git a/extensions/telegram/src/bot-updates.ts b/extensions/telegram/src/bot-updates.ts index c681495c7e4..cfe0cfc56c1 100644 --- a/extensions/telegram/src/bot-updates.ts +++ b/extensions/telegram/src/bot-updates.ts @@ -11,6 +11,7 @@ export type MediaGroupEntry = { msg: Message; ctx: TelegramContext; }>; + promptContextMinTimestampMs?: number; timer: ReturnType; }; diff --git a/extensions/telegram/src/bot.test.ts b/extensions/telegram/src/bot.test.ts index 9494998f739..5b43b1f42f2 100644 --- a/extensions/telegram/src/bot.test.ts +++ b/extensions/telegram/src/bot.test.ts @@ -26,6 +26,7 @@ const { replySpy, resolveExecApprovalSpy, sendMessageSpy, + setSessionStoreEntriesForTest, setMyCommandsSpy, telegramBotDepsForTest, telegramBotRuntimeForTest, @@ -1691,6 +1692,127 @@ describe("createTelegramBot", () => { expect(messagesById.get("201")?.body).toBe("After the incident review."); }); + it("omits stale Telegram topic context before the persisted session start", async () => { + onSpy.mockClear(); + replySpy.mockClear(); + + const sessionStartedAt = Date.parse("2026-05-10T17:30:43.127Z"); + const config = { + agents: { + defaults: { + envelopeTimezone: "utc", + }, + }, + channels: { + telegram: { + groupPolicy: "open", + groups: { "*": { requireMention: false } }, + }, + }, + } satisfies OpenClawConfig; + const sessionEntry = { + sessionId: "redacted-session", + sessionStartedAt, + updatedAt: sessionStartedAt, + lastInteractionAt: sessionStartedAt, + }; + loadConfig.mockReturnValue(config); + setSessionStoreEntriesForTest({ + "agent:main:telegram:group:-1001234567890:topic:22534": sessionEntry, + }); + + createTelegramBot({ token: "tok", config }); + const handler = getOnHandler("message") as (ctx: Record) => Promise; + const baseCtx = { + me: { id: 999, username: "openclaw_bot" }, + getFile: async () => ({ download: async () => new Uint8Array() }), + }; + const chat = { + id: -1001234567890, + type: "supergroup", + title: "Ops", + is_forum: true, + }; + const from = { id: 201, is_bot: false, first_name: "Requester" }; + const staleInstruction = "okay so we just flip in openclaw? if yes do it up"; + + await handler({ + ...baseCtx, + message: { + chat, + text: "tools.toolSearch: true", + date: Date.parse("2026-05-10T12:33:48.000Z") / 1000, + message_id: 84649, + message_thread_id: 22534, + from, + }, + }); + await handler({ + ...baseCtx, + message: { + chat, + text: staleInstruction, + date: Date.parse("2026-05-10T12:40:28.000Z") / 1000, + message_id: 84670, + message_thread_id: 22534, + from, + }, + }); + await handler({ + ...baseCtx, + message: { + chat, + text: "how does this determine stability?", + date: Date.parse("2026-05-11T23:36:21.000Z") / 1000, + message_id: 87184, + message_thread_id: 22534, + from, + }, + }); + + setSessionStoreEntriesForTest({ + "agent:main:telegram:group:-1001234567890:topic:22534": sessionEntry, + }); + replySpy.mockClear(); + await handler({ + ...baseCtx, + message: { + chat, + text: "what config change?", + date: Date.parse("2026-05-12T02:24:09.000Z") / 1000, + message_id: 87227, + message_thread_id: 22534, + from, + reply_to_message: { + chat, + text: staleInstruction, + date: Date.parse("2026-05-10T12:40:28.000Z") / 1000, + message_id: 84670, + message_thread_id: 22534, + from, + }, + }, + }); + + expect(replySpy).toHaveBeenCalledTimes(1); + const payload = replySpy.mock.calls[0][0]; + const [conversationContext] = requireArray( + payload.UntrustedStructuredContext, + "structured context", + ); + const contextRecord = requireRecord(conversationContext, "conversation context"); + const contextPayload = requireRecord(contextRecord.payload, "conversation context payload"); + const messages = requireArray(contextPayload.messages, "conversation context messages").map( + (message, index) => requireRecord(message, `conversation context message ${index + 1}`), + ); + const messagesById = new Map(messages.map((message) => [message.message_id, message])); + expect(messagesById.get("87184")?.body).toBe("how does this determine stability?"); + expect(messagesById.has("84649")).toBe(false); + expect(messagesById.has("84670")).toBe(false); + expect(messages.map((message) => message.body)).not.toContain(staleInstruction); + expect(messages.map((message) => message.body)).not.toContain("tools.toolSearch: true"); + }); + it("updates cached bot messages from Telegram edit updates", async () => { onSpy.mockClear(); replySpy.mockClear(); diff --git a/extensions/telegram/src/message-cache.test.ts b/extensions/telegram/src/message-cache.test.ts index faf738a1b17..c1f5ed1dd41 100644 --- a/extensions/telegram/src/message-cache.test.ts +++ b/extensions/telegram/src/message-cache.test.ts @@ -568,4 +568,92 @@ describe("telegram message cache", () => { expect(context.map((entry) => entry.node.messageId)).toEqual(["87183", "87184"]); expect(context.map((entry) => entry.node.body)).not.toContain(staleInstruction); }); + + it("does not select messages before the persisted session start when the reset command is absent", () => { + const cache = createTelegramMessageCache(); + const beforeSession = Date.parse("2026-05-10T12:40:00.000Z"); + const sessionStartedAt = Date.parse("2026-05-10T17:30:43.127Z"); + const afterSession = Date.parse("2026-05-11T23:36:00.000Z"); + const staleInstruction = "okay so we just flip in openclaw? if yes do it up"; + const record = (params: { + id: number; + text: string; + timestampMs: number; + replyTo?: { id: number; text: string; timestampMs: number }; + }) => + cache.record({ + accountId: "default", + chatId: -1001234567890, + threadId: 22534, + msg: { + chat: { + id: -1001234567890, + type: "supergroup", + title: "Ops", + is_forum: true, + }, + message_thread_id: 22534, + message_id: params.id, + date: Math.floor(params.timestampMs / 1000), + text: params.text, + from: { id: 101, is_bot: false, first_name: "Requester" }, + ...(params.replyTo + ? { + reply_to_message: { + chat: { + id: -1001234567890, + type: "supergroup", + title: "Ops", + is_forum: true, + }, + message_thread_id: 22534, + message_id: params.replyTo.id, + date: Math.floor(params.replyTo.timestampMs / 1000), + text: params.replyTo.text, + from: { id: 101, is_bot: false, first_name: "Requester" }, + } as Message["reply_to_message"], + } + : {}), + } as Message, + }); + + record({ + id: 84649, + text: "tools.toolSearch: true", + timestampMs: beforeSession - 5 * 60_000, + }); + record({ id: 84670, text: staleInstruction, timestampMs: beforeSession }); + record({ id: 87184, text: "how does this determine stability?", timestampMs: afterSession }); + const current = record({ + id: 87227, + text: "what config change?", + timestampMs: afterSession + 2 * 60 * 60_000, + replyTo: { id: 84670, text: staleInstruction, timestampMs: beforeSession }, + })?.sourceMessage; + if (!current) { + throw new Error("expected current Telegram message"); + } + + const replyChainNodes = buildTelegramReplyChain({ + cache, + accountId: "default", + chatId: -1001234567890, + msg: current, + }); + const context = buildTelegramConversationContext({ + cache, + accountId: "default", + chatId: -1001234567890, + messageId: "87227", + threadId: 22534, + replyChainNodes, + recentLimit: 10, + replyTargetWindowSize: 1, + minTimestampMs: sessionStartedAt, + }); + + expect(context.map((entry) => entry.node.messageId)).toEqual(["87184"]); + expect(context.map((entry) => entry.node.body)).not.toContain(staleInstruction); + expect(context.map((entry) => entry.node.body)).not.toContain("tools.toolSearch: true"); + }); }); diff --git a/extensions/telegram/src/message-cache.ts b/extensions/telegram/src/message-cache.ts index d21b569c021..2ad0b13bedd 100644 --- a/extensions/telegram/src/message-cache.ts +++ b/extensions/telegram/src/message-cache.ts @@ -529,6 +529,25 @@ function isAfterSessionBoundary( return true; } +function normalizeSessionBoundaryTimestamp(timestampMs?: number): number | undefined { + if (typeof timestampMs !== "number" || !Number.isFinite(timestampMs)) { + return undefined; + } + return Math.floor(timestampMs / 1000) * 1000; +} + +function isAtOrAfterSessionBoundaryTimestamp( + node: TelegramCachedMessageNode, + boundaryTimestampMs?: number, +): boolean { + if (boundaryTimestampMs === undefined) { + return true; + } + return typeof node.timestamp !== "number" || !Number.isFinite(node.timestamp) + ? true + : node.timestamp >= boundaryTimestampMs; +} + function resolveSessionBoundaryNode(params: { cache: TelegramMessageCache; accountId: string; @@ -602,10 +621,12 @@ export function buildTelegramConversationContext(params: { replyChainNodes: TelegramCachedMessageNode[]; recentLimit: number; replyTargetWindowSize: number; + minTimestampMs?: number; }): TelegramConversationContextNode[] { const selected = new Map(); const replyTargetIds = new Set(); const sessionBoundary = resolveSessionBoundaryNode(params); + const sessionBoundaryTimestamp = normalizeSessionBoundaryTimestamp(params.minTimestampMs); const addNode = (node: TelegramCachedMessageNode, flags?: { replyTarget?: boolean }) => { if (!node.messageId || node.messageId === params.messageId) { return; @@ -613,6 +634,9 @@ export function buildTelegramConversationContext(params: { if (!isAfterSessionBoundary(node, sessionBoundary)) { return; } + if (!isAtOrAfterSessionBoundaryTimestamp(node, sessionBoundaryTimestamp)) { + return; + } const existing = selected.get(node.messageId); const isReplyTarget = existing?.isReplyTarget === true || flags?.replyTarget === true; selected.set(node.messageId, {