From b786d11fea377d97d5c07cadee7a0f8bc8e0a168 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 26 Feb 2026 03:32:36 +0100 Subject: [PATCH] refactor(telegram): simplify polling restart flow --- src/telegram/monitor.test.ts | 173 ++++++++++++++++++++++------------- src/telegram/monitor.ts | 136 ++++++++++++++------------- 2 files changed, 184 insertions(+), 125 deletions(-) diff --git a/src/telegram/monitor.test.ts b/src/telegram/monitor.test.ts index 49fbcc13155..4e59f6c0c6a 100644 --- a/src/telegram/monitor.test.ts +++ b/src/telegram/monitor.test.ts @@ -67,6 +67,36 @@ const { startTelegramWebhookSpy } = vi.hoisted(() => ({ startTelegramWebhookSpy: vi.fn(async () => ({ server: { close: vi.fn() }, stop: vi.fn() })), })); +type RunnerStub = { + task: () => Promise; + stop: ReturnType void | Promise>>; + isRunning: () => boolean; +}; + +const makeRunnerStub = (overrides: Partial = {}): RunnerStub => ({ + task: overrides.task ?? (() => Promise.resolve()), + stop: overrides.stop ?? vi.fn<() => void | Promise>(), + isRunning: overrides.isRunning ?? (() => false), +}); + +async function monitorWithAutoAbort( + opts: Omit[0], "abortSignal"> = {}, +) { + const abort = new AbortController(); + runSpy.mockImplementationOnce(() => + makeRunnerStub({ + task: async () => { + abort.abort(); + }, + }), + ); + await monitorTelegramProvider({ + token: "tok", + ...opts, + abortSignal: abort.signal, + }); +} + vi.mock("../config/config.js", async (importOriginal) => { const actual = await importOriginal(); return { @@ -149,7 +179,7 @@ describe("monitorTelegramProvider (grammY)", () => { Object.values(api).forEach((fn) => { fn?.mockReset?.(); }); - await monitorTelegramProvider({ token: "tok" }); + await monitorWithAutoAbort(); expect(handlers.message).toBeDefined(); await handlers.message?.({ message: { @@ -172,7 +202,7 @@ describe("monitorTelegramProvider (grammY)", () => { channels: { telegram: {} }, }); - await monitorTelegramProvider({ token: "tok" }); + await monitorWithAutoAbort(); expect(runSpy).toHaveBeenCalledWith( expect.anything(), @@ -180,7 +210,7 @@ describe("monitorTelegramProvider (grammY)", () => { sink: { concurrency: 3 }, runner: expect.objectContaining({ silent: true, - maxRetryTime: 5 * 60 * 1000, + maxRetryTime: 60 * 60 * 1000, retryInterval: "exponential", }), }), @@ -191,7 +221,7 @@ describe("monitorTelegramProvider (grammY)", () => { Object.values(api).forEach((fn) => { fn?.mockReset?.(); }); - await monitorTelegramProvider({ token: "tok" }); + await monitorWithAutoAbort(); await handlers.message?.({ message: { message_id: 2, @@ -205,24 +235,27 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("retries on recoverable undici fetch errors", async () => { + const abort = new AbortController(); const networkError = Object.assign(new TypeError("fetch failed"), { cause: Object.assign(new Error("connect timeout"), { code: "UND_ERR_CONNECT_TIMEOUT", }), }); runSpy - .mockImplementationOnce(() => ({ - task: () => Promise.reject(networkError), - stop: vi.fn(), - isRunning: (): boolean => false, - })) - .mockImplementationOnce(() => ({ - task: () => Promise.resolve(), - stop: vi.fn(), - isRunning: (): boolean => false, - })); + .mockImplementationOnce(() => + makeRunnerStub({ + task: () => Promise.reject(networkError), + }), + ) + .mockImplementationOnce(() => + makeRunnerStub({ + task: async () => { + abort.abort(); + }, + }), + ); - await monitorTelegramProvider({ token: "tok" }); + await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); expect(computeBackoff).toHaveBeenCalled(); expect(sleepWithAbort).toHaveBeenCalled(); @@ -230,6 +263,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("deletes webhook before starting polling", async () => { + const abort = new AbortController(); const order: string[] = []; api.deleteWebhook.mockReset(); api.deleteWebhook.mockImplementationOnce(async () => { @@ -238,20 +272,21 @@ describe("monitorTelegramProvider (grammY)", () => { }); runSpy.mockImplementationOnce(() => { order.push("run"); - return { - task: () => Promise.resolve(), - stop: vi.fn(), - isRunning: () => false, - }; + return makeRunnerStub({ + task: async () => { + abort.abort(); + }, + }); }); - await monitorTelegramProvider({ token: "tok" }); + await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); expect(api.deleteWebhook).toHaveBeenCalledWith({ drop_pending_updates: false }); expect(order).toEqual(["deleteWebhook", "run"]); }); it("retries recoverable deleteWebhook failures before polling", async () => { + const abort = new AbortController(); const cleanupError = Object.assign(new TypeError("fetch failed"), { cause: Object.assign(new Error("connect timeout"), { code: "UND_ERR_CONNECT_TIMEOUT", @@ -259,13 +294,15 @@ describe("monitorTelegramProvider (grammY)", () => { }); api.deleteWebhook.mockReset(); api.deleteWebhook.mockRejectedValueOnce(cleanupError).mockResolvedValueOnce(true); - runSpy.mockImplementationOnce(() => ({ - task: () => Promise.resolve(), - stop: vi.fn(), - isRunning: () => false, - })); + runSpy.mockImplementationOnce(() => + makeRunnerStub({ + task: async () => { + abort.abort(); + }, + }), + ); - await monitorTelegramProvider({ token: "tok" }); + await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); expect(api.deleteWebhook).toHaveBeenCalledTimes(2); expect(computeBackoff).toHaveBeenCalled(); @@ -274,6 +311,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("retries setup-time recoverable errors before starting polling", async () => { + const abort = new AbortController(); const setupError = Object.assign(new TypeError("fetch failed"), { cause: Object.assign(new Error("connect timeout"), { code: "UND_ERR_CONNECT_TIMEOUT", @@ -281,13 +319,15 @@ describe("monitorTelegramProvider (grammY)", () => { }); createTelegramBotErrors.push(setupError); - runSpy.mockImplementationOnce(() => ({ - task: () => Promise.resolve(), - stop: vi.fn(), - isRunning: () => false, - })); + runSpy.mockImplementationOnce(() => + makeRunnerStub({ + task: async () => { + abort.abort(); + }, + }), + ); - await monitorTelegramProvider({ token: "tok" }); + await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); expect(computeBackoff).toHaveBeenCalled(); expect(sleepWithAbort).toHaveBeenCalled(); @@ -295,6 +335,7 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("awaits runner.stop before retrying after recoverable polling error", async () => { + const abort = new AbortController(); const recoverableError = Object.assign(new TypeError("fetch failed"), { cause: Object.assign(new Error("connect timeout"), { code: "UND_ERR_CONNECT_TIMEOUT", @@ -307,21 +348,22 @@ describe("monitorTelegramProvider (grammY)", () => { }); runSpy - .mockImplementationOnce(() => ({ - task: () => Promise.reject(recoverableError), - stop: firstStop, - isRunning: () => false, - })) + .mockImplementationOnce(() => + makeRunnerStub({ + task: () => Promise.reject(recoverableError), + stop: firstStop, + }), + ) .mockImplementationOnce(() => { expect(firstStopped).toBe(true); - return { - task: () => Promise.resolve(), - stop: vi.fn(), - isRunning: () => false, - }; + return makeRunnerStub({ + task: async () => { + abort.abort(); + }, + }); }); - await monitorTelegramProvider({ token: "tok" }); + await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); expect(firstStop).toHaveBeenCalled(); expect(computeBackoff).toHaveBeenCalled(); @@ -330,16 +372,17 @@ describe("monitorTelegramProvider (grammY)", () => { }); it("surfaces non-recoverable errors", async () => { - runSpy.mockImplementationOnce(() => ({ - task: () => Promise.reject(new Error("bad token")), - stop: vi.fn(), - isRunning: (): boolean => false, - })); + runSpy.mockImplementationOnce(() => + makeRunnerStub({ + task: () => Promise.reject(new Error("bad token")), + }), + ); await expect(monitorTelegramProvider({ token: "tok" })).rejects.toThrow("bad token"); }); it("force-restarts polling when unhandled network rejection stalls runner", async () => { + const abort = new AbortController(); let running = true; let releaseTask: (() => void) | undefined; const stop = vi.fn(async () => { @@ -348,21 +391,25 @@ describe("monitorTelegramProvider (grammY)", () => { }); runSpy - .mockImplementationOnce(() => ({ - task: () => - new Promise((resolve) => { - releaseTask = resolve; - }), - stop, - isRunning: () => running, - })) - .mockImplementationOnce(() => ({ - task: () => Promise.resolve(), - stop: vi.fn(), - isRunning: () => false, - })); + .mockImplementationOnce(() => + makeRunnerStub({ + task: () => + new Promise((resolve) => { + releaseTask = resolve; + }), + stop, + isRunning: () => running, + }), + ) + .mockImplementationOnce(() => + makeRunnerStub({ + task: async () => { + abort.abort(); + }, + }), + ); - const monitor = monitorTelegramProvider({ token: "tok" }); + const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); await vi.waitFor(() => expect(runSpy).toHaveBeenCalledTimes(1)); expect(emitUnhandledRejection(new TypeError("fetch failed"))).toBe(true); diff --git a/src/telegram/monitor.ts b/src/telegram/monitor.ts index 8c93eee60c9..579db8ad3a1 100644 --- a/src/telegram/monitor.ts +++ b/src/telegram/monitor.ts @@ -45,9 +45,8 @@ export function createTelegramRunnerOptions(cfg: OpenClawConfig): RunOptions; + const isGetUpdatesConflict = (err: unknown) => { if (!err || typeof err !== "object") { return false; @@ -188,21 +189,11 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { let restartAttempts = 0; let webhookCleared = false; const runnerOptions = createTelegramRunnerOptions(cfg); - const waitBeforeRetryOnRecoverableSetupError = async ( - err: unknown, - logPrefix: string, - ): Promise => { - if (opts.abortSignal?.aborted) { - return false; - } - if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) { - throw err; - } + const waitBeforeRestart = async (buildLine: (delay: string) => string): Promise => { restartAttempts += 1; const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts); - (opts.runtime?.error ?? console.error)( - `${logPrefix}: ${formatErrorMessage(err)}; retrying in ${formatDurationPrecise(delayMs)}.`, - ); + const delay = formatDurationPrecise(delayMs); + log(buildLine(delay)); try { await sleepWithAbort(delayMs, opts.abortSignal); } catch (sleepErr) { @@ -214,10 +205,24 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { return true; }; - while (!opts.abortSignal?.aborted) { - let bot; + const waitBeforeRetryOnRecoverableSetupError = async ( + err: unknown, + logPrefix: string, + ): Promise => { + if (opts.abortSignal?.aborted) { + return false; + } + if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) { + throw err; + } + return waitBeforeRestart( + (delay) => `${logPrefix}: ${formatErrorMessage(err)}; retrying in ${delay}.`, + ); + }; + + const createPollingBot = async (): Promise => { try { - bot = createTelegramBot({ + return createTelegramBot({ token, runtime: opts.runtime, proxyFetch, @@ -234,31 +239,34 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { "Telegram setup network error", ); if (!shouldRetry) { - return; + return undefined; } - continue; + return undefined; } + }; - if (!webhookCleared) { - try { - await withTelegramApiErrorLogging({ - operation: "deleteWebhook", - runtime: opts.runtime, - fn: () => bot.api.deleteWebhook({ drop_pending_updates: false }), - }); - webhookCleared = true; - } catch (err) { - const shouldRetry = await waitBeforeRetryOnRecoverableSetupError( - err, - "Telegram webhook cleanup failed", - ); - if (!shouldRetry) { - return; - } - continue; - } + const ensureWebhookCleanup = async (bot: TelegramBot): Promise<"ready" | "retry" | "exit"> => { + if (webhookCleared) { + return "ready"; } + try { + await withTelegramApiErrorLogging({ + operation: "deleteWebhook", + runtime: opts.runtime, + fn: () => bot.api.deleteWebhook({ drop_pending_updates: false }), + }); + webhookCleared = true; + return "ready"; + } catch (err) { + const shouldRetry = await waitBeforeRetryOnRecoverableSetupError( + err, + "Telegram webhook cleanup failed", + ); + return shouldRetry ? "retry" : "exit"; + } + }; + const runPollingCycle = async (bot: TelegramBot): Promise<"continue" | "exit"> => { const runner = run(bot, runnerOptions); activeRunner = runner; let stopPromise: Promise | undefined; @@ -280,23 +288,16 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { // runner.task() returns a promise that resolves when the runner stops await runner.task(); if (opts.abortSignal?.aborted) { - return; + return "exit"; } - // The runner stopped on its own. This can happen when grammY's - // maxRetryTime is exceeded (e.g. prolonged network outage). - // Instead of exiting permanently, restart with backoff so polling - // recovers once connectivity is restored. - restartAttempts += 1; - const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts); const reason = forceRestarted ? "unhandled network error" : "runner stopped (maxRetryTime exceeded or graceful stop)"; forceRestarted = false; - log( - `Telegram polling runner stopped (${reason}); restarting in ${formatDurationPrecise(delayMs)}.`, + const shouldRestart = await waitBeforeRestart( + (delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`, ); - await sleepWithAbort(delayMs, opts.abortSignal); - continue; + return shouldRestart ? "continue" : "exit"; } catch (err) { forceRestarted = false; if (opts.abortSignal?.aborted) { @@ -307,25 +308,36 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { if (!isConflict && !isRecoverable) { throw err; } - restartAttempts += 1; - const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts); const reason = isConflict ? "getUpdates conflict" : "network error"; const errMsg = formatErrorMessage(err); - (opts.runtime?.error ?? console.error)( - `Telegram ${reason}: ${errMsg}; retrying in ${formatDurationPrecise(delayMs)}.`, + const shouldRestart = await waitBeforeRestart( + (delay) => `Telegram ${reason}: ${errMsg}; retrying in ${delay}.`, ); - try { - await sleepWithAbort(delayMs, opts.abortSignal); - } catch (sleepErr) { - if (opts.abortSignal?.aborted) { - return; - } - throw sleepErr; - } + return shouldRestart ? "continue" : "exit"; } finally { opts.abortSignal?.removeEventListener("abort", stopOnAbort); await stopRunner(); } + }; + + while (!opts.abortSignal?.aborted) { + const bot = await createPollingBot(); + if (!bot) { + continue; + } + + const cleanupState = await ensureWebhookCleanup(bot); + if (cleanupState === "retry") { + continue; + } + if (cleanupState === "exit") { + return; + } + + const state = await runPollingCycle(bot); + if (state === "exit") { + return; + } } } finally { unregisterHandler();