diff --git a/CHANGELOG.md b/CHANGELOG.md index a2c4442bbe4..7bfe19d53bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -298,6 +298,7 @@ Docs: https://docs.openclaw.ai - Gateway/password auth startup diagnostics: detect unresolved provider-reference objects in `gateway.auth.password` and fail with a specific bootstrap-secrets error message instead of generic misconfiguration output. (#39230) Thanks @ademczuk. - Agents/OpenAI-responses compatibility: strip unsupported `store` payload fields when `supportsStore=false` (including OpenAI-compatible non-OpenAI providers) while preserving server-compaction payload behavior. (#39219) Thanks @ademczuk. - Agents/model fallback visibility: warn when configured model IDs cannot be resolved and fallback is applied, with log-safe sanitization of model text to prevent control-sequence injection in warning output. (#39215) Thanks @ademczuk. +- Outbound delivery replay safety: use two-phase delivery ACK markers (`.json` -> `.delivered` -> unlink) and startup marker cleanup so crash windows between send and cleanup do not replay already-delivered messages. (#38668) Thanks @Gundam98. ## 2026.3.2 diff --git a/src/infra/outbound/delivery-queue.ts b/src/infra/outbound/delivery-queue.ts index e84527b461e..8a79cc732d5 100644 --- a/src/infra/outbound/delivery-queue.ts +++ b/src/infra/outbound/delivery-queue.ts @@ -107,20 +107,44 @@ export async function enqueueDelivery( return id; } -/** Remove a successfully delivered entry from the queue. */ +/** Remove a successfully delivered entry from the queue. + * + * Uses a two-phase approach so that a crash between delivery and cleanup + * does not cause the message to be replayed on the next recovery scan: + * Phase 1: atomic rename {id}.json → {id}.delivered + * Phase 2: unlink the .delivered marker + * If the process dies between phase 1 and phase 2 the marker is cleaned up + * by {@link loadPendingDeliveries} on the next startup without re-sending. + */ export async function ackDelivery(id: string, stateDir?: string): Promise { - const filePath = path.join(resolveQueueDir(stateDir), `${id}.json`); + const queueDir = resolveQueueDir(stateDir); + const jsonPath = path.join(queueDir, `${id}.json`); + const deliveredPath = path.join(queueDir, `${id}.delivered`); try { - await fs.promises.unlink(filePath); + // Phase 1: atomic rename marks the delivery as complete. + await fs.promises.rename(jsonPath, deliveredPath); } catch (err) { const code = err && typeof err === "object" && "code" in err ? String((err as { code?: unknown }).code) : null; - if (code !== "ENOENT") { - throw err; + if (code === "ENOENT") { + // .json already gone — may have been renamed by a previous ack attempt. + // Try to clean up a leftover .delivered marker if present. + try { + await fs.promises.unlink(deliveredPath); + } catch { + // marker already gone — no-op. + } + return; } - // Already removed — no-op. + throw err; + } + // Phase 2: remove the marker file. + try { + await fs.promises.unlink(deliveredPath); + } catch { + // Best-effort; loadPendingDeliveries will clean it up on next startup. } } @@ -156,6 +180,19 @@ export async function loadPendingDeliveries(stateDir?: string): Promise { it("ack is idempotent (no error on missing file)", async () => { await expect(ackDelivery("nonexistent-id", tmpDir)).resolves.toBeUndefined(); }); + + it("ack removes .delivered marker so recovery does not replay", async () => { + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1", payloads: [{ text: "ack-test" }] }, + tmpDir, + ); + const queueDir = path.join(tmpDir, "delivery-queue"); + + await ackDelivery(id, tmpDir); + + // Neither .json nor .delivered should remain. + expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false); + expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false); + }); + + it("loadPendingDeliveries cleans up stale .delivered markers without replaying", async () => { + const id = await enqueueDelivery( + { channel: "telegram", to: "99", payloads: [{ text: "stale" }] }, + tmpDir, + ); + const queueDir = path.join(tmpDir, "delivery-queue"); + + // Simulate crash between ack phase 1 (rename) and phase 2 (unlink): + // rename .json → .delivered, then pretend the process died. + fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`)); + + const entries = await loadPendingDeliveries(tmpDir); + + // The .delivered entry must NOT appear as pending. + expect(entries).toHaveLength(0); + // And the marker file should have been cleaned up. + expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false); + }); }); describe("failDelivery", () => {