fix(whatsapp): narrow reconnect queue drain behavior

This commit is contained in:
Marcus Castro
2026-04-09 01:39:56 -03:00
parent df9932a62d
commit 5ce763406e
4 changed files with 150 additions and 94 deletions

View File

@@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai
- fix(exec): replace TOCTOU check-then-read with atomic pinned-fd open in script preflight [AI]. (#62333) Thanks @pgondhi987.
- WhatsApp/auto-reply: keep inbound reply, media, and composing sends on the current socket across reconnects, wait through reconnect gaps, and retry timeout-only send failures without dropping the active socket ref. (#62892) Thanks @mcaxtr.
- Config/plugins: let config writes keep disabled plugin entries without forcing required plugin config schemas or crashing raw plugin validation, so slot switches and similar plugin-state updates persist cleanly. (#63296) Thanks @fuller-stack-dev.
- WhatsApp/outbound queue: drain queued WhatsApp deliveries when the listener reconnects without dropping reconnect-delayed sends after a special TTL or rewriting retry history, so disconnect-window outbound messages can recover once the channel is ready again. (#46299) Thanks @manuel-claw.
## 2026.4.9

View File

@@ -1,5 +1,3 @@
import fs from "node:fs";
import path from "node:path";
import type { OpenClawConfig } from "../../config/config.js";
import { formatErrorMessage } from "../errors.js";
import {
@@ -9,7 +7,6 @@ import {
moveToFailed,
type QueuedDelivery,
type QueuedDeliveryPayload,
resolveQueueDir,
} from "./delivery-queue-storage.js";
export type RecoverySummary = {
@@ -58,9 +55,9 @@ const PERMANENT_ERROR_PATTERNS: readonly RegExp[] = [
];
const NO_LISTENER_ERROR_RE = /No active WhatsApp Web listener/i;
const RECONNECT_QUEUE_TTL_MS = 5 * 60 * 1000;
const drainInProgress = new Map<string, boolean>();
const entriesInProgress = new Set<string>();
type DeliverRuntimeModule = typeof import("./deliver-runtime.js");
@@ -81,31 +78,6 @@ function getErrnoCode(err: unknown): string | null {
: null;
}
async function resetReconnectEntry(entry: QueuedDelivery, stateDir?: string): Promise<boolean> {
const filePath = path.join(resolveQueueDir(stateDir), `${entry.id}.json`);
const tmp = `${filePath}.${process.pid}.tmp`;
const reset: QueuedDelivery = {
...entry,
retryCount: 0,
lastAttemptAt: undefined,
lastError: undefined,
};
await fs.promises.writeFile(tmp, JSON.stringify(reset, null, 2), {
encoding: "utf-8",
mode: 0o600,
});
try {
await fs.promises.rename(tmp, filePath);
} catch (err) {
await fs.promises.unlink(tmp).catch(() => {});
if (getErrnoCode(err) === "ENOENT") {
return false;
}
throw err;
}
return true;
}
function createEmptyRecoverySummary(): RecoverySummary {
return {
recovered: 0,
@@ -115,6 +87,18 @@ function createEmptyRecoverySummary(): RecoverySummary {
};
}
function claimRecoveryEntry(entryId: string): boolean {
if (entriesInProgress.has(entryId)) {
return false;
}
entriesInProgress.add(entryId);
return true;
}
function releaseRecoveryEntry(entryId: string): void {
entriesInProgress.delete(entryId);
}
function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig) {
return {
cfg,
@@ -211,62 +195,50 @@ export async function drainReconnectQueue(opts: {
drainInProgress.set(opts.accountId, true);
try {
const pending = await loadPendingDeliveries(opts.stateDir);
const now = Date.now();
const matchingEntries = pending.filter(
(entry) =>
entry.channel === "whatsapp" &&
normalizeQueueAccountId(entry.accountId) === opts.accountId &&
typeof entry.lastError === "string" &&
NO_LISTENER_ERROR_RE.test(entry.lastError),
);
const eligible = matchingEntries.filter(
(entry) => entry.retryCount < MAX_RETRIES && now - entry.enqueuedAt < RECONNECT_QUEUE_TTL_MS,
);
const expired = matchingEntries.filter(
(entry) =>
entry.retryCount >= MAX_RETRIES || now - entry.enqueuedAt >= RECONNECT_QUEUE_TTL_MS,
);
const matchingEntries = (await loadPendingDeliveries(opts.stateDir))
.filter(
(entry) =>
entry.channel === "whatsapp" &&
normalizeQueueAccountId(entry.accountId) === opts.accountId &&
typeof entry.lastError === "string" &&
NO_LISTENER_ERROR_RE.test(entry.lastError),
)
.toSorted((a, b) => a.enqueuedAt - b.enqueuedAt);
for (const entry of expired) {
try {
await moveToFailed(entry.id, opts.stateDir);
} catch (err) {
if (getErrnoCode(err) === "ENOENT") {
opts.log.info(`reconnect drain: expired entry ${entry.id} already gone, skipping`);
continue;
}
throw err;
}
opts.log.warn(
`WhatsApp reconnect drain: expired entry ${entry.id} (TTL exceeded or max retries)`,
);
}
if (eligible.length === 0) {
if (matchingEntries.length === 0) {
return;
}
opts.log.info(
`WhatsApp reconnect drain: ${eligible.length} pending message(s) for account ${opts.accountId}`,
`WhatsApp reconnect drain: ${matchingEntries.length} pending message(s) for account ${opts.accountId}`,
);
const resetEntries: QueuedDelivery[] = [];
for (const entry of eligible) {
const reset = await resetReconnectEntry(entry, opts.stateDir);
if (!reset) {
opts.log.info(
`reconnect drain: entry ${entry.id} already acked by concurrent recovery, skipping`,
);
continue;
}
resetEntries.push(entry);
}
const deliver = opts.deliver ?? (await loadDeliverRuntime()).deliverOutboundPayloads;
// Deliver only the reset entries for this account instead of replaying the full queue.
for (const entry of resetEntries) {
for (const entry of matchingEntries) {
if (!claimRecoveryEntry(entry.id)) {
opts.log.info(`WhatsApp reconnect drain: entry ${entry.id} is already being recovered`);
continue;
}
if (entry.retryCount >= MAX_RETRIES) {
try {
await moveToFailed(entry.id, opts.stateDir);
} catch (err) {
if (getErrnoCode(err) === "ENOENT") {
opts.log.info(`reconnect drain: entry ${entry.id} already gone, skipping`);
continue;
}
throw err;
} finally {
releaseRecoveryEntry(entry.id);
}
opts.log.warn(
`WhatsApp reconnect drain: entry ${entry.id} exceeded max retries and was moved to failed/`,
);
continue;
}
try {
await deliver(buildRecoveryDeliverParams(entry, opts.cfg));
await ackDelivery(entry.id, opts.stateDir);
@@ -277,6 +249,8 @@ export async function drainReconnectQueue(opts: {
} else {
await failDelivery(entry.id, errMsg, opts.stateDir).catch(() => {});
}
} finally {
releaseRecoveryEntry(entry.id);
}
}
} finally {
@@ -316,11 +290,19 @@ export async function recoverPendingDeliveries(opts: {
break;
}
if (entry.retryCount >= MAX_RETRIES) {
opts.log.warn(
`Delivery ${entry.id} exceeded max retries (${entry.retryCount}/${MAX_RETRIES}) — moving to failed/`,
);
await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir);
summary.skippedMaxRetries += 1;
if (!claimRecoveryEntry(entry.id)) {
opts.log.info(`Recovery skipped for delivery ${entry.id}: already being processed`);
continue;
}
try {
opts.log.warn(
`Delivery ${entry.id} exceeded max retries (${entry.retryCount}/${MAX_RETRIES}) — moving to failed/`,
);
await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir);
summary.skippedMaxRetries += 1;
} finally {
releaseRecoveryEntry(entry.id);
}
continue;
}
@@ -333,6 +315,11 @@ export async function recoverPendingDeliveries(opts: {
continue;
}
if (!claimRecoveryEntry(entry.id)) {
opts.log.info(`Recovery skipped for delivery ${entry.id}: already being processed`);
continue;
}
try {
await opts.deliver(buildRecoveryDeliverParams(entry, opts.cfg));
await ackDelivery(entry.id, opts.stateDir);
@@ -353,6 +340,8 @@ export async function recoverPendingDeliveries(opts: {
}
summary.failed += 1;
opts.log.warn(`Retry failed for delivery ${entry.id}: ${errMsg}`);
} finally {
releaseRecoveryEntry(entry.id);
}
}

View File

@@ -10,6 +10,7 @@ import {
failDelivery,
MAX_RETRIES,
type RecoveryLogger,
recoverPendingDeliveries,
} from "./delivery-queue.js";
function createMockLogger(): RecoveryLogger {
@@ -90,22 +91,24 @@ describe("drainReconnectQueue", () => {
expect(deliver).not.toHaveBeenCalled();
});
it("expires and moves to failed entries older than TTL", async () => {
it("retries immediately without resetting retry history", async () => {
const log = createMockLogger();
const deliver = vi.fn<DeliverFn>(async () => {});
const deliver = vi.fn<DeliverFn>(async () => {
throw new Error("transient failure");
});
const id = await enqueueDelivery(
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
tmpDir,
);
await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
// Backdate enqueuedAt to exceed 5-minute TTL
const queueDir = path.join(tmpDir, "delivery-queue");
const filePath = path.join(queueDir, `${id}.json`);
const entry = JSON.parse(fs.readFileSync(filePath, "utf-8"));
entry.enqueuedAt = Date.now() - 6 * 60 * 1000; // 6 minutes ago
fs.writeFileSync(filePath, JSON.stringify(entry, null, 2));
const before = JSON.parse(fs.readFileSync(filePath, "utf-8")) as {
retryCount: number;
lastAttemptAt?: number;
lastError?: string;
};
await drainReconnectQueue({
accountId: "acct1",
@@ -115,11 +118,17 @@ describe("drainReconnectQueue", () => {
deliver,
});
// Should have been moved to failed/
const failedDir = path.join(queueDir, "failed");
const failedFiles = fs.readdirSync(failedDir).filter((f) => f.endsWith(".json"));
expect(failedFiles).toHaveLength(1);
expect(deliver).not.toHaveBeenCalled();
expect(deliver).toHaveBeenCalledTimes(1);
const after = JSON.parse(fs.readFileSync(filePath, "utf-8")) as {
retryCount: number;
lastAttemptAt?: number;
lastError?: string;
};
expect(after.retryCount).toBe(before.retryCount + 1);
expect(after.lastAttemptAt).toBeTypeOf("number");
expect(after.lastAttemptAt).toBeGreaterThanOrEqual(before.lastAttemptAt ?? 0);
expect(after.lastError).toBe("transient failure");
});
it("does not throw if delivery fails during drain", async () => {
@@ -213,4 +222,61 @@ describe("drainReconnectQueue", () => {
resolveDeliver!();
await first;
});
it("does not re-deliver an entry already being recovered at startup", async () => {
const log = createMockLogger();
const startupLog = createMockLogger();
let resolveDeliver: () => void;
const deliverPromise = new Promise<void>((resolve) => {
resolveDeliver = resolve;
});
const deliver = vi.fn<DeliverFn>(async () => {
await deliverPromise;
});
const id = await enqueueDelivery(
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
tmpDir,
);
const queuePath = path.join(tmpDir, "delivery-queue", `${id}.json`);
const entry = JSON.parse(fs.readFileSync(queuePath, "utf-8")) as {
id: string;
enqueuedAt: number;
channel: string;
to: string;
accountId?: string;
payloads: Array<{ text: string }>;
retryCount: number;
lastError?: string;
};
entry.lastError = "No active WhatsApp Web listener";
fs.writeFileSync(queuePath, JSON.stringify(entry, null, 2));
const startupRecovery = recoverPendingDeliveries({
cfg: stubCfg,
deliver,
log: startupLog,
stateDir: tmpDir,
});
await vi.waitFor(() => {
expect(deliver).toHaveBeenCalledTimes(1);
});
await drainReconnectQueue({
accountId: "acct1",
cfg: stubCfg,
log,
stateDir: tmpDir,
deliver,
});
expect(deliver).toHaveBeenCalledTimes(1);
expect(log.info).toHaveBeenCalledWith(
expect.stringContaining(`entry ${id} is already being recovered`),
);
resolveDeliver!();
await startupRecovery;
});
});

View File

@@ -32,7 +32,7 @@ export * from "../infra/net/proxy-env.js";
export * from "../infra/net/proxy-fetch.js";
export * from "../infra/net/undici-global-dispatcher.js";
export * from "../infra/net/ssrf.js";
export * from "../infra/outbound/delivery-queue.js";
export { drainReconnectQueue } from "../infra/outbound/delivery-queue.js";
export * from "../infra/outbound/identity.js";
export * from "../infra/outbound/sanitize-text.js";
export * from "../infra/parse-finite-number.js";