refactor: clarify cron sqlite store keys

This commit is contained in:
Peter Steinberger
2026-05-08 14:24:32 +01:00
parent e9964d0cda
commit 2ec7a92574
21 changed files with 62 additions and 48 deletions

View File

@@ -398,7 +398,7 @@ Model override note:
{
cron: {
enabled: true,
store: "~/.openclaw/cron/jobs.json", // optional legacy import namespace
store: "~/.openclaw/cron/jobs.json", // optional legacy import key
maxConcurrentRuns: 1,
retry: {
maxAttempts: 3,
@@ -413,7 +413,7 @@ Model override note:
`maxConcurrentRuns` limits both scheduled cron dispatch and isolated agent-turn execution. Isolated cron agent turns use the queue's dedicated `cron-nested` execution lane internally, so raising this value lets independent cron LLM runs progress in parallel instead of only starting their outer cron wrappers. The shared non-cron `nested` lane is not widened by this setting.
Cron data is keyed by the resolved `cron.store` value inside the shared SQLite state database. It stores job definitions, pending slots, active markers, last-run metadata, and the schedule identity used to invalidate stale pending slots after a job update.
Cron data is keyed by the resolved `cron.store` value inside the shared SQLite state database. That value is a legacy import key, not a runtime JSON write path. SQLite stores job definitions, pending slots, active markers, last-run metadata, and the schedule identity used to invalidate stale pending slots after a job update.
Run `openclaw doctor --fix` once after upgrading from an older version so doctor can import and remove legacy `jobs.json` and `jobs-state.json` files.

View File

@@ -202,7 +202,7 @@ For the plugin authoring guide, see [Plugin SDK overview](/plugins/sdk-overview)
| `plugin-sdk/reply-reference` | `createReplyReferencePlanner` |
| `plugin-sdk/reply-chunking` | Narrow text/markdown chunking helpers |
| `plugin-sdk/session-store-runtime` | SQLite-backed session row, session-key, updated-at, and transcript path helpers |
| `plugin-sdk/cron-store-runtime` | Cron store path/load/save helpers |
| `plugin-sdk/cron-store-runtime` | SQLite cron store key/load/save helpers |
| `plugin-sdk/state-paths` | State/OAuth dir path helpers |
| `plugin-sdk/routing` | Route/session-key/account binding helpers such as `resolveAgentRoute`, `buildAgentSessionKey`, and `resolveDefaultAgentBoundAccountId` |
| `plugin-sdk/status-helpers` | Shared channel/account status summary helpers, runtime-state defaults, and issue metadata helpers |

View File

@@ -248,6 +248,10 @@ The remaining cleanup is mostly consolidation and deletion:
deleting/reinserting the whole job table on each save. Plugin target
writebacks update matching cron rows directly and keep `cron.jobs.state` in
the same state-database transaction.
- Cron runtime callers now resolve a SQLite cron store key. The old
`resolveCronStorePath` name remains only as a compatibility alias for legacy
import/test/plugin callers; production gateway, task maintenance, status, and
Telegram target writeback paths use `resolveCronStoreKey`.
- ACP spawn no longer resolves or persists transcript JSONL file paths. Spawn
and thread-bind setup persist the SQLite session row directly and keep the
session id as the retained transcript identity.

View File

@@ -10,7 +10,7 @@ const replaceConfigFile: AsyncUnknownMock = vi.fn(async (params: unknown) => {
const record = params as { nextConfig?: unknown; writeOptions?: unknown };
await writeConfigFile(record.nextConfig, record.writeOptions);
});
const resolveCronStorePath: UnknownMock = vi.fn();
const resolveCronStoreKey: UnknownMock = vi.fn();
const updateCronStoreJobs: AsyncUnknownMock = vi.fn();
vi.mock("openclaw/plugin-sdk/config-mutation", async () => {
@@ -31,7 +31,7 @@ vi.mock("openclaw/plugin-sdk/cron-store-runtime", async () => {
);
return {
...actual,
resolveCronStorePath,
resolveCronStoreKey,
updateCronStoreJobs,
};
});
@@ -75,9 +75,9 @@ export function installMaybePersistResolvedTelegramTargetTests(params?: {
readConfigFileSnapshotForWrite.mockReset();
replaceConfigFile.mockClear();
writeConfigFile.mockReset();
resolveCronStorePath.mockReset();
resolveCronStoreKey.mockReset();
updateCronStoreJobs.mockReset();
resolveCronStorePath.mockReturnValue("/tmp/cron/jobs.json");
resolveCronStoreKey.mockReturnValue("/tmp/cron/jobs.json");
updateCronStoreJobs.mockResolvedValue({ updatedJobs: 0 });
});

View File

@@ -3,7 +3,7 @@ import {
replaceConfigFile,
} from "openclaw/plugin-sdk/config-mutation";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import { resolveCronStorePath, updateCronStoreJobs } from "openclaw/plugin-sdk/cron-store-runtime";
import { resolveCronStoreKey, updateCronStoreJobs } from "openclaw/plugin-sdk/cron-store-runtime";
import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env";
import {
normalizeLowercaseStringOrEmpty,
@@ -192,8 +192,8 @@ export async function maybePersistResolvedTelegramTarget(params: {
}
try {
const storePath = resolveCronStorePath(params.cfg.cron?.store);
const result = await updateCronStoreJobs(storePath, (job) => {
const storeKey = resolveCronStoreKey(params.cfg.cron?.store);
const result = await updateCronStoreJobs(storeKey, (job) => {
if (job.delivery?.channel !== "telegram") {
return undefined;
}

View File

@@ -1,4 +1,4 @@
import { loadCronStore, resolveCronStorePath } from "../../cron/store.js";
import { loadCronStore, resolveCronStoreKey } from "../../cron/store.js";
import { normalizeLowercaseStringOrEmpty } from "../../shared/string-coerce.js";
import type { ReplyPayload } from "../types.js";
@@ -31,7 +31,7 @@ export async function hasSessionRelatedCronJobs(params: {
sessionKey?: string;
}): Promise<boolean> {
try {
const cronStorePath = resolveCronStorePath(params.cronStorePath);
const cronStorePath = resolveCronStoreKey(params.cronStorePath);
const store = await loadCronStore(cronStorePath);
if (store.jobs.length === 0) {
return false;

View File

@@ -188,6 +188,7 @@ const loadCronStoreMock = vi.fn();
vi.mock("../../cron/store.js", () => {
return {
loadCronStore: (...args: unknown[]) => loadCronStoreMock(...args),
resolveCronStoreKey: (statePath?: string) => statePath ?? "/tmp/openclaw-cron-store.json",
resolveCronStorePath: (statePath?: string) => statePath ?? "/tmp/openclaw-cron-store.json",
};
});

View File

@@ -4,7 +4,7 @@ import { resolveMainSessionKey } from "../config/sessions/main-session.js";
import { listSessionEntries } from "../config/sessions/store.js";
import { resolveSessionTotalTokens, type SessionEntry } from "../config/sessions/types.js";
import type { OpenClawConfig } from "../config/types.js";
import { resolveCronStorePath } from "../cron/store.js";
import { resolveCronStoreKey } from "../cron/store.js";
import { listGatewayAgentsBasic } from "../gateway/agent-list.js";
import { resolveHeartbeatSummaryForAgent } from "../infra/heartbeat-summary.js";
import { peekSystemEvents } from "../infra/system-events.js";
@@ -149,7 +149,7 @@ export async function getStatusSummary(
const queuedSystemEvents = peekSystemEvents(mainSessionKey);
const taskMaintenanceModule = await loadTaskRegistryMaintenanceModule();
taskMaintenanceModule.configureTaskRegistryMaintenance({
cronStorePath: resolveCronStorePath(cfg.cron?.store),
cronStorePath: resolveCronStoreKey(cfg.cron?.store),
});
const tasks = taskMaintenanceModule.getInspectableTaskRegistrySummary();
const taskAudit = taskMaintenanceModule.getInspectableTaskAuditSummary();

View File

@@ -1,6 +1,6 @@
import { formatCliCommand } from "../cli/command-format.js";
import { getRuntimeConfig } from "../config/config.js";
import { resolveCronStorePath } from "../cron/store.js";
import { resolveCronStoreKey } from "../cron/store.js";
import type { RuntimeEnv } from "../runtime.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { getTaskById, updateTaskNotifyPolicyById } from "../tasks/runtime-internal.js";
@@ -58,7 +58,7 @@ async function loadTaskCancelConfig() {
function configureTaskMaintenanceFromConfig(): void {
const cfg = getRuntimeConfig();
configureTaskRegistryMaintenance({
cronStorePath: resolveCronStorePath(cfg.cron?.store),
cronStorePath: resolveCronStoreKey(cfg.cron?.store),
});
}

View File

@@ -739,7 +739,7 @@ export const FIELD_LABELS: Record<string, string> = {
"session.threadBindings.defaultSpawnContext": "Thread Spawn Context",
cron: "Cron",
"cron.enabled": "Cron Enabled",
"cron.store": "Cron Store Path",
"cron.store": "Cron Legacy Store Key",
"cron.maxConcurrentRuns": "Cron Max Concurrent Runs",
"cron.retry": "Cron Retry Policy",
"cron.retry.maxAttempts": "Cron Retry Max Attempts",

View File

@@ -9,7 +9,7 @@ import {
loadCronStore,
loadCronStoreSync,
loadLegacyCronStoreForMigration,
resolveCronStorePath,
resolveCronStoreKey,
saveCronStore,
updateCronStoreJobs,
} from "./store.js";
@@ -66,7 +66,7 @@ function makeStore(jobId: string, enabled: boolean): CronStoreFile {
};
}
describe("resolveCronStorePath", () => {
describe("resolveCronStoreKey", () => {
afterEach(() => {
vi.unstubAllEnvs();
});
@@ -75,7 +75,7 @@ describe("resolveCronStorePath", () => {
vi.stubEnv("OPENCLAW_HOME", "/srv/openclaw-home");
vi.stubEnv("HOME", "/home/other");
const result = resolveCronStorePath("~/cron/jobs.json");
const result = resolveCronStoreKey("~/cron/jobs.json");
expect(result).toBe(path.resolve("/srv/openclaw-home", "cron", "jobs.json"));
});
});

View File

@@ -38,7 +38,7 @@ function resolveDefaultCronDir(): string {
return path.join(resolveConfigDir(), "cron");
}
function resolveDefaultCronStorePath(): string {
function resolveDefaultCronStoreKey(): string {
return path.join(resolveDefaultCronDir(), "jobs.json");
}
@@ -172,17 +172,23 @@ function extractStateFile(store: CronStoreFile): CronStateFile {
return { version: 1, jobs };
}
export function resolveCronStorePath(storePath?: string) {
if (storePath?.trim()) {
const raw = storePath.trim();
export function resolveCronStoreKey(configuredLegacyStorePath?: string) {
if (configuredLegacyStorePath?.trim()) {
const raw = configuredLegacyStorePath.trim();
if (raw.startsWith("~")) {
return path.resolve(expandHomePrefix(raw));
}
return path.resolve(raw);
}
return resolveDefaultCronStorePath();
return resolveDefaultCronStoreKey();
}
/**
* @deprecated Use `resolveCronStoreKey`. The returned value is now a SQLite
* partition key and legacy import namespace, not a runtime JSON store path.
*/
export const resolveCronStorePath = resolveCronStoreKey;
export function legacyCronStoreFileExists(storePath: string): boolean {
try {
return fs.existsSync(storePath);

View File

@@ -1,7 +1,7 @@
import type { CliDeps } from "../cli/deps.types.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import type { CronServiceContract } from "../cron/service-contract.js";
import { resolveCronStorePath } from "../cron/store.js";
import { resolveCronStoreKey } from "../cron/store.js";
import type { GatewayCronState } from "./server-cron.js";
type LazyGatewayCronParams = {
@@ -16,7 +16,7 @@ type LoadedGatewayCronState = {
};
export function createLazyGatewayCronState(params: LazyGatewayCronParams): GatewayCronState {
const storePath = resolveCronStorePath(params.cfg.cron?.store);
const storePath = resolveCronStoreKey(params.cfg.cron?.store);
const cronEnabled = process.env.OPENCLAW_SKIP_CRON !== "1" && params.cfg.cron?.enabled !== false;
let loaded: LoadedGatewayCronState | null = null;
let loading: Promise<LoadedGatewayCronState> | null = null;

View File

@@ -14,7 +14,7 @@ import { appendCronRunLogToSqlite, resolveCronRunLogPruneOptions } from "../cron
import type { CronServiceContract } from "../cron/service-contract.js";
import { CronService } from "../cron/service.js";
import { resolveCronSessionTargetSessionKey } from "../cron/session-target.js";
import { resolveCronStorePath } from "../cron/store.js";
import { resolveCronStoreKey } from "../cron/store.js";
import type { CronJob } from "../cron/types.js";
import { formatErrorMessage } from "../infra/errors.js";
import { runHeartbeatOnce } from "../infra/heartbeat-runner.js";
@@ -37,6 +37,7 @@ import {
export type GatewayCronState = {
cron: CronServiceContract;
/** SQLite cron partition key. Kept as `storePath` for older RPC/status shapes. */
storePath: string;
cronEnabled: boolean;
};
@@ -86,7 +87,7 @@ export function buildGatewayCronService(params: {
broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void;
}): GatewayCronState {
const cronLogger = getChildLogger({ module: "cron" });
const storePath = resolveCronStorePath(params.cfg.cron?.store);
const storePath = resolveCronStoreKey(params.cfg.cron?.store);
const cronEnabled = process.env.OPENCLAW_SKIP_CRON !== "1" && params.cfg.cron?.enabled !== false;
const findAgentEntry = (cfg: OpenClawConfig, agentId: string) =>
@@ -338,7 +339,7 @@ export function buildGatewayCronService(params: {
mode,
accountId,
}),
log: getChildLogger({ module: "cron", storePath }),
log: getChildLogger({ module: "cron", storeKey: storePath }),
onEvent: (evt) => {
params.broadcast("cron", evt, { dropIfSlow: true });
// Build hook event from CronEvent. The job snapshot is carried on the
@@ -416,7 +417,7 @@ export function buildGatewayCronService(params: {
},
runLogPrune,
).catch((err) => {
cronLogger.warn({ err: String(err), storePath }, "cron: run log append failed");
cronLogger.warn({ err: String(err), storeKey: storePath }, "cron: run log append failed");
});
}
},

View File

@@ -1,6 +1,6 @@
import type { GatewayTailscaleMode } from "../config/types.gateway.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { resolveCronStorePath } from "../cron/store.js";
import { resolveCronStoreKey } from "../cron/store.js";
import type { PluginRegistry } from "../plugins/registry-types.js";
type Awaitable<T> = T | Promise<T>;
@@ -115,7 +115,7 @@ export async function startGatewayEarlyRuntime(params: {
setSkillsRemoteRegistry(params.nodeRegistry);
void primeRemoteSkillsCache();
taskRegistryMaintenance.configureTaskRegistryMaintenance({
cronStorePath: resolveCronStorePath(params.cfgAtStart.cron?.store),
cronStorePath: resolveCronStoreKey(params.cfgAtStart.cron?.store),
cronRuntimeAuthoritative: true,
});
taskRegistryMaintenance.startTaskRegistryMaintenance();

View File

@@ -1,5 +1,5 @@
import { loadSqliteSessionTranscriptEvents } from "../config/sessions/transcript-store.sqlite.js";
import { loadCronStoreSync, resolveCronStorePath } from "../cron/store.js";
import { loadCronStoreSync, resolveCronStoreKey } from "../cron/store.js";
const MAX_QUOTED_FIELD_CHARS = 140;
@@ -99,7 +99,7 @@ function readCronJobName(cronJobId: string | undefined): string | undefined {
return undefined;
}
try {
const store = loadCronStoreSync(resolveCronStorePath());
const store = loadCronStoreSync(resolveCronStoreKey());
const job = store.jobs.find((entry) => entry.id === cronJobId);
return typeof job?.name === "string" && job.name.trim() ? job.name.trim() : undefined;
} catch {

View File

@@ -71,6 +71,7 @@ export { resolveActiveTalkProviderConfig } from "../config/talk.js";
export { resolveAgentMaxConcurrent } from "../config/agent-limits.js";
export {
loadCronStore,
resolveCronStoreKey,
resolveCronStorePath,
saveCronStore,
updateCronStoreJobs,

View File

@@ -1,5 +1,6 @@
export {
loadCronStore,
resolveCronStoreKey,
resolveCronStorePath,
saveCronStore,
updateCronStoreJobs,

View File

@@ -169,7 +169,7 @@ function createTaskRegistryMaintenanceHarness(params: {
return next;
},
isCronRuntimeAuthoritative: () => params.cronRuntimeAuthoritative ?? true,
resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json",
resolveCronStoreKey: () => "/tmp/openclaw-test-cron/jobs.json",
loadCronStoreSync: () => params.cronStore ?? { version: 1, jobs: [] },
readCronRunLogEntriesSync: (_storePath, opts) => cronRunLogEntries[opts?.jobId ?? ""] ?? [],
};

View File

@@ -14,7 +14,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js";
import { isCronJobActive } from "../cron/active-jobs.js";
import { readCronRunLogEntriesFromSqliteSync } from "../cron/run-log.js";
import type { CronRunLogEntry } from "../cron/run-log.js";
import { loadCronStoreSync, resolveCronStorePath } from "../cron/store.js";
import { loadCronStoreSync, resolveCronStoreKey } from "../cron/store.js";
import type { CronJob, CronStoreFile } from "../cron/types.js";
import { getAgentRunContext } from "../infra/agent-events.js";
import { getSessionBindingService } from "../infra/outbound/session-binding-service.js";
@@ -68,7 +68,7 @@ const SWEEP_YIELD_BATCH_SIZE = 25;
let sweeper: NodeJS.Timeout | null = null;
let deferredSweep: NodeJS.Timeout | null = null;
let sweepInProgress = false;
let configuredCronStorePath: string | undefined;
let configuredCronStoreKey: string | undefined;
let configuredCronRuntimeAuthoritative = false;
type TaskRegistryMaintenanceRuntime = {
@@ -97,7 +97,7 @@ type TaskRegistryMaintenanceRuntime = {
resolveTaskForLookupToken: typeof resolveTaskForLookupToken;
setTaskCleanupAfterById: typeof setTaskCleanupAfterById;
isCronRuntimeAuthoritative: () => boolean;
resolveCronStorePath: typeof resolveCronStorePath;
resolveCronStoreKey: typeof resolveCronStoreKey;
loadCronStoreSync: typeof loadCronStoreSync;
readCronRunLogEntriesSync: typeof readCronRunLogEntriesFromSqliteSync;
};
@@ -135,7 +135,7 @@ const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = {
resolveTaskForLookupToken,
setTaskCleanupAfterById,
isCronRuntimeAuthoritative: () => configuredCronRuntimeAuthoritative,
resolveCronStorePath: () => configuredCronStorePath ?? resolveCronStorePath(),
resolveCronStoreKey: () => configuredCronStoreKey ?? resolveCronStoreKey(),
loadCronStoreSync,
readCronRunLogEntriesSync: readCronRunLogEntriesFromSqliteSync,
};
@@ -164,7 +164,7 @@ type CronTerminalRecovery = {
};
type CronRecoveryContext = {
storePath: string;
storeKey: string;
store?: CronStoreFile | null;
runLogsByJobId: Map<string, CronRunLogEntry[]>;
};
@@ -176,7 +176,7 @@ type BackingSessionLookupContext = {
function createCronRecoveryContext(): CronRecoveryContext {
return {
storePath: taskRegistryMaintenanceRuntime.resolveCronStorePath(),
storeKey: taskRegistryMaintenanceRuntime.resolveCronStoreKey(),
runLogsByJobId: new Map<string, CronRunLogEntry[]>(),
};
}
@@ -280,7 +280,7 @@ function getCronRunLogEntries(context: CronRecoveryContext, jobId: string): Cron
}
let entries: CronRunLogEntry[] = [];
try {
entries = taskRegistryMaintenanceRuntime.readCronRunLogEntriesSync(context.storePath, {
entries = taskRegistryMaintenanceRuntime.readCronRunLogEntriesSync(context.storeKey, {
jobId,
limit: 5000,
});
@@ -296,7 +296,7 @@ function getCronStore(context: CronRecoveryContext): CronStoreFile | null {
return context.store;
}
try {
context.store = taskRegistryMaintenanceRuntime.loadCronStoreSync(context.storePath);
context.store = taskRegistryMaintenanceRuntime.loadCronStoreSync(context.storeKey);
} catch {
context.store = null;
}
@@ -1058,7 +1058,7 @@ export function setTaskRegistryMaintenanceRuntimeForTests(
export function resetTaskRegistryMaintenanceRuntimeForTests(): void {
taskRegistryMaintenanceRuntime = defaultTaskRegistryMaintenanceRuntime;
configuredCronStorePath = undefined;
configuredCronStoreKey = undefined;
configuredCronRuntimeAuthoritative = false;
}
@@ -1066,7 +1066,7 @@ export function configureTaskRegistryMaintenance(options: {
cronStorePath?: string;
cronRuntimeAuthoritative?: boolean;
}): void {
configuredCronStorePath = options.cronStorePath?.trim() || undefined;
configuredCronStoreKey = options.cronStorePath?.trim() || undefined;
if (options.cronRuntimeAuthoritative !== undefined) {
configuredCronRuntimeAuthoritative = options.cronRuntimeAuthoritative;
}

View File

@@ -184,7 +184,7 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: {
return next;
},
isCronRuntimeAuthoritative: () => true,
resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json",
resolveCronStoreKey: () => "/tmp/openclaw-test-cron/jobs.json",
loadCronStoreSync: () => ({ version: 1, jobs: [] }),
readCronRunLogEntriesSync: () => [],
});
@@ -2104,7 +2104,7 @@ describe("task-registry", () => {
resolveTaskForLookupToken: () => undefined,
setTaskCleanupAfterById: () => null,
isCronRuntimeAuthoritative: () => true,
resolveCronStorePath: () => "/tmp/openclaw-test-cron/jobs.json",
resolveCronStoreKey: () => "/tmp/openclaw-test-cron/jobs.json",
loadCronStoreSync: () => ({ version: 1, jobs: [] }),
readCronRunLogEntriesSync: () => [],
});