mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-18 20:19:47 +00:00
refactor: key transcript updates by sqlite scope
This commit is contained in:
@@ -641,21 +641,21 @@ inspection, turn persistence on explicitly:
|
||||
}
|
||||
```
|
||||
|
||||
When enabled, active memory stores transcripts in a separate directory under the
|
||||
target agent's sessions folder, not in the main user conversation transcript
|
||||
path.
|
||||
When enabled, active memory records the blocking sub-agent transcript in the
|
||||
agent SQLite database and registers a plugin-owned transcript locator under the
|
||||
state directory, not in the main user conversation transcript path.
|
||||
|
||||
The default layout is conceptually:
|
||||
The default locator namespace is conceptually:
|
||||
|
||||
```text
|
||||
agents/<agent>/sessions/active-memory/<blocking-memory-sub-agent-session-id>.jsonl
|
||||
plugins/active-memory/transcripts/agents/<agent>/active-memory/<blocking-memory-sub-agent-session-id>.jsonl
|
||||
```
|
||||
|
||||
You can change the relative subdirectory with `config.transcriptDir`.
|
||||
|
||||
Use this carefully:
|
||||
|
||||
- blocking memory sub-agent transcripts can accumulate quickly on busy sessions
|
||||
- blocking memory sub-agent transcript rows can accumulate quickly on busy sessions
|
||||
- `full` query mode can duplicate a lot of conversation context
|
||||
- these transcripts contain hidden prompt context and recalled memories
|
||||
|
||||
|
||||
@@ -474,12 +474,12 @@ function hasDeprecatedModelFallbackPolicy(pluginConfig: unknown): boolean {
|
||||
return raw ? Object.hasOwn(raw, "modelFallbackPolicy") : false;
|
||||
}
|
||||
|
||||
function resolveSafeTranscriptDir(baseSessionsDir: string, transcriptDir: string): string {
|
||||
function resolveSafeTranscriptDir(baseTranscriptDir: string, transcriptDir: string): string {
|
||||
const normalized = transcriptDir.trim();
|
||||
if (!normalized || normalized.includes(":") || path.isAbsolute(normalized)) {
|
||||
return path.resolve(baseSessionsDir, DEFAULT_TRANSCRIPT_DIR);
|
||||
return path.resolve(baseTranscriptDir, DEFAULT_TRANSCRIPT_DIR);
|
||||
}
|
||||
const resolvedBase = path.resolve(baseSessionsDir);
|
||||
const resolvedBase = path.resolve(baseTranscriptDir);
|
||||
const candidate = path.resolve(resolvedBase, normalized);
|
||||
if (!isPathInside(resolvedBase, candidate)) {
|
||||
return path.resolve(resolvedBase, DEFAULT_TRANSCRIPT_DIR);
|
||||
|
||||
@@ -124,8 +124,17 @@ export async function mirrorCodexAppServerTranscript(params: {
|
||||
}
|
||||
|
||||
if (params.sessionKey) {
|
||||
emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, sessionKey: params.sessionKey });
|
||||
emitSessionTranscriptUpdate({
|
||||
agentId,
|
||||
sessionId,
|
||||
sessionFile: params.sessionFile,
|
||||
sessionKey: params.sessionKey,
|
||||
});
|
||||
} else {
|
||||
emitSessionTranscriptUpdate(params.sessionFile);
|
||||
emitSessionTranscriptUpdate({
|
||||
agentId,
|
||||
sessionId,
|
||||
sessionFile: params.sessionFile,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import {
|
||||
buildSessionTranscriptEntry,
|
||||
listSessionTranscriptsForAgent,
|
||||
readSessionTranscriptDeltaStats,
|
||||
resolveSessionTranscriptScope,
|
||||
sessionPathForTranscript,
|
||||
} from "openclaw/plugin-sdk/memory-core-host-engine-qmd";
|
||||
import {
|
||||
@@ -463,7 +464,11 @@ export abstract class MemoryManagerSyncOps {
|
||||
return;
|
||||
}
|
||||
const sessionTranscript = update.sessionFile;
|
||||
if (!this.isSessionTranscriptForAgent(sessionTranscript)) {
|
||||
const updateAgentId = update.agentId?.trim();
|
||||
if (updateAgentId && updateAgentId !== this.agentId) {
|
||||
return;
|
||||
}
|
||||
if (!updateAgentId && !this.isSessionTranscriptForAgent(sessionTranscript)) {
|
||||
return;
|
||||
}
|
||||
this.scheduleSessionDirty(sessionTranscript);
|
||||
@@ -585,6 +590,10 @@ export abstract class MemoryManagerSyncOps {
|
||||
if (!sessionTranscript) {
|
||||
return false;
|
||||
}
|
||||
const scope = resolveSessionTranscriptScope(sessionTranscript);
|
||||
if (scope) {
|
||||
return scope.agentId === this.agentId;
|
||||
}
|
||||
const sessionsDir = resolveSessionTranscriptsDirForAgent(this.agentId);
|
||||
const resolvedFile = path.resolve(sessionTranscript);
|
||||
const resolvedDir = path.resolve(sessionsDir);
|
||||
|
||||
@@ -8,6 +8,7 @@ export {
|
||||
loadSessionTranscriptClassificationForAgent,
|
||||
normalizeSessionTranscriptPathForComparison,
|
||||
readSessionTranscriptDeltaStats,
|
||||
resolveSessionTranscriptScope,
|
||||
sessionPathForTranscript,
|
||||
type BuildSessionTranscriptEntryOptions,
|
||||
type SessionTranscriptEntry,
|
||||
|
||||
@@ -56,6 +56,7 @@ function seedTranscript(params: {
|
||||
sessionId: string;
|
||||
transcriptPath?: string;
|
||||
events: unknown[];
|
||||
rememberPath?: boolean;
|
||||
now?: number;
|
||||
}): string {
|
||||
const agentId = params.agentId ?? "main";
|
||||
@@ -65,7 +66,7 @@ function seedTranscript(params: {
|
||||
replaceSqliteSessionTranscriptEvents({
|
||||
agentId,
|
||||
sessionId: params.sessionId,
|
||||
transcriptPath,
|
||||
...(params.rememberPath === false ? {} : { transcriptPath }),
|
||||
events: params.events,
|
||||
now: () => params.now ?? 1_770_000_000_000,
|
||||
});
|
||||
@@ -88,6 +89,22 @@ describe("listSessionTranscriptsForAgent", () => {
|
||||
|
||||
expect(files).toEqual([includedPath]);
|
||||
});
|
||||
|
||||
it("uses a virtual SQLite locator when no legacy transcript path is recorded", async () => {
|
||||
seedTranscript({
|
||||
sessionId: "sqlite-only",
|
||||
events: [{ type: "message", message: { role: "user", content: "Stored only in SQLite" } }],
|
||||
rememberPath: false,
|
||||
});
|
||||
|
||||
const files = await listSessionTranscriptsForAgent("main");
|
||||
const [locator] = files;
|
||||
|
||||
expect(locator).toBe("sqlite-transcript://main/sqlite-only.jsonl");
|
||||
const entry = await buildSessionTranscriptEntry(locator);
|
||||
expect(entry?.content).toBe("User: Stored only in SQLite");
|
||||
expect(entry?.path).toBe("sessions/main/sqlite-only.jsonl");
|
||||
});
|
||||
});
|
||||
|
||||
describe("sessionPathForTranscript", () => {
|
||||
|
||||
@@ -14,7 +14,6 @@ import {
|
||||
loadSqliteSessionTranscriptEvents,
|
||||
resolveSqliteSessionTranscriptScopeForPath,
|
||||
parseUsageCountedSessionIdFromFileName,
|
||||
resolveSessionTranscriptsDirForAgent,
|
||||
stripInboundMetadata,
|
||||
stripInternalRuntimeContext,
|
||||
} from "./openclaw-runtime-session.js";
|
||||
@@ -26,6 +25,7 @@ const DREAMING_NARRATIVE_RUN_PREFIX = "dreaming-narrative-";
|
||||
// This limit applies to content only; the role label adds up to 11 chars.
|
||||
const SESSION_EXPORT_CONTENT_WRAP_CHARS = 800;
|
||||
const DIRECT_CRON_PROMPT_RE = /^\[cron:[^\]]+\]\s*/;
|
||||
const SQLITE_TRANSCRIPT_REF_PREFIX = "sqlite-transcript://";
|
||||
|
||||
export type SessionTranscriptEntry = {
|
||||
path: string;
|
||||
@@ -193,13 +193,44 @@ export function loadSessionTranscriptClassificationForAgent(
|
||||
};
|
||||
}
|
||||
|
||||
function createSqliteSessionTranscriptRef(params: { agentId: string; sessionId: string }): string {
|
||||
return `${SQLITE_TRANSCRIPT_REF_PREFIX}${encodeURIComponent(params.agentId)}/${encodeURIComponent(
|
||||
params.sessionId,
|
||||
)}.jsonl`;
|
||||
}
|
||||
|
||||
export async function listSessionTranscriptsForAgent(agentId: string): Promise<string[]> {
|
||||
const dir = resolveSessionTranscriptsDirForAgent(agentId);
|
||||
return listSqliteSessionTranscripts({ agentId }).map(
|
||||
(transcript) => transcript.path ?? path.join(dir, `${transcript.sessionId}.jsonl`),
|
||||
(transcript) =>
|
||||
transcript.path ??
|
||||
createSqliteSessionTranscriptRef({
|
||||
agentId: transcript.agentId,
|
||||
sessionId: transcript.sessionId,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
function parseSqliteSessionTranscriptRef(locator: string): {
|
||||
agentId: string;
|
||||
sessionId: string;
|
||||
} | null {
|
||||
if (!locator.startsWith(SQLITE_TRANSCRIPT_REF_PREFIX)) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
const url = new URL(locator);
|
||||
const agentId = decodeURIComponent(url.hostname).trim();
|
||||
const fileName = decodeURIComponent(url.pathname.replace(/^\/+/u, "")).trim();
|
||||
const sessionId = parseUsageCountedSessionIdFromFileName(fileName);
|
||||
if (!agentId || !sessionId) {
|
||||
return null;
|
||||
}
|
||||
return { agentId, sessionId };
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function extractAgentIdFromSessionPath(absPath: string): string | null {
|
||||
const parts = path.normalize(path.resolve(absPath)).split(path.sep).filter(Boolean);
|
||||
const sessionsIndex = parts.lastIndexOf("sessions");
|
||||
@@ -214,25 +245,47 @@ function resolveSessionIdFromTranscriptPath(absPath: string): string | null {
|
||||
}
|
||||
|
||||
export function sessionPathForTranscript(absPath: string): string {
|
||||
const sqliteRef = parseSqliteSessionTranscriptRef(absPath);
|
||||
if (sqliteRef) {
|
||||
return path
|
||||
.join("sessions", sqliteRef.agentId, `${sqliteRef.sessionId}.jsonl`)
|
||||
.replace(/\\/g, "/");
|
||||
}
|
||||
const agentId = extractAgentIdFromSessionPath(absPath);
|
||||
return path
|
||||
.join("sessions", ...(agentId ? [agentId] : []), path.basename(absPath))
|
||||
.replace(/\\/g, "/");
|
||||
}
|
||||
|
||||
export function resolveSessionTranscriptScope(locator: string): {
|
||||
agentId: string;
|
||||
sessionId: string;
|
||||
transcriptPath?: string;
|
||||
} | null {
|
||||
const sqliteRef = parseSqliteSessionTranscriptRef(locator);
|
||||
if (sqliteRef) {
|
||||
return sqliteRef;
|
||||
}
|
||||
const transcriptPath = path.resolve(locator);
|
||||
const rememberedScope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath });
|
||||
const agentId = rememberedScope?.agentId ?? extractAgentIdFromSessionPath(transcriptPath);
|
||||
const sessionId =
|
||||
rememberedScope?.sessionId ?? resolveSessionIdFromTranscriptPath(transcriptPath);
|
||||
if (!agentId || !sessionId) {
|
||||
return null;
|
||||
}
|
||||
return { agentId, sessionId, transcriptPath };
|
||||
}
|
||||
|
||||
export function readSessionTranscriptDeltaStats(
|
||||
absPath: string,
|
||||
): SessionTranscriptDeltaStats | null {
|
||||
try {
|
||||
const transcriptPath = path.resolve(absPath);
|
||||
const rememberedScope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath });
|
||||
const agentId = rememberedScope?.agentId ?? extractAgentIdFromSessionPath(transcriptPath);
|
||||
const sessionId =
|
||||
rememberedScope?.sessionId ?? resolveSessionIdFromTranscriptPath(transcriptPath);
|
||||
if (!agentId || !sessionId) {
|
||||
const scope = resolveSessionTranscriptScope(absPath);
|
||||
if (!scope) {
|
||||
return null;
|
||||
}
|
||||
const transcriptEvents = loadSqliteSessionTranscriptEvents({ agentId, sessionId });
|
||||
const transcriptEvents = loadSqliteSessionTranscriptEvents(scope);
|
||||
if (transcriptEvents.length === 0) {
|
||||
return null;
|
||||
}
|
||||
@@ -453,15 +506,11 @@ export async function buildSessionTranscriptEntry(
|
||||
opts: BuildSessionTranscriptEntryOptions = {},
|
||||
): Promise<SessionTranscriptEntry | null> {
|
||||
try {
|
||||
const transcriptPath = path.resolve(absPath);
|
||||
const rememberedScope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath });
|
||||
const agentId = rememberedScope?.agentId ?? extractAgentIdFromSessionPath(transcriptPath);
|
||||
const sessionId =
|
||||
rememberedScope?.sessionId ?? resolveSessionIdFromTranscriptPath(transcriptPath);
|
||||
if (!agentId || !sessionId) {
|
||||
const scope = resolveSessionTranscriptScope(absPath);
|
||||
if (!scope) {
|
||||
return null;
|
||||
}
|
||||
const transcriptEvents = loadSqliteSessionTranscriptEvents({ agentId, sessionId });
|
||||
const transcriptEvents = loadSqliteSessionTranscriptEvents(scope);
|
||||
if (transcriptEvents.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -250,7 +250,12 @@ async function persistTextTurnTranscript(
|
||||
}
|
||||
}
|
||||
|
||||
emitSessionTranscriptUpdate({ sessionFile, sessionKey: params.sessionKey });
|
||||
emitSessionTranscriptUpdate({
|
||||
agentId: params.sessionAgentId,
|
||||
sessionId: params.sessionId,
|
||||
sessionFile,
|
||||
sessionKey: params.sessionKey,
|
||||
});
|
||||
return sessionEntry;
|
||||
}
|
||||
|
||||
|
||||
@@ -772,6 +772,8 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
|
||||
expect(result.ok).toBe(true);
|
||||
expect(listener).toHaveBeenCalledTimes(1);
|
||||
expect(listener).toHaveBeenCalledWith({
|
||||
agentId: "main",
|
||||
sessionId: "rotated-session",
|
||||
sessionFile: TEST_ROTATED_SESSION_FILE,
|
||||
sessionKey: TEST_SESSION_KEY,
|
||||
});
|
||||
@@ -1252,6 +1254,8 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => {
|
||||
expect(result.ok).toBe(true);
|
||||
expect(listener).toHaveBeenCalledTimes(1);
|
||||
expect(listener).toHaveBeenCalledWith({
|
||||
agentId: "main",
|
||||
sessionId: "session-1",
|
||||
sessionFile: TEST_SESSION_FILE,
|
||||
sessionKey: TEST_SESSION_KEY,
|
||||
});
|
||||
|
||||
@@ -246,6 +246,8 @@ export async function compactEmbeddedPiSession(
|
||||
if (engineOwnsCompaction && result.ok && result.compacted) {
|
||||
await runPostCompactionSideEffects({
|
||||
config: params.config,
|
||||
agentId: agentIds.sessionAgentId,
|
||||
sessionId: postCompactionSessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionFile: postCompactionSessionFile,
|
||||
});
|
||||
|
||||
@@ -1276,6 +1276,8 @@ async function compactEmbeddedPiSessionDirectOnce(
|
||||
}
|
||||
await runPostCompactionSideEffects({
|
||||
config: params.config,
|
||||
agentId: sessionAgentId,
|
||||
sessionId: activeSessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionFile: activeSessionFile,
|
||||
});
|
||||
|
||||
@@ -81,6 +81,8 @@ function syncPostCompactionSessionMemory(params: {
|
||||
|
||||
export async function runPostCompactionSideEffects(params: {
|
||||
config?: OpenClawConfig;
|
||||
agentId?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
sessionFile: string;
|
||||
}): Promise<void> {
|
||||
@@ -88,7 +90,12 @@ export async function runPostCompactionSideEffects(params: {
|
||||
if (!sessionFile) {
|
||||
return;
|
||||
}
|
||||
emitSessionTranscriptUpdate({ sessionFile, sessionKey: params.sessionKey });
|
||||
emitSessionTranscriptUpdate({
|
||||
...(params.agentId ? { agentId: params.agentId } : {}),
|
||||
...(params.sessionId ? { sessionId: params.sessionId } : {}),
|
||||
sessionFile,
|
||||
sessionKey: params.sessionKey,
|
||||
});
|
||||
await syncPostCompactionSessionMemory({
|
||||
config: params.config,
|
||||
sessionKey: params.sessionKey,
|
||||
|
||||
@@ -1699,6 +1699,8 @@ export async function runEmbeddedPiAgent(
|
||||
if (contextEngine.info.ownsCompaction === true) {
|
||||
await runPostCompactionSideEffects({
|
||||
config: params.config,
|
||||
agentId: sessionAgentId,
|
||||
sessionId: activeSessionId,
|
||||
sessionKey: params.sessionKey,
|
||||
sessionFile: activeSessionFile,
|
||||
});
|
||||
|
||||
@@ -299,10 +299,22 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
|
||||
|
||||
switch (params.updateMode ?? "inline") {
|
||||
case "inline":
|
||||
emitSessionTranscriptUpdate({ sessionFile, sessionKey, message, messageId });
|
||||
emitSessionTranscriptUpdate({
|
||||
agentId,
|
||||
sessionId: entry.sessionId,
|
||||
sessionFile,
|
||||
sessionKey,
|
||||
message,
|
||||
messageId,
|
||||
});
|
||||
break;
|
||||
case "file-only":
|
||||
emitSessionTranscriptUpdate({ sessionFile, sessionKey });
|
||||
emitSessionTranscriptUpdate({
|
||||
agentId,
|
||||
sessionId: entry.sessionId,
|
||||
sessionFile,
|
||||
sessionKey,
|
||||
});
|
||||
break;
|
||||
case "none":
|
||||
break;
|
||||
|
||||
@@ -109,16 +109,20 @@ export async function appendInjectedAssistantMessageToTranscript(params: {
|
||||
const existingScope = resolveSqliteSessionTranscriptScopeForPath({
|
||||
transcriptPath: params.transcriptPath,
|
||||
});
|
||||
const agentId = params.agentId ?? existingScope?.agentId ?? DEFAULT_AGENT_ID;
|
||||
const sessionId = params.sessionId ?? existingScope?.sessionId;
|
||||
const { messageId } = await appendSessionTranscriptMessage({
|
||||
transcriptPath: params.transcriptPath,
|
||||
agentId: params.agentId ?? existingScope?.agentId ?? DEFAULT_AGENT_ID,
|
||||
sessionId: params.sessionId ?? existingScope?.sessionId,
|
||||
agentId,
|
||||
sessionId,
|
||||
message: messageBody,
|
||||
now,
|
||||
useRawWhenLinear: true,
|
||||
config: params.config,
|
||||
});
|
||||
emitSessionTranscriptUpdate({
|
||||
agentId,
|
||||
...(sessionId ? { sessionId } : {}),
|
||||
sessionFile: params.transcriptPath,
|
||||
message: messageBody,
|
||||
messageId,
|
||||
|
||||
@@ -2269,6 +2269,8 @@ export const chatHandlers: GatewayRequestHandlers = {
|
||||
}
|
||||
const persistedImages = await persistedImagesPromise;
|
||||
emitSessionTranscriptUpdate({
|
||||
agentId,
|
||||
sessionId: resolvedSessionId,
|
||||
sessionFile: transcriptPath,
|
||||
sessionKey,
|
||||
message: buildChatSendTranscriptMessage({
|
||||
|
||||
@@ -25,15 +25,21 @@ describe("transcript events", () => {
|
||||
cleanup.push(onSessionTranscriptUpdate(listener));
|
||||
|
||||
emitSessionTranscriptUpdate({
|
||||
agentId: " main ",
|
||||
sessionId: " sess-1 ",
|
||||
sessionFile: " /tmp/session.jsonl ",
|
||||
sessionKey: " agent:main:main ",
|
||||
message: { role: "assistant", content: "hi" },
|
||||
messageId: " msg-1 ",
|
||||
});
|
||||
|
||||
expect(listener).toHaveBeenCalledWith({
|
||||
agentId: "main",
|
||||
sessionId: "sess-1",
|
||||
sessionFile: "/tmp/session.jsonl",
|
||||
sessionKey: "agent:main:main",
|
||||
message: { role: "assistant", content: "hi" },
|
||||
messageId: "msg-1",
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import { normalizeOptionalString } from "../shared/string-coerce.js";
|
||||
|
||||
export type SessionTranscriptUpdate = {
|
||||
agentId?: string;
|
||||
sessionId?: string;
|
||||
sessionFile: string;
|
||||
sessionKey?: string;
|
||||
message?: unknown;
|
||||
@@ -23,6 +25,8 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp
|
||||
typeof update === "string"
|
||||
? { sessionFile: update }
|
||||
: {
|
||||
agentId: update.agentId,
|
||||
sessionId: update.sessionId,
|
||||
sessionFile: update.sessionFile,
|
||||
sessionKey: update.sessionKey,
|
||||
message: update.message,
|
||||
@@ -33,6 +37,12 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp
|
||||
return;
|
||||
}
|
||||
const nextUpdate: SessionTranscriptUpdate = {
|
||||
...(normalizeOptionalString(normalized.agentId)
|
||||
? { agentId: normalizeOptionalString(normalized.agentId) }
|
||||
: {}),
|
||||
...(normalizeOptionalString(normalized.sessionId)
|
||||
? { sessionId: normalizeOptionalString(normalized.sessionId) }
|
||||
: {}),
|
||||
sessionFile: trimmed,
|
||||
...(normalizeOptionalString(normalized.sessionKey)
|
||||
? { sessionKey: normalizeOptionalString(normalized.sessionKey) }
|
||||
|
||||
Reference in New Issue
Block a user