refactor: unify queueing and normalize telegram slack flows

This commit is contained in:
Peter Steinberger
2026-03-02 20:55:10 +00:00
parent 320920d523
commit 3a08e69a05
21 changed files with 627 additions and 266 deletions

View File

@@ -1,3 +1,5 @@
import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue";
export const DEFAULT_SEND_GAP_MS = 150;
type MatrixSendQueueOptions = {
@@ -6,37 +8,19 @@ type MatrixSendQueueOptions = {
};
// Serialize sends per room to preserve Matrix delivery order.
const roomQueues = new Map<string, Promise<void>>();
const roomQueues = new KeyedAsyncQueue();
export async function enqueueSend<T>(
export function enqueueSend<T>(
roomId: string,
fn: () => Promise<T>,
options?: MatrixSendQueueOptions,
): Promise<T> {
const gapMs = options?.gapMs ?? DEFAULT_SEND_GAP_MS;
const delayFn = options?.delayFn ?? delay;
const previous = roomQueues.get(roomId) ?? Promise.resolve();
const next = previous
.catch(() => {})
.then(async () => {
await delayFn(gapMs);
return await fn();
});
const queueMarker = next.then(
() => {},
() => {},
);
roomQueues.set(roomId, queueMarker);
queueMarker.finally(() => {
if (roomQueues.get(roomId) === queueMarker) {
roomQueues.delete(roomId);
}
return roomQueues.enqueue(roomId, async () => {
await delayFn(gapMs);
return await fn();
});
return await next;
}
function delay(ms: number): Promise<void> {

View File

@@ -44,6 +44,10 @@
"types": "./dist/plugin-sdk/account-id.d.ts",
"default": "./dist/plugin-sdk/account-id.js"
},
"./plugin-sdk/keyed-async-queue": {
"types": "./dist/plugin-sdk/keyed-async-queue.d.ts",
"default": "./dist/plugin-sdk/keyed-async-queue.js"
},
"./cli-entry": "./openclaw.mjs"
},
"scripts": {

View File

@@ -1,9 +1,11 @@
import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js";
export class SessionActorQueue {
private readonly tailBySession = new Map<string, Promise<void>>();
private readonly queue = new KeyedAsyncQueue();
private readonly pendingBySession = new Map<string, number>();
getTailMapForTesting(): Map<string, Promise<void>> {
return this.tailBySession;
return this.queue.getTailMapForTesting();
}
getTotalPendingCount(): number {
@@ -19,35 +21,18 @@ export class SessionActorQueue {
}
async run<T>(actorKey: string, op: () => Promise<T>): Promise<T> {
const previous = this.tailBySession.get(actorKey) ?? Promise.resolve();
this.pendingBySession.set(actorKey, (this.pendingBySession.get(actorKey) ?? 0) + 1);
let release: () => void = () => {};
const marker = new Promise<void>((resolve) => {
release = resolve;
return this.queue.enqueue(actorKey, op, {
onEnqueue: () => {
this.pendingBySession.set(actorKey, (this.pendingBySession.get(actorKey) ?? 0) + 1);
},
onSettle: () => {
const pending = (this.pendingBySession.get(actorKey) ?? 1) - 1;
if (pending <= 0) {
this.pendingBySession.delete(actorKey);
} else {
this.pendingBySession.set(actorKey, pending);
}
},
});
const queuedTail = previous
.catch(() => {
// Keep actor queue alive after an operation failure.
})
.then(() => marker);
this.tailBySession.set(actorKey, queuedTail);
await previous.catch(() => {
// Previous failures should not block newer commands.
});
try {
return await op();
} finally {
const pending = (this.pendingBySession.get(actorKey) ?? 1) - 1;
if (pending <= 0) {
this.pendingBySession.delete(actorKey);
} else {
this.pendingBySession.set(actorKey, pending);
}
release();
if (this.tailBySession.get(actorKey) === queuedTail) {
this.tailBySession.delete(actorKey);
}
}
}
}

View File

@@ -7,6 +7,7 @@ import type { ImageContent } from "@mariozechner/pi-ai";
import type { ThinkLevel } from "../../auto-reply/thinking.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { CliBackendConfig } from "../../config/types.js";
import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js";
import { buildTtsSystemPromptHint } from "../../tts/tts.js";
import { isRecord } from "../../utils.js";
import { buildModelAliasLines } from "../model-alias-lines.js";
@@ -18,20 +19,9 @@ import { buildSystemPromptParams } from "../system-prompt-params.js";
import { buildAgentSystemPrompt } from "../system-prompt.js";
export { buildCliSupervisorScopeKey, resolveCliNoOutputTimeoutMs } from "./reliability.js";
const CLI_RUN_QUEUE = new Map<string, Promise<unknown>>();
const CLI_RUN_QUEUE = new KeyedAsyncQueue();
export function enqueueCliRun<T>(key: string, task: () => Promise<T>): Promise<T> {
const prior = CLI_RUN_QUEUE.get(key) ?? Promise.resolve();
const chained = prior.catch(() => undefined).then(task);
// Keep queue continuity even when a run rejects, without emitting unhandled rejections.
const tracked = chained
.catch(() => undefined)
.finally(() => {
if (CLI_RUN_QUEUE.get(key) === tracked) {
CLI_RUN_QUEUE.delete(key);
}
});
CLI_RUN_QUEUE.set(key, tracked);
return chained;
return CLI_RUN_QUEUE.enqueue(key, task);
}
type CliUsage = {

View File

@@ -9,6 +9,10 @@ export interface BackupRotationFs {
readdir?: (path: string) => Promise<string[]>;
}
export interface BackupMaintenanceFs extends BackupRotationFs {
copyFile: (from: string, to: string) => Promise<void>;
}
export async function rotateConfigBackups(
configPath: string,
ioFs: BackupRotationFs,
@@ -103,3 +107,19 @@ export async function cleanOrphanBackups(
});
}
}
/**
* Run the full backup maintenance cycle around config writes.
* Order matters: rotate ring -> create new .bak -> harden modes -> prune orphan .bak.* files.
*/
export async function maintainConfigBackups(
configPath: string,
ioFs: BackupMaintenanceFs,
): Promise<void> {
await rotateConfigBackups(configPath, ioFs);
await ioFs.copyFile(configPath, `${configPath}.bak`).catch(() => {
// best-effort
});
await hardenBackupPermissions(configPath, ioFs);
await cleanOrphanBackups(configPath, ioFs);
}

View File

@@ -2,6 +2,7 @@ import fs from "node:fs/promises";
import path from "node:path";
import { describe, expect, it } from "vitest";
import {
maintainConfigBackups,
rotateConfigBackups,
hardenBackupPermissions,
cleanOrphanBackups,
@@ -112,4 +113,31 @@ describe("config backup rotation", () => {
await expect(fs.readFile(configPath, "utf-8")).resolves.toBe("current");
});
});
it("maintainConfigBackups composes rotate/copy/harden/prune flow", async () => {
await withTempHome(async () => {
const stateDir = process.env.OPENCLAW_STATE_DIR?.trim();
if (!stateDir) {
throw new Error("Expected OPENCLAW_STATE_DIR to be set by withTempHome");
}
const configPath = path.join(stateDir, "openclaw.json");
await fs.writeFile(configPath, JSON.stringify({ token: "secret" }), { mode: 0o600 });
await fs.writeFile(`${configPath}.bak`, "previous", { mode: 0o644 });
await fs.writeFile(`${configPath}.bak.orphan`, "old");
await maintainConfigBackups(configPath, fs);
// A new primary backup is created from the current config.
await expect(fs.readFile(`${configPath}.bak`, "utf-8")).resolves.toBe(
JSON.stringify({ token: "secret" }),
);
// Prior primary backup gets rotated into ring slot 1.
await expect(fs.readFile(`${configPath}.bak.1`, "utf-8")).resolves.toBe("previous");
// Mode hardening still applies.
const primaryBackupStat = await fs.stat(`${configPath}.bak`);
expect(primaryBackupStat.mode & 0o777).toBe(0o600);
// Out-of-ring orphan gets pruned.
await expect(fs.stat(`${configPath}.bak.orphan`)).rejects.toThrow();
});
});
});

View File

@@ -15,11 +15,7 @@ import {
} from "../infra/shell-env.js";
import { VERSION } from "../version.js";
import { DuplicateAgentDirError, findDuplicateAgentDirs } from "./agent-dirs.js";
import {
rotateConfigBackups,
hardenBackupPermissions,
cleanOrphanBackups,
} from "./backup-rotation.js";
import { maintainConfigBackups } from "./backup-rotation.js";
import {
applyCompactionDefaults,
applyContextPruningDefaults,
@@ -1245,12 +1241,7 @@ export function createConfigIO(overrides: ConfigIoDeps = {}) {
});
if (deps.fs.existsSync(configPath)) {
await rotateConfigBackups(configPath, deps.fs.promises);
await deps.fs.promises.copyFile(configPath, `${configPath}.bak`).catch(() => {
// best-effort
});
await hardenBackupPermissions(configPath, deps.fs.promises);
await cleanOrphanBackups(configPath, deps.fs.promises);
await maintainConfigBackups(configPath, deps.fs.promises);
}
try {

View File

@@ -11,6 +11,7 @@ import { danger, logVerbose } from "../../globals.js";
import { formatDurationSeconds } from "../../infra/format-time/format-duration.ts";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js";
import { resolveAgentRoute } from "../../routing/resolve-route.js";
import {
readStoreAllowFromForDmPolicy,
@@ -119,7 +120,7 @@ export function registerDiscordListener(listeners: Array<object>, listener: obje
}
export class DiscordMessageListener extends MessageCreateListener {
private channelQueues = new Map<string, Promise<void>>();
private readonly channelQueue = new KeyedAsyncQueue();
constructor(
private handler: DiscordMessageHandler,
@@ -132,35 +133,21 @@ export class DiscordMessageListener extends MessageCreateListener {
async handle(data: DiscordMessageEvent, client: Client) {
this.onEvent?.();
const channelId = data.channel_id;
const prev = this.channelQueues.get(channelId) ?? Promise.resolve();
// Serialize messages within the same channel to preserve ordering,
// but allow different channels to proceed in parallel so that
// channel-bound agents are not blocked by each other.
const next = prev
.catch(() => {})
.then(() =>
runDiscordListenerWithSlowLog({
logger: this.logger,
listener: this.constructor.name,
event: this.type,
run: () => this.handler(data, client),
onError: (err) => {
const logger = this.logger ?? discordEventQueueLog;
logger.error(danger(`discord handler failed: ${String(err)}`));
},
}),
);
this.channelQueues.set(channelId, next);
void next
.then(() => {
if (this.channelQueues.get(channelId) === next) {
this.channelQueues.delete(channelId);
}
})
.catch((err) => {
const logger = this.logger ?? discordEventQueueLog;
logger.error(danger(`discord handler failed: ${String(err)}`));
});
void this.channelQueue.enqueue(channelId, () =>
runDiscordListenerWithSlowLog({
logger: this.logger,
listener: this.constructor.name,
event: this.type,
run: () => this.handler(data, client),
onError: (err) => {
const logger = this.logger ?? discordEventQueueLog;
logger.error(danger(`discord handler failed: ${String(err)}`));
},
}),
);
}
}

View File

@@ -120,6 +120,8 @@ export { isDangerousNameMatchingEnabled } from "../config/dangerous-name-matchin
export type { FileLockHandle, FileLockOptions } from "./file-lock.js";
export { acquireFileLock, withFileLock } from "./file-lock.js";
export type { KeyedAsyncQueueHooks } from "./keyed-async-queue.js";
export { enqueueKeyedTask, KeyedAsyncQueue } from "./keyed-async-queue.js";
export { normalizeWebhookPath, resolveWebhookPath } from "./webhook-path.js";
export {
registerWebhookTarget,

View File

@@ -0,0 +1,108 @@
import { describe, expect, it, vi } from "vitest";
import { enqueueKeyedTask, KeyedAsyncQueue } from "./keyed-async-queue.js";
function deferred<T>() {
let resolve!: (value: T | PromiseLike<T>) => void;
let reject!: (reason?: unknown) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}
describe("enqueueKeyedTask", () => {
it("serializes tasks per key and keeps different keys independent", async () => {
const tails = new Map<string, Promise<void>>();
const gate = deferred<void>();
const order: string[] = [];
const first = enqueueKeyedTask({
tails,
key: "a",
task: async () => {
order.push("a1:start");
await gate.promise;
order.push("a1:end");
},
});
const second = enqueueKeyedTask({
tails,
key: "a",
task: async () => {
order.push("a2:start");
order.push("a2:end");
},
});
const third = enqueueKeyedTask({
tails,
key: "b",
task: async () => {
order.push("b1:start");
order.push("b1:end");
},
});
await vi.waitFor(() => {
expect(order).toContain("a1:start");
expect(order).toContain("b1:start");
});
expect(order).not.toContain("a2:start");
gate.resolve();
await Promise.all([first, second, third]);
expect(order).toEqual(["a1:start", "b1:start", "b1:end", "a1:end", "a2:start", "a2:end"]);
expect(tails.size).toBe(0);
});
it("keeps queue alive after task failures", async () => {
const tails = new Map<string, Promise<void>>();
await expect(
enqueueKeyedTask({
tails,
key: "a",
task: async () => {
throw new Error("boom");
},
}),
).rejects.toThrow("boom");
await expect(
enqueueKeyedTask({
tails,
key: "a",
task: async () => "ok",
}),
).resolves.toBe("ok");
});
it("runs enqueue/settle hooks once per task", async () => {
const tails = new Map<string, Promise<void>>();
const onEnqueue = vi.fn();
const onSettle = vi.fn();
await enqueueKeyedTask({
tails,
key: "a",
task: async () => undefined,
hooks: { onEnqueue, onSettle },
});
expect(onEnqueue).toHaveBeenCalledTimes(1);
expect(onSettle).toHaveBeenCalledTimes(1);
});
});
describe("KeyedAsyncQueue", () => {
it("exposes tail map for observability", async () => {
const queue = new KeyedAsyncQueue();
const gate = deferred<void>();
const run = queue.enqueue("actor", async () => {
await gate.promise;
return 1;
});
expect(queue.getTailMapForTesting().has("actor")).toBe(true);
gate.resolve();
await run;
await Promise.resolve();
expect(queue.getTailMapForTesting().has("actor")).toBe(false);
});
});

View File

@@ -0,0 +1,48 @@
export type KeyedAsyncQueueHooks = {
onEnqueue?: () => void;
onSettle?: () => void;
};
export function enqueueKeyedTask<T>(params: {
tails: Map<string, Promise<void>>;
key: string;
task: () => Promise<T>;
hooks?: KeyedAsyncQueueHooks;
}): Promise<T> {
params.hooks?.onEnqueue?.();
const previous = params.tails.get(params.key) ?? Promise.resolve();
const current = previous
.catch(() => undefined)
.then(params.task)
.finally(() => {
params.hooks?.onSettle?.();
});
const tail = current.then(
() => undefined,
() => undefined,
);
params.tails.set(params.key, tail);
void tail.finally(() => {
if (params.tails.get(params.key) === tail) {
params.tails.delete(params.key);
}
});
return current;
}
export class KeyedAsyncQueue {
private readonly tails = new Map<string, Promise<void>>();
getTailMapForTesting(): Map<string, Promise<void>> {
return this.tails;
}
enqueue<T>(key: string, task: () => Promise<T>, hooks?: KeyedAsyncQueueHooks): Promise<T> {
return enqueueKeyedTask({
tails: this.tails,
key,
task,
...(hooks ? { hooks } : {}),
});
}
}

View File

@@ -1,5 +1,5 @@
import { describe, expect, it } from "vitest";
import { markdownToSlackMrkdwn } from "./format.js";
import { markdownToSlackMrkdwn, normalizeSlackOutboundText } from "./format.js";
import { escapeSlackMrkdwn } from "./monitor/mrkdwn.js";
describe("markdownToSlackMrkdwn", () => {
@@ -72,3 +72,9 @@ describe("escapeSlackMrkdwn", () => {
expect(escapeSlackMrkdwn("mode_*`~<&>\\")).toBe("mode\\_\\*\\`\\~&lt;&amp;&gt;\\\\");
});
});
describe("normalizeSlackOutboundText", () => {
it("normalizes markdown for outbound send/update paths", () => {
expect(normalizeSlackOutboundText(" **bold** ")).toBe("*bold*");
});
});

View File

@@ -128,6 +128,10 @@ export function markdownToSlackMrkdwn(
return renderMarkdownWithMarkers(ir, buildSlackRenderOptions());
}
export function normalizeSlackOutboundText(markdown: string): string {
return markdownToSlackMrkdwn(markdown ?? "");
}
export function markdownToSlackMrkdwnChunks(
markdown: string,
limit: number,

View File

@@ -12,7 +12,7 @@ import { danger, logVerbose, shouldLogVerbose } from "../../../globals.js";
import { resolveAgentOutboundIdentity } from "../../../infra/outbound/identity.js";
import { removeSlackReaction } from "../../actions.js";
import { createSlackDraftStream } from "../../draft-stream.js";
import { markdownToSlackMrkdwn } from "../../format.js";
import { normalizeSlackOutboundText } from "../../format.js";
import { recordSlackThreadParticipation } from "../../sent-thread-cache.js";
import {
applyAppendOnlyStreamUpdate,
@@ -291,7 +291,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
token: ctx.botToken,
channel: draftChannelId,
ts: draftMessageId,
text: markdownToSlackMrkdwn(finalText.trim()),
text: normalizeSlackOutboundText(finalText.trim()),
});
return;
} catch (err) {

View File

@@ -14,7 +14,7 @@
import type { WebClient } from "@slack/web-api";
import type { ChatStreamer } from "@slack/web-api/dist/chat-stream.js";
import { logVerbose } from "../globals.js";
import { markdownToSlackMrkdwn } from "./format.js";
import { normalizeSlackOutboundText } from "./format.js";
// ---------------------------------------------------------------------------
// Types
@@ -100,7 +100,7 @@ export async function startSlackStream(
// If initial text is provided, send it as the first append which will
// trigger the ChatStreamer to call chat.startStream under the hood.
if (text) {
await streamer.append({ markdown_text: markdownToSlackMrkdwn(text) });
await streamer.append({ markdown_text: normalizeSlackOutboundText(text) });
logVerbose(`slack-stream: appended initial text (${text.length} chars)`);
}
@@ -122,7 +122,7 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis
return;
}
await session.streamer.append({ markdown_text: markdownToSlackMrkdwn(text) });
await session.streamer.append({ markdown_text: normalizeSlackOutboundText(text) });
logVerbose(`slack-stream: appended ${text.length} chars`);
}
@@ -148,7 +148,9 @@ export async function stopSlackStream(params: StopSlackStreamParams): Promise<vo
}`,
);
await session.streamer.stop(text ? { markdown_text: markdownToSlackMrkdwn(text) } : undefined);
await session.streamer.stop(
text ? { markdown_text: normalizeSlackOutboundText(text) } : undefined,
);
logVerbose("slack-stream: stream stopped");
}

View File

@@ -63,6 +63,7 @@ import {
calculateTotalPages,
getModelsPageSize,
parseModelCallbackData,
resolveModelSelection,
type ProviderInfo,
} from "./model-buttons.js";
import { buildInlineKeyboard } from "./send.js";
@@ -1260,30 +1261,28 @@ export const registerTelegramHandlers = ({
}
if (modelCallback.type === "select") {
const { provider, model } = modelCallback;
let resolvedProvider = provider;
if (!resolvedProvider) {
const matchingProviders = providers.filter((id) => byProvider.get(id)?.has(model));
if (matchingProviders.length === 1) {
resolvedProvider = matchingProviders[0];
} else {
const providerInfos: ProviderInfo[] = providers.map((p) => ({
id: p,
count: byProvider.get(p)?.size ?? 0,
}));
const buttons = buildProviderKeyboard(providerInfos);
await editMessageWithButtons(
`Could not resolve model "${model}".\n\nSelect a provider:`,
buttons,
);
return;
}
const selection = resolveModelSelection({
callback: modelCallback,
providers,
byProvider,
});
if (selection.kind !== "resolved") {
const providerInfos: ProviderInfo[] = providers.map((p) => ({
id: p,
count: byProvider.get(p)?.size ?? 0,
}));
const buttons = buildProviderKeyboard(providerInfos);
await editMessageWithButtons(
`Could not resolve model "${selection.model}".\n\nSelect a provider:`,
buttons,
);
return;
}
// Process model selection as a synthetic message with /model command
const syntheticMessage = buildSyntheticTextMessage({
base: callbackMessage,
from: callback.from,
text: `/model ${resolvedProvider}/${model}`,
text: `/model ${selection.provider}/${selection.model}`,
});
await processMessage(buildSyntheticContext(ctx, syntheticMessage), [], storeAllowFrom, {
forceWasMentioned: true,

View File

@@ -41,122 +41,31 @@ function isRetryableGetFileError(err: unknown): boolean {
return true;
}
export async function resolveMedia(
ctx: TelegramContext,
maxBytes: number,
token: string,
proxyFetch?: typeof fetch,
): Promise<{
path: string;
contentType?: string;
placeholder: string;
stickerMetadata?: StickerMetadata;
} | null> {
const msg = ctx.message;
const downloadAndSaveTelegramFile = async (
filePath: string,
fetchImpl: typeof fetch,
telegramFileName?: string,
) => {
const url = `https://api.telegram.org/file/bot${token}/${filePath}`;
const fetched = await fetchRemoteMedia({
url,
fetchImpl,
filePathHint: filePath,
maxBytes,
ssrfPolicy: TELEGRAM_MEDIA_SSRF_POLICY,
});
const originalName = telegramFileName ?? fetched.fileName ?? filePath;
return saveMediaBuffer(fetched.buffer, fetched.contentType, "inbound", maxBytes, originalName);
};
// Handle stickers separately - only static stickers (WEBP) are supported
if (msg.sticker) {
const sticker = msg.sticker;
// Skip animated (TGS) and video (WEBM) stickers - only static WEBP supported
if (sticker.is_animated || sticker.is_video) {
logVerbose("telegram: skipping animated/video sticker (only static stickers supported)");
return null;
}
if (!sticker.file_id) {
return null;
}
try {
const file = await ctx.getFile();
if (!file.file_path) {
logVerbose("telegram: getFile returned no file_path for sticker");
return null;
}
const fetchImpl = proxyFetch ?? globalThis.fetch;
if (!fetchImpl) {
logVerbose("telegram: fetch not available for sticker download");
return null;
}
const saved = await downloadAndSaveTelegramFile(file.file_path, fetchImpl);
// Check sticker cache for existing description
const cached = sticker.file_unique_id ? getCachedSticker(sticker.file_unique_id) : null;
if (cached) {
logVerbose(`telegram: sticker cache hit for ${sticker.file_unique_id}`);
const fileId = sticker.file_id ?? cached.fileId;
const emoji = sticker.emoji ?? cached.emoji;
const setName = sticker.set_name ?? cached.setName;
if (fileId !== cached.fileId || emoji !== cached.emoji || setName !== cached.setName) {
// Refresh cached sticker metadata on hits so sends/searches use latest file_id.
cacheSticker({
...cached,
fileId,
emoji,
setName,
});
}
return {
path: saved.path,
contentType: saved.contentType,
placeholder: "<media:sticker>",
stickerMetadata: {
emoji,
setName,
fileId,
fileUniqueId: sticker.file_unique_id,
cachedDescription: cached.description,
},
};
}
// Cache miss - return metadata for vision processing
return {
path: saved.path,
contentType: saved.contentType,
placeholder: "<media:sticker>",
stickerMetadata: {
emoji: sticker.emoji ?? undefined,
setName: sticker.set_name ?? undefined,
fileId: sticker.file_id,
fileUniqueId: sticker.file_unique_id,
},
};
} catch (err) {
logVerbose(`telegram: failed to process sticker: ${String(err)}`);
return null;
}
}
const m =
function resolveMediaFileRef(msg: TelegramContext["message"]) {
return (
msg.photo?.[msg.photo.length - 1] ??
msg.video ??
msg.video_note ??
msg.document ??
msg.audio ??
msg.voice;
if (!m?.file_id) {
return null;
}
msg.voice
);
}
let file: { file_path?: string };
function resolveTelegramFileName(msg: TelegramContext["message"]): string | undefined {
return (
msg.document?.file_name ??
msg.audio?.file_name ??
msg.video?.file_name ??
msg.animation?.file_name
);
}
async function resolveTelegramFileWithRetry(
ctx: TelegramContext,
): Promise<{ file_path?: string } | null> {
try {
file = await retryAsync(() => ctx.getFile(), {
return await retryAsync(() => ctx.getFile(), {
attempts: 3,
minDelayMs: 1000,
maxDelayMs: 4000,
@@ -181,19 +90,179 @@ export async function resolveMedia(
logVerbose(`telegram: getFile failed after retries: ${String(err)}`);
return null;
}
if (!file.file_path) {
throw new Error("Telegram getFile returned no file_path");
}
}
function resolveRequiredFetchImpl(proxyFetch?: typeof fetch): typeof fetch {
const fetchImpl = proxyFetch ?? globalThis.fetch;
if (!fetchImpl) {
throw new Error("fetch is not available; set channels.telegram.proxy in config");
}
const telegramFileName =
msg.document?.file_name ??
msg.audio?.file_name ??
msg.video?.file_name ??
msg.animation?.file_name;
const saved = await downloadAndSaveTelegramFile(file.file_path, fetchImpl, telegramFileName);
return fetchImpl;
}
async function downloadAndSaveTelegramFile(params: {
filePath: string;
token: string;
fetchImpl: typeof fetch;
maxBytes: number;
telegramFileName?: string;
}) {
const url = `https://api.telegram.org/file/bot${params.token}/${params.filePath}`;
const fetched = await fetchRemoteMedia({
url,
fetchImpl: params.fetchImpl,
filePathHint: params.filePath,
maxBytes: params.maxBytes,
ssrfPolicy: TELEGRAM_MEDIA_SSRF_POLICY,
});
const originalName = params.telegramFileName ?? fetched.fileName ?? params.filePath;
return saveMediaBuffer(
fetched.buffer,
fetched.contentType,
"inbound",
params.maxBytes,
originalName,
);
}
async function resolveStickerMedia(params: {
msg: TelegramContext["message"];
ctx: TelegramContext;
maxBytes: number;
token: string;
proxyFetch?: typeof fetch;
}): Promise<
| {
path: string;
contentType?: string;
placeholder: string;
stickerMetadata?: StickerMetadata;
}
| null
| undefined
> {
const { msg, ctx, maxBytes, token, proxyFetch } = params;
if (!msg.sticker) {
return undefined;
}
const sticker = msg.sticker;
// Skip animated (TGS) and video (WEBM) stickers - only static WEBP supported
if (sticker.is_animated || sticker.is_video) {
logVerbose("telegram: skipping animated/video sticker (only static stickers supported)");
return null;
}
if (!sticker.file_id) {
return null;
}
try {
const file = await ctx.getFile();
if (!file.file_path) {
logVerbose("telegram: getFile returned no file_path for sticker");
return null;
}
const fetchImpl = proxyFetch ?? globalThis.fetch;
if (!fetchImpl) {
logVerbose("telegram: fetch not available for sticker download");
return null;
}
const saved = await downloadAndSaveTelegramFile({
filePath: file.file_path,
token,
fetchImpl,
maxBytes,
});
// Check sticker cache for existing description
const cached = sticker.file_unique_id ? getCachedSticker(sticker.file_unique_id) : null;
if (cached) {
logVerbose(`telegram: sticker cache hit for ${sticker.file_unique_id}`);
const fileId = sticker.file_id ?? cached.fileId;
const emoji = sticker.emoji ?? cached.emoji;
const setName = sticker.set_name ?? cached.setName;
if (fileId !== cached.fileId || emoji !== cached.emoji || setName !== cached.setName) {
// Refresh cached sticker metadata on hits so sends/searches use latest file_id.
cacheSticker({
...cached,
fileId,
emoji,
setName,
});
}
return {
path: saved.path,
contentType: saved.contentType,
placeholder: "<media:sticker>",
stickerMetadata: {
emoji,
setName,
fileId,
fileUniqueId: sticker.file_unique_id,
cachedDescription: cached.description,
},
};
}
// Cache miss - return metadata for vision processing
return {
path: saved.path,
contentType: saved.contentType,
placeholder: "<media:sticker>",
stickerMetadata: {
emoji: sticker.emoji ?? undefined,
setName: sticker.set_name ?? undefined,
fileId: sticker.file_id,
fileUniqueId: sticker.file_unique_id,
},
};
} catch (err) {
logVerbose(`telegram: failed to process sticker: ${String(err)}`);
return null;
}
}
export async function resolveMedia(
ctx: TelegramContext,
maxBytes: number,
token: string,
proxyFetch?: typeof fetch,
): Promise<{
path: string;
contentType?: string;
placeholder: string;
stickerMetadata?: StickerMetadata;
} | null> {
const msg = ctx.message;
const stickerResolved = await resolveStickerMedia({
msg,
ctx,
maxBytes,
token,
proxyFetch,
});
if (stickerResolved !== undefined) {
return stickerResolved;
}
const m = resolveMediaFileRef(msg);
if (!m?.file_id) {
return null;
}
const file = await resolveTelegramFileWithRetry(ctx);
if (!file) {
return null;
}
if (!file.file_path) {
throw new Error("Telegram getFile returned no file_path");
}
const saved = await downloadAndSaveTelegramFile({
filePath: file.file_path,
token,
fetchImpl: resolveRequiredFetchImpl(proxyFetch),
maxBytes,
telegramFileName: resolveTelegramFileName(msg),
});
const placeholder = resolveTelegramMediaPlaceholder(msg) ?? "<media:document>";
return { path: saved.path, contentType: saved.contentType, placeholder };
}

View File

@@ -1,11 +1,13 @@
import { describe, expect, it } from "vitest";
import {
buildModelSelectionCallbackData,
buildModelsKeyboard,
buildProviderKeyboard,
buildBrowseProvidersButton,
buildProviderKeyboard,
calculateTotalPages,
getModelsPageSize,
parseModelCallbackData,
resolveModelSelection,
type ProviderInfo,
} from "./model-buttons.js";
@@ -52,6 +54,79 @@ describe("parseModelCallbackData", () => {
});
});
describe("resolveModelSelection", () => {
it("returns explicit provider selections unchanged", () => {
const result = resolveModelSelection({
callback: { type: "select", provider: "openai", model: "gpt-4.1" },
providers: ["openai", "anthropic"],
byProvider: new Map([
["openai", new Set(["gpt-4.1"])],
["anthropic", new Set(["claude-sonnet-4-5"])],
]),
});
expect(result).toEqual({ kind: "resolved", provider: "openai", model: "gpt-4.1" });
});
it("resolves compact callbacks when exactly one provider matches", () => {
const result = resolveModelSelection({
callback: { type: "select", model: "shared" },
providers: ["openai", "anthropic"],
byProvider: new Map([
["openai", new Set(["shared"])],
["anthropic", new Set(["other"])],
]),
});
expect(result).toEqual({ kind: "resolved", provider: "openai", model: "shared" });
});
it("returns ambiguous result when zero or multiple providers match", () => {
const sharedByBoth = resolveModelSelection({
callback: { type: "select", model: "shared" },
providers: ["openai", "anthropic"],
byProvider: new Map([
["openai", new Set(["shared"])],
["anthropic", new Set(["shared"])],
]),
});
expect(sharedByBoth).toEqual({
kind: "ambiguous",
model: "shared",
matchingProviders: ["openai", "anthropic"],
});
const missingEverywhere = resolveModelSelection({
callback: { type: "select", model: "missing" },
providers: ["openai", "anthropic"],
byProvider: new Map([
["openai", new Set(["gpt-4.1"])],
["anthropic", new Set(["claude-sonnet-4-5"])],
]),
});
expect(missingEverywhere).toEqual({
kind: "ambiguous",
model: "missing",
matchingProviders: [],
});
});
});
describe("buildModelSelectionCallbackData", () => {
it("uses standard callback when under limit and compact callback when needed", () => {
expect(buildModelSelectionCallbackData({ provider: "openai", model: "gpt-4.1" })).toBe(
"mdl_sel_openai/gpt-4.1",
);
const longModel = "us.anthropic.claude-3-5-sonnet-20240620-v1:0";
expect(buildModelSelectionCallbackData({ provider: "amazon-bedrock", model: longModel })).toBe(
`mdl_sel/${longModel}`,
);
});
it("returns null when even compact callback exceeds Telegram limit", () => {
const tooLongModel = "x".repeat(80);
expect(buildModelSelectionCallbackData({ provider: "openai", model: tooLongModel })).toBeNull();
});
});
describe("buildProviderKeyboard", () => {
it("lays out providers in two-column rows", () => {
const cases = [

View File

@@ -22,6 +22,10 @@ export type ProviderInfo = {
count: number;
};
export type ResolveModelSelectionResult =
| { kind: "resolved"; provider: string; model: string }
| { kind: "ambiguous"; model: string; matchingProviders: string[] };
export type ModelsKeyboardParams = {
provider: string;
models: readonly string[];
@@ -33,6 +37,13 @@ export type ModelsKeyboardParams = {
const MODELS_PAGE_SIZE = 8;
const MAX_CALLBACK_DATA_BYTES = 64;
const CALLBACK_PREFIX = {
providers: "mdl_prov",
back: "mdl_back",
list: "mdl_list_",
selectStandard: "mdl_sel_",
selectCompact: "mdl_sel/",
} as const;
/**
* Parse a model callback_data string into a structured object.
@@ -44,8 +55,8 @@ export function parseModelCallbackData(data: string): ParsedModelCallback | null
return null;
}
if (trimmed === "mdl_prov" || trimmed === "mdl_back") {
return { type: trimmed === "mdl_prov" ? "providers" : "back" };
if (trimmed === CALLBACK_PREFIX.providers || trimmed === CALLBACK_PREFIX.back) {
return { type: trimmed === CALLBACK_PREFIX.providers ? "providers" : "back" };
}
// mdl_list_{provider}_{page}
@@ -89,6 +100,49 @@ export function parseModelCallbackData(data: string): ParsedModelCallback | null
return null;
}
export function buildModelSelectionCallbackData(params: {
provider: string;
model: string;
}): string | null {
const fullCallbackData = `${CALLBACK_PREFIX.selectStandard}${params.provider}/${params.model}`;
if (Buffer.byteLength(fullCallbackData, "utf8") <= MAX_CALLBACK_DATA_BYTES) {
return fullCallbackData;
}
const compactCallbackData = `${CALLBACK_PREFIX.selectCompact}${params.model}`;
return Buffer.byteLength(compactCallbackData, "utf8") <= MAX_CALLBACK_DATA_BYTES
? compactCallbackData
: null;
}
export function resolveModelSelection(params: {
callback: Extract<ParsedModelCallback, { type: "select" }>;
providers: readonly string[];
byProvider: ReadonlyMap<string, ReadonlySet<string>>;
}): ResolveModelSelectionResult {
if (params.callback.provider) {
return {
kind: "resolved",
provider: params.callback.provider,
model: params.callback.model,
};
}
const matchingProviders = params.providers.filter((id) =>
params.byProvider.get(id)?.has(params.callback.model),
);
if (matchingProviders.length === 1) {
return {
kind: "resolved",
provider: matchingProviders[0],
model: params.callback.model,
};
}
return {
kind: "ambiguous",
model: params.callback.model,
matchingProviders,
};
}
/**
* Build provider selection keyboard with 2 providers per row.
*/
@@ -130,7 +184,7 @@ export function buildModelsKeyboard(params: ModelsKeyboardParams): ButtonRow[] {
const pageSize = params.pageSize ?? MODELS_PAGE_SIZE;
if (models.length === 0) {
return [[{ text: "<< Back", callback_data: "mdl_back" }]];
return [[{ text: "<< Back", callback_data: CALLBACK_PREFIX.back }]];
}
const rows: ButtonRow[] = [];
@@ -146,13 +200,9 @@ export function buildModelsKeyboard(params: ModelsKeyboardParams): ButtonRow[] {
: currentModel;
for (const model of pageModels) {
const fullCallbackData = `mdl_sel_${provider}/${model}`;
const callbackData =
Buffer.byteLength(fullCallbackData, "utf8") <= MAX_CALLBACK_DATA_BYTES
? fullCallbackData
: `mdl_sel/${model}`;
// Skip models that still exceed Telegram's callback_data limit
if (Buffer.byteLength(callbackData, "utf8") > MAX_CALLBACK_DATA_BYTES) {
const callbackData = buildModelSelectionCallbackData({ provider, model });
// Skip models that still exceed Telegram's callback_data limit.
if (!callbackData) {
continue;
}
@@ -175,19 +225,19 @@ export function buildModelsKeyboard(params: ModelsKeyboardParams): ButtonRow[] {
if (currentPage > 1) {
paginationRow.push({
text: "◀ Prev",
callback_data: `mdl_list_${provider}_${currentPage - 1}`,
callback_data: `${CALLBACK_PREFIX.list}${provider}_${currentPage - 1}`,
});
}
paginationRow.push({
text: `${currentPage}/${totalPages}`,
callback_data: `mdl_list_${provider}_${currentPage}`, // noop
callback_data: `${CALLBACK_PREFIX.list}${provider}_${currentPage}`, // noop
});
if (currentPage < totalPages) {
paginationRow.push({
text: "Next ▶",
callback_data: `mdl_list_${provider}_${currentPage + 1}`,
callback_data: `${CALLBACK_PREFIX.list}${provider}_${currentPage + 1}`,
});
}
@@ -195,7 +245,7 @@ export function buildModelsKeyboard(params: ModelsKeyboardParams): ButtonRow[] {
}
// Back button
rows.push([{ text: "<< Back", callback_data: "mdl_back" }]);
rows.push([{ text: "<< Back", callback_data: CALLBACK_PREFIX.back }]);
return rows;
}
@@ -204,7 +254,7 @@ export function buildModelsKeyboard(params: ModelsKeyboardParams): ButtonRow[] {
* Build "Browse providers" button for /model summary.
*/
export function buildBrowseProvidersButton(): ButtonRow[] {
return [[{ text: "Browse providers", callback_data: "mdl_prov" }]];
return [[{ text: "Browse providers", callback_data: CALLBACK_PREFIX.providers }]];
}
/**

View File

@@ -10,6 +10,11 @@
"rootDir": "src",
"tsBuildInfoFile": "dist/plugin-sdk/.tsbuildinfo"
},
"include": ["src/plugin-sdk/index.ts", "src/plugin-sdk/account-id.ts", "src/types/**/*.d.ts"],
"include": [
"src/plugin-sdk/index.ts",
"src/plugin-sdk/account-id.ts",
"src/plugin-sdk/keyed-async-queue.ts",
"src/types/**/*.d.ts"
],
"exclude": ["node_modules", "dist", "src/**/*.test.ts"]
}

View File

@@ -17,6 +17,10 @@ export default defineConfig({
find: "openclaw/plugin-sdk/account-id",
replacement: path.join(repoRoot, "src", "plugin-sdk", "account-id.ts"),
},
{
find: "openclaw/plugin-sdk/keyed-async-queue",
replacement: path.join(repoRoot, "src", "plugin-sdk", "keyed-async-queue.ts"),
},
{
find: "openclaw/plugin-sdk",
replacement: path.join(repoRoot, "src", "plugin-sdk", "index.ts"),