diff --git a/CHANGELOG.md b/CHANGELOG.md index 89de0c5531f..312799a31f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ Docs: https://docs.openclaw.ai ### Changes +- Cron: default `wakeMode` is now `"now"` for new jobs (was `"next-heartbeat"`). (#10776) Thanks @tyler6204. +- Cron: `cron run` defaults to force execution; use `--due` to restrict to due-only. (#10776) Thanks @tyler6204. - Models: support Anthropic Opus 4.6 and OpenAI Codex gpt-5.3-codex (forward-compat fallbacks). (#9853, #10720, #9995) Thanks @TinyTb, @calvin-hpnet, @tyler6204. - Providers: add xAI (Grok) support. (#9885) Thanks @grp06. - Web UI: add token usage dashboard. (#10072) Thanks @Takhoffman. @@ -14,8 +16,16 @@ Docs: https://docs.openclaw.ai - CLI: sort commands alphabetically in help output. (#8068) Thanks @deepsoumya617. - Agents: bump pi-mono to 0.52.7; add embedded forward-compat fallback for Opus 4.6 model ids. +### Added + +- Cron: run history deep-links to session chat from the dashboard. (#10776) Thanks @tyler6204. +- Cron: per-run session keys in run log entries and default labels for cron sessions. (#10776) Thanks @tyler6204. +- Cron: legacy payload field compatibility (`deliver`, `channel`, `to`, `bestEffortDeliver`) in schema. (#10776) Thanks @tyler6204. + ### Fixes +- Cron: scheduler reliability (timer drift, restart catch-up, lock contention, stale running markers). (#10776) Thanks @tyler6204. +- Cron: store migration hardening (legacy field migration, parse error handling, explicit delivery mode persistence). (#10776) Thanks @tyler6204. - Telegram: auto-inject DM topic threadId in message tool + subagent announce. (#7235) Thanks @Lukavyi. - Security: require auth for Gateway canvas host and A2UI assets. (#9518) Thanks @coygeek. - Cron: fix scheduling and reminder delivery regressions; harden next-run recompute + timer re-arming + legacy schedule fields. (#9733, #9823, #9948, #9932) Thanks @tyler6204, @pycckuu, @j2h4u, @fujiwara-tofu-shop. diff --git a/apps/macos/Sources/OpenClaw/CronJobEditor.swift b/apps/macos/Sources/OpenClaw/CronJobEditor.swift index a5207ca1014..517d32df445 100644 --- a/apps/macos/Sources/OpenClaw/CronJobEditor.swift +++ b/apps/macos/Sources/OpenClaw/CronJobEditor.swift @@ -29,7 +29,7 @@ struct CronJobEditor: View { @State var agentId: String = "" @State var enabled: Bool = true @State var sessionTarget: CronSessionTarget = .main - @State var wakeMode: CronWakeMode = .nextHeartbeat + @State var wakeMode: CronWakeMode = .now @State var deleteAfterRun: Bool = false enum ScheduleKind: String, CaseIterable, Identifiable { case at, every, cron; var id: String { rawValue } } @@ -119,8 +119,8 @@ struct CronJobEditor: View { GridRow { self.gridLabel("Wake mode") Picker("", selection: self.$wakeMode) { - Text("next-heartbeat").tag(CronWakeMode.nextHeartbeat) Text("now").tag(CronWakeMode.now) + Text("next-heartbeat").tag(CronWakeMode.nextHeartbeat) } .labelsHidden() .pickerStyle(.segmented) diff --git a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift index dd3cfb50a1d..07c9db84e20 100644 --- a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift @@ -2025,6 +2025,8 @@ public struct CronRunLogEntry: Codable, Sendable { public let status: AnyCodable? public let error: String? public let summary: String? + public let sessionid: String? + public let sessionkey: String? public let runatms: Int? public let durationms: Int? public let nextrunatms: Int? @@ -2036,6 +2038,8 @@ public struct CronRunLogEntry: Codable, Sendable { status: AnyCodable?, error: String?, summary: String?, + sessionid: String?, + sessionkey: String?, runatms: Int?, durationms: Int?, nextrunatms: Int? @@ -2046,6 +2050,8 @@ public struct CronRunLogEntry: Codable, Sendable { self.status = status self.error = error self.summary = summary + self.sessionid = sessionid + self.sessionkey = sessionkey self.runatms = runatms self.durationms = durationms self.nextrunatms = nextrunatms @@ -2057,6 +2063,8 @@ public struct CronRunLogEntry: Codable, Sendable { case status case error case summary + case sessionid = "sessionId" + case sessionkey = "sessionKey" case runatms = "runAtMs" case durationms = "durationMs" case nextrunatms = "nextRunAtMs" diff --git a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift index dd3cfb50a1d..07c9db84e20 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift @@ -2025,6 +2025,8 @@ public struct CronRunLogEntry: Codable, Sendable { public let status: AnyCodable? public let error: String? public let summary: String? + public let sessionid: String? + public let sessionkey: String? public let runatms: Int? public let durationms: Int? public let nextrunatms: Int? @@ -2036,6 +2038,8 @@ public struct CronRunLogEntry: Codable, Sendable { status: AnyCodable?, error: String?, summary: String?, + sessionid: String?, + sessionkey: String?, runatms: Int?, durationms: Int?, nextrunatms: Int? @@ -2046,6 +2050,8 @@ public struct CronRunLogEntry: Codable, Sendable { self.status = status self.error = error self.summary = summary + self.sessionid = sessionid + self.sessionkey = sessionkey self.runatms = runatms self.durationms = durationms self.nextrunatms = nextrunatms @@ -2057,6 +2063,8 @@ public struct CronRunLogEntry: Codable, Sendable { case status case error case summary + case sessionid = "sessionId" + case sessionkey = "sessionKey" case runatms = "runAtMs" case durationms = "durationMs" case nextrunatms = "nextRunAtMs" diff --git a/docs/automation/cron-jobs.md b/docs/automation/cron-jobs.md index 8eb79881ec3..54d6a964705 100644 --- a/docs/automation/cron-jobs.md +++ b/docs/automation/cron-jobs.md @@ -40,7 +40,7 @@ openclaw cron add \ --delete-after-run openclaw cron list -openclaw cron run --force +openclaw cron run openclaw cron runs --id ``` @@ -123,8 +123,8 @@ local timezone is used. Main jobs enqueue a system event and optionally wake the heartbeat runner. They must use `payload.kind = "systemEvent"`. -- `wakeMode: "next-heartbeat"` (default): event waits for the next scheduled heartbeat. -- `wakeMode: "now"`: event triggers an immediate heartbeat run. +- `wakeMode: "now"` (default): event triggers an immediate heartbeat run. +- `wakeMode: "next-heartbeat"`: event waits for the next scheduled heartbeat. This is the best fit when you want the normal heartbeat prompt + main-session context. See [Heartbeat](/gateway/heartbeat). @@ -288,7 +288,7 @@ Notes: - `sessionTarget` must be `"main"` or `"isolated"` and must match `payload.kind`. - Optional fields: `agentId`, `description`, `enabled`, `deleteAfterRun` (defaults to true for `at`), `delivery`. -- `wakeMode` defaults to `"next-heartbeat"` when omitted. +- `wakeMode` defaults to `"now"` when omitted. ### cron.update params @@ -420,10 +420,11 @@ openclaw cron edit --agent ops openclaw cron edit --clear-agent ``` -Manual run (debug): +Manual run (force is the default, use `--due` to only run when due): ```bash -openclaw cron run --force +openclaw cron run +openclaw cron run --due ``` Edit an existing job (patch fields): diff --git a/scripts/test-parallel.mjs b/scripts/test-parallel.mjs index 9ee0a9d8768..786f8bbd14e 100644 --- a/scripts/test-parallel.mjs +++ b/scripts/test-parallel.mjs @@ -32,6 +32,7 @@ const shardCount = isWindowsCi const windowsCiArgs = isWindowsCi ? ["--no-file-parallelism", "--dangerouslyIgnoreUnhandledErrors"] : []; +const passthroughArgs = process.argv.slice(2); const overrideWorkers = Number.parseInt(process.env.OPENCLAW_TEST_WORKERS ?? "", 10); const resolvedOverride = Number.isFinite(overrideWorkers) && overrideWorkers > 0 ? overrideWorkers : null; @@ -96,6 +97,30 @@ const shutdown = (signal) => { process.on("SIGINT", () => shutdown("SIGINT")); process.on("SIGTERM", () => shutdown("SIGTERM")); +if (passthroughArgs.length > 0) { + const args = maxWorkers + ? ["vitest", "run", "--maxWorkers", String(maxWorkers), ...windowsCiArgs, ...passthroughArgs] + : ["vitest", "run", ...windowsCiArgs, ...passthroughArgs]; + const nodeOptions = process.env.NODE_OPTIONS ?? ""; + const nextNodeOptions = WARNING_SUPPRESSION_FLAGS.reduce( + (acc, flag) => (acc.includes(flag) ? acc : `${acc} ${flag}`.trim()), + nodeOptions, + ); + const code = await new Promise((resolve) => { + const child = spawn(pnpm, args, { + stdio: "inherit", + env: { ...process.env, NODE_OPTIONS: nextNodeOptions }, + shell: process.platform === "win32", + }); + children.add(child); + child.on("exit", (exitCode, signal) => { + children.delete(child); + resolve(exitCode ?? (signal ? 1 : 0)); + }); + }); + process.exit(Number(code) || 0); +} + const parallelCodes = await Promise.all(parallelRuns.map(run)); const failedParallel = parallelCodes.find((code) => code !== 0); if (failedParallel !== undefined) { diff --git a/src/agents/openclaw-tools.subagents.sessions-spawn-applies-thinking-default.test.ts b/src/agents/openclaw-tools.subagents.sessions-spawn-applies-thinking-default.test.ts index 36eb50b553c..c9b7175717a 100644 --- a/src/agents/openclaw-tools.subagents.sessions-spawn-applies-thinking-default.test.ts +++ b/src/agents/openclaw-tools.subagents.sessions-spawn-applies-thinking-default.test.ts @@ -45,8 +45,12 @@ describe("sessions_spawn thinking defaults", () => { const agentCall = calls .map((call) => call[0] as { method: string; params?: Record }) .findLast((call) => call.method === "agent"); + const thinkingPatch = calls + .map((call) => call[0] as { method: string; params?: Record }) + .findLast((call) => call.method === "sessions.patch" && call.params?.thinkingLevel); expect(agentCall?.params?.thinking).toBe("high"); + expect(thinkingPatch?.params?.thinkingLevel).toBe("high"); }); it("prefers explicit sessions_spawn.thinking over config default", async () => { @@ -60,7 +64,11 @@ describe("sessions_spawn thinking defaults", () => { const agentCall = calls .map((call) => call[0] as { method: string; params?: Record }) .findLast((call) => call.method === "agent"); + const thinkingPatch = calls + .map((call) => call[0] as { method: string; params?: Record }) + .findLast((call) => call.method === "sessions.patch" && call.params?.thinkingLevel); expect(agentCall?.params?.thinking).toBe("low"); + expect(thinkingPatch?.params?.thinkingLevel).toBe("low"); }); }); diff --git a/src/agents/tools/cron-tool.test.ts b/src/agents/tools/cron-tool.test.ts index 77ffb36e6fb..cee2e57e0f8 100644 --- a/src/agents/tools/cron-tool.test.ts +++ b/src/agents/tools/cron-tool.test.ts @@ -30,8 +30,8 @@ describe("cron tool", () => { ], ["remove", { action: "remove", jobId: "job-1" }, { id: "job-1" }], ["remove", { action: "remove", id: "job-2" }, { id: "job-2" }], - ["run", { action: "run", jobId: "job-1" }, { id: "job-1" }], - ["run", { action: "run", id: "job-2" }, { id: "job-2" }], + ["run", { action: "run", jobId: "job-1" }, { id: "job-1", mode: "force" }], + ["run", { action: "run", id: "job-2" }, { id: "job-2", mode: "force" }], ["runs", { action: "runs", jobId: "job-1" }, { id: "job-1" }], ["runs", { action: "runs", id: "job-2" }, { id: "job-2" }], ])("%s sends id to gateway", async (action, args, expectedParams) => { @@ -58,7 +58,21 @@ describe("cron tool", () => { const call = callGatewayMock.mock.calls[0]?.[0] as { params?: unknown; }; - expect(call?.params).toEqual({ id: "job-primary" }); + expect(call?.params).toEqual({ id: "job-primary", mode: "force" }); + }); + + it("supports due-only run mode", async () => { + const tool = createCronTool(); + await tool.execute("call-due", { + action: "run", + jobId: "job-due", + runMode: "due", + }); + + const call = callGatewayMock.mock.calls[0]?.[0] as { + params?: unknown; + }; + expect(call?.params).toEqual({ id: "job-due", mode: "due" }); }); it("normalizes cron.add job payloads", async () => { @@ -86,7 +100,7 @@ describe("cron tool", () => { deleteAfterRun: true, schedule: { kind: "at", at: new Date(123).toISOString() }, sessionTarget: "main", - wakeMode: "next-heartbeat", + wakeMode: "now", payload: { kind: "systemEvent", text: "hello" }, }); }); diff --git a/src/agents/tools/cron-tool.ts b/src/agents/tools/cron-tool.ts index 4c9633144f2..cc5cab54f6e 100644 --- a/src/agents/tools/cron-tool.ts +++ b/src/agents/tools/cron-tool.ts @@ -18,6 +18,7 @@ import { resolveInternalSessionKey, resolveMainSessionAlias } from "./sessions-h const CRON_ACTIONS = ["status", "list", "add", "update", "remove", "run", "runs", "wake"] as const; const CRON_WAKE_MODES = ["now", "next-heartbeat"] as const; +const CRON_RUN_MODES = ["due", "force"] as const; const REMINDER_CONTEXT_MESSAGES_MAX = 10; const REMINDER_CONTEXT_PER_MESSAGE_MAX = 220; @@ -37,6 +38,7 @@ const CronToolSchema = Type.Object({ patch: Type.Optional(Type.Object({}, { additionalProperties: true })), text: Type.Optional(Type.String()), mode: optionalStringEnum(CRON_WAKE_MODES), + runMode: optionalStringEnum(CRON_RUN_MODES), contextMessages: Type.Optional( Type.Number({ minimum: 0, maximum: REMINDER_CONTEXT_MESSAGES_MAX }), ), @@ -312,7 +314,6 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con } } - // [Fix Issue 3] Infer delivery target from session key for isolated jobs if not provided if ( opts?.agentSessionKey && job && @@ -393,7 +394,9 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con if (!id) { throw new Error("jobId required (id accepted for backward compatibility)"); } - return jsonResult(await callGatewayTool("cron.run", gatewayOpts, { id })); + const runMode = + params.runMode === "due" || params.runMode === "force" ? params.runMode : "force"; + return jsonResult(await callGatewayTool("cron.run", gatewayOpts, { id, mode: runMode })); } case "runs": { const id = readStringParam(params, "jobId") ?? readStringParam(params, "id"); diff --git a/src/agents/tools/sessions-spawn-tool.ts b/src/agents/tools/sessions-spawn-tool.ts index d73b8c4a0d5..1ed7bcd1c1b 100644 --- a/src/agents/tools/sessions-spawn-tool.ts +++ b/src/agents/tools/sessions-spawn-tool.ts @@ -214,6 +214,26 @@ export function createSessionsSpawnTool(opts?: { modelWarning = messageText; } } + if (thinkingOverride !== undefined) { + try { + await callGateway({ + method: "sessions.patch", + params: { + key: childSessionKey, + thinkingLevel: thinkingOverride === "off" ? null : thinkingOverride, + }, + timeoutMs: 10_000, + }); + } catch (err) { + const messageText = + err instanceof Error ? err.message : typeof err === "string" ? err : "error"; + return jsonResult({ + status: "error", + error: messageText, + childSessionKey, + }); + } + } const childSystemPrompt = buildSubagentSystemPrompt({ requesterSessionKey, requesterOrigin, diff --git a/src/cli/cron-cli/register.cron-add.ts b/src/cli/cron-cli/register.cron-add.ts index 81720418d2b..001fd5f1bf0 100644 --- a/src/cli/cron-cli/register.cron-add.ts +++ b/src/cli/cron-cli/register.cron-add.ts @@ -71,7 +71,7 @@ export function registerCronAddCommand(cron: Command) { .option("--keep-after-run", "Keep one-shot job after it succeeds", false) .option("--agent ", "Agent id for this job") .option("--session ", "Session target (main|isolated)") - .option("--wake ", "Wake mode (now|next-heartbeat)", "next-heartbeat") + .option("--wake ", "Wake mode (now|next-heartbeat)", "now") .option("--at ", "Run once at time (ISO) or +duration (e.g. 20m)") .option("--every ", "Run every duration (e.g. 10m, 1h)") .option("--cron ", "Cron expression (5-field)") @@ -122,8 +122,8 @@ export function registerCronAddCommand(cron: Command) { }; })(); - const wakeModeRaw = typeof opts.wake === "string" ? opts.wake : "next-heartbeat"; - const wakeMode = wakeModeRaw.trim() || "next-heartbeat"; + const wakeModeRaw = typeof opts.wake === "string" ? opts.wake : "now"; + const wakeMode = wakeModeRaw.trim() || "now"; if (wakeMode !== "now" && wakeMode !== "next-heartbeat") { throw new Error("--wake must be now or next-heartbeat"); } diff --git a/src/cli/cron-cli/register.cron-simple.ts b/src/cli/cron-cli/register.cron-simple.ts index 1493c371ac0..e5baa117147 100644 --- a/src/cli/cron-cli/register.cron-simple.ts +++ b/src/cli/cron-cli/register.cron-simple.ts @@ -92,12 +92,12 @@ export function registerCronSimpleCommands(cron: Command) { .command("run") .description("Run a cron job now (debug)") .argument("", "Job id") - .option("--force", "Run even if not due", false) + .option("--due", "Run only when due (default behavior in older versions)", false) .action(async (id, opts) => { try { const res = await callGatewayFromCli("cron.run", opts, { id, - mode: opts.force ? "force" : "due", + mode: opts.due ? "due" : "force", }); defaultRuntime.log(JSON.stringify(res, null, 2)); } catch (err) { diff --git a/src/cron/delivery.test.ts b/src/cron/delivery.test.ts new file mode 100644 index 00000000000..fcbe9e99a3b --- /dev/null +++ b/src/cron/delivery.test.ts @@ -0,0 +1,45 @@ +import { describe, expect, it } from "vitest"; +import type { CronJob } from "./types.js"; +import { resolveCronDeliveryPlan } from "./delivery.js"; + +function makeJob(overrides: Partial): CronJob { + const now = Date.now(); + return { + id: "job-1", + name: "test", + enabled: true, + createdAtMs: now, + updatedAtMs: now, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "hello" }, + state: {}, + ...overrides, + }; +} + +describe("resolveCronDeliveryPlan", () => { + it("defaults to announce when delivery object has no mode", () => { + const plan = resolveCronDeliveryPlan( + makeJob({ + delivery: { channel: "telegram", to: "123", mode: undefined as never }, + }), + ); + expect(plan.mode).toBe("announce"); + expect(plan.requested).toBe(true); + expect(plan.channel).toBe("telegram"); + expect(plan.to).toBe("123"); + }); + + it("respects legacy payload deliver=false", () => { + const plan = resolveCronDeliveryPlan( + makeJob({ + delivery: undefined, + payload: { kind: "agentTurn", message: "hello", deliver: false }, + }), + ); + expect(plan.mode).toBe("none"); + expect(plan.requested).toBe(false); + }); +}); diff --git a/src/cron/delivery.ts b/src/cron/delivery.ts index c7cbe87f9bb..f0ba2c2b072 100644 --- a/src/cron/delivery.ts +++ b/src/cron/delivery.ts @@ -32,12 +32,13 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { const delivery = job.delivery; const hasDelivery = delivery && typeof delivery === "object"; const rawMode = hasDelivery ? (delivery as { mode?: unknown }).mode : undefined; + const normalizedMode = typeof rawMode === "string" ? rawMode.trim().toLowerCase() : rawMode; const mode = - rawMode === "announce" + normalizedMode === "announce" ? "announce" - : rawMode === "none" + : normalizedMode === "none" ? "none" - : rawMode === "deliver" + : normalizedMode === "deliver" ? "announce" : undefined; @@ -51,7 +52,7 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { const channel = deliveryChannel ?? payloadChannel ?? "last"; const to = deliveryTo ?? payloadTo; if (hasDelivery) { - const resolvedMode = mode ?? "none"; + const resolvedMode = mode ?? "announce"; return { mode: resolvedMode, channel, diff --git a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts index 6aac38f88dc..4b5317ef4ea 100644 --- a/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts +++ b/src/cron/isolated-agent.skips-delivery-without-whatsapp-recipient-besteffortdeliver-true.test.ts @@ -134,6 +134,48 @@ describe("runCronIsolatedAgentTurn", () => { }); }); + it("announces only the final payload text", async () => { + await withTempHome(async (home) => { + const storePath = await writeSessionStore(home); + const deps: CliDeps = { + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn(), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + }; + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "Working on it..." }, { text: "Final weather summary" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath, { + channels: { telegram: { botToken: "t-1" } }, + }), + deps, + job: { + ...makeJob({ kind: "agentTurn", message: "do it" }), + delivery: { mode: "announce", channel: "telegram", to: "123" }, + }, + message: "do it", + sessionKey: "cron:job-1", + lane: "cron", + }); + + expect(res.status).toBe("ok"); + expect(deps.sendMessageTelegram).toHaveBeenCalledTimes(1); + expect(deps.sendMessageTelegram).toHaveBeenCalledWith( + "123", + "Final weather summary", + expect.any(Object), + ); + }); + }); + it("skips announce when messaging tool already sent to target", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home); diff --git a/src/cron/isolated-agent.uses-last-non-empty-agent-text-as.test.ts b/src/cron/isolated-agent.uses-last-non-empty-agent-text-as.test.ts index ab547bdf727..3ec1c935b08 100644 --- a/src/cron/isolated-agent.uses-last-non-empty-agent-text-as.test.ts +++ b/src/cron/isolated-agent.uses-last-non-empty-agent-text-as.test.ts @@ -48,7 +48,7 @@ async function writeSessionStore(home: string) { async function readSessionEntry(storePath: string, key: string) { const raw = await fs.readFile(storePath, "utf-8"); - const store = JSON.parse(raw) as Record; + const store = JSON.parse(raw) as Record; return store[key]; } @@ -90,6 +90,38 @@ describe("runCronIsolatedAgentTurn", () => { vi.mocked(loadModelCatalog).mockResolvedValue([]); }); + it("treats blank model overrides as unset", async () => { + await withTempHome(async (home) => { + const storePath = await writeSessionStore(home); + const deps: CliDeps = { + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn(), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + }; + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath), + deps, + job: makeJob({ kind: "agentTurn", message: "do it", model: " " }), + message: "do it", + sessionKey: "cron:job-1", + lane: "cron", + }); + + expect(res.status).toBe("ok"); + expect(vi.mocked(runEmbeddedPiAgent)).toHaveBeenCalledTimes(1); + }); + }); + it("uses last non-empty agent text as summary", async () => { await withTempHome(async (home) => { const storePath = await writeSessionStore(home); @@ -585,6 +617,49 @@ describe("runCronIsolatedAgentTurn", () => { expect(first?.sessionId).toBeDefined(); expect(second?.sessionId).toBeDefined(); expect(second?.sessionId).not.toBe(first?.sessionId); + expect(first?.label).toBe("Cron: job-1"); + expect(second?.label).toBe("Cron: job-1"); + }); + }); + + it("preserves an existing cron session label", async () => { + await withTempHome(async (home) => { + const storePath = await writeSessionStore(home); + const raw = await fs.readFile(storePath, "utf-8"); + const store = JSON.parse(raw) as Record>; + store["agent:main:cron:job-1"] = { + sessionId: "old", + updatedAt: Date.now(), + label: "Nightly digest", + }; + await fs.writeFile(storePath, JSON.stringify(store, null, 2), "utf-8"); + + const deps: CliDeps = { + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn(), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + }; + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); + + await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath), + deps, + job: makeJob({ kind: "agentTurn", message: "ping", deliver: false }), + message: "ping", + sessionKey: "cron:job-1", + lane: "cron", + }); + const entry = await readSessionEntry(storePath, "agent:main:cron:job-1"); + + expect(entry?.label).toBe("Nightly digest"); }); }); }); diff --git a/src/cron/isolated-agent/delivery-target.ts b/src/cron/isolated-agent/delivery-target.ts index 5be448b2c1f..35ccc904739 100644 --- a/src/cron/isolated-agent/delivery-target.ts +++ b/src/cron/isolated-agent/delivery-target.ts @@ -30,6 +30,7 @@ export async function resolveDeliveryTarget( }> { const requestedChannel = typeof jobPayload.channel === "string" ? jobPayload.channel : "last"; const explicitTo = typeof jobPayload.to === "string" ? jobPayload.to : undefined; + const allowMismatchedLastTo = requestedChannel === "last"; const sessionCfg = cfg.session; const mainSessionKey = resolveAgentMainSessionKey({ cfg, agentId }); @@ -41,7 +42,7 @@ export async function resolveDeliveryTarget( entry: main, requestedChannel, explicitTo, - allowMismatchedLastTo: true, + allowMismatchedLastTo, }); let fallbackChannel: Exclude | undefined; @@ -60,7 +61,7 @@ export async function resolveDeliveryTarget( requestedChannel, explicitTo, fallbackChannel, - allowMismatchedLastTo: true, + allowMismatchedLastTo, mode: preliminary.mode, }) : preliminary; diff --git a/src/cron/isolated-agent/helpers.ts b/src/cron/isolated-agent/helpers.ts index ddc72d6456b..d4d42b20fe5 100644 --- a/src/cron/isolated-agent/helpers.ts +++ b/src/cron/isolated-agent/helpers.ts @@ -8,6 +8,7 @@ type DeliveryPayload = { text?: string; mediaUrl?: string; mediaUrls?: string[]; + channelData?: Record; }; export function pickSummaryFromOutput(text: string | undefined) { @@ -39,6 +40,19 @@ export function pickLastNonEmptyTextFromPayloads(payloads: Array<{ text?: string return undefined; } +export function pickLastDeliverablePayload(payloads: DeliveryPayload[]) { + for (let i = payloads.length - 1; i >= 0; i--) { + const payload = payloads[i]; + const text = (payload?.text ?? "").trim(); + const hasMedia = Boolean(payload?.mediaUrl) || (payload?.mediaUrls?.length ?? 0) > 0; + const hasChannelData = Object.keys(payload?.channelData ?? {}).length > 0; + if (text || hasMedia || hasChannelData) { + return payload; + } + } + return undefined; +} + /** * Check if all payloads are just heartbeat ack responses (HEARTBEAT_OK). * Returns true if delivery should be skipped because there's no real content. diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 3273cb8f9ba..3dd0cc41657 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -56,6 +56,7 @@ import { resolveCronDeliveryPlan } from "../delivery.js"; import { resolveDeliveryTarget } from "./delivery-target.js"; import { isHeartbeatOnlyResponse, + pickLastDeliverablePayload, pickLastNonEmptyTextFromPayloads, pickSummaryFromOutput, pickSummaryFromPayloads, @@ -97,6 +98,8 @@ export type RunCronAgentTurnResult = { /** Last non-empty agent text output (not truncated). */ outputText?: string; error?: string; + sessionId?: string; + sessionKey?: string; }; export async function runCronIsolatedAgentTurn(params: { @@ -187,14 +190,12 @@ export async function runCronIsolatedAgentTurn(params: { } const modelOverrideRaw = params.job.payload.kind === "agentTurn" ? params.job.payload.model : undefined; - if (modelOverrideRaw !== undefined) { - if (typeof modelOverrideRaw !== "string") { - return { status: "error", error: "invalid model: expected string" }; - } + const modelOverride = typeof modelOverrideRaw === "string" ? modelOverrideRaw.trim() : undefined; + if (modelOverride !== undefined && modelOverride.length > 0) { const resolvedOverride = resolveAllowedModelRef({ cfg: cfgWithAgentDefaults, catalog: await loadCatalog(), - raw: modelOverrideRaw, + raw: modelOverride, defaultProvider: resolvedDefault.provider, defaultModel: resolvedDefault.model, }); @@ -211,6 +212,36 @@ export async function runCronIsolatedAgentTurn(params: { agentId, nowMs: now, }); + const runSessionId = cronSession.sessionEntry.sessionId; + const runSessionKey = baseSessionKey.startsWith("cron:") + ? `${agentSessionKey}:run:${runSessionId}` + : agentSessionKey; + const persistSessionEntry = async () => { + cronSession.store[agentSessionKey] = cronSession.sessionEntry; + if (runSessionKey !== agentSessionKey) { + cronSession.store[runSessionKey] = cronSession.sessionEntry; + } + await updateSessionStore(cronSession.storePath, (store) => { + store[agentSessionKey] = cronSession.sessionEntry; + if (runSessionKey !== agentSessionKey) { + store[runSessionKey] = cronSession.sessionEntry; + } + }); + }; + const withRunSession = ( + result: Omit, + ): RunCronAgentTurnResult => ({ + ...result, + sessionId: runSessionId, + sessionKey: runSessionKey, + }); + if (!cronSession.sessionEntry.label?.trim() && baseSessionKey.startsWith("cron:")) { + const labelSuffix = + typeof params.job.name === "string" && params.job.name.trim() + ? params.job.name.trim() + : params.job.id; + cronSession.sessionEntry.label = `Cron: ${labelSuffix}`; + } // Resolve thinking level - job thinking > hooks.gmail.thinking > agent default const hooksGmailThinking = isGmailHook @@ -317,18 +348,12 @@ export async function runCronIsolatedAgentTurn(params: { updatedAt: Date.now(), skillsSnapshot, }; - cronSession.store[agentSessionKey] = cronSession.sessionEntry; - await updateSessionStore(cronSession.storePath, (store) => { - store[agentSessionKey] = cronSession.sessionEntry; - }); + await persistSessionEntry(); } // Persist systemSent before the run, mirroring the inbound auto-reply behavior. cronSession.sessionEntry.systemSent = true; - cronSession.store[agentSessionKey] = cronSession.sessionEntry; - await updateSessionStore(cronSession.storePath, (store) => { - store[agentSessionKey] = cronSession.sessionEntry; - }); + await persistSessionEntry(); let runResult: Awaited>; let fallbackProvider = provider; @@ -396,7 +421,7 @@ export async function runCronIsolatedAgentTurn(params: { fallbackProvider = fallbackResult.provider; fallbackModel = fallbackResult.model; } catch (err) { - return { status: "error", error: String(err) }; + return withRunSession({ status: "error", error: String(err) }); } const payloads = runResult.payloads ?? []; @@ -427,14 +452,19 @@ export async function runCronIsolatedAgentTurn(params: { cronSession.sessionEntry.totalTokens = promptTokens > 0 ? promptTokens : (usage.total ?? input); } - cronSession.store[agentSessionKey] = cronSession.sessionEntry; - await updateSessionStore(cronSession.storePath, (store) => { - store[agentSessionKey] = cronSession.sessionEntry; - }); + await persistSessionEntry(); } const firstText = payloads[0]?.text ?? ""; const summary = pickSummaryFromPayloads(payloads) ?? pickSummaryFromOutput(firstText); const outputText = pickLastNonEmptyTextFromPayloads(payloads); + const synthesizedText = outputText?.trim() || summary?.trim() || undefined; + const deliveryPayload = pickLastDeliverablePayload(payloads); + const deliveryPayloads = + deliveryPayload !== undefined + ? [deliveryPayload] + : synthesizedText + ? [{ text: synthesizedText }] + : []; const deliveryBestEffort = resolveCronDeliveryBestEffort(params.job); // Skip delivery for heartbeat-only responses (HEARTBEAT_OK with no real content). @@ -454,28 +484,28 @@ export async function runCronIsolatedAgentTurn(params: { if (deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery) { if (resolvedDelivery.error) { if (!deliveryBestEffort) { - return { + return withRunSession({ status: "error", error: resolvedDelivery.error.message, summary, outputText, - }; + }); } logWarn(`[cron:${params.job.id}] ${resolvedDelivery.error.message}`); - return { status: "ok", summary, outputText }; + return withRunSession({ status: "ok", summary, outputText }); } if (!resolvedDelivery.to) { const message = "cron delivery target is missing"; if (!deliveryBestEffort) { - return { + return withRunSession({ status: "error", error: message, summary, outputText, - }; + }); } logWarn(`[cron:${params.job.id}] ${message}`); - return { status: "ok", summary, outputText }; + return withRunSession({ status: "ok", summary, outputText }); } try { await deliverOutboundPayloads({ @@ -484,16 +514,16 @@ export async function runCronIsolatedAgentTurn(params: { to: resolvedDelivery.to, accountId: resolvedDelivery.accountId, threadId: resolvedDelivery.threadId, - payloads, + payloads: deliveryPayloads, bestEffort: deliveryBestEffort, deps: createOutboundSendDeps(params.deps), }); } catch (err) { if (!deliveryBestEffort) { - return { status: "error", summary, outputText, error: String(err) }; + return withRunSession({ status: "error", summary, outputText, error: String(err) }); } } } - return { status: "ok", summary, outputText }; + return withRunSession({ status: "ok", summary, outputText }); } diff --git a/src/cron/isolated-agent/session.ts b/src/cron/isolated-agent/session.ts index 8428efeb4f0..c31a35465c4 100644 --- a/src/cron/isolated-agent/session.ts +++ b/src/cron/isolated-agent/session.ts @@ -28,6 +28,8 @@ export function resolveCronSession(params: { lastChannel: entry?.lastChannel, lastTo: entry?.lastTo, lastAccountId: entry?.lastAccountId, + label: entry?.label, + displayName: entry?.displayName, skillsSnapshot: entry?.skillsSnapshot, }; return { storePath, store, sessionEntry, systemSent, isNewSession: true }; diff --git a/src/cron/normalize.test.ts b/src/cron/normalize.test.ts index a876e031758..99c6748364c 100644 --- a/src/cron/normalize.test.ts +++ b/src/cron/normalize.test.ts @@ -234,4 +234,62 @@ describe("normalizeCronJobCreate", () => { expect(delivery.mode).toBe("announce"); expect((normalized as { isolation?: unknown }).isolation).toBeUndefined(); }); + + it("infers payload kind/session target and name for message-only jobs", () => { + const normalized = normalizeCronJobCreate({ + schedule: { kind: "every", everyMs: 60_000 }, + payload: { message: "Nightly backup" }, + }) as unknown as Record; + + const payload = normalized.payload as Record; + expect(payload.kind).toBe("agentTurn"); + expect(payload.message).toBe("Nightly backup"); + expect(normalized.sessionTarget).toBe("isolated"); + expect(normalized.wakeMode).toBe("now"); + expect(typeof normalized.name).toBe("string"); + }); + + it("maps top-level model/thinking/timeout into payload for legacy add params", () => { + const normalized = normalizeCronJobCreate({ + name: "legacy root fields", + schedule: { kind: "every", everyMs: 60_000 }, + payload: { kind: "agentTurn", message: "hello" }, + model: " openrouter/deepseek/deepseek-r1 ", + thinking: " high ", + timeoutSeconds: 45, + allowUnsafeExternalContent: true, + }) as unknown as Record; + + const payload = normalized.payload as Record; + expect(payload.model).toBe("openrouter/deepseek/deepseek-r1"); + expect(payload.thinking).toBe("high"); + expect(payload.timeoutSeconds).toBe(45); + expect(payload.allowUnsafeExternalContent).toBe(true); + }); + + it("coerces sessionTarget and wakeMode casing", () => { + const normalized = normalizeCronJobCreate({ + name: "casing", + schedule: { kind: "cron", expr: "* * * * *" }, + sessionTarget: " IsOlAtEd ", + wakeMode: " NOW ", + payload: { kind: "agentTurn", message: "hello" }, + }) as unknown as Record; + + expect(normalized.sessionTarget).toBe("isolated"); + expect(normalized.wakeMode).toBe("now"); + }); + + it("strips invalid delivery mode from partial delivery objects", () => { + const normalized = normalizeCronJobCreate({ + name: "delivery mode", + schedule: { kind: "cron", expr: "* * * * *" }, + payload: { kind: "agentTurn", message: "hello" }, + delivery: { mode: "bogus", to: "123" }, + }) as unknown as Record; + + const delivery = normalized.delivery as Record; + expect(delivery.mode).toBeUndefined(); + expect(delivery.to).toBe("123"); + }); }); diff --git a/src/cron/normalize.ts b/src/cron/normalize.ts index 733be718c1b..a41044b3632 100644 --- a/src/cron/normalize.ts +++ b/src/cron/normalize.ts @@ -2,6 +2,7 @@ import type { CronJobCreate, CronJobPatch } from "./types.js"; import { sanitizeAgentId } from "../routing/session-key.js"; import { parseAbsoluteTimeMs } from "./parse.js"; import { migrateLegacyCronPayload } from "./payload-migration.js"; +import { inferLegacyName } from "./service/normalize.js"; type UnknownRecord = Record; @@ -19,7 +20,8 @@ function isRecord(value: unknown): value is UnknownRecord { function coerceSchedule(schedule: UnknownRecord) { const next: UnknownRecord = { ...schedule }; - const kind = typeof schedule.kind === "string" ? schedule.kind : undefined; + const rawKind = typeof schedule.kind === "string" ? schedule.kind.trim().toLowerCase() : ""; + const kind = rawKind === "at" || rawKind === "every" || rawKind === "cron" ? rawKind : undefined; const atMsRaw = schedule.atMs; const atRaw = schedule.at; const atString = typeof atRaw === "string" ? atRaw.trim() : ""; @@ -32,7 +34,9 @@ function coerceSchedule(schedule: UnknownRecord) { ? parseAbsoluteTimeMs(atString) : null; - if (!kind) { + if (kind) { + next.kind = kind; + } else { if ( typeof schedule.atMs === "number" || typeof schedule.at === "string" || @@ -47,7 +51,7 @@ function coerceSchedule(schedule: UnknownRecord) { } if (atString) { - next.at = parsedAtMs ? new Date(parsedAtMs).toISOString() : atString; + next.at = parsedAtMs !== null ? new Date(parsedAtMs).toISOString() : atString; } else if (parsedAtMs !== null) { next.at = new Date(parsedAtMs).toISOString(); } @@ -62,6 +66,72 @@ function coercePayload(payload: UnknownRecord) { const next: UnknownRecord = { ...payload }; // Back-compat: older configs used `provider` for delivery channel. migrateLegacyCronPayload(next); + const kindRaw = typeof next.kind === "string" ? next.kind.trim().toLowerCase() : ""; + if (kindRaw === "agentturn") { + next.kind = "agentTurn"; + } else if (kindRaw === "systemevent") { + next.kind = "systemEvent"; + } else if (kindRaw) { + next.kind = kindRaw; + } + if (!next.kind) { + const hasMessage = typeof next.message === "string" && next.message.trim().length > 0; + const hasText = typeof next.text === "string" && next.text.trim().length > 0; + if (hasMessage) { + next.kind = "agentTurn"; + } else if (hasText) { + next.kind = "systemEvent"; + } + } + if (typeof next.message === "string") { + const trimmed = next.message.trim(); + if (trimmed) { + next.message = trimmed; + } + } + if (typeof next.text === "string") { + const trimmed = next.text.trim(); + if (trimmed) { + next.text = trimmed; + } + } + if ("model" in next) { + if (typeof next.model === "string") { + const trimmed = next.model.trim(); + if (trimmed) { + next.model = trimmed; + } else { + delete next.model; + } + } else { + delete next.model; + } + } + if ("thinking" in next) { + if (typeof next.thinking === "string") { + const trimmed = next.thinking.trim(); + if (trimmed) { + next.thinking = trimmed; + } else { + delete next.thinking; + } + } else { + delete next.thinking; + } + } + if ("timeoutSeconds" in next) { + if (typeof next.timeoutSeconds === "number" && Number.isFinite(next.timeoutSeconds)) { + next.timeoutSeconds = Math.max(1, Math.floor(next.timeoutSeconds)); + } else { + delete next.timeoutSeconds; + } + } + if ( + "allowUnsafeExternalContent" in next && + typeof next.allowUnsafeExternalContent !== "boolean" + ) { + delete next.allowUnsafeExternalContent; + } return next; } @@ -69,7 +139,15 @@ function coerceDelivery(delivery: UnknownRecord) { const next: UnknownRecord = { ...delivery }; if (typeof delivery.mode === "string") { const mode = delivery.mode.trim().toLowerCase(); - next.mode = mode === "deliver" ? "announce" : mode; + if (mode === "deliver") { + next.mode = "announce"; + } else if (mode === "announce" || mode === "none") { + next.mode = mode; + } else { + delete next.mode; + } + } else if ("mode" in next) { + delete next.mode; } if (typeof delivery.channel === "string") { const trimmed = delivery.channel.trim().toLowerCase(); @@ -147,6 +225,95 @@ function unwrapJob(raw: UnknownRecord) { return raw; } +function normalizeSessionTarget(raw: unknown) { + if (typeof raw !== "string") { + return undefined; + } + const trimmed = raw.trim().toLowerCase(); + if (trimmed === "main" || trimmed === "isolated") { + return trimmed; + } + return undefined; +} + +function normalizeWakeMode(raw: unknown) { + if (typeof raw !== "string") { + return undefined; + } + const trimmed = raw.trim().toLowerCase(); + if (trimmed === "now" || trimmed === "next-heartbeat") { + return trimmed; + } + return undefined; +} + +function copyTopLevelAgentTurnFields(next: UnknownRecord, payload: UnknownRecord) { + const copyString = (field: "model" | "thinking") => { + if (typeof payload[field] === "string" && payload[field].trim()) { + return; + } + const value = next[field]; + if (typeof value === "string" && value.trim()) { + payload[field] = value.trim(); + } + }; + copyString("model"); + copyString("thinking"); + + if (typeof payload.timeoutSeconds !== "number" && typeof next.timeoutSeconds === "number") { + payload.timeoutSeconds = next.timeoutSeconds; + } + if ( + typeof payload.allowUnsafeExternalContent !== "boolean" && + typeof next.allowUnsafeExternalContent === "boolean" + ) { + payload.allowUnsafeExternalContent = next.allowUnsafeExternalContent; + } +} + +function copyTopLevelLegacyDeliveryFields(next: UnknownRecord, payload: UnknownRecord) { + if (typeof payload.deliver !== "boolean" && typeof next.deliver === "boolean") { + payload.deliver = next.deliver; + } + if ( + typeof payload.channel !== "string" && + typeof next.channel === "string" && + next.channel.trim() + ) { + payload.channel = next.channel.trim(); + } + if (typeof payload.to !== "string" && typeof next.to === "string" && next.to.trim()) { + payload.to = next.to.trim(); + } + if ( + typeof payload.bestEffortDeliver !== "boolean" && + typeof next.bestEffortDeliver === "boolean" + ) { + payload.bestEffortDeliver = next.bestEffortDeliver; + } + if ( + typeof payload.provider !== "string" && + typeof next.provider === "string" && + next.provider.trim() + ) { + payload.provider = next.provider.trim(); + } +} + +function stripLegacyTopLevelFields(next: UnknownRecord) { + delete next.model; + delete next.thinking; + delete next.timeoutSeconds; + delete next.allowUnsafeExternalContent; + delete next.message; + delete next.text; + delete next.deliver; + delete next.channel; + delete next.to; + delete next.bestEffortDeliver; + delete next.provider; +} + export function normalizeCronJobInput( raw: unknown, options: NormalizeOptions = DEFAULT_OPTIONS, @@ -186,10 +353,38 @@ export function normalizeCronJobInput( } } + if ("sessionTarget" in base) { + const normalized = normalizeSessionTarget(base.sessionTarget); + if (normalized) { + next.sessionTarget = normalized; + } else { + delete next.sessionTarget; + } + } + + if ("wakeMode" in base) { + const normalized = normalizeWakeMode(base.wakeMode); + if (normalized) { + next.wakeMode = normalized; + } else { + delete next.wakeMode; + } + } + if (isRecord(base.schedule)) { next.schedule = coerceSchedule(base.schedule); } + if (!("payload" in next) || !isRecord(next.payload)) { + const message = typeof next.message === "string" ? next.message.trim() : ""; + const text = typeof next.text === "string" ? next.text.trim() : ""; + if (message) { + next.payload = { kind: "agentTurn", message }; + } else if (text) { + next.payload = { kind: "systemEvent", text }; + } + } + if (isRecord(base.payload)) { next.payload = coercePayload(base.payload); } @@ -198,17 +393,39 @@ export function normalizeCronJobInput( next.delivery = coerceDelivery(base.delivery); } - if (isRecord(base.isolation)) { + if ("isolation" in next) { delete next.isolation; } + const payload = isRecord(next.payload) ? next.payload : null; + if (payload && payload.kind === "agentTurn") { + copyTopLevelAgentTurnFields(next, payload); + copyTopLevelLegacyDeliveryFields(next, payload); + } + stripLegacyTopLevelFields(next); + if (options.applyDefaults) { if (!next.wakeMode) { - next.wakeMode = "next-heartbeat"; + next.wakeMode = "now"; } if (typeof next.enabled !== "boolean") { next.enabled = true; } + if ( + (typeof next.name !== "string" || !next.name.trim()) && + isRecord(next.schedule) && + isRecord(next.payload) + ) { + next.name = inferLegacyName({ + schedule: next.schedule as { kind?: unknown; everyMs?: unknown; expr?: unknown }, + payload: next.payload as { kind?: unknown; text?: unknown; message?: unknown }, + }); + } else if (typeof next.name === "string") { + const trimmed = next.name.trim(); + if (trimmed) { + next.name = trimmed; + } + } if (!next.sessionTarget && isRecord(next.payload)) { const kind = typeof next.payload.kind === "string" ? next.payload.kind : ""; if (kind === "systemEvent") { diff --git a/src/cron/run-log.test.ts b/src/cron/run-log.test.ts index cef09acfe2d..6ac9cca2bb1 100644 --- a/src/cron/run-log.test.ts +++ b/src/cron/run-log.test.ts @@ -65,6 +65,8 @@ describe("cron run log", () => { jobId: "a", action: "finished", status: "skipped", + sessionId: "run-123", + sessionKey: "agent:main:cron:a:run:run-123", }); const allA = await readCronRunLogEntries(logPathA, { limit: 10 }); @@ -78,6 +80,8 @@ describe("cron run log", () => { const lastOne = await readCronRunLogEntries(logPathA, { limit: 1 }); expect(lastOne.map((e) => e.ts)).toEqual([3]); + expect(lastOne[0]?.sessionId).toBe("run-123"); + expect(lastOne[0]?.sessionKey).toBe("agent:main:cron:a:run:run-123"); const onlyB = await readCronRunLogEntries(logPathB, { limit: 10, diff --git a/src/cron/run-log.ts b/src/cron/run-log.ts index 744b023e535..25846ce81a0 100644 --- a/src/cron/run-log.ts +++ b/src/cron/run-log.ts @@ -8,6 +8,8 @@ export type CronRunLogEntry = { status?: "ok" | "error" | "skipped"; error?: string; summary?: string; + sessionId?: string; + sessionKey?: string; runAtMs?: number; durationMs?: number; nextRunAtMs?: number; @@ -93,7 +95,24 @@ export async function readCronRunLogEntries( if (jobId && obj.jobId !== jobId) { continue; } - parsed.push(obj as CronRunLogEntry); + const entry: CronRunLogEntry = { + ts: obj.ts, + jobId: obj.jobId, + action: "finished", + status: obj.status, + error: obj.error, + summary: obj.summary, + runAtMs: obj.runAtMs, + durationMs: obj.durationMs, + nextRunAtMs: obj.nextRunAtMs, + }; + if (typeof obj.sessionId === "string" && obj.sessionId.trim().length > 0) { + entry.sessionId = obj.sessionId; + } + if (typeof obj.sessionKey === "string" && obj.sessionKey.trim().length > 0) { + entry.sessionKey = obj.sessionKey; + } + parsed.push(entry); } catch { // ignore invalid lines } diff --git a/src/cron/schedule.ts b/src/cron/schedule.ts index 252d29babe7..fc13ebfe2b6 100644 --- a/src/cron/schedule.ts +++ b/src/cron/schedule.ts @@ -2,6 +2,14 @@ import { Cron } from "croner"; import type { CronSchedule } from "./types.js"; import { parseAbsoluteTimeMs } from "./parse.js"; +function resolveCronTimezone(tz?: string) { + const trimmed = typeof tz === "string" ? tz.trim() : ""; + if (trimmed) { + return trimmed; + } + return Intl.DateTimeFormat().resolvedOptions().timeZone; +} + export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): number | undefined { if (schedule.kind === "at") { // Handle both canonical `at` (string) and legacy `atMs` (number) fields. @@ -38,9 +46,20 @@ export function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): numbe return undefined; } const cron = new Cron(expr, { - timezone: schedule.tz?.trim() || undefined, + timezone: resolveCronTimezone(schedule.tz), catch: false, }); - const next = cron.nextRun(new Date(nowMs)); - return next ? next.getTime() : undefined; + let cursor = nowMs; + for (let attempt = 0; attempt < 3; attempt++) { + const next = cron.nextRun(new Date(cursor)); + if (!next) { + return undefined; + } + const nextMs = next.getTime(); + if (Number.isFinite(nextMs) && nextMs > nowMs) { + return nextMs; + } + cursor += 1_000; + } + return undefined; } diff --git a/src/cron/service.delivery-plan.test.ts b/src/cron/service.delivery-plan.test.ts new file mode 100644 index 00000000000..707868cba68 --- /dev/null +++ b/src/cron/service.delivery-plan.test.ts @@ -0,0 +1,92 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it, vi } from "vitest"; +import { CronService } from "./service.js"; + +const noopLogger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), +}; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-delivery-")); + return { + storePath: path.join(dir, "cron", "jobs.json"), + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +describe("CronService delivery plan consistency", () => { + it("does not post isolated summary when legacy deliver=false", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok", summary: "done" })), + }); + await cron.start(); + const job = await cron.add({ + name: "legacy-off", + schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { + kind: "agentTurn", + message: "hello", + deliver: false, + }, + }); + + const result = await cron.run(job.id, "force"); + expect(result).toEqual({ ok: true, ran: true }); + expect(enqueueSystemEvent).not.toHaveBeenCalled(); + + cron.stop(); + await store.cleanup(); + }); + + it("treats delivery object without mode as announce", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok", summary: "done" })), + }); + await cron.start(); + const job = await cron.add({ + name: "partial-delivery", + schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { + kind: "agentTurn", + message: "hello", + }, + delivery: { channel: "telegram", to: "123" } as unknown as { + mode: "none" | "announce"; + channel?: string; + to?: string; + }, + }); + + const result = await cron.run(job.id, "force"); + expect(result).toEqual({ ok: true, ran: true }); + expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron: done", { agentId: undefined }); + + cron.stop(); + await store.cleanup(); + }); +}); diff --git a/src/cron/service.every-jobs-fire.test.ts b/src/cron/service.every-jobs-fire.test.ts index a6a2bab80f5..7ae49ac2d0d 100644 --- a/src/cron/service.every-jobs-fire.test.ts +++ b/src/cron/service.every-jobs-fire.test.ts @@ -2,6 +2,7 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { CronJob } from "./types.js"; import { CronService } from "./service.js"; const noopLogger = { @@ -21,6 +22,23 @@ async function makeStorePath() { }; } +async function waitForJob( + cron: CronService, + id: string, + predicate: (job: CronJob | undefined) => boolean, +) { + let latest: CronJob | undefined; + for (let i = 0; i < 30; i++) { + const jobs = await cron.list({ includeDisabled: true }); + latest = jobs.find((job) => job.id === id); + if (predicate(latest)) { + return latest; + } + await vi.runOnlyPendingTimersAsync(); + } + return latest; +} + describe("CronService interval/cron jobs fire on time", () => { beforeEach(() => { vi.useFakeTimers(); @@ -66,9 +84,7 @@ describe("CronService interval/cron jobs fire on time", () => { vi.setSystemTime(new Date(firstDueAt + 5)); await vi.runOnlyPendingTimersAsync(); - // Wait for the async onTimer to complete via the lock queue. - const jobs = await cron.list(); - const updated = jobs.find((j) => j.id === job.id); + const updated = await waitForJob(cron, job.id, (current) => current?.state.lastStatus === "ok"); expect(enqueueSystemEvent).toHaveBeenCalledWith("tick", { agentId: undefined }); expect(updated?.state.lastStatus).toBe("ok"); @@ -112,9 +128,7 @@ describe("CronService interval/cron jobs fire on time", () => { vi.setSystemTime(new Date(firstDueAt + 5)); await vi.runOnlyPendingTimersAsync(); - // Wait for the async onTimer to complete via the lock queue. - const jobs = await cron.list(); - const updated = jobs.find((j) => j.id === job.id); + const updated = await waitForJob(cron, job.id, (current) => current?.state.lastStatus === "ok"); expect(enqueueSystemEvent).toHaveBeenCalledWith("cron-tick", { agentId: undefined }); expect(updated?.state.lastStatus).toBe("ok"); @@ -124,4 +138,88 @@ describe("CronService interval/cron jobs fire on time", () => { cron.stop(); await store.cleanup(); }); + + it("keeps legacy every jobs due while minute cron jobs recompute schedules", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + const nowMs = Date.parse("2025-12-13T00:00:00.000Z"); + + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile( + store.storePath, + JSON.stringify( + { + version: 1, + jobs: [ + { + id: "legacy-every", + name: "legacy every", + enabled: true, + createdAtMs: nowMs, + updatedAtMs: nowMs, + schedule: { kind: "every", everyMs: 120_000 }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "sf-tick" }, + state: { nextRunAtMs: nowMs + 120_000 }, + }, + { + id: "minute-cron", + name: "minute cron", + enabled: true, + createdAtMs: nowMs, + updatedAtMs: nowMs, + schedule: { kind: "cron", expr: "* * * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "minute-tick" }, + state: { nextRunAtMs: nowMs + 60_000 }, + }, + ], + }, + null, + 2, + ), + "utf-8", + ); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + for (let minute = 1; minute <= 6; minute++) { + vi.setSystemTime(new Date(nowMs + minute * 60_000)); + const minuteRun = await cron.run("minute-cron", "force"); + expect(minuteRun).toEqual({ ok: true, ran: true }); + } + + vi.setSystemTime(new Date(nowMs + 6 * 60_000)); + const sfRun = await cron.run("legacy-every", "due"); + expect(sfRun).toEqual({ ok: true, ran: true }); + + const sfRuns = enqueueSystemEvent.mock.calls.filter((args) => args[0] === "sf-tick").length; + const minuteRuns = enqueueSystemEvent.mock.calls.filter( + (args) => args[0] === "minute-tick", + ).length; + expect(minuteRuns).toBeGreaterThan(0); + expect(sfRuns).toBeGreaterThan(0); + + const jobs = await cron.list({ includeDisabled: true }); + const sfJob = jobs.find((job) => job.id === "legacy-every"); + expect(sfJob?.state.lastStatus).toBe("ok"); + expect(sfJob?.schedule.kind).toBe("every"); + if (sfJob?.schedule.kind === "every") { + expect(sfJob.schedule.anchorMs).toBe(nowMs); + } + + cron.stop(); + await store.cleanup(); + }); }); diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts new file mode 100644 index 00000000000..c793979c167 --- /dev/null +++ b/src/cron/service.issue-regressions.test.ts @@ -0,0 +1,346 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { setTimeout as delay } from "node:timers/promises"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { CronJob } from "./types.js"; +import { CronService } from "./service.js"; +import { createCronServiceState, type CronEvent } from "./service/state.js"; +import { onTimer } from "./service/timer.js"; + +const noopLogger = { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + trace: vi.fn(), +}; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "cron-issues-")); + const storePath = path.join(dir, "jobs.json"); + return { + storePath, + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +function createDueIsolatedJob(params: { + id: string; + nowMs: number; + nextRunAtMs: number; + deleteAfterRun?: boolean; +}): CronJob { + return { + id: params.id, + name: params.id, + enabled: true, + deleteAfterRun: params.deleteAfterRun ?? false, + createdAtMs: params.nowMs, + updatedAtMs: params.nowMs, + schedule: { kind: "at", at: new Date(params.nextRunAtMs).toISOString() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: params.id }, + delivery: { mode: "none" }, + state: { nextRunAtMs: params.nextRunAtMs }, + }; +} + +describe("Cron issue regressions", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-02-06T10:05:00.000Z")); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.clearAllMocks(); + }); + + it("recalculates nextRunAtMs when schedule changes", async () => { + const store = await makeStorePath(); + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }), + }); + await cron.start(); + + const created = await cron.add({ + name: "hourly", + schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC" }, + sessionTarget: "main", + payload: { kind: "systemEvent", text: "tick" }, + }); + expect(created.state.nextRunAtMs).toBe(Date.parse("2026-02-06T11:00:00.000Z")); + + const updated = await cron.update(created.id, { + schedule: { kind: "cron", expr: "0 */2 * * *", tz: "UTC" }, + }); + + expect(updated.state.nextRunAtMs).toBe(Date.parse("2026-02-06T12:00:00.000Z")); + + cron.stop(); + await store.cleanup(); + }); + + it("runs immediately with force mode even when not due", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }), + }); + await cron.start(); + + const created = await cron.add({ + name: "force-now", + schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() }, + sessionTarget: "main", + payload: { kind: "systemEvent", text: "force" }, + }); + + const result = await cron.run(created.id, "force"); + + expect(result).toEqual({ ok: true, ran: true }); + expect(enqueueSystemEvent).toHaveBeenCalledWith("force", { agentId: undefined }); + + cron.stop(); + await store.cleanup(); + }); + + it("schedules isolated jobs with next wake time", async () => { + const store = await makeStorePath(); + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }), + }); + await cron.start(); + + const job = await cron.add({ + name: "isolated", + schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() }, + sessionTarget: "isolated", + payload: { kind: "agentTurn", message: "hi" }, + }); + const status = await cron.status(); + + expect(typeof job.state.nextRunAtMs).toBe("number"); + expect(typeof status.nextWakeAtMs).toBe("number"); + + cron.stop(); + await store.cleanup(); + }); + + it("persists allowUnsafeExternalContent on agentTurn payload patches", async () => { + const store = await makeStorePath(); + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }), + }); + await cron.start(); + + const created = await cron.add({ + name: "unsafe toggle", + schedule: { kind: "every", everyMs: 60_000, anchorMs: Date.now() }, + sessionTarget: "isolated", + payload: { kind: "agentTurn", message: "hi" }, + }); + + const updated = await cron.update(created.id, { + payload: { kind: "agentTurn", allowUnsafeExternalContent: true }, + }); + + expect(updated.payload.kind).toBe("agentTurn"); + if (updated.payload.kind === "agentTurn") { + expect(updated.payload.allowUnsafeExternalContent).toBe(true); + expect(updated.payload.message).toBe("hi"); + } + + cron.stop(); + await store.cleanup(); + }); + + it("caps timer delay to 60s for far-future schedules", async () => { + const timeoutSpy = vi.spyOn(globalThis, "setTimeout"); + const store = await makeStorePath(); + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }), + }); + await cron.start(); + + const callsBeforeAdd = timeoutSpy.mock.calls.length; + await cron.add({ + name: "far-future", + schedule: { kind: "at", at: "2035-01-01T00:00:00.000Z" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "future" }, + }); + + const delaysAfterAdd = timeoutSpy.mock.calls + .slice(callsBeforeAdd) + .map(([, delay]) => delay) + .filter((delay): delay is number => typeof delay === "number"); + expect(delaysAfterAdd.some((delay) => delay === 60_000)).toBe(true); + + cron.stop(); + timeoutSpy.mockRestore(); + await store.cleanup(); + }); + + it("does not hot-loop zero-delay timers while a run is already in progress", async () => { + const timeoutSpy = vi.spyOn(globalThis, "setTimeout"); + const store = await makeStorePath(); + const now = Date.parse("2026-02-06T10:05:00.000Z"); + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok", summary: "ok" }), + }); + state.running = true; + state.store = { + version: 1, + jobs: [createDueIsolatedJob({ id: "due", nowMs: now, nextRunAtMs: now - 1 })], + }; + + await onTimer(state); + + expect(timeoutSpy).not.toHaveBeenCalled(); + expect(state.timer).toBeNull(); + timeoutSpy.mockRestore(); + await store.cleanup(); + }); + + it("skips forced manual runs while a timer-triggered run is in progress", async () => { + vi.useRealTimers(); + const store = await makeStorePath(); + let resolveRun: + | ((value: { status: "ok" | "error" | "skipped"; summary?: string; error?: string }) => void) + | undefined; + const runIsolatedAgentJob = vi.fn( + async () => + await new Promise<{ status: "ok" | "error" | "skipped"; summary?: string; error?: string }>( + (resolve) => { + resolveRun = resolve; + }, + ), + ); + + const cron = new CronService({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob, + }); + await cron.start(); + + const runAt = Date.now() + 30; + const job = await cron.add({ + name: "timer-overlap", + enabled: true, + schedule: { kind: "at", at: new Date(runAt).toISOString() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "long task" }, + delivery: { mode: "none" }, + }); + + for (let i = 0; i < 25 && runIsolatedAgentJob.mock.calls.length === 0; i++) { + await delay(20); + } + expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); + + const manualResult = await cron.run(job.id, "force"); + expect(manualResult).toEqual({ ok: true, ran: false, reason: "already-running" }); + expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); + + resolveRun?.({ status: "ok", summary: "done" }); + for (let i = 0; i < 25; i++) { + const jobs = await cron.list({ includeDisabled: true }); + if (jobs.some((j) => j.id === job.id && j.state.lastStatus === "ok")) { + break; + } + await delay(20); + } + + cron.stop(); + await store.cleanup(); + }); + + it("records per-job start time and duration for batched due jobs", async () => { + const store = await makeStorePath(); + const dueAt = Date.parse("2026-02-06T10:05:01.000Z"); + const first = createDueIsolatedJob({ id: "batch-first", nowMs: dueAt, nextRunAtMs: dueAt }); + const second = createDueIsolatedJob({ id: "batch-second", nowMs: dueAt, nextRunAtMs: dueAt }); + await fs.writeFile( + store.storePath, + JSON.stringify({ version: 1, jobs: [first, second] }, null, 2), + "utf-8", + ); + + let now = dueAt; + const events: CronEvent[] = []; + const state = createCronServiceState({ + cronEnabled: true, + storePath: store.storePath, + log: noopLogger, + nowMs: () => now, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + onEvent: (evt) => { + events.push(evt); + }, + runIsolatedAgentJob: vi.fn(async (params: { job: { id: string } }) => { + now += params.job.id === first.id ? 50 : 20; + return { status: "ok" as const, summary: "ok" }; + }), + }); + + await onTimer(state); + + const jobs = state.store?.jobs ?? []; + const firstDone = jobs.find((job) => job.id === first.id); + const secondDone = jobs.find((job) => job.id === second.id); + const startedAtEvents = events + .filter((evt) => evt.action === "started") + .map((evt) => evt.runAtMs); + + expect(firstDone?.state.lastRunAtMs).toBe(dueAt); + expect(firstDone?.state.lastDurationMs).toBe(50); + expect(secondDone?.state.lastRunAtMs).toBe(dueAt + 50); + expect(secondDone?.state.lastDurationMs).toBe(20); + expect(startedAtEvents).toEqual([dueAt, dueAt + 50]); + + await store.cleanup(); + }); +}); diff --git a/src/cron/service.read-ops-nonblocking.test.ts b/src/cron/service.read-ops-nonblocking.test.ts new file mode 100644 index 00000000000..d0e73c87dd6 --- /dev/null +++ b/src/cron/service.read-ops-nonblocking.test.ts @@ -0,0 +1,104 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { setTimeout as delay } from "node:timers/promises"; +import { describe, expect, it, vi } from "vitest"; +import { CronService } from "./service.js"; + +const noopLogger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), +}; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-")); + return { + storePath: path.join(dir, "cron", "jobs.json"), + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +describe("CronService read ops while job is running", () => { + it("keeps list and status responsive during a long isolated run", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + let resolveRun: + | ((value: { status: "ok" | "error" | "skipped"; summary?: string; error?: string }) => void) + | undefined; + + const runIsolatedAgentJob = vi.fn( + async () => + await new Promise<{ status: "ok" | "error" | "skipped"; summary?: string; error?: string }>( + (resolve) => { + resolveRun = resolve; + }, + ), + ); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob, + }); + + await cron.start(); + + const runAt = Date.now() + 30; + await cron.add({ + name: "slow isolated", + enabled: true, + deleteAfterRun: false, + schedule: { kind: "at", at: new Date(runAt).toISOString() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "long task" }, + delivery: { mode: "none" }, + }); + + for (let i = 0; i < 25 && runIsolatedAgentJob.mock.calls.length === 0; i++) { + await delay(20); + } + + expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); + + const listRace = await Promise.race([ + cron.list({ includeDisabled: true }).then(() => "ok"), + delay(200).then(() => "timeout"), + ]); + expect(listRace).toBe("ok"); + + const statusRace = await Promise.race([ + cron.status().then(() => "ok"), + delay(200).then(() => "timeout"), + ]); + expect(statusRace).toBe("ok"); + + const running = await cron.list({ includeDisabled: true }); + expect(running[0]?.state.runningAtMs).toBeTypeOf("number"); + + resolveRun?.({ status: "ok", summary: "done" }); + + for (let i = 0; i < 25; i++) { + const jobs = await cron.list({ includeDisabled: true }); + if (jobs[0]?.state.lastStatus === "ok") { + break; + } + await delay(20); + } + + const finished = await cron.list({ includeDisabled: true }); + expect(finished[0]?.state.lastStatus).toBe("ok"); + + cron.stop(); + await store.cleanup(); + }); +}); diff --git a/src/cron/service.restart-catchup.test.ts b/src/cron/service.restart-catchup.test.ts new file mode 100644 index 00000000000..c8994eed19a --- /dev/null +++ b/src/cron/service.restart-catchup.test.ts @@ -0,0 +1,165 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { CronService } from "./service.js"; + +const noopLogger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), +}; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-")); + return { + storePath: path.join(dir, "cron", "jobs.json"), + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +describe("CronService restart catch-up", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2025-12-13T17:00:00.000Z")); + noopLogger.debug.mockClear(); + noopLogger.info.mockClear(); + noopLogger.warn.mockClear(); + noopLogger.error.mockClear(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("executes an overdue recurring job immediately on start", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + const dueAt = Date.parse("2025-12-13T15:00:00.000Z"); + const lastRunAt = Date.parse("2025-12-12T15:00:00.000Z"); + + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile( + store.storePath, + JSON.stringify( + { + version: 1, + jobs: [ + { + id: "restart-overdue-job", + name: "daily digest", + enabled: true, + createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), + updatedAtMs: Date.parse("2025-12-12T15:00:00.000Z"), + schedule: { kind: "cron", expr: "0 15 * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "digest now" }, + state: { + nextRunAtMs: dueAt, + lastRunAtMs: lastRunAt, + lastStatus: "ok", + }, + }, + ], + }, + null, + 2, + ), + "utf-8", + ); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + + expect(enqueueSystemEvent).toHaveBeenCalledWith("digest now", { agentId: undefined }); + expect(requestHeartbeatNow).toHaveBeenCalled(); + + const jobs = await cron.list({ includeDisabled: true }); + const updated = jobs.find((job) => job.id === "restart-overdue-job"); + expect(updated?.state.lastStatus).toBe("ok"); + expect(updated?.state.lastRunAtMs).toBe(Date.parse("2025-12-13T17:00:00.000Z")); + expect(updated?.state.nextRunAtMs).toBeGreaterThan(Date.parse("2025-12-13T17:00:00.000Z")); + + cron.stop(); + await store.cleanup(); + }); + + it("clears stale running markers and catches up overdue jobs on startup", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + + const dueAt = Date.parse("2025-12-13T16:00:00.000Z"); + const staleRunningAt = Date.parse("2025-12-13T16:30:00.000Z"); + + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile( + store.storePath, + JSON.stringify( + { + version: 1, + jobs: [ + { + id: "restart-stale-running", + name: "daily stale marker", + enabled: true, + createdAtMs: Date.parse("2025-12-10T12:00:00.000Z"), + updatedAtMs: Date.parse("2025-12-13T16:30:00.000Z"), + schedule: { kind: "cron", expr: "0 16 * * *", tz: "UTC" }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { kind: "systemEvent", text: "resume stale marker" }, + state: { + nextRunAtMs: dueAt, + runningAtMs: staleRunningAt, + }, + }, + ], + }, + null, + 2, + ), + "utf-8", + ); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + + expect(enqueueSystemEvent).toHaveBeenCalledWith("resume stale marker", { agentId: undefined }); + expect(noopLogger.warn).toHaveBeenCalledWith( + expect.objectContaining({ jobId: "restart-stale-running" }), + "cron: clearing stale running marker on startup", + ); + + const jobs = await cron.list({ includeDisabled: true }); + const updated = jobs.find((job) => job.id === "restart-stale-running"); + expect(updated?.state.runningAtMs).toBeUndefined(); + expect(updated?.state.lastStatus).toBe("ok"); + expect(updated?.state.lastRunAtMs).toBe(Date.parse("2025-12-13T17:00:00.000Z")); + + cron.stop(); + await store.cleanup(); + }); +}); diff --git a/src/cron/service.runs-one-shot-main-job-disables-it.test.ts b/src/cron/service.runs-one-shot-main-job-disables-it.test.ts index e26e71cab70..1cc3eca03c1 100644 --- a/src/cron/service.runs-one-shot-main-job-disables-it.test.ts +++ b/src/cron/service.runs-one-shot-main-job-disables-it.test.ts @@ -3,6 +3,7 @@ import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js"; +import type { CronJob } from "./types.js"; import { CronService } from "./service.js"; const noopLogger = { @@ -22,6 +23,18 @@ async function makeStorePath() { }; } +async function waitForJobs(cron: CronService, predicate: (jobs: CronJob[]) => boolean) { + let latest: CronJob[] = []; + for (let i = 0; i < 30; i++) { + latest = await cron.list({ includeDisabled: true }); + if (predicate(latest)) { + return latest; + } + await vi.runOnlyPendingTimersAsync(); + } + return latest; +} + describe("CronService", () => { beforeEach(() => { vi.useFakeTimers(); @@ -67,7 +80,9 @@ describe("CronService", () => { vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z")); await vi.runOnlyPendingTimersAsync(); - const jobs = await cron.list({ includeDisabled: true }); + const jobs = await waitForJobs(cron, (items) => + items.some((item) => item.id === job.id && !item.enabled), + ); const updated = jobs.find((j) => j.id === job.id); expect(updated?.enabled).toBe(false); expect(enqueueSystemEvent).toHaveBeenCalledWith("hello", { @@ -108,7 +123,7 @@ describe("CronService", () => { vi.setSystemTime(new Date("2025-12-13T00:00:02.000Z")); await vi.runOnlyPendingTimersAsync(); - const jobs = await cron.list({ includeDisabled: true }); + const jobs = await waitForJobs(cron, (items) => !items.some((item) => item.id === job.id)); expect(jobs.find((j) => j.id === job.id)).toBeUndefined(); expect(enqueueSystemEvent).toHaveBeenCalledWith("hello", { agentId: undefined, @@ -185,6 +200,49 @@ describe("CronService", () => { await store.cleanup(); }); + it("wakeMode now falls back to queued heartbeat when main lane stays busy", async () => { + const store = await makeStorePath(); + const enqueueSystemEvent = vi.fn(); + const requestHeartbeatNow = vi.fn(); + const runHeartbeatOnce = vi.fn(async () => ({ + status: "skipped" as const, + reason: "requests-in-flight", + })); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runHeartbeatOnce, + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + const job = await cron.add({ + name: "wakeMode now fallback", + enabled: true, + schedule: { kind: "at", at: new Date(1).toISOString() }, + sessionTarget: "main", + wakeMode: "now", + payload: { kind: "systemEvent", text: "hello" }, + }); + + const runPromise = cron.run(job.id, "force"); + await vi.advanceTimersByTimeAsync(125_000); + await runPromise; + + expect(runHeartbeatOnce).toHaveBeenCalled(); + expect(requestHeartbeatNow).toHaveBeenCalled(); + expect(job.state.lastStatus).toBe("ok"); + expect(job.state.lastError).toBeUndefined(); + + await cron.list({ includeDisabled: true }); + cron.stop(); + await store.cleanup(); + }); + it("runs an isolated job and posts summary to main", async () => { const store = await makeStorePath(); const enqueueSystemEvent = vi.fn(); @@ -218,7 +276,7 @@ describe("CronService", () => { vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z")); await vi.runOnlyPendingTimersAsync(); - await cron.list({ includeDisabled: true }); + await waitForJobs(cron, (items) => items.some((item) => item.state.lastStatus === "ok")); expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1); expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron: done", { agentId: undefined, @@ -366,7 +424,7 @@ describe("CronService", () => { vi.setSystemTime(new Date("2025-12-13T00:00:01.000Z")); await vi.runOnlyPendingTimersAsync(); - await cron.list({ includeDisabled: true }); + await waitForJobs(cron, (items) => items.some((item) => item.state.lastStatus === "error")); expect(enqueueSystemEvent).toHaveBeenCalledWith("Cron (error): last output", { agentId: undefined, @@ -460,7 +518,7 @@ describe("CronService", () => { expect(enqueueSystemEvent).not.toHaveBeenCalled(); expect(requestHeartbeatNow).not.toHaveBeenCalled(); - const jobs = await cron.list({ includeDisabled: true }); + const jobs = await waitForJobs(cron, (items) => items[0]?.state.lastStatus === "skipped"); expect(jobs[0]?.state.lastStatus).toBe("skipped"); expect(jobs[0]?.state.lastError).toMatch(/main job requires/i); diff --git a/src/cron/service.skips-main-jobs-empty-systemevent-text.test.ts b/src/cron/service.skips-main-jobs-empty-systemevent-text.test.ts index d25edfb8a71..4bbc07afc8f 100644 --- a/src/cron/service.skips-main-jobs-empty-systemevent-text.test.ts +++ b/src/cron/service.skips-main-jobs-empty-systemevent-text.test.ts @@ -2,6 +2,7 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { CronJob } from "./types.js"; import { CronService } from "./service.js"; const noopLogger = { @@ -21,6 +22,22 @@ async function makeStorePath() { }; } +async function waitForFirstJob( + cron: CronService, + predicate: (job: CronJob | undefined) => boolean, +) { + let latest: CronJob | undefined; + for (let i = 0; i < 30; i++) { + const jobs = await cron.list({ includeDisabled: true }); + latest = jobs[0]; + if (predicate(latest)) { + return latest; + } + await vi.runOnlyPendingTimersAsync(); + } + return latest; +} + describe("CronService", () => { beforeEach(() => { vi.useFakeTimers(); @@ -66,9 +83,9 @@ describe("CronService", () => { expect(enqueueSystemEvent).not.toHaveBeenCalled(); expect(requestHeartbeatNow).not.toHaveBeenCalled(); - const jobs = await cron.list({ includeDisabled: true }); - expect(jobs[0]?.state.lastStatus).toBe("skipped"); - expect(jobs[0]?.state.lastError).toMatch(/non-empty/i); + const job = await waitForFirstJob(cron, (current) => current?.state.lastStatus === "skipped"); + expect(job?.state.lastStatus).toBe("skipped"); + expect(job?.state.lastError).toMatch(/non-empty/i); cron.stop(); await store.cleanup(); diff --git a/src/cron/service.store-migration.test.ts b/src/cron/service.store-migration.test.ts new file mode 100644 index 00000000000..ed3b25e6907 --- /dev/null +++ b/src/cron/service.store-migration.test.ts @@ -0,0 +1,124 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { CronService } from "./service.js"; + +const noopLogger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), +}; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-")); + return { + storePath: path.join(dir, "cron", "jobs.json"), + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +describe("CronService store migrations", () => { + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-02-06T17:00:00.000Z")); + noopLogger.debug.mockClear(); + noopLogger.info.mockClear(); + noopLogger.warn.mockClear(); + noopLogger.error.mockClear(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("migrates legacy top-level agentTurn fields and initializes missing state", async () => { + const store = await makeStorePath(); + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile( + store.storePath, + JSON.stringify( + { + version: 1, + jobs: [ + { + id: "legacy-agentturn-job", + name: "legacy agentturn", + enabled: true, + createdAtMs: Date.parse("2026-02-01T12:00:00.000Z"), + updatedAtMs: Date.parse("2026-02-05T12:00:00.000Z"), + schedule: { kind: "cron", expr: "0 23 * * *", tz: "UTC" }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + model: "openrouter/deepseek/deepseek-r1", + thinking: "high", + timeoutSeconds: 120, + allowUnsafeExternalContent: true, + deliver: true, + channel: "telegram", + to: "12345", + bestEffortDeliver: true, + payload: { kind: "agentTurn", message: "legacy payload fields" }, + }, + ], + }, + null, + 2, + ), + "utf-8", + ); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok", summary: "ok" })), + }); + + await cron.start(); + + const status = await cron.status(); + expect(status.enabled).toBe(true); + + const jobs = await cron.list({ includeDisabled: true }); + const job = jobs.find((entry) => entry.id === "legacy-agentturn-job"); + expect(job).toBeDefined(); + expect(job?.state).toBeDefined(); + expect(job?.sessionTarget).toBe("isolated"); + expect(job?.payload.kind).toBe("agentTurn"); + if (job?.payload.kind === "agentTurn") { + expect(job.payload.model).toBe("openrouter/deepseek/deepseek-r1"); + expect(job.payload.thinking).toBe("high"); + expect(job.payload.timeoutSeconds).toBe(120); + expect(job.payload.allowUnsafeExternalContent).toBe(true); + } + expect(job?.delivery).toEqual({ + mode: "announce", + channel: "telegram", + to: "12345", + bestEffort: true, + }); + + const persisted = JSON.parse(await fs.readFile(store.storePath, "utf-8")) as { + jobs: Array>; + }; + const persistedJob = persisted.jobs.find((entry) => entry.id === "legacy-agentturn-job"); + expect(persistedJob).toBeDefined(); + expect(persistedJob?.state).toEqual(expect.any(Object)); + expect(persistedJob?.model).toBeUndefined(); + expect(persistedJob?.thinking).toBeUndefined(); + expect(persistedJob?.timeoutSeconds).toBeUndefined(); + expect(persistedJob?.deliver).toBeUndefined(); + expect(persistedJob?.channel).toBeUndefined(); + expect(persistedJob?.to).toBeUndefined(); + expect(persistedJob?.bestEffortDeliver).toBeUndefined(); + + cron.stop(); + await store.cleanup(); + }); +}); diff --git a/src/cron/service.store.migration.test.ts b/src/cron/service.store.migration.test.ts index 6e0734b15b4..3054a634e52 100644 --- a/src/cron/service.store.migration.test.ts +++ b/src/cron/service.store.migration.test.ts @@ -98,4 +98,49 @@ describe("cron store migration", () => { await store.cleanup(); }); + + it("adds anchorMs to legacy every schedules", async () => { + const store = await makeStorePath(); + const createdAtMs = 1_700_000_000_000; + const legacyJob = { + id: "job-every-legacy", + agentId: undefined, + name: "Legacy every", + description: null, + enabled: true, + deleteAfterRun: false, + createdAtMs, + updatedAtMs: createdAtMs, + schedule: { kind: "every", everyMs: 120_000 }, + sessionTarget: "main", + wakeMode: "next-heartbeat", + payload: { + kind: "systemEvent", + text: "tick", + }, + state: {}, + }; + await fs.mkdir(path.dirname(store.storePath), { recursive: true }); + await fs.writeFile(store.storePath, JSON.stringify({ version: 1, jobs: [legacyJob] }, null, 2)); + + const cron = new CronService({ + storePath: store.storePath, + cronEnabled: true, + log: noopLogger, + enqueueSystemEvent: vi.fn(), + requestHeartbeatNow: vi.fn(), + runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })), + }); + + await cron.start(); + cron.stop(); + + const loaded = await loadCronStore(store.storePath); + const migrated = loaded.jobs[0] as Record; + const schedule = migrated.schedule as Record; + expect(schedule.kind).toBe("every"); + expect(schedule.anchorMs).toBe(createdAtMs); + + await store.cleanup(); + }); }); diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index a01475224a7..fbd96d34d94 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -20,6 +20,17 @@ import { const STUCK_RUN_MS = 2 * 60 * 60 * 1000; +function resolveEveryAnchorMs(params: { + schedule: { everyMs: number; anchorMs?: number }; + fallbackAnchorMs: number; +}) { + const raw = params.schedule.anchorMs; + if (typeof raw === "number" && Number.isFinite(raw)) { + return Math.max(0, Math.floor(raw)); + } + return Math.max(0, Math.floor(params.fallbackAnchorMs)); +} + export function assertSupportedJobSpec(job: Pick) { if (job.sessionTarget === "main" && job.payload.kind !== "systemEvent") { throw new Error('main cron jobs require payload.kind="systemEvent"'); @@ -47,6 +58,13 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und if (!job.enabled) { return undefined; } + if (job.schedule.kind === "every") { + const anchorMs = resolveEveryAnchorMs({ + schedule: job.schedule, + fallbackAnchorMs: job.createdAtMs, + }); + return computeNextRunAtMs({ ...job.schedule, anchorMs }, nowMs); + } if (job.schedule.kind === "at") { // One-shot jobs stay due until they successfully finish. if (job.state.lastStatus === "ok" && job.state.lastRunAtMs) { @@ -69,18 +87,26 @@ export function computeJobNextRunAtMs(job: CronJob, nowMs: number): number | und return computeNextRunAtMs(job.schedule, nowMs); } -export function recomputeNextRuns(state: CronServiceState) { +export function recomputeNextRuns(state: CronServiceState): boolean { if (!state.store) { - return; + return false; } + let changed = false; const now = state.deps.nowMs(); for (const job of state.store.jobs) { if (!job.state) { job.state = {}; + changed = true; } if (!job.enabled) { - job.state.nextRunAtMs = undefined; - job.state.runningAtMs = undefined; + if (job.state.nextRunAtMs !== undefined) { + job.state.nextRunAtMs = undefined; + changed = true; + } + if (job.state.runningAtMs !== undefined) { + job.state.runningAtMs = undefined; + changed = true; + } continue; } const runningAt = job.state.runningAtMs; @@ -90,9 +116,15 @@ export function recomputeNextRuns(state: CronServiceState) { "cron: clearing stuck running marker", ); job.state.runningAtMs = undefined; + changed = true; + } + const newNext = computeJobNextRunAtMs(job, now); + if (job.state.nextRunAtMs !== newNext) { + job.state.nextRunAtMs = newNext; + changed = true; } - job.state.nextRunAtMs = computeJobNextRunAtMs(job, now); } + return changed; } export function nextWakeAtMs(state: CronServiceState) { @@ -110,10 +142,20 @@ export function nextWakeAtMs(state: CronServiceState) { export function createJob(state: CronServiceState, input: CronJobCreate): CronJob { const now = state.deps.nowMs(); const id = crypto.randomUUID(); + const schedule = + input.schedule.kind === "every" + ? { + ...input.schedule, + anchorMs: resolveEveryAnchorMs({ + schedule: input.schedule, + fallbackAnchorMs: now, + }), + } + : input.schedule; const deleteAfterRun = typeof input.deleteAfterRun === "boolean" ? input.deleteAfterRun - : input.schedule.kind === "at" + : schedule.kind === "at" ? true : undefined; const enabled = typeof input.enabled === "boolean" ? input.enabled : true; @@ -126,7 +168,7 @@ export function createJob(state: CronServiceState, input: CronJobCreate): CronJo deleteAfterRun, createdAtMs: now, updatedAtMs: now, - schedule: input.schedule, + schedule, sessionTarget: input.sessionTarget, wakeMode: input.wakeMode, payload: input.payload, @@ -223,6 +265,9 @@ function mergeCronPayload(existing: CronPayload, patch: CronPayloadPatch): CronP if (typeof patch.timeoutSeconds === "number") { next.timeoutSeconds = patch.timeoutSeconds; } + if (typeof patch.allowUnsafeExternalContent === "boolean") { + next.allowUnsafeExternalContent = patch.allowUnsafeExternalContent; + } if (typeof patch.deliver === "boolean") { next.deliver = patch.deliver; } @@ -297,6 +342,7 @@ function buildPayloadFromPatch(patch: CronPayloadPatch): CronPayload { model: patch.model, thinking: patch.thinking, timeoutSeconds: patch.timeoutSeconds, + allowUnsafeExternalContent: patch.allowUnsafeExternalContent, deliver: patch.deliver, channel: patch.channel, to: patch.to, @@ -334,6 +380,9 @@ function mergeCronDelivery( } export function isJobDue(job: CronJob, nowMs: number, opts: { forced: boolean }) { + if (typeof job.state.runningAtMs === "number") { + return false; + } if (opts.forced) { return true; } diff --git a/src/cron/service/ops.ts b/src/cron/service/ops.ts index d1459765692..545261e9731 100644 --- a/src/cron/service/ops.ts +++ b/src/cron/service/ops.ts @@ -11,7 +11,7 @@ import { } from "./jobs.js"; import { locked } from "./locked.js"; import { ensureLoaded, persist, warnIfDisabled } from "./store.js"; -import { armTimer, emit, executeJob, stopTimer, wake } from "./timer.js"; +import { armTimer, emit, executeJob, runMissedJobs, stopTimer, wake } from "./timer.js"; export async function start(state: CronServiceState) { await locked(state, async () => { @@ -19,7 +19,18 @@ export async function start(state: CronServiceState) { state.deps.log.info({ enabled: false }, "cron: disabled"); return; } - await ensureLoaded(state); + await ensureLoaded(state, { skipRecompute: true }); + const jobs = state.store?.jobs ?? []; + for (const job of jobs) { + if (typeof job.state.runningAtMs === "number") { + state.deps.log.warn( + { jobId: job.id, runningAtMs: job.state.runningAtMs }, + "cron: clearing stale running marker on startup", + ); + job.state.runningAtMs = undefined; + } + } + await runMissedJobs(state); recomputeNextRuns(state); await persist(state); armTimer(state); @@ -40,7 +51,7 @@ export function stop(state: CronServiceState) { export async function status(state: CronServiceState) { return await locked(state, async () => { - await ensureLoaded(state); + await ensureLoaded(state, { skipRecompute: true }); return { enabled: state.deps.cronEnabled, storePath: state.deps.storePath, @@ -52,7 +63,7 @@ export async function status(state: CronServiceState) { export async function list(state: CronServiceState, opts?: { includeDisabled?: boolean }) { return await locked(state, async () => { - await ensureLoaded(state); + await ensureLoaded(state, { skipRecompute: true }); const includeDisabled = opts?.includeDisabled === true; const jobs = (state.store?.jobs ?? []).filter((j) => includeDisabled || j.enabled); return jobs.toSorted((a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0)); @@ -83,6 +94,22 @@ export async function update(state: CronServiceState, id: string, patch: CronJob const job = findJobOrThrow(state, id); const now = state.deps.nowMs(); applyJobPatch(job, patch); + if (job.schedule.kind === "every") { + const anchor = job.schedule.anchorMs; + if (typeof anchor !== "number" || !Number.isFinite(anchor)) { + const patchSchedule = patch.schedule; + const fallbackAnchorMs = + patchSchedule?.kind === "every" + ? now + : typeof job.createdAtMs === "number" && Number.isFinite(job.createdAtMs) + ? job.createdAtMs + : now; + job.schedule = { + ...job.schedule, + anchorMs: Math.max(0, Math.floor(fallbackAnchorMs)), + }; + } + } job.updatedAtMs = now; if (job.enabled) { job.state.nextRunAtMs = computeJobNextRunAtMs(job, now); @@ -124,14 +151,18 @@ export async function remove(state: CronServiceState, id: string) { export async function run(state: CronServiceState, id: string, mode?: "due" | "force") { return await locked(state, async () => { warnIfDisabled(state, "run"); - await ensureLoaded(state); + await ensureLoaded(state, { skipRecompute: true }); const job = findJobOrThrow(state, id); + if (typeof job.state.runningAtMs === "number") { + return { ok: true, ran: false, reason: "already-running" as const }; + } const now = state.deps.nowMs(); const due = isJobDue(job, now, { forced: mode === "force" }); if (!due) { return { ok: true, ran: false, reason: "not-due" as const }; } await executeJob(state, job, now, { forced: mode === "force" }); + recomputeNextRuns(state); await persist(state); armTimer(state); return { ok: true, ran: true } as const; diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 64fd9cc9e05..0847989b3d5 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -9,6 +9,8 @@ export type CronEvent = { status?: "ok" | "error" | "skipped"; error?: string; summary?: string; + sessionId?: string; + sessionKey?: string; nextRunAtMs?: number; }; @@ -33,6 +35,8 @@ export type CronServiceDeps = { /** Last non-empty agent text output (not truncated). */ outputText?: string; error?: string; + sessionId?: string; + sessionKey?: string; }>; onEvent?: (evt: CronEvent) => void; }; @@ -78,6 +82,7 @@ export type CronStatusSummary = { export type CronRunResult = | { ok: true; ran: true } | { ok: true; ran: false; reason: "not-due" } + | { ok: true; ran: false; reason: "already-running" } | { ok: false }; export type CronRemoveResult = { ok: true; removed: boolean } | { ok: false; removed: false }; diff --git a/src/cron/service/store.ts b/src/cron/service/store.ts index 51aca416578..3da848f3e38 100644 --- a/src/cron/service/store.ts +++ b/src/cron/service/store.ts @@ -117,6 +117,141 @@ function stripLegacyDeliveryFields(payload: Record) { } } +function normalizePayloadKind(payload: Record) { + const raw = typeof payload.kind === "string" ? payload.kind.trim().toLowerCase() : ""; + if (raw === "agentturn") { + payload.kind = "agentTurn"; + return true; + } + if (raw === "systemevent") { + payload.kind = "systemEvent"; + return true; + } + return false; +} + +function inferPayloadIfMissing(raw: Record) { + const message = typeof raw.message === "string" ? raw.message.trim() : ""; + const text = typeof raw.text === "string" ? raw.text.trim() : ""; + if (message) { + raw.payload = { kind: "agentTurn", message }; + return true; + } + if (text) { + raw.payload = { kind: "systemEvent", text }; + return true; + } + return false; +} + +function copyTopLevelAgentTurnFields( + raw: Record, + payload: Record, +) { + let mutated = false; + + const copyTrimmedString = (field: "model" | "thinking") => { + const existing = payload[field]; + if (typeof existing === "string" && existing.trim()) { + return; + } + const value = raw[field]; + if (typeof value === "string" && value.trim()) { + payload[field] = value.trim(); + mutated = true; + } + }; + copyTrimmedString("model"); + copyTrimmedString("thinking"); + + if ( + typeof payload.timeoutSeconds !== "number" && + typeof raw.timeoutSeconds === "number" && + Number.isFinite(raw.timeoutSeconds) + ) { + payload.timeoutSeconds = Math.max(1, Math.floor(raw.timeoutSeconds)); + mutated = true; + } + + if ( + typeof payload.allowUnsafeExternalContent !== "boolean" && + typeof raw.allowUnsafeExternalContent === "boolean" + ) { + payload.allowUnsafeExternalContent = raw.allowUnsafeExternalContent; + mutated = true; + } + + if (typeof payload.deliver !== "boolean" && typeof raw.deliver === "boolean") { + payload.deliver = raw.deliver; + mutated = true; + } + if ( + typeof payload.channel !== "string" && + typeof raw.channel === "string" && + raw.channel.trim() + ) { + payload.channel = raw.channel.trim(); + mutated = true; + } + if (typeof payload.to !== "string" && typeof raw.to === "string" && raw.to.trim()) { + payload.to = raw.to.trim(); + mutated = true; + } + if ( + typeof payload.bestEffortDeliver !== "boolean" && + typeof raw.bestEffortDeliver === "boolean" + ) { + payload.bestEffortDeliver = raw.bestEffortDeliver; + mutated = true; + } + if ( + typeof payload.provider !== "string" && + typeof raw.provider === "string" && + raw.provider.trim() + ) { + payload.provider = raw.provider.trim(); + mutated = true; + } + + return mutated; +} + +function stripLegacyTopLevelFields(raw: Record) { + if ("model" in raw) { + delete raw.model; + } + if ("thinking" in raw) { + delete raw.thinking; + } + if ("timeoutSeconds" in raw) { + delete raw.timeoutSeconds; + } + if ("allowUnsafeExternalContent" in raw) { + delete raw.allowUnsafeExternalContent; + } + if ("message" in raw) { + delete raw.message; + } + if ("text" in raw) { + delete raw.text; + } + if ("deliver" in raw) { + delete raw.deliver; + } + if ("channel" in raw) { + delete raw.channel; + } + if ("to" in raw) { + delete raw.to; + } + if ("bestEffortDeliver" in raw) { + delete raw.bestEffortDeliver; + } + if ("provider" in raw) { + delete raw.provider; + } +} + async function getFileMtimeMs(path: string): Promise { try { const stats = await fs.promises.stat(path); @@ -148,6 +283,12 @@ export async function ensureLoaded( const jobs = (loaded.jobs ?? []) as unknown as Array>; let mutated = false; for (const raw of jobs) { + const state = raw.state; + if (!state || typeof state !== "object" || Array.isArray(state)) { + raw.state = {}; + mutated = true; + } + const nameRaw = raw.name; if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) { raw.name = inferLegacyName({ @@ -171,8 +312,57 @@ export async function ensureLoaded( } const payload = raw.payload; - if (payload && typeof payload === "object" && !Array.isArray(payload)) { - if (migrateLegacyCronPayload(payload as Record)) { + if ( + (!payload || typeof payload !== "object" || Array.isArray(payload)) && + inferPayloadIfMissing(raw) + ) { + mutated = true; + } + + const payloadRecord = + raw.payload && typeof raw.payload === "object" && !Array.isArray(raw.payload) + ? (raw.payload as Record) + : null; + + if (payloadRecord) { + if (normalizePayloadKind(payloadRecord)) { + mutated = true; + } + if (!payloadRecord.kind) { + if (typeof payloadRecord.message === "string" && payloadRecord.message.trim()) { + payloadRecord.kind = "agentTurn"; + mutated = true; + } else if (typeof payloadRecord.text === "string" && payloadRecord.text.trim()) { + payloadRecord.kind = "systemEvent"; + mutated = true; + } + } + if (payloadRecord.kind === "agentTurn") { + if (copyTopLevelAgentTurnFields(raw, payloadRecord)) { + mutated = true; + } + } + } + + const hadLegacyTopLevelFields = + "model" in raw || + "thinking" in raw || + "timeoutSeconds" in raw || + "allowUnsafeExternalContent" in raw || + "message" in raw || + "text" in raw || + "deliver" in raw || + "channel" in raw || + "to" in raw || + "bestEffortDeliver" in raw || + "provider" in raw; + if (hadLegacyTopLevelFields) { + stripLegacyTopLevelFields(raw); + mutated = true; + } + + if (payloadRecord) { + if (migrateLegacyCronPayload(payloadRecord)) { mutated = true; } } @@ -202,6 +392,27 @@ export async function ensureLoaded( } mutated = true; } + + const everyMsRaw = sched.everyMs; + const everyMs = + typeof everyMsRaw === "number" && Number.isFinite(everyMsRaw) + ? Math.floor(everyMsRaw) + : null; + if ((kind === "every" || sched.kind === "every") && everyMs !== null) { + const anchorRaw = sched.anchorMs; + const normalizedAnchor = + typeof anchorRaw === "number" && Number.isFinite(anchorRaw) + ? Math.max(0, Math.floor(anchorRaw)) + : typeof raw.createdAtMs === "number" && Number.isFinite(raw.createdAtMs) + ? Math.max(0, Math.floor(raw.createdAtMs)) + : typeof raw.updatedAtMs === "number" && Number.isFinite(raw.updatedAtMs) + ? Math.max(0, Math.floor(raw.updatedAtMs)) + : null; + if (normalizedAnchor !== null && anchorRaw !== normalizedAnchor) { + sched.anchorMs = normalizedAnchor; + mutated = true; + } + } } const delivery = raw.delivery; @@ -213,6 +424,11 @@ export async function ensureLoaded( (delivery as { mode?: unknown }).mode = "announce"; mutated = true; } + } else if (modeRaw === undefined || modeRaw === null) { + // Explicitly persist the default so existing jobs don't silently + // change behaviour when the runtime default shifts. + (delivery as { mode?: unknown }).mode = "announce"; + mutated = true; } } @@ -222,10 +438,6 @@ export async function ensureLoaded( mutated = true; } - const payloadRecord = - payload && typeof payload === "object" && !Array.isArray(payload) - ? (payload as Record) - : null; const payloadKind = payloadRecord && typeof payloadRecord.kind === "string" ? payloadRecord.kind : ""; const sessionTarget = diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index 8af4f9bc367..8e9bfb2d585 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -1,6 +1,7 @@ import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js"; import type { CronJob } from "../types.js"; import type { CronEvent, CronServiceState } from "./state.js"; +import { resolveCronDeliveryPlan } from "../delivery.js"; import { computeJobNextRunAtMs, nextWakeAtMs, @@ -10,7 +11,7 @@ import { import { locked } from "./locked.js"; import { ensureLoaded, persist } from "./store.js"; -const MAX_TIMEOUT_MS = 2 ** 31 - 1; +const MAX_TIMER_DELAY_MS = 60_000; export function armTimer(state: CronServiceState) { if (state.timer) { @@ -25,12 +26,15 @@ export function armTimer(state: CronServiceState) { return; } const delay = Math.max(nextAt - state.deps.nowMs(), 0); - // Avoid TimeoutOverflowWarning when a job is far in the future. - const clampedDelay = Math.min(delay, MAX_TIMEOUT_MS); - state.timer = setTimeout(() => { - void onTimer(state).catch((err) => { + // Wake at least once a minute to avoid schedule drift and recover quickly + // when the process was paused or wall-clock time jumps. + const clampedDelay = Math.min(delay, MAX_TIMER_DELAY_MS); + state.timer = setTimeout(async () => { + try { + await onTimer(state); + } catch (err) { state.deps.log.error({ err: String(err) }, "cron: timer tick failed"); - }); + } }, clampedDelay); } @@ -40,22 +44,169 @@ export async function onTimer(state: CronServiceState) { } state.running = true; try { - await locked(state, async () => { - // Reload persisted due-times without recomputing so runDueJobs sees - // the original nextRunAtMs values. Recomputing first would advance - // every/cron slots past the current tick when the timer fires late (#9788). + const dueJobs = await locked(state, async () => { await ensureLoaded(state, { forceReload: true, skipRecompute: true }); - await runDueJobs(state); - recomputeNextRuns(state); + const due = findDueJobs(state); + + if (due.length === 0) { + const changed = recomputeNextRuns(state); + if (changed) { + await persist(state); + } + return []; + } + + const now = state.deps.nowMs(); + for (const job of due) { + job.state.runningAtMs = now; + job.state.lastError = undefined; + } await persist(state); + + return due.map((j) => ({ + id: j.id, + job: j, + })); }); + + const results: Array<{ + jobId: string; + status: "ok" | "error" | "skipped"; + error?: string; + summary?: string; + sessionId?: string; + sessionKey?: string; + startedAt: number; + endedAt: number; + }> = []; + + for (const { id, job } of dueJobs) { + const startedAt = state.deps.nowMs(); + job.state.runningAtMs = startedAt; + emit(state, { jobId: job.id, action: "started", runAtMs: startedAt }); + try { + const result = await executeJobCore(state, job); + results.push({ jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() }); + } catch (err) { + results.push({ + jobId: id, + status: "error", + error: String(err), + startedAt, + endedAt: state.deps.nowMs(), + }); + } + } + + if (results.length > 0) { + await locked(state, async () => { + await ensureLoaded(state, { forceReload: true, skipRecompute: true }); + + for (const result of results) { + const job = state.store?.jobs.find((j) => j.id === result.jobId); + if (!job) { + continue; + } + + const startedAt = result.startedAt; + job.state.runningAtMs = undefined; + job.state.lastRunAtMs = startedAt; + job.state.lastStatus = result.status; + job.state.lastDurationMs = Math.max(0, result.endedAt - startedAt); + job.state.lastError = result.error; + + const shouldDelete = + job.schedule.kind === "at" && result.status === "ok" && job.deleteAfterRun === true; + + if (!shouldDelete) { + if (job.schedule.kind === "at" && result.status === "ok") { + job.enabled = false; + job.state.nextRunAtMs = undefined; + } else if (job.enabled) { + job.state.nextRunAtMs = computeJobNextRunAtMs(job, result.endedAt); + } else { + job.state.nextRunAtMs = undefined; + } + } + + emit(state, { + jobId: job.id, + action: "finished", + status: result.status, + error: result.error, + summary: result.summary, + sessionId: result.sessionId, + sessionKey: result.sessionKey, + runAtMs: startedAt, + durationMs: job.state.lastDurationMs, + nextRunAtMs: job.state.nextRunAtMs, + }); + + if (shouldDelete && state.store) { + state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id); + emit(state, { jobId: job.id, action: "removed" }); + } + + job.updatedAtMs = result.endedAt; + } + + recomputeNextRuns(state); + await persist(state); + }); + } } finally { state.running = false; - // Always re-arm so transient errors (e.g. ENOSPC) don't kill the scheduler. armTimer(state); } } +function findDueJobs(state: CronServiceState): CronJob[] { + if (!state.store) { + return []; + } + const now = state.deps.nowMs(); + return state.store.jobs.filter((j) => { + if (!j.enabled) { + return false; + } + if (typeof j.state.runningAtMs === "number") { + return false; + } + const next = j.state.nextRunAtMs; + return typeof next === "number" && now >= next; + }); +} + +export async function runMissedJobs(state: CronServiceState) { + if (!state.store) { + return; + } + const now = state.deps.nowMs(); + const missed = state.store.jobs.filter((j) => { + if (!j.enabled) { + return false; + } + if (typeof j.state.runningAtMs === "number") { + return false; + } + const next = j.state.nextRunAtMs; + if (j.schedule.kind === "at" && j.state.lastStatus === "ok") { + return false; + } + return typeof next === "number" && now >= next; + }); + + if (missed.length > 0) { + state.deps.log.info( + { count: missed.length, jobIds: missed.map((j) => j.id) }, + "cron: running missed jobs after restart", + ); + for (const job of missed) { + await executeJob(state, job, now, { forced: false }); + } + } +} + export async function runDueJobs(state: CronServiceState) { if (!state.store) { return; @@ -76,6 +227,99 @@ export async function runDueJobs(state: CronServiceState) { } } +async function executeJobCore( + state: CronServiceState, + job: CronJob, +): Promise<{ + status: "ok" | "error" | "skipped"; + error?: string; + summary?: string; + sessionId?: string; + sessionKey?: string; +}> { + if (job.sessionTarget === "main") { + const text = resolveJobPayloadTextForMain(job); + if (!text) { + const kind = job.payload.kind; + return { + status: "skipped", + error: + kind === "systemEvent" + ? "main job requires non-empty systemEvent text" + : 'main job requires payload.kind="systemEvent"', + }; + } + state.deps.enqueueSystemEvent(text, { agentId: job.agentId }); + if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) { + const reason = `cron:${job.id}`; + const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + const maxWaitMs = 2 * 60_000; + const waitStartedAt = state.deps.nowMs(); + + let heartbeatResult: HeartbeatRunResult; + for (;;) { + heartbeatResult = await state.deps.runHeartbeatOnce({ reason }); + if ( + heartbeatResult.status !== "skipped" || + heartbeatResult.reason !== "requests-in-flight" + ) { + break; + } + if (state.deps.nowMs() - waitStartedAt > maxWaitMs) { + state.deps.requestHeartbeatNow({ reason }); + return { status: "ok", summary: text }; + } + await delay(250); + } + + if (heartbeatResult.status === "ran") { + return { status: "ok", summary: text }; + } else if (heartbeatResult.status === "skipped") { + return { status: "skipped", error: heartbeatResult.reason, summary: text }; + } else { + return { status: "error", error: heartbeatResult.reason, summary: text }; + } + } else { + state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` }); + return { status: "ok", summary: text }; + } + } + + if (job.payload.kind !== "agentTurn") { + return { status: "skipped", error: "isolated job requires payload.kind=agentTurn" }; + } + + const res = await state.deps.runIsolatedAgentJob({ + job, + message: job.payload.message, + }); + + // Post a short summary back to the main session. + const summaryText = res.summary?.trim(); + const deliveryPlan = resolveCronDeliveryPlan(job); + if (summaryText && deliveryPlan.requested) { + const prefix = "Cron"; + const label = + res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`; + state.deps.enqueueSystemEvent(label, { agentId: job.agentId }); + if (job.wakeMode === "now") { + state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` }); + } + } + + return { + status: res.status, + error: res.error, + summary: res.summary, + sessionId: res.sessionId, + sessionKey: res.sessionKey, + }; +} + +/** + * Execute a job. This version is used by the `run` command and other + * places that need the full execution with state updates. + */ export async function executeJob( state: CronServiceState, job: CronJob, @@ -89,7 +333,12 @@ export async function executeJob( let deleted = false; - const finish = async (status: "ok" | "error" | "skipped", err?: string, summary?: string) => { + const finish = async ( + status: "ok" | "error" | "skipped", + err?: string, + summary?: string, + session?: { sessionId?: string; sessionKey?: string }, + ) => { const endedAt = state.deps.nowMs(); job.state.runningAtMs = undefined; job.state.lastRunAtMs = startedAt; @@ -102,7 +351,6 @@ export async function executeJob( if (!shouldDelete) { if (job.schedule.kind === "at" && status === "ok") { - // One-shot job completed successfully; disable it. job.enabled = false; job.state.nextRunAtMs = undefined; } else if (job.enabled) { @@ -118,6 +366,8 @@ export async function executeJob( status, error: err, summary, + sessionId: session?.sessionId, + sessionKey: session?.sessionKey, runAtMs: startedAt, durationMs: job.state.lastDurationMs, nextRunAtMs: job.state.nextRunAtMs, @@ -131,96 +381,16 @@ export async function executeJob( }; try { - if (job.sessionTarget === "main") { - const text = resolveJobPayloadTextForMain(job); - if (!text) { - const kind = job.payload.kind; - await finish( - "skipped", - kind === "systemEvent" - ? "main job requires non-empty systemEvent text" - : 'main job requires payload.kind="systemEvent"', - ); - return; - } - state.deps.enqueueSystemEvent(text, { agentId: job.agentId }); - if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) { - const reason = `cron:${job.id}`; - const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); - const maxWaitMs = 2 * 60_000; - const waitStartedAt = state.deps.nowMs(); - - let heartbeatResult: HeartbeatRunResult; - for (;;) { - heartbeatResult = await state.deps.runHeartbeatOnce({ reason }); - if ( - heartbeatResult.status !== "skipped" || - heartbeatResult.reason !== "requests-in-flight" - ) { - break; - } - if (state.deps.nowMs() - waitStartedAt > maxWaitMs) { - heartbeatResult = { - status: "skipped", - reason: "timeout waiting for main lane to become idle", - }; - break; - } - await delay(250); - } - - if (heartbeatResult.status === "ran") { - await finish("ok", undefined, text); - } else if (heartbeatResult.status === "skipped") { - await finish("skipped", heartbeatResult.reason, text); - } else { - await finish("error", heartbeatResult.reason, text); - } - } else { - // wakeMode is "next-heartbeat" or runHeartbeatOnce not available - state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` }); - await finish("ok", undefined, text); - } - return; - } - - if (job.payload.kind !== "agentTurn") { - await finish("skipped", "isolated job requires payload.kind=agentTurn"); - return; - } - - const res = await state.deps.runIsolatedAgentJob({ - job, - message: job.payload.message, + const result = await executeJobCore(state, job); + await finish(result.status, result.error, result.summary, { + sessionId: result.sessionId, + sessionKey: result.sessionKey, }); - - // Post a short summary back to the main session so the user sees - // the cron result without opening the isolated session. - const summaryText = res.summary?.trim(); - const deliveryMode = job.delivery?.mode ?? "announce"; - if (summaryText && deliveryMode !== "none") { - const prefix = "Cron"; - const label = - res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`; - state.deps.enqueueSystemEvent(label, { agentId: job.agentId }); - if (job.wakeMode === "now") { - state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` }); - } - } - - if (res.status === "ok") { - await finish("ok", undefined, res.summary); - } else if (res.status === "skipped") { - await finish("skipped", undefined, res.summary); - } else { - await finish("error", res.error ?? "cron job failed", res.summary); - } } catch (err) { await finish("error", String(err)); } finally { job.updatedAtMs = nowMs; if (!opts.forced && job.enabled && !deleted) { - // Keep nextRunAtMs in sync in case the schedule advanced during a long run. job.state.nextRunAtMs = computeJobNextRunAtMs(job, state.deps.nowMs()); } } diff --git a/src/cron/store.test.ts b/src/cron/store.test.ts new file mode 100644 index 00000000000..ec80160df22 --- /dev/null +++ b/src/cron/store.test.ts @@ -0,0 +1,32 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; +import { loadCronStore } from "./store.js"; + +async function makeStorePath() { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-store-")); + return { + dir, + storePath: path.join(dir, "jobs.json"), + cleanup: async () => { + await fs.rm(dir, { recursive: true, force: true }); + }, + }; +} + +describe("cron store", () => { + it("returns empty store when file does not exist", async () => { + const store = await makeStorePath(); + const loaded = await loadCronStore(store.storePath); + expect(loaded).toEqual({ version: 1, jobs: [] }); + await store.cleanup(); + }); + + it("throws when store contains invalid JSON", async () => { + const store = await makeStorePath(); + await fs.writeFile(store.storePath, "{ not json", "utf-8"); + await expect(loadCronStore(store.storePath)).rejects.toThrow(/Failed to parse cron store/i); + await store.cleanup(); + }); +}); diff --git a/src/cron/store.ts b/src/cron/store.ts index 5fb296153a4..21bc1824522 100644 --- a/src/cron/store.ts +++ b/src/cron/store.ts @@ -22,14 +22,28 @@ export function resolveCronStorePath(storePath?: string) { export async function loadCronStore(storePath: string): Promise { try { const raw = await fs.promises.readFile(storePath, "utf-8"); - const parsed = JSON5.parse(raw); - const jobs = Array.isArray(parsed?.jobs) ? (parsed?.jobs as never[]) : []; + let parsed: unknown; + try { + parsed = JSON5.parse(raw); + } catch (err) { + throw new Error(`Failed to parse cron store at ${storePath}: ${String(err)}`, { + cause: err, + }); + } + const parsedRecord = + parsed && typeof parsed === "object" && !Array.isArray(parsed) + ? (parsed as Record) + : {}; + const jobs = Array.isArray(parsedRecord.jobs) ? (parsedRecord.jobs as never[]) : []; return { version: 1, jobs: jobs.filter(Boolean) as never as CronStoreFile["jobs"], }; - } catch { - return { version: 1, jobs: [] }; + } catch (err) { + if ((err as { code?: unknown })?.code === "ENOENT") { + return { version: 1, jobs: [] }; + } + throw err; } } diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index ce9479d1ade..c8238c50f1b 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -42,6 +42,11 @@ export const CronPayloadSchema = Type.Union([ model: Type.Optional(Type.String()), thinking: Type.Optional(Type.String()), timeoutSeconds: Type.Optional(Type.Integer({ minimum: 1 })), + allowUnsafeExternalContent: Type.Optional(Type.Boolean()), + deliver: Type.Optional(Type.Boolean()), + channel: Type.Optional(Type.String()), + to: Type.Optional(Type.String()), + bestEffortDeliver: Type.Optional(Type.Boolean()), }, { additionalProperties: false }, ), @@ -62,6 +67,11 @@ export const CronPayloadPatchSchema = Type.Union([ model: Type.Optional(Type.String()), thinking: Type.Optional(Type.String()), timeoutSeconds: Type.Optional(Type.Integer({ minimum: 1 })), + allowUnsafeExternalContent: Type.Optional(Type.Boolean()), + deliver: Type.Optional(Type.Boolean()), + channel: Type.Optional(Type.String()), + to: Type.Optional(Type.String()), + bestEffortDeliver: Type.Optional(Type.Boolean()), }, { additionalProperties: false }, ), @@ -239,6 +249,8 @@ export const CronRunLogEntrySchema = Type.Object( ), error: Type.Optional(Type.String()), summary: Type.Optional(Type.String()), + sessionId: Type.Optional(NonEmptyString), + sessionKey: Type.Optional(NonEmptyString), runAtMs: Type.Optional(Type.Integer({ minimum: 0 })), durationMs: Type.Optional(Type.Integer({ minimum: 0 })), nextRunAtMs: Type.Optional(Type.Integer({ minimum: 0 })), diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index 68b0bc095e3..12b0fe6b6cd 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -90,6 +90,8 @@ export function buildGatewayCronService(params: { status: evt.status, error: evt.error, summary: evt.summary, + sessionId: evt.sessionId, + sessionKey: evt.sessionKey, runAtMs: evt.runAtMs, durationMs: evt.durationMs, nextRunAtMs: evt.nextRunAtMs, diff --git a/src/gateway/server-methods/cron.ts b/src/gateway/server-methods/cron.ts index 703103860f3..023d9d36332 100644 --- a/src/gateway/server-methods/cron.ts +++ b/src/gateway/server-methods/cron.ts @@ -189,7 +189,7 @@ export const cronHandlers: GatewayRequestHandlers = { ); return; } - const result = await context.cron.run(jobId, p.mode); + const result = await context.cron.run(jobId, p.mode ?? "force"); respond(true, result, undefined); }, "cron.runs": async ({ params, respond, context }) => { diff --git a/src/gateway/server.cron.e2e.test.ts b/src/gateway/server.cron.e2e.test.ts index fc37f1702b0..8e9d242e4f6 100644 --- a/src/gateway/server.cron.e2e.test.ts +++ b/src/gateway/server.cron.e2e.test.ts @@ -117,7 +117,7 @@ describe("gateway server cron", () => { | { schedule?: unknown; sessionTarget?: unknown; wakeMode?: unknown } | undefined; expect(wrappedPayload?.sessionTarget).toBe("main"); - expect(wrappedPayload?.wakeMode).toBe("next-heartbeat"); + expect(wrappedPayload?.wakeMode).toBe("now"); expect((wrappedPayload?.schedule as { kind?: unknown } | undefined)?.kind).toBe("at"); const patchRes = await rpcReq(ws, "cron.add", { @@ -181,6 +181,32 @@ describe("gateway server cron", () => { expect(merged?.delivery?.channel).toBe("telegram"); expect(merged?.delivery?.to).toBe("19098680"); + const legacyDeliveryPatchRes = await rpcReq(ws, "cron.update", { + id: mergeJobId, + patch: { + payload: { + kind: "agentTurn", + deliver: true, + channel: "signal", + to: "+15550001111", + bestEffortDeliver: true, + }, + }, + }); + expect(legacyDeliveryPatchRes.ok).toBe(true); + const legacyDeliveryPatched = legacyDeliveryPatchRes.payload as + | { + payload?: { kind?: unknown; message?: unknown }; + delivery?: { mode?: unknown; channel?: unknown; to?: unknown; bestEffort?: unknown }; + } + | undefined; + expect(legacyDeliveryPatched?.payload?.kind).toBe("agentTurn"); + expect(legacyDeliveryPatched?.payload?.message).toBe("hello"); + expect(legacyDeliveryPatched?.delivery?.mode).toBe("announce"); + expect(legacyDeliveryPatched?.delivery?.channel).toBe("signal"); + expect(legacyDeliveryPatched?.delivery?.to).toBe("+15550001111"); + expect(legacyDeliveryPatched?.delivery?.bestEffort).toBe(true); + const rejectRes = await rpcReq(ws, "cron.add", { name: "patch reject", enabled: true, diff --git a/src/gateway/session-utils.test.ts b/src/gateway/session-utils.test.ts index 76798db437a..2fb51153d49 100644 --- a/src/gateway/session-utils.test.ts +++ b/src/gateway/session-utils.test.ts @@ -331,4 +331,29 @@ describe("listSessionsFromStore search", () => { }); expect(result.sessions.length).toBe(1); }); + + test("hides cron run alias session keys from sessions list", () => { + const now = Date.now(); + const store: Record = { + "agent:main:cron:job-1": { + sessionId: "run-abc", + updatedAt: now, + label: "Cron: job-1", + } as SessionEntry, + "agent:main:cron:job-1:run:run-abc": { + sessionId: "run-abc", + updatedAt: now, + label: "Cron: job-1", + } as SessionEntry, + }; + + const result = listSessionsFromStore({ + cfg: baseCfg, + storePath: "/tmp/sessions.json", + store, + opts: {}, + }); + + expect(result.sessions.map((session) => session.key)).toEqual(["agent:main:cron:job-1"]); + }); }); diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index 80ea40e4084..bbbbc575ecc 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -207,6 +207,12 @@ export function classifySessionKey(key: string, entry?: SessionEntry): GatewaySe return "direct"; } +function isCronRunSessionKey(key: string): boolean { + const parsed = parseAgentSessionKey(key); + const raw = parsed?.rest ?? key; + return /^cron:[^:]+:run:[^:]+$/.test(raw); +} + export function parseGroupKey( key: string, ): { channel?: string; kind?: "group" | "channel"; id?: string } | null { @@ -568,6 +574,9 @@ export function listSessionsFromStore(params: { let sessions = Object.entries(store) .filter(([key]) => { + if (isCronRunSessionKey(key)) { + return false; + } if (!includeGlobal && key === "global") { return false; } diff --git a/ui/src/styles/components.css b/ui/src/styles/components.css index 5ab1858b817..f0438eeec2f 100644 --- a/ui/src/styles/components.css +++ b/ui/src/styles/components.css @@ -681,6 +681,138 @@ width: 100%; } +/* Cron jobs: allow long payload/state text and keep action buttons inside the card. */ +.cron-job-payload, +.cron-job-agent, +.cron-job-state { + overflow-wrap: anywhere; + word-break: break-word; +} + +.cron-job .list-title { + font-weight: 600; + font-size: 15px; + letter-spacing: -0.015em; +} + +.cron-job { + grid-template-columns: minmax(0, 1fr) minmax(240px, 300px); + grid-template-areas: + "main meta" + "footer footer"; + row-gap: 10px; +} + +.cron-job .list-main { + grid-area: main; +} + +.cron-job .list-meta { + grid-area: meta; + min-width: 240px; + gap: 8px; +} + +.cron-job-footer { + grid-area: footer; + display: flex; + justify-content: space-between; + align-items: center; + gap: 12px; + border-top: 1px solid var(--border); + padding-top: 10px; +} + +.cron-job-chips { + flex: 1 1 auto; +} + +.cron-job-detail { + display: grid; + gap: 3px; + margin-top: 2px; +} + +.cron-job-detail-label { + color: var(--muted); + font-size: 11px; + font-weight: 600; + letter-spacing: 0.03em; + text-transform: uppercase; +} + +.cron-job-detail-value { + font-size: 13px; + line-height: 1.35; +} + +.cron-job-state { + display: grid; + gap: 4px; +} + +.cron-job-state-row { + display: flex; + justify-content: space-between; + align-items: baseline; + gap: 10px; +} + +.cron-job-state-key { + color: var(--muted); + font-size: 10px; + font-weight: 600; + letter-spacing: 0.05em; + text-transform: uppercase; +} + +.cron-job-state-value { + color: var(--text); + font-size: 12px; + white-space: nowrap; +} + +.cron-job-status-pill { + font-size: 11px; + font-weight: 600; + border: 1px solid var(--border); + border-radius: var(--radius-full); + padding: 2px 8px; + text-transform: lowercase; +} + +.cron-job-status-ok { + color: var(--ok); + border-color: rgba(34, 197, 94, 0.35); + background: var(--ok-subtle); +} + +.cron-job-status-error { + color: var(--danger); + border-color: rgba(239, 68, 68, 0.35); + background: var(--danger-subtle); +} + +.cron-job-status-skipped { + color: var(--warn); + border-color: rgba(245, 158, 11, 0.35); + background: var(--warn-subtle); +} + +.cron-job-status-na { + color: var(--muted); +} + +.cron-job-actions { + flex-wrap: wrap; + justify-content: flex-end; + margin-top: 0; +} + +.cron-job-actions .btn { + flex: 0 0 auto; +} + @container (max-width: 560px) { .list-item { grid-template-columns: 1fr; @@ -690,6 +822,23 @@ min-width: 0; text-align: left; } + + .cron-job-actions { + justify-content: flex-start; + } + + .cron-job { + grid-template-columns: 1fr; + grid-template-areas: + "main" + "meta" + "footer"; + } + + .cron-job-footer { + flex-direction: column; + align-items: stretch; + } } /* =========================================== @@ -737,6 +886,12 @@ background: var(--warn-subtle); } +.chip-danger { + color: var(--danger); + border-color: rgba(239, 68, 68, 0.3); + background: var(--danger-subtle); +} + /* =========================================== Tables =========================================== */ @@ -783,6 +938,22 @@ text-decoration: underline; } +.session-key-cell { + display: grid; + gap: 4px; + min-width: 0; +} + +.session-key-cell .session-link, +.session-key-display-name { + overflow-wrap: anywhere; + word-break: break-word; +} + +.session-key-display-name { + font-size: 11px; +} + /* =========================================== Log Stream =========================================== */ diff --git a/ui/src/ui/app-defaults.ts b/ui/src/ui/app-defaults.ts index 6521d07487c..89bdaf11d1b 100644 --- a/ui/src/ui/app-defaults.ts +++ b/ui/src/ui/app-defaults.ts @@ -22,7 +22,7 @@ export const DEFAULT_CRON_FORM: CronFormState = { cronExpr: "0 7 * * *", cronTz: "", sessionTarget: "isolated", - wakeMode: "next-heartbeat", + wakeMode: "now", payloadKind: "agentTurn", payloadText: "", deliveryMode: "announce", diff --git a/ui/src/ui/app-render.helpers.ts b/ui/src/ui/app-render.helpers.ts index d2bc9aa906f..c1225859988 100644 --- a/ui/src/ui/app-render.helpers.ts +++ b/ui/src/ui/app-render.helpers.ts @@ -206,13 +206,13 @@ function resolveMainSessionKey( } function resolveSessionDisplayName(key: string, row?: SessionsListResult["sessions"][number]) { - const label = row?.label?.trim(); - if (label) { + const label = row?.label?.trim() || ""; + const displayName = row?.displayName?.trim() || ""; + if (label && label !== key) { return `${label} (${key})`; } - const displayName = row?.displayName?.trim(); - if (displayName) { - return displayName; + if (displayName && displayName !== key) { + return `${key} (${displayName})`; } return key; } diff --git a/ui/src/ui/app-render.ts b/ui/src/ui/app-render.ts index f5c71c57920..d416bde447e 100644 --- a/ui/src/ui/app-render.ts +++ b/ui/src/ui/app-render.ts @@ -581,6 +581,7 @@ export function renderApp(state: AppViewState) { ${ state.tab === "cron" ? renderCron({ + basePath: state.basePath, loading: state.cronLoading, status: state.cronStatus, jobs: state.cronJobs, diff --git a/ui/src/ui/format.test.ts b/ui/src/ui/format.test.ts index 8e1f121ea66..4260f07dac0 100644 --- a/ui/src/ui/format.test.ts +++ b/ui/src/ui/format.test.ts @@ -2,8 +2,8 @@ import { describe, expect, it } from "vitest"; import { formatAgo, stripThinkingTags } from "./format.ts"; describe("formatAgo", () => { - it("returns 'just now' for timestamps less than 60s in the future", () => { - expect(formatAgo(Date.now() + 30_000)).toBe("just now"); + it("returns 'in <1m' for timestamps less than 60s in the future", () => { + expect(formatAgo(Date.now() + 30_000)).toBe("in <1m"); }); it("returns 'Xm from now' for future timestamps", () => { diff --git a/ui/src/ui/format.ts b/ui/src/ui/format.ts index 812aaa3fb14..91debb2e41e 100644 --- a/ui/src/ui/format.ts +++ b/ui/src/ui/format.ts @@ -16,7 +16,7 @@ export function formatAgo(ms?: number | null): string { const suffix = diff < 0 ? "from now" : "ago"; const sec = Math.round(absDiff / 1000); if (sec < 60) { - return diff < 0 ? "just now" : `${sec}s ago`; + return diff < 0 ? "in <1m" : `${sec}s ago`; } const min = Math.round(sec / 60); if (min < 60) { diff --git a/ui/src/ui/types.ts b/ui/src/ui/types.ts index d1d3f432b58..1c85b87319b 100644 --- a/ui/src/ui/types.ts +++ b/ui/src/ui/types.ts @@ -704,6 +704,8 @@ export type CronRunLogEntry = { durationMs?: number; error?: string; summary?: string; + sessionId?: string; + sessionKey?: string; }; export type SkillsStatusConfigCheck = { diff --git a/ui/src/ui/views/cron.test.ts b/ui/src/ui/views/cron.test.ts index 6d1e2c7a4d1..ea74093afec 100644 --- a/ui/src/ui/views/cron.test.ts +++ b/ui/src/ui/views/cron.test.ts @@ -20,6 +20,7 @@ function createJob(id: string): CronJob { function createProps(overrides: Partial = {}): CronProps { return { + basePath: "", loading: false, status: null, jobs: [], @@ -70,7 +71,7 @@ describe("cron view", () => { expect(onLoadRuns).toHaveBeenCalledWith("job-1"); }); - it("marks the selected job and keeps Runs button to a single call", () => { + it("marks the selected job and keeps History button to a single call", () => { const container = document.createElement("div"); const onLoadRuns = vi.fn(); const job = createJob("job-1"); @@ -88,13 +89,73 @@ describe("cron view", () => { const selected = container.querySelector(".list-item-selected"); expect(selected).not.toBeNull(); - const runsButton = Array.from(container.querySelectorAll("button")).find( - (btn) => btn.textContent?.trim() === "Runs", + const historyButton = Array.from(container.querySelectorAll("button")).find( + (btn) => btn.textContent?.trim() === "History", ); - expect(runsButton).not.toBeUndefined(); - runsButton?.dispatchEvent(new MouseEvent("click", { bubbles: true })); + expect(historyButton).not.toBeUndefined(); + historyButton?.dispatchEvent(new MouseEvent("click", { bubbles: true })); expect(onLoadRuns).toHaveBeenCalledTimes(1); expect(onLoadRuns).toHaveBeenCalledWith("job-1"); }); + + it("renders run chat links when session keys are present", () => { + const container = document.createElement("div"); + render( + renderCron( + createProps({ + basePath: "/ui", + runsJobId: "job-1", + runs: [ + { + ts: Date.now(), + jobId: "job-1", + status: "ok", + summary: "done", + sessionKey: "agent:main:cron:job-1:run:abc", + }, + ], + }), + ), + container, + ); + + const link = container.querySelector("a.session-link"); + expect(link).not.toBeNull(); + expect(link?.getAttribute("href")).toContain( + "/ui/chat?session=agent%3Amain%3Acron%3Ajob-1%3Arun%3Aabc", + ); + }); + + it("shows selected job name and sorts run history newest first", () => { + const container = document.createElement("div"); + const job = createJob("job-1"); + render( + renderCron( + createProps({ + jobs: [job], + runsJobId: "job-1", + runs: [ + { ts: 1, jobId: "job-1", status: "ok", summary: "older run" }, + { ts: 2, jobId: "job-1", status: "ok", summary: "newer run" }, + ], + }), + ), + container, + ); + + expect(container.textContent).toContain("Latest runs for Daily ping."); + + const cards = Array.from(container.querySelectorAll(".card")); + const runHistoryCard = cards.find( + (card) => card.querySelector(".card-title")?.textContent?.trim() === "Run history", + ); + expect(runHistoryCard).not.toBeUndefined(); + + const summaries = Array.from( + runHistoryCard?.querySelectorAll(".list-item .list-sub") ?? [], + ).map((el) => (el.textContent ?? "").trim()); + expect(summaries[0]).toBe("newer run"); + expect(summaries[1]).toBe("older run"); + }); }); diff --git a/ui/src/ui/views/cron.ts b/ui/src/ui/views/cron.ts index a957cf1a207..7b87826eaad 100644 --- a/ui/src/ui/views/cron.ts +++ b/ui/src/ui/views/cron.ts @@ -1,15 +1,12 @@ import { html, nothing } from "lit"; import type { ChannelUiMetaEntry, CronJob, CronRunLogEntry, CronStatus } from "../types.ts"; import type { CronFormState } from "../ui-types.ts"; -import { formatMs } from "../format.ts"; -import { - formatCronPayload, - formatCronSchedule, - formatCronState, - formatNextRun, -} from "../presenter.ts"; +import { formatAgo, formatMs } from "../format.ts"; +import { pathForTab } from "../navigation.ts"; +import { formatCronSchedule, formatNextRun } from "../presenter.ts"; export type CronProps = { + basePath: string; loading: boolean; status: CronStatus | null; jobs: CronJob[]; @@ -59,6 +56,10 @@ function resolveChannelLabel(props: CronProps, channel: string): string { export function renderCron(props: CronProps) { const channelOptions = buildChannelOptions(props); + const selectedJob = + props.runsJobId == null ? undefined : props.jobs.find((job) => job.id === props.runsJobId); + const selectedRunTitle = selectedJob?.name ?? props.runsJobId ?? "(select a job)"; + const orderedRuns = props.runs.toSorted((a, b) => b.ts - a.ts); return html`
@@ -167,8 +168,8 @@ export function renderCron(props: CronProps) { wakeMode: (e.target as HTMLSelectElement).value as CronFormState["wakeMode"], })} > - +