From 2c14b0cf4cb5472e9408752c69fb202415502c4d Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 21 Feb 2026 19:53:23 +0100 Subject: [PATCH] refactor(config): unify streaming config across channels --- CHANGELOG.md | 4 + docs/channels/discord.md | 10 +- docs/channels/grammy.md | 2 +- docs/channels/slack.md | 21 +- docs/channels/telegram.md | 7 +- docs/concepts/streaming.md | 78 +++--- docs/gateway/configuration-reference.md | 7 +- src/commands/doctor-config-flow.e2e.test.ts | 36 +++ src/commands/doctor-legacy-config.e2e.test.ts | 77 ++++++ src/commands/doctor-legacy-config.ts | 224 ++++++++++++++++-- ...tion.rejects-routing-allowfrom.e2e.test.ts | 117 ++++++++- src/config/legacy.migrations.part-1.ts | 115 +++++++++ src/config/schema.help.ts | 16 +- src/config/schema.labels.ts | 9 +- src/config/types.discord.ts | 22 +- src/config/types.slack.ts | 23 +- src/config/types.telegram.ts | 15 +- src/config/zod-schema.providers-core.ts | 55 +++-- .../monitor/message-handler.process.test.ts | 22 ++ .../monitor/message-handler.process.ts | 3 +- .../dispatch.streaming.test.ts | 12 +- src/slack/monitor/message-handler/dispatch.ts | 76 +++--- src/slack/stream-mode.test.ts | 43 ++++ src/slack/stream-mode.ts | 24 +- src/telegram/bot.helpers.test.ts | 6 +- src/telegram/bot/helpers.ts | 17 +- 26 files changed, 885 insertions(+), 156 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bd90d3b7bc3..395d6d180f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,10 @@ Docs: https://docs.openclaw.ai - Dependencies/Unused Dependencies: remove or scope unused root and extension deps (`@larksuiteoapi/node-sdk`, `signal-utils`, `ollama`, `lit`, `@lit/context`, `@lit-labs/signals`, `@microsoft/agents-hosting-express`, `@microsoft/agents-hosting-extensions-teams`, and plugin-local `openclaw` devDeps in `extensions/open-prose`, `extensions/lobster`, and `extensions/llm-task`). (#22471, #22495) Thanks @vincentkoc. - Dependencies/A2UI: harden dependency resolution after root cleanup (resolve `lit`, `@lit/context`, `@lit-labs/signals`, and `signal-utils` from workspace/root) and simplify bundling fallback behavior, including `pnpm dlx rolldown` compatibility. (#22481, #22507) Thanks @vincentkoc. +### Breaking + +- **BREAKING:** unify channel preview-streaming config to `channels..streaming` with enum values `off | partial | block | progress`, and move Slack native stream toggle to `channels.slack.nativeStreaming`. Legacy keys (`streamMode`, Slack boolean `streaming`) are still read and migrated by `openclaw doctor --fix`, but canonical saved config/docs now use the unified names. + ### Fixes - Chat/Usage/TUI: strip synthetic inbound metadata blocks (including `Conversation info` and trailing `Untrusted context` channel metadata wrappers) from displayed conversation history so internal prompt context no longer leaks into user-visible logs. diff --git a/docs/channels/discord.md b/docs/channels/discord.md index adafd6042d1..5f789a382a6 100644 --- a/docs/channels/discord.md +++ b/docs/channels/discord.md @@ -563,7 +563,9 @@ Default slash command settings: OpenClaw can stream draft replies by sending a temporary message and editing it as text arrives. - - `channels.discord.streamMode` controls preview streaming (`off` | `partial` | `block`, default: `off`). + - `channels.discord.streaming` controls preview streaming (`off` | `partial` | `block` | `progress`, default: `off`). + - `progress` is accepted for cross-channel consistency and maps to `partial` on Discord. + - `channels.discord.streamMode` is a legacy alias and is auto-migrated. - `partial` edits a single preview message as tokens arrive. - `block` emits draft-sized chunks (use `draftChunk` to tune size and breakpoints). @@ -573,7 +575,7 @@ Default slash command settings: { channels: { discord: { - streamMode: "partial", + streaming: "partial", }, }, } @@ -585,7 +587,7 @@ Default slash command settings: { channels: { discord: { - streamMode: "block", + streaming: "block", draftChunk: { minChars: 200, maxChars: 800, @@ -977,7 +979,7 @@ High-signal Discord fields: - command: `commands.native`, `commands.useAccessGroups`, `configWrites`, `slashCommand.*` - reply/history: `replyToMode`, `historyLimit`, `dmHistoryLimit`, `dms.*.historyLimit` - delivery: `textChunkLimit`, `chunkMode`, `maxLinesPerMessage` -- streaming: `streamMode`, `draftChunk`, `blockStreaming`, `blockStreamingCoalesce` +- streaming: `streaming` (legacy alias: `streamMode`), `draftChunk`, `blockStreaming`, `blockStreamingCoalesce` - media/retry: `mediaMaxMb`, `retry` - actions: `actions.*` - presence: `activity`, `status`, `activityType`, `activityUrl` diff --git a/docs/channels/grammy.md b/docs/channels/grammy.md index 570acabfb1c..25c197116f6 100644 --- a/docs/channels/grammy.md +++ b/docs/channels/grammy.md @@ -21,7 +21,7 @@ title: grammY - **Webhook support:** `webhook-set.ts` wraps `setWebhook/deleteWebhook`; `webhook.ts` hosts the callback with health + graceful shutdown. Gateway enables webhook mode when `channels.telegram.webhookUrl` + `channels.telegram.webhookSecret` are set (otherwise it long-polls). - **Sessions:** direct chats collapse into the agent main session (`agent::`); groups use `agent::telegram:group:`; replies route back to the same channel. - **Config knobs:** `channels.telegram.botToken`, `channels.telegram.dmPolicy`, `channels.telegram.groups` (allowlist + mention defaults), `channels.telegram.allowFrom`, `channels.telegram.groupAllowFrom`, `channels.telegram.groupPolicy`, `channels.telegram.mediaMaxMb`, `channels.telegram.linkPreview`, `channels.telegram.proxy`, `channels.telegram.webhookSecret`, `channels.telegram.webhookUrl`, `channels.telegram.webhookHost`. -- **Live stream preview:** optional `channels.telegram.streaming` sends a temporary message and updates it with `editMessageText`. This is separate from channel block streaming. +- **Live stream preview:** `channels.telegram.streaming` (`off | partial | block | progress`) sends a temporary message and updates it with `editMessageText`. This is separate from channel block streaming. - **Tests:** grammy mocks cover DM + group mention gating and outbound send; more media/webhook fixtures still welcome. Open questions diff --git a/docs/channels/slack.md b/docs/channels/slack.md index 9fdd3fb89a2..0d0bba3cb27 100644 --- a/docs/channels/slack.md +++ b/docs/channels/slack.md @@ -465,14 +465,29 @@ openclaw pairing list slack OpenClaw supports Slack native text streaming via the Agents and AI Apps API. -By default, streaming is enabled. Disable it per account: +`channels.slack.streaming` controls live preview behavior: + +- `off`: disable live preview streaming. +- `partial` (default): replace preview text with the latest partial output. +- `block`: append chunked preview updates. +- `progress`: show progress status text while generating, then send final text. + +`channels.slack.nativeStreaming` controls Slack's native streaming API (`chat.startStream` / `chat.appendStream` / `chat.stopStream`) when `streaming` is `partial` (default: `true`). + +Disable native Slack streaming (keep draft preview behavior): ```yaml channels: slack: - streaming: false + streaming: partial + nativeStreaming: false ``` +Legacy keys: + +- `channels.slack.streamMode` (`replace | status_final | append`) is auto-migrated to `channels.slack.streaming`. +- boolean `channels.slack.streaming` is auto-migrated to `channels.slack.nativeStreaming`. + ### Requirements 1. Enable **Agents and AI Apps** in your Slack app settings. @@ -498,7 +513,7 @@ Primary reference: - DM access: `dm.enabled`, `dmPolicy`, `allowFrom` (legacy: `dm.policy`, `dm.allowFrom`), `dm.groupEnabled`, `dm.groupChannels` - channel access: `groupPolicy`, `channels.*`, `channels.*.users`, `channels.*.requireMention` - threading/history: `replyToMode`, `replyToModeByChatType`, `thread.*`, `historyLimit`, `dmHistoryLimit`, `dms.*.historyLimit` - - delivery: `textChunkLimit`, `chunkMode`, `mediaMaxMb` + - delivery: `textChunkLimit`, `chunkMode`, `mediaMaxMb`, `streaming`, `nativeStreaming` - ops/features: `configWrites`, `commands.native`, `slashCommand.*`, `actions.*`, `userToken`, `userTokenReadOnly` ## Related diff --git a/docs/channels/telegram.md b/docs/channels/telegram.md index 5517ab20efb..8676bce4e97 100644 --- a/docs/channels/telegram.md +++ b/docs/channels/telegram.md @@ -226,8 +226,9 @@ curl "https://api.telegram.org/bot/getUpdates" Requirement: - - `channels.telegram.streaming` is `true` (default) - - legacy `channels.telegram.streamMode` values are auto-mapped to `streaming` + - `channels.telegram.streaming` is `off | partial | block | progress` (default: `off`) + - `progress` maps to `partial` on Telegram (compat with cross-channel naming) + - legacy `channels.telegram.streamMode` and boolean `streaming` values are auto-mapped This works in direct chats and groups/topics. @@ -708,7 +709,7 @@ Primary reference: - `channels.telegram.textChunkLimit`: outbound chunk size (chars). - `channels.telegram.chunkMode`: `length` (default) or `newline` to split on blank lines (paragraph boundaries) before length chunking. - `channels.telegram.linkPreview`: toggle link previews for outbound messages (default: true). -- `channels.telegram.streaming`: `true | false` (live stream preview; default: true). +- `channels.telegram.streaming`: `off | partial | block | progress` (live stream preview; default: `off`; `progress` maps to `partial`). - `channels.telegram.mediaMaxMb`: inbound/outbound media cap (MB). - `channels.telegram.retry`: retry policy for outbound Telegram API calls (attempts, minDelayMs, maxDelayMs, jitter). - `channels.telegram.network.autoSelectFamily`: override Node autoSelectFamily (true=enable, false=disable). Defaults to disabled on Node 22 to avoid Happy Eyeballs timeouts. diff --git a/docs/concepts/streaming.md b/docs/concepts/streaming.md index 1ac8da84ce7..310759deee9 100644 --- a/docs/concepts/streaming.md +++ b/docs/concepts/streaming.md @@ -1,20 +1,20 @@ --- -summary: "Streaming + chunking behavior (block replies, Telegram preview streaming, limits)" +summary: "Streaming + chunking behavior (block replies, channel preview streaming, mode mapping)" read_when: - Explaining how streaming or chunking works on channels - Changing block streaming or channel chunking behavior - - Debugging duplicate/early block replies or Telegram preview streaming + - Debugging duplicate/early block replies or channel preview streaming title: "Streaming and Chunking" --- # Streaming + chunking -OpenClaw has two separate “streaming” layers: +OpenClaw has two separate streaming layers: - **Block streaming (channels):** emit completed **blocks** as the assistant writes. These are normal channel messages (not token deltas). -- **Token-ish streaming (Telegram only):** update a temporary **preview message** with partial text while generating. +- **Preview streaming (Telegram/Discord/Slack):** update a temporary **preview message** while generating. -There is **no true token-delta streaming** to channel messages today. Telegram preview streaming is the only partial-stream surface. +There is **no true token-delta streaming** to channel messages today. Preview streaming is message-based (send + edits/appends). ## Block streaming (channel messages) @@ -98,34 +98,58 @@ This maps to: - **Stream everything at end:** `blockStreamingBreak: "message_end"` (flush once, possibly multiple chunks if very long). - **No block streaming:** `blockStreamingDefault: "off"` (only final reply). -**Channel note:** For non-Telegram channels, block streaming is **off unless** -`*.blockStreaming` is explicitly set to `true`. Telegram can stream a live preview -(`channels.telegram.streaming`) without block replies. +**Channel note:** Block streaming is **off unless** +`*.blockStreaming` is explicitly set to `true`. Channels can stream a live preview +(`channels..streaming`) without block replies. Config location reminder: the `blockStreaming*` defaults live under `agents.defaults`, not the root config. -## Telegram preview streaming (token-ish) +## Preview streaming modes -Telegram is the only channel with live preview streaming: +Canonical key: `channels..streaming` -- Uses Bot API `sendMessage` (first update) + `editMessageText` (subsequent updates). -- `channels.telegram.streaming: true | false` (default: `true`). -- Preview streaming is separate from block streaming. -- When Telegram block streaming is explicitly enabled, preview streaming is skipped to avoid double-streaming. -- Text-only finals are applied by editing the preview message in place. -- Non-text/complex finals fall back to normal final message delivery. -- `/reasoning stream` writes reasoning into the live preview (Telegram only). +Modes: -``` -Telegram - └─ sendMessage (temporary preview message) - └─ streaming=true → edit latest text - └─ final text-only reply → final edit on same message - └─ fallback: cleanup preview + normal final delivery (media/complex) -``` +- `off`: disable preview streaming. +- `partial`: single preview that is replaced with latest text. +- `block`: preview updates in chunked/appended steps. +- `progress`: progress/status preview during generation, final answer at completion. -Legend: +### Channel mapping -- `preview message`: temporary Telegram message updated during generation. -- `final edit`: in-place edit on the same preview message (text-only). +| Channel | `off` | `partial` | `block` | `progress` | +| -------- | ----- | --------- | ------- | ----------------- | +| Telegram | ✅ | ✅ | ✅ | maps to `partial` | +| Discord | ✅ | ✅ | ✅ | maps to `partial` | +| Slack | ✅ | ✅ | ✅ | ✅ | + +Slack-only: + +- `channels.slack.nativeStreaming` toggles Slack native streaming API calls when `streaming=partial` (default: `true`). + +Legacy key migration: + +- Telegram: `streamMode` + boolean `streaming` auto-migrate to `streaming` enum. +- Discord: `streamMode` + boolean `streaming` auto-migrate to `streaming` enum. +- Slack: `streamMode` auto-migrates to `streaming` enum; boolean `streaming` auto-migrates to `nativeStreaming`. + +### Runtime behavior + +Telegram: + +- Uses Bot API `sendMessage` + `editMessageText`. +- Preview streaming is skipped when Telegram block streaming is explicitly enabled (to avoid double-streaming). +- `/reasoning stream` can write reasoning to preview. + +Discord: + +- Uses send + edit preview messages. +- `block` mode uses draft chunking (`draftChunk`). +- Preview streaming is skipped when Discord block streaming is explicitly enabled. + +Slack: + +- `partial` can use Slack native streaming (`chat.startStream`/`append`/`stop`) when available. +- `block` uses append-style draft previews. +- `progress` uses status preview text, then final answer. diff --git a/docs/gateway/configuration-reference.md b/docs/gateway/configuration-reference.md index 3e2417971bb..3f25baf6380 100644 --- a/docs/gateway/configuration-reference.md +++ b/docs/gateway/configuration-reference.md @@ -151,7 +151,7 @@ WhatsApp runs through the gateway's web channel (Baileys Web). It starts automat historyLimit: 50, replyToMode: "first", // off | first | all linkPreview: true, - streaming: true, // live preview on/off (default true) + streaming: "partial", // off | partial | block | progress (default: off) actions: { reactions: true, sendMessage: true }, reactionNotifications: "own", // off | own | all mediaMaxMb: 5, @@ -228,6 +228,7 @@ WhatsApp runs through the gateway's web channel (Baileys Web). It starts automat historyLimit: 20, textChunkLimit: 2000, chunkMode: "length", // length | newline + streaming: "off", // off | partial | block | progress (progress maps to partial on Discord) maxLinesPerMessage: 17, ui: { components: { @@ -265,6 +266,7 @@ WhatsApp runs through the gateway's web channel (Baileys Web). It starts automat - `maxLinesPerMessage` (default 17) splits tall messages even when under 2000 chars. - `channels.discord.ui.components.accentColor` sets the accent color for Discord components v2 containers. - `channels.discord.voice` enables Discord voice channel conversations and optional auto-join + TTS overrides. +- `channels.discord.streaming` is the canonical stream mode key. Legacy `streamMode` and boolean `streaming` values are auto-migrated. **Reaction notification modes:** `off` (none), `own` (bot's messages, default), `all` (all messages), `allowlist` (from `guilds..users` on all messages). @@ -348,6 +350,8 @@ WhatsApp runs through the gateway's web channel (Baileys Web). It starts automat }, textChunkLimit: 4000, chunkMode: "length", + streaming: "partial", // off | partial | block | progress (preview mode) + nativeStreaming: true, // use Slack native streaming API when streaming=partial mediaMaxMb: 20, }, }, @@ -357,6 +361,7 @@ WhatsApp runs through the gateway's web channel (Baileys Web). It starts automat - **Socket mode** requires both `botToken` and `appToken` (`SLACK_BOT_TOKEN` + `SLACK_APP_TOKEN` for default account env fallback). - **HTTP mode** requires `botToken` plus `signingSecret` (at root or per-account). - `configWrites: false` blocks Slack-initiated config writes. +- `channels.slack.streaming` is the canonical stream mode key. Legacy `streamMode` and boolean `streaming` values are auto-migrated. - Use `user:` (DM) or `channel:` for delivery targets. **Reaction notification modes:** `off`, `own` (default), `all`, `allowlist` (from `reactionAllowlist`). diff --git a/src/commands/doctor-config-flow.e2e.test.ts b/src/commands/doctor-config-flow.e2e.test.ts index c60a3bfa626..f1d8bf307a4 100644 --- a/src/commands/doctor-config-flow.e2e.test.ts +++ b/src/commands/doctor-config-flow.e2e.test.ts @@ -68,6 +68,42 @@ describe("doctor config flow", () => { }); }); + it("preserves discord streaming intent while stripping unsupported keys on repair", async () => { + const result = await runDoctorConfigWithInput({ + repair: true, + config: { + channels: { + discord: { + streaming: true, + lifecycle: { + enabled: true, + reactions: { + queued: "⏳", + thinking: "🧠", + tool: "🔧", + done: "✅", + error: "❌", + }, + }, + }, + }, + }, + }); + + const cfg = result.cfg as { + channels: { + discord: { + streamMode?: string; + streaming?: string; + lifecycle?: unknown; + }; + }; + }; + expect(cfg.channels.discord.streaming).toBe("partial"); + expect(cfg.channels.discord.streamMode).toBeUndefined(); + expect(cfg.channels.discord.lifecycle).toBeUndefined(); + }); + it("resolves Telegram @username allowFrom entries to numeric IDs on repair", async () => { const fetchSpy = vi.fn(async (url: string) => { const u = String(url); diff --git a/src/commands/doctor-legacy-config.e2e.test.ts b/src/commands/doctor-legacy-config.e2e.test.ts index 43b097cecce..2a188e2d657 100644 --- a/src/commands/doctor-legacy-config.e2e.test.ts +++ b/src/commands/doctor-legacy-config.e2e.test.ts @@ -145,4 +145,81 @@ describe("normalizeLegacyConfigValues", () => { "Moved channels.discord.accounts.work.dm.allowFrom → channels.discord.accounts.work.allowFrom.", ]); }); + + it("migrates Discord streaming boolean alias to streaming enum", () => { + const res = normalizeLegacyConfigValues({ + channels: { + discord: { + streaming: true, + accounts: { + work: { + streaming: false, + }, + }, + }, + }, + }); + + expect(res.config.channels?.discord?.streaming).toBe("partial"); + expect(res.config.channels?.discord?.streamMode).toBeUndefined(); + expect(res.config.channels?.discord?.accounts?.work?.streaming).toBe("off"); + expect(res.config.channels?.discord?.accounts?.work?.streamMode).toBeUndefined(); + expect(res.changes).toEqual([ + "Normalized channels.discord.streaming boolean → enum (partial).", + "Normalized channels.discord.accounts.work.streaming boolean → enum (off).", + ]); + }); + + it("migrates Discord legacy streamMode into streaming enum", () => { + const res = normalizeLegacyConfigValues({ + channels: { + discord: { + streaming: false, + streamMode: "block", + }, + }, + }); + + expect(res.config.channels?.discord?.streaming).toBe("block"); + expect(res.config.channels?.discord?.streamMode).toBeUndefined(); + expect(res.changes).toEqual([ + "Moved channels.discord.streamMode → channels.discord.streaming (block).", + "Normalized channels.discord.streaming boolean → enum (block).", + ]); + }); + + it("migrates Telegram streamMode into streaming enum", () => { + const res = normalizeLegacyConfigValues({ + channels: { + telegram: { + streamMode: "block", + }, + }, + }); + + expect(res.config.channels?.telegram?.streaming).toBe("block"); + expect(res.config.channels?.telegram?.streamMode).toBeUndefined(); + expect(res.changes).toEqual([ + "Moved channels.telegram.streamMode → channels.telegram.streaming (block).", + ]); + }); + + it("migrates Slack legacy streaming keys to unified config", () => { + const res = normalizeLegacyConfigValues({ + channels: { + slack: { + streaming: false, + streamMode: "status_final", + }, + }, + }); + + expect(res.config.channels?.slack?.streaming).toBe("progress"); + expect(res.config.channels?.slack?.nativeStreaming).toBe(false); + expect(res.config.channels?.slack?.streamMode).toBeUndefined(); + expect(res.changes).toEqual([ + "Moved channels.slack.streamMode → channels.slack.streaming (progress).", + "Moved channels.slack.streaming (boolean) → channels.slack.nativeStreaming (false).", + ]); + }); }); diff --git a/src/commands/doctor-legacy-config.ts b/src/commands/doctor-legacy-config.ts index 58ffb196fd3..91c1d5eaaba 100644 --- a/src/commands/doctor-legacy-config.ts +++ b/src/commands/doctor-legacy-config.ts @@ -1,4 +1,11 @@ import type { OpenClawConfig } from "../config/config.js"; +import { + resolveDiscordPreviewStreamMode, + resolveSlackNativeStreaming, + resolveSlackStreamingMode, + resolveTelegramPreviewStreamMode, +} from "../config/discord-preview-streaming.js"; + export function normalizeLegacyConfigValues(cfg: OpenClawConfig): { config: OpenClawConfig; changes: string[]; @@ -90,20 +97,178 @@ export function normalizeLegacyConfigValues(cfg: OpenClawConfig): { return { entry: updated, changed }; }; - const normalizeProvider = (provider: "slack" | "discord") => { + const normalizeTelegramStreamingAliases = (params: { + entry: Record; + pathPrefix: string; + }): { entry: Record; changed: boolean } => { + let updated = params.entry; + const hadLegacyStreamMode = updated.streamMode !== undefined; + const beforeStreaming = updated.streaming; + const resolved = resolveTelegramPreviewStreamMode(updated); + const shouldNormalize = + hadLegacyStreamMode || + typeof beforeStreaming === "boolean" || + (typeof beforeStreaming === "string" && beforeStreaming !== resolved); + if (!shouldNormalize) { + return { entry: updated, changed: false }; + } + + let changed = false; + if (beforeStreaming !== resolved) { + updated = { ...updated, streaming: resolved }; + changed = true; + } + if (hadLegacyStreamMode) { + const { streamMode: _ignored, ...rest } = updated; + updated = rest; + changed = true; + changes.push( + `Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming (${resolved}).`, + ); + } + if (typeof beforeStreaming === "boolean") { + changes.push(`Normalized ${params.pathPrefix}.streaming boolean → enum (${resolved}).`); + } else if (typeof beforeStreaming === "string" && beforeStreaming !== resolved) { + changes.push( + `Normalized ${params.pathPrefix}.streaming (${beforeStreaming}) → (${resolved}).`, + ); + } + + return { entry: updated, changed }; + }; + + const normalizeDiscordStreamingAliases = (params: { + entry: Record; + pathPrefix: string; + }): { entry: Record; changed: boolean } => { + let updated = params.entry; + const hadLegacyStreamMode = updated.streamMode !== undefined; + const beforeStreaming = updated.streaming; + const resolved = resolveDiscordPreviewStreamMode(updated); + const shouldNormalize = + hadLegacyStreamMode || + typeof beforeStreaming === "boolean" || + (typeof beforeStreaming === "string" && beforeStreaming !== resolved); + if (!shouldNormalize) { + return { entry: updated, changed: false }; + } + + let changed = false; + if (beforeStreaming !== resolved) { + updated = { ...updated, streaming: resolved }; + changed = true; + } + if (hadLegacyStreamMode) { + const { streamMode: _ignored, ...rest } = updated; + updated = rest; + changed = true; + changes.push( + `Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming (${resolved}).`, + ); + } + if (typeof beforeStreaming === "boolean") { + changes.push(`Normalized ${params.pathPrefix}.streaming boolean → enum (${resolved}).`); + } else if (typeof beforeStreaming === "string" && beforeStreaming !== resolved) { + changes.push( + `Normalized ${params.pathPrefix}.streaming (${beforeStreaming}) → (${resolved}).`, + ); + } + + return { entry: updated, changed }; + }; + + const normalizeSlackStreamingAliases = (params: { + entry: Record; + pathPrefix: string; + }): { entry: Record; changed: boolean } => { + let updated = params.entry; + const hadLegacyStreamMode = updated.streamMode !== undefined; + const legacyStreaming = updated.streaming; + const beforeStreaming = updated.streaming; + const beforeNativeStreaming = updated.nativeStreaming; + const resolvedStreaming = resolveSlackStreamingMode(updated); + const resolvedNativeStreaming = resolveSlackNativeStreaming(updated); + const shouldNormalize = + hadLegacyStreamMode || + typeof legacyStreaming === "boolean" || + (typeof legacyStreaming === "string" && legacyStreaming !== resolvedStreaming); + if (!shouldNormalize) { + return { entry: updated, changed: false }; + } + + let changed = false; + if (beforeStreaming !== resolvedStreaming) { + updated = { ...updated, streaming: resolvedStreaming }; + changed = true; + } + if ( + typeof beforeNativeStreaming !== "boolean" || + beforeNativeStreaming !== resolvedNativeStreaming + ) { + updated = { ...updated, nativeStreaming: resolvedNativeStreaming }; + changed = true; + } + if (hadLegacyStreamMode) { + const { streamMode: _ignored, ...rest } = updated; + updated = rest; + changed = true; + changes.push( + `Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming (${resolvedStreaming}).`, + ); + } + if (typeof legacyStreaming === "boolean") { + changes.push( + `Moved ${params.pathPrefix}.streaming (boolean) → ${params.pathPrefix}.nativeStreaming (${resolvedNativeStreaming}).`, + ); + } else if (typeof legacyStreaming === "string" && legacyStreaming !== resolvedStreaming) { + changes.push( + `Normalized ${params.pathPrefix}.streaming (${legacyStreaming}) → (${resolvedStreaming}).`, + ); + } + + return { entry: updated, changed }; + }; + + const normalizeProvider = (provider: "telegram" | "slack" | "discord") => { const channels = next.channels as Record | undefined; const rawEntry = channels?.[provider]; if (!isRecord(rawEntry)) { return; } - const base = normalizeDmAliases({ - provider, - entry: rawEntry, - pathPrefix: `channels.${provider}`, - }); - let updated = base.entry; - let changed = base.changed; + let updated = rawEntry; + let changed = false; + if (provider !== "telegram") { + const base = normalizeDmAliases({ + provider, + entry: rawEntry, + pathPrefix: `channels.${provider}`, + }); + updated = base.entry; + changed = base.changed; + } + if (provider === "telegram") { + const streaming = normalizeTelegramStreamingAliases({ + entry: updated, + pathPrefix: `channels.${provider}`, + }); + updated = streaming.entry; + changed = changed || streaming.changed; + } else if (provider === "discord") { + const streaming = normalizeDiscordStreamingAliases({ + entry: updated, + pathPrefix: `channels.${provider}`, + }); + updated = streaming.entry; + changed = changed || streaming.changed; + } else if (provider === "slack") { + const streaming = normalizeSlackStreamingAliases({ + entry: updated, + pathPrefix: `channels.${provider}`, + }); + updated = streaming.entry; + changed = changed || streaming.changed; + } const rawAccounts = updated.accounts; if (isRecord(rawAccounts)) { @@ -113,13 +278,41 @@ export function normalizeLegacyConfigValues(cfg: OpenClawConfig): { if (!isRecord(rawAccount)) { continue; } - const res = normalizeDmAliases({ - provider, - entry: rawAccount, - pathPrefix: `channels.${provider}.accounts.${accountId}`, - }); - if (res.changed) { - accounts[accountId] = res.entry; + let accountEntry = rawAccount; + let accountChanged = false; + if (provider !== "telegram") { + const res = normalizeDmAliases({ + provider, + entry: rawAccount, + pathPrefix: `channels.${provider}.accounts.${accountId}`, + }); + accountEntry = res.entry; + accountChanged = res.changed; + } + if (provider === "telegram") { + const streaming = normalizeTelegramStreamingAliases({ + entry: accountEntry, + pathPrefix: `channels.${provider}.accounts.${accountId}`, + }); + accountEntry = streaming.entry; + accountChanged = accountChanged || streaming.changed; + } else if (provider === "discord") { + const streaming = normalizeDiscordStreamingAliases({ + entry: accountEntry, + pathPrefix: `channels.${provider}.accounts.${accountId}`, + }); + accountEntry = streaming.entry; + accountChanged = accountChanged || streaming.changed; + } else if (provider === "slack") { + const streaming = normalizeSlackStreamingAliases({ + entry: accountEntry, + pathPrefix: `channels.${provider}.accounts.${accountId}`, + }); + accountEntry = streaming.entry; + accountChanged = accountChanged || streaming.changed; + } + if (accountChanged) { + accounts[accountId] = accountEntry; accountsChanged = true; } } @@ -140,6 +333,7 @@ export function normalizeLegacyConfigValues(cfg: OpenClawConfig): { } }; + normalizeProvider("telegram"); normalizeProvider("slack"); normalizeProvider("discord"); diff --git a/src/config/config.legacy-config-detection.rejects-routing-allowfrom.e2e.test.ts b/src/config/config.legacy-config-detection.rejects-routing-allowfrom.e2e.test.ts index ac83e659af2..23997c4020d 100644 --- a/src/config/config.legacy-config-detection.rejects-routing-allowfrom.e2e.test.ts +++ b/src/config/config.legacy-config-detection.rejects-routing-allowfrom.e2e.test.ts @@ -378,27 +378,27 @@ describe("legacy config detection", () => { expect(res.config.channels?.telegram?.groupPolicy).toBe("allowlist"); } }); - it("defaults telegram.streaming to false when telegram section exists", async () => { + it("defaults telegram.streaming to off when telegram section exists", async () => { const res = validateConfigObject({ channels: { telegram: {} } }); expect(res.ok).toBe(true); if (res.ok) { - expect(res.config.channels?.telegram?.streaming).toBe(false); + expect(res.config.channels?.telegram?.streaming).toBe("off"); expect(res.config.channels?.telegram?.streamMode).toBeUndefined(); } }); - it("migrates legacy telegram.streamMode=off to streaming=false", async () => { + it("migrates legacy telegram.streamMode=off to streaming=off", async () => { const res = validateConfigObject({ channels: { telegram: { streamMode: "off" } } }); expect(res.ok).toBe(true); if (res.ok) { - expect(res.config.channels?.telegram?.streaming).toBe(false); + expect(res.config.channels?.telegram?.streaming).toBe("off"); expect(res.config.channels?.telegram?.streamMode).toBeUndefined(); } }); - it("migrates legacy telegram.streamMode=block to streaming=true", async () => { + it("migrates legacy telegram.streamMode=block to streaming=block", async () => { const res = validateConfigObject({ channels: { telegram: { streamMode: "block" } } }); expect(res.ok).toBe(true); if (res.ok) { - expect(res.config.channels?.telegram?.streaming).toBe(true); + expect(res.config.channels?.telegram?.streaming).toBe("block"); expect(res.config.channels?.telegram?.streamMode).toBeUndefined(); } }); @@ -416,10 +416,113 @@ describe("legacy config detection", () => { }); expect(res.ok).toBe(true); if (res.ok) { - expect(res.config.channels?.telegram?.accounts?.ops?.streaming).toBe(false); + expect(res.config.channels?.telegram?.accounts?.ops?.streaming).toBe("off"); expect(res.config.channels?.telegram?.accounts?.ops?.streamMode).toBeUndefined(); } }); + it("normalizes channels.discord.streaming booleans in legacy migration", async () => { + const res = migrateLegacyConfig({ + channels: { + discord: { + streaming: true, + }, + }, + }); + expect(res.changes).toContain( + "Normalized channels.discord.streaming boolean → enum (partial).", + ); + expect(res.config?.channels?.discord?.streaming).toBe("partial"); + expect(res.config?.channels?.discord?.streamMode).toBeUndefined(); + }); + it("migrates channels.discord.streamMode to channels.discord.streaming in legacy migration", async () => { + const res = migrateLegacyConfig({ + channels: { + discord: { + streaming: false, + streamMode: "block", + }, + }, + }); + expect(res.changes).toContain( + "Moved channels.discord.streamMode → channels.discord.streaming (block).", + ); + expect(res.changes).toContain("Normalized channels.discord.streaming boolean → enum (block)."); + expect(res.config?.channels?.discord?.streaming).toBe("block"); + expect(res.config?.channels?.discord?.streamMode).toBeUndefined(); + }); + it("migrates discord.streaming=true to streaming=partial", async () => { + const res = validateConfigObject({ channels: { discord: { streaming: true } } }); + expect(res.ok).toBe(true); + if (res.ok) { + expect(res.config.channels?.discord?.streaming).toBe("partial"); + expect(res.config.channels?.discord?.streamMode).toBeUndefined(); + } + }); + it("migrates discord.streaming=false to streaming=off", async () => { + const res = validateConfigObject({ channels: { discord: { streaming: false } } }); + expect(res.ok).toBe(true); + if (res.ok) { + expect(res.config.channels?.discord?.streaming).toBe("off"); + expect(res.config.channels?.discord?.streamMode).toBeUndefined(); + } + }); + it("keeps explicit discord.streamMode and normalizes to streaming", async () => { + const res = validateConfigObject({ + channels: { discord: { streamMode: "block", streaming: false } }, + }); + expect(res.ok).toBe(true); + if (res.ok) { + expect(res.config.channels?.discord?.streaming).toBe("block"); + expect(res.config.channels?.discord?.streamMode).toBeUndefined(); + } + }); + it("migrates discord.accounts.*.streaming alias to streaming enum", async () => { + const res = validateConfigObject({ + channels: { + discord: { + accounts: { + work: { + streaming: true, + }, + }, + }, + }, + }); + expect(res.ok).toBe(true); + if (res.ok) { + expect(res.config.channels?.discord?.accounts?.work?.streaming).toBe("partial"); + expect(res.config.channels?.discord?.accounts?.work?.streamMode).toBeUndefined(); + } + }); + it("migrates slack.streamMode values to slack.streaming enum", async () => { + const res = validateConfigObject({ + channels: { + slack: { + streamMode: "status_final", + }, + }, + }); + expect(res.ok).toBe(true); + if (res.ok) { + expect(res.config.channels?.slack?.streaming).toBe("progress"); + expect(res.config.channels?.slack?.streamMode).toBeUndefined(); + expect(res.config.channels?.slack?.nativeStreaming).toBe(true); + } + }); + it("migrates legacy slack.streaming boolean to nativeStreaming", async () => { + const res = validateConfigObject({ + channels: { + slack: { + streaming: false, + }, + }, + }); + expect(res.ok).toBe(true); + if (res.ok) { + expect(res.config.channels?.slack?.streaming).toBe("partial"); + expect(res.config.channels?.slack?.nativeStreaming).toBe(false); + } + }); it('rejects whatsapp.dmPolicy="open" without allowFrom "*"', async () => { const res = validateConfigObject({ channels: { diff --git a/src/config/legacy.migrations.part-1.ts b/src/config/legacy.migrations.part-1.ts index 2a988d3afe1..9c6d71287fc 100644 --- a/src/config/legacy.migrations.part-1.ts +++ b/src/config/legacy.migrations.part-1.ts @@ -1,3 +1,9 @@ +import { + resolveDiscordPreviewStreamMode, + resolveSlackNativeStreaming, + resolveSlackStreamingMode, + resolveTelegramPreviewStreamMode, +} from "./discord-preview-streaming.js"; import { ensureRecord, getRecord, @@ -206,6 +212,115 @@ export const LEGACY_CONFIG_MIGRATIONS_PART_1: LegacyConfigMigration[] = [ raw.channels = channels; }, }, + { + id: "channels.streaming-keys->channels.streaming", + describe: + "Normalize legacy streaming keys to channels..streaming (Telegram/Discord/Slack)", + apply: (raw, changes) => { + const channels = getRecord(raw.channels); + if (!channels) { + return; + } + + const migrateProviderEntry = (params: { + provider: "telegram" | "discord" | "slack"; + entry: Record; + pathPrefix: string; + }) => { + const hasLegacyStreamMode = params.entry.streamMode !== undefined; + const legacyStreaming = params.entry.streaming; + const legacyNativeStreaming = params.entry.nativeStreaming; + + if (params.provider === "telegram") { + if (!hasLegacyStreamMode && typeof legacyStreaming !== "boolean") { + return; + } + const resolved = resolveTelegramPreviewStreamMode(params.entry); + params.entry.streaming = resolved; + if (hasLegacyStreamMode) { + delete params.entry.streamMode; + changes.push( + `Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming (${resolved}).`, + ); + } + if (typeof legacyStreaming === "boolean") { + changes.push(`Normalized ${params.pathPrefix}.streaming boolean → enum (${resolved}).`); + } + return; + } + + if (params.provider === "discord") { + if (!hasLegacyStreamMode && typeof legacyStreaming !== "boolean") { + return; + } + const resolved = resolveDiscordPreviewStreamMode(params.entry); + params.entry.streaming = resolved; + if (hasLegacyStreamMode) { + delete params.entry.streamMode; + changes.push( + `Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming (${resolved}).`, + ); + } + if (typeof legacyStreaming === "boolean") { + changes.push(`Normalized ${params.pathPrefix}.streaming boolean → enum (${resolved}).`); + } + return; + } + + if (!hasLegacyStreamMode && typeof legacyStreaming !== "boolean") { + return; + } + const resolvedStreaming = resolveSlackStreamingMode(params.entry); + const resolvedNativeStreaming = resolveSlackNativeStreaming(params.entry); + params.entry.streaming = resolvedStreaming; + params.entry.nativeStreaming = resolvedNativeStreaming; + if (hasLegacyStreamMode) { + delete params.entry.streamMode; + changes.push( + `Moved ${params.pathPrefix}.streamMode → ${params.pathPrefix}.streaming (${resolvedStreaming}).`, + ); + } + if (typeof legacyStreaming === "boolean") { + changes.push( + `Moved ${params.pathPrefix}.streaming (boolean) → ${params.pathPrefix}.nativeStreaming (${resolvedNativeStreaming}).`, + ); + } else if (typeof legacyNativeStreaming !== "boolean" && hasLegacyStreamMode) { + changes.push(`Set ${params.pathPrefix}.nativeStreaming → ${resolvedNativeStreaming}.`); + } + }; + + const migrateProvider = (provider: "telegram" | "discord" | "slack") => { + const providerEntry = getRecord(channels[provider]); + if (!providerEntry) { + return; + } + migrateProviderEntry({ + provider, + entry: providerEntry, + pathPrefix: `channels.${provider}`, + }); + const accounts = getRecord(providerEntry.accounts); + if (!accounts) { + return; + } + for (const [accountId, accountValue] of Object.entries(accounts)) { + const account = getRecord(accountValue); + if (!account) { + continue; + } + migrateProviderEntry({ + provider, + entry: account, + pathPrefix: `channels.${provider}.accounts.${accountId}`, + }); + } + }; + + migrateProvider("telegram"); + migrateProvider("discord"); + migrateProvider("slack"); + }, + }, { id: "routing.allowFrom->channels.whatsapp.allowFrom", describe: "Move routing.allowFrom to channels.whatsapp.allowFrom", diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index f9bae5271d4..ea489ace793 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -379,8 +379,12 @@ export const FIELD_HELP: Record = { "channels.slack.commands.native": 'Override native commands for Slack (bool or "auto").', "channels.slack.commands.nativeSkills": 'Override native skill commands for Slack (bool or "auto").', + "channels.slack.streaming": + 'Unified Slack stream preview mode: "off" | "partial" | "block" | "progress". Legacy boolean/streamMode keys are auto-mapped.', + "channels.slack.nativeStreaming": + "Enable native Slack text streaming (chat.startStream/chat.appendStream/chat.stopStream) when channels.slack.streaming is partial (default: true).", "channels.slack.streamMode": - "Live stream preview mode for Slack replies (replace | status_final | append).", + "Legacy Slack preview mode alias (replace | status_final | append); auto-migrated to channels.slack.streaming.", "session.agentToAgent.maxPingPongTurns": "Max reply-back turns between requester and target (0–5).", "channels.telegram.customCommands": @@ -403,13 +407,15 @@ export const FIELD_HELP: Record = { "channels.telegram.dmPolicy": 'Direct message access control ("pairing" recommended). "open" requires channels.telegram.allowFrom=["*"].', "channels.telegram.streaming": - "Enable Telegram live stream preview via message edits (default: false; legacy streamMode auto-maps here).", + 'Unified Telegram stream preview mode: "off" | "partial" | "block" | "progress". "progress" maps to "partial" on Telegram. Legacy boolean/streamMode keys are auto-mapped.', + "channels.discord.streaming": + 'Unified Discord stream preview mode: "off" | "partial" | "block" | "progress". "progress" maps to "partial" on Discord. Legacy boolean/streamMode keys are auto-mapped.', "channels.discord.streamMode": - "Live stream preview mode for Discord replies (off | partial | block). Separate from block streaming; uses sendMessage + editMessage.", + "Legacy Discord preview mode alias (off | partial | block); auto-migrated to channels.discord.streaming.", "channels.discord.draftChunk.minChars": - 'Minimum chars before emitting a Discord stream preview update when channels.discord.streamMode="block" (default: 200).', + 'Minimum chars before emitting a Discord stream preview update when channels.discord.streaming="block" (default: 200).', "channels.discord.draftChunk.maxChars": - 'Target max size for a Discord stream preview chunk when channels.discord.streamMode="block" (default: 800; clamped to channels.discord.textChunkLimit).', + 'Target max size for a Discord stream preview chunk when channels.discord.streaming="block" (default: 800; clamped to channels.discord.textChunkLimit).', "channels.discord.draftChunk.breakPreference": "Preferred breakpoints for Discord draft chunks (paragraph | newline | sentence). Default: paragraph.", "channels.telegram.retry.attempts": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index 1a6d898ae05..1a7ab498e7d 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -265,7 +265,7 @@ export const FIELD_LABELS: Record = { ...IRC_FIELD_LABELS, "channels.telegram.botToken": "Telegram Bot Token", "channels.telegram.dmPolicy": "Telegram DM Policy", - "channels.telegram.streaming": "Telegram Streaming", + "channels.telegram.streaming": "Telegram Streaming Mode", "channels.telegram.retry.attempts": "Telegram Retry Attempts", "channels.telegram.retry.minDelayMs": "Telegram Retry Min Delay (ms)", "channels.telegram.retry.maxDelayMs": "Telegram Retry Max Delay (ms)", @@ -281,7 +281,8 @@ export const FIELD_LABELS: Record = { "channels.bluebubbles.dmPolicy": "BlueBubbles DM Policy", "channels.discord.dmPolicy": "Discord DM Policy", "channels.discord.dm.policy": "Discord DM Policy", - "channels.discord.streamMode": "Discord Stream Mode", + "channels.discord.streaming": "Discord Streaming Mode", + "channels.discord.streamMode": "Discord Stream Mode (Legacy)", "channels.discord.draftChunk.minChars": "Discord Draft Chunk Min Chars", "channels.discord.draftChunk.maxChars": "Discord Draft Chunk Max Chars", "channels.discord.draftChunk.breakPreference": "Discord Draft Chunk Break Preference", @@ -312,7 +313,9 @@ export const FIELD_LABELS: Record = { "channels.slack.appToken": "Slack App Token", "channels.slack.userToken": "Slack User Token", "channels.slack.userTokenReadOnly": "Slack User Token Read Only", - "channels.slack.streamMode": "Slack Stream Mode", + "channels.slack.streaming": "Slack Streaming Mode", + "channels.slack.nativeStreaming": "Slack Native Streaming", + "channels.slack.streamMode": "Slack Stream Mode (Legacy)", "channels.slack.thread.historyScope": "Slack Thread History Scope", "channels.slack.thread.inheritParent": "Slack Thread Parent Inheritance", "channels.slack.thread.initialHistoryLimit": "Slack Thread Initial History Limit", diff --git a/src/config/types.discord.ts b/src/config/types.discord.ts index 3b5fbf94b00..a5ef6c6465a 100644 --- a/src/config/types.discord.ts +++ b/src/config/types.discord.ts @@ -13,7 +13,7 @@ import type { DmConfig, ProviderCommandsConfig } from "./types.messages.js"; import type { GroupToolPolicyBySenderConfig, GroupToolPolicyConfig } from "./types.tools.js"; import type { TtsConfig } from "./types.tts.js"; -export type DiscordStreamMode = "partial" | "block" | "off"; +export type DiscordStreamMode = "off" | "partial" | "block" | "progress"; export type DiscordDmConfig = { /** If false, ignore all incoming Discord DMs. Default: true. */ @@ -198,14 +198,20 @@ export type DiscordAccountConfig = { /** Disable block streaming for this account. */ blockStreaming?: boolean; /** - * Live preview streaming mode (edit-based, like Telegram). - * - "partial": send a message and continuously edit it with new content as tokens arrive. - * - "block": stream previews in draft-sized chunks (like Telegram block mode). - * - "off": no preview streaming (default). - * When enabled, block streaming is automatically suppressed to avoid double-streaming. + * Live stream preview mode: + * - "off": disable preview updates + * - "partial": edit a single preview message + * - "block": stream in chunked preview updates + * - "progress": alias that maps to "partial" on Discord + * + * Legacy boolean values are still accepted and auto-migrated. */ - streamMode?: DiscordStreamMode; - /** Chunking config for Discord stream previews in `streamMode: "block"`. */ + streaming?: DiscordStreamMode | boolean; + /** + * @deprecated Legacy key; migrated automatically to `streaming`. + */ + streamMode?: "partial" | "block" | "off"; + /** Chunking config for Discord stream previews in `streaming: "block"`. */ draftChunk?: BlockStreamingChunkConfig; /** Merge streamed block replies before sending. */ blockStreamingCoalesce?: BlockStreamingCoalesceConfig; diff --git a/src/config/types.slack.ts b/src/config/types.slack.ts index b3a509ee44b..323906cd311 100644 --- a/src/config/types.slack.ts +++ b/src/config/types.slack.ts @@ -45,7 +45,8 @@ export type SlackChannelConfig = { }; export type SlackReactionNotificationMode = "off" | "own" | "all" | "allowlist"; -export type SlackStreamMode = "replace" | "status_final" | "append"; +export type SlackStreamingMode = "off" | "partial" | "block" | "progress"; +export type SlackLegacyStreamMode = "replace" | "status_final" | "append"; export type SlackActionConfig = { reactions?: boolean; @@ -126,14 +127,22 @@ export type SlackAccountConfig = { /** Merge streamed block replies before sending. */ blockStreamingCoalesce?: BlockStreamingCoalesceConfig; /** - * Enable Slack native text streaming (Agents & AI Apps). Default: true. + * Stream preview mode: + * - "off": disable live preview streaming + * - "partial": replace preview text with the latest partial output (default) + * - "block": append chunked preview updates + * - "progress": show progress status, then send final text * - * Set to `false` to disable native Slack text streaming and use normal reply - * delivery behavior only. + * Legacy boolean values are still accepted and auto-migrated. */ - streaming?: boolean; - /** Slack stream preview mode (replace|status_final|append). Default: replace. */ - streamMode?: SlackStreamMode; + streaming?: SlackStreamingMode | boolean; + /** + * Slack native text streaming toggle (`chat.startStream` / `chat.appendStream` / `chat.stopStream`). + * Used when `streaming` is `partial`. Default: true. + */ + nativeStreaming?: boolean; + /** @deprecated Legacy preview mode key; migrated automatically to `streaming`. */ + streamMode?: SlackLegacyStreamMode; mediaMaxMb?: number; /** Reaction notification mode (off|own|all|allowlist). Default: own. */ reactionNotifications?: SlackReactionNotificationMode; diff --git a/src/config/types.telegram.ts b/src/config/types.telegram.ts index 68079ebf18c..46438553acf 100644 --- a/src/config/types.telegram.ts +++ b/src/config/types.telegram.ts @@ -28,6 +28,7 @@ export type TelegramNetworkConfig = { }; export type TelegramInlineButtonsScope = "off" | "dm" | "group" | "all" | "allowlist"; +export type TelegramStreamingMode = "off" | "partial" | "block" | "progress"; export type TelegramCapabilitiesConfig = | string[] @@ -95,15 +96,23 @@ export type TelegramAccountConfig = { textChunkLimit?: number; /** Chunking mode: "length" (default) splits by size; "newline" splits on every newline. */ chunkMode?: "length" | "newline"; - /** Enable live stream preview via message edits (default: true). */ - streaming?: boolean; + /** + * Stream preview mode: + * - "off": disable preview updates + * - "partial": edit a single preview message + * - "block": stream in larger chunked updates + * - "progress": alias that maps to "partial" on Telegram + * + * Legacy boolean values are still accepted and auto-migrated. + */ + streaming?: TelegramStreamingMode | boolean; /** Disable block streaming for this account. */ blockStreaming?: boolean; /** @deprecated Legacy chunking config from `streamMode: "block"`; ignored after migration. */ draftChunk?: BlockStreamingChunkConfig; /** Merge streamed block replies before sending. */ blockStreamingCoalesce?: BlockStreamingCoalesceConfig; - /** @deprecated Legacy key; migrated automatically to `streaming` boolean. */ + /** @deprecated Legacy key; migrated automatically to `streaming`. */ streamMode?: "off" | "partial" | "block"; mediaMaxMb?: number; /** Telegram API client timeout in seconds (grammY ApiClientOptions). */ diff --git a/src/config/zod-schema.providers-core.ts b/src/config/zod-schema.providers-core.ts index cac84e04b60..5fd0ae8fdb3 100644 --- a/src/config/zod-schema.providers-core.ts +++ b/src/config/zod-schema.providers-core.ts @@ -1,6 +1,12 @@ import { z } from "zod"; import { isSafeScpRemoteHost } from "../infra/scp-host.js"; import { isValidInboundPathRootPattern } from "../media/inbound-path-policy.js"; +import { + resolveDiscordPreviewStreamMode, + resolveSlackNativeStreaming, + resolveSlackStreamingMode, + resolveTelegramPreviewStreamMode, +} from "./discord-preview-streaming.js"; import { normalizeTelegramCommandDescription, normalizeTelegramCommandName, @@ -99,25 +105,24 @@ const validateTelegramCustomCommands = ( } }; -function normalizeTelegramStreamingConfig(value: { - streaming?: boolean; - streamMode?: "off" | "partial" | "block"; +function normalizeTelegramStreamingConfig(value: { streaming?: unknown; streamMode?: unknown }) { + value.streaming = resolveTelegramPreviewStreamMode(value); + delete value.streamMode; +} + +function normalizeDiscordStreamingConfig(value: { streaming?: unknown; streamMode?: unknown }) { + value.streaming = resolveDiscordPreviewStreamMode(value); + delete value.streamMode; +} + +function normalizeSlackStreamingConfig(value: { + streaming?: unknown; + nativeStreaming?: unknown; + streamMode?: unknown; }) { - if (typeof value.streaming === "boolean") { - delete value.streamMode; - return; - } - if (value.streamMode === "off") { - value.streaming = false; - delete value.streamMode; - return; - } - if (value.streamMode === "partial" || value.streamMode === "block") { - value.streaming = true; - delete value.streamMode; - return; - } - value.streaming = false; + value.nativeStreaming = resolveSlackNativeStreaming(value); + value.streaming = resolveSlackStreamingMode(value); + delete value.streamMode; } export const TelegramAccountSchemaBase = z @@ -143,7 +148,7 @@ export const TelegramAccountSchemaBase = z dms: z.record(z.string(), DmConfigSchema.optional()).optional(), textChunkLimit: z.number().int().positive().optional(), chunkMode: z.enum(["length", "newline"]).optional(), - streaming: z.boolean().optional(), + streaming: z.union([z.boolean(), z.enum(["off", "partial", "block", "progress"])]).optional(), blockStreaming: z.boolean().optional(), draftChunk: BlockStreamingChunkSchema.optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), @@ -332,7 +337,9 @@ export const DiscordAccountSchema = z chunkMode: z.enum(["length", "newline"]).optional(), blockStreaming: z.boolean().optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), - streamMode: z.enum(["partial", "block", "off"]).optional().default("off"), + // Canonical streaming mode. Legacy aliases (`streamMode`, boolean `streaming`) are auto-mapped. + streaming: z.union([z.boolean(), z.enum(["off", "partial", "block", "progress"])]).optional(), + streamMode: z.enum(["partial", "block", "off"]).optional(), draftChunk: BlockStreamingChunkSchema.optional(), maxLinesPerMessage: z.number().int().positive().optional(), mediaMaxMb: z.number().positive().optional(), @@ -422,6 +429,8 @@ export const DiscordAccountSchema = z }) .strict() .superRefine((value, ctx) => { + normalizeDiscordStreamingConfig(value); + const activityText = typeof value.activity === "string" ? value.activity.trim() : ""; const hasActivity = Boolean(activityText); const hasActivityType = value.activityType !== undefined; @@ -610,7 +619,9 @@ export const SlackAccountSchema = z chunkMode: z.enum(["length", "newline"]).optional(), blockStreaming: z.boolean().optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), - streaming: z.boolean().optional(), + streaming: z.union([z.boolean(), z.enum(["off", "partial", "block", "progress"])]).optional(), + nativeStreaming: z.boolean().optional(), + streamMode: z.enum(["replace", "status_final", "append"]).optional(), mediaMaxMb: z.number().positive().optional(), reactionNotifications: z.enum(["off", "own", "all", "allowlist"]).optional(), reactionAllowlist: z.array(z.union([z.string(), z.number()])).optional(), @@ -652,6 +663,8 @@ export const SlackAccountSchema = z }) .strict() .superRefine((value, ctx) => { + normalizeSlackStreamingConfig(value); + const dmPolicy = value.dmPolicy ?? value.dm?.policy ?? "pairing"; const allowFrom = value.allowFrom ?? value.dm?.allowFrom; const allowFromPath = diff --git a/src/discord/monitor/message-handler.process.test.ts b/src/discord/monitor/message-handler.process.test.ts index b344ff198af..b17586df8b2 100644 --- a/src/discord/monitor/message-handler.process.test.ts +++ b/src/discord/monitor/message-handler.process.test.ts @@ -381,6 +381,28 @@ describe("processDiscordMessage draft streaming", () => { expect(deliverDiscordReply).not.toHaveBeenCalled(); }); + it("accepts streaming=true alias for partial preview mode", async () => { + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.dispatcher.sendFinalReply({ text: "Hello\nWorld" }); + return { queuedFinal: true, counts: { final: 1, tool: 0, block: 0 } }; + }); + + const ctx = await createBaseContext({ + discordConfig: { streaming: true, maxLinesPerMessage: 5 }, + }); + + // oxlint-disable-next-line typescript/no-explicit-any + await processDiscordMessage(ctx as any); + + expect(editMessageDiscord).toHaveBeenCalledWith( + "c1", + "preview-1", + { content: "Hello\nWorld" }, + { rest: {} }, + ); + expect(deliverDiscordReply).not.toHaveBeenCalled(); + }); + it("falls back to standard send when final needs multiple chunks", async () => { dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { await params?.dispatcher.sendFinalReply({ text: "Hello\nWorld" }); diff --git a/src/discord/monitor/message-handler.process.ts b/src/discord/monitor/message-handler.process.ts index 307fca48f96..80a63fdf49c 100644 --- a/src/discord/monitor/message-handler.process.ts +++ b/src/discord/monitor/message-handler.process.ts @@ -21,6 +21,7 @@ import { type StatusReactionAdapter, } from "../../channels/status-reactions.js"; import { createTypingCallbacks } from "../../channels/typing.js"; +import { resolveDiscordPreviewStreamMode } from "../../config/discord-preview-streaming.js"; import { resolveMarkdownTableMode } from "../../config/markdown-tables.js"; import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; @@ -413,7 +414,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) }); // --- Discord draft stream (edit-based preview streaming) --- - const discordStreamMode = discordConfig?.streamMode ?? "off"; + const discordStreamMode = resolveDiscordPreviewStreamMode(discordConfig); const draftMaxChars = Math.min(textLimit, 2000); const accountBlockStreamingEnabled = typeof discordConfig?.blockStreaming === "boolean" diff --git a/src/slack/monitor/message-handler/dispatch.streaming.test.ts b/src/slack/monitor/message-handler/dispatch.streaming.test.ts index 58f4ba06956..dc6eae7a44d 100644 --- a/src/slack/monitor/message-handler/dispatch.streaming.test.ts +++ b/src/slack/monitor/message-handler/dispatch.streaming.test.ts @@ -2,13 +2,15 @@ import { describe, expect, it } from "vitest"; import { isSlackStreamingEnabled, resolveSlackStreamingThreadHint } from "./dispatch.js"; describe("slack native streaming defaults", () => { - it("is enabled when config is undefined", () => { - expect(isSlackStreamingEnabled(undefined)).toBe(true); + it("is enabled for partial mode when native streaming is on", () => { + expect(isSlackStreamingEnabled({ mode: "partial", nativeStreaming: true })).toBe(true); }); - it("can be disabled explicitly", () => { - expect(isSlackStreamingEnabled(false)).toBe(false); - expect(isSlackStreamingEnabled(true)).toBe(true); + it("is disabled outside partial mode or when native streaming is off", () => { + expect(isSlackStreamingEnabled({ mode: "partial", nativeStreaming: false })).toBe(false); + expect(isSlackStreamingEnabled({ mode: "block", nativeStreaming: true })).toBe(false); + expect(isSlackStreamingEnabled({ mode: "progress", nativeStreaming: true })).toBe(false); + expect(isSlackStreamingEnabled({ mode: "off", nativeStreaming: true })).toBe(false); }); }); diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index 369550ae99f..922f873d8bc 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -14,7 +14,7 @@ import { createSlackDraftStream } from "../../draft-stream.js"; import { applyAppendOnlyStreamUpdate, buildStatusFinalPreviewText, - resolveSlackStreamMode, + resolveSlackStreamingConfig, } from "../../stream-mode.js"; import type { SlackStreamSession } from "../../streaming.js"; import { appendSlackStream, startSlackStream, stopSlackStream } from "../../streaming.js"; @@ -26,8 +26,14 @@ function hasMedia(payload: ReplyPayload): boolean { return Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; } -export function isSlackStreamingEnabled(streaming: boolean | undefined): boolean { - return streaming !== false; +export function isSlackStreamingEnabled(params: { + mode: "off" | "partial" | "block" | "progress"; + nativeStreaming: boolean; +}): boolean { + if (params.mode !== "partial") { + return false; + } + return params.nativeStreaming; } export function resolveSlackStreamingThreadHint(params: { @@ -146,7 +152,16 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag accountId: route.accountId, }); - const streamingEnabled = isSlackStreamingEnabled(account.config.streaming); + const slackStreaming = resolveSlackStreamingConfig({ + streaming: account.config.streaming, + streamMode: account.config.streamMode, + nativeStreaming: account.config.nativeStreaming, + }); + const previewStreamingEnabled = slackStreaming.mode !== "off"; + const streamingEnabled = isSlackStreamingEnabled({ + mode: slackStreaming.mode, + nativeStreaming: slackStreaming.nativeStreaming, + }); const streamThreadHint = resolveSlackStreamingThreadHint({ replyToMode: ctx.replyToMode, incomingThreadTs, @@ -233,6 +248,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag const draftChannelId = draftStream?.channelId(); const finalText = payload.text; const canFinalizeViaPreviewEdit = + previewStreamingEnabled && streamMode !== "status_final" && mediaCount === 0 && !payload.isError && @@ -256,7 +272,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag `slack: preview final edit failed; falling back to standard send (${String(err)})`, ); } - } else if (streamMode === "status_final" && hasStreamedMessage) { + } else if (previewStreamingEnabled && streamMode === "status_final" && hasStreamedMessage) { try { const statusChannelId = draftStream?.channelId(); const statusMessageId = draftStream?.messageId(); @@ -307,7 +323,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag warn: logVerbose, }); let hasStreamedMessage = false; - const streamMode = resolveSlackStreamMode(account.config.streamMode); + const streamMode = slackStreaming.draftMode; let appendRenderedText = ""; let appendSourceText = ""; let statusUpdateCount = 0; @@ -363,31 +379,37 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag onModelSelected, onPartialReply: useStreaming ? undefined - : async (payload) => { - updateDraftFromPartial(payload.text); - }, + : !previewStreamingEnabled + ? undefined + : async (payload) => { + updateDraftFromPartial(payload.text); + }, onAssistantMessageStart: useStreaming ? undefined - : async () => { - if (hasStreamedMessage) { - draftStream.forceNewMessage(); - hasStreamedMessage = false; - appendRenderedText = ""; - appendSourceText = ""; - statusUpdateCount = 0; - } - }, + : !previewStreamingEnabled + ? undefined + : async () => { + if (hasStreamedMessage) { + draftStream.forceNewMessage(); + hasStreamedMessage = false; + appendRenderedText = ""; + appendSourceText = ""; + statusUpdateCount = 0; + } + }, onReasoningEnd: useStreaming ? undefined - : async () => { - if (hasStreamedMessage) { - draftStream.forceNewMessage(); - hasStreamedMessage = false; - appendRenderedText = ""; - appendSourceText = ""; - statusUpdateCount = 0; - } - }, + : !previewStreamingEnabled + ? undefined + : async () => { + if (hasStreamedMessage) { + draftStream.forceNewMessage(); + hasStreamedMessage = false; + appendRenderedText = ""; + appendSourceText = ""; + statusUpdateCount = 0; + } + }, }, }); await draftStream.flush(); diff --git a/src/slack/stream-mode.test.ts b/src/slack/stream-mode.test.ts index aa913420059..c0146d323cc 100644 --- a/src/slack/stream-mode.test.ts +++ b/src/slack/stream-mode.test.ts @@ -2,6 +2,7 @@ import { describe, expect, it } from "vitest"; import { applyAppendOnlyStreamUpdate, buildStatusFinalPreviewText, + resolveSlackStreamingConfig, resolveSlackStreamMode, } from "./stream-mode.js"; @@ -19,6 +20,48 @@ describe("resolveSlackStreamMode", () => { }); }); +describe("resolveSlackStreamingConfig", () => { + it("defaults to partial mode with native streaming enabled", () => { + expect(resolveSlackStreamingConfig({})).toEqual({ + mode: "partial", + nativeStreaming: true, + draftMode: "replace", + }); + }); + + it("maps legacy streamMode values to unified streaming modes", () => { + expect(resolveSlackStreamingConfig({ streamMode: "append" })).toMatchObject({ + mode: "block", + draftMode: "append", + }); + expect(resolveSlackStreamingConfig({ streamMode: "status_final" })).toMatchObject({ + mode: "progress", + draftMode: "status_final", + }); + }); + + it("moves legacy streaming boolean to native streaming toggle", () => { + expect(resolveSlackStreamingConfig({ streaming: false })).toEqual({ + mode: "partial", + nativeStreaming: false, + draftMode: "replace", + }); + }); + + it("accepts unified enum values directly", () => { + expect(resolveSlackStreamingConfig({ streaming: "off" })).toEqual({ + mode: "off", + nativeStreaming: true, + draftMode: "replace", + }); + expect(resolveSlackStreamingConfig({ streaming: "progress" })).toEqual({ + mode: "progress", + nativeStreaming: true, + draftMode: "status_final", + }); + }); +}); + describe("applyAppendOnlyStreamUpdate", () => { it("starts with first incoming text", () => { const next = applyAppendOnlyStreamUpdate({ diff --git a/src/slack/stream-mode.ts b/src/slack/stream-mode.ts index be523f04d33..44abc91bcb9 100644 --- a/src/slack/stream-mode.ts +++ b/src/slack/stream-mode.ts @@ -1,5 +1,13 @@ -export type SlackStreamMode = "replace" | "status_final" | "append"; +import { + mapStreamingModeToSlackLegacyDraftStreamMode, + resolveSlackNativeStreaming, + resolveSlackStreamingMode, + type SlackLegacyDraftStreamMode, + type StreamingMode, +} from "../config/discord-preview-streaming.js"; +export type SlackStreamMode = SlackLegacyDraftStreamMode; +export type SlackStreamingMode = StreamingMode; const DEFAULT_STREAM_MODE: SlackStreamMode = "replace"; export function resolveSlackStreamMode(raw: unknown): SlackStreamMode { @@ -13,6 +21,20 @@ export function resolveSlackStreamMode(raw: unknown): SlackStreamMode { return DEFAULT_STREAM_MODE; } +export function resolveSlackStreamingConfig(params: { + streaming?: unknown; + streamMode?: unknown; + nativeStreaming?: unknown; +}): { mode: SlackStreamingMode; nativeStreaming: boolean; draftMode: SlackStreamMode } { + const mode = resolveSlackStreamingMode(params); + const nativeStreaming = resolveSlackNativeStreaming(params); + return { + mode, + nativeStreaming, + draftMode: mapStreamingModeToSlackLegacyDraftStreamMode(mode), + }; +} + export function applyAppendOnlyStreamUpdate(params: { incoming: string; rendered: string; diff --git a/src/telegram/bot.helpers.test.ts b/src/telegram/bot.helpers.test.ts index aa68107bf91..8f1e0252d68 100644 --- a/src/telegram/bot.helpers.test.ts +++ b/src/telegram/bot.helpers.test.ts @@ -15,6 +15,10 @@ describe("resolveTelegramStreamMode", () => { it("maps legacy streamMode values", () => { expect(resolveTelegramStreamMode({ streamMode: "off" })).toBe("off"); expect(resolveTelegramStreamMode({ streamMode: "partial" })).toBe("partial"); - expect(resolveTelegramStreamMode({ streamMode: "block" })).toBe("partial"); + expect(resolveTelegramStreamMode({ streamMode: "block" })).toBe("block"); + }); + + it("maps unified progress mode to partial on Telegram", () => { + expect(resolveTelegramStreamMode({ streaming: "progress" })).toBe("partial"); }); }); diff --git a/src/telegram/bot/helpers.ts b/src/telegram/bot/helpers.ts index 59e0634135d..79bc7f75dc2 100644 --- a/src/telegram/bot/helpers.ts +++ b/src/telegram/bot/helpers.ts @@ -1,5 +1,6 @@ import type { Chat, Message, MessageOrigin, User } from "@grammyjs/types"; import { formatLocationText, type NormalizedLocation } from "../../channels/location.js"; +import { resolveTelegramPreviewStreamMode } from "../../config/discord-preview-streaming.js"; import type { TelegramGroupConfig, TelegramTopicConfig } from "../../config/types.js"; import { readChannelAllowFromStore } from "../../pairing/pairing-store.js"; import { @@ -154,20 +155,10 @@ export function buildTypingThreadParams(messageThreadId?: number) { } export function resolveTelegramStreamMode(telegramCfg?: { - streaming?: boolean; - streamMode?: TelegramStreamMode; + streaming?: unknown; + streamMode?: unknown; }): TelegramStreamMode { - if (typeof telegramCfg?.streaming === "boolean") { - return telegramCfg.streaming ? "partial" : "off"; - } - const raw = telegramCfg?.streamMode?.trim().toLowerCase(); - if (raw === "off") { - return "off"; - } - if (raw === "partial" || raw === "block") { - return "partial"; - } - return "off"; + return resolveTelegramPreviewStreamMode(telegramCfg); } export function buildTelegramGroupPeerId(chatId: number | string, messageThreadId?: number) {