diff --git a/extensions/matrix/src/matrix/send-queue.ts b/extensions/matrix/src/matrix/send-queue.ts index daf5e40931e..4bad4878f90 100644 --- a/extensions/matrix/src/matrix/send-queue.ts +++ b/extensions/matrix/src/matrix/send-queue.ts @@ -1,3 +1,5 @@ +import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue"; + export const DEFAULT_SEND_GAP_MS = 150; type MatrixSendQueueOptions = { @@ -6,37 +8,19 @@ type MatrixSendQueueOptions = { }; // Serialize sends per room to preserve Matrix delivery order. -const roomQueues = new Map>(); +const roomQueues = new KeyedAsyncQueue(); -export async function enqueueSend( +export function enqueueSend( roomId: string, fn: () => Promise, options?: MatrixSendQueueOptions, ): Promise { const gapMs = options?.gapMs ?? DEFAULT_SEND_GAP_MS; const delayFn = options?.delayFn ?? delay; - const previous = roomQueues.get(roomId) ?? Promise.resolve(); - - const next = previous - .catch(() => {}) - .then(async () => { - await delayFn(gapMs); - return await fn(); - }); - - const queueMarker = next.then( - () => {}, - () => {}, - ); - roomQueues.set(roomId, queueMarker); - - queueMarker.finally(() => { - if (roomQueues.get(roomId) === queueMarker) { - roomQueues.delete(roomId); - } + return roomQueues.enqueue(roomId, async () => { + await delayFn(gapMs); + return await fn(); }); - - return await next; } function delay(ms: number): Promise { diff --git a/package.json b/package.json index 92a23da970c..92c3e723f60 100644 --- a/package.json +++ b/package.json @@ -44,6 +44,10 @@ "types": "./dist/plugin-sdk/account-id.d.ts", "default": "./dist/plugin-sdk/account-id.js" }, + "./plugin-sdk/keyed-async-queue": { + "types": "./dist/plugin-sdk/keyed-async-queue.d.ts", + "default": "./dist/plugin-sdk/keyed-async-queue.js" + }, "./cli-entry": "./openclaw.mjs" }, "scripts": { diff --git a/src/acp/control-plane/session-actor-queue.ts b/src/acp/control-plane/session-actor-queue.ts index 67dd6119a3b..7112d7421e3 100644 --- a/src/acp/control-plane/session-actor-queue.ts +++ b/src/acp/control-plane/session-actor-queue.ts @@ -1,9 +1,11 @@ +import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js"; + export class SessionActorQueue { - private readonly tailBySession = new Map>(); + private readonly queue = new KeyedAsyncQueue(); private readonly pendingBySession = new Map(); getTailMapForTesting(): Map> { - return this.tailBySession; + return this.queue.getTailMapForTesting(); } getTotalPendingCount(): number { @@ -19,35 +21,18 @@ export class SessionActorQueue { } async run(actorKey: string, op: () => Promise): Promise { - const previous = this.tailBySession.get(actorKey) ?? Promise.resolve(); - this.pendingBySession.set(actorKey, (this.pendingBySession.get(actorKey) ?? 0) + 1); - let release: () => void = () => {}; - const marker = new Promise((resolve) => { - release = resolve; + return this.queue.enqueue(actorKey, op, { + onEnqueue: () => { + this.pendingBySession.set(actorKey, (this.pendingBySession.get(actorKey) ?? 0) + 1); + }, + onSettle: () => { + const pending = (this.pendingBySession.get(actorKey) ?? 1) - 1; + if (pending <= 0) { + this.pendingBySession.delete(actorKey); + } else { + this.pendingBySession.set(actorKey, pending); + } + }, }); - const queuedTail = previous - .catch(() => { - // Keep actor queue alive after an operation failure. - }) - .then(() => marker); - this.tailBySession.set(actorKey, queuedTail); - - await previous.catch(() => { - // Previous failures should not block newer commands. - }); - try { - return await op(); - } finally { - const pending = (this.pendingBySession.get(actorKey) ?? 1) - 1; - if (pending <= 0) { - this.pendingBySession.delete(actorKey); - } else { - this.pendingBySession.set(actorKey, pending); - } - release(); - if (this.tailBySession.get(actorKey) === queuedTail) { - this.tailBySession.delete(actorKey); - } - } } } diff --git a/src/agents/cli-runner/helpers.ts b/src/agents/cli-runner/helpers.ts index dbabca75faa..96ec35540be 100644 --- a/src/agents/cli-runner/helpers.ts +++ b/src/agents/cli-runner/helpers.ts @@ -7,6 +7,7 @@ import type { ImageContent } from "@mariozechner/pi-ai"; import type { ThinkLevel } from "../../auto-reply/thinking.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { CliBackendConfig } from "../../config/types.js"; +import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js"; import { buildTtsSystemPromptHint } from "../../tts/tts.js"; import { isRecord } from "../../utils.js"; import { buildModelAliasLines } from "../model-alias-lines.js"; @@ -18,20 +19,9 @@ import { buildSystemPromptParams } from "../system-prompt-params.js"; import { buildAgentSystemPrompt } from "../system-prompt.js"; export { buildCliSupervisorScopeKey, resolveCliNoOutputTimeoutMs } from "./reliability.js"; -const CLI_RUN_QUEUE = new Map>(); +const CLI_RUN_QUEUE = new KeyedAsyncQueue(); export function enqueueCliRun(key: string, task: () => Promise): Promise { - const prior = CLI_RUN_QUEUE.get(key) ?? Promise.resolve(); - const chained = prior.catch(() => undefined).then(task); - // Keep queue continuity even when a run rejects, without emitting unhandled rejections. - const tracked = chained - .catch(() => undefined) - .finally(() => { - if (CLI_RUN_QUEUE.get(key) === tracked) { - CLI_RUN_QUEUE.delete(key); - } - }); - CLI_RUN_QUEUE.set(key, tracked); - return chained; + return CLI_RUN_QUEUE.enqueue(key, task); } type CliUsage = { diff --git a/src/config/backup-rotation.ts b/src/config/backup-rotation.ts index 1883cde2045..7c0aae66fe6 100644 --- a/src/config/backup-rotation.ts +++ b/src/config/backup-rotation.ts @@ -9,6 +9,10 @@ export interface BackupRotationFs { readdir?: (path: string) => Promise; } +export interface BackupMaintenanceFs extends BackupRotationFs { + copyFile: (from: string, to: string) => Promise; +} + export async function rotateConfigBackups( configPath: string, ioFs: BackupRotationFs, @@ -103,3 +107,19 @@ export async function cleanOrphanBackups( }); } } + +/** + * Run the full backup maintenance cycle around config writes. + * Order matters: rotate ring -> create new .bak -> harden modes -> prune orphan .bak.* files. + */ +export async function maintainConfigBackups( + configPath: string, + ioFs: BackupMaintenanceFs, +): Promise { + await rotateConfigBackups(configPath, ioFs); + await ioFs.copyFile(configPath, `${configPath}.bak`).catch(() => { + // best-effort + }); + await hardenBackupPermissions(configPath, ioFs); + await cleanOrphanBackups(configPath, ioFs); +} diff --git a/src/config/config.backup-rotation.test.ts b/src/config/config.backup-rotation.test.ts index 8e9159beef3..4eb4bb2f3f3 100644 --- a/src/config/config.backup-rotation.test.ts +++ b/src/config/config.backup-rotation.test.ts @@ -2,6 +2,7 @@ import fs from "node:fs/promises"; import path from "node:path"; import { describe, expect, it } from "vitest"; import { + maintainConfigBackups, rotateConfigBackups, hardenBackupPermissions, cleanOrphanBackups, @@ -112,4 +113,31 @@ describe("config backup rotation", () => { await expect(fs.readFile(configPath, "utf-8")).resolves.toBe("current"); }); }); + + it("maintainConfigBackups composes rotate/copy/harden/prune flow", async () => { + await withTempHome(async () => { + const stateDir = process.env.OPENCLAW_STATE_DIR?.trim(); + if (!stateDir) { + throw new Error("Expected OPENCLAW_STATE_DIR to be set by withTempHome"); + } + const configPath = path.join(stateDir, "openclaw.json"); + await fs.writeFile(configPath, JSON.stringify({ token: "secret" }), { mode: 0o600 }); + await fs.writeFile(`${configPath}.bak`, "previous", { mode: 0o644 }); + await fs.writeFile(`${configPath}.bak.orphan`, "old"); + + await maintainConfigBackups(configPath, fs); + + // A new primary backup is created from the current config. + await expect(fs.readFile(`${configPath}.bak`, "utf-8")).resolves.toBe( + JSON.stringify({ token: "secret" }), + ); + // Prior primary backup gets rotated into ring slot 1. + await expect(fs.readFile(`${configPath}.bak.1`, "utf-8")).resolves.toBe("previous"); + // Mode hardening still applies. + const primaryBackupStat = await fs.stat(`${configPath}.bak`); + expect(primaryBackupStat.mode & 0o777).toBe(0o600); + // Out-of-ring orphan gets pruned. + await expect(fs.stat(`${configPath}.bak.orphan`)).rejects.toThrow(); + }); + }); }); diff --git a/src/config/io.ts b/src/config/io.ts index 3bec12c3a11..a2a2af5d1b5 100644 --- a/src/config/io.ts +++ b/src/config/io.ts @@ -15,11 +15,7 @@ import { } from "../infra/shell-env.js"; import { VERSION } from "../version.js"; import { DuplicateAgentDirError, findDuplicateAgentDirs } from "./agent-dirs.js"; -import { - rotateConfigBackups, - hardenBackupPermissions, - cleanOrphanBackups, -} from "./backup-rotation.js"; +import { maintainConfigBackups } from "./backup-rotation.js"; import { applyCompactionDefaults, applyContextPruningDefaults, @@ -1245,12 +1241,7 @@ export function createConfigIO(overrides: ConfigIoDeps = {}) { }); if (deps.fs.existsSync(configPath)) { - await rotateConfigBackups(configPath, deps.fs.promises); - await deps.fs.promises.copyFile(configPath, `${configPath}.bak`).catch(() => { - // best-effort - }); - await hardenBackupPermissions(configPath, deps.fs.promises); - await cleanOrphanBackups(configPath, deps.fs.promises); + await maintainConfigBackups(configPath, deps.fs.promises); } try { diff --git a/src/discord/monitor/listeners.ts b/src/discord/monitor/listeners.ts index 516b863e466..e8b1cf40cf9 100644 --- a/src/discord/monitor/listeners.ts +++ b/src/discord/monitor/listeners.ts @@ -11,6 +11,7 @@ import { danger, logVerbose } from "../../globals.js"; import { formatDurationSeconds } from "../../infra/format-time/format-duration.ts"; import { enqueueSystemEvent } from "../../infra/system-events.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; +import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js"; import { resolveAgentRoute } from "../../routing/resolve-route.js"; import { readStoreAllowFromForDmPolicy, @@ -119,7 +120,7 @@ export function registerDiscordListener(listeners: Array, listener: obje } export class DiscordMessageListener extends MessageCreateListener { - private channelQueues = new Map>(); + private readonly channelQueue = new KeyedAsyncQueue(); constructor( private handler: DiscordMessageHandler, @@ -132,35 +133,21 @@ export class DiscordMessageListener extends MessageCreateListener { async handle(data: DiscordMessageEvent, client: Client) { this.onEvent?.(); const channelId = data.channel_id; - const prev = this.channelQueues.get(channelId) ?? Promise.resolve(); // Serialize messages within the same channel to preserve ordering, // but allow different channels to proceed in parallel so that // channel-bound agents are not blocked by each other. - const next = prev - .catch(() => {}) - .then(() => - runDiscordListenerWithSlowLog({ - logger: this.logger, - listener: this.constructor.name, - event: this.type, - run: () => this.handler(data, client), - onError: (err) => { - const logger = this.logger ?? discordEventQueueLog; - logger.error(danger(`discord handler failed: ${String(err)}`)); - }, - }), - ); - this.channelQueues.set(channelId, next); - void next - .then(() => { - if (this.channelQueues.get(channelId) === next) { - this.channelQueues.delete(channelId); - } - }) - .catch((err) => { - const logger = this.logger ?? discordEventQueueLog; - logger.error(danger(`discord handler failed: ${String(err)}`)); - }); + void this.channelQueue.enqueue(channelId, () => + runDiscordListenerWithSlowLog({ + logger: this.logger, + listener: this.constructor.name, + event: this.type, + run: () => this.handler(data, client), + onError: (err) => { + const logger = this.logger ?? discordEventQueueLog; + logger.error(danger(`discord handler failed: ${String(err)}`)); + }, + }), + ); } } diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index 81c64f69ab2..4d656634602 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -120,6 +120,8 @@ export { isDangerousNameMatchingEnabled } from "../config/dangerous-name-matchin export type { FileLockHandle, FileLockOptions } from "./file-lock.js"; export { acquireFileLock, withFileLock } from "./file-lock.js"; +export type { KeyedAsyncQueueHooks } from "./keyed-async-queue.js"; +export { enqueueKeyedTask, KeyedAsyncQueue } from "./keyed-async-queue.js"; export { normalizeWebhookPath, resolveWebhookPath } from "./webhook-path.js"; export { registerWebhookTarget, diff --git a/src/plugin-sdk/keyed-async-queue.test.ts b/src/plugin-sdk/keyed-async-queue.test.ts new file mode 100644 index 00000000000..50038f5bc93 --- /dev/null +++ b/src/plugin-sdk/keyed-async-queue.test.ts @@ -0,0 +1,108 @@ +import { describe, expect, it, vi } from "vitest"; +import { enqueueKeyedTask, KeyedAsyncQueue } from "./keyed-async-queue.js"; + +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + +describe("enqueueKeyedTask", () => { + it("serializes tasks per key and keeps different keys independent", async () => { + const tails = new Map>(); + const gate = deferred(); + const order: string[] = []; + + const first = enqueueKeyedTask({ + tails, + key: "a", + task: async () => { + order.push("a1:start"); + await gate.promise; + order.push("a1:end"); + }, + }); + const second = enqueueKeyedTask({ + tails, + key: "a", + task: async () => { + order.push("a2:start"); + order.push("a2:end"); + }, + }); + const third = enqueueKeyedTask({ + tails, + key: "b", + task: async () => { + order.push("b1:start"); + order.push("b1:end"); + }, + }); + + await vi.waitFor(() => { + expect(order).toContain("a1:start"); + expect(order).toContain("b1:start"); + }); + expect(order).not.toContain("a2:start"); + + gate.resolve(); + await Promise.all([first, second, third]); + expect(order).toEqual(["a1:start", "b1:start", "b1:end", "a1:end", "a2:start", "a2:end"]); + expect(tails.size).toBe(0); + }); + + it("keeps queue alive after task failures", async () => { + const tails = new Map>(); + await expect( + enqueueKeyedTask({ + tails, + key: "a", + task: async () => { + throw new Error("boom"); + }, + }), + ).rejects.toThrow("boom"); + + await expect( + enqueueKeyedTask({ + tails, + key: "a", + task: async () => "ok", + }), + ).resolves.toBe("ok"); + }); + + it("runs enqueue/settle hooks once per task", async () => { + const tails = new Map>(); + const onEnqueue = vi.fn(); + const onSettle = vi.fn(); + await enqueueKeyedTask({ + tails, + key: "a", + task: async () => undefined, + hooks: { onEnqueue, onSettle }, + }); + expect(onEnqueue).toHaveBeenCalledTimes(1); + expect(onSettle).toHaveBeenCalledTimes(1); + }); +}); + +describe("KeyedAsyncQueue", () => { + it("exposes tail map for observability", async () => { + const queue = new KeyedAsyncQueue(); + const gate = deferred(); + const run = queue.enqueue("actor", async () => { + await gate.promise; + return 1; + }); + expect(queue.getTailMapForTesting().has("actor")).toBe(true); + gate.resolve(); + await run; + await Promise.resolve(); + expect(queue.getTailMapForTesting().has("actor")).toBe(false); + }); +}); diff --git a/src/plugin-sdk/keyed-async-queue.ts b/src/plugin-sdk/keyed-async-queue.ts new file mode 100644 index 00000000000..6e79cf35d59 --- /dev/null +++ b/src/plugin-sdk/keyed-async-queue.ts @@ -0,0 +1,48 @@ +export type KeyedAsyncQueueHooks = { + onEnqueue?: () => void; + onSettle?: () => void; +}; + +export function enqueueKeyedTask(params: { + tails: Map>; + key: string; + task: () => Promise; + hooks?: KeyedAsyncQueueHooks; +}): Promise { + params.hooks?.onEnqueue?.(); + const previous = params.tails.get(params.key) ?? Promise.resolve(); + const current = previous + .catch(() => undefined) + .then(params.task) + .finally(() => { + params.hooks?.onSettle?.(); + }); + const tail = current.then( + () => undefined, + () => undefined, + ); + params.tails.set(params.key, tail); + void tail.finally(() => { + if (params.tails.get(params.key) === tail) { + params.tails.delete(params.key); + } + }); + return current; +} + +export class KeyedAsyncQueue { + private readonly tails = new Map>(); + + getTailMapForTesting(): Map> { + return this.tails; + } + + enqueue(key: string, task: () => Promise, hooks?: KeyedAsyncQueueHooks): Promise { + return enqueueKeyedTask({ + tails: this.tails, + key, + task, + ...(hooks ? { hooks } : {}), + }); + } +} diff --git a/src/slack/format.test.ts b/src/slack/format.test.ts index 220fa7a594c..ea889014941 100644 --- a/src/slack/format.test.ts +++ b/src/slack/format.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from "vitest"; -import { markdownToSlackMrkdwn } from "./format.js"; +import { markdownToSlackMrkdwn, normalizeSlackOutboundText } from "./format.js"; import { escapeSlackMrkdwn } from "./monitor/mrkdwn.js"; describe("markdownToSlackMrkdwn", () => { @@ -72,3 +72,9 @@ describe("escapeSlackMrkdwn", () => { expect(escapeSlackMrkdwn("mode_*`~<&>\\")).toBe("mode\\_\\*\\`\\~<&>\\\\"); }); }); + +describe("normalizeSlackOutboundText", () => { + it("normalizes markdown for outbound send/update paths", () => { + expect(normalizeSlackOutboundText(" **bold** ")).toBe("*bold*"); + }); +}); diff --git a/src/slack/format.ts b/src/slack/format.ts index 2eade04153f..baf8f804374 100644 --- a/src/slack/format.ts +++ b/src/slack/format.ts @@ -128,6 +128,10 @@ export function markdownToSlackMrkdwn( return renderMarkdownWithMarkers(ir, buildSlackRenderOptions()); } +export function normalizeSlackOutboundText(markdown: string): string { + return markdownToSlackMrkdwn(markdown ?? ""); +} + export function markdownToSlackMrkdwnChunks( markdown: string, limit: number, diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index 2022100ce19..e847b2446ee 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -12,7 +12,7 @@ import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js"; import { resolveAgentOutboundIdentity } from "../../../infra/outbound/identity.js"; import { removeSlackReaction } from "../../actions.js"; import { createSlackDraftStream } from "../../draft-stream.js"; -import { markdownToSlackMrkdwn } from "../../format.js"; +import { normalizeSlackOutboundText } from "../../format.js"; import { recordSlackThreadParticipation } from "../../sent-thread-cache.js"; import { applyAppendOnlyStreamUpdate, @@ -291,7 +291,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag token: ctx.botToken, channel: draftChannelId, ts: draftMessageId, - text: markdownToSlackMrkdwn(finalText.trim()), + text: normalizeSlackOutboundText(finalText.trim()), }); return; } catch (err) { diff --git a/src/slack/streaming.ts b/src/slack/streaming.ts index d2ed0e02612..e80fe9b2140 100644 --- a/src/slack/streaming.ts +++ b/src/slack/streaming.ts @@ -14,7 +14,7 @@ import type { WebClient } from "@slack/web-api"; import type { ChatStreamer } from "@slack/web-api/dist/chat-stream.js"; import { logVerbose } from "../globals.js"; -import { markdownToSlackMrkdwn } from "./format.js"; +import { normalizeSlackOutboundText } from "./format.js"; // --------------------------------------------------------------------------- // Types @@ -100,7 +100,7 @@ export async function startSlackStream( // If initial text is provided, send it as the first append which will // trigger the ChatStreamer to call chat.startStream under the hood. if (text) { - await streamer.append({ markdown_text: markdownToSlackMrkdwn(text) }); + await streamer.append({ markdown_text: normalizeSlackOutboundText(text) }); logVerbose(`slack-stream: appended initial text (${text.length} chars)`); } @@ -122,7 +122,7 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis return; } - await session.streamer.append({ markdown_text: markdownToSlackMrkdwn(text) }); + await session.streamer.append({ markdown_text: normalizeSlackOutboundText(text) }); logVerbose(`slack-stream: appended ${text.length} chars`); } @@ -148,7 +148,9 @@ export async function stopSlackStream(params: StopSlackStreamParams): Promise byProvider.get(id)?.has(model)); - if (matchingProviders.length === 1) { - resolvedProvider = matchingProviders[0]; - } else { - const providerInfos: ProviderInfo[] = providers.map((p) => ({ - id: p, - count: byProvider.get(p)?.size ?? 0, - })); - const buttons = buildProviderKeyboard(providerInfos); - await editMessageWithButtons( - `Could not resolve model "${model}".\n\nSelect a provider:`, - buttons, - ); - return; - } + const selection = resolveModelSelection({ + callback: modelCallback, + providers, + byProvider, + }); + if (selection.kind !== "resolved") { + const providerInfos: ProviderInfo[] = providers.map((p) => ({ + id: p, + count: byProvider.get(p)?.size ?? 0, + })); + const buttons = buildProviderKeyboard(providerInfos); + await editMessageWithButtons( + `Could not resolve model "${selection.model}".\n\nSelect a provider:`, + buttons, + ); + return; } // Process model selection as a synthetic message with /model command const syntheticMessage = buildSyntheticTextMessage({ base: callbackMessage, from: callback.from, - text: `/model ${resolvedProvider}/${model}`, + text: `/model ${selection.provider}/${selection.model}`, }); await processMessage(buildSyntheticContext(ctx, syntheticMessage), [], storeAllowFrom, { forceWasMentioned: true, diff --git a/src/telegram/bot/delivery.resolve-media.ts b/src/telegram/bot/delivery.resolve-media.ts index 0d5ddec445c..50112236c90 100644 --- a/src/telegram/bot/delivery.resolve-media.ts +++ b/src/telegram/bot/delivery.resolve-media.ts @@ -41,122 +41,31 @@ function isRetryableGetFileError(err: unknown): boolean { return true; } -export async function resolveMedia( - ctx: TelegramContext, - maxBytes: number, - token: string, - proxyFetch?: typeof fetch, -): Promise<{ - path: string; - contentType?: string; - placeholder: string; - stickerMetadata?: StickerMetadata; -} | null> { - const msg = ctx.message; - const downloadAndSaveTelegramFile = async ( - filePath: string, - fetchImpl: typeof fetch, - telegramFileName?: string, - ) => { - const url = `https://api.telegram.org/file/bot${token}/${filePath}`; - const fetched = await fetchRemoteMedia({ - url, - fetchImpl, - filePathHint: filePath, - maxBytes, - ssrfPolicy: TELEGRAM_MEDIA_SSRF_POLICY, - }); - const originalName = telegramFileName ?? fetched.fileName ?? filePath; - return saveMediaBuffer(fetched.buffer, fetched.contentType, "inbound", maxBytes, originalName); - }; - - // Handle stickers separately - only static stickers (WEBP) are supported - if (msg.sticker) { - const sticker = msg.sticker; - // Skip animated (TGS) and video (WEBM) stickers - only static WEBP supported - if (sticker.is_animated || sticker.is_video) { - logVerbose("telegram: skipping animated/video sticker (only static stickers supported)"); - return null; - } - if (!sticker.file_id) { - return null; - } - - try { - const file = await ctx.getFile(); - if (!file.file_path) { - logVerbose("telegram: getFile returned no file_path for sticker"); - return null; - } - const fetchImpl = proxyFetch ?? globalThis.fetch; - if (!fetchImpl) { - logVerbose("telegram: fetch not available for sticker download"); - return null; - } - const saved = await downloadAndSaveTelegramFile(file.file_path, fetchImpl); - - // Check sticker cache for existing description - const cached = sticker.file_unique_id ? getCachedSticker(sticker.file_unique_id) : null; - if (cached) { - logVerbose(`telegram: sticker cache hit for ${sticker.file_unique_id}`); - const fileId = sticker.file_id ?? cached.fileId; - const emoji = sticker.emoji ?? cached.emoji; - const setName = sticker.set_name ?? cached.setName; - if (fileId !== cached.fileId || emoji !== cached.emoji || setName !== cached.setName) { - // Refresh cached sticker metadata on hits so sends/searches use latest file_id. - cacheSticker({ - ...cached, - fileId, - emoji, - setName, - }); - } - return { - path: saved.path, - contentType: saved.contentType, - placeholder: "", - stickerMetadata: { - emoji, - setName, - fileId, - fileUniqueId: sticker.file_unique_id, - cachedDescription: cached.description, - }, - }; - } - - // Cache miss - return metadata for vision processing - return { - path: saved.path, - contentType: saved.contentType, - placeholder: "", - stickerMetadata: { - emoji: sticker.emoji ?? undefined, - setName: sticker.set_name ?? undefined, - fileId: sticker.file_id, - fileUniqueId: sticker.file_unique_id, - }, - }; - } catch (err) { - logVerbose(`telegram: failed to process sticker: ${String(err)}`); - return null; - } - } - - const m = +function resolveMediaFileRef(msg: TelegramContext["message"]) { + return ( msg.photo?.[msg.photo.length - 1] ?? msg.video ?? msg.video_note ?? msg.document ?? msg.audio ?? - msg.voice; - if (!m?.file_id) { - return null; - } + msg.voice + ); +} - let file: { file_path?: string }; +function resolveTelegramFileName(msg: TelegramContext["message"]): string | undefined { + return ( + msg.document?.file_name ?? + msg.audio?.file_name ?? + msg.video?.file_name ?? + msg.animation?.file_name + ); +} + +async function resolveTelegramFileWithRetry( + ctx: TelegramContext, +): Promise<{ file_path?: string } | null> { try { - file = await retryAsync(() => ctx.getFile(), { + return await retryAsync(() => ctx.getFile(), { attempts: 3, minDelayMs: 1000, maxDelayMs: 4000, @@ -181,19 +90,179 @@ export async function resolveMedia( logVerbose(`telegram: getFile failed after retries: ${String(err)}`); return null; } - if (!file.file_path) { - throw new Error("Telegram getFile returned no file_path"); - } +} + +function resolveRequiredFetchImpl(proxyFetch?: typeof fetch): typeof fetch { const fetchImpl = proxyFetch ?? globalThis.fetch; if (!fetchImpl) { throw new Error("fetch is not available; set channels.telegram.proxy in config"); } - const telegramFileName = - msg.document?.file_name ?? - msg.audio?.file_name ?? - msg.video?.file_name ?? - msg.animation?.file_name; - const saved = await downloadAndSaveTelegramFile(file.file_path, fetchImpl, telegramFileName); + return fetchImpl; +} + +async function downloadAndSaveTelegramFile(params: { + filePath: string; + token: string; + fetchImpl: typeof fetch; + maxBytes: number; + telegramFileName?: string; +}) { + const url = `https://api.telegram.org/file/bot${params.token}/${params.filePath}`; + const fetched = await fetchRemoteMedia({ + url, + fetchImpl: params.fetchImpl, + filePathHint: params.filePath, + maxBytes: params.maxBytes, + ssrfPolicy: TELEGRAM_MEDIA_SSRF_POLICY, + }); + const originalName = params.telegramFileName ?? fetched.fileName ?? params.filePath; + return saveMediaBuffer( + fetched.buffer, + fetched.contentType, + "inbound", + params.maxBytes, + originalName, + ); +} + +async function resolveStickerMedia(params: { + msg: TelegramContext["message"]; + ctx: TelegramContext; + maxBytes: number; + token: string; + proxyFetch?: typeof fetch; +}): Promise< + | { + path: string; + contentType?: string; + placeholder: string; + stickerMetadata?: StickerMetadata; + } + | null + | undefined +> { + const { msg, ctx, maxBytes, token, proxyFetch } = params; + if (!msg.sticker) { + return undefined; + } + const sticker = msg.sticker; + // Skip animated (TGS) and video (WEBM) stickers - only static WEBP supported + if (sticker.is_animated || sticker.is_video) { + logVerbose("telegram: skipping animated/video sticker (only static stickers supported)"); + return null; + } + if (!sticker.file_id) { + return null; + } + + try { + const file = await ctx.getFile(); + if (!file.file_path) { + logVerbose("telegram: getFile returned no file_path for sticker"); + return null; + } + const fetchImpl = proxyFetch ?? globalThis.fetch; + if (!fetchImpl) { + logVerbose("telegram: fetch not available for sticker download"); + return null; + } + const saved = await downloadAndSaveTelegramFile({ + filePath: file.file_path, + token, + fetchImpl, + maxBytes, + }); + + // Check sticker cache for existing description + const cached = sticker.file_unique_id ? getCachedSticker(sticker.file_unique_id) : null; + if (cached) { + logVerbose(`telegram: sticker cache hit for ${sticker.file_unique_id}`); + const fileId = sticker.file_id ?? cached.fileId; + const emoji = sticker.emoji ?? cached.emoji; + const setName = sticker.set_name ?? cached.setName; + if (fileId !== cached.fileId || emoji !== cached.emoji || setName !== cached.setName) { + // Refresh cached sticker metadata on hits so sends/searches use latest file_id. + cacheSticker({ + ...cached, + fileId, + emoji, + setName, + }); + } + return { + path: saved.path, + contentType: saved.contentType, + placeholder: "", + stickerMetadata: { + emoji, + setName, + fileId, + fileUniqueId: sticker.file_unique_id, + cachedDescription: cached.description, + }, + }; + } + + // Cache miss - return metadata for vision processing + return { + path: saved.path, + contentType: saved.contentType, + placeholder: "", + stickerMetadata: { + emoji: sticker.emoji ?? undefined, + setName: sticker.set_name ?? undefined, + fileId: sticker.file_id, + fileUniqueId: sticker.file_unique_id, + }, + }; + } catch (err) { + logVerbose(`telegram: failed to process sticker: ${String(err)}`); + return null; + } +} + +export async function resolveMedia( + ctx: TelegramContext, + maxBytes: number, + token: string, + proxyFetch?: typeof fetch, +): Promise<{ + path: string; + contentType?: string; + placeholder: string; + stickerMetadata?: StickerMetadata; +} | null> { + const msg = ctx.message; + const stickerResolved = await resolveStickerMedia({ + msg, + ctx, + maxBytes, + token, + proxyFetch, + }); + if (stickerResolved !== undefined) { + return stickerResolved; + } + + const m = resolveMediaFileRef(msg); + if (!m?.file_id) { + return null; + } + + const file = await resolveTelegramFileWithRetry(ctx); + if (!file) { + return null; + } + if (!file.file_path) { + throw new Error("Telegram getFile returned no file_path"); + } + const saved = await downloadAndSaveTelegramFile({ + filePath: file.file_path, + token, + fetchImpl: resolveRequiredFetchImpl(proxyFetch), + maxBytes, + telegramFileName: resolveTelegramFileName(msg), + }); const placeholder = resolveTelegramMediaPlaceholder(msg) ?? ""; return { path: saved.path, contentType: saved.contentType, placeholder }; } diff --git a/src/telegram/model-buttons.test.ts b/src/telegram/model-buttons.test.ts index 3650ed917d7..3a6b5832f49 100644 --- a/src/telegram/model-buttons.test.ts +++ b/src/telegram/model-buttons.test.ts @@ -1,11 +1,13 @@ import { describe, expect, it } from "vitest"; import { + buildModelSelectionCallbackData, buildModelsKeyboard, - buildProviderKeyboard, buildBrowseProvidersButton, + buildProviderKeyboard, calculateTotalPages, getModelsPageSize, parseModelCallbackData, + resolveModelSelection, type ProviderInfo, } from "./model-buttons.js"; @@ -52,6 +54,79 @@ describe("parseModelCallbackData", () => { }); }); +describe("resolveModelSelection", () => { + it("returns explicit provider selections unchanged", () => { + const result = resolveModelSelection({ + callback: { type: "select", provider: "openai", model: "gpt-4.1" }, + providers: ["openai", "anthropic"], + byProvider: new Map([ + ["openai", new Set(["gpt-4.1"])], + ["anthropic", new Set(["claude-sonnet-4-5"])], + ]), + }); + expect(result).toEqual({ kind: "resolved", provider: "openai", model: "gpt-4.1" }); + }); + + it("resolves compact callbacks when exactly one provider matches", () => { + const result = resolveModelSelection({ + callback: { type: "select", model: "shared" }, + providers: ["openai", "anthropic"], + byProvider: new Map([ + ["openai", new Set(["shared"])], + ["anthropic", new Set(["other"])], + ]), + }); + expect(result).toEqual({ kind: "resolved", provider: "openai", model: "shared" }); + }); + + it("returns ambiguous result when zero or multiple providers match", () => { + const sharedByBoth = resolveModelSelection({ + callback: { type: "select", model: "shared" }, + providers: ["openai", "anthropic"], + byProvider: new Map([ + ["openai", new Set(["shared"])], + ["anthropic", new Set(["shared"])], + ]), + }); + expect(sharedByBoth).toEqual({ + kind: "ambiguous", + model: "shared", + matchingProviders: ["openai", "anthropic"], + }); + + const missingEverywhere = resolveModelSelection({ + callback: { type: "select", model: "missing" }, + providers: ["openai", "anthropic"], + byProvider: new Map([ + ["openai", new Set(["gpt-4.1"])], + ["anthropic", new Set(["claude-sonnet-4-5"])], + ]), + }); + expect(missingEverywhere).toEqual({ + kind: "ambiguous", + model: "missing", + matchingProviders: [], + }); + }); +}); + +describe("buildModelSelectionCallbackData", () => { + it("uses standard callback when under limit and compact callback when needed", () => { + expect(buildModelSelectionCallbackData({ provider: "openai", model: "gpt-4.1" })).toBe( + "mdl_sel_openai/gpt-4.1", + ); + const longModel = "us.anthropic.claude-3-5-sonnet-20240620-v1:0"; + expect(buildModelSelectionCallbackData({ provider: "amazon-bedrock", model: longModel })).toBe( + `mdl_sel/${longModel}`, + ); + }); + + it("returns null when even compact callback exceeds Telegram limit", () => { + const tooLongModel = "x".repeat(80); + expect(buildModelSelectionCallbackData({ provider: "openai", model: tooLongModel })).toBeNull(); + }); +}); + describe("buildProviderKeyboard", () => { it("lays out providers in two-column rows", () => { const cases = [ diff --git a/src/telegram/model-buttons.ts b/src/telegram/model-buttons.ts index 86e54b72827..f6a16457d6c 100644 --- a/src/telegram/model-buttons.ts +++ b/src/telegram/model-buttons.ts @@ -22,6 +22,10 @@ export type ProviderInfo = { count: number; }; +export type ResolveModelSelectionResult = + | { kind: "resolved"; provider: string; model: string } + | { kind: "ambiguous"; model: string; matchingProviders: string[] }; + export type ModelsKeyboardParams = { provider: string; models: readonly string[]; @@ -33,6 +37,13 @@ export type ModelsKeyboardParams = { const MODELS_PAGE_SIZE = 8; const MAX_CALLBACK_DATA_BYTES = 64; +const CALLBACK_PREFIX = { + providers: "mdl_prov", + back: "mdl_back", + list: "mdl_list_", + selectStandard: "mdl_sel_", + selectCompact: "mdl_sel/", +} as const; /** * Parse a model callback_data string into a structured object. @@ -44,8 +55,8 @@ export function parseModelCallbackData(data: string): ParsedModelCallback | null return null; } - if (trimmed === "mdl_prov" || trimmed === "mdl_back") { - return { type: trimmed === "mdl_prov" ? "providers" : "back" }; + if (trimmed === CALLBACK_PREFIX.providers || trimmed === CALLBACK_PREFIX.back) { + return { type: trimmed === CALLBACK_PREFIX.providers ? "providers" : "back" }; } // mdl_list_{provider}_{page} @@ -89,6 +100,49 @@ export function parseModelCallbackData(data: string): ParsedModelCallback | null return null; } +export function buildModelSelectionCallbackData(params: { + provider: string; + model: string; +}): string | null { + const fullCallbackData = `${CALLBACK_PREFIX.selectStandard}${params.provider}/${params.model}`; + if (Buffer.byteLength(fullCallbackData, "utf8") <= MAX_CALLBACK_DATA_BYTES) { + return fullCallbackData; + } + const compactCallbackData = `${CALLBACK_PREFIX.selectCompact}${params.model}`; + return Buffer.byteLength(compactCallbackData, "utf8") <= MAX_CALLBACK_DATA_BYTES + ? compactCallbackData + : null; +} + +export function resolveModelSelection(params: { + callback: Extract; + providers: readonly string[]; + byProvider: ReadonlyMap>; +}): ResolveModelSelectionResult { + if (params.callback.provider) { + return { + kind: "resolved", + provider: params.callback.provider, + model: params.callback.model, + }; + } + const matchingProviders = params.providers.filter((id) => + params.byProvider.get(id)?.has(params.callback.model), + ); + if (matchingProviders.length === 1) { + return { + kind: "resolved", + provider: matchingProviders[0], + model: params.callback.model, + }; + } + return { + kind: "ambiguous", + model: params.callback.model, + matchingProviders, + }; +} + /** * Build provider selection keyboard with 2 providers per row. */ @@ -130,7 +184,7 @@ export function buildModelsKeyboard(params: ModelsKeyboardParams): ButtonRow[] { const pageSize = params.pageSize ?? MODELS_PAGE_SIZE; if (models.length === 0) { - return [[{ text: "<< Back", callback_data: "mdl_back" }]]; + return [[{ text: "<< Back", callback_data: CALLBACK_PREFIX.back }]]; } const rows: ButtonRow[] = []; @@ -146,13 +200,9 @@ export function buildModelsKeyboard(params: ModelsKeyboardParams): ButtonRow[] { : currentModel; for (const model of pageModels) { - const fullCallbackData = `mdl_sel_${provider}/${model}`; - const callbackData = - Buffer.byteLength(fullCallbackData, "utf8") <= MAX_CALLBACK_DATA_BYTES - ? fullCallbackData - : `mdl_sel/${model}`; - // Skip models that still exceed Telegram's callback_data limit - if (Buffer.byteLength(callbackData, "utf8") > MAX_CALLBACK_DATA_BYTES) { + const callbackData = buildModelSelectionCallbackData({ provider, model }); + // Skip models that still exceed Telegram's callback_data limit. + if (!callbackData) { continue; } @@ -175,19 +225,19 @@ export function buildModelsKeyboard(params: ModelsKeyboardParams): ButtonRow[] { if (currentPage > 1) { paginationRow.push({ text: "◀ Prev", - callback_data: `mdl_list_${provider}_${currentPage - 1}`, + callback_data: `${CALLBACK_PREFIX.list}${provider}_${currentPage - 1}`, }); } paginationRow.push({ text: `${currentPage}/${totalPages}`, - callback_data: `mdl_list_${provider}_${currentPage}`, // noop + callback_data: `${CALLBACK_PREFIX.list}${provider}_${currentPage}`, // noop }); if (currentPage < totalPages) { paginationRow.push({ text: "Next ▶", - callback_data: `mdl_list_${provider}_${currentPage + 1}`, + callback_data: `${CALLBACK_PREFIX.list}${provider}_${currentPage + 1}`, }); } @@ -195,7 +245,7 @@ export function buildModelsKeyboard(params: ModelsKeyboardParams): ButtonRow[] { } // Back button - rows.push([{ text: "<< Back", callback_data: "mdl_back" }]); + rows.push([{ text: "<< Back", callback_data: CALLBACK_PREFIX.back }]); return rows; } @@ -204,7 +254,7 @@ export function buildModelsKeyboard(params: ModelsKeyboardParams): ButtonRow[] { * Build "Browse providers" button for /model summary. */ export function buildBrowseProvidersButton(): ButtonRow[] { - return [[{ text: "Browse providers", callback_data: "mdl_prov" }]]; + return [[{ text: "Browse providers", callback_data: CALLBACK_PREFIX.providers }]]; } /** diff --git a/tsconfig.plugin-sdk.dts.json b/tsconfig.plugin-sdk.dts.json index 4361da3b71e..ba48a3d1eeb 100644 --- a/tsconfig.plugin-sdk.dts.json +++ b/tsconfig.plugin-sdk.dts.json @@ -10,6 +10,11 @@ "rootDir": "src", "tsBuildInfoFile": "dist/plugin-sdk/.tsbuildinfo" }, - "include": ["src/plugin-sdk/index.ts", "src/plugin-sdk/account-id.ts", "src/types/**/*.d.ts"], + "include": [ + "src/plugin-sdk/index.ts", + "src/plugin-sdk/account-id.ts", + "src/plugin-sdk/keyed-async-queue.ts", + "src/types/**/*.d.ts" + ], "exclude": ["node_modules", "dist", "src/**/*.test.ts"] } diff --git a/vitest.config.ts b/vitest.config.ts index 424fa3e8427..51eda12f55b 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -17,6 +17,10 @@ export default defineConfig({ find: "openclaw/plugin-sdk/account-id", replacement: path.join(repoRoot, "src", "plugin-sdk", "account-id.ts"), }, + { + find: "openclaw/plugin-sdk/keyed-async-queue", + replacement: path.join(repoRoot, "src", "plugin-sdk", "keyed-async-queue.ts"), + }, { find: "openclaw/plugin-sdk", replacement: path.join(repoRoot, "src", "plugin-sdk", "index.ts"),