From 47736e3432b7129cdbc9b0f631519bb35e1ece4c Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 3 Mar 2026 01:42:33 +0000 Subject: [PATCH] refactor(test): extract cron issue-regression harness and frozen-time helper --- .../service.issue-regressions.test-helpers.ts | 165 ++++++++++++++++++ src/cron/service.issue-regressions.test.ts | 165 ++---------------- src/test-utils/frozen-time.ts | 10 ++ 3 files changed, 189 insertions(+), 151 deletions(-) create mode 100644 src/cron/service.issue-regressions.test-helpers.ts create mode 100644 src/test-utils/frozen-time.ts diff --git a/src/cron/service.issue-regressions.test-helpers.ts b/src/cron/service.issue-regressions.test-helpers.ts new file mode 100644 index 00000000000..d6a680e21f0 --- /dev/null +++ b/src/cron/service.issue-regressions.test-helpers.ts @@ -0,0 +1,165 @@ +import crypto from "node:crypto"; +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterAll, beforeAll, beforeEach, vi } from "vitest"; +import { useFrozenTime, useRealTime } from "../test-utils/frozen-time.js"; +import type { CronService } from "./service.js"; +import type { CronJob, CronJobState } from "./types.js"; + +const TOP_OF_HOUR_STAGGER_MS = 5 * 60 * 1_000; + +export const noopLogger = { + info: () => {}, + warn: () => {}, + error: () => {}, + debug: () => {}, + trace: () => {}, +}; + +let fixtureRoot = ""; +let fixtureCount = 0; + +export type CronServiceOptions = ConstructorParameters[0]; + +export function setupCronIssueRegressionFixtures() { + beforeAll(async () => { + fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "cron-issues-")); + }); + + beforeEach(() => { + useFrozenTime("2026-02-06T10:05:00.000Z"); + }); + + afterAll(async () => { + useRealTime(); + await fs.rm(fixtureRoot, { recursive: true, force: true }); + }); + + return { + makeStorePath, + }; +} + +export function topOfHourOffsetMs(jobId: string) { + const digest = crypto.createHash("sha256").update(jobId).digest(); + return digest.readUInt32BE(0) % TOP_OF_HOUR_STAGGER_MS; +} + +export function makeStorePath() { + const storePath = path.join(fixtureRoot, `case-${fixtureCount++}.jobs.json`); + return { + storePath, + }; +} + +export function createDueIsolatedJob(params: { + id: string; + nowMs: number; + nextRunAtMs: number; + deleteAfterRun?: boolean; +}): CronJob { + return { + id: params.id, + name: params.id, + enabled: true, + deleteAfterRun: params.deleteAfterRun ?? false, + createdAtMs: params.nowMs, + updatedAtMs: params.nowMs, + schedule: { kind: "at", at: new Date(params.nextRunAtMs).toISOString() }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: params.id }, + delivery: { mode: "none" }, + state: { nextRunAtMs: params.nextRunAtMs }, + }; +} + +export function createDefaultIsolatedRunner(): CronServiceOptions["runIsolatedAgentJob"] { + return vi.fn().mockResolvedValue({ + status: "ok", + summary: "ok", + }) as CronServiceOptions["runIsolatedAgentJob"]; +} + +export function createAbortAwareIsolatedRunner(summary = "late") { + let observedAbortSignal: AbortSignal | undefined; + const runIsolatedAgentJob = vi.fn(async ({ abortSignal }) => { + observedAbortSignal = abortSignal; + await new Promise((resolve) => { + if (!abortSignal) { + return; + } + if (abortSignal.aborted) { + resolve(); + return; + } + abortSignal.addEventListener("abort", () => resolve(), { once: true }); + }); + return { status: "ok" as const, summary }; + }) as CronServiceOptions["runIsolatedAgentJob"]; + + return { + runIsolatedAgentJob, + getObservedAbortSignal: () => observedAbortSignal, + }; +} + +export function createIsolatedRegressionJob(params: { + id: string; + name: string; + scheduledAt: number; + schedule: CronJob["schedule"]; + payload: CronJob["payload"]; + state?: CronJobState; +}): CronJob { + return { + id: params.id, + name: params.name, + enabled: true, + createdAtMs: params.scheduledAt - 86_400_000, + updatedAtMs: params.scheduledAt - 86_400_000, + schedule: params.schedule, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: params.payload, + delivery: { mode: "announce" }, + state: params.state ?? {}, + }; +} + +export async function writeCronJobs(storePath: string, jobs: CronJob[]) { + await fs.writeFile(storePath, JSON.stringify({ version: 1, jobs }), "utf-8"); +} + +export async function writeCronStoreSnapshot(storePath: string, jobs: unknown[]) { + await fs.writeFile(storePath, JSON.stringify({ version: 1, jobs }), "utf-8"); +} + +export async function startCronForStore(params: { + storePath: string; + cronEnabled?: boolean; + enqueueSystemEvent?: CronServiceOptions["enqueueSystemEvent"]; + requestHeartbeatNow?: CronServiceOptions["requestHeartbeatNow"]; + runIsolatedAgentJob?: CronServiceOptions["runIsolatedAgentJob"]; + onEvent?: CronServiceOptions["onEvent"]; +}) { + const enqueueSystemEvent = + params.enqueueSystemEvent ?? (vi.fn() as unknown as CronServiceOptions["enqueueSystemEvent"]); + const requestHeartbeatNow = + params.requestHeartbeatNow ?? (vi.fn() as unknown as CronServiceOptions["requestHeartbeatNow"]); + const runIsolatedAgentJob = params.runIsolatedAgentJob ?? createDefaultIsolatedRunner(); + + const { CronService } = await import("./service.js"); + const cron = new CronService({ + cronEnabled: params.cronEnabled ?? true, + storePath: params.storePath, + log: noopLogger, + enqueueSystemEvent, + requestHeartbeatNow, + runIsolatedAgentJob, + ...(params.onEvent ? { onEvent: params.onEvent } : {}), + }); + await cron.start(); + return cron; +} diff --git a/src/cron/service.issue-regressions.test.ts b/src/cron/service.issue-regressions.test.ts index 66dc3a03108..ed6a927686e 100644 --- a/src/cron/service.issue-regressions.test.ts +++ b/src/cron/service.issue-regressions.test.ts @@ -1,10 +1,19 @@ -import crypto from "node:crypto"; import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; -import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import type { HeartbeatRunResult } from "../infra/heartbeat-wake.js"; import * as schedule from "./schedule.js"; +import { + createAbortAwareIsolatedRunner, + createDefaultIsolatedRunner, + createDueIsolatedJob, + createIsolatedRegressionJob, + noopLogger, + setupCronIssueRegressionFixtures, + startCronForStore, + topOfHourOffsetMs, + writeCronJobs, + writeCronStoreSnapshot, +} from "./service.issue-regressions.test-helpers.js"; import { CronService } from "./service.js"; import { createDeferred, createRunningCronServiceState } from "./service.test-harness.js"; import { computeJobNextRunAtMs } from "./service/jobs.js"; @@ -19,156 +28,10 @@ import { } from "./service/timer.js"; import type { CronJob, CronJobState } from "./types.js"; -const noopLogger = { - info: () => {}, - warn: () => {}, - error: () => {}, - debug: () => {}, - trace: () => {}, -}; -const TOP_OF_HOUR_STAGGER_MS = 5 * 60 * 1_000; const FAST_TIMEOUT_SECONDS = 0.0025; -type CronServiceOptions = ConstructorParameters[0]; - -function topOfHourOffsetMs(jobId: string) { - const digest = crypto.createHash("sha256").update(jobId).digest(); - return digest.readUInt32BE(0) % TOP_OF_HOUR_STAGGER_MS; -} - -let fixtureRoot = ""; -let fixtureCount = 0; - -function makeStorePath() { - const storePath = path.join(fixtureRoot, `case-${fixtureCount++}.jobs.json`); - return { - storePath, - }; -} - -function createDueIsolatedJob(params: { - id: string; - nowMs: number; - nextRunAtMs: number; - deleteAfterRun?: boolean; -}): CronJob { - return { - id: params.id, - name: params.id, - enabled: true, - deleteAfterRun: params.deleteAfterRun ?? false, - createdAtMs: params.nowMs, - updatedAtMs: params.nowMs, - schedule: { kind: "at", at: new Date(params.nextRunAtMs).toISOString() }, - sessionTarget: "isolated", - wakeMode: "next-heartbeat", - payload: { kind: "agentTurn", message: params.id }, - delivery: { mode: "none" }, - state: { nextRunAtMs: params.nextRunAtMs }, - }; -} - -function createDefaultIsolatedRunner(): CronServiceOptions["runIsolatedAgentJob"] { - return vi.fn().mockResolvedValue({ - status: "ok", - summary: "ok", - }) as CronServiceOptions["runIsolatedAgentJob"]; -} - -function createAbortAwareIsolatedRunner(summary = "late") { - let observedAbortSignal: AbortSignal | undefined; - const runIsolatedAgentJob = vi.fn(async ({ abortSignal }) => { - observedAbortSignal = abortSignal; - await new Promise((resolve) => { - if (!abortSignal) { - return; - } - if (abortSignal.aborted) { - resolve(); - return; - } - abortSignal.addEventListener("abort", () => resolve(), { once: true }); - }); - return { status: "ok" as const, summary }; - }) as CronServiceOptions["runIsolatedAgentJob"]; - - return { - runIsolatedAgentJob, - getObservedAbortSignal: () => observedAbortSignal, - }; -} - -function createIsolatedRegressionJob(params: { - id: string; - name: string; - scheduledAt: number; - schedule: CronJob["schedule"]; - payload: CronJob["payload"]; - state?: CronJobState; -}): CronJob { - return { - id: params.id, - name: params.name, - enabled: true, - createdAtMs: params.scheduledAt - 86_400_000, - updatedAtMs: params.scheduledAt - 86_400_000, - schedule: params.schedule, - sessionTarget: "isolated", - wakeMode: "next-heartbeat", - payload: params.payload, - delivery: { mode: "announce" }, - state: params.state ?? {}, - }; -} - -async function writeCronJobs(storePath: string, jobs: CronJob[]) { - await fs.writeFile(storePath, JSON.stringify({ version: 1, jobs }), "utf-8"); -} - -async function writeCronStoreSnapshot(storePath: string, jobs: unknown[]) { - await fs.writeFile(storePath, JSON.stringify({ version: 1, jobs }), "utf-8"); -} - -async function startCronForStore(params: { - storePath: string; - cronEnabled?: boolean; - enqueueSystemEvent?: CronServiceOptions["enqueueSystemEvent"]; - requestHeartbeatNow?: CronServiceOptions["requestHeartbeatNow"]; - runIsolatedAgentJob?: CronServiceOptions["runIsolatedAgentJob"]; - onEvent?: CronServiceOptions["onEvent"]; -}) { - const enqueueSystemEvent = - params.enqueueSystemEvent ?? (vi.fn() as unknown as CronServiceOptions["enqueueSystemEvent"]); - const requestHeartbeatNow = - params.requestHeartbeatNow ?? (vi.fn() as unknown as CronServiceOptions["requestHeartbeatNow"]); - const runIsolatedAgentJob = params.runIsolatedAgentJob ?? createDefaultIsolatedRunner(); - - const cron = new CronService({ - cronEnabled: params.cronEnabled ?? true, - storePath: params.storePath, - log: noopLogger, - enqueueSystemEvent, - requestHeartbeatNow, - runIsolatedAgentJob, - ...(params.onEvent ? { onEvent: params.onEvent } : {}), - }); - await cron.start(); - return cron; -} describe("Cron issue regressions", () => { - beforeAll(async () => { - fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "cron-issues-")); - }); - - beforeEach(() => { - vi.useFakeTimers(); - vi.setSystemTime(new Date("2026-02-06T10:05:00.000Z")); - }); - - afterAll(async () => { - vi.useRealTimers(); - await fs.rm(fixtureRoot, { recursive: true, force: true }); - }); + const { makeStorePath } = setupCronIssueRegressionFixtures(); it("covers schedule updates and payload patching", async () => { const store = makeStorePath(); diff --git a/src/test-utils/frozen-time.ts b/src/test-utils/frozen-time.ts new file mode 100644 index 00000000000..f5e626fad21 --- /dev/null +++ b/src/test-utils/frozen-time.ts @@ -0,0 +1,10 @@ +import { vi } from "vitest"; + +export function useFrozenTime(at: string | number | Date): void { + vi.useFakeTimers(); + vi.setSystemTime(at); +} + +export function useRealTime(): void { + vi.useRealTimers(); +}