mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-20 21:23:23 +00:00
memory-core: checkpoint session transcript dreaming ingestion
This commit is contained in:
committed by
Vignesh
parent
f0ba7b95da
commit
695176542f
@@ -1,6 +1,7 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import type { OpenClawConfig, OpenClawPluginApi } from "openclaw/plugin-sdk/memory-core";
|
||||
import { resolveSessionTranscriptsDirForAgent } from "openclaw/plugin-sdk/memory-core";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { registerMemoryDreamingPhases } from "./dreaming-phases.js";
|
||||
import {
|
||||
@@ -319,6 +320,124 @@ describe("memory-core dreaming phases", () => {
|
||||
expect(after[0]?.snippet).toContain("Keep retention at 365 days.");
|
||||
});
|
||||
|
||||
it("checkpoints session transcript ingestion and skips unchanged transcripts", async () => {
  const workspaceDir = await createTempWorkspace("openclaw-dreaming-phases-");
  let transcriptReadCount = 0;
  let transcriptPath = "";

  // Env stubs are installed inside the try so that a failure anywhere in the
  // setup below still reaches the finally and cannot leak stubbed environment
  // variables into other tests.
  try {
    vi.stubEnv("OPENCLAW_TEST_FAST", "1");
    vi.stubEnv("OPENCLAW_STATE_DIR", path.join(workspaceDir, ".state"));
    await fs.mkdir(path.join(workspaceDir, "memory"), { recursive: true });
    // Resolved after OPENCLAW_STATE_DIR is stubbed (same order as before).
    const sessionsDir = resolveSessionTranscriptsDirForAgent("main");
    await fs.mkdir(sessionsDir, { recursive: true });
    transcriptPath = path.join(sessionsDir, "dreaming-main.jsonl");
    await fs.writeFile(
      transcriptPath,
      [
        JSON.stringify({
          type: "session",
          id: "dreaming-main",
          timestamp: "2026-04-05T18:00:00.000Z",
        }),
        JSON.stringify({
          type: "message",
          message: {
            role: "user",
            timestamp: "2026-04-05T18:01:00.000Z",
            content: [{ type: "text", text: "Move backups to S3 Glacier." }],
          },
        }),
        JSON.stringify({
          type: "message",
          message: {
            role: "assistant",
            timestamp: "2026-04-05T18:02:00.000Z",
            content: [{ type: "text", text: "Set retention to 365 days." }],
          },
        }),
      ].join("\n") + "\n",
      "utf-8",
    );

    const { beforeAgentReply } = createHarness(
      {
        agents: {
          defaults: {
            workspace: workspaceDir,
            memorySearch: {
              enabled: true,
              sources: ["memory", "sessions"],
              experimental: {
                sessionMemory: true,
              },
            },
          },
        },
        plugins: {
          entries: {
            "memory-core": {
              config: {
                dreaming: {
                  enabled: true,
                  phases: {
                    light: {
                      enabled: true,
                      limit: 20,
                      lookbackDays: 7,
                    },
                  },
                },
              },
            },
          },
        },
      },
      workspaceDir,
    );

    // Spy only around the two sweeps so setup I/O is never counted.
    const readSpy = vi.spyOn(fs, "readFile");
    try {
      // First sweep ingests the transcript; the second must hit the checkpoint.
      await beforeAgentReply(
        { cleanedBody: "__openclaw_memory_core_light_sleep__" },
        { trigger: "heartbeat", workspaceDir },
      );
      await beforeAgentReply(
        { cleanedBody: "__openclaw_memory_core_light_sleep__" },
        { trigger: "heartbeat", workspaceDir },
      );
      transcriptReadCount = readSpy.mock.calls.filter(
        ([target]) => String(target) === transcriptPath,
      ).length;
    } finally {
      readSpy.mockRestore();
    }
  } finally {
    vi.unstubAllEnvs();
  }

  expect(transcriptReadCount).toBeLessThanOrEqual(1);

  await expect(
    fs.access(path.join(workspaceDir, "memory", ".dreams", "session-ingestion.json")),
  ).resolves.toBeUndefined();
  await expect(
    fs.access(path.join(workspaceDir, "memory", ".dreams", "session-corpus", "2026-04-05.txt")),
  ).resolves.toBeUndefined();

  const ranked = await rankShortTermPromotionCandidates({
    workspaceDir,
    minScore: 0,
    minRecallCount: 0,
    minUniqueQueries: 0,
    nowMs: Date.parse("2026-04-05T19:00:00.000Z"),
  });
  expect(ranked.map((candidate) => candidate.path)).toContain(
    "memory/.dreams/session-corpus/2026-04-05.txt",
  );
  expect(ranked.map((candidate) => candidate.snippet)).toEqual(
    expect.arrayContaining([
      expect.stringContaining("Move backups to S3 Glacier."),
      expect.stringContaining("Set retention to 365 days."),
    ]),
  );
});
|
||||
|
||||
it("keeps section context when chunking durable daily notes", async () => {
|
||||
const workspaceDir = await createDreamingWorkspace();
|
||||
await fs.writeFile(
|
||||
|
||||
@@ -2,8 +2,14 @@ import type { Dirent } from "node:fs";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import type { OpenClawConfig, OpenClawPluginApi } from "openclaw/plugin-sdk/memory-core";
|
||||
import { resolveMemorySearchConfig } from "openclaw/plugin-sdk/memory-core";
|
||||
import {
|
||||
listSessionFilesForAgent,
|
||||
sessionPathForFile,
|
||||
} from "openclaw/plugin-sdk/memory-core-host-engine-qmd";
|
||||
import type { MemorySearchResult } from "openclaw/plugin-sdk/memory-core-host-runtime-files";
|
||||
import {
|
||||
formatMemoryDreamingDay,
|
||||
resolveMemoryCorePluginConfig,
|
||||
resolveMemoryDreamingWorkspaces,
|
||||
resolveMemoryLightDreamingConfig,
|
||||
@@ -84,6 +90,18 @@ const DAILY_INGESTION_SCORE = 0.62;
|
||||
const DAILY_INGESTION_MAX_SNIPPET_CHARS = 280;
|
||||
const DAILY_INGESTION_MIN_SNIPPET_CHARS = 8;
|
||||
const DAILY_INGESTION_MAX_CHUNK_LINES = 4;
|
||||
/** Workspace-relative location of the JSON checkpoint for transcript ingestion. */
const SESSION_INGESTION_STATE_RELATIVE_PATH = path.join(
  "memory",
  ".dreams",
  "session-ingestion.json",
);

/** Workspace-relative directory that holds the per-day session corpus files. */
const SESSION_CORPUS_RELATIVE_DIR = path.join("memory", ".dreams", "session-corpus");

/** Score assigned to every recall result produced from a session transcript line. */
const SESSION_INGESTION_SCORE = 0.58;

/** Upper bound, in characters, applied when a session snippet is clipped. */
const SESSION_INGESTION_MAX_SNIPPET_CHARS = 280;

/** Snippets shorter than this many characters are not ingested. */
const SESSION_INGESTION_MIN_SNIPPET_CHARS = 12;

/** Total number of messages a single ingestion sweep may collect across all files. */
const SESSION_INGESTION_MAX_MESSAGES_PER_SWEEP = 240;

/** Ceiling for the per-file message budget within one sweep. */
const SESSION_INGESTION_MAX_MESSAGES_PER_FILE = 80;

/** Floor for the per-file message budget within one sweep. */
const SESSION_INGESTION_MIN_MESSAGES_PER_FILE = 12;
|
||||
const GENERIC_DAY_HEADING_RE =
|
||||
/^(?:(?:mon|monday|tue|tues|tuesday|wed|wednesday|thu|thur|thurs|thursday|fri|friday|sat|saturday|sun|sunday)(?:,\s+)?)?(?:(?:jan|january|feb|february|mar|march|apr|april|may|jun|june|jul|july|aug|august|sep|sept|september|oct|october|nov|november|dec|december)\s+\d{1,2}(?:st|nd|rd|th)?(?:,\s*\d{4})?|\d{1,2}[/-]\d{1,2}(?:[/-]\d{2,4})?|\d{4}[/-]\d{2}[/-]\d{2})$/i;
|
||||
const MANAGED_DAILY_DREAMING_BLOCKS = [
|
||||
@@ -611,6 +629,485 @@ async function writeDailyIngestionState(
|
||||
await fs.rename(tmpPath, statePath);
|
||||
}
|
||||
|
||||
/** Checkpoint recorded for one session transcript file. */
type SessionIngestionFileState = {
  /** Floored modification time of the transcript when it was last scanned. */
  mtimeMs: number;
  /** Floored byte size of the transcript when it was last scanned. */
  size: number;
  /** Number of lines counted in the transcript at the last scan. */
  lineCount: number;
  /** Resume cursor: how many leading lines have already been examined. */
  lastJsonlLine: number;
};

/** Persisted ingestion checkpoint; `files` is keyed by the per-agent session state key. */
type SessionIngestionState = {
  version: 1;
  files: Record<string, SessionIngestionFileState>;
};

/** One transcript message selected for ingestion. */
type SessionIngestionMessage = {
  /** Day bucket the message is filed under. */
  day: string;
  /** Role-labelled, length-limited message text. */
  snippet: string;
  /** Line written to the session corpus file (source tag followed by the snippet). */
  rendered: string;
};

/** Outcome of one transcript collection pass. */
type SessionIngestionCollectionResult = {
  /** Per-day recall results for the lines appended to the corpus. */
  batches: DailyIngestionBatch[];
  /** Checkpoint to persist after the pass. */
  nextState: SessionIngestionState;
  /** Whether `nextState` differs from the state the pass started from. */
  changed: boolean;
};
|
||||
|
||||
/**
 * Builds a comparison key for a workspace directory: the path is made
 * absolute, backslashes become forward slashes, and on Windows the result is
 * lower-cased.
 */
function normalizeWorkspaceKey(workspaceDir: string): string {
  const absolute = path.resolve(workspaceDir);
  const forwardSlashed = absolute.split("\\").join("/");
  if (process.platform === "win32") {
    return forwardSlashed.toLowerCase();
  }
  return forwardSlashed;
}
|
||||
|
||||
/** Returns the path of the session-ingestion checkpoint file inside `workspaceDir`. */
function resolveSessionIngestionStatePath(workspaceDir: string): string {
  const checkpointPath = path.join(workspaceDir, SESSION_INGESTION_STATE_RELATIVE_PATH);
  return checkpointPath;
}
|
||||
|
||||
/**
 * Coerces an arbitrary parsed value into a well-formed ingestion state.
 *
 * Entries are dropped when their key is blank, when they are not records, or
 * when any of the four counters is non-finite or negative. Surviving counters
 * are floored to integers. A missing or malformed `files` map yields an empty
 * state.
 */
function normalizeSessionIngestionState(raw: unknown): SessionIngestionState {
  const files: Record<string, SessionIngestionFileState> = {};
  const rawFiles = asRecord(asRecord(raw)?.files);
  if (!rawFiles) {
    return { version: 1, files };
  }

  const isUsableCounter = (value: number): boolean => Number.isFinite(value) && value >= 0;

  for (const [key, value] of Object.entries(rawFiles)) {
    const entry = asRecord(value);
    if (!entry || key.trim().length === 0) {
      continue;
    }
    const mtimeMs = Number(entry.mtimeMs);
    const size = Number(entry.size);
    const lineCount = Number(entry.lineCount);
    const lastJsonlLine = Number(entry.lastJsonlLine);
    const allUsable = [mtimeMs, size, lineCount, lastJsonlLine].every(isUsableCounter);
    if (!allUsable) {
      continue;
    }
    files[key] = {
      mtimeMs: Math.floor(mtimeMs),
      size: Math.floor(size),
      lineCount: Math.floor(lineCount),
      lastJsonlLine: Math.floor(lastJsonlLine),
    };
  }
  return { version: 1, files };
}
|
||||
|
||||
/**
 * Loads the session-ingestion checkpoint for a workspace.
 *
 * A missing file (`ENOENT`) or unparsable JSON (`SyntaxError`) yields an empty
 * state; any other error is rethrown.
 */
async function readSessionIngestionState(workspaceDir: string): Promise<SessionIngestionState> {
  const statePath = resolveSessionIngestionStatePath(workspaceDir);
  try {
    const parsed: unknown = JSON.parse(await fs.readFile(statePath, "utf-8"));
    return normalizeSessionIngestionState(parsed);
  } catch (err) {
    const isMissing = (err as NodeJS.ErrnoException)?.code === "ENOENT";
    if (!isMissing && !(err instanceof SyntaxError)) {
      throw err;
    }
    return { version: 1, files: {} };
  }
}
|
||||
|
||||
/**
 * Persists the session-ingestion checkpoint for a workspace.
 *
 * The state is written to a pid- and timestamp-suffixed temporary file next to
 * the target and then renamed over it, after the parent directory has been
 * created.
 */
async function writeSessionIngestionState(
  workspaceDir: string,
  state: SessionIngestionState,
): Promise<void> {
  const targetPath = resolveSessionIngestionStatePath(workspaceDir);
  const parentDir = path.dirname(targetPath);
  await fs.mkdir(parentDir, { recursive: true });

  const tmpPath = `${targetPath}.${process.pid}.${Date.now()}.tmp`;
  const serialized = `${JSON.stringify(state, null, 2)}\n`;
  await fs.writeFile(tmpPath, serialized, "utf-8");
  await fs.rename(tmpPath, targetPath);
}
|
||||
|
||||
/**
 * Collapses every run of whitespace (including line breaks) into a single
 * space and strips leading and trailing whitespace.
 */
function normalizeSessionText(value: string): string {
  const words = value.split(/\s+/).filter((word) => word.length > 0);
  return words.join(" ");
}
|
||||
|
||||
/**
 * Pulls plain text out of a transcript message's `content`.
 *
 * A string is normalized directly. An array contributes the normalized `text`
 * of each `{ type: "text" }` block, joined with single spaces. Returns `null`
 * when no non-empty text is found or `content` has any other shape.
 */
function extractSessionMessageText(content: unknown): string | null {
  if (typeof content === "string") {
    const cleaned = normalizeSessionText(content);
    return cleaned.length === 0 ? null : cleaned;
  }
  if (!Array.isArray(content)) {
    return null;
  }

  const pieces: string[] = [];
  for (const block of content) {
    const entry = asRecord(block);
    const isTextBlock = Boolean(entry) && entry?.type === "text" && typeof entry.text === "string";
    if (!isTextBlock) {
      continue;
    }
    const cleaned = normalizeSessionText(entry?.text as string);
    if (cleaned.length === 0) {
      continue;
    }
    pieces.push(cleaned);
  }
  return pieces.length > 0 ? pieces.join(" ") : null;
}
|
||||
|
||||
/**
 * Reads a timestamp from a transcript record, checking `record.timestamp`
 * first and then `record.message.timestamp`.
 *
 * Positive numbers below 1e11 are multiplied by 1000 (treated as seconds);
 * other positive numbers are returned unchanged. Strings go through
 * `Date.parse`. Returns `null` when neither candidate yields a usable value.
 */
function parseSessionTimestampMs(record: Record<string, unknown>): number | null {
  const toEpochMs = (value: unknown): number | null => {
    if (typeof value === "string") {
      const parsed = Date.parse(value);
      return Number.isFinite(parsed) ? parsed : null;
    }
    if (typeof value !== "number" || !Number.isFinite(value) || value <= 0) {
      return null;
    }
    return value < 1e11 ? value * 1000 : value;
  };

  const candidates = [record.timestamp, asRecord(record.message)?.timestamp];
  for (const candidate of candidates) {
    const ms = toEpochMs(candidate);
    if (ms !== null) {
      return ms;
    }
  }
  return null;
}
|
||||
|
||||
/**
 * Renders a message as `"User: …"` or `"Assistant: …"`, clipped to
 * `SESSION_INGESTION_MAX_SNIPPET_CHARS` characters. Returns an empty string
 * when the text normalizes to nothing.
 */
function formatSessionSnippet(role: "user" | "assistant", text: string): string {
  const body = normalizeSessionText(text);
  if (body.length === 0) {
    return "";
  }
  const speaker = role === "user" ? "User" : "Assistant";
  const labelled = `${speaker}: ${body}`;
  const clipped = labelled.slice(0, SESSION_INGESTION_MAX_SNIPPET_CHARS);
  // Clipping can leave a trailing space behind; tidy up after the cut.
  return clipped.replace(/\s+/g, " ").trim();
}
|
||||
|
||||
/** Builds the checkpoint key for a transcript: `<agentId>:<session path of the file>`. */
function buildSessionStateKey(agentId: string, absolutePath: string): string {
  const sessionPath = sessionPathForFile(absolutePath);
  return `${agentId}:${sessionPath}`;
}
|
||||
|
||||
/**
 * Lists the agent ids attached to `workspaceDir` according to the dreaming
 * workspace mapping in `cfg`.
 *
 * The first workspace whose normalized key matches is used; its agent ids are
 * de-duplicated, blank ids are removed, and the result is sorted. Returns an
 * empty list when there is no config or no matching workspace.
 */
function resolveSessionAgentsForWorkspace(
  cfg: OpenClawConfig | undefined,
  workspaceDir: string,
): string[] {
  if (!cfg) {
    return [];
  }
  const wantedKey = normalizeWorkspaceKey(workspaceDir);
  for (const entry of resolveMemoryDreamingWorkspaces(cfg)) {
    if (normalizeWorkspaceKey(entry.workspaceDir) !== wantedKey) {
      continue;
    }
    const uniqueIds = [...new Set(entry.agentIds)];
    return uniqueIds.filter((agentId) => agentId.trim().length > 0).toSorted();
  }
  return [];
}
|
||||
|
||||
/**
 * Appends rendered transcript lines to the per-day session corpus file and
 * returns one recall result per appended line.
 *
 * @param params.workspaceDir Workspace that owns the corpus directory.
 * @param params.day Day bucket; used as the corpus file name (`<day>.txt`).
 * @param params.lines Messages to append; `rendered` is written to the file,
 *   `snippet` is returned in the result.
 * @returns Results whose `startLine`/`endLine` are the 1-based line numbers
 *   the entries occupy in the corpus file after the append. Empty when
 *   `params.lines` is empty (nothing is written in that case).
 * @throws Any filesystem error other than `ENOENT` when reading the existing
 *   corpus file, and any error from creating the directory or appending.
 */
async function appendSessionCorpusLines(params: {
  workspaceDir: string;
  day: string;
  lines: SessionIngestionMessage[];
}): Promise<MemorySearchResult[]> {
  if (params.lines.length === 0) {
    return [];
  }
  const fileName = `${params.day}.txt`;
  // Results always carry a POSIX-style relative path, independent of platform.
  const relativePath = path.posix.join("memory", ".dreams", "session-corpus", fileName);
  const absolutePath = path.join(params.workspaceDir, SESSION_CORPUS_RELATIVE_DIR, fileName);
  await fs.mkdir(path.dirname(absolutePath), { recursive: true });

  let existing = "";
  try {
    existing = await fs.readFile(absolutePath, "utf-8");
  } catch (err) {
    if ((err as NodeJS.ErrnoException)?.code !== "ENOENT") {
      throw err;
    }
  }

  // An existing file that does not end with a newline would otherwise have the
  // first new entry glued onto its last line, making every reported line
  // number off by one. Terminate that last line before appending.
  const needsSeparator = existing.length > 0 && !existing.endsWith("\n");
  // Splitting on "\n" also handles CRLF files; a terminated file yields one
  // trailing empty element that is not a real line.
  const existingLineCount =
    existing.length === 0 ? 0 : existing.split("\n").length - (needsSeparator ? 0 : 1);

  const body = params.lines.map((entry) => entry.rendered).join("\n");
  const payload = `${needsSeparator ? "\n" : ""}${body}\n`;
  await fs.appendFile(absolutePath, payload, "utf-8");

  return params.lines.map((entry, index) => {
    const lineNumber = existingLineCount + index + 1;
    return {
      path: relativePath,
      startLine: lineNumber,
      endLine: lineNumber,
      score: SESSION_INGESTION_SCORE,
      snippet: entry.snippet,
      source: "memory",
    };
  });
}
|
||||
|
||||
/**
 * Scans the session transcripts of every agent attached to `workspaceDir`,
 * appends newly seen user/assistant messages to the per-day session corpus,
 * and returns the recall batches together with the updated checkpoint.
 *
 * Only agents whose memory search is enabled with `experimental.sessionMemory`
 * and the `"sessions"` source are considered. A transcript whose floored
 * mtime and size match its checkpoint, and whose cursor already sits at the
 * end, is skipped without being read. Otherwise scanning resumes from the
 * stored cursor, or from the start when there is no checkpoint or the file
 * shrank / went back in time. Ingestion is bounded by a per-sweep budget and a
 * per-file budget derived from it.
 *
 * @param params.workspaceDir Workspace whose transcripts are ingested.
 * @param params.cfg Config; when absent nothing is ingested and the returned
 *   state is empty.
 * @param params.lookbackDays Messages whose day falls outside this window are
 *   skipped (the cursor still advances past them).
 * @param params.nowMs Current time used to compute the lookback cutoff.
 * @param params.timezone Optional timezone used when bucketing by day.
 * @param params.state Checkpoint from the previous sweep.
 * @returns Per-day batches, the next checkpoint, and whether it differs from
 *   `params.state`.
 * @throws Filesystem errors other than `ENOENT` while inspecting or reading a
 *   transcript, and any error from appending to the corpus.
 */
async function collectSessionIngestionBatches(params: {
  workspaceDir: string;
  cfg?: OpenClawConfig;
  lookbackDays: number;
  nowMs: number;
  timezone?: string;
  state: SessionIngestionState;
}): Promise<SessionIngestionCollectionResult> {
  if (!params.cfg) {
    return {
      batches: [],
      nextState: { version: 1, files: {} },
      changed: Object.keys(params.state.files).length > 0,
    };
  }
  const agentIds = resolveSessionAgentsForWorkspace(params.cfg, params.workspaceDir);
  const cutoffMs = calculateLookbackCutoffMs(params.nowMs, params.lookbackDays);
  const batchByDay = new Map<string, SessionIngestionMessage[]>();
  const nextFiles: Record<string, SessionIngestionFileState> = {};
  let changed = false;

  const sessionFiles: Array<{ agentId: string; absolutePath: string; sessionPath: string }> = [];
  for (const agentId of agentIds) {
    const memorySearch = resolveMemorySearchConfig(params.cfg, agentId);
    if (
      !memorySearch?.enabled ||
      !memorySearch.experimental.sessionMemory ||
      !memorySearch.sources.includes("sessions")
    ) {
      continue;
    }
    const files = await listSessionFilesForAgent(agentId);
    for (const absolutePath of files) {
      sessionFiles.push({
        agentId,
        absolutePath,
        sessionPath: sessionPathForFile(absolutePath),
      });
    }
  }

  // Deterministic order so the sweep budget is always spent the same way.
  const sortedFiles = sessionFiles.toSorted((a, b) => {
    if (a.agentId !== b.agentId) {
      return a.agentId.localeCompare(b.agentId);
    }
    return a.sessionPath.localeCompare(b.sessionPath);
  });

  const totalCap = SESSION_INGESTION_MAX_MESSAGES_PER_SWEEP;
  let remaining = totalCap;
  // Share the sweep budget across files, clamped to the per-file min/max.
  const perFileCap = Math.min(
    SESSION_INGESTION_MAX_MESSAGES_PER_FILE,
    Math.max(
      SESSION_INGESTION_MIN_MESSAGES_PER_FILE,
      Math.ceil(totalCap / Math.max(1, sortedFiles.length)),
    ),
  );

  for (const file of sortedFiles) {
    const stateKey = buildSessionStateKey(file.agentId, file.absolutePath);
    const previous: SessionIngestionFileState | undefined = params.state.files[stateKey];

    if (remaining <= 0) {
      // Sweep budget exhausted. Carry the existing checkpoint forward instead
      // of dropping it: a dropped checkpoint would make the next sweep restart
      // this transcript from line 0 and append duplicate corpus lines.
      if (previous) {
        nextFiles[stateKey] = previous;
      }
      continue;
    }

    const stat = await fs.stat(file.absolutePath).catch((err: unknown) => {
      if ((err as NodeJS.ErrnoException)?.code === "ENOENT") {
        return null;
      }
      throw err;
    });
    if (!stat) {
      // File vanished after listing: its checkpoint (if any) is dropped.
      if (previous) {
        changed = true;
      }
      continue;
    }
    const fingerprint = {
      mtimeMs: Math.floor(Math.max(0, stat.mtimeMs)),
      size: Math.floor(Math.max(0, stat.size)),
    };
    if (
      previous &&
      previous.mtimeMs === fingerprint.mtimeMs &&
      previous.size === fingerprint.size &&
      previous.lastJsonlLine >= previous.lineCount
    ) {
      // Same fingerprint and fully consumed: skip without reading the file.
      nextFiles[stateKey] = previous;
      continue;
    }

    const raw = await fs.readFile(file.absolutePath, "utf-8").catch((err: unknown) => {
      if ((err as NodeJS.ErrnoException)?.code === "ENOENT") {
        return "";
      }
      throw err;
    });
    const lines = raw.split(/\r?\n/);
    // The element after a trailing newline is not a line. Counting it would
    // park the cursor one past the real line count, so the first line appended
    // to the transcript later would be skipped.
    if (lines[lines.length - 1] === "") {
      lines.pop();
    }
    // A non-empty file without a final newline ends in a line that may still
    // be in the middle of being written.
    const hasUnterminatedTail = raw.length > 0 && !raw.endsWith("\n");

    // Start over when there is no checkpoint or the file shrank / regressed.
    const startLine =
      !previous ||
      fingerprint.size < previous.size ||
      fingerprint.mtimeMs < previous.mtimeMs ||
      previous.lastJsonlLine > lines.length
        ? 0
        : previous.lastJsonlLine;
    const fileCap = Math.max(1, Math.min(perFileCap, remaining));
    let fileCount = 0;
    let lastProcessedLine = startLine;

    for (let index = startLine; index < lines.length; index += 1) {
      if (fileCount >= fileCap || remaining <= 0) {
        break;
      }
      const rawLine = lines[index] ?? "";
      const lineNumber = index + 1;
      lastProcessedLine = lineNumber;
      if (!rawLine.trim()) {
        continue;
      }

      let parsed: unknown;
      try {
        parsed = JSON.parse(rawLine);
      } catch {
        if (hasUnterminatedTail && lineNumber === lines.length) {
          // Likely a partially written final line: leave the cursor before it
          // so it is retried once the writer finishes the line.
          lastProcessedLine = index;
          break;
        }
        continue;
      }
      const record = asRecord(parsed);
      if (!record || record.type !== "message") {
        continue;
      }
      const message = asRecord(record.message);
      if (!message) {
        continue;
      }
      const role = message.role === "user" || message.role === "assistant" ? message.role : null;
      if (!role) {
        continue;
      }
      const text = extractSessionMessageText(message.content);
      if (!text) {
        continue;
      }
      const snippet = formatSessionSnippet(role, text);
      if (snippet.length < SESSION_INGESTION_MIN_SNIPPET_CHARS) {
        continue;
      }
      // Fall back to the file's mtime when the message carries no timestamp.
      const timestampMs = parseSessionTimestampMs(record) ?? parseSessionTimestampMs(message);
      const day = formatMemoryDreamingDay(timestampMs ?? fingerprint.mtimeMs, params.timezone);
      if (!isDayWithinLookback(day, cutoffMs)) {
        continue;
      }
      const source = `${file.agentId}/${file.sessionPath}#L${lineNumber}`;
      const rendered = `[${source}] ${snippet}`.slice(0, SESSION_INGESTION_MAX_SNIPPET_CHARS + 64);
      const bucket = batchByDay.get(day) ?? [];
      bucket.push({ day, snippet, rendered });
      batchByDay.set(day, bucket);
      fileCount += 1;
      remaining -= 1;
    }

    const cursor = Math.max(0, Math.min(lastProcessedLine, lines.length));
    nextFiles[stateKey] = {
      mtimeMs: fingerprint.mtimeMs,
      size: fingerprint.size,
      lineCount: lines.length,
      lastJsonlLine: cursor,
    };
    if (
      !previous ||
      previous.mtimeMs !== fingerprint.mtimeMs ||
      previous.size !== fingerprint.size ||
      previous.lineCount !== lines.length ||
      previous.lastJsonlLine !== cursor
    ) {
      changed = true;
    }
  }

  // Any checkpoint that disappeared or differs from the new one is a change.
  for (const [key, state] of Object.entries(params.state.files)) {
    if (!Object.hasOwn(nextFiles, key)) {
      changed = true;
      continue;
    }
    const next = nextFiles[key];
    if (
      !next ||
      next.mtimeMs !== state.mtimeMs ||
      next.size !== state.size ||
      next.lineCount !== state.lineCount ||
      next.lastJsonlLine !== state.lastJsonlLine
    ) {
      changed = true;
    }
  }

  const batches: DailyIngestionBatch[] = [];
  for (const day of [...batchByDay.keys()].toSorted()) {
    const lines = batchByDay.get(day) ?? [];
    if (lines.length === 0) {
      continue;
    }
    const results = await appendSessionCorpusLines({
      workspaceDir: params.workspaceDir,
      day,
      lines,
    });
    if (results.length > 0) {
      batches.push({ day, results });
    }
  }

  return {
    batches,
    nextState: { version: 1, files: nextFiles },
    changed,
  };
}
|
||||
|
||||
/**
 * Runs one session-transcript ingestion pass for a workspace: loads the
 * checkpoint, collects new transcript batches, records each batch as
 * short-term recalls under the query `__dreaming_sessions__:<day>`, and writes
 * the checkpoint back only when it changed.
 */
async function ingestSessionTranscriptSignals(params: {
  workspaceDir: string;
  cfg?: OpenClawConfig;
  lookbackDays: number;
  nowMs: number;
  timezone?: string;
}): Promise<void> {
  const { workspaceDir, nowMs, timezone } = params;
  const state = await readSessionIngestionState(workspaceDir);
  const { batches, changed, nextState } = await collectSessionIngestionBatches({
    workspaceDir,
    cfg: params.cfg,
    lookbackDays: params.lookbackDays,
    nowMs,
    timezone,
    state,
  });

  for (const { day, results } of batches) {
    await recordShortTermRecalls({
      workspaceDir,
      query: `__dreaming_sessions__:${day}`,
      results,
      signalType: "daily",
      dedupeByQueryPerDay: true,
      dayBucket: day,
      nowMs,
      timezone,
    });
  }

  if (!changed) {
    return;
  }
  await writeSessionIngestionState(workspaceDir, nextState);
}
|
||||
|
||||
type DailyIngestionCollectionResult = {
|
||||
batches: DailyIngestionBatch[];
|
||||
nextState: DailyIngestionState;
|
||||
@@ -982,6 +1479,7 @@ export function previewRemDreaming(params: {
|
||||
|
||||
async function runLightDreaming(params: {
|
||||
workspaceDir: string;
|
||||
cfg?: OpenClawConfig;
|
||||
config: MemoryLightDreamingConfig & {
|
||||
timezone?: string;
|
||||
storage: { mode: "inline" | "separate" | "both"; separateReports: boolean };
|
||||
@@ -999,6 +1497,13 @@ async function runLightDreaming(params: {
|
||||
nowMs,
|
||||
timezone: params.config.timezone,
|
||||
});
|
||||
await ingestSessionTranscriptSignals({
|
||||
workspaceDir: params.workspaceDir,
|
||||
cfg: params.cfg,
|
||||
lookbackDays: params.config.lookbackDays,
|
||||
nowMs,
|
||||
timezone: params.config.timezone,
|
||||
});
|
||||
const entries = dedupeEntries(
|
||||
(await readShortTermRecallEntries({ workspaceDir: params.workspaceDir, nowMs }))
|
||||
.filter((entry) => entryWithinLookback(entry, cutoffMs))
|
||||
@@ -1054,6 +1559,7 @@ async function runLightDreaming(params: {
|
||||
|
||||
async function runRemDreaming(params: {
|
||||
workspaceDir: string;
|
||||
cfg?: OpenClawConfig;
|
||||
config: MemoryRemDreamingConfig & {
|
||||
timezone?: string;
|
||||
storage: { mode: "inline" | "separate" | "both"; separateReports: boolean };
|
||||
@@ -1071,6 +1577,13 @@ async function runRemDreaming(params: {
|
||||
nowMs,
|
||||
timezone: params.config.timezone,
|
||||
});
|
||||
await ingestSessionTranscriptSignals({
|
||||
workspaceDir: params.workspaceDir,
|
||||
cfg: params.cfg,
|
||||
lookbackDays: params.config.lookbackDays,
|
||||
nowMs,
|
||||
timezone: params.config.timezone,
|
||||
});
|
||||
const entries = (
|
||||
await readShortTermRecallEntries({ workspaceDir: params.workspaceDir, nowMs })
|
||||
).filter((entry) => entryWithinLookback(entry, cutoffMs));
|
||||
@@ -1141,6 +1654,7 @@ export async function runDreamingSweepPhases(params: {
|
||||
if (light.enabled && light.limit > 0) {
|
||||
await runLightDreaming({
|
||||
workspaceDir: params.workspaceDir,
|
||||
cfg: params.cfg,
|
||||
config: light,
|
||||
logger: params.logger,
|
||||
subagent: params.subagent,
|
||||
@@ -1155,6 +1669,7 @@ export async function runDreamingSweepPhases(params: {
|
||||
if (rem.enabled && rem.limit > 0) {
|
||||
await runRemDreaming({
|
||||
workspaceDir: params.workspaceDir,
|
||||
cfg: params.cfg,
|
||||
config: rem,
|
||||
logger: params.logger,
|
||||
subagent: params.subagent,
|
||||
@@ -1207,6 +1722,7 @@ async function runPhaseIfTriggered(params: {
|
||||
if (params.phase === "light") {
|
||||
await runLightDreaming({
|
||||
workspaceDir,
|
||||
cfg: params.cfg,
|
||||
config: params.config as MemoryLightDreamingConfig & {
|
||||
timezone?: string;
|
||||
storage: { mode: "inline" | "separate" | "both"; separateReports: boolean };
|
||||
@@ -1217,6 +1733,7 @@ async function runPhaseIfTriggered(params: {
|
||||
} else {
|
||||
await runRemDreaming({
|
||||
workspaceDir,
|
||||
cfg: params.cfg,
|
||||
config: params.config as MemoryRemDreamingConfig & {
|
||||
timezone?: string;
|
||||
storage: { mode: "inline" | "separate" | "both"; separateReports: boolean };
|
||||
|
||||
@@ -55,6 +55,7 @@ describe("short-term promotion", () => {
|
||||
it("detects short-term daily memory paths", () => {
|
||||
expect(isShortTermMemoryPath("memory/2026-04-03.md")).toBe(true);
|
||||
expect(isShortTermMemoryPath("2026-04-03.md")).toBe(true);
|
||||
expect(isShortTermMemoryPath("memory/.dreams/session-corpus/2026-04-03.txt")).toBe(true);
|
||||
expect(isShortTermMemoryPath("notes/2026-04-03.md")).toBe(false);
|
||||
expect(isShortTermMemoryPath("MEMORY.md")).toBe(false);
|
||||
expect(isShortTermMemoryPath("memory/network.md")).toBe(false);
|
||||
|
||||
@@ -13,6 +13,8 @@ import {
|
||||
import { asRecord } from "./dreaming-shared.js";
|
||||
|
||||
const SHORT_TERM_PATH_RE = /(?:^|\/)memory\/(\d{4})-(\d{2})-(\d{2})\.md$/;
|
||||
const SHORT_TERM_SESSION_CORPUS_RE =
|
||||
/(?:^|\/)memory\/\.dreams\/session-corpus\/(\d{4})-(\d{2})-(\d{2})\.(?:md|txt)$/;
|
||||
const SHORT_TERM_BASENAME_RE = /^(\d{4})-(\d{2})-(\d{2})\.md$/;
|
||||
const DAY_MS = 24 * 60 * 60 * 1000;
|
||||
const DEFAULT_RECENCY_HALF_LIFE_DAYS = 14;
|
||||
@@ -763,6 +765,9 @@ export function isShortTermMemoryPath(filePath: string): boolean {
|
||||
if (SHORT_TERM_PATH_RE.test(normalized)) {
|
||||
return true;
|
||||
}
|
||||
if (SHORT_TERM_SESSION_CORPUS_RE.test(normalized)) {
|
||||
return true;
|
||||
}
|
||||
return SHORT_TERM_BASENAME_RE.test(normalized);
|
||||
}
|
||||
|
||||
|
||||
@@ -177,6 +177,13 @@ function isShortTermMemoryPath(filePath: string): boolean {
|
||||
if (/(?:^|\/)memory\/(\d{4})-(\d{2})-(\d{2})\.md$/.test(normalized)) {
|
||||
return true;
|
||||
}
|
||||
if (
|
||||
/(?:^|\/)memory\/\.dreams\/session-corpus\/(\d{4})-(\d{2})-(\d{2})\.(?:md|txt)$/.test(
|
||||
normalized,
|
||||
)
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
return /^(\d{4})-(\d{2})-(\d{2})\.md$/.test(normalized);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user