refactor: store memory host events in sqlite

This commit is contained in:
Peter Steinberger
2026-05-08 20:04:29 +01:00
parent 6e8d7ad47f
commit 76b7ca7cf6
11 changed files with 267 additions and 36 deletions

View File

@@ -51,7 +51,9 @@ describe("listMemoryCorePublicArtifacts", () => {
},
};
await expect(listMemoryCorePublicArtifacts({ cfg })).resolves.toEqual([
const artifacts = await listMemoryCorePublicArtifacts({ cfg });
expect(artifacts).toHaveLength(4);
expect(artifacts).toEqual([
{
kind: "memory-root",
workspaceDir,
@@ -83,6 +85,9 @@ describe("listMemoryCorePublicArtifacts", () => {
absolutePath: resolveMemoryHostEventLogPath(workspaceDir),
agentIds: ["main"],
contentType: "json",
content: expect.stringContaining('"type":"memory.recall.recorded"'),
sizeBytes: expect.any(Number),
updatedAtMs: Date.parse("2026-04-06T12:00:00.000Z"),
},
]);
});

View File

@@ -1,9 +1,12 @@
import fs from "node:fs/promises";
import path from "node:path";
import { resolveMemoryHostEventLogPath } from "openclaw/plugin-sdk/memory-core-host-events";
import {
readMemoryHostEvents,
renderMemoryHostEventsJsonl,
resolveMemoryHostEventLogPath,
} from "openclaw/plugin-sdk/memory-core-host-events";
import { resolveMemoryDreamingWorkspaces } from "openclaw/plugin-sdk/memory-core-host-status";
import type { MemoryPluginPublicArtifact } from "openclaw/plugin-sdk/memory-host-core";
import { pathExists } from "openclaw/plugin-sdk/security-runtime";
import type { OpenClawConfig } from "../api.js";
async function listMarkdownFilesRecursive(rootDir: string): Promise<string[]> {
@@ -60,8 +63,17 @@ async function collectWorkspaceArtifacts(params: {
});
}
const eventLogPath = resolveMemoryHostEventLogPath(params.workspaceDir);
if (await pathExists(eventLogPath)) {
const eventContent = renderMemoryHostEventsJsonl(
await readMemoryHostEvents({ workspaceDir: params.workspaceDir }),
);
if (eventContent.trim()) {
const eventLogPath = resolveMemoryHostEventLogPath(params.workspaceDir);
const eventLines = eventContent.trim().split(/\r?\n/u);
const lastEvent = JSON.parse(eventLines.at(-1) ?? "{}") as { timestamp?: string };
const updatedAtMs =
typeof lastEvent.timestamp === "string" && Number.isFinite(Date.parse(lastEvent.timestamp))
? Date.parse(lastEvent.timestamp)
: Date.now();
artifacts.push({
kind: "event-log",
workspaceDir: params.workspaceDir,
@@ -69,6 +81,9 @@ async function collectWorkspaceArtifacts(params: {
absolutePath: eventLogPath,
agentIds: [...params.agentIds],
contentType: "json",
content: eventContent,
updatedAtMs,
sizeBytes: Buffer.byteLength(eventContent),
});
}

View File

@@ -8,6 +8,8 @@ import {
} from "openclaw/plugin-sdk/memory-host-core";
import {
appendMemoryHostEvent,
readMemoryHostEvents,
renderMemoryHostEventsJsonl,
resolveMemoryHostEventLogPath,
} from "openclaw/plugin-sdk/memory-host-events";
import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-runtime";
@@ -230,6 +232,7 @@ describe("syncMemoryWikiBridgeSources", () => {
},
],
});
const eventContent = renderMemoryHostEventsJsonl(await readMemoryHostEvents({ workspaceDir }));
registerBridgeArtifacts([
{
kind: "event-log",
@@ -238,6 +241,9 @@ describe("syncMemoryWikiBridgeSources", () => {
absolutePath: resolveMemoryHostEventLogPath(workspaceDir),
agentIds: ["main"],
contentType: "json",
content: eventContent,
sizeBytes: Buffer.byteLength(eventContent),
updatedAtMs: Date.parse("2026-04-05T12:00:00.000Z"),
},
]);

View File

@@ -30,6 +30,9 @@ type BridgeArtifact = {
workspaceDir: string;
relativePath: string;
absolutePath: string;
content?: string;
updatedAtMs?: number;
sizeBytes?: number;
};
export type BridgeMemoryWikiResult = {
@@ -76,6 +79,9 @@ async function collectBridgeArtifacts(
workspaceDir: artifact.workspaceDir,
relativePath: artifact.relativePath,
absolutePath: artifact.absolutePath,
content: artifact.content,
updatedAtMs: artifact.updatedAtMs,
sizeBytes: artifact.sizeBytes,
});
}
const deduped = new Map<string, BridgeArtifact>();
@@ -145,6 +151,10 @@ async function writeBridgeSourcePage(params: {
workspaceDir: params.artifact.workspaceDir,
relativePath: params.artifact.relativePath,
agentIds: params.agentIds,
contentHash:
params.artifact.content === undefined
? undefined
: createHash("sha1").update(params.artifact.content).digest("hex"),
}),
)
.digest("hex");
@@ -154,6 +164,7 @@ async function writeBridgeSourcePage(params: {
sourcePath: params.artifact.absolutePath,
sourceUpdatedAtMs: params.sourceUpdatedAtMs,
sourceSize: params.sourceSize,
sourceContent: params.artifact.content,
renderFingerprint,
pagePath,
group: "bridge",
@@ -234,7 +245,13 @@ export async function syncMemoryWikiBridgeSources(params: {
}
artifactCount = artifacts.length;
for (const artifact of artifacts) {
const stats = await fs.stat(artifact.absolutePath);
const stats =
artifact.content === undefined
? await fs.stat(artifact.absolutePath)
: {
mtimeMs: artifact.updatedAtMs ?? Date.now(),
size: artifact.sizeBytes ?? Buffer.byteLength(artifact.content),
};
activeKeys.add(artifact.syncKey);
results.push(
await writeBridgeSourcePage({

View File

@@ -14,6 +14,7 @@ export async function writeImportedSourcePage(params: {
sourcePath: string;
sourceUpdatedAtMs: number;
sourceSize: number;
sourceContent?: string;
renderFingerprint: string;
pagePath: string;
group: MemoryWikiImportedSourceGroup;
@@ -46,7 +47,7 @@ export async function writeImportedSourcePage(params: {
return { pagePath: params.pagePath, changed: false, created };
}
const raw = await fs.readFile(params.sourcePath, "utf8");
const raw = params.sourceContent ?? (await fs.readFile(params.sourcePath, "utf8"));
const rendered = params.buildRendered(raw, updatedAt);
const existing = pageStat ? await vault.readText(params.pagePath).catch(() => "") : "";
if (existing !== rendered) {

View File

@@ -18,6 +18,7 @@ import {
readDreamingWorkspaceMap,
readDreamingWorkspaceValue,
} from "../memory-host-sdk/dreaming-state-store.js";
import { readMemoryHostEvents } from "../memory-host-sdk/events.js";
import { loadNodeHostConfig } from "../node-host/config.js";
import { listChannelPairingRequests, readChannelAllowFromStore } from "../pairing/pairing-store.js";
import { readOpenClawStateKvJson } from "../state/openclaw-state-kv.js";
@@ -650,6 +651,17 @@ describe("maybeRepairLegacyRuntimeStateFiles", () => {
})}\n`,
"utf8",
);
await fs.writeFile(
path.join(dreamsDir, "events.jsonl"),
`${JSON.stringify({
type: "memory.recall.recorded",
timestamp: "2026-04-05T14:00:00.000Z",
query: "legacy doctor event",
resultCount: 0,
results: [],
})}\n`,
"utf8",
);
await fs.writeFile(path.join(dreamsDir, "short-term-promotion.lock"), "999999:0\n", "utf8");
await maybeRepairLegacyRuntimeStateFiles({
@@ -674,6 +686,9 @@ describe("maybeRepairLegacyRuntimeStateFiles", () => {
await expect(fs.stat(path.join(dreamsDir, "phase-signals.json"))).rejects.toMatchObject({
code: "ENOENT",
});
await expect(fs.stat(path.join(dreamsDir, "events.jsonl"))).rejects.toMatchObject({
code: "ENOENT",
});
await expect(
fs.stat(path.join(dreamsDir, "short-term-promotion.lock")),
).rejects.toMatchObject({ code: "ENOENT" });
@@ -713,6 +728,12 @@ describe("maybeRepairLegacyRuntimeStateFiles", () => {
"phase-signals",
),
).resolves.toEqual({ updatedAt: "2026-04-05T13:00:00.000Z" });
await expect(readMemoryHostEvents({ workspaceDir, env })).resolves.toMatchObject([
{
type: "memory.recall.recorded",
query: "legacy doctor event",
},
]);
});
});
});

View File

@@ -503,6 +503,9 @@ export async function maybeRepairLegacyRuntimeStateFiles(params: {
`- Imported ${result.rows} memory-core dreaming checkpoint row(s) from ${result.files} legacy file(s) into SQLite${result.removedLocks > 0 ? ` and removed ${result.removedLocks} stale lock file(s)` : ""}.`,
);
}
warnings.push(
...result.warnings.map((warning) => `- Memory-core dreaming state: ${warning}`),
);
});
}

View File

@@ -14,12 +14,14 @@ import {
MEMORY_CORE_SHORT_TERM_RECALL_NAMESPACE,
} from "./dreaming-state-store.js";
import { resolveMemoryDreamingWorkspaces } from "./dreaming.js";
import { importLegacyMemoryHostEventLogToSqlite, resolveMemoryHostEventLogPath } from "./events.js";
const DREAMING_STATE_RELATIVE_PATHS = {
dailyIngestion: path.join("memory", ".dreams", "daily-ingestion.json"),
sessionIngestion: path.join("memory", ".dreams", "session-ingestion.json"),
shortTermRecall: path.join("memory", ".dreams", "short-term-recall.json"),
phaseSignals: path.join("memory", ".dreams", "phase-signals.json"),
events: path.join("memory", ".dreams", "events.jsonl"),
shortTermLock: path.join("memory", ".dreams", "short-term-promotion.lock"),
} as const;
@@ -28,6 +30,7 @@ type MigrationResult = {
files: number;
rows: number;
removedLocks: number;
warnings: string[];
};
function asRecord(value: unknown): Record<string, unknown> | undefined {
@@ -124,7 +127,13 @@ export async function importLegacyMemoryCoreDreamingStateFilesToSqlite(params: {
cfg: OpenClawConfig;
env: NodeJS.ProcessEnv;
}): Promise<MigrationResult> {
const result: MigrationResult = { workspaces: 0, files: 0, rows: 0, removedLocks: 0 };
const result: MigrationResult = {
workspaces: 0,
files: 0,
rows: 0,
removedLocks: 0,
warnings: [],
};
for (const workspaceDir of configuredDreamingWorkspaces(params.cfg)) {
let touchedWorkspace = false;
@@ -240,6 +249,21 @@ export async function importLegacyMemoryCoreDreamingStateFilesToSqlite(params: {
touchedWorkspace = true;
}
const eventsPath = resolveMemoryHostEventLogPath(workspaceDir);
if (await fileExists(eventsPath)) {
const imported = await importLegacyMemoryHostEventLogToSqlite({
workspaceDir,
eventLogPath: eventsPath,
env: params.env,
});
result.rows += imported.imported;
result.warnings.push(...imported.warnings);
if (imported.warnings.length === 0) {
result.files += 1;
}
touchedWorkspace = true;
}
const lockPath = path.join(workspaceDir, DREAMING_STATE_RELATIVE_PATHS.shortTermLock);
if (await fileExists(lockPath)) {
await fs.rm(lockPath, { force: true });

View File

@@ -1,9 +1,20 @@
import { createHash, randomUUID } from "node:crypto";
import fs from "node:fs/promises";
import path from "node:path";
import { appendRegularFile } from "../infra/fs-safe.js";
import { createPluginStateKeyedStore } from "../plugin-state/plugin-state-store.js";
import { MEMORY_CORE_PLUGIN_ID } from "./dreaming-state-store.js";
import type { MemoryDreamingPhaseName } from "./dreaming.js";
export const MEMORY_HOST_EVENT_LOG_RELATIVE_PATH = path.join("memory", ".dreams", "events.jsonl");
const MEMORY_HOST_EVENTS_NAMESPACE = "memory-host.events";
const MAX_MEMORY_HOST_EVENTS = 50_000;
const WORKSPACE_HASH_BYTES = 24;
type StoredMemoryHostEvent = {
workspaceKey: string;
event: MemoryHostEvent;
recordedAt: number;
};
export type MemoryHostRecallRecordedEvent = {
type: "memory.recall.recorded";
@@ -48,6 +59,50 @@ export type MemoryHostEvent =
| MemoryHostPromotionAppliedEvent
| MemoryHostDreamCompletedEvent;
let eventSequence = 0;
function normalizeWorkspaceKey(workspaceDir: string): string {
const resolved = path.resolve(workspaceDir).replace(/\\/g, "/");
return process.platform === "win32" ? resolved.toLowerCase() : resolved;
}
function hashValue(value: string, bytes = 32): string {
return createHash("sha256").update(value).digest("hex").slice(0, bytes);
}
function workspacePrefix(workspaceDir: string): { prefix: string; workspaceKey: string } {
const workspaceKey = normalizeWorkspaceKey(workspaceDir);
return {
prefix: hashValue(workspaceKey, WORKSPACE_HASH_BYTES),
workspaceKey,
};
}
function getMemoryHostEventStore(env?: NodeJS.ProcessEnv) {
return createPluginStateKeyedStore<StoredMemoryHostEvent>(MEMORY_CORE_PLUGIN_ID, {
namespace: MEMORY_HOST_EVENTS_NAMESPACE,
maxEntries: MAX_MEMORY_HOST_EVENTS,
...(env ? { env } : {}),
});
}
function nextEventKey(workspaceDir: string, recordedAt: number): string {
const { prefix } = workspacePrefix(workspaceDir);
eventSequence = (eventSequence + 1) % Number.MAX_SAFE_INTEGER;
return `${prefix}:${recordedAt.toString(36)}:${process.pid.toString(36)}:${eventSequence.toString(36)}:${randomUUID()}`;
}
function legacyEventKey(workspaceDir: string, line: string, lineNumber: number): string {
const { prefix } = workspacePrefix(workspaceDir);
const digest = hashValue(`${lineNumber}\0${line}`);
return `${prefix}:legacy:${digest}`;
}
function eventTimestampMs(event: MemoryHostEvent): number | undefined {
const parsed = Date.parse(event.timestamp);
return Number.isFinite(parsed) ? parsed : undefined;
}
export function resolveMemoryHostEventLogPath(workspaceDir: string): string {
return path.join(workspaceDir, MEMORY_HOST_EVENT_LOG_RELATIVE_PATH);
}
@@ -56,20 +111,54 @@ export async function appendMemoryHostEvent(
workspaceDir: string,
event: MemoryHostEvent,
): Promise<void> {
const eventLogPath = resolveMemoryHostEventLogPath(workspaceDir);
await fs.mkdir(path.dirname(eventLogPath), { recursive: true });
await appendRegularFile({
filePath: eventLogPath,
content: `${JSON.stringify(event)}\n`,
rejectSymlinkParents: true,
const recordedAt = Date.now();
const { workspaceKey } = workspacePrefix(workspaceDir);
await getMemoryHostEventStore().register(nextEventKey(workspaceDir, recordedAt), {
workspaceKey,
event,
recordedAt,
});
}
export async function readMemoryHostEvents(params: {
workspaceDir: string;
limit?: number;
env?: NodeJS.ProcessEnv;
}): Promise<MemoryHostEvent[]> {
const eventLogPath = resolveMemoryHostEventLogPath(params.workspaceDir);
const { prefix, workspaceKey } = workspacePrefix(params.workspaceDir);
const events = (await getMemoryHostEventStore(params.env).entries())
.filter(
(entry) => entry.key.startsWith(`${prefix}:`) && entry.value.workspaceKey === workspaceKey,
)
.toSorted((left, right) => {
const leftTime = eventTimestampMs(left.value.event) ?? left.value.recordedAt;
const rightTime = eventTimestampMs(right.value.event) ?? right.value.recordedAt;
if (leftTime !== rightTime) {
return leftTime - rightTime;
}
if (left.value.recordedAt !== right.value.recordedAt) {
return left.value.recordedAt - right.value.recordedAt;
}
return left.key.localeCompare(right.key);
})
.map((entry) => entry.value.event);
if (!Number.isFinite(params.limit)) {
return events;
}
const limit = Math.max(0, Math.floor(params.limit as number));
return limit === 0 ? [] : events.slice(-limit);
}
export function renderMemoryHostEventsJsonl(events: readonly MemoryHostEvent[]): string {
return events.length === 0 ? "" : `${events.map((event) => JSON.stringify(event)).join("\n")}\n`;
}
export async function importLegacyMemoryHostEventLogToSqlite(params: {
workspaceDir: string;
eventLogPath?: string;
env?: NodeJS.ProcessEnv;
}): Promise<{ imported: number; warnings: string[] }> {
const eventLogPath = params.eventLogPath ?? resolveMemoryHostEventLogPath(params.workspaceDir);
const raw = await fs.readFile(eventLogPath, "utf8").catch((err: unknown) => {
if ((err as NodeJS.ErrnoException)?.code === "ENOENT") {
return "";
@@ -77,22 +166,40 @@ export async function readMemoryHostEvents(params: {
throw err;
});
if (!raw.trim()) {
return [];
await fs.rm(eventLogPath, { force: true });
return { imported: 0, warnings: [] };
}
const events = raw
.split("\n")
.map((line) => line.trim())
.filter(Boolean)
.flatMap((line) => {
try {
return [JSON.parse(line) as MemoryHostEvent];
} catch {
return [];
const { workspaceKey } = workspacePrefix(params.workspaceDir);
const store = getMemoryHostEventStore(params.env);
const warnings: string[] = [];
let imported = 0;
const lines = raw.split(/\r?\n/u);
for (const [index, line] of lines.entries()) {
const trimmed = line.trim();
if (!trimmed) {
continue;
}
try {
const event = JSON.parse(trimmed) as MemoryHostEvent;
const inserted = await store.registerIfAbsent(
legacyEventKey(params.workspaceDir, trimmed, index + 1),
{
workspaceKey,
event,
recordedAt: eventTimestampMs(event) ?? Date.now(),
},
);
if (inserted) {
imported += 1;
}
});
if (!Number.isFinite(params.limit)) {
return events;
} catch {
warnings.push(`Skipped invalid memory host event at ${eventLogPath}:${index + 1}`);
}
}
const limit = Math.max(0, Math.floor(params.limit as number));
return limit === 0 ? [] : events.slice(-limit);
if (warnings.length === 0) {
await fs.rm(eventLogPath, { force: true });
}
return { imported, warnings };
}

View File

@@ -3,6 +3,7 @@ import path from "node:path";
import { describe, expect, it } from "vitest";
import {
appendMemoryHostEvent,
importLegacyMemoryHostEventLogToSqlite,
readMemoryHostEvents,
resolveMemoryHostEventLogPath,
} from "./memory-host-events.js";
@@ -48,13 +49,11 @@ describe("memory host event journal helpers", () => {
reportPath: path.join(workspaceDir, "memory", "dreaming", "light", "2026-04-05.md"),
});
const eventLogPath = resolveMemoryHostEventLogPath(workspaceDir);
await expect(fs.readFile(eventLogPath, "utf8")).resolves.toContain(
'"type":"memory.recall.recorded"',
);
const events = await readMemoryHostEvents({ workspaceDir });
const tail = await readMemoryHostEvents({ workspaceDir, limit: 1 });
await expect(fs.stat(resolveMemoryHostEventLogPath(workspaceDir))).rejects.toMatchObject({
code: "ENOENT",
});
expect(events).toHaveLength(2);
expect(events[0]?.type).toBe("memory.recall.recorded");
@@ -62,6 +61,36 @@ describe("memory host event journal helpers", () => {
expect(tail).toHaveLength(1);
expect(tail[0]?.type).toBe("memory.dream.completed");
});
it("imports legacy JSONL journals into SQLite and removes the source after success", async () => {
const workspaceDir = await createTempDir("memory-host-events-legacy-");
const eventLogPath = resolveMemoryHostEventLogPath(workspaceDir);
await fs.mkdir(path.dirname(eventLogPath), { recursive: true });
await fs.writeFile(
eventLogPath,
`${JSON.stringify({
type: "memory.recall.recorded",
timestamp: "2026-04-05T12:00:00.000Z",
query: "legacy event",
resultCount: 0,
results: [],
})}\n`,
"utf8",
);
await expect(importLegacyMemoryHostEventLogToSqlite({ workspaceDir })).resolves.toEqual({
imported: 1,
warnings: [],
});
await expect(fs.stat(eventLogPath)).rejects.toMatchObject({ code: "ENOENT" });
await expect(readMemoryHostEvents({ workspaceDir })).resolves.toMatchObject([
{
type: "memory.recall.recorded",
query: "legacy event",
},
]);
});
});
describe("createPersistentDedupe", () => {

View File

@@ -119,6 +119,9 @@ export type MemoryPluginPublicArtifact = {
absolutePath: string;
agentIds: string[];
contentType: MemoryPluginPublicArtifactContentType;
content?: string;
updatedAtMs?: number;
sizeBytes?: number;
};
export type MemoryPluginPublicArtifactsProvider = {