diff --git a/CHANGELOG.md b/CHANGELOG.md index bd7d4f50ed5..274ca2e9975 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -154,6 +154,7 @@ Docs: https://docs.openclaw.ai - Feishu/Inbound ordering: serialize message handling per chat while preserving cross-chat concurrency to avoid same-chat race drops under bursty inbound traffic. (#31807) - Feishu/Typing notification suppression: skip typing keepalive reaction re-adds when the indicator is already active, preventing duplicate notification pings from repeated identical emoji adds. (#31580) - Feishu/Probe failure backoff: cache API and timeout probe failures for one minute per account key while preserving abort-aware probe timeouts, reducing repeated health-check retries during transient credential/network outages. (#29970) +- Feishu/Streaming block fallback: preserve markdown block stream text as final streaming-card content when final payload text is missing, while still suppressing non-card internal block chunk delivery. (#30663) - BlueBubbles/Message metadata: harden send response ID extraction, include sender identity in DM context, and normalize inbound `message_id` selection to avoid duplicate ID metadata. (#23970) Thanks @tyler6204. - WebChat/markdown tables: ensure GitHub-flavored markdown table parsing is explicitly enabled at render time and add horizontal overflow handling for wide tables, with regression coverage for table-only and mixed text+table content. (#32365) Thanks @BlueBirdBack. - Feishu/default account resolution: always honor explicit `channels.feishu.defaultAccount` during outbound account selection (including top-level-credential setups where the preferred id is not present in `accounts`), instead of silently falling back to another account id. (#32253) Thanks @bmendonca3. diff --git a/extensions/feishu/src/reply-dispatcher.test.ts b/extensions/feishu/src/reply-dispatcher.test.ts index 4a46a2ee3b6..ace7b2cc2db 100644 --- a/extensions/feishu/src/reply-dispatcher.test.ts +++ b/extensions/feishu/src/reply-dispatcher.test.ts @@ -226,6 +226,24 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled(); }); + it("closes streaming with block text when final reply is missing", async () => { + createFeishuReplyDispatcher({ + cfg: {} as never, + agentId: "agent", + runtime: { log: vi.fn(), error: vi.fn() } as never, + chatId: "oc_chat", + }); + + const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0]; + await options.deliver({ text: "```md\npartial answer\n```" }, { kind: "block" }); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(1); + expect(streamingInstances[0].start).toHaveBeenCalledTimes(1); + expect(streamingInstances[0].close).toHaveBeenCalledTimes(1); + expect(streamingInstances[0].close).toHaveBeenCalledWith("```md\npartial answer\n```"); + }); + it("sends media-only payloads as attachments", async () => { createFeishuReplyDispatcher({ cfg: {} as never, diff --git a/extensions/feishu/src/reply-dispatcher.ts b/extensions/feishu/src/reply-dispatcher.ts index a98ae3094a1..88c31c66260 100644 --- a/extensions/feishu/src/reply-dispatcher.ts +++ b/extensions/feishu/src/reply-dispatcher.ts @@ -146,6 +146,48 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP let partialUpdateQueue: Promise = Promise.resolve(); let streamingStartPromise: Promise | null = null; + const mergeStreamingText = (nextText: string) => { + if (!streamText) { + streamText = nextText; + return; + } + if (nextText.startsWith(streamText)) { + // Handle cumulative partial payloads where nextText already includes prior text. + streamText = nextText; + return; + } + if (streamText.endsWith(nextText)) { + return; + } + streamText += nextText; + }; + + const queueStreamingUpdate = ( + nextText: string, + options?: { + dedupeWithLastPartial?: boolean; + }, + ) => { + if (!nextText) { + return; + } + if (options?.dedupeWithLastPartial && nextText === lastPartial) { + return; + } + if (options?.dedupeWithLastPartial) { + lastPartial = nextText; + } + mergeStreamingText(nextText); + partialUpdateQueue = partialUpdateQueue.then(async () => { + if (streamingStartPromise) { + await streamingStartPromise; + } + if (streaming?.isActive()) { + await streaming.update(streamText); + } + }); + }; + const startStreaming = () => { if (!streamingEnabled || streamingStartPromise || streaming) { return; @@ -205,12 +247,6 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP void typingCallbacks.onReplyStart?.(); }, deliver: async (payload: ReplyPayload, info) => { - // FIX: Filter out internal 'block' reasoning chunks immediately to prevent - // data leak and race conditions with streaming state initialization. - if (info?.kind === "block") { - return; - } - const text = payload.text ?? ""; const mediaList = payload.mediaUrls && payload.mediaUrls.length > 0 @@ -228,6 +264,18 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP if (hasText) { const useCard = renderMode === "card" || (renderMode === "auto" && shouldUseCard(text)); + if (info?.kind === "block") { + // Drop internal block chunks unless we can safely consume them as + // streaming-card fallback content. + if (!(streamingEnabled && useCard)) { + return; + } + startStreaming(); + if (streamingStartPromise) { + await streamingStartPromise; + } + } + if (info?.kind === "final" && streamingEnabled && useCard) { startStreaming(); if (streamingStartPromise) { @@ -236,6 +284,11 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP } if (streaming?.isActive()) { + if (info?.kind === "block") { + // Some runtimes emit block payloads without onPartial/final callbacks. + // Mirror block text into streamText so onIdle close still sends content. + queueStreamingUpdate(text); + } if (info?.kind === "final") { streamText = text; await closeStreaming(); @@ -331,19 +384,10 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP onModelSelected: prefixContext.onModelSelected, onPartialReply: streamingEnabled ? (payload: ReplyPayload) => { - if (!payload.text || payload.text === lastPartial) { + if (!payload.text) { return; } - lastPartial = payload.text; - streamText = payload.text; - partialUpdateQueue = partialUpdateQueue.then(async () => { - if (streamingStartPromise) { - await streamingStartPromise; - } - if (streaming?.isActive()) { - await streaming.update(streamText); - } - }); + queueStreamingUpdate(payload.text, { dedupeWithLastPartial: true }); } : undefined, },