refactor: remove legacy session store remnants

This commit is contained in:
Peter Steinberger
2026-05-08 19:03:13 +01:00
parent af8fd48d6a
commit a5b59393b9
6 changed files with 110 additions and 134 deletions

View File

@@ -171,9 +171,7 @@ export const registerTelegramHandlers = ({
const mediaGroupBuffer = new Map<string, MediaGroupEntry>();
let mediaGroupProcessing: Promise<void> = Promise.resolve();
const messageCache = createTelegramMessageCache({
persistedPath: resolveTelegramMessageCachePath(
telegramDeps.resolveStorePath(cfg.session?.store),
),
persistedPath: resolveTelegramMessageCachePath(accountId),
});
type TextFragmentEntry = {

View File

@@ -1,9 +1,9 @@
import fs from "node:fs";
import { createHash } from "node:crypto";
import type { Message } from "@grammyjs/types";
import { formatLocationText } from "openclaw/plugin-sdk/channel-inbound";
import { createPluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import type { MsgContext } from "openclaw/plugin-sdk/reply-runtime";
import { logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { appendRegularFileSync, replaceFileAtomicSync } from "openclaw/plugin-sdk/security-runtime";
import { resolveTelegramPrimaryMedia } from "./bot/body-helpers.js";
import {
buildSenderName,
@@ -50,13 +50,28 @@ export type TelegramMessageCache = {
type MessageWithExternalReply = Message & { external_reply?: Message };
type TelegramMessageCacheBucket = {
scopeKey?: string;
messages: Map<string, TelegramCachedMessageNode>;
persistedEntryCount: number;
};
type TelegramPersistedMessageCacheNode = {
scopeKey: string;
cacheKey: string;
sourceMessage: Message;
threadId?: string;
};
const DEFAULT_MAX_MESSAGES = 5000;
const COMPACT_THRESHOLD_RATIO = 2;
const DEFAULT_TTL_MS = 7 * 24 * 60 * 60 * 1000;
const persistedMessageCacheBuckets = new Map<string, TelegramMessageCacheBucket>();
const MESSAGE_CACHE_STORE = createPluginStateSyncKeyedStore<TelegramPersistedMessageCacheNode>(
"telegram",
{
namespace: "message-cache",
maxEntries: 100_000,
defaultTtlMs: DEFAULT_TTL_MS,
},
);
function telegramMessageCacheKey(params: {
accountId: string;
@@ -71,7 +86,8 @@ function telegramMessageCacheKeyPrefix(params: { accountId: string; chatId: stri
}
export function resolveTelegramMessageCachePath(storePath: string): string {
return `${storePath}.telegram-messages.json`;
const trimmed = storePath.trim();
return trimmed ? `telegram-message-cache:${trimmed}` : "telegram-message-cache:default";
}
function resolveReplyMessage(msg: Message): Message | undefined {
@@ -157,17 +173,6 @@ function parsePersistedNode(value: unknown): TelegramCachedMessageNode | null {
return normalizeMessageNode(value.sourceMessage, Number.isFinite(threadId) ? { threadId } : {});
}
function parsePersistedEntry(value: unknown): {
key: string;
node: TelegramCachedMessageNode;
} | null {
if (!isRecord(value) || !isString(value.key)) {
return null;
}
const node = parsePersistedNode(value.node);
return node ? { key: value.key, node } : null;
}
function trimMessages(messages: Map<string, TelegramCachedMessageNode>, maxMessages: number): void {
while (messages.size > maxMessages) {
const oldest = messages.keys().next().value;
@@ -178,103 +183,75 @@ function trimMessages(messages: Map<string, TelegramCachedMessageNode>, maxMessa
}
}
function readPersistedMessages(filePath: string, maxMessages: number) {
function persistedMessageEntryKey(scopeKey: string, cacheKey: string): string {
return createHash("sha256").update(`${scopeKey}\0${cacheKey}`, "utf8").digest("hex").slice(0, 32);
}
function readPersistedMessages(scopeKey: string, maxMessages: number) {
const messages = new Map<string, TelegramCachedMessageNode>();
let persistedEntryCount = 0;
if (!fs.existsSync(filePath)) {
return { messages, persistedEntryCount };
}
try {
for (const line of fs.readFileSync(filePath, "utf-8").split("\n")) {
if (!line.trim()) {
for (const entry of MESSAGE_CACHE_STORE.entries()
.filter((entry) => entry.value.scopeKey === scopeKey)
.slice(-maxMessages)) {
if (!isString(entry.value.cacheKey)) {
continue;
}
const entry = parsePersistedEntry(JSON.parse(line));
if (!entry) {
continue;
const node = parsePersistedNode(entry.value);
if (node) {
messages.set(entry.value.cacheKey, node);
}
persistedEntryCount++;
messages.delete(entry.key);
messages.set(entry.key, entry.node);
trimMessages(messages, maxMessages);
}
} catch (error) {
logVerbose(`telegram: failed to read message cache: ${String(error)}`);
}
return { messages, persistedEntryCount };
return messages;
}
function serializePersistedEntry(key: string, node: TelegramCachedMessageNode): string {
return `${JSON.stringify({
key,
node: {
sourceMessage: node.sourceMessage,
...(node.threadId ? { threadId: node.threadId } : {}),
},
})}\n`;
}
function replacePersistedMessages(params: {
function persistMessages(params: {
messages: Map<string, TelegramCachedMessageNode>;
persistedPath?: string;
}): number {
const { persistedPath, messages } = params;
if (!persistedPath) {
return messages.size;
scopeKey?: string;
}) {
const { scopeKey, messages } = params;
if (!scopeKey) {
return;
}
if (messages.size === 0) {
fs.rmSync(persistedPath, { force: true });
return 0;
const retained = new Set(messages.keys());
for (const entry of MESSAGE_CACHE_STORE.entries()) {
if (entry.value.scopeKey === scopeKey && !retained.has(entry.value.cacheKey)) {
MESSAGE_CACHE_STORE.delete(entry.key);
}
}
const serialized = Array.from(messages, ([key, node]) => serializePersistedEntry(key, node)).join(
"",
);
replaceFileAtomicSync({
filePath: persistedPath,
content: serialized,
tempPrefix: ".telegram-message-cache",
});
return messages.size;
}
function appendPersistedMessage(params: {
key: string;
node: TelegramCachedMessageNode;
persistedPath?: string;
}): number {
const { persistedPath } = params;
if (!persistedPath) {
return 0;
for (const [key, node] of messages) {
MESSAGE_CACHE_STORE.register(
persistedMessageEntryKey(scopeKey, key),
{
scopeKey,
cacheKey: key,
sourceMessage: node.sourceMessage,
...(node.threadId ? { threadId: node.threadId } : {}),
},
{ ttlMs: DEFAULT_TTL_MS },
);
}
appendRegularFileSync({
filePath: persistedPath,
content: serializePersistedEntry(params.key, params.node),
});
return 1;
}
function resolveMessageCacheBucket(params: {
persistedPath?: string;
scopeKey?: string;
maxMessages: number;
}): TelegramMessageCacheBucket {
const { persistedPath, maxMessages } = params;
if (!persistedPath) {
return { messages: new Map<string, TelegramCachedMessageNode>(), persistedEntryCount: 0 };
const { scopeKey, maxMessages } = params;
if (!scopeKey) {
return { messages: new Map<string, TelegramCachedMessageNode>() };
}
const existing = persistedMessageCacheBuckets.get(persistedPath);
const existing = persistedMessageCacheBuckets.get(scopeKey);
if (existing) {
if (!fs.existsSync(persistedPath)) {
existing.messages.clear();
existing.persistedEntryCount = 0;
}
return existing;
}
const persisted = readPersistedMessages(persistedPath, maxMessages);
const bucket = {
messages: persisted.messages,
persistedEntryCount: persisted.persistedEntryCount,
scopeKey,
messages: readPersistedMessages(scopeKey, maxMessages),
};
persistedMessageCacheBuckets.set(persistedPath, bucket);
persistedMessageCacheBuckets.set(scopeKey, bucket);
return bucket;
}
@@ -283,11 +260,11 @@ export function createTelegramMessageCache(params?: {
persistedPath?: string;
}): TelegramMessageCache {
const maxMessages = params?.maxMessages ?? DEFAULT_MAX_MESSAGES;
const bucket = resolveMessageCacheBucket({
persistedPath: params?.persistedPath,
const scopeKey = params?.persistedPath;
const { messages } = resolveMessageCacheBucket({
scopeKey,
maxMessages,
});
const { messages } = bucket;
const get: TelegramMessageCache["get"] = ({ accountId, chatId, messageId }) => {
if (!messageId) {
@@ -332,17 +309,7 @@ export function createTelegramMessageCache(params?: {
messages.set(key, entry);
trimMessages(messages, maxMessages);
try {
bucket.persistedEntryCount += appendPersistedMessage({
key,
node: entry,
persistedPath: params?.persistedPath,
});
if (bucket.persistedEntryCount > maxMessages * COMPACT_THRESHOLD_RATIO) {
bucket.persistedEntryCount = replacePersistedMessages({
messages,
persistedPath: params?.persistedPath,
});
}
persistMessages({ messages, scopeKey });
} catch (error) {
logVerbose(`telegram: failed to persist message cache: ${String(error)}`);
}

View File

@@ -5,11 +5,10 @@ import { isAcpRuntimeSpawnAvailable } from "../../../acp/runtime/availability.js
import { buildHierarchyReinforcementMessage } from "../../../auto-reply/handoff-summarizer.js";
import { filterHeartbeatPairs } from "../../../auto-reply/heartbeat-filter.js";
import { getRuntimeConfig } from "../../../config/config.js";
import { resolveStorePath } from "../../../config/sessions/paths.js";
import {
loadSessionStore,
runQuotaSuspensionMaintenance,
updateSessionStoreEntry,
getSessionEntry,
listSessionEntries,
patchSessionEntry,
} from "../../../config/sessions/store.js";
import { hasSqliteSessionTranscriptEvents } from "../../../config/sessions/transcript-store.sqlite.js";
import { resolveContextEngineOwnerPluginId } from "../../../context-engine/registry.js";
@@ -2203,15 +2202,14 @@ export async function runEmbeddedAttempt(
});
if (params.sessionKey && !isRawModelRun) {
const storePath = resolveStorePath(params.config?.session?.store, {
const sessionEntry = getSessionEntry({
agentId: sessionAgentId,
sessionKey: params.sessionKey,
});
await runQuotaSuspensionMaintenance({ storePath });
const store = loadSessionStore(storePath);
const sessionEntry = store[params.sessionKey];
const suspension = sessionEntry?.quotaSuspension;
if (suspension?.state === "resuming") {
const subagents = Object.values(store)
const subagents = listSessionEntries({ agentId: sessionAgentId })
.map(({ entry }) => entry)
.filter((s) => s.spawnedBy === sessionEntry.sessionId)
.map((s) => ({
sessionId: s.sessionId,
@@ -2223,8 +2221,8 @@ export async function runEmbeddedAttempt(
activeSubagents: subagents,
});
validated.push(handoffMsg);
await updateSessionStoreEntry({
storePath,
await patchSessionEntry({
agentId: sessionAgentId,
sessionKey: params.sessionKey,
update: async (entry) => {
if (entry.quotaSuspension?.state !== "resuming") {

View File

@@ -3,7 +3,7 @@ import type { OpenClawConfig } from "../config/types.openclaw.js";
import { CommandLane } from "../process/lanes.js";
const sessionStoreMocks = vi.hoisted(() => ({
updateSessionStoreEntry: vi.fn(async (params: { update: (entry: unknown) => unknown }) => {
patchSessionEntry: vi.fn(async (params: { update: (entry: unknown) => unknown }) => {
await params.update({ sessionId: "session-1" });
}),
}));
@@ -19,7 +19,7 @@ vi.mock("../process/command-queue.js", () => commandQueueMocks);
vi.mock("./command/session.js", () => ({
resolveStoredSessionKeyForSessionId: () => ({
sessionKey: "session-key",
storePath: "/tmp/openclaw-session-suspension-test/sessions.json",
agentId: "main",
}),
}));
@@ -41,7 +41,7 @@ describe("session suspension", () => {
const { cancelLaneAutoResume } = await import("./session-suspension.js");
cancelLaneAutoResume(CommandLane.Main);
vi.useRealTimers();
sessionStoreMocks.updateSessionStoreEntry.mockClear();
sessionStoreMocks.patchSessionEntry.mockClear();
commandQueueMocks.setCommandLaneConcurrency.mockClear();
});

View File

@@ -1,6 +1,6 @@
import path from "node:path";
import { resolveAgentMaxConcurrent, resolveSubagentMaxConcurrent } from "../config/agent-limits.js";
import { updateSessionStoreEntry } from "../config/sessions.js";
import { patchSessionEntry } from "../config/sessions.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { setCommandLaneConcurrency } from "../process/command-queue.js";
@@ -85,7 +85,7 @@ export async function suspendSession(params: {
return;
}
const { sessionKey, storePath } = resolveStoredSessionKeyForSessionId({
const { sessionKey, agentId } = resolveStoredSessionKeyForSessionId({
cfg: params.cfg,
sessionId: params.sessionId,
agentId: params.agentDir ? path.basename(params.agentDir) : undefined,
@@ -99,8 +99,8 @@ export async function suspendSession(params: {
const now = Date.now();
try {
await updateSessionStoreEntry({
storePath,
await patchSessionEntry({
agentId,
sessionKey,
update: async () => ({
quotaSuspension: {

View File

@@ -2,6 +2,7 @@ import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { createSqliteSessionTranscriptLocator } from "../../config/sessions/paths.js";
import { replaceSqliteSessionTranscriptEvents } from "../../config/sessions/transcript-store.sqlite.js";
import { withEnvAsync } from "../../test-utils/env.js";
@@ -225,21 +226,33 @@ describe("sessions.usage", () => {
try {
await withEnvAsync({ OPENCLAW_STATE_DIR: stateDir }, async () => {
const agentSessionsDir = path.join(stateDir, "agents", "opus", "sessions");
fs.mkdirSync(agentSessionsDir, { recursive: true });
fs.writeFileSync(path.join(agentSessionsDir, "current.jsonl"), "", "utf-8");
fs.writeFileSync(
path.join(agentSessionsDir, "old.jsonl.reset.2026-02-01T00-00-00.000Z"),
"",
"utf-8",
);
const currentSessionFile = createSqliteSessionTranscriptLocator({
agentId: "opus",
sessionId: "current",
});
const oldSessionFile = createSqliteSessionTranscriptLocator({
agentId: "opus",
sessionId: "old",
});
replaceSqliteSessionTranscriptEvents({
agentId: "opus",
sessionId: "current",
transcriptPath: currentSessionFile,
events: [{ type: "session", id: "current" }],
});
replaceSqliteSessionTranscriptEvents({
agentId: "opus",
sessionId: "old",
transcriptPath: oldSessionFile,
events: [{ type: "session", id: "old" }],
});
vi.mocked(loadCombinedSessionStoreForGateway).mockReturnValue({
storePath: "(multiple)",
store: {
vi.mocked(loadCombinedSessionEntriesForGateway).mockReturnValue({
databasePath: "(multiple)",
entries: {
[storeKey]: {
sessionId: "current",
sessionFile: "current.jsonl",
sessionFile: currentSessionFile,
updatedAt: 1_000,
usageFamilyKey: storeKey,
usageFamilySessionIds: ["old", "current"],