refactor: move cron runtime state to sqlite

This commit is contained in:
Peter Steinberger
2026-05-06 23:28:32 +01:00
parent 318f7c923b
commit 188886c3ae
29 changed files with 791 additions and 208 deletions

View File

@@ -99,6 +99,7 @@ Docs: https://docs.openclaw.ai
- Codex/plugins: enable migrated source-installed `openai-curated` Codex plugins in the same Codex harness thread with explicit `codexPlugins` config, cached app readiness, and fail-closed destructive-action policy. Thanks @kevinslin.
- Codex/plugins: enforce native plugin destructive-action policy with Codex app-level `destructive_enabled` config instead of OpenClaw-maintained per-tool deny lists, leave plugin app `open_world_enabled` on by default, and invalidate existing plugin app thread bindings so old generated app config is rebuilt. Thanks @kevinslin.
- Gateway/sessions: remove the automatic cron session reaper and retired `cron.sessionRetention`; use `openclaw sessions cleanup` for session-row maintenance while cron run-log pruning remains under `cron.runLog`.
- Cron/state: store runtime schedule state and run history in the shared SQLite state database; `openclaw doctor --fix` imports legacy `jobs-state.json` and `cron/runs/*.jsonl` files.
- PR triage: mark external pull requests with `proof: supplied` when Barnacle finds structured real behavior proof, keep stale negative proof labels in sync across CRLF-edited PR bodies, and let ClawSweeper own the stronger `proof: sufficient` judgement.
- ACPX/Codex: preserve trusted Codex project declarations when launching isolated Codex ACP sessions, avoiding interactive trust prompts in headless runs. Thanks @Stedyclaw.
- ACPX/Codex: reap stale OpenClaw-owned ACPX/Codex ACP process trees on startup and after ACP session close, preventing orphaned harness processes from slowing the Gateway. Thanks @91wan.

View File

@@ -41,8 +41,8 @@ Cron is the Gateway's built-in scheduler. It persists jobs, wakes the agent at t
- Cron runs **inside the Gateway** process (not inside the model).
- Job definitions persist at `~/.openclaw/cron/jobs.json` so restarts do not lose schedules.
- Runtime execution state persists next to it in `~/.openclaw/cron/jobs-state.json`. If you track cron definitions in git, track `jobs.json` and gitignore `jobs-state.json`.
- After the split, older OpenClaw versions can read `jobs.json` but may treat jobs as fresh because runtime fields now live in `jobs-state.json`.
- Runtime execution state persists in the shared SQLite state database at `~/.openclaw/state/openclaw.sqlite`. Legacy `jobs-state.json` sidecars are imported by `openclaw doctor --fix`.
- If you track cron definitions in git, track `jobs.json` only. Runtime fields no longer belong beside the cron definition file.
- When `jobs.json` is edited while the Gateway is running or stopped, OpenClaw compares the changed schedule fields with pending runtime slot metadata and clears stale `nextRunAtMs` values. Pure formatting or key-order-only rewrites preserve the pending slot.
- All cron executions create [background task](/automation/tasks) records.
- On Gateway startup, overdue isolated agent-turn jobs are rescheduled out of the channel-connect window instead of replaying immediately, so Discord/Telegram startup and native-command setup stay responsive after restarts.
@@ -57,7 +57,7 @@ Cron is the Gateway's built-in scheduler. It persists jobs, wakes the agent at t
<a id="maintenance"></a>
<Note>
Task reconciliation for cron is runtime-owned first, durable-history-backed second: an active cron task stays live while the cron runtime still tracks that job as running, even if an old child session row still exists. Once the runtime stops owning the job and the 5-minute grace window expires, maintenance checks persisted run logs and job state for the matching `cron:<jobId>:<startedAt>` run. If that durable history shows a terminal result, the task ledger is finalized from it; otherwise Gateway-owned maintenance can mark the task `lost`. Offline CLI audit can recover from durable history, but it does not treat its own empty in-process active-job set as proof that a Gateway-owned cron run is gone.
Task reconciliation for cron is runtime-owned first, durable-history-backed second: an active cron task stays live while the cron runtime still tracks that job as running, even if an old child session row still exists. Once the runtime stops owning the job and the 5-minute grace window expires, maintenance checks persisted SQLite run logs and job state for the matching `cron:<jobId>:<startedAt>` run. If that durable history shows a terminal result, the task ledger is finalized from it; otherwise Gateway-owned maintenance can mark the task `lost`. Offline CLI audit can recover from durable history, but it does not treat its own empty in-process active-job set as proof that a Gateway-owned cron run is gone.
</Note>
## Schedule types
@@ -414,9 +414,9 @@ Model override note:
`maxConcurrentRuns` limits both scheduled cron dispatch and isolated agent-turn execution. Isolated cron agent turns use the queue's dedicated `cron-nested` execution lane internally, so raising this value lets independent cron LLM runs progress in parallel instead of only starting their outer cron wrappers. The shared non-cron `nested` lane is not widened by this setting.
The runtime state sidecar is derived from `cron.store`: a `.json` store such as `~/clawd/cron/jobs.json` uses `~/clawd/cron/jobs-state.json`, while a store path without a `.json` suffix appends `-state.json`.
Runtime state is keyed by the resolved `cron.store` path inside the shared SQLite state database. It stores pending slots, active markers, last-run metadata, and the schedule identity that tells the scheduler when an externally edited job needs a fresh `nextRunAtMs`.
If you hand-edit `jobs.json`, leave `jobs-state.json` out of source control. OpenClaw uses that sidecar for pending slots, active markers, last-run metadata, and the schedule identity that tells the scheduler when an externally edited job needs a fresh `nextRunAtMs`.
If you hand-edit `jobs.json`, do not create or edit `jobs-state.json`. Run `openclaw doctor --fix` once after upgrading from an older version so doctor can import and remove the legacy sidecar.
Disable cron: `cron.enabled: false` or `OPENCLAW_SKIP_CRON=1`.
@@ -428,7 +428,7 @@ Disable cron: `cron.enabled: false` or `OPENCLAW_SKIP_CRON=1`.
</Accordion>
<Accordion title="Maintenance">
`openclaw sessions cleanup` maintains isolated run-session entries. `cron.runLog.maxBytes` / `cron.runLog.keepLines` auto-prune run-log files.
`openclaw sessions cleanup` maintains isolated run-session entries. `cron.runLog.maxBytes` / `cron.runLog.keepLines` auto-prune SQLite run-log rows.
</Accordion>
</AccordionGroup>

View File

@@ -249,8 +249,8 @@ openclaw tasks notify <lookup> state_changes
- ACP/subagent tasks check their backing child session.
- Subagent tasks whose child session has a restart-recovery tombstone are marked lost instead of being treated as recoverable backing sessions.
- Cron tasks check whether the cron runtime still owns the job, then recover terminal status from persisted cron run logs/job state before falling back to `lost`. Only the Gateway process is authoritative for the in-memory cron active-job set; offline CLI audit uses durable history but does not mark a cron task lost solely because that local Set is empty.
- CLI tasks with run identity check the owning live run context, not just child-session or chat-session rows.
- Cron tasks check whether the cron runtime still owns the job, then recover terminal status from persisted SQLite cron run logs/job state before falling back to `lost`. Only the Gateway process is authoritative for the in-memory cron active-job set; offline CLI audit uses durable history but does not mark a cron task lost solely because that local Set is empty.
- Chat-backed CLI tasks check the owning live run context, not just the chat session row.
Completion cleanup is also runtime-aware:
@@ -344,7 +344,7 @@ A sweeper runs every **60 seconds** and handles four things:
</Accordion>
<Accordion title="Tasks and cron">
A cron job **definition** lives in `~/.openclaw/cron/jobs.json`; runtime execution state lives beside it in `~/.openclaw/cron/jobs-state.json`. **Every** cron execution creates a task record - both main-session and isolated. Main-session cron tasks default to `silent` notify policy so they track without generating notifications.
A cron job **definition** lives in `~/.openclaw/cron/jobs.json`; runtime execution state lives in the shared SQLite state database. **Every** cron execution creates a task record - both main-session and isolated. Main-session cron tasks default to `silent` notify policy so they track without generating notifications.
See [Cron Jobs](/automation/cron-jobs).

View File

@@ -88,7 +88,7 @@ Skipped runs are tracked separately from execution errors. They do not affect re
For isolated jobs that target a local configured model provider, cron runs a lightweight provider preflight before starting the agent turn. Loopback, private-network, and `.local` `api: "ollama"` providers are probed at `/api/tags`; local OpenAI-compatible providers such as vLLM, SGLang, and LM Studio are probed at `/models`. If the endpoint is unreachable, the run is recorded as `skipped` and retried on a later schedule; matching dead endpoints are cached for 5 minutes to avoid many jobs hammering the same local server.
Note: cron job definitions live in `jobs.json`, while pending runtime state lives in `jobs-state.json`. If `jobs.json` is edited externally, the Gateway reloads changed schedules and clears stale pending slots; formatting-only rewrites do not clear the pending slot.
Note: cron job definitions live in `jobs.json`, while pending runtime state lives in the shared SQLite state database. Legacy `jobs-state.json` sidecars are imported by `openclaw doctor --fix`. If `jobs.json` is edited externally, the Gateway reloads changed schedules and clears stale pending slots; formatting-only rewrites do not clear the pending slot.
### Manual runs
@@ -151,7 +151,7 @@ Isolated cron runs prefer structured execution-denial metadata from the embedded
Retention and pruning are controlled in config:
- `openclaw sessions cleanup` maintains completed isolated run sessions.
- `cron.runLog.maxBytes` and `cron.runLog.keepLines` prune `~/.openclaw/cron/runs/<jobId>.jsonl`.
- `cron.runLog.maxBytes` and `cron.runLog.keepLines` prune SQLite cron run history.
## Migrating older jobs

