From 552ebcc7f0426e36dbcd19398a5700715f942df3 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Wed, 6 May 2026 03:33:17 -0700 Subject: [PATCH] fix(slack): preserve buffered thread stream replies --- CHANGELOG.md | 1 + .../dispatch.preview-fallback.test.ts | 30 ++++++++++++++++ .../src/monitor/message-handler/dispatch.ts | 6 ++++ extensions/slack/src/streaming.test.ts | 34 +++++++++++++++++++ extensions/slack/src/streaming.ts | 29 ++++++++-------- 5 files changed, 86 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 39469d6d722..a945b28efba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -119,6 +119,7 @@ Docs: https://docs.openclaw.ai - Web fetch: bound guarded dispatcher cleanup after request timeouts so timed-out fetches return tool errors instead of leaving Gateway tool lanes active. (#78439) Thanks @obviyus. - Mattermost/setup: prompt for and persist the server base URL after the bot token in `openclaw setup --wizard`, instead of failing validation before `--http-url` is collected. Fixes #76670. Thanks @jacobtomlinson. - Gate Slack startup user allowlist resolution [AI]. (#77898) Thanks @pgondhi987. +- Slack/streaming: fall back to normal threaded delivery when native stream finalization rejects a locally buffered reply, so generated Slack thread replies no longer disappear before posting. Fixes #78061. Thanks @KennanHoa. - OpenAI/Codex: suppress stale `openai-codex` GPT-5.1/5.2/5.3 model refs that ChatGPT/Codex OAuth accounts now reject, keeping model lists, config validation, and forward-compat resolution on current 5.4/5.5 routes. Fixes #67158. Thanks @drpau. - CLI/update: keep pnpm package updates on the running custom global install root and pass pnpm's `--global-dir` so `openclaw update` does not create a second default-prefix install when `OPENCLAW_HOME` or the shell points at a custom OpenClaw directory. Fixes #78377. Thanks @amknight. - Google Meet/Voice Call: wait longer before playing PIN-derived Twilio DTMF for Meet dial-in prompts and retire stale delegated phone sessions instead of reusing completed calls. diff --git a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts index d4a2ff8e5f2..68ad40c35a1 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts @@ -1091,6 +1091,36 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { expect(session.stopped).toBe(true); }); + it("routes pending native stream text through chunked sender for unexpected finalize failures", async () => { + mockedNativeStreaming = true; + const session = { + channel: "C123", + threadTs: THREAD_TS, + stopped: false, + delivered: false, + pendingText: FINAL_REPLY_TEXT, + }; + startSlackStreamMock.mockResolvedValueOnce(session); + stopSlackStreamMock.mockRejectedValueOnce( + new TestSlackStreamNotDeliveredError( + FINAL_REPLY_TEXT, + "method_not_supported_for_channel_type", + ), + ); + + await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + + expect(postMessageMock).not.toHaveBeenCalled(); + expect(deliverRepliesMock).toHaveBeenCalledTimes(1); + expect(deliverRepliesMock).toHaveBeenCalledWith( + expect.objectContaining({ + replyThreadTs: THREAD_TS, + replies: [expect.objectContaining({ text: FINAL_REPLY_TEXT })], + }), + ); + expect(session.stopped).toBe(true); + }); + it("routes all pending native stream text through chunked sender when an append flush fails", async () => { mockedNativeStreaming = true; mockedDispatchSequence = [ diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index 9f6c757659e..283313f50fd 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -1235,8 +1235,14 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag } catch (err) { if (err instanceof SlackStreamNotDeliveredError) { streamFallbackDelivered = await deliverPendingStreamFallback(finalStream, err); + if (!streamFallbackDelivered && !finalStream.delivered) { + dispatchError ??= err; + } } else { runtime.error?.(danger(`slack-stream: failed to stop stream: ${formatErrorMessage(err)}`)); + if (!finalStream.delivered) { + dispatchError ??= err; + } } } } diff --git a/extensions/slack/src/streaming.test.ts b/extensions/slack/src/streaming.test.ts index bc68a70847c..b9b8e68c72f 100644 --- a/extensions/slack/src/streaming.test.ts +++ b/extensions/slack/src/streaming.test.ts @@ -83,6 +83,40 @@ describe("stopSlackStream finalize error handling", () => { expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe("hello world"); }); + it("throws SlackStreamNotDeliveredError for unexpected finalize codes while text is buffered", async () => { + const session = makeSession({ + appendImpl: async () => null, + stopImpl: async () => { + throw slackApiError("method_not_supported_for_channel_type"); + }, + }); + await appendSlackStream({ session, text: "short thread reply" }); + + const thrown = await stopSlackStream({ session }).catch((err: unknown) => err); + + expect(thrown).toBeInstanceOf(SlackStreamNotDeliveredError); + expect((thrown as SlackStreamNotDeliveredError).slackCode).toBe( + "method_not_supported_for_channel_type", + ); + expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe("short thread reply"); + }); + + it("throws SlackStreamNotDeliveredError for non-Slack stop errors while text is buffered", async () => { + const session = makeSession({ + appendImpl: async () => null, + stopImpl: async () => { + throw new Error("socket reset"); + }, + }); + await appendSlackStream({ session, text: "locally buffered reply" }); + + const thrown = await stopSlackStream({ session }).catch((err: unknown) => err); + + expect(thrown).toBeInstanceOf(SlackStreamNotDeliveredError); + expect((thrown as SlackStreamNotDeliveredError).slackCode).toBe("unknown"); + expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe("locally buffered reply"); + }); + it("clears pendingText after an append flush is acknowledged by Slack", async () => { const session = makeSession({ appendImpl: async () => ({ ts: "1700000000.100203" }), diff --git a/extensions/slack/src/streaming.ts b/extensions/slack/src/streaming.ts index fe5316486cc..66a9ebe84c5 100644 --- a/extensions/slack/src/streaming.ts +++ b/extensions/slack/src/streaming.ts @@ -197,15 +197,15 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis * After calling this the stream message becomes a normal Slack message. * Optionally include final text to append before stopping. * - * If Slack's `chat.stopStream` responds with a known benign finalize error - * (see {@link BENIGN_SLACK_FINALIZE_ERROR_CODES}) AND any prior `append` - * has already landed on Slack, the error is swallowed and the session is - * marked stopped - the already-delivered text stays visible. + * If Slack's `chat.stopStream` responds with an error while text is still + * buffered locally, this function throws a {@link SlackStreamNotDeliveredError} + * carrying that pending text so the caller can deliver it through the normal + * Slack reply path. * - * If the same benign error fires while text is still only buffered locally - * (e.g. short replies that never exceeded the SDK's buffer_size), this - * function throws a {@link SlackStreamNotDeliveredError} carrying that pending - * text so the caller can deliver it through the normal Slack reply path. + * If Slack responds with a known benign finalize error (see + * {@link BENIGN_SLACK_FINALIZE_ERROR_CODES}) after prior `append` calls already + * landed, the error is swallowed and the session is marked stopped - the + * already-delivered text stays visible. * * All other errors propagate unchanged. */ @@ -233,13 +233,14 @@ export async function stopSlackStream(params: StopSlackStreamParams): Promise