diff --git a/src/agents/subagent-registry.announce-loop-guard.test.ts b/src/agents/subagent-registry.announce-loop-guard.test.ts index 5a2bfb2dbec..8389c53503c 100644 --- a/src/agents/subagent-registry.announce-loop-guard.test.ts +++ b/src/agents/subagent-registry.announce-loop-guard.test.ts @@ -16,7 +16,11 @@ vi.mock("../config/config.js", () => ({ })); vi.mock("../config/sessions.js", () => ({ - loadSessionStore: () => ({}), + loadSessionStore: () => ({ + "agent:main:subagent:child-1": { sessionId: "sess-child-1", updatedAt: 1 }, + "agent:main:subagent:expired-child": { sessionId: "sess-expired", updatedAt: 1 }, + "agent:main:subagent:retry-budget": { sessionId: "sess-retry", updatedAt: 1 }, + }), resolveAgentIdFromSessionKey: (key: string) => { const match = key.match(/^agent:([^:]+)/); return match?.[1] ?? "main"; diff --git a/src/agents/subagent-registry.persistence.test.ts b/src/agents/subagent-registry.persistence.test.ts index 9ef2458e35c..5558d77785e 100644 --- a/src/agents/subagent-registry.persistence.test.ts +++ b/src/agents/subagent-registry.persistence.test.ts @@ -5,7 +5,10 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import "./subagent-registry.mocks.shared.js"; import { captureEnv } from "../test-utils/env.js"; import { + addSubagentRunForTests, + clearSubagentRunSteerRestart, initSubagentRegistry, + listSubagentRunsForRequester, registerSubagentRun, resetSubagentRegistryForTests, } from "./subagent-registry.js"; @@ -22,12 +25,93 @@ describe("subagent registry persistence", () => { const envSnapshot = captureEnv(["OPENCLAW_STATE_DIR"]); let tempStateDir: string | null = null; - const writePersistedRegistry = async (persisted: Record) => { + const resolveAgentIdFromSessionKey = (sessionKey: string) => { + const match = sessionKey.match(/^agent:([^:]+):/i); + return (match?.[1] ?? "main").trim().toLowerCase() || "main"; + }; + + const resolveSessionStorePath = (stateDir: string, agentId: string) => + path.join(stateDir, "agents", agentId, "sessions", "sessions.json"); + + const readSessionStore = async (storePath: string) => { + try { + const raw = await fs.readFile(storePath, "utf8"); + const parsed = JSON.parse(raw) as unknown; + if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) { + return parsed as Record>; + } + } catch { + // ignore + } + return {} as Record>; + }; + + const writeChildSessionEntry = async (params: { + sessionKey: string; + sessionId?: string; + updatedAt?: number; + }) => { + if (!tempStateDir) { + throw new Error("tempStateDir not initialized"); + } + const agentId = resolveAgentIdFromSessionKey(params.sessionKey); + const storePath = resolveSessionStorePath(tempStateDir, agentId); + const store = await readSessionStore(storePath); + store[params.sessionKey] = { + ...(store[params.sessionKey] ?? {}), + sessionId: params.sessionId ?? `sess-${agentId}-${Date.now()}`, + updatedAt: params.updatedAt ?? Date.now(), + }; + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile(storePath, `${JSON.stringify(store)}\n`, "utf8"); + return storePath; + }; + + const removeChildSessionEntry = async (sessionKey: string) => { + if (!tempStateDir) { + throw new Error("tempStateDir not initialized"); + } + const agentId = resolveAgentIdFromSessionKey(sessionKey); + const storePath = resolveSessionStorePath(tempStateDir, agentId); + const store = await readSessionStore(storePath); + delete store[sessionKey]; + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile(storePath, `${JSON.stringify(store)}\n`, "utf8"); + return storePath; + }; + + const seedChildSessionsForPersistedRuns = async (persisted: Record) => { + const runs = (persisted.runs ?? {}) as Record< + string, + { + runId?: string; + childSessionKey?: string; + } + >; + for (const [runId, run] of Object.entries(runs)) { + const childSessionKey = run?.childSessionKey?.trim(); + if (!childSessionKey) { + continue; + } + await writeChildSessionEntry({ + sessionKey: childSessionKey, + sessionId: `sess-${run.runId ?? runId}`, + }); + } + }; + + const writePersistedRegistry = async ( + persisted: Record, + opts?: { seedChildSessions?: boolean }, + ) => { tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-subagent-")); process.env.OPENCLAW_STATE_DIR = tempStateDir; const registryPath = path.join(tempStateDir, "subagents", "runs.json"); await fs.mkdir(path.dirname(registryPath), { recursive: true }); await fs.writeFile(registryPath, `${JSON.stringify(persisted)}\n`, "utf8"); + if (opts?.seedChildSessions !== false) { + await seedChildSessionsForPersistedRuns(persisted); + } return registryPath; }; @@ -90,6 +174,10 @@ describe("subagent registry persistence", () => { task: "do the thing", cleanup: "keep", }); + await writeChildSessionEntry({ + sessionKey: "agent:main:subagent:test", + sessionId: "sess-test", + }); const registryPath = path.join(tempStateDir, "subagents", "runs.json"); const raw = await fs.readFile(registryPath, "utf8"); @@ -162,6 +250,10 @@ describe("subagent registry persistence", () => { }; await fs.mkdir(path.dirname(registryPath), { recursive: true }); await fs.writeFile(registryPath, `${JSON.stringify(persisted)}\n`, "utf8"); + await writeChildSessionEntry({ + sessionKey: "agent:main:subagent:two", + sessionId: "sess-two", + }); resetSubagentRegistryForTests({ persist: false }); initSubagentRegistry(); @@ -268,6 +360,64 @@ describe("subagent registry persistence", () => { expect(afterSecond.runs?.["run-4"]).toBeUndefined(); }); + it("reconciles orphaned restored runs by pruning them from registry", async () => { + const persisted = createPersistedEndedRun({ + runId: "run-orphan-restore", + childSessionKey: "agent:main:subagent:ghost-restore", + task: "orphan restore", + cleanup: "keep", + }); + const registryPath = await writePersistedRegistry(persisted, { + seedChildSessions: false, + }); + + await restartRegistryAndFlush(); + + expect(announceSpy).not.toHaveBeenCalled(); + const after = JSON.parse(await fs.readFile(registryPath, "utf8")) as { + runs?: Record; + }; + expect(after.runs?.["run-orphan-restore"]).toBeUndefined(); + expect(listSubagentRunsForRequester("agent:main:main")).toHaveLength(0); + }); + + it("resume guard prunes orphan runs before announce retry", async () => { + tempStateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-subagent-")); + process.env.OPENCLAW_STATE_DIR = tempStateDir; + const runId = "run-orphan-resume-guard"; + const childSessionKey = "agent:main:subagent:ghost-resume"; + const now = Date.now(); + + await writeChildSessionEntry({ + sessionKey: childSessionKey, + sessionId: "sess-resume-guard", + updatedAt: now, + }); + addSubagentRunForTests({ + runId, + childSessionKey, + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: "resume orphan guard", + cleanup: "keep", + createdAt: now - 50, + startedAt: now - 25, + endedAt: now, + suppressAnnounceReason: "steer-restart", + cleanupHandled: false, + }); + await removeChildSessionEntry(childSessionKey); + + const changed = clearSubagentRunSteerRestart(runId); + expect(changed).toBe(true); + await flushQueuedRegistryWork(); + + expect(announceSpy).not.toHaveBeenCalled(); + expect(listSubagentRunsForRequester("agent:main:main")).toHaveLength(0); + const persisted = loadSubagentRegistryFromDisk(); + expect(persisted.has(runId)).toBe(false); + }); + it("uses isolated temp state when OPENCLAW_STATE_DIR is unset in tests", async () => { delete process.env.OPENCLAW_STATE_DIR; vi.resetModules(); diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index 8506b77d53e..edb8f228b07 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -1,4 +1,10 @@ import { loadConfig } from "../config/config.js"; +import { + loadSessionStore, + resolveAgentIdFromSessionKey, + resolveStorePath, + type SessionEntry, +} from "../config/sessions.js"; import { callGateway } from "../gateway/call.js"; import { onAgentEvent } from "../infra/agent-events.js"; import { defaultRuntime } from "../runtime.js"; @@ -59,6 +65,7 @@ const MAX_ANNOUNCE_RETRY_COUNT = 3; * succeeded. Guards against stale registry entries surviving gateway restarts. */ const ANNOUNCE_EXPIRY_MS = 5 * 60_000; // 5 minutes +type SubagentRunOrphanReason = "missing-session-entry" | "missing-session-id"; function resolveAnnounceRetryDelayMs(retryCount: number) { const boundedRetryCount = Math.max(0, Math.min(retryCount, 10)); @@ -82,6 +89,119 @@ function persistSubagentRuns() { persistSubagentRunsToDisk(subagentRuns); } +function findSessionEntryByKey(store: Record, sessionKey: string) { + const direct = store[sessionKey]; + if (direct) { + return direct; + } + const normalized = sessionKey.toLowerCase(); + for (const [key, entry] of Object.entries(store)) { + if (key.toLowerCase() === normalized) { + return entry; + } + } + return undefined; +} + +function resolveSubagentRunOrphanReason(params: { + entry: SubagentRunRecord; + storeCache?: Map>; +}): SubagentRunOrphanReason | null { + const childSessionKey = params.entry.childSessionKey?.trim(); + if (!childSessionKey) { + return "missing-session-entry"; + } + try { + const cfg = loadConfig(); + const agentId = resolveAgentIdFromSessionKey(childSessionKey); + const storePath = resolveStorePath(cfg.session?.store, { agentId }); + let store = params.storeCache?.get(storePath); + if (!store) { + store = loadSessionStore(storePath); + params.storeCache?.set(storePath, store); + } + const sessionEntry = findSessionEntryByKey(store, childSessionKey); + if (!sessionEntry) { + return "missing-session-entry"; + } + if (typeof sessionEntry.sessionId !== "string" || !sessionEntry.sessionId.trim()) { + return "missing-session-id"; + } + return null; + } catch { + // Best-effort guard: avoid false orphan pruning on transient read/config failures. + return null; + } +} + +function reconcileOrphanedRun(params: { + runId: string; + entry: SubagentRunRecord; + reason: SubagentRunOrphanReason; + source: "restore" | "resume"; +}) { + const now = Date.now(); + let changed = false; + if (typeof params.entry.endedAt !== "number") { + params.entry.endedAt = now; + changed = true; + } + const orphanOutcome: SubagentRunOutcome = { + status: "error", + error: `orphaned subagent run (${params.reason})`, + }; + if (!runOutcomesEqual(params.entry.outcome, orphanOutcome)) { + params.entry.outcome = orphanOutcome; + changed = true; + } + if (params.entry.endedReason !== SUBAGENT_ENDED_REASON_ERROR) { + params.entry.endedReason = SUBAGENT_ENDED_REASON_ERROR; + changed = true; + } + if (params.entry.cleanupHandled !== true) { + params.entry.cleanupHandled = true; + changed = true; + } + if (typeof params.entry.cleanupCompletedAt !== "number") { + params.entry.cleanupCompletedAt = now; + changed = true; + } + const removed = subagentRuns.delete(params.runId); + resumedRuns.delete(params.runId); + if (!removed && !changed) { + return false; + } + defaultRuntime.log( + `[warn] Subagent orphan run pruned source=${params.source} run=${params.runId} child=${params.entry.childSessionKey} reason=${params.reason}`, + ); + return true; +} + +function reconcileOrphanedRestoredRuns() { + const storeCache = new Map>(); + let changed = false; + for (const [runId, entry] of subagentRuns.entries()) { + const orphanReason = resolveSubagentRunOrphanReason({ + entry, + storeCache, + }); + if (!orphanReason) { + continue; + } + if ( + reconcileOrphanedRun({ + runId, + entry, + reason: orphanReason, + source: "restore", + }) + ) { + changed = true; + } + } + return changed; +} + const resumedRuns = new Set(); const endedHookInFlightRunIds = new Set(); @@ -225,6 +345,20 @@ function resumeSubagentRun(runId: string) { if (!entry) { return; } + const orphanReason = resolveSubagentRunOrphanReason({ entry }); + if (orphanReason) { + if ( + reconcileOrphanedRun({ + runId, + entry, + reason: orphanReason, + source: "resume", + }) + ) { + persistSubagentRuns(); + } + return; + } if (entry.cleanupCompletedAt) { return; } @@ -290,6 +424,12 @@ function restoreSubagentRunsOnce() { if (restoredCount === 0) { return; } + if (reconcileOrphanedRestoredRuns()) { + persistSubagentRuns(); + } + if (subagentRuns.size === 0) { + return; + } // Resume pending work. ensureListener(); if ([...subagentRuns.values()].some((entry) => entry.archiveAtMs)) {