mirror of
https://github.com/moltbot/moltbot.git
synced 2026-03-07 22:44:16 +00:00
refactor(acp): split session tests and share rate limiter
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import type { GatewayClient } from "../gateway/client.js";
|
||||
import { parseSessionMeta, resolveSessionKey } from "./session-mapper.js";
|
||||
import { createInMemorySessionStore } from "./session.js";
|
||||
|
||||
function createGateway(resolveLabelKey = "agent:main:label"): {
|
||||
gateway: GatewayClient;
|
||||
@@ -55,145 +54,3 @@ describe("acp session mapper", () => {
|
||||
expect(request).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe("acp session manager", () => {
|
||||
let nowMs = 0;
|
||||
const now = () => nowMs;
|
||||
const advance = (ms: number) => {
|
||||
nowMs += ms;
|
||||
};
|
||||
let store = createInMemorySessionStore({ now });
|
||||
|
||||
beforeEach(() => {
|
||||
nowMs = 1_000;
|
||||
store = createInMemorySessionStore({ now });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
store.clearAllSessionsForTest();
|
||||
});
|
||||
|
||||
it("tracks active runs and clears on cancel", () => {
|
||||
const session = store.createSession({
|
||||
sessionKey: "acp:test",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
const controller = new AbortController();
|
||||
store.setActiveRun(session.sessionId, "run-1", controller);
|
||||
|
||||
expect(store.getSessionByRunId("run-1")?.sessionId).toBe(session.sessionId);
|
||||
|
||||
const cancelled = store.cancelActiveRun(session.sessionId);
|
||||
expect(cancelled).toBe(true);
|
||||
expect(store.getSessionByRunId("run-1")).toBeUndefined();
|
||||
});
|
||||
|
||||
it("refreshes existing session IDs instead of creating duplicates", () => {
|
||||
const first = store.createSession({
|
||||
sessionId: "existing",
|
||||
sessionKey: "acp:one",
|
||||
cwd: "/tmp/one",
|
||||
});
|
||||
advance(500);
|
||||
|
||||
const refreshed = store.createSession({
|
||||
sessionId: "existing",
|
||||
sessionKey: "acp:two",
|
||||
cwd: "/tmp/two",
|
||||
});
|
||||
|
||||
expect(refreshed).toBe(first);
|
||||
expect(refreshed.sessionKey).toBe("acp:two");
|
||||
expect(refreshed.cwd).toBe("/tmp/two");
|
||||
expect(refreshed.createdAt).toBe(1_000);
|
||||
expect(refreshed.lastTouchedAt).toBe(1_500);
|
||||
});
|
||||
|
||||
it("reaps idle sessions before enforcing the max session cap", () => {
|
||||
const boundedStore = createInMemorySessionStore({
|
||||
maxSessions: 1,
|
||||
idleTtlMs: 1_000,
|
||||
now,
|
||||
});
|
||||
try {
|
||||
boundedStore.createSession({
|
||||
sessionId: "old",
|
||||
sessionKey: "acp:old",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
advance(2_000);
|
||||
const fresh = boundedStore.createSession({
|
||||
sessionId: "fresh",
|
||||
sessionKey: "acp:fresh",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
|
||||
expect(fresh.sessionId).toBe("fresh");
|
||||
expect(boundedStore.getSession("old")).toBeUndefined();
|
||||
} finally {
|
||||
boundedStore.clearAllSessionsForTest();
|
||||
}
|
||||
});
|
||||
|
||||
it("uses soft-cap eviction for the oldest idle session when full", () => {
|
||||
const boundedStore = createInMemorySessionStore({
|
||||
maxSessions: 2,
|
||||
idleTtlMs: 24 * 60 * 60 * 1_000,
|
||||
now,
|
||||
});
|
||||
try {
|
||||
const first = boundedStore.createSession({
|
||||
sessionId: "first",
|
||||
sessionKey: "acp:first",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
advance(100);
|
||||
const second = boundedStore.createSession({
|
||||
sessionId: "second",
|
||||
sessionKey: "acp:second",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
const controller = new AbortController();
|
||||
boundedStore.setActiveRun(second.sessionId, "run-2", controller);
|
||||
advance(100);
|
||||
|
||||
const third = boundedStore.createSession({
|
||||
sessionId: "third",
|
||||
sessionKey: "acp:third",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
|
||||
expect(third.sessionId).toBe("third");
|
||||
expect(boundedStore.getSession(first.sessionId)).toBeUndefined();
|
||||
expect(boundedStore.getSession(second.sessionId)).toBeDefined();
|
||||
} finally {
|
||||
boundedStore.clearAllSessionsForTest();
|
||||
}
|
||||
});
|
||||
|
||||
it("rejects when full and no session is evictable", () => {
|
||||
const boundedStore = createInMemorySessionStore({
|
||||
maxSessions: 1,
|
||||
idleTtlMs: 24 * 60 * 60 * 1_000,
|
||||
now,
|
||||
});
|
||||
try {
|
||||
const only = boundedStore.createSession({
|
||||
sessionId: "only",
|
||||
sessionKey: "acp:only",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
boundedStore.setActiveRun(only.sessionId, "run-only", new AbortController());
|
||||
|
||||
expect(() =>
|
||||
boundedStore.createSession({
|
||||
sessionId: "next",
|
||||
sessionKey: "acp:next",
|
||||
cwd: "/tmp",
|
||||
}),
|
||||
).toThrow(/session limit reached/i);
|
||||
} finally {
|
||||
boundedStore.clearAllSessionsForTest();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
146
src/acp/session.test.ts
Normal file
146
src/acp/session.test.ts
Normal file
@@ -0,0 +1,146 @@
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
import { createInMemorySessionStore } from "./session.js";
|
||||
|
||||
describe("acp session manager", () => {
|
||||
let nowMs = 0;
|
||||
const now = () => nowMs;
|
||||
const advance = (ms: number) => {
|
||||
nowMs += ms;
|
||||
};
|
||||
let store = createInMemorySessionStore({ now });
|
||||
|
||||
beforeEach(() => {
|
||||
nowMs = 1_000;
|
||||
store = createInMemorySessionStore({ now });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
store.clearAllSessionsForTest();
|
||||
});
|
||||
|
||||
it("tracks active runs and clears on cancel", () => {
|
||||
const session = store.createSession({
|
||||
sessionKey: "acp:test",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
const controller = new AbortController();
|
||||
store.setActiveRun(session.sessionId, "run-1", controller);
|
||||
|
||||
expect(store.getSessionByRunId("run-1")?.sessionId).toBe(session.sessionId);
|
||||
|
||||
const cancelled = store.cancelActiveRun(session.sessionId);
|
||||
expect(cancelled).toBe(true);
|
||||
expect(store.getSessionByRunId("run-1")).toBeUndefined();
|
||||
});
|
||||
|
||||
it("refreshes existing session IDs instead of creating duplicates", () => {
|
||||
const first = store.createSession({
|
||||
sessionId: "existing",
|
||||
sessionKey: "acp:one",
|
||||
cwd: "/tmp/one",
|
||||
});
|
||||
advance(500);
|
||||
|
||||
const refreshed = store.createSession({
|
||||
sessionId: "existing",
|
||||
sessionKey: "acp:two",
|
||||
cwd: "/tmp/two",
|
||||
});
|
||||
|
||||
expect(refreshed).toBe(first);
|
||||
expect(refreshed.sessionKey).toBe("acp:two");
|
||||
expect(refreshed.cwd).toBe("/tmp/two");
|
||||
expect(refreshed.createdAt).toBe(1_000);
|
||||
expect(refreshed.lastTouchedAt).toBe(1_500);
|
||||
expect(store.hasSession("existing")).toBe(true);
|
||||
});
|
||||
|
||||
it("reaps idle sessions before enforcing the max session cap", () => {
|
||||
const boundedStore = createInMemorySessionStore({
|
||||
maxSessions: 1,
|
||||
idleTtlMs: 1_000,
|
||||
now,
|
||||
});
|
||||
try {
|
||||
boundedStore.createSession({
|
||||
sessionId: "old",
|
||||
sessionKey: "acp:old",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
advance(2_000);
|
||||
const fresh = boundedStore.createSession({
|
||||
sessionId: "fresh",
|
||||
sessionKey: "acp:fresh",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
|
||||
expect(fresh.sessionId).toBe("fresh");
|
||||
expect(boundedStore.getSession("old")).toBeUndefined();
|
||||
expect(boundedStore.hasSession("old")).toBe(false);
|
||||
} finally {
|
||||
boundedStore.clearAllSessionsForTest();
|
||||
}
|
||||
});
|
||||
|
||||
it("uses soft-cap eviction for the oldest idle session when full", () => {
|
||||
const boundedStore = createInMemorySessionStore({
|
||||
maxSessions: 2,
|
||||
idleTtlMs: 24 * 60 * 60 * 1_000,
|
||||
now,
|
||||
});
|
||||
try {
|
||||
const first = boundedStore.createSession({
|
||||
sessionId: "first",
|
||||
sessionKey: "acp:first",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
advance(100);
|
||||
const second = boundedStore.createSession({
|
||||
sessionId: "second",
|
||||
sessionKey: "acp:second",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
const controller = new AbortController();
|
||||
boundedStore.setActiveRun(second.sessionId, "run-2", controller);
|
||||
advance(100);
|
||||
|
||||
const third = boundedStore.createSession({
|
||||
sessionId: "third",
|
||||
sessionKey: "acp:third",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
|
||||
expect(third.sessionId).toBe("third");
|
||||
expect(boundedStore.getSession(first.sessionId)).toBeUndefined();
|
||||
expect(boundedStore.getSession(second.sessionId)).toBeDefined();
|
||||
} finally {
|
||||
boundedStore.clearAllSessionsForTest();
|
||||
}
|
||||
});
|
||||
|
||||
it("rejects when full and no session is evictable", () => {
|
||||
const boundedStore = createInMemorySessionStore({
|
||||
maxSessions: 1,
|
||||
idleTtlMs: 24 * 60 * 60 * 1_000,
|
||||
now,
|
||||
});
|
||||
try {
|
||||
const only = boundedStore.createSession({
|
||||
sessionId: "only",
|
||||
sessionKey: "acp:only",
|
||||
cwd: "/tmp",
|
||||
});
|
||||
boundedStore.setActiveRun(only.sessionId, "run-only", new AbortController());
|
||||
|
||||
expect(() =>
|
||||
boundedStore.createSession({
|
||||
sessionId: "next",
|
||||
sessionKey: "acp:next",
|
||||
cwd: "/tmp",
|
||||
}),
|
||||
).toThrow(/session limit reached/i);
|
||||
} finally {
|
||||
boundedStore.clearAllSessionsForTest();
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -3,6 +3,7 @@ import type { AcpSession } from "./types.js";
|
||||
|
||||
export type AcpSessionStore = {
|
||||
createSession: (params: { sessionKey: string; cwd: string; sessionId?: string }) => AcpSession;
|
||||
hasSession: (sessionId: string) => boolean;
|
||||
getSession: (sessionId: string) => AcpSession | undefined;
|
||||
getSessionByRunId: (runId: string) => AcpSession | undefined;
|
||||
setActiveRun: (sessionId: string, runId: string, abortController: AbortController) => void;
|
||||
@@ -105,6 +106,8 @@ export function createInMemorySessionStore(options: AcpSessionStoreOptions = {})
|
||||
return session;
|
||||
};
|
||||
|
||||
const hasSession: AcpSessionStore["hasSession"] = (sessionId) => sessions.has(sessionId);
|
||||
|
||||
const getSession: AcpSessionStore["getSession"] = (sessionId) => {
|
||||
const session = sessions.get(sessionId);
|
||||
if (session) {
|
||||
@@ -174,6 +177,7 @@ export function createInMemorySessionStore(options: AcpSessionStoreOptions = {})
|
||||
|
||||
return {
|
||||
createSession,
|
||||
hasSession,
|
||||
getSession,
|
||||
getSessionByRunId,
|
||||
setActiveRun,
|
||||
|
||||
@@ -23,6 +23,10 @@ import { PROTOCOL_VERSION } from "@agentclientprotocol/sdk";
|
||||
import type { GatewayClient } from "../gateway/client.js";
|
||||
import type { EventFrame } from "../gateway/protocol/index.js";
|
||||
import type { SessionsListResult } from "../gateway/session-utils.js";
|
||||
import {
|
||||
createFixedWindowRateLimiter,
|
||||
type FixedWindowRateLimiter,
|
||||
} from "../infra/fixed-window-rate-limit.js";
|
||||
import { getAvailableCommands } from "./commands.js";
|
||||
import {
|
||||
extractAttachmentsFromPrompt,
|
||||
@@ -53,47 +57,13 @@ type AcpGatewayAgentOptions = AcpServerOptions & {
|
||||
const SESSION_CREATE_RATE_LIMIT_DEFAULT_MAX_REQUESTS = 120;
|
||||
const SESSION_CREATE_RATE_LIMIT_DEFAULT_WINDOW_MS = 10_000;
|
||||
|
||||
class SessionCreateRateLimiter {
|
||||
private count = 0;
|
||||
private windowStartMs = 0;
|
||||
|
||||
constructor(
|
||||
private readonly maxRequests: number,
|
||||
private readonly windowMs: number,
|
||||
private readonly now: () => number = Date.now,
|
||||
) {}
|
||||
|
||||
consume(): { allowed: boolean; retryAfterMs: number; remaining: number } {
|
||||
const nowMs = this.now();
|
||||
if (nowMs - this.windowStartMs >= this.windowMs) {
|
||||
this.windowStartMs = nowMs;
|
||||
this.count = 0;
|
||||
}
|
||||
|
||||
if (this.count >= this.maxRequests) {
|
||||
return {
|
||||
allowed: false,
|
||||
retryAfterMs: Math.max(0, this.windowStartMs + this.windowMs - nowMs),
|
||||
remaining: 0,
|
||||
};
|
||||
}
|
||||
|
||||
this.count += 1;
|
||||
return {
|
||||
allowed: true,
|
||||
retryAfterMs: 0,
|
||||
remaining: Math.max(0, this.maxRequests - this.count),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export class AcpGatewayAgent implements Agent {
|
||||
private connection: AgentSideConnection;
|
||||
private gateway: GatewayClient;
|
||||
private opts: AcpGatewayAgentOptions;
|
||||
private log: (msg: string) => void;
|
||||
private sessionStore: AcpSessionStore;
|
||||
private sessionCreateRateLimiter: SessionCreateRateLimiter;
|
||||
private sessionCreateRateLimiter: FixedWindowRateLimiter;
|
||||
private pendingPrompts = new Map<string, PendingPrompt>();
|
||||
|
||||
constructor(
|
||||
@@ -106,16 +76,16 @@ export class AcpGatewayAgent implements Agent {
|
||||
this.opts = opts;
|
||||
this.log = opts.verbose ? (msg: string) => process.stderr.write(`[acp] ${msg}\n`) : () => {};
|
||||
this.sessionStore = opts.sessionStore ?? defaultAcpSessionStore;
|
||||
this.sessionCreateRateLimiter = new SessionCreateRateLimiter(
|
||||
Math.max(
|
||||
this.sessionCreateRateLimiter = createFixedWindowRateLimiter({
|
||||
maxRequests: Math.max(
|
||||
1,
|
||||
opts.sessionCreateRateLimit?.maxRequests ?? SESSION_CREATE_RATE_LIMIT_DEFAULT_MAX_REQUESTS,
|
||||
),
|
||||
Math.max(
|
||||
windowMs: Math.max(
|
||||
1_000,
|
||||
opts.sessionCreateRateLimit?.windowMs ?? SESSION_CREATE_RATE_LIMIT_DEFAULT_WINDOW_MS,
|
||||
),
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
start(): void {
|
||||
@@ -203,7 +173,7 @@ export class AcpGatewayAgent implements Agent {
|
||||
if (params.mcpServers.length > 0) {
|
||||
this.log(`ignoring ${params.mcpServers.length} MCP servers`);
|
||||
}
|
||||
if (!this.sessionStore.getSession(params.sessionId)) {
|
||||
if (!this.sessionStore.hasSession(params.sessionId)) {
|
||||
this.enforceSessionCreateRateLimit("loadSession");
|
||||
}
|
||||
|
||||
|
||||
31
src/infra/fixed-window-rate-limit.test.ts
Normal file
31
src/infra/fixed-window-rate-limit.test.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { createFixedWindowRateLimiter } from "./fixed-window-rate-limit.js";
|
||||
|
||||
describe("fixed-window rate limiter", () => {
|
||||
it("blocks after max requests until window reset", () => {
|
||||
let nowMs = 1_000;
|
||||
const limiter = createFixedWindowRateLimiter({
|
||||
maxRequests: 2,
|
||||
windowMs: 1_000,
|
||||
now: () => nowMs,
|
||||
});
|
||||
|
||||
expect(limiter.consume()).toMatchObject({ allowed: true, remaining: 1 });
|
||||
expect(limiter.consume()).toMatchObject({ allowed: true, remaining: 0 });
|
||||
expect(limiter.consume()).toMatchObject({ allowed: false, retryAfterMs: 1_000 });
|
||||
|
||||
nowMs += 1_000;
|
||||
expect(limiter.consume()).toMatchObject({ allowed: true, remaining: 1 });
|
||||
});
|
||||
|
||||
it("supports explicit reset", () => {
|
||||
const limiter = createFixedWindowRateLimiter({
|
||||
maxRequests: 1,
|
||||
windowMs: 10_000,
|
||||
});
|
||||
expect(limiter.consume().allowed).toBe(true);
|
||||
expect(limiter.consume().allowed).toBe(false);
|
||||
limiter.reset();
|
||||
expect(limiter.consume().allowed).toBe(true);
|
||||
});
|
||||
});
|
||||
48
src/infra/fixed-window-rate-limit.ts
Normal file
48
src/infra/fixed-window-rate-limit.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
export type FixedWindowRateLimiter = {
|
||||
consume: () => {
|
||||
allowed: boolean;
|
||||
retryAfterMs: number;
|
||||
remaining: number;
|
||||
};
|
||||
reset: () => void;
|
||||
};
|
||||
|
||||
export function createFixedWindowRateLimiter(params: {
|
||||
maxRequests: number;
|
||||
windowMs: number;
|
||||
now?: () => number;
|
||||
}): FixedWindowRateLimiter {
|
||||
const maxRequests = Math.max(1, Math.floor(params.maxRequests));
|
||||
const windowMs = Math.max(1, Math.floor(params.windowMs));
|
||||
const now = params.now ?? Date.now;
|
||||
|
||||
let count = 0;
|
||||
let windowStartMs = 0;
|
||||
|
||||
return {
|
||||
consume() {
|
||||
const nowMs = now();
|
||||
if (nowMs - windowStartMs >= windowMs) {
|
||||
windowStartMs = nowMs;
|
||||
count = 0;
|
||||
}
|
||||
if (count >= maxRequests) {
|
||||
return {
|
||||
allowed: false,
|
||||
retryAfterMs: Math.max(0, windowStartMs + windowMs - nowMs),
|
||||
remaining: 0,
|
||||
};
|
||||
}
|
||||
count += 1;
|
||||
return {
|
||||
allowed: true,
|
||||
retryAfterMs: 0,
|
||||
remaining: Math.max(0, maxRequests - count),
|
||||
};
|
||||
},
|
||||
reset() {
|
||||
count = 0;
|
||||
windowStartMs = 0;
|
||||
},
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user