refactor(test): extract cron issue-regression harness and frozen-time helper

This commit is contained in:
Peter Steinberger
2026-03-03 01:42:33 +00:00
parent 39520ad21b
commit 47736e3432
3 changed files with 189 additions and 151 deletions

View File

@@ -0,0 +1,165 @@
import crypto from "node:crypto";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterAll, beforeAll, beforeEach, vi } from "vitest";
import { useFrozenTime, useRealTime } from "../test-utils/frozen-time.js";
import type { CronService } from "./service.js";
import type { CronJob, CronJobState } from "./types.js";
const TOP_OF_HOUR_STAGGER_MS = 5 * 60 * 1_000;
export const noopLogger = {
info: () => {},
warn: () => {},
error: () => {},
debug: () => {},
trace: () => {},
};
let fixtureRoot = "";
let fixtureCount = 0;
export type CronServiceOptions = ConstructorParameters<typeof CronService>[0];
export function setupCronIssueRegressionFixtures() {
beforeAll(async () => {
fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "cron-issues-"));
});
beforeEach(() => {
useFrozenTime("2026-02-06T10:05:00.000Z");
});
afterAll(async () => {
useRealTime();
await fs.rm(fixtureRoot, { recursive: true, force: true });
});
return {
makeStorePath,
};
}
export function topOfHourOffsetMs(jobId: string) {
const digest = crypto.createHash("sha256").update(jobId).digest();
return digest.readUInt32BE(0) % TOP_OF_HOUR_STAGGER_MS;
}
export function makeStorePath() {
const storePath = path.join(fixtureRoot, `case-${fixtureCount++}.jobs.json`);
return {
storePath,
};
}
export function createDueIsolatedJob(params: {
id: string;
nowMs: number;
nextRunAtMs: number;
deleteAfterRun?: boolean;
}): CronJob {
return {
id: params.id,
name: params.id,
enabled: true,
deleteAfterRun: params.deleteAfterRun ?? false,
createdAtMs: params.nowMs,
updatedAtMs: params.nowMs,
schedule: { kind: "at", at: new Date(params.nextRunAtMs).toISOString() },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: params.id },
delivery: { mode: "none" },
state: { nextRunAtMs: params.nextRunAtMs },
};
}
export function createDefaultIsolatedRunner(): CronServiceOptions["runIsolatedAgentJob"] {
return vi.fn().mockResolvedValue({
status: "ok",
summary: "ok",
}) as CronServiceOptions["runIsolatedAgentJob"];
}
export function createAbortAwareIsolatedRunner(summary = "late") {
let observedAbortSignal: AbortSignal | undefined;
const runIsolatedAgentJob = vi.fn(async ({ abortSignal }) => {
observedAbortSignal = abortSignal;
await new Promise<void>((resolve) => {
if (!abortSignal) {
return;
}
if (abortSignal.aborted) {
resolve();
return;
}
abortSignal.addEventListener("abort", () => resolve(), { once: true });
});
return { status: "ok" as const, summary };
}) as CronServiceOptions["runIsolatedAgentJob"];
return {
runIsolatedAgentJob,
getObservedAbortSignal: () => observedAbortSignal,
};
}
export function createIsolatedRegressionJob(params: {
id: string;
name: string;
scheduledAt: number;
schedule: CronJob["schedule"];
payload: CronJob["payload"];
state?: CronJobState;
}): CronJob {
return {
id: params.id,
name: params.name,
enabled: true,
createdAtMs: params.scheduledAt - 86_400_000,
updatedAtMs: params.scheduledAt - 86_400_000,
schedule: params.schedule,
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: params.payload,
delivery: { mode: "announce" },
state: params.state ?? {},
};
}
export async function writeCronJobs(storePath: string, jobs: CronJob[]) {
await fs.writeFile(storePath, JSON.stringify({ version: 1, jobs }), "utf-8");
}
export async function writeCronStoreSnapshot(storePath: string, jobs: unknown[]) {
await fs.writeFile(storePath, JSON.stringify({ version: 1, jobs }), "utf-8");
}
export async function startCronForStore(params: {
storePath: string;
cronEnabled?: boolean;
enqueueSystemEvent?: CronServiceOptions["enqueueSystemEvent"];
requestHeartbeatNow?: CronServiceOptions["requestHeartbeatNow"];
runIsolatedAgentJob?: CronServiceOptions["runIsolatedAgentJob"];
onEvent?: CronServiceOptions["onEvent"];
}) {
const enqueueSystemEvent =
params.enqueueSystemEvent ?? (vi.fn() as unknown as CronServiceOptions["enqueueSystemEvent"]);
const requestHeartbeatNow =
params.requestHeartbeatNow ?? (vi.fn() as unknown as CronServiceOptions["requestHeartbeatNow"]);
const runIsolatedAgentJob = params.runIsolatedAgentJob ?? createDefaultIsolatedRunner();
const { CronService } = await import("./service.js");
const cron = new CronService({
cronEnabled: params.cronEnabled ?? true,
storePath: params.storePath,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob,
...(params.onEvent ? { onEvent: params.onEvent } : {}),
});
await cron.start();
return cron;
}

View File

