From 7f9eaed2815520a9ef6600a593a85e29bba34842 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sat, 7 Mar 2026 09:42:15 -0800 Subject: [PATCH] Gateway: add request timeouts to client RPCs --- src/gateway/client.test.ts | 31 +++++++++++++++++++++++++++ src/gateway/client.ts | 43 ++++++++++++++++++++++++++++++++++---- 2 files changed, 70 insertions(+), 4 deletions(-) diff --git a/src/gateway/client.test.ts b/src/gateway/client.test.ts index c69cbef39ee..f69e0801376 100644 --- a/src/gateway/client.test.ts +++ b/src/gateway/client.test.ts @@ -19,11 +19,14 @@ type WsEventHandlers = { }; class MockWebSocket { + static readonly OPEN = 1; + static readonly CLOSED = 3; private openHandlers: WsEventHandlers["open"][] = []; private messageHandlers: WsEventHandlers["message"][] = []; private closeHandlers: WsEventHandlers["close"][] = []; private errorHandlers: WsEventHandlers["error"][] = []; readonly sent: string[] = []; + readyState = MockWebSocket.CLOSED; constructor(_url: string, _options?: unknown) { wsInstances.push(this); @@ -59,6 +62,7 @@ class MockWebSocket { } emitOpen(): void { + this.readyState = MockWebSocket.OPEN; for (const handler of this.openHandlers) { handler(); } @@ -71,6 +75,7 @@ class MockWebSocket { } emitClose(code: number, reason: string): void { + this.readyState = MockWebSocket.CLOSED; for (const handler of this.closeHandlers) { handler(code, Buffer.from(reason)); } @@ -438,4 +443,30 @@ describe("GatewayClient connect auth payload", () => { }); client.stop(); }); + + it("times out pending requests and cleans them up", async () => { + vi.useFakeTimers(); + try { + const client = new GatewayClient({ + url: "ws://127.0.0.1:18789", + requestTimeoutMs: 25, + }); + + client.start(); + const ws = getLatestWs(); + ws.emitOpen(); + + const pending = client.request("health.check"); + const observed = pending.catch((err) => err); + await vi.advanceTimersByTimeAsync(25); + + await expect(observed).resolves.toMatchObject({ + message: "gateway request timeout for health.check", + }); + expect((client as unknown as { pending: Map }).pending.size).toBe(0); + client.stop(); + } finally { + vi.useRealTimers(); + } + }); }); diff --git a/src/gateway/client.ts b/src/gateway/client.ts index a22d3471bb4..466a2c86aa9 100644 --- a/src/gateway/client.ts +++ b/src/gateway/client.ts @@ -39,12 +39,14 @@ type Pending = { resolve: (value: unknown) => void; reject: (err: unknown) => void; expectFinal: boolean; + cleanup?: () => void; }; export type GatewayClientOptions = { url?: string; // ws://127.0.0.1:18789 connectDelayMs?: number; tickWatchMinIntervalMs?: number; + requestTimeoutMs?: number; token?: string; deviceToken?: string; password?: string; @@ -442,6 +444,7 @@ export class GatewayClient { private flushPendingErrors(err: Error) { for (const [, p] of this.pending) { + p.cleanup?.(); p.reject(err); } this.pending.clear(); @@ -501,7 +504,7 @@ export class GatewayClient { async request>( method: string, params?: unknown, - opts?: { expectFinal?: boolean }, + opts?: { expectFinal?: boolean; timeoutMs?: number }, ): Promise { if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { throw new Error("gateway not connected"); @@ -514,14 +517,46 @@ export class GatewayClient { ); } const expectFinal = opts?.expectFinal === true; + const rawTimeoutMs = opts?.timeoutMs ?? this.opts.requestTimeoutMs; + const timeoutMs = + typeof rawTimeoutMs === "number" && Number.isFinite(rawTimeoutMs) + ? Math.max(1, Math.min(300_000, rawTimeoutMs)) + : 30_000; const p = new Promise((resolve, reject) => { + let timeout: NodeJS.Timeout | null = setTimeout(() => { + timeout = null; + this.pending.delete(id); + reject(new Error(`gateway request timeout for ${method}`)); + }, timeoutMs); + timeout.unref?.(); + const cleanup = () => { + if (!timeout) { + return; + } + clearTimeout(timeout); + timeout = null; + }; this.pending.set(id, { - resolve: (value) => resolve(value as T), - reject, + resolve: (value) => { + cleanup(); + resolve(value as T); + }, + reject: (err) => { + cleanup(); + reject(err); + }, expectFinal, + cleanup, }); }); - this.ws.send(JSON.stringify(frame)); + try { + this.ws.send(JSON.stringify(frame)); + } catch (err) { + const pending = this.pending.get(id); + pending?.cleanup?.(); + this.pending.delete(id); + throw err; + } return p; } }