Gateway: abstract task registry storage (#56927)

Merged via squash.

Prepared head SHA: 8db9b860e8
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
Mariano
2026-03-29 12:58:06 +02:00
committed by GitHub
parent 17c36b5093
commit 92d0b3a557
4 changed files with 276 additions and 62 deletions

View File

@@ -0,0 +1,67 @@
import os from "node:os";
import path from "node:path";
import { resolveStateDir } from "../config/paths.js";
import { loadJsonFile, saveJsonFile } from "../infra/json-file.js";
import type { TaskRecord } from "./task-registry.types.js";
type PersistedTaskRegistry = {
version: 1;
tasks: Record<string, TaskRecord>;
};
const TASK_REGISTRY_VERSION = 1 as const;
function resolveTaskStateDir(env: NodeJS.ProcessEnv = process.env): string {
const explicit = env.OPENCLAW_STATE_DIR?.trim();
if (explicit) {
return resolveStateDir(env);
}
if (env.VITEST || env.NODE_ENV === "test") {
return path.join(os.tmpdir(), "openclaw-test-state", String(process.pid));
}
return resolveStateDir(env);
}
export function resolveTaskRegistryPath(): string {
return path.join(resolveTaskStateDir(process.env), "tasks", "runs.json");
}
export function loadTaskRegistrySnapshotFromJson(): Map<string, TaskRecord> {
const pathname = resolveTaskRegistryPath();
const raw = loadJsonFile(pathname);
if (!raw || typeof raw !== "object") {
return new Map();
}
const record = raw as Partial<PersistedTaskRegistry>;
if (record.version !== TASK_REGISTRY_VERSION) {
return new Map();
}
const tasksRaw = record.tasks;
if (!tasksRaw || typeof tasksRaw !== "object") {
return new Map();
}
const out = new Map<string, TaskRecord>();
for (const [taskId, entry] of Object.entries(tasksRaw)) {
if (!entry || typeof entry !== "object") {
continue;
}
if (!entry.taskId || typeof entry.taskId !== "string") {
continue;
}
out.set(taskId, entry);
}
return out;
}
export function saveTaskRegistrySnapshotToJson(tasks: ReadonlyMap<string, TaskRecord>) {
const pathname = resolveTaskRegistryPath();
const serialized: Record<string, TaskRecord> = {};
for (const [taskId, entry] of tasks.entries()) {
serialized[taskId] = entry;
}
const out: PersistedTaskRegistry = {
version: TASK_REGISTRY_VERSION,
tasks: serialized,
};
saveJsonFile(pathname, out);
}

View File

@@ -0,0 +1,108 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import {
createTaskRecord,
deleteTaskRecordById,
findTaskByRunId,
resetTaskRegistryForTests,
} from "./task-registry.js";
import { configureTaskRegistryRuntime, type TaskRegistryHookEvent } from "./task-registry.store.js";
import type { TaskRecord } from "./task-registry.types.js";
function createStoredTask(): TaskRecord {
return {
taskId: "task-restored",
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:restored",
runId: "run-restored",
task: "Restored task",
status: "running",
deliveryStatus: "pending",
notifyPolicy: "done_only",
createdAt: 100,
lastEventAt: 100,
};
}
describe("task-registry store runtime", () => {
afterEach(() => {
resetTaskRegistryForTests({ persist: false });
});
it("uses the configured task store for restore and save", () => {
const storedTask = createStoredTask();
const loadSnapshot = vi.fn(() => new Map([[storedTask.taskId, storedTask]]));
const saveSnapshot = vi.fn();
configureTaskRegistryRuntime({
store: {
loadSnapshot,
saveSnapshot,
},
});
expect(findTaskByRunId("run-restored")).toMatchObject({
taskId: "task-restored",
task: "Restored task",
});
expect(loadSnapshot).toHaveBeenCalledTimes(1);
createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:new",
runId: "run-new",
task: "New task",
status: "running",
deliveryStatus: "pending",
});
expect(saveSnapshot).toHaveBeenCalled();
const latestSnapshot = saveSnapshot.mock.calls.at(-1)?.[0] as ReadonlyMap<string, TaskRecord>;
expect(latestSnapshot.size).toBe(2);
expect(latestSnapshot.get("task-restored")?.task).toBe("Restored task");
});
it("emits incremental hook events for restore, mutation, and delete", () => {
const events: TaskRegistryHookEvent[] = [];
configureTaskRegistryRuntime({
store: {
loadSnapshot: () => new Map([[createStoredTask().taskId, createStoredTask()]]),
saveSnapshot: () => {},
},
hooks: {
onEvent: (event) => {
events.push(event);
},
},
});
expect(findTaskByRunId("run-restored")).toBeTruthy();
const created = createTaskRecord({
source: "sessions_spawn",
runtime: "acp",
requesterSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:new",
runId: "run-new",
task: "New task",
status: "running",
deliveryStatus: "pending",
});
expect(deleteTaskRecordById(created.taskId)).toBe(true);
expect(events.map((event) => event.kind)).toEqual(["restored", "upserted", "deleted"]);
expect(events[0]).toMatchObject({
kind: "restored",
tasks: [expect.objectContaining({ taskId: "task-restored" })],
});
expect(events[1]).toMatchObject({
kind: "upserted",
task: expect.objectContaining({ taskId: created.taskId }),
});
expect(events[2]).toMatchObject({
kind: "deleted",
taskId: created.taskId,
});
});
});

