diff --git a/CHANGELOG.md b/CHANGELOG.md index 14a65faed3f..af7b0ee55fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Heartbeat: allow explicit accountId routing for multi-account channels. (#8702) Thanks @lsh411. +- TUI/Gateway: handle non-streaming finals, refresh history for non-local chat runs, and avoid event gap warnings for targeted tool streams. (#8432) Thanks @gumadeiras. - Shell completion: auto-detect and migrate slow dynamic patterns to cached files for faster terminal startup; add completion health checks to doctor/update/onboard. - Telegram: honor session model overrides in inline model selection. (#8193) Thanks @gildo. - Web UI: fix agent model selection saves for default/non-default agents and wrap long workspace paths. Thanks @Takhoffman. diff --git a/package.json b/package.json index cb9ea558a2f..c5f1ace5a22 100644 --- a/package.json +++ b/package.json @@ -108,10 +108,10 @@ "@larksuiteoapi/node-sdk": "^1.42.0", "@line/bot-sdk": "^10.6.0", "@lydell/node-pty": "1.2.0-beta.3", - "@mariozechner/pi-agent-core": "0.51.1", - "@mariozechner/pi-ai": "0.51.1", - "@mariozechner/pi-coding-agent": "0.51.1", - "@mariozechner/pi-tui": "0.51.1", + "@mariozechner/pi-agent-core": "0.51.3", + "@mariozechner/pi-ai": "0.51.3", + "@mariozechner/pi-coding-agent": "0.51.3", + "@mariozechner/pi-tui": "0.51.3", "@mozilla/readability": "^0.6.0", "@sinclair/typebox": "0.34.48", "@slack/bolt": "^4.6.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0df19dd14da..8ae1a7c2cd1 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -49,17 +49,17 @@ importers: specifier: 1.2.0-beta.3 version: 1.2.0-beta.3 '@mariozechner/pi-agent-core': - specifier: 0.51.1 - version: 0.51.1(ws@8.19.0)(zod@4.3.6) + specifier: 0.51.3 + version: 0.51.3(ws@8.19.0)(zod@4.3.6) '@mariozechner/pi-ai': - specifier: 0.51.1 - version: 0.51.1(ws@8.19.0)(zod@4.3.6) + specifier: 0.51.3 + version: 0.51.3(ws@8.19.0)(zod@4.3.6) '@mariozechner/pi-coding-agent': - specifier: 0.51.1 - version: 0.51.1(ws@8.19.0)(zod@4.3.6) + specifier: 0.51.3 + version: 0.51.3(ws@8.19.0)(zod@4.3.6) '@mariozechner/pi-tui': - specifier: 0.51.1 - version: 0.51.1 + specifier: 0.51.3 + version: 0.51.3 '@mozilla/readability': specifier: ^0.6.0 version: 0.6.0 @@ -1457,22 +1457,22 @@ packages: resolution: {integrity: sha512-faGUlTcXka5l7rv0lP3K3vGW/ejRuOS24RR2aSFWREUQqzjgdsuWNo/IiPqL3kWRGt6Ahl2+qcDAwtdeWeuGUw==} hasBin: true - '@mariozechner/pi-agent-core@0.51.1': - resolution: {integrity: sha512-Ssy7ipyYl2mg99T3W5maA1DKrFCYrWeM6kq5awyd+e34Bd6njK5bsi1keqtlbCIsTCtF9NngUwUJ2lWEi9kHhA==} + '@mariozechner/pi-agent-core@0.51.3': + resolution: {integrity: sha512-pO5ScRuf7F5GCqS02vuB3gIV/MHR2cskEEUnbVbkSf0RHJb3vTICy/ACQyeI+UYk7yjFmdvQgbSUtVrYJ3q8Ag==} engines: {node: '>=20.0.0'} - '@mariozechner/pi-ai@0.51.1': - resolution: {integrity: sha512-QJgiVwxvUJx6QECSqOQi1NNhOdzzFYDoX3C21aPgYH9DQQpvg4thzhSK9eZoxD+HsQGfcq8u/DkPdPyl0tl8Bg==} + '@mariozechner/pi-ai@0.51.3': + resolution: {integrity: sha512-NocfuwUPCGeNhWyfzSGKbsTqUvFmP+VihU8+xtzX9FoHvQQVJHQ49Sz8sfLK04BbEWYI9s/gZ7a9xnJ0O4cz8g==} engines: {node: '>=20.0.0'} hasBin: true - '@mariozechner/pi-coding-agent@0.51.1': - resolution: {integrity: sha512-vZCQ1gOQKC5kJOUQLMZb55OySIG27NxcMTKbJUQ0f1Ncn5uvV/Z4I/U5Ok217tm60EDC4JRv5GC1YMwpVRQFyg==} + '@mariozechner/pi-coding-agent@0.51.3': + resolution: {integrity: sha512-pu/4IxeMZMapYiSO3LWvNRztOXXKLlLNL+drjMvtgWbp9MJ8azP+5Zwsp3/vzrPvM54wCkaSa0voUEThm4Ba/Q==} engines: {node: '>=20.0.0'} hasBin: true - '@mariozechner/pi-tui@0.51.1': - resolution: {integrity: sha512-1g6Z4WBvWcQf3bMM85fsHyQHv4mOcqKoH1AB8+G2lBHO49707gqHc3y6LbXuBSNn8uINGoAk2LpUoAUFnxLExg==} + '@mariozechner/pi-tui@0.51.3': + resolution: {integrity: sha512-1B9C3oVsAcBSO0rvk4qC3Iq655LveLQDSnlseypCo/KiR5eY39Hw1XRtvq5N05mtxNuo3mRw8FMcYCwIl1BbDg==} engines: {node: '>=20.0.0'} '@matrix-org/matrix-sdk-crypto-nodejs@0.4.0': @@ -3766,6 +3766,7 @@ packages: glob@11.1.0: resolution: {integrity: sha512-vuNwKSaKiqm7g0THUBu2x7ckSs3XJLXE+2ssL7/MfTGPLLcrJQ/4Uq1CjPTtO5cCIiRxqvN6Twy1qOwhL0Xjcw==} engines: {node: 20 || >=22} + deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me hasBin: true google-auth-library@10.5.0: @@ -6783,10 +6784,9 @@ snapshots: std-env: 3.10.0 yoctocolors: 2.1.2 - '@mariozechner/pi-agent-core@0.51.1(ws@8.19.0)(zod@4.3.6)': + '@mariozechner/pi-agent-core@0.51.3(ws@8.19.0)(zod@4.3.6)': dependencies: - '@mariozechner/pi-ai': 0.51.1(ws@8.19.0)(zod@4.3.6) - '@mariozechner/pi-tui': 0.51.1 + '@mariozechner/pi-ai': 0.51.3(ws@8.19.0)(zod@4.3.6) transitivePeerDependencies: - '@modelcontextprotocol/sdk' - aws-crt @@ -6796,7 +6796,7 @@ snapshots: - ws - zod - '@mariozechner/pi-ai@0.51.1(ws@8.19.0)(zod@4.3.6)': + '@mariozechner/pi-ai@0.51.3(ws@8.19.0)(zod@4.3.6)': dependencies: '@anthropic-ai/sdk': 0.71.2(zod@4.3.6) '@aws-sdk/client-bedrock-runtime': 3.981.0 @@ -6820,12 +6820,12 @@ snapshots: - ws - zod - '@mariozechner/pi-coding-agent@0.51.1(ws@8.19.0)(zod@4.3.6)': + '@mariozechner/pi-coding-agent@0.51.3(ws@8.19.0)(zod@4.3.6)': dependencies: '@mariozechner/jiti': 2.6.5 - '@mariozechner/pi-agent-core': 0.51.1(ws@8.19.0)(zod@4.3.6) - '@mariozechner/pi-ai': 0.51.1(ws@8.19.0)(zod@4.3.6) - '@mariozechner/pi-tui': 0.51.1 + '@mariozechner/pi-agent-core': 0.51.3(ws@8.19.0)(zod@4.3.6) + '@mariozechner/pi-ai': 0.51.3(ws@8.19.0)(zod@4.3.6) + '@mariozechner/pi-tui': 0.51.3 '@silvia-odwyer/photon-node': 0.3.4 chalk: 5.6.2 cli-highlight: 2.1.11 @@ -6848,7 +6848,7 @@ snapshots: - ws - zod - '@mariozechner/pi-tui@0.51.1': + '@mariozechner/pi-tui@0.51.3': dependencies: '@types/mime-types': 2.1.4 chalk: 5.6.2 diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 840d5c74b76..a5bd6bd5367 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -163,6 +163,7 @@ export function handleMessageUpdate( mediaUrls: hasMedia ? mediaUrls : undefined, }, }); + ctx.state.emittedAssistantUpdate = true; if (ctx.params.onPartialReply && ctx.state.shouldEmitPartialReplies) { void ctx.params.onPartialReply({ text: cleanedText, @@ -215,6 +216,44 @@ export function handleMessageEnd( ? extractAssistantThinking(assistantMessage) || extractThinkingFromTaggedText(rawText) : ""; const formattedReasoning = rawThinking ? formatReasoningMessage(rawThinking) : ""; + const trimmedText = text.trim(); + const parsedText = trimmedText ? parseReplyDirectives(stripTrailingDirective(trimmedText)) : null; + let cleanedText = parsedText?.text ?? ""; + let mediaUrls = parsedText?.mediaUrls; + let hasMedia = Boolean(mediaUrls && mediaUrls.length > 0); + + if (!cleanedText && !hasMedia) { + const rawTrimmed = rawText.trim(); + const rawStrippedFinal = rawTrimmed.replace(/<\s*\/?\s*final\s*>/gi, "").trim(); + const rawCandidate = rawStrippedFinal || rawTrimmed; + if (rawCandidate) { + const parsedFallback = parseReplyDirectives(stripTrailingDirective(rawCandidate)); + cleanedText = parsedFallback.text ?? rawCandidate; + mediaUrls = parsedFallback.mediaUrls; + hasMedia = Boolean(mediaUrls && mediaUrls.length > 0); + } + } + + if (!ctx.state.emittedAssistantUpdate && (cleanedText || hasMedia)) { + emitAgentEvent({ + runId: ctx.params.runId, + stream: "assistant", + data: { + text: cleanedText, + delta: cleanedText, + mediaUrls: hasMedia ? mediaUrls : undefined, + }, + }); + void ctx.params.onAgentEvent?.({ + stream: "assistant", + data: { + text: cleanedText, + delta: cleanedText, + mediaUrls: hasMedia ? mediaUrls : undefined, + }, + }); + ctx.state.emittedAssistantUpdate = true; + } const addedDuringMessage = ctx.state.assistantTexts.length > ctx.state.assistantTextBaseline; const chunkerHasBuffered = ctx.blockChunker?.hasBuffered() ?? false; diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index e9758ba8fc2..db2f07b7683 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -39,6 +39,7 @@ export type EmbeddedPiSubscribeState = { partialBlockState: { thinking: boolean; final: boolean; inlineCode: InlineCodeState }; lastStreamedAssistant?: string; lastStreamedAssistantCleaned?: string; + emittedAssistantUpdate: boolean; lastStreamedReasoning?: string; lastBlockReplyText?: string; assistantMessageIndex: number; diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.test.ts index c9d5dd0810c..ad7bdfd81cb 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.test.ts @@ -62,6 +62,39 @@ describe("subscribeEmbeddedPiSession", () => { expect(onPartialReply).not.toHaveBeenCalled(); }); + it("emits agent events on message_end even without tags", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onAgentEvent = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run", + enforceFinalTag: true, + onAgentEvent, + }); + + const assistantMessage = { + role: "assistant", + content: [{ type: "text", text: "Hello world" }], + } as AssistantMessage; + + handler?.({ type: "message_start", message: assistantMessage }); + handler?.({ type: "message_end", message: assistantMessage }); + + const payloads = onAgentEvent.mock.calls + .map((call) => call[0]?.data as Record | undefined) + .filter((value): value is Record => Boolean(value)); + expect(payloads).toHaveLength(1); + expect(payloads[0]?.text).toBe("Hello world"); + expect(payloads[0]?.delta).toBe("Hello world"); + }); it("does not require when enforcement is off", () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts index ec38734ac8b..7b52dfe74d5 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts @@ -185,6 +185,71 @@ describe("subscribeEmbeddedPiSession", () => { expect(payloads[1]?.delta).toBe(" world"); }); + it("emits agent events on message_end for non-streaming assistant text", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onAgentEvent = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run", + onAgentEvent, + }); + + const assistantMessage = { + role: "assistant", + content: [{ type: "text", text: "Hello world" }], + } as AssistantMessage; + + handler?.({ type: "message_start", message: assistantMessage }); + handler?.({ type: "message_end", message: assistantMessage }); + + const payloads = onAgentEvent.mock.calls + .map((call) => call[0]?.data as Record | undefined) + .filter((value): value is Record => Boolean(value)); + expect(payloads).toHaveLength(1); + expect(payloads[0]?.text).toBe("Hello world"); + expect(payloads[0]?.delta).toBe("Hello world"); + }); + + it("does not emit duplicate agent events when message_end repeats", () => { + let handler: ((evt: unknown) => void) | undefined; + const session: StubSession = { + subscribe: (fn) => { + handler = fn; + return () => {}; + }, + }; + + const onAgentEvent = vi.fn(); + + subscribeEmbeddedPiSession({ + session: session as unknown as Parameters[0]["session"], + runId: "run", + onAgentEvent, + }); + + const assistantMessage = { + role: "assistant", + content: [{ type: "text", text: "Hello world" }], + } as AssistantMessage; + + handler?.({ type: "message_start", message: assistantMessage }); + handler?.({ type: "message_end", message: assistantMessage }); + handler?.({ type: "message_end", message: assistantMessage }); + + const payloads = onAgentEvent.mock.calls + .map((call) => call[0]?.data as Record | undefined) + .filter((value): value is Record => Boolean(value)); + expect(payloads).toHaveLength(1); + }); + it("skips agent events when cleaned text rewinds mid-stream", () => { let handler: ((evt: unknown) => void) | undefined; const session: StubSession = { diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index e9853775065..0a4b9c0fa53 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -49,6 +49,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar partialBlockState: { thinking: false, final: false, inlineCode: createInlineCodeState() }, lastStreamedAssistant: undefined, lastStreamedAssistantCleaned: undefined, + emittedAssistantUpdate: false, lastStreamedReasoning: undefined, lastBlockReplyText: undefined, assistantMessageIndex: 0, @@ -95,6 +96,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar state.partialBlockState.inlineCode = createInlineCodeState(); state.lastStreamedAssistant = undefined; state.lastStreamedAssistantCleaned = undefined; + state.emittedAssistantUpdate = false; state.lastBlockReplyText = undefined; state.lastStreamedReasoning = undefined; state.lastReasoningSent = undefined; diff --git a/src/gateway/protocol/client-info.ts b/src/gateway/protocol/client-info.ts index 7bb30f91709..7cd409f9057 100644 --- a/src/gateway/protocol/client-info.ts +++ b/src/gateway/protocol/client-info.ts @@ -42,6 +42,12 @@ export type GatewayClientInfo = { instanceId?: string; }; +export const GATEWAY_CLIENT_CAPS = { + TOOL_EVENTS: "tool-events", +} as const; + +export type GatewayClientCap = (typeof GATEWAY_CLIENT_CAPS)[keyof typeof GATEWAY_CLIENT_CAPS]; + const GATEWAY_CLIENT_ID_SET = new Set(Object.values(GATEWAY_CLIENT_IDS)); const GATEWAY_CLIENT_MODE_SET = new Set(Object.values(GATEWAY_CLIENT_MODES)); @@ -68,3 +74,13 @@ export function normalizeGatewayClientMode(raw?: string | null): GatewayClientMo ? (normalized as GatewayClientMode) : undefined; } + +export function hasGatewayClientCap( + caps: string[] | null | undefined, + cap: GatewayClientCap, +): boolean { + if (!Array.isArray(caps)) { + return false; + } + return caps.includes(cap); +} diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index 7398ef42a46..f6e1813013b 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -1,4 +1,5 @@ import AjvPkg, { type ErrorObject } from "ajv"; +import type { SessionsPatchResult } from "../session-utils.types.js"; import { type AgentEvent, AgentEventSchema, @@ -536,6 +537,7 @@ export type { SessionsPreviewParams, SessionsResolveParams, SessionsPatchParams, + SessionsPatchResult, SessionsResetParams, SessionsDeleteParams, SessionsCompactParams, diff --git a/src/gateway/server-broadcast.test.ts b/src/gateway/server-broadcast.test.ts index 0dcec9e0974..2cb5855a98e 100644 --- a/src/gateway/server-broadcast.test.ts +++ b/src/gateway/server-broadcast.test.ts @@ -44,7 +44,7 @@ describe("gateway broadcaster", () => { }, ]); - const { broadcast } = createGatewayBroadcaster({ clients }); + const { broadcast, broadcastToConnIds } = createGatewayBroadcaster({ clients }); broadcast("exec.approval.requested", { id: "1" }); broadcast("device.pair.requested", { requestId: "r1" }); @@ -52,5 +52,10 @@ describe("gateway broadcaster", () => { expect(approvalsSocket.send).toHaveBeenCalledTimes(1); expect(pairingSocket.send).toHaveBeenCalledTimes(1); expect(readSocket.send).toHaveBeenCalledTimes(0); + + broadcastToConnIds("tick", { ts: 1 }, new Set(["c-read"])); + expect(readSocket.send).toHaveBeenCalledTimes(1); + expect(approvalsSocket.send).toHaveBeenCalledTimes(1); + expect(pairingSocket.send).toHaveBeenCalledTimes(1); }); }); diff --git a/src/gateway/server-broadcast.ts b/src/gateway/server-broadcast.ts index abbc3d87e38..870bdf95b8d 100644 --- a/src/gateway/server-broadcast.ts +++ b/src/gateway/server-broadcast.ts @@ -33,15 +33,18 @@ function hasEventScope(client: GatewayWsClient, event: string): boolean { export function createGatewayBroadcaster(params: { clients: Set }) { let seq = 0; - const broadcast = ( + + const broadcastInternal = ( event: string, payload: unknown, opts?: { dropIfSlow?: boolean; stateVersion?: { presence?: number; health?: number }; }, + targetConnIds?: ReadonlySet, ) => { - const eventSeq = ++seq; + const isTargeted = Boolean(targetConnIds); + const eventSeq = isTargeted ? undefined : ++seq; const frame = JSON.stringify({ type: "event", event, @@ -51,8 +54,9 @@ export function createGatewayBroadcaster(params: { clients: Set }); const logMeta: Record = { event, - seq: eventSeq, + seq: eventSeq ?? "targeted", clients: params.clients.size, + targets: targetConnIds ? targetConnIds.size : undefined, dropIfSlow: opts?.dropIfSlow, presenceVersion: opts?.stateVersion?.presence, healthVersion: opts?.stateVersion?.health, @@ -62,6 +66,9 @@ export function createGatewayBroadcaster(params: { clients: Set } logWs("out", "event", logMeta); for (const c of params.clients) { + if (targetConnIds && !targetConnIds.has(c.connId)) { + continue; + } if (!hasEventScope(c, event)) { continue; } @@ -84,5 +91,30 @@ export function createGatewayBroadcaster(params: { clients: Set } } }; - return { broadcast }; + + const broadcast = ( + event: string, + payload: unknown, + opts?: { + dropIfSlow?: boolean; + stateVersion?: { presence?: number; health?: number }; + }, + ) => broadcastInternal(event, payload, opts); + + const broadcastToConnIds = ( + event: string, + payload: unknown, + connIds: ReadonlySet, + opts?: { + dropIfSlow?: boolean; + stateVersion?: { presence?: number; health?: number }; + }, + ) => { + if (connIds.size === 0) { + return; + } + broadcastInternal(event, payload, opts, connIds); + }; + + return { broadcast, broadcastToConnIds }; } diff --git a/src/gateway/server-chat.agent-events.test.ts b/src/gateway/server-chat.agent-events.test.ts index 43761af11aa..33dc90155bc 100644 --- a/src/gateway/server-chat.agent-events.test.ts +++ b/src/gateway/server-chat.agent-events.test.ts @@ -1,22 +1,31 @@ import { describe, expect, it, vi } from "vitest"; -import { createAgentEventHandler, createChatRunState } from "./server-chat.js"; +import { registerAgentRunContext, resetAgentRunContextForTest } from "../infra/agent-events.js"; +import { + createAgentEventHandler, + createChatRunState, + createToolEventRecipientRegistry, +} from "./server-chat.js"; describe("agent event handler", () => { it("emits chat delta for assistant text-only events", () => { const nowSpy = vi.spyOn(Date, "now").mockReturnValue(1_000); const broadcast = vi.fn(); + const broadcastToConnIds = vi.fn(); const nodeSendToSession = vi.fn(); const agentRunSeq = new Map(); const chatRunState = createChatRunState(); + const toolEventRecipients = createToolEventRecipientRegistry(); chatRunState.registry.add("run-1", { sessionKey: "session-1", clientRunId: "client-1" }); const handler = createAgentEventHandler({ broadcast, + broadcastToConnIds, nodeSendToSession, agentRunSeq, chatRunState, resolveSessionKeyForRun: () => undefined, clearAgentRunContext: vi.fn(), + toolEventRecipients, }); handler({ @@ -39,4 +48,158 @@ describe("agent event handler", () => { expect(sessionChatCalls).toHaveLength(1); nowSpy.mockRestore(); }); + + it("routes tool events only to registered recipients when verbose is enabled", () => { + const broadcast = vi.fn(); + const broadcastToConnIds = vi.fn(); + const nodeSendToSession = vi.fn(); + const agentRunSeq = new Map(); + const chatRunState = createChatRunState(); + const toolEventRecipients = createToolEventRecipientRegistry(); + + registerAgentRunContext("run-tool", { sessionKey: "session-1", verboseLevel: "on" }); + toolEventRecipients.add("run-tool", "conn-1"); + + const handler = createAgentEventHandler({ + broadcast, + broadcastToConnIds, + nodeSendToSession, + agentRunSeq, + chatRunState, + resolveSessionKeyForRun: () => "session-1", + clearAgentRunContext: vi.fn(), + toolEventRecipients, + }); + + handler({ + runId: "run-tool", + seq: 1, + stream: "tool", + ts: Date.now(), + data: { phase: "start", name: "read", toolCallId: "t1" }, + }); + + expect(broadcast).not.toHaveBeenCalled(); + expect(broadcastToConnIds).toHaveBeenCalledTimes(1); + resetAgentRunContextForTest(); + }); + + it("suppresses tool events when verbose is off", () => { + const broadcast = vi.fn(); + const broadcastToConnIds = vi.fn(); + const nodeSendToSession = vi.fn(); + const agentRunSeq = new Map(); + const chatRunState = createChatRunState(); + const toolEventRecipients = createToolEventRecipientRegistry(); + + registerAgentRunContext("run-tool-off", { sessionKey: "session-1", verboseLevel: "off" }); + toolEventRecipients.add("run-tool-off", "conn-1"); + + const handler = createAgentEventHandler({ + broadcast, + broadcastToConnIds, + nodeSendToSession, + agentRunSeq, + chatRunState, + resolveSessionKeyForRun: () => "session-1", + clearAgentRunContext: vi.fn(), + toolEventRecipients, + }); + + handler({ + runId: "run-tool-off", + seq: 1, + stream: "tool", + ts: Date.now(), + data: { phase: "start", name: "read", toolCallId: "t2" }, + }); + + expect(broadcastToConnIds).not.toHaveBeenCalled(); + resetAgentRunContextForTest(); + }); + + it("strips tool output when verbose is on", () => { + const broadcast = vi.fn(); + const broadcastToConnIds = vi.fn(); + const nodeSendToSession = vi.fn(); + const agentRunSeq = new Map(); + const chatRunState = createChatRunState(); + const toolEventRecipients = createToolEventRecipientRegistry(); + + registerAgentRunContext("run-tool-on", { sessionKey: "session-1", verboseLevel: "on" }); + toolEventRecipients.add("run-tool-on", "conn-1"); + + const handler = createAgentEventHandler({ + broadcast, + broadcastToConnIds, + nodeSendToSession, + agentRunSeq, + chatRunState, + resolveSessionKeyForRun: () => "session-1", + clearAgentRunContext: vi.fn(), + toolEventRecipients, + }); + + handler({ + runId: "run-tool-on", + seq: 1, + stream: "tool", + ts: Date.now(), + data: { + phase: "result", + name: "exec", + toolCallId: "t3", + result: { content: [{ type: "text", text: "secret" }] }, + partialResult: { content: [{ type: "text", text: "partial" }] }, + }, + }); + + expect(broadcastToConnIds).toHaveBeenCalledTimes(1); + const payload = broadcastToConnIds.mock.calls[0]?.[1] as { data?: Record }; + expect(payload.data?.result).toBeUndefined(); + expect(payload.data?.partialResult).toBeUndefined(); + resetAgentRunContextForTest(); + }); + + it("keeps tool output when verbose is full", () => { + const broadcast = vi.fn(); + const broadcastToConnIds = vi.fn(); + const nodeSendToSession = vi.fn(); + const agentRunSeq = new Map(); + const chatRunState = createChatRunState(); + const toolEventRecipients = createToolEventRecipientRegistry(); + + registerAgentRunContext("run-tool-full", { sessionKey: "session-1", verboseLevel: "full" }); + toolEventRecipients.add("run-tool-full", "conn-1"); + + const handler = createAgentEventHandler({ + broadcast, + broadcastToConnIds, + nodeSendToSession, + agentRunSeq, + chatRunState, + resolveSessionKeyForRun: () => "session-1", + clearAgentRunContext: vi.fn(), + toolEventRecipients, + }); + + const result = { content: [{ type: "text", text: "secret" }] }; + handler({ + runId: "run-tool-full", + seq: 1, + stream: "tool", + ts: Date.now(), + data: { + phase: "result", + name: "exec", + toolCallId: "t4", + result, + }, + }); + + expect(broadcastToConnIds).toHaveBeenCalledTimes(1); + const payload = broadcastToConnIds.mock.calls[0]?.[1] as { data?: Record }; + expect(payload.data?.result).toEqual(result); + resetAgentRunContextForTest(); + }); }); diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 953b26268a4..2f9d17d577a 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -120,6 +120,79 @@ export function createChatRunState(): ChatRunState { }; } +export type ToolEventRecipientRegistry = { + add: (runId: string, connId: string) => void; + get: (runId: string) => ReadonlySet | undefined; + markFinal: (runId: string) => void; +}; + +type ToolRecipientEntry = { + connIds: Set; + updatedAt: number; + finalizedAt?: number; +}; + +const TOOL_EVENT_RECIPIENT_TTL_MS = 10 * 60 * 1000; +const TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS = 30 * 1000; + +export function createToolEventRecipientRegistry(): ToolEventRecipientRegistry { + const recipients = new Map(); + + const prune = () => { + if (recipients.size === 0) { + return; + } + const now = Date.now(); + for (const [runId, entry] of recipients) { + const cutoff = entry.finalizedAt + ? entry.finalizedAt + TOOL_EVENT_RECIPIENT_FINAL_GRACE_MS + : entry.updatedAt + TOOL_EVENT_RECIPIENT_TTL_MS; + if (now >= cutoff) { + recipients.delete(runId); + } + } + }; + + const add = (runId: string, connId: string) => { + if (!runId || !connId) { + return; + } + const now = Date.now(); + const existing = recipients.get(runId); + if (existing) { + existing.connIds.add(connId); + existing.updatedAt = now; + } else { + recipients.set(runId, { + connIds: new Set([connId]), + updatedAt: now, + }); + } + prune(); + }; + + const get = (runId: string) => { + const entry = recipients.get(runId); + if (!entry) { + return undefined; + } + entry.updatedAt = Date.now(); + prune(); + return entry.connIds; + }; + + const markFinal = (runId: string) => { + const entry = recipients.get(runId); + if (!entry) { + return; + } + entry.finalizedAt = Date.now(); + prune(); + }; + + return { add, get, markFinal }; +} + export type ChatEventBroadcast = ( event: string, payload: unknown, @@ -130,20 +203,29 @@ export type NodeSendToSession = (sessionKey: string, event: string, payload: unk export type AgentEventHandlerOptions = { broadcast: ChatEventBroadcast; + broadcastToConnIds: ( + event: string, + payload: unknown, + connIds: ReadonlySet, + opts?: { dropIfSlow?: boolean }, + ) => void; nodeSendToSession: NodeSendToSession; agentRunSeq: Map; chatRunState: ChatRunState; resolveSessionKeyForRun: (runId: string) => string | undefined; clearAgentRunContext: (runId: string) => void; + toolEventRecipients: ToolEventRecipientRegistry; }; export function createAgentEventHandler({ broadcast, + broadcastToConnIds, nodeSendToSession, agentRunSeq, chatRunState, resolveSessionKeyForRun, clearAgentRunContext, + toolEventRecipients, }: AgentEventHandlerOptions) { const emitChatDelta = (sessionKey: string, clientRunId: string, seq: number, text: string) => { chatRunState.buffers.set(clientRunId, text); @@ -213,25 +295,25 @@ export function createAgentEventHandler({ nodeSendToSession(sessionKey, "chat", payload); }; - const shouldEmitToolEvents = (runId: string, sessionKey?: string) => { + const resolveToolVerboseLevel = (runId: string, sessionKey?: string) => { const runContext = getAgentRunContext(runId); const runVerbose = normalizeVerboseLevel(runContext?.verboseLevel); if (runVerbose) { - return runVerbose === "on"; + return runVerbose; } if (!sessionKey) { - return false; + return "off"; } try { const { cfg, entry } = loadSessionEntry(sessionKey); const sessionVerbose = normalizeVerboseLevel(entry?.verboseLevel); if (sessionVerbose) { - return sessionVerbose === "on"; + return sessionVerbose; } const defaultVerbose = normalizeVerboseLevel(cfg.agents?.defaults?.verboseDefault); - return defaultVerbose === "on"; + return defaultVerbose ?? "off"; } catch { - return false; + return "off"; } }; @@ -244,10 +326,21 @@ export function createAgentEventHandler({ // Include sessionKey so Control UI can filter tool streams per session. const agentPayload = sessionKey ? { ...evt, sessionKey } : evt; const last = agentRunSeq.get(evt.runId) ?? 0; - if (evt.stream === "tool" && !shouldEmitToolEvents(evt.runId, sessionKey)) { + const isToolEvent = evt.stream === "tool"; + const toolVerbose = isToolEvent ? resolveToolVerboseLevel(evt.runId, sessionKey) : "off"; + if (isToolEvent && toolVerbose === "off") { agentRunSeq.set(evt.runId, evt.seq); return; } + const toolPayload = + isToolEvent && toolVerbose !== "full" + ? (() => { + const data = evt.data ? { ...evt.data } : {}; + delete data.result; + delete data.partialResult; + return sessionKey ? { ...evt, sessionKey, data } : { ...evt, data }; + })() + : agentPayload; if (evt.seq !== last + 1) { broadcast("agent", { runId: evt.runId, @@ -262,13 +355,20 @@ export function createAgentEventHandler({ }); } agentRunSeq.set(evt.runId, evt.seq); - broadcast("agent", agentPayload); + if (isToolEvent) { + const recipients = toolEventRecipients.get(evt.runId); + if (recipients && recipients.size > 0) { + broadcastToConnIds("agent", toolPayload, recipients); + } + } else { + broadcast("agent", agentPayload); + } const lifecyclePhase = evt.stream === "lifecycle" && typeof evt.data?.phase === "string" ? evt.data.phase : null; if (sessionKey) { - nodeSendToSession(sessionKey, "agent", agentPayload); + nodeSendToSession(sessionKey, "agent", isToolEvent ? toolPayload : agentPayload); if (!isAborted && evt.stream === "assistant" && typeof evt.data?.text === "string") { emitChatDelta(sessionKey, clientRunId, evt.seq, evt.data.text); } else if (!isAborted && (lifecyclePhase === "end" || lifecyclePhase === "error")) { @@ -306,6 +406,7 @@ export function createAgentEventHandler({ } if (lifecyclePhase === "end" || lifecyclePhase === "error") { + toolEventRecipients.markFinal(evt.runId); clearAgentRunContext(evt.runId); } }; diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 8377699ccd1..6ba6f9731fd 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -28,6 +28,7 @@ import { import { resolveAssistantIdentity } from "../assistant-identity.js"; import { parseMessageWithAttachments } from "../chat-attachments.js"; import { resolveAssistantAvatarUrl } from "../control-ui-shared.js"; +import { GATEWAY_CLIENT_CAPS, hasGatewayClientCap } from "../protocol/client-info.js"; import { ErrorCodes, errorShape, @@ -42,7 +43,7 @@ import { waitForAgentJob } from "./agent-job.js"; import { injectTimestamp, timestampOptsFromConfig } from "./agent-timestamp.js"; export const agentHandlers: GatewayRequestHandlers = { - agent: async ({ params, respond, context }) => { + agent: async ({ params, respond, context, client }) => { const p = params; if (!validateAgentParams(p)) { respond( @@ -296,6 +297,14 @@ export const agentHandlers: GatewayRequestHandlers = { } const runId = idem; + const connId = typeof client?.connId === "string" ? client.connId : undefined; + const wantsToolEvents = hasGatewayClientCap( + client?.connect?.caps, + GATEWAY_CLIENT_CAPS.TOOL_EVENTS, + ); + if (connId && wantsToolEvents) { + context.registerToolEventRecipient(runId, connId); + } const wantsDelivery = request.deliver === true; const explicitTo = diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index c07b0ec309c..21b91086114 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -20,6 +20,7 @@ import { } from "../chat-abort.js"; import { type ChatImageContent, parseMessageWithAttachments } from "../chat-attachments.js"; import { stripEnvelopeFromMessages } from "../chat-sanitize.js"; +import { GATEWAY_CLIENT_CAPS, hasGatewayClientCap } from "../protocol/client-info.js"; import { ErrorCodes, errorShape, @@ -216,7 +217,8 @@ export const chatHandlers: GatewayRequestHandlers = { if (configured) { thinkingLevel = configured; } else { - const { provider, model } = resolveSessionModelRef(cfg, entry); + const sessionAgentId = resolveSessionAgentId({ sessionKey, config: cfg }); + const { provider, model } = resolveSessionModelRef(cfg, entry, sessionAgentId); const catalog = await context.loadGatewayModelCatalog(); thinkingLevel = resolveThinkingDefault({ cfg, @@ -226,11 +228,13 @@ export const chatHandlers: GatewayRequestHandlers = { }); } } + const verboseLevel = entry?.verboseLevel ?? cfg.agents?.defaults?.verboseDefault; respond(true, { sessionKey, sessionId, messages: capped, thinkingLevel, + verboseLevel, }); }, "chat.abort": ({ params, respond, context }) => { @@ -432,7 +436,6 @@ export const chatHandlers: GatewayRequestHandlers = { startedAtMs: now, expiresAtMs: resolveChatRunExpiresAtMs({ now, timeoutMs }), }); - const ackPayload = { runId: clientRunId, status: "started" as const, @@ -506,8 +509,16 @@ export const chatHandlers: GatewayRequestHandlers = { abortSignal: abortController.signal, images: parsedImages.length > 0 ? parsedImages : undefined, disableBlockStreaming: true, - onAgentRunStart: () => { + onAgentRunStart: (runId) => { agentRunStarted = true; + const connId = typeof client?.connId === "string" ? client.connId : undefined; + const wantsToolEvents = hasGatewayClientCap( + client?.connect?.caps, + GATEWAY_CLIENT_CAPS.TOOL_EVENTS, + ); + if (connId && wantsToolEvents) { + context.registerToolEventRecipient(runId, connId); + } }, onModelSelected, }, diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index 3fdb4ee75aa..b62a952d75a 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -1,6 +1,7 @@ import { randomUUID } from "node:crypto"; import fs from "node:fs"; import type { GatewayRequestHandlers } from "./types.js"; +import { resolveDefaultAgentId } from "../../agents/agent-scope.js"; import { abortEmbeddedPiRun, waitForEmbeddedPiRunEnd } from "../../agents/pi-embedded.js"; import { stopSubagentsForRequester } from "../../auto-reply/reply/abort.js"; import { clearSessionQueues } from "../../auto-reply/reply/queue.js"; @@ -12,6 +13,7 @@ import { type SessionEntry, updateSessionStore, } from "../../config/sessions.js"; +import { normalizeAgentId, parseAgentSessionKey } from "../../routing/session-key.js"; import { ErrorCodes, errorShape, @@ -31,6 +33,7 @@ import { loadSessionEntry, readSessionPreviewItemsFromTranscript, resolveGatewaySessionStoreTarget, + resolveSessionModelRef, resolveSessionTranscriptCandidates, type SessionsPatchResult, type SessionsPreviewEntry, @@ -194,11 +197,18 @@ export const sessionsHandlers: GatewayRequestHandlers = { respond(false, undefined, applied.error); return; } + const parsed = parseAgentSessionKey(target.canonicalKey ?? key); + const agentId = normalizeAgentId(parsed?.agentId ?? resolveDefaultAgentId(cfg)); + const resolved = resolveSessionModelRef(cfg, applied.entry, agentId); const result: SessionsPatchResult = { ok: true, path: storePath, key: target.canonicalKey, entry: applied.entry, + resolved: { + modelProvider: resolved.provider, + model: resolved.model, + }, }; respond(true, result, undefined); }, diff --git a/src/gateway/server-methods/types.ts b/src/gateway/server-methods/types.ts index e6b944f75a6..aa26b232f15 100644 --- a/src/gateway/server-methods/types.ts +++ b/src/gateway/server-methods/types.ts @@ -14,6 +14,7 @@ type SubsystemLogger = ReturnType; export type GatewayClient = { connect: ConnectParams; + connId?: string; }; export type RespondFn = ( @@ -42,6 +43,15 @@ export type GatewayRequestContext = { stateVersion?: { presence?: number; health?: number }; }, ) => void; + broadcastToConnIds: ( + event: string, + payload: unknown, + connIds: ReadonlySet, + opts?: { + dropIfSlow?: boolean; + stateVersion?: { presence?: number; health?: number }; + }, + ) => void; nodeSendToSession: (sessionKey: string, event: string, payload: unknown) => void; nodeSendToAllSubscribed: (event: string, payload: unknown) => void; nodeSubscribe: (nodeId: string, sessionKey: string) => void; @@ -60,6 +70,7 @@ export type GatewayRequestContext = { clientRunId: string, sessionKey?: string, ) => { sessionKey: string; clientRunId: string } | undefined; + registerToolEventRecipient: (runId: string, connId: string) => void; dedupe: Map; wizardSessions: Map; findRunningWizard: () => string | null; diff --git a/src/gateway/server-runtime-state.ts b/src/gateway/server-runtime-state.ts index 2fcfc62f08a..dc8a2e6bfc4 100644 --- a/src/gateway/server-runtime-state.ts +++ b/src/gateway/server-runtime-state.ts @@ -15,7 +15,11 @@ import { CANVAS_HOST_PATH } from "../canvas-host/a2ui.js"; import { type CanvasHostHandler, createCanvasHostHandler } from "../canvas-host/server.js"; import { resolveGatewayListenHosts } from "./net.js"; import { createGatewayBroadcaster } from "./server-broadcast.js"; -import { type ChatRunEntry, createChatRunState } from "./server-chat.js"; +import { + type ChatRunEntry, + createChatRunState, + createToolEventRecipientRegistry, +} from "./server-chat.js"; import { MAX_PAYLOAD_BYTES } from "./server-constants.js"; import { attachGatewayUpgradeHandler, createGatewayHttpServer } from "./server-http.js"; import { createGatewayHooksRequestHandler } from "./server/hooks.js"; @@ -59,6 +63,15 @@ export async function createGatewayRuntimeState(params: { stateVersion?: { presence?: number; health?: number }; }, ) => void; + broadcastToConnIds: ( + event: string, + payload: unknown, + connIds: ReadonlySet, + opts?: { + dropIfSlow?: boolean; + stateVersion?: { presence?: number; health?: number }; + }, + ) => void; agentRunSeq: Map; dedupe: Map; chatRunState: ReturnType; @@ -71,6 +84,7 @@ export async function createGatewayRuntimeState(params: { sessionKey?: string, ) => ChatRunEntry | undefined; chatAbortControllers: Map; + toolEventRecipients: ReturnType; }> { let canvasHost: CanvasHostHandler | null = null; if (params.canvasHostEnabled) { @@ -154,7 +168,7 @@ export async function createGatewayRuntimeState(params: { } const clients = new Set(); - const { broadcast } = createGatewayBroadcaster({ clients }); + const { broadcast, broadcastToConnIds } = createGatewayBroadcaster({ clients }); const agentRunSeq = new Map(); const dedupe = new Map(); const chatRunState = createChatRunState(); @@ -164,6 +178,7 @@ export async function createGatewayRuntimeState(params: { const addChatRun = chatRunRegistry.add; const removeChatRun = chatRunRegistry.remove; const chatAbortControllers = new Map(); + const toolEventRecipients = createToolEventRecipientRegistry(); return { canvasHost, @@ -173,6 +188,7 @@ export async function createGatewayRuntimeState(params: { wss, clients, broadcast, + broadcastToConnIds, agentRunSeq, dedupe, chatRunState, @@ -181,5 +197,6 @@ export async function createGatewayRuntimeState(params: { addChatRun, removeChatRun, chatAbortControllers, + toolEventRecipients, }; } diff --git a/src/gateway/server.chat.gateway-server-chat.e2e.test.ts b/src/gateway/server.chat.gateway-server-chat.e2e.test.ts index 59c0bec18b0..0f521ea44b4 100644 --- a/src/gateway/server.chat.gateway-server-chat.e2e.test.ts +++ b/src/gateway/server.chat.gateway-server-chat.e2e.test.ts @@ -380,8 +380,8 @@ describe("gateway server chat", () => { emitAgentEvent({ runId: "run-tool-1", - stream: "tool", - data: { phase: "start", name: "read", toolCallId: "tool-1" }, + stream: "assistant", + data: { text: "hello" }, }); const evt = await agentEvtP; @@ -390,31 +390,6 @@ describe("gateway server chat", () => { ? (evt.payload as Record) : {}; expect(payload.sessionKey).toBe("main"); - } - - { - registerAgentRunContext("run-tool-off", { sessionKey: "agent:main:main" }); - - emitAgentEvent({ - runId: "run-tool-off", - stream: "tool", - data: { phase: "start", name: "read", toolCallId: "tool-1" }, - }); - emitAgentEvent({ - runId: "run-tool-off", - stream: "assistant", - data: { text: "hello" }, - }); - - const evt = await onceMessage( - webchatWs, - (o) => o.type === "event" && o.event === "agent" && o.payload?.runId === "run-tool-off", - 8000, - ); - const payload = - evt.payload && typeof evt.payload === "object" - ? (evt.payload as Record) - : {}; expect(payload.stream).toBe("assistant"); } diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index d76fd6dd13d..d46a38ef3d6 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -318,6 +318,7 @@ export async function startGatewayServer( wss, clients, broadcast, + broadcastToConnIds, agentRunSeq, dedupe, chatRunState, @@ -326,6 +327,7 @@ export async function startGatewayServer( addChatRun, removeChatRun, chatAbortControllers, + toolEventRecipients, } = await createGatewayRuntimeState({ cfg: cfgAtStart, bindHost, @@ -441,11 +443,13 @@ export async function startGatewayServer( const agentUnsub = onAgentEvent( createAgentEventHandler({ broadcast, + broadcastToConnIds, nodeSendToSession, agentRunSeq, chatRunState, resolveSessionKeyForRun, clearAgentRunContext, + toolEventRecipients, }), ); @@ -495,6 +499,7 @@ export async function startGatewayServer( incrementPresenceVersion, getHealthVersion, broadcast, + broadcastToConnIds, nodeSendToSession, nodeSendToAllSubscribed, nodeSubscribe, @@ -509,6 +514,7 @@ export async function startGatewayServer( chatDeltaSentAt: chatRunState.deltaSentAt, addChatRun, removeChatRun, + registerToolEventRecipient: toolEventRecipients.add, dedupe, wizardSessions, findRunningWizard, diff --git a/src/gateway/session-utils.ts b/src/gateway/session-utils.ts index ec3b147f847..80ea40e4084 100644 --- a/src/gateway/session-utils.ts +++ b/src/gateway/session-utils.ts @@ -9,7 +9,10 @@ import type { import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js"; import { lookupContextTokens } from "../agents/context.js"; import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL, DEFAULT_PROVIDER } from "../agents/defaults.js"; -import { resolveConfiguredModelRef } from "../agents/model-selection.js"; +import { + resolveConfiguredModelRef, + resolveDefaultModelForAgent, +} from "../agents/model-selection.js"; import { type OpenClawConfig, loadConfig } from "../config/config.js"; import { resolveStateDir } from "../config/paths.js"; import { @@ -522,12 +525,15 @@ export function getSessionDefaults(cfg: OpenClawConfig): GatewaySessionsDefaults export function resolveSessionModelRef( cfg: OpenClawConfig, entry?: SessionEntry, + agentId?: string, ): { provider: string; model: string } { - const resolved = resolveConfiguredModelRef({ - cfg, - defaultProvider: DEFAULT_PROVIDER, - defaultModel: DEFAULT_MODEL, - }); + const resolved = agentId + ? resolveDefaultModelForAgent({ cfg, agentId }) + : resolveConfiguredModelRef({ + cfg, + defaultProvider: DEFAULT_PROVIDER, + defaultModel: DEFAULT_MODEL, + }); let provider = resolved.provider; let model = resolved.model; const storedModelOverride = entry?.modelOverride?.trim(); @@ -623,6 +629,11 @@ export function listSessionsFromStore(params: { entry?.label ?? originLabel; const deliveryFields = normalizeSessionDeliveryFields(entry); + const parsedAgent = parseAgentSessionKey(key); + const sessionAgentId = normalizeAgentId(parsedAgent?.agentId ?? resolveDefaultAgentId(cfg)); + const resolvedModel = resolveSessionModelRef(cfg, entry, sessionAgentId); + const modelProvider = resolvedModel.provider ?? DEFAULT_PROVIDER; + const model = resolvedModel.model ?? DEFAULT_MODEL; return { key, entry, @@ -648,8 +659,8 @@ export function listSessionsFromStore(params: { outputTokens: entry?.outputTokens, totalTokens: total, responseUsage: entry?.responseUsage, - modelProvider: entry?.modelProvider, - model: entry?.model, + modelProvider, + model, contextTokens: entry?.contextTokens, deliveryContext: deliveryFields.deliveryContext, lastChannel: deliveryFields.lastChannel ?? entry?.lastChannel, diff --git a/src/gateway/session-utils.types.ts b/src/gateway/session-utils.types.ts index 074e9eaa84e..503c7713abd 100644 --- a/src/gateway/session-utils.types.ts +++ b/src/gateway/session-utils.types.ts @@ -84,4 +84,8 @@ export type SessionsPatchResult = { path: string; key: string; entry: SessionEntry; + resolved?: { + modelProvider?: string; + model?: string; + }; }; diff --git a/src/gateway/sessions-patch.ts b/src/gateway/sessions-patch.ts index 36fe85e3a30..ba2d7bbc03c 100644 --- a/src/gateway/sessions-patch.ts +++ b/src/gateway/sessions-patch.ts @@ -2,8 +2,8 @@ import { randomUUID } from "node:crypto"; import type { ModelCatalogEntry } from "../agents/model-catalog.js"; import type { OpenClawConfig } from "../config/config.js"; import type { SessionEntry } from "../config/sessions.js"; -import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "../agents/defaults.js"; -import { resolveAllowedModelRef, resolveConfiguredModelRef } from "../agents/model-selection.js"; +import { resolveDefaultAgentId } from "../agents/agent-scope.js"; +import { resolveAllowedModelRef, resolveDefaultModelForAgent } from "../agents/model-selection.js"; import { normalizeGroupActivation } from "../auto-reply/group-activation.js"; import { formatThinkingLevels, @@ -14,7 +14,11 @@ import { normalizeUsageDisplay, supportsXHighThinking, } from "../auto-reply/thinking.js"; -import { isSubagentSessionKey } from "../routing/session-key.js"; +import { + isSubagentSessionKey, + normalizeAgentId, + parseAgentSessionKey, +} from "../routing/session-key.js"; import { applyVerboseOverride, parseVerboseOverride } from "../sessions/level-overrides.js"; import { applyModelOverrideToSessionEntry } from "../sessions/model-overrides.js"; import { normalizeSendPolicy } from "../sessions/send-policy.js"; @@ -63,6 +67,9 @@ export async function applySessionsPatchToStore(params: { }): Promise<{ ok: true; entry: SessionEntry } | { ok: false; error: ErrorShape }> { const { cfg, store, storeKey, patch } = params; const now = Date.now(); + const parsedAgent = parseAgentSessionKey(storeKey); + const sessionAgentId = normalizeAgentId(parsedAgent?.agentId ?? resolveDefaultAgentId(cfg)); + const resolvedDefault = resolveDefaultModelForAgent({ cfg, agentId: sessionAgentId }); const existing = store[storeKey]; const next: SessionEntry = existing @@ -121,11 +128,6 @@ export async function applySessionsPatchToStore(params: { } else if (raw !== undefined) { const normalized = normalizeThinkLevel(String(raw)); if (!normalized) { - const resolvedDefault = resolveConfiguredModelRef({ - cfg, - defaultProvider: DEFAULT_PROVIDER, - defaultModel: DEFAULT_MODEL, - }); const hintProvider = existing?.providerOverride?.trim() || resolvedDefault.provider; const hintModel = existing?.modelOverride?.trim() || resolvedDefault.model; return invalid( @@ -251,11 +253,6 @@ export async function applySessionsPatchToStore(params: { if ("model" in patch) { const raw = patch.model; - const resolvedDefault = resolveConfiguredModelRef({ - cfg, - defaultProvider: DEFAULT_PROVIDER, - defaultModel: DEFAULT_MODEL, - }); if (raw === null) { applyModelOverrideToSessionEntry({ entry: next, @@ -302,11 +299,6 @@ export async function applySessionsPatchToStore(params: { } if (next.thinkingLevel === "xhigh") { - const resolvedDefault = resolveConfiguredModelRef({ - cfg, - defaultProvider: DEFAULT_PROVIDER, - defaultModel: DEFAULT_MODEL, - }); const effectiveProvider = next.providerOverride ?? resolvedDefault.provider; const effectiveModel = next.modelOverride ?? resolvedDefault.model; if (!supportsXHighThinking(effectiveProvider, effectiveModel)) { diff --git a/src/tui/gateway-chat.ts b/src/tui/gateway-chat.ts index 0dc08e9c254..fe871dac568 100644 --- a/src/tui/gateway-chat.ts +++ b/src/tui/gateway-chat.ts @@ -1,10 +1,12 @@ import { randomUUID } from "node:crypto"; import { loadConfig, resolveGatewayPort } from "../config/config.js"; import { GatewayClient } from "../gateway/client.js"; +import { GATEWAY_CLIENT_CAPS } from "../gateway/protocol/client-info.js"; import { type HelloOk, PROTOCOL_VERSION, type SessionsListParams, + type SessionsPatchResult, type SessionsPatchParams, } from "../gateway/protocol/index.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js"; @@ -22,6 +24,7 @@ export type ChatSendOptions = { thinking?: string; deliver?: boolean; timeoutMs?: number; + runId?: string; }; export type GatewayEvent = { @@ -116,6 +119,7 @@ export class GatewayChatClient { clientVersion: VERSION, platform: process.platform, mode: GATEWAY_CLIENT_MODES.UI, + caps: [GATEWAY_CLIENT_CAPS.TOOL_EVENTS], instanceId: randomUUID(), minProtocol: PROTOCOL_VERSION, maxProtocol: PROTOCOL_VERSION, @@ -153,7 +157,7 @@ export class GatewayChatClient { } async sendChat(opts: ChatSendOptions): Promise<{ runId: string }> { - const runId = randomUUID(); + const runId = opts.runId ?? randomUUID(); await this.client.request("chat.send", { sessionKey: opts.sessionKey, message: opts.message, @@ -195,8 +199,8 @@ export class GatewayChatClient { return await this.client.request("agents.list", {}); } - async patchSession(opts: SessionsPatchParams) { - return await this.client.request("sessions.patch", opts); + async patchSession(opts: SessionsPatchParams): Promise { + return await this.client.request("sessions.patch", opts); } async resetSession(key: string) { diff --git a/src/tui/tui-command-handlers.test.ts b/src/tui/tui-command-handlers.test.ts index 5ca9f9745b2..269dc7c3820 100644 --- a/src/tui/tui-command-handlers.test.ts +++ b/src/tui/tui-command-handlers.test.ts @@ -29,6 +29,8 @@ describe("tui command handlers", () => { abortActive: vi.fn(), setActivityStatus, formatSessionKey: vi.fn(), + applySessionInfoFromPatch: vi.fn(), + noteLocalRunId: vi.fn(), }); await handleCommand("/context"); diff --git a/src/tui/tui-command-handlers.ts b/src/tui/tui-command-handlers.ts index 136885b2af1..bb1bae48b60 100644 --- a/src/tui/tui-command-handlers.ts +++ b/src/tui/tui-command-handlers.ts @@ -1,4 +1,6 @@ import type { Component, TUI } from "@mariozechner/pi-tui"; +import { randomUUID } from "node:crypto"; +import type { SessionsPatchResult } from "../gateway/protocol/index.js"; import type { ChatLog } from "./components/chat-log.js"; import type { GatewayChatClient } from "./gateway-chat.js"; import type { @@ -38,6 +40,9 @@ type CommandHandlerContext = { abortActive: () => Promise; setActivityStatus: (text: string) => void; formatSessionKey: (key: string) => string; + applySessionInfoFromPatch: (result: SessionsPatchResult) => void; + noteLocalRunId: (runId: string) => void; + forgetLocalRunId?: (runId: string) => void; }; export function createCommandHandlers(context: CommandHandlerContext) { @@ -57,6 +62,9 @@ export function createCommandHandlers(context: CommandHandlerContext) { abortActive, setActivityStatus, formatSessionKey, + applySessionInfoFromPatch, + noteLocalRunId, + forgetLocalRunId, } = context; const setAgent = async (id: string) => { @@ -81,11 +89,12 @@ export function createCommandHandlers(context: CommandHandlerContext) { selector.onSelect = (item) => { void (async () => { try { - await client.patchSession({ + const result = await client.patchSession({ key: state.currentSessionKey, model: item.value, }); chatLog.addSystem(`model set to ${item.value}`); + applySessionInfoFromPatch(result); await refreshSessionInfo(); } catch (err) { chatLog.addSystem(`model set failed: ${String(err)}`); @@ -284,11 +293,12 @@ export function createCommandHandlers(context: CommandHandlerContext) { await openModelSelector(); } else { try { - await client.patchSession({ + const result = await client.patchSession({ key: state.currentSessionKey, model: args, }); chatLog.addSystem(`model set to ${args}`); + applySessionInfoFromPatch(result); await refreshSessionInfo(); } catch (err) { chatLog.addSystem(`model set failed: ${String(err)}`); @@ -309,11 +319,12 @@ export function createCommandHandlers(context: CommandHandlerContext) { break; } try { - await client.patchSession({ + const result = await client.patchSession({ key: state.currentSessionKey, thinkingLevel: args, }); chatLog.addSystem(`thinking set to ${args}`); + applySessionInfoFromPatch(result); await refreshSessionInfo(); } catch (err) { chatLog.addSystem(`think failed: ${String(err)}`); @@ -325,12 +336,13 @@ export function createCommandHandlers(context: CommandHandlerContext) { break; } try { - await client.patchSession({ + const result = await client.patchSession({ key: state.currentSessionKey, verboseLevel: args, }); chatLog.addSystem(`verbose set to ${args}`); - await refreshSessionInfo(); + applySessionInfoFromPatch(result); + await loadHistory(); } catch (err) { chatLog.addSystem(`verbose failed: ${String(err)}`); } @@ -341,11 +353,12 @@ export function createCommandHandlers(context: CommandHandlerContext) { break; } try { - await client.patchSession({ + const result = await client.patchSession({ key: state.currentSessionKey, reasoningLevel: args, }); chatLog.addSystem(`reasoning set to ${args}`); + applySessionInfoFromPatch(result); await refreshSessionInfo(); } catch (err) { chatLog.addSystem(`reasoning failed: ${String(err)}`); @@ -362,11 +375,12 @@ export function createCommandHandlers(context: CommandHandlerContext) { const next = normalized ?? (current === "off" ? "tokens" : current === "tokens" ? "full" : "off"); try { - await client.patchSession({ + const result = await client.patchSession({ key: state.currentSessionKey, responseUsage: next === "off" ? null : next, }); chatLog.addSystem(`usage footer: ${next}`); + applySessionInfoFromPatch(result); await refreshSessionInfo(); } catch (err) { chatLog.addSystem(`usage failed: ${String(err)}`); @@ -383,11 +397,12 @@ export function createCommandHandlers(context: CommandHandlerContext) { break; } try { - await client.patchSession({ + const result = await client.patchSession({ key: state.currentSessionKey, elevatedLevel: args, }); chatLog.addSystem(`elevated set to ${args}`); + applySessionInfoFromPatch(result); await refreshSessionInfo(); } catch (err) { chatLog.addSystem(`elevated failed: ${String(err)}`); @@ -399,11 +414,12 @@ export function createCommandHandlers(context: CommandHandlerContext) { break; } try { - await client.patchSession({ + const result = await client.patchSession({ key: state.currentSessionKey, groupActivation: args === "always" ? "always" : "mention", }); chatLog.addSystem(`activation set to ${args}`); + applySessionInfoFromPatch(result); await refreshSessionInfo(); } catch (err) { chatLog.addSystem(`activation failed: ${String(err)}`); @@ -448,17 +464,24 @@ export function createCommandHandlers(context: CommandHandlerContext) { try { chatLog.addUser(text); tui.requestRender(); + const runId = randomUUID(); + noteLocalRunId(runId); + state.activeChatRunId = runId; setActivityStatus("sending"); - const { runId } = await client.sendChat({ + await client.sendChat({ sessionKey: state.currentSessionKey, message: text, thinking: opts.thinking, deliver: deliverDefault, timeoutMs: opts.timeoutMs, + runId, }); - state.activeChatRunId = runId; setActivityStatus("waiting"); } catch (err) { + if (state.activeChatRunId) { + forgetLocalRunId?.(state.activeChatRunId); + } + state.activeChatRunId = null; chatLog.addSystem(`send failed: ${String(err)}`); setActivityStatus("error"); } diff --git a/src/tui/tui-event-handlers.test.ts b/src/tui/tui-event-handlers.test.ts index e3595584ba2..98b3d24da21 100644 --- a/src/tui/tui-event-handlers.test.ts +++ b/src/tui/tui-event-handlers.test.ts @@ -1,14 +1,14 @@ +import type { TUI } from "@mariozechner/pi-tui"; import { describe, expect, it, vi } from "vitest"; +import type { ChatLog } from "./components/chat-log.js"; import type { AgentEvent, ChatEvent, TuiStateAccess } from "./tui-types.js"; import { createEventHandlers } from "./tui-event-handlers.js"; -type MockChatLog = { - startTool: ReturnType; - updateToolResult: ReturnType; - addSystem: ReturnType; - updateAssistant: ReturnType; - finalizeAssistant: ReturnType; -}; +type MockChatLog = Pick< + ChatLog, + "startTool" | "updateToolResult" | "addSystem" | "updateAssistant" | "finalizeAssistant" +>; +type MockTui = Pick; describe("tui-event-handlers: handleAgentEvent", () => { const makeState = (overrides?: Partial): TuiStateAccess => ({ @@ -21,7 +21,7 @@ describe("tui-event-handlers: handleAgentEvent", () => { currentSessionId: "session-1", activeChatRunId: "run-1", historyLoaded: true, - sessionInfo: {}, + sessionInfo: { verboseLevel: "on" }, initialSessionApplied: true, isConnected: true, autoMessageSent: false, @@ -42,21 +42,40 @@ describe("tui-event-handlers: handleAgentEvent", () => { updateAssistant: vi.fn(), finalizeAssistant: vi.fn(), }; - const tui = { requestRender: vi.fn() }; + const tui: MockTui = { requestRender: vi.fn() }; const setActivityStatus = vi.fn(); + const loadHistory = vi.fn(); + const localRunIds = new Set(); + const noteLocalRunId = (runId: string) => { + localRunIds.add(runId); + }; + const forgetLocalRunId = (runId: string) => { + localRunIds.delete(runId); + }; + const isLocalRunId = (runId: string) => localRunIds.has(runId); + const clearLocalRunIds = () => { + localRunIds.clear(); + }; - return { chatLog, tui, state, setActivityStatus }; + return { + chatLog, + tui, + state, + setActivityStatus, + loadHistory, + noteLocalRunId, + forgetLocalRunId, + isLocalRunId, + clearLocalRunIds, + }; }; it("processes tool events when runId matches activeChatRunId (even if sessionId differs)", () => { const state = makeState({ currentSessionId: "session-xyz", activeChatRunId: "run-123" }); const { chatLog, tui, setActivityStatus } = makeContext(state); const { handleAgentEvent } = createEventHandlers({ - // Casts are fine here: TUI runtime shape is larger than we need in unit tests. - // oxlint-disable-next-line typescript/no-explicit-any - chatLog: chatLog as any, - // oxlint-disable-next-line typescript/no-explicit-any - tui: tui as any, + chatLog, + tui, state, setActivityStatus, }); @@ -82,10 +101,8 @@ describe("tui-event-handlers: handleAgentEvent", () => { const state = makeState({ activeChatRunId: "run-1" }); const { chatLog, tui, setActivityStatus } = makeContext(state); const { handleAgentEvent } = createEventHandlers({ - // oxlint-disable-next-line typescript/no-explicit-any - chatLog: chatLog as any, - // oxlint-disable-next-line typescript/no-explicit-any - tui: tui as any, + chatLog, + tui, state, setActivityStatus, }); @@ -107,10 +124,14 @@ describe("tui-event-handlers: handleAgentEvent", () => { const state = makeState({ activeChatRunId: "run-9" }); const { tui, setActivityStatus } = makeContext(state); const { handleAgentEvent } = createEventHandlers({ - // oxlint-disable-next-line typescript/no-explicit-any - chatLog: { startTool: vi.fn(), updateToolResult: vi.fn() } as any, - // oxlint-disable-next-line typescript/no-explicit-any - tui: tui as any, + chatLog: { + startTool: vi.fn(), + updateToolResult: vi.fn(), + addSystem: vi.fn(), + updateAssistant: vi.fn(), + finalizeAssistant: vi.fn(), + }, + tui, state, setActivityStatus, }); @@ -131,10 +152,8 @@ describe("tui-event-handlers: handleAgentEvent", () => { const state = makeState({ activeChatRunId: null }); const { chatLog, tui, setActivityStatus } = makeContext(state); const { handleChatEvent, handleAgentEvent } = createEventHandlers({ - // oxlint-disable-next-line typescript/no-explicit-any - chatLog: chatLog as any, - // oxlint-disable-next-line typescript/no-explicit-any - tui: tui as any, + chatLog, + tui, state, setActivityStatus, }); @@ -165,10 +184,8 @@ describe("tui-event-handlers: handleAgentEvent", () => { const state = makeState({ activeChatRunId: null }); const { chatLog, tui, setActivityStatus } = makeContext(state); const { handleChatEvent, handleAgentEvent } = createEventHandlers({ - // oxlint-disable-next-line typescript/no-explicit-any - chatLog: chatLog as any, - // oxlint-disable-next-line typescript/no-explicit-any - tui: tui as any, + chatLog, + tui, state, setActivityStatus, }); @@ -194,14 +211,39 @@ describe("tui-event-handlers: handleAgentEvent", () => { expect(tui.requestRender).not.toHaveBeenCalled(); }); + it("accepts tool events after chat final for the same run", () => { + const state = makeState({ activeChatRunId: null }); + const { chatLog, tui, setActivityStatus } = makeContext(state); + const { handleChatEvent, handleAgentEvent } = createEventHandlers({ + chatLog, + tui, + state, + setActivityStatus, + }); + + handleChatEvent({ + runId: "run-final", + sessionKey: state.currentSessionKey, + state: "final", + message: { content: [{ type: "text", text: "done" }] }, + }); + + handleAgentEvent({ + runId: "run-final", + stream: "tool", + data: { phase: "start", toolCallId: "tc-final", name: "session_status" }, + }); + + expect(chatLog.startTool).toHaveBeenCalledWith("tc-final", "session_status", undefined); + expect(tui.requestRender).toHaveBeenCalled(); + }); + it("ignores lifecycle updates for non-active runs in the same session", () => { const state = makeState({ activeChatRunId: "run-active" }); const { chatLog, tui, setActivityStatus } = makeContext(state); const { handleChatEvent, handleAgentEvent } = createEventHandlers({ - // oxlint-disable-next-line typescript/no-explicit-any - chatLog: chatLog as any, - // oxlint-disable-next-line typescript/no-explicit-any - tui: tui as any, + chatLog, + tui, state, setActivityStatus, }); @@ -224,4 +266,95 @@ describe("tui-event-handlers: handleAgentEvent", () => { expect(setActivityStatus).not.toHaveBeenCalled(); expect(tui.requestRender).not.toHaveBeenCalled(); }); + + it("suppresses tool events when verbose is off", () => { + const state = makeState({ + activeChatRunId: "run-123", + sessionInfo: { verboseLevel: "off" }, + }); + const { chatLog, tui, setActivityStatus } = makeContext(state); + const { handleAgentEvent } = createEventHandlers({ + chatLog, + tui, + state, + setActivityStatus, + }); + + handleAgentEvent({ + runId: "run-123", + stream: "tool", + data: { phase: "start", toolCallId: "tc-off", name: "session_status" }, + }); + + expect(chatLog.startTool).not.toHaveBeenCalled(); + expect(tui.requestRender).not.toHaveBeenCalled(); + }); + + it("omits tool output when verbose is on (non-full)", () => { + const state = makeState({ + activeChatRunId: "run-123", + sessionInfo: { verboseLevel: "on" }, + }); + const { chatLog, tui, setActivityStatus } = makeContext(state); + const { handleAgentEvent } = createEventHandlers({ + chatLog, + tui, + state, + setActivityStatus, + }); + + handleAgentEvent({ + runId: "run-123", + stream: "tool", + data: { + phase: "update", + toolCallId: "tc-on", + name: "session_status", + partialResult: { content: [{ type: "text", text: "secret" }] }, + }, + }); + + handleAgentEvent({ + runId: "run-123", + stream: "tool", + data: { + phase: "result", + toolCallId: "tc-on", + name: "session_status", + result: { content: [{ type: "text", text: "secret" }] }, + isError: false, + }, + }); + + expect(chatLog.updateToolResult).toHaveBeenCalledTimes(1); + expect(chatLog.updateToolResult).toHaveBeenCalledWith( + "tc-on", + { content: [] }, + { isError: false }, + ); + }); + + it("refreshes history after a non-local chat final", () => { + const state = makeState({ activeChatRunId: null }); + const { chatLog, tui, setActivityStatus, loadHistory, isLocalRunId, forgetLocalRunId } = + makeContext(state); + const { handleChatEvent } = createEventHandlers({ + chatLog, + tui, + state, + setActivityStatus, + loadHistory, + isLocalRunId, + forgetLocalRunId, + }); + + handleChatEvent({ + runId: "external-run", + sessionKey: state.currentSessionKey, + state: "final", + message: { content: [{ type: "text", text: "done" }] }, + }); + + expect(loadHistory).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/tui/tui-event-handlers.ts b/src/tui/tui-event-handlers.ts index 111f1fafb28..c07aef99a69 100644 --- a/src/tui/tui-event-handlers.ts +++ b/src/tui/tui-event-handlers.ts @@ -10,10 +10,24 @@ type EventHandlerContext = { state: TuiStateAccess; setActivityStatus: (text: string) => void; refreshSessionInfo?: () => Promise; + loadHistory?: () => Promise; + isLocalRunId?: (runId: string) => boolean; + forgetLocalRunId?: (runId: string) => void; + clearLocalRunIds?: () => void; }; export function createEventHandlers(context: EventHandlerContext) { - const { chatLog, tui, state, setActivityStatus, refreshSessionInfo } = context; + const { + chatLog, + tui, + state, + setActivityStatus, + refreshSessionInfo, + loadHistory, + isLocalRunId, + forgetLocalRunId, + clearLocalRunIds, + } = context; const finalizedRuns = new Map(); const sessionRuns = new Map(); let streamAssembler = new TuiStreamAssembler(); @@ -50,6 +64,7 @@ export function createEventHandlers(context: EventHandlerContext) { finalizedRuns.clear(); sessionRuns.clear(); streamAssembler = new TuiStreamAssembler(); + clearLocalRunIds?.(); }; const noteSessionRun = (runId: string) => { @@ -95,6 +110,11 @@ export function createEventHandlers(context: EventHandlerContext) { } if (evt.state === "final") { if (isCommandMessage(evt.message)) { + if (isLocalRunId?.(evt.runId)) { + forgetLocalRunId?.(evt.runId); + } else { + void loadHistory?.(); + } const text = extractTextFromMessage(evt.message); if (text) { chatLog.addSystem(text); @@ -107,6 +127,11 @@ export function createEventHandlers(context: EventHandlerContext) { tui.requestRender(); return; } + if (isLocalRunId?.(evt.runId)) { + forgetLocalRunId?.(evt.runId); + } else { + void loadHistory?.(); + } const stopReason = evt.message && typeof evt.message === "object" && !Array.isArray(evt.message) ? typeof (evt.message as Record).stopReason === "string" @@ -129,6 +154,11 @@ export function createEventHandlers(context: EventHandlerContext) { state.activeChatRunId = null; setActivityStatus("aborted"); void refreshSessionInfo?.(); + if (isLocalRunId?.(evt.runId)) { + forgetLocalRunId?.(evt.runId); + } else { + void loadHistory?.(); + } } if (evt.state === "error") { chatLog.addSystem(`run error: ${evt.errorMessage ?? "unknown"}`); @@ -137,6 +167,11 @@ export function createEventHandlers(context: EventHandlerContext) { state.activeChatRunId = null; setActivityStatus("error"); void refreshSessionInfo?.(); + if (isLocalRunId?.(evt.runId)) { + forgetLocalRunId?.(evt.runId); + } else { + void loadHistory?.(); + } } tui.requestRender(); }; @@ -148,12 +183,20 @@ export function createEventHandlers(context: EventHandlerContext) { const evt = payload as AgentEvent; syncSessionKey(); // Agent events (tool streaming, lifecycle) are emitted per-run. Filter against the - // active chat run id, not the session id. + // active chat run id, not the session id. Tool results can arrive after the chat + // final event, so accept finalized runs for tool updates. const isActiveRun = evt.runId === state.activeChatRunId; - if (!isActiveRun && !sessionRuns.has(evt.runId)) { + const isKnownRun = isActiveRun || sessionRuns.has(evt.runId) || finalizedRuns.has(evt.runId); + if (!isKnownRun) { return; } if (evt.stream === "tool") { + const verbose = state.sessionInfo.verboseLevel ?? "off"; + const allowToolEvents = verbose !== "off"; + const allowToolOutput = verbose === "full"; + if (!allowToolEvents) { + return; + } const data = evt.data ?? {}; const phase = asString(data.phase, ""); const toolCallId = asString(data.toolCallId, ""); @@ -164,13 +207,20 @@ export function createEventHandlers(context: EventHandlerContext) { if (phase === "start") { chatLog.startTool(toolCallId, toolName, data.args); } else if (phase === "update") { + if (!allowToolOutput) { + return; + } chatLog.updateToolResult(toolCallId, data.partialResult, { partial: true, }); } else if (phase === "result") { - chatLog.updateToolResult(toolCallId, data.result, { - isError: Boolean(data.isError), - }); + if (allowToolOutput) { + chatLog.updateToolResult(toolCallId, data.result, { + isError: Boolean(data.isError), + }); + } else { + chatLog.updateToolResult(toolCallId, { content: [] }, { isError: Boolean(data.isError) }); + } } tui.requestRender(); return; diff --git a/src/tui/tui-session-actions.test.ts b/src/tui/tui-session-actions.test.ts new file mode 100644 index 00000000000..7e20595058b --- /dev/null +++ b/src/tui/tui-session-actions.test.ts @@ -0,0 +1,113 @@ +import { describe, expect, it, vi } from "vitest"; +import type { TuiStateAccess } from "./tui-types.js"; +import { createSessionActions } from "./tui-session-actions.js"; + +describe("tui session actions", () => { + it("queues session refreshes and applies the latest result", async () => { + let resolveFirst: ((value: unknown) => void) | undefined; + let resolveSecond: ((value: unknown) => void) | undefined; + + const listSessions = vi + .fn() + .mockImplementationOnce( + () => + new Promise((resolve) => { + resolveFirst = resolve; + }), + ) + .mockImplementationOnce( + () => + new Promise((resolve) => { + resolveSecond = resolve; + }), + ); + + const state: TuiStateAccess = { + agentDefaultId: "main", + sessionMainKey: "agent:main:main", + sessionScope: "global", + agents: [], + currentAgentId: "main", + currentSessionKey: "agent:main:main", + currentSessionId: null, + activeChatRunId: null, + historyLoaded: false, + sessionInfo: {}, + initialSessionApplied: true, + isConnected: true, + autoMessageSent: false, + toolsExpanded: false, + showThinking: false, + connectionStatus: "connected", + activityStatus: "idle", + statusTimeout: null, + lastCtrlCAt: 0, + }; + + const updateFooter = vi.fn(); + const updateAutocompleteProvider = vi.fn(); + const requestRender = vi.fn(); + + const { refreshSessionInfo } = createSessionActions({ + client: { listSessions } as { listSessions: typeof listSessions }, + chatLog: { addSystem: vi.fn() } as unknown as import("./components/chat-log.js").ChatLog, + tui: { requestRender } as unknown as import("@mariozechner/pi-tui").TUI, + opts: {}, + state, + agentNames: new Map(), + initialSessionInput: "", + initialSessionAgentId: null, + resolveSessionKey: vi.fn(), + updateHeader: vi.fn(), + updateFooter, + updateAutocompleteProvider, + setActivityStatus: vi.fn(), + }); + + const first = refreshSessionInfo(); + const second = refreshSessionInfo(); + + await Promise.resolve(); + expect(listSessions).toHaveBeenCalledTimes(1); + + resolveFirst?.({ + ts: Date.now(), + path: "/tmp/sessions.json", + count: 1, + defaults: {}, + sessions: [ + { + key: "agent:main:main", + model: "old", + modelProvider: "anthropic", + }, + ], + }); + + await first; + await Promise.resolve(); + + expect(listSessions).toHaveBeenCalledTimes(2); + + resolveSecond?.({ + ts: Date.now(), + path: "/tmp/sessions.json", + count: 1, + defaults: {}, + sessions: [ + { + key: "agent:main:main", + model: "Minimax-M2.1", + modelProvider: "minimax", + }, + ], + }); + + await second; + + expect(state.sessionInfo.model).toBe("Minimax-M2.1"); + expect(updateAutocompleteProvider).toHaveBeenCalledTimes(2); + expect(updateFooter).toHaveBeenCalledTimes(2); + expect(requestRender).toHaveBeenCalledTimes(2); + }); +}); diff --git a/src/tui/tui-session-actions.ts b/src/tui/tui-session-actions.ts index 310acc74171..13faf44cee0 100644 --- a/src/tui/tui-session-actions.ts +++ b/src/tui/tui-session-actions.ts @@ -1,4 +1,5 @@ import type { TUI } from "@mariozechner/pi-tui"; +import type { SessionsPatchResult } from "../gateway/protocol/index.js"; import type { ChatLog } from "./components/chat-log.js"; import type { GatewayAgentsList, GatewayChatClient } from "./gateway-chat.js"; import type { TuiOptions, TuiStateAccess } from "./tui-types.js"; @@ -23,6 +24,30 @@ type SessionActionContext = { updateFooter: () => void; updateAutocompleteProvider: () => void; setActivityStatus: (text: string) => void; + clearLocalRunIds?: () => void; +}; + +type SessionInfoDefaults = { + model?: string | null; + modelProvider?: string | null; + contextTokens?: number | null; +}; + +type SessionInfoEntry = { + thinkingLevel?: string; + verboseLevel?: string; + reasoningLevel?: string; + model?: string; + modelProvider?: string; + modelOverride?: string; + providerOverride?: string; + contextTokens?: number | null; + inputTokens?: number | null; + outputTokens?: number | null; + totalTokens?: number | null; + responseUsage?: "on" | "off" | "tokens" | "full"; + updatedAt?: number | null; + displayName?: string; }; export function createSessionActions(context: SessionActionContext) { @@ -40,8 +65,10 @@ export function createSessionActions(context: SessionActionContext) { updateFooter, updateAutocompleteProvider, setActivityStatus, + clearLocalRunIds, } = context; - let refreshSessionInfoPromise: Promise | null = null; + let refreshSessionInfoPromise: Promise = Promise.resolve(); + let lastSessionDefaults: SessionInfoDefaults | null = null; const applyAgentsResult = (result: GatewayAgentsList) => { state.agentDefaultId = normalizeAgentId(result.defaultId); @@ -99,58 +126,173 @@ export function createSessionActions(context: SessionActionContext) { } }; - const refreshSessionInfo = async () => { - if (refreshSessionInfoPromise) { - return refreshSessionInfoPromise; + const resolveModelSelection = (entry?: SessionInfoEntry) => { + if (entry?.modelProvider || entry?.model) { + return { + modelProvider: entry.modelProvider ?? state.sessionInfo.modelProvider, + model: entry.model ?? state.sessionInfo.model, + }; } - refreshSessionInfoPromise = (async () => { - try { - const listAgentId = - state.currentSessionKey === "global" || state.currentSessionKey === "unknown" - ? undefined - : state.currentAgentId; - const result = await client.listSessions({ - includeGlobal: false, - includeUnknown: false, - agentId: listAgentId, - }); - const entry = result.sessions.find((row) => { - // Exact match - if (row.key === state.currentSessionKey) { - return true; - } - // Also match canonical keys like "agent:default:main" against "main" - const parsed = parseAgentSessionKey(row.key); - return parsed?.rest === state.currentSessionKey; - }); - state.sessionInfo = { - thinkingLevel: entry?.thinkingLevel, - verboseLevel: entry?.verboseLevel, - reasoningLevel: entry?.reasoningLevel, - model: entry?.model ?? result.defaults?.model ?? undefined, - modelProvider: entry?.modelProvider ?? result.defaults?.modelProvider ?? undefined, - contextTokens: entry?.contextTokens ?? result.defaults?.contextTokens, - inputTokens: entry?.inputTokens ?? null, - outputTokens: entry?.outputTokens ?? null, - totalTokens: entry?.totalTokens ?? null, - responseUsage: entry?.responseUsage, - updatedAt: entry?.updatedAt ?? null, - displayName: entry?.displayName, - }; - } catch (err) { - chatLog.addSystem(`sessions list failed: ${String(err)}`); - } - updateAutocompleteProvider(); - updateFooter(); - tui.requestRender(); - })(); + const overrideModel = entry?.modelOverride?.trim(); + if (overrideModel) { + const overrideProvider = entry?.providerOverride?.trim() || state.sessionInfo.modelProvider; + return { modelProvider: overrideProvider, model: overrideModel }; + } + return { + modelProvider: state.sessionInfo.modelProvider, + model: state.sessionInfo.model, + }; + }; + + const applySessionInfo = (params: { + entry?: SessionInfoEntry | null; + defaults?: SessionInfoDefaults | null; + force?: boolean; + }) => { + const entry = params.entry ?? undefined; + const defaults = params.defaults ?? lastSessionDefaults ?? undefined; + const previousDefaults = lastSessionDefaults; + const defaultsChanged = params.defaults + ? previousDefaults?.model !== params.defaults.model || + previousDefaults?.modelProvider !== params.defaults.modelProvider || + previousDefaults?.contextTokens !== params.defaults.contextTokens + : false; + if (params.defaults) { + lastSessionDefaults = params.defaults; + } + + const entryUpdatedAt = entry?.updatedAt ?? null; + const currentUpdatedAt = state.sessionInfo.updatedAt ?? null; + const modelChanged = + (entry?.modelProvider !== undefined && + entry.modelProvider !== state.sessionInfo.modelProvider) || + (entry?.model !== undefined && entry.model !== state.sessionInfo.model); + if ( + !params.force && + entryUpdatedAt !== null && + currentUpdatedAt !== null && + entryUpdatedAt < currentUpdatedAt && + !defaultsChanged && + !modelChanged + ) { + return; + } + + const next = { ...state.sessionInfo }; + if (entry?.thinkingLevel !== undefined) { + next.thinkingLevel = entry.thinkingLevel; + } + if (entry?.verboseLevel !== undefined) { + next.verboseLevel = entry.verboseLevel; + } + if (entry?.reasoningLevel !== undefined) { + next.reasoningLevel = entry.reasoningLevel; + } + if (entry?.responseUsage !== undefined) { + next.responseUsage = entry.responseUsage; + } + if (entry?.inputTokens !== undefined) { + next.inputTokens = entry.inputTokens; + } + if (entry?.outputTokens !== undefined) { + next.outputTokens = entry.outputTokens; + } + if (entry?.totalTokens !== undefined) { + next.totalTokens = entry.totalTokens; + } + if (entry?.contextTokens !== undefined || defaults?.contextTokens !== undefined) { + next.contextTokens = + entry?.contextTokens ?? defaults?.contextTokens ?? state.sessionInfo.contextTokens; + } + if (entry?.displayName !== undefined) { + next.displayName = entry.displayName; + } + if (entry?.updatedAt !== undefined) { + next.updatedAt = entry.updatedAt; + } + + const selection = resolveModelSelection(entry); + if (selection.modelProvider !== undefined) { + next.modelProvider = selection.modelProvider; + } + if (selection.model !== undefined) { + next.model = selection.model; + } + + state.sessionInfo = next; + updateAutocompleteProvider(); + updateFooter(); + tui.requestRender(); + }; + + const runRefreshSessionInfo = async () => { try { - await refreshSessionInfoPromise; - } finally { - refreshSessionInfoPromise = null; + const resolveListAgentId = () => { + if (state.currentSessionKey === "global" || state.currentSessionKey === "unknown") { + return undefined; + } + const parsed = parseAgentSessionKey(state.currentSessionKey); + return parsed?.agentId ? normalizeAgentId(parsed.agentId) : state.currentAgentId; + }; + const listAgentId = resolveListAgentId(); + const result = await client.listSessions({ + includeGlobal: false, + includeUnknown: false, + agentId: listAgentId, + }); + const normalizeMatchKey = (key: string) => parseAgentSessionKey(key)?.rest ?? key; + const currentMatchKey = normalizeMatchKey(state.currentSessionKey); + const entry = result.sessions.find((row) => { + // Exact match + if (row.key === state.currentSessionKey) { + return true; + } + // Also match canonical keys like "agent:default:main" against "main" + return normalizeMatchKey(row.key) === currentMatchKey; + }); + if (entry?.key && entry.key !== state.currentSessionKey) { + updateAgentFromSessionKey(entry.key); + state.currentSessionKey = entry.key; + updateHeader(); + } + applySessionInfo({ + entry, + defaults: result.defaults, + }); + } catch (err) { + chatLog.addSystem(`sessions list failed: ${String(err)}`); } }; + const refreshSessionInfo = async () => { + refreshSessionInfoPromise = refreshSessionInfoPromise.then( + runRefreshSessionInfo, + runRefreshSessionInfo, + ); + await refreshSessionInfoPromise; + }; + + const applySessionInfoFromPatch = (result?: SessionsPatchResult | null) => { + if (!result?.entry) { + return; + } + if (result.key && result.key !== state.currentSessionKey) { + updateAgentFromSessionKey(result.key); + state.currentSessionKey = result.key; + updateHeader(); + } + const resolved = result.resolved; + const entry = + resolved && (resolved.modelProvider || resolved.model) + ? { + ...result.entry, + modelProvider: resolved.modelProvider ?? result.entry.modelProvider, + model: resolved.model ?? result.entry.model, + } + : result.entry; + applySessionInfo({ entry, force: true }); + }; + const loadHistory = async () => { try { const history = await client.loadHistory({ @@ -161,9 +303,12 @@ export function createSessionActions(context: SessionActionContext) { messages?: unknown[]; sessionId?: string; thinkingLevel?: string; + verboseLevel?: string; }; state.currentSessionId = typeof record.sessionId === "string" ? record.sessionId : null; state.sessionInfo.thinkingLevel = record.thinkingLevel ?? state.sessionInfo.thinkingLevel; + state.sessionInfo.verboseLevel = record.verboseLevel ?? state.sessionInfo.verboseLevel; + const showTools = (state.sessionInfo.verboseLevel ?? "off") !== "off"; chatLog.clearAll(); chatLog.addSystem(`session ${state.currentSessionKey}`); for (const entry of record.messages ?? []) { @@ -195,6 +340,9 @@ export function createSessionActions(context: SessionActionContext) { continue; } if (message.role === "toolResult") { + if (!showTools) { + continue; + } const toolCallId = asString(message.toolCallId, ""); const toolName = asString(message.toolName, "tool"); const component = chatLog.startTool(toolCallId, toolName, {}); @@ -227,6 +375,7 @@ export function createSessionActions(context: SessionActionContext) { state.activeChatRunId = null; state.currentSessionId = null; state.historyLoaded = false; + clearLocalRunIds?.(); updateHeader(); updateFooter(); await loadHistory(); @@ -255,6 +404,7 @@ export function createSessionActions(context: SessionActionContext) { applyAgentsResult, refreshAgents, refreshSessionInfo, + applySessionInfoFromPatch, loadHistory, setSession, abortActive, diff --git a/src/tui/tui.ts b/src/tui/tui.ts index fd693b4db6a..982749d248c 100644 --- a/src/tui/tui.ts +++ b/src/tui/tui.ts @@ -95,6 +95,7 @@ export async function runTui(opts: TuiOptions) { let wasDisconnected = false; let toolsExpanded = false; let showThinking = false; + const localRunIds = new Set(); const deliverDefault = opts.deliver ?? false; const autoMessage = opts.message?.trim(); @@ -225,6 +226,29 @@ export async function runTui(opts: TuiOptions) { }, }; + const noteLocalRunId = (runId: string) => { + if (!runId) { + return; + } + localRunIds.add(runId); + if (localRunIds.size > 200) { + const [first] = localRunIds; + if (first) { + localRunIds.delete(first); + } + } + }; + + const forgetLocalRunId = (runId: string) => { + localRunIds.delete(runId); + }; + + const isLocalRunId = (runId: string) => localRunIds.has(runId); + + const clearLocalRunIds = () => { + localRunIds.clear(); + }; + const client = new GatewayChatClient({ url: opts.url, token: opts.token, @@ -522,9 +546,16 @@ export async function runTui(opts: TuiOptions) { updateFooter, updateAutocompleteProvider, setActivityStatus, + clearLocalRunIds, }); - const { refreshAgents, refreshSessionInfo, loadHistory, setSession, abortActive } = - sessionActions; + const { + refreshAgents, + refreshSessionInfo, + applySessionInfoFromPatch, + loadHistory, + setSession, + abortActive, + } = sessionActions; const { handleChatEvent, handleAgentEvent } = createEventHandlers({ chatLog, @@ -532,6 +563,10 @@ export async function runTui(opts: TuiOptions) { state, setActivityStatus, refreshSessionInfo, + loadHistory, + isLocalRunId, + forgetLocalRunId, + clearLocalRunIds, }); const { handleCommand, sendMessage, openModelSelector, openAgentSelector, openSessionSelector } = @@ -545,12 +580,15 @@ export async function runTui(opts: TuiOptions) { openOverlay, closeOverlay, refreshSessionInfo, + applySessionInfoFromPatch, loadHistory, setSession, refreshAgents, abortActive, setActivityStatus, formatSessionKey, + noteLocalRunId, + forgetLocalRunId, }); const { runLocalShellLine } = createLocalShellRunner({