From 40aff672c1461db81227da75f2b5794c8c7eb176 Mon Sep 17 00:00:00 2001 From: Joseph Krug Date: Thu, 12 Feb 2026 23:30:21 -0400 Subject: [PATCH] fix: prevent heartbeat scheduler silent death from wake handler race (#15108) Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: fd7165b93547251c48904fa60b4b608d96bfb65c Co-authored-by: joeykrug <5925937+joeykrug@users.noreply.github.com> Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com> Reviewed-by: @gumadeiras --- CHANGELOG.md | 1 + src/infra/heartbeat-runner.scheduler.test.ts | 51 +++++++ src/infra/heartbeat-runner.ts | 15 +- src/infra/heartbeat-wake.test.ts | 100 +++++++++++++- src/infra/heartbeat-wake.ts | 137 ++++++++++++++++--- 5 files changed, 282 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 01a1791696b..7982c1421e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Docs: https://docs.openclaw.ai - Onboarding/CLI: restore terminal state without resuming paused `stdin`, so onboarding exits cleanly after choosing Web UI and the installer returns instead of appearing stuck. - macOS Voice Wake: fix a crash in trigger trimming for CJK/Unicode transcripts by matching and slicing on original-string ranges instead of transformed-string indices. (#11052) Thanks @Flash-LHR. +- Heartbeat: prevent scheduler silent-death races during runner reloads, preserve retry cooldown backoff under wake bursts, and prioritize user/action wake causes over interval/retry reasons when coalescing. (#15108) Thanks @joeykrug. 
## 2026.2.12 diff --git a/src/infra/heartbeat-runner.scheduler.test.ts b/src/infra/heartbeat-runner.scheduler.test.ts index e1923371ac0..ba560826cfe 100644 --- a/src/infra/heartbeat-runner.scheduler.test.ts +++ b/src/infra/heartbeat-runner.scheduler.test.ts @@ -87,6 +87,57 @@ describe("startHeartbeatRunner", () => { runner.stop(); }); + it("cleanup is idempotent and does not clear a newer runner's handler", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date(0)); + + const runSpy1 = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + const runSpy2 = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + + const cfg = { + agents: { defaults: { heartbeat: { every: "30m" } } }, + } as OpenClawConfig; + + // Start runner A + const runnerA = startHeartbeatRunner({ cfg, runOnce: runSpy1 }); + + // Start runner B (simulates lifecycle reload) + const runnerB = startHeartbeatRunner({ cfg, runOnce: runSpy2 }); + + // Stop runner A (stale cleanup) — should NOT kill runner B's handler + runnerA.stop(); + + // Runner B should still fire + await vi.advanceTimersByTimeAsync(30 * 60_000 + 1_000); + expect(runSpy2).toHaveBeenCalledTimes(1); + expect(runSpy1).not.toHaveBeenCalled(); + + // Double-stop should be safe (idempotent) + runnerA.stop(); + + runnerB.stop(); + }); + + it("run() returns skipped when runner is stopped", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date(0)); + + const runSpy = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + + const runner = startHeartbeatRunner({ + cfg: { + agents: { defaults: { heartbeat: { every: "30m" } } }, + } as OpenClawConfig, + runOnce: runSpy, + }); + + runner.stop(); + + // After stopping, no heartbeats should fire + await vi.advanceTimersByTimeAsync(60 * 60_000); + expect(runSpy).not.toHaveBeenCalled(); + }); + it("reschedules timer when runOnce returns requests-in-flight", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date(0)); diff --git 
a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index cec770f24f5..fe5783fd0e0 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -880,6 +880,7 @@ export function startHeartbeatRunner(opts: { } const delay = Math.max(0, nextDue - now); state.timer = setTimeout(() => { + state.timer = null; requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); }, delay); state.timer.unref?.(); @@ -933,6 +934,12 @@ export function startHeartbeatRunner(opts: { }; const run: HeartbeatWakeHandler = async (params) => { + if (state.stopped) { + return { + status: "skipped", + reason: "disabled", + } satisfies HeartbeatRunResult; + } if (!heartbeatsEnabled) { return { status: "skipped", @@ -994,12 +1001,16 @@ export function startHeartbeatRunner(opts: { return { status: "skipped", reason: isInterval ? "not-due" : "disabled" }; }; - setHeartbeatWakeHandler(async (params) => run({ reason: params.reason })); + const wakeHandler: HeartbeatWakeHandler = async (params) => run({ reason: params.reason }); + const disposeWakeHandler = setHeartbeatWakeHandler(wakeHandler); updateConfig(state.cfg); const cleanup = () => { + if (state.stopped) { + return; + } state.stopped = true; - setHeartbeatWakeHandler(null); + disposeWakeHandler(); if (state.timer) { clearTimeout(state.timer); } diff --git a/src/infra/heartbeat-wake.test.ts b/src/infra/heartbeat-wake.test.ts index cd703ed4069..58d24556672 100644 --- a/src/infra/heartbeat-wake.test.ts +++ b/src/infra/heartbeat-wake.test.ts @@ -28,7 +28,7 @@ describe("heartbeat-wake", () => { await vi.advanceTimersByTimeAsync(1); expect(handler).toHaveBeenCalledTimes(1); - expect(handler).toHaveBeenCalledWith({ reason: "retry" }); + expect(handler).toHaveBeenCalledWith({ reason: "exec-event" }); expect(wake.hasPendingHeartbeatWake()).toBe(false); }); @@ -54,6 +54,29 @@ describe("heartbeat-wake", () => { expect(handler.mock.calls[1]?.[0]).toEqual({ reason: "interval" }); }); + it("keeps retry cooldown even when a 
sooner request arrives", async () => { + vi.useFakeTimers(); + const wake = await loadWakeModule(); + const handler = vi + .fn() + .mockResolvedValueOnce({ status: "skipped", reason: "requests-in-flight" }) + .mockResolvedValueOnce({ status: "ran", durationMs: 1 }); + wake.setHeartbeatWakeHandler(handler); + + wake.requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); + await vi.advanceTimersByTimeAsync(1); + expect(handler).toHaveBeenCalledTimes(1); + + // Retry is now waiting for 1000ms. This should not preempt cooldown. + wake.requestHeartbeatNow({ reason: "hook:wake", coalesceMs: 0 }); + await vi.advanceTimersByTimeAsync(998); + expect(handler).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(1); + expect(handler).toHaveBeenCalledTimes(2); + expect(handler.mock.calls[1]?.[0]).toEqual({ reason: "hook:wake" }); + }); + it("retries thrown handler errors after the default retry delay", async () => { vi.useFakeTimers(); const wake = await loadWakeModule(); @@ -76,6 +99,81 @@ describe("heartbeat-wake", () => { expect(handler.mock.calls[1]?.[0]).toEqual({ reason: "exec-event" }); }); + it("stale disposer does not clear a newer handler", async () => { + vi.useFakeTimers(); + const wake = await loadWakeModule(); + const handlerA = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + const handlerB = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + + // Runner A registers its handler + const disposeA = wake.setHeartbeatWakeHandler(handlerA); + + // Runner B registers its handler (replaces A) + const disposeB = wake.setHeartbeatWakeHandler(handlerB); + + // Runner A's stale cleanup runs — should NOT clear handlerB + disposeA(); + expect(wake.hasHeartbeatWakeHandler()).toBe(true); + + // handlerB should still work + wake.requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); + await vi.advanceTimersByTimeAsync(1); + expect(handlerB).toHaveBeenCalledTimes(1); + expect(handlerA).not.toHaveBeenCalled(); + + // Runner B's 
dispose should work + disposeB(); + expect(wake.hasHeartbeatWakeHandler()).toBe(false); + }); + + it("preempts existing timer when a sooner schedule is requested", async () => { + vi.useFakeTimers(); + const wake = await loadWakeModule(); + const handler = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + wake.setHeartbeatWakeHandler(handler); + + // Schedule for 5 seconds from now + wake.requestHeartbeatNow({ reason: "slow", coalesceMs: 5000 }); + + // Schedule for 100ms from now — should preempt the 5s timer + wake.requestHeartbeatNow({ reason: "fast", coalesceMs: 100 }); + + await vi.advanceTimersByTimeAsync(100); + expect(handler).toHaveBeenCalledTimes(1); + // The reason should be "fast" since it was set last + expect(handler).toHaveBeenCalledWith({ reason: "fast" }); + }); + + it("keeps existing timer when later schedule is requested", async () => { + vi.useFakeTimers(); + const wake = await loadWakeModule(); + const handler = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + wake.setHeartbeatWakeHandler(handler); + + // Schedule for 100ms from now + wake.requestHeartbeatNow({ reason: "fast", coalesceMs: 100 }); + + // Schedule for 5 seconds from now — should NOT preempt + wake.requestHeartbeatNow({ reason: "slow", coalesceMs: 5000 }); + + await vi.advanceTimersByTimeAsync(100); + expect(handler).toHaveBeenCalledTimes(1); + }); + + it("does not downgrade a higher-priority pending reason", async () => { + vi.useFakeTimers(); + const wake = await loadWakeModule(); + const handler = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + wake.setHeartbeatWakeHandler(handler); + + wake.requestHeartbeatNow({ reason: "exec-event", coalesceMs: 100 }); + wake.requestHeartbeatNow({ reason: "retry", coalesceMs: 100 }); + + await vi.advanceTimersByTimeAsync(100); + expect(handler).toHaveBeenCalledTimes(1); + expect(handler).toHaveBeenCalledWith({ reason: "exec-event" }); + }); + it("drains pending wake once a handler is registered", 
async () => { vi.useFakeTimers(); const wake = await loadWakeModule(); diff --git a/src/infra/heartbeat-wake.ts b/src/infra/heartbeat-wake.ts index 8e981ffc168..2bdbc747f43 100644 --- a/src/infra/heartbeat-wake.ts +++ b/src/infra/heartbeat-wake.ts @@ -5,21 +5,102 @@ export type HeartbeatRunResult = export type HeartbeatWakeHandler = (opts: { reason?: string }) => Promise<HeartbeatRunResult>; +type WakeTimerKind = "normal" | "retry"; +type PendingWakeReason = { + reason: string; + priority: number; + requestedAt: number; +}; + let handler: HeartbeatWakeHandler | null = null; -let pendingReason: string | null = null; +let handlerGeneration = 0; +let pendingWake: PendingWakeReason | null = null; let scheduled = false; let running = false; let timer: NodeJS.Timeout | null = null; +let timerDueAt: number | null = null; +let timerKind: WakeTimerKind | null = null; const DEFAULT_COALESCE_MS = 250; const DEFAULT_RETRY_MS = 1_000; +const HOOK_REASON_PREFIX = "hook:"; +const REASON_PRIORITY = { + RETRY: 0, + INTERVAL: 1, + DEFAULT: 2, + ACTION: 3, +} as const; -function schedule(coalesceMs: number) { - if (timer) { +function isActionWakeReason(reason: string): boolean { + return reason === "manual" || reason === "exec-event" || reason.startsWith(HOOK_REASON_PREFIX); +} + +function resolveReasonPriority(reason: string): number { + if (reason === "retry") { + return REASON_PRIORITY.RETRY; + } + if (reason === "interval") { + return REASON_PRIORITY.INTERVAL; + } + if (isActionWakeReason(reason)) { + return REASON_PRIORITY.ACTION; + } + return REASON_PRIORITY.DEFAULT; +} + +function normalizeWakeReason(reason?: string): string { + if (typeof reason !== "string") { + return "requested"; + } + const trimmed = reason.trim(); + return trimmed.length > 0 ? 
trimmed : "requested"; +} + +function queuePendingWakeReason(reason?: string, requestedAt = Date.now()) { + const normalizedReason = normalizeWakeReason(reason); + const next: PendingWakeReason = { + reason: normalizedReason, + priority: resolveReasonPriority(normalizedReason), + requestedAt, + }; + if (!pendingWake) { + pendingWake = next; return; } + if (next.priority > pendingWake.priority) { + pendingWake = next; + return; + } + if (next.priority === pendingWake.priority && next.requestedAt >= pendingWake.requestedAt) { + pendingWake = next; + } +} + +function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") { + const delay = Number.isFinite(coalesceMs) ? Math.max(0, coalesceMs) : DEFAULT_COALESCE_MS; + const dueAt = Date.now() + delay; + if (timer) { + // Keep retry cooldown as a hard minimum delay. This prevents the + // finally-path reschedule (often delay=0) from collapsing backoff. + if (timerKind === "retry") { + return; + } + // If existing timer fires sooner or at the same time, keep it. + if (typeof timerDueAt === "number" && timerDueAt <= dueAt) { + return; + } + // New request needs to fire sooner — preempt the existing timer. + clearTimeout(timer); + timer = null; + timerDueAt = null; + timerKind = null; + } + timerDueAt = dueAt; + timerKind = kind; timer = setTimeout(async () => { timer = null; + timerDueAt = null; + timerKind = null; scheduled = false; const active = handler; if (!active) { @@ -27,44 +108,62 @@ function schedule(coalesceMs: number) { } if (running) { scheduled = true; - schedule(coalesceMs); + schedule(delay, kind); return; } - const reason = pendingReason; - pendingReason = null; + const reason = pendingWake?.reason; + pendingWake = null; running = true; try { const res = await active({ reason: reason ?? undefined }); if (res.status === "skipped" && res.reason === "requests-in-flight") { // The main lane is busy; retry soon. - pendingReason = reason ?? 
"retry"; - schedule(DEFAULT_RETRY_MS); + queuePendingWakeReason(reason ?? "retry"); + schedule(DEFAULT_RETRY_MS, "retry"); } } catch { // Error is already logged by the heartbeat runner; schedule a retry. - pendingReason = reason ?? "retry"; - schedule(DEFAULT_RETRY_MS); + queuePendingWakeReason(reason ?? "retry"); + schedule(DEFAULT_RETRY_MS, "retry"); } finally { running = false; - if (pendingReason || scheduled) { - schedule(coalesceMs); + if (pendingWake || scheduled) { + schedule(delay, "normal"); } } - }, coalesceMs); + }, delay); timer.unref?.(); } -export function setHeartbeatWakeHandler(next: HeartbeatWakeHandler | null) { +/** + * Register (or clear) the heartbeat wake handler. + * Returns a disposer function that clears this specific registration. + * Stale disposers (from previous registrations) are no-ops, preventing + * a race where an old runner's cleanup clears a newer runner's handler. + */ +export function setHeartbeatWakeHandler(next: HeartbeatWakeHandler | null): () => void { + handlerGeneration += 1; + const generation = handlerGeneration; handler = next; - if (handler && pendingReason) { - schedule(DEFAULT_COALESCE_MS); + if (handler && pendingWake) { + schedule(DEFAULT_COALESCE_MS, "normal"); } + return () => { + if (handlerGeneration !== generation) { + return; + } + if (handler !== next) { + return; + } + handlerGeneration += 1; + handler = null; + }; } export function requestHeartbeatNow(opts?: { reason?: string; coalesceMs?: number }) { - pendingReason = opts?.reason ?? pendingReason ?? "requested"; - schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS); + queuePendingWakeReason(opts?.reason); + schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS, "normal"); } export function hasHeartbeatWakeHandler() { @@ -72,5 +171,5 @@ export function hasHeartbeatWakeHandler() { } export function hasPendingHeartbeatWake() { - return pendingReason !== null || Boolean(timer) || scheduled; + return pendingWake !== null || Boolean(timer) || scheduled; }