From eb7db34cba56e145ea778c054642cfffe7059480 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 9 May 2026 16:44:58 +0100 Subject: [PATCH] refactor: repair transcripts by sqlite locator --- src/agents/pi-embedded-runner/compact.ts | 49 +++++++--- src/agents/pi-embedded-runner/run/attempt.ts | 96 ++++++++++++-------- src/agents/transcript-state-repair.test.ts | 53 ++++++----- src/agents/transcript-state-repair.ts | 15 ++- 4 files changed, 131 insertions(+), 82 deletions(-) diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 630878814a1..766a7d2ad1d 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -137,9 +137,11 @@ import { buildEmbeddedMessageActionDiscoveryInput } from "./message-action-disco import { readPiModelContextTokens } from "./model-context-tokens.js"; import { resolveModelAsync } from "./model.js"; import { sanitizeSessionHistory, validateReplayTurns } from "./replay-history.js"; +import { shouldUseOpenAIWebSocketTransport } from "./run/attempt.thread-helpers.js"; import { buildEmbeddedSandboxInfo } from "./sandbox-info.js"; import { resolveEmbeddedRunSkillEntries } from "./skills-runtime.js"; import { + resolveEmbeddedAgentApiKey, resolveEmbeddedAgentBaseStreamFn, resolveEmbeddedAgentStreamFn, } from "./stream-resolution.js"; @@ -182,6 +184,8 @@ function createCompactionDiagId(): string { function prepareCompactionSessionAgent(params: { session: { agent: { streamFn?: unknown } }; providerStreamFn: unknown; + shouldUseWebSocketTransport: boolean; + wsApiKey?: string; sessionId: string; signal: AbortSignal; effectiveModel: ProviderRuntimeModel; @@ -199,6 +203,8 @@ function prepareCompactionSessionAgent(params: { params.session.agent.streamFn = resolveEmbeddedAgentStreamFn({ currentStreamFn: resolveEmbeddedAgentBaseStreamFn({ session: params.session as never }), providerStreamFn: params.providerStreamFn as never, + shouldUseWebSocketTransport: params.shouldUseWebSocketTransport, + wsApiKey: params.wsApiKey, sessionId: params.sessionId, signal: params.signal, model: params.effectiveModel, @@ -590,7 +596,7 @@ async function compactEmbeddedPiSessionDirectOnce( : resolvedWorkspace; await fs.mkdir(effectiveWorkspace, { recursive: true }); await ensureSessionHeader({ - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, sessionId: params.sessionId, cwd: effectiveWorkspace, agentId: earlyAgentIds.sessionAgentId, @@ -717,7 +723,6 @@ async function compactEmbeddedPiSessionDirectOnce( workspaceDir: effectiveWorkspace, config: params.config, abortSignal: runAbortController.signal, - sourceReplyDeliveryMode: params.sourceReplyDeliveryMode, modelProvider: model.provider, modelId, modelCompat: extractModelCompat(effectiveModel), @@ -945,14 +950,14 @@ async function compactEmbeddedPiSessionDirectOnce( try { await repairTranscriptStateIfNeeded({ - transcriptPath: params.sessionFile, + transcriptLocator: params.transcriptLocator, debug: (message) => log.debug(message), warn: (message) => log.warn(message), }); const transcriptPolicy = runtimePlan.transcript.resolvePolicy(runtimePlanModelContext); const sessionManager = guardSessionManager( openTranscriptSessionManager({ - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, sessionId: params.sessionId, cwd: effectiveWorkspace, }), @@ -975,7 +980,7 @@ async function compactEmbeddedPiSessionDirectOnce( checkpointSnapshot = await captureCompactionCheckpointSnapshotAsync({ agentId: sessionAgentId, sessionManager, - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, }); compactionSessionManager = sessionManager; const settingsManager = createPreparedEmbeddedPiSettingsManager({ @@ -1041,6 +1046,23 @@ async function compactEmbeddedPiSessionDirectOnce( agentDir, effectiveWorkspace, }); + const shouldUseWebSocketTransport = shouldUseOpenAIWebSocketTransport({ + provider, + modelApi: effectiveModel.api, + modelBaseUrl: effectiveModel.baseUrl, + }); + const wsApiKey = shouldUseWebSocketTransport + ? await resolveEmbeddedAgentApiKey({ + provider, + resolvedApiKey: hasRuntimeAuthExchange ? undefined : apiKeyInfo?.apiKey, + authStorage, + }) + : undefined; + if (shouldUseWebSocketTransport && !wsApiKey) { + log.warn( + `[ws-stream] no API key for provider=${provider}; keeping compaction HTTP transport`, + ); + } while (true) { // Rebuild the compaction session on retry so provider wrappers, payload // shaping, and the embedded system prompt all reflect the fallback level. @@ -1068,6 +1090,8 @@ async function compactEmbeddedPiSessionDirectOnce( prepareCompactionSessionAgent({ session, providerStreamFn, + shouldUseWebSocketTransport, + wsApiKey, sessionId: params.sessionId, signal: runAbortController.signal, effectiveModel, @@ -1224,7 +1248,7 @@ async function compactEmbeddedPiSessionDirectOnce( if (params.trigger === "manual") { try { const hardenedBoundary = await hardenManualCompactionBoundary({ - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, preserveRecentTail: typeof params.config?.agents?.defaults?.compaction?.keepRecentTokens === "number", }); @@ -1257,7 +1281,7 @@ async function compactEmbeddedPiSessionDirectOnce( transcriptRotation = await rotateTranscriptAfterCompaction({ sessionManager: transcriptRotationSessionManager, agentId: sessionAgentId, - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, }); } catch (err) { log.warn("[compaction] post-compaction transcript rotation failed", { @@ -1267,7 +1291,8 @@ async function compactEmbeddedPiSessionDirectOnce( } } const activeSessionId = transcriptRotation.sessionId ?? params.sessionId; - const activeSessionFile = transcriptRotation.sessionFile ?? params.sessionFile; + const activeTranscriptLocator = + transcriptRotation.transcriptLocator ?? params.transcriptLocator; const activePostLeafId = transcriptRotation.leafId ?? postCompactionLeafId; if (transcriptRotation.rotated) { log.info( @@ -1280,7 +1305,7 @@ async function compactEmbeddedPiSessionDirectOnce( agentId: sessionAgentId, sessionId: activeSessionId, sessionKey: params.sessionKey, - sessionFile: activeSessionFile, + transcriptLocator: activeTranscriptLocator, }); if (params.config && params.sessionKey && checkpointSnapshot) { try { @@ -1296,7 +1321,7 @@ async function compactEmbeddedPiSessionDirectOnce( firstKeptEntryId: effectiveFirstKeptEntryId, tokensBefore: observedTokenCount ?? result.tokensBefore, tokensAfter, - postSessionFile: activeSessionFile, + postTranscriptLocator: activeTranscriptLocator, postLeafId: activePostLeafId, postEntryId: activePostLeafId, createdAt: compactStartedAt, @@ -1336,7 +1361,7 @@ async function compactEmbeddedPiSessionDirectOnce( messageCountAfter, tokensAfter, compactedCount, - sessionFile: activeSessionFile, + transcriptLocator: activeTranscriptLocator, summaryLength: typeof result.summary === "string" ? result.summary.length : undefined, tokensBefore: result.tokensBefore, firstKeptEntryId: effectiveFirstKeptEntryId, @@ -1352,7 +1377,7 @@ async function compactEmbeddedPiSessionDirectOnce( tokensAfter, details: result.details, sessionId: transcriptRotation.sessionId, - sessionFile: transcriptRotation.sessionFile, + transcriptLocator: transcriptRotation.transcriptLocator, }, }; } catch (err) { diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index ded748396b2..6451e32538c 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -91,6 +91,8 @@ import { stripHistoricalRuntimeContextCustomMessages } from "../../internal-runt import { resolveModelAuthMode } from "../../model-auth.js"; import { resolveDefaultModelForAgent } from "../../model-selection.js"; import { supportsModelTools } from "../../model-tool-support.js"; +import { releaseWsSession } from "../../openai-ws-stream.js"; +import { resolveOwnerDisplaySetting } from "../../owner-display.js"; import { createBundleLspToolRuntime } from "../../pi-bundle-lsp-runtime.js"; import { getOrCreateSessionMcpRuntime, @@ -160,7 +162,6 @@ import { import { resolveSystemPromptOverride } from "../../system-prompt-override.js"; import { buildSystemPromptParams } from "../../system-prompt-params.js"; import { buildSystemPromptReport } from "../../system-prompt-report.js"; -import { appendModelIdentitySystemPrompt } from "../../system-prompt.js"; import { resolveAgentTimeoutMs } from "../../timeout.js"; import { buildEmptyExplicitToolAllowlistError, @@ -318,6 +319,7 @@ import { composeSystemPromptWithHookContext, resolveAttemptSpawnWorkspaceDir, shouldPersistCompletedBootstrapTurn, + shouldUseOpenAIWebSocketTransportForAttempt, } from "./attempt.thread-helpers.js"; import { shouldRepairMalformedToolCallArguments, @@ -1060,7 +1062,6 @@ export async function runEmbeddedAttempt( modelHasVision: params.model.input?.includes("image") ?? false, requireExplicitMessageTarget: params.requireExplicitMessageTarget ?? isSubagentSessionKey(params.sessionKey), - sourceReplyDeliveryMode: params.sourceReplyDeliveryMode, disableMessageTool: params.disableMessageTool, agentFilesystem: params.agentFilesystem, forceMessageTool: params.forceMessageTool, @@ -1129,7 +1130,7 @@ export async function runEmbeddedAttempt( bootstrapContextMode: params.bootstrapContextMode, bootstrapContextRunKind: params.bootstrapContextRunKind ?? "default", bootstrapMode, - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, hasCompletedBootstrapTranscriptTurn, resolveBootstrapContextForRun: async () => { const bootstrapFiles = @@ -1600,11 +1601,11 @@ export async function runEmbeddedAttempt( let buildAbortSettlePromise: () => Promise | null = () => null; try { await repairTranscriptStateIfNeeded({ - transcriptPath: params.sessionFile, + transcriptLocator: params.transcriptLocator, debug: (message) => log.debug(message), warn: (message) => log.warn(message), }); - const hadSessionFile = hasSqliteSessionTranscriptEvents({ + const hadTranscriptLocator = hasSqliteSessionTranscriptEvents({ agentId: sessionAgentId, sessionId: params.sessionId, }); @@ -1620,7 +1621,7 @@ export async function runEmbeddedAttempt( sessionManager = guardSessionManager( openTranscriptSessionManager({ - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, sessionId: params.sessionId, cwd: effectiveWorkspace, }), @@ -1646,11 +1647,11 @@ export async function runEmbeddedAttempt( }, ); await runAttemptContextEngineBootstrap({ - hadSessionFile, + hadTranscriptLocator, contextEngine: activeContextEngine, sessionId: params.sessionId, sessionKey: params.sessionKey, - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, sessionManager, runtimeContext: buildAfterTurnRuntimeContext({ attempt: params, @@ -1666,7 +1667,7 @@ export async function runEmbeddedAttempt( sessionAgentId, sessionId: contextParams.sessionId, sessionKey: contextParams.sessionKey, - sessionFile: contextParams.sessionFile, + transcriptLocator: contextParams.transcriptLocator, reason: contextParams.reason, sessionManager: contextParams.sessionManager as never, runtimeContext: contextParams.runtimeContext, @@ -1895,7 +1896,6 @@ export async function runEmbeddedAttempt( await baseConvertToLlm(normalizeMessagesForLlmBoundary(messages)); } let prePromptMessageCount = activeSession.messages.length; - let contextEngineAfterTurnCheckpoint: number | null = null; let unwindowedContextEngineMessagesForPrecheck: AgentMessage[] | undefined; let contextEnginePromptAuthority: NonNullable = "assembled"; @@ -1979,13 +1979,10 @@ export async function runEmbeddedAttempt( contextEngine: activeContextEngine, sessionId: params.sessionId, sessionKey: params.sessionKey, - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, tokenBudget: params.contextTokenBudget, modelId: params.modelId, getPrePromptMessageCount: () => prePromptMessageCount, - onAfterTurnCheckpoint: (messageCount) => { - contextEngineAfterTurnCheckpoint = messageCount; - }, getRuntimeContext: ({ messages, prePromptMessageCount: loopPrePromptMessageCount }) => buildAfterTurnRuntimeContext({ attempt: params, @@ -2041,7 +2038,7 @@ export async function runEmbeddedAttempt( runId: params.runId, sessionId: activeSession.sessionId, sessionKey: params.sessionKey, - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, provider: params.provider, modelId: params.modelId, modelApi: params.model.api, @@ -2050,7 +2047,7 @@ export async function runEmbeddedAttempt( }); trajectoryRecorder?.recordEvent("session.started", { trigger: params.trigger, - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, workspaceDir: effectiveWorkspace, agentId: sessionAgentId, messageProvider: params.messageProvider, @@ -2064,7 +2061,7 @@ export async function runEmbeddedAttempt( env: process.env, config: params.config, workspaceDir: effectiveWorkspace, - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, sessionKey: params.sessionKey, agentId: sessionAgentId, trigger: params.trigger, @@ -2133,15 +2130,38 @@ export async function runEmbeddedAttempt( agentDir, workspaceDir: effectiveWorkspace, }); + const shouldUseWebSocketTransport = shouldUseOpenAIWebSocketTransportForAttempt({ + provider: params.provider, + modelApi: params.model.api, + modelBaseUrl: params.model.baseUrl, + streamParams: params.streamParams, + effectiveExtraParams, + modelParams: (params.model as { params?: Record }).params, + }); + const wsApiKey = shouldUseWebSocketTransport + ? await resolveEmbeddedAgentApiKey({ + provider: params.provider, + resolvedApiKey: params.resolvedApiKey, + authStorage: params.authStorage, + }) + : undefined; + if (shouldUseWebSocketTransport && !wsApiKey) { + log.warn( + `[ws-stream] no API key for provider=${params.provider}; keeping session-managed HTTP transport`, + ); + } const streamStrategy = describeEmbeddedAgentStreamStrategy({ currentStreamFn: defaultSessionStreamFn, providerStreamFn, + shouldUseWebSocketTransport, + wsApiKey, model: params.model, - resolvedApiKey: params.resolvedApiKey, }); activeSession.agent.streamFn = resolveEmbeddedAgentStreamFn({ currentStreamFn: defaultSessionStreamFn, providerStreamFn, + shouldUseWebSocketTransport, + wsApiKey, sessionId: params.sessionId, signal: runAbortController.signal, model: params.model, @@ -2827,7 +2847,7 @@ export async function runEmbeddedAttempt( let messagesSnapshot: AgentMessage[] = []; let sessionIdUsed = activeSession.sessionId; - let sessionFileUsed: string | undefined = params.sessionFile; + let transcriptLocatorUsed: string | undefined = params.transcriptLocator; const onAbort = () => { externalAbort = true; const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined; @@ -2871,7 +2891,7 @@ export async function runEmbeddedAttempt( `effectiveReserveTokens=${request.effectiveReserveTokens} ` + `prePromptMessageCount=${prePromptMessageCount} ` + (extra ? `${extra} ` : "") + - `sessionFile=${params.sessionFile}`, + `transcriptLocator=${params.transcriptLocator}`, ); }; if (request.route === "truncate_tool_results_only") { @@ -2886,7 +2906,7 @@ export async function runEmbeddedAttempt( contextWindowTokens: contextTokenBudget, maxCharsOverride: toolResultMaxChars, agentId: sessionAgentId, - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, sessionId: params.sessionId, sessionKey: params.sessionKey, }); @@ -2994,14 +3014,6 @@ export async function runEmbeddedAttempt( ); } } - const modelAwareSystemPrompt = appendModelIdentitySystemPrompt({ - systemPrompt: systemPromptText, - model: runtimeInfo.model, - }); - if (modelAwareSystemPrompt !== systemPromptText) { - applySystemPromptOverrideToSession(activeSession, modelAwareSystemPrompt); - systemPromptText = modelAwareSystemPrompt; - } if (cacheObservabilityEnabled) { const cacheObservation = beginPromptCacheObservation({ @@ -3377,7 +3389,7 @@ export async function runEmbeddedAttempt( `historyImageBlocks=${sessionSummary.totalImageBlocks} ` + `systemPromptChars=${systemLen} promptChars=${promptLen} ` + `promptImages=${imageResult.images.length} ` + - `provider=${params.provider}/${params.modelId} sessionFile=${params.sessionFile}`, + `provider=${params.provider}/${params.modelId} transcriptLocator=${params.transcriptLocator}`, ); } @@ -3440,7 +3452,7 @@ export async function runEmbeddedAttempt( contextWindowTokens: contextTokenBudget, maxCharsOverride: toolResultMaxChars, agentId: sessionAgentId, - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, sessionId: params.sessionId, sessionKey: params.sessionKey, }); @@ -3459,7 +3471,7 @@ export async function runEmbeddedAttempt( `overflowTokens=${preemptiveCompaction.overflowTokens} ` + `toolResultReducibleChars=${preemptiveCompaction.toolResultReducibleChars} ` + `effectiveReserveTokens=${preemptiveCompaction.effectiveReserveTokens} ` + - `sessionFile=${params.sessionFile}`, + `transcriptLocator=${params.transcriptLocator}`, ); skipPromptSubmission = true; } @@ -3467,7 +3479,7 @@ export async function runEmbeddedAttempt( log.warn( `[context-overflow-precheck] early tool-result truncation did not help for ` + `${params.provider}/${params.modelId}; falling back to compaction ` + - `reason=${truncationResult.reason ?? "unknown"} sessionFile=${params.sessionFile}`, + `reason=${truncationResult.reason ?? "unknown"} transcriptLocator=${params.transcriptLocator}`, ); preflightRecovery = { route: "compact_only" }; promptError = new Error(PREEMPTIVE_OVERFLOW_ERROR_TEXT); @@ -3492,7 +3504,7 @@ export async function runEmbeddedAttempt( `toolResultReducibleChars=${preemptiveCompaction.toolResultReducibleChars} ` + `reserveTokens=${reserveTokens} ` + `effectiveReserveTokens=${preemptiveCompaction.effectiveReserveTokens} ` + - `sessionFile=${params.sessionFile}`, + `transcriptLocator=${params.transcriptLocator}`, ); skipPromptSubmission = true; } @@ -3761,9 +3773,9 @@ export async function runEmbeddedAttempt( yieldAborted, sessionIdUsed, sessionKey: params.sessionKey, - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, messagesSnapshot, - prePromptMessageCount: contextEngineAfterTurnCheckpoint ?? prePromptMessageCount, + prePromptMessageCount, tokenBudget: params.contextTokenBudget, runtimeContext: afterTurnRuntimeContext, runMaintenance: async (contextParams) => @@ -3772,7 +3784,7 @@ export async function runEmbeddedAttempt( sessionAgentId, sessionId: contextParams.sessionId, sessionKey: contextParams.sessionKey, - sessionFile: contextParams.sessionFile, + transcriptLocator: contextParams.transcriptLocator, reason: contextParams.reason, sessionManager: contextParams.sessionManager as never, runtimeContext: contextParams.runtimeContext, @@ -3818,11 +3830,11 @@ export async function runEmbeddedAttempt( const rotation = await rotateTranscriptAfterCompaction({ sessionManager, agentId: params.agentId, - sessionFile: params.sessionFile, + transcriptLocator: params.transcriptLocator, }); if (rotation.rotated) { sessionIdUsed = rotation.sessionId ?? sessionIdUsed; - sessionFileUsed = rotation.sessionFile ?? sessionFileUsed; + transcriptLocatorUsed = rotation.transcriptLocator ?? transcriptLocatorUsed; log.info( `[compaction] rotated active transcript after automatic compaction ` + `(sessionKey=${params.sessionKey ?? params.sessionId})`, @@ -4074,7 +4086,7 @@ export async function runEmbeddedAttempt( promptErrorSource, preflightRecovery, sessionIdUsed, - sessionFileUsed, + transcriptLocatorUsed, diagnosticTrace, bootstrapPromptWarningSignaturesSeen: bootstrapPromptWarning.warningSignaturesSeen, bootstrapPromptWarningSignature: bootstrapPromptWarning.signature, @@ -4157,6 +4169,10 @@ export async function runEmbeddedAttempt( flushPendingToolResultsAfterIdle, session, sessionManager, + releaseWsSession, + allowWsSessionPool: + !promptError && !aborted && !timedOut && !idleTimedOut && !timedOutDuringCompaction, + sessionId: params.sessionId, bundleMcpRuntime, bundleLspRuntime, aborted: cleanupAborted, diff --git a/src/agents/transcript-state-repair.test.ts b/src/agents/transcript-state-repair.test.ts index 96f2110b6d7..61fa050f73c 100644 --- a/src/agents/transcript-state-repair.test.ts +++ b/src/agents/transcript-state-repair.test.ts @@ -2,6 +2,7 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; +import { createSqliteSessionTranscriptLocator } from "../config/sessions/paths.js"; import { exportSqliteSessionTranscriptJsonl, replaceSqliteSessionTranscriptEvents, @@ -37,12 +38,20 @@ const tempDirs: string[] = []; async function createTempSessionPath() { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-repair-")); tempDirs.push(dir); - return { dir, file: path.join(dir, "session.jsonl") }; + vi.stubEnv("OPENCLAW_STATE_DIR", dir); + return { + dir, + file: createSqliteSessionTranscriptLocator({ + agentId: "main", + sessionId: "session-1", + }), + }; } afterEach(async () => { closeOpenClawAgentDatabasesForTest(); closeOpenClawStateDatabaseForTest(); + vi.unstubAllEnvs(); await Promise.all(tempDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true }))); }); @@ -55,7 +64,7 @@ function writeTranscriptEvents(file: string, events: unknown[]) { (event as { type?: unknown }).type === "session" && typeof (event as { id?: unknown }).id === "string", ), - )?.id ?? path.basename(file, ".jsonl"); + )?.id ?? "session-1"; replaceSqliteSessionTranscriptEvents({ agentId: "main", sessionId, @@ -80,7 +89,7 @@ describe("repairTranscriptStateIfNeeded", () => { { type: "message", id: "corrupt", message: { role: null, content: "bad" } }, ]); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file }); expect(result.repaired).toBe(true); expect(result.droppedLines).toBe(1); @@ -99,7 +108,7 @@ describe("repairTranscriptStateIfNeeded", () => { writeTranscriptEvents(file, [badHeader]); const warn = vi.fn(); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file, warn }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file, warn }); expect(result.repaired).toBe(false); expect(result.reason).toBe("invalid session header"); @@ -111,7 +120,7 @@ describe("repairTranscriptStateIfNeeded", () => { const { dir } = await createTempSessionPath(); const warn = vi.fn(); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: dir, warn }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: dir, warn }); expect(result.repaired).toBe(false); expect(result.reason).toBe("missing SQLite transcript"); @@ -148,7 +157,7 @@ describe("repairTranscriptStateIfNeeded", () => { writeTranscriptEvents(file, [header, message, poisonedAssistantEntry, followUp]); const debug = vi.fn(); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file, debug }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file, debug }); expect(result.repaired).toBe(true); expect(result.droppedLines).toBe(0); @@ -185,7 +194,7 @@ describe("repairTranscriptStateIfNeeded", () => { writeTranscriptEvents(file, [header, blankUserEntry, message]); const debug = vi.fn(); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file, debug }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file, debug }); expect(result.repaired).toBe(true); expect(result.rewrittenUserMessages).toBe(1); @@ -217,7 +226,7 @@ describe("repairTranscriptStateIfNeeded", () => { }; writeTranscriptEvents(file, [header, blankStringUserEntry, message]); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file }); expect(result.repaired).toBe(true); expect(result.rewrittenUserMessages).toBe(1); @@ -247,7 +256,7 @@ describe("repairTranscriptStateIfNeeded", () => { }; writeTranscriptEvents(file, [header, mediaUserEntry]); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file }); expect(result.repaired).toBe(true); expect(result.rewrittenUserMessages).toBe(1); @@ -283,7 +292,7 @@ describe("repairTranscriptStateIfNeeded", () => { ]); const debug = vi.fn(); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file, debug }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file, debug }); expect(result.repaired).toBe(true); expect(result.droppedLines).toBe(1); @@ -321,7 +330,7 @@ describe("repairTranscriptStateIfNeeded", () => { }; writeTranscriptEvents(file, [header, silentReplyEntry, followUp]); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file }); expect(result.repaired).toBe(false); expect(result.rewrittenAssistantMessages ?? 0).toBe(0); @@ -346,7 +355,7 @@ describe("repairTranscriptStateIfNeeded", () => { }; writeTranscriptEvents(file, [header, message, assistantEntry]); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file }); expect(result.repaired).toBe(false); @@ -382,7 +391,7 @@ describe("repairTranscriptStateIfNeeded", () => { }; writeTranscriptEvents(file, [header, message, assistantEntry1, assistantEntry2]); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file }); expect(result.repaired).toBe(false); @@ -414,7 +423,7 @@ describe("repairTranscriptStateIfNeeded", () => { }; writeTranscriptEvents(file, [header, message, assistantEntry, userFollowUp]); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file }); expect(result.repaired).toBe(false); }); @@ -438,7 +447,7 @@ describe("repairTranscriptStateIfNeeded", () => { }; writeTranscriptEvents(file, [header, message, toolCallAssistant]); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file }); expect(result.repaired).toBe(false); const original = `${JSON.stringify(header)}\n${JSON.stringify(message)}\n${JSON.stringify(toolCallAssistant)}\n`; @@ -473,7 +482,7 @@ describe("repairTranscriptStateIfNeeded", () => { }; writeTranscriptEvents(file, [header, message, toolCallAssistant, plainAssistant]); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file }); expect(result.repaired).toBe(false); @@ -527,7 +536,7 @@ describe("repairTranscriptStateIfNeeded", () => { }; writeTranscriptEvents(file, [header, message, toolCallAssistant, toolResult, finalAssistant]); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file }); expect(result.repaired).toBe(false); @@ -552,7 +561,7 @@ describe("repairTranscriptStateIfNeeded", () => { }; writeTranscriptEvents(file, [header, assistantEntry]); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file }); expect(result.repaired).toBe(false); @@ -589,7 +598,7 @@ describe("repairTranscriptStateIfNeeded", () => { }; writeTranscriptEvents(file, [header, healedEntry, followUp]); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file }); expect(result.repaired).toBe(false); expect(result.rewrittenAssistantMessages ?? 0).toBe(0); @@ -626,7 +635,7 @@ describe("repairTranscriptStateIfNeeded", () => { writeTranscriptEvents(file, [header, message, nullRoleEntry, missingRoleEntry, emptyRoleEntry]); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file }); expect(result.repaired).toBe(true); expect(result.droppedLines).toBe(3); @@ -659,7 +668,7 @@ describe("repairTranscriptStateIfNeeded", () => { writeTranscriptEvents(file, [header, message, missingMessage, stringMessage]); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file }); expect(result.repaired).toBe(true); expect(result.droppedLines).toBe(2); @@ -689,7 +698,7 @@ describe("repairTranscriptStateIfNeeded", () => { writeTranscriptEvents(file, [header, message, summary, custom]); - const result = await repairTranscriptStateIfNeeded({ transcriptPath: file }); + const result = await repairTranscriptStateIfNeeded({ transcriptLocator: file }); expect(result.repaired).toBe(false); expect(result.droppedLines).toBe(0); diff --git a/src/agents/transcript-state-repair.ts b/src/agents/transcript-state-repair.ts index 453a9fff896..06f4d1cb052 100644 --- a/src/agents/transcript-state-repair.ts +++ b/src/agents/transcript-state-repair.ts @@ -1,4 +1,3 @@ -import path from "node:path"; import { loadSqliteSessionTranscriptEvents, replaceSqliteSessionTranscriptEvents, @@ -186,16 +185,16 @@ function buildRepairSummaryParts(params: { } export async function repairTranscriptStateIfNeeded(params: { - transcriptPath: string; + transcriptLocator: string; debug?: (message: string) => void; warn?: (message: string) => void; }): Promise { - const transcriptPath = params.transcriptPath.trim(); - if (!transcriptPath) { + const transcriptLocator = params.transcriptLocator.trim(); + if (!transcriptLocator) { return { repaired: false, droppedLines: 0, reason: "missing session transcript" }; } - const scope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath }); + const scope = resolveSqliteSessionTranscriptScopeForPath({ transcriptPath: transcriptLocator }); if (!scope) { return { repaired: false, droppedLines: 0, reason: "missing SQLite transcript" }; } @@ -246,7 +245,7 @@ export async function repairTranscriptStateIfNeeded(params: { if (!isSessionHeader(entries[0])) { params.warn?.( - `session transcript repair skipped: invalid session header (${path.basename(transcriptPath)})`, + `session transcript repair skipped: invalid session header (${transcriptLocator})`, ); return { repaired: false, droppedLines, reason: "invalid session header" }; } @@ -263,7 +262,7 @@ export async function repairTranscriptStateIfNeeded(params: { try { replaceSqliteSessionTranscriptEvents({ ...scope, - transcriptPath, + transcriptPath: transcriptLocator, events: entries, }); } catch (err) { @@ -283,7 +282,7 @@ export async function repairTranscriptStateIfNeeded(params: { rewrittenAssistantMessages, droppedBlankUserMessages, rewrittenUserMessages, - })} (${path.basename(transcriptPath)})`, + })} (${transcriptLocator})`, ); return { repaired: true,