mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-14 08:04:17 +00:00
refactor: use transcript naming in memory sync
This commit is contained in:
@@ -7,7 +7,7 @@ describe("memory session sync state", () => {
|
||||
needsFullReindex: false,
|
||||
files: ["/tmp/a.jsonl", "/tmp/b.jsonl"],
|
||||
targetSessionTranscripts: null,
|
||||
sessionsDirtyFiles: new Set(),
|
||||
dirtySessionTranscripts: new Set(),
|
||||
existingRows: [
|
||||
{ path: "sessions/a.jsonl", hash: "hash-a" },
|
||||
{ path: "sessions/b.jsonl", hash: "hash-b" },
|
||||
@@ -34,7 +34,7 @@ describe("memory session sync state", () => {
|
||||
needsFullReindex: false,
|
||||
files: ["/tmp/targeted-first.jsonl"],
|
||||
targetSessionTranscripts: new Set(["/tmp/targeted-first.jsonl"]),
|
||||
sessionsDirtyFiles: new Set(["/tmp/targeted-first.jsonl"]),
|
||||
dirtySessionTranscripts: new Set(["/tmp/targeted-first.jsonl"]),
|
||||
existingRows: [
|
||||
{ path: "sessions/targeted-first.jsonl", hash: "hash-first" },
|
||||
{ path: "sessions/targeted-second.jsonl", hash: "hash-second" },
|
||||
@@ -53,7 +53,7 @@ describe("memory session sync state", () => {
|
||||
needsFullReindex: false,
|
||||
files: ["/tmp/incremental.jsonl"],
|
||||
targetSessionTranscripts: null,
|
||||
sessionsDirtyFiles: new Set(["/tmp/incremental.jsonl"]),
|
||||
dirtySessionTranscripts: new Set(["/tmp/incremental.jsonl"]),
|
||||
existingRows: [],
|
||||
sessionPathForTranscript: (file) => `sessions/${file.split("/").at(-1)}`,
|
||||
});
|
||||
|
||||
@@ -4,7 +4,7 @@ export function resolveMemorySessionSyncPlan(params: {
|
||||
needsFullReindex: boolean;
|
||||
files: string[];
|
||||
targetSessionTranscripts: Set<string> | null;
|
||||
sessionsDirtyFiles: Set<string>;
|
||||
dirtySessionTranscripts: Set<string>;
|
||||
existingRows?: MemorySourceFileStateRow[] | null;
|
||||
sessionPathForTranscript: (file: string) => string;
|
||||
}): {
|
||||
@@ -24,6 +24,6 @@ export function resolveMemorySessionSyncPlan(params: {
|
||||
indexAll:
|
||||
params.needsFullReindex ||
|
||||
Boolean(params.targetSessionTranscripts) ||
|
||||
params.sessionsDirtyFiles.size === 0,
|
||||
params.dirtySessionTranscripts.size === 0,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -176,8 +176,8 @@ export abstract class MemoryManagerSyncOps {
|
||||
protected closed = false;
|
||||
protected dirty = false;
|
||||
protected sessionsDirty = false;
|
||||
protected sessionsDirtyFiles = new Set<string>();
|
||||
protected sessionPendingFiles = new Set<string>();
|
||||
protected dirtySessionTranscripts = new Set<string>();
|
||||
protected pendingSessionTranscripts = new Set<string>();
|
||||
protected sessionDeltas = new Map<
|
||||
string,
|
||||
{ lastSize: number; lastMessages: number; pendingBytes: number; pendingMessages: number }
|
||||
@@ -475,7 +475,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
}
|
||||
|
||||
private scheduleSessionDirty(sessionTranscript: string) {
|
||||
this.sessionPendingFiles.add(sessionTranscript);
|
||||
this.pendingSessionTranscripts.add(sessionTranscript);
|
||||
if (this.sessionWatchTimer) {
|
||||
return;
|
||||
}
|
||||
@@ -488,11 +488,11 @@ export abstract class MemoryManagerSyncOps {
|
||||
}
|
||||
|
||||
private async processSessionDeltaBatch(): Promise<void> {
|
||||
if (this.sessionPendingFiles.size === 0) {
|
||||
if (this.pendingSessionTranscripts.size === 0) {
|
||||
return;
|
||||
}
|
||||
const pending = Array.from(this.sessionPendingFiles);
|
||||
this.sessionPendingFiles.clear();
|
||||
const pending = Array.from(this.pendingSessionTranscripts);
|
||||
this.pendingSessionTranscripts.clear();
|
||||
let shouldSync = false;
|
||||
for (const sessionTranscript of pending) {
|
||||
const delta = await this.updateSessionDelta(sessionTranscript);
|
||||
@@ -510,7 +510,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
if (!bytesHit && !messagesHit) {
|
||||
continue;
|
||||
}
|
||||
this.sessionsDirtyFiles.add(sessionTranscript);
|
||||
this.dirtySessionTranscripts.add(sessionTranscript);
|
||||
this.sessionsDirty = true;
|
||||
delta.pendingBytes =
|
||||
bytesThreshold > 0 ? Math.max(0, delta.pendingBytes - bytesThreshold) : 0;
|
||||
@@ -642,7 +642,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
return shouldSyncSessionsForReindex({
|
||||
hasSessionSource: this.sources.has("sessions"),
|
||||
sessionsDirty: this.sessionsDirty,
|
||||
dirtySessionTranscriptCount: this.sessionsDirtyFiles.size,
|
||||
dirtySessionTranscriptCount: this.dirtySessionTranscripts.size,
|
||||
sync: params,
|
||||
needsFullReindex,
|
||||
});
|
||||
@@ -778,7 +778,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
needsFullReindex: params.needsFullReindex,
|
||||
files,
|
||||
targetSessionTranscripts,
|
||||
sessionsDirtyFiles: this.sessionsDirtyFiles,
|
||||
dirtySessionTranscripts: this.dirtySessionTranscripts,
|
||||
existingRows: targetSessionTranscripts
|
||||
? null
|
||||
: loadMemorySourceFileState({
|
||||
@@ -791,8 +791,8 @@ export abstract class MemoryManagerSyncOps {
|
||||
log.debug("memory sync: indexing session transcripts", {
|
||||
files: files.length,
|
||||
indexAll,
|
||||
dirtyFiles: this.sessionsDirtyFiles.size,
|
||||
targetedFiles: targetSessionTranscripts?.size ?? 0,
|
||||
dirtyTranscripts: this.dirtySessionTranscripts.size,
|
||||
targetedTranscripts: targetSessionTranscripts?.size ?? 0,
|
||||
batch: this.batch.enabled,
|
||||
concurrency: this.getIndexConcurrency(),
|
||||
});
|
||||
@@ -808,7 +808,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
}
|
||||
|
||||
const tasks = files.map((absPath) => async () => {
|
||||
if (!indexAll && !this.sessionsDirtyFiles.has(absPath)) {
|
||||
if (!indexAll && !this.dirtySessionTranscripts.has(absPath)) {
|
||||
if (params.progress) {
|
||||
params.progress.completed += 1;
|
||||
params.progress.report({
|
||||
@@ -950,7 +950,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
useUnsafeReindex:
|
||||
process.env.OPENCLAW_TEST_FAST === "1" &&
|
||||
process.env.OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX === "1",
|
||||
sessionsDirtyFiles: this.sessionsDirtyFiles,
|
||||
dirtySessionTranscripts: this.dirtySessionTranscripts,
|
||||
syncSessionTranscripts: async (targetedParams) => {
|
||||
await this.syncSessionTranscripts(targetedParams);
|
||||
},
|
||||
@@ -1021,8 +1021,8 @@ export abstract class MemoryManagerSyncOps {
|
||||
progress: progress ?? undefined,
|
||||
});
|
||||
this.sessionsDirty = false;
|
||||
this.sessionsDirtyFiles.clear();
|
||||
} else if (this.sessionsDirtyFiles.size > 0) {
|
||||
this.dirtySessionTranscripts.clear();
|
||||
} else if (this.dirtySessionTranscripts.size > 0) {
|
||||
this.sessionsDirty = true;
|
||||
} else {
|
||||
this.sessionsDirty = false;
|
||||
@@ -1176,8 +1176,8 @@ export abstract class MemoryManagerSyncOps {
|
||||
progress: params.progress,
|
||||
});
|
||||
this.sessionsDirty = false;
|
||||
this.sessionsDirtyFiles.clear();
|
||||
} else if (this.sessionsDirtyFiles.size > 0) {
|
||||
this.dirtySessionTranscripts.clear();
|
||||
} else if (this.dirtySessionTranscripts.size > 0) {
|
||||
this.sessionsDirty = true;
|
||||
} else {
|
||||
this.sessionsDirty = false;
|
||||
@@ -1252,8 +1252,8 @@ export abstract class MemoryManagerSyncOps {
|
||||
if (shouldSyncSessions) {
|
||||
await this.syncSessionTranscripts({ needsFullReindex: true, progress: params.progress });
|
||||
this.sessionsDirty = false;
|
||||
this.sessionsDirtyFiles.clear();
|
||||
} else if (this.sessionsDirtyFiles.size > 0) {
|
||||
this.dirtySessionTranscripts.clear();
|
||||
} else if (this.dirtySessionTranscripts.size > 0) {
|
||||
this.sessionsDirty = true;
|
||||
} else {
|
||||
this.sessionsDirty = false;
|
||||
@@ -1296,7 +1296,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
this.ensureSchema();
|
||||
this.dropVectorTable();
|
||||
this.vector.dims = undefined;
|
||||
this.sessionsDirtyFiles.clear();
|
||||
this.dirtySessionTranscripts.clear();
|
||||
}
|
||||
|
||||
protected readMeta(): MemoryIndexMeta | null {
|
||||
|
||||
@@ -7,14 +7,14 @@ import {
|
||||
describe("memory targeted session sync", () => {
|
||||
it("preserves unrelated dirty sessions after targeted cleanup", () => {
|
||||
const secondSessionPath = "/tmp/targeted-dirty-second.jsonl";
|
||||
const sessionsDirtyFiles = new Set(["/tmp/targeted-dirty-first.jsonl", secondSessionPath]);
|
||||
const dirtySessionTranscripts = new Set(["/tmp/targeted-dirty-first.jsonl", secondSessionPath]);
|
||||
|
||||
const sessionsDirty = clearMemorySyncedSessionTranscripts({
|
||||
sessionsDirtyFiles,
|
||||
dirtySessionTranscripts,
|
||||
targetSessionTranscripts: ["/tmp/targeted-dirty-first.jsonl"],
|
||||
});
|
||||
|
||||
expect(sessionsDirtyFiles.has(secondSessionPath)).toBe(true);
|
||||
expect(dirtySessionTranscripts.has(secondSessionPath)).toBe(true);
|
||||
expect(sessionsDirty).toBe(true);
|
||||
});
|
||||
|
||||
@@ -29,7 +29,7 @@ describe("memory targeted session sync", () => {
|
||||
reason: "post-compaction",
|
||||
progress: undefined,
|
||||
useUnsafeReindex: false,
|
||||
sessionsDirtyFiles: new Set(),
|
||||
dirtySessionTranscripts: new Set(),
|
||||
syncSessionTranscripts: async () => {
|
||||
throw new Error("embedding backend failed");
|
||||
},
|
||||
@@ -58,7 +58,7 @@ describe("memory targeted session sync", () => {
|
||||
reason: "post-compaction",
|
||||
progress: undefined,
|
||||
useUnsafeReindex: true,
|
||||
sessionsDirtyFiles: new Set(),
|
||||
dirtySessionTranscripts: new Set(),
|
||||
syncSessionTranscripts: async () => {
|
||||
throw new Error("embedding backend failed");
|
||||
},
|
||||
|
||||
@@ -9,17 +9,17 @@ type TargetedSyncProgress = {
|
||||
};
|
||||
|
||||
export function clearMemorySyncedSessionTranscripts(params: {
|
||||
sessionsDirtyFiles: Set<string>;
|
||||
dirtySessionTranscripts: Set<string>;
|
||||
targetSessionTranscripts?: Iterable<string> | null;
|
||||
}): boolean {
|
||||
if (!params.targetSessionTranscripts) {
|
||||
params.sessionsDirtyFiles.clear();
|
||||
params.dirtySessionTranscripts.clear();
|
||||
} else {
|
||||
for (const targetSessionTranscript of params.targetSessionTranscripts) {
|
||||
params.sessionsDirtyFiles.delete(targetSessionTranscript);
|
||||
params.dirtySessionTranscripts.delete(targetSessionTranscript);
|
||||
}
|
||||
}
|
||||
return params.sessionsDirtyFiles.size > 0;
|
||||
return params.dirtySessionTranscripts.size > 0;
|
||||
}
|
||||
|
||||
export async function runMemoryTargetedSessionSync(params: {
|
||||
@@ -28,7 +28,7 @@ export async function runMemoryTargetedSessionSync(params: {
|
||||
reason?: string;
|
||||
progress?: TargetedSyncProgress;
|
||||
useUnsafeReindex: boolean;
|
||||
sessionsDirtyFiles: Set<string>;
|
||||
dirtySessionTranscripts: Set<string>;
|
||||
syncSessionTranscripts: (params: {
|
||||
needsFullReindex: boolean;
|
||||
targetSessionTranscripts?: string[];
|
||||
@@ -50,7 +50,7 @@ export async function runMemoryTargetedSessionSync(params: {
|
||||
if (!params.hasSessionSource || !params.targetSessionTranscripts) {
|
||||
return {
|
||||
handled: false,
|
||||
sessionsDirty: params.sessionsDirtyFiles.size > 0,
|
||||
sessionsDirty: params.dirtySessionTranscripts.size > 0,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -63,7 +63,7 @@ export async function runMemoryTargetedSessionSync(params: {
|
||||
return {
|
||||
handled: true,
|
||||
sessionsDirty: clearMemorySyncedSessionTranscripts({
|
||||
sessionsDirtyFiles: params.sessionsDirtyFiles,
|
||||
dirtySessionTranscripts: params.dirtySessionTranscripts,
|
||||
targetSessionTranscripts: params.targetSessionTranscripts,
|
||||
}),
|
||||
};
|
||||
@@ -86,7 +86,7 @@ export async function runMemoryTargetedSessionSync(params: {
|
||||
}
|
||||
return {
|
||||
handled: true,
|
||||
sessionsDirty: params.sessionsDirtyFiles.size > 0,
|
||||
sessionsDirty: params.dirtySessionTranscripts.size > 0,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,8 +139,8 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
protected closed = false;
|
||||
protected dirty = false;
|
||||
protected sessionsDirty = false;
|
||||
protected sessionsDirtyFiles = new Set<string>();
|
||||
protected sessionPendingFiles = new Set<string>();
|
||||
protected dirtySessionTranscripts = new Set<string>();
|
||||
protected pendingSessionTranscripts = new Set<string>();
|
||||
protected sessionDeltas = new Map<
|
||||
string,
|
||||
{ lastSize: number; lastMessages: number; pendingBytes: number; pendingMessages: number }
|
||||
|
||||
Reference in New Issue
Block a user