@@ -1,10 +1,19 @@
import crypto from "node:crypto";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { describe, expect, it, vi } from "vitest";
import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js";
import * as schedule from "./schedule.js";
import {
createAbortAwareIsolatedRunner,
createDefaultIsolatedRunner,
createDueIsolatedJob,
createIsolatedRegressionJob,
noopLogger,
setupCronIssueRegressionFixtures,
startCronForStore,
topOfHourOffsetMs,
writeCronJobs,
writeCronStoreSnapshot,
} from "./service.issue-regressions.test-helpers.js";
import { CronService } from "./service.js";
import { createDeferred, createRunningCronServiceState } from "./service.test-harness.js";
import { computeJobNextRunAtMs } from "./service/jobs.js";
@@ -19,156 +28,10 @@ import {
} from "./service/timer.js";
import type { CronJob, CronJobState } from "./types.js";
const noopLogger = {
info: () => {},
warn: () => {},
error: () => {},
debug: () => {},
trace: () => {},
};
const TOP_OF_HOUR_STAGGER_MS = 5 * 60 * 1_000;
const FAST_TIMEOUT_SECONDS = 0.0025;
type CronServiceOptions = ConstructorParameters<typeof CronService>[0];
function topOfHourOffsetMs(jobId: string) {
const digest = crypto.createHash("sha256").update(jobId).digest();
return digest.readUInt32BE(0) % TOP_OF_HOUR_STAGGER_MS;
}
let fixtureRoot = "";
let fixtureCount = 0;
function makeStorePath() {
const storePath = path.join(fixtureRoot, `case-${fixtureCount++}.jobs.json`);
return {
storePath,
};
}
function createDueIsolatedJob(params: {
id: string;
nowMs: number;
nextRunAtMs: number;
deleteAfterRun?: boolean;
}): CronJob {
return {
id: params.id,
name: params.id,
enabled: true,
deleteAfterRun: params.deleteAfterRun ?? false,
createdAtMs: params.nowMs,
updatedAtMs: params.nowMs,
schedule: { kind: "at", at: new Date(params.nextRunAtMs).toISOString() },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: params.id },
delivery: { mode: "none" },
state: { nextRunAtMs: params.nextRunAtMs },
};
}
function createDefaultIsolatedRunner(): CronServiceOptions["runIsolatedAgentJob"] {
return vi.fn().mockResolvedValue({
status: "ok",
summary: "ok",
}) as CronServiceOptions["runIsolatedAgentJob"];
}
function createAbortAwareIsolatedRunner(summary = "late") {
let observedAbortSignal: AbortSignal | undefined;
const runIsolatedAgentJob = vi.fn(async ({ abortSignal }) => {
observedAbortSignal = abortSignal;
await new Promise<void>((resolve) => {
if (!abortSignal) {
return;
}
if (abortSignal.aborted) {
resolve();
return;
}
abortSignal.addEventListener("abort", () => resolve(), { once: true });
});
return { status: "ok" as const, summary };
}) as CronServiceOptions["runIsolatedAgentJob"];
return {
runIsolatedAgentJob,
getObservedAbortSignal: () => observedAbortSignal,
};
}
function createIsolatedRegressionJob(params: {
id: string;
name: string;
scheduledAt: number;
schedule: CronJob["schedule"];
payload: CronJob["payload"];
state?: CronJobState;
}): CronJob {
return {
id: params.id,
name: params.name,
enabled: true,
createdAtMs: params.scheduledAt - 86_400_000,
updatedAtMs: params.scheduledAt - 86_400_000,
schedule: params.schedule,
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: params.payload,
delivery: { mode: "announce" },
state: params.state ?? {},
};
}
async function writeCronJobs(storePath: string, jobs: CronJob[]) {
await fs.writeFile(storePath, JSON.stringify({ version: 1, jobs }), "utf-8");
}
async function writeCronStoreSnapshot(storePath: string, jobs: unknown[]) {
await fs.writeFile(storePath, JSON.stringify({ version: 1, jobs }), "utf-8");
}
async function startCronForStore(params: {
storePath: string;
cronEnabled?: boolean;
enqueueSystemEvent?: CronServiceOptions["enqueueSystemEvent"];
requestHeartbeatNow?: CronServiceOptions["requestHeartbeatNow"];
runIsolatedAgentJob?: CronServiceOptions["runIsolatedAgentJob"];
onEvent?: CronServiceOptions["onEvent"];
}) {
const enqueueSystemEvent =
params.enqueueSystemEvent ?? (vi.fn() as unknown as CronServiceOptions["enqueueSystemEvent"]);
const requestHeartbeatNow =
params.requestHeartbeatNow ?? (vi.fn() as unknown as CronServiceOptions["requestHeartbeatNow"]);
const runIsolatedAgentJob = params.runIsolatedAgentJob ?? createDefaultIsolatedRunner();
const cron = new CronService({
cronEnabled: params.cronEnabled ?? true,
storePath: params.storePath,
log: noopLogger,
enqueueSystemEvent,
requestHeartbeatNow,
runIsolatedAgentJob,
...(params.onEvent ? { onEvent: params.onEvent } : {}),
});
await cron.start();
return cron;
}
describe("Cron issue regressions", () => {
beforeAll(async () => {
fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "cron-issues-"));
});
beforeEach(() => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-02-06T10:05:00.000Z"));
});
afterAll(async () => {
vi.useRealTimers();
await fs.rm(fixtureRoot, { recursive: true, force: true });
});
const { makeStorePath } = setupCronIssueRegressionFixtures();
it("covers schedule updates and payload patching", async () => {
const store = makeStorePath();

View File

@@ -0,0 +1,10 @@
import { vi } from "vitest";
export function useFrozenTime(at: string | number | Date): void {
vi.useFakeTimers();
vi.setSystemTime(at);
}
export function useRealTime(): void {
vi.useRealTimers();
}