mirror of
https://github.com/moltbot/moltbot.git
synced 2026-03-07 22:44:16 +00:00
Cron: persist manual run marker before unlock (#23993)
* Cron: persist manual run marker before unlock * Cron tests: relax wakeMode now microtask wait after run lock persist
This commit is contained in:
@@ -68,6 +68,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Cron/Delivery: route text-only announce jobs with explicit thread/topic targets through direct outbound delivery so forum/thread destinations do not get dropped by intermediary announce turns. (#23841) Thanks @AndrewArto.
|
||||
- Cron: honor `cron.maxConcurrentRuns` in the timer loop so due jobs can execute up to the configured parallelism instead of always running serially. (#11595) Thanks @Takhoffman.
|
||||
- Cron/Run: enforce the same per-job timeout guard for manual `cron.run` executions as timer-driven runs, including abort propagation for isolated agent jobs, so forced runs cannot wedge indefinitely. (#23704) Thanks @tkuehnl.
|
||||
- Cron/Run: persist the manual-run `runningAtMs` marker before releasing the cron lock so overlapping timer ticks cannot start the same job concurrently.
|
||||
- Cron/Startup: enforce per-job timeout guards for startup catch-up replay runs so missed isolated jobs cannot hang indefinitely during gateway boot recovery.
|
||||
- Cron/Main session: honor abort/timeout signals while retrying `wakeMode=now` heartbeat contention loops so main-target cron runs stop promptly instead of waiting through the full busy-retry window.
|
||||
- Cron/Schedule: for `every` jobs, prefer `lastRunAtMs + everyMs` when still in the future after restarts, then fall back to anchor scheduling for catch-up windows, so NEXT timing matches the last successful cadence. (#22895) Thanks @SidQin-cyber.
|
||||
|
||||
@@ -507,6 +507,64 @@ describe("Cron issue regressions", () => {
|
||||
cron.stop();
|
||||
});
|
||||
|
||||
it("does not double-run a job when cron.run overlaps a due timer tick", async () => {
|
||||
const store = await makeStorePath();
|
||||
const runStarted = createDeferred<void>();
|
||||
const runFinished = createDeferred<void>();
|
||||
const runResolvers: Array<
|
||||
(value: { status: "ok" | "error" | "skipped"; summary?: string; error?: string }) => void
|
||||
> = [];
|
||||
const runIsolatedAgentJob = vi.fn(async () => {
|
||||
if (runIsolatedAgentJob.mock.calls.length === 1) {
|
||||
runStarted.resolve();
|
||||
}
|
||||
return await new Promise<{ status: "ok" | "error" | "skipped"; summary?: string }>(
|
||||
(resolve) => {
|
||||
runResolvers.push(resolve);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
let targetJobId = "";
|
||||
const cron = await startCronForStore({
|
||||
storePath: store.storePath,
|
||||
runIsolatedAgentJob,
|
||||
onEvent: (evt: CronEvent) => {
|
||||
if (evt.jobId === targetJobId && evt.action === "finished") {
|
||||
runFinished.resolve();
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
const dueAt = Date.now() + 100;
|
||||
const job = await cron.add({
|
||||
name: "manual-overlap-no-double-run",
|
||||
enabled: true,
|
||||
schedule: { kind: "at", at: new Date(dueAt).toISOString() },
|
||||
sessionTarget: "isolated",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "agentTurn", message: "overlap" },
|
||||
delivery: { mode: "none" },
|
||||
});
|
||||
targetJobId = job.id;
|
||||
|
||||
const manualRun = cron.run(job.id, "force");
|
||||
await runStarted.promise;
|
||||
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(120);
|
||||
await Promise.resolve();
|
||||
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
|
||||
|
||||
runResolvers[0]?.({ status: "ok", summary: "done" });
|
||||
await manualRun;
|
||||
await runFinished.promise;
|
||||
// Barrier for final persistence before cleanup.
|
||||
await cron.list({ includeDisabled: true });
|
||||
|
||||
cron.stop();
|
||||
});
|
||||
|
||||
it("#13845: one-shot jobs with terminal statuses do not re-fire on restart", async () => {
|
||||
const store = await makeStorePath();
|
||||
const pastAt = Date.parse("2026-02-06T09:00:00.000Z");
|
||||
|
||||
@@ -479,7 +479,9 @@ describe("CronService", () => {
|
||||
const job = await addWakeModeNowMainSystemEventJob(cron, { name: "wakeMode now waits" });
|
||||
|
||||
const runPromise = cron.run(job.id, "force");
|
||||
for (let i = 0; i < 10; i++) {
|
||||
// `cron.run()` now persists the running marker before executing the job.
|
||||
// Allow more microtask turns so the post-lock execution can start.
|
||||
for (let i = 0; i < 500; i++) {
|
||||
if (runHeartbeatOnce.mock.calls.length > 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -226,6 +226,9 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
|
||||
// (`list`, `status`) stay responsive while the run is in progress.
|
||||
job.state.runningAtMs = now;
|
||||
job.state.lastError = undefined;
|
||||
// Persist the running marker before releasing lock so timer ticks that
|
||||
// force-reload from disk cannot start the same job concurrently.
|
||||
await persist(state);
|
||||
emit(state, { jobId: job.id, action: "started", runAtMs: now });
|
||||
const executionJob = JSON.parse(JSON.stringify(job)) as typeof job;
|
||||
return {
|
||||
|
||||
Reference in New Issue
Block a user