mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-11 04:48:05 +00:00
refactor: default transcript sessions to sqlite locators
This commit is contained in:
@@ -207,6 +207,9 @@ The remaining cleanup is mostly consolidation and deletion:
|
||||
- `llm-task` helper runs and hidden commitment extraction also use SQLite
|
||||
transcript locators, so these model-only helper sessions no longer create
|
||||
temporary JSON/JSONL transcript files.
|
||||
- `TranscriptSessionManager` default create, list, fork, and branch paths now
|
||||
use SQLite transcript locators unless a caller explicitly supplies a legacy
|
||||
transcript directory.
|
||||
- Parent transcript fork decisions and fork creation no longer accept
|
||||
`storePath` or `sessionsDir`; they use `{agentId, sessionId}` SQLite
|
||||
transcript scope and derive any retained path metadata from the parent
|
||||
|
||||
@@ -122,6 +122,37 @@ describe("TranscriptSessionManager", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
it("creates, branches, lists, and forks default sessions with virtual sqlite locators", async () => {
|
||||
await makeTempSessionFile();
|
||||
const sessionManager = SessionManager.create("/tmp/sqlite-workspace");
|
||||
const sessionFile = sessionManager.getSessionFile();
|
||||
if (!sessionFile) {
|
||||
throw new Error("expected session file");
|
||||
}
|
||||
expect(sessionFile).toMatch(/^sqlite-transcript:\/\/main\//);
|
||||
|
||||
const userId = sessionManager.appendMessage({
|
||||
role: "user",
|
||||
content: "sqlite default",
|
||||
timestamp: 3,
|
||||
});
|
||||
const branchFile = sessionManager.createBranchedSession(userId);
|
||||
if (!branchFile) {
|
||||
throw new Error("expected branch file");
|
||||
}
|
||||
expect(branchFile).toMatch(/^sqlite-transcript:\/\/main\//);
|
||||
|
||||
const listed = await SessionManager.list("/tmp/sqlite-workspace");
|
||||
expect(listed.map((session) => session.id)).toContain(sessionManager.getSessionId());
|
||||
|
||||
const forked = SessionManager.forkFrom(sessionFile, "/tmp/sqlite-fork");
|
||||
expect(forked.getSessionFile()).toMatch(/^sqlite-transcript:\/\/main\//);
|
||||
expect(forked.getHeader()).toMatchObject({
|
||||
cwd: "/tmp/sqlite-fork",
|
||||
parentSession: sessionFile,
|
||||
});
|
||||
});
|
||||
|
||||
it("persists initial user messages synchronously before the first assistant message", async () => {
|
||||
const sessionFile = await makeTempSessionFile();
|
||||
const sessionManager = openTranscriptSessionManager({
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { isSqliteSessionTranscriptLocator } from "../../config/sessions/paths.js";
|
||||
import {
|
||||
createSqliteSessionTranscriptLocator,
|
||||
isSqliteSessionTranscriptLocator,
|
||||
} from "../../config/sessions/paths.js";
|
||||
import {
|
||||
appendSqliteSessionTranscriptEvent,
|
||||
listSqliteSessionTranscriptFiles,
|
||||
listSqliteSessionTranscripts,
|
||||
loadSqliteSessionTranscriptEvents,
|
||||
replaceSqliteSessionTranscriptEvents,
|
||||
resolveSqliteSessionTranscriptScopeForPath,
|
||||
@@ -48,13 +51,12 @@ type TranscriptSqliteScope = {
|
||||
transcriptPath: string;
|
||||
};
|
||||
|
||||
function encodeSessionCwd(cwd: string): string {
|
||||
return `--${cwd.replace(/^[/\\]/, "").replace(/[/\\:]/g, "-")}--`;
|
||||
}
|
||||
|
||||
function resolveDefaultSessionDir(cwd: string): string {
|
||||
return path.join(os.homedir(), ".openclaw", "sessions", encodeSessionCwd(cwd));
|
||||
}
|
||||
type SqliteTranscriptRecord = {
|
||||
agentId: string;
|
||||
sessionId: string;
|
||||
path: string;
|
||||
updatedAt: number;
|
||||
};
|
||||
|
||||
function normalizeSessionFileIdentifier(sessionFile: string): string {
|
||||
const trimmed = sessionFile.trim();
|
||||
@@ -68,6 +70,17 @@ function resolveSessionDirForIdentifier(sessionFile: string, sessionDir?: string
|
||||
return isSqliteSessionTranscriptLocator(sessionFile) ? "" : path.dirname(sessionFile);
|
||||
}
|
||||
|
||||
function createSessionFileIdentifier(header: SessionHeader, sessionDir?: string): string {
|
||||
const dir = sessionDir?.trim();
|
||||
if (dir) {
|
||||
return path.join(path.resolve(dir), createSessionFileName(header));
|
||||
}
|
||||
return createSqliteSessionTranscriptLocator({
|
||||
agentId: DEFAULT_AGENT_ID,
|
||||
sessionId: header.id,
|
||||
});
|
||||
}
|
||||
|
||||
function resolveAgentIdFromSessionPath(sessionFile: string): string {
|
||||
void sessionFile;
|
||||
return DEFAULT_AGENT_ID;
|
||||
@@ -272,6 +285,42 @@ async function listSessionsFromDir(
|
||||
return sessions.toSorted((a, b) => b.modified.getTime() - a.modified.getTime());
|
||||
}
|
||||
|
||||
function listSqliteTranscriptRecords(): SqliteTranscriptRecord[] {
|
||||
const seen = new Set<string>();
|
||||
return [
|
||||
...listSqliteSessionTranscripts(),
|
||||
...listSqliteSessionTranscripts({ agentId: DEFAULT_AGENT_ID }),
|
||||
]
|
||||
.filter((entry) => {
|
||||
const key = `${entry.agentId}\0${entry.sessionId}`;
|
||||
if (seen.has(key)) {
|
||||
return false;
|
||||
}
|
||||
seen.add(key);
|
||||
return true;
|
||||
})
|
||||
.map((entry) => ({
|
||||
agentId: entry.agentId,
|
||||
sessionId: entry.sessionId,
|
||||
path:
|
||||
entry.path ??
|
||||
createSqliteSessionTranscriptLocator({
|
||||
agentId: entry.agentId,
|
||||
sessionId: entry.sessionId,
|
||||
}),
|
||||
updatedAt: entry.updatedAt,
|
||||
}));
|
||||
}
|
||||
|
||||
function loadTranscriptStateForRecord(record: SqliteTranscriptRecord): TranscriptState {
|
||||
return createTranscriptStateFromEvents(
|
||||
loadSqliteSessionTranscriptEvents({
|
||||
agentId: record.agentId,
|
||||
sessionId: record.sessionId,
|
||||
}).map((entry) => entry.event),
|
||||
);
|
||||
}
|
||||
|
||||
export class TranscriptSessionManager implements SessionManager {
|
||||
private state: TranscriptState;
|
||||
private sessionFile: string | undefined;
|
||||
@@ -317,13 +366,13 @@ export class TranscriptSessionManager implements SessionManager {
|
||||
}
|
||||
|
||||
static create(cwd: string, sessionDir?: string): TranscriptSessionManager {
|
||||
const dir = path.resolve(sessionDir ?? resolveDefaultSessionDir(cwd));
|
||||
const header = createSessionHeader({ cwd });
|
||||
const sessionFile = path.join(dir, createSessionFileName(header));
|
||||
const sessionFile = createSessionFileIdentifier(header, sessionDir);
|
||||
const dir = resolveSessionDirForIdentifier(sessionFile, sessionDir);
|
||||
const sqliteScope = {
|
||||
agentId: resolveAgentIdFromSessionPath(sessionFile),
|
||||
sessionId: header.id,
|
||||
transcriptPath: path.resolve(sessionFile),
|
||||
transcriptPath: normalizeSessionFileIdentifier(sessionFile),
|
||||
};
|
||||
const state = new TranscriptState({ header, entries: [] });
|
||||
persistFullTranscriptStateToSqlite(sqliteScope, state);
|
||||
@@ -347,9 +396,27 @@ export class TranscriptSessionManager implements SessionManager {
|
||||
}
|
||||
|
||||
static continueRecent(cwd: string, sessionDir?: string): TranscriptSessionManager {
|
||||
const dir = path.resolve(sessionDir ?? resolveDefaultSessionDir(cwd));
|
||||
if (!sessionDir) {
|
||||
const newestSqlite = listSqliteTranscriptRecords().find((entry) => {
|
||||
const state = loadTranscriptStateForRecord(entry);
|
||||
return state.getCwd() === cwd;
|
||||
});
|
||||
if (newestSqlite) {
|
||||
return TranscriptSessionManager.open({ sessionFile: newestSqlite.path, cwd });
|
||||
}
|
||||
return TranscriptSessionManager.create(cwd);
|
||||
}
|
||||
const dir = sessionDir ? path.resolve(sessionDir) : "";
|
||||
const newestSqlite = listSqliteSessionTranscriptFiles()
|
||||
.filter((entry) => path.dirname(path.resolve(entry.path)) === dir)
|
||||
.filter((entry) => {
|
||||
if (!dir) {
|
||||
return true;
|
||||
}
|
||||
return (
|
||||
!isSqliteSessionTranscriptLocator(entry.path) &&
|
||||
path.dirname(path.resolve(entry.path)) === dir
|
||||
);
|
||||
})
|
||||
.toSorted((a, b) => b.updatedAt - a.updatedAt)[0];
|
||||
if (newestSqlite) {
|
||||
return TranscriptSessionManager.open({ sessionFile: newestSqlite.path, cwd });
|
||||
@@ -362,7 +429,7 @@ export class TranscriptSessionManager implements SessionManager {
|
||||
targetCwd: string,
|
||||
sessionDir?: string,
|
||||
): TranscriptSessionManager {
|
||||
const sourceFile = path.resolve(sourcePath);
|
||||
const sourceFile = normalizeSessionFileIdentifier(sourcePath);
|
||||
const sourceScope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath: sourceFile });
|
||||
if (!sourceScope) {
|
||||
throw new Error(
|
||||
@@ -372,17 +439,16 @@ export class TranscriptSessionManager implements SessionManager {
|
||||
const sourceState = createTranscriptStateFromEvents(
|
||||
loadSqliteSessionTranscriptEvents(sourceScope).map((entry) => entry.event),
|
||||
);
|
||||
const dir = path.resolve(sessionDir ?? resolveDefaultSessionDir(targetCwd));
|
||||
const header = createSessionHeader({
|
||||
cwd: targetCwd,
|
||||
parentSession: sourceFile,
|
||||
});
|
||||
const sessionFile = path.join(dir, createSessionFileName(header));
|
||||
const sessionFile = createSessionFileIdentifier(header, sessionDir);
|
||||
const state = new TranscriptState({ header, entries: sourceState.getEntries() });
|
||||
const sqliteScope = {
|
||||
agentId: resolveAgentIdFromSessionPath(sessionFile),
|
||||
sessionId: header.id,
|
||||
transcriptPath: path.resolve(sessionFile),
|
||||
transcriptPath: normalizeSessionFileIdentifier(sessionFile),
|
||||
};
|
||||
persistFullTranscriptStateToSqlite(sqliteScope, state);
|
||||
return TranscriptSessionManager.open({ sessionFile, cwd: targetCwd });
|
||||
@@ -393,23 +459,20 @@ export class TranscriptSessionManager implements SessionManager {
|
||||
sessionDir?: string,
|
||||
onProgress?: SessionListProgress,
|
||||
): Promise<SessionInfo[]> {
|
||||
return await listSessionsFromDir(
|
||||
path.resolve(sessionDir ?? resolveDefaultSessionDir(cwd)),
|
||||
onProgress,
|
||||
);
|
||||
if (!sessionDir) {
|
||||
return (await TranscriptSessionManager.listAll(onProgress)).filter(
|
||||
(session) => session.cwd === cwd,
|
||||
);
|
||||
}
|
||||
return await listSessionsFromDir(path.resolve(sessionDir), onProgress);
|
||||
}
|
||||
|
||||
static async listAll(onProgress?: SessionListProgress): Promise<SessionInfo[]> {
|
||||
const files = listSqliteSessionTranscriptFiles();
|
||||
const files = listSqliteTranscriptRecords();
|
||||
const sessions: SessionInfo[] = [];
|
||||
let loaded = 0;
|
||||
for (const file of files) {
|
||||
const state = createTranscriptStateFromEvents(
|
||||
loadSqliteSessionTranscriptEvents({
|
||||
agentId: file.agentId,
|
||||
sessionId: file.sessionId,
|
||||
}).map((entry) => entry.event),
|
||||
);
|
||||
const state = loadTranscriptStateForRecord(file);
|
||||
loaded += 1;
|
||||
onProgress?.(loaded, files.length);
|
||||
const info = buildSessionInfoFromState(file.path, state, new Date(file.updatedAt));
|
||||
@@ -441,7 +504,10 @@ export class TranscriptSessionManager implements SessionManager {
|
||||
this.state = new TranscriptState({ header, entries: [] });
|
||||
if (this.persist) {
|
||||
this.sessionFile =
|
||||
this.sessionFile ?? path.join(this.sessionDir, createSessionFileName(header));
|
||||
this.sessionFile ??
|
||||
(this.sessionDir
|
||||
? path.join(this.sessionDir, createSessionFileName(header))
|
||||
: createSessionFileIdentifier(header));
|
||||
this.sqliteScope = {
|
||||
agentId: resolveAgentIdFromSessionPath(this.sessionFile),
|
||||
sessionId: header.id,
|
||||
@@ -603,7 +669,12 @@ export class TranscriptSessionManager implements SessionManager {
|
||||
parentSession: this.sessionFile,
|
||||
});
|
||||
const timestamp = header.timestamp.replace(/[:.]/g, "-");
|
||||
const sessionFile = path.join(this.sessionDir, `${timestamp}_${header.id}.jsonl`);
|
||||
const sessionFile = this.sessionDir
|
||||
? path.join(this.sessionDir, `${timestamp}_${header.id}.jsonl`)
|
||||
: createSqliteSessionTranscriptLocator({
|
||||
agentId: DEFAULT_AGENT_ID,
|
||||
sessionId: header.id,
|
||||
});
|
||||
if (!this.persist) {
|
||||
return undefined;
|
||||
}
|
||||
@@ -615,7 +686,7 @@ export class TranscriptSessionManager implements SessionManager {
|
||||
{
|
||||
agentId: resolveAgentIdFromSessionPath(sessionFile),
|
||||
sessionId: header.id,
|
||||
transcriptPath: path.resolve(sessionFile),
|
||||
transcriptPath: normalizeSessionFileIdentifier(sessionFile),
|
||||
},
|
||||
state,
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user