mirror of
https://github.com/moltbot/moltbot.git
synced 2026-03-08 06:54:24 +00:00
fix(telegram): webhook hang - tests and fix (openclaw#26933) thanks @huntharo
Verified: - pnpm build - pnpm check - pnpm test:macmini Co-authored-by: huntharo <5617868+huntharo@users.noreply.github.com> Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
@@ -16,6 +16,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Telegram/Webhook: pre-initialize webhook bots, switch webhook processing to callback-mode JSON handling, and preserve full near-limit payload reads under delayed handlers to prevent webhook request hangs and dropped updates. (#26156)
|
||||
- Agents/Subagents delivery: refactor subagent completion announce dispatch into an explicit queue/direct/fallback state machine, recover outbound channel-plugin resolution in cold/stale plugin-registry states across announce/message/gateway send paths, finalize cleanup bookkeeping when announce flow rejects, and treat Telegram sends without `message_id` as delivery failures (instead of false-success `"unknown"` IDs). (#26867, #25961, #26803, #25069, #26741) Thanks @SmithLabsLLC and @docaohieu2808.
|
||||
- Security/SSRF guard: classify IPv6 multicast literals (`ff00::/8`) as blocked/private-internal targets in shared SSRF IP checks, preventing multicast literals from bypassing URL-host preflight and DNS answer validation. This ships in the next npm release (`2026.2.25`). Thanks @zpbrent for reporting.
|
||||
- Slack/Session threads: prevent oversized parent-session inheritance from silently bricking new thread sessions, surface embedded context-overflow empty-result failures to users, and add configurable `session.parentForkMaxTokens` (default `100000`, `0` disables). (#26912) Thanks @markshields-tl.
|
||||
|
||||
@@ -1,24 +1,26 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import { once } from "node:events";
|
||||
import { request } from "node:http";
|
||||
import { setTimeout as sleep } from "node:timers/promises";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { startTelegramWebhook } from "./webhook.js";
|
||||
|
||||
const handlerSpy = vi.hoisted(() =>
|
||||
vi.fn(
|
||||
(_req: unknown, res: { writeHead: (status: number) => void; end: (body?: string) => void }) => {
|
||||
res.writeHead(200);
|
||||
res.end("ok");
|
||||
},
|
||||
),
|
||||
);
|
||||
const handlerSpy = vi.hoisted(() => vi.fn((..._args: unknown[]): unknown => undefined));
|
||||
const setWebhookSpy = vi.hoisted(() => vi.fn());
|
||||
const deleteWebhookSpy = vi.hoisted(() => vi.fn(async () => true));
|
||||
const initSpy = vi.hoisted(() => vi.fn(async () => undefined));
|
||||
const stopSpy = vi.hoisted(() => vi.fn());
|
||||
const webhookCallbackSpy = vi.hoisted(() => vi.fn(() => handlerSpy));
|
||||
const createTelegramBotSpy = vi.hoisted(() =>
|
||||
vi.fn(() => ({
|
||||
api: { setWebhook: setWebhookSpy },
|
||||
init: initSpy,
|
||||
api: { setWebhook: setWebhookSpy, deleteWebhook: deleteWebhookSpy },
|
||||
stop: stopSpy,
|
||||
})),
|
||||
);
|
||||
|
||||
const WEBHOOK_POST_TIMEOUT_MS = process.platform === "win32" ? 20_000 : 8_000;
|
||||
|
||||
vi.mock("grammy", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("grammy")>();
|
||||
return {
|
||||
@@ -31,8 +33,178 @@ vi.mock("./bot.js", () => ({
|
||||
createTelegramBot: createTelegramBotSpy,
|
||||
}));
|
||||
|
||||
async function fetchWithTimeout(
|
||||
input: string,
|
||||
init: Omit<RequestInit, "signal">,
|
||||
timeoutMs: number,
|
||||
): Promise<Response> {
|
||||
const abort = new AbortController();
|
||||
const timer = setTimeout(() => {
|
||||
abort.abort();
|
||||
}, timeoutMs);
|
||||
try {
|
||||
return await fetch(input, { ...init, signal: abort.signal });
|
||||
} finally {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
}
|
||||
|
||||
async function postWebhookJson(params: {
|
||||
url: string;
|
||||
payload: string;
|
||||
secret?: string;
|
||||
timeoutMs?: number;
|
||||
}): Promise<Response> {
|
||||
return await fetchWithTimeout(
|
||||
params.url,
|
||||
{
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
...(params.secret ? { "x-telegram-bot-api-secret-token": params.secret } : {}),
|
||||
},
|
||||
body: params.payload,
|
||||
},
|
||||
params.timeoutMs ?? 5_000,
|
||||
);
|
||||
}
|
||||
|
||||
function createDeterministicRng(seed: number): () => number {
|
||||
let state = seed >>> 0;
|
||||
return () => {
|
||||
state = (state * 1_664_525 + 1_013_904_223) >>> 0;
|
||||
return state / 4_294_967_296;
|
||||
};
|
||||
}
|
||||
|
||||
async function postWebhookPayloadWithChunkPlan(params: {
|
||||
port: number;
|
||||
path: string;
|
||||
payload: string;
|
||||
secret: string;
|
||||
mode: "single" | "random-chunked";
|
||||
timeoutMs?: number;
|
||||
}): Promise<{ statusCode: number; body: string }> {
|
||||
const payloadBuffer = Buffer.from(params.payload, "utf-8");
|
||||
return await new Promise((resolve, reject) => {
|
||||
let bytesQueued = 0;
|
||||
let chunksQueued = 0;
|
||||
let phase: "writing" | "awaiting-response" = "writing";
|
||||
let settled = false;
|
||||
const finishResolve = (value: { statusCode: number; body: string }) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
clearTimeout(timeout);
|
||||
resolve(value);
|
||||
};
|
||||
const finishReject = (error: unknown) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
clearTimeout(timeout);
|
||||
reject(error);
|
||||
};
|
||||
|
||||
const req = request(
|
||||
{
|
||||
hostname: "127.0.0.1",
|
||||
port: params.port,
|
||||
path: params.path,
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
"content-length": String(payloadBuffer.length),
|
||||
"x-telegram-bot-api-secret-token": params.secret,
|
||||
},
|
||||
},
|
||||
(res) => {
|
||||
const chunks: Buffer[] = [];
|
||||
res.on("data", (chunk: Buffer | string) => {
|
||||
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
|
||||
});
|
||||
res.on("end", () => {
|
||||
finishResolve({
|
||||
statusCode: res.statusCode ?? 0,
|
||||
body: Buffer.concat(chunks).toString("utf-8"),
|
||||
});
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
finishReject(
|
||||
new Error(
|
||||
`webhook post timed out after ${params.timeoutMs ?? 15_000}ms (phase=${phase}, bytesQueued=${bytesQueued}, chunksQueued=${chunksQueued}, totalBytes=${payloadBuffer.length})`,
|
||||
),
|
||||
);
|
||||
req.destroy();
|
||||
}, params.timeoutMs ?? 15_000);
|
||||
|
||||
req.on("error", (error) => {
|
||||
finishReject(error);
|
||||
});
|
||||
|
||||
const writeAll = async () => {
|
||||
if (params.mode === "single") {
|
||||
req.end(payloadBuffer);
|
||||
return;
|
||||
}
|
||||
|
||||
const rng = createDeterministicRng(26156);
|
||||
let offset = 0;
|
||||
while (offset < payloadBuffer.length) {
|
||||
const remaining = payloadBuffer.length - offset;
|
||||
const nextSize = Math.max(1, Math.min(remaining, 1 + Math.floor(rng() * 8_192)));
|
||||
const chunk = payloadBuffer.subarray(offset, offset + nextSize);
|
||||
const canContinue = req.write(chunk);
|
||||
offset += nextSize;
|
||||
bytesQueued = offset;
|
||||
chunksQueued += 1;
|
||||
if (chunksQueued % 10 === 0) {
|
||||
await sleep(1 + Math.floor(rng() * 3));
|
||||
}
|
||||
if (!canContinue) {
|
||||
// Windows CI occasionally stalls on waiting for drain indefinitely.
|
||||
// Bound the wait, then continue queuing this small (~1MB) payload.
|
||||
await Promise.race([once(req, "drain"), sleep(25)]);
|
||||
}
|
||||
}
|
||||
phase = "awaiting-response";
|
||||
req.end();
|
||||
};
|
||||
|
||||
void writeAll().catch((error) => {
|
||||
finishReject(error);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function createNearLimitTelegramPayload(): { payload: string; sizeBytes: number } {
|
||||
const maxBytes = 1_024 * 1_024;
|
||||
const targetBytes = maxBytes - 4_096;
|
||||
const shell = { update_id: 77_777, message: { text: "" } };
|
||||
const shellSize = Buffer.byteLength(JSON.stringify(shell), "utf-8");
|
||||
const textLength = Math.max(1, targetBytes - shellSize);
|
||||
const pattern = "the quick brown fox jumps over the lazy dog ";
|
||||
const repeats = Math.ceil(textLength / pattern.length);
|
||||
const text = pattern.repeat(repeats).slice(0, textLength);
|
||||
const payload = JSON.stringify({
|
||||
update_id: 77_777,
|
||||
message: { text },
|
||||
});
|
||||
return { payload, sizeBytes: Buffer.byteLength(payload, "utf-8") };
|
||||
}
|
||||
|
||||
function sha256(text: string): string {
|
||||
return createHash("sha256").update(text).digest("hex");
|
||||
}
|
||||
|
||||
describe("startTelegramWebhook", () => {
|
||||
it("starts server, registers webhook, and serves health", async () => {
|
||||
initSpy.mockClear();
|
||||
createTelegramBotSpy.mockClear();
|
||||
webhookCallbackSpy.mockClear();
|
||||
const abort = new AbortController();
|
||||
@@ -59,6 +231,7 @@ describe("startTelegramWebhook", () => {
|
||||
|
||||
const health = await fetch(`${url}/healthz`);
|
||||
expect(health.status).toBe(200);
|
||||
expect(initSpy).toHaveBeenCalledTimes(1);
|
||||
expect(setWebhookSpy).toHaveBeenCalled();
|
||||
expect(webhookCallbackSpy).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
@@ -66,7 +239,7 @@ describe("startTelegramWebhook", () => {
|
||||
setWebhook: expect.any(Function),
|
||||
}),
|
||||
}),
|
||||
"http",
|
||||
"callback",
|
||||
{
|
||||
secretToken: "secret",
|
||||
onTimeout: "return",
|
||||
@@ -101,7 +274,13 @@ describe("startTelegramWebhook", () => {
|
||||
if (!addr || typeof addr === "string") {
|
||||
throw new Error("no addr");
|
||||
}
|
||||
await fetch(`http://127.0.0.1:${addr.port}/hook`, { method: "POST" });
|
||||
const payload = JSON.stringify({ update_id: 1, message: { text: "hello" } });
|
||||
const response = await postWebhookJson({
|
||||
url: `http://127.0.0.1:${addr.port}/hook`,
|
||||
payload,
|
||||
secret: "secret",
|
||||
});
|
||||
expect(response.status).toBe(200);
|
||||
expect(handlerSpy).toHaveBeenCalled();
|
||||
abort.abort();
|
||||
});
|
||||
@@ -113,4 +292,371 @@ describe("startTelegramWebhook", () => {
|
||||
}),
|
||||
).rejects.toThrow(/requires a non-empty secret token/i);
|
||||
});
|
||||
|
||||
it("registers webhook using the bound listening port when port is 0", async () => {
|
||||
setWebhookSpy.mockClear();
|
||||
const abort = new AbortController();
|
||||
const { server } = await startTelegramWebhook({
|
||||
token: "tok",
|
||||
secret: "secret",
|
||||
port: 0,
|
||||
abortSignal: abort.signal,
|
||||
path: "/hook",
|
||||
});
|
||||
try {
|
||||
const addr = server.address();
|
||||
if (!addr || typeof addr === "string") {
|
||||
throw new Error("no addr");
|
||||
}
|
||||
expect(addr.port).toBeGreaterThan(0);
|
||||
expect(setWebhookSpy).toHaveBeenCalledTimes(1);
|
||||
expect(setWebhookSpy).toHaveBeenCalledWith(
|
||||
`http://127.0.0.1:${addr.port}/hook`,
|
||||
expect.objectContaining({
|
||||
secret_token: "secret",
|
||||
}),
|
||||
);
|
||||
} finally {
|
||||
abort.abort();
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps webhook payload readable when callback delays body read", async () => {
|
||||
handlerSpy.mockImplementationOnce(async (...args: unknown[]) => {
|
||||
const [update, reply] = args as [unknown, (json: string) => Promise<void>];
|
||||
await sleep(50);
|
||||
await reply(JSON.stringify(update));
|
||||
});
|
||||
|
||||
const abort = new AbortController();
|
||||
const { server } = await startTelegramWebhook({
|
||||
token: "tok",
|
||||
secret: "secret",
|
||||
port: 0,
|
||||
abortSignal: abort.signal,
|
||||
path: "/hook",
|
||||
});
|
||||
try {
|
||||
const addr = server.address();
|
||||
if (!addr || typeof addr === "string") {
|
||||
throw new Error("no addr");
|
||||
}
|
||||
|
||||
const payload = JSON.stringify({ update_id: 1, message: { text: "hello" } });
|
||||
const res = await postWebhookJson({
|
||||
url: `http://127.0.0.1:${addr.port}/hook`,
|
||||
payload,
|
||||
secret: "secret",
|
||||
});
|
||||
expect(res.status).toBe(200);
|
||||
const responseBody = await res.text();
|
||||
expect(JSON.parse(responseBody)).toEqual(JSON.parse(payload));
|
||||
} finally {
|
||||
abort.abort();
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps webhook payload readable across multiple delayed reads", async () => {
|
||||
const seenPayloads: string[] = [];
|
||||
const delayedHandler = async (...args: unknown[]) => {
|
||||
const [update, reply] = args as [unknown, (json: string) => Promise<void>];
|
||||
await sleep(50);
|
||||
seenPayloads.push(JSON.stringify(update));
|
||||
await reply("ok");
|
||||
};
|
||||
handlerSpy.mockImplementationOnce(delayedHandler).mockImplementationOnce(delayedHandler);
|
||||
|
||||
const abort = new AbortController();
|
||||
const { server } = await startTelegramWebhook({
|
||||
token: "tok",
|
||||
secret: "secret",
|
||||
port: 0,
|
||||
abortSignal: abort.signal,
|
||||
path: "/hook",
|
||||
});
|
||||
try {
|
||||
const addr = server.address();
|
||||
if (!addr || typeof addr === "string") {
|
||||
throw new Error("no addr");
|
||||
}
|
||||
|
||||
const payloads = [
|
||||
JSON.stringify({ update_id: 1, message: { text: "first" } }),
|
||||
JSON.stringify({ update_id: 2, message: { text: "second" } }),
|
||||
];
|
||||
|
||||
for (const payload of payloads) {
|
||||
const res = await postWebhookJson({
|
||||
url: `http://127.0.0.1:${addr.port}/hook`,
|
||||
payload,
|
||||
secret: "secret",
|
||||
});
|
||||
expect(res.status).toBe(200);
|
||||
}
|
||||
|
||||
expect(seenPayloads.map((x) => JSON.parse(x))).toEqual(payloads.map((x) => JSON.parse(x)));
|
||||
} finally {
|
||||
abort.abort();
|
||||
}
|
||||
});
|
||||
|
||||
it("processes a second request after first-request delayed-init data loss", async () => {
|
||||
const seenUpdates: unknown[] = [];
|
||||
webhookCallbackSpy.mockImplementationOnce(
|
||||
() =>
|
||||
vi.fn(
|
||||
(
|
||||
update: unknown,
|
||||
reply: (json: string) => Promise<void>,
|
||||
_secretHeader: string | undefined,
|
||||
_unauthorized: () => Promise<void>,
|
||||
) => {
|
||||
seenUpdates.push(update);
|
||||
void (async () => {
|
||||
await sleep(50);
|
||||
await reply("ok");
|
||||
})();
|
||||
},
|
||||
) as unknown as typeof handlerSpy,
|
||||
);
|
||||
|
||||
const secret = "secret";
|
||||
const abort = new AbortController();
|
||||
const { server } = await startTelegramWebhook({
|
||||
token: "tok",
|
||||
secret,
|
||||
port: 0,
|
||||
abortSignal: abort.signal,
|
||||
path: "/hook",
|
||||
});
|
||||
|
||||
try {
|
||||
const address = server.address();
|
||||
if (!address || typeof address === "string") {
|
||||
throw new Error("no addr");
|
||||
}
|
||||
|
||||
const firstPayload = JSON.stringify({ update_id: 100, message: { text: "first" } });
|
||||
const secondPayload = JSON.stringify({ update_id: 101, message: { text: "second" } });
|
||||
const firstResponse = await postWebhookPayloadWithChunkPlan({
|
||||
port: address.port,
|
||||
path: "/hook",
|
||||
payload: firstPayload,
|
||||
secret,
|
||||
mode: "single",
|
||||
timeoutMs: WEBHOOK_POST_TIMEOUT_MS,
|
||||
});
|
||||
const secondResponse = await postWebhookPayloadWithChunkPlan({
|
||||
port: address.port,
|
||||
path: "/hook",
|
||||
payload: secondPayload,
|
||||
secret,
|
||||
mode: "single",
|
||||
timeoutMs: WEBHOOK_POST_TIMEOUT_MS,
|
||||
});
|
||||
|
||||
expect(firstResponse.statusCode).toBe(200);
|
||||
expect(secondResponse.statusCode).toBe(200);
|
||||
expect(seenUpdates).toEqual([JSON.parse(firstPayload), JSON.parse(secondPayload)]);
|
||||
} finally {
|
||||
abort.abort();
|
||||
}
|
||||
});
|
||||
|
||||
it("handles near-limit payload with random chunk writes and event-loop yields", async () => {
|
||||
const seenUpdates: Array<{ update_id: number; message: { text: string } }> = [];
|
||||
webhookCallbackSpy.mockImplementationOnce(
|
||||
() =>
|
||||
vi.fn(
|
||||
(
|
||||
update: unknown,
|
||||
reply: (json: string) => Promise<void>,
|
||||
_secretHeader: string | undefined,
|
||||
_unauthorized: () => Promise<void>,
|
||||
) => {
|
||||
seenUpdates.push(update as { update_id: number; message: { text: string } });
|
||||
void reply("ok");
|
||||
},
|
||||
) as unknown as typeof handlerSpy,
|
||||
);
|
||||
|
||||
const { payload, sizeBytes } = createNearLimitTelegramPayload();
|
||||
expect(sizeBytes).toBeLessThan(1_024 * 1_024);
|
||||
expect(sizeBytes).toBeGreaterThan(256 * 1_024);
|
||||
const expected = JSON.parse(payload) as { update_id: number; message: { text: string } };
|
||||
|
||||
const secret = "secret";
|
||||
const abort = new AbortController();
|
||||
const { server } = await startTelegramWebhook({
|
||||
token: "tok",
|
||||
secret,
|
||||
port: 0,
|
||||
abortSignal: abort.signal,
|
||||
path: "/hook",
|
||||
});
|
||||
|
||||
try {
|
||||
const address = server.address();
|
||||
if (!address || typeof address === "string") {
|
||||
throw new Error("no addr");
|
||||
}
|
||||
|
||||
const response = await postWebhookPayloadWithChunkPlan({
|
||||
port: address.port,
|
||||
path: "/hook",
|
||||
payload,
|
||||
secret,
|
||||
mode: "random-chunked",
|
||||
timeoutMs: WEBHOOK_POST_TIMEOUT_MS,
|
||||
});
|
||||
|
||||
expect(response.statusCode).toBe(200);
|
||||
expect(seenUpdates).toHaveLength(1);
|
||||
expect(seenUpdates[0]?.update_id).toBe(expected.update_id);
|
||||
expect(seenUpdates[0]?.message.text.length).toBe(expected.message.text.length);
|
||||
expect(sha256(seenUpdates[0]?.message.text ?? "")).toBe(sha256(expected.message.text));
|
||||
} finally {
|
||||
abort.abort();
|
||||
}
|
||||
});
|
||||
|
||||
it("handles near-limit payload written in a single request write", async () => {
|
||||
const seenUpdates: Array<{ update_id: number; message: { text: string } }> = [];
|
||||
webhookCallbackSpy.mockImplementationOnce(
|
||||
() =>
|
||||
vi.fn(
|
||||
(
|
||||
update: unknown,
|
||||
reply: (json: string) => Promise<void>,
|
||||
_secretHeader: string | undefined,
|
||||
_unauthorized: () => Promise<void>,
|
||||
) => {
|
||||
seenUpdates.push(update as { update_id: number; message: { text: string } });
|
||||
void reply("ok");
|
||||
},
|
||||
) as unknown as typeof handlerSpy,
|
||||
);
|
||||
|
||||
const { payload, sizeBytes } = createNearLimitTelegramPayload();
|
||||
expect(sizeBytes).toBeLessThan(1_024 * 1_024);
|
||||
expect(sizeBytes).toBeGreaterThan(256 * 1_024);
|
||||
const expected = JSON.parse(payload) as { update_id: number; message: { text: string } };
|
||||
|
||||
const secret = "secret";
|
||||
const abort = new AbortController();
|
||||
const { server } = await startTelegramWebhook({
|
||||
token: "tok",
|
||||
secret,
|
||||
port: 0,
|
||||
abortSignal: abort.signal,
|
||||
path: "/hook",
|
||||
});
|
||||
|
||||
try {
|
||||
const address = server.address();
|
||||
if (!address || typeof address === "string") {
|
||||
throw new Error("no addr");
|
||||
}
|
||||
|
||||
const response = await postWebhookPayloadWithChunkPlan({
|
||||
port: address.port,
|
||||
path: "/hook",
|
||||
payload,
|
||||
secret,
|
||||
mode: "single",
|
||||
timeoutMs: WEBHOOK_POST_TIMEOUT_MS,
|
||||
});
|
||||
|
||||
expect(response.statusCode).toBe(200);
|
||||
expect(seenUpdates).toHaveLength(1);
|
||||
expect(seenUpdates[0]?.update_id).toBe(expected.update_id);
|
||||
expect(seenUpdates[0]?.message.text.length).toBe(expected.message.text.length);
|
||||
expect(sha256(seenUpdates[0]?.message.text ?? "")).toBe(sha256(expected.message.text));
|
||||
} finally {
|
||||
abort.abort();
|
||||
}
|
||||
});
|
||||
|
||||
it("rejects payloads larger than 1MB before invoking webhook handler", async () => {
|
||||
handlerSpy.mockClear();
|
||||
const abort = new AbortController();
|
||||
const { server } = await startTelegramWebhook({
|
||||
token: "tok",
|
||||
secret: "secret",
|
||||
port: 0,
|
||||
abortSignal: abort.signal,
|
||||
path: "/hook",
|
||||
});
|
||||
|
||||
try {
|
||||
const address = server.address();
|
||||
if (!address || typeof address === "string") {
|
||||
throw new Error("no addr");
|
||||
}
|
||||
|
||||
const responseOrError = await new Promise<
|
||||
| { kind: "response"; statusCode: number; body: string }
|
||||
| { kind: "error"; code: string | undefined }
|
||||
>((resolve) => {
|
||||
const req = request(
|
||||
{
|
||||
hostname: "127.0.0.1",
|
||||
port: address.port,
|
||||
path: "/hook",
|
||||
method: "POST",
|
||||
headers: {
|
||||
"content-type": "application/json",
|
||||
"content-length": String(1_024 * 1_024 + 2_048),
|
||||
"x-telegram-bot-api-secret-token": "secret",
|
||||
},
|
||||
},
|
||||
(res) => {
|
||||
const chunks: Buffer[] = [];
|
||||
res.on("data", (chunk: Buffer | string) => {
|
||||
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
|
||||
});
|
||||
res.on("end", () => {
|
||||
resolve({
|
||||
kind: "response",
|
||||
statusCode: res.statusCode ?? 0,
|
||||
body: Buffer.concat(chunks).toString("utf-8"),
|
||||
});
|
||||
});
|
||||
},
|
||||
);
|
||||
req.on("error", (error: NodeJS.ErrnoException) => {
|
||||
resolve({ kind: "error", code: error.code });
|
||||
});
|
||||
req.end("{}");
|
||||
});
|
||||
|
||||
if (responseOrError.kind === "response") {
|
||||
expect(responseOrError.statusCode).toBe(413);
|
||||
expect(responseOrError.body).toBe("Payload too large");
|
||||
} else {
|
||||
expect(responseOrError.code).toBeOneOf(["ECONNRESET", "EPIPE"]);
|
||||
}
|
||||
expect(handlerSpy).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
abort.abort();
|
||||
}
|
||||
});
|
||||
|
||||
it("de-registers webhook when shutting down", async () => {
|
||||
deleteWebhookSpy.mockClear();
|
||||
const abort = new AbortController();
|
||||
await startTelegramWebhook({
|
||||
token: "tok",
|
||||
secret: "secret",
|
||||
port: 0,
|
||||
abortSignal: abort.signal,
|
||||
path: "/hook",
|
||||
});
|
||||
|
||||
abort.abort();
|
||||
await sleep(25);
|
||||
|
||||
expect(deleteWebhookSpy).toHaveBeenCalledTimes(1);
|
||||
expect(deleteWebhookSpy).toHaveBeenCalledWith({ drop_pending_updates: false });
|
||||
});
|
||||
});
|
||||
|
||||
@@ -3,7 +3,7 @@ import { webhookCallback } from "grammy";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { isDiagnosticsEnabled } from "../infra/diagnostic-events.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
import { installRequestBodyLimitGuard } from "../infra/http-body.js";
|
||||
import { readJsonBodyWithLimit } from "../infra/http-body.js";
|
||||
import {
|
||||
logWebhookError,
|
||||
logWebhookProcessed,
|
||||
@@ -21,6 +21,59 @@ const TELEGRAM_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024;
|
||||
const TELEGRAM_WEBHOOK_BODY_TIMEOUT_MS = 30_000;
|
||||
const TELEGRAM_WEBHOOK_CALLBACK_TIMEOUT_MS = 10_000;
|
||||
|
||||
async function listenHttpServer(params: {
|
||||
server: ReturnType<typeof createServer>;
|
||||
port: number;
|
||||
host: string;
|
||||
}) {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const onError = (err: Error) => {
|
||||
params.server.off("error", onError);
|
||||
reject(err);
|
||||
};
|
||||
params.server.once("error", onError);
|
||||
params.server.listen(params.port, params.host, () => {
|
||||
params.server.off("error", onError);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function resolveWebhookPublicUrl(params: {
|
||||
configuredPublicUrl?: string;
|
||||
server: ReturnType<typeof createServer>;
|
||||
path: string;
|
||||
host: string;
|
||||
port: number;
|
||||
}) {
|
||||
if (params.configuredPublicUrl) {
|
||||
return params.configuredPublicUrl;
|
||||
}
|
||||
const address = params.server.address();
|
||||
if (address && typeof address !== "string") {
|
||||
const resolvedHost =
|
||||
params.host === "0.0.0.0" || address.address === "0.0.0.0" || address.address === "::"
|
||||
? "localhost"
|
||||
: address.address;
|
||||
return `http://${resolvedHost}:${address.port}${params.path}`;
|
||||
}
|
||||
const fallbackHost = params.host === "0.0.0.0" ? "localhost" : params.host;
|
||||
return `http://${fallbackHost}:${params.port}${params.path}`;
|
||||
}
|
||||
|
||||
async function initializeTelegramWebhookBot(params: {
|
||||
bot: ReturnType<typeof createTelegramBot>;
|
||||
runtime: RuntimeEnv;
|
||||
abortSignal?: AbortSignal;
|
||||
}) {
|
||||
const initSignal = params.abortSignal as Parameters<(typeof params.bot)["init"]>[0];
|
||||
await withTelegramApiErrorLogging({
|
||||
operation: "getMe",
|
||||
runtime: params.runtime,
|
||||
fn: () => params.bot.init(initSignal),
|
||||
});
|
||||
}
|
||||
|
||||
export async function startTelegramWebhook(opts: {
|
||||
token: string;
|
||||
accountId?: string;
|
||||
@@ -55,7 +108,12 @@ export async function startTelegramWebhook(opts: {
|
||||
config: opts.config,
|
||||
accountId: opts.accountId,
|
||||
});
|
||||
const handler = webhookCallback(bot, "http", {
|
||||
await initializeTelegramWebhookBot({
|
||||
bot,
|
||||
runtime,
|
||||
abortSignal: opts.abortSignal,
|
||||
});
|
||||
const handler = webhookCallback(bot, "callback", {
|
||||
secretToken: secret,
|
||||
onTimeout: "return",
|
||||
timeoutMilliseconds: TELEGRAM_WEBHOOK_CALLBACK_TIMEOUT_MS,
|
||||
@@ -66,6 +124,14 @@ export async function startTelegramWebhook(opts: {
|
||||
}
|
||||
|
||||
const server = createServer((req, res) => {
|
||||
const respondText = (statusCode: number, text = "") => {
|
||||
if (res.headersSent || res.writableEnded) {
|
||||
return;
|
||||
}
|
||||
res.writeHead(statusCode, { "Content-Type": "text/plain; charset=utf-8" });
|
||||
res.end(text);
|
||||
};
|
||||
|
||||
if (req.url === healthPath) {
|
||||
res.writeHead(200);
|
||||
res.end("ok");
|
||||
@@ -80,69 +146,125 @@ export async function startTelegramWebhook(opts: {
|
||||
if (diagnosticsEnabled) {
|
||||
logWebhookReceived({ channel: "telegram", updateType: "telegram-post" });
|
||||
}
|
||||
const guard = installRequestBodyLimitGuard(req, res, {
|
||||
maxBytes: TELEGRAM_WEBHOOK_MAX_BODY_BYTES,
|
||||
timeoutMs: TELEGRAM_WEBHOOK_BODY_TIMEOUT_MS,
|
||||
responseFormat: "text",
|
||||
});
|
||||
if (guard.isTripped()) {
|
||||
return;
|
||||
}
|
||||
const handled = handler(req, res);
|
||||
if (handled && typeof handled.catch === "function") {
|
||||
void handled
|
||||
.then(() => {
|
||||
if (diagnosticsEnabled) {
|
||||
logWebhookProcessed({
|
||||
channel: "telegram",
|
||||
updateType: "telegram-post",
|
||||
durationMs: Date.now() - startTime,
|
||||
});
|
||||
}
|
||||
})
|
||||
.catch((err) => {
|
||||
if (guard.isTripped()) {
|
||||
return;
|
||||
}
|
||||
const errMsg = formatErrorMessage(err);
|
||||
if (diagnosticsEnabled) {
|
||||
logWebhookError({
|
||||
channel: "telegram",
|
||||
updateType: "telegram-post",
|
||||
error: errMsg,
|
||||
});
|
||||
}
|
||||
runtime.log?.(`webhook handler failed: ${errMsg}`);
|
||||
if (!res.headersSent) {
|
||||
res.writeHead(500);
|
||||
}
|
||||
res.end();
|
||||
})
|
||||
.finally(() => {
|
||||
guard.dispose();
|
||||
void (async () => {
|
||||
const body = await readJsonBodyWithLimit(req, {
|
||||
maxBytes: TELEGRAM_WEBHOOK_MAX_BODY_BYTES,
|
||||
timeoutMs: TELEGRAM_WEBHOOK_BODY_TIMEOUT_MS,
|
||||
emptyObjectOnEmpty: false,
|
||||
});
|
||||
if (!body.ok) {
|
||||
if (body.code === "PAYLOAD_TOO_LARGE") {
|
||||
respondText(413, body.error);
|
||||
return;
|
||||
}
|
||||
if (body.code === "REQUEST_BODY_TIMEOUT") {
|
||||
respondText(408, body.error);
|
||||
return;
|
||||
}
|
||||
if (body.code === "CONNECTION_CLOSED") {
|
||||
respondText(400, body.error);
|
||||
return;
|
||||
}
|
||||
respondText(400, body.error);
|
||||
return;
|
||||
}
|
||||
|
||||
let replied = false;
|
||||
const reply = async (json: string) => {
|
||||
if (replied) {
|
||||
return;
|
||||
}
|
||||
replied = true;
|
||||
if (res.headersSent || res.writableEnded) {
|
||||
return;
|
||||
}
|
||||
res.writeHead(200, { "Content-Type": "application/json; charset=utf-8" });
|
||||
res.end(json);
|
||||
};
|
||||
const unauthorized = async () => {
|
||||
if (replied) {
|
||||
return;
|
||||
}
|
||||
replied = true;
|
||||
respondText(401, "unauthorized");
|
||||
};
|
||||
const secretHeaderRaw = req.headers["x-telegram-bot-api-secret-token"];
|
||||
const secretHeader = Array.isArray(secretHeaderRaw) ? secretHeaderRaw[0] : secretHeaderRaw;
|
||||
|
||||
await handler(body.value, reply, secretHeader, unauthorized);
|
||||
if (!replied) {
|
||||
respondText(200);
|
||||
}
|
||||
|
||||
if (diagnosticsEnabled) {
|
||||
logWebhookProcessed({
|
||||
channel: "telegram",
|
||||
updateType: "telegram-post",
|
||||
durationMs: Date.now() - startTime,
|
||||
});
|
||||
return;
|
||||
}
|
||||
})().catch((err) => {
|
||||
const errMsg = formatErrorMessage(err);
|
||||
if (diagnosticsEnabled) {
|
||||
logWebhookError({
|
||||
channel: "telegram",
|
||||
updateType: "telegram-post",
|
||||
error: errMsg,
|
||||
});
|
||||
}
|
||||
runtime.log?.(`webhook handler failed: ${errMsg}`);
|
||||
respondText(500);
|
||||
});
|
||||
});
|
||||
|
||||
await listenHttpServer({
|
||||
server,
|
||||
port,
|
||||
host,
|
||||
});
|
||||
|
||||
const publicUrl = resolveWebhookPublicUrl({
|
||||
configuredPublicUrl: opts.publicUrl,
|
||||
server,
|
||||
path,
|
||||
host,
|
||||
port,
|
||||
});
|
||||
|
||||
try {
|
||||
await withTelegramApiErrorLogging({
|
||||
operation: "setWebhook",
|
||||
runtime,
|
||||
fn: () =>
|
||||
bot.api.setWebhook(publicUrl, {
|
||||
secret_token: secret,
|
||||
allowed_updates: resolveTelegramAllowedUpdates(),
|
||||
}),
|
||||
});
|
||||
} catch (err) {
|
||||
server.close();
|
||||
void bot.stop();
|
||||
if (diagnosticsEnabled) {
|
||||
stopDiagnosticHeartbeat();
|
||||
}
|
||||
guard.dispose();
|
||||
});
|
||||
throw err;
|
||||
}
|
||||
|
||||
const publicUrl =
|
||||
opts.publicUrl ?? `http://${host === "0.0.0.0" ? "localhost" : host}:${port}${path}`;
|
||||
|
||||
await withTelegramApiErrorLogging({
|
||||
operation: "setWebhook",
|
||||
runtime,
|
||||
fn: () =>
|
||||
bot.api.setWebhook(publicUrl, {
|
||||
secret_token: secret,
|
||||
allowed_updates: resolveTelegramAllowedUpdates(),
|
||||
}),
|
||||
});
|
||||
|
||||
await new Promise<void>((resolve) => server.listen(port, host, resolve));
|
||||
runtime.log?.(`webhook listening on ${publicUrl}`);
|
||||
|
||||
let shutDown = false;
|
||||
const shutdown = () => {
|
||||
if (shutDown) {
|
||||
return;
|
||||
}
|
||||
shutDown = true;
|
||||
void withTelegramApiErrorLogging({
|
||||
operation: "deleteWebhook",
|
||||
runtime,
|
||||
fn: () => bot.api.deleteWebhook({ drop_pending_updates: false }),
|
||||
}).catch(() => {
|
||||
// withTelegramApiErrorLogging has already emitted the failure.
|
||||
});
|
||||
server.close();
|
||||
void bot.stop();
|
||||
if (diagnosticsEnabled) {
|
||||
|
||||
Reference in New Issue
Block a user