mirror of
https://github.com/moltbot/moltbot.git
synced 2026-03-07 22:44:16 +00:00
refactor(channels): dedupe hook and monitor execution paths
This commit is contained in:
@@ -25,7 +25,7 @@ import {
|
||||
import type { TranscriptPolicy } from "../transcript-policy.js";
|
||||
import { resolveTranscriptPolicy } from "../transcript-policy.js";
|
||||
import { log } from "./logger.js";
|
||||
import { dropThinkingBlocks } from "./thinking.js";
|
||||
import { dropThinkingBlocks, isAssistantMessageWithContent } from "./thinking.js";
|
||||
import { describeUnknownError } from "./utils.js";
|
||||
|
||||
const GOOGLE_TURN_ORDERING_CUSTOM_TYPE = "google-turn-ordering-bootstrap";
|
||||
@@ -73,15 +73,11 @@ export function sanitizeAntigravityThinkingBlocks(messages: AgentMessage[]): Age
|
||||
let touched = false;
|
||||
const out: AgentMessage[] = [];
|
||||
for (const msg of messages) {
|
||||
if (!msg || typeof msg !== "object" || msg.role !== "assistant") {
|
||||
if (!isAssistantMessageWithContent(msg)) {
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
const assistant = msg;
|
||||
if (!Array.isArray(assistant.content)) {
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
type AssistantContentBlock = Extract<AgentMessage, { role: "assistant" }>["content"][number];
|
||||
const nextContent: AssistantContentBlock[] = [];
|
||||
let contentChanged = false;
|
||||
|
||||
60
src/agents/pi-embedded-runner/thinking.test.ts
Normal file
60
src/agents/pi-embedded-runner/thinking.test.ts
Normal file
@@ -0,0 +1,60 @@
|
||||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { dropThinkingBlocks, isAssistantMessageWithContent } from "./thinking.js";
|
||||
|
||||
describe("isAssistantMessageWithContent", () => {
|
||||
it("accepts assistant messages with array content and rejects others", () => {
|
||||
const assistant = {
|
||||
role: "assistant",
|
||||
content: [{ type: "text", text: "ok" }],
|
||||
} as AgentMessage;
|
||||
const user = { role: "user", content: "hi" } as AgentMessage;
|
||||
const malformed = { role: "assistant", content: "not-array" } as unknown as AgentMessage;
|
||||
|
||||
expect(isAssistantMessageWithContent(assistant)).toBe(true);
|
||||
expect(isAssistantMessageWithContent(user)).toBe(false);
|
||||
expect(isAssistantMessageWithContent(malformed)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("dropThinkingBlocks", () => {
|
||||
it("returns the original reference when no thinking blocks are present", () => {
|
||||
const messages: AgentMessage[] = [
|
||||
{ role: "user", content: "hello" } as AgentMessage,
|
||||
{ role: "assistant", content: [{ type: "text", text: "world" }] } as AgentMessage,
|
||||
];
|
||||
|
||||
const result = dropThinkingBlocks(messages);
|
||||
expect(result).toBe(messages);
|
||||
});
|
||||
|
||||
it("drops thinking blocks while preserving non-thinking assistant content", () => {
|
||||
const messages: AgentMessage[] = [
|
||||
{
|
||||
role: "assistant",
|
||||
content: [
|
||||
{ type: "thinking", thinking: "internal" },
|
||||
{ type: "text", text: "final" },
|
||||
],
|
||||
} as unknown as AgentMessage,
|
||||
];
|
||||
|
||||
const result = dropThinkingBlocks(messages);
|
||||
const assistant = result[0] as Extract<AgentMessage, { role: "assistant" }>;
|
||||
expect(result).not.toBe(messages);
|
||||
expect(assistant.content).toEqual([{ type: "text", text: "final" }]);
|
||||
});
|
||||
|
||||
it("keeps assistant turn structure when all content blocks were thinking", () => {
|
||||
const messages: AgentMessage[] = [
|
||||
{
|
||||
role: "assistant",
|
||||
content: [{ type: "thinking", thinking: "internal-only" }],
|
||||
} as unknown as AgentMessage,
|
||||
];
|
||||
|
||||
const result = dropThinkingBlocks(messages);
|
||||
const assistant = result[0] as Extract<AgentMessage, { role: "assistant" }>;
|
||||
expect(assistant.content).toEqual([{ type: "text", text: "" }]);
|
||||
});
|
||||
});
|
||||
@@ -1,6 +1,16 @@
|
||||
import type { AgentMessage } from "@mariozechner/pi-agent-core";
|
||||
|
||||
type AssistantContentBlock = Extract<AgentMessage, { role: "assistant" }>["content"][number];
|
||||
type AssistantMessage = Extract<AgentMessage, { role: "assistant" }>;
|
||||
|
||||
export function isAssistantMessageWithContent(message: AgentMessage): message is AssistantMessage {
|
||||
return (
|
||||
!!message &&
|
||||
typeof message === "object" &&
|
||||
message.role === "assistant" &&
|
||||
Array.isArray(message.content)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Strip all `type: "thinking"` content blocks from assistant messages.
|
||||
@@ -16,11 +26,7 @@ export function dropThinkingBlocks(messages: AgentMessage[]): AgentMessage[] {
|
||||
let touched = false;
|
||||
const out: AgentMessage[] = [];
|
||||
for (const msg of messages) {
|
||||
if (!msg || typeof msg !== "object" || msg.role !== "assistant") {
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
if (!Array.isArray(msg.content)) {
|
||||
if (!isAssistantMessageWithContent(msg)) {
|
||||
out.push(msg);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -91,6 +91,18 @@ async function applyGuardToContext(
|
||||
return await agent.transformContext?.(contextForNextCall, new AbortController().signal);
|
||||
}
|
||||
|
||||
function expectCompactedToolResultsWithoutContextNotice(
|
||||
contextForNextCall: AgentMessage[],
|
||||
oldIndex: number,
|
||||
newIndex: number,
|
||||
) {
|
||||
const oldResultText = getToolResultText(contextForNextCall[oldIndex]);
|
||||
const newResultText = getToolResultText(contextForNextCall[newIndex]);
|
||||
expect(oldResultText).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER);
|
||||
expect(newResultText).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER);
|
||||
expect(newResultText).not.toContain(CONTEXT_LIMIT_TRUNCATION_NOTICE);
|
||||
}
|
||||
|
||||
describe("installToolResultContextGuard", () => {
|
||||
it("compacts oldest-first when total context overflows, even if each result fits individually", async () => {
|
||||
const agent = makeGuardableAgent();
|
||||
@@ -98,12 +110,7 @@ describe("installToolResultContextGuard", () => {
|
||||
const transformed = await applyGuardToContext(agent, contextForNextCall);
|
||||
|
||||
expect(transformed).toBe(contextForNextCall);
|
||||
const oldResultText = getToolResultText(contextForNextCall[1]);
|
||||
const newResultText = getToolResultText(contextForNextCall[2]);
|
||||
|
||||
expect(oldResultText).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER);
|
||||
expect(newResultText).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER);
|
||||
expect(newResultText).not.toContain(CONTEXT_LIMIT_TRUNCATION_NOTICE);
|
||||
expectCompactedToolResultsWithoutContextNotice(contextForNextCall, 1, 2);
|
||||
});
|
||||
|
||||
it("keeps compacting oldest-first until context is back under budget", async () => {
|
||||
@@ -187,13 +194,7 @@ describe("installToolResultContextGuard", () => {
|
||||
];
|
||||
|
||||
await agent.transformContext?.(contextForNextCall, new AbortController().signal);
|
||||
|
||||
const oldResultText = getToolResultText(contextForNextCall[1]);
|
||||
const newResultText = getToolResultText(contextForNextCall[2]);
|
||||
|
||||
expect(oldResultText).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER);
|
||||
expect(newResultText).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER);
|
||||
expect(newResultText).not.toContain(CONTEXT_LIMIT_TRUNCATION_NOTICE);
|
||||
expectCompactedToolResultsWithoutContextNotice(contextForNextCall, 1, 2);
|
||||
});
|
||||
|
||||
it("wraps an existing transformContext and guards the transformed output", async () => {
|
||||
|
||||
@@ -428,20 +428,25 @@ export async function ensureChromeExtensionRelayServer(opts: {
|
||||
return;
|
||||
}
|
||||
|
||||
const activateMatch = path.match(/^\/json\/activate\/(.+)$/);
|
||||
if (activateMatch && (req.method === "GET" || req.method === "PUT")) {
|
||||
const targetId = decodeURIComponent(activateMatch[1] ?? "").trim();
|
||||
const handleTargetActionRoute = (
|
||||
match: RegExpMatchArray | null,
|
||||
cdpMethod: "Target.activateTarget" | "Target.closeTarget",
|
||||
): boolean => {
|
||||
if (!match || (req.method !== "GET" && req.method !== "PUT")) {
|
||||
return false;
|
||||
}
|
||||
const targetId = decodeURIComponent(match[1] ?? "").trim();
|
||||
if (!targetId) {
|
||||
res.writeHead(400);
|
||||
res.end("targetId required");
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
void (async () => {
|
||||
try {
|
||||
await sendToExtension({
|
||||
id: nextExtensionId++,
|
||||
method: "forwardCDPCommand",
|
||||
params: { method: "Target.activateTarget", params: { targetId } },
|
||||
params: { method: cdpMethod, params: { targetId } },
|
||||
});
|
||||
} catch {
|
||||
// ignore
|
||||
@@ -449,30 +454,13 @@ export async function ensureChromeExtensionRelayServer(opts: {
|
||||
})();
|
||||
res.writeHead(200);
|
||||
res.end("OK");
|
||||
return true;
|
||||
};
|
||||
|
||||
if (handleTargetActionRoute(path.match(/^\/json\/activate\/(.+)$/), "Target.activateTarget")) {
|
||||
return;
|
||||
}
|
||||
|
||||
const closeMatch = path.match(/^\/json\/close\/(.+)$/);
|
||||
if (closeMatch && (req.method === "GET" || req.method === "PUT")) {
|
||||
const targetId = decodeURIComponent(closeMatch[1] ?? "").trim();
|
||||
if (!targetId) {
|
||||
res.writeHead(400);
|
||||
res.end("targetId required");
|
||||
return;
|
||||
}
|
||||
void (async () => {
|
||||
try {
|
||||
await sendToExtension({
|
||||
id: nextExtensionId++,
|
||||
method: "forwardCDPCommand",
|
||||
params: { method: "Target.closeTarget", params: { targetId } },
|
||||
});
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
})();
|
||||
res.writeHead(200);
|
||||
res.end("OK");
|
||||
if (handleTargetActionRoute(path.match(/^\/json\/close\/(.+)$/), "Target.closeTarget")) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ import type { OpenClawConfig } from "../../config/config.js";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { enablePluginInConfig } from "../../plugins/enable.js";
|
||||
import { installPluginFromNpmSpec } from "../../plugins/install.js";
|
||||
import { recordPluginInstall } from "../../plugins/installs.js";
|
||||
import { buildNpmResolutionInstallFields, recordPluginInstall } from "../../plugins/installs.js";
|
||||
import { loadOpenClawPlugins } from "../../plugins/loader.js";
|
||||
import { createPluginLoaderLogger } from "../../plugins/logger.js";
|
||||
import type { RuntimeEnv } from "../../runtime.js";
|
||||
@@ -175,12 +175,7 @@ export async function ensureOnboardingPluginInstalled(params: {
|
||||
spec: entry.install.npmSpec,
|
||||
installPath: result.targetDir,
|
||||
version: result.version,
|
||||
resolvedName: result.npmResolution?.name,
|
||||
resolvedVersion: result.npmResolution?.version,
|
||||
resolvedSpec: result.npmResolution?.resolvedSpec,
|
||||
integrity: result.npmResolution?.integrity,
|
||||
shasum: result.npmResolution?.shasum,
|
||||
resolvedAt: result.npmResolution?.resolvedAt,
|
||||
...buildNpmResolutionInstallFields(result.npmResolution),
|
||||
});
|
||||
return { cfg: next, installed: true };
|
||||
}
|
||||
|
||||
67
src/discord/monitor/model-picker-preferences.test.ts
Normal file
67
src/discord/monitor/model-picker-preferences.test.ts
Normal file
@@ -0,0 +1,67 @@
|
||||
import fs from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import {
|
||||
readDiscordModelPickerRecentModels,
|
||||
recordDiscordModelPickerRecentModel,
|
||||
} from "./model-picker-preferences.js";
|
||||
|
||||
const tempDirs: string[] = [];
|
||||
|
||||
async function createStateEnv(): Promise<NodeJS.ProcessEnv> {
|
||||
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-model-picker-"));
|
||||
tempDirs.push(dir);
|
||||
return { ...process.env, OPENCLAW_STATE_DIR: dir };
|
||||
}
|
||||
|
||||
afterEach(async () => {
|
||||
await Promise.all(
|
||||
tempDirs.splice(0).map(async (dir) => {
|
||||
await fs.rm(dir, { recursive: true, force: true });
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
describe("discord model picker preferences", () => {
|
||||
it("records recent models in recency order without duplicates", async () => {
|
||||
const env = await createStateEnv();
|
||||
const scope = { userId: "123" };
|
||||
|
||||
await recordDiscordModelPickerRecentModel({ env, scope, modelRef: "openai/gpt-4o" });
|
||||
await recordDiscordModelPickerRecentModel({ env, scope, modelRef: "openai/gpt-4.1" });
|
||||
await recordDiscordModelPickerRecentModel({ env, scope, modelRef: "openai/gpt-4o" });
|
||||
|
||||
const recent = await readDiscordModelPickerRecentModels({ env, scope });
|
||||
expect(recent).toEqual(["openai/gpt-4o", "openai/gpt-4.1"]);
|
||||
});
|
||||
|
||||
it("filters recent models using an allowlist", async () => {
|
||||
const env = await createStateEnv();
|
||||
const scope = { userId: "456" };
|
||||
|
||||
await recordDiscordModelPickerRecentModel({ env, scope, modelRef: "openai/gpt-4o" });
|
||||
await recordDiscordModelPickerRecentModel({ env, scope, modelRef: "openai/gpt-4.1" });
|
||||
|
||||
const recent = await readDiscordModelPickerRecentModels({
|
||||
env,
|
||||
scope,
|
||||
allowedModelRefs: new Set(["openai/gpt-4.1"]),
|
||||
});
|
||||
expect(recent).toEqual(["openai/gpt-4.1"]);
|
||||
});
|
||||
|
||||
it("falls back to an empty store when the file is corrupt", async () => {
|
||||
const env = await createStateEnv();
|
||||
const stateDir = env.OPENCLAW_STATE_DIR as string;
|
||||
const filePath = path.join(stateDir, "discord", "model-picker-preferences.json");
|
||||
await fs.mkdir(path.dirname(filePath), { recursive: true });
|
||||
await fs.writeFile(filePath, "{not-json", "utf-8");
|
||||
|
||||
const recent = await readDiscordModelPickerRecentModels({
|
||||
env,
|
||||
scope: { userId: "789" },
|
||||
});
|
||||
expect(recent).toEqual([]);
|
||||
});
|
||||
});
|
||||
@@ -1,11 +1,10 @@
|
||||
import crypto from "node:crypto";
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { normalizeProviderId } from "../../agents/model-selection.js";
|
||||
import { resolveStateDir } from "../../config/paths.js";
|
||||
import { withFileLock } from "../../infra/file-lock.js";
|
||||
import { resolveRequiredHomeDir } from "../../infra/home-dir.js";
|
||||
import { readJsonFileWithFallback, writeJsonFileAtomically } from "../../plugin-sdk/json-store.js";
|
||||
import { normalizeAccountId as normalizeSharedAccountId } from "../../routing/account-id.js";
|
||||
|
||||
const MODEL_PICKER_PREFERENCES_LOCK_OPTIONS = {
|
||||
@@ -95,32 +94,6 @@ function sanitizeRecentModels(models: string[] | undefined, limit: number): stri
|
||||
return deduped;
|
||||
}
|
||||
|
||||
async function readJsonFileWithFallback<T>(
|
||||
filePath: string,
|
||||
fallback: T,
|
||||
): Promise<{ value: T; exists: boolean }> {
|
||||
try {
|
||||
const raw = await fs.promises.readFile(filePath, "utf-8");
|
||||
const parsed = JSON.parse(raw) as T;
|
||||
return { value: parsed, exists: true };
|
||||
} catch (err) {
|
||||
const code = (err as { code?: string }).code;
|
||||
if (code === "ENOENT") {
|
||||
return { value: fallback, exists: false };
|
||||
}
|
||||
return { value: fallback, exists: false };
|
||||
}
|
||||
}
|
||||
|
||||
async function writeJsonFileAtomically(filePath: string, value: unknown): Promise<void> {
|
||||
const dir = path.dirname(filePath);
|
||||
await fs.promises.mkdir(dir, { recursive: true, mode: 0o700 });
|
||||
const tmp = path.join(dir, `${path.basename(filePath)}.${crypto.randomUUID()}.tmp`);
|
||||
await fs.promises.writeFile(tmp, `${JSON.stringify(value, null, 2)}\n`, "utf-8");
|
||||
await fs.promises.chmod(tmp, 0o600);
|
||||
await fs.promises.rename(tmp, filePath);
|
||||
}
|
||||
|
||||
async function readPreferencesStore(filePath: string): Promise<ModelPickerPreferencesStore> {
|
||||
const { value } = await readJsonFileWithFallback<ModelPickerPreferencesStore>(filePath, {
|
||||
version: 1,
|
||||
|
||||
@@ -233,6 +233,11 @@ export type HookAgentPayload = {
|
||||
timeoutSeconds?: number;
|
||||
};
|
||||
|
||||
export type HookAgentDispatchPayload = Omit<HookAgentPayload, "sessionKey"> & {
|
||||
sessionKey: string;
|
||||
allowUnsafeExternalContent?: boolean;
|
||||
};
|
||||
|
||||
const listHookChannelValues = () => ["last", ...listChannelPlugins().map((plugin) => plugin.id)];
|
||||
|
||||
export type HookMessageChannel = ChannelId | "last";
|
||||
|
||||
@@ -42,7 +42,7 @@ import {
|
||||
extractHookToken,
|
||||
getHookAgentPolicyError,
|
||||
getHookChannelError,
|
||||
type HookMessageChannel,
|
||||
type HookAgentDispatchPayload,
|
||||
type HooksConfigResolved,
|
||||
isHookAgentAllowed,
|
||||
normalizeAgentPayload,
|
||||
@@ -69,20 +69,7 @@ const HOOK_AUTH_FAILURE_WINDOW_MS = 60_000;
|
||||
|
||||
type HookDispatchers = {
|
||||
dispatchWakeHook: (value: { text: string; mode: "now" | "next-heartbeat" }) => void;
|
||||
dispatchAgentHook: (value: {
|
||||
message: string;
|
||||
name: string;
|
||||
agentId?: string;
|
||||
wakeMode: "now" | "next-heartbeat";
|
||||
sessionKey: string;
|
||||
deliver: boolean;
|
||||
channel: HookMessageChannel;
|
||||
to?: string;
|
||||
model?: string;
|
||||
thinking?: string;
|
||||
timeoutSeconds?: number;
|
||||
allowUnsafeExternalContent?: boolean;
|
||||
}) => string;
|
||||
dispatchAgentHook: (value: HookAgentDispatchPayload) => string;
|
||||
};
|
||||
|
||||
function sendJson(res: ServerResponse, status: number, body: unknown) {
|
||||
|
||||
@@ -7,7 +7,7 @@ import type { CronJob } from "../../cron/types.js";
|
||||
import { requestHeartbeatNow } from "../../infra/heartbeat-wake.js";
|
||||
import { enqueueSystemEvent } from "../../infra/system-events.js";
|
||||
import type { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import type { HookMessageChannel, HooksConfigResolved } from "../hooks.js";
|
||||
import type { HookAgentDispatchPayload, HooksConfigResolved } from "../hooks.js";
|
||||
import { createHooksRequestHandler } from "../server-http.js";
|
||||
|
||||
type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
|
||||
@@ -29,20 +29,7 @@ export function createGatewayHooksRequestHandler(params: {
|
||||
}
|
||||
};
|
||||
|
||||
const dispatchAgentHook = (value: {
|
||||
message: string;
|
||||
name: string;
|
||||
agentId?: string;
|
||||
wakeMode: "now" | "next-heartbeat";
|
||||
sessionKey: string;
|
||||
deliver: boolean;
|
||||
channel: HookMessageChannel;
|
||||
to?: string;
|
||||
model?: string;
|
||||
thinking?: string;
|
||||
timeoutSeconds?: number;
|
||||
allowUnsafeExternalContent?: boolean;
|
||||
}) => {
|
||||
const dispatchAgentHook = (value: HookAgentDispatchPayload) => {
|
||||
const sessionKey = value.sessionKey.trim();
|
||||
const mainSessionKey = resolveMainSessionKeyFromConfig();
|
||||
const jobId = randomUUID();
|
||||
|
||||
@@ -3,7 +3,7 @@ import type { Request, Response, NextFunction } from "express";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { loadConfig } from "../config/config.js";
|
||||
import { logVerbose } from "../globals.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import { createNonExitingRuntime, type RuntimeEnv } from "../runtime.js";
|
||||
import { resolveLineAccount } from "./accounts.js";
|
||||
import { handleLineWebhookEvents } from "./bot-handlers.js";
|
||||
import type { LineInboundContext } from "./bot-message-context.js";
|
||||
@@ -26,13 +26,7 @@ export interface LineBot {
|
||||
}
|
||||
|
||||
export function createLineBot(opts: LineBotOptions): LineBot {
|
||||
const runtime: RuntimeEnv = opts.runtime ?? {
|
||||
log: console.log,
|
||||
error: console.error,
|
||||
exit: (code: number): never => {
|
||||
throw new Error(`exit ${code}`);
|
||||
},
|
||||
};
|
||||
const runtime: RuntimeEnv = opts.runtime ?? createNonExitingRuntime();
|
||||
|
||||
const cfg = opts.config ?? loadConfig();
|
||||
const account = resolveLineAccount({
|
||||
|
||||
@@ -172,6 +172,21 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp
|
||||
return next;
|
||||
};
|
||||
|
||||
const handleHookError = (params: {
|
||||
hookName: PluginHookName;
|
||||
pluginId: string;
|
||||
error: unknown;
|
||||
}): never | void => {
|
||||
const msg = `[hooks] ${params.hookName} handler from ${params.pluginId} failed: ${String(
|
||||
params.error,
|
||||
)}`;
|
||||
if (catchErrors) {
|
||||
logger?.error(msg);
|
||||
return;
|
||||
}
|
||||
throw new Error(msg, { cause: params.error });
|
||||
};
|
||||
|
||||
/**
|
||||
* Run a hook that doesn't return a value (fire-and-forget style).
|
||||
* All handlers are executed in parallel for performance.
|
||||
@@ -192,12 +207,7 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp
|
||||
try {
|
||||
await (hook.handler as (event: unknown, ctx: unknown) => Promise<void>)(event, ctx);
|
||||
} catch (err) {
|
||||
const msg = `[hooks] ${hookName} handler from ${hook.pluginId} failed: ${String(err)}`;
|
||||
if (catchErrors) {
|
||||
logger?.error(msg);
|
||||
} else {
|
||||
throw new Error(msg, { cause: err });
|
||||
}
|
||||
handleHookError({ hookName, pluginId: hook.pluginId, error: err });
|
||||
}
|
||||
});
|
||||
|
||||
@@ -237,12 +247,7 @@ export function createHookRunner(registry: PluginRegistry, options: HookRunnerOp
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
const msg = `[hooks] ${hookName} handler from ${hook.pluginId} failed: ${String(err)}`;
|
||||
if (catchErrors) {
|
||||
logger?.error(msg);
|
||||
} else {
|
||||
throw new Error(msg, { cause: err });
|
||||
}
|
||||
handleHookError({ hookName, pluginId: hook.pluginId, error: err });
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
45
src/plugins/installs.test.ts
Normal file
45
src/plugins/installs.test.ts
Normal file
@@ -0,0 +1,45 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { buildNpmResolutionInstallFields, recordPluginInstall } from "./installs.js";
|
||||
|
||||
describe("buildNpmResolutionInstallFields", () => {
|
||||
it("maps npm resolution metadata into install record fields", () => {
|
||||
const fields = buildNpmResolutionInstallFields({
|
||||
name: "@openclaw/demo",
|
||||
version: "1.2.3",
|
||||
resolvedSpec: "@openclaw/demo@1.2.3",
|
||||
integrity: "sha512-abc",
|
||||
shasum: "deadbeef",
|
||||
resolvedAt: "2026-02-22T00:00:00.000Z",
|
||||
});
|
||||
expect(fields).toEqual({
|
||||
resolvedName: "@openclaw/demo",
|
||||
resolvedVersion: "1.2.3",
|
||||
resolvedSpec: "@openclaw/demo@1.2.3",
|
||||
integrity: "sha512-abc",
|
||||
shasum: "deadbeef",
|
||||
resolvedAt: "2026-02-22T00:00:00.000Z",
|
||||
});
|
||||
});
|
||||
|
||||
it("returns undefined fields when resolution is missing", () => {
|
||||
expect(buildNpmResolutionInstallFields(undefined)).toEqual({
|
||||
resolvedName: undefined,
|
||||
resolvedVersion: undefined,
|
||||
resolvedSpec: undefined,
|
||||
integrity: undefined,
|
||||
shasum: undefined,
|
||||
resolvedAt: undefined,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("recordPluginInstall", () => {
|
||||
it("stores install metadata for the plugin id", () => {
|
||||
const next = recordPluginInstall({}, { pluginId: "demo", source: "npm", spec: "demo@latest" });
|
||||
expect(next.plugins?.installs?.demo).toMatchObject({
|
||||
source: "npm",
|
||||
spec: "demo@latest",
|
||||
});
|
||||
expect(typeof next.plugins?.installs?.demo?.installedAt).toBe("string");
|
||||
});
|
||||
});
|
||||
@@ -1,8 +1,25 @@
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import type { PluginInstallRecord } from "../config/types.plugins.js";
|
||||
import type { NpmSpecResolution } from "../infra/install-source-utils.js";
|
||||
|
||||
export type PluginInstallUpdate = PluginInstallRecord & { pluginId: string };
|
||||
|
||||
export function buildNpmResolutionInstallFields(
|
||||
resolution?: NpmSpecResolution,
|
||||
): Pick<
|
||||
PluginInstallRecord,
|
||||
"resolvedName" | "resolvedVersion" | "resolvedSpec" | "integrity" | "shasum" | "resolvedAt"
|
||||
> {
|
||||
return {
|
||||
resolvedName: resolution?.name,
|
||||
resolvedVersion: resolution?.version,
|
||||
resolvedSpec: resolution?.resolvedSpec,
|
||||
integrity: resolution?.integrity,
|
||||
shasum: resolution?.shasum,
|
||||
resolvedAt: resolution?.resolvedAt,
|
||||
};
|
||||
}
|
||||
|
||||
export function recordPluginInstall(
|
||||
cfg: OpenClawConfig,
|
||||
update: PluginInstallUpdate,
|
||||
|
||||
@@ -175,6 +175,31 @@ function createPluginRecord(params: {
|
||||
};
|
||||
}
|
||||
|
||||
function recordPluginError(params: {
|
||||
logger: PluginLogger;
|
||||
registry: PluginRegistry;
|
||||
record: PluginRecord;
|
||||
seenIds: Map<string, PluginRecord["origin"]>;
|
||||
pluginId: string;
|
||||
origin: PluginRecord["origin"];
|
||||
error: unknown;
|
||||
logPrefix: string;
|
||||
diagnosticMessagePrefix: string;
|
||||
}) {
|
||||
const errorText = String(params.error);
|
||||
params.logger.error(`${params.logPrefix}${errorText}`);
|
||||
params.record.status = "error";
|
||||
params.record.error = errorText;
|
||||
params.registry.plugins.push(params.record);
|
||||
params.seenIds.set(params.pluginId, params.origin);
|
||||
params.registry.diagnostics.push({
|
||||
level: "error",
|
||||
pluginId: params.record.id,
|
||||
source: params.record.source,
|
||||
message: `${params.diagnosticMessagePrefix}${errorText}`,
|
||||
});
|
||||
}
|
||||
|
||||
function pushDiagnostics(diagnostics: PluginDiagnostic[], append: PluginDiagnostic[]) {
|
||||
diagnostics.push(...append);
|
||||
}
|
||||
@@ -508,16 +533,16 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi
|
||||
try {
|
||||
mod = getJiti()(candidate.source) as OpenClawPluginModule;
|
||||
} catch (err) {
|
||||
logger.error(`[plugins] ${record.id} failed to load from ${record.source}: ${String(err)}`);
|
||||
record.status = "error";
|
||||
record.error = String(err);
|
||||
registry.plugins.push(record);
|
||||
seenIds.set(pluginId, candidate.origin);
|
||||
registry.diagnostics.push({
|
||||
level: "error",
|
||||
pluginId: record.id,
|
||||
source: record.source,
|
||||
message: `failed to load plugin: ${String(err)}`,
|
||||
recordPluginError({
|
||||
logger,
|
||||
registry,
|
||||
record,
|
||||
seenIds,
|
||||
pluginId,
|
||||
origin: candidate.origin,
|
||||
error: err,
|
||||
logPrefix: `[plugins] ${record.id} failed to load from ${record.source}: `,
|
||||
diagnosticMessagePrefix: "failed to load plugin: ",
|
||||
});
|
||||
continue;
|
||||
}
|
||||
@@ -634,18 +659,16 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi
|
||||
registry.plugins.push(record);
|
||||
seenIds.set(pluginId, candidate.origin);
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
`[plugins] ${record.id} failed during register from ${record.source}: ${String(err)}`,
|
||||
);
|
||||
record.status = "error";
|
||||
record.error = String(err);
|
||||
registry.plugins.push(record);
|
||||
seenIds.set(pluginId, candidate.origin);
|
||||
registry.diagnostics.push({
|
||||
level: "error",
|
||||
pluginId: record.id,
|
||||
source: record.source,
|
||||
message: `plugin failed during register: ${String(err)}`,
|
||||
recordPluginError({
|
||||
logger,
|
||||
registry,
|
||||
record,
|
||||
seenIds,
|
||||
pluginId,
|
||||
origin: candidate.origin,
|
||||
error: err,
|
||||
logPrefix: `[plugins] ${record.id} failed during register from ${record.source}: `,
|
||||
diagnosticMessagePrefix: "plugin failed during register: ",
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ import type { UpdateChannel } from "../infra/update-channels.js";
|
||||
import { resolveUserPath } from "../utils.js";
|
||||
import { discoverOpenClawPlugins } from "./discovery.js";
|
||||
import { installPluginFromNpmSpec, resolvePluginInstallDir } from "./install.js";
|
||||
import { recordPluginInstall } from "./installs.js";
|
||||
import { buildNpmResolutionInstallFields, recordPluginInstall } from "./installs.js";
|
||||
import { loadPluginManifest } from "./manifest.js";
|
||||
|
||||
export type PluginUpdateLogger = {
|
||||
@@ -344,12 +344,7 @@ export async function updateNpmInstalledPlugins(params: {
|
||||
spec: record.spec,
|
||||
installPath: result.targetDir,
|
||||
version: nextVersion,
|
||||
resolvedName: result.npmResolution?.name,
|
||||
resolvedVersion: result.npmResolution?.version,
|
||||
resolvedSpec: result.npmResolution?.resolvedSpec,
|
||||
integrity: result.npmResolution?.integrity,
|
||||
shasum: result.npmResolution?.shasum,
|
||||
resolvedAt: result.npmResolution?.resolvedAt,
|
||||
...buildNpmResolutionInstallFields(result.npmResolution),
|
||||
});
|
||||
changed = true;
|
||||
|
||||
@@ -473,12 +468,7 @@ export async function syncPluginsForUpdateChannel(params: {
|
||||
spec,
|
||||
installPath: result.targetDir,
|
||||
version: result.version,
|
||||
resolvedName: result.npmResolution?.name,
|
||||
resolvedVersion: result.npmResolution?.version,
|
||||
resolvedSpec: result.npmResolution?.resolvedSpec,
|
||||
integrity: result.npmResolution?.integrity,
|
||||
shasum: result.npmResolution?.shasum,
|
||||
resolvedAt: result.npmResolution?.resolvedAt,
|
||||
...buildNpmResolutionInstallFields(result.npmResolution),
|
||||
sourcePath: undefined,
|
||||
});
|
||||
summary.switchedToNpm.push(pluginId);
|
||||
|
||||
@@ -23,7 +23,7 @@ import { danger, logVerbose, shouldLogVerbose } from "../globals.js";
|
||||
import { formatUncaughtError } from "../infra/errors.js";
|
||||
import { getChildLogger } from "../logging.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import { createNonExitingRuntime, type RuntimeEnv } from "../runtime.js";
|
||||
import { resolveTelegramAccount } from "./accounts.js";
|
||||
import { registerTelegramHandlers } from "./bot-handlers.js";
|
||||
import { createTelegramMessageProcessor } from "./bot-message.js";
|
||||
@@ -113,13 +113,7 @@ export function getTelegramSequentialKey(ctx: {
|
||||
}
|
||||
|
||||
export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
const runtime: RuntimeEnv = opts.runtime ?? {
|
||||
log: console.log,
|
||||
error: console.error,
|
||||
exit: (code: number): never => {
|
||||
throw new Error(`exit ${code}`);
|
||||
},
|
||||
};
|
||||
const runtime: RuntimeEnv = opts.runtime ?? createNonExitingRuntime();
|
||||
const cfg = opts.config ?? loadConfig();
|
||||
const account = resolveTelegramAccount({
|
||||
cfg,
|
||||
|
||||
@@ -184,6 +184,31 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
let restartAttempts = 0;
|
||||
let webhookCleared = false;
|
||||
const runnerOptions = createTelegramRunnerOptions(cfg);
|
||||
const waitBeforeRetryOnRecoverableSetupError = async (
|
||||
err: unknown,
|
||||
logPrefix: string,
|
||||
): Promise<boolean> => {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
return false;
|
||||
}
|
||||
if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) {
|
||||
throw err;
|
||||
}
|
||||
restartAttempts += 1;
|
||||
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
|
||||
(opts.runtime?.error ?? console.error)(
|
||||
`${logPrefix}: ${formatErrorMessage(err)}; retrying in ${formatDurationPrecise(delayMs)}.`,
|
||||
);
|
||||
try {
|
||||
await sleepWithAbort(delayMs, opts.abortSignal);
|
||||
} catch (sleepErr) {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
return false;
|
||||
}
|
||||
throw sleepErr;
|
||||
}
|
||||
return true;
|
||||
};
|
||||
|
||||
while (!opts.abortSignal?.aborted) {
|
||||
let bot;
|
||||
@@ -200,24 +225,12 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
},
|
||||
});
|
||||
} catch (err) {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) {
|
||||
throw err;
|
||||
}
|
||||
restartAttempts += 1;
|
||||
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
|
||||
(opts.runtime?.error ?? console.error)(
|
||||
`Telegram setup network error: ${formatErrorMessage(err)}; retrying in ${formatDurationPrecise(delayMs)}.`,
|
||||
const shouldRetry = await waitBeforeRetryOnRecoverableSetupError(
|
||||
err,
|
||||
"Telegram setup network error",
|
||||
);
|
||||
try {
|
||||
await sleepWithAbort(delayMs, opts.abortSignal);
|
||||
} catch (sleepErr) {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
throw sleepErr;
|
||||
if (!shouldRetry) {
|
||||
return;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -231,24 +244,12 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
});
|
||||
webhookCleared = true;
|
||||
} catch (err) {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) {
|
||||
throw err;
|
||||
}
|
||||
restartAttempts += 1;
|
||||
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
|
||||
(opts.runtime?.error ?? console.error)(
|
||||
`Telegram webhook cleanup failed: ${formatErrorMessage(err)}; retrying in ${formatDurationPrecise(delayMs)}.`,
|
||||
const shouldRetry = await waitBeforeRetryOnRecoverableSetupError(
|
||||
err,
|
||||
"Telegram webhook cleanup failed",
|
||||
);
|
||||
try {
|
||||
await sleepWithAbort(delayMs, opts.abortSignal);
|
||||
} catch (sleepErr) {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
throw sleepErr;
|
||||
if (!shouldRetry) {
|
||||
return;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user