fix: keep ACP prompts alive across gateway reconnects (#59473)

* fix: keep acp prompts alive across gateway reconnects

* fix: bound ACP prompts after disconnect grace

* fix: preserve ACP send timeout semantics

* fix: defer pre-ack ACP disconnect failures

* fix: reconcile ACP runs after reconnect

* fix: keep ACP reconnect deadlines monotonic

* fix: keep pre-ack ACP deadlines after reconnect

* fix: keep ACP prompts alive across gateway reconnects (#59473)

* fix: reject superseded ACP pre-ack prompts (#59473)

* style: format ACP reconnect regression updates (#59473)

* style: format ACP reconnect regression updates (#59473)

* fix: guard ACP send acceptance by run id (#59473)

* fix: scope ACP reconnect deadline by prompt (#59473)

* fix: recheck ACP prompts at reconnect deadline (#59473)

* fix: key ACP reconnect deadline by run (#59473)
This commit is contained in:
Ayaan Zaidi
2026-04-02 14:34:11 +05:30
committed by GitHub
parent c27b45fd12
commit 304da2cbd7
4 changed files with 730 additions and 15 deletions

View File

@@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai
- Feishu/comment threads: harden document comment-thread delivery so whole-document comments fall back to `add_comment`, delayed reply lookups retry more reliably, and user-visible replies avoid reasoning/planning spillover. (#59129) Thanks @wittam-01.
- Matrix/streaming: keep live partial previews for the current assistant block while preserving completed block updates as separate messages when `channels.matrix.blockStreaming` is enabled. (#59384) thanks @gumadeiras
- Kimi Coding/tools: normalize Anthropic tool payloads into the OpenAI-compatible function shape Kimi Coding expects so tool calls stop losing required arguments. (#59440) Thanks @obviyus.
- ACP/gateway reconnects: keep ACP prompts alive across transient websocket drops while still failing boundedly when reconnect recovery does not complete. (#59473) Thanks @obviyus.
- Exec approvals/config: strip invalid `security`, `ask`, and `askFallback` values from `~/.openclaw/exec-approvals.json` during normalization so malformed policy enums fall back cleanly to the documented defaults instead of corrupting runtime policy resolution. (#59112) Thanks @openperf.
## 2026.4.1-beta.1

View File

@@ -83,7 +83,7 @@ describe("acp prompt cwd prefix", () => {
expect.objectContaining({
message: expect.stringMatching(/\[Working directory: ~[\\/]openclaw-test\]/),
}),
{ expectFinal: true },
{ timeoutMs: null },
);
});
@@ -94,7 +94,7 @@ describe("acp prompt cwd prefix", () => {
expect.objectContaining({
message: expect.stringContaining("[Working directory: ~\\openclaw-test]"),
}),
{ expectFinal: true },
{ timeoutMs: null },
);
});
@@ -111,7 +111,7 @@ describe("acp prompt cwd prefix", () => {
},
systemProvenanceReceipt: undefined,
}),
{ expectFinal: true },
{ timeoutMs: null },
);
});
@@ -128,28 +128,28 @@ describe("acp prompt cwd prefix", () => {
},
systemProvenanceReceipt: expect.stringContaining("[Source Receipt]"),
}),
{ expectFinal: true },
{ timeoutMs: null },
);
expect(requestSpy).toHaveBeenCalledWith(
"chat.send",
expect.objectContaining({
systemProvenanceReceipt: expect.stringContaining("bridge=openclaw-acp"),
}),
{ expectFinal: true },
{ timeoutMs: null },
);
expect(requestSpy).toHaveBeenCalledWith(
"chat.send",
expect.objectContaining({
systemProvenanceReceipt: expect.stringContaining(`originSessionId=${TEST_SESSION_ID}`),
}),
{ expectFinal: true },
{ timeoutMs: null },
);
expect(requestSpy).toHaveBeenCalledWith(
"chat.send",
expect.objectContaining({
systemProvenanceReceipt: expect.stringContaining(`targetSession=${TEST_SESSION_KEY}`),
}),
{ expectFinal: true },
{ timeoutMs: null },
);
});
@@ -192,7 +192,7 @@ describe("acp prompt cwd prefix", () => {
},
systemProvenanceReceipt: expect.stringContaining("[Source Receipt]"),
}),
{ expectFinal: true },
{ timeoutMs: null },
);
expect(requestSpy).toHaveBeenNthCalledWith(
2,
@@ -201,7 +201,7 @@ describe("acp prompt cwd prefix", () => {
systemInputProvenance: expect.anything(),
systemProvenanceReceipt: expect.anything(),
}),
{ expectFinal: true },
{ timeoutMs: null },
);
});
});

