From e554c59aac684c19bee394d18341027c8469bd35 Mon Sep 17 00:00:00 2001 From: Tyler Yust <64381258+tyler6204@users.noreply.github.com> Date: Sat, 7 Mar 2026 12:13:37 -0800 Subject: [PATCH] fix(cron): eliminate double-announce and replace delivery polling with push-based flow (#39089) * fix(cron): eliminate double-announce and replace delivery polling with push-based flow - Set deliveryAttempted=true in announce early-return paths (active-subagent suppression and stale-interim suppression) so the heartbeat timer no longer fires a redundant enqueueSystemEvent fallback (double-announce bug). - Refactor waitForDescendantSubagentSummary to use event-based agent.wait RPC calls instead of a 500ms busy-poll loop. Each active descendant run is now awaited concurrently via Promise.allSettled, and only a short bounded grace period (5s) remains to capture the cron agent's post-orchestration synthesis. Eliminates O(n*timeoutMs/500ms) gateway calls and wasted wall-clock time. - Add FAST_TEST_MODE (OPENCLAW_TEST_FAST=1) to subagent-followup.ts to keep the grace-period tests instant in CI. - Add comprehensive tests for the new waitForDescendantSubagentSummary behaviour (push-based wait, error resilience, NO_REPLY handling, multi-descendant waits). * fix: prep cron double-announce followup tests (#39089) (thanks @tyler6204) --- CHANGELOG.md | 1 + .../delivery-dispatch.double-announce.test.ts | 271 ++++++++++++++++++ src/cron/isolated-agent/delivery-dispatch.ts | 24 +- .../isolated-agent/subagent-followup.test.ts | 263 ++++++++++++++++- src/cron/isolated-agent/subagent-followup.ts | 94 ++++-- 5 files changed, 620 insertions(+), 33 deletions(-) create mode 100644 src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index cb20f9f0203..71a864bdd7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -628,6 +628,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Cron/announce delivery: stop duplicate completion announces when cron early-return paths already handled delivery, and replace descendant followup polling with push-based waits so cron summaries arrive without the old busy-loop fallback. (#39089) Thanks @tyler6204. - Dashboard/macOS auth handling: switch the macOS “Open Dashboard” flow from query-string token injection to URL fragments, stop persisting Control UI gateway tokens in browser localStorage, and scrub legacy stored tokens on load. Thanks @JNX03 for reporting. - Models/provider config precedence: prefer exact `models.providers.` matches before normalized provider aliases in embedded model resolution, preventing alias/canonical key collisions from applying the wrong provider `api`, `baseUrl`, or headers. (#35934) thanks @RealKai42. - Hooks/auth throttling: reject non-`POST` `/hooks/*` requests before auth-failure accounting so unsupported methods can no longer burn the hook auth lockout budget and block legitimate webhook delivery. Thanks @JNX03 for reporting. diff --git a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts new file mode 100644 index 00000000000..b41b88576e9 --- /dev/null +++ b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts @@ -0,0 +1,271 @@ +/** + * Tests for the double-announce bug in cron delivery dispatch. + * + * Bug: early return paths in deliverViaAnnounce (active subagent suppression + * and stale interim message suppression) returned without setting + * deliveryAttempted = true. The timer saw deliveryAttempted = false and + * fired enqueueSystemEvent as a fallback, causing a second announcement. + * + * Fix: both early return paths now set deliveryAttempted = true before + * returning so the timer correctly skips the system-event fallback. + */ + +import { beforeEach, describe, expect, it, vi } from "vitest"; + +// --- Module mocks (must be hoisted before imports) --- + +vi.mock("../../agents/subagent-announce.js", () => ({ + runSubagentAnnounceFlow: vi.fn().mockResolvedValue(true), +})); + +vi.mock("../../agents/subagent-registry.js", () => ({ + countActiveDescendantRuns: vi.fn().mockReturnValue(0), +})); + +vi.mock("../../config/sessions.js", () => ({ + resolveAgentMainSessionKey: vi.fn().mockReturnValue("agent:main"), +})); + +vi.mock("../../infra/outbound/outbound-session.js", () => ({ + resolveOutboundSessionRoute: vi.fn().mockResolvedValue(null), + ensureOutboundSessionEntry: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock("../../infra/outbound/deliver.js", () => ({ + deliverOutboundPayloads: vi.fn().mockResolvedValue([{ ok: true }]), +})); + +vi.mock("../../infra/outbound/identity.js", () => ({ + resolveAgentOutboundIdentity: vi.fn().mockReturnValue({}), +})); + +vi.mock("../../infra/outbound/session-context.js", () => ({ + buildOutboundSessionContext: vi.fn().mockReturnValue({}), +})); + +vi.mock("../../cli/outbound-send-deps.js", () => ({ + createOutboundSendDeps: vi.fn().mockReturnValue({}), +})); + +vi.mock("../../logger.js", () => ({ + logWarn: vi.fn(), +})); + +vi.mock("./subagent-followup.js", () => ({ + expectsSubagentFollowup: vi.fn().mockReturnValue(false), + isLikelyInterimCronMessage: vi.fn().mockReturnValue(false), + readDescendantSubagentFallbackReply: vi.fn().mockResolvedValue(undefined), + waitForDescendantSubagentSummary: vi.fn().mockResolvedValue(undefined), +})); + +import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js"; +// Import after mocks +import { countActiveDescendantRuns } from "../../agents/subagent-registry.js"; +import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js"; +import { dispatchCronDelivery } from "./delivery-dispatch.js"; +import type { DeliveryTargetResolution } from "./delivery-target.js"; +import type { RunCronAgentTurnResult } from "./run.js"; +import { + expectsSubagentFollowup, + isLikelyInterimCronMessage, + readDescendantSubagentFallbackReply, + waitForDescendantSubagentSummary, +} from "./subagent-followup.js"; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeResolvedDelivery(): Extract { + return { + ok: true, + channel: "telegram", + to: "123456", + accountId: undefined, + threadId: undefined, + }; +} + +function makeWithRunSession() { + return ( + result: Omit, + ): RunCronAgentTurnResult => ({ + ...result, + sessionId: "test-session-id", + sessionKey: "test-session-key", + }); +} + +function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested?: boolean }) { + const resolvedDelivery = makeResolvedDelivery(); + return { + cfg: {} as never, + cfgWithAgentDefaults: {} as never, + deps: {} as never, + job: { + id: "test-job", + name: "Test Job", + deleteAfterRun: false, + payload: { kind: "agentTurn", message: "hello" }, + } as never, + agentId: "main", + agentSessionKey: "agent:main", + runSessionId: "run-123", + runStartedAt: Date.now(), + runEndedAt: Date.now(), + timeoutMs: 30_000, + resolvedDelivery, + deliveryRequested: overrides.deliveryRequested ?? true, + skipHeartbeatDelivery: false, + skipMessagingToolDelivery: false, + deliveryBestEffort: false, + deliveryPayloadHasStructuredContent: false, + deliveryPayloads: overrides.synthesizedText ? [{ text: overrides.synthesizedText }] : [], + synthesizedText: overrides.synthesizedText ?? "on it", + summary: overrides.synthesizedText ?? "on it", + outputText: overrides.synthesizedText ?? "on it", + telemetry: undefined, + abortSignal: undefined, + isAborted: () => false, + abortReason: () => "aborted", + withRunSession: makeWithRunSession(), + }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("dispatchCronDelivery — double-announce guard", () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(expectsSubagentFollowup).mockReturnValue(false); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue(undefined); + vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined); + vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(true); + }); + + it("early return (active subagent) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => { + // countActiveDescendantRuns returns >0 → enters wait block; still >0 after wait → early return + vi.mocked(countActiveDescendantRuns).mockReturnValue(2); + vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined); + vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue(undefined); + + const params = makeBaseParams({ synthesizedText: "on it" }); + const state = await dispatchCronDelivery(params); + + // deliveryAttempted must be true so timer does NOT fire enqueueSystemEvent + expect(state.deliveryAttempted).toBe(true); + + // Verify timer guard agrees: shouldEnqueueCronMainSummary returns false + expect( + shouldEnqueueCronMainSummary({ + summaryText: "on it", + deliveryRequested: true, + delivered: state.delivered, + deliveryAttempted: state.deliveryAttempted, + suppressMainSummary: false, + isCronSystemEvent: () => true, + }), + ).toBe(false); + + // No announce should have been attempted (subagents still running) + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + }); + + it("early return (stale interim suppression) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => { + // First countActiveDescendantRuns call returns >0 (had descendants), second returns 0 + vi.mocked(countActiveDescendantRuns) + .mockReturnValueOnce(2) // initial check → hadDescendants=true, enters wait block + .mockReturnValueOnce(0); // second check after wait → activeSubagentRuns=0 + vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined); + vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue(undefined); + // synthesizedText matches initialSynthesizedText & isLikelyInterimCronMessage → stale interim + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(true); + + const params = makeBaseParams({ synthesizedText: "on it, pulling everything together" }); + const state = await dispatchCronDelivery(params); + + // deliveryAttempted must be true so timer does NOT fire enqueueSystemEvent + expect(state.deliveryAttempted).toBe(true); + + // Verify timer guard agrees + expect( + shouldEnqueueCronMainSummary({ + summaryText: "on it, pulling everything together", + deliveryRequested: true, + delivered: state.delivered, + deliveryAttempted: state.deliveryAttempted, + suppressMainSummary: false, + isCronSystemEvent: () => true, + }), + ).toBe(false); + + // No announce or direct delivery should have been sent (stale interim suppressed) + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + }); + + it("normal announce success delivers exactly once and sets deliveryAttempted=true", async () => { + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(true); + + const params = makeBaseParams({ synthesizedText: "Morning briefing complete." }); + const state = await dispatchCronDelivery(params); + + expect(state.deliveryAttempted).toBe(true); + expect(state.delivered).toBe(true); + // Announce called exactly once + expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); + + // Timer should not fire enqueueSystemEvent (delivered=true) + expect( + shouldEnqueueCronMainSummary({ + summaryText: "Morning briefing complete.", + deliveryRequested: true, + delivered: state.delivered, + deliveryAttempted: state.deliveryAttempted, + suppressMainSummary: false, + isCronSystemEvent: () => true, + }), + ).toBe(false); + }); + + it("announce failure falls back to direct delivery exactly once (no double-deliver)", async () => { + vi.mocked(countActiveDescendantRuns).mockReturnValue(0); + vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false); + // Announce fails: runSubagentAnnounceFlow returns false + vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(false); + + const { deliverOutboundPayloads } = await import("../../infra/outbound/deliver.js"); + vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]); + + const params = makeBaseParams({ synthesizedText: "Briefing ready." }); + const state = await dispatchCronDelivery(params); + + // Delivery was attempted; direct fallback picked up the slack + expect(state.deliveryAttempted).toBe(true); + expect(state.delivered).toBe(true); + + // Announce was tried exactly once + expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1); + + // Direct fallback fired exactly once (not zero, not twice) + // This ensures one delivery total reaches the user, not two + expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1); + }); + + it("no delivery requested means deliveryAttempted stays false and runSubagentAnnounceFlow not called", async () => { + const params = makeBaseParams({ + synthesizedText: "Task done.", + deliveryRequested: false, + }); + const state = await dispatchCronDelivery(params); + + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + // deliveryAttempted starts false (skipMessagingToolDelivery=false) and nothing runs + expect(state.deliveryAttempted).toBe(false); + }); +}); diff --git a/src/cron/isolated-agent/delivery-dispatch.ts b/src/cron/isolated-agent/delivery-dispatch.ts index 1924beb90b2..fffa5fcb8b8 100644 --- a/src/cron/isolated-agent/delivery-dispatch.ts +++ b/src/cron/isolated-agent/delivery-dispatch.ts @@ -318,8 +318,16 @@ export async function dispatchCronDelivery( } if (activeSubagentRuns > 0) { // Parent orchestration is still in progress; avoid announcing a partial - // update to the main requester. - return params.withRunSession({ status: "ok", summary, outputText, ...params.telemetry }); + // update to the main requester. Mark deliveryAttempted so the timer does + // not fire a redundant enqueueSystemEvent fallback (double-announce bug). + deliveryAttempted = true; + return params.withRunSession({ + status: "ok", + summary, + outputText, + deliveryAttempted, + ...params.telemetry, + }); } if ( hadDescendants && @@ -329,8 +337,16 @@ export async function dispatchCronDelivery( ) { // Descendants existed but no post-orchestration synthesis arrived AND // no descendant fallback reply was available. Suppress stale parent - // text like "on it, pulling everything together". - return params.withRunSession({ status: "ok", summary, outputText, ...params.telemetry }); + // text like "on it, pulling everything together". Mark deliveryAttempted + // so the timer does not fire a redundant enqueueSystemEvent fallback. + deliveryAttempted = true; + return params.withRunSession({ + status: "ok", + summary, + outputText, + deliveryAttempted, + ...params.telemetry, + }); } if (synthesizedText.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) { return params.withRunSession({ diff --git a/src/cron/isolated-agent/subagent-followup.test.ts b/src/cron/isolated-agent/subagent-followup.test.ts index 237f912903f..093da010026 100644 --- a/src/cron/isolated-agent/subagent-followup.test.ts +++ b/src/cron/isolated-agent/subagent-followup.test.ts @@ -1,12 +1,18 @@ -import { describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +// vi.hoisted runs before module imports, ensuring FAST_TEST_MODE is picked up. +vi.hoisted(() => { + process.env.OPENCLAW_TEST_FAST = "1"; +}); + import { expectsSubagentFollowup, isLikelyInterimCronMessage, readDescendantSubagentFallbackReply, + waitForDescendantSubagentSummary, } from "./subagent-followup.js"; vi.mock("../../agents/subagent-registry.js", () => ({ - countActiveDescendantRuns: vi.fn().mockReturnValue(0), listDescendantRunsForRequester: vi.fn().mockReturnValue([]), })); @@ -14,8 +20,18 @@ vi.mock("../../agents/tools/agent-step.js", () => ({ readLatestAssistantReply: vi.fn().mockResolvedValue(undefined), })); +vi.mock("../../gateway/call.js", () => ({ + callGateway: vi.fn().mockResolvedValue({ status: "ok" }), +})); + const { listDescendantRunsForRequester } = await import("../../agents/subagent-registry.js"); const { readLatestAssistantReply } = await import("../../agents/tools/agent-step.js"); +const { callGateway } = await import("../../gateway/call.js"); + +async function resolveAfterAdvancingTimers(promise: Promise, advanceMs = 100): Promise { + await vi.advanceTimersByTimeAsync(advanceMs); + return promise; +} describe("isLikelyInterimCronMessage", () => { it("detects 'on it' as interim", () => { @@ -243,3 +259,246 @@ describe("readDescendantSubagentFallbackReply", () => { expect(result).toBeUndefined(); }); }); + +describe("waitForDescendantSubagentSummary", () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.useRealTimers(); + vi.mocked(listDescendantRunsForRequester).mockReturnValue([]); + vi.mocked(readLatestAssistantReply).mockResolvedValue(undefined); + vi.mocked(callGateway).mockResolvedValue({ status: "ok" }); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("returns initialReply immediately when no active descendants and observedActiveDescendants=false", async () => { + vi.mocked(listDescendantRunsForRequester).mockReturnValue([]); + const result = await waitForDescendantSubagentSummary({ + sessionKey: "cron-session", + initialReply: "on it", + timeoutMs: 100, + observedActiveDescendants: false, + }); + expect(result).toBe("on it"); + expect(callGateway).not.toHaveBeenCalled(); + }); + + it("awaits active descendants via agent.wait and returns synthesis after grace period", async () => { + // First call: active run; second call (after agent.wait resolves): no active runs + vi.mocked(listDescendantRunsForRequester) + .mockReturnValueOnce([ + { + runId: "run-abc", + childSessionKey: "child-session", + requesterSessionKey: "cron-session", + requesterDisplayKey: "cron-session", + task: "morning briefing", + cleanup: "keep", + createdAt: 1000, + // no endedAt → active + }, + ]) + .mockReturnValue([]); // subsequent calls: all done + + vi.mocked(callGateway).mockResolvedValue({ status: "ok" }); + vi.mocked(readLatestAssistantReply).mockResolvedValue("Morning briefing complete!"); + + const result = await waitForDescendantSubagentSummary({ + sessionKey: "cron-session", + initialReply: "on it", + timeoutMs: 30_000, + observedActiveDescendants: true, + }); + + expect(result).toBe("Morning briefing complete!"); + // agent.wait should have been called with the active run's ID + expect(callGateway).toHaveBeenCalledWith( + expect.objectContaining({ + method: "agent.wait", + params: expect.objectContaining({ runId: "run-abc" }), + }), + ); + }); + + it("returns undefined when descendants finish but only interim text remains after grace period", async () => { + vi.useFakeTimers(); + // No active runs at call time, but observedActiveDescendants=true (saw them before) + vi.mocked(listDescendantRunsForRequester).mockReturnValue([]); + // readLatestAssistantReply keeps returning interim text + vi.mocked(readLatestAssistantReply).mockResolvedValue("on it"); + + const resultPromise = waitForDescendantSubagentSummary({ + sessionKey: "cron-session", + initialReply: "on it", + timeoutMs: 100, + observedActiveDescendants: true, + }); + + const result = await resolveAfterAdvancingTimers(resultPromise); + + expect(result).toBeUndefined(); + }); + + it("returns synthesis even if initial reply was undefined", async () => { + vi.mocked(listDescendantRunsForRequester) + .mockReturnValueOnce([ + { + runId: "run-xyz", + childSessionKey: "child-2", + requesterSessionKey: "cron-session", + requesterDisplayKey: "cron-session", + task: "report", + cleanup: "keep", + createdAt: 1000, + }, + ]) + .mockReturnValue([]); + + vi.mocked(callGateway).mockResolvedValue({ status: "ok" }); + vi.mocked(readLatestAssistantReply).mockResolvedValue("Report generated successfully."); + + const result = await waitForDescendantSubagentSummary({ + sessionKey: "cron-session", + initialReply: undefined, + timeoutMs: 30_000, + observedActiveDescendants: true, + }); + + expect(result).toBe("Report generated successfully."); + }); + + it("uses agent.wait for each active run when multiple descendants exist", async () => { + vi.mocked(listDescendantRunsForRequester) + .mockReturnValueOnce([ + { + runId: "run-1", + childSessionKey: "child-1", + requesterSessionKey: "cron-session", + requesterDisplayKey: "cron-session", + task: "task-1", + cleanup: "keep", + createdAt: 1000, + }, + { + runId: "run-2", + childSessionKey: "child-2", + requesterSessionKey: "cron-session", + requesterDisplayKey: "cron-session", + task: "task-2", + cleanup: "keep", + createdAt: 1000, + }, + ]) + .mockReturnValue([]); + + vi.mocked(callGateway).mockResolvedValue({ status: "ok" }); + vi.mocked(readLatestAssistantReply).mockResolvedValue("All tasks complete."); + + await waitForDescendantSubagentSummary({ + sessionKey: "cron-session", + initialReply: "spawned a subagent", + timeoutMs: 30_000, + observedActiveDescendants: true, + }); + + // agent.wait called once for each active run + const waitCalls = vi + .mocked(callGateway) + .mock.calls.filter((c) => (c[0] as { method?: string }).method === "agent.wait"); + expect(waitCalls).toHaveLength(2); + const runIds = waitCalls.map((c) => (c[0] as { params: { runId: string } }).params.runId); + expect(runIds).toContain("run-1"); + expect(runIds).toContain("run-2"); + }); + + it("waits for newly discovered active descendants after the first wait round", async () => { + vi.mocked(listDescendantRunsForRequester) + .mockReturnValueOnce([ + { + runId: "run-1", + childSessionKey: "child-1", + requesterSessionKey: "cron-session", + requesterDisplayKey: "cron-session", + task: "task-1", + cleanup: "keep", + createdAt: 1000, + }, + ]) + .mockReturnValueOnce([ + { + runId: "run-2", + childSessionKey: "child-2", + requesterSessionKey: "cron-session", + requesterDisplayKey: "cron-session", + task: "task-2", + cleanup: "keep", + createdAt: 1001, + }, + ]) + .mockReturnValue([]); + + vi.mocked(callGateway).mockResolvedValue({ status: "ok" }); + vi.mocked(readLatestAssistantReply).mockResolvedValue("Nested descendant work complete."); + + const result = await waitForDescendantSubagentSummary({ + sessionKey: "cron-session", + initialReply: "spawned a subagent", + timeoutMs: 30_000, + observedActiveDescendants: true, + }); + + expect(result).toBe("Nested descendant work complete."); + const waitedRunIds = vi + .mocked(callGateway) + .mock.calls.filter((c) => (c[0] as { method?: string }).method === "agent.wait") + .map((c) => (c[0] as { params: { runId: string } }).params.runId); + expect(waitedRunIds).toEqual(["run-1", "run-2"]); + }); + + it("handles agent.wait errors gracefully and still reads the synthesis", async () => { + vi.mocked(listDescendantRunsForRequester) + .mockReturnValueOnce([ + { + runId: "run-err", + childSessionKey: "child-err", + requesterSessionKey: "cron-session", + requesterDisplayKey: "cron-session", + task: "task-err", + cleanup: "keep", + createdAt: 1000, + }, + ]) + .mockReturnValue([]); + + vi.mocked(callGateway).mockRejectedValue(new Error("gateway unavailable")); + vi.mocked(readLatestAssistantReply).mockResolvedValue("Completed despite gateway error."); + + const result = await waitForDescendantSubagentSummary({ + sessionKey: "cron-session", + initialReply: "on it", + timeoutMs: 30_000, + observedActiveDescendants: true, + }); + + expect(result).toBe("Completed despite gateway error."); + }); + + it("skips NO_REPLY synthesis and returns undefined", async () => { + vi.useFakeTimers(); + vi.mocked(listDescendantRunsForRequester).mockReturnValue([]); + vi.mocked(readLatestAssistantReply).mockResolvedValue("NO_REPLY"); + + const resultPromise = waitForDescendantSubagentSummary({ + sessionKey: "cron-session", + initialReply: "on it", + timeoutMs: 100, + observedActiveDescendants: true, + }); + + const result = await resolveAfterAdvancingTimers(resultPromise); + + expect(result).toBeUndefined(); + }); +}); diff --git a/src/cron/isolated-agent/subagent-followup.ts b/src/cron/isolated-agent/subagent-followup.ts index ef4a18a3863..6d5f9d4c502 100644 --- a/src/cron/isolated-agent/subagent-followup.ts +++ b/src/cron/isolated-agent/subagent-followup.ts @@ -1,12 +1,14 @@ -import { - countActiveDescendantRuns, - listDescendantRunsForRequester, -} from "../../agents/subagent-registry.js"; +import { listDescendantRunsForRequester } from "../../agents/subagent-registry.js"; import { readLatestAssistantReply } from "../../agents/tools/agent-step.js"; import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; -const CRON_SUBAGENT_WAIT_POLL_MS = 500; -const CRON_SUBAGENT_WAIT_MIN_MS = 30_000; -const CRON_SUBAGENT_FINAL_REPLY_GRACE_MS = 5_000; +import { callGateway } from "../../gateway/call.js"; + +const FAST_TEST_MODE = process.env.OPENCLAW_TEST_FAST === "1"; + +const CRON_SUBAGENT_WAIT_MIN_MS = FAST_TEST_MODE ? 10 : 30_000; +const CRON_SUBAGENT_FINAL_REPLY_GRACE_MS = FAST_TEST_MODE ? 50 : 5_000; +const CRON_SUBAGENT_GRACE_POLL_MS = FAST_TEST_MODE ? 8 : 200; + const SUBAGENT_FOLLOWUP_HINTS = [ "subagent spawned", "spawned a subagent", @@ -14,6 +16,7 @@ const SUBAGENT_FOLLOWUP_HINTS = [ "both subagents are running", "wait for them to report back", ] as const; + const INTERIM_CRON_HINTS = [ "on it", "pulling everything together", @@ -103,6 +106,12 @@ export async function readDescendantSubagentFallbackReply(params: { return replies.join("\n\n"); } +/** + * Waits for descendant subagents to complete using a push-based approach: + * each active descendant run is awaited via `agent.wait` (gateway RPC) instead + * of a busy-poll loop. After all active runs settle, a short grace period + * polls the cron agent's session for a post-orchestration synthesis message. + */ export async function waitForDescendantSubagentSummary(params: { sessionKey: string; initialReply?: string; @@ -111,22 +120,53 @@ export async function waitForDescendantSubagentSummary(params: { }): Promise { const initialReply = params.initialReply?.trim(); const deadline = Date.now() + Math.max(CRON_SUBAGENT_WAIT_MIN_MS, Math.floor(params.timeoutMs)); - let sawActiveDescendants = params.observedActiveDescendants === true; - let drainedAtMs: number | undefined; - while (Date.now() < deadline) { - const activeDescendants = countActiveDescendantRuns(params.sessionKey); - if (activeDescendants > 0) { - sawActiveDescendants = true; - drainedAtMs = undefined; - await new Promise((resolve) => setTimeout(resolve, CRON_SUBAGENT_WAIT_POLL_MS)); - continue; - } - if (!sawActiveDescendants) { - return initialReply; - } - if (!drainedAtMs) { - drainedAtMs = Date.now(); - } + + // Snapshot the currently active descendant run IDs. + const getActiveRuns = () => + listDescendantRunsForRequester(params.sessionKey).filter( + (entry) => typeof entry.endedAt !== "number", + ); + + const initialActiveRuns = getActiveRuns(); + const sawActiveDescendants = + params.observedActiveDescendants === true || initialActiveRuns.length > 0; + + if (!sawActiveDescendants) { + // No active descendants and none were observed before the call – nothing to wait for. + return initialReply; + } + + // --- Push-based wait for all active descendants --- + // We iterate in case first-level descendants spawn their own subagents while + // we wait, so new active runs can appear between rounds. + let pendingRunIds = new Set(initialActiveRuns.map((e) => e.runId)); + + while (pendingRunIds.size > 0 && Date.now() < deadline) { + const remainingMs = Math.max(1, deadline - Date.now()); + // Wait for all currently pending runs concurrently. If any fails or times + // out, allSettled absorbs the error so we proceed to the next iteration. + await Promise.allSettled( + [...pendingRunIds].map((runId) => + callGateway<{ status?: string }>({ + method: "agent.wait", + params: { runId, timeoutMs: remainingMs }, + timeoutMs: remainingMs + 2_000, + }).catch(() => undefined), + ), + ); + + // Refresh: check for newly created active descendants (e.g. spawned by + // the runs that just finished) and keep looping if any exist. + pendingRunIds = new Set(getActiveRuns().map((e) => e.runId)); + } + + // --- Grace period: wait for the cron agent's synthesis --- + // After the subagent announces fire and the cron agent processes them, it + // produces a new assistant message. Poll briefly (bounded by + // CRON_SUBAGENT_FINAL_REPLY_GRACE_MS) to capture that synthesis. + const gracePeriodDeadline = Math.min(Date.now() + CRON_SUBAGENT_FINAL_REPLY_GRACE_MS, deadline); + + while (Date.now() < gracePeriodDeadline) { const latest = (await readLatestAssistantReply({ sessionKey: params.sessionKey }))?.trim(); if ( latest && @@ -135,11 +175,10 @@ export async function waitForDescendantSubagentSummary(params: { ) { return latest; } - if (Date.now() - drainedAtMs >= CRON_SUBAGENT_FINAL_REPLY_GRACE_MS) { - return undefined; - } - await new Promise((resolve) => setTimeout(resolve, CRON_SUBAGENT_WAIT_POLL_MS)); + await new Promise((resolve) => setTimeout(resolve, CRON_SUBAGENT_GRACE_POLL_MS)); } + + // Final read after grace period expires. const latest = (await readLatestAssistantReply({ sessionKey: params.sessionKey }))?.trim(); if ( latest && @@ -148,5 +187,6 @@ export async function waitForDescendantSubagentSummary(params: { ) { return latest; } + return undefined; }