Matrix: recover from pinned dispatcher runtime failures (#61595)

Merged via squash.

Prepared head SHA: f9a2d9be7f
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
Gustavo Madeira Santana
2026-04-05 22:26:45 -04:00
committed by GitHub
parent 134d309571
commit 427997f989
6 changed files with 141 additions and 28 deletions

View File

@@ -236,6 +236,7 @@ Docs: https://docs.openclaw.ai
- Gateway/shutdown: bound websocket-server shutdown even when no tracked clients remain, so gateway restarts stop hanging until the watchdog kills the process. (#61565) Thanks @mbelinky.
- Control UI/multilingual: localize the remaining shared channel, instances, nodes, and gateway-confirmation strings so the dashboard stops mixing translated UI with hardcoded English labels. Thanks @vincentkoc.
- Discord/media: raise the default inbound and outbound media cap to `100MB` so Discord matches Telegram more closely and larger attachments stop failing on the old low default.
- Matrix: keep direct transport requests on the pinned dispatcher by routing them through undici runtime fetch, so Matrix clients resume syncing on newer runtimes without dropping the validated address binding. (#61595) Thanks @gumadeiras.
## 2026.4.2

View File

@@ -2,9 +2,12 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
import { MatrixMediaSizeLimitError } from "../media-errors.js";
import { performMatrixRequest } from "./transport.js";
const TEST_UNDICI_RUNTIME_DEPS_KEY = "__OPENCLAW_TEST_UNDICI_RUNTIME_DEPS__";
describe("performMatrixRequest", () => {
beforeEach(() => {
vi.unstubAllGlobals();
Reflect.deleteProperty(globalThis as object, TEST_UNDICI_RUNTIME_DEPS_KEY);
});
it("rejects oversized raw responses before buffering the whole body", async () => {
@@ -107,4 +110,44 @@ describe("performMatrixRequest", () => {
vi.useRealTimers();
}
}, 5_000);
it("uses undici runtime fetch for pinned Matrix requests so the dispatcher stays bound", async () => {
let ambientFetchCalls = 0;
vi.stubGlobal("fetch", (async () => {
ambientFetchCalls += 1;
throw new Error("expected pinned Matrix requests to avoid ambient fetch");
}) as typeof fetch);
const runtimeFetch = vi.fn(async (_input: RequestInfo | URL, init?: RequestInit) => {
const requestInit = init as RequestInit & { dispatcher?: unknown };
expect(requestInit.dispatcher).toBeDefined();
return new Response('{"ok":true}', {
status: 200,
headers: {
"content-type": "application/json",
},
});
});
(globalThis as Record<string, unknown>)[TEST_UNDICI_RUNTIME_DEPS_KEY] = {
Agent: class MockAgent {},
EnvHttpProxyAgent: class MockEnvHttpProxyAgent {},
ProxyAgent: class MockProxyAgent {},
fetch: runtimeFetch,
};
const result = await performMatrixRequest({
homeserver: "http://127.0.0.1:8008",
accessToken: "token",
method: "GET",
endpoint: "/_matrix/client/v3/account/whoami",
timeoutMs: 5000,
ssrfPolicy: { allowPrivateNetwork: true },
});
expect(result.text).toBe('{"ok":true}');
expect(ambientFetchCalls).toBe(0);
expect(runtimeFetch).toHaveBeenCalledTimes(1);
expect(
(runtimeFetch.mock.calls[0]?.[1] as RequestInit & { dispatcher?: unknown })?.dispatcher,
).toBeDefined();
});
});

View File

@@ -1,4 +1,7 @@
import type { PinnedDispatcherPolicy } from "openclaw/plugin-sdk/infra-runtime";
import {
fetchWithRuntimeDispatcher,
type PinnedDispatcherPolicy,
} from "openclaw/plugin-sdk/infra-runtime";
import {
buildTimeoutAbortSignal,
closeDispatcher,
@@ -21,6 +24,10 @@ type QueryValue =
export type QueryParams = Record<string, QueryValue> | null | undefined;
type MatrixDispatcherRequestInit = RequestInit & {
dispatcher?: ReturnType<typeof createPinnedDispatcher>;
};
function normalizeEndpoint(endpoint: string): string {
if (!endpoint) {
return "/";
@@ -84,6 +91,27 @@ function buildBufferedResponse(params: {
return response;
}
function isMockedFetch(fetchImpl: typeof fetch | undefined): boolean {
if (typeof fetchImpl !== "function") {
return false;
}
return typeof (fetchImpl as typeof fetch & { mock?: unknown }).mock === "object";
}
async function fetchWithMatrixDispatcher(params: {
url: string;
init: MatrixDispatcherRequestInit;
}): Promise<Response> {
// Keep this dispatcher-routing logic local to Matrix transport. Shared SSRF
// fetches must stay fail-closed unless a retry path can preserve the
// validated pinned-address binding. Route dispatcher-attached requests
// through undici runtime fetch so the pinned dispatcher is preserved.
if (params.init.dispatcher && !isMockedFetch(globalThis.fetch)) {
return await fetchWithRuntimeDispatcher(params.url, params.init);
}
return await fetch(params.url, params.init);
}
async function fetchWithMatrixGuardedRedirects(params: {
url: string;
init?: RequestInit;
@@ -110,15 +138,18 @@ async function fetchWithMatrixGuardedRedirects(params: {
policy: params.ssrfPolicy,
});
dispatcher = createPinnedDispatcher(pinned, params.dispatcherPolicy, params.ssrfPolicy);
const response = await fetch(currentUrl.toString(), {
...params.init,
method,
body,
headers,
redirect: "manual",
signal,
dispatcher,
} as RequestInit & { dispatcher: unknown });
const response = await fetchWithMatrixDispatcher({
url: currentUrl.toString(),
init: {
...params.init,
method,
body,
headers,
redirect: "manual",
signal,
dispatcher,
} as MatrixDispatcherRequestInit,
});
if (!isRedirectStatus(response.status)) {
return {

View File

@@ -21,6 +21,13 @@ const { agentCtor, envHttpProxyAgentCtor, proxyAgentCtor } = vi.hoisted(() => ({
}),
}));
function createPinnedDispatcherCompatibilityError(): Error {
const cause = Object.assign(new Error("invalid onRequestStart method"), {
code: "UND_ERR_INVALID_ARG",
});
return Object.assign(new TypeError("fetch failed"), { cause });
}
function redirectResponse(location: string): Response {
return new Response(null, {
status: 302,
@@ -310,6 +317,25 @@ describe("fetchWithSsrFGuard hardening", () => {
}
});
it("fails closed when the runtime rejects the pinned dispatcher shape", async () => {
const fetchImpl = vi.fn(async (_input: RequestInfo | URL, init?: RequestInit) => {
const requestInit = init as RequestInit & { dispatcher?: unknown };
if (requestInit.dispatcher) {
throw createPinnedDispatcherCompatibilityError();
}
return okResponse();
});
await expect(
fetchWithSsrFGuard({
url: "https://public.example/resource",
fetchImpl,
lookupFn: createPublicLookup(),
}),
).rejects.toThrow("fetch failed");
expect(fetchImpl).toHaveBeenCalledTimes(1);
});
it("ignores dispatcher support markers on ambient global fetch", async () => {
const runtimeFetch = vi.fn(async () => okResponse());
const originalGlobalFetch = globalThis.fetch;

View File

@@ -3,6 +3,11 @@ import { logWarn } from "../../logger.js";
import { buildTimeoutAbortSignal } from "../../utils/fetch-timeout.js";
import { hasProxyEnvConfigured } from "./proxy-env.js";
import { retainSafeHeadersForCrossOriginRedirect as retainSafeRedirectHeaders } from "./redirect-headers.js";
import {
fetchWithRuntimeDispatcher,
isMockedFetch,
type DispatcherAwareRequestInit,
} from "./runtime-fetch.js";
import {
closeDispatcher,
createPinnedDispatcher,
@@ -15,7 +20,6 @@ import {
import { loadUndiciRuntimeDeps } from "./undici-runtime.js";
type FetchLike = (input: RequestInfo | URL, init?: RequestInit) => Promise<Response>;
type DispatcherAwareRequestInit = RequestInit & { dispatcher?: Dispatcher };
export const GUARDED_FETCH_MODE = {
STRICT: "strict",
@@ -155,13 +159,6 @@ function isRedirectStatus(status: number): boolean {
return status === 301 || status === 302 || status === 303 || status === 307 || status === 308;
}
function isMockedFetch(fetchImpl: FetchLike | undefined): boolean {
if (typeof fetchImpl !== "function") {
return false;
}
return typeof (fetchImpl as FetchLike & { mock?: unknown }).mock === "object";
}
function isAmbientGlobalFetch(params: {
fetchImpl: FetchLike | undefined;
globalFetch: FetchLike | undefined;
@@ -227,16 +224,7 @@ function rewriteRedirectInitForMethod(params: {
};
}
async function fetchWithRuntimeDispatcher(
input: string,
init: DispatcherAwareRequestInit,
): Promise<Response> {
const runtimeFetch = loadUndiciRuntimeDeps().fetch as unknown as (
input: string,
init?: DispatcherAwareRequestInit,
) => Promise<unknown>;
return (await runtimeFetch(input, init)) as Response;
}
export { fetchWithRuntimeDispatcher } from "./runtime-fetch.js";
export async function fetchWithSsrFGuard(params: GuardedFetchOptions): Promise<GuardedFetchResult> {
const defaultFetch: FetchLike | undefined = params.fetchImpl ?? globalThis.fetch;

View File

@@ -0,0 +1,24 @@
import type { Dispatcher } from "undici";
import { loadUndiciRuntimeDeps } from "./undici-runtime.js";
export type DispatcherAwareRequestInit = RequestInit & { dispatcher?: Dispatcher };
type FetchLike = (input: RequestInfo | URL, init?: RequestInit) => Promise<Response>;
export function isMockedFetch(fetchImpl: FetchLike | undefined): boolean {
if (typeof fetchImpl !== "function") {
return false;
}
return typeof (fetchImpl as FetchLike & { mock?: unknown }).mock === "object";
}
export async function fetchWithRuntimeDispatcher(
input: RequestInfo | URL,
init?: DispatcherAwareRequestInit,
): Promise<Response> {
const runtimeFetch = loadUndiciRuntimeDeps().fetch as unknown as (
input: RequestInfo | URL,
init?: DispatcherAwareRequestInit,
) => Promise<unknown>;
return (await runtimeFetch(input, init)) as Response;
}