test: isolate base vitest thread blockers

This commit is contained in:
Peter Steinberger
2026-03-23 01:24:52 -07:00
parent 8b02ef1332
commit f64f3fdb53
8 changed files with 886 additions and 329 deletions

View File

@@ -334,6 +334,47 @@ export async function loadCompactHooksHarness(): Promise<{
splitSdkTools: vi.fn(() => ({ builtInTools: [], customTools: [] })),
}));
vi.doMock("./compaction-safety-timeout.js", () => ({
compactWithSafetyTimeout: vi.fn(
async (
compact: () => Promise<unknown>,
_timeoutMs?: number,
opts?: { abortSignal?: AbortSignal; onCancel?: () => void },
) => {
const abortSignal = opts?.abortSignal;
if (!abortSignal) {
return await compact();
}
const cancelAndCreateError = () => {
opts?.onCancel?.();
const reason = "reason" in abortSignal ? abortSignal.reason : undefined;
if (reason instanceof Error) {
return reason;
}
const err = new Error("aborted");
err.name = "AbortError";
return err;
};
if (abortSignal.aborted) {
throw cancelAndCreateError();
}
return await Promise.race([
compact(),
new Promise<never>((_, reject) => {
abortSignal.addEventListener(
"abort",
() => {
reject(cancelAndCreateError());
},
{ once: true },
);
}),
]);
},
),
resolveCompactionTimeoutMs: vi.fn(() => 30_000),
}));
vi.doMock("./wait-for-idle-before-flush.js", () => ({
flushPendingToolResultsAfterIdle: vi.fn(async () => {}),
}));

View File

