refactor: call cron service directly for plugin scheduled turns

This commit is contained in:
Peter Steinberger
2026-05-11 03:55:03 +01:00
parent 7a859a1001
commit 13c735c083
10 changed files with 509 additions and 473 deletions

View File

@@ -2,6 +2,7 @@ import { primeConfiguredBindingRegistry } from "../channels/plugins/binding-regi
import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import type { PluginLookUpTable } from "../plugins/plugin-lookup-table.js";
import type { PluginRegistryParams } from "../plugins/registry-types.js";
import type { PluginRegistry } from "../plugins/registry.js";
import { pinActivePluginChannelRegistry } from "../plugins/runtime.js";
import {
@@ -35,6 +36,7 @@ type GatewayPluginBootstrapParams = {
log: GatewayPluginBootstrapLog;
coreGatewayHandlers?: Record<string, GatewayRequestHandler>;
coreGatewayMethodNames?: readonly string[];
hostServices?: PluginRegistryParams["hostServices"];
baseMethods: string[];
pluginIds?: string[];
pluginLookUpTable?: PluginLookUpTable;
@@ -102,6 +104,9 @@ export function prepareGatewayPluginLoad(params: GatewayPluginBootstrapParams) {
...(params.coreGatewayMethodNames !== undefined && {
coreGatewayMethodNames: params.coreGatewayMethodNames,
}),
...(params.hostServices !== undefined && {
hostServices: params.hostServices,
}),
baseMethods: params.baseMethods,
pluginIds: params.pluginIds,
pluginLookUpTable: params.pluginLookUpTable,

View File

@@ -8,6 +8,7 @@ import { clearActivatedPluginRuntimeState, loadOpenClawPlugins } from "../plugin
import { loadPluginLookUpTable, type PluginLookUpTable } from "../plugins/plugin-lookup-table.js";
import { getPluginModuleLoaderStats } from "../plugins/plugin-module-loader-cache.js";
import { createEmptyPluginRegistry } from "../plugins/registry-empty.js";
import type { PluginRegistryParams } from "../plugins/registry-types.js";
import { setActivePluginRegistry } from "../plugins/runtime.js";
import { getPluginRuntimeGatewayRequestScope } from "../plugins/runtime/gateway-request-scope.js";
import { createPluginRuntimeLoaderLogger } from "../plugins/runtime/load-context.js";
@@ -527,6 +528,7 @@ export function loadGatewayPlugins(params: {
};
coreGatewayHandlers?: Record<string, GatewayRequestHandler>;
coreGatewayMethodNames?: readonly string[];
hostServices?: PluginRegistryParams["hostServices"];
baseMethods: string[];
pluginIds?: string[];
pluginLookUpTable?: PluginLookUpTable;
@@ -616,6 +618,9 @@ export function loadGatewayPlugins(params: {
...(params.coreGatewayMethodNames !== undefined && {
coreGatewayMethodNames: params.coreGatewayMethodNames,
}),
...(params.hostServices !== undefined && {
hostServices: params.hostServices,
}),
runtimeOptions: {
allowGatewaySubagentBinding: true,
},

View File

@@ -4,6 +4,7 @@ import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { loadPluginLookUpTable } from "../plugins/plugin-lookup-table.js";
import type { PluginMetadataSnapshot } from "../plugins/plugin-metadata-snapshot.js";
import type { PluginRegistryParams } from "../plugins/registry-types.js";
import { createEmptyPluginRegistry } from "../plugins/registry.js";
import { getActivePluginRegistry, setActivePluginRegistry } from "../plugins/runtime.js";
import { mergeActivationSectionsIntoRuntimeConfig } from "./plugin-activation-runtime-config.js";
@@ -151,6 +152,7 @@ export async function loadGatewayStartupPluginRuntime(params: {
workspaceDir: string;
log: GatewayPluginBootstrapLog;
baseMethods: string[];
hostServices?: PluginRegistryParams["hostServices"];
startupPluginIds: string[];
pluginLookUpTable?: ReturnType<typeof loadPluginLookUpTable>;
preferSetupRuntimeForChannelPlugins?: boolean;
@@ -165,6 +167,9 @@ export async function loadGatewayStartupPluginRuntime(params: {
log: params.log,
coreGatewayMethodNames: params.baseMethods,
baseMethods: params.baseMethods,
...(params.hostServices !== undefined && {
hostServices: params.hostServices,
}),
pluginIds: params.startupPluginIds,
pluginLookUpTable: params.pluginLookUpTable,
preferSetupRuntimeForChannelPlugins: params.preferSetupRuntimeForChannelPlugins,

View File

@@ -882,6 +882,11 @@ export async function startGatewayServer(
gatewayMethods: listActiveGatewayMethods(baseGatewayMethods),
});
deps.cron = runtimeState.cronState.cron;
const pluginHostServices = {
get cron() {
return runtimeState.cronState.cron;
},
};
let closePreludeStarted = false;
let postReadyMaintenanceTimer: ReturnType<typeof setTimeout> | null = null;
@@ -1177,6 +1182,7 @@ export async function startGatewayServer(
workspaceDir: defaultWorkspaceDir,
log,
coreGatewayMethodNames: baseMethods,
hostServices: pluginHostServices,
baseMethods,
pluginLookUpTable: nextPluginLookUpTable,
});
@@ -1307,6 +1313,7 @@ export async function startGatewayServer(
workspaceDir: defaultWorkspaceDir,
log,
coreGatewayMethodNames: baseMethods,
hostServices: pluginHostServices,
baseMethods,
pluginIds: startupPluginIds,
pluginLookUpTable,
@@ -1415,6 +1422,7 @@ export async function startGatewayServer(
workspaceDir: defaultWorkspaceDir,
log,
baseMethods,
hostServices: pluginHostServices,
startupPluginIds,
pluginLookUpTable,
startupTrace,

View File

@@ -1,3 +1,4 @@
import type { PluginRegistryParams } from "../../plugins/registry-types.js";
import type { OpenClawPluginApi } from "../plugin-entry.js";
import {
createPluginRecord,
@@ -13,7 +14,10 @@ import { uniqueSortedStrings } from "./string-utils.js";
export { registerProviders, requireProvider, uniqueSortedStrings };
export function createPluginRegistryFixture(config = {} as OpenClawConfig) {
export function createPluginRegistryFixture(
config = {} as OpenClawConfig,
params: { hostServices?: PluginRegistryParams["hostServices"] } = {},
) {
return {
config,
registry: createPluginRegistry({
@@ -24,6 +28,7 @@ export function createPluginRegistryFixture(config = {} as OpenClawConfig) {
debug() {},
},
runtime: {} as PluginRuntime,
...(params.hostServices ? { hostServices: params.hostServices } : {}),
}),
};
}

View File

@@ -2,7 +2,9 @@ import {
createPluginRegistryFixture,
registerTestPlugin,
} from "openclaw/plugin-sdk/plugin-test-contracts";
import { afterEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { CronServiceContract } from "../../cron/service-contract.js";
import type { CronJob, CronJobCreate } from "../../cron/types.js";
import type {
GatewayRequestHandler,
GatewayRequestHandlerOptions,
@@ -27,7 +29,9 @@ import { createPluginRecord } from "../status.test-helpers.js";
import type { OpenClawPluginApi } from "../types.js";
const workflowMocks = vi.hoisted(() => ({
callGatewayTool: vi.fn(),
cronAdd: vi.fn(),
cronListPage: vi.fn(),
cronRemove: vi.fn(),
}));
const WORKFLOW_PLUGIN_ID = "workflow-plugin";
@@ -41,10 +45,6 @@ const DEFAULT_TURN_SCHEDULE = {
type ScheduleSessionTurnRequest = Parameters<typeof schedulePluginSessionTurn>[0];
type SessionTurnSchedule = ScheduleSessionTurnRequest["schedule"];
vi.mock("../../agents/tools/gateway.js", () => ({
callGatewayTool: workflowMocks.callGatewayTool,
}));
async function invokePluginGatewayHandler(params: {
handler: GatewayRequestHandler;
method: string;
@@ -83,19 +83,55 @@ async function invokePluginGatewayHandler(params: {
});
}
function mockCronAdd(response: unknown) {
workflowMocks.callGatewayTool.mockImplementation(async (method: string) => {
if (method === "cron.add") {
return response;
}
return { ok: true };
});
function createMockCronService(): CronServiceContract {
return {
start: vi.fn(async () => undefined),
stop: vi.fn(),
status: vi.fn(async () => ({
enabled: true,
storePath: "/tmp/openclaw-test-cron.json",
jobs: 0,
nextWakeAtMs: null,
})),
list: vi.fn(async () => []),
listPage: workflowMocks.cronListPage,
add: workflowMocks.cronAdd,
update: vi.fn(async (id, patch) => makeCronJob({ id, ...patch })),
remove: workflowMocks.cronRemove,
run: vi.fn(async () => ({ ok: true, ran: false, reason: "not-due" })),
enqueueRun: vi.fn(async () => ({ ok: true, ran: false, reason: "not-due" })),
getJob: vi.fn(() => undefined),
getDefaultAgentId: vi.fn(() => undefined),
wake: vi.fn(() => ({ ok: true })),
} as CronServiceContract;
}
function makeCronJob(input: Partial<CronJob> & { id: string }): CronJob {
return {
name: input.name ?? input.id,
enabled: true,
schedule: { kind: "at", at: "2026-05-01T00:00:00.000Z" },
sessionTarget: input.sessionTarget ?? `session:${MAIN_SESSION_KEY}`,
wakeMode: "now",
payload: { kind: "agentTurn", message: "wake" },
delivery: { mode: "announce", channel: "last" },
state: {},
createdAtMs: 0,
updatedAtMs: 0,
...input,
};
}
const cron = createMockCronService();
function mockCronAdd(response: CronJob) {
workflowMocks.cronAdd.mockResolvedValue(response);
}
function getCronAddBody() {
const addCall = workflowMocks.callGatewayTool.mock.calls.find((args) => args[0] === "cron.add");
const addCall = workflowMocks.cronAdd.mock.calls[0];
expect(addCall).toBeDefined();
return addCall?.[2] as Record<string, unknown>;
return addCall?.[0] as CronJobCreate;
}
function expectSessionTurnHandle(
@@ -123,6 +159,7 @@ async function scheduleWorkflowTurn(
pluginId: WORKFLOW_PLUGIN_ID,
origin,
schedule: { ...DEFAULT_TURN_SCHEDULE, ...schedule } as SessionTurnSchedule,
cron: params.cron ?? cron,
...rest,
});
}
@@ -137,14 +174,29 @@ async function unscheduleWorkflowTurnsByTag(
return await unschedulePluginSessionTurnsByTag({
pluginId: WORKFLOW_PLUGIN_ID,
origin,
cron,
request,
});
}
describe("plugin scheduled turns", () => {
beforeEach(() => {
workflowMocks.cronAdd.mockReset();
workflowMocks.cronListPage.mockReset();
workflowMocks.cronRemove.mockReset();
workflowMocks.cronListPage.mockResolvedValue({
jobs: [],
total: 0,
offset: 0,
limit: 200,
hasMore: false,
nextOffset: null,
});
workflowMocks.cronRemove.mockResolvedValue({ ok: true, removed: true });
});
afterEach(() => {
vi.useRealTimers();
workflowMocks.callGatewayTool.mockReset();
clearPluginLoaderCache();
clearPluginHostRuntimeState();
setActivePluginRegistry(createEmptyPluginRegistry());
@@ -169,7 +221,7 @@ describe("plugin scheduled turns", () => {
});
it("schedules session turns with cron-compatible tagged cleanup metadata", async () => {
mockCronAdd({ payload: { jobId: "job-tagged" } });
mockCronAdd(makeCronJob({ id: "job-tagged" }));
const handle = await scheduleWorkflowTurn({
pluginName: "Workflow Plugin",
@@ -196,7 +248,7 @@ describe("plugin scheduled turns", () => {
});
it("prefixes explicit untagged schedule names with plugin ownership metadata", async () => {
mockCronAdd({ id: "job-untagged" });
mockCronAdd(makeCronJob({ id: "job-untagged" }));
const handle = await scheduleWorkflowTurn({
schedule: {
@@ -210,16 +262,13 @@ describe("plugin scheduled turns", () => {
it("builds payloads accepted by the real cron.add protocol validator", async () => {
const { validateCronAddParams } = await import("../../gateway/protocol/index.js");
workflowMocks.callGatewayTool.mockImplementation(async (method: string, _opts, body) => {
if (method === "cron.add") {
expect(validateCronAddParams(body)).toBe(true);
expect((body as { delivery?: unknown }).delivery).toEqual({
mode: "announce",
channel: "last",
});
return { id: "cron-compatible-job" };
}
return { ok: true };
workflowMocks.cronAdd.mockImplementation(async (body: unknown) => {
expect(validateCronAddParams(body)).toBe(true);
expect((body as { delivery?: unknown }).delivery).toEqual({
mode: "announce",
channel: "last",
});
return makeCronJob({ id: "cron-compatible-job" });
});
const handle = await scheduleWorkflowTurn({
@@ -233,43 +282,44 @@ describe("plugin scheduled turns", () => {
it("pages through cron.list when unscheduling tagged turns", async () => {
const removed: string[] = [];
const listRequests: unknown[] = [];
workflowMocks.callGatewayTool.mockImplementation(
async (method: string, _opts: unknown, body: unknown) => {
if (method === "cron.list") {
const offset = (body as { offset?: unknown }).offset;
listRequests.push(body);
if (offset === undefined) {
return {
jobs: [
{
id: "job-page-1",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1",
sessionTarget: "session:agent:main:main",
},
],
hasMore: true,
nextOffset: 200,
};
}
return {
jobs: [
{
id: "job-page-2",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:2",
sessionTarget: "session:agent:main:main",
},
],
hasMore: false,
nextOffset: null,
};
}
if (method === "cron.remove") {
removed.push((body as { id?: string }).id ?? "");
return { ok: true, removed: true };
}
return { ok: true };
},
);
workflowMocks.cronListPage.mockImplementation(async (body: unknown) => {
const offset = (body as { offset?: unknown }).offset;
listRequests.push(body);
if (offset === undefined) {
return {
jobs: [
makeCronJob({
id: "job-page-1",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1",
sessionTarget: "session:agent:main:main",
}),
],
total: 2,
offset: 0,
limit: 200,
hasMore: true,
nextOffset: 200,
};
}
return {
jobs: [
makeCronJob({
id: "job-page-2",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:2",
sessionTarget: "session:agent:main:main",
}),
],
total: 2,
offset: 200,
limit: 200,
hasMore: false,
nextOffset: null,
};
});
workflowMocks.cronRemove.mockImplementation(async (id: string) => {
removed.push(id);
return { ok: true, removed: true };
});
await expect(unscheduleWorkflowTurnsByTag()).resolves.toEqual({ removed: 2, failed: 0 });
expect(listRequests).toEqual([
@@ -293,13 +343,7 @@ describe("plugin scheduled turns", () => {
});
it("tracks scheduled session turns using cron.add's top-level job id", async () => {
workflowMocks.callGatewayTool.mockResolvedValueOnce({
id: "cron-top-level-id",
payload: {
id: "payload-body-id",
kind: "agentTurn",
},
});
workflowMocks.cronAdd.mockResolvedValueOnce(makeCronJob({ id: "cron-top-level-id" }));
await expect(
scheduleWorkflowTurn({
@@ -325,15 +369,10 @@ describe("plugin scheduled turns", () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-01T00:00:00.000Z"));
const removed: string[] = [];
workflowMocks.callGatewayTool.mockImplementation(async (method: string, _opts, body) => {
if (method === "cron.add") {
return { id: "one-shot-job" };
}
if (method === "cron.remove") {
removed.push((body as { id?: string }).id ?? "");
return { ok: true, removed: false };
}
return { ok: true };
workflowMocks.cronAdd.mockResolvedValue(makeCronJob({ id: "one-shot-job" }));
workflowMocks.cronRemove.mockImplementation(async (id: string) => {
removed.push(id);
return { ok: true, removed: false };
});
await expect(
@@ -412,11 +451,11 @@ describe("plugin scheduled turns", () => {
},
}),
).resolves.toBeUndefined();
expect(workflowMocks.callGatewayTool).not.toHaveBeenCalled();
expect(workflowMocks.cronAdd).not.toHaveBeenCalled();
});
it("falls back to a valid delay schedule when a malformed cron value is absent", async () => {
mockCronAdd({ id: "delay-job" });
mockCronAdd(makeCronJob({ id: "delay-job" }));
const handle = await scheduleWorkflowTurn({
schedule: {
@@ -431,19 +470,14 @@ describe("plugin scheduled turns", () => {
it("removes a stale cron job when the plugin unloads after cron.add", async () => {
let commit = true;
const removed: string[] = [];
workflowMocks.callGatewayTool.mockImplementation(
async (method: string, _opts: unknown, body: unknown) => {
if (method === "cron.add") {
commit = false;
return { payload: { jobId: "job-stale" } };
}
if (method === "cron.remove") {
removed.push((body as { id?: string }).id ?? "");
return { ok: true, removed: true };
}
return { ok: true };
},
);
workflowMocks.cronAdd.mockImplementation(async () => {
commit = false;
return makeCronJob({ id: "job-stale" });
});
workflowMocks.cronRemove.mockImplementation(async (id: string) => {
removed.push(id);
return { ok: true, removed: true };
});
await expect(
scheduleWorkflowTurn({
@@ -472,15 +506,8 @@ describe("plugin scheduled turns", () => {
}
};`,
});
workflowMocks.callGatewayTool.mockImplementation(async (method: string) => {
if (method === "cron.add") {
return { id: "loader-scheduled-job" };
}
if (method === "cron.remove") {
return { ok: true, removed: true };
}
return { ok: true };
});
workflowMocks.cronAdd.mockResolvedValue(makeCronJob({ id: "loader-scheduled-job" }));
workflowMocks.cronRemove.mockResolvedValue({ ok: true, removed: true });
const registry = withEnv(
{
@@ -490,6 +517,7 @@ describe("plugin scheduled turns", () => {
() =>
loadOpenClawPlugins({
cache: false,
hostServices: { cron },
config: {
plugins: {
enabled: true,
@@ -506,16 +534,14 @@ describe("plugin scheduled turns", () => {
expect(registry.plugins.find((plugin) => plugin.id === "loader-scheduler")?.status).toBe(
"loaded",
);
await vi.waitFor(() => {
const cronAddCall = workflowMocks.callGatewayTool.mock.calls.find(
(args) => args[0] === "cron.add",
);
expect(cronAddCall).toBeDefined();
const body = cronAddCall?.[2] as Record<string, unknown> | undefined;
expect(body?.sessionTarget).toBe("session:agent:main:main");
expect(body?.payload).toEqual({ kind: "agentTurn", message: "wake" });
expect(cronAddCall?.[3]).toEqual({ scopes: ["operator.admin"] });
});
await vi.waitFor(() =>
expect(workflowMocks.cronAdd).toHaveBeenCalledWith(
expect.objectContaining({
sessionTarget: "session:agent:main:main",
payload: { kind: "agentTurn", message: "wake" },
}),
),
);
expect(listPluginSessionSchedulerJobs("loader-scheduler")).toEqual([
{
id: "loader-scheduled-job",
@@ -581,32 +607,32 @@ describe("plugin scheduled turns", () => {
});
const addedJobs: Array<Record<string, unknown>> = [];
const removedJobIds = new Set<string>();
workflowMocks.callGatewayTool.mockImplementation(async (method: string, _opts, body) => {
if (method === "cron.add") {
const id = `loader-scheduled-job-${addedJobs.length + 1}`;
addedJobs.push({
id,
...(body as Record<string, unknown>),
});
return { id };
workflowMocks.cronAdd.mockImplementation(async (body: CronJobCreate) => {
const id = `loader-scheduled-job-${addedJobs.length + 1}`;
addedJobs.push({
id,
...(body as Record<string, unknown>),
});
return makeCronJob({ id, ...body });
});
workflowMocks.cronListPage.mockImplementation(async () => ({
jobs: addedJobs
.filter((job) => {
const id = typeof job.id === "string" ? job.id : "";
return id && !removedJobIds.has(id);
})
.map((job) => makeCronJob(job as Partial<CronJob> & { id: string })),
total: addedJobs.length,
offset: 0,
limit: 200,
hasMore: false,
nextOffset: null,
}));
workflowMocks.cronRemove.mockImplementation(async (id: string) => {
if (id) {
removedJobIds.add(id);
}
if (method === "cron.list") {
return {
jobs: addedJobs.filter((job) => {
const id = typeof job.id === "string" ? job.id : "";
return id && !removedJobIds.has(id);
}),
};
}
if (method === "cron.remove") {
const id =
typeof (body as { id?: unknown })?.id === "string" ? (body as { id: string }).id : "";
if (id) {
removedJobIds.add(id);
}
return { ok: true, removed: true };
}
return { ok: true };
return { ok: true, removed: true };
});
const registry = withEnv(
@@ -617,6 +643,7 @@ describe("plugin scheduled turns", () => {
() =>
loadOpenClawPlugins({
cache: false,
hostServices: { cron },
config: {
plugins: {
enabled: true,
@@ -682,18 +709,13 @@ describe("plugin scheduled turns", () => {
it("keeps stale scheduled-turn rollback non-throwing when cron cleanup fails", async () => {
let commit = true;
workflowMocks.callGatewayTool.mockImplementation(
async (method: string, _opts: unknown, body: unknown) => {
if (method === "cron.add") {
commit = false;
return { id: "job-stale" };
}
if (method === "cron.remove") {
throw new Error(`remove failed for ${(body as { id?: string }).id}`);
}
return { ok: true };
},
);
workflowMocks.cronAdd.mockImplementation(async () => {
commit = false;
return makeCronJob({ id: "job-stale" });
});
workflowMocks.cronRemove.mockImplementation(async (id: string) => {
throw new Error(`remove failed for ${id}`);
});
await expect(
scheduleWorkflowTurn({
@@ -705,17 +727,10 @@ describe("plugin scheduled turns", () => {
});
it("keeps scheduled-turn records when cleanup fails", async () => {
workflowMocks.callGatewayTool.mockImplementation(
async (method: string, _opts: unknown, body: unknown) => {
if (method === "cron.add") {
return { id: "cleanup-failure-job" };
}
if (method === "cron.remove") {
throw new Error(`remove failed for ${(body as { id?: string }).id}`);
}
return { ok: true };
},
);
workflowMocks.cronAdd.mockResolvedValue(makeCronJob({ id: "cleanup-failure-job" }));
workflowMocks.cronRemove.mockImplementation(async (id: string) => {
throw new Error(`remove failed for ${id}`);
});
const cleanupFailureHandle = await scheduleWorkflowTurn({
pluginName: "Workflow Plugin",
@@ -741,18 +756,11 @@ describe("plugin scheduled turns", () => {
it("cleans live dynamic scheduled turns when registry cleanup records are empty", async () => {
const removed: string[] = [];
workflowMocks.callGatewayTool.mockImplementation(
async (method: string, _opts: unknown, body: unknown) => {
if (method === "cron.add") {
return { id: "dynamic-cleanup-job" };
}
if (method === "cron.remove") {
removed.push((body as { id?: string }).id ?? "");
return { ok: true, removed: true };
}
return { ok: true };
},
);
workflowMocks.cronAdd.mockResolvedValue(makeCronJob({ id: "dynamic-cleanup-job" }));
workflowMocks.cronRemove.mockImplementation(async (id: string) => {
removed.push(id);
return { ok: true, removed: true };
});
const dynamicCleanupHandle = await scheduleWorkflowTurn();
expectSessionTurnHandle(dynamicCleanupHandle, "dynamic-cleanup-job");
@@ -771,19 +779,13 @@ describe("plugin scheduled turns", () => {
it("preserves replacement-generation runtime scheduled turns during restart cleanup", async () => {
const removed: string[] = [];
const scheduledIds = ["old-runtime-job", "new-runtime-job"];
workflowMocks.callGatewayTool.mockImplementation(
async (method: string, _opts: unknown, body: unknown) => {
if (method === "cron.add") {
const id = scheduledIds.shift() ?? "unexpected-job";
return { id };
}
if (method === "cron.remove") {
removed.push((body as { id?: string }).id ?? "");
return { ok: true, removed: true };
}
return { ok: true };
},
workflowMocks.cronAdd.mockImplementation(async () =>
makeCronJob({ id: scheduledIds.shift() ?? "unexpected-job" }),
);
workflowMocks.cronRemove.mockImplementation(async (id: string) => {
removed.push(id);
return { ok: true, removed: true };
});
const previousFixture = createPluginRegistryFixture();
previousFixture.registry.registry.plugins.push(
@@ -836,18 +838,11 @@ describe("plugin scheduled turns", () => {
it("treats already-missing cron jobs as successful scheduled-turn cleanup", async () => {
const removed: string[] = [];
workflowMocks.callGatewayTool.mockImplementation(
async (method: string, _opts: unknown, body: unknown) => {
if (method === "cron.add") {
return { id: "already-missing-job" };
}
if (method === "cron.remove") {
removed.push((body as { id?: string }).id ?? "");
return { ok: true, removed: false };
}
return { ok: true };
},
);
workflowMocks.cronAdd.mockResolvedValue(makeCronJob({ id: "already-missing-job" }));
workflowMocks.cronRemove.mockImplementation(async (id: string) => {
removed.push(id);
return { ok: true, removed: false };
});
const alreadyMissingHandle = await scheduleWorkflowTurn();
expectSessionTurnHandle(alreadyMissingHandle, "already-missing-job");
@@ -865,42 +860,42 @@ describe("plugin scheduled turns", () => {
it("removes only matching plugin tag jobs in the requested session", async () => {
const removed: string[] = [];
const listQueries: unknown[] = [];
workflowMocks.callGatewayTool.mockImplementation(
async (method: string, _opts: unknown, body: unknown) => {
if (method === "cron.list") {
listQueries.push((body as { query?: unknown }).query);
return {
jobs: [
{
id: "job-a",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1",
sessionTarget: "session:agent:main:main",
},
{
id: "job-b",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:2",
sessionTarget: "session:agent:main:main",
},
{
id: "job-c",
name: "plugin:other-plugin:tag:nudge:agent:main:main:1",
sessionTarget: "session:agent:main:main",
},
{
id: "job-d",
name: "plugin:workflow-plugin:tag:nudge:agent:other:main:1",
sessionTarget: "session:agent:other:main",
},
],
};
}
if (method === "cron.remove") {
removed.push((body as { id?: string }).id ?? "");
return { ok: true, removed: true };
}
return { ok: true };
},
);
workflowMocks.cronListPage.mockImplementation(async (body: unknown) => {
listQueries.push((body as { query?: unknown }).query);
return {
jobs: [
makeCronJob({
id: "job-a",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1",
sessionTarget: "session:agent:main:main",
}),
makeCronJob({
id: "job-b",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:2",
sessionTarget: "session:agent:main:main",
}),
makeCronJob({
id: "job-c",
name: "plugin:other-plugin:tag:nudge:agent:main:main:1",
sessionTarget: "session:agent:main:main",
}),
makeCronJob({
id: "job-d",
name: "plugin:workflow-plugin:tag:nudge:agent:other:main:1",
sessionTarget: "session:agent:other:main",
}),
],
total: 4,
offset: 0,
limit: 200,
hasMore: false,
nextOffset: null,
};
});
workflowMocks.cronRemove.mockImplementation(async (id: string) => {
removed.push(id);
return { ok: true, removed: true };
});
await expect(unscheduleWorkflowTurnsByTag()).resolves.toEqual({ removed: 2, failed: 0 });
expect(listQueries).toEqual(["plugin:workflow-plugin:tag:nudge:agent:main:main:"]);
@@ -909,35 +904,33 @@ describe("plugin scheduled turns", () => {
it("prunes runtime scheduler records after tagged unschedule removes jobs", async () => {
let addCount = 0;
workflowMocks.callGatewayTool.mockImplementation(
async (method: string, _opts: unknown, body: unknown) => {
if (method === "cron.add") {
addCount += 1;
return { id: `job-${addCount}` };
}
if (method === "cron.list") {
return {
jobs: [
{
id: "job-1",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:first",
sessionTarget: "session:agent:main:main",
},
{
id: "job-2",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:second",
sessionTarget: "session:agent:main:main",
},
],
};
}
if (method === "cron.remove") {
expect(["job-1", "job-2"]).toContain((body as { id?: unknown }).id);
return { ok: true, removed: true };
}
return { ok: true };
},
);
workflowMocks.cronAdd.mockImplementation(async () => {
addCount += 1;
return makeCronJob({ id: `job-${addCount}` });
});
workflowMocks.cronListPage.mockResolvedValue({
jobs: [
makeCronJob({
id: "job-1",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:first",
sessionTarget: "session:agent:main:main",
}),
makeCronJob({
id: "job-2",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:second",
sessionTarget: "session:agent:main:main",
}),
],
total: 2,
offset: 0,
limit: 200,
hasMore: false,
nextOffset: null,
});
workflowMocks.cronRemove.mockImplementation(async (id: string) => {
expect(["job-1", "job-2"]).toContain(id);
return { ok: true, removed: true };
});
await scheduleWorkflowTurn({
schedule: {
@@ -960,61 +953,57 @@ describe("plugin scheduled turns", () => {
});
it("counts cron.list and cron.remove failures when unscheduling by tag", async () => {
workflowMocks.callGatewayTool.mockRejectedValueOnce(new Error("cron list unavailable"));
workflowMocks.cronListPage.mockRejectedValueOnce(new Error("cron list unavailable"));
await expect(unscheduleWorkflowTurnsByTag()).resolves.toEqual({ removed: 0, failed: 1 });
workflowMocks.callGatewayTool.mockReset();
workflowMocks.callGatewayTool.mockImplementation(
async (method: string, _opts: unknown, body: unknown) => {
if (method === "cron.list") {
return {
jobs: [
{
id: "job-ok",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1",
sessionTarget: "session:agent:main:main",
},
{
id: "job-fail",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:2",
sessionTarget: "session:agent:main:main",
},
],
};
}
if (method === "cron.remove" && (body as { id?: string }).id === "job-fail") {
throw new Error("remove failed");
}
if (method === "cron.remove") {
return { ok: true, removed: true };
}
return { ok: true };
},
);
workflowMocks.cronListPage.mockReset();
workflowMocks.cronListPage.mockResolvedValue({
jobs: [
makeCronJob({
id: "job-ok",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1",
sessionTarget: "session:agent:main:main",
}),
makeCronJob({
id: "job-fail",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:2",
sessionTarget: "session:agent:main:main",
}),
],
total: 2,
offset: 0,
limit: 200,
hasMore: false,
nextOffset: null,
});
workflowMocks.cronRemove.mockImplementation(async (id: string) => {
if (id === "job-fail") {
throw new Error("remove failed");
}
return { ok: true, removed: true };
});
await expect(unscheduleWorkflowTurnsByTag()).resolves.toEqual({ removed: 1, failed: 1 });
workflowMocks.callGatewayTool.mockReset();
workflowMocks.callGatewayTool.mockImplementation(
async (method: string, _opts: unknown, body: unknown) => {
if (method === "cron.list") {
return {
jobs: [
{
id: "job-missing",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1",
sessionTarget: "session:agent:main:main",
},
],
};
}
if (method === "cron.remove") {
expect((body as { id?: string }).id).toBe("job-missing");
return { ok: true, removed: false };
}
return { ok: true };
},
);
workflowMocks.cronListPage.mockReset();
workflowMocks.cronListPage.mockResolvedValue({
jobs: [
makeCronJob({
id: "job-missing",
name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1",
sessionTarget: "session:agent:main:main",
}),
],
total: 1,
offset: 0,
limit: 200,
hasMore: false,
nextOffset: null,
});
workflowMocks.cronRemove.mockImplementation(async (id: string) => {
expect(id).toBe("job-missing");
return { ok: true, removed: false };
});
await expect(unscheduleWorkflowTurnsByTag()).resolves.toEqual({ removed: 0, failed: 1 });
});
@@ -1030,23 +1019,13 @@ describe("plugin scheduled turns", () => {
await expect(
unscheduleWorkflowTurnsByTag({ sessionKey: MAIN_SESSION_KEY, tag: "nudge:followup" }),
).resolves.toEqual({ removed: 0, failed: 0 });
expect(workflowMocks.callGatewayTool).not.toHaveBeenCalled();
expect(workflowMocks.cronListPage).not.toHaveBeenCalled();
expect(workflowMocks.cronRemove).not.toHaveBeenCalled();
});
it("wires schedule and unschedule through the plugin API with stale-registry protection", async () => {
workflowMocks.callGatewayTool.mockImplementation(async (method: string) => {
if (method === "cron.add") {
return { payload: { jobId: "job-live" } };
}
if (method === "cron.list") {
return { jobs: [] };
}
if (method === "cron.remove") {
return { ok: true, removed: true };
}
return { ok: true };
});
const { config, registry } = createPluginRegistryFixture();
workflowMocks.cronAdd.mockResolvedValue(makeCronJob({ id: "job-live" }));
const { config, registry } = createPluginRegistryFixture({}, { hostServices: { cron } });
let capturedApi: OpenClawPluginApi | undefined;
registerTestPlugin({
registry,
@@ -1091,16 +1070,98 @@ describe("plugin scheduled turns", () => {
).resolves.toEqual({ removed: 0, failed: 0 });
});
it("resolves live cron service for captured plugin scheduled-turn APIs", async () => {
const firstCron = createMockCronService();
const secondCron = createMockCronService();
const firstAdd = vi.fn(async () => makeCronJob({ id: "first-cron-job" }));
const secondAdd = vi.fn(async () => makeCronJob({ id: "second-cron-job" }));
const firstListPage = vi.fn(async () => {
throw new Error("stale cron list used");
});
const firstRemove = vi.fn(async () => {
throw new Error("stale cron remove used");
});
const secondListPage = vi.fn(async () => ({
jobs: [
makeCronJob({
id: "second-cron-existing-job",
name: "plugin:scheduler-plugin:tag:nudge:agent:main:main:1",
sessionTarget: "session:agent:main:main",
}),
],
total: 1,
offset: 0,
limit: 200,
hasMore: false,
nextOffset: null,
}));
const secondRemove = vi.fn(async () => ({ ok: true, removed: true }) as const);
firstCron.add = firstAdd;
firstCron.listPage = firstListPage;
firstCron.remove = firstRemove;
secondCron.add = secondAdd;
secondCron.listPage = secondListPage;
secondCron.remove = secondRemove;
let liveCron = firstCron;
const hostServices = {
get cron() {
return liveCron;
},
};
const { config, registry } = createPluginRegistryFixture({}, { hostServices });
let capturedApi: OpenClawPluginApi | undefined;
registerTestPlugin({
registry,
config,
record: createPluginRecord({
id: "scheduler-plugin",
name: "Scheduler Plugin",
origin: "bundled",
}),
register(api) {
capturedApi = api;
},
});
setActivePluginRegistry(registry.registry);
await expect(
capturedApi?.session.workflow.scheduleSessionTurn({
sessionKey: "agent:main:main",
message: "wake",
delayMs: 10,
}),
).resolves.toMatchObject({ id: "first-cron-job" });
liveCron = secondCron;
await expect(
capturedApi?.session.workflow.scheduleSessionTurn({
sessionKey: "agent:main:main",
message: "wake again",
delayMs: 10,
}),
).resolves.toMatchObject({ id: "second-cron-job" });
await expect(
capturedApi?.session.workflow.unscheduleSessionTurnsByTag({
sessionKey: "agent:main:main",
tag: "nudge",
}),
).resolves.toEqual({ removed: 1, failed: 0 });
expect(firstAdd).toHaveBeenCalledTimes(1);
expect(secondAdd).toHaveBeenCalledTimes(1);
expect(firstListPage).not.toHaveBeenCalled();
expect(firstRemove).not.toHaveBeenCalled();
expect(secondListPage).toHaveBeenCalledTimes(1);
expect(secondRemove).toHaveBeenCalledWith("second-cron-existing-job");
});
it("blocks registration-time schedule and unschedule calls before activation", async () => {
// Drain any cleanup microtasks queued by the previous test's
// setActivePluginRegistry calls; setActivePluginRegistry schedules
// cleanup via fire-and-forget dynamic imports that may resolve and
// invoke callGatewayTool after this test's mockReset.
// cleanup via fire-and-forget dynamic imports that may resolve after
// this test's mockReset.
for (let i = 0; i < 8; i++) {
await Promise.resolve();
}
workflowMocks.callGatewayTool.mockReset();
workflowMocks.callGatewayTool.mockResolvedValue({ ok: true });
const activeFixture = createPluginRegistryFixture();
setActivePluginRegistry(activeFixture.registry.registry);
@@ -1127,6 +1188,8 @@ describe("plugin scheduled turns", () => {
tag: "nudge",
}),
).resolves.toEqual({ removed: 0, failed: 0 });
expect(workflowMocks.callGatewayTool).not.toHaveBeenCalled();
expect(workflowMocks.cronAdd).not.toHaveBeenCalled();
expect(workflowMocks.cronListPage).not.toHaveBeenCalled();
expect(workflowMocks.cronRemove).not.toHaveBeenCalled();
});
});

View File

@@ -1,5 +1,6 @@
import { randomUUID } from "node:crypto";
import { ADMIN_SCOPE } from "../gateway/operator-scopes.js";
import type { CronServiceContract } from "../cron/service-contract.js";
import type { CronJob, CronJobCreate } from "../cron/types.js";
import { formatErrorMessage } from "../infra/errors.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
@@ -20,9 +21,6 @@ const log = createSubsystemLogger("plugins/host-scheduled-turns");
const PLUGIN_CRON_NAME_PREFIX = "plugin:";
const PLUGIN_CRON_TAG_MARKER = ":tag:";
type CallGatewayTool = typeof import("../agents/tools/gateway.js").callGatewayTool;
let callGatewayToolPromise: Promise<CallGatewayTool> | undefined;
type ResolvedSessionTurnSchedule =
| {
kind: "cron";
@@ -34,16 +32,6 @@ type ResolvedSessionTurnSchedule =
at: string;
};
async function callGatewayToolLazy(
...args: Parameters<CallGatewayTool>
): Promise<Awaited<ReturnType<CallGatewayTool>>> {
callGatewayToolPromise ??= import("../agents/tools/gateway.js").then(
(module) => module.callGatewayTool,
);
const callGatewayTool = await callGatewayToolPromise;
return callGatewayTool(...args);
}
function resolveSchedule(
params: PluginSessionTurnScheduleParams,
): ResolvedSessionTurnSchedule | undefined {
@@ -108,18 +96,14 @@ function formatScheduleLogContext(params: {
}
async function removeScheduledSessionTurn(params: {
cron: CronServiceContract;
jobId: string;
pluginId: string;
sessionKey?: string;
name?: string;
}): Promise<boolean> {
try {
const result = await callGatewayToolLazy(
"cron.remove",
{},
{ id: params.jobId },
{ scopes: [ADMIN_SCOPE] },
);
const result = await params.cron.remove(params.jobId);
return didCronCleanupJob(result);
} catch (error) {
log.warn(
@@ -129,48 +113,12 @@ async function removeScheduledSessionTurn(params: {
}
}
function unwrapGatewayPayload(value: unknown): unknown {
if (!isCronJobRecord(value)) {
return value;
}
const payload = value.payload;
return isCronJobRecord(payload) ? payload : value;
}
function didCronRemoveJob(value: unknown): boolean {
const result = unwrapGatewayPayload(value);
if (!isCronJobRecord(result)) {
return false;
}
return result.ok !== false && result.removed === true;
return isCronRemoveResult(value) && value.ok && value.removed;
}
function didCronCleanupJob(value: unknown): boolean {
const result = unwrapGatewayPayload(value);
if (!isCronJobRecord(result) || result.ok === false) {
return false;
}
return result.removed === true || result.removed === false;
}
function normalizeCronJobId(value: unknown): string | undefined {
return typeof value === "string" && value.trim() ? value.trim() : undefined;
}
function extractCronJobId(value: unknown): string | undefined {
if (!value || typeof value !== "object") {
return undefined;
}
const record = value as Record<string, unknown>;
const topLevelId = normalizeCronJobId(record.jobId ?? record.id);
if (topLevelId) {
return topLevelId;
}
const payload =
record.payload && typeof record.payload === "object"
? (record.payload as Record<string, unknown>)
: record;
return normalizeCronJobId(payload.jobId ?? payload.id);
return isCronRemoveResult(value) && value.ok;
}
const PLUGIN_CRON_RESERVED_DELIMITER = ":";
@@ -210,65 +158,41 @@ function buildPluginSchedulerTagPrefix(params: {
return `${PLUGIN_CRON_NAME_PREFIX}${params.pluginId}${PLUGIN_CRON_TAG_MARKER}${params.tag}:${params.sessionKey}:`;
}
function isCronJobRecord(value: unknown): value is Record<string, unknown> {
return Boolean(value) && typeof value === "object" && !Array.isArray(value);
}
function readCronListJobs(value: unknown): Record<string, unknown>[] {
if (Array.isArray(value)) {
return value.filter(isCronJobRecord);
}
if (isCronJobRecord(value)) {
const jobs = (value as { jobs?: unknown }).jobs;
if (Array.isArray(jobs)) {
return jobs.filter(isCronJobRecord);
}
}
return [];
}
function readCronListNextOffset(value: unknown): number | undefined {
if (!isCronJobRecord(value)) {
return undefined;
}
const nextOffset = value.nextOffset;
return typeof nextOffset === "number" && Number.isInteger(nextOffset) && nextOffset >= 0
? nextOffset
: undefined;
}
function readCronListHasMore(value: unknown): boolean {
return isCronJobRecord(value) && value.hasMore === true;
function isCronRemoveResult(
value: unknown,
): value is Awaited<ReturnType<CronServiceContract["remove"]>> {
return (
Boolean(value) &&
typeof value === "object" &&
!Array.isArray(value) &&
typeof (value as { ok?: unknown }).ok === "boolean" &&
typeof (value as { removed?: unknown }).removed === "boolean"
);
}
async function listAllCronJobsForPluginTagCleanup(
cron: CronServiceContract,
query: string,
): Promise<Record<string, unknown>[]> {
const jobs: Record<string, unknown>[] = [];
): Promise<CronJob[]> {
const jobs: CronJob[] = [];
let offset = 0;
for (;;) {
const listResult = await callGatewayToolLazy(
"cron.list",
{},
{
includeDisabled: true,
limit: 200,
query,
sortBy: "name",
sortDir: "asc",
...(offset > 0 ? { offset } : {}),
},
{ scopes: [ADMIN_SCOPE] },
);
jobs.push(...readCronListJobs(listResult));
if (!readCronListHasMore(listResult)) {
const listResult = await cron.listPage({
includeDisabled: true,
limit: 200,
query,
sortBy: "name",
sortDir: "asc",
...(offset > 0 ? { offset } : {}),
});
jobs.push(...listResult.jobs);
if (!listResult.hasMore) {
return jobs;
}
const nextOffset = readCronListNextOffset(listResult);
if (nextOffset === undefined || nextOffset <= offset) {
if (listResult.nextOffset === null || listResult.nextOffset <= offset) {
return jobs;
}
offset = nextOffset;
offset = listResult.nextOffset;
}
}
@@ -278,6 +202,7 @@ export async function schedulePluginSessionTurn(params: {
origin?: PluginOrigin;
schedule: PluginSessionTurnScheduleParams;
shouldCommit?: () => boolean;
cron?: CronServiceContract;
ownerRegistry?: PluginRegistry;
}): Promise<PluginSessionSchedulerJobHandle | undefined> {
if (params.origin !== "bundled") {
@@ -330,36 +255,43 @@ export async function schedulePluginSessionTurn(params: {
if (params.shouldCommit && !params.shouldCommit()) {
return undefined;
}
if (!params.cron) {
log.warn(
`plugin session turn scheduling failed (${formatScheduleLogContext({
pluginId: params.pluginId,
sessionKey,
...(scheduleName ? { name: scheduleName } : {}),
})}): cron service unavailable`,
);
return undefined;
}
const cron = params.cron;
const cronJobName = buildPluginSchedulerCronName({
pluginId: params.pluginId,
sessionKey,
...(tag !== undefined ? { tag } : {}),
...(scheduleName ? { uniqueId: scheduleName } : {}),
});
const cronPayload: Record<string, unknown> = {
const cronPayload: CronJobCreate["payload"] = {
kind: "agentTurn",
message,
};
let result: unknown;
let result: Awaited<ReturnType<CronServiceContract["add"]>>;
try {
result = await callGatewayToolLazy(
"cron.add",
{},
{
name: cronJobName,
schedule: cronSchedule,
sessionTarget: `session:${sessionKey}`,
payload: cronPayload,
...(params.schedule.agentId ? { agentId: params.schedule.agentId } : {}),
deleteAfterRun: params.schedule.deleteAfterRun ?? cronSchedule.kind === "at",
wakeMode: "now",
delivery: {
mode: cronDeliveryMode,
...(cronDeliveryMode === "announce" ? { channel: "last" } : {}),
},
result = await cron.add({
name: cronJobName,
enabled: true,
schedule: cronSchedule,
sessionTarget: `session:${sessionKey}`,
payload: cronPayload,
...(params.schedule.agentId ? { agentId: params.schedule.agentId } : {}),
deleteAfterRun: params.schedule.deleteAfterRun ?? cronSchedule.kind === "at",
wakeMode: "now",
delivery: {
mode: cronDeliveryMode,
...(cronDeliveryMode === "announce" ? { channel: "last" } : {}),
},
{ scopes: [ADMIN_SCOPE] },
);
});
} catch (error) {
log.warn(
`plugin session turn scheduling failed (${formatScheduleLogContext({
@@ -370,12 +302,13 @@ export async function schedulePluginSessionTurn(params: {
);
return undefined;
}
const jobId = extractCronJobId(result);
const jobId = result.id;
if (!jobId) {
return undefined;
}
if (params.shouldCommit && !params.shouldCommit()) {
const removed = await removeScheduledSessionTurn({
cron,
jobId,
pluginId: params.pluginId,
sessionKey,
@@ -403,6 +336,7 @@ export async function schedulePluginSessionTurn(params: {
kind: "session-turn",
cleanup: async () => {
const removed = await removeScheduledSessionTurn({
cron,
jobId,
pluginId: params.pluginId,
sessionKey,
@@ -420,6 +354,7 @@ export async function schedulePluginSessionTurn(params: {
export async function unschedulePluginSessionTurnsByTag(params: {
pluginId: string;
origin?: PluginOrigin;
cron?: CronServiceContract;
request: PluginSessionTurnUnscheduleByTagParams;
}): Promise<PluginSessionTurnUnscheduleByTagResult> {
if (params.origin !== "bundled") {
@@ -430,37 +365,35 @@ export async function unschedulePluginSessionTurnsByTag(params: {
if (!sessionKey || !tag || invalidTag) {
return { removed: 0, failed: 0 };
}
if (!params.cron) {
log.warn("plugin session turn untag-list failed: cron service unavailable");
return { removed: 0, failed: 1 };
}
const cron = params.cron;
const namePrefix = buildPluginSchedulerTagPrefix({
pluginId: params.pluginId,
tag,
sessionKey,
});
let jobs: Record<string, unknown>[];
let jobs: CronJob[];
try {
jobs = await listAllCronJobsForPluginTagCleanup(namePrefix);
jobs = await listAllCronJobsForPluginTagCleanup(cron, namePrefix);
} catch (error) {
log.warn(`plugin session turn untag-list failed: ${formatErrorMessage(error)}`);
return { removed: 0, failed: 1 };
}
const candidates = jobs.filter((job) => {
const name = typeof job.name === "string" ? job.name : "";
const target = typeof job.sessionTarget === "string" ? job.sessionTarget : "";
return name.startsWith(namePrefix) && target === `session:${sessionKey}`;
return job.name.startsWith(namePrefix) && job.sessionTarget === `session:${sessionKey}`;
});
let removed = 0;
let failed = 0;
for (const job of candidates) {
const id = typeof job.id === "string" ? job.id.trim() : "";
const id = job.id.trim();
if (!id) {
continue;
}
try {
const result = await callGatewayToolLazy(
"cron.remove",
{},
{ id },
{ scopes: [ADMIN_SCOPE] },
);
const result = await cron.remove(id);
if (didCronRemoveJob(result)) {
removed += 1;
deletePluginSessionSchedulerJob({

View File

@@ -122,6 +122,7 @@ import {
} from "./plugin-scope.js";
import { ensureOpenClawPluginSdkAlias } from "./plugin-sdk-dist-alias.js";
import { createEmptyPluginRegistry } from "./registry-empty.js";
import type { PluginRegistryParams } from "./registry-types.js";
import { createPluginRegistry, type PluginRecord, type PluginRegistry } from "./registry.js";
import {
getActivePluginRegistry,
@@ -169,6 +170,7 @@ export type PluginLoadOptions = {
logger?: PluginLogger;
coreGatewayHandlers?: Record<string, GatewayRequestHandler>;
coreGatewayMethodNames?: readonly string[];
hostServices?: PluginRegistryParams["hostServices"];
runtimeOptions?: CreatePluginRuntimeOptions;
pluginSdkResolution?: PluginSdkResolutionPreference;
cache?: boolean;
@@ -1647,6 +1649,9 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi
...(options.coreGatewayMethodNames !== undefined && {
coreGatewayMethodNames: options.coreGatewayMethodNames,
}),
...(options.hostServices !== undefined && {
hostServices: options.hostServices,
}),
activateGlobalSideEffects: shouldActivate,
});

View File

@@ -473,6 +473,10 @@ export type PluginRegistryParams = {
coreGatewayHandlers?: GatewayRequestHandlers;
coreGatewayMethodNames?: readonly string[];
runtime: PluginRuntime;
hostServices?: {
/** May be a live accessor; plugin APIs must read it at call time. */
cron?: import("../cron/service-contract.js").CronServiceContract;
};
activateGlobalSideEffects?: boolean;
};

View File

@@ -339,6 +339,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
).toSorted();
registry.coreGatewayMethodNames = coreGatewayMethodNames;
const coreGatewayMethods = new Set(coreGatewayMethodNames);
const getHostCronService = () => registryParams.hostServices?.cron;
const pluginHookRollback = new Map<string, HookRollbackEntry[]>();
const pluginsWithChannelRegistrationConflict = new Set<string>();
const pluginSideEffectGuards = new Map<string, Set<PluginSideEffectGuard>>();
@@ -2719,6 +2720,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
pluginName: record.name,
origin: record.origin,
schedule,
cron: getHostCronService(),
shouldCommit: isLoadedRecordInActiveRegistry,
ownerRegistry: registry,
});
@@ -2734,6 +2736,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) {
return unschedulePluginSessionTurnsByTag({
pluginId: record.id,
origin: record.origin,
cron: getHostCronService(),
request,
});
},