View File

@@ -108,4 +108,534 @@ describe("acp translator stop reason mapping", () => {
await expect(promptPromise).resolves.toEqual({ stopReason: "cancelled" });
});
it("keeps in-flight prompts pending across transient gateway disconnects", async () => {
const { agent, promptPromise, runId } = await createPendingPromptHarness();
const settleSpy = vi.fn();
void promptPromise.then(
(value) => settleSpy({ kind: "resolve", value }),
(error) => settleSpy({ kind: "reject", error }),
);
agent.handleGatewayDisconnect("1006: connection lost");
await Promise.resolve();
expect(settleSpy).not.toHaveBeenCalled();
agent.handleGatewayReconnect();
await agent.handleGatewayEvent(
createChatEvent({
runId,
sessionKey: "agent:main:main",
seq: 1,
state: "final",
}),
);
await expect(promptPromise).resolves.toEqual({ stopReason: "end_turn" });
});
it("rejects in-flight prompts when the gateway does not reconnect before the grace window", async () => {
vi.useFakeTimers();
try {
const { agent, promptPromise } = await createPendingPromptHarness();
void promptPromise.catch(() => {});
agent.handleGatewayDisconnect("1006: connection lost");
await vi.advanceTimersByTimeAsync(5_000);
await expect(promptPromise).rejects.toThrow("Gateway disconnected: 1006: connection lost");
} finally {
vi.useRealTimers();
}
});
it("keeps pre-ack send disconnects inside the reconnect grace window", async () => {
vi.useFakeTimers();
try {
const sessionStore = createInMemorySessionStore();
sessionStore.createSession({
sessionId: "session-1",
sessionKey: "agent:main:main",
cwd: "/tmp",
});
const request = vi.fn(async (method: string) => {
if (method === "chat.send") {
throw new Error("gateway closed (1006): connection lost");
}
return {};
}) as GatewayClient["request"];
const agent = new AcpGatewayAgent(createAcpConnection(), createAcpGateway(request), {
sessionStore,
});
const promptPromise = agent.prompt({
sessionId: "session-1",
prompt: [{ type: "text", text: "hello" }],
_meta: {},
} as unknown as PromptRequest);
const settleSpy = vi.fn();
void promptPromise.then(
(value) => settleSpy({ kind: "resolve", value }),
(error) => settleSpy({ kind: "reject", error }),
);
await Promise.resolve();
expect(settleSpy).not.toHaveBeenCalled();
agent.handleGatewayDisconnect("1006: connection lost");
await vi.advanceTimersByTimeAsync(4_999);
expect(settleSpy).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(1);
await expect(promptPromise).rejects.toThrow("Gateway disconnected: 1006: connection lost");
} finally {
vi.useRealTimers();
}
});
it("reconciles a missed final event on reconnect via agent.wait", async () => {
const sessionId = "session-1";
const sessionKey = "agent:main:main";
let runId: string | undefined;
const request = vi.fn(async (method: string, params?: Record<string, unknown>) => {
if (method === "chat.send") {
runId = params?.idempotencyKey as string | undefined;
return {};
}
if (method === "agent.wait") {
return { status: "ok" };
}
return {};
}) as GatewayClient["request"];
const sessionStore = createInMemorySessionStore();
sessionStore.createSession({
sessionId,
sessionKey,
cwd: "/tmp",
});
const agent = new AcpGatewayAgent(createAcpConnection(), createAcpGateway(request), {
sessionStore,
});
const promptPromise = agent.prompt({
sessionId,
prompt: [{ type: "text", text: "hello" }],
_meta: {},
} as unknown as PromptRequest);
await vi.waitFor(() => {
expect(runId).toBeDefined();
});
agent.handleGatewayDisconnect("1006: connection lost");
agent.handleGatewayReconnect();
await expect(promptPromise).resolves.toEqual({ stopReason: "end_turn" });
expect(request).toHaveBeenCalledWith(
"agent.wait",
{
runId,
timeoutMs: 0,
},
{ timeoutMs: null },
);
});
it("rechecks accepted prompts at the disconnect deadline after reconnect timeout", async () => {
vi.useFakeTimers();
try {
const sessionId = "session-1";
const sessionKey = "agent:main:main";
let waitCount = 0;
const request = vi.fn(async (method: string, params?: Record<string, unknown>) => {
if (method === "chat.send") {
return {};
}
if (method === "agent.wait") {
waitCount += 1;
expect(params).toEqual({
runId: expect.any(String),
timeoutMs: 0,
});
return waitCount === 1 ? { status: "timeout" } : { status: "ok" };
}
return {};
}) as GatewayClient["request"];
const sessionStore = createInMemorySessionStore();
sessionStore.createSession({
sessionId,
sessionKey,
cwd: "/tmp",
});
const agent = new AcpGatewayAgent(createAcpConnection(), createAcpGateway(request), {
sessionStore,
});
const promptPromise = agent.prompt({
sessionId,
prompt: [{ type: "text", text: "hello" }],
_meta: {},
} as unknown as PromptRequest);
const settleSpy = vi.fn();
void promptPromise.then(
(value) => settleSpy({ kind: "resolve", value }),
(error) => settleSpy({ kind: "reject", error }),
);
await Promise.resolve();
agent.handleGatewayDisconnect("1006: connection lost");
agent.handleGatewayReconnect();
await Promise.resolve();
await vi.advanceTimersByTimeAsync(4_999);
expect(settleSpy).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(1);
await expect(promptPromise).resolves.toEqual({ stopReason: "end_turn" });
} finally {
vi.useRealTimers();
}
});
it("does not clear a newer disconnect deadline while reconnect reconciliation is still running", async () => {
vi.useFakeTimers();
try {
const sessionId = "session-1";
const sessionKey = "agent:main:main";
let resolveAgentWait: ((value: { status: "timeout" }) => void) | undefined;
let agentWaitCount = 0;
const request = vi.fn(async (method: string) => {
if (method === "chat.send") {
return {};
}
if (method === "agent.wait") {
agentWaitCount += 1;
if (agentWaitCount > 1) {
return { status: "timeout" };
}
return await new Promise<{ status: "timeout" }>((resolve) => {
resolveAgentWait = resolve;
});
}
return {};
}) as GatewayClient["request"];
const sessionStore = createInMemorySessionStore();
sessionStore.createSession({
sessionId,
sessionKey,
cwd: "/tmp",
});
const agent = new AcpGatewayAgent(createAcpConnection(), createAcpGateway(request), {
sessionStore,
});
const promptPromise = agent.prompt({
sessionId,
prompt: [{ type: "text", text: "hello" }],
_meta: {},
} as unknown as PromptRequest);
const settleSpy = vi.fn();
void promptPromise.then(
(value) => settleSpy({ kind: "resolve", value }),
(error) => settleSpy({ kind: "reject", error }),
);
await Promise.resolve();
agent.handleGatewayDisconnect("1006: first disconnect");
agent.handleGatewayReconnect();
for (let attempt = 0; attempt < 5 && !resolveAgentWait; attempt += 1) {
await Promise.resolve();
}
expect(resolveAgentWait).toBeDefined();
agent.handleGatewayDisconnect("1006: second disconnect");
resolveAgentWait?.({ status: "timeout" });
await Promise.resolve();
await vi.advanceTimersByTimeAsync(4_999);
expect(settleSpy).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(1);
await expect(promptPromise).rejects.toThrow("Gateway disconnected: 1006: second disconnect");
} finally {
vi.useRealTimers();
}
});
it("keeps the disconnect deadline after reconnect when a pre-ack send never started", async () => {
vi.useFakeTimers();
try {
const sessionId = "session-1";
const sessionKey = "agent:main:main";
const request = vi.fn(async (method: string) => {
if (method === "chat.send") {
throw new Error("gateway closed (1006): connection lost");
}
if (method === "agent.wait") {
return { status: "timeout" };
}
return {};
}) as GatewayClient["request"];
const sessionStore = createInMemorySessionStore();
sessionStore.createSession({
sessionId,
sessionKey,
cwd: "/tmp",
});
const agent = new AcpGatewayAgent(createAcpConnection(), createAcpGateway(request), {
sessionStore,
});
const promptPromise = agent.prompt({
sessionId,
prompt: [{ type: "text", text: "hello" }],
_meta: {},
} as unknown as PromptRequest);
void promptPromise.catch(() => {});
await Promise.resolve();
agent.handleGatewayDisconnect("1006: connection lost");
agent.handleGatewayReconnect();
await Promise.resolve();
await vi.advanceTimersByTimeAsync(4_999);
await expect(Promise.race([promptPromise, Promise.resolve("pending")])).resolves.toBe(
"pending",
);
await vi.advanceTimersByTimeAsync(1);
await expect(promptPromise).rejects.toThrow("Gateway disconnected: 1006: connection lost");
} finally {
vi.useRealTimers();
}
});
it("rejects a superseded pre-ack prompt when a newer prompt has replaced the session entry", async () => {
const sessionId = "session-1";
const sessionKey = "agent:main:main";
let promptCount = 0;
const request = vi.fn(async (method: string) => {
if (method !== "chat.send") {
return {};
}
promptCount += 1;
if (promptCount === 1) {
throw new Error("gateway closed (1006): connection lost");
}
return {};
}) as GatewayClient["request"];
const sessionStore = createInMemorySessionStore();
sessionStore.createSession({
sessionId,
sessionKey,
cwd: "/tmp",
});
const agent = new AcpGatewayAgent(createAcpConnection(), createAcpGateway(request), {
sessionStore,
});
const firstPrompt = agent.prompt({
sessionId,
prompt: [{ type: "text", text: "first" }],
_meta: {},
} as unknown as PromptRequest);
await Promise.resolve();
const secondPrompt = agent.prompt({
sessionId,
prompt: [{ type: "text", text: "second" }],
_meta: {},
} as unknown as PromptRequest);
await expect(firstPrompt).rejects.toThrow("gateway closed (1006): connection lost");
await expect(Promise.race([secondPrompt, Promise.resolve("pending")])).resolves.toBe("pending");
});
it("keeps disconnect deadline when a superseded send resolves late", async () => {
vi.useFakeTimers();
try {
const sessionId = "session-1";
const sessionKey = "agent:main:main";
let firstSendResolve: (() => void) | undefined;
let sendCount = 0;
const request = vi.fn(async (method: string) => {
if (method === "chat.send") {
sendCount += 1;
if (sendCount === 1) {
return await new Promise<void>((resolve) => {
firstSendResolve = resolve;
});
}
throw new Error("gateway closed (1006): connection lost");
}
if (method === "agent.wait") {
return { status: "timeout" };
}
return {};
}) as GatewayClient["request"];
const sessionStore = createInMemorySessionStore();
sessionStore.createSession({
sessionId,
sessionKey,
cwd: "/tmp",
});
const agent = new AcpGatewayAgent(createAcpConnection(), createAcpGateway(request), {
sessionStore,
});
const firstPrompt = agent.prompt({
sessionId,
prompt: [{ type: "text", text: "first" }],
_meta: {},
} as unknown as PromptRequest);
void firstPrompt.catch(() => {});
await Promise.resolve();
expect(firstSendResolve).toBeDefined();
const secondPrompt = agent.prompt({
sessionId,
prompt: [{ type: "text", text: "second" }],
_meta: {},
} as unknown as PromptRequest);
void secondPrompt.catch(() => {});
await Promise.resolve();
expect(sendCount).toBe(2);
firstSendResolve?.();
await Promise.resolve();
agent.handleGatewayDisconnect("1006: connection lost");
agent.handleGatewayReconnect();
await vi.advanceTimersByTimeAsync(5_000);
await expect(secondPrompt).rejects.toThrow("Gateway disconnected: 1006: connection lost");
} finally {
vi.useRealTimers();
}
});
it("rejects only unrecovered prompts after reconnect reconciliation", async () => {
vi.useFakeTimers();
try {
let acceptedRunId: string | undefined;
let acceptedWaitCount = 0;
const requestMock = vi.fn(async (method: string, params?: Record<string, unknown>) => {
if (method === "chat.send") {
return params?.sessionKey === "agent:main:second"
? Promise.reject(new Error("gateway closed (1006): connection lost"))
: {};
}
if (method === "agent.wait") {
return params?.runId === acceptedRunId && acceptedRunId
? acceptedWaitCount++ === 0
? { status: "timeout" }
: { status: "ok" }
: { status: "timeout" };
}
return {};
});
const request = requestMock as GatewayClient["request"];
const sessionStore = createInMemorySessionStore();
sessionStore.createSession({
sessionId: "session-1",
sessionKey: "agent:main:first",
cwd: "/tmp",
});
sessionStore.createSession({
sessionId: "session-2",
sessionKey: "agent:main:second",
cwd: "/tmp",
});
const agent = new AcpGatewayAgent(createAcpConnection(), createAcpGateway(request), {
sessionStore,
});
const acceptedPrompt = agent.prompt({
sessionId: "session-1",
prompt: [{ type: "text", text: "accepted" }],
_meta: {},
} as unknown as PromptRequest);
const preAckPrompt = agent.prompt({
sessionId: "session-2",
prompt: [{ type: "text", text: "pre-ack" }],
_meta: {},
} as unknown as PromptRequest);
const acceptedSettleSpy = vi.fn();
void acceptedPrompt.then(
(value) => acceptedSettleSpy({ kind: "resolve", value }),
(error) => acceptedSettleSpy({ kind: "reject", error }),
);
void preAckPrompt.catch(() => {});
await Promise.resolve();
acceptedRunId = requestMock.mock.calls.find((call) => {
const [method, requestParams] = call;
return method === "chat.send" && requestParams?.sessionKey === "agent:main:first";
})?.[1]?.idempotencyKey as string | undefined;
agent.handleGatewayDisconnect("1006: connection lost");
agent.handleGatewayReconnect();
await Promise.resolve();
await vi.advanceTimersByTimeAsync(5_000);
await expect(acceptedPrompt).resolves.toEqual({ stopReason: "end_turn" });
await expect(preAckPrompt).rejects.toThrow("Gateway disconnected: 1006: connection lost");
} finally {
vi.useRealTimers();
}
});
it("does not let a stale disconnect deadline reject a newer prompt on the same session", async () => {
vi.useFakeTimers();
try {
const sessionId = "session-1";
const sessionKey = "agent:main:main";
let sendCount = 0;
const requestMock = vi.fn(async (method: string, params?: Record<string, unknown>) => {
if (method === "chat.send") {
sendCount += 1;
if (sendCount === 1) {
throw new Error("gateway closed (1006): connection lost");
}
return {};
}
if (method === "agent.wait") {
return params?.runId === firstRunId ? { status: "timeout" } : { status: "ok" };
}
return {};
});
const request = requestMock as GatewayClient["request"];
const sessionStore = createInMemorySessionStore();
sessionStore.createSession({
sessionId,
sessionKey,
cwd: "/tmp",
});
const agent = new AcpGatewayAgent(createAcpConnection(), createAcpGateway(request), {
sessionStore,
});
const firstPrompt = agent.prompt({
sessionId,
prompt: [{ type: "text", text: "first" }],
_meta: {},
} as unknown as PromptRequest);
void firstPrompt.catch(() => {});
await Promise.resolve();
const firstRunId = requestMock.mock.calls[0]?.[1]?.idempotencyKey as string;
agent.handleGatewayDisconnect("1006: connection lost");
agent.handleGatewayReconnect();
await Promise.resolve();
const secondPrompt = agent.prompt({
sessionId,
prompt: [{ type: "text", text: "second" }],
_meta: {},
} as unknown as PromptRequest);
await vi.advanceTimersByTimeAsync(5_000);
await expect(Promise.race([secondPrompt, Promise.resolve("pending")])).resolves.toBe(
"pending",
);
} finally {
vi.useRealTimers();
}
});
});

