From ed21b63bb8490274bcaa7e90e062f2a82d445be8 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 2 Mar 2026 15:14:46 +0000 Subject: [PATCH] refactor(plugin-sdk): share auth, routing, and stream/account helpers --- extensions/googlechat/src/monitor.ts | 13 +- .../matrix/src/matrix/monitor/allowlist.ts | 16 +-- extensions/nextcloud-talk/src/accounts.ts | 46 +++---- extensions/zalo/src/monitor.ts | 124 ++++++++---------- extensions/zalouser/src/monitor.ts | 114 ++++++++-------- .../bash-tools.exec-approval-request.ts | 10 ++ src/agents/bash-tools.exec-host-gateway.ts | 13 +- src/agents/bash-tools.exec-host-node.ts | 13 +- src/agents/ollama-stream.ts | 31 ++--- src/agents/openai-ws-stream.ts | 28 ++-- src/agents/stream-message-shared.ts | 41 +++++- src/channels/allowlist-match.ts | 19 +++ src/plugin-sdk/account-resolution.ts | 41 ++++++ src/plugin-sdk/command-auth.ts | 42 ++++++ src/plugin-sdk/inbound-envelope.ts | 109 +++++++++++++++ src/plugin-sdk/index.ts | 18 ++- src/slack/monitor/allow-list.ts | 19 +-- src/telegram/accounts.ts | 46 +++---- 18 files changed, 457 insertions(+), 286 deletions(-) create mode 100644 src/plugin-sdk/account-resolution.ts diff --git a/extensions/googlechat/src/monitor.ts b/extensions/googlechat/src/monitor.ts index ce81c4a9d64..97841a0b4c3 100644 --- a/extensions/googlechat/src/monitor.ts +++ b/extensions/googlechat/src/monitor.ts @@ -2,7 +2,6 @@ import type { IncomingMessage, ServerResponse } from "node:http"; import type { OpenClawConfig } from "openclaw/plugin-sdk"; import { GROUP_POLICY_BLOCKED_LABEL, - createInboundEnvelopeBuilder, createScopedPairingAccess, createReplyPrefixOptions, readJsonBodyWithLimit, @@ -11,6 +10,7 @@ import { isDangerousNameMatchingEnabled, resolveAllowlistProviderRuntimeGroupPolicy, resolveDefaultGroupPolicy, + resolveInboundRouteEnvelopeBuilderWithRuntime, resolveSingleWebhookTargetAsync, resolveWebhookPath, resolveWebhookTargets, @@ -638,7 +638,7 @@ async function processMessageWithPipeline(params: { return; } - const route = core.channel.routing.resolveAgentRoute({ + const { route, buildEnvelope } = resolveInboundRouteEnvelopeBuilderWithRuntime({ cfg: config, channel: "googlechat", accountId: account.accountId, @@ -646,15 +646,8 @@ async function processMessageWithPipeline(params: { kind: isGroup ? "group" : "direct", id: spaceId, }, - }); - const buildEnvelope = createInboundEnvelopeBuilder({ - cfg: config, - route, + runtime: core.channel, sessionStore: config.session?.store, - resolveStorePath: core.channel.session.resolveStorePath, - readSessionUpdatedAt: core.channel.session.readSessionUpdatedAt, - resolveEnvelopeFormatOptions: core.channel.reply.resolveEnvelopeFormatOptions, - formatAgentEnvelope: core.channel.reply.formatAgentEnvelope, }); let mediaPath: string | undefined; diff --git a/extensions/matrix/src/matrix/monitor/allowlist.ts b/extensions/matrix/src/matrix/monitor/allowlist.ts index 754f3ee24f7..17da78b3611 100644 --- a/extensions/matrix/src/matrix/monitor/allowlist.ts +++ b/extensions/matrix/src/matrix/monitor/allowlist.ts @@ -1,4 +1,4 @@ -import type { AllowlistMatch } from "openclaw/plugin-sdk"; +import { resolveAllowlistMatchByCandidates, type AllowlistMatch } from "openclaw/plugin-sdk"; function normalizeAllowList(list?: Array) { return (list ?? []).map((entry) => String(entry).trim()).filter(Boolean); @@ -83,19 +83,7 @@ export function resolveMatrixAllowListMatch(params: { { value: userId ? `matrix:${userId}` : "", source: "prefixed-id" }, { value: userId ? `user:${userId}` : "", source: "prefixed-user" }, ]; - for (const candidate of candidates) { - if (!candidate.value) { - continue; - } - if (allowList.includes(candidate.value)) { - return { - allowed: true, - matchKey: candidate.value, - matchSource: candidate.source, - }; - } - } - return { allowed: false }; + return resolveAllowlistMatchByCandidates({ allowList, candidates }); } export function resolveMatrixAllowListMatches(params: { allowList: string[]; userId?: string }) { diff --git a/extensions/nextcloud-talk/src/accounts.ts b/extensions/nextcloud-talk/src/accounts.ts index 4a059be4981..aef80af8953 100644 --- a/extensions/nextcloud-talk/src/accounts.ts +++ b/extensions/nextcloud-talk/src/accounts.ts @@ -1,4 +1,8 @@ import { readFileSync } from "node:fs"; +import { + listConfiguredAccountIds as listConfiguredAccountIdsFromSection, + resolveAccountWithDefaultFallback, +} from "openclaw/plugin-sdk"; import { DEFAULT_ACCOUNT_ID, normalizeAccountId, @@ -28,18 +32,10 @@ export type ResolvedNextcloudTalkAccount = { }; function listConfiguredAccountIds(cfg: CoreConfig): string[] { - const accounts = cfg.channels?.["nextcloud-talk"]?.accounts; - if (!accounts || typeof accounts !== "object") { - return []; - } - const ids = new Set(); - for (const key of Object.keys(accounts)) { - if (!key) { - continue; - } - ids.add(normalizeAccountId(key)); - } - return [...ids]; + return listConfiguredAccountIdsFromSection({ + accounts: cfg.channels?.["nextcloud-talk"]?.accounts as Record | undefined, + normalizeAccountId, + }); } export function listNextcloudTalkAccountIds(cfg: CoreConfig): string[] { @@ -134,7 +130,6 @@ export function resolveNextcloudTalkAccount(params: { cfg: CoreConfig; accountId?: string | null; }): ResolvedNextcloudTalkAccount { - const hasExplicitAccountId = Boolean(params.accountId?.trim()); const baseEnabled = params.cfg.channels?.["nextcloud-talk"]?.enabled !== false; const resolve = (accountId: string) => { @@ -162,24 +157,13 @@ export function resolveNextcloudTalkAccount(params: { } satisfies ResolvedNextcloudTalkAccount; }; - const normalized = normalizeAccountId(params.accountId); - const primary = resolve(normalized); - if (hasExplicitAccountId) { - return primary; - } - if (primary.secretSource !== "none") { - return primary; - } - - const fallbackId = resolveDefaultNextcloudTalkAccountId(params.cfg); - if (fallbackId === primary.accountId) { - return primary; - } - const fallback = resolve(fallbackId); - if (fallback.secretSource === "none") { - return primary; - } - return fallback; + return resolveAccountWithDefaultFallback({ + accountId: params.accountId, + normalizeAccountId, + resolvePrimary: resolve, + hasCredential: (account) => account.secretSource !== "none", + resolveDefaultAccountId: () => resolveDefaultNextcloudTalkAccountId(params.cfg), + }); } export function listEnabledNextcloudTalkAccounts(cfg: CoreConfig): ResolvedNextcloudTalkAccount[] { diff --git a/extensions/zalo/src/monitor.ts b/extensions/zalo/src/monitor.ts index e2a2edd1be0..be7e7082a60 100644 --- a/extensions/zalo/src/monitor.ts +++ b/extensions/zalo/src/monitor.ts @@ -1,12 +1,13 @@ import type { IncomingMessage, ServerResponse } from "node:http"; import type { MarkdownTableMode, OpenClawConfig, OutboundReplyPayload } from "openclaw/plugin-sdk"; import { - createInboundEnvelopeBuilder, createScopedPairingAccess, createReplyPrefixOptions, - resolveSenderCommandAuthorization, + resolveDirectDmAuthorizationOutcome, + resolveSenderCommandAuthorizationWithRuntime, resolveOutboundMediaUrls, resolveDefaultGroupPolicy, + resolveInboundRouteEnvelopeBuilderWithRuntime, sendMediaWithLeadingCaption, resolveWebhookPath, warnMissingProviderGroupPolicyFallbackOnce, @@ -367,75 +368,67 @@ async function processMessageWithPipeline(params: { } const rawBody = text?.trim() || (mediaPath ? "" : ""); - const { senderAllowedForCommands, commandAuthorized } = await resolveSenderCommandAuthorization({ - cfg: config, - rawBody, + const { senderAllowedForCommands, commandAuthorized } = + await resolveSenderCommandAuthorizationWithRuntime({ + cfg: config, + rawBody, + isGroup, + dmPolicy, + configuredAllowFrom: configAllowFrom, + configuredGroupAllowFrom: groupAllowFrom, + senderId, + isSenderAllowed: isZaloSenderAllowed, + readAllowFromStore: pairing.readAllowFromStore, + runtime: core.channel.commands, + }); + + const directDmOutcome = resolveDirectDmAuthorizationOutcome({ isGroup, dmPolicy, - configuredAllowFrom: configAllowFrom, - configuredGroupAllowFrom: groupAllowFrom, - senderId, - isSenderAllowed: isZaloSenderAllowed, - readAllowFromStore: pairing.readAllowFromStore, - shouldComputeCommandAuthorized: (body, cfg) => - core.channel.commands.shouldComputeCommandAuthorized(body, cfg), - resolveCommandAuthorizedFromAuthorizers: (params) => - core.channel.commands.resolveCommandAuthorizedFromAuthorizers(params), + senderAllowedForCommands, }); + if (directDmOutcome === "disabled") { + logVerbose(core, runtime, `Blocked zalo DM from ${senderId} (dmPolicy=disabled)`); + return; + } + if (directDmOutcome === "unauthorized") { + if (dmPolicy === "pairing") { + const { code, created } = await pairing.upsertPairingRequest({ + id: senderId, + meta: { name: senderName ?? undefined }, + }); - if (!isGroup) { - if (dmPolicy === "disabled") { - logVerbose(core, runtime, `Blocked zalo DM from ${senderId} (dmPolicy=disabled)`); - return; - } - - if (dmPolicy !== "open") { - const allowed = senderAllowedForCommands; - - if (!allowed) { - if (dmPolicy === "pairing") { - const { code, created } = await pairing.upsertPairingRequest({ - id: senderId, - meta: { name: senderName ?? undefined }, - }); - - if (created) { - logVerbose(core, runtime, `zalo pairing request sender=${senderId}`); - try { - await sendMessage( - token, - { - chat_id: chatId, - text: core.channel.pairing.buildPairingReply({ - channel: "zalo", - idLine: `Your Zalo user id: ${senderId}`, - code, - }), - }, - fetcher, - ); - statusSink?.({ lastOutboundAt: Date.now() }); - } catch (err) { - logVerbose( - core, - runtime, - `zalo pairing reply failed for ${senderId}: ${String(err)}`, - ); - } - } - } else { - logVerbose( - core, - runtime, - `Blocked unauthorized zalo sender ${senderId} (dmPolicy=${dmPolicy})`, + if (created) { + logVerbose(core, runtime, `zalo pairing request sender=${senderId}`); + try { + await sendMessage( + token, + { + chat_id: chatId, + text: core.channel.pairing.buildPairingReply({ + channel: "zalo", + idLine: `Your Zalo user id: ${senderId}`, + code, + }), + }, + fetcher, ); + statusSink?.({ lastOutboundAt: Date.now() }); + } catch (err) { + logVerbose(core, runtime, `zalo pairing reply failed for ${senderId}: ${String(err)}`); } - return; } + } else { + logVerbose( + core, + runtime, + `Blocked unauthorized zalo sender ${senderId} (dmPolicy=${dmPolicy})`, + ); } + return; } - const route = core.channel.routing.resolveAgentRoute({ + const { route, buildEnvelope } = resolveInboundRouteEnvelopeBuilderWithRuntime({ cfg: config, channel: "zalo", accountId: account.accountId, @@ -443,15 +436,8 @@ async function processMessageWithPipeline(params: { kind: isGroup ? "group" : "direct", id: chatId, }, - }); - const buildEnvelope = createInboundEnvelopeBuilder({ - cfg: config, - route, + runtime: core.channel, sessionStore: config.session?.store, - resolveStorePath: core.channel.session.resolveStorePath, - readSessionUpdatedAt: core.channel.session.readSessionUpdatedAt, - resolveEnvelopeFormatOptions: core.channel.reply.resolveEnvelopeFormatOptions, - formatAgentEnvelope: core.channel.reply.formatAgentEnvelope, }); if ( diff --git a/extensions/zalouser/src/monitor.ts b/extensions/zalouser/src/monitor.ts index 72c8753fe71..9d95943242c 100644 --- a/extensions/zalouser/src/monitor.ts +++ b/extensions/zalouser/src/monitor.ts @@ -6,14 +6,15 @@ import type { RuntimeEnv, } from "openclaw/plugin-sdk"; import { - createInboundEnvelopeBuilder, createScopedPairingAccess, createReplyPrefixOptions, resolveOutboundMediaUrls, mergeAllowlist, + resolveDirectDmAuthorizationOutcome, resolveOpenProviderRuntimeGroupPolicy, resolveDefaultGroupPolicy, - resolveSenderCommandAuthorization, + resolveInboundRouteEnvelopeBuilderWithRuntime, + resolveSenderCommandAuthorizationWithRuntime, sendMediaWithLeadingCaption, summarizeMapping, warnMissingProviderGroupPolicyFallbackOnce, @@ -224,68 +225,64 @@ async function processMessage( const dmPolicy = account.config.dmPolicy ?? "pairing"; const configAllowFrom = (account.config.allowFrom ?? []).map((v) => String(v)); const rawBody = content.trim(); - const { senderAllowedForCommands, commandAuthorized } = await resolveSenderCommandAuthorization({ - cfg: config, - rawBody, + const { senderAllowedForCommands, commandAuthorized } = + await resolveSenderCommandAuthorizationWithRuntime({ + cfg: config, + rawBody, + isGroup, + dmPolicy, + configuredAllowFrom: configAllowFrom, + senderId, + isSenderAllowed, + readAllowFromStore: pairing.readAllowFromStore, + runtime: core.channel.commands, + }); + + const directDmOutcome = resolveDirectDmAuthorizationOutcome({ isGroup, dmPolicy, - configuredAllowFrom: configAllowFrom, - senderId, - isSenderAllowed, - readAllowFromStore: pairing.readAllowFromStore, - shouldComputeCommandAuthorized: (body, cfg) => - core.channel.commands.shouldComputeCommandAuthorized(body, cfg), - resolveCommandAuthorizedFromAuthorizers: (params) => - core.channel.commands.resolveCommandAuthorizedFromAuthorizers(params), + senderAllowedForCommands, }); + if (directDmOutcome === "disabled") { + logVerbose(core, runtime, `Blocked zalouser DM from ${senderId} (dmPolicy=disabled)`); + return; + } + if (directDmOutcome === "unauthorized") { + if (dmPolicy === "pairing") { + const { code, created } = await pairing.upsertPairingRequest({ + id: senderId, + meta: { name: senderName || undefined }, + }); - if (!isGroup) { - if (dmPolicy === "disabled") { - logVerbose(core, runtime, `Blocked zalouser DM from ${senderId} (dmPolicy=disabled)`); - return; - } - - if (dmPolicy !== "open") { - const allowed = senderAllowedForCommands; - - if (!allowed) { - if (dmPolicy === "pairing") { - const { code, created } = await pairing.upsertPairingRequest({ - id: senderId, - meta: { name: senderName || undefined }, - }); - - if (created) { - logVerbose(core, runtime, `zalouser pairing request sender=${senderId}`); - try { - await sendMessageZalouser( - chatId, - core.channel.pairing.buildPairingReply({ - channel: "zalouser", - idLine: `Your Zalo user id: ${senderId}`, - code, - }), - { profile: account.profile }, - ); - statusSink?.({ lastOutboundAt: Date.now() }); - } catch (err) { - logVerbose( - core, - runtime, - `zalouser pairing reply failed for ${senderId}: ${String(err)}`, - ); - } - } - } else { + if (created) { + logVerbose(core, runtime, `zalouser pairing request sender=${senderId}`); + try { + await sendMessageZalouser( + chatId, + core.channel.pairing.buildPairingReply({ + channel: "zalouser", + idLine: `Your Zalo user id: ${senderId}`, + code, + }), + { profile: account.profile }, + ); + statusSink?.({ lastOutboundAt: Date.now() }); + } catch (err) { logVerbose( core, runtime, - `Blocked unauthorized zalouser sender ${senderId} (dmPolicy=${dmPolicy})`, + `zalouser pairing reply failed for ${senderId}: ${String(err)}`, ); } - return; } + } else { + logVerbose( + core, + runtime, + `Blocked unauthorized zalouser sender ${senderId} (dmPolicy=${dmPolicy})`, + ); } + return; } if ( @@ -305,7 +302,7 @@ async function processMessage( ? { kind: "group" as const, id: chatId } : { kind: "group" as const, id: senderId }; - const route = core.channel.routing.resolveAgentRoute({ + const { route, buildEnvelope } = resolveInboundRouteEnvelopeBuilderWithRuntime({ cfg: config, channel: "zalouser", accountId: account.accountId, @@ -314,15 +311,8 @@ async function processMessage( kind: peer.kind, id: peer.id, }, - }); - const buildEnvelope = createInboundEnvelopeBuilder({ - cfg: config, - route, + runtime: core.channel, sessionStore: config.session?.store, - resolveStorePath: core.channel.session.resolveStorePath, - readSessionUpdatedAt: core.channel.session.readSessionUpdatedAt, - resolveEnvelopeFormatOptions: core.channel.reply.resolveEnvelopeFormatOptions, - formatAgentEnvelope: core.channel.reply.formatAgentEnvelope, }); const fromLabel = isGroup ? `group:${chatId}` : senderName || `user:${senderId}`; diff --git a/src/agents/bash-tools.exec-approval-request.ts b/src/agents/bash-tools.exec-approval-request.ts index 02c5e5d2d95..7c28827c051 100644 --- a/src/agents/bash-tools.exec-approval-request.ts +++ b/src/agents/bash-tools.exec-approval-request.ts @@ -128,6 +128,16 @@ export async function waitForExecApprovalDecision(id: string): Promise { + if (params.preResolvedDecision !== undefined) { + return params.preResolvedDecision ?? null; + } + return await waitForExecApprovalDecision(params.approvalId); +} + export async function requestExecApprovalDecision( params: RequestExecApprovalDecisionParams, ): Promise { diff --git a/src/agents/bash-tools.exec-host-gateway.ts b/src/agents/bash-tools.exec-host-gateway.ts index 265b98ebf2c..d2a0ad7259f 100644 --- a/src/agents/bash-tools.exec-host-gateway.ts +++ b/src/agents/bash-tools.exec-host-gateway.ts @@ -19,9 +19,9 @@ import { logInfo } from "../logger.js"; import { markBackgrounded, tail } from "./bash-process-registry.js"; import { buildExecApprovalRequesterContext, + resolveRegisteredExecApprovalDecision, buildExecApprovalTurnSourceContext, registerExecApprovalRequestForHostOrThrow, - waitForExecApprovalDecision, } from "./bash-tools.exec-approval-request.js"; import { DEFAULT_APPROVAL_TIMEOUT_MS, @@ -172,13 +172,12 @@ export async function processGatewayAllowlist( preResolvedDecision = registration.finalDecision; void (async () => { - let decision: string | null = preResolvedDecision ?? null; + let decision: string | null = null; try { - // Some gateways may return a final decision inline during registration. - // Only call waitDecision when registration did not already carry one. - if (preResolvedDecision === undefined) { - decision = await waitForExecApprovalDecision(approvalId); - } + decision = await resolveRegisteredExecApprovalDecision({ + approvalId, + preResolvedDecision, + }); } catch { emitExecSystemEvent( `Exec denied (gateway id=${approvalId}, approval-request-failed): ${params.command}`, diff --git a/src/agents/bash-tools.exec-host-node.ts b/src/agents/bash-tools.exec-host-node.ts index 69cc36d73fa..c9a85566c05 100644 --- a/src/agents/bash-tools.exec-host-node.ts +++ b/src/agents/bash-tools.exec-host-node.ts @@ -17,9 +17,9 @@ import { parsePreparedSystemRunPayload } from "../infra/system-run-approval-cont import { logInfo } from "../logger.js"; import { buildExecApprovalRequesterContext, + resolveRegisteredExecApprovalDecision, buildExecApprovalTurnSourceContext, registerExecApprovalRequestForHostOrThrow, - waitForExecApprovalDecision, } from "./bash-tools.exec-approval-request.js"; import { DEFAULT_APPROVAL_TIMEOUT_MS, @@ -243,13 +243,12 @@ export async function executeNodeHostCommand( preResolvedDecision = registration.finalDecision; void (async () => { - let decision: string | null = preResolvedDecision ?? null; + let decision: string | null = null; try { - // Some gateways may return a final decision inline during registration. - // Only call waitDecision when registration did not already carry one. - if (preResolvedDecision === undefined) { - decision = await waitForExecApprovalDecision(approvalId); - } + decision = await resolveRegisteredExecApprovalDecision({ + approvalId, + preResolvedDecision, + }); } catch { emitExecSystemEvent( `Exec denied (node=${nodeId} id=${approvalId}, approval-request-failed): ${params.command}`, diff --git a/src/agents/ollama-stream.ts b/src/agents/ollama-stream.ts index dd93dc90ae3..5040b37737a 100644 --- a/src/agents/ollama-stream.ts +++ b/src/agents/ollama-stream.ts @@ -6,11 +6,14 @@ import type { TextContent, ToolCall, Tool, - Usage, } from "@mariozechner/pi-ai"; import { createAssistantMessageEventStream } from "@mariozechner/pi-ai"; import { createSubsystemLogger } from "../logging/subsystem.js"; -import { buildStreamErrorAssistantMessage } from "./stream-message-shared.js"; +import { + buildAssistantMessage as buildStreamAssistantMessage, + buildStreamErrorAssistantMessage, + buildUsageWithNoCost, +} from "./stream-message-shared.js"; const log = createSubsystemLogger("ollama-stream"); @@ -343,25 +346,15 @@ export function buildAssistantMessage( const hasToolCalls = toolCalls && toolCalls.length > 0; const stopReason: StopReason = hasToolCalls ? "toolUse" : "stop"; - const usage: Usage = { - input: response.prompt_eval_count ?? 0, - output: response.eval_count ?? 0, - cacheRead: 0, - cacheWrite: 0, - totalTokens: (response.prompt_eval_count ?? 0) + (response.eval_count ?? 0), - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, - }; - - return { - role: "assistant", + return buildStreamAssistantMessage({ + model: modelInfo, content, stopReason, - api: modelInfo.api, - provider: modelInfo.provider, - model: modelInfo.id, - usage, - timestamp: Date.now(), - }; + usage: buildUsageWithNoCost({ + input: response.prompt_eval_count ?? 0, + output: response.eval_count ?? 0, + }), + }); } // ── NDJSON streaming parser ───────────────────────────────────────────────── diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index acc51f7e770..4563f2e3781 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -30,7 +30,6 @@ import type { StopReason, TextContent, ToolCall, - Usage, } from "@mariozechner/pi-ai"; import { createAssistantMessageEventStream, streamSimple } from "@mariozechner/pi-ai"; import { @@ -43,7 +42,9 @@ import { } from "./openai-ws-connection.js"; import { log } from "./pi-embedded-runner/logger.js"; import { + buildAssistantMessage, buildAssistantMessageWithZeroUsage, + buildUsageWithNoCost, buildStreamErrorAssistantMessage, } from "./stream-message-shared.js"; @@ -298,25 +299,16 @@ export function buildAssistantMessageFromResponse( const hasToolCalls = content.some((c) => c.type === "toolCall"); const stopReason: StopReason = hasToolCalls ? "toolUse" : "stop"; - const usage: Usage = { - input: response.usage?.input_tokens ?? 0, - output: response.usage?.output_tokens ?? 0, - cacheRead: 0, - cacheWrite: 0, - totalTokens: response.usage?.total_tokens ?? 0, - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, - }; - - return { - role: "assistant", + return buildAssistantMessage({ + model: modelInfo, content, stopReason, - api: modelInfo.api, - provider: modelInfo.provider, - model: modelInfo.id, - usage, - timestamp: Date.now(), - }; + usage: buildUsageWithNoCost({ + input: response.usage?.input_tokens ?? 0, + output: response.usage?.output_tokens ?? 0, + totalTokens: response.usage?.total_tokens ?? 0, + }), + }); } // ───────────────────────────────────────────────────────────────────────────── diff --git a/src/agents/stream-message-shared.ts b/src/agents/stream-message-shared.ts index 696c09890d0..5c3f0b0d995 100644 --- a/src/agents/stream-message-shared.ts +++ b/src/agents/stream-message-shared.ts @@ -17,10 +17,32 @@ export function buildZeroUsage(): Usage { }; } -export function buildAssistantMessageWithZeroUsage(params: { +export function buildUsageWithNoCost(params: { + input?: number; + output?: number; + cacheRead?: number; + cacheWrite?: number; + totalTokens?: number; +}): Usage { + const input = params.input ?? 0; + const output = params.output ?? 0; + const cacheRead = params.cacheRead ?? 0; + const cacheWrite = params.cacheWrite ?? 0; + return { + input, + output, + cacheRead, + cacheWrite, + totalTokens: params.totalTokens ?? input + output, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }; +} + +export function buildAssistantMessage(params: { model: StreamModelDescriptor; content: AssistantMessage["content"]; stopReason: StopReason; + usage: Usage; timestamp?: number; }): AssistantMessage { return { @@ -30,11 +52,26 @@ export function buildAssistantMessageWithZeroUsage(params: { api: params.model.api, provider: params.model.provider, model: params.model.id, - usage: buildZeroUsage(), + usage: params.usage, timestamp: params.timestamp ?? Date.now(), }; } +export function buildAssistantMessageWithZeroUsage(params: { + model: StreamModelDescriptor; + content: AssistantMessage["content"]; + stopReason: StopReason; + timestamp?: number; +}): AssistantMessage { + return buildAssistantMessage({ + model: params.model, + content: params.content, + stopReason: params.stopReason, + usage: buildZeroUsage(), + timestamp: params.timestamp, + }); +} + export function buildStreamErrorAssistantMessage(params: { model: StreamModelDescriptor; errorMessage: string; diff --git a/src/channels/allowlist-match.ts b/src/channels/allowlist-match.ts index 74ed2c25931..23063878a49 100644 --- a/src/channels/allowlist-match.ts +++ b/src/channels/allowlist-match.ts @@ -22,6 +22,25 @@ export function formatAllowlistMatchMeta( return `matchKey=${match?.matchKey ?? "none"} matchSource=${match?.matchSource ?? "none"}`; } +export function resolveAllowlistMatchByCandidates(params: { + allowList: string[]; + candidates: Array<{ value?: string; source: TSource }>; +}): AllowlistMatch { + for (const candidate of params.candidates) { + if (!candidate.value) { + continue; + } + if (params.allowList.includes(candidate.value)) { + return { + allowed: true, + matchKey: candidate.value, + matchSource: candidate.source, + }; + } + } + return { allowed: false }; +} + export function resolveAllowlistMatchSimple(params: { allowFrom: Array; senderId: string; diff --git a/src/plugin-sdk/account-resolution.ts b/src/plugin-sdk/account-resolution.ts new file mode 100644 index 00000000000..e25c2cc74cb --- /dev/null +++ b/src/plugin-sdk/account-resolution.ts @@ -0,0 +1,41 @@ +export function resolveAccountWithDefaultFallback(params: { + accountId?: string | null; + normalizeAccountId: (accountId?: string | null) => string; + resolvePrimary: (accountId: string) => TAccount; + hasCredential: (account: TAccount) => boolean; + resolveDefaultAccountId: () => string; +}): TAccount { + const hasExplicitAccountId = Boolean(params.accountId?.trim()); + const normalizedAccountId = params.normalizeAccountId(params.accountId); + const primary = params.resolvePrimary(normalizedAccountId); + if (hasExplicitAccountId || params.hasCredential(primary)) { + return primary; + } + + const fallbackId = params.resolveDefaultAccountId(); + if (fallbackId === normalizedAccountId) { + return primary; + } + const fallback = params.resolvePrimary(fallbackId); + if (!params.hasCredential(fallback)) { + return primary; + } + return fallback; +} + +export function listConfiguredAccountIds(params: { + accounts: Record | undefined; + normalizeAccountId: (accountId: string) => string; +}): string[] { + if (!params.accounts) { + return []; + } + const ids = new Set(); + for (const key of Object.keys(params.accounts)) { + if (!key) { + continue; + } + ids.add(params.normalizeAccountId(key)); + } + return [...ids]; +} diff --git a/src/plugin-sdk/command-auth.ts b/src/plugin-sdk/command-auth.ts index cc7d9d2207a..2e95974cf1f 100644 --- a/src/plugin-sdk/command-auth.ts +++ b/src/plugin-sdk/command-auth.ts @@ -18,6 +18,48 @@ export type ResolveSenderCommandAuthorizationParams = { }) => boolean; }; +export type CommandAuthorizationRuntime = { + shouldComputeCommandAuthorized: (rawBody: string, cfg: OpenClawConfig) => boolean; + resolveCommandAuthorizedFromAuthorizers: (params: { + useAccessGroups: boolean; + authorizers: Array<{ configured: boolean; allowed: boolean }>; + }) => boolean; +}; + +export type ResolveSenderCommandAuthorizationWithRuntimeParams = Omit< + ResolveSenderCommandAuthorizationParams, + "shouldComputeCommandAuthorized" | "resolveCommandAuthorizedFromAuthorizers" +> & { + runtime: CommandAuthorizationRuntime; +}; + +export function resolveDirectDmAuthorizationOutcome(params: { + isGroup: boolean; + dmPolicy: string; + senderAllowedForCommands: boolean; +}): "disabled" | "unauthorized" | "allowed" { + if (params.isGroup) { + return "allowed"; + } + if (params.dmPolicy === "disabled") { + return "disabled"; + } + if (params.dmPolicy !== "open" && !params.senderAllowedForCommands) { + return "unauthorized"; + } + return "allowed"; +} + +export async function resolveSenderCommandAuthorizationWithRuntime( + params: ResolveSenderCommandAuthorizationWithRuntimeParams, +): ReturnType { + return resolveSenderCommandAuthorization({ + ...params, + shouldComputeCommandAuthorized: params.runtime.shouldComputeCommandAuthorized, + resolveCommandAuthorizedFromAuthorizers: params.runtime.resolveCommandAuthorizedFromAuthorizers, + }); +} + export async function resolveSenderCommandAuthorization( params: ResolveSenderCommandAuthorizationParams, ): Promise<{ diff --git a/src/plugin-sdk/inbound-envelope.ts b/src/plugin-sdk/inbound-envelope.ts index 84f6664c295..b6d220b1648 100644 --- a/src/plugin-sdk/inbound-envelope.ts +++ b/src/plugin-sdk/inbound-envelope.ts @@ -3,6 +3,11 @@ type RouteLike = { sessionKey: string; }; +type RoutePeerLike = { + kind: string; + id: string | number; +}; + export function createInboundEnvelopeBuilder(params: { cfg: TConfig; route: RouteLike; @@ -39,3 +44,107 @@ export function createInboundEnvelopeBuilder(params: { return { storePath, body }; }; } + +export function resolveInboundRouteEnvelopeBuilder< + TConfig, + TEnvelope, + TRoute extends RouteLike, +>(params: { + cfg: TConfig; + channel: string; + accountId: string; + peer: RoutePeerLike; + resolveAgentRoute: (params: { + cfg: TConfig; + channel: string; + accountId: string; + peer: RoutePeerLike; + }) => TRoute; + sessionStore?: string; + resolveStorePath: (store: string | undefined, opts: { agentId: string }) => string; + readSessionUpdatedAt: (params: { storePath: string; sessionKey: string }) => number | undefined; + resolveEnvelopeFormatOptions: (cfg: TConfig) => TEnvelope; + formatAgentEnvelope: (params: { + channel: string; + from: string; + timestamp?: number; + previousTimestamp?: number; + envelope: TEnvelope; + body: string; + }) => string; +}): { + route: TRoute; + buildEnvelope: ReturnType>; +} { + const route = params.resolveAgentRoute({ + cfg: params.cfg, + channel: params.channel, + accountId: params.accountId, + peer: params.peer, + }); + const buildEnvelope = createInboundEnvelopeBuilder({ + cfg: params.cfg, + route, + sessionStore: params.sessionStore, + resolveStorePath: params.resolveStorePath, + readSessionUpdatedAt: params.readSessionUpdatedAt, + resolveEnvelopeFormatOptions: params.resolveEnvelopeFormatOptions, + formatAgentEnvelope: params.formatAgentEnvelope, + }); + return { route, buildEnvelope }; +} + +type InboundRouteEnvelopeRuntime = { + routing: { + resolveAgentRoute: (params: { + cfg: TConfig; + channel: string; + accountId: string; + peer: RoutePeerLike; + }) => TRoute; + }; + session: { + resolveStorePath: (store: string | undefined, opts: { agentId: string }) => string; + readSessionUpdatedAt: (params: { storePath: string; sessionKey: string }) => number | undefined; + }; + reply: { + resolveEnvelopeFormatOptions: (cfg: TConfig) => TEnvelope; + formatAgentEnvelope: (params: { + channel: string; + from: string; + timestamp?: number; + previousTimestamp?: number; + envelope: TEnvelope; + body: string; + }) => string; + }; +}; + +export function resolveInboundRouteEnvelopeBuilderWithRuntime< + TConfig, + TEnvelope, + TRoute extends RouteLike, +>(params: { + cfg: TConfig; + channel: string; + accountId: string; + peer: RoutePeerLike; + runtime: InboundRouteEnvelopeRuntime; + sessionStore?: string; +}): { + route: TRoute; + buildEnvelope: ReturnType>; +} { + return resolveInboundRouteEnvelopeBuilder({ + cfg: params.cfg, + channel: params.channel, + accountId: params.accountId, + peer: params.peer, + resolveAgentRoute: (routeParams) => params.runtime.routing.resolveAgentRoute(routeParams), + sessionStore: params.sessionStore, + resolveStorePath: params.runtime.session.resolveStorePath, + readSessionUpdatedAt: params.runtime.session.readSessionUpdatedAt, + resolveEnvelopeFormatOptions: params.runtime.reply.resolveEnvelopeFormatOptions, + formatAgentEnvelope: params.runtime.reply.formatAgentEnvelope, + }); +} diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index 10a482d6d29..60f74ecb542 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -148,6 +148,7 @@ export { buildOauthProviderAuthResult } from "./provider-auth-result.js"; export { formatResolvedUnresolvedNote } from "./resolution-notes.js"; export type { ChannelDock } from "../channels/dock.js"; export { getChatChannelMeta } from "../channels/registry.js"; +export { resolveAllowlistMatchByCandidates } from "../channels/allowlist-match.js"; export type { BlockStreamingCoalesceConfig, DmPolicy, @@ -223,9 +224,22 @@ export { type SenderGroupAccessDecision, type SenderGroupAccessReason, } from "./group-access.js"; -export { resolveSenderCommandAuthorization } from "./command-auth.js"; +export { + resolveDirectDmAuthorizationOutcome, + resolveSenderCommandAuthorization, + resolveSenderCommandAuthorizationWithRuntime, +} from "./command-auth.js"; +export type { CommandAuthorizationRuntime } from "./command-auth.js"; export { createScopedPairingAccess } from "./pairing-access.js"; -export { createInboundEnvelopeBuilder } from "./inbound-envelope.js"; +export { + createInboundEnvelopeBuilder, + resolveInboundRouteEnvelopeBuilder, + resolveInboundRouteEnvelopeBuilderWithRuntime, +} from "./inbound-envelope.js"; +export { + listConfiguredAccountIds, + resolveAccountWithDefaultFallback, +} from "./account-resolution.js"; export { issuePairingChallenge } from "../pairing/pairing-challenge.js"; export { handleSlackMessageAction } from "./slack-message-actions.js"; export { extractToolSend } from "./tool-send.js"; diff --git a/src/slack/monitor/allow-list.ts b/src/slack/monitor/allow-list.ts index 34aa9ed3914..fdee1467dab 100644 --- a/src/slack/monitor/allow-list.ts +++ b/src/slack/monitor/allow-list.ts @@ -1,4 +1,7 @@ -import type { AllowlistMatch } from "../../channels/allowlist-match.js"; +import { + resolveAllowlistMatchByCandidates, + type AllowlistMatch, +} from "../../channels/allowlist-match.js"; import { normalizeHyphenSlug, normalizeStringEntries, @@ -49,19 +52,7 @@ export function resolveSlackAllowListMatch(params: { ] satisfies Array<{ value?: string; source: SlackAllowListMatch["matchSource"] }>) : []), ]; - for (const candidate of candidates) { - if (!candidate.value) { - continue; - } - if (allowList.includes(candidate.value)) { - return { - allowed: true, - matchKey: candidate.value, - matchSource: candidate.source, - }; - } - } - return { allowed: false }; + return resolveAllowlistMatchByCandidates({ allowList, candidates }); } export function allowListMatches(params: { diff --git a/src/telegram/accounts.ts b/src/telegram/accounts.ts index d81781a25cb..54af9ba2adf 100644 --- a/src/telegram/accounts.ts +++ b/src/telegram/accounts.ts @@ -4,6 +4,10 @@ import type { OpenClawConfig } from "../config/config.js"; import type { TelegramAccountConfig, TelegramActionConfig } from "../config/types.js"; import { isTruthyEnvValue } from "../infra/env.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; +import { + listConfiguredAccountIds as listConfiguredAccountIdsFromSection, + resolveAccountWithDefaultFallback, +} from "../plugin-sdk/account-resolution.js"; import { resolveAccountEntry } from "../routing/account-lookup.js"; import { listBoundAccountIds, resolveDefaultAgentBoundAccountId } from "../routing/bindings.js"; import { @@ -42,18 +46,10 @@ export type ResolvedTelegramAccount = { }; function listConfiguredAccountIds(cfg: OpenClawConfig): string[] { - const accounts = cfg.channels?.telegram?.accounts; - if (!accounts || typeof accounts !== "object") { - return []; - } - const ids = new Set(); - for (const key of Object.keys(accounts)) { - if (!key) { - continue; - } - ids.add(normalizeAccountId(key)); - } - return [...ids]; + return listConfiguredAccountIdsFromSection({ + accounts: cfg.channels?.telegram?.accounts, + normalizeAccountId, + }); } export function listTelegramAccountIds(cfg: OpenClawConfig): string[] { @@ -135,7 +131,6 @@ export function resolveTelegramAccount(params: { cfg: OpenClawConfig; accountId?: string | null; }): ResolvedTelegramAccount { - const hasExplicitAccountId = Boolean(params.accountId?.trim()); const baseEnabled = params.cfg.channels?.telegram?.enabled !== false; const resolve = (accountId: string) => { @@ -158,27 +153,16 @@ export function resolveTelegramAccount(params: { } satisfies ResolvedTelegramAccount; }; - const normalized = normalizeAccountId(params.accountId); - const primary = resolve(normalized); - if (hasExplicitAccountId) { - return primary; - } - if (primary.tokenSource !== "none") { - return primary; - } - // If accountId is omitted, prefer a configured account token over failing on // the implicit "default" account. This keeps env-based setups working while // making config-only tokens work for things like heartbeats. - const fallbackId = resolveDefaultTelegramAccountId(params.cfg); - if (fallbackId === primary.accountId) { - return primary; - } - const fallback = resolve(fallbackId); - if (fallback.tokenSource === "none") { - return primary; - } - return fallback; + return resolveAccountWithDefaultFallback({ + accountId: params.accountId, + normalizeAccountId, + resolvePrimary: resolve, + hasCredential: (account) => account.tokenSource !== "none", + resolveDefaultAccountId: () => resolveDefaultTelegramAccountId(params.cfg), + }); } export function listEnabledTelegramAccounts(cfg: OpenClawConfig): ResolvedTelegramAccount[] {