diff --git a/CHANGELOG.md b/CHANGELOG.md index 5cb86a8bd78..0621c2389de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai - Tools/Diffs guidance: restore a short system-prompt hint for enabled diffs while keeping the detailed instructions in the companion skill, so diffs usage guidance stays out of user-prompt space. (#36904) thanks @gumadeiras. - Telegram/ACP topic bindings: accept Telegram Mac Unicode dash option prefixes in `/acp spawn`, support Telegram topic thread binding (`--thread here|auto`), route bound-topic follow-ups to ACP sessions, add actionable Telegram approval buttons with prefixed approval-id resolution, and pin successful bind confirmations in-topic. (#36683) Thanks @huntharo. - Hooks/Compaction lifecycle: emit `session:compact:before` and `session:compact:after` internal events plus plugin compaction callbacks with session/count metadata, so automations can react to compaction runs consistently. (#16788) thanks @vincentkoc. +- Agents/context engine plugin interface: add `ContextEngine` plugin slot with full lifecycle hooks (`bootstrap`, `ingest`, `assemble`, `compact`, `afterTurn`, `prepareSubagentSpawn`, `onSubagentEnded`), slot-based registry with config-driven resolution, `LegacyContextEngine` wrapper preserving existing compaction behavior, scoped subagent runtime for plugin runtimes via `AsyncLocalStorage`, and `sessions.get` gateway method. Enables plugins like `lossless-claw` to provide alternative context management strategies without modifying core compaction logic. Zero behavior change when no context engine plugin is configured. (#22201) thanks @jalehman. - CLI: make read-only SecretRef status flows degrade safely (#37023) thanks @joshavant. ### Breaking diff --git a/extensions/diffs/index.test.ts b/extensions/diffs/index.test.ts index 84ce5d9fe87..1723fc3c73d 100644 --- a/extensions/diffs/index.test.ts +++ b/extensions/diffs/index.test.ts @@ -30,6 +30,7 @@ describe("diffs plugin registration", () => { registerService() {}, registerProvider() {}, registerCommand() {}, + registerContextEngine() {}, resolvePath(input: string) { return input; }, @@ -105,6 +106,7 @@ describe("diffs plugin registration", () => { registerService() {}, registerProvider() {}, registerCommand() {}, + registerContextEngine() {}, resolvePath(input: string) { return input; }, diff --git a/extensions/diffs/src/tool.test.ts b/extensions/diffs/src/tool.test.ts index db66255cba6..ba72c011c76 100644 --- a/extensions/diffs/src/tool.test.ts +++ b/extensions/diffs/src/tool.test.ts @@ -441,6 +441,7 @@ function createApi(): OpenClawPluginApi { registerService() {}, registerProvider() {}, registerCommand() {}, + registerContextEngine() {}, resolvePath(input: string) { return input; }, diff --git a/extensions/lobster/src/lobster-tool.test.ts b/extensions/lobster/src/lobster-tool.test.ts index 970c2ad4fd1..40e9a0b64e8 100644 --- a/extensions/lobster/src/lobster-tool.test.ts +++ b/extensions/lobster/src/lobster-tool.test.ts @@ -46,6 +46,7 @@ function fakeApi(overrides: Partial = {}): OpenClawPluginApi registerHook() {}, registerHttpRoute() {}, registerCommand() {}, + registerContextEngine() {}, on() {}, resolvePath: (p) => p, ...overrides, diff --git a/extensions/phone-control/index.test.ts b/extensions/phone-control/index.test.ts index a4d05e3d431..9259092b153 100644 --- a/extensions/phone-control/index.test.ts +++ b/extensions/phone-control/index.test.ts @@ -39,6 +39,7 @@ function createApi(params: { registerCli() {}, registerService() {}, registerProvider() {}, + registerContextEngine() {}, registerCommand: params.registerCommand, resolvePath(input: string) { return input; diff --git a/extensions/test-utils/plugin-runtime-mock.ts b/extensions/test-utils/plugin-runtime-mock.ts index f01c87d6c77..0526c6bf591 100644 --- a/extensions/test-utils/plugin-runtime-mock.ts +++ b/extensions/test-utils/plugin-runtime-mock.ts @@ -242,6 +242,13 @@ export function createPluginRuntimeMock(overrides: DeepPartial = state: { resolveStateDir: vi.fn(() => "/tmp/openclaw"), }, + subagent: { + run: vi.fn(), + waitForRun: vi.fn(), + getSessionMessages: vi.fn(), + getSession: vi.fn(), + deleteSession: vi.fn(), + }, }; return mergeDeep(base, overrides); diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 2bfc9e0a5ce..335c3a0e7d9 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -11,6 +11,10 @@ import { resolveHeartbeatPrompt } from "../../auto-reply/heartbeat.js"; import type { ReasoningLevel, ThinkLevel } from "../../auto-reply/thinking.js"; import { resolveChannelCapabilities } from "../../config/channel-capabilities.js"; import type { OpenClawConfig } from "../../config/config.js"; +import { + ensureContextEnginesInitialized, + resolveContextEngine, +} from "../../context-engine/index.js"; import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; import { getMachineDisplayName } from "../../infra/machine-name.js"; import { generateSecureToken } from "../../infra/secure-random.js"; @@ -29,8 +33,9 @@ import { resolveSessionAgentIds } from "../agent-scope.js"; import type { ExecElevatedDefaults } from "../bash-tools.js"; import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../bootstrap-files.js"; import { listChannelSupportedActions, resolveChannelMessageToolHints } from "../channel-tools.js"; +import { resolveContextWindowInfo } from "../context-window-guard.js"; import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js"; -import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "../defaults.js"; +import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL, DEFAULT_PROVIDER } from "../defaults.js"; import { resolveOpenClawDocsPath } from "../docs-path.js"; import { getApiKeyForModel, resolveModelAuthMode } from "../model-auth.js"; import { ensureOpenClawModelsJson } from "../models-config.js"; @@ -115,6 +120,8 @@ export type CompactEmbeddedPiSessionParams = { reasoningLevel?: ReasoningLevel; bashElevated?: ExecElevatedDefaults; customInstructions?: string; + tokenBudget?: number; + force?: boolean; trigger?: "overflow" | "manual"; diagId?: string; attempt?: number; @@ -846,6 +853,49 @@ export async function compactEmbeddedPiSession( const enqueueGlobal = params.enqueue ?? ((task, opts) => enqueueCommandInLane(globalLane, task, opts)); return enqueueCommandInLane(sessionLane, () => - enqueueGlobal(async () => compactEmbeddedPiSessionDirect(params)), + enqueueGlobal(async () => { + ensureContextEnginesInitialized(); + const contextEngine = await resolveContextEngine(params.config); + try { + // Resolve token budget from model context window so the context engine + // knows the compaction target. The runner's afterTurn path passes this + // automatically, but the /compact command path needs to compute it here. + const ceProvider = (params.provider ?? DEFAULT_PROVIDER).trim() || DEFAULT_PROVIDER; + const ceModelId = (params.model ?? DEFAULT_MODEL).trim() || DEFAULT_MODEL; + const agentDir = params.agentDir ?? resolveOpenClawAgentDir(); + const { model: ceModel } = resolveModel(ceProvider, ceModelId, agentDir, params.config); + const ceCtxInfo = resolveContextWindowInfo({ + cfg: params.config, + provider: ceProvider, + modelId: ceModelId, + modelContextWindow: ceModel?.contextWindow, + defaultTokens: DEFAULT_CONTEXT_TOKENS, + }); + const result = await contextEngine.compact({ + sessionId: params.sessionId, + sessionFile: params.sessionFile, + tokenBudget: ceCtxInfo.tokens, + customInstructions: params.customInstructions, + force: params.trigger === "manual", + legacyParams: params as Record, + }); + return { + ok: result.ok, + compacted: result.compacted, + reason: result.reason, + result: result.result + ? { + summary: result.result.summary ?? "", + firstKeptEntryId: result.result.firstKeptEntryId ?? "", + tokensBefore: result.result.tokensBefore, + tokensAfter: result.result.tokensAfter, + details: result.result.details, + } + : undefined, + }; + } finally { + await contextEngine.dispose?.(); + } + }), ); } diff --git a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts index 1f8f8032f7e..19b4a81d279 100644 --- a/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts +++ b/src/agents/pi-embedded-runner/run.overflow-compaction.test.ts @@ -54,6 +54,22 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => { ); }); + it("passes resolved auth profile into run attempts for context-engine afterTurn propagation", async () => { + mockedRunEmbeddedAttempt.mockResolvedValueOnce(makeAttemptResult({ promptError: null })); + + await runEmbeddedPiAgent({ + ...overflowBaseRunParams, + runId: "run-auth-profile-passthrough", + }); + + expect(mockedRunEmbeddedAttempt).toHaveBeenCalledWith( + expect.objectContaining({ + authProfileId: "test-profile", + authProfileIdSource: "auto", + }), + ); + }); + it("passes trigger=overflow when retrying compaction after context overflow", async () => { mockOverflowRetrySuccess({ runEmbeddedAttempt: mockedRunEmbeddedAttempt, diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index a5b799471d2..52faf8514b7 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -1,6 +1,10 @@ import { randomBytes } from "node:crypto"; import fs from "node:fs/promises"; import type { ThinkLevel } from "../../auto-reply/thinking.js"; +import { + ensureContextEnginesInitialized, + resolveContextEngine, +} from "../../context-engine/index.js"; import { generateSecureToken } from "../../infra/secure-random.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; import type { PluginHookBeforeAgentStartResult } from "../../plugins/types.js"; @@ -50,7 +54,6 @@ import { } from "../pi-embedded-helpers.js"; import { derivePromptTokens, normalizeUsage, type UsageLike } from "../usage.js"; import { redactRunIdentifier, resolveRunWorkspaceDir } from "../workspace-run.js"; -import { compactEmbeddedPiSessionDirect } from "./compact.js"; import { resolveGlobalLane, resolveSessionLane } from "./lanes.js"; import { log } from "./logger.js"; import { resolveModel } from "./model.js"; @@ -737,6 +740,10 @@ export async function runEmbeddedPiAgent( agentDir, }); }; + // Resolve the context engine once and reuse across retries to avoid + // repeated initialization/connection overhead per attempt. + ensureContextEnginesInitialized(); + const contextEngine = await resolveContextEngine(params.config); try { let authRetryPending = false; // Hoisted so the retry-limit error path can use the most recent API total. @@ -806,6 +813,8 @@ export async function runEmbeddedPiAgent( workspaceDir: resolvedWorkspace, agentDir, config: params.config, + contextEngine, + contextTokenBudget: ctxInfo.tokens, skillsSnapshot: params.skillsSnapshot, prompt, images: params.images, @@ -813,6 +822,8 @@ export async function runEmbeddedPiAgent( provider, modelId, model, + authProfileId: lastProfileId, + authProfileIdSource: lockedProfileId ? "user" : "auto", authStorage, modelRegistry, agentId: workspaceResolution.agentId, @@ -955,31 +966,36 @@ export async function runEmbeddedPiAgent( log.warn( `context overflow detected (attempt ${overflowCompactionAttempts}/${MAX_OVERFLOW_COMPACTION_ATTEMPTS}); attempting auto-compaction for ${provider}/${modelId}`, ); - const compactResult = await compactEmbeddedPiSessionDirect({ + const compactResult = await contextEngine.compact({ sessionId: params.sessionId, - sessionKey: params.sessionKey, - messageChannel: params.messageChannel, - messageProvider: params.messageProvider, - agentAccountId: params.agentAccountId, - authProfileId: lastProfileId, sessionFile: params.sessionFile, - workspaceDir: resolvedWorkspace, - agentDir, - config: params.config, - skillsSnapshot: params.skillsSnapshot, - senderIsOwner: params.senderIsOwner, - provider, - model: modelId, - runId: params.runId, - thinkLevel, - reasoningLevel: params.reasoningLevel, - bashElevated: params.bashElevated, - extraSystemPrompt: params.extraSystemPrompt, - ownerNumbers: params.ownerNumbers, - trigger: "overflow", - diagId: overflowDiagId, - attempt: overflowCompactionAttempts, - maxAttempts: MAX_OVERFLOW_COMPACTION_ATTEMPTS, + tokenBudget: ctxInfo.tokens, + force: true, + compactionTarget: "budget", + legacyParams: { + sessionKey: params.sessionKey, + messageChannel: params.messageChannel, + messageProvider: params.messageProvider, + agentAccountId: params.agentAccountId, + authProfileId: lastProfileId, + workspaceDir: resolvedWorkspace, + agentDir, + config: params.config, + skillsSnapshot: params.skillsSnapshot, + senderIsOwner: params.senderIsOwner, + provider, + model: modelId, + runId: params.runId, + thinkLevel, + reasoningLevel: params.reasoningLevel, + bashElevated: params.bashElevated, + extraSystemPrompt: params.extraSystemPrompt, + ownerNumbers: params.ownerNumbers, + trigger: "overflow", + diagId: overflowDiagId, + attempt: overflowCompactionAttempts, + maxAttempts: MAX_OVERFLOW_COMPACTION_ATTEMPTS, + }, }); if (compactResult.compacted) { autoCompactionCount += 1; @@ -1412,6 +1428,7 @@ export async function runEmbeddedPiAgent( }; } } finally { + await contextEngine.dispose?.(); stopCopilotRefreshTimer(); process.chdir(prevCwd); } diff --git a/src/agents/pi-embedded-runner/run/attempt.test.ts b/src/agents/pi-embedded-runner/run/attempt.test.ts index 4f637a464c2..c4878617c5c 100644 --- a/src/agents/pi-embedded-runner/run/attempt.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.test.ts @@ -1,8 +1,10 @@ import { describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../../config/config.js"; import { + buildAfterTurnLegacyCompactionParams, composeSystemPromptWithHookContext, isOllamaCompatProvider, + prependSystemPromptAddition, resolveAttemptFsWorkspaceOnly, resolveOllamaBaseUrlForRun, resolveOllamaCompatNumCtxEnabled, @@ -180,7 +182,6 @@ describe("resolveAttemptFsWorkspaceOnly", () => { ).toBe(false); }); }); - describe("wrapStreamFnTrimToolCallNames", () => { function createFakeStream(params: { events: unknown[]; resultMessage: unknown }): { result: () => Promise; @@ -548,3 +549,54 @@ describe("decodeHtmlEntitiesInObject", () => { expect(decodeHtmlEntitiesInObject("'world'")).toBe("'world'"); }); }); +describe("prependSystemPromptAddition", () => { + it("prepends context-engine addition to the system prompt", () => { + const result = prependSystemPromptAddition({ + systemPrompt: "base system", + systemPromptAddition: "extra behavior", + }); + + expect(result).toBe("extra behavior\n\nbase system"); + }); + + it("returns the original system prompt when no addition is provided", () => { + const result = prependSystemPromptAddition({ + systemPrompt: "base system", + }); + + expect(result).toBe("base system"); + }); +}); + +describe("buildAfterTurnLegacyCompactionParams", () => { + it("includes resolved auth profile fields for context-engine afterTurn compaction", () => { + const legacy = buildAfterTurnLegacyCompactionParams({ + attempt: { + sessionKey: "agent:main:session:abc", + messageChannel: "slack", + messageProvider: "slack", + agentAccountId: "acct-1", + authProfileId: "openai:p1", + config: { plugins: { slots: { contextEngine: "lossless-claw" } } } as OpenClawConfig, + skillsSnapshot: undefined, + senderIsOwner: true, + provider: "openai-codex", + modelId: "gpt-5.3-codex", + thinkLevel: "off", + reasoningLevel: "on", + extraSystemPrompt: "extra", + ownerNumbers: ["+15555550123"], + }, + workspaceDir: "/tmp/workspace", + agentDir: "/tmp/agent", + }); + + expect(legacy).toMatchObject({ + authProfileId: "openai:p1", + provider: "openai-codex", + model: "gpt-5.3-codex", + workspaceDir: "/tmp/workspace", + agentDir: "/tmp/agent", + }); + }); +}); diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 4a75c297a26..61159c13357 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -63,6 +63,7 @@ import { } from "../../pi-embedded-helpers.js"; import { subscribeEmbeddedPiSession } from "../../pi-embedded-subscribe.js"; import { createPreparedEmbeddedPiSettingsManager } from "../../pi-project-settings.js"; +import { applyPiAutoCompactionGuard } from "../../pi-settings.js"; import { toClientToolDefinitions } from "../../pi-tool-definition-adapter.js"; import { createOpenClawCodingTools, resolveToolLoopDetectionConfig } from "../../pi-tools.js"; import { resolveSandboxContext } from "../../sandbox.js"; @@ -90,6 +91,7 @@ import { resolveTranscriptPolicy } from "../../transcript-policy.js"; import { DEFAULT_BOOTSTRAP_FILENAME } from "../../workspace.js"; import { isRunnerAbortError } from "../abort.js"; import { appendCacheTtlTimestamp, isCacheTtlEligibleProvider } from "../cache-ttl.js"; +import type { CompactEmbeddedPiSessionParams } from "../compact.js"; import { buildEmbeddedExtensionFactories } from "../extensions.js"; import { applyExtraParamsToAgent } from "../extra-params.js"; import { @@ -617,6 +619,60 @@ export function resolveAttemptFsWorkspaceOnly(params: { }); } +export function prependSystemPromptAddition(params: { + systemPrompt: string; + systemPromptAddition?: string; +}): string { + if (!params.systemPromptAddition) { + return params.systemPrompt; + } + return `${params.systemPromptAddition}\n\n${params.systemPrompt}`; +} + +/** Build legacy compaction params passed into context-engine afterTurn hooks. */ +export function buildAfterTurnLegacyCompactionParams(params: { + attempt: Pick< + EmbeddedRunAttemptParams, + | "sessionKey" + | "messageChannel" + | "messageProvider" + | "agentAccountId" + | "config" + | "skillsSnapshot" + | "senderIsOwner" + | "provider" + | "modelId" + | "thinkLevel" + | "reasoningLevel" + | "bashElevated" + | "extraSystemPrompt" + | "ownerNumbers" + | "authProfileId" + >; + workspaceDir: string; + agentDir: string; +}): Partial { + return { + sessionKey: params.attempt.sessionKey, + messageChannel: params.attempt.messageChannel, + messageProvider: params.attempt.messageProvider, + agentAccountId: params.attempt.agentAccountId, + authProfileId: params.attempt.authProfileId, + workspaceDir: params.workspaceDir, + agentDir: params.agentDir, + config: params.attempt.config, + skillsSnapshot: params.attempt.skillsSnapshot, + senderIsOwner: params.attempt.senderIsOwner, + provider: params.attempt.provider, + model: params.attempt.modelId, + thinkLevel: params.attempt.thinkLevel, + reasoningLevel: params.attempt.reasoningLevel, + bashElevated: params.attempt.bashElevated, + extraSystemPrompt: params.attempt.extraSystemPrompt, + ownerNumbers: params.attempt.ownerNumbers, + }; +} + function summarizeMessagePayload(msg: AgentMessage): { textChars: number; imageBlocks: number } { const content = (msg as { content?: unknown }).content; if (typeof content === "string") { @@ -1025,6 +1081,17 @@ export async function runEmbeddedAttempt( }); trackSessionManagerAccess(params.sessionFile); + if (hadSessionFile && params.contextEngine?.bootstrap) { + try { + await params.contextEngine.bootstrap({ + sessionId: params.sessionId, + sessionFile: params.sessionFile, + }); + } catch (bootstrapErr) { + log.warn(`context engine bootstrap failed: ${String(bootstrapErr)}`); + } + } + await prepareSessionManagerForRun({ sessionManager, sessionFile: params.sessionFile, @@ -1038,6 +1105,10 @@ export async function runEmbeddedAttempt( agentDir, cfg: params.config, }); + applyPiAutoCompactionGuard({ + settingsManager, + contextEngineInfo: params.contextEngine?.info, + }); // Sets compaction/pruning runtime state and returns extension factories // that must be passed to the resource loader for the safeguard to be active. @@ -1336,6 +1407,33 @@ export async function runEmbeddedAttempt( if (limited.length > 0) { activeSession.agent.replaceMessages(limited); } + + if (params.contextEngine) { + try { + const assembled = await params.contextEngine.assemble({ + sessionId: params.sessionId, + messages: activeSession.messages, + tokenBudget: params.contextTokenBudget, + }); + if (assembled.messages !== activeSession.messages) { + activeSession.agent.replaceMessages(assembled.messages); + } + if (assembled.systemPromptAddition) { + systemPromptText = prependSystemPromptAddition({ + systemPrompt: systemPromptText, + systemPromptAddition: assembled.systemPromptAddition, + }); + applySystemPromptOverrideToSession(activeSession, systemPromptText); + log.debug( + `context engine: prepended system prompt addition (${assembled.systemPromptAddition.length} chars)`, + ); + } + } catch (assembleErr) { + log.warn( + `context engine assemble failed, using pipeline messages: ${String(assembleErr)}`, + ); + } + } } catch (err) { await flushPendingToolResultsAfterIdle({ agent: activeSession?.agent, @@ -1515,6 +1613,7 @@ export async function runEmbeddedAttempt( let promptError: unknown = null; let promptErrorSource: "prompt" | "compaction" | null = null; + const prePromptMessageCount = activeSession.messages.length; try { const promptStartedAt = Date.now(); @@ -1772,6 +1871,56 @@ export async function runEmbeddedAttempt( } } + // Let the active context engine run its post-turn lifecycle. + if (params.contextEngine) { + const afterTurnLegacyCompactionParams = buildAfterTurnLegacyCompactionParams({ + attempt: params, + workspaceDir: effectiveWorkspace, + agentDir, + }); + + if (typeof params.contextEngine.afterTurn === "function") { + try { + await params.contextEngine.afterTurn({ + sessionId: sessionIdUsed, + sessionFile: params.sessionFile, + messages: messagesSnapshot, + prePromptMessageCount, + tokenBudget: params.contextTokenBudget, + legacyCompactionParams: afterTurnLegacyCompactionParams, + }); + } catch (afterTurnErr) { + log.warn(`context engine afterTurn failed: ${String(afterTurnErr)}`); + } + } else { + // Fallback: ingest new messages individually + const newMessages = messagesSnapshot.slice(prePromptMessageCount); + if (newMessages.length > 0) { + if (typeof params.contextEngine.ingestBatch === "function") { + try { + await params.contextEngine.ingestBatch({ + sessionId: sessionIdUsed, + messages: newMessages, + }); + } catch (ingestErr) { + log.warn(`context engine ingest failed: ${String(ingestErr)}`); + } + } else { + for (const msg of newMessages) { + try { + await params.contextEngine.ingest({ + sessionId: sessionIdUsed, + message: msg, + }); + } catch (ingestErr) { + log.warn(`context engine ingest failed: ${String(ingestErr)}`); + } + } + } + } + } + } + cacheTrace?.recordStage("session:after", { messages: messagesSnapshot, note: timedOutDuringCompaction diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index 35251edd807..dff5aa6f251 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -3,6 +3,7 @@ import type { Api, AssistantMessage, Model } from "@mariozechner/pi-ai"; import type { AuthStorage, ModelRegistry } from "@mariozechner/pi-coding-agent"; import type { ThinkLevel } from "../../../auto-reply/thinking.js"; import type { SessionSystemPromptReport } from "../../../config/sessions/types.js"; +import type { ContextEngine } from "../../../context-engine/types.js"; import type { PluginHookBeforeAgentStartResult } from "../../../plugins/types.js"; import type { MessagingToolSend } from "../../pi-embedded-messaging.js"; import type { NormalizedUsage } from "../../usage.js"; @@ -14,6 +15,14 @@ type EmbeddedRunAttemptBase = Omit< >; export type EmbeddedRunAttemptParams = EmbeddedRunAttemptBase & { + /** Pluggable context engine for ingest/assemble/compact lifecycle. */ + contextEngine?: ContextEngine; + /** Resolved model context window in tokens for assemble/compact budgeting. */ + contextTokenBudget?: number; + /** Auth profile resolved for this attempt's provider/model call. */ + authProfileId?: string; + /** Source for the resolved auth profile (user-locked or automatic). */ + authProfileIdSource?: "auto" | "user"; provider: string; modelId: string; model: Model; diff --git a/src/agents/pi-settings.ts b/src/agents/pi-settings.ts index 3ea4c5d5b51..f1b66c6ea61 100644 --- a/src/agents/pi-settings.ts +++ b/src/agents/pi-settings.ts @@ -1,4 +1,5 @@ import type { OpenClawConfig } from "../config/config.js"; +import type { ContextEngineInfo } from "../context-engine/types.js"; export const DEFAULT_PI_COMPACTION_RESERVE_TOKENS_FLOOR = 20_000; @@ -11,6 +12,7 @@ type PiSettingsManagerLike = { keepRecentTokens?: number; }; }) => void; + setCompactionEnabled?: (enabled: boolean) => void; }; export function ensurePiCompactionReserveTokens(params: { @@ -95,3 +97,26 @@ export function applyPiCompactionSettingsFromConfig(params: { }, }; } + +/** Decide whether Pi's internal auto-compaction should be disabled for this run. */ +export function shouldDisablePiAutoCompaction(params: { + contextEngineInfo?: ContextEngineInfo; +}): boolean { + return params.contextEngineInfo?.ownsCompaction === true; +} + +/** Disable Pi auto-compaction via settings when a context engine owns compaction. */ +export function applyPiAutoCompactionGuard(params: { + settingsManager: PiSettingsManagerLike; + contextEngineInfo?: ContextEngineInfo; +}): { supported: boolean; disabled: boolean } { + const disable = shouldDisablePiAutoCompaction({ + contextEngineInfo: params.contextEngineInfo, + }); + const hasMethod = typeof params.settingsManager.setCompactionEnabled === "function"; + if (!disable || !hasMethod) { + return { supported: hasMethod, disabled: false }; + } + params.settingsManager.setCompactionEnabled!(false); + return { supported: true, disabled: true }; +} diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index 906a8424ff8..e2453bcc0fd 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -8,8 +8,12 @@ import { resolveStorePath, type SessionEntry, } from "../config/sessions.js"; +import { ensureContextEnginesInitialized } from "../context-engine/init.js"; +import { resolveContextEngine } from "../context-engine/registry.js"; +import type { SubagentEndReason } from "../context-engine/types.js"; import { callGateway } from "../gateway/call.js"; import { onAgentEvent } from "../infra/agent-events.js"; +import { createSubsystemLogger } from "../logging/subsystem.js"; import { defaultRuntime } from "../runtime.js"; import { type DeliveryContext, normalizeDeliveryContext } from "../utils/delivery-context.js"; import { resetAnnounceQueuesForTests } from "./subagent-announce-queue.js"; @@ -54,6 +58,7 @@ import type { SubagentRunRecord } from "./subagent-registry.types.js"; import { resolveAgentTimeoutMs } from "./timeout.js"; export type { SubagentRunRecord } from "./subagent-registry.types.js"; +const log = createSubsystemLogger("agents/subagent-registry"); const subagentRuns = new Map(); let sweeper: NodeJS.Timeout | null = null; @@ -305,6 +310,22 @@ function schedulePendingLifecycleError(params: { runId: string; endedAt: number; }); } +async function notifyContextEngineSubagentEnded(params: { + childSessionKey: string; + reason: SubagentEndReason; +}) { + try { + ensureContextEnginesInitialized(); + const engine = await resolveContextEngine(loadConfig()); + if (!engine.onSubagentEnded) { + return; + } + await engine.onSubagentEnded(params); + } catch (err) { + log.warn("context-engine onSubagentEnded failed (best-effort)", { err }); + } +} + function suppressAnnounceForSteerRestart(entry?: SubagentRunRecord) { return entry?.suppressAnnounceReason === "steer-restart"; } @@ -690,6 +711,10 @@ async function sweepSubagentRuns() { continue; } clearPendingLifecycleError(runId); + void notifyContextEngineSubagentEnded({ + childSessionKey: entry.childSessionKey, + reason: "swept", + }); subagentRuns.delete(runId); mutated = true; // Archive/purge is terminal for the run record; remove any retained attachments too. @@ -894,9 +919,8 @@ async function finalizeSubagentCleanup( return; } - // Allow retry on the next wake if announce was deferred or failed. - // Applies to both keep/delete cleanup modes so delete-runs are only removed - // after a successful announce (or terminal give-up). + // Keep both cleanup modes retryable after deferred/failed announce. + // Delete-mode is finalized only after announce succeeds or give-up triggers. entry.cleanupHandled = false; // Clear the in-flight resume marker so the scheduled retry can run again. resumedRuns.delete(runId); @@ -936,11 +960,19 @@ function completeCleanupBookkeeping(params: { }) { if (params.cleanup === "delete") { clearPendingLifecycleError(params.runId); + void notifyContextEngineSubagentEnded({ + childSessionKey: params.entry.childSessionKey, + reason: "deleted", + }); subagentRuns.delete(params.runId); persistSubagentRuns(); retryDeferredCompletedAnnounces(params.runId); return; } + void notifyContextEngineSubagentEnded({ + childSessionKey: params.entry.childSessionKey, + reason: "completed", + }); params.entry.cleanupCompletedAt = params.completedAt; persistSubagentRuns(); retryDeferredCompletedAnnounces(params.runId); @@ -1248,6 +1280,13 @@ export function addSubagentRunForTests(entry: SubagentRunRecord) { export function releaseSubagentRun(runId: string) { clearPendingLifecycleError(runId); + const entry = subagentRuns.get(runId); + if (entry) { + void notifyContextEngineSubagentEnded({ + childSessionKey: entry.childSessionKey, + reason: "released", + }); + } const didDelete = subagentRuns.delete(runId); if (didDelete) { persistSubagentRuns(); diff --git a/src/config/config-misc.test.ts b/src/config/config-misc.test.ts index 29efaa2b136..b46b5b49766 100644 --- a/src/config/config-misc.test.ts +++ b/src/config/config-misc.test.ts @@ -31,6 +31,19 @@ describe("$schema key in config (#14998)", () => { }); }); +describe("plugins.slots.contextEngine", () => { + it("accepts a contextEngine slot id", () => { + const result = OpenClawSchema.safeParse({ + plugins: { + slots: { + contextEngine: "my-context-engine", + }, + }, + }); + expect(result.success).toBe(true); + }); +}); + describe("ui.seamColor", () => { it("accepts hex colors", () => { const res = validateConfigObject({ ui: { seamColor: "#FF4500" } }); diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 911d08620e2..39a43d46acb 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -927,6 +927,8 @@ export const FIELD_HELP: Record = { "Selects which plugins own exclusive runtime slots such as memory so only one plugin provides that capability. Use explicit slot ownership to avoid overlapping providers with conflicting behavior.", "plugins.slots.memory": 'Select the active memory plugin by id, or "none" to disable memory plugins.', + "plugins.slots.contextEngine": + "Selects the active context engine plugin by id so one plugin provides context orchestration behavior.", "plugins.entries": "Per-plugin settings keyed by plugin ID including enablement and plugin-specific runtime configuration payloads. Use this for scoped plugin tuning without changing global loader policy.", "plugins.entries.*.enabled": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index 9454df66fb1..64d444aab47 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -817,6 +817,7 @@ export const FIELD_LABELS: Record = { "plugins.load.paths": "Plugin Load Paths", "plugins.slots": "Plugin Slots", "plugins.slots.memory": "Memory Plugin", + "plugins.slots.contextEngine": "Context Engine Plugin", "plugins.entries": "Plugin Entries", "plugins.entries.*.enabled": "Plugin Enabled", "plugins.entries.*.hooks": "Plugin Hook Policy", diff --git a/src/config/types.plugins.ts b/src/config/types.plugins.ts index 5244795d51e..323946dd541 100644 --- a/src/config/types.plugins.ts +++ b/src/config/types.plugins.ts @@ -10,6 +10,8 @@ export type PluginEntryConfig = { export type PluginSlotsConfig = { /** Select which plugin owns the memory slot ("none" disables memory plugins). */ memory?: string; + /** Select which plugin owns the context-engine slot. */ + contextEngine?: string; }; export type PluginsLoadConfig = { diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts index 4d49e0428e4..033044238e8 100644 --- a/src/config/zod-schema.ts +++ b/src/config/zod-schema.ts @@ -829,6 +829,7 @@ export const OpenClawSchema = z slots: z .object({ memory: z.string().optional(), + contextEngine: z.string().optional(), }) .strict() .optional(), diff --git a/src/context-engine/context-engine.test.ts b/src/context-engine/context-engine.test.ts new file mode 100644 index 00000000000..022fdc14cc8 --- /dev/null +++ b/src/context-engine/context-engine.test.ts @@ -0,0 +1,337 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import { describe, expect, it, beforeEach } from "vitest"; +// --------------------------------------------------------------------------- +// We dynamically import the registry so we can get a fresh module per test +// group when needed. For most groups we use the shared singleton directly. +// --------------------------------------------------------------------------- +import { LegacyContextEngine, registerLegacyContextEngine } from "./legacy.js"; +import { + registerContextEngine, + getContextEngineFactory, + listContextEngineIds, + resolveContextEngine, +} from "./registry.js"; +import type { + ContextEngine, + ContextEngineInfo, + AssembleResult, + CompactResult, + IngestResult, +} from "./types.js"; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Build a config object with a contextEngine slot for testing. */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function configWithSlot(engineId: string): any { + return { plugins: { slots: { contextEngine: engineId } } }; +} + +function makeMockMessage(role: "user" | "assistant" = "user", text = "hello"): AgentMessage { + return { role, content: text, timestamp: Date.now() } as AgentMessage; +} + +/** A minimal mock engine that satisfies the ContextEngine interface. */ +class MockContextEngine implements ContextEngine { + readonly info: ContextEngineInfo = { + id: "mock", + name: "Mock Engine", + version: "0.0.1", + }; + + async ingest(_params: { + sessionId: string; + message: AgentMessage; + isHeartbeat?: boolean; + }): Promise { + return { ingested: true }; + } + + async assemble(params: { + sessionId: string; + messages: AgentMessage[]; + tokenBudget?: number; + }): Promise { + return { + messages: params.messages, + estimatedTokens: 42, + systemPromptAddition: "mock system addition", + }; + } + + async compact(_params: { + sessionId: string; + sessionFile: string; + tokenBudget?: number; + compactionTarget?: "budget" | "threshold"; + customInstructions?: string; + legacyParams?: Record; + }): Promise { + return { + ok: true, + compacted: true, + reason: "mock compaction", + result: { + summary: "mock summary", + tokensBefore: 100, + tokensAfter: 50, + }, + }; + } + + async dispose(): Promise { + // no-op + } +} + +// ═══════════════════════════════════════════════════════════════════════════ +// 1. Engine contract tests +// ═══════════════════════════════════════════════════════════════════════════ + +describe("Engine contract tests", () => { + it("a mock engine implementing ContextEngine can be registered and resolved", async () => { + const factory = () => new MockContextEngine(); + registerContextEngine("mock", factory); + + const resolved = getContextEngineFactory("mock"); + expect(resolved).toBe(factory); + + const engine = await resolved!(); + expect(engine).toBeInstanceOf(MockContextEngine); + expect(engine.info.id).toBe("mock"); + }); + + it("ingest() returns IngestResult with ingested boolean", async () => { + const engine = new MockContextEngine(); + const result = await engine.ingest({ + sessionId: "s1", + message: makeMockMessage(), + }); + + expect(result).toHaveProperty("ingested"); + expect(typeof result.ingested).toBe("boolean"); + expect(result.ingested).toBe(true); + }); + + it("assemble() returns AssembleResult with messages array and estimatedTokens", async () => { + const engine = new MockContextEngine(); + const msgs = [makeMockMessage(), makeMockMessage("assistant", "world")]; + const result = await engine.assemble({ + sessionId: "s1", + messages: msgs, + }); + + expect(Array.isArray(result.messages)).toBe(true); + expect(result.messages).toHaveLength(2); + expect(typeof result.estimatedTokens).toBe("number"); + expect(result.estimatedTokens).toBe(42); + expect(result.systemPromptAddition).toBe("mock system addition"); + }); + + it("compact() returns CompactResult with ok, compacted, reason, result fields", async () => { + const engine = new MockContextEngine(); + const result = await engine.compact({ + sessionId: "s1", + sessionFile: "/tmp/session.json", + }); + + expect(typeof result.ok).toBe("boolean"); + expect(typeof result.compacted).toBe("boolean"); + expect(result.ok).toBe(true); + expect(result.compacted).toBe(true); + expect(result.reason).toBe("mock compaction"); + expect(result.result).toBeDefined(); + expect(result.result!.summary).toBe("mock summary"); + expect(result.result!.tokensBefore).toBe(100); + expect(result.result!.tokensAfter).toBe(50); + }); + + it("dispose() is callable (optional method)", async () => { + const engine = new MockContextEngine(); + // Should complete without error + await expect(engine.dispose()).resolves.toBeUndefined(); + }); +}); + +// ═══════════════════════════════════════════════════════════════════════════ +// 2. Registry tests +// ═══════════════════════════════════════════════════════════════════════════ + +describe("Registry tests", () => { + it("registerContextEngine() stores a factory", () => { + const factory = () => new MockContextEngine(); + registerContextEngine("reg-test-1", factory); + + expect(getContextEngineFactory("reg-test-1")).toBe(factory); + }); + + it("getContextEngineFactory() returns the factory", () => { + const factory = () => new MockContextEngine(); + registerContextEngine("reg-test-2", factory); + + const retrieved = getContextEngineFactory("reg-test-2"); + expect(retrieved).toBe(factory); + expect(typeof retrieved).toBe("function"); + }); + + it("listContextEngineIds() returns all registered ids", () => { + // Ensure at least our test entries exist + registerContextEngine("reg-test-a", () => new MockContextEngine()); + registerContextEngine("reg-test-b", () => new MockContextEngine()); + + const ids = listContextEngineIds(); + expect(ids).toContain("reg-test-a"); + expect(ids).toContain("reg-test-b"); + expect(Array.isArray(ids)).toBe(true); + }); + + it("registering the same id overwrites the previous factory", () => { + const factory1 = () => new MockContextEngine(); + const factory2 = () => new MockContextEngine(); + + registerContextEngine("reg-overwrite", factory1); + expect(getContextEngineFactory("reg-overwrite")).toBe(factory1); + + registerContextEngine("reg-overwrite", factory2); + expect(getContextEngineFactory("reg-overwrite")).toBe(factory2); + expect(getContextEngineFactory("reg-overwrite")).not.toBe(factory1); + }); +}); + +// ═══════════════════════════════════════════════════════════════════════════ +// 3. Default engine selection +// ═══════════════════════════════════════════════════════════════════════════ + +describe("Default engine selection", () => { + // Ensure both legacy and a custom test engine are registered before these tests. + beforeEach(() => { + // Registration is idempotent (Map.set), so calling again is safe. + registerLegacyContextEngine(); + // Register a lightweight custom stub so we don't need external resources. + registerContextEngine("test-engine", () => { + const engine: ContextEngine = { + info: { id: "test-engine", name: "Custom Test Engine", version: "0.0.0" }, + async ingest() { + return { ingested: true }; + }, + async assemble({ messages }) { + return { messages, estimatedTokens: 0 }; + }, + async compact() { + return { ok: true, compacted: false }; + }, + }; + return engine; + }); + }); + + it("resolveContextEngine() with no config returns the default ('legacy') engine", async () => { + const engine = await resolveContextEngine(); + expect(engine.info.id).toBe("legacy"); + }); + + it("resolveContextEngine() with config contextEngine='legacy' returns legacy engine", async () => { + const engine = await resolveContextEngine(configWithSlot("legacy")); + expect(engine.info.id).toBe("legacy"); + }); + + it("resolveContextEngine() with config contextEngine='test-engine' returns the custom engine", async () => { + const engine = await resolveContextEngine(configWithSlot("test-engine")); + expect(engine.info.id).toBe("test-engine"); + }); +}); + +// ═══════════════════════════════════════════════════════════════════════════ +// 4. Invalid engine fallback +// ═══════════════════════════════════════════════════════════════════════════ + +describe("Invalid engine fallback", () => { + it("resolveContextEngine() with config pointing to unregistered engine throws with helpful error", async () => { + await expect(resolveContextEngine(configWithSlot("nonexistent-engine"))).rejects.toThrow( + /nonexistent-engine/, + ); + }); + + it("error message includes the requested id and available ids", async () => { + // Ensure at least legacy is registered so we see it in the available list + registerLegacyContextEngine(); + + try { + await resolveContextEngine(configWithSlot("does-not-exist")); + // Should not reach here + expect.unreachable("Expected resolveContextEngine to throw"); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + expect(message).toContain("does-not-exist"); + expect(message).toContain("not registered"); + // Should mention available engines + expect(message).toMatch(/Available engines:/); + // At least "legacy" should be listed as available + expect(message).toContain("legacy"); + } + }); +}); + +// ═══════════════════════════════════════════════════════════════════════════ +// 5. LegacyContextEngine parity +// ═══════════════════════════════════════════════════════════════════════════ + +describe("LegacyContextEngine parity", () => { + it("ingest() returns { ingested: false } (no-op)", async () => { + const engine = new LegacyContextEngine(); + const result = await engine.ingest({ + sessionId: "s1", + message: makeMockMessage(), + }); + + expect(result).toEqual({ ingested: false }); + }); + + it("assemble() returns messages as-is (pass-through)", async () => { + const engine = new LegacyContextEngine(); + const messages = [ + makeMockMessage("user", "first"), + makeMockMessage("assistant", "second"), + makeMockMessage("user", "third"), + ]; + + const result = await engine.assemble({ + sessionId: "s1", + messages, + }); + + // Messages should be the exact same array reference (pass-through) + expect(result.messages).toBe(messages); + expect(result.messages).toHaveLength(3); + expect(result.estimatedTokens).toBe(0); + expect(result.systemPromptAddition).toBeUndefined(); + }); + + it("dispose() completes without error", async () => { + const engine = new LegacyContextEngine(); + await expect(engine.dispose()).resolves.toBeUndefined(); + }); +}); + +// ═══════════════════════════════════════════════════════════════════════════ +// 6. Initialization guard +// ═══════════════════════════════════════════════════════════════════════════ + +describe("Initialization guard", () => { + it("ensureContextEnginesInitialized() is idempotent (calling twice does not throw)", async () => { + const { ensureContextEnginesInitialized } = await import("./init.js"); + + expect(() => ensureContextEnginesInitialized()).not.toThrow(); + expect(() => ensureContextEnginesInitialized()).not.toThrow(); + }); + + it("after init, 'legacy' engine is registered", async () => { + const { ensureContextEnginesInitialized } = await import("./init.js"); + ensureContextEnginesInitialized(); + + const ids = listContextEngineIds(); + expect(ids).toContain("legacy"); + }); +}); diff --git a/src/context-engine/index.ts b/src/context-engine/index.ts new file mode 100644 index 00000000000..fa3193d4030 --- /dev/null +++ b/src/context-engine/index.ts @@ -0,0 +1,19 @@ +export type { + ContextEngine, + ContextEngineInfo, + AssembleResult, + CompactResult, + IngestResult, +} from "./types.js"; + +export { + registerContextEngine, + getContextEngineFactory, + listContextEngineIds, + resolveContextEngine, +} from "./registry.js"; +export type { ContextEngineFactory } from "./registry.js"; + +export { LegacyContextEngine, registerLegacyContextEngine } from "./legacy.js"; + +export { ensureContextEnginesInitialized } from "./init.js"; diff --git a/src/context-engine/init.ts b/src/context-engine/init.ts new file mode 100644 index 00000000000..1052e4b3677 --- /dev/null +++ b/src/context-engine/init.ts @@ -0,0 +1,23 @@ +import { registerLegacyContextEngine } from "./legacy.js"; + +/** + * Ensures all built-in context engines are registered exactly once. + * + * The legacy engine is always registered as a safe fallback so that + * `resolveContextEngine()` can resolve the default "legacy" slot without + * callers needing to remember manual registration. + * + * Additional engines are registered by their own plugins via + * `api.registerContextEngine()` during plugin load. + */ +let initialized = false; + +export function ensureContextEnginesInitialized(): void { + if (initialized) { + return; + } + initialized = true; + + // Always available – safe fallback for the "legacy" slot default. + registerLegacyContextEngine(); +} diff --git a/src/context-engine/legacy.ts b/src/context-engine/legacy.ts new file mode 100644 index 00000000000..ba05c9e8b8d --- /dev/null +++ b/src/context-engine/legacy.ts @@ -0,0 +1,115 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import { registerContextEngine } from "./registry.js"; +import type { + ContextEngine, + ContextEngineInfo, + AssembleResult, + CompactResult, + IngestResult, +} from "./types.js"; + +/** + * LegacyContextEngine wraps the existing compaction behavior behind the + * ContextEngine interface, preserving 100% backward compatibility. + * + * - ingest: no-op (SessionManager handles message persistence) + * - assemble: pass-through (existing sanitize/validate/limit pipeline in attempt.ts handles this) + * - compact: delegates to compactEmbeddedPiSessionDirect + */ +export class LegacyContextEngine implements ContextEngine { + readonly info: ContextEngineInfo = { + id: "legacy", + name: "Legacy Context Engine", + version: "1.0.0", + }; + + async ingest(_params: { + sessionId: string; + message: AgentMessage; + isHeartbeat?: boolean; + }): Promise { + // No-op: SessionManager handles message persistence in the legacy flow + return { ingested: false }; + } + + async assemble(params: { + sessionId: string; + messages: AgentMessage[]; + tokenBudget?: number; + }): Promise { + // Pass-through: the existing sanitize -> validate -> limit -> repair pipeline + // in attempt.ts handles context assembly for the legacy engine. + // We just return the messages as-is with a rough token estimate. + return { + messages: params.messages, + estimatedTokens: 0, // Caller handles estimation + }; + } + + async afterTurn(_params: { + sessionId: string; + sessionFile: string; + messages: AgentMessage[]; + prePromptMessageCount: number; + autoCompactionSummary?: string; + isHeartbeat?: boolean; + tokenBudget?: number; + legacyCompactionParams?: Record; + }): Promise { + // No-op: legacy flow persists context directly in SessionManager. + } + + async compact(params: { + sessionId: string; + sessionFile: string; + tokenBudget?: number; + force?: boolean; + currentTokenCount?: number; + compactionTarget?: "budget" | "threshold"; + customInstructions?: string; + legacyParams?: Record; + }): Promise { + // Import dynamically to avoid circular dependencies + const { compactEmbeddedPiSessionDirect } = + await import("../agents/pi-embedded-runner/compact.js"); + + // legacyParams carries the full CompactEmbeddedPiSessionParams fields + // set by the caller in run.ts. We spread them and override the fields + // that come from the ContextEngine compact() signature directly. + const lp = params.legacyParams ?? {}; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any -- legacy bridge: legacyParams is an opaque bag matching CompactEmbeddedPiSessionParams + const result = await compactEmbeddedPiSessionDirect({ + ...lp, + sessionId: params.sessionId, + sessionFile: params.sessionFile, + tokenBudget: params.tokenBudget, + force: params.force, + customInstructions: params.customInstructions, + workspaceDir: (lp.workspaceDir as string) ?? process.cwd(), + } as Parameters[0]); + + return { + ok: result.ok, + compacted: result.compacted, + reason: result.reason, + result: result.result + ? { + summary: result.result.summary, + firstKeptEntryId: result.result.firstKeptEntryId, + tokensBefore: result.result.tokensBefore, + tokensAfter: result.result.tokensAfter, + details: result.result.details, + } + : undefined, + }; + } + + async dispose(): Promise { + // Nothing to clean up for legacy engine + } +} + +export function registerLegacyContextEngine(): void { + registerContextEngine("legacy", () => new LegacyContextEngine()); +} diff --git a/src/context-engine/registry.ts b/src/context-engine/registry.ts new file mode 100644 index 00000000000..49bf34bfbb3 --- /dev/null +++ b/src/context-engine/registry.ts @@ -0,0 +1,67 @@ +import type { OpenClawConfig } from "../config/config.js"; +import { defaultSlotIdForKey } from "../plugins/slots.js"; +import type { ContextEngine } from "./types.js"; + +/** + * A factory that creates a ContextEngine instance. + * Supports async creation for engines that need DB connections etc. + */ +export type ContextEngineFactory = () => ContextEngine | Promise; + +// --------------------------------------------------------------------------- +// Registry (module-level singleton) +// --------------------------------------------------------------------------- + +const _engines = new Map(); + +/** + * Register a context engine implementation under the given id. + */ +export function registerContextEngine(id: string, factory: ContextEngineFactory): void { + _engines.set(id, factory); +} + +/** + * Return the factory for a registered engine, or undefined. + */ +export function getContextEngineFactory(id: string): ContextEngineFactory | undefined { + return _engines.get(id); +} + +/** + * List all registered engine ids. + */ +export function listContextEngineIds(): string[] { + return [..._engines.keys()]; +} + +// --------------------------------------------------------------------------- +// Resolution +// --------------------------------------------------------------------------- + +/** + * Resolve which ContextEngine to use based on plugin slot configuration. + * + * Resolution order: + * 1. `config.plugins.slots.contextEngine` (explicit slot override) + * 2. Default slot value ("legacy") + * + * Throws if the resolved engine id has no registered factory. + */ +export async function resolveContextEngine(config?: OpenClawConfig): Promise { + const slotValue = config?.plugins?.slots?.contextEngine; + const engineId = + typeof slotValue === "string" && slotValue.trim() + ? slotValue.trim() + : defaultSlotIdForKey("contextEngine"); + + const factory = _engines.get(engineId); + if (!factory) { + throw new Error( + `Context engine "${engineId}" is not registered. ` + + `Available engines: ${listContextEngineIds().join(", ") || "(none)"}`, + ); + } + + return factory(); +} diff --git a/src/context-engine/types.ts b/src/context-engine/types.ts new file mode 100644 index 00000000000..525c673b092 --- /dev/null +++ b/src/context-engine/types.ts @@ -0,0 +1,167 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; + +// Result types + +export type AssembleResult = { + /** Ordered messages to use as model context */ + messages: AgentMessage[]; + /** Estimated total tokens in assembled context */ + estimatedTokens: number; + /** Optional context-engine-provided instructions prepended to the runtime system prompt */ + systemPromptAddition?: string; +}; + +export type CompactResult = { + ok: boolean; + compacted: boolean; + reason?: string; + result?: { + summary?: string; + firstKeptEntryId?: string; + tokensBefore: number; + tokensAfter?: number; + details?: unknown; + }; +}; + +export type IngestResult = { + /** Whether the message was ingested (false if duplicate or no-op) */ + ingested: boolean; +}; + +export type IngestBatchResult = { + /** Number of messages ingested from the supplied batch */ + ingestedCount: number; +}; + +export type BootstrapResult = { + /** Whether bootstrap ran and initialized the engine's store */ + bootstrapped: boolean; + /** Number of historical messages imported (if applicable) */ + importedMessages?: number; + /** Optional reason when bootstrap was skipped */ + reason?: string; +}; + +export type ContextEngineInfo = { + id: string; + name: string; + version?: string; + /** True when the engine manages its own compaction lifecycle. */ + ownsCompaction?: boolean; +}; + +export type SubagentSpawnPreparation = { + /** Roll back pre-spawn setup when subagent launch fails. */ + rollback: () => void | Promise; +}; + +export type SubagentEndReason = "deleted" | "completed" | "swept" | "released"; + +/** + * ContextEngine defines the pluggable contract for context management. + * + * Required methods define a generic lifecycle; optional methods allow engines + * to provide additional capabilities (retrieval, lineage, etc.). + */ +export interface ContextEngine { + /** Engine identifier and metadata */ + readonly info: ContextEngineInfo; + + /** + * Initialize engine state for a session, optionally importing historical context. + */ + bootstrap?(params: { sessionId: string; sessionFile: string }): Promise; + + /** + * Ingest a single message into the engine's store. + */ + ingest(params: { + sessionId: string; + message: AgentMessage; + /** True when the message belongs to a heartbeat run. */ + isHeartbeat?: boolean; + }): Promise; + + /** + * Ingest a completed turn batch as a single unit. + */ + ingestBatch?(params: { + sessionId: string; + messages: AgentMessage[]; + /** True when the batch belongs to a heartbeat run. */ + isHeartbeat?: boolean; + }): Promise; + + /** + * Execute optional post-turn lifecycle work after a run attempt completes. + * Engines can use this to persist canonical context and trigger background + * compaction decisions. + */ + afterTurn?(params: { + sessionId: string; + sessionFile: string; + messages: AgentMessage[]; + /** Number of messages that existed before the prompt was sent. */ + prePromptMessageCount: number; + /** Optional auto-compaction summary emitted by the runtime. */ + autoCompactionSummary?: string; + /** True when this turn belongs to a heartbeat run. */ + isHeartbeat?: boolean; + /** Optional model context token budget for proactive compaction. */ + tokenBudget?: number; + /** Backward-compat only: legacy compaction bridge runtime params. */ + legacyCompactionParams?: Record; + }): Promise; + + /** + * Assemble model context under a token budget. + * Returns an ordered set of messages ready for the model. + */ + assemble(params: { + sessionId: string; + messages: AgentMessage[]; + tokenBudget?: number; + }): Promise; + + /** + * Compact context to reduce token usage. + * May create summaries, prune old turns, etc. + */ + compact(params: { + sessionId: string; + sessionFile: string; + tokenBudget?: number; + /** Backward-compat only: force legacy compaction behavior even below threshold. */ + force?: boolean; + /** Optional live token estimate from the caller's active context. */ + currentTokenCount?: number; + /** Controls convergence target; defaults to budget for compatibility. */ + compactionTarget?: "budget" | "threshold"; + customInstructions?: string; + /** Backward-compat only: full params bag for legacy compaction bridge. */ + legacyParams?: Record; + }): Promise; + + /** + * Prepare context-engine-managed subagent state before the child run starts. + * + * Implementations can return a rollback handle that is invoked when spawn + * fails after preparation succeeds. + */ + prepareSubagentSpawn?(params: { + parentSessionKey: string; + childSessionKey: string; + ttlMs?: number; + }): Promise; + + /** + * Notify the context engine that a subagent lifecycle ended. + */ + onSubagentEnded?(params: { childSessionKey: string; reason: SubagentEndReason }): Promise; + + /** + * Dispose of any resources held by the engine. + */ + dispose?(): Promise; +} diff --git a/src/cron/isolated-agent.model-formatting.test.ts b/src/cron/isolated-agent.model-formatting.test.ts new file mode 100644 index 00000000000..bfd751664af --- /dev/null +++ b/src/cron/isolated-agent.model-formatting.test.ts @@ -0,0 +1,541 @@ +import "./isolated-agent.mocks.js"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { loadModelCatalog } from "../agents/model-catalog.js"; +import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; +import { runCronIsolatedAgentTurn } from "./isolated-agent.js"; +import { + makeCfg, + makeJob, + withTempCronHome, + writeSessionStoreEntries, +} from "./isolated-agent.test-harness.js"; +import type { CronJob } from "./types.js"; + +const withTempHome = withTempCronHome; + +function makeDeps() { + return { + sendMessageSlack: vi.fn(), + sendMessageWhatsApp: vi.fn(), + sendMessageTelegram: vi.fn(), + sendMessageDiscord: vi.fn(), + sendMessageSignal: vi.fn(), + sendMessageIMessage: vi.fn(), + }; +} + +function mockEmbeddedOk() { + vi.mocked(runEmbeddedPiAgent).mockResolvedValue({ + payloads: [{ text: "ok" }], + meta: { + durationMs: 5, + agentMeta: { sessionId: "s", provider: "p", model: "m" }, + }, + }); +} + +/** + * Extract the provider and model from the last runEmbeddedPiAgent call. + */ +function lastEmbeddedCall(): { provider?: string; model?: string } { + const calls = vi.mocked(runEmbeddedPiAgent).mock.calls; + expect(calls.length).toBeGreaterThan(0); + return calls.at(-1)?.[0] as { provider?: string; model?: string }; +} + +const DEFAULT_MESSAGE = "do it"; + +type TurnOptions = { + cfgOverrides?: Parameters[2]; + jobPayload?: CronJob["payload"]; + sessionKey?: string; + storeEntries?: Record>; +}; + +/** Like runTurn but does NOT assert the embedded agent was called (for error paths). */ +async function runErrorTurn(home: string, options: TurnOptions = {}) { + const storePath = await writeSessionStoreEntries(home, { + "agent:main:main": { + sessionId: "main-session", + updatedAt: Date.now(), + lastProvider: "webchat", + lastTo: "", + }, + ...options.storeEntries, + }); + mockEmbeddedOk(); + + const jobPayload = options.jobPayload ?? { + kind: "agentTurn" as const, + message: DEFAULT_MESSAGE, + deliver: false, + }; + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath, options.cfgOverrides), + deps: makeDeps(), + job: makeJob(jobPayload), + message: DEFAULT_MESSAGE, + sessionKey: options.sessionKey ?? "cron:job-1", + lane: "cron", + }); + + return { res }; +} + +async function runTurn(home: string, options: TurnOptions = {}) { + const storePath = await writeSessionStoreEntries(home, { + "agent:main:main": { + sessionId: "main-session", + updatedAt: Date.now(), + lastProvider: "webchat", + lastTo: "", + }, + ...options.storeEntries, + }); + mockEmbeddedOk(); + + const jobPayload = options.jobPayload ?? { + kind: "agentTurn" as const, + message: DEFAULT_MESSAGE, + deliver: false, + }; + + const res = await runCronIsolatedAgentTurn({ + cfg: makeCfg(home, storePath, options.cfgOverrides), + deps: makeDeps(), + job: makeJob(jobPayload), + message: DEFAULT_MESSAGE, + sessionKey: options.sessionKey ?? "cron:job-1", + lane: "cron", + }); + + return { res, call: lastEmbeddedCall() }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("cron model formatting and precedence edge cases", () => { + beforeEach(() => { + vi.mocked(runEmbeddedPiAgent).mockClear(); + vi.mocked(loadModelCatalog).mockResolvedValue([]); + }); + + // ------ provider/model string splitting ------ + + describe("parseModelRef formatting", () => { + it("splits standard provider/model", async () => { + await withTempHome(async (home) => { + const { res, call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, model: "openai/gpt-4.1-mini" }, + }); + expect(res.status).toBe("ok"); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1-mini"); + }); + }); + + it("handles leading/trailing whitespace in model string", async () => { + await withTempHome(async (home) => { + const { res, call } = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: " openai/gpt-4.1-mini ", + }, + }); + expect(res.status).toBe("ok"); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1-mini"); + }); + }); + + it("handles openrouter nested provider paths", async () => { + await withTempHome(async (home) => { + const { res, call } = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "openrouter/meta-llama/llama-3.3-70b:free", + }, + }); + expect(res.status).toBe("ok"); + expect(call.provider).toBe("openrouter"); + expect(call.model).toBe("meta-llama/llama-3.3-70b:free"); + }); + }); + + it("rejects model with trailing slash (empty model name)", async () => { + await withTempHome(async (home) => { + const { res } = await runErrorTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, model: "openai/" }, + }); + expect(res.status).toBe("error"); + expect(res.error).toMatch(/invalid model/i); + expect(vi.mocked(runEmbeddedPiAgent)).not.toHaveBeenCalled(); + }); + }); + + it("rejects model with leading slash (empty provider)", async () => { + await withTempHome(async (home) => { + const { res } = await runErrorTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, model: "/gpt-4.1-mini" }, + }); + expect(res.status).toBe("error"); + expect(res.error).toMatch(/invalid model/i); + expect(vi.mocked(runEmbeddedPiAgent)).not.toHaveBeenCalled(); + }); + }); + + it("normalizes provider casing", async () => { + await withTempHome(async (home) => { + const { res, call } = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "OpenAI/gpt-4.1-mini", + }, + }); + expect(res.status).toBe("ok"); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1-mini"); + }); + }); + + it("normalizes anthropic model aliases", async () => { + await withTempHome(async (home) => { + const { res, call } = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "anthropic/opus-4.5", + }, + }); + expect(res.status).toBe("ok"); + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-opus-4-5"); + }); + }); + + it("normalizes bedrock provider alias", async () => { + await withTempHome(async (home) => { + const { res, call } = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "bedrock/claude-sonnet-4-5", + }, + }); + expect(res.status).toBe("ok"); + expect(call.provider).toBe("amazon-bedrock"); + }); + }); + }); + + // ------ precedence: job payload > session override > default ------ + + describe("model precedence isolation", () => { + it("job payload model overrides default (anthropic → openai)", async () => { + // Default in makeCfg is anthropic/claude-opus-4-5. + // Job payload sets openai/gpt-4.1-mini. Provider must be openai. + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "openai/gpt-4.1-mini", + }, + }); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1-mini"); + }); + }); + + it("session override applies when no job payload model is present", async () => { + // No model in job payload. Session store has openai override. + // Provider must be openai, not the default anthropic. + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + storeEntries: { + "agent:main:cron:job-1": { + sessionId: "existing-session", + updatedAt: Date.now(), + providerOverride: "openai", + modelOverride: "gpt-4.1-mini", + }, + }, + }); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1-mini"); + }); + }); + + it("job payload model wins over conflicting session override", async () => { + // Job payload says anthropic. Session says openai. Job must win. + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "anthropic/claude-sonnet-4-5", + deliver: false, + }, + storeEntries: { + "agent:main:cron:job-1": { + sessionId: "existing-session", + updatedAt: Date.now(), + providerOverride: "openai", + modelOverride: "gpt-4.1-mini", + }, + }, + }); + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-sonnet-4-5"); + }); + }); + + it("falls through to default when no override is present", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + }); + // makeCfg default is anthropic/claude-opus-4-5 + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-opus-4-5"); + }); + }); + }); + + // ------ sequential runs with different overrides (the CI failure pattern) ------ + + describe("sequential model switches (CI failure regression)", () => { + it("openai override → session openai → job anthropic: each step resolves correctly", async () => { + // This reproduces the exact pattern from the CI failure. + // Three sequential calls in one temp home, switching providers. + await withTempHome(async (home) => { + // Step 1: Job payload says openai + vi.mocked(runEmbeddedPiAgent).mockClear(); + const step1 = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "openai/gpt-4.1-mini", + }, + }); + expect(step1.call.provider).toBe("openai"); + expect(step1.call.model).toBe("gpt-4.1-mini"); + + // Step 2: No job model, session store says openai + vi.mocked(runEmbeddedPiAgent).mockClear(); + mockEmbeddedOk(); + const step2 = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + storeEntries: { + "agent:main:cron:job-1": { + sessionId: "existing-session", + updatedAt: Date.now(), + providerOverride: "openai", + modelOverride: "gpt-4.1-mini", + }, + }, + }); + expect(step2.call.provider).toBe("openai"); + expect(step2.call.model).toBe("gpt-4.1-mini"); + + // Step 3: Job payload says anthropic, session store still says openai + vi.mocked(runEmbeddedPiAgent).mockClear(); + mockEmbeddedOk(); + const step3 = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "anthropic/claude-opus-4-5", + deliver: false, + }, + storeEntries: { + "agent:main:cron:job-1": { + sessionId: "existing-session", + updatedAt: Date.now(), + providerOverride: "openai", + modelOverride: "gpt-4.1-mini", + }, + }, + }); + expect(step3.call.provider).toBe("anthropic"); + expect(step3.call.model).toBe("claude-opus-4-5"); + }); + }); + + it("provider does not leak between isolated sequential runs", async () => { + // Run with openai, then run with no override. + // Second run must get the default (anthropic), not leaked openai. + await withTempHome(async (home) => { + // Run 1: explicit openai + const r1 = await runTurn(home, { + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "openai/gpt-4.1-mini", + }, + }); + expect(r1.call.provider).toBe("openai"); + + // Run 2: no override — must revert to default anthropic + vi.mocked(runEmbeddedPiAgent).mockClear(); + mockEmbeddedOk(); + const r2 = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + }); + expect(r2.call.provider).toBe("anthropic"); + expect(r2.call.model).toBe("claude-opus-4-5"); + }); + }); + }); + + // ------ forceNew session + stored model override interaction ------ + + describe("forceNew session preserves model overrides from store", () => { + it("new isolated session inherits stored modelOverride/providerOverride", async () => { + // Isolated cron uses forceNew=true, which creates a new sessionId. + // The stored modelOverride/providerOverride must still be read and applied + // (resolveCronSession spreads ...entry before overriding core fields). + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + storeEntries: { + "agent:main:cron:job-1": { + sessionId: "old-session-id", + updatedAt: Date.now(), + providerOverride: "openai", + modelOverride: "gpt-4.1-mini", + }, + }, + }); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1-mini"); + }); + }); + + it("new isolated session uses default when store has no override", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + storeEntries: { + "agent:main:cron:job-1": { + sessionId: "old-session-id", + updatedAt: Date.now(), + // No providerOverride or modelOverride + }, + }, + }); + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-opus-4-5"); + }); + }); + }); + + // ------ whitespace / empty edge cases ------ + + describe("whitespace and empty model strings", () => { + it("whitespace-only model treated as unset (falls to default)", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, model: " " }, + }); + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-opus-4-5"); + }); + }); + + it("empty string model treated as unset", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, model: "" }, + }); + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-opus-4-5"); + }); + }); + + it("whitespace-only session modelOverride is ignored", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + storeEntries: { + "agent:main:cron:job-1": { + sessionId: "old", + updatedAt: Date.now(), + providerOverride: "openai", + modelOverride: " ", + }, + }, + }); + // Whitespace modelOverride should be ignored → default + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-opus-4-5"); + }); + }); + }); + + // ------ config default model as string vs object ------ + + describe("config model format variations", () => { + it("default model as string 'provider/model'", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + cfgOverrides: { + agents: { + defaults: { + model: "openai/gpt-4.1", + }, + }, + }, + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + }); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1"); + }); + }); + + it("default model as object with primary field", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + cfgOverrides: { + agents: { + defaults: { + model: { primary: "openai/gpt-4.1" }, + }, + }, + }, + jobPayload: { kind: "agentTurn", message: DEFAULT_MESSAGE, deliver: false }, + }); + expect(call.provider).toBe("openai"); + expect(call.model).toBe("gpt-4.1"); + }); + }); + + it("job override switches away from object default", async () => { + await withTempHome(async (home) => { + const { call } = await runTurn(home, { + cfgOverrides: { + agents: { + defaults: { + model: { primary: "openai/gpt-4.1" }, + }, + }, + }, + jobPayload: { + kind: "agentTurn", + message: DEFAULT_MESSAGE, + model: "anthropic/claude-sonnet-4-5", + }, + }); + expect(call.provider).toBe("anthropic"); + expect(call.model).toBe("claude-sonnet-4-5"); + }); + }); + }); +}); diff --git a/src/gateway/config-reload.ts b/src/gateway/config-reload.ts index 38fe786a667..3887548e51b 100644 --- a/src/gateway/config-reload.ts +++ b/src/gateway/config-reload.ts @@ -6,7 +6,7 @@ import { isPlainObject } from "../utils.js"; import { buildGatewayReloadPlan, type GatewayReloadPlan } from "./config-reload-plan.js"; export { buildGatewayReloadPlan }; -export type { GatewayReloadPlan } from "./config-reload-plan.js"; +export type { ChannelKind, GatewayReloadPlan } from "./config-reload-plan.js"; export type GatewayReloadSettings = { mode: GatewayReloadMode; diff --git a/src/gateway/method-scopes.ts b/src/gateway/method-scopes.ts index 866d8071a83..04f3b756567 100644 --- a/src/gateway/method-scopes.ts +++ b/src/gateway/method-scopes.ts @@ -63,6 +63,7 @@ const METHOD_SCOPE_GROUPS: Record = { "skills.status", "voicewake.get", "sessions.list", + "sessions.get", "sessions.preview", "sessions.resolve", "sessions.usage", diff --git a/src/gateway/server-methods.ts b/src/gateway/server-methods.ts index 53bd8625aa3..62cd6bbcd9e 100644 --- a/src/gateway/server-methods.ts +++ b/src/gateway/server-methods.ts @@ -1,3 +1,4 @@ +import { withPluginRuntimeGatewayRequestScope } from "../plugins/runtime/gateway-request-scope.js"; import { formatControlPlaneActor, resolveControlPlaneActor } from "./control-plane-audit.js"; import { consumeControlPlaneWriteBudget } from "./control-plane-rate-limit.js"; import { ADMIN_SCOPE, authorizeOperatorScopesForMethod } from "./method-scopes.js"; @@ -138,12 +139,17 @@ export async function handleGatewayRequest( ); return; } - await handler({ - req, - params: (req.params ?? {}) as Record, - client, - isWebchatConnect, - respond, - context, - }); + const invokeHandler = () => + handler({ + req, + params: (req.params ?? {}) as Record, + client, + isWebchatConnect, + respond, + context, + }); + // All handlers run inside a request scope so that plugin runtime + // subagent methods (e.g. context engine tools spawning sub-agents + // during tool execution) can dispatch back into the gateway. + await withPluginRuntimeGatewayRequestScope({ context, isWebchatConnect }, invokeHandler); } diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index 523e6655d71..8200031ae7c 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -50,6 +50,7 @@ import { type SessionsPatchResult, type SessionsPreviewEntry, type SessionsPreviewResult, + readSessionMessages, } from "../session-utils.js"; import { applySessionsPatchToStore } from "../sessions-patch.js"; import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js"; @@ -625,6 +626,28 @@ export const sessionsHandlers: GatewayRequestHandlers = { respond(true, { ok: true, key: target.canonicalKey, deleted, archived }, undefined); }, + "sessions.get": ({ params, respond }) => { + const p = params; + const key = requireSessionKey(p.key ?? p.sessionKey, respond); + if (!key) { + return; + } + const limit = + typeof p.limit === "number" && Number.isFinite(p.limit) + ? Math.max(1, Math.floor(p.limit)) + : 200; + + const { target, storePath } = resolveGatewaySessionTargetFromKey(key); + const store = loadSessionStore(storePath); + const entry = target.storeKeys.map((k) => store[k]).find(Boolean); + if (!entry?.sessionId) { + respond(true, { messages: [] }, undefined); + return; + } + const allMessages = readSessionMessages(entry.sessionId, storePath, entry.sessionFile); + const messages = limit < allMessages.length ? allMessages.slice(-limit) : allMessages; + respond(true, { messages }, undefined); + }, "sessions.compact": async ({ params, respond }) => { if (!assertValidParams(params, validateSessionsCompactParams, "sessions.compact", respond)) { return; diff --git a/src/gateway/server-plugins.test.ts b/src/gateway/server-plugins.test.ts index 4f2a4c84059..38f13cf6ac3 100644 --- a/src/gateway/server-plugins.test.ts +++ b/src/gateway/server-plugins.test.ts @@ -1,14 +1,25 @@ -import { describe, expect, test, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; import type { PluginRegistry } from "../plugins/registry.js"; +import type { PluginRuntime } from "../plugins/runtime/types.js"; import type { PluginDiagnostic } from "../plugins/types.js"; -import { loadGatewayPlugins } from "./server-plugins.js"; +import type { GatewayRequestContext, GatewayRequestOptions } from "./server-methods/types.js"; const loadOpenClawPlugins = vi.hoisted(() => vi.fn()); +type HandleGatewayRequestOptions = GatewayRequestOptions & { + extraHandlers?: Record; +}; +const handleGatewayRequest = vi.hoisted(() => + vi.fn(async (_opts: HandleGatewayRequestOptions) => {}), +); vi.mock("../plugins/loader.js", () => ({ loadOpenClawPlugins, })); +vi.mock("./server-methods.js", () => ({ + handleGatewayRequest, +})); + const createRegistry = (diagnostics: PluginDiagnostic[]): PluginRegistry => ({ plugins: [], tools: [], @@ -24,8 +35,75 @@ const createRegistry = (diagnostics: PluginDiagnostic[]): PluginRegistry => ({ diagnostics, }); +type ServerPluginsModule = typeof import("./server-plugins.js"); + +function createTestContext(label: string): GatewayRequestContext { + return { label } as unknown as GatewayRequestContext; +} + +function getLastDispatchedContext(): GatewayRequestContext | undefined { + const call = handleGatewayRequest.mock.calls.at(-1)?.[0]; + return call?.context; +} + +async function importServerPluginsModule(): Promise { + return import("./server-plugins.js"); +} + +function createSubagentRuntime(serverPlugins: ServerPluginsModule): PluginRuntime["subagent"] { + const log = { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }; + loadOpenClawPlugins.mockReturnValue(createRegistry([])); + serverPlugins.loadGatewayPlugins({ + cfg: {}, + workspaceDir: "/tmp", + log, + coreGatewayHandlers: {}, + baseMethods: [], + }); + const call = loadOpenClawPlugins.mock.calls.at(-1)?.[0] as + | { runtimeOptions?: { subagent?: PluginRuntime["subagent"] } } + | undefined; + if (!call?.runtimeOptions?.subagent) { + throw new Error("Expected loadGatewayPlugins to provide subagent runtime"); + } + return call.runtimeOptions.subagent; +} + +beforeEach(() => { + loadOpenClawPlugins.mockReset(); + handleGatewayRequest.mockReset(); + handleGatewayRequest.mockImplementation(async (opts: HandleGatewayRequestOptions) => { + switch (opts.req.method) { + case "agent": + opts.respond(true, { runId: "run-1" }); + return; + case "agent.wait": + opts.respond(true, { status: "ok" }); + return; + case "sessions.get": + opts.respond(true, { messages: [] }); + return; + case "sessions.delete": + opts.respond(true, {}); + return; + default: + opts.respond(true, {}); + } + }); +}); + +afterEach(() => { + vi.resetModules(); +}); + describe("loadGatewayPlugins", () => { - test("logs plugin errors with details", () => { + test("logs plugin errors with details", async () => { + const { loadGatewayPlugins } = await importServerPluginsModule(); const diagnostics: PluginDiagnostic[] = [ { level: "error", @@ -56,4 +134,79 @@ describe("loadGatewayPlugins", () => { ); expect(log.warn).not.toHaveBeenCalled(); }); + + test("provides subagent runtime with sessions.get method aliases", async () => { + const { loadGatewayPlugins } = await importServerPluginsModule(); + loadOpenClawPlugins.mockReturnValue(createRegistry([])); + + const log = { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }; + + loadGatewayPlugins({ + cfg: {}, + workspaceDir: "/tmp", + log, + coreGatewayHandlers: {}, + baseMethods: [], + }); + + const call = loadOpenClawPlugins.mock.calls.at(-1)?.[0]; + const subagent = call?.runtimeOptions?.subagent; + expect(typeof subagent?.getSessionMessages).toBe("function"); + expect(typeof subagent?.getSession).toBe("function"); + }); + + test("shares fallback context across module reloads for existing runtimes", async () => { + const first = await importServerPluginsModule(); + const runtime = createSubagentRuntime(first); + + const staleContext = createTestContext("stale"); + first.setFallbackGatewayContext(staleContext); + await runtime.run({ sessionKey: "s-1", message: "hello" }); + expect(getLastDispatchedContext()).toBe(staleContext); + + vi.resetModules(); + const reloaded = await importServerPluginsModule(); + const freshContext = createTestContext("fresh"); + reloaded.setFallbackGatewayContext(freshContext); + + await runtime.run({ sessionKey: "s-1", message: "hello again" }); + expect(getLastDispatchedContext()).toBe(freshContext); + }); + + test("uses updated fallback context after context replacement", async () => { + const serverPlugins = await importServerPluginsModule(); + const runtime = createSubagentRuntime(serverPlugins); + const firstContext = createTestContext("before-restart"); + const secondContext = createTestContext("after-restart"); + + serverPlugins.setFallbackGatewayContext(firstContext); + await runtime.run({ sessionKey: "s-2", message: "before restart" }); + expect(getLastDispatchedContext()).toBe(firstContext); + + serverPlugins.setFallbackGatewayContext(secondContext); + await runtime.run({ sessionKey: "s-2", message: "after restart" }); + expect(getLastDispatchedContext()).toBe(secondContext); + }); + + test("reflects fallback context object mutation at dispatch time", async () => { + const serverPlugins = await importServerPluginsModule(); + const runtime = createSubagentRuntime(serverPlugins); + const context = { marker: "before-mutation" } as GatewayRequestContext & { + marker: string; + }; + + serverPlugins.setFallbackGatewayContext(context); + context.marker = "after-mutation"; + + await runtime.run({ sessionKey: "s-3", message: "mutated context" }); + const dispatched = getLastDispatchedContext() as + | (GatewayRequestContext & { marker: string }) + | undefined; + expect(dispatched?.marker).toBe("after-mutation"); + }); }); diff --git a/src/gateway/server-plugins.ts b/src/gateway/server-plugins.ts index e879310c304..dde23f703a6 100644 --- a/src/gateway/server-plugins.ts +++ b/src/gateway/server-plugins.ts @@ -1,6 +1,165 @@ +import { randomUUID } from "node:crypto"; import type { loadConfig } from "../config/config.js"; import { loadOpenClawPlugins } from "../plugins/loader.js"; -import type { GatewayRequestHandler } from "./server-methods/types.js"; +import { getPluginRuntimeGatewayRequestScope } from "../plugins/runtime/gateway-request-scope.js"; +import type { PluginRuntime } from "../plugins/runtime/types.js"; +import { GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "./protocol/client-info.js"; +import type { ErrorShape } from "./protocol/index.js"; +import { PROTOCOL_VERSION } from "./protocol/index.js"; +import { handleGatewayRequest } from "./server-methods.js"; +import type { + GatewayRequestContext, + GatewayRequestHandler, + GatewayRequestOptions, +} from "./server-methods/types.js"; + +// ── Fallback gateway context for non-WS paths (Telegram, WhatsApp, etc.) ── +// The WS path sets a per-request scope via AsyncLocalStorage, but channel +// adapters (Telegram polling, etc.) invoke the agent directly without going +// through handleGatewayRequest. We store the gateway context at startup so +// dispatchGatewayMethod can use it as a fallback. + +const FALLBACK_GATEWAY_CONTEXT_STATE_KEY: unique symbol = Symbol.for( + "openclaw.fallbackGatewayContextState", +); + +type FallbackGatewayContextState = { + context: GatewayRequestContext | undefined; +}; + +const fallbackGatewayContextState = (() => { + const globalState = globalThis as typeof globalThis & { + [FALLBACK_GATEWAY_CONTEXT_STATE_KEY]?: FallbackGatewayContextState; + }; + const existing = globalState[FALLBACK_GATEWAY_CONTEXT_STATE_KEY]; + if (existing) { + return existing; + } + const created: FallbackGatewayContextState = { context: undefined }; + globalState[FALLBACK_GATEWAY_CONTEXT_STATE_KEY] = created; + return created; +})(); + +export function setFallbackGatewayContext(ctx: GatewayRequestContext): void { + // TODO: This startup snapshot can become stale if runtime config/context changes. + fallbackGatewayContextState.context = ctx; +} + +// ── Internal gateway dispatch for plugin runtime ──────────────────── + +function createSyntheticOperatorClient(): GatewayRequestOptions["client"] { + return { + connect: { + minProtocol: PROTOCOL_VERSION, + maxProtocol: PROTOCOL_VERSION, + client: { + id: GATEWAY_CLIENT_IDS.GATEWAY_CLIENT, + version: "internal", + platform: "node", + mode: GATEWAY_CLIENT_MODES.BACKEND, + }, + role: "operator", + scopes: ["operator.admin", "operator.approvals", "operator.pairing"], + }, + }; +} + +async function dispatchGatewayMethod( + method: string, + params: Record, +): Promise { + const scope = getPluginRuntimeGatewayRequestScope(); + const context = scope?.context ?? fallbackGatewayContextState.context; + const isWebchatConnect = scope?.isWebchatConnect ?? (() => false); + if (!context) { + throw new Error( + `Plugin subagent dispatch requires a gateway request scope (method: ${method}). No scope set and no fallback context available.`, + ); + } + + let result: { ok: boolean; payload?: unknown; error?: ErrorShape } | undefined; + await handleGatewayRequest({ + req: { + type: "req", + id: `plugin-subagent-${randomUUID()}`, + method, + params, + }, + client: createSyntheticOperatorClient(), + isWebchatConnect, + respond: (ok, payload, error) => { + if (!result) { + result = { ok, payload, error }; + } + }, + context, + }); + + if (!result) { + throw new Error(`Gateway method "${method}" completed without a response.`); + } + if (!result.ok) { + throw new Error(result.error?.message ?? `Gateway method "${method}" failed.`); + } + return result.payload as T; +} + +function createGatewaySubagentRuntime(): PluginRuntime["subagent"] { + const getSessionMessages: PluginRuntime["subagent"]["getSessionMessages"] = async (params) => { + const payload = await dispatchGatewayMethod<{ messages?: unknown[] }>("sessions.get", { + key: params.sessionKey, + ...(params.limit != null && { limit: params.limit }), + }); + return { messages: Array.isArray(payload?.messages) ? payload.messages : [] }; + }; + + return { + async run(params) { + const payload = await dispatchGatewayMethod<{ runId?: string }>("agent", { + sessionKey: params.sessionKey, + message: params.message, + deliver: params.deliver ?? false, + ...(params.extraSystemPrompt && { extraSystemPrompt: params.extraSystemPrompt }), + ...(params.lane && { lane: params.lane }), + ...(params.idempotencyKey && { idempotencyKey: params.idempotencyKey }), + }); + const runId = payload?.runId; + if (typeof runId !== "string" || !runId) { + throw new Error("Gateway agent method returned an invalid runId."); + } + return { runId }; + }, + async waitForRun(params) { + const payload = await dispatchGatewayMethod<{ status?: string; error?: string }>( + "agent.wait", + { + runId: params.runId, + ...(params.timeoutMs != null && { timeoutMs: params.timeoutMs }), + }, + ); + const status = payload?.status; + if (status !== "ok" && status !== "error" && status !== "timeout") { + throw new Error(`Gateway agent.wait returned unexpected status: ${status}`); + } + return { + status, + ...(typeof payload?.error === "string" && payload.error && { error: payload.error }), + }; + }, + getSessionMessages, + async getSession(params) { + return getSessionMessages(params); + }, + async deleteSession(params) { + await dispatchGatewayMethod("sessions.delete", { + key: params.sessionKey, + deleteTranscript: params.deleteTranscript ?? true, + }); + }, + }; +} + +// ── Plugin loading ────────────────────────────────────────────────── export function loadGatewayPlugins(params: { cfg: ReturnType; @@ -24,6 +183,9 @@ export function loadGatewayPlugins(params: { debug: (msg) => params.log.debug(msg), }, coreGatewayHandlers: params.coreGatewayHandlers, + runtimeOptions: { + subagent: createGatewaySubagentRuntime(), + }, }); const pluginMethods = Object.keys(pluginRegistry.gatewayHandlers); const gatewayMethods = Array.from(new Set([...params.baseMethods, ...pluginMethods])); diff --git a/src/gateway/server.agent.gateway-server-agent.mocks.ts b/src/gateway/server.agent.gateway-server-agent.mocks.ts index b930ccbc67f..c3a33eca9ad 100644 --- a/src/gateway/server.agent.gateway-server-agent.mocks.ts +++ b/src/gateway/server.agent.gateway-server-agent.mocks.ts @@ -1,9 +1,23 @@ import { vi } from "vitest"; -import { createEmptyPluginRegistry, type PluginRegistry } from "../plugins/registry.js"; +import type { PluginRegistry } from "../plugins/registry.js"; import { setActivePluginRegistry } from "../plugins/runtime.js"; export const registryState: { registry: PluginRegistry } = { - registry: createEmptyPluginRegistry(), + registry: { + plugins: [], + tools: [], + hooks: [], + typedHooks: [], + channels: [], + providers: [], + gatewayHandlers: {}, + httpHandlers: [], + httpRoutes: [], + cliRegistrars: [], + services: [], + commands: [], + diagnostics: [], + } as PluginRegistry, }; export function setRegistry(registry: PluginRegistry) { @@ -21,5 +35,7 @@ vi.mock("./server-plugins.js", async () => { gatewayMethods: params.baseMethods ?? [], }; }, + // server.impl.ts sets a fallback context before dispatch; tests only need the symbol to exist. + setFallbackGatewayContext: vi.fn(), }; }); diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 2e816c67dce..efb95e7a7cf 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -89,7 +89,7 @@ import { createSecretsHandlers } from "./server-methods/secrets.js"; import { hasConnectedMobileNode } from "./server-mobile-nodes.js"; import { loadGatewayModelCatalog } from "./server-model-catalog.js"; import { createNodeSubscriptionManager } from "./server-node-subscriptions.js"; -import { loadGatewayPlugins } from "./server-plugins.js"; +import { loadGatewayPlugins, setFallbackGatewayContext } from "./server-plugins.js"; import { createGatewayReloadHandlers } from "./server-reload-handlers.js"; import { resolveGatewayRuntimeConfig } from "./server-runtime-config.js"; import { createGatewayRuntimeState } from "./server-runtime-state.js"; @@ -779,6 +779,63 @@ export async function startGatewayServer( const canvasHostServerPort = (canvasHostServer as CanvasHostServer | null)?.port; + const gatewayRequestContext: import("./server-methods/types.js").GatewayRequestContext = { + deps, + cron, + cronStorePath, + execApprovalManager, + loadGatewayModelCatalog, + getHealthCache, + refreshHealthSnapshot: refreshGatewayHealthSnapshot, + logHealth, + logGateway: log, + incrementPresenceVersion, + getHealthVersion, + broadcast, + broadcastToConnIds, + nodeSendToSession, + nodeSendToAllSubscribed, + nodeSubscribe, + nodeUnsubscribe, + nodeUnsubscribeAll, + hasConnectedMobileNode: hasMobileNodeConnected, + hasExecApprovalClients: () => { + for (const gatewayClient of clients) { + const scopes = Array.isArray(gatewayClient.connect.scopes) + ? gatewayClient.connect.scopes + : []; + if (scopes.includes("operator.admin") || scopes.includes("operator.approvals")) { + return true; + } + } + return false; + }, + nodeRegistry, + agentRunSeq, + chatAbortControllers, + chatAbortedRuns: chatRunState.abortedRuns, + chatRunBuffers: chatRunState.buffers, + chatDeltaSentAt: chatRunState.deltaSentAt, + addChatRun, + removeChatRun, + registerToolEventRecipient: toolEventRecipients.add, + dedupe, + wizardSessions, + findRunningWizard, + purgeWizardSession, + getRuntimeSnapshot, + startChannel, + stopChannel, + markChannelLoggedOut, + wizardRunner, + broadcastVoiceWakeChanged, + }; + + // Store the gateway context as a fallback for plugin subagent dispatch + // in non-WS paths (Telegram polling, WhatsApp, etc.) where no per-request + // scope is set via AsyncLocalStorage. + setFallbackGatewayContext(gatewayRequestContext); + attachGatewayWsHandlers({ wss, clients, @@ -800,57 +857,7 @@ export async function startGatewayServer( ...secretsHandlers, }, broadcast, - context: { - deps, - cron, - cronStorePath, - execApprovalManager, - loadGatewayModelCatalog, - getHealthCache, - refreshHealthSnapshot: refreshGatewayHealthSnapshot, - logHealth, - logGateway: log, - incrementPresenceVersion, - getHealthVersion, - broadcast, - broadcastToConnIds, - nodeSendToSession, - nodeSendToAllSubscribed, - nodeSubscribe, - nodeUnsubscribe, - nodeUnsubscribeAll, - hasConnectedMobileNode: hasMobileNodeConnected, - hasExecApprovalClients: () => { - for (const gatewayClient of clients) { - const scopes = Array.isArray(gatewayClient.connect.scopes) - ? gatewayClient.connect.scopes - : []; - if (scopes.includes("operator.admin") || scopes.includes("operator.approvals")) { - return true; - } - } - return false; - }, - nodeRegistry, - agentRunSeq, - chatAbortControllers, - chatAbortedRuns: chatRunState.abortedRuns, - chatRunBuffers: chatRunState.buffers, - chatDeltaSentAt: chatRunState.deltaSentAt, - addChatRun, - removeChatRun, - registerToolEventRecipient: toolEventRecipients.add, - dedupe, - wizardSessions, - findRunningWizard, - purgeWizardSession, - getRuntimeSnapshot, - startChannel, - stopChannel, - markChannelLoggedOut, - wizardRunner, - broadcastVoiceWakeChanged, - }, + context: gatewayRequestContext, }); logGatewayStartup({ cfg: cfgAtStart, diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index 2b8fc8e7a63..07b0846cddb 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -109,7 +109,19 @@ export type { GatewayRequestHandlerOptions, RespondFn, } from "../gateway/server-methods/types.js"; -export type { PluginRuntime, RuntimeLogger } from "../plugins/runtime/types.js"; +export type { + PluginRuntime, + RuntimeLogger, + SubagentRunParams, + SubagentRunResult, + SubagentWaitParams, + SubagentWaitResult, + SubagentGetSessionMessagesParams, + SubagentGetSessionMessagesResult, + SubagentGetSessionParams, + SubagentGetSessionResult, + SubagentDeleteSessionParams, +} from "../plugins/runtime/types.js"; export { normalizePluginHttpPath } from "../plugins/http-path.js"; export { registerPluginHttpRoute } from "../plugins/http-registry.js"; export { emptyPluginConfigSchema } from "../plugins/config-schema.js"; @@ -695,5 +707,20 @@ export type { ProcessedLineMessage } from "../line/markdown-to-line.js"; // Media utilities export { loadWebMedia, type WebMediaResult } from "../web/media.js"; +// Context engine +export type { + ContextEngine, + ContextEngineInfo, + AssembleResult, + CompactResult, + IngestResult, + IngestBatchResult, + BootstrapResult, + SubagentSpawnPreparation, + SubagentEndReason, +} from "../context-engine/types.js"; +export { registerContextEngine } from "../context-engine/registry.js"; +export type { ContextEngineFactory } from "../context-engine/registry.js"; + // Security utilities export { redactSensitiveText } from "../logging/redact.js"; diff --git a/src/plugins/loader.ts b/src/plugins/loader.ts index 482eeead5de..15051b25e81 100644 --- a/src/plugins/loader.ts +++ b/src/plugins/loader.ts @@ -21,7 +21,7 @@ import { loadPluginManifestRegistry } from "./manifest-registry.js"; import { isPathInside, safeStatSync } from "./path-safety.js"; import { createPluginRegistry, type PluginRecord, type PluginRegistry } from "./registry.js"; import { setActivePluginRegistry } from "./runtime.js"; -import { createPluginRuntime } from "./runtime/index.js"; +import { createPluginRuntime, type CreatePluginRuntimeOptions } from "./runtime/index.js"; import type { PluginRuntime } from "./runtime/types.js"; import { validateJsonSchemaValue } from "./schema-validator.js"; import type { @@ -38,6 +38,7 @@ export type PluginLoadOptions = { workspaceDir?: string; logger?: PluginLogger; coreGatewayHandlers?: Record; + runtimeOptions?: CreatePluginRuntimeOptions; cache?: boolean; mode?: "full" | "validate"; }; @@ -503,7 +504,7 @@ export function loadOpenClawPlugins(options: PluginLoadOptions = {}): PluginRegi // not eagerly load every channel runtime dependency. let resolvedRuntime: PluginRuntime | null = null; const resolveRuntime = (): PluginRuntime => { - resolvedRuntime ??= createPluginRuntime(); + resolvedRuntime ??= createPluginRuntime(options.runtimeOptions); return resolvedRuntime; }; const runtime = new Proxy({} as PluginRuntime, { diff --git a/src/plugins/registry.ts b/src/plugins/registry.ts index fde8d0e6a6d..9fc797ab235 100644 --- a/src/plugins/registry.ts +++ b/src/plugins/registry.ts @@ -2,6 +2,7 @@ import path from "node:path"; import type { AnyAgentTool } from "../agents/tools/common.js"; import type { ChannelDock } from "../channels/dock.js"; import type { ChannelPlugin } from "../channels/plugins/types.js"; +import { registerContextEngine } from "../context-engine/registry.js"; import type { GatewayRequestHandler, GatewayRequestHandlers, @@ -582,6 +583,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { registerCli: (registrar, opts) => registerCli(record, registrar, opts), registerService: (service) => registerService(record, service), registerCommand: (command) => registerCommand(record, command), + registerContextEngine: (id, factory) => registerContextEngine(id, factory), resolvePath: (input: string) => resolveUserPath(input), on: (hookName, handler, opts) => registerTypedHook(record, hookName, handler, opts, params.hookPolicy), diff --git a/src/plugins/runtime/gateway-request-scope.test.ts b/src/plugins/runtime/gateway-request-scope.test.ts new file mode 100644 index 00000000000..ef31350e2a3 --- /dev/null +++ b/src/plugins/runtime/gateway-request-scope.test.ts @@ -0,0 +1,23 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { PluginRuntimeGatewayRequestScope } from "./gateway-request-scope.js"; + +const TEST_SCOPE: PluginRuntimeGatewayRequestScope = { + context: {} as PluginRuntimeGatewayRequestScope["context"], + isWebchatConnect: (() => false) as PluginRuntimeGatewayRequestScope["isWebchatConnect"], +}; + +afterEach(() => { + vi.resetModules(); +}); + +describe("gateway request scope", () => { + it("reuses AsyncLocalStorage across reloaded module instances", async () => { + const first = await import("./gateway-request-scope.js"); + + await first.withPluginRuntimeGatewayRequestScope(TEST_SCOPE, async () => { + vi.resetModules(); + const second = await import("./gateway-request-scope.js"); + expect(second.getPluginRuntimeGatewayRequestScope()).toEqual(TEST_SCOPE); + }); + }); +}); diff --git a/src/plugins/runtime/gateway-request-scope.ts b/src/plugins/runtime/gateway-request-scope.ts new file mode 100644 index 00000000000..11ed9cb4980 --- /dev/null +++ b/src/plugins/runtime/gateway-request-scope.ts @@ -0,0 +1,46 @@ +import { AsyncLocalStorage } from "node:async_hooks"; +import type { + GatewayRequestContext, + GatewayRequestOptions, +} from "../../gateway/server-methods/types.js"; + +export type PluginRuntimeGatewayRequestScope = { + context: GatewayRequestContext; + isWebchatConnect: GatewayRequestOptions["isWebchatConnect"]; +}; + +const PLUGIN_RUNTIME_GATEWAY_REQUEST_SCOPE_KEY: unique symbol = Symbol.for( + "openclaw.pluginRuntimeGatewayRequestScope", +); + +const pluginRuntimeGatewayRequestScope = (() => { + const globalState = globalThis as typeof globalThis & { + [PLUGIN_RUNTIME_GATEWAY_REQUEST_SCOPE_KEY]?: AsyncLocalStorage; + }; + const existing = globalState[PLUGIN_RUNTIME_GATEWAY_REQUEST_SCOPE_KEY]; + if (existing) { + return existing; + } + const created = new AsyncLocalStorage(); + globalState[PLUGIN_RUNTIME_GATEWAY_REQUEST_SCOPE_KEY] = created; + return created; +})(); + +/** + * Runs plugin gateway handlers with request-scoped context that runtime helpers can read. + */ +export function withPluginRuntimeGatewayRequestScope( + scope: PluginRuntimeGatewayRequestScope, + run: () => T, +): T { + return pluginRuntimeGatewayRequestScope.run(scope, run); +} + +/** + * Returns the current plugin gateway request scope when called from a plugin request handler. + */ +export function getPluginRuntimeGatewayRequestScope(): + | PluginRuntimeGatewayRequestScope + | undefined { + return pluginRuntimeGatewayRequestScope.getStore(); +} diff --git a/src/plugins/runtime/index.ts b/src/plugins/runtime/index.ts index 3db2f68ad92..68b672db1b4 100644 --- a/src/plugins/runtime/index.ts +++ b/src/plugins/runtime/index.ts @@ -28,10 +28,28 @@ function resolveVersion(): string { } } -export function createPluginRuntime(): PluginRuntime { +function createUnavailableSubagentRuntime(): PluginRuntime["subagent"] { + const unavailable = () => { + throw new Error("Plugin runtime subagent methods are only available during a gateway request."); + }; + return { + run: unavailable, + waitForRun: unavailable, + getSessionMessages: unavailable, + getSession: unavailable, + deleteSession: unavailable, + }; +} + +export type CreatePluginRuntimeOptions = { + subagent?: PluginRuntime["subagent"]; +}; + +export function createPluginRuntime(_options: CreatePluginRuntimeOptions = {}): PluginRuntime { const runtime = { version: resolveVersion(), config: createRuntimeConfig(), + subagent: _options.subagent ?? createUnavailableSubagentRuntime(), system: createRuntimeSystem(), media: createRuntimeMedia(), tts: { textToSpeechTelephony }, diff --git a/src/plugins/runtime/types.ts b/src/plugins/runtime/types.ts index 275bb7cba9a..245e8dd1274 100644 --- a/src/plugins/runtime/types.ts +++ b/src/plugins/runtime/types.ts @@ -3,6 +3,61 @@ import type { PluginRuntimeCore, RuntimeLogger } from "./types-core.js"; export type { RuntimeLogger }; +// ── Subagent runtime types ────────────────────────────────────────── + +export type SubagentRunParams = { + sessionKey: string; + message: string; + extraSystemPrompt?: string; + lane?: string; + deliver?: boolean; + idempotencyKey?: string; +}; + +export type SubagentRunResult = { + runId: string; +}; + +export type SubagentWaitParams = { + runId: string; + timeoutMs?: number; +}; + +export type SubagentWaitResult = { + status: "ok" | "error" | "timeout"; + error?: string; +}; + +export type SubagentGetSessionMessagesParams = { + sessionKey: string; + limit?: number; +}; + +export type SubagentGetSessionMessagesResult = { + messages: unknown[]; +}; + +/** @deprecated Use SubagentGetSessionMessagesParams. */ +export type SubagentGetSessionParams = SubagentGetSessionMessagesParams; + +/** @deprecated Use SubagentGetSessionMessagesResult. */ +export type SubagentGetSessionResult = SubagentGetSessionMessagesResult; + +export type SubagentDeleteSessionParams = { + sessionKey: string; + deleteTranscript?: boolean; +}; + export type PluginRuntime = PluginRuntimeCore & { + subagent: { + run: (params: SubagentRunParams) => Promise; + waitForRun: (params: SubagentWaitParams) => Promise; + getSessionMessages: ( + params: SubagentGetSessionMessagesParams, + ) => Promise; + /** @deprecated Use getSessionMessages. */ + getSession: (params: SubagentGetSessionParams) => Promise; + deleteSession: (params: SubagentDeleteSessionParams) => Promise; + }; channel: PluginRuntimeChannel; }; diff --git a/src/plugins/slots.ts b/src/plugins/slots.ts index 8fee7172a2e..bcbbdd44a03 100644 --- a/src/plugins/slots.ts +++ b/src/plugins/slots.ts @@ -11,10 +11,12 @@ type SlotPluginRecord = { const SLOT_BY_KIND: Record = { memory: "memory", + "context-engine": "contextEngine", }; const DEFAULT_SLOT_BY_KEY: Record = { memory: "memory-core", + contextEngine: "legacy", }; export function slotKeyForPluginKind(kind?: PluginKind): PluginSlotKey | null { diff --git a/src/plugins/types.ts b/src/plugins/types.ts index 1cb2779e8c2..32f8a545038 100644 --- a/src/plugins/types.ts +++ b/src/plugins/types.ts @@ -35,7 +35,7 @@ export type PluginConfigUiHint = { placeholder?: string; }; -export type PluginKind = "memory"; +export type PluginKind = "memory" | "context-engine"; export type PluginConfigValidation = | { ok: true; value?: unknown } @@ -285,6 +285,11 @@ export type OpenClawPluginApi = { * Use this for simple state-toggling or status commands that don't need AI reasoning. */ registerCommand: (command: OpenClawPluginCommandDefinition) => void; + /** Register a context engine implementation (exclusive slot — only one active at a time). */ + registerContextEngine: ( + id: string, + factory: import("../context-engine/registry.js").ContextEngineFactory, + ) => void; resolvePath: (input: string) => string; /** Register a lifecycle hook handler */ on: ( diff --git a/test/scripts/ios-team-id.test.ts b/test/scripts/ios-team-id.test.ts index f2a9037f020..2496073951c 100644 --- a/test/scripts/ios-team-id.test.ts +++ b/test/scripts/ios-team-id.test.ts @@ -96,7 +96,7 @@ function runScript( const binDir = path.join(homeDir, "bin"); const env = { HOME: homeDir, - PATH: `${binDir}:${sharedBinDir}:${BASE_PATH}`, + PATH: `${binDir}${path.delimiter}${sharedBinDir}${path.delimiter}${BASE_PATH}`, LANG: BASE_LANG, ...extraEnv, };