test(agents): expand live cache runner scenarios

This commit is contained in:
Vincent Koc
2026-04-04 14:46:56 +09:00
parent 32dd0aa7e7
commit b3186aeef9

View File

@@ -1,7 +1,10 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { AssistantMessage, Message, Tool } from "@mariozechner/pi-ai";
import { Type } from "@sinclair/typebox";
import { beforeAll, describe, expect, it } from "vitest";
import { afterAll, beforeAll, describe, expect, it } from "vitest";
import type { OpenClawConfig } from "../config/config.js";
import {
buildStableCachePrefix,
completeSimpleWithLiveTimeout,
@@ -10,7 +13,10 @@ import {
LIVE_CACHE_TEST_ENABLED,
logLiveCache,
resolveLiveDirectModel,
withLiveCacheHeartbeat,
} from "./live-cache-test-support.js";
import { runEmbeddedPiAgent } from "./pi-embedded-runner.js";
import { compactEmbeddedPiSessionDirect } from "./pi-embedded-runner/compact.runtime.js";
const describeCacheLive = LIVE_CACHE_TEST_ENABLED ? describe : describe.skip;
@@ -31,6 +37,7 @@ type CacheRun = {
text: string;
usage: AssistantMessage["usage"];
};
type LiveResolvedModel = Awaited<ReturnType<typeof resolveLiveDirectModel>>;
const NOOP_TOOL: Tool = {
name: "noop",
@@ -38,6 +45,7 @@ const NOOP_TOOL: Tool = {
parameters: Type.Object({}, { additionalProperties: false }),
};
let liveTestPngBase64 = "";
let liveRunnerRootDir: string | undefined;
type UserContent = Extract<Message, { role: "user" }>["content"];
@@ -67,6 +75,153 @@ function makeImageUserTurn(text: string): Message {
]);
}
function buildRunnerSessionPaths(sessionId: string) {
if (!liveRunnerRootDir) {
throw new Error("live runner temp root not initialized");
}
return {
agentDir: liveRunnerRootDir,
sessionFile: path.join(liveRunnerRootDir, `${sessionId}.jsonl`),
workspaceDir: path.join(liveRunnerRootDir, `${sessionId}-workspace`),
};
}
function resolveProviderBaseUrl(fixture: LiveResolvedModel): string | undefined {
const candidate = (fixture.model as { baseUrl?: unknown }).baseUrl;
return typeof candidate === "string" && candidate.trim().length > 0 ? candidate : undefined;
}
function buildEmbeddedRunnerConfig(params: {
fixture: LiveResolvedModel;
cacheRetention: "none" | "short" | "long";
transport?: "sse" | "websocket";
}): OpenClawConfig {
const provider = params.fixture.model.provider;
const modelKey = `${provider}/${params.fixture.model.id}`;
const providerBaseUrl = resolveProviderBaseUrl(params.fixture);
return {
models: {
providers: {
[provider]: {
api: params.fixture.model.api,
apiKey: params.fixture.apiKey,
...(providerBaseUrl ? { baseUrl: providerBaseUrl } : {}),
},
},
},
agents: {
defaults: {
models: {
[modelKey]: {
params: {
cacheRetention: params.cacheRetention,
...(params.transport ? { transport: params.transport } : {}),
},
},
},
},
},
};
}
function buildEmbeddedCachePrompt(suffix: string, sections = 48): string {
const lines = [
`Reply with exactly CACHE-OK ${suffix}.`,
"Do not add any extra words or punctuation.",
];
for (let index = 0; index < sections; index += 1) {
lines.push(
`Embedded cache section ${index + 1}: deterministic prose about prompt stability, session affinity, request shaping, transport continuity, and cache reuse across identical stable prefixes.`,
);
}
return lines.join("\n");
}
function extractRunPayloadText(payloads: Array<{ text?: string } | undefined> | undefined): string {
return (
payloads
?.map((payload) => payload?.text?.trim())
.filter((text): text is string => Boolean(text))
.join(" ") ?? ""
);
}
async function runEmbeddedCacheProbe(params: {
fixture: LiveResolvedModel;
cacheRetention: "none" | "short" | "long";
prefix: string;
providerTag: "anthropic" | "openai";
sessionId: string;
suffix: string;
transport?: "sse" | "websocket";
promptSections?: number;
}): Promise<CacheRun> {
const sessionPaths = buildRunnerSessionPaths(params.sessionId);
await fs.mkdir(sessionPaths.workspaceDir, { recursive: true });
const result = await withLiveCacheHeartbeat(
runEmbeddedPiAgent({
sessionId: params.sessionId,
sessionKey: `live-cache:${params.providerTag}:${params.sessionId}`,
sessionFile: sessionPaths.sessionFile,
workspaceDir: sessionPaths.workspaceDir,
agentDir: sessionPaths.agentDir,
config: buildEmbeddedRunnerConfig({
fixture: params.fixture,
cacheRetention: params.cacheRetention,
transport: params.transport,
}),
prompt: buildEmbeddedCachePrompt(params.suffix, params.promptSections),
provider: params.fixture.model.provider,
model: params.fixture.model.id,
timeoutMs: params.providerTag === "openai" ? OPENAI_TIMEOUT_MS : ANTHROPIC_TIMEOUT_MS,
runId: `${params.sessionId}-${params.suffix}-${params.transport ?? "default"}`,
extraSystemPrompt: params.prefix,
disableTools: true,
cleanupBundleMcpOnRunEnd: true,
}),
`${params.providerTag} embedded cache probe ${params.suffix}${params.transport ? ` (${params.transport})` : ""}`,
);
const text = extractRunPayloadText(result.payloads);
expect(text.toLowerCase()).toContain(params.suffix.toLowerCase());
const usage = result.meta.agentMeta?.usage ?? {};
return {
suffix: params.suffix,
text,
usage,
hitRate: computeCacheHitRate(usage),
};
}
async function compactLiveCacheSession(params: {
fixture: LiveResolvedModel;
cacheRetention: "none" | "short" | "long";
providerTag: "anthropic" | "openai";
sessionId: string;
}) {
const sessionPaths = buildRunnerSessionPaths(params.sessionId);
await fs.mkdir(sessionPaths.workspaceDir, { recursive: true });
return await withLiveCacheHeartbeat(
compactEmbeddedPiSessionDirect({
sessionId: params.sessionId,
sessionKey: `live-cache:${params.providerTag}:${params.sessionId}`,
sessionFile: sessionPaths.sessionFile,
workspaceDir: sessionPaths.workspaceDir,
agentDir: sessionPaths.agentDir,
config: buildEmbeddedRunnerConfig({
fixture: params.fixture,
cacheRetention: params.cacheRetention,
}),
provider: params.fixture.model.provider,
model: params.fixture.model.id,
force: true,
trigger: "manual",
runId: `${params.sessionId}-compact`,
tokenBudget: 512,
}),
`${params.providerTag} embedded compaction ${params.sessionId}`,
);
}
function extractFirstToolCall(message: AssistantMessage) {
return message.content.find((block) => block.type === "toolCall");
}
@@ -445,9 +600,17 @@ async function runAnthropicImageCacheProbe(params: {
describeCacheLive("pi embedded runner prompt caching (live)", () => {
beforeAll(async () => {
liveRunnerRootDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-live-cache-"));
liveTestPngBase64 = (await fs.readFile(LIVE_TEST_PNG_URL)).toString("base64");
}, 120_000);
afterAll(async () => {
if (liveRunnerRootDir) {
await fs.rm(liveRunnerRootDir, { recursive: true, force: true });
}
liveRunnerRootDir = undefined;
});
describe("openai", () => {
let fixture: Awaited<ReturnType<typeof resolveLiveDirectModel>>;
@@ -564,6 +727,95 @@ describeCacheLive("pi embedded runner prompt caching (live)", () => {
},
6 * 60_000,
);
it(
"keeps high cache-read rates across repeated embedded-runner turns",
async () => {
const sessionId = `${OPENAI_SESSION_ID}-embedded`;
const warmup = await runEmbeddedCacheProbe({
...fixture,
cacheRetention: "short",
prefix: OPENAI_PREFIX,
providerTag: "openai",
sessionId,
suffix: "embedded-warmup",
});
logLiveCache(
`openai embedded warmup cacheRead=${warmup.usage.cacheRead} input=${warmup.usage.input} rate=${warmup.hitRate.toFixed(3)}`,
);
const hitA = await runEmbeddedCacheProbe({
...fixture,
cacheRetention: "short",
prefix: OPENAI_PREFIX,
providerTag: "openai",
sessionId,
suffix: "embedded-hit-a",
});
const hitB = await runEmbeddedCacheProbe({
...fixture,
cacheRetention: "short",
prefix: OPENAI_PREFIX,
providerTag: "openai",
sessionId,
suffix: "embedded-hit-b",
});
const bestHit = (hitA.usage.cacheRead ?? 0) >= (hitB.usage.cacheRead ?? 0) ? hitA : hitB;
logLiveCache(
`openai embedded best-hit suffix=${bestHit.suffix} cacheRead=${bestHit.usage.cacheRead} input=${bestHit.usage.input} rate=${bestHit.hitRate.toFixed(3)}`,
);
expect(bestHit.usage.cacheRead ?? 0).toBeGreaterThan(1_024);
expect(bestHit.hitRate).toBeGreaterThanOrEqual(0.4);
},
8 * 60_000,
);
it(
"keeps high cache-read rates when the same embedded session flips from websocket to sse",
async () => {
const sessionId = `${OPENAI_SESSION_ID}-transport-flip`;
const warmup = await runEmbeddedCacheProbe({
...fixture,
cacheRetention: "short",
prefix: OPENAI_PREFIX,
providerTag: "openai",
sessionId,
suffix: "ws-warmup",
transport: "websocket",
});
logLiveCache(
`openai transport warmup cacheRead=${warmup.usage.cacheRead} input=${warmup.usage.input} rate=${warmup.hitRate.toFixed(3)}`,
);
const hitA = await runEmbeddedCacheProbe({
...fixture,
cacheRetention: "short",
prefix: OPENAI_PREFIX,
providerTag: "openai",
sessionId,
suffix: "sse-hit-a",
transport: "sse",
});
const hitB = await runEmbeddedCacheProbe({
...fixture,
cacheRetention: "short",
prefix: OPENAI_PREFIX,
providerTag: "openai",
sessionId,
suffix: "sse-hit-b",
transport: "sse",
});
const bestHit = (hitA.usage.cacheRead ?? 0) >= (hitB.usage.cacheRead ?? 0) ? hitA : hitB;
logLiveCache(
`openai transport-flip best-hit suffix=${bestHit.suffix} cacheRead=${bestHit.usage.cacheRead} input=${bestHit.usage.input} rate=${bestHit.hitRate.toFixed(3)}`,
);
expect(bestHit.usage.cacheRead ?? 0).toBeGreaterThan(1_024);
expect(bestHit.hitRate).toBeGreaterThanOrEqual(0.35);
},
8 * 60_000,
);
});
describe("anthropic", () => {
@@ -712,5 +964,102 @@ describeCacheLive("pi embedded runner prompt caching (live)", () => {
},
3 * 60_000,
);
it(
"keeps high cache-read rates across repeated embedded-runner turns",
async () => {
const sessionId = `${ANTHROPIC_SESSION_ID}-embedded`;
const warmup = await runEmbeddedCacheProbe({
...fixture,
cacheRetention: "short",
prefix: ANTHROPIC_PREFIX,
providerTag: "anthropic",
sessionId,
suffix: "embedded-warmup",
});
logLiveCache(
`anthropic embedded warmup cacheWrite=${warmup.usage.cacheWrite} cacheRead=${warmup.usage.cacheRead} input=${warmup.usage.input} rate=${warmup.hitRate.toFixed(3)}`,
);
expect(warmup.usage.cacheWrite ?? 0).toBeGreaterThan(0);
const hitA = await runEmbeddedCacheProbe({
...fixture,
cacheRetention: "short",
prefix: ANTHROPIC_PREFIX,
providerTag: "anthropic",
sessionId,
suffix: "embedded-hit-a",
});
const hitB = await runEmbeddedCacheProbe({
...fixture,
cacheRetention: "short",
prefix: ANTHROPIC_PREFIX,
providerTag: "anthropic",
sessionId,
suffix: "embedded-hit-b",
});
const bestHit = (hitA.usage.cacheRead ?? 0) >= (hitB.usage.cacheRead ?? 0) ? hitA : hitB;
logLiveCache(
`anthropic embedded best-hit suffix=${bestHit.suffix} cacheWrite=${bestHit.usage.cacheWrite} cacheRead=${bestHit.usage.cacheRead} input=${bestHit.usage.input} rate=${bestHit.hitRate.toFixed(3)}`,
);
expect(bestHit.usage.cacheRead ?? 0).toBeGreaterThan(1_024);
expect(bestHit.hitRate).toBeGreaterThanOrEqual(0.4);
},
8 * 60_000,
);
it(
"preserves cache-safe shaping across compaction followup turns",
async () => {
const sessionId = `${ANTHROPIC_SESSION_ID}-compaction`;
await runEmbeddedCacheProbe({
...fixture,
cacheRetention: "short",
prefix: ANTHROPIC_PREFIX,
providerTag: "anthropic",
sessionId,
suffix: "compact-prime-a",
promptSections: 96,
});
await runEmbeddedCacheProbe({
...fixture,
cacheRetention: "short",
prefix: ANTHROPIC_PREFIX,
providerTag: "anthropic",
sessionId,
suffix: "compact-prime-b",
promptSections: 96,
});
const compacted = await compactLiveCacheSession({
...fixture,
cacheRetention: "short",
providerTag: "anthropic",
sessionId,
});
logLiveCache(
`anthropic compaction ok=${compacted.ok} compacted=${compacted.compacted} reason=${compacted.reason ?? "none"}`,
);
expect(compacted.ok).toBe(true);
expect(compacted.compacted).toBe(true);
const followup = await runEmbeddedCacheProbe({
...fixture,
cacheRetention: "short",
prefix: ANTHROPIC_PREFIX,
providerTag: "anthropic",
sessionId,
suffix: "compact-hit",
});
logLiveCache(
`anthropic compaction followup cacheWrite=${followup.usage.cacheWrite} cacheRead=${followup.usage.cacheRead} input=${followup.usage.input} rate=${followup.hitRate.toFixed(3)}`,
);
expect(followup.usage.cacheRead ?? 0).toBeGreaterThan(1_024);
expect(followup.hitRate).toBeGreaterThanOrEqual(0.3);
},
10 * 60_000,
);
});
});