From 13c735c083fb1d90cbc910ec6014fade514d3bca Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 11 May 2026 03:55:03 +0100 Subject: [PATCH] refactor: call cron service directly for plugin scheduled turns --- src/gateway/server-plugin-bootstrap.ts | 5 + src/gateway/server-plugins.ts | 5 + src/gateway/server-startup-plugins.ts | 5 + src/gateway/server.impl.ts | 8 + .../test-helpers/contracts-testkit.ts | 7 +- .../scheduled-turns.contract.test.ts | 729 ++++++++++-------- src/plugins/host-hook-scheduled-turns.ts | 211 ++--- src/plugins/loader.ts | 5 + src/plugins/registry-types.ts | 4 + src/plugins/registry.ts | 3 + 10 files changed, 509 insertions(+), 473 deletions(-) diff --git a/src/gateway/server-plugin-bootstrap.ts b/src/gateway/server-plugin-bootstrap.ts index c942997d71e..a00c09fbb55 100644 --- a/src/gateway/server-plugin-bootstrap.ts +++ b/src/gateway/server-plugin-bootstrap.ts @@ -2,6 +2,7 @@ import { primeConfiguredBindingRegistry } from "../channels/plugins/binding-regi import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import type { PluginLookUpTable } from "../plugins/plugin-lookup-table.js"; +import type { PluginRegistryParams } from "../plugins/registry-types.js"; import type { PluginRegistry } from "../plugins/registry.js"; import { pinActivePluginChannelRegistry } from "../plugins/runtime.js"; import { @@ -35,6 +36,7 @@ type GatewayPluginBootstrapParams = { log: GatewayPluginBootstrapLog; coreGatewayHandlers?: Record; coreGatewayMethodNames?: readonly string[]; + hostServices?: PluginRegistryParams["hostServices"]; baseMethods: string[]; pluginIds?: string[]; pluginLookUpTable?: PluginLookUpTable; @@ -102,6 +104,9 @@ export function prepareGatewayPluginLoad(params: GatewayPluginBootstrapParams) { ...(params.coreGatewayMethodNames !== undefined && { coreGatewayMethodNames: params.coreGatewayMethodNames, }), + ...(params.hostServices !== undefined && { + hostServices: params.hostServices, + }), baseMethods: params.baseMethods, pluginIds: params.pluginIds, pluginLookUpTable: params.pluginLookUpTable, diff --git a/src/gateway/server-plugins.ts b/src/gateway/server-plugins.ts index 1698a2e2deb..e37db8e8f56 100644 --- a/src/gateway/server-plugins.ts +++ b/src/gateway/server-plugins.ts @@ -8,6 +8,7 @@ import { clearActivatedPluginRuntimeState, loadOpenClawPlugins } from "../plugin import { loadPluginLookUpTable, type PluginLookUpTable } from "../plugins/plugin-lookup-table.js"; import { getPluginModuleLoaderStats } from "../plugins/plugin-module-loader-cache.js"; import { createEmptyPluginRegistry } from "../plugins/registry-empty.js"; +import type { PluginRegistryParams } from "../plugins/registry-types.js"; import { setActivePluginRegistry } from "../plugins/runtime.js"; import { getPluginRuntimeGatewayRequestScope } from "../plugins/runtime/gateway-request-scope.js"; import { createPluginRuntimeLoaderLogger } from "../plugins/runtime/load-context.js"; @@ -527,6 +528,7 @@ export function loadGatewayPlugins(params: { }; coreGatewayHandlers?: Record; coreGatewayMethodNames?: readonly string[]; + hostServices?: PluginRegistryParams["hostServices"]; baseMethods: string[]; pluginIds?: string[]; pluginLookUpTable?: PluginLookUpTable; @@ -616,6 +618,9 @@ export function loadGatewayPlugins(params: { ...(params.coreGatewayMethodNames !== undefined && { coreGatewayMethodNames: params.coreGatewayMethodNames, }), + ...(params.hostServices !== undefined && { + hostServices: params.hostServices, + }), runtimeOptions: { allowGatewaySubagentBinding: true, }, diff --git a/src/gateway/server-startup-plugins.ts b/src/gateway/server-startup-plugins.ts index 6fd34df7ed8..2a7c7255697 100644 --- a/src/gateway/server-startup-plugins.ts +++ b/src/gateway/server-startup-plugins.ts @@ -4,6 +4,7 @@ import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { loadPluginLookUpTable } from "../plugins/plugin-lookup-table.js"; import type { PluginMetadataSnapshot } from "../plugins/plugin-metadata-snapshot.js"; +import type { PluginRegistryParams } from "../plugins/registry-types.js"; import { createEmptyPluginRegistry } from "../plugins/registry.js"; import { getActivePluginRegistry, setActivePluginRegistry } from "../plugins/runtime.js"; import { mergeActivationSectionsIntoRuntimeConfig } from "./plugin-activation-runtime-config.js"; @@ -151,6 +152,7 @@ export async function loadGatewayStartupPluginRuntime(params: { workspaceDir: string; log: GatewayPluginBootstrapLog; baseMethods: string[]; + hostServices?: PluginRegistryParams["hostServices"]; startupPluginIds: string[]; pluginLookUpTable?: ReturnType; preferSetupRuntimeForChannelPlugins?: boolean; @@ -165,6 +167,9 @@ export async function loadGatewayStartupPluginRuntime(params: { log: params.log, coreGatewayMethodNames: params.baseMethods, baseMethods: params.baseMethods, + ...(params.hostServices !== undefined && { + hostServices: params.hostServices, + }), pluginIds: params.startupPluginIds, pluginLookUpTable: params.pluginLookUpTable, preferSetupRuntimeForChannelPlugins: params.preferSetupRuntimeForChannelPlugins, diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index e8e3a9e1bea..d832adb96e6 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -882,6 +882,11 @@ export async function startGatewayServer( gatewayMethods: listActiveGatewayMethods(baseGatewayMethods), }); deps.cron = runtimeState.cronState.cron; + const pluginHostServices = { + get cron() { + return runtimeState.cronState.cron; + }, + }; let closePreludeStarted = false; let postReadyMaintenanceTimer: ReturnType | null = null; @@ -1177,6 +1182,7 @@ export async function startGatewayServer( workspaceDir: defaultWorkspaceDir, log, coreGatewayMethodNames: baseMethods, + hostServices: pluginHostServices, baseMethods, pluginLookUpTable: nextPluginLookUpTable, }); @@ -1307,6 +1313,7 @@ export async function startGatewayServer( workspaceDir: defaultWorkspaceDir, log, coreGatewayMethodNames: baseMethods, + hostServices: pluginHostServices, baseMethods, pluginIds: startupPluginIds, pluginLookUpTable, @@ -1415,6 +1422,7 @@ export async function startGatewayServer( workspaceDir: defaultWorkspaceDir, log, baseMethods, + hostServices: pluginHostServices, startupPluginIds, pluginLookUpTable, startupTrace, diff --git a/src/plugin-sdk/test-helpers/contracts-testkit.ts b/src/plugin-sdk/test-helpers/contracts-testkit.ts index 75aaa70e0f4..28f16cd9656 100644 --- a/src/plugin-sdk/test-helpers/contracts-testkit.ts +++ b/src/plugin-sdk/test-helpers/contracts-testkit.ts @@ -1,3 +1,4 @@ +import type { PluginRegistryParams } from "../../plugins/registry-types.js"; import type { OpenClawPluginApi } from "../plugin-entry.js"; import { createPluginRecord, @@ -13,7 +14,10 @@ import { uniqueSortedStrings } from "./string-utils.js"; export { registerProviders, requireProvider, uniqueSortedStrings }; -export function createPluginRegistryFixture(config = {} as OpenClawConfig) { +export function createPluginRegistryFixture( + config = {} as OpenClawConfig, + params: { hostServices?: PluginRegistryParams["hostServices"] } = {}, +) { return { config, registry: createPluginRegistry({ @@ -24,6 +28,7 @@ export function createPluginRegistryFixture(config = {} as OpenClawConfig) { debug() {}, }, runtime: {} as PluginRuntime, + ...(params.hostServices ? { hostServices: params.hostServices } : {}), }), }; } diff --git a/src/plugins/contracts/scheduled-turns.contract.test.ts b/src/plugins/contracts/scheduled-turns.contract.test.ts index 31408f4359f..4c48c26231b 100644 --- a/src/plugins/contracts/scheduled-turns.contract.test.ts +++ b/src/plugins/contracts/scheduled-turns.contract.test.ts @@ -2,7 +2,9 @@ import { createPluginRegistryFixture, registerTestPlugin, } from "openclaw/plugin-sdk/plugin-test-contracts"; -import { afterEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { CronServiceContract } from "../../cron/service-contract.js"; +import type { CronJob, CronJobCreate } from "../../cron/types.js"; import type { GatewayRequestHandler, GatewayRequestHandlerOptions, @@ -27,7 +29,9 @@ import { createPluginRecord } from "../status.test-helpers.js"; import type { OpenClawPluginApi } from "../types.js"; const workflowMocks = vi.hoisted(() => ({ - callGatewayTool: vi.fn(), + cronAdd: vi.fn(), + cronListPage: vi.fn(), + cronRemove: vi.fn(), })); const WORKFLOW_PLUGIN_ID = "workflow-plugin"; @@ -41,10 +45,6 @@ const DEFAULT_TURN_SCHEDULE = { type ScheduleSessionTurnRequest = Parameters[0]; type SessionTurnSchedule = ScheduleSessionTurnRequest["schedule"]; -vi.mock("../../agents/tools/gateway.js", () => ({ - callGatewayTool: workflowMocks.callGatewayTool, -})); - async function invokePluginGatewayHandler(params: { handler: GatewayRequestHandler; method: string; @@ -83,19 +83,55 @@ async function invokePluginGatewayHandler(params: { }); } -function mockCronAdd(response: unknown) { - workflowMocks.callGatewayTool.mockImplementation(async (method: string) => { - if (method === "cron.add") { - return response; - } - return { ok: true }; - }); +function createMockCronService(): CronServiceContract { + return { + start: vi.fn(async () => undefined), + stop: vi.fn(), + status: vi.fn(async () => ({ + enabled: true, + storePath: "/tmp/openclaw-test-cron.json", + jobs: 0, + nextWakeAtMs: null, + })), + list: vi.fn(async () => []), + listPage: workflowMocks.cronListPage, + add: workflowMocks.cronAdd, + update: vi.fn(async (id, patch) => makeCronJob({ id, ...patch })), + remove: workflowMocks.cronRemove, + run: vi.fn(async () => ({ ok: true, ran: false, reason: "not-due" })), + enqueueRun: vi.fn(async () => ({ ok: true, ran: false, reason: "not-due" })), + getJob: vi.fn(() => undefined), + getDefaultAgentId: vi.fn(() => undefined), + wake: vi.fn(() => ({ ok: true })), + } as CronServiceContract; +} + +function makeCronJob(input: Partial & { id: string }): CronJob { + return { + name: input.name ?? input.id, + enabled: true, + schedule: { kind: "at", at: "2026-05-01T00:00:00.000Z" }, + sessionTarget: input.sessionTarget ?? `session:${MAIN_SESSION_KEY}`, + wakeMode: "now", + payload: { kind: "agentTurn", message: "wake" }, + delivery: { mode: "announce", channel: "last" }, + state: {}, + createdAtMs: 0, + updatedAtMs: 0, + ...input, + }; +} + +const cron = createMockCronService(); + +function mockCronAdd(response: CronJob) { + workflowMocks.cronAdd.mockResolvedValue(response); } function getCronAddBody() { - const addCall = workflowMocks.callGatewayTool.mock.calls.find((args) => args[0] === "cron.add"); + const addCall = workflowMocks.cronAdd.mock.calls[0]; expect(addCall).toBeDefined(); - return addCall?.[2] as Record; + return addCall?.[0] as CronJobCreate; } function expectSessionTurnHandle( @@ -123,6 +159,7 @@ async function scheduleWorkflowTurn( pluginId: WORKFLOW_PLUGIN_ID, origin, schedule: { ...DEFAULT_TURN_SCHEDULE, ...schedule } as SessionTurnSchedule, + cron: params.cron ?? cron, ...rest, }); } @@ -137,14 +174,29 @@ async function unscheduleWorkflowTurnsByTag( return await unschedulePluginSessionTurnsByTag({ pluginId: WORKFLOW_PLUGIN_ID, origin, + cron, request, }); } describe("plugin scheduled turns", () => { + beforeEach(() => { + workflowMocks.cronAdd.mockReset(); + workflowMocks.cronListPage.mockReset(); + workflowMocks.cronRemove.mockReset(); + workflowMocks.cronListPage.mockResolvedValue({ + jobs: [], + total: 0, + offset: 0, + limit: 200, + hasMore: false, + nextOffset: null, + }); + workflowMocks.cronRemove.mockResolvedValue({ ok: true, removed: true }); + }); + afterEach(() => { vi.useRealTimers(); - workflowMocks.callGatewayTool.mockReset(); clearPluginLoaderCache(); clearPluginHostRuntimeState(); setActivePluginRegistry(createEmptyPluginRegistry()); @@ -169,7 +221,7 @@ describe("plugin scheduled turns", () => { }); it("schedules session turns with cron-compatible tagged cleanup metadata", async () => { - mockCronAdd({ payload: { jobId: "job-tagged" } }); + mockCronAdd(makeCronJob({ id: "job-tagged" })); const handle = await scheduleWorkflowTurn({ pluginName: "Workflow Plugin", @@ -196,7 +248,7 @@ describe("plugin scheduled turns", () => { }); it("prefixes explicit untagged schedule names with plugin ownership metadata", async () => { - mockCronAdd({ id: "job-untagged" }); + mockCronAdd(makeCronJob({ id: "job-untagged" })); const handle = await scheduleWorkflowTurn({ schedule: { @@ -210,16 +262,13 @@ describe("plugin scheduled turns", () => { it("builds payloads accepted by the real cron.add protocol validator", async () => { const { validateCronAddParams } = await import("../../gateway/protocol/index.js"); - workflowMocks.callGatewayTool.mockImplementation(async (method: string, _opts, body) => { - if (method === "cron.add") { - expect(validateCronAddParams(body)).toBe(true); - expect((body as { delivery?: unknown }).delivery).toEqual({ - mode: "announce", - channel: "last", - }); - return { id: "cron-compatible-job" }; - } - return { ok: true }; + workflowMocks.cronAdd.mockImplementation(async (body: unknown) => { + expect(validateCronAddParams(body)).toBe(true); + expect((body as { delivery?: unknown }).delivery).toEqual({ + mode: "announce", + channel: "last", + }); + return makeCronJob({ id: "cron-compatible-job" }); }); const handle = await scheduleWorkflowTurn({ @@ -233,43 +282,44 @@ describe("plugin scheduled turns", () => { it("pages through cron.list when unscheduling tagged turns", async () => { const removed: string[] = []; const listRequests: unknown[] = []; - workflowMocks.callGatewayTool.mockImplementation( - async (method: string, _opts: unknown, body: unknown) => { - if (method === "cron.list") { - const offset = (body as { offset?: unknown }).offset; - listRequests.push(body); - if (offset === undefined) { - return { - jobs: [ - { - id: "job-page-1", - name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1", - sessionTarget: "session:agent:main:main", - }, - ], - hasMore: true, - nextOffset: 200, - }; - } - return { - jobs: [ - { - id: "job-page-2", - name: "plugin:workflow-plugin:tag:nudge:agent:main:main:2", - sessionTarget: "session:agent:main:main", - }, - ], - hasMore: false, - nextOffset: null, - }; - } - if (method === "cron.remove") { - removed.push((body as { id?: string }).id ?? ""); - return { ok: true, removed: true }; - } - return { ok: true }; - }, - ); + workflowMocks.cronListPage.mockImplementation(async (body: unknown) => { + const offset = (body as { offset?: unknown }).offset; + listRequests.push(body); + if (offset === undefined) { + return { + jobs: [ + makeCronJob({ + id: "job-page-1", + name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1", + sessionTarget: "session:agent:main:main", + }), + ], + total: 2, + offset: 0, + limit: 200, + hasMore: true, + nextOffset: 200, + }; + } + return { + jobs: [ + makeCronJob({ + id: "job-page-2", + name: "plugin:workflow-plugin:tag:nudge:agent:main:main:2", + sessionTarget: "session:agent:main:main", + }), + ], + total: 2, + offset: 200, + limit: 200, + hasMore: false, + nextOffset: null, + }; + }); + workflowMocks.cronRemove.mockImplementation(async (id: string) => { + removed.push(id); + return { ok: true, removed: true }; + }); await expect(unscheduleWorkflowTurnsByTag()).resolves.toEqual({ removed: 2, failed: 0 }); expect(listRequests).toEqual([ @@ -293,13 +343,7 @@ describe("plugin scheduled turns", () => { }); it("tracks scheduled session turns using cron.add's top-level job id", async () => { - workflowMocks.callGatewayTool.mockResolvedValueOnce({ - id: "cron-top-level-id", - payload: { - id: "payload-body-id", - kind: "agentTurn", - }, - }); + workflowMocks.cronAdd.mockResolvedValueOnce(makeCronJob({ id: "cron-top-level-id" })); await expect( scheduleWorkflowTurn({ @@ -325,15 +369,10 @@ describe("plugin scheduled turns", () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-05-01T00:00:00.000Z")); const removed: string[] = []; - workflowMocks.callGatewayTool.mockImplementation(async (method: string, _opts, body) => { - if (method === "cron.add") { - return { id: "one-shot-job" }; - } - if (method === "cron.remove") { - removed.push((body as { id?: string }).id ?? ""); - return { ok: true, removed: false }; - } - return { ok: true }; + workflowMocks.cronAdd.mockResolvedValue(makeCronJob({ id: "one-shot-job" })); + workflowMocks.cronRemove.mockImplementation(async (id: string) => { + removed.push(id); + return { ok: true, removed: false }; }); await expect( @@ -412,11 +451,11 @@ describe("plugin scheduled turns", () => { }, }), ).resolves.toBeUndefined(); - expect(workflowMocks.callGatewayTool).not.toHaveBeenCalled(); + expect(workflowMocks.cronAdd).not.toHaveBeenCalled(); }); it("falls back to a valid delay schedule when a malformed cron value is absent", async () => { - mockCronAdd({ id: "delay-job" }); + mockCronAdd(makeCronJob({ id: "delay-job" })); const handle = await scheduleWorkflowTurn({ schedule: { @@ -431,19 +470,14 @@ describe("plugin scheduled turns", () => { it("removes a stale cron job when the plugin unloads after cron.add", async () => { let commit = true; const removed: string[] = []; - workflowMocks.callGatewayTool.mockImplementation( - async (method: string, _opts: unknown, body: unknown) => { - if (method === "cron.add") { - commit = false; - return { payload: { jobId: "job-stale" } }; - } - if (method === "cron.remove") { - removed.push((body as { id?: string }).id ?? ""); - return { ok: true, removed: true }; - } - return { ok: true }; - }, - ); + workflowMocks.cronAdd.mockImplementation(async () => { + commit = false; + return makeCronJob({ id: "job-stale" }); + }); + workflowMocks.cronRemove.mockImplementation(async (id: string) => { + removed.push(id); + return { ok: true, removed: true }; + }); await expect( scheduleWorkflowTurn({ @@ -472,15 +506,8 @@ describe("plugin scheduled turns", () => { } };`, }); - workflowMocks.callGatewayTool.mockImplementation(async (method: string) => { - if (method === "cron.add") { - return { id: "loader-scheduled-job" }; - } - if (method === "cron.remove") { - return { ok: true, removed: true }; - } - return { ok: true }; - }); + workflowMocks.cronAdd.mockResolvedValue(makeCronJob({ id: "loader-scheduled-job" })); + workflowMocks.cronRemove.mockResolvedValue({ ok: true, removed: true }); const registry = withEnv( { @@ -490,6 +517,7 @@ describe("plugin scheduled turns", () => { () => loadOpenClawPlugins({ cache: false, + hostServices: { cron }, config: { plugins: { enabled: true, @@ -506,16 +534,14 @@ describe("plugin scheduled turns", () => { expect(registry.plugins.find((plugin) => plugin.id === "loader-scheduler")?.status).toBe( "loaded", ); - await vi.waitFor(() => { - const cronAddCall = workflowMocks.callGatewayTool.mock.calls.find( - (args) => args[0] === "cron.add", - ); - expect(cronAddCall).toBeDefined(); - const body = cronAddCall?.[2] as Record | undefined; - expect(body?.sessionTarget).toBe("session:agent:main:main"); - expect(body?.payload).toEqual({ kind: "agentTurn", message: "wake" }); - expect(cronAddCall?.[3]).toEqual({ scopes: ["operator.admin"] }); - }); + await vi.waitFor(() => + expect(workflowMocks.cronAdd).toHaveBeenCalledWith( + expect.objectContaining({ + sessionTarget: "session:agent:main:main", + payload: { kind: "agentTurn", message: "wake" }, + }), + ), + ); expect(listPluginSessionSchedulerJobs("loader-scheduler")).toEqual([ { id: "loader-scheduled-job", @@ -581,32 +607,32 @@ describe("plugin scheduled turns", () => { }); const addedJobs: Array> = []; const removedJobIds = new Set(); - workflowMocks.callGatewayTool.mockImplementation(async (method: string, _opts, body) => { - if (method === "cron.add") { - const id = `loader-scheduled-job-${addedJobs.length + 1}`; - addedJobs.push({ - id, - ...(body as Record), - }); - return { id }; + workflowMocks.cronAdd.mockImplementation(async (body: CronJobCreate) => { + const id = `loader-scheduled-job-${addedJobs.length + 1}`; + addedJobs.push({ + id, + ...(body as Record), + }); + return makeCronJob({ id, ...body }); + }); + workflowMocks.cronListPage.mockImplementation(async () => ({ + jobs: addedJobs + .filter((job) => { + const id = typeof job.id === "string" ? job.id : ""; + return id && !removedJobIds.has(id); + }) + .map((job) => makeCronJob(job as Partial & { id: string })), + total: addedJobs.length, + offset: 0, + limit: 200, + hasMore: false, + nextOffset: null, + })); + workflowMocks.cronRemove.mockImplementation(async (id: string) => { + if (id) { + removedJobIds.add(id); } - if (method === "cron.list") { - return { - jobs: addedJobs.filter((job) => { - const id = typeof job.id === "string" ? job.id : ""; - return id && !removedJobIds.has(id); - }), - }; - } - if (method === "cron.remove") { - const id = - typeof (body as { id?: unknown })?.id === "string" ? (body as { id: string }).id : ""; - if (id) { - removedJobIds.add(id); - } - return { ok: true, removed: true }; - } - return { ok: true }; + return { ok: true, removed: true }; }); const registry = withEnv( @@ -617,6 +643,7 @@ describe("plugin scheduled turns", () => { () => loadOpenClawPlugins({ cache: false, + hostServices: { cron }, config: { plugins: { enabled: true, @@ -682,18 +709,13 @@ describe("plugin scheduled turns", () => { it("keeps stale scheduled-turn rollback non-throwing when cron cleanup fails", async () => { let commit = true; - workflowMocks.callGatewayTool.mockImplementation( - async (method: string, _opts: unknown, body: unknown) => { - if (method === "cron.add") { - commit = false; - return { id: "job-stale" }; - } - if (method === "cron.remove") { - throw new Error(`remove failed for ${(body as { id?: string }).id}`); - } - return { ok: true }; - }, - ); + workflowMocks.cronAdd.mockImplementation(async () => { + commit = false; + return makeCronJob({ id: "job-stale" }); + }); + workflowMocks.cronRemove.mockImplementation(async (id: string) => { + throw new Error(`remove failed for ${id}`); + }); await expect( scheduleWorkflowTurn({ @@ -705,17 +727,10 @@ describe("plugin scheduled turns", () => { }); it("keeps scheduled-turn records when cleanup fails", async () => { - workflowMocks.callGatewayTool.mockImplementation( - async (method: string, _opts: unknown, body: unknown) => { - if (method === "cron.add") { - return { id: "cleanup-failure-job" }; - } - if (method === "cron.remove") { - throw new Error(`remove failed for ${(body as { id?: string }).id}`); - } - return { ok: true }; - }, - ); + workflowMocks.cronAdd.mockResolvedValue(makeCronJob({ id: "cleanup-failure-job" })); + workflowMocks.cronRemove.mockImplementation(async (id: string) => { + throw new Error(`remove failed for ${id}`); + }); const cleanupFailureHandle = await scheduleWorkflowTurn({ pluginName: "Workflow Plugin", @@ -741,18 +756,11 @@ describe("plugin scheduled turns", () => { it("cleans live dynamic scheduled turns when registry cleanup records are empty", async () => { const removed: string[] = []; - workflowMocks.callGatewayTool.mockImplementation( - async (method: string, _opts: unknown, body: unknown) => { - if (method === "cron.add") { - return { id: "dynamic-cleanup-job" }; - } - if (method === "cron.remove") { - removed.push((body as { id?: string }).id ?? ""); - return { ok: true, removed: true }; - } - return { ok: true }; - }, - ); + workflowMocks.cronAdd.mockResolvedValue(makeCronJob({ id: "dynamic-cleanup-job" })); + workflowMocks.cronRemove.mockImplementation(async (id: string) => { + removed.push(id); + return { ok: true, removed: true }; + }); const dynamicCleanupHandle = await scheduleWorkflowTurn(); expectSessionTurnHandle(dynamicCleanupHandle, "dynamic-cleanup-job"); @@ -771,19 +779,13 @@ describe("plugin scheduled turns", () => { it("preserves replacement-generation runtime scheduled turns during restart cleanup", async () => { const removed: string[] = []; const scheduledIds = ["old-runtime-job", "new-runtime-job"]; - workflowMocks.callGatewayTool.mockImplementation( - async (method: string, _opts: unknown, body: unknown) => { - if (method === "cron.add") { - const id = scheduledIds.shift() ?? "unexpected-job"; - return { id }; - } - if (method === "cron.remove") { - removed.push((body as { id?: string }).id ?? ""); - return { ok: true, removed: true }; - } - return { ok: true }; - }, + workflowMocks.cronAdd.mockImplementation(async () => + makeCronJob({ id: scheduledIds.shift() ?? "unexpected-job" }), ); + workflowMocks.cronRemove.mockImplementation(async (id: string) => { + removed.push(id); + return { ok: true, removed: true }; + }); const previousFixture = createPluginRegistryFixture(); previousFixture.registry.registry.plugins.push( @@ -836,18 +838,11 @@ describe("plugin scheduled turns", () => { it("treats already-missing cron jobs as successful scheduled-turn cleanup", async () => { const removed: string[] = []; - workflowMocks.callGatewayTool.mockImplementation( - async (method: string, _opts: unknown, body: unknown) => { - if (method === "cron.add") { - return { id: "already-missing-job" }; - } - if (method === "cron.remove") { - removed.push((body as { id?: string }).id ?? ""); - return { ok: true, removed: false }; - } - return { ok: true }; - }, - ); + workflowMocks.cronAdd.mockResolvedValue(makeCronJob({ id: "already-missing-job" })); + workflowMocks.cronRemove.mockImplementation(async (id: string) => { + removed.push(id); + return { ok: true, removed: false }; + }); const alreadyMissingHandle = await scheduleWorkflowTurn(); expectSessionTurnHandle(alreadyMissingHandle, "already-missing-job"); @@ -865,42 +860,42 @@ describe("plugin scheduled turns", () => { it("removes only matching plugin tag jobs in the requested session", async () => { const removed: string[] = []; const listQueries: unknown[] = []; - workflowMocks.callGatewayTool.mockImplementation( - async (method: string, _opts: unknown, body: unknown) => { - if (method === "cron.list") { - listQueries.push((body as { query?: unknown }).query); - return { - jobs: [ - { - id: "job-a", - name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1", - sessionTarget: "session:agent:main:main", - }, - { - id: "job-b", - name: "plugin:workflow-plugin:tag:nudge:agent:main:main:2", - sessionTarget: "session:agent:main:main", - }, - { - id: "job-c", - name: "plugin:other-plugin:tag:nudge:agent:main:main:1", - sessionTarget: "session:agent:main:main", - }, - { - id: "job-d", - name: "plugin:workflow-plugin:tag:nudge:agent:other:main:1", - sessionTarget: "session:agent:other:main", - }, - ], - }; - } - if (method === "cron.remove") { - removed.push((body as { id?: string }).id ?? ""); - return { ok: true, removed: true }; - } - return { ok: true }; - }, - ); + workflowMocks.cronListPage.mockImplementation(async (body: unknown) => { + listQueries.push((body as { query?: unknown }).query); + return { + jobs: [ + makeCronJob({ + id: "job-a", + name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1", + sessionTarget: "session:agent:main:main", + }), + makeCronJob({ + id: "job-b", + name: "plugin:workflow-plugin:tag:nudge:agent:main:main:2", + sessionTarget: "session:agent:main:main", + }), + makeCronJob({ + id: "job-c", + name: "plugin:other-plugin:tag:nudge:agent:main:main:1", + sessionTarget: "session:agent:main:main", + }), + makeCronJob({ + id: "job-d", + name: "plugin:workflow-plugin:tag:nudge:agent:other:main:1", + sessionTarget: "session:agent:other:main", + }), + ], + total: 4, + offset: 0, + limit: 200, + hasMore: false, + nextOffset: null, + }; + }); + workflowMocks.cronRemove.mockImplementation(async (id: string) => { + removed.push(id); + return { ok: true, removed: true }; + }); await expect(unscheduleWorkflowTurnsByTag()).resolves.toEqual({ removed: 2, failed: 0 }); expect(listQueries).toEqual(["plugin:workflow-plugin:tag:nudge:agent:main:main:"]); @@ -909,35 +904,33 @@ describe("plugin scheduled turns", () => { it("prunes runtime scheduler records after tagged unschedule removes jobs", async () => { let addCount = 0; - workflowMocks.callGatewayTool.mockImplementation( - async (method: string, _opts: unknown, body: unknown) => { - if (method === "cron.add") { - addCount += 1; - return { id: `job-${addCount}` }; - } - if (method === "cron.list") { - return { - jobs: [ - { - id: "job-1", - name: "plugin:workflow-plugin:tag:nudge:agent:main:main:first", - sessionTarget: "session:agent:main:main", - }, - { - id: "job-2", - name: "plugin:workflow-plugin:tag:nudge:agent:main:main:second", - sessionTarget: "session:agent:main:main", - }, - ], - }; - } - if (method === "cron.remove") { - expect(["job-1", "job-2"]).toContain((body as { id?: unknown }).id); - return { ok: true, removed: true }; - } - return { ok: true }; - }, - ); + workflowMocks.cronAdd.mockImplementation(async () => { + addCount += 1; + return makeCronJob({ id: `job-${addCount}` }); + }); + workflowMocks.cronListPage.mockResolvedValue({ + jobs: [ + makeCronJob({ + id: "job-1", + name: "plugin:workflow-plugin:tag:nudge:agent:main:main:first", + sessionTarget: "session:agent:main:main", + }), + makeCronJob({ + id: "job-2", + name: "plugin:workflow-plugin:tag:nudge:agent:main:main:second", + sessionTarget: "session:agent:main:main", + }), + ], + total: 2, + offset: 0, + limit: 200, + hasMore: false, + nextOffset: null, + }); + workflowMocks.cronRemove.mockImplementation(async (id: string) => { + expect(["job-1", "job-2"]).toContain(id); + return { ok: true, removed: true }; + }); await scheduleWorkflowTurn({ schedule: { @@ -960,61 +953,57 @@ describe("plugin scheduled turns", () => { }); it("counts cron.list and cron.remove failures when unscheduling by tag", async () => { - workflowMocks.callGatewayTool.mockRejectedValueOnce(new Error("cron list unavailable")); + workflowMocks.cronListPage.mockRejectedValueOnce(new Error("cron list unavailable")); await expect(unscheduleWorkflowTurnsByTag()).resolves.toEqual({ removed: 0, failed: 1 }); - workflowMocks.callGatewayTool.mockReset(); - workflowMocks.callGatewayTool.mockImplementation( - async (method: string, _opts: unknown, body: unknown) => { - if (method === "cron.list") { - return { - jobs: [ - { - id: "job-ok", - name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1", - sessionTarget: "session:agent:main:main", - }, - { - id: "job-fail", - name: "plugin:workflow-plugin:tag:nudge:agent:main:main:2", - sessionTarget: "session:agent:main:main", - }, - ], - }; - } - if (method === "cron.remove" && (body as { id?: string }).id === "job-fail") { - throw new Error("remove failed"); - } - if (method === "cron.remove") { - return { ok: true, removed: true }; - } - return { ok: true }; - }, - ); + workflowMocks.cronListPage.mockReset(); + workflowMocks.cronListPage.mockResolvedValue({ + jobs: [ + makeCronJob({ + id: "job-ok", + name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1", + sessionTarget: "session:agent:main:main", + }), + makeCronJob({ + id: "job-fail", + name: "plugin:workflow-plugin:tag:nudge:agent:main:main:2", + sessionTarget: "session:agent:main:main", + }), + ], + total: 2, + offset: 0, + limit: 200, + hasMore: false, + nextOffset: null, + }); + workflowMocks.cronRemove.mockImplementation(async (id: string) => { + if (id === "job-fail") { + throw new Error("remove failed"); + } + return { ok: true, removed: true }; + }); await expect(unscheduleWorkflowTurnsByTag()).resolves.toEqual({ removed: 1, failed: 1 }); - workflowMocks.callGatewayTool.mockReset(); - workflowMocks.callGatewayTool.mockImplementation( - async (method: string, _opts: unknown, body: unknown) => { - if (method === "cron.list") { - return { - jobs: [ - { - id: "job-missing", - name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1", - sessionTarget: "session:agent:main:main", - }, - ], - }; - } - if (method === "cron.remove") { - expect((body as { id?: string }).id).toBe("job-missing"); - return { ok: true, removed: false }; - } - return { ok: true }; - }, - ); + workflowMocks.cronListPage.mockReset(); + workflowMocks.cronListPage.mockResolvedValue({ + jobs: [ + makeCronJob({ + id: "job-missing", + name: "plugin:workflow-plugin:tag:nudge:agent:main:main:1", + sessionTarget: "session:agent:main:main", + }), + ], + total: 1, + offset: 0, + limit: 200, + hasMore: false, + nextOffset: null, + }); + workflowMocks.cronRemove.mockImplementation(async (id: string) => { + expect(id).toBe("job-missing"); + return { ok: true, removed: false }; + }); await expect(unscheduleWorkflowTurnsByTag()).resolves.toEqual({ removed: 0, failed: 1 }); }); @@ -1030,23 +1019,13 @@ describe("plugin scheduled turns", () => { await expect( unscheduleWorkflowTurnsByTag({ sessionKey: MAIN_SESSION_KEY, tag: "nudge:followup" }), ).resolves.toEqual({ removed: 0, failed: 0 }); - expect(workflowMocks.callGatewayTool).not.toHaveBeenCalled(); + expect(workflowMocks.cronListPage).not.toHaveBeenCalled(); + expect(workflowMocks.cronRemove).not.toHaveBeenCalled(); }); it("wires schedule and unschedule through the plugin API with stale-registry protection", async () => { - workflowMocks.callGatewayTool.mockImplementation(async (method: string) => { - if (method === "cron.add") { - return { payload: { jobId: "job-live" } }; - } - if (method === "cron.list") { - return { jobs: [] }; - } - if (method === "cron.remove") { - return { ok: true, removed: true }; - } - return { ok: true }; - }); - const { config, registry } = createPluginRegistryFixture(); + workflowMocks.cronAdd.mockResolvedValue(makeCronJob({ id: "job-live" })); + const { config, registry } = createPluginRegistryFixture({}, { hostServices: { cron } }); let capturedApi: OpenClawPluginApi | undefined; registerTestPlugin({ registry, @@ -1091,16 +1070,98 @@ describe("plugin scheduled turns", () => { ).resolves.toEqual({ removed: 0, failed: 0 }); }); + it("resolves live cron service for captured plugin scheduled-turn APIs", async () => { + const firstCron = createMockCronService(); + const secondCron = createMockCronService(); + const firstAdd = vi.fn(async () => makeCronJob({ id: "first-cron-job" })); + const secondAdd = vi.fn(async () => makeCronJob({ id: "second-cron-job" })); + const firstListPage = vi.fn(async () => { + throw new Error("stale cron list used"); + }); + const firstRemove = vi.fn(async () => { + throw new Error("stale cron remove used"); + }); + const secondListPage = vi.fn(async () => ({ + jobs: [ + makeCronJob({ + id: "second-cron-existing-job", + name: "plugin:scheduler-plugin:tag:nudge:agent:main:main:1", + sessionTarget: "session:agent:main:main", + }), + ], + total: 1, + offset: 0, + limit: 200, + hasMore: false, + nextOffset: null, + })); + const secondRemove = vi.fn(async () => ({ ok: true, removed: true }) as const); + firstCron.add = firstAdd; + firstCron.listPage = firstListPage; + firstCron.remove = firstRemove; + secondCron.add = secondAdd; + secondCron.listPage = secondListPage; + secondCron.remove = secondRemove; + let liveCron = firstCron; + const hostServices = { + get cron() { + return liveCron; + }, + }; + const { config, registry } = createPluginRegistryFixture({}, { hostServices }); + let capturedApi: OpenClawPluginApi | undefined; + registerTestPlugin({ + registry, + config, + record: createPluginRecord({ + id: "scheduler-plugin", + name: "Scheduler Plugin", + origin: "bundled", + }), + register(api) { + capturedApi = api; + }, + }); + setActivePluginRegistry(registry.registry); + + await expect( + capturedApi?.session.workflow.scheduleSessionTurn({ + sessionKey: "agent:main:main", + message: "wake", + delayMs: 10, + }), + ).resolves.toMatchObject({ id: "first-cron-job" }); + liveCron = secondCron; + await expect( + capturedApi?.session.workflow.scheduleSessionTurn({ + sessionKey: "agent:main:main", + message: "wake again", + delayMs: 10, + }), + ).resolves.toMatchObject({ id: "second-cron-job" }); + await expect( + capturedApi?.session.workflow.unscheduleSessionTurnsByTag({ + sessionKey: "agent:main:main", + tag: "nudge", + }), + ).resolves.toEqual({ removed: 1, failed: 0 }); + + expect(firstAdd).toHaveBeenCalledTimes(1); + expect(secondAdd).toHaveBeenCalledTimes(1); + expect(firstListPage).not.toHaveBeenCalled(); + expect(firstRemove).not.toHaveBeenCalled(); + expect(secondListPage).toHaveBeenCalledTimes(1); + expect(secondRemove).toHaveBeenCalledWith("second-cron-existing-job"); + }); + it("blocks registration-time schedule and unschedule calls before activation", async () => { // Drain any cleanup microtasks queued by the previous test's // setActivePluginRegistry calls; setActivePluginRegistry schedules - // cleanup via fire-and-forget dynamic imports that may resolve and - // invoke callGatewayTool after this test's mockReset. + // cleanup via fire-and-forget dynamic imports that may resolve after + // this test's mockReset. for (let i = 0; i < 8; i++) { await Promise.resolve(); } - workflowMocks.callGatewayTool.mockReset(); - workflowMocks.callGatewayTool.mockResolvedValue({ ok: true }); const activeFixture = createPluginRegistryFixture(); setActivePluginRegistry(activeFixture.registry.registry); @@ -1127,6 +1188,8 @@ describe("plugin scheduled turns", () => { tag: "nudge", }), ).resolves.toEqual({ removed: 0, failed: 0 }); - expect(workflowMocks.callGatewayTool).not.toHaveBeenCalled(); + expect(workflowMocks.cronAdd).not.toHaveBeenCalled(); + expect(workflowMocks.cronListPage).not.toHaveBeenCalled(); + expect(workflowMocks.cronRemove).not.toHaveBeenCalled(); }); }); diff --git a/src/plugins/host-hook-scheduled-turns.ts b/src/plugins/host-hook-scheduled-turns.ts index bc1f18aeb58..2d7b48ef245 100644 --- a/src/plugins/host-hook-scheduled-turns.ts +++ b/src/plugins/host-hook-scheduled-turns.ts @@ -1,5 +1,6 @@ import { randomUUID } from "node:crypto"; -import { ADMIN_SCOPE } from "../gateway/operator-scopes.js"; +import type { CronServiceContract } from "../cron/service-contract.js"; +import type { CronJob, CronJobCreate } from "../cron/types.js"; import { formatErrorMessage } from "../infra/errors.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; @@ -20,9 +21,6 @@ const log = createSubsystemLogger("plugins/host-scheduled-turns"); const PLUGIN_CRON_NAME_PREFIX = "plugin:"; const PLUGIN_CRON_TAG_MARKER = ":tag:"; -type CallGatewayTool = typeof import("../agents/tools/gateway.js").callGatewayTool; -let callGatewayToolPromise: Promise | undefined; - type ResolvedSessionTurnSchedule = | { kind: "cron"; @@ -34,16 +32,6 @@ type ResolvedSessionTurnSchedule = at: string; }; -async function callGatewayToolLazy( - ...args: Parameters -): Promise>> { - callGatewayToolPromise ??= import("../agents/tools/gateway.js").then( - (module) => module.callGatewayTool, - ); - const callGatewayTool = await callGatewayToolPromise; - return callGatewayTool(...args); -} - function resolveSchedule( params: PluginSessionTurnScheduleParams, ): ResolvedSessionTurnSchedule | undefined { @@ -108,18 +96,14 @@ function formatScheduleLogContext(params: { } async function removeScheduledSessionTurn(params: { + cron: CronServiceContract; jobId: string; pluginId: string; sessionKey?: string; name?: string; }): Promise { try { - const result = await callGatewayToolLazy( - "cron.remove", - {}, - { id: params.jobId }, - { scopes: [ADMIN_SCOPE] }, - ); + const result = await params.cron.remove(params.jobId); return didCronCleanupJob(result); } catch (error) { log.warn( @@ -129,48 +113,12 @@ async function removeScheduledSessionTurn(params: { } } -function unwrapGatewayPayload(value: unknown): unknown { - if (!isCronJobRecord(value)) { - return value; - } - const payload = value.payload; - return isCronJobRecord(payload) ? payload : value; -} - function didCronRemoveJob(value: unknown): boolean { - const result = unwrapGatewayPayload(value); - if (!isCronJobRecord(result)) { - return false; - } - return result.ok !== false && result.removed === true; + return isCronRemoveResult(value) && value.ok && value.removed; } function didCronCleanupJob(value: unknown): boolean { - const result = unwrapGatewayPayload(value); - if (!isCronJobRecord(result) || result.ok === false) { - return false; - } - return result.removed === true || result.removed === false; -} - -function normalizeCronJobId(value: unknown): string | undefined { - return typeof value === "string" && value.trim() ? value.trim() : undefined; -} - -function extractCronJobId(value: unknown): string | undefined { - if (!value || typeof value !== "object") { - return undefined; - } - const record = value as Record; - const topLevelId = normalizeCronJobId(record.jobId ?? record.id); - if (topLevelId) { - return topLevelId; - } - const payload = - record.payload && typeof record.payload === "object" - ? (record.payload as Record) - : record; - return normalizeCronJobId(payload.jobId ?? payload.id); + return isCronRemoveResult(value) && value.ok; } const PLUGIN_CRON_RESERVED_DELIMITER = ":"; @@ -210,65 +158,41 @@ function buildPluginSchedulerTagPrefix(params: { return `${PLUGIN_CRON_NAME_PREFIX}${params.pluginId}${PLUGIN_CRON_TAG_MARKER}${params.tag}:${params.sessionKey}:`; } -function isCronJobRecord(value: unknown): value is Record { - return Boolean(value) && typeof value === "object" && !Array.isArray(value); -} - -function readCronListJobs(value: unknown): Record[] { - if (Array.isArray(value)) { - return value.filter(isCronJobRecord); - } - if (isCronJobRecord(value)) { - const jobs = (value as { jobs?: unknown }).jobs; - if (Array.isArray(jobs)) { - return jobs.filter(isCronJobRecord); - } - } - return []; -} - -function readCronListNextOffset(value: unknown): number | undefined { - if (!isCronJobRecord(value)) { - return undefined; - } - const nextOffset = value.nextOffset; - return typeof nextOffset === "number" && Number.isInteger(nextOffset) && nextOffset >= 0 - ? nextOffset - : undefined; -} - -function readCronListHasMore(value: unknown): boolean { - return isCronJobRecord(value) && value.hasMore === true; +function isCronRemoveResult( + value: unknown, +): value is Awaited> { + return ( + Boolean(value) && + typeof value === "object" && + !Array.isArray(value) && + typeof (value as { ok?: unknown }).ok === "boolean" && + typeof (value as { removed?: unknown }).removed === "boolean" + ); } async function listAllCronJobsForPluginTagCleanup( + cron: CronServiceContract, query: string, -): Promise[]> { - const jobs: Record[] = []; +): Promise { + const jobs: CronJob[] = []; let offset = 0; for (;;) { - const listResult = await callGatewayToolLazy( - "cron.list", - {}, - { - includeDisabled: true, - limit: 200, - query, - sortBy: "name", - sortDir: "asc", - ...(offset > 0 ? { offset } : {}), - }, - { scopes: [ADMIN_SCOPE] }, - ); - jobs.push(...readCronListJobs(listResult)); - if (!readCronListHasMore(listResult)) { + const listResult = await cron.listPage({ + includeDisabled: true, + limit: 200, + query, + sortBy: "name", + sortDir: "asc", + ...(offset > 0 ? { offset } : {}), + }); + jobs.push(...listResult.jobs); + if (!listResult.hasMore) { return jobs; } - const nextOffset = readCronListNextOffset(listResult); - if (nextOffset === undefined || nextOffset <= offset) { + if (listResult.nextOffset === null || listResult.nextOffset <= offset) { return jobs; } - offset = nextOffset; + offset = listResult.nextOffset; } } @@ -278,6 +202,7 @@ export async function schedulePluginSessionTurn(params: { origin?: PluginOrigin; schedule: PluginSessionTurnScheduleParams; shouldCommit?: () => boolean; + cron?: CronServiceContract; ownerRegistry?: PluginRegistry; }): Promise { if (params.origin !== "bundled") { @@ -330,36 +255,43 @@ export async function schedulePluginSessionTurn(params: { if (params.shouldCommit && !params.shouldCommit()) { return undefined; } + if (!params.cron) { + log.warn( + `plugin session turn scheduling failed (${formatScheduleLogContext({ + pluginId: params.pluginId, + sessionKey, + ...(scheduleName ? { name: scheduleName } : {}), + })}): cron service unavailable`, + ); + return undefined; + } + const cron = params.cron; const cronJobName = buildPluginSchedulerCronName({ pluginId: params.pluginId, sessionKey, ...(tag !== undefined ? { tag } : {}), ...(scheduleName ? { uniqueId: scheduleName } : {}), }); - const cronPayload: Record = { + const cronPayload: CronJobCreate["payload"] = { kind: "agentTurn", message, }; - let result: unknown; + let result: Awaited>; try { - result = await callGatewayToolLazy( - "cron.add", - {}, - { - name: cronJobName, - schedule: cronSchedule, - sessionTarget: `session:${sessionKey}`, - payload: cronPayload, - ...(params.schedule.agentId ? { agentId: params.schedule.agentId } : {}), - deleteAfterRun: params.schedule.deleteAfterRun ?? cronSchedule.kind === "at", - wakeMode: "now", - delivery: { - mode: cronDeliveryMode, - ...(cronDeliveryMode === "announce" ? { channel: "last" } : {}), - }, + result = await cron.add({ + name: cronJobName, + enabled: true, + schedule: cronSchedule, + sessionTarget: `session:${sessionKey}`, + payload: cronPayload, + ...(params.schedule.agentId ? { agentId: params.schedule.agentId } : {}), + deleteAfterRun: params.schedule.deleteAfterRun ?? cronSchedule.kind === "at", + wakeMode: "now", + delivery: { + mode: cronDeliveryMode, + ...(cronDeliveryMode === "announce" ? { channel: "last" } : {}), }, - { scopes: [ADMIN_SCOPE] }, - ); + }); } catch (error) { log.warn( `plugin session turn scheduling failed (${formatScheduleLogContext({ @@ -370,12 +302,13 @@ export async function schedulePluginSessionTurn(params: { ); return undefined; } - const jobId = extractCronJobId(result); + const jobId = result.id; if (!jobId) { return undefined; } if (params.shouldCommit && !params.shouldCommit()) { const removed = await removeScheduledSessionTurn({ + cron, jobId, pluginId: params.pluginId, sessionKey, @@ -403,6 +336,7 @@ export async function schedulePluginSessionTurn(params: { kind: "session-turn", cleanup: async () => { const removed = await removeScheduledSessionTurn({ + cron, jobId, pluginId: params.pluginId, sessionKey, @@ -420,6 +354,7 @@ export async function schedulePluginSessionTurn(params: { export async function unschedulePluginSessionTurnsByTag(params: { pluginId: string; origin?: PluginOrigin; + cron?: CronServiceContract; request: PluginSessionTurnUnscheduleByTagParams; }): Promise { if (params.origin !== "bundled") { @@ -430,37 +365,35 @@ export async function unschedulePluginSessionTurnsByTag(params: { if (!sessionKey || !tag || invalidTag) { return { removed: 0, failed: 0 }; } + if (!params.cron) { + log.warn("plugin session turn untag-list failed: cron service unavailable"); + return { removed: 0, failed: 1 }; + } + const cron = params.cron; const namePrefix = buildPluginSchedulerTagPrefix({ pluginId: params.pluginId, tag, sessionKey, }); - let jobs: Record[]; + let jobs: CronJob[]; try { - jobs = await listAllCronJobsForPluginTagCleanup(namePrefix); + jobs = await listAllCronJobsForPluginTagCleanup(cron, namePrefix); } catch (error) { log.warn(`plugin session turn untag-list failed: ${formatErrorMessage(error)}`); return { removed: 0, failed: 1 }; } const candidates = jobs.filter((job) => { - const name = typeof job.name === "string" ? job.name : ""; - const target = typeof job.sessionTarget === "string" ? job.sessionTarget : ""; - return name.startsWith(namePrefix) && target === `session:${sessionKey}`; + return job.name.startsWith(namePrefix) && job.sessionTarget === `session:${sessionKey}`; }); let removed = 0; let failed = 0; for (const job of candidates) { - const id = typeof job.id === "string" ? job.id.trim() : ""; + const id = job.id.trim(); if (!id) { continue; } try { - const result = await callGatewayToolLazy( - "cron.remove", - {}, - { id }, - { scopes: [ADMIN_SCOPE] }, - ); + const result = await cron.remove(id); if (didCronRemoveJob(result)) { removed += 1; deletePluginSessionSchedulerJob({ diff --git a/src/plugins/loader.ts b/src/plugins/loader.ts index b3e00a4d58e..22d43931a1b 100644 --- a/src/plugins/loader.ts +++ b/src/plugins/loader.ts @@ -122,6 +122,7 @@ import { } from "./plugin-scope.js"; import { ensureOpenClawPluginSdkAlias } from "./plugin-sdk-dist-alias.js"; import { createEmptyPluginRegistry } from "./registry-empty.js"; +import type { PluginRegistryParams } from "./registry-types.js"; import { createPluginRegistry, type PluginRecord, type PluginRegistry } from "./registry.js"; import { getActivePluginRegistry, @@ -169,6 +170,7 @@ export type PluginLoadOptions = { logger?: PluginLogger; coreGatewayHandlers?: Record; coreGatewayMethodNames?: readonly string[]; + hostServices?: PluginRegistryParams["hostServices"]; runtimeOptions?: CreatePluginRuntimeOptions; pluginSdkResolution?: PluginSdkResolutionPreference; cache?: boolean; @@ -1647,6 +1649,9 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi ...(options.coreGatewayMethodNames !== undefined && { coreGatewayMethodNames: options.coreGatewayMethodNames, }), + ...(options.hostServices !== undefined && { + hostServices: options.hostServices, + }), activateGlobalSideEffects: shouldActivate, }); diff --git a/src/plugins/registry-types.ts b/src/plugins/registry-types.ts index f24b18c3175..027ce092b34 100644 --- a/src/plugins/registry-types.ts +++ b/src/plugins/registry-types.ts @@ -473,6 +473,10 @@ export type PluginRegistryParams = { coreGatewayHandlers?: GatewayRequestHandlers; coreGatewayMethodNames?: readonly string[]; runtime: PluginRuntime; + hostServices?: { + /** May be a live accessor; plugin APIs must read it at call time. */ + cron?: import("../cron/service-contract.js").CronServiceContract; + }; activateGlobalSideEffects?: boolean; }; diff --git a/src/plugins/registry.ts b/src/plugins/registry.ts index 9bd563ee18b..de6c442aa1c 100644 --- a/src/plugins/registry.ts +++ b/src/plugins/registry.ts @@ -339,6 +339,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { ).toSorted(); registry.coreGatewayMethodNames = coreGatewayMethodNames; const coreGatewayMethods = new Set(coreGatewayMethodNames); + const getHostCronService = () => registryParams.hostServices?.cron; const pluginHookRollback = new Map(); const pluginsWithChannelRegistrationConflict = new Set(); const pluginSideEffectGuards = new Map>(); @@ -2719,6 +2720,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { pluginName: record.name, origin: record.origin, schedule, + cron: getHostCronService(), shouldCommit: isLoadedRecordInActiveRegistry, ownerRegistry: registry, }); @@ -2734,6 +2736,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { return unschedulePluginSessionTurnsByTag({ pluginId: record.id, origin: record.origin, + cron: getHostCronService(), request, }); },