mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-06 15:18:58 +00:00
fix(sqlite): bound WAL sidecar growth
This commit is contained in:
@@ -32,6 +32,7 @@ Docs: https://docs.openclaw.ai
|
||||
- CLI/doctor: stream bundled plugin runtime dependency repair progress before, during, and after npm installs, so long `doctor --fix` runs no longer look hung in TTY or captured logs. Fixes #72775. Thanks @dfpalhano.
|
||||
- Memory-core: re-resolve the active runtime config whenever `memory_search` or `memory_get` executes, so provider changes made by `config.patch` stop leaving stale embedding backends behind in existing tool instances. Fixes #61098. Thanks @BradGroux and @Linux2010.
|
||||
- WebChat: keep bare `/new` and `/reset` startup instructions out of visible chat history while preserving `/reset <note>` as user-visible transcript text. Fixes #72369. Thanks @collynes and @haishmg.
|
||||
- Tasks/memory: checkpoint and truncate SQLite WAL sidecars on a timer and before close for task, Task Flow, proxy capture, and builtin memory databases, bounding long-running gateway `*.sqlite-wal` growth. Fixes #72774. Thanks @dfpalhano.
|
||||
- CLI/doctor: remove dangling channel config, heartbeat targets, and channel model overrides when stale plugin repair removes a missing channel plugin, preventing Gateway boot loops after failed plugin reinstalls. Fixes #65293. Thanks @yidecode.
|
||||
- Control UI/Gateway: cache, coalesce, stale-refresh, and invalidate effective tool inventory on channel registry changes while reusing the gateway-bound plugin registry and avoiding model/auth discovery, so chat runs no longer stall Control UI requests on repeated plugin/model setup. Fixes #72365; supersedes #72558. Thanks @Gabiii2398 and @1yihui.
|
||||
- Channels/setup: treat bundled channel plugins as already bundled during `channels add` and onboarding, enabling them without writing redundant `plugins.load.paths` entries or path install records. Fixes #72740. Thanks @iCodePoet.
|
||||
|
||||
@@ -116,6 +116,9 @@ Example: three independent cron jobs that together form a "morning ops" routine.
|
||||
## Durable state and revision tracking
|
||||
|
||||
Each flow persists its own state and tracks revisions so progress survives gateway restarts. Revision tracking enables conflict detection when multiple sources attempt to advance the same flow concurrently.
|
||||
The flow registry uses SQLite with bounded write-ahead-log maintenance, including
|
||||
periodic and shutdown checkpoints, so long-running gateways do not retain
|
||||
unbounded `registry.sqlite-wal` sidecar files.
|
||||
|
||||
## Cancel behavior
|
||||
|
||||
|
||||
@@ -305,6 +305,8 @@ $OPENCLAW_STATE_DIR/tasks/runs.sqlite
|
||||
```
|
||||
|
||||
The registry loads into memory at gateway start and syncs writes to SQLite for durability across restarts.
|
||||
The Gateway keeps the SQLite write-ahead log bounded by using SQLite's default
|
||||
autocheckpoint threshold plus periodic and shutdown `TRUNCATE` checkpoints.
|
||||
|
||||
### Automatic maintenance
|
||||
|
||||
|
||||
@@ -78,6 +78,8 @@ OpenClaw indexes `MEMORY.md` and `memory/*.md` into chunks (~400 tokens with
|
||||
80-token overlap) and stores them in a per-agent SQLite database.
|
||||
|
||||
- **Index location:** `~/.openclaw/memory/<agentId>.sqlite`
|
||||
- **Storage maintenance:** SQLite WAL sidecars are bounded with periodic and
|
||||
shutdown checkpoints.
|
||||
- **File watching:** changes to memory files trigger a debounced reindex (1.5s).
|
||||
- **Auto-reindex:** when the embedding provider, model, or chunking config
|
||||
changes, the entire index is rebuilt automatically.
|
||||
|
||||
@@ -1,15 +1,26 @@
|
||||
import path from "node:path";
|
||||
import type { DatabaseSync } from "node:sqlite";
|
||||
import { ensureDir, requireNodeSqlite } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
|
||||
import {
|
||||
closeMemorySqliteWalMaintenance,
|
||||
configureMemorySqliteWalMaintenance,
|
||||
ensureDir,
|
||||
requireNodeSqlite,
|
||||
} from "openclaw/plugin-sdk/memory-core-host-engine-storage";
|
||||
|
||||
export function openMemoryDatabaseAtPath(dbPath: string, allowExtension: boolean): DatabaseSync {
|
||||
const dir = path.dirname(dbPath);
|
||||
ensureDir(dir);
|
||||
const { DatabaseSync } = requireNodeSqlite();
|
||||
const db = new DatabaseSync(dbPath, { allowExtension });
|
||||
configureMemorySqliteWalMaintenance(db);
|
||||
// busy_timeout is per-connection and resets to 0 on restart.
|
||||
// Set it on every open so concurrent processes retry instead of
|
||||
// failing immediately with SQLITE_BUSY.
|
||||
db.exec("PRAGMA busy_timeout = 5000");
|
||||
return db;
|
||||
}
|
||||
|
||||
export function closeMemoryDatabase(db: DatabaseSync): void {
|
||||
closeMemorySqliteWalMaintenance(db);
|
||||
db.close();
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ export type MemoryReadonlyRecoveryState = {
|
||||
progress?: (update: MemorySyncProgressUpdate) => void;
|
||||
}) => Promise<void>;
|
||||
openDatabase: () => DatabaseSync;
|
||||
closeDatabase: (db: DatabaseSync) => void;
|
||||
resetVectorState: () => void;
|
||||
ensureSchema: () => void;
|
||||
readMeta: () => { vectorDims?: number } | undefined;
|
||||
@@ -100,7 +101,7 @@ export async function runMemorySyncWithReadonlyRecovery(
|
||||
state.readonlyRecoveryLastError = reason;
|
||||
log.warn(`memory sync readonly handle detected; reopening sqlite connection`, { reason });
|
||||
try {
|
||||
state.db.close();
|
||||
state.closeDatabase(state.db);
|
||||
} catch {}
|
||||
const previousVectorDims = state.vector.dims;
|
||||
state.db = state.openDatabase();
|
||||
|
||||
@@ -41,7 +41,7 @@ import {
|
||||
type EmbeddingProviderRuntime,
|
||||
} from "./embeddings.js";
|
||||
import { runMemoryAtomicReindex } from "./manager-atomic-reindex.js";
|
||||
import { openMemoryDatabaseAtPath } from "./manager-db.js";
|
||||
import { closeMemoryDatabase, openMemoryDatabaseAtPath } from "./manager-db.js";
|
||||
import {
|
||||
applyMemoryFallbackProviderState,
|
||||
resolveMemoryFallbackProviderRequest,
|
||||
@@ -1204,8 +1204,8 @@ export abstract class MemoryManagerSyncOps {
|
||||
this.writeMeta(meta);
|
||||
this.pruneEmbeddingCacheIfNeeded?.();
|
||||
|
||||
this.db.close();
|
||||
originalDb.close();
|
||||
closeMemoryDatabase(this.db);
|
||||
closeMemoryDatabase(originalDb);
|
||||
originalDbClosed = true;
|
||||
return meta;
|
||||
},
|
||||
@@ -1217,7 +1217,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
this.vector.dims = nextMeta?.vectorDims;
|
||||
} catch (err) {
|
||||
try {
|
||||
this.db.close();
|
||||
closeMemoryDatabase(this.db);
|
||||
} catch {}
|
||||
restoreOriginalState();
|
||||
throw err;
|
||||
|
||||
@@ -21,6 +21,7 @@ type ReadonlyRecoveryHarness = MemoryReadonlyRecoveryState & {
|
||||
enqueueTargetedSessionSync: ReturnType<typeof vi.fn>;
|
||||
runSync: ReturnType<typeof vi.fn>;
|
||||
openDatabase: ReturnType<typeof vi.fn>;
|
||||
closeDatabase: ReturnType<typeof vi.fn>;
|
||||
resetVectorState: ReturnType<typeof vi.fn>;
|
||||
ensureSchema: ReturnType<typeof vi.fn>;
|
||||
readMeta: ReturnType<typeof vi.fn>;
|
||||
@@ -80,6 +81,9 @@ describe("memory manager readonly recovery", () => {
|
||||
enqueueTargetedSessionSync: vi.fn(async () => {}),
|
||||
runSync: vi.fn(async (_params) => undefined) as ReadonlyRecoveryHarness["runSync"],
|
||||
openDatabase: vi.fn(() => reopenedDb),
|
||||
closeDatabase: vi.fn((db: DatabaseSync) => {
|
||||
db.close();
|
||||
}),
|
||||
resetVectorState: vi.fn(function (this: ReadonlyRecoveryHarness) {
|
||||
this.vector.dims = undefined;
|
||||
this.vectorDegradedWriteWarningShown = false;
|
||||
|
||||
@@ -36,6 +36,7 @@ import {
|
||||
getOrCreateManagedCacheEntry,
|
||||
resolveSingletonManagedCache,
|
||||
} from "./manager-cache.js";
|
||||
import { closeMemoryDatabase } from "./manager-db.js";
|
||||
import { MemoryManagerEmbeddingOps } from "./manager-embedding-ops.js";
|
||||
import {
|
||||
resolveMemoryPrimaryProviderRequest,
|
||||
@@ -694,6 +695,7 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
},
|
||||
runSync: (nextParams) => this.runSync(nextParams),
|
||||
openDatabase: () => this.openDatabase(),
|
||||
closeDatabase: (db) => closeMemoryDatabase(db),
|
||||
resetVectorState: () => this.resetVectorState(),
|
||||
ensureSchema: () => this.ensureSchema(),
|
||||
readMeta: () => this.readMeta() ?? undefined,
|
||||
@@ -862,7 +864,7 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
this.sessionUnsubscribe = null;
|
||||
}
|
||||
await awaitPendingManagerWork({ pendingSync, pendingProviderInit });
|
||||
this.db.close();
|
||||
closeMemoryDatabase(this.db);
|
||||
INDEX_CACHE.delete(this.cacheKey);
|
||||
}
|
||||
}
|
||||
|
||||
67
src/infra/sqlite-wal.test.ts
Normal file
67
src/infra/sqlite-wal.test.ts
Normal file
@@ -0,0 +1,67 @@
|
||||
import type { DatabaseSync } from "node:sqlite";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
DEFAULT_SQLITE_WAL_AUTOCHECKPOINT_PAGES,
|
||||
configureSqliteWalMaintenance,
|
||||
} from "./sqlite-wal.js";
|
||||
|
||||
function createMockDb(): DatabaseSync {
|
||||
return {
|
||||
exec: vi.fn(),
|
||||
} as unknown as DatabaseSync;
|
||||
}
|
||||
|
||||
describe("sqlite WAL maintenance", () => {
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("enables WAL mode and explicit autocheckpointing", () => {
|
||||
const db = createMockDb();
|
||||
|
||||
configureSqliteWalMaintenance(db, { checkpointIntervalMs: 0 });
|
||||
|
||||
expect(db.exec).toHaveBeenNthCalledWith(1, "PRAGMA journal_mode = WAL;");
|
||||
expect(db.exec).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
`PRAGMA wal_autocheckpoint = ${DEFAULT_SQLITE_WAL_AUTOCHECKPOINT_PAGES};`,
|
||||
);
|
||||
});
|
||||
|
||||
it("runs periodic TRUNCATE checkpoints and stops them on close", () => {
|
||||
vi.useFakeTimers();
|
||||
const db = createMockDb();
|
||||
|
||||
const maintenance = configureSqliteWalMaintenance(db, { checkpointIntervalMs: 100 });
|
||||
expect(db.exec).toHaveBeenCalledTimes(2);
|
||||
|
||||
vi.advanceTimersByTime(100);
|
||||
expect(db.exec).toHaveBeenLastCalledWith("PRAGMA wal_checkpoint(TRUNCATE);");
|
||||
expect(db.exec).toHaveBeenCalledTimes(3);
|
||||
|
||||
expect(maintenance.close()).toBe(true);
|
||||
expect(db.exec).toHaveBeenCalledTimes(4);
|
||||
|
||||
vi.advanceTimersByTime(200);
|
||||
expect(db.exec).toHaveBeenCalledTimes(4);
|
||||
});
|
||||
|
||||
it("reports checkpoint errors without throwing from background maintenance", () => {
|
||||
const db = createMockDb();
|
||||
const error = new Error("busy");
|
||||
const onCheckpointError = vi.fn();
|
||||
vi.mocked(db.exec).mockImplementation((sql) => {
|
||||
if (sql.includes("wal_checkpoint")) {
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
|
||||
const maintenance = configureSqliteWalMaintenance(db, {
|
||||
checkpointIntervalMs: 0,
|
||||
onCheckpointError,
|
||||
});
|
||||
|
||||
expect(maintenance.checkpoint()).toBe(false);
|
||||
expect(onCheckpointError).toHaveBeenCalledWith(error);
|
||||
});
|
||||
});
|
||||
74
src/infra/sqlite-wal.ts
Normal file
74
src/infra/sqlite-wal.ts
Normal file
@@ -0,0 +1,74 @@
|
||||
import type { DatabaseSync } from "node:sqlite";
|
||||
|
||||
export const DEFAULT_SQLITE_WAL_AUTOCHECKPOINT_PAGES = 1000;
|
||||
export const DEFAULT_SQLITE_WAL_TRUNCATE_INTERVAL_MS = 30 * 60 * 1000;
|
||||
|
||||
type IntervalHandle = ReturnType<typeof setInterval> & {
|
||||
unref?: () => void;
|
||||
};
|
||||
|
||||
type SqliteWalCheckpointMode = "PASSIVE" | "FULL" | "RESTART" | "TRUNCATE";
|
||||
|
||||
export type SqliteWalMaintenance = {
|
||||
checkpoint: () => boolean;
|
||||
close: () => boolean;
|
||||
};
|
||||
|
||||
export type SqliteWalMaintenanceOptions = {
|
||||
autoCheckpointPages?: number;
|
||||
checkpointIntervalMs?: number;
|
||||
checkpointMode?: SqliteWalCheckpointMode;
|
||||
onCheckpointError?: (error: unknown) => void;
|
||||
};
|
||||
|
||||
function normalizeNonNegativeInteger(value: number, label: string): number {
|
||||
if (!Number.isInteger(value) || value < 0) {
|
||||
throw new Error(`${label} must be a non-negative integer`);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
export function configureSqliteWalMaintenance(
|
||||
db: DatabaseSync,
|
||||
options: SqliteWalMaintenanceOptions = {},
|
||||
): SqliteWalMaintenance {
|
||||
const autoCheckpointPages = normalizeNonNegativeInteger(
|
||||
options.autoCheckpointPages ?? DEFAULT_SQLITE_WAL_AUTOCHECKPOINT_PAGES,
|
||||
"autoCheckpointPages",
|
||||
);
|
||||
const checkpointIntervalMs = normalizeNonNegativeInteger(
|
||||
options.checkpointIntervalMs ?? DEFAULT_SQLITE_WAL_TRUNCATE_INTERVAL_MS,
|
||||
"checkpointIntervalMs",
|
||||
);
|
||||
const checkpointMode = options.checkpointMode ?? "TRUNCATE";
|
||||
|
||||
db.exec("PRAGMA journal_mode = WAL;");
|
||||
db.exec(`PRAGMA wal_autocheckpoint = ${autoCheckpointPages};`);
|
||||
|
||||
const checkpoint = (): boolean => {
|
||||
try {
|
||||
db.exec(`PRAGMA wal_checkpoint(${checkpointMode});`);
|
||||
return true;
|
||||
} catch (error) {
|
||||
options.onCheckpointError?.(error);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
let timer: IntervalHandle | null = null;
|
||||
if (checkpointIntervalMs > 0) {
|
||||
timer = setInterval(checkpoint, checkpointIntervalMs) as IntervalHandle;
|
||||
timer.unref?.();
|
||||
}
|
||||
|
||||
return {
|
||||
checkpoint,
|
||||
close: () => {
|
||||
if (timer) {
|
||||
clearInterval(timer);
|
||||
timer = null;
|
||||
}
|
||||
return checkpoint();
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -40,5 +40,9 @@ export type {
|
||||
} from "./host/types.js";
|
||||
export { ensureMemoryIndexSchema } from "./host/memory-schema.js";
|
||||
export { loadSqliteVecExtension } from "./host/sqlite-vec.js";
|
||||
export { requireNodeSqlite } from "./host/sqlite.js";
|
||||
export {
|
||||
closeMemorySqliteWalMaintenance,
|
||||
configureMemorySqliteWalMaintenance,
|
||||
requireNodeSqlite,
|
||||
} from "./host/sqlite.js";
|
||||
export { isFileMissingError, statRegularFile } from "./host/fs-utils.js";
|
||||
|
||||
@@ -1,8 +1,15 @@
|
||||
import { createRequire } from "node:module";
|
||||
import type { DatabaseSync } from "node:sqlite";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import {
|
||||
configureSqliteWalMaintenance,
|
||||
type SqliteWalMaintenance,
|
||||
type SqliteWalMaintenanceOptions,
|
||||
} from "../../infra/sqlite-wal.js";
|
||||
import { installProcessWarningFilter } from "../../infra/warning-filter.js";
|
||||
|
||||
const require = createRequire(import.meta.url);
|
||||
const sqliteWalMaintenanceByDb = new WeakMap<DatabaseSync, SqliteWalMaintenance>();
|
||||
|
||||
export function requireNodeSqlite(): typeof import("node:sqlite") {
|
||||
installProcessWarningFilter();
|
||||
@@ -18,3 +25,25 @@ export function requireNodeSqlite(): typeof import("node:sqlite") {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export function configureMemorySqliteWalMaintenance(
|
||||
db: DatabaseSync,
|
||||
options?: SqliteWalMaintenanceOptions,
|
||||
): SqliteWalMaintenance {
|
||||
const existing = sqliteWalMaintenanceByDb.get(db);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const maintenance = configureSqliteWalMaintenance(db, options);
|
||||
sqliteWalMaintenanceByDb.set(db, maintenance);
|
||||
return maintenance;
|
||||
}
|
||||
|
||||
export function closeMemorySqliteWalMaintenance(db: DatabaseSync): boolean {
|
||||
const maintenance = sqliteWalMaintenanceByDb.get(db);
|
||||
if (!maintenance) {
|
||||
return true;
|
||||
}
|
||||
sqliteWalMaintenanceByDb.delete(db);
|
||||
return maintenance.close();
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import type { DatabaseSync } from "node:sqlite";
|
||||
import { requireNodeSqlite } from "../infra/node-sqlite.js";
|
||||
import { configureSqliteWalMaintenance, type SqliteWalMaintenance } from "../infra/sqlite-wal.js";
|
||||
import { readCaptureBlobText, writeCaptureBlob } from "./blob-store.js";
|
||||
import type {
|
||||
CaptureBlobRecord,
|
||||
@@ -18,11 +19,16 @@ function ensureParentDir(filePath: string) {
|
||||
fs.mkdirSync(path.dirname(filePath), { recursive: true });
|
||||
}
|
||||
|
||||
function openDatabase(dbPath: string): DatabaseSync {
|
||||
type OpenedDatabase = {
|
||||
db: DatabaseSync;
|
||||
walMaintenance: SqliteWalMaintenance;
|
||||
};
|
||||
|
||||
function openDatabase(dbPath: string): OpenedDatabase {
|
||||
ensureParentDir(dbPath);
|
||||
const { DatabaseSync } = requireNodeSqlite();
|
||||
const db = new DatabaseSync(dbPath);
|
||||
db.exec("PRAGMA journal_mode = WAL");
|
||||
const walMaintenance = configureSqliteWalMaintenance(db);
|
||||
db.exec("PRAGMA busy_timeout = 5000");
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS capture_sessions (
|
||||
@@ -62,7 +68,7 @@ function openDatabase(dbPath: string): DatabaseSync {
|
||||
CREATE INDEX IF NOT EXISTS capture_events_session_ts_idx ON capture_events(session_id, ts);
|
||||
CREATE INDEX IF NOT EXISTS capture_events_flow_idx ON capture_events(flow_id, ts);
|
||||
`);
|
||||
return db;
|
||||
return { db, walMaintenance };
|
||||
}
|
||||
|
||||
function serializeJson(value: unknown): string | null {
|
||||
@@ -93,19 +99,23 @@ function sortObservedCounts(counts: Map<string, number>): CaptureObservedDimensi
|
||||
|
||||
export class DebugProxyCaptureStore {
|
||||
readonly db: DatabaseSync;
|
||||
private readonly walMaintenance: SqliteWalMaintenance;
|
||||
private closed = false;
|
||||
|
||||
constructor(
|
||||
readonly dbPath: string,
|
||||
readonly blobDir: string,
|
||||
) {
|
||||
this.db = openDatabase(dbPath);
|
||||
const opened = openDatabase(dbPath);
|
||||
this.db = opened.db;
|
||||
this.walMaintenance = opened.walMaintenance;
|
||||
}
|
||||
|
||||
close(): void {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
this.walMaintenance.close();
|
||||
this.db.close();
|
||||
this.closed = true;
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { chmodSync, existsSync, mkdirSync } from "node:fs";
|
||||
import type { DatabaseSync, StatementSync } from "node:sqlite";
|
||||
import { requireNodeSqlite } from "../infra/node-sqlite.js";
|
||||
import { configureSqliteWalMaintenance, type SqliteWalMaintenance } from "../infra/sqlite-wal.js";
|
||||
import type { DeliveryContext } from "../utils/delivery-context.types.js";
|
||||
import {
|
||||
resolveTaskFlowRegistryDir,
|
||||
@@ -42,6 +43,7 @@ type FlowRegistryDatabase = {
|
||||
db: DatabaseSync;
|
||||
path: string;
|
||||
statements: FlowRegistryStatements;
|
||||
walMaintenance: SqliteWalMaintenance;
|
||||
};
|
||||
|
||||
let cachedDatabase: FlowRegistryDatabase | null = null;
|
||||
@@ -335,13 +337,14 @@ function openFlowRegistryDatabase(): FlowRegistryDatabase {
|
||||
return cachedDatabase;
|
||||
}
|
||||
if (cachedDatabase) {
|
||||
cachedDatabase.walMaintenance.close();
|
||||
cachedDatabase.db.close();
|
||||
cachedDatabase = null;
|
||||
}
|
||||
ensureFlowRegistryPermissions(pathname);
|
||||
const { DatabaseSync } = requireNodeSqlite();
|
||||
const db = new DatabaseSync(pathname);
|
||||
db.exec(`PRAGMA journal_mode = WAL;`);
|
||||
const walMaintenance = configureSqliteWalMaintenance(db);
|
||||
db.exec(`PRAGMA synchronous = NORMAL;`);
|
||||
db.exec(`PRAGMA busy_timeout = 5000;`);
|
||||
ensureSchema(db);
|
||||
@@ -350,6 +353,7 @@ function openFlowRegistryDatabase(): FlowRegistryDatabase {
|
||||
db,
|
||||
path: pathname,
|
||||
statements: createStatements(db),
|
||||
walMaintenance,
|
||||
};
|
||||
return cachedDatabase;
|
||||
}
|
||||
@@ -400,6 +404,7 @@ export function closeTaskFlowRegistrySqliteStore() {
|
||||
if (!cachedDatabase) {
|
||||
return;
|
||||
}
|
||||
cachedDatabase.walMaintenance.close();
|
||||
cachedDatabase.db.close();
|
||||
cachedDatabase = null;
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { chmodSync, existsSync, mkdirSync } from "node:fs";
|
||||
import type { DatabaseSync, StatementSync } from "node:sqlite";
|
||||
import { requireNodeSqlite } from "../infra/node-sqlite.js";
|
||||
import { configureSqliteWalMaintenance, type SqliteWalMaintenance } from "../infra/sqlite-wal.js";
|
||||
import type { DeliveryContext } from "../utils/delivery-context.types.js";
|
||||
import { resolveTaskRegistryDir, resolveTaskRegistrySqlitePath } from "./task-registry.paths.js";
|
||||
import type { TaskRegistryStoreSnapshot } from "./task-registry.store.types.js";
|
||||
@@ -60,6 +61,7 @@ type TaskRegistryDatabase = {
|
||||
db: DatabaseSync;
|
||||
path: string;
|
||||
statements: TaskRegistryStatements;
|
||||
walMaintenance: SqliteWalMaintenance;
|
||||
};
|
||||
|
||||
let cachedDatabase: TaskRegistryDatabase | null = null;
|
||||
@@ -441,13 +443,14 @@ function openTaskRegistryDatabase(): TaskRegistryDatabase {
|
||||
return cachedDatabase;
|
||||
}
|
||||
if (cachedDatabase) {
|
||||
cachedDatabase.walMaintenance.close();
|
||||
cachedDatabase.db.close();
|
||||
cachedDatabase = null;
|
||||
}
|
||||
ensureTaskRegistryPermissions(pathname);
|
||||
const { DatabaseSync } = requireNodeSqlite();
|
||||
const db = new DatabaseSync(pathname);
|
||||
db.exec(`PRAGMA journal_mode = WAL;`);
|
||||
const walMaintenance = configureSqliteWalMaintenance(db);
|
||||
db.exec(`PRAGMA synchronous = NORMAL;`);
|
||||
db.exec(`PRAGMA busy_timeout = 5000;`);
|
||||
ensureSchema(db);
|
||||
@@ -456,6 +459,7 @@ function openTaskRegistryDatabase(): TaskRegistryDatabase {
|
||||
db,
|
||||
path: pathname,
|
||||
statements: createStatements(db),
|
||||
walMaintenance,
|
||||
};
|
||||
return cachedDatabase;
|
||||
}
|
||||
@@ -542,6 +546,7 @@ export function closeTaskRegistrySqliteStore() {
|
||||
if (!cachedDatabase) {
|
||||
return;
|
||||
}
|
||||
cachedDatabase.walMaintenance.close();
|
||||
cachedDatabase.db.close();
|
||||
cachedDatabase = null;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user