refactor: repair transcripts by sqlite locator

This commit is contained in:
Peter Steinberger
2026-05-09 16:44:58 +01:00
parent 5de21c87e7
commit eb7db34cba
4 changed files with 131 additions and 82 deletions

View File

@@ -137,9 +137,11 @@ import { buildEmbeddedMessageActionDiscoveryInput } from "./message-action-disco
import { readPiModelContextTokens } from "./model-context-tokens.js";
import { resolveModelAsync } from "./model.js";
import { sanitizeSessionHistory, validateReplayTurns } from "./replay-history.js";
import { shouldUseOpenAIWebSocketTransport } from "./run/attempt.thread-helpers.js";
import { buildEmbeddedSandboxInfo } from "./sandbox-info.js";
import { resolveEmbeddedRunSkillEntries } from "./skills-runtime.js";
import {
resolveEmbeddedAgentApiKey,
resolveEmbeddedAgentBaseStreamFn,
resolveEmbeddedAgentStreamFn,
} from "./stream-resolution.js";
@@ -182,6 +184,8 @@ function createCompactionDiagId(): string {
function prepareCompactionSessionAgent(params: {
session: { agent: { streamFn?: unknown } };
providerStreamFn: unknown;
shouldUseWebSocketTransport: boolean;
wsApiKey?: string;
sessionId: string;
signal: AbortSignal;
effectiveModel: ProviderRuntimeModel;
@@ -199,6 +203,8 @@ function prepareCompactionSessionAgent(params: {
params.session.agent.streamFn = resolveEmbeddedAgentStreamFn({
currentStreamFn: resolveEmbeddedAgentBaseStreamFn({ session: params.session as never }),
providerStreamFn: params.providerStreamFn as never,
shouldUseWebSocketTransport: params.shouldUseWebSocketTransport,
wsApiKey: params.wsApiKey,
sessionId: params.sessionId,
signal: params.signal,
model: params.effectiveModel,
@@ -590,7 +596,7 @@ async function compactEmbeddedPiSessionDirectOnce(
: resolvedWorkspace;
await fs.mkdir(effectiveWorkspace, { recursive: true });
await ensureSessionHeader({
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
sessionId: params.sessionId,
cwd: effectiveWorkspace,
agentId: earlyAgentIds.sessionAgentId,
@@ -717,7 +723,6 @@ async function compactEmbeddedPiSessionDirectOnce(
workspaceDir: effectiveWorkspace,
config: params.config,
abortSignal: runAbortController.signal,
sourceReplyDeliveryMode: params.sourceReplyDeliveryMode,
modelProvider: model.provider,
modelId,
modelCompat: extractModelCompat(effectiveModel),
@@ -945,14 +950,14 @@ async function compactEmbeddedPiSessionDirectOnce(
try {
await repairTranscriptStateIfNeeded({
transcriptPath: params.sessionFile,
transcriptLocator: params.transcriptLocator,
debug: (message) => log.debug(message),
warn: (message) => log.warn(message),
});
const transcriptPolicy = runtimePlan.transcript.resolvePolicy(runtimePlanModelContext);
const sessionManager = guardSessionManager(
openTranscriptSessionManager({
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
sessionId: params.sessionId,
cwd: effectiveWorkspace,
}),
@@ -975,7 +980,7 @@ async function compactEmbeddedPiSessionDirectOnce(
checkpointSnapshot = await captureCompactionCheckpointSnapshotAsync({
agentId: sessionAgentId,
sessionManager,
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
});
compactionSessionManager = sessionManager;
const settingsManager = createPreparedEmbeddedPiSettingsManager({
@@ -1041,6 +1046,23 @@ async function compactEmbeddedPiSessionDirectOnce(
agentDir,
effectiveWorkspace,
});
const shouldUseWebSocketTransport = shouldUseOpenAIWebSocketTransport({
provider,
modelApi: effectiveModel.api,
modelBaseUrl: effectiveModel.baseUrl,
});
const wsApiKey = shouldUseWebSocketTransport
? await resolveEmbeddedAgentApiKey({
provider,
resolvedApiKey: hasRuntimeAuthExchange ? undefined : apiKeyInfo?.apiKey,
authStorage,
})
: undefined;
if (shouldUseWebSocketTransport && !wsApiKey) {
log.warn(
`[ws-stream] no API key for provider=${provider}; keeping compaction HTTP transport`,
);
}
while (true) {
// Rebuild the compaction session on retry so provider wrappers, payload
// shaping, and the embedded system prompt all reflect the fallback level.
@@ -1068,6 +1090,8 @@ async function compactEmbeddedPiSessionDirectOnce(
prepareCompactionSessionAgent({
session,
providerStreamFn,
shouldUseWebSocketTransport,
wsApiKey,
sessionId: params.sessionId,
signal: runAbortController.signal,
effectiveModel,
@@ -1224,7 +1248,7 @@ async function compactEmbeddedPiSessionDirectOnce(
if (params.trigger === "manual") {
try {
const hardenedBoundary = await hardenManualCompactionBoundary({
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
preserveRecentTail:
typeof params.config?.agents?.defaults?.compaction?.keepRecentTokens === "number",
});
@@ -1257,7 +1281,7 @@ async function compactEmbeddedPiSessionDirectOnce(
transcriptRotation = await rotateTranscriptAfterCompaction({
sessionManager: transcriptRotationSessionManager,
agentId: sessionAgentId,
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
});
} catch (err) {
log.warn("[compaction] post-compaction transcript rotation failed", {
@@ -1267,7 +1291,8 @@ async function compactEmbeddedPiSessionDirectOnce(
}
}
const activeSessionId = transcriptRotation.sessionId ?? params.sessionId;
const activeSessionFile = transcriptRotation.sessionFile ?? params.sessionFile;
const activeTranscriptLocator =
transcriptRotation.transcriptLocator ?? params.transcriptLocator;
const activePostLeafId = transcriptRotation.leafId ?? postCompactionLeafId;
if (transcriptRotation.rotated) {
log.info(
@@ -1280,7 +1305,7 @@ async function compactEmbeddedPiSessionDirectOnce(
agentId: sessionAgentId,
sessionId: activeSessionId,
sessionKey: params.sessionKey,
sessionFile: activeSessionFile,
transcriptLocator: activeTranscriptLocator,
});
if (params.config && params.sessionKey && checkpointSnapshot) {
try {
@@ -1296,7 +1321,7 @@ async function compactEmbeddedPiSessionDirectOnce(
firstKeptEntryId: effectiveFirstKeptEntryId,
tokensBefore: observedTokenCount ?? result.tokensBefore,
tokensAfter,
postSessionFile: activeSessionFile,
postTranscriptLocator: activeTranscriptLocator,
postLeafId: activePostLeafId,
postEntryId: activePostLeafId,
createdAt: compactStartedAt,
@@ -1336,7 +1361,7 @@ async function compactEmbeddedPiSessionDirectOnce(
messageCountAfter,
tokensAfter,
compactedCount,
sessionFile: activeSessionFile,
transcriptLocator: activeTranscriptLocator,
summaryLength: typeof result.summary === "string" ? result.summary.length : undefined,
tokensBefore: result.tokensBefore,
firstKeptEntryId: effectiveFirstKeptEntryId,
@@ -1352,7 +1377,7 @@ async function compactEmbeddedPiSessionDirectOnce(
tokensAfter,
details: result.details,
sessionId: transcriptRotation.sessionId,
sessionFile: transcriptRotation.sessionFile,
transcriptLocator: transcriptRotation.transcriptLocator,
},
};
} catch (err) {

View File

@@ -91,6 +91,8 @@ import { stripHistoricalRuntimeContextCustomMessages } from "../../internal-runt
import { resolveModelAuthMode } from "../../model-auth.js";
import { resolveDefaultModelForAgent } from "../../model-selection.js";
import { supportsModelTools } from "../../model-tool-support.js";
import { releaseWsSession } from "../../openai-ws-stream.js";
import { resolveOwnerDisplaySetting } from "../../owner-display.js";
import { createBundleLspToolRuntime } from "../../pi-bundle-lsp-runtime.js";
import {
getOrCreateSessionMcpRuntime,
@@ -160,7 +162,6 @@ import {
import { resolveSystemPromptOverride } from "../../system-prompt-override.js";
import { buildSystemPromptParams } from "../../system-prompt-params.js";
import { buildSystemPromptReport } from "../../system-prompt-report.js";
import { appendModelIdentitySystemPrompt } from "../../system-prompt.js";
import { resolveAgentTimeoutMs } from "../../timeout.js";
import {
buildEmptyExplicitToolAllowlistError,
@@ -318,6 +319,7 @@ import {
composeSystemPromptWithHookContext,
resolveAttemptSpawnWorkspaceDir,
shouldPersistCompletedBootstrapTurn,
shouldUseOpenAIWebSocketTransportForAttempt,
} from "./attempt.thread-helpers.js";
import {
shouldRepairMalformedToolCallArguments,
@@ -1060,7 +1062,6 @@ export async function runEmbeddedAttempt(
modelHasVision: params.model.input?.includes("image") ?? false,
requireExplicitMessageTarget:
params.requireExplicitMessageTarget ?? isSubagentSessionKey(params.sessionKey),
sourceReplyDeliveryMode: params.sourceReplyDeliveryMode,
disableMessageTool: params.disableMessageTool,
agentFilesystem: params.agentFilesystem,
forceMessageTool: params.forceMessageTool,
@@ -1129,7 +1130,7 @@ export async function runEmbeddedAttempt(
bootstrapContextMode: params.bootstrapContextMode,
bootstrapContextRunKind: params.bootstrapContextRunKind ?? "default",
bootstrapMode,
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
hasCompletedBootstrapTranscriptTurn,
resolveBootstrapContextForRun: async () => {
const bootstrapFiles =
@@ -1600,11 +1601,11 @@ export async function runEmbeddedAttempt(
let buildAbortSettlePromise: () => Promise<void> | null = () => null;
try {
await repairTranscriptStateIfNeeded({
transcriptPath: params.sessionFile,
transcriptLocator: params.transcriptLocator,
debug: (message) => log.debug(message),
warn: (message) => log.warn(message),
});
const hadSessionFile = hasSqliteSessionTranscriptEvents({
const hadTranscriptLocator = hasSqliteSessionTranscriptEvents({
agentId: sessionAgentId,
sessionId: params.sessionId,
});
@@ -1620,7 +1621,7 @@ export async function runEmbeddedAttempt(
sessionManager = guardSessionManager(
openTranscriptSessionManager({
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
sessionId: params.sessionId,
cwd: effectiveWorkspace,
}),
@@ -1646,11 +1647,11 @@ export async function runEmbeddedAttempt(
},
);
await runAttemptContextEngineBootstrap({
hadSessionFile,
hadTranscriptLocator,
contextEngine: activeContextEngine,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
sessionManager,
runtimeContext: buildAfterTurnRuntimeContext({
attempt: params,
@@ -1666,7 +1667,7 @@ export async function runEmbeddedAttempt(
sessionAgentId,
sessionId: contextParams.sessionId,
sessionKey: contextParams.sessionKey,
sessionFile: contextParams.sessionFile,
transcriptLocator: contextParams.transcriptLocator,
reason: contextParams.reason,
sessionManager: contextParams.sessionManager as never,
runtimeContext: contextParams.runtimeContext,
@@ -1895,7 +1896,6 @@ export async function runEmbeddedAttempt(
await baseConvertToLlm(normalizeMessagesForLlmBoundary(messages));
}
let prePromptMessageCount = activeSession.messages.length;
let contextEngineAfterTurnCheckpoint: number | null = null;
let unwindowedContextEngineMessagesForPrecheck: AgentMessage[] | undefined;
let contextEnginePromptAuthority: NonNullable<AssembleResult["promptAuthority"]> =
"assembled";
@@ -1979,13 +1979,10 @@ export async function runEmbeddedAttempt(
contextEngine: activeContextEngine,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
tokenBudget: params.contextTokenBudget,
modelId: params.modelId,
getPrePromptMessageCount: () => prePromptMessageCount,
onAfterTurnCheckpoint: (messageCount) => {
contextEngineAfterTurnCheckpoint = messageCount;
},
getRuntimeContext: ({ messages, prePromptMessageCount: loopPrePromptMessageCount }) =>
buildAfterTurnRuntimeContext({
attempt: params,
@@ -2041,7 +2038,7 @@ export async function runEmbeddedAttempt(
runId: params.runId,
sessionId: activeSession.sessionId,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
provider: params.provider,
modelId: params.modelId,
modelApi: params.model.api,
@@ -2050,7 +2047,7 @@ export async function runEmbeddedAttempt(
});
trajectoryRecorder?.recordEvent("session.started", {
trigger: params.trigger,
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
workspaceDir: effectiveWorkspace,
agentId: sessionAgentId,
messageProvider: params.messageProvider,
@@ -2064,7 +2061,7 @@ export async function runEmbeddedAttempt(
env: process.env,
config: params.config,
workspaceDir: effectiveWorkspace,
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
sessionKey: params.sessionKey,
agentId: sessionAgentId,
trigger: params.trigger,
@@ -2133,15 +2130,38 @@ export async function runEmbeddedAttempt(
agentDir,
workspaceDir: effectiveWorkspace,
});
const shouldUseWebSocketTransport = shouldUseOpenAIWebSocketTransportForAttempt({
provider: params.provider,
modelApi: params.model.api,
modelBaseUrl: params.model.baseUrl,
streamParams: params.streamParams,
effectiveExtraParams,
modelParams: (params.model as { params?: Record<string, unknown> }).params,
});
const wsApiKey = shouldUseWebSocketTransport
? await resolveEmbeddedAgentApiKey({
provider: params.provider,
resolvedApiKey: params.resolvedApiKey,
authStorage: params.authStorage,
})
: undefined;
if (shouldUseWebSocketTransport && !wsApiKey) {
log.warn(
`[ws-stream] no API key for provider=${params.provider}; keeping session-managed HTTP transport`,
);
}
const streamStrategy = describeEmbeddedAgentStreamStrategy({
currentStreamFn: defaultSessionStreamFn,
providerStreamFn,
shouldUseWebSocketTransport,
wsApiKey,
model: params.model,
resolvedApiKey: params.resolvedApiKey,
});
activeSession.agent.streamFn = resolveEmbeddedAgentStreamFn({
currentStreamFn: defaultSessionStreamFn,
providerStreamFn,
shouldUseWebSocketTransport,
wsApiKey,
sessionId: params.sessionId,
signal: runAbortController.signal,
model: params.model,
@@ -2827,7 +2847,7 @@ export async function runEmbeddedAttempt(
let messagesSnapshot: AgentMessage[] = [];
let sessionIdUsed = activeSession.sessionId;
let sessionFileUsed: string | undefined = params.sessionFile;
let transcriptLocatorUsed: string | undefined = params.transcriptLocator;
const onAbort = () => {
externalAbort = true;
const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined;
@@ -2871,7 +2891,7 @@ export async function runEmbeddedAttempt(
`effectiveReserveTokens=${request.effectiveReserveTokens} ` +
`prePromptMessageCount=${prePromptMessageCount} ` +
(extra ? `${extra} ` : "") +
`sessionFile=${params.sessionFile}`,
`transcriptLocator=${params.transcriptLocator}`,
);
};
if (request.route === "truncate_tool_results_only") {
@@ -2886,7 +2906,7 @@ export async function runEmbeddedAttempt(
contextWindowTokens: contextTokenBudget,
maxCharsOverride: toolResultMaxChars,
agentId: sessionAgentId,
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
});
@@ -2994,14 +3014,6 @@ export async function runEmbeddedAttempt(
);
}
}
const modelAwareSystemPrompt = appendModelIdentitySystemPrompt({
systemPrompt: systemPromptText,
model: runtimeInfo.model,
});
if (modelAwareSystemPrompt !== systemPromptText) {
applySystemPromptOverrideToSession(activeSession, modelAwareSystemPrompt);
systemPromptText = modelAwareSystemPrompt;
}
if (cacheObservabilityEnabled) {
const cacheObservation = beginPromptCacheObservation({
@@ -3377,7 +3389,7 @@ export async function runEmbeddedAttempt(
`historyImageBlocks=${sessionSummary.totalImageBlocks} ` +
`systemPromptChars=${systemLen} promptChars=${promptLen} ` +
`promptImages=${imageResult.images.length} ` +
`provider=${params.provider}/${params.modelId} sessionFile=${params.sessionFile}`,
`provider=${params.provider}/${params.modelId} transcriptLocator=${params.transcriptLocator}`,
);
}
@@ -3440,7 +3452,7 @@ export async function runEmbeddedAttempt(
contextWindowTokens: contextTokenBudget,
maxCharsOverride: toolResultMaxChars,
agentId: sessionAgentId,
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
});
@@ -3459,7 +3471,7 @@ export async function runEmbeddedAttempt(
`overflowTokens=${preemptiveCompaction.overflowTokens} ` +
`toolResultReducibleChars=${preemptiveCompaction.toolResultReducibleChars} ` +
`effectiveReserveTokens=${preemptiveCompaction.effectiveReserveTokens} ` +
`sessionFile=${params.sessionFile}`,
`transcriptLocator=${params.transcriptLocator}`,
);
skipPromptSubmission = true;
}
@@ -3467,7 +3479,7 @@ export async function runEmbeddedAttempt(
log.warn(
`[context-overflow-precheck] early tool-result truncation did not help for ` +
`${params.provider}/${params.modelId}; falling back to compaction ` +
`reason=${truncationResult.reason ?? "unknown"} sessionFile=${params.sessionFile}`,
`reason=${truncationResult.reason ?? "unknown"} transcriptLocator=${params.transcriptLocator}`,
);
preflightRecovery = { route: "compact_only" };
promptError = new Error(PREEMPTIVE_OVERFLOW_ERROR_TEXT);
@@ -3492,7 +3504,7 @@ export async function runEmbeddedAttempt(
`toolResultReducibleChars=${preemptiveCompaction.toolResultReducibleChars} ` +
`reserveTokens=${reserveTokens} ` +
`effectiveReserveTokens=${preemptiveCompaction.effectiveReserveTokens} ` +
`sessionFile=${params.sessionFile}`,
`transcriptLocator=${params.transcriptLocator}`,
);
skipPromptSubmission = true;
}
@@ -3761,9 +3773,9 @@ export async function runEmbeddedAttempt(
yieldAborted,
sessionIdUsed,
sessionKey: params.sessionKey,
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
messagesSnapshot,
prePromptMessageCount: contextEngineAfterTurnCheckpoint ?? prePromptMessageCount,
prePromptMessageCount,
tokenBudget: params.contextTokenBudget,
runtimeContext: afterTurnRuntimeContext,
runMaintenance: async (contextParams) =>
@@ -3772,7 +3784,7 @@ export async function runEmbeddedAttempt(
sessionAgentId,
sessionId: contextParams.sessionId,
sessionKey: contextParams.sessionKey,
sessionFile: contextParams.sessionFile,
transcriptLocator: contextParams.transcriptLocator,
reason: contextParams.reason,
sessionManager: contextParams.sessionManager as never,
runtimeContext: contextParams.runtimeContext,
@@ -3818,11 +3830,11 @@ export async function runEmbeddedAttempt(
const rotation = await rotateTranscriptAfterCompaction({
sessionManager,
agentId: params.agentId,
sessionFile: params.sessionFile,
transcriptLocator: params.transcriptLocator,
});
if (rotation.rotated) {
sessionIdUsed = rotation.sessionId ?? sessionIdUsed;
sessionFileUsed = rotation.sessionFile ?? sessionFileUsed;
transcriptLocatorUsed = rotation.transcriptLocator ?? transcriptLocatorUsed;
log.info(
`[compaction] rotated active transcript after automatic compaction ` +
`(sessionKey=${params.sessionKey ?? params.sessionId})`,
@@ -4074,7 +4086,7 @@ export async function runEmbeddedAttempt(
promptErrorSource,
preflightRecovery,
sessionIdUsed,
sessionFileUsed,
transcriptLocatorUsed,
diagnosticTrace,
bootstrapPromptWarningSignaturesSeen: bootstrapPromptWarning.warningSignaturesSeen,
bootstrapPromptWarningSignature: bootstrapPromptWarning.signature,
@@ -4157,6 +4169,10 @@ export async function runEmbeddedAttempt(
flushPendingToolResultsAfterIdle,
session,
sessionManager,
releaseWsSession,
allowWsSessionPool:
!promptError && !aborted && !timedOut && !idleTimedOut && !timedOutDuringCompaction,
sessionId: params.sessionId,
bundleMcpRuntime,
bundleLspRuntime,
aborted: cleanupAborted,

View File

@@ -2,6 +2,7 @@ import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { createSqliteSessionTranscriptLocator } from "../config/sessions/paths.js";
import {
exportSqliteSessionTranscriptJsonl,
replaceSqliteSessionTranscriptEvents,
@@ -37,12 +38,20 @@ const tempDirs: string[] = [];
async function createTempSessionPath() {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-repair-"));
tempDirs.push(dir);
return { dir, file: path.join(dir, "session.jsonl") };
vi.stubEnv("OPENCLAW_STATE_DIR", dir);
return {
dir,
file: createSqliteSessionTranscriptLocator({
agentId: "main",
sessionId: "session-1",
}),
};
}
afterEach(async () => {
closeOpenClawAgentDatabasesForTest();
closeOpenClawStateDatabaseForTest();
vi.unstubAllEnvs();
await Promise.all(tempDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true })));
});
@@ -55,7 +64,7 @@ function writeTranscriptEvents(file: string, events: unknown[]) {
(event as { type?: unknown }).type === "session" &&
typeof (event as { id?: unknown }).id === "string",
),
)?.id ?? path.basename(file, ".jsonl");
)?.id ?? "session-1";
replaceSqliteSessionTranscriptEvents({
agentId: "main",
sessionId,
@@ -80,7 +89,7 @@ describe("repairTranscriptStateIfNeeded", () => {
{ type: "message", id: "corrupt", message: { role: null, content: "bad" } },
]);
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file });
expect(result.repaired).toBe(true);
expect(result.droppedLines).toBe(1);
@@ -99,7 +108,7 @@ describe("repairTranscriptStateIfNeeded", () => {
writeTranscriptEvents(file, [badHeader]);
const warn = vi.fn();
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file, warn });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file, warn });
expect(result.repaired).toBe(false);
expect(result.reason).toBe("invalid session header");
@@ -111,7 +120,7 @@ describe("repairTranscriptStateIfNeeded", () => {
const { dir } = await createTempSessionPath();
const warn = vi.fn();
const result = await repairTranscriptStateIfNeeded({ transcriptPath: dir, warn });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: dir, warn });
expect(result.repaired).toBe(false);
expect(result.reason).toBe("missing SQLite transcript");
@@ -148,7 +157,7 @@ describe("repairTranscriptStateIfNeeded", () => {
writeTranscriptEvents(file, [header, message, poisonedAssistantEntry, followUp]);
const debug = vi.fn();
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file, debug });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file, debug });
expect(result.repaired).toBe(true);
expect(result.droppedLines).toBe(0);
@@ -185,7 +194,7 @@ describe("repairTranscriptStateIfNeeded", () => {
writeTranscriptEvents(file, [header, blankUserEntry, message]);
const debug = vi.fn();
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file, debug });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file, debug });
expect(result.repaired).toBe(true);
expect(result.rewrittenUserMessages).toBe(1);
@@ -217,7 +226,7 @@ describe("repairTranscriptStateIfNeeded", () => {
};
writeTranscriptEvents(file, [header, blankStringUserEntry, message]);
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file });
expect(result.repaired).toBe(true);
expect(result.rewrittenUserMessages).toBe(1);
@@ -247,7 +256,7 @@ describe("repairTranscriptStateIfNeeded", () => {
};
writeTranscriptEvents(file, [header, mediaUserEntry]);
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file });
expect(result.repaired).toBe(true);
expect(result.rewrittenUserMessages).toBe(1);
@@ -283,7 +292,7 @@ describe("repairTranscriptStateIfNeeded", () => {
]);
const debug = vi.fn();
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file, debug });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file, debug });
expect(result.repaired).toBe(true);
expect(result.droppedLines).toBe(1);
@@ -321,7 +330,7 @@ describe("repairTranscriptStateIfNeeded", () => {
};
writeTranscriptEvents(file, [header, silentReplyEntry, followUp]);
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file });
expect(result.repaired).toBe(false);
expect(result.rewrittenAssistantMessages ?? 0).toBe(0);
@@ -346,7 +355,7 @@ describe("repairTranscriptStateIfNeeded", () => {
};
writeTranscriptEvents(file, [header, message, assistantEntry]);
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file });
expect(result.repaired).toBe(false);
@@ -382,7 +391,7 @@ describe("repairTranscriptStateIfNeeded", () => {
};
writeTranscriptEvents(file, [header, message, assistantEntry1, assistantEntry2]);
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file });
expect(result.repaired).toBe(false);
@@ -414,7 +423,7 @@ describe("repairTranscriptStateIfNeeded", () => {
};
writeTranscriptEvents(file, [header, message, assistantEntry, userFollowUp]);
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file });
expect(result.repaired).toBe(false);
});
@@ -438,7 +447,7 @@ describe("repairTranscriptStateIfNeeded", () => {
};
writeTranscriptEvents(file, [header, message, toolCallAssistant]);
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file });
expect(result.repaired).toBe(false);
const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(toolCallAssistant)}\n`;
@@ -473,7 +482,7 @@ describe("repairTranscriptStateIfNeeded", () => {
};
writeTranscriptEvents(file, [header, message, toolCallAssistant, plainAssistant]);
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file });
expect(result.repaired).toBe(false);
@@ -527,7 +536,7 @@ describe("repairTranscriptStateIfNeeded", () => {
};
writeTranscriptEvents(file, [header, message, toolCallAssistant, toolResult, finalAssistant]);
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file });
expect(result.repaired).toBe(false);
@@ -552,7 +561,7 @@ describe("repairTranscriptStateIfNeeded", () => {
};
writeTranscriptEvents(file, [header, assistantEntry]);
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file });
expect(result.repaired).toBe(false);
@@ -589,7 +598,7 @@ describe("repairTranscriptStateIfNeeded", () => {
};
writeTranscriptEvents(file, [header, healedEntry, followUp]);
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file });
expect(result.repaired).toBe(false);
expect(result.rewrittenAssistantMessages ?? 0).toBe(0);
@@ -626,7 +635,7 @@ describe("repairTranscriptStateIfNeeded", () => {
writeTranscriptEvents(file, [header, message, nullRoleEntry, missingRoleEntry, emptyRoleEntry]);
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file });
expect(result.repaired).toBe(true);
expect(result.droppedLines).toBe(3);
@@ -659,7 +668,7 @@ describe("repairTranscriptStateIfNeeded", () => {
writeTranscriptEvents(file, [header, message, missingMessage, stringMessage]);
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file });
expect(result.repaired).toBe(true);
expect(result.droppedLines).toBe(2);
@@ -689,7 +698,7 @@ describe("repairTranscriptStateIfNeeded", () => {
writeTranscriptEvents(file, [header, message, summary, custom]);
const result = await repairTranscriptStateIfNeeded({ transcriptPath: file });
const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file });
expect(result.repaired).toBe(false);
expect(result.droppedLines).toBe(0);

View File

@@ -1,4 +1,3 @@
import path from "node:path";
import {
loadSqliteSessionTranscriptEvents,
replaceSqliteSessionTranscriptEvents,
@@ -186,16 +185,16 @@ function buildRepairSummaryParts(params: {
}
export async function repairTranscriptStateIfNeeded(params: {
transcriptPath: string;
transcriptLocator: string;
debug?: (message: string) => void;
warn?: (message: string) => void;
}): Promise<RepairReport> {
const transcriptPath = params.transcriptPath.trim();
if (!transcriptPath) {
const transcriptLocator = params.transcriptLocator.trim();
if (!transcriptLocator) {
return { repaired: false, droppedLines: 0, reason: "missing session transcript" };
}
const scope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath });
const scope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath: transcriptLocator });
if (!scope) {
return { repaired: false, droppedLines: 0, reason: "missing SQLite transcript" };
}
@@ -246,7 +245,7 @@ export async function repairTranscriptStateIfNeeded(params: {
if (!isSessionHeader(entries[0])) {
params.warn?.(
`session transcript repair skipped: invalid session header (${path.basename(transcriptPath)})`,
`session transcript repair skipped: invalid session header (${transcriptLocator})`,
);
return { repaired: false, droppedLines, reason: "invalid session header" };
}
@@ -263,7 +262,7 @@ export async function repairTranscriptStateIfNeeded(params: {
try {
replaceSqliteSessionTranscriptEvents({
...scope,
transcriptPath,
transcriptPath: transcriptLocator,
events: entries,
});
} catch (err) {
@@ -283,7 +282,7 @@ export async function repairTranscriptStateIfNeeded(params: {
rewrittenAssistantMessages,
droppedBlankUserMessages,
rewrittenUserMessages,
})} (${path.basename(transcriptPath)})`,
})} (${transcriptLocator})`,
);
return {
repaired: true,