fix telegram context session start boundary

This commit is contained in:
VACInc
2026-05-11 22:41:17 -04:00
committed by Ayaan Zaidi
parent b31afcbc17
commit afd7339bd2
6 changed files with 301 additions and 3 deletions

View File

@@ -179,6 +179,7 @@ export const registerTelegramHandlers = ({
key: string;
threadId?: number;
messages: Array<{ msg: Message; ctx: TelegramContext; receivedAtMs: number }>;
promptContextMinTimestampMs?: number;
timer: ReturnType<typeof setTimeout>;
};
const textFragmentBuffer = new Map<string, TextFragmentEntry>();
@@ -197,6 +198,22 @@ export const registerTelegramHandlers = ({
debounceLane: TelegramDebounceLane;
botUsername?: string;
threadId?: number;
promptContextMinTimestampMs?: number;
};
const normalizePromptContextMinTimestampMs = (timestampMs?: number) =>
typeof timestampMs === "number" && Number.isFinite(timestampMs) ? timestampMs : undefined;
const latestPromptContextMinTimestampMs = (
...timestamps: Array<number | undefined>
): number | undefined => {
let latest: number | undefined;
for (const timestampMs of timestamps) {
const normalized = normalizePromptContextMinTimestampMs(timestampMs);
if (normalized === undefined) {
continue;
}
latest = latest === undefined ? normalized : Math.max(latest, normalized);
}
return latest;
};
const resolveTelegramDebounceLane = (msg: Message): TelegramDebounceLane => {
const forwardMeta = msg as {
@@ -394,9 +411,13 @@ export const registerTelegramHandlers = ({
return;
}
if (entries.length === 1) {
const promptContextMinTimestampMs = normalizePromptContextMinTimestampMs(
last.promptContextMinTimestampMs,
);
await processMessageWithReplyChain(last.ctx, last.msg, last.allMedia, last.storeAllowFrom, {
receivedAtMs: last.receivedAtMs,
ingressBuffer: "inbound-debounce",
...(promptContextMinTimestampMs !== undefined ? { promptContextMinTimestampMs } : {}),
});
return;
}
@@ -409,6 +430,9 @@ export const registerTelegramHandlers = ({
return;
}
const first = entries[0];
const promptContextMinTimestampMs = latestPromptContextMinTimestampMs(
...entries.map((entry) => entry.promptContextMinTimestampMs),
);
const baseCtx = first.ctx;
const syntheticMessage = buildSyntheticTextMessage({
base: first.msg,
@@ -426,6 +450,7 @@ export const registerTelegramHandlers = ({
...(messageIdOverride ? { messageIdOverride } : {}),
receivedAtMs: first.receivedAtMs,
ingressBuffer: "inbound-debounce",
...(promptContextMinTimestampMs !== undefined ? { promptContextMinTimestampMs } : {}),
},
);
},
@@ -454,13 +479,14 @@ export const registerTelegramHandlers = ({
messageThreadId?: number;
resolvedThreadId?: number;
senderId?: string | number;
runtimeCfg?: OpenClawConfig;
}): {
agentId: string;
sessionEntry: ReturnType<typeof resolveSessionStoreEntry>["existing"];
sessionKey: string;
model?: string;
} => {
const runtimeCfg = telegramDeps.getRuntimeConfig();
const runtimeCfg = params.runtimeCfg ?? telegramDeps.getRuntimeConfig();
const resolvedThreadId =
params.resolvedThreadId ??
resolveTelegramForumThreadId({
@@ -505,7 +531,7 @@ export const registerTelegramHandlers = ({
const storePath = telegramDeps.resolveStorePath(runtimeCfg.session?.store, {
agentId: route.agentId,
});
const store = loadSessionStore(storePath);
const store = (telegramDeps.loadSessionStore ?? loadSessionStore)(storePath);
const entry = resolveSessionStoreEntry({ store, sessionKey }).existing;
const storedOverride = resolveStoredModelOverride({
sessionEntry: entry,
@@ -585,6 +611,9 @@ export const registerTelegramHandlers = ({
primaryEntry.msg,
allMedia,
storeAllowFrom,
entry.promptContextMinTimestampMs !== undefined
? { promptContextMinTimestampMs: entry.promptContextMinTimestampMs }
: undefined,
);
} catch (err) {
runtime.error?.(danger(`media group handler failed: ${String(err)}`));
@@ -620,6 +649,9 @@ export const registerTelegramHandlers = ({
messageIdOverride: String(last.msg.message_id),
receivedAtMs: first.receivedAtMs,
ingressBuffer: "text-fragment",
...(entry.promptContextMinTimestampMs !== undefined
? { promptContextMinTimestampMs: entry.promptContextMinTimestampMs }
: {}),
});
} catch (err) {
runtime.error?.(danger(`text fragment handler failed: ${String(err)}`));
@@ -698,6 +730,7 @@ export const registerTelegramHandlers = ({
const buildPromptContextForMessage = (
msg: Message,
replyChainNodes: TelegramCachedMessageNode[],
options?: TelegramMessageContextOptions,
): TelegramPromptContextEntry[] => {
const messageId = typeof msg.message_id === "number" ? String(msg.message_id) : undefined;
const currentNode = messageCache.get({
@@ -715,6 +748,9 @@ export const registerTelegramHandlers = ({
replyChainNodes,
recentLimit: 10,
replyTargetWindowSize: 2,
...(options?.promptContextMinTimestampMs !== undefined
? { minTimestampMs: options.promptContextMinTimestampMs }
: {}),
});
return conversationContext.length > 0
? [
@@ -785,7 +821,7 @@ export const registerTelegramHandlers = ({
) => {
const replyChainNodes = buildReplyChainForMessage(msg);
const { replyMedia, replyChain } = await resolveReplyMediaForChain(ctx, replyChainNodes);
const promptContext = buildPromptContextForMessage(msg, replyChainNodes);
const promptContext = buildPromptContextForMessage(msg, replyChainNodes, options);
await processMessage(
ctx,
allMedia,
@@ -1267,6 +1303,7 @@ export const registerTelegramHandlers = ({
storeAllowFrom: string[];
sendOversizeWarning: boolean;
oversizeLogMessage: string;
promptContextMinTimestampMs?: number;
}) => {
const {
ctx,
@@ -1277,6 +1314,7 @@ export const registerTelegramHandlers = ({
storeAllowFrom,
sendOversizeWarning,
oversizeLogMessage,
promptContextMinTimestampMs,
} = params;
// Text fragment handling - Telegram splits long pastes into multiple inbound messages (~4096 chars).
@@ -1314,6 +1352,10 @@ export const registerTelegramHandlers = ({
nextTotalChars <= TELEGRAM_TEXT_FRAGMENT_MAX_TOTAL_CHARS
) {
existing.messages.push({ msg, ctx, receivedAtMs: nowMs });
existing.promptContextMinTimestampMs = latestPromptContextMinTimestampMs(
existing.promptContextMinTimestampMs,
promptContextMinTimestampMs,
);
scheduleTextFragmentFlush(existing);
return;
}
@@ -1335,6 +1377,7 @@ export const registerTelegramHandlers = ({
const entry: TextFragmentEntry = {
key,
messages: [{ msg, ctx, receivedAtMs: nowMs }],
...(promptContextMinTimestampMs !== undefined ? { promptContextMinTimestampMs } : {}),
timer: setTimeout(() => {}, TELEGRAM_TEXT_FRAGMENT_MAX_GAP_MS),
};
textFragmentBuffer.set(key, entry);
@@ -1350,6 +1393,10 @@ export const registerTelegramHandlers = ({
if (existing) {
clearTimeout(existing.timer);
existing.messages.push({ msg, ctx });
existing.promptContextMinTimestampMs = latestPromptContextMinTimestampMs(
existing.promptContextMinTimestampMs,
promptContextMinTimestampMs,
);
existing.timer = setTimeout(async () => {
mediaGroupBuffer.delete(mediaGroupId);
mediaGroupProcessing = mediaGroupProcessing
@@ -1362,6 +1409,7 @@ export const registerTelegramHandlers = ({
} else {
const entry: MediaGroupEntry = {
messages: [{ msg, ctx }],
...(promptContextMinTimestampMs !== undefined ? { promptContextMinTimestampMs } : {}),
timer: setTimeout(async () => {
mediaGroupBuffer.delete(mediaGroupId);
mediaGroupProcessing = mediaGroupProcessing
@@ -1458,6 +1506,7 @@ export const registerTelegramHandlers = ({
debounceKey,
debounceLane,
botUsername: ctx.me?.username,
...(promptContextMinTimestampMs !== undefined ? { promptContextMinTimestampMs } : {}),
});
};
bot.on("callback_query", async (ctx) => {
@@ -2326,6 +2375,18 @@ export const registerTelegramHandlers = ({
}
}
const promptContextMinTimestampMs = normalizePromptContextMinTimestampMs(
resolveTelegramSessionState({
chatId: event.chatId,
isGroup: event.isGroup,
isForum: event.isForum,
messageThreadId: event.messageThreadId,
resolvedThreadId,
senderId: event.senderId,
runtimeCfg: cfg,
}).sessionEntry?.sessionStartedAt,
);
recordMessageForReplyChain(event.msg, resolvedThreadId ?? dmThreadId);
await processInboundMessage({
ctx: event.ctx,
@@ -2336,6 +2397,7 @@ export const registerTelegramHandlers = ({
storeAllowFrom,
sendOversizeWarning: event.sendOversizeWarning,
oversizeLogMessage: event.oversizeLogMessage,
...(promptContextMinTimestampMs !== undefined ? { promptContextMinTimestampMs } : {}),
});
} catch (err) {
runtime.error?.(danger(`${event.errorMessage}: ${String(err)}`));

View File

@@ -23,6 +23,7 @@ export type TelegramMessageContextOptions = {
messageIdOverride?: string;
receivedAtMs?: number;
ingressBuffer?: "inbound-debounce" | "text-fragment";
promptContextMinTimestampMs?: number;
};
export type TelegramPromptContextEntry = NonNullable<

View File

@@ -11,6 +11,7 @@ export type MediaGroupEntry = {
msg: Message;
ctx: TelegramContext;
}>;
promptContextMinTimestampMs?: number;
timer: ReturnType<typeof setTimeout>;
};

View File

@@ -26,6 +26,7 @@ const {
replySpy,
resolveExecApprovalSpy,
sendMessageSpy,
setSessionStoreEntriesForTest,
setMyCommandsSpy,
telegramBotDepsForTest,
telegramBotRuntimeForTest,
@@ -1691,6 +1692,127 @@ describe("createTelegramBot", () => {
expect(messagesById.get("201")?.body).toBe("After the incident review.");
});
it("omits stale Telegram topic context before the persisted session start", async () => {
onSpy.mockClear();
replySpy.mockClear();
const sessionStartedAt = Date.parse("2026-05-10T17:30:43.127Z");
const config = {
agents: {
defaults: {
envelopeTimezone: "utc",
},
},
channels: {
telegram: {
groupPolicy: "open",
groups: { "*": { requireMention: false } },
},
},
} satisfies OpenClawConfig;
const sessionEntry = {
sessionId: "redacted-session",
sessionStartedAt,
updatedAt: sessionStartedAt,
lastInteractionAt: sessionStartedAt,
};
loadConfig.mockReturnValue(config);
setSessionStoreEntriesForTest({
"agent:main:telegram:group:-1001234567890:topic:22534": sessionEntry,
});
createTelegramBot({ token: "tok", config });
const handler = getOnHandler("message") as (ctx: Record<string, unknown>) => Promise<void>;
const baseCtx = {
me: { id: 999, username: "openclaw_bot" },
getFile: async () => ({ download: async () => new Uint8Array() }),
};
const chat = {
id: -1001234567890,
type: "supergroup",
title: "Ops",
is_forum: true,
};
const from = { id: 201, is_bot: false, first_name: "Requester" };
const staleInstruction = "okay so we just flip in openclaw? if yes do it up";
await handler({
...baseCtx,
message: {
chat,
text: "tools.toolSearch: true",
date: Date.parse("2026-05-10T12:33:48.000Z") / 1000,
message_id: 84649,
message_thread_id: 22534,
from,
},
});
await handler({
...baseCtx,
message: {
chat,
text: staleInstruction,
date: Date.parse("2026-05-10T12:40:28.000Z") / 1000,
message_id: 84670,
message_thread_id: 22534,
from,
},
});
await handler({
...baseCtx,
message: {
chat,
text: "how does this determine stability?",
date: Date.parse("2026-05-11T23:36:21.000Z") / 1000,
message_id: 87184,
message_thread_id: 22534,
from,
},
});
setSessionStoreEntriesForTest({
"agent:main:telegram:group:-1001234567890:topic:22534": sessionEntry,
});
replySpy.mockClear();
await handler({
...baseCtx,
message: {
chat,
text: "what config change?",
date: Date.parse("2026-05-12T02:24:09.000Z") / 1000,
message_id: 87227,
message_thread_id: 22534,
from,
reply_to_message: {
chat,
text: staleInstruction,
date: Date.parse("2026-05-10T12:40:28.000Z") / 1000,
message_id: 84670,
message_thread_id: 22534,
from,
},
},
});
expect(replySpy).toHaveBeenCalledTimes(1);
const payload = replySpy.mock.calls[0][0];
const [conversationContext] = requireArray(
payload.UntrustedStructuredContext,
"structured context",
);
const contextRecord = requireRecord(conversationContext, "conversation context");
const contextPayload = requireRecord(contextRecord.payload, "conversation context payload");
const messages = requireArray(contextPayload.messages, "conversation context messages").map(
(message, index) => requireRecord(message, `conversation context message ${index + 1}`),
);
const messagesById = new Map(messages.map((message) => [message.message_id, message]));
expect(messagesById.get("87184")?.body).toBe("how does this determine stability?");
expect(messagesById.has("84649")).toBe(false);
expect(messagesById.has("84670")).toBe(false);
expect(messages.map((message) => message.body)).not.toContain(staleInstruction);
expect(messages.map((message) => message.body)).not.toContain("tools.toolSearch: true");
});
it("updates cached bot messages from Telegram edit updates", async () => {
onSpy.mockClear();
replySpy.mockClear();

View File

@@ -568,4 +568,92 @@ describe("telegram message cache", () => {
expect(context.map((entry) => entry.node.messageId)).toEqual(["87183", "87184"]);
expect(context.map((entry) => entry.node.body)).not.toContain(staleInstruction);
});
it("does not select messages before the persisted session start when the reset command is absent", () => {
const cache = createTelegramMessageCache();
const beforeSession = Date.parse("2026-05-10T12:40:00.000Z");
const sessionStartedAt = Date.parse("2026-05-10T17:30:43.127Z");
const afterSession = Date.parse("2026-05-11T23:36:00.000Z");
const staleInstruction = "okay so we just flip in openclaw? if yes do it up";
const record = (params: {
id: number;
text: string;
timestampMs: number;
replyTo?: { id: number; text: string; timestampMs: number };
}) =>
cache.record({
accountId: "default",
chatId: -1001234567890,
threadId: 22534,
msg: {
chat: {
id: -1001234567890,
type: "supergroup",
title: "Ops",
is_forum: true,
},
message_thread_id: 22534,
message_id: params.id,
date: Math.floor(params.timestampMs / 1000),
text: params.text,
from: { id: 101, is_bot: false, first_name: "Requester" },
...(params.replyTo
? {
reply_to_message: {
chat: {
id: -1001234567890,
type: "supergroup",
title: "Ops",
is_forum: true,
},
message_thread_id: 22534,
message_id: params.replyTo.id,
date: Math.floor(params.replyTo.timestampMs / 1000),
text: params.replyTo.text,
from: { id: 101, is_bot: false, first_name: "Requester" },
} as Message["reply_to_message"],
}
: {}),
} as Message,
});
record({
id: 84649,
text: "tools.toolSearch: true",
timestampMs: beforeSession - 5 * 60_000,
});
record({ id: 84670, text: staleInstruction, timestampMs: beforeSession });
record({ id: 87184, text: "how does this determine stability?", timestampMs: afterSession });
const current = record({
id: 87227,
text: "what config change?",
timestampMs: afterSession + 2 * 60 * 60_000,
replyTo: { id: 84670, text: staleInstruction, timestampMs: beforeSession },
})?.sourceMessage;
if (!current) {
throw new Error("expected current Telegram message");
}
const replyChainNodes = buildTelegramReplyChain({
cache,
accountId: "default",
chatId: -1001234567890,
msg: current,
});
const context = buildTelegramConversationContext({
cache,
accountId: "default",
chatId: -1001234567890,
messageId: "87227",
threadId: 22534,
replyChainNodes,
recentLimit: 10,
replyTargetWindowSize: 1,
minTimestampMs: sessionStartedAt,
});
expect(context.map((entry) => entry.node.messageId)).toEqual(["87184"]);
expect(context.map((entry) => entry.node.body)).not.toContain(staleInstruction);
expect(context.map((entry) => entry.node.body)).not.toContain("tools.toolSearch: true");
});
});

View File

@@ -529,6 +529,25 @@ function isAfterSessionBoundary(
return true;
}
function normalizeSessionBoundaryTimestamp(timestampMs?: number): number | undefined {
if (typeof timestampMs !== "number" || !Number.isFinite(timestampMs)) {
return undefined;
}
return Math.floor(timestampMs / 1000) * 1000;
}
function isAtOrAfterSessionBoundaryTimestamp(
node: TelegramCachedMessageNode,
boundaryTimestampMs?: number,
): boolean {
if (boundaryTimestampMs === undefined) {
return true;
}
return typeof node.timestamp !== "number" || !Number.isFinite(node.timestamp)
? true
: node.timestamp >= boundaryTimestampMs;
}
function resolveSessionBoundaryNode(params: {
cache: TelegramMessageCache;
accountId: string;
@@ -602,10 +621,12 @@ export function buildTelegramConversationContext(params: {
replyChainNodes: TelegramCachedMessageNode[];
recentLimit: number;
replyTargetWindowSize: number;
minTimestampMs?: number;
}): TelegramConversationContextNode[] {
const selected = new Map<string, TelegramConversationContextNode>();
const replyTargetIds = new Set<string>();
const sessionBoundary = resolveSessionBoundaryNode(params);
const sessionBoundaryTimestamp = normalizeSessionBoundaryTimestamp(params.minTimestampMs);
const addNode = (node: TelegramCachedMessageNode, flags?: { replyTarget?: boolean }) => {
if (!node.messageId || node.messageId === params.messageId) {
return;
@@ -613,6 +634,9 @@ export function buildTelegramConversationContext(params: {
if (!isAfterSessionBoundary(node, sessionBoundary)) {
return;
}
if (!isAtOrAfterSessionBoundaryTimestamp(node, sessionBoundaryTimestamp)) {
return;
}
const existing = selected.get(node.messageId);
const isReplyTarget = existing?.isReplyTarget === true || flags?.replyTarget === true;
selected.set(node.messageId, {