mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-13 07:21:52 +00:00
refactor: make session cleanup explicit
This commit is contained in:
@@ -31,6 +31,10 @@
|
||||
"source": "Message lifecycle refactor",
|
||||
"target": "消息生命周期重构"
|
||||
},
|
||||
{
|
||||
"source": "Refactoring",
|
||||
"target": "重构"
|
||||
},
|
||||
{
|
||||
"source": "Channel message API",
|
||||
"target": "频道消息 API"
|
||||
|
||||
@@ -92,7 +92,7 @@ JSON examples:
|
||||
|
||||
## Cleanup maintenance
|
||||
|
||||
Run maintenance now (instead of waiting for the next write cycle):
|
||||
Run maintenance explicitly:
|
||||
|
||||
```bash
|
||||
openclaw sessions cleanup --dry-run
|
||||
@@ -106,7 +106,7 @@ openclaw sessions cleanup --json
|
||||
`openclaw sessions cleanup` uses `session.maintenance` settings from config:
|
||||
|
||||
- Scope note: `openclaw sessions cleanup` maintains session stores, transcripts, and trajectory sidecars. It does not prune cron run logs (`cron/runs/<jobId>.jsonl`), which are managed by `cron.runLog.maxBytes` and `cron.runLog.keepLines` in [Cron configuration](/automation/cron-jobs#configuration) and explained in [Cron maintenance](/automation/cron-jobs#maintenance).
|
||||
- Cleanup also prunes unreferenced primary transcripts, compaction checkpoints, and trajectory sidecars older than `session.maintenance.pruneAfter`; files still referenced by `sessions.json` are preserved.
|
||||
- Cleanup also prunes unreferenced primary transcripts, compaction checkpoints, and trajectory sidecars older than `session.maintenance.pruneAfter`; files still referenced by the session store are preserved.
|
||||
|
||||
- `--dry-run`: preview how many entries would be pruned/capped without writing.
|
||||
- In text mode, dry-run prints a per-session action table (`Action`, `Key`, `Age`, `Model`, `Flags`) so you can see what would be kept vs removed.
|
||||
@@ -120,7 +120,8 @@ openclaw sessions cleanup --json
|
||||
|
||||
When a Gateway is reachable, non-dry-run cleanup for configured agent stores is
|
||||
sent through the Gateway so it shares the same session-store writer as runtime
|
||||
traffic. Use `--store <path>` for explicit offline repair of a store file.
|
||||
traffic. Legacy JSON import belongs to `openclaw doctor --fix`; cleanup no
|
||||
longer acts as the migration path for `sessions.json`.
|
||||
|
||||
`openclaw sessions cleanup --all-agents --dry-run --json`:
|
||||
|
||||
|
||||
@@ -92,7 +92,7 @@ sessions should expire on a timer.
|
||||
All session state is owned by the **gateway**. UI clients query the gateway for
|
||||
session data.
|
||||
|
||||
- **Store:** `~/.openclaw/state/openclaw.sqlite` by default; legacy/custom JSON stores use `~/.openclaw/agents/<agentId>/sessions/sessions.json`
|
||||
- **Store:** `~/.openclaw/state/openclaw.sqlite` by default. Legacy `sessions.json` indexes are imported by `openclaw doctor --fix`.
|
||||
- **Transcripts:** `~/.openclaw/agents/<agentId>/sessions/<sessionId>.jsonl`
|
||||
|
||||
The session store keeps separate lifecycle timestamps:
|
||||
@@ -109,9 +109,10 @@ writes.
|
||||
|
||||
## Session maintenance
|
||||
|
||||
OpenClaw automatically bounds session storage over time. By default, it runs
|
||||
in `warn` mode (reports what would be cleaned). Set `session.maintenance.mode`
|
||||
to `"enforce"` for automatic cleanup:
|
||||
OpenClaw bounds session storage through explicit maintenance. By default, it
|
||||
runs in `warn` mode (reports what would be cleaned). Set
|
||||
`session.maintenance.mode` to `"enforce"` and run `openclaw sessions cleanup`
|
||||
when you want cleanup to apply:
|
||||
|
||||
```json5
|
||||
{
|
||||
@@ -125,7 +126,7 @@ to `"enforce"` for automatic cleanup:
|
||||
}
|
||||
```
|
||||
|
||||
For production-sized `maxEntries` limits, Gateway runtime writes use a small high-water buffer and clean back down to the configured cap in batches. Session store reads do not prune or cap entries during Gateway startup. This avoids running full store cleanup on every startup or isolated cron session. `openclaw sessions cleanup --enforce` applies the cap immediately.
|
||||
Gateway runtime writes do not prune, cap, or import session rows. Session store reads also do not prune or cap entries during Gateway startup. This avoids running full store cleanup on every startup or isolated cron session. `openclaw sessions cleanup --enforce` applies the cap immediately.
|
||||
|
||||
Maintenance preserves durable external conversation pointers, including group
|
||||
sessions and thread-scoped chat sessions, while still allowing synthetic cron,
|
||||
|
||||
@@ -1221,7 +1221,7 @@ See [Multi-Agent Sandbox & Tools](/tools/multi-agent-sandbox-tools) for preceden
|
||||
- **`maintenance`**: session-store cleanup + retention controls.
|
||||
- `mode`: `warn` emits warnings only; `enforce` applies cleanup.
|
||||
- `pruneAfter`: age cutoff for stale entries (default `30d`).
|
||||
- `maxEntries`: maximum number of entries in the session store (default `500`). Runtime writes batch cleanup with a small high-water buffer for production-sized caps; `openclaw sessions cleanup --enforce` applies the cap immediately.
|
||||
- `maxEntries`: maximum number of entries in the session store (default `500`). Runtime writes do not prune or cap entries; `openclaw sessions cleanup --enforce` applies the cap immediately.
|
||||
- `rotateBytes`: deprecated and ignored; `openclaw doctor --fix` removes it from older configs.
|
||||
- `resetArchiveRetention`: retention for `*.reset.<timestamp>` transcript archives. Defaults to `pruneAfter`; set `false` to disable.
|
||||
- `maxDiskBytes`: optional sessions-directory disk budget. In `warn` mode it logs warnings; in `enforce` mode it removes oldest artifacts/sessions first.
|
||||
|
||||
@@ -293,7 +293,7 @@ That stages grounded durable candidates into the short-term dreaming store while
|
||||
- from legacy `~/.openclaw/credentials/*.json` (except `oauth.json`)
|
||||
- to `~/.openclaw/credentials/whatsapp/<accountId>/...` (default account id: `default`)
|
||||
|
||||
These migrations are best-effort and idempotent; doctor will emit warnings when it leaves any legacy folders behind as backups. The Gateway/CLI also auto-migrates the legacy sessions + agent dir on startup so history/auth/models land in the per-agent path without a manual doctor run. WhatsApp auth is intentionally only migrated via `openclaw doctor`. Talk provider/provider-map normalization now compares by structural equality, so key-order-only diffs no longer trigger repeat no-op `doctor --fix` changes.
|
||||
These migrations are best-effort and idempotent; doctor will emit warnings when it leaves any legacy folders behind as backups. Session JSON import is a doctor step only; Gateway startup does not import or rewrite `sessions.json`. WhatsApp auth is intentionally only migrated via `openclaw doctor`. Talk provider/provider-map normalization now compares by structural equality, so key-order-only diffs no longer trigger repeat no-op `doctor --fix` changes.
|
||||
|
||||
</Accordion>
|
||||
<Accordion title="3a. Legacy plugin manifest migrations">
|
||||
|
||||
@@ -28,7 +28,7 @@ health commands above for live connectivity checks.
|
||||
## Deep diagnostics
|
||||
|
||||
- Creds on disk: `ls -l ~/.openclaw/credentials/whatsapp/<accountId>/creds.json` (mtime should be recent).
|
||||
- Session store: `ls -l ~/.openclaw/state/openclaw.sqlite` (canonical) or `ls -l ~/.openclaw/agents/<agentId>/sessions/sessions.json` for legacy/custom JSON stores. Count and recent recipients are surfaced via `status`.
|
||||
- Session store: `ls -l ~/.openclaw/state/openclaw.sqlite` (canonical). Legacy `sessions.json` indexes are imported through `openclaw doctor --fix`. Count and recent recipients are surfaced via `status`.
|
||||
- Relink flow: `openclaw channels logout && openclaw channels login --verbose` when status codes 409–515 or `loggedOut` appear in logs. (Note: the QR login flow auto-restarts once for status 515 after pairing.)
|
||||
- Diagnostics are enabled by default. The gateway records operational facts unless `diagnostics.enabled: false` is set. Memory events record RSS/heap byte counts, threshold pressure, and growth pressure. Liveness warnings record event-loop delay, event-loop utilization, CPU-core ratio, and active/waiting/queued session counts when the process is running but saturated. Oversized-payload events record what was rejected, truncated, or chunked, plus sizes and limits when available. They do not record the message text, attachment contents, webhook body, raw request or response body, tokens, cookies, or secret values. The same heartbeat starts the bounded stability recorder, which is available through `openclaw gateway stability` or the `diagnostics.stability` Gateway RPC. Fatal Gateway exits, shutdown timeouts, and restart startup failures persist the latest recorder snapshot under `~/.openclaw/logs/stability/` when events exist; inspect the newest saved bundle with `openclaw gateway stability --bundle latest`.
|
||||
- For bug reports, run `openclaw gateway diagnostics export` and attach the generated zip. The export combines a Markdown summary, the newest stability bundle, sanitized log metadata, sanitized Gateway status/health snapshots, and config shape. It is meant to be shared: chat text, webhook bodies, tool outputs, credentials, cookies, account/message identifiers, and secret values are omitted or redacted. See [Diagnostics Export](/gateway/diagnostics).
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
---
|
||||
summary: "Plan for reducing OpenClaw's dependency on external PI packages while moving agent state toward SQLite, VFS scratch storage, and worker isolation"
|
||||
title: "Piless runtime and state refactor plan"
|
||||
title: "Refactoring"
|
||||
read_when:
|
||||
- Planning work to internalize PI runtime pieces
|
||||
- Moving session, transcript, or agent scratch state from JSON files to SQLite
|
||||
@@ -49,7 +49,9 @@ This plan has started landing in slices:
|
||||
- Canonical per-agent session stores use SQLite by default. The `openclaw doctor`
|
||||
fix mode imports legacy `sessions.json` indexes into SQLite and removes the
|
||||
JSON index after import, instead of keeping a startup migration or parallel
|
||||
compatibility/export store.
|
||||
compatibility/export store. Runtime session reads and writes no longer run
|
||||
JSON import, pruning, capping, archive cleanup, or disk-budget cleanup; those
|
||||
mutations now live behind explicit doctor/session-cleanup steps.
|
||||
- Transcript events have a SQLite store primitive with JSONL import/export.
|
||||
Transcript append paths dual-write when the caller already has agent and
|
||||
session scope, including gateway-injected assistant messages. Scoped appends
|
||||
@@ -548,6 +550,8 @@ Phase 1: SQLite session index
|
||||
the JSON index.
|
||||
- Move session entries to SQLite behind a flag.
|
||||
- Prove current session list, patch, reset, cleanup, and UI flows.
|
||||
- Remove load-time/startup session JSON migration and write-time pruning from
|
||||
the runtime store path.
|
||||
|
||||
Phase 2: VFS scratch
|
||||
|
||||
|
||||
@@ -96,7 +96,7 @@ Session persistence has automatic maintenance controls (`session.maintenance`) f
|
||||
- `maxDiskBytes`: optional sessions-directory budget
|
||||
- `highWaterBytes`: optional target after cleanup (default `80%` of `maxDiskBytes`)
|
||||
|
||||
Normal Gateway writes flow through a per-store session writer that serializes in-process mutations. SQLite is the canonical per-agent backend; `sessions.json` is a legacy doctor-import input, not a parallel export/debug store. Runtime code should prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)`. When a Gateway is reachable, non-dry-run `openclaw sessions cleanup` and `openclaw agents delete` delegate store mutations to the Gateway so cleanup joins the same writer queue. `maxEntries` cleanup is still batched for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not import, prune, or cap entries during Gateway startup; use `openclaw doctor --fix` for legacy JSON import and writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately and prunes old unreferenced transcript, checkpoint, and trajectory artifacts even when no disk budget is configured.
|
||||
Normal Gateway writes flow through a per-store session writer that serializes in-process mutations. SQLite is the canonical per-agent backend; `sessions.json` is a legacy doctor-import input, not a parallel export/debug store. Runtime code should prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)`. Runtime writes normalize and persist only; they do not prune, cap, import, archive, or run disk-budget cleanup. When a Gateway is reachable, non-dry-run `openclaw sessions cleanup` and `openclaw agents delete` delegate store mutations to the Gateway so cleanup joins the same writer queue. Session store reads do not import, prune, or cap entries during Gateway startup; use `openclaw doctor --fix` for legacy JSON import and `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` applies the configured cap immediately and prunes old unreferenced transcript, checkpoint, and trajectory artifacts even when no disk budget is configured.
|
||||
|
||||
Maintenance keeps durable external conversation pointers such as group sessions
|
||||
and thread-scoped chat sessions, but synthetic runtime entries for cron, hooks,
|
||||
|
||||
@@ -22,7 +22,6 @@ import {
|
||||
} from "../../config/sessions/reset.js";
|
||||
import { resolveAndPersistSessionFile } from "../../config/sessions/session-file.js";
|
||||
import { resolveSessionKey } from "../../config/sessions/session-key.js";
|
||||
import { resolveMaintenanceConfigFromInput } from "../../config/sessions/store-maintenance.js";
|
||||
import { loadSessionStore, updateSessionStore } from "../../config/sessions/store.js";
|
||||
import { parseSessionThreadInfoFast } from "../../config/sessions/thread-info.js";
|
||||
import {
|
||||
@@ -34,7 +33,6 @@ import {
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import type { TtsAutoMode } from "../../config/types.tts.js";
|
||||
import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js";
|
||||
import { deliverSessionMaintenanceWarning } from "../../infra/session-maintenance-warning.js";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { closeTrackedBrowserTabsForSessions } from "../../plugin-sdk/browser-maintenance.js";
|
||||
import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
|
||||
@@ -260,7 +258,6 @@ export async function initSessionState(params: {
|
||||
? { ...ctx, SessionKey: targetSessionKey }
|
||||
: ctx;
|
||||
const sessionCfg = cfg.session;
|
||||
const maintenanceConfig = resolveMaintenanceConfigFromInput(sessionCfg?.maintenance);
|
||||
const mainKey = normalizeMainKey(sessionCfg?.mainKey);
|
||||
const agentId = resolveSessionAgentId({
|
||||
sessionKey: sessionCtxForState.SessionKey,
|
||||
@@ -751,7 +748,6 @@ export async function initSessionState(params: {
|
||||
sessionsDir: path.dirname(storePath),
|
||||
fallbackSessionFile,
|
||||
activeSessionKey: sessionKey,
|
||||
maintenanceConfig,
|
||||
});
|
||||
sessionEntry = resolvedSessionFile.sessionEntry;
|
||||
if (isNewSession) {
|
||||
@@ -782,14 +778,6 @@ export async function initSessionState(params: {
|
||||
},
|
||||
{
|
||||
activeSessionKey: sessionKey,
|
||||
maintenanceConfig,
|
||||
onWarn: (warning) =>
|
||||
deliverSessionMaintenanceWarning({
|
||||
cfg,
|
||||
sessionKey,
|
||||
entry: sessionEntry,
|
||||
warning,
|
||||
}),
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
@@ -30,7 +30,6 @@ const NON_CHANNEL_DEP_KEYS = new Set([
|
||||
"hasOwnProperty",
|
||||
"inspect",
|
||||
"log",
|
||||
"migrateOrphanedSessionKeys",
|
||||
"nowMs",
|
||||
"onEvent",
|
||||
"requestHeartbeat",
|
||||
|
||||
@@ -864,9 +864,9 @@ export async function noteStateIntegrity(
|
||||
warnings.push(
|
||||
[
|
||||
`- ${missing.length}/${recentTranscriptCandidates.length} recent sessions are missing transcripts.`,
|
||||
` Verify sessions in store: ${formatCliCommand(`openclaw sessions --store "${absoluteStorePath}"`)}`,
|
||||
` Preview cleanup impact: ${formatCliCommand(`openclaw sessions cleanup --store "${absoluteStorePath}" --dry-run`)}`,
|
||||
` Prune missing entries: ${formatCliCommand(`openclaw sessions cleanup --store "${absoluteStorePath}" --enforce --fix-missing`)}`,
|
||||
` Import legacy session indexes: ${formatCliCommand("openclaw doctor --fix")}`,
|
||||
` Preview cleanup impact: ${formatCliCommand("openclaw sessions cleanup --dry-run")}`,
|
||||
` Prune missing entries: ${formatCliCommand("openclaw sessions cleanup --enforce --fix-missing")}`,
|
||||
].join("\n"),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -23,11 +23,7 @@ import {
|
||||
pruneStaleEntries,
|
||||
type ResolvedSessionMaintenanceConfig,
|
||||
} from "./store-maintenance.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
updateSessionStore,
|
||||
type SessionMaintenanceApplyReport,
|
||||
} from "./store.js";
|
||||
import { loadSessionStore, updateSessionStore } from "./store.js";
|
||||
import {
|
||||
resolveSessionStoreTargets,
|
||||
type SessionStoreTarget,
|
||||
@@ -89,6 +85,15 @@ export type SessionsCleanupRunResult = {
|
||||
appliedSummaries: SessionCleanupSummary[];
|
||||
};
|
||||
|
||||
type AppliedSessionCleanupReport = {
|
||||
mode: ResolvedSessionMaintenanceConfig["mode"];
|
||||
beforeCount: number;
|
||||
afterCount: number;
|
||||
pruned: number;
|
||||
capped: number;
|
||||
diskBudget: Awaited<ReturnType<typeof enforceSessionDiskBudget>>;
|
||||
};
|
||||
|
||||
export function resolveSessionCleanupAction(params: {
|
||||
key: string;
|
||||
missingKeys: Set<string>;
|
||||
@@ -312,29 +317,57 @@ export async function runSessionsCleanup(params: {
|
||||
const appliedSummaries: SessionCleanupSummary[] = [];
|
||||
if (!opts.dryRun) {
|
||||
for (const target of targets) {
|
||||
const appliedReportRef: { current: SessionMaintenanceApplyReport | null } = {
|
||||
const appliedReportRef: { current: AppliedSessionCleanupReport | null } = {
|
||||
current: null,
|
||||
};
|
||||
const missingApplied = await updateSessionStore(
|
||||
target.storePath,
|
||||
async (store) => {
|
||||
if (!opts.fixMissing) {
|
||||
return 0;
|
||||
const beforeCount = Object.keys(store).length;
|
||||
const missing = opts.fixMissing
|
||||
? pruneMissingTranscriptEntries({
|
||||
store,
|
||||
storePath: target.storePath,
|
||||
})
|
||||
: 0;
|
||||
let pruned = 0;
|
||||
let capped = 0;
|
||||
let diskBudget: AppliedSessionCleanupReport["diskBudget"] = null;
|
||||
if (mode === "warn") {
|
||||
diskBudget = await enforceSessionDiskBudget({
|
||||
store,
|
||||
storePath: target.storePath,
|
||||
activeSessionKey: opts.activeKey,
|
||||
maintenance,
|
||||
warnOnly: true,
|
||||
});
|
||||
} else {
|
||||
const preserveKeys = opts.activeKey ? new Set([opts.activeKey]) : undefined;
|
||||
pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, {
|
||||
preserveKeys,
|
||||
});
|
||||
capped = capEntryCount(store, maintenance.maxEntries, {
|
||||
preserveKeys,
|
||||
});
|
||||
diskBudget = await enforceSessionDiskBudget({
|
||||
store,
|
||||
storePath: target.storePath,
|
||||
activeSessionKey: opts.activeKey,
|
||||
maintenance,
|
||||
warnOnly: false,
|
||||
});
|
||||
}
|
||||
return pruneMissingTranscriptEntries({
|
||||
store,
|
||||
storePath: target.storePath,
|
||||
});
|
||||
},
|
||||
{
|
||||
activeSessionKey: opts.activeKey,
|
||||
maintenanceOverride: {
|
||||
appliedReportRef.current = {
|
||||
mode,
|
||||
},
|
||||
onMaintenanceApplied: (report) => {
|
||||
appliedReportRef.current = report;
|
||||
},
|
||||
beforeCount,
|
||||
afterCount: Object.keys(store).length,
|
||||
pruned,
|
||||
capped,
|
||||
diskBudget,
|
||||
};
|
||||
return missing;
|
||||
},
|
||||
opts.activeKey ? { activeSessionKey: opts.activeKey } : undefined,
|
||||
);
|
||||
const afterStore = loadSessionStore(target.storePath, { skipCache: true });
|
||||
const unreferencedArtifacts =
|
||||
|
||||
@@ -220,14 +220,15 @@ describe("enforceSessionDiskBudget", () => {
|
||||
},
|
||||
};
|
||||
await fs.writeFile(storePath, JSON.stringify(store, null, 2), "utf-8");
|
||||
await fs.writeFile(path.join(dir, "removable-worker.jsonl"), "w".repeat(800), "utf-8");
|
||||
|
||||
const result = await enforceSessionDiskBudget({
|
||||
store,
|
||||
storePath,
|
||||
activeSessionKey: activeKey,
|
||||
maintenance: {
|
||||
maxDiskBytes: 1000,
|
||||
highWaterBytes: 500,
|
||||
maxDiskBytes: 600,
|
||||
highWaterBytes: 200,
|
||||
},
|
||||
warnOnly: false,
|
||||
});
|
||||
@@ -238,6 +239,7 @@ describe("enforceSessionDiskBudget", () => {
|
||||
expect(result).toEqual(
|
||||
expect.objectContaining({
|
||||
removedEntries: 1,
|
||||
removedFiles: 1,
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
@@ -68,27 +68,6 @@ function canonicalizePathForComparison(filePath: string): string {
|
||||
}
|
||||
}
|
||||
|
||||
function measureStoreBytes(store: Record<string, SessionEntry>): number {
|
||||
return Buffer.byteLength(JSON.stringify(store, null, 2), "utf-8");
|
||||
}
|
||||
|
||||
function measureStoreEntryChunkBytes(key: string, entry: SessionEntry): number {
|
||||
const singleEntryStore = JSON.stringify({ [key]: entry }, null, 2);
|
||||
if (!singleEntryStore.startsWith("{\n") || !singleEntryStore.endsWith("\n}")) {
|
||||
return measureStoreBytes({ [key]: entry }) - 4;
|
||||
}
|
||||
const chunk = singleEntryStore.slice(2, -2);
|
||||
return Buffer.byteLength(chunk, "utf-8");
|
||||
}
|
||||
|
||||
function buildStoreEntryChunkSizeMap(store: Record<string, SessionEntry>): Map<string, number> {
|
||||
const out = new Map<string, number>();
|
||||
for (const [key, entry] of Object.entries(store)) {
|
||||
out.set(key, measureStoreEntryChunkBytes(key, entry));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
function getEntryUpdatedAt(entry?: SessionEntry): number {
|
||||
if (!entry) {
|
||||
return 0;
|
||||
@@ -350,10 +329,9 @@ export async function enforceSessionDiskBudget(params: {
|
||||
const fileSizesByPath = new Map(files.map((file) => [file.canonicalPath, file.size]));
|
||||
const simulatedRemovedPaths = new Set<string>();
|
||||
const resolvedStorePath = canonicalizePathForComparison(params.storePath);
|
||||
const storeFile = files.find((file) => file.canonicalPath === resolvedStorePath);
|
||||
let projectedStoreBytes = measureStoreBytes(params.store);
|
||||
let total =
|
||||
files.reduce((sum, file) => sum + file.size, 0) - (storeFile?.size ?? 0) + projectedStoreBytes;
|
||||
let total = files
|
||||
.filter((file) => file.canonicalPath !== resolvedStorePath)
|
||||
.reduce((sum, file) => sum + file.size, 0);
|
||||
const totalBefore = total;
|
||||
if (total <= maxBytes) {
|
||||
return {
|
||||
@@ -421,7 +399,6 @@ export async function enforceSessionDiskBudget(params: {
|
||||
if (total > highWaterBytes) {
|
||||
const activeSessionKey = normalizeOptionalLowercaseString(params.activeSessionKey);
|
||||
const sessionIdRefCounts = buildSessionIdRefCounts(params.store);
|
||||
const entryChunkBytesByKey = buildStoreEntryChunkSizeMap(params.store);
|
||||
const keys = Object.keys(params.store).toSorted((a, b) => {
|
||||
const aTime = getEntryUpdatedAt(params.store[a]);
|
||||
const bTime = getEntryUpdatedAt(params.store[b]);
|
||||
@@ -441,17 +418,7 @@ export async function enforceSessionDiskBudget(params: {
|
||||
if (isProtectedSessionMaintenanceEntry(key, entry)) {
|
||||
continue;
|
||||
}
|
||||
const previousProjectedBytes = projectedStoreBytes;
|
||||
delete params.store[key];
|
||||
const chunkBytes = entryChunkBytesByKey.get(key);
|
||||
entryChunkBytesByKey.delete(key);
|
||||
if (typeof chunkBytes === "number" && Number.isFinite(chunkBytes) && chunkBytes >= 0) {
|
||||
// Removing any one pretty-printed top-level entry always removes the entry chunk plus ",\n" (2 bytes).
|
||||
projectedStoreBytes = Math.max(2, projectedStoreBytes - (chunkBytes + 2));
|
||||
} else {
|
||||
projectedStoreBytes = measureStoreBytes(params.store);
|
||||
}
|
||||
total += projectedStoreBytes - previousProjectedBytes;
|
||||
removedEntries += 1;
|
||||
|
||||
const sessionId = entry.sessionId;
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import type { MsgContext } from "../../auto-reply/templating.js";
|
||||
import type { DeliveryContext } from "../../utils/delivery-context.types.js";
|
||||
import type { SessionMaintenanceMode } from "../types.base.js";
|
||||
import type { SessionEntry, GroupKeyResolution } from "./types.js";
|
||||
|
||||
export type ReadSessionUpdatedAt = (params: {
|
||||
@@ -8,41 +7,10 @@ export type ReadSessionUpdatedAt = (params: {
|
||||
sessionKey: string;
|
||||
}) => number | undefined;
|
||||
|
||||
export type SessionMaintenanceWarningRuntime = {
|
||||
activeSessionKey: string;
|
||||
activeUpdatedAt?: number;
|
||||
totalEntries: number;
|
||||
pruneAfterMs: number;
|
||||
maxEntries: number;
|
||||
wouldPrune: boolean;
|
||||
wouldCap: boolean;
|
||||
};
|
||||
|
||||
export type ResolvedSessionMaintenanceConfigRuntime = {
|
||||
mode: SessionMaintenanceMode;
|
||||
pruneAfterMs: number;
|
||||
maxEntries: number;
|
||||
resetArchiveRetentionMs: number | null;
|
||||
maxDiskBytes: number | null;
|
||||
highWaterBytes: number | null;
|
||||
};
|
||||
|
||||
export type SessionMaintenanceApplyReportRuntime = {
|
||||
mode: SessionMaintenanceMode;
|
||||
beforeCount: number;
|
||||
afterCount: number;
|
||||
pruned: number;
|
||||
capped: number;
|
||||
diskBudget: Record<string, unknown> | null;
|
||||
};
|
||||
|
||||
export type SaveSessionStoreOptions = {
|
||||
skipMaintenance?: boolean;
|
||||
activeSessionKey?: string;
|
||||
allowDropAcpMetaSessionKeys?: string[];
|
||||
onWarn?: (warning: SessionMaintenanceWarningRuntime) => void | Promise<void>;
|
||||
onMaintenanceApplied?: (report: SessionMaintenanceApplyReportRuntime) => void | Promise<void>;
|
||||
maintenanceOverride?: Partial<ResolvedSessionMaintenanceConfigRuntime>;
|
||||
};
|
||||
|
||||
export type SaveSessionStore = (
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import { resolveSessionFilePath } from "./paths.js";
|
||||
import type { ResolvedSessionMaintenanceConfig } from "./store-maintenance.js";
|
||||
import { updateSessionStore } from "./store.js";
|
||||
import type { SessionEntry } from "./types.js";
|
||||
|
||||
@@ -13,7 +12,6 @@ export async function resolveAndPersistSessionFile(params: {
|
||||
sessionsDir?: string;
|
||||
fallbackSessionFile?: string;
|
||||
activeSessionKey?: string;
|
||||
maintenanceConfig?: ResolvedSessionMaintenanceConfig;
|
||||
}): Promise<{ sessionFile: string; sessionEntry: SessionEntry }> {
|
||||
const { sessionId, sessionKey, sessionStore, storePath } = params;
|
||||
const now = Date.now();
|
||||
@@ -49,12 +47,7 @@ export async function resolveAndPersistSessionFile(params: {
|
||||
...persistedEntry,
|
||||
};
|
||||
},
|
||||
params.activeSessionKey || params.maintenanceConfig
|
||||
? {
|
||||
...(params.activeSessionKey ? { activeSessionKey: params.activeSessionKey } : {}),
|
||||
...(params.maintenanceConfig ? { maintenanceConfig: params.maintenanceConfig } : {}),
|
||||
}
|
||||
: undefined,
|
||||
params.activeSessionKey ? { activeSessionKey: params.activeSessionKey } : undefined,
|
||||
);
|
||||
return { sessionFile, sessionEntry: persistedEntry };
|
||||
}
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import fs from "node:fs";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { getFileStatSnapshot } from "../cache-utils.js";
|
||||
import {
|
||||
loadSqliteSessionStore,
|
||||
@@ -12,13 +11,6 @@ import {
|
||||
setSerializedSessionStore,
|
||||
writeSessionStoreCache,
|
||||
} from "./store-cache.js";
|
||||
import { resolveMaintenanceConfig } from "./store-maintenance-runtime.js";
|
||||
import {
|
||||
capEntryCount,
|
||||
pruneStaleEntries,
|
||||
shouldRunSessionEntryMaintenance,
|
||||
type ResolvedSessionMaintenanceConfig,
|
||||
} from "./store-maintenance.js";
|
||||
import { applySessionStoreMigrations } from "./store-migrations.js";
|
||||
import { normalizeSessionStore } from "./store-normalize.js";
|
||||
import type { SessionEntry } from "./types.js";
|
||||
@@ -27,13 +19,9 @@ export { normalizeSessionStore } from "./store-normalize.js";
|
||||
|
||||
export type LoadSessionStoreOptions = {
|
||||
skipCache?: boolean;
|
||||
maintenanceConfig?: ResolvedSessionMaintenanceConfig;
|
||||
runMaintenance?: boolean;
|
||||
clone?: boolean;
|
||||
};
|
||||
|
||||
const log = createSubsystemLogger("sessions/store");
|
||||
|
||||
function isSessionStoreRecord(value: unknown): value is Record<string, SessionEntry> {
|
||||
return !!value && typeof value === "object" && !Array.isArray(value);
|
||||
}
|
||||
@@ -96,32 +84,6 @@ export function loadSessionStore(
|
||||
if (migrated || normalized) {
|
||||
serializedFromDisk = undefined;
|
||||
}
|
||||
if (opts.runMaintenance) {
|
||||
const maintenance = opts.maintenanceConfig ?? resolveMaintenanceConfig();
|
||||
const beforeCount = Object.keys(store).length;
|
||||
if (maintenance.mode === "enforce" && beforeCount > maintenance.maxEntries) {
|
||||
const pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, { log: false });
|
||||
const countAfterPrune = Object.keys(store).length;
|
||||
const capped = shouldRunSessionEntryMaintenance({
|
||||
entryCount: countAfterPrune,
|
||||
maxEntries: maintenance.maxEntries,
|
||||
})
|
||||
? capEntryCount(store, maintenance.maxEntries, { log: false })
|
||||
: 0;
|
||||
const afterCount = Object.keys(store).length;
|
||||
if (pruned > 0 || capped > 0) {
|
||||
serializedFromDisk = undefined;
|
||||
log.info("applied load-time maintenance to oversized session store", {
|
||||
storePath,
|
||||
before: beforeCount,
|
||||
after: afterCount,
|
||||
pruned,
|
||||
capped,
|
||||
maxEntries: maintenance.maxEntries,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
setSerializedSessionStore(storePath, serializedFromDisk);
|
||||
|
||||
|
||||
@@ -1,937 +0,0 @@
|
||||
import crypto from "node:crypto";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { createSuiteTempRootTracker } from "../../test-helpers/temp-dir.js";
|
||||
import {
|
||||
resolveTrajectoryFilePath,
|
||||
resolveTrajectoryPointerFilePath,
|
||||
} from "../../trajectory/paths.js";
|
||||
import type { SessionEntry } from "./types.js";
|
||||
|
||||
// Keep integration tests deterministic: never read a real openclaw.json.
|
||||
vi.mock("../config.js", async () => ({
|
||||
...(await vi.importActual<typeof import("../config.js")>("../config.js")),
|
||||
getRuntimeConfig: vi.fn().mockReturnValue({}),
|
||||
}));
|
||||
|
||||
import { getRuntimeConfig } from "../config.js";
|
||||
import { runSessionsCleanup } from "./cleanup-service.js";
|
||||
import {
|
||||
clearSessionStoreCacheForTest,
|
||||
loadSessionStore,
|
||||
saveSessionStore,
|
||||
updateSessionStore,
|
||||
} from "./store.js";
|
||||
|
||||
let mockLoadConfig: ReturnType<typeof vi.fn>;
|
||||
|
||||
const DAY_MS = 24 * 60 * 60 * 1000;
|
||||
const ENFORCED_MAINTENANCE_OVERRIDE = {
|
||||
mode: "enforce" as const,
|
||||
pruneAfterMs: 7 * DAY_MS,
|
||||
maxEntries: 500,
|
||||
resetArchiveRetentionMs: 7 * DAY_MS,
|
||||
maxDiskBytes: null,
|
||||
highWaterBytes: null,
|
||||
};
|
||||
|
||||
const archiveTimestamp = (ms: number) => new Date(ms).toISOString().replaceAll(":", "-");
|
||||
|
||||
const suiteRootTracker = createSuiteTempRootTracker({ prefix: "openclaw-pruning-integ-" });
|
||||
|
||||
function makeEntry(updatedAt: number): SessionEntry {
|
||||
return { sessionId: crypto.randomUUID(), updatedAt };
|
||||
}
|
||||
|
||||
function applyEnforcedMaintenanceConfig(mockLoadConfig: ReturnType<typeof vi.fn>) {
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
maintenance: {
|
||||
mode: "enforce",
|
||||
pruneAfter: "7d",
|
||||
maxEntries: 500,
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function applyCappedMaintenanceConfig(mockLoadConfig: ReturnType<typeof vi.fn>) {
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
maintenance: {
|
||||
mode: "enforce",
|
||||
pruneAfter: "365d",
|
||||
maxEntries: 1,
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async function createCaseDir(prefix: string): Promise<string> {
|
||||
return await suiteRootTracker.make(prefix);
|
||||
}
|
||||
|
||||
function createStaleAndFreshStore(now = Date.now()): Record<string, SessionEntry> {
|
||||
return {
|
||||
stale: makeEntry(now - 30 * DAY_MS),
|
||||
fresh: makeEntry(now),
|
||||
};
|
||||
}
|
||||
|
||||
describe("Integration: saveSessionStore with pruning", () => {
|
||||
let testDir: string;
|
||||
let storePath: string;
|
||||
let savedCacheTtl: string | undefined;
|
||||
|
||||
beforeAll(async () => {
|
||||
await suiteRootTracker.setup();
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
await suiteRootTracker.cleanup();
|
||||
});
|
||||
|
||||
beforeEach(async () => {
|
||||
mockLoadConfig = vi.mocked(getRuntimeConfig) as ReturnType<typeof vi.fn>;
|
||||
mockLoadConfig.mockReset();
|
||||
testDir = await createCaseDir("pruning-integ");
|
||||
storePath = path.join(testDir, "sessions.json");
|
||||
savedCacheTtl = process.env.OPENCLAW_SESSION_CACHE_TTL_MS;
|
||||
process.env.OPENCLAW_SESSION_CACHE_TTL_MS = "0";
|
||||
clearSessionStoreCacheForTest();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
mockLoadConfig.mockReset();
|
||||
clearSessionStoreCacheForTest();
|
||||
if (savedCacheTtl === undefined) {
|
||||
delete process.env.OPENCLAW_SESSION_CACHE_TTL_MS;
|
||||
} else {
|
||||
process.env.OPENCLAW_SESSION_CACHE_TTL_MS = savedCacheTtl;
|
||||
}
|
||||
});
|
||||
|
||||
it("saveSessionStore prunes stale entries on write", async () => {
|
||||
applyEnforcedMaintenanceConfig(mockLoadConfig);
|
||||
|
||||
const store = createStaleAndFreshStore();
|
||||
|
||||
await saveSessionStore(storePath, store, {
|
||||
maintenanceOverride: ENFORCED_MAINTENANCE_OVERRIDE,
|
||||
});
|
||||
|
||||
const loaded = loadSessionStore(storePath, { skipCache: true });
|
||||
expect(loaded.stale).toBeUndefined();
|
||||
expect(loaded.fresh).toBeDefined();
|
||||
});
|
||||
|
||||
it("archives transcript files for stale sessions pruned on write", async () => {
|
||||
applyEnforcedMaintenanceConfig(mockLoadConfig);
|
||||
|
||||
const now = Date.now();
|
||||
const staleSessionId = "stale-session";
|
||||
const freshSessionId = "fresh-session";
|
||||
const store: Record<string, SessionEntry> = {
|
||||
stale: { sessionId: staleSessionId, updatedAt: now - 30 * DAY_MS },
|
||||
fresh: { sessionId: freshSessionId, updatedAt: now },
|
||||
};
|
||||
const staleTranscript = path.join(testDir, `${staleSessionId}.jsonl`);
|
||||
const freshTranscript = path.join(testDir, `${freshSessionId}.jsonl`);
|
||||
await fs.writeFile(staleTranscript, '{"type":"session"}\n', "utf-8");
|
||||
await fs.writeFile(freshTranscript, '{"type":"session"}\n', "utf-8");
|
||||
|
||||
await saveSessionStore(storePath, store);
|
||||
|
||||
const loaded = loadSessionStore(storePath);
|
||||
expect(loaded.stale).toBeUndefined();
|
||||
expect(loaded.fresh).toBeDefined();
|
||||
await expect(fs.stat(staleTranscript)).rejects.toThrow();
|
||||
await expect(fs.stat(freshTranscript)).resolves.toBeDefined();
|
||||
const dirEntries = await fs.readdir(testDir);
|
||||
const archived = dirEntries.filter((entry) =>
|
||||
entry.startsWith(`${staleSessionId}.jsonl.deleted.`),
|
||||
);
|
||||
expect(archived).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("removes trajectory sidecars for stale sessions pruned on write", async () => {
|
||||
applyEnforcedMaintenanceConfig(mockLoadConfig);
|
||||
|
||||
const now = Date.now();
|
||||
const staleSessionId = "stale-trajectory-session";
|
||||
const freshSessionId = "fresh-trajectory-session";
|
||||
const store: Record<string, SessionEntry> = {
|
||||
stale: { sessionId: staleSessionId, updatedAt: now - 30 * DAY_MS },
|
||||
fresh: { sessionId: freshSessionId, updatedAt: now },
|
||||
};
|
||||
const staleTranscript = path.join(testDir, `${staleSessionId}.jsonl`);
|
||||
const freshTranscript = path.join(testDir, `${freshSessionId}.jsonl`);
|
||||
const staleRuntime = resolveTrajectoryFilePath({
|
||||
env: {},
|
||||
sessionFile: staleTranscript,
|
||||
sessionId: staleSessionId,
|
||||
});
|
||||
const freshRuntime = resolveTrajectoryFilePath({
|
||||
env: {},
|
||||
sessionFile: freshTranscript,
|
||||
sessionId: freshSessionId,
|
||||
});
|
||||
const stalePointer = resolveTrajectoryPointerFilePath(staleTranscript);
|
||||
const freshPointer = resolveTrajectoryPointerFilePath(freshTranscript);
|
||||
await fs.writeFile(staleTranscript, '{"type":"session"}\n', "utf-8");
|
||||
await fs.writeFile(freshTranscript, '{"type":"session"}\n', "utf-8");
|
||||
await fs.writeFile(staleRuntime, '{"traceSchema":"openclaw-trajectory"}\n', "utf-8");
|
||||
await fs.writeFile(freshRuntime, '{"traceSchema":"openclaw-trajectory"}\n', "utf-8");
|
||||
await fs.writeFile(
|
||||
stalePointer,
|
||||
JSON.stringify({
|
||||
traceSchema: "openclaw-trajectory-pointer",
|
||||
schemaVersion: 1,
|
||||
sessionId: staleSessionId,
|
||||
runtimeFile: staleRuntime,
|
||||
}),
|
||||
"utf-8",
|
||||
);
|
||||
await fs.writeFile(
|
||||
freshPointer,
|
||||
JSON.stringify({
|
||||
traceSchema: "openclaw-trajectory-pointer",
|
||||
schemaVersion: 1,
|
||||
sessionId: freshSessionId,
|
||||
runtimeFile: freshRuntime,
|
||||
}),
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
await saveSessionStore(storePath, store);
|
||||
|
||||
await expect(fs.stat(staleRuntime)).rejects.toThrow();
|
||||
await expect(fs.stat(stalePointer)).rejects.toThrow();
|
||||
await expect(fs.stat(freshRuntime)).resolves.toBeDefined();
|
||||
await expect(fs.stat(freshPointer)).resolves.toBeDefined();
|
||||
});
|
||||
|
||||
it("sessions cleanup prunes old unreferenced session artifacts without touching referenced files", async () => {
|
||||
applyEnforcedMaintenanceConfig(mockLoadConfig);
|
||||
|
||||
const now = Date.now();
|
||||
const oldDate = new Date(now - 10 * DAY_MS);
|
||||
const freshDate = new Date(now);
|
||||
const referencedCheckpointPath = path.join(
|
||||
testDir,
|
||||
"fresh-session.checkpoint.22222222-2222-4222-8222-222222222222.jsonl",
|
||||
);
|
||||
const store: Record<string, SessionEntry> = {
|
||||
fresh: {
|
||||
sessionId: "fresh-session",
|
||||
updatedAt: now,
|
||||
compactionCheckpoints: [
|
||||
{
|
||||
checkpointId: "referenced",
|
||||
sessionKey: "fresh",
|
||||
sessionId: "fresh-session",
|
||||
createdAt: now,
|
||||
reason: "manual",
|
||||
preCompaction: {
|
||||
sessionId: "fresh-session",
|
||||
sessionFile: referencedCheckpointPath,
|
||||
leafId: "leaf",
|
||||
},
|
||||
postCompaction: { sessionId: "fresh-session" },
|
||||
},
|
||||
],
|
||||
},
|
||||
};
|
||||
const referencedTranscript = path.join(testDir, "fresh-session.jsonl");
|
||||
const oldOrphanTranscript = path.join(testDir, "orphan-session.jsonl");
|
||||
const freshOrphanTranscript = path.join(testDir, "fresh-orphan.jsonl");
|
||||
const orphanRuntime = path.join(testDir, "orphan-session.trajectory.jsonl");
|
||||
const orphanPointer = path.join(testDir, "orphan-session.trajectory-path.json");
|
||||
const orphanCheckpoint = path.join(
|
||||
testDir,
|
||||
"orphan-session.checkpoint.11111111-1111-4111-8111-111111111111.jsonl",
|
||||
);
|
||||
await fs.writeFile(storePath, JSON.stringify(store, null, 2), "utf-8");
|
||||
await fs.writeFile(referencedTranscript, "referenced", "utf-8");
|
||||
await fs.writeFile(referencedCheckpointPath, "referenced checkpoint", "utf-8");
|
||||
await fs.writeFile(oldOrphanTranscript, "orphan transcript", "utf-8");
|
||||
await fs.writeFile(freshOrphanTranscript, "fresh orphan", "utf-8");
|
||||
await fs.writeFile(orphanRuntime, "orphan runtime", "utf-8");
|
||||
await fs.writeFile(orphanPointer, "orphan pointer", "utf-8");
|
||||
await fs.writeFile(orphanCheckpoint, "orphan checkpoint", "utf-8");
|
||||
for (const file of [
|
||||
referencedTranscript,
|
||||
referencedCheckpointPath,
|
||||
oldOrphanTranscript,
|
||||
orphanRuntime,
|
||||
orphanPointer,
|
||||
orphanCheckpoint,
|
||||
]) {
|
||||
await fs.utimes(file, oldDate, oldDate);
|
||||
}
|
||||
await fs.utimes(freshOrphanTranscript, freshDate, freshDate);
|
||||
|
||||
const dryRun = await runSessionsCleanup({
|
||||
cfg: {},
|
||||
opts: { store: storePath, dryRun: true, enforce: true },
|
||||
targets: [{ agentId: "main", storePath }],
|
||||
});
|
||||
expect(dryRun.previewResults[0]?.summary.unreferencedArtifacts).toEqual(
|
||||
expect.objectContaining({
|
||||
removedFiles: 4,
|
||||
}),
|
||||
);
|
||||
await expect(fs.stat(oldOrphanTranscript)).resolves.toBeDefined();
|
||||
await expect(fs.stat(orphanRuntime)).resolves.toBeDefined();
|
||||
await expect(fs.stat(orphanPointer)).resolves.toBeDefined();
|
||||
await expect(fs.stat(orphanCheckpoint)).resolves.toBeDefined();
|
||||
|
||||
const applied = await runSessionsCleanup({
|
||||
cfg: {},
|
||||
opts: { store: storePath, enforce: true },
|
||||
targets: [{ agentId: "main", storePath }],
|
||||
});
|
||||
|
||||
expect(applied.appliedSummaries[0]?.unreferencedArtifacts).toEqual(
|
||||
expect.objectContaining({
|
||||
removedFiles: 4,
|
||||
}),
|
||||
);
|
||||
await expect(fs.stat(oldOrphanTranscript)).rejects.toThrow();
|
||||
await expect(fs.stat(orphanRuntime)).rejects.toThrow();
|
||||
await expect(fs.stat(orphanPointer)).rejects.toThrow();
|
||||
await expect(fs.stat(orphanCheckpoint)).rejects.toThrow();
|
||||
await expect(fs.stat(referencedTranscript)).resolves.toBeDefined();
|
||||
await expect(fs.stat(referencedCheckpointPath)).resolves.toBeDefined();
|
||||
await expect(fs.stat(freshOrphanTranscript)).resolves.toBeDefined();
|
||||
});
|
||||
|
||||
it("sessions cleanup dry-run does not double-count artifacts already covered by disk budget", async () => {
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
maintenance: {
|
||||
mode: "enforce",
|
||||
pruneAfter: "7d",
|
||||
maxEntries: 500,
|
||||
maxDiskBytes: 1000,
|
||||
highWaterBytes: 900,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const store: Record<string, SessionEntry> = {
|
||||
fresh: { sessionId: "fresh-session", updatedAt: Date.now() },
|
||||
};
|
||||
const oldOrphanTranscript = path.join(testDir, "orphan-session.jsonl");
|
||||
await fs.writeFile(storePath, JSON.stringify(store, null, 2), "utf-8");
|
||||
await fs.writeFile(oldOrphanTranscript, "x".repeat(2000), "utf-8");
|
||||
const oldDate = new Date(Date.now() - 10 * DAY_MS);
|
||||
await fs.utimes(oldOrphanTranscript, oldDate, oldDate);
|
||||
|
||||
const dryRun = await runSessionsCleanup({
|
||||
cfg: {},
|
||||
opts: { store: storePath, dryRun: true, enforce: true },
|
||||
targets: [{ agentId: "main", storePath }],
|
||||
});
|
||||
|
||||
expect(dryRun.previewResults[0]?.summary.diskBudget).toEqual(
|
||||
expect.objectContaining({
|
||||
removedFiles: 1,
|
||||
}),
|
||||
);
|
||||
expect(dryRun.previewResults[0]?.summary.unreferencedArtifacts).toEqual(
|
||||
expect.objectContaining({
|
||||
removedFiles: 0,
|
||||
}),
|
||||
);
|
||||
await expect(fs.stat(oldOrphanTranscript)).resolves.toBeDefined();
|
||||
});
|
||||
|
||||
it("sessions cleanup dry-run excludes stale and capped entry transcripts from orphan counts", async () => {
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
maintenance: {
|
||||
mode: "enforce",
|
||||
pruneAfter: "7d",
|
||||
maxEntries: 1,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const now = Date.now();
|
||||
const store: Record<string, SessionEntry> = {
|
||||
stale: { sessionId: "stale-session", updatedAt: now - 30 * DAY_MS },
|
||||
capped: { sessionId: "capped-session", updatedAt: now - DAY_MS },
|
||||
fresh: { sessionId: "fresh-session", updatedAt: now },
|
||||
};
|
||||
const staleTranscript = path.join(testDir, "stale-session.jsonl");
|
||||
const cappedTranscript = path.join(testDir, "capped-session.jsonl");
|
||||
const freshTranscript = path.join(testDir, "fresh-session.jsonl");
|
||||
await fs.writeFile(storePath, JSON.stringify(store, null, 2), "utf-8");
|
||||
await fs.writeFile(staleTranscript, "stale", "utf-8");
|
||||
await fs.writeFile(cappedTranscript, "capped", "utf-8");
|
||||
await fs.writeFile(freshTranscript, "fresh", "utf-8");
|
||||
const oldDate = new Date(now - 10 * DAY_MS);
|
||||
await fs.utimes(staleTranscript, oldDate, oldDate);
|
||||
await fs.utimes(cappedTranscript, oldDate, oldDate);
|
||||
|
||||
const dryRun = await runSessionsCleanup({
|
||||
cfg: {},
|
||||
opts: { store: storePath, dryRun: true, enforce: true },
|
||||
targets: [{ agentId: "main", storePath }],
|
||||
});
|
||||
|
||||
expect(dryRun.previewResults[0]?.summary).toEqual(
|
||||
expect.objectContaining({
|
||||
pruned: 1,
|
||||
capped: 1,
|
||||
unreferencedArtifacts: expect.objectContaining({
|
||||
removedFiles: 0,
|
||||
}),
|
||||
}),
|
||||
);
|
||||
await expect(fs.stat(staleTranscript)).resolves.toBeDefined();
|
||||
await expect(fs.stat(cappedTranscript)).resolves.toBeDefined();
|
||||
await expect(fs.stat(freshTranscript)).resolves.toBeDefined();
|
||||
});
|
||||
|
||||
it("cleans up archived transcripts older than the prune window", async () => {
|
||||
applyEnforcedMaintenanceConfig(mockLoadConfig);
|
||||
|
||||
const now = Date.now();
|
||||
const staleSessionId = "stale-session";
|
||||
const store: Record<string, SessionEntry> = {
|
||||
stale: { sessionId: staleSessionId, updatedAt: now - 30 * DAY_MS },
|
||||
fresh: { sessionId: "fresh-session", updatedAt: now },
|
||||
};
|
||||
|
||||
const staleTranscript = path.join(testDir, `${staleSessionId}.jsonl`);
|
||||
await fs.writeFile(staleTranscript, '{"type":"session"}\n', "utf-8");
|
||||
|
||||
const oldArchived = path.join(
|
||||
testDir,
|
||||
`old-session.jsonl.deleted.${archiveTimestamp(now - 9 * DAY_MS)}`,
|
||||
);
|
||||
const recentArchived = path.join(
|
||||
testDir,
|
||||
`recent-session.jsonl.deleted.${archiveTimestamp(now - 2 * DAY_MS)}`,
|
||||
);
|
||||
const bakArchived = path.join(
|
||||
testDir,
|
||||
`bak-session.jsonl.bak.${archiveTimestamp(now - 20 * DAY_MS)}`,
|
||||
);
|
||||
await fs.writeFile(oldArchived, "old", "utf-8");
|
||||
await fs.writeFile(recentArchived, "recent", "utf-8");
|
||||
await fs.writeFile(bakArchived, "bak", "utf-8");
|
||||
|
||||
await saveSessionStore(storePath, store);
|
||||
|
||||
await expect(fs.stat(oldArchived)).rejects.toThrow();
|
||||
await expect(fs.stat(recentArchived)).resolves.toBeDefined();
|
||||
await expect(fs.stat(bakArchived)).resolves.toBeDefined();
|
||||
});
|
||||
|
||||
it("cleans up reset archives using resetArchiveRetention", async () => {
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
maintenance: {
|
||||
mode: "enforce",
|
||||
pruneAfter: "30d",
|
||||
resetArchiveRetention: "3d",
|
||||
maxEntries: 500,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const now = Date.now();
|
||||
const store: Record<string, SessionEntry> = {
|
||||
fresh: { sessionId: "fresh-session", updatedAt: now },
|
||||
};
|
||||
const oldReset = path.join(
|
||||
testDir,
|
||||
`old-reset.jsonl.reset.${archiveTimestamp(now - 10 * DAY_MS)}`,
|
||||
);
|
||||
const freshReset = path.join(
|
||||
testDir,
|
||||
`fresh-reset.jsonl.reset.${archiveTimestamp(now - 1 * DAY_MS)}`,
|
||||
);
|
||||
await fs.writeFile(oldReset, "old", "utf-8");
|
||||
await fs.writeFile(freshReset, "fresh", "utf-8");
|
||||
|
||||
await saveSessionStore(storePath, store);
|
||||
|
||||
await expect(fs.stat(oldReset)).rejects.toThrow();
|
||||
await expect(fs.stat(freshReset)).resolves.toBeDefined();
|
||||
});
|
||||
|
||||
it("saveSessionStore skips enforcement when maintenance mode is warn", async () => {
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
maintenance: {
|
||||
mode: "warn",
|
||||
pruneAfter: "7d",
|
||||
maxEntries: 1,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const store = createStaleAndFreshStore();
|
||||
|
||||
await saveSessionStore(storePath, store);
|
||||
|
||||
const loaded = loadSessionStore(storePath);
|
||||
expect(loaded.stale).toBeDefined();
|
||||
expect(loaded.fresh).toBeDefined();
|
||||
expect(Object.keys(loaded)).toHaveLength(2);
|
||||
});
|
||||
|
||||
it("loadSessionStore leaves oversized stores untouched during normal reads", async () => {
|
||||
const now = Date.now();
|
||||
const store: Record<string, SessionEntry> = {
|
||||
stale: makeEntry(now - 31 * DAY_MS),
|
||||
recent: makeEntry(now - DAY_MS),
|
||||
newest: makeEntry(now),
|
||||
};
|
||||
await fs.writeFile(storePath, JSON.stringify(store), "utf-8");
|
||||
|
||||
const loaded = loadSessionStore(storePath, {
|
||||
skipCache: true,
|
||||
maintenanceConfig: {
|
||||
...ENFORCED_MAINTENANCE_OVERRIDE,
|
||||
maxEntries: 2,
|
||||
pruneAfterMs: 7 * DAY_MS,
|
||||
},
|
||||
});
|
||||
|
||||
expect(Object.keys(loaded)).toHaveLength(3);
|
||||
expect(loaded.stale).toBeDefined();
|
||||
expect(loaded.recent).toBeDefined();
|
||||
expect(loaded.newest).toBeDefined();
|
||||
});
|
||||
|
||||
it("loadSessionStore applies maintenance only when explicitly requested", async () => {
|
||||
const now = Date.now();
|
||||
const store: Record<string, SessionEntry> = {
|
||||
stale: makeEntry(now - 31 * DAY_MS),
|
||||
recent: makeEntry(now - DAY_MS),
|
||||
newest: makeEntry(now),
|
||||
};
|
||||
await fs.writeFile(storePath, JSON.stringify(store), "utf-8");
|
||||
|
||||
const loaded = loadSessionStore(storePath, {
|
||||
skipCache: true,
|
||||
runMaintenance: true,
|
||||
maintenanceConfig: {
|
||||
...ENFORCED_MAINTENANCE_OVERRIDE,
|
||||
maxEntries: 1,
|
||||
pruneAfterMs: 7 * DAY_MS,
|
||||
},
|
||||
});
|
||||
|
||||
expect(loaded.stale).toBeUndefined();
|
||||
expect(loaded.recent).toBeUndefined();
|
||||
expect(loaded.newest).toBeDefined();
|
||||
});
|
||||
|
||||
it("loadSessionStore does not cap oversized stores during normal reads", async () => {
|
||||
const now = Date.now();
|
||||
const store: Record<string, SessionEntry> = {
|
||||
oldest: makeEntry(now - 3 * DAY_MS),
|
||||
recent: makeEntry(now - DAY_MS),
|
||||
newest: makeEntry(now),
|
||||
};
|
||||
await fs.writeFile(storePath, JSON.stringify(store), "utf-8");
|
||||
|
||||
const loaded = loadSessionStore(storePath, {
|
||||
skipCache: true,
|
||||
maintenanceConfig: {
|
||||
...ENFORCED_MAINTENANCE_OVERRIDE,
|
||||
maxEntries: 2,
|
||||
pruneAfterMs: 365 * DAY_MS,
|
||||
},
|
||||
});
|
||||
|
||||
expect(Object.keys(loaded)).toHaveLength(3);
|
||||
expect(loaded.oldest).toBeDefined();
|
||||
expect(loaded.recent).toBeDefined();
|
||||
expect(loaded.newest).toBeDefined();
|
||||
});
|
||||
|
||||
it("explicit loadSessionStore maintenance batches entry-count cleanup until the high-water mark", async () => {
|
||||
const now = Date.now();
|
||||
const store = Object.fromEntries(
|
||||
Array.from({ length: 51 }, (_, index) => [`session-${index}`, makeEntry(now - index)]),
|
||||
);
|
||||
await fs.writeFile(storePath, JSON.stringify(store), "utf-8");
|
||||
|
||||
const loaded = loadSessionStore(storePath, {
|
||||
skipCache: true,
|
||||
runMaintenance: true,
|
||||
maintenanceConfig: {
|
||||
...ENFORCED_MAINTENANCE_OVERRIDE,
|
||||
maxEntries: 50,
|
||||
pruneAfterMs: 365 * DAY_MS,
|
||||
},
|
||||
});
|
||||
|
||||
expect(Object.keys(loaded)).toHaveLength(51);
|
||||
});
|
||||
|
||||
it("explicit loadSessionStore maintenance caps production-sized stores once they reach the high-water mark", async () => {
|
||||
const now = Date.now();
|
||||
const store = Object.fromEntries(
|
||||
Array.from({ length: 75 }, (_, index) => [`session-${index}`, makeEntry(now - index)]),
|
||||
);
|
||||
await fs.writeFile(storePath, JSON.stringify(store), "utf-8");
|
||||
|
||||
const loaded = loadSessionStore(storePath, {
|
||||
skipCache: true,
|
||||
runMaintenance: true,
|
||||
maintenanceConfig: {
|
||||
...ENFORCED_MAINTENANCE_OVERRIDE,
|
||||
maxEntries: 50,
|
||||
pruneAfterMs: 365 * DAY_MS,
|
||||
},
|
||||
});
|
||||
|
||||
expect(Object.keys(loaded)).toHaveLength(50);
|
||||
expect(loaded["session-0"]).toBeDefined();
|
||||
expect(loaded["session-74"]).toBeUndefined();
|
||||
});
|
||||
|
||||
it("explicit loadSessionStore maintenance preserves channel, thread, and topic session pointers", async () => {
|
||||
const now = Date.now();
|
||||
const channelKey = "agent:main:slack:channel:C123";
|
||||
const threadKey = "agent:main:discord:channel:123456:thread:987654";
|
||||
const topicKey = "agent:main:telegram:group:-100123:topic:77";
|
||||
const store = Object.fromEntries(
|
||||
Array.from({ length: 75 }, (_, index) => [`session-${index}`, makeEntry(now - index)]),
|
||||
);
|
||||
store[channelKey] = makeEntry(now - 99 * DAY_MS);
|
||||
store[threadKey] = makeEntry(now - 100 * DAY_MS);
|
||||
store[topicKey] = makeEntry(now - 101 * DAY_MS);
|
||||
await fs.writeFile(storePath, JSON.stringify(store), "utf-8");
|
||||
|
||||
const loaded = loadSessionStore(storePath, {
|
||||
skipCache: true,
|
||||
runMaintenance: true,
|
||||
maintenanceConfig: {
|
||||
...ENFORCED_MAINTENANCE_OVERRIDE,
|
||||
maxEntries: 50,
|
||||
pruneAfterMs: 365 * DAY_MS,
|
||||
},
|
||||
});
|
||||
|
||||
expect(Object.keys(loaded)).toHaveLength(50);
|
||||
expect(loaded[channelKey]).toBeDefined();
|
||||
expect(loaded[threadKey]).toBeDefined();
|
||||
expect(loaded[topicKey]).toBeDefined();
|
||||
expect(loaded["session-74"]).toBeUndefined();
|
||||
});
|
||||
|
||||
it("updateSessionStore batches cap-hit maintenance instead of pruning every new session", async () => {
|
||||
const now = Date.now();
|
||||
const store = Object.fromEntries(
|
||||
Array.from({ length: 50 }, (_, index) => [`session-${index}`, makeEntry(now - index)]),
|
||||
);
|
||||
await fs.writeFile(storePath, JSON.stringify(store), "utf-8");
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
maintenance: {
|
||||
mode: "enforce",
|
||||
pruneAfter: "365d",
|
||||
maxEntries: 50,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await updateSessionStore(storePath, (next) => {
|
||||
next["session-50"] = makeEntry(now + 1);
|
||||
});
|
||||
|
||||
const loaded = loadSessionStore(storePath, { skipCache: true });
|
||||
expect(Object.keys(loaded)).toHaveLength(51);
|
||||
expect(loaded["session-50"]).toBeDefined();
|
||||
});
|
||||
|
||||
it("loadSessionStore honors configured maxEntries without an explicit override", async () => {
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
maintenance: {
|
||||
mode: "enforce",
|
||||
pruneAfter: "365d",
|
||||
maxEntries: 1000,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const now = Date.now();
|
||||
const store = Object.fromEntries(
|
||||
Array.from({ length: 501 }, (_, index) => [`session-${index}`, makeEntry(now - index)]),
|
||||
);
|
||||
await fs.writeFile(storePath, JSON.stringify(store), "utf-8");
|
||||
|
||||
const loaded = loadSessionStore(storePath, { skipCache: true });
|
||||
|
||||
expect(Object.keys(loaded)).toHaveLength(501);
|
||||
});
|
||||
|
||||
it("loadSessionStore honors configured warn mode without an explicit override", async () => {
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
maintenance: {
|
||||
mode: "warn",
|
||||
pruneAfter: "365d",
|
||||
maxEntries: 1,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const now = Date.now();
|
||||
const store: Record<string, SessionEntry> = {
|
||||
oldest: makeEntry(now - DAY_MS),
|
||||
newest: makeEntry(now),
|
||||
};
|
||||
await fs.writeFile(storePath, JSON.stringify(store), "utf-8");
|
||||
|
||||
const loaded = loadSessionStore(storePath, { skipCache: true });
|
||||
|
||||
expect(Object.keys(loaded)).toHaveLength(2);
|
||||
expect(loaded.oldest).toBeDefined();
|
||||
expect(loaded.newest).toBeDefined();
|
||||
});
|
||||
|
||||
it("archives transcript files for entries evicted by maxEntries capping", async () => {
|
||||
applyCappedMaintenanceConfig(mockLoadConfig);
|
||||
|
||||
const now = Date.now();
|
||||
const oldestSessionId = "oldest-session";
|
||||
const newestSessionId = "newest-session";
|
||||
const store: Record<string, SessionEntry> = {
|
||||
oldest: { sessionId: oldestSessionId, updatedAt: now - DAY_MS },
|
||||
newest: { sessionId: newestSessionId, updatedAt: now },
|
||||
};
|
||||
const oldestTranscript = path.join(testDir, `${oldestSessionId}.jsonl`);
|
||||
const newestTranscript = path.join(testDir, `${newestSessionId}.jsonl`);
|
||||
await fs.writeFile(oldestTranscript, '{"type":"session"}\n', "utf-8");
|
||||
await fs.writeFile(newestTranscript, '{"type":"session"}\n', "utf-8");
|
||||
|
||||
await saveSessionStore(storePath, store);
|
||||
|
||||
const loaded = loadSessionStore(storePath);
|
||||
expect(loaded.oldest).toBeUndefined();
|
||||
expect(loaded.newest).toBeDefined();
|
||||
await expect(fs.stat(oldestTranscript)).rejects.toThrow();
|
||||
await expect(fs.stat(newestTranscript)).resolves.toBeDefined();
|
||||
const files = await fs.readdir(testDir);
|
||||
expect(files.some((name) => name.startsWith(`${oldestSessionId}.jsonl.deleted.`))).toBe(true);
|
||||
});
|
||||
|
||||
it("does not archive external transcript paths when capping entries", async () => {
|
||||
applyCappedMaintenanceConfig(mockLoadConfig);
|
||||
|
||||
const now = Date.now();
|
||||
const externalDir = await createCaseDir("external-cap");
|
||||
const externalTranscript = path.join(externalDir, "outside.jsonl");
|
||||
await fs.writeFile(externalTranscript, "external", "utf-8");
|
||||
const store: Record<string, SessionEntry> = {
|
||||
oldest: {
|
||||
sessionId: "outside",
|
||||
sessionFile: externalTranscript,
|
||||
updatedAt: now - DAY_MS,
|
||||
},
|
||||
newest: { sessionId: "inside", updatedAt: now },
|
||||
};
|
||||
await fs.writeFile(path.join(testDir, "inside.jsonl"), '{"type":"session"}\n', "utf-8");
|
||||
|
||||
try {
|
||||
await saveSessionStore(storePath, store);
|
||||
const loaded = loadSessionStore(storePath);
|
||||
expect(loaded.oldest).toBeUndefined();
|
||||
expect(loaded.newest).toBeDefined();
|
||||
await expect(fs.stat(externalTranscript)).resolves.toBeDefined();
|
||||
} finally {
|
||||
await expect(fs.stat(externalTranscript)).resolves.toBeDefined();
|
||||
}
|
||||
});
|
||||
|
||||
it("enforces maxDiskBytes with oldest-first session eviction", async () => {
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
maintenance: {
|
||||
mode: "enforce",
|
||||
pruneAfter: "365d",
|
||||
maxEntries: 100,
|
||||
maxDiskBytes: 900,
|
||||
highWaterBytes: 700,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const now = Date.now();
|
||||
const oldSessionId = "old-disk-session";
|
||||
const newSessionId = "new-disk-session";
|
||||
const store: Record<string, SessionEntry> = {
|
||||
old: { sessionId: oldSessionId, updatedAt: now - DAY_MS },
|
||||
recent: { sessionId: newSessionId, updatedAt: now },
|
||||
};
|
||||
await fs.writeFile(path.join(testDir, `${oldSessionId}.jsonl`), "x".repeat(500), "utf-8");
|
||||
await fs.writeFile(path.join(testDir, `${newSessionId}.jsonl`), "y".repeat(500), "utf-8");
|
||||
|
||||
await saveSessionStore(storePath, store);
|
||||
|
||||
const loaded = loadSessionStore(storePath);
|
||||
expect(Object.keys(loaded).length).toBe(1);
|
||||
expect(loaded.recent).toBeDefined();
|
||||
await expect(fs.stat(path.join(testDir, `${oldSessionId}.jsonl`))).rejects.toThrow();
|
||||
await expect(fs.stat(path.join(testDir, `${newSessionId}.jsonl`))).resolves.toBeDefined();
|
||||
});
|
||||
|
||||
it("uses projected sessions.json size to avoid over-eviction", async () => {
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
maintenance: {
|
||||
mode: "enforce",
|
||||
pruneAfter: "365d",
|
||||
maxEntries: 100,
|
||||
maxDiskBytes: 900,
|
||||
highWaterBytes: 700,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
// Simulate a stale oversized on-disk sessions.json from a previous write.
|
||||
await fs.writeFile(storePath, JSON.stringify({ noisy: "x".repeat(10_000) }), "utf-8");
|
||||
|
||||
const now = Date.now();
|
||||
const store: Record<string, SessionEntry> = {
|
||||
older: { sessionId: "older", updatedAt: now - DAY_MS },
|
||||
newer: { sessionId: "newer", updatedAt: now },
|
||||
};
|
||||
await fs.writeFile(path.join(testDir, "older.jsonl"), "x".repeat(80), "utf-8");
|
||||
await fs.writeFile(path.join(testDir, "newer.jsonl"), "y".repeat(80), "utf-8");
|
||||
|
||||
await saveSessionStore(storePath, store);
|
||||
|
||||
const loaded = loadSessionStore(storePath);
|
||||
expect(loaded.older).toBeDefined();
|
||||
expect(loaded.newer).toBeDefined();
|
||||
});
|
||||
|
||||
it("does not create rotation backups for hot oversized store writes", async () => {
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
maintenance: {
|
||||
mode: "enforce",
|
||||
pruneAfter: "365d",
|
||||
maxEntries: 100,
|
||||
rotateBytes: 200,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
let now = 1_800_000_000_000;
|
||||
const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => (now += 1000));
|
||||
try {
|
||||
const store: Record<string, SessionEntry> = {
|
||||
hot: {
|
||||
sessionId: "hot-session",
|
||||
updatedAt: Date.now(),
|
||||
pluginExtensions: { test: { payload: "x".repeat(1000) } },
|
||||
},
|
||||
};
|
||||
|
||||
for (let i = 0; i < 5; i++) {
|
||||
store.hot.updatedAt = Date.now();
|
||||
store.hot.pluginExtensions = { test: { payload: "x".repeat(1000), write: i } };
|
||||
await saveSessionStore(storePath, store);
|
||||
}
|
||||
} finally {
|
||||
nowSpy.mockRestore();
|
||||
}
|
||||
|
||||
const files = await fs.readdir(testDir);
|
||||
const backups = files.filter((file) => file.startsWith("sessions.json.bak."));
|
||||
expect(backups).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("does not create rotation backups for destructive maintenance rewrites", async () => {
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
maintenance: {
|
||||
mode: "enforce",
|
||||
pruneAfter: "365d",
|
||||
maxEntries: 1,
|
||||
rotateBytes: 200,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const now = Date.now();
|
||||
const store: Record<string, SessionEntry> = {
|
||||
old: {
|
||||
sessionId: "old-session",
|
||||
updatedAt: now - DAY_MS,
|
||||
pluginExtensions: { test: { payload: "x".repeat(1000) } },
|
||||
},
|
||||
fresh: {
|
||||
sessionId: "fresh-session",
|
||||
updatedAt: now,
|
||||
pluginExtensions: { test: { payload: "y".repeat(1000) } },
|
||||
},
|
||||
};
|
||||
await fs.writeFile(storePath, JSON.stringify(store, null, 2), "utf-8");
|
||||
|
||||
await saveSessionStore(
|
||||
storePath,
|
||||
JSON.parse(JSON.stringify(store)) as Record<string, SessionEntry>,
|
||||
);
|
||||
|
||||
const files = await fs.readdir(testDir);
|
||||
const backups = files.filter((file) => file.startsWith("sessions.json.bak."));
|
||||
expect(backups).toHaveLength(0);
|
||||
const loaded = loadSessionStore(storePath, { skipCache: true });
|
||||
expect(loaded.old).toBeUndefined();
|
||||
expect(loaded.fresh).toBeDefined();
|
||||
});
|
||||
|
||||
it("never deletes transcripts outside the agent sessions directory during budget cleanup", async () => {
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
maintenance: {
|
||||
mode: "enforce",
|
||||
pruneAfter: "365d",
|
||||
maxEntries: 100,
|
||||
maxDiskBytes: 500,
|
||||
highWaterBytes: 300,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const now = Date.now();
|
||||
const externalDir = await createCaseDir("external-session");
|
||||
const externalTranscript = path.join(externalDir, "outside.jsonl");
|
||||
await fs.writeFile(externalTranscript, "z".repeat(400), "utf-8");
|
||||
|
||||
const store: Record<string, SessionEntry> = {
|
||||
older: {
|
||||
sessionId: "outside",
|
||||
sessionFile: externalTranscript,
|
||||
updatedAt: now - DAY_MS,
|
||||
},
|
||||
newer: {
|
||||
sessionId: "inside",
|
||||
updatedAt: now,
|
||||
},
|
||||
};
|
||||
await fs.writeFile(path.join(testDir, "inside.jsonl"), "i".repeat(400), "utf-8");
|
||||
|
||||
try {
|
||||
await saveSessionStore(storePath, store);
|
||||
await expect(fs.stat(externalTranscript)).resolves.toBeDefined();
|
||||
} finally {
|
||||
await expect(fs.stat(externalTranscript)).resolves.toBeDefined();
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -32,7 +32,6 @@ import {
|
||||
capEntryCount,
|
||||
getActiveSessionMaintenanceWarning,
|
||||
pruneStaleEntries,
|
||||
shouldRunSessionEntryMaintenance,
|
||||
type ResolvedSessionMaintenanceConfig,
|
||||
type SessionMaintenanceWarning,
|
||||
} from "./store-maintenance.js";
|
||||
@@ -57,19 +56,12 @@ const log = createSubsystemLogger("sessions/store");
|
||||
let sessionArchiveRuntimePromise: Promise<
|
||||
typeof import("../../gateway/session-archive.runtime.js")
|
||||
> | null = null;
|
||||
let trajectoryCleanupRuntimePromise: Promise<typeof import("../../trajectory/cleanup.js")> | null =
|
||||
null;
|
||||
|
||||
function loadSessionArchiveRuntime() {
|
||||
sessionArchiveRuntimePromise ??= import("../../gateway/session-archive.runtime.js");
|
||||
return sessionArchiveRuntimePromise;
|
||||
}
|
||||
|
||||
function loadTrajectoryCleanupRuntime() {
|
||||
trajectoryCleanupRuntimePromise ??= import("../../trajectory/cleanup.js");
|
||||
return trajectoryCleanupRuntimePromise;
|
||||
}
|
||||
|
||||
function removeThreadFromDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined {
|
||||
if (!context || context.threadId == null) {
|
||||
return context;
|
||||
@@ -92,19 +84,6 @@ export function readSessionUpdatedAt(params: {
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Session Store Pruning, Capping & File Rotation
|
||||
// ============================================================================
|
||||
|
||||
export type SessionMaintenanceApplyReport = {
|
||||
mode: ResolvedSessionMaintenanceConfig["mode"];
|
||||
beforeCount: number;
|
||||
afterCount: number;
|
||||
pruned: number;
|
||||
capped: number;
|
||||
diskBudget: SessionDiskBudgetSweepResult | null;
|
||||
};
|
||||
|
||||
export {
|
||||
capEntryCount,
|
||||
getActiveSessionMaintenanceWarning,
|
||||
@@ -114,9 +93,9 @@ export {
|
||||
export type { ResolvedSessionMaintenanceConfig, SessionMaintenanceWarning };
|
||||
|
||||
type SaveSessionStoreOptions = {
|
||||
/** Skip pruning, capping, and rotation (e.g. during one-time migrations). */
|
||||
/** Deprecated no-op retained for callers that still pass migration-era options. */
|
||||
skipMaintenance?: boolean;
|
||||
/** Active session key for warn-only maintenance. */
|
||||
/** Deprecated no-op retained for callers that still pass migration-era options. */
|
||||
activeSessionKey?: string;
|
||||
/**
|
||||
* Session keys that are allowed to drop persisted ACP metadata during this update.
|
||||
@@ -124,14 +103,6 @@ type SaveSessionStoreOptions = {
|
||||
* whole session entry without carrying ACP state forward.
|
||||
*/
|
||||
allowDropAcpMetaSessionKeys?: string[];
|
||||
/** Optional callback for warn-only maintenance. */
|
||||
onWarn?: (warning: SessionMaintenanceWarning) => void | Promise<void>;
|
||||
/** Optional callback with maintenance stats after a save. */
|
||||
onMaintenanceApplied?: (report: SessionMaintenanceApplyReport) => void | Promise<void>;
|
||||
/** Optional overrides used by maintenance commands. */
|
||||
maintenanceOverride?: Partial<ResolvedSessionMaintenanceConfig>;
|
||||
/** Fully resolved maintenance settings when the caller already has config loaded. */
|
||||
maintenanceConfig?: ResolvedSessionMaintenanceConfig;
|
||||
};
|
||||
|
||||
function updateSessionStoreWriteCaches(params: {
|
||||
@@ -230,149 +201,10 @@ function preserveExistingAcpMetadata(params: {
|
||||
async function saveSessionStoreUnlocked(
|
||||
storePath: string,
|
||||
store: Record<string, SessionEntry>,
|
||||
opts?: SaveSessionStoreOptions,
|
||||
_opts?: SaveSessionStoreOptions,
|
||||
): Promise<void> {
|
||||
normalizeSessionStore(store);
|
||||
|
||||
if (!opts?.skipMaintenance) {
|
||||
// Resolve maintenance config once (avoids repeated getRuntimeConfig() calls).
|
||||
const maintenance = opts?.maintenanceConfig
|
||||
? { ...opts.maintenanceConfig, ...opts?.maintenanceOverride }
|
||||
: { ...resolveMaintenanceConfig(), ...opts?.maintenanceOverride };
|
||||
const shouldWarnOnly = maintenance.mode === "warn";
|
||||
const beforeCount = Object.keys(store).length;
|
||||
const forceMaintenance = opts?.maintenanceOverride !== undefined;
|
||||
const shouldRunEntryMaintenance = shouldRunSessionEntryMaintenance({
|
||||
entryCount: beforeCount,
|
||||
maxEntries: maintenance.maxEntries,
|
||||
force: forceMaintenance,
|
||||
});
|
||||
|
||||
if (shouldWarnOnly) {
|
||||
const activeSessionKey = opts?.activeSessionKey?.trim();
|
||||
if (activeSessionKey && shouldRunEntryMaintenance) {
|
||||
const warning = getActiveSessionMaintenanceWarning({
|
||||
store,
|
||||
activeSessionKey,
|
||||
pruneAfterMs: maintenance.pruneAfterMs,
|
||||
maxEntries: maintenance.maxEntries,
|
||||
});
|
||||
if (warning) {
|
||||
log.warn("session maintenance would evict active session; skipping enforcement", {
|
||||
activeSessionKey: warning.activeSessionKey,
|
||||
wouldPrune: warning.wouldPrune,
|
||||
wouldCap: warning.wouldCap,
|
||||
pruneAfterMs: warning.pruneAfterMs,
|
||||
maxEntries: warning.maxEntries,
|
||||
});
|
||||
await opts?.onWarn?.(warning);
|
||||
}
|
||||
}
|
||||
const diskBudget = await enforceSessionDiskBudget({
|
||||
store,
|
||||
storePath,
|
||||
activeSessionKey: opts?.activeSessionKey,
|
||||
maintenance,
|
||||
warnOnly: true,
|
||||
log,
|
||||
});
|
||||
await opts?.onMaintenanceApplied?.({
|
||||
mode: maintenance.mode,
|
||||
beforeCount,
|
||||
afterCount: Object.keys(store).length,
|
||||
pruned: 0,
|
||||
capped: 0,
|
||||
diskBudget,
|
||||
});
|
||||
} else {
|
||||
const preserveSessionKeys = opts?.activeSessionKey
|
||||
? new Set([opts.activeSessionKey])
|
||||
: undefined;
|
||||
// Prune stale entries and cap total count before serializing.
|
||||
const removedSessionFiles = new Map<string, string | undefined>();
|
||||
const pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, {
|
||||
onPruned: ({ entry }) => {
|
||||
rememberRemovedSessionFile(removedSessionFiles, entry);
|
||||
},
|
||||
preserveKeys: preserveSessionKeys,
|
||||
});
|
||||
const countAfterPrune = Object.keys(store).length;
|
||||
const shouldRunCapMaintenance =
|
||||
forceMaintenance ||
|
||||
shouldRunSessionEntryMaintenance({
|
||||
entryCount: countAfterPrune,
|
||||
maxEntries: maintenance.maxEntries,
|
||||
});
|
||||
const capped = shouldRunCapMaintenance
|
||||
? capEntryCount(store, maintenance.maxEntries, {
|
||||
onCapped: ({ entry }) => {
|
||||
rememberRemovedSessionFile(removedSessionFiles, entry);
|
||||
},
|
||||
preserveKeys: preserveSessionKeys,
|
||||
})
|
||||
: 0;
|
||||
const archivedDirs = new Set<string>();
|
||||
const referencedSessionIds = new Set(
|
||||
Object.values(store)
|
||||
.map((entry) => entry?.sessionId)
|
||||
.filter((id): id is string => Boolean(id)),
|
||||
);
|
||||
const archivedForDeletedSessions = await archiveRemovedSessionTranscripts({
|
||||
removedSessionFiles,
|
||||
referencedSessionIds,
|
||||
storePath,
|
||||
reason: "deleted",
|
||||
restrictToStoreDir: true,
|
||||
});
|
||||
if (removedSessionFiles.size > 0) {
|
||||
const { removeRemovedSessionTrajectoryArtifacts } = await loadTrajectoryCleanupRuntime();
|
||||
await removeRemovedSessionTrajectoryArtifacts({
|
||||
removedSessionFiles,
|
||||
referencedSessionIds,
|
||||
storePath,
|
||||
restrictToStoreDir: true,
|
||||
});
|
||||
}
|
||||
for (const archivedDir of archivedForDeletedSessions) {
|
||||
archivedDirs.add(archivedDir);
|
||||
}
|
||||
if (archivedDirs.size > 0 || maintenance.resetArchiveRetentionMs != null) {
|
||||
const { cleanupArchivedSessionTranscripts } = await loadSessionArchiveRuntime();
|
||||
const targetDirs =
|
||||
archivedDirs.size > 0 ? [...archivedDirs] : [path.dirname(path.resolve(storePath))];
|
||||
await cleanupArchivedSessionTranscripts({
|
||||
directories: targetDirs,
|
||||
olderThanMs: maintenance.pruneAfterMs,
|
||||
reason: "deleted",
|
||||
});
|
||||
if (maintenance.resetArchiveRetentionMs != null) {
|
||||
await cleanupArchivedSessionTranscripts({
|
||||
directories: targetDirs,
|
||||
olderThanMs: maintenance.resetArchiveRetentionMs,
|
||||
reason: "reset",
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const diskBudget = await enforceSessionDiskBudget({
|
||||
store,
|
||||
storePath,
|
||||
activeSessionKey: opts?.activeSessionKey,
|
||||
maintenance,
|
||||
warnOnly: false,
|
||||
log,
|
||||
});
|
||||
await opts?.onMaintenanceApplied?.({
|
||||
mode: maintenance.mode,
|
||||
beforeCount,
|
||||
afterCount: Object.keys(store).length,
|
||||
pruned,
|
||||
capped,
|
||||
diskBudget,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
await fs.promises.mkdir(path.dirname(storePath), { recursive: true });
|
||||
const json = JSON.stringify(store, null, 2);
|
||||
const sqliteOptions = resolveSqliteSessionStoreOptionsForPath(storePath);
|
||||
@@ -470,15 +302,6 @@ function getErrorCode(error: unknown): string | null {
|
||||
return String((error as { code?: unknown }).code);
|
||||
}
|
||||
|
||||
function rememberRemovedSessionFile(
|
||||
removedSessionFiles: Map<string, string | undefined>,
|
||||
entry: SessionEntry,
|
||||
): void {
|
||||
if (!removedSessionFiles.has(entry.sessionId) || entry.sessionFile) {
|
||||
removedSessionFiles.set(entry.sessionId, entry.sessionFile);
|
||||
}
|
||||
}
|
||||
|
||||
export async function archiveRemovedSessionTranscripts(params: {
|
||||
removedSessionFiles: Iterable<[string, string | undefined]>;
|
||||
referencedSessionIds: ReadonlySet<string>;
|
||||
|
||||
@@ -12,7 +12,7 @@ export function useTempSessionsFixture(prefix: string) {
|
||||
tempDir = fs.mkdtempSync(path.join(os.tmpdir(), prefix));
|
||||
sessionsDir = path.join(tempDir, "agents", "main", "sessions");
|
||||
fs.mkdirSync(sessionsDir, { recursive: true });
|
||||
storePath = path.join(sessionsDir, "sessions.json");
|
||||
storePath = path.join(tempDir, "session-store.json");
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
|
||||
@@ -101,7 +101,6 @@ const resolveOpenClawPackageRootSync = vi.hoisted(() => vi.fn((_params: unknown)
|
||||
const runChannelPluginStartupMaintenance = vi.hoisted(() =>
|
||||
vi.fn(async (_params: unknown) => undefined),
|
||||
);
|
||||
const runStartupSessionMigration = vi.hoisted(() => vi.fn(async (_params: unknown) => undefined));
|
||||
vi.mock("../agents/agent-scope.js", () => ({
|
||||
resolveAgentWorkspaceDir: () => "/workspace",
|
||||
resolveDefaultAgentId: () => "default",
|
||||
@@ -149,10 +148,6 @@ vi.mock("./server-plugin-bootstrap.js", () => ({
|
||||
loadGatewayStartupPlugins: (params: unknown) => loadGatewayStartupPlugins(params),
|
||||
}));
|
||||
|
||||
vi.mock("./server-startup-session-migration.js", () => ({
|
||||
runStartupSessionMigration: (params: unknown) => runStartupSessionMigration(params),
|
||||
}));
|
||||
|
||||
function createLog() {
|
||||
return {
|
||||
info: vi.fn(),
|
||||
@@ -177,7 +172,6 @@ describe("prepareGatewayPluginBootstrap startup plugins", () => {
|
||||
});
|
||||
resolveOpenClawPackageRootSync.mockClear().mockReturnValue("/package");
|
||||
runChannelPluginStartupMaintenance.mockClear();
|
||||
runStartupSessionMigration.mockClear();
|
||||
});
|
||||
it("derives startup activation from source config instead of runtime plugin defaults", async () => {
|
||||
const sourceConfig = {
|
||||
|
||||
@@ -53,24 +53,11 @@ export async function prepareGatewayPluginBootstrap(params: {
|
||||
if (shouldRunStartupMaintenance) {
|
||||
const { runChannelPluginStartupMaintenance } =
|
||||
await import("../channels/plugins/lifecycle-startup.js");
|
||||
const startupTasks = [
|
||||
runChannelPluginStartupMaintenance({
|
||||
cfg: startupMaintenanceConfig,
|
||||
env: process.env,
|
||||
log: params.log,
|
||||
}),
|
||||
];
|
||||
if (!params.minimalTestGateway) {
|
||||
const { runStartupSessionMigration } = await import("./server-startup-session-migration.js");
|
||||
startupTasks.push(
|
||||
runStartupSessionMigration({
|
||||
cfg: params.cfgAtStart,
|
||||
env: process.env,
|
||||
log: params.log,
|
||||
}),
|
||||
);
|
||||
}
|
||||
await Promise.all(startupTasks);
|
||||
await runChannelPluginStartupMaintenance({
|
||||
cfg: startupMaintenanceConfig,
|
||||
env: process.env,
|
||||
log: params.log,
|
||||
});
|
||||
}
|
||||
|
||||
initSubagentRegistry();
|
||||
|
||||
@@ -1,75 +0,0 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { runStartupSessionMigration } from "./server-startup-session-migration.js";
|
||||
|
||||
function makeLog() {
|
||||
return {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
function makeCfg() {
|
||||
return { agents: { defaults: {} }, session: {} } as Parameters<
|
||||
typeof runStartupSessionMigration
|
||||
>[0]["cfg"];
|
||||
}
|
||||
|
||||
describe("runStartupSessionMigration", () => {
|
||||
it("logs changes when orphaned keys are canonicalized", async () => {
|
||||
const log = makeLog();
|
||||
const migrate = vi.fn().mockResolvedValue({
|
||||
changes: ["Canonicalized 2 orphaned session key(s) in /tmp/store.json"],
|
||||
warnings: [],
|
||||
});
|
||||
await runStartupSessionMigration({
|
||||
cfg: makeCfg(),
|
||||
log,
|
||||
deps: { migrateOrphanedSessionKeys: migrate },
|
||||
});
|
||||
expect(migrate).toHaveBeenCalledOnce();
|
||||
expect(log.info).toHaveBeenCalledOnce();
|
||||
expect(log.info.mock.calls[0][0]).toContain("canonicalized orphaned session keys");
|
||||
expect(log.warn).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("logs warnings from migration", async () => {
|
||||
const log = makeLog();
|
||||
const migrate = vi.fn().mockResolvedValue({
|
||||
changes: [],
|
||||
warnings: ["Could not read /bad/path: ENOENT"],
|
||||
});
|
||||
await runStartupSessionMigration({
|
||||
cfg: makeCfg(),
|
||||
log,
|
||||
deps: { migrateOrphanedSessionKeys: migrate },
|
||||
});
|
||||
expect(log.info).not.toHaveBeenCalled();
|
||||
expect(log.warn).toHaveBeenCalledOnce();
|
||||
expect(log.warn.mock.calls[0][0]).toContain("session key migration warnings");
|
||||
});
|
||||
|
||||
it("silently continues when no changes needed", async () => {
|
||||
const log = makeLog();
|
||||
const migrate = vi.fn().mockResolvedValue({ changes: [], warnings: [] });
|
||||
await runStartupSessionMigration({
|
||||
cfg: makeCfg(),
|
||||
log,
|
||||
deps: { migrateOrphanedSessionKeys: migrate },
|
||||
});
|
||||
expect(log.info).not.toHaveBeenCalled();
|
||||
expect(log.warn).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("catches and logs migration errors without throwing", async () => {
|
||||
const log = makeLog();
|
||||
const migrate = vi.fn().mockRejectedValue(new Error("disk full"));
|
||||
await runStartupSessionMigration({
|
||||
cfg: makeCfg(),
|
||||
log,
|
||||
deps: { migrateOrphanedSessionKeys: migrate },
|
||||
});
|
||||
expect(log.warn).toHaveBeenCalledOnce();
|
||||
expect(log.warn.mock.calls[0][0]).toContain("migration failed during startup");
|
||||
expect(log.warn.mock.calls[0][0]).toContain("disk full");
|
||||
});
|
||||
});
|
||||
@@ -1,46 +0,0 @@
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { migrateOrphanedSessionKeys } from "../infra/state-migrations.js";
|
||||
|
||||
type SessionMigrationLogger = {
|
||||
info: (message: string) => void;
|
||||
warn: (message: string) => void;
|
||||
};
|
||||
|
||||
/**
|
||||
* Run orphan-key session migration at gateway startup.
|
||||
*
|
||||
* Idempotent and best-effort: if the migration fails, gateway startup
|
||||
* continues normally. This ensures accumulated orphaned session keys
|
||||
* (from the write-path bug #29683) are cleaned up automatically on
|
||||
* upgrade rather than requiring a manual `openclaw doctor` run.
|
||||
*/
|
||||
export async function runStartupSessionMigration(params: {
|
||||
cfg: OpenClawConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
log: SessionMigrationLogger;
|
||||
deps?: {
|
||||
migrateOrphanedSessionKeys?: typeof migrateOrphanedSessionKeys;
|
||||
};
|
||||
}): Promise<void> {
|
||||
const migrate = params.deps?.migrateOrphanedSessionKeys ?? migrateOrphanedSessionKeys;
|
||||
try {
|
||||
const result = await migrate({
|
||||
cfg: params.cfg,
|
||||
env: params.env ?? process.env,
|
||||
});
|
||||
if (result.changes.length > 0) {
|
||||
params.log.info(
|
||||
`gateway: canonicalized orphaned session keys:\n${result.changes.map((c) => `- ${c}`).join("\n")}`,
|
||||
);
|
||||
}
|
||||
if (result.warnings.length > 0) {
|
||||
params.log.warn(
|
||||
`gateway: session key migration warnings:\n${result.warnings.map((w) => `- ${w}`).join("\n")}`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
params.log.warn(
|
||||
`gateway: orphaned session key migration failed during startup; continuing: ${String(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
import { resolveStorePath } from "../config/sessions/paths.js";
|
||||
import { loadSessionStore } from "../config/sessions/store-load.js";
|
||||
import { resolveMaintenanceConfigFromInput } from "../config/sessions/store-maintenance.js";
|
||||
import type { SessionEntry } from "../config/sessions/types.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import { normalizeOptionalAccountId } from "../routing/account-id.js";
|
||||
@@ -37,9 +36,7 @@ export function resolvePersistedApprovalRequestSessionEntry(params: {
|
||||
const parsed = parseAgentSessionKey(sessionKey);
|
||||
const agentId = parsed?.agentId ?? params.request.request.agentId ?? "main";
|
||||
const storePath = resolveStorePath(params.cfg.session?.store, { agentId });
|
||||
const store = loadSessionStore(storePath, {
|
||||
maintenanceConfig: resolveMaintenanceConfigFromInput(params.cfg.session?.maintenance),
|
||||
});
|
||||
const store = loadSessionStore(storePath);
|
||||
const entry = store[sessionKey];
|
||||
if (!entry) {
|
||||
return null;
|
||||
|
||||
@@ -1,185 +0,0 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { withTempDir } from "../test-helpers/temp-dir.js";
|
||||
import { migrateOrphanedSessionKeys } from "./state-migrations.js";
|
||||
|
||||
function writeStore(storePath: string, store: Record<string, unknown>): void {
|
||||
fs.mkdirSync(path.dirname(storePath), { recursive: true });
|
||||
fs.writeFileSync(storePath, JSON.stringify(store));
|
||||
}
|
||||
|
||||
function readStore(storePath: string): Record<string, unknown> {
|
||||
return JSON.parse(fs.readFileSync(storePath, "utf-8"));
|
||||
}
|
||||
|
||||
async function withStateFixture(
|
||||
run: (params: { tmpDir: string; stateDir: string }) => Promise<void>,
|
||||
): Promise<void> {
|
||||
await withTempDir({ prefix: "orphan-keys-test-" }, async (tmpDir) => {
|
||||
const stateDir = path.join(tmpDir, ".openclaw");
|
||||
fs.mkdirSync(stateDir, { recursive: true });
|
||||
await run({ tmpDir, stateDir });
|
||||
});
|
||||
}
|
||||
|
||||
const OPS_WORK_CONFIG = {
|
||||
session: { mainKey: "work" },
|
||||
agents: { list: [{ id: "ops", default: true }] },
|
||||
} as OpenClawConfig;
|
||||
|
||||
function opsSessionStorePath(stateDir: string): string {
|
||||
return path.join(stateDir, "agents", "ops", "sessions", "sessions.json");
|
||||
}
|
||||
|
||||
function sharedMainOpsConfig(sharedStorePath: string): OpenClawConfig {
|
||||
return {
|
||||
session: { mainKey: "work", store: sharedStorePath },
|
||||
agents: { list: [{ id: "main" }, { id: "ops", default: true }] },
|
||||
} as OpenClawConfig;
|
||||
}
|
||||
|
||||
async function migrateFixtureState(stateDir: string, cfg: OpenClawConfig = OPS_WORK_CONFIG) {
|
||||
return migrateOrphanedSessionKeys({
|
||||
cfg,
|
||||
env: { OPENCLAW_STATE_DIR: stateDir },
|
||||
});
|
||||
}
|
||||
|
||||
describe("migrateOrphanedSessionKeys", () => {
|
||||
it("renames orphaned raw key to canonical form", async () => {
|
||||
await withStateFixture(async ({ stateDir }) => {
|
||||
const storePath = opsSessionStorePath(stateDir);
|
||||
writeStore(storePath, {
|
||||
"agent:main:main": { sessionId: "abc-123", updatedAt: 1000 },
|
||||
});
|
||||
|
||||
const result = await migrateFixtureState(stateDir);
|
||||
|
||||
expect(result.changes.length).toBeGreaterThan(0);
|
||||
const store = readStore(storePath);
|
||||
expect(store["agent:ops:work"]).toBeDefined();
|
||||
expect((store["agent:ops:work"] as { sessionId: string }).sessionId).toBe("abc-123");
|
||||
expect(store["agent:main:main"]).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps most recently updated entry when both orphan and canonical exist", async () => {
|
||||
await withStateFixture(async ({ stateDir }) => {
|
||||
const storePath = opsSessionStorePath(stateDir);
|
||||
writeStore(storePath, {
|
||||
"agent:main:main": { sessionId: "old-orphan", updatedAt: 500 },
|
||||
"agent:ops:work": { sessionId: "current", updatedAt: 2000 },
|
||||
});
|
||||
|
||||
await migrateFixtureState(stateDir);
|
||||
|
||||
const store = readStore(storePath);
|
||||
expect((store["agent:ops:work"] as { sessionId: string }).sessionId).toBe("current");
|
||||
expect(store["agent:main:main"]).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
it("skips stores that are already fully canonical", async () => {
|
||||
await withStateFixture(async ({ stateDir }) => {
|
||||
const storePath = opsSessionStorePath(stateDir);
|
||||
writeStore(storePath, {
|
||||
"agent:ops:work": { sessionId: "abc-123", updatedAt: 1000 },
|
||||
});
|
||||
|
||||
const result = await migrateFixtureState(stateDir);
|
||||
|
||||
expect(result.changes).toHaveLength(0);
|
||||
expect(result.warnings).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
it("handles missing store files gracefully", async () => {
|
||||
await withStateFixture(async ({ stateDir }) => {
|
||||
const result = await migrateFixtureState(stateDir);
|
||||
|
||||
expect(result.changes).toHaveLength(0);
|
||||
expect(result.warnings).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
it("is idempotent — running twice produces same result", async () => {
|
||||
await withStateFixture(async ({ stateDir }) => {
|
||||
const storePath = opsSessionStorePath(stateDir);
|
||||
writeStore(storePath, {
|
||||
"agent:main:main": { sessionId: "abc-123", updatedAt: 1000 },
|
||||
});
|
||||
|
||||
const env = { OPENCLAW_STATE_DIR: stateDir };
|
||||
await migrateOrphanedSessionKeys({ cfg: OPS_WORK_CONFIG, env });
|
||||
const result2 = await migrateOrphanedSessionKeys({ cfg: OPS_WORK_CONFIG, env });
|
||||
|
||||
expect(result2.changes).toHaveLength(0);
|
||||
const store = readStore(storePath);
|
||||
expect((store["agent:ops:work"] as { sessionId: string }).sessionId).toBe("abc-123");
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves legitimate agent:main:* keys in shared stores with both main and non-main agents", async () => {
|
||||
await withStateFixture(async ({ tmpDir, stateDir }) => {
|
||||
// When session.store lacks {agentId}, all agents resolve to the same file.
|
||||
// The "main" agent's keys must not be remapped into the "ops" namespace.
|
||||
const sharedStorePath = path.join(tmpDir, "shared-sessions.json");
|
||||
writeStore(sharedStorePath, {
|
||||
"agent:main:main": { sessionId: "main-session", updatedAt: 2000 },
|
||||
"agent:ops:work": { sessionId: "ops-session", updatedAt: 1000 },
|
||||
});
|
||||
|
||||
await migrateFixtureState(stateDir, sharedMainOpsConfig(sharedStorePath));
|
||||
|
||||
const store = readStore(sharedStorePath);
|
||||
// main agent's session is canonicalised to use configured mainKey ("work"),
|
||||
// but stays in the "main" agent namespace — NOT remapped into "ops".
|
||||
expect(store["agent:main:work"]).toBeDefined();
|
||||
expect((store["agent:main:work"] as { sessionId: string }).sessionId).toBe("main-session");
|
||||
expect(store["agent:ops:work"]).toBeDefined();
|
||||
expect((store["agent:ops:work"] as { sessionId: string }).sessionId).toBe("ops-session");
|
||||
// The key must NOT have been merged into ops namespace
|
||||
expect(Object.keys(store).filter((k) => k.startsWith("agent:ops:")).length).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
it("lets the main agent claim bare main aliases in shared stores", async () => {
|
||||
await withStateFixture(async ({ tmpDir, stateDir }) => {
|
||||
const sharedStorePath = path.join(tmpDir, "shared-sessions.json");
|
||||
writeStore(sharedStorePath, {
|
||||
main: { sessionId: "main-session", updatedAt: 2000 },
|
||||
"agent:ops:work": { sessionId: "ops-session", updatedAt: 1000 },
|
||||
});
|
||||
|
||||
await migrateFixtureState(stateDir, sharedMainOpsConfig(sharedStorePath));
|
||||
|
||||
const store = readStore(sharedStorePath);
|
||||
expect(store["agent:main:work"]).toBeDefined();
|
||||
expect((store["agent:main:work"] as { sessionId: string }).sessionId).toBe("main-session");
|
||||
expect(store.main).toBeUndefined();
|
||||
expect(store["agent:ops:work"]).toBeDefined();
|
||||
});
|
||||
});
|
||||
|
||||
it("no-ops when default agentId is main and mainKey is main", async () => {
|
||||
await withStateFixture(async ({ stateDir }) => {
|
||||
const storePath = path.join(stateDir, "agents", "main", "sessions", "sessions.json");
|
||||
writeStore(storePath, {
|
||||
"agent:main:main": { sessionId: "abc-123", updatedAt: 1000 },
|
||||
});
|
||||
|
||||
const cfg = {} as OpenClawConfig;
|
||||
|
||||
const result = await migrateOrphanedSessionKeys({
|
||||
cfg,
|
||||
env: { OPENCLAW_STATE_DIR: stateDir },
|
||||
});
|
||||
|
||||
expect(result.changes).toHaveLength(0);
|
||||
const store = readStore(storePath);
|
||||
expect(store["agent:main:main"]).toBeDefined();
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -208,9 +208,9 @@ describe("state migrations", () => {
|
||||
|
||||
expect(result.warnings).toEqual([]);
|
||||
expect(result.changes).toEqual([
|
||||
`Canonicalized 2 orphaned session key(s) in ${path.join(stateDir, "agents", "worker-1", "sessions", "sessions.json")}`,
|
||||
`Migrated latest direct-chat session → agent:worker-1:desk`,
|
||||
"Imported 4 session index row(s) into SQLite for agent worker-1",
|
||||
"Canonicalized 2 legacy session key(s)",
|
||||
"Moved trace.jsonl → agents/worker-1/sessions",
|
||||
"Moved agent file settings.json → agents/worker-1/agent",
|
||||
`Moved MobileAuth auth creds.json → ${path.join(stateDir, "credentials", "mobileauth", "default", "creds.json")}`,
|
||||
|
||||
@@ -31,8 +31,6 @@ import {
|
||||
normalizeLowercaseStringOrEmpty,
|
||||
normalizeOptionalLowercaseString,
|
||||
} from "../shared/string-coerce.js";
|
||||
import { expandHomePrefix } from "./home-dir.js";
|
||||
import { writeTextAtomic } from "./json-files.js";
|
||||
import { isWithinDir } from "./path-safety.js";
|
||||
import {
|
||||
ensureDir,
|
||||
@@ -998,26 +996,12 @@ export async function runLegacyStateMigrations(params: {
|
||||
}): Promise<{ changes: string[]; warnings: string[] }> {
|
||||
const now = params.now ?? (() => Date.now());
|
||||
const detected = params.detected;
|
||||
const orphanKeys = await migrateOrphanedSessionKeys({
|
||||
cfg: detected.cfg,
|
||||
env: detected.env,
|
||||
});
|
||||
const sessions = await migrateLegacySessions(detected, now);
|
||||
const agentDir = await migrateLegacyAgentDir(detected, now);
|
||||
const channelPlans = await migrateChannelLegacyStatePlans(detected);
|
||||
return {
|
||||
changes: [
|
||||
...orphanKeys.changes,
|
||||
...sessions.changes,
|
||||
...agentDir.changes,
|
||||
...channelPlans.changes,
|
||||
],
|
||||
warnings: [
|
||||
...orphanKeys.warnings,
|
||||
...sessions.warnings,
|
||||
...agentDir.warnings,
|
||||
...channelPlans.warnings,
|
||||
],
|
||||
changes: [...sessions.changes, ...agentDir.changes, ...channelPlans.changes],
|
||||
warnings: [...sessions.warnings, ...agentDir.warnings, ...channelPlans.warnings],
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1036,147 +1020,6 @@ export async function autoMigrateLegacyAgentDir(params: {
|
||||
return await autoMigrateLegacyState(params);
|
||||
}
|
||||
|
||||
/**
|
||||
* Canonicalize orphaned raw session keys in all known agent session stores.
|
||||
*
|
||||
* Keys written by resolveSessionKey() used DEFAULT_AGENT_ID="main" regardless
|
||||
* of the configured default agent; reads always use resolveSessionStoreKey()
|
||||
* which canonicalizes via canonicalizeMainSessionAlias. This migration renames
|
||||
* any orphaned raw keys to their canonical form in-place, merging with any
|
||||
* existing canonical entry by preferring the most recently updated.
|
||||
*
|
||||
* Safe to run multiple times (idempotent). See #29683.
|
||||
*/
|
||||
export async function migrateOrphanedSessionKeys(params: {
|
||||
cfg: OpenClawConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
}): Promise<{ changes: string[]; warnings: string[] }> {
|
||||
const changes: string[] = [];
|
||||
const warnings: string[] = [];
|
||||
const env = params.env ?? process.env;
|
||||
const stateDir = resolveStateDir(env);
|
||||
const agentId = normalizeAgentId(resolveDefaultAgentId(params.cfg));
|
||||
const mainKey = normalizeMainKey(params.cfg.session?.mainKey);
|
||||
const scope = params.cfg.session?.scope as SessionScope | undefined;
|
||||
const storeConfig = params.cfg.session?.store;
|
||||
|
||||
// Collect all known agent store paths with their owning agentIds.
|
||||
// A single path may be shared by multiple agents when session.store
|
||||
// does not contain {agentId}.
|
||||
const storeMap = new Map<string, Set<string>>();
|
||||
const addToStoreMap = (p: string, id: string) => {
|
||||
const existing = storeMap.get(p);
|
||||
if (existing) {
|
||||
existing.add(id);
|
||||
} else {
|
||||
storeMap.set(p, new Set([id]));
|
||||
}
|
||||
};
|
||||
// Default agent store.
|
||||
const defaultStorePath = storeConfig
|
||||
? resolveStorePathFromTemplate(storeConfig, agentId, env)
|
||||
: path.join(stateDir, "agents", agentId, "sessions", "sessions.json");
|
||||
addToStoreMap(defaultStorePath, agentId);
|
||||
// Configured agents.
|
||||
for (const entry of params.cfg.agents?.list ?? []) {
|
||||
if (entry?.id) {
|
||||
const id = normalizeAgentId(entry.id);
|
||||
const p = storeConfig
|
||||
? resolveStorePathFromTemplate(storeConfig, id, env)
|
||||
: path.join(stateDir, "agents", id, "sessions", "sessions.json");
|
||||
addToStoreMap(p, id);
|
||||
}
|
||||
}
|
||||
// Agent directories present on disk.
|
||||
// This only covers the standard state-dir layout so we can still pick up
|
||||
// orphaned stores left behind by older configs. Active custom-template paths
|
||||
// are already covered by the configured-agents loop above.
|
||||
const agentsDir = path.join(stateDir, "agents");
|
||||
if (existsDir(agentsDir)) {
|
||||
for (const dirEntry of safeReadDir(agentsDir)) {
|
||||
if (dirEntry.isDirectory()) {
|
||||
const diskAgentId = normalizeAgentId(dirEntry.name);
|
||||
if (diskAgentId) {
|
||||
const diskPath = path.join(agentsDir, diskAgentId, "sessions", "sessions.json");
|
||||
addToStoreMap(diskPath, diskAgentId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (const [storePath, storeAgentIds] of storeMap) {
|
||||
if (!fileExists(storePath)) {
|
||||
continue;
|
||||
}
|
||||
let parsed: ReturnType<typeof readSessionStoreJson5>;
|
||||
try {
|
||||
parsed = readSessionStoreJson5(storePath);
|
||||
} catch (err) {
|
||||
warnings.push(`Could not read ${storePath}: ${String(err)}`);
|
||||
continue;
|
||||
}
|
||||
if (!parsed.ok) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// When multiple agents share a single store file (session.store without
|
||||
// {agentId}), run canonicalization once per agent so each agent's keys are
|
||||
// handled correctly. Skip cross-agent "agent:main:*" remapping when "main"
|
||||
// is a legitimate configured agent to avoid merging its data into another
|
||||
// agent's namespace.
|
||||
let working = parsed.store;
|
||||
let totalLegacy = 0;
|
||||
for (const storeAgentId of storeAgentIds) {
|
||||
const { store: canonicalized, legacyKeys } = canonicalizeSessionStore({
|
||||
store: working,
|
||||
agentId: storeAgentId,
|
||||
mainKey,
|
||||
scope,
|
||||
// When multiple agents share the store and "main" is one of them,
|
||||
// agent:main:* keys are legitimate — don't cross-agent remap them.
|
||||
skipCrossAgentRemap: storeAgentIds.size > 1 && storeAgentIds.has(DEFAULT_AGENT_ID),
|
||||
});
|
||||
working = canonicalized;
|
||||
// Each pass only counts keys it changed from the current working store, so
|
||||
// once a key is canonicalized it is not counted again by later agent passes.
|
||||
totalLegacy += legacyKeys.length;
|
||||
}
|
||||
if (totalLegacy === 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const normalized: Record<string, SessionEntry> = {};
|
||||
for (const [key, entry] of Object.entries(working)) {
|
||||
const ne = normalizeSessionEntry(entry);
|
||||
if (ne) {
|
||||
normalized[key] = ne;
|
||||
}
|
||||
}
|
||||
try {
|
||||
ensureDir(path.dirname(storePath));
|
||||
await writeTextAtomic(storePath, `${JSON.stringify(normalized, null, 2)}\n`, { mode: 0o600 });
|
||||
changes.push(`Canonicalized ${totalLegacy} orphaned session key(s) in ${storePath}`);
|
||||
} catch (err) {
|
||||
warnings.push(`Failed to write canonicalized store ${storePath}: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
return { changes, warnings };
|
||||
}
|
||||
|
||||
function resolveStorePathFromTemplate(
|
||||
template: string,
|
||||
agentId: string,
|
||||
env?: NodeJS.ProcessEnv,
|
||||
): string {
|
||||
const expand = (s: string) =>
|
||||
s.startsWith("~") ? expandHomePrefix(s, { env: env ?? process.env, homedir: os.homedir }) : s;
|
||||
if (template.includes("{agentId}")) {
|
||||
return path.resolve(expand(template.replaceAll("{agentId}", agentId)));
|
||||
}
|
||||
return path.resolve(expand(template));
|
||||
}
|
||||
|
||||
export async function autoMigrateLegacyState(params: {
|
||||
cfg: OpenClawConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
|
||||
Reference in New Issue
Block a user