mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-13 15:47:28 +00:00
refactor: remove cron store path runtime
This commit is contained in:
@@ -42,7 +42,7 @@ Cron is the Gateway's built-in scheduler. It persists jobs, wakes the agent at t
|
||||
- Cron runs **inside the Gateway** process (not inside the model).
|
||||
- Job definitions and runtime execution state persist in the shared SQLite state database at `~/.openclaw/state/openclaw.sqlite`.
|
||||
- Legacy `jobs.json` and `jobs-state.json` files are imported and removed by `openclaw doctor --fix`.
|
||||
- The optional `cron.store` path is now a legacy import namespace and display hint, not a runtime JSON writer.
|
||||
- `cron.store` is legacy config. Runtime cron jobs use the shared SQLite state database; doctor still checks the old `cron.store` path when importing pre-SQLite `jobs.json` files.
|
||||
- All cron executions create [background task](/automation/tasks) records.
|
||||
- On Gateway startup, overdue isolated agent-turn jobs are rescheduled out of the channel-connect window instead of replaying immediately, so Discord/Telegram startup and native-command setup stay responsive after restarts.
|
||||
- One-shot jobs (`--at`) auto-delete after success by default.
|
||||
@@ -413,7 +413,7 @@ Model override note:
|
||||
|
||||
`maxConcurrentRuns` limits both scheduled cron dispatch and isolated agent-turn execution. Isolated cron agent turns use the queue's dedicated `cron-nested` execution lane internally, so raising this value lets independent cron LLM runs progress in parallel instead of only starting their outer cron wrappers. The shared non-cron `nested` lane is not widened by this setting.
|
||||
|
||||
Cron data is keyed by the resolved `cron.store` value inside the shared SQLite state database. That value is a legacy import key, not a runtime JSON write path. SQLite stores job definitions, pending slots, active markers, last-run metadata, and the schedule identity used to invalidate stale pending slots after a job update.
|
||||
Cron data is keyed by a stable SQLite cron store key inside the shared state database. SQLite stores job definitions, pending slots, active markers, last-run metadata, and the schedule identity used to invalidate stale pending slots after a job update. Doctor still uses the old `cron.store` path only to discover and import pre-SQLite `jobs.json` files.
|
||||
|
||||
Run `openclaw doctor --fix` once after upgrading from an older version so doctor can import and remove legacy `jobs.json` and `jobs-state.json` files.
|
||||
|
||||
|
||||
@@ -202,7 +202,7 @@ For the plugin authoring guide, see [Plugin SDK overview](/plugins/sdk-overview)
|
||||
| `plugin-sdk/reply-reference` | `createReplyReferencePlanner` |
|
||||
| `plugin-sdk/reply-chunking` | Narrow text/markdown chunking helpers |
|
||||
| `plugin-sdk/session-store-runtime` | SQLite-backed session row, session-key, updated-at, and transcript locator helpers |
|
||||
| `plugin-sdk/cron-store-runtime` | SQLite cron store key/load/save helpers |
|
||||
| `plugin-sdk/cron-store-runtime` | SQLite cron store load/save helpers |
|
||||
| `plugin-sdk/state-paths` | State/OAuth dir path helpers |
|
||||
| `plugin-sdk/routing` | Route/session-key/account binding helpers such as `resolveAgentRoute`, `buildAgentSessionKey`, and `resolveDefaultAgentBoundAccountId` |
|
||||
| `plugin-sdk/status-helpers` | Shared channel/account status summary helpers, runtime-state defaults, and issue metadata helpers |
|
||||
|
||||
@@ -294,10 +294,10 @@ The remaining cleanup is mostly consolidation and deletion:
|
||||
deleting/reinserting the whole job table on each save. Plugin target
|
||||
writebacks update matching cron rows directly and keep runtime cron state in
|
||||
the same state-database transaction.
|
||||
- Cron runtime callers now resolve a SQLite cron store key. The old
|
||||
`resolveCronStorePath` name remains only as a compatibility alias for legacy
|
||||
import/test/plugin callers; production gateway, task maintenance, status, and
|
||||
Telegram target writeback paths use `resolveCronStoreKey`.
|
||||
- Cron runtime callers now use a stable SQLite cron store key. Legacy
|
||||
`cron.store` paths are doctor import inputs only; production gateway, task
|
||||
maintenance, status, and Telegram target writeback paths use
|
||||
`resolveCronStoreKey`.
|
||||
- ACP spawn no longer resolves or persists transcript JSONL file paths. Spawn
|
||||
and thread-bind setup persist the SQLite session row directly and keep the
|
||||
session id as the retained transcript identity.
|
||||
@@ -954,7 +954,7 @@ keeps only the version-1 schema plus doctor file-to-database import.
|
||||
`/status` and chat-driven trajectory export no longer propagate legacy
|
||||
store paths; transcript usage fallback reads SQLite by agent/session
|
||||
identity. Remaining `storePath` call surfaces are migration/path metadata,
|
||||
cron store paths, transcript-path metadata, and gateway aggregate lookup.
|
||||
transcript-path metadata, and gateway aggregate lookup.
|
||||
Gateway combined-session loading no longer has a special runtime branch for
|
||||
non-templated `session.store` values; it aggregates per-agent SQLite rows.
|
||||
The legacy session-lock doctor lane and its `.jsonl.lock` cleanup helper
|
||||
|
||||
@@ -192,7 +192,7 @@ export async function maybePersistResolvedTelegramTarget(params: {
|
||||
}
|
||||
|
||||
try {
|
||||
const storeKey = resolveCronStoreKey(params.cfg.cron?.store);
|
||||
const storeKey = resolveCronStoreKey();
|
||||
const result = await updateCronStoreJobs(storeKey, (job) => {
|
||||
if (job.delivery?.channel !== "telegram") {
|
||||
return undefined;
|
||||
|
||||
@@ -26,12 +26,9 @@ export function hasUnbackedReminderCommitment(text: string): boolean {
|
||||
* current session key. Used to suppress the "no reminder scheduled" guard note
|
||||
* when an existing cron (created in a prior turn) already covers the commitment.
|
||||
*/
|
||||
export async function hasSessionRelatedCronJobs(params: {
|
||||
cronStorePath?: string;
|
||||
sessionKey?: string;
|
||||
}): Promise<boolean> {
|
||||
export async function hasSessionRelatedCronJobs(params: { sessionKey?: string }): Promise<boolean> {
|
||||
try {
|
||||
const cronStorePath = resolveCronStoreKey(params.cronStorePath);
|
||||
const cronStorePath = resolveCronStoreKey();
|
||||
const store = await loadCronStore(cronStorePath);
|
||||
if (store.jobs.length === 0) {
|
||||
return false;
|
||||
|
||||
@@ -188,8 +188,7 @@ const loadCronStoreMock = vi.fn();
|
||||
vi.mock("../../cron/store.js", () => {
|
||||
return {
|
||||
loadCronStore: (...args: unknown[]) => loadCronStoreMock(...args),
|
||||
resolveCronStoreKey: (statePath?: string) => statePath ?? "/tmp/openclaw-cron-store.json",
|
||||
resolveCronStorePath: (statePath?: string) => statePath ?? "/tmp/openclaw-cron-store.json",
|
||||
resolveCronStoreKey: () => "default",
|
||||
};
|
||||
});
|
||||
|
||||
|
||||
@@ -1479,7 +1479,6 @@ export async function runReplyAgent(params: {
|
||||
const coveredByExistingCron =
|
||||
hasReminderCommitment && successfulCronAdds === 0
|
||||
? await hasSessionRelatedCronJobs({
|
||||
cronStorePath: cfg.cron?.store,
|
||||
sessionKey,
|
||||
})
|
||||
: false;
|
||||
|
||||
@@ -3,7 +3,7 @@ import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { loadCronStore, saveCronStore } from "../cron/store.js";
|
||||
import { loadCronStore, resolveCronStoreKey, saveCronStore } from "../cron/store.js";
|
||||
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
|
||||
import { maybeRepairLegacyCronStore, noteLegacyWhatsAppCrontabHealthCheck } from "./doctor-cron.js";
|
||||
|
||||
@@ -117,7 +117,7 @@ describe("maybeRepairLegacyCronStore", () => {
|
||||
prompter: makePrompter(true),
|
||||
});
|
||||
|
||||
const persisted = await loadCronStore(storePath);
|
||||
const persisted = await loadCronStore(resolveCronStoreKey());
|
||||
const [job] = persisted.jobs;
|
||||
const legacyJob = job as Record<string, unknown> | undefined;
|
||||
expect(legacyJob?.jobId).toBeUndefined();
|
||||
@@ -188,7 +188,7 @@ describe("maybeRepairLegacyCronStore", () => {
|
||||
prompter: makePrompter(true),
|
||||
});
|
||||
|
||||
const loaded = await loadCronStore(storePath);
|
||||
const loaded = await loadCronStore(resolveCronStoreKey());
|
||||
expect(loaded.jobs[0]?.updatedAtMs).toBe(Date.parse("2026-02-01T00:01:00.000Z"));
|
||||
expect(loaded.jobs[0]?.state.nextRunAtMs).toBe(Date.parse("2026-02-01T00:02:00.000Z"));
|
||||
await expect(fs.stat(statePath)).rejects.toThrow();
|
||||
@@ -201,7 +201,7 @@ describe("maybeRepairLegacyCronStore", () => {
|
||||
it("imports legacy cron runtime state sidecars when job definitions are already SQLite-backed", async () => {
|
||||
const storePath = await makeTempStorePath();
|
||||
const statePath = storePath.replace(/\.json$/, "-state.json");
|
||||
await saveCronStore(storePath, {
|
||||
await saveCronStore(resolveCronStoreKey(), {
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
@@ -243,7 +243,7 @@ describe("maybeRepairLegacyCronStore", () => {
|
||||
prompter: makePrompter(true),
|
||||
});
|
||||
|
||||
const loaded = await loadCronStore(storePath);
|
||||
const loaded = await loadCronStore(resolveCronStoreKey());
|
||||
expect(loaded.jobs[0]?.updatedAtMs).toBe(Date.parse("2026-02-01T00:01:00.000Z"));
|
||||
expect(loaded.jobs[0]?.state.nextRunAtMs).toBe(Date.parse("2026-02-01T00:02:00.000Z"));
|
||||
await expect(fs.stat(storePath)).rejects.toThrow();
|
||||
@@ -284,9 +284,9 @@ describe("maybeRepairLegacyCronStore", () => {
|
||||
});
|
||||
|
||||
const { readCronRunLogEntriesFromSqliteSync } = await import("../cron/run-log.js");
|
||||
expect(readCronRunLogEntriesFromSqliteSync(storePath, { jobId: "stateful-job" })).toEqual([
|
||||
expect.objectContaining({ ts: 1, status: "ok" }),
|
||||
]);
|
||||
expect(
|
||||
readCronRunLogEntriesFromSqliteSync(resolveCronStoreKey(), { jobId: "stateful-job" }),
|
||||
).toEqual([expect.objectContaining({ ts: 1, status: "ok" })]);
|
||||
await expect(fs.stat(logPath)).rejects.toThrow();
|
||||
expect(noteMock).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Imported 1 cron run-log row from 1 legacy run-log file"),
|
||||
@@ -297,7 +297,7 @@ describe("maybeRepairLegacyCronStore", () => {
|
||||
it("imports legacy cron run-log files when job definitions are already SQLite-backed", async () => {
|
||||
const storePath = await makeTempStorePath();
|
||||
const logPath = path.join(path.dirname(storePath), "runs", "stateful-job.jsonl");
|
||||
await saveCronStore(storePath, {
|
||||
await saveCronStore(resolveCronStoreKey(), {
|
||||
version: 1,
|
||||
jobs: [
|
||||
{
|
||||
@@ -328,9 +328,9 @@ describe("maybeRepairLegacyCronStore", () => {
|
||||
});
|
||||
|
||||
const { readCronRunLogEntriesFromSqliteSync } = await import("../cron/run-log.js");
|
||||
expect(readCronRunLogEntriesFromSqliteSync(storePath, { jobId: "stateful-job" })).toEqual([
|
||||
expect.objectContaining({ ts: 1, status: "ok" }),
|
||||
]);
|
||||
expect(
|
||||
readCronRunLogEntriesFromSqliteSync(resolveCronStoreKey(), { jobId: "stateful-job" }),
|
||||
).toEqual([expect.objectContaining({ ts: 1, status: "ok" })]);
|
||||
await expect(fs.stat(storePath)).rejects.toThrow();
|
||||
await expect(fs.stat(logPath)).rejects.toThrow();
|
||||
expect(noteMock).toHaveBeenCalledWith(
|
||||
@@ -361,7 +361,7 @@ describe("maybeRepairLegacyCronStore", () => {
|
||||
prompter: makePrompter(true),
|
||||
});
|
||||
|
||||
const persisted = await loadCronStore(storePath);
|
||||
const persisted = await loadCronStore(resolveCronStoreKey());
|
||||
expect(persisted.jobs[0]?.id).toBe("42");
|
||||
expect(typeof persisted.jobs[1]?.id).toBe("string");
|
||||
expect(persisted.jobs[1]?.id).toMatch(/^cron-/);
|
||||
@@ -418,7 +418,7 @@ describe("maybeRepairLegacyCronStore", () => {
|
||||
prompter: makePrompter(true),
|
||||
});
|
||||
|
||||
const persisted = await loadCronStore(storePath);
|
||||
const persisted = await loadCronStore(resolveCronStoreKey());
|
||||
expect((persisted.jobs[0] as Record<string, unknown> | undefined)?.notify).toBe(true);
|
||||
expect(noteSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining('uses legacy notify fallback alongside delivery mode "announce"'),
|
||||
@@ -495,7 +495,7 @@ describe("maybeRepairLegacyCronStore", () => {
|
||||
prompter: makePrompter(true),
|
||||
});
|
||||
|
||||
const persisted = await loadCronStore(storePath);
|
||||
const persisted = await loadCronStore(resolveCronStoreKey());
|
||||
expect((persisted.jobs[0] as Record<string, unknown> | undefined)?.notify).toBeUndefined();
|
||||
expect(persisted.jobs[0]?.delivery).toMatchObject({
|
||||
mode: "webhook",
|
||||
@@ -532,7 +532,7 @@ describe("maybeRepairLegacyCronStore", () => {
|
||||
prompter: makePrompter(true),
|
||||
});
|
||||
|
||||
const persisted = await loadCronStore(storePath);
|
||||
const persisted = await loadCronStore(resolveCronStoreKey());
|
||||
const legacyJob = persisted.jobs[0] as Record<string, unknown> | undefined;
|
||||
expect(legacyJob?.channel).toBeUndefined();
|
||||
expect(legacyJob?.to).toBeUndefined();
|
||||
@@ -575,7 +575,7 @@ describe("maybeRepairLegacyCronStore", () => {
|
||||
prompter: makePrompter(true),
|
||||
});
|
||||
|
||||
const persisted = await loadCronStore(storePath);
|
||||
const persisted = await loadCronStore(resolveCronStoreKey());
|
||||
const [job] = persisted.jobs;
|
||||
expect(job).toMatchObject({
|
||||
sessionTarget: "isolated",
|
||||
|
||||
@@ -4,7 +4,12 @@ import { promisify } from "node:util";
|
||||
import { formatCliCommand } from "../cli/command-format.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { resolveCronRunLogPruneOptions } from "../cron/run-log.js";
|
||||
import { resolveCronStorePath, loadCronStore, saveCronStore } from "../cron/store.js";
|
||||
import {
|
||||
resolveCronStoreKey,
|
||||
resolveLegacyCronStorePath,
|
||||
loadCronStore,
|
||||
saveCronStore,
|
||||
} from "../cron/store.js";
|
||||
import type { CronJob } from "../cron/types.js";
|
||||
import {
|
||||
normalizeOptionalLowercaseString,
|
||||
@@ -206,13 +211,15 @@ export async function maybeRepairLegacyCronStore(params: {
|
||||
options: DoctorOptions;
|
||||
prompter: Pick<DoctorPrompter, "confirm">;
|
||||
}) {
|
||||
const storePath = resolveCronStorePath(params.cfg.cron?.store);
|
||||
const hasLegacyStoreFile = legacyCronStoreFileExists(storePath);
|
||||
const hasLegacyStateSidecar = legacyCronStateFileExists(storePath);
|
||||
const hasLegacyRunLogs = await legacyCronRunLogFilesExist(storePath);
|
||||
const configuredLegacyStorePath = (params.cfg.cron as { store?: string } | undefined)?.store;
|
||||
const legacyStorePath = resolveLegacyCronStorePath(configuredLegacyStorePath);
|
||||
const storeKey = resolveCronStoreKey();
|
||||
const hasLegacyStoreFile = legacyCronStoreFileExists(legacyStorePath);
|
||||
const hasLegacyStateSidecar = legacyCronStateFileExists(legacyStorePath);
|
||||
const hasLegacyRunLogs = await legacyCronRunLogFilesExist(legacyStorePath);
|
||||
const store =
|
||||
(hasLegacyStoreFile ? await loadLegacyCronStoreForMigration(storePath) : null) ??
|
||||
(await loadCronStore(storePath));
|
||||
(hasLegacyStoreFile ? await loadLegacyCronStoreForMigration(legacyStorePath) : null) ??
|
||||
(await loadCronStore(storeKey));
|
||||
const rawJobs = (store.jobs ?? []) as unknown as Array<Record<string, unknown>>;
|
||||
if (rawJobs.length === 0 && !hasLegacyStoreFile && !hasLegacyStateSidecar && !hasLegacyRunLogs) {
|
||||
return;
|
||||
@@ -248,7 +255,7 @@ export async function maybeRepairLegacyCronStore(params: {
|
||||
|
||||
note(
|
||||
[
|
||||
`Legacy cron job storage detected at ${shortenHomePath(storePath)}.`,
|
||||
`Legacy cron job storage detected at ${shortenHomePath(legacyStorePath)}.`,
|
||||
...previewLines,
|
||||
`Repair with ${formatCliCommand("openclaw doctor --fix")} to normalize the store and import runtime state into SQLite before the next scheduler run.`,
|
||||
].join("\n"),
|
||||
@@ -279,14 +286,14 @@ export async function maybeRepairLegacyCronStore(params: {
|
||||
}
|
||||
|
||||
if (changed) {
|
||||
await saveCronStore(storePath, {
|
||||
await saveCronStore(storeKey, {
|
||||
version: 1,
|
||||
jobs: rawJobs as unknown as CronJob[],
|
||||
});
|
||||
if (hasLegacyStoreFile) {
|
||||
await fs.rm(storePath, { force: true }).catch(() => undefined);
|
||||
await fs.rm(legacyStorePath, { force: true }).catch(() => undefined);
|
||||
}
|
||||
note(`Cron store normalized at ${shortenHomePath(storePath)}.`, "Doctor changes");
|
||||
note(`Cron store normalized at ${shortenHomePath(legacyStorePath)}.`, "Doctor changes");
|
||||
if (dreamingMigration.rewrittenCount > 0) {
|
||||
note(
|
||||
`Rewrote ${pluralize(dreamingMigration.rewrittenCount, "managed dreaming job")} to run as an isolated agent turn so dreaming no longer requires heartbeat.`,
|
||||
@@ -296,7 +303,7 @@ export async function maybeRepairLegacyCronStore(params: {
|
||||
}
|
||||
|
||||
const stateImport = hasLegacyStateSidecar
|
||||
? await importLegacyCronStateFileToSqlite(storePath)
|
||||
? await importLegacyCronStateFileToSqlite({ legacyStorePath, storeKey })
|
||||
: { imported: false, importedJobs: 0 };
|
||||
if (stateImport.imported) {
|
||||
note(
|
||||
@@ -307,7 +314,8 @@ export async function maybeRepairLegacyCronStore(params: {
|
||||
|
||||
if (hasLegacyRunLogs) {
|
||||
const runLogImport = await importLegacyCronRunLogFilesToSqlite({
|
||||
storePath,
|
||||
legacyStorePath,
|
||||
storeKey,
|
||||
opts: resolveCronRunLogPruneOptions(params.cfg.cron?.runLog),
|
||||
});
|
||||
if (runLogImport.files > 0) {
|
||||
|
||||
@@ -17,10 +17,11 @@ export async function legacyCronRunLogFilesExist(storePath: string): Promise<boo
|
||||
}
|
||||
|
||||
export async function importLegacyCronRunLogFilesToSqlite(params: {
|
||||
storePath: string;
|
||||
legacyStorePath: string;
|
||||
storeKey: string;
|
||||
opts?: { maxBytes?: number; keepLines?: number };
|
||||
}): Promise<{ imported: number; files: number; removedDir?: string }> {
|
||||
const runsDir = path.resolve(path.dirname(path.resolve(params.storePath)), "runs");
|
||||
const runsDir = path.resolve(path.dirname(path.resolve(params.legacyStorePath)), "runs");
|
||||
if (!(await pathExists(runsDir))) {
|
||||
return { imported: 0, files: 0 };
|
||||
}
|
||||
@@ -35,7 +36,7 @@ export async function importLegacyCronRunLogFilesToSqlite(params: {
|
||||
for (const fileName of files) {
|
||||
const raw = await runsRoot.readText(fileName).catch(() => "");
|
||||
for (const entry of parseAllRunLogEntries(raw)) {
|
||||
await appendCronRunLogToSqlite(params.storePath, entry, params.opts);
|
||||
await appendCronRunLogToSqlite(params.storeKey, entry, params.opts);
|
||||
imported++;
|
||||
}
|
||||
await fs.rm(path.join(runsDir, fileName), { force: true }).catch(() => undefined);
|
||||
|
||||
@@ -112,17 +112,20 @@ export async function loadLegacyCronStoreForMigration(
|
||||
};
|
||||
}
|
||||
|
||||
export async function importLegacyCronStateFileToSqlite(storePath: string): Promise<{
|
||||
export async function importLegacyCronStateFileToSqlite(params: {
|
||||
legacyStorePath: string;
|
||||
storeKey: string;
|
||||
}): Promise<{
|
||||
imported: boolean;
|
||||
importedJobs: number;
|
||||
removedPath?: string;
|
||||
}> {
|
||||
const statePath = resolveStatePath(storePath);
|
||||
const statePath = resolveStatePath(params.legacyStorePath);
|
||||
const stateFile = await loadStateFile(statePath);
|
||||
if (!stateFile) {
|
||||
return { imported: false, importedJobs: 0 };
|
||||
}
|
||||
const importedJobs = writeCronJobRuntimeStateForMigration(storePath, stateFile);
|
||||
const importedJobs = writeCronJobRuntimeStateForMigration(params.storeKey, stateFile);
|
||||
try {
|
||||
await fs.promises.rm(statePath, { force: true });
|
||||
} catch {
|
||||
@@ -135,27 +138,31 @@ export async function importLegacyCronStateFileToSqlite(storePath: string): Prom
|
||||
};
|
||||
}
|
||||
|
||||
export async function importLegacyCronStoreToSqlite(storePath: string): Promise<{
|
||||
export async function importLegacyCronStoreToSqlite(params: {
|
||||
legacyStorePath: string;
|
||||
storeKey: string;
|
||||
}): Promise<{
|
||||
imported: boolean;
|
||||
importedJobs: number;
|
||||
removedPath?: string;
|
||||
}> {
|
||||
const store = await loadLegacyCronStoreForMigration(storePath);
|
||||
const store = await loadLegacyCronStoreForMigration(params.legacyStorePath);
|
||||
if (!store) {
|
||||
return { imported: false, importedJobs: 0 };
|
||||
}
|
||||
const stateFile =
|
||||
(await loadStateFile(resolveStatePath(storePath))) ?? extractCronStateFileForMigration(store);
|
||||
writeCronJobsForMigration(storePath, store);
|
||||
writeCronJobRuntimeStateForMigration(storePath, stateFile);
|
||||
(await loadStateFile(resolveStatePath(params.legacyStorePath))) ??
|
||||
extractCronStateFileForMigration(store);
|
||||
writeCronJobsForMigration(params.storeKey, store);
|
||||
writeCronJobRuntimeStateForMigration(params.storeKey, stateFile);
|
||||
try {
|
||||
await fs.promises.rm(storePath, { force: true });
|
||||
await fs.promises.rm(params.legacyStorePath, { force: true });
|
||||
} catch {
|
||||
// Import already succeeded; doctor can remove the stale source on the next pass.
|
||||
}
|
||||
return {
|
||||
imported: true,
|
||||
importedJobs: store.jobs.length,
|
||||
removedPath: storePath,
|
||||
removedPath: params.legacyStorePath,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -121,6 +121,16 @@ const DOCTOR_DEPRECATION_COMPAT_RECORDS = [
|
||||
docsPath: "/automation",
|
||||
tests: ["src/commands/doctor/shared/legacy-config-migrate.test.ts"],
|
||||
}),
|
||||
deprecatedCompatRecord({
|
||||
code: "doctor-cron-store",
|
||||
owner: "config",
|
||||
introduced: "2026-05-09",
|
||||
source: "cron.store",
|
||||
migration: "src/commands/doctor/shared/legacy-config-migrations.runtime.gateway.ts",
|
||||
replacement: "shared SQLite cron store",
|
||||
docsPath: "/automation/cron-jobs",
|
||||
tests: ["src/commands/doctor/shared/legacy-config-migrate.test.ts"],
|
||||
}),
|
||||
deprecatedCompatRecord({
|
||||
code: "doctor-mcp-server-type-alias",
|
||||
owner: "config",
|
||||
|
||||
@@ -144,6 +144,26 @@ describe("legacy memory search store migrate", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("legacy cron store migrate", () => {
|
||||
it("removes ignored cron.store path config", () => {
|
||||
const res = migrateLegacyConfigForTest({
|
||||
cron: {
|
||||
enabled: true,
|
||||
store: "~/.openclaw/cron/jobs.json",
|
||||
maxConcurrentRuns: 2,
|
||||
},
|
||||
});
|
||||
|
||||
expect(res.config?.cron).toEqual({
|
||||
enabled: true,
|
||||
maxConcurrentRuns: 2,
|
||||
});
|
||||
expect(res.changes).toContain(
|
||||
"Removed cron.store; cron jobs now use the shared SQLite database.",
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("legacy session parent fork migrate", () => {
|
||||
it("removes ignored session.store", () => {
|
||||
const res = migrateLegacyConfigForTest({
|
||||
|
||||
@@ -21,6 +21,16 @@ const GATEWAY_BIND_RULE: LegacyConfigRule = {
|
||||
requireSourceLiteral: true,
|
||||
};
|
||||
|
||||
const LEGACY_CRON_STORE_RULE: LegacyConfigRule = {
|
||||
path: ["cron"],
|
||||
message:
|
||||
'cron.store is legacy; cron jobs now use the shared SQLite database. Run "openclaw doctor --fix" to remove it after legacy import.',
|
||||
match: (value) => {
|
||||
const cron = getRecord(value);
|
||||
return Boolean(cron && Object.prototype.hasOwnProperty.call(cron, "store"));
|
||||
},
|
||||
};
|
||||
|
||||
function isLegacyGatewayBindHostAlias(value: unknown): boolean {
|
||||
const normalized = normalizeOptionalLowercaseString(value);
|
||||
if (!normalized) {
|
||||
@@ -52,6 +62,19 @@ function escapeControlForLog(value: string): string {
|
||||
}
|
||||
|
||||
export const LEGACY_CONFIG_MIGRATIONS_RUNTIME_GATEWAY: LegacyConfigMigrationSpec[] = [
|
||||
defineLegacyConfigMigration({
|
||||
id: "cron.store",
|
||||
describe: "Remove legacy cron.store path settings",
|
||||
legacyRules: [LEGACY_CRON_STORE_RULE],
|
||||
apply: (raw, changes) => {
|
||||
const cron = getRecord(raw.cron);
|
||||
if (!cron || !Object.prototype.hasOwnProperty.call(cron, "store")) {
|
||||
return;
|
||||
}
|
||||
delete cron.store;
|
||||
changes.push("Removed cron.store; cron jobs now use the shared SQLite database.");
|
||||
},
|
||||
}),
|
||||
defineLegacyConfigMigration({
|
||||
id: "gateway.controlUi.allowedOrigins-seed-for-non-loopback",
|
||||
describe: "Seed gateway.controlUi.allowedOrigins for existing non-loopback gateway installs",
|
||||
|
||||
@@ -149,7 +149,7 @@ export async function getStatusSummary(
|
||||
const queuedSystemEvents = peekSystemEvents(mainSessionKey);
|
||||
const taskMaintenanceModule = await loadTaskRegistryMaintenanceModule();
|
||||
taskMaintenanceModule.configureTaskRegistryMaintenance({
|
||||
cronStorePath: resolveCronStoreKey(cfg.cron?.store),
|
||||
cronStorePath: resolveCronStoreKey(),
|
||||
});
|
||||
const tasks = taskMaintenanceModule.getInspectableTaskRegistrySummary();
|
||||
const taskAudit = taskMaintenanceModule.getInspectableTaskAuditSummary();
|
||||
|
||||
@@ -58,7 +58,7 @@ async function loadTaskCancelConfig() {
|
||||
function configureTaskMaintenanceFromConfig(): void {
|
||||
const cfg = getRuntimeConfig();
|
||||
configureTaskRegistryMaintenance({
|
||||
cronStorePath: resolveCronStoreKey(cfg.cron?.store),
|
||||
cronStorePath: resolveCronStoreKey(),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -127,7 +127,6 @@ const TARGET_KEYS = [
|
||||
"gateway.controlUi.embedSandbox",
|
||||
"cron",
|
||||
"cron.enabled",
|
||||
"cron.store",
|
||||
"cron.maxConcurrentRuns",
|
||||
"cron.retry",
|
||||
"cron.retry.maxAttempts",
|
||||
|
||||
@@ -1515,8 +1515,6 @@ export const FIELD_HELP: Record<string, string> = {
|
||||
cron: "Global scheduler settings for stored cron jobs, run concurrency, delivery fallback, and run-session retention. Keep defaults unless you are scaling job volume or integrating external webhook receivers.",
|
||||
"cron.enabled":
|
||||
"Enables cron job execution for stored schedules managed by the gateway. Keep enabled for normal reminder/automation flows, and disable only to pause all cron execution without deleting jobs.",
|
||||
"cron.store":
|
||||
"Legacy cron store path used as the import namespace for old jobs.json files. Scheduled jobs now persist in the shared SQLite state database; set this only when importing or identifying a custom legacy store.",
|
||||
"cron.maxConcurrentRuns":
|
||||
"Limits how many cron jobs can execute at the same time when multiple schedules fire together, including isolated agent-turn LLM execution on the dedicated cron-nested lane. Use lower values to protect CPU/memory under heavy automation load, or raise carefully for higher throughput.",
|
||||
"cron.retry":
|
||||
|
||||
@@ -738,7 +738,6 @@ export const FIELD_LABELS: Record<string, string> = {
|
||||
"session.threadBindings.defaultSpawnContext": "Thread Spawn Context",
|
||||
cron: "Cron",
|
||||
"cron.enabled": "Cron Enabled",
|
||||
"cron.store": "Cron Legacy Store Key",
|
||||
"cron.maxConcurrentRuns": "Cron Max Concurrent Runs",
|
||||
"cron.retry": "Cron Retry Policy",
|
||||
"cron.retry.maxAttempts": "Cron Retry Max Attempts",
|
||||
|
||||
@@ -30,7 +30,6 @@ export type CronFailureDestinationConfig = {
|
||||
|
||||
export type CronConfig = {
|
||||
enabled?: boolean;
|
||||
store?: string;
|
||||
maxConcurrentRuns?: number;
|
||||
/** Override default retry policy for one-shot jobs on transient errors. */
|
||||
retry?: CronRetryConfig;
|
||||
|
||||
@@ -681,7 +681,6 @@ export const OpenClawSchema = z
|
||||
cron: z
|
||||
.object({
|
||||
enabled: z.boolean().optional(),
|
||||
store: z.string().optional(),
|
||||
maxConcurrentRuns: z.number().int().positive().optional(),
|
||||
retry: z
|
||||
.object({
|
||||
|
||||
@@ -110,7 +110,10 @@ describe("cron run log", () => {
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
const result = await importLegacyCronRunLogFilesToSqlite({ storePath });
|
||||
const result = await importLegacyCronRunLogFilesToSqlite({
|
||||
legacyStorePath: storePath,
|
||||
storeKey: storePath,
|
||||
});
|
||||
|
||||
expect(result).toMatchObject({ imported: 1, files: 1 });
|
||||
expect(readCronRunLogEntriesFromSqliteSync(storePath, { jobId: "job-1" })).toEqual([
|
||||
|
||||
@@ -236,7 +236,7 @@ describe("cron service ops seam coverage", () => {
|
||||
),
|
||||
"utf-8",
|
||||
);
|
||||
await importLegacyCronStoreToSqlite(storePath);
|
||||
await importLegacyCronStoreToSqlite({ legacyStorePath: storePath, storeKey: storePath });
|
||||
|
||||
const state = createCronServiceState({
|
||||
storePath,
|
||||
|
||||
@@ -11,7 +11,7 @@ import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js
|
||||
import {
|
||||
loadCronStore,
|
||||
loadCronStoreSync,
|
||||
resolveCronStoreKey,
|
||||
resolveLegacyCronStorePath,
|
||||
saveCronStore,
|
||||
updateCronStoreJobs,
|
||||
} from "./store.js";
|
||||
@@ -72,7 +72,7 @@ async function expectPathMissing(targetPath: string): Promise<void> {
|
||||
await expect(fs.stat(targetPath)).rejects.toMatchObject({ code: "ENOENT" });
|
||||
}
|
||||
|
||||
describe("resolveCronStoreKey", () => {
|
||||
describe("resolveLegacyCronStorePath", () => {
|
||||
afterEach(() => {
|
||||
vi.unstubAllEnvs();
|
||||
});
|
||||
@@ -81,7 +81,7 @@ describe("resolveCronStoreKey", () => {
|
||||
vi.stubEnv("OPENCLAW_HOME", "/srv/openclaw-home");
|
||||
vi.stubEnv("HOME", "/home/other");
|
||||
|
||||
const result = resolveCronStoreKey("~/cron/jobs.json");
|
||||
const result = resolveLegacyCronStorePath("~/cron/jobs.json");
|
||||
expect(result).toBe(path.resolve("/srv/openclaw-home", "cron", "jobs.json"));
|
||||
});
|
||||
});
|
||||
@@ -224,7 +224,12 @@ describe("cron store", () => {
|
||||
await fs.mkdir(path.dirname(store.storePath), { recursive: true });
|
||||
await fs.writeFile(store.storePath, JSON.stringify(legacy, null, 2), "utf-8");
|
||||
|
||||
await expect(importLegacyCronStoreToSqlite(store.storePath)).resolves.toMatchObject({
|
||||
await expect(
|
||||
importLegacyCronStoreToSqlite({
|
||||
legacyStorePath: store.storePath,
|
||||
storeKey: store.storePath,
|
||||
}),
|
||||
).resolves.toMatchObject({
|
||||
imported: true,
|
||||
importedJobs: 1,
|
||||
removedPath: store.storePath,
|
||||
@@ -266,7 +271,10 @@ describe("cron store", () => {
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
await importLegacyCronStateFileToSqlite(store.storePath);
|
||||
await importLegacyCronStateFileToSqlite({
|
||||
legacyStorePath: store.storePath,
|
||||
storeKey: store.storePath,
|
||||
});
|
||||
const loaded = await loadCronStore(store.storePath);
|
||||
|
||||
expect(loaded.jobs[0]?.updatedAtMs).toBe(job.createdAtMs);
|
||||
@@ -298,9 +306,12 @@ describe("cron store", () => {
|
||||
});
|
||||
|
||||
try {
|
||||
await expect(importLegacyCronStateFileToSqlite(store.storePath)).rejects.toThrow(
|
||||
/Failed to read cron state/,
|
||||
);
|
||||
await expect(
|
||||
importLegacyCronStateFileToSqlite({
|
||||
legacyStorePath: store.storePath,
|
||||
storeKey: store.storePath,
|
||||
}),
|
||||
).rejects.toThrow(/Failed to read cron state/);
|
||||
} finally {
|
||||
spy.mockRestore();
|
||||
}
|
||||
|
||||
@@ -23,11 +23,13 @@ type CronJobRow = {
|
||||
state_json: string;
|
||||
};
|
||||
|
||||
const DEFAULT_CRON_STORE_KEY = "default";
|
||||
|
||||
function resolveDefaultCronDir(): string {
|
||||
return path.join(resolveConfigDir(), "cron");
|
||||
}
|
||||
|
||||
function resolveDefaultCronStoreKey(): string {
|
||||
function resolveDefaultLegacyCronStorePath(): string {
|
||||
return path.join(resolveDefaultCronDir(), "jobs.json");
|
||||
}
|
||||
|
||||
@@ -42,8 +44,9 @@ export type CronStateFile = {
|
||||
jobs: Record<string, CronStateFileEntry>;
|
||||
};
|
||||
|
||||
function cronStoreKey(storePath: string): string {
|
||||
return path.resolve(storePath);
|
||||
function cronStoreKey(storeKey: string): string {
|
||||
const normalized = storeKey.trim();
|
||||
return normalized || DEFAULT_CRON_STORE_KEY;
|
||||
}
|
||||
|
||||
function getCronJobsKysely(db: import("node:sqlite").DatabaseSync) {
|
||||
@@ -73,7 +76,11 @@ function extractStateFile(store: CronStoreFile): CronStateFile {
|
||||
|
||||
export const extractCronStateFileForMigration = extractStateFile;
|
||||
|
||||
export function resolveCronStoreKey(configuredLegacyStorePath?: string) {
|
||||
export function resolveCronStoreKey(): string {
|
||||
return DEFAULT_CRON_STORE_KEY;
|
||||
}
|
||||
|
||||
export function resolveLegacyCronStorePath(configuredLegacyStorePath?: string): string {
|
||||
if (configuredLegacyStorePath?.trim()) {
|
||||
const raw = configuredLegacyStorePath.trim();
|
||||
if (raw.startsWith("~")) {
|
||||
@@ -81,15 +88,9 @@ export function resolveCronStoreKey(configuredLegacyStorePath?: string) {
|
||||
}
|
||||
return path.resolve(raw);
|
||||
}
|
||||
return resolveDefaultCronStoreKey();
|
||||
return resolveDefaultLegacyCronStorePath();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Use `resolveCronStoreKey`. The returned value is now a SQLite
|
||||
* partition key and legacy import namespace, not a runtime JSON store path.
|
||||
*/
|
||||
export const resolveCronStorePath = resolveCronStoreKey;
|
||||
|
||||
function ensureJobStateObject(job: CronStoreFile["jobs"][number]): void {
|
||||
if (!job.state || typeof job.state !== "object") {
|
||||
job.state = {} as never;
|
||||
|
||||
@@ -16,7 +16,7 @@ type LoadedGatewayCronState = {
|
||||
};
|
||||
|
||||
export function createLazyGatewayCronState(params: LazyGatewayCronParams): GatewayCronState {
|
||||
const storePath = resolveCronStoreKey(params.cfg.cron?.store);
|
||||
const storePath = resolveCronStoreKey();
|
||||
const cronEnabled = process.env.OPENCLAW_SKIP_CRON !== "1" && params.cfg.cron?.enabled !== false;
|
||||
let loaded: LoadedGatewayCronState | null = null;
|
||||
let loading: Promise<LoadedGatewayCronState> | null = null;
|
||||
|
||||
@@ -87,7 +87,7 @@ export function buildGatewayCronService(params: {
|
||||
broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void;
|
||||
}): GatewayCronState {
|
||||
const cronLogger = getChildLogger({ module: "cron" });
|
||||
const storePath = resolveCronStoreKey(params.cfg.cron?.store);
|
||||
const storePath = resolveCronStoreKey();
|
||||
const cronEnabled = process.env.OPENCLAW_SKIP_CRON !== "1" && params.cfg.cron?.enabled !== false;
|
||||
|
||||
const findAgentEntry = (cfg: OpenClawConfig, agentId: string) =>
|
||||
|
||||
@@ -115,7 +115,7 @@ export async function startGatewayEarlyRuntime(params: {
|
||||
setSkillsRemoteRegistry(params.nodeRegistry);
|
||||
void primeRemoteSkillsCache();
|
||||
taskRegistryMaintenance.configureTaskRegistryMaintenance({
|
||||
cronStorePath: resolveCronStoreKey(params.cfgAtStart.cron?.store),
|
||||
cronStorePath: resolveCronStoreKey(),
|
||||
cronRuntimeAuthoritative: true,
|
||||
});
|
||||
taskRegistryMaintenance.startTaskRegistryMaintenance();
|
||||
|
||||
@@ -5,7 +5,7 @@ import { setImmediate as setImmediatePromise } from "node:timers/promises";
|
||||
import { afterAll, beforeEach, describe, expect, test, vi } from "vitest";
|
||||
import type WebSocket from "ws";
|
||||
import { resetConfigRuntimeState } from "../config/config.js";
|
||||
import { saveCronStore } from "../cron/store.js";
|
||||
import { resolveCronStoreKey, saveCronStore } from "../cron/store.js";
|
||||
import type { CronStoreFile } from "../cron/types.js";
|
||||
import type { GuardedFetchOptions } from "../infra/net/fetch-guard.js";
|
||||
import type { GatewayCronState } from "./server-cron.js";
|
||||
@@ -113,8 +113,7 @@ async function createCronCasePaths(tempPrefix: string): Promise<{
|
||||
}> {
|
||||
const suiteRoot = await getCronSuiteTempRoot();
|
||||
const dir = path.join(suiteRoot, `${tempPrefix}${cronSuiteCaseId++}`);
|
||||
const storePath = path.join(dir, "cron", "jobs.json");
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
const storePath = resolveCronStoreKey();
|
||||
return { dir, storePath };
|
||||
}
|
||||
|
||||
@@ -956,7 +955,7 @@ describe("gateway server cron", () => {
|
||||
| undefined;
|
||||
expect(statusPayload?.enabled).toBe(true);
|
||||
const storePath = typeof statusPayload?.storePath === "string" ? statusPayload.storePath : "";
|
||||
expect(storePath).toContain("jobs.json");
|
||||
expect(storePath).toBe("default");
|
||||
|
||||
const autoRes = await directCronReq(cronState, "cron.add", {
|
||||
name: "auto run test",
|
||||
|
||||
@@ -119,9 +119,6 @@ export function createGatewayConfigModuleMock(actual: GatewayConfigModule): Gate
|
||||
if (typeof testState.cronEnabled === "boolean") {
|
||||
fileCron.enabled = testState.cronEnabled;
|
||||
}
|
||||
if (typeof testState.cronStorePath === "string") {
|
||||
fileCron.store = testState.cronStorePath;
|
||||
}
|
||||
const cron = Object.keys(fileCron).length > 0 ? fileCron : undefined;
|
||||
|
||||
return {
|
||||
|
||||
@@ -72,7 +72,6 @@ export { resolveAgentMaxConcurrent } from "../config/agent-limits.js";
|
||||
export {
|
||||
loadCronStore,
|
||||
resolveCronStoreKey,
|
||||
resolveCronStorePath,
|
||||
saveCronStore,
|
||||
updateCronStoreJobs,
|
||||
} from "../cron/store.js";
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
export {
|
||||
loadCronStore,
|
||||
resolveCronStoreKey,
|
||||
resolveCronStorePath,
|
||||
saveCronStore,
|
||||
updateCronStoreJobs,
|
||||
} from "../cron/store.js";
|
||||
|
||||
Reference in New Issue
Block a user