mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-11 04:48:05 +00:00
refactor: remove session maintenance write options
This commit is contained in:
@@ -49,9 +49,11 @@ This plan has started landing in slices:
|
||||
- Canonical per-agent session stores use SQLite by default. The `openclaw doctor`
|
||||
fix mode imports legacy `sessions.json` indexes into SQLite and removes the
|
||||
JSON index after import, instead of keeping a startup migration or parallel
|
||||
compatibility/export store. Runtime session reads and writes no longer run
|
||||
JSON import, pruning, capping, archive cleanup, or disk-budget cleanup; those
|
||||
mutations now live behind explicit doctor/session-cleanup steps.
|
||||
compatibility/export store. Runtime session reads and writes normalize and
|
||||
persist only: no JSON import, pruning, capping, archive cleanup, or
|
||||
disk-budget cleanup runs on the hot path. The old maintenance write options
|
||||
have been removed from the session-store API; doctor owns legacy import and
|
||||
`openclaw sessions cleanup` owns explicit cleanup.
|
||||
- Transcript events have a SQLite store primitive with JSONL import/export.
|
||||
Transcript append paths dual-write when the caller already has agent and
|
||||
session scope, including gateway-injected assistant messages. Scoped appends
|
||||
@@ -341,10 +343,12 @@ Migration order:
|
||||
|
||||
1. Keep current task registry and plugin state as is.
|
||||
2. Add shared SQLite connection and migration helpers.
|
||||
3. Move `sessions.json` behind a `SessionStoreBackend` interface.
|
||||
4. Make SQLite primary for session entries.
|
||||
3. Move `sessions.json` behind a `SessionStoreBackend` interface. Done for
|
||||
canonical per-agent stores.
|
||||
4. Make SQLite primary for session entries. Done for canonical per-agent
|
||||
stores.
|
||||
5. Import old `sessions.json` only from `openclaw doctor --fix`, then remove the
|
||||
JSON index after SQLite has the rows.
|
||||
JSON index after SQLite has the rows. Done for session indexes.
|
||||
6. Leave `*.jsonl` transcripts on disk while PI owns transcript semantics.
|
||||
7. After session manager ownership moves behind OpenClaw APIs, store transcript
|
||||
events in SQLite and export JSONL for compatibility.
|
||||
@@ -543,10 +547,10 @@ Phase 1: SQLite session index
|
||||
- Add shared state DB helper.
|
||||
- Add a doctor migration that imports `sessions.json` into SQLite and removes
|
||||
the JSON index.
|
||||
- Move session entries to SQLite behind a flag.
|
||||
- Move canonical session entries to SQLite by default.
|
||||
- Prove current session list, patch, reset, cleanup, and UI flows.
|
||||
- Remove load-time/startup session JSON migration and write-time pruning from
|
||||
the runtime store path.
|
||||
- Remove load-time/startup session JSON migration, write-time pruning, and
|
||||
migration-era maintenance options from the runtime store path.
|
||||
|
||||
Phase 2: VFS scratch
|
||||
|
||||
|
||||
@@ -171,9 +171,6 @@ export async function upsertAcpSessionMeta(params: {
|
||||
store[storeSessionKey] = nextEntry;
|
||||
return nextEntry;
|
||||
},
|
||||
{
|
||||
activeSessionKey: normalizeLowercaseStringOrEmpty(sessionKey),
|
||||
allowDropAcpMetaSessionKeys: [sessionKey],
|
||||
},
|
||||
{ allowDropAcpMetaSessionKeys: [sessionKey] },
|
||||
);
|
||||
}
|
||||
|
||||
@@ -2,8 +2,9 @@ import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { loadSessionStore, type SessionEntry } from "../config/sessions.js";
|
||||
import { loadSessionStore, saveSessionStore, type SessionEntry } from "../config/sessions.js";
|
||||
import { callGateway } from "../gateway/call.js";
|
||||
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
|
||||
import {
|
||||
markRestartAbortedMainSessionsFromLocks,
|
||||
recoverRestartAbortedMainSessions,
|
||||
@@ -19,9 +20,12 @@ let tmpDir: string;
|
||||
beforeEach(async () => {
|
||||
vi.clearAllMocks();
|
||||
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-main-restart-recovery-"));
|
||||
vi.stubEnv("OPENCLAW_STATE_DIR", tmpDir);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
closeOpenClawStateDatabaseForTest();
|
||||
vi.unstubAllEnvs();
|
||||
await fs.rm(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
@@ -32,7 +36,7 @@ async function makeSessionsDir(agentId = "main"): Promise<string> {
|
||||
}
|
||||
|
||||
async function writeStore(sessionsDir: string, store: Record<string, SessionEntry>): Promise<void> {
|
||||
await fs.writeFile(path.join(sessionsDir, "sessions.json"), JSON.stringify(store, null, 2));
|
||||
await saveSessionStore(path.join(sessionsDir, "sessions.json"), store);
|
||||
}
|
||||
|
||||
async function writeTranscript(
|
||||
|
||||
@@ -133,28 +133,24 @@ async function markSessionFailed(params: {
|
||||
sessionKey: string;
|
||||
reason: string;
|
||||
}): Promise<void> {
|
||||
await updateSessionStore(
|
||||
params.storePath,
|
||||
(store) => {
|
||||
const entry = store[params.sessionKey];
|
||||
if (!entry || entry.status !== "running") {
|
||||
return;
|
||||
}
|
||||
entry.status = "failed";
|
||||
entry.abortedLastRun = true;
|
||||
entry.endedAt = Date.now();
|
||||
entry.updatedAt = entry.endedAt;
|
||||
entry.pendingFinalDelivery = undefined;
|
||||
entry.pendingFinalDeliveryText = undefined;
|
||||
entry.pendingFinalDeliveryCreatedAt = undefined;
|
||||
entry.pendingFinalDeliveryLastAttemptAt = undefined;
|
||||
entry.pendingFinalDeliveryAttemptCount = undefined;
|
||||
entry.pendingFinalDeliveryLastError = undefined;
|
||||
entry.pendingFinalDeliveryContext = undefined;
|
||||
store[params.sessionKey] = entry;
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
);
|
||||
await updateSessionStore(params.storePath, (store) => {
|
||||
const entry = store[params.sessionKey];
|
||||
if (!entry || entry.status !== "running") {
|
||||
return;
|
||||
}
|
||||
entry.status = "failed";
|
||||
entry.abortedLastRun = true;
|
||||
entry.endedAt = Date.now();
|
||||
entry.updatedAt = entry.endedAt;
|
||||
entry.pendingFinalDelivery = undefined;
|
||||
entry.pendingFinalDeliveryText = undefined;
|
||||
entry.pendingFinalDeliveryCreatedAt = undefined;
|
||||
entry.pendingFinalDeliveryLastAttemptAt = undefined;
|
||||
entry.pendingFinalDeliveryAttemptCount = undefined;
|
||||
entry.pendingFinalDeliveryLastError = undefined;
|
||||
entry.pendingFinalDeliveryContext = undefined;
|
||||
store[params.sessionKey] = entry;
|
||||
});
|
||||
log.warn(`marked interrupted main session failed: ${params.sessionKey} (${params.reason})`);
|
||||
}
|
||||
|
||||
@@ -175,26 +171,21 @@ async function resumeMainSession(params: {
|
||||
},
|
||||
timeoutMs: 10_000,
|
||||
});
|
||||
await updateSessionStore(
|
||||
params.storePath,
|
||||
(store) => {
|
||||
const entry = store[params.sessionKey];
|
||||
if (!entry) {
|
||||
return;
|
||||
}
|
||||
const now = Date.now();
|
||||
entry.abortedLastRun = false;
|
||||
entry.updatedAt = now;
|
||||
if (entry.pendingFinalDelivery || entry.pendingFinalDeliveryText) {
|
||||
entry.pendingFinalDeliveryLastAttemptAt = now;
|
||||
entry.pendingFinalDeliveryAttemptCount =
|
||||
(entry.pendingFinalDeliveryAttemptCount ?? 0) + 1;
|
||||
entry.pendingFinalDeliveryLastError = null;
|
||||
}
|
||||
store[params.sessionKey] = entry;
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
);
|
||||
await updateSessionStore(params.storePath, (store) => {
|
||||
const entry = store[params.sessionKey];
|
||||
if (!entry) {
|
||||
return;
|
||||
}
|
||||
const now = Date.now();
|
||||
entry.abortedLastRun = false;
|
||||
entry.updatedAt = now;
|
||||
if (entry.pendingFinalDelivery || entry.pendingFinalDeliveryText) {
|
||||
entry.pendingFinalDeliveryLastAttemptAt = now;
|
||||
entry.pendingFinalDeliveryAttemptCount = (entry.pendingFinalDeliveryAttemptCount ?? 0) + 1;
|
||||
entry.pendingFinalDeliveryLastError = null;
|
||||
}
|
||||
store[params.sessionKey] = entry;
|
||||
});
|
||||
log.info(
|
||||
`resumed interrupted main session: ${params.sessionKey}${
|
||||
params.pendingFinalDeliveryText ? " (with pending payload)" : ""
|
||||
@@ -223,28 +214,24 @@ export async function markRestartAbortedMainSessionsFromLocks(params: {
|
||||
}
|
||||
|
||||
const storePath = path.join(sessionsDir, "sessions.json");
|
||||
await updateSessionStore(
|
||||
storePath,
|
||||
(store) => {
|
||||
for (const [sessionKey, entry] of Object.entries(store)) {
|
||||
if (!entry || entry.status !== "running") {
|
||||
continue;
|
||||
}
|
||||
if (shouldSkipMainRecovery(entry, sessionKey)) {
|
||||
result.skipped++;
|
||||
continue;
|
||||
}
|
||||
const entryLockPaths = resolveEntryTranscriptLockPaths({ entry, sessionsDir });
|
||||
if (!entryLockPaths.some((lockPath) => interruptedLockPaths.has(lockPath))) {
|
||||
continue;
|
||||
}
|
||||
entry.abortedLastRun = true;
|
||||
store[sessionKey] = entry;
|
||||
result.marked++;
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
for (const [sessionKey, entry] of Object.entries(store)) {
|
||||
if (!entry || entry.status !== "running") {
|
||||
continue;
|
||||
}
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
);
|
||||
if (shouldSkipMainRecovery(entry, sessionKey)) {
|
||||
result.skipped++;
|
||||
continue;
|
||||
}
|
||||
const entryLockPaths = resolveEntryTranscriptLockPaths({ entry, sessionsDir });
|
||||
if (!entryLockPaths.some((lockPath) => interruptedLockPaths.has(lockPath))) {
|
||||
continue;
|
||||
}
|
||||
entry.abortedLastRun = true;
|
||||
store[sessionKey] = entry;
|
||||
result.marked++;
|
||||
}
|
||||
});
|
||||
|
||||
if (result.marked > 0) {
|
||||
log.warn(`marked ${result.marked} interrupted main session(s) from stale transcript locks`);
|
||||
|
||||
@@ -27,20 +27,16 @@ export async function writeSubagentSessionEntry(params: {
|
||||
defaultSessionId: string;
|
||||
}): Promise<string> {
|
||||
const storePath = resolveSubagentSessionStorePath(params.stateDir, params.agentId);
|
||||
await updateSessionStore(
|
||||
storePath,
|
||||
(store) => {
|
||||
store[params.sessionKey] = {
|
||||
...store[params.sessionKey],
|
||||
sessionId: params.sessionId ?? params.defaultSessionId,
|
||||
updatedAt: params.updatedAt ?? Date.now(),
|
||||
...(typeof params.abortedLastRun === "boolean"
|
||||
? { abortedLastRun: params.abortedLastRun }
|
||||
: {}),
|
||||
};
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
);
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
store[params.sessionKey] = {
|
||||
...store[params.sessionKey],
|
||||
sessionId: params.sessionId ?? params.defaultSessionId,
|
||||
updatedAt: params.updatedAt ?? Date.now(),
|
||||
...(typeof params.abortedLastRun === "boolean"
|
||||
? { abortedLastRun: params.abortedLastRun }
|
||||
: {}),
|
||||
};
|
||||
});
|
||||
return storePath;
|
||||
}
|
||||
|
||||
@@ -50,13 +46,9 @@ export async function removeSubagentSessionEntry(params: {
|
||||
agentId: string;
|
||||
}): Promise<string> {
|
||||
const storePath = resolveSubagentSessionStorePath(params.stateDir, params.agentId);
|
||||
await updateSessionStore(
|
||||
storePath,
|
||||
(store) => {
|
||||
delete store[params.sessionKey];
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
);
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
delete store[params.sessionKey];
|
||||
});
|
||||
return storePath;
|
||||
}
|
||||
|
||||
|
||||
@@ -761,7 +761,6 @@ export async function initSessionState(params: {
|
||||
agentId,
|
||||
sessionsDir: path.dirname(storePath),
|
||||
fallbackSessionFile,
|
||||
activeSessionKey: sessionKey,
|
||||
});
|
||||
sessionEntry = resolvedSessionFile.sessionEntry;
|
||||
if (isNewSession) {
|
||||
@@ -784,19 +783,13 @@ export async function initSessionState(params: {
|
||||
}
|
||||
// Preserve per-session overrides while resetting compaction state on /new.
|
||||
sessionStore[sessionKey] = { ...sessionStore[sessionKey], ...sessionEntry };
|
||||
await updateSessionStore(
|
||||
storePath,
|
||||
(store) => {
|
||||
// Preserve per-session overrides while resetting compaction state on /new.
|
||||
store[sessionKey] = { ...store[sessionKey], ...sessionEntry };
|
||||
if (retiredLegacyMainDelivery) {
|
||||
store[retiredLegacyMainDelivery.key] = retiredLegacyMainDelivery.entry;
|
||||
}
|
||||
},
|
||||
{
|
||||
activeSessionKey: sessionKey,
|
||||
},
|
||||
);
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
// Preserve per-session overrides while resetting compaction state on /new.
|
||||
store[sessionKey] = { ...store[sessionKey], ...sessionEntry };
|
||||
if (retiredLegacyMainDelivery) {
|
||||
store[retiredLegacyMainDelivery.key] = retiredLegacyMainDelivery.entry;
|
||||
}
|
||||
});
|
||||
|
||||
// Archive old transcript so it doesn't accumulate on disk (#14869).
|
||||
let previousSessionTranscript: {
|
||||
|
||||
@@ -737,10 +737,8 @@ export async function maybeRepairCodexSessionRoutes(params: {
|
||||
if (staleSessionKeys.length === 0) {
|
||||
continue;
|
||||
}
|
||||
const result = await updateSessionStore(
|
||||
target.storePath,
|
||||
(store) => repairCodexSessionStoreRoutes({ store }),
|
||||
{ skipMaintenance: true },
|
||||
const result = await updateSessionStore(target.storePath, (store) =>
|
||||
repairCodexSessionStoreRoutes({ store, runtime }),
|
||||
);
|
||||
if (!result.changed) {
|
||||
continue;
|
||||
|
||||
@@ -861,20 +861,16 @@ describe("sessions", () => {
|
||||
const readSpy = vi.spyOn(fsSync, "readFileSync");
|
||||
const parseSpy = vi.spyOn(JSON, "parse");
|
||||
try {
|
||||
await updateSessionStore(
|
||||
storePath,
|
||||
(store) => {
|
||||
const existing = store[mainSessionKey];
|
||||
if (!existing) {
|
||||
throw new Error("missing session entry");
|
||||
}
|
||||
store[mainSessionKey] = {
|
||||
...existing,
|
||||
thinkingLevel: "high",
|
||||
};
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
);
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
const existing = store[mainSessionKey];
|
||||
if (!existing) {
|
||||
throw new Error("missing session entry");
|
||||
}
|
||||
store[mainSessionKey] = {
|
||||
...existing,
|
||||
thinkingLevel: "high",
|
||||
};
|
||||
});
|
||||
|
||||
expect(readSpy).not.toHaveBeenCalled();
|
||||
expect(parseSpy).not.toHaveBeenCalled();
|
||||
@@ -903,21 +899,17 @@ describe("sessions", () => {
|
||||
expect(loadSessionStore(storePath)[mainSessionKey]?.thinkingLevel).toBe("low");
|
||||
|
||||
await expect(
|
||||
updateSessionStore(
|
||||
storePath,
|
||||
(store) => {
|
||||
const existing = store[mainSessionKey];
|
||||
if (!existing) {
|
||||
throw new Error("missing session entry");
|
||||
}
|
||||
store[mainSessionKey] = {
|
||||
...existing,
|
||||
thinkingLevel: "mutated-before-throw",
|
||||
};
|
||||
throw new Error("boom");
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
),
|
||||
updateSessionStore(storePath, (store) => {
|
||||
const existing = store[mainSessionKey];
|
||||
if (!existing) {
|
||||
throw new Error("missing session entry");
|
||||
}
|
||||
store[mainSessionKey] = {
|
||||
...existing,
|
||||
thinkingLevel: "mutated-before-throw",
|
||||
};
|
||||
throw new Error("boom");
|
||||
}),
|
||||
).rejects.toThrow("boom");
|
||||
|
||||
const readSpy = vi.spyOn(fsSync, "readFileSync");
|
||||
|
||||
@@ -412,72 +412,51 @@ export async function runSessionsCleanup(params: {
|
||||
const appliedReportRef: { current: AppliedSessionCleanupReport | null } = {
|
||||
current: null,
|
||||
};
|
||||
const dmScopeRemovedSessionFiles = new Map<string, string | undefined>();
|
||||
let missingApplied = 0;
|
||||
let dmScopeRetiredApplied = 0;
|
||||
await updateSessionStore(
|
||||
target.storePath,
|
||||
async (store) => {
|
||||
const beforeCount = Object.keys(store).length;
|
||||
const missing = opts.fixMissing
|
||||
? pruneMissingTranscriptEntries({
|
||||
store,
|
||||
storePath: target.storePath,
|
||||
})
|
||||
: 0;
|
||||
let pruned = 0;
|
||||
let capped = 0;
|
||||
let diskBudget: AppliedSessionCleanupReport["diskBudget"] = null;
|
||||
if (mode === "warn") {
|
||||
diskBudget = await enforceSessionDiskBudget({
|
||||
const missingApplied = await updateSessionStore(target.storePath, async (store) => {
|
||||
const beforeCount = Object.keys(store).length;
|
||||
const missing = opts.fixMissing
|
||||
? pruneMissingTranscriptEntries({
|
||||
store,
|
||||
storePath: target.storePath,
|
||||
activeSessionKey: opts.activeKey,
|
||||
maintenance,
|
||||
warnOnly: true,
|
||||
});
|
||||
} else {
|
||||
const preserveKeys = opts.activeKey ? new Set([opts.activeKey]) : undefined;
|
||||
pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, {
|
||||
preserveKeys,
|
||||
});
|
||||
capped = capEntryCount(store, maintenance.maxEntries, {
|
||||
preserveKeys,
|
||||
});
|
||||
diskBudget = await enforceSessionDiskBudget({
|
||||
store,
|
||||
storePath: target.storePath,
|
||||
activeSessionKey: opts.activeKey,
|
||||
maintenance,
|
||||
warnOnly: false,
|
||||
});
|
||||
}
|
||||
appliedReportRef.current = {
|
||||
mode,
|
||||
beforeCount,
|
||||
afterCount: Object.keys(store).length,
|
||||
pruned,
|
||||
capped,
|
||||
diskBudget,
|
||||
};
|
||||
return missing;
|
||||
},
|
||||
opts.activeKey ? { activeSessionKey: opts.activeKey } : undefined,
|
||||
);
|
||||
if (dmScopeRemovedSessionFiles.size > 0) {
|
||||
const storeAfterDmScopeRetire = loadSessionStore(target.storePath, { skipCache: true });
|
||||
await archiveRemovedSessionTranscripts({
|
||||
removedSessionFiles: dmScopeRemovedSessionFiles,
|
||||
referencedSessionIds: new Set(
|
||||
Object.values(storeAfterDmScopeRetire)
|
||||
.map((entry) => entry?.sessionId)
|
||||
.filter((id): id is string => Boolean(id)),
|
||||
),
|
||||
storePath: target.storePath,
|
||||
reason: "deleted",
|
||||
restrictToStoreDir: true,
|
||||
});
|
||||
}
|
||||
})
|
||||
: 0;
|
||||
let pruned = 0;
|
||||
let capped = 0;
|
||||
let diskBudget: AppliedSessionCleanupReport["diskBudget"] = null;
|
||||
if (mode === "warn") {
|
||||
diskBudget = await enforceSessionDiskBudget({
|
||||
store,
|
||||
storePath: target.storePath,
|
||||
activeSessionKey: opts.activeKey,
|
||||
maintenance,
|
||||
warnOnly: true,
|
||||
});
|
||||
} else {
|
||||
const preserveKeys = opts.activeKey ? new Set([opts.activeKey]) : undefined;
|
||||
pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, {
|
||||
preserveKeys,
|
||||
});
|
||||
capped = capEntryCount(store, maintenance.maxEntries, {
|
||||
preserveKeys,
|
||||
});
|
||||
diskBudget = await enforceSessionDiskBudget({
|
||||
store,
|
||||
storePath: target.storePath,
|
||||
activeSessionKey: opts.activeKey,
|
||||
maintenance,
|
||||
warnOnly: false,
|
||||
});
|
||||
}
|
||||
appliedReportRef.current = {
|
||||
mode,
|
||||
beforeCount,
|
||||
afterCount: Object.keys(store).length,
|
||||
pruned,
|
||||
capped,
|
||||
diskBudget,
|
||||
};
|
||||
return missing;
|
||||
});
|
||||
const afterStore = loadSessionStore(target.storePath, { skipCache: true });
|
||||
const unreferencedArtifacts =
|
||||
mode === "warn"
|
||||
|
||||
@@ -8,8 +8,6 @@ export type ReadSessionUpdatedAt = (params: {
|
||||
}) => number | undefined;
|
||||
|
||||
export type SaveSessionStoreOptions = {
|
||||
skipMaintenance?: boolean;
|
||||
activeSessionKey?: string;
|
||||
allowDropAcpMetaSessionKeys?: string[];
|
||||
};
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ export async function resolveAndPersistSessionFile(params: {
|
||||
agentId?: string;
|
||||
sessionsDir?: string;
|
||||
fallbackSessionFile?: string;
|
||||
activeSessionKey?: string;
|
||||
}): Promise<{ sessionFile: string; sessionEntry: SessionEntry }> {
|
||||
const { sessionId, sessionKey, sessionStore, storePath } = params;
|
||||
const now = Date.now();
|
||||
@@ -39,16 +38,12 @@ export async function resolveAndPersistSessionFile(params: {
|
||||
};
|
||||
if (baseEntry.sessionId !== sessionId || baseEntry.sessionFile !== sessionFile) {
|
||||
sessionStore[sessionKey] = persistedEntry;
|
||||
await updateSessionStore(
|
||||
storePath,
|
||||
(store) => {
|
||||
store[sessionKey] = {
|
||||
...store[sessionKey],
|
||||
...persistedEntry,
|
||||
};
|
||||
},
|
||||
params.activeSessionKey ? { activeSessionKey: params.activeSessionKey } : undefined,
|
||||
);
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
store[sessionKey] = {
|
||||
...store[sessionKey],
|
||||
...persistedEntry,
|
||||
};
|
||||
});
|
||||
return { sessionFile, sessionEntry: persistedEntry };
|
||||
}
|
||||
sessionStore[sessionKey] = persistedEntry;
|
||||
|
||||
@@ -326,13 +326,9 @@ describe("session store writer queue", () => {
|
||||
});
|
||||
|
||||
const writeSpy = vi.spyOn(jsonFiles, "writeTextAtomic");
|
||||
await updateSessionStore(
|
||||
storePath,
|
||||
async () => {
|
||||
// Intentionally no-op mutation.
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
);
|
||||
await updateSessionStore(storePath, async () => {
|
||||
// Intentionally no-op mutation.
|
||||
});
|
||||
expect(writeSpy).not.toHaveBeenCalled();
|
||||
writeSpy.mockRestore();
|
||||
});
|
||||
|
||||
@@ -113,18 +113,14 @@ describe("SQLite session store backend", () => {
|
||||
updatedAt: 100,
|
||||
};
|
||||
|
||||
await saveSessionStore(storePath, { "discord:ops": entry }, { skipMaintenance: true });
|
||||
await updateSessionStore(
|
||||
storePath,
|
||||
(store) => {
|
||||
store["discord:ops"] = {
|
||||
...store["discord:ops"],
|
||||
updatedAt: 200,
|
||||
modelOverride: "gpt-5.5",
|
||||
};
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
);
|
||||
await saveSessionStore(storePath, { "discord:ops": entry });
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
store["discord:ops"] = {
|
||||
...store["discord:ops"],
|
||||
updatedAt: 200,
|
||||
modelOverride: "gpt-5.5",
|
||||
};
|
||||
});
|
||||
|
||||
expect(fs.existsSync(storePath)).toBe(false);
|
||||
expect(loadSessionStore(storePath, { skipCache: true })).toEqual({
|
||||
@@ -146,7 +142,7 @@ describe("SQLite session store backend", () => {
|
||||
updatedAt: 100,
|
||||
};
|
||||
|
||||
await saveSessionStore(storePath, { "discord:ops": entry }, { skipMaintenance: true });
|
||||
await saveSessionStore(storePath, { "discord:ops": entry });
|
||||
|
||||
expect(fs.existsSync(storePath)).toBe(false);
|
||||
expect(loadSessionStore(storePath, { skipCache: true })).toEqual({
|
||||
@@ -176,17 +172,13 @@ describe("SQLite session store backend", () => {
|
||||
|
||||
expect(loadSessionStore(storePath, { skipCache: true })).toEqual({});
|
||||
|
||||
await saveSessionStore(
|
||||
storePath,
|
||||
{
|
||||
"discord:ops": {
|
||||
...legacyEntry,
|
||||
sessionId: "sqlite-session",
|
||||
updatedAt: 200,
|
||||
},
|
||||
await saveSessionStore(storePath, {
|
||||
"discord:ops": {
|
||||
...legacyEntry,
|
||||
sessionId: "sqlite-session",
|
||||
updatedAt: 200,
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
);
|
||||
});
|
||||
expect(loadSessionStore(storePath, { skipCache: true })).toEqual({
|
||||
"discord:ops": {
|
||||
...legacyEntry,
|
||||
|
||||
@@ -2,11 +2,13 @@ import crypto from "node:crypto";
|
||||
import { afterAll, beforeAll, describe, expect, it } from "vitest";
|
||||
import { createFixtureSuite } from "../../test-utils/fixture-suite.js";
|
||||
import {
|
||||
capEntryCount,
|
||||
getActiveSessionMaintenanceWarning,
|
||||
isProtectedSessionMaintenanceEntry,
|
||||
pruneStaleEntries,
|
||||
resolveMaintenanceConfigFromInput,
|
||||
resolveSessionEntryMaintenanceHighWater,
|
||||
} from "./store-maintenance.js";
|
||||
import { capEntryCount, getActiveSessionMaintenanceWarning, pruneStaleEntries } from "./store.js";
|
||||
import type { SessionEntry } from "./types.js";
|
||||
|
||||
const DAY_MS = 24 * 60 * 60 * 1000;
|
||||
|
||||
@@ -91,7 +91,7 @@ describe("session store strips resolvedSkills from persistence", () => {
|
||||
"agent:main:test:1": makeEntry("session-1", makeSnapshot(5)),
|
||||
};
|
||||
|
||||
await saveSessionStore(storePath, store, { skipMaintenance: true });
|
||||
await saveSessionStore(storePath, store);
|
||||
|
||||
const raw = await fs.readFile(storePath, "utf-8");
|
||||
expect(raw).not.toContain("resolvedSkills");
|
||||
@@ -107,7 +107,7 @@ describe("session store strips resolvedSkills from persistence", () => {
|
||||
"agent:main:test:1": makeEntry("session-1", snapshot),
|
||||
};
|
||||
|
||||
await saveSessionStore(storePath, store, { skipMaintenance: true });
|
||||
await saveSessionStore(storePath, store);
|
||||
const loaded = loadSessionStore(storePath, { skipCache: true });
|
||||
|
||||
const persistedSnapshot = loaded["agent:main:test:1"]?.skillsSnapshot;
|
||||
@@ -137,7 +137,7 @@ describe("session store strips resolvedSkills from persistence", () => {
|
||||
);
|
||||
|
||||
// Saving the loaded record should rewrite the file in stripped form.
|
||||
await saveSessionStore(storePath, loaded, { skipMaintenance: true });
|
||||
await saveSessionStore(storePath, loaded);
|
||||
const rawAfter = await fs.readFile(storePath, "utf-8");
|
||||
expect(rawAfter).not.toContain("resolvedSkills");
|
||||
});
|
||||
@@ -145,13 +145,9 @@ describe("session store strips resolvedSkills from persistence", () => {
|
||||
it("strips resolvedSkills written via updateSessionStore mutator", async () => {
|
||||
// Simulate the production hot path where ensureSkillSnapshot puts a
|
||||
// freshly-built snapshot (with resolvedSkills) into the store via mutator.
|
||||
await updateSessionStore(
|
||||
storePath,
|
||||
(store) => {
|
||||
store["agent:main:test:1"] = makeEntry("session-1", makeSnapshot(6));
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
);
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
store["agent:main:test:1"] = makeEntry("session-1", makeSnapshot(6));
|
||||
});
|
||||
|
||||
const raw = await fs.readFile(storePath, "utf-8");
|
||||
expect(raw).not.toContain("resolvedSkills");
|
||||
@@ -168,7 +164,7 @@ describe("session store strips resolvedSkills from persistence", () => {
|
||||
store[`agent:main:scale:${i}`] = makeEntry(`session-${i}`, makeSnapshot(SKILLS_PER_SESSION));
|
||||
}
|
||||
|
||||
await saveSessionStore(storePath, store, { skipMaintenance: true });
|
||||
await saveSessionStore(storePath, store);
|
||||
|
||||
const stat = await fs.stat(storePath);
|
||||
// Pre-fix: ~SESSION_COUNT * SKILLS_PER_SESSION * ~3KB ≈ 15MB.
|
||||
|
||||
@@ -11,7 +11,6 @@ import {
|
||||
} from "../../utils/delivery-context.shared.js";
|
||||
import type { DeliveryContext } from "../../utils/delivery-context.types.js";
|
||||
import { getFileStatSnapshot } from "../cache-utils.js";
|
||||
import { enforceSessionDiskBudget, type SessionDiskBudgetSweepResult } from "./disk-budget.js";
|
||||
import { deriveSessionMetaPatch } from "./metadata.js";
|
||||
import {
|
||||
saveSqliteSessionStore,
|
||||
@@ -26,16 +25,9 @@ import {
|
||||
writeSessionStoreCache,
|
||||
} from "./store-cache.js";
|
||||
import { normalizeStoreSessionKey, resolveSessionStoreEntry } from "./store-entry.js";
|
||||
import { loadSessionStore } from "./store-load.js";
|
||||
import { resolveMaintenanceConfig } from "./store-maintenance-runtime.js";
|
||||
import {
|
||||
capEntryCount,
|
||||
getActiveSessionMaintenanceWarning,
|
||||
pruneQuotaSuspensions,
|
||||
pruneStaleEntries,
|
||||
type QuotaSuspensionMaintenanceResult,
|
||||
type ResolvedSessionMaintenanceConfig,
|
||||
type SessionMaintenanceWarning,
|
||||
} from "./store-maintenance.js";
|
||||
import { normalizeSessionStore } from "./store-normalize.js";
|
||||
import { runExclusiveSessionStoreWrite } from "./store-writer.js";
|
||||
@@ -86,19 +78,7 @@ export function readSessionUpdatedAt(params: {
|
||||
}
|
||||
}
|
||||
|
||||
export {
|
||||
capEntryCount,
|
||||
getActiveSessionMaintenanceWarning,
|
||||
pruneStaleEntries,
|
||||
resolveMaintenanceConfig,
|
||||
};
|
||||
export type { ResolvedSessionMaintenanceConfig, SessionMaintenanceWarning };
|
||||
|
||||
type SaveSessionStoreOptions = {
|
||||
/** Deprecated no-op retained for callers that still pass migration-era options. */
|
||||
skipMaintenance?: boolean;
|
||||
/** Deprecated no-op retained for callers that still pass migration-era options. */
|
||||
activeSessionKey?: string;
|
||||
/**
|
||||
* Session keys that are allowed to drop persisted ACP metadata during this update.
|
||||
* All other updates preserve existing `entry.acp` blocks when callers replace the
|
||||
@@ -306,16 +286,13 @@ export async function runQuotaSuspensionMaintenance(params: {
|
||||
if (!fs.existsSync(params.storePath)) {
|
||||
return { resumed: [], cleared: 0 };
|
||||
}
|
||||
return await updateSessionStore(
|
||||
params.storePath,
|
||||
(store) =>
|
||||
pruneQuotaSuspensions({
|
||||
store,
|
||||
now: params.now ?? Date.now(),
|
||||
ttlMs: params.ttlMs,
|
||||
log: params.log,
|
||||
}),
|
||||
{ skipMaintenance: true },
|
||||
return await updateSessionStore(params.storePath, (store) =>
|
||||
pruneQuotaSuspensions({
|
||||
store,
|
||||
now: params.now ?? Date.now(),
|
||||
ttlMs: params.ttlMs,
|
||||
log: params.log,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -376,9 +353,7 @@ async function persistResolvedSessionEntry(params: {
|
||||
for (const legacyKey of params.resolved.legacyKeys) {
|
||||
delete params.store[legacyKey];
|
||||
}
|
||||
await saveSessionStoreUnlocked(params.storePath, params.store, {
|
||||
activeSessionKey: params.resolved.normalizedKey,
|
||||
});
|
||||
await saveSessionStoreUnlocked(params.storePath, params.store);
|
||||
return params.next;
|
||||
}
|
||||
|
||||
@@ -418,42 +393,38 @@ export async function recordSessionMetaFromInbound(params: {
|
||||
}): Promise<SessionEntry | null> {
|
||||
const { storePath, sessionKey, ctx } = params;
|
||||
const createIfMissing = params.createIfMissing ?? true;
|
||||
return await updateSessionStore(
|
||||
storePath,
|
||||
(store) => {
|
||||
const resolved = resolveSessionStoreEntry({ store, sessionKey });
|
||||
const existing = resolved.existing;
|
||||
const patch = deriveSessionMetaPatch({
|
||||
ctx,
|
||||
sessionKey: resolved.normalizedKey,
|
||||
existing,
|
||||
groupResolution: params.groupResolution,
|
||||
});
|
||||
if (!patch) {
|
||||
if (existing && resolved.legacyKeys.length > 0) {
|
||||
store[resolved.normalizedKey] = existing;
|
||||
for (const legacyKey of resolved.legacyKeys) {
|
||||
delete store[legacyKey];
|
||||
}
|
||||
return await updateSessionStore(storePath, (store) => {
|
||||
const resolved = resolveSessionStoreEntry({ store, sessionKey });
|
||||
const existing = resolved.existing;
|
||||
const patch = deriveSessionMetaPatch({
|
||||
ctx,
|
||||
sessionKey: resolved.normalizedKey,
|
||||
existing,
|
||||
groupResolution: params.groupResolution,
|
||||
});
|
||||
if (!patch) {
|
||||
if (existing && resolved.legacyKeys.length > 0) {
|
||||
store[resolved.normalizedKey] = existing;
|
||||
for (const legacyKey of resolved.legacyKeys) {
|
||||
delete store[legacyKey];
|
||||
}
|
||||
return existing ?? null;
|
||||
}
|
||||
if (!existing && !createIfMissing) {
|
||||
return null;
|
||||
}
|
||||
const next = existing
|
||||
? // Inbound metadata updates must not refresh activity timestamps;
|
||||
// idle reset evaluation relies on updatedAt from actual session turns.
|
||||
mergeSessionEntryPreserveActivity(existing, patch)
|
||||
: mergeSessionEntry(existing, patch);
|
||||
store[resolved.normalizedKey] = next;
|
||||
for (const legacyKey of resolved.legacyKeys) {
|
||||
delete store[legacyKey];
|
||||
}
|
||||
return next;
|
||||
},
|
||||
{ activeSessionKey: normalizeStoreSessionKey(sessionKey) },
|
||||
);
|
||||
return existing ?? null;
|
||||
}
|
||||
if (!existing && !createIfMissing) {
|
||||
return null;
|
||||
}
|
||||
const next = existing
|
||||
? // Inbound metadata updates must not refresh activity timestamps;
|
||||
// idle reset evaluation relies on updatedAt from actual session turns.
|
||||
mergeSessionEntryPreserveActivity(existing, patch)
|
||||
: mergeSessionEntry(existing, patch);
|
||||
store[resolved.normalizedKey] = next;
|
||||
for (const legacyKey of resolved.legacyKeys) {
|
||||
delete store[legacyKey];
|
||||
}
|
||||
return next;
|
||||
});
|
||||
}
|
||||
|
||||
export async function updateLastRoute(params: {
|
||||
|
||||
@@ -577,8 +577,8 @@ export type SessionSkillSnapshot = {
|
||||
/**
|
||||
* Runtime-only, never persisted. Carries the full parsed Skill[] (including
|
||||
* each SKILL.md body) so the embedded runner can skip a workspace skill
|
||||
* scan within a turn. Stripped from sessions.json on every read and write
|
||||
* via normalizeSessionStore — see store-load.ts. On a cold session resume
|
||||
* scan within a turn. Stripped from persistent session entries on every
|
||||
* read and write via normalizeSessionStore — see store-load.ts. On a cold session resume
|
||||
* this is undefined and src/agents/pi-embedded-runner/skills-runtime.ts
|
||||
* rebuilds it by reloading skill entries from disk.
|
||||
*/
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { SessionScope } from "../config/sessions/types.js";
|
||||
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
|
||||
|
||||
const agentCommand = vi.fn();
|
||||
|
||||
@@ -37,10 +38,21 @@ describe("runBootOnce", () => {
|
||||
|
||||
beforeEach(async () => {
|
||||
vi.clearAllMocks();
|
||||
stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-boot-state-"));
|
||||
vi.stubEnv("OPENCLAW_STATE_DIR", stateDir);
|
||||
const { storePath } = resolveMainStore();
|
||||
await fs.rm(storePath, { force: true });
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
closeOpenClawStateDatabaseForTest();
|
||||
vi.unstubAllEnvs();
|
||||
if (stateDir) {
|
||||
await fs.rm(stateDir, { recursive: true, force: true });
|
||||
stateDir = "";
|
||||
}
|
||||
});
|
||||
|
||||
const makeDeps = () => ({
|
||||
sendMessageWhatsApp: vi.fn(),
|
||||
sendMessageTelegram: vi.fn(),
|
||||
@@ -261,3 +273,4 @@ describe("runBootOnce", () => {
|
||||
});
|
||||
});
|
||||
});
|
||||
let stateDir = "";
|
||||
|
||||
@@ -119,17 +119,13 @@ async function restoreMainSessionMapping(
|
||||
return undefined;
|
||||
}
|
||||
try {
|
||||
await updateSessionStore(
|
||||
snapshot.storePath,
|
||||
(store) => {
|
||||
if (snapshot.hadEntry && snapshot.entry) {
|
||||
store[snapshot.sessionKey] = snapshot.entry;
|
||||
return;
|
||||
}
|
||||
delete store[snapshot.sessionKey];
|
||||
},
|
||||
{ activeSessionKey: snapshot.sessionKey },
|
||||
);
|
||||
await updateSessionStore(snapshot.storePath, (store) => {
|
||||
if (snapshot.hadEntry && snapshot.entry) {
|
||||
store[snapshot.sessionKey] = snapshot.entry;
|
||||
return;
|
||||
}
|
||||
delete store[snapshot.sessionKey];
|
||||
});
|
||||
return undefined;
|
||||
} catch (err) {
|
||||
return formatErrorMessage(err);
|
||||
|
||||
Reference in New Issue
Block a user