fix(cron): eliminate double-announce and replace delivery polling with push-based flow (#39089)

* fix(cron): eliminate double-announce and replace delivery polling with push-based flow

- Set deliveryAttempted=true in announce early-return paths (active-subagent
  suppression and stale-interim suppression) so the heartbeat timer no longer
  fires a redundant enqueueSystemEvent fallback (double-announce bug).

- Refactor waitForDescendantSubagentSummary to use event-based agent.wait RPC
  calls instead of a 500ms busy-poll loop.  Each active descendant run is now
  awaited concurrently via Promise.allSettled, and only a short bounded grace
  period (5s) remains to capture the cron agent's post-orchestration synthesis.
  Eliminates O(n*timeoutMs/500ms) gateway calls and wasted wall-clock time.

- Add FAST_TEST_MODE (OPENCLAW_TEST_FAST=1) to subagent-followup.ts to keep
  the grace-period tests instant in CI.

- Add comprehensive tests for the new waitForDescendantSubagentSummary behaviour
  (push-based wait, error resilience, NO_REPLY handling, multi-descendant waits).

* fix: prep cron double-announce followup tests (#39089) (thanks @tyler6204)
This commit is contained in:
Tyler Yust
2026-03-07 12:13:37 -08:00
committed by GitHub
parent 97f9e25525
commit e554c59aac
5 changed files with 620 additions and 33 deletions

View File

@@ -628,6 +628,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Cron/announce delivery: stop duplicate completion announces when cron early-return paths already handled delivery, and replace descendant followup polling with push-based waits so cron summaries arrive without the old busy-loop fallback. (#39089) Thanks @tyler6204.
- Dashboard/macOS auth handling: switch the macOS “Open Dashboard” flow from query-string token injection to URL fragments, stop persisting Control UI gateway tokens in browser localStorage, and scrub legacy stored tokens on load. Thanks @JNX03 for reporting.
- Models/provider config precedence: prefer exact `models.providers.<name>` matches before normalized provider aliases in embedded model resolution, preventing alias/canonical key collisions from applying the wrong provider `api`, `baseUrl`, or headers. (#35934) thanks @RealKai42.
- Hooks/auth throttling: reject non-`POST` `/hooks/*` requests before auth-failure accounting so unsupported methods can no longer burn the hook auth lockout budget and block legitimate webhook delivery. Thanks @JNX03 for reporting.

View File

@@ -0,0 +1,271 @@
/**
* Tests for the double-announce bug in cron delivery dispatch.
*
* Bug: early return paths in deliverViaAnnounce (active subagent suppression
* and stale interim message suppression) returned without setting
* deliveryAttempted = true. The timer saw deliveryAttempted = false and
* fired enqueueSystemEvent as a fallback, causing a second announcement.
*
* Fix: both early return paths now set deliveryAttempted = true before
* returning so the timer correctly skips the system-event fallback.
*/
import { beforeEach, describe, expect, it, vi } from "vitest";
// --- Module mocks (must be hoisted before imports) ---
vi.mock("../../agents/subagent-announce.js", () => ({
runSubagentAnnounceFlow: vi.fn().mockResolvedValue(true),
}));
vi.mock("../../agents/subagent-registry.js", () => ({
countActiveDescendantRuns: vi.fn().mockReturnValue(0),
}));
vi.mock("../../config/sessions.js", () => ({
resolveAgentMainSessionKey: vi.fn().mockReturnValue("agent:main"),
}));
vi.mock("../../infra/outbound/outbound-session.js", () => ({
resolveOutboundSessionRoute: vi.fn().mockResolvedValue(null),
ensureOutboundSessionEntry: vi.fn().mockResolvedValue(undefined),
}));
vi.mock("../../infra/outbound/deliver.js", () => ({
deliverOutboundPayloads: vi.fn().mockResolvedValue([{ ok: true }]),
}));
vi.mock("../../infra/outbound/identity.js", () => ({
resolveAgentOutboundIdentity: vi.fn().mockReturnValue({}),
}));
vi.mock("../../infra/outbound/session-context.js", () => ({
buildOutboundSessionContext: vi.fn().mockReturnValue({}),
}));
vi.mock("../../cli/outbound-send-deps.js", () => ({
createOutboundSendDeps: vi.fn().mockReturnValue({}),
}));
vi.mock("../../logger.js", () => ({
logWarn: vi.fn(),
}));
vi.mock("./subagent-followup.js", () => ({
expectsSubagentFollowup: vi.fn().mockReturnValue(false),
isLikelyInterimCronMessage: vi.fn().mockReturnValue(false),
readDescendantSubagentFallbackReply: vi.fn().mockResolvedValue(undefined),
waitForDescendantSubagentSummary: vi.fn().mockResolvedValue(undefined),
}));
import { runSubagentAnnounceFlow } from "../../agents/subagent-announce.js";
// Import after mocks
import { countActiveDescendantRuns } from "../../agents/subagent-registry.js";
import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js";
import { dispatchCronDelivery } from "./delivery-dispatch.js";
import type { DeliveryTargetResolution } from "./delivery-target.js";
import type { RunCronAgentTurnResult } from "./run.js";
import {
expectsSubagentFollowup,
isLikelyInterimCronMessage,
readDescendantSubagentFallbackReply,
waitForDescendantSubagentSummary,
} from "./subagent-followup.js";
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeResolvedDelivery(): Extract<DeliveryTargetResolution, { ok: true }> {
return {
ok: true,
channel: "telegram",
to: "123456",
accountId: undefined,
threadId: undefined,
};
}
function makeWithRunSession() {
return (
result: Omit<RunCronAgentTurnResult, "sessionId" | "sessionKey">,
): RunCronAgentTurnResult => ({
...result,
sessionId: "test-session-id",
sessionKey: "test-session-key",
});
}
function makeBaseParams(overrides: { synthesizedText?: string; deliveryRequested?: boolean }) {
const resolvedDelivery = makeResolvedDelivery();
return {
cfg: {} as never,
cfgWithAgentDefaults: {} as never,
deps: {} as never,
job: {
id: "test-job",
name: "Test Job",
deleteAfterRun: false,
payload: { kind: "agentTurn", message: "hello" },
} as never,
agentId: "main",
agentSessionKey: "agent:main",
runSessionId: "run-123",
runStartedAt: Date.now(),
runEndedAt: Date.now(),
timeoutMs: 30_000,
resolvedDelivery,
deliveryRequested: overrides.deliveryRequested ?? true,
skipHeartbeatDelivery: false,
skipMessagingToolDelivery: false,
deliveryBestEffort: false,
deliveryPayloadHasStructuredContent: false,
deliveryPayloads: overrides.synthesizedText ? [{ text: overrides.synthesizedText }] : [],
synthesizedText: overrides.synthesizedText ?? "on it",
summary: overrides.synthesizedText ?? "on it",
outputText: overrides.synthesizedText ?? "on it",
telemetry: undefined,
abortSignal: undefined,
isAborted: () => false,
abortReason: () => "aborted",
withRunSession: makeWithRunSession(),
};
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("dispatchCronDelivery — double-announce guard", () => {
beforeEach(() => {
vi.clearAllMocks();
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(expectsSubagentFollowup).mockReturnValue(false);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue(undefined);
vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined);
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(true);
});
it("early return (active subagent) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => {
// countActiveDescendantRuns returns >0 → enters wait block; still >0 after wait → early return
vi.mocked(countActiveDescendantRuns).mockReturnValue(2);
vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined);
vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue(undefined);
const params = makeBaseParams({ synthesizedText: "on it" });
const state = await dispatchCronDelivery(params);
// deliveryAttempted must be true so timer does NOT fire enqueueSystemEvent
expect(state.deliveryAttempted).toBe(true);
// Verify timer guard agrees: shouldEnqueueCronMainSummary returns false
expect(
shouldEnqueueCronMainSummary({
summaryText: "on it",
deliveryRequested: true,
delivered: state.delivered,
deliveryAttempted: state.deliveryAttempted,
suppressMainSummary: false,
isCronSystemEvent: () => true,
}),
).toBe(false);
// No announce should have been attempted (subagents still running)
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
});
it("early return (stale interim suppression) sets deliveryAttempted=true so timer skips enqueueSystemEvent", async () => {
// First countActiveDescendantRuns call returns >0 (had descendants), second returns 0
vi.mocked(countActiveDescendantRuns)
.mockReturnValueOnce(2) // initial check → hadDescendants=true, enters wait block
.mockReturnValueOnce(0); // second check after wait → activeSubagentRuns=0
vi.mocked(waitForDescendantSubagentSummary).mockResolvedValue(undefined);
vi.mocked(readDescendantSubagentFallbackReply).mockResolvedValue(undefined);
// synthesizedText matches initialSynthesizedText & isLikelyInterimCronMessage → stale interim
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(true);
const params = makeBaseParams({ synthesizedText: "on it, pulling everything together" });
const state = await dispatchCronDelivery(params);
// deliveryAttempted must be true so timer does NOT fire enqueueSystemEvent
expect(state.deliveryAttempted).toBe(true);
// Verify timer guard agrees
expect(
shouldEnqueueCronMainSummary({
summaryText: "on it, pulling everything together",
deliveryRequested: true,
delivered: state.delivered,
deliveryAttempted: state.deliveryAttempted,
suppressMainSummary: false,
isCronSystemEvent: () => true,
}),
).toBe(false);
// No announce or direct delivery should have been sent (stale interim suppressed)
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
});
it("normal announce success delivers exactly once and sets deliveryAttempted=true", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(true);
const params = makeBaseParams({ synthesizedText: "Morning briefing complete." });
const state = await dispatchCronDelivery(params);
expect(state.deliveryAttempted).toBe(true);
expect(state.delivered).toBe(true);
// Announce called exactly once
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
// Timer should not fire enqueueSystemEvent (delivered=true)
expect(
shouldEnqueueCronMainSummary({
summaryText: "Morning briefing complete.",
deliveryRequested: true,
delivered: state.delivered,
deliveryAttempted: state.deliveryAttempted,
suppressMainSummary: false,
isCronSystemEvent: () => true,
}),
).toBe(false);
});
it("announce failure falls back to direct delivery exactly once (no double-deliver)", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
// Announce fails: runSubagentAnnounceFlow returns false
vi.mocked(runSubagentAnnounceFlow).mockResolvedValue(false);
const { deliverOutboundPayloads } = await import("../../infra/outbound/deliver.js");
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
const params = makeBaseParams({ synthesizedText: "Briefing ready." });
const state = await dispatchCronDelivery(params);
// Delivery was attempted; direct fallback picked up the slack
expect(state.deliveryAttempted).toBe(true);
expect(state.delivered).toBe(true);
// Announce was tried exactly once
expect(runSubagentAnnounceFlow).toHaveBeenCalledTimes(1);
// Direct fallback fired exactly once (not zero, not twice)
// This ensures one delivery total reaches the user, not two
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
});
it("no delivery requested means deliveryAttempted stays false and runSubagentAnnounceFlow not called", async () => {
const params = makeBaseParams({
synthesizedText: "Task done.",
deliveryRequested: false,
});
const state = await dispatchCronDelivery(params);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
// deliveryAttempted starts false (skipMessagingToolDelivery=false) and nothing runs
expect(state.deliveryAttempted).toBe(false);
});
});

