refactor: store subagent runs in sqlite table

This commit is contained in:
Peter Steinberger
2026-05-09 00:05:35 +01:00
parent 382247b437
commit bbca94041c
7 changed files with 657 additions and 34 deletions

View File

@@ -105,13 +105,16 @@ The branch already has a real shared SQLite base:
`capture_sessions`, `capture_events`, `capture_blobs`,
`sandbox_registry_entries`, `cron_run_logs`, `cron_jobs`, `commitments`,
`delivery_queue_entries`, `task_runs`, `task_delivery_state`, `flow_runs`,
`migration_runs`, and `backup_runs`.
`subagent_runs`, `migration_runs`, and `backup_runs`.
- `src/state/openclaw-agent-db.ts` opens
`agents/<agentId>/agent/openclaw-agent.sqlite`, registers the database in the
global DB, and owns agent-local session, transcript, VFS, artifact, and cache
tables. Shared runtime discovery now reads the generated-typed
`agent_databases` registry instead of reimplementing that query at each call
site.
- Subagent run recovery state now lives in typed shared `subagent_runs` rows
with indexed child, requester, and controller session keys. The old
`subagent_runs` KV scope is migration input only.
- `src/agents/filesystem/virtual-agent-fs.sqlite.ts` implements a SQLite VFS
over the agent database `vfs_entries` table.
- `src/agents/runtime-worker.entry.ts` creates per-run SQLite VFS, tool artifact,
@@ -525,6 +528,7 @@ agent_databases(agent_id, path, schema_version, last_seen_at, size_bytes)
task_runs(...)
task_delivery_state(...)
flow_runs(...)
subagent_runs(run_id, child_session_key, requester_session_key, controller_session_key, created_at, ended_at, cleanup_handled, payload_json)
plugin_state_entries(plugin_id, namespace, entry_key, value_json, created_at, expires_at)
plugin_blob_entries(plugin_id, namespace, entry_key, metadata_json, blob, created_at, expires_at)
media_blobs(subdir, id, content_type, size_bytes, blob, created_at, updated_at)

View File

