mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-20 21:23:23 +00:00
memory-core: harden dreaming session ingestion privacy and idempotence
This commit is contained in:
committed by
Vignesh
parent
6ab359f5a9
commit
5291a2cfd1
@@ -431,6 +431,376 @@ describe("memory-core dreaming phases", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("redacts sensitive session content before writing session corpus", async () => {
|
||||
const workspaceDir = await createTempWorkspace("openclaw-dreaming-phases-");
|
||||
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
|
||||
vi.stubEnv("OPENCLAW_STATE_DIR", path.join(workspaceDir, ".state"));
|
||||
await fs.mkdir(path.join(workspaceDir, "memory"), { recursive: true });
|
||||
const sessionsDir = resolveSessionTranscriptsDirForAgent("main");
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
const transcriptPath = path.join(sessionsDir, "dreaming-main.jsonl");
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
[
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
message: {
|
||||
role: "user",
|
||||
timestamp: "2026-04-05T18:01:00.000Z",
|
||||
content: [{ type: "text", text: "OPENAI_API_KEY=sk-1234567890abcdef" }],
|
||||
},
|
||||
}),
|
||||
].join("\n") + "\n",
|
||||
"utf-8",
|
||||
);
|
||||
const mtime = new Date("2026-04-05T18:05:00.000Z");
|
||||
await fs.utimes(transcriptPath, mtime, mtime);
|
||||
|
||||
const { beforeAgentReply } = createHarness(
|
||||
{
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: workspaceDir,
|
||||
},
|
||||
},
|
||||
plugins: {
|
||||
entries: {
|
||||
"memory-core": {
|
||||
config: {
|
||||
dreaming: {
|
||||
enabled: true,
|
||||
phases: {
|
||||
light: {
|
||||
enabled: true,
|
||||
limit: 20,
|
||||
lookbackDays: 7,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
workspaceDir,
|
||||
);
|
||||
|
||||
try {
|
||||
await beforeAgentReply(
|
||||
{ cleanedBody: "__openclaw_memory_core_light_sleep__" },
|
||||
{ trigger: "heartbeat", workspaceDir },
|
||||
);
|
||||
} finally {
|
||||
vi.unstubAllEnvs();
|
||||
}
|
||||
|
||||
const corpusPath = path.join(
|
||||
workspaceDir,
|
||||
"memory",
|
||||
".dreams",
|
||||
"session-corpus",
|
||||
"2026-04-05.txt",
|
||||
);
|
||||
const corpus = await fs.readFile(corpusPath, "utf-8");
|
||||
expect(corpus).not.toContain("OPENAI_API_KEY=sk-1234567890abcdef");
|
||||
expect(corpus).toContain("OPENAI_API_KEY=sk-123…cdef");
|
||||
});
|
||||
|
||||
it("dedupes reset/deleted session archives instead of double-ingesting", async () => {
|
||||
const workspaceDir = await createTempWorkspace("openclaw-dreaming-phases-");
|
||||
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
|
||||
vi.stubEnv("OPENCLAW_STATE_DIR", path.join(workspaceDir, ".state"));
|
||||
await fs.mkdir(path.join(workspaceDir, "memory"), { recursive: true });
|
||||
const sessionsDir = resolveSessionTranscriptsDirForAgent("main");
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
const transcriptPath = path.join(sessionsDir, "dreaming-main.jsonl");
|
||||
const oldMessage = "Move backups to S3 Glacier.";
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
[
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
message: {
|
||||
role: "user",
|
||||
timestamp: "2026-04-05T18:01:00.000Z",
|
||||
content: [{ type: "text", text: oldMessage }],
|
||||
},
|
||||
}),
|
||||
].join("\n") + "\n",
|
||||
"utf-8",
|
||||
);
|
||||
const dayOne = new Date("2026-04-05T18:05:00.000Z");
|
||||
await fs.utimes(transcriptPath, dayOne, dayOne);
|
||||
|
||||
const { beforeAgentReply } = createHarness(
|
||||
{
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: workspaceDir,
|
||||
},
|
||||
},
|
||||
plugins: {
|
||||
entries: {
|
||||
"memory-core": {
|
||||
config: {
|
||||
dreaming: {
|
||||
enabled: true,
|
||||
phases: {
|
||||
light: {
|
||||
enabled: true,
|
||||
limit: 20,
|
||||
lookbackDays: 7,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
workspaceDir,
|
||||
);
|
||||
|
||||
try {
|
||||
await beforeAgentReply(
|
||||
{ cleanedBody: "__openclaw_memory_core_light_sleep__" },
|
||||
{ trigger: "heartbeat", workspaceDir },
|
||||
);
|
||||
|
||||
const resetPath = path.join(
|
||||
sessionsDir,
|
||||
"dreaming-main.jsonl.reset.2026-04-06T01-00-00.000Z",
|
||||
);
|
||||
await fs.writeFile(resetPath, await fs.readFile(transcriptPath, "utf-8"), "utf-8");
|
||||
const newMessage = "Keep retention at 365 days.";
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
[
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
message: {
|
||||
role: "user",
|
||||
timestamp: "2026-04-06T01:01:00.000Z",
|
||||
content: [{ type: "text", text: oldMessage }],
|
||||
},
|
||||
}),
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
message: {
|
||||
role: "assistant",
|
||||
timestamp: "2026-04-06T01:02:00.000Z",
|
||||
content: [{ type: "text", text: newMessage }],
|
||||
},
|
||||
}),
|
||||
].join("\n") + "\n",
|
||||
"utf-8",
|
||||
);
|
||||
const dayTwo = new Date("2026-04-06T01:05:00.000Z");
|
||||
await fs.utimes(transcriptPath, dayTwo, dayTwo);
|
||||
await fs.utimes(resetPath, dayTwo, dayTwo);
|
||||
|
||||
await beforeAgentReply(
|
||||
{ cleanedBody: "__openclaw_memory_core_light_sleep__" },
|
||||
{ trigger: "heartbeat", workspaceDir },
|
||||
);
|
||||
} finally {
|
||||
vi.unstubAllEnvs();
|
||||
}
|
||||
|
||||
const ranked = await rankShortTermPromotionCandidates({
|
||||
workspaceDir,
|
||||
minScore: 0,
|
||||
minRecallCount: 0,
|
||||
minUniqueQueries: 0,
|
||||
nowMs: Date.parse("2026-04-06T02:00:00.000Z"),
|
||||
});
|
||||
const oldCandidate = ranked.find((candidate) => candidate.snippet.includes(oldMessage));
|
||||
const newCandidate = ranked.find((candidate) => candidate.snippet.includes("retention at 365"));
|
||||
expect(oldCandidate?.dailyCount).toBe(1);
|
||||
expect(newCandidate?.dailyCount).toBe(1);
|
||||
});
|
||||
|
||||
it("re-ingests rewritten session transcripts after truncate/reset", async () => {
|
||||
const workspaceDir = await createTempWorkspace("openclaw-dreaming-phases-");
|
||||
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
|
||||
vi.stubEnv("OPENCLAW_STATE_DIR", path.join(workspaceDir, ".state"));
|
||||
await fs.mkdir(path.join(workspaceDir, "memory"), { recursive: true });
|
||||
const sessionsDir = resolveSessionTranscriptsDirForAgent("main");
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
const transcriptPath = path.join(sessionsDir, "dreaming-main.jsonl");
|
||||
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
[
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
message: {
|
||||
role: "user",
|
||||
timestamp: "2026-04-05T18:01:00.000Z",
|
||||
content: [{ type: "text", text: "Move backups to S3 Glacier." }],
|
||||
},
|
||||
}),
|
||||
].join("\n") + "\n",
|
||||
"utf-8",
|
||||
);
|
||||
const dayOne = new Date("2026-04-05T18:05:00.000Z");
|
||||
await fs.utimes(transcriptPath, dayOne, dayOne);
|
||||
|
||||
const { beforeAgentReply } = createHarness(
|
||||
{
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: workspaceDir,
|
||||
},
|
||||
},
|
||||
plugins: {
|
||||
entries: {
|
||||
"memory-core": {
|
||||
config: {
|
||||
dreaming: {
|
||||
enabled: true,
|
||||
phases: {
|
||||
light: {
|
||||
enabled: true,
|
||||
limit: 20,
|
||||
lookbackDays: 7,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
workspaceDir,
|
||||
);
|
||||
|
||||
try {
|
||||
await beforeAgentReply(
|
||||
{ cleanedBody: "__openclaw_memory_core_light_sleep__" },
|
||||
{ trigger: "heartbeat", workspaceDir },
|
||||
);
|
||||
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
[
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
message: {
|
||||
role: "assistant",
|
||||
timestamp: "2026-04-06T01:02:00.000Z",
|
||||
content: [{ type: "text", text: "Retention policy stays at 365 days." }],
|
||||
},
|
||||
}),
|
||||
].join("\n") + "\n",
|
||||
"utf-8",
|
||||
);
|
||||
const dayTwo = new Date("2026-04-06T01:05:00.000Z");
|
||||
await fs.utimes(transcriptPath, dayTwo, dayTwo);
|
||||
|
||||
await beforeAgentReply(
|
||||
{ cleanedBody: "__openclaw_memory_core_light_sleep__" },
|
||||
{ trigger: "heartbeat", workspaceDir },
|
||||
);
|
||||
} finally {
|
||||
vi.unstubAllEnvs();
|
||||
}
|
||||
|
||||
const ranked = await rankShortTermPromotionCandidates({
|
||||
workspaceDir,
|
||||
minScore: 0,
|
||||
minRecallCount: 0,
|
||||
minUniqueQueries: 0,
|
||||
nowMs: Date.parse("2026-04-06T02:00:00.000Z"),
|
||||
});
|
||||
expect(ranked.map((candidate) => candidate.snippet)).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.stringContaining("Move backups to S3 Glacier."),
|
||||
expect.stringContaining("Retention policy stays at 365 days."),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it("ingests sessions when dreaming is enabled even if memorySearch is disabled", async () => {
|
||||
const workspaceDir = await createTempWorkspace("openclaw-dreaming-phases-");
|
||||
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
|
||||
vi.stubEnv("OPENCLAW_STATE_DIR", path.join(workspaceDir, ".state"));
|
||||
await fs.mkdir(path.join(workspaceDir, "memory"), { recursive: true });
|
||||
const sessionsDir = resolveSessionTranscriptsDirForAgent("main");
|
||||
await fs.mkdir(sessionsDir, { recursive: true });
|
||||
const transcriptPath = path.join(sessionsDir, "dreaming-main.jsonl");
|
||||
await fs.writeFile(
|
||||
transcriptPath,
|
||||
[
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
message: {
|
||||
role: "user",
|
||||
timestamp: "2026-04-05T18:01:00.000Z",
|
||||
content: [{ type: "text", text: "Glacier archive migration is now complete." }],
|
||||
},
|
||||
}),
|
||||
].join("\n") + "\n",
|
||||
"utf-8",
|
||||
);
|
||||
const mtime = new Date("2026-04-05T18:05:00.000Z");
|
||||
await fs.utimes(transcriptPath, mtime, mtime);
|
||||
|
||||
const { beforeAgentReply } = createHarness(
|
||||
{
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: workspaceDir,
|
||||
memorySearch: {
|
||||
enabled: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
plugins: {
|
||||
entries: {
|
||||
"memory-core": {
|
||||
config: {
|
||||
dreaming: {
|
||||
enabled: true,
|
||||
phases: {
|
||||
light: {
|
||||
enabled: true,
|
||||
limit: 20,
|
||||
lookbackDays: 7,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
workspaceDir,
|
||||
);
|
||||
|
||||
try {
|
||||
await beforeAgentReply(
|
||||
{ cleanedBody: "__openclaw_memory_core_light_sleep__" },
|
||||
{ trigger: "heartbeat", workspaceDir },
|
||||
);
|
||||
} finally {
|
||||
vi.unstubAllEnvs();
|
||||
}
|
||||
|
||||
const ranked = await rankShortTermPromotionCandidates({
|
||||
workspaceDir,
|
||||
minScore: 0,
|
||||
minRecallCount: 0,
|
||||
minUniqueQueries: 0,
|
||||
nowMs: Date.parse("2026-04-05T19:00:00.000Z"),
|
||||
});
|
||||
expect(ranked.map((candidate) => candidate.snippet)).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.stringContaining("Glacier archive migration is now complete."),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps section context when chunking durable daily notes", async () => {
|
||||
const workspaceDir = await createDreamingWorkspace();
|
||||
await fs.writeFile(
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import type { Dirent } from "node:fs";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import type { OpenClawConfig, OpenClawPluginApi } from "openclaw/plugin-sdk/memory-core";
|
||||
import {
|
||||
buildSessionEntry,
|
||||
listSessionFilesForAgent,
|
||||
parseUsageCountedSessionIdFromFileName,
|
||||
sessionPathForFile,
|
||||
} from "openclaw/plugin-sdk/memory-core-host-engine-qmd";
|
||||
import type { MemorySearchResult } from "openclaw/plugin-sdk/memory-core-host-runtime-files";
|
||||
@@ -101,6 +104,8 @@ const SESSION_INGESTION_MIN_SNIPPET_CHARS = 12;
|
||||
const SESSION_INGESTION_MAX_MESSAGES_PER_SWEEP = 240;
|
||||
const SESSION_INGESTION_MAX_MESSAGES_PER_FILE = 80;
|
||||
const SESSION_INGESTION_MIN_MESSAGES_PER_FILE = 12;
|
||||
const SESSION_INGESTION_MAX_TRACKED_MESSAGES_PER_SESSION = 4096;
|
||||
const SESSION_INGESTION_MAX_TRACKED_SCOPES = 2048;
|
||||
const GENERIC_DAY_HEADING_RE =
|
||||
/^(?:(?:mon|monday|tue|tues|tuesday|wed|wednesday|thu|thur|thurs|thursday|fri|friday|sat|saturday|sun|sunday)(?:,\s+)?)?(?:(?:jan|january|feb|february|mar|march|apr|april|may|jun|june|jul|july|aug|august|sep|sept|september|oct|october|nov|november|dec|december)\s+\d{1,2}(?:st|nd|rd|th)?(?:,\s*\d{4})?|\d{1,2}[/-]\d{1,2}(?:[/-]\d{2,4})?|\d{4}[/-]\d{2}[/-]\d{2})$/i;
|
||||
const MANAGED_DAILY_DREAMING_BLOCKS = [
|
||||
@@ -631,13 +636,13 @@ async function writeDailyIngestionState(
|
||||
type SessionIngestionFileState = {
|
||||
mtimeMs: number;
|
||||
size: number;
|
||||
lineCount: number;
|
||||
lastJsonlLine: number;
|
||||
contentHash: string;
|
||||
};
|
||||
|
||||
type SessionIngestionState = {
|
||||
version: 1;
|
||||
version: 2;
|
||||
files: Record<string, SessionIngestionFileState>;
|
||||
seenMessages: Record<string, string[]>;
|
||||
};
|
||||
|
||||
type SessionIngestionMessage = {
|
||||
@@ -664,39 +669,44 @@ function resolveSessionIngestionStatePath(workspaceDir: string): string {
|
||||
function normalizeSessionIngestionState(raw: unknown): SessionIngestionState {
|
||||
const record = asRecord(raw);
|
||||
const filesRaw = asRecord(record?.files);
|
||||
if (!filesRaw) {
|
||||
return { version: 1, files: {} };
|
||||
}
|
||||
const files: Record<string, SessionIngestionFileState> = {};
|
||||
for (const [key, value] of Object.entries(filesRaw)) {
|
||||
const file = asRecord(value);
|
||||
if (!file || key.trim().length === 0) {
|
||||
continue;
|
||||
if (filesRaw) {
|
||||
for (const [key, value] of Object.entries(filesRaw)) {
|
||||
const file = asRecord(value);
|
||||
if (!file || key.trim().length === 0) {
|
||||
continue;
|
||||
}
|
||||
const mtimeMs = Number(file.mtimeMs);
|
||||
const size = Number(file.size);
|
||||
if (!Number.isFinite(mtimeMs) || mtimeMs < 0 || !Number.isFinite(size) || size < 0) {
|
||||
continue;
|
||||
}
|
||||
files[key] = {
|
||||
mtimeMs: Math.floor(mtimeMs),
|
||||
size: Math.floor(size),
|
||||
contentHash: typeof file.contentHash === "string" ? file.contentHash.trim() : "",
|
||||
};
|
||||
}
|
||||
const mtimeMs = Number(file.mtimeMs);
|
||||
const size = Number(file.size);
|
||||
const lineCount = Number(file.lineCount);
|
||||
const lastJsonlLine = Number(file.lastJsonlLine);
|
||||
if (
|
||||
!Number.isFinite(mtimeMs) ||
|
||||
mtimeMs < 0 ||
|
||||
!Number.isFinite(size) ||
|
||||
size < 0 ||
|
||||
!Number.isFinite(lineCount) ||
|
||||
lineCount < 0 ||
|
||||
!Number.isFinite(lastJsonlLine) ||
|
||||
lastJsonlLine < 0
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
files[key] = {
|
||||
mtimeMs: Math.floor(mtimeMs),
|
||||
size: Math.floor(size),
|
||||
lineCount: Math.floor(lineCount),
|
||||
lastJsonlLine: Math.floor(lastJsonlLine),
|
||||
};
|
||||
}
|
||||
return { version: 1, files };
|
||||
const seenMessagesRaw = asRecord(record?.seenMessages);
|
||||
const seenMessages: Record<string, string[]> = {};
|
||||
if (seenMessagesRaw) {
|
||||
for (const [scope, value] of Object.entries(seenMessagesRaw)) {
|
||||
if (scope.trim().length === 0 || !Array.isArray(value)) {
|
||||
continue;
|
||||
}
|
||||
const unique = [
|
||||
...new Set(value.filter((entry): entry is string => typeof entry === "string")),
|
||||
]
|
||||
.map((entry) => entry.trim())
|
||||
.filter(Boolean)
|
||||
.slice(-SESSION_INGESTION_MAX_TRACKED_MESSAGES_PER_SESSION);
|
||||
if (unique.length > 0) {
|
||||
seenMessages[scope] = unique;
|
||||
}
|
||||
}
|
||||
}
|
||||
return { version: 2, files, seenMessages };
|
||||
}
|
||||
|
||||
async function readSessionIngestionState(workspaceDir: string): Promise<SessionIngestionState> {
|
||||
@@ -707,7 +717,7 @@ async function readSessionIngestionState(workspaceDir: string): Promise<SessionI
|
||||
} catch (err) {
|
||||
const code = (err as NodeJS.ErrnoException)?.code;
|
||||
if (code === "ENOENT" || err instanceof SyntaxError) {
|
||||
return { version: 1, files: {} };
|
||||
return { version: 2, files: {}, seenMessages: {} };
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
@@ -724,73 +734,81 @@ async function writeSessionIngestionState(
|
||||
await fs.rename(tmpPath, statePath);
|
||||
}
|
||||
|
||||
function normalizeSessionText(value: string): string {
|
||||
return value
|
||||
.replace(/\s*\n+\s*/g, " ")
|
||||
.replace(/\s+/g, " ")
|
||||
.trim();
|
||||
function trimTrackedSessionScopes(
|
||||
seenMessages: Record<string, string[]>,
|
||||
): Record<string, string[]> {
|
||||
const keys = Object.keys(seenMessages);
|
||||
if (keys.length <= SESSION_INGESTION_MAX_TRACKED_SCOPES) {
|
||||
return seenMessages;
|
||||
}
|
||||
const keep = new Set(keys.toSorted().slice(-SESSION_INGESTION_MAX_TRACKED_SCOPES));
|
||||
const next: Record<string, string[]> = {};
|
||||
for (const [scope, hashes] of Object.entries(seenMessages)) {
|
||||
if (keep.has(scope)) {
|
||||
next[scope] = hashes;
|
||||
}
|
||||
}
|
||||
return next;
|
||||
}
|
||||
|
||||
function extractSessionMessageText(content: unknown): string | null {
|
||||
if (typeof content === "string") {
|
||||
const normalized = normalizeSessionText(content);
|
||||
return normalized.length > 0 ? normalized : null;
|
||||
}
|
||||
if (!Array.isArray(content)) {
|
||||
return null;
|
||||
}
|
||||
const parts: string[] = [];
|
||||
for (const block of content) {
|
||||
const record = asRecord(block);
|
||||
if (!record || record.type !== "text" || typeof record.text !== "string") {
|
||||
continue;
|
||||
}
|
||||
const normalized = normalizeSessionText(record.text);
|
||||
if (normalized.length > 0) {
|
||||
parts.push(normalized);
|
||||
}
|
||||
}
|
||||
if (parts.length === 0) {
|
||||
return null;
|
||||
}
|
||||
return parts.join(" ");
|
||||
function normalizeSessionCorpusSnippet(value: string): string {
|
||||
return value.replace(/\s+/g, " ").trim().slice(0, SESSION_INGESTION_MAX_SNIPPET_CHARS);
|
||||
}
|
||||
|
||||
function parseSessionTimestampMs(record: Record<string, unknown>): number | null {
|
||||
const candidates = [record.timestamp, asRecord(record.message)?.timestamp];
|
||||
for (const value of candidates) {
|
||||
if (typeof value === "number" && Number.isFinite(value)) {
|
||||
const ms = value > 0 && value < 1e11 ? value * 1000 : value;
|
||||
if (Number.isFinite(ms) && ms > 0) {
|
||||
return ms;
|
||||
}
|
||||
}
|
||||
if (typeof value === "string") {
|
||||
const parsed = Date.parse(value);
|
||||
if (Number.isFinite(parsed)) {
|
||||
return parsed;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
function hashSessionMessageId(value: string): string {
|
||||
return createHash("sha1").update(value).digest("hex");
|
||||
}
|
||||
|
||||
function formatSessionSnippet(role: "user" | "assistant", text: string): string {
|
||||
const label = role === "user" ? "User" : "Assistant";
|
||||
const normalized = normalizeSessionText(text);
|
||||
if (!normalized) {
|
||||
return "";
|
||||
function buildSessionScopeKey(agentId: string, absolutePath: string): string {
|
||||
const fileName = path.basename(absolutePath);
|
||||
const logicalSessionId = parseUsageCountedSessionIdFromFileName(fileName) ?? fileName;
|
||||
return `${agentId}:${logicalSessionId}`;
|
||||
}
|
||||
|
||||
function mergeTrackedMessageHashes(existing: string[], additions: string[]): string[] {
|
||||
if (additions.length === 0) {
|
||||
return existing;
|
||||
}
|
||||
return `${label}: ${normalized}`
|
||||
.slice(0, SESSION_INGESTION_MAX_SNIPPET_CHARS)
|
||||
.replace(/\s+/g, " ")
|
||||
.trim();
|
||||
const seen = new Set(existing);
|
||||
const next = existing.slice();
|
||||
for (const hash of additions) {
|
||||
if (!seen.has(hash)) {
|
||||
seen.add(hash);
|
||||
next.push(hash);
|
||||
}
|
||||
}
|
||||
if (next.length <= SESSION_INGESTION_MAX_TRACKED_MESSAGES_PER_SESSION) {
|
||||
return next;
|
||||
}
|
||||
return next.slice(-SESSION_INGESTION_MAX_TRACKED_MESSAGES_PER_SESSION);
|
||||
}
|
||||
|
||||
function areStringArraysEqual(a: string[], b: string[]): boolean {
|
||||
if (a.length !== b.length) {
|
||||
return false;
|
||||
}
|
||||
for (let index = 0; index < a.length; index += 1) {
|
||||
if (a[index] !== b[index]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
function buildSessionStateKey(agentId: string, absolutePath: string): string {
|
||||
return `${agentId}:${sessionPathForFile(absolutePath)}`;
|
||||
}
|
||||
|
||||
function buildSessionRenderedLine(params: {
|
||||
agentId: string;
|
||||
sessionPath: string;
|
||||
lineNumber: number;
|
||||
snippet: string;
|
||||
}): string {
|
||||
const source = `${params.agentId}/${params.sessionPath}#L${params.lineNumber}`;
|
||||
return `[${source}] ${params.snippet}`.slice(0, SESSION_INGESTION_MAX_SNIPPET_CHARS + 64);
|
||||
}
|
||||
|
||||
function resolveSessionAgentsForWorkspace(
|
||||
cfg: OpenClawConfig | undefined,
|
||||
workspaceDir: string,
|
||||
@@ -865,14 +883,17 @@ async function collectSessionIngestionBatches(params: {
|
||||
if (!params.cfg) {
|
||||
return {
|
||||
batches: [],
|
||||
nextState: { version: 1, files: {} },
|
||||
changed: Object.keys(params.state.files).length > 0,
|
||||
nextState: { version: 2, files: {}, seenMessages: {} },
|
||||
changed:
|
||||
Object.keys(params.state.files).length > 0 ||
|
||||
Object.keys(params.state.seenMessages).length > 0,
|
||||
};
|
||||
}
|
||||
const agentIds = resolveSessionAgentsForWorkspace(params.cfg, params.workspaceDir);
|
||||
const cutoffMs = calculateLookbackCutoffMs(params.nowMs, params.lookbackDays);
|
||||
const batchByDay = new Map<string, SessionIngestionMessage[]>();
|
||||
const nextFiles: Record<string, SessionIngestionFileState> = {};
|
||||
const nextSeenMessages: Record<string, string[]> = { ...params.state.seenMessages };
|
||||
let changed = false;
|
||||
|
||||
const sessionFiles: Array<{ agentId: string; absolutePath: string; sessionPath: string }> = [];
|
||||
@@ -926,101 +947,95 @@ async function collectSessionIngestionBatches(params: {
|
||||
mtimeMs: Math.floor(Math.max(0, stat.mtimeMs)),
|
||||
size: Math.floor(Math.max(0, stat.size)),
|
||||
};
|
||||
const cursorAtEnd = previous && previous.lastJsonlLine >= previous.lineCount;
|
||||
const unchanged =
|
||||
Boolean(previous) &&
|
||||
previous.mtimeMs === fingerprint.mtimeMs &&
|
||||
previous.size === fingerprint.size &&
|
||||
cursorAtEnd;
|
||||
previous.contentHash.length > 0;
|
||||
if (unchanged) {
|
||||
nextFiles[stateKey] = previous!;
|
||||
continue;
|
||||
}
|
||||
|
||||
const raw = await fs.readFile(file.absolutePath, "utf-8").catch((err: unknown) => {
|
||||
if ((err as NodeJS.ErrnoException)?.code === "ENOENT") {
|
||||
return "";
|
||||
}
|
||||
throw err;
|
||||
});
|
||||
const lines = raw.split(/\r?\n/);
|
||||
const resetCursor =
|
||||
!previous ||
|
||||
fingerprint.size < previous.size ||
|
||||
fingerprint.mtimeMs < previous.mtimeMs ||
|
||||
previous.lastJsonlLine > lines.length;
|
||||
let cursor = resetCursor ? 0 : previous.lastJsonlLine;
|
||||
const entry = await buildSessionEntry(file.absolutePath);
|
||||
if (!entry) {
|
||||
continue;
|
||||
}
|
||||
const contentHash = entry.hash.trim();
|
||||
if (
|
||||
previous &&
|
||||
previous.mtimeMs === fingerprint.mtimeMs &&
|
||||
previous.size === fingerprint.size &&
|
||||
previous.contentHash === contentHash
|
||||
) {
|
||||
nextFiles[stateKey] = previous;
|
||||
continue;
|
||||
}
|
||||
|
||||
const sessionScope = buildSessionScopeKey(file.agentId, file.absolutePath);
|
||||
const previousSeen = nextSeenMessages[sessionScope] ?? [];
|
||||
let seenSet = new Set(previousSeen);
|
||||
const newSeenHashes: string[] = [];
|
||||
|
||||
const lines = entry.content.length > 0 ? entry.content.split("\n") : [];
|
||||
const day = formatMemoryDreamingDay(fingerprint.mtimeMs, params.timezone);
|
||||
if (!isDayWithinLookback(day, cutoffMs)) {
|
||||
nextFiles[stateKey] = {
|
||||
mtimeMs: fingerprint.mtimeMs,
|
||||
size: fingerprint.size,
|
||||
contentHash,
|
||||
};
|
||||
continue;
|
||||
}
|
||||
|
||||
const fileCap = Math.max(1, Math.min(perFileCap, remaining));
|
||||
let fileCount = 0;
|
||||
let lastProcessedLine = cursor;
|
||||
|
||||
for (let index = cursor; index < lines.length; index += 1) {
|
||||
for (let index = 0; index < lines.length; index += 1) {
|
||||
if (fileCount >= fileCap || remaining <= 0) {
|
||||
break;
|
||||
}
|
||||
const rawLine = lines[index] ?? "";
|
||||
const lineNumber = index + 1;
|
||||
lastProcessedLine = lineNumber;
|
||||
if (!rawLine.trim()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let parsed: unknown;
|
||||
try {
|
||||
parsed = JSON.parse(rawLine);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
const record = asRecord(parsed);
|
||||
if (!record || record.type !== "message") {
|
||||
continue;
|
||||
}
|
||||
const message = asRecord(record.message);
|
||||
if (!message) {
|
||||
continue;
|
||||
}
|
||||
const role = message.role === "user" || message.role === "assistant" ? message.role : null;
|
||||
if (!role) {
|
||||
continue;
|
||||
}
|
||||
const text = extractSessionMessageText(message.content);
|
||||
if (!text) {
|
||||
continue;
|
||||
}
|
||||
const snippet = formatSessionSnippet(role, text);
|
||||
const rawSnippet = lines[index] ?? "";
|
||||
const snippet = normalizeSessionCorpusSnippet(rawSnippet);
|
||||
if (snippet.length < SESSION_INGESTION_MIN_SNIPPET_CHARS) {
|
||||
continue;
|
||||
}
|
||||
const timestampMs = parseSessionTimestampMs(record) ?? parseSessionTimestampMs(message);
|
||||
const day = formatMemoryDreamingDay(timestampMs ?? fingerprint.mtimeMs, params.timezone);
|
||||
if (!isDayWithinLookback(day, cutoffMs)) {
|
||||
const lineNumber = entry.lineMap[index] ?? index + 1;
|
||||
const messageHash = hashSessionMessageId(
|
||||
`${file.agentId}\n${sessionScope}\n${lineNumber}\n${snippet}`,
|
||||
);
|
||||
if (seenSet.has(messageHash)) {
|
||||
continue;
|
||||
}
|
||||
const source = `${file.agentId}/${file.sessionPath}#L${lineNumber}`;
|
||||
const rendered = `[${source}] ${snippet}`.slice(0, SESSION_INGESTION_MAX_SNIPPET_CHARS + 64);
|
||||
const rendered = buildSessionRenderedLine({
|
||||
agentId: file.agentId,
|
||||
sessionPath: file.sessionPath,
|
||||
lineNumber,
|
||||
snippet,
|
||||
});
|
||||
const bucket = batchByDay.get(day) ?? [];
|
||||
bucket.push({ day, snippet, rendered });
|
||||
batchByDay.set(day, bucket);
|
||||
seenSet.add(messageHash);
|
||||
newSeenHashes.push(messageHash);
|
||||
fileCount += 1;
|
||||
remaining -= 1;
|
||||
}
|
||||
|
||||
if (lastProcessedLine < cursor) {
|
||||
lastProcessedLine = cursor;
|
||||
}
|
||||
cursor = Math.max(0, Math.min(lastProcessedLine, lines.length));
|
||||
nextFiles[stateKey] = {
|
||||
mtimeMs: fingerprint.mtimeMs,
|
||||
size: fingerprint.size,
|
||||
lineCount: lines.length,
|
||||
lastJsonlLine: cursor,
|
||||
contentHash,
|
||||
};
|
||||
const mergedSeen = mergeTrackedMessageHashes(previousSeen, newSeenHashes);
|
||||
nextSeenMessages[sessionScope] = mergedSeen;
|
||||
if (!areStringArraysEqual(mergedSeen, previousSeen)) {
|
||||
changed = true;
|
||||
}
|
||||
if (
|
||||
!previous ||
|
||||
previous.mtimeMs !== fingerprint.mtimeMs ||
|
||||
previous.size !== fingerprint.size ||
|
||||
previous.lineCount !== lines.length ||
|
||||
previous.lastJsonlLine !== cursor
|
||||
previous.contentHash !== contentHash
|
||||
) {
|
||||
changed = true;
|
||||
}
|
||||
@@ -1032,17 +1047,32 @@ async function collectSessionIngestionBatches(params: {
|
||||
continue;
|
||||
}
|
||||
const next = nextFiles[key];
|
||||
if (!next || next.mtimeMs !== state.mtimeMs || next.size !== state.size) {
|
||||
changed = true;
|
||||
}
|
||||
if (
|
||||
!next ||
|
||||
next.mtimeMs !== state.mtimeMs ||
|
||||
next.size !== state.size ||
|
||||
next.lineCount !== state.lineCount ||
|
||||
next.lastJsonlLine !== state.lastJsonlLine
|
||||
next &&
|
||||
typeof state.contentHash === "string" &&
|
||||
state.contentHash.trim().length > 0 &&
|
||||
next.contentHash !== state.contentHash
|
||||
) {
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
|
||||
const trimmedSeenMessages = trimTrackedSessionScopes(nextSeenMessages);
|
||||
for (const [scope, hashes] of Object.entries(trimmedSeenMessages)) {
|
||||
const previous = params.state.seenMessages[scope] ?? [];
|
||||
if (!areStringArraysEqual(previous, hashes)) {
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
for (const scope of Object.keys(params.state.seenMessages)) {
|
||||
if (!Object.hasOwn(trimmedSeenMessages, scope)) {
|
||||
changed = true;
|
||||
}
|
||||
}
|
||||
|
||||
const batches: DailyIngestionBatch[] = [];
|
||||
for (const day of [...batchByDay.keys()].toSorted()) {
|
||||
const lines = batchByDay.get(day) ?? [];
|
||||
@@ -1061,7 +1091,7 @@ async function collectSessionIngestionBatches(params: {
|
||||
|
||||
return {
|
||||
batches,
|
||||
nextState: { version: 1, files: nextFiles },
|
||||
nextState: { version: 2, files: nextFiles, seenMessages: trimmedSeenMessages },
|
||||
changed,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -5,21 +5,12 @@ const resolveDefaultAgentId = vi.hoisted(() => vi.fn(() => "main"));
|
||||
const resolveAgentWorkspaceDir = vi.hoisted(() =>
|
||||
vi.fn((_cfg: OpenClawConfig, agentId: string) => `/workspace/${agentId}`),
|
||||
);
|
||||
const resolveMemorySearchConfig = vi.hoisted(() =>
|
||||
vi.fn<(_cfg: OpenClawConfig, _agentId: string) => { enabled: boolean } | null>(() => ({
|
||||
enabled: true,
|
||||
})),
|
||||
);
|
||||
|
||||
vi.mock("../agents/agent-scope.js", () => ({
|
||||
resolveDefaultAgentId,
|
||||
resolveAgentWorkspaceDir,
|
||||
}));
|
||||
|
||||
vi.mock("../agents/memory-search.js", () => ({
|
||||
resolveMemorySearchConfig,
|
||||
}));
|
||||
|
||||
import {
|
||||
formatMemoryDreamingDay,
|
||||
isSameMemoryDreamingDay,
|
||||
@@ -114,10 +105,7 @@ describe("memory dreaming host helpers", () => {
|
||||
expect(resolved.phases.rem.cron).toBe("15 */8 * * *");
|
||||
});
|
||||
|
||||
it("dedupes shared workspaces and skips agents without memory search", () => {
|
||||
resolveMemorySearchConfig.mockImplementation((_cfg: OpenClawConfig, agentId: string) =>
|
||||
agentId === "beta" ? null : { enabled: true },
|
||||
);
|
||||
it("dedupes shared workspaces across all configured agents", () => {
|
||||
resolveAgentWorkspaceDir.mockImplementation((_cfg: OpenClawConfig, agentId: string) => {
|
||||
if (agentId === "alpha") {
|
||||
return "/workspace/shared";
|
||||
@@ -139,6 +127,10 @@ describe("memory dreaming host helpers", () => {
|
||||
workspaceDir: "/workspace/shared",
|
||||
agentIds: ["alpha", "gamma"],
|
||||
},
|
||||
{
|
||||
workspaceDir: "/workspace/beta",
|
||||
agentIds: ["beta"],
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import path from "node:path";
|
||||
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js";
|
||||
import { resolveMemorySearchConfig } from "../agents/memory-search.js";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { asNullableRecord } from "../shared/record-coerce.js";
|
||||
|
||||
@@ -587,9 +586,6 @@ export function resolveMemoryDreamingWorkspaces(cfg: OpenClawConfig): MemoryDrea
|
||||
|
||||
const byWorkspace = new Map<string, MemoryDreamingWorkspace>();
|
||||
for (const agentId of agentIds) {
|
||||
if (!resolveMemorySearchConfig(cfg, agentId)) {
|
||||
continue;
|
||||
}
|
||||
const workspaceDir = resolveAgentWorkspaceDir(cfg, agentId)?.trim();
|
||||
if (!workspaceDir) {
|
||||
continue;
|
||||
|
||||
@@ -7,6 +7,7 @@ export {
|
||||
sessionPathForFile,
|
||||
type SessionFileEntry,
|
||||
} from "./host/session-files.js";
|
||||
export { parseUsageCountedSessionIdFromFileName } from "../config/sessions/artifacts.js";
|
||||
export { parseQmdQueryJson, type QmdQueryResult } from "./host/qmd-query-parser.js";
|
||||
export {
|
||||
deriveQmdScopeChannel,
|
||||
|
||||
Reference in New Issue
Block a user