View File

@@ -114,7 +114,7 @@ openclaw sessions cleanup --json
`openclaw sessions cleanup` uses `session.maintenance` settings from config:
- Scope note: `openclaw sessions cleanup` maintains SQLite session rows only. It does not prune transcript files, trajectory sidecars, or cron run logs (`cron/runs/<jobId>.jsonl`), which are managed by their owning runtimes.
- Scope note: `openclaw sessions cleanup` maintains SQLite session rows only. It does not prune trajectory sidecars or cron run history, which are managed by their owning runtimes.
- Legacy JSON import belongs to `openclaw doctor --fix`; cleanup no longer imports or rewrites `sessions.json`.
- `--dry-run`: preview how many entries would be pruned/capped without writing.

View File

@@ -127,7 +127,7 @@ See [Sandboxing](/gateway/sandboxing) and [Multi-Agent Sandbox & Tools](/tools/m
Configure logging before the delegate handles any real data:
- Cron run history: `~/.openclaw/cron/runs/<jobId>.jsonl`
- Cron run history: `~/.openclaw/state/openclaw.sqlite`
- Session transcripts: `~/.openclaw/agents/delegate/sessions`
- Identity provider audit logs (Exchange, Google Workspace)

View File

@@ -1225,8 +1225,8 @@ Current builds no longer include the TCP bridge. Nodes connect over the Gateway
}
```
- `runLog.maxBytes`: max size per run log file (`cron/runs/<jobId>.jsonl`) before pruning. Default: `2_000_000` bytes.
- `runLog.keepLines`: newest lines retained when run-log pruning is triggered. Default: `2000`.
- `runLog.maxBytes`: approximate max serialized SQLite run-log bytes per job before pruning. Default: `2_000_000` bytes.
- `runLog.keepLines`: newest rows retained when run-log pruning is triggered. Default: `2000`.
- `webhookToken`: bearer token used for cron webhook POST delivery (`delivery.mode = "webhook"`), if omitted no auth header is sent.
- `webhook`: deprecated legacy fallback webhook URL (http/https) used only for stored jobs that still have `notify: true`.

View File

@@ -427,7 +427,7 @@ candidate contains redacted secret placeholders such as `***`.
}
```
- `runLog`: prune `cron/runs/<jobId>.jsonl` by size and retained lines.
- `runLog`: prune SQLite cron run history by approximate serialized size and retained rows.
- See [Cron jobs](/automation/cron-jobs) for feature overview and CLI examples.
</Accordion>

View File

@@ -86,7 +86,7 @@ cat ~/.openclaw/openclaw.json
- Plugin/tool allowlist warnings when `plugins.allow` is restrictive but tool policy still asks for wildcard or plugin-owned tools.
- Legacy on-disk state migration (sessions/agent dir/WhatsApp auth).
- Legacy plugin manifest contract key migration (`speechProviders`, `realtimeTranscriptionProviders`, `realtimeVoiceProviders`, `mediaUnderstandingProviders`, `imageGenerationProviders`, `videoGenerationProviders`, `webFetchProviders`, `webSearchProviders` → `contracts`).
- Legacy cron store migration (`jobId`, `schedule.cron`, top-level delivery/payload fields, payload `provider`, simple `notify: true` webhook fallback jobs).
- Legacy cron store migration (`jobId`, `schedule.cron`, top-level delivery/payload fields, payload `provider`, simple `notify: true` webhook fallback jobs, `jobs-state.json` and `cron/runs/*.jsonl` import into SQLite).
- Legacy whole-agent runtime-policy cleanup; provider/model runtime policy is the active route selector.
- Stale plugin config cleanup when plugins are enabled; when `plugins.enabled=false`, stale plugin references are treated as inert containment config and are preserved.
@@ -311,6 +311,8 @@ That stages grounded durable candidates into the short-term dreaming store while
- top-level delivery fields (`deliver`, `channel`, `to`, `provider`, ...) → `delivery`
- payload `provider` delivery aliases → explicit `delivery.channel`
- simple legacy `notify: true` webhook fallback jobs → explicit `delivery.mode="webhook"` with `delivery.to=cron.webhook`
- legacy `jobs-state.json` runtime sidecars → the shared SQLite state database
- legacy `cron/runs/*.jsonl` run history files → the shared SQLite state database
Doctor only auto-migrates `notify: true` jobs when it can do so without changing behavior. If a job combines legacy notify fallback with an existing non-webhook delivery mode, doctor warns and leaves that job for manual review.

View File

