refactor: scope skipQueue to retryTransient path only

Non-retrying direct delivery (structured content / thread) keeps the
write-ahead queue so recoverPendingDeliveries can replay after a crash.

Addresses review feedback from codex-connector.
This commit is contained in:
openperf
2026-03-09 14:07:34 +08:00
parent 157769f83f
commit ea5ae5c5da
2 changed files with 34 additions and 10 deletions

View File

@@ -212,6 +212,9 @@ describe("dispatchCronDelivery — double-announce guard", () => {
channel: "telegram",
to: "123456",
payloads: [{ text: "Detailed child result, everything finished successfully." }],
// Text delivery goes through finalizeTextDelivery which uses
// retryTransient: true, so skipQueue must be set.
skipQueue: true,
}),
);
});
@@ -266,7 +269,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
expect(state.deliveryAttempted).toBe(false);
});
it("direct delivery passes skipQueue=true to avoid duplicate queue entries on transient retry", async () => {
it("text delivery (retryTransient path) passes skipQueue=true to avoid duplicate queue entries", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
@@ -278,7 +281,8 @@ describe("dispatchCronDelivery — double-announce guard", () => {
expect(state.deliveryAttempted).toBe(true);
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
// The call must include skipQueue: true so transient retries inside
// Text delivery goes through finalizeTextDelivery → deliverViaDirect with
// retryTransient: true. skipQueue must be set so transient retries inside
// retryTransientDirectCronDelivery do not enqueue additional write-ahead
// entries that would cause duplicate sends.
// See: https://github.com/openclaw/openclaw/issues/40545
@@ -292,6 +296,24 @@ describe("dispatchCronDelivery — double-announce guard", () => {
);
});
// Guard for the non-retrying direct-delivery path: when the params are flagged
// as carrying structured content, dispatchCronDelivery must call
// deliverOutboundPayloads exactly once and must not pass skipQueue: true.
it("structured/thread delivery (non-retryTransient path) keeps write-ahead queue", async () => {
// Arrange: report zero active descendant runs, mark the message as not
// interim, and make the outbound send resolve with a single ok result.
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
const params = makeBaseParams({ synthesizedText: "Report attached." });
// Simulate structured content so useDirectDelivery path is taken (no retryTransient)
// NOTE(review): the flag is written through a Record<string, unknown> cast,
// presumably because the type returned by makeBaseParams does not allow
// assigning it directly — confirm, and consider passing it as an override.
(params as Record<string, unknown>).deliveryPayloadHasStructuredContent = true;
// Act: run the dispatch under test.
await dispatchCronDelivery(params);
// Assert: exactly one outbound delivery call was made.
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
// Non-retrying path should NOT set skipQueue, preserving crash-recovery
// via recoverPendingDeliveries.
// expect.not.objectContaining accepts any argument object that lacks
// skipQueue: true, so an omitted key and skipQueue: false both satisfy it.
expect(deliverOutboundPayloads).toHaveBeenCalledWith(
expect.not.objectContaining({ skipQueue: true }),
);
});
it("transient retry delivers exactly once after initial transient failure", async () => {
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);

View File

@@ -241,7 +241,7 @@ export async function dispatchCronDelivery(
agentId: params.agentId,
sessionKey: params.agentSessionKey,
});
const runDelivery = async () =>
const runDelivery = async (skipQueue?: boolean) =>
await deliverOutboundPayloads({
cfg: params.cfgWithAgentDefaults,
channel: delivery.channel,
@@ -254,19 +254,21 @@ export async function dispatchCronDelivery(
bestEffort: params.deliveryBestEffort,
deps: createOutboundSendDeps(params.deps),
abortSignal: params.abortSignal,
// Skip the write-ahead delivery queue for cron direct delivery.
// retryTransientDirectCronDelivery already handles transient retries;
// without skipQueue each retry attempt enqueues a *new* queue entry,
// causing duplicate sends when the first attempt actually succeeded
// but threw a transient error (e.g. gateway timeout / econnreset).
// Skip the write-ahead delivery queue only when retrying transient
// errors. retryTransientDirectCronDelivery already provides
// resilience; without skipQueue each retry attempt enqueues a *new*
// queue entry, causing duplicate sends when the first attempt
// actually succeeded but threw a transient error (e.g. gateway
// timeout / econnreset). Non-retrying callers keep the queue so
// recoverPendingDeliveries can replay after a crash.
// See: https://github.com/openclaw/openclaw/issues/40545
skipQueue: true,
...(skipQueue ? { skipQueue: true } : {}),
});
const deliveryResults = options?.retryTransient
? await retryTransientDirectCronDelivery({
jobId: params.job.id,
signal: params.abortSignal,
run: runDelivery,
run: () => runDelivery(true),
})
: await runDelivery();
delivered = deliveryResults.length > 0;