mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-25 15:37:32 +00:00
perf(test): trim duplicate gateway and auto-reply test overhead
This commit is contained in:
@@ -164,51 +164,6 @@ describe("block streaming", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("drops final payloads when block replies streamed", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
const onBlockReply = vi.fn().mockResolvedValue(undefined);
|
||||
|
||||
const impl = async (params: RunEmbeddedPiAgentParams) => {
|
||||
void params.onBlockReply?.({ text: "chunk-1" });
|
||||
return {
|
||||
payloads: [{ text: "chunk-1\nchunk-2" }],
|
||||
meta: {
|
||||
durationMs: 5,
|
||||
agentMeta: { sessionId: "s", provider: "p", model: "m" },
|
||||
},
|
||||
};
|
||||
};
|
||||
piEmbeddedMock.runEmbeddedPiAgent.mockImplementation(impl);
|
||||
|
||||
const res = await getReplyFromConfig(
|
||||
{
|
||||
Body: "ping",
|
||||
From: "+1004",
|
||||
To: "+2000",
|
||||
MessageSid: "msg-124",
|
||||
Provider: "discord",
|
||||
},
|
||||
{
|
||||
onBlockReply,
|
||||
disableBlockStreaming: false,
|
||||
},
|
||||
{
|
||||
agents: {
|
||||
defaults: {
|
||||
model: "anthropic/claude-opus-4-5",
|
||||
workspace: path.join(home, "openclaw"),
|
||||
},
|
||||
},
|
||||
channels: { whatsapp: { allowFrom: ["*"] } },
|
||||
session: { store: path.join(home, "sessions.json") },
|
||||
},
|
||||
);
|
||||
|
||||
expect(res).toBeUndefined();
|
||||
expect(onBlockReply).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to final payloads when block reply send times out", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
let sawAbort = false;
|
||||
|
||||
@@ -161,36 +161,10 @@ describe("RawBody directive parsing", () => {
|
||||
},
|
||||
expectedIncludes: ["Verbose logging enabled."],
|
||||
});
|
||||
|
||||
await assertCommandReply({
|
||||
message: {
|
||||
Body: `[Chat messages since your last reply - for context]\\n[WhatsApp ...] Someone: hello\\n\\n[Current message - respond to this]\\n[WhatsApp ...] Jake: /status\\n[from: Jake McInteer (+6421807830)]`,
|
||||
RawBody: "/status",
|
||||
ChatType: "group",
|
||||
From: "+1222",
|
||||
To: "+1222",
|
||||
SessionKey: "agent:main:whatsapp:group:g1",
|
||||
Provider: "whatsapp",
|
||||
Surface: "whatsapp",
|
||||
SenderE164: "+1222",
|
||||
CommandAuthorized: true,
|
||||
},
|
||||
config: {
|
||||
agents: {
|
||||
defaults: {
|
||||
model: "anthropic/claude-opus-4-5",
|
||||
workspace: path.join(home, "openclaw-3"),
|
||||
},
|
||||
},
|
||||
channels: { whatsapp: { allowFrom: ["+1222"] } },
|
||||
session: { store: path.join(home, "sessions-3.json") },
|
||||
},
|
||||
expectedIncludes: ["Session: agent:main:whatsapp:group:g1", "anthropic/claude-opus-4-5"],
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves history when RawBody is provided for command parsing", async () => {
|
||||
it("preserves history and reuses non-default agent session files", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
|
||||
payloads: [{ text: "ok" }],
|
||||
@@ -238,11 +212,6 @@ describe("RawBody directive parsing", () => {
|
||||
expect(prompt).toContain('"body": "hello"');
|
||||
expect(prompt).toContain("status please");
|
||||
expect(prompt).not.toContain("/think:high");
|
||||
});
|
||||
});
|
||||
|
||||
it("reuses non-default agent session files without throwing path validation errors", async () => {
|
||||
await withTempHome(async (home) => {
|
||||
const agentId = "worker1";
|
||||
const sessionId = "sess-worker-1";
|
||||
const sessionKey = `agent:${agentId}:telegram:12345`;
|
||||
@@ -259,6 +228,7 @@ describe("RawBody directive parsing", () => {
|
||||
},
|
||||
});
|
||||
|
||||
vi.mocked(runEmbeddedPiAgent).mockReset();
|
||||
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
|
||||
payloads: [{ text: "ok" }],
|
||||
meta: {
|
||||
@@ -267,7 +237,7 @@ describe("RawBody directive parsing", () => {
|
||||
},
|
||||
});
|
||||
|
||||
const res = await getReplyFromConfig(
|
||||
const resWorker = await getReplyFromConfig(
|
||||
{
|
||||
Body: "hello",
|
||||
From: "telegram:12345",
|
||||
@@ -288,8 +258,8 @@ describe("RawBody directive parsing", () => {
|
||||
},
|
||||
);
|
||||
|
||||
const text = Array.isArray(res) ? res[0]?.text : res?.text;
|
||||
expect(text).toBe("ok");
|
||||
const textWorker = Array.isArray(resWorker) ? resWorker[0]?.text : resWorker?.text;
|
||||
expect(textWorker).toBe("ok");
|
||||
expect(runEmbeddedPiAgent).toHaveBeenCalledOnce();
|
||||
expect(vi.mocked(runEmbeddedPiAgent).mock.calls[0]?.[0]?.sessionFile).toBe(sessionFile);
|
||||
});
|
||||
|
||||
@@ -36,7 +36,7 @@ describe("gateway config reload during reply", () => {
|
||||
const dispatcher = createReplyDispatcher({
|
||||
deliver: async (payload) => {
|
||||
// Simulate async reply delivery
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
deliveredReplies.push(payload.text ?? "");
|
||||
},
|
||||
onError: (err) => {
|
||||
@@ -103,49 +103,4 @@ describe("gateway config reload during reply", () => {
|
||||
expect(deliverCalled).toBe(false);
|
||||
expect(getTotalPendingReplies()).toBe(0);
|
||||
});
|
||||
|
||||
it("should integrate dispatcher reservation with concurrent dispatchers", async () => {
|
||||
const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js");
|
||||
const { getTotalQueueSize } = await import("../process/command-queue.js");
|
||||
|
||||
const deliveredReplies: string[] = [];
|
||||
const dispatcher = createReplyDispatcher({
|
||||
deliver: async (payload) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
deliveredReplies.push(payload.text ?? "");
|
||||
},
|
||||
});
|
||||
|
||||
// Dispatcher has reservation (pending=1)
|
||||
expect(getTotalPendingReplies()).toBe(1);
|
||||
|
||||
// Total active = queue + pending
|
||||
const totalActive = getTotalQueueSize() + getTotalPendingReplies();
|
||||
expect(totalActive).toBe(1); // 0 queue + 1 pending
|
||||
|
||||
// Command finishes, replies enqueued
|
||||
dispatcher.sendFinalReply({ text: "Reply 1" });
|
||||
dispatcher.sendFinalReply({ text: "Reply 2" });
|
||||
|
||||
// Now: pending=3 (reservation + 2 replies)
|
||||
expect(getTotalPendingReplies()).toBe(3);
|
||||
|
||||
// Mark complete (flags reservation for cleanup on last delivery)
|
||||
dispatcher.markComplete();
|
||||
|
||||
// Reservation still counted until delivery .finally() clears it,
|
||||
// but the important invariant is pending > 0 while deliveries are in flight.
|
||||
expect(getTotalPendingReplies()).toBeGreaterThan(0);
|
||||
|
||||
// Wait for replies
|
||||
await dispatcher.waitForIdle();
|
||||
|
||||
// Replies sent, pending=0
|
||||
expect(getTotalPendingReplies()).toBe(0);
|
||||
expect(deliveredReplies).toEqual(["Reply 1", "Reply 2"]);
|
||||
|
||||
// Now everything is idle
|
||||
expect(getTotalPendingReplies()).toBe(0);
|
||||
expect(getTotalQueueSize()).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -31,7 +31,7 @@ describe("gateway restart deferral integration", () => {
|
||||
const dispatcher = createReplyDispatcher({
|
||||
deliver: async (payload) => {
|
||||
// Simulate network delay
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
deliveredReplies.push({
|
||||
text: payload.text ?? "",
|
||||
timestamp: Date.now(),
|
||||
@@ -116,84 +116,4 @@ describe("gateway restart deferral integration", () => {
|
||||
"restart-can-proceed",
|
||||
]);
|
||||
});
|
||||
|
||||
it("should handle concurrent dispatchers with config changes", async () => {
|
||||
const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js");
|
||||
const { getTotalPendingReplies } = await import("../auto-reply/reply/dispatcher-registry.js");
|
||||
|
||||
// Simulate two messages being processed concurrently
|
||||
const deliveredReplies: string[] = [];
|
||||
|
||||
// Message 1 — dispatcher created
|
||||
const dispatcher1 = createReplyDispatcher({
|
||||
deliver: async (payload) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
deliveredReplies.push(`msg1: ${payload.text}`);
|
||||
},
|
||||
});
|
||||
|
||||
// Message 2 — dispatcher created
|
||||
const dispatcher2 = createReplyDispatcher({
|
||||
deliver: async (payload) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
deliveredReplies.push(`msg2: ${payload.text}`);
|
||||
},
|
||||
});
|
||||
|
||||
// Both dispatchers have reservations
|
||||
expect(getTotalPendingReplies()).toBe(2);
|
||||
|
||||
// Config change detected - should defer
|
||||
const totalActive = getTotalPendingReplies();
|
||||
expect(totalActive).toBe(2); // 2 dispatcher reservations
|
||||
|
||||
// Messages process and send replies
|
||||
dispatcher1.sendFinalReply({ text: "Reply from message 1" });
|
||||
dispatcher1.markComplete();
|
||||
|
||||
dispatcher2.sendFinalReply({ text: "Reply from message 2" });
|
||||
dispatcher2.markComplete();
|
||||
|
||||
// Wait for both
|
||||
await Promise.all([dispatcher1.waitForIdle(), dispatcher2.waitForIdle()]);
|
||||
|
||||
// All idle
|
||||
expect(getTotalPendingReplies()).toBe(0);
|
||||
|
||||
// Replies delivered
|
||||
expect(deliveredReplies).toHaveLength(2);
|
||||
});
|
||||
|
||||
it("should handle rapid config changes without losing replies", async () => {
|
||||
const { createReplyDispatcher } = await import("../auto-reply/reply/reply-dispatcher.js");
|
||||
const { getTotalPendingReplies } = await import("../auto-reply/reply/dispatcher-registry.js");
|
||||
|
||||
const deliveredReplies: string[] = [];
|
||||
|
||||
// Message received — dispatcher created
|
||||
const dispatcher = createReplyDispatcher({
|
||||
deliver: async (payload) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 200)); // Slow network
|
||||
deliveredReplies.push(payload.text ?? "");
|
||||
},
|
||||
});
|
||||
|
||||
// Config change 1, 2, 3 (rapid changes)
|
||||
// All should be deferred because dispatcher has pending replies
|
||||
|
||||
// Send replies
|
||||
dispatcher.sendFinalReply({ text: "Processing..." });
|
||||
dispatcher.sendFinalReply({ text: "Almost done..." });
|
||||
dispatcher.sendFinalReply({ text: "Complete!" });
|
||||
dispatcher.markComplete();
|
||||
|
||||
// Wait for all replies
|
||||
await dispatcher.waitForIdle();
|
||||
|
||||
// All replies should be delivered
|
||||
expect(deliveredReplies).toEqual(["Processing...", "Almost done...", "Complete!"]);
|
||||
|
||||
// Now restart can proceed
|
||||
expect(getTotalPendingReplies()).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -36,7 +36,7 @@ describe("real scenario: config change during message processing", () => {
|
||||
throw new Error(error);
|
||||
}
|
||||
// Slow delivery — restart checks will run during this window
|
||||
await new Promise((resolve) => setTimeout(resolve, 500));
|
||||
await new Promise((resolve) => setTimeout(resolve, 150));
|
||||
deliveredReplies.push(payload.text ?? "");
|
||||
},
|
||||
onError: () => {
|
||||
@@ -59,7 +59,7 @@ describe("real scenario: config change during message processing", () => {
|
||||
// If the tracking is broken, pending would be 0 and we'd restart.
|
||||
let restartTriggered = false;
|
||||
for (let i = 0; i < 3; i++) {
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
await new Promise((resolve) => setTimeout(resolve, 25));
|
||||
const pending = getTotalPendingReplies();
|
||||
if (pending === 0) {
|
||||
restartTriggered = true;
|
||||
@@ -86,7 +86,7 @@ describe("real scenario: config change during message processing", () => {
|
||||
|
||||
const dispatcher = createReplyDispatcher({
|
||||
deliver: async (_payload) => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
},
|
||||
});
|
||||
|
||||
@@ -94,7 +94,7 @@ describe("real scenario: config change during message processing", () => {
|
||||
expect(getTotalPendingReplies()).toBe(1);
|
||||
|
||||
// Simulate command processing delay BEFORE reply is enqueued
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
|
||||
// During this delay, pending should STILL be 1 (reservation active)
|
||||
expect(getTotalPendingReplies()).toBe(1);
|
||||
|
||||
@@ -112,8 +112,6 @@ describe("command queue", () => {
|
||||
await blocker;
|
||||
});
|
||||
|
||||
// Give the event loop a tick for the task to start.
|
||||
await new Promise((r) => setTimeout(r, 5));
|
||||
expect(getActiveTaskCount()).toBe(1);
|
||||
|
||||
resolve1();
|
||||
@@ -136,18 +134,21 @@ describe("command queue", () => {
|
||||
await blocker;
|
||||
});
|
||||
|
||||
// Give the task a tick to start.
|
||||
await new Promise((r) => setTimeout(r, 5));
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const drainPromise = waitForActiveTasks(5000);
|
||||
|
||||
const drainPromise = waitForActiveTasks(5000);
|
||||
// Resolve the blocker after a short delay.
|
||||
setTimeout(() => resolve1(), 10);
|
||||
await vi.advanceTimersByTimeAsync(100);
|
||||
|
||||
// Resolve the blocker after a short delay.
|
||||
setTimeout(() => resolve1(), 50);
|
||||
const { drained } = await drainPromise;
|
||||
expect(drained).toBe(true);
|
||||
|
||||
const { drained } = await drainPromise;
|
||||
expect(drained).toBe(true);
|
||||
|
||||
await task;
|
||||
await task;
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("waitForActiveTasks returns drained=false on timeout", async () => {
|
||||
@@ -160,13 +161,18 @@ describe("command queue", () => {
|
||||
await blocker;
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 5));
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const waitPromise = waitForActiveTasks(50);
|
||||
await vi.advanceTimersByTimeAsync(100);
|
||||
const { drained } = await waitPromise;
|
||||
expect(drained).toBe(false);
|
||||
|
||||
const { drained } = await waitForActiveTasks(50);
|
||||
expect(drained).toBe(false);
|
||||
|
||||
resolve1();
|
||||
await task;
|
||||
resolve1();
|
||||
await task;
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("resetAllLanes drains queued work immediately after reset", async () => {
|
||||
@@ -228,15 +234,12 @@ describe("command queue", () => {
|
||||
const first = enqueueCommandInLane(lane, async () => {
|
||||
await blocker1;
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 5));
|
||||
|
||||
const drainPromise = waitForActiveTasks(2000);
|
||||
|
||||
// Starts after waitForActiveTasks snapshot and should not block drain completion.
|
||||
const second = enqueueCommandInLane(lane, async () => {
|
||||
await blocker2;
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 5));
|
||||
expect(getActiveTaskCount()).toBeGreaterThanOrEqual(2);
|
||||
|
||||
resolve1();
|
||||
@@ -262,9 +265,6 @@ describe("command queue", () => {
|
||||
// Second task is queued behind the first.
|
||||
const second = enqueueCommand(async () => "second");
|
||||
|
||||
// Give the first task a tick to start.
|
||||
await new Promise((r) => setTimeout(r, 5));
|
||||
|
||||
const removed = clearCommandLane();
|
||||
expect(removed).toBe(1); // only the queued (not active) entry
|
||||
|
||||
|
||||
Reference in New Issue
Block a user