From 7fcec6ca3e91124f7548cb4cb415a41c7625d247 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 2 Mar 2026 05:19:54 +0000 Subject: [PATCH] refactor(streaming): share approval and stream message builders --- .../bash-tools.exec-approval-request.ts | 121 +++++++++--------- src/agents/bash-tools.exec-host-gateway.ts | 39 +++--- src/agents/bash-tools.exec-host-node.ts | 45 +++---- src/agents/ollama-stream.ts | 21 +-- src/agents/openai-ws-stream.ts | 60 ++------- src/agents/stream-message-shared.ts | 53 ++++++++ 6 files changed, 170 insertions(+), 169 deletions(-) create mode 100644 src/agents/stream-message-shared.ts diff --git a/src/agents/bash-tools.exec-approval-request.ts b/src/agents/bash-tools.exec-approval-request.ts index 0b0c0228c6e..02c5e5d2d95 100644 --- a/src/agents/bash-tools.exec-approval-request.ts +++ b/src/agents/bash-tools.exec-approval-request.ts @@ -25,24 +25,7 @@ export type RequestExecApprovalDecisionParams = { turnSourceThreadId?: string | number; }; -type ExecApprovalRequestToolParams = { - id: string; - command: string; - commandArgv?: string[]; - systemRunPlan?: SystemRunApprovalPlan; - env?: Record; - cwd: string; - nodeId?: string; - host: "gateway" | "node"; - security: ExecSecurity; - ask: ExecAsk; - agentId?: string; - resolvedPath?: string; - sessionKey?: string; - turnSourceChannel?: string; - turnSourceTo?: string; - turnSourceAccountId?: string; - turnSourceThreadId?: string | number; +type ExecApprovalRequestToolParams = RequestExecApprovalDecisionParams & { timeoutMs: number; twoPhase: true; }; @@ -155,7 +138,7 @@ export async function requestExecApprovalDecision( return await waitForExecApprovalDecision(registration.id); } -export async function requestExecApprovalDecisionForHost(params: { +type HostExecApprovalParams = { approvalId: string; command: string; commandArgv?: string[]; @@ -173,48 +156,45 @@ export async function requestExecApprovalDecisionForHost(params: { turnSourceTo?: string; turnSourceAccountId?: string; turnSourceThreadId?: string | number; -}): Promise { - return await requestExecApprovalDecision({ - id: params.approvalId, - command: params.command, - commandArgv: params.commandArgv, - systemRunPlan: params.systemRunPlan, - env: params.env, - cwd: params.workdir, - nodeId: params.nodeId, - host: params.host, - security: params.security, - ask: params.ask, +}; + +type ExecApprovalRequesterContext = { + agentId?: string; + sessionKey?: string; +}; + +export function buildExecApprovalRequesterContext(params: ExecApprovalRequesterContext): { + agentId?: string; + sessionKey?: string; +} { + return { agentId: params.agentId, - resolvedPath: params.resolvedPath, sessionKey: params.sessionKey, - turnSourceChannel: params.turnSourceChannel, - turnSourceTo: params.turnSourceTo, - turnSourceAccountId: params.turnSourceAccountId, - turnSourceThreadId: params.turnSourceThreadId, - }); + }; } -export async function registerExecApprovalRequestForHost(params: { - approvalId: string; - command: string; - commandArgv?: string[]; - systemRunPlan?: SystemRunApprovalPlan; - env?: Record; - workdir: string; - host: "gateway" | "node"; - nodeId?: string; - security: ExecSecurity; - ask: ExecAsk; - agentId?: string; - resolvedPath?: string; - sessionKey?: string; +type ExecApprovalTurnSourceContext = { turnSourceChannel?: string; turnSourceTo?: string; turnSourceAccountId?: string; turnSourceThreadId?: string | number; -}): Promise { - return await registerExecApprovalRequest({ +}; + +export function buildExecApprovalTurnSourceContext( + params: ExecApprovalTurnSourceContext, +): ExecApprovalTurnSourceContext { + return { + turnSourceChannel: params.turnSourceChannel, + turnSourceTo: params.turnSourceTo, + turnSourceAccountId: params.turnSourceAccountId, + turnSourceThreadId: params.turnSourceThreadId, + }; +} + +function buildHostApprovalDecisionParams( + params: HostExecApprovalParams, +): RequestExecApprovalDecisionParams { + return { id: params.approvalId, command: params.command, commandArgv: params.commandArgv, @@ -225,12 +205,33 @@ export async function registerExecApprovalRequestForHost(params: { host: params.host, security: params.security, ask: params.ask, - agentId: params.agentId, + ...buildExecApprovalRequesterContext({ + agentId: params.agentId, + sessionKey: params.sessionKey, + }), resolvedPath: params.resolvedPath, - sessionKey: params.sessionKey, - turnSourceChannel: params.turnSourceChannel, - turnSourceTo: params.turnSourceTo, - turnSourceAccountId: params.turnSourceAccountId, - turnSourceThreadId: params.turnSourceThreadId, - }); + ...buildExecApprovalTurnSourceContext(params), + }; +} + +export async function requestExecApprovalDecisionForHost( + params: HostExecApprovalParams, +): Promise { + return await requestExecApprovalDecision(buildHostApprovalDecisionParams(params)); +} + +export async function registerExecApprovalRequestForHost( + params: HostExecApprovalParams, +): Promise { + return await registerExecApprovalRequest(buildHostApprovalDecisionParams(params)); +} + +export async function registerExecApprovalRequestForHostOrThrow( + params: HostExecApprovalParams, +): Promise { + try { + return await registerExecApprovalRequestForHost(params); + } catch (err) { + throw new Error(`Exec approval registration failed: ${String(err)}`, { cause: err }); + } } diff --git a/src/agents/bash-tools.exec-host-gateway.ts b/src/agents/bash-tools.exec-host-gateway.ts index 9ce27e077cb..265b98ebf2c 100644 --- a/src/agents/bash-tools.exec-host-gateway.ts +++ b/src/agents/bash-tools.exec-host-gateway.ts @@ -18,7 +18,9 @@ import type { SafeBinProfile } from "../infra/exec-safe-bin-policy.js"; import { logInfo } from "../logger.js"; import { markBackgrounded, tail } from "./bash-process-registry.js"; import { - registerExecApprovalRequestForHost, + buildExecApprovalRequesterContext, + buildExecApprovalTurnSourceContext, + registerExecApprovalRequestForHostOrThrow, waitForExecApprovalDecision, } from "./bash-tools.exec-approval-request.js"; import { @@ -151,28 +153,23 @@ export async function processGatewayAllowlist( let expiresAtMs = Date.now() + DEFAULT_APPROVAL_TIMEOUT_MS; let preResolvedDecision: string | null | undefined; - try { - // Register first so the returned approval ID is actionable immediately. - const registration = await registerExecApprovalRequestForHost({ - approvalId, - command: params.command, - workdir: params.workdir, - host: "gateway", - security: hostSecurity, - ask: hostAsk, + // Register first so the returned approval ID is actionable immediately. + const registration = await registerExecApprovalRequestForHostOrThrow({ + approvalId, + command: params.command, + workdir: params.workdir, + host: "gateway", + security: hostSecurity, + ask: hostAsk, + ...buildExecApprovalRequesterContext({ agentId: params.agentId, - resolvedPath, sessionKey: params.sessionKey, - turnSourceChannel: params.turnSourceChannel, - turnSourceTo: params.turnSourceTo, - turnSourceAccountId: params.turnSourceAccountId, - turnSourceThreadId: params.turnSourceThreadId, - }); - expiresAtMs = registration.expiresAtMs; - preResolvedDecision = registration.finalDecision; - } catch (err) { - throw new Error(`Exec approval registration failed: ${String(err)}`, { cause: err }); - } + }), + resolvedPath, + ...buildExecApprovalTurnSourceContext(params), + }); + expiresAtMs = registration.expiresAtMs; + preResolvedDecision = registration.finalDecision; void (async () => { let decision: string | null = preResolvedDecision ?? null; diff --git a/src/agents/bash-tools.exec-host-node.ts b/src/agents/bash-tools.exec-host-node.ts index f72b6e289ed..69cc36d73fa 100644 --- a/src/agents/bash-tools.exec-host-node.ts +++ b/src/agents/bash-tools.exec-host-node.ts @@ -16,7 +16,9 @@ import { buildNodeShellCommand } from "../infra/node-shell.js"; import { parsePreparedSystemRunPayload } from "../infra/system-run-approval-context.js"; import { logInfo } from "../logger.js"; import { - registerExecApprovalRequestForHost, + buildExecApprovalRequesterContext, + buildExecApprovalTurnSourceContext, + registerExecApprovalRequestForHostOrThrow, waitForExecApprovalDecision, } from "./bash-tools.exec-approval-request.js"; import { @@ -219,31 +221,26 @@ export async function executeNodeHostCommand( let expiresAtMs = Date.now() + DEFAULT_APPROVAL_TIMEOUT_MS; let preResolvedDecision: string | null | undefined; - try { - // Register first so the returned approval ID is actionable immediately. - const registration = await registerExecApprovalRequestForHost({ - approvalId, - command: prepared.cmdText, - commandArgv: prepared.plan.argv, - systemRunPlan: prepared.plan, - env: nodeEnv, - workdir: runCwd, - host: "node", - nodeId, - security: hostSecurity, - ask: hostAsk, + // Register first so the returned approval ID is actionable immediately. + const registration = await registerExecApprovalRequestForHostOrThrow({ + approvalId, + command: prepared.cmdText, + commandArgv: prepared.plan.argv, + systemRunPlan: prepared.plan, + env: nodeEnv, + workdir: runCwd, + host: "node", + nodeId, + security: hostSecurity, + ask: hostAsk, + ...buildExecApprovalRequesterContext({ agentId: runAgentId, sessionKey: runSessionKey, - turnSourceChannel: params.turnSourceChannel, - turnSourceTo: params.turnSourceTo, - turnSourceAccountId: params.turnSourceAccountId, - turnSourceThreadId: params.turnSourceThreadId, - }); - expiresAtMs = registration.expiresAtMs; - preResolvedDecision = registration.finalDecision; - } catch (err) { - throw new Error(`Exec approval registration failed: ${String(err)}`, { cause: err }); - } + }), + ...buildExecApprovalTurnSourceContext(params), + }); + expiresAtMs = registration.expiresAtMs; + preResolvedDecision = registration.finalDecision; void (async () => { let decision: string | null = preResolvedDecision ?? null; diff --git a/src/agents/ollama-stream.ts b/src/agents/ollama-stream.ts index 321d26b5452..dd93dc90ae3 100644 --- a/src/agents/ollama-stream.ts +++ b/src/agents/ollama-stream.ts @@ -10,6 +10,7 @@ import type { } from "@mariozechner/pi-ai"; import { createAssistantMessageEventStream } from "@mariozechner/pi-ai"; import { createSubsystemLogger } from "../logging/subsystem.js"; +import { buildStreamErrorAssistantMessage } from "./stream-message-shared.js"; const log = createSubsystemLogger("ollama-stream"); @@ -521,24 +522,10 @@ export function createOllamaStreamFn(baseUrl: string): StreamFn { stream.push({ type: "error", reason: "error", - error: { - role: "assistant" as const, - content: [], - stopReason: "error" as StopReason, + error: buildStreamErrorAssistantMessage({ + model, errorMessage, - api: model.api, - provider: model.provider, - model: model.id, - usage: { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - totalTokens: 0, - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, - }, - timestamp: Date.now(), - }, + }), }); } finally { stream.end(); diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index ae7f1da4376..acc51f7e770 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -42,6 +42,10 @@ import { type ResponseObject, } from "./openai-ws-connection.js"; import { log } from "./pi-embedded-runner/logger.js"; +import { + buildAssistantMessageWithZeroUsage, + buildStreamErrorAssistantMessage, +} from "./stream-message-shared.js"; // ───────────────────────────────────────────────────────────────────────────── // Per-session state @@ -605,23 +609,11 @@ export function createOpenAIWebSocketStreamFn( eventStream.push({ type: "start", - partial: { - role: "assistant", + partial: buildAssistantMessageWithZeroUsage({ + model, content: [], stopReason: "stop", - api: model.api, - provider: model.provider, - model: model.id, - usage: { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - totalTokens: 0, - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, - }, - timestamp: Date.now(), - }, + }), }); // ── 5. Wait for response.completed ─────────────────────────────────── @@ -678,23 +670,11 @@ export function createOpenAIWebSocketStreamFn( reject(new Error(`OpenAI WebSocket error: ${event.message} (code=${event.code})`)); } else if (event.type === "response.output_text.delta") { // Stream partial text updates for responsive UI - const partialMsg: AssistantMessage = { - role: "assistant", + const partialMsg: AssistantMessage = buildAssistantMessageWithZeroUsage({ + model, content: [{ type: "text", text: event.delta }], stopReason: "stop", - api: model.api, - provider: model.provider, - model: model.id, - usage: { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - totalTokens: 0, - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, - }, - timestamp: Date.now(), - }; + }); eventStream.push({ type: "text_delta", contentIndex: 0, @@ -713,24 +693,10 @@ export function createOpenAIWebSocketStreamFn( eventStream.push({ type: "error", reason: "error", - error: { - role: "assistant" as const, - content: [], - stopReason: "error" as StopReason, + error: buildStreamErrorAssistantMessage({ + model, errorMessage, - api: model.api, - provider: model.provider, - model: model.id, - usage: { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - totalTokens: 0, - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, - }, - timestamp: Date.now(), - }, + }), }); eventStream.end(); }), diff --git a/src/agents/stream-message-shared.ts b/src/agents/stream-message-shared.ts new file mode 100644 index 00000000000..696c09890d0 --- /dev/null +++ b/src/agents/stream-message-shared.ts @@ -0,0 +1,53 @@ +import type { AssistantMessage, StopReason, Usage } from "@mariozechner/pi-ai"; + +export type StreamModelDescriptor = { + api: string; + provider: string; + id: string; +}; + +export function buildZeroUsage(): Usage { + return { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }; +} + +export function buildAssistantMessageWithZeroUsage(params: { + model: StreamModelDescriptor; + content: AssistantMessage["content"]; + stopReason: StopReason; + timestamp?: number; +}): AssistantMessage { + return { + role: "assistant", + content: params.content, + stopReason: params.stopReason, + api: params.model.api, + provider: params.model.provider, + model: params.model.id, + usage: buildZeroUsage(), + timestamp: params.timestamp ?? Date.now(), + }; +} + +export function buildStreamErrorAssistantMessage(params: { + model: StreamModelDescriptor; + errorMessage: string; + timestamp?: number; +}): AssistantMessage & { stopReason: "error"; errorMessage: string } { + return { + ...buildAssistantMessageWithZeroUsage({ + model: params.model, + content: [], + stopReason: "error", + timestamp: params.timestamp, + }), + stopReason: "error", + errorMessage: params.errorMessage, + }; +}