fix(discord): notify user on discord when inbound worker times out (#53823)

* fix(discord): notify user on discord when inbound worker times out.

* fix(discord): notify user on discord when inbound worker times out.

* Discord: await timeout fallback reply

* Discord: add changelog for timeout reply fix (#53823) (thanks @Kimbo7870)

---------

Co-authored-by: VioGarden <111024100+VioGarden@users.noreply.github.com>
Co-authored-by: Onur Solmaz <2453968+osolmaz@users.noreply.github.com>
This commit is contained in:
Bob
2026-03-24 19:01:12 +01:00
committed by GitHub
parent 03ed0bccf1
commit 7fab2c2897
6 changed files with 374 additions and 10 deletions

View File

@@ -33,6 +33,7 @@ Docs: https://docs.openclaw.ai
- Discord/config types: add missing `autoArchiveDuration` to `DiscordGuildChannelConfig` so TypeScript config definitions match the existing schema and runtime support. (#43427) Thanks @davidguttman.
- Feishu/startup: treat unresolved `SecretRef` app credentials as not configured during account resolution so CLI startup and read-only Feishu config surfaces stop crashing before runtime-backed secret resolution is available. (#53675) Thanks @hpt.
- WhatsApp/groups: track recent gateway-sent message IDs and suppress only matching group echoes, preserving owner `/status`, `/new`, and `/activation` commands from linked-account `fromMe` traffic. (#53624) Thanks @w-sss.
- Discord/timeouts: send a visible timeout reply when the inbound Discord worker times out before a final reply starts, including created auto-thread targets and queued-run ordering. (#53823) Thanks @Kimbo7870.
## 2026.3.23

View File

@@ -5,7 +5,9 @@ import { danger } from "openclaw/plugin-sdk/runtime-env";
import { materializeDiscordInboundJob, type DiscordInboundJob } from "./inbound-job.js";
import type { RuntimeEnv } from "./message-handler.preflight.types.js";
import { processDiscordMessage } from "./message-handler.process.js";
import { deliverDiscordReply } from "./reply-delivery.js";
import type { DiscordMonitorStatusSink } from "./status.js";
import { resolveDiscordReplyDeliveryPlan } from "./threading.js";
import { normalizeDiscordInboundWorkerTimeoutMs, runDiscordTaskWithTimeout } from "./timeouts.js";
type DiscordInboundWorkerParams = {
@@ -41,13 +43,27 @@ async function processDiscordInboundJob(params: {
}) {
const timeoutMs = normalizeDiscordInboundWorkerTimeoutMs(params.runTimeoutMs);
const contextSuffix = formatDiscordRunContextSuffix(params.job);
let finalReplyStarted = false;
let createdThreadId: string | undefined;
let sessionKey: string | undefined;
await runDiscordTaskWithTimeout({
run: async (abortSignal) => {
await processDiscordMessage(materializeDiscordInboundJob(params.job, abortSignal));
await processDiscordMessage(materializeDiscordInboundJob(params.job, abortSignal), {
onFinalReplyStart: () => {
finalReplyStarted = true;
},
onFinalReplyDelivered: () => {
finalReplyStarted = true;
},
onReplyPlanResolved: (resolved) => {
createdThreadId = resolved.createdThreadId?.trim() || undefined;
sessionKey = resolved.sessionKey?.trim() || undefined;
},
});
},
timeoutMs,
abortSignals: [params.job.runtime.abortSignal, params.lifecycleSignal],
onTimeout: (resolvedTimeoutMs) => {
onTimeout: async (resolvedTimeoutMs) => {
params.runtime.error?.(
danger(
`discord inbound worker timed out after ${formatDurationSeconds(resolvedTimeoutMs, {
@@ -56,6 +72,16 @@ async function processDiscordInboundJob(params: {
})}${contextSuffix}`,
),
);
if (finalReplyStarted) {
return;
}
await sendDiscordInboundWorkerTimeoutReply({
job: params.job,
runtime: params.runtime,
contextSuffix,
createdThreadId,
sessionKey,
});
},
onErrorAfterTimeout: (error) => {
params.runtime.error?.(
@@ -65,6 +91,60 @@ async function processDiscordInboundJob(params: {
});
}
async function sendDiscordInboundWorkerTimeoutReply(params: {
job: DiscordInboundJob;
runtime: RuntimeEnv;
contextSuffix: string;
createdThreadId?: string;
sessionKey?: string;
}) {
const messageChannelId = params.job.payload.messageChannelId?.trim();
const messageId = params.job.payload.message?.id?.trim();
const token = params.job.payload.token?.trim();
if (!messageChannelId || !messageId || !token) {
params.runtime.error?.(
danger(
`discord inbound worker timeout reply skipped: missing reply target${params.contextSuffix}`,
),
);
return;
}
const deliveryPlan = resolveDiscordReplyDeliveryPlan({
replyTarget: `channel:${params.job.payload.threadChannel?.id ?? messageChannelId}`,
replyToMode: params.job.payload.replyToMode,
messageId,
threadChannel: params.job.payload.threadChannel,
createdThreadId: params.createdThreadId,
});
try {
await deliverDiscordReply({
cfg: params.job.payload.cfg,
replies: [{ text: "Discord inbound worker timed out.", isError: true }],
target: deliveryPlan.deliverTarget,
token,
accountId: params.job.payload.accountId,
runtime: params.runtime,
textLimit: params.job.payload.textLimit,
maxLinesPerMessage: params.job.payload.discordConfig?.maxLinesPerMessage,
replyToId: deliveryPlan.replyReference.use(),
replyToMode: params.job.payload.replyToMode,
sessionKey:
params.sessionKey ??
params.job.payload.route.sessionKey ??
params.job.payload.baseSessionKey,
threadBindings: params.job.runtime.threadBindings,
});
} catch (error) {
params.runtime.error?.(
danger(
`discord inbound worker timeout reply failed: ${String(error)}${params.contextSuffix}`,
),
);
}
}
export function createDiscordInboundWorker(
params: DiscordInboundWorkerParams,
): DiscordInboundWorker {

View File

@@ -3,6 +3,7 @@ import { vi } from "vitest";
export const preflightDiscordMessageMock: MockFn = vi.fn();
export const processDiscordMessageMock: MockFn = vi.fn();
export const deliverDiscordReplyMock: MockFn = vi.fn(async () => undefined);
vi.mock("./message-handler.preflight.js", () => ({
preflightDiscordMessage: preflightDiscordMessageMock,
@@ -12,4 +13,8 @@ vi.mock("./message-handler.process.js", () => ({
processDiscordMessage: processDiscordMessageMock,
}));
vi.mock("./reply-delivery.js", () => ({
deliverDiscordReply: deliverDiscordReplyMock,
}));
export const { createDiscordMessageHandler } = await import("./message-handler.js");

View File

@@ -69,7 +69,16 @@ function isProcessAborted(abortSignal?: AbortSignal): boolean {
return Boolean(abortSignal?.aborted);
}
export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) {
type DiscordMessageProcessObserver = {
onFinalReplyStart?: () => void;
onFinalReplyDelivered?: () => void;
onReplyPlanResolved?: (params: { createdThreadId?: string; sessionKey?: string }) => void;
};
export async function processDiscordMessage(
ctx: DiscordMessagePreflightContext,
observer?: DiscordMessageProcessObserver,
) {
const {
cfg,
discordConfig,
@@ -397,6 +406,10 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
OriginatingTo: autoThreadContext?.OriginatingTo ?? replyTarget,
});
const persistedSessionKey = ctxPayload.SessionKey ?? route.sessionKey;
observer?.onReplyPlanResolved?.({
createdThreadId: replyPlan.createdThreadId,
sessionKey: persistedSessionKey,
});
await recordInboundSession({
storePath,
@@ -597,6 +610,14 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
// When draft streaming is active, suppress block streaming to avoid double-streaming.
const disableBlockStreamingForDraft = draftStream ? true : undefined;
let finalReplyStartNotified = false;
const notifyFinalReplyStart = () => {
if (finalReplyStartNotified) {
return;
}
finalReplyStartNotified = true;
observer?.onFinalReplyStart?.();
};
const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } =
createReplyDispatcherWithTyping({
@@ -633,6 +654,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
return;
}
try {
notifyFinalReplyStart();
await editMessageDiscord(
deliverChannelId,
previewMessageId,
@@ -641,6 +663,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
);
finalizedViaPreviewMessage = true;
replyReference.markSent();
observer?.onFinalReplyDelivered?.();
return;
} catch (err) {
logVerbose(
@@ -663,6 +686,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
!payload.isError
) {
try {
notifyFinalReplyStart();
await editMessageDiscord(
deliverChannelId,
messageIdAfterStop,
@@ -671,6 +695,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
);
finalizedViaPreviewMessage = true;
replyReference.markSent();
observer?.onFinalReplyDelivered?.();
return;
} catch (err) {
logVerbose(
@@ -690,6 +715,9 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
}
const replyToId = replyReference.use();
if (isFinal) {
notifyFinalReplyStart();
}
await deliverDiscordReply({
cfg,
replies: [payload],
@@ -709,6 +737,9 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
mediaLocalRoots,
});
replyReference.markSent();
if (isFinal) {
observer?.onFinalReplyDelivered?.();
}
},
onError: (err, info) => {
runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`));

View File

@@ -1,4 +1,4 @@
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { beforeAll, describe, expect, it, vi } from "vitest";
import {
createDiscordHandlerParams,
createDiscordPreflightContext,
@@ -6,10 +6,10 @@ import {
let createDiscordMessageHandler: typeof import("./message-handler.module-test-helpers.js").createDiscordMessageHandler;
let preflightDiscordMessageMock: typeof import("./message-handler.module-test-helpers.js").preflightDiscordMessageMock;
let processDiscordMessageMock: typeof import("./message-handler.module-test-helpers.js").processDiscordMessageMock;
let deliverDiscordReplyMock: typeof import("./message-handler.module-test-helpers.js").deliverDiscordReplyMock;
const eventualReplyDeliveredMock = vi.hoisted(() => vi.fn());
type SetStatusFn = (patch: Record<string, unknown>) => void;
function createDeferred<T = void>() {
let resolve: (value: T | PromiseLike<T>) => void = () => {};
const promise = new Promise<T>((innerResolve) => {
@@ -33,7 +33,18 @@ function createMessageData(messageId: string, channelId = "ch-1") {
}
function createPreflightContext(channelId = "ch-1") {
return createDiscordPreflightContext(channelId);
return {
...createDiscordPreflightContext(channelId),
accountId: "default",
token: "test-token",
textLimit: 2_000,
replyToMode: "off" as const,
discordConfig: {
enabled: true,
token: "test-token",
groupPolicy: "allowlist" as const,
},
};
}
function createHandlerWithDefaultPreflight(overrides?: {
@@ -86,8 +97,12 @@ async function createLifecycleStopScenario(params: {
describe("createDiscordMessageHandler queue behavior", () => {
beforeAll(async () => {
vi.resetModules();
({ createDiscordMessageHandler, preflightDiscordMessageMock, processDiscordMessageMock } =
await import("./message-handler.module-test-helpers.js"));
({
createDiscordMessageHandler,
preflightDiscordMessageMock,
processDiscordMessageMock,
deliverDiscordReplyMock,
} = await import("./message-handler.module-test-helpers.js"));
});
it("resets busy counters when the handler is created", () => {
@@ -181,6 +196,7 @@ describe("createDiscordMessageHandler queue behavior", () => {
try {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
deliverDiscordReplyMock.mockClear();
processDiscordMessageMock
.mockImplementationOnce(async (ctx: { abortSignal?: AbortSignal }) => {
@@ -219,6 +235,237 @@ describe("createDiscordMessageHandler queue behavior", () => {
expect(params.runtime.error).toHaveBeenCalledWith(
expect.stringContaining("discord inbound worker timed out after"),
);
expect(deliverDiscordReplyMock).toHaveBeenCalledTimes(1);
expect(deliverDiscordReplyMock).toHaveBeenCalledWith(
expect.objectContaining({
target: "channel:ch-1",
token: "test-token",
replies: [
expect.objectContaining({
isError: true,
text: "Discord inbound worker timed out.",
}),
],
}),
);
} finally {
vi.useRealTimers();
}
});
it("waits for the timeout fallback reply before starting the next queued run", async () => {
vi.useFakeTimers();
try {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
deliverDiscordReplyMock.mockReset();
const deliverTimeoutReply = createDeferred();
deliverDiscordReplyMock.mockImplementationOnce(async () => {
await deliverTimeoutReply.promise;
});
processDiscordMessageMock
.mockImplementationOnce(async (ctx: { abortSignal?: AbortSignal }) => {
await new Promise<void>((resolve) => {
if (ctx.abortSignal?.aborted) {
resolve();
return;
}
ctx.abortSignal?.addEventListener("abort", () => resolve(), { once: true });
});
})
.mockImplementationOnce(async () => undefined);
preflightDiscordMessageMock.mockImplementation(
async (preflightParams: { data: { channel_id: string } }) =>
createPreflightContext(preflightParams.data.channel_id),
);
const handler = createDiscordMessageHandler(createDiscordHandlerParams({ workerRunTimeoutMs: 50 }));
await expect(
handler(createMessageData("m-1") as never, {} as never),
).resolves.toBeUndefined();
await expect(
handler(createMessageData("m-2") as never, {} as never),
).resolves.toBeUndefined();
await vi.advanceTimersByTimeAsync(60);
await vi.waitFor(() => {
expect(deliverDiscordReplyMock).toHaveBeenCalledTimes(1);
});
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
deliverTimeoutReply.resolve();
await deliverTimeoutReply.promise;
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(2);
});
} finally {
vi.useRealTimers();
}
});
it("does not send the timeout fallback when a final reply already went out", async () => {
vi.useFakeTimers();
try {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
deliverDiscordReplyMock.mockClear();
processDiscordMessageMock.mockImplementationOnce(
async (
ctx: { abortSignal?: AbortSignal },
observer?: { onFinalReplyStart?: () => void; onFinalReplyDelivered?: () => void },
) => {
observer?.onFinalReplyStart?.();
observer?.onFinalReplyDelivered?.();
await new Promise<void>((resolve) => {
if (ctx.abortSignal?.aborted) {
resolve();
return;
}
ctx.abortSignal?.addEventListener("abort", () => resolve(), { once: true });
});
},
);
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
const params = createDiscordHandlerParams({ workerRunTimeoutMs: 50 });
const handler = createDiscordMessageHandler(params);
await expect(
handler(createMessageData("m-1") as never, {} as never),
).resolves.toBeUndefined();
await vi.advanceTimersByTimeAsync(60);
await Promise.resolve();
expect(params.runtime.error).toHaveBeenCalledWith(
expect.stringContaining("discord inbound worker timed out after"),
);
expect(deliverDiscordReplyMock).not.toHaveBeenCalled();
} finally {
vi.useRealTimers();
}
});
it("routes the timeout fallback to the created auto-thread target", async () => {
vi.useFakeTimers();
try {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
deliverDiscordReplyMock.mockClear();
processDiscordMessageMock.mockImplementationOnce(
async (
ctx: { abortSignal?: AbortSignal },
observer?: {
onReplyPlanResolved?: (params: {
createdThreadId?: string;
sessionKey?: string;
}) => void;
},
) => {
observer?.onReplyPlanResolved?.({
createdThreadId: "thread-1",
sessionKey: "agent:main:discord:channel:thread-1",
});
await new Promise<void>((resolve) => {
if (ctx.abortSignal?.aborted) {
resolve();
return;
}
ctx.abortSignal?.addEventListener("abort", () => resolve(), { once: true });
});
},
);
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
const params = createDiscordHandlerParams({ workerRunTimeoutMs: 50 });
const handler = createDiscordMessageHandler(params);
await expect(
handler(createMessageData("m-1") as never, {} as never),
).resolves.toBeUndefined();
await vi.advanceTimersByTimeAsync(60);
await Promise.resolve();
expect(params.runtime.error).toHaveBeenCalledWith(
expect.stringContaining("discord inbound worker timed out after"),
);
expect(deliverDiscordReplyMock).toHaveBeenCalledTimes(1);
expect(deliverDiscordReplyMock).toHaveBeenCalledWith(
expect.objectContaining({
target: "channel:thread-1",
sessionKey: "agent:main:discord:channel:thread-1",
replies: [
expect.objectContaining({
isError: true,
text: "Discord inbound worker timed out.",
}),
],
}),
);
} finally {
vi.useRealTimers();
}
});
it("does not send the timeout fallback when final reply delivery is already in flight", async () => {
vi.useFakeTimers();
try {
preflightDiscordMessageMock.mockReset();
processDiscordMessageMock.mockReset();
deliverDiscordReplyMock.mockClear();
const finishFinalReply = createDeferred();
processDiscordMessageMock.mockImplementationOnce(
async (
_ctx: { abortSignal?: AbortSignal },
observer?: { onFinalReplyStart?: () => void; onFinalReplyDelivered?: () => void },
) => {
observer?.onFinalReplyStart?.();
await finishFinalReply.promise;
observer?.onFinalReplyDelivered?.();
},
);
preflightDiscordMessageMock.mockImplementation(
async (params: { data: { channel_id: string } }) =>
createPreflightContext(params.data.channel_id),
);
const params = createDiscordHandlerParams({ workerRunTimeoutMs: 50 });
const handler = createDiscordMessageHandler(params);
await expect(
handler(createMessageData("m-1") as never, {} as never),
).resolves.toBeUndefined();
await vi.waitFor(() => {
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
});
await vi.advanceTimersByTimeAsync(60);
await Promise.resolve();
expect(params.runtime.error).toHaveBeenCalledWith(
expect.stringContaining("discord inbound worker timed out after"),
);
expect(deliverDiscordReplyMock).not.toHaveBeenCalled();
finishFinalReply.resolve();
await finishFinalReply.promise;
await Promise.resolve();
expect(deliverDiscordReplyMock).not.toHaveBeenCalled();
} finally {
vi.useRealTimers();
}

View File

@@ -69,7 +69,7 @@ export async function runDiscordTaskWithTimeout(params: {
run: (abortSignal: AbortSignal | undefined) => Promise<void>;
timeoutMs?: number;
abortSignals?: Array<AbortSignal | undefined>;
onTimeout: (timeoutMs: number) => void;
onTimeout: (timeoutMs: number) => void | Promise<void>;
onAbortAfterTimeout?: () => void;
onErrorAfterTimeout?: (error: unknown) => void;
}): Promise<boolean> {
@@ -108,7 +108,7 @@ export async function runDiscordTaskWithTimeout(params: {
if (result === "timeout") {
timedOut = true;
timeoutAbortController?.abort();
params.onTimeout(params.timeoutMs);
await params.onTimeout(params.timeoutMs);
return true;
}
return false;