mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-07 07:58:36 +00:00
test: speed up gateway cron history case
This commit is contained in:
@@ -157,12 +157,17 @@ async function setupCronTestRun(params: {
|
||||
}
|
||||
|
||||
type DirectCronState = {
|
||||
cron: { stop: () => void };
|
||||
cron: { start: () => Promise<void>; stop: () => void };
|
||||
storePath: string;
|
||||
getRuntimeConfig: () => import("../config/types.openclaw.js").OpenClawConfig;
|
||||
};
|
||||
|
||||
async function createDirectCronState(): Promise<DirectCronState> {
|
||||
type CronBroadcast = (event: string, payload: unknown) => void;
|
||||
|
||||
async function createDirectCronState(params?: {
|
||||
broadcast?: CronBroadcast;
|
||||
}): Promise<DirectCronState> {
|
||||
resetConfigRuntimeState();
|
||||
const [{ getRuntimeConfig }, { buildGatewayCronService }] = await Promise.all([
|
||||
import("../config/config.js"),
|
||||
import("./server-cron.js"),
|
||||
@@ -171,12 +176,60 @@ async function createDirectCronState(): Promise<DirectCronState> {
|
||||
...buildGatewayCronService({
|
||||
cfg: getRuntimeConfig(),
|
||||
deps: {} as never,
|
||||
broadcast: vi.fn(),
|
||||
broadcast: params?.broadcast ?? vi.fn(),
|
||||
}),
|
||||
getRuntimeConfig: getRuntimeConfig,
|
||||
};
|
||||
}
|
||||
|
||||
function createCronEventCollector() {
|
||||
const events: Record<string, unknown>[] = [];
|
||||
const waiters: Array<{
|
||||
check: (payload: Record<string, unknown>) => boolean;
|
||||
resolve: (payload: Record<string, unknown>) => void;
|
||||
reject: (error: Error) => void;
|
||||
timer: ReturnType<typeof setTimeout>;
|
||||
}> = [];
|
||||
const flush = (payload: Record<string, unknown>) => {
|
||||
for (const waiter of [...waiters]) {
|
||||
if (!waiter.check(payload)) {
|
||||
continue;
|
||||
}
|
||||
clearTimeout(waiter.timer);
|
||||
waiters.splice(waiters.indexOf(waiter), 1);
|
||||
waiter.resolve(payload);
|
||||
}
|
||||
};
|
||||
return {
|
||||
broadcast(event: string, payload: unknown) {
|
||||
if (event !== "cron" || !payload || typeof payload !== "object" || Array.isArray(payload)) {
|
||||
return;
|
||||
}
|
||||
const record = payload as Record<string, unknown>;
|
||||
events.push(record);
|
||||
flush(record);
|
||||
},
|
||||
wait(check: (payload: Record<string, unknown>) => boolean, timeoutMs = CRON_WAIT_TIMEOUT_MS) {
|
||||
const existing = events.find(check);
|
||||
if (existing) {
|
||||
return Promise.resolve(existing);
|
||||
}
|
||||
return new Promise<Record<string, unknown>>((resolve, reject) => {
|
||||
const waiter = {
|
||||
check,
|
||||
resolve,
|
||||
reject,
|
||||
timer: setTimeout(() => {
|
||||
waiters.splice(waiters.indexOf(waiter), 1);
|
||||
reject(new Error("timeout waiting for cron event"));
|
||||
}, timeoutMs),
|
||||
};
|
||||
waiters.push(waiter);
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async function directCronReq(
|
||||
cronState: DirectCronState,
|
||||
method: string,
|
||||
@@ -388,9 +441,8 @@ describe("gateway server cron", () => {
|
||||
const routeJobId = typeof routeJobIdValue === "string" ? routeJobIdValue : "";
|
||||
expect(routeJobId.length > 0).toBe(true);
|
||||
|
||||
const runRes = await directCronReq(cronState, "cron.run", { id: routeJobId, mode: "force" });
|
||||
expect(runRes.ok).toBe(true);
|
||||
expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
|
||||
const runRes = await cronState.cron.run(routeJobId, "force");
|
||||
expect(runRes).toEqual({ ok: true, ran: true });
|
||||
const events = await waitForSystemEvent();
|
||||
expect(events.some((event) => event.includes("cron route check"))).toBe(true);
|
||||
|
||||
@@ -834,17 +886,16 @@ describe("gateway server cron", () => {
|
||||
test("writes cron run history and auto-runs due jobs", async () => {
|
||||
const { prevSkipCron } = await setupCronTestRun({
|
||||
tempPrefix: "openclaw-gw-cron-log-",
|
||||
cronEnabled: true,
|
||||
});
|
||||
|
||||
const { server, ws } = await startServerWithClient();
|
||||
await connectOk(ws);
|
||||
const events = createCronEventCollector();
|
||||
const cronState = await createDirectCronState({ broadcast: events.broadcast });
|
||||
|
||||
try {
|
||||
const atMs = Date.now() - 1;
|
||||
const addRes = await rpcReq(ws, "cron.add", {
|
||||
const addRes = await directCronReq(cronState, "cron.add", {
|
||||
name: "log test",
|
||||
enabled: true,
|
||||
schedule: { kind: "at", at: new Date(atMs).toISOString() },
|
||||
schedule: { kind: "every", everyMs: 60_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "hello" },
|
||||
@@ -854,11 +905,10 @@ describe("gateway server cron", () => {
|
||||
const jobId = typeof jobIdValue === "string" ? jobIdValue : "";
|
||||
expect(jobId.length > 0).toBe(true);
|
||||
|
||||
const finishedRun = waitForCronEvent(
|
||||
ws,
|
||||
const finishedRun = events.wait(
|
||||
(payload) => payload?.jobId === jobId && payload?.action === "finished",
|
||||
);
|
||||
const runRes = await rpcReq(ws, "cron.run", { id: jobId, mode: "force" }, 20_000);
|
||||
const runRes = await directCronReq(cronState, "cron.run", { id: jobId, mode: "force" });
|
||||
expect(runRes.ok).toBe(true);
|
||||
expect(runRes.payload).toEqual({ ok: true, enqueued: true, runId: expect.any(String) });
|
||||
const manualRunId = (runRes.payload as { runId?: unknown } | null)?.runId;
|
||||
@@ -872,7 +922,7 @@ describe("gateway server cron", () => {
|
||||
deliveryStatus: "not-requested",
|
||||
});
|
||||
|
||||
const runsRes = await rpcReq(ws, "cron.runs", { id: jobId, limit: 50 });
|
||||
const runsRes = await directCronReq(cronState, "cron.runs", { id: jobId, limit: 50 });
|
||||
expect(runsRes.ok).toBe(true);
|
||||
const entries = (runsRes.payload as { entries?: unknown } | null)?.entries;
|
||||
expect(Array.isArray(entries)).toBe(true);
|
||||
@@ -882,7 +932,7 @@ describe("gateway server cron", () => {
|
||||
"not-requested",
|
||||
);
|
||||
expect((entries as Array<{ runId?: unknown }>).at(-1)?.runId).toBe(manualRunId);
|
||||
const allRunsRes = await rpcReq(ws, "cron.runs", {
|
||||
const allRunsRes = await directCronReq(cronState, "cron.runs", {
|
||||
scope: "all",
|
||||
limit: 50,
|
||||
statuses: ["ok"],
|
||||
@@ -894,7 +944,7 @@ describe("gateway server cron", () => {
|
||||
(allEntries as Array<{ jobId?: unknown }>).some((entry) => entry.jobId === jobId),
|
||||
).toBe(true);
|
||||
|
||||
const statusRes = await rpcReq(ws, "cron.status", {});
|
||||
const statusRes = await directCronReq(cronState, "cron.status", {});
|
||||
expect(statusRes.ok).toBe(true);
|
||||
const statusPayload = statusRes.payload as
|
||||
| { enabled?: unknown; storePath?: unknown }
|
||||
@@ -903,7 +953,7 @@ describe("gateway server cron", () => {
|
||||
const storePath = typeof statusPayload?.storePath === "string" ? statusPayload.storePath : "";
|
||||
expect(storePath).toContain("jobs.json");
|
||||
|
||||
const autoRes = await rpcReq(ws, "cron.add", {
|
||||
const autoRes = await directCronReq(cronState, "cron.add", {
|
||||
name: "auto run test",
|
||||
enabled: true,
|
||||
schedule: { kind: "at", at: new Date(Date.now() - 1).toISOString() },
|
||||
@@ -916,18 +966,19 @@ describe("gateway server cron", () => {
|
||||
const autoJobId = typeof autoJobIdValue === "string" ? autoJobIdValue : "";
|
||||
expect(autoJobId.length > 0).toBe(true);
|
||||
|
||||
await waitForCronEvent(
|
||||
ws,
|
||||
const autoFinished = events.wait(
|
||||
(payload) => payload?.jobId === autoJobId && payload?.action === "finished",
|
||||
);
|
||||
const autoEntries = (await rpcReq(ws, "cron.runs", { id: autoJobId, limit: 10 })).payload as
|
||||
| { entries?: Array<{ jobId?: unknown }> }
|
||||
| undefined;
|
||||
await cronState.cron.start();
|
||||
await autoFinished;
|
||||
const autoEntries = (
|
||||
await directCronReq(cronState, "cron.runs", { id: autoJobId, limit: 10 })
|
||||
).payload as { entries?: Array<{ jobId?: unknown }> } | undefined;
|
||||
expect(Array.isArray(autoEntries?.entries)).toBe(true);
|
||||
const runs = autoEntries?.entries ?? [];
|
||||
expect(runs.at(-1)?.jobId).toBe(autoJobId);
|
||||
} finally {
|
||||
await cleanupCronTestRun({ ws, server, prevSkipCron });
|
||||
await cleanupCronTestRun({ cronState, prevSkipCron });
|
||||
}
|
||||
}, 45_000);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user