@@ -278,9 +278,8 @@ replacement. Gateway startup does not generate bundled-plugin dependency trees.
For full persistence details on VM deployments, see
[Docker VM Runtime - What persists where](/install/docker-vm-runtime#what-persists-where).
**Disk growth hotspots:** watch `media/`, session JSONL files,
`cron/runs/*.jsonl`, installed plugin package roots, and rolling file logs
under `/tmp/openclaw/`.
**Disk growth hotspots:** watch `media/`, the shared SQLite state database,
installed plugin package roots, and rolling file logs under `/tmp/openclaw/`.
### Shell helpers (optional)

View File

@@ -73,8 +73,8 @@ This plan has started landing in slices:
replay, chat/TUI history, restart/subagent recovery, managed media indexing,
token estimation, title/preview/usage helpers, runtime transcript repair,
bootstrap completion checks, and bounded inspection all use the scoped SQLite
transcript. Legacy JSONL import is doctor/import/debug only: `openclaw doctor
--fix` builds the transcript database from old files and removes the JSONL
transcript. Legacy JSONL import is doctor/import/debug only:
`openclaw doctor --fix` builds the transcript database from old files and removes the JSONL
sources after successful import. Runtime paths do not import, prune, or repair
JSONL files. Pre-compaction checkpoints are SQLite transcript snapshots, not
`.checkpoint.*.jsonl` copies; branch/restore and checkpoint pruning now work
@@ -101,6 +101,12 @@ This plan has started landing in slices:
- Managed outgoing image attachment metadata now uses the shared SQLite `kv`
store as the primary record path. Older per-attachment JSON files import into
SQLite when encountered and are removed after import.
- Cron runtime schedule state and run history now use the shared SQLite state
database. `openclaw doctor --fix` imports legacy `jobs-state.json` and
`cron/runs/*.jsonl` files into SQLite and removes those file sources after a
successful import. Runtime cron paths no longer write new schedule-state or
run-history JSON files; `jobs.json` remains the hand-editable job definition
file.
- The subagent run registry now uses the shared SQLite `kv` store as the
primary record path. Legacy `subagents/runs.json` files import into SQLite
when SQLite is empty and are removed after import.

View File

@@ -124,7 +124,7 @@ openclaw sessions cleanup --enforce
Isolated cron runs also create session entries/transcripts. Session rows use the same explicit session cleanup path as other rows:
- `openclaw sessions cleanup --enforce` maintains old isolated cron run sessions through `session.maintenance`.
- `cron.runLog.maxBytes` + `cron.runLog.keepLines` prune `~/.openclaw/cron/runs/<jobId>.jsonl` files (defaults: `2_000_000` bytes and `2000` lines).
- `cron.runLog.maxBytes` + `cron.runLog.keepLines` prune SQLite cron run history (defaults: `2_000_000` approximate serialized bytes and `2000` rows per job).
When cron force-creates a new isolated run session, it sanitizes the previous
`cron:<jobId>` session entry before writing the new row. It carries safe

View File

@@ -3,6 +3,7 @@ import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
import { maybeRepairLegacyCronStore, noteLegacyWhatsAppCrontabHealthCheck } from "./doctor-cron.js";
type TerminalNote = (message: string, title?: string) => void;
@@ -14,13 +15,23 @@ vi.mock("../terminal/note.js", () => ({
}));
let tempRoot: string | null = null;
let originalOpenClawStateDir: string | undefined;
async function makeTempStorePath() {
tempRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-doctor-cron-"));
originalOpenClawStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = path.join(tempRoot, "state");
return path.join(tempRoot, "cron", "jobs.json");
}
afterEach(async () => {
closeOpenClawStateDatabaseForTest();
if (originalOpenClawStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = originalOpenClawStateDir;
}
originalOpenClawStateDir = undefined;
noteMock.mockClear();
if (tempRoot) {
await fs.rm(tempRoot, { recursive: true, force: true });
@@ -134,6 +145,97 @@ describe("maybeRepairLegacyCronStore", () => {
);
});
it("imports legacy cron runtime state sidecars into SQLite", async () => {
const storePath = await makeTempStorePath();
const statePath = storePath.replace(/\.json$/, "-state.json");
await writeCronStore(storePath, [
{
id: "stateful-job",
name: "Stateful job",
enabled: true,
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "tick" },
state: {},
},
]);
await fs.writeFile(
statePath,
JSON.stringify(
{
version: 1,
jobs: {
"stateful-job": {
updatedAtMs: Date.parse("2026-02-01T00:01:00.000Z"),
state: { nextRunAtMs: Date.parse("2026-02-01T00:02:00.000Z") },
},
},
},
null,
2,
),
"utf-8",
);
await maybeRepairLegacyCronStore({
cfg: createCronConfig(storePath),
options: {},
prompter: makePrompter(true),
});
const { loadCronStore } = await import("../cron/store.js");
const loaded = await loadCronStore(storePath);
expect(loaded.jobs[0]?.updatedAtMs).toBe(Date.parse("2026-02-01T00:01:00.000Z"));
expect(loaded.jobs[0]?.state.nextRunAtMs).toBe(Date.parse("2026-02-01T00:02:00.000Z"));
await expect(fs.stat(statePath)).rejects.toThrow();
expect(noteMock).toHaveBeenCalledWith(
expect.stringContaining("Imported 1 cron runtime state row into SQLite"),
"Doctor changes",
);
});
it("imports legacy cron run-log files into SQLite", async () => {
const storePath = await makeTempStorePath();
const logPath = path.join(path.dirname(storePath), "runs", "stateful-job.jsonl");
await writeCronStore(storePath, [
{
id: "stateful-job",
name: "Stateful job",
enabled: true,
createdAtMs: Date.parse("2026-02-01T00:00:00.000Z"),
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "main",
wakeMode: "next-heartbeat",
payload: { kind: "systemEvent", text: "tick" },
state: {},
},
]);
await fs.mkdir(path.dirname(logPath), { recursive: true });
await fs.writeFile(
logPath,
`${JSON.stringify({ ts: 1, jobId: "stateful-job", action: "finished", status: "ok" })}\n`,
"utf-8",
);
await maybeRepairLegacyCronStore({
cfg: createCronConfig(storePath),
options: {},
prompter: makePrompter(true),
});
const { readCronRunLogEntriesFromSqliteSync } = await import("../cron/run-log.js");
expect(readCronRunLogEntriesFromSqliteSync(storePath, { jobId: "stateful-job" })).toEqual([
expect.objectContaining({ ts: 1, status: "ok" }),
]);
await expect(fs.stat(logPath)).rejects.toThrow();
expect(noteMock).toHaveBeenCalledWith(
expect.stringContaining("Imported 1 cron run-log row from 1 legacy run-log file"),
"Doctor changes",
);
});
it("repairs malformed persisted cron ids before list rendering sees them", async () => {
const storePath = await makeTempStorePath();
await writeCronStore(storePath, [

View File

@@ -2,7 +2,18 @@ import { execFile } from "node:child_process";
import { promisify } from "node:util";
import { formatCliCommand } from "../cli/command-format.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { resolveCronStorePath, loadCronStore, saveCronStore } from "../cron/store.js";
import {
importLegacyCronRunLogFilesToSqlite,
legacyCronRunLogFilesExist,
resolveCronRunLogPruneOptions,
} from "../cron/run-log.js";
import {
importLegacyCronStateFileToSqlite,
legacyCronStateFileExists,
resolveCronStorePath,
loadCronStore,
saveCronStore,
} from "../cron/store.js";
import type { CronJob } from "../cron/types.js";
import {
normalizeOptionalLowercaseString,
@@ -195,9 +206,11 @@ export async function maybeRepairLegacyCronStore(params: {
prompter: Pick<DoctorPrompter, "confirm">;
}) {
const storePath = resolveCronStorePath(params.cfg.cron?.store);
const hasLegacyStateSidecar = legacyCronStateFileExists(storePath);
const hasLegacyRunLogs = await legacyCronRunLogFilesExist(storePath);
const store = await loadCronStore(storePath);
const rawJobs = (store.jobs ?? []) as unknown as Array<Record<string, unknown>>;
if (rawJobs.length === 0) {
if (rawJobs.length === 0 && !hasLegacyStateSidecar && !hasLegacyRunLogs) {
return;
}
@@ -216,6 +229,12 @@ export async function maybeRepairLegacyCronStore(params: {
`- ${pluralize(dreamingStaleCount, "managed dreaming job")} still has the legacy heartbeat-coupled shape`,
);
}
if (hasLegacyStateSidecar) {
previewLines.push("- Runtime state still lives in the legacy `jobs-state.json` sidecar");
}
if (hasLegacyRunLogs) {
previewLines.push("- Run history still lives in legacy `cron/runs/*.jsonl` files");
}
if (previewLines.length === 0) {
return;
}
@@ -224,7 +243,7 @@ export async function maybeRepairLegacyCronStore(params: {
[
`Legacy cron job storage detected at ${shortenHomePath(storePath)}.`,
...previewLines,
`Repair with ${formatCliCommand("openclaw doctor --fix")} to normalize the store before the next scheduler run.`,
`Repair with ${formatCliCommand("openclaw doctor --fix")} to normalize the store and import runtime state into SQLite before the next scheduler run.`,
].join("\n"),
"Cron",
);
@@ -261,6 +280,29 @@ export async function maybeRepairLegacyCronStore(params: {
}
}
const stateImport = hasLegacyStateSidecar
? await importLegacyCronStateFileToSqlite(storePath)
: { imported: false, importedJobs: 0 };
if (stateImport.imported) {
note(
`Imported ${pluralize(stateImport.importedJobs, "cron runtime state row")} into SQLite.`,
"Doctor changes",
);
}
if (hasLegacyRunLogs) {
const runLogImport = await importLegacyCronRunLogFilesToSqlite({
storePath,
opts: resolveCronRunLogPruneOptions(params.cfg.cron?.runLog),
});
if (runLogImport.files > 0) {
note(
`Imported ${pluralize(runLogImport.imported, "cron run-log row")} from ${pluralize(runLogImport.files, "legacy run-log file")} into SQLite.`,
"Doctor changes",
);
}
}
if (notifyMigration.warnings.length > 0) {
note(notifyMigration.warnings.join("\n"), "Doctor warnings");
}

View File

@@ -728,7 +728,7 @@ describe("config help copy quality", () => {
it("documents cron run-log retention controls", () => {
const runLog = FIELD_HELP["cron.runLog"];
expect(runLog.includes("cron/runs")).toBe(true);
expect(runLog.includes("SQLite")).toBe(true);
const maxBytes = FIELD_HELP["cron.runLog.maxBytes"];
expect(maxBytes.includes("2mb")).toBe(true);

View File

@@ -1556,11 +1556,11 @@ export const FIELD_HELP: Record<string, string> = {
"cron.webhookToken":
"Bearer token attached to cron webhook POST deliveries when webhook mode is used. Prefer secret/env substitution and rotate this token regularly if shared webhook endpoints are internet-reachable.",
"cron.runLog":
"Pruning controls for per-job cron run history files under `cron/runs/<jobId>.jsonl`, including size and line retention.",
"Pruning controls for per-job cron run history rows in the shared SQLite state database, including approximate serialized size and row retention.",
"cron.runLog.maxBytes":
"Maximum bytes per cron run-log file before pruning rewrites to the last keepLines entries (for example `2mb`, default `2000000`).",
"Maximum approximate serialized bytes per cron job run history before pruning to the newest keepLines rows (for example `2mb`, default `2000000`).",
"cron.runLog.keepLines":
"How many trailing run-log lines to retain when a file exceeds maxBytes (default `2000`). Increase for longer forensic history or lower for smaller disks.",
"How many newest cron run history rows to retain when a job exceeds maxBytes (default `2000`). Increase for longer forensic history or lower for smaller databases.",
hooks:
"Inbound webhook automation surface for mapping external events into wake or agent actions in OpenClaw. Keep this locked down with explicit token/session/agent controls before exposing it beyond trusted networks.",
"hooks.enabled":

View File

@@ -42,7 +42,7 @@ export type CronConfig = {
/** Bearer token for cron webhook POST delivery. */
webhookToken?: SecretInput;
/**
* Run-log pruning controls for `cron/runs/<jobId>.jsonl`.
* Run-log pruning controls for SQLite cron run history.
* Defaults: `maxBytes=2_000_000`, `keepLines=2000`.
*/
runLog?: {

View File

@@ -1,20 +1,30 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { afterEach, describe, expect, it } from "vitest";
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
import {
appendCronRunLogToSqlite,
appendCronRunLog,
DEFAULT_CRON_RUN_LOG_KEEP_LINES,
DEFAULT_CRON_RUN_LOG_MAX_BYTES,
getPendingCronRunLogWriteCountForTests,
importLegacyCronRunLogFilesToSqlite,
readCronRunLogEntries,
readCronRunLogEntriesFromSqliteSync,
readCronRunLogEntriesPage,
readCronRunLogEntriesPageAllFromSqlite,
readCronRunLogEntriesPageFromSqlite,
readCronRunLogEntriesSync,
resolveCronRunLogPruneOptions,
resolveCronRunLogPath,
} from "./run-log.js";
describe("cron run log", () => {
afterEach(() => {
closeOpenClawStateDatabaseForTest();
});
it("resolves prune options from config with defaults", () => {
expect(resolveCronRunLogPruneOptions()).toEqual({
maxBytes: DEFAULT_CRON_RUN_LOG_MAX_BYTES,
@@ -42,9 +52,17 @@ describe("cron run log", () => {
async function withRunLogDir(prefix: string, run: (dir: string) => Promise<void>) {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), prefix));
const originalStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = path.join(dir, "state");
try {
await run(dir);
} finally {
closeOpenClawStateDatabaseForTest();
if (originalStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = originalStateDir;
}
await fs.rm(dir, { recursive: true, force: true });
}
}
@@ -443,4 +461,63 @@ describe("cron run log", () => {
await writePromise.catch(() => undefined);
});
});
it("stores and pages SQLite run-log entries", async () => {
await withRunLogDir("openclaw-cron-log-sqlite-", async (dir) => {
const storePath = path.join(dir, "cron", "jobs.json");
await appendCronRunLogToSqlite(storePath, {
ts: 1,
jobId: "job-1",
action: "finished",
status: "ok",
summary: "first",
});
await appendCronRunLogToSqlite(storePath, {
ts: 2,
jobId: "job-1",
action: "finished",
status: "error",
error: "boom",
});
expect(readCronRunLogEntriesFromSqliteSync(storePath, { jobId: "job-1" })).toEqual([
expect.objectContaining({ ts: 1, summary: "first" }),
expect.objectContaining({ ts: 2, error: "boom" }),
]);
const page = await readCronRunLogEntriesPageFromSqlite(storePath, {
jobId: "job-1",
status: "error",
});
expect(page.entries).toEqual([expect.objectContaining({ ts: 2, status: "error" })]);
const all = await readCronRunLogEntriesPageAllFromSqlite({
storePath,
query: "Nightly Backup",
status: "error",
jobNameById: { "job-1": "Nightly Backup" },
});
expect(all.entries).toEqual([expect.objectContaining({ ts: 2 })]);
expect(all.entries[0]).toMatchObject({ jobName: "Nightly Backup" });
});
});
it("imports legacy JSONL run-log files into SQLite and removes them", async () => {
await withRunLogDir("openclaw-cron-log-import-", async (dir) => {
const storePath = path.join(dir, "cron", "jobs.json");
const logPath = path.join(dir, "cron", "runs", "job-1.jsonl");
await fs.mkdir(path.dirname(logPath), { recursive: true });
await fs.writeFile(
logPath,
`${JSON.stringify({ ts: 1, jobId: "job-1", action: "finished", status: "ok" })}\n`,
"utf-8",
);
const result = await importLegacyCronRunLogFilesToSqlite({ storePath });
expect(result).toMatchObject({ imported: 1, files: 1 });
expect(readCronRunLogEntriesFromSqliteSync(storePath, { jobId: "job-1" })).toEqual([
expect.objectContaining({ ts: 1, status: "ok" }),
]);
await expect(fs.stat(logPath)).rejects.toThrow();
});
});
});

View File

@@ -10,6 +10,10 @@ import {
normalizeOptionalString,
normalizeStringifiedOptionalString,
} from "../shared/string-coerce.js";
import {
openOpenClawStateDatabase,
runOpenClawStateWriteTransaction,
} from "../state/openclaw-state-db.js";
import { normalizeCronRunDiagnostics } from "./run-diagnostics.js";
import type {
CronDeliveryStatus,
@@ -91,7 +95,21 @@ export function resolveCronRunLogPath(params: { storePath: string; jobId: string
return resolvedPath;
}
export async function legacyCronRunLogFilesExist(storePath: string): Promise<boolean> {
const runsDir = path.resolve(path.dirname(path.resolve(storePath)), "runs");
if (!(await pathExists(runsDir))) {
return false;
}
const runsRoot = await fsRoot(runsDir).catch(() => null);
if (!runsRoot) {
return false;
}
const files = await runsRoot.list(".", { withFileTypes: true }).catch(() => []);
return files.some((entry) => entry.isFile && entry.name.endsWith(".jsonl"));
}
const writesByPath = new Map<string, Promise<void>>();
const writesByStoreKey = new Map<string, Promise<void>>();
async function setSecureFileMode(filePath: string): Promise<void> {
await fs.chmod(filePath, 0o600).catch(() => undefined);
@@ -125,7 +143,84 @@ export function resolveCronRunLogPruneOptions(cfg?: CronConfig["runLog"]): {
}
export function getPendingCronRunLogWriteCountForTests() {
return writesByPath.size;
return writesByPath.size + writesByStoreKey.size;
}
function resolveCronRunLogStoreKey(storePath: string): string {
return path.resolve(storePath);
}
type CronRunLogRow = {
entry_json: string;
};
function rowToCronRunLogEntry(row: CronRunLogRow): CronRunLogEntry | null {
const entries = parseAllRunLogEntries(`${row.entry_json}\n`);
return entries[0] ?? null;
}
function insertCronRunLogEntry(params: {
storePath: string;
entry: CronRunLogEntry;
maxBytes: number;
keepLines: number;
}) {
assertSafeCronRunLogJobId(params.entry.jobId);
const storeKey = resolveCronRunLogStoreKey(params.storePath);
const entryJson = JSON.stringify(params.entry);
runOpenClawStateWriteTransaction((database) => {
const seqRow = database.db
.prepare(
`
SELECT COALESCE(MAX(seq), 0) + 1 AS next_seq
FROM cron_run_logs
WHERE store_key = ? AND job_id = ?
`,
)
.get(storeKey, params.entry.jobId) as { next_seq?: number | bigint } | undefined;
const rawSeq = seqRow?.next_seq ?? 1;
const seq = typeof rawSeq === "bigint" ? Number(rawSeq) : rawSeq;
database.db
.prepare(
`
INSERT INTO cron_run_logs (store_key, job_id, seq, ts, entry_json, created_at)
VALUES (?, ?, ?, ?, ?, ?)
`,
)
.run(storeKey, params.entry.jobId, seq, params.entry.ts, entryJson, Date.now());
database.db
.prepare(
`
WITH ordered AS (
SELECT
seq,
ROW_NUMBER() OVER (ORDER BY ts DESC, seq DESC) AS rn,
SUM(LENGTH(entry_json) + 1) OVER (ORDER BY ts DESC, seq DESC) AS running_bytes
FROM cron_run_logs
WHERE store_key = ? AND job_id = ?
)
DELETE FROM cron_run_logs
WHERE store_key = ? AND job_id = ? AND seq IN (
SELECT seq FROM ordered WHERE rn > ? OR running_bytes > ?
)
`,
)
.run(
storeKey,
params.entry.jobId,
storeKey,
params.entry.jobId,
params.keepLines,
params.maxBytes,
);
});
}
async function drainPendingStoreWrite(storePath: string): Promise<void> {
const pending = writesByStoreKey.get(resolveCronRunLogStoreKey(storePath));
if (pending) {
await pending.catch(() => undefined);
}
}
async function drainPendingWrite(filePath: string): Promise<void> {
@@ -188,6 +283,33 @@ export async function appendCronRunLog(
}
}
export async function appendCronRunLogToSqlite(
storePath: string,
entry: CronRunLogEntry,
opts?: { maxBytes?: number; keepLines?: number },
) {
const storeKey = resolveCronRunLogStoreKey(storePath);
const prev = writesByStoreKey.get(storeKey) ?? Promise.resolve();
const next = prev
.catch(() => undefined)
.then(() => {
insertCronRunLogEntry({
storePath,
entry,
maxBytes: opts?.maxBytes ?? DEFAULT_CRON_RUN_LOG_MAX_BYTES,
keepLines: opts?.keepLines ?? DEFAULT_CRON_RUN_LOG_KEEP_LINES,
});
});
writesByStoreKey.set(storeKey, next);
try {
await next;
} finally {
if (writesByStoreKey.get(storeKey) === next) {
writesByStoreKey.delete(storeKey);
}
}
}
export async function readCronRunLogEntries(
filePath: string,
opts?: { limit?: number; jobId?: string },
@@ -221,6 +343,34 @@ export function readCronRunLogEntriesSync(
return parseAllRunLogEntries(raw, { jobId: opts?.jobId }).slice(-limit);
}
export function readCronRunLogEntriesFromSqliteSync(
storePath: string,
opts?: { limit?: number; jobId?: string },
): CronRunLogEntry[] {
const jobId = normalizeOptionalString(opts?.jobId);
if (!jobId) {
return [];
}
assertSafeCronRunLogJobId(jobId);
const limit = Math.max(1, Math.min(5000, Math.floor(opts?.limit ?? 200)));
const database = openOpenClawStateDatabase();
const rows = database.db
.prepare(
`
SELECT entry_json
FROM cron_run_logs
WHERE store_key = ? AND job_id = ?
ORDER BY ts DESC, seq DESC
LIMIT ?
`,
)
.all(resolveCronRunLogStoreKey(storePath), jobId, limit) as CronRunLogRow[];
return rows
.map(rowToCronRunLogEntry)
.filter((entry): entry is CronRunLogEntry => Boolean(entry))
.toReversed();
}
function normalizeRunStatusFilter(status?: string): CronRunLogStatusFilter {
if (status === "ok" || status === "error" || status === "skipped" || status === "all") {
return status;
@@ -439,6 +589,86 @@ export async function readCronRunLogEntriesPage(
};
}
function pageRunLogEntries(
entries: CronRunLogEntry[],
opts: ReadCronRunLogPageOptions = {},
queryTextForEntry?: (entry: CronRunLogEntry) => string,
) {
const limit = Math.max(1, Math.min(200, Math.floor(opts.limit ?? 50)));
const statuses = normalizeRunStatuses(opts);
const deliveryStatuses = normalizeDeliveryStatuses(opts);
const query = normalizeLowercaseStringOrEmpty(opts.query);
const sortDir: CronRunLogSortDir = opts.sortDir === "asc" ? "asc" : "desc";
const filtered = filterRunLogEntries(entries, {
statuses,
deliveryStatuses,
query,
queryTextForEntry:
queryTextForEntry ??
((entry) =>
[
entry.summary ?? "",
entry.error ?? "",
entry.diagnostics?.summary ?? "",
...(entry.diagnostics?.entries ?? []).map((diagnostic) => diagnostic.message),
entry.jobId,
entry.delivery?.intended?.channel ?? "",
entry.delivery?.resolved?.channel ?? "",
...(entry.delivery?.messageToolSentTo ?? []).map((target) => target.channel),
].join(" ")),
});
const sorted =
sortDir === "asc"
? filtered.toSorted((a, b) => a.ts - b.ts)
: filtered.toSorted((a, b) => b.ts - a.ts);
const total = sorted.length;
const offset = Math.max(0, Math.min(total, Math.floor(opts.offset ?? 0)));
const pageEntries = sorted.slice(offset, offset + limit);
const nextOffset = offset + pageEntries.length;
return {
entries: pageEntries,
total,
offset,
limit,
hasMore: nextOffset < total,
nextOffset: nextOffset < total ? nextOffset : null,
};
}
export async function readCronRunLogEntriesPageFromSqlite(
storePath: string,
opts?: ReadCronRunLogPageOptions,
): Promise<CronRunLogPageResult> {
await drainPendingStoreWrite(storePath);
const jobId = normalizeOptionalString(opts?.jobId);
if (!jobId) {
return {
entries: [],
total: 0,
offset: 0,
limit: Math.max(1, Math.min(200, Math.floor(opts?.limit ?? 50))),
hasMore: false,
nextOffset: null,
};
}
assertSafeCronRunLogJobId(jobId);
const database = openOpenClawStateDatabase();
const rows = database.db
.prepare(
`
SELECT entry_json
FROM cron_run_logs
WHERE store_key = ? AND job_id = ?
ORDER BY ts ASC, seq ASC
`,
)
.all(resolveCronRunLogStoreKey(storePath), jobId) as CronRunLogRow[];
const entries = rows
.map(rowToCronRunLogEntry)
.filter((entry): entry is CronRunLogEntry => Boolean(entry));
return pageRunLogEntries(entries, opts);
}
export async function readCronRunLogEntriesPageAll(
opts: ReadCronRunLogAllPageOptions,
): Promise<CronRunLogPageResult> {
@@ -535,3 +765,83 @@ export async function readCronRunLogEntriesPageAll(
nextOffset: nextOffset < total ? nextOffset : null,
};
}
export async function readCronRunLogEntriesPageAllFromSqlite(
opts: ReadCronRunLogAllPageOptions,
): Promise<CronRunLogPageResult> {
await drainPendingStoreWrite(opts.storePath);
const database = openOpenClawStateDatabase();
const rows = database.db
.prepare(
`
SELECT entry_json
FROM cron_run_logs
WHERE store_key = ?
ORDER BY ts ASC, seq ASC
`,
)
.all(resolveCronRunLogStoreKey(opts.storePath)) as CronRunLogRow[];
const entries = rows
.map(rowToCronRunLogEntry)
.filter((entry): entry is CronRunLogEntry => Boolean(entry));
const page = pageRunLogEntries(entries, opts, (entry) => {
const jobName = opts.jobNameById?.[entry.jobId] ?? "";
return [
entry.summary ?? "",
entry.error ?? "",
entry.diagnostics?.summary ?? "",
...(entry.diagnostics?.entries ?? []).map((diagnostic) => diagnostic.message),
entry.jobId,
jobName,
entry.delivery?.intended?.channel ?? "",
entry.delivery?.resolved?.channel ?? "",
...(entry.delivery?.messageToolSentTo ?? []).map((target) => target.channel),
].join(" ");
});
if (opts.jobNameById) {
for (const entry of page.entries) {
const jobName = opts.jobNameById[entry.jobId];
if (jobName) {
(entry as CronRunLogEntry & { jobName?: string }).jobName = jobName;
}
}
}
return page;
}
export async function importLegacyCronRunLogFilesToSqlite(params: {
storePath: string;
opts?: { maxBytes?: number; keepLines?: number };
}): Promise<{ imported: number; files: number; removedDir?: string }> {
const runsDir = path.resolve(path.dirname(path.resolve(params.storePath)), "runs");
if (!(await pathExists(runsDir))) {
return { imported: 0, files: 0 };
}
const runsRoot = await fsRoot(runsDir).catch(() => null);
if (!runsRoot) {
return { imported: 0, files: 0 };
}
const files = (await runsRoot.list(".", { withFileTypes: true }).catch(() => []))
.filter((entry) => entry.isFile && entry.name.endsWith(".jsonl"))
.map((entry) => entry.name);
let imported = 0;
for (const fileName of files) {
const raw = await runsRoot.readText(fileName).catch(() => "");
for (const entry of parseAllRunLogEntries(raw)) {
await appendCronRunLogToSqlite(params.storePath, entry, params.opts);
imported++;
}
await fs.rm(path.join(runsDir, fileName), { force: true }).catch(() => undefined);
}
let removedDir: string | undefined;
try {
const remaining = await runsRoot.list(".", { withFileTypes: true });
if (remaining.length === 0) {
await fs.rmdir(runsDir);
removedDir = runsDir;
}
} catch {
// best-effort cleanup only
}
return { imported, files: files.length, removedDir };
}

View File

@@ -2,6 +2,7 @@ import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterAll, afterEach, beforeAll, beforeEach, vi } from "vitest";
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
import type { MockFn } from "../test-utils/vitest-mock-fn.js";
import type { CronEvent, CronServiceDeps } from "./service.js";
import { CronService } from "./service.js";
@@ -27,12 +28,21 @@ export function createNoopLogger(): NoopLogger {
export function createCronStoreHarness(options?: { prefix?: string }) {
let fixtureRoot = "";
let caseId = 0;
let originalOpenClawStateDir: string | undefined;
beforeAll(async () => {
fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), options?.prefix ?? "openclaw-cron-"));
originalOpenClawStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = path.join(fixtureRoot, "state");
});
afterAll(async () => {
closeOpenClawStateDatabaseForTest();
if (originalOpenClawStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = originalOpenClawStateDir;
}
if (!fixtureRoot) {
return;
}

View File

@@ -4,7 +4,7 @@ import { describe, expect, it, vi } from "vitest";
import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js";
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
import { setupCronServiceSuite, writeCronStoreSnapshot } from "../service.test-harness.js";
import { loadCronStore } from "../store.js";
import { importLegacyCronStateFileToSqlite, loadCronStore } from "../store.js";
import type { CronJob } from "../types.js";
import { run, start, stop, update } from "./ops.js";
import { createCronServiceState } from "./state.js";
@@ -186,7 +186,7 @@ describe("cron service ops seam coverage", () => {
stop(state);
});
it("start persists load-time updatedAtMs repairs to the state sidecar only", async () => {
it("start persists load-time updatedAtMs repairs to SQLite state only", async () => {
const { storePath } = await makeStorePath();
const now = Date.parse("2026-04-09T08:00:00.000Z");
const createdAtMs = now - 86_400_000;
@@ -235,6 +235,7 @@ describe("cron service ops seam coverage", () => {
),
"utf-8",
);
await importLegacyCronStateFileToSqlite(storePath);
const configBefore = await fs.readFile(storePath, "utf-8");
const state = createCronServiceState({
@@ -251,13 +252,11 @@ describe("cron service ops seam coverage", () => {
await start(state);
const configAfter = await fs.readFile(storePath, "utf-8");
const persistedState = JSON.parse(await fs.readFile(statePath, "utf-8")) as {
jobs: Record<string, { updatedAtMs?: unknown; state?: { nextRunAtMs?: unknown } }>;
};
const persisted = await loadCronStore(storePath);
expect(configAfter).toBe(configBefore);
expect(persistedState.jobs[jobId]?.updatedAtMs).toBe(createdAtMs);
expect(persistedState.jobs[jobId]?.state?.nextRunAtMs).toBe(nextRunAtMs);
expect(persisted.jobs[0]?.updatedAtMs).toBe(createdAtMs);
expect(persisted.jobs[0]?.state.nextRunAtMs).toBe(nextRunAtMs);
} finally {
stop(state);
}

View File

@@ -3,17 +3,33 @@ import os from "node:os";
import path from "node:path";
import { setTimeout as scheduleNativeTimeout } from "node:timers";
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { loadCronStore, loadCronStoreSync, resolveCronStorePath, saveCronStore } from "./store.js";
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
import {
importLegacyCronStateFileToSqlite,
loadCronStore,
loadCronStoreSync,
resolveCronStorePath,
saveCronStore,
} from "./store.js";
import type { CronStoreFile } from "./types.js";
let fixtureRoot = "";
let caseId = 0;
let originalOpenClawStateDir: string | undefined;
beforeAll(async () => {
fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-store-"));
originalOpenClawStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = path.join(fixtureRoot, "state");
});
afterAll(async () => {
closeOpenClawStateDatabaseForTest();
if (originalOpenClawStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = originalOpenClawStateDir;
}
if (fixtureRoot) {
await fs.rm(fixtureRoot, { recursive: true, force: true });
}
@@ -142,53 +158,24 @@ describe("cron store", () => {
});
});
it("compares split state identity for flat legacy cron rows", async () => {
it("compares SQLite state identity for flat legacy cron rows", async () => {
const { storePath } = await makeStorePath();
const statePath = storePath.replace(/\.json$/, "-state.json");
await fs.mkdir(path.dirname(storePath), { recursive: true });
await fs.writeFile(
storePath,
JSON.stringify(
{
version: 1,
jobs: [
{
id: "legacy-flat-cron",
name: "legacy flat cron",
enabled: true,
kind: "cron",
cron: "*/10 * * * *",
tz: "UTC",
},
],
},
null,
2,
),
"utf-8",
);
await fs.writeFile(
statePath,
JSON.stringify(
{
version: 1,
jobs: {
"legacy-flat-cron": {
updatedAtMs: 1,
scheduleIdentity: JSON.stringify({
version: 1,
enabled: true,
schedule: { kind: "cron", expr: "0 * * * *", tz: "UTC" },
}),
state: { nextRunAtMs: 123 },
},
},
},
null,
2,
),
"utf-8",
);
const payload = makeStore("legacy-flat-cron", true);
payload.jobs[0].updatedAtMs = 1;
payload.jobs[0].schedule = { kind: "cron", expr: "0 * * * *", tz: "UTC" };
payload.jobs[0].state = { nextRunAtMs: 123 };
await saveCronStore(storePath, payload);
const config = JSON.parse(await fs.readFile(storePath, "utf-8"));
config.jobs[0] = {
id: "legacy-flat-cron",
name: "legacy flat cron",
enabled: true,
kind: "cron",
cron: "*/10 * * * *",
tz: "UTC",
};
await fs.writeFile(storePath, JSON.stringify(config, null, 2), "utf-8");
const loaded = await loadCronStore(storePath);
@@ -249,14 +236,9 @@ describe("cron store", () => {
expect(config.jobs[0].state).toStrictEqual({});
expect(config.jobs[0]).not.toHaveProperty("updatedAtMs");
// State file should contain runtime fields.
const statePath = store.storePath.replace(/\.json$/, "-state.json");
const stateRaw = await fs.readFile(statePath, "utf-8");
const stateFile = JSON.parse(stateRaw);
expect(stateFile.jobs[first.jobs[0].id].state.nextRunAtMs).toBe(
first.jobs[0].createdAtMs + 60_000,
);
expect(typeof stateFile.jobs[first.jobs[0].id].scheduleIdentity).toBe("string");
await expect(fs.stat(store.storePath.replace(/\.json$/, "-state.json"))).rejects.toThrow();
const loaded = await loadCronStore(store.storePath);
expect(loaded.jobs[0]?.state.nextRunAtMs).toBe(first.jobs[0].createdAtMs + 60_000);
await expectPathMissing(`${store.storePath}.bak`);
});
@@ -303,10 +285,9 @@ describe("cron store", () => {
expect(loaded.jobs[0]?.state.nextRunAtMs).toBeUndefined();
});
it("keeps state separate for custom store paths without a json suffix", async () => {
it("keeps SQLite state separate for custom store paths without a json suffix", async () => {
const store = await makeStorePath();
const storePath = store.storePath.replace(/\.json$/, "");
const statePath = `${storePath}-state.json`;
const first = makeStore("job-1", true);
const second: CronStoreFile = {
...first,
@@ -328,46 +309,40 @@ describe("cron store", () => {
expect(config.jobs[0].id).toBe("job-1");
expect(config.jobs[0].state).toStrictEqual({});
const stateFile = JSON.parse(await fs.readFile(statePath, "utf-8"));
expect(stateFile.jobs["job-1"].state.nextRunAtMs).toBe(first.jobs[0].createdAtMs + 60_000);
await expect(fs.stat(`${storePath}-state.json`)).rejects.toThrow();
const loaded = await loadCronStore(storePath);
expect(loaded.jobs[0]?.state.nextRunAtMs).toBe(first.jobs[0].createdAtMs + 60_000);
});
it("recreates a missing state sidecar without rewriting unchanged config", async () => {
it("persists unchanged SQLite state without rewriting unchanged config", async () => {
const store = await makeStorePath();
const statePath = store.storePath.replace(/\.json$/, "-state.json");
const payload = makeStore("job-1", true);
payload.jobs[0].state = { nextRunAtMs: payload.jobs[0].createdAtMs + 60_000 };
await saveCronStore(store.storePath, payload);
await loadCronStore(store.storePath);
const configRawBefore = await fs.readFile(store.storePath, "utf-8");
await fs.rm(statePath);
const renamedDestinations = await captureRenameDestinations(() =>
saveCronStore(store.storePath, payload),
);
const configRawAfter = await fs.readFile(store.storePath, "utf-8");
const stateFile = JSON.parse(await fs.readFile(statePath, "utf-8"));
const loaded = await loadCronStore(store.storePath);
expect(configRawAfter).toBe(configRawBefore);
expect(renamedDestinations).toContain(statePath);
expect(renamedDestinations).not.toContain(store.storePath);
expect(stateFile.jobs["job-1"].state.nextRunAtMs).toBe(payload.jobs[0].createdAtMs + 60_000);
expect(loaded.jobs[0]?.state.nextRunAtMs).toBe(payload.jobs[0].createdAtMs + 60_000);
});
it("recreates a missing config file without rewriting unchanged state", async () => {
it("recreates a missing config file without changing SQLite state", async () => {
const store = await makeStorePath();
const statePath = store.storePath.replace(/\.json$/, "-state.json");
const payload = makeStore("job-1", true);
payload.jobs[0].state = { nextRunAtMs: payload.jobs[0].createdAtMs + 60_000 };
await saveCronStore(store.storePath, payload);
await loadCronStore(store.storePath);
const stateRawBefore = await fs.readFile(statePath, "utf-8");
await fs.rm(store.storePath);
const renamedDestinations = await captureRenameDestinations(() =>
@@ -375,18 +350,16 @@ describe("cron store", () => {
);
const config = JSON.parse(await fs.readFile(store.storePath, "utf-8"));
const stateRawAfter = await fs.readFile(statePath, "utf-8");
const loaded = await loadCronStore(store.storePath);
expect(config.jobs[0].id).toBe("job-1");
expect(config.jobs[0].state).toStrictEqual({});
expect(stateRawAfter).toBe(stateRawBefore);
expect(loaded.jobs[0]?.state.nextRunAtMs).toBe(payload.jobs[0].createdAtMs + 60_000);
expect(renamedDestinations).toContain(store.storePath);
expect(renamedDestinations).not.toContain(statePath);
});
it("migrates legacy inline state into the state sidecar", async () => {
it("migrates legacy inline state into SQLite state", async () => {
const store = await makeStorePath();
const statePath = store.storePath.replace(/\.json$/, "-state.json");
const legacy = makeStore("job-1", true);
legacy.jobs[0].state = {
lastRunAtMs: legacy.jobs[0].createdAtMs + 30_000,
@@ -400,15 +373,15 @@ describe("cron store", () => {
await saveCronStore(store.storePath, loaded);
const config = JSON.parse(await fs.readFile(store.storePath, "utf-8"));
const stateFile = JSON.parse(await fs.readFile(statePath, "utf-8"));
const reloaded = await loadCronStore(store.storePath);
expect(config.jobs[0]).not.toHaveProperty("updatedAtMs");
expect(config.jobs[0].state).toStrictEqual({});
expect(stateFile.jobs["job-1"].updatedAtMs).toBe(legacy.jobs[0].updatedAtMs);
expect(stateFile.jobs["job-1"].state.nextRunAtMs).toBe(legacy.jobs[0].createdAtMs + 60_000);
expect(reloaded.jobs[0]?.updatedAtMs).toBe(legacy.jobs[0].updatedAtMs);
expect(reloaded.jobs[0]?.state.nextRunAtMs).toBe(legacy.jobs[0].createdAtMs + 60_000);
});
it("ignores array-shaped state sidecars when migrating legacy inline state", async () => {
it("ignores array-shaped legacy state sidecars when migrating legacy inline state", async () => {
const store = await makeStorePath();
const statePath = store.storePath.replace(/\.json$/, "-state.json");
// Numeric-looking IDs catch accidental array indexing in invalid sidecars.
@@ -437,22 +410,21 @@ describe("cron store", () => {
const loaded = await loadCronStore(store.storePath);
await saveCronStore(store.storePath, loaded);
const stateFile = JSON.parse(await fs.readFile(statePath, "utf-8"));
const reloaded = await loadCronStore(store.storePath);
expect(loaded.jobs[0]?.updatedAtMs).toBe(legacy.jobs[0].updatedAtMs);
expect(loaded.jobs[0]?.state.nextRunAtMs).toBe(legacy.jobs[0].createdAtMs + 60_000);
expect(Array.isArray(stateFile.jobs)).toBe(false);
expect(stateFile.jobs["0"].updatedAtMs).toBe(legacy.jobs[0].updatedAtMs);
expect(stateFile.jobs["0"].state.nextRunAtMs).toBe(legacy.jobs[0].createdAtMs + 60_000);
expect(reloaded.jobs[0]?.updatedAtMs).toBe(legacy.jobs[0].updatedAtMs);
expect(reloaded.jobs[0]?.state.nextRunAtMs).toBe(legacy.jobs[0].createdAtMs + 60_000);
});
it("treats a corrupt state sidecar as absent", async () => {
it("treats a corrupt legacy state sidecar as absent at runtime", async () => {
const store = await makeStorePath();
const payload = makeStore("job-1", true);
payload.jobs[0].state = { nextRunAtMs: payload.jobs[0].createdAtMs + 60_000 };
const statePath = store.storePath.replace(/\.json$/, "-state.json");
await saveCronStore(store.storePath, payload);
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
await fs.writeFile(store.storePath, JSON.stringify(payload, null, 2), "utf-8");
await fs.writeFile(statePath, "{ not json", "utf-8");
const loaded = await loadCronStore(store.storePath);
@@ -461,12 +433,17 @@ describe("cron store", () => {
expect(loaded.jobs[0]?.state).toStrictEqual({});
});
it("propagates unreadable state sidecar errors", async () => {
it("propagates unreadable legacy state sidecar errors during doctor import", async () => {
const store = await makeStorePath();
const payload = makeStore("job-1", true);
const statePath = store.storePath.replace(/\.json$/, "-state.json");
await saveCronStore(store.storePath, payload);
await fs.writeFile(
statePath,
JSON.stringify({ version: 1, jobs: { "job-1": { state: {} } } }),
"utf-8",
);
const origReadFile = fs.readFile.bind(fs);
const spy = vi.spyOn(fs, "readFile").mockImplementation(async (filePath, options) => {
@@ -479,13 +456,15 @@ describe("cron store", () => {
});
try {
await expect(loadCronStore(store.storePath)).rejects.toThrow(/Failed to read cron state/);
await expect(importLegacyCronStateFileToSqlite(store.storePath)).rejects.toThrow(
/Failed to read cron state/,
);
} finally {
spy.mockRestore();
}
});
it("sanitizes invalid updatedAtMs values from the state sidecar", async () => {
it("imports legacy state sidecars into SQLite and sanitizes invalid updatedAtMs values", async () => {
const store = await makeStorePath();
const job = makeStore("job-1", true).jobs[0];
const config = {
@@ -514,10 +493,12 @@ describe("cron store", () => {
"utf-8",
);
await importLegacyCronStateFileToSqlite(store.storePath);
const loaded = await loadCronStore(store.storePath);
expect(loaded.jobs[0]?.updatedAtMs).toBe(job.createdAtMs);
expect(loaded.jobs[0]?.state.nextRunAtMs).toBe(job.createdAtMs + 60_000);
await expect(fs.stat(statePath)).rejects.toThrow();
});
it.skipIf(process.platform === "win32")(

View File

@@ -2,11 +2,19 @@ import fs from "node:fs";
import path from "node:path";
import { expandHomePrefix } from "../infra/home-dir.js";
import { replaceFileAtomic } from "../infra/replace-file.js";
import { openOpenClawStateDatabase } from "../state/openclaw-state-db.js";
import {
readOpenClawStateKvJson,
writeOpenClawStateKvJson,
type OpenClawStateJsonValue,
} from "../state/openclaw-state-kv.js";
import { resolveConfigDir } from "../utils.js";
import { parseJsonWithJson5Fallback } from "../utils/parse-json-compat.js";
import { tryCronScheduleIdentity } from "./schedule-identity.js";
import type { CronStoreFile } from "./types.js";
const CRON_STATE_KV_SCOPE = "cron.jobs.state";
type SerializedStoreCacheEntry = {
configJson?: string;
stateJson?: string;
@@ -50,6 +58,66 @@ type CronStateFile = {
jobs: Record<string, CronStateFileEntry>;
};
function cronStateKey(storePath: string): string {
return path.resolve(storePath);
}
function isRecord(value: unknown): value is Record<string, unknown> {
return Boolean(value) && typeof value === "object" && !Array.isArray(value);
}
function normalizeCronStateFile(value: unknown): CronStateFile | null {
if (!isRecord(value) || value.version !== 1 || !isRecord(value.jobs)) {
return null;
}
const jobs: Record<string, CronStateFileEntry> = {};
for (const [jobId, entry] of Object.entries(value.jobs)) {
if (!isRecord(entry)) {
continue;
}
const normalized: CronStateFileEntry = {};
if (typeof entry.updatedAtMs === "number" && Number.isFinite(entry.updatedAtMs)) {
normalized.updatedAtMs = entry.updatedAtMs;
}
if (typeof entry.scheduleIdentity === "string") {
normalized.scheduleIdentity = entry.scheduleIdentity;
}
if (isRecord(entry.state)) {
normalized.state = entry.state;
}
jobs[jobId] = normalized;
}
return { version: 1, jobs };
}
function readStateDatabase(storePath: string): CronStateFile | null {
const value = readOpenClawStateKvJson(CRON_STATE_KV_SCOPE, cronStateKey(storePath));
return normalizeCronStateFile(value);
}
function readStateDatabaseSync(storePath: string): CronStateFile | null {
const database = openOpenClawStateDatabase();
const row = database.db
.prepare("SELECT value_json FROM kv WHERE scope = ? AND key = ?")
.get(CRON_STATE_KV_SCOPE, cronStateKey(storePath)) as { value_json?: string } | undefined;
if (typeof row?.value_json !== "string") {
return null;
}
try {
return normalizeCronStateFile(JSON.parse(row.value_json));
} catch {
return null;
}
}
function writeStateDatabase(storePath: string, stateFile: CronStateFile) {
writeOpenClawStateKvJson<OpenClawStateJsonValue>(
CRON_STATE_KV_SCOPE,
cronStateKey(storePath),
stateFile as unknown as OpenClawStateJsonValue,
);
}
function stripRuntimeOnlyCronFields(store: CronStoreFile): unknown {
return {
version: store.version,
@@ -83,6 +151,14 @@ export function resolveCronStorePath(storePath?: string) {
return resolveDefaultCronStorePath();
}
export function legacyCronStateFileExists(storePath: string): boolean {
try {
return fs.existsSync(resolveStatePath(storePath));
} catch {
return false;
}
}
async function loadStateFile(statePath: string): Promise<CronStateFile | null> {
let raw: string;
try {
@@ -117,37 +193,28 @@ async function loadStateFile(statePath: string): Promise<CronStateFile | null> {
}
}
function loadStateFileSync(statePath: string): CronStateFile | null {
let raw: string;
try {
raw = fs.readFileSync(statePath, "utf-8");
} catch (err) {
if ((err as { code?: unknown })?.code === "ENOENT") {
return null;
}
throw new Error(`Failed to read cron state at ${statePath}: ${String(err)}`, {
cause: err,
});
export async function importLegacyCronStateFileToSqlite(storePath: string): Promise<{
imported: boolean;
importedJobs: number;
removedPath?: string;
}> {
const statePath = resolveStatePath(storePath);
const stateFile = await loadStateFile(statePath);
if (!stateFile) {
return { imported: false, importedJobs: 0 };
}
writeStateDatabase(storePath, stateFile);
try {
const parsed = parseJsonWithJson5Fallback(raw);
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
return null;
}
const record = parsed as Record<string, unknown>;
if (
record.version !== 1 ||
typeof record.jobs !== "object" ||
record.jobs === null ||
Array.isArray(record.jobs)
) {
return null;
}
return { version: 1, jobs: record.jobs as Record<string, CronStateFileEntry> };
await fs.promises.rm(statePath, { force: true });
} catch {
return null;
// Import already succeeded; a later doctor run can remove the stale sidecar.
}
serializedStoreCache.delete(storePath);
return {
imported: true,
importedJobs: Object.keys(stateFile.jobs).length,
removedPath: statePath,
};
}
function hasInlineState(jobs: Array<Record<string, unknown> | null | undefined>): boolean {
@@ -219,14 +286,13 @@ export async function loadCronStore(storePath: string): Promise<CronStoreFile> {
jobs: jobs.filter(Boolean) as never as CronStoreFile["jobs"],
};
// Load state file and merge.
const statePath = resolveStatePath(storePath);
const stateFile = await loadStateFile(statePath);
// Load runtime state from SQLite and merge.
const stateFile = readStateDatabase(storePath);
const hasLegacyInlineState =
!stateFile && hasInlineState(jobs as unknown as Array<Record<string, unknown>>);
if (stateFile) {
// State file exists: merge state by job ID. Inline state in jobs.json is ignored.
// Runtime state exists: merge state by job ID. Inline state in jobs.json is ignored.
for (const job of store.jobs) {
const entry = stateFile.jobs[job.id];
if (entry) {
@@ -287,7 +353,7 @@ export function loadCronStoreSync(storePath: string): CronStoreFile {
jobs: jobs.filter(Boolean) as never as CronStoreFile["jobs"],
};
const stateFile = loadStateFileSync(resolveStatePath(storePath));
const stateFile = readStateDatabaseSync(storePath);
const hasLegacyInlineState =
!stateFile && hasInlineState(jobs as unknown as Array<Record<string, unknown>>);
@@ -369,7 +435,6 @@ export async function saveCronStore(
const stateFile = extractStateFile(store);
const stateJson = JSON.stringify(stateFile, null, 2);
const statePath = resolveStatePath(storePath);
const cache = serializedStoreCache.get(storePath);
const configChanged = !stateOnly && cache?.configJson !== configJson;
@@ -378,7 +443,7 @@ export async function saveCronStore(
const configNeedsWrite = stateOnly
? false
: await serializedFileNeedsWrite(storePath, configJson, configChanged);
const stateNeedsWrite = await serializedFileNeedsWrite(statePath, stateJson, stateChanged);
const stateNeedsWrite = stateChanged || cache?.stateJson === undefined;
if (
stateOnly ? !stateNeedsWrite && !migrating : !configNeedsWrite && !stateNeedsWrite && !migrating
@@ -390,7 +455,7 @@ export async function saveCronStore(
// Write state first so migration never leaves stripped config without runtime state.
if (stateNeedsWrite || migrating) {
await atomicWrite(statePath, stateJson);
writeStateDatabase(storePath, stateFile);
updatedCache.stateJson = stateJson;
}

View File

@@ -10,11 +10,7 @@ import {
} from "../config/sessions.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { runCronIsolatedAgentTurn } from "../cron/isolated-agent.js";
import {
appendCronRunLog,
resolveCronRunLogPath,
resolveCronRunLogPruneOptions,
} from "../cron/run-log.js";
import { appendCronRunLogToSqlite, resolveCronRunLogPruneOptions } from "../cron/run-log.js";
import type { CronServiceContract } from "../cron/service-contract.js";
import { CronService } from "../cron/service.js";
import { resolveCronSessionTargetSessionKey } from "../cron/session-target.js";
@@ -394,12 +390,8 @@ export function buildGatewayCronService(params: {
warnedLegacyWebhookJobs,
});
const logPath = resolveCronRunLogPath({
void appendCronRunLogToSqlite(
storePath,
jobId: evt.jobId,
});
void appendCronRunLog(
logPath,
{
ts: Date.now(),
jobId: evt.jobId,
@@ -424,7 +416,7 @@ export function buildGatewayCronService(params: {
},
runLogPrune,
).catch((err) => {
cronLogger.warn({ err: String(err), logPath }, "cron: run log append failed");
cronLogger.warn({ err: String(err), storePath }, "cron: run log append failed");
});
}
},

View File

@@ -2,9 +2,8 @@ import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { resolveCronDeliveryPreviews } from "../../cron/delivery-preview.js";
import { normalizeCronJobCreate, normalizeCronJobPatch } from "../../cron/normalize.js";
import {
readCronRunLogEntriesPage,
readCronRunLogEntriesPageAll,
resolveCronRunLogPath,
readCronRunLogEntriesPageAllFromSqlite,
readCronRunLogEntriesPageFromSqlite,
} from "../../cron/run-log.js";
import { applyJobPatch } from "../../cron/service/jobs.js";
import { isInvalidCronSessionTargetIdError } from "../../cron/session-target.js";
@@ -510,7 +509,7 @@ export const cronHandlers: GatewayRequestHandlers = {
.filter((job) => typeof job.id === "string" && typeof job.name === "string")
.map((job) => [job.id, job.name]),
);
const page = await readCronRunLogEntriesPageAll({
const page = await readCronRunLogEntriesPageAllFromSqlite({
storePath: context.cronStorePath,
limit: p.limit,
offset: p.offset,
@@ -525,31 +524,25 @@ export const cronHandlers: GatewayRequestHandlers = {
respond(true, page, undefined);
return;
}
let logPath: string;
try {
logPath = resolveCronRunLogPath({
storePath: context.cronStorePath,
const page = await readCronRunLogEntriesPageFromSqlite(context.cronStorePath, {
limit: p.limit,
offset: p.offset,
jobId: jobId as string,
statuses: p.statuses,
status: p.status,
deliveryStatuses: p.deliveryStatuses,
deliveryStatus: p.deliveryStatus,
query: p.query,
sortDir: p.sortDir,
});
respond(true, page, undefined);
} catch {
respond(
false,
undefined,
errorShape(ErrorCodes.INVALID_REQUEST, "invalid cron.runs params: invalid id"),
);
return;
}
const page = await readCronRunLogEntriesPage(logPath, {
limit: p.limit,
offset: p.offset,
jobId: jobId as string,
statuses: p.statuses,
status: p.status,
deliveryStatuses: p.deliveryStatuses,
deliveryStatus: p.deliveryStatus,
query: p.query,
sortDir: p.sortDir,
});
respond(true, page, undefined);
},
};

View File

@@ -8,7 +8,7 @@ import {
resolveOpenClawStateSqlitePath,
} from "./openclaw-state-db.paths.js";
const OPENCLAW_STATE_SCHEMA_VERSION = 1;
const OPENCLAW_STATE_SCHEMA_VERSION = 2;
const OPENCLAW_STATE_DIR_MODE = 0o700;
const OPENCLAW_STATE_FILE_MODE = 0o600;
const OPENCLAW_STATE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
@@ -135,6 +135,19 @@ function ensureSchema(db: DatabaseSync, pathname: string): void {
PRIMARY KEY (agent_id, run_id, artifact_id)
);
CREATE TABLE IF NOT EXISTS cron_run_logs (
store_key TEXT NOT NULL,
job_id TEXT NOT NULL,
seq INTEGER NOT NULL,
ts INTEGER NOT NULL,
entry_json TEXT NOT NULL,
created_at INTEGER NOT NULL,
PRIMARY KEY (store_key, job_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_cron_run_logs_store_ts
ON cron_run_logs(store_key, ts DESC, seq DESC);
PRAGMA user_version = ${OPENCLAW_STATE_SCHEMA_VERSION};
`);
}

View File

@@ -169,8 +169,7 @@ function createTaskRegistryMaintenanceHarness(params: {
isCronRuntimeAuthoritative: () => params.cronRuntimeAuthoritative ?? true,
resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json",
loadCronStoreSync: () => params.cronStore ?? { version: 1, jobs: [] },
resolveCronRunLogPath: ({ jobId }) => jobId,
readCronRunLogEntriesSync: (jobId) => cronRunLogEntries[jobId] ?? [],
readCronRunLogEntriesSync: (_storePath, opts) => cronRunLogEntries[opts?.jobId ?? ""] ?? [],
};
setTaskRegistryMaintenanceRuntimeForTests(runtime);

View File

@@ -12,7 +12,7 @@ import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
import type { SessionEntry } from "../config/sessions.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { isCronJobActive } from "../cron/active-jobs.js";
import { readCronRunLogEntriesSync, resolveCronRunLogPath } from "../cron/run-log.js";
import { readCronRunLogEntriesFromSqliteSync } from "../cron/run-log.js";
import type { CronRunLogEntry } from "../cron/run-log.js";
import { loadCronStoreSync, resolveCronStorePath } from "../cron/store.js";
import type { CronJob, CronStoreFile } from "../cron/types.js";
@@ -103,8 +103,7 @@ type TaskRegistryMaintenanceRuntime = {
isCronRuntimeAuthoritative: () => boolean;
resolveCronStorePath: typeof resolveCronStorePath;
loadCronStoreSync: typeof loadCronStoreSync;
resolveCronRunLogPath: typeof resolveCronRunLogPath;
readCronRunLogEntriesSync: typeof readCronRunLogEntriesSync;
readCronRunLogEntriesSync: typeof readCronRunLogEntriesFromSqliteSync;
};
const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = {
@@ -143,8 +142,7 @@ const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = {
isCronRuntimeAuthoritative: () => configuredCronRuntimeAuthoritative,
resolveCronStorePath: () => configuredCronStorePath ?? resolveCronStorePath(),
loadCronStoreSync,
resolveCronRunLogPath,
readCronRunLogEntriesSync,
readCronRunLogEntriesSync: readCronRunLogEntriesFromSqliteSync,
};
let taskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime =
@@ -331,11 +329,7 @@ function getCronRunLogEntries(context: CronRecoveryContext, jobId: string): Cron
}
let entries: CronRunLogEntry[] = [];
try {
const logPath = taskRegistryMaintenanceRuntime.resolveCronRunLogPath({
storePath: context.storePath,
jobId,
});
entries = taskRegistryMaintenanceRuntime.readCronRunLogEntriesSync(logPath, {
entries = taskRegistryMaintenanceRuntime.readCronRunLogEntriesSync(context.storePath, {
jobId,
limit: 5000,
});

View File

@@ -187,7 +187,6 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: {
isCronRuntimeAuthoritative: () => true,
resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json",
loadCronStoreSync: () => ({ version: 1, jobs: [] }),
resolveCronRunLogPath: ({ jobId }) => jobId,
readCronRunLogEntriesSync: () => [],
});
}
@@ -2109,7 +2108,6 @@ describe("task-registry", () => {
isCronRuntimeAuthoritative: () => true,
resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json",
loadCronStoreSync: () => ({ version: 1, jobs: [] }),
resolveCronRunLogPath: ({ jobId }) => jobId,
readCronRunLogEntriesSync: () => [],
});