From 4258a3307f5a616c26c5bd63e108d918b71a9217 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Thu, 26 Feb 2026 00:30:19 +0000 Subject: [PATCH] refactor(agents): unify subagent announce delivery pipeline Co-authored-by: Smith Labs Co-authored-by: Do Cao Hieu --- CHANGELOG.md | 1 + src/agents/subagent-announce-dispatch.test.ts | 156 ++++++++++++++++++ src/agents/subagent-announce-dispatch.ts | 104 ++++++++++++ src/agents/subagent-announce.ts | 113 ++++--------- ...agent-registry.announce-loop-guard.test.ts | 39 +++++ src/agents/subagent-registry.ts | 13 +- src/gateway/server-methods/send.ts | 7 +- src/infra/outbound/channel-resolution.ts | 73 ++++++++ src/infra/outbound/message.test.ts | 51 +++++- src/infra/outbound/message.ts | 26 +-- .../targets.channel-resolution.test.ts | 61 +++++++ src/infra/outbound/targets.ts | 27 ++- src/telegram/send.test.ts | 38 +++++ src/telegram/send.ts | 46 +++--- 14 files changed, 623 insertions(+), 132 deletions(-) create mode 100644 src/agents/subagent-announce-dispatch.test.ts create mode 100644 src/agents/subagent-announce-dispatch.ts create mode 100644 src/infra/outbound/channel-resolution.ts create mode 100644 src/infra/outbound/targets.channel-resolution.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 6db4c8796de..6ee1e3d0375 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Agents/Subagents delivery: refactor subagent completion announce dispatch into an explicit queue/direct/fallback state machine, recover outbound channel-plugin resolution in cold/stale plugin-registry states across announce/message/gateway send paths, finalize cleanup bookkeeping when announce flow rejects, and treat Telegram sends without `message_id` as delivery failures (instead of false-success `"unknown"` IDs). (#26867, #25961, #26803, #25069, #26741) Thanks @SmithLabsLLC and @docaohieu2808. - Slack/Session threads: prevent oversized parent-session inheritance from silently bricking new thread sessions, surface embedded context-overflow empty-result failures to users, and add configurable `session.parentForkMaxTokens` (default `100000`, `0` disables). (#26912) Thanks @markshields-tl. - Security/Signal: enforce DM/group authorization before reaction-only notification enqueue so unauthorized senders can no longer inject Signal reaction system events under `dmPolicy`/`groupPolicy`; reaction notifications now require channel access checks first. This ships in the next npm release (`2026.2.25`). Thanks @tdjackey for reporting. - Security/Discord + Slack reactions: enforce DM policy/allowlist authorization before reaction-event system enqueue in direct messages; Discord reaction handling now also honors DM/group-DM enablement and guild `groupPolicy` channel gating to keep reaction ingress aligned with normal message preflight. This ships in the next npm release (`2026.2.25`). Thanks @tdjackey for reporting. diff --git a/src/agents/subagent-announce-dispatch.test.ts b/src/agents/subagent-announce-dispatch.test.ts new file mode 100644 index 00000000000..fcc2f992e2b --- /dev/null +++ b/src/agents/subagent-announce-dispatch.test.ts @@ -0,0 +1,156 @@ +import { describe, expect, it, vi } from "vitest"; +import { + mapQueueOutcomeToDeliveryResult, + runSubagentAnnounceDispatch, +} from "./subagent-announce-dispatch.js"; + +describe("mapQueueOutcomeToDeliveryResult", () => { + it("maps steered to delivered", () => { + expect(mapQueueOutcomeToDeliveryResult("steered")).toEqual({ + delivered: true, + path: "steered", + }); + }); + + it("maps queued to delivered", () => { + expect(mapQueueOutcomeToDeliveryResult("queued")).toEqual({ + delivered: true, + path: "queued", + }); + }); + + it("maps none to not-delivered", () => { + expect(mapQueueOutcomeToDeliveryResult("none")).toEqual({ + delivered: false, + path: "none", + }); + }); +}); + +describe("runSubagentAnnounceDispatch", () => { + it("uses queue-first ordering for non-completion mode", async () => { + const queue = vi.fn(async () => "none" as const); + const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const })); + + const result = await runSubagentAnnounceDispatch({ + expectsCompletionMessage: false, + queue, + direct, + }); + + expect(queue).toHaveBeenCalledTimes(1); + expect(direct).toHaveBeenCalledTimes(1); + expect(result.delivered).toBe(true); + expect(result.path).toBe("direct"); + expect(result.phases).toEqual([ + { phase: "queue-primary", delivered: false, path: "none", error: undefined }, + { phase: "direct-primary", delivered: true, path: "direct", error: undefined }, + ]); + }); + + it("short-circuits direct send when non-completion queue delivers", async () => { + const queue = vi.fn(async () => "queued" as const); + const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const })); + + const result = await runSubagentAnnounceDispatch({ + expectsCompletionMessage: false, + queue, + direct, + }); + + expect(queue).toHaveBeenCalledTimes(1); + expect(direct).not.toHaveBeenCalled(); + expect(result.path).toBe("queued"); + expect(result.phases).toEqual([ + { phase: "queue-primary", delivered: true, path: "queued", error: undefined }, + ]); + }); + + it("uses direct-first ordering for completion mode", async () => { + const queue = vi.fn(async () => "queued" as const); + const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const })); + + const result = await runSubagentAnnounceDispatch({ + expectsCompletionMessage: true, + queue, + direct, + }); + + expect(direct).toHaveBeenCalledTimes(1); + expect(queue).not.toHaveBeenCalled(); + expect(result.path).toBe("direct"); + expect(result.phases).toEqual([ + { phase: "direct-primary", delivered: true, path: "direct", error: undefined }, + ]); + }); + + it("falls back to queue when completion direct send fails", async () => { + const queue = vi.fn(async () => "steered" as const); + const direct = vi.fn(async () => ({ + delivered: false, + path: "direct" as const, + error: "network", + })); + + const result = await runSubagentAnnounceDispatch({ + expectsCompletionMessage: true, + queue, + direct, + }); + + expect(direct).toHaveBeenCalledTimes(1); + expect(queue).toHaveBeenCalledTimes(1); + expect(result.path).toBe("steered"); + expect(result.phases).toEqual([ + { phase: "direct-primary", delivered: false, path: "direct", error: "network" }, + { phase: "queue-fallback", delivered: true, path: "steered", error: undefined }, + ]); + }); + + it("returns direct failure when completion fallback queue cannot deliver", async () => { + const queue = vi.fn(async () => "none" as const); + const direct = vi.fn(async () => ({ + delivered: false, + path: "direct" as const, + error: "failed", + })); + + const result = await runSubagentAnnounceDispatch({ + expectsCompletionMessage: true, + queue, + direct, + }); + + expect(result).toMatchObject({ + delivered: false, + path: "direct", + error: "failed", + }); + expect(result.phases).toEqual([ + { phase: "direct-primary", delivered: false, path: "direct", error: "failed" }, + { phase: "queue-fallback", delivered: false, path: "none", error: undefined }, + ]); + }); + + it("returns none immediately when signal is already aborted", async () => { + const queue = vi.fn(async () => "none" as const); + const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const })); + const controller = new AbortController(); + controller.abort(); + + const result = await runSubagentAnnounceDispatch({ + expectsCompletionMessage: true, + signal: controller.signal, + queue, + direct, + }); + + expect(queue).not.toHaveBeenCalled(); + expect(direct).not.toHaveBeenCalled(); + expect(result).toEqual({ + delivered: false, + path: "none", + phases: [], + }); + }); +}); diff --git a/src/agents/subagent-announce-dispatch.ts b/src/agents/subagent-announce-dispatch.ts new file mode 100644 index 00000000000..93aa0dd9092 --- /dev/null +++ b/src/agents/subagent-announce-dispatch.ts @@ -0,0 +1,104 @@ +export type SubagentDeliveryPath = "queued" | "steered" | "direct" | "none"; + +export type SubagentAnnounceQueueOutcome = "steered" | "queued" | "none"; + +export type SubagentAnnounceDeliveryResult = { + delivered: boolean; + path: SubagentDeliveryPath; + error?: string; + phases?: SubagentAnnounceDispatchPhaseResult[]; +}; + +export type SubagentAnnounceDispatchPhase = "queue-primary" | "direct-primary" | "queue-fallback"; + +export type SubagentAnnounceDispatchPhaseResult = { + phase: SubagentAnnounceDispatchPhase; + delivered: boolean; + path: SubagentDeliveryPath; + error?: string; +}; + +export function mapQueueOutcomeToDeliveryResult( + outcome: SubagentAnnounceQueueOutcome, +): SubagentAnnounceDeliveryResult { + if (outcome === "steered") { + return { + delivered: true, + path: "steered", + }; + } + if (outcome === "queued") { + return { + delivered: true, + path: "queued", + }; + } + return { + delivered: false, + path: "none", + }; +} + +export async function runSubagentAnnounceDispatch(params: { + expectsCompletionMessage: boolean; + signal?: AbortSignal; + queue: () => Promise; + direct: () => Promise; +}): Promise { + const phases: SubagentAnnounceDispatchPhaseResult[] = []; + const appendPhase = ( + phase: SubagentAnnounceDispatchPhase, + result: SubagentAnnounceDeliveryResult, + ) => { + phases.push({ + phase, + delivered: result.delivered, + path: result.path, + error: result.error, + }); + }; + const withPhases = (result: SubagentAnnounceDeliveryResult): SubagentAnnounceDeliveryResult => ({ + ...result, + phases, + }); + + if (params.signal?.aborted) { + return withPhases({ + delivered: false, + path: "none", + }); + } + + if (!params.expectsCompletionMessage) { + const primaryQueue = mapQueueOutcomeToDeliveryResult(await params.queue()); + appendPhase("queue-primary", primaryQueue); + if (primaryQueue.delivered) { + return withPhases(primaryQueue); + } + + const primaryDirect = await params.direct(); + appendPhase("direct-primary", primaryDirect); + return withPhases(primaryDirect); + } + + const primaryDirect = await params.direct(); + appendPhase("direct-primary", primaryDirect); + if (primaryDirect.delivered) { + return withPhases(primaryDirect); + } + + if (params.signal?.aborted) { + return withPhases({ + delivered: false, + path: "none", + }); + } + + const fallbackQueue = mapQueueOutcomeToDeliveryResult(await params.queue()); + appendPhase("queue-fallback", fallbackQueue); + if (fallbackQueue.delivered) { + return withPhases(fallbackQueue); + } + + return withPhases(primaryDirect); +} diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 7d7fd7ceb48..c99a6cb6593 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -32,6 +32,10 @@ import { queueEmbeddedPiMessage, waitForEmbeddedPiRunEnd, } from "./pi-embedded.js"; +import { + runSubagentAnnounceDispatch, + type SubagentAnnounceDeliveryResult, +} from "./subagent-announce-dispatch.js"; import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js"; import { getSubagentDepthFromSessionStore } from "./subagent-depth.js"; import type { SpawnSubagentMode } from "./subagent-spawn.js"; @@ -53,14 +57,6 @@ type ToolResultMessage = { content?: unknown; }; -type SubagentDeliveryPath = "queued" | "steered" | "direct" | "none"; - -type SubagentAnnounceDeliveryResult = { - delivered: boolean; - path: SubagentDeliveryPath; - error?: string; -}; - function resolveSubagentAnnounceTimeoutMs(cfg: ReturnType): number { const configured = cfg.agents?.defaults?.subagents?.announceTimeoutMs; if (typeof configured !== "number" || !Number.isFinite(configured)) { @@ -705,27 +701,6 @@ async function maybeQueueSubagentAnnounce(params: { return "none"; } -function queueOutcomeToDeliveryResult( - outcome: "steered" | "queued" | "none", -): SubagentAnnounceDeliveryResult { - if (outcome === "steered") { - return { - delivered: true, - path: "steered", - }; - } - if (outcome === "queued") { - return { - delivered: true, - path: "queued", - }; - } - return { - delivered: false, - path: "none", - }; -} - async function sendSubagentAnnounceDirectly(params: { targetRequesterSessionKey: string; triggerMessage: string; @@ -905,64 +880,34 @@ async function deliverSubagentAnnouncement(params: { directIdempotencyKey: string; signal?: AbortSignal; }): Promise { - if (params.signal?.aborted) { - return { - delivered: false, - path: "none", - }; - } - // Non-completion mode mirrors historical behavior: try queued/steered delivery first, - // then (only if not queued) attempt direct delivery. - if (!params.expectsCompletionMessage) { - const queueOutcome = await maybeQueueSubagentAnnounce({ - requesterSessionKey: params.requesterSessionKey, - announceId: params.announceId, - triggerMessage: params.triggerMessage, - summaryLine: params.summaryLine, - requesterOrigin: params.requesterOrigin, - signal: params.signal, - }); - const queued = queueOutcomeToDeliveryResult(queueOutcome); - if (queued.delivered) { - return queued; - } - } - - // Completion-mode uses direct send first so manual spawns can return immediately - // in the common ready-to-deliver case. - const direct = await sendSubagentAnnounceDirectly({ - targetRequesterSessionKey: params.targetRequesterSessionKey, - triggerMessage: params.triggerMessage, - completionMessage: params.completionMessage, - directIdempotencyKey: params.directIdempotencyKey, - completionDirectOrigin: params.completionDirectOrigin, - completionRouteMode: params.completionRouteMode, - spawnMode: params.spawnMode, - directOrigin: params.directOrigin, - requesterIsSubagent: params.requesterIsSubagent, + return await runSubagentAnnounceDispatch({ expectsCompletionMessage: params.expectsCompletionMessage, signal: params.signal, - bestEffortDeliver: params.bestEffortDeliver, + queue: async () => + await maybeQueueSubagentAnnounce({ + requesterSessionKey: params.requesterSessionKey, + announceId: params.announceId, + triggerMessage: params.triggerMessage, + summaryLine: params.summaryLine, + requesterOrigin: params.requesterOrigin, + signal: params.signal, + }), + direct: async () => + await sendSubagentAnnounceDirectly({ + targetRequesterSessionKey: params.targetRequesterSessionKey, + triggerMessage: params.triggerMessage, + completionMessage: params.completionMessage, + directIdempotencyKey: params.directIdempotencyKey, + completionDirectOrigin: params.completionDirectOrigin, + completionRouteMode: params.completionRouteMode, + spawnMode: params.spawnMode, + directOrigin: params.directOrigin, + requesterIsSubagent: params.requesterIsSubagent, + expectsCompletionMessage: params.expectsCompletionMessage, + signal: params.signal, + bestEffortDeliver: params.bestEffortDeliver, + }), }); - if (direct.delivered || !params.expectsCompletionMessage) { - return direct; - } - - // If completion path failed direct delivery, try queueing as a fallback so the - // report can still be delivered once the requester session is idle. - const queueOutcome = await maybeQueueSubagentAnnounce({ - requesterSessionKey: params.requesterSessionKey, - announceId: params.announceId, - triggerMessage: params.triggerMessage, - summaryLine: params.summaryLine, - requesterOrigin: params.requesterOrigin, - signal: params.signal, - }); - if (queueOutcome === "steered" || queueOutcome === "queued") { - return queueOutcomeToDeliveryResult(queueOutcome); - } - - return direct; } function loadSessionEntryByKey(sessionKey: string) { diff --git a/src/agents/subagent-registry.announce-loop-guard.test.ts b/src/agents/subagent-registry.announce-loop-guard.test.ts index 8389c53503c..498b38aaedc 100644 --- a/src/agents/subagent-registry.announce-loop-guard.test.ts +++ b/src/agents/subagent-registry.announce-loop-guard.test.ts @@ -155,4 +155,43 @@ describe("announce loop guard (#18264)", () => { const stored = runs.find((run) => run.runId === entry.runId); expect(stored?.cleanupCompletedAt).toBeDefined(); }); + + test("announce rejection resets cleanupHandled so retries can resume", async () => { + announceFn.mockReset(); + announceFn.mockRejectedValueOnce(new Error("announce failed")); + registry.resetSubagentRegistryForTests(); + + const now = Date.now(); + const runId = "test-announce-rejection"; + loadSubagentRegistryFromDisk.mockReturnValue( + new Map([ + [ + runId, + { + runId, + childSessionKey: "agent:main:subagent:child-1", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "agent:main:main", + task: "rejection test", + cleanup: "keep" as const, + createdAt: now - 30_000, + startedAt: now - 20_000, + endedAt: now - 10_000, + cleanupHandled: false, + }, + ], + ]), + ); + + registry.initSubagentRegistry(); + await Promise.resolve(); + await Promise.resolve(); + + const runs = registry.listSubagentRunsForRequester("agent:main:main"); + const stored = runs.find((run) => run.runId === runId); + expect(stored?.cleanupHandled).toBe(false); + expect(stored?.cleanupCompletedAt).toBeUndefined(); + expect(stored?.announceRetryCount).toBe(1); + expect(stored?.lastAnnounceRetryAt).toBeTypeOf("number"); + }); }); diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index edb8f228b07..072fd91693f 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -331,9 +331,16 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor outcome: entry.outcome, spawnMode: entry.spawnMode, expectsCompletionMessage: entry.expectsCompletionMessage, - }).then((didAnnounce) => { - void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce); - }); + }) + .then((didAnnounce) => { + void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce); + }) + .catch((error) => { + defaultRuntime.log( + `[warn] Subagent announce flow failed during cleanup for run ${runId}: ${String(error)}`, + ); + void finalizeSubagentCleanup(runId, entry.cleanup, false); + }); return true; } diff --git a/src/gateway/server-methods/send.ts b/src/gateway/server-methods/send.ts index 9e976a79ae1..f398d94aae4 100644 --- a/src/gateway/server-methods/send.ts +++ b/src/gateway/server-methods/send.ts @@ -1,7 +1,8 @@ import { resolveSessionAgentId } from "../../agents/agent-scope.js"; -import { getChannelPlugin, normalizeChannelId } from "../../channels/plugins/index.js"; +import { normalizeChannelId } from "../../channels/plugins/index.js"; import { createOutboundSendDeps } from "../../cli/deps.js"; import { loadConfig } from "../../config/config.js"; +import { resolveOutboundChannelPlugin } from "../../infra/outbound/channel-resolution.js"; import { resolveMessageChannelSelection } from "../../infra/outbound/channel-selection.js"; import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; import { @@ -166,7 +167,7 @@ export const sendHandlers: GatewayRequestHandlers = { ? request.threadId.trim() : undefined; const outboundChannel = channel; - const plugin = getChannelPlugin(channel); + const plugin = resolveOutboundChannelPlugin({ channel, cfg }); if (!plugin) { respond( false, @@ -393,7 +394,7 @@ export const sendHandlers: GatewayRequestHandlers = { ? request.accountId.trim() : undefined; try { - const plugin = getChannelPlugin(channel); + const plugin = resolveOutboundChannelPlugin({ channel, cfg }); const outbound = plugin?.outbound; if (!outbound?.sendPoll) { respond( diff --git a/src/infra/outbound/channel-resolution.ts b/src/infra/outbound/channel-resolution.ts new file mode 100644 index 00000000000..58596da93f3 --- /dev/null +++ b/src/infra/outbound/channel-resolution.ts @@ -0,0 +1,73 @@ +import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../../agents/agent-scope.js"; +import { getChannelPlugin } from "../../channels/plugins/index.js"; +import type { ChannelPlugin } from "../../channels/plugins/types.js"; +import type { OpenClawConfig } from "../../config/config.js"; +import { applyPluginAutoEnable } from "../../config/plugin-auto-enable.js"; +import { loadOpenClawPlugins } from "../../plugins/loader.js"; +import { getActivePluginRegistry, getActivePluginRegistryKey } from "../../plugins/runtime.js"; +import { + isDeliverableMessageChannel, + normalizeMessageChannel, + type DeliverableMessageChannel, +} from "../../utils/message-channel.js"; + +const bootstrapAttempts = new Set(); + +export function normalizeDeliverableOutboundChannel( + raw?: string | null, +): DeliverableMessageChannel | undefined { + const normalized = normalizeMessageChannel(raw); + if (!normalized || !isDeliverableMessageChannel(normalized)) { + return undefined; + } + return normalized; +} + +function maybeBootstrapChannelPlugin(params: { + channel: DeliverableMessageChannel; + cfg?: OpenClawConfig; +}): void { + const cfg = params.cfg; + if (!cfg) { + return; + } + + const activeRegistry = getActivePluginRegistry(); + if ((activeRegistry?.channels?.length ?? 0) > 0) { + return; + } + + const registryKey = getActivePluginRegistryKey() ?? ""; + const attemptKey = `${registryKey}:${params.channel}`; + if (bootstrapAttempts.has(attemptKey)) { + return; + } + bootstrapAttempts.add(attemptKey); + + const autoEnabled = applyPluginAutoEnable({ config: cfg }).config; + const defaultAgentId = resolveDefaultAgentId(autoEnabled); + const workspaceDir = resolveAgentWorkspaceDir(autoEnabled, defaultAgentId); + loadOpenClawPlugins({ + config: autoEnabled, + workspaceDir, + }); +} + +export function resolveOutboundChannelPlugin(params: { + channel: string; + cfg?: OpenClawConfig; +}): ChannelPlugin | undefined { + const normalized = normalizeDeliverableOutboundChannel(params.channel); + if (!normalized) { + return undefined; + } + + const resolve = () => getChannelPlugin(normalized); + const current = resolve(); + if (current) { + return current; + } + + maybeBootstrapChannelPlugin({ channel: normalized, cfg: params.cfg }); + return resolve(); +} diff --git a/src/infra/outbound/message.test.ts b/src/infra/outbound/message.test.ts index 3714e7ab5ac..d6fab2e39dc 100644 --- a/src/infra/outbound/message.test.ts +++ b/src/infra/outbound/message.test.ts @@ -4,6 +4,7 @@ const mocks = vi.hoisted(() => ({ getChannelPlugin: vi.fn(), resolveOutboundTarget: vi.fn(), deliverOutboundPayloads: vi.fn(), + loadOpenClawPlugins: vi.fn(), })); vi.mock("../../channels/plugins/index.js", () => ({ @@ -11,6 +12,19 @@ vi.mock("../../channels/plugins/index.js", () => ({ getChannelPlugin: mocks.getChannelPlugin, })); +vi.mock("../../agents/agent-scope.js", () => ({ + resolveDefaultAgentId: () => "main", + resolveAgentWorkspaceDir: () => "/tmp/openclaw-test-workspace", +})); + +vi.mock("../../config/plugin-auto-enable.js", () => ({ + applyPluginAutoEnable: ({ config }: { config: unknown }) => ({ config, changes: [] }), +})); + +vi.mock("../../plugins/loader.js", () => ({ + loadOpenClawPlugins: mocks.loadOpenClawPlugins, +})); + vi.mock("./targets.js", () => ({ resolveOutboundTarget: mocks.resolveOutboundTarget, })); @@ -19,13 +33,17 @@ vi.mock("./deliver.js", () => ({ deliverOutboundPayloads: mocks.deliverOutboundPayloads, })); +import { setActivePluginRegistry } from "../../plugins/runtime.js"; +import { createTestRegistry } from "../../test-utils/channel-plugins.js"; import { sendMessage } from "./message.js"; describe("sendMessage", () => { beforeEach(() => { + setActivePluginRegistry(createTestRegistry([])); mocks.getChannelPlugin.mockClear(); mocks.resolveOutboundTarget.mockClear(); mocks.deliverOutboundPayloads.mockClear(); + mocks.loadOpenClawPlugins.mockClear(); mocks.getChannelPlugin.mockReturnValue({ outbound: { deliveryMode: "direct" }, @@ -37,8 +55,8 @@ describe("sendMessage", () => { it("passes explicit agentId to outbound delivery for scoped media roots", async () => { await sendMessage({ cfg: {}, - channel: "mattermost", - to: "channel:town-square", + channel: "telegram", + to: "123456", content: "hi", agentId: "work", }); @@ -46,9 +64,34 @@ describe("sendMessage", () => { expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( expect.objectContaining({ agentId: "work", - channel: "mattermost", - to: "channel:town-square", + channel: "telegram", + to: "123456", }), ); }); + + it("recovers telegram plugin resolution so message/send does not fail with Unknown channel: telegram", async () => { + const telegramPlugin = { + outbound: { deliveryMode: "direct" }, + }; + mocks.getChannelPlugin + .mockReturnValueOnce(undefined) + .mockReturnValueOnce(telegramPlugin) + .mockReturnValue(telegramPlugin); + + await expect( + sendMessage({ + cfg: { channels: { telegram: { botToken: "test-token" } } }, + channel: "telegram", + to: "123456", + content: "hi", + }), + ).resolves.toMatchObject({ + channel: "telegram", + to: "123456", + via: "direct", + }); + + expect(mocks.loadOpenClawPlugins).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/infra/outbound/message.ts b/src/infra/outbound/message.ts index 649aabd0ece..30451b66959 100644 --- a/src/infra/outbound/message.ts +++ b/src/infra/outbound/message.ts @@ -1,4 +1,3 @@ -import { getChannelPlugin, normalizeChannelId } from "../../channels/plugins/index.js"; import type { OpenClawConfig } from "../../config/config.js"; import { loadConfig } from "../../config/config.js"; import { callGatewayLeastPrivilege, randomIdempotencyKey } from "../../gateway/call.js"; @@ -10,6 +9,10 @@ import { type GatewayClientMode, type GatewayClientName, } from "../../utils/message-channel.js"; +import { + normalizeDeliverableOutboundChannel, + resolveOutboundChannelPlugin, +} from "./channel-resolution.js"; import { resolveMessageChannelSelection } from "./channel-selection.js"; import { deliverOutboundPayloads, @@ -107,17 +110,18 @@ async function resolveRequiredChannel(params: { cfg: OpenClawConfig; channel?: string; }): Promise { - const channel = params.channel?.trim() - ? normalizeChannelId(params.channel) - : (await resolveMessageChannelSelection({ cfg: params.cfg })).channel; - if (!channel) { - throw new Error(`Unknown channel: ${params.channel}`); + if (params.channel?.trim()) { + const normalized = normalizeDeliverableOutboundChannel(params.channel); + if (!normalized) { + throw new Error(`Unknown channel: ${params.channel}`); + } + return normalized; } - return channel; + return (await resolveMessageChannelSelection({ cfg: params.cfg })).channel; } -function resolveRequiredPlugin(channel: string) { - const plugin = getChannelPlugin(channel); +function resolveRequiredPlugin(channel: string, cfg: OpenClawConfig) { + const plugin = resolveOutboundChannelPlugin({ channel, cfg }); if (!plugin) { throw new Error(`Unknown channel: ${channel}`); } @@ -166,7 +170,7 @@ async function callMessageGateway(params: { export async function sendMessage(params: MessageSendParams): Promise { const cfg = params.cfg ?? loadConfig(); const channel = await resolveRequiredChannel({ cfg, channel: params.channel }); - const plugin = resolveRequiredPlugin(channel); + const plugin = resolveRequiredPlugin(channel, cfg); const deliveryMode = plugin.outbound?.deliveryMode ?? "direct"; const normalizedPayloads = normalizeReplyPayloadsForDelivery([ { @@ -279,7 +283,7 @@ export async function sendPoll(params: MessagePollParams): Promise ({ + getChannelPlugin: vi.fn(), + loadOpenClawPlugins: vi.fn(), +})); + +vi.mock("../../channels/plugins/index.js", () => ({ + getChannelPlugin: mocks.getChannelPlugin, + normalizeChannelId: (channel?: string) => channel?.trim().toLowerCase() ?? undefined, +})); + +vi.mock("../../agents/agent-scope.js", () => ({ + resolveDefaultAgentId: () => "main", + resolveAgentWorkspaceDir: () => "/tmp/openclaw-test-workspace", +})); + +vi.mock("../../config/plugin-auto-enable.js", () => ({ + applyPluginAutoEnable: ({ config }: { config: unknown }) => ({ config, changes: [] }), +})); + +vi.mock("../../plugins/loader.js", () => ({ + loadOpenClawPlugins: mocks.loadOpenClawPlugins, +})); + +import { setActivePluginRegistry } from "../../plugins/runtime.js"; +import { createTestRegistry } from "../../test-utils/channel-plugins.js"; +import { resolveOutboundTarget } from "./targets.js"; + +describe("resolveOutboundTarget channel resolution", () => { + beforeEach(() => { + setActivePluginRegistry(createTestRegistry([])); + mocks.getChannelPlugin.mockReset(); + mocks.loadOpenClawPlugins.mockReset(); + }); + + it("recovers telegram plugin resolution so announce delivery does not fail with Unsupported channel: telegram", () => { + const telegramPlugin = { + id: "telegram", + meta: { label: "Telegram" }, + config: { + listAccountIds: () => [], + resolveAccount: () => ({}), + }, + }; + mocks.getChannelPlugin + .mockReturnValueOnce(undefined) + .mockReturnValueOnce(telegramPlugin) + .mockReturnValue(telegramPlugin); + + const result = resolveOutboundTarget({ + channel: "telegram", + to: "123456", + cfg: { channels: { telegram: { botToken: "test-token" } } }, + mode: "explicit", + }); + + expect(result).toEqual({ ok: true, to: "123456" }); + expect(mocks.loadOpenClawPlugins).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/infra/outbound/targets.ts b/src/infra/outbound/targets.ts index 41baa558653..d9411e2223c 100644 --- a/src/infra/outbound/targets.ts +++ b/src/infra/outbound/targets.ts @@ -1,5 +1,4 @@ import { normalizeChatType, type ChatType } from "../../channels/chat-type.js"; -import { getChannelPlugin, normalizeChannelId } from "../../channels/plugins/index.js"; import type { ChannelOutboundTargetMode } from "../../channels/plugins/types.js"; import { formatCliCommand } from "../../cli/command-format.js"; import type { OpenClawConfig } from "../../config/config.js"; @@ -20,6 +19,10 @@ import { normalizeMessageChannel, } from "../../utils/message-channel.js"; import { isWhatsAppGroupJid, normalizeWhatsAppTarget } from "../../whatsapp/normalize.js"; +import { + normalizeDeliverableOutboundChannel, + resolveOutboundChannelPlugin, +} from "./channel-resolution.js"; import { missingTargetError } from "./target-errors.js"; export type OutboundChannel = DeliverableMessageChannel | "none"; @@ -181,7 +184,10 @@ export function resolveOutboundTarget(params: { }; } - const plugin = getChannelPlugin(params.channel); + const plugin = resolveOutboundChannelPlugin({ + channel: params.channel, + cfg: params.cfg, + }); if (!plugin) { return { ok: false, @@ -242,7 +248,7 @@ export function resolveHeartbeatDeliveryTarget(params: { if (rawTarget === "none" || rawTarget === "last") { target = rawTarget; } else if (typeof rawTarget === "string") { - const normalized = normalizeChannelId(rawTarget); + const normalized = normalizeDeliverableOutboundChannel(rawTarget); if (normalized) { target = normalized; } @@ -269,7 +275,10 @@ export function resolveHeartbeatDeliveryTarget(params: { let effectiveAccountId = heartbeatAccountId || resolvedTarget.accountId; if (heartbeatAccountId && resolvedTarget.channel) { - const plugin = getChannelPlugin(resolvedTarget.channel); + const plugin = resolveOutboundChannelPlugin({ + channel: resolvedTarget.channel, + cfg, + }); const listAccountIds = plugin?.config.listAccountIds; const accountIds = listAccountIds ? listAccountIds(cfg) : []; if (accountIds.length > 0) { @@ -331,7 +340,10 @@ export function resolveHeartbeatDeliveryTarget(params: { } let reason: string | undefined; - const plugin = getChannelPlugin(resolvedTarget.channel); + const plugin = resolveOutboundChannelPlugin({ + channel: resolvedTarget.channel, + cfg, + }); if (plugin?.config.resolveAllowFrom) { const explicit = resolveOutboundTarget({ channel: resolvedTarget.channel, @@ -516,7 +528,10 @@ export function resolveHeartbeatSenderContext(params: { params.delivery.accountId ?? (provider === params.delivery.lastChannel ? params.delivery.lastAccountId : undefined); const allowFromRaw = provider - ? (getChannelPlugin(provider)?.config.resolveAllowFrom?.({ + ? (resolveOutboundChannelPlugin({ + channel: provider, + cfg: params.cfg, + })?.config.resolveAllowFrom?.({ cfg: params.cfg, accountId, }) ?? []) diff --git a/src/telegram/send.test.ts b/src/telegram/send.test.ts index 37d881d843c..afd616b5f15 100644 --- a/src/telegram/send.test.ts +++ b/src/telegram/send.test.ts @@ -196,6 +196,10 @@ describe("sendMessageTelegram", () => { for (const testCase of cases) { botCtorSpy.mockClear(); loadConfig.mockReturnValue(testCase.cfg); + botApi.sendMessage.mockResolvedValue({ + message_id: 1, + chat: { id: "123" }, + }); await sendMessageTelegram("123", "hi", testCase.opts); expect(botCtorSpy, testCase.name).toHaveBeenCalledWith( "tok", @@ -325,6 +329,40 @@ describe("sendMessageTelegram", () => { } }); + it("fails when Telegram text send returns no message_id", async () => { + const sendMessage = vi.fn().mockResolvedValue({ + chat: { id: "123" }, + }); + const api = { sendMessage } as unknown as { + sendMessage: typeof sendMessage; + }; + + await expect( + sendMessageTelegram("123", "hi", { + token: "tok", + api, + }), + ).rejects.toThrow(/returned no message_id/i); + }); + + it("fails when Telegram media send returns no message_id", async () => { + mockLoadedMedia({ contentType: "image/png", fileName: "photo.png" }); + const sendPhoto = vi.fn().mockResolvedValue({ + chat: { id: "123" }, + }); + const api = { sendPhoto } as unknown as { + sendPhoto: typeof sendPhoto; + }; + + await expect( + sendMessageTelegram("123", "caption", { + token: "tok", + api, + mediaUrl: "https://example.com/photo.png", + }), + ).rejects.toThrow(/returned no message_id/i); + }); + it("uses native fetch for BAN compatibility when api is omitted", async () => { const originalFetch = globalThis.fetch; const originalBun = (globalThis as { Bun?: unknown }).Bun; diff --git a/src/telegram/send.ts b/src/telegram/send.ts index 85327df22b5..ceaa9113e32 100644 --- a/src/telegram/send.ts +++ b/src/telegram/send.ts @@ -86,6 +86,16 @@ type TelegramReactionOpts = { retry?: RetryConfig; }; +function resolveTelegramMessageIdOrThrow( + result: TelegramMessageLike | null | undefined, + context: string, +): number { + if (typeof result?.message_id === "number" && Number.isFinite(result.message_id)) { + return Math.trunc(result.message_id); + } + throw new Error(`Telegram ${context} returned no message_id`); +} + const PARSE_ERR_RE = /can't parse entities|parse entities|find end of the entity/i; const THREAD_NOT_FOUND_RE = /400:\s*Bad Request:\s*message thread not found/i; const MESSAGE_NOT_MODIFIED_RE = @@ -685,11 +695,9 @@ export async function sendMessageTelegram( })(); const result = await sendMedia(mediaSender.label, mediaSender.sender); - const mediaMessageId = String(result?.message_id ?? "unknown"); + const mediaMessageId = resolveTelegramMessageIdOrThrow(result, "media send"); const resolvedChatId = String(result?.chat?.id ?? chatId); - if (result?.message_id) { - recordSentMessage(chatId, result.message_id); - } + recordSentMessage(chatId, mediaMessageId); recordChannelActivity({ channel: "telegram", accountId: account.accountId, @@ -708,13 +716,15 @@ export async function sendMessageTelegram( : undefined; const textRes = await sendTelegramText(followUpText, textParams); // Return the text message ID as the "main" message (it's the actual content). + const textMessageId = resolveTelegramMessageIdOrThrow(textRes, "text follow-up send"); + recordSentMessage(chatId, textMessageId); return { - messageId: String(textRes?.message_id ?? mediaMessageId), + messageId: String(textMessageId), chatId: resolvedChatId, }; } - return { messageId: mediaMessageId, chatId: resolvedChatId }; + return { messageId: String(mediaMessageId), chatId: resolvedChatId }; } if (!text || !text.trim()) { @@ -728,16 +738,14 @@ export async function sendMessageTelegram( } : undefined; const res = await sendTelegramText(text, textParams, opts.plainText); - const messageId = String(res?.message_id ?? "unknown"); - if (res?.message_id) { - recordSentMessage(chatId, res.message_id); - } + const messageId = resolveTelegramMessageIdOrThrow(res, "text send"); + recordSentMessage(chatId, messageId); recordChannelActivity({ channel: "telegram", accountId: account.accountId, direction: "outbound", }); - return { messageId, chatId: String(res?.chat?.id ?? chatId) }; + return { messageId: String(messageId), chatId: String(res?.chat?.id ?? chatId) }; } export async function reactMessageTelegram( @@ -1013,18 +1021,16 @@ export async function sendStickerTelegram( requestWithChatNotFound(() => api.sendSticker(chatId, fileId.trim(), effectiveParams), label), ); - const messageId = String(result?.message_id ?? "unknown"); + const messageId = resolveTelegramMessageIdOrThrow(result, "sticker send"); const resolvedChatId = String(result?.chat?.id ?? chatId); - if (result?.message_id) { - recordSentMessage(chatId, result.message_id); - } + recordSentMessage(chatId, messageId); recordChannelActivity({ channel: "telegram", accountId: account.accountId, direction: "outbound", }); - return { messageId, chatId: resolvedChatId }; + return { messageId: String(messageId), chatId: resolvedChatId }; } type TelegramPollOpts = { @@ -1121,12 +1127,10 @@ export async function sendPollTelegram( ), ); - const messageId = String(result?.message_id ?? "unknown"); + const messageId = resolveTelegramMessageIdOrThrow(result, "poll send"); const resolvedChatId = String(result?.chat?.id ?? chatId); const pollId = result?.poll?.id; - if (result?.message_id) { - recordSentMessage(chatId, result.message_id); - } + recordSentMessage(chatId, messageId); recordChannelActivity({ channel: "telegram", @@ -1134,7 +1138,7 @@ export async function sendPollTelegram( direction: "outbound", }); - return { messageId, chatId: resolvedChatId, pollId }; + return { messageId: String(messageId), chatId: resolvedChatId, pollId }; } // ---------------------------------------------------------------------------