fix(agents): close cache boundary transport gaps

This commit is contained in:
Vincent Koc
2026-04-04 17:08:58 +09:00
parent 58a56d9a82
commit 1a13c34f5b
7 changed files with 257 additions and 5 deletions

View File

@@ -1,5 +1,6 @@
import type { Model } from "@mariozechner/pi-ai";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js";
const hoisted = vi.hoisted(() => {
const streamAnthropicMock = vi.fn<(model: unknown, context: unknown, options: unknown) => symbol>(
@@ -145,6 +146,94 @@ describe("createAnthropicVertexStreamFn", () => {
);
});
// Verifies the transport-level onPayload hook installed by
// createAnthropicVertexStreamFn: it must split the system prompt at
// SYSTEM_PROMPT_CACHE_BOUNDARY into a cached stable prefix and an uncached
// dynamic suffix, attach cache_control to the user message content, run the
// caller-supplied onPayload hook on the already-shaped payload, and return
// that shaped payload as the hook chain's result.
it("applies Anthropic cache-boundary shaping before forwarding payload hooks", async () => {
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5");
const model = makeModel({ id: "claude-sonnet-4-6", maxTokens: 64000 });
// Caller hook is an identity passthrough so we can inspect exactly what it receives.
const onPayload = vi.fn(async (payload: unknown) => payload);
// Kick off a stream; result ignored — we only care about the options forwarded
// to the mocked streamAnthropic transport.
void streamFn(
model,
{
systemPrompt: `Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynamic suffix`,
messages: [{ role: "user", content: "Hello" }],
} as never,
{
cacheRetention: "short",
onPayload,
} as never,
);
// Grab the transport-level onPayload hook from the third argument of the
// first (and only) streamAnthropic call.
const transportOptions = hoisted.streamAnthropicMock.mock.calls[0]?.[2] as {
onPayload?: (payload: unknown, payloadModel: unknown) => Promise<unknown>;
};
// Simulated raw transport payload: one merged system block still carrying the
// boundary marker, plus a plain string user message.
const payload = {
system: [
{
type: "text",
text: `Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynamic suffix`,
cache_control: { type: "ephemeral" },
},
],
messages: [{ role: "user", content: "Hello" }],
};
const nextPayload = await transportOptions.onPayload?.(payload, model);
// The caller's hook must see the shaped payload: system split at the boundary
// (only the stable prefix keeps cache_control) and the user message converted
// to a content block with cache_control attached.
expect(onPayload).toHaveBeenCalledWith(
{
system: [
{
type: "text",
text: "Stable prefix",
cache_control: { type: "ephemeral" },
},
{
type: "text",
text: "Dynamic suffix",
},
],
messages: [
{
role: "user",
content: [
{
type: "text",
text: "Hello",
cache_control: { type: "ephemeral" },
},
],
},
],
},
model,
);
// Since the caller hook returned its input unchanged, the chain's final
// result is the same shaped payload.
expect(nextPayload).toEqual({
system: [
{
type: "text",
text: "Stable prefix",
cache_control: { type: "ephemeral" },
},
{
type: "text",
text: "Dynamic suffix",
},
],
messages: [
{
role: "user",
content: [
{
type: "text",
text: "Hello",
cache_control: { type: "ephemeral" },
},
],
},
],
});
});
it("omits maxTokens when neither the model nor request provide a finite limit", () => {
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5");
const model = makeModel({ id: "claude-sonnet-4-6" });

View File

@@ -5,6 +5,10 @@ import {
resolveAnthropicVertexClientRegion,
resolveAnthropicVertexProjectId,
} from "../plugin-sdk/anthropic-vertex.js";
import {
applyAnthropicPayloadPolicyToParams,
resolveAnthropicPayloadPolicy,
} from "./anthropic-payload-policy.js";
type AnthropicVertexEffort = NonNullable<AnthropicOptions["effort"]>;
@@ -31,6 +35,28 @@ function resolveAnthropicVertexMaxTokens(params: {
return requested ?? modelMax;
}
/**
 * Build the `onPayload` hook handed to pi-ai's Anthropic transport for the
 * Vertex stream fn.
 *
 * The returned hook shapes the raw request params in place according to the
 * resolved Anthropic payload policy, then delegates to the caller-supplied
 * hook (if any). If that hook yields nothing, the shaped payload itself is
 * returned so the chain never loses the request.
 */
function createAnthropicVertexOnPayload(params: {
  model: { api: string; baseUrl?: string; provider: string };
  cacheRetention: AnthropicOptions["cacheRetention"] | undefined;
  onPayload: AnthropicOptions["onPayload"] | undefined;
}): NonNullable<AnthropicOptions["onPayload"]> {
  // Resolve the shaping policy once up front instead of on every payload.
  const shapingPolicy = resolveAnthropicPayloadPolicy({
    provider: params.model.provider,
    api: params.model.api,
    baseUrl: params.model.baseUrl,
    cacheRetention: params.cacheRetention,
    enableCacheControl: true,
  });
  return async (payload, model) => {
    // Only plain objects can carry Anthropic request params; leave
    // null/undefined/array payloads untouched.
    const isParamsObject =
      payload !== null && typeof payload === "object" && !Array.isArray(payload);
    if (isParamsObject) {
      applyAnthropicPayloadPolicyToParams(payload as Record<string, unknown>, shapingPolicy);
    }
    // A caller hook may replace the payload; fall back to the shaped one.
    const replaced = await params.onPayload?.(payload, model);
    return replaced ?? payload;
  };
}
/**
* Create a StreamFn that routes through pi-ai's `streamAnthropic` with an
* injected `AnthropicVertex` client. All streaming, message conversion, and
@@ -49,8 +75,13 @@ export function createAnthropicVertexStreamFn(
});
return (model, context, options) => {
const transportModel = model as Model<"anthropic-messages"> & {
api: string;
baseUrl?: string;
provider: string;
};
const maxTokens = resolveAnthropicVertexMaxTokens({
modelMaxTokens: model.maxTokens,
modelMaxTokens: transportModel.maxTokens,
requestedMaxTokens: options?.maxTokens,
});
const opts: AnthropicOptions = {
@@ -61,7 +92,11 @@ export function createAnthropicVertexStreamFn(
cacheRetention: options?.cacheRetention,
sessionId: options?.sessionId,
headers: options?.headers,
onPayload: options?.onPayload,
onPayload: createAnthropicVertexOnPayload({
model: transportModel,
cacheRetention: options?.cacheRetention,
onPayload: options?.onPayload,
}),
maxRetryDelayMs: options?.maxRetryDelayMs,
metadata: options?.metadata,
};
@@ -95,7 +130,7 @@ export function createAnthropicVertexStreamFn(
opts.thinkingEnabled = false;
}
return streamAnthropic(model as Model<"anthropic-messages">, context, opts);
return streamAnthropic(transportModel, context, opts);
};
}

View File

@@ -69,6 +69,49 @@ describe("createCacheTrace", () => {
expect(event.system).toBe("");
});
// The wrapped stream fn should emit a stream:context trace event carrying the
// context's systemPrompt text (includeSystem is on) plus a digest of it.
it("records stream context from systemPrompt when wrapping stream functions", () => {
  const written: string[] = [];
  const cacheTrace = createCacheTrace({
    cfg: {
      diagnostics: {
        cacheTrace: {
          enabled: true,
          includeSystem: true,
        },
      },
    },
    env: {},
    writer: {
      filePath: "memory",
      write: (line) => written.push(line),
    },
  });
  // Passthrough stream fn: echoes its inputs so wrapping is observable.
  const passthrough = ((model, context, options) => ({
    model,
    context,
    options,
  })) as never;
  const wrappedStream = cacheTrace?.wrapStreamFn(passthrough);
  wrappedStream?.(
    {
      id: "gpt-5.4",
      provider: "openai",
      api: "openai-responses",
    } as never,
    {
      systemPrompt: "system prompt text",
      messages: [],
    } as never,
    {},
  );
  // First written line is the JSONL trace event for the stream invocation.
  const firstEvent = JSON.parse(written[0]?.trim() ?? "{}") as Record<string, unknown>;
  expect(firstEvent.stage).toBe("stream:context");
  expect(firstEvent.system).toBe("system prompt text");
  expect(firstEvent.systemDigest).toBeTypeOf("string");
});
it("respects env overrides for enablement", () => {
const lines: string[] = [];
const trace = createCacheTrace({

View File

@@ -238,14 +238,19 @@ export function createCacheTrace(params: CacheTraceInit): CacheTrace | null {
const wrapStreamFn: CacheTrace["wrapStreamFn"] = (streamFn) => {
const wrapped: StreamFn = (model, context, options) => {
const traceContext = context as {
messages?: AgentMessage[];
system?: unknown;
systemPrompt?: unknown;
};
recordStage("stream:context", {
model: {
id: model?.id,
provider: model?.provider,
api: model?.api,
},
system: (context as { system?: unknown }).system,
messages: (context as { messages?: AgentMessage[] }).messages ?? [],
system: traceContext.systemPrompt ?? traceContext.system,
messages: traceContext.messages ?? [],
options: (options ?? {}) as Record<string, unknown>,
});
return streamFn(model, context, options);

View File

@@ -20,6 +20,7 @@ import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js"
describe("openai transport stream", () => {
it("reports the supported transport-aware APIs", () => {
expect(isTransportAwareApiSupported("openai-responses")).toBe(true);
expect(isTransportAwareApiSupported("openai-codex-responses")).toBe(true);
expect(isTransportAwareApiSupported("openai-completions")).toBe(true);
expect(isTransportAwareApiSupported("azure-openai-responses")).toBe(true);
expect(isTransportAwareApiSupported("anthropic-messages")).toBe(true);
@@ -41,6 +42,20 @@ describe("openai transport stream", () => {
maxTokens: 8192,
} satisfies Model<"openai-responses">),
).toBeTypeOf("function");
expect(
createBoundaryAwareStreamFnForModel({
id: "codex-mini-latest",
name: "Codex Mini Latest",
api: "openai-codex-responses",
provider: "openai-codex",
baseUrl: "https://api.openai.com/v1",
reasoning: true,
input: ["text"],
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
contextWindow: 200000,
maxTokens: 8192,
} satisfies Model<"openai-codex-responses">),
).toBeTypeOf("function");
expect(
createBoundaryAwareStreamFnForModel({
id: "claude-sonnet-4-6",
@@ -104,6 +119,39 @@ describe("openai transport stream", () => {
expect(buildTransportAwareSimpleStreamFn(model)).toBeTypeOf("function");
});
// A Codex Responses model carrying transport overrides must be rewritten to the
// openclaw Responses transport alias api while provider/id stay intact, and a
// transport-aware stream fn must be buildable for it.
it("prepares a Codex Responses simple-completion api alias when transport overrides are attached", () => {
  const baseModel = {
    id: "codex-mini-latest",
    name: "Codex Mini Latest",
    api: "openai-codex-responses",
    provider: "openai-codex",
    baseUrl: "https://api.openai.com/v1",
    reasoning: true,
    input: ["text"],
    cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
    contextWindow: 200000,
    maxTokens: 8192,
  } satisfies Model<"openai-codex-responses">;
  // Attach an explicit proxy so transport-aware preparation kicks in.
  const transportModel = attachModelProviderRequestTransport(baseModel, {
    proxy: {
      mode: "explicit-proxy",
      url: "http://proxy.internal:8443",
    },
  });
  const prepared = prepareTransportAwareSimpleModel(transportModel);
  expect(resolveTransportAwareSimpleApi(transportModel.api)).toBe("openclaw-openai-responses-transport");
  expect(prepared).toMatchObject({
    api: "openclaw-openai-responses-transport",
    provider: "openai-codex",
    id: "codex-mini-latest",
  });
  expect(buildTransportAwareSimpleStreamFn(transportModel)).toBeTypeOf("function");
});
it("prepares an Anthropic simple-completion api alias when transport overrides are attached", () => {
const model = attachModelProviderRequestTransport(
{

View File

@@ -35,6 +35,20 @@ describe("describeEmbeddedAgentStreamStrategy", () => {
).toBe("boundary-aware:openai-responses");
});
// With no session stream and no websocket transport, a Codex Responses model
// should be described by the boundary-aware Codex strategy label.
it("describes default Codex fallback shaping", () => {
  const codexModel = {
    api: "openai-codex-responses",
    provider: "openai-codex",
    id: "codex-mini-latest",
  } as never;
  const strategy = describeEmbeddedAgentStreamStrategy({
    currentStreamFn: undefined,
    shouldUseWebSocketTransport: false,
    model: codexModel,
  });
  expect(strategy).toBe("boundary-aware:openai-codex-responses");
});
it("keeps custom session streams labeled as custom", () => {
expect(
describeEmbeddedAgentStreamStrategy({
@@ -65,4 +79,19 @@ describe("resolveEmbeddedAgentStreamFn", () => {
expect(streamFn).not.toBe(streamSimple);
});
// Codex Responses fallbacks must resolve to a boundary-aware transport stream
// fn rather than the plain streamSimple implementation.
it("routes Codex responses fallbacks through boundary-aware transports", () => {
  const codexModel = {
    api: "openai-codex-responses",
    provider: "openai-codex",
    id: "codex-mini-latest",
  } as never;
  const resolved = resolveEmbeddedAgentStreamFn({
    currentStreamFn: undefined,
    shouldUseWebSocketTransport: false,
    sessionId: "session-1",
    model: codexModel,
  });
  expect(resolved).not.toBe(streamSimple);
});
});

View File

@@ -11,6 +11,7 @@ import { getModelProviderRequestTransport } from "./provider-request-config.js";
const SUPPORTED_TRANSPORT_APIS = new Set<Api>([
"openai-responses",
"openai-codex-responses",
"openai-completions",
"azure-openai-responses",
"anthropic-messages",
@@ -19,6 +20,7 @@ const SUPPORTED_TRANSPORT_APIS = new Set<Api>([
const SIMPLE_TRANSPORT_API_ALIAS: Record<string, Api> = {
"openai-responses": "openclaw-openai-responses-transport",
"openai-codex-responses": "openclaw-openai-responses-transport",
"openai-completions": "openclaw-openai-completions-transport",
"azure-openai-responses": "openclaw-azure-openai-responses-transport",
"anthropic-messages": "openclaw-anthropic-messages-transport",
@@ -28,6 +30,7 @@ const SIMPLE_TRANSPORT_API_ALIAS: Record<string, Api> = {
function createSupportedTransportStreamFn(api: Api): StreamFn | undefined {
switch (api) {
case "openai-responses":
case "openai-codex-responses":
return createOpenAIResponsesTransportStreamFn();
case "openai-completions":
return createOpenAICompletionsTransportStreamFn();