From ff828639e8be2c8799e8664528e35577fe98a338 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 9 May 2026 09:38:41 +0100 Subject: [PATCH] refactor: remove embedded transcript locator handoff --- docs/refactor/database-first.md | 28 ++-- .../voice-call/src/response-generator.ts | 7 - src/agents/bootstrap-files.ts | 14 +- src/agents/command/attempt-execution.ts | 1 - .../harness/pi-run-worker-policy.test.ts | 1 - src/agents/harness/pi-worker-backend.test.ts | 1 - .../harness/prepared-run-params.test.ts | 2 - src/agents/harness/prepared-run-params.ts | 1 - src/agents/harness/prepared-run.test.ts | 6 - src/agents/harness/prepared-run.ts | 7 - src/agents/harness/worker-launch.test.ts | 2 - .../model-fallback.run-embedded.e2e.test.ts | 24 --- .../pi-embedded-runner.cache.live.test.ts | 5 +- src/agents/pi-embedded-runner.e2e.test.ts | 83 +++++----- ...pi-agent.auth-profile-rotation.e2e.test.ts | 28 ---- .../run.overflow-compaction.harness.ts | 3 +- .../run.overflow-compaction.test.ts | 16 +- src/agents/pi-embedded-runner/run.ts | 21 +-- .../run.worker-launch.test.ts | 16 +- .../run/attempt.context-engine-helpers.ts | 13 +- ....spawn-workspace.context-injection.test.ts | 30 ++-- .../attempt.spawn-workspace.test-support.ts | 21 ++- src/agents/pi-embedded-runner/run/attempt.ts | 62 +++---- src/agents/pi-embedded-runner/run/params.ts | 6 - src/agents/pi-embedded-runner/run/types.ts | 10 +- .../usage-reporting.test.ts | 7 - src/agents/runtime-backend.test.ts | 1 - src/agents/runtime-backend.ts | 2 - src/agents/runtime-worker-permissions.test.ts | 1 - src/agents/runtime-worker.entry.test.ts | 1 - src/agents/runtime-worker.test.ts | 1 - src/agents/transcript-state-repair.ts | 77 ++++++--- src/agents/transcript/session-manager.ts | 69 ++++++++ src/auto-reply/reply/followup-runner.ts | 1 - src/commands/agent.test.ts | 26 ++- src/commands/models/list.probe.ts | 3 - src/commitments/runtime.test.ts | 12 +- src/commitments/runtime.ts | 6 - src/crestodian/assistant.ts | 5 +- src/cron/isolated-agent/run-executor.ts | 1 - src/hooks/llm-slug-generator.ts | 3 - src/talk/agent-consult-runtime.test.ts | 154 ++++++++---------- src/talk/agent-consult-runtime.ts | 6 - 43 files changed, 373 insertions(+), 411 deletions(-) diff --git a/docs/refactor/database-first.md b/docs/refactor/database-first.md index c0642f51385..2b060f76978 100644 --- a/docs/refactor/database-first.md +++ b/docs/refactor/database-first.md @@ -47,12 +47,14 @@ This migration has one canonical runtime shape: - Legacy `sessions.json`, transcript JSONL, `.jsonl.lock`, pruning, truncation, and old session-path logic belong only to the doctor migration/import path. - Runtime startup, hot reply paths, compaction, reset, recovery, diagnostics, - TTS, memory hooks, subagents, and plugin command routing must derive transcript - handles from SQLite identity or pass `{agentId, sessionId}` directly. -- `runEmbeddedPiAgent(...)` and the inner embedded attempt must not honor - file-shaped transcript locator inputs. They derive the SQLite transcript - handle from `{agentId, sessionId}` before worker dispatch or model execution, - so stale callers cannot make the runner write JSON/JSONL transcripts. + TTS, memory hooks, subagents, and plugin command routing must pass + `{agentId, sessionId}` through the runtime. Any string transcript handle is a + boundary artifact, not runtime identity. +- `runEmbeddedPiAgent(...)`, prepared worker runs, and the inner embedded + attempt must not accept transcript locators. They open the SQLite transcript + manager by `{agentId, sessionId}` and pass that manager to the internalized + PI-compatible agent session, so stale callers cannot make the runner write + JSON/JSONL transcripts. - Runner diagnostics must store runtime/cache/payload trace records in SQLite. Runtime diagnostics must not expose JSONL file override knobs; export/debug commands can materialize files explicitly from database rows. @@ -240,17 +242,17 @@ The remaining cleanup is mostly consolidation and deletion: transcript files. Gateway chat/media/history paths read transcript rows from SQLite; JSONL is now a legacy doctor input or in-memory export encoding, not a runtime state file. -- Runtime transcript store APIs resolve SQLite transcript locators, not - filesystem paths. The old `resolve...ForPath` helper and unused - `transcriptPath` write options are gone from runtime callers. +- Runtime transcript store APIs resolve SQLite scope, not filesystem paths. The + old `resolve...ForPath` helper and unused `transcriptPath` write options are + gone from runtime callers. - Runtime session resolution now uses `{agentId, sessionId}`. It may derive a `sqlite-transcript:///` handle for external boundaries, but it must not store that derived value in active session rows. Legacy absolute JSONL paths are doctor migration inputs only. -- `runEmbeddedPiAgent(...)` now treats caller-provided transcript locators as - ignored legacy hints. It derives the active SQLite transcript locator from the - resolved agent id and session id before worker dispatch, attempt creation, or - prepared-run serialization. +- `runEmbeddedPiAgent(...)` no longer has a transcript-locator parameter. + Prepared worker descriptors also omit transcript locators. Runtime session + persistence starts from the resolved agent id and session id, then opens the + SQLite session manager directly. - Gateway transcript-key lookup compares derived SQLite transcript handles at protocol boundaries and no longer realpaths or stats transcript filenames. - Automatic compaction transcript rotation writes successor transcript rows diff --git a/extensions/voice-call/src/response-generator.ts b/extensions/voice-call/src/response-generator.ts index 493b87a2bf0..1d66c6060fc 100644 --- a/extensions/voice-call/src/response-generator.ts +++ b/extensions/voice-call/src/response-generator.ts @@ -5,7 +5,6 @@ import crypto from "node:crypto"; import { applyModelOverrideToSessionEntry } from "openclaw/plugin-sdk/model-session-runtime"; -import { createSqliteSessionTranscriptLocator } from "openclaw/plugin-sdk/session-store-runtime"; import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime"; import type { SessionEntry } from "../api.js"; import { resolveVoiceCallSessionKey, type VoiceCallConfig } from "./config.js"; @@ -256,11 +255,6 @@ export async function generateVoiceResponse( } const sessionId = sessionEntry.sessionId; - const sessionFile = createSqliteSessionTranscriptLocator({ - agentId, - sessionId, - }); - // Resolve thinking level const thinkLevel = agentRuntime.resolveThinkingDefault({ cfg, provider, model }); @@ -293,7 +287,6 @@ export async function generateVoiceResponse( sandboxSessionKey: resolveVoiceSandboxSessionKey(agentId, resolvedSessionKey), agentId, messageProvider: "voice", - sessionFile, workspaceDir, config: cfg, prompt: userMessage, diff --git a/src/agents/bootstrap-files.ts b/src/agents/bootstrap-files.ts index a23f7c0aac5..ed3f68b0c3f 100644 --- a/src/agents/bootstrap-files.ts +++ b/src/agents/bootstrap-files.ts @@ -66,7 +66,19 @@ export async function hasCompletedBootstrapTranscriptTurn( if (!scope) { return false; } - const records = loadSqliteSessionTranscriptEvents(scope) + return hasCompletedBootstrapSessionTurn(scope); +} + +export async function hasCompletedBootstrapSessionTurn(params: { + agentId: string; + sessionId: string; +}): Promise { + const agentId = params.agentId.trim(); + const sessionId = params.sessionId.trim(); + if (!agentId || !sessionId) { + return false; + } + const records = loadSqliteSessionTranscriptEvents({ agentId, sessionId }) .map((entry) => entry.event) .slice(-CONTINUATION_SCAN_MAX_RECORDS); let compactedAfterLatestAssistant = false; diff --git a/src/agents/command/attempt-execution.ts b/src/agents/command/attempt-execution.ts index d2a67a88f39..07164c8efb5 100644 --- a/src/agents/command/attempt-execution.ts +++ b/src/agents/command/attempt-execution.ts @@ -588,7 +588,6 @@ export function runAgentAttempt(params: { replyToMode: params.runContext.replyToMode, hasRepliedRef: params.runContext.hasRepliedRef, senderIsOwner: params.opts.senderIsOwner, - transcriptLocator: params.transcriptLocator, workspaceDir: params.workspaceDir, config: params.cfg, agentHarnessId: requestedAgentHarnessId, diff --git a/src/agents/harness/pi-run-worker-policy.test.ts b/src/agents/harness/pi-run-worker-policy.test.ts index bac6c320332..b6796c14b7b 100644 --- a/src/agents/harness/pi-run-worker-policy.test.ts +++ b/src/agents/harness/pi-run-worker-policy.test.ts @@ -13,7 +13,6 @@ const BASE_PARAMS = { sessionKey: "session-1", model: "gpt-5.5", prompt: "hello", - sessionFile: "sqlite-transcript://agent-1/session-1.jsonl", timeoutMs: 1_000, workspaceDir: "/tmp/openclaw-workspace", } satisfies RunEmbeddedPiAgentParams; diff --git a/src/agents/harness/pi-worker-backend.test.ts b/src/agents/harness/pi-worker-backend.test.ts index f35182465f3..27790e1ecdb 100644 --- a/src/agents/harness/pi-worker-backend.test.ts +++ b/src/agents/harness/pi-worker-backend.test.ts @@ -9,7 +9,6 @@ function createPreparedRun(overrides: Partial = {}): PreparedA agentId: "main", sessionId: "session-pi-worker", sessionKey: "agent:main:thread", - sessionFile: "sqlite-transcript://main/session-pi-worker.jsonl", workspaceDir: "/tmp/workspace", prompt: "hello", provider: "openai", diff --git a/src/agents/harness/prepared-run-params.test.ts b/src/agents/harness/prepared-run-params.test.ts index 48852bc57d6..38ca95ea730 100644 --- a/src/agents/harness/prepared-run-params.test.ts +++ b/src/agents/harness/prepared-run-params.test.ts @@ -13,7 +13,6 @@ function createPreparedRun(overrides: Partial = {}): PreparedA agentId: "main", sessionId: "session-rehydrate", sessionKey: "agent:main:thread", - sessionFile: "sqlite-transcript://main/session-rehydrate.jsonl", workspaceDir: "/tmp/workspace", prompt: "hello", provider: "openai", @@ -48,7 +47,6 @@ describe("createRunParamsFromPreparedAgentRun", () => { runId: "run-rehydrate", sessionId: "session-rehydrate", sessionKey: "agent:main:thread", - sessionFile: "sqlite-transcript://main/session-rehydrate.jsonl", workspaceDir: "/tmp/workspace", prompt: "hello", provider: "openai", diff --git a/src/agents/harness/prepared-run-params.ts b/src/agents/harness/prepared-run-params.ts index 54538a88a49..27c4642cdd3 100644 --- a/src/agents/harness/prepared-run-params.ts +++ b/src/agents/harness/prepared-run-params.ts @@ -108,7 +108,6 @@ export function createRunParamsFromPreparedAgentRun( runId: preparedRun.runId, sessionId: preparedRun.sessionId, ...(preparedRun.sessionKey ? { sessionKey: preparedRun.sessionKey } : {}), - sessionFile: preparedRun.sessionFile, workspaceDir: preparedRun.workspaceDir, ...(preparedRun.agentDir ? { agentDir: preparedRun.agentDir } : {}), ...(preparedRun.config ? { config: preparedRun.config } : {}), diff --git a/src/agents/harness/prepared-run.test.ts b/src/agents/harness/prepared-run.test.ts index 2d1f9829929..8d3ff830f3c 100644 --- a/src/agents/harness/prepared-run.test.ts +++ b/src/agents/harness/prepared-run.test.ts @@ -14,7 +14,6 @@ function createAttempt( runId: "run-prepared", sessionId: "session-prepared", sessionKey: "agent:ops:thread", - sessionFile: "sqlite-transcript://ops/session-prepared.jsonl", workspaceDir: "/tmp/workspace", agentDir: "/tmp/agent", prompt: "hello", @@ -43,7 +42,6 @@ describe("createPreparedAgentRunFromAttempt", () => { agentId: "ops", sessionId: "session-prepared", sessionKey: "agent:ops:thread", - sessionFile: "sqlite-transcript://ops/session-prepared.jsonl", workspaceDir: "/tmp/workspace", agentDir: "/tmp/agent", prompt: "hello", @@ -88,7 +86,6 @@ describe("createPreparedAgentRunFromRunParams", () => { runId: "run-high-level", sessionId: "session-high-level", sessionKey: "agent:ops:thread", - sessionFile: "sqlite-transcript://ops/session-high-level.jsonl", workspaceDir: "/tmp/workspace", prompt: "hello", provider: "openai", @@ -136,7 +133,6 @@ describe("createPreparedAgentRunFromRunParams", () => { runId: "run-high-level", sessionId: "session-high-level", sessionKey: "agent:ops:thread", - sessionFile: "sqlite-transcript://ops/session-high-level.jsonl", workspaceDir: "/tmp/workspace", prompt: "hello", provider: "openai", @@ -172,7 +168,6 @@ describe("createPreparedAgentRunFromRunParams", () => { createPreparedAgentRunFromRunParams({ runId: "run-high-level", sessionId: "session-high-level", - sessionFile: "sqlite-transcript://main/session-high-level.jsonl", workspaceDir: "/tmp/workspace", prompt: "hello", timeoutMs: 1000, @@ -187,7 +182,6 @@ describe("createSerializableRunParamsSnapshot", () => { const snapshot = createSerializableRunParamsSnapshot({ runId: "run-snapshot", sessionId: "session-snapshot", - sessionFile: "sqlite-transcript://main/session-snapshot.jsonl", workspaceDir: "/tmp/workspace", prompt: "hello", timeoutMs: 1000, diff --git a/src/agents/harness/prepared-run.ts b/src/agents/harness/prepared-run.ts index c987d1c9a6d..a28aa358a89 100644 --- a/src/agents/harness/prepared-run.ts +++ b/src/agents/harness/prepared-run.ts @@ -1,4 +1,3 @@ -import { createSqliteSessionTranscriptLocator } from "../../config/sessions/paths.js"; import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js"; import type { RunEmbeddedPiAgentParams } from "../pi-embedded-runner/run/params.js"; import type { EmbeddedRunAttemptParams } from "../pi-embedded-runner/run/types.js"; @@ -24,7 +23,6 @@ type PreparedRunAttemptShape = Pick< | "provider" | "replyOperation" | "runId" - | "transcriptLocator" | "sessionId" | "sessionKey" | "shouldEmitToolOutput" @@ -45,7 +43,6 @@ type PreparedRunParamsShape = Pick< | "initialVfsEntries" | "replyOperation" | "runId" - | "transcriptLocator" | "sessionId" | "sessionKey" | "shouldEmitToolOutput" @@ -100,10 +97,6 @@ function createPreparedAgentRun( agentId, sessionId: source.sessionId, ...(source.sessionKey ? { sessionKey: source.sessionKey } : {}), - transcriptLocator: createSqliteSessionTranscriptLocator({ - agentId, - sessionId: source.sessionId, - }), workspaceDir: source.workspaceDir, ...(source.agentDir ? { agentDir: source.agentDir } : {}), prompt: source.prompt, diff --git a/src/agents/harness/worker-launch.test.ts b/src/agents/harness/worker-launch.test.ts index 127089e7f06..99137e4d7c3 100644 --- a/src/agents/harness/worker-launch.test.ts +++ b/src/agents/harness/worker-launch.test.ts @@ -12,7 +12,6 @@ function createAttempt( return { sessionId: "session-worker-launch", sessionKey: "agent:main:thread", - sessionFile: "sqlite-transcript://main/session-worker-launch.jsonl", workspaceDir: "/tmp/workspace", prompt: "hello", timeoutMs: 1000, @@ -80,7 +79,6 @@ describe("PI run worker launch request", () => { { sessionId: "session-pi-run", sessionKey: "agent:main:thread", - sessionFile: "sqlite-transcript://main/session-pi-run.jsonl", workspaceDir: "/tmp/workspace", prompt: "hello", timeoutMs: 1000, diff --git a/src/agents/model-fallback.run-embedded.e2e.test.ts b/src/agents/model-fallback.run-embedded.e2e.test.ts index ffecd872692..788207089df 100644 --- a/src/agents/model-fallback.run-embedded.e2e.test.ts +++ b/src/agents/model-fallback.run-embedded.e2e.test.ts @@ -3,8 +3,6 @@ import os from "node:os"; import path from "node:path"; import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; -import { createSqliteSessionTranscriptLocator } from "../config/sessions/paths.js"; -import { resolveAgentIdFromSessionKey } from "../routing/session-key.js"; import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js"; import type { AuthProfileFailureReason } from "./auth-profiles.js"; import { @@ -82,16 +80,6 @@ const OVERLOADED_ERROR_PAYLOAD = const RATE_LIMIT_ERROR_MESSAGE = "rate limit exceeded"; const NO_ENDPOINTS_FOUND_ERROR_MESSAGE = "404 No endpoints found for deepseek/deepseek-r1:free."; -function createTestSessionTranscriptLocator(params: { - sessionKey: string; - sessionId: string; -}): string { - return createSqliteSessionTranscriptLocator({ - agentId: resolveAgentIdFromSessionKey(params.sessionKey), - sessionId: params.sessionId, - }); -} - function createTestSessionId(raw: string): string { return raw.replace(/[^a-z0-9._-]/gi, "-").slice(0, 128); } @@ -276,10 +264,6 @@ async function runEmbeddedFallback(params: { runEmbeddedPiAgent({ sessionId, sessionKey: params.sessionKey, - sessionFile: createTestSessionTranscriptLocator({ - sessionKey: params.sessionKey, - sessionId, - }), workspaceDir: params.workspaceDir, agentDir: params.agentDir, config: cfg, @@ -429,10 +413,6 @@ describe("runWithModelFallback + runEmbeddedPiAgent failover behavior", () => { const result = await runEmbeddedPiAgent({ sessionId: "tool-side-effect-terminal", sessionKey: "agent:test:tool-side-effect-terminal", - sessionFile: createTestSessionTranscriptLocator({ - sessionKey: "agent:test:tool-side-effect-terminal", - sessionId: "tool-side-effect-terminal", - }), workspaceDir, agentDir, config: makeConfig(), @@ -533,10 +513,6 @@ describe("runWithModelFallback + runEmbeddedPiAgent failover behavior", () => { name: "undici-terminated", message: "terminated", }, - { - name: "stream-read-error", - message: "stream_read_error", - }, { name: "codex-empty-transport-response", message: "Request failed", diff --git a/src/agents/pi-embedded-runner.cache.live.test.ts b/src/agents/pi-embedded-runner.cache.live.test.ts index 2cc34c030f3..d8c573e4a0e 100644 --- a/src/agents/pi-embedded-runner.cache.live.test.ts +++ b/src/agents/pi-embedded-runner.cache.live.test.ts @@ -108,7 +108,7 @@ function buildRunnerSessionPaths(sessionId: string) { } return { agentDir: liveRunnerRootDir, - sessionFile: createSqliteSessionTranscriptLocator({ agentId: "main", sessionId }), + transcriptLocator: createSqliteSessionTranscriptLocator({ agentId: "main", sessionId }), workspaceDir: path.join(liveRunnerRootDir, `${sessionId}-workspace`), }; } @@ -303,7 +303,6 @@ async function runEmbeddedCacheProbe(params: { runEmbeddedPiAgent({ sessionId: params.sessionId, sessionKey: `live-cache:${params.providerTag}:${params.sessionId}`, - sessionFile: sessionPaths.sessionFile, workspaceDir: sessionPaths.workspaceDir, agentDir: sessionPaths.agentDir, config: buildEmbeddedRunnerConfig({ @@ -347,7 +346,7 @@ async function compactLiveCacheSession(params: { compactEmbeddedPiSessionDirect({ sessionId: params.sessionId, sessionKey: `live-cache:${params.providerTag}:${params.sessionId}`, - sessionFile: sessionPaths.sessionFile, + transcriptLocator: sessionPaths.transcriptLocator, workspaceDir: sessionPaths.workspaceDir, agentDir: sessionPaths.agentDir, config: buildEmbeddedRunnerConfig({ diff --git a/src/agents/pi-embedded-runner.e2e.test.ts b/src/agents/pi-embedded-runner.e2e.test.ts index 3bb73e335f1..e6f4145bdb3 100644 --- a/src/agents/pi-embedded-runner.e2e.test.ts +++ b/src/agents/pi-embedded-runner.e2e.test.ts @@ -208,20 +208,20 @@ beforeEach(() => { }); }); -const nextSessionFile = () => { +const nextTranscriptLocator = () => { sessionCounter += 1; return createSqliteSessionTranscriptLocator({ agentId: "test", sessionId: `session-${sessionCounter}`, }); }; -const sessionIdFromLocator = (sessionFile: string) => - parseSqliteSessionTranscriptLocator(sessionFile)?.sessionId ?? "session:test"; -const appendTestSessionMessage = async (sessionFile: string, message: unknown) => +const sessionIdFromLocator = (transcriptLocator: string) => + parseSqliteSessionTranscriptLocator(transcriptLocator)?.sessionId ?? "session:test"; +const appendTestSessionMessage = async (transcriptLocator: string, message: unknown) => await appendSessionTranscriptMessage({ agentId: "test", - sessionId: sessionIdFromLocator(sessionFile), - transcriptPath: sessionFile, + sessionId: sessionIdFromLocator(transcriptLocator), + transcriptPath: transcriptLocator, cwd: workspaceDir, message, }); @@ -229,8 +229,8 @@ const nextRunId = (prefix = "run-embedded-test") => `${prefix}-${++runCounter}`; const nextSessionKey = () => `agent:test:embedded:${nextRunId("session-key")}`; const runWithOrphanedSingleUserMessage = async (text: string, sessionKey: string) => { - const sessionFile = nextSessionFile(); - await appendTestSessionMessage(sessionFile, { + const transcriptLocator = nextTranscriptLocator(); + await appendTestSessionMessage(transcriptLocator, { role: "user", content: [{ type: "text", text }], timestamp: Date.now(), @@ -247,9 +247,8 @@ const runWithOrphanedSingleUserMessage = async (text: string, sessionKey: string const cfg = createEmbeddedPiRunnerOpenAiConfig(["mock-1"]); return await runEmbeddedPiAgent({ - sessionId: sessionIdFromLocator(sessionFile), + sessionId: sessionIdFromLocator(transcriptLocator), sessionKey, - sessionFile, workspaceDir, config: cfg, prompt: "hello", @@ -273,7 +272,7 @@ const textFromContent = (content: unknown) => { }; const readSessionEntries = async ( - sessionFile: string, + transcriptLocator: string, ): Promise< Array<{ type?: string; @@ -282,7 +281,7 @@ const readSessionEntries = async ( }> > => { try { - return (await readTranscriptState(sessionFile)).getEntries() as Array<{ + return (await readTranscriptState(transcriptLocator)).getEntries() as Array<{ type?: string; customType?: string; data?: unknown; @@ -295,8 +294,8 @@ const readSessionEntries = async ( } }; -const readSessionMessages = async (sessionFile: string) => { - const entries = await readSessionEntries(sessionFile); +const readSessionMessages = async (transcriptLocator: string) => { + const entries = await readSessionEntries(transcriptLocator); return entries .filter((entry) => entry.type === "message") .map( @@ -304,7 +303,11 @@ const readSessionMessages = async (sessionFile: string) => { ) as Array<{ role?: string; content?: unknown }>; }; -const runDefaultEmbeddedTurn = async (sessionFile: string, prompt: string, sessionKey: string) => { +const runDefaultEmbeddedTurn = async ( + transcriptLocator: string, + prompt: string, + sessionKey: string, +) => { const cfg = createEmbeddedPiRunnerOpenAiConfig(["mock-error"]); runEmbeddedAttemptMock.mockResolvedValueOnce( makeEmbeddedRunnerAttempt({ @@ -315,9 +318,8 @@ const runDefaultEmbeddedTurn = async (sessionFile: string, prompt: string, sessi }), ); await runEmbeddedPiAgent({ - sessionId: sessionIdFromLocator(sessionFile), + sessionId: sessionIdFromLocator(transcriptLocator), sessionKey, - sessionFile, workspaceDir, config: cfg, prompt, @@ -332,7 +334,7 @@ const runDefaultEmbeddedTurn = async (sessionFile: string, prompt: string, sessi describe("runEmbeddedPiAgent", () => { it("skips models.json generation when dynamic model resolution succeeds", async () => { - const sessionFile = nextSessionFile(); + const transcriptLocator = nextTranscriptLocator(); const cfg = createEmbeddedPiRunnerOpenAiConfig([]); runEmbeddedAttemptMock.mockResolvedValueOnce( makeEmbeddedRunnerAttempt({ @@ -345,7 +347,6 @@ describe("runEmbeddedPiAgent", () => { await runEmbeddedPiAgent({ sessionId: "dynamic-model", - sessionFile, workspaceDir, config: cfg, prompt: "hello", @@ -370,7 +371,7 @@ describe("runEmbeddedPiAgent", () => { }); it("backfills a trimmed session key from sessionId when the embedded run omits it", async () => { - const sessionFile = nextSessionFile(); + const transcriptLocator = nextTranscriptLocator(); const cfg = createEmbeddedPiRunnerOpenAiConfig(["mock-1"]); resolveSessionKeyForRequestMock.mockReturnValue({ sessionKey: "agent:test:resolved", @@ -388,7 +389,6 @@ describe("runEmbeddedPiAgent", () => { await runEmbeddedPiAgent({ sessionId: "resume-123", sessionKey: " ", - sessionFile, workspaceDir, config: cfg, prompt: "hello", @@ -410,7 +410,7 @@ describe("runEmbeddedPiAgent", () => { }); it("drops whitespace-only session keys when backfill cannot resolve a session key", async () => { - const sessionFile = nextSessionFile(); + const transcriptLocator = nextTranscriptLocator(); const cfg = createEmbeddedPiRunnerOpenAiConfig(["mock-1"]); resolveSessionKeyForRequestMock.mockReturnValue({ sessionKey: undefined, @@ -428,7 +428,6 @@ describe("runEmbeddedPiAgent", () => { await runEmbeddedPiAgent({ sessionId: "resume-124", sessionKey: " ", - sessionFile, workspaceDir, config: cfg, prompt: "hello", @@ -450,7 +449,7 @@ describe("runEmbeddedPiAgent", () => { }); it("logs when embedded session-key backfill resolution fails", async () => { - const sessionFile = nextSessionFile(); + const transcriptLocator = nextTranscriptLocator(); const cfg = createEmbeddedPiRunnerOpenAiConfig(["mock-1"]); resolveSessionKeyForRequestMock.mockImplementation(() => { throw new Error("resolver exploded"); @@ -466,7 +465,6 @@ describe("runEmbeddedPiAgent", () => { await runEmbeddedPiAgent({ sessionId: "resume-456", - sessionFile, workspaceDir, config: cfg, prompt: "hello", @@ -486,7 +484,7 @@ describe("runEmbeddedPiAgent", () => { }); it("passes the current agentId when backfilling a session key", async () => { - const sessionFile = nextSessionFile(); + const transcriptLocator = nextTranscriptLocator(); const cfg = createEmbeddedPiRunnerOpenAiConfig(["mock-1"]); resolveStoredSessionKeyForSessionIdMock.mockReturnValue({ sessionKey: "agent:test:resolved", @@ -504,7 +502,6 @@ describe("runEmbeddedPiAgent", () => { await runEmbeddedPiAgent({ sessionId: "resume-agent-1", sessionKey: undefined, - sessionFile, workspaceDir, config: cfg, prompt: "hello", @@ -526,8 +523,8 @@ describe("runEmbeddedPiAgent", () => { }); it("disposes bundle MCP once when a one-shot local run completes", async () => { - const sessionFile = nextSessionFile(); - const sessionId = sessionIdFromLocator(sessionFile); + const transcriptLocator = nextTranscriptLocator(); + const sessionId = sessionIdFromLocator(transcriptLocator); const cfg = createEmbeddedPiRunnerOpenAiConfig(["mock-1"]); const sessionKey = nextSessionKey(); runEmbeddedAttemptMock.mockResolvedValueOnce( @@ -542,7 +539,6 @@ describe("runEmbeddedPiAgent", () => { await runEmbeddedPiAgent({ sessionId, sessionKey, - sessionFile, workspaceDir, config: cfg, prompt: "hello", @@ -562,8 +558,8 @@ describe("runEmbeddedPiAgent", () => { it("preserves bundle MCP state across retries within one local run", async () => { refreshRuntimeAuthOnFirstPromptError = true; - const sessionFile = nextSessionFile(); - const sessionId = sessionIdFromLocator(sessionFile); + const transcriptLocator = nextTranscriptLocator(); + const sessionId = sessionIdFromLocator(transcriptLocator); const cfg = createEmbeddedPiRunnerOpenAiConfig(["mock-1"]); const sessionKey = nextSessionKey(); runEmbeddedAttemptMock @@ -586,7 +582,6 @@ describe("runEmbeddedPiAgent", () => { const result = await runEmbeddedPiAgent({ sessionId, sessionKey, - sessionFile, workspaceDir, config: cfg, prompt: "hello", @@ -606,7 +601,7 @@ describe("runEmbeddedPiAgent", () => { }); it("retries a planning-only GPT turn once with an act-now steer", async () => { - const sessionFile = nextSessionFile(); + const transcriptLocator = nextTranscriptLocator(); const cfg = createEmbeddedPiRunnerOpenAiConfig(["gpt-5.4"]); const sessionKey = nextSessionKey(); @@ -640,9 +635,8 @@ describe("runEmbeddedPiAgent", () => { }); const result = await runEmbeddedPiAgent({ - sessionId: sessionIdFromLocator(sessionFile), + sessionId: sessionIdFromLocator(transcriptLocator), sessionKey, - sessionFile, workspaceDir, config: cfg, prompt: "ship it", @@ -659,7 +653,7 @@ describe("runEmbeddedPiAgent", () => { }); it("handles prompt error paths without dropping user state", async () => { - const sessionFile = nextSessionFile(); + const transcriptLocator = nextTranscriptLocator(); const cfg = createEmbeddedPiRunnerOpenAiConfig(["mock-error"]); const sessionKey = nextSessionKey(); runEmbeddedAttemptMock.mockResolvedValueOnce( @@ -669,9 +663,8 @@ describe("runEmbeddedPiAgent", () => { ); await expect( runEmbeddedPiAgent({ - sessionId: sessionIdFromLocator(sessionFile), + sessionId: sessionIdFromLocator(transcriptLocator), sessionKey, - sessionFile, workspaceDir, config: cfg, prompt: "boom", @@ -684,7 +677,7 @@ describe("runEmbeddedPiAgent", () => { }), ).rejects.toThrow("boom"); - const messages = await readSessionMessages(sessionFile); + const messages = await readSessionMessages(transcriptLocator); if (messages.length > 0) { const userIndex = messages.findIndex( (message) => message?.role === "user" && textFromContent(message.content) === "boom", @@ -697,15 +690,15 @@ describe("runEmbeddedPiAgent", () => { "preserves existing transcript entries across an additional turn", { timeout: 7_000 }, async () => { - const sessionFile = nextSessionFile(); + const transcriptLocator = nextTranscriptLocator(); const sessionKey = nextSessionKey(); - await appendTestSessionMessage(sessionFile, { + await appendTestSessionMessage(transcriptLocator, { role: "user", content: [{ type: "text", text: "seed user" }], timestamp: Date.now(), }); - await appendTestSessionMessage(sessionFile, { + await appendTestSessionMessage(transcriptLocator, { role: "assistant", content: [{ type: "text", text: "seed assistant" }], stopReason: "stop", @@ -715,9 +708,9 @@ describe("runEmbeddedPiAgent", () => { usage: createMockUsage(1, 1), timestamp: Date.now(), }); - await runDefaultEmbeddedTurn(sessionFile, "hello", sessionKey); + await runDefaultEmbeddedTurn(transcriptLocator, "hello", sessionKey); - const messages = await readSessionMessages(sessionFile); + const messages = await readSessionMessages(transcriptLocator); const seedUserIndex = messages.findIndex( (message) => message?.role === "user" && textFromContent(message.content) === "seed user", ); diff --git a/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts b/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts index 98e7f10845e..12204c484ac 100644 --- a/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts +++ b/src/agents/pi-embedded-runner.run-embedded-pi-agent.auth-profile-rotation.e2e.test.ts @@ -3,9 +3,7 @@ import os from "node:os"; import path from "node:path"; import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../config/config.js"; -import { createSqliteSessionTranscriptLocator } from "../config/sessions/paths.js"; import { redactIdentifier } from "../logging/redact-identifier.js"; -import { resolveAgentIdFromSessionKey } from "../routing/session-key.js"; import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js"; import type { AuthProfileFailureReason } from "./auth-profiles.js"; import { @@ -35,13 +33,6 @@ const { computeBackoffMock, sleepWithAbortMock } = vi.hoisted(() => ({ const TEST_SESSION_ID = "session-test"; -function createTestSessionTranscriptLocator(sessionKey?: string): string { - return createSqliteSessionTranscriptLocator({ - agentId: resolveAgentIdFromSessionKey(sessionKey), - sessionId: TEST_SESSION_ID, - }); -} - const installRunEmbeddedMocks = () => { installEmbeddedRunnerBaseE2eMocks(); installEmbeddedRunnerFastRunE2eMocks({ @@ -467,7 +458,6 @@ async function runAutoPinnedOpenAiTurn(params: { await runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: params.sessionKey, - sessionFile: createTestSessionTranscriptLocator(params.sessionKey), workspaceDir: params.workspaceDir, agentDir: params.agentDir, config: params.config ?? makeConfig(), @@ -621,7 +611,6 @@ async function runTurnWithCooldownSeed(params: { await runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: params.sessionKey, - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeConfig(), @@ -686,7 +675,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { await runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:copilot-auth-error", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeCopilotConfig(), @@ -771,7 +759,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { await runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:copilot-auth-repeat", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeCopilotConfig(), @@ -819,7 +806,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { const runPromise = runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:copilot-shutdown", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeCopilotConfig(), @@ -1024,7 +1010,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { const result = await runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:compaction-timeout", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeConfig(), @@ -1063,7 +1048,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { const result = await runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:compaction-wait-abort", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeConfig(), @@ -1092,7 +1076,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:user", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeConfig(), @@ -1142,7 +1125,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { await runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:user-order-excluded", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeConfig(), @@ -1171,7 +1153,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { await runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:user-auth-alias", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeConfig(), @@ -1210,7 +1191,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { await runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:mismatch", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeConfig(), @@ -1252,7 +1232,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:cooldown-failover", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeConfig({ fallbacks: ["openai/mock-2"] }), @@ -1296,7 +1275,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { const result = await runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:cooldown-probe", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeConfig({ fallbacks: ["openai/mock-2"] }), @@ -1344,7 +1322,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { const result = await runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:overloaded-cooldown-probe", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeConfig({ fallbacks: ["openai/mock-2"] }), @@ -1392,7 +1369,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { const result = await runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:billing-cooldown-probe-no-fallbacks", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeConfig(), @@ -1423,7 +1399,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:support:cooldown-failover", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeAgentOverrideOnlyFallbackConfig("support"), @@ -1468,7 +1443,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:disabled-failover", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeConfig({ fallbacks: ["openai/mock-2"] }), @@ -1503,7 +1477,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:auth-unavailable", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeConfig({ fallbacks: ["openai/mock-2"], apiKey: "" }), @@ -1541,7 +1514,6 @@ describe("runEmbeddedPiAgent auth profile rotation", () => { await runEmbeddedPiAgentInline({ sessionId: TEST_SESSION_ID, sessionKey: "agent:test:billing-failover-active-model", - sessionFile: createTestSessionTranscriptLocator(), workspaceDir, agentDir, config: makeConfig({ fallbacks: ["openai/mock-2"] }), diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts index 6efce531a3c..501d9e219d6 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts @@ -24,7 +24,7 @@ type MockCompactionResult = tokensBefore?: number; tokensAfter?: number; sessionId?: string; - sessionFile?: string; + transcriptLocator?: string; }; reason?: string; } @@ -228,7 +228,6 @@ export const mockedShouldPreferExplicitConfigApiKeyAuth = vi.fn(() => false); export const overflowBaseRunParams = { sessionId: "test-session", sessionKey: "test-key", - sessionFile: "sqlite-transcript://main/test-session.jsonl", workspaceDir: "/tmp/workspace", prompt: "hello", timeoutMs: 30000, diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts index 3029bed821f..e3dc52c51e0 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts @@ -166,7 +166,6 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { await runEmbeddedPiAgent({ sessionId: "test-session", sessionKey: "test-key", - sessionFile: "sqlite-transcript://main/test-session.jsonl", workspaceDir: "/tmp/workspace", prompt: "hello", timeoutMs: 30000, @@ -583,7 +582,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { expect(mockedCompactDirect).toHaveBeenCalledWith( expect.objectContaining({ sessionId: "test-session", - sessionFile: "sqlite-transcript://main/test-session.jsonl", + transcriptLocator: "sqlite-transcript://main/test-session", runtimeContext: expect.objectContaining({ trigger: "overflow", authProfileId: "test-profile", @@ -736,7 +735,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { await runEmbeddedPiAgent(overflowBaseRunParams); expect(mockedGlobalHookRunner.runBeforeCompaction).toHaveBeenCalledWith( - { messageCount: -1, sessionFile: "sqlite-transcript://main/test-session.jsonl" }, + { messageCount: -1, transcriptLocator: "sqlite-transcript://main/test-session" }, expect.objectContaining({ sessionKey: "test-key", }), @@ -746,7 +745,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { messageCount: -1, compactedCount: -1, tokenCount: 50, - sessionFile: "sqlite-transcript://main/test-session.jsonl", + transcriptLocator: "sqlite-transcript://main/test-session", }, expect.objectContaining({ sessionKey: "test-key", @@ -775,7 +774,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { contextEngine: mockedContextEngine, sessionId: "test-session", sessionKey: "test-key", - sessionFile: "sqlite-transcript://main/test-session.jsonl", + transcriptLocator: "sqlite-transcript://main/test-session", reason: "compaction", runtimeContext: expect.objectContaining({ trigger: "overflow", @@ -792,7 +791,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { makeAttemptResult({ promptError: null, sessionIdUsed: "rotated-session", - sessionFileUsed: "/tmp/rotated-session.json", + transcriptLocatorUsed: "sqlite-transcript://main/rotated-session", }), ); mockedCompactDirect.mockResolvedValueOnce( @@ -800,7 +799,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { summary: "rotated overflow compaction", tokensAfter: 50, sessionId: "rotated-session", - sessionFile: "/tmp/rotated-session.json", + transcriptLocator: "sqlite-transcript://main/rotated-session", }), ); @@ -810,13 +809,12 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { 2, expect.objectContaining({ sessionId: "rotated-session", - sessionFile: "/tmp/rotated-session.json", }), ); expect(mockedRunContextEngineMaintenance).toHaveBeenCalledWith( expect.objectContaining({ sessionId: "rotated-session", - sessionFile: "/tmp/rotated-session.json", + transcriptLocator: "sqlite-transcript://main/rotated-session", }), ); }); diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 888b354fa1a..a6147072864 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -4,7 +4,6 @@ import type { ReplyPayload } from "../../auto-reply/reply-payload.js"; import type { ReplyBackendHandle } from "../../auto-reply/reply/reply-run-registry.js"; import type { ThinkLevel } from "../../auto-reply/thinking.js"; import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; -import { createSqliteSessionTranscriptLocator } from "../../config/sessions/paths.js"; import { ensureContextEnginesInitialized } from "../../context-engine/init.js"; import { resolveContextEngine, @@ -93,6 +92,7 @@ import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js"; import type { AgentWorkerPermissionMode } from "../runtime-worker-permissions.js"; import { resolveSessionSuspensionReason, suspendSession } from "../session-suspension.js"; import { resolveToolLoopDetectionConfig } from "../tool-loop-detection-config.js"; +import { openTranscriptSessionManagerForSession } from "../transcript/session-manager.js"; import { derivePromptTokens, normalizeUsage, type UsageLike } from "../usage.js"; import { redactRunIdentifier, resolveRunWorkspaceDir } from "../workspace-run.js"; import { runPostCompactionSideEffects } from "./compaction-hooks.js"; @@ -455,15 +455,17 @@ export async function runEmbeddedPiAgent( config: params.config, agentId: params.agentId, }); - const sqliteTranscriptLocator = createSqliteSessionTranscriptLocator({ + const initialSessionManager = openTranscriptSessionManagerForSession({ agentId: sessionAgentId, sessionId: params.sessionId, + cwd: params.workspaceDir, }); - const normalizedParams: RunEmbeddedPiAgentParams & { transcriptLocator: string } = { - ...params, - transcriptLocator: sqliteTranscriptLocator, - }; - params = normalizedParams; + const initialTranscriptLocator = initialSessionManager.getTranscriptLocator(); + if (!initialTranscriptLocator) { + throw new Error( + `SQLite transcript scope did not produce a runtime transcript handle: agentId=${sessionAgentId} sessionId=${params.sessionId}`, + ); + } const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId); const globalLane = resolveGlobalLane(params.lane); const laneTaskTimeoutMs = resolveEmbeddedRunLaneTimeoutMs(params.timeoutMs); @@ -1002,9 +1004,9 @@ export async function runEmbeddedPiAgent( const overloadProfileRotationLimit = resolveOverloadProfileRotationLimit(params.config); const rateLimitProfileRotationLimit = resolveRateLimitProfileRotationLimit(params.config); let activeSessionId = params.sessionId; - let activeTranscriptLocator = sqliteTranscriptLocator; + let activeTranscriptLocator = initialTranscriptLocator; let suppressNextUserMessagePersistence = params.suppressNextUserMessagePersistence ?? false; - // Pi owns transcript persistence; this marker only lets the outer retry avoid + // OpenClaw owns transcript persistence; this marker only lets the outer retry avoid // replaying the same inbound channel message after overflow compaction. let lastPersistedCurrentMessageId: string | number | undefined; const onUserMessagePersisted: RunEmbeddedPiAgentParams["onUserMessagePersisted"] = ( @@ -1310,7 +1312,6 @@ export async function runEmbeddedPiAgent( currentMessageId: params.currentMessageId, replyToMode: params.replyToMode, hasRepliedRef: params.hasRepliedRef, - transcriptLocator: activeTranscriptLocator, workspaceDir: resolvedWorkspace, agentDir, config: params.config, diff --git a/src/agents/pi-embedded-runner/run.worker-launch.test.ts b/src/agents/pi-embedded-runner/run.worker-launch.test.ts index e7dfa699f92..56f101f3958 100644 --- a/src/agents/pi-embedded-runner/run.worker-launch.test.ts +++ b/src/agents/pi-embedded-runner/run.worker-launch.test.ts @@ -28,7 +28,6 @@ function makeParams(): RunEmbeddedPiAgentParams { model: "gpt-5.5", prompt: "hello", runId: "run-1", - transcriptLocator: "/tmp/stale-session.jsonl", sessionId: "session-1", sessionKey: "session-key-1", timeoutMs: 1_000, @@ -86,7 +85,6 @@ describe("runEmbeddedPiAgent worker launch", () => { runParams: expect.objectContaining({ sessionId: "session-1", sessionKey: "session-key-1", - transcriptLocator: "sqlite-transcript://agent-1/session-1", }), mode: "worker", workerChild: false, @@ -95,7 +93,6 @@ describe("runEmbeddedPiAgent worker launch", () => { expect.objectContaining({ runId: "run-1", sessionId: "session-1", - transcriptLocator: "sqlite-transcript://agent-1/session-1", }), { runtimeId: "pi", @@ -105,7 +102,7 @@ describe("runEmbeddedPiAgent worker launch", () => { ); }); - it("does not forward caller-provided filesystem transcript locators", async () => { + it("does not include transcript locators in worker handoff", async () => { const workerResult = { payloads: [{ text: "worker-ok" }], meta: { durationMs: 12 }, @@ -117,17 +114,10 @@ describe("runEmbeddedPiAgent worker launch", () => { runPiRunInWorkerMock.mockResolvedValue(workerResult); vi.stubEnv("OPENCLAW_AGENT_WORKER_MODE", "worker"); - await expect( - runEmbeddedPiAgent({ - ...makeParams(), - transcriptLocator: "/tmp/old-runtime-session.jsonl", - }), - ).resolves.toBe(workerResult); + await expect(runEmbeddedPiAgent(makeParams())).resolves.toBe(workerResult); expect(runPiRunInWorkerMock).toHaveBeenCalledWith( - expect.objectContaining({ - transcriptLocator: "sqlite-transcript://agent-1/session-1", - }), + expect.not.objectContaining({ transcriptLocator: expect.anything() }), expect.anything(), ); }); diff --git a/src/agents/pi-embedded-runner/run/attempt.context-engine-helpers.ts b/src/agents/pi-embedded-runner/run/attempt.context-engine-helpers.ts index bc6d998b2de..8f5c6474d42 100644 --- a/src/agents/pi-embedded-runner/run/attempt.context-engine-helpers.ts +++ b/src/agents/pi-embedded-runner/run/attempt.context-engine-helpers.ts @@ -23,8 +23,12 @@ export async function resolveAttemptBootstrapContext Promise; + agentId: string; + sessionId: string; + hasCompletedBootstrapSessionTurn: (scope: { + agentId: string; + sessionId: string; + }) => Promise; resolveBootstrapContextForRun: () => Promise< AttemptBootstrapContext >; @@ -38,7 +42,10 @@ export async function resolveAttemptBootstrapContext Promise<{ bootstrapFiles: unknown[]; contextFiles: unknown[] }>; }) { - const hasCompletedBootstrapTranscriptTurn = vi.fn(async () => params.completed ?? false); + const hasCompletedBootstrapSessionTurn = vi.fn(async () => params.completed ?? false); const resolveBootstrapContextForRun = params.resolver ?? vi.fn(async () => ({ @@ -34,12 +34,12 @@ async function resolveBootstrapContext(params: { bootstrapContextMode: params.bootstrapContextMode ?? "full", bootstrapContextRunKind: params.bootstrapContextRunKind ?? "default", bootstrapMode: params.bootstrapMode ?? "none", - transcriptLocator: TEST_TRANSCRIPT_LOCATOR, - hasCompletedBootstrapTranscriptTurn, + ...TEST_BOOTSTRAP_SCOPE, + hasCompletedBootstrapSessionTurn, resolveBootstrapContextForRun, }); - return { result, hasCompletedBootstrapTranscriptTurn, resolveBootstrapContextForRun }; + return { result, hasCompletedBootstrapSessionTurn, resolveBootstrapContextForRun }; } describe("embedded attempt context injection", () => { @@ -48,7 +48,7 @@ describe("embedded attempt context injection", () => { }); it("skips bootstrap reinjection on safe continuation turns when configured", async () => { - const { result, hasCompletedBootstrapTranscriptTurn, resolveBootstrapContextForRun } = + const { result, hasCompletedBootstrapSessionTurn, resolveBootstrapContextForRun } = await resolveBootstrapContext({ contextInjectionMode: "continuation-skip", completed: true, @@ -57,7 +57,7 @@ describe("embedded attempt context injection", () => { expect(result.isContinuationTurn).toBe(true); expect(result.bootstrapFiles).toEqual([]); expect(result.contextFiles).toEqual([]); - expect(hasCompletedBootstrapTranscriptTurn).toHaveBeenCalledWith(TEST_TRANSCRIPT_LOCATOR); + expect(hasCompletedBootstrapSessionTurn).toHaveBeenCalledWith(TEST_BOOTSTRAP_SCOPE); expect(resolveBootstrapContextForRun).not.toHaveBeenCalled(); }); @@ -80,7 +80,7 @@ describe("embedded attempt context injection", () => { }); it("disables bootstrap injection without marking the turn as a continuation", async () => { - const { result, hasCompletedBootstrapTranscriptTurn, resolveBootstrapContextForRun } = + const { result, hasCompletedBootstrapSessionTurn, resolveBootstrapContextForRun } = await resolveBootstrapContext({ contextInjectionMode: "never", bootstrapMode: "full", @@ -91,7 +91,7 @@ describe("embedded attempt context injection", () => { expect(result.shouldRecordCompletedBootstrapTurn).toBe(false); expect(result.bootstrapFiles).toEqual([]); expect(result.contextFiles).toEqual([]); - expect(hasCompletedBootstrapTranscriptTurn).not.toHaveBeenCalled(); + expect(hasCompletedBootstrapSessionTurn).not.toHaveBeenCalled(); expect(resolveBootstrapContextForRun).not.toHaveBeenCalled(); }); @@ -101,7 +101,7 @@ describe("embedded attempt context injection", () => { contextFiles: [{ path: "BOOTSTRAP.md" }], })); - const { result, hasCompletedBootstrapTranscriptTurn } = await resolveBootstrapContext({ + const { result, hasCompletedBootstrapSessionTurn } = await resolveBootstrapContext({ contextInjectionMode: "continuation-skip", bootstrapMode: "full", completed: true, @@ -111,7 +111,7 @@ describe("embedded attempt context injection", () => { expect(result.isContinuationTurn).toBe(false); expect(result.bootstrapFiles).toEqual([{ name: "BOOTSTRAP.md" }]); expect(result.contextFiles).toEqual([{ path: "BOOTSTRAP.md" }]); - expect(hasCompletedBootstrapTranscriptTurn).not.toHaveBeenCalled(); + expect(hasCompletedBootstrapSessionTurn).not.toHaveBeenCalled(); expect(resolver).toHaveBeenCalledTimes(1); }); @@ -143,7 +143,7 @@ describe("embedded attempt context injection", () => { }); it("never skips heartbeat bootstrap filtering", async () => { - const { result, hasCompletedBootstrapTranscriptTurn, resolveBootstrapContextForRun } = + const { result, hasCompletedBootstrapSessionTurn, resolveBootstrapContextForRun } = await resolveBootstrapContext({ contextInjectionMode: "continuation-skip", bootstrapContextMode: "lightweight", @@ -153,7 +153,7 @@ describe("embedded attempt context injection", () => { expect(result.isContinuationTurn).toBe(false); expect(result.shouldRecordCompletedBootstrapTurn).toBe(false); - expect(hasCompletedBootstrapTranscriptTurn).not.toHaveBeenCalled(); + expect(hasCompletedBootstrapSessionTurn).not.toHaveBeenCalled(); expect(resolveBootstrapContextForRun).toHaveBeenCalledTimes(1); }); @@ -185,7 +185,7 @@ describe("embedded attempt context injection", () => { }); it("allows continuation skip again for limited bootstrap mode", async () => { - const { result, hasCompletedBootstrapTranscriptTurn, resolveBootstrapContextForRun } = + const { result, hasCompletedBootstrapSessionTurn, resolveBootstrapContextForRun } = await resolveBootstrapContext({ contextInjectionMode: "continuation-skip", bootstrapMode: "limited", @@ -193,7 +193,7 @@ describe("embedded attempt context injection", () => { }); expect(result.isContinuationTurn).toBe(true); - expect(hasCompletedBootstrapTranscriptTurn).toHaveBeenCalledWith(TEST_TRANSCRIPT_LOCATOR); + expect(hasCompletedBootstrapSessionTurn).toHaveBeenCalledWith(TEST_BOOTSTRAP_SCOPE); expect(resolveBootstrapContextForRun).not.toHaveBeenCalled(); expect(result.shouldRecordCompletedBootstrapTurn).toBe(false); }); diff --git a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts index 7357a6c4c52..82e0689b511 100644 --- a/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts +++ b/src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts @@ -68,6 +68,7 @@ type AttemptSpawnWorkspaceHoisted = { installToolResultContextGuardMock: UnknownMock; installContextEngineLoopHookMock: UnknownMock; flushPendingToolResultsAfterIdleMock: AsyncUnknownMock; + releaseWsSessionMock: UnknownMock; resolveBootstrapFilesForRunMock: Mock<(...args: unknown[]) => Promise>; resolveBootstrapContextForRunMock: Mock<() => Promise>; isWorkspaceBootstrapPendingMock: Mock<(workspaceDir: string) => Promise>; @@ -131,6 +132,7 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => { const installToolResultContextGuardMock = vi.fn(() => () => {}); const installContextEngineLoopHookMock = vi.fn(() => () => {}); const flushPendingToolResultsAfterIdleMock = vi.fn(async () => {}); + const releaseWsSessionMock = vi.fn(() => {}); const subscribeEmbeddedPiSessionMock = vi.fn(() => createSubscriptionMock(), ); @@ -197,6 +199,7 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => { installToolResultContextGuardMock, installContextEngineLoopHookMock, flushPendingToolResultsAfterIdleMock, + releaseWsSessionMock, resolveBootstrapFilesForRunMock, resolveBootstrapContextForRunMock, isWorkspaceBootstrapPendingMock, @@ -510,6 +513,12 @@ vi.mock("../extra-params.js", async () => { }; }); +vi.mock("../../openai-ws-stream.js", () => ({ + createOpenAIWebSocketStreamFn: vi.fn(), + releaseWsSession: (...args: unknown[]) => + (hoisted.releaseWsSessionMock as (...args: unknown[]) => unknown)(...args), +})); + vi.mock("../../anthropic-payload-log.js", () => ({ createAnthropicPayloadLogger: () => undefined, })); @@ -879,6 +888,7 @@ export function resetEmbeddedAttemptHarness( hoisted.installToolResultContextGuardMock.mockReset().mockReturnValue(() => {}); hoisted.installContextEngineLoopHookMock.mockReset().mockReturnValue(() => {}); hoisted.flushPendingToolResultsAfterIdleMock.mockReset().mockResolvedValue(undefined); + hoisted.releaseWsSessionMock.mockReset().mockReturnValue(undefined); hoisted.resolveBootstrapContextForRunMock.mockReset().mockResolvedValue({ bootstrapFiles: [], contextFiles: [], @@ -1016,14 +1026,14 @@ export async function createContextEngineAttemptRunner(params: { bootstrap?: (params: { sessionId: string; sessionKey?: string; - sessionFile: string; + transcriptLocator: string; }) => Promise; maintain?: | boolean | ((params: { sessionId: string; sessionKey?: string; - sessionFile: string; + transcriptLocator: string; runtimeContext?: Record; }) => Promise<{ changed: boolean; @@ -1041,7 +1051,7 @@ export async function createContextEngineAttemptRunner(params: { afterTurn?: (params: { sessionId: string; sessionKey?: string; - sessionFile: string; + transcriptLocator: string; messages: AgentMessage[]; prePromptMessageCount: number; tokenBudget?: number; @@ -1060,7 +1070,7 @@ export async function createContextEngineAttemptRunner(params: { compact?: (params: { sessionId: string; sessionKey?: string; - sessionFile: string; + transcriptLocator: string; tokenBudget?: number; }) => Promise; info?: Partial; @@ -1077,7 +1087,7 @@ export async function createContextEngineAttemptRunner(params: { const agentDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-ctx-engine-agent-")); const stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-ctx-engine-state-")); const sessionId = "embedded-session"; - const sessionFile = createSqliteSessionTranscriptLocator({ + const transcriptLocator = createSqliteSessionTranscriptLocator({ agentId: resolveAgentIdFromSessionKey(params.sessionKey) ?? DEFAULT_AGENT_ID, sessionId, }); @@ -1122,7 +1132,6 @@ export async function createContextEngineAttemptRunner(params: { )({ sessionId, sessionKey: params.sessionKey, - sessionFile, workspaceDir, agentDir, config: {}, diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 05e216da2f8..581861c0ea5 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -5,7 +5,6 @@ import { isAcpRuntimeSpawnAvailable } from "../../../acp/runtime/availability.js import { buildHierarchyReinforcementMessage } from "../../../auto-reply/handoff-summarizer.js"; import { filterHeartbeatPairs } from "../../../auto-reply/heartbeat-filter.js"; import { getRuntimeConfig } from "../../../config/config.js"; -import { createSqliteSessionTranscriptLocator } from "../../../config/sessions/paths.js"; import { getSessionEntry, listSessionEntries, @@ -67,7 +66,7 @@ import { import { FULL_BOOTSTRAP_COMPLETED_CUSTOM_TYPE, buildBootstrapContextForFiles, - hasCompletedBootstrapTranscriptTurn, + hasCompletedBootstrapSessionTurn, isWorkspaceBootstrapPending, makeBootstrapWarn, resolveBootstrapFilesForRun, @@ -163,9 +162,9 @@ import { } from "../../tool-allowlist-guard.js"; import { UNKNOWN_TOOL_THRESHOLD } from "../../tool-loop-detection.js"; import { shouldAllowProviderOwnedThinkingReplay } from "../../transcript-policy.js"; -import { repairTranscriptStateIfNeeded } from "../../transcript-state-repair.js"; +import { repairTranscriptSessionStateIfNeeded } from "../../transcript-state-repair.js"; import { removeSessionManagerTailEntries } from "../../transcript/session-manager-tail.js"; -import { openTranscriptSessionManager } from "../../transcript/session-manager.js"; +import { openTranscriptSessionManagerForSession } from "../../transcript/session-manager.js"; import { normalizeUsage, type NormalizedUsage } from "../../usage.js"; import { DEFAULT_BOOTSTRAP_FILENAME } from "../../workspace.js"; import { isRunnerAbortError } from "../abort.js"; @@ -722,13 +721,6 @@ export async function runEmbeddedAttempt( config: params.config, agentId: params.agentId, }); - const sqliteTranscriptLocator = createSqliteSessionTranscriptLocator({ - agentId: sessionAgentId, - sessionId: params.sessionId, - }); - if (params.transcriptLocator !== sqliteTranscriptLocator) { - params = { ...params, transcriptLocator: sqliteTranscriptLocator }; - } const runArtifactStore = createRunArtifactStoreBestEffort({ agentId: sessionAgentId, runId: params.runId, @@ -974,8 +966,9 @@ export async function runEmbeddedAttempt( bootstrapContextMode: params.bootstrapContextMode, bootstrapContextRunKind: params.bootstrapContextRunKind ?? "default", bootstrapMode, - transcriptLocator: params.transcriptLocator, - hasCompletedBootstrapTranscriptTurn, + agentId: sessionAgentId, + sessionId: params.sessionId, + hasCompletedBootstrapSessionTurn, resolveBootstrapContextForRun: async () => { const bootstrapFiles = preloadedBootstrapFiles ?? @@ -1397,12 +1390,13 @@ export async function runEmbeddedAttempt( let trajectoryRecorder: ReturnType | null = null; let trajectoryEndRecorded = false; try { - await repairTranscriptStateIfNeeded({ - transcriptLocator: params.transcriptLocator, + await repairTranscriptSessionStateIfNeeded({ + agentId: sessionAgentId, + sessionId: params.sessionId, debug: (message) => log.debug(message), warn: (message) => log.warn(message), }); - const hadTranscriptLocator = hasSqliteSessionTranscriptEvents({ + const hadTranscriptEvents = hasSqliteSessionTranscriptEvents({ agentId: sessionAgentId, sessionId: params.sessionId, }); @@ -1417,8 +1411,8 @@ export async function runEmbeddedAttempt( }); sessionManager = guardSessionManager( - openTranscriptSessionManager({ - transcriptLocator: params.transcriptLocator, + openTranscriptSessionManagerForSession({ + agentId: sessionAgentId, sessionId: params.sessionId, cwd: effectiveWorkspace, }), @@ -1443,12 +1437,18 @@ export async function runEmbeddedAttempt( }, }, ); + const sessionTranscriptLocator = sessionManager.getTranscriptLocator(); + if (!sessionTranscriptLocator) { + throw new Error( + `SQLite transcript scope did not produce a runtime transcript handle: agentId=${sessionAgentId} sessionId=${params.sessionId}`, + ); + } await runAttemptContextEngineBootstrap({ - hadTranscriptLocator, + hadTranscriptLocator: hadTranscriptEvents, contextEngine: activeContextEngine, sessionId: params.sessionId, sessionKey: params.sessionKey, - transcriptLocator: params.transcriptLocator, + transcriptLocator: sessionTranscriptLocator, sessionManager, runtimeContext: buildAfterTurnRuntimeContext({ attempt: params, @@ -1736,7 +1736,7 @@ export async function runEmbeddedAttempt( contextEngine: activeContextEngine, sessionId: params.sessionId, sessionKey: params.sessionKey, - transcriptLocator: params.transcriptLocator, + transcriptLocator: sessionTranscriptLocator, tokenBudget: params.contextTokenBudget, modelId: params.modelId, getPrePromptMessageCount: () => prePromptMessageCount, @@ -2532,7 +2532,7 @@ export async function runEmbeddedAttempt( let messagesSnapshot: AgentMessage[] = []; let sessionIdUsed = activeSession.sessionId; - let transcriptLocatorUsed: string | undefined = params.transcriptLocator; + let transcriptLocatorUsed: string | undefined = sessionTranscriptLocator; const onAbort = () => { externalAbort = true; const reason = params.abortSignal ? getAbortReason(params.abortSignal) : undefined; @@ -2576,7 +2576,7 @@ export async function runEmbeddedAttempt( `effectiveReserveTokens=${request.effectiveReserveTokens} ` + `prePromptMessageCount=${prePromptMessageCount} ` + (extra ? `${extra} ` : "") + - `transcriptLocator=${params.transcriptLocator}`, + `transcriptLocator=${sessionTranscriptLocator}`, ); }; if (request.route === "truncate_tool_results_only") { @@ -2591,7 +2591,7 @@ export async function runEmbeddedAttempt( contextWindowTokens: contextTokenBudget, maxCharsOverride: toolResultMaxChars, agentId: sessionAgentId, - transcriptLocator: params.transcriptLocator, + transcriptLocator: sessionTranscriptLocator, sessionId: params.sessionId, sessionKey: params.sessionKey, }); @@ -3063,7 +3063,7 @@ export async function runEmbeddedAttempt( `historyImageBlocks=${sessionSummary.totalImageBlocks} ` + `systemPromptChars=${systemLen} promptChars=${promptLen} ` + `promptImages=${imageResult.images.length} ` + - `provider=${params.provider}/${params.modelId} transcriptLocator=${params.transcriptLocator}`, + `provider=${params.provider}/${params.modelId} transcriptLocator=${sessionTranscriptLocator}`, ); } @@ -3126,7 +3126,7 @@ export async function runEmbeddedAttempt( contextWindowTokens: contextTokenBudget, maxCharsOverride: toolResultMaxChars, agentId: sessionAgentId, - transcriptLocator: params.transcriptLocator, + transcriptLocator: sessionTranscriptLocator, sessionId: params.sessionId, sessionKey: params.sessionKey, }); @@ -3145,7 +3145,7 @@ export async function runEmbeddedAttempt( `overflowTokens=${preemptiveCompaction.overflowTokens} ` + `toolResultReducibleChars=${preemptiveCompaction.toolResultReducibleChars} ` + `effectiveReserveTokens=${preemptiveCompaction.effectiveReserveTokens} ` + - `transcriptLocator=${params.transcriptLocator}`, + `transcriptLocator=${sessionTranscriptLocator}`, ); skipPromptSubmission = true; } @@ -3153,7 +3153,7 @@ export async function runEmbeddedAttempt( log.warn( `[context-overflow-precheck] early tool-result truncation did not help for ` + `${params.provider}/${params.modelId}; falling back to compaction ` + - `reason=${truncationResult.reason ?? "unknown"} transcriptLocator=${params.transcriptLocator}`, + `reason=${truncationResult.reason ?? "unknown"} transcriptLocator=${sessionTranscriptLocator}`, ); preflightRecovery = { route: "compact_only" }; promptError = new Error(PREEMPTIVE_OVERFLOW_ERROR_TEXT); @@ -3178,7 +3178,7 @@ export async function runEmbeddedAttempt( `toolResultReducibleChars=${preemptiveCompaction.toolResultReducibleChars} ` + `reserveTokens=${reserveTokens} ` + `effectiveReserveTokens=${preemptiveCompaction.effectiveReserveTokens} ` + - `transcriptLocator=${params.transcriptLocator}`, + `transcriptLocator=${sessionTranscriptLocator}`, ); skipPromptSubmission = true; } @@ -3446,7 +3446,7 @@ export async function runEmbeddedAttempt( yieldAborted, sessionIdUsed, sessionKey: params.sessionKey, - transcriptLocator: params.transcriptLocator, + transcriptLocator: sessionTranscriptLocator, messagesSnapshot, prePromptMessageCount, tokenBudget: params.contextTokenBudget, @@ -3503,7 +3503,7 @@ export async function runEmbeddedAttempt( const rotation = await rotateTranscriptAfterCompaction({ sessionManager, agentId: params.agentId, - transcriptLocator: params.transcriptLocator, + transcriptLocator: sessionTranscriptLocator, }); if (rotation.rotated) { sessionIdUsed = rotation.sessionId ?? sessionIdUsed; diff --git a/src/agents/pi-embedded-runner/run/params.ts b/src/agents/pi-embedded-runner/run/params.ts index 52e1a52b362..7178f1722de 100644 --- a/src/agents/pi-embedded-runner/run/params.ts +++ b/src/agents/pi-embedded-runner/run/params.ts @@ -99,12 +99,6 @@ export type RunEmbeddedPiAgentParams = { forceHeartbeatTool?: boolean; /** Allow runtime plugins for this run to late-bind the gateway subagent. */ allowGatewaySubagentBinding?: boolean; - /** - * Ignored legacy boundary hint. `runEmbeddedPiAgent()` always derives the - * active SQLite transcript locator from `{agentId, sessionId}` before it - * writes, so callers cannot route runtime transcript writes to JSON files. - */ - transcriptLocator?: string; workspaceDir: string; agentDir?: string; config?: OpenClawConfig; diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index ffe36bbe49b..7110e3b5ffb 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -21,18 +21,10 @@ import type { PreemptiveCompactionRoute } from "./preemptive-compaction.types.js type EmbeddedRunAttemptBase = Omit< RunEmbeddedPiAgentParams, - | "provider" - | "model" - | "authProfileId" - | "authProfileIdSource" - | "thinkLevel" - | "lane" - | "enqueue" - | "transcriptLocator" + "provider" | "model" | "authProfileId" | "authProfileIdSource" | "thinkLevel" | "lane" | "enqueue" >; export type EmbeddedRunAttemptParams = EmbeddedRunAttemptBase & { - transcriptLocator: string; initialReplayState?: EmbeddedRunReplayState; /** Pluggable context engine for ingest/assemble/compact lifecycle. */ contextEngine?: ContextEngine; diff --git a/src/agents/pi-embedded-runner/usage-reporting.test.ts b/src/agents/pi-embedded-runner/usage-reporting.test.ts index d612c631d98..1ce04a0d0ce 100644 --- a/src/agents/pi-embedded-runner/usage-reporting.test.ts +++ b/src/agents/pi-embedded-runner/usage-reporting.test.ts @@ -10,7 +10,6 @@ import { import type { EmbeddedRunAttemptResult } from "./run/types.js"; let runEmbeddedPiAgent: typeof import("./run.js").runEmbeddedPiAgent; -const TEST_TRANSCRIPT_LOCATOR = "sqlite-transcript://main/test-session"; function makeAssistantMessage( overrides: Partial = {}, @@ -48,7 +47,6 @@ describe("runEmbeddedPiAgent usage reporting", () => { await runEmbeddedPiAgent({ sessionId: "test-session", sessionKey: "test-key", - transcriptLocator: TEST_TRANSCRIPT_LOCATOR, workspaceDir: "/tmp/workspace", prompt: "hello", timeoutMs: 30000, @@ -71,7 +69,6 @@ describe("runEmbeddedPiAgent usage reporting", () => { await runEmbeddedPiAgent({ sessionId: "test-session", sessionKey: "test-key", - transcriptLocator: TEST_TRANSCRIPT_LOCATOR, workspaceDir: "/tmp/workspace", prompt: "hello", timeoutMs: 30000, @@ -100,7 +97,6 @@ describe("runEmbeddedPiAgent usage reporting", () => { await runEmbeddedPiAgent({ sessionId: "test-session", sessionKey: "test-key", - transcriptLocator: TEST_TRANSCRIPT_LOCATOR, workspaceDir: "/tmp/workspace", prompt: "hello", timeoutMs: 30000, @@ -135,7 +131,6 @@ describe("runEmbeddedPiAgent usage reporting", () => { await runEmbeddedPiAgent({ sessionId: "test-session", sessionKey: "test-key", - transcriptLocator: TEST_TRANSCRIPT_LOCATOR, workspaceDir: "/tmp/workspace", prompt: "flush", timeoutMs: 30000, @@ -180,7 +175,6 @@ describe("runEmbeddedPiAgent usage reporting", () => { const result = await runEmbeddedPiAgent({ sessionId: "test-session", sessionKey: "test-key", - transcriptLocator: TEST_TRANSCRIPT_LOCATOR, workspaceDir: "/tmp/workspace", prompt: "hello", timeoutMs: 30000, @@ -227,7 +221,6 @@ describe("runEmbeddedPiAgent usage reporting", () => { const result = await runEmbeddedPiAgent({ sessionId: "test-session", sessionKey: "test-key", - transcriptLocator: TEST_TRANSCRIPT_LOCATOR, workspaceDir: "/tmp/workspace", prompt: "hello", provider: "openrouter", diff --git a/src/agents/runtime-backend.test.ts b/src/agents/runtime-backend.test.ts index 7116e69bdcb..52977d8aa8c 100644 --- a/src/agents/runtime-backend.test.ts +++ b/src/agents/runtime-backend.test.ts @@ -8,7 +8,6 @@ function createPreparedRun(overrides: Partial = {}): PreparedA agentId: "main", sessionId: "session-1", sessionKey: "agent:main:main", - sessionFile: "sqlite-transcript://main/session-1.jsonl", workspaceDir: "/tmp/workspace", prompt: "hello", timeoutMs: 1000, diff --git a/src/agents/runtime-backend.ts b/src/agents/runtime-backend.ts index 6313a0a76f3..2d0f91e4d72 100644 --- a/src/agents/runtime-backend.ts +++ b/src/agents/runtime-backend.ts @@ -16,7 +16,6 @@ export type PreparedAgentRun = { agentId: string; sessionId: string; sessionKey?: string; - sessionFile: string; workspaceDir: string; agentDir?: string; prompt: string; @@ -96,7 +95,6 @@ export function assertPreparedAgentRunSerializable(run: PreparedAgentRun): Prepa "runId", "agentId", "sessionId", - "sessionFile", "workspaceDir", "prompt", ] satisfies (keyof PreparedAgentRun)[]; diff --git a/src/agents/runtime-worker-permissions.test.ts b/src/agents/runtime-worker-permissions.test.ts index fe271782488..23d58da217e 100644 --- a/src/agents/runtime-worker-permissions.test.ts +++ b/src/agents/runtime-worker-permissions.test.ts @@ -13,7 +13,6 @@ function createPreparedRun(overrides: Partial = {}): PreparedA agentId: "main", sessionId: "session-permissions", sessionKey: "agent:main:main", - sessionFile: "sqlite-transcript://main/session-permissions.jsonl", workspaceDir: "/tmp/workspace", prompt: "hello", timeoutMs: 1000, diff --git a/src/agents/runtime-worker.entry.test.ts b/src/agents/runtime-worker.entry.test.ts index 118c9fa1c57..f2035571fe5 100644 --- a/src/agents/runtime-worker.entry.test.ts +++ b/src/agents/runtime-worker.entry.test.ts @@ -23,7 +23,6 @@ function createPreparedRun( agentId: "main", sessionId: "session-worker", sessionKey: "agent:main:main", - sessionFile: "sqlite-transcript://main/session-worker.jsonl", workspaceDir: "/tmp/workspace", prompt: "hello", timeoutMs: 1000, diff --git a/src/agents/runtime-worker.test.ts b/src/agents/runtime-worker.test.ts index 80c7c859d4c..ef7678b9b88 100644 --- a/src/agents/runtime-worker.test.ts +++ b/src/agents/runtime-worker.test.ts @@ -40,7 +40,6 @@ function createPreparedRun(overrides: Partial = {}): PreparedA agentId: "main", sessionId: "session-worker", sessionKey: "agent:main:main", - sessionFile: "sqlite-transcript://main/session-worker.jsonl", workspaceDir: "/tmp/workspace", prompt: "hello", timeoutMs: 1000, diff --git a/src/agents/transcript-state-repair.ts b/src/agents/transcript-state-repair.ts index 0ff87962418..89d18d7f9e7 100644 --- a/src/agents/transcript-state-repair.ts +++ b/src/agents/transcript-state-repair.ts @@ -27,6 +27,11 @@ type SessionMessageEntry = { message: { role: string; content?: unknown } & Record; } & Record; +type TranscriptRepairScope = { + agentId: string; + sessionId: string; +}; + function isSessionHeader(entry: unknown): entry is { type: string; id: string } { if (!entry || typeof entry !== "object") { return false; @@ -184,24 +189,13 @@ function buildRepairSummaryParts(params: { return parts.length > 0 ? parts.join(", ") : "no changes"; } -export async function repairTranscriptStateIfNeeded(params: { - transcriptLocator: string; +async function repairTranscriptEntries(params: { + scope: TranscriptRepairScope; + label: string; debug?: (message: string) => void; warn?: (message: string) => void; }): Promise { - const transcriptLocator = params.transcriptLocator.trim(); - if (!transcriptLocator) { - return { repaired: false, droppedLines: 0, reason: "missing session transcript" }; - } - - const scope = resolveSqliteSessionTranscriptScopeForLocator({ - transcriptLocator: transcriptLocator, - }); - if (!scope) { - return { repaired: false, droppedLines: 0, reason: "missing SQLite transcript" }; - } - - const storedEntries = loadSqliteSessionTranscriptEvents(scope).map((entry) => entry.event); + const storedEntries = loadSqliteSessionTranscriptEvents(params.scope).map((entry) => entry.event); const entries: unknown[] = []; let droppedLines = 0; let rewrittenAssistantMessages = 0; @@ -246,9 +240,7 @@ export async function repairTranscriptStateIfNeeded(params: { } if (!isSessionHeader(entries[0])) { - params.warn?.( - `session transcript repair skipped: invalid session header (${transcriptLocator})`, - ); + params.warn?.(`session transcript repair skipped: invalid session header (${params.label})`); return { repaired: false, droppedLines, reason: "invalid session header" }; } @@ -263,7 +255,7 @@ export async function repairTranscriptStateIfNeeded(params: { try { replaceSqliteSessionTranscriptEvents({ - ...scope, + ...params.scope, events: entries, }); } catch (err) { @@ -283,7 +275,7 @@ export async function repairTranscriptStateIfNeeded(params: { rewrittenAssistantMessages, droppedBlankUserMessages, rewrittenUserMessages, - })} (${transcriptLocator})`, + })} (${params.label})`, ); return { repaired: true, @@ -293,3 +285,48 @@ export async function repairTranscriptStateIfNeeded(params: { rewrittenUserMessages, }; } + +export async function repairTranscriptStateIfNeeded(params: { + transcriptLocator: string; + debug?: (message: string) => void; + warn?: (message: string) => void; +}): Promise { + const transcriptLocator = params.transcriptLocator.trim(); + if (!transcriptLocator) { + return { repaired: false, droppedLines: 0, reason: "missing session transcript" }; + } + + const scope = resolveSqliteSessionTranscriptScopeForLocator({ + transcriptLocator: transcriptLocator, + }); + if (!scope) { + return { repaired: false, droppedLines: 0, reason: "missing SQLite transcript" }; + } + + return repairTranscriptEntries({ + scope, + label: transcriptLocator, + debug: params.debug, + warn: params.warn, + }); +} + +export async function repairTranscriptSessionStateIfNeeded(params: { + agentId: string; + sessionId: string; + debug?: (message: string) => void; + warn?: (message: string) => void; +}): Promise { + const agentId = params.agentId.trim(); + const sessionId = params.sessionId.trim(); + if (!agentId || !sessionId) { + return { repaired: false, droppedLines: 0, reason: "missing SQLite transcript scope" }; + } + + return repairTranscriptEntries({ + scope: { agentId, sessionId }, + label: `agentId=${agentId} sessionId=${sessionId}`, + debug: params.debug, + warn: params.warn, + }); +} diff --git a/src/agents/transcript/session-manager.ts b/src/agents/transcript/session-manager.ts index 1a1174d0460..87f674bb314 100644 --- a/src/agents/transcript/session-manager.ts +++ b/src/agents/transcript/session-manager.ts @@ -63,6 +63,27 @@ function normalizeTranscriptLocator(transcriptLocator: string): string { ); } +function normalizeTranscriptScopeId(value: string, label: string): string { + const trimmed = value.trim(); + if (!trimmed) { + throw new Error(`SQLite transcript ${label} is required`); + } + return trimmed; +} + +function createTranscriptScope(params: { + agentId: string; + sessionId: string; +}): TranscriptSqliteScope { + const agentId = normalizeTranscriptScopeId(params.agentId, "agent id"); + const sessionId = normalizeTranscriptScopeId(params.sessionId, "session id"); + return { + agentId, + sessionId, + transcriptLocator: createSqliteSessionTranscriptLocator({ agentId, sessionId }), + }; +} + function createTranscriptLocator(header: SessionHeader, agentId = DEFAULT_AGENT_ID): string { return createSqliteSessionTranscriptLocator({ agentId, @@ -141,6 +162,32 @@ function loadTranscriptState(params: { return { state, scope: headerScope }; } +function loadTranscriptStateForSession(params: { + agentId: string; + sessionId: string; + cwd?: string; +}): { + state: TranscriptState; + scope: TranscriptSqliteScope; +} { + const scope = createTranscriptScope({ + agentId: params.agentId, + sessionId: params.sessionId, + }); + const sqliteEvents = loadSqliteSessionTranscriptEvents(scope).map((entry) => entry.event); + if (sqliteEvents.length > 0) { + return { state: createTranscriptStateFromEvents(sqliteEvents), scope }; + } + + const header = createSessionHeader({ + id: scope.sessionId, + cwd: params.cwd ?? process.cwd(), + }); + const state = new TranscriptState({ header, entries: [] }); + persistFullTranscriptStateToSqlite(scope, state); + return { state, scope }; +} + function isMessageWithContent( message: unknown, ): message is { role: string; content: unknown; timestamp?: unknown } { @@ -315,6 +362,20 @@ export class TranscriptSessionManager implements SessionManager { }); } + static openForSession(params: { + agentId: string; + sessionId: string; + cwd?: string; + }): TranscriptSessionManager { + const loaded = loadTranscriptStateForSession(params); + return new TranscriptSessionManager({ + transcriptLocator: loaded.scope.transcriptLocator, + persist: true, + state: loaded.state, + sqliteScope: loaded.scope, + }); + } + static create(cwd: string): TranscriptSessionManager { const header = createSessionHeader({ cwd }); const transcriptLocator = createTranscriptLocator(header); @@ -620,6 +681,14 @@ export function openTranscriptSessionManager(params: { return TranscriptSessionManager.open(params); } +export function openTranscriptSessionManagerForSession(params: { + agentId: string; + sessionId: string; + cwd?: string; +}): SessionManager { + return TranscriptSessionManager.openForSession(params); +} + export const SessionManagerValue = { create: (cwd: string) => TranscriptSessionManager.create(cwd), open: (transcriptLocator: string, cwdOverride?: string) => { diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index d1220bcabcb..7c4321c28c0 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -289,7 +289,6 @@ export function createFollowupRunner(params: { senderUsername: run.senderUsername, senderE164: run.senderE164, senderIsOwner: run.senderIsOwner, - transcriptLocator: run.transcriptLocator, agentDir: run.agentDir, workspaceDir: run.workspaceDir, config: runtimeConfig, diff --git a/src/commands/agent.test.ts b/src/commands/agent.test.ts index 5ab28ac417a..11a1d91cc7c 100644 --- a/src/commands/agent.test.ts +++ b/src/commands/agent.test.ts @@ -127,7 +127,6 @@ vi.mock("../agents/command/attempt-execution.runtime.js", () => { messageTo: opts.replyTo ?? opts.to, messageThreadId: opts.threadId, senderIsOwner: opts.senderIsOwner, - sessionFile: params.sessionFile, workspaceDir: params.workspaceDir, config: params.cfg, skillsSnapshot: params.skillsSnapshot, @@ -238,16 +237,16 @@ vi.mock("../config/sessions/transcript-resolve.runtime.js", () => { async (params: { sessionId: string; sessionKey: string; - sessionEntry?: { sessionFile?: string; sessionId?: string }; - sessionStore?: Record; + sessionEntry?: { transcriptLocator?: string; sessionId?: string }; + sessionStore?: Record; agentId: string; threadId?: string | number; }) => { - const sessionFileFromStorePath = - params.sessionEntry?.sessionFile ?? + const transcriptLocatorFromStorePath = + params.sessionEntry?.transcriptLocator ?? resolveTranscriptLocator(params.sessionId, params.agentId); - const sessionFile = params.sessionEntry?.sessionFile - ? sessionFileFromStorePath + const transcriptLocator = params.sessionEntry?.transcriptLocator + ? transcriptLocatorFromStorePath : resolveTranscriptLocator(params.sessionId, params.agentId); let sessionEntry = params.sessionEntry; if (params.sessionStore && params.sessionKey) { @@ -258,12 +257,12 @@ vi.mock("../config/sessions/transcript-resolve.runtime.js", () => { sessionEntry = { ...existingEntry, sessionId: params.sessionId, - sessionFile, + transcriptLocator, }; params.sessionStore[params.sessionKey] = sessionEntry; await replaceTestSessionRows(params.agentId, params.sessionStore as never); } - return { sessionFile, sessionEntry }; + return { transcriptLocator, sessionEntry }; }, ), }; @@ -702,12 +701,7 @@ describe("agentCommand", () => { const callArgs = vi.mocked(runEmbeddedPiAgent).mock.calls.at(-1)?.[0]; expect(callArgs?.sessionId).toBe("session-123"); - expect(callArgs?.sessionFile).toContain( - path.join("agents", "main", "sessions", "session-123.jsonl"), - ); - expect(callArgs?.sessionFile).not.toContain( - `${path.sep}sessions${path.sep}agents${path.sep}main${path.sep}sessions${path.sep}`, - ); + expect(callArgs).not.toHaveProperty("transcriptLocator"); }); }); @@ -1096,7 +1090,7 @@ describe("agentCommand", () => { ); let callArgs = getLastEmbeddedCall(); expect(callArgs?.sessionKey).toBe("agent:ops:main"); - expect(callArgs?.sessionFile).toContain(`${path.sep}agents${path.sep}ops${path.sep}sessions`); + expect(callArgs).not.toHaveProperty("transcriptLocator"); expect(callArgs?.messageChannel).toBe("slack"); expect(runtime.log).toHaveBeenCalledWith("ok"); diff --git a/src/commands/models/list.probe.ts b/src/commands/models/list.probe.ts index 1a14caf908c..30873a66ad8 100644 --- a/src/commands/models/list.probe.ts +++ b/src/commands/models/list.probe.ts @@ -24,7 +24,6 @@ import { parseModelRef, } from "../../agents/model-selection.js"; import { resolveDefaultAgentWorkspaceDir } from "../../agents/workspace.js"; -import { createSqliteSessionTranscriptLocator } from "../../config/sessions/paths.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { coerceSecretRef, normalizeSecretInputString } from "../../config/types.secrets.js"; import { type SecretRefResolveCache, resolveSecretRefString } from "../../secrets/resolve.js"; @@ -457,7 +456,6 @@ async function probeTarget(params: { const model = target.model; const sessionId = `probe-${target.provider}-${crypto.randomUUID()}`; - const sessionFile = createSqliteSessionTranscriptLocator({ sessionId, agentId }); const start = Date.now(); const buildResult = (status: AuthProbeResult["status"], error?: string): AuthProbeResult => ({ @@ -475,7 +473,6 @@ async function probeTarget(params: { const { runEmbeddedPiAgent } = await loadEmbeddedRunnerModule(); await runEmbeddedPiAgent({ sessionId, - sessionFile, agentId, workspaceDir, agentDir, diff --git a/src/commitments/runtime.test.ts b/src/commitments/runtime.test.ts index 55939153440..eb9103109bc 100644 --- a/src/commitments/runtime.test.ts +++ b/src/commitments/runtime.test.ts @@ -203,11 +203,13 @@ describe("commitment extraction runtime", () => { await expect(drainCommitmentExtractionQueue()).resolves.toBe(1); expect(resolveDefaultModelMock).toHaveBeenCalledWith({ cfg, agentId: "main" }); - expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1); - const request = runEmbeddedPiAgentMock.mock.calls[0]?.[0]; - expect(request.provider).toBe("openai-codex"); - expect(request.model).toBe("gpt-5.5"); - expect(request.disableTools).toBe(true); + expect(runEmbeddedPiAgentMock).toHaveBeenCalledWith( + expect.objectContaining({ + provider: "openai-codex", + model: "gpt-5.5", + disableTools: true, + }), + ); }); it("backs off hidden extraction after terminal model or auth failures", async () => { diff --git a/src/commitments/runtime.ts b/src/commitments/runtime.ts index e967c046670..2d64b177823 100644 --- a/src/commitments/runtime.ts +++ b/src/commitments/runtime.ts @@ -2,7 +2,6 @@ import { randomUUID } from "node:crypto"; import { resolveAgentWorkspaceDir } from "../agents/agent-scope.js"; import { runEmbeddedPiAgent, type EmbeddedPiRunResult } from "../agents/pi-embedded.js"; import type { OpenClawConfig } from "../config/config.js"; -import { createSqliteSessionTranscriptLocator } from "../config/sessions/paths.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; import { resolveCommitmentTimezone, resolveCommitmentsConfig } from "./config.js"; @@ -180,10 +179,6 @@ function openTerminalFailureCooldown(agentId: string, error: unknown): void { }); } -function resolveExtractionSessionFile(agentId: string, runId: string): string { - return createSqliteSessionTranscriptLocator({ agentId, sessionId: runId }); -} - function joinPayloadText(result: EmbeddedPiRunResult): string { return ( result.payloads @@ -222,7 +217,6 @@ async function defaultExtractBatch(params: { sessionKey: `agent:${first.agentId}:commitments:${runId}`, agentId: first.agentId, trigger: "manual", - sessionFile: resolveExtractionSessionFile(first.agentId, runId), workspaceDir: resolveAgentWorkspaceDir(cfg, first.agentId), config: cfg, provider: modelRef.provider, diff --git a/src/crestodian/assistant.ts b/src/crestodian/assistant.ts index 73242b60401..c7666aec5f3 100644 --- a/src/crestodian/assistant.ts +++ b/src/crestodian/assistant.ts @@ -183,7 +183,7 @@ async function runLocalRuntimePlanner( try { const runId = `crestodian-planner-${randomUUID()}`; const sessionId = `${runId}-session`; - const sessionFile = createSqliteSessionTranscriptLocator({ + const transcriptLocator = createSqliteSessionTranscriptLocator({ agentId: "crestodian", sessionId, }); @@ -196,7 +196,7 @@ async function runLocalRuntimePlanner( sessionKey, agentId: "crestodian", trigger: "manual", - sessionFile, + transcriptLocator, workspaceDir: tempDir, config: backend.buildConfig(tempDir), prompt: params.prompt, @@ -220,7 +220,6 @@ async function runLocalRuntimePlanner( sessionKey, agentId: "crestodian", trigger: "manual", - sessionFile, workspaceDir: tempDir, config: backend.buildConfig(tempDir), prompt: params.prompt, diff --git a/src/cron/isolated-agent/run-executor.ts b/src/cron/isolated-agent/run-executor.ts index 832d9282015..52625f7faab 100644 --- a/src/cron/isolated-agent/run-executor.ts +++ b/src/cron/isolated-agent/run-executor.ts @@ -199,7 +199,6 @@ export function createCronPromptExecutor(params: { messageTo: params.resolvedDelivery.to, messageThreadId: params.resolvedDelivery.threadId, currentChannelId, - transcriptLocator, agentDir: params.agentDir, workspaceDir: params.workspaceDir, config: params.cfgWithAgentDefaults, diff --git a/src/hooks/llm-slug-generator.ts b/src/hooks/llm-slug-generator.ts index 22f96af15d3..4f3f116e3ce 100644 --- a/src/hooks/llm-slug-generator.ts +++ b/src/hooks/llm-slug-generator.ts @@ -11,7 +11,6 @@ import { import { resolveDefaultModelForAgent } from "../agents/model-selection.js"; import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; import { resolveAgentTimeoutMs } from "../agents/timeout.js"; -import { createSqliteSessionTranscriptLocator } from "../config/sessions/paths.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; @@ -39,7 +38,6 @@ export async function generateSlugViaLLM(params: { const workspaceDir = resolveAgentWorkspaceDir(params.cfg, agentId); const agentDir = resolveAgentDir(params.cfg, agentId); const sessionId = `slug-generator-${randomUUID()}`; - const sessionFile = createSqliteSessionTranscriptLocator({ agentId, sessionId }); const prompt = `Based on this conversation, generate a short 1-2 word filename slug (lowercase, hyphen-separated, no file extension). @@ -58,7 +56,6 @@ Reply with ONLY the slug, nothing else. Examples: "vendor-pitch", "api-design", sessionId, sessionKey: "temp:slug-generator", agentId, - sessionFile, workspaceDir, agentDir, config: params.cfg, diff --git a/src/talk/agent-consult-runtime.test.ts b/src/talk/agent-consult-runtime.test.ts index 0207acf2584..2f25863f06c 100644 --- a/src/talk/agent-consult-runtime.test.ts +++ b/src/talk/agent-consult-runtime.test.ts @@ -1,12 +1,12 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -import type { RunEmbeddedPiAgentParams } from "../agents/pi-embedded-runner/run/params.js"; +import { createSqliteSessionTranscriptLocator } from "../config/sessions/paths.js"; import { __setRealtimeVoiceAgentConsultDepsForTest, consultRealtimeVoiceAgent, resolveRealtimeVoiceAgentConsultTools, resolveRealtimeVoiceAgentConsultToolsAllow, } from "./agent-consult-runtime.js"; -import { REALTIME_VOICE_AGENT_CONSULT_TOOL } from "./agent-consult-tool.js"; +import { REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME } from "./agent-consult-tool.js"; function createAgentRuntime(payloads: unknown[] = [{ text: "Speak this." }]) { const sessionStore: Record< @@ -14,7 +14,7 @@ function createAgentRuntime(payloads: unknown[] = [{ text: "Speak this." }]) { { sessionId?: string; updatedAt?: number; - sessionFile?: string; + transcriptLocator?: string; spawnedBy?: string; forkedFromParent?: boolean; totalTokens?: number; @@ -88,36 +88,16 @@ function createAgentRuntime(payloads: unknown[] = [{ text: "Speak this." }]) { }; } -function requireEmbeddedPiAgentCall(runEmbeddedPiAgent: { - mock: { calls: unknown[][] }; -}): RunEmbeddedPiAgentParams { - const call = runEmbeddedPiAgent.mock.calls[0]?.[0] as RunEmbeddedPiAgentParams | undefined; - if (!call) { - throw new Error("Expected embedded PI agent call"); - } - return call; -} - -function expectPositiveTimestamp(value: unknown) { - expect(typeof value).toBe("number"); - expect(value as number).toBeGreaterThan(0); -} - -function expectNonEmptyString(value: unknown) { - expect(typeof value).toBe("string"); - expect((value as string).trim()).not.toBe(""); -} - describe("realtime voice agent consult runtime", () => { afterEach(() => { __setRealtimeVoiceAgentConsultDepsForTest(null); }); it("exposes the shared consult tool based on policy", () => { - expect(resolveRealtimeVoiceAgentConsultTools("safe-read-only")).toStrictEqual([ - REALTIME_VOICE_AGENT_CONSULT_TOOL, + expect(resolveRealtimeVoiceAgentConsultTools("safe-read-only")).toEqual([ + expect.objectContaining({ name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME }), ]); - expect(resolveRealtimeVoiceAgentConsultTools("none")).toStrictEqual([]); + expect(resolveRealtimeVoiceAgentConsultTools("none")).toEqual([]); expect(resolveRealtimeVoiceAgentConsultToolsAllow("safe-read-only")).toEqual([ "read", "web_search", @@ -127,7 +107,7 @@ describe("realtime voice agent consult runtime", () => { "memory_get", ]); expect(resolveRealtimeVoiceAgentConsultToolsAllow("owner")).toBeUndefined(); - expect(resolveRealtimeVoiceAgentConsultToolsAllow("none")).toStrictEqual([]); + expect(resolveRealtimeVoiceAgentConsultToolsAllow("none")).toEqual([]); }); it("runs an embedded agent using the shared session and prompt contract", async () => { @@ -150,7 +130,6 @@ describe("realtime voice agent consult runtime", () => { provider: "openai", model: "gpt-5.4", thinkLevel: "high", - fastMode: true, timeoutMs: 10_000, }); @@ -159,24 +138,23 @@ describe("realtime voice agent consult runtime", () => { if (!voiceSession) { throw new Error("Expected voice consult session entry"); } - expect(Object.keys(voiceSession).toSorted()).toStrictEqual(["sessionId", "updatedAt"]); - expectNonEmptyString(voiceSession.sessionId); - expectPositiveTimestamp(voiceSession.updatedAt); - const call = requireEmbeddedPiAgentCall(runEmbeddedPiAgent); - expect(call.sessionId).toBe(voiceSession.sessionId); - expect(call.sessionKey).toBe("voice:15550001234"); - expect(call.sandboxSessionKey).toBe("agent:main:voice:15550001234"); - expect(call.agentId).toBe("main"); - expect(call.messageProvider).toBe("voice"); - expect(call.lane).toBe("voice"); - expect(call.toolsAllow).toStrictEqual(["read"]); - expect(call.provider).toBe("openai"); - expect(call.model).toBe("gpt-5.4"); - expect(call.thinkLevel).toBe("high"); - expect(call.fastMode).toBe(true); - expect(call.timeoutMs).toBe(10_000); - expect(call.prompt).toContain("Caller: Can you check this?"); - expect(call.extraSystemPrompt).toContain("delegated requests"); + expect(voiceSession.sessionId).toEqual(expect.stringMatching(/\S/)); + expect(runEmbeddedPiAgent).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: "voice:15550001234", + sandboxSessionKey: "agent:main:voice:15550001234", + agentId: "main", + messageProvider: "voice", + lane: "voice", + toolsAllow: ["read"], + provider: "openai", + model: "gpt-5.4", + thinkLevel: "high", + timeoutMs: 10_000, + prompt: expect.stringContaining("Caller: Can you check this?"), + extraSystemPrompt: expect.stringContaining("delegated requests"), + }), + ); }); it("scopes sandbox resolution to the configured consult agent", async () => { @@ -197,10 +175,13 @@ describe("realtime voice agent consult runtime", () => { userLabel: "Caller", }); - const call = requireEmbeddedPiAgentCall(runEmbeddedPiAgent); - expect(call.sessionKey).toBe("voice:15550001234"); - expect(call.sandboxSessionKey).toBe("agent:voice:voice:15550001234"); - expect(call.agentId).toBe("voice"); + expect(runEmbeddedPiAgent).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: "voice:15550001234", + sandboxSessionKey: "agent:voice:voice:15550001234", + agentId: "voice", + }), + ); }); it("returns a speakable fallback when the embedded agent has no visible text", async () => { @@ -232,7 +213,10 @@ describe("realtime voice agent consult runtime", () => { const { runtime, runEmbeddedPiAgent, sessionStore } = createAgentRuntime(); sessionStore["agent:main:main"] = { sessionId: "parent-session", - sessionFile: "/tmp/parent.jsonl", + transcriptLocator: createSqliteSessionTranscriptLocator({ + agentId: "main", + sessionId: "parent-session", + }), totalTokens: 100, updatedAt: 1, }; @@ -243,7 +227,7 @@ describe("realtime voice agent consult runtime", () => { })); const forkSessionFromParent = vi.fn(async () => ({ sessionId: "forked-session", - sessionFile: "sqlite-transcript://main/forked-session.jsonl", + transcriptLocator: "sqlite-transcript://main/forked-session", })); __setRealtimeVoiceAgentConsultDepsForTest({ resolveParentForkDecision, @@ -275,19 +259,17 @@ describe("realtime voice agent consult runtime", () => { parentEntry: sessionStore["agent:main:main"], agentId: "main", }); - const forkedEntry = sessionStore["agent:main:subagent:google-meet:meet-1"]; - if (!forkedEntry) { - throw new Error("Expected forked consult session entry"); - } - expect(forkedEntry).toMatchObject({ + expect(sessionStore["agent:main:subagent:google-meet:meet-1"]).toMatchObject({ sessionId: "forked-session", spawnedBy: "agent:main:main", forkedFromParent: true, }); - expectPositiveTimestamp(forkedEntry.updatedAt); - const call = requireEmbeddedPiAgentCall(runEmbeddedPiAgent); - expect(call.sessionId).toBe("forked-session"); - expect(call.spawnedBy).toBe("agent:main:main"); + expect(runEmbeddedPiAgent).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "forked-session", + spawnedBy: "agent:main:main", + }), + ); }); it("inherits requester message routing for forked consult sessions", async () => { @@ -319,20 +301,17 @@ describe("realtime voice agent consult runtime", () => { userLabel: "Caller", }); - const call = requireEmbeddedPiAgentCall(runEmbeddedPiAgent); - expect(call.sessionKey).toBe("voice:google-meet:meet-1"); - expect(call.spawnedBy).toBe("agent:main:discord:channel:123"); - expect(call.messageProvider).toBe("discord"); - expect(call.agentAccountId).toBe("default"); - expect(call.messageTo).toBe("channel:123"); - expect(call.currentChannelId).toBe("channel:123"); - const voiceEntry = sessionStore["voice:google-meet:meet-1"]; - if (!voiceEntry) { - throw new Error("Expected voice consult session entry"); - } - expect(voiceEntry).toStrictEqual({ - sessionId: voiceEntry.sessionId, - spawnedBy: "agent:main:discord:channel:123", + expect(runEmbeddedPiAgent).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey: "voice:google-meet:meet-1", + spawnedBy: "agent:main:discord:channel:123", + messageProvider: "discord", + agentAccountId: "default", + messageTo: "channel:123", + currentChannelId: "channel:123", + }), + ); + expect(sessionStore["voice:google-meet:meet-1"]).toMatchObject({ deliveryContext: { channel: "discord", to: "channel:123", @@ -341,11 +320,7 @@ describe("realtime voice agent consult runtime", () => { lastChannel: "discord", lastTo: "channel:123", lastAccountId: "default", - lastThreadId: undefined, - updatedAt: voiceEntry.updatedAt, }); - expectNonEmptyString(voiceEntry.sessionId); - expectPositiveTimestamp(voiceEntry.updatedAt); }); it("reuses the call session delivery context when requester metadata is absent", async () => { @@ -376,14 +351,17 @@ describe("realtime voice agent consult runtime", () => { userLabel: "Caller", }); - const call = requireEmbeddedPiAgentCall(runEmbeddedPiAgent); - expect(call.sessionId).toBe("call-session"); - expect(call.sessionKey).toBe("voice:google-meet:meet-1"); - expect(call.messageProvider).toBe("discord"); - expect(call.agentAccountId).toBe("default"); - expect(call.messageTo).toBe("channel:123"); - expect(call.messageThreadId).toBe("thread-456"); - expect(call.currentChannelId).toBe("channel:123"); - expect(call.currentThreadTs).toBe("thread-456"); + expect(runEmbeddedPiAgent).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: "call-session", + sessionKey: "voice:google-meet:meet-1", + messageProvider: "discord", + agentAccountId: "default", + messageTo: "channel:123", + messageThreadId: "thread-456", + currentChannelId: "channel:123", + currentThreadTs: "thread-456", + }), + ); }); }); diff --git a/src/talk/agent-consult-runtime.ts b/src/talk/agent-consult-runtime.ts index d62aa4ac70a..a1b10e86a50 100644 --- a/src/talk/agent-consult-runtime.ts +++ b/src/talk/agent-consult-runtime.ts @@ -4,7 +4,6 @@ import { forkSessionFromParent, resolveParentForkDecision, } from "../auto-reply/reply/session-fork.js"; -import { createSqliteSessionTranscriptLocator } from "../config/sessions/paths.js"; import { parseSessionThreadInfoFast } from "../config/sessions/thread-info.js"; import type { SessionEntry } from "../config/sessions/types.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; @@ -245,10 +244,6 @@ export async function consultRealtimeVoiceAgent(params: { resolvedDeliveryContext ?? deliveryContextFromSession(sessionEntry); const sessionId = sessionEntry.sessionId; - const transcriptLocator = createSqliteSessionTranscriptLocator({ - agentId, - sessionId, - }); const result = await params.agentRuntime.runEmbeddedPiAgent({ sessionId, sessionKey: params.sessionKey, @@ -264,7 +259,6 @@ export async function consultRealtimeVoiceAgent(params: { consultDeliveryContext?.threadId != null ? String(consultDeliveryContext.threadId) : undefined, - transcriptLocator, workspaceDir, config: params.cfg, prompt: buildRealtimeVoiceAgentConsultPrompt({