fix(telegram): unify transport fallback chain (#49148)

* fix(telegram): unify transport fallback chain

* fix: address telegram fallback review comments

* fix: validate pinned SSRF overrides

* fix: unify telegram fallback retries (#49148)
This commit is contained in:
Ayaan Zaidi
2026-03-17 22:44:15 +05:30
committed by GitHub
parent 272d6ed24b
commit e4825a0f93
10 changed files with 459 additions and 135 deletions

View File

@@ -4,7 +4,7 @@ import { retryAsync } from "openclaw/plugin-sdk/infra-runtime";
import { fetchRemoteMedia } from "openclaw/plugin-sdk/media-runtime";
import { saveMediaBuffer } from "openclaw/plugin-sdk/media-runtime";
import { logVerbose, warn } from "openclaw/plugin-sdk/runtime-env";
import { shouldRetryTelegramIpv4Fallback, type TelegramTransport } from "../fetch.js";
import { shouldRetryTelegramTransportFallback, type TelegramTransport } from "../fetch.js";
import { cacheSticker, getCachedSticker } from "../sticker-cache.js";
import { resolveTelegramMediaPlaceholder } from "./helpers.js";
import type { StickerMetadata, TelegramContext } from "./types.js";
@@ -129,9 +129,8 @@ async function downloadAndSaveTelegramFile(params: {
const fetched = await fetchRemoteMedia({
url,
fetchImpl: params.transport.sourceFetch,
dispatcherPolicy: params.transport.pinnedDispatcherPolicy,
fallbackDispatcherPolicy: params.transport.fallbackPinnedDispatcherPolicy,
shouldRetryFetchError: shouldRetryTelegramIpv4Fallback,
dispatcherAttempts: params.transport.dispatcherAttempts,
shouldRetryFetchError: shouldRetryTelegramTransportFallback,
filePathHint: params.filePath,
maxBytes: params.maxBytes,
readIdleTimeoutMs: TELEGRAM_DOWNLOAD_IDLE_TIMEOUT_MS,

View File

@@ -1,6 +1,4 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { resolveFetch } from "../../../src/infra/fetch.js";
import { resolveTelegramFetch, resolveTelegramTransport } from "./fetch.js";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
const setDefaultResultOrder = vi.hoisted(() => vi.fn());
const setDefaultAutoSelectFamily = vi.hoisted(() => vi.fn());
@@ -56,6 +54,16 @@ vi.mock("undici", () => ({
setGlobalDispatcher,
}));
let resolveFetch: typeof import("../../../src/infra/fetch.js").resolveFetch;
let resolveTelegramFetch: typeof import("./fetch.js").resolveTelegramFetch;
let resolveTelegramTransport: typeof import("./fetch.js").resolveTelegramTransport;
beforeEach(async () => {
vi.resetModules();
({ resolveFetch } = await import("../../../src/infra/fetch.js"));
({ resolveTelegramFetch, resolveTelegramTransport } = await import("./fetch.js"));
});
function resolveTelegramFetchOrThrow(
proxyFetch?: typeof fetch,
options?: { network?: { autoSelectFamily?: boolean; dnsResultOrder?: "ipv4first" | "verbatim" } },
@@ -152,6 +160,24 @@ function expectPinnedIpv4ConnectDispatcher(args: {
}
}
function expectPinnedFallbackIpDispatcher(callIndex: number) {
const dispatcher = getDispatcherFromUndiciCall(callIndex);
expect(dispatcher?.options?.connect).toEqual(
expect.objectContaining({
family: 4,
autoSelectFamily: false,
lookup: expect.any(Function),
}),
);
const callback = vi.fn();
(
dispatcher?.options?.connect?.lookup as
| ((hostname: string, callback: (err: null, address: string, family: number) => void) => void)
| undefined
)?.("api.telegram.org", callback);
expect(callback).toHaveBeenCalledWith(null, "149.154.167.220", 4);
}
function expectCallerDispatcherPreserved(callIndexes: number[], dispatcher: unknown) {
for (const callIndex of callIndexes) {
const callInit = undiciFetch.mock.calls[callIndex - 1]?.[1] as
@@ -395,7 +421,7 @@ describe("resolveTelegramFetch", () => {
pinnedCall: 2,
followupCall: 3,
});
expect(transport.pinnedDispatcherPolicy).toEqual(
expect(transport.dispatcherAttempts?.[0]?.dispatcherPolicy).toEqual(
expect.objectContaining({
mode: "direct",
}),
@@ -533,6 +559,34 @@ describe("resolveTelegramFetch", () => {
);
});
it("escalates from IPv4 fallback to pinned Telegram IP and keeps it sticky", async () => {
undiciFetch
.mockRejectedValueOnce(buildFetchFallbackError("ETIMEDOUT"))
.mockRejectedValueOnce(buildFetchFallbackError("EHOSTUNREACH"))
.mockResolvedValueOnce({ ok: true } as Response)
.mockResolvedValueOnce({ ok: true } as Response);
const resolved = resolveTelegramFetchOrThrow(undefined, {
network: {
autoSelectFamily: true,
dnsResultOrder: "ipv4first",
},
});
await resolved("https://api.telegram.org/botx/sendMessage");
await resolved("https://api.telegram.org/botx/sendChatAction");
expect(undiciFetch).toHaveBeenCalledTimes(4);
const secondDispatcher = getDispatcherFromUndiciCall(2);
const thirdDispatcher = getDispatcherFromUndiciCall(3);
const fourthDispatcher = getDispatcherFromUndiciCall(4);
expect(secondDispatcher).not.toBe(thirdDispatcher);
expect(thirdDispatcher).toBe(fourthDispatcher);
expectPinnedFallbackIpDispatcher(3);
});
it("preserves caller-provided dispatcher across fallback retry", async () => {
const fetchError = buildFetchFallbackError("EHOSTUNREACH");
undiciFetch.mockRejectedValueOnce(fetchError).mockResolvedValueOnce({ ok: true } as Response);

View File

@@ -1,8 +1,11 @@
import * as dns from "node:dns";
import type { TelegramNetworkConfig } from "openclaw/plugin-sdk/config-runtime";
import { resolveFetch } from "openclaw/plugin-sdk/infra-runtime";
import { hasEnvHttpProxyConfigured } from "openclaw/plugin-sdk/infra-runtime";
import type { PinnedDispatcherPolicy } from "openclaw/plugin-sdk/infra-runtime";
import {
createPinnedLookup,
hasEnvHttpProxyConfigured,
resolveFetch,
type PinnedDispatcherPolicy,
} from "openclaw/plugin-sdk/infra-runtime";
import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env";
import { Agent, EnvHttpProxyAgent, ProxyAgent, fetch as undiciFetch } from "undici";
import {
@@ -15,6 +18,7 @@ const log = createSubsystemLogger("telegram/network");
const TELEGRAM_AUTO_SELECT_FAMILY_ATTEMPT_TIMEOUT_MS = 300;
const TELEGRAM_API_HOSTNAME = "api.telegram.org";
const TELEGRAM_FALLBACK_IPS: readonly string[] = ["149.154.167.220"];
type RequestInitWithDispatcher = RequestInit & {
dispatcher?: unknown;
@@ -24,6 +28,16 @@ type TelegramDispatcher = Agent | EnvHttpProxyAgent | ProxyAgent;
type TelegramDispatcherMode = "direct" | "env-proxy" | "explicit-proxy";
type TelegramDispatcherAttempt = {
dispatcherPolicy?: PinnedDispatcherPolicy;
};
type TelegramTransportAttempt = {
createDispatcher: () => TelegramDispatcher;
exportAttempt: TelegramDispatcherAttempt;
logMessage?: string;
};
type TelegramDnsResultOrder = "ipv4first" | "verbatim";
type LookupCallback =
@@ -49,17 +63,17 @@ const FALLBACK_RETRY_ERROR_CODES = [
"UND_ERR_SOCKET",
] as const;
type Ipv4FallbackContext = {
type TelegramTransportFallbackContext = {
message: string;
codes: Set<string>;
};
type Ipv4FallbackRule = {
type TelegramTransportFallbackRule = {
name: string;
matches: (ctx: Ipv4FallbackContext) => boolean;
matches: (ctx: TelegramTransportFallbackContext) => boolean;
};
const IPV4_FALLBACK_RULES: readonly Ipv4FallbackRule[] = [
const TELEGRAM_TRANSPORT_FALLBACK_RULES: readonly TelegramTransportFallbackRule[] = [
{
name: "fetch-failed-envelope",
matches: ({ message }) => message.includes("fetch failed"),
@@ -98,7 +112,6 @@ function createDnsResultOrderLookup(
const lookupOptions: LookupOptions = {
...baseOptions,
order,
// Keep `verbatim` for compatibility with Node runtimes that ignore `order`.
verbatim: order === "verbatim",
};
lookup(hostname, lookupOptions, callback);
@@ -139,14 +152,6 @@ function buildTelegramConnectOptions(params: {
}
function shouldBypassEnvProxyForTelegramApi(env: NodeJS.ProcessEnv = process.env): boolean {
// We need this classification before dispatch to decide whether sticky IPv4 fallback
// can safely arm. EnvHttpProxyAgent does not expose route decisions (proxy vs direct
// NO_PROXY bypass), so we mirror undici's parsing/matching behavior for this host.
// Match EnvHttpProxyAgent behavior (undici):
// - lower-case no_proxy takes precedence over NO_PROXY
// - entries split by comma or whitespace
// - wildcard handling is exact-string "*" only
// - leading "." and "*." are normalized the same way
const noProxyValue = env.no_proxy ?? env.NO_PROXY ?? "";
if (!noProxyValue) {
return false;
@@ -228,16 +233,32 @@ function resolveTelegramDispatcherPolicy(params: {
};
}
function withPinnedLookup(
options: Record<string, unknown> | undefined,
pinnedHostname: PinnedDispatcherPolicy["pinnedHostname"],
): Record<string, unknown> | undefined {
if (!pinnedHostname) {
return options ? { ...options } : undefined;
}
const lookup = createPinnedLookup({
hostname: pinnedHostname.hostname,
addresses: [...pinnedHostname.addresses],
fallback: dns.lookup,
});
return options ? { ...options, lookup } : { lookup };
}
function createTelegramDispatcher(policy: PinnedDispatcherPolicy): {
dispatcher: TelegramDispatcher;
mode: TelegramDispatcherMode;
effectivePolicy: PinnedDispatcherPolicy;
} {
if (policy.mode === "explicit-proxy") {
const proxyOptions = policy.proxyTls
const proxyTlsOptions = withPinnedLookup(policy.proxyTls, policy.pinnedHostname);
const proxyOptions = proxyTlsOptions
? ({
uri: policy.proxyUrl,
proxyTls: { ...policy.proxyTls },
proxyTls: proxyTlsOptions,
} satisfies ConstructorParameters<typeof ProxyAgent>[0])
: policy.proxyUrl;
try {
@@ -253,13 +274,13 @@ function createTelegramDispatcher(policy: PinnedDispatcherPolicy): {
}
if (policy.mode === "env-proxy") {
const connectOptions = withPinnedLookup(policy.connect, policy.pinnedHostname);
const proxyTlsOptions = withPinnedLookup(policy.proxyTls, policy.pinnedHostname);
const proxyOptions =
policy.connect || policy.proxyTls
connectOptions || proxyTlsOptions
? ({
...(policy.connect ? { connect: { ...policy.connect } } : {}),
// undici's EnvHttpProxyAgent passes `connect` only to the no-proxy Agent.
// Real proxied HTTPS traffic reads transport settings from ProxyAgent.proxyTls.
...(policy.proxyTls ? { proxyTls: { ...policy.proxyTls } } : {}),
...(connectOptions ? { connect: connectOptions } : {}),
...(proxyTlsOptions ? { proxyTls: proxyTlsOptions } : {}),
} satisfies ConstructorParameters<typeof EnvHttpProxyAgent>[0])
: undefined;
try {
@@ -276,14 +297,12 @@ function createTelegramDispatcher(policy: PinnedDispatcherPolicy): {
);
const directPolicy: PinnedDispatcherPolicy = {
mode: "direct",
...(policy.connect ? { connect: { ...policy.connect } } : {}),
...(connectOptions ? { connect: connectOptions } : {}),
};
return {
dispatcher: new Agent(
directPolicy.connect
? ({
connect: { ...directPolicy.connect },
} satisfies ConstructorParameters<typeof Agent>[0])
? ({ connect: directPolicy.connect } satisfies ConstructorParameters<typeof Agent>[0])
: undefined,
),
mode: "direct",
@@ -292,11 +311,12 @@ function createTelegramDispatcher(policy: PinnedDispatcherPolicy): {
}
}
const connectOptions = withPinnedLookup(policy.connect, policy.pinnedHostname);
return {
dispatcher: new Agent(
policy.connect
connectOptions
? ({
connect: { ...policy.connect },
connect: connectOptions,
} satisfies ConstructorParameters<typeof Agent>[0])
: undefined,
),
@@ -375,13 +395,13 @@ function formatErrorCodes(err: unknown): string {
return codes.length > 0 ? codes.join(",") : "none";
}
function shouldRetryWithIpv4Fallback(err: unknown): boolean {
const ctx: Ipv4FallbackContext = {
function shouldUseTelegramTransportFallback(err: unknown): boolean {
const ctx: TelegramTransportFallbackContext = {
message:
err && typeof err === "object" && "message" in err ? String(err.message).toLowerCase() : "",
codes: collectErrorCodes(err),
};
for (const rule of IPV4_FALLBACK_RULES) {
for (const rule of TELEGRAM_TRANSPORT_FALLBACK_RULES) {
if (!rule.matches(ctx)) {
return false;
}
@@ -389,18 +409,71 @@ function shouldRetryWithIpv4Fallback(err: unknown): boolean {
return true;
}
export function shouldRetryTelegramIpv4Fallback(err: unknown): boolean {
return shouldRetryWithIpv4Fallback(err);
export function shouldRetryTelegramTransportFallback(err: unknown): boolean {
return shouldUseTelegramTransportFallback(err);
}
// Prefer wrapped fetch when available to normalize AbortSignal across runtimes.
export type TelegramTransport = {
fetch: typeof fetch;
sourceFetch: typeof fetch;
pinnedDispatcherPolicy?: PinnedDispatcherPolicy;
fallbackPinnedDispatcherPolicy?: PinnedDispatcherPolicy;
dispatcherAttempts?: TelegramDispatcherAttempt[];
};
function createTelegramTransportAttempts(params: {
defaultDispatcher: ReturnType<typeof createTelegramDispatcher>;
allowFallback: boolean;
fallbackPolicy?: PinnedDispatcherPolicy;
}): TelegramTransportAttempt[] {
const attempts: TelegramTransportAttempt[] = [
{
createDispatcher: () => params.defaultDispatcher.dispatcher,
exportAttempt: { dispatcherPolicy: params.defaultDispatcher.effectivePolicy },
},
];
if (!params.allowFallback || !params.fallbackPolicy) {
return attempts;
}
const fallbackPolicy = params.fallbackPolicy;
let ipv4Dispatcher: TelegramDispatcher | null = null;
attempts.push({
createDispatcher: () => {
if (!ipv4Dispatcher) {
ipv4Dispatcher = createTelegramDispatcher(fallbackPolicy).dispatcher;
}
return ipv4Dispatcher;
},
exportAttempt: { dispatcherPolicy: fallbackPolicy },
logMessage: "fetch fallback: enabling sticky IPv4-only dispatcher",
});
if (TELEGRAM_FALLBACK_IPS.length === 0) {
return attempts;
}
const fallbackIpPolicy: PinnedDispatcherPolicy = {
...fallbackPolicy,
pinnedHostname: {
hostname: TELEGRAM_API_HOSTNAME,
addresses: [...TELEGRAM_FALLBACK_IPS],
},
};
let fallbackIpDispatcher: TelegramDispatcher | null = null;
attempts.push({
createDispatcher: () => {
if (!fallbackIpDispatcher) {
fallbackIpDispatcher = createTelegramDispatcher(fallbackIpPolicy).dispatcher;
}
return fallbackIpDispatcher;
},
exportAttempt: { dispatcherPolicy: fallbackIpPolicy },
logMessage: "fetch fallback: DNS-resolved IP unreachable; trying alternative Telegram API IP",
});
return attempts;
}
export function resolveTelegramTransport(
proxyFetch?: typeof fetch,
options?: { network?: TelegramNetworkConfig },
@@ -424,7 +497,6 @@ export function resolveTelegramTransport(
? resolveWrappedFetch(proxyFetch)
: undiciSourceFetch;
const dnsResultOrder = normalizeDnsResultOrder(dnsDecision.value);
// Preserve fully caller-owned custom fetch implementations.
if (proxyFetch && !explicitProxyUrl) {
return { fetch: sourceFetch, sourceFetch };
}
@@ -439,70 +511,75 @@ export function resolveTelegramTransport(
});
const defaultDispatcher = createTelegramDispatcher(defaultDispatcherResolution.policy);
const shouldBypassEnvProxy = shouldBypassEnvProxyForTelegramApi();
const allowStickyIpv4Fallback =
const allowStickyFallback =
defaultDispatcher.mode === "direct" ||
(defaultDispatcher.mode === "env-proxy" && shouldBypassEnvProxy);
const stickyShouldUseEnvProxy = defaultDispatcher.mode === "env-proxy";
const fallbackPinnedDispatcherPolicy = allowStickyIpv4Fallback
const fallbackDispatcherPolicy = allowStickyFallback
? resolveTelegramDispatcherPolicy({
autoSelectFamily: false,
dnsResultOrder: "ipv4first",
useEnvProxy: stickyShouldUseEnvProxy,
useEnvProxy: defaultDispatcher.mode === "env-proxy",
forceIpv4: true,
proxyUrl: explicitProxyUrl,
}).policy
: undefined;
const transportAttempts = createTelegramTransportAttempts({
defaultDispatcher,
allowFallback: allowStickyFallback,
fallbackPolicy: fallbackDispatcherPolicy,
});
let stickyIpv4FallbackEnabled = false;
let stickyIpv4Dispatcher: TelegramDispatcher | null = null;
const resolveStickyIpv4Dispatcher = () => {
if (!stickyIpv4Dispatcher) {
if (!fallbackPinnedDispatcherPolicy) {
return defaultDispatcher.dispatcher;
}
stickyIpv4Dispatcher = createTelegramDispatcher(fallbackPinnedDispatcherPolicy).dispatcher;
}
return stickyIpv4Dispatcher;
};
let stickyAttemptIndex = 0;
const resolvedFetch = (async (input: RequestInfo | URL, init?: RequestInit) => {
const callerProvidedDispatcher = Boolean(
(init as RequestInitWithDispatcher | undefined)?.dispatcher,
);
const initialInit = withDispatcherIfMissing(
init,
stickyIpv4FallbackEnabled ? resolveStickyIpv4Dispatcher() : defaultDispatcher.dispatcher,
);
const startIndex = Math.min(stickyAttemptIndex, transportAttempts.length - 1);
let err: unknown;
try {
return await sourceFetch(input, initialInit);
} catch (err) {
if (shouldRetryWithIpv4Fallback(err)) {
// Preserve caller-owned dispatchers on retry.
if (callerProvidedDispatcher) {
return sourceFetch(input, init ?? {});
}
// Proxy routes should not arm sticky IPv4 mode; `family=4` would constrain
// proxy-connect behavior instead of Telegram endpoint selection.
if (!allowStickyIpv4Fallback) {
throw err;
}
if (!stickyIpv4FallbackEnabled) {
stickyIpv4FallbackEnabled = true;
log.warn(
`fetch fallback: enabling sticky IPv4-only dispatcher (codes=${formatErrorCodes(err)})`,
);
}
return sourceFetch(input, withDispatcherIfMissing(init, resolveStickyIpv4Dispatcher()));
}
return await sourceFetch(
input,
withDispatcherIfMissing(init, transportAttempts[startIndex].createDispatcher()),
);
} catch (caught) {
err = caught;
}
if (!shouldUseTelegramTransportFallback(err)) {
throw err;
}
if (callerProvidedDispatcher) {
return sourceFetch(input, init ?? {});
}
for (let nextIndex = startIndex + 1; nextIndex < transportAttempts.length; nextIndex += 1) {
const nextAttempt = transportAttempts[nextIndex];
if (nextAttempt.logMessage) {
log.warn(`${nextAttempt.logMessage} (codes=${formatErrorCodes(err)})`);
}
try {
const response = await sourceFetch(
input,
withDispatcherIfMissing(init, nextAttempt.createDispatcher()),
);
stickyAttemptIndex = nextIndex;
return response;
} catch (caught) {
err = caught;
if (!shouldUseTelegramTransportFallback(err)) {
throw err;
}
}
}
throw err;
}) as typeof fetch;
return {
fetch: resolvedFetch,
sourceFetch,
pinnedDispatcherPolicy: defaultDispatcher.effectivePolicy,
fallbackPinnedDispatcherPolicy,
dispatcherAttempts: transportAttempts.map((attempt) => attempt.exportAttempt),
};
}