mirror of
https://github.com/moltbot/moltbot.git
synced 2026-03-07 22:44:16 +00:00
* fix(cron): pass heartbeat target=last for main-session cron jobs When a cron job with sessionTarget=main and wakeMode=now fires, it triggers a heartbeat via runHeartbeatOnce. Sincee2362d35changed the default heartbeat target from "last" to "none", these cron-triggered heartbeats silently discard their responses instead of delivering them to the last active channel (e.g. Telegram). Fix: pass heartbeat: { target: "last" } from the cron timer to runHeartbeatOnce for main-session jobs, and wire the override through the gateway cron service builder. This restores delivery for sessionTarget=main cron jobs without reverting the intentional default change for regular heartbeats. Regression introduced in:e2362d35(2026-02-25) Fixes #28508 * Cron: align server-cron wake routing expectations for main-target jobs --------- Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
122
src/cron/service.main-job-passes-heartbeat-target-last.test.ts
Normal file
122
src/cron/service.main-job-passes-heartbeat-target-last.test.ts
Normal file
@@ -0,0 +1,122 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js";
|
||||
import { CronService } from "./service.js";
|
||||
import { setupCronServiceSuite, writeCronStoreSnapshot } from "./service.test-harness.js";
|
||||
import type { CronJob } from "./types.js";
|
||||
|
||||
const { logger, makeStorePath } = setupCronServiceSuite({
|
||||
prefix: "cron-main-heartbeat-target",
|
||||
});
|
||||
|
||||
describe("cron main job passes heartbeat target=last", () => {
|
||||
it("should pass heartbeat.target=last to runHeartbeatOnce for wakeMode=now main jobs", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const now = Date.now();
|
||||
|
||||
const job: CronJob = {
|
||||
id: "test-main-delivery",
|
||||
name: "test-main-delivery",
|
||||
enabled: true,
|
||||
createdAtMs: now - 10_000,
|
||||
updatedAtMs: now - 10_000,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "now",
|
||||
payload: { kind: "systemEvent", text: "Check in" },
|
||||
state: { nextRunAtMs: now - 1 },
|
||||
};
|
||||
|
||||
await writeCronStoreSnapshot({ storePath, jobs: [job] });
|
||||
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
const requestHeartbeatNow = vi.fn();
|
||||
const runHeartbeatOnce = vi.fn<
|
||||
(opts?: {
|
||||
reason?: string;
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
heartbeat?: { target?: string };
|
||||
}) => Promise<HeartbeatRunResult>
|
||||
>(async () => ({
|
||||
status: "ran" as const,
|
||||
durationMs: 50,
|
||||
}));
|
||||
|
||||
const cron = new CronService({
|
||||
storePath,
|
||||
cronEnabled: true,
|
||||
log: logger,
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runHeartbeatOnce,
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
|
||||
// Wait for the timer to fire
|
||||
await vi.advanceTimersByTimeAsync(2_000);
|
||||
|
||||
// Give the async run a chance to complete
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
|
||||
cron.stop();
|
||||
|
||||
// runHeartbeatOnce should have been called
|
||||
expect(runHeartbeatOnce).toHaveBeenCalled();
|
||||
|
||||
// The heartbeat config passed should include target: "last" so the
|
||||
// heartbeat runner delivers the response to the last active channel.
|
||||
const callArgs = runHeartbeatOnce.mock.calls[0]?.[0];
|
||||
expect(callArgs).toBeDefined();
|
||||
expect(callArgs?.heartbeat).toBeDefined();
|
||||
expect(callArgs?.heartbeat?.target).toBe("last");
|
||||
});
|
||||
|
||||
it("should not pass heartbeat target for wakeMode=next-heartbeat main jobs", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const now = Date.now();
|
||||
|
||||
const job: CronJob = {
|
||||
id: "test-next-heartbeat",
|
||||
name: "test-next-heartbeat",
|
||||
enabled: true,
|
||||
createdAtMs: now - 10_000,
|
||||
updatedAtMs: now - 10_000,
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "Check in" },
|
||||
state: { nextRunAtMs: now - 1 },
|
||||
};
|
||||
|
||||
await writeCronStoreSnapshot({ storePath, jobs: [job] });
|
||||
|
||||
const enqueueSystemEvent = vi.fn();
|
||||
const requestHeartbeatNow = vi.fn();
|
||||
const runHeartbeatOnce = vi.fn(async () => ({
|
||||
status: "ran" as const,
|
||||
durationMs: 50,
|
||||
}));
|
||||
|
||||
const cron = new CronService({
|
||||
storePath,
|
||||
cronEnabled: true,
|
||||
log: logger,
|
||||
enqueueSystemEvent,
|
||||
requestHeartbeatNow,
|
||||
runHeartbeatOnce,
|
||||
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
|
||||
});
|
||||
|
||||
await cron.start();
|
||||
await vi.advanceTimersByTimeAsync(2_000);
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
cron.stop();
|
||||
|
||||
// wakeMode=next-heartbeat uses requestHeartbeatNow, not runHeartbeatOnce
|
||||
expect(requestHeartbeatNow).toHaveBeenCalled();
|
||||
// runHeartbeatOnce should NOT have been called for next-heartbeat mode
|
||||
expect(runHeartbeatOnce).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -56,6 +56,8 @@ export type CronServiceDeps = {
|
||||
reason?: string;
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
/** Optional heartbeat config override (e.g. target: "last" for cron-triggered heartbeats). */
|
||||
heartbeat?: { target?: string };
|
||||
}) => Promise<HeartbeatRunResult>;
|
||||
/**
|
||||
* WakeMode=now: max time to wait for runHeartbeatOnce to stop returning
|
||||
|
||||
@@ -663,6 +663,11 @@ export async function executeJobCore(
|
||||
reason,
|
||||
agentId: job.agentId,
|
||||
sessionKey: targetMainSessionKey,
|
||||
// Cron-triggered heartbeats should deliver to the last active channel.
|
||||
// Without this override, heartbeat target defaults to "none" (since
|
||||
// e2362d35) and cron main-session responses are silently swallowed.
|
||||
// See: https://github.com/openclaw/openclaw/issues/28508
|
||||
heartbeat: { target: "last" },
|
||||
});
|
||||
if (
|
||||
heartbeatResult.status !== "skipped" ||
|
||||
|
||||
@@ -40,7 +40,7 @@ describe("buildGatewayCronService", () => {
|
||||
fetchWithSsrFGuardMock.mockClear();
|
||||
});
|
||||
|
||||
it("canonicalizes non-agent sessionKey to agent store key for enqueue + wake", async () => {
|
||||
it("routes main-target jobs to the main session for enqueue + wake", async () => {
|
||||
const tmpDir = path.join(os.tmpdir(), `server-cron-${Date.now()}`);
|
||||
const cfg = {
|
||||
session: {
|
||||
@@ -73,12 +73,12 @@ describe("buildGatewayCronService", () => {
|
||||
expect(enqueueSystemEventMock).toHaveBeenCalledWith(
|
||||
"hello",
|
||||
expect.objectContaining({
|
||||
sessionKey: "agent:main:discord:channel:ops",
|
||||
sessionKey: "agent:main:main",
|
||||
}),
|
||||
);
|
||||
expect(requestHeartbeatNowMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
sessionKey: "agent:main:discord:channel:ops",
|
||||
sessionKey: undefined,
|
||||
}),
|
||||
);
|
||||
} finally {
|
||||
|
||||
@@ -182,11 +182,29 @@ export function buildGatewayCronService(params: {
|
||||
},
|
||||
runHeartbeatOnce: async (opts) => {
|
||||
const { runtimeConfig, agentId, sessionKey } = resolveCronWakeTarget(opts);
|
||||
// Merge cron-supplied heartbeat overrides (e.g. target: "last") with the
|
||||
// fully resolved agent heartbeat config so cron-triggered heartbeats
|
||||
// respect agent-specific overrides (agents.list[].heartbeat) before
|
||||
// falling back to agents.defaults.heartbeat.
|
||||
const agentEntry =
|
||||
Array.isArray(runtimeConfig.agents?.list) &&
|
||||
runtimeConfig.agents.list.find(
|
||||
(entry) =>
|
||||
entry && typeof entry.id === "string" && normalizeAgentId(entry.id) === agentId,
|
||||
);
|
||||
const baseHeartbeat = {
|
||||
...runtimeConfig.agents?.defaults?.heartbeat,
|
||||
...agentEntry?.heartbeat,
|
||||
};
|
||||
const heartbeatOverride = opts?.heartbeat
|
||||
? { ...baseHeartbeat, ...opts.heartbeat }
|
||||
: undefined;
|
||||
return await runHeartbeatOnce({
|
||||
cfg: runtimeConfig,
|
||||
reason: opts?.reason,
|
||||
agentId,
|
||||
sessionKey,
|
||||
heartbeat: heartbeatOverride,
|
||||
deps: { ...params.deps, runtime: defaultRuntime },
|
||||
});
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user