fix(slack): preserve buffered thread stream replies

This commit is contained in:
Vincent Koc
2026-05-06 03:33:17 -07:00
parent 99b17263a1
commit 552ebcc7f0
5 changed files with 86 additions and 14 deletions

View File

@@ -119,6 +119,7 @@ Docs: https://docs.openclaw.ai
- Web fetch: bound guarded dispatcher cleanup after request timeouts so timed-out fetches return tool errors instead of leaving Gateway tool lanes active. (#78439) Thanks @obviyus.
- Mattermost/setup: prompt for and persist the server base URL after the bot token in `openclaw setup --wizard`, instead of failing validation before `--http-url` is collected. Fixes #76670. Thanks @jacobtomlinson.
- Gate Slack startup user allowlist resolution [AI]. (#77898) Thanks @pgondhi987.
- Slack/streaming: fall back to normal threaded delivery when native stream finalization rejects a locally buffered reply, so generated Slack thread replies no longer disappear before posting. Fixes #78061. Thanks @KennanHoa.
- OpenAI/Codex: suppress stale `openai-codex` GPT-5.1/5.2/5.3 model refs that ChatGPT/Codex OAuth accounts now reject, keeping model lists, config validation, and forward-compat resolution on current 5.4/5.5 routes. Fixes #67158. Thanks @drpau.
- CLI/update: keep pnpm package updates on the running custom global install root and pass pnpm's `--global-dir` so `openclaw update` does not create a second default-prefix install when `OPENCLAW_HOME` or the shell points at a custom OpenClaw directory. Fixes #78377. Thanks @amknight.
- Google Meet/Voice Call: wait longer before playing PIN-derived Twilio DTMF for Meet dial-in prompts and retire stale delegated phone sessions instead of reusing completed calls.

View File

@@ -1091,6 +1091,36 @@ describe("dispatchPreparedSlackMessage preview fallback", () => {
expect(session.stopped).toBe(true);
});
it("routes pending native stream text through chunked sender for unexpected finalize failures", async () => {
mockedNativeStreaming = true;
const session = {
channel: "C123",
threadTs: THREAD_TS,
stopped: false,
delivered: false,
pendingText: FINAL_REPLY_TEXT,
};
startSlackStreamMock.mockResolvedValueOnce(session);
stopSlackStreamMock.mockRejectedValueOnce(
new TestSlackStreamNotDeliveredError(
FINAL_REPLY_TEXT,
"method_not_supported_for_channel_type",
),
);
await dispatchPreparedSlackMessage(createPreparedSlackMessage());
expect(postMessageMock).not.toHaveBeenCalled();
expect(deliverRepliesMock).toHaveBeenCalledTimes(1);
expect(deliverRepliesMock).toHaveBeenCalledWith(
expect.objectContaining({
replyThreadTs: THREAD_TS,
replies: [expect.objectContaining({ text: FINAL_REPLY_TEXT })],
}),
);
expect(session.stopped).toBe(true);
});
it("routes all pending native stream text through chunked sender when an append flush fails", async () => {
mockedNativeStreaming = true;
mockedDispatchSequence = [

View File

@@ -1235,8 +1235,14 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
} catch (err) {
if (err instanceof SlackStreamNotDeliveredError) {
streamFallbackDelivered = await deliverPendingStreamFallback(finalStream, err);
if (!streamFallbackDelivered && !finalStream.delivered) {
dispatchError ??= err;
}
} else {
runtime.error?.(danger(`slack-stream: failed to stop stream: ${formatErrorMessage(err)}`));
if (!finalStream.delivered) {
dispatchError ??= err;
}
}
}
}

View File

@@ -83,6 +83,40 @@ describe("stopSlackStream finalize error handling", () => {
expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe("hello world");
});
it("throws SlackStreamNotDeliveredError for unexpected finalize codes while text is buffered", async () => {
const session = makeSession({
appendImpl: async () => null,
stopImpl: async () => {
throw slackApiError("method_not_supported_for_channel_type");
},
});
await appendSlackStream({ session, text: "short thread reply" });
const thrown = await stopSlackStream({ session }).catch((err: unknown) => err);
expect(thrown).toBeInstanceOf(SlackStreamNotDeliveredError);
expect((thrown as SlackStreamNotDeliveredError).slackCode).toBe(
"method_not_supported_for_channel_type",
);
expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe("short thread reply");
});
it("throws SlackStreamNotDeliveredError for non-Slack stop errors while text is buffered", async () => {
const session = makeSession({
appendImpl: async () => null,
stopImpl: async () => {
throw new Error("socket reset");
},
});
await appendSlackStream({ session, text: "locally buffered reply" });
const thrown = await stopSlackStream({ session }).catch((err: unknown) => err);
expect(thrown).toBeInstanceOf(SlackStreamNotDeliveredError);
expect((thrown as SlackStreamNotDeliveredError).slackCode).toBe("unknown");
expect((thrown as SlackStreamNotDeliveredError).pendingText).toBe("locally buffered reply");
});
it("clears pendingText after an append flush is acknowledged by Slack", async () => {
const session = makeSession({
appendImpl: async () => ({ ts: "1700000000.100203" }),

View File

@@ -197,15 +197,15 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis
* After calling this the stream message becomes a normal Slack message.
* Optionally include final text to append before stopping.
*
* If Slack's `chat.stopStream` responds with a known benign finalize error
* (see {@link BENIGN_SLACK_FINALIZE_ERROR_CODES}) AND any prior `append`
* has already landed on Slack, the error is swallowed and the session is
* marked stopped - the already-delivered text stays visible.
* If Slack's `chat.stopStream` responds with an error while text is still
* buffered locally, this function throws a {@link SlackStreamNotDeliveredError}
* carrying that pending text so the caller can deliver it through the normal
* Slack reply path.
*
* If the same benign error fires while text is still only buffered locally
* (e.g. short replies that never exceeded the SDK's buffer_size), this
* function throws a {@link SlackStreamNotDeliveredError} carrying that pending
* text so the caller can deliver it through the normal Slack reply path.
* If Slack responds with a known benign finalize error (see
* {@link BENIGN_SLACK_FINALIZE_ERROR_CODES}) after prior `append` calls already
* landed, the error is swallowed and the session is marked stopped - the
* already-delivered text stays visible.
*
* All other errors propagate unchanged.
*/
@@ -233,13 +233,14 @@ export async function stopSlackStream(params: StopSlackStreamParams): Promise<vo
session.delivered = true;
session.pendingText = "";
} catch (err) {
const code = extractSlackErrorCode(err) ?? "unknown";
if (session.pendingText) {
// stop() can be the first network call for short replies. If Slack
// rejects that finalize for any reason, the user has not seen the
// SDK-buffered text yet. Let the caller fall back to chat.postMessage.
throw new SlackStreamNotDeliveredError(session.pendingText, code);
}
if (isBenignSlackFinalizeError(err)) {
const code = extractSlackErrorCode(err) ?? "unknown";
if (session.pendingText) {
// stop() can be the first network call for short replies. If Slack
// Connect rejects it, the user has not seen the SDK-buffered text yet.
throw new SlackStreamNotDeliveredError(session.pendingText, code);
}
if (session.delivered) {
logVerbose(
`slack-stream: finalize rejected by Slack (${code}); prior appends delivered, treating stream as stopped`,