@@ -1,18 +1,27 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import type { DatabaseSync } from "node:sqlite";
import type { Insertable, Selectable } from "kysely";
import { resolveStateDir } from "../config/paths.js";
import { loadJsonFile } from "../infra/json-file.js";
import { executeSqliteQuerySync, getNodeSqliteKysely } from "../infra/kysely-sync.js";
import { readStringValue } from "../shared/string-coerce.js";
import type { OpenClawStateDatabaseOptions } from "../state/openclaw-state-db.js";
import type { DB as OpenClawStateKyselyDatabase } from "../state/openclaw-state-db.generated.js";
import {
deleteOpenClawStateKvJson,
listOpenClawStateKvJson,
writeOpenClawStateKvJson,
} from "../state/openclaw-state-kv.js";
type OpenClawStateDatabaseOptions,
openOpenClawStateDatabase,
runOpenClawStateWriteTransaction,
} from "../state/openclaw-state-db.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js";
import type { DeliveryContext } from "../utils/delivery-context.types.js";
import type { SubagentRunOutcome } from "./subagent-announce-output.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
type SubagentRunsTable = OpenClawStateKyselyDatabase["subagent_runs"];
type SubagentRunRow = Selectable<SubagentRunsTable>;
type SubagentRegistryDatabase = Pick<OpenClawStateKyselyDatabase, "subagent_runs">;
type PersistedSubagentRegistryV1 = {
version: 1;
runs: Record<string, LegacySubagentRunRecord>;
@@ -25,8 +34,6 @@ type PersistedSubagentRegistryV2 = {
type PersistedSubagentRegistry = PersistedSubagentRegistryV1 | PersistedSubagentRegistryV2;
const SUBAGENT_REGISTRY_KV_SCOPE = "subagent_runs";
type PersistedSubagentRunRecord = SubagentRunRecord;
type LegacySubagentRunRecord = PersistedSubagentRunRecord & {
@@ -125,21 +132,244 @@ function normalizePersistedRunRecords(params: {
return out;
}
function getSubagentRegistryKysely(db: DatabaseSync) {
return getNodeSqliteKysely<SubagentRegistryDatabase>(db);
}
function serializeJson(value: unknown): string | null {
return value == null ? null : JSON.stringify(value);
}
// oxlint-disable-next-line typescript/no-unnecessary-type-parameters -- JSON columns are parsed at module boundaries.
function parseJsonValue<T>(raw: string | null): T | undefined {
if (!raw?.trim()) {
return undefined;
}
try {
return JSON.parse(raw) as T;
} catch {
return undefined;
}
}
function normalizeNumber(value: number | bigint | null): number | undefined {
if (typeof value === "bigint") {
return Number(value);
}
return typeof value === "number" ? value : undefined;
}
function normalizeBoolean(value: number | bigint | null): boolean | undefined {
if (typeof value === "bigint") {
return value !== 0n;
}
return typeof value === "number" ? value !== 0 : undefined;
}
function booleanToInteger(value: boolean | undefined): number | null {
return typeof value === "boolean" ? (value ? 1 : 0) : null;
}
function rowToRunRecord(row: SubagentRunRow): SubagentRunRecord | null {
const payload = parseJsonValue<Partial<SubagentRunRecord>>(row.payload_json) ?? {};
const raw: PersistedSubagentRunRecord = {
...payload,
runId: row.run_id,
childSessionKey: row.child_session_key,
controllerSessionKey: row.controller_session_key ?? undefined,
requesterSessionKey: row.requester_session_key,
requesterDisplayKey: row.requester_display_key,
requesterOrigin: parseJsonValue<DeliveryContext>(row.requester_origin_json),
task: row.task,
cleanup: row.cleanup === "delete" ? "delete" : "keep",
label: row.label ?? undefined,
model: row.model ?? undefined,
agentDir: row.agent_dir ?? undefined,
workspaceDir: row.workspace_dir ?? undefined,
runTimeoutSeconds: normalizeNumber(row.run_timeout_seconds),
spawnMode: row.spawn_mode === "session" ? "session" : "run",
createdAt: normalizeNumber(row.created_at) ?? 0,
startedAt: normalizeNumber(row.started_at),
sessionStartedAt: normalizeNumber(row.session_started_at),
accumulatedRuntimeMs: normalizeNumber(row.accumulated_runtime_ms),
endedAt: normalizeNumber(row.ended_at),
outcome: parseJsonValue<SubagentRunOutcome>(row.outcome_json),
archiveAtMs: normalizeNumber(row.archive_at_ms),
cleanupCompletedAt: normalizeNumber(row.cleanup_completed_at),
cleanupHandled: normalizeBoolean(row.cleanup_handled),
suppressAnnounceReason:
row.suppress_announce_reason === "steer-restart" || row.suppress_announce_reason === "killed"
? row.suppress_announce_reason
: undefined,
expectsCompletionMessage: normalizeBoolean(row.expects_completion_message),
announceRetryCount: normalizeNumber(row.announce_retry_count),
lastAnnounceRetryAt: normalizeNumber(row.last_announce_retry_at),
lastAnnounceDeliveryError: row.last_announce_delivery_error ?? undefined,
endedReason: row.ended_reason as SubagentRunRecord["endedReason"],
pauseReason: row.pause_reason === "sessions_yield" ? "sessions_yield" : undefined,
wakeOnDescendantSettle: normalizeBoolean(row.wake_on_descendant_settle),
frozenResultText: row.frozen_result_text ?? undefined,
frozenResultCapturedAt: normalizeNumber(row.frozen_result_captured_at),
fallbackFrozenResultText: row.fallback_frozen_result_text ?? undefined,
fallbackFrozenResultCapturedAt: normalizeNumber(row.fallback_frozen_result_captured_at),
endedHookEmittedAt: normalizeNumber(row.ended_hook_emitted_at),
pendingFinalDelivery: normalizeBoolean(row.pending_final_delivery),
pendingFinalDeliveryCreatedAt: normalizeNumber(row.pending_final_delivery_created_at),
pendingFinalDeliveryLastAttemptAt: normalizeNumber(row.pending_final_delivery_last_attempt_at),
pendingFinalDeliveryAttemptCount: normalizeNumber(row.pending_final_delivery_attempt_count),
pendingFinalDeliveryLastError: row.pending_final_delivery_last_error,
pendingFinalDeliveryPayload: parseJsonValue(row.pending_final_delivery_payload_json),
completionAnnouncedAt: normalizeNumber(row.completion_announced_at),
attachmentsDir: row.attachments_dir ?? undefined,
attachmentsRootDir: row.attachments_root_dir ?? undefined,
retainAttachmentsOnKeep: normalizeBoolean(row.retain_attachments_on_keep),
};
return (
normalizePersistedRunRecords({
runsRaw: { [raw.runId]: raw },
isLegacy: false,
}).get(raw.runId) ?? null
);
}
function runRecordToRow(record: SubagentRunRecord): Insertable<SubagentRunsTable> {
return {
run_id: record.runId,
child_session_key: record.childSessionKey,
controller_session_key: record.controllerSessionKey ?? null,
requester_session_key: record.requesterSessionKey,
requester_display_key: record.requesterDisplayKey,
requester_origin_json: serializeJson(record.requesterOrigin),
task: record.task,
cleanup: record.cleanup,
label: record.label ?? null,
model: record.model ?? null,
agent_dir: record.agentDir ?? null,
workspace_dir: record.workspaceDir ?? null,
run_timeout_seconds: record.runTimeoutSeconds ?? null,
spawn_mode: record.spawnMode ?? "run",
created_at: record.createdAt,
started_at: record.startedAt ?? null,
session_started_at: record.sessionStartedAt ?? null,
accumulated_runtime_ms: record.accumulatedRuntimeMs ?? null,
ended_at: record.endedAt ?? null,
outcome_json: serializeJson(record.outcome),
archive_at_ms: record.archiveAtMs ?? null,
cleanup_completed_at: record.cleanupCompletedAt ?? null,
cleanup_handled: booleanToInteger(record.cleanupHandled),
suppress_announce_reason: record.suppressAnnounceReason ?? null,
expects_completion_message: booleanToInteger(record.expectsCompletionMessage),
announce_retry_count: record.announceRetryCount ?? null,
last_announce_retry_at: record.lastAnnounceRetryAt ?? null,
last_announce_delivery_error: record.lastAnnounceDeliveryError ?? null,
ended_reason: record.endedReason ?? null,
pause_reason: record.pauseReason ?? null,
wake_on_descendant_settle: booleanToInteger(record.wakeOnDescendantSettle),
frozen_result_text: record.frozenResultText ?? null,
frozen_result_captured_at: record.frozenResultCapturedAt ?? null,
fallback_frozen_result_text: record.fallbackFrozenResultText ?? null,
fallback_frozen_result_captured_at: record.fallbackFrozenResultCapturedAt ?? null,
ended_hook_emitted_at: record.endedHookEmittedAt ?? null,
pending_final_delivery: booleanToInteger(record.pendingFinalDelivery),
pending_final_delivery_created_at: record.pendingFinalDeliveryCreatedAt ?? null,
pending_final_delivery_last_attempt_at: record.pendingFinalDeliveryLastAttemptAt ?? null,
pending_final_delivery_attempt_count: record.pendingFinalDeliveryAttemptCount ?? null,
pending_final_delivery_last_error: record.pendingFinalDeliveryLastError ?? null,
pending_final_delivery_payload_json: serializeJson(record.pendingFinalDeliveryPayload),
completion_announced_at: record.completionAnnouncedAt ?? null,
attachments_dir: record.attachmentsDir ?? null,
attachments_root_dir: record.attachmentsRootDir ?? null,
retain_attachments_on_keep: booleanToInteger(record.retainAttachmentsOnKeep),
payload_json: JSON.stringify(record),
};
}
function upsertSubagentRunRow(db: DatabaseSync, row: Insertable<SubagentRunsTable>): void {
executeSqliteQuerySync(
db,
getSubagentRegistryKysely(db)
.insertInto("subagent_runs")
.values(row)
.onConflict((conflict) =>
conflict.column("run_id").doUpdateSet({
child_session_key: (eb) => eb.ref("excluded.child_session_key"),
controller_session_key: (eb) => eb.ref("excluded.controller_session_key"),
requester_session_key: (eb) => eb.ref("excluded.requester_session_key"),
requester_display_key: (eb) => eb.ref("excluded.requester_display_key"),
requester_origin_json: (eb) => eb.ref("excluded.requester_origin_json"),
task: (eb) => eb.ref("excluded.task"),
cleanup: (eb) => eb.ref("excluded.cleanup"),
label: (eb) => eb.ref("excluded.label"),
model: (eb) => eb.ref("excluded.model"),
agent_dir: (eb) => eb.ref("excluded.agent_dir"),
workspace_dir: (eb) => eb.ref("excluded.workspace_dir"),
run_timeout_seconds: (eb) => eb.ref("excluded.run_timeout_seconds"),
spawn_mode: (eb) => eb.ref("excluded.spawn_mode"),
created_at: (eb) => eb.ref("excluded.created_at"),
started_at: (eb) => eb.ref("excluded.started_at"),
session_started_at: (eb) => eb.ref("excluded.session_started_at"),
accumulated_runtime_ms: (eb) => eb.ref("excluded.accumulated_runtime_ms"),
ended_at: (eb) => eb.ref("excluded.ended_at"),
outcome_json: (eb) => eb.ref("excluded.outcome_json"),
archive_at_ms: (eb) => eb.ref("excluded.archive_at_ms"),
cleanup_completed_at: (eb) => eb.ref("excluded.cleanup_completed_at"),
cleanup_handled: (eb) => eb.ref("excluded.cleanup_handled"),
suppress_announce_reason: (eb) => eb.ref("excluded.suppress_announce_reason"),
expects_completion_message: (eb) => eb.ref("excluded.expects_completion_message"),
announce_retry_count: (eb) => eb.ref("excluded.announce_retry_count"),
last_announce_retry_at: (eb) => eb.ref("excluded.last_announce_retry_at"),
last_announce_delivery_error: (eb) => eb.ref("excluded.last_announce_delivery_error"),
ended_reason: (eb) => eb.ref("excluded.ended_reason"),
pause_reason: (eb) => eb.ref("excluded.pause_reason"),
wake_on_descendant_settle: (eb) => eb.ref("excluded.wake_on_descendant_settle"),
frozen_result_text: (eb) => eb.ref("excluded.frozen_result_text"),
frozen_result_captured_at: (eb) => eb.ref("excluded.frozen_result_captured_at"),
fallback_frozen_result_text: (eb) => eb.ref("excluded.fallback_frozen_result_text"),
fallback_frozen_result_captured_at: (eb) =>
eb.ref("excluded.fallback_frozen_result_captured_at"),
ended_hook_emitted_at: (eb) => eb.ref("excluded.ended_hook_emitted_at"),
pending_final_delivery: (eb) => eb.ref("excluded.pending_final_delivery"),
pending_final_delivery_created_at: (eb) =>
eb.ref("excluded.pending_final_delivery_created_at"),
pending_final_delivery_last_attempt_at: (eb) =>
eb.ref("excluded.pending_final_delivery_last_attempt_at"),
pending_final_delivery_attempt_count: (eb) =>
eb.ref("excluded.pending_final_delivery_attempt_count"),
pending_final_delivery_last_error: (eb) =>
eb.ref("excluded.pending_final_delivery_last_error"),
pending_final_delivery_payload_json: (eb) =>
eb.ref("excluded.pending_final_delivery_payload_json"),
completion_announced_at: (eb) => eb.ref("excluded.completion_announced_at"),
attachments_dir: (eb) => eb.ref("excluded.attachments_dir"),
attachments_root_dir: (eb) => eb.ref("excluded.attachments_root_dir"),
retain_attachments_on_keep: (eb) => eb.ref("excluded.retain_attachments_on_keep"),
payload_json: (eb) => eb.ref("excluded.payload_json"),
}),
),
);
}
export function loadSubagentRegistryFromSqlite(
env: NodeJS.ProcessEnv = process.env,
): Map<string, SubagentRunRecord> | null {
const entries = listOpenClawStateKvJson<PersistedSubagentRunRecord>(
SUBAGENT_REGISTRY_KV_SCOPE,
subagentRegistryDbOptions(env),
);
if (entries.length === 0) {
const database = openOpenClawStateDatabase(subagentRegistryDbOptions(env));
const query = getSubagentRegistryKysely(database.db)
.selectFrom("subagent_runs")
.selectAll()
.orderBy("created_at", "asc")
.orderBy("run_id", "asc");
const rows = executeSqliteQuerySync<SubagentRunRow>(database.db, query).rows;
if (rows.length === 0) {
return null;
}
const runsRaw: Record<string, unknown> = {};
for (const entry of entries) {
runsRaw[entry.key] = entry.value;
const runs = new Map<string, SubagentRunRecord>();
for (const row of rows) {
const run = rowToRunRecord(row);
if (run) {
runs.set(run.runId, run);
}
}
return normalizePersistedRunRecords({ runsRaw, isLegacy: false });
return runs;
}
export function loadSubagentRegistryFromState(): Map<string, SubagentRunRecord> {
@@ -150,10 +380,11 @@ function writeSubagentRegistryRunsToSqlite(
runs: Map<string, SubagentRunRecord>,
env: NodeJS.ProcessEnv = process.env,
): void {
const dbOptions = subagentRegistryDbOptions(env);
for (const [runId, entry] of runs.entries()) {
writeOpenClawStateKvJson(SUBAGENT_REGISTRY_KV_SCOPE, runId, entry, dbOptions);
}
runOpenClawStateWriteTransaction((database) => {
for (const entry of runs.values()) {
upsertSubagentRunRow(database.db, runRecordToRow(entry));
}
}, subagentRegistryDbOptions(env));
}
function loadLegacySubagentRegistryFile(pathname: string): Map<string, SubagentRunRecord> {
@@ -205,15 +436,22 @@ export function importLegacySubagentRegistryFileToSqlite(env: NodeJS.ProcessEnv
}
export function saveSubagentRegistryToState(runs: Map<string, SubagentRunRecord>) {
const dbOptions = subagentRegistryDbOptions();
const existing = listOpenClawStateKvJson<PersistedSubagentRunRecord>(
SUBAGENT_REGISTRY_KV_SCOPE,
dbOptions,
);
for (const entry of existing) {
if (!runs.has(entry.key)) {
deleteOpenClawStateKvJson(SUBAGENT_REGISTRY_KV_SCOPE, entry.key, dbOptions);
runOpenClawStateWriteTransaction((database) => {
const kysely = getSubagentRegistryKysely(database.db);
const existing = executeSqliteQuerySync<{ run_id: string }>(
database.db,
kysely.selectFrom("subagent_runs").select("run_id"),
).rows;
for (const entry of existing) {
if (!runs.has(entry.run_id)) {
executeSqliteQuerySync(
database.db,
kysely.deleteFrom("subagent_runs").where("run_id", "=", entry.run_id),
);
}
}
}
writeSubagentRegistryRunsToSqlite(runs);
for (const entry of runs.values()) {
upsertSubagentRunRow(database.db, runRecordToRow(entry));
}
}, subagentRegistryDbOptions());
}

View File

@@ -234,6 +234,56 @@ export interface SchemaMigrations {
version: Generated<number>;
}
export interface SubagentRuns {
accumulated_runtime_ms: number | null;
agent_dir: string | null;
announce_retry_count: number | null;
archive_at_ms: number | null;
attachments_dir: string | null;
attachments_root_dir: string | null;
child_session_key: string;
cleanup: string;
cleanup_completed_at: number | null;
cleanup_handled: number | null;
completion_announced_at: number | null;
controller_session_key: string | null;
created_at: number;
ended_at: number | null;
ended_hook_emitted_at: number | null;
ended_reason: string | null;
expects_completion_message: number | null;
fallback_frozen_result_captured_at: number | null;
fallback_frozen_result_text: string | null;
frozen_result_captured_at: number | null;
frozen_result_text: string | null;
label: string | null;
last_announce_delivery_error: string | null;
last_announce_retry_at: number | null;
model: string | null;
outcome_json: string | null;
pause_reason: string | null;
payload_json: Generated<string>;
pending_final_delivery: number | null;
pending_final_delivery_attempt_count: number | null;
pending_final_delivery_created_at: number | null;
pending_final_delivery_last_attempt_at: number | null;
pending_final_delivery_last_error: string | null;
pending_final_delivery_payload_json: string | null;
requester_display_key: string;
requester_origin_json: string | null;
requester_session_key: string;
retain_attachments_on_keep: number | null;
run_id: string;
run_timeout_seconds: number | null;
session_started_at: number | null;
spawn_mode: string | null;
started_at: number | null;
suppress_announce_reason: string | null;
task: string;
wake_on_descendant_settle: number | null;
workspace_dir: string | null;
}
export interface TaskDeliveryState {
last_notified_event_at: number | null;
requester_origin_json: string | null;
@@ -299,6 +349,7 @@ export interface DB {
plugin_state_entries: PluginStateEntries;
sandbox_registry_entries: SandboxRegistryEntries;
schema_migrations: SchemaMigrations;
subagent_runs: SubagentRuns;
task_delivery_state: TaskDeliveryState;
task_runs: TaskRuns;
transcript_files: TranscriptFiles;

View File

@@ -157,7 +157,7 @@ describe("openclaw state database", () => {
expect(columns.some((column) => column.name === "sort_order")).toBe(true);
expect(index?.sql).toContain("sort_order ASC");
expect(version.user_version).toBe(19);
expect(version.user_version).toBe(20);
});
it("migrates legacy cron runtime state from kv into cron job columns", () => {
@@ -215,7 +215,56 @@ describe("openclaw state database", () => {
.prepare("SELECT COUNT(*) AS count FROM kv WHERE scope = ?")
.get("cron.jobs.state"),
).toEqual({ count: 0 });
expect(database.db.prepare("PRAGMA user_version").get()).toEqual({ user_version: 19 });
expect(database.db.prepare("PRAGMA user_version").get()).toEqual({ user_version: 20 });
});
it("migrates persisted subagent runs from kv into subagent run rows", () => {
const stateDir = createTempStateDir();
const dbPath = resolveOpenClawStateSqlitePath({ OPENCLAW_STATE_DIR: stateDir });
fs.mkdirSync(path.dirname(dbPath), { recursive: true });
const sqlite = requireNodeSqlite();
const oldDb = new sqlite.DatabaseSync(dbPath);
oldDb.exec(`
CREATE TABLE kv (
scope TEXT NOT NULL,
key TEXT NOT NULL,
value_json TEXT NOT NULL,
updated_at INTEGER NOT NULL,
PRIMARY KEY (scope, key)
);
INSERT INTO kv (scope, key, value_json, updated_at)
VALUES (
'subagent_runs',
'run-1',
'{"runId":"run-1","childSessionKey":"agent:main:subagent:child","requesterSessionKey":"agent:main:main","requesterDisplayKey":"main","task":"migrate subagent","cleanup":"keep","createdAt":1,"startedAt":2,"cleanupHandled":false,"requesterOrigin":{"channel":"telegram","accountId":"acct-1"}}',
3
);
PRAGMA user_version = 19;
`);
oldDb.close();
const database = openOpenClawStateDatabase({
env: { OPENCLAW_STATE_DIR: stateDir },
});
expect(
database.db
.prepare(
"SELECT run_id, child_session_key, requester_session_key, task, cleanup_handled, requester_origin_json FROM subagent_runs WHERE run_id = ?",
)
.get("run-1"),
).toEqual({
run_id: "run-1",
child_session_key: "agent:main:subagent:child",
requester_session_key: "agent:main:main",
task: "migrate subagent",
cleanup_handled: 0,
requester_origin_json: '{"channel":"telegram","accountId":"acct-1"}',
});
expect(
database.db.prepare("SELECT COUNT(*) AS count FROM kv WHERE scope = ?").get("subagent_runs"),
).toEqual({ count: 0 });
expect(database.db.prepare("PRAGMA user_version").get()).toEqual({ user_version: 20 });
});
it("upgrades task delivery state with task-run cascade integrity", () => {

View File

@@ -17,7 +17,7 @@ import {
} from "./openclaw-state-db.paths.js";
import { OPENCLAW_STATE_SCHEMA_SQL } from "./openclaw-state-schema.generated.js";
const OPENCLAW_STATE_SCHEMA_VERSION = 19;
const OPENCLAW_STATE_SCHEMA_VERSION = 20;
export const OPENCLAW_SQLITE_BUSY_TIMEOUT_MS = 30_000;
const OPENCLAW_STATE_DIR_MODE = 0o700;
const OPENCLAW_STATE_FILE_MODE = 0o600;
@@ -190,6 +190,162 @@ function migrateCronJobRuntimeStateColumns(db: DatabaseSync): void {
db.prepare("DELETE FROM kv WHERE scope = 'cron.jobs.state'").run();
}
function readFiniteNumber(value: unknown): number | null {
return typeof value === "number" && Number.isFinite(value) ? value : null;
}
function readString(value: unknown): string | null {
return typeof value === "string" && value.trim() ? value.trim() : null;
}
function readBooleanInteger(value: unknown): number | null {
return typeof value === "boolean" ? (value ? 1 : 0) : null;
}
function readJsonText(value: unknown): string | null {
return value === undefined || value === null ? null : JSON.stringify(value);
}
function migrateSubagentRunsFromKv(db: DatabaseSync): void {
const legacyRows = db
.prepare("SELECT key, value_json FROM kv WHERE scope = 'subagent_runs'")
.all() as Array<{ key?: unknown; value_json?: unknown }>;
if (legacyRows.length === 0) {
return;
}
const insert = db.prepare(`
INSERT OR REPLACE INTO subagent_runs (
run_id,
child_session_key,
controller_session_key,
requester_session_key,
requester_display_key,
requester_origin_json,
task,
cleanup,
label,
model,
agent_dir,
workspace_dir,
run_timeout_seconds,
spawn_mode,
created_at,
started_at,
session_started_at,
accumulated_runtime_ms,
ended_at,
outcome_json,
archive_at_ms,
cleanup_completed_at,
cleanup_handled,
suppress_announce_reason,
expects_completion_message,
announce_retry_count,
last_announce_retry_at,
last_announce_delivery_error,
ended_reason,
pause_reason,
wake_on_descendant_settle,
frozen_result_text,
frozen_result_captured_at,
fallback_frozen_result_text,
fallback_frozen_result_captured_at,
ended_hook_emitted_at,
pending_final_delivery,
pending_final_delivery_created_at,
pending_final_delivery_last_attempt_at,
pending_final_delivery_attempt_count,
pending_final_delivery_last_error,
pending_final_delivery_payload_json,
completion_announced_at,
attachments_dir,
attachments_root_dir,
retain_attachments_on_keep,
payload_json
)
VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
)
`);
for (const row of legacyRows) {
if (typeof row.key !== "string" || typeof row.value_json !== "string") {
continue;
}
let parsed: unknown;
try {
parsed = JSON.parse(row.value_json);
} catch {
continue;
}
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
continue;
}
const record = parsed as Record<string, unknown>;
const runId = readString(record.runId) ?? row.key.trim();
const childSessionKey = readString(record.childSessionKey);
const requesterSessionKey = readString(record.requesterSessionKey);
if (!runId || !childSessionKey || !requesterSessionKey) {
continue;
}
insert.run(
runId,
childSessionKey,
readString(record.controllerSessionKey) ?? requesterSessionKey,
requesterSessionKey,
readString(record.requesterDisplayKey) ?? "",
readJsonText(record.requesterOrigin),
readString(record.task) ?? "",
record.cleanup === "delete" ? "delete" : "keep",
readString(record.label),
readString(record.model),
readString(record.agentDir),
readString(record.workspaceDir),
readFiniteNumber(record.runTimeoutSeconds),
record.spawnMode === "session" ? "session" : "run",
readFiniteNumber(record.createdAt) ?? Date.now(),
readFiniteNumber(record.startedAt),
readFiniteNumber(record.sessionStartedAt),
readFiniteNumber(record.accumulatedRuntimeMs),
readFiniteNumber(record.endedAt),
readJsonText(record.outcome),
readFiniteNumber(record.archiveAtMs),
readFiniteNumber(record.cleanupCompletedAt),
readBooleanInteger(record.cleanupHandled),
readString(record.suppressAnnounceReason),
readBooleanInteger(record.expectsCompletionMessage),
readFiniteNumber(record.announceRetryCount),
readFiniteNumber(record.lastAnnounceRetryAt),
readString(record.lastAnnounceDeliveryError),
readString(record.endedReason),
readString(record.pauseReason),
readBooleanInteger(record.wakeOnDescendantSettle),
typeof record.frozenResultText === "string" ? record.frozenResultText : null,
readFiniteNumber(record.frozenResultCapturedAt),
typeof record.fallbackFrozenResultText === "string" ? record.fallbackFrozenResultText : null,
readFiniteNumber(record.fallbackFrozenResultCapturedAt),
readFiniteNumber(record.endedHookEmittedAt),
readBooleanInteger(record.pendingFinalDelivery),
readFiniteNumber(record.pendingFinalDeliveryCreatedAt),
readFiniteNumber(record.pendingFinalDeliveryLastAttemptAt),
readFiniteNumber(record.pendingFinalDeliveryAttemptCount),
typeof record.pendingFinalDeliveryLastError === "string"
? record.pendingFinalDeliveryLastError
: null,
readJsonText(record.pendingFinalDeliveryPayload),
readFiniteNumber(record.completionAnnouncedAt),
readString(record.attachmentsDir),
readString(record.attachmentsRootDir),
readBooleanInteger(record.retainAttachmentsOnKeep),
row.value_json,
);
}
db.prepare("DELETE FROM kv WHERE scope = 'subagent_runs'").run();
}
function rebuildTaskDeliveryStateWithForeignKey(db: DatabaseSync): void {
db.exec(`
CREATE TABLE IF NOT EXISTS task_delivery_state_next (
@@ -266,6 +422,9 @@ function migrateStateSchema(db: DatabaseSync, fromVersion: number): void {
if (fromVersion < 19) {
migrateCronJobRuntimeStateColumns(db);
}
if (fromVersion < 20) {
migrateSubagentRunsFromKv(db);
}
}
function ensureSchema(db: DatabaseSync, pathname: string): void {

View File

@@ -292,6 +292,67 @@ CREATE INDEX IF NOT EXISTS idx_task_runs_owner_key ON task_runs(owner_key);
CREATE INDEX IF NOT EXISTS idx_task_runs_parent_flow_id ON task_runs(parent_flow_id);
CREATE INDEX IF NOT EXISTS idx_task_runs_child_session_key ON task_runs(child_session_key);
CREATE TABLE IF NOT EXISTS subagent_runs (
run_id TEXT NOT NULL PRIMARY KEY,
child_session_key TEXT NOT NULL,
controller_session_key TEXT,
requester_session_key TEXT NOT NULL,
requester_display_key TEXT NOT NULL,
requester_origin_json TEXT,
task TEXT NOT NULL,
cleanup TEXT NOT NULL,
label TEXT,
model TEXT,
agent_dir TEXT,
workspace_dir TEXT,
run_timeout_seconds INTEGER,
spawn_mode TEXT,
created_at INTEGER NOT NULL,
started_at INTEGER,
session_started_at INTEGER,
accumulated_runtime_ms INTEGER,
ended_at INTEGER,
outcome_json TEXT,
archive_at_ms INTEGER,
cleanup_completed_at INTEGER,
cleanup_handled INTEGER,
suppress_announce_reason TEXT,
expects_completion_message INTEGER,
announce_retry_count INTEGER,
last_announce_retry_at INTEGER,
last_announce_delivery_error TEXT,
ended_reason TEXT,
pause_reason TEXT,
wake_on_descendant_settle INTEGER,
frozen_result_text TEXT,
frozen_result_captured_at INTEGER,
fallback_frozen_result_text TEXT,
fallback_frozen_result_captured_at INTEGER,
ended_hook_emitted_at INTEGER,
pending_final_delivery INTEGER,
pending_final_delivery_created_at INTEGER,
pending_final_delivery_last_attempt_at INTEGER,
pending_final_delivery_attempt_count INTEGER,
pending_final_delivery_last_error TEXT,
pending_final_delivery_payload_json TEXT,
completion_announced_at INTEGER,
attachments_dir TEXT,
attachments_root_dir TEXT,
retain_attachments_on_keep INTEGER,
payload_json TEXT NOT NULL DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_subagent_runs_child_session_key
ON subagent_runs(child_session_key, created_at DESC, run_id);
CREATE INDEX IF NOT EXISTS idx_subagent_runs_requester_session_key
ON subagent_runs(requester_session_key, created_at DESC, run_id);
CREATE INDEX IF NOT EXISTS idx_subagent_runs_controller_session_key
ON subagent_runs(controller_session_key, created_at DESC, run_id);
CREATE INDEX IF NOT EXISTS idx_subagent_runs_archive_at
ON subagent_runs(archive_at_ms, cleanup_handled, run_id);
CREATE INDEX IF NOT EXISTS idx_subagent_runs_ended_cleanup
ON subagent_runs(ended_at, cleanup_handled, run_id);
CREATE TABLE IF NOT EXISTS task_delivery_state (
task_id TEXT NOT NULL PRIMARY KEY,
requester_origin_json TEXT,

View File

@@ -287,6 +287,67 @@ CREATE INDEX IF NOT EXISTS idx_task_runs_owner_key ON task_runs(owner_key);
CREATE INDEX IF NOT EXISTS idx_task_runs_parent_flow_id ON task_runs(parent_flow_id);
CREATE INDEX IF NOT EXISTS idx_task_runs_child_session_key ON task_runs(child_session_key);
CREATE TABLE IF NOT EXISTS subagent_runs (
run_id TEXT NOT NULL PRIMARY KEY,
child_session_key TEXT NOT NULL,
controller_session_key TEXT,
requester_session_key TEXT NOT NULL,
requester_display_key TEXT NOT NULL,
requester_origin_json TEXT,
task TEXT NOT NULL,
cleanup TEXT NOT NULL,
label TEXT,
model TEXT,
agent_dir TEXT,
workspace_dir TEXT,
run_timeout_seconds INTEGER,
spawn_mode TEXT,
created_at INTEGER NOT NULL,
started_at INTEGER,
session_started_at INTEGER,
accumulated_runtime_ms INTEGER,
ended_at INTEGER,
outcome_json TEXT,
archive_at_ms INTEGER,
cleanup_completed_at INTEGER,
cleanup_handled INTEGER,
suppress_announce_reason TEXT,
expects_completion_message INTEGER,
announce_retry_count INTEGER,
last_announce_retry_at INTEGER,
last_announce_delivery_error TEXT,
ended_reason TEXT,
pause_reason TEXT,
wake_on_descendant_settle INTEGER,
frozen_result_text TEXT,
frozen_result_captured_at INTEGER,
fallback_frozen_result_text TEXT,
fallback_frozen_result_captured_at INTEGER,
ended_hook_emitted_at INTEGER,
pending_final_delivery INTEGER,
pending_final_delivery_created_at INTEGER,
pending_final_delivery_last_attempt_at INTEGER,
pending_final_delivery_attempt_count INTEGER,
pending_final_delivery_last_error TEXT,
pending_final_delivery_payload_json TEXT,
completion_announced_at INTEGER,
attachments_dir TEXT,
attachments_root_dir TEXT,
retain_attachments_on_keep INTEGER,
payload_json TEXT NOT NULL DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_subagent_runs_child_session_key
ON subagent_runs(child_session_key, created_at DESC, run_id);
CREATE INDEX IF NOT EXISTS idx_subagent_runs_requester_session_key
ON subagent_runs(requester_session_key, created_at DESC, run_id);
CREATE INDEX IF NOT EXISTS idx_subagent_runs_controller_session_key
ON subagent_runs(controller_session_key, created_at DESC, run_id);
CREATE INDEX IF NOT EXISTS idx_subagent_runs_archive_at
ON subagent_runs(archive_at_ms, cleanup_handled, run_id);
CREATE INDEX IF NOT EXISTS idx_subagent_runs_ended_cleanup
ON subagent_runs(ended_at, cleanup_handled, run_id);
CREATE TABLE IF NOT EXISTS task_delivery_state (
task_id TEXT NOT NULL PRIMARY KEY,
requester_origin_json TEXT,