fix: preserve embedded dispatcher timeouts

This commit is contained in:
Shakker
2026-05-06 06:03:30 +01:00
committed by Shakker
parent d52f581f76
commit 7544beea17
5 changed files with 103 additions and 36 deletions

View File

@@ -1,14 +1,14 @@
import {
DEFAULT_UNDICI_STREAM_TIMEOUT_MS,
ensureGlobalUndiciDispatcherStreamTimeouts,
ensureGlobalUndiciEnvProxyDispatcher,
ensureGlobalUndiciStreamTimeouts,
} from "../../../infra/net/undici-global-dispatcher.js";
export function configureEmbeddedAttemptHttpRuntime(params: { timeoutMs: number }): void {
// Proxy bootstrap must happen before timeout tuning so the timeouts wrap the
// active EnvHttpProxyAgent instead of being replaced by a bare proxy dispatcher.
ensureGlobalUndiciEnvProxyDispatcher();
ensureGlobalUndiciStreamTimeouts({
ensureGlobalUndiciDispatcherStreamTimeouts({
timeoutMs: Math.max(params.timeoutMs, DEFAULT_UNDICI_STREAM_TIMEOUT_MS),
});
}

View File

@@ -60,6 +60,7 @@ type AttemptSpawnWorkspaceHoisted = {
sessionManagerOpenMock: UnknownMock;
resolveSandboxContextMock: UnknownMock;
ensureGlobalUndiciEnvProxyDispatcherMock: UnknownMock;
ensureGlobalUndiciDispatcherStreamTimeoutsMock: UnknownMock;
ensureGlobalUndiciStreamTimeoutsMock: UnknownMock;
buildEmbeddedMessageActionDiscoveryInputMock: UnknownMock;
createOpenClawCodingToolsMock: UnknownMock;
@@ -125,6 +126,7 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => {
const sessionManagerOpenMock = vi.fn();
const resolveSandboxContextMock = vi.fn();
const ensureGlobalUndiciEnvProxyDispatcherMock = vi.fn();
const ensureGlobalUndiciDispatcherStreamTimeoutsMock = vi.fn();
const ensureGlobalUndiciStreamTimeoutsMock = vi.fn();
const buildEmbeddedMessageActionDiscoveryInputMock = vi.fn((params: unknown) => params);
const createOpenClawCodingToolsMock = vi.fn(() => []);
@@ -193,6 +195,7 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => {
sessionManagerOpenMock,
resolveSandboxContextMock,
ensureGlobalUndiciEnvProxyDispatcherMock,
ensureGlobalUndiciDispatcherStreamTimeoutsMock,
ensureGlobalUndiciStreamTimeoutsMock,
buildEmbeddedMessageActionDiscoveryInputMock,
createOpenClawCodingToolsMock,
@@ -287,6 +290,8 @@ vi.mock("../../../infra/net/undici-global-dispatcher.js", () => ({
DEFAULT_UNDICI_STREAM_TIMEOUT_MS: 120_000,
ensureGlobalUndiciEnvProxyDispatcher: (...args: unknown[]) =>
hoisted.ensureGlobalUndiciEnvProxyDispatcherMock(...args),
ensureGlobalUndiciDispatcherStreamTimeouts: (...args: unknown[]) =>
hoisted.ensureGlobalUndiciDispatcherStreamTimeoutsMock(...args),
ensureGlobalUndiciStreamTimeouts: (...args: unknown[]) =>
hoisted.ensureGlobalUndiciStreamTimeoutsMock(...args),
}));
@@ -792,6 +797,7 @@ export function resetEmbeddedAttemptHarness(
hoisted.sessionManagerOpenMock.mockReset().mockReturnValue(hoisted.sessionManager);
hoisted.resolveSandboxContextMock.mockReset();
hoisted.ensureGlobalUndiciEnvProxyDispatcherMock.mockReset();
hoisted.ensureGlobalUndiciDispatcherStreamTimeoutsMock.mockReset();
hoisted.ensureGlobalUndiciStreamTimeoutsMock.mockReset();
hoisted.buildEmbeddedMessageActionDiscoveryInputMock
.mockReset()

View File

@@ -2,14 +2,14 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
const mocks = vi.hoisted(() => ({
DEFAULT_UNDICI_STREAM_TIMEOUT_MS: 30 * 60 * 1000,
ensureGlobalUndiciDispatcherStreamTimeouts: vi.fn(),
ensureGlobalUndiciEnvProxyDispatcher: vi.fn(),
ensureGlobalUndiciStreamTimeouts: vi.fn(),
}));
vi.mock("../../../infra/net/undici-global-dispatcher.js", () => ({
DEFAULT_UNDICI_STREAM_TIMEOUT_MS: mocks.DEFAULT_UNDICI_STREAM_TIMEOUT_MS,
ensureGlobalUndiciDispatcherStreamTimeouts: mocks.ensureGlobalUndiciDispatcherStreamTimeouts,
ensureGlobalUndiciEnvProxyDispatcher: mocks.ensureGlobalUndiciEnvProxyDispatcher,
ensureGlobalUndiciStreamTimeouts: mocks.ensureGlobalUndiciStreamTimeouts,
}));
import { configureEmbeddedAttemptHttpRuntime } from "./attempt-http-runtime.js";
@@ -17,14 +17,14 @@ import { configureEmbeddedAttemptHttpRuntime } from "./attempt-http-runtime.js";
describe("runEmbeddedAttempt undici timeout wiring", () => {
beforeEach(() => {
mocks.ensureGlobalUndiciEnvProxyDispatcher.mockReset();
mocks.ensureGlobalUndiciStreamTimeouts.mockReset();
mocks.ensureGlobalUndiciDispatcherStreamTimeouts.mockReset();
});
it("does not lower global undici stream tuning below the shared default", () => {
configureEmbeddedAttemptHttpRuntime({ timeoutMs: 123_456 });
expect(mocks.ensureGlobalUndiciEnvProxyDispatcher).toHaveBeenCalledOnce();
expect(mocks.ensureGlobalUndiciStreamTimeouts).toHaveBeenCalledWith({
expect(mocks.ensureGlobalUndiciDispatcherStreamTimeouts).toHaveBeenCalledWith({
timeoutMs: mocks.DEFAULT_UNDICI_STREAM_TIMEOUT_MS,
});
});
@@ -35,7 +35,7 @@ describe("runEmbeddedAttempt undici timeout wiring", () => {
configureEmbeddedAttemptHttpRuntime({ timeoutMs });
expect(mocks.ensureGlobalUndiciEnvProxyDispatcher).toHaveBeenCalledOnce();
expect(mocks.ensureGlobalUndiciStreamTimeouts).toHaveBeenCalledWith({
expect(mocks.ensureGlobalUndiciDispatcherStreamTimeouts).toHaveBeenCalledWith({
timeoutMs,
});
});

View File

@@ -79,6 +79,7 @@ vi.mock("../wsl.js", () => ({
import { isWSL2Sync } from "../wsl.js";
import { hasEnvHttpProxyAgentConfigured, resolveEnvHttpProxyAgentOptions } from "./proxy-env.js";
let DEFAULT_UNDICI_STREAM_TIMEOUT_MS: typeof import("./undici-global-dispatcher.js").DEFAULT_UNDICI_STREAM_TIMEOUT_MS;
let ensureGlobalUndiciDispatcherStreamTimeouts: typeof import("./undici-global-dispatcher.js").ensureGlobalUndiciDispatcherStreamTimeouts;
let ensureGlobalUndiciEnvProxyDispatcher: typeof import("./undici-global-dispatcher.js").ensureGlobalUndiciEnvProxyDispatcher;
let ensureGlobalUndiciStreamTimeouts: typeof import("./undici-global-dispatcher.js").ensureGlobalUndiciStreamTimeouts;
let forceResetGlobalDispatcher: typeof import("./undici-global-dispatcher.js").forceResetGlobalDispatcher;
@@ -90,6 +91,7 @@ describe("ensureGlobalUndiciStreamTimeouts", () => {
undiciGlobalDispatcherModule = await import("./undici-global-dispatcher.js");
({
DEFAULT_UNDICI_STREAM_TIMEOUT_MS,
ensureGlobalUndiciDispatcherStreamTimeouts,
ensureGlobalUndiciEnvProxyDispatcher,
ensureGlobalUndiciStreamTimeouts,
forceResetGlobalDispatcher,
@@ -150,6 +152,26 @@ describe("ensureGlobalUndiciStreamTimeouts", () => {
expect(output.trim()).toBe("ok");
});
it("explicitly tunes the global dispatcher when requested for embedded attempts", () => {
getDefaultAutoSelectFamily.mockReturnValue(false);
ensureGlobalUndiciDispatcherStreamTimeouts({ timeoutMs: 1_900_000 });
expect(loadUndiciGlobalDispatcherDeps).toHaveBeenCalledTimes(1);
expect(setGlobalDispatcher).toHaveBeenCalledTimes(1);
const next = getCurrentDispatcher() as { options?: Record<string, unknown> };
expect(next).toBeInstanceOf(Agent);
expect(next.options).toEqual({
bodyTimeout: 1_900_000,
headersTimeout: 1_900_000,
connect: {
autoSelectFamily: false,
autoSelectFamilyAttemptTimeout: 300,
},
});
expect(undiciGlobalDispatcherModule._globalUndiciStreamTimeoutMs).toBe(1_900_000);
});
it("replaces EnvHttpProxyAgent dispatcher while preserving env-proxy mode", () => {
getDefaultAutoSelectFamily.mockReturnValue(false);
vi.mocked(hasEnvHttpProxyAgentConfigured).mockReturnValue(true);

View File

@@ -49,9 +49,17 @@ function resolveDispatcherKey(params: {
return `${params.kind}:${params.timeoutMs}:${autoSelectToken}`;
}
function resolveStreamTimeoutMs(opts?: { timeoutMs?: number }): number | null {
const timeoutMsRaw = opts?.timeoutMs ?? DEFAULT_UNDICI_STREAM_TIMEOUT_MS;
if (!Number.isFinite(timeoutMsRaw)) {
return null;
}
return Math.max(DEFAULT_UNDICI_STREAM_TIMEOUT_MS, Math.floor(timeoutMsRaw));
}
function resolveCurrentDispatcherKind(
runtime: Pick<UndiciGlobalDispatcherDeps, "getGlobalDispatcher">,
): DispatcherKind | null {
): Exclude<DispatcherKind, "unsupported"> | null {
let dispatcher: unknown;
try {
dispatcher = runtime.getGlobalDispatcher();
@@ -92,27 +100,12 @@ export function ensureGlobalUndiciEnvProxyDispatcher(): void {
}
}
export function ensureGlobalUndiciStreamTimeouts(opts?: { timeoutMs?: number }): void {
const timeoutMsRaw = opts?.timeoutMs ?? DEFAULT_UNDICI_STREAM_TIMEOUT_MS;
if (!Number.isFinite(timeoutMsRaw)) {
return;
}
const timeoutMs = Math.max(DEFAULT_UNDICI_STREAM_TIMEOUT_MS, Math.floor(timeoutMsRaw));
_globalUndiciStreamTimeoutMs = timeoutMs;
if (!hasEnvHttpProxyAgentConfigured()) {
lastAppliedTimeoutKey = null;
return;
}
const runtime = loadUndiciGlobalDispatcherDeps();
const { EnvHttpProxyAgent, setGlobalDispatcher } = runtime;
const kind = resolveCurrentDispatcherKind(runtime);
if (kind === null) {
return;
}
if (kind !== "env-proxy") {
return;
}
function applyGlobalDispatcherStreamTimeouts(params: {
runtime: UndiciGlobalDispatcherDeps;
kind: Exclude<DispatcherKind, "unsupported">;
timeoutMs: number;
}): void {
const { runtime, kind, timeoutMs } = params;
const autoSelectFamily = resolveUndiciAutoSelectFamily();
const nextKey = resolveDispatcherKey({ kind, timeoutMs, autoSelectFamily });
if (lastAppliedTimeoutKey === nextKey) {
@@ -121,19 +114,65 @@ export function ensureGlobalUndiciStreamTimeouts(opts?: { timeoutMs?: number }):
const connect = createUndiciAutoSelectFamilyConnectOptions(autoSelectFamily);
try {
const proxyOptions = {
...resolveEnvHttpProxyAgentOptions(),
bodyTimeout: timeoutMs,
headersTimeout: timeoutMs,
...(connect ? { connect } : {}),
} as ConstructorParameters<UndiciGlobalDispatcherDeps["EnvHttpProxyAgent"]>[0];
setGlobalDispatcher(new EnvHttpProxyAgent(proxyOptions));
if (kind === "env-proxy") {
const proxyOptions = {
...resolveEnvHttpProxyAgentOptions(),
bodyTimeout: timeoutMs,
headersTimeout: timeoutMs,
...(connect ? { connect } : {}),
} as ConstructorParameters<UndiciGlobalDispatcherDeps["EnvHttpProxyAgent"]>[0];
runtime.setGlobalDispatcher(new runtime.EnvHttpProxyAgent(proxyOptions));
} else {
runtime.setGlobalDispatcher(
new runtime.Agent({
bodyTimeout: timeoutMs,
headersTimeout: timeoutMs,
...(connect ? { connect } : {}),
}),
);
}
lastAppliedTimeoutKey = nextKey;
} catch {
// Best-effort hardening only.
}
}
export function ensureGlobalUndiciStreamTimeouts(opts?: { timeoutMs?: number }): void {
const timeoutMs = resolveStreamTimeoutMs(opts);
if (timeoutMs === null) {
return;
}
_globalUndiciStreamTimeoutMs = timeoutMs;
if (!hasEnvHttpProxyAgentConfigured()) {
lastAppliedTimeoutKey = null;
return;
}
const runtime = loadUndiciGlobalDispatcherDeps();
const kind = resolveCurrentDispatcherKind(runtime);
if (kind === null) {
return;
}
if (kind !== "env-proxy") {
return;
}
applyGlobalDispatcherStreamTimeouts({ runtime, kind, timeoutMs });
}
export function ensureGlobalUndiciDispatcherStreamTimeouts(opts?: { timeoutMs?: number }): void {
const timeoutMs = resolveStreamTimeoutMs(opts);
if (timeoutMs === null) {
return;
}
_globalUndiciStreamTimeoutMs = timeoutMs;
const runtime = loadUndiciGlobalDispatcherDeps();
const kind = resolveCurrentDispatcherKind(runtime);
if (kind === null) {
return;
}
applyGlobalDispatcherStreamTimeouts({ runtime, kind, timeoutMs });
}
export function resetGlobalUndiciStreamTimeoutsForTests(): void {
lastAppliedTimeoutKey = null;
lastAppliedProxyBootstrap = false;