Subagents: restore announce chain + fix nested retry/drop regressions (#22223)

* Subagents: restore announce flow and fix nested delivery retries

* fix: prep subagent announce + docs alignment (#22223) (thanks @tyler6204)
This commit is contained in:
Tyler Yust
2026-02-20 15:39:09 -08:00
committed by GitHub
parent 086af56867
commit fe57bea088
21 changed files with 579 additions and 985 deletions

View File

@@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai
### Changes
- Agents/Subagents: default subagent spawn depth now uses shared `maxSpawnDepth=2`, enabling depth-1 orchestrator spawning by default while keeping depth policy checks consistent across spawn and prompt paths. (#22223) Thanks @tyler6204.
- Channels/CLI: add per-account/channel `defaultTo` outbound routing fallback so `openclaw agent --deliver` can send without explicit `--reply-to` when a default target is configured. (#16985) Thanks @KirillShchetinin.
- iOS/Chat: clean chat UI noise by stripping inbound untrusted metadata/timestamp prefixes, formatting tool outputs into concise summaries/errors, compacting the composer while typing, and supporting tap-to-dismiss keyboard in chat view. (#22122) thanks @mbelinky.
- iOS/Watch: bridge mirrored watch prompt notification actions into iOS quick-reply handling, including queued action handoff until app model initialization. (#22123) thanks @mbelinky.
@@ -19,6 +20,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Agents/Subagents: restore announce-chain delivery to agent injection, defer nested announce output until descendant follow-up content is ready, and prevent descendant deferrals from consuming announce retry budget so deep chains do not drop final completions. (#22223) Thanks @tyler6204.
- Gateway/Auth: require `gateway.trustedProxies` to include a loopback proxy address when `auth.mode="trusted-proxy"` and `bind="loopback"`, preventing same-host proxy misconfiguration from silently blocking auth. (#22082, follow-up to #20097) thanks @mbelinky.
- Agents/System Prompt: label allowlisted senders as authorized senders to avoid implying ownership. Thanks @thewilloftheshadow.
- Gateway/Auth: allow trusted-proxy mode with loopback bind for same-host reverse-proxy deployments, while still requiring configured `gateway.trustedProxies`. (#20097) thanks @xinhuagu.

View File

@@ -166,12 +166,12 @@ Behavior:
- Starts a new `agent:<agentId>:subagent:<uuid>` session with `deliver: false`.
- Sub-agents default to the full tool set **minus session tools** (configurable via `tools.subagents.tools`).
- Sub-agents are not allowed to call `sessions_spawn` (no sub-agent → sub-agent spawning).
- Depth policy is enforced for nested spawns. With the default `maxSpawnDepth = 2`, depth-1 sub-agents can call `sessions_spawn`, depth-2 sub-agents cannot.
- Always non-blocking: returns `{ status: "accepted", runId, childSessionKey }` immediately.
- After completion, OpenClaw runs a sub-agent **announce step** and posts the result to the requester chat channel.
- If the assistant final reply is empty, the latest `toolResult` from sub-agent history is included as `Result`.
- Reply exactly `ANNOUNCE_SKIP` during the announce step to stay silent.
- Announce replies are normalized to `Status`/`Result`/`Notes`; `Status` comes from runtime outcome (not model text).
- After completion, OpenClaw builds a sub-agent announce system message from the child session's latest assistant reply and injects it to the requester session.
- Delivery stays internal (`deliver=false`) when the requester is a sub-agent, and is user-facing (`deliver=true`) when the requester is main.
- Recipient agents can return the internal silent token to suppress duplicate outward delivery in the same turn.
- Announce replies are normalized to runtime-derived status plus result context.
- Sub-agent sessions are auto-archived after `agents.defaults.subagents.archiveAfterMinutes` (default: 60).
- Announce replies include a stats line (runtime, tokens, sessionKey/sessionId, transcript path, and optional cost).

View File

@@ -35,7 +35,7 @@ Use `/subagents` to inspect or control sub-agent runs for the **current session*
- If direct delivery fails, it falls back to queue routing.
- If queue routing is still not available, the announce is retried with a short exponential backoff before final give-up.
- The completion message is a system message and includes:
- `Result` (`assistant` reply text, or latest `toolResult` if the assistant reply is empty)
- `Result` (latest assistant reply text from the child session, after a short settle retry)
- `Status` (`completed successfully` / `failed` / `timed out`)
- compact runtime/token stats
- `--model` and `--thinking` override defaults for that specific run.
@@ -90,7 +90,7 @@ Auto-archive:
## Nested Sub-Agents
By default, sub-agents cannot spawn their own sub-agents (`maxSpawnDepth: 1`). You can enable one level of nesting by setting `maxSpawnDepth: 2`, which allows the **orchestrator pattern**: main → orchestrator sub-agent → worker sub-sub-agents.
By default, sub-agents can spawn one additional level (`maxSpawnDepth: 2`), enabling the **orchestrator pattern**: main → orchestrator sub-agent → worker sub-sub-agents. Set `maxSpawnDepth: 1` to disable nested spawning.
### How to enable
@@ -99,7 +99,7 @@ By default, sub-agents cannot spawn their own sub-agents (`maxSpawnDepth: 1`). Y
agents: {
defaults: {
subagents: {
maxSpawnDepth: 2, // allow sub-agents to spawn children (default: 1)
maxSpawnDepth: 2, // allow sub-agents to spawn children (default: 2)
maxChildrenPerAgent: 5, // max active children per agent session (default: 5)
maxConcurrent: 8, // global concurrency lane cap (default: 8)
},
@@ -110,11 +110,11 @@ By default, sub-agents cannot spawn their own sub-agents (`maxSpawnDepth: 1`). Y
### Depth levels
| Depth | Session key shape | Role | Can spawn? |
| ----- | -------------------------------------------- | --------------------------------------------- | ---------------------------- |
| 0 | `agent:<id>:main` | Main agent | Always |
| 1 | `agent:<id>:subagent:<uuid>` | Sub-agent (orchestrator when depth 2 allowed) | Only if `maxSpawnDepth >= 2` |
| 2 | `agent:<id>:subagent:<uuid>:subagent:<uuid>` | Sub-sub-agent (leaf worker) | Never |
| Depth | Session key shape | Role | Can spawn? |
| ----- | -------------------------------------------- | ----------------------------------- | ------------------------------ |
| 0 | `agent:<id>:main` | Main agent | Always |
| 1 | `agent:<id>:subagent:<uuid>` | Sub-agent (orchestrator by default) | Yes, when `maxSpawnDepth >= 2` |
| 2 | `agent:<id>:subagent:<uuid>:subagent:<uuid>` | Sub-sub-agent (leaf worker) | No, when `maxSpawnDepth = 2` |
### Announce chain
@@ -128,9 +128,9 @@ Each level only sees announces from its direct children.
### Tool policy by depth
- **Depth 1 (orchestrator, when `maxSpawnDepth >= 2`)**: Gets `sessions_spawn`, `subagents`, `sessions_list`, `sessions_history` so it can manage its children. Other session/system tools remain denied.
- **Depth 1 (leaf, when `maxSpawnDepth == 1`)**: No session tools (current default behavior).
- **Depth 2 (leaf worker)**: No session tools `sessions_spawn` is always denied at depth 2. Cannot spawn further children.
- **Depth 1 (orchestrator, default with `maxSpawnDepth = 2`)**: Gets `sessions_spawn`, `subagents`, `sessions_list`, `sessions_history` so it can manage its children. Other session/system tools remain denied.
- **Depth 1 (leaf, when `maxSpawnDepth = 1`)**: No session tools.
- **Depth 2 (leaf worker, default `maxSpawnDepth = 2`)**: No session tools, `sessions_spawn` is denied at depth 2, cannot spawn further children.
### Per-agent spawn limit
@@ -156,17 +156,16 @@ Note: the merge is additive, so main profiles are always available as fallbacks.
## Announce
Sub-agents report back via an announce step:
Sub-agents report back via an announce injection step:
- The announce step runs inside the sub-agent session (not the requester session).
- If the sub-agent replies exactly `ANNOUNCE_SKIP`, nothing is posted.
- Otherwise the announce reply is posted to the requester chat channel via a follow-up `agent` call (`deliver=true`).
- Announce replies preserve thread/topic routing when available (Slack threads, Telegram topics, Matrix threads).
- Announce messages are normalized to a stable template:
- `Status:` derived from the run outcome (`success`, `error`, `timeout`, or `unknown`).
- `Result:` the summary content from the announce step (or `(not available)` if missing).
- `Notes:` error details and other useful context.
- `Status` is not inferred from model output; it comes from runtime outcome signals.
- OpenClaw reads the child session's latest assistant reply after completion, with a short settle retry.
- It builds a system message with `Status`, `Result`, compact stats, and reply guidance.
- The message is injected with a follow-up `agent` call:
- `deliver=false` when the requester is another sub-agent, this keeps orchestration internal.
- `deliver=true` when the requester is main, this produces the user-facing update.
- Delivery context prefers captured requester origin, but non-deliverable channels (for example `webchat`) are ignored in favor of persisted deliverable routes.
- Recipient agents can return the internal silent token to suppress duplicate outward delivery in the same turn.
- `Status` is derived from runtime outcome signals, not inferred from model output.
Announce payloads include a stats line at the end (even when wrapped):
@@ -184,7 +183,7 @@ By default, sub-agents get **all tools except session tools** and system tools:
- `sessions_send`
- `sessions_spawn`
When `maxSpawnDepth >= 2`, depth-1 orchestrator sub-agents additionally receive `sessions_spawn`, `subagents`, `sessions_list`, and `sessions_history` so they can manage their children.
With the default `maxSpawnDepth = 2`, depth-1 orchestrator sub-agents receive `sessions_spawn`, `subagents`, `sessions_list`, and `sessions_history` so they can manage their children. If you set `maxSpawnDepth = 1`, those session tools stay denied.
Override via config:

View File

@@ -94,13 +94,14 @@ describe("sessions_spawn depth + child limits", () => {
});
});
it("rejects spawning when caller depth reaches maxSpawnDepth", async () => {
it("allows depth-1 callers by default (maxSpawnDepth defaults to 2)", async () => {
const tool = createSessionsSpawnTool({ agentSessionKey: "agent:main:subagent:parent" });
const result = await tool.execute("call-depth-reject", { task: "hello" });
expect(result.details).toMatchObject({
status: "forbidden",
error: "sessions_spawn is not allowed at this depth (current depth: 1, max: 1)",
status: "accepted",
childSessionKey: expect.stringMatching(/^agent:main:subagent:/),
runId: "run-depth",
});
});

View File

@@ -133,35 +133,6 @@ const waitFor = async (predicate: () => boolean, timeoutMs = 2000) => {
);
};
function expectSingleCompletionSend(
calls: GatewayRequest[],
expected: { sessionKey: string; channel: string; to: string; message: string },
) {
const sendCalls = calls.filter((call) => call.method === "send");
expect(sendCalls).toHaveLength(1);
const send = sendCalls[0]?.params as
| { sessionKey?: string; channel?: string; to?: string; message?: string }
| undefined;
expect(send?.sessionKey).toBe(expected.sessionKey);
expect(send?.channel).toBe(expected.channel);
expect(send?.to).toBe(expected.to);
expect(send?.message).toBe(expected.message);
}
function createDeleteCleanupHooks(setDeletedKey: (key: string | undefined) => void) {
return {
onAgentSubagentSpawn: (params: unknown) => {
const rec = params as { channel?: string; timeout?: number } | undefined;
expect(rec?.channel).toBe("discord");
expect(rec?.timeout).toBe(1);
},
onSessionsDelete: (params: unknown) => {
const rec = params as { key?: string } | undefined;
setDeletedKey(rec?.key);
},
};
}
describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
beforeEach(() => {
resetSessionsSpawnConfigOverride();
@@ -184,7 +155,6 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
const tool = await getSessionsSpawnTool({
agentSessionKey: "main",
agentChannel: "whatsapp",
agentTo: "+123",
});
const result = await tool.execute("call2", {
@@ -213,7 +183,7 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
await waitFor(() => ctx.waitCalls.some((call) => call.runId === child.runId));
await waitFor(() => patchCalls.some((call) => call.label === "my-task"));
await waitFor(() => ctx.calls.filter((c) => c.method === "send").length >= 1);
await waitFor(() => ctx.calls.filter((c) => c.method === "agent").length >= 2);
const childWait = ctx.waitCalls.find((call) => call.runId === child.runId);
expect(childWait?.timeoutMs).toBe(1000);
@@ -222,21 +192,22 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
expect(labelPatch?.key).toBe(child.sessionKey);
expect(labelPatch?.label).toBe("my-task");
// Subagent spawn call plus direct outbound completion send.
// Two agent calls: subagent spawn + main agent trigger
const agentCalls = ctx.calls.filter((c) => c.method === "agent");
expect(agentCalls).toHaveLength(1);
expect(agentCalls).toHaveLength(2);
// First call: subagent spawn
const first = agentCalls[0]?.params as { lane?: string } | undefined;
expect(first?.lane).toBe("subagent");
// Direct send should route completion to the requester channel/session.
expectSingleCompletionSend(ctx.calls, {
sessionKey: "agent:main:main",
channel: "whatsapp",
to: "+123",
message: "✅ Subagent main finished\n\ndone",
});
// Second call: main agent trigger (not "Sub-agent announce step." anymore)
const second = agentCalls[1]?.params as { sessionKey?: string; message?: string } | undefined;
expect(second?.sessionKey).toBe("main");
expect(second?.message).toContain("subagent task");
// No direct send to external channel (main agent handles delivery)
const sendCalls = ctx.calls.filter((c) => c.method === "send");
expect(sendCalls.length).toBe(0);
expect(child.sessionKey?.startsWith("agent:main:subagent:")).toBe(true);
});
@@ -245,15 +216,20 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
callGatewayMock.mockReset();
let deletedKey: string | undefined;
const ctx = setupSessionsSpawnGatewayMock({
...createDeleteCleanupHooks((key) => {
deletedKey = key;
}),
onAgentSubagentSpawn: (params) => {
const rec = params as { channel?: string; timeout?: number } | undefined;
expect(rec?.channel).toBe("discord");
expect(rec?.timeout).toBe(1);
},
onSessionsDelete: (params) => {
const rec = params as { key?: string } | undefined;
deletedKey = rec?.key;
},
});
const tool = await getSessionsSpawnTool({
agentSessionKey: "discord:group:req",
agentChannel: "discord",
agentTo: "discord:dm:u123",
});
const result = await tool.execute("call1", {
@@ -291,7 +267,7 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
expect(childWait?.timeoutMs).toBe(1000);
const agentCalls = ctx.calls.filter((call) => call.method === "agent");
expect(agentCalls).toHaveLength(1);
expect(agentCalls).toHaveLength(2);
const first = agentCalls[0]?.params as
| {
@@ -307,12 +283,19 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
expect(first?.sessionKey?.startsWith("agent:main:subagent:")).toBe(true);
expect(child.sessionKey?.startsWith("agent:main:subagent:")).toBe(true);
expectSingleCompletionSend(ctx.calls, {
sessionKey: "agent:main:discord:group:req",
channel: "discord",
to: "discord:dm:u123",
message: "✅ Subagent main finished",
});
const second = agentCalls[1]?.params as
| {
sessionKey?: string;
message?: string;
deliver?: boolean;
}
| undefined;
expect(second?.sessionKey).toBe("discord:group:req");
expect(second?.deliver).toBe(true);
expect(second?.message).toContain("subagent task");
const sendCalls = ctx.calls.filter((c) => c.method === "send");
expect(sendCalls.length).toBe(0);
expect(deletedKey?.startsWith("agent:main:subagent:")).toBe(true);
});
@@ -323,16 +306,21 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
let deletedKey: string | undefined;
const ctx = setupSessionsSpawnGatewayMock({
includeChatHistory: true,
...createDeleteCleanupHooks((key) => {
deletedKey = key;
}),
onAgentSubagentSpawn: (params) => {
const rec = params as { channel?: string; timeout?: number } | undefined;
expect(rec?.channel).toBe("discord");
expect(rec?.timeout).toBe(1);
},
onSessionsDelete: (params) => {
const rec = params as { key?: string } | undefined;
deletedKey = rec?.key;
},
agentWaitResult: { status: "ok", startedAt: 3000, endedAt: 4000 },
});
const tool = await getSessionsSpawnTool({
agentSessionKey: "discord:group:req",
agentChannel: "discord",
agentTo: "discord:dm:u123",
});
const result = await tool.execute("call1b", {
@@ -350,27 +338,29 @@ describe("openclaw-tools: subagents (sessions_spawn lifecycle)", () => {
throw new Error("missing child runId");
}
await waitFor(() => ctx.waitCalls.some((call) => call.runId === child.runId));
await waitFor(() => ctx.calls.filter((call) => call.method === "send").length >= 1);
await waitFor(() => ctx.calls.filter((call) => call.method === "agent").length >= 2);
await waitFor(() => Boolean(deletedKey));
const childWait = ctx.waitCalls.find((call) => call.runId === child.runId);
expect(childWait?.timeoutMs).toBe(1000);
expect(child.sessionKey?.startsWith("agent:main:subagent:")).toBe(true);
// One agent call for spawn, then direct completion send.
// Two agent calls: subagent spawn + main agent trigger
const agentCalls = ctx.calls.filter((call) => call.method === "agent");
expect(agentCalls).toHaveLength(1);
expect(agentCalls).toHaveLength(2);
// First call: subagent spawn
const first = agentCalls[0]?.params as { lane?: string } | undefined;
expect(first?.lane).toBe("subagent");
expectSingleCompletionSend(ctx.calls, {
sessionKey: "agent:main:discord:group:req",
channel: "discord",
to: "discord:dm:u123",
message: "✅ Subagent main finished\n\ndone",
});
// Second call: main agent trigger
const second = agentCalls[1]?.params as { sessionKey?: string; deliver?: boolean } | undefined;
expect(second?.sessionKey).toBe("discord:group:req");
expect(second?.deliver).toBe(true);
// No direct send to external channel (main agent handles delivery)
const sendCalls = ctx.calls.filter((c) => c.method === "send");
expect(sendCalls.length).toBe(0);
// Session should be deleted
expect(deletedKey?.startsWith("agent:main:subagent:")).toBe(true);

View File

@@ -1,4 +1,5 @@
import { getChannelDock } from "../channels/dock.js";
import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../config/agent-limits.js";
import type { OpenClawConfig } from "../config/config.js";
import { resolveChannelGroupToolsPolicy } from "../config/group-policy.js";
import { resolveThreadParentSessionKey } from "../sessions/session-key-utils.js";
@@ -83,7 +84,8 @@ function resolveSubagentDenyList(depth: number, maxSpawnDepth: number): string[]
export function resolveSubagentToolPolicy(cfg?: OpenClawConfig, depth?: number): SandboxToolPolicy {
const configured = cfg?.tools?.subagents?.tools;
const maxSpawnDepth = cfg?.agents?.defaults?.subagents?.maxSpawnDepth ?? 1;
const maxSpawnDepth =
cfg?.agents?.defaults?.subagents?.maxSpawnDepth ?? DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH;
const effectiveDepth = typeof depth === "number" && depth >= 0 ? depth : 1;
const baseDeny = resolveSubagentDenyList(effectiveDepth, maxSpawnDepth);
const deny = [...baseDeny, ...(Array.isArray(configured?.deny) ? configured.deny : [])];

View File

@@ -7,8 +7,13 @@ type RequesterResolution = {
requesterOrigin?: Record<string, unknown>;
} | null;
type DescendantRun = {
runId: string;
requesterSessionKey: string;
childSessionKey: string;
};
const agentSpy = vi.fn(async (_req: AgentCallRequest) => ({ runId: "run-main", status: "ok" }));
const sendSpy = vi.fn(async (_req: AgentCallRequest) => ({ runId: "send-main", status: "ok" }));
const sessionsDeleteSpy = vi.fn((_req: AgentCallRequest) => undefined);
const readLatestAssistantReplyMock = vi.fn(
async (_sessionKey?: string): Promise<string | undefined> => "raw subagent reply",
@@ -22,11 +27,9 @@ const embeddedRunMock = {
const subagentRegistryMock = {
isSubagentSessionRunActive: vi.fn(() => true),
countActiveDescendantRuns: vi.fn((_sessionKey: string) => 0),
listDescendantRunsForRequester: vi.fn((_sessionKey: string): DescendantRun[] => []),
resolveRequesterForChildSession: vi.fn((_sessionKey: string): RequesterResolution => null),
};
const chatHistoryMock = vi.fn(async (_sessionKey?: string) => ({
messages: [] as Array<unknown>,
}));
let sessionStore: Record<string, Record<string, unknown>> = {};
let configOverride: ReturnType<(typeof import("../config/config.js"))["loadConfig"]> = {
session: {
@@ -67,15 +70,9 @@ vi.mock("../gateway/call.js", () => ({
if (typed.method === "agent") {
return await agentSpy(typed);
}
if (typed.method === "send") {
return await sendSpy(typed);
}
if (typed.method === "agent.wait") {
return { status: "error", startedAt: 10, endedAt: 20, error: "boom" };
}
if (typed.method === "chat.history") {
return await chatHistoryMock(typed.params?.sessionKey);
}
if (typed.method === "sessions.patch") {
return {};
}
@@ -115,7 +112,6 @@ vi.mock("../config/config.js", async (importOriginal) => {
describe("subagent announce formatting", () => {
beforeEach(() => {
agentSpy.mockClear();
sendSpy.mockClear();
sessionsDeleteSpy.mockClear();
embeddedRunMock.isEmbeddedPiRunActive.mockReset().mockReturnValue(false);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReset().mockReturnValue(false);
@@ -123,9 +119,9 @@ describe("subagent announce formatting", () => {
embeddedRunMock.waitForEmbeddedPiRunEnd.mockReset().mockResolvedValue(true);
subagentRegistryMock.isSubagentSessionRunActive.mockReset().mockReturnValue(true);
subagentRegistryMock.countActiveDescendantRuns.mockReset().mockReturnValue(0);
subagentRegistryMock.listDescendantRunsForRequester.mockReset().mockReturnValue([]);
subagentRegistryMock.resolveRequesterForChildSession.mockReset().mockReturnValue(null);
readLatestAssistantReplyMock.mockReset().mockResolvedValue("raw subagent reply");
chatHistoryMock.mockReset().mockResolvedValue({ messages: [] });
sessionStore = {};
configOverride = {
session: {
@@ -209,72 +205,6 @@ describe("subagent announce formatting", () => {
);
});
it.each([
{ role: "toolResult", toolOutput: "tool output line 1", childRunId: "run-tool-fallback-1" },
{ role: "tool", toolOutput: "tool output line 2", childRunId: "run-tool-fallback-2" },
] as const)(
"falls back to latest $role output when assistant reply is empty",
async (testCase) => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
chatHistoryMock.mockResolvedValueOnce({
messages: [
{
role: "assistant",
content: [{ type: "text", text: "" }],
},
{
role: testCase.role,
content: [{ type: "text", text: testCase.toolOutput }],
},
],
});
readLatestAssistantReplyMock.mockResolvedValue("");
await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:worker",
childRunId: testCase.childRunId,
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
...defaultOutcomeAnnounce,
waitForCompletion: false,
});
const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } };
const msg = call?.params?.message as string;
expect(msg).toContain(testCase.toolOutput);
},
);
it("uses latest assistant text when it appears after a tool output", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
chatHistoryMock.mockResolvedValueOnce({
messages: [
{
role: "tool",
content: [{ type: "text", text: "tool output line" }],
},
{
role: "assistant",
content: [{ type: "text", text: "assistant final line" }],
},
],
});
readLatestAssistantReplyMock.mockResolvedValue("");
await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:worker",
childRunId: "run-latest-assistant",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
...defaultOutcomeAnnounce,
waitForCompletion: false,
});
const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } };
const msg = call?.params?.message as string;
expect(msg).toContain("assistant final line");
});
it("keeps full findings and includes compact stats", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
sessionStore = {
@@ -312,121 +242,6 @@ describe("subagent announce formatting", () => {
expect(msg).toContain("step-139");
});
it("sends deterministic completion message directly for manual spawn completion", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
sessionStore = {
"agent:main:subagent:test": {
sessionId: "child-session-direct",
inputTokens: 12,
outputTokens: 34,
totalTokens: 46,
},
"agent:main:main": {
sessionId: "requester-session",
},
};
chatHistoryMock.mockResolvedValueOnce({
messages: [{ role: "assistant", content: [{ type: "text", text: "final answer: 2" }] }],
});
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-direct-completion",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: { channel: "discord", to: "channel:12345", accountId: "acct-1" },
...defaultOutcomeAnnounce,
expectsCompletionMessage: true,
});
expect(didAnnounce).toBe(true);
expect(sendSpy).toHaveBeenCalledTimes(1);
expect(agentSpy).not.toHaveBeenCalled();
const call = sendSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
const rawMessage = call?.params?.message;
const msg = typeof rawMessage === "string" ? rawMessage : "";
expect(call?.params?.channel).toBe("discord");
expect(call?.params?.to).toBe("channel:12345");
expect(call?.params?.sessionKey).toBe("agent:main:main");
expect(msg).toContain("✅ Subagent main finished");
expect(msg).toContain("final answer: 2");
expect(msg).not.toContain("Convert the result above into your normal assistant voice");
});
it("ignores stale session thread hints for manual completion direct-send", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
sessionStore = {
"agent:main:subagent:test": {
sessionId: "child-session-direct-thread",
},
"agent:main:main": {
sessionId: "requester-session-thread",
lastChannel: "discord",
lastTo: "channel:stale",
lastThreadId: 42,
},
};
chatHistoryMock.mockResolvedValueOnce({
messages: [{ role: "assistant", content: [{ type: "text", text: "done" }] }],
});
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-direct-stale-thread",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: { channel: "discord", to: "channel:12345", accountId: "acct-1" },
...defaultOutcomeAnnounce,
expectsCompletionMessage: true,
});
expect(didAnnounce).toBe(true);
expect(sendSpy).toHaveBeenCalledTimes(1);
expect(agentSpy).not.toHaveBeenCalled();
const call = sendSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
expect(call?.params?.channel).toBe("discord");
expect(call?.params?.to).toBe("channel:12345");
expect(call?.params?.threadId).toBeUndefined();
});
it("passes requesterOrigin.threadId for manual completion direct-send", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
sessionStore = {
"agent:main:subagent:test": {
sessionId: "child-session-direct-thread-pass",
},
"agent:main:main": {
sessionId: "requester-session-thread-pass",
},
};
chatHistoryMock.mockResolvedValueOnce({
messages: [{ role: "assistant", content: [{ type: "text", text: "done" }] }],
});
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-direct-thread-pass",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
requesterOrigin: {
channel: "discord",
to: "channel:12345",
accountId: "acct-1",
threadId: 99,
},
...defaultOutcomeAnnounce,
expectsCompletionMessage: true,
});
expect(didAnnounce).toBe(true);
expect(sendSpy).toHaveBeenCalledTimes(1);
expect(agentSpy).not.toHaveBeenCalled();
const call = sendSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
expect(call?.params?.channel).toBe("discord");
expect(call?.params?.to).toBe("channel:12345");
expect(call?.params?.threadId).toBe("99");
});
it("steers announcements into an active run when queue mode is steer", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
@@ -541,139 +356,6 @@ describe("subagent announce formatting", () => {
expect(new Set(idempotencyKeys).size).toBe(2);
});
it("prefers direct delivery first for completion-mode and then queues on direct failure", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
sessionStore = {
"agent:main:main": {
sessionId: "session-collect",
lastChannel: "whatsapp",
lastTo: "+1555",
queueMode: "collect",
queueDebounceMs: 0,
},
};
sendSpy.mockRejectedValueOnce(new Error("direct delivery unavailable"));
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:worker",
childRunId: "run-completion-direct-fallback",
requesterSessionKey: "main",
requesterDisplayKey: "main",
expectsCompletionMessage: true,
...defaultOutcomeAnnounce,
});
expect(didAnnounce).toBe(true);
await expect.poll(() => sendSpy.mock.calls.length).toBe(1);
await expect.poll(() => agentSpy.mock.calls.length).toBe(1);
expect(sendSpy.mock.calls[0]?.[0]).toMatchObject({
method: "send",
params: { sessionKey: "agent:main:main" },
});
expect(agentSpy.mock.calls[0]?.[0]).toMatchObject({
method: "agent",
params: { sessionKey: "agent:main:main" },
});
expect(agentSpy.mock.calls[0]?.[0]).toMatchObject({
method: "agent",
params: { channel: "whatsapp", to: "+1555", deliver: true },
});
});
it("returns failure for completion-mode when direct delivery fails and queue fallback is unavailable", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
sessionStore = {
"agent:main:main": {
sessionId: "session-direct-only",
lastChannel: "whatsapp",
lastTo: "+1555",
},
};
sendSpy.mockRejectedValueOnce(new Error("direct delivery unavailable"));
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:worker",
childRunId: "run-completion-direct-fail",
requesterSessionKey: "main",
requesterDisplayKey: "main",
expectsCompletionMessage: true,
...defaultOutcomeAnnounce,
});
expect(didAnnounce).toBe(false);
expect(sendSpy).toHaveBeenCalledTimes(1);
expect(agentSpy).toHaveBeenCalledTimes(0);
});
it("uses assistant output for completion-mode when latest assistant text exists", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
chatHistoryMock.mockResolvedValueOnce({
messages: [
{
role: "toolResult",
content: [{ type: "text", text: "old tool output" }],
},
{
role: "assistant",
content: [{ type: "text", text: "assistant completion text" }],
},
],
});
readLatestAssistantReplyMock.mockResolvedValue("assistant ignored fallback");
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:worker",
childRunId: "run-completion-assistant-output",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
expectsCompletionMessage: true,
...defaultOutcomeAnnounce,
});
expect(didAnnounce).toBe(true);
await expect.poll(() => sendSpy.mock.calls.length).toBe(1);
const call = sendSpy.mock.calls[0]?.[0] as { params?: { message?: string } };
const msg = call?.params?.message as string;
expect(msg).toContain("assistant completion text");
expect(msg).not.toContain("old tool output");
});
it("falls back to latest tool output for completion-mode when assistant output is empty", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
chatHistoryMock.mockResolvedValueOnce({
messages: [
{
role: "assistant",
content: [{ type: "text", text: "" }],
},
{
role: "toolResult",
content: [{ type: "text", text: "tool output only" }],
},
],
});
readLatestAssistantReplyMock.mockResolvedValue("");
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:worker",
childRunId: "run-completion-tool-output",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
expectsCompletionMessage: true,
...defaultOutcomeAnnounce,
});
expect(didAnnounce).toBe(true);
await expect.poll(() => sendSpy.mock.calls.length).toBe(1);
const call = sendSpy.mock.calls[0]?.[0] as { params?: { message?: string } };
const msg = call?.params?.message as string;
expect(msg).toContain("tool output only");
});
it("queues announce delivery back into requester subagent session", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
@@ -706,24 +388,7 @@ describe("subagent announce formatting", () => {
expect(call?.params?.to).toBeUndefined();
});
it.each([
{
testName: "includes threadId when origin has an active topic/thread",
childRunId: "run-thread",
expectedThreadId: "42",
requesterOrigin: undefined,
},
{
testName: "prefers requesterOrigin.threadId over session entry threadId",
childRunId: "run-thread-override",
expectedThreadId: "99",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
threadId: 99,
},
},
] as const)("$testName", async (testCase) => {
it("includes threadId when origin has an active topic/thread", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
@@ -740,10 +405,9 @@ describe("subagent announce formatting", () => {
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: testCase.childRunId,
childRunId: "run-thread",
requesterSessionKey: "main",
requesterDisplayKey: "main",
...(testCase.requesterOrigin ? { requesterOrigin: testCase.requesterOrigin } : {}),
...defaultOutcomeAnnounce,
});
@@ -751,7 +415,42 @@ describe("subagent announce formatting", () => {
const params = await getSingleAgentCallParams();
expect(params.channel).toBe("telegram");
expect(params.to).toBe("telegram:123");
expect(params.threadId).toBe(testCase.expectedThreadId);
expect(params.threadId).toBe("42");
});
it("prefers requesterOrigin.threadId over session entry threadId", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
sessionStore = {
"agent:main:main": {
sessionId: "session-thread-override",
lastChannel: "telegram",
lastTo: "telegram:123",
lastThreadId: 42,
queueMode: "collect",
queueDebounceMs: 0,
},
};
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-thread-override",
requesterSessionKey: "main",
requesterDisplayKey: "main",
requesterOrigin: {
channel: "telegram",
to: "telegram:123",
threadId: 99,
},
...defaultOutcomeAnnounce,
});
expect(didAnnounce).toBe(true);
await expect.poll(() => agentSpy.mock.calls.length).toBe(1);
const call = agentSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
expect(call?.params?.threadId).toBe("99");
});
it("splits collect-mode queues when accountId differs", async () => {
@@ -795,31 +494,16 @@ describe("subagent announce formatting", () => {
expect(accountIds).toEqual(expect.arrayContaining(["acct-a", "acct-b"]));
});
it.each([
{
testName: "uses requester origin for direct announce when not queued",
childRunId: "run-direct",
requesterOrigin: { channel: "whatsapp", accountId: "acct-123" },
expectedChannel: "whatsapp",
expectedAccountId: "acct-123",
},
{
testName: "normalizes requesterOrigin for direct announce delivery",
childRunId: "run-direct-origin",
requesterOrigin: { channel: " whatsapp ", accountId: " acct-987 " },
expectedChannel: "whatsapp",
expectedAccountId: "acct-987",
},
] as const)("$testName", async (testCase) => {
it("uses requester origin for direct announce when not queued", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: testCase.childRunId,
childRunId: "run-direct",
requesterSessionKey: "agent:main:main",
requesterOrigin: testCase.requesterOrigin,
requesterOrigin: { channel: "whatsapp", accountId: "acct-123" },
requesterDisplayKey: "main",
...defaultOutcomeAnnounce,
});
@@ -829,8 +513,8 @@ describe("subagent announce formatting", () => {
params?: Record<string, unknown>;
expectFinal?: boolean;
};
expect(call?.params?.channel).toBe(testCase.expectedChannel);
expect(call?.params?.accountId).toBe(testCase.expectedAccountId);
expect(call?.params?.channel).toBe("whatsapp");
expect(call?.params?.accountId).toBe("acct-123");
expect(call?.expectFinal).toBe(true);
});
@@ -933,6 +617,93 @@ describe("subagent announce formatting", () => {
expect(agentSpy).not.toHaveBeenCalled();
});
it("waits for follow-up reply when descendant runs exist and child reply is still waiting", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
const waitingReply = "Spawned the nested subagent. Waiting for its auto-announced result.";
const finalReply = "Nested subagent finished and I synthesized the final result.";
subagentRegistryMock.listDescendantRunsForRequester.mockImplementation((sessionKey: string) =>
sessionKey === "agent:main:subagent:parent"
? [
{
runId: "run-leaf",
requesterSessionKey: sessionKey,
childSessionKey: "agent:main:subagent:parent:subagent:leaf",
},
]
: [],
);
readLatestAssistantReplyMock
.mockResolvedValueOnce(waitingReply)
.mockResolvedValueOnce(waitingReply)
.mockResolvedValueOnce(finalReply);
vi.useFakeTimers();
try {
const announcePromise = runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:parent",
childRunId: "run-parent",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
...defaultOutcomeAnnounce,
});
await vi.advanceTimersByTimeAsync(500);
const didAnnounce = await announcePromise;
expect(didAnnounce).toBe(true);
} finally {
vi.useRealTimers();
}
const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } };
const msg = call?.params?.message as string;
expect(msg).toContain(finalReply);
expect(msg).not.toContain("Waiting for its auto-announced result.");
});
it("defers announce when descendant follow-up reply has not arrived yet", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
const waitingReply = "Spawned the nested subagent. Waiting for its auto-announced result.";
subagentRegistryMock.listDescendantRunsForRequester.mockImplementation((sessionKey: string) =>
sessionKey === "agent:main:subagent:parent"
? [
{
runId: "run-leaf",
requesterSessionKey: sessionKey,
childSessionKey: "agent:main:subagent:parent:subagent:leaf",
},
]
: [],
);
readLatestAssistantReplyMock.mockResolvedValue(waitingReply);
vi.useFakeTimers();
try {
const announcePromise = runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:parent",
childRunId: "run-parent-still-waiting",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "main",
task: "nested test",
timeoutMs: 700,
cleanup: "keep",
waitForCompletion: false,
startedAt: 10,
endedAt: 20,
outcome: { status: "ok" },
});
await vi.advanceTimersByTimeAsync(1200);
const didAnnounce = await announcePromise;
expect(didAnnounce).toBe(false);
} finally {
vi.useRealTimers();
}
expect(agentSpy).not.toHaveBeenCalled();
});
it("bubbles child announce to parent requester when requester subagent already ended", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
subagentRegistryMock.isSubagentSessionRunActive.mockReturnValue(false);
@@ -1013,6 +784,26 @@ describe("subagent announce formatting", () => {
expect(agentSpy).not.toHaveBeenCalled();
});
it("normalizes requesterOrigin for direct announce delivery", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-direct-origin",
requesterSessionKey: "agent:main:main",
requesterOrigin: { channel: " whatsapp ", accountId: " acct-987 " },
requesterDisplayKey: "main",
...defaultOutcomeAnnounce,
});
expect(didAnnounce).toBe(true);
const call = agentSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
expect(call?.params?.channel).toBe("whatsapp");
expect(call?.params?.accountId).toBe("acct-987");
});
it("prefers requesterOrigin channel over stale session lastChannel in queued announce", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
@@ -1045,6 +836,35 @@ describe("subagent announce formatting", () => {
expect(call?.params?.to).toBe("bluebubbles:chat_guid:123");
});
it("falls back to persisted deliverable route when requesterOrigin channel is non-deliverable", async () => {
const { runSubagentAnnounceFlow } = await import("./subagent-announce.js");
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(false);
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
sessionStore = {
"agent:main:main": {
sessionId: "session-webchat-origin",
lastChannel: "discord",
lastTo: "discord:channel:123",
lastAccountId: "acct-store",
},
};
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: "agent:main:subagent:test",
childRunId: "run-webchat-origin",
requesterSessionKey: "main",
requesterOrigin: { channel: "webchat", to: "ignored", accountId: "acct-live" },
requesterDisplayKey: "main",
...defaultOutcomeAnnounce,
});
expect(didAnnounce).toBe(true);
const call = agentSpy.mock.calls[0]?.[0] as { params?: Record<string, unknown> };
expect(call?.params?.channel).toBe("discord");
expect(call?.params?.to).toBe("discord:channel:123");
expect(call?.params?.accountId).toBe("acct-live");
});
it("routes to parent subagent when parent run ended but session still exists (#18037)", async () => {
// Scenario: Newton (depth-1) spawns Birdie (depth-2). Newton's agent turn ends
// after spawning but Newton's SESSION still exists (waiting for Birdie's result).

View File

@@ -1,5 +1,6 @@
import { resolveQueueSettings } from "../auto-reply/reply/queue.js";
import { SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../config/agent-limits.js";
import { loadConfig } from "../config/config.js";
import {
loadSessionStore,
@@ -10,14 +11,13 @@ import {
import { callGateway } from "../gateway/call.js";
import { normalizeMainKey } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
import { extractTextFromChatContent } from "../shared/chat-content.js";
import {
type DeliveryContext,
deliveryContextFromSession,
mergeDeliveryContext,
normalizeDeliveryContext,
} from "../utils/delivery-context.js";
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
import { isInternalMessageChannel } from "../utils/message-channel.js";
import {
buildAnnounceIdFromChildRun,
buildAnnounceIdempotencyKey,
@@ -30,170 +30,7 @@ import {
} from "./pi-embedded.js";
import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js";
import { getSubagentDepthFromSessionStore } from "./subagent-depth.js";
import { sanitizeTextContent, extractAssistantText } from "./tools/sessions-helpers.js";
type ToolResultMessage = {
role?: unknown;
content?: unknown;
};
type SubagentDeliveryPath = "queued" | "steered" | "direct" | "none";
type SubagentAnnounceDeliveryResult = {
delivered: boolean;
path: SubagentDeliveryPath;
error?: string;
};
function buildCompletionDeliveryMessage(params: {
findings: string;
subagentName: string;
}): string {
const findingsText = params.findings.trim();
const hasFindings = findingsText.length > 0 && findingsText !== "(no output)";
const header = `✅ Subagent ${params.subagentName} finished`;
if (!hasFindings) {
return header;
}
return `${header}\n\n${findingsText}`;
}
function summarizeDeliveryError(error: unknown): string {
if (error instanceof Error) {
return error.message || "error";
}
if (typeof error === "string") {
return error;
}
if (error === undefined || error === null) {
return "unknown error";
}
try {
return JSON.stringify(error);
} catch {
return "error";
}
}
function extractToolResultText(content: unknown): string {
if (typeof content === "string") {
return sanitizeTextContent(content);
}
if (content && typeof content === "object" && !Array.isArray(content)) {
const obj = content as {
text?: unknown;
output?: unknown;
content?: unknown;
result?: unknown;
error?: unknown;
summary?: unknown;
};
if (typeof obj.text === "string") {
return sanitizeTextContent(obj.text);
}
if (typeof obj.output === "string") {
return sanitizeTextContent(obj.output);
}
if (typeof obj.content === "string") {
return sanitizeTextContent(obj.content);
}
if (typeof obj.result === "string") {
return sanitizeTextContent(obj.result);
}
if (typeof obj.error === "string") {
return sanitizeTextContent(obj.error);
}
if (typeof obj.summary === "string") {
return sanitizeTextContent(obj.summary);
}
}
if (!Array.isArray(content)) {
return "";
}
const joined = extractTextFromChatContent(content, {
sanitizeText: sanitizeTextContent,
normalizeText: (text) => text,
joinWith: "\n",
});
return joined?.trim() ?? "";
}
function extractInlineTextContent(content: unknown): string {
if (!Array.isArray(content)) {
return "";
}
return (
extractTextFromChatContent(content, {
sanitizeText: sanitizeTextContent,
normalizeText: (text) => text.trim(),
joinWith: "",
}) ?? ""
);
}
function extractSubagentOutputText(message: unknown): string {
if (!message || typeof message !== "object") {
return "";
}
const role = (message as { role?: unknown }).role;
const content = (message as { content?: unknown }).content;
if (role === "assistant") {
const assistantText = extractAssistantText(message);
if (assistantText) {
return assistantText;
}
if (typeof content === "string") {
return sanitizeTextContent(content);
}
if (Array.isArray(content)) {
return extractInlineTextContent(content);
}
return "";
}
if (role === "toolResult" || role === "tool") {
return extractToolResultText((message as ToolResultMessage).content);
}
if (typeof content === "string") {
return sanitizeTextContent(content);
}
if (Array.isArray(content)) {
return extractInlineTextContent(content);
}
return "";
}
async function readLatestSubagentOutput(sessionKey: string): Promise<string | undefined> {
const history = await callGateway<{ messages?: Array<unknown> }>({
method: "chat.history",
params: { sessionKey, limit: 50 },
});
const messages = Array.isArray(history?.messages) ? history.messages : [];
for (let i = messages.length - 1; i >= 0; i -= 1) {
const msg = messages[i];
const text = extractSubagentOutputText(msg);
if (text) {
return text;
}
}
return undefined;
}
async function readLatestSubagentOutputWithRetry(params: {
sessionKey: string;
maxWaitMs: number;
}): Promise<string | undefined> {
const RETRY_INTERVAL_MS = 100;
const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 15_000));
let result: string | undefined;
while (Date.now() < deadline) {
result = await readLatestSubagentOutput(params.sessionKey);
if (result?.trim()) {
return result;
}
await new Promise((resolve) => setTimeout(resolve, RETRY_INTERVAL_MS));
}
return result;
}
import { readLatestAssistantReply } from "./tools/agent-step.js";
function formatDurationShort(valueMs?: number) {
if (!valueMs || !Number.isFinite(valueMs) || valueMs <= 0) {
@@ -273,8 +110,8 @@ function resolveAnnounceOrigin(
): DeliveryContext | undefined {
const normalizedRequester = normalizeDeliveryContext(requesterOrigin);
const normalizedEntry = deliveryContextFromSession(entry);
if (normalizedRequester?.channel && !isDeliverableMessageChannel(normalizedRequester.channel)) {
// Ignore internal/non-deliverable channel hints (for example webchat)
if (normalizedRequester?.channel && isInternalMessageChannel(normalizedRequester.channel)) {
// Ignore internal channel hints, for example webchat,
// so a valid persisted route can still be used for outbound delivery.
return mergeDeliveryContext(
{
@@ -284,7 +121,7 @@ function resolveAnnounceOrigin(
normalizedEntry,
);
}
// requesterOrigin (captured at spawn time) reflects the channel the user is
// requesterOrigin, captured at spawn time, reflects the channel the user is
// actually on and must take priority over the session entry, which may carry
// stale lastChannel / lastTo values from a previous channel interaction.
return mergeDeliveryContext(normalizedRequester, normalizedEntry);
@@ -408,182 +245,6 @@ async function maybeQueueSubagentAnnounce(params: {
return "none";
}
function queueOutcomeToDeliveryResult(
outcome: "steered" | "queued" | "none",
): SubagentAnnounceDeliveryResult {
if (outcome === "steered") {
return {
delivered: true,
path: "steered",
};
}
if (outcome === "queued") {
return {
delivered: true,
path: "queued",
};
}
return {
delivered: false,
path: "none",
};
}
async function sendSubagentAnnounceDirectly(params: {
targetRequesterSessionKey: string;
triggerMessage: string;
completionMessage?: string;
expectsCompletionMessage: boolean;
directIdempotencyKey: string;
completionDirectOrigin?: DeliveryContext;
directOrigin?: DeliveryContext;
requesterIsSubagent: boolean;
}): Promise<SubagentAnnounceDeliveryResult> {
const cfg = loadConfig();
const canonicalRequesterSessionKey = resolveRequesterStoreKey(
cfg,
params.targetRequesterSessionKey,
);
try {
const completionDirectOrigin = normalizeDeliveryContext(params.completionDirectOrigin);
const completionChannelRaw =
typeof completionDirectOrigin?.channel === "string"
? completionDirectOrigin.channel.trim()
: "";
const completionChannel =
completionChannelRaw && isDeliverableMessageChannel(completionChannelRaw)
? completionChannelRaw
: "";
const completionTo =
typeof completionDirectOrigin?.to === "string" ? completionDirectOrigin.to.trim() : "";
const hasCompletionDirectTarget =
!params.requesterIsSubagent && Boolean(completionChannel) && Boolean(completionTo);
if (
params.expectsCompletionMessage &&
hasCompletionDirectTarget &&
params.completionMessage?.trim()
) {
const completionThreadId =
completionDirectOrigin?.threadId != null && completionDirectOrigin.threadId !== ""
? String(completionDirectOrigin.threadId)
: undefined;
await callGateway({
method: "send",
params: {
channel: completionChannel,
to: completionTo,
accountId: completionDirectOrigin?.accountId,
threadId: completionThreadId,
sessionKey: canonicalRequesterSessionKey,
message: params.completionMessage,
idempotencyKey: params.directIdempotencyKey,
},
timeoutMs: 15_000,
});
return {
delivered: true,
path: "direct",
};
}
const directOrigin = normalizeDeliveryContext(params.directOrigin);
const threadId =
directOrigin?.threadId != null && directOrigin.threadId !== ""
? String(directOrigin.threadId)
: undefined;
await callGateway({
method: "agent",
params: {
sessionKey: canonicalRequesterSessionKey,
message: params.triggerMessage,
deliver: !params.requesterIsSubagent,
channel: params.requesterIsSubagent ? undefined : directOrigin?.channel,
accountId: params.requesterIsSubagent ? undefined : directOrigin?.accountId,
to: params.requesterIsSubagent ? undefined : directOrigin?.to,
threadId: params.requesterIsSubagent ? undefined : threadId,
idempotencyKey: params.directIdempotencyKey,
},
expectFinal: true,
timeoutMs: 15_000,
});
return {
delivered: true,
path: "direct",
};
} catch (err) {
return {
delivered: false,
path: "direct",
error: summarizeDeliveryError(err),
};
}
}
async function deliverSubagentAnnouncement(params: {
requesterSessionKey: string;
announceId?: string;
triggerMessage: string;
completionMessage?: string;
summaryLine?: string;
requesterOrigin?: DeliveryContext;
completionDirectOrigin?: DeliveryContext;
directOrigin?: DeliveryContext;
targetRequesterSessionKey: string;
requesterIsSubagent: boolean;
expectsCompletionMessage: boolean;
directIdempotencyKey: string;
}): Promise<SubagentAnnounceDeliveryResult> {
// Non-completion mode mirrors historical behavior: try queued/steered delivery first,
// then (only if not queued) attempt direct delivery.
if (!params.expectsCompletionMessage) {
const queueOutcome = await maybeQueueSubagentAnnounce({
requesterSessionKey: params.requesterSessionKey,
announceId: params.announceId,
triggerMessage: params.triggerMessage,
summaryLine: params.summaryLine,
requesterOrigin: params.requesterOrigin,
});
const queued = queueOutcomeToDeliveryResult(queueOutcome);
if (queued.delivered) {
return queued;
}
}
// Completion-mode uses direct send first so manual spawns can return immediately
// in the common ready-to-deliver case.
const direct = await sendSubagentAnnounceDirectly({
targetRequesterSessionKey: params.targetRequesterSessionKey,
triggerMessage: params.triggerMessage,
completionMessage: params.completionMessage,
directIdempotencyKey: params.directIdempotencyKey,
completionDirectOrigin: params.completionDirectOrigin,
directOrigin: params.directOrigin,
requesterIsSubagent: params.requesterIsSubagent,
expectsCompletionMessage: params.expectsCompletionMessage,
});
if (direct.delivered || !params.expectsCompletionMessage) {
return direct;
}
// If completion path failed direct delivery, try queueing as a fallback so the
// report can still be delivered once the requester session is idle.
const queueOutcome = await maybeQueueSubagentAnnounce({
requesterSessionKey: params.requesterSessionKey,
announceId: params.announceId,
triggerMessage: params.triggerMessage,
summaryLine: params.summaryLine,
requesterOrigin: params.requesterOrigin,
});
if (queueOutcome === "steered" || queueOutcome === "queued") {
return queueOutcomeToDeliveryResult(queueOutcome);
}
return direct;
}
function loadSessionEntryByKey(sessionKey: string) {
const cfg = loadConfig();
const agentId = resolveAgentIdFromSessionKey(sessionKey);
@@ -592,6 +253,65 @@ function loadSessionEntryByKey(sessionKey: string) {
return store[sessionKey];
}
async function readLatestAssistantReplyWithRetry(params: {
sessionKey: string;
initialReply?: string;
maxWaitMs: number;
}): Promise<string | undefined> {
const RETRY_INTERVAL_MS = 100;
let reply = params.initialReply?.trim() ? params.initialReply : undefined;
if (reply) {
return reply;
}
const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 15_000));
while (Date.now() < deadline) {
await new Promise((resolve) => setTimeout(resolve, RETRY_INTERVAL_MS));
const latest = await readLatestAssistantReply({ sessionKey: params.sessionKey });
if (latest?.trim()) {
return latest;
}
}
return reply;
}
function isLikelyWaitingForDescendantResult(reply?: string): boolean {
const text = reply?.trim();
if (!text) {
return false;
}
const normalized = text.toLowerCase();
if (!normalized.includes("waiting")) {
return false;
}
return (
normalized.includes("subagent") ||
normalized.includes("child") ||
normalized.includes("auto-announce") ||
normalized.includes("auto announced") ||
normalized.includes("result")
);
}
async function waitForAssistantReplyChange(params: {
sessionKey: string;
previousReply?: string;
maxWaitMs: number;
}): Promise<string | undefined> {
const RETRY_INTERVAL_MS = 200;
const previous = params.previousReply?.trim() ?? "";
const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 30_000));
while (Date.now() < deadline) {
await new Promise((resolve) => setTimeout(resolve, RETRY_INTERVAL_MS));
const latest = await readLatestAssistantReply({ sessionKey: params.sessionKey });
const normalizedLatest = latest?.trim() ?? "";
if (normalizedLatest && normalizedLatest !== previous) {
return latest;
}
}
return undefined;
}
export function buildSubagentSystemPrompt(params: {
requesterSessionKey?: string;
requesterOrigin?: DeliveryContext;
@@ -608,7 +328,10 @@ export function buildSubagentSystemPrompt(params: {
? params.task.replace(/\s+/g, " ").trim()
: "{{TASK_DESCRIPTION}}";
const childDepth = typeof params.childDepth === "number" ? params.childDepth : 1;
const maxSpawnDepth = typeof params.maxSpawnDepth === "number" ? params.maxSpawnDepth : 1;
const maxSpawnDepth =
typeof params.maxSpawnDepth === "number"
? params.maxSpawnDepth
: DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH;
const canSpawn = childDepth < maxSpawnDepth;
const parentLabel = childDepth >= 2 ? "parent orchestrator" : "main agent";
@@ -692,11 +415,7 @@ function buildAnnounceReplyInstruction(params: {
remainingActiveSubagentRuns: number;
requesterIsSubagent: boolean;
announceType: SubagentAnnounceType;
expectsCompletionMessage?: boolean;
}): string {
if (params.expectsCompletionMessage) {
return `A completed ${params.announceType} is ready for user delivery. Convert the result above into your normal assistant voice and send that user-facing update now. Keep this internal context private (don't mention system/log/stats/session details or announce type).`;
}
if (params.remainingActiveSubagentRuns > 0) {
const activeRunsLabel = params.remainingActiveSubagentRuns === 1 ? "run" : "runs";
return `There are still ${params.remainingActiveSubagentRuns} active subagent ${activeRunsLabel} for this session. If they are part of the same workflow, wait for the remaining results before sending a user update. If they are unrelated, respond normally using only the result above.`;
@@ -723,10 +442,8 @@ export async function runSubagentAnnounceFlow(params: {
label?: string;
outcome?: SubagentRunOutcome;
announceType?: SubagentAnnounceType;
expectsCompletionMessage?: boolean;
}): Promise<boolean> {
let didAnnounce = false;
const expectsCompletionMessage = params.expectsCompletionMessage === true;
let shouldDeleteChildSession = params.cleanup === "delete";
try {
let targetRequesterSessionKey = params.requesterSessionKey;
@@ -742,7 +459,7 @@ export async function runSubagentAnnounceFlow(params: {
let outcome: SubagentRunOutcome | undefined = params.outcome;
// Lifecycle "end" can arrive before auto-compaction retries finish. If the
// subagent is still active, wait for the embedded run to fully settle.
if (!expectsCompletionMessage && childSessionId && isEmbeddedPiRunActive(childSessionId)) {
if (childSessionId && isEmbeddedPiRunActive(childSessionId)) {
const settled = await waitForEmbeddedPiRunEnd(childSessionId, settleTimeoutMs);
if (!settled && isEmbeddedPiRunActive(childSessionId)) {
// The child run is still active (e.g., compaction retry still in progress).
@@ -787,26 +504,22 @@ export async function runSubagentAnnounceFlow(params: {
outcome = { status: "timeout" };
}
}
reply = await readLatestSubagentOutput(params.childSessionKey);
reply = await readLatestAssistantReply({ sessionKey: params.childSessionKey });
}
if (!reply) {
reply = await readLatestSubagentOutput(params.childSessionKey);
reply = await readLatestAssistantReply({ sessionKey: params.childSessionKey });
}
if (!reply?.trim()) {
reply = await readLatestSubagentOutputWithRetry({
reply = await readLatestAssistantReplyWithRetry({
sessionKey: params.childSessionKey,
initialReply: reply,
maxWaitMs: params.timeoutMs,
});
}
if (
!expectsCompletionMessage &&
!reply?.trim() &&
childSessionId &&
isEmbeddedPiRunActive(childSessionId)
) {
if (!reply?.trim() && childSessionId && isEmbeddedPiRunActive(childSessionId)) {
// Avoid announcing "(no output)" while the child run is still producing output.
shouldDeleteChildSession = false;
return false;
@@ -823,12 +536,46 @@ export async function runSubagentAnnounceFlow(params: {
} catch {
// Best-effort only; fall back to direct announce behavior when unavailable.
}
if (!expectsCompletionMessage && activeChildDescendantRuns > 0) {
if (activeChildDescendantRuns > 0) {
// The finished run still has active descendant subagents. Defer announcing
// this run until descendants settle so we avoid posting in-progress updates.
shouldDeleteChildSession = false;
return false;
}
// If the subagent reply is still a "waiting for nested result" placeholder,
// hold this announce and wait for the follow-up turn that synthesizes child output.
let hasAnyChildDescendantRuns = false;
try {
const { listDescendantRunsForRequester } = await import("./subagent-registry.js");
hasAnyChildDescendantRuns = listDescendantRunsForRequester(params.childSessionKey).length > 0;
} catch {
// Best-effort only; fall back to existing behavior when unavailable.
}
if (hasAnyChildDescendantRuns && isLikelyWaitingForDescendantResult(reply)) {
const followupReply = await waitForAssistantReplyChange({
sessionKey: params.childSessionKey,
previousReply: reply,
maxWaitMs: settleTimeoutMs,
});
if (!followupReply?.trim()) {
shouldDeleteChildSession = false;
return false;
}
reply = followupReply;
try {
const { countActiveDescendantRuns } = await import("./subagent-registry.js");
activeChildDescendantRuns = Math.max(0, countActiveDescendantRuns(params.childSessionKey));
} catch {
activeChildDescendantRuns = 0;
}
if (
activeChildDescendantRuns > 0 ||
(hasAnyChildDescendantRuns && isLikelyWaitingForDescendantResult(reply))
) {
shouldDeleteChildSession = false;
return false;
}
}
// Build status label
const statusLabel =
@@ -843,14 +590,12 @@ export async function runSubagentAnnounceFlow(params: {
// Build instructional message for main agent
const announceType = params.announceType ?? "subagent task";
const taskLabel = params.label || params.task || "task";
const subagentName = resolveAgentIdFromSessionKey(params.childSessionKey);
const announceSessionId = childSessionId || "unknown";
const findings = reply || "(no output)";
let completionMessage = "";
let triggerMessage = "";
let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey);
let requesterIsSubagent = !expectsCompletionMessage && requesterDepth >= 1;
let requesterIsSubagent = requesterDepth >= 1;
// If the requester subagent has already finished, bubble the announce to its
// requester (typically main) so descendant completion is not silently lost.
// BUT: only fallback if the parent SESSION is deleted, not just if the current
@@ -903,31 +648,43 @@ export async function runSubagentAnnounceFlow(params: {
remainingActiveSubagentRuns,
requesterIsSubagent,
announceType,
expectsCompletionMessage,
});
const statsLine = await buildCompactAnnounceStatsLine({
sessionKey: params.childSessionKey,
startedAt: params.startedAt,
endedAt: params.endedAt,
});
completionMessage = buildCompletionDeliveryMessage({
findings,
subagentName,
});
const internalSummaryMessage = [
triggerMessage = [
`[System Message] [sessionId: ${announceSessionId}] A ${announceType} "${taskLabel}" just ${statusLabel}.`,
"",
"Result:",
findings,
"",
statsLine,
"",
replyInstruction,
].join("\n");
triggerMessage = [internalSummaryMessage, "", replyInstruction].join("\n");
const announceId = buildAnnounceIdFromChildRun({
childSessionKey: params.childSessionKey,
childRunId: params.childRunId,
});
const queued = await maybeQueueSubagentAnnounce({
requesterSessionKey: targetRequesterSessionKey,
announceId,
triggerMessage,
summaryLine: taskLabel,
requesterOrigin: targetRequesterOrigin,
});
if (queued === "steered") {
didAnnounce = true;
return true;
}
if (queued === "queued") {
didAnnounce = true;
return true;
}
// Send to the requester session. For nested subagents this is an internal
// follow-up injection (deliver=false) so the orchestrator receives it.
let directOrigin = targetRequesterOrigin;
@@ -939,26 +696,26 @@ export async function runSubagentAnnounceFlow(params: {
// catches duplicates if this announce is also queued by the gateway-
// level message queue while the main session is busy (#17122).
const directIdempotencyKey = buildAnnounceIdempotencyKey(announceId);
const delivery = await deliverSubagentAnnouncement({
requesterSessionKey: targetRequesterSessionKey,
announceId,
triggerMessage,
completionMessage,
summaryLine: taskLabel,
requesterOrigin: targetRequesterOrigin,
completionDirectOrigin: targetRequesterOrigin,
directOrigin,
targetRequesterSessionKey,
requesterIsSubagent,
expectsCompletionMessage: expectsCompletionMessage,
directIdempotencyKey,
await callGateway({
method: "agent",
params: {
sessionKey: targetRequesterSessionKey,
message: triggerMessage,
deliver: !requesterIsSubagent,
channel: requesterIsSubagent ? undefined : directOrigin?.channel,
accountId: requesterIsSubagent ? undefined : directOrigin?.accountId,
to: requesterIsSubagent ? undefined : directOrigin?.to,
threadId:
!requesterIsSubagent && directOrigin?.threadId != null && directOrigin.threadId !== ""
? String(directOrigin.threadId)
: undefined,
idempotencyKey: directIdempotencyKey,
},
expectFinal: true,
timeoutMs: 15_000,
});
didAnnounce = delivery.delivered;
if (!delivery.delivered && delivery.path === "direct" && delivery.error) {
defaultRuntime.error?.(
`Subagent completion direct announce failed for run ${params.childRunId}: ${delivery.error}`,
);
}
didAnnounce = true;
} catch (err) {
defaultRuntime.error?.(`Subagent announce failed: ${String(err)}`);
// Best-effort follow-ups; ignore failures to avoid breaking the caller response.

View File

@@ -151,4 +151,57 @@ describe("announce loop guard (#18264)", () => {
const stored = runs.find((run) => run.runId === entry.runId);
expect(stored?.cleanupCompletedAt).toBeDefined();
});
test("does not consume retry budget while descendants are still active", async () => {
announceFn.mockClear();
registry.resetSubagentRegistryForTests();
const now = Date.now();
const parentEntry = {
runId: "test-parent-ended",
childSessionKey: "agent:main:subagent:parent-ended",
requesterSessionKey: "agent:main:main",
requesterDisplayKey: "agent:main:main",
task: "parent task",
cleanup: "keep" as const,
createdAt: now - 30_000,
startedAt: now - 20_000,
endedAt: now - 10_000,
expectsCompletionMessage: true,
cleanupHandled: false,
};
const activeDescendant = {
runId: "test-desc-active",
childSessionKey: "agent:main:subagent:parent-ended:subagent:leaf",
requesterSessionKey: "agent:main:subagent:parent-ended",
requesterDisplayKey: "agent:main:subagent:parent-ended",
task: "leaf task",
cleanup: "keep" as const,
createdAt: now - 5_000,
startedAt: now - 5_000,
expectsCompletionMessage: true,
cleanupHandled: false,
};
loadSubagentRegistryFromDisk.mockReturnValue(
new Map([
[parentEntry.runId, parentEntry],
[activeDescendant.runId, activeDescendant],
]),
);
registry.initSubagentRegistry();
await Promise.resolve();
await Promise.resolve();
expect(announceFn).toHaveBeenCalledWith(
expect.objectContaining({ childRunId: parentEntry.runId }),
);
const parent = registry
.listSubagentRunsForRequester("agent:main:main")
.find((run) => run.runId === parentEntry.runId);
expect(parent?.announceRetryCount).toBeUndefined();
expect(parent?.cleanupCompletedAt).toBeUndefined();
expect(parent?.cleanupHandled).toBe(false);
});
});

View File

@@ -102,7 +102,6 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor
requesterOrigin,
requesterDisplayKey: entry.requesterDisplayKey,
task: entry.task,
expectsCompletionMessage: entry.expectsCompletionMessage,
timeoutMs: SUBAGENT_ANNOUNCE_TIMEOUT_MS,
cleanup: entry.cleanup,
waitForCompletion: false,
@@ -324,12 +323,34 @@ function finalizeSubagentCleanup(runId: string, cleanup: "delete" | "keep", didA
}
if (!didAnnounce) {
const now = Date.now();
const endedAgo = typeof entry.endedAt === "number" ? now - entry.endedAt : 0;
// Normal defer: the run ended, but descendant runs are still active.
// Don't consume retry budget in this state or we can give up before
// descendants finish and before the parent synthesizes the final reply.
const activeDescendantRuns = Math.max(0, countActiveDescendantRuns(entry.childSessionKey));
if (entry.expectsCompletionMessage === true && activeDescendantRuns > 0) {
if (endedAgo > ANNOUNCE_EXPIRY_MS) {
logAnnounceGiveUp(entry, "expiry");
entry.cleanupCompletedAt = now;
persistSubagentRuns();
retryDeferredCompletedAnnounces(runId);
return;
}
entry.lastAnnounceRetryAt = now;
entry.cleanupHandled = false;
resumedRuns.delete(runId);
persistSubagentRuns();
setTimeout(() => {
resumeSubagentRun(runId);
}, MIN_ANNOUNCE_RETRY_DELAY_MS).unref?.();
return;
}
const retryCount = (entry.announceRetryCount ?? 0) + 1;
entry.announceRetryCount = retryCount;
entry.lastAnnounceRetryAt = now;
// Check if the announce has exceeded retry limits or expired (#18264).
const endedAgo = typeof entry.endedAt === "number" ? now - entry.endedAt : 0;
if (retryCount >= MAX_ANNOUNCE_RETRY_COUNT || endedAgo > ANNOUNCE_EXPIRY_MS) {
// Give up: mark as completed to break the infinite retry loop.
logAnnounceGiveUp(entry, retryCount >= MAX_ANNOUNCE_RETRY_COUNT ? "retry-limit" : "expiry");

View File

@@ -1,5 +1,6 @@
import crypto from "node:crypto";
import { formatThinkingLevels, normalizeThinkLevel } from "../auto-reply/thinking.js";
import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../config/agent-limits.js";
import { loadConfig } from "../config/config.js";
import { callGateway } from "../gateway/call.js";
import { normalizeAgentId, parseAgentSessionKey } from "../routing/session-key.js";
@@ -107,7 +108,8 @@ export async function spawnSubagentDirect(
});
const callerDepth = getSubagentDepthFromSessionStore(requesterInternalKey, { cfg });
const maxSpawnDepth = cfg.agents?.defaults?.subagents?.maxSpawnDepth ?? 1;
const maxSpawnDepth =
cfg.agents?.defaults?.subagents?.maxSpawnDepth ?? DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH;
if (callerDepth >= maxSpawnDepth) {
return {
status: "forbidden",

View File

@@ -575,14 +575,15 @@ describe("buildSubagentSystemPrompt", () => {
expect(prompt).toContain("instead of full-file `cat`");
});
it("defaults to depth 1 and maxSpawnDepth 1 when not provided", () => {
it("defaults to depth 1 and maxSpawnDepth 2 when not provided", () => {
const prompt = buildSubagentSystemPrompt({
childSessionKey: "agent:main:subagent:abc",
task: "basic task",
});
// Should not include spawning guidance (default maxSpawnDepth is 1, depth 1 is leaf)
expect(prompt).not.toContain("## Sub-Agent Spawning");
// Default maxSpawnDepth is 2, so depth-1 subagents are orchestrators.
expect(prompt).toContain("## Sub-Agent Spawning");
expect(prompt).toContain("You CAN spawn your own sub-agents");
expect(prompt).toContain("spawned by the main agent");
});
});

View File

@@ -7,6 +7,7 @@ import {
sortSubagentRuns,
type SubagentTargetResolution,
} from "../../auto-reply/reply/subagents-utils.js";
import { DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH } from "../../config/agent-limits.js";
import { loadConfig } from "../../config/config.js";
import type { SessionEntry } from "../../config/sessions.js";
import { loadSessionStore, resolveStorePath, updateSessionStore } from "../../config/sessions.js";
@@ -199,7 +200,8 @@ function resolveRequesterKey(params: {
// Check if this sub-agent can spawn children (orchestrator).
// If so, it should see its own children, not its parent's children.
const callerDepth = getSubagentDepthFromSessionStore(callerSessionKey, { cfg: params.cfg });
const maxSpawnDepth = params.cfg.agents?.defaults?.subagents?.maxSpawnDepth ?? 1;
const maxSpawnDepth =
params.cfg.agents?.defaults?.subagents?.maxSpawnDepth ?? DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH;
if (callerDepth < maxSpawnDepth) {
// Orchestrator sub-agent: use its own session key as requester
// so it sees children it spawned.

View File

@@ -2,6 +2,7 @@ import type { OpenClawConfig } from "./types.js";
export const DEFAULT_AGENT_MAX_CONCURRENT = 4;
export const DEFAULT_SUBAGENT_MAX_CONCURRENT = 8;
export const DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH = 2;
export function resolveAgentMaxConcurrent(cfg?: OpenClawConfig): number {
const raw = cfg?.agents?.defaults?.maxConcurrent;

View File

@@ -3,6 +3,7 @@ import path from "node:path";
import { describe, expect, it } from "vitest";
import {
DEFAULT_AGENT_MAX_CONCURRENT,
DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH,
DEFAULT_SUBAGENT_MAX_CONCURRENT,
resolveAgentMaxConcurrent,
resolveSubagentMaxConcurrent,
@@ -60,6 +61,7 @@ describe("agent concurrency defaults", () => {
expect(cfg.agents?.defaults?.maxConcurrent).toBe(DEFAULT_AGENT_MAX_CONCURRENT);
expect(cfg.agents?.defaults?.subagents?.maxConcurrent).toBe(DEFAULT_SUBAGENT_MAX_CONCURRENT);
expect(cfg.agents?.defaults?.subagents?.maxSpawnDepth).toBe(DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH);
});
});
});

View File

@@ -1,7 +1,11 @@
import fs from "node:fs/promises";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { DEFAULT_AGENT_MAX_CONCURRENT, DEFAULT_SUBAGENT_MAX_CONCURRENT } from "./agent-limits.js";
import {
DEFAULT_AGENT_MAX_CONCURRENT,
DEFAULT_SUBAGENT_MAX_CONCURRENT,
DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH,
} from "./agent-limits.js";
import { loadConfig } from "./config.js";
import { withTempHome } from "./home-env.test-harness.js";
@@ -53,6 +57,7 @@ describe("config identity defaults", () => {
expect(cfg.agents?.list).toBeUndefined();
expect(cfg.agents?.defaults?.maxConcurrent).toBe(DEFAULT_AGENT_MAX_CONCURRENT);
expect(cfg.agents?.defaults?.subagents?.maxConcurrent).toBe(DEFAULT_SUBAGENT_MAX_CONCURRENT);
expect(cfg.agents?.defaults?.subagents?.maxSpawnDepth).toBe(DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH);
expect(cfg.session).toBeUndefined();
});
});

View File

@@ -1,6 +1,10 @@
import { DEFAULT_CONTEXT_TOKENS } from "../agents/defaults.js";
import { parseModelRef } from "../agents/model-selection.js";
import { DEFAULT_AGENT_MAX_CONCURRENT, DEFAULT_SUBAGENT_MAX_CONCURRENT } from "./agent-limits.js";
import {
DEFAULT_AGENT_MAX_CONCURRENT,
DEFAULT_SUBAGENT_MAX_CONCURRENT,
DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH,
} from "./agent-limits.js";
import { resolveTalkApiKey } from "./talk.js";
import type { OpenClawConfig } from "./types.js";
import type { ModelDefinitionConfig } from "./types.models.js";
@@ -299,7 +303,10 @@ export function applyAgentDefaults(cfg: OpenClawConfig): OpenClawConfig {
const hasSubMax =
typeof defaults?.subagents?.maxConcurrent === "number" &&
Number.isFinite(defaults.subagents.maxConcurrent);
if (hasMax && hasSubMax) {
const hasMaxSpawnDepth =
typeof defaults?.subagents?.maxSpawnDepth === "number" &&
Number.isFinite(defaults.subagents.maxSpawnDepth);
if (hasMax && hasSubMax && hasMaxSpawnDepth) {
return cfg;
}
@@ -315,6 +322,10 @@ export function applyAgentDefaults(cfg: OpenClawConfig): OpenClawConfig {
nextSubagents.maxConcurrent = DEFAULT_SUBAGENT_MAX_CONCURRENT;
mutated = true;
}
if (!hasMaxSpawnDepth) {
nextSubagents.maxSpawnDepth = DEFAULT_SUBAGENT_MAX_SPAWN_DEPTH;
mutated = true;
}
if (!mutated) {
return cfg;

View File

@@ -241,7 +241,7 @@ export type AgentDefaultsConfig = {
subagents?: {
/** Max concurrent sub-agent runs (global lane: "subagent"). Default: 1. */
maxConcurrent?: number;
/** Maximum depth allowed for sessions_spawn chains. Default behavior: 1 (no nested spawns). */
/** Maximum depth allowed for sessions_spawn chains. Default behavior: 2 (allows nested spawns). */
maxSpawnDepth?: number;
/** Maximum active children a single requester session may spawn. Default behavior: 5. */
maxChildrenPerAgent?: number;

View File

@@ -150,7 +150,7 @@ export const AgentDefaultsSchema = z
.max(5)
.optional()
.describe(
"Maximum nesting depth for sub-agent spawning. 1 = no nesting (default), 2 = sub-agents can spawn sub-sub-agents.",
"Maximum nesting depth for sub-agent spawning. Default is 2 (sub-agents can spawn sub-sub-agents).",
),
maxChildrenPerAgent: z
.number()

View File

@@ -4,11 +4,9 @@ import type { OpenClawConfig } from "../../config/config.js";
vi.mock("../../config/sessions.js", () => ({
loadSessionStore: vi.fn(),
resolveStorePath: vi.fn().mockReturnValue("/tmp/test-store.json"),
evaluateSessionFreshness: vi.fn().mockReturnValue({ fresh: true }),
resolveSessionResetPolicy: vi.fn().mockReturnValue({ mode: "idle", idleMinutes: 60 }),
}));
import { loadSessionStore, evaluateSessionFreshness } from "../../config/sessions.js";
import { loadSessionStore } from "../../config/sessions.js";
import { resolveCronSession } from "./session.js";
const NOW_MS = 1_737_600_000_000;
@@ -17,25 +15,18 @@ type SessionStore = ReturnType<typeof loadSessionStore>;
type SessionStoreEntry = SessionStore[string];
type MockSessionStoreEntry = Partial<SessionStoreEntry>;
function resolveWithStoredEntry(params?: {
sessionKey?: string;
entry?: MockSessionStoreEntry;
forceNew?: boolean;
fresh?: boolean;
}) {
function resolveWithStoredEntry(params?: { sessionKey?: string; entry?: MockSessionStoreEntry }) {
const sessionKey = params?.sessionKey ?? "webhook:stable-key";
const store: SessionStore = params?.entry
? ({ [sessionKey]: params.entry as SessionStoreEntry } as SessionStore)
: {};
vi.mocked(loadSessionStore).mockReturnValue(store);
vi.mocked(evaluateSessionFreshness).mockReturnValue({ fresh: params?.fresh ?? true });
return resolveCronSession({
cfg: {} as OpenClawConfig,
sessionKey,
agentId: "main",
nowMs: NOW_MS,
forceNew: params?.forceNew,
});
}
@@ -85,76 +76,51 @@ describe("resolveCronSession", () => {
expect(result.isNewSession).toBe(true);
});
// New tests for session reuse behavior (#18027)
describe("session reuse for webhooks/cron", () => {
it("reuses existing sessionId when session is fresh", () => {
const result = resolveWithStoredEntry({
entry: {
sessionId: "existing-session-id-123",
updatedAt: NOW_MS - 1000,
systemSent: true,
},
fresh: true,
});
expect(result.sessionEntry.sessionId).toBe("existing-session-id-123");
expect(result.isNewSession).toBe(false);
expect(result.systemSent).toBe(true);
it("always creates a new sessionId for cron/webhook runs", () => {
const result = resolveWithStoredEntry({
entry: {
sessionId: "existing-session-id-123",
updatedAt: NOW_MS - 1000,
systemSent: true,
},
});
it("creates new sessionId when session is stale", () => {
const result = resolveWithStoredEntry({
entry: {
sessionId: "old-session-id",
updatedAt: NOW_MS - 86_400_000, // 1 day ago
systemSent: true,
modelOverride: "gpt-4.1-mini",
providerOverride: "openai",
sendPolicy: "allow",
},
fresh: false,
});
expect(result.sessionEntry.sessionId).not.toBe("existing-session-id-123");
expect(result.isNewSession).toBe(true);
expect(result.systemSent).toBe(false);
});
expect(result.sessionEntry.sessionId).not.toBe("old-session-id");
expect(result.isNewSession).toBe(true);
expect(result.systemSent).toBe(false);
expect(result.sessionEntry.modelOverride).toBe("gpt-4.1-mini");
expect(result.sessionEntry.providerOverride).toBe("openai");
expect(result.sessionEntry.sendPolicy).toBe("allow");
it("preserves overrides while rolling a new sessionId", () => {
const result = resolveWithStoredEntry({
entry: {
sessionId: "old-session-id",
updatedAt: NOW_MS - 86_400_000,
systemSent: true,
modelOverride: "gpt-4.1-mini",
providerOverride: "openai",
sendPolicy: "allow",
},
});
it("creates new sessionId when forceNew is true", () => {
const result = resolveWithStoredEntry({
entry: {
sessionId: "existing-session-id-456",
updatedAt: NOW_MS - 1000,
systemSent: true,
modelOverride: "sonnet-4",
providerOverride: "anthropic",
},
fresh: true,
forceNew: true,
});
expect(result.sessionEntry.sessionId).not.toBe("old-session-id");
expect(result.isNewSession).toBe(true);
expect(result.systemSent).toBe(false);
expect(result.sessionEntry.modelOverride).toBe("gpt-4.1-mini");
expect(result.sessionEntry.providerOverride).toBe("openai");
expect(result.sessionEntry.sendPolicy).toBe("allow");
});
expect(result.sessionEntry.sessionId).not.toBe("existing-session-id-456");
expect(result.isNewSession).toBe(true);
expect(result.systemSent).toBe(false);
expect(result.sessionEntry.modelOverride).toBe("sonnet-4");
expect(result.sessionEntry.providerOverride).toBe("anthropic");
it("creates new sessionId when entry exists but has no sessionId", () => {
const result = resolveWithStoredEntry({
entry: {
updatedAt: NOW_MS - 1000,
modelOverride: "some-model",
},
});
it("creates new sessionId when entry exists but has no sessionId", () => {
const result = resolveWithStoredEntry({
entry: {
updatedAt: NOW_MS - 1000,
modelOverride: "some-model",
},
});
expect(result.sessionEntry.sessionId).toBeDefined();
expect(result.isNewSession).toBe(true);
// Should still preserve other fields from entry
expect(result.sessionEntry.modelOverride).toBe("some-model");
});
expect(result.sessionEntry.sessionId).toBeDefined();
expect(result.isNewSession).toBe(true);
// Should still preserve other fields from entry
expect(result.sessionEntry.modelOverride).toBe("some-model");
});
});

View File

@@ -1,19 +1,12 @@
import crypto from "node:crypto";
import type { OpenClawConfig } from "../../config/config.js";
import {
evaluateSessionFreshness,
loadSessionStore,
resolveSessionResetPolicy,
resolveStorePath,
type SessionEntry,
} from "../../config/sessions.js";
import { loadSessionStore, resolveStorePath, type SessionEntry } from "../../config/sessions.js";
export function resolveCronSession(params: {
cfg: OpenClawConfig;
sessionKey: string;
nowMs: number;
agentId: string;
forceNew?: boolean;
}) {
const sessionCfg = params.cfg.session;
const storePath = resolveStorePath(sessionCfg?.store, {
@@ -21,42 +14,8 @@ export function resolveCronSession(params: {
});
const store = loadSessionStore(storePath);
const entry = store[params.sessionKey];
// Check if we can reuse an existing session
let sessionId: string;
let isNewSession: boolean;
let systemSent: boolean;
if (!params.forceNew && entry?.sessionId) {
// Evaluate freshness using the configured reset policy
// Cron/webhook sessions use "direct" reset type (1:1 conversation style)
const resetPolicy = resolveSessionResetPolicy({
sessionCfg,
resetType: "direct",
});
const freshness = evaluateSessionFreshness({
updatedAt: entry.updatedAt,
now: params.nowMs,
policy: resetPolicy,
});
if (freshness.fresh) {
// Reuse existing session
sessionId = entry.sessionId;
isNewSession = false;
systemSent = entry.systemSent ?? false;
} else {
// Session expired, create new
sessionId = crypto.randomUUID();
isNewSession = true;
systemSent = false;
}
} else {
// No existing session or forced new
sessionId = crypto.randomUUID();
isNewSession = true;
systemSent = false;
}
const sessionId = crypto.randomUUID();
const systemSent = false;
const sessionEntry: SessionEntry = {
// Preserve existing per-session overrides even when rolling to a new sessionId.
@@ -66,5 +25,5 @@ export function resolveCronSession(params: {
updatedAt: params.nowMs,
systemSent,
};
return { storePath, store, sessionEntry, systemSent, isNewSession };
return { storePath, store, sessionEntry, systemSent, isNewSession: true };
}