ACPX: sync main ACP parser changes onto configurable-command branch

This commit is contained in:
Onur
2026-03-01 09:31:41 +01:00
committed by Onur Solmaz
parent cf3e4d2aef
commit 9cfc630be9
5 changed files with 301 additions and 486 deletions

View File

@@ -1,128 +1,63 @@
import { describe, expect, it } from "vitest";
import { PromptStreamProjector } from "./events.js";
import { parsePromptEventLine } from "./events.js";
function jsonLine(payload: unknown): string {
return JSON.stringify(payload);
}
function beginPrompt(projector: PromptStreamProjector, id = "req-1") {
projector.ingestLine(
jsonLine({
describe("parsePromptEventLine", () => {
it("parses raw ACP session/update agent_message_chunk lines", () => {
const line = JSON.stringify({
jsonrpc: "2.0",
id,
method: "session/prompt",
method: "session/update",
params: {
sessionId: "session-1",
prompt: [{ type: "text", text: "hello" }],
sessionId: "s1",
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "hello" },
},
},
}),
);
}
describe("PromptStreamProjector", () => {
it("maps agent message chunks to output deltas", () => {
const projector = new PromptStreamProjector();
beginPrompt(projector);
const event = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
method: "session/update",
params: {
sessionId: "session-1",
update: {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: "hello world",
},
},
},
}),
);
expect(event).toEqual({
});
expect(parsePromptEventLine(line)).toEqual({
type: "text_delta",
text: "hello world",
text: "hello",
stream: "output",
tag: "agent_message_chunk",
});
});
it("preserves leading spaces in streamed output chunks", () => {
const projector = new PromptStreamProjector();
beginPrompt(projector);
const event = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
method: "session/update",
params: {
sessionId: "session-1",
update: {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: " indented",
},
},
it("parses usage_update with stable metadata", () => {
const line = JSON.stringify({
jsonrpc: "2.0",
method: "session/update",
params: {
sessionId: "s1",
update: {
sessionUpdate: "usage_update",
used: 12,
size: 500,
},
}),
);
expect(event).toEqual({
type: "text_delta",
text: " indented",
stream: "output",
tag: "agent_message_chunk",
},
});
expect(parsePromptEventLine(line)).toEqual({
type: "status",
text: "usage updated: 12/500",
tag: "usage_update",
used: 12,
size: 500,
});
});
it("maps agent thought chunks to thought deltas", () => {
const projector = new PromptStreamProjector();
beginPrompt(projector);
const event = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
method: "session/update",
params: {
sessionId: "session-1",
update: {
sessionUpdate: "agent_thought_chunk",
content: {
type: "text",
text: "thinking",
},
},
it("parses tool_call_update without using call ids as primary fallback label", () => {
const line = JSON.stringify({
jsonrpc: "2.0",
method: "session/update",
params: {
sessionId: "s1",
update: {
sessionUpdate: "tool_call_update",
toolCallId: "call_ABC123",
status: "in_progress",
},
}),
);
expect(event).toEqual({
type: "text_delta",
text: "thinking",
stream: "thought",
tag: "agent_thought_chunk",
},
});
});
it("maps tool call updates with metadata and stable fallback title", () => {
const projector = new PromptStreamProjector();
beginPrompt(projector);
const event = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
method: "session/update",
params: {
sessionId: "session-1",
update: {
sessionUpdate: "tool_call_update",
toolCallId: "call_ABC123",
status: "in_progress",
},
},
}),
);
expect(event).toEqual({
expect(parsePromptEventLine(line)).toEqual({
type: "tool_call",
text: "tool call (in_progress)",
tag: "tool_call_update",
@@ -132,159 +67,15 @@ describe("PromptStreamProjector", () => {
});
});
it("maps usage updates with numeric metadata", () => {
const projector = new PromptStreamProjector();
beginPrompt(projector);
const event = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
method: "session/update",
params: {
sessionId: "session-1",
update: {
sessionUpdate: "usage_update",
used: 12,
size: 500,
},
},
}),
);
expect(event).toEqual({
type: "status",
text: "usage updated: 12/500",
tag: "usage_update",
used: 12,
size: 500,
});
});
it("ignores replayed updates before current prompt starts", () => {
const projector = new PromptStreamProjector();
const replayed = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
method: "session/update",
params: {
sessionId: "session-1",
update: {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: "old turn",
},
},
},
}),
);
beginPrompt(projector, "req-2");
const current = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
method: "session/update",
params: {
sessionId: "session-1",
update: {
sessionUpdate: "agent_message_chunk",
content: {
type: "text",
text: "new turn",
},
},
},
}),
);
expect(replayed).toBeNull();
expect(current).toEqual({
it("keeps compatibility with simplified text/done lines", () => {
expect(parsePromptEventLine(JSON.stringify({ type: "text", content: "alpha" }))).toEqual({
type: "text_delta",
text: "new turn",
text: "alpha",
stream: "output",
tag: "agent_message_chunk",
});
});
it("maps prompt response stop reasons to done events", () => {
const projector = new PromptStreamProjector();
beginPrompt(projector);
const event = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
id: "req-1",
result: {
stopReason: "end_turn",
},
}),
);
expect(event).toEqual({
expect(parsePromptEventLine(JSON.stringify({ type: "done", stopReason: "end_turn" }))).toEqual({
type: "done",
stopReason: "end_turn",
});
});
it("maps json-rpc errors to runtime errors", () => {
const projector = new PromptStreamProjector();
beginPrompt(projector);
const event = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
id: "req-1",
error: {
code: -32000,
message: "adapter failed",
},
}),
);
expect(event).toEqual({
type: "error",
message: "adapter failed",
code: "-32000",
});
});
it("ignores non-prompt response errors", () => {
const projector = new PromptStreamProjector();
beginPrompt(projector, "3");
const loadError = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
id: 1,
error: {
code: -32002,
message: "Resource not found",
},
}),
);
const promptDone = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
id: 3,
result: {
stopReason: "end_turn",
},
}),
);
const trailingReplay = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
method: "session/update",
params: {
sessionId: "session-1",
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "should be ignored" },
},
},
}),
);
expect(loadError).toBeNull();
expect(promptDone).toEqual({
type: "done",
stopReason: "end_turn",
});
expect(trailingReplay).toBeNull();
});
});