@@ -77,17 +77,6 @@ function compactionConfig(mode: "await" | "off" | "async") {
} as never;
}
function directCompactionArgs(overrides: Record<string, unknown> = {}) {
return {
sessionId: TEST_SESSION_ID,
sessionKey: TEST_SESSION_KEY,
sessionFile: TEST_SESSION_FILE,
workspaceDir: TEST_WORKSPACE_DIR,
customInstructions: TEST_CUSTOM_INSTRUCTIONS,
...overrides,
};
}
function wrappedCompactionArgs(overrides: Record<string, unknown> = {}) {
return {
sessionId: TEST_SESSION_ID,
@@ -174,14 +163,6 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
unregisterApiProviders(getCustomApiRegistrySourceId("ollama"));
});
async function runDirectCompaction(customInstructions = TEST_CUSTOM_INSTRUCTIONS) {
return await compactEmbeddedPiSessionDirect(
directCompactionArgs({
customInstructions,
}),
);
}
it("bootstraps runtime plugins with the resolved workspace", async () => {
// This assertion only cares about bootstrap wiring, so stop before the
// rest of the compaction pipeline can pull in unrelated runtime surfaces.
@@ -230,23 +211,39 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
it("emits internal + plugin compaction hooks with counts", async () => {
hookRunner.hasHooks.mockReturnValue(true);
let sanitizedCount = 0;
sanitizeSessionHistoryMock.mockImplementation(async (params: { messages: unknown[] }) => {
const sanitized = params.messages.slice(1);
sanitizedCount = sanitized.length;
return sanitized;
const originalMessages = sessionMessages.slice(1) as AgentMessage[];
const currentMessages = sessionMessages.slice(1) as AgentMessage[];
const beforeMetrics = compactTesting.buildBeforeCompactionHookMetrics({
originalMessages,
currentMessages,
estimateTokensFn: estimateTokensMock as (message: AgentMessage) => number,
});
const result = await compactEmbeddedPiSessionDirect({
const { hookSessionKey, missingSessionKey } = await compactTesting.runBeforeCompactionHooks({
hookRunner,
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile: "/tmp/session.jsonl",
sessionAgentId: "main",
workspaceDir: "/tmp",
messageChannel: "telegram",
customInstructions: "focus on decisions",
messageProvider: "telegram",
metrics: beforeMetrics,
});
await compactTesting.runAfterCompactionHooks({
hookRunner,
sessionId: "session-1",
sessionAgentId: "main",
hookSessionKey,
missingSessionKey,
workspaceDir: "/tmp",
messageProvider: "telegram",
messageCountAfter: 1,
tokensAfter: 10,
compactedCount: 1,
sessionFile: "/tmp/session.jsonl",
summaryLength: "summary".length,
tokensBefore: 120,
firstKeptEntryId: "entry-1",
});
expect(result.ok).toBe(true);
expect(sessionHook("compact:before")).toMatchObject({
type: "session",
action: "compact:before",
@@ -257,8 +254,8 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
expect(beforeContext).toMatchObject({
messageCount: 2,
tokenCount: 20,
messageCountOriginal: sanitizedCount,
tokenCountOriginal: sanitizedCount * 10,
messageCountOriginal: 2,
tokenCountOriginal: 20,
});
expect(afterContext).toMatchObject({
messageCount: 1,
@@ -288,15 +285,33 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
it("uses sessionId as hook session key fallback when sessionKey is missing", async () => {
hookRunner.hasHooks.mockReturnValue(true);
const result = await compactEmbeddedPiSessionDirect({
const originalMessages = sessionMessages.slice(1) as AgentMessage[];
const currentMessages = sessionMessages.slice(1) as AgentMessage[];
const beforeMetrics = compactTesting.buildBeforeCompactionHookMetrics({
originalMessages,
currentMessages,
estimateTokensFn: estimateTokensMock as (message: AgentMessage) => number,
});
const { hookSessionKey, missingSessionKey } = await compactTesting.runBeforeCompactionHooks({
hookRunner,
sessionId: "session-1",
sessionFile: "/tmp/session.jsonl",
sessionAgentId: "main",
workspaceDir: "/tmp",
customInstructions: "focus on decisions",
metrics: beforeMetrics,
});
await compactTesting.runAfterCompactionHooks({
hookRunner,
sessionId: "session-1",
sessionAgentId: "main",
hookSessionKey,
missingSessionKey,
workspaceDir: "/tmp",
messageCountAfter: 1,
tokensAfter: 10,
compactedCount: 1,
sessionFile: "/tmp/session.jsonl",
});
expect(result.ok).toBe(true);
expect(sessionHook("compact:before")?.sessionKey).toBe("session-1");
expect(sessionHook("compact:after")?.sessionKey).toBe("session-1");
expect(hookRunner.runBeforeCompaction).toHaveBeenCalledWith(
@@ -311,11 +326,20 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
it("applies validated transcript before hooks even when it becomes empty", async () => {
hookRunner.hasHooks.mockReturnValue(true);
sanitizeSessionHistoryMock.mockResolvedValue([]);
const beforeMetrics = compactTesting.buildBeforeCompactionHookMetrics({
originalMessages: [],
currentMessages: [],
estimateTokensFn: estimateTokensMock as (message: AgentMessage) => number,
});
await compactTesting.runBeforeCompactionHooks({
hookRunner,
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionAgentId: "main",
workspaceDir: "/tmp",
metrics: beforeMetrics,
});
const result = await runDirectCompaction();
expect(result.ok).toBe(true);
const beforeContext = sessionHook("compact:before")?.context;
expect(beforeContext).toMatchObject({
messageCountOriginal: 0,
@@ -329,15 +353,11 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
const cleanup = onSessionTranscriptUpdate(listener);
try {
const result = await compactEmbeddedPiSessionDirect({
sessionId: "session-1",
await compactTesting.runPostCompactionSideEffects({
sessionKey: "agent:main:session-1",
sessionFile: " /tmp/session.jsonl ",
workspaceDir: "/tmp",
customInstructions: "focus on decisions",
});
expect(result.ok).toBe(true);
expect(listener).toHaveBeenCalledTimes(1);
expect(listener).toHaveBeenCalledWith({ sessionFile: "/tmp/session.jsonl" });
} finally {
@@ -356,24 +376,13 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
}
return 5;
});
sessionCompactImpl.mockResolvedValue({
summary: "summary",
firstKeptEntryId: "entry-1",
tokensBefore: 20,
details: { ok: true },
const tokensAfter = compactTesting.estimateTokensAfterCompaction({
messagesAfter: [{ role: "user", content: "kept ask" }] as AgentMessage[],
fullSessionTokensBefore: 55,
estimateTokensFn: estimateTokensMock as (message: AgentMessage) => number,
});
const result = await runDirectCompaction();
expect(result).toMatchObject({
ok: true,
compacted: true,
result: {
tokensBefore: 20,
tokensAfter: 30,
},
});
expect(sessionHook("compact:after")?.context?.tokenCount).toBe(30);
expect(tokensAfter).toBe(30);
});
it("treats pre-compaction token estimation failures as a no-op sanity check", async () => {
@@ -387,29 +396,20 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
}
return 5;
});
sessionCompactImpl.mockResolvedValue({
summary: "summary",
firstKeptEntryId: "entry-1",
tokensBefore: 20,
details: { ok: true },
const beforeMetrics = compactTesting.buildBeforeCompactionHookMetrics({
originalMessages: sessionMessages as AgentMessage[],
currentMessages: sessionMessages as AgentMessage[],
estimateTokensFn: estimateTokensMock as (message: AgentMessage) => number,
});
const tokensAfter = compactTesting.estimateTokensAfterCompaction({
messagesAfter: [{ role: "user", content: "kept ask" }] as AgentMessage[],
fullSessionTokensBefore: 0,
estimateTokensFn: estimateTokensMock as (message: AgentMessage) => number,
});
const result = await compactEmbeddedPiSessionDirect({
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
customInstructions: "focus on decisions",
});
expect(result).toMatchObject({
ok: true,
compacted: true,
result: {
tokensAfter: 30,
},
});
expect(sessionHook("compact:after")?.context?.tokenCount).toBe(30);
expect(beforeMetrics.tokenCountOriginal).toBeUndefined();
expect(beforeMetrics.tokenCountBefore).toBeUndefined();
expect(tokensAfter).toBe(30);
});
it("skips sync in await mode when postCompactionForce is false", async () => {
@@ -424,13 +424,12 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
},
});
const result = await compactEmbeddedPiSessionDirect(
directCompactionArgs({
config: compactionConfig("await"),
}),
);
await compactTesting.runPostCompactionSideEffects({
config: compactionConfig("await"),
sessionKey: TEST_SESSION_KEY,
sessionFile: TEST_SESSION_FILE,
});
expect(result.ok).toBe(true);
expect(resolveSessionAgentIdMock).toHaveBeenCalledWith({
sessionKey: TEST_SESSION_KEY,
config: expect.any(Object),
@@ -449,11 +448,11 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } });
let settled = false;
const resultPromise = compactEmbeddedPiSessionDirect(
directCompactionArgs({
config: compactionConfig("await"),
}),
);
const resultPromise = compactTesting.runPostCompactionSideEffects({
config: compactionConfig("await"),
sessionKey: TEST_SESSION_KEY,
sessionFile: TEST_SESSION_FILE,
});
void resultPromise.then(() => {
settled = true;
@@ -464,8 +463,7 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
});
expect(settled).toBe(false);
syncRelease.resolve(undefined);
const result = await resultPromise;
expect(result.ok).toBe(true);
await resultPromise;
expect(settled).toBe(true);
});
@@ -473,13 +471,12 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
const sync = vi.fn(async () => {});
getMemorySearchManagerMock.mockResolvedValue({ manager: { sync } });
const result = await compactEmbeddedPiSessionDirect(
directCompactionArgs({
config: compactionConfig("off"),
}),
);
await compactTesting.runPostCompactionSideEffects({
config: compactionConfig("off"),
sessionKey: TEST_SESSION_KEY,
sessionFile: TEST_SESSION_FILE,
});
expect(result.ok).toBe(true);
expect(resolveSessionAgentIdMock).not.toHaveBeenCalled();
expect(getMemorySearchManagerMock).not.toHaveBeenCalled();
expect(sync).not.toHaveBeenCalled();
@@ -499,11 +496,11 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
});
let settled = false;
const resultPromise = compactEmbeddedPiSessionDirect(
directCompactionArgs({
config: compactionConfig("async"),
}),
);
const resultPromise = compactTesting.runPostCompactionSideEffects({
config: compactionConfig("async"),
sessionKey: TEST_SESSION_KEY,
sessionFile: TEST_SESSION_FILE,
});
await managerRequested.promise;
void resultPromise.then(() => {
@@ -521,9 +518,7 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
});
it("skips compaction when the transcript only contains boilerplate replies and tool output", async () => {
sessionMessages.splice(
0,
sessionMessages.length,
const messages = [
{ role: "user", content: "<b>HEARTBEAT_OK</b>", timestamp: 1 },
{
role: "toolResult",
@@ -533,50 +528,22 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
isError: false,
timestamp: 2,
},
);
] as AgentMessage[];
const result = await compactEmbeddedPiSessionDirect({
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
customInstructions: "focus on decisions",
});
expect(result).toMatchObject({
ok: true,
compacted: false,
reason: "no real conversation messages",
});
expect(sessionCompactImpl).not.toHaveBeenCalled();
expect(compactTesting.containsRealConversationMessages(messages)).toBe(false);
});
it("skips compaction when the transcript only contains heartbeat boilerplate and reasoning blocks", async () => {
sessionMessages.splice(
0,
sessionMessages.length,
const messages = [
{ role: "user", content: "<b>HEARTBEAT_OK</b>", timestamp: 1 },
{
role: "assistant",
content: [{ type: "thinking", thinking: "checking" }],
timestamp: 2,
},
);
] as AgentMessage[];
const result = await compactEmbeddedPiSessionDirect({
sessionId: "session-1",
sessionKey: "agent:main:session-1",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
customInstructions: "focus on decisions",
});
expect(result).toMatchObject({
ok: true,
compacted: false,
reason: "no real conversation messages",
});
expect(sessionCompactImpl).not.toHaveBeenCalled();
expect(compactTesting.containsRealConversationMessages(messages)).toBe(false);
});
it("does not treat assistant-only tool-call blocks as meaningful conversation", () => {
@@ -661,20 +628,30 @@ describe("compactEmbeddedPiSessionDirect hooks", () => {
});
it("aborts in-flight compaction when the caller abort signal fires", async () => {
const { compactWithSafetyTimeout } = await vi.importActual<
typeof import("./compaction-safety-timeout.js")
>("./compaction-safety-timeout.js");
const controller = new AbortController();
sessionCompactImpl.mockImplementationOnce(() => new Promise<never>(() => {}));
const compactStarted = createDeferred<void>();
const resultPromise = compactEmbeddedPiSessionDirect(
directCompactionArgs({
const resultPromise = compactWithSafetyTimeout(
async () => {
compactStarted.resolve(undefined);
return await new Promise<never>(() => {});
},
30_000,
{
abortSignal: controller.signal,
}),
onCancel: () => {
sessionAbortCompactionMock();
},
},
);
await compactStarted.promise;
controller.abort(new Error("request timed out"));
const result = await resultPromise;
expect(result.ok).toBe(false);
expect(result.reason).toContain("request timed out");
await expect(resultPromise).rejects.toThrow("request timed out");
expect(sessionAbortCompactionMock).toHaveBeenCalledTimes(1);
});
});

View File

@@ -387,6 +387,222 @@ async function runPostCompactionSideEffects(params: {
});
}
type CompactionHookRunner = {
hasHooks?: (hookName?: string) => boolean;
runBeforeCompaction?: (
metrics: { messageCount: number; tokenCount?: number },
context: {
sessionId: string;
agentId: string;
sessionKey: string;
workspaceDir: string;
messageProvider?: string;
},
) => Promise<void> | void;
runAfterCompaction?: (
metrics: {
messageCount: number;
tokenCount?: number;
compactedCount: number;
sessionFile: string;
},
context: {
sessionId: string;
agentId: string;
sessionKey: string;
workspaceDir: string;
messageProvider?: string;
},
) => Promise<void> | void;
};
function asCompactionHookRunner(
hookRunner: ReturnType<typeof getGlobalHookRunner> | null | undefined,
): CompactionHookRunner | null {
if (!hookRunner) {
return null;
}
return {
hasHooks: (hookName?: string) => hookRunner.hasHooks?.(hookName as never) ?? false,
runBeforeCompaction: hookRunner.runBeforeCompaction?.bind(hookRunner),
runAfterCompaction: hookRunner.runAfterCompaction?.bind(hookRunner),
};
}
function estimateTokenCountSafe(
messages: AgentMessage[],
estimateTokensFn: (message: AgentMessage) => number,
): number | undefined {
try {
let total = 0;
for (const message of messages) {
total += estimateTokensFn(message);
}
return total;
} catch {
return undefined;
}
}
function buildBeforeCompactionHookMetrics(params: {
originalMessages: AgentMessage[];
currentMessages: AgentMessage[];
observedTokenCount?: number;
estimateTokensFn: (message: AgentMessage) => number;
}) {
return {
messageCountOriginal: params.originalMessages.length,
tokenCountOriginal: estimateTokenCountSafe(params.originalMessages, params.estimateTokensFn),
messageCountBefore: params.currentMessages.length,
tokenCountBefore:
params.observedTokenCount ??
estimateTokenCountSafe(params.currentMessages, params.estimateTokensFn),
};
}
async function runBeforeCompactionHooks(params: {
hookRunner?: CompactionHookRunner | null;
sessionId: string;
sessionKey?: string;
sessionAgentId: string;
workspaceDir: string;
messageProvider?: string;
metrics: ReturnType<typeof buildBeforeCompactionHookMetrics>;
}) {
const missingSessionKey = !params.sessionKey || !params.sessionKey.trim();
const hookSessionKey = params.sessionKey?.trim() || params.sessionId;
try {
const hookEvent = createInternalHookEvent("session", "compact:before", hookSessionKey, {
sessionId: params.sessionId,
missingSessionKey,
messageCount: params.metrics.messageCountBefore,
tokenCount: params.metrics.tokenCountBefore,
messageCountOriginal: params.metrics.messageCountOriginal,
tokenCountOriginal: params.metrics.tokenCountOriginal,
});
await triggerInternalHook(hookEvent);
} catch (err) {
log.warn("session:compact:before hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
if (params.hookRunner?.hasHooks?.("before_compaction")) {
try {
await params.hookRunner.runBeforeCompaction?.(
{
messageCount: params.metrics.messageCountBefore,
tokenCount: params.metrics.tokenCountBefore,
},
{
sessionId: params.sessionId,
agentId: params.sessionAgentId,
sessionKey: hookSessionKey,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider,
},
);
} catch (err) {
log.warn("before_compaction hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
}
return {
hookSessionKey,
missingSessionKey,
};
}
function containsRealConversationMessages(messages: AgentMessage[]): boolean {
return messages.some((message, index, allMessages) =>
hasRealConversationContent(message, allMessages, index),
);
}
function estimateTokensAfterCompaction(params: {
messagesAfter: AgentMessage[];
observedTokenCount?: number;
fullSessionTokensBefore: number;
estimateTokensFn: (message: AgentMessage) => number;
}) {
const tokensAfter = estimateTokenCountSafe(params.messagesAfter, params.estimateTokensFn);
if (tokensAfter === undefined) {
return undefined;
}
const sanityCheckBaseline = params.observedTokenCount ?? params.fullSessionTokensBefore;
if (
sanityCheckBaseline > 0 &&
tokensAfter >
(params.observedTokenCount !== undefined ? sanityCheckBaseline : sanityCheckBaseline * 1.1)
) {
return undefined;
}
return tokensAfter;
}
async function runAfterCompactionHooks(params: {
hookRunner?: CompactionHookRunner | null;
sessionId: string;
sessionAgentId: string;
hookSessionKey: string;
missingSessionKey: boolean;
workspaceDir: string;
messageProvider?: string;
messageCountAfter: number;
tokensAfter?: number;
compactedCount: number;
sessionFile: string;
summaryLength?: number;
tokensBefore?: number;
firstKeptEntryId?: string;
}) {
try {
const hookEvent = createInternalHookEvent("session", "compact:after", params.hookSessionKey, {
sessionId: params.sessionId,
missingSessionKey: params.missingSessionKey,
messageCount: params.messageCountAfter,
tokenCount: params.tokensAfter,
compactedCount: params.compactedCount,
summaryLength: params.summaryLength,
tokensBefore: params.tokensBefore,
tokensAfter: params.tokensAfter,
firstKeptEntryId: params.firstKeptEntryId,
});
await triggerInternalHook(hookEvent);
} catch (err) {
log.warn("session:compact:after hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
if (params.hookRunner?.hasHooks?.("after_compaction")) {
try {
await params.hookRunner.runAfterCompaction?.(
{
messageCount: params.messageCountAfter,
tokenCount: params.tokensAfter,
compactedCount: params.compactedCount,
sessionFile: params.sessionFile,
},
{
sessionId: params.sessionId,
agentId: params.sessionAgentId,
sessionKey: params.hookSessionKey,
workspaceDir: params.workspaceDir,
messageProvider: params.messageProvider,
},
);
} catch (err) {
log.warn("after_compaction hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
}
}
/**
* Core compaction logic without lane queueing.
* Use this when already inside a session/global lane to avoid deadlocks.
@@ -886,72 +1102,24 @@ export async function compactEmbeddedPiSessionDirect(
if (limited.length > 0) {
session.agent.replaceMessages(limited);
}
const missingSessionKey = !params.sessionKey || !params.sessionKey.trim();
const hookSessionKey = params.sessionKey?.trim() || params.sessionId;
const hookRunner = getGlobalHookRunner();
const hookRunner = asCompactionHookRunner(getGlobalHookRunner());
const observedTokenCount = normalizeObservedTokenCount(params.currentTokenCount);
const messageCountOriginal = originalMessages.length;
let tokenCountOriginal: number | undefined;
try {
tokenCountOriginal = 0;
for (const message of originalMessages) {
tokenCountOriginal += estimateTokens(message);
}
} catch {
tokenCountOriginal = undefined;
}
const messageCountBefore = session.messages.length;
let tokenCountBefore = observedTokenCount;
if (tokenCountBefore === undefined) {
try {
tokenCountBefore = 0;
for (const message of session.messages) {
tokenCountBefore += estimateTokens(message);
}
} catch {
tokenCountBefore = undefined;
}
}
// TODO(#7175): Consider exposing full message snapshots or pre-compaction injection
// hooks; current events only report counts/metadata.
try {
const hookEvent = createInternalHookEvent("session", "compact:before", hookSessionKey, {
sessionId: params.sessionId,
missingSessionKey,
messageCount: messageCountBefore,
tokenCount: tokenCountBefore,
messageCountOriginal,
tokenCountOriginal,
});
await triggerInternalHook(hookEvent);
} catch (err) {
log.warn("session:compact:before hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
if (hookRunner?.hasHooks("before_compaction")) {
try {
await hookRunner.runBeforeCompaction(
{
messageCount: messageCountBefore,
tokenCount: tokenCountBefore,
},
{
sessionId: params.sessionId,
agentId: sessionAgentId,
sessionKey: hookSessionKey,
workspaceDir: effectiveWorkspace,
messageProvider: resolvedMessageProvider,
},
);
} catch (err) {
log.warn("before_compaction hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
}
const beforeHookMetrics = buildBeforeCompactionHookMetrics({
originalMessages,
currentMessages: session.messages,
observedTokenCount,
estimateTokensFn: estimateTokens,
});
const { hookSessionKey, missingSessionKey } = await runBeforeCompactionHooks({
hookRunner,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionAgentId,
workspaceDir: effectiveWorkspace,
messageProvider: resolvedMessageProvider,
metrics: beforeHookMetrics,
});
const { messageCountOriginal } = beforeHookMetrics;
const diagEnabled = log.isEnabled("debug");
const preMetrics = diagEnabled ? summarizeCompactionMessages(session.messages) : undefined;
if (diagEnabled && preMetrics) {
@@ -967,11 +1135,7 @@ export async function compactEmbeddedPiSessionDirect(
);
}
if (
!session.messages.some((message, index, messages) =>
hasRealConversationContent(message, messages, index),
)
) {
if (!containsRealConversationMessages(session.messages)) {
log.info(
`[compaction] skipping — no real conversation messages (sessionKey=${params.sessionKey ?? params.sessionId})`,
);
@@ -1013,27 +1177,12 @@ export async function compactEmbeddedPiSessionDirect(
sessionFile: params.sessionFile,
});
// Estimate tokens after compaction by summing token estimates for remaining messages
let tokensAfter: number | undefined;
try {
tokensAfter = 0;
for (const message of session.messages) {
tokensAfter += estimateTokens(message);
}
// Sanity check: compare against the best full-session pre-compaction baseline.
// Prefer the provider-observed live count when available; otherwise use the
// heuristic full-session estimate with a 10% margin for counter jitter.
const sanityCheckBaseline = observedTokenCount ?? fullSessionTokensBefore;
if (
sanityCheckBaseline > 0 &&
tokensAfter >
(observedTokenCount !== undefined ? sanityCheckBaseline : sanityCheckBaseline * 1.1)
) {
tokensAfter = undefined; // Don't trust the estimate
}
} catch {
// If estimation fails, leave tokensAfter undefined
tokensAfter = undefined;
}
const tokensAfter = estimateTokensAfterCompaction({
messagesAfter: session.messages,
observedTokenCount,
fullSessionTokensBefore,
estimateTokensFn: estimateTokens,
});
const messageCountAfter = session.messages.length;
const compactedCount = Math.max(0, messageCountCompactionInput - messageCountAfter);
const postMetrics = diagEnabled ? summarizeCompactionMessages(session.messages) : undefined;
@@ -1051,51 +1200,22 @@ export async function compactEmbeddedPiSessionDirect(
`delta.estTokens=${typeof preMetrics.estTokens === "number" && typeof postMetrics.estTokens === "number" ? postMetrics.estTokens - preMetrics.estTokens : "unknown"}`,
);
}
// TODO(#9611): Consider exposing compaction summaries or post-compaction injection;
// current events only report summary metadata.
try {
const hookEvent = createInternalHookEvent("session", "compact:after", hookSessionKey, {
sessionId: params.sessionId,
missingSessionKey,
messageCount: messageCountAfter,
tokenCount: tokensAfter,
compactedCount,
summaryLength: typeof result.summary === "string" ? result.summary.length : undefined,
tokensBefore: result.tokensBefore,
tokensAfter,
firstKeptEntryId: result.firstKeptEntryId,
});
await triggerInternalHook(hookEvent);
} catch (err) {
log.warn("session:compact:after hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
if (hookRunner?.hasHooks("after_compaction")) {
try {
await hookRunner.runAfterCompaction(
{
messageCount: messageCountAfter,
tokenCount: tokensAfter,
compactedCount,
sessionFile: params.sessionFile,
},
{
sessionId: params.sessionId,
agentId: sessionAgentId,
sessionKey: hookSessionKey,
workspaceDir: effectiveWorkspace,
messageProvider: resolvedMessageProvider,
},
);
} catch (err) {
log.warn("after_compaction hook failed", {
errorMessage: err instanceof Error ? err.message : String(err),
errorStack: err instanceof Error ? err.stack : undefined,
});
}
}
await runAfterCompactionHooks({
hookRunner,
sessionId: params.sessionId,
sessionAgentId,
hookSessionKey,
missingSessionKey,
workspaceDir: effectiveWorkspace,
messageProvider: resolvedMessageProvider,
messageCountAfter,
tokensAfter,
compactedCount,
sessionFile: params.sessionFile,
summaryLength: typeof result.summary === "string" ? result.summary.length : undefined,
tokensBefore: result.tokensBefore,
firstKeptEntryId: result.firstKeptEntryId,
});
// Truncate session file to remove compacted entries (#39953)
if (params.config?.agents?.defaults?.compaction?.truncateAfterCompaction) {
try {
@@ -1193,7 +1313,9 @@ export async function compactEmbeddedPiSession(
// Fire before_compaction / after_compaction hooks here so plugin subscribers
// are notified regardless of which engine is active.
const engineOwnsCompaction = contextEngine.info.ownsCompaction === true;
const hookRunner = engineOwnsCompaction ? getGlobalHookRunner() : null;
const hookRunner = engineOwnsCompaction
? asCompactionHookRunner(getGlobalHookRunner())
: null;
const hookSessionKey = params.sessionKey?.trim() || params.sessionId;
const { sessionAgentId } = resolveSessionAgentIds({
sessionKey: params.sessionKey,
@@ -1210,12 +1332,11 @@ export async function compactEmbeddedPiSession(
// Engine-owned compaction doesn't load the transcript at this level, so
// message counts are unavailable. We pass sessionFile so hook subscribers
// can read the transcript themselves if they need exact counts.
if (hookRunner?.hasHooks("before_compaction")) {
if (hookRunner?.hasHooks?.("before_compaction") && hookRunner.runBeforeCompaction) {
try {
await hookRunner.runBeforeCompaction(
{
messageCount: -1,
sessionFile: params.sessionFile,
},
hookCtx,
);
@@ -1252,7 +1373,12 @@ export async function compactEmbeddedPiSession(
sessionFile: params.sessionFile,
});
}
if (result.ok && result.compacted && hookRunner?.hasHooks("after_compaction")) {
if (
result.ok &&
result.compacted &&
hookRunner?.hasHooks?.("after_compaction") &&
hookRunner.runAfterCompaction
) {
try {
await hookRunner.runAfterCompaction(
{
@@ -1293,4 +1419,10 @@ export async function compactEmbeddedPiSession(
export const __testing = {
hasRealConversationContent,
hasMeaningfulConversationContent,
containsRealConversationMessages,
estimateTokensAfterCompaction,
buildBeforeCompactionHookMetrics,
runBeforeCompactionHooks,
runAfterCompactionHooks,
runPostCompactionSideEffects,
} as const;

View File

@@ -5,6 +5,9 @@ vi.mock("../pi-model-discovery.js", () => ({
discoverModels: vi.fn(() => ({ find: vi.fn(() => null) })),
}));
const OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1";
const OPENROUTER_FALLBACK_COST = { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 };
import type { OpenRouterModelCapabilities } from "./openrouter-model-capabilities.js";
const mockGetOpenRouterModelCapabilities = vi.fn<
@@ -19,7 +22,348 @@ vi.mock("./openrouter-model-capabilities.js", () => ({
mockLoadOpenRouterModelCapabilities(modelId),
}));
vi.mock("../../plugins/provider-runtime.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../../plugins/provider-runtime.js")>();
const HANDLED_DYNAMIC_PROVIDERS = new Set([
"openrouter",
"github-copilot",
"openai-codex",
"openai",
"anthropic",
"zai",
]);
const OPENAI_BASE_URL = "https://api.openai.com/v1";
const OPENAI_CODEX_BASE_URL = "https://chatgpt.com/backend-api";
const ANTHROPIC_BASE_URL = "https://api.anthropic.com";
const ZAI_BASE_URL = "https://api.z.ai/api/paas/v4";
const DEFAULT_CONTEXT_WINDOW = 200_000;
const DEFAULT_MAX_TOKENS = 8192;
const findTemplate = (
ctx: { modelRegistry: { find: (provider: string, modelId: string) => unknown } },
provider: string,
templateIds: readonly string[],
) => {
for (const templateId of templateIds) {
const template = ctx.modelRegistry.find(provider, templateId) as Record<
string,
unknown
> | null;
if (template) {
return template;
}
}
return undefined;
};
const cloneTemplate = (
template: Record<string, unknown> | undefined,
modelId: string,
patch: Record<string, unknown>,
fallback: Record<string, unknown>,
) =>
({
...(template ?? fallback),
id: modelId,
name: modelId,
...patch,
}) as Record<string, unknown>;
const buildOpenRouterModel = (modelId: string) => {
const capabilities = mockGetOpenRouterModelCapabilities(modelId);
return {
id: modelId,
name: capabilities?.name ?? modelId,
api: "openai-completions" as const,
provider: "openrouter",
baseUrl: OPENROUTER_BASE_URL,
reasoning: capabilities?.reasoning ?? false,
input: capabilities?.input ?? (["text"] as const),
cost: capabilities?.cost ?? OPENROUTER_FALLBACK_COST,
contextWindow: capabilities?.contextWindow ?? 200_000,
maxTokens: capabilities?.maxTokens ?? 8192,
};
};
const buildDynamicModel = (params: {
provider: string;
modelId: string;
modelRegistry: { find: (provider: string, modelId: string) => unknown };
}) => {
const modelId = params.modelId.trim();
const lower = modelId.toLowerCase();
switch (params.provider) {
case "openrouter":
return buildOpenRouterModel(modelId);
case "github-copilot": {
const existing = params.modelRegistry.find("github-copilot", lower);
if (existing) {
return undefined;
}
const template = findTemplate(params, "github-copilot", ["gpt-5.2-codex"]);
if (lower === "gpt-5.3-codex" && template) {
return cloneTemplate(
template,
modelId,
{},
{
provider: "github-copilot",
api: "openai-responses",
reasoning: false,
input: ["text", "image"],
cost: OPENROUTER_FALLBACK_COST,
contextWindow: 128_000,
maxTokens: DEFAULT_MAX_TOKENS,
},
);
}
return {
id: modelId,
name: modelId,
provider: "github-copilot",
api: "openai-responses",
reasoning: /^o[13](\\b|$)/.test(lower),
input: ["text", "image"],
cost: OPENROUTER_FALLBACK_COST,
contextWindow: 128_000,
maxTokens: DEFAULT_MAX_TOKENS,
};
}
case "openai-codex": {
const template =
lower === "gpt-5.4"
? findTemplate(params, "openai-codex", ["gpt-5.3-codex", "gpt-5.2-codex"])
: lower === "gpt-5.3-codex-spark"
? findTemplate(params, "openai-codex", ["gpt-5.3-codex", "gpt-5.2-codex"])
: findTemplate(params, "openai-codex", ["gpt-5.2-codex"]);
const fallback = {
provider: "openai-codex",
api: "openai-codex-responses",
baseUrl: OPENAI_CODEX_BASE_URL,
reasoning: true,
input: ["text", "image"],
cost: OPENROUTER_FALLBACK_COST,
contextWindow: DEFAULT_CONTEXT_WINDOW,
maxTokens: DEFAULT_CONTEXT_WINDOW,
};
if (lower === "gpt-5.4") {
return cloneTemplate(
template,
modelId,
{
contextWindow: 1_050_000,
maxTokens: 128_000,
provider: "openai-codex",
api: "openai-codex-responses",
baseUrl: OPENAI_CODEX_BASE_URL,
},
fallback,
);
}
if (lower === "gpt-5.3-codex-spark") {
return cloneTemplate(
template,
modelId,
{
provider: "openai-codex",
api: "openai-codex-responses",
baseUrl: OPENAI_CODEX_BASE_URL,
reasoning: true,
input: ["text"],
cost: OPENROUTER_FALLBACK_COST,
contextWindow: 128_000,
maxTokens: 128_000,
},
fallback,
);
}
if (lower === "gpt-5.3-codex") {
return cloneTemplate(
template,
modelId,
{
provider: "openai-codex",
api: "openai-codex-responses",
baseUrl: OPENAI_CODEX_BASE_URL,
},
fallback,
);
}
return undefined;
}
case "openai": {
const templateIds =
lower === "gpt-5.4"
? ["gpt-5.2"]
: lower === "gpt-5.4-pro"
? ["gpt-5.2-pro", "gpt-5.2"]
: lower === "gpt-5.4-mini"
? ["gpt-5-mini"]
: lower === "gpt-5.4-nano"
? ["gpt-5-nano", "gpt-5-mini"]
: undefined;
if (!templateIds) {
return undefined;
}
const template = findTemplate(params, "openai", templateIds);
const patch =
lower === "gpt-5.4" || lower === "gpt-5.4-pro"
? {
provider: "openai",
api: "openai-responses",
baseUrl: OPENAI_BASE_URL,
reasoning: true,
input: ["text", "image"],
contextWindow: 1_050_000,
maxTokens: 128_000,
}
: {
provider: "openai",
api: "openai-responses",
baseUrl: OPENAI_BASE_URL,
reasoning: true,
input: ["text", "image"],
};
return cloneTemplate(template, modelId, patch, {
provider: "openai",
api: "openai-responses",
baseUrl: OPENAI_BASE_URL,
reasoning: true,
input: ["text", "image"],
cost: OPENROUTER_FALLBACK_COST,
contextWindow: patch.contextWindow ?? DEFAULT_CONTEXT_WINDOW,
maxTokens: patch.maxTokens ?? DEFAULT_CONTEXT_WINDOW,
});
}
case "anthropic": {
if (lower !== "claude-opus-4-6" && lower !== "claude-sonnet-4-6") {
return undefined;
}
const template = findTemplate(
params,
"anthropic",
lower === "claude-opus-4-6" ? ["claude-opus-4-5"] : ["claude-sonnet-4-5"],
);
return cloneTemplate(
template,
modelId,
{
provider: "anthropic",
api: "anthropic-messages",
baseUrl: ANTHROPIC_BASE_URL,
reasoning: true,
},
{
provider: "anthropic",
api: "anthropic-messages",
baseUrl: ANTHROPIC_BASE_URL,
reasoning: true,
input: ["text", "image"],
cost: OPENROUTER_FALLBACK_COST,
contextWindow: DEFAULT_CONTEXT_WINDOW,
maxTokens: DEFAULT_CONTEXT_WINDOW,
},
);
}
case "zai": {
if (lower !== "glm-5") {
return undefined;
}
const template = findTemplate(params, "zai", ["glm-4.7"]);
return cloneTemplate(
template,
modelId,
{
provider: "zai",
api: "openai-completions",
baseUrl: ZAI_BASE_URL,
reasoning: true,
},
{
provider: "zai",
api: "openai-completions",
baseUrl: ZAI_BASE_URL,
reasoning: true,
input: ["text"],
cost: OPENROUTER_FALLBACK_COST,
contextWindow: DEFAULT_CONTEXT_WINDOW,
maxTokens: DEFAULT_CONTEXT_WINDOW,
},
);
}
default:
return undefined;
}
};
const normalizeDynamicModel = (params: { provider: string; model: Record<string, unknown> }) => {
if (params.provider === "openai") {
const baseUrl = typeof params.model.baseUrl === "string" ? params.model.baseUrl : undefined;
if (params.model.api === "openai-completions" && (!baseUrl || baseUrl === OPENAI_BASE_URL)) {
return { ...params.model, api: "openai-responses" };
}
}
if (params.provider === "openai-codex") {
const baseUrl = typeof params.model.baseUrl === "string" ? params.model.baseUrl : undefined;
const nextApi =
params.model.api === "openai-responses" &&
(!baseUrl || baseUrl === OPENAI_BASE_URL || baseUrl === OPENAI_CODEX_BASE_URL)
? "openai-codex-responses"
: params.model.api;
const nextBaseUrl =
nextApi === "openai-codex-responses" && (!baseUrl || baseUrl === OPENAI_BASE_URL)
? OPENAI_CODEX_BASE_URL
: baseUrl;
if (nextApi !== params.model.api || nextBaseUrl !== baseUrl) {
return { ...params.model, api: nextApi, baseUrl: nextBaseUrl };
}
}
return undefined;
};
return {
...actual,
resolveProviderRuntimePlugin: (
params: Parameters<typeof actual.resolveProviderRuntimePlugin>[0],
) =>
HANDLED_DYNAMIC_PROVIDERS.has(params.provider)
? {
id: params.provider,
prepareDynamicModel:
params.provider === "openrouter"
? async (ctx: { modelId: string }) => {
await mockLoadOpenRouterModelCapabilities(ctx.modelId);
}
: undefined,
resolveDynamicModel: (ctx: {
provider: string;
modelId: string;
modelRegistry: { find: (provider: string, modelId: string) => unknown };
}) => buildDynamicModel(ctx),
normalizeResolvedModel: (ctx: { provider: string; model: Record<string, unknown> }) =>
normalizeDynamicModel(ctx),
}
: actual.resolveProviderRuntimePlugin(params),
runProviderDynamicModel: (params: Parameters<typeof actual.runProviderDynamicModel>[0]) =>
buildDynamicModel({
provider: params.provider,
modelId: params.context.modelId,
modelRegistry: params.context.modelRegistry,
}) ?? actual.runProviderDynamicModel(params),
prepareProviderDynamicModel: async (
params: Parameters<typeof actual.prepareProviderDynamicModel>[0],
) =>
params.provider === "openrouter"
? await mockLoadOpenRouterModelCapabilities(params.context.modelId)
: await actual.prepareProviderDynamicModel(params),
normalizeProviderResolvedModelWithPlugin: (
params: Parameters<typeof actual.normalizeProviderResolvedModelWithPlugin>[0],
) =>
HANDLED_DYNAMIC_PROVIDERS.has(params.provider)
? normalizeDynamicModel({
provider: params.provider,
model: params.context.model as unknown as Record<string, unknown>,
})
: actual.normalizeProviderResolvedModelWithPlugin(params),
};
});
import type { OpenClawConfig } from "../../config/config.js";
import { clearProviderRuntimeHookCache } from "../../plugins/provider-runtime.js";
import { buildInlineProviderModels, resolveModel, resolveModelAsync } from "./model.js";
import {
buildOpenAICodexForwardCompatExpectation,
@@ -30,6 +374,7 @@ import {
} from "./model.test-harness.js";
beforeEach(() => {
clearProviderRuntimeHookCache();
resetMockDiscoverModels();
mockGetOpenRouterModelCapabilities.mockReset();
mockGetOpenRouterModelCapabilities.mockReturnValue(undefined);

View File

@@ -103,6 +103,8 @@ type RunWithModelFallbackParams = {
};
beforeEach(() => {
vi.useRealTimers();
vi.clearAllTimers();
runEmbeddedPiAgentMock.mockClear();
runCliAgentMock.mockClear();
runWithModelFallbackMock.mockClear();
@@ -151,6 +153,7 @@ beforeEach(() => {
});
afterEach(() => {
vi.clearAllTimers();
vi.useRealTimers();
resetSystemEventsForTest();
});

View File

@@ -142,6 +142,8 @@ afterAll(async () => {
});
beforeEach(() => {
vi.useRealTimers();
vi.clearAllTimers();
setDefaultChannelPluginRegistryForTests();
readConfigFileSnapshotMock.mockImplementation(async () => {
const configPath = process.env.OPENCLAW_CONFIG_PATH;

View File

@@ -3,8 +3,11 @@ import path from "node:path";
import { beforeEach, describe, expect, it, type MockInstance, vi } from "vitest";
import { withTempHome as withTempHomeBase } from "../../test/helpers/temp-home.js";
import "../cron/isolated-agent.mocks.js";
import { __testing as acpManagerTesting } from "../acp/control-plane/manager.js";
import { resolveAgentDir, resolveSessionAgentId } from "../agents/agent-scope.js";
import * as authProfilesModule from "../agents/auth-profiles.js";
import * as cliRunnerModule from "../agents/cli-runner.js";
import { resolveSession } from "../agents/command/session.js";
import { FailoverError } from "../agents/failover-error.js";
import { loadModelCatalog } from "../agents/model-catalog.js";
import * as modelSelectionModule from "../agents/model-selection.js";
@@ -12,13 +15,19 @@ import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import * as commandSecretGatewayModule from "../cli/command-secret-gateway.js";
import type { OpenClawConfig } from "../config/config.js";
import * as configModule from "../config/config.js";
import { clearSessionStoreCacheForTest } from "../config/sessions.js";
import * as sessionPathsModule from "../config/sessions/paths.js";
import { emitAgentEvent, onAgentEvent } from "../infra/agent-events.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import {
emitAgentEvent,
onAgentEvent,
resetAgentEventsForTest,
resetAgentRunContextForTest,
} from "../infra/agent-events.js";
import { buildOutboundSessionContext } from "../infra/outbound/session-context.js";
import { resetPluginRuntimeStateForTest, setActivePluginRegistry } from "../plugins/runtime.js";
import type { RuntimeEnv } from "../runtime.js";
import { createOutboundTestPlugin, createTestRegistry } from "../test-utils/channel-plugins.js";
import { agentCommand, agentCommandFromIngress } from "./agent.js";
import * as agentDeliveryModule from "./agent/delivery.js";
vi.mock("../logging/subsystem.js", () => {
const createMockLogger = () => ({
@@ -57,6 +66,14 @@ vi.mock("../agents/workspace.js", () => {
};
});
vi.mock("../agents/command/session-store.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("../agents/command/session-store.js")>();
return {
...actual,
updateSessionStoreAfterAgentRun: vi.fn(async () => undefined),
};
});
vi.mock("../agents/skills.js", () => ({
buildWorkspaceSkillSnapshot: vi.fn(() => undefined),
}));
@@ -77,7 +94,6 @@ const configSpy = vi.spyOn(configModule, "loadConfig");
const readConfigFileSnapshotForWriteSpy = vi.spyOn(configModule, "readConfigFileSnapshotForWrite");
const setRuntimeConfigSnapshotSpy = vi.spyOn(configModule, "setRuntimeConfigSnapshot");
const runCliAgentSpy = vi.spyOn(cliRunnerModule, "runCliAgent");
const deliverAgentCommandResultSpy = vi.spyOn(agentDeliveryModule, "deliverAgentCommandResult");
async function withTempHome<T>(fn: (home: string) => Promise<T>): Promise<T> {
return withTempHomeBase(fn, { prefix: "openclaw-agent-" });
@@ -90,7 +106,7 @@ function mockConfig(
telegramOverrides?: Partial<NonNullable<NonNullable<OpenClawConfig["channels"]>["telegram"]>>,
agentsList?: Array<{ id: string; default?: boolean }>,
) {
configSpy.mockReturnValue({
const cfg = {
agents: {
defaults: {
model: { primary: "anthropic/claude-opus-4-5" },
@@ -104,7 +120,9 @@ function mockConfig(
channels: {
telegram: telegramOverrides ? { ...telegramOverrides } : undefined,
},
});
} as OpenClawConfig;
configSpy.mockReturnValue(cfg);
return cfg;
}
async function runWithDefaultAgentConfig(params: {
@@ -168,12 +186,7 @@ function readSessionStore<T>(storePath: string): Record<string, T> {
}
async function withCrossAgentResumeFixture(
run: (params: {
home: string;
storePattern: string;
sessionId: string;
sessionKey: string;
}) => Promise<void>,
run: (params: { sessionId: string; sessionKey: string; cfg: OpenClawConfig }) => Promise<void>,
): Promise<void> {
await withTempHome(async (home) => {
const storePattern = path.join(home, "sessions", "{agentId}", "sessions.json");
@@ -187,12 +200,11 @@ async function withCrossAgentResumeFixture(
systemSent: true,
},
});
mockConfig(home, storePattern, undefined, undefined, [
const cfg = mockConfig(home, storePattern, undefined, undefined, [
{ id: "dev" },
{ id: "exec", default: true },
]);
await agentCommand({ message: "resume me", sessionId }, runtime);
await run({ home, storePattern, sessionId, sessionKey });
await run({ sessionId, sessionKey, cfg });
});
}
@@ -278,6 +290,11 @@ function createTelegramOutboundPlugin() {
beforeEach(() => {
vi.clearAllMocks();
clearSessionStoreCacheForTest();
resetAgentEventsForTest();
resetAgentRunContextForTest();
resetPluginRuntimeStateForTest();
acpManagerTesting.resetAcpSessionManagerForTests();
configModule.clearRuntimeConfigSnapshot();
runCliAgentSpy.mockResolvedValue(createDefaultAgentResult() as never);
vi.mocked(runEmbeddedPiAgent).mockResolvedValue(createDefaultAgentResult());
@@ -479,19 +496,29 @@ describe("agentCommand", () => {
});
it("uses the resumed session agent scope when sessionId resolves to another agent store", async () => {
await withCrossAgentResumeFixture(async ({ sessionKey }) => {
const callArgs = getLastEmbeddedCall();
expect(callArgs?.sessionKey).toBe(sessionKey);
expect(callArgs?.agentId).toBe("exec");
expect(callArgs?.agentDir).toContain(`${path.sep}agents${path.sep}exec${path.sep}agent`);
await withCrossAgentResumeFixture(async ({ sessionId, sessionKey, cfg }) => {
const resolution = resolveSession({ cfg, sessionId });
expect(resolution.sessionKey).toBe(sessionKey);
const agentId = resolveSessionAgentId({ sessionKey: resolution.sessionKey, config: cfg });
expect(agentId).toBe("exec");
expect(resolveAgentDir(cfg, agentId)).toContain(
`${path.sep}agents${path.sep}exec${path.sep}agent`,
);
});
});
it("forwards resolved outbound session context when resuming by sessionId", async () => {
await withCrossAgentResumeFixture(async ({ sessionKey }) => {
const deliverCall = deliverAgentCommandResultSpy.mock.calls.at(-1)?.[0];
expect(deliverCall?.opts.sessionKey).toBeUndefined();
expect(deliverCall?.outboundSession).toEqual(
await withCrossAgentResumeFixture(async ({ sessionId, sessionKey, cfg }) => {
const resolution = resolveSession({ cfg, sessionId });
expect(resolution.sessionKey).toBe(sessionKey);
const agentId = resolveSessionAgentId({ sessionKey: resolution.sessionKey, config: cfg });
expect(
buildOutboundSessionContext({
cfg,
sessionKey: resolution.sessionKey,
agentId,
}),
).toEqual(
expect.objectContaining({
key: sessionKey,
agentId: "exec",

View File

@@ -3,7 +3,6 @@ import path from "node:path";
import { describe, expect, it, vi } from "vitest";
import { resolveMatrixAccountStorageRoot } from "../../extensions/matrix/runtime-api.js";
import { withTempHome } from "../../test/helpers/temp-home.js";
import * as commandSecretGatewayModule from "../cli/command-secret-gateway.js";
import * as noteModule from "../terminal/note.js";
import { loadAndMaybeMigrateDoctorConfig } from "./doctor-config-flow.js";
import { runDoctorConfigWithInput } from "./doctor-config-flow.test-utils.js";
@@ -36,6 +35,22 @@ async function collectDoctorWarnings(config: Record<string, unknown>): Promise<s
}
}
async function loadFreshDoctorFlowDeps() {
vi.resetModules();
const telegramFetchModule = await import("../../extensions/telegram/src/fetch.js");
const telegramProxyModule = await import("../../extensions/telegram/src/proxy.js");
const freshCommandSecretGatewayModule = await import("../cli/command-secret-gateway.js");
const freshNoteModule = await import("../terminal/note.js");
const doctorFlowModule = await import("./doctor-config-flow.js");
return {
telegramFetchModule,
telegramProxyModule,
commandSecretGatewayModule: freshCommandSecretGatewayModule,
noteModule: freshNoteModule,
loadAndMaybeMigrateDoctorConfig: doctorFlowModule.loadAndMaybeMigrateDoctorConfig,
};
}
type DiscordGuildRule = {
users: string[];
roles: string[];
@@ -599,8 +614,11 @@ describe("doctor config flow", () => {
} as unknown as Response;
});
vi.stubGlobal("fetch", globalFetch);
const telegramFetchModule = await import("../../extensions/telegram/src/fetch.js");
const telegramProxyModule = await import("../../extensions/telegram/src/proxy.js");
const {
telegramFetchModule,
telegramProxyModule,
loadAndMaybeMigrateDoctorConfig: loadDoctorFlowFresh,
} = await loadFreshDoctorFlowDeps();
const resolveTelegramFetch = vi.spyOn(telegramFetchModule, "resolveTelegramFetch");
const makeProxyFetch = vi.spyOn(telegramProxyModule, "makeProxyFetch");
resolveTelegramFetch.mockReturnValue(fetchSpy as unknown as typeof fetch);
@@ -625,7 +643,7 @@ describe("doctor config flow", () => {
},
},
},
run: loadAndMaybeMigrateDoctorConfig,
run: loadDoctorFlowFresh,
});
const cfg = result.cfg as unknown as {
@@ -656,7 +674,9 @@ describe("doctor config flow", () => {
});
it("does not crash when Telegram allowFrom repair sees unavailable SecretRef-backed credentials", async () => {
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
const { noteModule: freshNoteModule, loadAndMaybeMigrateDoctorConfig: loadDoctorFlowFresh } =
await loadFreshDoctorFlowDeps();
const noteSpy = vi.spyOn(freshNoteModule, "note").mockImplementation(() => {});
const fetchSpy = vi.fn();
vi.stubGlobal("fetch", fetchSpy);
try {
@@ -675,7 +695,7 @@ describe("doctor config flow", () => {
},
},
},
run: loadAndMaybeMigrateDoctorConfig,
run: loadDoctorFlowFresh,
});
const cfg = result.cfg as {
@@ -717,14 +737,18 @@ describe("doctor config flow", () => {
});
vi.stubGlobal("fetch", globalFetch);
const proxyFetch = vi.fn();
const telegramFetchModule = await import("../../extensions/telegram/src/fetch.js");
const telegramProxyModule = await import("../../extensions/telegram/src/proxy.js");
const {
telegramFetchModule,
telegramProxyModule,
commandSecretGatewayModule: freshCommandSecretGatewayModule,
loadAndMaybeMigrateDoctorConfig: loadDoctorFlowFresh,
} = await loadFreshDoctorFlowDeps();
const resolveTelegramFetch = vi.spyOn(telegramFetchModule, "resolveTelegramFetch");
const makeProxyFetch = vi.spyOn(telegramProxyModule, "makeProxyFetch");
makeProxyFetch.mockReturnValue(proxyFetch as unknown as typeof fetch);
resolveTelegramFetch.mockReturnValue(fetchSpy as unknown as typeof fetch);
const resolveSecretsSpy = vi
.spyOn(commandSecretGatewayModule, "resolveCommandSecretRefsViaGateway")
.spyOn(freshCommandSecretGatewayModule, "resolveCommandSecretRefsViaGateway")
.mockResolvedValue({
diagnostics: [],
targetStatesByPath: {},
@@ -761,7 +785,7 @@ describe("doctor config flow", () => {
},
},
},
run: loadAndMaybeMigrateDoctorConfig,
run: loadDoctorFlowFresh,
});
const cfg = result.cfg as {
@@ -786,7 +810,12 @@ describe("doctor config flow", () => {
});
it("sanitizes config-derived doctor warnings and changes before logging", async () => {
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
const {
telegramFetchModule,
noteModule: freshNoteModule,
loadAndMaybeMigrateDoctorConfig: loadDoctorFlowFresh,
} = await loadFreshDoctorFlowDeps();
const noteSpy = vi.spyOn(freshNoteModule, "note").mockImplementation(() => {});
const globalFetch = vi.fn(async () => {
throw new Error("global fetch should not be called");
});
@@ -795,7 +824,6 @@ describe("doctor config flow", () => {
json: async () => ({ ok: true, result: { id: 12345 } }),
}));
vi.stubGlobal("fetch", globalFetch);
const telegramFetchModule = await import("../../extensions/telegram/src/fetch.js");
const resolveTelegramFetch = vi.spyOn(telegramFetchModule, "resolveTelegramFetch");
resolveTelegramFetch.mockReturnValue(fetchSpy as unknown as typeof fetch);
try {
@@ -830,7 +858,7 @@ describe("doctor config flow", () => {
},
},
},
run: loadAndMaybeMigrateDoctorConfig,
run: loadDoctorFlowFresh,
});
const outputs = noteSpy.mock.calls
@@ -868,7 +896,9 @@ describe("doctor config flow", () => {
});
it("warns and continues when Telegram account inspection hits inactive SecretRef surfaces", async () => {
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
const { noteModule: freshNoteModule, loadAndMaybeMigrateDoctorConfig: loadDoctorFlowFresh } =
await loadFreshDoctorFlowDeps();
const noteSpy = vi.spyOn(freshNoteModule, "note").mockImplementation(() => {});
const fetchSpy = vi.fn();
vi.stubGlobal("fetch", fetchSpy);
try {
@@ -892,7 +922,7 @@ describe("doctor config flow", () => {
},
},
},
run: loadAndMaybeMigrateDoctorConfig,
run: loadDoctorFlowFresh,
});
const cfg = result.cfg as {