diff --git a/src/memory/manager-embedding-ops.ts b/src/memory/manager-embedding-ops.ts new file mode 100644 index 00000000000..6606c3aea67 --- /dev/null +++ b/src/memory/manager-embedding-ops.ts @@ -0,0 +1,803 @@ +// @ts-nocheck +// oxlint-disable eslint/no-unused-vars, typescript/no-explicit-any +import fs from "node:fs/promises"; +import type { SessionFileEntry } from "./session-files.js"; +import type { MemorySource } from "./types.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; +import { runGeminiEmbeddingBatches, type GeminiBatchRequest } from "./batch-gemini.js"; +import { + OPENAI_BATCH_ENDPOINT, + type OpenAiBatchRequest, + runOpenAiEmbeddingBatches, +} from "./batch-openai.js"; +import { type VoyageBatchRequest, runVoyageEmbeddingBatches } from "./batch-voyage.js"; +import { enforceEmbeddingMaxInputTokens } from "./embedding-chunk-limits.js"; +import { estimateUtf8Bytes } from "./embedding-input-limits.js"; +import { + chunkMarkdown, + hashText, + parseEmbedding, + remapChunkLines, + type MemoryChunk, + type MemoryFileEntry, +} from "./internal.js"; + +const VECTOR_TABLE = "chunks_vec"; +const FTS_TABLE = "chunks_fts"; +const EMBEDDING_CACHE_TABLE = "embedding_cache"; +const EMBEDDING_BATCH_MAX_TOKENS = 8000; +const EMBEDDING_INDEX_CONCURRENCY = 4; +const EMBEDDING_RETRY_MAX_ATTEMPTS = 3; +const EMBEDDING_RETRY_BASE_DELAY_MS = 500; +const EMBEDDING_RETRY_MAX_DELAY_MS = 8000; +const BATCH_FAILURE_LIMIT = 2; +const EMBEDDING_QUERY_TIMEOUT_REMOTE_MS = 60_000; +const EMBEDDING_QUERY_TIMEOUT_LOCAL_MS = 5 * 60_000; +const EMBEDDING_BATCH_TIMEOUT_REMOTE_MS = 2 * 60_000; +const EMBEDDING_BATCH_TIMEOUT_LOCAL_MS = 10 * 60_000; + +const vectorToBlob = (embedding: number[]): Buffer => + Buffer.from(new Float32Array(embedding).buffer); + +const log = createSubsystemLogger("memory"); + +class MemoryManagerEmbeddingOps { + [key: string]: any; + private buildEmbeddingBatches(chunks: MemoryChunk[]): MemoryChunk[][] { + const batches: MemoryChunk[][] = []; + let current: MemoryChunk[] = []; + let currentTokens = 0; + + for (const chunk of chunks) { + const estimate = estimateUtf8Bytes(chunk.text); + const wouldExceed = + current.length > 0 && currentTokens + estimate > EMBEDDING_BATCH_MAX_TOKENS; + if (wouldExceed) { + batches.push(current); + current = []; + currentTokens = 0; + } + if (current.length === 0 && estimate > EMBEDDING_BATCH_MAX_TOKENS) { + batches.push([chunk]); + continue; + } + current.push(chunk); + currentTokens += estimate; + } + + if (current.length > 0) { + batches.push(current); + } + return batches; + } + + private loadEmbeddingCache(hashes: string[]): Map { + if (!this.cache.enabled) { + return new Map(); + } + if (hashes.length === 0) { + return new Map(); + } + const unique: string[] = []; + const seen = new Set(); + for (const hash of hashes) { + if (!hash) { + continue; + } + if (seen.has(hash)) { + continue; + } + seen.add(hash); + unique.push(hash); + } + if (unique.length === 0) { + return new Map(); + } + + const out = new Map(); + const baseParams = [this.provider.id, this.provider.model, this.providerKey]; + const batchSize = 400; + for (let start = 0; start < unique.length; start += batchSize) { + const batch = unique.slice(start, start + batchSize); + const placeholders = batch.map(() => "?").join(", "); + const rows = this.db + .prepare( + `SELECT hash, embedding FROM ${EMBEDDING_CACHE_TABLE}\n` + + ` WHERE provider = ? AND model = ? AND provider_key = ? AND hash IN (${placeholders})`, + ) + .all(...baseParams, ...batch) as Array<{ hash: string; embedding: string }>; + for (const row of rows) { + out.set(row.hash, parseEmbedding(row.embedding)); + } + } + return out; + } + + private upsertEmbeddingCache(entries: Array<{ hash: string; embedding: number[] }>): void { + if (!this.cache.enabled) { + return; + } + if (entries.length === 0) { + return; + } + const now = Date.now(); + const stmt = this.db.prepare( + `INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)\n` + + ` VALUES (?, ?, ?, ?, ?, ?, ?)\n` + + ` ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET\n` + + ` embedding=excluded.embedding,\n` + + ` dims=excluded.dims,\n` + + ` updated_at=excluded.updated_at`, + ); + for (const entry of entries) { + const embedding = entry.embedding ?? []; + stmt.run( + this.provider.id, + this.provider.model, + this.providerKey, + entry.hash, + JSON.stringify(embedding), + embedding.length, + now, + ); + } + } + + private pruneEmbeddingCacheIfNeeded(): void { + if (!this.cache.enabled) { + return; + } + const max = this.cache.maxEntries; + if (!max || max <= 0) { + return; + } + const row = this.db.prepare(`SELECT COUNT(*) as c FROM ${EMBEDDING_CACHE_TABLE}`).get() as + | { c: number } + | undefined; + const count = row?.c ?? 0; + if (count <= max) { + return; + } + const excess = count - max; + this.db + .prepare( + `DELETE FROM ${EMBEDDING_CACHE_TABLE}\n` + + ` WHERE rowid IN (\n` + + ` SELECT rowid FROM ${EMBEDDING_CACHE_TABLE}\n` + + ` ORDER BY updated_at ASC\n` + + ` LIMIT ?\n` + + ` )`, + ) + .run(excess); + } + + private async embedChunksInBatches(chunks: MemoryChunk[]): Promise { + if (chunks.length === 0) { + return []; + } + const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash)); + const embeddings: number[][] = Array.from({ length: chunks.length }, () => []); + const missing: Array<{ index: number; chunk: MemoryChunk }> = []; + + for (let i = 0; i < chunks.length; i += 1) { + const chunk = chunks[i]; + const hit = chunk?.hash ? cached.get(chunk.hash) : undefined; + if (hit && hit.length > 0) { + embeddings[i] = hit; + } else if (chunk) { + missing.push({ index: i, chunk }); + } + } + + if (missing.length === 0) { + return embeddings; + } + + const missingChunks = missing.map((m) => m.chunk); + const batches = this.buildEmbeddingBatches(missingChunks); + const toCache: Array<{ hash: string; embedding: number[] }> = []; + let cursor = 0; + for (const batch of batches) { + const batchEmbeddings = await this.embedBatchWithRetry(batch.map((chunk) => chunk.text)); + for (let i = 0; i < batch.length; i += 1) { + const item = missing[cursor + i]; + const embedding = batchEmbeddings[i] ?? []; + if (item) { + embeddings[item.index] = embedding; + toCache.push({ hash: item.chunk.hash, embedding }); + } + } + cursor += batch.length; + } + this.upsertEmbeddingCache(toCache); + return embeddings; + } + + private computeProviderKey(): string { + if (this.provider.id === "openai" && this.openAi) { + const entries = Object.entries(this.openAi.headers) + .filter(([key]) => key.toLowerCase() !== "authorization") + .toSorted(([a], [b]) => a.localeCompare(b)) + .map(([key, value]) => [key, value]); + return hashText( + JSON.stringify({ + provider: "openai", + baseUrl: this.openAi.baseUrl, + model: this.openAi.model, + headers: entries, + }), + ); + } + if (this.provider.id === "gemini" && this.gemini) { + const entries = Object.entries(this.gemini.headers) + .filter(([key]) => { + const lower = key.toLowerCase(); + return lower !== "authorization" && lower !== "x-goog-api-key"; + }) + .toSorted(([a], [b]) => a.localeCompare(b)) + .map(([key, value]) => [key, value]); + return hashText( + JSON.stringify({ + provider: "gemini", + baseUrl: this.gemini.baseUrl, + model: this.gemini.model, + headers: entries, + }), + ); + } + return hashText(JSON.stringify({ provider: this.provider.id, model: this.provider.model })); + } + + private async embedChunksWithBatch( + chunks: MemoryChunk[], + entry: MemoryFileEntry | SessionFileEntry, + source: MemorySource, + ): Promise { + if (this.provider.id === "openai" && this.openAi) { + return this.embedChunksWithOpenAiBatch(chunks, entry, source); + } + if (this.provider.id === "gemini" && this.gemini) { + return this.embedChunksWithGeminiBatch(chunks, entry, source); + } + if (this.provider.id === "voyage" && this.voyage) { + return this.embedChunksWithVoyageBatch(chunks, entry, source); + } + return this.embedChunksInBatches(chunks); + } + + private async embedChunksWithVoyageBatch( + chunks: MemoryChunk[], + entry: MemoryFileEntry | SessionFileEntry, + source: MemorySource, + ): Promise { + const voyage = this.voyage; + if (!voyage) { + return this.embedChunksInBatches(chunks); + } + if (chunks.length === 0) { + return []; + } + const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash)); + const embeddings: number[][] = Array.from({ length: chunks.length }, () => []); + const missing: Array<{ index: number; chunk: MemoryChunk }> = []; + + for (let i = 0; i < chunks.length; i += 1) { + const chunk = chunks[i]; + const hit = chunk?.hash ? cached.get(chunk.hash) : undefined; + if (hit && hit.length > 0) { + embeddings[i] = hit; + } else if (chunk) { + missing.push({ index: i, chunk }); + } + } + + if (missing.length === 0) { + return embeddings; + } + + const requests: VoyageBatchRequest[] = []; + const mapping = new Map(); + for (const item of missing) { + const chunk = item.chunk; + const customId = hashText( + `${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`, + ); + mapping.set(customId, { index: item.index, hash: chunk.hash }); + requests.push({ + custom_id: customId, + body: { + input: chunk.text, + }, + }); + } + const batchResult = await this.runBatchWithFallback({ + provider: "voyage", + run: async () => + await runVoyageEmbeddingBatches({ + client: voyage, + agentId: this.agentId, + requests, + wait: this.batch.wait, + concurrency: this.batch.concurrency, + pollIntervalMs: this.batch.pollIntervalMs, + timeoutMs: this.batch.timeoutMs, + debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }), + }), + fallback: async () => await this.embedChunksInBatches(chunks), + }); + if (Array.isArray(batchResult)) { + return batchResult; + } + const byCustomId = batchResult; + + const toCache: Array<{ hash: string; embedding: number[] }> = []; + for (const [customId, embedding] of byCustomId.entries()) { + const mapped = mapping.get(customId); + if (!mapped) { + continue; + } + embeddings[mapped.index] = embedding; + toCache.push({ hash: mapped.hash, embedding }); + } + this.upsertEmbeddingCache(toCache); + return embeddings; + } + + private async embedChunksWithOpenAiBatch( + chunks: MemoryChunk[], + entry: MemoryFileEntry | SessionFileEntry, + source: MemorySource, + ): Promise { + const openAi = this.openAi; + if (!openAi) { + return this.embedChunksInBatches(chunks); + } + if (chunks.length === 0) { + return []; + } + const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash)); + const embeddings: number[][] = Array.from({ length: chunks.length }, () => []); + const missing: Array<{ index: number; chunk: MemoryChunk }> = []; + + for (let i = 0; i < chunks.length; i += 1) { + const chunk = chunks[i]; + const hit = chunk?.hash ? cached.get(chunk.hash) : undefined; + if (hit && hit.length > 0) { + embeddings[i] = hit; + } else if (chunk) { + missing.push({ index: i, chunk }); + } + } + + if (missing.length === 0) { + return embeddings; + } + + const requests: OpenAiBatchRequest[] = []; + const mapping = new Map(); + for (const item of missing) { + const chunk = item.chunk; + const customId = hashText( + `${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`, + ); + mapping.set(customId, { index: item.index, hash: chunk.hash }); + requests.push({ + custom_id: customId, + method: "POST", + url: OPENAI_BATCH_ENDPOINT, + body: { + model: this.openAi?.model ?? this.provider.model, + input: chunk.text, + }, + }); + } + const batchResult = await this.runBatchWithFallback({ + provider: "openai", + run: async () => + await runOpenAiEmbeddingBatches({ + openAi, + agentId: this.agentId, + requests, + wait: this.batch.wait, + concurrency: this.batch.concurrency, + pollIntervalMs: this.batch.pollIntervalMs, + timeoutMs: this.batch.timeoutMs, + debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }), + }), + fallback: async () => await this.embedChunksInBatches(chunks), + }); + if (Array.isArray(batchResult)) { + return batchResult; + } + const byCustomId = batchResult; + + const toCache: Array<{ hash: string; embedding: number[] }> = []; + for (const [customId, embedding] of byCustomId.entries()) { + const mapped = mapping.get(customId); + if (!mapped) { + continue; + } + embeddings[mapped.index] = embedding; + toCache.push({ hash: mapped.hash, embedding }); + } + this.upsertEmbeddingCache(toCache); + return embeddings; + } + + private async embedChunksWithGeminiBatch( + chunks: MemoryChunk[], + entry: MemoryFileEntry | SessionFileEntry, + source: MemorySource, + ): Promise { + const gemini = this.gemini; + if (!gemini) { + return this.embedChunksInBatches(chunks); + } + if (chunks.length === 0) { + return []; + } + const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash)); + const embeddings: number[][] = Array.from({ length: chunks.length }, () => []); + const missing: Array<{ index: number; chunk: MemoryChunk }> = []; + + for (let i = 0; i < chunks.length; i += 1) { + const chunk = chunks[i]; + const hit = chunk?.hash ? cached.get(chunk.hash) : undefined; + if (hit && hit.length > 0) { + embeddings[i] = hit; + } else if (chunk) { + missing.push({ index: i, chunk }); + } + } + + if (missing.length === 0) { + return embeddings; + } + + const requests: GeminiBatchRequest[] = []; + const mapping = new Map(); + for (const item of missing) { + const chunk = item.chunk; + const customId = hashText( + `${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`, + ); + mapping.set(customId, { index: item.index, hash: chunk.hash }); + requests.push({ + custom_id: customId, + content: { parts: [{ text: chunk.text }] }, + taskType: "RETRIEVAL_DOCUMENT", + }); + } + + const batchResult = await this.runBatchWithFallback({ + provider: "gemini", + run: async () => + await runGeminiEmbeddingBatches({ + gemini, + agentId: this.agentId, + requests, + wait: this.batch.wait, + concurrency: this.batch.concurrency, + pollIntervalMs: this.batch.pollIntervalMs, + timeoutMs: this.batch.timeoutMs, + debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }), + }), + fallback: async () => await this.embedChunksInBatches(chunks), + }); + if (Array.isArray(batchResult)) { + return batchResult; + } + const byCustomId = batchResult; + + const toCache: Array<{ hash: string; embedding: number[] }> = []; + for (const [customId, embedding] of byCustomId.entries()) { + const mapped = mapping.get(customId); + if (!mapped) { + continue; + } + embeddings[mapped.index] = embedding; + toCache.push({ hash: mapped.hash, embedding }); + } + this.upsertEmbeddingCache(toCache); + return embeddings; + } + + private async embedBatchWithRetry(texts: string[]): Promise { + if (texts.length === 0) { + return []; + } + let attempt = 0; + let delayMs = EMBEDDING_RETRY_BASE_DELAY_MS; + while (true) { + try { + const timeoutMs = this.resolveEmbeddingTimeout("batch"); + log.debug("memory embeddings: batch start", { + provider: this.provider.id, + items: texts.length, + timeoutMs, + }); + return await this.withTimeout( + this.provider.embedBatch(texts), + timeoutMs, + `memory embeddings batch timed out after ${Math.round(timeoutMs / 1000)}s`, + ); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + if (!this.isRetryableEmbeddingError(message) || attempt >= EMBEDDING_RETRY_MAX_ATTEMPTS) { + throw err; + } + const waitMs = Math.min( + EMBEDDING_RETRY_MAX_DELAY_MS, + Math.round(delayMs * (1 + Math.random() * 0.2)), + ); + log.warn(`memory embeddings rate limited; retrying in ${waitMs}ms`); + await new Promise((resolve) => setTimeout(resolve, waitMs)); + delayMs *= 2; + attempt += 1; + } + } + } + + private isRetryableEmbeddingError(message: string): boolean { + return /(rate[_ ]limit|too many requests|429|resource has been exhausted|5\d\d|cloudflare)/i.test( + message, + ); + } + + private resolveEmbeddingTimeout(kind: "query" | "batch"): number { + const isLocal = this.provider.id === "local"; + if (kind === "query") { + return isLocal ? EMBEDDING_QUERY_TIMEOUT_LOCAL_MS : EMBEDDING_QUERY_TIMEOUT_REMOTE_MS; + } + return isLocal ? EMBEDDING_BATCH_TIMEOUT_LOCAL_MS : EMBEDDING_BATCH_TIMEOUT_REMOTE_MS; + } + + private async embedQueryWithTimeout(text: string): Promise { + const timeoutMs = this.resolveEmbeddingTimeout("query"); + log.debug("memory embeddings: query start", { provider: this.provider.id, timeoutMs }); + return await this.withTimeout( + this.provider.embedQuery(text), + timeoutMs, + `memory embeddings query timed out after ${Math.round(timeoutMs / 1000)}s`, + ); + } + + private async withTimeout( + promise: Promise, + timeoutMs: number, + message: string, + ): Promise { + if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) { + return await promise; + } + let timer: NodeJS.Timeout | null = null; + const timeoutPromise = new Promise((_, reject) => { + timer = setTimeout(() => reject(new Error(message)), timeoutMs); + }); + try { + return (await Promise.race([promise, timeoutPromise])) as T; + } finally { + if (timer) { + clearTimeout(timer); + } + } + } + + private async withBatchFailureLock(fn: () => Promise): Promise { + let release: () => void; + const wait = this.batchFailureLock; + this.batchFailureLock = new Promise((resolve) => { + release = resolve; + }); + await wait; + try { + return await fn(); + } finally { + release!(); + } + } + + private async resetBatchFailureCount(): Promise { + await this.withBatchFailureLock(async () => { + if (this.batchFailureCount > 0) { + log.debug("memory embeddings: batch recovered; resetting failure count"); + } + this.batchFailureCount = 0; + this.batchFailureLastError = undefined; + this.batchFailureLastProvider = undefined; + }); + } + + private async recordBatchFailure(params: { + provider: string; + message: string; + attempts?: number; + forceDisable?: boolean; + }): Promise<{ disabled: boolean; count: number }> { + return await this.withBatchFailureLock(async () => { + if (!this.batch.enabled) { + return { disabled: true, count: this.batchFailureCount }; + } + const increment = params.forceDisable + ? BATCH_FAILURE_LIMIT + : Math.max(1, params.attempts ?? 1); + this.batchFailureCount += increment; + this.batchFailureLastError = params.message; + this.batchFailureLastProvider = params.provider; + const disabled = params.forceDisable || this.batchFailureCount >= BATCH_FAILURE_LIMIT; + if (disabled) { + this.batch.enabled = false; + } + return { disabled, count: this.batchFailureCount }; + }); + } + + private isBatchTimeoutError(message: string): boolean { + return /timed out|timeout/i.test(message); + } + + private async runBatchWithTimeoutRetry(params: { + provider: string; + run: () => Promise; + }): Promise { + try { + return await params.run(); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + if (this.isBatchTimeoutError(message)) { + log.warn(`memory embeddings: ${params.provider} batch timed out; retrying once`); + try { + return await params.run(); + } catch (retryErr) { + (retryErr as { batchAttempts?: number }).batchAttempts = 2; + throw retryErr; + } + } + throw err; + } + } + + private async runBatchWithFallback(params: { + provider: string; + run: () => Promise; + fallback: () => Promise; + }): Promise { + if (!this.batch.enabled) { + return await params.fallback(); + } + try { + const result = await this.runBatchWithTimeoutRetry({ + provider: params.provider, + run: params.run, + }); + await this.resetBatchFailureCount(); + return result; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + const attempts = (err as { batchAttempts?: number }).batchAttempts ?? 1; + const forceDisable = /asyncBatchEmbedContent not available/i.test(message); + const failure = await this.recordBatchFailure({ + provider: params.provider, + message, + attempts, + forceDisable, + }); + const suffix = failure.disabled ? "disabling batch" : "keeping batch enabled"; + log.warn( + `memory embeddings: ${params.provider} batch failed (${failure.count}/${BATCH_FAILURE_LIMIT}); ${suffix}; falling back to non-batch embeddings: ${message}`, + ); + return await params.fallback(); + } + } + + private getIndexConcurrency(): number { + return this.batch.enabled ? this.batch.concurrency : EMBEDDING_INDEX_CONCURRENCY; + } + + private async indexFile( + entry: MemoryFileEntry | SessionFileEntry, + options: { source: MemorySource; content?: string }, + ) { + const content = options.content ?? (await fs.readFile(entry.absPath, "utf-8")); + const chunks = enforceEmbeddingMaxInputTokens( + this.provider, + chunkMarkdown(content, this.settings.chunking).filter( + (chunk) => chunk.text.trim().length > 0, + ), + ); + if (options.source === "sessions" && "lineMap" in entry) { + remapChunkLines(chunks, entry.lineMap); + } + const embeddings = this.batch.enabled + ? await this.embedChunksWithBatch(chunks, entry, options.source) + : await this.embedChunksInBatches(chunks); + const sample = embeddings.find((embedding) => embedding.length > 0); + const vectorReady = sample ? await this.ensureVectorReady(sample.length) : false; + const now = Date.now(); + if (vectorReady) { + try { + this.db + .prepare( + `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, + ) + .run(entry.path, options.source); + } catch {} + } + if (this.fts.enabled && this.fts.available) { + try { + this.db + .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`) + .run(entry.path, options.source, this.provider.model); + } catch {} + } + this.db + .prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`) + .run(entry.path, options.source); + for (let i = 0; i < chunks.length; i++) { + const chunk = chunks[i]; + const embedding = embeddings[i] ?? []; + const id = hashText( + `${options.source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${this.provider.model}`, + ); + this.db + .prepare( + `INSERT INTO chunks (id, path, source, start_line, end_line, hash, model, text, embedding, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + hash=excluded.hash, + model=excluded.model, + text=excluded.text, + embedding=excluded.embedding, + updated_at=excluded.updated_at`, + ) + .run( + id, + entry.path, + options.source, + chunk.startLine, + chunk.endLine, + chunk.hash, + this.provider.model, + chunk.text, + JSON.stringify(embedding), + now, + ); + if (vectorReady && embedding.length > 0) { + try { + this.db.prepare(`DELETE FROM ${VECTOR_TABLE} WHERE id = ?`).run(id); + } catch {} + this.db + .prepare(`INSERT INTO ${VECTOR_TABLE} (id, embedding) VALUES (?, ?)`) + .run(id, vectorToBlob(embedding)); + } + if (this.fts.enabled && this.fts.available) { + this.db + .prepare( + `INSERT INTO ${FTS_TABLE} (text, id, path, source, model, start_line, end_line)\n` + + ` VALUES (?, ?, ?, ?, ?, ?, ?)`, + ) + .run( + chunk.text, + id, + entry.path, + options.source, + this.provider.model, + chunk.startLine, + chunk.endLine, + ); + } + } + this.db + .prepare( + `INSERT INTO files (path, source, hash, mtime, size) VALUES (?, ?, ?, ?, ?) + ON CONFLICT(path) DO UPDATE SET + source=excluded.source, + hash=excluded.hash, + mtime=excluded.mtime, + size=excluded.size`, + ) + .run(entry.path, options.source, entry.hash, entry.mtimeMs, entry.size); + } +} + +export const memoryManagerEmbeddingOps = MemoryManagerEmbeddingOps.prototype; diff --git a/src/memory/manager-sync-ops.ts b/src/memory/manager-sync-ops.ts new file mode 100644 index 00000000000..fe553ef40fe --- /dev/null +++ b/src/memory/manager-sync-ops.ts @@ -0,0 +1,998 @@ +// @ts-nocheck +// oxlint-disable eslint/no-unused-vars, typescript/no-explicit-any +import type { DatabaseSync } from "node:sqlite"; +import chokidar, { type FSWatcher } from "chokidar"; +import { randomUUID } from "node:crypto"; +import fsSync from "node:fs"; +import fs from "node:fs/promises"; +import path from "node:path"; +import type { MemorySource, MemorySyncProgressUpdate } from "./types.js"; +import { resolveSessionTranscriptsDirForAgent } from "../config/sessions/paths.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; +import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; +import { resolveUserPath } from "../utils.js"; +import { + buildFileEntry, + ensureDir, + isMemoryPath, + listMemoryFiles, + normalizeExtraMemoryPaths, + parseEmbedding, + remapChunkLines, + runWithConcurrency, + type MemoryFileEntry, +} from "./internal.js"; +import { ensureMemoryIndexSchema } from "./memory-schema.js"; +import { + buildSessionEntry, + listSessionFilesForAgent, + sessionPathForFile, + type SessionFileEntry, +} from "./session-files.js"; +import { loadSqliteVecExtension } from "./sqlite-vec.js"; +import { requireNodeSqlite } from "./sqlite.js"; + +type MemoryIndexMeta = { + model: string; + provider: string; + providerKey?: string; + chunkTokens: number; + chunkOverlap: number; + vectorDims?: number; +}; + +type MemorySyncProgressState = { + completed: number; + total: number; + label?: string; + report: (update: MemorySyncProgressUpdate) => void; +}; + +const META_KEY = "memory_index_meta_v1"; +const VECTOR_TABLE = "chunks_vec"; +const FTS_TABLE = "chunks_fts"; +const EMBEDDING_CACHE_TABLE = "embedding_cache"; +const SESSION_DIRTY_DEBOUNCE_MS = 5000; +const SESSION_DELTA_READ_CHUNK_BYTES = 64 * 1024; +const VECTOR_LOAD_TIMEOUT_MS = 30_000; + +const log = createSubsystemLogger("memory"); + +class MemoryManagerSyncOps { + [key: string]: any; + private async ensureVectorReady(dimensions?: number): Promise { + if (!this.vector.enabled) { + return false; + } + if (!this.vectorReady) { + this.vectorReady = this.withTimeout( + this.loadVectorExtension(), + VECTOR_LOAD_TIMEOUT_MS, + `sqlite-vec load timed out after ${Math.round(VECTOR_LOAD_TIMEOUT_MS / 1000)}s`, + ); + } + let ready = false; + try { + ready = await this.vectorReady; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + this.vector.available = false; + this.vector.loadError = message; + this.vectorReady = null; + log.warn(`sqlite-vec unavailable: ${message}`); + return false; + } + if (ready && typeof dimensions === "number" && dimensions > 0) { + this.ensureVectorTable(dimensions); + } + return ready; + } + + private async loadVectorExtension(): Promise { + if (this.vector.available !== null) { + return this.vector.available; + } + if (!this.vector.enabled) { + this.vector.available = false; + return false; + } + try { + const resolvedPath = this.vector.extensionPath?.trim() + ? resolveUserPath(this.vector.extensionPath) + : undefined; + const loaded = await loadSqliteVecExtension({ db: this.db, extensionPath: resolvedPath }); + if (!loaded.ok) { + throw new Error(loaded.error ?? "unknown sqlite-vec load error"); + } + this.vector.extensionPath = loaded.extensionPath; + this.vector.available = true; + return true; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + this.vector.available = false; + this.vector.loadError = message; + log.warn(`sqlite-vec unavailable: ${message}`); + return false; + } + } + + private ensureVectorTable(dimensions: number): void { + if (this.vector.dims === dimensions) { + return; + } + if (this.vector.dims && this.vector.dims !== dimensions) { + this.dropVectorTable(); + } + this.db.exec( + `CREATE VIRTUAL TABLE IF NOT EXISTS ${VECTOR_TABLE} USING vec0(\n` + + ` id TEXT PRIMARY KEY,\n` + + ` embedding FLOAT[${dimensions}]\n` + + `)`, + ); + this.vector.dims = dimensions; + } + + private dropVectorTable(): void { + try { + this.db.exec(`DROP TABLE IF EXISTS ${VECTOR_TABLE}`); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + log.debug(`Failed to drop ${VECTOR_TABLE}: ${message}`); + } + } + + private buildSourceFilter(alias?: string): { sql: string; params: MemorySource[] } { + const sources = Array.from(this.sources); + if (sources.length === 0) { + return { sql: "", params: [] }; + } + const column = alias ? `${alias}.source` : "source"; + const placeholders = sources.map(() => "?").join(", "); + return { sql: ` AND ${column} IN (${placeholders})`, params: sources }; + } + + private openDatabase(): DatabaseSync { + const dbPath = resolveUserPath(this.settings.store.path); + return this.openDatabaseAtPath(dbPath); + } + + private openDatabaseAtPath(dbPath: string): DatabaseSync { + const dir = path.dirname(dbPath); + ensureDir(dir); + const { DatabaseSync } = requireNodeSqlite(); + return new DatabaseSync(dbPath, { allowExtension: this.settings.store.vector.enabled }); + } + + private seedEmbeddingCache(sourceDb: DatabaseSync): void { + if (!this.cache.enabled) { + return; + } + try { + const rows = sourceDb + .prepare( + `SELECT provider, model, provider_key, hash, embedding, dims, updated_at FROM ${EMBEDDING_CACHE_TABLE}`, + ) + .all() as Array<{ + provider: string; + model: string; + provider_key: string; + hash: string; + embedding: string; + dims: number | null; + updated_at: number; + }>; + if (!rows.length) { + return; + } + const insert = this.db.prepare( + `INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET + embedding=excluded.embedding, + dims=excluded.dims, + updated_at=excluded.updated_at`, + ); + this.db.exec("BEGIN"); + for (const row of rows) { + insert.run( + row.provider, + row.model, + row.provider_key, + row.hash, + row.embedding, + row.dims, + row.updated_at, + ); + } + this.db.exec("COMMIT"); + } catch (err) { + try { + this.db.exec("ROLLBACK"); + } catch {} + throw err; + } + } + + private async swapIndexFiles(targetPath: string, tempPath: string): Promise { + const backupPath = `${targetPath}.backup-${randomUUID()}`; + await this.moveIndexFiles(targetPath, backupPath); + try { + await this.moveIndexFiles(tempPath, targetPath); + } catch (err) { + await this.moveIndexFiles(backupPath, targetPath); + throw err; + } + await this.removeIndexFiles(backupPath); + } + + private async moveIndexFiles(sourceBase: string, targetBase: string): Promise { + const suffixes = ["", "-wal", "-shm"]; + for (const suffix of suffixes) { + const source = `${sourceBase}${suffix}`; + const target = `${targetBase}${suffix}`; + try { + await fs.rename(source, target); + } catch (err) { + if ((err as NodeJS.ErrnoException).code !== "ENOENT") { + throw err; + } + } + } + } + + private async removeIndexFiles(basePath: string): Promise { + const suffixes = ["", "-wal", "-shm"]; + await Promise.all(suffixes.map((suffix) => fs.rm(`${basePath}${suffix}`, { force: true }))); + } + + private ensureSchema() { + const result = ensureMemoryIndexSchema({ + db: this.db, + embeddingCacheTable: EMBEDDING_CACHE_TABLE, + ftsTable: FTS_TABLE, + ftsEnabled: this.fts.enabled, + }); + this.fts.available = result.ftsAvailable; + if (result.ftsError) { + this.fts.loadError = result.ftsError; + log.warn(`fts unavailable: ${result.ftsError}`); + } + } + + private ensureWatcher() { + if (!this.sources.has("memory") || !this.settings.sync.watch || this.watcher) { + return; + } + const additionalPaths = normalizeExtraMemoryPaths(this.workspaceDir, this.settings.extraPaths) + .map((entry) => { + try { + const stat = fsSync.lstatSync(entry); + return stat.isSymbolicLink() ? null : entry; + } catch { + return null; + } + }) + .filter((entry): entry is string => Boolean(entry)); + const watchPaths = new Set([ + path.join(this.workspaceDir, "MEMORY.md"), + path.join(this.workspaceDir, "memory.md"), + path.join(this.workspaceDir, "memory"), + ...additionalPaths, + ]); + this.watcher = chokidar.watch(Array.from(watchPaths), { + ignoreInitial: true, + awaitWriteFinish: { + stabilityThreshold: this.settings.sync.watchDebounceMs, + pollInterval: 100, + }, + }); + const markDirty = () => { + this.dirty = true; + this.scheduleWatchSync(); + }; + this.watcher.on("add", markDirty); + this.watcher.on("change", markDirty); + this.watcher.on("unlink", markDirty); + } + + private ensureSessionListener() { + if (!this.sources.has("sessions") || this.sessionUnsubscribe) { + return; + } + this.sessionUnsubscribe = onSessionTranscriptUpdate((update) => { + if (this.closed) { + return; + } + const sessionFile = update.sessionFile; + if (!this.isSessionFileForAgent(sessionFile)) { + return; + } + this.scheduleSessionDirty(sessionFile); + }); + } + + private scheduleSessionDirty(sessionFile: string) { + this.sessionPendingFiles.add(sessionFile); + if (this.sessionWatchTimer) { + return; + } + this.sessionWatchTimer = setTimeout(() => { + this.sessionWatchTimer = null; + void this.processSessionDeltaBatch().catch((err) => { + log.warn(`memory session delta failed: ${String(err)}`); + }); + }, SESSION_DIRTY_DEBOUNCE_MS); + } + + private async processSessionDeltaBatch(): Promise { + if (this.sessionPendingFiles.size === 0) { + return; + } + const pending = Array.from(this.sessionPendingFiles); + this.sessionPendingFiles.clear(); + let shouldSync = false; + for (const sessionFile of pending) { + const delta = await this.updateSessionDelta(sessionFile); + if (!delta) { + continue; + } + const bytesThreshold = delta.deltaBytes; + const messagesThreshold = delta.deltaMessages; + const bytesHit = + bytesThreshold <= 0 ? delta.pendingBytes > 0 : delta.pendingBytes >= bytesThreshold; + const messagesHit = + messagesThreshold <= 0 + ? delta.pendingMessages > 0 + : delta.pendingMessages >= messagesThreshold; + if (!bytesHit && !messagesHit) { + continue; + } + this.sessionsDirtyFiles.add(sessionFile); + this.sessionsDirty = true; + delta.pendingBytes = + bytesThreshold > 0 ? Math.max(0, delta.pendingBytes - bytesThreshold) : 0; + delta.pendingMessages = + messagesThreshold > 0 ? Math.max(0, delta.pendingMessages - messagesThreshold) : 0; + shouldSync = true; + } + if (shouldSync) { + void this.sync({ reason: "session-delta" }).catch((err) => { + log.warn(`memory sync failed (session-delta): ${String(err)}`); + }); + } + } + + private async updateSessionDelta(sessionFile: string): Promise<{ + deltaBytes: number; + deltaMessages: number; + pendingBytes: number; + pendingMessages: number; + } | null> { + const thresholds = this.settings.sync.sessions; + if (!thresholds) { + return null; + } + let stat: { size: number }; + try { + stat = await fs.stat(sessionFile); + } catch { + return null; + } + const size = stat.size; + let state = this.sessionDeltas.get(sessionFile); + if (!state) { + state = { lastSize: 0, pendingBytes: 0, pendingMessages: 0 }; + this.sessionDeltas.set(sessionFile, state); + } + const deltaBytes = Math.max(0, size - state.lastSize); + if (deltaBytes === 0 && size === state.lastSize) { + return { + deltaBytes: thresholds.deltaBytes, + deltaMessages: thresholds.deltaMessages, + pendingBytes: state.pendingBytes, + pendingMessages: state.pendingMessages, + }; + } + if (size < state.lastSize) { + state.lastSize = size; + state.pendingBytes += size; + const shouldCountMessages = + thresholds.deltaMessages > 0 && + (thresholds.deltaBytes <= 0 || state.pendingBytes < thresholds.deltaBytes); + if (shouldCountMessages) { + state.pendingMessages += await this.countNewlines(sessionFile, 0, size); + } + } else { + state.pendingBytes += deltaBytes; + const shouldCountMessages = + thresholds.deltaMessages > 0 && + (thresholds.deltaBytes <= 0 || state.pendingBytes < thresholds.deltaBytes); + if (shouldCountMessages) { + state.pendingMessages += await this.countNewlines(sessionFile, state.lastSize, size); + } + state.lastSize = size; + } + this.sessionDeltas.set(sessionFile, state); + return { + deltaBytes: thresholds.deltaBytes, + deltaMessages: thresholds.deltaMessages, + pendingBytes: state.pendingBytes, + pendingMessages: state.pendingMessages, + }; + } + + private async countNewlines(absPath: string, start: number, end: number): Promise { + if (end <= start) { + return 0; + } + const handle = await fs.open(absPath, "r"); + try { + let offset = start; + let count = 0; + const buffer = Buffer.alloc(SESSION_DELTA_READ_CHUNK_BYTES); + while (offset < end) { + const toRead = Math.min(buffer.length, end - offset); + const { bytesRead } = await handle.read(buffer, 0, toRead, offset); + if (bytesRead <= 0) { + break; + } + for (let i = 0; i < bytesRead; i += 1) { + if (buffer[i] === 10) { + count += 1; + } + } + offset += bytesRead; + } + return count; + } finally { + await handle.close(); + } + } + + private resetSessionDelta(absPath: string, size: number): void { + const state = this.sessionDeltas.get(absPath); + if (!state) { + return; + } + state.lastSize = size; + state.pendingBytes = 0; + state.pendingMessages = 0; + } + + private isSessionFileForAgent(sessionFile: string): boolean { + if (!sessionFile) { + return false; + } + const sessionsDir = resolveSessionTranscriptsDirForAgent(this.agentId); + const resolvedFile = path.resolve(sessionFile); + const resolvedDir = path.resolve(sessionsDir); + return resolvedFile.startsWith(`${resolvedDir}${path.sep}`); + } + + private ensureIntervalSync() { + const minutes = this.settings.sync.intervalMinutes; + if (!minutes || minutes <= 0 || this.intervalTimer) { + return; + } + const ms = minutes * 60 * 1000; + this.intervalTimer = setInterval(() => { + void this.sync({ reason: "interval" }).catch((err) => { + log.warn(`memory sync failed (interval): ${String(err)}`); + }); + }, ms); + } + + private scheduleWatchSync() { + if (!this.sources.has("memory") || !this.settings.sync.watch) { + return; + } + if (this.watchTimer) { + clearTimeout(this.watchTimer); + } + this.watchTimer = setTimeout(() => { + this.watchTimer = null; + void this.sync({ reason: "watch" }).catch((err) => { + log.warn(`memory sync failed (watch): ${String(err)}`); + }); + }, this.settings.sync.watchDebounceMs); + } + + private shouldSyncSessions( + params?: { reason?: string; force?: boolean }, + needsFullReindex = false, + ) { + if (!this.sources.has("sessions")) { + return false; + } + if (params?.force) { + return true; + } + const reason = params?.reason; + if (reason === "session-start" || reason === "watch") { + return false; + } + if (needsFullReindex) { + return true; + } + return this.sessionsDirty && this.sessionsDirtyFiles.size > 0; + } + + private async syncMemoryFiles(params: { + needsFullReindex: boolean; + progress?: MemorySyncProgressState; + }) { + const files = await listMemoryFiles(this.workspaceDir, this.settings.extraPaths); + const fileEntries = await Promise.all( + files.map(async (file) => buildFileEntry(file, this.workspaceDir)), + ); + log.debug("memory sync: indexing memory files", { + files: fileEntries.length, + needsFullReindex: params.needsFullReindex, + batch: this.batch.enabled, + concurrency: this.getIndexConcurrency(), + }); + const activePaths = new Set(fileEntries.map((entry) => entry.path)); + if (params.progress) { + params.progress.total += fileEntries.length; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + label: this.batch.enabled ? "Indexing memory files (batch)..." : "Indexing memory files…", + }); + } + + const tasks = fileEntries.map((entry) => async () => { + const record = this.db + .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) + .get(entry.path, "memory") as { hash: string } | undefined; + if (!params.needsFullReindex && record?.hash === entry.hash) { + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + return; + } + await this.indexFile(entry, { source: "memory" }); + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + }); + await runWithConcurrency(tasks, this.getIndexConcurrency()); + + const staleRows = this.db + .prepare(`SELECT path FROM files WHERE source = ?`) + .all("memory") as Array<{ path: string }>; + for (const stale of staleRows) { + if (activePaths.has(stale.path)) { + continue; + } + this.db.prepare(`DELETE FROM files WHERE path = ? AND source = ?`).run(stale.path, "memory"); + try { + this.db + .prepare( + `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, + ) + .run(stale.path, "memory"); + } catch {} + this.db.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`).run(stale.path, "memory"); + if (this.fts.enabled && this.fts.available) { + try { + this.db + .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`) + .run(stale.path, "memory", this.provider.model); + } catch {} + } + } + } + + private async syncSessionFiles(params: { + needsFullReindex: boolean; + progress?: MemorySyncProgressState; + }) { + const files = await listSessionFilesForAgent(this.agentId); + const activePaths = new Set(files.map((file) => sessionPathForFile(file))); + const indexAll = params.needsFullReindex || this.sessionsDirtyFiles.size === 0; + log.debug("memory sync: indexing session files", { + files: files.length, + indexAll, + dirtyFiles: this.sessionsDirtyFiles.size, + batch: this.batch.enabled, + concurrency: this.getIndexConcurrency(), + }); + if (params.progress) { + params.progress.total += files.length; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + label: this.batch.enabled ? "Indexing session files (batch)..." : "Indexing session files…", + }); + } + + const tasks = files.map((absPath) => async () => { + if (!indexAll && !this.sessionsDirtyFiles.has(absPath)) { + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + return; + } + const entry = await buildSessionEntry(absPath); + if (!entry) { + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + return; + } + const record = this.db + .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) + .get(entry.path, "sessions") as { hash: string } | undefined; + if (!params.needsFullReindex && record?.hash === entry.hash) { + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + this.resetSessionDelta(absPath, entry.size); + return; + } + await this.indexFile(entry, { source: "sessions", content: entry.content }); + this.resetSessionDelta(absPath, entry.size); + if (params.progress) { + params.progress.completed += 1; + params.progress.report({ + completed: params.progress.completed, + total: params.progress.total, + }); + } + }); + await runWithConcurrency(tasks, this.getIndexConcurrency()); + + const staleRows = this.db + .prepare(`SELECT path FROM files WHERE source = ?`) + .all("sessions") as Array<{ path: string }>; + for (const stale of staleRows) { + if (activePaths.has(stale.path)) { + continue; + } + this.db + .prepare(`DELETE FROM files WHERE path = ? AND source = ?`) + .run(stale.path, "sessions"); + try { + this.db + .prepare( + `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, + ) + .run(stale.path, "sessions"); + } catch {} + this.db + .prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`) + .run(stale.path, "sessions"); + if (this.fts.enabled && this.fts.available) { + try { + this.db + .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`) + .run(stale.path, "sessions", this.provider.model); + } catch {} + } + } + } + + private createSyncProgress( + onProgress: (update: MemorySyncProgressUpdate) => void, + ): MemorySyncProgressState { + const state: MemorySyncProgressState = { + completed: 0, + total: 0, + label: undefined, + report: (update) => { + if (update.label) { + state.label = update.label; + } + const label = + update.total > 0 && state.label + ? `${state.label} ${update.completed}/${update.total}` + : state.label; + onProgress({ + completed: update.completed, + total: update.total, + label, + }); + }, + }; + return state; + } + + private async runSync(params?: { + reason?: string; + force?: boolean; + progress?: (update: MemorySyncProgressUpdate) => void; + }) { + const progress = params?.progress ? this.createSyncProgress(params.progress) : undefined; + if (progress) { + progress.report({ + completed: progress.completed, + total: progress.total, + label: "Loading vector extension…", + }); + } + const vectorReady = await this.ensureVectorReady(); + const meta = this.readMeta(); + const needsFullReindex = + params?.force || + !meta || + meta.model !== this.provider.model || + meta.provider !== this.provider.id || + meta.providerKey !== this.providerKey || + meta.chunkTokens !== this.settings.chunking.tokens || + meta.chunkOverlap !== this.settings.chunking.overlap || + (vectorReady && !meta?.vectorDims); + try { + if (needsFullReindex) { + await this.runSafeReindex({ + reason: params?.reason, + force: params?.force, + progress: progress ?? undefined, + }); + return; + } + + const shouldSyncMemory = + this.sources.has("memory") && (params?.force || needsFullReindex || this.dirty); + const shouldSyncSessions = this.shouldSyncSessions(params, needsFullReindex); + + if (shouldSyncMemory) { + await this.syncMemoryFiles({ needsFullReindex, progress: progress ?? undefined }); + this.dirty = false; + } + + if (shouldSyncSessions) { + await this.syncSessionFiles({ needsFullReindex, progress: progress ?? undefined }); + this.sessionsDirty = false; + this.sessionsDirtyFiles.clear(); + } else if (this.sessionsDirtyFiles.size > 0) { + this.sessionsDirty = true; + } else { + this.sessionsDirty = false; + } + } catch (err) { + const reason = err instanceof Error ? err.message : String(err); + const activated = + this.shouldFallbackOnError(reason) && (await this.activateFallbackProvider(reason)); + if (activated) { + await this.runSafeReindex({ + reason: params?.reason ?? "fallback", + force: true, + progress: progress ?? undefined, + }); + return; + } + throw err; + } + } + + private shouldFallbackOnError(message: string): boolean { + return /embedding|embeddings|batch/i.test(message); + } + + private resolveBatchConfig(): { + enabled: boolean; + wait: boolean; + concurrency: number; + pollIntervalMs: number; + timeoutMs: number; + } { + const batch = this.settings.remote?.batch; + const enabled = Boolean( + batch?.enabled && + ((this.openAi && this.provider.id === "openai") || + (this.gemini && this.provider.id === "gemini") || + (this.voyage && this.provider.id === "voyage")), + ); + return { + enabled, + wait: batch?.wait ?? true, + concurrency: Math.max(1, batch?.concurrency ?? 2), + pollIntervalMs: batch?.pollIntervalMs ?? 2000, + timeoutMs: (batch?.timeoutMinutes ?? 60) * 60 * 1000, + }; + } + + private async activateFallbackProvider(reason: string): Promise { + const fallback = this.settings.fallback; + if (!fallback || fallback === "none" || fallback === this.provider.id) { + return false; + } + if (this.fallbackFrom) { + return false; + } + const fallbackFrom = this.provider.id as "openai" | "gemini" | "local" | "voyage"; + + const fallbackModel = + fallback === "gemini" + ? DEFAULT_GEMINI_EMBEDDING_MODEL + : fallback === "openai" + ? DEFAULT_OPENAI_EMBEDDING_MODEL + : fallback === "voyage" + ? DEFAULT_VOYAGE_EMBEDDING_MODEL + : this.settings.model; + + const fallbackResult = await createEmbeddingProvider({ + config: this.cfg, + agentDir: resolveAgentDir(this.cfg, this.agentId), + provider: fallback, + remote: this.settings.remote, + model: fallbackModel, + fallback: "none", + local: this.settings.local, + }); + + this.fallbackFrom = fallbackFrom; + this.fallbackReason = reason; + this.provider = fallbackResult.provider; + this.openAi = fallbackResult.openAi; + this.gemini = fallbackResult.gemini; + this.voyage = fallbackResult.voyage; + this.providerKey = this.computeProviderKey(); + this.batch = this.resolveBatchConfig(); + log.warn(`memory embeddings: switched to fallback provider (${fallback})`, { reason }); + return true; + } + + private async runSafeReindex(params: { + reason?: string; + force?: boolean; + progress?: MemorySyncProgressState; + }): Promise { + const dbPath = resolveUserPath(this.settings.store.path); + const tempDbPath = `${dbPath}.tmp-${randomUUID()}`; + const tempDb = this.openDatabaseAtPath(tempDbPath); + + const originalDb = this.db; + let originalDbClosed = false; + const originalState = { + ftsAvailable: this.fts.available, + ftsError: this.fts.loadError, + vectorAvailable: this.vector.available, + vectorLoadError: this.vector.loadError, + vectorDims: this.vector.dims, + vectorReady: this.vectorReady, + }; + + const restoreOriginalState = () => { + if (originalDbClosed) { + this.db = this.openDatabaseAtPath(dbPath); + } else { + this.db = originalDb; + } + this.fts.available = originalState.ftsAvailable; + this.fts.loadError = originalState.ftsError; + this.vector.available = originalDbClosed ? null : originalState.vectorAvailable; + this.vector.loadError = originalState.vectorLoadError; + this.vector.dims = originalState.vectorDims; + this.vectorReady = originalDbClosed ? null : originalState.vectorReady; + }; + + this.db = tempDb; + this.vectorReady = null; + this.vector.available = null; + this.vector.loadError = undefined; + this.vector.dims = undefined; + this.fts.available = false; + this.fts.loadError = undefined; + this.ensureSchema(); + + let nextMeta: MemoryIndexMeta | null = null; + + try { + this.seedEmbeddingCache(originalDb); + const shouldSyncMemory = this.sources.has("memory"); + const shouldSyncSessions = this.shouldSyncSessions( + { reason: params.reason, force: params.force }, + true, + ); + + if (shouldSyncMemory) { + await this.syncMemoryFiles({ needsFullReindex: true, progress: params.progress }); + this.dirty = false; + } + + if (shouldSyncSessions) { + await this.syncSessionFiles({ needsFullReindex: true, progress: params.progress }); + this.sessionsDirty = false; + this.sessionsDirtyFiles.clear(); + } else if (this.sessionsDirtyFiles.size > 0) { + this.sessionsDirty = true; + } else { + this.sessionsDirty = false; + } + + nextMeta = { + model: this.provider.model, + provider: this.provider.id, + providerKey: this.providerKey, + chunkTokens: this.settings.chunking.tokens, + chunkOverlap: this.settings.chunking.overlap, + }; + if (this.vector.available && this.vector.dims) { + nextMeta.vectorDims = this.vector.dims; + } + + this.writeMeta(nextMeta); + this.pruneEmbeddingCacheIfNeeded(); + + this.db.close(); + originalDb.close(); + originalDbClosed = true; + + await this.swapIndexFiles(dbPath, tempDbPath); + + this.db = this.openDatabaseAtPath(dbPath); + this.vectorReady = null; + this.vector.available = null; + this.vector.loadError = undefined; + this.ensureSchema(); + this.vector.dims = nextMeta.vectorDims; + } catch (err) { + try { + this.db.close(); + } catch {} + await this.removeIndexFiles(tempDbPath); + restoreOriginalState(); + throw err; + } + } + + private resetIndex() { + this.db.exec(`DELETE FROM files`); + this.db.exec(`DELETE FROM chunks`); + if (this.fts.enabled && this.fts.available) { + try { + this.db.exec(`DELETE FROM ${FTS_TABLE}`); + } catch {} + } + this.dropVectorTable(); + this.vector.dims = undefined; + this.sessionsDirtyFiles.clear(); + } + + private readMeta(): MemoryIndexMeta | null { + const row = this.db.prepare(`SELECT value FROM meta WHERE key = ?`).get(META_KEY) as + | { value: string } + | undefined; + if (!row?.value) { + return null; + } + try { + return JSON.parse(row.value) as MemoryIndexMeta; + } catch { + return null; + } + } + + private writeMeta(meta: MemoryIndexMeta) { + const value = JSON.stringify(meta); + this.db + .prepare( + `INSERT INTO meta (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value`, + ) + .run(META_KEY, value); + } +} + +export const memoryManagerSyncOps = MemoryManagerSyncOps.prototype; diff --git a/src/memory/manager.ts b/src/memory/manager.ts index 715695e82da..92f1f84e95b 100644 --- a/src/memory/manager.ts +++ b/src/memory/manager.ts @@ -1,7 +1,5 @@ import type { DatabaseSync } from "node:sqlite"; -import chokidar, { type FSWatcher } from "chokidar"; -import { randomUUID } from "node:crypto"; -import fsSync from "node:fs"; +import { type FSWatcher } from "chokidar"; import fs from "node:fs/promises"; import path from "node:path"; import type { ResolvedMemorySearchConfig } from "../agents/memory-search.js"; @@ -16,22 +14,7 @@ import type { } from "./types.js"; import { resolveAgentDir, resolveAgentWorkspaceDir } from "../agents/agent-scope.js"; import { resolveMemorySearchConfig } from "../agents/memory-search.js"; -import { resolveSessionTranscriptsDirForAgent } from "../config/sessions/paths.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; -import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; -import { resolveUserPath } from "../utils.js"; -import { runGeminiEmbeddingBatches, type GeminiBatchRequest } from "./batch-gemini.js"; -import { - OPENAI_BATCH_ENDPOINT, - type OpenAiBatchRequest, - runOpenAiEmbeddingBatches, -} from "./batch-openai.js"; -import { type VoyageBatchRequest, runVoyageEmbeddingBatches } from "./batch-voyage.js"; -import { enforceEmbeddingMaxInputTokens } from "./embedding-chunk-limits.js"; -import { estimateUtf8Bytes } from "./embedding-input-limits.js"; -import { DEFAULT_GEMINI_EMBEDDING_MODEL } from "./embeddings-gemini.js"; -import { DEFAULT_OPENAI_EMBEDDING_MODEL } from "./embeddings-openai.js"; -import { DEFAULT_VOYAGE_EMBEDDING_MODEL } from "./embeddings-voyage.js"; import { createEmbeddingProvider, type EmbeddingProvider, @@ -41,74 +24,23 @@ import { type VoyageEmbeddingClient, } from "./embeddings.js"; import { bm25RankToScore, buildFtsQuery, mergeHybridResults } from "./hybrid.js"; -import { - buildFileEntry, - chunkMarkdown, - ensureDir, - hashText, - isMemoryPath, - listMemoryFiles, - normalizeExtraMemoryPaths, - type MemoryChunk, - type MemoryFileEntry, - parseEmbedding, - remapChunkLines, - runWithConcurrency, -} from "./internal.js"; +import { isMemoryPath, normalizeExtraMemoryPaths } from "./internal.js"; +import { memoryManagerEmbeddingOps } from "./manager-embedding-ops.js"; import { searchKeyword, searchVector } from "./manager-search.js"; -import { ensureMemoryIndexSchema } from "./memory-schema.js"; -import { - buildSessionEntry, - listSessionFilesForAgent, - sessionPathForFile, - type SessionFileEntry, -} from "./session-files.js"; -import { loadSqliteVecExtension } from "./sqlite-vec.js"; -import { requireNodeSqlite } from "./sqlite.js"; - -type MemoryIndexMeta = { - model: string; - provider: string; - providerKey?: string; - chunkTokens: number; - chunkOverlap: number; - vectorDims?: number; -}; - -type MemorySyncProgressState = { - completed: number; - total: number; - label?: string; - report: (update: MemorySyncProgressUpdate) => void; -}; - -const META_KEY = "memory_index_meta_v1"; +import { memoryManagerSyncOps } from "./manager-sync-ops.js"; const SNIPPET_MAX_CHARS = 700; const VECTOR_TABLE = "chunks_vec"; const FTS_TABLE = "chunks_fts"; const EMBEDDING_CACHE_TABLE = "embedding_cache"; -const SESSION_DIRTY_DEBOUNCE_MS = 5000; -const EMBEDDING_BATCH_MAX_TOKENS = 8000; -const EMBEDDING_INDEX_CONCURRENCY = 4; -const EMBEDDING_RETRY_MAX_ATTEMPTS = 3; -const EMBEDDING_RETRY_BASE_DELAY_MS = 500; -const EMBEDDING_RETRY_MAX_DELAY_MS = 8000; const BATCH_FAILURE_LIMIT = 2; -const SESSION_DELTA_READ_CHUNK_BYTES = 64 * 1024; -const VECTOR_LOAD_TIMEOUT_MS = 30_000; -const EMBEDDING_QUERY_TIMEOUT_REMOTE_MS = 60_000; -const EMBEDDING_QUERY_TIMEOUT_LOCAL_MS = 5 * 60_000; -const EMBEDDING_BATCH_TIMEOUT_REMOTE_MS = 2 * 60_000; -const EMBEDDING_BATCH_TIMEOUT_LOCAL_MS = 10 * 60_000; const log = createSubsystemLogger("memory"); const INDEX_CACHE = new Map(); -const vectorToBlob = (embedding: number[]): Buffer => - Buffer.from(new Float32Array(embedding).buffer); - export class MemoryIndexManager implements MemorySearchManager { + // oxlint-disable-next-line typescript/no-explicit-any + [key: string]: any; private readonly cacheKey: string; private readonly cfg: OpenClawConfig; private readonly agentId: string; @@ -293,7 +225,7 @@ export class MemoryIndexManager implements MemorySearchManager { ? await this.searchKeyword(cleaned, candidates).catch(() => []) : []; - const queryVec = await this.embedQueryWithTimeout(cleaned); + const queryVec = (await this.embedQueryWithTimeout(cleaned)) as number[]; const hasVector = queryVec.some((v) => v !== 0); const vectorResults = hasVector ? await this.searchVector(queryVec, candidates).catch(() => []) @@ -399,7 +331,7 @@ export class MemoryIndexManager implements MemorySearchManager { this.syncing = this.runSync(params).finally(() => { this.syncing = null; }); - return this.syncing; + return this.syncing ?? Promise.resolve(); } async readFile(params: { @@ -609,1694 +541,21 @@ export class MemoryIndexManager implements MemorySearchManager { this.db.close(); INDEX_CACHE.delete(this.cacheKey); } +} - private async ensureVectorReady(dimensions?: number): Promise { - if (!this.vector.enabled) { - return false; - } - if (!this.vectorReady) { - this.vectorReady = this.withTimeout( - this.loadVectorExtension(), - VECTOR_LOAD_TIMEOUT_MS, - `sqlite-vec load timed out after ${Math.round(VECTOR_LOAD_TIMEOUT_MS / 1000)}s`, - ); - } - let ready = false; - try { - ready = await this.vectorReady; - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - this.vector.available = false; - this.vector.loadError = message; - this.vectorReady = null; - log.warn(`sqlite-vec unavailable: ${message}`); - return false; - } - if (ready && typeof dimensions === "number" && dimensions > 0) { - this.ensureVectorTable(dimensions); - } - return ready; - } - - private async loadVectorExtension(): Promise { - if (this.vector.available !== null) { - return this.vector.available; - } - if (!this.vector.enabled) { - this.vector.available = false; - return false; - } - try { - const resolvedPath = this.vector.extensionPath?.trim() - ? resolveUserPath(this.vector.extensionPath) - : undefined; - const loaded = await loadSqliteVecExtension({ db: this.db, extensionPath: resolvedPath }); - if (!loaded.ok) { - throw new Error(loaded.error ?? "unknown sqlite-vec load error"); - } - this.vector.extensionPath = loaded.extensionPath; - this.vector.available = true; - return true; - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - this.vector.available = false; - this.vector.loadError = message; - log.warn(`sqlite-vec unavailable: ${message}`); - return false; - } - } - - private ensureVectorTable(dimensions: number): void { - if (this.vector.dims === dimensions) { - return; - } - if (this.vector.dims && this.vector.dims !== dimensions) { - this.dropVectorTable(); - } - this.db.exec( - `CREATE VIRTUAL TABLE IF NOT EXISTS ${VECTOR_TABLE} USING vec0(\n` + - ` id TEXT PRIMARY KEY,\n` + - ` embedding FLOAT[${dimensions}]\n` + - `)`, - ); - this.vector.dims = dimensions; - } - - private dropVectorTable(): void { - try { - this.db.exec(`DROP TABLE IF EXISTS ${VECTOR_TABLE}`); - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - log.debug(`Failed to drop ${VECTOR_TABLE}: ${message}`); - } - } - - private buildSourceFilter(alias?: string): { sql: string; params: MemorySource[] } { - const sources = Array.from(this.sources); - if (sources.length === 0) { - return { sql: "", params: [] }; - } - const column = alias ? `${alias}.source` : "source"; - const placeholders = sources.map(() => "?").join(", "); - return { sql: ` AND ${column} IN (${placeholders})`, params: sources }; - } - - private openDatabase(): DatabaseSync { - const dbPath = resolveUserPath(this.settings.store.path); - return this.openDatabaseAtPath(dbPath); - } - - private openDatabaseAtPath(dbPath: string): DatabaseSync { - const dir = path.dirname(dbPath); - ensureDir(dir); - const { DatabaseSync } = requireNodeSqlite(); - return new DatabaseSync(dbPath, { allowExtension: this.settings.store.vector.enabled }); - } - - private seedEmbeddingCache(sourceDb: DatabaseSync): void { - if (!this.cache.enabled) { - return; - } - try { - const rows = sourceDb - .prepare( - `SELECT provider, model, provider_key, hash, embedding, dims, updated_at FROM ${EMBEDDING_CACHE_TABLE}`, - ) - .all() as Array<{ - provider: string; - model: string; - provider_key: string; - hash: string; - embedding: string; - dims: number | null; - updated_at: number; - }>; - if (!rows.length) { - return; - } - const insert = this.db.prepare( - `INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET - embedding=excluded.embedding, - dims=excluded.dims, - updated_at=excluded.updated_at`, - ); - this.db.exec("BEGIN"); - for (const row of rows) { - insert.run( - row.provider, - row.model, - row.provider_key, - row.hash, - row.embedding, - row.dims, - row.updated_at, - ); - } - this.db.exec("COMMIT"); - } catch (err) { - try { - this.db.exec("ROLLBACK"); - } catch {} - throw err; - } - } - - private async swapIndexFiles(targetPath: string, tempPath: string): Promise { - const backupPath = `${targetPath}.backup-${randomUUID()}`; - await this.moveIndexFiles(targetPath, backupPath); - try { - await this.moveIndexFiles(tempPath, targetPath); - } catch (err) { - await this.moveIndexFiles(backupPath, targetPath); - throw err; - } - await this.removeIndexFiles(backupPath); - } - - private async moveIndexFiles(sourceBase: string, targetBase: string): Promise { - const suffixes = ["", "-wal", "-shm"]; - for (const suffix of suffixes) { - const source = `${sourceBase}${suffix}`; - const target = `${targetBase}${suffix}`; - try { - await fs.rename(source, target); - } catch (err) { - if ((err as NodeJS.ErrnoException).code !== "ENOENT") { - throw err; - } - } - } - } - - private async removeIndexFiles(basePath: string): Promise { - const suffixes = ["", "-wal", "-shm"]; - await Promise.all(suffixes.map((suffix) => fs.rm(`${basePath}${suffix}`, { force: true }))); - } - - private ensureSchema() { - const result = ensureMemoryIndexSchema({ - db: this.db, - embeddingCacheTable: EMBEDDING_CACHE_TABLE, - ftsTable: FTS_TABLE, - ftsEnabled: this.fts.enabled, - }); - this.fts.available = result.ftsAvailable; - if (result.ftsError) { - this.fts.loadError = result.ftsError; - log.warn(`fts unavailable: ${result.ftsError}`); - } - } - - private ensureWatcher() { - if (!this.sources.has("memory") || !this.settings.sync.watch || this.watcher) { - return; - } - const additionalPaths = normalizeExtraMemoryPaths(this.workspaceDir, this.settings.extraPaths) - .map((entry) => { - try { - const stat = fsSync.lstatSync(entry); - return stat.isSymbolicLink() ? null : entry; - } catch { - return null; - } - }) - .filter((entry): entry is string => Boolean(entry)); - const watchPaths = new Set([ - path.join(this.workspaceDir, "MEMORY.md"), - path.join(this.workspaceDir, "memory.md"), - path.join(this.workspaceDir, "memory"), - ...additionalPaths, - ]); - this.watcher = chokidar.watch(Array.from(watchPaths), { - ignoreInitial: true, - awaitWriteFinish: { - stabilityThreshold: this.settings.sync.watchDebounceMs, - pollInterval: 100, - }, - }); - const markDirty = () => { - this.dirty = true; - this.scheduleWatchSync(); - }; - this.watcher.on("add", markDirty); - this.watcher.on("change", markDirty); - this.watcher.on("unlink", markDirty); - } - - private ensureSessionListener() { - if (!this.sources.has("sessions") || this.sessionUnsubscribe) { - return; - } - this.sessionUnsubscribe = onSessionTranscriptUpdate((update) => { - if (this.closed) { - return; - } - const sessionFile = update.sessionFile; - if (!this.isSessionFileForAgent(sessionFile)) { - return; - } - this.scheduleSessionDirty(sessionFile); - }); - } - - private scheduleSessionDirty(sessionFile: string) { - this.sessionPendingFiles.add(sessionFile); - if (this.sessionWatchTimer) { - return; - } - this.sessionWatchTimer = setTimeout(() => { - this.sessionWatchTimer = null; - void this.processSessionDeltaBatch().catch((err) => { - log.warn(`memory session delta failed: ${String(err)}`); - }); - }, SESSION_DIRTY_DEBOUNCE_MS); - } - - private async processSessionDeltaBatch(): Promise { - if (this.sessionPendingFiles.size === 0) { - return; - } - const pending = Array.from(this.sessionPendingFiles); - this.sessionPendingFiles.clear(); - let shouldSync = false; - for (const sessionFile of pending) { - const delta = await this.updateSessionDelta(sessionFile); - if (!delta) { +function applyPrototypeMixins(target: object, ...sources: object[]): void { + for (const source of sources) { + for (const name of Object.getOwnPropertyNames(source)) { + if (name === "constructor") { continue; } - const bytesThreshold = delta.deltaBytes; - const messagesThreshold = delta.deltaMessages; - const bytesHit = - bytesThreshold <= 0 ? delta.pendingBytes > 0 : delta.pendingBytes >= bytesThreshold; - const messagesHit = - messagesThreshold <= 0 - ? delta.pendingMessages > 0 - : delta.pendingMessages >= messagesThreshold; - if (!bytesHit && !messagesHit) { + const descriptor = Object.getOwnPropertyDescriptor(source, name); + if (!descriptor) { continue; } - this.sessionsDirtyFiles.add(sessionFile); - this.sessionsDirty = true; - delta.pendingBytes = - bytesThreshold > 0 ? Math.max(0, delta.pendingBytes - bytesThreshold) : 0; - delta.pendingMessages = - messagesThreshold > 0 ? Math.max(0, delta.pendingMessages - messagesThreshold) : 0; - shouldSync = true; + Object.defineProperty(target, name, descriptor); } - if (shouldSync) { - void this.sync({ reason: "session-delta" }).catch((err) => { - log.warn(`memory sync failed (session-delta): ${String(err)}`); - }); - } - } - - private async updateSessionDelta(sessionFile: string): Promise<{ - deltaBytes: number; - deltaMessages: number; - pendingBytes: number; - pendingMessages: number; - } | null> { - const thresholds = this.settings.sync.sessions; - if (!thresholds) { - return null; - } - let stat: { size: number }; - try { - stat = await fs.stat(sessionFile); - } catch { - return null; - } - const size = stat.size; - let state = this.sessionDeltas.get(sessionFile); - if (!state) { - state = { lastSize: 0, pendingBytes: 0, pendingMessages: 0 }; - this.sessionDeltas.set(sessionFile, state); - } - const deltaBytes = Math.max(0, size - state.lastSize); - if (deltaBytes === 0 && size === state.lastSize) { - return { - deltaBytes: thresholds.deltaBytes, - deltaMessages: thresholds.deltaMessages, - pendingBytes: state.pendingBytes, - pendingMessages: state.pendingMessages, - }; - } - if (size < state.lastSize) { - state.lastSize = size; - state.pendingBytes += size; - const shouldCountMessages = - thresholds.deltaMessages > 0 && - (thresholds.deltaBytes <= 0 || state.pendingBytes < thresholds.deltaBytes); - if (shouldCountMessages) { - state.pendingMessages += await this.countNewlines(sessionFile, 0, size); - } - } else { - state.pendingBytes += deltaBytes; - const shouldCountMessages = - thresholds.deltaMessages > 0 && - (thresholds.deltaBytes <= 0 || state.pendingBytes < thresholds.deltaBytes); - if (shouldCountMessages) { - state.pendingMessages += await this.countNewlines(sessionFile, state.lastSize, size); - } - state.lastSize = size; - } - this.sessionDeltas.set(sessionFile, state); - return { - deltaBytes: thresholds.deltaBytes, - deltaMessages: thresholds.deltaMessages, - pendingBytes: state.pendingBytes, - pendingMessages: state.pendingMessages, - }; - } - - private async countNewlines(absPath: string, start: number, end: number): Promise { - if (end <= start) { - return 0; - } - const handle = await fs.open(absPath, "r"); - try { - let offset = start; - let count = 0; - const buffer = Buffer.alloc(SESSION_DELTA_READ_CHUNK_BYTES); - while (offset < end) { - const toRead = Math.min(buffer.length, end - offset); - const { bytesRead } = await handle.read(buffer, 0, toRead, offset); - if (bytesRead <= 0) { - break; - } - for (let i = 0; i < bytesRead; i += 1) { - if (buffer[i] === 10) { - count += 1; - } - } - offset += bytesRead; - } - return count; - } finally { - await handle.close(); - } - } - - private resetSessionDelta(absPath: string, size: number): void { - const state = this.sessionDeltas.get(absPath); - if (!state) { - return; - } - state.lastSize = size; - state.pendingBytes = 0; - state.pendingMessages = 0; - } - - private isSessionFileForAgent(sessionFile: string): boolean { - if (!sessionFile) { - return false; - } - const sessionsDir = resolveSessionTranscriptsDirForAgent(this.agentId); - const resolvedFile = path.resolve(sessionFile); - const resolvedDir = path.resolve(sessionsDir); - return resolvedFile.startsWith(`${resolvedDir}${path.sep}`); - } - - private ensureIntervalSync() { - const minutes = this.settings.sync.intervalMinutes; - if (!minutes || minutes <= 0 || this.intervalTimer) { - return; - } - const ms = minutes * 60 * 1000; - this.intervalTimer = setInterval(() => { - void this.sync({ reason: "interval" }).catch((err) => { - log.warn(`memory sync failed (interval): ${String(err)}`); - }); - }, ms); - } - - private scheduleWatchSync() { - if (!this.sources.has("memory") || !this.settings.sync.watch) { - return; - } - if (this.watchTimer) { - clearTimeout(this.watchTimer); - } - this.watchTimer = setTimeout(() => { - this.watchTimer = null; - void this.sync({ reason: "watch" }).catch((err) => { - log.warn(`memory sync failed (watch): ${String(err)}`); - }); - }, this.settings.sync.watchDebounceMs); - } - - private shouldSyncSessions( - params?: { reason?: string; force?: boolean }, - needsFullReindex = false, - ) { - if (!this.sources.has("sessions")) { - return false; - } - if (params?.force) { - return true; - } - const reason = params?.reason; - if (reason === "session-start" || reason === "watch") { - return false; - } - if (needsFullReindex) { - return true; - } - return this.sessionsDirty && this.sessionsDirtyFiles.size > 0; - } - - private async syncMemoryFiles(params: { - needsFullReindex: boolean; - progress?: MemorySyncProgressState; - }) { - const files = await listMemoryFiles(this.workspaceDir, this.settings.extraPaths); - const fileEntries = await Promise.all( - files.map(async (file) => buildFileEntry(file, this.workspaceDir)), - ); - log.debug("memory sync: indexing memory files", { - files: fileEntries.length, - needsFullReindex: params.needsFullReindex, - batch: this.batch.enabled, - concurrency: this.getIndexConcurrency(), - }); - const activePaths = new Set(fileEntries.map((entry) => entry.path)); - if (params.progress) { - params.progress.total += fileEntries.length; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - label: this.batch.enabled ? "Indexing memory files (batch)..." : "Indexing memory files…", - }); - } - - const tasks = fileEntries.map((entry) => async () => { - const record = this.db - .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) - .get(entry.path, "memory") as { hash: string } | undefined; - if (!params.needsFullReindex && record?.hash === entry.hash) { - if (params.progress) { - params.progress.completed += 1; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - }); - } - return; - } - await this.indexFile(entry, { source: "memory" }); - if (params.progress) { - params.progress.completed += 1; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - }); - } - }); - await runWithConcurrency(tasks, this.getIndexConcurrency()); - - const staleRows = this.db - .prepare(`SELECT path FROM files WHERE source = ?`) - .all("memory") as Array<{ path: string }>; - for (const stale of staleRows) { - if (activePaths.has(stale.path)) { - continue; - } - this.db.prepare(`DELETE FROM files WHERE path = ? AND source = ?`).run(stale.path, "memory"); - try { - this.db - .prepare( - `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, - ) - .run(stale.path, "memory"); - } catch {} - this.db.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`).run(stale.path, "memory"); - if (this.fts.enabled && this.fts.available) { - try { - this.db - .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`) - .run(stale.path, "memory", this.provider.model); - } catch {} - } - } - } - - private async syncSessionFiles(params: { - needsFullReindex: boolean; - progress?: MemorySyncProgressState; - }) { - const files = await listSessionFilesForAgent(this.agentId); - const activePaths = new Set(files.map((file) => sessionPathForFile(file))); - const indexAll = params.needsFullReindex || this.sessionsDirtyFiles.size === 0; - log.debug("memory sync: indexing session files", { - files: files.length, - indexAll, - dirtyFiles: this.sessionsDirtyFiles.size, - batch: this.batch.enabled, - concurrency: this.getIndexConcurrency(), - }); - if (params.progress) { - params.progress.total += files.length; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - label: this.batch.enabled ? "Indexing session files (batch)..." : "Indexing session files…", - }); - } - - const tasks = files.map((absPath) => async () => { - if (!indexAll && !this.sessionsDirtyFiles.has(absPath)) { - if (params.progress) { - params.progress.completed += 1; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - }); - } - return; - } - const entry = await buildSessionEntry(absPath); - if (!entry) { - if (params.progress) { - params.progress.completed += 1; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - }); - } - return; - } - const record = this.db - .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) - .get(entry.path, "sessions") as { hash: string } | undefined; - if (!params.needsFullReindex && record?.hash === entry.hash) { - if (params.progress) { - params.progress.completed += 1; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - }); - } - this.resetSessionDelta(absPath, entry.size); - return; - } - await this.indexFile(entry, { source: "sessions", content: entry.content }); - this.resetSessionDelta(absPath, entry.size); - if (params.progress) { - params.progress.completed += 1; - params.progress.report({ - completed: params.progress.completed, - total: params.progress.total, - }); - } - }); - await runWithConcurrency(tasks, this.getIndexConcurrency()); - - const staleRows = this.db - .prepare(`SELECT path FROM files WHERE source = ?`) - .all("sessions") as Array<{ path: string }>; - for (const stale of staleRows) { - if (activePaths.has(stale.path)) { - continue; - } - this.db - .prepare(`DELETE FROM files WHERE path = ? AND source = ?`) - .run(stale.path, "sessions"); - try { - this.db - .prepare( - `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, - ) - .run(stale.path, "sessions"); - } catch {} - this.db - .prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`) - .run(stale.path, "sessions"); - if (this.fts.enabled && this.fts.available) { - try { - this.db - .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`) - .run(stale.path, "sessions", this.provider.model); - } catch {} - } - } - } - - private createSyncProgress( - onProgress: (update: MemorySyncProgressUpdate) => void, - ): MemorySyncProgressState { - const state: MemorySyncProgressState = { - completed: 0, - total: 0, - label: undefined, - report: (update) => { - if (update.label) { - state.label = update.label; - } - const label = - update.total > 0 && state.label - ? `${state.label} ${update.completed}/${update.total}` - : state.label; - onProgress({ - completed: update.completed, - total: update.total, - label, - }); - }, - }; - return state; - } - - private async runSync(params?: { - reason?: string; - force?: boolean; - progress?: (update: MemorySyncProgressUpdate) => void; - }) { - const progress = params?.progress ? this.createSyncProgress(params.progress) : undefined; - if (progress) { - progress.report({ - completed: progress.completed, - total: progress.total, - label: "Loading vector extension…", - }); - } - const vectorReady = await this.ensureVectorReady(); - const meta = this.readMeta(); - const needsFullReindex = - params?.force || - !meta || - meta.model !== this.provider.model || - meta.provider !== this.provider.id || - meta.providerKey !== this.providerKey || - meta.chunkTokens !== this.settings.chunking.tokens || - meta.chunkOverlap !== this.settings.chunking.overlap || - (vectorReady && !meta?.vectorDims); - try { - if (needsFullReindex) { - await this.runSafeReindex({ - reason: params?.reason, - force: params?.force, - progress: progress ?? undefined, - }); - return; - } - - const shouldSyncMemory = - this.sources.has("memory") && (params?.force || needsFullReindex || this.dirty); - const shouldSyncSessions = this.shouldSyncSessions(params, needsFullReindex); - - if (shouldSyncMemory) { - await this.syncMemoryFiles({ needsFullReindex, progress: progress ?? undefined }); - this.dirty = false; - } - - if (shouldSyncSessions) { - await this.syncSessionFiles({ needsFullReindex, progress: progress ?? undefined }); - this.sessionsDirty = false; - this.sessionsDirtyFiles.clear(); - } else if (this.sessionsDirtyFiles.size > 0) { - this.sessionsDirty = true; - } else { - this.sessionsDirty = false; - } - } catch (err) { - const reason = err instanceof Error ? err.message : String(err); - const activated = - this.shouldFallbackOnError(reason) && (await this.activateFallbackProvider(reason)); - if (activated) { - await this.runSafeReindex({ - reason: params?.reason ?? "fallback", - force: true, - progress: progress ?? undefined, - }); - return; - } - throw err; - } - } - - private shouldFallbackOnError(message: string): boolean { - return /embedding|embeddings|batch/i.test(message); - } - - private resolveBatchConfig(): { - enabled: boolean; - wait: boolean; - concurrency: number; - pollIntervalMs: number; - timeoutMs: number; - } { - const batch = this.settings.remote?.batch; - const enabled = Boolean( - batch?.enabled && - ((this.openAi && this.provider.id === "openai") || - (this.gemini && this.provider.id === "gemini") || - (this.voyage && this.provider.id === "voyage")), - ); - return { - enabled, - wait: batch?.wait ?? true, - concurrency: Math.max(1, batch?.concurrency ?? 2), - pollIntervalMs: batch?.pollIntervalMs ?? 2000, - timeoutMs: (batch?.timeoutMinutes ?? 60) * 60 * 1000, - }; - } - - private async activateFallbackProvider(reason: string): Promise { - const fallback = this.settings.fallback; - if (!fallback || fallback === "none" || fallback === this.provider.id) { - return false; - } - if (this.fallbackFrom) { - return false; - } - const fallbackFrom = this.provider.id as "openai" | "gemini" | "local" | "voyage"; - - const fallbackModel = - fallback === "gemini" - ? DEFAULT_GEMINI_EMBEDDING_MODEL - : fallback === "openai" - ? DEFAULT_OPENAI_EMBEDDING_MODEL - : fallback === "voyage" - ? DEFAULT_VOYAGE_EMBEDDING_MODEL - : this.settings.model; - - const fallbackResult = await createEmbeddingProvider({ - config: this.cfg, - agentDir: resolveAgentDir(this.cfg, this.agentId), - provider: fallback, - remote: this.settings.remote, - model: fallbackModel, - fallback: "none", - local: this.settings.local, - }); - - this.fallbackFrom = fallbackFrom; - this.fallbackReason = reason; - this.provider = fallbackResult.provider; - this.openAi = fallbackResult.openAi; - this.gemini = fallbackResult.gemini; - this.voyage = fallbackResult.voyage; - this.providerKey = this.computeProviderKey(); - this.batch = this.resolveBatchConfig(); - log.warn(`memory embeddings: switched to fallback provider (${fallback})`, { reason }); - return true; - } - - private async runSafeReindex(params: { - reason?: string; - force?: boolean; - progress?: MemorySyncProgressState; - }): Promise { - const dbPath = resolveUserPath(this.settings.store.path); - const tempDbPath = `${dbPath}.tmp-${randomUUID()}`; - const tempDb = this.openDatabaseAtPath(tempDbPath); - - const originalDb = this.db; - let originalDbClosed = false; - const originalState = { - ftsAvailable: this.fts.available, - ftsError: this.fts.loadError, - vectorAvailable: this.vector.available, - vectorLoadError: this.vector.loadError, - vectorDims: this.vector.dims, - vectorReady: this.vectorReady, - }; - - const restoreOriginalState = () => { - if (originalDbClosed) { - this.db = this.openDatabaseAtPath(dbPath); - } else { - this.db = originalDb; - } - this.fts.available = originalState.ftsAvailable; - this.fts.loadError = originalState.ftsError; - this.vector.available = originalDbClosed ? null : originalState.vectorAvailable; - this.vector.loadError = originalState.vectorLoadError; - this.vector.dims = originalState.vectorDims; - this.vectorReady = originalDbClosed ? null : originalState.vectorReady; - }; - - this.db = tempDb; - this.vectorReady = null; - this.vector.available = null; - this.vector.loadError = undefined; - this.vector.dims = undefined; - this.fts.available = false; - this.fts.loadError = undefined; - this.ensureSchema(); - - let nextMeta: MemoryIndexMeta | null = null; - - try { - this.seedEmbeddingCache(originalDb); - const shouldSyncMemory = this.sources.has("memory"); - const shouldSyncSessions = this.shouldSyncSessions( - { reason: params.reason, force: params.force }, - true, - ); - - if (shouldSyncMemory) { - await this.syncMemoryFiles({ needsFullReindex: true, progress: params.progress }); - this.dirty = false; - } - - if (shouldSyncSessions) { - await this.syncSessionFiles({ needsFullReindex: true, progress: params.progress }); - this.sessionsDirty = false; - this.sessionsDirtyFiles.clear(); - } else if (this.sessionsDirtyFiles.size > 0) { - this.sessionsDirty = true; - } else { - this.sessionsDirty = false; - } - - nextMeta = { - model: this.provider.model, - provider: this.provider.id, - providerKey: this.providerKey, - chunkTokens: this.settings.chunking.tokens, - chunkOverlap: this.settings.chunking.overlap, - }; - if (this.vector.available && this.vector.dims) { - nextMeta.vectorDims = this.vector.dims; - } - - this.writeMeta(nextMeta); - this.pruneEmbeddingCacheIfNeeded(); - - this.db.close(); - originalDb.close(); - originalDbClosed = true; - - await this.swapIndexFiles(dbPath, tempDbPath); - - this.db = this.openDatabaseAtPath(dbPath); - this.vectorReady = null; - this.vector.available = null; - this.vector.loadError = undefined; - this.ensureSchema(); - this.vector.dims = nextMeta.vectorDims; - } catch (err) { - try { - this.db.close(); - } catch {} - await this.removeIndexFiles(tempDbPath); - restoreOriginalState(); - throw err; - } - } - - private resetIndex() { - this.db.exec(`DELETE FROM files`); - this.db.exec(`DELETE FROM chunks`); - if (this.fts.enabled && this.fts.available) { - try { - this.db.exec(`DELETE FROM ${FTS_TABLE}`); - } catch {} - } - this.dropVectorTable(); - this.vector.dims = undefined; - this.sessionsDirtyFiles.clear(); - } - - private readMeta(): MemoryIndexMeta | null { - const row = this.db.prepare(`SELECT value FROM meta WHERE key = ?`).get(META_KEY) as - | { value: string } - | undefined; - if (!row?.value) { - return null; - } - try { - return JSON.parse(row.value) as MemoryIndexMeta; - } catch { - return null; - } - } - - private writeMeta(meta: MemoryIndexMeta) { - const value = JSON.stringify(meta); - this.db - .prepare( - `INSERT INTO meta (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value`, - ) - .run(META_KEY, value); - } - - private buildEmbeddingBatches(chunks: MemoryChunk[]): MemoryChunk[][] { - const batches: MemoryChunk[][] = []; - let current: MemoryChunk[] = []; - let currentTokens = 0; - - for (const chunk of chunks) { - const estimate = estimateUtf8Bytes(chunk.text); - const wouldExceed = - current.length > 0 && currentTokens + estimate > EMBEDDING_BATCH_MAX_TOKENS; - if (wouldExceed) { - batches.push(current); - current = []; - currentTokens = 0; - } - if (current.length === 0 && estimate > EMBEDDING_BATCH_MAX_TOKENS) { - batches.push([chunk]); - continue; - } - current.push(chunk); - currentTokens += estimate; - } - - if (current.length > 0) { - batches.push(current); - } - return batches; - } - - private loadEmbeddingCache(hashes: string[]): Map { - if (!this.cache.enabled) { - return new Map(); - } - if (hashes.length === 0) { - return new Map(); - } - const unique: string[] = []; - const seen = new Set(); - for (const hash of hashes) { - if (!hash) { - continue; - } - if (seen.has(hash)) { - continue; - } - seen.add(hash); - unique.push(hash); - } - if (unique.length === 0) { - return new Map(); - } - - const out = new Map(); - const baseParams = [this.provider.id, this.provider.model, this.providerKey]; - const batchSize = 400; - for (let start = 0; start < unique.length; start += batchSize) { - const batch = unique.slice(start, start + batchSize); - const placeholders = batch.map(() => "?").join(", "); - const rows = this.db - .prepare( - `SELECT hash, embedding FROM ${EMBEDDING_CACHE_TABLE}\n` + - ` WHERE provider = ? AND model = ? AND provider_key = ? AND hash IN (${placeholders})`, - ) - .all(...baseParams, ...batch) as Array<{ hash: string; embedding: string }>; - for (const row of rows) { - out.set(row.hash, parseEmbedding(row.embedding)); - } - } - return out; - } - - private upsertEmbeddingCache(entries: Array<{ hash: string; embedding: number[] }>): void { - if (!this.cache.enabled) { - return; - } - if (entries.length === 0) { - return; - } - const now = Date.now(); - const stmt = this.db.prepare( - `INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)\n` + - ` VALUES (?, ?, ?, ?, ?, ?, ?)\n` + - ` ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET\n` + - ` embedding=excluded.embedding,\n` + - ` dims=excluded.dims,\n` + - ` updated_at=excluded.updated_at`, - ); - for (const entry of entries) { - const embedding = entry.embedding ?? []; - stmt.run( - this.provider.id, - this.provider.model, - this.providerKey, - entry.hash, - JSON.stringify(embedding), - embedding.length, - now, - ); - } - } - - private pruneEmbeddingCacheIfNeeded(): void { - if (!this.cache.enabled) { - return; - } - const max = this.cache.maxEntries; - if (!max || max <= 0) { - return; - } - const row = this.db.prepare(`SELECT COUNT(*) as c FROM ${EMBEDDING_CACHE_TABLE}`).get() as - | { c: number } - | undefined; - const count = row?.c ?? 0; - if (count <= max) { - return; - } - const excess = count - max; - this.db - .prepare( - `DELETE FROM ${EMBEDDING_CACHE_TABLE}\n` + - ` WHERE rowid IN (\n` + - ` SELECT rowid FROM ${EMBEDDING_CACHE_TABLE}\n` + - ` ORDER BY updated_at ASC\n` + - ` LIMIT ?\n` + - ` )`, - ) - .run(excess); - } - - private async embedChunksInBatches(chunks: MemoryChunk[]): Promise { - if (chunks.length === 0) { - return []; - } - const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash)); - const embeddings: number[][] = Array.from({ length: chunks.length }, () => []); - const missing: Array<{ index: number; chunk: MemoryChunk }> = []; - - for (let i = 0; i < chunks.length; i += 1) { - const chunk = chunks[i]; - const hit = chunk?.hash ? cached.get(chunk.hash) : undefined; - if (hit && hit.length > 0) { - embeddings[i] = hit; - } else if (chunk) { - missing.push({ index: i, chunk }); - } - } - - if (missing.length === 0) { - return embeddings; - } - - const missingChunks = missing.map((m) => m.chunk); - const batches = this.buildEmbeddingBatches(missingChunks); - const toCache: Array<{ hash: string; embedding: number[] }> = []; - let cursor = 0; - for (const batch of batches) { - const batchEmbeddings = await this.embedBatchWithRetry(batch.map((chunk) => chunk.text)); - for (let i = 0; i < batch.length; i += 1) { - const item = missing[cursor + i]; - const embedding = batchEmbeddings[i] ?? []; - if (item) { - embeddings[item.index] = embedding; - toCache.push({ hash: item.chunk.hash, embedding }); - } - } - cursor += batch.length; - } - this.upsertEmbeddingCache(toCache); - return embeddings; - } - - private computeProviderKey(): string { - if (this.provider.id === "openai" && this.openAi) { - const entries = Object.entries(this.openAi.headers) - .filter(([key]) => key.toLowerCase() !== "authorization") - .toSorted(([a], [b]) => a.localeCompare(b)) - .map(([key, value]) => [key, value]); - return hashText( - JSON.stringify({ - provider: "openai", - baseUrl: this.openAi.baseUrl, - model: this.openAi.model, - headers: entries, - }), - ); - } - if (this.provider.id === "gemini" && this.gemini) { - const entries = Object.entries(this.gemini.headers) - .filter(([key]) => { - const lower = key.toLowerCase(); - return lower !== "authorization" && lower !== "x-goog-api-key"; - }) - .toSorted(([a], [b]) => a.localeCompare(b)) - .map(([key, value]) => [key, value]); - return hashText( - JSON.stringify({ - provider: "gemini", - baseUrl: this.gemini.baseUrl, - model: this.gemini.model, - headers: entries, - }), - ); - } - return hashText(JSON.stringify({ provider: this.provider.id, model: this.provider.model })); - } - - private async embedChunksWithBatch( - chunks: MemoryChunk[], - entry: MemoryFileEntry | SessionFileEntry, - source: MemorySource, - ): Promise { - if (this.provider.id === "openai" && this.openAi) { - return this.embedChunksWithOpenAiBatch(chunks, entry, source); - } - if (this.provider.id === "gemini" && this.gemini) { - return this.embedChunksWithGeminiBatch(chunks, entry, source); - } - if (this.provider.id === "voyage" && this.voyage) { - return this.embedChunksWithVoyageBatch(chunks, entry, source); - } - return this.embedChunksInBatches(chunks); - } - - private async embedChunksWithVoyageBatch( - chunks: MemoryChunk[], - entry: MemoryFileEntry | SessionFileEntry, - source: MemorySource, - ): Promise { - const voyage = this.voyage; - if (!voyage) { - return this.embedChunksInBatches(chunks); - } - if (chunks.length === 0) { - return []; - } - const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash)); - const embeddings: number[][] = Array.from({ length: chunks.length }, () => []); - const missing: Array<{ index: number; chunk: MemoryChunk }> = []; - - for (let i = 0; i < chunks.length; i += 1) { - const chunk = chunks[i]; - const hit = chunk?.hash ? cached.get(chunk.hash) : undefined; - if (hit && hit.length > 0) { - embeddings[i] = hit; - } else if (chunk) { - missing.push({ index: i, chunk }); - } - } - - if (missing.length === 0) { - return embeddings; - } - - const requests: VoyageBatchRequest[] = []; - const mapping = new Map(); - for (const item of missing) { - const chunk = item.chunk; - const customId = hashText( - `${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`, - ); - mapping.set(customId, { index: item.index, hash: chunk.hash }); - requests.push({ - custom_id: customId, - body: { - input: chunk.text, - }, - }); - } - const batchResult = await this.runBatchWithFallback({ - provider: "voyage", - run: async () => - await runVoyageEmbeddingBatches({ - client: voyage, - agentId: this.agentId, - requests, - wait: this.batch.wait, - concurrency: this.batch.concurrency, - pollIntervalMs: this.batch.pollIntervalMs, - timeoutMs: this.batch.timeoutMs, - debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }), - }), - fallback: async () => await this.embedChunksInBatches(chunks), - }); - if (Array.isArray(batchResult)) { - return batchResult; - } - const byCustomId = batchResult; - - const toCache: Array<{ hash: string; embedding: number[] }> = []; - for (const [customId, embedding] of byCustomId.entries()) { - const mapped = mapping.get(customId); - if (!mapped) { - continue; - } - embeddings[mapped.index] = embedding; - toCache.push({ hash: mapped.hash, embedding }); - } - this.upsertEmbeddingCache(toCache); - return embeddings; - } - - private async embedChunksWithOpenAiBatch( - chunks: MemoryChunk[], - entry: MemoryFileEntry | SessionFileEntry, - source: MemorySource, - ): Promise { - const openAi = this.openAi; - if (!openAi) { - return this.embedChunksInBatches(chunks); - } - if (chunks.length === 0) { - return []; - } - const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash)); - const embeddings: number[][] = Array.from({ length: chunks.length }, () => []); - const missing: Array<{ index: number; chunk: MemoryChunk }> = []; - - for (let i = 0; i < chunks.length; i += 1) { - const chunk = chunks[i]; - const hit = chunk?.hash ? cached.get(chunk.hash) : undefined; - if (hit && hit.length > 0) { - embeddings[i] = hit; - } else if (chunk) { - missing.push({ index: i, chunk }); - } - } - - if (missing.length === 0) { - return embeddings; - } - - const requests: OpenAiBatchRequest[] = []; - const mapping = new Map(); - for (const item of missing) { - const chunk = item.chunk; - const customId = hashText( - `${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`, - ); - mapping.set(customId, { index: item.index, hash: chunk.hash }); - requests.push({ - custom_id: customId, - method: "POST", - url: OPENAI_BATCH_ENDPOINT, - body: { - model: this.openAi?.model ?? this.provider.model, - input: chunk.text, - }, - }); - } - const batchResult = await this.runBatchWithFallback({ - provider: "openai", - run: async () => - await runOpenAiEmbeddingBatches({ - openAi, - agentId: this.agentId, - requests, - wait: this.batch.wait, - concurrency: this.batch.concurrency, - pollIntervalMs: this.batch.pollIntervalMs, - timeoutMs: this.batch.timeoutMs, - debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }), - }), - fallback: async () => await this.embedChunksInBatches(chunks), - }); - if (Array.isArray(batchResult)) { - return batchResult; - } - const byCustomId = batchResult; - - const toCache: Array<{ hash: string; embedding: number[] }> = []; - for (const [customId, embedding] of byCustomId.entries()) { - const mapped = mapping.get(customId); - if (!mapped) { - continue; - } - embeddings[mapped.index] = embedding; - toCache.push({ hash: mapped.hash, embedding }); - } - this.upsertEmbeddingCache(toCache); - return embeddings; - } - - private async embedChunksWithGeminiBatch( - chunks: MemoryChunk[], - entry: MemoryFileEntry | SessionFileEntry, - source: MemorySource, - ): Promise { - const gemini = this.gemini; - if (!gemini) { - return this.embedChunksInBatches(chunks); - } - if (chunks.length === 0) { - return []; - } - const cached = this.loadEmbeddingCache(chunks.map((chunk) => chunk.hash)); - const embeddings: number[][] = Array.from({ length: chunks.length }, () => []); - const missing: Array<{ index: number; chunk: MemoryChunk }> = []; - - for (let i = 0; i < chunks.length; i += 1) { - const chunk = chunks[i]; - const hit = chunk?.hash ? cached.get(chunk.hash) : undefined; - if (hit && hit.length > 0) { - embeddings[i] = hit; - } else if (chunk) { - missing.push({ index: i, chunk }); - } - } - - if (missing.length === 0) { - return embeddings; - } - - const requests: GeminiBatchRequest[] = []; - const mapping = new Map(); - for (const item of missing) { - const chunk = item.chunk; - const customId = hashText( - `${source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${item.index}`, - ); - mapping.set(customId, { index: item.index, hash: chunk.hash }); - requests.push({ - custom_id: customId, - content: { parts: [{ text: chunk.text }] }, - taskType: "RETRIEVAL_DOCUMENT", - }); - } - - const batchResult = await this.runBatchWithFallback({ - provider: "gemini", - run: async () => - await runGeminiEmbeddingBatches({ - gemini, - agentId: this.agentId, - requests, - wait: this.batch.wait, - concurrency: this.batch.concurrency, - pollIntervalMs: this.batch.pollIntervalMs, - timeoutMs: this.batch.timeoutMs, - debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }), - }), - fallback: async () => await this.embedChunksInBatches(chunks), - }); - if (Array.isArray(batchResult)) { - return batchResult; - } - const byCustomId = batchResult; - - const toCache: Array<{ hash: string; embedding: number[] }> = []; - for (const [customId, embedding] of byCustomId.entries()) { - const mapped = mapping.get(customId); - if (!mapped) { - continue; - } - embeddings[mapped.index] = embedding; - toCache.push({ hash: mapped.hash, embedding }); - } - this.upsertEmbeddingCache(toCache); - return embeddings; - } - - private async embedBatchWithRetry(texts: string[]): Promise { - if (texts.length === 0) { - return []; - } - let attempt = 0; - let delayMs = EMBEDDING_RETRY_BASE_DELAY_MS; - while (true) { - try { - const timeoutMs = this.resolveEmbeddingTimeout("batch"); - log.debug("memory embeddings: batch start", { - provider: this.provider.id, - items: texts.length, - timeoutMs, - }); - return await this.withTimeout( - this.provider.embedBatch(texts), - timeoutMs, - `memory embeddings batch timed out after ${Math.round(timeoutMs / 1000)}s`, - ); - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - if (!this.isRetryableEmbeddingError(message) || attempt >= EMBEDDING_RETRY_MAX_ATTEMPTS) { - throw err; - } - const waitMs = Math.min( - EMBEDDING_RETRY_MAX_DELAY_MS, - Math.round(delayMs * (1 + Math.random() * 0.2)), - ); - log.warn(`memory embeddings rate limited; retrying in ${waitMs}ms`); - await new Promise((resolve) => setTimeout(resolve, waitMs)); - delayMs *= 2; - attempt += 1; - } - } - } - - private isRetryableEmbeddingError(message: string): boolean { - return /(rate[_ ]limit|too many requests|429|resource has been exhausted|5\d\d|cloudflare)/i.test( - message, - ); - } - - private resolveEmbeddingTimeout(kind: "query" | "batch"): number { - const isLocal = this.provider.id === "local"; - if (kind === "query") { - return isLocal ? EMBEDDING_QUERY_TIMEOUT_LOCAL_MS : EMBEDDING_QUERY_TIMEOUT_REMOTE_MS; - } - return isLocal ? EMBEDDING_BATCH_TIMEOUT_LOCAL_MS : EMBEDDING_BATCH_TIMEOUT_REMOTE_MS; - } - - private async embedQueryWithTimeout(text: string): Promise { - const timeoutMs = this.resolveEmbeddingTimeout("query"); - log.debug("memory embeddings: query start", { provider: this.provider.id, timeoutMs }); - return await this.withTimeout( - this.provider.embedQuery(text), - timeoutMs, - `memory embeddings query timed out after ${Math.round(timeoutMs / 1000)}s`, - ); - } - - private async withTimeout( - promise: Promise, - timeoutMs: number, - message: string, - ): Promise { - if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) { - return await promise; - } - let timer: NodeJS.Timeout | null = null; - const timeoutPromise = new Promise((_, reject) => { - timer = setTimeout(() => reject(new Error(message)), timeoutMs); - }); - try { - return (await Promise.race([promise, timeoutPromise])) as T; - } finally { - if (timer) { - clearTimeout(timer); - } - } - } - - private async withBatchFailureLock(fn: () => Promise): Promise { - let release: () => void; - const wait = this.batchFailureLock; - this.batchFailureLock = new Promise((resolve) => { - release = resolve; - }); - await wait; - try { - return await fn(); - } finally { - release!(); - } - } - - private async resetBatchFailureCount(): Promise { - await this.withBatchFailureLock(async () => { - if (this.batchFailureCount > 0) { - log.debug("memory embeddings: batch recovered; resetting failure count"); - } - this.batchFailureCount = 0; - this.batchFailureLastError = undefined; - this.batchFailureLastProvider = undefined; - }); - } - - private async recordBatchFailure(params: { - provider: string; - message: string; - attempts?: number; - forceDisable?: boolean; - }): Promise<{ disabled: boolean; count: number }> { - return await this.withBatchFailureLock(async () => { - if (!this.batch.enabled) { - return { disabled: true, count: this.batchFailureCount }; - } - const increment = params.forceDisable - ? BATCH_FAILURE_LIMIT - : Math.max(1, params.attempts ?? 1); - this.batchFailureCount += increment; - this.batchFailureLastError = params.message; - this.batchFailureLastProvider = params.provider; - const disabled = params.forceDisable || this.batchFailureCount >= BATCH_FAILURE_LIMIT; - if (disabled) { - this.batch.enabled = false; - } - return { disabled, count: this.batchFailureCount }; - }); - } - - private isBatchTimeoutError(message: string): boolean { - return /timed out|timeout/i.test(message); - } - - private async runBatchWithTimeoutRetry(params: { - provider: string; - run: () => Promise; - }): Promise { - try { - return await params.run(); - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - if (this.isBatchTimeoutError(message)) { - log.warn(`memory embeddings: ${params.provider} batch timed out; retrying once`); - try { - return await params.run(); - } catch (retryErr) { - (retryErr as { batchAttempts?: number }).batchAttempts = 2; - throw retryErr; - } - } - throw err; - } - } - - private async runBatchWithFallback(params: { - provider: string; - run: () => Promise; - fallback: () => Promise; - }): Promise { - if (!this.batch.enabled) { - return await params.fallback(); - } - try { - const result = await this.runBatchWithTimeoutRetry({ - provider: params.provider, - run: params.run, - }); - await this.resetBatchFailureCount(); - return result; - } catch (err) { - const message = err instanceof Error ? err.message : String(err); - const attempts = (err as { batchAttempts?: number }).batchAttempts ?? 1; - const forceDisable = /asyncBatchEmbedContent not available/i.test(message); - const failure = await this.recordBatchFailure({ - provider: params.provider, - message, - attempts, - forceDisable, - }); - const suffix = failure.disabled ? "disabling batch" : "keeping batch enabled"; - log.warn( - `memory embeddings: ${params.provider} batch failed (${failure.count}/${BATCH_FAILURE_LIMIT}); ${suffix}; falling back to non-batch embeddings: ${message}`, - ); - return await params.fallback(); - } - } - - private getIndexConcurrency(): number { - return this.batch.enabled ? this.batch.concurrency : EMBEDDING_INDEX_CONCURRENCY; - } - - private async indexFile( - entry: MemoryFileEntry | SessionFileEntry, - options: { source: MemorySource; content?: string }, - ) { - const content = options.content ?? (await fs.readFile(entry.absPath, "utf-8")); - const chunks = enforceEmbeddingMaxInputTokens( - this.provider, - chunkMarkdown(content, this.settings.chunking).filter( - (chunk) => chunk.text.trim().length > 0, - ), - ); - if (options.source === "sessions" && "lineMap" in entry) { - remapChunkLines(chunks, entry.lineMap); - } - const embeddings = this.batch.enabled - ? await this.embedChunksWithBatch(chunks, entry, options.source) - : await this.embedChunksInBatches(chunks); - const sample = embeddings.find((embedding) => embedding.length > 0); - const vectorReady = sample ? await this.ensureVectorReady(sample.length) : false; - const now = Date.now(); - if (vectorReady) { - try { - this.db - .prepare( - `DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`, - ) - .run(entry.path, options.source); - } catch {} - } - if (this.fts.enabled && this.fts.available) { - try { - this.db - .prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`) - .run(entry.path, options.source, this.provider.model); - } catch {} - } - this.db - .prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`) - .run(entry.path, options.source); - for (let i = 0; i < chunks.length; i++) { - const chunk = chunks[i]; - const embedding = embeddings[i] ?? []; - const id = hashText( - `${options.source}:${entry.path}:${chunk.startLine}:${chunk.endLine}:${chunk.hash}:${this.provider.model}`, - ); - this.db - .prepare( - `INSERT INTO chunks (id, path, source, start_line, end_line, hash, model, text, embedding, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - ON CONFLICT(id) DO UPDATE SET - hash=excluded.hash, - model=excluded.model, - text=excluded.text, - embedding=excluded.embedding, - updated_at=excluded.updated_at`, - ) - .run( - id, - entry.path, - options.source, - chunk.startLine, - chunk.endLine, - chunk.hash, - this.provider.model, - chunk.text, - JSON.stringify(embedding), - now, - ); - if (vectorReady && embedding.length > 0) { - try { - this.db.prepare(`DELETE FROM ${VECTOR_TABLE} WHERE id = ?`).run(id); - } catch {} - this.db - .prepare(`INSERT INTO ${VECTOR_TABLE} (id, embedding) VALUES (?, ?)`) - .run(id, vectorToBlob(embedding)); - } - if (this.fts.enabled && this.fts.available) { - this.db - .prepare( - `INSERT INTO ${FTS_TABLE} (text, id, path, source, model, start_line, end_line)\n` + - ` VALUES (?, ?, ?, ?, ?, ?, ?)`, - ) - .run( - chunk.text, - id, - entry.path, - options.source, - this.provider.model, - chunk.startLine, - chunk.endLine, - ); - } - } - this.db - .prepare( - `INSERT INTO files (path, source, hash, mtime, size) VALUES (?, ?, ?, ?, ?) - ON CONFLICT(path) DO UPDATE SET - source=excluded.source, - hash=excluded.hash, - mtime=excluded.mtime, - size=excluded.size`, - ) - .run(entry.path, options.source, entry.hash, entry.mtimeMs, entry.size); } } + +applyPrototypeMixins(MemoryIndexManager.prototype, memoryManagerSyncOps, memoryManagerEmbeddingOps);