From fee91fefceb4652933182ad14fb323a7e94d3c2d Mon Sep 17 00:00:00 2001 From: Josh Lehman Date: Fri, 6 Mar 2026 05:31:59 -0800 Subject: [PATCH] feature(context): extend plugin system to support custom context management (#22201) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(context-engine): add ContextEngine interface and registry Introduce the pluggable ContextEngine abstraction that allows external plugins to register custom context management strategies. - ContextEngine interface with lifecycle methods: bootstrap, ingest, ingestBatch, afterTurn, assemble, compact, prepareSubagentSpawn, onSubagentEnded, dispose - Module-level singleton registry with registerContextEngine() and resolveContextEngine() (config-driven slot selection) - LegacyContextEngine: pass-through implementation wrapping existing compaction behavior for 100% backward compatibility - ensureContextEnginesInitialized() guard for safe one-time registration - 19 tests covering contract, registry, resolution, and legacy parity * feat(plugins): add context-engine slot and registerContextEngine API Wire the ContextEngine abstraction into the plugin system so external plugins can register context engines via the standard plugin API. - Add 'context-engine' to PluginKind union type - Add 'contextEngine' slot to PluginSlotsConfig (default: 'legacy') - Wire registerContextEngine() through OpenClawPluginApi - Export ContextEngine types from plugin-sdk for external consumers - Restore proper slot-based resolution in registry * feat(context-engine): wire ContextEngine into agent run lifecycle Integrate the ContextEngine abstraction into the core agent run path: - Resolve context engine once per run (reused across retries) - Bootstrap: hydrate canonical store from session file on first run - Assemble: route context assembly through pluggable engine - Auto-compaction guard: disable built-in auto-compaction when the engine declares ownsCompaction (prevents double-compaction) - AfterTurn: post-turn lifecycle hook for ingest + background compaction decisions - Overflow compaction: route through contextEngine.compact() - Dispose: clean up engine resources in finally block - Notify context engine on subagent lifecycle events Legacy engine: all lifecycle methods are pass-through/no-op, preserving 100% backward compatibility for users without a context engine plugin. * feat(plugins): add scoped subagent methods and gateway request scope Expose runtime.subagent.{run, waitForRun, getSession, deleteSession} so external plugins can spawn sub-agent sessions without raw gateway dispatch access. Uses AsyncLocalStorage request-scope bridge to dispatch internally via handleGatewayRequest with a synthetic operator client. Methods are only available during gateway request handling. - Symbol.for-backed global singleton for cross-module-reload safety - Fallback gateway context for non-WS dispatch paths (Telegram/WhatsApp) - Set gateway request scope for all handlers, not just plugin handlers - 3 staleness tests for fallback context hardening * feat(context-engine): route /compact and sessions.get through context engine Wire the /compact command and sessions.get handler through the pluggable ContextEngine interface. - Thread tokenBudget and force parameters to context engine compact - Route /compact through contextEngine.compact() when registered - Wire sessions.get as runtime alias for plugin subagent dispatch - Add .pebbles/ to .gitignore * style: format with oxfmt 0.33.0 Fix duplicate import (ControlUiRootState in server.impl.ts) and import ordering across all changed files. * fix: update extension test mocks for context-engine types Add missing subagent property to bluebubbles PluginRuntime mock. Add missing registerContextEngine to lobster OpenClawPluginApi mock. * fix(subagents): keep deferred delete cleanup retryable * style: format run attempt for CI * fix(rebase): remove duplicate embedded-run imports * test: add missing gateway context mock export * fix: pass resolved auth profile into afterTurn compaction Ensure the embedded runner forwards resolved auth profile context into legacy context-engine compaction params on the normal afterTurn path, matching overflow compaction behavior. This allows downstream LCM summarization to use the intended provider auth/profile consistently. Also fix strict TS typing in external-link token dedupe and align an attempt unit test reasoningLevel value with the current ReasoningLevel enum. Regeneration-Prompt: | We were debugging context-engine compaction where downstream summary calls were missing the right auth/profile context in normal afterTurn flow, while overflow compaction already propagated it. Preserve current behavior and keep changes additive: thread the resolved authProfileId through run -> attempt -> legacy compaction param builder without broad refactors. Add tests that prove the auth profile is included in afterTurn legacy params and that overflow compaction still passes it through run attempts. Keep existing APIs stable, and only adjust small type issues needed for strict compilation. * fix: remove duplicate imports from rebase * feat: add context-engine system prompt additions * fix(rebase): dedupe attempt import declarations * test: fix fetch mock typing in ollama autodiscovery * fix(test): add registerContextEngine to diffs extension mock APIs * test(windows): use path.delimiter in ios-team-id fixture PATH * test(cron): add model formatting and precedence edge case tests Covers: - Provider/model string splitting (whitespace, nested paths, empty segments) - Provider normalization (casing, aliases like bedrock→amazon-bedrock) - Anthropic model alias normalization (opus-4.5→claude-opus-4-5) - Precedence: job payload > session override > config default - Sequential runs with different providers (CI flake regression pattern) - forceNew session preserving stored model overrides - Whitespace/empty model string edge cases - Config model as string vs object format * test(cron): fix model formatting test config types * test(phone-control): add registerContextEngine to mock API * fix: re-export ChannelKind from config-reload-plan * fix: add subagent mock to plugin-runtime-mock test util * docs: add changelog fragment for context engine PR #22201 --- CHANGELOG.md | 1 + extensions/diffs/index.test.ts | 2 + extensions/diffs/src/tool.test.ts | 1 + extensions/lobster/src/lobster-tool.test.ts | 1 + extensions/phone-control/index.test.ts | 1 + extensions/test-utils/plugin-runtime-mock.ts | 7 + src/agents/pi-embedded-runner/compact.ts | 54 +- .../run.overflow-compaction.test.ts | 16 + src/agents/pi-embedded-runner/run.ts | 65 ++- .../pi-embedded-runner/run/attempt.test.ts | 54 +- src/agents/pi-embedded-runner/run/attempt.ts | 149 +++++ src/agents/pi-embedded-runner/run/types.ts | 9 + src/agents/pi-settings.ts | 25 + src/agents/subagent-registry.ts | 45 +- src/config/config-misc.test.ts | 13 + src/config/schema.help.ts | 2 + src/config/schema.labels.ts | 1 + src/config/types.plugins.ts | 2 + src/config/zod-schema.ts | 1 + src/context-engine/context-engine.test.ts | 337 +++++++++++ src/context-engine/index.ts | 19 + src/context-engine/init.ts | 23 + src/context-engine/legacy.ts | 115 ++++ src/context-engine/registry.ts | 67 +++ src/context-engine/types.ts | 167 ++++++ .../isolated-agent.model-formatting.test.ts | 541 ++++++++++++++++++ src/gateway/config-reload.ts | 2 +- src/gateway/method-scopes.ts | 1 + src/gateway/server-methods.ts | 22 +- src/gateway/server-methods/sessions.ts | 23 + src/gateway/server-plugins.test.ts | 159 ++++- src/gateway/server-plugins.ts | 164 +++++- ...server.agent.gateway-server-agent.mocks.ts | 20 +- src/gateway/server.impl.ts | 111 ++-- src/plugin-sdk/index.ts | 29 +- src/plugins/loader.ts | 5 +- src/plugins/registry.ts | 2 + .../runtime/gateway-request-scope.test.ts | 23 + src/plugins/runtime/gateway-request-scope.ts | 46 ++ src/plugins/runtime/index.ts | 20 +- src/plugins/runtime/types.ts | 55 ++ src/plugins/slots.ts | 2 + src/plugins/types.ts | 7 +- test/scripts/ios-team-id.test.ts | 2 +- 44 files changed, 2308 insertions(+), 103 deletions(-) create mode 100644 src/context-engine/context-engine.test.ts create mode 100644 src/context-engine/index.ts create mode 100644 src/context-engine/init.ts create mode 100644 src/context-engine/legacy.ts create mode 100644 src/context-engine/registry.ts create mode 100644 src/context-engine/types.ts create mode 100644 src/cron/isolated-agent.model-formatting.test.ts create mode 100644 src/plugins/runtime/gateway-request-scope.test.ts create mode 100644 src/plugins/runtime/gateway-request-scope.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 5cb86a8bd78..0621c2389de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai - Tools/Diffs guidance: restore a short system-prompt hint for enabled diffs while keeping the detailed instructions in the companion skill, so diffs usage guidance stays out of user-prompt space. (#36904) thanks @gumadeiras. - Telegram/ACP topic bindings: accept Telegram Mac Unicode dash option prefixes in `/acp spawn`, support Telegram topic thread binding (`--thread here|auto`), route bound-topic follow-ups to ACP sessions, add actionable Telegram approval buttons with prefixed approval-id resolution, and pin successful bind confirmations in-topic. (#36683) Thanks @huntharo. - Hooks/Compaction lifecycle: emit `session:compact:before` and `session:compact:after` internal events plus plugin compaction callbacks with session/count metadata, so automations can react to compaction runs consistently. (#16788) thanks @vincentkoc. +- Agents/context engine plugin interface: add `ContextEngine` plugin slot with full lifecycle hooks (`bootstrap`, `ingest`, `assemble`, `compact`, `afterTurn`, `prepareSubagentSpawn`, `onSubagentEnded`), slot-based registry with config-driven resolution, `LegacyContextEngine` wrapper preserving existing compaction behavior, scoped subagent runtime for plugin runtimes via `AsyncLocalStorage`, and `sessions.get` gateway method. Enables plugins like `lossless-claw` to provide alternative context management strategies without modifying core compaction logic. Zero behavior change when no context engine plugin is configured. (#22201) thanks @jalehman. - CLI: make read-only SecretRef status flows degrade safely (#37023) thanks @joshavant. ### Breaking diff --git a/extensions/diffs/index.test.ts b/extensions/diffs/index.test.ts index 84ce5d9fe87..1723fc3c73d 100644 --- a/extensions/diffs/index.test.ts +++ b/extensions/diffs/index.test.ts @@ -30,6 +30,7 @@ describe("diffs plugin registration", () => { registerService() {}, registerProvider() {}, registerCommand() {}, + registerContextEngine() {}, resolvePath(input: string) { return input; }, @@ -105,6 +106,7 @@ describe("diffs plugin registration", () => { registerService() {}, registerProvider() {}, registerCommand() {}, + registerContextEngine() {}, resolvePath(input: string) { return input; }, diff --git a/extensions/diffs/src/tool.test.ts b/extensions/diffs/src/tool.test.ts index db66255cba6..ba72c011c76 100644 --- a/extensions/diffs/src/tool.test.ts +++ b/extensions/diffs/src/tool.test.ts @@ -441,6 +441,7 @@ function createApi(): OpenClawPluginApi { registerService() {}, registerProvider() {}, registerCommand() {}, + registerContextEngine() {}, resolvePath(input: string) { return input; }, diff --git a/extensions/lobster/src/lobster-tool.test.ts b/extensions/lobster/src/lobster-tool.test.ts index 970c2ad4fd1..40e9a0b64e8 100644 --- a/extensions/lobster/src/lobster-tool.test.ts +++ b/extensions/lobster/src/lobster-tool.test.ts @@ -46,6 +46,7 @@ function fakeApi(overrides: Partial = {}): OpenClawPluginApi registerHook() {}, registerHttpRoute() {}, registerCommand() {}, + registerContextEngine() {}, on() {}, resolvePath: (p) => p, ...overrides, diff --git a/extensions/phone-control/index.test.ts b/extensions/phone-control/index.test.ts index a4d05e3d431..9259092b153 100644 --- a/extensions/phone-control/index.test.ts +++ b/extensions/phone-control/index.test.ts @@ -39,6 +39,7 @@ function createApi(params: { registerCli() {}, registerService() {}, registerProvider() {}, + registerContextEngine() {}, registerCommand: params.registerCommand, resolvePath(input: string) { return input; diff --git a/extensions/test-utils/plugin-runtime-mock.ts b/extensions/test-utils/plugin-runtime-mock.ts index f01c87d6c77..0526c6bf591 100644 --- a/extensions/test-utils/plugin-runtime-mock.ts +++ b/extensions/test-utils/plugin-runtime-mock.ts @@ -242,6 +242,13 @@ export function createPluginRuntimeMock(overrides: DeepPartial = state: { resolveStateDir: vi.fn(() => "/tmp/openclaw"), }, + subagent: { + run: vi.fn(), + waitForRun: vi.fn(), + getSessionMessages: vi.fn(), + getSession: vi.fn(), + deleteSession: vi.fn(), + }, }; return mergeDeep(base, overrides); diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 2bfc9e0a5ce..335c3a0e7d9 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -11,6 +11,10 @@ import { resolveHeartbeatPrompt } from "../../auto-reply/heartbeat.js"; import type { ReasoningLevel, ThinkLevel } from "../../auto-reply/thinking.js"; import { resolveChannelCapabilities } from "../../config/channel-capabilities.js"; import type { OpenClawConfig } from "../../config/config.js"; +import { + ensureContextEnginesInitialized, + resolveContextEngine, +} from "../../context-engine/index.js"; import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; import { getMachineDisplayName } from "../../infra/machine-name.js"; import { generateSecureToken } from "../../infra/secure-random.js"; @@ -29,8 +33,9 @@ import { resolveSessionAgentIds } from "../agent-scope.js"; import type { ExecElevatedDefaults } from "../bash-tools.js"; import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../bootstrap-files.js"; import { listChannelSupportedActions, resolveChannelMessageToolHints } from "../channel-tools.js"; +import { resolveContextWindowInfo } from "../context-window-guard.js"; import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js"; -import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "../defaults.js"; +import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL, DEFAULT_PROVIDER } from "../defaults.js"; import { resolveOpenClawDocsPath } from "../docs-path.js"; import { getApiKeyForModel, resolveModelAuthMode } from "../model-auth.js"; import { ensureOpenClawModelsJson } from "../models-config.js"; @@ -115,6 +120,8 @@ export type CompactEmbeddedPiSessionParams = { reasoningLevel?: ReasoningLevel; bashElevated?: ExecElevatedDefaults; customInstructions?: string; + tokenBudget?: number; + force?: boolean; trigger?: "overflow" | "manual"; diagId?: string; attempt?: number; @@ -846,6 +853,49 @@ export async function compactEmbeddedPiSession( const enqueueGlobal = params.enqueue ?? ((task, opts) => enqueueCommandInLane(globalLane, task, opts)); return enqueueCommandInLane(sessionLane, () => - enqueueGlobal(async () => compactEmbeddedPiSessionDirect(params)), + enqueueGlobal(async () => { + ensureContextEnginesInitialized(); + const contextEngine = await resolveContextEngine(params.config); + try { + // Resolve token budget from model context window so the context engine + // knows the compaction target. The runner's afterTurn path passes this + // automatically, but the /compact command path needs to compute it here. + const ceProvider = (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER; + const ceModelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL; + const agentDir = params.agentDir ?? resolveOpenClawAgentDir(); + const { model: ceModel } = resolveModel(ceProvider, ceModelId, agentDir, params.config); + const ceCtxInfo = resolveContextWindowInfo({ + cfg: params.config, + provider: ceProvider, + modelId: ceModelId, + modelContextWindow: ceModel?.contextWindow, + defaultTokens: DEFAULT_CONTEXT_TOKENS, + }); + const result = await contextEngine.compact({ + sessionId: params.sessionId, + sessionFile: params.sessionFile, + tokenBudget: ceCtxInfo.tokens, + customInstructions: params.customInstructions, + force: params.trigger === "manual", + legacyParams: params as Record, + }); + return { + ok: result.ok, + compacted: result.compacted, + reason: result.reason, + result: result.result + ? { + summary: result.result.summary ?? "", + firstKeptEntryId: result.result.firstKeptEntryId ?? "", + tokensBefore: result.result.tokensBefore, + tokensAfter: result.result.tokensAfter, + details: result.result.details, + } + : undefined, + }; + } finally { + await contextEngine.dispose?.(); + } + }), ); } diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts index 1f8f8032f7e..19b4a81d279 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts @@ -54,6 +54,22 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { ); }); + it("passes resolved auth profile into run attempts for context-engine afterTurn propagation", async () => { + mockedRunEmbeddedAttempt.mockResolvedValueOnce(makeAttemptResult({ promptError: null })); + + await runEmbeddedPiAgent({ + ...overflowBaseRunParams, + runId: "run-auth-profile-passthrough", + }); + + expect(mockedRunEmbeddedAttempt).toHaveBeenCalledWith( + expect.objectContaining({ + authProfileId: "test-profile", + authProfileIdSource: "auto", + }), + ); + }); + it("passes trigger=overflow when retrying compaction after context overflow", async () => { mockOverflowRetrySuccess({ runEmbeddedAttempt: mockedRunEmbeddedAttempt, diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index a5b799471d2..52faf8514b7 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -1,6 +1,10 @@ import { randomBytes } from "node:crypto"; import fs from "node:fs/promises"; import type { ThinkLevel } from "../../auto-reply/thinking.js"; +import { + ensureContextEnginesInitialized, + resolveContextEngine, +} from "../../context-engine/index.js"; import { generateSecureToken } from "../../infra/secure-random.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import type { PluginHookBeforeAgentStartResult } from "../../plugins/types.js"; @@ -50,7 +54,6 @@ import { } from "../pi-embedded-helpers.js"; import { derivePromptTokens, normalizeUsage, type UsageLike } from "../usage.js"; import { redactRunIdentifier, resolveRunWorkspaceDir } from "../workspace-run.js"; -import { compactEmbeddedPiSessionDirect } from "./compact.js"; import { resolveGlobalLane, resolveSessionLane } from "./lanes.js"; import { log } from "./logger.js"; import { resolveModel } from "./model.js"; @@ -737,6 +740,10 @@ export async function runEmbeddedPiAgent( agentDir, }); }; + // Resolve the context engine once and reuse across retries to avoid + // repeated initialization/connection overhead per attempt. + ensureContextEnginesInitialized(); + const contextEngine = await resolveContextEngine(params.config); try { let authRetryPending = false; // Hoisted so the retry-limit error path can use the most recent API total. @@ -806,6 +813,8 @@ export async function runEmbeddedPiAgent( workspaceDir: resolvedWorkspace, agentDir, config: params.config, + contextEngine, + contextTokenBudget: ctxInfo.tokens, skillsSnapshot: params.skillsSnapshot, prompt, images: params.images, @@ -813,6 +822,8 @@ export async function runEmbeddedPiAgent( provider, modelId, model, + authProfileId: lastProfileId, + authProfileIdSource: lockedProfileId ? "user" : "auto", authStorage, modelRegistry, agentId: workspaceResolution.agentId, @@ -955,31 +966,36 @@ export async function runEmbeddedPiAgent( log.warn( `context overflow detected (attempt ${overflowCompactionAttempts}/${MAX_OVERFLOW_COMPACTION_ATTEMPTS}); attempting auto-compaction for ${provider}/${modelId}`, ); - const compactResult = await compactEmbeddedPiSessionDirect({ + const compactResult = await contextEngine.compact({ sessionId: params.sessionId, - sessionKey: params.sessionKey, - messageChannel: params.messageChannel, - messageProvider: params.messageProvider, - agentAccountId: params.agentAccountId, - authProfileId: lastProfileId, sessionFile: params.sessionFile, - workspaceDir: resolvedWorkspace, - agentDir, - config: params.config, - skillsSnapshot: params.skillsSnapshot, - senderIsOwner: params.senderIsOwner, - provider, - model: modelId, - runId: params.runId, - thinkLevel, - reasoningLevel: params.reasoningLevel, - bashElevated: params.bashElevated, - extraSystemPrompt: params.extraSystemPrompt, - ownerNumbers: params.ownerNumbers, - trigger: "overflow", - diagId: overflowDiagId, - attempt: overflowCompactionAttempts, - maxAttempts: MAX_OVERFLOW_COMPACTION_ATTEMPTS, + tokenBudget: ctxInfo.tokens, + force: true, + compactionTarget: "budget", + legacyParams: { + sessionKey: params.sessionKey, + messageChannel: params.messageChannel, + messageProvider: params.messageProvider, + agentAccountId: params.agentAccountId, + authProfileId: lastProfileId, + workspaceDir: resolvedWorkspace, + agentDir, + config: params.config, + skillsSnapshot: params.skillsSnapshot, + senderIsOwner: params.senderIsOwner, + provider, + model: modelId, + runId: params.runId, + thinkLevel, + reasoningLevel: params.reasoningLevel, + bashElevated: params.bashElevated, + extraSystemPrompt: params.extraSystemPrompt, + ownerNumbers: params.ownerNumbers, + trigger: "overflow", + diagId: overflowDiagId, + attempt: overflowCompactionAttempts, + maxAttempts: MAX_OVERFLOW_COMPACTION_ATTEMPTS, + }, }); if (compactResult.compacted) { autoCompactionCount += 1; @@ -1412,6 +1428,7 @@ export async function runEmbeddedPiAgent( }; } } finally { + await contextEngine.dispose?.(); stopCopilotRefreshTimer(); process.chdir(prevCwd); } diff --git a/src/agents/pi-embedded-runner/run/attempt.test.ts b/src/agents/pi-embedded-runner/run/attempt.test.ts index 4f637a464c2..c4878617c5c 100644 --- a/src/agents/pi-embedded-runner/run/attempt.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.test.ts @@ -1,8 +1,10 @@ import { describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../../config/config.js"; import { + buildAfterTurnLegacyCompactionParams, composeSystemPromptWithHookContext, isOllamaCompatProvider, + prependSystemPromptAddition, resolveAttemptFsWorkspaceOnly, resolveOllamaBaseUrlForRun, resolveOllamaCompatNumCtxEnabled, @@ -180,7 +182,6 @@ describe("resolveAttemptFsWorkspaceOnly", () => { ).toBe(false); }); }); - describe("wrapStreamFnTrimToolCallNames", () => { function createFakeStream(params: { events: unknown[]; resultMessage: unknown }): { result: () => Promise; @@ -548,3 +549,54 @@ describe("decodeHtmlEntitiesInObject", () => { expect(decodeHtmlEntitiesInObject("'world'")).toBe("'world'"); }); }); +describe("prependSystemPromptAddition", () => { + it("prepends context-engine addition to the system prompt", () => { + const result = prependSystemPromptAddition({ + systemPrompt: "base system", + systemPromptAddition: "extra behavior", + }); + + expect(result).toBe("extra behavior\n\nbase system"); + }); + + it("returns the original system prompt when no addition is provided", () => { + const result = prependSystemPromptAddition({ + systemPrompt: "base system", + }); + + expect(result).toBe("base system"); + }); +}); + +describe("buildAfterTurnLegacyCompactionParams", () => { + it("includes resolved auth profile fields for context-engine afterTurn compaction", () => { + const legacy = buildAfterTurnLegacyCompactionParams({ + attempt: { + sessionKey: "agent:main:session:abc", + messageChannel: "slack", + messageProvider: "slack", + agentAccountId: "acct-1", + authProfileId: "openai:p1", + config: { plugins: { slots: { contextEngine: "lossless-claw" } } } as OpenClawConfig, + skillsSnapshot: undefined, + senderIsOwner: true, + provider: "openai-codex", + modelId: "gpt-5.3-codex", + thinkLevel: "off", + reasoningLevel: "on", + extraSystemPrompt: "extra", + ownerNumbers: ["+15555550123"], + }, + workspaceDir: "/tmp/workspace", + agentDir: "/tmp/agent", + }); + + expect(legacy).toMatchObject({ + authProfileId: "openai:p1", + provider: "openai-codex", + model: "gpt-5.3-codex", + workspaceDir: "/tmp/workspace", + agentDir: "/tmp/agent", + }); + }); +}); diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 4a75c297a26..61159c13357 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -63,6 +63,7 @@ import { } from "../../pi-embedded-helpers.js"; import { subscribeEmbeddedPiSession } from "../../pi-embedded-subscribe.js"; import { createPreparedEmbeddedPiSettingsManager } from "../../pi-project-settings.js"; +import { applyPiAutoCompactionGuard } from "../../pi-settings.js"; import { toClientToolDefinitions } from "../../pi-tool-definition-adapter.js"; import { createOpenClawCodingTools, resolveToolLoopDetectionConfig } from "../../pi-tools.js"; import { resolveSandboxContext } from "../../sandbox.js"; @@ -90,6 +91,7 @@ import { resolveTranscriptPolicy } from "../../transcript-policy.js"; import { DEFAULT_BOOTSTRAP_FILENAME } from "../../workspace.js"; import { isRunnerAbortError } from "../abort.js"; import { appendCacheTtlTimestamp, isCacheTtlEligibleProvider } from "../cache-ttl.js"; +import type { CompactEmbeddedPiSessionParams } from "../compact.js"; import { buildEmbeddedExtensionFactories } from "../extensions.js"; import { applyExtraParamsToAgent } from "../extra-params.js"; import { @@ -617,6 +619,60 @@ export function resolveAttemptFsWorkspaceOnly(params: { }); } +export function prependSystemPromptAddition(params: { + systemPrompt: string; + systemPromptAddition?: string; +}): string { + if (!params.systemPromptAddition) { + return params.systemPrompt; + } + return `${params.systemPromptAddition}\n\n${params.systemPrompt}`; +} + +/** Build legacy compaction params passed into context-engine afterTurn hooks. */ +export function buildAfterTurnLegacyCompactionParams(params: { + attempt: Pick< + EmbeddedRunAttemptParams, + | "sessionKey" + | "messageChannel" + | "messageProvider" + | "agentAccountId" + | "config" + | "skillsSnapshot" + | "senderIsOwner" + | "provider" + | "modelId" + | "thinkLevel" + | "reasoningLevel" + | "bashElevated" + | "extraSystemPrompt" + | "ownerNumbers" + | "authProfileId" + >; + workspaceDir: string; + agentDir: string; +}): Partial { + return { + sessionKey: params.attempt.sessionKey, + messageChannel: params.attempt.messageChannel, + messageProvider: params.attempt.messageProvider, + agentAccountId: params.attempt.agentAccountId, + authProfileId: params.attempt.authProfileId, + workspaceDir: params.workspaceDir, + agentDir: params.agentDir, + config: params.attempt.config, + skillsSnapshot: params.attempt.skillsSnapshot, + senderIsOwner: params.attempt.senderIsOwner, + provider: params.attempt.provider, + model: params.attempt.modelId, + thinkLevel: params.attempt.thinkLevel, + reasoningLevel: params.attempt.reasoningLevel, + bashElevated: params.attempt.bashElevated, + extraSystemPrompt: params.attempt.extraSystemPrompt, + ownerNumbers: params.attempt.ownerNumbers, + }; +} + function summarizeMessagePayload(msg: AgentMessage): { textChars: number; imageBlocks: number } { const content = (msg as { content?: unknown }).content; if (typeof content === "string") { @@ -1025,6 +1081,17 @@ export async function runEmbeddedAttempt( }); trackSessionManagerAccess(params.sessionFile); + if (hadSessionFile && params.contextEngine?.bootstrap) { + try { + await params.contextEngine.bootstrap({ + sessionId: params.sessionId, + sessionFile: params.sessionFile, + }); + } catch (bootstrapErr) { + log.warn(`context engine bootstrap failed: ${String(bootstrapErr)}`); + } + } + await prepareSessionManagerForRun({ sessionManager, sessionFile: params.sessionFile, @@ -1038,6 +1105,10 @@ export async function runEmbeddedAttempt( agentDir, cfg: params.config, }); + applyPiAutoCompactionGuard({ + settingsManager, + contextEngineInfo: params.contextEngine?.info, + }); // Sets compaction/pruning runtime state and returns extension factories // that must be passed to the resource loader for the safeguard to be active. @@ -1336,6 +1407,33 @@ export async function runEmbeddedAttempt( if (limited.length > 0) { activeSession.agent.replaceMessages(limited); } + + if (params.contextEngine) { + try { + const assembled = await params.contextEngine.assemble({ + sessionId: params.sessionId, + messages: activeSession.messages, + tokenBudget: params.contextTokenBudget, + }); + if (assembled.messages !== activeSession.messages) { + activeSession.agent.replaceMessages(assembled.messages); + } + if (assembled.systemPromptAddition) { + systemPromptText = prependSystemPromptAddition({ + systemPrompt: systemPromptText, + systemPromptAddition: assembled.systemPromptAddition, + }); + applySystemPromptOverrideToSession(activeSession, systemPromptText); + log.debug( + `context engine: prepended system prompt addition (${assembled.systemPromptAddition.length} chars)`, + ); + } + } catch (assembleErr) { + log.warn( + `context engine assemble failed, using pipeline messages: ${String(assembleErr)}`, + ); + } + } } catch (err) { await flushPendingToolResultsAfterIdle({ agent: activeSession?.agent, @@ -1515,6 +1613,7 @@ export async function runEmbeddedAttempt( let promptError: unknown = null; let promptErrorSource: "prompt" | "compaction" | null = null; + const prePromptMessageCount = activeSession.messages.length; try { const promptStartedAt = Date.now(); @@ -1772,6 +1871,56 @@ export async function runEmbeddedAttempt( } } + // Let the active context engine run its post-turn lifecycle. + if (params.contextEngine) { + const afterTurnLegacyCompactionParams = buildAfterTurnLegacyCompactionParams({ + attempt: params, + workspaceDir: effectiveWorkspace, + agentDir, + }); + + if (typeof params.contextEngine.afterTurn === "function") { + try { + await params.contextEngine.afterTurn({ + sessionId: sessionIdUsed, + sessionFile: params.sessionFile, + messages: messagesSnapshot, + prePromptMessageCount, + tokenBudget: params.contextTokenBudget, + legacyCompactionParams: afterTurnLegacyCompactionParams, + }); + } catch (afterTurnErr) { + log.warn(`context engine afterTurn failed: ${String(afterTurnErr)}`); + } + } else { + // Fallback: ingest new messages individually + const newMessages = messagesSnapshot.slice(prePromptMessageCount); + if (newMessages.length > 0) { + if (typeof params.contextEngine.ingestBatch === "function") { + try { + await params.contextEngine.ingestBatch({ + sessionId: sessionIdUsed, + messages: newMessages, + }); + } catch (ingestErr) { + log.warn(`context engine ingest failed: ${String(ingestErr)}`); + } + } else { + for (const msg of newMessages) { + try { + await params.contextEngine.ingest({ + sessionId: sessionIdUsed, + message: msg, + }); + } catch (ingestErr) { + log.warn(`context engine ingest failed: ${String(ingestErr)}`); + } + } + } + } + } + } + cacheTrace?.recordStage("session:after", { messages: messagesSnapshot, note: timedOutDuringCompaction diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index 35251edd807..dff5aa6f251 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -3,6 +3,7 @@ import type { Api, AssistantMessage, Model } from "@mariozechner/pi-ai"; import type { AuthStorage, ModelRegistry } from "@mariozechner/pi-coding-agent"; import type { ThinkLevel } from "../../../auto-reply/thinking.js"; import type { SessionSystemPromptReport } from "../../../config/sessions/types.js"; +import type { ContextEngine } from "../../../context-engine/types.js"; import type { PluginHookBeforeAgentStartResult } from "../../../plugins/types.js"; import type { MessagingToolSend } from "../../pi-embedded-messaging.js"; import type { NormalizedUsage } from "../../usage.js"; @@ -14,6 +15,14 @@ type EmbeddedRunAttemptBase = Omit< >; export type EmbeddedRunAttemptParams = EmbeddedRunAttemptBase & { + /** Pluggable context engine for ingest/assemble/compact lifecycle. */ + contextEngine?: ContextEngine; + /** Resolved model context window in tokens for assemble/compact budgeting. */ + contextTokenBudget?: number; + /** Auth profile resolved for this attempt's provider/model call. */ + authProfileId?: string; + /** Source for the resolved auth profile (user-locked or automatic). */ + authProfileIdSource?: "auto" | "user"; provider: string; modelId: string; model: Model; diff --git a/src/agents/pi-settings.ts b/src/agents/pi-settings.ts index 3ea4c5d5b51..f1b66c6ea61 100644 --- a/src/agents/pi-settings.ts +++ b/src/agents/pi-settings.ts @@ -1,4 +1,5 @@ import type { OpenClawConfig } from "../config/config.js"; +import type { ContextEngineInfo } from "../context-engine/types.js"; export const DEFAULT_PI_COMPACTION_RESERVE_TOKENS_FLOOR = 20_000; @@ -11,6 +12,7 @@ type PiSettingsManagerLike = { keepRecentTokens?: number; }; }) => void; + setCompactionEnabled?: (enabled: boolean) => void; }; export function ensurePiCompactionReserveTokens(params: { @@ -95,3 +97,26 @@ export function applyPiCompactionSettingsFromConfig(params: { }, }; } + +/** Decide whether Pi's internal auto-compaction should be disabled for this run. */ +export function shouldDisablePiAutoCompaction(params: { + contextEngineInfo?: ContextEngineInfo; +}): boolean { + return params.contextEngineInfo?.ownsCompaction === true; +} + +/** Disable Pi auto-compaction via settings when a context engine owns compaction. */ +export function applyPiAutoCompactionGuard(params: { + settingsManager: PiSettingsManagerLike; + contextEngineInfo?: ContextEngineInfo; +}): { supported: boolean; disabled: boolean } { + const disable = shouldDisablePiAutoCompaction({ + contextEngineInfo: params.contextEngineInfo, + }); + const hasMethod = typeof params.settingsManager.setCompactionEnabled === "function"; + if (!disable || !hasMethod) { + return { supported: hasMethod, disabled: false }; + } + params.settingsManager.setCompactionEnabled!(false); + return { supported: true, disabled: true }; +} diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index 906a8424ff8..e2453bcc0fd 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -8,8 +8,12 @@ import { resolveStorePath, type SessionEntry, } from "../config/sessions.js"; +import { ensureContextEnginesInitialized } from "../context-engine/init.js"; +import { resolveContextEngine } from "../context-engine/registry.js"; +import type { SubagentEndReason } from "../context-engine/types.js"; import { callGateway } from "../gateway/call.js"; import { onAgentEvent } from "../infra/agent-events.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; import { defaultRuntime } from "../runtime.js"; import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js"; import { resetAnnounceQueuesForTests } from "./subagent-announce-queue.js"; @@ -54,6 +58,7 @@ import type { SubagentRunRecord } from "./subagent-registry.types.js"; import { resolveAgentTimeoutMs } from "./timeout.js"; export type { SubagentRunRecord } from "./subagent-registry.types.js"; +const log = createSubsystemLogger("agents/subagent-registry"); const subagentRuns = new Map(); let sweeper: NodeJS.Timeout | null = null; @@ -305,6 +310,22 @@ function schedulePendingLifecycleError(params: { runId: string; endedAt: number; }); } +async function notifyContextEngineSubagentEnded(params: { + childSessionKey: string; + reason: SubagentEndReason; +}) { + try { + ensureContextEnginesInitialized(); + const engine = await resolveContextEngine(loadConfig()); + if (!engine.onSubagentEnded) { + return; + } + await engine.onSubagentEnded(params); + } catch (err) { + log.warn("context-engine onSubagentEnded failed (best-effort)", { err }); + } +} + function suppressAnnounceForSteerRestart(entry?: SubagentRunRecord) { return entry?.suppressAnnounceReason === "steer-restart"; } @@ -690,6 +711,10 @@ async function sweepSubagentRuns() { continue; } clearPendingLifecycleError(runId); + void notifyContextEngineSubagentEnded({ + childSessionKey: entry.childSessionKey, + reason: "swept", + }); subagentRuns.delete(runId); mutated = true; // Archive/purge is terminal for the run record; remove any retained attachments too. @@ -894,9 +919,8 @@ async function finalizeSubagentCleanup( return; } - // Allow retry on the next wake if announce was deferred or failed. - // Applies to both keep/delete cleanup modes so delete-runs are only removed - // after a successful announce (or terminal give-up). + // Keep both cleanup modes retryable after deferred/failed announce. + // Delete-mode is finalized only after announce succeeds or give-up triggers. entry.cleanupHandled = false; // Clear the in-flight resume marker so the scheduled retry can run again. resumedRuns.delete(runId); @@ -936,11 +960,19 @@ function completeCleanupBookkeeping(params: { }) { if (params.cleanup === "delete") { clearPendingLifecycleError(params.runId); + void notifyContextEngineSubagentEnded({ + childSessionKey: params.entry.childSessionKey, + reason: "deleted", + }); subagentRuns.delete(params.runId); persistSubagentRuns(); retryDeferredCompletedAnnounces(params.runId); return; } + void notifyContextEngineSubagentEnded({ + childSessionKey: params.entry.childSessionKey, + reason: "completed", + }); params.entry.cleanupCompletedAt = params.completedAt; persistSubagentRuns(); retryDeferredCompletedAnnounces(params.runId); @@ -1248,6 +1280,13 @@ export function addSubagentRunForTests(entry: SubagentRunRecord) { export function releaseSubagentRun(runId: string) { clearPendingLifecycleError(runId); + const entry = subagentRuns.get(runId); + if (entry) { + void notifyContextEngineSubagentEnded({ + childSessionKey: entry.childSessionKey, + reason: "released", + }); + } const didDelete = subagentRuns.delete(runId); if (didDelete) { persistSubagentRuns(); diff --git a/src/config/config-misc.test.ts b/src/config/config-misc.test.ts index 29efaa2b136..b46b5b49766 100644 --- a/src/config/config-misc.test.ts +++ b/src/config/config-misc.test.ts @@ -31,6 +31,19 @@ describe("$schema key in config (#14998)", () => { }); }); +describe("plugins.slots.contextEngine", () => { + it("accepts a contextEngine slot id", () => { + const result = OpenClawSchema.safeParse({ + plugins: { + slots: { + contextEngine: "my-context-engine", + }, + }, + }); + expect(result.success).toBe(true); + }); +}); + describe("ui.seamColor", () => { it("accepts hex colors", () => { const res = validateConfigObject({ ui: { seamColor: "#FF4500" } }); diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 911d08620e2..39a43d46acb 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -927,6 +927,8 @@ export const FIELD_HELP: Record = { "Selects which plugins own exclusive runtime slots such as memory so only one plugin provides that capability. Use explicit slot ownership to avoid overlapping providers with conflicting behavior.", "plugins.slots.memory": 'Select the active memory plugin by id, or "none" to disable memory plugins.', + "plugins.slots.contextEngine": + "Selects the active context engine plugin by id so one plugin provides context orchestration behavior.", "plugins.entries": "Per-plugin settings keyed by plugin ID including enablement and plugin-specific runtime configuration payloads. Use this for scoped plugin tuning without changing global loader policy.", "plugins.entries.*.enabled": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index 9454df66fb1..64d444aab47 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -817,6 +817,7 @@ export const FIELD_LABELS: Record = { "plugins.load.paths": "Plugin Load Paths", "plugins.slots": "Plugin Slots", "plugins.slots.memory": "Memory Plugin", + "plugins.slots.contextEngine": "Context Engine Plugin", "plugins.entries": "Plugin Entries", "plugins.entries.*.enabled": "Plugin Enabled", "plugins.entries.*.hooks": "Plugin Hook Policy", diff --git a/src/config/types.plugins.ts b/src/config/types.plugins.ts index 5244795d51e..323946dd541 100644 --- a/src/config/types.plugins.ts +++ b/src/config/types.plugins.ts @@ -10,6 +10,8 @@ export type PluginEntryConfig = { export type PluginSlotsConfig = { /** Select which plugin owns the memory slot ("none" disables memory plugins). */ memory?: string; + /** Select which plugin owns the context-engine slot. */ + contextEngine?: string; }; export type PluginsLoadConfig = { diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index 4d49e0428e4..033044238e8 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -829,6 +829,7 @@ export const OpenClawSchema = z slots: z .object({ memory: z.string().optional(), + contextEngine: z.string().optional(), }) .strict() .optional(), diff --git a/src/context-engine/context-engine.test.ts b/src/context-engine/context-engine.test.ts new file mode 100644 index 00000000000..022fdc14cc8 --- /dev/null +++ b/src/context-engine/context-engine.test.ts @@ -0,0 +1,337 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import { describe, expect, it, beforeEach } from "vitest"; +// --------------------------------------------------------------------------- +// We dynamically import the registry so we can get a fresh module per test +// group when needed. For most groups we use the shared singleton directly. +// --------------------------------------------------------------------------- +import { LegacyContextEngine, registerLegacyContextEngine } from "./legacy.js"; +import { + registerContextEngine, + getContextEngineFactory, + listContextEngineIds, + resolveContextEngine, +} from "./registry.js"; +import type { + ContextEngine, + ContextEngineInfo, + AssembleResult, + CompactResult, + IngestResult, +} from "./types.js"; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Build a config object with a contextEngine slot for testing. */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function configWithSlot(engineId: string): any { + return { plugins: { slots: { contextEngine: engineId } } }; +} + +function makeMockMessage(role: "user" | "assistant" = "user", text = "hello"): AgentMessage { + return { role, content: text, timestamp: Date.now() } as AgentMessage; +} + +/** A minimal mock engine that satisfies the ContextEngine interface. */ +class MockContextEngine implements ContextEngine { + readonly info: ContextEngineInfo = { + id: "mock", + name: "Mock Engine", + version: "0.0.1", + }; + + async ingest(_params: { + sessionId: string; + message: AgentMessage; + isHeartbeat?: boolean; + }): Promise { + return { ingested: true }; + } + + async assemble(params: { + sessionId: string; + messages: AgentMessage[]; + tokenBudget?: number; + }): Promise { + return { + messages: params.messages, + estimatedTokens: 42, + systemPromptAddition: "mock system addition", + }; + } + + async compact(_params: { + sessionId: string; + sessionFile: string; + tokenBudget?: number; + compactionTarget?: "budget" | "threshold"; + customInstructions?: string; + legacyParams?: Record; + }): Promise { + return { + ok: true, + compacted: true, + reason: "mock compaction", + result: { + summary: "mock summary", + tokensBefore: 100, + tokensAfter: 50, + }, + }; + } + + async dispose(): Promise { + // no-op + } +} + +// ═══════════════════════════════════════════════════════════════════════════ +// 1. Engine contract tests +// ═══════════════════════════════════════════════════════════════════════════ + +describe("Engine contract tests", () => { + it("a mock engine implementing ContextEngine can be registered and resolved", async () => { + const factory = () => new MockContextEngine(); + registerContextEngine("mock", factory); + + const resolved = getContextEngineFactory("mock"); + expect(resolved).toBe(factory); + + const engine = await resolved!(); + expect(engine).toBeInstanceOf(MockContextEngine); + expect(engine.info.id).toBe("mock"); + }); + + it("ingest() returns IngestResult with ingested boolean", async () => { + const engine = new MockContextEngine(); + const result = await engine.ingest({ + sessionId: "s1", + message: makeMockMessage(), + }); + + expect(result).toHaveProperty("ingested"); + expect(typeof result.ingested).toBe("boolean"); + expect(result.ingested).toBe(true); + }); + + it("assemble() returns AssembleResult with messages array and estimatedTokens", async () => { + const engine = new MockContextEngine(); + const msgs = [makeMockMessage(), makeMockMessage("assistant", "world")]; + const result = await engine.assemble({ + sessionId: "s1", + messages: msgs, + }); + + expect(Array.isArray(result.messages)).toBe(true); + expect(result.messages).toHaveLength(2); + expect(typeof result.estimatedTokens).toBe("number"); + expect(result.estimatedTokens).toBe(42); + expect(result.systemPromptAddition).toBe("mock system addition"); + }); + + it("compact() returns CompactResult with ok, compacted, reason, result fields", async () => { + const engine = new MockContextEngine(); + const result = await engine.compact({ + sessionId: "s1", + sessionFile: "/tmp/session.json", + }); + + expect(typeof result.ok).toBe("boolean"); + expect(typeof result.compacted).toBe("boolean"); + expect(result.ok).toBe(true); + expect(result.compacted).toBe(true); + expect(result.reason).toBe("mock compaction"); + expect(result.result).toBeDefined(); + expect(result.result!.summary).toBe("mock summary"); + expect(result.result!.tokensBefore).toBe(100); + expect(result.result!.tokensAfter).toBe(50); + }); + + it("dispose() is callable (optional method)", async () => { + const engine = new MockContextEngine(); + // Should complete without error + await expect(engine.dispose()).resolves.toBeUndefined(); + }); +}); + +// ═══════════════════════════════════════════════════════════════════════════ +// 2. Registry tests +// ═══════════════════════════════════════════════════════════════════════════ + +describe("Registry tests", () => { + it("registerContextEngine() stores a factory", () => { + const factory = () => new MockContextEngine(); + registerContextEngine("reg-test-1", factory); + + expect(getContextEngineFactory("reg-test-1")).toBe(factory); + }); + + it("getContextEngineFactory() returns the factory", () => { + const factory = () => new MockContextEngine(); + registerContextEngine("reg-test-2", factory); + + const retrieved = getContextEngineFactory("reg-test-2"); + expect(retrieved).toBe(factory); + expect(typeof retrieved).toBe("function"); + }); + + it("listContextEngineIds() returns all registered ids", () => { + // Ensure at least our test entries exist + registerContextEngine("reg-test-a", () => new MockContextEngine()); + registerContextEngine("reg-test-b", () => new MockContextEngine()); + + const ids = listContextEngineIds(); + expect(ids).toContain("reg-test-a"); + expect(ids).toContain("reg-test-b"); + expect(Array.isArray(ids)).toBe(true); + }); + + it("registering the same id overwrites the previous factory", () => { + const factory1 = () => new MockContextEngine(); + const factory2 = () => new MockContextEngine(); + + registerContextEngine("reg-overwrite", factory1); + expect(getContextEngineFactory("reg-overwrite")).toBe(factory1); + + registerContextEngine("reg-overwrite", factory2); + expect(getContextEngineFactory("reg-overwrite")).toBe(factory2); + expect(getContextEngineFactory("reg-overwrite")).not.toBe(factory1); + }); +}); + +// ═══════════════════════════════════════════════════════════════════════════ +// 3. Default engine selection +// ═══════════════════════════════════════════════════════════════════════════ + +describe("Default engine selection", () => { + // Ensure both legacy and a custom test engine are registered before these tests. + beforeEach(() => { + // Registration is idempotent (Map.set), so calling again is safe. + registerLegacyContextEngine(); + // Register a lightweight custom stub so we don't need external resources. + registerContextEngine("test-engine", () => { + const engine: ContextEngine = { + info: { id: "test-engine", name: "Custom Test Engine", version: "0.0.0" }, + async ingest() { + return { ingested: true }; + }, + async assemble({ messages }) { + return { messages, estimatedTokens: 0 }; + }, + async compact() { + return { ok: true, compacted: false }; + }, + }; + return engine; + }); + }); + + it("resolveContextEngine() with no config returns the default ('legacy') engine", async () => { + const engine = await resolveContextEngine(); + expect(engine.info.id).toBe("legacy"); + }); + + it("resolveContextEngine() with config contextEngine='legacy' returns legacy engine", async () => { + const engine = await resolveContextEngine(configWithSlot("legacy")); + expect(engine.info.id).toBe("legacy"); + }); + + it("resolveContextEngine() with config contextEngine='test-engine' returns the custom engine", async () => { + const engine = await resolveContextEngine(configWithSlot("test-engine")); + expect(engine.info.id).toBe("test-engine"); + }); +}); + +// ═══════════════════════════════════════════════════════════════════════════ +// 4. Invalid engine fallback +// ═══════════════════════════════════════════════════════════════════════════ + +describe("Invalid engine fallback", () => { + it("resolveContextEngine() with config pointing to unregistered engine throws with helpful error", async () => { + await expect(resolveContextEngine(configWithSlot("nonexistent-engine"))).rejects.toThrow( + /nonexistent-engine/, + ); + }); + + it("error message includes the requested id and available ids", async () => { + // Ensure at least legacy is registered so we see it in the available list + registerLegacyContextEngine(); + + try { + await resolveContextEngine(configWithSlot("does-not-exist")); + // Should not reach here + expect.unreachable("Expected resolveContextEngine to throw"); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + expect(message).toContain("does-not-exist"); + expect(message).toContain("not registered"); + // Should mention available engines + expect(message).toMatch(/Available engines:/); + // At least "legacy" should be listed as available + expect(message).toContain("legacy"); + } + }); +}); + +// ═══════════════════════════════════════════════════════════════════════════ +// 5. LegacyContextEngine parity +// ═══════════════════════════════════════════════════════════════════════════ + +describe("LegacyContextEngine parity", () => { + it("ingest() returns { ingested: false } (no-op)", async () => { + const engine = new LegacyContextEngine(); + const result = await engine.ingest({ + sessionId: "s1", + message: makeMockMessage(), + }); + + expect(result).toEqual({ ingested: false }); + }); + + it("assemble() returns messages as-is (pass-through)", async () => { + const engine = new LegacyContextEngine(); + const messages = [ + makeMockMessage("user", "first"), + makeMockMessage("assistant", "second"), + makeMockMessage("user", "third"), + ]; + + const result = await engine.assemble({ + sessionId: "s1", + messages, + }); + + // Messages should be the exact same array reference (pass-through) + expect(result.messages).toBe(messages); + expect(result.messages).toHaveLength(3); + expect(result.estimatedTokens).toBe(0); + expect(result.systemPromptAddition).toBeUndefined(); + }); + + it("dispose() completes without error", async () => { + const engine = new LegacyContextEngine(); + await expect(engine.dispose()).resolves.toBeUndefined(); + }); +}); + +// ═══════════════════════════════════════════════════════════════════════════ +// 6. Initialization guard +// ═══════════════════════════════════════════════════════════════════════════ + +describe("Initialization guard", () => { + it("ensureContextEnginesInitialized() is idempotent (calling twice does not throw)", async () => { + const { ensureContextEnginesInitialized } = await import("./init.js"); + + expect(() => ensureContextEnginesInitialized()).not.toThrow(); + expect(() => ensureContextEnginesInitialized()).not.toThrow(); + }); + + it("after init, 'legacy' engine is registered", async () => { + const { ensureContextEnginesInitialized } = await import("./init.js"); + ensureContextEnginesInitialized(); + + const ids = listContextEngineIds(); + expect(ids).toContain("legacy"); + }); +}); diff --git a/src/context-engine/index.ts b/src/context-engine/index.ts new file mode 100644 index 00000000000..fa3193d4030 --- /dev/null +++ b/src/context-engine/index.ts @@ -0,0 +1,19 @@ +export type { + ContextEngine, + ContextEngineInfo, + AssembleResult, + CompactResult, + IngestResult, +} from "./types.js"; + +export { + registerContextEngine, + getContextEngineFactory, + listContextEngineIds, + resolveContextEngine, +} from "./registry.js"; +export type { ContextEngineFactory } from "./registry.js"; + +export { LegacyContextEngine, registerLegacyContextEngine } from "./legacy.js"; + +export { ensureContextEnginesInitialized } from "./init.js"; diff --git a/src/context-engine/init.ts b/src/context-engine/init.ts new file mode 100644 index 00000000000..1052e4b3677 --- /dev/null +++ b/src/context-engine/init.ts @@ -0,0 +1,23 @@ +import { registerLegacyContextEngine } from "./legacy.js"; + +/** + * Ensures all built-in context engines are registered exactly once. + * + * The legacy engine is always registered as a safe fallback so that + * `resolveContextEngine()` can resolve the default "legacy" slot without + * callers needing to remember manual registration. + * + * Additional engines are registered by their own plugins via + * `api.registerContextEngine()` during plugin load. + */ +let initialized = false; + +export function ensureContextEnginesInitialized(): void { + if (initialized) { + return; + } + initialized = true; + + // Always available – safe fallback for the "legacy" slot default. + registerLegacyContextEngine(); +} diff --git a/src/context-engine/legacy.ts b/src/context-engine/legacy.ts new file mode 100644 index 00000000000..ba05c9e8b8d --- /dev/null +++ b/src/context-engine/legacy.ts @@ -0,0 +1,115 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import { registerContextEngine } from "./registry.js"; +import type { + ContextEngine, + ContextEngineInfo, + AssembleResult, + CompactResult, + IngestResult, +} from "./types.js"; + +/** + * LegacyContextEngine wraps the existing compaction behavior behind the + * ContextEngine interface, preserving 100% backward compatibility. + * + * - ingest: no-op (SessionManager handles message persistence) + * - assemble: pass-through (existing sanitize/validate/limit pipeline in attempt.ts handles this) + * - compact: delegates to compactEmbeddedPiSessionDirect + */ +export class LegacyContextEngine implements ContextEngine { + readonly info: ContextEngineInfo = { + id: "legacy", + name: "Legacy Context Engine", + version: "1.0.0", + }; + + async ingest(_params: { + sessionId: string; + message: AgentMessage; + isHeartbeat?: boolean; + }): Promise { + // No-op: SessionManager handles message persistence in the legacy flow + return { ingested: false }; + } + + async assemble(params: { + sessionId: string; + messages: AgentMessage[]; + tokenBudget?: number; + }): Promise { + // Pass-through: the existing sanitize -> validate -> limit -> repair pipeline + // in attempt.ts handles context assembly for the legacy engine. + // We just return the messages as-is with a rough token estimate. + return { + messages: params.messages, + estimatedTokens: 0, // Caller handles estimation + }; + } + + async afterTurn(_params: { + sessionId: string; + sessionFile: string; + messages: AgentMessage[]; + prePromptMessageCount: number; + autoCompactionSummary?: string; + isHeartbeat?: boolean; + tokenBudget?: number; + legacyCompactionParams?: Record; + }): Promise { + // No-op: legacy flow persists context directly in SessionManager. + } + + async compact(params: { + sessionId: string; + sessionFile: string; + tokenBudget?: number; + force?: boolean; + currentTokenCount?: number; + compactionTarget?: "budget" | "threshold"; + customInstructions?: string; + legacyParams?: Record; + }): Promise { + // Import dynamically to avoid circular dependencies + const { compactEmbeddedPiSessionDirect } = + await import("../agents/pi-embedded-runner/compact.js"); + + // legacyParams carries the full CompactEmbeddedPiSessionParams fields + // set by the caller in run.ts. We spread them and override the fields + // that come from the ContextEngine compact() signature directly. + const lp = params.legacyParams ?? {}; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any -- legacy bridge: legacyParams is an opaque bag matching CompactEmbeddedPiSessionParams + const result = await compactEmbeddedPiSessionDirect({ + ...lp, + sessionId: params.sessionId, + sessionFile: params.sessionFile, + tokenBudget: params.tokenBudget, + force: params.force, + customInstructions: params.customInstructions, + workspaceDir: (lp.workspaceDir as string) ?? process.cwd(), + } as Parameters[0]); + + return { + ok: result.ok, + compacted: result.compacted, + reason: result.reason, + result: result.result + ? { + summary: result.result.summary, + firstKeptEntryId: result.result.firstKeptEntryId, + tokensBefore: result.result.tokensBefore, + tokensAfter: result.result.tokensAfter, + details: result.result.details, + } + : undefined, + }; + } + + async dispose(): Promise { + // Nothing to clean up for legacy engine + } +} + +export function registerLegacyContextEngine(): void { + registerContextEngine("legacy", () => new LegacyContextEngine()); +} diff --git a/src/context-engine/registry.ts b/src/context-engine/registry.ts new file mode 100644 index 00000000000..49bf34bfbb3 --- /dev/null +++ b/src/context-engine/registry.ts @@ -0,0 +1,67 @@ +import type { OpenClawConfig } from "../config/config.js"; +import { defaultSlotIdForKey } from "../plugins/slots.js"; +import type { ContextEngine } from "./types.js"; + +/** + * A factory that creates a ContextEngine instance. + * Supports async creation for engines that need DB connections etc. + */ +export type ContextEngineFactory = () => ContextEngine | Promise; + +// --------------------------------------------------------------------------- +// Registry (module-level singleton) +// --------------------------------------------------------------------------- + +const _engines = new Map(); + +/** + * Register a context engine implementation under the given id. + */ +export function registerContextEngine(id: string, factory: ContextEngineFactory): void { + _engines.set(id, factory); +} + +/** + * Return the factory for a registered engine, or undefined. + */ +export function getContextEngineFactory(id: string): ContextEngineFactory | undefined { + return _engines.get(id); +} + +/** + * List all registered engine ids. + */ +export function listContextEngineIds(): string[] { + return [..._engines.keys()]; +} + +// --------------------------------------------------------------------------- +// Resolution +// --------------------------------------------------------------------------- + +/** + * Resolve which ContextEngine to use based on plugin slot configuration. + * + * Resolution order: + * 1. `config.plugins.slots.contextEngine` (explicit slot override) + * 2. Default slot value ("legacy") + * + * Throws if the resolved engine id has no registered factory. + */ +export async function resolveContextEngine(config?: OpenClawConfig): Promise { + const slotValue = config?.plugins?.slots?.contextEngine; + const engineId = + typeof slotValue === "string" && slotValue.trim() + ? slotValue.trim() + : defaultSlotIdForKey("contextEngine"); + + const factory = _engines.get(engineId); + if (!factory) { + throw new Error( + `Context engine "${engineId}" is not registered. ` + + `Available engines: ${listContextEngineIds().join(", ") || "(none)"}`, + ); + } + + return factory(); +} diff --git a/src/context-engine/types.ts b/src/context-engine/types.ts new file mode 100644 index 00000000000..525c673b092 --- /dev/null +++ b/src/context-engine/types.ts @@ -0,0 +1,167 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; + +// Result types + +export type AssembleResult = { + /** Ordered messages to use as model context */ + messages: AgentMessage[]; + /** Estimated total tokens in assembled context */ + estimatedTokens: number; + /** Optional context-engine-provided instructions prepended to the runtime system prompt */ + systemPromptAddition?: string; +}; + +export type CompactResult = { + ok: boolean; + compacted: boolean; + reason?: string; + result?: { + summary?: string; + firstKeptEntryId?: string; + tokensBefore: number; + tokensAfter?: number; + details?: unknown; + }; +}; + +export type IngestResult = { + /** Whether the message was ingested (false if duplicate or no-op) */ + ingested: boolean; +}; + +export type IngestBatchResult = { + /** Number of messages ingested from the supplied batch */ + ingestedCount: number; +}; + +export type BootstrapResult = { + /** Whether bootstrap ran and initialized the engine's store */ + bootstrapped: boolean; + /** Number of historical messages imported (if applicable) */ + importedMessages?: number; + /** Optional reason when bootstrap was skipped */ + reason?: string; +}; + +export type ContextEngineInfo = { + id: string; + name: string; + version?: string; + /** True when the engine manages its own compaction lifecycle. */ + ownsCompaction?: boolean; +}; + +export type SubagentSpawnPreparation = { + /** Roll back pre-spawn setup when subagent launch fails. */ + rollback: () => void | Promise; +}; + +export type SubagentEndReason = "deleted" | "completed" | "swept" | "released"; + +/** + * ContextEngine defines the pluggable contract for context management. + * + * Required methods define a generic lifecycle; optional methods allow engines + * to provide additional capabilities (retrieval, lineage, etc.). + */ +export interface ContextEngine { + /** Engine identifier and metadata */ + readonly info: ContextEngineInfo; + + /** + * Initialize engine state for a session, optionally importing historical context. + */ + bootstrap?(params: { sessionId: string; sessionFile: string }): Promise; + + /** + * Ingest a single message into the engine's store. + */ + ingest(params: { + sessionId: string; + message: AgentMessage; + /** True when the message belongs to a heartbeat run. */ + isHeartbeat?: boolean; + }): Promise; + + /** + * Ingest a completed turn batch as a single unit. + */ + ingestBatch?(params: { + sessionId: string; + messages: AgentMessage[]; + /** True when the batch belongs to a heartbeat run. */ + isHeartbeat?: boolean; + }): Promise; + + /** + * Execute optional post-turn lifecycle work after a run attempt completes. + * Engines can use this to persist canonical context and trigger background + * compaction decisions. + */ + afterTurn?(params: { + sessionId: string; + sessionFile: string; + messages: AgentMessage[]; + /** Number of messages that existed before the prompt was sent. */ + prePromptMessageCount: number; + /** Optional auto-compaction summary emitted by the runtime. */ + autoCompactionSummary?: string; + /** True when this turn belongs to a heartbeat run. */ + isHeartbeat?: boolean; + /** Optional model context token budget for proactive compaction. */ + tokenBudget?: number; + /** Backward-compat only: legacy compaction bridge runtime params. */ + legacyCompactionParams?: Record; + }): Promise; + + /** + * Assemble model context under a token budget. + * Returns an ordered set of messages ready for the model. + */ + assemble(params: { + sessionId: string; + messages: AgentMessage[]; + tokenBudget?: number; + }): Promise; + + /** + * Compact context to reduce token usage. + * May create summaries, prune old turns, etc. + */ + compact(params: { + sessionId: string; + sessionFile: string; + tokenBudget?: number; + /** Backward-compat only: force legacy compaction behavior even below threshold. */ + force?: boolean; + /** Optional live token estimate from the caller's active context. */ + currentTokenCount?: number; + /** Controls convergence target; defaults to budget for compatibility. */ + compactionTarget?: "budget" | "threshold"; + customInstructions?: string; + /** Backward-compat only: full params bag for legacy compaction bridge. */ + legacyParams?: Record; + }): Promise; + + /** + * Prepare context-engine-managed subagent state before the child run starts. + * + * Implementations can return a rollback handle that is invoked when spawn + * fails after preparation succeeds. + */ + prepareSubagentSpawn?(params: { + parentSessionKey: string; + childSessionKey: string; + ttlMs?: number; + }): Promise; + + /** + * Notify the context engine that a subagent lifecycle ended. + */ + onSubagentEnded?(params: { childSessionKey: string; reason: SubagentEndReason }): Promise; + + /** + * Dispose of any resources held by the engine. + */ + dispose?(): Promise; +} diff --git a/src/cron/isolated-agent.model-formatting.test.ts b/src/cron/isolated-agent.model-formatting.test.ts new file mode 100644 index 00000000000..bfd751664af --- /dev/null +++ b/src/cron/isolated-agent.model-formatting.test.ts @@ -0,0 +1,541 @@ +import "./isolated-agent.mocks.js"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { loadModelCatalog } from "../agents/model-catalog.js"; +import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; +import { runCronIsolatedAgentTurn } from "./isolated-agent.js"; +import { + makeCfg, + makeJob, + withTempCronHome, + writeSessionStoreEntries, +} from "./isolated-agent.test-harness.js"; +import type { CronJob } from "./types.js"; + +const withTempHome = withTempCronHome; + +function makeDeps() { + return { + sendMessageSlack: vi.fn(), + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn(), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + }; +} + +function mockEmbeddedOk() { + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); +} + +/** + * Extract the provider and model from the last runEmbeddedPiAgent call. + */ +function lastEmbeddedCall(): { provider?: string; model?: string } { + const calls = vi.mocked(runEmbeddedPiAgent).mock.calls; + expect(calls.length).toBeGreaterThan(0); + return calls.at(-1)?.[0] as { provider?: string; model?: string }; +} + +const DEFAULT_MESSAGE = "do it"; + +type TurnOptions = { + cfgOverrides?: Parameters[2]; + jobPayload?: CronJob["payload"]; + sessionKey?: string; + storeEntries?: Record>; +}; + +/** Like runTurn but does NOT assert the embedded agent was called (for error paths). */ +async function runErrorTurn(home: string, options: TurnOptions = {}) { + const storePath = await writeSessionStoreEntries(home, { + "agent:main:main": { + sessionId: "main-session", + updatedAt: Date.now(), + lastProvider: "webchat", + lastTo: "", + }, + ...options.storeEntries, + }); + mockEmbeddedOk(); + + const jobPayload = options.jobPayload ?? { + kind: "agentTurn" as const, + message: DEFAULT_MESSAGE, + deliver: false, + }; + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath, options.cfgOverrides), + deps: makeDeps(), + job: makeJob(jobPayload), + message: DEFAULT_MESSAGE, + sessionKey: options.sessionKey ?? "cron:job-1", + lane: "cron", + }); + + return { res }; +} + +async function runTurn(home: string, options: TurnOptions = {}) { + const storePath = await writeSessionStoreEntries(home, { + "agent:main:main": { + sessionId: "main-session", + updatedAt: Date.now(), + lastProvider: "webchat", + lastTo: "", + }, + ...options.storeEntries, + }); + mockEmbeddedOk(); + + const jobPayload = options.jobPayload ?? { + kind: "agentTurn" as const, + message: DEFAULT_MESSAGE, + deliver: false, + }; + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath, options.cfgOverrides), + deps: makeDeps(), + job: makeJob(jobPayload), + message: DEFAULT_MESSAGE, + sessionKey: options.sessionKey ?? "cron:job-1", + lane: "cron", + }); + + return { res, call: lastEmbeddedCall() }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("cron model formatting and precedence edge cases", () => { + beforeEach(() => { + vi.mocked(runEmbeddedPiAgent).mockClear(); + vi.mocked(loadModelCatalog).mockResolvedValue([]); + }); + + // ------ provider/model string splitting ------ + + describe("parseModelRef formatting", () => { + it("splits standard provider/model", async () => { + await withTempHome(async (home) => { + const { res, call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, model: "openai/gpt-4.1-mini" }, + }); + expect(res.status).toBe("ok"); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1-mini"); + }); + }); + + it("handles leading/trailing whitespace in model string", async () => { + await withTempHome(async (home) => { + const { res, call } = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: " openai/gpt-4.1-mini ", + }, + }); + expect(res.status).toBe("ok"); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1-mini"); + }); + }); + + it("handles openrouter nested provider paths", async () => { + await withTempHome(async (home) => { + const { res, call } = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "openrouter/meta-llama/llama-3.3-70b:free", + }, + }); + expect(res.status).toBe("ok"); + expect(call.provider).toBe("openrouter"); + expect(call.model).toBe("meta-llama/llama-3.3-70b:free"); + }); + }); + + it("rejects model with trailing slash (empty model name)", async () => { + await withTempHome(async (home) => { + const { res } = await runErrorTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, model: "openai/" }, + }); + expect(res.status).toBe("error"); + expect(res.error).toMatch(/invalid model/i); + expect(vi.mocked(runEmbeddedPiAgent)).not.toHaveBeenCalled(); + }); + }); + + it("rejects model with leading slash (empty provider)", async () => { + await withTempHome(async (home) => { + const { res } = await runErrorTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, model: "/gpt-4.1-mini" }, + }); + expect(res.status).toBe("error"); + expect(res.error).toMatch(/invalid model/i); + expect(vi.mocked(runEmbeddedPiAgent)).not.toHaveBeenCalled(); + }); + }); + + it("normalizes provider casing", async () => { + await withTempHome(async (home) => { + const { res, call } = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "OpenAI/gpt-4.1-mini", + }, + }); + expect(res.status).toBe("ok"); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1-mini"); + }); + }); + + it("normalizes anthropic model aliases", async () => { + await withTempHome(async (home) => { + const { res, call } = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "anthropic/opus-4.5", + }, + }); + expect(res.status).toBe("ok"); + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-opus-4-5"); + }); + }); + + it("normalizes bedrock provider alias", async () => { + await withTempHome(async (home) => { + const { res, call } = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "bedrock/claude-sonnet-4-5", + }, + }); + expect(res.status).toBe("ok"); + expect(call.provider).toBe("amazon-bedrock"); + }); + }); + }); + + // ------ precedence: job payload > session override > default ------ + + describe("model precedence isolation", () => { + it("job payload model overrides default (anthropic → openai)", async () => { + // Default in makeCfg is anthropic/claude-opus-4-5. + // Job payload sets openai/gpt-4.1-mini. Provider must be openai. + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "openai/gpt-4.1-mini", + }, + }); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1-mini"); + }); + }); + + it("session override applies when no job payload model is present", async () => { + // No model in job payload. Session store has openai override. + // Provider must be openai, not the default anthropic. + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + storeEntries: { + "agent:main:cron:job-1": { + sessionId: "existing-session", + updatedAt: Date.now(), + providerOverride: "openai", + modelOverride: "gpt-4.1-mini", + }, + }, + }); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1-mini"); + }); + }); + + it("job payload model wins over conflicting session override", async () => { + // Job payload says anthropic. Session says openai. Job must win. + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "anthropic/claude-sonnet-4-5", + deliver: false, + }, + storeEntries: { + "agent:main:cron:job-1": { + sessionId: "existing-session", + updatedAt: Date.now(), + providerOverride: "openai", + modelOverride: "gpt-4.1-mini", + }, + }, + }); + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-sonnet-4-5"); + }); + }); + + it("falls through to default when no override is present", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + }); + // makeCfg default is anthropic/claude-opus-4-5 + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-opus-4-5"); + }); + }); + }); + + // ------ sequential runs with different overrides (the CI failure pattern) ------ + + describe("sequential model switches (CI failure regression)", () => { + it("openai override → session openai → job anthropic: each step resolves correctly", async () => { + // This reproduces the exact pattern from the CI failure. + // Three sequential calls in one temp home, switching providers. + await withTempHome(async (home) => { + // Step 1: Job payload says openai + vi.mocked(runEmbeddedPiAgent).mockClear(); + const step1 = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "openai/gpt-4.1-mini", + }, + }); + expect(step1.call.provider).toBe("openai"); + expect(step1.call.model).toBe("gpt-4.1-mini"); + + // Step 2: No job model, session store says openai + vi.mocked(runEmbeddedPiAgent).mockClear(); + mockEmbeddedOk(); + const step2 = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + storeEntries: { + "agent:main:cron:job-1": { + sessionId: "existing-session", + updatedAt: Date.now(), + providerOverride: "openai", + modelOverride: "gpt-4.1-mini", + }, + }, + }); + expect(step2.call.provider).toBe("openai"); + expect(step2.call.model).toBe("gpt-4.1-mini"); + + // Step 3: Job payload says anthropic, session store still says openai + vi.mocked(runEmbeddedPiAgent).mockClear(); + mockEmbeddedOk(); + const step3 = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "anthropic/claude-opus-4-5", + deliver: false, + }, + storeEntries: { + "agent:main:cron:job-1": { + sessionId: "existing-session", + updatedAt: Date.now(), + providerOverride: "openai", + modelOverride: "gpt-4.1-mini", + }, + }, + }); + expect(step3.call.provider).toBe("anthropic"); + expect(step3.call.model).toBe("claude-opus-4-5"); + }); + }); + + it("provider does not leak between isolated sequential runs", async () => { + // Run with openai, then run with no override. + // Second run must get the default (anthropic), not leaked openai. + await withTempHome(async (home) => { + // Run 1: explicit openai + const r1 = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "openai/gpt-4.1-mini", + }, + }); + expect(r1.call.provider).toBe("openai"); + + // Run 2: no override — must revert to default anthropic + vi.mocked(runEmbeddedPiAgent).mockClear(); + mockEmbeddedOk(); + const r2 = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + }); + expect(r2.call.provider).toBe("anthropic"); + expect(r2.call.model).toBe("claude-opus-4-5"); + }); + }); + }); + + // ------ forceNew session + stored model override interaction ------ + + describe("forceNew session preserves model overrides from store", () => { + it("new isolated session inherits stored modelOverride/providerOverride", async () => { + // Isolated cron uses forceNew=true, which creates a new sessionId. + // The stored modelOverride/providerOverride must still be read and applied + // (resolveCronSession spreads ...entry before overriding core fields). + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + storeEntries: { + "agent:main:cron:job-1": { + sessionId: "old-session-id", + updatedAt: Date.now(), + providerOverride: "openai", + modelOverride: "gpt-4.1-mini", + }, + }, + }); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1-mini"); + }); + }); + + it("new isolated session uses default when store has no override", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + storeEntries: { + "agent:main:cron:job-1": { + sessionId: "old-session-id", + updatedAt: Date.now(), + // No providerOverride or modelOverride + }, + }, + }); + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-opus-4-5"); + }); + }); + }); + + // ------ whitespace / empty edge cases ------ + + describe("whitespace and empty model strings", () => { + it("whitespace-only model treated as unset (falls to default)", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, model: " " }, + }); + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-opus-4-5"); + }); + }); + + it("empty string model treated as unset", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, model: "" }, + }); + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-opus-4-5"); + }); + }); + + it("whitespace-only session modelOverride is ignored", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + storeEntries: { + "agent:main:cron:job-1": { + sessionId: "old", + updatedAt: Date.now(), + providerOverride: "openai", + modelOverride: " ", + }, + }, + }); + // Whitespace modelOverride should be ignored → default + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-opus-4-5"); + }); + }); + }); + + // ------ config default model as string vs object ------ + + describe("config model format variations", () => { + it("default model as string 'provider/model'", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + cfgOverrides: { + agents: { + defaults: { + model: "openai/gpt-4.1", + }, + }, + }, + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + }); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1"); + }); + }); + + it("default model as object with primary field", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + cfgOverrides: { + agents: { + defaults: { + model: { primary: "openai/gpt-4.1" }, + }, + }, + }, + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + }); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1"); + }); + }); + + it("job override switches away from object default", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + cfgOverrides: { + agents: { + defaults: { + model: { primary: "openai/gpt-4.1" }, + }, + }, + }, + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "anthropic/claude-sonnet-4-5", + }, + }); + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-sonnet-4-5"); + }); + }); + }); +}); diff --git a/src/gateway/config-reload.ts b/src/gateway/config-reload.ts index 38fe786a667..3887548e51b 100644 --- a/src/gateway/config-reload.ts +++ b/src/gateway/config-reload.ts @@ -6,7 +6,7 @@ import { isPlainObject } from "../utils.js"; import { buildGatewayReloadPlan, type GatewayReloadPlan } from "./config-reload-plan.js"; export { buildGatewayReloadPlan }; -export type { GatewayReloadPlan } from "./config-reload-plan.js"; +export type { ChannelKind, GatewayReloadPlan } from "./config-reload-plan.js"; export type GatewayReloadSettings = { mode: GatewayReloadMode; diff --git a/src/gateway/method-scopes.ts b/src/gateway/method-scopes.ts index 866d8071a83..04f3b756567 100644 --- a/src/gateway/method-scopes.ts +++ b/src/gateway/method-scopes.ts @@ -63,6 +63,7 @@ const METHOD_SCOPE_GROUPS: Record = { "skills.status", "voicewake.get", "sessions.list", + "sessions.get", "sessions.preview", "sessions.resolve", "sessions.usage", diff --git a/src/gateway/server-methods.ts b/src/gateway/server-methods.ts index 53bd8625aa3..62cd6bbcd9e 100644 --- a/src/gateway/server-methods.ts +++ b/src/gateway/server-methods.ts @@ -1,3 +1,4 @@ +import { withPluginRuntimeGatewayRequestScope } from "../plugins/runtime/gateway-request-scope.js"; import { formatControlPlaneActor, resolveControlPlaneActor } from "./control-plane-audit.js"; import { consumeControlPlaneWriteBudget } from "./control-plane-rate-limit.js"; import { ADMIN_SCOPE, authorizeOperatorScopesForMethod } from "./method-scopes.js"; @@ -138,12 +139,17 @@ export async function handleGatewayRequest( ); return; } - await handler({ - req, - params: (req.params ?? {}) as Record, - client, - isWebchatConnect, - respond, - context, - }); + const invokeHandler = () => + handler({ + req, + params: (req.params ?? {}) as Record, + client, + isWebchatConnect, + respond, + context, + }); + // All handlers run inside a request scope so that plugin runtime + // subagent methods (e.g. context engine tools spawning sub-agents + // during tool execution) can dispatch back into the gateway. + await withPluginRuntimeGatewayRequestScope({ context, isWebchatConnect }, invokeHandler); } diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index 523e6655d71..8200031ae7c 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -50,6 +50,7 @@ import { type SessionsPatchResult, type SessionsPreviewEntry, type SessionsPreviewResult, + readSessionMessages, } from "../session-utils.js"; import { applySessionsPatchToStore } from "../sessions-patch.js"; import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js"; @@ -625,6 +626,28 @@ export const sessionsHandlers: GatewayRequestHandlers = { respond(true, { ok: true, key: target.canonicalKey, deleted, archived }, undefined); }, + "sessions.get": ({ params, respond }) => { + const p = params; + const key = requireSessionKey(p.key ?? p.sessionKey, respond); + if (!key) { + return; + } + const limit = + typeof p.limit === "number" && Number.isFinite(p.limit) + ? Math.max(1, Math.floor(p.limit)) + : 200; + + const { target, storePath } = resolveGatewaySessionTargetFromKey(key); + const store = loadSessionStore(storePath); + const entry = target.storeKeys.map((k) => store[k]).find(Boolean); + if (!entry?.sessionId) { + respond(true, { messages: [] }, undefined); + return; + } + const allMessages = readSessionMessages(entry.sessionId, storePath, entry.sessionFile); + const messages = limit < allMessages.length ? allMessages.slice(-limit) : allMessages; + respond(true, { messages }, undefined); + }, "sessions.compact": async ({ params, respond }) => { if (!assertValidParams(params, validateSessionsCompactParams, "sessions.compact", respond)) { return; diff --git a/src/gateway/server-plugins.test.ts b/src/gateway/server-plugins.test.ts index 4f2a4c84059..38f13cf6ac3 100644 --- a/src/gateway/server-plugins.test.ts +++ b/src/gateway/server-plugins.test.ts @@ -1,14 +1,25 @@ -import { describe, expect, test, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; import type { PluginRegistry } from "../plugins/registry.js"; +import type { PluginRuntime } from "../plugins/runtime/types.js"; import type { PluginDiagnostic } from "../plugins/types.js"; -import { loadGatewayPlugins } from "./server-plugins.js"; +import type { GatewayRequestContext, GatewayRequestOptions } from "./server-methods/types.js"; const loadOpenClawPlugins = vi.hoisted(() => vi.fn()); +type HandleGatewayRequestOptions = GatewayRequestOptions & { + extraHandlers?: Record; +}; +const handleGatewayRequest = vi.hoisted(() => + vi.fn(async (_opts: HandleGatewayRequestOptions) => {}), +); vi.mock("../plugins/loader.js", () => ({ loadOpenClawPlugins, })); +vi.mock("./server-methods.js", () => ({ + handleGatewayRequest, +})); + const createRegistry = (diagnostics: PluginDiagnostic[]): PluginRegistry => ({ plugins: [], tools: [], @@ -24,8 +35,75 @@ const createRegistry = (diagnostics: PluginDiagnostic[]): PluginRegistry => ({ diagnostics, }); +type ServerPluginsModule = typeof import("./server-plugins.js"); + +function createTestContext(label: string): GatewayRequestContext { + return { label } as unknown as GatewayRequestContext; +} + +function getLastDispatchedContext(): GatewayRequestContext | undefined { + const call = handleGatewayRequest.mock.calls.at(-1)?.[0]; + return call?.context; +} + +async function importServerPluginsModule(): Promise { + return import("./server-plugins.js"); +} + +function createSubagentRuntime(serverPlugins: ServerPluginsModule): PluginRuntime["subagent"] { + const log = { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }; + loadOpenClawPlugins.mockReturnValue(createRegistry([])); + serverPlugins.loadGatewayPlugins({ + cfg: {}, + workspaceDir: "/tmp", + log, + coreGatewayHandlers: {}, + baseMethods: [], + }); + const call = loadOpenClawPlugins.mock.calls.at(-1)?.[0] as + | { runtimeOptions?: { subagent?: PluginRuntime["subagent"] } } + | undefined; + if (!call?.runtimeOptions?.subagent) { + throw new Error("Expected loadGatewayPlugins to provide subagent runtime"); + } + return call.runtimeOptions.subagent; +} + +beforeEach(() => { + loadOpenClawPlugins.mockReset(); + handleGatewayRequest.mockReset(); + handleGatewayRequest.mockImplementation(async (opts: HandleGatewayRequestOptions) => { + switch (opts.req.method) { + case "agent": + opts.respond(true, { runId: "run-1" }); + return; + case "agent.wait": + opts.respond(true, { status: "ok" }); + return; + case "sessions.get": + opts.respond(true, { messages: [] }); + return; + case "sessions.delete": + opts.respond(true, {}); + return; + default: + opts.respond(true, {}); + } + }); +}); + +afterEach(() => { + vi.resetModules(); +}); + describe("loadGatewayPlugins", () => { - test("logs plugin errors with details", () => { + test("logs plugin errors with details", async () => { + const { loadGatewayPlugins } = await importServerPluginsModule(); const diagnostics: PluginDiagnostic[] = [ { level: "error", @@ -56,4 +134,79 @@ describe("loadGatewayPlugins", () => { ); expect(log.warn).not.toHaveBeenCalled(); }); + + test("provides subagent runtime with sessions.get method aliases", async () => { + const { loadGatewayPlugins } = await importServerPluginsModule(); + loadOpenClawPlugins.mockReturnValue(createRegistry([])); + + const log = { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }; + + loadGatewayPlugins({ + cfg: {}, + workspaceDir: "/tmp", + log, + coreGatewayHandlers: {}, + baseMethods: [], + }); + + const call = loadOpenClawPlugins.mock.calls.at(-1)?.[0]; + const subagent = call?.runtimeOptions?.subagent; + expect(typeof subagent?.getSessionMessages).toBe("function"); + expect(typeof subagent?.getSession).toBe("function"); + }); + + test("shares fallback context across module reloads for existing runtimes", async () => { + const first = await importServerPluginsModule(); + const runtime = createSubagentRuntime(first); + + const staleContext = createTestContext("stale"); + first.setFallbackGatewayContext(staleContext); + await runtime.run({ sessionKey: "s-1", message: "hello" }); + expect(getLastDispatchedContext()).toBe(staleContext); + + vi.resetModules(); + const reloaded = await importServerPluginsModule(); + const freshContext = createTestContext("fresh"); + reloaded.setFallbackGatewayContext(freshContext); + + await runtime.run({ sessionKey: "s-1", message: "hello again" }); + expect(getLastDispatchedContext()).toBe(freshContext); + }); + + test("uses updated fallback context after context replacement", async () => { + const serverPlugins = await importServerPluginsModule(); + const runtime = createSubagentRuntime(serverPlugins); + const firstContext = createTestContext("before-restart"); + const secondContext = createTestContext("after-restart"); + + serverPlugins.setFallbackGatewayContext(firstContext); + await runtime.run({ sessionKey: "s-2", message: "before restart" }); + expect(getLastDispatchedContext()).toBe(firstContext); + + serverPlugins.setFallbackGatewayContext(secondContext); + await runtime.run({ sessionKey: "s-2", message: "after restart" }); + expect(getLastDispatchedContext()).toBe(secondContext); + }); + + test("reflects fallback context object mutation at dispatch time", async () => { + const serverPlugins = await importServerPluginsModule(); + const runtime = createSubagentRuntime(serverPlugins); + const context = { marker: "before-mutation" } as GatewayRequestContext & { + marker: string; + }; + + serverPlugins.setFallbackGatewayContext(context); + context.marker = "after-mutation"; + + await runtime.run({ sessionKey: "s-3", message: "mutated context" }); + const dispatched = getLastDispatchedContext() as + | (GatewayRequestContext & { marker: string }) + | undefined; + expect(dispatched?.marker).toBe("after-mutation"); + }); }); diff --git a/src/gateway/server-plugins.ts b/src/gateway/server-plugins.ts index e879310c304..dde23f703a6 100644 --- a/src/gateway/server-plugins.ts +++ b/src/gateway/server-plugins.ts @@ -1,6 +1,165 @@ +import { randomUUID } from "node:crypto"; import type { loadConfig } from "../config/config.js"; import { loadOpenClawPlugins } from "../plugins/loader.js"; -import type { GatewayRequestHandler } from "./server-methods/types.js"; +import { getPluginRuntimeGatewayRequestScope } from "../plugins/runtime/gateway-request-scope.js"; +import type { PluginRuntime } from "../plugins/runtime/types.js"; +import { GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "./protocol/client-info.js"; +import type { ErrorShape } from "./protocol/index.js"; +import { PROTOCOL_VERSION } from "./protocol/index.js"; +import { handleGatewayRequest } from "./server-methods.js"; +import type { + GatewayRequestContext, + GatewayRequestHandler, + GatewayRequestOptions, +} from "./server-methods/types.js"; + +// ── Fallback gateway context for non-WS paths (Telegram, WhatsApp, etc.) ── +// The WS path sets a per-request scope via AsyncLocalStorage, but channel +// adapters (Telegram polling, etc.) invoke the agent directly without going +// through handleGatewayRequest. We store the gateway context at startup so +// dispatchGatewayMethod can use it as a fallback. + +const FALLBACK_GATEWAY_CONTEXT_STATE_KEY: unique symbol = Symbol.for( + "openclaw.fallbackGatewayContextState", +); + +type FallbackGatewayContextState = { + context: GatewayRequestContext | undefined; +}; + +const fallbackGatewayContextState = (() => { + const globalState = globalThis as typeof globalThis & { + [FALLBACK_GATEWAY_CONTEXT_STATE_KEY]?: FallbackGatewayContextState; + }; + const existing = globalState[FALLBACK_GATEWAY_CONTEXT_STATE_KEY]; + if (existing) { + return existing; + } + const created: FallbackGatewayContextState = { context: undefined }; + globalState[FALLBACK_GATEWAY_CONTEXT_STATE_KEY] = created; + return created; +})(); + +export function setFallbackGatewayContext(ctx: GatewayRequestContext): void { + // TODO: This startup snapshot can become stale if runtime config/context changes. + fallbackGatewayContextState.context = ctx; +} + +// ── Internal gateway dispatch for plugin runtime ──────────────────── + +function createSyntheticOperatorClient(): GatewayRequestOptions["client"] { + return { + connect: { + minProtocol: PROTOCOL_VERSION, + maxProtocol: PROTOCOL_VERSION, + client: { + id: GATEWAY_CLIENT_IDS.GATEWAY_CLIENT, + version: "internal", + platform: "node", + mode: GATEWAY_CLIENT_MODES.BACKEND, + }, + role: "operator", + scopes: ["operator.admin", "operator.approvals", "operator.pairing"], + }, + }; +} + +async function dispatchGatewayMethod( + method: string, + params: Record, +): Promise { + const scope = getPluginRuntimeGatewayRequestScope(); + const context = scope?.context ?? fallbackGatewayContextState.context; + const isWebchatConnect = scope?.isWebchatConnect ?? (() => false); + if (!context) { + throw new Error( + `Plugin subagent dispatch requires a gateway request scope (method: ${method}). No scope set and no fallback context available.`, + ); + } + + let result: { ok: boolean; payload?: unknown; error?: ErrorShape } | undefined; + await handleGatewayRequest({ + req: { + type: "req", + id: `plugin-subagent-${randomUUID()}`, + method, + params, + }, + client: createSyntheticOperatorClient(), + isWebchatConnect, + respond: (ok, payload, error) => { + if (!result) { + result = { ok, payload, error }; + } + }, + context, + }); + + if (!result) { + throw new Error(`Gateway method "${method}" completed without a response.`); + } + if (!result.ok) { + throw new Error(result.error?.message ?? `Gateway method "${method}" failed.`); + } + return result.payload as T; +} + +function createGatewaySubagentRuntime(): PluginRuntime["subagent"] { + const getSessionMessages: PluginRuntime["subagent"]["getSessionMessages"] = async (params) => { + const payload = await dispatchGatewayMethod<{ messages?: unknown[] }>("sessions.get", { + key: params.sessionKey, + ...(params.limit != null && { limit: params.limit }), + }); + return { messages: Array.isArray(payload?.messages) ? payload.messages : [] }; + }; + + return { + async run(params) { + const payload = await dispatchGatewayMethod<{ runId?: string }>("agent", { + sessionKey: params.sessionKey, + message: params.message, + deliver: params.deliver ?? false, + ...(params.extraSystemPrompt && { extraSystemPrompt: params.extraSystemPrompt }), + ...(params.lane && { lane: params.lane }), + ...(params.idempotencyKey && { idempotencyKey: params.idempotencyKey }), + }); + const runId = payload?.runId; + if (typeof runId !== "string" || !runId) { + throw new Error("Gateway agent method returned an invalid runId."); + } + return { runId }; + }, + async waitForRun(params) { + const payload = await dispatchGatewayMethod<{ status?: string; error?: string }>( + "agent.wait", + { + runId: params.runId, + ...(params.timeoutMs != null && { timeoutMs: params.timeoutMs }), + }, + ); + const status = payload?.status; + if (status !== "ok" && status !== "error" && status !== "timeout") { + throw new Error(`Gateway agent.wait returned unexpected status: ${status}`); + } + return { + status, + ...(typeof payload?.error === "string" && payload.error && { error: payload.error }), + }; + }, + getSessionMessages, + async getSession(params) { + return getSessionMessages(params); + }, + async deleteSession(params) { + await dispatchGatewayMethod("sessions.delete", { + key: params.sessionKey, + deleteTranscript: params.deleteTranscript ?? true, + }); + }, + }; +} + +// ── Plugin loading ────────────────────────────────────────────────── export function loadGatewayPlugins(params: { cfg: ReturnType; @@ -24,6 +183,9 @@ export function loadGatewayPlugins(params: { debug: (msg) => params.log.debug(msg), }, coreGatewayHandlers: params.coreGatewayHandlers, + runtimeOptions: { + subagent: createGatewaySubagentRuntime(), + }, }); const pluginMethods = Object.keys(pluginRegistry.gatewayHandlers); const gatewayMethods = Array.from(new Set([...params.baseMethods, ...pluginMethods])); diff --git a/src/gateway/server.agent.gateway-server-agent.mocks.ts b/src/gateway/server.agent.gateway-server-agent.mocks.ts index b930ccbc67f..c3a33eca9ad 100644 --- a/src/gateway/server.agent.gateway-server-agent.mocks.ts +++ b/src/gateway/server.agent.gateway-server-agent.mocks.ts @@ -1,9 +1,23 @@ import { vi } from "vitest"; -import { createEmptyPluginRegistry, type PluginRegistry } from "../plugins/registry.js"; +import type { PluginRegistry } from "../plugins/registry.js"; import { setActivePluginRegistry } from "../plugins/runtime.js"; export const registryState: { registry: PluginRegistry } = { - registry: createEmptyPluginRegistry(), + registry: { + plugins: [], + tools: [], + hooks: [], + typedHooks: [], + channels: [], + providers: [], + gatewayHandlers: {}, + httpHandlers: [], + httpRoutes: [], + cliRegistrars: [], + services: [], + commands: [], + diagnostics: [], + } as PluginRegistry, }; export function setRegistry(registry: PluginRegistry) { @@ -21,5 +35,7 @@ vi.mock("./server-plugins.js", async () => { gatewayMethods: params.baseMethods ?? [], }; }, + // server.impl.ts sets a fallback context before dispatch; tests only need the symbol to exist. + setFallbackGatewayContext: vi.fn(), }; }); diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 2e816c67dce..efb95e7a7cf 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -89,7 +89,7 @@ import { createSecretsHandlers } from "./server-methods/secrets.js"; import { hasConnectedMobileNode } from "./server-mobile-nodes.js"; import { loadGatewayModelCatalog } from "./server-model-catalog.js"; import { createNodeSubscriptionManager } from "./server-node-subscriptions.js"; -import { loadGatewayPlugins } from "./server-plugins.js"; +import { loadGatewayPlugins, setFallbackGatewayContext } from "./server-plugins.js"; import { createGatewayReloadHandlers } from "./server-reload-handlers.js"; import { resolveGatewayRuntimeConfig } from "./server-runtime-config.js"; import { createGatewayRuntimeState } from "./server-runtime-state.js"; @@ -779,6 +779,63 @@ export async function startGatewayServer( const canvasHostServerPort = (canvasHostServer as CanvasHostServer | null)?.port; + const gatewayRequestContext: import("./server-methods/types.js").GatewayRequestContext = { + deps, + cron, + cronStorePath, + execApprovalManager, + loadGatewayModelCatalog, + getHealthCache, + refreshHealthSnapshot: refreshGatewayHealthSnapshot, + logHealth, + logGateway: log, + incrementPresenceVersion, + getHealthVersion, + broadcast, + broadcastToConnIds, + nodeSendToSession, + nodeSendToAllSubscribed, + nodeSubscribe, + nodeUnsubscribe, + nodeUnsubscribeAll, + hasConnectedMobileNode: hasMobileNodeConnected, + hasExecApprovalClients: () => { + for (const gatewayClient of clients) { + const scopes = Array.isArray(gatewayClient.connect.scopes) + ? gatewayClient.connect.scopes + : []; + if (scopes.includes("operator.admin") || scopes.includes("operator.approvals")) { + return true; + } + } + return false; + }, + nodeRegistry, + agentRunSeq, + chatAbortControllers, + chatAbortedRuns: chatRunState.abortedRuns, + chatRunBuffers: chatRunState.buffers, + chatDeltaSentAt: chatRunState.deltaSentAt, + addChatRun, + removeChatRun, + registerToolEventRecipient: toolEventRecipients.add, + dedupe, + wizardSessions, + findRunningWizard, + purgeWizardSession, + getRuntimeSnapshot, + startChannel, + stopChannel, + markChannelLoggedOut, + wizardRunner, + broadcastVoiceWakeChanged, + }; + + // Store the gateway context as a fallback for plugin subagent dispatch + // in non-WS paths (Telegram polling, WhatsApp, etc.) where no per-request + // scope is set via AsyncLocalStorage. + setFallbackGatewayContext(gatewayRequestContext); + attachGatewayWsHandlers({ wss, clients, @@ -800,57 +857,7 @@ export async function startGatewayServer( ...secretsHandlers, }, broadcast, - context: { - deps, - cron, - cronStorePath, - execApprovalManager, - loadGatewayModelCatalog, - getHealthCache, - refreshHealthSnapshot: refreshGatewayHealthSnapshot, - logHealth, - logGateway: log, - incrementPresenceVersion, - getHealthVersion, - broadcast, - broadcastToConnIds, - nodeSendToSession, - nodeSendToAllSubscribed, - nodeSubscribe, - nodeUnsubscribe, - nodeUnsubscribeAll, - hasConnectedMobileNode: hasMobileNodeConnected, - hasExecApprovalClients: () => { - for (const gatewayClient of clients) { - const scopes = Array.isArray(gatewayClient.connect.scopes) - ? gatewayClient.connect.scopes - : []; - if (scopes.includes("operator.admin") || scopes.includes("operator.approvals")) { - return true; - } - } - return false; - }, - nodeRegistry, - agentRunSeq, - chatAbortControllers, - chatAbortedRuns: chatRunState.abortedRuns, - chatRunBuffers: chatRunState.buffers, - chatDeltaSentAt: chatRunState.deltaSentAt, - addChatRun, - removeChatRun, - registerToolEventRecipient: toolEventRecipients.add, - dedupe, - wizardSessions, - findRunningWizard, - purgeWizardSession, - getRuntimeSnapshot, - startChannel, - stopChannel, - markChannelLoggedOut, - wizardRunner, - broadcastVoiceWakeChanged, - }, + context: gatewayRequestContext, }); logGatewayStartup({ cfg: cfgAtStart, diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index 2b8fc8e7a63..07b0846cddb 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -109,7 +109,19 @@ export type { GatewayRequestHandlerOptions, RespondFn, } from "../gateway/server-methods/types.js"; -export type { PluginRuntime, RuntimeLogger } from "../plugins/runtime/types.js"; +export type { + PluginRuntime, + RuntimeLogger, + SubagentRunParams, + SubagentRunResult, + SubagentWaitParams, + SubagentWaitResult, + SubagentGetSessionMessagesParams, + SubagentGetSessionMessagesResult, + SubagentGetSessionParams, + SubagentGetSessionResult, + SubagentDeleteSessionParams, +} from "../plugins/runtime/types.js"; export { normalizePluginHttpPath } from "../plugins/http-path.js"; export { registerPluginHttpRoute } from "../plugins/http-registry.js"; export { emptyPluginConfigSchema } from "../plugins/config-schema.js"; @@ -695,5 +707,20 @@ export type { ProcessedLineMessage } from "../line/markdown-to-line.js"; // Media utilities export { loadWebMedia, type WebMediaResult } from "../web/media.js"; +// Context engine +export type { + ContextEngine, + ContextEngineInfo, + AssembleResult, + CompactResult, + IngestResult, + IngestBatchResult, + BootstrapResult, + SubagentSpawnPreparation, + SubagentEndReason, +} from "../context-engine/types.js"; +export { registerContextEngine } from "../context-engine/registry.js"; +export type { ContextEngineFactory } from "../context-engine/registry.js"; + // Security utilities export { redactSensitiveText } from "../logging/redact.js"; diff --git a/src/plugins/loader.ts b/src/plugins/loader.ts index 482eeead5de..15051b25e81 100644 --- a/src/plugins/loader.ts +++ b/src/plugins/loader.ts @@ -21,7 +21,7 @@ import { loadPluginManifestRegistry } from "./manifest-registry.js"; import { isPathInside, safeStatSync } from "./path-safety.js"; import { createPluginRegistry, type PluginRecord, type PluginRegistry } from "./registry.js"; import { setActivePluginRegistry } from "./runtime.js"; -import { createPluginRuntime } from "./runtime/index.js"; +import { createPluginRuntime, type CreatePluginRuntimeOptions } from "./runtime/index.js"; import type { PluginRuntime } from "./runtime/types.js"; import { validateJsonSchemaValue } from "./schema-validator.js"; import type { @@ -38,6 +38,7 @@ export type PluginLoadOptions = { workspaceDir?: string; logger?: PluginLogger; coreGatewayHandlers?: Record; + runtimeOptions?: CreatePluginRuntimeOptions; cache?: boolean; mode?: "full" | "validate"; }; @@ -503,7 +504,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi // not eagerly load every channel runtime dependency. let resolvedRuntime: PluginRuntime | null = null; const resolveRuntime = (): PluginRuntime => { - resolvedRuntime ??= createPluginRuntime(); + resolvedRuntime ??= createPluginRuntime(options.runtimeOptions); return resolvedRuntime; }; const runtime = new Proxy({} as PluginRuntime, { diff --git a/src/plugins/registry.ts b/src/plugins/registry.ts index fde8d0e6a6d..9fc797ab235 100644 --- a/src/plugins/registry.ts +++ b/src/plugins/registry.ts @@ -2,6 +2,7 @@ import path from "node:path"; import type { AnyAgentTool } from "../agents/tools/common.js"; import type { ChannelDock } from "../channels/dock.js"; import type { ChannelPlugin } from "../channels/plugins/types.js"; +import { registerContextEngine } from "../context-engine/registry.js"; import type { GatewayRequestHandler, GatewayRequestHandlers, @@ -582,6 +583,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { registerCli: (registrar, opts) => registerCli(record, registrar, opts), registerService: (service) => registerService(record, service), registerCommand: (command) => registerCommand(record, command), + registerContextEngine: (id, factory) => registerContextEngine(id, factory), resolvePath: (input: string) => resolveUserPath(input), on: (hookName, handler, opts) => registerTypedHook(record, hookName, handler, opts, params.hookPolicy), diff --git a/src/plugins/runtime/gateway-request-scope.test.ts b/src/plugins/runtime/gateway-request-scope.test.ts new file mode 100644 index 00000000000..ef31350e2a3 --- /dev/null +++ b/src/plugins/runtime/gateway-request-scope.test.ts @@ -0,0 +1,23 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { PluginRuntimeGatewayRequestScope } from "./gateway-request-scope.js"; + +const TEST_SCOPE: PluginRuntimeGatewayRequestScope = { + context: {} as PluginRuntimeGatewayRequestScope["context"], + isWebchatConnect: (() => false) as PluginRuntimeGatewayRequestScope["isWebchatConnect"], +}; + +afterEach(() => { + vi.resetModules(); +}); + +describe("gateway request scope", () => { + it("reuses AsyncLocalStorage across reloaded module instances", async () => { + const first = await import("./gateway-request-scope.js"); + + await first.withPluginRuntimeGatewayRequestScope(TEST_SCOPE, async () => { + vi.resetModules(); + const second = await import("./gateway-request-scope.js"); + expect(second.getPluginRuntimeGatewayRequestScope()).toEqual(TEST_SCOPE); + }); + }); +}); diff --git a/src/plugins/runtime/gateway-request-scope.ts b/src/plugins/runtime/gateway-request-scope.ts new file mode 100644 index 00000000000..11ed9cb4980 --- /dev/null +++ b/src/plugins/runtime/gateway-request-scope.ts @@ -0,0 +1,46 @@ +import { AsyncLocalStorage } from "node:async_hooks"; +import type { + GatewayRequestContext, + GatewayRequestOptions, +} from "../../gateway/server-methods/types.js"; + +export type PluginRuntimeGatewayRequestScope = { + context: GatewayRequestContext; + isWebchatConnect: GatewayRequestOptions["isWebchatConnect"]; +}; + +const PLUGIN_RUNTIME_GATEWAY_REQUEST_SCOPE_KEY: unique symbol = Symbol.for( + "openclaw.pluginRuntimeGatewayRequestScope", +); + +const pluginRuntimeGatewayRequestScope = (() => { + const globalState = globalThis as typeof globalThis & { + [PLUGIN_RUNTIME_GATEWAY_REQUEST_SCOPE_KEY]?: AsyncLocalStorage; + }; + const existing = globalState[PLUGIN_RUNTIME_GATEWAY_REQUEST_SCOPE_KEY]; + if (existing) { + return existing; + } + const created = new AsyncLocalStorage(); + globalState[PLUGIN_RUNTIME_GATEWAY_REQUEST_SCOPE_KEY] = created; + return created; +})(); + +/** + * Runs plugin gateway handlers with request-scoped context that runtime helpers can read. + */ +export function withPluginRuntimeGatewayRequestScope( + scope: PluginRuntimeGatewayRequestScope, + run: () => T, +): T { + return pluginRuntimeGatewayRequestScope.run(scope, run); +} + +/** + * Returns the current plugin gateway request scope when called from a plugin request handler. + */ +export function getPluginRuntimeGatewayRequestScope(): + | PluginRuntimeGatewayRequestScope + | undefined { + return pluginRuntimeGatewayRequestScope.getStore(); +} diff --git a/src/plugins/runtime/index.ts b/src/plugins/runtime/index.ts index 3db2f68ad92..68b672db1b4 100644 --- a/src/plugins/runtime/index.ts +++ b/src/plugins/runtime/index.ts @@ -28,10 +28,28 @@ function resolveVersion(): string { } } -export function createPluginRuntime(): PluginRuntime { +function createUnavailableSubagentRuntime(): PluginRuntime["subagent"] { + const unavailable = () => { + throw new Error("Plugin runtime subagent methods are only available during a gateway request."); + }; + return { + run: unavailable, + waitForRun: unavailable, + getSessionMessages: unavailable, + getSession: unavailable, + deleteSession: unavailable, + }; +} + +export type CreatePluginRuntimeOptions = { + subagent?: PluginRuntime["subagent"]; +}; + +export function createPluginRuntime(_options: CreatePluginRuntimeOptions = {}): PluginRuntime { const runtime = { version: resolveVersion(), config: createRuntimeConfig(), + subagent: _options.subagent ?? createUnavailableSubagentRuntime(), system: createRuntimeSystem(), media: createRuntimeMedia(), tts: { textToSpeechTelephony }, diff --git a/src/plugins/runtime/types.ts b/src/plugins/runtime/types.ts index 275bb7cba9a..245e8dd1274 100644 --- a/src/plugins/runtime/types.ts +++ b/src/plugins/runtime/types.ts @@ -3,6 +3,61 @@ import type { PluginRuntimeCore, RuntimeLogger } from "./types-core.js"; export type { RuntimeLogger }; +// ── Subagent runtime types ────────────────────────────────────────── + +export type SubagentRunParams = { + sessionKey: string; + message: string; + extraSystemPrompt?: string; + lane?: string; + deliver?: boolean; + idempotencyKey?: string; +}; + +export type SubagentRunResult = { + runId: string; +}; + +export type SubagentWaitParams = { + runId: string; + timeoutMs?: number; +}; + +export type SubagentWaitResult = { + status: "ok" | "error" | "timeout"; + error?: string; +}; + +export type SubagentGetSessionMessagesParams = { + sessionKey: string; + limit?: number; +}; + +export type SubagentGetSessionMessagesResult = { + messages: unknown[]; +}; + +/** @deprecated Use SubagentGetSessionMessagesParams. */ +export type SubagentGetSessionParams = SubagentGetSessionMessagesParams; + +/** @deprecated Use SubagentGetSessionMessagesResult. */ +export type SubagentGetSessionResult = SubagentGetSessionMessagesResult; + +export type SubagentDeleteSessionParams = { + sessionKey: string; + deleteTranscript?: boolean; +}; + export type PluginRuntime = PluginRuntimeCore & { + subagent: { + run: (params: SubagentRunParams) => Promise; + waitForRun: (params: SubagentWaitParams) => Promise; + getSessionMessages: ( + params: SubagentGetSessionMessagesParams, + ) => Promise; + /** @deprecated Use getSessionMessages. */ + getSession: (params: SubagentGetSessionParams) => Promise; + deleteSession: (params: SubagentDeleteSessionParams) => Promise; + }; channel: PluginRuntimeChannel; }; diff --git a/src/plugins/slots.ts b/src/plugins/slots.ts index 8fee7172a2e..bcbbdd44a03 100644 --- a/src/plugins/slots.ts +++ b/src/plugins/slots.ts @@ -11,10 +11,12 @@ type SlotPluginRecord = { const SLOT_BY_KIND: Record = { memory: "memory", + "context-engine": "contextEngine", }; const DEFAULT_SLOT_BY_KEY: Record = { memory: "memory-core", + contextEngine: "legacy", }; export function slotKeyForPluginKind(kind?: PluginKind): PluginSlotKey | null { diff --git a/src/plugins/types.ts b/src/plugins/types.ts index 1cb2779e8c2..32f8a545038 100644 --- a/src/plugins/types.ts +++ b/src/plugins/types.ts @@ -35,7 +35,7 @@ export type PluginConfigUiHint = { placeholder?: string; }; -export type PluginKind = "memory"; +export type PluginKind = "memory" | "context-engine"; export type PluginConfigValidation = | { ok: true; value?: unknown } @@ -285,6 +285,11 @@ export type OpenClawPluginApi = { * Use this for simple state-toggling or status commands that don't need AI reasoning. */ registerCommand: (command: OpenClawPluginCommandDefinition) => void; + /** Register a context engine implementation (exclusive slot — only one active at a time). */ + registerContextEngine: ( + id: string, + factory: import("../context-engine/registry.js").ContextEngineFactory, + ) => void; resolvePath: (input: string) => string; /** Register a lifecycle hook handler */ on: ( diff --git a/test/scripts/ios-team-id.test.ts b/test/scripts/ios-team-id.test.ts index f2a9037f020..2496073951c 100644 --- a/test/scripts/ios-team-id.test.ts +++ b/test/scripts/ios-team-id.test.ts @@ -96,7 +96,7 @@ function runScript( const binDir = path.join(homeDir, "bin"); const env = { HOME: homeDir, - PATH: `${binDir}:${sharedBinDir}:${BASE_PATH}`, + PATH: `${binDir}${path.delimiter}${sharedBinDir}${path.delimiter}${BASE_PATH}`, LANG: BASE_LANG, ...extraEnv, };