diff --git a/CHANGELOG.md b/CHANGELOG.md
index c77a0b2bc21..0cb82299cb3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -36,6 +36,7 @@ Docs: https://docs.clawd.bot
 - Agents: propagate accountId into embedded runs so sub-agent announce routing honors the originating account. (#1058)
 - Compaction: include tool failure summaries in safeguard compaction to prevent retry loops. (#1084)
 - Daemon: include HOME in service environments to avoid missing HOME errors. (#1214) — thanks @ameno-.
+- Memory: show total file counts + scan issues in `clawdbot memory status`; fall back to non-batch embeddings after repeated batch failures.
 - TUI: show generic empty-state text for searchable pickers. (#1201) — thanks @vignesh07.
 - Doctor: canonicalize legacy session keys in session stores to prevent stale metadata. (#1169)
 - CLI: centralize CLI command registration to keep fast-path routing and program wiring in sync. (#1207) — thanks @gumadeiras.
diff --git a/src/cli/memory-cli.ts b/src/cli/memory-cli.ts
index b263d0b89ec..e890ce4e3d4 100644
--- a/src/cli/memory-cli.ts
+++ b/src/cli/memory-cli.ts
@@ -1,3 +1,5 @@
+import fsSync from "node:fs";
+import fs from "node:fs/promises";
 import os from "node:os";
 import path from "node:path";
 
@@ -5,10 +7,12 @@ import type { Command } from "commander";
 
 import { resolveDefaultAgentId } from "../agents/agent-scope.js";
 import { loadConfig } from "../config/config.js";
+import { resolveSessionTranscriptsDirForAgent } from "../config/sessions/paths.js";
 import { setVerbose } from "../globals.js";
 import { withProgress, withProgressTotals } from "./progress.js";
 import { formatErrorMessage, withManager } from "./cli-utils.js";
 import { getMemorySearchManager, type MemorySearchManagerResult } from "../memory/index.js";
+import { listMemoryFiles } from "../memory/internal.js";
 import { defaultRuntime } from "../runtime.js";
 import { formatDocsLink } from "../terminal/links.js";
 import { colorize, isRich, theme } from "../terminal/theme.js";
@@ -24,6 +28,20 @@ type MemoryCommandOptions = {
 
 type MemoryManager = NonNullable<MemorySearchManagerResult["manager"]>;
 
+type MemorySourceName = "memory" | "sessions";
+
+type SourceScan = {
+  source: MemorySourceName;
+  totalFiles: number | null;
+  issues: string[];
+};
+
+type MemorySourceScan = {
+  sources: SourceScan[];
+  totalFiles: number | null;
+  issues: string[];
+};
+
 function formatSourceLabel(source: string, workspaceDir: string, agentId: string): string {
   if (source === "memory") {
     return `memory (MEMORY.md + ${path.join(workspaceDir, "memory")}${path.sep}*.md)`;
@@ -51,6 +69,117 @@ function resolveAgentIds(cfg: ReturnType<typeof loadConfig>, agent?: string): st
   return [resolveDefaultAgentId(cfg)];
 }
 
+async function checkReadableFile(pathname: string): Promise<{ exists: boolean; issue?: string }> {
+  try {
+    await fs.access(pathname, fsSync.constants.R_OK);
+    return { exists: true };
+  } catch (err) {
+    const code = (err as NodeJS.ErrnoException).code;
+    if (code === "ENOENT") return { exists: false };
+    return { exists: true, issue: `${pathname} not readable (${code ?? "error"})` };
"error"})` }; + } +} + +async function scanSessionFiles(agentId: string): Promise { + const issues: string[] = []; + const sessionsDir = resolveSessionTranscriptsDirForAgent(agentId); + try { + const entries = await fs.readdir(sessionsDir, { withFileTypes: true }); + const totalFiles = entries.filter((entry) => entry.isFile() && entry.name.endsWith(".jsonl")) + .length; + return { source: "sessions", totalFiles, issues }; + } catch (err) { + const code = (err as NodeJS.ErrnoException).code; + if (code === "ENOENT") { + issues.push(`sessions directory missing (${sessionsDir})`); + return { source: "sessions", totalFiles: 0, issues }; + } + issues.push(`sessions directory not accessible (${sessionsDir}): ${code ?? "error"}`); + return { source: "sessions", totalFiles: null, issues }; + } +} + +async function scanMemoryFiles(workspaceDir: string): Promise { + const issues: string[] = []; + const memoryFile = path.join(workspaceDir, "MEMORY.md"); + const altMemoryFile = path.join(workspaceDir, "memory.md"); + const memoryDir = path.join(workspaceDir, "memory"); + + const primary = await checkReadableFile(memoryFile); + const alt = await checkReadableFile(altMemoryFile); + if (primary.issue) issues.push(primary.issue); + if (alt.issue) issues.push(alt.issue); + + let dirReadable: boolean | null = null; + try { + await fs.access(memoryDir, fsSync.constants.R_OK); + dirReadable = true; + } catch (err) { + const code = (err as NodeJS.ErrnoException).code; + if (code === "ENOENT") { + issues.push(`memory directory missing (${memoryDir})`); + dirReadable = false; + } else { + issues.push(`memory directory not accessible (${memoryDir}): ${code ?? "error"}`); + dirReadable = null; + } + } + + let listed: string[] = []; + let listedOk = false; + try { + listed = await listMemoryFiles(workspaceDir); + listedOk = true; + } catch (err) { + const code = (err as NodeJS.ErrnoException).code; + if (dirReadable !== null) { + issues.push(`memory directory scan failed (${memoryDir}): ${code ?? "error"}`); + dirReadable = null; + } + } + + let totalFiles: number | null = 0; + if (dirReadable === null) { + totalFiles = null; + } else { + const files = new Set(listedOk ? listed : []); + if (!listedOk) { + if (primary.exists) files.add(memoryFile); + if (alt.exists) files.add(altMemoryFile); + } + totalFiles = files.size; + } + + if ((totalFiles ?? 0) === 0 && issues.length === 0) { + issues.push(`no memory files found in ${workspaceDir}`); + } + + return { source: "memory", totalFiles, issues }; +} + +async function scanMemorySources(params: { + workspaceDir: string; + agentId: string; + sources: MemorySourceName[]; +}): Promise { + const scans: SourceScan[] = []; + for (const source of params.sources) { + if (source === "memory") { + scans.push(await scanMemoryFiles(params.workspaceDir)); + } + if (source === "sessions") { + scans.push(await scanSessionFiles(params.agentId)); + } + } + const issues = scans.flatMap((scan) => scan.issues); + const totals = scans.map((scan) => scan.totalFiles); + const numericTotals = totals.filter((total): total is number => total !== null); + const totalFiles = totals.some((total) => total === null) + ? 
+    : numericTotals.reduce((sum, total) => sum + total, 0);
+  return { sources: scans, totalFiles, issues };
+}
+
 export async function runMemoryStatus(opts: MemoryCommandOptions) {
   setVerbose(Boolean(opts.verbose));
   const cfg = loadConfig();
@@ -60,6 +189,7 @@ export async function runMemoryStatus(opts: MemoryCommandOptions) {
     status: ReturnType<MemoryManager["status"]>;
     embeddingProbe?: Awaited>;
     indexError?: string;
+    scan?: MemorySourceScan;
   }> = [];
 
   for (const agentId of agentIds) {
@@ -116,7 +246,13 @@ export async function runMemoryStatus(opts: MemoryCommandOptions) {
          await manager.probeVectorAvailability();
        }
        const status = manager.status();
-        allResults.push({ agentId, status, embeddingProbe, indexError });
+        const sources = (status.sources?.length ? status.sources : ["memory"]) as MemorySourceName[];
+        const scan = await scanMemorySources({
+          workspaceDir: status.workspaceDir,
+          agentId,
+          sources,
+        });
+        allResults.push({ agentId, status, embeddingProbe, indexError, scan });
      },
    });
  }
@@ -136,7 +272,12 @@ export async function runMemoryStatus(opts: MemoryCommandOptions) {
   const label = (text: string) => muted(`${text}:`);
 
   for (const result of allResults) {
-    const { agentId, status, embeddingProbe, indexError } = result;
+    const { agentId, status, embeddingProbe, indexError, scan } = result;
+    const totalFiles = scan?.totalFiles ?? null;
+    const indexedLabel =
+      totalFiles === null
+        ? `${status.files}/? files · ${status.chunks} chunks`
+        : `${status.files}/${totalFiles} files · ${status.chunks} chunks`;
     if (opts.index) {
       const line = indexError ? `Memory index failed: ${indexError}` : "Memory index complete.";
       defaultRuntime.log(line);
@@ -148,7 +289,7 @@ export async function runMemoryStatus(opts: MemoryCommandOptions) {
       )}`,
       `${label("Model")} ${info(status.model)}`,
       status.sources?.length ? `${label("Sources")} ${info(status.sources.join(", "))}` : null,
-      `${label("Indexed")} ${success(`${status.files} files · ${status.chunks} chunks`)}`,
+      `${label("Indexed")} ${success(indexedLabel)}`,
       `${label("Dirty")} ${status.dirty ? warn("yes") : muted("no")}`,
       `${label("Store")} ${info(status.dbPath)}`,
       `${label("Workspace")} ${info(status.workspaceDir)}`,
@@ -164,7 +305,12 @@ export async function runMemoryStatus(opts: MemoryCommandOptions) {
     if (status.sourceCounts?.length) {
       lines.push(label("By source"));
       for (const entry of status.sourceCounts) {
-        const counts = `${entry.files} files · ${entry.chunks} chunks`;
+        const total = scan?.sources.find((scanEntry) => scanEntry.source === entry.source)
+          ?.totalFiles;
+        const counts =
+          total === null
+            ? `${entry.files}/? files · ${entry.chunks} chunks`
+            : `${entry.files}/${total} files · ${entry.chunks} chunks`;
         lines.push(`  ${accent(entry.source)} ${muted("·")} ${muted(counts)}`);
       }
     }
@@ -223,12 +369,27 @@ export async function runMemoryStatus(opts: MemoryCommandOptions) {
         lines.push(`${label("Cache cap")} ${info(String(status.cache.maxEntries))}`);
       }
     }
+    if (status.batch) {
+      const batchState = status.batch.enabled ? "enabled" : "disabled";
+      const batchColor = status.batch.enabled ? theme.success : theme.warn;
+      const batchSuffix = ` (failures ${status.batch.failures}/${status.batch.limit})`;
+      lines.push(`${label("Batch")} ${colorize(rich, batchColor, batchState)}${muted(batchSuffix)}`);
+      if (status.batch.lastError) {
+        lines.push(`${label("Batch error")} ${warn(status.batch.lastError)}`);
+      }
+    }
     if (status.fallback?.reason) {
       lines.push(muted(status.fallback.reason));
     }
     if (indexError) {
       lines.push(`${label("Index error")} ${warn(indexError)}`);
     }
+    if (scan?.issues.length) {
+      lines.push(label("Issues"));
+      for (const issue of scan.issues) {
+        lines.push(`  ${warn(issue)}`);
+      }
+    }
     defaultRuntime.log(lines.join("\n"));
     defaultRuntime.log("");
   }
diff --git a/src/memory/internal.ts b/src/memory/internal.ts
index e53c48982bc..b68570c3592 100644
--- a/src/memory/internal.ts
+++ b/src/memory/internal.ts
@@ -70,7 +70,19 @@ export async function listMemoryFiles(workspaceDir: string): Promise<string[]> {
   if (await exists(memoryDir)) {
     await walkDir(memoryDir, result);
   }
-  return result;
+  if (result.length <= 1) return result;
+  const seen = new Set<string>();
+  const deduped: string[] = [];
+  for (const entry of result) {
+    let key = entry;
+    try {
+      key = await fs.realpath(entry);
+    } catch {}
+    if (seen.has(key)) continue;
+    seen.add(key);
+    deduped.push(entry);
+  }
+  return deduped;
 }
 
 export function hashText(value: string): string {
diff --git a/src/memory/manager.batch.test.ts b/src/memory/manager.batch.test.ts
index e1c76d80ba3..a3f48936e6a 100644
--- a/src/memory/manager.batch.test.ts
+++ b/src/memory/manager.batch.test.ts
@@ -34,6 +34,9 @@ describe("memory indexing with OpenAI batches", () => {
   beforeEach(async () => {
     embedBatch.mockClear();
     embedQuery.mockClear();
+    embedBatch.mockImplementation(async (texts: string[]) =>
+      texts.map((_text, index) => [index + 1, 0, 0]),
+    );
     workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "clawdbot-mem-batch-"));
    indexPath = path.join(workspaceDir, "index.sqlite");
    await fs.mkdir(path.join(workspaceDir, "memory"));
@@ -246,4 +249,218 @@ describe("memory indexing with OpenAI batches", () => {
     expect(status.chunks).toBeGreaterThan(0);
     expect(batchCreates).toBe(2);
   });
+
+  it("falls back to non-batch on failure and resets failures after success", async () => {
+    const content = ["flaky", "batch"].join("\n\n");
+    await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-09.md"), content);
+
+    let uploadedRequests: Array<{ custom_id?: string }> = [];
+    let mode: "fail" | "ok" = "fail";
+    const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
+      const url =
+        typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
+      if (url.endsWith("/files")) {
+        const body = init?.body;
+        if (!(body instanceof FormData)) {
+          throw new Error("expected FormData upload");
+        }
+        for (const [key, value] of body.entries()) {
+          if (key !== "file") continue;
+          if (typeof value === "string") {
+            uploadedRequests = value
+              .split("\n")
+              .filter(Boolean)
+              .map((line) => JSON.parse(line) as { custom_id?: string });
+          } else {
+            const text = await value.text();
+            uploadedRequests = text
+              .split("\n")
+              .filter(Boolean)
+              .map((line) => JSON.parse(line) as { custom_id?: string });
+          }
+        }
+        return new Response(JSON.stringify({ id: "file_1" }), {
+          status: 200,
+          headers: { "Content-Type": "application/json" },
+        });
+      }
+      if (url.endsWith("/batches")) {
+        if (mode === "fail") {
+          return new Response("batch failed", { status: 500 });
+        }
+        return new Response(JSON.stringify({ id: "batch_1", status: "in_progress" }), {
+          status: 200,
+          headers: { "Content-Type": "application/json" },
+        });
+      }
+      if (url.endsWith("/batches/batch_1")) {
+        return new Response(
+          JSON.stringify({ id: "batch_1", status: "completed", output_file_id: "file_out" }),
+          { status: 200, headers: { "Content-Type": "application/json" } },
+        );
+      }
+      if (url.endsWith("/files/file_out/content")) {
+        const lines = uploadedRequests.map((request, index) =>
+          JSON.stringify({
+            custom_id: request.custom_id,
+            response: {
+              status_code: 200,
+              body: { data: [{ embedding: [index + 1, 0, 0], index: 0 }] },
+            },
+          }),
+        );
+        return new Response(lines.join("\n"), {
+          status: 200,
+          headers: { "Content-Type": "application/jsonl" },
+        });
+      }
+      throw new Error(`unexpected fetch ${url}`);
+    });
+
+    vi.stubGlobal("fetch", fetchMock);
+
+    const cfg = {
+      agents: {
+        defaults: {
+          workspace: workspaceDir,
+          memorySearch: {
+            provider: "openai",
+            model: "text-embedding-3-small",
+            store: { path: indexPath },
+            sync: { watch: false, onSessionStart: false, onSearch: false },
+            query: { minScore: 0 },
+            remote: { batch: { enabled: true, wait: true } },
+          },
+        },
+        list: [{ id: "main", default: true }],
+      },
+    };
+
+    const result = await getMemorySearchManager({ cfg, agentId: "main" });
+    expect(result.manager).not.toBeNull();
+    if (!result.manager) throw new Error("manager missing");
+    manager = result.manager;
+
+    await manager.sync({ force: true });
+    expect(embedBatch).toHaveBeenCalled();
+    let status = manager.status();
+    expect(status.batch?.enabled).toBe(true);
+    expect(status.batch?.failures).toBe(1);
+
+    embedBatch.mockClear();
+    mode = "ok";
+    await fs.writeFile(
+      path.join(workspaceDir, "memory", "2026-01-09.md"),
+      ["flaky", "batch", "recovery"].join("\n\n"),
+    );
+    await manager.sync({ force: true });
+    status = manager.status();
+    expect(status.batch?.enabled).toBe(true);
+    expect(status.batch?.failures).toBe(0);
+    expect(embedBatch).not.toHaveBeenCalled();
+  });
+
+  it("disables batch after repeated failures and skips batch thereafter", async () => {
+    const content = ["repeat", "failures"].join("\n\n");
+    await fs.writeFile(path.join(workspaceDir, "memory", "2026-01-10.md"), content);
+
+    let uploadedRequests: Array<{ custom_id?: string }> = [];
+    const fetchMock = vi.fn(async (input: RequestInfo | URL, init?: RequestInit) => {
+      const url =
+        typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
+      if (url.endsWith("/files")) {
+        const body = init?.body;
+        if (!(body instanceof FormData)) {
+          throw new Error("expected FormData upload");
+        }
+        for (const [key, value] of body.entries()) {
+          if (key !== "file") continue;
+          if (typeof value === "string") {
+            uploadedRequests = value
+              .split("\n")
+              .filter(Boolean)
+              .map((line) => JSON.parse(line) as { custom_id?: string });
+          } else {
+            const text = await value.text();
+            uploadedRequests = text
+              .split("\n")
+              .filter(Boolean)
+              .map((line) => JSON.parse(line) as { custom_id?: string });
+          }
+        }
+        return new Response(JSON.stringify({ id: "file_1" }), {
+          status: 200,
+          headers: { "Content-Type": "application/json" },
+        });
+      }
+      if (url.endsWith("/batches")) {
+        return new Response("batch failed", { status: 500 });
+      }
+      if (url.endsWith("/files/file_out/content")) {
+        const lines = uploadedRequests.map((request, index) =>
+          JSON.stringify({
+            custom_id: request.custom_id,
+            response: {
+              status_code: 200,
+              body: { data: [{ embedding: [index + 1, 0, 0], index: 0 }] },
+            },
+          }),
+        );
+        return new Response(lines.join("\n"), {
+          status: 200,
+          headers: { "Content-Type": "application/jsonl" },
+        });
+      }
+      throw new Error(`unexpected fetch ${url}`);
+    });
+
+    vi.stubGlobal("fetch", fetchMock);
+
+    const cfg = {
+      agents: {
+        defaults: {
+          workspace: workspaceDir,
+          memorySearch: {
+            provider: "openai",
+            model: "text-embedding-3-small",
+            store: { path: indexPath },
+            sync: { watch: false, onSessionStart: false, onSearch: false },
+            query: { minScore: 0 },
+            remote: { batch: { enabled: true, wait: true } },
+          },
+        },
+        list: [{ id: "main", default: true }],
+      },
+    };
+
+    const result = await getMemorySearchManager({ cfg, agentId: "main" });
+    expect(result.manager).not.toBeNull();
+    if (!result.manager) throw new Error("manager missing");
+    manager = result.manager;
+
+    await manager.sync({ force: true });
+    let status = manager.status();
+    expect(status.batch?.enabled).toBe(true);
+    expect(status.batch?.failures).toBe(1);
+
+    embedBatch.mockClear();
+    await fs.writeFile(
+      path.join(workspaceDir, "memory", "2026-01-10.md"),
+      ["repeat", "failures", "again"].join("\n\n"),
+    );
+    await manager.sync({ force: true });
+    status = manager.status();
+    expect(status.batch?.enabled).toBe(false);
+    expect(status.batch?.failures).toBeGreaterThanOrEqual(2);
+
+    const fetchCalls = fetchMock.mock.calls.length;
+    embedBatch.mockClear();
+    await fs.writeFile(
+      path.join(workspaceDir, "memory", "2026-01-10.md"),
+      ["repeat", "failures", "fallback"].join("\n\n"),
+    );
+    await manager.sync({ force: true });
+    expect(fetchMock.mock.calls.length).toBe(fetchCalls);
+    expect(embedBatch).toHaveBeenCalled();
+  });
 });
diff --git a/src/memory/manager.ts b/src/memory/manager.ts
index b94e5f3de73..771b5eeab1e 100644
--- a/src/memory/manager.ts
+++ b/src/memory/manager.ts
@@ -100,6 +100,7 @@ const EMBEDDING_INDEX_CONCURRENCY = 4;
 const EMBEDDING_RETRY_MAX_ATTEMPTS = 3;
 const EMBEDDING_RETRY_BASE_DELAY_MS = 500;
 const EMBEDDING_RETRY_MAX_DELAY_MS = 8000;
+const BATCH_FAILURE_LIMIT = 2;
 
 const log = createSubsystemLogger("memory");
 
@@ -127,6 +128,10 @@ export class MemoryIndexManager {
     pollIntervalMs: number;
     timeoutMs: number;
   };
+  private batchFailureCount = 0;
+  private batchFailureLastError?: string;
+  private batchFailureLastProvider?: string;
+  private batchFailureLock: Promise<void> = Promise.resolve();
   private db: DatabaseSync;
   private readonly sources: Set<MemorySource>;
   private providerKey: string;
@@ -419,6 +424,17 @@ export class MemoryIndexManager {
       loadError?: string;
       dims?: number;
     };
+    batch?: {
+      enabled: boolean;
+      failures: number;
+      limit: number;
+      wait: boolean;
+      concurrency: number;
+      pollIntervalMs: number;
+      timeoutMs: number;
+      lastError?: string;
+      lastProvider?: string;
+    };
   } {
     const sourceFilter = this.buildSourceFilter();
     const files = this.db
@@ -498,6 +514,17 @@ export class MemoryIndexManager {
         loadError: this.vector.loadError,
         dims: this.vector.dims,
       },
+      batch: {
+        enabled: this.batch.enabled,
+        failures: this.batchFailureCount,
+        limit: BATCH_FAILURE_LIMIT,
+        wait: this.batch.wait,
+        concurrency: this.batch.concurrency,
+        pollIntervalMs: this.batch.pollIntervalMs,
+        timeoutMs: this.batch.timeoutMs,
+        lastError: this.batchFailureLastError,
+        lastProvider: this.batchFailureLastProvider,
+      },
     };
   }
 
@@ -1538,7 +1565,8 @@ export class MemoryIndexManager {
     entry: MemoryFileEntry | SessionFileEntry,
     source: MemorySource,
   ): Promise<number[][]> {
-    if (!this.openAi) {
+    const openAi = this.openAi;
+    if (!openAi) {
       return this.embedChunksInBatches(chunks);
     }
     if (chunks.length === 0) return [];
@@ -1576,16 +1604,23 @@ export class MemoryIndexManager {
         },
       });
     }
-    const byCustomId = await runOpenAiEmbeddingBatches({
-      openAi: this.openAi,
-      agentId: this.agentId,
-      requests,
-      wait: this.batch.wait,
-      concurrency: this.batch.concurrency,
-      pollIntervalMs: this.batch.pollIntervalMs,
-      timeoutMs: this.batch.timeoutMs,
-      debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }),
+    const batchResult = await this.runBatchWithFallback({
+      provider: "openai",
+      run: async () =>
+        await runOpenAiEmbeddingBatches({
+          openAi,
+          agentId: this.agentId,
+          requests,
+          wait: this.batch.wait,
+          concurrency: this.batch.concurrency,
+          pollIntervalMs: this.batch.pollIntervalMs,
+          timeoutMs: this.batch.timeoutMs,
+          debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }),
+        }),
+      fallback: async () => await this.embedChunksInBatches(chunks),
     });
+    if (Array.isArray(batchResult)) return batchResult;
+    const byCustomId = batchResult;
 
     const toCache: Array<{ hash: string; embedding: number[] }> = [];
     for (const [customId, embedding] of byCustomId.entries()) {
@@ -1603,7 +1638,8 @@ export class MemoryIndexManager {
     entry: MemoryFileEntry | SessionFileEntry,
     source: MemorySource,
   ): Promise<number[][]> {
-    if (!this.gemini) {
+    const gemini = this.gemini;
+    if (!gemini) {
       return this.embedChunksInBatches(chunks);
     }
     if (chunks.length === 0) return [];
@@ -1638,16 +1674,23 @@ export class MemoryIndexManager {
       });
     }
 
-    const byCustomId = await runGeminiEmbeddingBatches({
-      gemini: this.gemini,
-      agentId: this.agentId,
-      requests,
-      wait: this.batch.wait,
-      concurrency: this.batch.concurrency,
-      pollIntervalMs: this.batch.pollIntervalMs,
-      timeoutMs: this.batch.timeoutMs,
-      debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }),
+    const batchResult = await this.runBatchWithFallback({
+      provider: "gemini",
+      run: async () =>
+        await runGeminiEmbeddingBatches({
+          gemini,
+          agentId: this.agentId,
+          requests,
+          wait: this.batch.wait,
+          concurrency: this.batch.concurrency,
+          pollIntervalMs: this.batch.pollIntervalMs,
+          timeoutMs: this.batch.timeoutMs,
+          debug: (message, data) => log.debug(message, { ...data, source, chunks: chunks.length }),
+        }),
+      fallback: async () => await this.embedChunksInBatches(chunks),
     });
+    if (Array.isArray(batchResult)) return batchResult;
+    const byCustomId = batchResult;
 
     const toCache: Array<{ hash: string; embedding: number[] }> = [];
     for (const [customId, embedding] of byCustomId.entries()) {
@@ -1717,6 +1760,111 @@ export class MemoryIndexManager {
     return results;
   }
 
+  private async withBatchFailureLock<T>(fn: () => Promise<T>): Promise<T> {
+    let release: () => void;
+    const wait = this.batchFailureLock;
+    this.batchFailureLock = new Promise<void>((resolve) => {
+      release = resolve;
+    });
+    await wait;
+    try {
+      return await fn();
+    } finally {
+      release!();
+    }
+  }
+
+  private async resetBatchFailureCount(): Promise<void> {
+    await this.withBatchFailureLock(async () => {
+      if (this.batchFailureCount > 0) {
+        log.debug("memory embeddings: batch recovered; resetting failure count");
+      }
+      this.batchFailureCount = 0;
+      this.batchFailureLastError = undefined;
+      this.batchFailureLastProvider = undefined;
+    });
+  }
+
+  private async recordBatchFailure(params: {
+    provider: string;
+    message: string;
+    attempts?: number;
+    forceDisable?: boolean;
+  }): Promise<{ disabled: boolean; count: number }> {
+    return await this.withBatchFailureLock(async () => {
+      if (!this.batch.enabled) {
+        return { disabled: true, count: this.batchFailureCount };
+      }
+      const increment = params.forceDisable ? BATCH_FAILURE_LIMIT : Math.max(1, params.attempts ?? 1);
+      this.batchFailureCount += increment;
+      this.batchFailureLastError = params.message;
+      this.batchFailureLastProvider = params.provider;
+      const disabled = params.forceDisable || this.batchFailureCount >= BATCH_FAILURE_LIMIT;
+      if (disabled) {
+        this.batch.enabled = false;
+      }
+      return { disabled, count: this.batchFailureCount };
+    });
+  }
+
+  private isBatchTimeoutError(message: string): boolean {
+    return /timed out|timeout/i.test(message);
+  }
+
+  private async runBatchWithTimeoutRetry<T>(params: {
+    provider: string;
+    run: () => Promise<T>;
+  }): Promise<T> {
+    try {
+      return await params.run();
+    } catch (err) {
+      const message = err instanceof Error ? err.message : String(err);
+      if (this.isBatchTimeoutError(message)) {
+        log.warn(`memory embeddings: ${params.provider} batch timed out; retrying once`);
+        try {
+          return await params.run();
+        } catch (retryErr) {
+          (retryErr as { batchAttempts?: number }).batchAttempts = 2;
+          throw retryErr;
+        }
+      }
+      throw err;
+    }
+  }
+
+  private async runBatchWithFallback<TBatch, TFallback>(params: {
+    provider: string;
+    run: () => Promise<TBatch>;
+    fallback: () => Promise<TFallback>;
+  }): Promise<TBatch | TFallback> {
+    if (!this.batch.enabled) {
+      return await params.fallback();
+    }
+    try {
+      const result = await this.runBatchWithTimeoutRetry({
+        provider: params.provider,
+        run: params.run,
+      });
+      await this.resetBatchFailureCount();
+      return result;
+    } catch (err) {
+      const message = err instanceof Error ? err.message : String(err);
+      const attempts = (err as { batchAttempts?: number }).batchAttempts ?? 1;
+      const forceDisable = /asyncBatchEmbedContent not available/i.test(message);
+      const failure = await this.recordBatchFailure({
+        provider: params.provider,
+        message,
+        attempts,
+        forceDisable,
+      });
+      const suffix = failure.disabled ? "disabling batch" : "keeping batch enabled";
+      log.warn(
+        `memory embeddings: ${params.provider} batch failed (${failure.count}/${BATCH_FAILURE_LIMIT}); ${suffix}; falling back to non-batch embeddings: ${message}`,
+      );
+      return await params.fallback();
+    }
+  }
+
   private getIndexConcurrency(): number {
     return this.batch.enabled ? this.batch.concurrency : EMBEDDING_INDEX_CONCURRENCY;
   }