mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-27 00:17:29 +00:00
perf: extract memory manager state helpers
This commit is contained in:
30
extensions/memory-core/src/memory/manager-async-state.ts
Normal file
30
extensions/memory-core/src/memory/manager-async-state.ts
Normal file
@@ -0,0 +1,30 @@
|
||||
export function startAsyncSearchSync(params: {
|
||||
enabled: boolean;
|
||||
dirty: boolean;
|
||||
sessionsDirty: boolean;
|
||||
sync: (params: { reason: string }) => Promise<void>;
|
||||
onError: (err: unknown) => void;
|
||||
}): void {
|
||||
if (!params.enabled || (!params.dirty && !params.sessionsDirty)) {
|
||||
return;
|
||||
}
|
||||
void params.sync({ reason: "search" }).catch((err) => {
|
||||
params.onError(err);
|
||||
});
|
||||
}
|
||||
|
||||
export async function awaitPendingManagerWork(params: {
|
||||
pendingSync?: Promise<void> | null;
|
||||
pendingProviderInit?: Promise<void> | null;
|
||||
}): Promise<void> {
|
||||
if (params.pendingSync) {
|
||||
try {
|
||||
await params.pendingSync;
|
||||
} catch {}
|
||||
}
|
||||
if (params.pendingProviderInit) {
|
||||
try {
|
||||
await params.pendingProviderInit;
|
||||
} catch {}
|
||||
}
|
||||
}
|
||||
145
extensions/memory-core/src/memory/manager-cache.test.ts
Normal file
145
extensions/memory-core/src/memory/manager-cache.test.ts
Normal file
@@ -0,0 +1,145 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
closeManagedCacheEntries,
|
||||
getOrCreateManagedCacheEntry,
|
||||
resolveSingletonManagedCache,
|
||||
type ManagedCache,
|
||||
} from "./manager-cache.js";
|
||||
|
||||
type TestEntry = {
|
||||
id: string;
|
||||
close: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
function createTestCache(): ManagedCache<TestEntry> {
|
||||
return resolveSingletonManagedCache<TestEntry>(Symbol("openclaw.manager-cache.test"));
|
||||
}
|
||||
|
||||
function createEntry(id: string): TestEntry {
|
||||
return {
|
||||
id,
|
||||
close: vi.fn(async () => {}),
|
||||
};
|
||||
}
|
||||
|
||||
function createDeferred<T>() {
|
||||
let resolve!: (value: T | PromiseLike<T>) => void;
|
||||
let reject!: (reason?: unknown) => void;
|
||||
const promise = new Promise<T>((res, rej) => {
|
||||
resolve = res;
|
||||
reject = rej;
|
||||
});
|
||||
return { promise, resolve, reject };
|
||||
}
|
||||
|
||||
describe("manager cache", () => {
|
||||
const cachesForCleanup: ManagedCache<TestEntry>[] = [];
|
||||
|
||||
afterEach(async () => {
|
||||
await Promise.all(
|
||||
cachesForCleanup.splice(0).map((cache) =>
|
||||
closeManagedCacheEntries({
|
||||
cache: cache.cache,
|
||||
pending: cache.pending,
|
||||
}),
|
||||
),
|
||||
);
|
||||
});
|
||||
|
||||
it("deduplicates concurrent creation for the same cache key", async () => {
|
||||
const cache = createTestCache();
|
||||
cachesForCleanup.push(cache);
|
||||
let createCalls = 0;
|
||||
|
||||
const results = await Promise.all(
|
||||
Array.from(
|
||||
{ length: 12 },
|
||||
async () =>
|
||||
await getOrCreateManagedCacheEntry({
|
||||
cache: cache.cache,
|
||||
pending: cache.pending,
|
||||
key: "same",
|
||||
create: async () => {
|
||||
createCalls += 1;
|
||||
await Promise.resolve();
|
||||
return createEntry("shared");
|
||||
},
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
expect(results).toHaveLength(12);
|
||||
expect(new Set(results).size).toBe(1);
|
||||
expect(createCalls).toBe(1);
|
||||
});
|
||||
|
||||
it("waits for pending creation before global teardown closes cached entries", async () => {
|
||||
const cache = createTestCache();
|
||||
const first = createEntry("first");
|
||||
const second = createEntry("second");
|
||||
cachesForCleanup.push(cache);
|
||||
const gate = createDeferred<void>();
|
||||
|
||||
const pendingFirst = getOrCreateManagedCacheEntry({
|
||||
cache: cache.cache,
|
||||
pending: cache.pending,
|
||||
key: "same",
|
||||
create: async () => {
|
||||
await gate.promise;
|
||||
return first;
|
||||
},
|
||||
});
|
||||
|
||||
const teardown = closeManagedCacheEntries({
|
||||
cache: cache.cache,
|
||||
pending: cache.pending,
|
||||
});
|
||||
gate.resolve();
|
||||
|
||||
await teardown;
|
||||
expect(first.close).toHaveBeenCalledTimes(1);
|
||||
|
||||
const resolvedFirst = await pendingFirst;
|
||||
const resolvedSecond = await getOrCreateManagedCacheEntry({
|
||||
cache: cache.cache,
|
||||
pending: cache.pending,
|
||||
key: "same",
|
||||
create: async () => second,
|
||||
});
|
||||
|
||||
expect(resolvedFirst).toBe(first);
|
||||
expect(resolvedSecond).toBe(second);
|
||||
expect(resolvedSecond).not.toBe(resolvedFirst);
|
||||
});
|
||||
|
||||
it("bypasses identity caching for status-only callers", async () => {
|
||||
const cache = createTestCache();
|
||||
cachesForCleanup.push(cache);
|
||||
let createCalls = 0;
|
||||
|
||||
const first = await getOrCreateManagedCacheEntry({
|
||||
cache: cache.cache,
|
||||
pending: cache.pending,
|
||||
key: "same",
|
||||
bypassCache: true,
|
||||
create: async () => {
|
||||
createCalls += 1;
|
||||
return createEntry(`status-${createCalls}`);
|
||||
},
|
||||
});
|
||||
const second = await getOrCreateManagedCacheEntry({
|
||||
cache: cache.cache,
|
||||
pending: cache.pending,
|
||||
key: "same",
|
||||
bypassCache: true,
|
||||
create: async () => {
|
||||
createCalls += 1;
|
||||
return createEntry(`status-${createCalls}`);
|
||||
},
|
||||
});
|
||||
|
||||
expect(first).not.toBe(second);
|
||||
expect(createCalls).toBe(2);
|
||||
expect(cache.cache.size).toBe(0);
|
||||
});
|
||||
});
|
||||
77
extensions/memory-core/src/memory/manager-cache.ts
Normal file
77
extensions/memory-core/src/memory/manager-cache.ts
Normal file
@@ -0,0 +1,77 @@
|
||||
import { resolveGlobalSingleton } from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
|
||||
|
||||
type Closable = {
|
||||
close?: () => Promise<void> | void;
|
||||
};
|
||||
|
||||
export type ManagedCache<T> = {
|
||||
cache: Map<string, T>;
|
||||
pending: Map<string, Promise<T>>;
|
||||
};
|
||||
|
||||
export function resolveSingletonManagedCache<T>(cacheKey: symbol): ManagedCache<T> {
|
||||
return resolveGlobalSingleton<ManagedCache<T>>(cacheKey, () => ({
|
||||
cache: new Map<string, T>(),
|
||||
pending: new Map<string, Promise<T>>(),
|
||||
}));
|
||||
}
|
||||
|
||||
export async function getOrCreateManagedCacheEntry<T>(params: {
|
||||
cache: Map<string, T>;
|
||||
pending: Map<string, Promise<T>>;
|
||||
key: string;
|
||||
bypassCache?: boolean;
|
||||
create: () => Promise<T> | T;
|
||||
}): Promise<T> {
|
||||
if (params.bypassCache) {
|
||||
return await params.create();
|
||||
}
|
||||
const existing = params.cache.get(params.key);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const pending = params.pending.get(params.key);
|
||||
if (pending) {
|
||||
return pending;
|
||||
}
|
||||
const createPromise = (async () => {
|
||||
const refreshed = params.cache.get(params.key);
|
||||
if (refreshed) {
|
||||
return refreshed;
|
||||
}
|
||||
const entry = await params.create();
|
||||
params.cache.set(params.key, entry);
|
||||
return entry;
|
||||
})();
|
||||
params.pending.set(params.key, createPromise);
|
||||
try {
|
||||
return await createPromise;
|
||||
} finally {
|
||||
if (params.pending.get(params.key) === createPromise) {
|
||||
params.pending.delete(params.key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function closeManagedCacheEntries<T extends Closable>(params: {
|
||||
cache: Map<string, T>;
|
||||
pending: Map<string, Promise<T>>;
|
||||
onCloseError?: (err: unknown) => void;
|
||||
}): Promise<void> {
|
||||
const pending = Array.from(params.pending.values());
|
||||
if (pending.length > 0) {
|
||||
await Promise.allSettled(pending);
|
||||
}
|
||||
const entries = Array.from(params.cache.values());
|
||||
params.cache.clear();
|
||||
for (const entry of entries) {
|
||||
if (typeof entry.close !== "function") {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
await entry.close();
|
||||
} catch (err) {
|
||||
params.onCloseError?.(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
29
extensions/memory-core/src/memory/manager-session-reindex.ts
Normal file
29
extensions/memory-core/src/memory/manager-session-reindex.ts
Normal file
@@ -0,0 +1,29 @@
|
||||
export function shouldSyncSessionsForReindex(params: {
|
||||
hasSessionSource: boolean;
|
||||
sessionsDirty: boolean;
|
||||
dirtySessionFileCount: number;
|
||||
sync?: {
|
||||
reason?: string;
|
||||
force?: boolean;
|
||||
sessionFiles?: string[];
|
||||
};
|
||||
needsFullReindex?: boolean;
|
||||
}): boolean {
|
||||
if (!params.hasSessionSource) {
|
||||
return false;
|
||||
}
|
||||
if (params.sync?.sessionFiles?.some((sessionFile) => sessionFile.trim().length > 0)) {
|
||||
return true;
|
||||
}
|
||||
if (params.sync?.force) {
|
||||
return true;
|
||||
}
|
||||
if (params.needsFullReindex) {
|
||||
return true;
|
||||
}
|
||||
const reason = params.sync?.reason;
|
||||
if (reason === "session-start" || reason === "watch") {
|
||||
return false;
|
||||
}
|
||||
return params.sessionsDirty && params.dirtySessionFileCount > 0;
|
||||
}
|
||||
@@ -46,6 +46,7 @@ import {
|
||||
type EmbeddingProviderRuntime,
|
||||
resolveEmbeddingProviderFallbackModel,
|
||||
} from "./embeddings.js";
|
||||
import { shouldSyncSessionsForReindex } from "./manager-session-reindex.js";
|
||||
|
||||
type MemoryIndexMeta = {
|
||||
model: string;
|
||||
@@ -671,23 +672,13 @@ export abstract class MemoryManagerSyncOps {
|
||||
params?: { reason?: string; force?: boolean; sessionFiles?: string[] },
|
||||
needsFullReindex = false,
|
||||
) {
|
||||
if (!this.sources.has("sessions")) {
|
||||
return false;
|
||||
}
|
||||
if (params?.sessionFiles?.some((sessionFile) => sessionFile.trim().length > 0)) {
|
||||
return true;
|
||||
}
|
||||
if (params?.force) {
|
||||
return true;
|
||||
}
|
||||
if (needsFullReindex) {
|
||||
return true;
|
||||
}
|
||||
const reason = params?.reason;
|
||||
if (reason === "session-start" || reason === "watch") {
|
||||
return false;
|
||||
}
|
||||
return this.sessionsDirty && this.sessionsDirtyFiles.size > 0;
|
||||
return shouldSyncSessionsForReindex({
|
||||
hasSessionSource: this.sources.has("sessions"),
|
||||
sessionsDirty: this.sessionsDirty,
|
||||
dirtySessionFileCount: this.sessionsDirtyFiles.size,
|
||||
sync: params,
|
||||
needsFullReindex,
|
||||
});
|
||||
}
|
||||
|
||||
private async syncMemoryFiles(params: {
|
||||
|
||||
@@ -1,118 +1,39 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { MemoryIndexManager } from "./index.js";
|
||||
import { closeAllMemorySearchManagers } from "./index.js";
|
||||
import { createOpenAIEmbeddingProviderMock } from "./test-embeddings-mock.js";
|
||||
import { createMemoryManagerOrThrow } from "./test-manager.js";
|
||||
|
||||
const embedBatch = vi.fn(async (_input: string[]): Promise<number[][]> => []);
|
||||
const embedQuery = vi.fn(async (_input: string): Promise<number[]> => [0.2, 0.2, 0.2]);
|
||||
|
||||
vi.mock("./embeddings.js", () => ({
|
||||
createEmbeddingProvider: async (_options: unknown) =>
|
||||
createOpenAIEmbeddingProviderMock({
|
||||
embedQuery: embedQuery as unknown as (input: string) => Promise<number[]>,
|
||||
embedBatch: embedBatch as unknown as (input: string[]) => Promise<number[][]>,
|
||||
}),
|
||||
}));
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { awaitPendingManagerWork, startAsyncSearchSync } from "./manager-async-state.js";
|
||||
|
||||
describe("memory search async sync", () => {
|
||||
let workspaceDir: string;
|
||||
let indexPath: string;
|
||||
let manager: MemoryIndexManager | null = null;
|
||||
|
||||
const buildConfig = (): OpenClawConfig =>
|
||||
({
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: workspaceDir,
|
||||
memorySearch: {
|
||||
provider: "openai",
|
||||
model: "text-embedding-3-small",
|
||||
store: { path: indexPath },
|
||||
sync: { watch: false, onSessionStart: false, onSearch: true },
|
||||
query: { minScore: 0 },
|
||||
remote: { batch: { enabled: false, wait: false } },
|
||||
},
|
||||
},
|
||||
list: [{ id: "main", default: true }],
|
||||
},
|
||||
}) as OpenClawConfig;
|
||||
|
||||
beforeEach(async () => {
|
||||
await closeAllMemorySearchManagers();
|
||||
embedBatch.mockClear();
|
||||
embedBatch.mockImplementation(async (input: string[]) => input.map(() => [0.2, 0.2, 0.2]));
|
||||
workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-mem-async-"));
|
||||
indexPath = path.join(workspaceDir, "index.sqlite");
|
||||
await fs.mkdir(path.join(workspaceDir, "memory"));
|
||||
await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-07.md"), "hello\n");
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
vi.unstubAllGlobals();
|
||||
if (manager) {
|
||||
await manager.close();
|
||||
manager = null;
|
||||
}
|
||||
await closeAllMemorySearchManagers();
|
||||
await fs.rm(workspaceDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("does not await sync when searching", async () => {
|
||||
const cfg = buildConfig();
|
||||
manager = await createMemoryManagerOrThrow(cfg);
|
||||
|
||||
let releaseSync = () => {};
|
||||
const pending = new Promise<void>((resolve) => {
|
||||
releaseSync = () => resolve();
|
||||
}).finally(() => {
|
||||
(manager as unknown as { syncing: Promise<void> | null }).syncing = null;
|
||||
});
|
||||
const syncMock = vi.fn(async () => {
|
||||
(manager as unknown as { syncing: Promise<void> | null }).syncing = pending;
|
||||
return pending;
|
||||
});
|
||||
(manager as unknown as { sync: () => Promise<void> }).sync = syncMock;
|
||||
const onError = vi.fn();
|
||||
|
||||
startAsyncSearchSync({
|
||||
enabled: true,
|
||||
dirty: true,
|
||||
sessionsDirty: false,
|
||||
sync: syncMock,
|
||||
onError,
|
||||
});
|
||||
|
||||
const activeManager = manager;
|
||||
if (!activeManager) {
|
||||
throw new Error("manager missing");
|
||||
}
|
||||
await activeManager.search("hello");
|
||||
expect(syncMock).toHaveBeenCalledTimes(1);
|
||||
releaseSync();
|
||||
await vi.waitFor(() => {
|
||||
expect((manager as unknown as { syncing: Promise<void> | null }).syncing).toBeNull();
|
||||
});
|
||||
}, 300_000);
|
||||
await pending;
|
||||
expect(onError).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("waits for in-flight search sync during close", async () => {
|
||||
const cfg = buildConfig();
|
||||
manager = await createMemoryManagerOrThrow(cfg);
|
||||
let releaseSync = () => {};
|
||||
const pendingSync = new Promise<void>((resolve) => {
|
||||
releaseSync = () => resolve();
|
||||
}).finally(() => {
|
||||
(manager as unknown as { syncing: Promise<void> | null }).syncing = null;
|
||||
});
|
||||
const syncMock = vi.fn(async () => {
|
||||
(manager as unknown as { syncing: Promise<void> | null }).syncing = pendingSync;
|
||||
return pendingSync;
|
||||
});
|
||||
(manager as unknown as { dirty: boolean }).dirty = true;
|
||||
(manager as unknown as { sync: () => Promise<void> }).sync = syncMock;
|
||||
|
||||
await manager.search("hello");
|
||||
await vi.waitFor(() => {
|
||||
expect((manager as unknown as { syncing: Promise<void> | null }).syncing).toBe(pendingSync);
|
||||
});
|
||||
|
||||
let closed = false;
|
||||
const closePromise = manager.close().then(() => {
|
||||
const closePromise = awaitPendingManagerWork({ pendingSync }).then(() => {
|
||||
closed = true;
|
||||
});
|
||||
|
||||
@@ -121,6 +42,17 @@ describe("memory search async sync", () => {
|
||||
|
||||
releaseSync();
|
||||
await closePromise;
|
||||
manager = null;
|
||||
});
|
||||
|
||||
it("skips background search sync when search-triggered sync is disabled", () => {
|
||||
const syncMock = vi.fn(async () => {});
|
||||
startAsyncSearchSync({
|
||||
enabled: false,
|
||||
dirty: true,
|
||||
sessionsDirty: false,
|
||||
sync: syncMock,
|
||||
onError: vi.fn(),
|
||||
});
|
||||
expect(syncMock).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,137 +0,0 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { setTimeout as sleep } from "node:timers/promises";
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
|
||||
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import "./test-runtime-mocks.js";
|
||||
import type { MemoryIndexManager } from "./index.js";
|
||||
|
||||
type MemoryIndexModule = typeof import("./index.js");
|
||||
type ManagerModule = typeof import("./manager.js");
|
||||
|
||||
const hoisted = vi.hoisted(() => ({
|
||||
providerCreateCalls: 0,
|
||||
providerDelayMs: 0,
|
||||
}));
|
||||
|
||||
vi.mock("./embeddings.js", () => ({
|
||||
createEmbeddingProvider: async () => {
|
||||
hoisted.providerCreateCalls += 1;
|
||||
if (hoisted.providerDelayMs > 0) {
|
||||
await sleep(hoisted.providerDelayMs);
|
||||
}
|
||||
return {
|
||||
requestedProvider: "openai",
|
||||
provider: {
|
||||
id: "mock",
|
||||
model: "mock-embed",
|
||||
maxInputTokens: 8192,
|
||||
embedQuery: async () => [0, 1, 0],
|
||||
embedBatch: async (texts: string[]) => texts.map(() => [0, 1, 0]),
|
||||
},
|
||||
};
|
||||
},
|
||||
}));
|
||||
|
||||
let getMemorySearchManager: MemoryIndexModule["getMemorySearchManager"];
|
||||
let closeAllMemorySearchManagers: MemoryIndexModule["closeAllMemorySearchManagers"];
|
||||
let closeAllMemoryIndexManagers: ManagerModule["closeAllMemoryIndexManagers"];
|
||||
let RawMemoryIndexManager: ManagerModule["MemoryIndexManager"];
|
||||
|
||||
describe("memory manager cache hydration", () => {
|
||||
let workspaceDir = "";
|
||||
|
||||
beforeAll(async () => {
|
||||
({ getMemorySearchManager, closeAllMemorySearchManagers } = await import("./index.js"));
|
||||
({ closeAllMemoryIndexManagers, MemoryIndexManager: RawMemoryIndexManager } =
|
||||
await import("./manager.js"));
|
||||
});
|
||||
|
||||
beforeEach(async () => {
|
||||
await closeAllMemoryIndexManagers();
|
||||
vi.clearAllMocks();
|
||||
workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-mem-concurrent-"));
|
||||
await fs.mkdir(path.join(workspaceDir, "memory"), { recursive: true });
|
||||
await fs.writeFile(path.join(workspaceDir, "MEMORY.md"), "Hello memory.");
|
||||
hoisted.providerCreateCalls = 0;
|
||||
hoisted.providerDelayMs = 50;
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await closeAllMemorySearchManagers();
|
||||
await fs.rm(workspaceDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
function createMemoryConcurrencyConfig(indexPath: string): OpenClawConfig {
|
||||
return {
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: workspaceDir,
|
||||
memorySearch: {
|
||||
provider: "openai",
|
||||
model: "mock-embed",
|
||||
store: { path: indexPath, vector: { enabled: false } },
|
||||
sync: { watch: false, onSessionStart: false, onSearch: false },
|
||||
},
|
||||
},
|
||||
list: [{ id: "main", default: true }],
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
}
|
||||
|
||||
it("deduplicates concurrent manager creation for the same cache key", async () => {
|
||||
const indexPath = path.join(workspaceDir, "index.sqlite");
|
||||
const cfg = createMemoryConcurrencyConfig(indexPath);
|
||||
|
||||
const results = await Promise.all(
|
||||
Array.from(
|
||||
{ length: 12 },
|
||||
async () => await getMemorySearchManager({ cfg, agentId: "main" }),
|
||||
),
|
||||
);
|
||||
const managers = results
|
||||
.map((result) => result.manager)
|
||||
.filter((manager): manager is MemoryIndexManager => Boolean(manager));
|
||||
|
||||
expect(managers).toHaveLength(12);
|
||||
expect(new Set(managers).size).toBe(1);
|
||||
expect(hoisted.providerCreateCalls).toBe(0);
|
||||
|
||||
await managers[0].close();
|
||||
});
|
||||
|
||||
it("evicts cached managers during global teardown", async () => {
|
||||
const indexPath = path.join(workspaceDir, "index.sqlite");
|
||||
const cfg = createMemoryConcurrencyConfig(indexPath);
|
||||
|
||||
const pendingResult = RawMemoryIndexManager.get({ cfg, agentId: "main" });
|
||||
await closeAllMemoryIndexManagers();
|
||||
const firstManager = await pendingResult;
|
||||
|
||||
const secondManager = await RawMemoryIndexManager.get({ cfg, agentId: "main" });
|
||||
|
||||
expect(firstManager).toBeTruthy();
|
||||
expect(secondManager).toBeTruthy();
|
||||
expect(Object.is(secondManager, firstManager)).toBe(false);
|
||||
expect(hoisted.providerCreateCalls).toBe(0);
|
||||
|
||||
await secondManager?.close?.();
|
||||
});
|
||||
|
||||
it("does not identity-cache status-only managers", async () => {
|
||||
const indexPath = path.join(workspaceDir, "index.sqlite");
|
||||
const cfg = createMemoryConcurrencyConfig(indexPath);
|
||||
|
||||
const first = await RawMemoryIndexManager.get({ cfg, agentId: "main", purpose: "status" });
|
||||
const second = await RawMemoryIndexManager.get({ cfg, agentId: "main", purpose: "status" });
|
||||
|
||||
expect(first).toBeTruthy();
|
||||
expect(second).toBeTruthy();
|
||||
expect(Object.is(second, first)).toBe(false);
|
||||
expect(hoisted.providerCreateCalls).toBe(0);
|
||||
|
||||
await first?.close?.();
|
||||
await second?.close?.();
|
||||
});
|
||||
});
|
||||
@@ -1,78 +1,43 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
|
||||
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from "vitest";
|
||||
import type { MemoryIndexManager } from "./index.js";
|
||||
import { getRequiredMemoryIndexManager } from "./test-manager-helpers.js";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { shouldSyncSessionsForReindex } from "./manager-session-reindex.js";
|
||||
|
||||
describe("memory manager session reindex gating", () => {
|
||||
let fixtureRoot = "";
|
||||
let caseId = 0;
|
||||
let workspaceDir: string;
|
||||
let indexPath: string;
|
||||
let manager: MemoryIndexManager | null = null;
|
||||
|
||||
beforeAll(async () => {
|
||||
fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-mem-session-reindex-"));
|
||||
});
|
||||
|
||||
beforeEach(async () => {
|
||||
workspaceDir = path.join(fixtureRoot, `case-${caseId++}`);
|
||||
await fs.mkdir(path.join(workspaceDir, "memory"), { recursive: true });
|
||||
await fs.writeFile(path.join(workspaceDir, "MEMORY.md"), "Hello memory.");
|
||||
indexPath = path.join(workspaceDir, "index.sqlite");
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
if (manager) {
|
||||
await manager.close();
|
||||
manager = null;
|
||||
}
|
||||
});
|
||||
|
||||
afterAll(async () => {
|
||||
if (!fixtureRoot) {
|
||||
return;
|
||||
}
|
||||
await fs.rm(fixtureRoot, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("keeps session syncing enabled for full reindexes triggered from session-start/watch", async () => {
|
||||
const cfg = {
|
||||
agents: {
|
||||
defaults: {
|
||||
workspace: workspaceDir,
|
||||
memorySearch: {
|
||||
provider: "openai",
|
||||
model: "mock-embed",
|
||||
store: { path: indexPath, vector: { enabled: false } },
|
||||
cache: { enabled: false },
|
||||
query: { minScore: 0, hybrid: { enabled: false } },
|
||||
chunking: { tokens: 4000, overlap: 0 },
|
||||
experimental: { sessionMemory: true },
|
||||
sources: ["memory", "sessions"],
|
||||
sync: { watch: false, onSessionStart: false, onSearch: false },
|
||||
},
|
||||
},
|
||||
list: [{ id: "main", default: true }],
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
|
||||
manager = await getRequiredMemoryIndexManager({ cfg, agentId: "main" });
|
||||
|
||||
const shouldSyncSessions = (
|
||||
manager as unknown as {
|
||||
shouldSyncSessions: (
|
||||
params?: { reason?: string; force?: boolean },
|
||||
needsFullReindex?: boolean,
|
||||
) => boolean;
|
||||
}
|
||||
).shouldSyncSessions.bind(manager);
|
||||
|
||||
expect(shouldSyncSessions({ reason: "session-start" }, true)).toBe(true);
|
||||
expect(shouldSyncSessions({ reason: "watch" }, true)).toBe(true);
|
||||
expect(shouldSyncSessions({ reason: "session-start" }, false)).toBe(false);
|
||||
expect(shouldSyncSessions({ reason: "watch" }, false)).toBe(false);
|
||||
it("keeps session syncing enabled for full reindexes triggered from session-start/watch", () => {
|
||||
expect(
|
||||
shouldSyncSessionsForReindex({
|
||||
hasSessionSource: true,
|
||||
sessionsDirty: false,
|
||||
dirtySessionFileCount: 0,
|
||||
sync: { reason: "session-start" },
|
||||
needsFullReindex: true,
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
shouldSyncSessionsForReindex({
|
||||
hasSessionSource: true,
|
||||
sessionsDirty: false,
|
||||
dirtySessionFileCount: 0,
|
||||
sync: { reason: "watch" },
|
||||
needsFullReindex: true,
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
shouldSyncSessionsForReindex({
|
||||
hasSessionSource: true,
|
||||
sessionsDirty: false,
|
||||
dirtySessionFileCount: 0,
|
||||
sync: { reason: "session-start" },
|
||||
needsFullReindex: false,
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(
|
||||
shouldSyncSessionsForReindex({
|
||||
hasSessionSource: true,
|
||||
sessionsDirty: false,
|
||||
dirtySessionFileCount: 0,
|
||||
sync: { reason: "watch" },
|
||||
needsFullReindex: false,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -3,7 +3,6 @@ import { type FSWatcher } from "chokidar";
|
||||
import {
|
||||
resolveAgentDir,
|
||||
resolveAgentWorkspaceDir,
|
||||
resolveGlobalSingleton,
|
||||
resolveMemorySearchConfig,
|
||||
createSubsystemLogger,
|
||||
type OpenClawConfig,
|
||||
@@ -28,6 +27,12 @@ import {
|
||||
type EmbeddingProviderRuntime,
|
||||
} from "./embeddings.js";
|
||||
import { bm25RankToScore, buildFtsQuery, mergeHybridResults } from "./hybrid.js";
|
||||
import { awaitPendingManagerWork, startAsyncSearchSync } from "./manager-async-state.js";
|
||||
import {
|
||||
closeManagedCacheEntries,
|
||||
getOrCreateManagedCacheEntry,
|
||||
resolveSingletonManagedCache,
|
||||
} from "./manager-cache.js";
|
||||
import { MemoryManagerEmbeddingOps } from "./manager-embedding-ops.js";
|
||||
import { searchKeyword, searchVector } from "./manager-search.js";
|
||||
const SNIPPET_MAX_CHARS = 700;
|
||||
@@ -37,26 +42,11 @@ const EMBEDDING_CACHE_TABLE = "embedding_cache";
|
||||
const BATCH_FAILURE_LIMIT = 2;
|
||||
|
||||
const MEMORY_INDEX_MANAGER_CACHE_KEY = Symbol.for("openclaw.memoryIndexManagerCache");
|
||||
type MemoryIndexManagerCacheStore = {
|
||||
indexCache: Map<string, MemoryIndexManager>;
|
||||
indexCachePending: Map<string, Promise<MemoryIndexManager>>;
|
||||
};
|
||||
|
||||
function getMemoryIndexManagerCacheStore(): MemoryIndexManagerCacheStore {
|
||||
// Keep manager caches reachable across `vi.resetModules()` so cleanup still reaches older managers.
|
||||
return resolveGlobalSingleton<MemoryIndexManagerCacheStore>(
|
||||
MEMORY_INDEX_MANAGER_CACHE_KEY,
|
||||
() => ({
|
||||
indexCache: new Map<string, MemoryIndexManager>(),
|
||||
indexCachePending: new Map<string, Promise<MemoryIndexManager>>(),
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
const log = createSubsystemLogger("memory");
|
||||
|
||||
const { indexCache: INDEX_CACHE, indexCachePending: INDEX_CACHE_PENDING } =
|
||||
getMemoryIndexManagerCacheStore();
|
||||
const { cache: INDEX_CACHE, pending: INDEX_CACHE_PENDING } =
|
||||
resolveSingletonManagedCache<MemoryIndexManager>(MEMORY_INDEX_MANAGER_CACHE_KEY);
|
||||
|
||||
type MemoryReadonlyRecoveryState = {
|
||||
closed: boolean;
|
||||
@@ -174,19 +164,13 @@ export async function runMemorySyncWithReadonlyRecovery(
|
||||
}
|
||||
|
||||
export async function closeAllMemoryIndexManagers(): Promise<void> {
|
||||
const pending = Array.from(INDEX_CACHE_PENDING.values());
|
||||
if (pending.length > 0) {
|
||||
await Promise.allSettled(pending);
|
||||
}
|
||||
const managers = Array.from(INDEX_CACHE.values());
|
||||
INDEX_CACHE.clear();
|
||||
for (const manager of managers) {
|
||||
try {
|
||||
await manager.close();
|
||||
} catch (err) {
|
||||
await closeManagedCacheEntries({
|
||||
cache: INDEX_CACHE,
|
||||
pending: INDEX_CACHE_PENDING,
|
||||
onCloseError: (err) => {
|
||||
log.warn(`failed to close memory index manager: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements MemorySearchManager {
|
||||
@@ -285,48 +269,21 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
const purpose = params.purpose === "status" ? "status" : "default";
|
||||
const key = `${agentId}:${workspaceDir}:${JSON.stringify(settings)}:${purpose}`;
|
||||
const statusOnly = params.purpose === "status";
|
||||
if (statusOnly) {
|
||||
return new MemoryIndexManager({
|
||||
cacheKey: key,
|
||||
cfg,
|
||||
agentId,
|
||||
workspaceDir,
|
||||
settings,
|
||||
purpose: params.purpose,
|
||||
});
|
||||
}
|
||||
const existing = INDEX_CACHE.get(key);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const pending = INDEX_CACHE_PENDING.get(key);
|
||||
if (pending) {
|
||||
return pending;
|
||||
}
|
||||
const createPromise = (async () => {
|
||||
const refreshed = INDEX_CACHE.get(key);
|
||||
if (refreshed) {
|
||||
return refreshed;
|
||||
}
|
||||
const manager = new MemoryIndexManager({
|
||||
cacheKey: key,
|
||||
cfg,
|
||||
agentId,
|
||||
workspaceDir,
|
||||
settings,
|
||||
purpose: params.purpose,
|
||||
});
|
||||
INDEX_CACHE.set(key, manager);
|
||||
return manager;
|
||||
})();
|
||||
INDEX_CACHE_PENDING.set(key, createPromise);
|
||||
try {
|
||||
return await createPromise;
|
||||
} finally {
|
||||
if (INDEX_CACHE_PENDING.get(key) === createPromise) {
|
||||
INDEX_CACHE_PENDING.delete(key);
|
||||
}
|
||||
}
|
||||
return await getOrCreateManagedCacheEntry({
|
||||
cache: INDEX_CACHE,
|
||||
pending: INDEX_CACHE_PENDING,
|
||||
key,
|
||||
bypassCache: statusOnly,
|
||||
create: async () =>
|
||||
new MemoryIndexManager({
|
||||
cacheKey: key,
|
||||
cfg,
|
||||
agentId,
|
||||
workspaceDir,
|
||||
settings,
|
||||
purpose: params.purpose,
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
private constructor(params: {
|
||||
@@ -440,11 +397,15 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
return [];
|
||||
}
|
||||
void this.warmSession(opts?.sessionKey);
|
||||
if (this.settings.sync.onSearch && (this.dirty || this.sessionsDirty)) {
|
||||
void this.sync({ reason: "search" }).catch((err) => {
|
||||
startAsyncSearchSync({
|
||||
enabled: this.settings.sync.onSearch,
|
||||
dirty: this.dirty,
|
||||
sessionsDirty: this.sessionsDirty,
|
||||
sync: async (params) => await this.sync(params),
|
||||
onError: (err) => {
|
||||
log.warn(`memory sync failed (search): ${String(err)}`);
|
||||
});
|
||||
}
|
||||
},
|
||||
});
|
||||
const hasIndexedContent = this.hasIndexedContent();
|
||||
if (!hasIndexedContent) {
|
||||
return [];
|
||||
@@ -982,16 +943,7 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
this.sessionUnsubscribe();
|
||||
this.sessionUnsubscribe = null;
|
||||
}
|
||||
if (pendingSync) {
|
||||
try {
|
||||
await pendingSync;
|
||||
} catch {}
|
||||
}
|
||||
if (pendingProviderInit) {
|
||||
try {
|
||||
await pendingProviderInit;
|
||||
} catch {}
|
||||
}
|
||||
await awaitPendingManagerWork({ pendingSync, pendingProviderInit });
|
||||
this.db.close();
|
||||
INDEX_CACHE.delete(this.cacheKey);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user