diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c605803a44..149a517fd64 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Telegram/Webhook: pre-initialize webhook bots, switch webhook processing to callback-mode JSON handling, and preserve full near-limit payload reads under delayed handlers to prevent webhook request hangs and dropped updates. (#26156) - Agents/Subagents delivery: refactor subagent completion announce dispatch into an explicit queue/direct/fallback state machine, recover outbound channel-plugin resolution in cold/stale plugin-registry states across announce/message/gateway send paths, finalize cleanup bookkeeping when announce flow rejects, and treat Telegram sends without `message_id` as delivery failures (instead of false-success `"unknown"` IDs). (#26867, #25961, #26803, #25069, #26741) Thanks @SmithLabsLLC and @docaohieu2808. - Security/SSRF guard: classify IPv6 multicast literals (`ff00::/8`) as blocked/private-internal targets in shared SSRF IP checks, preventing multicast literals from bypassing URL-host preflight and DNS answer validation. This ships in the next npm release (`2026.2.25`). Thanks @zpbrent for reporting. - Slack/Session threads: prevent oversized parent-session inheritance from silently bricking new thread sessions, surface embedded context-overflow empty-result failures to users, and add configurable `session.parentForkMaxTokens` (default `100000`, `0` disables). (#26912) Thanks @markshields-tl. diff --git a/src/telegram/webhook.test.ts b/src/telegram/webhook.test.ts index 2c943a4be6f..0117c55823a 100644 --- a/src/telegram/webhook.test.ts +++ b/src/telegram/webhook.test.ts @@ -1,24 +1,26 @@ +import { createHash } from "node:crypto"; +import { once } from "node:events"; +import { request } from "node:http"; +import { setTimeout as sleep } from "node:timers/promises"; import { describe, expect, it, vi } from "vitest"; import { startTelegramWebhook } from "./webhook.js"; -const handlerSpy = vi.hoisted(() => - vi.fn( - (_req: unknown, res: { writeHead: (status: number) => void; end: (body?: string) => void }) => { - res.writeHead(200); - res.end("ok"); - }, - ), -); +const handlerSpy = vi.hoisted(() => vi.fn((..._args: unknown[]): unknown => undefined)); const setWebhookSpy = vi.hoisted(() => vi.fn()); +const deleteWebhookSpy = vi.hoisted(() => vi.fn(async () => true)); +const initSpy = vi.hoisted(() => vi.fn(async () => undefined)); const stopSpy = vi.hoisted(() => vi.fn()); const webhookCallbackSpy = vi.hoisted(() => vi.fn(() => handlerSpy)); const createTelegramBotSpy = vi.hoisted(() => vi.fn(() => ({ - api: { setWebhook: setWebhookSpy }, + init: initSpy, + api: { setWebhook: setWebhookSpy, deleteWebhook: deleteWebhookSpy }, stop: stopSpy, })), ); +const WEBHOOK_POST_TIMEOUT_MS = process.platform === "win32" ? 20_000 : 8_000; + vi.mock("grammy", async (importOriginal) => { const actual = await importOriginal(); return { @@ -31,8 +33,178 @@ vi.mock("./bot.js", () => ({ createTelegramBot: createTelegramBotSpy, })); +async function fetchWithTimeout( + input: string, + init: Omit, + timeoutMs: number, +): Promise { + const abort = new AbortController(); + const timer = setTimeout(() => { + abort.abort(); + }, timeoutMs); + try { + return await fetch(input, { ...init, signal: abort.signal }); + } finally { + clearTimeout(timer); + } +} + +async function postWebhookJson(params: { + url: string; + payload: string; + secret?: string; + timeoutMs?: number; +}): Promise { + return await fetchWithTimeout( + params.url, + { + method: "POST", + headers: { + "content-type": "application/json", + ...(params.secret ? { "x-telegram-bot-api-secret-token": params.secret } : {}), + }, + body: params.payload, + }, + params.timeoutMs ?? 5_000, + ); +} + +function createDeterministicRng(seed: number): () => number { + let state = seed >>> 0; + return () => { + state = (state * 1_664_525 + 1_013_904_223) >>> 0; + return state / 4_294_967_296; + }; +} + +async function postWebhookPayloadWithChunkPlan(params: { + port: number; + path: string; + payload: string; + secret: string; + mode: "single" | "random-chunked"; + timeoutMs?: number; +}): Promise<{ statusCode: number; body: string }> { + const payloadBuffer = Buffer.from(params.payload, "utf-8"); + return await new Promise((resolve, reject) => { + let bytesQueued = 0; + let chunksQueued = 0; + let phase: "writing" | "awaiting-response" = "writing"; + let settled = false; + const finishResolve = (value: { statusCode: number; body: string }) => { + if (settled) { + return; + } + settled = true; + clearTimeout(timeout); + resolve(value); + }; + const finishReject = (error: unknown) => { + if (settled) { + return; + } + settled = true; + clearTimeout(timeout); + reject(error); + }; + + const req = request( + { + hostname: "127.0.0.1", + port: params.port, + path: params.path, + method: "POST", + headers: { + "content-type": "application/json", + "content-length": String(payloadBuffer.length), + "x-telegram-bot-api-secret-token": params.secret, + }, + }, + (res) => { + const chunks: Buffer[] = []; + res.on("data", (chunk: Buffer | string) => { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + }); + res.on("end", () => { + finishResolve({ + statusCode: res.statusCode ?? 0, + body: Buffer.concat(chunks).toString("utf-8"), + }); + }); + }, + ); + + const timeout = setTimeout(() => { + finishReject( + new Error( + `webhook post timed out after ${params.timeoutMs ?? 15_000}ms (phase=${phase}, bytesQueued=${bytesQueued}, chunksQueued=${chunksQueued}, totalBytes=${payloadBuffer.length})`, + ), + ); + req.destroy(); + }, params.timeoutMs ?? 15_000); + + req.on("error", (error) => { + finishReject(error); + }); + + const writeAll = async () => { + if (params.mode === "single") { + req.end(payloadBuffer); + return; + } + + const rng = createDeterministicRng(26156); + let offset = 0; + while (offset < payloadBuffer.length) { + const remaining = payloadBuffer.length - offset; + const nextSize = Math.max(1, Math.min(remaining, 1 + Math.floor(rng() * 8_192))); + const chunk = payloadBuffer.subarray(offset, offset + nextSize); + const canContinue = req.write(chunk); + offset += nextSize; + bytesQueued = offset; + chunksQueued += 1; + if (chunksQueued % 10 === 0) { + await sleep(1 + Math.floor(rng() * 3)); + } + if (!canContinue) { + // Windows CI occasionally stalls on waiting for drain indefinitely. + // Bound the wait, then continue queuing this small (~1MB) payload. + await Promise.race([once(req, "drain"), sleep(25)]); + } + } + phase = "awaiting-response"; + req.end(); + }; + + void writeAll().catch((error) => { + finishReject(error); + }); + }); +} + +function createNearLimitTelegramPayload(): { payload: string; sizeBytes: number } { + const maxBytes = 1_024 * 1_024; + const targetBytes = maxBytes - 4_096; + const shell = { update_id: 77_777, message: { text: "" } }; + const shellSize = Buffer.byteLength(JSON.stringify(shell), "utf-8"); + const textLength = Math.max(1, targetBytes - shellSize); + const pattern = "the quick brown fox jumps over the lazy dog "; + const repeats = Math.ceil(textLength / pattern.length); + const text = pattern.repeat(repeats).slice(0, textLength); + const payload = JSON.stringify({ + update_id: 77_777, + message: { text }, + }); + return { payload, sizeBytes: Buffer.byteLength(payload, "utf-8") }; +} + +function sha256(text: string): string { + return createHash("sha256").update(text).digest("hex"); +} + describe("startTelegramWebhook", () => { it("starts server, registers webhook, and serves health", async () => { + initSpy.mockClear(); createTelegramBotSpy.mockClear(); webhookCallbackSpy.mockClear(); const abort = new AbortController(); @@ -59,6 +231,7 @@ describe("startTelegramWebhook", () => { const health = await fetch(`${url}/healthz`); expect(health.status).toBe(200); + expect(initSpy).toHaveBeenCalledTimes(1); expect(setWebhookSpy).toHaveBeenCalled(); expect(webhookCallbackSpy).toHaveBeenCalledWith( expect.objectContaining({ @@ -66,7 +239,7 @@ describe("startTelegramWebhook", () => { setWebhook: expect.any(Function), }), }), - "http", + "callback", { secretToken: "secret", onTimeout: "return", @@ -101,7 +274,13 @@ describe("startTelegramWebhook", () => { if (!addr || typeof addr === "string") { throw new Error("no addr"); } - await fetch(`http://127.0.0.1:${addr.port}/hook`, { method: "POST" }); + const payload = JSON.stringify({ update_id: 1, message: { text: "hello" } }); + const response = await postWebhookJson({ + url: `http://127.0.0.1:${addr.port}/hook`, + payload, + secret: "secret", + }); + expect(response.status).toBe(200); expect(handlerSpy).toHaveBeenCalled(); abort.abort(); }); @@ -113,4 +292,371 @@ describe("startTelegramWebhook", () => { }), ).rejects.toThrow(/requires a non-empty secret token/i); }); + + it("registers webhook using the bound listening port when port is 0", async () => { + setWebhookSpy.mockClear(); + const abort = new AbortController(); + const { server } = await startTelegramWebhook({ + token: "tok", + secret: "secret", + port: 0, + abortSignal: abort.signal, + path: "/hook", + }); + try { + const addr = server.address(); + if (!addr || typeof addr === "string") { + throw new Error("no addr"); + } + expect(addr.port).toBeGreaterThan(0); + expect(setWebhookSpy).toHaveBeenCalledTimes(1); + expect(setWebhookSpy).toHaveBeenCalledWith( + `http://127.0.0.1:${addr.port}/hook`, + expect.objectContaining({ + secret_token: "secret", + }), + ); + } finally { + abort.abort(); + } + }); + + it("keeps webhook payload readable when callback delays body read", async () => { + handlerSpy.mockImplementationOnce(async (...args: unknown[]) => { + const [update, reply] = args as [unknown, (json: string) => Promise]; + await sleep(50); + await reply(JSON.stringify(update)); + }); + + const abort = new AbortController(); + const { server } = await startTelegramWebhook({ + token: "tok", + secret: "secret", + port: 0, + abortSignal: abort.signal, + path: "/hook", + }); + try { + const addr = server.address(); + if (!addr || typeof addr === "string") { + throw new Error("no addr"); + } + + const payload = JSON.stringify({ update_id: 1, message: { text: "hello" } }); + const res = await postWebhookJson({ + url: `http://127.0.0.1:${addr.port}/hook`, + payload, + secret: "secret", + }); + expect(res.status).toBe(200); + const responseBody = await res.text(); + expect(JSON.parse(responseBody)).toEqual(JSON.parse(payload)); + } finally { + abort.abort(); + } + }); + + it("keeps webhook payload readable across multiple delayed reads", async () => { + const seenPayloads: string[] = []; + const delayedHandler = async (...args: unknown[]) => { + const [update, reply] = args as [unknown, (json: string) => Promise]; + await sleep(50); + seenPayloads.push(JSON.stringify(update)); + await reply("ok"); + }; + handlerSpy.mockImplementationOnce(delayedHandler).mockImplementationOnce(delayedHandler); + + const abort = new AbortController(); + const { server } = await startTelegramWebhook({ + token: "tok", + secret: "secret", + port: 0, + abortSignal: abort.signal, + path: "/hook", + }); + try { + const addr = server.address(); + if (!addr || typeof addr === "string") { + throw new Error("no addr"); + } + + const payloads = [ + JSON.stringify({ update_id: 1, message: { text: "first" } }), + JSON.stringify({ update_id: 2, message: { text: "second" } }), + ]; + + for (const payload of payloads) { + const res = await postWebhookJson({ + url: `http://127.0.0.1:${addr.port}/hook`, + payload, + secret: "secret", + }); + expect(res.status).toBe(200); + } + + expect(seenPayloads.map((x) => JSON.parse(x))).toEqual(payloads.map((x) => JSON.parse(x))); + } finally { + abort.abort(); + } + }); + + it("processes a second request after first-request delayed-init data loss", async () => { + const seenUpdates: unknown[] = []; + webhookCallbackSpy.mockImplementationOnce( + () => + vi.fn( + ( + update: unknown, + reply: (json: string) => Promise, + _secretHeader: string | undefined, + _unauthorized: () => Promise, + ) => { + seenUpdates.push(update); + void (async () => { + await sleep(50); + await reply("ok"); + })(); + }, + ) as unknown as typeof handlerSpy, + ); + + const secret = "secret"; + const abort = new AbortController(); + const { server } = await startTelegramWebhook({ + token: "tok", + secret, + port: 0, + abortSignal: abort.signal, + path: "/hook", + }); + + try { + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("no addr"); + } + + const firstPayload = JSON.stringify({ update_id: 100, message: { text: "first" } }); + const secondPayload = JSON.stringify({ update_id: 101, message: { text: "second" } }); + const firstResponse = await postWebhookPayloadWithChunkPlan({ + port: address.port, + path: "/hook", + payload: firstPayload, + secret, + mode: "single", + timeoutMs: WEBHOOK_POST_TIMEOUT_MS, + }); + const secondResponse = await postWebhookPayloadWithChunkPlan({ + port: address.port, + path: "/hook", + payload: secondPayload, + secret, + mode: "single", + timeoutMs: WEBHOOK_POST_TIMEOUT_MS, + }); + + expect(firstResponse.statusCode).toBe(200); + expect(secondResponse.statusCode).toBe(200); + expect(seenUpdates).toEqual([JSON.parse(firstPayload), JSON.parse(secondPayload)]); + } finally { + abort.abort(); + } + }); + + it("handles near-limit payload with random chunk writes and event-loop yields", async () => { + const seenUpdates: Array<{ update_id: number; message: { text: string } }> = []; + webhookCallbackSpy.mockImplementationOnce( + () => + vi.fn( + ( + update: unknown, + reply: (json: string) => Promise, + _secretHeader: string | undefined, + _unauthorized: () => Promise, + ) => { + seenUpdates.push(update as { update_id: number; message: { text: string } }); + void reply("ok"); + }, + ) as unknown as typeof handlerSpy, + ); + + const { payload, sizeBytes } = createNearLimitTelegramPayload(); + expect(sizeBytes).toBeLessThan(1_024 * 1_024); + expect(sizeBytes).toBeGreaterThan(256 * 1_024); + const expected = JSON.parse(payload) as { update_id: number; message: { text: string } }; + + const secret = "secret"; + const abort = new AbortController(); + const { server } = await startTelegramWebhook({ + token: "tok", + secret, + port: 0, + abortSignal: abort.signal, + path: "/hook", + }); + + try { + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("no addr"); + } + + const response = await postWebhookPayloadWithChunkPlan({ + port: address.port, + path: "/hook", + payload, + secret, + mode: "random-chunked", + timeoutMs: WEBHOOK_POST_TIMEOUT_MS, + }); + + expect(response.statusCode).toBe(200); + expect(seenUpdates).toHaveLength(1); + expect(seenUpdates[0]?.update_id).toBe(expected.update_id); + expect(seenUpdates[0]?.message.text.length).toBe(expected.message.text.length); + expect(sha256(seenUpdates[0]?.message.text ?? "")).toBe(sha256(expected.message.text)); + } finally { + abort.abort(); + } + }); + + it("handles near-limit payload written in a single request write", async () => { + const seenUpdates: Array<{ update_id: number; message: { text: string } }> = []; + webhookCallbackSpy.mockImplementationOnce( + () => + vi.fn( + ( + update: unknown, + reply: (json: string) => Promise, + _secretHeader: string | undefined, + _unauthorized: () => Promise, + ) => { + seenUpdates.push(update as { update_id: number; message: { text: string } }); + void reply("ok"); + }, + ) as unknown as typeof handlerSpy, + ); + + const { payload, sizeBytes } = createNearLimitTelegramPayload(); + expect(sizeBytes).toBeLessThan(1_024 * 1_024); + expect(sizeBytes).toBeGreaterThan(256 * 1_024); + const expected = JSON.parse(payload) as { update_id: number; message: { text: string } }; + + const secret = "secret"; + const abort = new AbortController(); + const { server } = await startTelegramWebhook({ + token: "tok", + secret, + port: 0, + abortSignal: abort.signal, + path: "/hook", + }); + + try { + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("no addr"); + } + + const response = await postWebhookPayloadWithChunkPlan({ + port: address.port, + path: "/hook", + payload, + secret, + mode: "single", + timeoutMs: WEBHOOK_POST_TIMEOUT_MS, + }); + + expect(response.statusCode).toBe(200); + expect(seenUpdates).toHaveLength(1); + expect(seenUpdates[0]?.update_id).toBe(expected.update_id); + expect(seenUpdates[0]?.message.text.length).toBe(expected.message.text.length); + expect(sha256(seenUpdates[0]?.message.text ?? "")).toBe(sha256(expected.message.text)); + } finally { + abort.abort(); + } + }); + + it("rejects payloads larger than 1MB before invoking webhook handler", async () => { + handlerSpy.mockClear(); + const abort = new AbortController(); + const { server } = await startTelegramWebhook({ + token: "tok", + secret: "secret", + port: 0, + abortSignal: abort.signal, + path: "/hook", + }); + + try { + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("no addr"); + } + + const responseOrError = await new Promise< + | { kind: "response"; statusCode: number; body: string } + | { kind: "error"; code: string | undefined } + >((resolve) => { + const req = request( + { + hostname: "127.0.0.1", + port: address.port, + path: "/hook", + method: "POST", + headers: { + "content-type": "application/json", + "content-length": String(1_024 * 1_024 + 2_048), + "x-telegram-bot-api-secret-token": "secret", + }, + }, + (res) => { + const chunks: Buffer[] = []; + res.on("data", (chunk: Buffer | string) => { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + }); + res.on("end", () => { + resolve({ + kind: "response", + statusCode: res.statusCode ?? 0, + body: Buffer.concat(chunks).toString("utf-8"), + }); + }); + }, + ); + req.on("error", (error: NodeJS.ErrnoException) => { + resolve({ kind: "error", code: error.code }); + }); + req.end("{}"); + }); + + if (responseOrError.kind === "response") { + expect(responseOrError.statusCode).toBe(413); + expect(responseOrError.body).toBe("Payload too large"); + } else { + expect(responseOrError.code).toBeOneOf(["ECONNRESET", "EPIPE"]); + } + expect(handlerSpy).not.toHaveBeenCalled(); + } finally { + abort.abort(); + } + }); + + it("de-registers webhook when shutting down", async () => { + deleteWebhookSpy.mockClear(); + const abort = new AbortController(); + await startTelegramWebhook({ + token: "tok", + secret: "secret", + port: 0, + abortSignal: abort.signal, + path: "/hook", + }); + + abort.abort(); + await sleep(25); + + expect(deleteWebhookSpy).toHaveBeenCalledTimes(1); + expect(deleteWebhookSpy).toHaveBeenCalledWith({ drop_pending_updates: false }); + }); }); diff --git a/src/telegram/webhook.ts b/src/telegram/webhook.ts index 9eb3c73d7f4..0fd887f956c 100644 --- a/src/telegram/webhook.ts +++ b/src/telegram/webhook.ts @@ -3,7 +3,7 @@ import { webhookCallback } from "grammy"; import type { OpenClawConfig } from "../config/config.js"; import { isDiagnosticsEnabled } from "../infra/diagnostic-events.js"; import { formatErrorMessage } from "../infra/errors.js"; -import { installRequestBodyLimitGuard } from "../infra/http-body.js"; +import { readJsonBodyWithLimit } from "../infra/http-body.js"; import { logWebhookError, logWebhookProcessed, @@ -21,6 +21,59 @@ const TELEGRAM_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024; const TELEGRAM_WEBHOOK_BODY_TIMEOUT_MS = 30_000; const TELEGRAM_WEBHOOK_CALLBACK_TIMEOUT_MS = 10_000; +async function listenHttpServer(params: { + server: ReturnType; + port: number; + host: string; +}) { + await new Promise((resolve, reject) => { + const onError = (err: Error) => { + params.server.off("error", onError); + reject(err); + }; + params.server.once("error", onError); + params.server.listen(params.port, params.host, () => { + params.server.off("error", onError); + resolve(); + }); + }); +} + +function resolveWebhookPublicUrl(params: { + configuredPublicUrl?: string; + server: ReturnType; + path: string; + host: string; + port: number; +}) { + if (params.configuredPublicUrl) { + return params.configuredPublicUrl; + } + const address = params.server.address(); + if (address && typeof address !== "string") { + const resolvedHost = + params.host === "0.0.0.0" || address.address === "0.0.0.0" || address.address === "::" + ? "localhost" + : address.address; + return `http://${resolvedHost}:${address.port}${params.path}`; + } + const fallbackHost = params.host === "0.0.0.0" ? "localhost" : params.host; + return `http://${fallbackHost}:${params.port}${params.path}`; +} + +async function initializeTelegramWebhookBot(params: { + bot: ReturnType; + runtime: RuntimeEnv; + abortSignal?: AbortSignal; +}) { + const initSignal = params.abortSignal as Parameters<(typeof params.bot)["init"]>[0]; + await withTelegramApiErrorLogging({ + operation: "getMe", + runtime: params.runtime, + fn: () => params.bot.init(initSignal), + }); +} + export async function startTelegramWebhook(opts: { token: string; accountId?: string; @@ -55,7 +108,12 @@ export async function startTelegramWebhook(opts: { config: opts.config, accountId: opts.accountId, }); - const handler = webhookCallback(bot, "http", { + await initializeTelegramWebhookBot({ + bot, + runtime, + abortSignal: opts.abortSignal, + }); + const handler = webhookCallback(bot, "callback", { secretToken: secret, onTimeout: "return", timeoutMilliseconds: TELEGRAM_WEBHOOK_CALLBACK_TIMEOUT_MS, @@ -66,6 +124,14 @@ export async function startTelegramWebhook(opts: { } const server = createServer((req, res) => { + const respondText = (statusCode: number, text = "") => { + if (res.headersSent || res.writableEnded) { + return; + } + res.writeHead(statusCode, { "Content-Type": "text/plain; charset=utf-8" }); + res.end(text); + }; + if (req.url === healthPath) { res.writeHead(200); res.end("ok"); @@ -80,69 +146,125 @@ export async function startTelegramWebhook(opts: { if (diagnosticsEnabled) { logWebhookReceived({ channel: "telegram", updateType: "telegram-post" }); } - const guard = installRequestBodyLimitGuard(req, res, { - maxBytes: TELEGRAM_WEBHOOK_MAX_BODY_BYTES, - timeoutMs: TELEGRAM_WEBHOOK_BODY_TIMEOUT_MS, - responseFormat: "text", - }); - if (guard.isTripped()) { - return; - } - const handled = handler(req, res); - if (handled && typeof handled.catch === "function") { - void handled - .then(() => { - if (diagnosticsEnabled) { - logWebhookProcessed({ - channel: "telegram", - updateType: "telegram-post", - durationMs: Date.now() - startTime, - }); - } - }) - .catch((err) => { - if (guard.isTripped()) { - return; - } - const errMsg = formatErrorMessage(err); - if (diagnosticsEnabled) { - logWebhookError({ - channel: "telegram", - updateType: "telegram-post", - error: errMsg, - }); - } - runtime.log?.(`webhook handler failed: ${errMsg}`); - if (!res.headersSent) { - res.writeHead(500); - } - res.end(); - }) - .finally(() => { - guard.dispose(); + void (async () => { + const body = await readJsonBodyWithLimit(req, { + maxBytes: TELEGRAM_WEBHOOK_MAX_BODY_BYTES, + timeoutMs: TELEGRAM_WEBHOOK_BODY_TIMEOUT_MS, + emptyObjectOnEmpty: false, + }); + if (!body.ok) { + if (body.code === "PAYLOAD_TOO_LARGE") { + respondText(413, body.error); + return; + } + if (body.code === "REQUEST_BODY_TIMEOUT") { + respondText(408, body.error); + return; + } + if (body.code === "CONNECTION_CLOSED") { + respondText(400, body.error); + return; + } + respondText(400, body.error); + return; + } + + let replied = false; + const reply = async (json: string) => { + if (replied) { + return; + } + replied = true; + if (res.headersSent || res.writableEnded) { + return; + } + res.writeHead(200, { "Content-Type": "application/json; charset=utf-8" }); + res.end(json); + }; + const unauthorized = async () => { + if (replied) { + return; + } + replied = true; + respondText(401, "unauthorized"); + }; + const secretHeaderRaw = req.headers["x-telegram-bot-api-secret-token"]; + const secretHeader = Array.isArray(secretHeaderRaw) ? secretHeaderRaw[0] : secretHeaderRaw; + + await handler(body.value, reply, secretHeader, unauthorized); + if (!replied) { + respondText(200); + } + + if (diagnosticsEnabled) { + logWebhookProcessed({ + channel: "telegram", + updateType: "telegram-post", + durationMs: Date.now() - startTime, }); - return; + } + })().catch((err) => { + const errMsg = formatErrorMessage(err); + if (diagnosticsEnabled) { + logWebhookError({ + channel: "telegram", + updateType: "telegram-post", + error: errMsg, + }); + } + runtime.log?.(`webhook handler failed: ${errMsg}`); + respondText(500); + }); + }); + + await listenHttpServer({ + server, + port, + host, + }); + + const publicUrl = resolveWebhookPublicUrl({ + configuredPublicUrl: opts.publicUrl, + server, + path, + host, + port, + }); + + try { + await withTelegramApiErrorLogging({ + operation: "setWebhook", + runtime, + fn: () => + bot.api.setWebhook(publicUrl, { + secret_token: secret, + allowed_updates: resolveTelegramAllowedUpdates(), + }), + }); + } catch (err) { + server.close(); + void bot.stop(); + if (diagnosticsEnabled) { + stopDiagnosticHeartbeat(); } - guard.dispose(); - }); + throw err; + } - const publicUrl = - opts.publicUrl ?? `http://${host === "0.0.0.0" ? "localhost" : host}:${port}${path}`; - - await withTelegramApiErrorLogging({ - operation: "setWebhook", - runtime, - fn: () => - bot.api.setWebhook(publicUrl, { - secret_token: secret, - allowed_updates: resolveTelegramAllowedUpdates(), - }), - }); - - await new Promise((resolve) => server.listen(port, host, resolve)); runtime.log?.(`webhook listening on ${publicUrl}`); + let shutDown = false; const shutdown = () => { + if (shutDown) { + return; + } + shutDown = true; + void withTelegramApiErrorLogging({ + operation: "deleteWebhook", + runtime, + fn: () => bot.api.deleteWebhook({ drop_pending_updates: false }), + }).catch(() => { + // withTelegramApiErrorLogging has already emitted the failure. + }); server.close(); void bot.stop(); if (diagnosticsEnabled) {