View File

@@ -1,13 +1,28 @@
import type { AcpRuntimeEvent, AcpSessionUpdateTag } from "openclaw/plugin-sdk";
import { isAcpJsonRpcMessage, normalizeJsonRpcId } from "./jsonrpc.js";
import {
asOptionalBoolean,
asOptionalString,
asString,
asTrimmedString,
type AcpxErrorEvent,
type AcpxJsonObject,
isRecord,
} from "./shared.js";
export function toAcpxErrorEvent(value: unknown): AcpxErrorEvent | null {
if (!isRecord(value)) {
return null;
}
if (asTrimmedString(value.type) !== "error") {
return null;
}
return {
message: asTrimmedString(value.message) || "acpx reported an error",
code: asOptionalString(value.code),
retryable: asOptionalBoolean(value.retryable),
};
}
export function parseJsonLines(value: string): AcpxJsonObject[] {
const events: AcpxJsonObject[] = [];
for (const line of value.split(/\r?\n/)) {
@@ -31,24 +46,95 @@ function asOptionalFiniteNumber(value: unknown): number | undefined {
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
function parsePromptStopReason(message: Record<string, unknown>): string | undefined {
if (!Object.hasOwn(message, "result")) {
return undefined;
function resolveStructuredPromptPayload(parsed: Record<string, unknown>): {
type: string;
payload: Record<string, unknown>;
tag?: AcpSessionUpdateTag;
} {
const method = asTrimmedString(parsed.method);
if (method === "session/update") {
const params = parsed.params;
if (isRecord(params) && isRecord(params.update)) {
const update = params.update;
const tag = asOptionalString(update.sessionUpdate) as AcpSessionUpdateTag | undefined;
return {
type: tag ?? "",
payload: update,
...(tag ? { tag } : {}),
};
}
}
const result = isRecord(message.result) ? message.result : null;
if (!result) {
return undefined;
const sessionUpdate = asOptionalString(parsed.sessionUpdate) as AcpSessionUpdateTag | undefined;
if (sessionUpdate) {
return {
type: sessionUpdate,
payload: parsed,
tag: sessionUpdate,
};
}
const stopReason = asString(result.stopReason);
return stopReason && stopReason.trim().length > 0 ? stopReason : undefined;
const type = asTrimmedString(parsed.type);
const tag = asOptionalString(parsed.tag) as AcpSessionUpdateTag | undefined;
return {
type,
payload: parsed,
...(tag ? { tag } : {}),
};
}
function resolveStatusTextForTag(params: {
tag: AcpSessionUpdateTag;
payload: Record<string, unknown>;
}): string | null {
const { tag, payload } = params;
if (tag === "available_commands_update") {
const commands = Array.isArray(payload.availableCommands) ? payload.availableCommands : [];
return commands.length > 0
? `available commands updated (${commands.length})`
: "available commands updated";
}
if (tag === "current_mode_update") {
const mode =
asTrimmedString(payload.currentModeId) ||
asTrimmedString(payload.modeId) ||
asTrimmedString(payload.mode);
return mode ? `mode updated: ${mode}` : "mode updated";
}
if (tag === "config_option_update") {
const id = asTrimmedString(payload.id) || asTrimmedString(payload.configOptionId);
const value =
asTrimmedString(payload.currentValue) ||
asTrimmedString(payload.value) ||
asTrimmedString(payload.optionValue);
if (id && value) {
return `config updated: ${id}=${value}`;
}
if (id) {
return `config updated: ${id}`;
}
return "config updated";
}
if (tag === "session_info_update") {
return (
asTrimmedString(payload.summary) || asTrimmedString(payload.message) || "session updated"
);
}
if (tag === "plan") {
const entries = Array.isArray(payload.entries) ? payload.entries : [];
const first = entries.find((entry) => isRecord(entry)) as Record<string, unknown> | undefined;
const content = asTrimmedString(first?.content);
return content ? `plan: ${content}` : null;
}
return null;
}
function resolveTextChunk(params: {
update: Record<string, unknown>;
payload: Record<string, unknown>;
stream: "output" | "thought";
tag: AcpSessionUpdateTag;
}): AcpRuntimeEvent | null {
const contentRaw = params.update.content;
const contentRaw = params.payload.content;
if (isRecord(contentRaw)) {
const contentType = asTrimmedString(contentRaw.type);
if (contentType && contentType !== "text") {
@@ -64,8 +150,7 @@ function resolveTextChunk(params: {
};
}
}
const text = asString(params.update.text);
const text = asString(params.payload.text);
if (!text || text.length === 0) {
return null;
}
@@ -77,106 +162,103 @@ function resolveTextChunk(params: {
};
}
function resolveStatusTextForTag(params: {
tag: AcpSessionUpdateTag;
update: Record<string, unknown>;
}): string | null {
const { tag, update } = params;
if (tag === "available_commands_update") {
const commands = Array.isArray(update.availableCommands) ? update.availableCommands : [];
return commands.length > 0
? `available commands updated (${commands.length})`
: "available commands updated";
export function parsePromptEventLine(line: string): AcpRuntimeEvent | null {
const trimmed = line.trim();
if (!trimmed) {
return null;
}
if (tag === "current_mode_update") {
const mode =
asTrimmedString(update.currentModeId) ||
asTrimmedString(update.modeId) ||
asTrimmedString(update.mode);
return mode ? `mode updated: ${mode}` : "mode updated";
let parsed: unknown;
try {
parsed = JSON.parse(trimmed);
} catch {
return {
type: "status",
text: trimmed,
};
}
if (tag === "config_option_update") {
const id = asTrimmedString(update.id) || asTrimmedString(update.configOptionId);
const value =
asTrimmedString(update.currentValue) ||
asTrimmedString(update.value) ||
asTrimmedString(update.optionValue);
if (id && value) {
return `config updated: ${id}=${value}`;
}
if (id) {
return `config updated: ${id}`;
}
return "config updated";
}
if (tag === "session_info_update") {
return asTrimmedString(update.summary) || asTrimmedString(update.message) || "session updated";
}
if (tag === "plan") {
const entries = Array.isArray(update.entries) ? update.entries : [];
const first = entries.find((entry) => isRecord(entry)) as Record<string, unknown> | undefined;
const content = asTrimmedString(first?.content);
if (!content) {
return "plan updated";
}
const status = asTrimmedString(first?.status);
return status ? `plan: [${status}] ${content}` : `plan: ${content}`;
}
return null;
}
function parseSessionUpdateEvent(message: Record<string, unknown>): AcpRuntimeEvent | null {
if (asTrimmedString(message.method) !== "session/update") {
return null;
}
const params = isRecord(message.params) ? message.params : null;
if (!params) {
return null;
}
const update = isRecord(params.update) ? params.update : null;
if (!update) {
if (!isRecord(parsed)) {
return null;
}
const tag = asOptionalString(update.sessionUpdate) as AcpSessionUpdateTag | undefined;
if (!tag) {
return null;
}
const structured = resolveStructuredPromptPayload(parsed);
const type = structured.type;
const payload = structured.payload;
const tag = structured.tag;
switch (tag) {
case "agent_message_chunk":
return resolveTextChunk({
update,
switch (type) {
case "text": {
const content = asString(payload.content);
if (content == null || content.length === 0) {
return null;
}
return {
type: "text_delta",
text: content,
stream: "output",
tag,
});
case "agent_thought_chunk":
return resolveTextChunk({
update,
...(tag ? { tag } : {}),
};
}
case "thought": {
const content = asString(payload.content);
if (content == null || content.length === 0) {
return null;
}
return {
type: "text_delta",
text: content,
stream: "thought",
tag,
});
case "tool_call":
case "tool_call_update": {
const title = asTrimmedString(update.title) || "tool call";
const status = asTrimmedString(update.status);
const toolCallId = asOptionalString(update.toolCallId);
...(tag ? { tag } : {}),
};
}
case "tool_call": {
const title = asTrimmedString(payload.title) || "tool call";
const status = asTrimmedString(payload.status);
const toolCallId = asOptionalString(payload.toolCallId);
return {
type: "tool_call",
text: status ? `${title} (${status})` : title,
tag,
tag: (tag ?? "tool_call") as AcpSessionUpdateTag,
...(toolCallId ? { toolCallId } : {}),
...(status ? { status } : {}),
title,
};
}
case "tool_call_update": {
const title = asTrimmedString(payload.title) || "tool call";
const status = asTrimmedString(payload.status);
const toolCallId = asOptionalString(payload.toolCallId);
const text = status ? `${title} (${status})` : title;
return {
type: "tool_call",
text,
tag: (tag ?? "tool_call_update") as AcpSessionUpdateTag,
...(toolCallId ? { toolCallId } : {}),
...(status ? { status } : {}),
title,
};
}
case "agent_message_chunk":
return resolveTextChunk({
payload,
stream: "output",
tag: "agent_message_chunk",
});
case "agent_thought_chunk":
return resolveTextChunk({
payload,
stream: "thought",
tag: "agent_thought_chunk",
});
case "usage_update": {
const used = asOptionalFiniteNumber(update.used);
const size = asOptionalFiniteNumber(update.size);
const used = asOptionalFiniteNumber(payload.used);
const size = asOptionalFiniteNumber(payload.size);
const text =
used != null && size != null ? `usage updated: ${used}/${size}` : "usage updated";
return {
type: "status",
text: used != null && size != null ? `usage updated: ${used}/${size}` : "usage updated",
tag,
text,
tag: "usage_update",
...(used != null ? { used } : {}),
...(size != null ? { size } : {}),
};
@@ -187,8 +269,8 @@ function parseSessionUpdateEvent(message: Record<string, unknown>): AcpRuntimeEv
case "session_info_update":
case "plan": {
const text = resolveStatusTextForTag({
tag,
update,
tag: type as AcpSessionUpdateTag,
payload,
});
if (!text) {
return null;
@@ -196,86 +278,42 @@ function parseSessionUpdateEvent(message: Record<string, unknown>): AcpRuntimeEv
return {
type: "status",
text,
tag,
tag: type as AcpSessionUpdateTag,
};
}
case "client_operation": {
const method = asTrimmedString(payload.method) || "operation";
const status = asTrimmedString(payload.status);
const summary = asTrimmedString(payload.summary);
const text = [method, status, summary].filter(Boolean).join(" ");
if (!text) {
return null;
}
return { type: "status", text, ...(tag ? { tag } : {}) };
}
case "update": {
const update = asTrimmedString(payload.update);
if (!update) {
return null;
}
return { type: "status", text: update, ...(tag ? { tag } : {}) };
}
case "done": {
return {
type: "done",
stopReason: asOptionalString(payload.stopReason),
};
}
case "error": {
const message = asTrimmedString(payload.message) || "acpx runtime error";
return {
type: "error",
message,
code: asOptionalString(payload.code),
retryable: asOptionalBoolean(payload.retryable),
};
}
default:
return null;
}
}
export class PromptStreamProjector {
private readonly promptRequestIds = new Set<string>();
ingestLine(line: string): AcpRuntimeEvent | null {
const trimmed = line.trim();
if (!trimmed) {
return null;
}
let parsed: unknown;
try {
parsed = JSON.parse(trimmed);
} catch {
return {
type: "status",
text: trimmed,
};
}
if (!isRecord(parsed) || !isAcpJsonRpcMessage(parsed)) {
return null;
}
if (asTrimmedString(parsed.method) === "session/prompt") {
const id = normalizeJsonRpcId(parsed.id);
if (id) {
this.promptRequestIds.add(id);
}
return null;
}
const updateEvent = parseSessionUpdateEvent(parsed);
if (updateEvent) {
return this.promptRequestIds.size > 0 ? updateEvent : null;
}
if (Object.hasOwn(parsed, "error")) {
if (!this.consumePromptResponse(parsed)) {
return null;
}
const error = isRecord(parsed.error) ? parsed.error : null;
const message = asTrimmedString(error?.message);
const codeValue = error?.code;
return {
type: "error",
message: message || "acpx runtime error",
code:
typeof codeValue === "number" && Number.isFinite(codeValue)
? String(codeValue)
: asOptionalString(codeValue),
};
}
const stopReason = parsePromptStopReason(parsed);
if (!stopReason || !this.consumePromptResponse(parsed)) {
return null;
}
return {
type: "done",
stopReason,
};
}
private consumePromptResponse(message: Record<string, unknown>): boolean {
const id = normalizeJsonRpcId(message.id);
if (!id) {
return false;
}
if (!this.promptRequestIds.has(id)) {
return false;
}
this.promptRequestIds.delete(id);
return true;
}
}

View File

@@ -185,12 +185,9 @@ if (command === "prompt") {
if (stdinText.includes("trigger-error")) {
emitJson({
jsonrpc: "2.0",
id: requestId,
error: {
code: -32000,
message: "mock failure",
},
type: "error",
code: "-32000",
message: "mock failure",
});
process.exit(1);
}
@@ -208,13 +205,7 @@ if (command === "prompt") {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: " gamma" },
});
emitJson({
jsonrpc: "2.0",
id: requestId,
result: {
stopReason: "end_turn",
},
});
emitJson({ type: "done", stopReason: "end_turn" });
process.exit(0);
}
@@ -223,20 +214,8 @@ if (command === "prompt") {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "ok" },
});
emitJson({
jsonrpc: "2.0",
id: requestId,
result: {
stopReason: "end_turn",
},
});
emitJson({
jsonrpc: "2.0",
id: requestId,
result: {
stopReason: "end_turn",
},
});
emitJson({ type: "done", stopReason: "end_turn" });
emitJson({ type: "done", stopReason: "end_turn" });
process.exit(0);
}
@@ -255,22 +234,15 @@ if (command === "prompt") {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "echo:" + stdinText.trim() },
});
emitJson({
jsonrpc: "2.0",
id: requestId,
result: {
stopReason: "end_turn",
},
});
emitJson({ type: "done", stopReason: "end_turn" });
process.exit(0);
}
writeLog({ kind: "unknown", args });
emitJson({
error: {
code: "USAGE",
message: "unknown command",
},
type: "error",
code: "USAGE",
message: "unknown command",
});
process.exit(2);
`;

View File

@@ -66,20 +66,32 @@ describe("AcpxRuntime", () => {
events.push(event);
}
expect(events).toContainEqual({
type: "text_delta",
text: "thinking",
stream: "thought",
});
expect(events).toContainEqual({
type: "tool_call",
text: "run-tests (in_progress)",
});
expect(events).toContainEqual({
type: "text_delta",
text: "echo:hello world",
stream: "output",
});
expect(events).toEqual(
expect.arrayContaining([
expect.objectContaining({
type: "text_delta",
text: "thinking",
stream: "thought",
}),
]),
);
expect(events).toEqual(
expect.arrayContaining([
expect.objectContaining({
type: "tool_call",
text: "run-tests (in_progress)",
}),
]),
);
expect(events).toEqual(
expect.arrayContaining([
expect.objectContaining({
type: "text_delta",
text: "echo:hello world",
stream: "output",
}),
]),
);
expect(events).toContainEqual({
type: "done",
stopReason: "end_turn",

View File

@@ -14,8 +14,11 @@ import type {
import { AcpRuntimeError } from "openclaw/plugin-sdk";
import { type ResolvedAcpxPluginConfig } from "./config.js";
import { checkAcpxVersion } from "./ensure.js";
import { parseControlJsonError } from "./runtime-internals/control-errors.js";
import { parseJsonLines, PromptStreamProjector } from "./runtime-internals/events.js";
import {
parseJsonLines,
parsePromptEventLine,
toAcpxErrorEvent,
} from "./runtime-internals/events.js";
import {
resolveSpawnFailure,
spawnAndCollect,
@@ -194,7 +197,6 @@ export class AcpxRuntime implements AcpRuntime {
sessionName: state.name,
cwd: state.cwd,
});
const projector = new PromptStreamProjector();
const cancelOnAbort = async () => {
await this.cancel({
@@ -236,7 +238,7 @@ export class AcpxRuntime implements AcpRuntime {
const lines = createInterface({ input: child.stdout });
try {
for await (const line of lines) {
const parsed = projector.ingestLine(line);
const parsed = parsePromptEventLine(line);
if (!parsed) {
continue;
}
@@ -307,7 +309,7 @@ export class AcpxRuntime implements AcpRuntime {
fallbackCode: "ACP_TURN_FAILED",
ignoreNoSession: true,
});
const detail = events.find((event) => !parseControlJsonError(event)) ?? events[0];
const detail = events.find((event) => !toAcpxErrorEvent(event)) ?? events[0];
if (!detail) {
return {
summary: "acpx status unavailable",
@@ -553,7 +555,7 @@ export class AcpxRuntime implements AcpRuntime {
}
const events = parseJsonLines(result.stdout);
const errorEvent = events.map((event) => parseControlJsonError(event)).find(Boolean) ?? null;
const errorEvent = events.map((event) => toAcpxErrorEvent(event)).find(Boolean) ?? null;
if (errorEvent) {
if (params.ignoreNoSession && errorEvent.code === "NO_SESSION") {
return events;