From e59890eff0ff5ba123e56b7236d4f334214a2018 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 6 May 2026 08:29:51 +0100 Subject: [PATCH] test: speed up gateway cron history case --- src/gateway/server.cron.test.ts | 101 ++++++++++++++++++++++++-------- 1 file changed, 76 insertions(+), 25 deletions(-) diff --git a/src/gateway/server.cron.test.ts b/src/gateway/server.cron.test.ts index 482d1621181..7568f12045d 100644 --- a/src/gateway/server.cron.test.ts +++ b/src/gateway/server.cron.test.ts @@ -157,12 +157,17 @@ async function setupCronTestRun(params: { } type DirectCronState = { - cron: { stop: () => void }; + cron: { start: () => Promise; stop: () => void }; storePath: string; getRuntimeConfig: () => import("../config/types.openclaw.js").OpenClawConfig; }; -async function createDirectCronState(): Promise { +type CronBroadcast = (event: string, payload: unknown) => void; + +async function createDirectCronState(params?: { + broadcast?: CronBroadcast; +}): Promise { + resetConfigRuntimeState(); const [{ getRuntimeConfig }, { buildGatewayCronService }] = await Promise.all([ import("../config/config.js"), import("./server-cron.js"), @@ -171,12 +176,60 @@ async function createDirectCronState(): Promise { ...buildGatewayCronService({ cfg: getRuntimeConfig(), deps: {} as never, - broadcast: vi.fn(), + broadcast: params?.broadcast ?? vi.fn(), }), getRuntimeConfig: getRuntimeConfig, }; } +function createCronEventCollector() { + const events: Record[] = []; + const waiters: Array<{ + check: (payload: Record) => boolean; + resolve: (payload: Record) => void; + reject: (error: Error) => void; + timer: ReturnType; + }> = []; + const flush = (payload: Record) => { + for (const waiter of [...waiters]) { + if (!waiter.check(payload)) { + continue; + } + clearTimeout(waiter.timer); + waiters.splice(waiters.indexOf(waiter), 1); + waiter.resolve(payload); + } + }; + return { + broadcast(event: string, payload: unknown) { + if (event !== "cron" || !payload || typeof payload !== "object" || Array.isArray(payload)) { + return; + } + const record = payload as Record; + events.push(record); + flush(record); + }, + wait(check: (payload: Record) => boolean, timeoutMs = CRON_WAIT_TIMEOUT_MS) { + const existing = events.find(check); + if (existing) { + return Promise.resolve(existing); + } + return new Promise>((resolve, reject) => { + const waiter = { + check, + resolve, + reject, + timer: setTimeout(() => { + waiters.splice(waiters.indexOf(waiter), 1); + reject(new Error("timeout waiting for cron event")); + }, timeoutMs), + }; + waiters.push(waiter); + }); + }, + }; +} + async function directCronReq( cronState: DirectCronState, method: string, @@ -388,9 +441,8 @@ describe("gateway server cron", () => { const routeJobId = typeof routeJobIdValue === "string" ? routeJobIdValue : ""; expect(routeJobId.length > 0).toBe(true); - const runRes = await directCronReq(cronState, "cron.run", { id: routeJobId, mode: "force" }); - expect(runRes.ok).toBe(true); - expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) }); + const runRes = await cronState.cron.run(routeJobId, "force"); + expect(runRes).toEqual({ ok: true, ran: true }); const events = await waitForSystemEvent(); expect(events.some((event) => event.includes("cron route check"))).toBe(true); @@ -834,17 +886,16 @@ describe("gateway server cron", () => { test("writes cron run history and auto-runs due jobs", async () => { const { prevSkipCron } = await setupCronTestRun({ tempPrefix: "openclaw-gw-cron-log-", + cronEnabled: true, }); - - const { server, ws } = await startServerWithClient(); - await connectOk(ws); + const events = createCronEventCollector(); + const cronState = await createDirectCronState({ broadcast: events.broadcast }); try { - const atMs = Date.now() - 1; - const addRes = await rpcReq(ws, "cron.add", { + const addRes = await directCronReq(cronState, "cron.add", { name: "log test", enabled: true, - schedule: { kind: "at", at: new Date(atMs).toISOString() }, + schedule: { kind: "every", everyMs: 60_000 }, sessionTarget: "main", wakeMode: "next-heartbeat", payload: { kind: "systemEvent", text: "hello" }, @@ -854,11 +905,10 @@ describe("gateway server cron", () => { const jobId = typeof jobIdValue === "string" ? jobIdValue : ""; expect(jobId.length > 0).toBe(true); - const finishedRun = waitForCronEvent( - ws, + const finishedRun = events.wait( (payload) => payload?.jobId === jobId && payload?.action === "finished", ); - const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 20_000); + const runRes = await directCronReq(cronState, "cron.run", { id: jobId, mode: "force" }); expect(runRes.ok).toBe(true); expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) }); const manualRunId = (runRes.payload as { runId?: unknown } | null)?.runId; @@ -872,7 +922,7 @@ describe("gateway server cron", () => { deliveryStatus: "not-requested", }); - const runsRes = await rpcReq(ws, "cron.runs", { id: jobId, limit: 50 }); + const runsRes = await directCronReq(cronState, "cron.runs", { id: jobId, limit: 50 }); expect(runsRes.ok).toBe(true); const entries = (runsRes.payload as { entries?: unknown } | null)?.entries; expect(Array.isArray(entries)).toBe(true); @@ -882,7 +932,7 @@ describe("gateway server cron", () => { "not-requested", ); expect((entries as Array<{ runId?: unknown }>).at(-1)?.runId).toBe(manualRunId); - const allRunsRes = await rpcReq(ws, "cron.runs", { + const allRunsRes = await directCronReq(cronState, "cron.runs", { scope: "all", limit: 50, statuses: ["ok"], @@ -894,7 +944,7 @@ describe("gateway server cron", () => { (allEntries as Array<{ jobId?: unknown }>).some((entry) => entry.jobId === jobId), ).toBe(true); - const statusRes = await rpcReq(ws, "cron.status", {}); + const statusRes = await directCronReq(cronState, "cron.status", {}); expect(statusRes.ok).toBe(true); const statusPayload = statusRes.payload as | { enabled?: unknown; storePath?: unknown } @@ -903,7 +953,7 @@ describe("gateway server cron", () => { const storePath = typeof statusPayload?.storePath === "string" ? statusPayload.storePath : ""; expect(storePath).toContain("jobs.json"); - const autoRes = await rpcReq(ws, "cron.add", { + const autoRes = await directCronReq(cronState, "cron.add", { name: "auto run test", enabled: true, schedule: { kind: "at", at: new Date(Date.now() - 1).toISOString() }, @@ -916,18 +966,19 @@ describe("gateway server cron", () => { const autoJobId = typeof autoJobIdValue === "string" ? autoJobIdValue : ""; expect(autoJobId.length > 0).toBe(true); - await waitForCronEvent( - ws, + const autoFinished = events.wait( (payload) => payload?.jobId === autoJobId && payload?.action === "finished", ); - const autoEntries = (await rpcReq(ws, "cron.runs", { id: autoJobId, limit: 10 })).payload as - | { entries?: Array<{ jobId?: unknown }> } - | undefined; + await cronState.cron.start(); + await autoFinished; + const autoEntries = ( + await directCronReq(cronState, "cron.runs", { id: autoJobId, limit: 10 }) + ).payload as { entries?: Array<{ jobId?: unknown }> } | undefined; expect(Array.isArray(autoEntries?.entries)).toBe(true); const runs = autoEntries?.entries ?? []; expect(runs.at(-1)?.jobId).toBe(autoJobId); } finally { - await cleanupCronTestRun({ ws, server, prevSkipCron }); + await cleanupCronTestRun({ cronState, prevSkipCron }); } }, 45_000);