refactor: distill ACP reconnect prompt state

This commit is contained in:
Ayaan Zaidi
2026-04-02 14:39:58 +05:30
parent 657295c347
commit e48a7b9be8

View File

@@ -66,6 +66,8 @@ type PendingPrompt = {
sessionKey: string;
idempotencyKey: string;
sendAccepted?: boolean;
disconnectGeneration?: number;
disconnectReason?: string;
resolve: (response: PromptResponse) => void;
reject: (err: Error) => void;
sentTextLength?: number;
@@ -409,7 +411,6 @@ export class AcpGatewayAgent implements Agent {
private sessionStore: AcpSessionStore;
private sessionCreateRateLimiter: FixedWindowRateLimiter;
private pendingPrompts = new Map<string, PendingPrompt>();
private disconnectDeadlineRunIds = new Map<string, string>();
private disconnectTimer: NodeJS.Timeout | null = null;
private disconnectGeneration = 0;
@@ -449,7 +450,7 @@ export class AcpGatewayAgent implements Agent {
handleGatewayReconnect(): void {
this.log("gateway reconnected");
void this.reconcilePendingPromptsOnReconnect(this.disconnectGeneration);
void this.reconcilePendingPrompts(this.disconnectGeneration, false);
}
handleGatewayDisconnect(reason: string): void {
@@ -458,16 +459,14 @@ export class AcpGatewayAgent implements Agent {
return;
}
this.disconnectGeneration += 1;
this.disconnectDeadlineRunIds = new Map(
[...this.pendingPrompts.entries()].map(([sessionId, pending]) => [
sessionId,
pending.idempotencyKey,
]),
);
for (const pending of this.pendingPrompts.values()) {
pending.disconnectGeneration = this.disconnectGeneration;
pending.disconnectReason = reason;
}
this.clearDisconnectTimer();
this.disconnectTimer = setTimeout(() => {
this.disconnectTimer = null;
void this.expireDisconnectDeadline(new Error(`Gateway disconnected: ${reason}`));
void this.reconcilePendingPrompts(this.disconnectGeneration, true);
}, ACP_GATEWAY_DISCONNECT_GRACE_MS);
this.disconnectTimer.unref?.();
}
@@ -746,7 +745,6 @@ export class AcpGatewayAgent implements Agent {
return;
}
this.pendingPrompts.delete(params.sessionId);
this.disconnectDeadlineRunIds.delete(params.sessionId);
this.sessionStore.clearActiveRun(params.sessionId);
if (this.pendingPrompts.size === 0) {
this.clearDisconnectTimer();
@@ -1009,7 +1007,6 @@ export class AcpGatewayAgent implements Agent {
stopReason: StopReason,
): Promise<void> {
this.pendingPrompts.delete(sessionId);
this.disconnectDeadlineRunIds.delete(sessionId);
this.sessionStore.clearActiveRun(sessionId);
if (this.pendingPrompts.size === 0) {
this.clearDisconnectTimer();
@@ -1046,68 +1043,18 @@ export class AcpGatewayAgent implements Agent {
this.disconnectTimer = null;
}
private rejectPendingPrompts(
error: Error,
promptRuns: Iterable<readonly [string, string]>,
): void {
for (const [sessionId, runId] of promptRuns) {
const pending = this.getPendingPrompt(sessionId, runId);
if (!pending) {
continue;
}
pending.reject(error);
this.sessionStore.clearActiveRun(pending.sessionId);
this.pendingPrompts.delete(sessionId);
this.disconnectDeadlineRunIds.delete(sessionId);
private rejectPendingPrompt(pending: PendingPrompt, error: Error): void {
this.pendingPrompts.delete(pending.sessionId);
this.sessionStore.clearActiveRun(pending.sessionId);
if (this.pendingPrompts.size === 0) {
this.clearDisconnectTimer();
}
pending.reject(error);
}
private async expireDisconnectDeadline(error: Error): Promise<void> {
const deadlinePromptRuns = [...this.disconnectDeadlineRunIds.entries()];
for (const [sessionId, runId] of deadlinePromptRuns) {
const pending = this.getPendingPrompt(sessionId, runId);
if (!pending) {
this.disconnectDeadlineRunIds.delete(sessionId);
continue;
}
if (!pending.sendAccepted) {
this.rejectPendingPrompts(error, [[sessionId, runId]]);
continue;
}
let result: AgentWaitResult | undefined;
try {
result = await this.gateway.request<AgentWaitResult>(
"agent.wait",
{
runId,
timeoutMs: 0,
},
{ timeoutMs: null },
);
} catch {
this.rejectPendingPrompts(error, [[sessionId, runId]]);
continue;
}
const currentPending = this.getPendingPrompt(sessionId, runId);
if (!currentPending) {
continue;
}
if (result?.status === "ok") {
await this.finishPrompt(sessionId, currentPending, "end_turn");
continue;
}
if (result?.status === "error") {
void this.finishPrompt(sessionId, currentPending, "end_turn");
continue;
}
this.rejectPendingPrompts(error, [[sessionId, runId]]);
}
}
private async reconcilePendingPromptsOnReconnect(
private async reconcilePendingPrompts(
observedDisconnectGeneration: number,
deadlineExpired: boolean,
): Promise<void> {
if (this.pendingPrompts.size === 0) {
if (this.disconnectGeneration === observedDisconnectGeneration) {
@@ -1118,54 +1065,88 @@ export class AcpGatewayAgent implements Agent {
const pendingEntries = [...this.pendingPrompts.entries()];
let keepDisconnectTimer = false;
const nextDisconnectDeadlineRunIds = new Map<string, string>();
for (const [sessionId, pending] of pendingEntries) {
if (this.pendingPrompts.get(sessionId) !== pending) {
continue;
}
let result: AgentWaitResult | undefined;
try {
result = await this.gateway.request<AgentWaitResult>(
"agent.wait",
{
runId: pending.idempotencyKey,
timeoutMs: 0,
},
{ timeoutMs: null },
);
} catch (err) {
this.log(`agent.wait reconcile failed for ${pending.idempotencyKey}: ${String(err)}`);
if (pending.disconnectGeneration !== observedDisconnectGeneration) {
continue;
}
const shouldKeepPending = await this.reconcilePendingPrompt(
sessionId,
pending,
deadlineExpired,
);
if (shouldKeepPending) {
keepDisconnectTimer = true;
nextDisconnectDeadlineRunIds.set(sessionId, pending.idempotencyKey);
continue;
}
if (this.pendingPrompts.get(sessionId) !== pending) {
continue;
}
if (result?.status === "ok") {
await this.finishPrompt(sessionId, pending, "end_turn");
continue;
}
if (result?.status === "error") {
void this.finishPrompt(sessionId, pending, "end_turn");
continue;
}
if (result?.status === "timeout") {
keepDisconnectTimer = true;
nextDisconnectDeadlineRunIds.set(sessionId, pending.idempotencyKey);
continue;
}
}
if (this.disconnectGeneration === observedDisconnectGeneration) {
this.disconnectDeadlineRunIds = nextDisconnectDeadlineRunIds;
}
if (!keepDisconnectTimer && this.disconnectGeneration === observedDisconnectGeneration) {
this.clearDisconnectTimer();
}
}
private async reconcilePendingPrompt(
sessionId: string,
pending: PendingPrompt,
deadlineExpired: boolean,
): Promise<boolean> {
if (!pending.sendAccepted) {
if (deadlineExpired) {
this.rejectPendingPrompt(
pending,
new Error(`Gateway disconnected: ${pending.disconnectReason}`),
);
return false;
}
return true;
}
let result: AgentWaitResult | undefined;
try {
result = await this.gateway.request<AgentWaitResult>(
"agent.wait",
{
runId: pending.idempotencyKey,
timeoutMs: 0,
},
{ timeoutMs: null },
);
} catch (err) {
this.log(`agent.wait reconcile failed for ${pending.idempotencyKey}: ${String(err)}`);
if (deadlineExpired) {
this.rejectPendingPrompt(
pending,
new Error(`Gateway disconnected: ${pending.disconnectReason}`),
);
return false;
}
return true;
}
const currentPending = this.getPendingPrompt(sessionId, pending.idempotencyKey);
if (!currentPending) {
return false;
}
if (result?.status === "ok") {
await this.finishPrompt(sessionId, currentPending, "end_turn");
return false;
}
if (result?.status === "error") {
void this.finishPrompt(sessionId, currentPending, "end_turn");
return false;
}
if (deadlineExpired) {
this.rejectPendingPrompt(
currentPending,
new Error(`Gateway disconnected: ${currentPending.disconnectReason}`),
);
return false;
}
return true;
}
private async sendAvailableCommands(sessionId: string): Promise<void> {
await this.connection.sessionUpdate({
sessionId,