mirror of
https://github.com/moltbot/moltbot.git
synced 2026-03-07 22:44:16 +00:00
fix(cron): fix test failures and regenerate protocol files
- Add forceReload option to ensureLoaded to avoid stat I/O in normal paths while still detecting cross-service writes in the timer path - Post isolated job summary back to main session (restores the old isolation.postToMainPrefix behavior via delivery model) - Update legacy migration tests to check delivery.channel instead of payload.channel (normalization now moves delivery fields to top-level) - Remove legacy deliver/channel/to/bestEffortDeliver from payload schema - Update protocol conformance test for delivery modes - Regenerate GatewayModels.swift (isolation -> delivery)
This commit is contained in:
committed by
Peter Steinberger
parent
6fb8d8850e
commit
f8d2534062
@@ -1872,7 +1872,7 @@ public struct CronJob: Codable, Sendable {
|
||||
public let sessiontarget: AnyCodable
|
||||
public let wakemode: AnyCodable
|
||||
public let payload: AnyCodable
|
||||
public let isolation: [String: AnyCodable]?
|
||||
public let delivery: [String: AnyCodable]?
|
||||
public let state: [String: AnyCodable]
|
||||
|
||||
public init(
|
||||
@@ -1888,7 +1888,7 @@ public struct CronJob: Codable, Sendable {
|
||||
sessiontarget: AnyCodable,
|
||||
wakemode: AnyCodable,
|
||||
payload: AnyCodable,
|
||||
isolation: [String: AnyCodable]?,
|
||||
delivery: [String: AnyCodable]?,
|
||||
state: [String: AnyCodable]
|
||||
) {
|
||||
self.id = id
|
||||
@@ -1903,7 +1903,7 @@ public struct CronJob: Codable, Sendable {
|
||||
self.sessiontarget = sessiontarget
|
||||
self.wakemode = wakemode
|
||||
self.payload = payload
|
||||
self.isolation = isolation
|
||||
self.delivery = delivery
|
||||
self.state = state
|
||||
}
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
@@ -1919,7 +1919,7 @@ public struct CronJob: Codable, Sendable {
|
||||
case sessiontarget = "sessionTarget"
|
||||
case wakemode = "wakeMode"
|
||||
case payload
|
||||
case isolation
|
||||
case delivery
|
||||
case state
|
||||
}
|
||||
}
|
||||
@@ -1950,7 +1950,7 @@ public struct CronAddParams: Codable, Sendable {
|
||||
public let sessiontarget: AnyCodable
|
||||
public let wakemode: AnyCodable
|
||||
public let payload: AnyCodable
|
||||
public let isolation: [String: AnyCodable]?
|
||||
public let delivery: [String: AnyCodable]?
|
||||
|
||||
public init(
|
||||
name: String,
|
||||
@@ -1962,7 +1962,7 @@ public struct CronAddParams: Codable, Sendable {
|
||||
sessiontarget: AnyCodable,
|
||||
wakemode: AnyCodable,
|
||||
payload: AnyCodable,
|
||||
isolation: [String: AnyCodable]?
|
||||
delivery: [String: AnyCodable]?
|
||||
) {
|
||||
self.name = name
|
||||
self.agentid = agentid
|
||||
@@ -1973,7 +1973,7 @@ public struct CronAddParams: Codable, Sendable {
|
||||
self.sessiontarget = sessiontarget
|
||||
self.wakemode = wakemode
|
||||
self.payload = payload
|
||||
self.isolation = isolation
|
||||
self.delivery = delivery
|
||||
}
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case name
|
||||
@@ -1985,7 +1985,7 @@ public struct CronAddParams: Codable, Sendable {
|
||||
case sessiontarget = "sessionTarget"
|
||||
case wakemode = "wakeMode"
|
||||
case payload
|
||||
case isolation
|
||||
case delivery
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1872,7 +1872,7 @@ public struct CronJob: Codable, Sendable {
|
||||
public let sessiontarget: AnyCodable
|
||||
public let wakemode: AnyCodable
|
||||
public let payload: AnyCodable
|
||||
public let isolation: [String: AnyCodable]?
|
||||
public let delivery: [String: AnyCodable]?
|
||||
public let state: [String: AnyCodable]
|
||||
|
||||
public init(
|
||||
@@ -1888,7 +1888,7 @@ public struct CronJob: Codable, Sendable {
|
||||
sessiontarget: AnyCodable,
|
||||
wakemode: AnyCodable,
|
||||
payload: AnyCodable,
|
||||
isolation: [String: AnyCodable]?,
|
||||
delivery: [String: AnyCodable]?,
|
||||
state: [String: AnyCodable]
|
||||
) {
|
||||
self.id = id
|
||||
@@ -1903,7 +1903,7 @@ public struct CronJob: Codable, Sendable {
|
||||
self.sessiontarget = sessiontarget
|
||||
self.wakemode = wakemode
|
||||
self.payload = payload
|
||||
self.isolation = isolation
|
||||
self.delivery = delivery
|
||||
self.state = state
|
||||
}
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
@@ -1919,7 +1919,7 @@ public struct CronJob: Codable, Sendable {
|
||||
case sessiontarget = "sessionTarget"
|
||||
case wakemode = "wakeMode"
|
||||
case payload
|
||||
case isolation
|
||||
case delivery
|
||||
case state
|
||||
}
|
||||
}
|
||||
@@ -1950,7 +1950,7 @@ public struct CronAddParams: Codable, Sendable {
|
||||
public let sessiontarget: AnyCodable
|
||||
public let wakemode: AnyCodable
|
||||
public let payload: AnyCodable
|
||||
public let isolation: [String: AnyCodable]?
|
||||
public let delivery: [String: AnyCodable]?
|
||||
|
||||
public init(
|
||||
name: String,
|
||||
@@ -1962,7 +1962,7 @@ public struct CronAddParams: Codable, Sendable {
|
||||
sessiontarget: AnyCodable,
|
||||
wakemode: AnyCodable,
|
||||
payload: AnyCodable,
|
||||
isolation: [String: AnyCodable]?
|
||||
delivery: [String: AnyCodable]?
|
||||
) {
|
||||
self.name = name
|
||||
self.agentid = agentid
|
||||
@@ -1973,7 +1973,7 @@ public struct CronAddParams: Codable, Sendable {
|
||||
self.sessiontarget = sessiontarget
|
||||
self.wakemode = wakemode
|
||||
self.payload = payload
|
||||
self.isolation = isolation
|
||||
self.delivery = delivery
|
||||
}
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case name
|
||||
@@ -1985,7 +1985,7 @@ public struct CronAddParams: Codable, Sendable {
|
||||
case sessiontarget = "sessionTarget"
|
||||
case wakemode = "wakeMode"
|
||||
case payload
|
||||
case isolation
|
||||
case delivery
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -66,11 +66,11 @@ export function parseAt(input: string): string | null {
|
||||
return null;
|
||||
}
|
||||
const absolute = parseAbsoluteTimeMs(raw);
|
||||
if (absolute) {
|
||||
if (absolute !== null) {
|
||||
return new Date(absolute).toISOString();
|
||||
}
|
||||
const dur = parseDurationMs(raw);
|
||||
if (dur) {
|
||||
if (dur !== null) {
|
||||
return new Date(Date.now() + dur).toISOString();
|
||||
}
|
||||
return null;
|
||||
|
||||
@@ -2,39 +2,29 @@ import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { MACOS_APP_SOURCES_DIR } from "../compat/legacy-names.js";
|
||||
import { CronPayloadSchema } from "../gateway/protocol/schema.js";
|
||||
import { CronDeliverySchema } from "../gateway/protocol/schema.js";
|
||||
|
||||
type SchemaLike = {
|
||||
anyOf?: Array<{ properties?: Record<string, unknown> }>;
|
||||
anyOf?: Array<{ properties?: Record<string, unknown>; const?: unknown }>;
|
||||
properties?: Record<string, unknown>;
|
||||
const?: unknown;
|
||||
};
|
||||
|
||||
type ProviderSchema = {
|
||||
anyOf?: Array<{ const?: unknown }>;
|
||||
};
|
||||
|
||||
function extractCronChannels(schema: SchemaLike): string[] {
|
||||
const union = schema.anyOf ?? [];
|
||||
const payloadWithChannel = union.find((entry) =>
|
||||
Boolean(entry?.properties && "channel" in entry.properties),
|
||||
);
|
||||
const channelSchema = payloadWithChannel?.properties
|
||||
? (payloadWithChannel.properties.channel as ProviderSchema)
|
||||
: undefined;
|
||||
const channels = (channelSchema?.anyOf ?? [])
|
||||
function extractDeliveryModes(schema: SchemaLike): string[] {
|
||||
const modeSchema = schema.properties?.mode as SchemaLike | undefined;
|
||||
return (modeSchema?.anyOf ?? [])
|
||||
.map((entry) => entry?.const)
|
||||
.filter((value): value is string => typeof value === "string");
|
||||
return channels;
|
||||
}
|
||||
|
||||
const UI_FILES = ["ui/src/ui/types.ts", "ui/src/ui/ui-types.ts", "ui/src/ui/views/cron.ts"];
|
||||
|
||||
const SWIFT_FILE_CANDIDATES = [`${MACOS_APP_SOURCES_DIR}/GatewayConnection.swift`];
|
||||
const SWIFT_MODEL_CANDIDATES = [`${MACOS_APP_SOURCES_DIR}/CronModels.swift`];
|
||||
const SWIFT_STATUS_CANDIDATES = [`${MACOS_APP_SOURCES_DIR}/GatewayConnection.swift`];
|
||||
|
||||
async function resolveSwiftFiles(cwd: string): Promise<string[]> {
|
||||
async function resolveSwiftFiles(cwd: string, candidates: string[]): Promise<string[]> {
|
||||
const matches: string[] = [];
|
||||
for (const relPath of SWIFT_FILE_CANDIDATES) {
|
||||
for (const relPath of candidates) {
|
||||
try {
|
||||
await fs.access(path.join(cwd, relPath));
|
||||
matches.push(relPath);
|
||||
@@ -43,30 +33,32 @@ async function resolveSwiftFiles(cwd: string): Promise<string[]> {
|
||||
}
|
||||
}
|
||||
if (matches.length === 0) {
|
||||
throw new Error(`Missing Swift cron definition. Tried: ${SWIFT_FILE_CANDIDATES.join(", ")}`);
|
||||
throw new Error(`Missing Swift cron definition. Tried: ${candidates.join(", ")}`);
|
||||
}
|
||||
return matches;
|
||||
}
|
||||
|
||||
describe("cron protocol conformance", () => {
|
||||
it("ui + swift include all cron providers from gateway schema", async () => {
|
||||
const channels = extractCronChannels(CronPayloadSchema as SchemaLike);
|
||||
expect(channels.length).toBeGreaterThan(0);
|
||||
it("ui + swift include all cron delivery modes from gateway schema", async () => {
|
||||
const modes = extractDeliveryModes(CronDeliverySchema as SchemaLike);
|
||||
expect(modes.length).toBeGreaterThan(0);
|
||||
|
||||
const cwd = process.cwd();
|
||||
for (const relPath of UI_FILES) {
|
||||
const content = await fs.readFile(path.join(cwd, relPath), "utf-8");
|
||||
for (const channel of channels) {
|
||||
expect(content.includes(`"${channel}"`), `${relPath} missing ${channel}`).toBe(true);
|
||||
for (const mode of modes) {
|
||||
expect(content.includes(`"${mode}"`), `${relPath} missing delivery mode ${mode}`).toBe(
|
||||
true,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const swiftFiles = await resolveSwiftFiles(cwd);
|
||||
for (const relPath of swiftFiles) {
|
||||
const swiftModelFiles = await resolveSwiftFiles(cwd, SWIFT_MODEL_CANDIDATES);
|
||||
for (const relPath of swiftModelFiles) {
|
||||
const content = await fs.readFile(path.join(cwd, relPath), "utf-8");
|
||||
for (const channel of channels) {
|
||||
const pattern = new RegExp(`\\bcase\\s+${channel}\\b`);
|
||||
expect(pattern.test(content), `${relPath} missing case ${channel}`).toBe(true);
|
||||
for (const mode of modes) {
|
||||
const pattern = new RegExp(`\\bcase\\s+${mode}\\b`);
|
||||
expect(pattern.test(content), `${relPath} missing case ${mode}`).toBe(true);
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -78,7 +70,7 @@ describe("cron protocol conformance", () => {
|
||||
expect(uiTypes.includes("jobs:")).toBe(true);
|
||||
expect(uiTypes.includes("jobCount")).toBe(false);
|
||||
|
||||
const [swiftRelPath] = await resolveSwiftFiles(cwd);
|
||||
const [swiftRelPath] = await resolveSwiftFiles(cwd, SWIFT_STATUS_CANDIDATES);
|
||||
const swiftPath = path.join(cwd, swiftRelPath);
|
||||
const swift = await fs.readFile(swiftPath, "utf-8");
|
||||
expect(swift.includes("struct CronSchedulerStatus")).toBe(true);
|
||||
|
||||
@@ -270,9 +270,12 @@ describe("CronService", () => {
|
||||
await cron.start();
|
||||
const jobs = await cron.list({ includeDisabled: true });
|
||||
const job = jobs.find((j) => j.id === rawJob.id);
|
||||
// Legacy delivery fields are migrated to the top-level delivery object
|
||||
const delivery = job?.delivery as unknown as Record<string, unknown>;
|
||||
expect(delivery?.channel).toBe("telegram");
|
||||
const payload = job?.payload as unknown as Record<string, unknown>;
|
||||
expect(payload.channel).toBe("telegram");
|
||||
expect("provider" in payload).toBe(false);
|
||||
expect("channel" in payload).toBe(false);
|
||||
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
@@ -321,8 +324,9 @@ describe("CronService", () => {
|
||||
await cron.start();
|
||||
const jobs = await cron.list({ includeDisabled: true });
|
||||
const job = jobs.find((j) => j.id === rawJob.id);
|
||||
const payload = job?.payload as unknown as Record<string, unknown>;
|
||||
expect(payload.channel).toBe("telegram");
|
||||
// Legacy delivery fields are migrated to the top-level delivery object
|
||||
const delivery = job?.delivery as unknown as Record<string, unknown>;
|
||||
expect(delivery?.channel).toBe("telegram");
|
||||
|
||||
cron.stop();
|
||||
await store.cleanup();
|
||||
|
||||
@@ -126,23 +126,24 @@ async function getFileMtimeMs(path: string): Promise<number | null> {
|
||||
}
|
||||
}
|
||||
|
||||
export async function ensureLoaded(state: CronServiceState) {
|
||||
const fileMtimeMs = await getFileMtimeMs(state.deps.storePath);
|
||||
|
||||
// Check if we need to reload:
|
||||
// - No store loaded yet
|
||||
// - File modification time has changed
|
||||
// - File was modified after we last loaded (external edit)
|
||||
const needsReload =
|
||||
!state.store ||
|
||||
(fileMtimeMs !== null &&
|
||||
state.storeFileMtimeMs !== null &&
|
||||
fileMtimeMs > state.storeFileMtimeMs);
|
||||
|
||||
if (!needsReload) {
|
||||
export async function ensureLoaded(state: CronServiceState, opts?: { forceReload?: boolean }) {
|
||||
// Fast path: store is already in memory. The timer path passes
|
||||
// forceReload=true so that cross-service writes to the same store file
|
||||
// are always picked up. Other callers (add, list, run, …) trust the
|
||||
// in-memory copy to avoid a stat syscall on every operation.
|
||||
if (state.store && !opts?.forceReload) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (opts?.forceReload && state.store) {
|
||||
// Only pay for the stat when we're explicitly checking for external edits.
|
||||
const mtime = await getFileMtimeMs(state.deps.storePath);
|
||||
if (mtime !== null && state.storeFileMtimeMs !== null && mtime === state.storeFileMtimeMs) {
|
||||
return; // File unchanged since our last load/persist.
|
||||
}
|
||||
}
|
||||
|
||||
const fileMtimeMs = await getFileMtimeMs(state.deps.storePath);
|
||||
const loaded = await loadCronStore(state.deps.storePath);
|
||||
const jobs = (loaded.jobs ?? []) as unknown as Array<Record<string, unknown>>;
|
||||
let mutated = false;
|
||||
|
||||
@@ -37,7 +37,7 @@ export async function onTimer(state: CronServiceState) {
|
||||
state.running = true;
|
||||
try {
|
||||
await locked(state, async () => {
|
||||
await ensureLoaded(state);
|
||||
await ensureLoaded(state, { forceReload: true });
|
||||
await runDueJobs(state);
|
||||
await persist(state);
|
||||
armTimer(state);
|
||||
@@ -184,6 +184,18 @@ export async function executeJob(
|
||||
job,
|
||||
message: job.payload.message,
|
||||
});
|
||||
|
||||
// Post a short summary back to the main session so the user sees
|
||||
// the cron result without opening the isolated session.
|
||||
const summaryText = res.summary?.trim();
|
||||
if (summaryText) {
|
||||
const prefix = "Cron";
|
||||
const label =
|
||||
res.status === "error" ? `${prefix} (error): ${summaryText}` : `${prefix}: ${summaryText}`;
|
||||
state.deps.enqueueSystemEvent(label, { agentId: job.agentId });
|
||||
state.deps.requestHeartbeatNow({ reason: `cron:${job.id}` });
|
||||
}
|
||||
|
||||
if (res.status === "ok") {
|
||||
await finish("ok", undefined, res.summary);
|
||||
} else if (res.status === "skipped") {
|
||||
|
||||
@@ -42,10 +42,6 @@ export const CronPayloadSchema = Type.Union([
|
||||
model: Type.Optional(Type.String()),
|
||||
thinking: Type.Optional(Type.String()),
|
||||
timeoutSeconds: Type.Optional(Type.Integer({ minimum: 1 })),
|
||||
deliver: Type.Optional(Type.Boolean()),
|
||||
channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])),
|
||||
to: Type.Optional(Type.String()),
|
||||
bestEffortDeliver: Type.Optional(Type.Boolean()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
),
|
||||
@@ -66,10 +62,6 @@ export const CronPayloadPatchSchema = Type.Union([
|
||||
model: Type.Optional(Type.String()),
|
||||
thinking: Type.Optional(Type.String()),
|
||||
timeoutSeconds: Type.Optional(Type.Integer({ minimum: 1 })),
|
||||
deliver: Type.Optional(Type.Boolean()),
|
||||
channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])),
|
||||
to: Type.Optional(Type.String()),
|
||||
bestEffortDeliver: Type.Optional(Type.Boolean()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
),
|
||||
|
||||
@@ -164,28 +164,22 @@ describe("gateway server cron", () => {
|
||||
const mergeUpdateRes = await rpcReq(ws, "cron.update", {
|
||||
id: mergeJobId,
|
||||
patch: {
|
||||
payload: { kind: "agentTurn", deliver: true, channel: "telegram", to: "19098680" },
|
||||
delivery: { mode: "announce", channel: "telegram", to: "19098680" },
|
||||
},
|
||||
});
|
||||
expect(mergeUpdateRes.ok).toBe(true);
|
||||
const merged = mergeUpdateRes.payload as
|
||||
| {
|
||||
payload?: {
|
||||
kind?: unknown;
|
||||
message?: unknown;
|
||||
model?: unknown;
|
||||
deliver?: unknown;
|
||||
channel?: unknown;
|
||||
to?: unknown;
|
||||
};
|
||||
payload?: { kind?: unknown; message?: unknown; model?: unknown };
|
||||
delivery?: { mode?: unknown; channel?: unknown; to?: unknown };
|
||||
}
|
||||
| undefined;
|
||||
expect(merged?.payload?.kind).toBe("agentTurn");
|
||||
expect(merged?.payload?.message).toBe("hello");
|
||||
expect(merged?.payload?.model).toBe("opus");
|
||||
expect(merged?.payload?.deliver).toBe(true);
|
||||
expect(merged?.payload?.channel).toBe("telegram");
|
||||
expect(merged?.payload?.to).toBe("19098680");
|
||||
expect(merged?.delivery?.mode).toBe("announce");
|
||||
expect(merged?.delivery?.channel).toBe("telegram");
|
||||
expect(merged?.delivery?.to).toBe("19098680");
|
||||
|
||||
const rejectRes = await rpcReq(ws, "cron.add", {
|
||||
name: "patch reject",
|
||||
@@ -203,7 +197,7 @@ describe("gateway server cron", () => {
|
||||
const rejectUpdateRes = await rpcReq(ws, "cron.update", {
|
||||
id: rejectJobId,
|
||||
patch: {
|
||||
payload: { kind: "agentTurn", deliver: true },
|
||||
payload: { kind: "agentTurn", message: "nope" },
|
||||
},
|
||||
});
|
||||
expect(rejectUpdateRes.ok).toBe(false);
|
||||
|
||||
Reference in New Issue
Block a user