refactor(telegram): simplify polling restart flow

This commit is contained in:
Peter Steinberger
2026-02-26 03:32:36 +01:00
parent 069bbf9741
commit b786d11fea
2 changed files with 184 additions and 125 deletions

View File

@@ -67,6 +67,36 @@ const { startTelegramWebhookSpy } = vi.hoisted(() => ({
startTelegramWebhookSpy: vi.fn(async () => ({ server: { close: vi.fn() }, stop: vi.fn() })),
}));
type RunnerStub = {
task: () => Promise<void>;
stop: ReturnType<typeof vi.fn<() => void | Promise<void>>>;
isRunning: () => boolean;
};
const makeRunnerStub = (overrides: Partial<RunnerStub> = {}): RunnerStub => ({
task: overrides.task ?? (() => Promise.resolve()),
stop: overrides.stop ?? vi.fn<() => void | Promise<void>>(),
isRunning: overrides.isRunning ?? (() => false),
});
async function monitorWithAutoAbort(
opts: Omit<Parameters<typeof monitorTelegramProvider>[0], "abortSignal"> = {},
) {
const abort = new AbortController();
runSpy.mockImplementationOnce(() =>
makeRunnerStub({
task: async () => {
abort.abort();
},
}),
);
await monitorTelegramProvider({
token: "tok",
...opts,
abortSignal: abort.signal,
});
}
vi.mock("../config/config.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../config/config.js")>();
return {
@@ -149,7 +179,7 @@ describe("monitorTelegramProvider (grammY)", () => {
Object.values(api).forEach((fn) => {
fn?.mockReset?.();
});
await monitorTelegramProvider({ token: "tok" });
await monitorWithAutoAbort();
expect(handlers.message).toBeDefined();
await handlers.message?.({
message: {
@@ -172,7 +202,7 @@ describe("monitorTelegramProvider (grammY)", () => {
channels: { telegram: {} },
});
await monitorTelegramProvider({ token: "tok" });
await monitorWithAutoAbort();
expect(runSpy).toHaveBeenCalledWith(
expect.anything(),
@@ -180,7 +210,7 @@ describe("monitorTelegramProvider (grammY)", () => {
sink: { concurrency: 3 },
runner: expect.objectContaining({
silent: true,
maxRetryTime: 5 * 60 * 1000,
maxRetryTime: 60 * 60 * 1000,
retryInterval: "exponential",
}),
}),
@@ -191,7 +221,7 @@ describe("monitorTelegramProvider (grammY)", () => {
Object.values(api).forEach((fn) => {
fn?.mockReset?.();
});
await monitorTelegramProvider({ token: "tok" });
await monitorWithAutoAbort();
await handlers.message?.({
message: {
message_id: 2,
@@ -205,24 +235,27 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("retries on recoverable undici fetch errors", async () => {
const abort = new AbortController();
const networkError = Object.assign(new TypeError("fetch failed"), {
cause: Object.assign(new Error("connect timeout"), {
code: "UND_ERR_CONNECT_TIMEOUT",
}),
});
runSpy
.mockImplementationOnce(() => ({
task: () => Promise.reject(networkError),
stop: vi.fn(),
isRunning: (): boolean => false,
}))
.mockImplementationOnce(() => ({
task: () => Promise.resolve(),
stop: vi.fn(),
isRunning: (): boolean => false,
}));
.mockImplementationOnce(() =>
makeRunnerStub({
task: () => Promise.reject(networkError),
}),
)
.mockImplementationOnce(() =>
makeRunnerStub({
task: async () => {
abort.abort();
},
}),
);
await monitorTelegramProvider({ token: "tok" });
await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
expect(computeBackoff).toHaveBeenCalled();
expect(sleepWithAbort).toHaveBeenCalled();
@@ -230,6 +263,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("deletes webhook before starting polling", async () => {
const abort = new AbortController();
const order: string[] = [];
api.deleteWebhook.mockReset();
api.deleteWebhook.mockImplementationOnce(async () => {
@@ -238,20 +272,21 @@ describe("monitorTelegramProvider (grammY)", () => {
});
runSpy.mockImplementationOnce(() => {
order.push("run");
return {
task: () => Promise.resolve(),
stop: vi.fn(),
isRunning: () => false,
};
return makeRunnerStub({
task: async () => {
abort.abort();
},
});
});
await monitorTelegramProvider({ token: "tok" });
await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
expect(api.deleteWebhook).toHaveBeenCalledWith({ drop_pending_updates: false });
expect(order).toEqual(["deleteWebhook", "run"]);
});
it("retries recoverable deleteWebhook failures before polling", async () => {
const abort = new AbortController();
const cleanupError = Object.assign(new TypeError("fetch failed"), {
cause: Object.assign(new Error("connect timeout"), {
code: "UND_ERR_CONNECT_TIMEOUT",
@@ -259,13 +294,15 @@ describe("monitorTelegramProvider (grammY)", () => {
});
api.deleteWebhook.mockReset();
api.deleteWebhook.mockRejectedValueOnce(cleanupError).mockResolvedValueOnce(true);
runSpy.mockImplementationOnce(() => ({
task: () => Promise.resolve(),
stop: vi.fn(),
isRunning: () => false,
}));
runSpy.mockImplementationOnce(() =>
makeRunnerStub({
task: async () => {
abort.abort();
},
}),
);
await monitorTelegramProvider({ token: "tok" });
await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
expect(api.deleteWebhook).toHaveBeenCalledTimes(2);
expect(computeBackoff).toHaveBeenCalled();
@@ -274,6 +311,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("retries setup-time recoverable errors before starting polling", async () => {
const abort = new AbortController();
const setupError = Object.assign(new TypeError("fetch failed"), {
cause: Object.assign(new Error("connect timeout"), {
code: "UND_ERR_CONNECT_TIMEOUT",
@@ -281,13 +319,15 @@ describe("monitorTelegramProvider (grammY)", () => {
});
createTelegramBotErrors.push(setupError);
runSpy.mockImplementationOnce(() => ({
task: () => Promise.resolve(),
stop: vi.fn(),
isRunning: () => false,
}));
runSpy.mockImplementationOnce(() =>
makeRunnerStub({
task: async () => {
abort.abort();
},
}),
);
await monitorTelegramProvider({ token: "tok" });
await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
expect(computeBackoff).toHaveBeenCalled();
expect(sleepWithAbort).toHaveBeenCalled();
@@ -295,6 +335,7 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("awaits runner.stop before retrying after recoverable polling error", async () => {
const abort = new AbortController();
const recoverableError = Object.assign(new TypeError("fetch failed"), {
cause: Object.assign(new Error("connect timeout"), {
code: "UND_ERR_CONNECT_TIMEOUT",
@@ -307,21 +348,22 @@ describe("monitorTelegramProvider (grammY)", () => {
});
runSpy
.mockImplementationOnce(() => ({
task: () => Promise.reject(recoverableError),
stop: firstStop,
isRunning: () => false,
}))
.mockImplementationOnce(() =>
makeRunnerStub({
task: () => Promise.reject(recoverableError),
stop: firstStop,
}),
)
.mockImplementationOnce(() => {
expect(firstStopped).toBe(true);
return {
task: () => Promise.resolve(),
stop: vi.fn(),
isRunning: () => false,
};
return makeRunnerStub({
task: async () => {
abort.abort();
},
});
});
await monitorTelegramProvider({ token: "tok" });
await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
expect(firstStop).toHaveBeenCalled();
expect(computeBackoff).toHaveBeenCalled();
@@ -330,16 +372,17 @@ describe("monitorTelegramProvider (grammY)", () => {
});
it("surfaces non-recoverable errors", async () => {
runSpy.mockImplementationOnce(() => ({
task: () => Promise.reject(new Error("bad token")),
stop: vi.fn(),
isRunning: (): boolean => false,
}));
runSpy.mockImplementationOnce(() =>
makeRunnerStub({
task: () => Promise.reject(new Error("bad token")),
}),
);
await expect(monitorTelegramProvider({ token: "tok" })).rejects.toThrow("bad token");
});
it("force-restarts polling when unhandled network rejection stalls runner", async () => {
const abort = new AbortController();
let running = true;
let releaseTask: (() => void) | undefined;
const stop = vi.fn(async () => {
@@ -348,21 +391,25 @@ describe("monitorTelegramProvider (grammY)", () => {
});
runSpy
.mockImplementationOnce(() => ({
task: () =>
new Promise<void>((resolve) => {
releaseTask = resolve;
}),
stop,
isRunning: () => running,
}))
.mockImplementationOnce(() => ({
task: () => Promise.resolve(),
stop: vi.fn(),
isRunning: () => false,
}));
.mockImplementationOnce(() =>
makeRunnerStub({
task: () =>
new Promise<void>((resolve) => {
releaseTask = resolve;
}),
stop,
isRunning: () => running,
}),
)
.mockImplementationOnce(() =>
makeRunnerStub({
task: async () => {
abort.abort();
},
}),
);
const monitor = monitorTelegramProvider({ token: "tok" });
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
await vi.waitFor(() => expect(runSpy).toHaveBeenCalledTimes(1));
expect(emitUnhandledRejection(new TypeError("fetch failed"))).toBe(true);

View File

@@ -45,9 +45,8 @@ export function createTelegramRunnerOptions(cfg: OpenClawConfig): RunOptions<unk
},
// Suppress grammY getUpdates stack traces; we log concise errors ourselves.
silent: true,
// Retry transient failures before surfacing errors. Use a generous
// window so the runner survives prolonged outages (e.g. scheduled
// internet downtime) without the outer loop needing to restart it.
// Keep grammY retrying for a long outage window. If polling still
// stops, the outer monitor loop restarts it with backoff.
maxRetryTime: 60 * 60 * 1000,
retryInterval: "exponential",
},
@@ -61,6 +60,8 @@ const TELEGRAM_POLL_RESTART_POLICY = {
jitter: 0.25,
};
type TelegramBot = ReturnType<typeof createTelegramBot>;
const isGetUpdatesConflict = (err: unknown) => {
if (!err || typeof err !== "object") {
return false;
@@ -188,21 +189,11 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
let restartAttempts = 0;
let webhookCleared = false;
const runnerOptions = createTelegramRunnerOptions(cfg);
const waitBeforeRetryOnRecoverableSetupError = async (
err: unknown,
logPrefix: string,
): Promise<boolean> => {
if (opts.abortSignal?.aborted) {
return false;
}
if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) {
throw err;
}
const waitBeforeRestart = async (buildLine: (delay: string) => string): Promise<boolean> => {
restartAttempts += 1;
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
(opts.runtime?.error ?? console.error)(
`${logPrefix}: ${formatErrorMessage(err)}; retrying in ${formatDurationPrecise(delayMs)}.`,
);
const delay = formatDurationPrecise(delayMs);
log(buildLine(delay));
try {
await sleepWithAbort(delayMs, opts.abortSignal);
} catch (sleepErr) {
@@ -214,10 +205,24 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
return true;
};
while (!opts.abortSignal?.aborted) {
let bot;
const waitBeforeRetryOnRecoverableSetupError = async (
err: unknown,
logPrefix: string,
): Promise<boolean> => {
if (opts.abortSignal?.aborted) {
return false;
}
if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) {
throw err;
}
return waitBeforeRestart(
(delay) => `${logPrefix}: ${formatErrorMessage(err)}; retrying in ${delay}.`,
);
};
const createPollingBot = async (): Promise<TelegramBot | undefined> => {
try {
bot = createTelegramBot({
return createTelegramBot({
token,
runtime: opts.runtime,
proxyFetch,
@@ -234,31 +239,34 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
"Telegram setup network error",
);
if (!shouldRetry) {
return;
return undefined;
}
continue;
return undefined;
}
};
if (!webhookCleared) {
try {
await withTelegramApiErrorLogging({
operation: "deleteWebhook",
runtime: opts.runtime,
fn: () => bot.api.deleteWebhook({ drop_pending_updates: false }),
});
webhookCleared = true;
} catch (err) {
const shouldRetry = await waitBeforeRetryOnRecoverableSetupError(
err,
"Telegram webhook cleanup failed",
);
if (!shouldRetry) {
return;
}
continue;
}
const ensureWebhookCleanup = async (bot: TelegramBot): Promise<"ready" | "retry" | "exit"> => {
if (webhookCleared) {
return "ready";
}
try {
await withTelegramApiErrorLogging({
operation: "deleteWebhook",
runtime: opts.runtime,
fn: () => bot.api.deleteWebhook({ drop_pending_updates: false }),
});
webhookCleared = true;
return "ready";
} catch (err) {
const shouldRetry = await waitBeforeRetryOnRecoverableSetupError(
err,
"Telegram webhook cleanup failed",
);
return shouldRetry ? "retry" : "exit";
}
};
const runPollingCycle = async (bot: TelegramBot): Promise<"continue" | "exit"> => {
const runner = run(bot, runnerOptions);
activeRunner = runner;
let stopPromise: Promise<void> | undefined;
@@ -280,23 +288,16 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
// runner.task() returns a promise that resolves when the runner stops
await runner.task();
if (opts.abortSignal?.aborted) {
return;
return "exit";
}
// The runner stopped on its own. This can happen when grammY's
// maxRetryTime is exceeded (e.g. prolonged network outage).
// Instead of exiting permanently, restart with backoff so polling
// recovers once connectivity is restored.
restartAttempts += 1;
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
const reason = forceRestarted
? "unhandled network error"
: "runner stopped (maxRetryTime exceeded or graceful stop)";
forceRestarted = false;
log(
`Telegram polling runner stopped (${reason}); restarting in ${formatDurationPrecise(delayMs)}.`,
const shouldRestart = await waitBeforeRestart(
(delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`,
);
await sleepWithAbort(delayMs, opts.abortSignal);
continue;
return shouldRestart ? "continue" : "exit";
} catch (err) {
forceRestarted = false;
if (opts.abortSignal?.aborted) {
@@ -307,25 +308,36 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
if (!isConflict && !isRecoverable) {
throw err;
}
restartAttempts += 1;
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
const reason = isConflict ? "getUpdates conflict" : "network error";
const errMsg = formatErrorMessage(err);
(opts.runtime?.error ?? console.error)(
`Telegram ${reason}: ${errMsg}; retrying in ${formatDurationPrecise(delayMs)}.`,
const shouldRestart = await waitBeforeRestart(
(delay) => `Telegram ${reason}: ${errMsg}; retrying in ${delay}.`,
);
try {
await sleepWithAbort(delayMs, opts.abortSignal);
} catch (sleepErr) {
if (opts.abortSignal?.aborted) {
return;
}
throw sleepErr;
}
return shouldRestart ? "continue" : "exit";
} finally {
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
await stopRunner();
}
};
while (!opts.abortSignal?.aborted) {
const bot = await createPollingBot();
if (!bot) {
continue;
}
const cleanupState = await ensureWebhookCleanup(bot);
if (cleanupState === "retry") {
continue;
}
if (cleanupState === "exit") {
return;
}
const state = await runPollingCycle(bot);
if (state === "exit") {
return;
}
}
} finally {
unregisterHandler();