mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-21 05:32:53 +00:00
fix: add Telegram error suppression controls (#51914) (thanks @chinar-amrutkar)
* feat(telegram): add error policy for suppressing repetitive error messages Introduces per-account error policy configuration that can suppress repetitive error messages (e.g., 429 rate limit, ECONNRESET) to prevent noisy error floods in Telegram channels. Closes #34498 * fix(telegram): track error cooldown per message * fix(telegram): prune expired error cooldowns * fix: add Telegram error suppression controls (#51914) (thanks @chinar-amrutkar) --------- Co-authored-by: chinar-amrutkar <chinar-amrutkar@users.noreply.github.com> Co-authored-by: Ayaan Zaidi <hi@obviy.us>
This commit is contained in:
@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Feishu/comments: add a dedicated Drive comment-event flow with comment-thread context resolution, in-thread replies, and `feishu_drive` comment actions for document collaboration workflows. (#58497) thanks @wittam-01.
|
||||
- Tasks/chat: add `/tasks` as a chat-native background task board for the current session, with recent task details and agent-local fallback counts when no linked tasks are visible. Related #54226. Thanks @vincentkoc.
|
||||
- Agents/failover: cap prompt-side and assistant-side same-provider auth-profile retries for rate-limit failures before cross-provider model fallback, add the `auth.cooldowns.rateLimitedProfileRotations` knob, and document the new fallback behavior. (#58707) Thanks @Forgely3D
|
||||
- Telegram/errors: add configurable `errorPolicy` and `errorCooldownMs` controls so Telegram can suppress repeated delivery errors per account, chat, and topic without muting distinct failures. (#51914) Thanks @chinar-amrutkar
|
||||
|
||||
### Fixes
|
||||
|
||||
|
||||
@@ -61,6 +61,9 @@ export type TelegramMessageContext = {
|
||||
groupConfig?: ReturnType<
|
||||
BuildTelegramMessageContextParams["resolveTelegramGroupConfig"]
|
||||
>["groupConfig"];
|
||||
topicConfig?: ReturnType<
|
||||
BuildTelegramMessageContextParams["resolveTelegramGroupConfig"]
|
||||
>["topicConfig"];
|
||||
resolvedThreadId?: number;
|
||||
threadSpec: ReturnType<typeof resolveTelegramThreadSpec>;
|
||||
replyThreadId?: number;
|
||||
@@ -491,6 +494,7 @@ export const buildTelegramMessageContext = async ({
|
||||
chatId,
|
||||
isGroup,
|
||||
groupConfig,
|
||||
topicConfig,
|
||||
resolvedThreadId,
|
||||
threadSpec,
|
||||
replyThreadId,
|
||||
|
||||
@@ -39,6 +39,12 @@ import { deliverReplies, emitInternalMessageSentHook } from "./bot/delivery.js";
|
||||
import type { TelegramStreamMode } from "./bot/types.js";
|
||||
import type { TelegramInlineButtons } from "./button-types.js";
|
||||
import { createTelegramDraftStream } from "./draft-stream.js";
|
||||
import {
|
||||
buildTelegramErrorScopeKey,
|
||||
isSilentErrorPolicy,
|
||||
resolveTelegramErrorPolicy,
|
||||
shouldSuppressTelegramError,
|
||||
} from "./error-policy.js";
|
||||
import { shouldSuppressLocalTelegramExecApprovalPrompt } from "./exec-approvals.js";
|
||||
import { renderTelegramHtmlText } from "./format.js";
|
||||
import {
|
||||
@@ -166,6 +172,7 @@ export const dispatchTelegramMessage = async ({
|
||||
chatId,
|
||||
isGroup,
|
||||
groupConfig,
|
||||
topicConfig,
|
||||
threadSpec,
|
||||
historyKey,
|
||||
historyLimit,
|
||||
@@ -726,6 +733,28 @@ export const dispatchTelegramMessage = async ({
|
||||
}
|
||||
},
|
||||
onError: (err, info) => {
|
||||
const errorPolicy = resolveTelegramErrorPolicy({
|
||||
accountConfig: telegramCfg,
|
||||
groupConfig,
|
||||
topicConfig,
|
||||
});
|
||||
if (isSilentErrorPolicy(errorPolicy.policy)) {
|
||||
return;
|
||||
}
|
||||
if (
|
||||
errorPolicy.policy === "once" &&
|
||||
shouldSuppressTelegramError({
|
||||
scopeKey: buildTelegramErrorScopeKey({
|
||||
accountId: route.accountId,
|
||||
chatId,
|
||||
threadId: threadSpec.id,
|
||||
}),
|
||||
cooldownMs: errorPolicy.cooldownMs,
|
||||
errorMessage: String(err),
|
||||
})
|
||||
) {
|
||||
return;
|
||||
}
|
||||
deliveryState.markNonSilentFailure();
|
||||
runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`));
|
||||
},
|
||||
|
||||
160
extensions/telegram/src/error-policy.test.ts
Normal file
160
extensions/telegram/src/error-policy.test.ts
Normal file
@@ -0,0 +1,160 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
buildTelegramErrorScopeKey,
|
||||
resolveTelegramErrorPolicy,
|
||||
resetTelegramErrorPolicyStoreForTest,
|
||||
shouldSuppressTelegramError,
|
||||
} from "./error-policy.js";
|
||||
|
||||
describe("telegram error policy", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-01-01T00:00:00Z"));
|
||||
resetTelegramErrorPolicyStoreForTest();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
resetTelegramErrorPolicyStoreForTest();
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("resolves policy and cooldown from the most specific config", () => {
|
||||
expect(
|
||||
resolveTelegramErrorPolicy({
|
||||
accountConfig: { errorPolicy: "once", errorCooldownMs: 1000 },
|
||||
groupConfig: { errorCooldownMs: 2000 },
|
||||
topicConfig: { errorPolicy: "silent" },
|
||||
}),
|
||||
).toEqual({
|
||||
policy: "silent",
|
||||
cooldownMs: 2000,
|
||||
});
|
||||
});
|
||||
|
||||
it("suppresses only repeated matching errors within the same scope", () => {
|
||||
const scopeKey = buildTelegramErrorScopeKey({
|
||||
accountId: "work",
|
||||
chatId: 42,
|
||||
threadId: 7,
|
||||
});
|
||||
|
||||
expect(
|
||||
shouldSuppressTelegramError({
|
||||
scopeKey,
|
||||
cooldownMs: 1000,
|
||||
errorMessage: "429",
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(
|
||||
shouldSuppressTelegramError({
|
||||
scopeKey,
|
||||
cooldownMs: 1000,
|
||||
errorMessage: "429",
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
shouldSuppressTelegramError({
|
||||
scopeKey,
|
||||
cooldownMs: 1000,
|
||||
errorMessage: "403",
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("keeps cooldowns per error message within the same scope", () => {
|
||||
const scopeKey = buildTelegramErrorScopeKey({
|
||||
accountId: "work",
|
||||
chatId: 42,
|
||||
});
|
||||
|
||||
expect(
|
||||
shouldSuppressTelegramError({
|
||||
scopeKey,
|
||||
cooldownMs: 1000,
|
||||
errorMessage: "A",
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(
|
||||
shouldSuppressTelegramError({
|
||||
scopeKey,
|
||||
cooldownMs: 1000,
|
||||
errorMessage: "B",
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(
|
||||
shouldSuppressTelegramError({
|
||||
scopeKey,
|
||||
cooldownMs: 1000,
|
||||
errorMessage: "A",
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("prunes expired cooldowns within a single scope", () => {
|
||||
const scopeKey = buildTelegramErrorScopeKey({
|
||||
accountId: "work",
|
||||
chatId: 42,
|
||||
});
|
||||
|
||||
expect(
|
||||
shouldSuppressTelegramError({
|
||||
scopeKey,
|
||||
cooldownMs: 1000,
|
||||
errorMessage: "A",
|
||||
}),
|
||||
).toBe(false);
|
||||
vi.advanceTimersByTime(1001);
|
||||
expect(
|
||||
shouldSuppressTelegramError({
|
||||
scopeKey,
|
||||
cooldownMs: 1000,
|
||||
errorMessage: "B",
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(
|
||||
shouldSuppressTelegramError({
|
||||
scopeKey,
|
||||
cooldownMs: 1000,
|
||||
errorMessage: "A",
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("does not leak suppression across accounts or threads", () => {
|
||||
const workMain = buildTelegramErrorScopeKey({
|
||||
accountId: "work",
|
||||
chatId: 42,
|
||||
});
|
||||
const personalMain = buildTelegramErrorScopeKey({
|
||||
accountId: "personal",
|
||||
chatId: 42,
|
||||
});
|
||||
const workTopic = buildTelegramErrorScopeKey({
|
||||
accountId: "work",
|
||||
chatId: 42,
|
||||
threadId: 9,
|
||||
});
|
||||
|
||||
expect(
|
||||
shouldSuppressTelegramError({
|
||||
scopeKey: workMain,
|
||||
cooldownMs: 1000,
|
||||
errorMessage: "429",
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(
|
||||
shouldSuppressTelegramError({
|
||||
scopeKey: personalMain,
|
||||
cooldownMs: 1000,
|
||||
errorMessage: "429",
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(
|
||||
shouldSuppressTelegramError({
|
||||
scopeKey: workTopic,
|
||||
cooldownMs: 1000,
|
||||
errorMessage: "429",
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
});
|
||||
107
extensions/telegram/src/error-policy.ts
Normal file
107
extensions/telegram/src/error-policy.ts
Normal file
@@ -0,0 +1,107 @@
|
||||
import type {
|
||||
TelegramAccountConfig,
|
||||
TelegramDirectConfig,
|
||||
TelegramGroupConfig,
|
||||
TelegramTopicConfig,
|
||||
} from "openclaw/plugin-sdk/config-runtime";
|
||||
|
||||
export type TelegramErrorPolicy = "always" | "once" | "silent";
|
||||
|
||||
type TelegramErrorConfig =
|
||||
| TelegramAccountConfig
|
||||
| TelegramDirectConfig
|
||||
| TelegramGroupConfig
|
||||
| TelegramTopicConfig;
|
||||
|
||||
const errorCooldownStore = new Map<string, Map<string, number>>();
|
||||
const DEFAULT_ERROR_COOLDOWN_MS = 14400000;
|
||||
|
||||
function pruneExpiredCooldowns(messageStore: Map<string, number>, now: number) {
|
||||
for (const [message, expiresAt] of messageStore) {
|
||||
if (expiresAt <= now) {
|
||||
messageStore.delete(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function resolveTelegramErrorPolicy(params: {
|
||||
accountConfig?: TelegramAccountConfig;
|
||||
groupConfig?: TelegramDirectConfig | TelegramGroupConfig;
|
||||
topicConfig?: TelegramTopicConfig;
|
||||
}): {
|
||||
policy: TelegramErrorPolicy;
|
||||
cooldownMs: number;
|
||||
} {
|
||||
const configs: Array<TelegramErrorConfig | undefined> = [
|
||||
params.accountConfig,
|
||||
params.groupConfig,
|
||||
params.topicConfig,
|
||||
];
|
||||
let policy: TelegramErrorPolicy = "always";
|
||||
let cooldownMs = DEFAULT_ERROR_COOLDOWN_MS;
|
||||
|
||||
for (const config of configs) {
|
||||
if (config?.errorPolicy) {
|
||||
policy = config.errorPolicy;
|
||||
}
|
||||
if (typeof config?.errorCooldownMs === "number") {
|
||||
cooldownMs = config.errorCooldownMs;
|
||||
}
|
||||
}
|
||||
|
||||
return { policy, cooldownMs };
|
||||
}
|
||||
|
||||
export function buildTelegramErrorScopeKey(params: {
|
||||
accountId: string;
|
||||
chatId: string | number;
|
||||
threadId?: string | number | null;
|
||||
}): string {
|
||||
const threadId = params.threadId == null ? "main" : String(params.threadId);
|
||||
return `${params.accountId}:${String(params.chatId)}:${threadId}`;
|
||||
}
|
||||
|
||||
export function shouldSuppressTelegramError(params: {
|
||||
scopeKey: string;
|
||||
cooldownMs: number;
|
||||
errorMessage?: string;
|
||||
}): boolean {
|
||||
const { scopeKey, cooldownMs, errorMessage } = params;
|
||||
const now = Date.now();
|
||||
const messageKey = errorMessage ?? "";
|
||||
const scopeStore = errorCooldownStore.get(scopeKey);
|
||||
|
||||
if (scopeStore) {
|
||||
pruneExpiredCooldowns(scopeStore, now);
|
||||
if (scopeStore.size === 0) {
|
||||
errorCooldownStore.delete(scopeKey);
|
||||
}
|
||||
}
|
||||
|
||||
if (errorCooldownStore.size > 100) {
|
||||
for (const [scope, messageStore] of errorCooldownStore) {
|
||||
pruneExpiredCooldowns(messageStore, now);
|
||||
if (messageStore.size === 0) {
|
||||
errorCooldownStore.delete(scope);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const expiresAt = scopeStore?.get(messageKey);
|
||||
if (typeof expiresAt === "number" && expiresAt > now) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const nextScopeStore = scopeStore ?? new Map<string, number>();
|
||||
nextScopeStore.set(messageKey, now + cooldownMs);
|
||||
errorCooldownStore.set(scopeKey, nextScopeStore);
|
||||
return false;
|
||||
}
|
||||
|
||||
export function isSilentErrorPolicy(policy: TelegramErrorPolicy): boolean {
|
||||
return policy === "silent";
|
||||
}
|
||||
|
||||
export function resetTelegramErrorPolicyStoreForTest() {
|
||||
errorCooldownStore.clear();
|
||||
}
|
||||
@@ -203,6 +203,10 @@ export type TelegramAccountConfig = {
|
||||
linkPreview?: boolean;
|
||||
/** Send Telegram bot error replies silently (no notification sound). Default: false. */
|
||||
silentErrorReplies?: boolean;
|
||||
/** Controls outbound error reporting: always, once per cooldown window, or silent. */
|
||||
errorPolicy?: "always" | "once" | "silent";
|
||||
/** Cooldown window for `errorPolicy: "once"` in milliseconds. */
|
||||
errorCooldownMs?: number;
|
||||
/**
|
||||
* Per-channel outbound response prefix override.
|
||||
*
|
||||
@@ -238,6 +242,10 @@ export type TelegramTopicConfig = {
|
||||
disableAudioPreflight?: boolean;
|
||||
/** Route this topic to a specific agent (overrides group-level and binding routing). */
|
||||
agentId?: string;
|
||||
/** Controls outbound error reporting for this topic. */
|
||||
errorPolicy?: "always" | "once" | "silent";
|
||||
/** Cooldown window for `errorPolicy: "once"` in milliseconds. */
|
||||
errorCooldownMs?: number;
|
||||
};
|
||||
|
||||
export type TelegramGroupConfig = {
|
||||
@@ -259,6 +267,10 @@ export type TelegramGroupConfig = {
|
||||
systemPrompt?: string;
|
||||
/** If true, skip automatic voice-note transcription for mention detection in this group. */
|
||||
disableAudioPreflight?: boolean;
|
||||
/** Controls outbound error reporting for this group. */
|
||||
errorPolicy?: "always" | "once" | "silent";
|
||||
/** Cooldown window for `errorPolicy: "once"` in milliseconds. */
|
||||
errorCooldownMs?: number;
|
||||
};
|
||||
|
||||
/** Config for LLM-based auto-topic labeling. */
|
||||
@@ -288,6 +300,10 @@ export type TelegramDirectConfig = {
|
||||
allowFrom?: Array<string | number>;
|
||||
/** Optional system prompt snippet for this DM. */
|
||||
systemPrompt?: string;
|
||||
/** Controls outbound error reporting for this DM. */
|
||||
errorPolicy?: "always" | "once" | "silent";
|
||||
/** Cooldown window for `errorPolicy: "once"` in milliseconds. */
|
||||
errorCooldownMs?: number;
|
||||
/** Auto-rename DM forum topics on first message using LLM. Default: true. */
|
||||
autoTopicLabel?: AutoTopicLabelConfig;
|
||||
};
|
||||
|
||||
@@ -71,6 +71,7 @@ const SlackCapabilitiesSchema = z.union([
|
||||
.strict(),
|
||||
]);
|
||||
|
||||
const TelegramErrorPolicySchema = z.enum(["always", "once", "silent"]).optional();
|
||||
export const TelegramTopicSchema = z
|
||||
.object({
|
||||
requireMention: z.boolean().optional(),
|
||||
@@ -81,6 +82,8 @@ export const TelegramTopicSchema = z
|
||||
allowFrom: z.array(z.union([z.string(), z.number()])).optional(),
|
||||
systemPrompt: z.string().optional(),
|
||||
agentId: z.string().optional(),
|
||||
errorPolicy: TelegramErrorPolicySchema,
|
||||
errorCooldownMs: z.number().int().nonnegative().optional(),
|
||||
})
|
||||
.strict();
|
||||
|
||||
@@ -96,6 +99,8 @@ export const TelegramGroupSchema = z
|
||||
allowFrom: z.array(z.union([z.string(), z.number()])).optional(),
|
||||
systemPrompt: z.string().optional(),
|
||||
topics: z.record(z.string(), TelegramTopicSchema.optional()).optional(),
|
||||
errorPolicy: TelegramErrorPolicySchema,
|
||||
errorCooldownMs: z.number().int().nonnegative().optional(),
|
||||
})
|
||||
.strict();
|
||||
|
||||
@@ -121,6 +126,8 @@ export const TelegramDirectSchema = z
|
||||
allowFrom: z.array(z.union([z.string(), z.number()])).optional(),
|
||||
systemPrompt: z.string().optional(),
|
||||
topics: z.record(z.string(), TelegramTopicSchema.optional()).optional(),
|
||||
errorPolicy: TelegramErrorPolicySchema,
|
||||
errorCooldownMs: z.number().int().nonnegative().optional(),
|
||||
requireTopic: z.boolean().optional(),
|
||||
autoTopicLabel: AutoTopicLabelSchema,
|
||||
})
|
||||
@@ -293,6 +300,8 @@ export const TelegramAccountSchemaBase = z
|
||||
silentErrorReplies: z.boolean().optional(),
|
||||
responsePrefix: z.string().optional(),
|
||||
ackReaction: z.string().optional(),
|
||||
errorPolicy: TelegramErrorPolicySchema,
|
||||
errorCooldownMs: z.number().int().nonnegative().optional(),
|
||||
apiRoot: z.string().url().optional(),
|
||||
autoTopicLabel: AutoTopicLabelSchema,
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user