diff --git a/extensions/telegram/src/bot-handlers.runtime.ts b/extensions/telegram/src/bot-handlers.runtime.ts index b7fd2a2705b..04ff45f49e8 100644 --- a/extensions/telegram/src/bot-handlers.runtime.ts +++ b/extensions/telegram/src/bot-handlers.runtime.ts @@ -171,9 +171,7 @@ export const registerTelegramHandlers = ({ const mediaGroupBuffer = new Map(); let mediaGroupProcessing: Promise = Promise.resolve(); const messageCache = createTelegramMessageCache({ - persistedPath: resolveTelegramMessageCachePath( - telegramDeps.resolveStorePath(cfg.session?.store), - ), + persistedPath: resolveTelegramMessageCachePath(accountId), }); type TextFragmentEntry = { diff --git a/extensions/telegram/src/message-cache.ts b/extensions/telegram/src/message-cache.ts index 8ce7ca374a8..3809df7e5a8 100644 --- a/extensions/telegram/src/message-cache.ts +++ b/extensions/telegram/src/message-cache.ts @@ -1,9 +1,9 @@ -import fs from "node:fs"; +import { createHash } from "node:crypto"; import type { Message } from "@grammyjs/types"; import { formatLocationText } from "openclaw/plugin-sdk/channel-inbound"; +import { createPluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; import type { MsgContext } from "openclaw/plugin-sdk/reply-runtime"; import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; -import { appendRegularFileSync, replaceFileAtomicSync } from "openclaw/plugin-sdk/security-runtime"; import { resolveTelegramPrimaryMedia } from "./bot/body-helpers.js"; import { buildSenderName, @@ -55,13 +55,28 @@ export type TelegramMessageCache = { type MessageWithExternalReply = Message & { external_reply?: Message }; type TelegramMessageCacheBucket = { + scopeKey?: string; messages: Map; - persistedEntryCount: number; +}; + +type TelegramPersistedMessageCacheNode = { + scopeKey: string; + cacheKey: string; + sourceMessage: Message; + threadId?: string; }; const DEFAULT_MAX_MESSAGES = 5000; -const COMPACT_THRESHOLD_RATIO = 2; +const DEFAULT_TTL_MS = 7 * 24 * 60 * 60 * 1000; const persistedMessageCacheBuckets = new Map(); +const MESSAGE_CACHE_STORE = createPluginStateSyncKeyedStore( + "telegram", + { + namespace: "message-cache", + maxEntries: 100_000, + defaultTtlMs: DEFAULT_TTL_MS, + }, +); function telegramMessageCacheKey(params: { accountId: string; @@ -76,7 +91,8 @@ function telegramMessageCacheKeyPrefix(params: { accountId: string; chatId: stri } export function resolveTelegramMessageCachePath(storePath: string): string { - return `${storePath}.telegram-messages.json`; + const trimmed = storePath.trim(); + return trimmed ? `telegram-message-cache:${trimmed}` : "telegram-message-cache:default"; } function resolveReplyMessage(msg: Message): Message | undefined { @@ -162,17 +178,6 @@ function parsePersistedNode(value: unknown): TelegramCachedMessageNode | null { return normalizeMessageNode(value.sourceMessage, Number.isFinite(threadId) ? { threadId } : {}); } -function parsePersistedEntry(value: unknown): { - key: string; - node: TelegramCachedMessageNode; -} | null { - if (!isRecord(value) || !isString(value.key)) { - return null; - } - const node = parsePersistedNode(value.node); - return node ? { key: value.key, node } : null; -} - function trimMessages(messages: Map, maxMessages: number): void { while (messages.size > maxMessages) { const oldest = messages.keys().next().value; @@ -183,103 +188,75 @@ function trimMessages(messages: Map, maxMessa } } -function readPersistedMessages(filePath: string, maxMessages: number) { +function persistedMessageEntryKey(scopeKey: string, cacheKey: string): string { + return createHash("sha256").update(`${scopeKey}\0${cacheKey}`, "utf8").digest("hex").slice(0, 32); +} + +function readPersistedMessages(scopeKey: string, maxMessages: number) { const messages = new Map(); - let persistedEntryCount = 0; - if (!fs.existsSync(filePath)) { - return { messages, persistedEntryCount }; - } try { - for (const line of fs.readFileSync(filePath, "utf-8").split("\n")) { - if (!line.trim()) { + for (const entry of MESSAGE_CACHE_STORE.entries() + .filter((entry) => entry.value.scopeKey === scopeKey) + .slice(-maxMessages)) { + if (!isString(entry.value.cacheKey)) { continue; } - const entry = parsePersistedEntry(JSON.parse(line)); - if (!entry) { - continue; + const node = parsePersistedNode(entry.value); + if (node) { + messages.set(entry.value.cacheKey, node); } - persistedEntryCount++; - messages.delete(entry.key); - messages.set(entry.key, entry.node); - trimMessages(messages, maxMessages); } } catch (error) { logVerbose(`telegram: failed to read message cache: ${String(error)}`); } - return { messages, persistedEntryCount }; + return messages; } -function serializePersistedEntry(key: string, node: TelegramCachedMessageNode): string { - return `${JSON.stringify({ - key, - node: { - sourceMessage: node.sourceMessage, - ...(node.threadId ? { threadId: node.threadId } : {}), - }, - })}\n`; -} - -function replacePersistedMessages(params: { +function persistMessages(params: { messages: Map; - persistedPath?: string; -}): number { - const { persistedPath, messages } = params; - if (!persistedPath) { - return messages.size; + scopeKey?: string; +}) { + const { scopeKey, messages } = params; + if (!scopeKey) { + return; } - if (messages.size === 0) { - fs.rmSync(persistedPath, { force: true }); - return 0; + const retained = new Set(messages.keys()); + for (const entry of MESSAGE_CACHE_STORE.entries()) { + if (entry.value.scopeKey === scopeKey && !retained.has(entry.value.cacheKey)) { + MESSAGE_CACHE_STORE.delete(entry.key); + } } - const serialized = Array.from(messages, ([key, node]) => serializePersistedEntry(key, node)).join( - "", - ); - replaceFileAtomicSync({ - filePath: persistedPath, - content: serialized, - tempPrefix: ".telegram-message-cache", - }); - return messages.size; -} - -function appendPersistedMessage(params: { - key: string; - node: TelegramCachedMessageNode; - persistedPath?: string; -}): number { - const { persistedPath } = params; - if (!persistedPath) { - return 0; + for (const [key, node] of messages) { + MESSAGE_CACHE_STORE.register( + persistedMessageEntryKey(scopeKey, key), + { + scopeKey, + cacheKey: key, + sourceMessage: node.sourceMessage, + ...(node.threadId ? { threadId: node.threadId } : {}), + }, + { ttlMs: DEFAULT_TTL_MS }, + ); } - appendRegularFileSync({ - filePath: persistedPath, - content: serializePersistedEntry(params.key, params.node), - }); - return 1; } function resolveMessageCacheBucket(params: { - persistedPath?: string; + scopeKey?: string; maxMessages: number; }): TelegramMessageCacheBucket { - const { persistedPath, maxMessages } = params; - if (!persistedPath) { - return { messages: new Map(), persistedEntryCount: 0 }; + const { scopeKey, maxMessages } = params; + if (!scopeKey) { + return { messages: new Map() }; } - const existing = persistedMessageCacheBuckets.get(persistedPath); + const existing = persistedMessageCacheBuckets.get(scopeKey); if (existing) { - if (!fs.existsSync(persistedPath)) { - existing.messages.clear(); - existing.persistedEntryCount = 0; - } return existing; } - const persisted = readPersistedMessages(persistedPath, maxMessages); const bucket = { - messages: persisted.messages, - persistedEntryCount: persisted.persistedEntryCount, + scopeKey, + messages: readPersistedMessages(scopeKey, maxMessages), }; - persistedMessageCacheBuckets.set(persistedPath, bucket); + persistedMessageCacheBuckets.set(scopeKey, bucket); return bucket; } @@ -288,11 +265,11 @@ export function createTelegramMessageCache(params?: { persistedPath?: string; }): TelegramMessageCache { const maxMessages = params?.maxMessages ?? DEFAULT_MAX_MESSAGES; - const bucket = resolveMessageCacheBucket({ - persistedPath: params?.persistedPath, + const scopeKey = params?.persistedPath; + const { messages } = resolveMessageCacheBucket({ + scopeKey, maxMessages, }); - const { messages } = bucket; const get: TelegramMessageCache["get"] = ({ accountId, chatId, messageId }) => { if (!messageId) { @@ -337,17 +314,7 @@ export function createTelegramMessageCache(params?: { messages.set(key, entry); trimMessages(messages, maxMessages); try { - bucket.persistedEntryCount += appendPersistedMessage({ - key, - node: entry, - persistedPath: params?.persistedPath, - }); - if (bucket.persistedEntryCount > maxMessages * COMPACT_THRESHOLD_RATIO) { - bucket.persistedEntryCount = replacePersistedMessages({ - messages, - persistedPath: params?.persistedPath, - }); - } + persistMessages({ messages, scopeKey }); } catch (error) { logVerbose(`telegram: failed to persist message cache: ${String(error)}`); } diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index d58039800d0..10081b8e760 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -5,11 +5,10 @@ import { isAcpRuntimeSpawnAvailable } from "../../../acp/runtime/availability.js import { buildHierarchyReinforcementMessage } from "../../../auto-reply/handoff-summarizer.js"; import { filterHeartbeatPairs } from "../../../auto-reply/heartbeat-filter.js"; import { getRuntimeConfig } from "../../../config/config.js"; -import { resolveStorePath } from "../../../config/sessions/paths.js"; import { - loadSessionStore, - runQuotaSuspensionMaintenance, - updateSessionStoreEntry, + getSessionEntry, + listSessionEntries, + patchSessionEntry, } from "../../../config/sessions/store.js"; import { hasSqliteSessionTranscriptEvents } from "../../../config/sessions/transcript-store.sqlite.js"; import { resolveContextEngineOwnerPluginId } from "../../../context-engine/registry.js"; @@ -2441,15 +2440,14 @@ export async function runEmbeddedAttempt( }); if (params.sessionKey && !isRawModelRun) { - const storePath = resolveStorePath(params.config?.session?.store, { + const sessionEntry = getSessionEntry({ agentId: sessionAgentId, + sessionKey: params.sessionKey, }); - await runQuotaSuspensionMaintenance({ storePath }); - const store = loadSessionStore(storePath); - const sessionEntry = store[params.sessionKey]; const suspension = sessionEntry?.quotaSuspension; if (suspension?.state === "resuming") { - const subagents = Object.values(store) + const subagents = listSessionEntries({ agentId: sessionAgentId }) + .map(({ entry }) => entry) .filter((s) => s.spawnedBy === sessionEntry.sessionId) .map((s) => ({ sessionId: s.sessionId, @@ -2461,8 +2459,8 @@ export async function runEmbeddedAttempt( activeSubagents: subagents, }); validated.push(handoffMsg); - await updateSessionStoreEntry({ - storePath, + await patchSessionEntry({ + agentId: sessionAgentId, sessionKey: params.sessionKey, update: async (entry) => { if (entry.quotaSuspension?.state !== "resuming") { diff --git a/src/agents/session-suspension.test.ts b/src/agents/session-suspension.test.ts index 376484bd2da..2bb15968bb3 100644 --- a/src/agents/session-suspension.test.ts +++ b/src/agents/session-suspension.test.ts @@ -3,7 +3,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js"; import { CommandLane } from "../process/lanes.js"; const sessionStoreMocks = vi.hoisted(() => ({ - updateSessionStoreEntry: vi.fn(async (params: { update: (entry: unknown) => unknown }) => { + patchSessionEntry: vi.fn(async (params: { update: (entry: unknown) => unknown }) => { await params.update({ sessionId: "session-1" }); }), })); @@ -19,7 +19,7 @@ vi.mock("../process/command-queue.js", () => commandQueueMocks); vi.mock("./command/session.js", () => ({ resolveStoredSessionKeyForSessionId: () => ({ sessionKey: "session-key", - storePath: "/tmp/openclaw-session-suspension-test/sessions.json", + agentId: "main", }), })); @@ -41,7 +41,7 @@ describe("session suspension", () => { const { cancelLaneAutoResume } = await import("./session-suspension.js"); cancelLaneAutoResume(CommandLane.Main); vi.useRealTimers(); - sessionStoreMocks.updateSessionStoreEntry.mockClear(); + sessionStoreMocks.patchSessionEntry.mockClear(); commandQueueMocks.setCommandLaneConcurrency.mockClear(); }); diff --git a/src/agents/session-suspension.ts b/src/agents/session-suspension.ts index f136bdbf16a..3c8c35b3e1d 100644 --- a/src/agents/session-suspension.ts +++ b/src/agents/session-suspension.ts @@ -1,6 +1,6 @@ import path from "node:path"; import { resolveAgentMaxConcurrent, resolveSubagentMaxConcurrent } from "../config/agent-limits.js"; -import { updateSessionStoreEntry } from "../config/sessions.js"; +import { patchSessionEntry } from "../config/sessions.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { setCommandLaneConcurrency } from "../process/command-queue.js"; @@ -85,7 +85,7 @@ export async function suspendSession(params: { return; } - const { sessionKey, storePath } = resolveStoredSessionKeyForSessionId({ + const { sessionKey, agentId } = resolveStoredSessionKeyForSessionId({ cfg: params.cfg, sessionId: params.sessionId, agentId: params.agentDir ? path.basename(params.agentDir) : undefined, @@ -99,8 +99,8 @@ export async function suspendSession(params: { const now = Date.now(); try { - await updateSessionStoreEntry({ - storePath, + await patchSessionEntry({ + agentId, sessionKey, update: async () => ({ quotaSuspension: { diff --git a/src/gateway/server-methods/usage.sessions-usage.test.ts b/src/gateway/server-methods/usage.sessions-usage.test.ts index 8808979192c..bde7e43a58a 100644 --- a/src/gateway/server-methods/usage.sessions-usage.test.ts +++ b/src/gateway/server-methods/usage.sessions-usage.test.ts @@ -2,6 +2,7 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; +import { createSqliteSessionTranscriptLocator } from "../../config/sessions/paths.js"; import { replaceSqliteSessionTranscriptEvents } from "../../config/sessions/transcript-store.sqlite.js"; import { withEnvAsync } from "../../test-utils/env.js"; @@ -225,21 +226,33 @@ describe("sessions.usage", () => { try { await withEnvAsync({ OPENCLAW_STATE_DIR: stateDir }, async () => { - const agentSessionsDir = path.join(stateDir, "agents", "opus", "sessions"); - fs.mkdirSync(agentSessionsDir, { recursive: true }); - fs.writeFileSync(path.join(agentSessionsDir, "current.jsonl"), "", "utf-8"); - fs.writeFileSync( - path.join(agentSessionsDir, "old.jsonl.reset.2026-02-01T00-00-00.000Z"), - "", - "utf-8", - ); + const currentSessionFile = createSqliteSessionTranscriptLocator({ + agentId: "opus", + sessionId: "current", + }); + const oldSessionFile = createSqliteSessionTranscriptLocator({ + agentId: "opus", + sessionId: "old", + }); + replaceSqliteSessionTranscriptEvents({ + agentId: "opus", + sessionId: "current", + transcriptPath: currentSessionFile, + events: [{ type: "session", id: "current" }], + }); + replaceSqliteSessionTranscriptEvents({ + agentId: "opus", + sessionId: "old", + transcriptPath: oldSessionFile, + events: [{ type: "session", id: "old" }], + }); - vi.mocked(loadCombinedSessionStoreForGateway).mockReturnValue({ - storePath: "(multiple)", - store: { + vi.mocked(loadCombinedSessionEntriesForGateway).mockReturnValue({ + databasePath: "(multiple)", + entries: { [storeKey]: { sessionId: "current", - sessionFile: "current.jsonl", + sessionFile: currentSessionFile, updatedAt: 1_000, usageFamilyKey: storeKey, usageFamilySessionIds: ["old", "current"],