ACP: carry dedupe/projector updates onto configurable acpx branch

This commit is contained in:
Onur
2026-03-01 09:19:11 +01:00
committed by Onur Solmaz
parent f88bc09f85
commit 2466a9bb13
14 changed files with 1076 additions and 171 deletions

View File

@@ -44,6 +44,7 @@ describe("PromptStreamProjector", () => {
type: "text_delta",
text: "hello world",
stream: "output",
tag: "agent_message_chunk",
});
});
@@ -71,6 +72,7 @@ describe("PromptStreamProjector", () => {
type: "text_delta",
text: " indented",
stream: "output",
tag: "agent_message_chunk",
});
});
@@ -98,10 +100,11 @@ describe("PromptStreamProjector", () => {
type: "text_delta",
text: "thinking",
stream: "thought",
tag: "agent_thought_chunk",
});
});
it("maps tool call updates to tool_call events", () => {
it("maps tool call updates with metadata and stable fallback title", () => {
const projector = new PromptStreamProjector();
beginPrompt(projector);
const event = projector.ingestLine(
@@ -111,9 +114,8 @@ describe("PromptStreamProjector", () => {
params: {
sessionId: "session-1",
update: {
sessionUpdate: "tool_call",
toolCallId: "call-1",
title: "exec",
sessionUpdate: "tool_call_update",
toolCallId: "call_ABC123",
status: "in_progress",
},
},
@@ -122,7 +124,38 @@ describe("PromptStreamProjector", () => {
expect(event).toEqual({
type: "tool_call",
text: "exec (in_progress)",
text: "tool call (in_progress)",
tag: "tool_call_update",
toolCallId: "call_ABC123",
status: "in_progress",
title: "tool call",
});
});
it("maps usage updates with numeric metadata", () => {
const projector = new PromptStreamProjector();
beginPrompt(projector);
const event = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
method: "session/update",
params: {
sessionId: "session-1",
update: {
sessionUpdate: "usage_update",
used: 12,
size: 500,
},
},
}),
);
expect(event).toEqual({
type: "status",
text: "usage updated: 12/500",
tag: "usage_update",
used: 12,
size: 500,
});
});
@@ -167,6 +200,7 @@ describe("PromptStreamProjector", () => {
type: "text_delta",
text: "new turn",
stream: "output",
tag: "agent_message_chunk",
});
});

View File

@@ -1,4 +1,4 @@
import type { AcpRuntimeEvent } from "openclaw/plugin-sdk";
import type { AcpRuntimeEvent, AcpSessionUpdateTag } from "openclaw/plugin-sdk";
import { isAcpJsonRpcMessage, normalizeJsonRpcId } from "./jsonrpc.js";
import {
asOptionalString,
@@ -27,6 +27,10 @@ export function parseJsonLines(value: string): AcpxJsonObject[] {
return events;
}
function asOptionalFiniteNumber(value: unknown): number | undefined {
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
function parsePromptStopReason(message: Record<string, unknown>): string | undefined {
if (!Object.hasOwn(message, "result")) {
return undefined;
@@ -39,6 +43,88 @@ function parsePromptStopReason(message: Record<string, unknown>): string | undef
return stopReason && stopReason.trim().length > 0 ? stopReason : undefined;
}
function resolveTextChunk(params: {
update: Record<string, unknown>;
stream: "output" | "thought";
tag: AcpSessionUpdateTag;
}): AcpRuntimeEvent | null {
const contentRaw = params.update.content;
if (isRecord(contentRaw)) {
const contentType = asTrimmedString(contentRaw.type);
if (contentType && contentType !== "text") {
return null;
}
const text = asString(contentRaw.text);
if (text && text.length > 0) {
return {
type: "text_delta",
text,
stream: params.stream,
tag: params.tag,
};
}
}
const text = asString(params.update.text);
if (!text || text.length === 0) {
return null;
}
return {
type: "text_delta",
text,
stream: params.stream,
tag: params.tag,
};
}
function resolveStatusTextForTag(params: {
tag: AcpSessionUpdateTag;
update: Record<string, unknown>;
}): string | null {
const { tag, update } = params;
if (tag === "available_commands_update") {
const commands = Array.isArray(update.availableCommands) ? update.availableCommands : [];
return commands.length > 0
? `available commands updated (${commands.length})`
: "available commands updated";
}
if (tag === "current_mode_update") {
const mode =
asTrimmedString(update.currentModeId) ||
asTrimmedString(update.modeId) ||
asTrimmedString(update.mode);
return mode ? `mode updated: ${mode}` : "mode updated";
}
if (tag === "config_option_update") {
const id = asTrimmedString(update.id) || asTrimmedString(update.configOptionId);
const value =
asTrimmedString(update.currentValue) ||
asTrimmedString(update.value) ||
asTrimmedString(update.optionValue);
if (id && value) {
return `config updated: ${id}=${value}`;
}
if (id) {
return `config updated: ${id}`;
}
return "config updated";
}
if (tag === "session_info_update") {
return asTrimmedString(update.summary) || asTrimmedString(update.message) || "session updated";
}
if (tag === "plan") {
const entries = Array.isArray(update.entries) ? update.entries : [];
const first = entries.find((entry) => isRecord(entry)) as Record<string, unknown> | undefined;
const content = asTrimmedString(first?.content);
if (!content) {
return "plan updated";
}
const status = asTrimmedString(first?.status);
return status ? `plan: [${status}] ${content}` : `plan: ${content}`;
}
return null;
}
function parseSessionUpdateEvent(message: Record<string, unknown>): AcpRuntimeEvent | null {
if (asTrimmedString(message.method) !== "session/update") {
return null;
@@ -52,105 +138,65 @@ function parseSessionUpdateEvent(message: Record<string, unknown>): AcpRuntimeEv
return null;
}
const sessionUpdate = asTrimmedString(update.sessionUpdate);
switch (sessionUpdate) {
case "agent_message_chunk": {
const content = isRecord(update.content) ? update.content : null;
if (!content || asTrimmedString(content.type) !== "text") {
return null;
}
const text = asString(content.text);
if (!text) {
return null;
}
return {
type: "text_delta",
text,
const tag = asOptionalString(update.sessionUpdate) as AcpSessionUpdateTag | undefined;
if (!tag) {
return null;
}
switch (tag) {
case "agent_message_chunk":
return resolveTextChunk({
update,
stream: "output",
};
}
case "agent_thought_chunk": {
const content = isRecord(update.content) ? update.content : null;
if (!content || asTrimmedString(content.type) !== "text") {
return null;
}
const text = asString(content.text);
if (!text) {
return null;
}
return {
type: "text_delta",
text,
tag,
});
case "agent_thought_chunk":
return resolveTextChunk({
update,
stream: "thought",
};
}
tag,
});
case "tool_call":
case "tool_call_update": {
const title =
asTrimmedString(update.title) ||
asTrimmedString(update.toolCallId) ||
asTrimmedString(update.kind) ||
"tool";
const title = asTrimmedString(update.title) || "tool call";
const status = asTrimmedString(update.status);
const toolCallId = asOptionalString(update.toolCallId);
return {
type: "tool_call",
text: status ? `${title} (${status})` : title,
};
}
case "plan": {
const entries = Array.isArray(update.entries) ? update.entries : [];
const first = entries.find((entry) => isRecord(entry)) as Record<string, unknown> | undefined;
const content = asTrimmedString(first?.content);
if (!content) {
return { type: "status", text: "plan updated" };
}
const status = asTrimmedString(first?.status);
return {
type: "status",
text: status ? `plan: [${status}] ${content}` : `plan: ${content}`,
};
}
case "available_commands_update": {
const commands = Array.isArray(update.availableCommands)
? update.availableCommands.length
: 0;
return {
type: "status",
text: `available commands updated (${commands})`,
};
}
case "current_mode_update": {
const modeId = asTrimmedString(update.currentModeId);
return {
type: "status",
text: modeId ? `mode updated: ${modeId}` : "mode updated",
};
}
case "config_option_update": {
const options = Array.isArray(update.configOptions) ? update.configOptions.length : 0;
return {
type: "status",
text: `config options updated (${options})`,
};
}
case "session_info_update": {
const title = asTrimmedString(update.title);
return {
type: "status",
text: title ? `session info updated: ${title}` : "session info updated",
tag,
...(toolCallId ? { toolCallId } : {}),
...(status ? { status } : {}),
title,
};
}
case "usage_update": {
const used =
typeof update.used === "number" && Number.isFinite(update.used) ? update.used : null;
const size =
typeof update.size === "number" && Number.isFinite(update.size) ? update.size : null;
if (used == null || size == null) {
return { type: "status", text: "usage updated" };
const used = asOptionalFiniteNumber(update.used);
const size = asOptionalFiniteNumber(update.size);
return {
type: "status",
text: used != null && size != null ? `usage updated: ${used}/${size}` : "usage updated",
tag,
...(used != null ? { used } : {}),
...(size != null ? { size } : {}),
};
}
case "available_commands_update":
case "current_mode_update":
case "config_option_update":
case "session_info_update":
case "plan": {
const text = resolveStatusTextForTag({
tag,
update,
});
if (!text) {
return null;
}
return {
type: "status",
text: `usage updated: ${used}/${size}`,
text,
tag,
};
}
default:

View File

@@ -2,6 +2,19 @@ export type AcpRuntimePromptMode = "prompt" | "steer";
export type AcpRuntimeSessionMode = "persistent" | "oneshot";
export type AcpSessionUpdateTag =
| "agent_message_chunk"
| "agent_thought_chunk"
| "tool_call"
| "tool_call_update"
| "usage_update"
| "available_commands_update"
| "current_mode_update"
| "config_option_update"
| "session_info_update"
| "plan"
| (string & {});
export type AcpRuntimeControl = "session/set_mode" | "session/set_config_option" | "session/status";
export type AcpRuntimeHandle = {
@@ -67,14 +80,22 @@ export type AcpRuntimeEvent =
type: "text_delta";
text: string;
stream?: "output" | "thought";
tag?: AcpSessionUpdateTag;
}
| {
type: "status";
text: string;
tag?: AcpSessionUpdateTag;
used?: number;
size?: number;
}
| {
type: "tool_call";
text: string;
tag?: AcpSessionUpdateTag;
toolCallId?: string;
status?: string;
title?: string;
}
| {
type: "done";

View File

@@ -43,6 +43,26 @@ vi.mock("../../agents/subagent-registry.js", () => ({
markSubagentRunTerminated: subagentRegistryMocks.markSubagentRunTerminated,
}));
const acpManagerMocks = vi.hoisted(() => ({
resolveSession: vi.fn<
() =>
| { kind: "none" }
| {
kind: "ready";
sessionKey: string;
meta: unknown;
}
>(() => ({ kind: "none" })),
cancelSession: vi.fn(async () => {}),
}));
vi.mock("../../acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => ({
resolveSession: acpManagerMocks.resolveSession,
cancelSession: acpManagerMocks.cancelSession,
}),
}));
describe("abort detection", () => {
async function writeSessionStore(
storePath: string,
@@ -106,6 +126,8 @@ describe("abort detection", () => {
afterEach(() => {
resetAbortMemoryForTest();
acpManagerMocks.resolveSession.mockReset().mockReturnValue({ kind: "none" });
acpManagerMocks.cancelSession.mockReset().mockResolvedValue(undefined);
});
it("triggerBodyNormalized extracts /stop from RawBody for abort detection", async () => {
@@ -355,6 +377,85 @@ describe("abort detection", () => {
expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${sessionKey}`);
});
it("plain-language stop on ACP-bound session triggers ACP cancel", async () => {
const sessionKey = "agent:codex:acp:test-1";
const sessionId = "session-123";
const { cfg } = await createAbortConfig({
sessionIdsByKey: { [sessionKey]: sessionId },
});
acpManagerMocks.resolveSession.mockReturnValue({
kind: "ready",
sessionKey,
meta: {} as never,
});
const result = await runStopCommand({
cfg,
sessionKey,
from: "telegram:123",
to: "telegram:123",
targetSessionKey: sessionKey,
});
expect(result.handled).toBe(true);
expect(acpManagerMocks.cancelSession).toHaveBeenCalledWith({
cfg,
sessionKey,
reason: "fast-abort",
});
});
it("ACP cancel failures do not skip queue and lane cleanup", async () => {
const sessionKey = "agent:codex:acp:test-2";
const sessionId = "session-456";
const { root, cfg } = await createAbortConfig({
sessionIdsByKey: { [sessionKey]: sessionId },
});
const followupRun: FollowupRun = {
prompt: "queued",
enqueuedAt: Date.now(),
run: {
agentId: "main",
agentDir: path.join(root, "agent"),
sessionId,
sessionKey,
messageProvider: "telegram",
agentAccountId: "acct",
sessionFile: path.join(root, "session.jsonl"),
workspaceDir: path.join(root, "workspace"),
config: cfg,
provider: "anthropic",
model: "claude-opus-4-5",
timeoutMs: 1000,
blockReplyBreak: "text_end",
},
};
enqueueFollowupRun(
sessionKey,
followupRun,
{ mode: "collect", debounceMs: 0, cap: 20, dropPolicy: "summarize" },
"none",
);
acpManagerMocks.resolveSession.mockReturnValue({
kind: "ready",
sessionKey,
meta: {} as never,
});
acpManagerMocks.cancelSession.mockRejectedValueOnce(new Error("cancel failed"));
const result = await runStopCommand({
cfg,
sessionKey,
from: "telegram:123",
to: "telegram:123",
targetSessionKey: sessionKey,
});
expect(result.handled).toBe(true);
expect(getFollowupQueueDepth(sessionKey)).toBe(0);
expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${sessionKey}`);
});
it("persists abort cutoff metadata on /stop when command and target session match", async () => {
const sessionKey = "telegram:123";
const sessionId = "session-123";

View File

@@ -1,3 +1,4 @@
import { getAcpSessionManager } from "../../acp/control-plane/manager.js";
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
import { abortEmbeddedPiRun } from "../../agents/pi-embedded.js";
import {
@@ -301,9 +302,28 @@ export async function tryFastAbortFromMessage(params: {
const storePath = resolveStorePath(cfg.session?.store, { agentId });
const store = loadSessionStore(storePath);
const { entry, key } = resolveSessionEntryForKey(store, targetKey);
const resolvedTargetKey = key ?? targetKey;
const acpManager = getAcpSessionManager();
const acpResolution = acpManager.resolveSession({
cfg,
sessionKey: resolvedTargetKey,
});
if (acpResolution.kind !== "none") {
try {
await acpManager.cancelSession({
cfg,
sessionKey: resolvedTargetKey,
reason: "fast-abort",
});
} catch (error) {
logVerbose(
`abort: ACP cancel failed for ${resolvedTargetKey}: ${error instanceof Error ? error.message : String(error)}`,
);
}
}
const sessionId = entry?.sessionId;
const aborted = sessionId ? abortEmbeddedPiRun(sessionId) : false;
const cleared = clearSessionQueues([key ?? targetKey, sessionId]);
const cleared = clearSessionQueues([resolvedTargetKey, sessionId]);
if (cleared.followupCleared > 0 || cleared.laneCleared > 0) {
logVerbose(
`abort: cleared followups=${cleared.followupCleared} lane=${cleared.laneCleared} keys=${cleared.keys.join(",")}`,
@@ -311,7 +331,7 @@ export async function tryFastAbortFromMessage(params: {
}
const abortCutoff = shouldPersistAbortCutoff({
commandSessionKey: ctx.SessionKey,
targetSessionKey: key ?? targetKey,
targetSessionKey: resolvedTargetKey,
})
? resolveAbortCutoffFromContext(ctx)
: undefined;

View File

@@ -1,5 +1,6 @@
import { describe, expect, it, vi } from "vitest";
import { describe, expect, it } from "vitest";
import type { OpenClawConfig } from "../../config/config.js";
import { prefixSystemMessage } from "../../infra/system-message.js";
import { createAcpReplyProjector } from "./acp-projector.js";
function createCfg(overrides?: Partial<OpenClawConfig>): OpenClawConfig {
@@ -8,7 +9,7 @@ function createCfg(overrides?: Partial<OpenClawConfig>): OpenClawConfig {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 50,
maxChunkChars: 64,
},
},
...overrides,
@@ -29,71 +30,123 @@ describe("createAcpReplyProjector", () => {
await projector.onEvent({
type: "text_delta",
text: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
text: "a".repeat(70),
tag: "agent_message_chunk",
});
await projector.flush(true);
expect(deliveries).toEqual([
{ kind: "block", text: "a".repeat(64) },
{ kind: "block", text: "a".repeat(6) },
]);
});
it("supports deliveryMode=final_only by buffering deltas until done", async () => {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({
cfg: createCfg({
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 512,
deliveryMode: "final_only",
},
},
}),
shouldSendToolSummaries: true,
deliver: async (kind, payload) => {
deliveries.push({ kind, text: payload.text });
return true;
},
});
await projector.onEvent({
type: "text_delta",
text: "What",
tag: "agent_message_chunk",
});
await projector.onEvent({
type: "text_delta",
text: "bbbbbbbbbb",
text: " now?",
tag: "agent_message_chunk",
});
await projector.flush(true);
expect(deliveries).toEqual([
{
kind: "block",
text: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
},
{ kind: "block", text: "aabbbbbbbbbb" },
]);
});
it("buffers tiny token deltas and flushes once at turn end", async () => {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({
cfg: createCfg({
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
},
},
}),
shouldSendToolSummaries: true,
provider: "discord",
deliver: async (kind, payload) => {
deliveries.push({ kind, text: payload.text });
return true;
},
});
await projector.onEvent({ type: "text_delta", text: "What" });
await projector.onEvent({ type: "text_delta", text: " do" });
await projector.onEvent({ type: "text_delta", text: " you want to work on?" });
expect(deliveries).toEqual([]);
await projector.flush(true);
expect(deliveries).toEqual([{ kind: "block", text: "What do you want to work on?" }]);
await projector.onEvent({ type: "done" });
expect(deliveries).toEqual([{ kind: "block", text: "What now?" }]);
});
it("filters thought stream text and suppresses tool summaries when disabled", async () => {
const deliver = vi.fn(async () => true);
const projector = createAcpReplyProjector({
it("suppresses usage_update by default and allows deduped usage when enabled", async () => {
const hidden: Array<{ kind: string; text?: string }> = [];
const hiddenProjector = createAcpReplyProjector({
cfg: createCfg(),
shouldSendToolSummaries: false,
deliver,
shouldSendToolSummaries: true,
deliver: async (kind, payload) => {
hidden.push({ kind, text: payload.text });
return true;
},
});
await hiddenProjector.onEvent({
type: "status",
text: "usage updated: 10/100",
tag: "usage_update",
used: 10,
size: 100,
});
expect(hidden).toEqual([]);
const shown: Array<{ kind: string; text?: string }> = [];
const shownProjector = createAcpReplyProjector({
cfg: createCfg({
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 64,
showUsage: true,
tagVisibility: {
usage_update: true,
},
},
},
}),
shouldSendToolSummaries: true,
deliver: async (kind, payload) => {
shown.push({ kind, text: payload.text });
return true;
},
});
await projector.onEvent({ type: "text_delta", text: "internal", stream: "thought" });
await projector.onEvent({ type: "status", text: "running tool" });
await projector.onEvent({ type: "tool_call", text: "ls" });
await projector.flush(true);
await shownProjector.onEvent({
type: "status",
text: "usage updated: 10/100",
tag: "usage_update",
used: 10,
size: 100,
});
await shownProjector.onEvent({
type: "status",
text: "usage updated: 10/100",
tag: "usage_update",
used: 10,
size: 100,
});
await shownProjector.onEvent({
type: "status",
text: "usage updated: 11/100",
tag: "usage_update",
used: 11,
size: 100,
});
expect(deliver).not.toHaveBeenCalled();
expect(shown).toEqual([
{ kind: "tool", text: prefixSystemMessage("usage updated: 10/100") },
{ kind: "tool", text: prefixSystemMessage("usage updated: 11/100") },
]);
});
it("emits status and tool_call summaries when enabled", async () => {
it("dedupes repeated tool lifecycle updates in minimal mode", async () => {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({
cfg: createCfg(),
@@ -104,16 +157,70 @@ describe("createAcpReplyProjector", () => {
},
});
await projector.onEvent({ type: "status", text: "planning" });
await projector.onEvent({ type: "tool_call", text: "exec ls" });
await projector.onEvent({
type: "tool_call",
tag: "tool_call",
toolCallId: "call_1",
status: "in_progress",
title: "List files",
text: "List files (in_progress)",
});
await projector.onEvent({
type: "tool_call",
tag: "tool_call_update",
toolCallId: "call_1",
status: "in_progress",
title: "List files",
text: "List files (in_progress)",
});
await projector.onEvent({
type: "tool_call",
tag: "tool_call_update",
toolCallId: "call_1",
status: "completed",
title: "List files",
text: "List files (completed)",
});
await projector.onEvent({
type: "tool_call",
tag: "tool_call_update",
toolCallId: "call_1",
status: "completed",
title: "List files",
text: "List files (completed)",
});
expect(deliveries).toEqual([
{ kind: "tool", text: "⚙️ planning" },
{ kind: "tool", text: "🧰 exec ls" },
]);
expect(deliveries.length).toBe(2);
expect(deliveries[0]?.kind).toBe("tool");
expect(deliveries[0]?.text).toContain("Tool Call");
expect(deliveries[1]?.kind).toBe("tool");
expect(deliveries[1]?.text).toContain("Tool Call");
});
it("flushes pending streamed text before tool/status updates", async () => {
it("renders fallback tool labels without leaking call ids as primary label", async () => {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({
cfg: createCfg(),
shouldSendToolSummaries: true,
deliver: async (kind, payload) => {
deliveries.push({ kind, text: payload.text });
return true;
},
});
await projector.onEvent({
type: "tool_call",
tag: "tool_call",
toolCallId: "call_ABC123",
status: "in_progress",
text: "call_ABC123 (in_progress)",
});
expect(deliveries[0]?.text).toContain("Tool Call");
expect(deliveries[0]?.text).not.toContain("call_ABC123 (");
});
it("respects metaMode=off and still streams assistant text", async () => {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({
cfg: createCfg({
@@ -122,24 +229,118 @@ describe("createAcpReplyProjector", () => {
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
metaMode: "off",
},
},
}),
shouldSendToolSummaries: true,
provider: "discord",
deliver: async (kind, payload) => {
deliveries.push({ kind, text: payload.text });
return true;
},
});
await projector.onEvent({ type: "text_delta", text: "Hello" });
await projector.onEvent({ type: "text_delta", text: " world" });
await projector.onEvent({ type: "status", text: "running tool" });
await projector.onEvent({
type: "status",
text: "available commands updated",
tag: "available_commands_update",
});
await projector.onEvent({
type: "tool_call",
text: "tool call",
tag: "tool_call",
toolCallId: "x",
status: "in_progress",
});
await projector.onEvent({
type: "text_delta",
text: "hello",
tag: "agent_message_chunk",
});
await projector.flush(true);
expect(deliveries).toEqual([{ kind: "block", text: "hello" }]);
});
it("truncates oversized turns once and emits one truncation notice", async () => {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({
cfg: createCfg({
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
maxTurnChars: 5,
metaMode: "minimal",
},
},
}),
shouldSendToolSummaries: true,
deliver: async (kind, payload) => {
deliveries.push({ kind, text: payload.text });
return true;
},
});
await projector.onEvent({
type: "text_delta",
text: "hello world",
tag: "agent_message_chunk",
});
await projector.onEvent({
type: "text_delta",
text: "ignored tail",
tag: "agent_message_chunk",
});
await projector.flush(true);
expect(deliveries).toEqual([
{ kind: "block", text: "Hello world" },
{ kind: "tool", text: "⚙️ running tool" },
{ kind: "block", text: "hello" },
{ kind: "tool", text: prefixSystemMessage("output truncated") },
]);
});
it("supports tagVisibility overrides for tool updates", async () => {
const deliveries: Array<{ kind: string; text?: string }> = [];
const projector = createAcpReplyProjector({
cfg: createCfg({
acp: {
enabled: true,
stream: {
coalesceIdleMs: 0,
maxChunkChars: 256,
tagVisibility: {
tool_call_update: false,
},
},
},
}),
shouldSendToolSummaries: true,
deliver: async (kind, payload) => {
deliveries.push({ kind, text: payload.text });
return true;
},
});
await projector.onEvent({
type: "tool_call",
tag: "tool_call",
toolCallId: "c1",
status: "in_progress",
title: "Run tests",
text: "Run tests (in_progress)",
});
await projector.onEvent({
type: "tool_call",
tag: "tool_call_update",
toolCallId: "c1",
status: "completed",
title: "Run tests",
text: "Run tests (completed)",
});
expect(deliveries.length).toBe(1);
expect(deliveries[0]?.text).toContain("Tool Call");
});
});

View File

@@ -1,6 +1,8 @@
import type { AcpRuntimeEvent } from "../../acp/runtime/types.js";
import type { AcpRuntimeEvent, AcpSessionUpdateTag } from "../../acp/runtime/types.js";
import { EmbeddedBlockChunker } from "../../agents/pi-embedded-block-chunker.js";
import { formatToolSummary, resolveToolDisplay } from "../../agents/tool-display.js";
import type { OpenClawConfig } from "../../config/config.js";
import { prefixSystemMessage } from "../../infra/system-message.js";
import type { ReplyPayload } from "../types.js";
import { createBlockReplyPipeline } from "./block-reply-pipeline.js";
import { resolveEffectiveBlockStreamingConfig } from "./block-streaming.js";
@@ -8,8 +10,57 @@ import type { ReplyDispatchKind } from "./reply-dispatcher.js";
const DEFAULT_ACP_STREAM_COALESCE_IDLE_MS = 350;
const DEFAULT_ACP_STREAM_MAX_CHUNK_CHARS = 1800;
const DEFAULT_ACP_META_MODE = "minimal";
const DEFAULT_ACP_SHOW_USAGE = false;
const DEFAULT_ACP_DELIVERY_MODE = "live";
const DEFAULT_ACP_MAX_TURN_CHARS = 24_000;
const DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS = 320;
const DEFAULT_ACP_MAX_STATUS_CHARS = 320;
const DEFAULT_ACP_MAX_META_EVENTS_PER_TURN = 64;
const ACP_BLOCK_REPLY_TIMEOUT_MS = 15_000;
const ACP_TAG_VISIBILITY_DEFAULTS: Record<string, boolean> = {
agent_message_chunk: true,
tool_call: true,
tool_call_update: true,
usage_update: false,
available_commands_update: false,
current_mode_update: false,
config_option_update: false,
session_info_update: false,
plan: false,
agent_thought_chunk: false,
};
const TERMINAL_TOOL_STATUSES = new Set(["completed", "failed", "cancelled", "done", "error"]);
export type AcpProjectedDeliveryMeta = {
tag?: AcpSessionUpdateTag;
toolCallId?: string;
toolStatus?: string;
allowEdit?: boolean;
};
type AcpDeliveryMode = "live" | "final_only";
type AcpMetaMode = "off" | "minimal" | "verbose";
type AcpProjectionSettings = {
deliveryMode: AcpDeliveryMode;
metaMode: AcpMetaMode;
showUsage: boolean;
maxTurnChars: number;
maxToolSummaryChars: number;
maxStatusChars: number;
maxMetaEventsPerTurn: number;
tagVisibility: Partial<Record<AcpSessionUpdateTag, boolean>>;
};
type ToolLifecycleState = {
started: boolean;
terminal: boolean;
lastRenderedHash?: string;
};
function clampPositiveInteger(
value: unknown,
fallback: number,
@@ -28,6 +79,21 @@ function clampPositiveInteger(
return rounded;
}
function clampBoolean(value: unknown, fallback: boolean): boolean {
return typeof value === "boolean" ? value : fallback;
}
function resolveAcpDeliveryMode(value: unknown): AcpDeliveryMode {
return value === "final_only" ? "final_only" : DEFAULT_ACP_DELIVERY_MODE;
}
function resolveAcpMetaMode(value: unknown): AcpMetaMode {
if (value === "off" || value === "minimal" || value === "verbose") {
return value;
}
return DEFAULT_ACP_META_MODE;
}
function resolveAcpStreamCoalesceIdleMs(cfg: OpenClawConfig): number {
return clampPositiveInteger(
cfg.acp?.stream?.coalesceIdleMs,
@@ -46,6 +112,40 @@ function resolveAcpStreamMaxChunkChars(cfg: OpenClawConfig): number {
});
}
function resolveAcpProjectionSettings(cfg: OpenClawConfig): AcpProjectionSettings {
const stream = cfg.acp?.stream;
return {
deliveryMode: resolveAcpDeliveryMode(stream?.deliveryMode),
metaMode: resolveAcpMetaMode(stream?.metaMode),
showUsage: clampBoolean(stream?.showUsage, DEFAULT_ACP_SHOW_USAGE),
maxTurnChars: clampPositiveInteger(stream?.maxTurnChars, DEFAULT_ACP_MAX_TURN_CHARS, {
min: 1,
max: 500_000,
}),
maxToolSummaryChars: clampPositiveInteger(
stream?.maxToolSummaryChars,
DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS,
{
min: 64,
max: 8_000,
},
),
maxStatusChars: clampPositiveInteger(stream?.maxStatusChars, DEFAULT_ACP_MAX_STATUS_CHARS, {
min: 64,
max: 8_000,
}),
maxMetaEventsPerTurn: clampPositiveInteger(
stream?.maxMetaEventsPerTurn,
DEFAULT_ACP_MAX_META_EVENTS_PER_TURN,
{
min: 1,
max: 2_000,
},
),
tagVisibility: stream?.tagVisibility ?? {},
};
}
function resolveAcpStreamingConfig(params: {
cfg: OpenClawConfig;
provider?: string;
@@ -60,6 +160,66 @@ function resolveAcpStreamingConfig(params: {
});
}
function truncateText(input: string, maxChars: number): string {
if (input.length <= maxChars) {
return input;
}
if (maxChars <= 1) {
return input.slice(0, maxChars);
}
return `${input.slice(0, maxChars - 1)}`;
}
function hashText(text: string): string {
return text.trim();
}
function normalizeToolStatus(status: string | undefined): string | undefined {
if (!status) {
return undefined;
}
const normalized = status.trim().toLowerCase();
return normalized || undefined;
}
function isTagVisible(
settings: AcpProjectionSettings,
tag: AcpSessionUpdateTag | undefined,
): boolean {
if (!tag) {
return true;
}
const override = settings.tagVisibility[tag];
if (typeof override === "boolean") {
return override;
}
if (Object.prototype.hasOwnProperty.call(ACP_TAG_VISIBILITY_DEFAULTS, tag)) {
return ACP_TAG_VISIBILITY_DEFAULTS[tag];
}
return true;
}
function renderToolSummaryText(event: Extract<AcpRuntimeEvent, { type: "tool_call" }>): string {
const detailParts: string[] = [];
const title = event.title?.trim();
if (title) {
detailParts.push(title);
}
const status = event.status?.trim();
if (status) {
detailParts.push(`status=${status}`);
}
const fallback = event.text?.trim();
if (detailParts.length === 0 && fallback) {
detailParts.push(fallback);
}
const display = resolveToolDisplay({
name: "tool_call",
meta: detailParts.join(" · ") || "tool call",
});
return formatToolSummary(display);
}
export type AcpReplyProjector = {
onEvent: (event: AcpRuntimeEvent) => Promise<void>;
flush: (force?: boolean) => Promise<void>;
@@ -68,10 +228,15 @@ export type AcpReplyProjector = {
export function createAcpReplyProjector(params: {
cfg: OpenClawConfig;
shouldSendToolSummaries: boolean;
deliver: (kind: ReplyDispatchKind, payload: ReplyPayload) => Promise<boolean>;
deliver: (
kind: ReplyDispatchKind,
payload: ReplyPayload,
meta?: AcpProjectedDeliveryMeta,
) => Promise<boolean>;
provider?: string;
accountId?: string;
}): AcpReplyProjector {
const settings = resolveAcpProjectionSettings(params.cfg);
const streaming = resolveAcpStreamingConfig({
cfg: params.cfg,
provider: params.provider,
@@ -86,7 +251,28 @@ export function createAcpReplyProjector(params: {
});
const chunker = new EmbeddedBlockChunker(streaming.chunking);
let emittedTurnChars = 0;
let emittedMetaEvents = 0;
let truncationNoticeEmitted = false;
let lastStatusHash: string | undefined;
let lastToolHash: string | undefined;
let lastUsageTuple: string | undefined;
const toolLifecycleById = new Map<string, ToolLifecycleState>();
const resetTurnState = () => {
emittedTurnChars = 0;
emittedMetaEvents = 0;
truncationNoticeEmitted = false;
lastStatusHash = undefined;
lastToolHash = undefined;
lastUsageTuple = undefined;
toolLifecycleById.clear();
};
const drainChunker = (force: boolean) => {
if (settings.deliveryMode === "final_only" && !force) {
return;
}
chunker.drain({
force,
emit: (chunk) => {
@@ -100,13 +286,132 @@ export function createAcpReplyProjector(params: {
await blockReplyPipeline.flush({ force });
};
const emitToolSummary = async (prefix: string, text: string): Promise<void> => {
if (!params.shouldSendToolSummaries || !text) {
const consumeMetaQuota = (force: boolean): boolean => {
if (force) {
return true;
}
if (emittedMetaEvents >= settings.maxMetaEventsPerTurn) {
return false;
}
emittedMetaEvents += 1;
return true;
};
const emitSystemStatus = async (
text: string,
meta?: AcpProjectedDeliveryMeta,
opts?: { force?: boolean; dedupe?: boolean },
) => {
if (!params.shouldSendToolSummaries) {
return;
}
// Keep tool summaries ordered after any pending streamed text.
await flush(true);
await params.deliver("tool", { text: `${prefix} ${text}` });
if (settings.metaMode === "off" && opts?.force !== true) {
return;
}
const bounded = truncateText(text.trim(), settings.maxStatusChars);
if (!bounded) {
return;
}
const formatted = prefixSystemMessage(bounded);
const hash = hashText(formatted);
const shouldDedupe = opts?.dedupe !== false;
if (shouldDedupe && lastStatusHash === hash) {
return;
}
if (!consumeMetaQuota(opts?.force === true)) {
return;
}
if (settings.deliveryMode === "live") {
await flush(true);
}
await params.deliver("tool", { text: formatted }, meta);
lastStatusHash = hash;
};
const emitToolSummary = async (
event: Extract<AcpRuntimeEvent, { type: "tool_call" }>,
opts?: { force?: boolean },
) => {
if (!params.shouldSendToolSummaries || settings.metaMode === "off") {
return;
}
if (!isTagVisible(settings, event.tag)) {
return;
}
const toolSummary = truncateText(renderToolSummaryText(event), settings.maxToolSummaryChars);
const hash = hashText(toolSummary);
const toolCallId = event.toolCallId?.trim() || undefined;
const status = normalizeToolStatus(event.status);
const isTerminal = status ? TERMINAL_TOOL_STATUSES.has(status) : false;
const isStart = status === "in_progress" || event.tag === "tool_call";
if (settings.metaMode === "verbose") {
if (lastToolHash === hash) {
return;
}
} else if (settings.metaMode === "minimal") {
if (toolCallId) {
const state = toolLifecycleById.get(toolCallId) ?? {
started: false,
terminal: false,
};
if (isTerminal && state.terminal) {
return;
}
if (isStart && state.started) {
return;
}
if (state.lastRenderedHash === hash) {
return;
}
if (isStart) {
state.started = true;
}
if (isTerminal) {
state.terminal = true;
}
state.lastRenderedHash = hash;
toolLifecycleById.set(toolCallId, state);
} else if (lastToolHash === hash) {
return;
}
}
if (!consumeMetaQuota(opts?.force === true)) {
return;
}
if (settings.deliveryMode === "live") {
await flush(true);
}
await params.deliver(
"tool",
{ text: toolSummary },
{
...(event.tag ? { tag: event.tag } : {}),
...(toolCallId ? { toolCallId } : {}),
...(status ? { toolStatus: status } : {}),
allowEdit: Boolean(toolCallId && event.tag === "tool_call_update"),
},
);
lastToolHash = hash;
};
const emitTruncationNotice = async () => {
if (truncationNoticeEmitted) {
return;
}
truncationNoticeEmitted = true;
await emitSystemStatus(
"output truncated",
{
tag: "session_info_update",
},
{
force: true,
dedupe: false,
},
);
};
const onEvent = async (event: AcpRuntimeEvent): Promise<void> => {
@@ -114,22 +419,61 @@ export function createAcpReplyProjector(params: {
if (event.stream && event.stream !== "output") {
return;
}
if (event.text) {
chunker.append(event.text);
if (!isTagVisible(settings, event.tag)) {
return;
}
const text = event.text;
if (!text) {
return;
}
if (emittedTurnChars >= settings.maxTurnChars) {
await emitTruncationNotice();
return;
}
const remaining = settings.maxTurnChars - emittedTurnChars;
const accepted = remaining < text.length ? text.slice(0, remaining) : text;
if (accepted.length > 0) {
chunker.append(accepted);
emittedTurnChars += accepted.length;
drainChunker(false);
}
if (accepted.length < text.length) {
await emitTruncationNotice();
}
return;
}
if (event.type === "status") {
await emitToolSummary("⚙️", event.text);
if (!isTagVisible(settings, event.tag)) {
return;
}
if (event.tag === "usage_update") {
if (!settings.showUsage) {
return;
}
const usageTuple =
typeof event.used === "number" && typeof event.size === "number"
? `${event.used}/${event.size}`
: hashText(event.text);
if (usageTuple === lastUsageTuple) {
return;
}
lastUsageTuple = usageTuple;
}
await emitSystemStatus(event.text, event.tag ? { tag: event.tag } : undefined, {
dedupe: true,
});
return;
}
if (event.type === "tool_call") {
await emitToolSummary("🧰", event.text);
await emitToolSummary(event);
return;
}
if (event.type === "done" || event.type === "error") {
await flush(true);
resetTurnState();
}
};

View File

@@ -11,6 +11,7 @@ import { readAcpSessionEntry } from "../../acp/runtime/session-meta.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { TtsAutoMode } from "../../config/types.tts.js";
import { logVerbose } from "../../globals.js";
import { runMessageAction } from "../../infra/outbound/message-action-runner.js";
import { getSessionBindingService } from "../../infra/outbound/session-binding-service.js";
import { generateSecureUuid } from "../../infra/secure-random.js";
import { prefixSystemMessage } from "../../infra/system-message.js";
@@ -157,6 +158,7 @@ export async function tryDispatchAcpReply(params: {
originatingTo?: string;
shouldSendToolSummaries: boolean;
bypassForCommand: boolean;
onReplyStart?: () => Promise<void> | void;
recordProcessed: DispatchProcessedRecorder;
markIdle: (reason: string) => void;
}): Promise<AcpDispatchAttemptResult | null> {
@@ -182,9 +184,69 @@ export async function tryDispatchAcpReply(params: {
let queuedFinal = false;
let acpAccumulatedBlockText = "";
let acpBlockCount = 0;
let startedReplyLifecycle = false;
const toolUpdateMessageById = new Map<
string,
{
channel: string;
accountId?: string;
to: string;
threadId?: string | number;
messageId: string;
}
>();
const ensureReplyLifecycleStarted = async () => {
if (startedReplyLifecycle) {
return;
}
startedReplyLifecycle = true;
await params.onReplyStart?.();
};
const tryEditToolUpdate = async (payload: ReplyPayload, toolCallId: string): Promise<boolean> => {
if (!params.shouldRouteToOriginating || !params.originatingChannel || !params.originatingTo) {
return false;
}
const handle = toolUpdateMessageById.get(toolCallId);
if (!handle?.messageId) {
return false;
}
const message = payload.text?.trim();
if (!message) {
return false;
}
try {
await runMessageAction({
cfg: params.cfg,
action: "edit",
params: {
channel: handle.channel,
accountId: handle.accountId,
to: handle.to,
threadId: handle.threadId,
messageId: handle.messageId,
message,
},
sessionKey: params.ctx.SessionKey,
});
routedCounts.tool += 1;
return true;
} catch (error) {
logVerbose(
`dispatch-acp: tool message edit failed for ${toolCallId}: ${error instanceof Error ? error.message : String(error)}`,
);
return false;
}
};
const deliverAcpPayload = async (
kind: ReplyDispatchKind,
payload: ReplyPayload,
meta?: {
toolCallId?: string;
allowEdit?: boolean;
},
): Promise<boolean> => {
if (kind === "block" && payload.text?.trim()) {
if (acpAccumulatedBlockText.length > 0) {
@@ -193,6 +255,9 @@ export async function tryDispatchAcpReply(params: {
acpAccumulatedBlockText += payload.text;
acpBlockCount += 1;
}
if ((payload.text?.trim() ?? "").length > 0 || payload.mediaUrl || payload.mediaUrls?.length) {
await ensureReplyLifecycleStarted();
}
const ttsPayload = await maybeApplyTtsToPayload({
payload,
@@ -204,6 +269,13 @@ export async function tryDispatchAcpReply(params: {
});
if (params.shouldRouteToOriginating && params.originatingChannel && params.originatingTo) {
const toolCallId = meta?.toolCallId?.trim();
if (kind === "tool" && meta?.allowEdit === true && toolCallId) {
const edited = await tryEditToolUpdate(ttsPayload, toolCallId);
if (edited) {
return true;
}
}
const result = await routeReply({
payload: ttsPayload,
channel: params.originatingChannel,
@@ -219,6 +291,15 @@ export async function tryDispatchAcpReply(params: {
);
return false;
}
if (kind === "tool" && meta?.toolCallId && result.messageId) {
toolUpdateMessageById.set(meta.toolCallId, {
channel: params.originatingChannel,
accountId: params.ctx.AccountId,
to: params.originatingTo,
...(params.ctx.MessageThreadId != null ? { threadId: params.ctx.MessageThreadId } : {}),
messageId: result.messageId,
});
}
routedCounts[kind] += 1;
return true;
}

View File

@@ -371,6 +371,7 @@ export async function dispatchReplyFromConfig(params: {
originatingTo,
shouldSendToolSummaries,
bypassForCommand: bypassAcpForCommand,
onReplyStart: params.replyOptions?.onReplyStart,
recordProcessed,
markIdle,
});

View File

@@ -166,11 +166,27 @@ export const FIELD_HELP: Record<string, string> = {
"Allowlist of ACP target agent ids permitted for ACP runtime sessions. Empty means no additional allowlist restriction.",
"acp.maxConcurrentSessions":
"Maximum concurrently active ACP sessions across this gateway process.",
"acp.stream": "ACP streaming projection controls for chunk sizing and coalescer flush timing.",
"acp.stream":
"ACP streaming projection controls for chunk sizing, metadata visibility, and deduped delivery behavior.",
"acp.stream.coalesceIdleMs":
"Coalescer idle flush window in milliseconds for ACP streamed text before block replies are emitted.",
"acp.stream.maxChunkChars":
"Maximum chunk size for ACP streamed block projection before splitting into multiple block replies.",
"acp.stream.metaMode":
"ACP metadata projection mode: off suppresses status/tool lines, minimal dedupes aggressively, verbose streams non-identical updates.",
"acp.stream.showUsage":
"When true, usage_update events are projected as system lines only when usage values change.",
"acp.stream.deliveryMode":
"ACP delivery style: live streams block chunks incrementally, final_only buffers text deltas until terminal turn events.",
"acp.stream.maxTurnChars":
"Maximum assistant text characters projected per ACP turn before truncation notice is emitted.",
"acp.stream.maxToolSummaryChars":
"Maximum characters for projected ACP tool lifecycle/progress summary lines.",
"acp.stream.maxStatusChars": "Maximum characters for projected ACP status/meta lines.",
"acp.stream.maxMetaEventsPerTurn":
"Maximum ACP meta events projected per turn (text deltas continue unaffected).",
"acp.stream.tagVisibility":
"Per-sessionUpdate visibility overrides for ACP projection (for example usage_update, available_commands_update).",
"acp.runtime.ttlMinutes":
"Idle runtime TTL in minutes for ACP session workers before eligible cleanup.",
"acp.runtime.installCommand":

View File

@@ -369,6 +369,14 @@ export const FIELD_LABELS: Record<string, string> = {
"acp.stream": "ACP Stream",
"acp.stream.coalesceIdleMs": "ACP Stream Coalesce Idle (ms)",
"acp.stream.maxChunkChars": "ACP Stream Max Chunk Chars",
"acp.stream.metaMode": "ACP Stream Meta Mode",
"acp.stream.showUsage": "ACP Stream Show Usage",
"acp.stream.deliveryMode": "ACP Stream Delivery Mode",
"acp.stream.maxTurnChars": "ACP Stream Max Turn Chars",
"acp.stream.maxToolSummaryChars": "ACP Stream Max Tool Summary Chars",
"acp.stream.maxStatusChars": "ACP Stream Max Status Chars",
"acp.stream.maxMetaEventsPerTurn": "ACP Stream Max Meta Events Per Turn",
"acp.stream.tagVisibility": "ACP Stream Tag Visibility",
"acp.runtime.ttlMinutes": "ACP Runtime TTL (minutes)",
"acp.runtime.installCommand": "ACP Runtime Install Command",
models: "Models",

View File

@@ -1,3 +1,5 @@
import type { AcpSessionUpdateTag } from "../acp/runtime/types.js";
export type AcpDispatchConfig = {
/** Master switch for ACP turn dispatch in the reply pipeline. */
enabled?: boolean;
@@ -8,6 +10,25 @@ export type AcpStreamConfig = {
coalesceIdleMs?: number;
/** Maximum text size per streamed chunk. */
maxChunkChars?: number;
/** Controls how ACP meta/system updates are projected to channels. */
metaMode?: "off" | "minimal" | "verbose";
/** Toggles usage_update projection in channel-facing output. */
showUsage?: boolean;
/** Live streams chunks or waits for terminal event before delivery. */
deliveryMode?: "live" | "final_only";
/** Maximum assistant text characters forwarded per turn. */
maxTurnChars?: number;
/** Maximum visible characters for tool summary/meta lines. */
maxToolSummaryChars?: number;
/** Maximum visible characters for status lines. */
maxStatusChars?: number;
/** Maximum number of meta events projected per turn. */
maxMetaEventsPerTurn?: number;
/**
* Per-sessionUpdate visibility overrides.
* Keys not listed here fall back to OpenClaw defaults.
*/
tagVisibility?: Partial<Record<AcpSessionUpdateTag, boolean>>;
};
export type AcpRuntimeConfig = {

View File

@@ -339,6 +339,16 @@ export const OpenClawSchema = z
.object({
coalesceIdleMs: z.number().int().nonnegative().optional(),
maxChunkChars: z.number().int().positive().optional(),
metaMode: z
.union([z.literal("off"), z.literal("minimal"), z.literal("verbose")])
.optional(),
showUsage: z.boolean().optional(),
deliveryMode: z.union([z.literal("live"), z.literal("final_only")]).optional(),
maxTurnChars: z.number().int().positive().optional(),
maxToolSummaryChars: z.number().int().positive().optional(),
maxStatusChars: z.number().int().positive().optional(),
maxMetaEventsPerTurn: z.number().int().positive().optional(),
tagVisibility: z.record(z.string(), z.boolean()).optional(),
})
.strict()
.optional(),

View File

@@ -80,6 +80,7 @@ export type {
AcpRuntimeEvent,
AcpRuntimeHandle,
AcpRuntimePromptMode,
AcpSessionUpdateTag,
AcpRuntimeSessionMode,
AcpRuntimeStatus,
AcpRuntimeTurnInput,