fix(telegram): surface fallback on dispatch failures (#39209, thanks @riftzen-bit)

Co-authored-by: riftzen-bit <binb53339@gmail.com>
This commit is contained in:
Peter Steinberger
2026-03-07 22:40:53 +00:00
parent f53e10e3fd
commit 99de6515a0
6 changed files with 88 additions and 20 deletions

View File

@@ -292,6 +292,7 @@ Docs: https://docs.openclaw.ai
- Heartbeat/requests-in-flight scheduling: stop advancing `nextDueMs` and avoid immediate `scheduleNext()` timer overrides on requests-in-flight skips, so wake-layer retry cooldowns are honored and heartbeat cadence no longer drifts under sustained contention. (#39182) Thanks @MumuTW.
- Memory/SQLite contention resilience: re-apply `PRAGMA busy_timeout` on every sync-store and QMD connection open so process restarts/reopens no longer revert to immediate `SQLITE_BUSY` failures under lock contention. (#39183) Thanks @MumuTW.
- Gateway/webchat route safety: block webchat/control-ui clients from inheriting stored external delivery routes on channel-scoped sessions (while preserving route inheritance for UI/TUI clients), preventing cross-channel leakage from scoped chats. (#39175) Thanks @widingmarcus-cyber.
- Telegram error-surface resilience: return a user-visible fallback reply when dispatch/debounce processing fails instead of going silent, while preserving draft-stream cleanup and best-effort thread-scoped fallback delivery. (#39209) Thanks @riftzen-bit.
## 2026.3.2

View File

@@ -262,8 +262,21 @@ export const registerTelegramHandlers = ({
replyMedia,
);
},
onError: (err) => {
onError: (err, items) => {
runtime.error?.(danger(`telegram debounce flush failed: ${String(err)}`));
const chatId = items[0]?.msg.chat.id;
if (chatId != null) {
const threadId = items[0]?.msg.message_thread_id;
void bot.api
.sendMessage(
chatId,
"Something went wrong while processing your message. Please try again.",
threadId != null ? { message_thread_id: threadId } : undefined,
)
.catch((sendErr) => {
logVerbose(`telegram: error fallback send failed: ${String(sendErr)}`);
});
}
},
});

View File

@@ -1775,18 +1775,25 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("clears preview when dispatcher throws before fallback phase", async () => {
it("sends error fallback and clears preview when dispatcher throws", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockRejectedValue(new Error("dispatcher exploded"));
deliverReplies.mockResolvedValue({ delivered: true });
await expect(dispatchWithContext({ context: createContext() })).rejects.toThrow(
"dispatcher exploded",
);
await dispatchWithContext({ context: createContext() });
expect(draftStream.stop).toHaveBeenCalledTimes(1);
expect(draftStream.clear).toHaveBeenCalledTimes(1);
expect(deliverReplies).not.toHaveBeenCalled();
// Error fallback message should be delivered to the user instead of silent failure
expect(deliverReplies).toHaveBeenCalledTimes(1);
expect(deliverReplies).toHaveBeenCalledWith(
expect.objectContaining({
replies: [
{ text: "Something went wrong while processing your request. Please try again." },
],
}),
);
});
it("supports concurrent dispatches with independent previews", async () => {

View File

@@ -507,6 +507,7 @@ export const dispatchTelegramMessage = async ({
},
});
let dispatchError: unknown;
try {
({ queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
@@ -680,6 +681,9 @@ export const dispatchTelegramMessage = async ({
onModelSelected,
},
}));
} catch (err) {
dispatchError = err;
runtime.error?.(danger(`telegram dispatch failed: ${String(err)}`));
} finally {
// Upstream assistant callbacks are fire-and-forget; drain queued lane work
// before stream cleanup so boundary rotations/materialization complete first.
@@ -747,11 +751,15 @@ export const dispatchTelegramMessage = async ({
let sentFallback = false;
const deliverySummary = deliveryState.snapshot();
if (
!deliverySummary.delivered &&
(deliverySummary.skippedNonSilent > 0 || deliverySummary.failedNonSilent > 0)
dispatchError ||
(!deliverySummary.delivered &&
(deliverySummary.skippedNonSilent > 0 || deliverySummary.failedNonSilent > 0))
) {
const fallbackText = dispatchError
? "Something went wrong while processing your request. Please try again."
: EMPTY_RESPONSE_FALLBACK;
const result = await deliverReplies({
replies: [{ text: EMPTY_RESPONSE_FALLBACK }],
replies: [{ text: fallbackText }],
...deliveryBaseOptions,
});
sentFallback = result.delivered;

View File

@@ -72,4 +72,29 @@ describe("telegram bot message processor", () => {
await processSampleMessage(processMessage);
expect(dispatchTelegramMessage).not.toHaveBeenCalled();
});
it("sends user-visible fallback when dispatch throws", async () => {
const sendMessage = vi.fn().mockResolvedValue(undefined);
const runtimeError = vi.fn();
buildTelegramMessageContext.mockResolvedValue({
chatId: 123,
threadSpec: { id: 456 },
route: { sessionKey: "agent:main:main" },
});
dispatchTelegramMessage.mockRejectedValue(new Error("dispatch exploded"));
const processMessage = createTelegramMessageProcessor({
...baseDeps,
bot: { api: { sendMessage } },
runtime: { error: runtimeError },
} as unknown as Parameters<typeof createTelegramMessageProcessor>[0]);
await expect(processSampleMessage(processMessage)).resolves.toBeUndefined();
expect(sendMessage).toHaveBeenCalledWith(
123,
"Something went wrong while processing your request. Please try again.",
{ message_thread_id: 456 },
);
expect(runtimeError).toHaveBeenCalledWith(expect.stringContaining("dispatch exploded"));
});
});

View File

@@ -1,5 +1,6 @@
import type { ReplyToMode } from "../config/config.js";
import type { TelegramAccountConfig } from "../config/types.telegram.js";
import { danger } from "../globals.js";
import type { RuntimeEnv } from "../runtime.js";
import {
buildTelegramMessageContext,
@@ -78,16 +79,29 @@ export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDep
if (!context) {
return;
}
await dispatchTelegramMessage({
context,
bot,
cfg,
runtime,
replyToMode,
streamMode,
textLimit,
telegramCfg,
opts,
});
try {
await dispatchTelegramMessage({
context,
bot,
cfg,
runtime,
replyToMode,
streamMode,
textLimit,
telegramCfg,
opts,
});
} catch (err) {
runtime.error?.(danger(`telegram message processing failed: ${String(err)}`));
try {
await bot.api.sendMessage(
context.chatId,
"Something went wrong while processing your request. Please try again.",
context.threadSpec?.id != null ? { message_thread_id: context.threadSpec.id } : undefined,
);
} catch {
// Best-effort fallback; delivery may fail if the bot was blocked or the chat is invalid.
}
}
};
};