fix(cron): start isolated timeout after execution begins

This commit is contained in:
Peter Steinberger
2026-04-27 08:15:45 +01:00
parent 45778a840d
commit 729147dcb5
12 changed files with 146 additions and 31 deletions

View File

@@ -35,6 +35,7 @@ Docs: https://docs.openclaw.ai
- Cron: add `failureAlert.includeSkipped` and `openclaw cron edit --failure-alert-include-skipped` so persistently skipped jobs can alert without counting skips as execution errors or affecting retry backoff. Fixes #60846. Thanks @slideshow-dingo.
- Cron: invalidate stale pending runtime slots after live or offline `jobs.json` schedule edits, while preserving due slots for formatting-only rewrites. Fixes #27996 and #71607; carries forward #71651. Thanks @xialonglee and @fagnersouza666.
- Cron: keep legacy flat `jobs.json` rows loadable while comparing split-state schedule identities, so old cron stores do not crash before in-memory hydration can normalize them. Thanks @codex.
- Cron: start isolated agent-turn execution timeouts after the runner enters its effective execution lane, so queued cron/manual runs no longer spend their whole timeout budget before useful work begins. Fixes #41783. Thanks @ayanesakura and @Hurray0.
- Cron/Telegram: preserve direct-chat thread IDs and optional account IDs when inferring reminder delivery from Telegram direct-thread session keys. Fixes #44270; carries forward #44325, #44351, #44412, and #72657. Thanks @RunMintOn, @arkyu2077, @0xsline, and @vincentkoc.
- Cron: omit synthetic `delivery.resolved` errors from `--no-deliver` run records while preserving explicit no-deliver target traces for agent-initiated messages. Fixes #72210; carries forward #72219. Thanks @hatemclawbot-collab and @xydigit-sj.
- Cron: classify isolated runs as errors from structured embedded-run execution-denial metadata, with final-output marker fallback for `SYSTEM_RUN_DENIED`, `INVALID_REQUEST`, and approval-binding refusals, so blocked commands no longer appear green in cron history. Fixes #67172; carries forward #67186. Thanks @oc-gh-dr, @hclsys, and @1yihui.

View File

