mirror of
https://github.com/moltbot/moltbot.git
synced 2026-03-30 01:06:11 +00:00
refactor: scope skipQueue to retryTransient path only
Non-retrying direct delivery (structured content / thread) keeps the write-ahead queue so recoverPendingDeliveries can replay after a crash. Addresses review feedback from codex-connector.
This commit is contained in:
@@ -212,6 +212,9 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
||||
channel: "telegram",
|
||||
to: "123456",
|
||||
payloads: [{ text: "Detailed child result, everything finished successfully." }],
|
||||
// Text delivery goes through finalizeTextDelivery which uses
|
||||
// retryTransient: true, so skipQueue must be set.
|
||||
skipQueue: true,
|
||||
}),
|
||||
);
|
||||
});
|
||||
@@ -266,7 +269,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
||||
expect(state.deliveryAttempted).toBe(false);
|
||||
});
|
||||
|
||||
it("direct delivery passes skipQueue=true to avoid duplicate queue entries on transient retry", async () => {
|
||||
it("text delivery (retryTransient path) passes skipQueue=true to avoid duplicate queue entries", async () => {
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
|
||||
@@ -278,7 +281,8 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
||||
expect(state.deliveryAttempted).toBe(true);
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
|
||||
|
||||
// The call must include skipQueue: true so transient retries inside
|
||||
// Text delivery goes through finalizeTextDelivery → deliverViaDirect with
|
||||
// retryTransient: true. skipQueue must be set so transient retries inside
|
||||
// retryTransientDirectCronDelivery do not enqueue additional write-ahead
|
||||
// entries that would cause duplicate sends.
|
||||
// See: https://github.com/openclaw/openclaw/issues/40545
|
||||
@@ -292,6 +296,24 @@ describe("dispatchCronDelivery — double-announce guard", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("structured/thread delivery (non-retryTransient path) keeps write-ahead queue", async () => {
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
|
||||
|
||||
const params = makeBaseParams({ synthesizedText: "Report attached." });
|
||||
// Simulate structured content so useDirectDelivery path is taken (no retryTransient)
|
||||
(params as Record<string, unknown>).deliveryPayloadHasStructuredContent = true;
|
||||
await dispatchCronDelivery(params);
|
||||
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
|
||||
// Non-retrying path should NOT set skipQueue, preserving crash-recovery
|
||||
// via recoverPendingDeliveries.
|
||||
expect(deliverOutboundPayloads).toHaveBeenCalledWith(
|
||||
expect.not.objectContaining({ skipQueue: true }),
|
||||
);
|
||||
});
|
||||
|
||||
it("transient retry delivers exactly once after initial transient failure", async () => {
|
||||
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
|
||||
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
|
||||
|
||||
@@ -241,7 +241,7 @@ export async function dispatchCronDelivery(
|
||||
agentId: params.agentId,
|
||||
sessionKey: params.agentSessionKey,
|
||||
});
|
||||
const runDelivery = async () =>
|
||||
const runDelivery = async (skipQueue?: boolean) =>
|
||||
await deliverOutboundPayloads({
|
||||
cfg: params.cfgWithAgentDefaults,
|
||||
channel: delivery.channel,
|
||||
@@ -254,19 +254,21 @@ export async function dispatchCronDelivery(
|
||||
bestEffort: params.deliveryBestEffort,
|
||||
deps: createOutboundSendDeps(params.deps),
|
||||
abortSignal: params.abortSignal,
|
||||
// Skip the write-ahead delivery queue for cron direct delivery.
|
||||
// retryTransientDirectCronDelivery already handles transient retries;
|
||||
// without skipQueue each retry attempt enqueues a *new* queue entry,
|
||||
// causing duplicate sends when the first attempt actually succeeded
|
||||
// but threw a transient error (e.g. gateway timeout / econnreset).
|
||||
// Skip the write-ahead delivery queue only when retrying transient
|
||||
// errors. retryTransientDirectCronDelivery already provides
|
||||
// resilience; without skipQueue each retry attempt enqueues a *new*
|
||||
// queue entry, causing duplicate sends when the first attempt
|
||||
// actually succeeded but threw a transient error (e.g. gateway
|
||||
// timeout / econnreset). Non-retrying callers keep the queue so
|
||||
// recoverPendingDeliveries can replay after a crash.
|
||||
// See: https://github.com/openclaw/openclaw/issues/40545
|
||||
skipQueue: true,
|
||||
...(skipQueue ? { skipQueue: true } : {}),
|
||||
});
|
||||
const deliveryResults = options?.retryTransient
|
||||
? await retryTransientDirectCronDelivery({
|
||||
jobId: params.job.id,
|
||||
signal: params.abortSignal,
|
||||
run: runDelivery,
|
||||
run: () => runDelivery(true),
|
||||
})
|
||||
: await runDelivery();
|
||||
delivered = deliveryResults.length > 0;
|
||||
|
||||
Reference in New Issue
Block a user