View File

@@ -59,11 +59,13 @@ const ACP_REASONING_LEVEL_CONFIG_ID = "reasoning_level";
const ACP_RESPONSE_USAGE_CONFIG_ID = "response_usage";
const ACP_ELEVATED_LEVEL_CONFIG_ID = "elevated_level";
const ACP_LOAD_SESSION_REPLAY_LIMIT = 1_000_000;
const ACP_GATEWAY_DISCONNECT_GRACE_MS = 5_000;
type PendingPrompt = {
sessionId: string;
sessionKey: string;
idempotencyKey: string;
sendAccepted?: boolean;
resolve: (response: PromptResponse) => void;
reject: (err: Error) => void;
sentTextLength?: number;
@@ -133,11 +135,20 @@ function isAdminScopeProvenanceRejection(err: unknown): boolean {
);
}
function isGatewayCloseError(err: unknown): boolean {
return err instanceof Error && err.message.startsWith("gateway closed (");
}
type SessionSnapshot = SessionPresentation & {
metadata?: SessionMetadata;
usage?: SessionUsageSnapshot;
};
type AgentWaitResult = {
status?: "ok" | "error" | "timeout";
error?: string;
};
type GatewayTranscriptMessage = {
role?: unknown;
content?: unknown;
@@ -398,6 +409,17 @@ export class AcpGatewayAgent implements Agent {
private sessionStore: AcpSessionStore;
private sessionCreateRateLimiter: FixedWindowRateLimiter;
private pendingPrompts = new Map<string, PendingPrompt>();
private disconnectDeadlineRunIds = new Map<string, string>();
private disconnectTimer: NodeJS.Timeout | null = null;
private disconnectGeneration = 0;
private getPendingPrompt(sessionId: string, runId: string): PendingPrompt | undefined {
const pending = this.pendingPrompts.get(sessionId);
if (pending?.idempotencyKey !== runId) {
return undefined;
}
return pending;
}
constructor(
connection: AgentSideConnection,
@@ -427,15 +449,27 @@ export class AcpGatewayAgent implements Agent {
handleGatewayReconnect(): void {
this.log("gateway reconnected");
void this.reconcilePendingPromptsOnReconnect(this.disconnectGeneration);
}
handleGatewayDisconnect(reason: string): void {
this.log(`gateway disconnected: ${reason}`);
for (const pending of this.pendingPrompts.values()) {
pending.reject(new Error(`Gateway disconnected: ${reason}`));
this.sessionStore.clearActiveRun(pending.sessionId);
if (this.pendingPrompts.size === 0) {
return;
}
this.pendingPrompts.clear();
this.disconnectGeneration += 1;
this.disconnectDeadlineRunIds = new Map(
[...this.pendingPrompts.entries()].map(([sessionId, pending]) => [
sessionId,
pending.idempotencyKey,
]),
);
this.clearDisconnectTimer();
this.disconnectTimer = setTimeout(() => {
this.disconnectTimer = null;
void this.expireDisconnectDeadline(new Error(`Gateway disconnected: ${reason}`));
}, ACP_GATEWAY_DISCONNECT_GRACE_MS);
this.disconnectTimer.unref?.();
}
async handleGatewayEvent(evt: EventFrame): Promise<void> {
@@ -685,14 +719,22 @@ export class AcpGatewayAgent implements Agent {
systemInputProvenance,
systemProvenanceReceipt,
},
{ expectFinal: true },
{ timeoutMs: null },
);
const pending = this.getPendingPrompt(params.sessionId, runId);
if (pending) {
pending.sendAccepted = true;
}
} catch (err) {
if (
(systemInputProvenance || systemProvenanceReceipt) &&
isAdminScopeProvenanceRejection(err)
) {
await this.gateway.request("chat.send", requestParams, { expectFinal: true });
await this.gateway.request("chat.send", requestParams, { timeoutMs: null });
const pending = this.getPendingPrompt(params.sessionId, runId);
if (pending) {
pending.sendAccepted = true;
}
return;
}
throw err;
@@ -700,8 +742,15 @@ export class AcpGatewayAgent implements Agent {
};
void sendWithProvenanceFallback().catch((err) => {
if (isGatewayCloseError(err) && this.getPendingPrompt(params.sessionId, runId)) {
return;
}
this.pendingPrompts.delete(params.sessionId);
this.disconnectDeadlineRunIds.delete(params.sessionId);
this.sessionStore.clearActiveRun(params.sessionId);
if (this.pendingPrompts.size === 0) {
this.clearDisconnectTimer();
}
reject(err instanceof Error ? err : new Error(String(err)));
});
});
@@ -733,6 +782,9 @@ export class AcpGatewayAgent implements Agent {
if (pending) {
this.pendingPrompts.delete(params.sessionId);
if (this.pendingPrompts.size === 0) {
this.clearDisconnectTimer();
}
pending.resolve({ stopReason: "cancelled" });
}
}
@@ -957,7 +1009,11 @@ export class AcpGatewayAgent implements Agent {
stopReason: StopReason,
): Promise<void> {
this.pendingPrompts.delete(sessionId);
this.disconnectDeadlineRunIds.delete(sessionId);
this.sessionStore.clearActiveRun(sessionId);
if (this.pendingPrompts.size === 0) {
this.clearDisconnectTimer();
}
const sessionSnapshot = await this.getSessionSnapshot(pending.sessionKey);
try {
await this.sendSessionSnapshotUpdate(sessionId, sessionSnapshot, {
@@ -982,6 +1038,134 @@ export class AcpGatewayAgent implements Agent {
return undefined;
}
private clearDisconnectTimer(): void {
if (!this.disconnectTimer) {
return;
}
clearTimeout(this.disconnectTimer);
this.disconnectTimer = null;
}
private rejectPendingPrompts(
error: Error,
promptRuns: Iterable<readonly [string, string]>,
): void {
for (const [sessionId, runId] of promptRuns) {
const pending = this.getPendingPrompt(sessionId, runId);
if (!pending) {
continue;
}
pending.reject(error);
this.sessionStore.clearActiveRun(pending.sessionId);
this.pendingPrompts.delete(sessionId);
this.disconnectDeadlineRunIds.delete(sessionId);
}
}
private async expireDisconnectDeadline(error: Error): Promise<void> {
const deadlinePromptRuns = [...this.disconnectDeadlineRunIds.entries()];
for (const [sessionId, runId] of deadlinePromptRuns) {
const pending = this.getPendingPrompt(sessionId, runId);
if (!pending) {
this.disconnectDeadlineRunIds.delete(sessionId);
continue;
}
if (!pending.sendAccepted) {
this.rejectPendingPrompts(error, [[sessionId, runId]]);
continue;
}
let result: AgentWaitResult | undefined;
try {
result = await this.gateway.request<AgentWaitResult>(
"agent.wait",
{
runId,
timeoutMs: 0,
},
{ timeoutMs: null },
);
} catch {
this.rejectPendingPrompts(error, [[sessionId, runId]]);
continue;
}
const currentPending = this.getPendingPrompt(sessionId, runId);
if (!currentPending) {
continue;
}
if (result?.status === "ok") {
await this.finishPrompt(sessionId, currentPending, "end_turn");
continue;
}
if (result?.status === "error") {
void this.finishPrompt(sessionId, currentPending, "end_turn");
continue;
}
this.rejectPendingPrompts(error, [[sessionId, runId]]);
}
}
private async reconcilePendingPromptsOnReconnect(
observedDisconnectGeneration: number,
): Promise<void> {
if (this.pendingPrompts.size === 0) {
if (this.disconnectGeneration === observedDisconnectGeneration) {
this.clearDisconnectTimer();
}
return;
}
const pendingEntries = [...this.pendingPrompts.entries()];
let keepDisconnectTimer = false;
const nextDisconnectDeadlineRunIds = new Map<string, string>();
for (const [sessionId, pending] of pendingEntries) {
if (this.pendingPrompts.get(sessionId) !== pending) {
continue;
}
let result: AgentWaitResult | undefined;
try {
result = await this.gateway.request<AgentWaitResult>(
"agent.wait",
{
runId: pending.idempotencyKey,
timeoutMs: 0,
},
{ timeoutMs: null },
);
} catch (err) {
this.log(`agent.wait reconcile failed for ${pending.idempotencyKey}: ${String(err)}`);
keepDisconnectTimer = true;
nextDisconnectDeadlineRunIds.set(sessionId, pending.idempotencyKey);
continue;
}
if (this.pendingPrompts.get(sessionId) !== pending) {
continue;
}
if (result?.status === "ok") {
await this.finishPrompt(sessionId, pending, "end_turn");
continue;
}
if (result?.status === "error") {
void this.finishPrompt(sessionId, pending, "end_turn");
continue;
}
if (result?.status === "timeout") {
keepDisconnectTimer = true;
nextDisconnectDeadlineRunIds.set(sessionId, pending.idempotencyKey);
continue;
}
}
if (this.disconnectGeneration === observedDisconnectGeneration) {
this.disconnectDeadlineRunIds = nextDisconnectDeadlineRunIds;
}
if (!keepDisconnectTimer && this.disconnectGeneration === observedDisconnectGeneration) {
this.clearDisconnectTimer();
}
}
private async sendAvailableCommands(sessionId: string): Promise<void> {
await this.connection.sessionUpdate({
sessionId,