fix(outbound): prevent replay after ack crash windows (#38668, thanks @Gundam98)

Co-authored-by: Gundam98 <huhanwen98@gmail.com>
This commit is contained in:
Peter Steinberger
2026-03-07 22:52:27 +00:00
parent 3ca023bf44
commit 708187f28c
3 changed files with 77 additions and 6 deletions

View File

@@ -298,6 +298,7 @@ Docs: https://docs.openclaw.ai
- Gateway/password auth startup diagnostics: detect unresolved provider-reference objects in `gateway.auth.password` and fail with a specific bootstrap-secrets error message instead of generic misconfiguration output. (#39230) Thanks @ademczuk.
- Agents/OpenAI-responses compatibility: strip unsupported `store` payload fields when `supportsStore=false` (including OpenAI-compatible non-OpenAI providers) while preserving server-compaction payload behavior. (#39219) Thanks @ademczuk.
- Agents/model fallback visibility: warn when configured model IDs cannot be resolved and fallback is applied, with log-safe sanitization of model text to prevent control-sequence injection in warning output. (#39215) Thanks @ademczuk.
- Outbound delivery replay safety: use two-phase delivery ACK markers (`.json` -> `.delivered` -> unlink) and startup marker cleanup so crash windows between send and cleanup do not replay already-delivered messages. (#38668) Thanks @Gundam98.
## 2026.3.2

View File

@@ -107,20 +107,44 @@ export async function enqueueDelivery(
return id;
}
/** Remove a successfully delivered entry from the queue. */
/** Remove a successfully delivered entry from the queue.
*
* Uses a two-phase approach so that a crash between delivery and cleanup
* does not cause the message to be replayed on the next recovery scan:
* Phase 1: atomic rename {id}.json → {id}.delivered
* Phase 2: unlink the .delivered marker
* If the process dies between phase 1 and phase 2 the marker is cleaned up
* by {@link loadPendingDeliveries} on the next startup without re-sending.
*/
export async function ackDelivery(id: string, stateDir?: string): Promise<void> {
const filePath = path.join(resolveQueueDir(stateDir), `${id}.json`);
const queueDir = resolveQueueDir(stateDir);
const jsonPath = path.join(queueDir, `${id}.json`);
const deliveredPath = path.join(queueDir, `${id}.delivered`);
try {
await fs.promises.unlink(filePath);
// Phase 1: atomic rename marks the delivery as complete.
await fs.promises.rename(jsonPath, deliveredPath);
} catch (err) {
const code =
err && typeof err === "object" && "code" in err
? String((err as { code?: unknown }).code)
: null;
if (code !== "ENOENT") {
throw err;
if (code === "ENOENT") {
// .json already gone — may have been renamed by a previous ack attempt.
// Try to clean up a leftover .delivered marker if present.
try {
await fs.promises.unlink(deliveredPath);
} catch {
// marker already gone — no-op.
}
return;
}
// Already removed — no-op.
throw err;
}
// Phase 2: remove the marker file.
try {
await fs.promises.unlink(deliveredPath);
} catch {
// Best-effort; loadPendingDeliveries will clean it up on next startup.
}
}
@@ -156,6 +180,19 @@ export async function loadPendingDeliveries(stateDir?: string): Promise<QueuedDe
}
throw err;
}
// Clean up .delivered markers left by ackDelivery if the process crashed
// between the rename and the unlink.
for (const file of files) {
if (!file.endsWith(".delivered")) {
continue;
}
try {
await fs.promises.unlink(path.join(queueDir, file));
} catch {
// Best-effort cleanup.
}
}
const entries: QueuedDelivery[] = [];
for (const file of files) {
if (!file.endsWith(".json")) {

View File

@@ -113,6 +113,39 @@ describe("delivery-queue", () => {
it("ack is idempotent (no error on missing file)", async () => {
await expect(ackDelivery("nonexistent-id", tmpDir)).resolves.toBeUndefined();
});
it("ack removes .delivered marker so recovery does not replay", async () => {
const id = await enqueueDelivery(
{ channel: "whatsapp", to: "+1", payloads: [{ text: "ack-test" }] },
tmpDir,
);
const queueDir = path.join(tmpDir, "delivery-queue");
await ackDelivery(id, tmpDir);
// Neither .json nor .delivered should remain.
expect(fs.existsSync(path.join(queueDir, `${id}.json`))).toBe(false);
expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false);
});
it("loadPendingDeliveries cleans up stale .delivered markers without replaying", async () => {
const id = await enqueueDelivery(
{ channel: "telegram", to: "99", payloads: [{ text: "stale" }] },
tmpDir,
);
const queueDir = path.join(tmpDir, "delivery-queue");
// Simulate crash between ack phase 1 (rename) and phase 2 (unlink):
// rename .json → .delivered, then pretend the process died.
fs.renameSync(path.join(queueDir, `${id}.json`), path.join(queueDir, `${id}.delivered`));
const entries = await loadPendingDeliveries(tmpDir);
// The .delivered entry must NOT appear as pending.
expect(entries).toHaveLength(0);
// And the marker file should have been cleaned up.
expect(fs.existsSync(path.join(queueDir, `${id}.delivered`))).toBe(false);
});
});
describe("failDelivery", () => {