View File

@@ -1,68 +1,64 @@
import os from "node:os";
import path from "node:path";
import { resolveStateDir } from "../config/paths.js";
import { loadJsonFile, saveJsonFile } from "../infra/json-file.js";
import {
loadTaskRegistrySnapshotFromJson,
saveTaskRegistrySnapshotToJson,
} from "./task-registry.store.json.js";
import type { TaskRecord } from "./task-registry.types.js";
type PersistedTaskRegistry = {
version: 1;
tasks: Record<string, TaskRecord>;
export type TaskRegistryStore = {
loadSnapshot: () => Map<string, TaskRecord>;
saveSnapshot: (tasks: ReadonlyMap<string, TaskRecord>) => void;
};
const TASK_REGISTRY_VERSION = 1 as const;
function resolveTaskStateDir(env: NodeJS.ProcessEnv = process.env): string {
const explicit = env.OPENCLAW_STATE_DIR?.trim();
if (explicit) {
return resolveStateDir(env);
}
if (env.VITEST || env.NODE_ENV === "test") {
return path.join(os.tmpdir(), "openclaw-test-state", String(process.pid));
}
return resolveStateDir(env);
}
export function resolveTaskRegistryPath(): string {
return path.join(resolveTaskStateDir(process.env), "tasks", "runs.json");
}
export function loadTaskRegistryFromDisk(): Map<string, TaskRecord> {
const pathname = resolveTaskRegistryPath();
const raw = loadJsonFile(pathname);
if (!raw || typeof raw !== "object") {
return new Map();
}
const record = raw as Partial<PersistedTaskRegistry>;
if (record.version !== TASK_REGISTRY_VERSION) {
return new Map();
}
const tasksRaw = record.tasks;
if (!tasksRaw || typeof tasksRaw !== "object") {
return new Map();
}
const out = new Map<string, TaskRecord>();
for (const [taskId, entry] of Object.entries(tasksRaw)) {
if (!entry || typeof entry !== "object") {
continue;
export type TaskRegistryHookEvent =
| {
kind: "restored";
tasks: TaskRecord[];
}
const typed = entry;
if (!typed.taskId || typeof typed.taskId !== "string") {
continue;
| {
kind: "upserted";
task: TaskRecord;
previous?: TaskRecord;
}
out.set(taskId, typed);
}
return out;
| {
kind: "deleted";
taskId: string;
previous: TaskRecord;
};
export type TaskRegistryHooks = {
// Hooks are incremental/observational. Snapshot persistence belongs to TaskRegistryStore.
onEvent?: (event: TaskRegistryHookEvent) => void;
};
const defaultTaskRegistryStore: TaskRegistryStore = {
loadSnapshot: loadTaskRegistrySnapshotFromJson,
saveSnapshot: saveTaskRegistrySnapshotToJson,
};
let configuredTaskRegistryStore: TaskRegistryStore = defaultTaskRegistryStore;
let configuredTaskRegistryHooks: TaskRegistryHooks | null = null;
export function getTaskRegistryStore(): TaskRegistryStore {
return configuredTaskRegistryStore;
}
export function saveTaskRegistryToDisk(tasks: Map<string, TaskRecord>) {
const pathname = resolveTaskRegistryPath();
const serialized: Record<string, TaskRecord> = {};
for (const [taskId, entry] of tasks.entries()) {
serialized[taskId] = entry;
}
const out: PersistedTaskRegistry = {
version: TASK_REGISTRY_VERSION,
tasks: serialized,
};
saveJsonFile(pathname, out);
export function getTaskRegistryHooks(): TaskRegistryHooks | null {
return configuredTaskRegistryHooks;
}
export function configureTaskRegistryRuntime(params: {
store?: TaskRegistryStore;
hooks?: TaskRegistryHooks | null;
}) {
if (params.store) {
configuredTaskRegistryStore = params.store;
}
if ("hooks" in params) {
configuredTaskRegistryHooks = params.hooks ?? null;
}
}
export function resetTaskRegistryRuntimeForTests() {
configuredTaskRegistryStore = defaultTaskRegistryStore;
configuredTaskRegistryHooks = null;
}

View File

@@ -9,7 +9,12 @@ import { createSubsystemLogger } from "../logging/subsystem.js";
import { parseAgentSessionKey } from "../routing/session-key.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
import { loadTaskRegistryFromDisk, saveTaskRegistryToDisk } from "./task-registry.store.js";
import {
getTaskRegistryHooks,
getTaskRegistryStore,
resetTaskRegistryRuntimeForTests,
type TaskRegistryHookEvent,
} from "./task-registry.store.js";
import type {
TaskBindingTargetKind,
TaskDeliveryStatus,
@@ -44,8 +49,27 @@ function cloneTaskRecord(record: TaskRecord): TaskRecord {
};
}
function snapshotTaskRecords(source: ReadonlyMap<string, TaskRecord>): TaskRecord[] {
return [...source.values()].map((record) => cloneTaskRecord(record));
}
function emitTaskRegistryHookEvent(createEvent: () => TaskRegistryHookEvent): void {
const hooks = getTaskRegistryHooks();
if (!hooks?.onEvent) {
return;
}
try {
hooks.onEvent(createEvent());
} catch (error) {
log.warn("Task registry hook failed", {
event: "task-registry",
error,
});
}
}
function persistTaskRegistry() {
saveTaskRegistryToDisk(tasks);
getTaskRegistryStore().saveSnapshot(tasks);
}
function ensureDeliveryStatus(requesterSessionKey: string): TaskDeliveryStatus {
@@ -256,7 +280,7 @@ function restoreTaskRegistryOnce() {
}
restoreAttempted = true;
try {
const restored = loadTaskRegistryFromDisk();
const restored = getTaskRegistryStore().loadSnapshot();
if (restored.size === 0) {
return;
}
@@ -264,6 +288,10 @@ function restoreTaskRegistryOnce() {
tasks.set(taskId, task);
}
rebuildRunIdIndex();
emitTaskRegistryHookEvent(() => ({
kind: "restored",
tasks: snapshotTaskRecords(tasks),
}));
} catch (error) {
log.warn("Failed to restore task registry", { error });
}
@@ -285,6 +313,11 @@ function updateTask(taskId: string, patch: Partial<TaskRecord>): TaskRecord | nu
rebuildRunIdIndex();
}
persistTaskRegistry();
emitTaskRegistryHookEvent(() => ({
kind: "upserted",
task: cloneTaskRecord(next),
previous: cloneTaskRecord(current),
}));
return cloneTaskRecord(next);
}
@@ -718,6 +751,10 @@ export function createTaskRecord(params: {
tasks.set(taskId, record);
addRunIdIndex(taskId, record.runId);
persistTaskRegistry();
emitTaskRegistryHookEvent(() => ({
kind: "upserted",
task: cloneTaskRecord(record),
}));
return cloneTaskRecord(record);
}
@@ -958,6 +995,11 @@ export function deleteTaskRecordById(taskId: string): boolean {
tasks.delete(taskId);
rebuildRunIdIndex();
persistTaskRegistry();
emitTaskRegistryHookEvent(() => ({
kind: "deleted",
taskId: current.taskId,
previous: cloneTaskRecord(current),
}));
return true;
}
@@ -965,6 +1007,7 @@ export function resetTaskRegistryForTests(opts?: { persist?: boolean }) {
tasks.clear();
taskIdsByRunId.clear();
restoreAttempted = false;
resetTaskRegistryRuntimeForTests();
if (listenerStop) {
listenerStop();
listenerStop = null;