diff --git a/src/cron/service.main-job-passes-heartbeat-target-last.test.ts b/src/cron/service.main-job-passes-heartbeat-target-last.test.ts new file mode 100644 index 00000000000..03a8eb214dd --- /dev/null +++ b/src/cron/service.main-job-passes-heartbeat-target-last.test.ts @@ -0,0 +1,122 @@ +import { describe, expect, it, vi } from "vitest"; +import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js"; +import { CronService } from "./service.js"; +import { setupCronServiceSuite, writeCronStoreSnapshot } from "./service.test-harness.js"; +import type { CronJob } from "./types.js"; + +const { logger, makeStorePath } = setupCronServiceSuite({ + prefix: "cron-main-heartbeat-target", +}); + +describe("cron main job passes heartbeat target=last", () => { + it("should pass heartbeat.target=last to runHeartbeatOnce for wakeMode=now main jobs", async () => { + const { storePath } = await makeStorePath(); + const now = Date.now(); + + const job: CronJob = { + id: "test-main-delivery", + name: "test-main-delivery", + enabled: true, + createdAtMs: now - 10_000, + updatedAtMs: now - 10_000, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "Check in" }, + state: { nextRunAtMs: now - 1 }, + }; + + await writeCronStoreSnapshot({ storePath, jobs: [job] }); + + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + const runHeartbeatOnce = vi.fn< + (opts?: { + reason?: string; + agentId?: string; + sessionKey?: string; + heartbeat?: { target?: string }; + }) => Promise + >(async () => ({ + status: "ran" as const, + durationMs: 50, + })); + + const cron = new CronService({ + storePath, + cronEnabled: true, + log: logger, + enqueueSystemEvent, + requestHeartbeatNow, + runHeartbeatOnce, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), + }); + + await cron.start(); + + // Wait for the timer to fire + await vi.advanceTimersByTimeAsync(2_000); + + // Give the async run a chance to complete + await vi.advanceTimersByTimeAsync(1_000); + + cron.stop(); + + // runHeartbeatOnce should have been called + expect(runHeartbeatOnce).toHaveBeenCalled(); + + // The heartbeat config passed should include target: "last" so the + // heartbeat runner delivers the response to the last active channel. + const callArgs = runHeartbeatOnce.mock.calls[0]?.[0]; + expect(callArgs).toBeDefined(); + expect(callArgs?.heartbeat).toBeDefined(); + expect(callArgs?.heartbeat?.target).toBe("last"); + }); + + it("should not pass heartbeat target for wakeMode=next-heartbeat main jobs", async () => { + const { storePath } = await makeStorePath(); + const now = Date.now(); + + const job: CronJob = { + id: "test-next-heartbeat", + name: "test-next-heartbeat", + enabled: true, + createdAtMs: now - 10_000, + updatedAtMs: now - 10_000, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "Check in" }, + state: { nextRunAtMs: now - 1 }, + }; + + await writeCronStoreSnapshot({ storePath, jobs: [job] }); + + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + const runHeartbeatOnce = vi.fn(async () => ({ + status: "ran" as const, + durationMs: 50, + })); + + const cron = new CronService({ + storePath, + cronEnabled: true, + log: logger, + enqueueSystemEvent, + requestHeartbeatNow, + runHeartbeatOnce, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })), + }); + + await cron.start(); + await vi.advanceTimersByTimeAsync(2_000); + await vi.advanceTimersByTimeAsync(1_000); + cron.stop(); + + // wakeMode=next-heartbeat uses requestHeartbeatNow, not runHeartbeatOnce + expect(requestHeartbeatNow).toHaveBeenCalled(); + // runHeartbeatOnce should NOT have been called for next-heartbeat mode + expect(runHeartbeatOnce).not.toHaveBeenCalled(); + }); +}); diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 3ad9cc1f591..604fd842b68 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -56,6 +56,8 @@ export type CronServiceDeps = { reason?: string; agentId?: string; sessionKey?: string; + /** Optional heartbeat config override (e.g. target: "last" for cron-triggered heartbeats). */ + heartbeat?: { target?: string }; }) => Promise; /** * WakeMode=now: max time to wait for runHeartbeatOnce to stop returning diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 42be9848294..7a0a8d181c1 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -663,6 +663,11 @@ export async function executeJobCore( reason, agentId: job.agentId, sessionKey: targetMainSessionKey, + // Cron-triggered heartbeats should deliver to the last active channel. + // Without this override, heartbeat target defaults to "none" (since + // e2362d35) and cron main-session responses are silently swallowed. + // See: https://github.com/openclaw/openclaw/issues/28508 + heartbeat: { target: "last" }, }); if ( heartbeatResult.status !== "skipped" || diff --git a/src/gateway/server-cron.test.ts b/src/gateway/server-cron.test.ts index 82a6d05f8e9..cd322bc6ebd 100644 --- a/src/gateway/server-cron.test.ts +++ b/src/gateway/server-cron.test.ts @@ -40,7 +40,7 @@ describe("buildGatewayCronService", () => { fetchWithSsrFGuardMock.mockClear(); }); - it("canonicalizes non-agent sessionKey to agent store key for enqueue + wake", async () => { + it("routes main-target jobs to the main session for enqueue + wake", async () => { const tmpDir = path.join(os.tmpdir(), `server-cron-${Date.now()}`); const cfg = { session: { @@ -73,12 +73,12 @@ describe("buildGatewayCronService", () => { expect(enqueueSystemEventMock).toHaveBeenCalledWith( "hello", expect.objectContaining({ - sessionKey: "agent:main:discord:channel:ops", + sessionKey: "agent:main:main", }), ); expect(requestHeartbeatNowMock).toHaveBeenCalledWith( expect.objectContaining({ - sessionKey: "agent:main:discord:channel:ops", + sessionKey: undefined, }), ); } finally { diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index c97d90b99f3..7430870f111 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -182,11 +182,29 @@ export function buildGatewayCronService(params: { }, runHeartbeatOnce: async (opts) => { const { runtimeConfig, agentId, sessionKey } = resolveCronWakeTarget(opts); + // Merge cron-supplied heartbeat overrides (e.g. target: "last") with the + // fully resolved agent heartbeat config so cron-triggered heartbeats + // respect agent-specific overrides (agents.list[].heartbeat) before + // falling back to agents.defaults.heartbeat. + const agentEntry = + Array.isArray(runtimeConfig.agents?.list) && + runtimeConfig.agents.list.find( + (entry) => + entry && typeof entry.id === "string" && normalizeAgentId(entry.id) === agentId, + ); + const baseHeartbeat = { + ...runtimeConfig.agents?.defaults?.heartbeat, + ...agentEntry?.heartbeat, + }; + const heartbeatOverride = opts?.heartbeat + ? { ...baseHeartbeat, ...opts.heartbeat } + : undefined; return await runHeartbeatOnce({ cfg: runtimeConfig, reason: opts?.reason, agentId, sessionKey, + heartbeat: heartbeatOverride, deps: { ...params.deps, runtime: defaultRuntime }, }); },