diff --git a/src/infra/outbound/message-action-runner.plugin-dispatch.test.ts b/src/infra/outbound/message-action-runner.plugin-dispatch.test.ts index 401002d97ae..265d33050f3 100644 --- a/src/infra/outbound/message-action-runner.plugin-dispatch.test.ts +++ b/src/infra/outbound/message-action-runner.plugin-dispatch.test.ts @@ -2,7 +2,11 @@ import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { jsonResult } from "../../agents/tools/common.js"; import { dispatchChannelMessageAction } from "../../channels/plugins/message-action-dispatch.js"; -import type { ChannelMessageActionContext, ChannelPlugin } from "../../channels/plugins/types.js"; +import type { + ChannelMessageActionContext, + ChannelMessageActionName, + ChannelPlugin, +} from "../../channels/plugins/types.js"; import type { OpenClawConfig } from "../../config/config.js"; import { getActivePluginRegistry, setActivePluginRegistry } from "../../plugins/runtime.js"; import { createTestRegistry } from "../../test-utils/channel-plugins.js"; @@ -98,6 +102,39 @@ function createPollForwardingPlugin(params: { }; } +function createGatewayActionPlugin(params: { + pluginId: string; + label: string; + blurb: string; + actions: ChannelMessageActionName[]; + gatewayActions?: ChannelMessageActionName[]; + capabilities?: ChannelPlugin["capabilities"]; + messaging?: ChannelPlugin["messaging"]; + handleAction: ChannelActionHandler; +}): ChannelPlugin { + const actions = new Set(params.actions); + const gatewayActions = new Set(params.gatewayActions ?? params.actions); + return { + id: params.pluginId, + meta: { + id: params.pluginId, + label: params.label, + selectionLabel: params.label, + docsPath: `/channels/${params.pluginId}`, + blurb: params.blurb, + }, + capabilities: params.capabilities ?? { chatTypes: ["direct"] }, + config: createAlwaysConfiguredPluginConfig(), + messaging: params.messaging, + actions: { + describeMessageTool: () => ({ actions: params.actions }), + supportsAction: ({ action }) => actions.has(action), + resolveExecutionMode: ({ action }) => (gatewayActions.has(action) ? "gateway" : "local"), + handleAction: params.handleAction, + }, + }; +} + async function executePluginAction(params: { action: "send" | "poll"; ctx: Pick< @@ -318,24 +355,14 @@ describe("runMessageAction plugin dispatch", () => { local: true, }), ); - const gatewayPlugin: ChannelPlugin = { - id: "gatewaychat", - meta: { - id: "gatewaychat", - label: "Gateway Chat", - selectionLabel: "Gateway Chat", - docsPath: "/channels/gatewaychat", - blurb: "Gateway Chat reaction test plugin.", - }, + const gatewayPlugin = createGatewayActionPlugin({ + pluginId: "gatewaychat", + label: "Gateway Chat", + blurb: "Gateway Chat reaction test plugin.", + actions: ["react"], capabilities: { chatTypes: ["direct"], reactions: true }, - config: createAlwaysConfiguredPluginConfig(), - actions: { - describeMessageTool: () => ({ actions: ["react"] }), - supportsAction: ({ action }) => action === "react", - resolveExecutionMode: ({ action }) => (action === "react" ? "gateway" : "local"), - handleAction, - }, - }; + handleAction, + }); setActivePluginRegistry( createTestRegistry([ { @@ -414,29 +441,18 @@ describe("runMessageAction plugin dispatch", () => { it("routes gateway-executed plugin sends through gateway RPC instead of local dispatch", async () => { const handleAction = vi.fn(async () => jsonResult({ ok: true, local: true })); - const gatewayPlugin: ChannelPlugin = { - id: "gatewaychat", - meta: { - id: "gatewaychat", - label: "Gateway Chat", - selectionLabel: "Gateway Chat", - docsPath: "/channels/gatewaychat", - blurb: "Gateway Chat send test plugin.", - }, - capabilities: { chatTypes: ["direct"] }, - config: createAlwaysConfiguredPluginConfig(), + const gatewayPlugin = createGatewayActionPlugin({ + pluginId: "gatewaychat", + label: "Gateway Chat", + blurb: "Gateway Chat send test plugin.", + actions: ["send"], messaging: { targetResolver: { looksLikeId: () => true, }, }, - actions: { - describeMessageTool: () => ({ actions: ["send"] }), - supportsAction: ({ action }) => action === "send", - resolveExecutionMode: ({ action }) => (action === "send" ? "gateway" : "local"), - handleAction, - }, - }; + handleAction, + }); setActivePluginRegistry( createTestRegistry([ { @@ -1003,19 +1019,18 @@ describe("runMessageAction plugin dispatch", () => { it("routes gateway-executed plugin polls through gateway RPC instead of local dispatch", async () => { const handleAction = vi.fn(async () => jsonResult({ ok: true, local: true })); - const pollGatewayPlugin = createPollForwardingPlugin({ + const pollGatewayPlugin = createGatewayActionPlugin({ pluginId: "pollchat", label: "Poll Chat", blurb: "Poll chat gateway forwarding test plugin.", + actions: ["poll"], + messaging: { + targetResolver: { + looksLikeId: () => true, + }, + }, handleAction, }); - const baseActions = pollGatewayPlugin.actions!; - pollGatewayPlugin.actions = { - describeMessageTool: baseActions.describeMessageTool, - supportsAction: baseActions.supportsAction, - handleAction: baseActions.handleAction, - resolveExecutionMode: ({ action }) => (action === "poll" ? "gateway" : "local"), - }; setActivePluginRegistry( createTestRegistry([ { diff --git a/src/infra/outbound/message-action-runner.ts b/src/infra/outbound/message-action-runner.ts index 712184fe720..440cd55c037 100644 --- a/src/infra/outbound/message-action-runner.ts +++ b/src/infra/outbound/message-action-runner.ts @@ -366,7 +366,7 @@ type ResolvedActionContext = { abortSignal?: AbortSignal; }; -async function maybeCallGatewayPluginMessageAction(params: { +async function runGatewayPluginMessageActionOrNull(params: { cfg: OpenClawConfig; params: Record; channel: ChannelId; @@ -376,7 +376,8 @@ async function maybeCallGatewayPluginMessageAction(params: { gateway?: MessageActionRunnerGateway; input: RunMessageActionParams; agentId?: string; -}): Promise<{ payload: unknown } | null> { + result: (payload: unknown) => MessageActionRunResult; +}): Promise { if (params.dryRun || !params.gateway) { return null; } @@ -388,26 +389,25 @@ async function maybeCallGatewayPluginMessageAction(params: { if (executionMode !== "gateway") { return null; } - return { - payload: await callGatewayMessageAction({ - gateway: params.gateway, - actionParams: { - channel: params.channel, - action: params.action, - params: params.params, - accountId: params.accountId ?? undefined, - requesterSenderId: params.input.requesterSenderId ?? undefined, - senderIsOwner: params.input.senderIsOwner, - sessionKey: params.input.sessionKey, - sessionId: params.input.sessionId, - agentId: params.agentId, - toolContext: params.input.toolContext, - idempotencyKey: await resolveGatewayActionIdempotencyKey( - normalizeOptionalString(params.params.idempotencyKey), - ), - }, - }), - }; + const payload = await callGatewayMessageAction({ + gateway: params.gateway, + actionParams: { + channel: params.channel, + action: params.action, + params: params.params, + accountId: params.accountId ?? undefined, + requesterSenderId: params.input.requesterSenderId ?? undefined, + senderIsOwner: params.input.senderIsOwner, + sessionKey: params.input.sessionKey, + sessionId: params.input.sessionId, + agentId: params.agentId, + toolContext: params.input.toolContext, + idempotencyKey: await resolveGatewayActionIdempotencyKey( + normalizeOptionalString(params.params.idempotencyKey), + ), + }, + }); + return params.result(payload); } function resolveGateway(input: RunMessageActionParams): MessageActionRunnerGateway | undefined { @@ -636,7 +636,7 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise 0 ? mergedMediaUrls : mediaUrl ? [mediaUrl] : undefined; throwIfAborted(abortSignal); - const gatewayPluginAction = await maybeCallGatewayPluginMessageAction({ + const gatewayPluginAction = await runGatewayPluginMessageActionOrNull({ cfg, params, channel, @@ -646,17 +646,18 @@ async function handleSendAction(ctx: ResolvedActionContext): Promise ({ kind: "send", channel, action, to, handledBy: "plugin", - payload: gatewayPluginAction.payload, + payload, dryRun, - }; + }), + }); + if (gatewayPluginAction) { + return gatewayPluginAction; } const send = await executeSendAction({ @@ -743,7 +744,7 @@ async function handlePollAction(ctx: ResolvedActionContext): Promise ({ kind: "poll", channel, action, to, handledBy: "plugin", - payload: gatewayPluginAction.payload, + payload, dryRun, - }; + }), + }); + if (gatewayPluginAction) { + return gatewayPluginAction; } const poll = await executePollAction({ @@ -850,7 +852,7 @@ async function handlePluginAction(ctx: ResolvedActionContext): Promise ({ kind: "action", channel, action, handledBy: "plugin", - payload: gatewayPluginAction.payload, + payload, dryRun, - }; + }), + }); + if (gatewayPluginAction) { + // Gateway-owned actions must execute where the live channel runtime exists. + return gatewayPluginAction; } const handled = await dispatchChannelMessageAction({