From 2466a9bb13691b26655985edfd6b004a75523f17 Mon Sep 17 00:00:00 2001 From: Onur <2453968+osolmaz@users.noreply.github.com> Date: Sun, 1 Mar 2026 09:19:11 +0100 Subject: [PATCH] ACP: carry dedupe/projector updates onto configurable acpx branch --- .../acpx/src/runtime-internals/events.test.ts | 44 ++- .../acpx/src/runtime-internals/events.ts | 218 +++++++---- src/acp/runtime/types.ts | 21 + src/auto-reply/reply/abort.test.ts | 101 +++++ src/auto-reply/reply/abort.ts | 24 +- src/auto-reply/reply/acp-projector.test.ts | 333 ++++++++++++---- src/auto-reply/reply/acp-projector.ts | 366 +++++++++++++++++- src/auto-reply/reply/dispatch-acp.ts | 81 ++++ src/auto-reply/reply/dispatch-from-config.ts | 1 + src/config/schema.help.ts | 18 +- src/config/schema.labels.ts | 8 + src/config/types.acp.ts | 21 + src/config/zod-schema.ts | 10 + src/plugin-sdk/index.ts | 1 + 14 files changed, 1076 insertions(+), 171 deletions(-) diff --git a/extensions/acpx/src/runtime-internals/events.test.ts b/extensions/acpx/src/runtime-internals/events.test.ts index 0736ba4063c..7329b9d1991 100644 --- a/extensions/acpx/src/runtime-internals/events.test.ts +++ b/extensions/acpx/src/runtime-internals/events.test.ts @@ -44,6 +44,7 @@ describe("PromptStreamProjector", () => { type: "text_delta", text: "hello world", stream: "output", + tag: "agent_message_chunk", }); }); @@ -71,6 +72,7 @@ describe("PromptStreamProjector", () => { type: "text_delta", text: " indented", stream: "output", + tag: "agent_message_chunk", }); }); @@ -98,10 +100,11 @@ describe("PromptStreamProjector", () => { type: "text_delta", text: "thinking", stream: "thought", + tag: "agent_thought_chunk", }); }); - it("maps tool call updates to tool_call events", () => { + it("maps tool call updates with metadata and stable fallback title", () => { const projector = new PromptStreamProjector(); beginPrompt(projector); const event = projector.ingestLine( @@ -111,9 +114,8 @@ describe("PromptStreamProjector", () => { params: { sessionId: "session-1", update: { - sessionUpdate: "tool_call", - toolCallId: "call-1", - title: "exec", + sessionUpdate: "tool_call_update", + toolCallId: "call_ABC123", status: "in_progress", }, }, @@ -122,7 +124,38 @@ describe("PromptStreamProjector", () => { expect(event).toEqual({ type: "tool_call", - text: "exec (in_progress)", + text: "tool call (in_progress)", + tag: "tool_call_update", + toolCallId: "call_ABC123", + status: "in_progress", + title: "tool call", + }); + }); + + it("maps usage updates with numeric metadata", () => { + const projector = new PromptStreamProjector(); + beginPrompt(projector); + const event = projector.ingestLine( + jsonLine({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "session-1", + update: { + sessionUpdate: "usage_update", + used: 12, + size: 500, + }, + }, + }), + ); + + expect(event).toEqual({ + type: "status", + text: "usage updated: 12/500", + tag: "usage_update", + used: 12, + size: 500, }); }); @@ -167,6 +200,7 @@ describe("PromptStreamProjector", () => { type: "text_delta", text: "new turn", stream: "output", + tag: "agent_message_chunk", }); }); diff --git a/extensions/acpx/src/runtime-internals/events.ts b/extensions/acpx/src/runtime-internals/events.ts index 2e66076f595..986911bfdc0 100644 --- a/extensions/acpx/src/runtime-internals/events.ts +++ b/extensions/acpx/src/runtime-internals/events.ts @@ -1,4 +1,4 @@ -import type { AcpRuntimeEvent } from "openclaw/plugin-sdk"; +import type { AcpRuntimeEvent, AcpSessionUpdateTag } from "openclaw/plugin-sdk"; import { isAcpJsonRpcMessage, normalizeJsonRpcId } from "./jsonrpc.js"; import { asOptionalString, @@ -27,6 +27,10 @@ export function parseJsonLines(value: string): AcpxJsonObject[] { return events; } +function asOptionalFiniteNumber(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) ? value : undefined; +} + function parsePromptStopReason(message: Record): string | undefined { if (!Object.hasOwn(message, "result")) { return undefined; @@ -39,6 +43,88 @@ function parsePromptStopReason(message: Record): string | undef return stopReason && stopReason.trim().length > 0 ? stopReason : undefined; } +function resolveTextChunk(params: { + update: Record; + stream: "output" | "thought"; + tag: AcpSessionUpdateTag; +}): AcpRuntimeEvent | null { + const contentRaw = params.update.content; + if (isRecord(contentRaw)) { + const contentType = asTrimmedString(contentRaw.type); + if (contentType && contentType !== "text") { + return null; + } + const text = asString(contentRaw.text); + if (text && text.length > 0) { + return { + type: "text_delta", + text, + stream: params.stream, + tag: params.tag, + }; + } + } + + const text = asString(params.update.text); + if (!text || text.length === 0) { + return null; + } + return { + type: "text_delta", + text, + stream: params.stream, + tag: params.tag, + }; +} + +function resolveStatusTextForTag(params: { + tag: AcpSessionUpdateTag; + update: Record; +}): string | null { + const { tag, update } = params; + if (tag === "available_commands_update") { + const commands = Array.isArray(update.availableCommands) ? update.availableCommands : []; + return commands.length > 0 + ? `available commands updated (${commands.length})` + : "available commands updated"; + } + if (tag === "current_mode_update") { + const mode = + asTrimmedString(update.currentModeId) || + asTrimmedString(update.modeId) || + asTrimmedString(update.mode); + return mode ? `mode updated: ${mode}` : "mode updated"; + } + if (tag === "config_option_update") { + const id = asTrimmedString(update.id) || asTrimmedString(update.configOptionId); + const value = + asTrimmedString(update.currentValue) || + asTrimmedString(update.value) || + asTrimmedString(update.optionValue); + if (id && value) { + return `config updated: ${id}=${value}`; + } + if (id) { + return `config updated: ${id}`; + } + return "config updated"; + } + if (tag === "session_info_update") { + return asTrimmedString(update.summary) || asTrimmedString(update.message) || "session updated"; + } + if (tag === "plan") { + const entries = Array.isArray(update.entries) ? update.entries : []; + const first = entries.find((entry) => isRecord(entry)) as Record | undefined; + const content = asTrimmedString(first?.content); + if (!content) { + return "plan updated"; + } + const status = asTrimmedString(first?.status); + return status ? `plan: [${status}] ${content}` : `plan: ${content}`; + } + return null; +} + function parseSessionUpdateEvent(message: Record): AcpRuntimeEvent | null { if (asTrimmedString(message.method) !== "session/update") { return null; @@ -52,105 +138,65 @@ function parseSessionUpdateEvent(message: Record): AcpRuntimeEv return null; } - const sessionUpdate = asTrimmedString(update.sessionUpdate); - switch (sessionUpdate) { - case "agent_message_chunk": { - const content = isRecord(update.content) ? update.content : null; - if (!content || asTrimmedString(content.type) !== "text") { - return null; - } - const text = asString(content.text); - if (!text) { - return null; - } - return { - type: "text_delta", - text, + const tag = asOptionalString(update.sessionUpdate) as AcpSessionUpdateTag | undefined; + if (!tag) { + return null; + } + + switch (tag) { + case "agent_message_chunk": + return resolveTextChunk({ + update, stream: "output", - }; - } - case "agent_thought_chunk": { - const content = isRecord(update.content) ? update.content : null; - if (!content || asTrimmedString(content.type) !== "text") { - return null; - } - const text = asString(content.text); - if (!text) { - return null; - } - return { - type: "text_delta", - text, + tag, + }); + case "agent_thought_chunk": + return resolveTextChunk({ + update, stream: "thought", - }; - } + tag, + }); case "tool_call": case "tool_call_update": { - const title = - asTrimmedString(update.title) || - asTrimmedString(update.toolCallId) || - asTrimmedString(update.kind) || - "tool"; + const title = asTrimmedString(update.title) || "tool call"; const status = asTrimmedString(update.status); + const toolCallId = asOptionalString(update.toolCallId); return { type: "tool_call", text: status ? `${title} (${status})` : title, - }; - } - case "plan": { - const entries = Array.isArray(update.entries) ? update.entries : []; - const first = entries.find((entry) => isRecord(entry)) as Record | undefined; - const content = asTrimmedString(first?.content); - if (!content) { - return { type: "status", text: "plan updated" }; - } - const status = asTrimmedString(first?.status); - return { - type: "status", - text: status ? `plan: [${status}] ${content}` : `plan: ${content}`, - }; - } - case "available_commands_update": { - const commands = Array.isArray(update.availableCommands) - ? update.availableCommands.length - : 0; - return { - type: "status", - text: `available commands updated (${commands})`, - }; - } - case "current_mode_update": { - const modeId = asTrimmedString(update.currentModeId); - return { - type: "status", - text: modeId ? `mode updated: ${modeId}` : "mode updated", - }; - } - case "config_option_update": { - const options = Array.isArray(update.configOptions) ? update.configOptions.length : 0; - return { - type: "status", - text: `config options updated (${options})`, - }; - } - case "session_info_update": { - const title = asTrimmedString(update.title); - return { - type: "status", - text: title ? `session info updated: ${title}` : "session info updated", + tag, + ...(toolCallId ? { toolCallId } : {}), + ...(status ? { status } : {}), + title, }; } case "usage_update": { - const used = - typeof update.used === "number" && Number.isFinite(update.used) ? update.used : null; - const size = - typeof update.size === "number" && Number.isFinite(update.size) ? update.size : null; - if (used == null || size == null) { - return { type: "status", text: "usage updated" }; + const used = asOptionalFiniteNumber(update.used); + const size = asOptionalFiniteNumber(update.size); + return { + type: "status", + text: used != null && size != null ? `usage updated: ${used}/${size}` : "usage updated", + tag, + ...(used != null ? { used } : {}), + ...(size != null ? { size } : {}), + }; + } + case "available_commands_update": + case "current_mode_update": + case "config_option_update": + case "session_info_update": + case "plan": { + const text = resolveStatusTextForTag({ + tag, + update, + }); + if (!text) { + return null; } return { type: "status", - text: `usage updated: ${used}/${size}`, + text, + tag, }; } default: diff --git a/src/acp/runtime/types.ts b/src/acp/runtime/types.ts index 4e479eb8c8c..ff4f39a70ee 100644 --- a/src/acp/runtime/types.ts +++ b/src/acp/runtime/types.ts @@ -2,6 +2,19 @@ export type AcpRuntimePromptMode = "prompt" | "steer"; export type AcpRuntimeSessionMode = "persistent" | "oneshot"; +export type AcpSessionUpdateTag = + | "agent_message_chunk" + | "agent_thought_chunk" + | "tool_call" + | "tool_call_update" + | "usage_update" + | "available_commands_update" + | "current_mode_update" + | "config_option_update" + | "session_info_update" + | "plan" + | (string & {}); + export type AcpRuntimeControl = "session/set_mode" | "session/set_config_option" | "session/status"; export type AcpRuntimeHandle = { @@ -67,14 +80,22 @@ export type AcpRuntimeEvent = type: "text_delta"; text: string; stream?: "output" | "thought"; + tag?: AcpSessionUpdateTag; } | { type: "status"; text: string; + tag?: AcpSessionUpdateTag; + used?: number; + size?: number; } | { type: "tool_call"; text: string; + tag?: AcpSessionUpdateTag; + toolCallId?: string; + status?: string; + title?: string; } | { type: "done"; diff --git a/src/auto-reply/reply/abort.test.ts b/src/auto-reply/reply/abort.test.ts index a76eb9b1b2d..9041380030d 100644 --- a/src/auto-reply/reply/abort.test.ts +++ b/src/auto-reply/reply/abort.test.ts @@ -43,6 +43,26 @@ vi.mock("../../agents/subagent-registry.js", () => ({ markSubagentRunTerminated: subagentRegistryMocks.markSubagentRunTerminated, })); +const acpManagerMocks = vi.hoisted(() => ({ + resolveSession: vi.fn< + () => + | { kind: "none" } + | { + kind: "ready"; + sessionKey: string; + meta: unknown; + } + >(() => ({ kind: "none" })), + cancelSession: vi.fn(async () => {}), +})); + +vi.mock("../../acp/control-plane/manager.js", () => ({ + getAcpSessionManager: () => ({ + resolveSession: acpManagerMocks.resolveSession, + cancelSession: acpManagerMocks.cancelSession, + }), +})); + describe("abort detection", () => { async function writeSessionStore( storePath: string, @@ -106,6 +126,8 @@ describe("abort detection", () => { afterEach(() => { resetAbortMemoryForTest(); + acpManagerMocks.resolveSession.mockReset().mockReturnValue({ kind: "none" }); + acpManagerMocks.cancelSession.mockReset().mockResolvedValue(undefined); }); it("triggerBodyNormalized extracts /stop from RawBody for abort detection", async () => { @@ -355,6 +377,85 @@ describe("abort detection", () => { expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${sessionKey}`); }); + it("plain-language stop on ACP-bound session triggers ACP cancel", async () => { + const sessionKey = "agent:codex:acp:test-1"; + const sessionId = "session-123"; + const { cfg } = await createAbortConfig({ + sessionIdsByKey: { [sessionKey]: sessionId }, + }); + acpManagerMocks.resolveSession.mockReturnValue({ + kind: "ready", + sessionKey, + meta: {} as never, + }); + + const result = await runStopCommand({ + cfg, + sessionKey, + from: "telegram:123", + to: "telegram:123", + targetSessionKey: sessionKey, + }); + + expect(result.handled).toBe(true); + expect(acpManagerMocks.cancelSession).toHaveBeenCalledWith({ + cfg, + sessionKey, + reason: "fast-abort", + }); + }); + + it("ACP cancel failures do not skip queue and lane cleanup", async () => { + const sessionKey = "agent:codex:acp:test-2"; + const sessionId = "session-456"; + const { root, cfg } = await createAbortConfig({ + sessionIdsByKey: { [sessionKey]: sessionId }, + }); + const followupRun: FollowupRun = { + prompt: "queued", + enqueuedAt: Date.now(), + run: { + agentId: "main", + agentDir: path.join(root, "agent"), + sessionId, + sessionKey, + messageProvider: "telegram", + agentAccountId: "acct", + sessionFile: path.join(root, "session.jsonl"), + workspaceDir: path.join(root, "workspace"), + config: cfg, + provider: "anthropic", + model: "claude-opus-4-5", + timeoutMs: 1000, + blockReplyBreak: "text_end", + }, + }; + enqueueFollowupRun( + sessionKey, + followupRun, + { mode: "collect", debounceMs: 0, cap: 20, dropPolicy: "summarize" }, + "none", + ); + acpManagerMocks.resolveSession.mockReturnValue({ + kind: "ready", + sessionKey, + meta: {} as never, + }); + acpManagerMocks.cancelSession.mockRejectedValueOnce(new Error("cancel failed")); + + const result = await runStopCommand({ + cfg, + sessionKey, + from: "telegram:123", + to: "telegram:123", + targetSessionKey: sessionKey, + }); + + expect(result.handled).toBe(true); + expect(getFollowupQueueDepth(sessionKey)).toBe(0); + expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${sessionKey}`); + }); + it("persists abort cutoff metadata on /stop when command and target session match", async () => { const sessionKey = "telegram:123"; const sessionId = "session-123"; diff --git a/src/auto-reply/reply/abort.ts b/src/auto-reply/reply/abort.ts index 0b318272d20..ba4d92b1dfa 100644 --- a/src/auto-reply/reply/abort.ts +++ b/src/auto-reply/reply/abort.ts @@ -1,3 +1,4 @@ +import { getAcpSessionManager } from "../../acp/control-plane/manager.js"; import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import { abortEmbeddedPiRun } from "../../agents/pi-embedded.js"; import { @@ -301,9 +302,28 @@ export async function tryFastAbortFromMessage(params: { const storePath = resolveStorePath(cfg.session?.store, { agentId }); const store = loadSessionStore(storePath); const { entry, key } = resolveSessionEntryForKey(store, targetKey); + const resolvedTargetKey = key ?? targetKey; + const acpManager = getAcpSessionManager(); + const acpResolution = acpManager.resolveSession({ + cfg, + sessionKey: resolvedTargetKey, + }); + if (acpResolution.kind !== "none") { + try { + await acpManager.cancelSession({ + cfg, + sessionKey: resolvedTargetKey, + reason: "fast-abort", + }); + } catch (error) { + logVerbose( + `abort: ACP cancel failed for ${resolvedTargetKey}: ${error instanceof Error ? error.message : String(error)}`, + ); + } + } const sessionId = entry?.sessionId; const aborted = sessionId ? abortEmbeddedPiRun(sessionId) : false; - const cleared = clearSessionQueues([key ?? targetKey, sessionId]); + const cleared = clearSessionQueues([resolvedTargetKey, sessionId]); if (cleared.followupCleared > 0 || cleared.laneCleared > 0) { logVerbose( `abort: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`, @@ -311,7 +331,7 @@ export async function tryFastAbortFromMessage(params: { } const abortCutoff = shouldPersistAbortCutoff({ commandSessionKey: ctx.SessionKey, - targetSessionKey: key ?? targetKey, + targetSessionKey: resolvedTargetKey, }) ? resolveAbortCutoffFromContext(ctx) : undefined; diff --git a/src/auto-reply/reply/acp-projector.test.ts b/src/auto-reply/reply/acp-projector.test.ts index 829ef7cc452..8a13d72ea4e 100644 --- a/src/auto-reply/reply/acp-projector.test.ts +++ b/src/auto-reply/reply/acp-projector.test.ts @@ -1,5 +1,6 @@ -import { describe, expect, it, vi } from "vitest"; +import { describe, expect, it } from "vitest"; import type { OpenClawConfig } from "../../config/config.js"; +import { prefixSystemMessage } from "../../infra/system-message.js"; import { createAcpReplyProjector } from "./acp-projector.js"; function createCfg(overrides?: Partial): OpenClawConfig { @@ -8,7 +9,7 @@ function createCfg(overrides?: Partial): OpenClawConfig { enabled: true, stream: { coalesceIdleMs: 0, - maxChunkChars: 50, + maxChunkChars: 64, }, }, ...overrides, @@ -29,71 +30,123 @@ describe("createAcpReplyProjector", () => { await projector.onEvent({ type: "text_delta", - text: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + text: "a".repeat(70), + tag: "agent_message_chunk", + }); + await projector.flush(true); + + expect(deliveries).toEqual([ + { kind: "block", text: "a".repeat(64) }, + { kind: "block", text: "a".repeat(6) }, + ]); + }); + + it("supports deliveryMode=final_only by buffering deltas until done", async () => { + const deliveries: Array<{ kind: string; text?: string }> = []; + const projector = createAcpReplyProjector({ + cfg: createCfg({ + acp: { + enabled: true, + stream: { + coalesceIdleMs: 0, + maxChunkChars: 512, + deliveryMode: "final_only", + }, + }, + }), + shouldSendToolSummaries: true, + deliver: async (kind, payload) => { + deliveries.push({ kind, text: payload.text }); + return true; + }, + }); + + await projector.onEvent({ + type: "text_delta", + text: "What", + tag: "agent_message_chunk", }); await projector.onEvent({ type: "text_delta", - text: "bbbbbbbbbb", + text: " now?", + tag: "agent_message_chunk", }); - await projector.flush(true); - - expect(deliveries).toEqual([ - { - kind: "block", - text: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", - }, - { kind: "block", text: "aabbbbbbbbbb" }, - ]); - }); - - it("buffers tiny token deltas and flushes once at turn end", async () => { - const deliveries: Array<{ kind: string; text?: string }> = []; - const projector = createAcpReplyProjector({ - cfg: createCfg({ - acp: { - enabled: true, - stream: { - coalesceIdleMs: 0, - maxChunkChars: 256, - }, - }, - }), - shouldSendToolSummaries: true, - provider: "discord", - deliver: async (kind, payload) => { - deliveries.push({ kind, text: payload.text }); - return true; - }, - }); - - await projector.onEvent({ type: "text_delta", text: "What" }); - await projector.onEvent({ type: "text_delta", text: " do" }); - await projector.onEvent({ type: "text_delta", text: " you want to work on?" }); - expect(deliveries).toEqual([]); - await projector.flush(true); - - expect(deliveries).toEqual([{ kind: "block", text: "What do you want to work on?" }]); + await projector.onEvent({ type: "done" }); + expect(deliveries).toEqual([{ kind: "block", text: "What now?" }]); }); - it("filters thought stream text and suppresses tool summaries when disabled", async () => { - const deliver = vi.fn(async () => true); - const projector = createAcpReplyProjector({ + it("suppresses usage_update by default and allows deduped usage when enabled", async () => { + const hidden: Array<{ kind: string; text?: string }> = []; + const hiddenProjector = createAcpReplyProjector({ cfg: createCfg(), - shouldSendToolSummaries: false, - deliver, + shouldSendToolSummaries: true, + deliver: async (kind, payload) => { + hidden.push({ kind, text: payload.text }); + return true; + }, + }); + await hiddenProjector.onEvent({ + type: "status", + text: "usage updated: 10/100", + tag: "usage_update", + used: 10, + size: 100, + }); + expect(hidden).toEqual([]); + + const shown: Array<{ kind: string; text?: string }> = []; + const shownProjector = createAcpReplyProjector({ + cfg: createCfg({ + acp: { + enabled: true, + stream: { + coalesceIdleMs: 0, + maxChunkChars: 64, + showUsage: true, + tagVisibility: { + usage_update: true, + }, + }, + }, + }), + shouldSendToolSummaries: true, + deliver: async (kind, payload) => { + shown.push({ kind, text: payload.text }); + return true; + }, }); - await projector.onEvent({ type: "text_delta", text: "internal", stream: "thought" }); - await projector.onEvent({ type: "status", text: "running tool" }); - await projector.onEvent({ type: "tool_call", text: "ls" }); - await projector.flush(true); + await shownProjector.onEvent({ + type: "status", + text: "usage updated: 10/100", + tag: "usage_update", + used: 10, + size: 100, + }); + await shownProjector.onEvent({ + type: "status", + text: "usage updated: 10/100", + tag: "usage_update", + used: 10, + size: 100, + }); + await shownProjector.onEvent({ + type: "status", + text: "usage updated: 11/100", + tag: "usage_update", + used: 11, + size: 100, + }); - expect(deliver).not.toHaveBeenCalled(); + expect(shown).toEqual([ + { kind: "tool", text: prefixSystemMessage("usage updated: 10/100") }, + { kind: "tool", text: prefixSystemMessage("usage updated: 11/100") }, + ]); }); - it("emits status and tool_call summaries when enabled", async () => { + it("dedupes repeated tool lifecycle updates in minimal mode", async () => { const deliveries: Array<{ kind: string; text?: string }> = []; const projector = createAcpReplyProjector({ cfg: createCfg(), @@ -104,16 +157,70 @@ describe("createAcpReplyProjector", () => { }, }); - await projector.onEvent({ type: "status", text: "planning" }); - await projector.onEvent({ type: "tool_call", text: "exec ls" }); + await projector.onEvent({ + type: "tool_call", + tag: "tool_call", + toolCallId: "call_1", + status: "in_progress", + title: "List files", + text: "List files (in_progress)", + }); + await projector.onEvent({ + type: "tool_call", + tag: "tool_call_update", + toolCallId: "call_1", + status: "in_progress", + title: "List files", + text: "List files (in_progress)", + }); + await projector.onEvent({ + type: "tool_call", + tag: "tool_call_update", + toolCallId: "call_1", + status: "completed", + title: "List files", + text: "List files (completed)", + }); + await projector.onEvent({ + type: "tool_call", + tag: "tool_call_update", + toolCallId: "call_1", + status: "completed", + title: "List files", + text: "List files (completed)", + }); - expect(deliveries).toEqual([ - { kind: "tool", text: "⚙️ planning" }, - { kind: "tool", text: "🧰 exec ls" }, - ]); + expect(deliveries.length).toBe(2); + expect(deliveries[0]?.kind).toBe("tool"); + expect(deliveries[0]?.text).toContain("Tool Call"); + expect(deliveries[1]?.kind).toBe("tool"); + expect(deliveries[1]?.text).toContain("Tool Call"); }); - it("flushes pending streamed text before tool/status updates", async () => { + it("renders fallback tool labels without leaking call ids as primary label", async () => { + const deliveries: Array<{ kind: string; text?: string }> = []; + const projector = createAcpReplyProjector({ + cfg: createCfg(), + shouldSendToolSummaries: true, + deliver: async (kind, payload) => { + deliveries.push({ kind, text: payload.text }); + return true; + }, + }); + + await projector.onEvent({ + type: "tool_call", + tag: "tool_call", + toolCallId: "call_ABC123", + status: "in_progress", + text: "call_ABC123 (in_progress)", + }); + + expect(deliveries[0]?.text).toContain("Tool Call"); + expect(deliveries[0]?.text).not.toContain("call_ABC123 ("); + }); + + it("respects metaMode=off and still streams assistant text", async () => { const deliveries: Array<{ kind: string; text?: string }> = []; const projector = createAcpReplyProjector({ cfg: createCfg({ @@ -122,24 +229,118 @@ describe("createAcpReplyProjector", () => { stream: { coalesceIdleMs: 0, maxChunkChars: 256, + metaMode: "off", }, }, }), shouldSendToolSummaries: true, - provider: "discord", deliver: async (kind, payload) => { deliveries.push({ kind, text: payload.text }); return true; }, }); - await projector.onEvent({ type: "text_delta", text: "Hello" }); - await projector.onEvent({ type: "text_delta", text: " world" }); - await projector.onEvent({ type: "status", text: "running tool" }); + await projector.onEvent({ + type: "status", + text: "available commands updated", + tag: "available_commands_update", + }); + await projector.onEvent({ + type: "tool_call", + text: "tool call", + tag: "tool_call", + toolCallId: "x", + status: "in_progress", + }); + await projector.onEvent({ + type: "text_delta", + text: "hello", + tag: "agent_message_chunk", + }); + await projector.flush(true); + + expect(deliveries).toEqual([{ kind: "block", text: "hello" }]); + }); + + it("truncates oversized turns once and emits one truncation notice", async () => { + const deliveries: Array<{ kind: string; text?: string }> = []; + const projector = createAcpReplyProjector({ + cfg: createCfg({ + acp: { + enabled: true, + stream: { + coalesceIdleMs: 0, + maxChunkChars: 256, + maxTurnChars: 5, + metaMode: "minimal", + }, + }, + }), + shouldSendToolSummaries: true, + deliver: async (kind, payload) => { + deliveries.push({ kind, text: payload.text }); + return true; + }, + }); + + await projector.onEvent({ + type: "text_delta", + text: "hello world", + tag: "agent_message_chunk", + }); + await projector.onEvent({ + type: "text_delta", + text: "ignored tail", + tag: "agent_message_chunk", + }); + await projector.flush(true); expect(deliveries).toEqual([ - { kind: "block", text: "Hello world" }, - { kind: "tool", text: "⚙️ running tool" }, + { kind: "block", text: "hello" }, + { kind: "tool", text: prefixSystemMessage("output truncated") }, ]); }); + + it("supports tagVisibility overrides for tool updates", async () => { + const deliveries: Array<{ kind: string; text?: string }> = []; + const projector = createAcpReplyProjector({ + cfg: createCfg({ + acp: { + enabled: true, + stream: { + coalesceIdleMs: 0, + maxChunkChars: 256, + tagVisibility: { + tool_call_update: false, + }, + }, + }, + }), + shouldSendToolSummaries: true, + deliver: async (kind, payload) => { + deliveries.push({ kind, text: payload.text }); + return true; + }, + }); + + await projector.onEvent({ + type: "tool_call", + tag: "tool_call", + toolCallId: "c1", + status: "in_progress", + title: "Run tests", + text: "Run tests (in_progress)", + }); + await projector.onEvent({ + type: "tool_call", + tag: "tool_call_update", + toolCallId: "c1", + status: "completed", + title: "Run tests", + text: "Run tests (completed)", + }); + + expect(deliveries.length).toBe(1); + expect(deliveries[0]?.text).toContain("Tool Call"); + }); }); diff --git a/src/auto-reply/reply/acp-projector.ts b/src/auto-reply/reply/acp-projector.ts index 8bbe643dc30..e1d5cbc9af0 100644 --- a/src/auto-reply/reply/acp-projector.ts +++ b/src/auto-reply/reply/acp-projector.ts @@ -1,6 +1,8 @@ -import type { AcpRuntimeEvent } from "../../acp/runtime/types.js"; +import type { AcpRuntimeEvent, AcpSessionUpdateTag } from "../../acp/runtime/types.js"; import { EmbeddedBlockChunker } from "../../agents/pi-embedded-block-chunker.js"; +import { formatToolSummary, resolveToolDisplay } from "../../agents/tool-display.js"; import type { OpenClawConfig } from "../../config/config.js"; +import { prefixSystemMessage } from "../../infra/system-message.js"; import type { ReplyPayload } from "../types.js"; import { createBlockReplyPipeline } from "./block-reply-pipeline.js"; import { resolveEffectiveBlockStreamingConfig } from "./block-streaming.js"; @@ -8,8 +10,57 @@ import type { ReplyDispatchKind } from "./reply-dispatcher.js"; const DEFAULT_ACP_STREAM_COALESCE_IDLE_MS = 350; const DEFAULT_ACP_STREAM_MAX_CHUNK_CHARS = 1800; +const DEFAULT_ACP_META_MODE = "minimal"; +const DEFAULT_ACP_SHOW_USAGE = false; +const DEFAULT_ACP_DELIVERY_MODE = "live"; +const DEFAULT_ACP_MAX_TURN_CHARS = 24_000; +const DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS = 320; +const DEFAULT_ACP_MAX_STATUS_CHARS = 320; +const DEFAULT_ACP_MAX_META_EVENTS_PER_TURN = 64; const ACP_BLOCK_REPLY_TIMEOUT_MS = 15_000; +const ACP_TAG_VISIBILITY_DEFAULTS: Record = { + agent_message_chunk: true, + tool_call: true, + tool_call_update: true, + usage_update: false, + available_commands_update: false, + current_mode_update: false, + config_option_update: false, + session_info_update: false, + plan: false, + agent_thought_chunk: false, +}; + +const TERMINAL_TOOL_STATUSES = new Set(["completed", "failed", "cancelled", "done", "error"]); + +export type AcpProjectedDeliveryMeta = { + tag?: AcpSessionUpdateTag; + toolCallId?: string; + toolStatus?: string; + allowEdit?: boolean; +}; + +type AcpDeliveryMode = "live" | "final_only"; +type AcpMetaMode = "off" | "minimal" | "verbose"; + +type AcpProjectionSettings = { + deliveryMode: AcpDeliveryMode; + metaMode: AcpMetaMode; + showUsage: boolean; + maxTurnChars: number; + maxToolSummaryChars: number; + maxStatusChars: number; + maxMetaEventsPerTurn: number; + tagVisibility: Partial>; +}; + +type ToolLifecycleState = { + started: boolean; + terminal: boolean; + lastRenderedHash?: string; +}; + function clampPositiveInteger( value: unknown, fallback: number, @@ -28,6 +79,21 @@ function clampPositiveInteger( return rounded; } +function clampBoolean(value: unknown, fallback: boolean): boolean { + return typeof value === "boolean" ? value : fallback; +} + +function resolveAcpDeliveryMode(value: unknown): AcpDeliveryMode { + return value === "final_only" ? "final_only" : DEFAULT_ACP_DELIVERY_MODE; +} + +function resolveAcpMetaMode(value: unknown): AcpMetaMode { + if (value === "off" || value === "minimal" || value === "verbose") { + return value; + } + return DEFAULT_ACP_META_MODE; +} + function resolveAcpStreamCoalesceIdleMs(cfg: OpenClawConfig): number { return clampPositiveInteger( cfg.acp?.stream?.coalesceIdleMs, @@ -46,6 +112,40 @@ function resolveAcpStreamMaxChunkChars(cfg: OpenClawConfig): number { }); } +function resolveAcpProjectionSettings(cfg: OpenClawConfig): AcpProjectionSettings { + const stream = cfg.acp?.stream; + return { + deliveryMode: resolveAcpDeliveryMode(stream?.deliveryMode), + metaMode: resolveAcpMetaMode(stream?.metaMode), + showUsage: clampBoolean(stream?.showUsage, DEFAULT_ACP_SHOW_USAGE), + maxTurnChars: clampPositiveInteger(stream?.maxTurnChars, DEFAULT_ACP_MAX_TURN_CHARS, { + min: 1, + max: 500_000, + }), + maxToolSummaryChars: clampPositiveInteger( + stream?.maxToolSummaryChars, + DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS, + { + min: 64, + max: 8_000, + }, + ), + maxStatusChars: clampPositiveInteger(stream?.maxStatusChars, DEFAULT_ACP_MAX_STATUS_CHARS, { + min: 64, + max: 8_000, + }), + maxMetaEventsPerTurn: clampPositiveInteger( + stream?.maxMetaEventsPerTurn, + DEFAULT_ACP_MAX_META_EVENTS_PER_TURN, + { + min: 1, + max: 2_000, + }, + ), + tagVisibility: stream?.tagVisibility ?? {}, + }; +} + function resolveAcpStreamingConfig(params: { cfg: OpenClawConfig; provider?: string; @@ -60,6 +160,66 @@ function resolveAcpStreamingConfig(params: { }); } +function truncateText(input: string, maxChars: number): string { + if (input.length <= maxChars) { + return input; + } + if (maxChars <= 1) { + return input.slice(0, maxChars); + } + return `${input.slice(0, maxChars - 1)}…`; +} + +function hashText(text: string): string { + return text.trim(); +} + +function normalizeToolStatus(status: string | undefined): string | undefined { + if (!status) { + return undefined; + } + const normalized = status.trim().toLowerCase(); + return normalized || undefined; +} + +function isTagVisible( + settings: AcpProjectionSettings, + tag: AcpSessionUpdateTag | undefined, +): boolean { + if (!tag) { + return true; + } + const override = settings.tagVisibility[tag]; + if (typeof override === "boolean") { + return override; + } + if (Object.prototype.hasOwnProperty.call(ACP_TAG_VISIBILITY_DEFAULTS, tag)) { + return ACP_TAG_VISIBILITY_DEFAULTS[tag]; + } + return true; +} + +function renderToolSummaryText(event: Extract): string { + const detailParts: string[] = []; + const title = event.title?.trim(); + if (title) { + detailParts.push(title); + } + const status = event.status?.trim(); + if (status) { + detailParts.push(`status=${status}`); + } + const fallback = event.text?.trim(); + if (detailParts.length === 0 && fallback) { + detailParts.push(fallback); + } + const display = resolveToolDisplay({ + name: "tool_call", + meta: detailParts.join(" · ") || "tool call", + }); + return formatToolSummary(display); +} + export type AcpReplyProjector = { onEvent: (event: AcpRuntimeEvent) => Promise; flush: (force?: boolean) => Promise; @@ -68,10 +228,15 @@ export type AcpReplyProjector = { export function createAcpReplyProjector(params: { cfg: OpenClawConfig; shouldSendToolSummaries: boolean; - deliver: (kind: ReplyDispatchKind, payload: ReplyPayload) => Promise; + deliver: ( + kind: ReplyDispatchKind, + payload: ReplyPayload, + meta?: AcpProjectedDeliveryMeta, + ) => Promise; provider?: string; accountId?: string; }): AcpReplyProjector { + const settings = resolveAcpProjectionSettings(params.cfg); const streaming = resolveAcpStreamingConfig({ cfg: params.cfg, provider: params.provider, @@ -86,7 +251,28 @@ export function createAcpReplyProjector(params: { }); const chunker = new EmbeddedBlockChunker(streaming.chunking); + let emittedTurnChars = 0; + let emittedMetaEvents = 0; + let truncationNoticeEmitted = false; + let lastStatusHash: string | undefined; + let lastToolHash: string | undefined; + let lastUsageTuple: string | undefined; + const toolLifecycleById = new Map(); + + const resetTurnState = () => { + emittedTurnChars = 0; + emittedMetaEvents = 0; + truncationNoticeEmitted = false; + lastStatusHash = undefined; + lastToolHash = undefined; + lastUsageTuple = undefined; + toolLifecycleById.clear(); + }; + const drainChunker = (force: boolean) => { + if (settings.deliveryMode === "final_only" && !force) { + return; + } chunker.drain({ force, emit: (chunk) => { @@ -100,13 +286,132 @@ export function createAcpReplyProjector(params: { await blockReplyPipeline.flush({ force }); }; - const emitToolSummary = async (prefix: string, text: string): Promise => { - if (!params.shouldSendToolSummaries || !text) { + const consumeMetaQuota = (force: boolean): boolean => { + if (force) { + return true; + } + if (emittedMetaEvents >= settings.maxMetaEventsPerTurn) { + return false; + } + emittedMetaEvents += 1; + return true; + }; + + const emitSystemStatus = async ( + text: string, + meta?: AcpProjectedDeliveryMeta, + opts?: { force?: boolean; dedupe?: boolean }, + ) => { + if (!params.shouldSendToolSummaries) { return; } - // Keep tool summaries ordered after any pending streamed text. - await flush(true); - await params.deliver("tool", { text: `${prefix} ${text}` }); + if (settings.metaMode === "off" && opts?.force !== true) { + return; + } + const bounded = truncateText(text.trim(), settings.maxStatusChars); + if (!bounded) { + return; + } + const formatted = prefixSystemMessage(bounded); + const hash = hashText(formatted); + const shouldDedupe = opts?.dedupe !== false; + if (shouldDedupe && lastStatusHash === hash) { + return; + } + if (!consumeMetaQuota(opts?.force === true)) { + return; + } + if (settings.deliveryMode === "live") { + await flush(true); + } + await params.deliver("tool", { text: formatted }, meta); + lastStatusHash = hash; + }; + + const emitToolSummary = async ( + event: Extract, + opts?: { force?: boolean }, + ) => { + if (!params.shouldSendToolSummaries || settings.metaMode === "off") { + return; + } + if (!isTagVisible(settings, event.tag)) { + return; + } + + const toolSummary = truncateText(renderToolSummaryText(event), settings.maxToolSummaryChars); + const hash = hashText(toolSummary); + const toolCallId = event.toolCallId?.trim() || undefined; + const status = normalizeToolStatus(event.status); + const isTerminal = status ? TERMINAL_TOOL_STATUSES.has(status) : false; + const isStart = status === "in_progress" || event.tag === "tool_call"; + + if (settings.metaMode === "verbose") { + if (lastToolHash === hash) { + return; + } + } else if (settings.metaMode === "minimal") { + if (toolCallId) { + const state = toolLifecycleById.get(toolCallId) ?? { + started: false, + terminal: false, + }; + if (isTerminal && state.terminal) { + return; + } + if (isStart && state.started) { + return; + } + if (state.lastRenderedHash === hash) { + return; + } + if (isStart) { + state.started = true; + } + if (isTerminal) { + state.terminal = true; + } + state.lastRenderedHash = hash; + toolLifecycleById.set(toolCallId, state); + } else if (lastToolHash === hash) { + return; + } + } + + if (!consumeMetaQuota(opts?.force === true)) { + return; + } + if (settings.deliveryMode === "live") { + await flush(true); + } + await params.deliver( + "tool", + { text: toolSummary }, + { + ...(event.tag ? { tag: event.tag } : {}), + ...(toolCallId ? { toolCallId } : {}), + ...(status ? { toolStatus: status } : {}), + allowEdit: Boolean(toolCallId && event.tag === "tool_call_update"), + }, + ); + lastToolHash = hash; + }; + + const emitTruncationNotice = async () => { + if (truncationNoticeEmitted) { + return; + } + truncationNoticeEmitted = true; + await emitSystemStatus( + "output truncated", + { + tag: "session_info_update", + }, + { + force: true, + dedupe: false, + }, + ); }; const onEvent = async (event: AcpRuntimeEvent): Promise => { @@ -114,22 +419,61 @@ export function createAcpReplyProjector(params: { if (event.stream && event.stream !== "output") { return; } - if (event.text) { - chunker.append(event.text); + if (!isTagVisible(settings, event.tag)) { + return; + } + const text = event.text; + if (!text) { + return; + } + if (emittedTurnChars >= settings.maxTurnChars) { + await emitTruncationNotice(); + return; + } + const remaining = settings.maxTurnChars - emittedTurnChars; + const accepted = remaining < text.length ? text.slice(0, remaining) : text; + if (accepted.length > 0) { + chunker.append(accepted); + emittedTurnChars += accepted.length; drainChunker(false); } + if (accepted.length < text.length) { + await emitTruncationNotice(); + } return; } + if (event.type === "status") { - await emitToolSummary("⚙️", event.text); + if (!isTagVisible(settings, event.tag)) { + return; + } + if (event.tag === "usage_update") { + if (!settings.showUsage) { + return; + } + const usageTuple = + typeof event.used === "number" && typeof event.size === "number" + ? `${event.used}/${event.size}` + : hashText(event.text); + if (usageTuple === lastUsageTuple) { + return; + } + lastUsageTuple = usageTuple; + } + await emitSystemStatus(event.text, event.tag ? { tag: event.tag } : undefined, { + dedupe: true, + }); return; } + if (event.type === "tool_call") { - await emitToolSummary("🧰", event.text); + await emitToolSummary(event); return; } + if (event.type === "done" || event.type === "error") { await flush(true); + resetTurnState(); } }; diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index a5e6f25287f..573504f69c4 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -11,6 +11,7 @@ import { readAcpSessionEntry } from "../../acp/runtime/session-meta.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { TtsAutoMode } from "../../config/types.tts.js"; import { logVerbose } from "../../globals.js"; +import { runMessageAction } from "../../infra/outbound/message-action-runner.js"; import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js"; import { generateSecureUuid } from "../../infra/secure-random.js"; import { prefixSystemMessage } from "../../infra/system-message.js"; @@ -157,6 +158,7 @@ export async function tryDispatchAcpReply(params: { originatingTo?: string; shouldSendToolSummaries: boolean; bypassForCommand: boolean; + onReplyStart?: () => Promise | void; recordProcessed: DispatchProcessedRecorder; markIdle: (reason: string) => void; }): Promise { @@ -182,9 +184,69 @@ export async function tryDispatchAcpReply(params: { let queuedFinal = false; let acpAccumulatedBlockText = ""; let acpBlockCount = 0; + let startedReplyLifecycle = false; + const toolUpdateMessageById = new Map< + string, + { + channel: string; + accountId?: string; + to: string; + threadId?: string | number; + messageId: string; + } + >(); + + const ensureReplyLifecycleStarted = async () => { + if (startedReplyLifecycle) { + return; + } + startedReplyLifecycle = true; + await params.onReplyStart?.(); + }; + + const tryEditToolUpdate = async (payload: ReplyPayload, toolCallId: string): Promise => { + if (!params.shouldRouteToOriginating || !params.originatingChannel || !params.originatingTo) { + return false; + } + const handle = toolUpdateMessageById.get(toolCallId); + if (!handle?.messageId) { + return false; + } + const message = payload.text?.trim(); + if (!message) { + return false; + } + try { + await runMessageAction({ + cfg: params.cfg, + action: "edit", + params: { + channel: handle.channel, + accountId: handle.accountId, + to: handle.to, + threadId: handle.threadId, + messageId: handle.messageId, + message, + }, + sessionKey: params.ctx.SessionKey, + }); + routedCounts.tool += 1; + return true; + } catch (error) { + logVerbose( + `dispatch-acp: tool message edit failed for ${toolCallId}: ${error instanceof Error ? error.message : String(error)}`, + ); + return false; + } + }; + const deliverAcpPayload = async ( kind: ReplyDispatchKind, payload: ReplyPayload, + meta?: { + toolCallId?: string; + allowEdit?: boolean; + }, ): Promise => { if (kind === "block" && payload.text?.trim()) { if (acpAccumulatedBlockText.length > 0) { @@ -193,6 +255,9 @@ export async function tryDispatchAcpReply(params: { acpAccumulatedBlockText += payload.text; acpBlockCount += 1; } + if ((payload.text?.trim() ?? "").length > 0 || payload.mediaUrl || payload.mediaUrls?.length) { + await ensureReplyLifecycleStarted(); + } const ttsPayload = await maybeApplyTtsToPayload({ payload, @@ -204,6 +269,13 @@ export async function tryDispatchAcpReply(params: { }); if (params.shouldRouteToOriginating && params.originatingChannel && params.originatingTo) { + const toolCallId = meta?.toolCallId?.trim(); + if (kind === "tool" && meta?.allowEdit === true && toolCallId) { + const edited = await tryEditToolUpdate(ttsPayload, toolCallId); + if (edited) { + return true; + } + } const result = await routeReply({ payload: ttsPayload, channel: params.originatingChannel, @@ -219,6 +291,15 @@ export async function tryDispatchAcpReply(params: { ); return false; } + if (kind === "tool" && meta?.toolCallId && result.messageId) { + toolUpdateMessageById.set(meta.toolCallId, { + channel: params.originatingChannel, + accountId: params.ctx.AccountId, + to: params.originatingTo, + ...(params.ctx.MessageThreadId != null ? { threadId: params.ctx.MessageThreadId } : {}), + messageId: result.messageId, + }); + } routedCounts[kind] += 1; return true; } diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index ff42ff2d81b..0cfcdf03ce0 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -371,6 +371,7 @@ export async function dispatchReplyFromConfig(params: { originatingTo, shouldSendToolSummaries, bypassForCommand: bypassAcpForCommand, + onReplyStart: params.replyOptions?.onReplyStart, recordProcessed, markIdle, }); diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index fbcb86286b8..eb6b61161a6 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -166,11 +166,27 @@ export const FIELD_HELP: Record = { "Allowlist of ACP target agent ids permitted for ACP runtime sessions. Empty means no additional allowlist restriction.", "acp.maxConcurrentSessions": "Maximum concurrently active ACP sessions across this gateway process.", - "acp.stream": "ACP streaming projection controls for chunk sizing and coalescer flush timing.", + "acp.stream": + "ACP streaming projection controls for chunk sizing, metadata visibility, and deduped delivery behavior.", "acp.stream.coalesceIdleMs": "Coalescer idle flush window in milliseconds for ACP streamed text before block replies are emitted.", "acp.stream.maxChunkChars": "Maximum chunk size for ACP streamed block projection before splitting into multiple block replies.", + "acp.stream.metaMode": + "ACP metadata projection mode: off suppresses status/tool lines, minimal dedupes aggressively, verbose streams non-identical updates.", + "acp.stream.showUsage": + "When true, usage_update events are projected as system lines only when usage values change.", + "acp.stream.deliveryMode": + "ACP delivery style: live streams block chunks incrementally, final_only buffers text deltas until terminal turn events.", + "acp.stream.maxTurnChars": + "Maximum assistant text characters projected per ACP turn before truncation notice is emitted.", + "acp.stream.maxToolSummaryChars": + "Maximum characters for projected ACP tool lifecycle/progress summary lines.", + "acp.stream.maxStatusChars": "Maximum characters for projected ACP status/meta lines.", + "acp.stream.maxMetaEventsPerTurn": + "Maximum ACP meta events projected per turn (text deltas continue unaffected).", + "acp.stream.tagVisibility": + "Per-sessionUpdate visibility overrides for ACP projection (for example usage_update, available_commands_update).", "acp.runtime.ttlMinutes": "Idle runtime TTL in minutes for ACP session workers before eligible cleanup.", "acp.runtime.installCommand": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index 82ed7d05290..bb29fe76a1d 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -369,6 +369,14 @@ export const FIELD_LABELS: Record = { "acp.stream": "ACP Stream", "acp.stream.coalesceIdleMs": "ACP Stream Coalesce Idle (ms)", "acp.stream.maxChunkChars": "ACP Stream Max Chunk Chars", + "acp.stream.metaMode": "ACP Stream Meta Mode", + "acp.stream.showUsage": "ACP Stream Show Usage", + "acp.stream.deliveryMode": "ACP Stream Delivery Mode", + "acp.stream.maxTurnChars": "ACP Stream Max Turn Chars", + "acp.stream.maxToolSummaryChars": "ACP Stream Max Tool Summary Chars", + "acp.stream.maxStatusChars": "ACP Stream Max Status Chars", + "acp.stream.maxMetaEventsPerTurn": "ACP Stream Max Meta Events Per Turn", + "acp.stream.tagVisibility": "ACP Stream Tag Visibility", "acp.runtime.ttlMinutes": "ACP Runtime TTL (minutes)", "acp.runtime.installCommand": "ACP Runtime Install Command", models: "Models", diff --git a/src/config/types.acp.ts b/src/config/types.acp.ts index f69971ced93..f50cfaf71f9 100644 --- a/src/config/types.acp.ts +++ b/src/config/types.acp.ts @@ -1,3 +1,5 @@ +import type { AcpSessionUpdateTag } from "../acp/runtime/types.js"; + export type AcpDispatchConfig = { /** Master switch for ACP turn dispatch in the reply pipeline. */ enabled?: boolean; @@ -8,6 +10,25 @@ export type AcpStreamConfig = { coalesceIdleMs?: number; /** Maximum text size per streamed chunk. */ maxChunkChars?: number; + /** Controls how ACP meta/system updates are projected to channels. */ + metaMode?: "off" | "minimal" | "verbose"; + /** Toggles usage_update projection in channel-facing output. */ + showUsage?: boolean; + /** Live streams chunks or waits for terminal event before delivery. */ + deliveryMode?: "live" | "final_only"; + /** Maximum assistant text characters forwarded per turn. */ + maxTurnChars?: number; + /** Maximum visible characters for tool summary/meta lines. */ + maxToolSummaryChars?: number; + /** Maximum visible characters for status lines. */ + maxStatusChars?: number; + /** Maximum number of meta events projected per turn. */ + maxMetaEventsPerTurn?: number; + /** + * Per-sessionUpdate visibility overrides. + * Keys not listed here fall back to OpenClaw defaults. + */ + tagVisibility?: Partial>; }; export type AcpRuntimeConfig = { diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index c4e3adfc36f..9681b3df23d 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -339,6 +339,16 @@ export const OpenClawSchema = z .object({ coalesceIdleMs: z.number().int().nonnegative().optional(), maxChunkChars: z.number().int().positive().optional(), + metaMode: z + .union([z.literal("off"), z.literal("minimal"), z.literal("verbose")]) + .optional(), + showUsage: z.boolean().optional(), + deliveryMode: z.union([z.literal("live"), z.literal("final_only")]).optional(), + maxTurnChars: z.number().int().positive().optional(), + maxToolSummaryChars: z.number().int().positive().optional(), + maxStatusChars: z.number().int().positive().optional(), + maxMetaEventsPerTurn: z.number().int().positive().optional(), + tagVisibility: z.record(z.string(), z.boolean()).optional(), }) .strict() .optional(), diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index 6a0829c0b9f..b18eae6c25c 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -80,6 +80,7 @@ export type { AcpRuntimeEvent, AcpRuntimeHandle, AcpRuntimePromptMode, + AcpSessionUpdateTag, AcpRuntimeSessionMode, AcpRuntimeStatus, AcpRuntimeTurnInput,