diff --git a/CHANGELOG.md b/CHANGELOG.md index f399942b04c..5f22ef04477 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Docs: https://docs.openclaw.ai - iOS/Tests: cover IPv4-mapped IPv6 loopback in manual TLS policy tests for connect validation paths. (#22045) Thanks @mbelinky. - iOS/Gateway: stabilize background wake and reconnect behavior with background reconnect suppression/lease windows, BGAppRefresh wake fallback, location wake hook throttling, and APNs wake retry+nudge instrumentation. (#21226) thanks @mbelinky. - Auto-reply/UI: add model fallback lifecycle visibility in verbose logs, /status active-model context with fallback reason, and cohesive WebUI fallback indicators. (#20704) Thanks @joshavant. +- Discord/Streaming: add stream preview mode for live draft replies with partial/block options and configurable chunking. Thanks @thewilloftheshadow. Inspiration @neoagentic-ship-it. ### Fixes diff --git a/docs/channels/discord.md b/docs/channels/discord.md index 774a0eba1a8..464dc430db4 100644 --- a/docs/channels/discord.md +++ b/docs/channels/discord.md @@ -530,6 +530,49 @@ See [Slash commands](/tools/slash-commands) for command catalog and behavior. + + OpenClaw can stream draft replies by sending a temporary message and editing it as text arrives. + + - `channels.discord.streamMode` controls preview streaming (`off` | `partial` | `block`, default: `off`). + - `partial` edits a single preview message as tokens arrive. + - `block` emits draft-sized chunks (use `draftChunk` to tune size and breakpoints). + + Example: + +```json5 +{ + channels: { + discord: { + streamMode: "partial", + }, + }, +} +``` + + `block` mode chunking defaults (clamped to `channels.discord.textChunkLimit`): + +```json5 +{ + channels: { + discord: { + streamMode: "block", + draftChunk: { + minChars: 200, + maxChars: 800, + breakPreference: "paragraph", + }, + }, + }, +} +``` + + Preview streaming is text-only; media replies fall back to normal delivery. + + Note: preview streaming is separate from block streaming. When block streaming is explicitly + enabled for Discord, OpenClaw skips the preview stream to avoid double streaming. + + + Guild history context: @@ -863,6 +906,7 @@ High-signal Discord fields: - command: `commands.native`, `commands.useAccessGroups`, `configWrites` - reply/history: `replyToMode`, `historyLimit`, `dmHistoryLimit`, `dms.*.historyLimit` - delivery: `textChunkLimit`, `chunkMode`, `maxLinesPerMessage` +- streaming: `streamMode`, `draftChunk`, `blockStreaming`, `blockStreamingCoalesce` - media/retry: `mediaMaxMb`, `retry` - actions: `actions.*` - presence: `activity`, `status`, `activityType`, `activityUrl` diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index f280e634c66..be65b0e3c1c 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -385,6 +385,14 @@ export const FIELD_HELP: Record = { 'Target max size for a Telegram stream preview chunk when channels.telegram.streamMode="block" (default: 800; clamped to channels.telegram.textChunkLimit).', "channels.telegram.draftChunk.breakPreference": "Preferred breakpoints for Telegram draft chunks (paragraph | newline | sentence). Default: paragraph.", + "channels.discord.streamMode": + "Live stream preview mode for Discord replies (off | partial | block). Separate from block streaming; uses sendMessage + editMessage.", + "channels.discord.draftChunk.minChars": + 'Minimum chars before emitting a Discord stream preview update when channels.discord.streamMode="block" (default: 200).', + "channels.discord.draftChunk.maxChars": + 'Target max size for a Discord stream preview chunk when channels.discord.streamMode="block" (default: 800; clamped to channels.discord.textChunkLimit).', + "channels.discord.draftChunk.breakPreference": + "Preferred breakpoints for Discord draft chunks (paragraph | newline | sentence). Default: paragraph.", "channels.telegram.retry.attempts": "Max retry attempts for outbound Telegram API calls (default: 3).", "channels.telegram.retry.minDelayMs": "Minimum retry delay in ms for Telegram outbound calls.", diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index 8a1e45a5640..cc7aac534a0 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -275,6 +275,10 @@ export const FIELD_LABELS: Record = { "channels.bluebubbles.dmPolicy": "BlueBubbles DM Policy", "channels.discord.dmPolicy": "Discord DM Policy", "channels.discord.dm.policy": "Discord DM Policy", + "channels.discord.streamMode": "Discord Stream Mode", + "channels.discord.draftChunk.minChars": "Discord Draft Chunk Min Chars", + "channels.discord.draftChunk.maxChars": "Discord Draft Chunk Max Chars", + "channels.discord.draftChunk.breakPreference": "Discord Draft Chunk Break Preference", "channels.discord.retry.attempts": "Discord Retry Attempts", "channels.discord.retry.minDelayMs": "Discord Retry Min Delay (ms)", "channels.discord.retry.maxDelayMs": "Discord Retry Max Delay (ms)", diff --git a/src/config/types.discord.ts b/src/config/types.discord.ts index a578338ead7..1bce558c16c 100644 --- a/src/config/types.discord.ts +++ b/src/config/types.discord.ts @@ -1,5 +1,6 @@ import type { DiscordPluralKitConfig } from "../discord/pluralkit.js"; import type { + BlockStreamingChunkConfig, BlockStreamingCoalesceConfig, DmPolicy, GroupPolicy, @@ -11,6 +12,8 @@ import type { ChannelHeartbeatVisibilityConfig } from "./types.channels.js"; import type { DmConfig, ProviderCommandsConfig } from "./types.messages.js"; import type { GroupToolPolicyBySenderConfig, GroupToolPolicyConfig } from "./types.tools.js"; +export type DiscordStreamMode = "partial" | "block" | "off"; + export type DiscordDmConfig = { /** If false, ignore all incoming Discord DMs. Default: true. */ enabled?: boolean; @@ -153,6 +156,16 @@ export type DiscordAccountConfig = { chunkMode?: "length" | "newline"; /** Disable block streaming for this account. */ blockStreaming?: boolean; + /** + * Live preview streaming mode (edit-based, like Telegram). + * - "partial": send a message and continuously edit it with new content as tokens arrive. + * - "block": stream previews in draft-sized chunks (like Telegram block mode). + * - "off": no preview streaming (default). + * When enabled, block streaming is automatically suppressed to avoid double-streaming. + */ + streamMode?: DiscordStreamMode; + /** Chunking config for Discord stream previews in `streamMode: "block"`. */ + draftChunk?: BlockStreamingChunkConfig; /** Merge streamed block replies before sending. */ blockStreamingCoalesce?: BlockStreamingCoalesceConfig; /** diff --git a/src/config/zod-schema.providers-core.ts b/src/config/zod-schema.providers-core.ts index 416d559ad3c..8f9be6c4056 100644 --- a/src/config/zod-schema.providers-core.ts +++ b/src/config/zod-schema.providers-core.ts @@ -290,6 +290,8 @@ export const DiscordAccountSchema = z chunkMode: z.enum(["length", "newline"]).optional(), blockStreaming: z.boolean().optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), + streamMode: z.enum(["partial", "block", "off"]).optional().default("off"), + draftChunk: BlockStreamingChunkSchema.optional(), maxLinesPerMessage: z.number().int().positive().optional(), mediaMaxMb: z.number().positive().optional(), retry: RetryConfigSchema, diff --git a/src/discord/draft-chunking.ts b/src/discord/draft-chunking.ts new file mode 100644 index 00000000000..f238ed472af --- /dev/null +++ b/src/discord/draft-chunking.ts @@ -0,0 +1,41 @@ +import { resolveTextChunkLimit } from "../auto-reply/chunk.js"; +import { getChannelDock } from "../channels/dock.js"; +import type { OpenClawConfig } from "../config/config.js"; +import { normalizeAccountId } from "../routing/session-key.js"; + +const DEFAULT_DISCORD_DRAFT_STREAM_MIN = 200; +const DEFAULT_DISCORD_DRAFT_STREAM_MAX = 800; + +export function resolveDiscordDraftStreamingChunking( + cfg: OpenClawConfig | undefined, + accountId?: string | null, +): { + minChars: number; + maxChars: number; + breakPreference: "paragraph" | "newline" | "sentence"; +} { + const providerChunkLimit = getChannelDock("discord")?.outbound?.textChunkLimit; + const textLimit = resolveTextChunkLimit(cfg, "discord", accountId, { + fallbackLimit: providerChunkLimit, + }); + const normalizedAccountId = normalizeAccountId(accountId); + const draftCfg = + cfg?.channels?.discord?.accounts?.[normalizedAccountId]?.draftChunk ?? + cfg?.channels?.discord?.draftChunk; + + const maxRequested = Math.max( + 1, + Math.floor(draftCfg?.maxChars ?? DEFAULT_DISCORD_DRAFT_STREAM_MAX), + ); + const maxChars = Math.max(1, Math.min(maxRequested, textLimit)); + const minRequested = Math.max( + 1, + Math.floor(draftCfg?.minChars ?? DEFAULT_DISCORD_DRAFT_STREAM_MIN), + ); + const minChars = Math.min(minRequested, maxChars); + const breakPreference = + draftCfg?.breakPreference === "newline" || draftCfg?.breakPreference === "sentence" + ? draftCfg.breakPreference + : "paragraph"; + return { minChars, maxChars, breakPreference }; +} diff --git a/src/discord/draft-stream.ts b/src/discord/draft-stream.ts new file mode 100644 index 00000000000..835fee2341d --- /dev/null +++ b/src/discord/draft-stream.ts @@ -0,0 +1,161 @@ +import type { RequestClient } from "@buape/carbon"; +import { Routes } from "discord-api-types/v10"; +import { createDraftStreamLoop } from "../channels/draft-stream-loop.js"; + +/** Discord messages cap at 2000 characters. */ +const DISCORD_STREAM_MAX_CHARS = 2000; +const DEFAULT_THROTTLE_MS = 1200; + +export type DiscordDraftStream = { + update: (text: string) => void; + flush: () => Promise; + messageId: () => string | undefined; + clear: () => Promise; + stop: () => Promise; + /** Reset internal state so the next update creates a new message instead of editing. */ + forceNewMessage: () => void; +}; + +export function createDiscordDraftStream(params: { + rest: RequestClient; + channelId: string; + maxChars?: number; + replyToMessageId?: string | (() => string | undefined); + throttleMs?: number; + /** Minimum chars before sending first message (debounce for push notifications) */ + minInitialChars?: number; + log?: (message: string) => void; + warn?: (message: string) => void; +}): DiscordDraftStream { + const maxChars = Math.min(params.maxChars ?? DISCORD_STREAM_MAX_CHARS, DISCORD_STREAM_MAX_CHARS); + const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS); + const minInitialChars = params.minInitialChars; + const channelId = params.channelId; + const rest = params.rest; + const resolveReplyToMessageId = () => + typeof params.replyToMessageId === "function" + ? params.replyToMessageId() + : params.replyToMessageId; + + let streamMessageId: string | undefined; + let lastSentText = ""; + let stopped = false; + let isFinal = false; + + const sendOrEditStreamMessage = async (text: string): Promise => { + // Allow final flush even if stopped (e.g., after clear()). + if (stopped && !isFinal) { + return false; + } + const trimmed = text.trimEnd(); + if (!trimmed) { + return false; + } + if (trimmed.length > maxChars) { + // Discord messages cap at 2000 chars. + // Stop streaming once we exceed the cap to avoid repeated API failures. + stopped = true; + params.warn?.(`discord stream preview stopped (text length ${trimmed.length} > ${maxChars})`); + return false; + } + if (trimmed === lastSentText) { + return true; + } + + // Debounce first preview send for better push notification quality. + if (streamMessageId === undefined && minInitialChars != null && !isFinal) { + if (trimmed.length < minInitialChars) { + return false; + } + } + + lastSentText = trimmed; + try { + if (streamMessageId !== undefined) { + // Edit existing message + await rest.patch(Routes.channelMessage(channelId, streamMessageId), { + body: { content: trimmed }, + }); + return true; + } + // Send new message + const replyToMessageId = resolveReplyToMessageId()?.trim(); + const messageReference = replyToMessageId + ? { message_id: replyToMessageId, fail_if_not_exists: false } + : undefined; + const sent = (await rest.post(Routes.channelMessages(channelId), { + body: { + content: trimmed, + ...(messageReference ? { message_reference: messageReference } : {}), + }, + })) as { id?: string } | undefined; + const sentMessageId = sent?.id; + if (typeof sentMessageId !== "string" || !sentMessageId) { + stopped = true; + params.warn?.("discord stream preview stopped (missing message id from send)"); + return false; + } + streamMessageId = sentMessageId; + return true; + } catch (err) { + stopped = true; + params.warn?.( + `discord stream preview failed: ${err instanceof Error ? err.message : String(err)}`, + ); + return false; + } + }; + + const loop = createDraftStreamLoop({ + throttleMs, + isStopped: () => stopped, + sendOrEditStreamMessage, + }); + + const update = (text: string) => { + if (stopped || isFinal) { + return; + } + loop.update(text); + }; + + const stop = async (): Promise => { + isFinal = true; + await loop.flush(); + }; + + const clear = async () => { + stopped = true; + loop.stop(); + await loop.waitForInFlight(); + const messageId = streamMessageId; + streamMessageId = undefined; + if (typeof messageId !== "string") { + return; + } + try { + await rest.delete(Routes.channelMessage(channelId, messageId)); + } catch (err) { + params.warn?.( + `discord stream preview cleanup failed: ${err instanceof Error ? err.message : String(err)}`, + ); + } + }; + + const forceNewMessage = () => { + streamMessageId = undefined; + lastSentText = ""; + loop.resetPending(); + }; + + params.log?.(`discord stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`); + + return { + update, + flush: loop.flush, + messageId: () => streamMessageId, + clear, + stop, + forceNewMessage, + }; +} diff --git a/src/discord/monitor/message-handler.process.test.ts b/src/discord/monitor/message-handler.process.test.ts index 47b0586d6e5..a279f103162 100644 --- a/src/discord/monitor/message-handler.process.test.ts +++ b/src/discord/monitor/message-handler.process.test.ts @@ -3,10 +3,27 @@ import { createBaseDiscordMessageContext } from "./message-handler.test-harness. const reactMessageDiscord = vi.fn(async () => {}); const removeReactionDiscord = vi.fn(async () => {}); +const editMessageDiscord = vi.fn(async () => ({})); +const deliverDiscordReply = vi.fn(async () => {}); +const createDiscordDraftStream = vi.fn(() => ({ + update: vi.fn<(text: string) => void>(() => {}), + flush: vi.fn(async () => {}), + messageId: vi.fn(() => "preview-1"), + clear: vi.fn(async () => {}), + stop: vi.fn(async () => {}), + forceNewMessage: vi.fn(() => {}), +})); + type DispatchInboundParams = { + dispatcher: { + sendFinalReply: (payload: { text?: string }) => boolean | Promise; + }; replyOptions?: { onReasoningStream?: () => Promise | void; + onReasoningEnd?: () => Promise | void; onToolStart?: (payload: { name?: string }) => Promise | void; + onPartialReply?: (payload: { text?: string }) => Promise | void; + onAssistantMessageStart?: () => Promise | void; }; }; const dispatchInboundMessage = vi.fn(async (_params?: DispatchInboundParams) => ({ @@ -22,23 +39,40 @@ vi.mock("../send.js", () => ({ removeReactionDiscord, })); +vi.mock("../send.messages.js", () => ({ + editMessageDiscord, +})); + +vi.mock("../draft-stream.js", () => ({ + createDiscordDraftStream, +})); + +vi.mock("./reply-delivery.js", () => ({ + deliverDiscordReply, +})); + vi.mock("../../auto-reply/dispatch.js", () => ({ dispatchInboundMessage, })); vi.mock("../../auto-reply/reply/reply-dispatcher.js", () => ({ - createReplyDispatcherWithTyping: vi.fn(() => ({ - dispatcher: { - sendToolResult: vi.fn(() => true), - sendBlockReply: vi.fn(() => true), - sendFinalReply: vi.fn(() => true), - waitForIdle: vi.fn(async () => {}), - getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), - markComplete: vi.fn(), - }, - replyOptions: {}, - markDispatchIdle: vi.fn(), - })), + createReplyDispatcherWithTyping: vi.fn( + (opts: { deliver: (payload: unknown, info: { kind: string }) => Promise | void }) => ({ + dispatcher: { + sendToolResult: vi.fn(() => true), + sendBlockReply: vi.fn(() => true), + sendFinalReply: vi.fn((payload: unknown) => { + void opts.deliver(payload as never, { kind: "final" }); + return true; + }), + waitForIdle: vi.fn(async () => {}), + getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), + markComplete: vi.fn(), + }, + replyOptions: {}, + markDispatchIdle: vi.fn(), + }), + ), })); vi.mock("../../channels/session.js", () => ({ @@ -58,6 +92,9 @@ beforeEach(() => { vi.useRealTimers(); reactMessageDiscord.mockClear(); removeReactionDiscord.mockClear(); + editMessageDiscord.mockClear(); + deliverDiscordReply.mockClear(); + createDiscordDraftStream.mockClear(); dispatchInboundMessage.mockReset(); recordInboundSession.mockReset(); readSessionUpdatedAt.mockReset(); @@ -252,3 +289,116 @@ describe("processDiscordMessage session routing", () => { }); }); }); + +describe("processDiscordMessage draft streaming", () => { + it("finalizes via preview edit when final fits one chunk", async () => { + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.dispatcher.sendFinalReply({ text: "Hello\nWorld" }); + return { queuedFinal: true, counts: { final: 1, tool: 0, block: 0 } }; + }); + + const ctx = await createBaseContext({ + discordConfig: { streamMode: "partial", maxLinesPerMessage: 5 }, + }); + + // oxlint-disable-next-line typescript/no-explicit-any + await processDiscordMessage(ctx as any); + + expect(editMessageDiscord).toHaveBeenCalledWith( + "c1", + "preview-1", + { content: "Hello\nWorld" }, + { rest: {} }, + ); + expect(deliverDiscordReply).not.toHaveBeenCalled(); + }); + + it("falls back to standard send when final needs multiple chunks", async () => { + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.dispatcher.sendFinalReply({ text: "Hello\nWorld" }); + return { queuedFinal: true, counts: { final: 1, tool: 0, block: 0 } }; + }); + + const ctx = await createBaseContext({ + discordConfig: { streamMode: "partial", maxLinesPerMessage: 1 }, + }); + + // oxlint-disable-next-line typescript/no-explicit-any + await processDiscordMessage(ctx as any); + + expect(editMessageDiscord).not.toHaveBeenCalled(); + expect(deliverDiscordReply).toHaveBeenCalledTimes(1); + }); + + it("streams block previews using draft chunking", async () => { + const draftStream = { + update: vi.fn<(text: string) => void>(() => {}), + flush: vi.fn(async () => {}), + messageId: vi.fn(() => "preview-1"), + clear: vi.fn(async () => {}), + stop: vi.fn(async () => {}), + forceNewMessage: vi.fn(() => {}), + }; + createDiscordDraftStream.mockReturnValueOnce(draftStream); + + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.replyOptions?.onPartialReply?.({ text: "HelloWorld" }); + return { queuedFinal: false, counts: { final: 0, tool: 0, block: 0 } }; + }); + + const ctx = await createBaseContext({ + cfg: { + messages: { ackReaction: "👀" }, + session: { store: "/tmp/openclaw-discord-process-test-sessions.json" }, + channels: { + discord: { + draftChunk: { minChars: 1, maxChars: 5, breakPreference: "newline" }, + }, + }, + }, + discordConfig: { streamMode: "block" }, + }); + + // oxlint-disable-next-line typescript/no-explicit-any + await processDiscordMessage(ctx as any); + + const updates = draftStream.update.mock.calls.map((call) => call[0]); + expect(updates).toEqual(["Hello", "HelloWorld"]); + }); + + it("forces new preview messages on assistant boundaries in block mode", async () => { + const draftStream = { + update: vi.fn<(text: string) => void>(() => {}), + flush: vi.fn(async () => {}), + messageId: vi.fn(() => "preview-1"), + clear: vi.fn(async () => {}), + stop: vi.fn(async () => {}), + forceNewMessage: vi.fn(() => {}), + }; + createDiscordDraftStream.mockReturnValueOnce(draftStream); + + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.replyOptions?.onPartialReply?.({ text: "Hello" }); + await params?.replyOptions?.onAssistantMessageStart?.(); + return { queuedFinal: false, counts: { final: 0, tool: 0, block: 0 } }; + }); + + const ctx = await createBaseContext({ + cfg: { + messages: { ackReaction: "👀" }, + session: { store: "/tmp/openclaw-discord-process-test-sessions.json" }, + channels: { + discord: { + draftChunk: { minChars: 1, maxChars: 5, breakPreference: "newline" }, + }, + }, + }, + discordConfig: { streamMode: "block" }, + }); + + // oxlint-disable-next-line typescript/no-explicit-any + await processDiscordMessage(ctx as any); + + expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/discord/monitor/message-handler.process.ts b/src/discord/monitor/message-handler.process.ts index c627604a4b4..a93e020d1b2 100644 --- a/src/discord/monitor/message-handler.process.ts +++ b/src/discord/monitor/message-handler.process.ts @@ -1,5 +1,6 @@ import { ChannelType } from "@buape/carbon"; import { resolveAckReaction, resolveHumanDelayConfig } from "../../agents/identity.js"; +import { EmbeddedBlockChunker } from "../../agents/pi-embedded-block-chunker.js"; import { resolveChunkMode } from "../../auto-reply/chunk.js"; import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; import { formatInboundEnvelope, resolveEnvelopeFormatOptions } from "../../auto-reply/envelope.js"; @@ -18,11 +19,16 @@ import { createTypingCallbacks } from "../../channels/typing.js"; import { resolveMarkdownTableMode } from "../../config/markdown-tables.js"; import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js"; import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; +import { convertMarkdownTables } from "../../markdown/tables.js"; import { buildAgentSessionKey } from "../../routing/resolve-route.js"; import { resolveThreadSessionKeys } from "../../routing/session-key.js"; import { buildUntrustedChannelMetadata } from "../../security/channel-metadata.js"; import { truncateUtf16Safe } from "../../utils.js"; +import { chunkDiscordTextWithMode } from "../chunk.js"; +import { resolveDiscordDraftStreamingChunking } from "../draft-chunking.js"; +import { createDiscordDraftStream } from "../draft-stream.js"; import { reactMessageDiscord, removeReactionDiscord } from "../send.js"; +import { editMessageDiscord } from "../send.messages.js"; import { normalizeDiscordSlug, resolveDiscordOwnerAllowFrom } from "./allow-list.js"; import { resolveTimestampMs } from "./format.js"; import type { DiscordMessagePreflightContext } from "./message-handler.preflight.js"; @@ -594,6 +600,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) channel: "discord", accountId, }); + const chunkMode = resolveChunkMode(cfg, "discord", accountId); const typingCallbacks = createTypingCallbacks({ start: () => sendTyping({ client, channelId: typingChannelId }), @@ -607,10 +614,216 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) }, }); + // --- Discord draft stream (edit-based preview streaming) --- + const discordStreamMode = discordConfig?.streamMode ?? "off"; + const draftMaxChars = Math.min(textLimit, 2000); + const accountBlockStreamingEnabled = + typeof discordConfig?.blockStreaming === "boolean" + ? discordConfig.blockStreaming + : cfg.agents?.defaults?.blockStreamingDefault === "on"; + const canStreamDraft = discordStreamMode !== "off" && !accountBlockStreamingEnabled; + const draftReplyToMessageId = () => replyReference.use(); + const deliverChannelId = deliverTarget.startsWith("channel:") + ? deliverTarget.slice("channel:".length) + : messageChannelId; + const draftStream = canStreamDraft + ? createDiscordDraftStream({ + rest: client.rest, + channelId: deliverChannelId, + maxChars: draftMaxChars, + replyToMessageId: draftReplyToMessageId, + minInitialChars: 30, + throttleMs: 1200, + log: logVerbose, + warn: logVerbose, + }) + : undefined; + const draftChunking = + draftStream && discordStreamMode === "block" + ? resolveDiscordDraftStreamingChunking(cfg, accountId) + : undefined; + const shouldSplitPreviewMessages = discordStreamMode === "block"; + const draftChunker = draftChunking ? new EmbeddedBlockChunker(draftChunking) : undefined; + let lastPartialText = ""; + let draftText = ""; + let hasStreamedMessage = false; + let finalizedViaPreviewMessage = false; + + const resolvePreviewFinalText = (text?: string) => { + if (typeof text !== "string") { + return undefined; + } + const formatted = convertMarkdownTables(text, tableMode); + const chunks = chunkDiscordTextWithMode(formatted, { + maxChars: draftMaxChars, + maxLines: discordConfig?.maxLinesPerMessage, + chunkMode, + }); + if (!chunks.length && formatted) { + chunks.push(formatted); + } + if (chunks.length !== 1) { + return undefined; + } + const trimmed = chunks[0].trim(); + if (!trimmed) { + return undefined; + } + const currentPreviewText = discordStreamMode === "block" ? draftText : lastPartialText; + if ( + currentPreviewText && + currentPreviewText.startsWith(trimmed) && + trimmed.length < currentPreviewText.length + ) { + return undefined; + } + return trimmed; + }; + + const updateDraftFromPartial = (text?: string) => { + if (!draftStream || !text) { + return; + } + if (text === lastPartialText) { + return; + } + hasStreamedMessage = true; + if (discordStreamMode === "partial") { + // Keep the longer preview to avoid visible punctuation flicker. + if ( + lastPartialText && + lastPartialText.startsWith(text) && + text.length < lastPartialText.length + ) { + return; + } + lastPartialText = text; + draftStream.update(text); + return; + } + + let delta = text; + if (text.startsWith(lastPartialText)) { + delta = text.slice(lastPartialText.length); + } else { + // Streaming buffer reset (or non-monotonic stream). Start fresh. + draftChunker?.reset(); + draftText = ""; + } + lastPartialText = text; + if (!delta) { + return; + } + if (!draftChunker) { + draftText = text; + draftStream.update(draftText); + return; + } + draftChunker.append(delta); + draftChunker.drain({ + force: false, + emit: (chunk) => { + draftText += chunk; + draftStream.update(draftText); + }, + }); + }; + + const flushDraft = async () => { + if (!draftStream) { + return; + } + if (draftChunker?.hasBuffered()) { + draftChunker.drain({ + force: true, + emit: (chunk) => { + draftText += chunk; + }, + }); + draftChunker.reset(); + if (draftText) { + draftStream.update(draftText); + } + } + await draftStream.flush(); + }; + + // When draft streaming is active, suppress block streaming to avoid double-streaming. + const disableBlockStreamingForDraft = draftStream ? true : undefined; + const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({ ...prefixOptions, humanDelay: resolveHumanDelayConfig(cfg, route.agentId), - deliver: async (payload: ReplyPayload) => { + deliver: async (payload: ReplyPayload, info) => { + const isFinal = info.kind === "final"; + if (draftStream && isFinal) { + await flushDraft(); + const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; + const finalText = payload.text; + const previewFinalText = resolvePreviewFinalText(finalText); + const previewMessageId = draftStream.messageId(); + + // Try to finalize via preview edit (text-only, fits in 2000 chars, not an error) + const canFinalizeViaPreviewEdit = + !finalizedViaPreviewMessage && + !hasMedia && + typeof previewFinalText === "string" && + typeof previewMessageId === "string" && + !payload.isError; + + if (canFinalizeViaPreviewEdit) { + await draftStream.stop(); + try { + await editMessageDiscord( + deliverChannelId, + previewMessageId, + { content: previewFinalText }, + { rest: client.rest }, + ); + finalizedViaPreviewMessage = true; + replyReference.markSent(); + return; + } catch (err) { + logVerbose( + `discord: preview final edit failed; falling back to standard send (${String(err)})`, + ); + } + } + + // Check if stop() flushed a message we can edit + if (!finalizedViaPreviewMessage) { + await draftStream.stop(); + const messageIdAfterStop = draftStream.messageId(); + if ( + typeof messageIdAfterStop === "string" && + typeof previewFinalText === "string" && + !hasMedia && + !payload.isError + ) { + try { + await editMessageDiscord( + deliverChannelId, + messageIdAfterStop, + { content: previewFinalText }, + { rest: client.rest }, + ); + finalizedViaPreviewMessage = true; + replyReference.markSent(); + return; + } catch (err) { + logVerbose( + `discord: post-stop preview edit failed; falling back to standard send (${String(err)})`, + ); + } + } + } + + // Clear the preview and fall through to standard delivery + if (!finalizedViaPreviewMessage) { + await draftStream.clear(); + } + } + const replyToId = replyReference.use(); await deliverDiscordReply({ replies: [payload], @@ -623,7 +836,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) textLimit, maxLinesPerMessage: discordConfig?.maxLinesPerMessage, tableMode, - chunkMode: resolveChunkMode(cfg, "discord", accountId), + chunkMode, }); replyReference.markSent(); }, @@ -647,9 +860,33 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) ...replyOptions, skillFilter: channelConfig?.skills, disableBlockStreaming: - typeof discordConfig?.blockStreaming === "boolean" + disableBlockStreamingForDraft ?? + (typeof discordConfig?.blockStreaming === "boolean" ? !discordConfig.blockStreaming - : undefined, + : undefined), + onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined, + onAssistantMessageStart: draftStream + ? () => { + if (shouldSplitPreviewMessages && hasStreamedMessage) { + logVerbose("discord: calling forceNewMessage() for draft stream"); + draftStream.forceNewMessage(); + } + lastPartialText = ""; + draftText = ""; + draftChunker?.reset(); + } + : undefined, + onReasoningEnd: draftStream + ? () => { + if (shouldSplitPreviewMessages && hasStreamedMessage) { + logVerbose("discord: calling forceNewMessage() for draft stream"); + draftStream.forceNewMessage(); + } + lastPartialText = ""; + draftText = ""; + draftChunker?.reset(); + } + : undefined, onModelSelected, onReasoningStream: async () => { await statusReactions.setThinking(); @@ -663,6 +900,11 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) dispatchError = true; throw err; } finally { + // Must stop() first to flush debounced content before clear() wipes state + await draftStream?.stop(); + if (!finalizedViaPreviewMessage) { + await draftStream?.clear(); + } markDispatchIdle(); if (statusReactionsEnabled) { if (dispatchError) {