diff --git a/CHANGELOG.md b/CHANGELOG.md index 84c25ab5df9..1901b11a174 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,7 @@ Docs: https://docs.openclaw.ai - Cron: honor `cron.maxConcurrentRuns` in the timer loop so due jobs can execute up to the configured parallelism instead of always running serially. (#11595) Thanks @Takhoffman. - Cron/Run: enforce the same per-job timeout guard for manual `cron.run` executions as timer-driven runs, including abort propagation for isolated agent jobs, so forced runs cannot wedge indefinitely. (#23704) Thanks @tkuehnl. - Cron/Startup: enforce per-job timeout guards for startup catch-up replay runs so missed isolated jobs cannot hang indefinitely during gateway boot recovery. +- Cron/Main session: honor abort/timeout signals while retrying `wakeMode=now` heartbeat contention loops so main-target cron runs stop promptly instead of waiting through the full busy-retry window. - Cron/Schedule: for `every` jobs, prefer `lastRunAtMs + everyMs` when still in the future after restarts, then fall back to anchor scheduling for catch-up windows, so NEXT timing matches the last successful cadence. (#22895) Thanks @SidQin-cyber. - Cron/Service: execute manual `cron.run` jobs outside the cron lock (while still persisting started/finished state atomically) so `cron.list` and `cron.status` remain responsive during long forced runs. (#23628) Thanks @dsgraves. - Cron/Timer: keep a watchdog recheck timer armed while `onTimer` is actively executing so the scheduler continues polling even if a due-run tick stalls for an extended period. (#23628) Thanks @dsgraves. diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index c089449eaa2..b761ba12f6f 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -3,12 +3,13 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js"; import * as schedule from "./schedule.js"; import { CronService } from "./service.js"; import { createDeferred, createRunningCronServiceState } from "./service.test-harness.js"; import { computeJobNextRunAtMs } from "./service/jobs.js"; import { createCronServiceState, type CronEvent } from "./service/state.js"; -import { onTimer, runMissedJobs } from "./service/timer.js"; +import { executeJobCore, onTimer, runMissedJobs } from "./service/timer.js"; import type { CronJob, CronJobState } from "./types.js"; const noopLogger = { @@ -859,6 +860,55 @@ describe("Cron issue regressions", () => { expect(job?.state.lastError).toContain("timed out"); }); + it("respects abort signals while retrying main-session wake-now heartbeat runs", async () => { + vi.useRealTimers(); + const abortController = new AbortController(); + const runHeartbeatOnce = vi.fn( + async (): Promise => ({ + status: "skipped", + reason: "requests-in-flight", + }), + ); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + const mainJob: CronJob = { + id: "main-abort", + name: "main abort", + enabled: true, + createdAtMs: Date.now(), + updatedAtMs: Date.now(), + schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "tick" }, + state: {}, + }; + const state = createCronServiceState({ + cronEnabled: true, + storePath: "/tmp/openclaw-cron-abort-test/jobs.json", + log: noopLogger, + nowMs: () => Date.now(), + enqueueSystemEvent, + requestHeartbeatNow, + runHeartbeatOnce, + wakeNowHeartbeatBusyMaxWaitMs: 30, + wakeNowHeartbeatBusyRetryDelayMs: 5, + runIsolatedAgentJob: createDefaultIsolatedRunner(), + }); + + setTimeout(() => { + abortController.abort(); + }, 10); + + const result = await executeJobCore(state, mainJob, abortController.signal); + + expect(result.status).toBe("error"); + expect(result.error).toContain("timed out"); + expect(enqueueSystemEvent).toHaveBeenCalledTimes(1); + expect(runHeartbeatOnce).toHaveBeenCalled(); + expect(requestHeartbeatNow).not.toHaveBeenCalled(); + }); + it("retries cron schedule computation from the next second when the first attempt returns undefined (#17821)", () => { const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z"); const cronJob = createIsolatedRegressionJob({ diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 36e0ff45449..53d154cc439 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -607,8 +607,34 @@ export async function executeJobCore( job: CronJob, abortSignal?: AbortSignal, ): Promise { + const resolveAbortError = () => ({ + status: "error" as const, + error: timeoutErrorMessage(), + }); + const waitWithAbort = async (ms: number) => { + if (!abortSignal) { + await new Promise((resolve) => setTimeout(resolve, ms)); + return; + } + if (abortSignal.aborted) { + return; + } + await new Promise((resolve) => { + const timer = setTimeout(() => { + abortSignal.removeEventListener("abort", onAbort); + resolve(); + }, ms); + const onAbort = () => { + clearTimeout(timer); + abortSignal.removeEventListener("abort", onAbort); + resolve(); + }; + abortSignal.addEventListener("abort", onAbort, { once: true }); + }); + }; + if (abortSignal?.aborted) { - return { status: "error", error: timeoutErrorMessage() }; + return resolveAbortError(); } if (job.sessionTarget === "main") { const text = resolveJobPayloadTextForMain(job); @@ -629,7 +655,6 @@ export async function executeJobCore( }); if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) { const reason = `cron:${job.id}`; - const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); const maxWaitMs = state.deps.wakeNowHeartbeatBusyMaxWaitMs ?? 2 * 60_000; const retryDelayMs = state.deps.wakeNowHeartbeatBusyRetryDelayMs ?? 250; const waitStartedAt = state.deps.nowMs(); @@ -637,7 +662,7 @@ export async function executeJobCore( let heartbeatResult: HeartbeatRunResult; for (;;) { if (abortSignal?.aborted) { - return { status: "error", error: timeoutErrorMessage() }; + return resolveAbortError(); } heartbeatResult = await state.deps.runHeartbeatOnce({ reason, @@ -650,7 +675,13 @@ export async function executeJobCore( ) { break; } + if (abortSignal?.aborted) { + return resolveAbortError(); + } if (state.deps.nowMs() - waitStartedAt > maxWaitMs) { + if (abortSignal?.aborted) { + return resolveAbortError(); + } state.deps.requestHeartbeatNow({ reason, agentId: job.agentId, @@ -658,7 +689,7 @@ export async function executeJobCore( }); return { status: "ok", summary: text }; } - await delay(retryDelayMs); + await waitWithAbort(retryDelayMs); } if (heartbeatResult.status === "ran") { @@ -669,6 +700,9 @@ export async function executeJobCore( return { status: "error", error: heartbeatResult.reason, summary: text }; } } else { + if (abortSignal?.aborted) { + return resolveAbortError(); + } state.deps.requestHeartbeatNow({ reason: `cron:${job.id}`, agentId: job.agentId, @@ -682,7 +716,7 @@ export async function executeJobCore( return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" }; } if (abortSignal?.aborted) { - return { status: "error", error: timeoutErrorMessage() }; + return resolveAbortError(); } const res = await state.deps.runIsolatedAgentJob({