mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-20 21:23:23 +00:00
feat: add LLM idle timeout for streaming responses
Problem: When LLM stops responding, the agent hangs for ~5 minutes with no feedback. Users had to use /stop to recover. Solution: Add idle timeout detection for LLM streaming responses.
This commit is contained in:
@@ -177,6 +177,7 @@ import {
|
||||
} from "./compaction-timeout.js";
|
||||
import { pruneProcessedHistoryImages } from "./history-image-prune.js";
|
||||
import { detectAndLoadPromptImages } from "./images.js";
|
||||
import { resolveLlmIdleTimeoutMs, streamWithIdleTimeout } from "./llm-idle-timeout.js";
|
||||
import type { EmbeddedRunAttemptParams, EmbeddedRunAttemptResult } from "./types.js";
|
||||
|
||||
export {
|
||||
@@ -1056,7 +1057,6 @@ export async function runEmbeddedAttempt(
|
||||
activeSession.agent.streamFn,
|
||||
);
|
||||
}
|
||||
|
||||
// Anthropic-compatible providers can add new stop reasons before pi-ai maps them.
|
||||
// Recover the known "sensitive" stop reason here so a model refusal does not
|
||||
// bubble out as an uncaught runner error and stall channel polling.
|
||||
@@ -1064,6 +1064,18 @@ export async function runEmbeddedAttempt(
|
||||
activeSession.agent.streamFn,
|
||||
);
|
||||
|
||||
let idleTimeoutTrigger: ((error: Error) => void) | undefined;
|
||||
|
||||
// Wrap stream with idle timeout detection
|
||||
const idleTimeoutMs = resolveLlmIdleTimeoutMs(params.config);
|
||||
if (idleTimeoutMs > 0) {
|
||||
activeSession.agent.streamFn = streamWithIdleTimeout(
|
||||
activeSession.agent.streamFn,
|
||||
idleTimeoutMs,
|
||||
(error) => idleTimeoutTrigger?.(error),
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
const prior = await sanitizeSessionHistory({
|
||||
messages: activeSession.messages,
|
||||
@@ -1156,6 +1168,13 @@ export async function runEmbeddedAttempt(
|
||||
};
|
||||
const makeAbortError = (signal: AbortSignal): Error => {
|
||||
const reason = getAbortReason(signal);
|
||||
// If the reason is already an Error, preserve it to keep the original message
|
||||
// (e.g., "LLM idle timeout (60s): no response from model" instead of "aborted")
|
||||
if (reason instanceof Error) {
|
||||
const err = new Error(reason.message, { cause: reason });
|
||||
err.name = "AbortError";
|
||||
return err;
|
||||
}
|
||||
const err = reason ? new Error("aborted", { cause: reason }) : new Error("aborted");
|
||||
err.name = "AbortError";
|
||||
return err;
|
||||
@@ -1187,6 +1206,9 @@ export async function runEmbeddedAttempt(
|
||||
abortCompaction();
|
||||
void activeSession.abort();
|
||||
};
|
||||
idleTimeoutTrigger = (error) => {
|
||||
abortRun(true, error);
|
||||
};
|
||||
const abortable = <T>(promise: Promise<T>): Promise<T> => {
|
||||
const signal = runAbortController.signal;
|
||||
if (signal.aborted) {
|
||||
|
||||
219
src/agents/pi-embedded-runner/run/llm-idle-timeout.test.ts
Normal file
219
src/agents/pi-embedded-runner/run/llm-idle-timeout.test.ts
Normal file
@@ -0,0 +1,219 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../../../config/config.js";
|
||||
import {
|
||||
DEFAULT_LLM_IDLE_TIMEOUT_MS,
|
||||
resolveLlmIdleTimeoutMs,
|
||||
streamWithIdleTimeout,
|
||||
} from "./llm-idle-timeout.js";
|
||||
|
||||
describe("resolveLlmIdleTimeoutMs", () => {
|
||||
it("returns default when config is undefined", () => {
|
||||
expect(resolveLlmIdleTimeoutMs(undefined)).toBe(DEFAULT_LLM_IDLE_TIMEOUT_MS);
|
||||
});
|
||||
|
||||
it("returns default when llm config is missing", () => {
|
||||
const cfg = { agents: {} } as OpenClawConfig;
|
||||
expect(resolveLlmIdleTimeoutMs(cfg)).toBe(DEFAULT_LLM_IDLE_TIMEOUT_MS);
|
||||
});
|
||||
|
||||
it("returns default when idleTimeoutSeconds is not set", () => {
|
||||
const cfg = { agents: { defaults: { llm: {} } } } as OpenClawConfig;
|
||||
expect(resolveLlmIdleTimeoutMs(cfg)).toBe(DEFAULT_LLM_IDLE_TIMEOUT_MS);
|
||||
});
|
||||
|
||||
it("returns 0 when idleTimeoutSeconds is 0 (disabled)", () => {
|
||||
const cfg = { agents: { defaults: { llm: { idleTimeoutSeconds: 0 } } } } as OpenClawConfig;
|
||||
expect(resolveLlmIdleTimeoutMs(cfg)).toBe(0);
|
||||
});
|
||||
|
||||
it("returns configured value in milliseconds", () => {
|
||||
const cfg = { agents: { defaults: { llm: { idleTimeoutSeconds: 30 } } } } as OpenClawConfig;
|
||||
expect(resolveLlmIdleTimeoutMs(cfg)).toBe(30_000);
|
||||
});
|
||||
|
||||
it("caps at max safe timeout", () => {
|
||||
const cfg = {
|
||||
agents: { defaults: { llm: { idleTimeoutSeconds: 10_000_000 } } },
|
||||
} as OpenClawConfig;
|
||||
expect(resolveLlmIdleTimeoutMs(cfg)).toBe(2_147_000_000);
|
||||
});
|
||||
|
||||
it("ignores negative values", () => {
|
||||
const cfg = { agents: { defaults: { llm: { idleTimeoutSeconds: -10 } } } } as OpenClawConfig;
|
||||
expect(resolveLlmIdleTimeoutMs(cfg)).toBe(DEFAULT_LLM_IDLE_TIMEOUT_MS);
|
||||
});
|
||||
|
||||
it("ignores non-finite values", () => {
|
||||
const cfg = {
|
||||
agents: { defaults: { llm: { idleTimeoutSeconds: Infinity } } },
|
||||
} as OpenClawConfig;
|
||||
expect(resolveLlmIdleTimeoutMs(cfg)).toBe(DEFAULT_LLM_IDLE_TIMEOUT_MS);
|
||||
});
|
||||
});
|
||||
|
||||
describe("streamWithIdleTimeout", () => {
|
||||
// Helper to create a mock async iterable
|
||||
function createMockAsyncIterable<T>(chunks: T[]): AsyncIterable<T> {
|
||||
return {
|
||||
[Symbol.asyncIterator]() {
|
||||
let index = 0;
|
||||
return {
|
||||
async next() {
|
||||
if (index < chunks.length) {
|
||||
return { done: false, value: chunks[index++] };
|
||||
}
|
||||
return { done: true, value: undefined };
|
||||
},
|
||||
async return() {
|
||||
return { done: true, value: undefined };
|
||||
},
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
it("wraps stream function", () => {
|
||||
const mockStream = createMockAsyncIterable([]);
|
||||
const baseFn = vi.fn().mockReturnValue(mockStream);
|
||||
const wrapped = streamWithIdleTimeout(baseFn, 1000);
|
||||
expect(typeof wrapped).toBe("function");
|
||||
});
|
||||
|
||||
it("passes through model, context, and options", async () => {
|
||||
const mockStream = createMockAsyncIterable([]);
|
||||
const baseFn = vi.fn().mockReturnValue(mockStream);
|
||||
const wrapped = streamWithIdleTimeout(baseFn, 1000);
|
||||
|
||||
const model = { api: "openai" } as Parameters<typeof baseFn>[0];
|
||||
const context = {} as Parameters<typeof baseFn>[1];
|
||||
const options = {} as Parameters<typeof baseFn>[2];
|
||||
|
||||
void wrapped(model, context, options);
|
||||
|
||||
expect(baseFn).toHaveBeenCalledWith(model, context, options);
|
||||
});
|
||||
|
||||
it("throws on idle timeout", async () => {
|
||||
// Create a stream that never yields
|
||||
const slowStream: AsyncIterable<unknown> = {
|
||||
[Symbol.asyncIterator]() {
|
||||
return {
|
||||
async next() {
|
||||
// Never resolves - simulates hung LLM
|
||||
return new Promise<IteratorResult<unknown>>(() => {});
|
||||
},
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
const baseFn = vi.fn().mockReturnValue(slowStream);
|
||||
const wrapped = streamWithIdleTimeout(baseFn, 50); // 50ms timeout
|
||||
|
||||
const model = {} as Parameters<typeof baseFn>[0];
|
||||
const context = {} as Parameters<typeof baseFn>[1];
|
||||
const options = {} as Parameters<typeof baseFn>[2];
|
||||
|
||||
const stream = wrapped(model, context, options) as AsyncIterable<unknown>;
|
||||
const iterator = stream[Symbol.asyncIterator]();
|
||||
|
||||
await expect(iterator.next()).rejects.toThrow(/LLM idle timeout/);
|
||||
});
|
||||
|
||||
it("resets timer on each chunk", async () => {
|
||||
const chunks = [{ text: "a" }, { text: "b" }, { text: "c" }];
|
||||
const mockStream = createMockAsyncIterable(chunks);
|
||||
const baseFn = vi.fn().mockReturnValue(mockStream);
|
||||
const wrapped = streamWithIdleTimeout(baseFn, 1000);
|
||||
|
||||
const model = {} as Parameters<typeof baseFn>[0];
|
||||
const context = {} as Parameters<typeof baseFn>[1];
|
||||
const options = {} as Parameters<typeof baseFn>[2];
|
||||
|
||||
const stream = wrapped(model, context, options) as AsyncIterable<unknown>;
|
||||
const results: unknown[] = [];
|
||||
|
||||
for await (const chunk of stream) {
|
||||
results.push(chunk);
|
||||
}
|
||||
|
||||
expect(results).toHaveLength(3);
|
||||
expect(results).toEqual(chunks);
|
||||
});
|
||||
|
||||
it("handles stream with delays between chunks", async () => {
|
||||
// Create a stream with small delays
|
||||
const delayedStream: AsyncIterable<{ text: string }> = {
|
||||
[Symbol.asyncIterator]() {
|
||||
let count = 0;
|
||||
return {
|
||||
async next() {
|
||||
if (count < 3) {
|
||||
await new Promise((r) => setTimeout(r, 10)); // 10ms delay
|
||||
return { done: false, value: { text: String(count++) } };
|
||||
}
|
||||
return { done: true, value: undefined };
|
||||
},
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
const baseFn = vi.fn().mockReturnValue(delayedStream);
|
||||
const wrapped = streamWithIdleTimeout(baseFn, 100); // 100ms timeout - should be enough
|
||||
|
||||
const model = {} as Parameters<typeof baseFn>[0];
|
||||
const context = {} as Parameters<typeof baseFn>[1];
|
||||
const options = {} as Parameters<typeof baseFn>[2];
|
||||
|
||||
const stream = wrapped(model, context, options) as AsyncIterable<{ text: string }>;
|
||||
const results: { text: string }[] = [];
|
||||
|
||||
for await (const chunk of stream) {
|
||||
results.push(chunk);
|
||||
}
|
||||
|
||||
expect(results).toHaveLength(3);
|
||||
});
|
||||
|
||||
it("aborts controller on idle timeout", async () => {
|
||||
// Create a stream that never yields
|
||||
const slowStream: AsyncIterable<unknown> = {
|
||||
[Symbol.asyncIterator]() {
|
||||
return {
|
||||
async next() {
|
||||
// Never resolves - simulates hung LLM
|
||||
return new Promise<IteratorResult<unknown>>(() => {});
|
||||
},
|
||||
};
|
||||
},
|
||||
};
|
||||
|
||||
const baseFn = vi.fn().mockReturnValue(slowStream);
|
||||
const controller = new AbortController();
|
||||
const wrapped = streamWithIdleTimeout(baseFn, 50, controller); // 50ms timeout
|
||||
|
||||
const model = {} as Parameters<typeof baseFn>[0];
|
||||
const context = {} as Parameters<typeof baseFn>[1];
|
||||
const options = {} as Parameters<typeof baseFn>[2];
|
||||
|
||||
const stream = wrapped(model, context, options) as AsyncIterable<unknown>;
|
||||
const iterator = stream[Symbol.asyncIterator]();
|
||||
|
||||
try {
|
||||
await iterator.next();
|
||||
// Should not reach here
|
||||
expect.fail("Expected timeout error");
|
||||
} catch (error) {
|
||||
// Verify the error message is preserved
|
||||
expect(error).toBeInstanceOf(Error);
|
||||
expect((error as Error).message).toMatch(/LLM idle timeout/);
|
||||
|
||||
// Verify the controller was aborted
|
||||
expect(controller.signal.aborted).toBe(true);
|
||||
|
||||
// Verify the abort reason is the same error
|
||||
const reason = controller.signal.reason;
|
||||
expect(reason).toBeInstanceOf(Error);
|
||||
expect((reason as Error).message).toMatch(/LLM idle timeout/);
|
||||
}
|
||||
});
|
||||
});
|
||||
121
src/agents/pi-embedded-runner/run/llm-idle-timeout.ts
Normal file
121
src/agents/pi-embedded-runner/run/llm-idle-timeout.ts
Normal file
@@ -0,0 +1,121 @@
|
||||
import type { StreamFn } from "@mariozechner/pi-agent-core";
|
||||
import { streamSimple } from "@mariozechner/pi-ai";
|
||||
import type { OpenClawConfig } from "../../../config/config.js";
|
||||
|
||||
/**
|
||||
* Default idle timeout for LLM streaming responses in milliseconds.
|
||||
* If no token is received within this time, the request is aborted.
|
||||
* Set to 0 to disable (never timeout).
|
||||
* Default: 60 seconds.
|
||||
*/
|
||||
export const DEFAULT_LLM_IDLE_TIMEOUT_MS = 60_000;
|
||||
|
||||
/**
|
||||
* Maximum safe timeout value (approximately 24.8 days).
|
||||
*/
|
||||
const MAX_SAFE_TIMEOUT_MS = 2_147_000_000;
|
||||
|
||||
/**
|
||||
* Resolves the LLM idle timeout from configuration.
|
||||
* @param cfg - OpenClaw configuration
|
||||
* @returns Idle timeout in milliseconds, or 0 to disable
|
||||
*/
|
||||
export function resolveLlmIdleTimeoutMs(cfg?: OpenClawConfig): number {
|
||||
const raw = cfg?.agents?.defaults?.llm?.idleTimeoutSeconds;
|
||||
// 0 means disabled (no timeout)
|
||||
if (raw === 0) {
|
||||
return 0;
|
||||
}
|
||||
if (typeof raw === "number" && Number.isFinite(raw) && raw > 0) {
|
||||
return Math.min(Math.floor(raw) * 1000, MAX_SAFE_TIMEOUT_MS);
|
||||
}
|
||||
return DEFAULT_LLM_IDLE_TIMEOUT_MS;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps a stream function with idle timeout detection.
|
||||
* If no token is received within the specified timeout, the request is aborted.
|
||||
*
|
||||
* @param baseFn - The base stream function to wrap
|
||||
* @param timeoutMs - Idle timeout in milliseconds
|
||||
* @param controller - Optional abort controller to abort on timeout
|
||||
* @returns A wrapped stream function with idle timeout detection
|
||||
*/
|
||||
export function streamWithIdleTimeout(
|
||||
baseFn: StreamFn,
|
||||
timeoutMs: number,
|
||||
controller?: AbortController,
|
||||
): StreamFn {
|
||||
return (model, context, options) => {
|
||||
const maybeStream = baseFn(model, context, options);
|
||||
|
||||
const wrapStream = (stream: ReturnType<typeof streamSimple>) => {
|
||||
const originalAsyncIterator = stream[Symbol.asyncIterator].bind(stream);
|
||||
(stream as { [Symbol.asyncIterator]: typeof originalAsyncIterator })[Symbol.asyncIterator] =
|
||||
function () {
|
||||
const iterator = originalAsyncIterator();
|
||||
let idleTimer: NodeJS.Timeout | null = null;
|
||||
|
||||
const createTimeoutPromise = (): Promise<never> => {
|
||||
return new Promise((_, reject) => {
|
||||
idleTimer = setTimeout(() => {
|
||||
const error = new Error(
|
||||
`LLM idle timeout (${Math.floor(timeoutMs / 1000)}s): no response from model`,
|
||||
);
|
||||
if (controller && !controller.signal.aborted) {
|
||||
controller.abort(error);
|
||||
}
|
||||
reject(error);
|
||||
}, timeoutMs);
|
||||
});
|
||||
};
|
||||
|
||||
const clearTimer = () => {
|
||||
if (idleTimer) {
|
||||
clearTimeout(idleTimer);
|
||||
idleTimer = null;
|
||||
}
|
||||
};
|
||||
|
||||
return {
|
||||
async next() {
|
||||
clearTimer();
|
||||
|
||||
try {
|
||||
// Race between the actual next() and the timeout
|
||||
const result = await Promise.race([iterator.next(), createTimeoutPromise()]);
|
||||
|
||||
if (result.done) {
|
||||
clearTimer();
|
||||
return result;
|
||||
}
|
||||
|
||||
clearTimer();
|
||||
return result;
|
||||
} catch (error) {
|
||||
clearTimer();
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
|
||||
return() {
|
||||
clearTimer();
|
||||
return iterator.return?.() ?? Promise.resolve({ done: true, value: undefined });
|
||||
},
|
||||
|
||||
throw(error?: unknown) {
|
||||
clearTimer();
|
||||
return iterator.throw?.(error) ?? Promise.reject(error);
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
return stream;
|
||||
};
|
||||
|
||||
if (maybeStream && typeof maybeStream === "object" && "then" in maybeStream) {
|
||||
return Promise.resolve(maybeStream).then(wrapStream);
|
||||
}
|
||||
return wrapStream(maybeStream);
|
||||
};
|
||||
}
|
||||
@@ -2303,6 +2303,19 @@ export const GENERATED_BASE_CONFIG_SCHEMA = {
|
||||
},
|
||||
additionalProperties: false,
|
||||
},
|
||||
llm: {
|
||||
type: "object",
|
||||
properties: {
|
||||
idleTimeoutSeconds: {
|
||||
description:
|
||||
"Idle timeout for LLM streaming responses in seconds. If no token is received within this time, the request is aborted. Set to 0 to disable. Default: 60 seconds.",
|
||||
type: "integer",
|
||||
minimum: 0,
|
||||
maximum: 9007199254740991,
|
||||
},
|
||||
},
|
||||
additionalProperties: false,
|
||||
},
|
||||
compaction: {
|
||||
type: "object",
|
||||
properties: {
|
||||
|
||||
@@ -171,6 +171,8 @@ export type AgentDefaultsConfig = {
|
||||
cliBackends?: Record<string, CliBackendConfig>;
|
||||
/** Opt-in: prune old tool results from the LLM context to reduce token usage. */
|
||||
contextPruning?: AgentContextPruningConfig;
|
||||
/** LLM timeout configuration. */
|
||||
llm?: AgentLlmConfig;
|
||||
/** Compaction tuning and pre-compaction memory flush behavior. */
|
||||
compaction?: AgentCompactionConfig;
|
||||
/** Embedded Pi runner hardening and compatibility controls. */
|
||||
@@ -365,3 +367,16 @@ export type AgentCompactionMemoryFlushConfig = {
|
||||
/** System prompt appended for the memory flush turn. */
|
||||
systemPrompt?: string;
|
||||
};
|
||||
|
||||
/**
 * LLM timeout configuration.
 *
 * Consumed by `resolveLlmIdleTimeoutMs`, which converts seconds to
 * milliseconds and clamps to a maximum safe setTimeout delay.
 */
export type AgentLlmConfig = {
  /**
   * Idle timeout for LLM streaming responses in seconds.
   * If no token is received within this time, the request is aborted.
   * Set to 0 to disable (never timeout).
   * Negative or non-finite values are ignored and fall back to the default.
   * Default: 60 seconds.
   */
  idleTimeoutSeconds?: number;
};
|
||||
|
||||
@@ -85,6 +85,19 @@ export const AgentDefaultsSchema = z
|
||||
})
|
||||
.strict()
|
||||
.optional(),
|
||||
llm: z
|
||||
.object({
|
||||
idleTimeoutSeconds: z
|
||||
.number()
|
||||
.int()
|
||||
.nonnegative()
|
||||
.optional()
|
||||
.describe(
|
||||
"Idle timeout for LLM streaming responses in seconds. If no token is received within this time, the request is aborted. Set to 0 to disable. Default: 60 seconds.",
|
||||
),
|
||||
})
|
||||
.strict()
|
||||
.optional(),
|
||||
compaction: z
|
||||
.object({
|
||||
mode: z.union([z.literal("default"), z.literal("safeguard")]).optional(),
|
||||
|
||||
Reference in New Issue
Block a user