From e0201c2774b8465b3bde85a097196a7241b51ab1 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 25 Feb 2026 02:03:15 +0000 Subject: [PATCH] fix: keep channel typing active during long inference (#25886, thanks @stakeswky) Co-authored-by: stakeswky --- CHANGELOG.md | 1 + .../matrix/src/matrix/monitor/handler.ts | 1 + .../mattermost/src/mattermost/monitor.ts | 2 + extensions/msteams/src/reply-dispatcher.ts | 2 + src/channels/channel-helpers.test.ts | 44 ++++++++++++++++++ src/channels/typing.ts | 46 +++++++++++++++++-- .../monitor/message-handler.process.ts | 2 + src/signal/monitor/event-handler.ts | 2 + src/slack/monitor/message-handler/dispatch.ts | 1 + src/telegram/bot-message-dispatch.ts | 26 ++++++----- 10 files changed, 111 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e93bc01767..7601b9997b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ Docs: https://docs.openclaw.ai - Cron/Heartbeat delivery: stop inheriting cached session `lastThreadId` for heartbeat-mode target resolution unless a thread/topic is explicitly requested, so announce-mode cron and heartbeat deliveries stay on top-level destinations instead of leaking into active conversation threads. (#25730) Thanks @markshields-tl. - Heartbeat defaults/prompts: switch the implicit heartbeat delivery target from `last` to `none` (opt-in for external delivery), and use internal-only cron/exec heartbeat prompt wording when delivery is disabled so background checks do not nudge user-facing relay behavior. (#25871, #24638, #25851) - Auto-reply/Heartbeat queueing: drop heartbeat runs when a session already has an active run instead of enqueueing a stale followup, preventing duplicate heartbeat response branches after queue drain. (#25610, #25606) Thanks @mcaxtr. +- Channels/Typing keepalive: refresh channel typing callbacks on a keepalive interval during long replies and clear keepalive timers on idle/cleanup across core + extension dispatcher callsites so typing indicators do not expire mid-inference. (#25886, #25882) Thanks @stakeswky. - Security/Sandbox media: restrict sandbox media tmp-path allowances to OpenClaw-managed tmp roots instead of broad host `os.tmpdir()` trust, and add outbound/channel guardrails (tmp-path lint + media-root smoke tests) to prevent regressions in local media attachment reads. Thanks @tdjackey for reporting. - Security/Sandbox media: reject hard-linked OpenClaw tmp media aliases (including symlink-to-hardlink chains) during sandbox media path resolution to prevent out-of-sandbox inode alias reads. (#25820) Thanks @bmendonca3. - Config/Plugins: treat stale removed `google-antigravity-auth` plugin references as compatibility warnings (not hard validation errors) across `plugins.entries`, `plugins.allow`, `plugins.deny`, and `plugins.slots.memory`, so startup no longer fails after antigravity removal. (#25538, #25862) Thanks @chilu18. diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index c1df46fec2a..f62ef60ce3b 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -654,6 +654,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }, onReplyStart: typingCallbacks.onReplyStart, onIdle: typingCallbacks.onIdle, + onCleanup: typingCallbacks.onCleanup, }); const { queuedFinal, counts } = await core.channel.reply.dispatchReplyFromConfig({ diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index fe799a295c9..233aee17d1b 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -805,6 +805,8 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} runtime.error?.(`mattermost ${info.kind} reply failed: ${String(err)}`); }, onReplyStart: typingCallbacks.onReplyStart, + onIdle: typingCallbacks.onIdle, + onCleanup: typingCallbacks.onCleanup, }); await core.channel.reply.dispatchReplyFromConfig({ diff --git a/extensions/msteams/src/reply-dispatcher.ts b/extensions/msteams/src/reply-dispatcher.ts index 55389f2f696..755ae3e56e1 100644 --- a/extensions/msteams/src/reply-dispatcher.ts +++ b/extensions/msteams/src/reply-dispatcher.ts @@ -122,6 +122,8 @@ export function createMSTeamsReplyDispatcher(params: { }); }, onReplyStart: typingCallbacks.onReplyStart, + onIdle: typingCallbacks.onIdle, + onCleanup: typingCallbacks.onCleanup, }); return { diff --git a/src/channels/channel-helpers.test.ts b/src/channels/channel-helpers.test.ts index b6d3ff4fbd8..34bd5370526 100644 --- a/src/channels/channel-helpers.test.ts +++ b/src/channels/channel-helpers.test.ts @@ -190,4 +190,48 @@ describe("createTypingCallbacks", () => { expect(stop).toHaveBeenCalledTimes(1); expect(onStopError).toHaveBeenCalledTimes(1); }); + + it("sends typing keepalive pings until idle cleanup", async () => { + vi.useFakeTimers(); + try { + const start = vi.fn().mockResolvedValue(undefined); + const stop = vi.fn().mockResolvedValue(undefined); + const onStartError = vi.fn(); + const callbacks = createTypingCallbacks({ start, stop, onStartError }); + + await callbacks.onReplyStart(); + expect(start).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(2_999); + expect(start).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(1); + expect(start).toHaveBeenCalledTimes(2); + + await vi.advanceTimersByTimeAsync(3_000); + expect(start).toHaveBeenCalledTimes(3); + + callbacks.onIdle?.(); + await flushMicrotasks(); + expect(stop).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(9_000); + expect(start).toHaveBeenCalledTimes(3); + } finally { + vi.useRealTimers(); + } + }); + + it("deduplicates stop across idle and cleanup", async () => { + const start = vi.fn().mockResolvedValue(undefined); + const stop = vi.fn().mockResolvedValue(undefined); + const onStartError = vi.fn(); + const callbacks = createTypingCallbacks({ start, stop, onStartError }); + + callbacks.onIdle?.(); + callbacks.onCleanup?.(); + await flushMicrotasks(); + + expect(stop).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/channels/typing.ts b/src/channels/typing.ts index 6ab2a975361..f6d60d498d1 100644 --- a/src/channels/typing.ts +++ b/src/channels/typing.ts @@ -10,9 +10,15 @@ export function createTypingCallbacks(params: { stop?: () => Promise; onStartError: (err: unknown) => void; onStopError?: (err: unknown) => void; + keepaliveIntervalMs?: number; }): TypingCallbacks { const stop = params.stop; - const onReplyStart = async () => { + const keepaliveIntervalMs = params.keepaliveIntervalMs ?? 3_000; + let keepaliveTimer: ReturnType | undefined; + let keepaliveStartInFlight = false; + let stopSent = false; + + const fireStart = async () => { try { await params.start(); } catch (err) { @@ -20,11 +26,41 @@ export function createTypingCallbacks(params: { } }; - const fireStop = stop - ? () => { - void stop().catch((err) => (params.onStopError ?? params.onStartError)(err)); + const clearKeepalive = () => { + if (!keepaliveTimer) { + return; + } + clearInterval(keepaliveTimer); + keepaliveTimer = undefined; + keepaliveStartInFlight = false; + }; + + const onReplyStart = async () => { + stopSent = false; + clearKeepalive(); + await fireStart(); + if (keepaliveIntervalMs <= 0) { + return; + } + keepaliveTimer = setInterval(() => { + if (keepaliveStartInFlight) { + return; } - : undefined; + keepaliveStartInFlight = true; + void fireStart().finally(() => { + keepaliveStartInFlight = false; + }); + }, keepaliveIntervalMs); + }; + + const fireStop = () => { + clearKeepalive(); + if (!stop || stopSent) { + return; + } + stopSent = true; + void stop().catch((err) => (params.onStopError ?? params.onStartError)(err)); + }; return { onReplyStart, onIdle: fireStop, onCleanup: fireStop }; } diff --git a/src/discord/monitor/message-handler.process.ts b/src/discord/monitor/message-handler.process.ts index 59b0ceaf649..1d84cd01410 100644 --- a/src/discord/monitor/message-handler.process.ts +++ b/src/discord/monitor/message-handler.process.ts @@ -669,6 +669,8 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) await typingCallbacks.onReplyStart(); await statusReactions.setThinking(); }, + onIdle: typingCallbacks.onIdle, + onCleanup: typingCallbacks.onCleanup, }); let dispatchResult: Awaited> | null = null; diff --git a/src/signal/monitor/event-handler.ts b/src/signal/monitor/event-handler.ts index 8454de9d525..4133930389a 100644 --- a/src/signal/monitor/event-handler.ts +++ b/src/signal/monitor/event-handler.ts @@ -238,6 +238,8 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) { deps.runtime.error?.(danger(`signal ${info.kind} reply failed: ${String(err)}`)); }, onReplyStart: typingCallbacks.onReplyStart, + onIdle: typingCallbacks.onIdle, + onCleanup: typingCallbacks.onCleanup, }); const { queuedFinal } = await dispatchInboundMessage({ diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index d726f804c10..f6d4b531f61 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -306,6 +306,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag }, onReplyStart: typingCallbacks.onReplyStart, onIdle: typingCallbacks.onIdle, + onCleanup: typingCallbacks.onCleanup, }); const draftStream = createSlackDraftStream({ diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 7dd0c48450a..89cb59038db 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -418,6 +418,18 @@ export const dispatchTelegramMessage = async ({ void statusReactionController.setThinking(); } + const typingCallbacks = createTypingCallbacks({ + start: sendTyping, + onStartError: (err) => { + logTypingFailure({ + log: logVerbose, + channel: "telegram", + target: String(chatId), + error: err, + }); + }, + }); + try { ({ queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, @@ -528,17 +540,9 @@ export const dispatchTelegramMessage = async ({ deliveryState.markNonSilentFailure(); runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`)); }, - onReplyStart: createTypingCallbacks({ - start: sendTyping, - onStartError: (err) => { - logTypingFailure({ - log: logVerbose, - channel: "telegram", - target: String(chatId), - error: err, - }); - }, - }).onReplyStart, + onReplyStart: typingCallbacks.onReplyStart, + onIdle: typingCallbacks.onIdle, + onCleanup: typingCallbacks.onCleanup, }, replyOptions: { skillFilter,