refactor(streaming): share approval and stream message builders

This commit is contained in:
Peter Steinberger
2026-03-02 05:19:54 +00:00
parent 6b78544f82
commit 7fcec6ca3e
6 changed files with 170 additions and 169 deletions

View File

@@ -25,24 +25,7 @@ export type RequestExecApprovalDecisionParams = {
turnSourceThreadId?: string | number;
};
type ExecApprovalRequestToolParams = {
id: string;
command: string;
commandArgv?: string[];
systemRunPlan?: SystemRunApprovalPlan;
env?: Record<string, string>;
cwd: string;
nodeId?: string;
host: "gateway" | "node";
security: ExecSecurity;
ask: ExecAsk;
agentId?: string;
resolvedPath?: string;
sessionKey?: string;
turnSourceChannel?: string;
turnSourceTo?: string;
turnSourceAccountId?: string;
turnSourceThreadId?: string | number;
type ExecApprovalRequestToolParams = RequestExecApprovalDecisionParams & {
timeoutMs: number;
twoPhase: true;
};
@@ -155,7 +138,7 @@ export async function requestExecApprovalDecision(
return await waitForExecApprovalDecision(registration.id);
}
export async function requestExecApprovalDecisionForHost(params: {
type HostExecApprovalParams = {
approvalId: string;
command: string;
commandArgv?: string[];
@@ -173,48 +156,45 @@ export async function requestExecApprovalDecisionForHost(params: {
turnSourceTo?: string;
turnSourceAccountId?: string;
turnSourceThreadId?: string | number;
}): Promise<string | null> {
return await requestExecApprovalDecision({
id: params.approvalId,
command: params.command,
commandArgv: params.commandArgv,
systemRunPlan: params.systemRunPlan,
env: params.env,
cwd: params.workdir,
nodeId: params.nodeId,
host: params.host,
security: params.security,
ask: params.ask,
};
type ExecApprovalRequesterContext = {
agentId?: string;
sessionKey?: string;
};
export function buildExecApprovalRequesterContext(params: ExecApprovalRequesterContext): {
agentId?: string;
sessionKey?: string;
} {
return {
agentId: params.agentId,
resolvedPath: params.resolvedPath,
sessionKey: params.sessionKey,
turnSourceChannel: params.turnSourceChannel,
turnSourceTo: params.turnSourceTo,
turnSourceAccountId: params.turnSourceAccountId,
turnSourceThreadId: params.turnSourceThreadId,
});
};
}
export async function registerExecApprovalRequestForHost(params: {
approvalId: string;
command: string;
commandArgv?: string[];
systemRunPlan?: SystemRunApprovalPlan;
env?: Record<string, string>;
workdir: string;
host: "gateway" | "node";
nodeId?: string;
security: ExecSecurity;
ask: ExecAsk;
agentId?: string;
resolvedPath?: string;
sessionKey?: string;
type ExecApprovalTurnSourceContext = {
turnSourceChannel?: string;
turnSourceTo?: string;
turnSourceAccountId?: string;
turnSourceThreadId?: string | number;
}): Promise<ExecApprovalRegistration> {
return await registerExecApprovalRequest({
};
export function buildExecApprovalTurnSourceContext(
params: ExecApprovalTurnSourceContext,
): ExecApprovalTurnSourceContext {
return {
turnSourceChannel: params.turnSourceChannel,
turnSourceTo: params.turnSourceTo,
turnSourceAccountId: params.turnSourceAccountId,
turnSourceThreadId: params.turnSourceThreadId,
};
}
function buildHostApprovalDecisionParams(
params: HostExecApprovalParams,
): RequestExecApprovalDecisionParams {
return {
id: params.approvalId,
command: params.command,
commandArgv: params.commandArgv,
@@ -225,12 +205,33 @@ export async function registerExecApprovalRequestForHost(params: {
host: params.host,
security: params.security,
ask: params.ask,
agentId: params.agentId,
...buildExecApprovalRequesterContext({
agentId: params.agentId,
sessionKey: params.sessionKey,
}),
resolvedPath: params.resolvedPath,
sessionKey: params.sessionKey,
turnSourceChannel: params.turnSourceChannel,
turnSourceTo: params.turnSourceTo,
turnSourceAccountId: params.turnSourceAccountId,
turnSourceThreadId: params.turnSourceThreadId,
});
...buildExecApprovalTurnSourceContext(params),
};
}
export async function requestExecApprovalDecisionForHost(
params: HostExecApprovalParams,
): Promise<string | null> {
return await requestExecApprovalDecision(buildHostApprovalDecisionParams(params));
}
export async function registerExecApprovalRequestForHost(
params: HostExecApprovalParams,
): Promise<ExecApprovalRegistration> {
return await registerExecApprovalRequest(buildHostApprovalDecisionParams(params));
}
export async function registerExecApprovalRequestForHostOrThrow(
params: HostExecApprovalParams,
): Promise<ExecApprovalRegistration> {
try {
return await registerExecApprovalRequestForHost(params);
} catch (err) {
throw new Error(`Exec approval registration failed: ${String(err)}`, { cause: err });
}
}

View File

@@ -18,7 +18,9 @@ import type { SafeBinProfile } from "../infra/exec-safe-bin-policy.js";
import { logInfo } from "../logger.js";
import { markBackgrounded, tail } from "./bash-process-registry.js";
import {
registerExecApprovalRequestForHost,
buildExecApprovalRequesterContext,
buildExecApprovalTurnSourceContext,
registerExecApprovalRequestForHostOrThrow,
waitForExecApprovalDecision,
} from "./bash-tools.exec-approval-request.js";
import {
@@ -151,28 +153,23 @@ export async function processGatewayAllowlist(
let expiresAtMs = Date.now() + DEFAULT_APPROVAL_TIMEOUT_MS;
let preResolvedDecision: string | null | undefined;
try {
// Register first so the returned approval ID is actionable immediately.
const registration = await registerExecApprovalRequestForHost({
approvalId,
command: params.command,
workdir: params.workdir,
host: "gateway",
security: hostSecurity,
ask: hostAsk,
// Register first so the returned approval ID is actionable immediately.
const registration = await registerExecApprovalRequestForHostOrThrow({
approvalId,
command: params.command,
workdir: params.workdir,
host: "gateway",
security: hostSecurity,
ask: hostAsk,
...buildExecApprovalRequesterContext({
agentId: params.agentId,
resolvedPath,
sessionKey: params.sessionKey,
turnSourceChannel: params.turnSourceChannel,
turnSourceTo: params.turnSourceTo,
turnSourceAccountId: params.turnSourceAccountId,
turnSourceThreadId: params.turnSourceThreadId,
});
expiresAtMs = registration.expiresAtMs;
preResolvedDecision = registration.finalDecision;
} catch (err) {
throw new Error(`Exec approval registration failed: ${String(err)}`, { cause: err });
}
}),
resolvedPath,
...buildExecApprovalTurnSourceContext(params),
});
expiresAtMs = registration.expiresAtMs;
preResolvedDecision = registration.finalDecision;
void (async () => {
let decision: string | null = preResolvedDecision ?? null;

View File

@@ -16,7 +16,9 @@ import { buildNodeShellCommand } from "../infra/node-shell.js";
import { parsePreparedSystemRunPayload } from "../infra/system-run-approval-context.js";
import { logInfo } from "../logger.js";
import {
registerExecApprovalRequestForHost,
buildExecApprovalRequesterContext,
buildExecApprovalTurnSourceContext,
registerExecApprovalRequestForHostOrThrow,
waitForExecApprovalDecision,
} from "./bash-tools.exec-approval-request.js";
import {
@@ -219,31 +221,26 @@ export async function executeNodeHostCommand(
let expiresAtMs = Date.now() + DEFAULT_APPROVAL_TIMEOUT_MS;
let preResolvedDecision: string | null | undefined;
try {
// Register first so the returned approval ID is actionable immediately.
const registration = await registerExecApprovalRequestForHost({
approvalId,
command: prepared.cmdText,
commandArgv: prepared.plan.argv,
systemRunPlan: prepared.plan,
env: nodeEnv,
workdir: runCwd,
host: "node",
nodeId,
security: hostSecurity,
ask: hostAsk,
// Register first so the returned approval ID is actionable immediately.
const registration = await registerExecApprovalRequestForHostOrThrow({
approvalId,
command: prepared.cmdText,
commandArgv: prepared.plan.argv,
systemRunPlan: prepared.plan,
env: nodeEnv,
workdir: runCwd,
host: "node",
nodeId,
security: hostSecurity,
ask: hostAsk,
...buildExecApprovalRequesterContext({
agentId: runAgentId,
sessionKey: runSessionKey,
turnSourceChannel: params.turnSourceChannel,
turnSourceTo: params.turnSourceTo,
turnSourceAccountId: params.turnSourceAccountId,
turnSourceThreadId: params.turnSourceThreadId,
});
expiresAtMs = registration.expiresAtMs;
preResolvedDecision = registration.finalDecision;
} catch (err) {
throw new Error(`Exec approval registration failed: ${String(err)}`, { cause: err });
}
}),
...buildExecApprovalTurnSourceContext(params),
});
expiresAtMs = registration.expiresAtMs;
preResolvedDecision = registration.finalDecision;
void (async () => {
let decision: string | null = preResolvedDecision ?? null;

View File

@@ -10,6 +10,7 @@ import type {
} from "@mariozechner/pi-ai";
import { createAssistantMessageEventStream } from "@mariozechner/pi-ai";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { buildStreamErrorAssistantMessage } from "./stream-message-shared.js";
const log = createSubsystemLogger("ollama-stream");
@@ -521,24 +522,10 @@ export function createOllamaStreamFn(baseUrl: string): StreamFn {
stream.push({
type: "error",
reason: "error",
error: {
role: "assistant" as const,
content: [],
stopReason: "error" as StopReason,
error: buildStreamErrorAssistantMessage({
model,
errorMessage,
api: model.api,
provider: model.provider,
model: model.id,
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
}),
});
} finally {
stream.end();

View File

@@ -42,6 +42,10 @@ import {
type ResponseObject,
} from "./openai-ws-connection.js";
import { log } from "./pi-embedded-runner/logger.js";
import {
buildAssistantMessageWithZeroUsage,
buildStreamErrorAssistantMessage,
} from "./stream-message-shared.js";
// ─────────────────────────────────────────────────────────────────────────────
// Per-session state
@@ -605,23 +609,11 @@ export function createOpenAIWebSocketStreamFn(
eventStream.push({
type: "start",
partial: {
role: "assistant",
partial: buildAssistantMessageWithZeroUsage({
model,
content: [],
stopReason: "stop",
api: model.api,
provider: model.provider,
model: model.id,
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
}),
});
// ── 5. Wait for response.completed ───────────────────────────────────
@@ -678,23 +670,11 @@ export function createOpenAIWebSocketStreamFn(
reject(new Error(`OpenAI WebSocket error: ${event.message} (code=${event.code})`));
} else if (event.type === "response.output_text.delta") {
// Stream partial text updates for responsive UI
const partialMsg: AssistantMessage = {
role: "assistant",
const partialMsg: AssistantMessage = buildAssistantMessageWithZeroUsage({
model,
content: [{ type: "text", text: event.delta }],
stopReason: "stop",
api: model.api,
provider: model.provider,
model: model.id,
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
};
});
eventStream.push({
type: "text_delta",
contentIndex: 0,
@@ -713,24 +693,10 @@ export function createOpenAIWebSocketStreamFn(
eventStream.push({
type: "error",
reason: "error",
error: {
role: "assistant" as const,
content: [],
stopReason: "error" as StopReason,
error: buildStreamErrorAssistantMessage({
model,
errorMessage,
api: model.api,
provider: model.provider,
model: model.id,
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
timestamp: Date.now(),
},
}),
});
eventStream.end();
}),

View File

@@ -0,0 +1,53 @@
import type { AssistantMessage, StopReason, Usage } from "@mariozechner/pi-ai";
export type StreamModelDescriptor = {
api: string;
provider: string;
id: string;
};
export function buildZeroUsage(): Usage {
return {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
};
}
export function buildAssistantMessageWithZeroUsage(params: {
model: StreamModelDescriptor;
content: AssistantMessage["content"];
stopReason: StopReason;
timestamp?: number;
}): AssistantMessage {
return {
role: "assistant",
content: params.content,
stopReason: params.stopReason,
api: params.model.api,
provider: params.model.provider,
model: params.model.id,
usage: buildZeroUsage(),
timestamp: params.timestamp ?? Date.now(),
};
}
export function buildStreamErrorAssistantMessage(params: {
model: StreamModelDescriptor;
errorMessage: string;
timestamp?: number;
}): AssistantMessage & { stopReason: "error"; errorMessage: string } {
return {
...buildAssistantMessageWithZeroUsage({
model: params.model,
content: [],
stopReason: "error",
timestamp: params.timestamp,
}),
stopReason: "error",
errorMessage: params.errorMessage,
};
}