From 5a80be35e9821f602e2ddd86aa4e192cd87e28ab Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 11 May 2026 06:11:00 +0100 Subject: [PATCH] fix: track cron execution milestones --- CHANGELOG.md | 1 + .../codex/src/app-server/run-attempt.test.ts | 16 ++++ .../codex/src/app-server/run-attempt.ts | 78 +++++++++++++++++ src/agents/cli-runner/execute.ts | 8 +- src/agents/cli-runner/types.ts | 4 +- src/agents/pi-embedded-runner/run.ts | 1 + src/agents/pi-embedded-runner/run/attempt.ts | 1 + src/agents/pi-embedded-runner/run/params.ts | 9 ++ ...-embedded-subscribe.handlers.tools.test.ts | 13 ++- .../pi-embedded-subscribe.handlers.tools.ts | 6 ++ .../pi-embedded-subscribe.handlers.types.ts | 1 + src/agents/pi-embedded-subscribe.types.ts | 6 ++ src/cron/service/timer.regression.test.ts | 86 ++++++++++++++++++- src/cron/service/timer.ts | 74 ++++++++++------ src/cron/types.ts | 10 +++ 15 files changed, 278 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7c71851d82..98766e9f055 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ Docs: https://docs.openclaw.ai - Slack: include the bot's own root/parent message in new thread sessions so in-thread replies reach the agent with the parent text the user is responding to, instead of only `reply_to_id` metadata. Fixes #79338. Thanks @sxxtony. - Memory: skip managed dreaming cron reconciliation warnings for ordinary cron and heartbeat hook contexts that cannot manage Gateway cron. (#77027) Thanks @rubencu. +- Cron: treat Codex app-server turn acceptance, CLI process spawn, and tool starts as execution milestones, preventing isolated runs from tripping the early startup watchdog after work has begun. - Yuanbao: bump `openclaw-plugin-yuanbao` to 2.13.1 to support `sourceReplyDeliveryMode: "automatic"` for group chat. (#79814) Thanks @loongfay. - Memory: keep `memory_search` result `corpus` labels aligned with the hit source, so session transcript hits surface as `sessions` and memory-file hits stay `memory`. Fixes #72885. (#71898, #72886) Thanks @rubencu. - Codex app-server: default native plugin app tool approvals to automatic so non-destructive read tools run when destructive actions are disabled. diff --git a/extensions/codex/src/app-server/run-attempt.test.ts b/extensions/codex/src/app-server/run-attempt.test.ts index 4f5edb02d8c..3488ceeeaa1 100644 --- a/extensions/codex/src/app-server/run-attempt.test.ts +++ b/extensions/codex/src/app-server/run-attempt.test.ts @@ -1071,6 +1071,7 @@ describe("runCodexAppServerAttempt", () => { it("emits normalized tool progress around app-server dynamic tool requests", async () => { const harness = createStartedThreadHarness(); const onRunAgentEvent = vi.fn(); + const onExecutionPhase = vi.fn(); const globalAgentEvents: AgentEventPayload[] = []; onAgentEvent((event) => globalAgentEvents.push(event)); const params = createParams( @@ -1078,6 +1079,7 @@ describe("runCodexAppServerAttempt", () => { path.join(tempDir, "workspace"), ); params.onAgentEvent = onRunAgentEvent; + params.onExecutionPhase = onExecutionPhase; const run = runCodexAppServerAttempt(params); await harness.waitForMethod("turn/start"); @@ -1143,6 +1145,20 @@ describe("runCodexAppServerAttempt", () => { expect(globalStartEvent?.runId).toBe("run-1"); expect(globalStartEvent?.sessionKey).toBe("agent:main:session-1"); expect(globalStartEvent?.data.name).toBe("message"); + expect(onExecutionPhase).toHaveBeenCalledWith({ + phase: "turn_accepted", + provider: "codex", + model: "gpt-5.4-codex", + backend: "codex-app-server", + }); + expect(onExecutionPhase).toHaveBeenCalledWith({ + phase: "tool_execution_started", + provider: "codex", + model: "gpt-5.4-codex", + backend: "codex-app-server", + tool: "message", + toolCallId: "call-1", + }); }); it("releases the session when Codex never completes after a dynamic tool response", async () => { diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index 83c1d7b71ec..284ac063ae6 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -99,6 +99,7 @@ import { type CodexDynamicToolSpec, type CodexDynamicToolCallParams, type CodexDynamicToolCallResponse, + type CodexThreadItem, type CodexTurnStartResponse, type JsonObject, type JsonValue, @@ -1067,6 +1068,46 @@ export async function runCodexAppServerAttempt( lifecycleTerminalEmitted = true; }; + const executionPhaseKeys = new Set(); + const emitExecutionPhaseOnce = ( + key: string, + info: Parameters>[0], + ) => { + if (executionPhaseKeys.has(key)) { + return; + } + executionPhaseKeys.add(key); + params.onExecutionPhase?.({ + provider: params.provider, + model: params.modelId, + backend: "codex-app-server", + ...info, + }); + }; + const reportCodexExecutionNotification = (notification: CodexServerNotification) => { + if (notification.method === "turn/started") { + emitExecutionPhaseOnce("turn_accepted", { phase: "turn_accepted" }); + return; + } + if (notification.method === "item/agentMessage/delta") { + emitExecutionPhaseOnce("assistant_output_started", { phase: "assistant_output_started" }); + return; + } + if (notification.method !== "item/started") { + return; + } + const item = readCodexNotificationItem(notification.params); + const tool = item ? codexExecutionToolName(item) : undefined; + if (!item || !tool) { + return; + } + emitExecutionPhaseOnce(`tool:${item.id}`, { + phase: "tool_execution_started", + tool, + itemId: item.id, + }); + }; + const handleNotification = async (notification: CodexServerNotification) => { userInputBridge?.handleNotification(notification); if (!projector || !turnId) { @@ -1082,6 +1123,7 @@ export async function runCodexAppServerAttempt( touchTurnCompletionActivity(`notification:${notification.method}`, { details: describeNotificationActivity(notification), }); + reportCodexExecutionNotification(notification); } if (isCurrentTurnNotification && notification.method === "error") { if (isRetryableErrorNotification(notification.params)) { @@ -1198,6 +1240,11 @@ export async function runCodexAppServerAttempt( tool: call.tool, arguments: call.arguments, }); + emitExecutionPhaseOnce(`tool:${call.callId}`, { + phase: "tool_execution_started", + tool: call.tool, + toolCallId: call.callId, + }); const toolProgressDetailMode = resolveCodexToolProgressDetailMode(params.toolProgressDetail); const toolMeta = inferCodexDynamicToolMeta(call, toolProgressDetailMode); const toolArgs = sanitizeCodexToolArguments(call.arguments); @@ -1371,6 +1418,7 @@ export async function runCodexAppServerAttempt( } turnId = turn.turn.id; const activeTurnId = turn.turn.id; + emitExecutionPhaseOnce("turn_accepted", { phase: "turn_accepted" }); userInputBridge = createCodexUserInputBridge({ paramsForRun: params, threadId: thread.threadId, @@ -2595,6 +2643,36 @@ function isNonEmptyString(value: unknown): value is string { return typeof value === "string" && value.length > 0; } +function readCodexNotificationItem(params: JsonValue | undefined): CodexThreadItem | undefined { + if (!isJsonObject(params) || !isJsonObject(params.item)) { + return undefined; + } + const item = params.item; + return typeof item.id === "string" && typeof item.type === "string" + ? (item as CodexThreadItem) + : undefined; +} + +function codexExecutionToolName(item: CodexThreadItem): string | undefined { + if (item.type === "dynamicToolCall" && typeof item.tool === "string") { + return item.tool; + } + if (item.type === "mcpToolCall" && typeof item.tool === "string") { + const server = typeof item.server === "string" && item.server ? item.server : undefined; + return server ? `${server}.${item.tool}` : item.tool; + } + if (item.type === "commandExecution") { + return "bash"; + } + if (item.type === "fileChange") { + return "apply_patch"; + } + if (item.type === "webSearch") { + return "web_search"; + } + return undefined; +} + function joinPresentSections(...sections: Array): string { return sections.filter((section): section is string => Boolean(section?.trim())).join("\n\n"); } diff --git a/src/agents/cli-runner/execute.ts b/src/agents/cli-runner/execute.ts index 3baea6e59c6..62b6610cd49 100644 --- a/src/agents/cli-runner/execute.ts +++ b/src/agents/cli-runner/execute.ts @@ -449,10 +449,10 @@ export async function executePreparedCliRun( throw new Error("Claude live session requires JSONL streaming parser"); } params.onExecutionPhase?.({ - phase: "model_call_started", + phase: "process_spawned", provider: params.provider, model: context.modelId, - firstModelCallStarted: true, + backend: context.backendResolved.id, }); claudeSkillsPluginCleanupOwned = true; const ownedPreparedBackendCleanup = context.preparedBackend.cleanup; @@ -536,10 +536,10 @@ export async function executePreparedCliRun( let stderrParseExceeded = false; params.onExecutionPhase?.({ - phase: "model_call_started", + phase: "process_spawned", provider: params.provider, model: context.modelId, - firstModelCallStarted: true, + backend: context.backendResolved.id, }); const managedRun = await supervisor.spawn({ sessionId: params.sessionId, diff --git a/src/agents/cli-runner/types.ts b/src/agents/cli-runner/types.ts index 485c8bb2b38..fc8341b7163 100644 --- a/src/agents/cli-runner/types.ts +++ b/src/agents/cli-runner/types.ts @@ -56,9 +56,11 @@ export type RunCliAgentParams = { abortSignal?: AbortSignal; onExecutionStarted?: () => void; onExecutionPhase?: (info: { - phase: "model_call_started"; + phase: "process_spawned" | "model_call_started"; provider?: string; model?: string; + backend?: string; + source?: string; firstModelCallStarted?: boolean; }) => void; replyOperation?: ReplyOperation; diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 282772d9abf..8749ac63a23 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -1277,6 +1277,7 @@ export async function runEmbeddedPiAgent( onReasoningEnd: params.onReasoningEnd, onToolResult: params.onToolResult, onAgentEvent: params.onAgentEvent, + onExecutionPhase: params.onExecutionPhase, extraSystemPrompt: params.extraSystemPrompt, sourceReplyDeliveryMode: params.sourceReplyDeliveryMode, inputProvenance: params.inputProvenance, diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 1584093d31d..862170f7225 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -2662,6 +2662,7 @@ export async function runEmbeddedAttempt( blockReplyChunking: params.blockReplyChunking, onPartialReply: params.onPartialReply, onAssistantMessageStart: params.onAssistantMessageStart, + onExecutionPhase: params.onExecutionPhase, onAgentEvent: params.onAgentEvent, onBeforeLifecycleTerminal: () => { // Clear embedded-run activity before emitting terminal lifecycle events so diff --git a/src/agents/pi-embedded-runner/run/params.ts b/src/agents/pi-embedded-runner/run/params.ts index 10c7aae99eb..1a4f37aa893 100644 --- a/src/agents/pi-embedded-runner/run/params.ts +++ b/src/agents/pi-embedded-runner/run/params.ts @@ -162,9 +162,18 @@ export type RunEmbeddedPiAgentParams = { | "context_engine" | "attempt_dispatch" | "context_assembled" + | "turn_accepted" + | "process_spawned" + | "tool_execution_started" + | "assistant_output_started" | "model_call_started"; provider?: string; model?: string; + backend?: string; + source?: string; + tool?: string; + toolCallId?: string; + itemId?: string; firstModelCallStarted?: boolean; }) => void; replyOperation?: ReplyOperation; diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.test.ts b/src/agents/pi-embedded-subscribe.handlers.tools.test.ts index 521be4a30da..f8de2f0ab73 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.test.ts @@ -23,15 +23,18 @@ function createTestContext(): { warn: ReturnType; onBlockReplyFlush: ReturnType; onAgentEvent: ReturnType; + onExecutionPhase: ReturnType; } { const onBlockReplyFlush = vi.fn(); const onAgentEvent = vi.fn(); + const onExecutionPhase = vi.fn(); const warn = vi.fn(); const ctx: ToolHandlerContext = { params: { runId: "run-test", onBlockReplyFlush, onAgentEvent, + onExecutionPhase, onToolResult: undefined, }, flushBlockReplyBuffer: vi.fn(), @@ -70,7 +73,7 @@ function createTestContext(): { trimMessagingToolSent: vi.fn(), }; - return { ctx, warn, onBlockReplyFlush, onAgentEvent }; + return { ctx, warn, onBlockReplyFlush, onAgentEvent, onExecutionPhase }; } type CapturedAgentEvent = { stream?: string; data?: Record }; @@ -139,7 +142,7 @@ function requireSingleMessagingTarget(ctx: ToolHandlerContext) { describe("handleToolExecutionStart read path checks", () => { it("does not warn when read tool uses file_path alias", async () => { - const { ctx, warn, onBlockReplyFlush } = createTestContext(); + const { ctx, warn, onBlockReplyFlush, onExecutionPhase } = createTestContext(); const evt: ToolExecutionStartEvent = { type: "tool_execution_start", @@ -151,6 +154,12 @@ describe("handleToolExecutionStart read path checks", () => { await handleToolExecutionStart(ctx, evt); expect(onBlockReplyFlush).toHaveBeenCalledTimes(1); + expect(onExecutionPhase).toHaveBeenCalledWith({ + phase: "tool_execution_started", + tool: "read", + toolCallId: "tool-1", + source: "pi-embedded", + }); expect(warn).not.toHaveBeenCalled(); }); diff --git a/src/agents/pi-embedded-subscribe.handlers.tools.ts b/src/agents/pi-embedded-subscribe.handlers.tools.ts index a1b79bd3921..2bd534421f9 100644 --- a/src/agents/pi-embedded-subscribe.handlers.tools.ts +++ b/src/agents/pi-embedded-subscribe.handlers.tools.ts @@ -659,6 +659,12 @@ export function handleToolExecutionStart( const args = evt.args; const runId = ctx.params.runId; ctx.state.toolExecutionSinceLastBlockReply = true; + ctx.params.onExecutionPhase?.({ + phase: "tool_execution_started", + tool: toolName, + toolCallId, + source: "pi-embedded", + }); // Track start time and args for after_tool_call hook. const startedAt = Date.now(); diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index 72f5da73005..90d508b2901 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -185,6 +185,7 @@ type ToolHandlerParams = Pick< | "runId" | "onBlockReplyFlush" | "onAgentEvent" + | "onExecutionPhase" | "onToolResult" | "sessionKey" | "sessionId" diff --git a/src/agents/pi-embedded-subscribe.types.ts b/src/agents/pi-embedded-subscribe.types.ts index 265b4f34193..097142e6c81 100644 --- a/src/agents/pi-embedded-subscribe.types.ts +++ b/src/agents/pi-embedded-subscribe.types.ts @@ -41,6 +41,12 @@ export type SubscribeEmbeddedPiSessionParams = { blockReplyChunking?: BlockReplyChunking; onPartialReply?: (payload: PartialReplyPayload) => void | Promise; onAssistantMessageStart?: () => void | Promise; + onExecutionPhase?: (info: { + phase: "tool_execution_started"; + tool?: string; + toolCallId?: string; + source?: string; + }) => void; onAgentEvent?: (evt: { stream: string; data: Record; diff --git a/src/cron/service/timer.regression.test.ts b/src/cron/service/timer.regression.test.ts index 11b31d5df19..91af2593508 100644 --- a/src/cron/service/timer.regression.test.ts +++ b/src/cron/service/timer.regression.test.ts @@ -1429,7 +1429,7 @@ describe("cron service timer regressions", () => { } }); - it("times out isolated agent runs that stall before the first model call (#74803)", async () => { + it("times out isolated agent runs that stall before execution starts (#74803)", async () => { vi.useFakeTimers(); try { const store = timerRegressionFixtures.makeStorePath(); @@ -1503,7 +1503,7 @@ describe("cron service timer regressions", () => { const job = requireJob(state, "isolated-pre-model-timeout-74803"); expect(abortObserved).toBe(true); expect(job.state.lastStatus).toBe("error"); - expect(job.state.lastError).toContain("stalled before first model call"); + expect(job.state.lastError).toContain("stalled before execution start"); expect(job.state.lastError).toContain("context-engine"); expect(cleanupTimedOutAgentRun).toHaveBeenCalledTimes(1); const cleanupArgs = requireRecord(firstMockArg(cleanupTimedOutAgentRun)); @@ -1517,6 +1517,88 @@ describe("cron service timer regressions", () => { } }); + it("clears the pre-execution watchdog on explicit execution milestones (#80283)", async () => { + vi.useFakeTimers(); + try { + const store = timerRegressionFixtures.makeStorePath(); + const scheduledAt = Date.parse("2026-05-10T09:10:00.000Z"); + const cronJob = createIsolatedRegressionJob({ + id: "isolated-turn-accepted-80283", + name: "turn accepted regression", + scheduledAt, + schedule: { kind: "at", at: new Date(scheduledAt).toISOString() }, + payload: { kind: "agentTurn", message: "work", timeoutSeconds: 1_200 }, + state: { nextRunAtMs: scheduledAt }, + }); + await writeCronJobs(store.storePath, [cronJob]); + + vi.setSystemTime(scheduledAt); + let now = scheduledAt; + const started = createDeferred(); + let abortObserved = false; + const cleanupTimedOutAgentRun = vi.fn(async () => {}); + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeat: vi.fn(), + cleanupTimedOutAgentRun, + runIsolatedAgentJob: vi.fn( + async ({ + abortSignal, + onExecutionStarted, + onExecutionPhase, + }: { + abortSignal?: AbortSignal; + onExecutionStarted?: (info?: CronAgentExecutionStarted) => void; + onExecutionPhase?: (info: CronAgentExecutionPhaseUpdate) => void; + }) => { + onExecutionStarted?.({ + jobId: "isolated-turn-accepted-80283", + phase: "runner_entered", + }); + onExecutionPhase?.({ + jobId: "isolated-turn-accepted-80283", + phase: "turn_accepted", + backend: "codex-app-server", + }); + started.resolve(); + abortSignal?.addEventListener( + "abort", + () => { + abortObserved = true; + }, + { once: true }, + ); + return await new Promise(() => {}); + }, + ), + }); + + const timerPromise = onTimer(state); + await started.promise; + await vi.advanceTimersByTimeAsync(60_100); + now += 60_100; + expect(abortObserved).toBe(false); + expect(cleanupTimedOutAgentRun).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(1_140_000); + now += 1_140_000; + await timerPromise; + + const job = requireJob(state, "isolated-turn-accepted-80283"); + expect(abortObserved).toBe(true); + expect(job.state.lastStatus).toBe("error"); + expect(job.state.lastError).toContain("job execution timed out"); + expect(job.state.lastError).toContain("turn-accepted"); + expect(cleanupTimedOutAgentRun).toHaveBeenCalledTimes(1); + } finally { + vi.useRealTimers(); + } + }); + it("keeps state updates when cron next-run computation throws after a successful run (#30905)", () => { const startedAt = Date.parse("2026-03-02T12:00:00.000Z"); const endedAt = startedAt + 50; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 195af570f96..e452d69b5fd 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -54,8 +54,8 @@ export { DEFAULT_JOB_TIMEOUT_MS } from "./timeout-policy.js"; const MAX_TIMER_DELAY_MS = 60_000; const CRON_TIMEOUT_CLEANUP_GUARD_MS = 20_000; const CRON_AGENT_SETUP_WATCHDOG_MS = 60_000; -const CRON_AGENT_PRE_MODEL_WATCHDOG_MS = 60_000; -const CRON_AGENT_PRE_MODEL_MIN_WATCHDOG_MS = 1_000; +const CRON_AGENT_PRE_EXECUTION_WATCHDOG_MS = 60_000; +const CRON_AGENT_PRE_EXECUTION_MIN_WATCHDOG_MS = 1_000; /** * Minimum gap between consecutive fires of the same cron job. This is a @@ -120,9 +120,10 @@ export async function executeJobCoreWithTimeout( const runAbortController = new AbortController(); let timeoutId: NodeJS.Timeout | undefined; let setupTimeoutId: NodeJS.Timeout | undefined; - let preModelTimeoutId: NodeJS.Timeout | undefined; + let preExecutionTimeoutId: NodeJS.Timeout | undefined; let activeExecution: CronAgentExecutionStarted | undefined; - let modelCallStarted = false; + let runnerStarted = false; + let executionStarted = false; let timeoutReason: string | undefined; const timeoutMarker = Symbol("cron-timeout"); let resolveTimeout: ((value: typeof timeoutMarker) => void) | undefined; @@ -148,11 +149,13 @@ export async function executeJobCoreWithTimeout( } }; const startSetupTimeout = () => { - if (setupTimeoutId) { + if (setupTimeoutId || runnerStarted) { return; } setupTimeoutId = setTimeout(() => { - triggerTimeout(setupTimeoutErrorMessage(activeExecution)); + if (!runnerStarted) { + triggerTimeout(setupTimeoutErrorMessage(activeExecution)); + } }, CRON_AGENT_SETUP_WATCHDOG_MS); }; const clearSetupTimeout = () => { @@ -162,37 +165,38 @@ export async function executeJobCoreWithTimeout( clearTimeout(setupTimeoutId); setupTimeoutId = undefined; }; - const startPreModelTimeout = () => { - if (preModelTimeoutId || modelCallStarted) { + const startPreExecutionTimeout = () => { + if (preExecutionTimeoutId || executionStarted) { return; } - preModelTimeoutId = setTimeout(() => { - if (!modelCallStarted) { - triggerTimeout(preModelTimeoutErrorMessage(activeExecution)); + preExecutionTimeoutId = setTimeout(() => { + if (!executionStarted) { + triggerTimeout(preExecutionTimeoutErrorMessage(activeExecution)); } - }, resolveCronAgentPreModelWatchdogMs(jobTimeoutMs)); + }, resolveCronAgentPreExecutionWatchdogMs(jobTimeoutMs)); }; - const clearPreModelTimeout = () => { - if (!preModelTimeoutId) { + const clearPreExecutionTimeout = () => { + if (!preExecutionTimeoutId) { return; } - clearTimeout(preModelTimeoutId); - preModelTimeoutId = undefined; + clearTimeout(preExecutionTimeoutId); + preExecutionTimeoutId = undefined; }; const noteExecutionProgress = (info?: CronAgentExecutionStarted) => { if (info) { activeExecution = { ...activeExecution, ...info }; - if (info.phase === "model_call_started" || info.firstModelCallStarted) { - modelCallStarted = true; - clearPreModelTimeout(); + if (isCronAgentExecutionStarted(info)) { + executionStarted = true; + clearPreExecutionTimeout(); } } }; const onExecutionStarted = (info?: CronAgentExecutionStarted) => { + runnerStarted = true; noteExecutionProgress(info); clearSetupTimeout(); startTimeout(); - startPreModelTimeout(); + startPreExecutionTimeout(); }; const onExecutionPhase = (info: CronAgentExecutionPhaseUpdate) => { noteExecutionProgress(info); @@ -233,7 +237,7 @@ export async function executeJobCoreWithTimeout( clearTimeout(timeoutId); } clearSetupTimeout(); - clearPreModelTimeout(); + clearPreExecutionTimeout(); } } @@ -288,22 +292,38 @@ function setupTimeoutErrorMessage(execution?: CronAgentExecutionStarted): string return `cron: isolated agent setup timed out before runner start (last phase: ${phase})`; } -function preModelTimeoutErrorMessage(execution?: CronAgentExecutionStarted): string { +function preExecutionTimeoutErrorMessage(execution?: CronAgentExecutionStarted): string { const phase = formatCronAgentExecutionPhase(execution); if (!phase) { - return "cron: isolated agent run stalled before first model call"; + return "cron: isolated agent run stalled before execution start"; } - return `cron: isolated agent run stalled before first model call (last phase: ${phase})`; + return `cron: isolated agent run stalled before execution start (last phase: ${phase})`; } function formatCronAgentExecutionPhase(execution?: CronAgentExecutionStarted): string | undefined { return execution?.phase?.replaceAll("_", "-"); } -function resolveCronAgentPreModelWatchdogMs(jobTimeoutMs: number): number { +function isCronAgentExecutionStarted(info: CronAgentExecutionStarted): boolean { + if (info.firstModelCallStarted) { + return true; + } + switch (info.phase) { + case "turn_accepted": + case "process_spawned": + case "tool_execution_started": + case "assistant_output_started": + case "model_call_started": + return true; + default: + return false; + } +} + +function resolveCronAgentPreExecutionWatchdogMs(jobTimeoutMs: number): number { return Math.max( - CRON_AGENT_PRE_MODEL_MIN_WATCHDOG_MS, - Math.min(CRON_AGENT_PRE_MODEL_WATCHDOG_MS, Math.floor(jobTimeoutMs / 2)), + CRON_AGENT_PRE_EXECUTION_MIN_WATCHDOG_MS, + Math.min(CRON_AGENT_PRE_EXECUTION_WATCHDOG_MS, Math.floor(jobTimeoutMs / 2)), ); } diff --git a/src/cron/types.ts b/src/cron/types.ts index dc4dfb971be..cbf3172eb4a 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -134,6 +134,10 @@ export type CronAgentExecutionPhase = | "context_engine" | "attempt_dispatch" | "context_assembled" + | "turn_accepted" + | "process_spawned" + | "tool_execution_started" + | "assistant_output_started" | "model_call_started"; export type CronAgentExecutionStarted = { @@ -144,6 +148,12 @@ export type CronAgentExecutionStarted = { phase?: CronAgentExecutionPhase; provider?: string; model?: string; + backend?: string; + source?: string; + tool?: string; + toolCallId?: string; + itemId?: string; + /** @deprecated Use phase-specific execution milestones for watchdog progress. */ firstModelCallStarted?: boolean; };