fix(reply): preserve no-debounce inbound concurrency

This commit is contained in:
Vincent Koc
2026-03-23 10:05:06 -07:00
committed by Peter Steinberger
parent 7bc8e67d2a
commit 3de42e946a
6 changed files with 122 additions and 21 deletions

View File

@@ -68,7 +68,11 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
try {
await params.onFlush(items);
} catch (err) {
params.onError?.(err, items);
try {
params.onError?.(err, items);
} catch {
// Keep the keyed chain alive even if the error handler fails.
}
}
};
@@ -153,23 +157,27 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
if (!canDebounce || !key) {
if (key) {
if (!buffers.has(key)) {
if (buffers.has(key)) {
// Reserve the keyed immediate slot before forcing the pending buffer
// to flush so fire-and-forget callers cannot be overtaken.
const reservedTask = enqueueReservedKeyTask(key, async () => {
await runFlush([item]);
});
try {
await flushKey(key);
} finally {
reservedTask.release();
}
await reservedTask.task;
return;
}
if (keyChains.has(key)) {
await enqueueKeyTask(key, async () => {
await runFlush([item]);
});
return;
}
// Reserve the keyed immediate slot before forcing the pending buffer
// to flush so fire-and-forget callers cannot be overtaken.
const reservedTask = enqueueReservedKeyTask(key, async () => {
await runFlush([item]);
});
try {
await flushKey(key);
} finally {
reservedTask.release();
}
await reservedTask.task;
await runFlush([item]);
} else {
await runFlush([item]);
}

View File

@@ -469,6 +469,56 @@ describe("createInboundDebouncer", () => {
setTimeoutSpy.mockRestore();
}
});
it("does not serialize keyed turns when debounce is disabled and no keyed chain exists", async () => {
const started: string[] = [];
let releaseFirst!: () => void;
const firstGate = new Promise<void>((resolve) => {
releaseFirst = resolve;
});
const debouncer = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 0,
buildKey: (item) => item.key,
onFlush: async (items) => {
const id = items[0]?.id ?? "";
started.push(id);
if (id === "1") {
await firstGate;
}
},
});
const first = debouncer.enqueue({ key: "a", id: "1" });
await Promise.resolve();
const second = debouncer.enqueue({ key: "a", id: "2" });
await Promise.resolve();
expect(started).toEqual(["1", "2"]);
releaseFirst();
await Promise.all([first, second]);
});
it("swallows onError failures so keyed chains still complete", async () => {
const calls: string[] = [];
const debouncer = createInboundDebouncer<{ key: string; id: string }>({
debounceMs: 0,
buildKey: (item) => item.key,
onFlush: async (items) => {
calls.push(items[0]?.id ?? "");
throw new Error("flush failed");
},
onError: () => {
throw new Error("handler failed");
},
});
await expect(debouncer.enqueue({ key: "a", id: "1" })).resolves.toBeUndefined();
await expect(debouncer.enqueue({ key: "a", id: "2" })).resolves.toBeUndefined();
expect(calls).toEqual(["1", "2"]);
});
});
describe("initSessionState BodyStripped", () => {

View File

@@ -233,7 +233,14 @@ export async function runReplyAgent(params: {
}
if (activeRunQueueAction === "enqueue-followup") {
enqueueFollowupRun(queueKey, followupRun, resolvedQueue, "message-id", queuedRunFollowupTurn);
enqueueFollowupRun(
queueKey,
followupRun,
resolvedQueue,
"message-id",
queuedRunFollowupTurn,
false,
);
await touchActiveSessionEntry();
typing.cleanup();
return undefined;

View File

@@ -83,9 +83,10 @@ export function scheduleFollowupDrain(
if (!queue) {
return;
}
const effectiveRunFollowup = FOLLOWUP_RUN_CALLBACKS.get(key) ?? runFollowup;
// Cache callback only when a drain actually starts. Avoid keeping stale
// callbacks around from finalize calls where no queue work is pending.
rememberFollowupDrainCallback(key, runFollowup);
rememberFollowupDrainCallback(key, effectiveRunFollowup);
void (async () => {
try {
const collectState = { forceIndividualCollect: false };
@@ -104,7 +105,7 @@ export function scheduleFollowupDrain(
collectState,
isCrossChannel,
items: queue.items,
run: runFollowup,
run: effectiveRunFollowup,
});
if (collectDrainResult === "empty") {
break;
@@ -128,7 +129,7 @@ export function scheduleFollowupDrain(
summary,
renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(),
});
await runFollowup({
await effectiveRunFollowup({
prompt,
run,
enqueuedAt: Date.now(),
@@ -149,7 +150,7 @@ export function scheduleFollowupDrain(
}
if (
!(await drainNextQueueItem(queue.items, async (item) => {
await runFollowup({
await effectiveRunFollowup({
prompt: summaryPrompt,
run,
enqueuedAt: Date.now(),
@@ -166,7 +167,7 @@ export function scheduleFollowupDrain(
continue;
}
if (!(await drainNextQueueItem(queue.items, runFollowup))) {
if (!(await drainNextQueueItem(queue.items, effectiveRunFollowup))) {
break;
}
}
@@ -178,7 +179,7 @@ export function scheduleFollowupDrain(
if (queue.items.length === 0 && queue.droppedCount === 0) {
FOLLOWUP_QUEUES.delete(key);
} else {
scheduleFollowupDrain(key, runFollowup);
scheduleFollowupDrain(key, effectiveRunFollowup);
}
}
})();

View File

@@ -60,6 +60,7 @@ export function enqueueFollowupRun(
settings: QueueSettings,
dedupeMode: QueueDedupeMode = "message-id",
runFollowup?: (run: FollowupRun) => Promise<void>,
restartIfIdle = true,
): boolean {
const queue = getFollowupQueue(key, settings);
const recentMessageIdKey = dedupeMode !== "none" ? buildRecentMessageIdKey(run, key) : undefined;
@@ -99,7 +100,7 @@ export function enqueueFollowupRun(
// If drain finished and deleted the queue before this item arrived, a new queue
// object was created (draining: false) but nobody scheduled a drain for it.
// Use the cached callback to restart the drain now.
if (!queue.draining) {
if (restartIfIdle && !queue.draining) {
kickFollowupDrainIfIdle(key);
}
return true;

View File

@@ -1535,6 +1535,40 @@ describe("followup queue drain restart after idle window", () => {
expect(freshCalls[0]?.prompt).toBe("after-idle");
});
it("does not auto-start a drain when a busy run only refreshes the callback", async () => {
const key = `test-busy-run-refreshes-callback-${Date.now()}`;
const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 };
const staleCalls: FollowupRun[] = [];
const freshCalls: FollowupRun[] = [];
const staleFollowup = async (run: FollowupRun) => {
staleCalls.push(run);
};
const freshFollowup = async (run: FollowupRun) => {
freshCalls.push(run);
};
enqueueFollowupRun(
key,
createRun({ prompt: "queued-while-busy" }),
settings,
"message-id",
freshFollowup,
false,
);
await new Promise<void>((resolve) => setImmediate(resolve));
expect(freshCalls).toHaveLength(0);
scheduleFollowupDrain(key, staleFollowup);
await vi.waitFor(() => {
expect(freshCalls).toHaveLength(1);
});
expect(staleCalls).toHaveLength(0);
expect(freshCalls[0]?.prompt).toBe("queued-while-busy");
});
it("restarts an idle drain across distinct enqueue and drain module instances", async () => {
const drainA = await importFreshModule<typeof import("./queue/drain.js")>(
import.meta.url,