Surface Codex usage-limit reset details in chat replies (#77557)

* fix(codex): surface usage limit reset details

* fix(codex): satisfy extension lint

* fix: surface codex runtime failures in tool-only replies
This commit is contained in:
pashpashpash
2026-05-04 17:00:39 -07:00
committed by GitHub
parent 306a582294
commit b2c3202a15
26 changed files with 1187 additions and 103 deletions

View File

@@ -66,6 +66,7 @@ Docs: https://docs.openclaw.ai
- Gateway/watch: suppress sync-I/O trace output during `pnpm gateway:watch --benchmark` unless explicitly requested, so CPU profiling no longer floods the terminal with stack traces.
- Gateway/watch: when benchmark sync-I/O tracing is explicitly enabled, tee trace blocks to the benchmark output log and filter them from the terminal pane while keeping normal Gateway logs visible.
- Plugins/runtime-deps: include `json5` in the memory-core plugin runtime dependency set so packaged `memory_search` sandboxes can resolve generated OpenClaw runtime chunks that parse JSON5 config. Fixes #77461.
- Codex harness: preserve app-server usage-limit reset details and deliver OpenClaw-owned runtime failure notices through tool-only source-reply mode, so Telegram and other chat channels tell users when Codex subscription limits or API failures block a turn instead of going silent.
- Agents/OpenAI: default direct OpenAI Responses models to the SSE transport instead of WebSocket auto-selection, preventing pi runtime chat turns from hanging on servers where the WebSocket path stalls while the OpenAI HTTP stream works. Thanks @vincentkoc.
- Discord: prefer IPv4 for Discord REST and gateway WebSocket startup paths so IPv4-only networks no longer stall before Gateway READY and inbound message dispatch. Fixes #77398; refs #77526. Thanks @Beandon13.
- Channels/plugins: key bundled package-state probes, env/config presence, and read-only command defaults by channel id instead of manifest plugin id, preserving setup and native-command detection for channel plugins whose package id differs from the channel alias. Thanks @vincentkoc.

View File

@@ -845,6 +845,10 @@ Common forms:
- `/codex mcp` lists Codex app-server MCP server status.
- `/codex skills` lists Codex app-server skills.
When Codex reports a usage-limit failure, OpenClaw includes the next
app-server reset time when Codex provided one. Use `/codex account` in the same
conversation to inspect the current account and rate-limit windows.
### Common debugging workflow
When a Codex-backed agent does something surprising in Telegram, Discord, Slack,

View File

@@ -14,6 +14,7 @@ import {
CodexAppServerEventProjector,
type CodexAppServerToolTelemetry,
} from "./event-projector.js";
import { rememberCodexRateLimits, resetCodexRateLimitCacheForTests } from "./rate-limit-cache.js";
import { createCodexTestModel } from "./test-support.js";
const THREAD_ID = "thread-1";
@@ -86,6 +87,7 @@ beforeEach(() => {
afterEach(async () => {
resetAgentEventsForTest();
resetGlobalHookRunner();
resetCodexRateLimitCacheForTests();
vi.restoreAllMocks();
for (const tempDir of tempDirs) {
await fs.rm(tempDir, { recursive: true, force: true });
@@ -140,6 +142,23 @@ function appServerError(params: { message: string; willRetry: boolean }): Projec
});
}
function rateLimitsUpdated(resetsAt: number): ProjectorNotification {
return {
method: "account/rateLimits/updated",
params: {
rateLimits: {
limitId: "codex",
limitName: "Codex",
primary: { usedPercent: 100, windowDurationMins: 300, resetsAt },
secondary: null,
credits: null,
planType: "plus",
rateLimitReachedType: "rate_limit_reached",
},
},
} as ProjectorNotification;
}
function turnCompleted(items: unknown[] = []): ProjectorNotification {
return {
method: "turn/completed",
@@ -280,6 +299,95 @@ describe("CodexAppServerEventProjector", () => {
expect(result.lastAssistant).toBeUndefined();
});
it("uses Codex rate-limit resets for usage-limit app-server errors", async () => {
const projector = await createProjector();
const resetsAt = Math.ceil(Date.now() / 1000) + 120;
await projector.handleNotification(rateLimitsUpdated(resetsAt));
await projector.handleNotification(
forCurrentTurn("error", {
error: {
message: "You've reached your usage limit.",
codexErrorInfo: "usageLimitExceeded",
additionalDetails: null,
},
willRetry: false,
}),
);
const result = projector.buildResult(buildEmptyToolTelemetry());
expect(result.promptError).toContain("You've reached your Codex subscription usage limit.");
expect(result.promptError).toContain("Next reset in");
expect(result.promptError).toContain("Run /codex account");
expect(result.promptErrorSource).toBe("prompt");
});
it("uses Codex rate-limit resets for failed turns", async () => {
const projector = await createProjector();
const resetsAt = Math.ceil(Date.now() / 1000) + 120;
await projector.handleNotification(rateLimitsUpdated(resetsAt));
await projector.handleNotification(
forCurrentTurn("turn/completed", {
turn: {
id: TURN_ID,
status: "failed",
error: {
message: "You've reached your usage limit.",
codexErrorInfo: "usageLimitExceeded",
additionalDetails: null,
},
items: [],
},
}),
);
const result = projector.buildResult(buildEmptyToolTelemetry());
expect(result.promptError).toContain("You've reached your Codex subscription usage limit.");
expect(result.promptError).toContain("Next reset in");
expect(result.promptErrorSource).toBe("prompt");
});
it("uses a recent Codex rate-limit snapshot when failed turns omit reset details", async () => {
const projector = await createProjector();
const resetsAt = Math.ceil(Date.now() / 1000) + 120;
rememberCodexRateLimits({
rateLimits: {
limitId: "codex",
limitName: "Codex",
primary: { usedPercent: 100, windowDurationMins: 300, resetsAt },
secondary: null,
credits: null,
planType: "plus",
rateLimitReachedType: "rate_limit_reached",
},
rateLimitsByLimitId: null,
});
await projector.handleNotification(
forCurrentTurn("turn/completed", {
turn: {
id: TURN_ID,
status: "failed",
error: {
message: "You've reached your usage limit.",
codexErrorInfo: "usageLimitExceeded",
additionalDetails: null,
},
items: [],
},
}),
);
const result = projector.buildResult(buildEmptyToolTelemetry());
expect(result.promptError).toContain("You've reached your Codex subscription usage limit.");
expect(result.promptError).toContain("Next reset in");
expect(result.promptErrorSource).toBe("prompt");
});
it("normalizes snake_case current token usage fields", async () => {
const projector = await createProjector();

View File

@@ -27,6 +27,8 @@ import {
type JsonObject,
type JsonValue,
} from "./protocol.js";
import { readRecentCodexRateLimits, rememberCodexRateLimits } from "./rate-limit-cache.js";
import { formatCodexUsageLimitErrorMessage } from "./rate-limits.js";
import { readCodexMirroredSessionHistoryMessages } from "./session-history.js";
import { attachCodexMirrorIdentity } from "./transcript-mirror.js";
@@ -100,6 +102,7 @@ export class CodexAppServerEventProjector {
private tokenUsage: ReturnType<typeof normalizeUsage>;
private guardianReviewCount = 0;
private completedCompactionCount = 0;
private latestRateLimits: JsonValue | undefined;
constructor(
private readonly params: EmbeddedRunAttemptParams,
@@ -112,6 +115,11 @@ export class CodexAppServerEventProjector {
if (!params) {
return;
}
if (notification.method === "account/rateLimits/updated") {
this.latestRateLimits = params;
rememberCodexRateLimits(params);
return;
}
if (isHookNotificationMethod(notification.method)) {
if (!this.isHookNotificationForCurrentThread(params)) {
return;
@@ -167,7 +175,7 @@ export class CodexAppServerEventProjector {
if (readBooleanAlias(params, ["willRetry", "will_retry"]) === true) {
break;
}
this.promptError = readCodexErrorNotificationMessage(params) ?? "codex app-server error";
this.promptError = this.formatCodexErrorMessage(params) ?? "codex app-server error";
this.promptErrorSource = "prompt";
break;
default:
@@ -523,7 +531,14 @@ export class CodexAppServerEventProjector {
this.aborted = true;
}
if (turn.status === "failed") {
this.promptError = turn.error?.message ?? "codex app-server turn failed";
this.promptError =
formatCodexUsageLimitErrorMessage({
message: turn.error?.message,
codexErrorInfo: turn.error?.codexErrorInfo as JsonValue | null | undefined,
rateLimits: this.latestRateLimits ?? readRecentCodexRateLimits(),
}) ??
turn.error?.message ??
"codex app-server turn failed";
this.promptErrorSource = "prompt";
}
for (const item of turn.items ?? []) {
@@ -746,6 +761,17 @@ export class CodexAppServerEventProjector {
});
}
private formatCodexErrorMessage(params: JsonObject): string | undefined {
const error = isJsonObject(params.error) ? params.error : undefined;
return (
formatCodexUsageLimitErrorMessage({
message: error ? readString(error, "message") : undefined,
codexErrorInfo: error?.codexErrorInfo,
rateLimits: this.latestRateLimits ?? readRecentCodexRateLimits(),
}) ?? readCodexErrorNotificationMessage(params)
);
}
private emitAgentEvent(
event: Parameters<NonNullable<EmbeddedRunAttemptParams["onAgentEvent"]>>[0],
): void {

View File

@@ -0,0 +1,48 @@
import type { JsonValue } from "./protocol.js";
const DEFAULT_CODEX_RATE_LIMIT_CACHE_MAX_AGE_MS = 10 * 60_000;
const CODEX_RATE_LIMIT_CACHE_STATE = Symbol.for("openclaw.codexRateLimitCacheState");
type CodexRateLimitCacheState = {
value?: JsonValue;
updatedAtMs?: number;
};
function getCodexRateLimitCacheState(): CodexRateLimitCacheState {
const globalState = globalThis as typeof globalThis & {
[CODEX_RATE_LIMIT_CACHE_STATE]?: CodexRateLimitCacheState;
};
globalState[CODEX_RATE_LIMIT_CACHE_STATE] ??= {};
return globalState[CODEX_RATE_LIMIT_CACHE_STATE];
}
export function rememberCodexRateLimits(value: JsonValue | undefined, nowMs = Date.now()): void {
if (value === undefined) {
return;
}
const state = getCodexRateLimitCacheState();
state.value = value;
state.updatedAtMs = nowMs;
}
export function readRecentCodexRateLimits(options?: {
nowMs?: number;
maxAgeMs?: number;
}): JsonValue | undefined {
const state = getCodexRateLimitCacheState();
if (state.value === undefined || state.updatedAtMs === undefined) {
return undefined;
}
const nowMs = options?.nowMs ?? Date.now();
const maxAgeMs = options?.maxAgeMs ?? DEFAULT_CODEX_RATE_LIMIT_CACHE_MAX_AGE_MS;
if (maxAgeMs >= 0 && nowMs - state.updatedAtMs > maxAgeMs) {
return undefined;
}
return state.value;
}
export function resetCodexRateLimitCacheForTests(): void {
const state = getCodexRateLimitCacheState();
state.value = undefined;
state.updatedAtMs = undefined;
}

View File

@@ -0,0 +1,262 @@
import { isJsonObject, type JsonObject, type JsonValue } from "./protocol.js";
const CODEX_LIMIT_ID = "codex";
const LIMIT_WINDOW_KEYS = ["primary", "secondary"] as const;
const ONE_MINUTE_MS = 60_000;
const ONE_HOUR_MS = 60 * ONE_MINUTE_MS;
const ONE_DAY_MS = 24 * ONE_HOUR_MS;
type LimitWindowKey = (typeof LIMIT_WINDOW_KEYS)[number];
type RateLimitReset = {
resetsAtMs: number;
usedPercent?: number;
};
export function formatCodexUsageLimitErrorMessage(params: {
message?: string | null;
codexErrorInfo?: JsonValue | null;
rateLimits?: JsonValue;
nowMs?: number;
}): string | undefined {
const message = normalizeText(params.message);
if (!isCodexUsageLimitError(params.codexErrorInfo, message)) {
return undefined;
}
const nowMs = params.nowMs ?? Date.now();
const nextReset = selectNextRateLimitReset(params.rateLimits, nowMs);
const parts = ["You've reached your Codex subscription usage limit."];
if (nextReset) {
parts.push(`Next reset ${formatResetTime(nextReset.resetsAtMs, nowMs)}.`);
} else {
parts.push("Codex did not return a reset time for this limit.");
}
parts.push("Run /codex account for current usage details.");
return parts.join(" ");
}
export function summarizeCodexRateLimits(
value: JsonValue | undefined,
nowMs = Date.now(),
): string | undefined {
const snapshots = collectCodexRateLimitSnapshots(value);
if (snapshots.length === 0) {
return undefined;
}
return snapshots
.slice(0, 4)
.map((snapshot) => summarizeRateLimitSnapshot(snapshot, nowMs))
.join("; ");
}
function isCodexUsageLimitError(
codexErrorInfo: JsonValue | null | undefined,
message: string | undefined,
): boolean {
if (codexErrorInfo === "usageLimitExceeded") {
return true;
}
if (typeof codexErrorInfo === "string") {
const normalized = codexErrorInfo.replace(/[_\s-]/gu, "").toLowerCase();
if (normalized === "usagelimitexceeded") {
return true;
}
}
return Boolean(message?.toLowerCase().includes("usage limit"));
}
function selectNextRateLimitReset(
value: JsonValue | undefined,
nowMs: number,
): RateLimitReset | undefined {
const windows = collectCodexRateLimitSnapshots(value).flatMap((snapshot) =>
LIMIT_WINDOW_KEYS.flatMap((key) => readRateLimitWindow(snapshot, key) ?? []),
);
const futureWindows = windows.filter((window) => window.resetsAtMs > nowMs);
if (futureWindows.length === 0) {
return undefined;
}
const exhaustedWindows = futureWindows.filter(
(window) => window.usedPercent !== undefined && window.usedPercent >= 100,
);
const candidates = exhaustedWindows.length > 0 ? exhaustedWindows : futureWindows;
candidates.sort((left, right) => left.resetsAtMs - right.resetsAtMs);
return candidates[0];
}
function summarizeRateLimitSnapshot(snapshot: JsonObject, nowMs: number): string {
const label = formatLimitLabel(snapshot);
const windows = LIMIT_WINDOW_KEYS.flatMap((key) => {
const window = readRateLimitWindow(snapshot, key);
return window ? [formatRateLimitWindow(key, window, nowMs)] : [];
});
const reachedType = readString(snapshot, "rateLimitReachedType");
const suffix = reachedType ? ` (${formatReachedType(reachedType)})` : "";
return `${label}: ${windows.join(", ") || "available"}${suffix}`;
}
function collectCodexRateLimitSnapshots(value: JsonValue | undefined): JsonObject[] {
const snapshots: JsonObject[] = [];
const seen = new Set<string>();
collectRateLimitSnapshots(value, snapshots, seen);
return snapshots;
}
function collectRateLimitSnapshots(
value: JsonValue | undefined,
snapshots: JsonObject[],
seen: Set<string>,
): void {
if (Array.isArray(value)) {
for (const entry of value) {
collectRateLimitSnapshots(entry, snapshots, seen);
}
return;
}
if (!isJsonObject(value)) {
return;
}
if (isRateLimitSnapshot(value)) {
addRateLimitSnapshot(value, snapshots, seen);
return;
}
const byLimitId = value.rateLimitsByLimitId;
if (isJsonObject(byLimitId)) {
for (const key of sortedRateLimitKeys(Object.keys(byLimitId))) {
collectRateLimitSnapshots(byLimitId[key], snapshots, seen);
}
}
collectRateLimitSnapshots(value.rateLimits, snapshots, seen);
collectRateLimitSnapshots(value.data, snapshots, seen);
collectRateLimitSnapshots(value.items, snapshots, seen);
}
function sortedRateLimitKeys(keys: string[]): string[] {
return keys.toSorted((left, right) => {
if (left === CODEX_LIMIT_ID) {
return -1;
}
if (right === CODEX_LIMIT_ID) {
return 1;
}
return left.localeCompare(right);
});
}
function addRateLimitSnapshot(
snapshot: JsonObject,
snapshots: JsonObject[],
seen: Set<string>,
): void {
const signature = [
readNullableString(snapshot, "limitId") ?? "",
readNullableString(snapshot, "limitName") ?? "",
formatWindowSignature(snapshot.primary),
formatWindowSignature(snapshot.secondary),
].join("|");
if (seen.has(signature)) {
return;
}
seen.add(signature);
snapshots.push(snapshot);
}
function isRateLimitSnapshot(value: JsonObject): boolean {
return (
isJsonObject(value.primary) ||
isJsonObject(value.secondary) ||
value.rateLimitReachedType !== undefined ||
value.limitId !== undefined ||
value.limitName !== undefined
);
}
function readRateLimitWindow(
snapshot: JsonObject,
key: LimitWindowKey,
): RateLimitReset | undefined {
const window = snapshot[key];
if (!isJsonObject(window)) {
return undefined;
}
const resetsAt = readNumber(window, "resetsAt");
return {
...(typeof resetsAt === "number" && Number.isFinite(resetsAt) && resetsAt > 0
? { resetsAtMs: resetsAt * 1000 }
: { resetsAtMs: 0 }),
...readOptionalNumberField(window, "usedPercent"),
};
}
function readOptionalNumberField(record: JsonObject, key: string): { usedPercent?: number } {
const value = readNumber(record, key);
return value === undefined ? {} : { usedPercent: value };
}
function formatRateLimitWindow(key: LimitWindowKey, window: RateLimitReset, nowMs: number): string {
const usedPercent =
window.usedPercent === undefined ? "usage unknown" : `${Math.round(window.usedPercent)}%`;
const reset =
window.resetsAtMs > nowMs ? `, resets ${formatResetTime(window.resetsAtMs, nowMs)}` : "";
return `${key} ${usedPercent}${reset}`;
}
function formatLimitLabel(snapshot: JsonObject): string {
const label =
readNullableString(snapshot, "limitName") ?? readNullableString(snapshot, "limitId");
if (!label || label === CODEX_LIMIT_ID) {
return "Codex";
}
return label.replace(/[_-]+/gu, " ").replace(/\s+/gu, " ").trim();
}
function formatReachedType(value: string): string {
return value.replace(/[_-]+/gu, " ").replace(/\s+/gu, " ").trim();
}
function formatResetTime(resetsAtMs: number, nowMs: number): string {
return `in ${formatRelativeDuration(resetsAtMs - nowMs)} (${new Date(resetsAtMs).toISOString()})`;
}
function formatRelativeDuration(durationMs: number): string {
const safeMs = Math.max(1_000, durationMs);
if (safeMs < ONE_MINUTE_MS) {
return `${Math.ceil(safeMs / 1000)} seconds`;
}
if (safeMs < ONE_HOUR_MS) {
const minutes = Math.ceil(safeMs / ONE_MINUTE_MS);
return `${minutes} ${minutes === 1 ? "minute" : "minutes"}`;
}
if (safeMs < ONE_DAY_MS) {
const hours = Math.ceil(safeMs / ONE_HOUR_MS);
return `${hours} ${hours === 1 ? "hour" : "hours"}`;
}
const days = Math.ceil(safeMs / ONE_DAY_MS);
return `${days} ${days === 1 ? "day" : "days"}`;
}
function formatWindowSignature(value: JsonValue | undefined): string {
if (!isJsonObject(value)) {
return "";
}
return `${readNumber(value, "usedPercent") ?? ""}:${readNumber(value, "resetsAt") ?? ""}`;
}
function readString(record: JsonObject, key: string): string | undefined {
const value = record[key];
return typeof value === "string" && value.trim() ? value.trim() : undefined;
}
function readNullableString(record: JsonObject, key: string): string | undefined {
return readString(record, key) ?? undefined;
}
function readNumber(record: JsonObject, key: string): number | undefined {
const value = record[key];
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
function normalizeText(value: string | null | undefined): string | undefined {
const text = value?.trim();
return text ? text : undefined;
}

View File

@@ -24,6 +24,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { CODEX_GPT5_BEHAVIOR_CONTRACT } from "../../prompt-overlay.js";
import * as elicitationBridge from "./elicitation-bridge.js";
import type { CodexServerNotification } from "./protocol.js";
import { rememberCodexRateLimits, resetCodexRateLimitCacheForTests } from "./rate-limit-cache.js";
import { runCodexAppServerAttempt, __testing } from "./run-attempt.js";
import { readCodexAppServerBinding, writeCodexAppServerBinding } from "./session-binding.js";
import { createCodexTestModel } from "./test-support.js";
@@ -126,6 +127,23 @@ function turnStartResult(turnId = "turn-1", status = "inProgress") {
};
}
function rateLimitsUpdated(resetsAt: number): CodexServerNotification {
return {
method: "account/rateLimits/updated",
params: {
rateLimits: {
limitId: "codex",
limitName: "Codex",
primary: { usedPercent: 100, windowDurationMins: 300, resetsAt },
secondary: null,
credits: null,
planType: "plus",
rateLimitReachedType: "rate_limit_reached",
},
},
};
}
function assistantMessage(text: string, timestamp: number) {
return {
role: "assistant" as const,
@@ -351,6 +369,7 @@ describe("runCodexAppServerAttempt", () => {
afterEach(async () => {
__testing.resetCodexAppServerClientFactoryForTests();
resetCodexRateLimitCacheForTests();
nativeHookRelayTesting.clearNativeHookRelaysForTests();
resetAgentEventsForTest();
resetGlobalHookRunner();
@@ -1171,6 +1190,71 @@ describe("runCodexAppServerAttempt", () => {
expect(nativeHookRelayTesting.getNativeHookRelayRegistrationForTests(relayId)).toBeUndefined();
});
it("preserves Codex usage-limit reset details when turn/start fails", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
const resetsAt = Math.ceil(Date.now() / 1000) + 120;
let harness!: ReturnType<typeof createStartedThreadHarness>;
harness = createStartedThreadHarness(async (method) => {
if (method === "turn/start") {
await harness.notify(rateLimitsUpdated(resetsAt));
throw Object.assign(new Error("You've reached your usage limit."), {
data: { codexErrorInfo: "usageLimitExceeded" },
});
}
return undefined;
});
const runError = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir)).catch(
(error: unknown) => error,
);
const error = await runError;
expect(error).toBeInstanceOf(Error);
expect((error as Error).message).toContain(
"You've reached your Codex subscription usage limit.",
);
expect((error as Error).message).toContain("Next reset in");
});
it("uses a recent Codex rate-limit snapshot when turn/start omits reset details", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");
const resetsAt = Math.ceil(Date.now() / 1000) + 120;
rememberCodexRateLimits({
rateLimits: {
limitId: "codex",
limitName: "Codex",
primary: { usedPercent: 100, windowDurationMins: 300, resetsAt },
secondary: null,
credits: null,
planType: "plus",
rateLimitReachedType: "rate_limit_reached",
},
rateLimitsByLimitId: null,
});
const harness = createStartedThreadHarness(async (method) => {
if (method === "turn/start") {
throw Object.assign(new Error("You've reached your usage limit."), {
data: { codexErrorInfo: "usageLimitExceeded" },
});
}
return undefined;
});
const runError = runCodexAppServerAttempt(createParams(sessionFile, workspaceDir)).catch(
(error: unknown) => error,
);
await harness.waitForMethod("turn/start");
const error = await runError;
expect(error).toBeInstanceOf(Error);
expect((error as Error).message).toContain(
"You've reached your Codex subscription usage limit.",
);
expect((error as Error).message).toContain("Next reset in");
});
it("cleans up native hook relay state when the Codex turn aborts", async () => {
const sessionFile = path.join(tempDir, "session.jsonl");
const workspaceDir = path.join(tempDir, "workspace");

View File

@@ -84,6 +84,8 @@ import {
type JsonObject,
type JsonValue,
} from "./protocol.js";
import { readRecentCodexRateLimits, rememberCodexRateLimits } from "./rate-limit-cache.js";
import { formatCodexUsageLimitErrorMessage } from "./rate-limits.js";
import { readCodexAppServerBinding, type CodexAppServerThreadBinding } from "./session-binding.js";
import { readCodexMirroredSessionHistoryMessages } from "./session-history.js";
import { clearSharedCodexAppServerClientIfCurrent } from "./shared-client.js";
@@ -1034,16 +1036,18 @@ export async function runCodexAppServerAttempt(
),
);
} catch (error) {
const usageLimitError = formatCodexTurnStartUsageLimitError(error, pendingNotifications);
const turnStartErrorMessage = usageLimitError ?? formatErrorMessage(error);
emitCodexAppServerEvent(params, {
stream: "codex_app_server.lifecycle",
data: { phase: "turn_start_failed", error: formatErrorMessage(error) },
data: { phase: "turn_start_failed", error: turnStartErrorMessage },
});
trajectoryRecorder?.recordEvent("session.ended", {
status: "error",
threadId: thread.threadId,
timedOut,
aborted: runAbortController.signal.aborted,
promptError: normalizeCodexTrajectoryError(error),
promptError: turnStartErrorMessage,
});
trajectoryEndRecorded = true;
runAgentHarnessLlmOutputHook({
@@ -1065,7 +1069,7 @@ export async function runCodexAppServerAttempt(
event: {
messages: turnStartFailureMessages,
success: false,
error: formatErrorMessage(error),
error: turnStartErrorMessage,
durationMs: Date.now() - attemptStartedAt,
},
ctx: hookContext,
@@ -1083,6 +1087,11 @@ export async function runCodexAppServerAttempt(
},
});
params.abortSignal?.removeEventListener("abort", abortFromUpstream);
if (usageLimitError) {
throw new Error(usageLimitError, {
cause: error,
});
}
throw error;
}
turnId = turn.turn.id;
@@ -1644,6 +1653,76 @@ function readDynamicToolCallParams(
return readCodexDynamicToolCallParams(value);
}
function formatCodexTurnStartUsageLimitError(
error: unknown,
pendingNotifications: CodexServerNotification[],
): string | undefined {
const notificationError = readLatestCodexErrorNotification(pendingNotifications);
const errorPayload = readCodexErrorPayload(error);
return formatCodexUsageLimitErrorMessage({
message: notificationError?.message ?? errorPayload.message ?? formatErrorMessage(error),
codexErrorInfo: notificationError?.codexErrorInfo ?? errorPayload.codexErrorInfo,
rateLimits:
readLatestRateLimitNotificationPayload(pendingNotifications) ??
errorPayload.rateLimits ??
readRecentCodexRateLimits(),
});
}
function readLatestRateLimitNotificationPayload(
notifications: CodexServerNotification[],
): JsonValue | undefined {
for (let index = notifications.length - 1; index >= 0; index -= 1) {
const notification = notifications[index];
if (notification?.method === "account/rateLimits/updated") {
rememberCodexRateLimits(notification.params);
return notification.params;
}
}
return undefined;
}
function readLatestCodexErrorNotification(
notifications: CodexServerNotification[],
): { message?: string; codexErrorInfo?: JsonValue | null } | undefined {
for (let index = notifications.length - 1; index >= 0; index -= 1) {
const notification = notifications[index];
if (notification?.method !== "error" || !isJsonObject(notification.params)) {
continue;
}
const error = notification.params.error;
if (!isJsonObject(error)) {
continue;
}
return {
message: readString(error, "message"),
codexErrorInfo: error.codexErrorInfo,
};
}
return undefined;
}
function readCodexErrorPayload(error: unknown): {
message?: string;
codexErrorInfo?: JsonValue | null;
rateLimits?: JsonValue;
} {
const message = error instanceof Error ? error.message : undefined;
if (!error || typeof error !== "object" || !("data" in error)) {
return { message };
}
const data = (error as { data?: unknown }).data as JsonValue | undefined;
if (!isJsonObject(data)) {
return { message };
}
const nestedError = isJsonObject(data.error) ? data.error : data;
return {
message: readString(nestedError, "message") ?? message,
codexErrorInfo: nestedError.codexErrorInfo,
rateLimits: nestedError.rateLimits ?? data.rateLimits,
};
}
function isTurnNotification(
value: JsonValue | undefined,
threadId: string,

View File

@@ -1,6 +1,7 @@
import type { CodexComputerUseStatus } from "./app-server/computer-use.js";
import type { CodexAppServerModelListResult } from "./app-server/models.js";
import { isJsonObject, type JsonObject, type JsonValue } from "./app-server/protocol.js";
import { summarizeCodexRateLimits } from "./app-server/rate-limits.js";
import type { SafeValue } from "./command-rpc.js";
type CodexStatusProbes = {
@@ -37,7 +38,7 @@ export function formatCodexStatus(probes: CodexStatusProbes): string {
lines.push(
`Rate limits: ${
probes.limits.ok
? summarizeRateLimits(probes.limits.value)
? formatCodexRateLimitSummary(probes.limits.value)
: formatCodexDisplayText(probes.limits.error)
}`,
);
@@ -104,7 +105,9 @@ export function formatAccount(
): string {
return [
`Account: ${account.ok ? formatCodexAccountSummary(account.value) : formatCodexDisplayText(account.error)}`,
`Rate limits: ${limits.ok ? summarizeRateLimits(limits.value) : formatCodexDisplayText(limits.error)}`,
`Rate limits: ${
limits.ok ? formatCodexRateLimitSummary(limits.value) : formatCodexDisplayText(limits.error)
}`,
].join("\n");
}
@@ -276,6 +279,10 @@ function summarizeArrayLike(value: JsonValue | undefined): string {
return `${entries.length}`;
}
function formatCodexRateLimitSummary(value: JsonValue | undefined): string {
return formatCodexDisplayText(summarizeCodexRateLimits(value) ?? summarizeRateLimits(value));
}
function summarizeRateLimits(value: JsonValue | undefined): string {
const entries = extractArray(value);
if (entries.length > 0) {

View File

@@ -9,6 +9,7 @@ import {
import type { CodexComputerUseConfig } from "./app-server/config.js";
import { listAllCodexAppServerModels } from "./app-server/models.js";
import { isJsonObject, type JsonValue } from "./app-server/protocol.js";
import { rememberCodexRateLimits } from "./app-server/rate-limit-cache.js";
import {
clearCodexAppServerBinding,
readCodexAppServerBinding,
@@ -321,6 +322,9 @@ export async function handleCodexSubcommand(
undefined,
),
]);
if (limits.ok) {
rememberCodexRateLimits(limits.value);
}
return { text: formatAccount(account, limits) };
}
return { text: `Unknown Codex command: ${formatCodexDisplayText(subcommand)}\n\n${buildHelp()}` };

View File

@@ -6,6 +6,10 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { CODEX_CONTROL_METHODS } from "./app-server/capabilities.js";
import type { CodexComputerUseStatus } from "./app-server/computer-use.js";
import type { CodexAppServerStartOptions } from "./app-server/config.js";
import {
readRecentCodexRateLimits,
resetCodexRateLimitCacheForTests,
} from "./app-server/rate-limit-cache.js";
import { resetSharedCodexAppServerClientForTests } from "./app-server/shared-client.js";
import {
resetCodexDiagnosticsFeedbackStateForTests,
@@ -101,6 +105,7 @@ describe("codex command", () => {
afterEach(async () => {
resetCodexDiagnosticsFeedbackStateForTests();
resetCodexRateLimitCacheForTests();
resetSharedCodexAppServerClientForTests();
await fs.rm(tempDir, { recursive: true, force: true });
});
@@ -422,10 +427,10 @@ describe("codex command", () => {
});
await expect(handleCodexCommand(createContext("status"), { deps })).resolves.toMatchObject({
text: expect.stringContaining("Rate limits: 1"),
text: expect.stringContaining("Rate limits: Codex: primary 42%"),
});
await expect(handleCodexCommand(createContext("account"), { deps })).resolves.toMatchObject({
text: expect.stringContaining("Rate limits: 1"),
text: expect.stringContaining("Rate limits: Codex: primary 42%"),
});
});
@@ -476,6 +481,7 @@ describe("codex command", () => {
});
it("formats generated account/read responses", async () => {
const resetsAt = Math.ceil(Date.now() / 1000) + 120;
const safeCodexControlRequest = vi
.fn()
.mockResolvedValueOnce({
@@ -485,14 +491,30 @@ describe("codex command", () => {
requiresOpenaiAuth: false,
},
})
.mockResolvedValueOnce({ ok: true, value: { data: [{ name: "primary" }] } });
.mockResolvedValueOnce({
ok: true,
value: {
rateLimits: {
limitId: "codex",
limitName: "Codex",
primary: { usedPercent: 50, windowDurationMins: 300, resetsAt },
secondary: null,
credits: null,
planType: "plus",
rateLimitReachedType: null,
},
rateLimitsByLimitId: null,
},
});
await expect(
handleCodexCommand(createContext("account"), {
deps: createDeps({ safeCodexControlRequest }),
}),
).resolves.toEqual({
text: ["Account: codex@example.com", "Rate limits: 1"].join("\n"),
const result = await handleCodexCommand(createContext("account"), {
deps: createDeps({ safeCodexControlRequest }),
});
expect(result.text).toContain("Account: codex@example.com");
expect(result.text).toContain("Rate limits: Codex: primary 50%, resets in");
expect(readRecentCodexRateLimits()).toMatchObject({
rateLimits: { limitId: "codex" },
});
expect(safeCodexControlRequest).toHaveBeenCalledWith(undefined, CODEX_CONTROL_METHODS.account, {
refreshToken: false,

View File

@@ -2762,6 +2762,37 @@ describe("createTelegramBot", () => {
expect(sendMessageSpy).toHaveBeenCalledTimes(1);
expect(sendMessageSpy.mock.calls[0][1]).toBe("PFX final reply");
});
it("sends Codex usage-limit reset details as the Telegram reply body", async () => {
const codexRateLimitText =
"⚠️ You've reached your Codex subscription usage limit. Next reset in 42 minutes (2026-05-04T21:34:00.000Z). Run /codex account for current usage details.";
replySpy.mockResolvedValue({ text: codexRateLimitText });
loadConfig.mockReturnValue({
channels: {
telegram: { dmPolicy: "open", allowFrom: ["*"] },
},
});
createTelegramBot({ token: "tok" });
const handler = getOnHandler("message") as (ctx: Record<string, unknown>) => Promise<void>;
await handler({
message: {
chat: { id: 5, type: "private" },
text: "hi",
date: 1736380800,
},
me: { username: "openclaw_bot" },
getFile: async () => ({ download: async () => new Uint8Array() }),
});
expect(sendMessageSpy).toHaveBeenCalledTimes(1);
expect(sendMessageSpy.mock.calls[0][0]).toBe("5");
expect(sendMessageSpy.mock.calls[0][1]).toBe(codexRateLimitText);
expect(String(sendMessageSpy.mock.calls[0][1])).not.toContain(
"All models are temporarily rate-limited",
);
});
it("honors threaded replies for replyToMode=first/all", async () => {
for (const [mode, messageId] of [
["first", 101],

View File

@@ -50,6 +50,12 @@ export type ReplyPayload = {
export type ReplyPayloadMetadata = {
assistantMessageIndex?: number;
/**
* Internal OpenClaw notices generated after a runtime/provider failure are
* not assistant source replies. Dispatch may deliver them even when normal
* assistant source replies are message-tool-only; sendPolicy deny still wins.
*/
deliverDespiteSourceReplySuppression?: boolean;
};
const replyPayloadMetadata = new WeakMap<object, ReplyPayloadMetadata>();
@@ -66,3 +72,14 @@ export function setReplyPayloadMetadata<T extends object>(
export function getReplyPayloadMetadata(payload: object): ReplyPayloadMetadata | undefined {
return replyPayloadMetadata.get(payload);
}
export function copyReplyPayloadMetadata<T extends object>(source: object, payload: T): T {
const metadata = getReplyPayloadMetadata(source);
return metadata ? setReplyPayloadMetadata(payload, metadata) : payload;
}
export function markReplyPayloadForSourceSuppressionDelivery<T extends object>(payload: T): T {
return setReplyPayloadMetadata(payload, {
deliverDespiteSourceReplySuppression: true,
});
}

View File

@@ -1,4 +1,5 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { getReplyPayloadMetadata } from "../reply-payload.js";
import type { TemplateContext } from "../templating.js";
import { createTestFollowupRun } from "./agent-runner.test-fixtures.js";
import type { QueueSettings } from "./queue.js";
@@ -201,6 +202,26 @@ describe("runReplyAgent runtime config", () => {
);
});
it("surfaces known pre-run Codex usage-limit failures instead of dropping the reply", async () => {
const { replyParams } = createDirectRuntimeReplyParams({
shouldFollowup: false,
isActive: false,
});
const codexMessage =
"You've reached your Codex subscription usage limit. Codex did not return a reset time for this limit. Run /codex account for current usage details.";
runPreflightCompactionIfNeededMock.mockRejectedValue(new Error(codexMessage));
runMemoryFlushIfNeededMock.mockResolvedValue(undefined);
const result = await runReplyAgent(replyParams);
expect(result).toMatchObject({
text: `⚠️ ${codexMessage}`,
});
expect(result ? getReplyPayloadMetadata(result) : undefined).toMatchObject({
deliverDespiteSourceReplySuppression: true,
});
});
it("does not resolve secrets before the enqueue-followup queue path", async () => {
const { followupRun, resolvedQueue, replyParams } = createDirectRuntimeReplyParams({
shouldFollowup: true,

View File

@@ -3,6 +3,7 @@ import { LiveSessionModelSwitchError } from "../../agents/live-model-switch-erro
import type { SessionEntry } from "../../config/sessions.js";
import type { ModelDefinitionConfig } from "../../config/types.models.js";
import { CommandLaneClearedError, GatewayDrainingError } from "../../process/command-queue.js";
import { getReplyPayloadMetadata } from "../reply-payload.js";
import type { TemplateContext } from "../templating.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
@@ -88,7 +89,8 @@ vi.mock("../../agents/pi-embedded-helpers.js", () => ({
isBillingErrorMessage: () => false,
isLikelyContextOverflowError: () => false,
isOverloadedErrorMessage: (message: string) => /overloaded|capacity/i.test(message),
isRateLimitErrorMessage: () => false,
isRateLimitErrorMessage: (message: string) =>
/rate.limit|too many requests|429|usage limit/i.test(message),
isTransientHttpError: () => false,
sanitizeUserFacingText: (text?: string) => text ?? "",
}));
@@ -2024,6 +2026,98 @@ describe("runAgentTurnWithFallback", () => {
}
});
it("surfaces Codex usage-limit reset details for pure fallback exhaustion", async () => {
const codexMessage =
"You've reached your Codex subscription usage limit. Next reset in 42 minutes (2026-05-04T21:34:00.000Z). Run /codex account for current usage details.";
state.runWithModelFallbackMock.mockRejectedValueOnce(
Object.assign(new Error(`All models failed (1): openai/gpt-5.5: ${codexMessage}`), {
name: "FallbackSummaryError",
attempts: [
{
provider: "openai",
model: "gpt-5.5",
error: codexMessage,
reason: "rate_limit",
},
],
soonestCooldownExpiry: null,
}),
);
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello",
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "telegram",
MessageSid: "msg",
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true,
shouldEmitToolOutput: () => false,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false,
resetSessionAfterRoleOrderingConflict: async () => false,
isHeartbeat: false,
sessionKey: "main",
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off",
});
expect(result.kind).toBe("final");
if (result.kind === "final") {
expect(result.payload.text).toBe(`⚠️ ${codexMessage}`);
expect(result.payload.text).not.toContain("All models failed");
expect(getReplyPayloadMetadata(result.payload)).toMatchObject({
deliverDespiteSourceReplySuppression: true,
});
}
});
it("surfaces direct Codex usage-limit errors when fallback does not wrap one attempt", async () => {
const codexMessage =
"You've reached your Codex subscription usage limit. Codex did not return a reset time for this limit. Run /codex account for current usage details.";
state.runWithModelFallbackMock.mockRejectedValueOnce(new Error(codexMessage));
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
const result = await runAgentTurnWithFallback({
commandBody: "hello",
followupRun: createFollowupRun(),
sessionCtx: {
Provider: "telegram",
MessageSid: "msg",
} as unknown as TemplateContext,
opts: {},
typingSignals: createMockTypingSignaler(),
blockReplyPipeline: null,
blockStreamingEnabled: false,
resolvedBlockStreamingBreak: "message_end",
applyReplyToMode: (payload) => payload,
shouldEmitToolResult: () => true,
shouldEmitToolOutput: () => false,
pendingToolTasks: new Set(),
resetSessionAfterCompactionFailure: async () => false,
resetSessionAfterRoleOrderingConflict: async () => false,
isHeartbeat: false,
sessionKey: "main",
getActiveSessionEntry: () => undefined,
resolvedVerboseLevel: "off",
});
expect(result.kind).toBe("final");
if (result.kind === "final") {
expect(result.payload.text).toBe(`⚠️ ${codexMessage}`);
expect(getReplyPayloadMetadata(result.payload)).toMatchObject({
deliverDespiteSourceReplySuppression: true,
});
}
});
it("surfaces billing guidance for pure billing cooldown fallback exhaustion", async () => {
state.runWithModelFallbackMock.mockRejectedValueOnce(
Object.assign(

View File

@@ -59,6 +59,7 @@ import {
} from "../../utils/message-channel.js";
import { isInternalMessageChannel } from "../../utils/message-channel.js";
import { stripHeartbeatToken } from "../heartbeat.js";
import { markReplyPayloadForSourceSuppressionDelivery } from "../reply-payload.js";
import type { TemplateContext } from "../templating.js";
import type { VerboseLevel } from "../thinking.js";
import {
@@ -300,6 +301,10 @@ function rollbackFallbackSelectionStateIfUnchanged(
* Includes a countdown when the soonest cooldown expiry is known.
*/
function buildRateLimitCooldownMessage(err: unknown): string {
const codexUsageLimitMessage = extractCodexUsageLimitErrorMessage(err);
if (codexUsageLimitMessage) {
return codexUsageLimitMessage;
}
if (!isFallbackSummaryError(err)) {
return "⚠️ All models are temporarily rate-limited. Please try again in a few minutes.";
}
@@ -316,6 +321,44 @@ function buildRateLimitCooldownMessage(err: unknown): string {
return "⚠️ All models are temporarily rate-limited. Please try again in a few minutes.";
}
function extractCodexUsageLimitErrorMessage(err: unknown): string | undefined {
if (isFallbackSummaryError(err)) {
for (const attempt of err.attempts) {
const message = extractCodexUsageLimitMessage(attempt.error);
if (message) {
return `⚠️ ${message}`;
}
}
return undefined;
}
const message = extractCodexUsageLimitMessage(formatErrorMessage(err));
return message ? `⚠️ ${message}` : undefined;
}
function extractCodexUsageLimitMessage(text: string): string | undefined {
const markers = [
"You've reached your Codex subscription usage limit.",
"Codex usage limit reached.",
];
const markerIndex = markers
.map((marker) => text.indexOf(marker))
.filter((index) => index >= 0)
.toSorted((left, right) => left - right)[0];
if (markerIndex === undefined) {
return undefined;
}
const message = sanitizeUserFacingText(text.slice(markerIndex), { errorContext: true })
.split(/\r?\n/u)
.map((line) => line.trim())
.filter(Boolean)
.join(" ")
.trim();
if (!message) {
return undefined;
}
return message.length > 500 ? `${message.slice(0, 497)}...` : message;
}
function isPureTransientRateLimitSummary(err: unknown): boolean {
return (
isFallbackSummaryError(err) &&
@@ -459,6 +502,74 @@ function buildExternalRunFailureReply(
};
}
function markAgentRunFailureReplyPayload<T extends ReplyPayload>(payload: T): T {
return markReplyPayloadForSourceSuppressionDelivery(payload);
}
export function buildKnownAgentRunFailureReplyPayload(params: {
err: unknown;
sessionCtx: TemplateContext;
resolvedVerboseLevel: VerboseLevel | undefined;
}): ReplyPayload | undefined {
const message = formatErrorMessage(params.err);
const isFallbackSummary = isFallbackSummaryError(params.err);
const isBilling = isFallbackSummary
? isPureBillingSummary(params.err)
: isBillingErrorMessage(message);
if (isBilling) {
return markAgentRunFailureReplyPayload({
text: resolveExternalRunFailureTextForConversation({
text: BILLING_ERROR_USER_MESSAGE,
sessionCtx: params.sessionCtx,
isGenericRunnerFailure: false,
}),
});
}
const isPureTransientSummary = isFallbackSummary
? isPureTransientRateLimitSummary(params.err)
: false;
const isRateLimit = isFallbackSummary ? isPureTransientSummary : isRateLimitErrorMessage(message);
const rateLimitOrOverloadedCopy =
!isFallbackSummary || isPureTransientSummary
? formatRateLimitOrOverloadedErrorCopy(message)
: undefined;
if (isRateLimit && !isOverloadedErrorMessage(message)) {
return markAgentRunFailureReplyPayload({
text: resolveExternalRunFailureTextForConversation({
text: buildRateLimitCooldownMessage(params.err),
sessionCtx: params.sessionCtx,
isGenericRunnerFailure: false,
}),
});
}
if (rateLimitOrOverloadedCopy) {
return markAgentRunFailureReplyPayload({
text: resolveExternalRunFailureTextForConversation({
text: rateLimitOrOverloadedCopy,
sessionCtx: params.sessionCtx,
isGenericRunnerFailure: false,
}),
});
}
const externalRunFailureReply = buildExternalRunFailureReply(message, {
includeDetails: isVerboseFailureDetailEnabled(params.resolvedVerboseLevel),
});
if (externalRunFailureReply.isGenericRunnerFailure) {
return undefined;
}
return markAgentRunFailureReplyPayload({
text: resolveExternalRunFailureTextForConversation({
text: externalRunFailureReply.text,
sessionCtx: params.sessionCtx,
isGenericRunnerFailure: false,
}),
});
}
const CONTEXT_OVERFLOW_RESET_HINT =
"\n\nTo prevent this, increase your compaction buffer by setting " +
"`agents.defaults.compaction.reserveTokensFloor` to 20000 or higher in your config.";
@@ -1771,7 +1882,7 @@ export async function runAgentTurnWithFallback(params: {
params.replyOperation?.fail("run_failed", embeddedError);
return {
kind: "final",
payload: {
payload: markAgentRunFailureReplyPayload({
text: buildContextOverflowRecoveryText({
cfg: runtimeConfig,
agentId: params.followupRun.run.agentId,
@@ -1779,7 +1890,7 @@ export async function runAgentTurnWithFallback(params: {
primaryModel: params.followupRun.run.model,
activeSessionEntry: params.getActiveSessionEntry(),
}),
},
}),
};
}
if (embeddedError?.kind === "role_ordering") {
@@ -1788,9 +1899,9 @@ export async function runAgentTurnWithFallback(params: {
params.replyOperation?.fail("run_failed", embeddedError);
return {
kind: "final",
payload: {
payload: markAgentRunFailureReplyPayload({
text: "⚠️ Message ordering conflict. I've reset the conversation - please try again.",
},
}),
};
}
}
@@ -1820,13 +1931,13 @@ export async function runAgentTurnWithFallback(params: {
params.replyOperation?.fail("run_failed", err);
return {
kind: "final",
payload: {
payload: markAgentRunFailureReplyPayload({
text: resolveExternalRunFailureTextForConversation({
text: switchErrorText,
sessionCtx: params.sessionCtx,
isGenericRunnerFailure: !shouldSurfaceToControlUi,
}),
},
}),
};
}
params.followupRun.run.provider = err.provider;
@@ -1852,9 +1963,9 @@ export async function runAgentTurnWithFallback(params: {
if (isReplyOperationRestartAbort(params.replyOperation)) {
return {
kind: "final",
payload: {
payload: markAgentRunFailureReplyPayload({
text: buildRestartLifecycleReplyText(),
},
}),
};
}
@@ -1872,9 +1983,9 @@ export async function runAgentTurnWithFallback(params: {
params.replyOperation?.fail("gateway_draining", restartLifecycleError);
return {
kind: "final",
payload: {
payload: markAgentRunFailureReplyPayload({
text: buildRestartLifecycleReplyText(),
},
}),
};
}
@@ -1882,9 +1993,9 @@ export async function runAgentTurnWithFallback(params: {
params.replyOperation?.fail("command_lane_cleared", restartLifecycleError);
return {
kind: "final",
payload: {
payload: markAgentRunFailureReplyPayload({
text: buildRestartLifecycleReplyText(),
},
}),
};
}
@@ -1897,7 +2008,7 @@ export async function runAgentTurnWithFallback(params: {
params.replyOperation?.fail("run_failed", err);
return {
kind: "final",
payload: {
payload: markAgentRunFailureReplyPayload({
text: buildContextOverflowRecoveryText({
duringCompaction: true,
cfg: runtimeConfig,
@@ -1906,7 +2017,7 @@ export async function runAgentTurnWithFallback(params: {
primaryModel: params.followupRun.run.model,
activeSessionEntry: params.getActiveSessionEntry(),
}),
},
}),
};
}
if (isRoleOrderingError) {
@@ -1915,9 +2026,9 @@ export async function runAgentTurnWithFallback(params: {
params.replyOperation?.fail("run_failed", err);
return {
kind: "final",
payload: {
payload: markAgentRunFailureReplyPayload({
text: "⚠️ Message ordering conflict. I've reset the conversation - please try again.",
},
}),
};
}
}
@@ -1962,9 +2073,9 @@ export async function runAgentTurnWithFallback(params: {
params.replyOperation?.fail("session_corruption_reset", err);
return {
kind: "final",
payload: {
payload: markAgentRunFailureReplyPayload({
text: "⚠️ Session history was corrupted. I've reset the conversation - please try again!",
},
}),
};
}
@@ -2036,9 +2147,9 @@ export async function runAgentTurnWithFallback(params: {
params.replyOperation?.fail("run_failed", err);
return {
kind: "final",
payload: {
payload: markAgentRunFailureReplyPayload({
text: userVisibleFallbackText,
},
}),
};
}
}
@@ -2056,9 +2167,9 @@ export async function runAgentTurnWithFallback(params: {
params.replyOperation?.fail("run_failed", finalEmbeddedError);
return {
kind: "final",
payload: {
payload: markAgentRunFailureReplyPayload({
text: "⚠️ Context overflow — this conversation is too large for the model. Use /new to start a fresh session.",
},
}),
};
}
}
@@ -2093,10 +2204,10 @@ export async function runAgentTurnWithFallback(params: {
: undefined;
if (formattedErrorCandidate) {
runResult.payloads = [
{
markAgentRunFailureReplyPayload({
text: formattedErrorCandidate,
isError: true,
},
}),
];
}
}

View File

@@ -1,6 +1,10 @@
import { describe, expect, it } from "vitest";
import { resetPluginRuntimeStateForTest, setActivePluginRegistry } from "../../plugins/runtime.js";
import { createTestRegistry } from "../../test-utils/channel-plugins.js";
import {
getReplyPayloadMetadata,
markReplyPayloadForSourceSuppressionDelivery,
} from "../reply-payload.js";
import { buildReplyPayloads } from "./agent-runner-payloads.js";
const baseParams = {
@@ -47,6 +51,28 @@ describe("buildReplyPayloads media filter integration", () => {
expect(replyPayloads[0]?.text).toBe("Before\n\n\nAfter");
});
it("preserves internal delivery metadata through final payload normalization", async () => {
const payload = markReplyPayloadForSourceSuppressionDelivery({
text: "⚠️ API rate limit reached.\n[[reply_to_current]]",
});
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
payloads: [payload],
replyToMode: "all",
currentMessageId: "msg-1",
});
expect(replyPayloads).toHaveLength(1);
expect(replyPayloads[0]).toMatchObject({
text: "⚠️ API rate limit reached.",
replyToId: "msg-1",
});
expect(getReplyPayloadMetadata(replyPayloads[0])).toMatchObject({
deliverDespiteSourceReplySuppression: true,
});
});
it("strips media URL from payload when in messagingToolSentMediaUrls", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,

View File

@@ -5,6 +5,7 @@ import { logVerbose } from "../../globals.js";
import { createLazyImportLoader } from "../../shared/lazy-promise.js";
import { stripLegacyBracketToolCallBlocks } from "../../shared/text/assistant-visible-text.js";
import { stripHeartbeatToken } from "../heartbeat.js";
import { copyReplyPayloadMetadata } from "../reply-payload.js";
import type { OriginatingChannelType } from "../templating.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { ReplyPayload, ReplyThreadingPolicy } from "../types.js";
@@ -35,15 +36,16 @@ async function normalizeReplyPayloadMedia(params: {
}
try {
return await params.normalizeMediaPaths(params.payload);
const normalized = await params.normalizeMediaPaths(params.payload);
return copyReplyPayloadMetadata(params.payload, normalized);
} catch (err) {
logVerbose(`reply payload media normalization failed: ${String(err)}`);
return {
return copyReplyPayloadMetadata(params.payload, {
...params.payload,
mediaUrl: undefined,
mediaUrls: undefined,
audioAsVoice: false,
};
});
}
}
@@ -102,7 +104,7 @@ function sanitizeHeartbeatPayload(payload: ReplyPayload): ReplyPayload {
return payload;
}
logVerbose("Stripped legacy tool-call block from heartbeat reply");
return { ...payload, text: cleaned };
return copyReplyPayloadMetadata(payload, { ...payload, text: cleaned });
}
export async function buildReplyPayloads(params: {
@@ -139,7 +141,7 @@ export async function buildReplyPayloads(params: {
}
if (!text || !text.includes("HEARTBEAT_OK")) {
return [{ ...payload, text }];
return [copyReplyPayloadMetadata(payload, { ...payload, text })];
}
const stripped = stripHeartbeatToken(text, { mode: "message" });
if (stripped.didStrip && !didLogHeartbeatStrip) {
@@ -150,7 +152,7 @@ export async function buildReplyPayloads(params: {
if (stripped.shouldSkip && !hasMedia) {
return [];
}
return [{ ...payload, text: stripped.text }];
return [copyReplyPayloadMetadata(payload, { ...payload, text: stripped.text })];
});
const replyTaggedPayloads = (
@@ -275,20 +277,20 @@ export async function buildReplyPayloads(params: {
if (!reply.trimmedText) {
return payload;
}
const textOnlyPayload = {
const textOnlyPayload = copyReplyPayloadMetadata(payload, {
...payload,
mediaUrl: undefined,
mediaUrls: undefined,
audioAsVoice: undefined,
};
});
if (!params.blockReplyPipeline?.hasSentPayload(textOnlyPayload)) {
return payload;
}
return {
return copyReplyPayloadMetadata(payload, {
...payload,
text: undefined,
audioAsVoice: payload.audioAsVoice || undefined,
};
});
};
const contentSuppressedPayloads = shouldDropFinalPayloads
? dedupedPayloads.flatMap((payload) => preserveUnsentMediaAfterBlockStream(payload) ?? [])

View File

@@ -38,11 +38,15 @@ import {
buildFallbackNotice,
resolveFallbackTransition,
} from "../fallback-state.js";
import { markReplyPayloadForSourceSuppressionDelivery } from "../reply-payload.js";
import type { OriginatingChannelType, TemplateContext } from "../templating.js";
import { resolveResponseUsageMode, type VerboseLevel } from "../thinking.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import { runAgentTurnWithFallback } from "./agent-runner-execution.js";
import {
buildKnownAgentRunFailureReplyPayload,
runAgentTurnWithFallback,
} from "./agent-runner-execution.js";
import {
createShouldEmitToolOutput,
createShouldEmitToolResult,
@@ -1141,9 +1145,9 @@ export async function runReplyAgent(params: {
} catch (error) {
if (error instanceof ReplyRunAlreadyActiveError) {
typing.cleanup();
return {
return markReplyPayloadForSourceSuppressionDelivery({
text: "⚠️ Previous run is still shutting down. Please try again in a moment.",
};
});
}
throw error;
}
@@ -1883,24 +1887,39 @@ export async function runReplyAgent(params: {
replyOperation.result?.kind === "aborted" &&
replyOperation.result.code === "aborted_for_restart"
) {
return returnWithQueuedFollowupDrain({
text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.",
});
return returnWithQueuedFollowupDrain(
markReplyPayloadForSourceSuppressionDelivery({
text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.",
}),
);
}
if (replyOperation.result?.kind === "aborted") {
return returnWithQueuedFollowupDrain({ text: SILENT_REPLY_TOKEN });
}
if (error instanceof GatewayDrainingError) {
replyOperation.fail("gateway_draining", error);
return returnWithQueuedFollowupDrain({
text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.",
});
return returnWithQueuedFollowupDrain(
markReplyPayloadForSourceSuppressionDelivery({
text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.",
}),
);
}
if (error instanceof CommandLaneClearedError) {
replyOperation.fail("command_lane_cleared", error);
return returnWithQueuedFollowupDrain({
text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.",
});
return returnWithQueuedFollowupDrain(
markReplyPayloadForSourceSuppressionDelivery({
text: "⚠️ Gateway is restarting. Please wait a few seconds and try again.",
}),
);
}
const knownFailurePayload = buildKnownAgentRunFailureReplyPayload({
err: error,
sessionCtx,
resolvedVerboseLevel,
});
if (knownFailurePayload) {
replyOperation.fail("run_failed", error);
return returnWithQueuedFollowupDrain(knownFailurePayload);
}
replyOperation.fail("run_failed", error);
// Keep the followup queue moving even when an unexpected exception escapes

View File

@@ -25,7 +25,7 @@ import {
} from "../../test-utils/channel-plugins.js";
import { createInternalHookEventPayload } from "../../test-utils/internal-hook-event-payload.js";
import type { MsgContext } from "../templating.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import { setReplyPayloadMetadata, type GetReplyOptions, type ReplyPayload } from "../types.js";
import type { ReplyDispatcher } from "./reply-dispatcher.js";
import { buildTestCtx } from "./test-ctx.js";
@@ -4162,6 +4162,7 @@ describe("before_dispatch hook", () => {
describe("sendPolicy deny — suppress delivery, not processing (#53328)", () => {
beforeEach(() => {
resetInboundDedupe();
sessionStoreMocks.currentEntry = undefined;
sessionBindingMocks.resolveByConversation.mockReset();
sessionBindingMocks.resolveByConversation.mockReturnValue(null);
sessionBindingMocks.touch.mockReset();
@@ -4579,6 +4580,72 @@ describe("sendPolicy deny — suppress delivery, not processing (#53328)", () =>
);
});
it("delivers marked runtime failure notices in message-tool-only mode", async () => {
setNoAbort();
sessionStoreMocks.currentEntry = {
sessionId: "s1",
updatedAt: 0,
sendPolicy: "allow",
};
const dispatcher = createDispatcher();
const failureNotice = setReplyPayloadMetadata(
{ text: "⚠️ You've reached your Codex subscription usage limit." },
{ deliverDespiteSourceReplySuppression: true },
);
const replyResolver = vi.fn(async () => failureNotice satisfies ReplyPayload);
const ctx = buildTestCtx({ SessionKey: "test:session" });
const result = await dispatchReplyFromConfig({
ctx,
cfg: emptyConfig,
dispatcher,
replyResolver,
replyOptions: {
sourceReplyDeliveryMode: "message_tool_only",
},
});
expect(replyResolver).toHaveBeenCalledTimes(1);
expect(result.queuedFinal).toBe(true);
expect(result.sourceReplyDeliveryMode).toBe("message_tool_only");
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith(failureNotice);
expect(dispatcher.sendBlockReply).not.toHaveBeenCalled();
expect(dispatcher.sendToolResult).not.toHaveBeenCalled();
});
it("does not deliver marked runtime failure notices when sendPolicy denies delivery", async () => {
setNoAbort();
sessionStoreMocks.currentEntry = {
sessionId: "s1",
updatedAt: 0,
sendPolicy: "deny",
};
const dispatcher = createDispatcher();
const replyResolver = vi.fn(
async () =>
setReplyPayloadMetadata(
{ text: "⚠️ You've reached your Codex subscription usage limit." },
{ deliverDespiteSourceReplySuppression: true },
) satisfies ReplyPayload,
);
const ctx = buildTestCtx({ SessionKey: "test:session" });
const result = await dispatchReplyFromConfig({
ctx,
cfg: emptyConfig,
dispatcher,
replyResolver,
replyOptions: {
sourceReplyDeliveryMode: "message_tool_only",
},
});
expect(replyResolver).toHaveBeenCalledTimes(1);
expect(result.queuedFinal).toBe(false);
expect(result.sourceReplyDeliveryMode).toBe("message_tool_only");
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
});
it("defaults group/channel turns to message-tool-only source delivery", async () => {
setNoAbort();
const dispatcher = createDispatcher();

View File

@@ -779,6 +779,7 @@ export async function dispatchReplyFromConfig(
sourceReplyDeliveryMode,
suppressAutomaticSourceDelivery,
suppressDelivery,
sendPolicyDenied,
deliverySuppressionReason,
suppressHookUserDelivery,
suppressHookReplyLifecycle,
@@ -1501,29 +1502,36 @@ export async function dispatchReplyFromConfig(
let routedFinalCount = 0;
let attemptedFinalDelivery = false;
let finalDeliveryFailed = false;
const shouldDeliverDespiteSourceReplySuppression = (reply: ReplyPayload) =>
suppressAutomaticSourceDelivery &&
!sendPolicyDenied &&
getReplyPayloadMetadata(reply)?.deliverDespiteSourceReplySuppression === true;
for (const reply of replies) {
// Suppress reasoning payloads from channel delivery — channels using this
// generic dispatch path do not have a dedicated reasoning lane.
if (reply.isReasoning === true) {
continue;
}
if (suppressDelivery && !shouldDeliverDespiteSourceReplySuppression(reply)) {
continue;
}
attemptedFinalDelivery = true;
const finalReply = await sendFinalPayload(reply);
queuedFinal = finalReply.queuedFinal || queuedFinal;
routedFinalCount += finalReply.routedFinalCount;
if (!finalReply.queuedFinal && finalReply.routedFinalCount === 0) {
finalDeliveryFailed = true;
}
}
if (attemptedFinalDelivery && !finalDeliveryFailed) {
await clearPendingFinalDeliveryAfterSuccess({
storePath: sessionStoreEntry.storePath,
sessionKey: sessionStoreEntry.sessionKey ?? sessionKey,
});
}
if (!suppressDelivery) {
for (const reply of replies) {
// Suppress reasoning payloads from channel delivery — channels using this
// generic dispatch path do not have a dedicated reasoning lane.
if (reply.isReasoning === true) {
continue;
}
attemptedFinalDelivery = true;
const finalReply = await sendFinalPayload(reply);
queuedFinal = finalReply.queuedFinal || queuedFinal;
routedFinalCount += finalReply.routedFinalCount;
if (!finalReply.queuedFinal && finalReply.routedFinalCount === 0) {
finalDeliveryFailed = true;
}
}
if (attemptedFinalDelivery && !finalDeliveryFailed) {
await clearPendingFinalDeliveryAfterSuccess({
storePath: sessionStoreEntry.storePath,
sessionKey: sessionStoreEntry.sessionKey ?? sessionKey,
});
}
const ttsMode = resolveConfiguredTtsMode(cfg, {
agentId: sessionAgentId,
channelId: deliveryChannel,

View File

@@ -1,6 +1,6 @@
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
import { logVerbose } from "../../globals.js";
import { getReplyPayloadMetadata, setReplyPayloadMetadata } from "../reply-payload.js";
import { copyReplyPayloadMetadata } from "../reply-payload.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { BlockReplyContext, ReplyPayload, ReplyThreadingPolicy } from "../types.js";
import type { BlockReplyPipeline } from "./block-reply-pipeline.js";
@@ -48,7 +48,7 @@ export function normalizeReplyPayloadDirectives(params: {
const mediaUrl = params.payload.mediaUrl ?? parsed?.mediaUrl ?? mediaUrls?.[0];
return {
payload: {
payload: copyReplyPayloadMetadata(params.payload, {
...params.payload,
text,
mediaUrls,
@@ -57,16 +57,11 @@ export function normalizeReplyPayloadDirectives(params: {
replyToTag: params.payload.replyToTag || parsed?.replyToTag,
replyToCurrent: params.payload.replyToCurrent || parsed?.replyToCurrent,
audioAsVoice: Boolean(params.payload.audioAsVoice || parsed?.audioAsVoice),
},
}),
isSilent: parsed?.isSilent ?? false,
};
}
function carryReplyPayloadMetadata(source: ReplyPayload, target: ReplyPayload): ReplyPayload {
const metadata = getReplyPayloadMetadata(source);
return metadata ? setReplyPayloadMetadata(target, metadata) : target;
}
async function sendDirectBlockReply(params: {
onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => Promise<void> | void;
directlySentBlockKeys: Set<string>;
@@ -130,7 +125,7 @@ export function createBlockReplyDeliveryHandler(params: {
const mediaNormalizedPayload = params.normalizeMediaPaths
? await params.normalizeMediaPaths(normalized.payload)
: normalized.payload;
const blockPayload = carryReplyPayloadMetadata(
const blockPayload = copyReplyPayloadMetadata(
payload,
params.applyReplyToMode(mediaNormalizedPayload),
);

View File

@@ -1,6 +1,7 @@
import type { ReplyToMode } from "../../config/types.js";
import { hasReplyPayloadContent } from "../../interactive/payload.js";
import { normalizeOptionalString } from "../../shared/string-coerce.js";
import { copyReplyPayloadMetadata } from "../reply-payload.js";
import type { OriginatingChannelType } from "../templating.js";
import type { ReplyPayload, ReplyThreadingPolicy } from "../types.js";
import { extractReplyToTag } from "./reply-tags.js";
@@ -42,27 +43,30 @@ function resolveReplyThreadingForPayload(params: {
!implicitReplyToId ||
!allowImplicitReplyToCurrentMessage
? params.payload
: { ...params.payload, replyToId: implicitReplyToId };
: copyReplyPayloadMetadata(params.payload, {
...params.payload,
replyToId: implicitReplyToId,
});
if (typeof resolved.text === "string" && resolved.text.includes("[[")) {
const { cleaned, replyToId, replyToCurrent, hasTag } = extractReplyToTag(
resolved.text,
currentMessageId,
);
resolved = {
resolved = copyReplyPayloadMetadata(resolved, {
...resolved,
text: cleaned ? cleaned : undefined,
replyToId: replyToId ?? resolved.replyToId,
replyToTag: hasTag || resolved.replyToTag,
replyToCurrent: replyToCurrent || resolved.replyToCurrent,
};
});
}
if (resolved.replyToCurrent && !resolved.replyToId && currentMessageId) {
resolved = {
resolved = copyReplyPayloadMetadata(resolved, {
...resolved,
replyToId: currentMessageId,
};
});
}
return resolved;

View File

@@ -4,6 +4,7 @@ import { normalizeAnyChannelId } from "../../channels/registry.js";
import type { ReplyToMode } from "../../config/types.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { normalizeOptionalLowercaseString } from "../../shared/string-coerce.js";
import { copyReplyPayloadMetadata } from "../reply-payload.js";
import type { OriginatingChannelType } from "../templating.js";
import type { ReplyPayload, ReplyThreadingPolicy } from "../types.js";
import { isSingleUseReplyToMode } from "./reply-reference.js";
@@ -107,7 +108,7 @@ export function createReplyToModeFilter(
if (opts.allowExplicitReplyTagsWhenOff && isExplicit && !payload.isCompactionNotice) {
return payload;
}
return { ...payload, replyToId: undefined };
return copyReplyPayloadMetadata(payload, { ...payload, replyToId: undefined });
}
if (mode === "all") {
return payload;
@@ -119,7 +120,7 @@ export function createReplyToModeFilter(
if (payload.isCompactionNotice) {
return payload;
}
return { ...payload, replyToId: undefined };
return copyReplyPayloadMetadata(payload, { ...payload, replyToId: undefined });
}
// Compaction notices are transient status messages — they should be
// threaded (so they appear in-context), but they must not consume the

View File

@@ -4,5 +4,9 @@ export type {
ReplyThreadingPolicy,
TypingPolicy,
} from "./get-reply-options.types.js";
export { setReplyPayloadMetadata } from "./reply-payload.js";
export {
copyReplyPayloadMetadata,
markReplyPayloadForSourceSuppressionDelivery,
setReplyPayloadMetadata,
} from "./reply-payload.js";
export type { ReplyPayload } from "./reply-payload.js";

View File

@@ -5,6 +5,7 @@ import {
createHeartbeatToolResponsePayload,
type HeartbeatToolResponse,
} from "../auto-reply/heartbeat-tool-response.js";
import { markReplyPayloadForSourceSuppressionDelivery } from "../auto-reply/types.js";
import type { OpenClawConfig } from "../config/config.js";
import { runHeartbeatOnce, type HeartbeatDeps } from "./heartbeat-runner.js";
import { installHeartbeatRunnerTestRuntime } from "./heartbeat-runner.test-harness.js";
@@ -184,6 +185,44 @@ describe("runHeartbeatOnce heartbeat response tool", () => {
});
});
it("delivers Codex runtime failure notices during Codex heartbeat message-tool mode", async () => {
await withTempTelegramHeartbeatSandbox(async ({ tmpDir, storePath, replySpy }) => {
const cfg = createConfig({ tmpDir, storePath });
await seedMainSessionStore(storePath, cfg, {
lastChannel: "telegram",
lastProvider: "telegram",
lastTo: TELEGRAM_GROUP,
agentHarnessId: "codex",
});
const usageLimitMessage =
"⚠️ You've reached your Codex subscription usage limit. Next reset in 42 minutes (2026-05-04T21:34:00.000Z). Run /codex account for current usage details.";
replySpy.mockResolvedValue(
markReplyPayloadForSourceSuppressionDelivery({
text: usageLimitMessage,
isError: true,
}),
);
const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1" });
const result = await runHeartbeatOnce({
cfg,
deps: createDeps({ sendTelegram, getReplyFromConfig: replySpy }),
});
const calledOpts = replySpy.mock.calls[0]?.[1] as {
sourceReplyDeliveryMode?: string;
};
expect(result.status).toBe("ran");
expect(calledOpts.sourceReplyDeliveryMode).toBe("message_tool_only");
expect(sendTelegram).toHaveBeenCalledTimes(1);
expect(sendTelegram).toHaveBeenCalledWith(
TELEGRAM_GROUP,
usageLimitMessage,
expect.any(Object),
);
});
});
it("uses the heartbeat response tool prompt for auto-selected Codex model sessions", async () => {
await withTempTelegramHeartbeatSandbox(async ({ tmpDir, storePath, replySpy }) => {
const cfg = createConfig({