From 5a9c0efa548b39a66e60bb22f003421ffc17ea86 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 29 Apr 2026 04:35:00 +0100 Subject: [PATCH] fix(tasks): clean orphaned parent-owned acp sessions --- CHANGELOG.md | 1 + docs/automation/tasks.md | 2 +- docs/tools/acp-agents.md | 2 +- ...k-registry.maintenance.issue-60299.test.ts | 1 + src/tasks/task-registry.maintenance.ts | 102 ++++++++++++++++- src/tasks/task-registry.test.ts | 108 ++++++++++++++++++ 6 files changed, 209 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 14b96e0a978..cc28cad806f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ Docs: https://docs.openclaw.ai - NVIDIA/NIM: persist the `NVIDIA_API_KEY` provider marker and mark bundled NVIDIA Chat Completions models as string-content compatible, so NIM models load from `models.json` and OpenAI-compatible subagent calls send plain text content. Fixes #73013 and #50107; refs #73014. Thanks @bautrey, @iot2edge, @ifearghal, and @futhgar. - Channels/Discord: let text-only configs drop the `GuildVoiceStates` gateway intent and expose a bounded `/gateway/bot` metadata timeout with rate-limited fallback logs, reducing idle CPU and warning floods. Fixes #73709 and #73585. Thanks @sanchezm86 and @trac3r00. - Agents/sessions: mark same-turn `sessions_send` and A2A reply prompts with an inter-session `isUser=false` envelope before they reach the model, so foreign session output no longer lands as bare active user text. Fixes #73702; refs #73698, #73609, #73595, and #73622. Thanks @alvelda. +- ACP/tasks: sweep orphaned parent-owned ACP sessions whose task records are gone, preserving bound persistent sessions but clearing unbound stale ACPX metadata so old child sessions cannot silently respawn into chat. Fixes #73609. Thanks @joerod26. - Outbound/security: strip known internal runtime scaffolding such as `` and `` at the final channel delivery boundary and keep Discord output on targeted tag stripping, so degraded harness replies cannot leak those tags to users. Fixes #73595. Thanks @gabrielexito-stack and @martingarramon. - Security/Telegram: load Telegram security adapters in read-only audit/doctor, audit malformed Telegram DM `allowFrom` entries even when groups are disabled, and keep allowlist DM audits from counting stale pairing-store senders, so public/shared-DM risk checks stay accurate. Refs #73698. Thanks @xace1825. - Plugins: remove hidden manifest, provider-owner, bootstrap, and channel metadata caches so plugin installs, manifest edits, and bundled-root changes are visible on the next metadata read while keeping runtime/module loader caches for actual plugin code. Thanks @shakkernerd. diff --git a/docs/automation/tasks.md b/docs/automation/tasks.md index 8d1a9b75152..2690439125f 100644 --- a/docs/automation/tasks.md +++ b/docs/automation/tasks.md @@ -318,7 +318,7 @@ A sweeper runs every **60 seconds** and handles four things: Checks whether active tasks still have authoritative runtime backing. ACP/subagent tasks use child-session state, cron tasks use active-job ownership, and chat-backed CLI tasks use the owning run context. If that backing state is gone for more than 5 minutes, the task is marked `lost`. - Closes terminal parent-owned one-shot ACP sessions, and closes stale terminal persistent ACP sessions only when no active conversation binding remains. + Closes terminal or orphaned parent-owned one-shot ACP sessions, and closes stale terminal or orphaned persistent ACP sessions only when no active conversation binding remains. Sets a `cleanupAfter` timestamp on terminal tasks (endedAt + 7 days). During retention, lost tasks still appear in audit as warnings; after `cleanupAfter` expires or when cleanup metadata is missing, they are errors. diff --git a/docs/tools/acp-agents.md b/docs/tools/acp-agents.md index 4b53e39b722..34111c2ffb4 100644 --- a/docs/tools/acp-agents.md +++ b/docs/tools/acp-agents.md @@ -143,7 +143,7 @@ Quick `/acp` flow from chat: - Spawn creates or resumes an ACP runtime session, records ACP metadata in the OpenClaw session store, and may create a background task when the run is parent-owned. - Parent-owned ACP sessions are treated as background work even when the runtime session is persistent; completion and cross-surface delivery go through the parent task notifier rather than acting like a normal user-facing chat session. - - Task maintenance closes terminal parent-owned one-shot ACP sessions. Persistent ACP sessions are preserved while an active conversation binding remains; stale persistent sessions without an active binding are closed so they cannot be silently resumed after the owning task is done. + - Task maintenance closes terminal or orphaned parent-owned one-shot ACP sessions. Persistent ACP sessions are preserved while an active conversation binding remains; stale persistent sessions without an active binding are closed so they cannot be silently resumed after the owning task is done or its task record is gone. - Bound follow-up messages go directly to the ACP session until the binding is closed, unfocused, reset, or expired. - Gateway commands stay local. `/acp ...`, `/status`, and `/unfocus` are never sent as normal prompt text to a bound ACP harness. - `cancel` aborts the active turn when the backend supports cancellation; it does not delete the binding or session metadata. diff --git a/src/tasks/task-registry.maintenance.issue-60299.test.ts b/src/tasks/task-registry.maintenance.issue-60299.test.ts index 20e369a37d7..68b6e5b5255 100644 --- a/src/tasks/task-registry.maintenance.issue-60299.test.ts +++ b/src/tasks/task-registry.maintenance.issue-60299.test.ts @@ -68,6 +68,7 @@ function createTaskRegistryMaintenanceHarness(params: { const currentTasks = new Map(params.tasks.map((task) => [task.taskId, { ...task }])); const runtime: TaskRegistryMaintenanceRuntime = { + listAcpSessionEntries: async () => [], readAcpSessionEntry: () => acpEntry !== undefined ? ({ diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index c07bdab1a56..2607ab4e7c0 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -1,5 +1,9 @@ import { getAcpSessionManager } from "../acp/control-plane/manager.js"; -import { readAcpSessionEntry } from "../acp/runtime/session-meta.js"; +import { + listAcpSessionEntries, + readAcpSessionEntry, + type AcpSessionStoreEntry, +} from "../acp/runtime/session-meta.js"; import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { isCronJobActive } from "../cron/active-jobs.js"; @@ -55,6 +59,7 @@ let configuredCronStorePath: string | undefined; let configuredCronRuntimeAuthoritative = false; type TaskRegistryMaintenanceRuntime = { + listAcpSessionEntries: typeof listAcpSessionEntries; readAcpSessionEntry: typeof readAcpSessionEntry; closeAcpSession?: (params: { cfg: OpenClawConfig; @@ -85,6 +90,7 @@ type TaskRegistryMaintenanceRuntime = { }; const defaultTaskRegistryMaintenanceRuntime: TaskRegistryMaintenanceRuntime = { + listAcpSessionEntries, readAcpSessionEntry, closeAcpSession: async ({ cfg, sessionKey, reason }) => { await getAcpSessionManager().closeSession({ @@ -421,6 +427,24 @@ function hasActiveTaskForChildSession(task: TaskRecord, tasks: TaskRecord[]): bo ); } +function hasActiveTaskForChildSessionKey(sessionKey: string, tasks: TaskRecord[]): boolean { + const normalized = normalizeOptionalString(sessionKey); + if (!normalized) { + return false; + } + return tasks.some( + (candidate) => + isActiveTask(candidate) && getNormalizedTaskChildSessionKey(candidate) === normalized, + ); +} + +function getAcpSessionParentKeys(acpEntry: Pick): string[] { + return [ + normalizeOptionalString(acpEntry.entry?.spawnedBy), + normalizeOptionalString(acpEntry.entry?.parentSessionKey), + ].filter((value): value is string => Boolean(value)); +} + function isParentOwnedAcpSessionTask( task: TaskRecord, acpEntry: ReturnType, @@ -431,13 +455,14 @@ function isParentOwnedAcpSessionTask( } const ownerKey = normalizeOptionalString(task.ownerKey); const requesterKey = normalizeOptionalString(task.requesterSessionKey); - const parentKeys = [ - normalizeOptionalString(entry.spawnedBy), - normalizeOptionalString(entry.parentSessionKey), - ].filter((value): value is string => Boolean(value)); + const parentKeys = getAcpSessionParentKeys({ entry }); return parentKeys.some((parentKey) => parentKey === ownerKey || parentKey === requesterKey); } +function isParentOwnedAcpSessionEntry(acpEntry: Pick): boolean { + return getAcpSessionParentKeys(acpEntry).length > 0; +} + function hasActiveSessionBinding(sessionKey: string): boolean { const listBindings = taskRegistryMaintenanceRuntime.listSessionBindingsBySession; if (!listBindings) { @@ -471,6 +496,23 @@ function shouldCloseTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): b return !hasActiveSessionBinding(sessionKey); } +function shouldCloseOrphanedParentOwnedAcpSession( + acpEntry: AcpSessionStoreEntry, + tasks: TaskRecord[], +): boolean { + if (!acpEntry.entry || !acpEntry.acp || !isParentOwnedAcpSessionEntry(acpEntry)) { + return false; + } + const sessionKey = normalizeOptionalString(acpEntry.sessionKey); + if (!sessionKey || hasActiveTaskForChildSessionKey(sessionKey, tasks)) { + return false; + } + if (acpEntry.acp.mode === "oneshot") { + return true; + } + return !hasActiveSessionBinding(sessionKey); +} + async function cleanupTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): Promise { if (!shouldCloseTerminalAcpSession(task, tasks)) { return; @@ -512,6 +554,55 @@ async function cleanupTerminalAcpSession(task: TaskRecord, tasks: TaskRecord[]): } } +async function cleanupOrphanedParentOwnedAcpSessions(tasks: TaskRecord[]): Promise { + let acpSessions: AcpSessionStoreEntry[]; + try { + acpSessions = await taskRegistryMaintenanceRuntime.listAcpSessionEntries({}); + } catch (error) { + log.warn("Failed to list ACP sessions during task maintenance", { error }); + return; + } + const seenSessionKeys = new Set(); + for (const acpEntry of acpSessions) { + const sessionKey = normalizeOptionalString(acpEntry.sessionKey); + if (!sessionKey || seenSessionKeys.has(sessionKey)) { + continue; + } + seenSessionKeys.add(sessionKey); + if (!shouldCloseOrphanedParentOwnedAcpSession(acpEntry, tasks)) { + continue; + } + const closeAcpSession = taskRegistryMaintenanceRuntime.closeAcpSession; + if (!closeAcpSession) { + continue; + } + try { + await closeAcpSession({ + cfg: acpEntry.cfg, + sessionKey, + reason: "orphaned-parent-task-cleanup", + }); + } catch (error) { + log.warn("Failed to close orphaned parent-owned ACP session during task maintenance", { + sessionKey, + error, + }); + continue; + } + try { + await taskRegistryMaintenanceRuntime.unbindSessionBindings?.({ + targetSessionKey: sessionKey, + reason: "orphaned-parent-task-cleanup", + }); + } catch (error) { + log.warn("Failed to unbind orphaned parent-owned ACP session during task maintenance", { + sessionKey, + error, + }); + } + } +} + function markTaskLost(task: TaskRecord, now: number): TaskRecord { const cleanupAfter = task.cleanupAfter ?? projectTaskLost(task, now).cleanupAfter; const updated = @@ -754,6 +845,7 @@ export async function runTaskRegistryMaintenance(): Promise>; snapshotTasks: ReturnType[]; acpEntry?: AcpSessionStoreEntry; + acpEntries?: AcpSessionStoreEntry[]; sessionBindings?: SessionBindingRecord[]; closeAcpSession?: (params: { cfg: AcpSessionStoreEntry["cfg"]; @@ -112,6 +113,7 @@ function configureTaskRegistryMaintenanceRuntimeForTest(params: { storeReadFailed: false, } satisfies AcpSessionStoreEntry; setTaskRegistryMaintenanceRuntimeForTests({ + listAcpSessionEntries: async () => params.acpEntries ?? [], readAcpSessionEntry: () => params.acpEntry ?? emptyAcpEntry, listSessionBindingsBySession: () => params.sessionBindings ?? [], closeAcpSession: params.closeAcpSession, @@ -1726,6 +1728,111 @@ describe("task-registry", () => { }); }); + it("closes orphaned parent-owned one-shot ACP sessions after task records are gone", async () => { + await withTaskRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + const parentSessionKey = "agent:main:telegram:direct:owner"; + const childSessionKey = "agent:claude:acp:orphaned-oneshot"; + const closeAcpSession = vi.fn().mockResolvedValue(undefined); + const unbindSessionBindings = vi.fn().mockResolvedValue([]); + + configureTaskRegistryMaintenanceRuntimeForTest({ + currentTasks: new Map(), + snapshotTasks: [], + acpEntries: [ + createAcpSessionStoreEntry({ + sessionKey: childSessionKey, + parentSessionKey, + mode: "oneshot", + }), + ], + closeAcpSession, + unbindSessionBindings, + }); + + await runTaskRegistryMaintenance(); + + expect(closeAcpSession).toHaveBeenCalledWith({ + cfg: {}, + sessionKey: childSessionKey, + reason: "orphaned-parent-task-cleanup", + }); + expect(unbindSessionBindings).toHaveBeenCalledWith({ + targetSessionKey: childSessionKey, + reason: "orphaned-parent-task-cleanup", + }); + }); + }); + + it("keeps orphaned parent-owned persistent ACP sessions while a binding is active", async () => { + await withTaskRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + const parentSessionKey = "agent:main:telegram:direct:owner"; + const childSessionKey = "agent:claude:acp:bound-orphaned-persistent"; + const closeAcpSession = vi.fn().mockResolvedValue(undefined); + const unbindSessionBindings = vi.fn().mockResolvedValue([]); + + configureTaskRegistryMaintenanceRuntimeForTest({ + currentTasks: new Map(), + snapshotTasks: [], + acpEntries: [ + createAcpSessionStoreEntry({ + sessionKey: childSessionKey, + parentSessionKey, + mode: "persistent", + }), + ], + sessionBindings: [createSessionBindingRecord({ targetSessionKey: childSessionKey })], + closeAcpSession, + unbindSessionBindings, + }); + + await runTaskRegistryMaintenance(); + + expect(closeAcpSession).not.toHaveBeenCalled(); + expect(unbindSessionBindings).not.toHaveBeenCalled(); + }); + }); + + it("closes orphaned parent-owned persistent ACP sessions without active bindings", async () => { + await withTaskRegistryTempDir(async (root) => { + process.env.OPENCLAW_STATE_DIR = root; + resetTaskRegistryForTests(); + const parentSessionKey = "agent:main:telegram:direct:owner"; + const childSessionKey = "agent:claude:acp:unbound-orphaned-persistent"; + const closeAcpSession = vi.fn().mockResolvedValue(undefined); + const unbindSessionBindings = vi.fn().mockResolvedValue([]); + + configureTaskRegistryMaintenanceRuntimeForTest({ + currentTasks: new Map(), + snapshotTasks: [], + acpEntries: [ + createAcpSessionStoreEntry({ + sessionKey: childSessionKey, + parentSessionKey, + mode: "persistent", + }), + ], + closeAcpSession, + unbindSessionBindings, + }); + + await runTaskRegistryMaintenance(); + + expect(closeAcpSession).toHaveBeenCalledWith({ + cfg: {}, + sessionKey: childSessionKey, + reason: "orphaned-parent-task-cleanup", + }); + expect(unbindSessionBindings).toHaveBeenCalledWith({ + targetSessionKey: childSessionKey, + reason: "orphaned-parent-task-cleanup", + }); + }); + }); + it("prunes old terminal tasks during maintenance sweeps", async () => { await withTaskRegistryTempDir(async (root) => { process.env.OPENCLAW_STATE_DIR = root; @@ -1856,6 +1963,7 @@ describe("task-registry", () => { process.on("unhandledRejection", onUnhandledRejection); setTaskRegistryMaintenanceRuntimeForTests({ + listAcpSessionEntries: async () => [], readAcpSessionEntry: () => ({ cfg: {} as never, storePath: "",