View File

@@ -318,8 +318,16 @@ export async function dispatchCronDelivery(
}
if (activeSubagentRuns > 0) {
// Parent orchestration is still in progress; avoid announcing a partial
// update to the main requester.
return params.withRunSession({ status: "ok", summary, outputText, ...params.telemetry });
// update to the main requester. Mark deliveryAttempted so the timer does
// not fire a redundant enqueueSystemEvent fallback (double-announce bug).
deliveryAttempted = true;
return params.withRunSession({
status: "ok",
summary,
outputText,
deliveryAttempted,
...params.telemetry,
});
}
if (
hadDescendants &&
@@ -329,8 +337,16 @@ export async function dispatchCronDelivery(
) {
// Descendants existed but no post-orchestration synthesis arrived AND
// no descendant fallback reply was available. Suppress stale parent
// text like "on it, pulling everything together".
return params.withRunSession({ status: "ok", summary, outputText, ...params.telemetry });
// text like "on it, pulling everything together". Mark deliveryAttempted
// so the timer does not fire a redundant enqueueSystemEvent fallback.
deliveryAttempted = true;
return params.withRunSession({
status: "ok",
summary,
outputText,
deliveryAttempted,
...params.telemetry,
});
}
if (synthesizedText.toUpperCase() === SILENT_REPLY_TOKEN.toUpperCase()) {
return params.withRunSession({

View File

@@ -1,12 +1,18 @@
import { describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
// vi.hoisted runs before module imports, ensuring FAST_TEST_MODE is picked up.
vi.hoisted(() => {
process.env.OPENCLAW_TEST_FAST = "1";
});
import {
expectsSubagentFollowup,
isLikelyInterimCronMessage,
readDescendantSubagentFallbackReply,
waitForDescendantSubagentSummary,
} from "./subagent-followup.js";
vi.mock("../../agents/subagent-registry.js", () => ({
countActiveDescendantRuns: vi.fn().mockReturnValue(0),
listDescendantRunsForRequester: vi.fn().mockReturnValue([]),
}));
@@ -14,8 +20,18 @@ vi.mock("../../agents/tools/agent-step.js", () => ({
readLatestAssistantReply: vi.fn().mockResolvedValue(undefined),
}));
vi.mock("../../gateway/call.js", () => ({
callGateway: vi.fn().mockResolvedValue({ status: "ok" }),
}));
const { listDescendantRunsForRequester } = await import("../../agents/subagent-registry.js");
const { readLatestAssistantReply } = await import("../../agents/tools/agent-step.js");
const { callGateway } = await import("../../gateway/call.js");
async function resolveAfterAdvancingTimers<T>(promise: Promise<T>, advanceMs = 100): Promise<T> {
await vi.advanceTimersByTimeAsync(advanceMs);
return promise;
}
describe("isLikelyInterimCronMessage", () => {
it("detects 'on it' as interim", () => {
@@ -243,3 +259,246 @@ describe("readDescendantSubagentFallbackReply", () => {
expect(result).toBeUndefined();
});
});
describe("waitForDescendantSubagentSummary", () => {
beforeEach(() => {
vi.clearAllMocks();
vi.useRealTimers();
vi.mocked(listDescendantRunsForRequester).mockReturnValue([]);
vi.mocked(readLatestAssistantReply).mockResolvedValue(undefined);
vi.mocked(callGateway).mockResolvedValue({ status: "ok" });
});
afterEach(() => {
vi.useRealTimers();
});
it("returns initialReply immediately when no active descendants and observedActiveDescendants=false", async () => {
vi.mocked(listDescendantRunsForRequester).mockReturnValue([]);
const result = await waitForDescendantSubagentSummary({
sessionKey: "cron-session",
initialReply: "on it",
timeoutMs: 100,
observedActiveDescendants: false,
});
expect(result).toBe("on it");
expect(callGateway).not.toHaveBeenCalled();
});
it("awaits active descendants via agent.wait and returns synthesis after grace period", async () => {
// First call: active run; second call (after agent.wait resolves): no active runs
vi.mocked(listDescendantRunsForRequester)
.mockReturnValueOnce([
{
runId: "run-abc",
childSessionKey: "child-session",
requesterSessionKey: "cron-session",
requesterDisplayKey: "cron-session",
task: "morning briefing",
cleanup: "keep",
createdAt: 1000,
// no endedAt → active
},
])
.mockReturnValue([]); // subsequent calls: all done
vi.mocked(callGateway).mockResolvedValue({ status: "ok" });
vi.mocked(readLatestAssistantReply).mockResolvedValue("Morning briefing complete!");
const result = await waitForDescendantSubagentSummary({
sessionKey: "cron-session",
initialReply: "on it",
timeoutMs: 30_000,
observedActiveDescendants: true,
});
expect(result).toBe("Morning briefing complete!");
// agent.wait should have been called with the active run's ID
expect(callGateway).toHaveBeenCalledWith(
expect.objectContaining({
method: "agent.wait",
params: expect.objectContaining({ runId: "run-abc" }),
}),
);
});
it("returns undefined when descendants finish but only interim text remains after grace period", async () => {
vi.useFakeTimers();
// No active runs at call time, but observedActiveDescendants=true (saw them before)
vi.mocked(listDescendantRunsForRequester).mockReturnValue([]);
// readLatestAssistantReply keeps returning interim text
vi.mocked(readLatestAssistantReply).mockResolvedValue("on it");
const resultPromise = waitForDescendantSubagentSummary({
sessionKey: "cron-session",
initialReply: "on it",
timeoutMs: 100,
observedActiveDescendants: true,
});
const result = await resolveAfterAdvancingTimers(resultPromise);
expect(result).toBeUndefined();
});
it("returns synthesis even if initial reply was undefined", async () => {
vi.mocked(listDescendantRunsForRequester)
.mockReturnValueOnce([
{
runId: "run-xyz",
childSessionKey: "child-2",
requesterSessionKey: "cron-session",
requesterDisplayKey: "cron-session",
task: "report",
cleanup: "keep",
createdAt: 1000,
},
])
.mockReturnValue([]);
vi.mocked(callGateway).mockResolvedValue({ status: "ok" });
vi.mocked(readLatestAssistantReply).mockResolvedValue("Report generated successfully.");
const result = await waitForDescendantSubagentSummary({
sessionKey: "cron-session",
initialReply: undefined,
timeoutMs: 30_000,
observedActiveDescendants: true,
});
expect(result).toBe("Report generated successfully.");
});
it("uses agent.wait for each active run when multiple descendants exist", async () => {
vi.mocked(listDescendantRunsForRequester)
.mockReturnValueOnce([
{
runId: "run-1",
childSessionKey: "child-1",
requesterSessionKey: "cron-session",
requesterDisplayKey: "cron-session",
task: "task-1",
cleanup: "keep",
createdAt: 1000,
},
{
runId: "run-2",
childSessionKey: "child-2",
requesterSessionKey: "cron-session",
requesterDisplayKey: "cron-session",
task: "task-2",
cleanup: "keep",
createdAt: 1000,
},
])
.mockReturnValue([]);
vi.mocked(callGateway).mockResolvedValue({ status: "ok" });
vi.mocked(readLatestAssistantReply).mockResolvedValue("All tasks complete.");
await waitForDescendantSubagentSummary({
sessionKey: "cron-session",
initialReply: "spawned a subagent",
timeoutMs: 30_000,
observedActiveDescendants: true,
});
// agent.wait called once for each active run
const waitCalls = vi
.mocked(callGateway)
.mock.calls.filter((c) => (c[0] as { method?: string }).method === "agent.wait");
expect(waitCalls).toHaveLength(2);
const runIds = waitCalls.map((c) => (c[0] as { params: { runId: string } }).params.runId);
expect(runIds).toContain("run-1");
expect(runIds).toContain("run-2");
});
it("waits for newly discovered active descendants after the first wait round", async () => {
vi.mocked(listDescendantRunsForRequester)
.mockReturnValueOnce([
{
runId: "run-1",
childSessionKey: "child-1",
requesterSessionKey: "cron-session",
requesterDisplayKey: "cron-session",
task: "task-1",
cleanup: "keep",
createdAt: 1000,
},
])
.mockReturnValueOnce([
{
runId: "run-2",
childSessionKey: "child-2",
requesterSessionKey: "cron-session",
requesterDisplayKey: "cron-session",
task: "task-2",
cleanup: "keep",
createdAt: 1001,
},
])
.mockReturnValue([]);
vi.mocked(callGateway).mockResolvedValue({ status: "ok" });
vi.mocked(readLatestAssistantReply).mockResolvedValue("Nested descendant work complete.");
const result = await waitForDescendantSubagentSummary({
sessionKey: "cron-session",
initialReply: "spawned a subagent",
timeoutMs: 30_000,
observedActiveDescendants: true,
});
expect(result).toBe("Nested descendant work complete.");
const waitedRunIds = vi
.mocked(callGateway)
.mock.calls.filter((c) => (c[0] as { method?: string }).method === "agent.wait")
.map((c) => (c[0] as { params: { runId: string } }).params.runId);
expect(waitedRunIds).toEqual(["run-1", "run-2"]);
});
it("handles agent.wait errors gracefully and still reads the synthesis", async () => {
vi.mocked(listDescendantRunsForRequester)
.mockReturnValueOnce([
{
runId: "run-err",
childSessionKey: "child-err",
requesterSessionKey: "cron-session",
requesterDisplayKey: "cron-session",
task: "task-err",
cleanup: "keep",
createdAt: 1000,
},
])
.mockReturnValue([]);
vi.mocked(callGateway).mockRejectedValue(new Error("gateway unavailable"));
vi.mocked(readLatestAssistantReply).mockResolvedValue("Completed despite gateway error.");
const result = await waitForDescendantSubagentSummary({
sessionKey: "cron-session",
initialReply: "on it",
timeoutMs: 30_000,
observedActiveDescendants: true,
});
expect(result).toBe("Completed despite gateway error.");
});
it("skips NO_REPLY synthesis and returns undefined", async () => {
vi.useFakeTimers();
vi.mocked(listDescendantRunsForRequester).mockReturnValue([]);
vi.mocked(readLatestAssistantReply).mockResolvedValue("NO_REPLY");
const resultPromise = waitForDescendantSubagentSummary({
sessionKey: "cron-session",
initialReply: "on it",
timeoutMs: 100,
observedActiveDescendants: true,
});
const result = await resolveAfterAdvancingTimers(resultPromise);
expect(result).toBeUndefined();
});
});

View File

@@ -1,12 +1,14 @@
import {
countActiveDescendantRuns,
listDescendantRunsForRequester,
} from "../../agents/subagent-registry.js";
import { listDescendantRunsForRequester } from "../../agents/subagent-registry.js";
import { readLatestAssistantReply } from "../../agents/tools/agent-step.js";
import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
const CRON_SUBAGENT_WAIT_POLL_MS = 500;
const CRON_SUBAGENT_WAIT_MIN_MS = 30_000;
const CRON_SUBAGENT_FINAL_REPLY_GRACE_MS = 5_000;
import { callGateway } from "../../gateway/call.js";
const FAST_TEST_MODE = process.env.OPENCLAW_TEST_FAST === "1";
const CRON_SUBAGENT_WAIT_MIN_MS = FAST_TEST_MODE ? 10 : 30_000;
const CRON_SUBAGENT_FINAL_REPLY_GRACE_MS = FAST_TEST_MODE ? 50 : 5_000;
const CRON_SUBAGENT_GRACE_POLL_MS = FAST_TEST_MODE ? 8 : 200;
const SUBAGENT_FOLLOWUP_HINTS = [
"subagent spawned",
"spawned a subagent",
@@ -14,6 +16,7 @@ const SUBAGENT_FOLLOWUP_HINTS = [
"both subagents are running",
"wait for them to report back",
] as const;
const INTERIM_CRON_HINTS = [
"on it",
"pulling everything together",
@@ -103,6 +106,12 @@ export async function readDescendantSubagentFallbackReply(params: {
return replies.join("\n\n");
}
/**
* Waits for descendant subagents to complete using a push-based approach:
* each active descendant run is awaited via `agent.wait` (gateway RPC) instead
* of a busy-poll loop. After all active runs settle, a short grace period
* polls the cron agent's session for a post-orchestration synthesis message.
*/
export async function waitForDescendantSubagentSummary(params: {
sessionKey: string;
initialReply?: string;
@@ -111,22 +120,53 @@ export async function waitForDescendantSubagentSummary(params: {
}): Promise<string | undefined> {
const initialReply = params.initialReply?.trim();
const deadline = Date.now() + Math.max(CRON_SUBAGENT_WAIT_MIN_MS, Math.floor(params.timeoutMs));
let sawActiveDescendants = params.observedActiveDescendants === true;
let drainedAtMs: number | undefined;
while (Date.now() < deadline) {
const activeDescendants = countActiveDescendantRuns(params.sessionKey);
if (activeDescendants > 0) {
sawActiveDescendants = true;
drainedAtMs = undefined;
await new Promise((resolve) => setTimeout(resolve, CRON_SUBAGENT_WAIT_POLL_MS));
continue;
}
if (!sawActiveDescendants) {
return initialReply;
}
if (!drainedAtMs) {
drainedAtMs = Date.now();
}
// Snapshot the currently active descendant run IDs.
const getActiveRuns = () =>
listDescendantRunsForRequester(params.sessionKey).filter(
(entry) => typeof entry.endedAt !== "number",
);
const initialActiveRuns = getActiveRuns();
const sawActiveDescendants =
params.observedActiveDescendants === true || initialActiveRuns.length > 0;
if (!sawActiveDescendants) {
// No active descendants and none were observed before the call nothing to wait for.
return initialReply;
}
// --- Push-based wait for all active descendants ---
// We iterate in case first-level descendants spawn their own subagents while
// we wait, so new active runs can appear between rounds.
let pendingRunIds = new Set<string>(initialActiveRuns.map((e) => e.runId));
while (pendingRunIds.size > 0 && Date.now() < deadline) {
const remainingMs = Math.max(1, deadline - Date.now());
// Wait for all currently pending runs concurrently. If any fails or times
// out, allSettled absorbs the error so we proceed to the next iteration.
await Promise.allSettled(
[...pendingRunIds].map((runId) =>
callGateway<{ status?: string }>({
method: "agent.wait",
params: { runId, timeoutMs: remainingMs },
timeoutMs: remainingMs + 2_000,
}).catch(() => undefined),
),
);
// Refresh: check for newly created active descendants (e.g. spawned by
// the runs that just finished) and keep looping if any exist.
pendingRunIds = new Set<string>(getActiveRuns().map((e) => e.runId));
}
// --- Grace period: wait for the cron agent's synthesis ---
// After the subagent announces fire and the cron agent processes them, it
// produces a new assistant message. Poll briefly (bounded by
// CRON_SUBAGENT_FINAL_REPLY_GRACE_MS) to capture that synthesis.
const gracePeriodDeadline = Math.min(Date.now() + CRON_SUBAGENT_FINAL_REPLY_GRACE_MS, deadline);
while (Date.now() < gracePeriodDeadline) {
const latest = (await readLatestAssistantReply({ sessionKey: params.sessionKey }))?.trim();
if (
latest &&
@@ -135,11 +175,10 @@ export async function waitForDescendantSubagentSummary(params: {
) {
return latest;
}
if (Date.now() - drainedAtMs >= CRON_SUBAGENT_FINAL_REPLY_GRACE_MS) {
return undefined;
}
await new Promise((resolve) => setTimeout(resolve, CRON_SUBAGENT_WAIT_POLL_MS));
await new Promise<void>((resolve) => setTimeout(resolve, CRON_SUBAGENT_GRACE_POLL_MS));
}
// Final read after grace period expires.
const latest = (await readLatestAssistantReply({ sessionKey: params.sessionKey }))?.trim();
if (
latest &&
@@ -148,5 +187,6 @@ export async function waitForDescendantSubagentSummary(params: {
) {
return latest;
}
return undefined;
}