@@ -64,6 +64,7 @@ function buildCliHookAssistantMessage(params: {
export async function runCliAgent(params: RunCliAgentParams): Promise<EmbeddedPiRunResult> {
// Cron gate must fire before prepareCliRunContext — that call allocates
// backend resources released only by runPreparedCliAgent's try…finally.
params.onExecutionStarted?.();
if (params.trigger === "cron") {
const startedAt = Date.now();
const hookRunner = getGlobalHookRunner();

View File

@@ -44,6 +44,7 @@ export type RunCliAgentParams = {
agentAccountId?: string;
senderIsOwner?: boolean;
abortSignal?: AbortSignal;
onExecutionStarted?: () => void;
replyOperation?: ReplyOperation;
/**
* Close any long-lived CLI live session created for this run after the run

View File

@@ -322,6 +322,7 @@ export async function runEmbeddedPiAgent(
return enqueueGlobal(async () => {
throwIfAborted();
const started = Date.now();
params.onExecutionStarted?.();
const workspaceResolution = resolveRunWorkspaceDir({
workspaceDir: params.workspaceDir,
sessionKey: params.sessionKey,

View File

@@ -125,6 +125,7 @@ export type RunEmbeddedPiAgentParams = {
timeoutMs: number;
runId: string;
abortSignal?: AbortSignal;
onExecutionStarted?: () => void;
replyOperation?: ReplyOperation;
shouldEmitToolResult?: () => boolean;
shouldEmitToolOutput?: () => boolean;

View File

@@ -87,6 +87,7 @@ export function createCronPromptExecutor(params: {
cronSession: MutableCronSession;
abortSignal?: AbortSignal;
abortReason: () => string;
onExecutionStarted?: () => void;
}) {
const sessionFile =
params.cronSession.sessionEntry.sessionFile?.trim() ||
@@ -145,6 +146,7 @@ export function createCronPromptExecutor(params: {
skillsSnapshot: params.skillsSnapshot,
messageChannel: params.messageChannel,
abortSignal: params.abortSignal,
onExecutionStarted: params.onExecutionStarted,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature,
senderIsOwner: true,
@@ -213,6 +215,7 @@ export function createCronPromptExecutor(params: {
forceMessageTool: params.toolPolicy.forceMessageTool,
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
abortSignal: params.abortSignal,
onExecutionStarted: params.onExecutionStarted,
bootstrapPromptWarningSignaturesSeen,
bootstrapPromptWarningSignature,
});
@@ -273,6 +276,7 @@ export async function executeCronRun(params: {
abortSignal?: AbortSignal;
abortReason: () => string;
isAborted: () => boolean;
onExecutionStarted?: () => void;
thinkLevel: ThinkLevel | undefined;
timeoutMs: number;
suppressExecNotifyOnExit: boolean;
@@ -309,6 +313,7 @@ export async function executeCronRun(params: {
cronSession: params.cronSession,
abortSignal: params.abortSignal,
abortReason: params.abortReason,
onExecutionStarted: params.onExecutionStarted,
});
const runStartedAt = params.runStartedAt ?? Date.now();

View File

@@ -416,6 +416,7 @@ type RunCronAgentTurnParams = {
message: string;
abortSignal?: AbortSignal;
signal?: AbortSignal;
onExecutionStarted?: () => void;
sessionKey: string;
agentId?: string;
lane?: string;
@@ -968,6 +969,7 @@ export async function runCronIsolatedAgentTurn(params: {
message: string;
abortSignal?: AbortSignal;
signal?: AbortSignal;
onExecutionStarted?: () => void;
sessionKey: string;
agentId?: string;
lane?: string;
@@ -1013,6 +1015,7 @@ export async function runCronIsolatedAgentTurn(params: {
commandBody: prepared.context.commandBody,
persistSessionEntry: prepared.context.persistSessionEntry,
abortSignal,
onExecutionStarted: params.onExecutionStarted,
abortReason,
isAborted,
thinkLevel: prepared.context.thinkLevel,

View File

@@ -86,6 +86,7 @@ export type CronServiceDeps = {
job: CronJob;
message: string;
abortSignal?: AbortSignal;
onExecutionStarted?: () => void;
}) => Promise<
{
summary?: string;

View File

@@ -615,6 +615,74 @@ describe("cron service timer regressions", () => {
}
});
it("does not spend isolated execution timeout while waiting for the runner lane (#41783)", async () => {
vi.useFakeTimers();
try {
const store = timerRegressionFixtures.makeStorePath();
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
const cronJob = createIsolatedRegressionJob({
id: "timeout-after-lane-start",
name: "timeout after lane start",
scheduledAt,
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
payload: { kind: "agentTurn", message: "work", timeoutSeconds: FAST_TIMEOUT_SECONDS },
state: { nextRunAtMs: scheduledAt },
});
await writeCronJobs(store.storePath, [cronJob]);
let now = scheduledAt;
const runnerEntered = createDeferred<void>();
const laneAcquired = createDeferred<void>();
let observedAbortSignal: AbortSignal | undefined;
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async ({ abortSignal, onExecutionStarted }) => {
observedAbortSignal = abortSignal;
runnerEntered.resolve();
await laneAcquired.promise;
onExecutionStarted?.();
await new Promise<void>((resolve) => {
if (!abortSignal) {
resolve();
return;
}
if (abortSignal.aborted) {
resolve();
return;
}
abortSignal.addEventListener("abort", () => resolve(), { once: true });
});
now += 5;
return { status: "ok" as const, summary: "late" };
}),
});
const timerPromise = onTimer(state);
await runnerEntered.promise;
await vi.advanceTimersByTimeAsync(Math.ceil(FAST_TIMEOUT_SECONDS * 1_000) + 10);
expect(observedAbortSignal?.aborted).toBe(false);
laneAcquired.resolve();
await Promise.resolve();
expect(observedAbortSignal?.aborted).toBe(false);
await vi.advanceTimersByTimeAsync(Math.ceil(FAST_TIMEOUT_SECONDS * 1_000) + 10);
await timerPromise;
expect(observedAbortSignal?.aborted).toBe(true);
const job = state.store?.jobs.find((entry) => entry.id === "timeout-after-lane-start");
expect(job?.state.lastStatus).toBe("error");
expect(job?.state.lastError).toContain("timed out");
} finally {
vi.useRealTimers();
}
});
it("suppresses isolated follow-up side effects after timeout", async () => {
vi.useFakeTimers();
try {
@@ -981,30 +1049,39 @@ describe("cron service timer regressions", () => {
nowMs: () => now,
enqueueSystemEvent: vi.fn(),
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async ({ abortSignal }: { abortSignal?: AbortSignal }) => {
started.resolve();
await new Promise<void>((resolve) => {
if (!abortSignal) {
resolve();
return;
}
if (abortSignal.aborted) {
abortWallMs = Date.now();
resolve();
return;
}
abortSignal.addEventListener(
"abort",
() => {
runIsolatedAgentJob: vi.fn(
async ({
abortSignal,
onExecutionStarted,
}: {
abortSignal?: AbortSignal;
onExecutionStarted?: () => void;
}) => {
onExecutionStarted?.();
started.resolve();
await new Promise<void>((resolve) => {
if (!abortSignal) {
resolve();
return;
}
if (abortSignal.aborted) {
abortWallMs = Date.now();
resolve();
},
{ once: true },
);
});
now += 5;
return { status: "ok" as const, summary: "done" };
}),
return;
}
abortSignal.addEventListener(
"abort",
() => {
abortWallMs = Date.now();
resolve();
},
{ once: true },
);
});
now += 5;
return { status: "ok" as const, summary: "done" };
},
),
});
const timerPromise = onTimer(state);

View File

@@ -97,15 +97,30 @@ export async function executeJobCoreWithTimeout(
const runAbortController = new AbortController();
let timeoutId: NodeJS.Timeout | undefined;
let rejectTimeout: ((reason?: unknown) => void) | undefined;
const timeoutPromise = new Promise<never>((_, reject) => {
rejectTimeout = reject;
});
const startTimeout = () => {
if (timeoutId) {
return;
}
timeoutId = setTimeout(() => {
runAbortController.abort(timeoutErrorMessage());
rejectTimeout?.(new Error(timeoutErrorMessage()));
}, jobTimeoutMs);
};
const deferTimeoutUntilExecutionStart =
job.sessionTarget !== "main" && job.payload.kind === "agentTurn";
if (!deferTimeoutUntilExecutionStart) {
startTimeout();
}
try {
return await Promise.race([
executeJobCore(state, job, runAbortController.signal),
new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => {
runAbortController.abort(timeoutErrorMessage());
reject(new Error(timeoutErrorMessage()));
}, jobTimeoutMs);
executeJobCore(state, job, runAbortController.signal, {
onExecutionStarted: deferTimeoutUntilExecutionStart ? startTimeout : undefined,
}),
timeoutPromise,
]);
} finally {
if (timeoutId) {
@@ -1178,6 +1193,9 @@ export async function executeJobCore(
state: CronServiceState,
job: CronJob,
abortSignal?: AbortSignal,
options?: {
onExecutionStarted?: () => void;
},
): Promise<
CronRunOutcome &
CronRunTelemetry & {
@@ -1219,7 +1237,7 @@ export async function executeJobCore(
return await executeMainSessionCronJob(state, job, abortSignal, waitWithAbort);
}
return await executeDetachedCronJob(state, job, abortSignal, resolveAbortError);
return await executeDetachedCronJob(state, job, abortSignal, resolveAbortError, options);
}
async function executeMainSessionCronJob(
@@ -1329,6 +1347,9 @@ async function executeDetachedCronJob(
job: CronJob,
abortSignal: AbortSignal | undefined,
resolveAbortError: () => { status: "error"; error: string },
options?: {
onExecutionStarted?: () => void;
},
): Promise<
CronRunOutcome &
CronRunTelemetry & {
@@ -1348,6 +1369,7 @@ async function executeDetachedCronJob(
job,
message: job.payload.message,
abortSignal,
onExecutionStarted: options?.onExecutionStarted,
});
if (abortSignal?.aborted) {

View File

@@ -221,7 +221,7 @@ export function buildGatewayCronService(params: {
deps: { ...params.deps, runtime: defaultRuntime },
});
},
runIsolatedAgentJob: async ({ job, message, abortSignal }) => {
runIsolatedAgentJob: async ({ job, message, abortSignal, onExecutionStarted }) => {
const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
const sessionKey = resolveCronSessionTargetSessionKey(job.sessionTarget) ?? `cron:${job.id}`;
try {
@@ -231,6 +231,7 @@ export function buildGatewayCronService(params: {
job,
message,
abortSignal,
onExecutionStarted,
agentId,
sessionKey,
lane: "cron",

View File

@@ -133,9 +133,10 @@ export function createDefaultIsolatedRunner(): CronServiceDeps["runIsolatedAgent
export function createAbortAwareIsolatedRunner(summary = "late") {
let observedAbortSignal: AbortSignal | undefined;
const started = createDeferred<void>();
const runIsolatedAgentJob = vi.fn(async ({ abortSignal }) => {
const runIsolatedAgentJob = vi.fn(async ({ abortSignal, onExecutionStarted }) => {
observedAbortSignal = abortSignal;
started.resolve();
onExecutionStarted?.();
await new Promise<void>((resolve) => {
if (!abortSignal) {
return;