From c2f76a2ef66526c3ea2dc10d2e90aefdc95acc81 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 8 May 2026 18:15:04 +0100 Subject: [PATCH] refactor: remove session maintenance write options --- docs/refactor/piless.md | 22 ++-- src/acp/runtime/session-meta.ts | 5 +- .../main-session-restart-recovery.test.ts | 8 +- src/agents/main-session-restart-recovery.ts | 113 ++++++++---------- ...agent-registry.persistence.test-support.ts | 34 ++---- src/auto-reply/reply/session.ts | 21 ++-- .../doctor/shared/codex-route-warnings.ts | 6 +- src/config/sessions.test.ts | 50 ++++---- src/config/sessions/cleanup-service.ts | 107 +++++++---------- src/config/sessions/runtime-types.ts | 2 - src/config/sessions/session-file.ts | 17 +-- src/config/sessions/sessions.test.ts | 10 +- .../sessions/store-backend.sqlite.test.ts | 38 +++--- src/config/sessions/store.pruning.test.ts | 4 +- .../sessions/store.skills-stripping.test.ts | 18 ++- src/config/sessions/store.ts | 105 ++++++---------- src/config/sessions/types.ts | 4 +- src/gateway/boot.test.ts | 15 ++- src/gateway/boot.ts | 18 ++- 19 files changed, 251 insertions(+), 346 deletions(-) diff --git a/docs/refactor/piless.md b/docs/refactor/piless.md index c4d99e357b1..c45dea2958a 100644 --- a/docs/refactor/piless.md +++ b/docs/refactor/piless.md @@ -49,9 +49,11 @@ This plan has started landing in slices: - Canonical per-agent session stores use SQLite by default. The `openclaw doctor` fix mode imports legacy `sessions.json` indexes into SQLite and removes the JSON index after import, instead of keeping a startup migration or parallel - compatibility/export store. Runtime session reads and writes no longer run - JSON import, pruning, capping, archive cleanup, or disk-budget cleanup; those - mutations now live behind explicit doctor/session-cleanup steps. + compatibility/export store. Runtime session reads and writes normalize and + persist only: no JSON import, pruning, capping, archive cleanup, or + disk-budget cleanup runs on the hot path. The old maintenance write options + have been removed from the session-store API; doctor owns legacy import and + `openclaw sessions cleanup` owns explicit cleanup. - Transcript events have a SQLite store primitive with JSONL import/export. Transcript append paths dual-write when the caller already has agent and session scope, including gateway-injected assistant messages. Scoped appends @@ -341,10 +343,12 @@ Migration order: 1. Keep current task registry and plugin state as is. 2. Add shared SQLite connection and migration helpers. -3. Move `sessions.json` behind a `SessionStoreBackend` interface. -4. Make SQLite primary for session entries. +3. Move `sessions.json` behind a `SessionStoreBackend` interface. Done for + canonical per-agent stores. +4. Make SQLite primary for session entries. Done for canonical per-agent + stores. 5. Import old `sessions.json` only from `openclaw doctor --fix`, then remove the - JSON index after SQLite has the rows. + JSON index after SQLite has the rows. Done for session indexes. 6. Leave `*.jsonl` transcripts on disk while PI owns transcript semantics. 7. After session manager ownership moves behind OpenClaw APIs, store transcript events in SQLite and export JSONL for compatibility. @@ -543,10 +547,10 @@ Phase 1: SQLite session index - Add shared state DB helper. - Add a doctor migration that imports `sessions.json` into SQLite and removes the JSON index. -- Move session entries to SQLite behind a flag. +- Move canonical session entries to SQLite by default. - Prove current session list, patch, reset, cleanup, and UI flows. -- Remove load-time/startup session JSON migration and write-time pruning from - the runtime store path. +- Remove load-time/startup session JSON migration, write-time pruning, and + migration-era maintenance options from the runtime store path. Phase 2: VFS scratch diff --git a/src/acp/runtime/session-meta.ts b/src/acp/runtime/session-meta.ts index 491e2a80d57..6f417b10fe6 100644 --- a/src/acp/runtime/session-meta.ts +++ b/src/acp/runtime/session-meta.ts @@ -171,9 +171,6 @@ export async function upsertAcpSessionMeta(params: { store[storeSessionKey] = nextEntry; return nextEntry; }, - { - activeSessionKey: normalizeLowercaseStringOrEmpty(sessionKey), - allowDropAcpMetaSessionKeys: [sessionKey], - }, + { allowDropAcpMetaSessionKeys: [sessionKey] }, ); } diff --git a/src/agents/main-session-restart-recovery.test.ts b/src/agents/main-session-restart-recovery.test.ts index f99c3daef87..558dea7e18d 100644 --- a/src/agents/main-session-restart-recovery.test.ts +++ b/src/agents/main-session-restart-recovery.test.ts @@ -2,8 +2,9 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { loadSessionStore, type SessionEntry } from "../config/sessions.js"; +import { loadSessionStore, saveSessionStore, type SessionEntry } from "../config/sessions.js"; import { callGateway } from "../gateway/call.js"; +import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js"; import { markRestartAbortedMainSessionsFromLocks, recoverRestartAbortedMainSessions, @@ -19,9 +20,12 @@ let tmpDir: string; beforeEach(async () => { vi.clearAllMocks(); tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-main-restart-recovery-")); + vi.stubEnv("OPENCLAW_STATE_DIR", tmpDir); }); afterEach(async () => { + closeOpenClawStateDatabaseForTest(); + vi.unstubAllEnvs(); await fs.rm(tmpDir, { recursive: true, force: true }); }); @@ -32,7 +36,7 @@ async function makeSessionsDir(agentId = "main"): Promise { } async function writeStore(sessionsDir: string, store: Record): Promise { - await fs.writeFile(path.join(sessionsDir, "sessions.json"), JSON.stringify(store, null, 2)); + await saveSessionStore(path.join(sessionsDir, "sessions.json"), store); } async function writeTranscript( diff --git a/src/agents/main-session-restart-recovery.ts b/src/agents/main-session-restart-recovery.ts index 7adc9857c53..99cd67326d6 100644 --- a/src/agents/main-session-restart-recovery.ts +++ b/src/agents/main-session-restart-recovery.ts @@ -133,28 +133,24 @@ async function markSessionFailed(params: { sessionKey: string; reason: string; }): Promise { - await updateSessionStore( - params.storePath, - (store) => { - const entry = store[params.sessionKey]; - if (!entry || entry.status !== "running") { - return; - } - entry.status = "failed"; - entry.abortedLastRun = true; - entry.endedAt = Date.now(); - entry.updatedAt = entry.endedAt; - entry.pendingFinalDelivery = undefined; - entry.pendingFinalDeliveryText = undefined; - entry.pendingFinalDeliveryCreatedAt = undefined; - entry.pendingFinalDeliveryLastAttemptAt = undefined; - entry.pendingFinalDeliveryAttemptCount = undefined; - entry.pendingFinalDeliveryLastError = undefined; - entry.pendingFinalDeliveryContext = undefined; - store[params.sessionKey] = entry; - }, - { skipMaintenance: true }, - ); + await updateSessionStore(params.storePath, (store) => { + const entry = store[params.sessionKey]; + if (!entry || entry.status !== "running") { + return; + } + entry.status = "failed"; + entry.abortedLastRun = true; + entry.endedAt = Date.now(); + entry.updatedAt = entry.endedAt; + entry.pendingFinalDelivery = undefined; + entry.pendingFinalDeliveryText = undefined; + entry.pendingFinalDeliveryCreatedAt = undefined; + entry.pendingFinalDeliveryLastAttemptAt = undefined; + entry.pendingFinalDeliveryAttemptCount = undefined; + entry.pendingFinalDeliveryLastError = undefined; + entry.pendingFinalDeliveryContext = undefined; + store[params.sessionKey] = entry; + }); log.warn(`marked interrupted main session failed: ${params.sessionKey} (${params.reason})`); } @@ -175,26 +171,21 @@ async function resumeMainSession(params: { }, timeoutMs: 10_000, }); - await updateSessionStore( - params.storePath, - (store) => { - const entry = store[params.sessionKey]; - if (!entry) { - return; - } - const now = Date.now(); - entry.abortedLastRun = false; - entry.updatedAt = now; - if (entry.pendingFinalDelivery || entry.pendingFinalDeliveryText) { - entry.pendingFinalDeliveryLastAttemptAt = now; - entry.pendingFinalDeliveryAttemptCount = - (entry.pendingFinalDeliveryAttemptCount ?? 0) + 1; - entry.pendingFinalDeliveryLastError = null; - } - store[params.sessionKey] = entry; - }, - { skipMaintenance: true }, - ); + await updateSessionStore(params.storePath, (store) => { + const entry = store[params.sessionKey]; + if (!entry) { + return; + } + const now = Date.now(); + entry.abortedLastRun = false; + entry.updatedAt = now; + if (entry.pendingFinalDelivery || entry.pendingFinalDeliveryText) { + entry.pendingFinalDeliveryLastAttemptAt = now; + entry.pendingFinalDeliveryAttemptCount = (entry.pendingFinalDeliveryAttemptCount ?? 0) + 1; + entry.pendingFinalDeliveryLastError = null; + } + store[params.sessionKey] = entry; + }); log.info( `resumed interrupted main session: ${params.sessionKey}${ params.pendingFinalDeliveryText ? " (with pending payload)" : "" @@ -223,28 +214,24 @@ export async function markRestartAbortedMainSessionsFromLocks(params: { } const storePath = path.join(sessionsDir, "sessions.json"); - await updateSessionStore( - storePath, - (store) => { - for (const [sessionKey, entry] of Object.entries(store)) { - if (!entry || entry.status !== "running") { - continue; - } - if (shouldSkipMainRecovery(entry, sessionKey)) { - result.skipped++; - continue; - } - const entryLockPaths = resolveEntryTranscriptLockPaths({ entry, sessionsDir }); - if (!entryLockPaths.some((lockPath) => interruptedLockPaths.has(lockPath))) { - continue; - } - entry.abortedLastRun = true; - store[sessionKey] = entry; - result.marked++; + await updateSessionStore(storePath, (store) => { + for (const [sessionKey, entry] of Object.entries(store)) { + if (!entry || entry.status !== "running") { + continue; } - }, - { skipMaintenance: true }, - ); + if (shouldSkipMainRecovery(entry, sessionKey)) { + result.skipped++; + continue; + } + const entryLockPaths = resolveEntryTranscriptLockPaths({ entry, sessionsDir }); + if (!entryLockPaths.some((lockPath) => interruptedLockPaths.has(lockPath))) { + continue; + } + entry.abortedLastRun = true; + store[sessionKey] = entry; + result.marked++; + } + }); if (result.marked > 0) { log.warn(`marked ${result.marked} interrupted main session(s) from stale transcript locks`); diff --git a/src/agents/subagent-registry.persistence.test-support.ts b/src/agents/subagent-registry.persistence.test-support.ts index 11cd0e46cfa..77dcff78c44 100644 --- a/src/agents/subagent-registry.persistence.test-support.ts +++ b/src/agents/subagent-registry.persistence.test-support.ts @@ -27,20 +27,16 @@ export async function writeSubagentSessionEntry(params: { defaultSessionId: string; }): Promise { const storePath = resolveSubagentSessionStorePath(params.stateDir, params.agentId); - await updateSessionStore( - storePath, - (store) => { - store[params.sessionKey] = { - ...store[params.sessionKey], - sessionId: params.sessionId ?? params.defaultSessionId, - updatedAt: params.updatedAt ?? Date.now(), - ...(typeof params.abortedLastRun === "boolean" - ? { abortedLastRun: params.abortedLastRun } - : {}), - }; - }, - { skipMaintenance: true }, - ); + await updateSessionStore(storePath, (store) => { + store[params.sessionKey] = { + ...store[params.sessionKey], + sessionId: params.sessionId ?? params.defaultSessionId, + updatedAt: params.updatedAt ?? Date.now(), + ...(typeof params.abortedLastRun === "boolean" + ? { abortedLastRun: params.abortedLastRun } + : {}), + }; + }); return storePath; } @@ -50,13 +46,9 @@ export async function removeSubagentSessionEntry(params: { agentId: string; }): Promise { const storePath = resolveSubagentSessionStorePath(params.stateDir, params.agentId); - await updateSessionStore( - storePath, - (store) => { - delete store[params.sessionKey]; - }, - { skipMaintenance: true }, - ); + await updateSessionStore(storePath, (store) => { + delete store[params.sessionKey]; + }); return storePath; } diff --git a/src/auto-reply/reply/session.ts b/src/auto-reply/reply/session.ts index 97bb984f6f0..9c79a28861e 100644 --- a/src/auto-reply/reply/session.ts +++ b/src/auto-reply/reply/session.ts @@ -761,7 +761,6 @@ export async function initSessionState(params: { agentId, sessionsDir: path.dirname(storePath), fallbackSessionFile, - activeSessionKey: sessionKey, }); sessionEntry = resolvedSessionFile.sessionEntry; if (isNewSession) { @@ -784,19 +783,13 @@ export async function initSessionState(params: { } // Preserve per-session overrides while resetting compaction state on /new. sessionStore[sessionKey] = { ...sessionStore[sessionKey], ...sessionEntry }; - await updateSessionStore( - storePath, - (store) => { - // Preserve per-session overrides while resetting compaction state on /new. - store[sessionKey] = { ...store[sessionKey], ...sessionEntry }; - if (retiredLegacyMainDelivery) { - store[retiredLegacyMainDelivery.key] = retiredLegacyMainDelivery.entry; - } - }, - { - activeSessionKey: sessionKey, - }, - ); + await updateSessionStore(storePath, (store) => { + // Preserve per-session overrides while resetting compaction state on /new. + store[sessionKey] = { ...store[sessionKey], ...sessionEntry }; + if (retiredLegacyMainDelivery) { + store[retiredLegacyMainDelivery.key] = retiredLegacyMainDelivery.entry; + } + }); // Archive old transcript so it doesn't accumulate on disk (#14869). let previousSessionTranscript: { diff --git a/src/commands/doctor/shared/codex-route-warnings.ts b/src/commands/doctor/shared/codex-route-warnings.ts index 817085ec636..1247289884d 100644 --- a/src/commands/doctor/shared/codex-route-warnings.ts +++ b/src/commands/doctor/shared/codex-route-warnings.ts @@ -737,10 +737,8 @@ export async function maybeRepairCodexSessionRoutes(params: { if (staleSessionKeys.length === 0) { continue; } - const result = await updateSessionStore( - target.storePath, - (store) => repairCodexSessionStoreRoutes({ store }), - { skipMaintenance: true }, + const result = await updateSessionStore(target.storePath, (store) => + repairCodexSessionStoreRoutes({ store, runtime }), ); if (!result.changed) { continue; diff --git a/src/config/sessions.test.ts b/src/config/sessions.test.ts index ddc6f072dcc..83553d7db99 100644 --- a/src/config/sessions.test.ts +++ b/src/config/sessions.test.ts @@ -861,20 +861,16 @@ describe("sessions", () => { const readSpy = vi.spyOn(fsSync, "readFileSync"); const parseSpy = vi.spyOn(JSON, "parse"); try { - await updateSessionStore( - storePath, - (store) => { - const existing = store[mainSessionKey]; - if (!existing) { - throw new Error("missing session entry"); - } - store[mainSessionKey] = { - ...existing, - thinkingLevel: "high", - }; - }, - { skipMaintenance: true }, - ); + await updateSessionStore(storePath, (store) => { + const existing = store[mainSessionKey]; + if (!existing) { + throw new Error("missing session entry"); + } + store[mainSessionKey] = { + ...existing, + thinkingLevel: "high", + }; + }); expect(readSpy).not.toHaveBeenCalled(); expect(parseSpy).not.toHaveBeenCalled(); @@ -903,21 +899,17 @@ describe("sessions", () => { expect(loadSessionStore(storePath)[mainSessionKey]?.thinkingLevel).toBe("low"); await expect( - updateSessionStore( - storePath, - (store) => { - const existing = store[mainSessionKey]; - if (!existing) { - throw new Error("missing session entry"); - } - store[mainSessionKey] = { - ...existing, - thinkingLevel: "mutated-before-throw", - }; - throw new Error("boom"); - }, - { skipMaintenance: true }, - ), + updateSessionStore(storePath, (store) => { + const existing = store[mainSessionKey]; + if (!existing) { + throw new Error("missing session entry"); + } + store[mainSessionKey] = { + ...existing, + thinkingLevel: "mutated-before-throw", + }; + throw new Error("boom"); + }), ).rejects.toThrow("boom"); const readSpy = vi.spyOn(fsSync, "readFileSync"); diff --git a/src/config/sessions/cleanup-service.ts b/src/config/sessions/cleanup-service.ts index 98b7a22b70d..0c5c359c404 100644 --- a/src/config/sessions/cleanup-service.ts +++ b/src/config/sessions/cleanup-service.ts @@ -412,72 +412,51 @@ export async function runSessionsCleanup(params: { const appliedReportRef: { current: AppliedSessionCleanupReport | null } = { current: null, }; - const dmScopeRemovedSessionFiles = new Map(); - let missingApplied = 0; - let dmScopeRetiredApplied = 0; - await updateSessionStore( - target.storePath, - async (store) => { - const beforeCount = Object.keys(store).length; - const missing = opts.fixMissing - ? pruneMissingTranscriptEntries({ - store, - storePath: target.storePath, - }) - : 0; - let pruned = 0; - let capped = 0; - let diskBudget: AppliedSessionCleanupReport["diskBudget"] = null; - if (mode === "warn") { - diskBudget = await enforceSessionDiskBudget({ + const missingApplied = await updateSessionStore(target.storePath, async (store) => { + const beforeCount = Object.keys(store).length; + const missing = opts.fixMissing + ? pruneMissingTranscriptEntries({ store, storePath: target.storePath, - activeSessionKey: opts.activeKey, - maintenance, - warnOnly: true, - }); - } else { - const preserveKeys = opts.activeKey ? new Set([opts.activeKey]) : undefined; - pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, { - preserveKeys, - }); - capped = capEntryCount(store, maintenance.maxEntries, { - preserveKeys, - }); - diskBudget = await enforceSessionDiskBudget({ - store, - storePath: target.storePath, - activeSessionKey: opts.activeKey, - maintenance, - warnOnly: false, - }); - } - appliedReportRef.current = { - mode, - beforeCount, - afterCount: Object.keys(store).length, - pruned, - capped, - diskBudget, - }; - return missing; - }, - opts.activeKey ? { activeSessionKey: opts.activeKey } : undefined, - ); - if (dmScopeRemovedSessionFiles.size > 0) { - const storeAfterDmScopeRetire = loadSessionStore(target.storePath, { skipCache: true }); - await archiveRemovedSessionTranscripts({ - removedSessionFiles: dmScopeRemovedSessionFiles, - referencedSessionIds: new Set( - Object.values(storeAfterDmScopeRetire) - .map((entry) => entry?.sessionId) - .filter((id): id is string => Boolean(id)), - ), - storePath: target.storePath, - reason: "deleted", - restrictToStoreDir: true, - }); - } + }) + : 0; + let pruned = 0; + let capped = 0; + let diskBudget: AppliedSessionCleanupReport["diskBudget"] = null; + if (mode === "warn") { + diskBudget = await enforceSessionDiskBudget({ + store, + storePath: target.storePath, + activeSessionKey: opts.activeKey, + maintenance, + warnOnly: true, + }); + } else { + const preserveKeys = opts.activeKey ? new Set([opts.activeKey]) : undefined; + pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, { + preserveKeys, + }); + capped = capEntryCount(store, maintenance.maxEntries, { + preserveKeys, + }); + diskBudget = await enforceSessionDiskBudget({ + store, + storePath: target.storePath, + activeSessionKey: opts.activeKey, + maintenance, + warnOnly: false, + }); + } + appliedReportRef.current = { + mode, + beforeCount, + afterCount: Object.keys(store).length, + pruned, + capped, + diskBudget, + }; + return missing; + }); const afterStore = loadSessionStore(target.storePath, { skipCache: true }); const unreferencedArtifacts = mode === "warn" diff --git a/src/config/sessions/runtime-types.ts b/src/config/sessions/runtime-types.ts index 635668f88dd..4c0212172ca 100644 --- a/src/config/sessions/runtime-types.ts +++ b/src/config/sessions/runtime-types.ts @@ -8,8 +8,6 @@ export type ReadSessionUpdatedAt = (params: { }) => number | undefined; export type SaveSessionStoreOptions = { - skipMaintenance?: boolean; - activeSessionKey?: string; allowDropAcpMetaSessionKeys?: string[]; }; diff --git a/src/config/sessions/session-file.ts b/src/config/sessions/session-file.ts index 5814c8a49c8..c940491219a 100644 --- a/src/config/sessions/session-file.ts +++ b/src/config/sessions/session-file.ts @@ -11,7 +11,6 @@ export async function resolveAndPersistSessionFile(params: { agentId?: string; sessionsDir?: string; fallbackSessionFile?: string; - activeSessionKey?: string; }): Promise<{ sessionFile: string; sessionEntry: SessionEntry }> { const { sessionId, sessionKey, sessionStore, storePath } = params; const now = Date.now(); @@ -39,16 +38,12 @@ export async function resolveAndPersistSessionFile(params: { }; if (baseEntry.sessionId !== sessionId || baseEntry.sessionFile !== sessionFile) { sessionStore[sessionKey] = persistedEntry; - await updateSessionStore( - storePath, - (store) => { - store[sessionKey] = { - ...store[sessionKey], - ...persistedEntry, - }; - }, - params.activeSessionKey ? { activeSessionKey: params.activeSessionKey } : undefined, - ); + await updateSessionStore(storePath, (store) => { + store[sessionKey] = { + ...store[sessionKey], + ...persistedEntry, + }; + }); return { sessionFile, sessionEntry: persistedEntry }; } sessionStore[sessionKey] = persistedEntry; diff --git a/src/config/sessions/sessions.test.ts b/src/config/sessions/sessions.test.ts index d93e529e99e..ba8aa6ee72e 100644 --- a/src/config/sessions/sessions.test.ts +++ b/src/config/sessions/sessions.test.ts @@ -326,13 +326,9 @@ describe("session store writer queue", () => { }); const writeSpy = vi.spyOn(jsonFiles, "writeTextAtomic"); - await updateSessionStore( - storePath, - async () => { - // Intentionally no-op mutation. - }, - { skipMaintenance: true }, - ); + await updateSessionStore(storePath, async () => { + // Intentionally no-op mutation. + }); expect(writeSpy).not.toHaveBeenCalled(); writeSpy.mockRestore(); }); diff --git a/src/config/sessions/store-backend.sqlite.test.ts b/src/config/sessions/store-backend.sqlite.test.ts index 8d6e27680a5..1dc05b3c544 100644 --- a/src/config/sessions/store-backend.sqlite.test.ts +++ b/src/config/sessions/store-backend.sqlite.test.ts @@ -113,18 +113,14 @@ describe("SQLite session store backend", () => { updatedAt: 100, }; - await saveSessionStore(storePath, { "discord:ops": entry }, { skipMaintenance: true }); - await updateSessionStore( - storePath, - (store) => { - store["discord:ops"] = { - ...store["discord:ops"], - updatedAt: 200, - modelOverride: "gpt-5.5", - }; - }, - { skipMaintenance: true }, - ); + await saveSessionStore(storePath, { "discord:ops": entry }); + await updateSessionStore(storePath, (store) => { + store["discord:ops"] = { + ...store["discord:ops"], + updatedAt: 200, + modelOverride: "gpt-5.5", + }; + }); expect(fs.existsSync(storePath)).toBe(false); expect(loadSessionStore(storePath, { skipCache: true })).toEqual({ @@ -146,7 +142,7 @@ describe("SQLite session store backend", () => { updatedAt: 100, }; - await saveSessionStore(storePath, { "discord:ops": entry }, { skipMaintenance: true }); + await saveSessionStore(storePath, { "discord:ops": entry }); expect(fs.existsSync(storePath)).toBe(false); expect(loadSessionStore(storePath, { skipCache: true })).toEqual({ @@ -176,17 +172,13 @@ describe("SQLite session store backend", () => { expect(loadSessionStore(storePath, { skipCache: true })).toEqual({}); - await saveSessionStore( - storePath, - { - "discord:ops": { - ...legacyEntry, - sessionId: "sqlite-session", - updatedAt: 200, - }, + await saveSessionStore(storePath, { + "discord:ops": { + ...legacyEntry, + sessionId: "sqlite-session", + updatedAt: 200, }, - { skipMaintenance: true }, - ); + }); expect(loadSessionStore(storePath, { skipCache: true })).toEqual({ "discord:ops": { ...legacyEntry, diff --git a/src/config/sessions/store.pruning.test.ts b/src/config/sessions/store.pruning.test.ts index 27560e4ebc2..38989bb9642 100644 --- a/src/config/sessions/store.pruning.test.ts +++ b/src/config/sessions/store.pruning.test.ts @@ -2,11 +2,13 @@ import crypto from "node:crypto"; import { afterAll, beforeAll, describe, expect, it } from "vitest"; import { createFixtureSuite } from "../../test-utils/fixture-suite.js"; import { + capEntryCount, + getActiveSessionMaintenanceWarning, isProtectedSessionMaintenanceEntry, + pruneStaleEntries, resolveMaintenanceConfigFromInput, resolveSessionEntryMaintenanceHighWater, } from "./store-maintenance.js"; -import { capEntryCount, getActiveSessionMaintenanceWarning, pruneStaleEntries } from "./store.js"; import type { SessionEntry } from "./types.js"; const DAY_MS = 24 * 60 * 60 * 1000; diff --git a/src/config/sessions/store.skills-stripping.test.ts b/src/config/sessions/store.skills-stripping.test.ts index a8dab480c2e..f6034b0aefe 100644 --- a/src/config/sessions/store.skills-stripping.test.ts +++ b/src/config/sessions/store.skills-stripping.test.ts @@ -91,7 +91,7 @@ describe("session store strips resolvedSkills from persistence", () => { "agent:main:test:1": makeEntry("session-1", makeSnapshot(5)), }; - await saveSessionStore(storePath, store, { skipMaintenance: true }); + await saveSessionStore(storePath, store); const raw = await fs.readFile(storePath, "utf-8"); expect(raw).not.toContain("resolvedSkills"); @@ -107,7 +107,7 @@ describe("session store strips resolvedSkills from persistence", () => { "agent:main:test:1": makeEntry("session-1", snapshot), }; - await saveSessionStore(storePath, store, { skipMaintenance: true }); + await saveSessionStore(storePath, store); const loaded = loadSessionStore(storePath, { skipCache: true }); const persistedSnapshot = loaded["agent:main:test:1"]?.skillsSnapshot; @@ -137,7 +137,7 @@ describe("session store strips resolvedSkills from persistence", () => { ); // Saving the loaded record should rewrite the file in stripped form. - await saveSessionStore(storePath, loaded, { skipMaintenance: true }); + await saveSessionStore(storePath, loaded); const rawAfter = await fs.readFile(storePath, "utf-8"); expect(rawAfter).not.toContain("resolvedSkills"); }); @@ -145,13 +145,9 @@ describe("session store strips resolvedSkills from persistence", () => { it("strips resolvedSkills written via updateSessionStore mutator", async () => { // Simulate the production hot path where ensureSkillSnapshot puts a // freshly-built snapshot (with resolvedSkills) into the store via mutator. - await updateSessionStore( - storePath, - (store) => { - store["agent:main:test:1"] = makeEntry("session-1", makeSnapshot(6)); - }, - { skipMaintenance: true }, - ); + await updateSessionStore(storePath, (store) => { + store["agent:main:test:1"] = makeEntry("session-1", makeSnapshot(6)); + }); const raw = await fs.readFile(storePath, "utf-8"); expect(raw).not.toContain("resolvedSkills"); @@ -168,7 +164,7 @@ describe("session store strips resolvedSkills from persistence", () => { store[`agent:main:scale:${i}`] = makeEntry(`session-${i}`, makeSnapshot(SKILLS_PER_SESSION)); } - await saveSessionStore(storePath, store, { skipMaintenance: true }); + await saveSessionStore(storePath, store); const stat = await fs.stat(storePath); // Pre-fix: ~SESSION_COUNT * SKILLS_PER_SESSION * ~3KB ≈ 15MB. diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index a993813d918..b8c3f99aaf8 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -11,7 +11,6 @@ import { } from "../../utils/delivery-context.shared.js"; import type { DeliveryContext } from "../../utils/delivery-context.types.js"; import { getFileStatSnapshot } from "../cache-utils.js"; -import { enforceSessionDiskBudget, type SessionDiskBudgetSweepResult } from "./disk-budget.js"; import { deriveSessionMetaPatch } from "./metadata.js"; import { saveSqliteSessionStore, @@ -26,16 +25,9 @@ import { writeSessionStoreCache, } from "./store-cache.js"; import { normalizeStoreSessionKey, resolveSessionStoreEntry } from "./store-entry.js"; -import { loadSessionStore } from "./store-load.js"; -import { resolveMaintenanceConfig } from "./store-maintenance-runtime.js"; import { - capEntryCount, - getActiveSessionMaintenanceWarning, pruneQuotaSuspensions, - pruneStaleEntries, type QuotaSuspensionMaintenanceResult, - type ResolvedSessionMaintenanceConfig, - type SessionMaintenanceWarning, } from "./store-maintenance.js"; import { normalizeSessionStore } from "./store-normalize.js"; import { runExclusiveSessionStoreWrite } from "./store-writer.js"; @@ -86,19 +78,7 @@ export function readSessionUpdatedAt(params: { } } -export { - capEntryCount, - getActiveSessionMaintenanceWarning, - pruneStaleEntries, - resolveMaintenanceConfig, -}; -export type { ResolvedSessionMaintenanceConfig, SessionMaintenanceWarning }; - type SaveSessionStoreOptions = { - /** Deprecated no-op retained for callers that still pass migration-era options. */ - skipMaintenance?: boolean; - /** Deprecated no-op retained for callers that still pass migration-era options. */ - activeSessionKey?: string; /** * Session keys that are allowed to drop persisted ACP metadata during this update. * All other updates preserve existing `entry.acp` blocks when callers replace the @@ -306,16 +286,13 @@ export async function runQuotaSuspensionMaintenance(params: { if (!fs.existsSync(params.storePath)) { return { resumed: [], cleared: 0 }; } - return await updateSessionStore( - params.storePath, - (store) => - pruneQuotaSuspensions({ - store, - now: params.now ?? Date.now(), - ttlMs: params.ttlMs, - log: params.log, - }), - { skipMaintenance: true }, + return await updateSessionStore(params.storePath, (store) => + pruneQuotaSuspensions({ + store, + now: params.now ?? Date.now(), + ttlMs: params.ttlMs, + log: params.log, + }), ); } @@ -376,9 +353,7 @@ async function persistResolvedSessionEntry(params: { for (const legacyKey of params.resolved.legacyKeys) { delete params.store[legacyKey]; } - await saveSessionStoreUnlocked(params.storePath, params.store, { - activeSessionKey: params.resolved.normalizedKey, - }); + await saveSessionStoreUnlocked(params.storePath, params.store); return params.next; } @@ -418,42 +393,38 @@ export async function recordSessionMetaFromInbound(params: { }): Promise { const { storePath, sessionKey, ctx } = params; const createIfMissing = params.createIfMissing ?? true; - return await updateSessionStore( - storePath, - (store) => { - const resolved = resolveSessionStoreEntry({ store, sessionKey }); - const existing = resolved.existing; - const patch = deriveSessionMetaPatch({ - ctx, - sessionKey: resolved.normalizedKey, - existing, - groupResolution: params.groupResolution, - }); - if (!patch) { - if (existing && resolved.legacyKeys.length > 0) { - store[resolved.normalizedKey] = existing; - for (const legacyKey of resolved.legacyKeys) { - delete store[legacyKey]; - } + return await updateSessionStore(storePath, (store) => { + const resolved = resolveSessionStoreEntry({ store, sessionKey }); + const existing = resolved.existing; + const patch = deriveSessionMetaPatch({ + ctx, + sessionKey: resolved.normalizedKey, + existing, + groupResolution: params.groupResolution, + }); + if (!patch) { + if (existing && resolved.legacyKeys.length > 0) { + store[resolved.normalizedKey] = existing; + for (const legacyKey of resolved.legacyKeys) { + delete store[legacyKey]; } - return existing ?? null; } - if (!existing && !createIfMissing) { - return null; - } - const next = existing - ? // Inbound metadata updates must not refresh activity timestamps; - // idle reset evaluation relies on updatedAt from actual session turns. - mergeSessionEntryPreserveActivity(existing, patch) - : mergeSessionEntry(existing, patch); - store[resolved.normalizedKey] = next; - for (const legacyKey of resolved.legacyKeys) { - delete store[legacyKey]; - } - return next; - }, - { activeSessionKey: normalizeStoreSessionKey(sessionKey) }, - ); + return existing ?? null; + } + if (!existing && !createIfMissing) { + return null; + } + const next = existing + ? // Inbound metadata updates must not refresh activity timestamps; + // idle reset evaluation relies on updatedAt from actual session turns. + mergeSessionEntryPreserveActivity(existing, patch) + : mergeSessionEntry(existing, patch); + store[resolved.normalizedKey] = next; + for (const legacyKey of resolved.legacyKeys) { + delete store[legacyKey]; + } + return next; + }); } export async function updateLastRoute(params: { diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index 425cf4da62e..5a6d0435a5b 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -577,8 +577,8 @@ export type SessionSkillSnapshot = { /** * Runtime-only, never persisted. Carries the full parsed Skill[] (including * each SKILL.md body) so the embedded runner can skip a workspace skill - * scan within a turn. Stripped from sessions.json on every read and write - * via normalizeSessionStore — see store-load.ts. On a cold session resume + * scan within a turn. Stripped from persistent session entries on every + * read and write via normalizeSessionStore — see store-load.ts. On a cold session resume * this is undefined and src/agents/pi-embedded-runner/skills-runtime.ts * rebuilds it by reloading skill entries from disk. */ diff --git a/src/gateway/boot.test.ts b/src/gateway/boot.test.ts index 99271e4242b..279880b41ea 100644 --- a/src/gateway/boot.test.ts +++ b/src/gateway/boot.test.ts @@ -1,8 +1,9 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { SessionScope } from "../config/sessions/types.js"; +import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js"; const agentCommand = vi.fn(); @@ -37,10 +38,21 @@ describe("runBootOnce", () => { beforeEach(async () => { vi.clearAllMocks(); + stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-boot-state-")); + vi.stubEnv("OPENCLAW_STATE_DIR", stateDir); const { storePath } = resolveMainStore(); await fs.rm(storePath, { force: true }); }); + afterEach(async () => { + closeOpenClawStateDatabaseForTest(); + vi.unstubAllEnvs(); + if (stateDir) { + await fs.rm(stateDir, { recursive: true, force: true }); + stateDir = ""; + } + }); + const makeDeps = () => ({ sendMessageWhatsApp: vi.fn(), sendMessageTelegram: vi.fn(), @@ -261,3 +273,4 @@ describe("runBootOnce", () => { }); }); }); +let stateDir = ""; diff --git a/src/gateway/boot.ts b/src/gateway/boot.ts index 0c2ee6d0f99..eb70ee58ee7 100644 --- a/src/gateway/boot.ts +++ b/src/gateway/boot.ts @@ -119,17 +119,13 @@ async function restoreMainSessionMapping( return undefined; } try { - await updateSessionStore( - snapshot.storePath, - (store) => { - if (snapshot.hadEntry && snapshot.entry) { - store[snapshot.sessionKey] = snapshot.entry; - return; - } - delete store[snapshot.sessionKey]; - }, - { activeSessionKey: snapshot.sessionKey }, - ); + await updateSessionStore(snapshot.storePath, (store) => { + if (snapshot.hadEntry && snapshot.entry) { + store[snapshot.sessionKey] = snapshot.entry; + return; + } + delete store[snapshot.sessionKey]; + }); return undefined; } catch (err) { return formatErrorMessage(err);