mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-16 18:34:18 +00:00
Summary: - Remap cron-run async exec, ACP, node-event, and watchdog completion events to the queue heartbeat drains. - Route per-sender cron-run events to the agent main queue and global-scope events to `global` while preserving the originating `agentId`. - Tighten exec-completion classification and treat cron-run descendants as ephemeral for retention pruning. Verification: - CI: https://github.com/openclaw/openclaw/actions/runs/25625964382 passed. - Real behavior proof: https://github.com/openclaw/openclaw/actions/runs/25666664248/job/75340798016 passed. - PR targeted validation: `node scripts/run-vitest.mjs run --config test/vitest/vitest.unit.config.ts src/routing/session-key.test.ts src/infra/heartbeat-events-filter.test.ts src/agents/bash-tools.exec-runtime.test.ts src/agents/acp-spawn-parent-stream.test.ts src/cron/session-reaper.test.ts`. - PR targeted typecheck: `pnpm tsgo:core`. Refs #52305. Related: #18237, #14191. Co-authored-by: Kaspre <kaspre@gmail.com>
769 lines
26 KiB
TypeScript
769 lines
26 KiB
TypeScript
import crypto from "node:crypto";
|
|
import { shouldLogVerbose } from "../../globals.js";
|
|
import { emitAgentEvent } from "../../infra/agent-events.js";
|
|
import { isTruthyEnvValue } from "../../infra/env.js";
|
|
import { requestHeartbeat as requestHeartbeatImpl } from "../../infra/heartbeat-wake.js";
|
|
import { sanitizeHostExecEnv } from "../../infra/host-env-security.js";
|
|
import { enqueueSystemEvent as enqueueSystemEventImpl } from "../../infra/system-events.js";
|
|
import { getProcessSupervisor as getProcessSupervisorImpl } from "../../process/supervisor/index.js";
|
|
import { resolveEventSessionKey, scopedHeartbeatWakeOptions } from "../../routing/session-key.js";
|
|
import { appendBootstrapPromptWarning } from "../bootstrap-budget.js";
|
|
import {
|
|
createCliJsonlStreamingParser,
|
|
extractCliErrorMessage,
|
|
parseCliOutput,
|
|
type CliOutput,
|
|
} from "../cli-output.js";
|
|
import { FailoverError, resolveFailoverStatus } from "../failover-error.js";
|
|
import { classifyFailoverReason } from "../pi-embedded-helpers.js";
|
|
import { applyPluginTextReplacements } from "../plugin-text-transforms.js";
|
|
import { applySkillEnvOverridesFromSnapshot } from "../skills.js";
|
|
import { runClaudeLiveSessionTurn, shouldUseClaudeLiveSession } from "./claude-live-session.js";
|
|
import { prepareClaudeCliSkillsPlugin } from "./claude-skills-plugin.js";
|
|
import {
|
|
buildCliSupervisorScopeKey,
|
|
buildCliArgs,
|
|
resolveCliRunQueueKey,
|
|
enqueueCliRun,
|
|
prepareCliPromptImagePayload,
|
|
resolveCliNoOutputTimeoutMs,
|
|
resolvePromptInput,
|
|
resolveSessionIdToSend,
|
|
resolveSystemPromptUsage,
|
|
writeCliSystemPromptFile,
|
|
} from "./helpers.js";
|
|
import {
|
|
cliBackendLog,
|
|
CLI_BACKEND_LOG_OUTPUT_ENV,
|
|
LEGACY_CLAUDE_CLI_LOG_OUTPUT_ENV,
|
|
} from "./log.js";
|
|
import type { PreparedCliRunContext } from "./types.js";
|
|
|
|
const executeDeps = {
|
|
getProcessSupervisor: getProcessSupervisorImpl,
|
|
enqueueSystemEvent: enqueueSystemEventImpl,
|
|
requestHeartbeat: requestHeartbeatImpl,
|
|
};
|
|
|
|
const CLI_RUNNER_OUTPUT_TAIL_BYTES = 64 * 1024;
|
|
const CLI_RUNNER_OUTPUT_PARSE_BYTES = 1024 * 1024;
|
|
|
|
function appendCliOutputTail(tail: Buffer, chunk: string): Buffer {
|
|
if (!chunk) {
|
|
return tail;
|
|
}
|
|
const chunkBuffer = Buffer.from(chunk);
|
|
if (chunkBuffer.byteLength >= CLI_RUNNER_OUTPUT_TAIL_BYTES) {
|
|
return Buffer.from(chunkBuffer.subarray(chunkBuffer.byteLength - CLI_RUNNER_OUTPUT_TAIL_BYTES));
|
|
}
|
|
const next = Buffer.concat([tail, chunkBuffer], tail.byteLength + chunkBuffer.byteLength);
|
|
if (next.byteLength <= CLI_RUNNER_OUTPUT_TAIL_BYTES) {
|
|
return next;
|
|
}
|
|
return Buffer.from(next.subarray(next.byteLength - CLI_RUNNER_OUTPUT_TAIL_BYTES));
|
|
}
|
|
|
|
function appendCliOutputParseBuffer(
|
|
buffer: Buffer,
|
|
chunk: string,
|
|
): { buffer: Buffer; exceeded: boolean } {
|
|
if (!chunk) {
|
|
return { buffer, exceeded: false };
|
|
}
|
|
const chunkBuffer = Buffer.from(chunk);
|
|
if (buffer.byteLength + chunkBuffer.byteLength > CLI_RUNNER_OUTPUT_PARSE_BYTES) {
|
|
const remainingBytes = CLI_RUNNER_OUTPUT_PARSE_BYTES - buffer.byteLength;
|
|
if (remainingBytes <= 0) {
|
|
return { buffer, exceeded: true };
|
|
}
|
|
return {
|
|
buffer: Buffer.concat(
|
|
[buffer, chunkBuffer.subarray(0, remainingBytes)],
|
|
CLI_RUNNER_OUTPUT_PARSE_BYTES,
|
|
),
|
|
exceeded: true,
|
|
};
|
|
}
|
|
return {
|
|
buffer: Buffer.concat([buffer, chunkBuffer], buffer.byteLength + chunkBuffer.byteLength),
|
|
exceeded: false,
|
|
};
|
|
}
|
|
|
|
export function setCliRunnerExecuteTestDeps(overrides: Partial<typeof executeDeps>): void {
|
|
Object.assign(executeDeps, overrides);
|
|
}
|
|
|
|
function createCliAbortError(): Error {
|
|
const error = new Error("CLI run aborted");
|
|
error.name = "AbortError";
|
|
return error;
|
|
}
|
|
|
|
function buildCliLogArgs(params: {
|
|
args: string[];
|
|
systemPromptArg?: string;
|
|
sessionArg?: string;
|
|
modelArg?: string;
|
|
imageArg?: string;
|
|
argsPrompt?: string;
|
|
}): string[] {
|
|
const logArgs: string[] = [];
|
|
for (let i = 0; i < params.args.length; i += 1) {
|
|
const arg = params.args[i] ?? "";
|
|
if (arg === params.systemPromptArg) {
|
|
const systemPromptValue = params.args[i + 1] ?? "";
|
|
logArgs.push(arg, `<systemPrompt:${systemPromptValue.length} chars>`);
|
|
i += 1;
|
|
continue;
|
|
}
|
|
if (arg === params.sessionArg) {
|
|
logArgs.push(arg, params.args[i + 1] ?? "");
|
|
i += 1;
|
|
continue;
|
|
}
|
|
if (arg === params.modelArg) {
|
|
logArgs.push(arg, params.args[i + 1] ?? "");
|
|
i += 1;
|
|
continue;
|
|
}
|
|
if (arg === params.imageArg) {
|
|
logArgs.push(arg, "<image>");
|
|
i += 1;
|
|
continue;
|
|
}
|
|
logArgs.push(arg);
|
|
}
|
|
if (params.argsPrompt) {
|
|
const promptIndex = logArgs.indexOf(params.argsPrompt);
|
|
if (promptIndex >= 0) {
|
|
logArgs[promptIndex] = `<prompt:${params.argsPrompt.length} chars>`;
|
|
}
|
|
}
|
|
return logArgs;
|
|
}
|
|
|
|
const CLI_ENV_AUTH_LOG_KEYS = [
|
|
"AI_GATEWAY_API_KEY",
|
|
"ANTHROPIC_API_KEY",
|
|
"ANTHROPIC_API_KEY_OLD",
|
|
"ANTHROPIC_API_TOKEN",
|
|
"ANTHROPIC_AUTH_TOKEN",
|
|
"ANTHROPIC_BASE_URL",
|
|
"ANTHROPIC_CUSTOM_HEADERS",
|
|
"ANTHROPIC_OAUTH_TOKEN",
|
|
"ANTHROPIC_UNIX_SOCKET",
|
|
"AZURE_OPENAI_API_KEY",
|
|
"CLAUDE_CODE_OAUTH_TOKEN",
|
|
"CLAUDE_CODE_PROVIDER_MANAGED_BY_HOST",
|
|
"OPENAI_API_KEY",
|
|
"OPENAI_STEIPETE_API_KEY",
|
|
"OPENROUTER_API_KEY",
|
|
] as const;
|
|
|
|
const CLI_BACKEND_PRESERVE_ENV = "OPENCLAW_LIVE_CLI_BACKEND_PRESERVE_ENV";
|
|
|
|
function parseCliBackendPreserveEnv(raw: string | undefined): Set<string> {
|
|
const trimmed = raw?.trim();
|
|
if (!trimmed) {
|
|
return new Set();
|
|
}
|
|
if (trimmed.startsWith("[")) {
|
|
try {
|
|
const parsed = JSON.parse(trimmed) as unknown;
|
|
return new Set(
|
|
Array.isArray(parsed)
|
|
? parsed.filter((entry): entry is string => typeof entry === "string")
|
|
: [],
|
|
);
|
|
} catch {
|
|
return new Set();
|
|
}
|
|
}
|
|
return new Set(
|
|
trimmed
|
|
.split(/[,\s]+/)
|
|
.map((entry) => entry.trim())
|
|
.filter((entry) => entry.length > 0),
|
|
);
|
|
}
|
|
|
|
function listPresentCliAuthEnvKeys(env: Record<string, string | undefined>): string[] {
|
|
return CLI_ENV_AUTH_LOG_KEYS.filter((key) => {
|
|
const value = env[key];
|
|
return typeof value === "string" && value.length > 0;
|
|
});
|
|
}
|
|
|
|
function formatCliEnvKeyList(keys: readonly string[]): string {
|
|
return keys.length > 0 ? keys.join(",") : "none";
|
|
}
|
|
|
|
function buildCliEnvMcpLog(childEnv: Record<string, string>): string {
|
|
return [
|
|
`token=${childEnv.OPENCLAW_MCP_TOKEN ? "set" : "missing"}`,
|
|
`sessionKey=${childEnv.OPENCLAW_MCP_SESSION_KEY ? "set" : "<empty>"}`,
|
|
`agentId=${childEnv.OPENCLAW_MCP_AGENT_ID || "<empty>"}`,
|
|
`accountId=${childEnv.OPENCLAW_MCP_ACCOUNT_ID || "<empty>"}`,
|
|
`messageChannel=${childEnv.OPENCLAW_MCP_MESSAGE_CHANNEL || "<empty>"}`,
|
|
`senderIsOwner=${childEnv.OPENCLAW_MCP_SENDER_IS_OWNER || "<empty>"}`,
|
|
].join(" ");
|
|
}
|
|
|
|
function fingerprintCliSessionId(sessionId?: string): string {
|
|
const trimmed = sessionId?.trim();
|
|
if (!trimmed) {
|
|
return "none";
|
|
}
|
|
return crypto.createHash("sha256").update(trimmed).digest("hex").slice(0, 12);
|
|
}
|
|
|
|
export function buildCliExecLogLine(params: {
|
|
provider: string;
|
|
model: string;
|
|
promptChars: number;
|
|
trigger?: string;
|
|
useResume: boolean;
|
|
cliSessionId?: string;
|
|
resolvedSessionId?: string;
|
|
reusableSessionId?: string;
|
|
invalidatedReason?: string;
|
|
hasHistoryPrompt: boolean;
|
|
}): string {
|
|
const reuseState = params.reusableSessionId
|
|
? "reusable"
|
|
: params.invalidatedReason
|
|
? `invalidated:${params.invalidatedReason}`
|
|
: "none";
|
|
return [
|
|
`cli exec: provider=${params.provider}`,
|
|
`model=${params.model}`,
|
|
`promptChars=${params.promptChars}`,
|
|
`trigger=${params.trigger ?? "unknown"}`,
|
|
`useResume=${params.useResume ? "true" : "false"}`,
|
|
`session=${params.cliSessionId ? "present" : "none"}`,
|
|
`resumeSession=${params.useResume ? fingerprintCliSessionId(params.resolvedSessionId) : "none"}`,
|
|
`reuse=${reuseState}`,
|
|
`historyPrompt=${params.hasHistoryPrompt ? "present" : "none"}`,
|
|
].join(" ");
|
|
}
|
|
|
|
export function buildCliEnvAuthLog(childEnv: Record<string, string>): string {
|
|
const hostKeys = listPresentCliAuthEnvKeys(process.env);
|
|
const childKeys = listPresentCliAuthEnvKeys(childEnv);
|
|
const childKeySet = new Set(childKeys);
|
|
const clearedKeys = hostKeys.filter((key) => !childKeySet.has(key));
|
|
return [
|
|
`host=${formatCliEnvKeyList(hostKeys)}`,
|
|
`child=${formatCliEnvKeyList(childKeys)}`,
|
|
`cleared=${formatCliEnvKeyList(clearedKeys)}`,
|
|
].join(" ");
|
|
}
|
|
|
|
export async function executePreparedCliRun(
|
|
context: PreparedCliRunContext,
|
|
cliSessionIdToUse?: string,
|
|
): Promise<CliOutput> {
|
|
const params = context.params;
|
|
if (params.abortSignal?.aborted) {
|
|
throw createCliAbortError();
|
|
}
|
|
const backend = context.preparedBackend.backend;
|
|
const { sessionId: resolvedSessionId, isNew } = resolveSessionIdToSend({
|
|
backend,
|
|
cliSessionId: cliSessionIdToUse,
|
|
});
|
|
const useResume = Boolean(
|
|
cliSessionIdToUse && resolvedSessionId && backend.resumeArgs && backend.resumeArgs.length > 0,
|
|
);
|
|
const systemPromptArg = resolveSystemPromptUsage({
|
|
backend,
|
|
isNewSession: isNew,
|
|
systemPrompt: context.systemPrompt,
|
|
});
|
|
const systemPromptFile =
|
|
!useResume && systemPromptArg
|
|
? await writeCliSystemPromptFile({
|
|
backend,
|
|
systemPrompt: systemPromptArg,
|
|
})
|
|
: undefined;
|
|
|
|
const basePrompt = cliSessionIdToUse
|
|
? params.prompt
|
|
: (context.openClawHistoryPrompt ?? params.prompt);
|
|
let prompt = applyPluginTextReplacements(
|
|
appendBootstrapPromptWarning(basePrompt, context.bootstrapPromptWarningLines, {
|
|
preserveExactPrompt: context.heartbeatPrompt,
|
|
}),
|
|
context.backendResolved.textTransforms?.input,
|
|
);
|
|
const {
|
|
prompt: promptWithImages,
|
|
imagePaths,
|
|
cleanupImages,
|
|
} = await prepareCliPromptImagePayload({
|
|
backend,
|
|
prompt,
|
|
workspaceDir: context.workspaceDir,
|
|
images: params.images,
|
|
});
|
|
prompt = promptWithImages;
|
|
|
|
const { argsPrompt, stdin } = resolvePromptInput({
|
|
backend,
|
|
prompt,
|
|
});
|
|
const stdinPayload = stdin ?? "";
|
|
const baseArgs = useResume ? (backend.resumeArgs ?? backend.args ?? []) : (backend.args ?? []);
|
|
const resolvedArgs = useResume
|
|
? baseArgs.map((entry) => entry.replaceAll("{sessionId}", resolvedSessionId ?? ""))
|
|
: baseArgs;
|
|
const claudeSkillsPlugin = await prepareClaudeCliSkillsPlugin({
|
|
backendId: context.backendResolved.id,
|
|
skillsSnapshot: params.skillsSnapshot,
|
|
});
|
|
let claudeSkillsPluginCleanupOwned = false;
|
|
const baseArgsWithSkills =
|
|
claudeSkillsPlugin.args.length > 0
|
|
? [...resolvedArgs, ...claudeSkillsPlugin.args]
|
|
: resolvedArgs;
|
|
const executionBaseArgs =
|
|
context.backendResolved.resolveExecutionArgs?.({
|
|
config: params.config,
|
|
workspaceDir: context.workspaceDir,
|
|
provider: params.provider,
|
|
modelId: context.modelId,
|
|
authProfileId: context.effectiveAuthProfileId,
|
|
thinkingLevel: params.thinkLevel,
|
|
useResume,
|
|
baseArgs: baseArgsWithSkills,
|
|
}) ?? baseArgsWithSkills;
|
|
const args = buildCliArgs({
|
|
backend,
|
|
baseArgs: Array.from(executionBaseArgs),
|
|
modelId: context.normalizedModel,
|
|
sessionId: resolvedSessionId,
|
|
systemPrompt: systemPromptArg,
|
|
systemPromptFilePath: systemPromptFile?.filePath,
|
|
imagePaths,
|
|
promptArg: argsPrompt,
|
|
useResume,
|
|
});
|
|
|
|
const queueKey = resolveCliRunQueueKey({
|
|
backendId: context.backendResolved.id,
|
|
serialize: backend.serialize,
|
|
runId: params.runId,
|
|
workspaceDir: context.workspaceDir,
|
|
cliSessionId: useResume ? resolvedSessionId : undefined,
|
|
});
|
|
|
|
try {
|
|
return await enqueueCliRun(queueKey, async () => {
|
|
const restoreSkillEnv = params.skillsSnapshot
|
|
? applySkillEnvOverridesFromSnapshot({
|
|
snapshot: params.skillsSnapshot,
|
|
config: params.config,
|
|
})
|
|
: undefined;
|
|
try {
|
|
cliBackendLog.info(
|
|
buildCliExecLogLine({
|
|
provider: params.provider,
|
|
model: context.normalizedModel,
|
|
promptChars: basePrompt.length,
|
|
trigger: params.trigger,
|
|
useResume,
|
|
cliSessionId: cliSessionIdToUse,
|
|
resolvedSessionId,
|
|
reusableSessionId: context.reusableCliSession.sessionId,
|
|
invalidatedReason: context.reusableCliSession.invalidatedReason,
|
|
hasHistoryPrompt: Boolean(context.openClawHistoryPrompt),
|
|
}),
|
|
);
|
|
const logOutputText =
|
|
isTruthyEnvValue(process.env[CLI_BACKEND_LOG_OUTPUT_ENV]) ||
|
|
isTruthyEnvValue(process.env[LEGACY_CLAUDE_CLI_LOG_OUTPUT_ENV]);
|
|
const env = (() => {
|
|
const next = sanitizeHostExecEnv({
|
|
baseEnv: process.env,
|
|
blockPathOverrides: true,
|
|
});
|
|
const preservedEnv = parseCliBackendPreserveEnv(process.env[CLI_BACKEND_PRESERVE_ENV]);
|
|
for (const key of backend.clearEnv ?? []) {
|
|
if (preservedEnv.has(key)) {
|
|
continue;
|
|
}
|
|
delete next[key];
|
|
}
|
|
if (backend.env && Object.keys(backend.env).length > 0) {
|
|
Object.assign(
|
|
next,
|
|
sanitizeHostExecEnv({
|
|
baseEnv: {},
|
|
overrides: backend.env,
|
|
blockPathOverrides: true,
|
|
}),
|
|
);
|
|
}
|
|
Object.assign(next, context.preparedBackend.env);
|
|
|
|
// Never mark Claude CLI as host-managed. That marker routes runs into
|
|
// Anthropic's separate host-managed usage tier instead of normal CLI
|
|
// subscription behavior.
|
|
delete next["CLAUDE_CODE_PROVIDER_MANAGED_BY_HOST"];
|
|
|
|
return next;
|
|
})();
|
|
if (logOutputText) {
|
|
const logArgs = buildCliLogArgs({
|
|
args,
|
|
systemPromptArg: backend.systemPromptArg,
|
|
sessionArg: backend.sessionArg,
|
|
modelArg: backend.modelArg,
|
|
imageArg: backend.imageArg,
|
|
argsPrompt,
|
|
});
|
|
cliBackendLog.info(`cli argv: ${backend.command} ${logArgs.join(" ")}`);
|
|
cliBackendLog.info(`cli env auth: ${buildCliEnvAuthLog(env)}`);
|
|
if (
|
|
env.OPENCLAW_MCP_TOKEN ||
|
|
env.OPENCLAW_MCP_SESSION_KEY ||
|
|
env.OPENCLAW_MCP_SENDER_IS_OWNER
|
|
) {
|
|
cliBackendLog.info(`cli env mcp: ${buildCliEnvMcpLog(env)}`);
|
|
}
|
|
}
|
|
|
|
const noOutputTimeoutMs = resolveCliNoOutputTimeoutMs({
|
|
backend,
|
|
timeoutMs: params.timeoutMs,
|
|
useResume,
|
|
trigger: params.trigger,
|
|
});
|
|
const outputMode = useResume ? (backend.resumeOutput ?? backend.output) : backend.output;
|
|
const hasJsonlOutput = outputMode === "jsonl";
|
|
if (shouldUseClaudeLiveSession(context)) {
|
|
if (!hasJsonlOutput) {
|
|
throw new Error("Claude live session requires JSONL streaming parser");
|
|
}
|
|
params.onExecutionPhase?.({
|
|
phase: "process_spawned",
|
|
provider: params.provider,
|
|
model: context.modelId,
|
|
backend: context.backendResolved.id,
|
|
});
|
|
claudeSkillsPluginCleanupOwned = true;
|
|
const ownedPreparedBackendCleanup = context.preparedBackend.cleanup;
|
|
context.preparedBackend.cleanup = undefined;
|
|
const liveResult = await runClaudeLiveSessionTurn({
|
|
context,
|
|
args,
|
|
env,
|
|
prompt,
|
|
useResume,
|
|
noOutputTimeoutMs,
|
|
getProcessSupervisor: executeDeps.getProcessSupervisor,
|
|
onAssistantDelta: ({ text, delta }) => {
|
|
emitAgentEvent({
|
|
runId: params.runId,
|
|
stream: "assistant",
|
|
data: {
|
|
text: applyPluginTextReplacements(
|
|
text,
|
|
context.backendResolved.textTransforms?.output,
|
|
),
|
|
delta: applyPluginTextReplacements(
|
|
delta,
|
|
context.backendResolved.textTransforms?.output,
|
|
),
|
|
},
|
|
});
|
|
},
|
|
cleanup: async () => {
|
|
try {
|
|
await claudeSkillsPlugin.cleanup();
|
|
} finally {
|
|
await ownedPreparedBackendCleanup?.();
|
|
}
|
|
},
|
|
});
|
|
const rawText = liveResult.output.text;
|
|
return {
|
|
...liveResult.output,
|
|
rawText,
|
|
finalPromptText: prompt,
|
|
text: applyPluginTextReplacements(
|
|
rawText,
|
|
context.backendResolved.textTransforms?.output,
|
|
),
|
|
};
|
|
}
|
|
const streamingParser = hasJsonlOutput
|
|
? createCliJsonlStreamingParser({
|
|
backend,
|
|
providerId: context.backendResolved.id,
|
|
onAssistantDelta: ({ text, delta }) => {
|
|
emitAgentEvent({
|
|
runId: params.runId,
|
|
stream: "assistant",
|
|
data: {
|
|
text: applyPluginTextReplacements(
|
|
text,
|
|
context.backendResolved.textTransforms?.output,
|
|
),
|
|
delta: applyPluginTextReplacements(
|
|
delta,
|
|
context.backendResolved.textTransforms?.output,
|
|
),
|
|
},
|
|
});
|
|
},
|
|
})
|
|
: null;
|
|
const supervisor = executeDeps.getProcessSupervisor();
|
|
const scopeKey = buildCliSupervisorScopeKey({
|
|
backend,
|
|
backendId: context.backendResolved.id,
|
|
cliSessionId: useResume ? resolvedSessionId : undefined,
|
|
});
|
|
let stdoutTail: Buffer = Buffer.alloc(0);
|
|
let stdoutParseBuffer: Buffer = Buffer.alloc(0);
|
|
let stdoutParseExceeded = false;
|
|
let stderrTail: Buffer = Buffer.alloc(0);
|
|
let stderrParseBuffer: Buffer = Buffer.alloc(0);
|
|
let stderrParseExceeded = false;
|
|
|
|
params.onExecutionPhase?.({
|
|
phase: "process_spawned",
|
|
provider: params.provider,
|
|
model: context.modelId,
|
|
backend: context.backendResolved.id,
|
|
});
|
|
const managedRun = await supervisor.spawn({
|
|
sessionId: params.sessionId,
|
|
backendId: context.backendResolved.id,
|
|
scopeKey,
|
|
replaceExistingScope: Boolean(useResume && scopeKey),
|
|
mode: "child",
|
|
argv: [backend.command, ...args],
|
|
timeoutMs: params.timeoutMs,
|
|
noOutputTimeoutMs,
|
|
cwd: context.workspaceDir,
|
|
env,
|
|
input: stdinPayload,
|
|
captureOutput: false,
|
|
onStdout: (chunk: string) => {
|
|
stdoutTail = appendCliOutputTail(stdoutTail, chunk);
|
|
if (!stdoutParseExceeded) {
|
|
const nextStdoutParse = appendCliOutputParseBuffer(stdoutParseBuffer, chunk);
|
|
stdoutParseBuffer = nextStdoutParse.buffer;
|
|
stdoutParseExceeded = nextStdoutParse.exceeded;
|
|
}
|
|
streamingParser?.push(chunk);
|
|
},
|
|
onStderr: (chunk: string) => {
|
|
stderrTail = appendCliOutputTail(stderrTail, chunk);
|
|
if (!stderrParseExceeded) {
|
|
const nextStderrParse = appendCliOutputParseBuffer(stderrParseBuffer, chunk);
|
|
stderrParseBuffer = nextStderrParse.buffer;
|
|
stderrParseExceeded = nextStderrParse.exceeded;
|
|
}
|
|
},
|
|
});
|
|
let replyBackendCompleted = false;
|
|
const replyBackendHandle = params.replyOperation
|
|
? {
|
|
kind: "cli" as const,
|
|
cancel: () => {
|
|
managedRun.cancel("manual-cancel");
|
|
},
|
|
isStreaming: () => !replyBackendCompleted,
|
|
}
|
|
: undefined;
|
|
if (replyBackendHandle) {
|
|
params.replyOperation?.attachBackend(replyBackendHandle);
|
|
}
|
|
const abortManagedRun = () => {
|
|
managedRun.cancel("manual-cancel");
|
|
};
|
|
params.abortSignal?.addEventListener("abort", abortManagedRun, { once: true });
|
|
if (params.abortSignal?.aborted) {
|
|
abortManagedRun();
|
|
}
|
|
let result: Awaited<ReturnType<typeof managedRun.wait>>;
|
|
try {
|
|
result = await managedRun.wait();
|
|
} finally {
|
|
replyBackendCompleted = true;
|
|
if (replyBackendHandle) {
|
|
params.replyOperation?.detachBackend(replyBackendHandle);
|
|
}
|
|
params.abortSignal?.removeEventListener("abort", abortManagedRun);
|
|
}
|
|
streamingParser?.finish();
|
|
if (params.abortSignal?.aborted && result.reason === "manual-cancel") {
|
|
throw createCliAbortError();
|
|
}
|
|
|
|
const stdout = stdoutParseBuffer.toString("utf8").trim();
|
|
const stdoutDiagnostic = stdoutTail.toString("utf8").trim();
|
|
const stderr = stderrParseBuffer.toString("utf8").trim();
|
|
const stderrDiagnostic = stderrTail.toString("utf8").trim();
|
|
if (logOutputText) {
|
|
if (stdoutDiagnostic) {
|
|
cliBackendLog.info(`cli stdout:\n${stdoutDiagnostic}`);
|
|
}
|
|
if (stderrDiagnostic) {
|
|
cliBackendLog.info(`cli stderr:\n${stderrDiagnostic}`);
|
|
}
|
|
}
|
|
if (shouldLogVerbose()) {
|
|
if (stdoutDiagnostic) {
|
|
cliBackendLog.debug(`cli stdout:\n${stdoutDiagnostic}`);
|
|
}
|
|
if (stderrDiagnostic) {
|
|
cliBackendLog.debug(`cli stderr:\n${stderrDiagnostic}`);
|
|
}
|
|
}
|
|
|
|
if (result.exitCode !== 0 || result.reason !== "exit") {
|
|
if (result.reason === "no-output-timeout" || result.noOutputTimedOut) {
|
|
const timeoutReason = `CLI produced no output for ${Math.round(noOutputTimeoutMs / 1000)}s and was terminated.`;
|
|
cliBackendLog.warn(
|
|
`cli watchdog timeout: provider=${params.provider} model=${context.modelId} session=${resolvedSessionId ?? params.sessionId} noOutputTimeoutMs=${noOutputTimeoutMs} pid=${managedRun.pid ?? "unknown"}`,
|
|
);
|
|
if (params.sessionKey) {
|
|
const stallNotice = [
|
|
`CLI agent (${params.provider}) produced no output for ${Math.round(noOutputTimeoutMs / 1000)}s and was terminated.`,
|
|
"It may have been waiting for interactive input or an approval prompt.",
|
|
"For Claude Code, prefer --permission-mode bypassPermissions --print.",
|
|
].join(" ");
|
|
const watchdogMainKey = params.config?.session?.mainKey;
|
|
const watchdogScope = params.config?.session?.scope;
|
|
executeDeps.enqueueSystemEvent(stallNotice, {
|
|
sessionKey: resolveEventSessionKey(
|
|
params.sessionKey,
|
|
watchdogMainKey,
|
|
watchdogScope,
|
|
),
|
|
});
|
|
executeDeps.requestHeartbeat(
|
|
scopedHeartbeatWakeOptions(
|
|
params.sessionKey,
|
|
{
|
|
source: "cli-watchdog",
|
|
intent: "event",
|
|
reason: "cli:watchdog:stall",
|
|
},
|
|
watchdogMainKey,
|
|
watchdogScope,
|
|
),
|
|
);
|
|
}
|
|
throw new FailoverError(timeoutReason, {
|
|
reason: "timeout",
|
|
provider: params.provider,
|
|
model: context.modelId,
|
|
sessionId: params.sessionId,
|
|
lane: params.lane,
|
|
status: resolveFailoverStatus("timeout"),
|
|
});
|
|
}
|
|
if (result.reason === "overall-timeout") {
|
|
const timeoutReason = `CLI exceeded timeout (${Math.round(params.timeoutMs / 1000)}s) and was terminated.`;
|
|
throw new FailoverError(timeoutReason, {
|
|
reason: "timeout",
|
|
provider: params.provider,
|
|
model: context.modelId,
|
|
sessionId: params.sessionId,
|
|
lane: params.lane,
|
|
status: resolveFailoverStatus("timeout"),
|
|
});
|
|
}
|
|
const errorCandidates = [stderr, stdout, stderrDiagnostic, stdoutDiagnostic].filter(
|
|
(candidate) => candidate.length > 0,
|
|
);
|
|
const structuredError =
|
|
errorCandidates.map((candidate) => extractCliErrorMessage(candidate)).find(Boolean) ??
|
|
null;
|
|
let classifiedErrorText = structuredError;
|
|
let reason = structuredError
|
|
? classifyFailoverReason(structuredError, { provider: params.provider })
|
|
: null;
|
|
if (!reason) {
|
|
for (const candidate of errorCandidates) {
|
|
reason = classifyFailoverReason(candidate, { provider: params.provider });
|
|
if (reason) {
|
|
classifiedErrorText = candidate;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
const err = structuredError || classifiedErrorText || errorCandidates[0] || "CLI failed.";
|
|
reason = reason ?? "unknown";
|
|
const status = resolveFailoverStatus(reason);
|
|
throw new FailoverError(err, {
|
|
reason,
|
|
provider: params.provider,
|
|
model: context.modelId,
|
|
sessionId: params.sessionId,
|
|
lane: params.lane,
|
|
status,
|
|
});
|
|
}
|
|
|
|
const streamedJsonlOutput =
|
|
outputMode === "jsonl" ? (streamingParser?.getOutput() ?? null) : null;
|
|
|
|
if (stdoutParseExceeded && !streamedJsonlOutput) {
|
|
throw new FailoverError(
|
|
`CLI stdout exceeded ${CLI_RUNNER_OUTPUT_PARSE_BYTES} bytes; refusing to parse truncated output.`,
|
|
{
|
|
reason: "format",
|
|
provider: params.provider,
|
|
model: context.modelId,
|
|
sessionId: params.sessionId,
|
|
lane: params.lane,
|
|
status: resolveFailoverStatus("format"),
|
|
},
|
|
);
|
|
}
|
|
|
|
const parsed =
|
|
streamedJsonlOutput ??
|
|
parseCliOutput({
|
|
raw: stdout,
|
|
backend,
|
|
providerId: context.backendResolved.id,
|
|
outputMode,
|
|
fallbackSessionId: resolvedSessionId,
|
|
});
|
|
const rawText = parsed.text;
|
|
return {
|
|
...parsed,
|
|
rawText,
|
|
finalPromptText: prompt,
|
|
text: applyPluginTextReplacements(
|
|
rawText,
|
|
context.backendResolved.textTransforms?.output,
|
|
),
|
|
};
|
|
} finally {
|
|
restoreSkillEnv?.();
|
|
}
|
|
});
|
|
} finally {
|
|
if (!claudeSkillsPluginCleanupOwned) {
|
|
await claudeSkillsPlugin.cleanup();
|
|
}
|
|
if (systemPromptFile) {
|
|
await systemPromptFile.cleanup();
|
|
}
|
|
if (cleanupImages) {
|
|
await cleanupImages();
|
|
}
|
|
}
|
|
}
|