diff --git a/docs/concepts/active-memory.md b/docs/concepts/active-memory.md index 258b7ff65eb..784a2c03637 100644 --- a/docs/concepts/active-memory.md +++ b/docs/concepts/active-memory.md @@ -641,21 +641,21 @@ inspection, turn persistence on explicitly: } ``` -When enabled, active memory stores transcripts in a separate directory under the -target agent's sessions folder, not in the main user conversation transcript -path. +When enabled, active memory records the blocking sub-agent transcript in the +agent SQLite database and registers a plugin-owned transcript locator under the +state directory, not in the main user conversation transcript path. -The default layout is conceptually: +The default locator namespace is conceptually: ```text -agents//sessions/active-memory/.jsonl +plugins/active-memory/transcripts/agents//active-memory/.jsonl ``` You can change the relative subdirectory with `config.transcriptDir`. Use this carefully: -- blocking memory sub-agent transcripts can accumulate quickly on busy sessions +- blocking memory sub-agent transcript rows can accumulate quickly on busy sessions - `full` query mode can duplicate a lot of conversation context - these transcripts contain hidden prompt context and recalled memories diff --git a/extensions/active-memory/index.ts b/extensions/active-memory/index.ts index c103c7ed8d0..687c6d5865d 100644 --- a/extensions/active-memory/index.ts +++ b/extensions/active-memory/index.ts @@ -474,12 +474,12 @@ function hasDeprecatedModelFallbackPolicy(pluginConfig: unknown): boolean { return raw ? Object.hasOwn(raw, "modelFallbackPolicy") : false; } -function resolveSafeTranscriptDir(baseSessionsDir: string, transcriptDir: string): string { +function resolveSafeTranscriptDir(baseTranscriptDir: string, transcriptDir: string): string { const normalized = transcriptDir.trim(); if (!normalized || normalized.includes(":") || path.isAbsolute(normalized)) { - return path.resolve(baseSessionsDir, DEFAULT_TRANSCRIPT_DIR); + return path.resolve(baseTranscriptDir, DEFAULT_TRANSCRIPT_DIR); } - const resolvedBase = path.resolve(baseSessionsDir); + const resolvedBase = path.resolve(baseTranscriptDir); const candidate = path.resolve(resolvedBase, normalized); if (!isPathInside(resolvedBase, candidate)) { return path.resolve(resolvedBase, DEFAULT_TRANSCRIPT_DIR); diff --git a/extensions/codex/src/app-server/transcript-mirror.ts b/extensions/codex/src/app-server/transcript-mirror.ts index e73fd48cc96..727feb749c8 100644 --- a/extensions/codex/src/app-server/transcript-mirror.ts +++ b/extensions/codex/src/app-server/transcript-mirror.ts @@ -124,8 +124,17 @@ export async function mirrorCodexAppServerTranscript(params: { } if (params.sessionKey) { - emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, sessionKey: params.sessionKey }); + emitSessionTranscriptUpdate({ + agentId, + sessionId, + sessionFile: params.sessionFile, + sessionKey: params.sessionKey, + }); } else { - emitSessionTranscriptUpdate(params.sessionFile); + emitSessionTranscriptUpdate({ + agentId, + sessionId, + sessionFile: params.sessionFile, + }); } } diff --git a/extensions/memory-core/src/memory/manager-sync-ops.ts b/extensions/memory-core/src/memory/manager-sync-ops.ts index 8a17ff681fd..32d08448cd1 100644 --- a/extensions/memory-core/src/memory/manager-sync-ops.ts +++ b/extensions/memory-core/src/memory/manager-sync-ops.ts @@ -18,6 +18,7 @@ import { buildSessionTranscriptEntry, listSessionTranscriptsForAgent, readSessionTranscriptDeltaStats, + resolveSessionTranscriptScope, sessionPathForTranscript, } from "openclaw/plugin-sdk/memory-core-host-engine-qmd"; import { @@ -463,7 +464,11 @@ export abstract class MemoryManagerSyncOps { return; } const sessionTranscript = update.sessionFile; - if (!this.isSessionTranscriptForAgent(sessionTranscript)) { + const updateAgentId = update.agentId?.trim(); + if (updateAgentId && updateAgentId !== this.agentId) { + return; + } + if (!updateAgentId && !this.isSessionTranscriptForAgent(sessionTranscript)) { return; } this.scheduleSessionDirty(sessionTranscript); @@ -585,6 +590,10 @@ export abstract class MemoryManagerSyncOps { if (!sessionTranscript) { return false; } + const scope = resolveSessionTranscriptScope(sessionTranscript); + if (scope) { + return scope.agentId === this.agentId; + } const sessionsDir = resolveSessionTranscriptsDirForAgent(this.agentId); const resolvedFile = path.resolve(sessionTranscript); const resolvedDir = path.resolve(sessionsDir); diff --git a/packages/memory-host-sdk/src/engine-qmd.ts b/packages/memory-host-sdk/src/engine-qmd.ts index 6a7fa0306b2..eb24c103812 100644 --- a/packages/memory-host-sdk/src/engine-qmd.ts +++ b/packages/memory-host-sdk/src/engine-qmd.ts @@ -8,6 +8,7 @@ export { loadSessionTranscriptClassificationForAgent, normalizeSessionTranscriptPathForComparison, readSessionTranscriptDeltaStats, + resolveSessionTranscriptScope, sessionPathForTranscript, type BuildSessionTranscriptEntryOptions, type SessionTranscriptEntry, diff --git a/packages/memory-host-sdk/src/host/session-transcripts.test.ts b/packages/memory-host-sdk/src/host/session-transcripts.test.ts index 719fe784239..d8bda0817e5 100644 --- a/packages/memory-host-sdk/src/host/session-transcripts.test.ts +++ b/packages/memory-host-sdk/src/host/session-transcripts.test.ts @@ -56,6 +56,7 @@ function seedTranscript(params: { sessionId: string; transcriptPath?: string; events: unknown[]; + rememberPath?: boolean; now?: number; }): string { const agentId = params.agentId ?? "main"; @@ -65,7 +66,7 @@ function seedTranscript(params: { replaceSqliteSessionTranscriptEvents({ agentId, sessionId: params.sessionId, - transcriptPath, + ...(params.rememberPath === false ? {} : { transcriptPath }), events: params.events, now: () => params.now ?? 1_770_000_000_000, }); @@ -88,6 +89,22 @@ describe("listSessionTranscriptsForAgent", () => { expect(files).toEqual([includedPath]); }); + + it("uses a virtual SQLite locator when no legacy transcript path is recorded", async () => { + seedTranscript({ + sessionId: "sqlite-only", + events: [{ type: "message", message: { role: "user", content: "Stored only in SQLite" } }], + rememberPath: false, + }); + + const files = await listSessionTranscriptsForAgent("main"); + const [locator] = files; + + expect(locator).toBe("sqlite-transcript://main/sqlite-only.jsonl"); + const entry = await buildSessionTranscriptEntry(locator); + expect(entry?.content).toBe("User: Stored only in SQLite"); + expect(entry?.path).toBe("sessions/main/sqlite-only.jsonl"); + }); }); describe("sessionPathForTranscript", () => { diff --git a/packages/memory-host-sdk/src/host/session-transcripts.ts b/packages/memory-host-sdk/src/host/session-transcripts.ts index d44793c3fcb..a8022f2764a 100644 --- a/packages/memory-host-sdk/src/host/session-transcripts.ts +++ b/packages/memory-host-sdk/src/host/session-transcripts.ts @@ -14,7 +14,6 @@ import { loadSqliteSessionTranscriptEvents, resolveSqliteSessionTranscriptScopeForPath, parseUsageCountedSessionIdFromFileName, - resolveSessionTranscriptsDirForAgent, stripInboundMetadata, stripInternalRuntimeContext, } from "./openclaw-runtime-session.js"; @@ -26,6 +25,7 @@ const DREAMING_NARRATIVE_RUN_PREFIX = "dreaming-narrative-"; // This limit applies to content only; the role label adds up to 11 chars. const SESSION_EXPORT_CONTENT_WRAP_CHARS = 800; const DIRECT_CRON_PROMPT_RE = /^\[cron:[^\]]+\]\s*/; +const SQLITE_TRANSCRIPT_REF_PREFIX = "sqlite-transcript://"; export type SessionTranscriptEntry = { path: string; @@ -193,13 +193,44 @@ export function loadSessionTranscriptClassificationForAgent( }; } +function createSqliteSessionTranscriptRef(params: { agentId: string; sessionId: string }): string { + return `${SQLITE_TRANSCRIPT_REF_PREFIX}${encodeURIComponent(params.agentId)}/${encodeURIComponent( + params.sessionId, + )}.jsonl`; +} + export async function listSessionTranscriptsForAgent(agentId: string): Promise { - const dir = resolveSessionTranscriptsDirForAgent(agentId); return listSqliteSessionTranscripts({ agentId }).map( - (transcript) => transcript.path ?? path.join(dir, `${transcript.sessionId}.jsonl`), + (transcript) => + transcript.path ?? + createSqliteSessionTranscriptRef({ + agentId: transcript.agentId, + sessionId: transcript.sessionId, + }), ); } +function parseSqliteSessionTranscriptRef(locator: string): { + agentId: string; + sessionId: string; +} | null { + if (!locator.startsWith(SQLITE_TRANSCRIPT_REF_PREFIX)) { + return null; + } + try { + const url = new URL(locator); + const agentId = decodeURIComponent(url.hostname).trim(); + const fileName = decodeURIComponent(url.pathname.replace(/^\/+/u, "")).trim(); + const sessionId = parseUsageCountedSessionIdFromFileName(fileName); + if (!agentId || !sessionId) { + return null; + } + return { agentId, sessionId }; + } catch { + return null; + } +} + function extractAgentIdFromSessionPath(absPath: string): string | null { const parts = path.normalize(path.resolve(absPath)).split(path.sep).filter(Boolean); const sessionsIndex = parts.lastIndexOf("sessions"); @@ -214,25 +245,47 @@ function resolveSessionIdFromTranscriptPath(absPath: string): string | null { } export function sessionPathForTranscript(absPath: string): string { + const sqliteRef = parseSqliteSessionTranscriptRef(absPath); + if (sqliteRef) { + return path + .join("sessions", sqliteRef.agentId, `${sqliteRef.sessionId}.jsonl`) + .replace(/\\/g, "/"); + } const agentId = extractAgentIdFromSessionPath(absPath); return path .join("sessions", ...(agentId ? [agentId] : []), path.basename(absPath)) .replace(/\\/g, "/"); } +export function resolveSessionTranscriptScope(locator: string): { + agentId: string; + sessionId: string; + transcriptPath?: string; +} | null { + const sqliteRef = parseSqliteSessionTranscriptRef(locator); + if (sqliteRef) { + return sqliteRef; + } + const transcriptPath = path.resolve(locator); + const rememberedScope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath }); + const agentId = rememberedScope?.agentId ?? extractAgentIdFromSessionPath(transcriptPath); + const sessionId = + rememberedScope?.sessionId ?? resolveSessionIdFromTranscriptPath(transcriptPath); + if (!agentId || !sessionId) { + return null; + } + return { agentId, sessionId, transcriptPath }; +} + export function readSessionTranscriptDeltaStats( absPath: string, ): SessionTranscriptDeltaStats | null { try { - const transcriptPath = path.resolve(absPath); - const rememberedScope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath }); - const agentId = rememberedScope?.agentId ?? extractAgentIdFromSessionPath(transcriptPath); - const sessionId = - rememberedScope?.sessionId ?? resolveSessionIdFromTranscriptPath(transcriptPath); - if (!agentId || !sessionId) { + const scope = resolveSessionTranscriptScope(absPath); + if (!scope) { return null; } - const transcriptEvents = loadSqliteSessionTranscriptEvents({ agentId, sessionId }); + const transcriptEvents = loadSqliteSessionTranscriptEvents(scope); if (transcriptEvents.length === 0) { return null; } @@ -453,15 +506,11 @@ export async function buildSessionTranscriptEntry( opts: BuildSessionTranscriptEntryOptions = {}, ): Promise { try { - const transcriptPath = path.resolve(absPath); - const rememberedScope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath }); - const agentId = rememberedScope?.agentId ?? extractAgentIdFromSessionPath(transcriptPath); - const sessionId = - rememberedScope?.sessionId ?? resolveSessionIdFromTranscriptPath(transcriptPath); - if (!agentId || !sessionId) { + const scope = resolveSessionTranscriptScope(absPath); + if (!scope) { return null; } - const transcriptEvents = loadSqliteSessionTranscriptEvents({ agentId, sessionId }); + const transcriptEvents = loadSqliteSessionTranscriptEvents(scope); if (transcriptEvents.length === 0) { return null; } diff --git a/src/agents/command/attempt-execution.ts b/src/agents/command/attempt-execution.ts index 127bd28f5c5..1c68f79c247 100644 --- a/src/agents/command/attempt-execution.ts +++ b/src/agents/command/attempt-execution.ts @@ -250,7 +250,12 @@ async function persistTextTurnTranscript( } } - emitSessionTranscriptUpdate({ sessionFile, sessionKey: params.sessionKey }); + emitSessionTranscriptUpdate({ + agentId: params.sessionAgentId, + sessionId: params.sessionId, + sessionFile, + sessionKey: params.sessionKey, + }); return sessionEntry; } diff --git a/src/agents/pi-embedded-runner/compact.hooks.test.ts b/src/agents/pi-embedded-runner/compact.hooks.test.ts index 8d3edd02a5c..1569dab915c 100644 --- a/src/agents/pi-embedded-runner/compact.hooks.test.ts +++ b/src/agents/pi-embedded-runner/compact.hooks.test.ts @@ -772,6 +772,8 @@ describe("compactEmbeddedPiSessionDirect hooks", () => { expect(result.ok).toBe(true); expect(listener).toHaveBeenCalledTimes(1); expect(listener).toHaveBeenCalledWith({ + agentId: "main", + sessionId: "rotated-session", sessionFile: TEST_ROTATED_SESSION_FILE, sessionKey: TEST_SESSION_KEY, }); @@ -1252,6 +1254,8 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => { expect(result.ok).toBe(true); expect(listener).toHaveBeenCalledTimes(1); expect(listener).toHaveBeenCalledWith({ + agentId: "main", + sessionId: "session-1", sessionFile: TEST_SESSION_FILE, sessionKey: TEST_SESSION_KEY, }); diff --git a/src/agents/pi-embedded-runner/compact.queued.ts b/src/agents/pi-embedded-runner/compact.queued.ts index 4c709ed5d8a..496112256b1 100644 --- a/src/agents/pi-embedded-runner/compact.queued.ts +++ b/src/agents/pi-embedded-runner/compact.queued.ts @@ -246,6 +246,8 @@ export async function compactEmbeddedPiSession( if (engineOwnsCompaction && result.ok && result.compacted) { await runPostCompactionSideEffects({ config: params.config, + agentId: agentIds.sessionAgentId, + sessionId: postCompactionSessionId, sessionKey: params.sessionKey, sessionFile: postCompactionSessionFile, }); diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index c9b23ad964d..aad8087c745 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -1276,6 +1276,8 @@ async function compactEmbeddedPiSessionDirectOnce( } await runPostCompactionSideEffects({ config: params.config, + agentId: sessionAgentId, + sessionId: activeSessionId, sessionKey: params.sessionKey, sessionFile: activeSessionFile, }); diff --git a/src/agents/pi-embedded-runner/compaction-hooks.ts b/src/agents/pi-embedded-runner/compaction-hooks.ts index 0f58204cf38..b688d76b24f 100644 --- a/src/agents/pi-embedded-runner/compaction-hooks.ts +++ b/src/agents/pi-embedded-runner/compaction-hooks.ts @@ -81,6 +81,8 @@ function syncPostCompactionSessionMemory(params: { export async function runPostCompactionSideEffects(params: { config?: OpenClawConfig; + agentId?: string; + sessionId?: string; sessionKey?: string; sessionFile: string; }): Promise { @@ -88,7 +90,12 @@ export async function runPostCompactionSideEffects(params: { if (!sessionFile) { return; } - emitSessionTranscriptUpdate({ sessionFile, sessionKey: params.sessionKey }); + emitSessionTranscriptUpdate({ + ...(params.agentId ? { agentId: params.agentId } : {}), + ...(params.sessionId ? { sessionId: params.sessionId } : {}), + sessionFile, + sessionKey: params.sessionKey, + }); await syncPostCompactionSessionMemory({ config: params.config, sessionKey: params.sessionKey, diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index ce69b4da776..df3bd227030 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -1699,6 +1699,8 @@ export async function runEmbeddedPiAgent( if (contextEngine.info.ownsCompaction === true) { await runPostCompactionSideEffects({ config: params.config, + agentId: sessionAgentId, + sessionId: activeSessionId, sessionKey: params.sessionKey, sessionFile: activeSessionFile, }); diff --git a/src/config/sessions/transcript.ts b/src/config/sessions/transcript.ts index 023f84c9ad1..01f04dab730 100644 --- a/src/config/sessions/transcript.ts +++ b/src/config/sessions/transcript.ts @@ -299,10 +299,22 @@ export async function appendExactAssistantMessageToSessionTranscript(params: { switch (params.updateMode ?? "inline") { case "inline": - emitSessionTranscriptUpdate({ sessionFile, sessionKey, message, messageId }); + emitSessionTranscriptUpdate({ + agentId, + sessionId: entry.sessionId, + sessionFile, + sessionKey, + message, + messageId, + }); break; case "file-only": - emitSessionTranscriptUpdate({ sessionFile, sessionKey }); + emitSessionTranscriptUpdate({ + agentId, + sessionId: entry.sessionId, + sessionFile, + sessionKey, + }); break; case "none": break; diff --git a/src/gateway/server-methods/chat-transcript-inject.ts b/src/gateway/server-methods/chat-transcript-inject.ts index a239771a7a3..c09de65ab6e 100644 --- a/src/gateway/server-methods/chat-transcript-inject.ts +++ b/src/gateway/server-methods/chat-transcript-inject.ts @@ -109,16 +109,20 @@ export async function appendInjectedAssistantMessageToTranscript(params: { const existingScope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath: params.transcriptPath, }); + const agentId = params.agentId ?? existingScope?.agentId ?? DEFAULT_AGENT_ID; + const sessionId = params.sessionId ?? existingScope?.sessionId; const { messageId } = await appendSessionTranscriptMessage({ transcriptPath: params.transcriptPath, - agentId: params.agentId ?? existingScope?.agentId ?? DEFAULT_AGENT_ID, - sessionId: params.sessionId ?? existingScope?.sessionId, + agentId, + sessionId, message: messageBody, now, useRawWhenLinear: true, config: params.config, }); emitSessionTranscriptUpdate({ + agentId, + ...(sessionId ? { sessionId } : {}), sessionFile: params.transcriptPath, message: messageBody, messageId, diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index d9632b5f7f7..7cb52183cc0 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -2269,6 +2269,8 @@ export const chatHandlers: GatewayRequestHandlers = { } const persistedImages = await persistedImagesPromise; emitSessionTranscriptUpdate({ + agentId, + sessionId: resolvedSessionId, sessionFile: transcriptPath, sessionKey, message: buildChatSendTranscriptMessage({ diff --git a/src/sessions/transcript-events.test.ts b/src/sessions/transcript-events.test.ts index 3256793d677..52abe1e6731 100644 --- a/src/sessions/transcript-events.test.ts +++ b/src/sessions/transcript-events.test.ts @@ -25,15 +25,21 @@ describe("transcript events", () => { cleanup.push(onSessionTranscriptUpdate(listener)); emitSessionTranscriptUpdate({ + agentId: " main ", + sessionId: " sess-1 ", sessionFile: " /tmp/session.jsonl ", sessionKey: " agent:main:main ", message: { role: "assistant", content: "hi" }, + messageId: " msg-1 ", }); expect(listener).toHaveBeenCalledWith({ + agentId: "main", + sessionId: "sess-1", sessionFile: "/tmp/session.jsonl", sessionKey: "agent:main:main", message: { role: "assistant", content: "hi" }, + messageId: "msg-1", }); }); diff --git a/src/sessions/transcript-events.ts b/src/sessions/transcript-events.ts index 4c540d209ef..c3e275acd3d 100644 --- a/src/sessions/transcript-events.ts +++ b/src/sessions/transcript-events.ts @@ -1,6 +1,8 @@ import { normalizeOptionalString } from "../shared/string-coerce.js"; export type SessionTranscriptUpdate = { + agentId?: string; + sessionId?: string; sessionFile: string; sessionKey?: string; message?: unknown; @@ -23,6 +25,8 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp typeof update === "string" ? { sessionFile: update } : { + agentId: update.agentId, + sessionId: update.sessionId, sessionFile: update.sessionFile, sessionKey: update.sessionKey, message: update.message, @@ -33,6 +37,12 @@ export function emitSessionTranscriptUpdate(update: string | SessionTranscriptUp return; } const nextUpdate: SessionTranscriptUpdate = { + ...(normalizeOptionalString(normalized.agentId) + ? { agentId: normalizeOptionalString(normalized.agentId) } + : {}), + ...(normalizeOptionalString(normalized.sessionId) + ? { sessionId: normalizeOptionalString(normalized.sessionId) } + : {}), sessionFile: trimmed, ...(normalizeOptionalString(normalized.sessionKey) ? { sessionKey: normalizeOptionalString(normalized.sessionKey) }