From 7eefb26bc8d8dc14ec77d234d3aca73684a59907 Mon Sep 17 00:00:00 2001 From: Kaspre <36520309+Kaspre@users.noreply.github.com> Date: Mon, 11 May 2026 07:13:17 -0400 Subject: [PATCH] fix(heartbeat): remap cron-run exec events to session keys (#80214) Summary: - Remap cron-run async exec, ACP, node-event, and watchdog completion events to the queue heartbeat drains. - Route per-sender cron-run events to the agent main queue and global-scope events to `global` while preserving the originating `agentId`. - Tighten exec-completion classification and treat cron-run descendants as ephemeral for retention pruning. Verification: - CI: https://github.com/openclaw/openclaw/actions/runs/25625964382 passed. - Real behavior proof: https://github.com/openclaw/openclaw/actions/runs/25666664248/job/75340798016 passed. - PR targeted validation: `node scripts/run-vitest.mjs run --config test/vitest/vitest.unit.config.ts src/routing/session-key.test.ts src/infra/heartbeat-events-filter.test.ts src/agents/bash-tools.exec-runtime.test.ts src/agents/acp-spawn-parent-stream.test.ts src/cron/session-reaper.test.ts`. - PR targeted typecheck: `pnpm tsgo:core`. Refs #52305. Related: #18237, #14191. Co-authored-by: Kaspre --- CHANGELOG.md | 1 + src/agents/acp-spawn-parent-stream.test.ts | 39 ++++++ src/agents/acp-spawn-parent-stream.ts | 34 ++++- src/agents/acp-spawn.ts | 4 + src/agents/bash-process-registry.ts | 11 ++ src/agents/bash-tools.exec-runtime.test.ts | 43 ++++++ src/agents/bash-tools.exec-runtime.ts | 63 ++++++--- src/agents/bash-tools.exec-types.ts | 8 ++ src/agents/bash-tools.exec.ts | 2 + src/agents/cli-runner/execute.ts | 27 +++- src/agents/pi-tools.ts | 2 + src/auto-reply/reply/bash-command.ts | 2 + src/auto-reply/reply/commands-diagnostics.ts | 2 + .../reply/commands-export-trajectory.ts | 2 + src/cron/session-reaper.test.ts | 19 ++- src/gateway/server-node-events.ts | 20 ++- src/infra/heartbeat-events-filter.test.ts | 39 ++++++ src/routing/session-key.test.ts | 131 ++++++++++++++++++ src/routing/session-key.ts | 42 +++++- src/sessions/session-key-utils.ts | 2 +- 20 files changed, 451 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f7cca175a70..39b23a8c68c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -552,6 +552,7 @@ Docs: https://docs.openclaw.ai - OpenRouter: keep the default `openrouter/auto` model ref canonical while preventing TUI and Control UI catalog pickers from displaying or submitting `openrouter/openrouter/auto`. Fixes #62655. - Status/Claude CLI: show `oauth (claude-cli)` for working Claude CLI OAuth runtime sessions instead of `unknown` when no local auth profile exists. Fixes #78632. Thanks @gorkem2020. - Memory search: preserve keyword-only hybrid FTS matches when vector scoring is unavailable or below the configured minimum score, so exact lexical hits are not dropped by weighted min-score filtering. +- Heartbeat/async exec: remap cron-run session keys to agent-main (or `"global"` under `session.scope=global`) at the bash exec, ACP, gateway node-event, and CLI watchdog enqueue sites, and treat cron-run descendants as ephemeral for retention pruning, so async exec completion events land in the same queue the heartbeat drains instead of being stranded under the ephemeral cron-run key. Refs #52305. Thanks @Kaspre. - Exec approvals/node: let trusted backend node invokes complete no-device Control UI approvals after the original request connection changes, while keeping node, command, cwd, env, and allow-once replay bindings enforced. Fixes #78569. Thanks @naturedogdog. - Agents/subagents: keep background completion delivery on the requester-agent handoff/queue-retry path instead of raw-sending child results directly, and strip child-result wrapper or OpenClaw runtime-context scaffolding from queued outbound retries. Fixes #78531. Thanks @EthanSK. - Sandbox: recreate cached browser bridges when JavaScript-evaluation permission changes, keep failed prune removals tracked for retry, and make cross-device directory moves copy-then-commit without partially emptying the source on failure. diff --git a/src/agents/acp-spawn-parent-stream.test.ts b/src/agents/acp-spawn-parent-stream.test.ts index 5864eca1e48..ba3fdc6f8b0 100644 --- a/src/agents/acp-spawn-parent-stream.test.ts +++ b/src/agents/acp-spawn-parent-stream.test.ts @@ -154,6 +154,45 @@ describe("startAcpSpawnParentStreamRelay", () => { relay.dispose(); }); + it("remaps cron-run parent session keys while relaying stream events", () => { + const relay = startAcpSpawnParentStreamRelay({ + runId: "run-cron", + parentSessionKey: "agent:ops:cron:nightly:run:run-1:subagent:worker", + childSessionKey: "agent:codex:acp:child-cron", + agentId: "codex", + mainKey: "primary", + sessionScope: "global", + streamFlushMs: 10, + noOutputNoticeMs: 120_000, + }); + + emitAgentEvent({ + runId: "run-cron", + stream: "assistant", + data: { + delta: "hello from child", + }, + }); + vi.advanceTimersByTime(15); + + expect(enqueueSystemEventMock).toHaveBeenCalledWith( + expect.stringContaining("codex: hello from child"), + expect.objectContaining({ + contextKey: "acp-spawn:run-cron:progress", + sessionKey: "global", + trusted: false, + }), + ); + expect(requestHeartbeatMock).toHaveBeenCalledWith( + expect.objectContaining({ + agentId: "ops", + reason: "acp:spawn:stream", + }), + ); + expect(requestHeartbeatMock.mock.calls[0]?.[0]).not.toHaveProperty("sessionKey"); + relay.dispose(); + }); + it("emits a no-output notice and a resumed notice when output returns", () => { const relay = startAcpSpawnParentStreamRelay({ runId: "run-2", diff --git a/src/agents/acp-spawn-parent-stream.ts b/src/agents/acp-spawn-parent-stream.ts index d5eb0b125a3..3e0dec61f05 100644 --- a/src/agents/acp-spawn-parent-stream.ts +++ b/src/agents/acp-spawn-parent-stream.ts @@ -6,7 +6,7 @@ import { onAgentEvent } from "../infra/agent-events.js"; import { requestHeartbeat } from "../infra/heartbeat-wake.js"; import { appendRegularFile } from "../infra/regular-file.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; -import { scopedHeartbeatWakeOptions } from "../routing/session-key.js"; +import { resolveEventSessionKey, scopedHeartbeatWakeOptions } from "../routing/session-key.js"; import { normalizeAssistantPhase } from "../shared/chat-message-content.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; import { recordTaskRunProgressByRunId } from "../tasks/detached-task-runtime.js"; @@ -75,6 +75,21 @@ export function startAcpSpawnParentStreamRelay(params: { parentSessionKey: string; childSessionKey: string; agentId: string; + /** + * Optional `session.mainKey` from the runtime config. Used to remap + * cron-run parent session keys to the agent's main queue when relaying + * events. Caller passes the spawn-time `cfg.session?.mainKey`; pass-through + * of `undefined` falls back to the literal "main" default. Long-running + * relays keep using that start-time value if config changes while the child + * session is still streaming. + */ + mainKey?: string; + /** + * Optional `session.scope` from the runtime config. Required so global-scope + * agents route cron-run events to the "global" queue instead of agent-main. + * Snapshotted with `mainKey` for the same start-time routing reason. + */ + sessionScope?: "per-sender" | "global"; logPath?: string; deliveryContext?: DeliveryContext; surfaceUpdates?: boolean; @@ -180,11 +195,16 @@ export function startAcpSpawnParentStreamRelay(params: { return; } requestHeartbeat( - scopedHeartbeatWakeOptions(parentSessionKey, { - source: "acp-spawn", - intent: "event", - reason: "acp:spawn:stream", - }), + scopedHeartbeatWakeOptions( + parentSessionKey, + { + source: "acp-spawn", + intent: "event", + reason: "acp:spawn:stream", + }, + params.mainKey, + params.sessionScope, + ), ); }; const emit = (text: string, contextKey: string) => { @@ -197,7 +217,7 @@ export function startAcpSpawnParentStreamRelay(params: { return; } enqueueSystemEvent(cleaned, { - sessionKey: parentSessionKey, + sessionKey: resolveEventSessionKey(parentSessionKey, params.mainKey, params.sessionScope), contextKey, deliveryContext: params.deliveryContext, trusted: false, diff --git a/src/agents/acp-spawn.ts b/src/agents/acp-spawn.ts index b6297e7ff25..1b41c5cbc58 100644 --- a/src/agents/acp-spawn.ts +++ b/src/agents/acp-spawn.ts @@ -1371,6 +1371,8 @@ export async function spawnAcpDirect( parentSessionKey, childSessionKey: sessionKey, agentId: targetAgentId, + mainKey: cfg.session?.mainKey, + sessionScope: cfg.session?.scope, logPath: streamLogPath, deliveryContext: parentDeliveryCtx, emitStartNotice: false, @@ -1424,6 +1426,8 @@ export async function spawnAcpDirect( parentSessionKey, childSessionKey: sessionKey, agentId: targetAgentId, + mainKey: cfg.session?.mainKey, + sessionScope: cfg.session?.scope, logPath: streamLogPath, deliveryContext: parentDeliveryCtx, emitStartNotice: false, diff --git a/src/agents/bash-process-registry.ts b/src/agents/bash-process-registry.ts index 26ee5c1de4c..51cf73bc204 100644 --- a/src/agents/bash-process-registry.ts +++ b/src/agents/bash-process-registry.ts @@ -35,6 +35,17 @@ export interface ProcessSession { command: string; scopeKey?: string; sessionKey?: string; + /** `session.mainKey` from the runtime config, snapshotted at exec start. + * Used by background-exit notifications to remap cron-run keys to the + * agent's main queue without an ambient config load. If config changes + * while the process runs, the exit notification follows the start-time + * session contract. */ + mainKey?: string; + /** `session.scope` from the runtime config; required so the cron-run remap + * can route global-scope agents to the literal "global" queue instead + * of an agent-main queue the heartbeat never drains. Snapshotted with + * `mainKey` for the same start-time routing reason. */ + sessionScope?: "per-sender" | "global"; notifyDeliveryContext?: DeliveryContext; notifyOnExit?: boolean; notifyOnExitEmptySuccess?: boolean; diff --git a/src/agents/bash-tools.exec-runtime.test.ts b/src/agents/bash-tools.exec-runtime.test.ts index e734b8dc3b4..3bdc06fec21 100644 --- a/src/agents/bash-tools.exec-runtime.test.ts +++ b/src/agents/bash-tools.exec-runtime.test.ts @@ -489,6 +489,49 @@ describe("emitExecSystemEvent", () => { expect(heartbeat.sessionKey).toBe("agent:ops:main"); }); + it("remaps cron-run event enqueue and wake targets to the drained agent main session", () => { + emitExecSystemEvent("Exec finished", { + sessionKey: "agent:ops:cron:nightly:run:run-1", + contextKey: "exec:run-cron", + mainKey: "primary", + }); + + expect(enqueueSystemEventMock).toHaveBeenCalledWith("Exec finished", { + sessionKey: "agent:ops:primary", + contextKey: "exec:run-cron", + trusted: false, + }); + expect(requestHeartbeatMock).toHaveBeenCalledWith( + expect.objectContaining({ + coalesceMs: 0, + reason: "exec-event", + sessionKey: "agent:ops:primary", + }), + ); + }); + + it("routes global-scope cron-run events to the global queue and preserves the agent wake target", () => { + emitExecSystemEvent("Exec finished", { + sessionKey: "agent:ops:cron:nightly:run:run-1:subagent:worker", + contextKey: "exec:run-global", + sessionScope: "global", + }); + + expect(enqueueSystemEventMock).toHaveBeenCalledWith("Exec finished", { + sessionKey: "global", + contextKey: "exec:run-global", + trusted: false, + }); + expect(requestHeartbeatMock).toHaveBeenCalledWith( + expect.objectContaining({ + agentId: "ops", + coalesceMs: 0, + reason: "exec-event", + }), + ); + expect(requestHeartbeatMock.mock.calls[0]?.[0]).not.toHaveProperty("sessionKey"); + }); + it("keeps wake unscoped for non-agent session keys", () => { emitExecSystemEvent("Exec finished", { sessionKey: "global", diff --git a/src/agents/bash-tools.exec-runtime.ts b/src/agents/bash-tools.exec-runtime.ts index dc03d060faa..12285db1e9a 100644 --- a/src/agents/bash-tools.exec-runtime.ts +++ b/src/agents/bash-tools.exec-runtime.ts @@ -12,7 +12,7 @@ import { requestHeartbeat } from "../infra/heartbeat-wake.js"; import { isDangerousHostInheritedEnvVarName } from "../infra/host-env-security.js"; import { findPathKey, mergePathPrepend } from "../infra/path-prepend.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; -import { scopedHeartbeatWakeOptions } from "../routing/session-key.js"; +import { resolveEventSessionKey, scopedHeartbeatWakeOptions } from "../routing/session-key.js"; import type { ProcessSession } from "./bash-process-registry.js"; import type { ExecToolDetails } from "./bash-tools.exec-types.js"; import type { BashSandboxConfig } from "./bash-tools.shared.js"; @@ -340,17 +340,22 @@ function maybeNotifyOnExit(session: ProcessSession, status: "completed" | "faile ? `Exec ${status} (${session.id.slice(0, 8)}, ${exitLabel}) :: ${output}` : `Exec ${status} (${session.id.slice(0, 8)}, ${exitLabel})`; enqueueSystemEvent(summary, { - sessionKey, + sessionKey: resolveEventSessionKey(sessionKey, session.mainKey, session.sessionScope), deliveryContext: session.notifyDeliveryContext, trusted: false, }); requestHeartbeat( - scopedHeartbeatWakeOptions(sessionKey, { - source: "exec-event", - intent: "event", - reason: "exec-event", - coalesceMs: 0, - }), + scopedHeartbeatWakeOptions( + sessionKey, + { + source: "exec-event", + intent: "event", + reason: "exec-event", + coalesceMs: 0, + }, + session.mainKey, + session.sessionScope, + ), ); } @@ -416,25 +421,40 @@ export function resolveApprovalRunningNoticeMs(value?: number) { export function emitExecSystemEvent( text: string, - opts: { sessionKey?: string; contextKey?: string; deliveryContext?: DeliveryContext }, + opts: { + sessionKey?: string; + contextKey?: string; + deliveryContext?: DeliveryContext; + /** `session.mainKey` from the runtime config; pass-through of `undefined` + * falls back to the literal "main" default in `resolveEventSessionKey`. */ + mainKey?: string; + /** `session.scope` from the runtime config; needed so global-scope + * agents route cron-run events to the "global" queue. */ + sessionScope?: "per-sender" | "global"; + }, ) { const sessionKey = opts.sessionKey?.trim(); if (!sessionKey) { return; } enqueueSystemEvent(text, { - sessionKey, + sessionKey: resolveEventSessionKey(sessionKey, opts.mainKey, opts.sessionScope), contextKey: opts.contextKey, deliveryContext: opts.deliveryContext, trusted: false, }); requestHeartbeat( - scopedHeartbeatWakeOptions(sessionKey, { - source: "exec-event", - intent: "event", - reason: "exec-event", - coalesceMs: 0, - }), + scopedHeartbeatWakeOptions( + sessionKey, + { + source: "exec-event", + intent: "event", + reason: "exec-event", + coalesceMs: 0, + }, + opts.mainKey, + opts.sessionScope, + ), ); } @@ -568,6 +588,15 @@ export async function runExecProcess(opts: { notifyOnExitEmptySuccess?: boolean; scopeKey?: string; sessionKey?: string; + /** `session.mainKey` from the runtime config; snapshotted onto the + * ProcessSession so background-exit notifications can remap cron-run + * keys without an ambient config load. Long-running background exits use + * this start-time value even if config changes while the process runs. */ + mainKey?: string; + /** `session.scope` from the runtime config; snapshotted alongside + * `mainKey` so the cron-run remap can route global-scope agents to + * the "global" queue instead of agent-main. */ + sessionScope?: "per-sender" | "global"; notifyDeliveryContext?: DeliveryContext; timeoutSec: number | null; onUpdate?: (partialResult: AgentToolResult) => void; @@ -587,6 +616,8 @@ export async function runExecProcess(opts: { command: opts.command, scopeKey: opts.scopeKey, sessionKey: opts.sessionKey, + mainKey: opts.mainKey, + sessionScope: opts.sessionScope, notifyDeliveryContext: normalizeDeliveryContext(opts.notifyDeliveryContext), notifyOnExit: opts.notifyOnExit, notifyOnExitEmptySuccess: opts.notifyOnExitEmptySuccess === true, diff --git a/src/agents/bash-tools.exec-types.ts b/src/agents/bash-tools.exec-types.ts index b9e12e7fb61..9da0e83244a 100644 --- a/src/agents/bash-tools.exec-types.ts +++ b/src/agents/bash-tools.exec-types.ts @@ -29,6 +29,14 @@ export type ExecToolDefaults = { allowBackground?: boolean; scopeKey?: string; sessionKey?: string; + /** `session.mainKey` from the runtime config; passed through into + * runExecProcess so background-exit notifications can remap cron-run + * session keys to the agent's main queue without an ambient config load. */ + mainKey?: string; + /** `session.scope` from the runtime config; passed alongside `mainKey` + * so the cron-run remap can route global-scope agents to the "global" + * queue instead of agent-main. */ + sessionScope?: "per-sender" | "global"; messageProvider?: string; currentChannelId?: string; currentThreadTs?: string; diff --git a/src/agents/bash-tools.exec.ts b/src/agents/bash-tools.exec.ts index a707676ac86..5beb58f5573 100644 --- a/src/agents/bash-tools.exec.ts +++ b/src/agents/bash-tools.exec.ts @@ -1572,6 +1572,8 @@ export function createExecTool( notifyOnExitEmptySuccess, scopeKey: defaults?.scopeKey, sessionKey: notifySessionKey, + mainKey: defaults?.mainKey, + sessionScope: defaults?.sessionScope, notifyDeliveryContext, timeoutSec: effectiveTimeout, onUpdate, diff --git a/src/agents/cli-runner/execute.ts b/src/agents/cli-runner/execute.ts index 62b6610cd49..26dd99f8073 100644 --- a/src/agents/cli-runner/execute.ts +++ b/src/agents/cli-runner/execute.ts @@ -6,7 +6,7 @@ import { requestHeartbeat as requestHeartbeatImpl } from "../../infra/heartbeat- import { sanitizeHostExecEnv } from "../../infra/host-env-security.js"; import { enqueueSystemEvent as enqueueSystemEventImpl } from "../../infra/system-events.js"; import { getProcessSupervisor as getProcessSupervisorImpl } from "../../process/supervisor/index.js"; -import { scopedHeartbeatWakeOptions } from "../../routing/session-key.js"; +import { resolveEventSessionKey, scopedHeartbeatWakeOptions } from "../../routing/session-key.js"; import { appendBootstrapPromptWarning } from "../bootstrap-budget.js"; import { createCliJsonlStreamingParser, @@ -640,13 +640,26 @@ export async function executePreparedCliRun( "It may have been waiting for interactive input or an approval prompt.", "For Claude Code, prefer --permission-mode bypassPermissions --print.", ].join(" "); - executeDeps.enqueueSystemEvent(stallNotice, { sessionKey: params.sessionKey }); + const watchdogMainKey = params.config?.session?.mainKey; + const watchdogScope = params.config?.session?.scope; + executeDeps.enqueueSystemEvent(stallNotice, { + sessionKey: resolveEventSessionKey( + params.sessionKey, + watchdogMainKey, + watchdogScope, + ), + }); executeDeps.requestHeartbeat( - scopedHeartbeatWakeOptions(params.sessionKey, { - source: "cli-watchdog", - intent: "event", - reason: "cli:watchdog:stall", - }), + scopedHeartbeatWakeOptions( + params.sessionKey, + { + source: "cli-watchdog", + intent: "event", + reason: "cli:watchdog:stall", + }, + watchdogMainKey, + watchdogScope, + ), ); } throw new FailoverError(timeoutReason, { diff --git a/src/agents/pi-tools.ts b/src/agents/pi-tools.ts index 824a787a6ab..2e848fc5fe9 100644 --- a/src/agents/pi-tools.ts +++ b/src/agents/pi-tools.ts @@ -673,6 +673,8 @@ export function createOpenClawCodingTools(options?: { allowBackground, scopeKey, sessionKey: options?.sessionKey, + mainKey: options?.config?.session?.mainKey, + sessionScope: options?.config?.session?.scope, messageProvider: options?.messageProvider, currentChannelId: options?.currentChannelId, currentThreadTs: options?.currentThreadTs, diff --git a/src/auto-reply/reply/bash-command.ts b/src/auto-reply/reply/bash-command.ts index aee4883dcba..c81d3c4b1a5 100644 --- a/src/auto-reply/reply/bash-command.ts +++ b/src/auto-reply/reply/bash-command.ts @@ -352,6 +352,8 @@ export async function handleBashChatCommand(params: { allowBackground: true, timeoutSec, sessionKey: params.sessionKey, + mainKey: params.cfg.session?.mainKey, + sessionScope: params.cfg.session?.scope, notifyOnExit, notifyOnExitEmptySuccess, elevated: { diff --git a/src/auto-reply/reply/commands-diagnostics.ts b/src/auto-reply/reply/commands-diagnostics.ts index 5716a3cc159..d8d5498e715 100644 --- a/src/auto-reply/reply/commands-diagnostics.ts +++ b/src/auto-reply/reply/commands-diagnostics.ts @@ -308,6 +308,8 @@ async function requestGatewayDiagnosticsExportApproval( cwd: params.workspaceDir, agentId, sessionKey: params.sessionKey, + mainKey: params.cfg.session?.mainKey, + sessionScope: params.cfg.session?.scope, messageProvider: options.privateApprovalTarget?.channel ?? params.command.channel, currentChannelId: options.privateApprovalTarget?.to ?? readCommandDeliveryTarget(params), currentThreadTs: options.privateApprovalTarget diff --git a/src/auto-reply/reply/commands-export-trajectory.ts b/src/auto-reply/reply/commands-export-trajectory.ts index b0359d414a4..77bc3f98127 100644 --- a/src/auto-reply/reply/commands-export-trajectory.ts +++ b/src/auto-reply/reply/commands-export-trajectory.ts @@ -246,6 +246,8 @@ async function requestTrajectoryExportApproval( cwd: params.workspaceDir, agentId, sessionKey: params.sessionKey, + mainKey: params.cfg.session?.mainKey, + sessionScope: params.cfg.session?.scope, messageProvider: options.privateApprovalTarget?.channel ?? params.command.channel, currentChannelId: options.privateApprovalTarget?.to ?? readCommandDeliveryTarget(params), currentThreadTs: options.privateApprovalTarget diff --git a/src/cron/session-reaper.test.ts b/src/cron/session-reaper.test.ts index acbac6dbf7c..d093f1e5104 100644 --- a/src/cron/session-reaper.test.ts +++ b/src/cron/session-reaper.test.ts @@ -45,6 +45,11 @@ describe("isCronRunSessionKey", () => { expect(isCronRunSessionKey("agent:debugger:cron:249ecf82:run:1102aabb")).toBe(true); }); + it("matches cron run descendant session keys", () => { + expect(isCronRunSessionKey("agent:main:cron:abc-123:run:def-456:subagent:worker")).toBe(true); + expect(isCronRunSessionKey("agent:main:cron:abc-123:run:def-456:thread:reply")).toBe(true); + }); + it("does not match base cron session keys", () => { expect(isCronRunSessionKey("agent:main:cron:abc-123")).toBe(false); }); @@ -81,10 +86,18 @@ describe("sweepCronRunSessions", () => { sessionId: "old-run", updatedAt: now - 25 * 3_600_000, // 25h ago — expired }, + "agent:main:cron:job1:run:old-run:subagent:worker": { + sessionId: "old-run-child", + updatedAt: now - 25 * 3_600_000, // expired cron-run descendant + }, "agent:main:cron:job1:run:recent-run": { sessionId: "recent-run", updatedAt: now - 1 * 3_600_000, // 1h ago — not expired }, + "agent:main:cron:job1:run:recent-run:thread:reply": { + sessionId: "recent-run-thread", + updatedAt: now - 1 * 3_600_000, // active cron-run descendant + }, "agent:main:telegram:dm:123": { sessionId: "regular-session", updatedAt: now - 100 * 3_600_000, // old but not a cron run @@ -100,14 +113,18 @@ describe("sweepCronRunSessions", () => { }); expect(result.swept).toBe(true); - expect(result.pruned).toBe(1); + expect(result.pruned).toBe(2); const updated = JSON.parse(fs.readFileSync(storePath, "utf-8")); expect(updated["agent:main:cron:job1"]).toMatchObject({ sessionId: "base-session" }); expect(updated["agent:main:cron:job1:run:old-run"]).toBeUndefined(); + expect(updated["agent:main:cron:job1:run:old-run:subagent:worker"]).toBeUndefined(); expect(updated["agent:main:cron:job1:run:recent-run"]).toMatchObject({ sessionId: "recent-run", }); + expect(updated["agent:main:cron:job1:run:recent-run:thread:reply"]).toMatchObject({ + sessionId: "recent-run-thread", + }); expect(updated["agent:main:telegram:dm:123"]).toMatchObject({ sessionId: "regular-session", }); diff --git a/src/gateway/server-node-events.ts b/src/gateway/server-node-events.ts index e44ce6b3565..48ed7f488ef 100644 --- a/src/gateway/server-node-events.ts +++ b/src/gateway/server-node-events.ts @@ -4,6 +4,7 @@ import { updatePairedDeviceMetadata } from "../infra/device-pairing.js"; import { formatErrorMessage } from "../infra/errors.js"; import { updatePairedNodeMetadata } from "../infra/node-pairing.js"; import type { PromptImageOrderEntry } from "../media/prompt-image-order.js"; +import { resolveEventSessionKey } from "../routing/session-key.js"; import { NODE_PRESENCE_ALIVE_EVENT, normalizeNodePresenceAliveReason, @@ -750,7 +751,7 @@ export const handleNodeEvent = async ( } const queued = enqueueSystemEvent(text, { - sessionKey, + sessionKey: resolveEventSessionKey(sessionKey, cfg.session?.mainKey, cfg.session?.scope), contextKey: runId ? `exec:${runId}` : "exec", trusted: false, }); @@ -759,12 +760,17 @@ export const handleNodeEvent = async ( // keys should keep legacy unscoped behavior so enabled non-main heartbeat // agents still run when no explicit agent session is provided. requestHeartbeat( - scopedHeartbeatWakeOptions(sessionKey, { - source: "exec-event", - intent: "event", - reason: "exec-event", - coalesceMs: 0, - }), + scopedHeartbeatWakeOptions( + sessionKey, + { + source: "exec-event", + intent: "event", + reason: "exec-event", + coalesceMs: 0, + }, + cfg.session?.mainKey, + cfg.session?.scope, + ), ); } return undefined; diff --git a/src/infra/heartbeat-events-filter.test.ts b/src/infra/heartbeat-events-filter.test.ts index a9bff966fb0..6ebec95ec44 100644 --- a/src/infra/heartbeat-events-filter.test.ts +++ b/src/infra/heartbeat-events-filter.test.ts @@ -176,3 +176,42 @@ describe("heartbeat event classification", () => { expect(isRelayableExecCompletionEvent(value)).toBe(expected); }); }); + +describe("isExecCompletionEvent", () => { + it("matches emitExecSystemEvent (gateway/node approval path) events", () => { + expect(isExecCompletionEvent("Exec finished (gateway id=g1, session=s1, code 0)")).toBe(true); + expect(isExecCompletionEvent("exec finished (node=n1, code 1)\nsome output")).toBe(true); + }); + + it("matches maybeNotifyOnExit (backgrounded allowlisted commands) events", () => { + // Word-based session slugs (createSessionSlug) + expect(isExecCompletionEvent("Exec completed (amber-at, code 0) :: some output")).toBe(true); + expect(isExecCompletionEvent("Exec completed (calm-del, code 0)")).toBe(true); + expect(isExecCompletionEvent("Exec failed (brisk-no, code 1) :: error text")).toBe(true); + expect(isExecCompletionEvent("Exec failed (fresh-ke, signal SIGTERM)")).toBe(true); + // Hex-style IDs also accepted + expect(isExecCompletionEvent("Exec completed (abc12345, code 0)")).toBe(true); + }); + + it("is case-insensitive", () => { + expect(isExecCompletionEvent("EXEC COMPLETED (abc12345, code 0)")).toBe(true); + expect(isExecCompletionEvent("exec failed (abc12345, code 2)")).toBe(true); + }); + + it("does not match non-exec events", () => { + expect(isExecCompletionEvent("Exec running (gateway id=g1, session=s1, >5s): ls")).toBe(false); + expect(isExecCompletionEvent("Exec denied (gateway id=g1, reason): rm -rf /")).toBe(false); + expect(isExecCompletionEvent("Heartbeat wake")).toBe(false); + expect(isExecCompletionEvent("")).toBe(false); + }); + + it("does not false-positive on free-form cron text containing exec phrases", () => { + expect(isExecCompletionEvent("Nightly backup exec failed – see logs")).toBe(false); + expect(isExecCompletionEvent("Cron: check if exec completed successfully")).toBe(false); + expect(isExecCompletionEvent("exec killed the process manually")).toBe(false); + expect(isExecCompletionEvent("Exec finished weekly backup checks")).toBe(false); + // Parenthesized false positive from review feedback — must not match mid-string + expect(isExecCompletionEvent("Nightly backup exec failed (see logs)")).toBe(false); + expect(isExecCompletionEvent("Check: exec completed (last run was yesterday)")).toBe(false); + }); +}); diff --git a/src/routing/session-key.test.ts b/src/routing/session-key.test.ts index 0bf5294ae66..fd6f2e24c4a 100644 --- a/src/routing/session-key.test.ts +++ b/src/routing/session-key.test.ts @@ -10,6 +10,8 @@ import { classifySessionKeyShape, isValidAgentId, parseAgentSessionKey, + resolveEventSessionKey, + scopedHeartbeatWakeOptions, toAgentStoreSessionKey, } from "./session-key.js"; @@ -61,6 +63,7 @@ describe("isCronSessionKey", () => { it.each([ { key: "agent:main:cron:job-1", expected: true }, { key: "agent:main:cron:job-1:run:run-1", expected: true }, + { key: "agent:main:cron:job-1:run:run-1:subagent:worker", expected: true }, { key: "agent:main:main", expected: false }, { key: "agent:main:subagent:worker", expected: false }, { key: "cron:job-1", expected: false }, @@ -158,6 +161,134 @@ describe("session key canonicalization", () => { }); }); +describe("scopedHeartbeatWakeOptions", () => { + it("remaps ephemeral cron run sessions to agent main key", () => { + const result = scopedHeartbeatWakeOptions("agent:main:cron:backup:run:abc", { + reason: "exec:123:exit", + }); + expect(result).toEqual({ reason: "exec:123:exit", sessionKey: "agent:main:main" }); + }); + + it("preserves durable cron base sessions (not remapped)", () => { + const result = scopedHeartbeatWakeOptions("agent:main:cron:backup", { + reason: "exec:123:exit", + }); + expect(result).toEqual({ reason: "exec:123:exit", sessionKey: "agent:main:cron:backup" }); + }); + + it("preserves sessionKey for regular agent sessions", () => { + const result = scopedHeartbeatWakeOptions("agent:main:main", { + reason: "exec:123:exit", + }); + expect(result).toEqual({ reason: "exec:123:exit", sessionKey: "agent:main:main" }); + }); + + it("strips sessionKey for non-agent keys", () => { + const result = scopedHeartbeatWakeOptions("main", { reason: "test" }); + expect(result).toEqual({ reason: "test" }); + expect("sessionKey" in result).toBe(false); + }); + + it("strips sessionKey for global-scope sessions to preserve unscoped wake behavior", () => { + // In session.scope = "global" setups, resolveMainSessionKeyFromConfig() returns "global". + // Passing "global" as sessionKey into requestHeartbeatNow would create a targeted wake + // that can fail to resolve, breaking hook-triggered heartbeats. scopedHeartbeatWakeOptions + // must strip it to preserve the old unscoped behavior. + const result = scopedHeartbeatWakeOptions("global", { reason: "hook:wake" }); + expect(result).toEqual({ reason: "hook:wake" }); + expect("sessionKey" in result).toBe(false); + }); + + it("drops sessionKey but preserves agentId for cron-run keys when scope is global", () => { + // Global-scope agents drain the "global" queue automatically; a targeted + // wake on agent::main would be unresolvable. Carry the agent target + // so multi-agent global-scope setups still wake the originating agent. + const result = scopedHeartbeatWakeOptions( + "agent:ops:cron:job-1:run:xyz", + { reason: "exec-event" }, + undefined, + "global", + ); + expect(result).toEqual({ reason: "exec-event", agentId: "ops" }); + expect("sessionKey" in result).toBe(false); + }); + + it("threads custom mainKey for cron-run keys under per-sender scope", () => { + const result = scopedHeartbeatWakeOptions( + "agent:main:cron:backup:run:abc", + { reason: "exec-event" }, + "primary", + "per-sender", + ); + expect(result).toEqual({ reason: "exec-event", sessionKey: "agent:main:primary" }); + }); +}); + +describe("resolveEventSessionKey", () => { + it("remaps ephemeral cron run session keys to agent main session key", () => { + expect(resolveEventSessionKey("agent:main:cron:backup:run:abc123")).toBe("agent:main:main"); + expect(resolveEventSessionKey("agent:ops:cron:job-1:run:xyz")).toBe("agent:ops:main"); + }); + + it("collapses cron-run descendant session keys to the agent main session key", () => { + expect(resolveEventSessionKey("agent:main:cron:backup:run:abc123:subagent:worker")).toBe( + "agent:main:main", + ); + expect(resolveEventSessionKey("agent:ops:cron:job-1:run:xyz:thread:reply")).toBe( + "agent:ops:main", + ); + }); + + it("preserves durable cron base session keys", () => { + expect(resolveEventSessionKey("agent:ops:cron:job-1")).toBe("agent:ops:cron:job-1"); + expect(resolveEventSessionKey("agent:main:cron:backup")).toBe("agent:main:cron:backup"); + }); + + it("respects custom mainKey for ephemeral cron session remapping", () => { + expect(resolveEventSessionKey("agent:main:cron:backup:run:abc123", "primary")).toBe( + "agent:main:primary", + ); + expect(resolveEventSessionKey("agent:ops:cron:job-1:run:xyz", "primary")).toBe( + "agent:ops:primary", + ); + }); + + it("passes through non-cron session keys unchanged", () => { + expect(resolveEventSessionKey("agent:main:main")).toBe("agent:main:main"); + expect(resolveEventSessionKey("agent:main:discord:direct:user1")).toBe( + "agent:main:discord:direct:user1", + ); + }); + + it("passes through non-agent keys unchanged", () => { + expect(resolveEventSessionKey("main")).toBe("main"); + expect(resolveEventSessionKey("global")).toBe("global"); + }); + + it("routes cron-run keys to the global queue when scope is global", () => { + // resolveHeartbeatSession drains the literal "global" queue for global-scope + // sessions; remapping to agent::main would strand the event. + expect(resolveEventSessionKey("agent:ops:cron:job-1:run:xyz", undefined, "global")).toBe( + "global", + ); + expect(resolveEventSessionKey("agent:main:cron:backup:run:abc", "primary", "global")).toBe( + "global", + ); + expect( + resolveEventSessionKey("agent:main:cron:backup:run:abc:subagent:worker", "primary", "global"), + ).toBe("global"); + }); + + it("treats explicit per-sender scope identically to omitted scope", () => { + expect( + resolveEventSessionKey("agent:main:cron:backup:run:abc123", undefined, "per-sender"), + ).toBe("agent:main:main"); + expect( + resolveEventSessionKey("agent:main:cron:backup:run:abc123", "primary", "per-sender"), + ).toBe("agent:main:primary"); + }); +}); + describe("isValidAgentId", () => { it.each([ { input: "main", expected: true }, diff --git a/src/routing/session-key.ts b/src/routing/session-key.ts index 6f2129bf4ca..03e982bf21b 100644 --- a/src/routing/session-key.ts +++ b/src/routing/session-key.ts @@ -1,5 +1,5 @@ import type { ChatType } from "../channels/chat-type.js"; -import { parseAgentSessionKey } from "../sessions/session-key-utils.js"; +import { isCronRunSessionKey, parseAgentSessionKey } from "../sessions/session-key-utils.js"; import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; import { normalizeAccountId } from "./account-id.js"; @@ -35,8 +35,44 @@ function normalizeToken(value: string | undefined | null): string { export function scopedHeartbeatWakeOptions( sessionKey: string, wakeOptions: T, -): T | (T & { sessionKey: string }) { - return parseAgentSessionKey(sessionKey) ? { ...wakeOptions, sessionKey } : wakeOptions; + mainKey?: string, + scope?: "per-sender" | "global", +): T | (T & { sessionKey: string }) | (T & { agentId: string }) { + const parsed = parseAgentSessionKey(sessionKey); + if (!parsed) { + return wakeOptions; + } + if (isCronRunSessionKey(sessionKey)) { + // Global-scope agents drain the literal "global" queue, not agent-main; + // a targeted wake on agent::main would be unresolvable. Drop the + // sessionKey but carry the agent target so multi-agent global-scope + // setups still wake the originating agent's heartbeat. + if (scope === "global") { + return { ...wakeOptions, agentId: parsed.agentId }; + } + return { + ...wakeOptions, + sessionKey: buildAgentMainSessionKey({ agentId: parsed.agentId, mainKey }), + }; + } + return { ...wakeOptions, sessionKey }; +} + +export function resolveEventSessionKey( + sessionKey: string, + mainKey?: string, + scope?: "per-sender" | "global", +): string { + const parsed = parseAgentSessionKey(sessionKey); + if (!parsed || !isCronRunSessionKey(sessionKey)) { + return sessionKey; + } + // Global-scope agents enqueue/drain via the literal "global" queue; agent-main + // would strand the event in a queue the heartbeat never peeks. + if (scope === "global") { + return "global"; + } + return buildAgentMainSessionKey({ agentId: parsed.agentId, mainKey }); } export function normalizeMainKey(value: string | undefined | null): string { diff --git a/src/sessions/session-key-utils.ts b/src/sessions/session-key-utils.ts index d5447921748..8a4750ae33d 100644 --- a/src/sessions/session-key-utils.ts +++ b/src/sessions/session-key-utils.ts @@ -52,7 +52,7 @@ export function isCronRunSessionKey(sessionKey: string | undefined | null): bool if (!parsed) { return false; } - return /^cron:[^:]+:run:[^:]+$/.test(parsed.rest); + return /^cron:[^:]+:run:[^:]+(?::|$)/.test(parsed.rest); } export function isCronSessionKey(sessionKey: string | undefined | null): boolean {