fix: track cron execution milestones

This commit is contained in:
Peter Steinberger
2026-05-11 06:11:00 +01:00
parent 52b1f58638
commit 5a80be35e9
15 changed files with 278 additions and 36 deletions

View File

@@ -50,6 +50,7 @@ Docs: https://docs.openclaw.ai
- Slack: include the bot's own root/parent message in new thread sessions so in-thread replies reach the agent with the parent text the user is responding to, instead of only `reply_to_id` metadata. Fixes #79338. Thanks @sxxtony.
- Memory: skip managed dreaming cron reconciliation warnings for ordinary cron and heartbeat hook contexts that cannot manage Gateway cron. (#77027) Thanks @rubencu.
- Cron: treat Codex app-server turn acceptance, CLI process spawn, tool starts, and first assistant output as execution milestones, so isolated runs no longer trip the pre-execution startup watchdog after work has begun.
- Yuanbao: bump `openclaw-plugin-yuanbao` to 2.13.1 to support `sourceReplyDeliveryMode: "automatic"` for group chat. (#79814) Thanks @loongfay.
- Memory: keep `memory_search` result `corpus` labels aligned with the hit source, so session transcript hits surface as `sessions` and memory-file hits stay `memory`. Fixes #72885. (#71898, #72886) Thanks @rubencu.
- Codex app-server: default native plugin app tool approvals to automatic so non-destructive read tools run when destructive actions are disabled.

View File

@@ -1071,6 +1071,7 @@ describe("runCodexAppServerAttempt", () => {
it("emits normalized tool progress around app-server dynamic tool requests", async () => {
const harness = createStartedThreadHarness();
const onRunAgentEvent = vi.fn();
const onExecutionPhase = vi.fn();
const globalAgentEvents: AgentEventPayload[] = [];
onAgentEvent((event) => globalAgentEvents.push(event));
const params = createParams(
@@ -1078,6 +1079,7 @@ describe("runCodexAppServerAttempt", () => {
path.join(tempDir, "workspace"),
);
params.onAgentEvent = onRunAgentEvent;
params.onExecutionPhase = onExecutionPhase;
const run = runCodexAppServerAttempt(params);
await harness.waitForMethod("turn/start");
@@ -1143,6 +1145,20 @@ describe("runCodexAppServerAttempt", () => {
expect(globalStartEvent?.runId).toBe("run-1");
expect(globalStartEvent?.sessionKey).toBe("agent:main:session-1");
expect(globalStartEvent?.data.name).toBe("message");
expect(onExecutionPhase).toHaveBeenCalledWith({
phase: "turn_accepted",
provider: "codex",
model: "gpt-5.4-codex",
backend: "codex-app-server",
});
expect(onExecutionPhase).toHaveBeenCalledWith({
phase: "tool_execution_started",
provider: "codex",
model: "gpt-5.4-codex",
backend: "codex-app-server",
tool: "message",
toolCallId: "call-1",
});
});
it("releases the session when Codex never completes after a dynamic tool response", async () => {

View File

@@ -99,6 +99,7 @@ import {
type CodexDynamicToolSpec,
type CodexDynamicToolCallParams,
type CodexDynamicToolCallResponse,
type CodexThreadItem,
type CodexTurnStartResponse,
type JsonObject,
type JsonValue,
@@ -1067,6 +1068,46 @@ export async function runCodexAppServerAttempt(
lifecycleTerminalEmitted = true;
};
// Keys of the execution milestones that have already been forwarded.
const executionPhaseKeys = new Set<string>();
/**
 * Forwards an execution milestone to `params.onExecutionPhase`, at most once per `key`.
 *
 * The provider, model, and backend defaults are spread first, so any of those
 * fields present in `info` take precedence over the defaults.
 *
 * @param key - Dedupe key; repeated calls with the same key are ignored.
 * @param info - Milestone payload handed to the callback.
 */
const emitExecutionPhaseOnce = (
  key: string,
  info: Parameters<NonNullable<EmbeddedRunAttemptParams["onExecutionPhase"]>>[0],
) => {
  if (!executionPhaseKeys.has(key)) {
    // Record the key before invoking the callback so the key stays marked
    // even when the callback throws.
    executionPhaseKeys.add(key);
    params.onExecutionPhase?.({
      provider: params.provider,
      model: params.modelId,
      backend: "codex-app-server",
      ...info,
    });
  }
};
/**
 * Maps a Codex server notification onto an execution milestone, if it represents one.
 *
 * - `turn/started` reports `turn_accepted`.
 * - `item/agentMessage/delta` reports `assistant_output_started`.
 * - `item/started` reports `tool_execution_started` when the item resolves to a tool name.
 *
 * Every other notification method is ignored. Deduplication is delegated to
 * `emitExecutionPhaseOnce`, keyed per phase or per started item id.
 */
const reportCodexExecutionNotification = (notification: CodexServerNotification) => {
  switch (notification.method) {
    case "turn/started":
      emitExecutionPhaseOnce("turn_accepted", { phase: "turn_accepted" });
      return;
    case "item/agentMessage/delta":
      emitExecutionPhaseOnce("assistant_output_started", { phase: "assistant_output_started" });
      return;
    case "item/started": {
      const startedItem = readCodexNotificationItem(notification.params);
      if (!startedItem) {
        return;
      }
      // Items that do not map to a tool name are not reported as milestones.
      const toolName = codexExecutionToolName(startedItem);
      if (!toolName) {
        return;
      }
      emitExecutionPhaseOnce(`tool:${startedItem.id}`, {
        phase: "tool_execution_started",
        tool: toolName,
        itemId: startedItem.id,
      });
      return;
    }
    default:
      return;
  }
};
const handleNotification = async (notification: CodexServerNotification) => {
userInputBridge?.handleNotification(notification);
if (!projector || !turnId) {
@@ -1082,6 +1123,7 @@ export async function runCodexAppServerAttempt(
touchTurnCompletionActivity(`notification:${notification.method}`, {
details: describeNotificationActivity(notification),
});
reportCodexExecutionNotification(notification);
}
if (isCurrentTurnNotification && notification.method === "error") {
if (isRetryableErrorNotification(notification.params)) {
@@ -1198,6 +1240,11 @@ export async function runCodexAppServerAttempt(
tool: call.tool,
arguments: call.arguments,
});
emitExecutionPhaseOnce(`tool:${call.callId}`, {
phase: "tool_execution_started",
tool: call.tool,
toolCallId: call.callId,
});
const toolProgressDetailMode = resolveCodexToolProgressDetailMode(params.toolProgressDetail);
const toolMeta = inferCodexDynamicToolMeta(call, toolProgressDetailMode);
const toolArgs = sanitizeCodexToolArguments(call.arguments);
@@ -1371,6 +1418,7 @@ export async function runCodexAppServerAttempt(
}
turnId = turn.turn.id;
const activeTurnId = turn.turn.id;
emitExecutionPhaseOnce("turn_accepted", { phase: "turn_accepted" });
userInputBridge = createCodexUserInputBridge({
paramsForRun: params,
threadId: thread.threadId,
@@ -2595,6 +2643,36 @@ function isNonEmptyString(value: unknown): value is string {
return typeof value === "string" && value.length > 0;
}
/**
 * Pulls `params.item` out of a notification payload when it looks like a thread item.
 *
 * @param params - Raw notification params; may be any JSON value or `undefined`.
 * @returns The item when `params` and `params.item` are JSON objects and the item
 *   carries string `id` and `type` fields; otherwise `undefined`. No other fields
 *   are validated before the cast.
 */
function readCodexNotificationItem(params: JsonValue | undefined): CodexThreadItem | undefined {
  if (!isJsonObject(params)) {
    return undefined;
  }
  const candidate = params.item;
  if (!isJsonObject(candidate)) {
    return undefined;
  }
  if (typeof candidate.id !== "string" || typeof candidate.type !== "string") {
    return undefined;
  }
  return candidate as CodexThreadItem;
}
/**
 * Resolves the tool name to report for a started thread item.
 *
 * @param item - Thread item taken from an `item/started` notification.
 * @returns The item's own `tool` for dynamic tool calls, `server.tool` (or just `tool`
 *   when no non-empty server is present) for MCP tool calls, a fixed alias for command
 *   execution, file change, and web search items, and `undefined` for anything else.
 */
function codexExecutionToolName(item: CodexThreadItem): string | undefined {
  switch (item.type) {
    case "dynamicToolCall":
      return typeof item.tool === "string" ? item.tool : undefined;
    case "mcpToolCall": {
      if (typeof item.tool !== "string") {
        return undefined;
      }
      // Only a non-empty string server contributes a "server." prefix.
      const prefix =
        typeof item.server === "string" && item.server !== "" ? `${item.server}.` : "";
      return `${prefix}${item.tool}`;
    }
    case "commandExecution":
      return "bash";
    case "fileChange":
      return "apply_patch";
    case "webSearch":
      return "web_search";
    default:
      return undefined;
  }
}
/**
 * Joins the sections that contain non-whitespace text with a blank line between them.
 *
 * Sections that are `undefined`, empty, or whitespace-only are dropped. Kept sections
 * are joined verbatim; they are not trimmed.
 *
 * @param sections - Candidate sections in output order.
 * @returns The kept sections separated by `"\n\n"`, or `""` when none are kept.
 */
function joinPresentSections(...sections: Array<string | undefined>): string {
  const kept: string[] = [];
  for (const candidate of sections) {
    if (candidate?.trim()) {
      kept.push(candidate);
    }
  }
  return kept.join("\n\n");
}

View File

@@ -449,10 +449,10 @@ export async function executePreparedCliRun(
throw new Error("Claude live session requires JSONL streaming parser");
}
params.onExecutionPhase?.({
phase: "model_call_started",
phase: "process_spawned",
provider: params.provider,
model: context.modelId,
firstModelCallStarted: true,
backend: context.backendResolved.id,
});
claudeSkillsPluginCleanupOwned = true;
const ownedPreparedBackendCleanup = context.preparedBackend.cleanup;
@@ -536,10 +536,10 @@ export async function executePreparedCliRun(
let stderrParseExceeded = false;
params.onExecutionPhase?.({
phase: "model_call_started",
phase: "process_spawned",
provider: params.provider,
model: context.modelId,
firstModelCallStarted: true,
backend: context.backendResolved.id,
});
const managedRun = await supervisor.spawn({
sessionId: params.sessionId,

View File

@@ -56,9 +56,11 @@ export type RunCliAgentParams = {
abortSignal?: AbortSignal;
onExecutionStarted?: () => void;
onExecutionPhase?: (info: {
phase: "model_call_started";
phase: "process_spawned" | "model_call_started";
provider?: string;
model?: string;
backend?: string;
source?: string;
firstModelCallStarted?: boolean;
}) => void;
replyOperation?: ReplyOperation;

View File

@@ -1277,6 +1277,7 @@ export async function runEmbeddedPiAgent(
onReasoningEnd: params.onReasoningEnd,
onToolResult: params.onToolResult,
onAgentEvent: params.onAgentEvent,
onExecutionPhase: params.onExecutionPhase,
extraSystemPrompt: params.extraSystemPrompt,
sourceReplyDeliveryMode: params.sourceReplyDeliveryMode,
inputProvenance: params.inputProvenance,

View File

@@ -2662,6 +2662,7 @@ export async function runEmbeddedAttempt(
blockReplyChunking: params.blockReplyChunking,
onPartialReply: params.onPartialReply,
onAssistantMessageStart: params.onAssistantMessageStart,
onExecutionPhase: params.onExecutionPhase,
onAgentEvent: params.onAgentEvent,
onBeforeLifecycleTerminal: () => {
// Clear embedded-run activity before emitting terminal lifecycle events so

View File

@@ -162,9 +162,18 @@ export type RunEmbeddedPiAgentParams = {
| "context_engine"
| "attempt_dispatch"
| "context_assembled"
| "turn_accepted"
| "process_spawned"
| "tool_execution_started"
| "assistant_output_started"
| "model_call_started";
provider?: string;
model?: string;
backend?: string;
source?: string;
tool?: string;
toolCallId?: string;
itemId?: string;
firstModelCallStarted?: boolean;
}) => void;
replyOperation?: ReplyOperation;

View File

@@ -23,15 +23,18 @@ function createTestContext(): {
warn: ReturnType<typeof vi.fn>;
onBlockReplyFlush: ReturnType<typeof vi.fn>;
onAgentEvent: ReturnType<typeof vi.fn>;
onExecutionPhase: ReturnType<typeof vi.fn>;
} {
const onBlockReplyFlush = vi.fn();
const onAgentEvent = vi.fn();
const onExecutionPhase = vi.fn();
const warn = vi.fn();
const ctx: ToolHandlerContext = {
params: {
runId: "run-test",
onBlockReplyFlush,
onAgentEvent,
onExecutionPhase,
onToolResult: undefined,
},
flushBlockReplyBuffer: vi.fn(),
@@ -70,7 +73,7 @@ function createTestContext(): {
trimMessagingToolSent: vi.fn(),
};
return { ctx, warn, onBlockReplyFlush, onAgentEvent };
return { ctx, warn, onBlockReplyFlush, onAgentEvent, onExecutionPhase };
}
type CapturedAgentEvent = { stream?: string; data?: Record<string, unknown> };
@@ -139,7 +142,7 @@ function requireSingleMessagingTarget(ctx: ToolHandlerContext) {
describe("handleToolExecutionStart read path checks", () => {
it("does not warn when read tool uses file_path alias", async () => {
const { ctx, warn, onBlockReplyFlush } = createTestContext();
const { ctx, warn, onBlockReplyFlush, onExecutionPhase } = createTestContext();
const evt: ToolExecutionStartEvent = {
type: "tool_execution_start",
@@ -151,6 +154,12 @@ describe("handleToolExecutionStart read path checks", () => {
await handleToolExecutionStart(ctx, evt);
expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);
expect(onExecutionPhase).toHaveBeenCalledWith({
phase: "tool_execution_started",
tool: "read",
toolCallId: "tool-1",
source: "pi-embedded",
});
expect(warn).not.toHaveBeenCalled();
});

View File

@@ -659,6 +659,12 @@ export function handleToolExecutionStart(
const args = evt.args;
const runId = ctx.params.runId;
ctx.state.toolExecutionSinceLastBlockReply = true;
ctx.params.onExecutionPhase?.({
phase: "tool_execution_started",
tool: toolName,
toolCallId,
source: "pi-embedded",
});
// Track start time and args for after_tool_call hook.
const startedAt = Date.now();

View File

@@ -185,6 +185,7 @@ type ToolHandlerParams = Pick<
| "runId"
| "onBlockReplyFlush"
| "onAgentEvent"
| "onExecutionPhase"
| "onToolResult"
| "sessionKey"
| "sessionId"

View File

@@ -41,6 +41,12 @@ export type SubscribeEmbeddedPiSessionParams = {
blockReplyChunking?: BlockReplyChunking;
onPartialReply?: (payload: PartialReplyPayload) => void | Promise<void>;
onAssistantMessageStart?: () => void | Promise<void>;
onExecutionPhase?: (info: {
phase: "tool_execution_started";
tool?: string;
toolCallId?: string;
source?: string;
}) => void;
onAgentEvent?: (evt: {
stream: string;
data: Record<string, unknown>;

View File

@@ -1429,7 +1429,7 @@ describe("cron service timer regressions", () => {
}
});
it("times out isolated agent runs that stall before the first model call (#74803)", async () => {
it("times out isolated agent runs that stall before execution starts (#74803)", async () => {
vi.useFakeTimers();
try {
const store = timerRegressionFixtures.makeStorePath();
@@ -1503,7 +1503,7 @@ describe("cron service timer regressions", () => {
const job = requireJob(state, "isolated-pre-model-timeout-74803");
expect(abortObserved).toBe(true);
expect(job.state.lastStatus).toBe("error");
expect(job.state.lastError).toContain("stalled before first model call");
expect(job.state.lastError).toContain("stalled before execution start");
expect(job.state.lastError).toContain("context-engine");
expect(cleanupTimedOutAgentRun).toHaveBeenCalledTimes(1);
const cleanupArgs = requireRecord(firstMockArg(cleanupTimedOutAgentRun));
@@ -1517,6 +1517,88 @@ describe("cron service timer regressions", () => {
}
});
// Regression test: once the runner reports a `turn_accepted` phase, the run must
// not be aborted shortly after 60s; it should only end when the much later
// job-level timeout fires.
it("clears the pre-execution watchdog on explicit execution milestones (#80283)", async () => {
vi.useFakeTimers();
try {
const store = timerRegressionFixtures.makeStorePath();
const scheduledAt = Date.parse("2026-05-10T09:10:00.000Z");
const cronJob = createIsolatedRegressionJob({
id: "isolated-turn-accepted-80283",
name: "turn accepted regression",
scheduledAt,
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
// NOTE(review): timeoutSeconds: 1_200 is presumably the overall job timeout
// (1_200_000 ms) that the second timer advance below is sized against — confirm.
payload: { kind: "agentTurn", message: "work", timeoutSeconds: 1_200 },
state: { nextRunAtMs: scheduledAt },
});
await writeCronJobs(store.storePath, [cronJob]);
vi.setSystemTime(scheduledAt);
// `now` backs the injected `nowMs` clock and is advanced by hand alongside the fake timers.
let now = scheduledAt;
const started = createDeferred<void>();
let abortObserved = false;
const cleanupTimedOutAgentRun = vi.fn(async () => {});
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeat: vi.fn(),
cleanupTimedOutAgentRun,
runIsolatedAgentJob: vi.fn(
async ({
abortSignal,
onExecutionStarted,
onExecutionPhase,
}: {
abortSignal?: AbortSignal;
onExecutionStarted?: (info?: CronAgentExecutionStarted) => void;
onExecutionPhase?: (info: CronAgentExecutionPhaseUpdate) => void;
}) => {
// Report runner entry first, then the explicit execution milestone under test.
onExecutionStarted?.({
jobId: "isolated-turn-accepted-80283",
phase: "runner_entered",
});
onExecutionPhase?.({
jobId: "isolated-turn-accepted-80283",
phase: "turn_accepted",
backend: "codex-app-server",
});
started.resolve();
abortSignal?.addEventListener(
"abort",
() => {
abortObserved = true;
},
{ once: true },
);
// Never settles, so the fake run itself can never complete on its own.
return await new Promise<never>(() => {});
},
),
});
const timerPromise = onTimer(state);
await started.promise;
// Advance just past 60s: the run must not have been aborted or cleaned up yet.
await vi.advanceTimersByTimeAsync(60_100);
now += 60_100;
expect(abortObserved).toBe(false);
expect(cleanupTimedOutAgentRun).not.toHaveBeenCalled();
// Advance a further 1_140_000 ms (1_200_100 ms elapsed in total) so the run is
// eventually aborted with the job execution timeout.
await vi.advanceTimersByTimeAsync(1_140_000);
now += 1_140_000;
await timerPromise;
const job = requireJob(state, "isolated-turn-accepted-80283");
expect(abortObserved).toBe(true);
expect(job.state.lastStatus).toBe("error");
expect(job.state.lastError).toContain("job execution timed out");
// The error is expected to name the last reported phase in hyphenated form.
expect(job.state.lastError).toContain("turn-accepted");
expect(cleanupTimedOutAgentRun).toHaveBeenCalledTimes(1);
} finally {
// Always restore real timers so later tests are unaffected.
vi.useRealTimers();
}
});
it("keeps state updates when cron next-run computation throws after a successful run (#30905)", () => {
const startedAt = Date.parse("2026-03-02T12:00:00.000Z");
const endedAt = startedAt + 50;

View File

@@ -54,8 +54,8 @@ export { DEFAULT_JOB_TIMEOUT_MS } from "./timeout-policy.js";
const MAX_TIMER_DELAY_MS = 60_000;
const CRON_TIMEOUT_CLEANUP_GUARD_MS = 20_000;
const CRON_AGENT_SETUP_WATCHDOG_MS = 60_000;
const CRON_AGENT_PRE_MODEL_WATCHDOG_MS = 60_000;
const CRON_AGENT_PRE_MODEL_MIN_WATCHDOG_MS = 1_000;
const CRON_AGENT_PRE_EXECUTION_WATCHDOG_MS = 60_000;
const CRON_AGENT_PRE_EXECUTION_MIN_WATCHDOG_MS = 1_000;
/**
* Minimum gap between consecutive fires of the same cron job. This is a
@@ -120,9 +120,10 @@ export async function executeJobCoreWithTimeout(
const runAbortController = new AbortController();
let timeoutId: NodeJS.Timeout | undefined;
let setupTimeoutId: NodeJS.Timeout | undefined;
let preModelTimeoutId: NodeJS.Timeout | undefined;
let preExecutionTimeoutId: NodeJS.Timeout | undefined;
let activeExecution: CronAgentExecutionStarted | undefined;
let modelCallStarted = false;
let runnerStarted = false;
let executionStarted = false;
let timeoutReason: string | undefined;
const timeoutMarker = Symbol("cron-timeout");
let resolveTimeout: ((value: typeof timeoutMarker) => void) | undefined;
@@ -148,11 +149,13 @@ export async function executeJobCoreWithTimeout(
}
};
const startSetupTimeout = () => {
if (setupTimeoutId) {
if (setupTimeoutId || runnerStarted) {
return;
}
setupTimeoutId = setTimeout(() => {
triggerTimeout(setupTimeoutErrorMessage(activeExecution));
if (!runnerStarted) {
triggerTimeout(setupTimeoutErrorMessage(activeExecution));
}
}, CRON_AGENT_SETUP_WATCHDOG_MS);
};
const clearSetupTimeout = () => {
@@ -162,37 +165,38 @@ export async function executeJobCoreWithTimeout(
clearTimeout(setupTimeoutId);
setupTimeoutId = undefined;
};
const startPreModelTimeout = () => {
if (preModelTimeoutId || modelCallStarted) {
const startPreExecutionTimeout = () => {
if (preExecutionTimeoutId || executionStarted) {
return;
}
preModelTimeoutId = setTimeout(() => {
if (!modelCallStarted) {
triggerTimeout(preModelTimeoutErrorMessage(activeExecution));
preExecutionTimeoutId = setTimeout(() => {
if (!executionStarted) {
triggerTimeout(preExecutionTimeoutErrorMessage(activeExecution));
}
}, resolveCronAgentPreModelWatchdogMs(jobTimeoutMs));
}, resolveCronAgentPreExecutionWatchdogMs(jobTimeoutMs));
};
const clearPreModelTimeout = () => {
if (!preModelTimeoutId) {
const clearPreExecutionTimeout = () => {
if (!preExecutionTimeoutId) {
return;
}
clearTimeout(preModelTimeoutId);
preModelTimeoutId = undefined;
clearTimeout(preExecutionTimeoutId);
preExecutionTimeoutId = undefined;
};
const noteExecutionProgress = (info?: CronAgentExecutionStarted) => {
if (info) {
activeExecution = { ...activeExecution, ...info };
if (info.phase === "model_call_started" || info.firstModelCallStarted) {
modelCallStarted = true;
clearPreModelTimeout();
if (isCronAgentExecutionStarted(info)) {
executionStarted = true;
clearPreExecutionTimeout();
}
}
};
const onExecutionStarted = (info?: CronAgentExecutionStarted) => {
runnerStarted = true;
noteExecutionProgress(info);
clearSetupTimeout();
startTimeout();
startPreModelTimeout();
startPreExecutionTimeout();
};
const onExecutionPhase = (info: CronAgentExecutionPhaseUpdate) => {
noteExecutionProgress(info);
@@ -233,7 +237,7 @@ export async function executeJobCoreWithTimeout(
clearTimeout(timeoutId);
}
clearSetupTimeout();
clearPreModelTimeout();
clearPreExecutionTimeout();
}
}
@@ -288,22 +292,38 @@ function setupTimeoutErrorMessage(execution?: CronAgentExecutionStarted): string
return `cron: isolated agent setup timed out before runner start (last phase: ${phase})`;
}
function preModelTimeoutErrorMessage(execution?: CronAgentExecutionStarted): string {
function preExecutionTimeoutErrorMessage(execution?: CronAgentExecutionStarted): string {
const phase = formatCronAgentExecutionPhase(execution);
if (!phase) {
return "cron: isolated agent run stalled before first model call";
return "cron: isolated agent run stalled before execution start";
}
return `cron: isolated agent run stalled before first model call (last phase: ${phase})`;
return `cron: isolated agent run stalled before execution start (last phase: ${phase})`;
}
/**
 * Renders the execution's phase for error messages by swapping underscores for hyphens.
 *
 * @param execution - Last known execution info, if any.
 * @returns The hyphenated phase, or `undefined` when there is no execution or no phase.
 */
function formatCronAgentExecutionPhase(execution?: CronAgentExecutionStarted): string | undefined {
  const phase = execution?.phase;
  if (phase == null) {
    return undefined;
  }
  return phase.replaceAll("_", "-");
}
function resolveCronAgentPreModelWatchdogMs(jobTimeoutMs: number): number {
/**
 * Decides whether an execution update counts as "execution has started".
 *
 * @param info - Execution update reported by the runner.
 * @returns `true` when the legacy `firstModelCallStarted` flag is set or the phase is
 *   one of `turn_accepted`, `process_spawned`, `tool_execution_started`,
 *   `assistant_output_started`, or `model_call_started`; `false` for any other
 *   phase or when no phase is given.
 */
function isCronAgentExecutionStarted(info: CronAgentExecutionStarted): boolean {
  const phase = info.phase;
  return (
    Boolean(info.firstModelCallStarted) ||
    phase === "turn_accepted" ||
    phase === "process_spawned" ||
    phase === "tool_execution_started" ||
    phase === "assistant_output_started" ||
    phase === "model_call_started"
  );
}
function resolveCronAgentPreExecutionWatchdogMs(jobTimeoutMs: number): number {
return Math.max(
CRON_AGENT_PRE_MODEL_MIN_WATCHDOG_MS,
Math.min(CRON_AGENT_PRE_MODEL_WATCHDOG_MS, Math.floor(jobTimeoutMs / 2)),
CRON_AGENT_PRE_EXECUTION_MIN_WATCHDOG_MS,
Math.min(CRON_AGENT_PRE_EXECUTION_WATCHDOG_MS, Math.floor(jobTimeoutMs / 2)),
);
}

View File

@@ -134,6 +134,10 @@ export type CronAgentExecutionPhase =
| "context_engine"
| "attempt_dispatch"
| "context_assembled"
| "turn_accepted"
| "process_spawned"
| "tool_execution_started"
| "assistant_output_started"
| "model_call_started";
export type CronAgentExecutionStarted = {
@@ -144,6 +148,12 @@ export type CronAgentExecutionStarted = {
phase?: CronAgentExecutionPhase;
provider?: string;
model?: string;
backend?: string;
source?: string;
tool?: string;
toolCallId?: string;
itemId?: string;
/** @deprecated Use phase-specific execution milestones for watchdog progress. */
firstModelCallStarted?: boolean;
};