mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-13 23:56:07 +00:00
refactor: preserve kysely sync result types
This commit is contained in:
@@ -133,6 +133,11 @@ Keep helpers composable:
|
||||
- Accept a transaction-capable database object when work may run inside a
|
||||
transaction.
|
||||
- Alias computed selections explicitly.
|
||||
- Let Kysely carry selected row shapes through builder queries. Avoid passing a
|
||||
broad row generic to a sync execution helper when the builder already knows
|
||||
the result type; use exact boundary types or a mapper instead.
|
||||
- For finite public query presets, prefer a preset-to-row type map and exported
|
||||
union over a generic `Record<string, ...>` row shape.
|
||||
|
||||
## Raw SQL
|
||||
|
||||
@@ -209,6 +214,8 @@ Adapter rules:
|
||||
pragmas, CTEs, and raw SQL make verb heuristics brittle.
|
||||
- Execute row-returning statements with `all()` or `iterate()`, and mutations
|
||||
with `run()`.
|
||||
- Preserve the row type from `CompiledQuery<Row>` in sync execution helpers so
|
||||
native stores keep Kysely's inferred result shape after compilation.
|
||||
- Do not blindly map `lastInsertRowid` to Kysely `insertId`. In `node:sqlite`,
|
||||
that value is connection-scoped and can be stale for updates or ignored
|
||||
inserts. Only return `insertId` for insert statements that changed rows.
|
||||
@@ -236,6 +243,7 @@ a real in-memory SQLite database when feasible.
|
||||
Minimum coverage for the native adapter:
|
||||
|
||||
- builder `select`
|
||||
- sync helper type inference for aliases, aggregates, and driver-specific values
|
||||
- raw row-returning SQL
|
||||
- non-returning insert metadata
|
||||
- `INSERT ... RETURNING`
|
||||
|
||||
28
src/infra/delivery-queue-entry-json.ts
Normal file
28
src/infra/delivery-queue-entry-json.ts
Normal file
@@ -0,0 +1,28 @@
|
||||
export type DeliveryQueueEntryJsonRow = {
|
||||
entry_json: string;
|
||||
};
|
||||
|
||||
export function isDeliveryQueueEntryWithId(
|
||||
value: unknown,
|
||||
): value is Record<string, unknown> & { id: string } {
|
||||
if (!value || typeof value !== "object" || Array.isArray(value) || !("id" in value)) {
|
||||
return false;
|
||||
}
|
||||
return typeof value.id === "string";
|
||||
}
|
||||
|
||||
export function parseDeliveryQueueEntryJson<Entry>(
|
||||
row: DeliveryQueueEntryJsonRow | undefined,
|
||||
isEntry: (value: unknown) => value is Entry,
|
||||
): Entry | null {
|
||||
if (!row) {
|
||||
return null;
|
||||
}
|
||||
let parsed: unknown;
|
||||
try {
|
||||
parsed = JSON.parse(row.entry_json) as unknown;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
return isEntry(parsed) ? parsed : null;
|
||||
}
|
||||
@@ -269,7 +269,7 @@ function readGatewayLockPayload(
|
||||
return runOpenClawStateWriteTransaction((database) => {
|
||||
const db = getNodeSqliteKysely<GatewayLockKvDatabase>(database.db);
|
||||
const row =
|
||||
executeSqliteQueryTakeFirstSync<GatewayLockKvRow>(
|
||||
executeSqliteQueryTakeFirstSync(
|
||||
database.db,
|
||||
db
|
||||
.selectFrom("kv")
|
||||
@@ -317,7 +317,7 @@ function tryAcquireGatewayLockRow(params: {
|
||||
(database) => {
|
||||
const db = getNodeSqliteKysely<GatewayLockKvDatabase>(database.db);
|
||||
const existingRow =
|
||||
executeSqliteQueryTakeFirstSync<GatewayLockKvRow>(
|
||||
executeSqliteQueryTakeFirstSync(
|
||||
database.db,
|
||||
db
|
||||
.selectFrom("kv")
|
||||
@@ -362,7 +362,7 @@ function clearGatewayLockRowIfTokenMatches(params: {
|
||||
(database) => {
|
||||
const db = getNodeSqliteKysely<GatewayLockKvDatabase>(database.db);
|
||||
const existingRow =
|
||||
executeSqliteQueryTakeFirstSync<GatewayLockKvRow>(
|
||||
executeSqliteQueryTakeFirstSync(
|
||||
database.db,
|
||||
db
|
||||
.selectFrom("kv")
|
||||
|
||||
@@ -4,8 +4,7 @@ import { InsertQueryNode, Kysely as KyselyInstance } from "kysely";
|
||||
import { NodeSqliteKyselyDialect } from "./kysely-node-sqlite.js";
|
||||
|
||||
type CompilableQuery<Row = unknown> = {
|
||||
compile(): CompiledQuery;
|
||||
readonly __resultRow?: Row;
|
||||
compile(): CompiledQuery<Row>;
|
||||
};
|
||||
|
||||
const kyselyByDatabase = new WeakMap<DatabaseSync, Kysely<unknown>>();
|
||||
@@ -24,7 +23,7 @@ export function getNodeSqliteKysely<Database>(db: DatabaseSync): Kysely<Database
|
||||
|
||||
export function executeCompiledSqliteQuerySync<Row>(
|
||||
db: DatabaseSync,
|
||||
compiledQuery: CompiledQuery,
|
||||
compiledQuery: CompiledQuery<Row>,
|
||||
): QueryResult<Row> {
|
||||
const statement = db.prepare(compiledQuery.sql);
|
||||
const parameters = compiledQuery.parameters as SQLInputValue[];
|
||||
|
||||
51
src/infra/kysely-sync.types.test.ts
Normal file
51
src/infra/kysely-sync.types.test.ts
Normal file
@@ -0,0 +1,51 @@
|
||||
import type { DatabaseSync } from "node:sqlite";
|
||||
import { describe, expect, expectTypeOf, it } from "vitest";
|
||||
import {
|
||||
executeSqliteQuerySync,
|
||||
executeSqliteQueryTakeFirstSync,
|
||||
getNodeSqliteKysely,
|
||||
} from "./kysely-sync.js";
|
||||
|
||||
type TypeTestDatabase = {
|
||||
type_test_items: {
|
||||
id: number;
|
||||
name: string | null;
|
||||
data: Uint8Array;
|
||||
};
|
||||
};
|
||||
|
||||
describe("kysely sync helper types", () => {
|
||||
it("preserves Kysely builder result rows through sync helpers", () => {
|
||||
const nativeDb = {} as DatabaseSync;
|
||||
const db = getNodeSqliteKysely<TypeTestDatabase>(nativeDb);
|
||||
const query = db
|
||||
.selectFrom("type_test_items")
|
||||
.select((eb) => ["id as itemId", "name", "data", eb.fn.countAll<number>().as("total")])
|
||||
.groupBy(["id", "name", "data"]);
|
||||
|
||||
if (false) {
|
||||
const result = executeSqliteQuerySync(nativeDb, query);
|
||||
expectTypeOf(result.rows).toEqualTypeOf<
|
||||
Array<{
|
||||
itemId: number;
|
||||
name: string | null;
|
||||
data: Uint8Array;
|
||||
total: number;
|
||||
}>
|
||||
>();
|
||||
|
||||
const row = executeSqliteQueryTakeFirstSync(nativeDb, query);
|
||||
expectTypeOf(row).toEqualTypeOf<
|
||||
| {
|
||||
itemId: number;
|
||||
name: string | null;
|
||||
data: Uint8Array;
|
||||
total: number;
|
||||
}
|
||||
| undefined
|
||||
>();
|
||||
}
|
||||
|
||||
expect(query.compile().sql).toContain("select");
|
||||
});
|
||||
});
|
||||
@@ -191,7 +191,7 @@ function loadBindingsIntoMemory(): void {
|
||||
bindingsByConversationKey.clear();
|
||||
const stateDatabase = openOpenClawStateDatabase();
|
||||
const db = getCurrentConversationBindingsKysely(stateDatabase.db);
|
||||
const rows = executeSqliteQuerySync<CurrentConversationBindingRow>(
|
||||
const rows = executeSqliteQuerySync(
|
||||
stateDatabase.db,
|
||||
db.selectFrom("current_conversation_bindings").selectAll().orderBy("updated_at", "asc"),
|
||||
).rows;
|
||||
|
||||
@@ -7,6 +7,11 @@ import {
|
||||
runOpenClawStateWriteTransaction,
|
||||
type OpenClawStateDatabaseOptions,
|
||||
} from "../../state/openclaw-state-db.js";
|
||||
import {
|
||||
isDeliveryQueueEntryWithId,
|
||||
parseDeliveryQueueEntryJson,
|
||||
type DeliveryQueueEntryJsonRow,
|
||||
} from "../delivery-queue-entry-json.js";
|
||||
import {
|
||||
executeSqliteQuerySync,
|
||||
executeSqliteQueryTakeFirstSync,
|
||||
@@ -72,10 +77,6 @@ export interface QueuedDelivery extends QueuedDeliveryPayload {
|
||||
|
||||
type DeliveryQueueDatabase = Pick<OpenClawStateKyselyDatabase, "delivery_queue_entries">;
|
||||
|
||||
type DeliveryQueueEntryRow = {
|
||||
entry_json: string;
|
||||
};
|
||||
|
||||
function databaseOptions(stateDir?: string): OpenClawStateDatabaseOptions {
|
||||
return stateDir ? { env: { ...process.env, OPENCLAW_STATE_DIR: stateDir } } : {};
|
||||
}
|
||||
@@ -113,16 +114,21 @@ function createMissingQueueEntryError(id: string): NodeJS.ErrnoException {
|
||||
return error;
|
||||
}
|
||||
|
||||
function parseQueueEntry(row: DeliveryQueueEntryRow | undefined): QueuedDelivery | null {
|
||||
if (!row) {
|
||||
return null;
|
||||
}
|
||||
const parsed = JSON.parse(row.entry_json) as unknown;
|
||||
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
|
||||
return null;
|
||||
}
|
||||
const entry = parsed as QueuedDelivery;
|
||||
return typeof entry.id === "string" ? entry : null;
|
||||
function isQueuedDelivery(value: unknown): value is QueuedDelivery {
|
||||
return (
|
||||
isDeliveryQueueEntryWithId(value) &&
|
||||
typeof value.channel === "string" &&
|
||||
typeof value.to === "string" &&
|
||||
Array.isArray(value.payloads) &&
|
||||
typeof value.enqueuedAt === "number" &&
|
||||
Number.isFinite(value.enqueuedAt) &&
|
||||
typeof value.retryCount === "number" &&
|
||||
Number.isFinite(value.retryCount)
|
||||
);
|
||||
}
|
||||
|
||||
function parseQueueEntry(row: DeliveryQueueEntryJsonRow | undefined): QueuedDelivery | null {
|
||||
return parseDeliveryQueueEntryJson(row, isQueuedDelivery);
|
||||
}
|
||||
|
||||
function loadQueueEntryByStatus(
|
||||
@@ -132,7 +138,7 @@ function loadQueueEntryByStatus(
|
||||
): QueuedDelivery | null {
|
||||
const stateDatabase = openOpenClawStateDatabase(databaseOptions(stateDir));
|
||||
const db = getNodeSqliteKysely<DeliveryQueueDatabase>(stateDatabase.db);
|
||||
const row = executeSqliteQueryTakeFirstSync<DeliveryQueueEntryRow>(
|
||||
const row = executeSqliteQueryTakeFirstSync(
|
||||
stateDatabase.db,
|
||||
db
|
||||
.selectFrom("delivery_queue_entries")
|
||||
@@ -284,7 +290,7 @@ export async function loadPendingDelivery(
|
||||
export async function loadPendingDeliveries(stateDir?: string): Promise<QueuedDelivery[]> {
|
||||
const stateDatabase = openOpenClawStateDatabase(databaseOptions(stateDir));
|
||||
const db = getNodeSqliteKysely<DeliveryQueueDatabase>(stateDatabase.db);
|
||||
const rows = executeSqliteQuerySync<DeliveryQueueEntryRow>(
|
||||
const rows = executeSqliteQuerySync(
|
||||
stateDatabase.db,
|
||||
db
|
||||
.selectFrom("delivery_queue_entries")
|
||||
|
||||
@@ -17,17 +17,13 @@ import type { DeliverFn, RecoveryLogger } from "./delivery-queue.js";
|
||||
|
||||
type DeliveryQueueDatabase = Pick<OpenClawStateKyselyDatabase, "delivery_queue_entries">;
|
||||
|
||||
type DeliveryQueueEntryRow = {
|
||||
entry_json: string;
|
||||
};
|
||||
|
||||
const QUEUE_NAME = "outbound-delivery";
|
||||
|
||||
function databaseOptions(tmpDir: string) {
|
||||
return { env: { ...process.env, OPENCLAW_STATE_DIR: tmpDir } };
|
||||
}
|
||||
|
||||
function parseEntry(row: DeliveryQueueEntryRow | undefined, id: string): Record<string, unknown> {
|
||||
function parseEntry(row: { entry_json: string } | undefined, id: string): Record<string, unknown> {
|
||||
if (!row) {
|
||||
throw new Error(`missing queued delivery test entry: ${id}`);
|
||||
}
|
||||
@@ -65,7 +61,7 @@ export function installDeliveryQueueTmpDirHooks(): { readonly tmpDir: () => stri
|
||||
export function readQueuedEntry(tmpDir: string, id: string): Record<string, unknown> {
|
||||
const stateDatabase = openOpenClawStateDatabase(databaseOptions(tmpDir));
|
||||
const db = getNodeSqliteKysely<DeliveryQueueDatabase>(stateDatabase.db);
|
||||
const row = executeSqliteQueryTakeFirstSync<DeliveryQueueEntryRow>(
|
||||
const row = executeSqliteQueryTakeFirstSync(
|
||||
stateDatabase.db,
|
||||
db
|
||||
.selectFrom("delivery_queue_entries")
|
||||
@@ -80,7 +76,7 @@ export function readQueuedEntry(tmpDir: string, id: string): Record<string, unkn
|
||||
export function readFailedQueuedEntry(tmpDir: string, id: string): Record<string, unknown> | null {
|
||||
const stateDatabase = openOpenClawStateDatabase(databaseOptions(tmpDir));
|
||||
const db = getNodeSqliteKysely<DeliveryQueueDatabase>(stateDatabase.db);
|
||||
const row = executeSqliteQueryTakeFirstSync<DeliveryQueueEntryRow>(
|
||||
const row = executeSqliteQueryTakeFirstSync(
|
||||
stateDatabase.db,
|
||||
db
|
||||
.selectFrom("delivery_queue_entries")
|
||||
@@ -95,7 +91,7 @@ export function readFailedQueuedEntry(tmpDir: string, id: string): Record<string
|
||||
export function readPendingQueuedEntries(tmpDir: string): Record<string, unknown>[] {
|
||||
const stateDatabase = openOpenClawStateDatabase(databaseOptions(tmpDir));
|
||||
const db = getNodeSqliteKysely<DeliveryQueueDatabase>(stateDatabase.db);
|
||||
return executeSqliteQuerySync<DeliveryQueueEntryRow>(
|
||||
return executeSqliteQuerySync(
|
||||
stateDatabase.db,
|
||||
db
|
||||
.selectFrom("delivery_queue_entries")
|
||||
|
||||
@@ -6,6 +6,11 @@ import {
|
||||
runOpenClawStateWriteTransaction,
|
||||
type OpenClawStateDatabaseOptions,
|
||||
} from "../state/openclaw-state-db.js";
|
||||
import {
|
||||
isDeliveryQueueEntryWithId,
|
||||
parseDeliveryQueueEntryJson,
|
||||
type DeliveryQueueEntryJsonRow,
|
||||
} from "./delivery-queue-entry-json.js";
|
||||
import {
|
||||
executeSqliteQuerySync,
|
||||
executeSqliteQueryTakeFirstSync,
|
||||
@@ -63,10 +68,6 @@ export type QueuedSessionDelivery = QueuedSessionDeliveryPayload & {
|
||||
|
||||
type DeliveryQueueDatabase = Pick<OpenClawStateKyselyDatabase, "delivery_queue_entries">;
|
||||
|
||||
type DeliveryQueueEntryRow = {
|
||||
entry_json: string;
|
||||
};
|
||||
|
||||
function buildEntryId(idempotencyKey?: string): string {
|
||||
if (!idempotencyKey) {
|
||||
return generateSecureUuid();
|
||||
@@ -78,16 +79,27 @@ function databaseOptions(stateDir?: string): OpenClawStateDatabaseOptions {
|
||||
return stateDir ? { env: { ...process.env, OPENCLAW_STATE_DIR: stateDir } } : {};
|
||||
}
|
||||
|
||||
function parseQueueEntry(row: DeliveryQueueEntryRow | undefined): QueuedSessionDelivery | null {
|
||||
if (!row) {
|
||||
return null;
|
||||
function isQueuedSessionDelivery(value: unknown): value is QueuedSessionDelivery {
|
||||
if (
|
||||
!isDeliveryQueueEntryWithId(value) ||
|
||||
typeof value.sessionKey !== "string" ||
|
||||
typeof value.enqueuedAt !== "number" ||
|
||||
!Number.isFinite(value.enqueuedAt) ||
|
||||
typeof value.retryCount !== "number" ||
|
||||
!Number.isFinite(value.retryCount)
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
const parsed = JSON.parse(row.entry_json) as unknown;
|
||||
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
|
||||
return null;
|
||||
if (value.kind === "systemEvent") {
|
||||
return typeof value.text === "string";
|
||||
}
|
||||
const entry = parsed as QueuedSessionDelivery;
|
||||
return typeof entry.id === "string" ? entry : null;
|
||||
return value.kind === "agentTurn"
|
||||
? typeof value.message === "string" && typeof value.messageId === "string"
|
||||
: false;
|
||||
}
|
||||
|
||||
function parseQueueEntry(row: DeliveryQueueEntryJsonRow | undefined): QueuedSessionDelivery | null {
|
||||
return parseDeliveryQueueEntryJson(row, isQueuedSessionDelivery);
|
||||
}
|
||||
|
||||
function ensureSessionDeliveryQueueStorage(stateDir?: string): void {
|
||||
@@ -194,7 +206,7 @@ export async function loadPendingSessionDelivery(
|
||||
): Promise<QueuedSessionDelivery | null> {
|
||||
const stateDatabase = openOpenClawStateDatabase(databaseOptions(stateDir));
|
||||
const db = getNodeSqliteKysely<DeliveryQueueDatabase>(stateDatabase.db);
|
||||
const row = executeSqliteQueryTakeFirstSync<DeliveryQueueEntryRow>(
|
||||
const row = executeSqliteQueryTakeFirstSync(
|
||||
stateDatabase.db,
|
||||
db
|
||||
.selectFrom("delivery_queue_entries")
|
||||
@@ -211,7 +223,7 @@ export async function loadPendingSessionDeliveries(
|
||||
): Promise<QueuedSessionDelivery[]> {
|
||||
const stateDatabase = openOpenClawStateDatabase(databaseOptions(stateDir));
|
||||
const db = getNodeSqliteKysely<DeliveryQueueDatabase>(stateDatabase.db);
|
||||
const rows = executeSqliteQuerySync<DeliveryQueueEntryRow>(
|
||||
const rows = executeSqliteQuerySync(
|
||||
stateDatabase.db,
|
||||
db
|
||||
.selectFrom("delivery_queue_entries")
|
||||
|
||||
@@ -457,12 +457,7 @@ describe("state migrations", () => {
|
||||
|
||||
const stateDatabase = openOpenClawStateDatabase({ env });
|
||||
const db = getNodeSqliteKysely<DeliveryQueueTestDatabase>(stateDatabase.db);
|
||||
const rows = executeSqliteQuerySync<{
|
||||
queue_name: string;
|
||||
id: string;
|
||||
status: string;
|
||||
entry_json: string;
|
||||
}>(
|
||||
const rows = executeSqliteQuerySync(
|
||||
stateDatabase.db,
|
||||
db
|
||||
.selectFrom("delivery_queue_entries")
|
||||
@@ -571,7 +566,7 @@ describe("state migrations", () => {
|
||||
|
||||
const stateDatabase = openOpenClawStateDatabase({ env });
|
||||
const db = getNodeSqliteKysely<PluginStateTestDatabase>(stateDatabase.db);
|
||||
const row = executeSqliteQuerySync<{ value_json: string }>(
|
||||
const row = executeSqliteQuerySync(
|
||||
stateDatabase.db,
|
||||
db
|
||||
.selectFrom("plugin_state_entries")
|
||||
@@ -638,12 +633,7 @@ describe("state migrations", () => {
|
||||
|
||||
const stateDatabase = openOpenClawStateDatabase({ env });
|
||||
const db = getNodeSqliteKysely<CurrentConversationBindingsTestDatabase>(stateDatabase.db);
|
||||
const row = executeSqliteQuerySync<{
|
||||
binding_key: string;
|
||||
binding_id: string;
|
||||
target_session_key: string;
|
||||
record_json: string;
|
||||
}>(
|
||||
const row = executeSqliteQuerySync(
|
||||
stateDatabase.db,
|
||||
db
|
||||
.selectFrom("current_conversation_bindings")
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { mkdtempSync, rmSync } from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import { afterEach, describe, expect, expectTypeOf, it } from "vitest";
|
||||
import {
|
||||
collectSqliteSchemaShape,
|
||||
createSqliteSchemaShapeFromSql,
|
||||
@@ -13,6 +13,7 @@ import {
|
||||
getDebugProxyCaptureStore,
|
||||
persistEventPayload,
|
||||
} from "./store.sqlite.js";
|
||||
import type { CaptureQueryRowsByPreset } from "./types.js";
|
||||
|
||||
const cleanupDirs: string[] = [];
|
||||
|
||||
@@ -39,6 +40,21 @@ function readPragmaNumber(db: import("node:sqlite").DatabaseSync, pragma: string
|
||||
}
|
||||
|
||||
describe("DebugProxyCaptureStore", () => {
|
||||
it("types query preset rows by preset", () => {
|
||||
const store = null as unknown as DebugProxyCaptureStore;
|
||||
|
||||
if (false) {
|
||||
expectTypeOf(store.queryPreset("double-sends")).toEqualTypeOf<
|
||||
CaptureQueryRowsByPreset["double-sends"][]
|
||||
>();
|
||||
expectTypeOf(store.queryPreset("missing-ack")).toEqualTypeOf<
|
||||
CaptureQueryRowsByPreset["missing-ack"][]
|
||||
>();
|
||||
}
|
||||
|
||||
expect(true).toBe(true);
|
||||
});
|
||||
|
||||
it("keeps the cached store open until the last lease releases", () => {
|
||||
const root = mkdtempSync(path.join(os.tmpdir(), "openclaw-proxy-capture-lease-"));
|
||||
cleanupDirs.push(root);
|
||||
|
||||
@@ -20,6 +20,7 @@ import type {
|
||||
CaptureObservedDimension,
|
||||
CaptureQueryPreset,
|
||||
CaptureQueryRow,
|
||||
CaptureQueryRowsByPreset,
|
||||
CaptureSessionCoverageSummary,
|
||||
CaptureSessionRecord,
|
||||
CaptureSessionSummary,
|
||||
@@ -75,24 +76,6 @@ function sortObservedCounts(counts: Map<string, number>): CaptureObservedDimensi
|
||||
.toSorted((left, right) => right.count - left.count || left.value.localeCompare(right.value));
|
||||
}
|
||||
|
||||
type CaptureSessionRow = {
|
||||
id: string;
|
||||
startedAt: number;
|
||||
endedAt: number | null;
|
||||
mode: string;
|
||||
sourceProcess: string;
|
||||
proxyUrl: string | null;
|
||||
eventCount: number;
|
||||
};
|
||||
|
||||
type BlobIdRow = {
|
||||
blobId: string | null;
|
||||
};
|
||||
|
||||
type CountRow = {
|
||||
count: number;
|
||||
};
|
||||
|
||||
function getCaptureKysely(db: DatabaseSync) {
|
||||
return getNodeSqliteKysely<ProxyCaptureKyselyDatabase>(db);
|
||||
}
|
||||
@@ -115,7 +98,7 @@ function countTable(
|
||||
table: "capture_blobs" | "capture_events" | "capture_sessions",
|
||||
): number {
|
||||
return (
|
||||
executeSqliteQueryTakeFirstSync<CountRow>(
|
||||
executeSqliteQueryTakeFirstSync(
|
||||
db,
|
||||
getCaptureKysely(db).selectFrom(table).select((eb) => eb.fn.countAll<number>().as("count")),
|
||||
)?.count ?? 0
|
||||
@@ -241,7 +224,7 @@ export class DebugProxyCaptureStore {
|
||||
}
|
||||
|
||||
listSessions(limit = 50): CaptureSessionSummary[] {
|
||||
const rows = executeSqliteQuerySync<CaptureSessionRow>(
|
||||
const rows = executeSqliteQuerySync(
|
||||
this.db,
|
||||
getCaptureKysely(this.db)
|
||||
.selectFrom("capture_sessions as s")
|
||||
@@ -271,7 +254,7 @@ export class DebugProxyCaptureStore {
|
||||
}
|
||||
|
||||
getSessionEvents(sessionId: string, limit = 500): Array<Record<string, unknown>> {
|
||||
return executeSqliteQuerySync<Record<string, unknown>>(
|
||||
return executeSqliteQuerySync(
|
||||
this.db,
|
||||
getCaptureKysely(this.db)
|
||||
.selectFrom("capture_events")
|
||||
@@ -306,7 +289,7 @@ export class DebugProxyCaptureStore {
|
||||
}
|
||||
|
||||
summarizeSessionCoverage(sessionId: string): CaptureSessionCoverageSummary {
|
||||
const rows = executeSqliteQuerySync<{ host: string | null; metaJson: string | null }>(
|
||||
const rows = executeSqliteQuerySync(
|
||||
this.db,
|
||||
getCaptureKysely(this.db)
|
||||
.selectFrom("capture_events")
|
||||
@@ -361,7 +344,7 @@ export class DebugProxyCaptureStore {
|
||||
}
|
||||
|
||||
readBlob(blobId: string): string | null {
|
||||
const row = executeSqliteQueryTakeFirstSync<{ data: Uint8Array }>(
|
||||
const row = executeSqliteQueryTakeFirstSync(
|
||||
this.db,
|
||||
getCaptureKysely(this.db)
|
||||
.selectFrom("capture_blobs")
|
||||
@@ -372,11 +355,15 @@ export class DebugProxyCaptureStore {
|
||||
return row ? decodeCaptureBlobText(Buffer.from(row.data)) : null;
|
||||
}
|
||||
|
||||
queryPreset<Preset extends CaptureQueryPreset>(
|
||||
preset: Preset,
|
||||
sessionId?: string,
|
||||
): CaptureQueryRowsByPreset[Preset][];
|
||||
queryPreset(preset: CaptureQueryPreset, sessionId?: string): CaptureQueryRow[] {
|
||||
const db = getCaptureKysely(this.db);
|
||||
switch (preset) {
|
||||
case "double-sends":
|
||||
return executeSqliteQuerySync<CaptureQueryRow>(
|
||||
return executeSqliteQuerySync(
|
||||
this.db,
|
||||
db
|
||||
.selectFrom("capture_events")
|
||||
@@ -394,7 +381,7 @@ export class DebugProxyCaptureStore {
|
||||
.orderBy("host", "asc"),
|
||||
).rows;
|
||||
case "retry-storms":
|
||||
return executeSqliteQuerySync<CaptureQueryRow>(
|
||||
return executeSqliteQuerySync(
|
||||
this.db,
|
||||
db
|
||||
.selectFrom("capture_events")
|
||||
@@ -408,7 +395,7 @@ export class DebugProxyCaptureStore {
|
||||
.orderBy("host", "asc"),
|
||||
).rows;
|
||||
case "cache-busting":
|
||||
return executeSqliteQuerySync<CaptureQueryRow>(
|
||||
return executeSqliteQuerySync(
|
||||
this.db,
|
||||
db
|
||||
.selectFrom("capture_events")
|
||||
@@ -427,7 +414,7 @@ export class DebugProxyCaptureStore {
|
||||
.orderBy("host", "asc"),
|
||||
).rows;
|
||||
case "ws-duplicate-frames":
|
||||
return executeSqliteQuerySync<CaptureQueryRow>(
|
||||
return executeSqliteQuerySync(
|
||||
this.db,
|
||||
db
|
||||
.selectFrom("capture_events")
|
||||
@@ -447,7 +434,7 @@ export class DebugProxyCaptureStore {
|
||||
.where("kind", "=", "ws-frame")
|
||||
.where("direction", "=", "inbound")
|
||||
.$if(Boolean(sessionId), (qb) => qb.where("session_id", "=", sessionId ?? ""));
|
||||
return executeSqliteQuerySync<CaptureQueryRow>(
|
||||
return executeSqliteQuerySync(
|
||||
this.db,
|
||||
db
|
||||
.selectFrom("capture_events")
|
||||
@@ -466,7 +453,7 @@ export class DebugProxyCaptureStore {
|
||||
).rows;
|
||||
}
|
||||
case "error-bursts":
|
||||
return executeSqliteQuerySync<CaptureQueryRow>(
|
||||
return executeSqliteQuerySync(
|
||||
this.db,
|
||||
db
|
||||
.selectFrom("capture_events")
|
||||
@@ -501,7 +488,7 @@ export class DebugProxyCaptureStore {
|
||||
return { sessions: 0, events: 0, blobs: 0 };
|
||||
}
|
||||
const db = getCaptureKysely(this.db);
|
||||
const blobRows = executeSqliteQuerySync<BlobIdRow>(
|
||||
const blobRows = executeSqliteQuerySync(
|
||||
this.db,
|
||||
db
|
||||
.selectFrom("capture_events")
|
||||
@@ -511,7 +498,7 @@ export class DebugProxyCaptureStore {
|
||||
.where("data_blob_id", "is not", null),
|
||||
).rows;
|
||||
const eventCount =
|
||||
executeSqliteQueryTakeFirstSync<CountRow>(
|
||||
executeSqliteQueryTakeFirstSync(
|
||||
this.db,
|
||||
db
|
||||
.selectFrom("capture_events")
|
||||
@@ -519,7 +506,7 @@ export class DebugProxyCaptureStore {
|
||||
.where("session_id", "in", uniqueSessionIds),
|
||||
)?.count ?? 0;
|
||||
const sessionCount =
|
||||
executeSqliteQueryTakeFirstSync<CountRow>(
|
||||
executeSqliteQueryTakeFirstSync(
|
||||
this.db,
|
||||
db
|
||||
.selectFrom("capture_sessions")
|
||||
@@ -540,7 +527,7 @@ export class DebugProxyCaptureStore {
|
||||
const remainingBlobRefs =
|
||||
candidateBlobIds.length > 0
|
||||
? new Set(
|
||||
executeSqliteQuerySync<BlobIdRow>(
|
||||
executeSqliteQuerySync(
|
||||
this.db,
|
||||
db
|
||||
.selectFrom("capture_events")
|
||||
|
||||
@@ -65,7 +65,42 @@ export type CaptureQueryPreset =
|
||||
| "missing-ack"
|
||||
| "error-bursts";
|
||||
|
||||
export type CaptureQueryRow = Record<string, string | number | null>;
|
||||
export type CaptureQueryRowsByPreset = {
|
||||
"double-sends": {
|
||||
host: string | null;
|
||||
path: string | null;
|
||||
method: string | null;
|
||||
duplicateCount: number;
|
||||
};
|
||||
"retry-storms": {
|
||||
host: string | null;
|
||||
path: string | null;
|
||||
errorCount: number;
|
||||
};
|
||||
"cache-busting": {
|
||||
host: string | null;
|
||||
path: string | null;
|
||||
variantCount: number;
|
||||
};
|
||||
"ws-duplicate-frames": {
|
||||
host: string | null;
|
||||
path: string | null;
|
||||
duplicateFrames: number;
|
||||
};
|
||||
"missing-ack": {
|
||||
flowId: string;
|
||||
host: string | null;
|
||||
path: string | null;
|
||||
outboundFrames: number;
|
||||
};
|
||||
"error-bursts": {
|
||||
host: string | null;
|
||||
path: string | null;
|
||||
errorCount: number;
|
||||
};
|
||||
};
|
||||
|
||||
export type CaptureQueryRow = CaptureQueryRowsByPreset[CaptureQueryPreset];
|
||||
|
||||
export type CaptureSessionSummary = {
|
||||
id: string;
|
||||
|
||||
@@ -123,7 +123,7 @@ export function listOpenClawRegisteredAgentDatabases(
|
||||
): OpenClawRegisteredAgentDatabase[] {
|
||||
const database = openOpenClawStateDatabase(options);
|
||||
const db = getNodeSqliteKysely<OpenClawAgentRegistryDatabase>(database.db);
|
||||
const rows = executeSqliteQuerySync<OpenClawStateKyselyDatabase["agent_databases"]>(
|
||||
const rows = executeSqliteQuerySync(
|
||||
database.db,
|
||||
db.selectFrom("agent_databases").selectAll().orderBy("agent_id", "asc"),
|
||||
).rows;
|
||||
|
||||
@@ -67,7 +67,7 @@ export function readOpenClawStateKvJson(
|
||||
const database = openOpenClawStateDatabase(options);
|
||||
const db = getNodeSqliteKysely<OpenClawStateKvDatabase>(database.db);
|
||||
const row =
|
||||
executeSqliteQueryTakeFirstSync<KvRow>(
|
||||
executeSqliteQueryTakeFirstSync(
|
||||
database.db,
|
||||
db
|
||||
.selectFrom("kv")
|
||||
@@ -84,7 +84,7 @@ export function listOpenClawStateKvJson<TValue>(
|
||||
): OpenClawStateKvEntry<TValue>[] {
|
||||
const database = openOpenClawStateDatabase(options);
|
||||
const db = getNodeSqliteKysely<OpenClawStateKvDatabase>(database.db);
|
||||
return executeSqliteQuerySync<KvRow>(
|
||||
return executeSqliteQuerySync(
|
||||
database.db,
|
||||
db
|
||||
.selectFrom("kv")
|
||||
|
||||
Reference in New Issue
Block a user