diff --git a/extensions/matrix/src/session-route.test.ts b/extensions/matrix/src/session-route.test.ts index 30045046f2a..3a80af0e727 100644 --- a/extensions/matrix/src/session-route.test.ts +++ b/extensions/matrix/src/session-route.test.ts @@ -1,11 +1,13 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; +import { upsertSessionEntry } from "openclaw/plugin-sdk/session-store-runtime"; import { afterEach, describe, expect, it } from "vitest"; import type { OpenClawConfig } from "./runtime-api.js"; import { resolveMatrixOutboundSessionRoute } from "./session-route.js"; const tempDirs = new Set(); +const previousStateDir = process.env.OPENCLAW_STATE_DIR; const currentDmSessionKey = "agent:main:matrix:channel:!dm:example.org"; type MatrixChannelConfig = NonNullable["matrix"]>; @@ -26,22 +28,26 @@ const defaultAccountPerRoomDmMatrixConfig = { }, } satisfies MatrixChannelConfig; -function createTempStore(entries: Record): string { +function seedTempSessionEntries(entries: Record): void { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "matrix-session-route-")); tempDirs.add(tempDir); - const storePath = path.join(tempDir, "sessions.json"); - fs.writeFileSync(storePath, JSON.stringify(entries), "utf8"); - return storePath; + process.env.OPENCLAW_STATE_DIR = tempDir; + for (const [sessionKey, entry] of Object.entries(entries)) { + upsertSessionEntry({ + agentId: "main", + sessionKey, + entry: entry as never, + }); + } } function createMatrixRouteConfig( entries: Record, matrix: MatrixChannelConfig = perRoomDmMatrixConfig, ): OpenClawConfig { + seedTempSessionEntries(entries); return { - session: { - store: createTempStore(entries), - }, + session: {}, channels: { matrix, }, @@ -173,6 +179,11 @@ function expectFallbackUserRoute( } afterEach(() => { + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } for (const tempDir of tempDirs) { fs.rmSync(tempDir, { recursive: true, force: true }); } diff --git a/extensions/mattermost/src/mattermost/model-picker.test.ts b/extensions/mattermost/src/mattermost/model-picker.test.ts index 1c489947a76..5a6519e9676 100644 --- a/extensions/mattermost/src/mattermost/model-picker.test.ts +++ b/extensions/mattermost/src/mattermost/model-picker.test.ts @@ -140,9 +140,7 @@ describe("Mattermost model picker", () => { const testDir = fs.mkdtempSync(path.join(os.tmpdir(), "mm-model-picker-")); try { const cfg: OpenClawConfig = { - session: { - store: path.join(testDir, "{agentId}.json"), - }, + session: {}, agents: { defaults: { model: "anthropic/claude-opus-4-5", diff --git a/extensions/telegram/src/bot-message-context.session-recreate.test-support.ts b/extensions/telegram/src/bot-message-context.session-recreate.test-support.ts index 2ab161d807c..9fa58581c84 100644 --- a/extensions/telegram/src/bot-message-context.session-recreate.test-support.ts +++ b/extensions/telegram/src/bot-message-context.session-recreate.test-support.ts @@ -4,11 +4,7 @@ import { clearRuntimeConfigSnapshot, setRuntimeConfigSnapshot, } from "openclaw/plugin-sdk/runtime-config-snapshot"; -import { - clearSessionStoreCacheForTest, - loadSessionStore, - updateSessionStore, -} from "openclaw/plugin-sdk/session-store-runtime"; +import { getSessionEntry } from "openclaw/plugin-sdk/session-store-runtime"; import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path"; import { afterAll, afterEach, beforeAll, describe, expect, it } from "vitest"; import { buildTelegramMessageContextForTest } from "./bot-message-context.test-harness.js"; @@ -54,7 +50,6 @@ describe("Telegram direct session recreation after delete", () => { afterEach(() => { clearRuntimeConfigSnapshot(); - clearSessionStoreCacheForTest(); }); afterAll(async () => { @@ -75,30 +70,9 @@ describe("Telegram direct session recreation after delete", () => { messages: { groupChat: { mentionPatterns: [] } }, session: { dmScope: "per-channel-peer" as const, - store: storePath, }, }; setRuntimeConfigSnapshot(cfg as never); - await fs.writeFile( - storePath, - JSON.stringify( - { - [TELEGRAM_DIRECT_KEY]: { - sessionId: "old-session", - updatedAt: 1_700_000_000_000, - chatType: "direct", - channel: "telegram", - }, - }, - null, - 2, - ), - "utf-8", - ); - await updateSessionStore(storePath, (store) => { - delete store[TELEGRAM_DIRECT_KEY]; - }); - const context = await buildTelegramMessageContextForTest({ cfg, message: { @@ -112,16 +86,18 @@ describe("Telegram direct session recreation after delete", () => { }); expect(context).not.toBeNull(); await context?.turn.recordInboundSession({ - storePath: context.turn.storePath, sessionKey: String(context.ctxPayload.SessionKey), ctx: context.ctxPayload as never, updateLastRoute: context.turn.record.updateLastRoute, onRecordError: context.turn.record.onRecordError, }); - const store = loadSessionStore(storePath); + const entry = getSessionEntry({ + agentId: "main", + sessionKey: TELEGRAM_DIRECT_KEY, + }); expect(context?.ctxPayload?.SessionKey).toBe(TELEGRAM_DIRECT_KEY); - expect(store[TELEGRAM_DIRECT_KEY]).toEqual( + expect(entry).toEqual( expect.objectContaining({ lastChannel: "telegram", lastTo: "telegram:7463849194", diff --git a/extensions/telegram/src/bot.test.ts b/extensions/telegram/src/bot.test.ts index c7af5056542..83ad3640adc 100644 --- a/extensions/telegram/src/bot.test.ts +++ b/extensions/telegram/src/bot.test.ts @@ -4,7 +4,6 @@ import { clearPluginInteractiveHandlers, registerPluginInteractiveHandler, } from "openclaw/plugin-sdk/plugin-runtime"; -import { loadSessionStore } from "openclaw/plugin-sdk/session-store-runtime"; import { mockPinnedHostnameResolution } from "openclaw/plugin-sdk/test-env"; import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import type { TelegramInteractiveHandlerContext } from "./interactive-dispatch.js"; @@ -17,6 +16,7 @@ const { getFileSpy, getChatSpy, getLoadConfigMock, + getSessionStoreEntriesForTest, getLoadWebMediaMock, getReadChannelAllowFromStoreMock, getOnHandler, @@ -190,9 +190,7 @@ describe("createTelegramBot", () => { capabilities: { inlineButtons: "dm" }, }, }, - session: { - store: storePath, - }, + session: {}, } satisfies NonNullable[0]["config"]>; loadConfig.mockReturnValue(config); @@ -226,7 +224,7 @@ describe("createTelegramBot", () => { expect(replySpy).not.toHaveBeenCalled(); expect(editMessageTextSpy).not.toHaveBeenCalled(); - expect(loadSessionStore(storePath)).toStrictEqual({}); + expect(getSessionStoreEntriesForTest()).toStrictEqual({}); expect(answerCallbackQuerySpy).toHaveBeenCalledWith("cbq-model-authz-bypass-1"); } finally { await rm(storePath, { force: true }); @@ -265,9 +263,7 @@ describe("createTelegramBot", () => { groups: { "*": { requireMention: false } }, }, }, - session: { - store: storePath, - }, + session: {}, } satisfies NonNullable[0]["config"]>; loadConfig.mockReturnValue(config); @@ -299,7 +295,7 @@ describe("createTelegramBot", () => { expect(replySpy).not.toHaveBeenCalled(); expect(editMessageTextSpy).not.toHaveBeenCalled(); - expect(loadSessionStore(storePath)).toStrictEqual({}); + expect(getSessionStoreEntriesForTest()).toStrictEqual({}); expect(answerCallbackQuerySpy).toHaveBeenCalledWith("cbq-group-model-authz-1"); } finally { await rm(storePath, { force: true }); @@ -338,9 +334,7 @@ describe("createTelegramBot", () => { groups: { "*": { requireMention: false } }, }, }, - session: { - store: storePath, - }, + session: {}, } satisfies NonNullable[0]["config"]>; loadConfig.mockImplementation(() => currentConfig); @@ -381,7 +375,7 @@ describe("createTelegramBot", () => { expect(replySpy).not.toHaveBeenCalled(); expect(editMessageTextSpy).not.toHaveBeenCalled(); - expect(loadSessionStore(storePath)).toStrictEqual({}); + expect(getSessionStoreEntriesForTest()).toStrictEqual({}); expect(answerCallbackQuerySpy).toHaveBeenCalledWith("cbq-group-model-authz-runtime-1"); } finally { loadConfig.mockReset(); @@ -1061,9 +1055,7 @@ describe("createTelegramBot", () => { allowFrom: ["*"], }, }, - session: { - store: storePath, - }, + session: {}, } satisfies NonNullable[0]["config"]>; await rm(storePath, { force: true }); @@ -1104,7 +1096,7 @@ describe("createTelegramBot", () => { "Session selection cleared. Runtime unchanged. New replies use the agent's configured default.", ); - const entry = Object.values(loadSessionStore(storePath))[0]; + const entry = Object.values(getSessionStoreEntriesForTest())[0]; expect(entry?.providerOverride).toBeUndefined(); expect(entry?.modelOverride).toBeUndefined(); expect(answerCallbackQuerySpy).toHaveBeenCalledWith("cbq-model-compact-1"); @@ -1212,9 +1204,7 @@ describe("createTelegramBot", () => { allowFrom: ["*"], }, }, - session: { - store: storePath, - }, + session: {}, }; await rm(storePath, { force: true }); @@ -1255,7 +1245,7 @@ describe("createTelegramBot", () => { "Session selection cleared. Runtime unchanged. New replies use the agent's configured default.", ); - const entry = Object.values(loadSessionStore(storePath))[0]; + const entry = Object.values(getSessionStoreEntriesForTest())[0]; expect(entry?.providerOverride).toBeUndefined(); expect(entry?.modelOverride).toBeUndefined(); expect(answerCallbackQuerySpy).toHaveBeenCalledWith("cbq-model-default-1"); @@ -1289,9 +1279,7 @@ describe("createTelegramBot", () => { allowFrom: ["*"], }, }, - session: { - store: storePath, - }, + session: {}, } satisfies NonNullable[0]["config"]>; loadConfig.mockReturnValue(config); @@ -1330,7 +1318,7 @@ describe("createTelegramBot", () => { expect.objectContaining({ parse_mode: "HTML" }), ); - const entry = Object.values(loadSessionStore(storePath))[0]; + const entry = Object.values(getSessionStoreEntriesForTest())[0]; expect(entry?.providerOverride).toBe("openai"); expect(entry?.modelOverride).toBe("gpt-5.4"); expect(answerCallbackQuerySpy).toHaveBeenCalledWith("cbq-model-html-1"); @@ -1369,9 +1357,7 @@ describe("createTelegramBot", () => { allowFrom: ["*"], }, }, - session: { - store: storePath, - }, + session: {}, } satisfies NonNullable[0]["config"]>; // Fresh config: default changed to anthropic/claude-opus-4-6 @@ -1420,7 +1406,7 @@ describe("createTelegramBot", () => { // Override must be persisted (not cleared) because openai/gpt-5.4 is // NOT the default in the fresh config. - const entry = Object.values(loadSessionStore(storePath))[0]; + const entry = Object.values(getSessionStoreEntriesForTest())[0]; expect(entry?.providerOverride).toBe("openai"); expect(entry?.modelOverride).toBe("gpt-5.4"); } finally { diff --git a/extensions/whatsapp/src/auto-reply/web-auto-reply-utils.test.ts b/extensions/whatsapp/src/auto-reply/web-auto-reply-utils.test.ts index 30cbb117989..1033069e3b6 100644 --- a/extensions/whatsapp/src/auto-reply/web-auto-reply-utils.test.ts +++ b/extensions/whatsapp/src/auto-reply/web-auto-reply-utils.test.ts @@ -2,19 +2,18 @@ import fs from "node:fs/promises"; import path from "node:path"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types"; import { normalizeMainKey } from "openclaw/plugin-sdk/routing"; -import { saveSessionStore } from "openclaw/plugin-sdk/session-store-runtime"; import { withTempDir } from "openclaw/plugin-sdk/test-env"; import { describe, expect, it, vi } from "vitest"; import type { WhatsAppSendResult } from "../inbound/send-result.js"; import { evaluateSessionFreshness, - loadSessionStore, + getSessionEntry, resolveChannelResetConfig, resolveSessionKey, resolveSessionResetPolicy, resolveSessionResetType, - resolveStorePath, resolveThreadFlag, + upsertSessionEntry, } from "./config.runtime.js"; import { debugMention, @@ -71,8 +70,7 @@ function getSessionSnapshotForTest( { From: from, To: "", Body: "" }, normalizeMainKey(sessionCfg?.mainKey), ); - const store = loadSessionStore(resolveStorePath(sessionCfg?.store)); - const entry = store[key]; + const entry = getSessionEntry({ agentId: "main", sessionKey: key }); const isThread = resolveThreadFlag({ sessionKey: key, messageThreadId: ctx?.messageThreadId ?? null, @@ -260,35 +258,46 @@ describe("getSessionSnapshot", () => { vi.setSystemTime(new Date(2026, 0, 18, 5, 0, 0)); try { await withTempDir("openclaw-snapshot-", async (root) => { - const storePath = path.join(root, "sessions.json"); + const previousStateDir = process.env.OPENCLAW_STATE_DIR; + process.env.OPENCLAW_STATE_DIR = root; + const storePath = path.join(root, "agents", "main", "sessions", "sessions.json"); const sessionKey = "agent:main:whatsapp:dm:s1"; - await saveSessionStore(storePath, { - [sessionKey]: { - sessionId: "snapshot-session", - updatedAt: new Date(2026, 0, 18, 3, 30, 0).getTime(), - lastChannel: "whatsapp", - }, - }); - - const cfg = { - session: { - store: storePath, - reset: { mode: "daily", atHour: 4, idleMinutes: 240 }, - resetByChannel: { - whatsapp: { mode: "idle", idleMinutes: 360 }, + try { + upsertSessionEntry({ + agentId: "main", + sessionKey, + entry: { + sessionId: "snapshot-session", + updatedAt: new Date(2026, 0, 18, 3, 30, 0).getTime(), + lastChannel: "whatsapp", }, - }, - } as OpenClawConfig; + }); - const snapshot = getSessionSnapshotForTest(cfg, "whatsapp:+15550001111", { - sessionKey, - }); + const cfg = { + session: { + reset: { mode: "daily", atHour: 4, idleMinutes: 240 }, + resetByChannel: { + whatsapp: { mode: "idle", idleMinutes: 360 }, + }, + }, + } as OpenClawConfig; - expect(snapshot.resetPolicy.mode).toBe("idle"); - expect(snapshot.resetPolicy.idleMinutes).toBe(360); - expect(snapshot.fresh).toBe(true); - expect(snapshot.dailyResetAt).toBeUndefined(); + const snapshot = getSessionSnapshotForTest(cfg, "whatsapp:+15550001111", { + sessionKey, + }); + + expect(snapshot.resetPolicy.mode).toBe("idle"); + expect(snapshot.resetPolicy.idleMinutes).toBe(360); + expect(snapshot.fresh).toBe(true); + expect(snapshot.dailyResetAt).toBeUndefined(); + } finally { + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } + } }); } finally { vi.useRealTimers(); diff --git a/src/acp/runtime/session-meta.test.ts b/src/acp/runtime/session-meta.test.ts index 0ef60b4b50b..bf04c52ba94 100644 --- a/src/acp/runtime/session-meta.test.ts +++ b/src/acp/runtime/session-meta.test.ts @@ -2,21 +2,22 @@ import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../config/config.js"; const hoisted = vi.hoisted(() => { - const resolveAllAgentSessionStoreTargetsMock = vi.fn(); - const loadSessionStoreMock = vi.fn(); + const resolveAllAgentSessionDatabaseTargetsMock = vi.fn(); + const listSessionEntriesMock = vi.fn(); return { - resolveAllAgentSessionStoreTargetsMock, - loadSessionStoreMock, + resolveAllAgentSessionDatabaseTargetsMock, + listSessionEntriesMock, }; }); -vi.mock("../../config/sessions/store-load.js", () => ({ - loadSessionStore: (storePath: string) => hoisted.loadSessionStoreMock(storePath), +vi.mock("../../config/sessions/store.js", () => ({ + listSessionEntries: (params: { agentId: string }) => hoisted.listSessionEntriesMock(params), + getSessionEntry: vi.fn(() => undefined), })); vi.mock("../../config/sessions/targets.js", () => ({ - resolveAllAgentSessionStoreTargets: (cfg: OpenClawConfig, opts: unknown) => - hoisted.resolveAllAgentSessionStoreTargetsMock(cfg, opts), + resolveAllAgentSessionDatabaseTargets: (cfg: OpenClawConfig, opts: unknown) => + hoisted.resolveAllAgentSessionDatabaseTargetsMock(cfg, opts), })); let listAcpSessionEntries: typeof import("./session-meta.js").listAcpSessionEntries; @@ -31,42 +32,40 @@ describe("listAcpSessionEntries", () => { it("reads ACP sessions from resolved configured store targets", async () => { const cfg = { - session: { - store: "/custom/sessions/{agentId}.json", - }, + session: {}, } as OpenClawConfig; - hoisted.resolveAllAgentSessionStoreTargetsMock.mockResolvedValue([ + hoisted.resolveAllAgentSessionDatabaseTargetsMock.mockResolvedValue([ { agentId: "ops", storePath: "/custom/sessions/ops.json", }, ]); - const storedEntry = { - updatedAt: 123, - acp: { - backend: "acpx", - agent: "ops", - mode: "persistent", - state: "idle", + hoisted.listSessionEntriesMock.mockReturnValue([ + { + sessionKey: "agent:ops:acp:s1", + entry: { + updatedAt: 123, + acp: { + backend: "acpx", + agent: "ops", + mode: "persistent", + state: "idle", + }, + }, }, - }; - hoisted.loadSessionStoreMock.mockReturnValue({ - "agent:ops:acp:s1": storedEntry, - }); + ]); const entries = await listAcpSessionEntries({ cfg }); - expect(hoisted.resolveAllAgentSessionStoreTargetsMock).toHaveBeenCalledWith(cfg, undefined); - expect(hoisted.loadSessionStoreMock).toHaveBeenCalledWith("/custom/sessions/ops.json"); + expect(hoisted.resolveAllAgentSessionDatabaseTargetsMock).toHaveBeenCalledWith(cfg, undefined); + expect(hoisted.listSessionEntriesMock).toHaveBeenCalledWith({ agentId: "ops" }); expect(entries).toEqual([ - { - acp: storedEntry.acp, + expect.objectContaining({ cfg, - entry: storedEntry, - storePath: "/custom/sessions/ops.json", + agentId: "ops", sessionKey: "agent:ops:acp:s1", storeSessionKey: "agent:ops:acp:s1", - }, + }), ]); }); }); diff --git a/src/agents/cli-runner/session-history.test.ts b/src/agents/cli-runner/session-history.test.ts index 2d7f0365590..21a9c2ac7af 100644 --- a/src/agents/cli-runner/session-history.test.ts +++ b/src/agents/cli-runner/session-history.test.ts @@ -250,15 +250,14 @@ describe("loadCliSessionHistoryMessages", () => { } }); - it("honors custom session store roots when resolving hook history transcripts", async () => { + it("resolves hook history from SQLite without custom session store roots", async () => { const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-cli-state-")); - const customStoreDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-cli-store-")); + const customTranscriptDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-cli-transcript-")); vi.stubEnv("OPENCLAW_STATE_DIR", stateDir); - const storePath = path.join(customStoreDir, "sessions.json"); const sessionFile = createSessionTranscript({ - rootDir: customStoreDir, + rootDir: customTranscriptDir, sessionId: "session-custom-store", - filePath: path.join(customStoreDir, "session-custom-store.jsonl"), + filePath: path.join(customTranscriptDir, "session-custom-store.jsonl"), messages: ["custom store history"], }); @@ -269,16 +268,14 @@ describe("loadCliSessionHistoryMessages", () => { sessionKey: "agent:main:main", agentId: "main", config: { - session: { - store: storePath, - }, + session: {}, }, }); expect(history).toHaveLength(1); expectMessageFields(history[0], { role: "user", content: "custom store history" }); } finally { fs.rmSync(stateDir, { recursive: true, force: true }); - fs.rmSync(customStoreDir, { recursive: true, force: true }); + fs.rmSync(customTranscriptDir, { recursive: true, force: true }); } }); }); diff --git a/src/agents/command/session.resolve-session-key.test.ts b/src/agents/command/session.resolve-session-key.test.ts index 9be8ea32858..82e503cecf5 100644 --- a/src/agents/command/session.resolve-session-key.test.ts +++ b/src/agents/command/session.resolve-session-key.test.ts @@ -3,17 +3,18 @@ import type { OpenClawConfig } from "../../config/config.js"; import type { SessionEntry } from "../../config/sessions/types.js"; const hoisted = vi.hoisted(() => ({ - loadSessionStoreMock: vi.fn<(storePath: string) => Record>(), + listSessionRowsMock: vi.fn<(agentId: string) => Record>(), listAgentIdsMock: vi.fn<() => string[]>(), })); -vi.mock("../../config/sessions/store-load.js", () => ({ - loadSessionStore: (storePath: string) => hoisted.loadSessionStoreMock(storePath), -})); - -vi.mock("../../config/sessions/paths.js", () => ({ - resolveStorePath: (_store?: string, params?: { agentId?: string }) => - `/stores/${params?.agentId ?? "main"}.json`, +vi.mock("../../config/sessions/store.js", () => ({ + listSessionEntries: (params: { agentId: string }) => + Object.entries(hoisted.listSessionRowsMock(params.agentId) ?? {}).map( + ([sessionKey, entry]) => ({ + sessionKey, + entry, + }), + ), })); vi.mock("../../config/sessions/main-session.js", () => ({ @@ -29,33 +30,31 @@ vi.mock("../agent-scope.js", () => ({ const { resolveSessionKeyForRequest, resolveStoredSessionKeyForSessionId } = await import("./session.js"); -function mockSessionStores(storesByPath: Record>): void { - hoisted.loadSessionStoreMock.mockImplementation((storePath) => storesByPath[storePath] ?? {}); +function mockSessionStores(storesByAgentId: Record>): void { + hoisted.listSessionRowsMock.mockImplementation((agentId) => storesByAgentId[agentId] ?? {}); } function expectResolvedRequestSession(params: { sessionId: string; sessionKey: string; sessionStore: Record; - storePath: string; + agentId: string; }): void { const result = resolveSessionKeyForRequest({ cfg: { - session: { - store: "/stores/{agentId}.json", - }, + session: {}, } satisfies OpenClawConfig, sessionId: params.sessionId, }); expect(result.sessionKey).toBe(params.sessionKey); - expect(result.sessionStore).toBe(params.sessionStore); - expect(result.storePath).toBe(params.storePath); + expect(result.sessionStore).toEqual(params.sessionStore); + expect(result.agentId).toBe(params.agentId); } describe("resolveSessionKeyForRequest", () => { beforeEach(() => { - hoisted.loadSessionStoreMock.mockReset(); + hoisted.listSessionRowsMock.mockReset(); hoisted.listAgentIdsMock.mockReset(); hoisted.listAgentIdsMock.mockReturnValue(["main", "other"]); }); @@ -68,15 +67,15 @@ describe("resolveSessionKeyForRequest", () => { "agent:other:main": { sessionId: "sid", updatedAt: 10 }, } satisfies Record; mockSessionStores({ - "/stores/main.json": mainStore, - "/stores/other.json": otherStore, + main: mainStore, + other: otherStore, }); expectResolvedRequestSession({ sessionId: "sid", sessionKey: "agent:main:main", sessionStore: mainStore, - storePath: "/stores/main.json", + agentId: "main", }); }); @@ -88,15 +87,15 @@ describe("resolveSessionKeyForRequest", () => { "agent:other:acp:sid": { sessionId: "sid", updatedAt: 10 }, } satisfies Record; mockSessionStores({ - "/stores/main.json": mainStore, - "/stores/other.json": otherStore, + main: mainStore, + other: otherStore, }); expectResolvedRequestSession({ sessionId: "sid", sessionKey: "agent:other:acp:sid", sessionStore: otherStore, - storePath: "/stores/other.json", + agentId: "other", }); }); @@ -105,8 +104,8 @@ describe("resolveSessionKeyForRequest", () => { "agent:embedded-agent:main": { sessionId: "other-session", updatedAt: 2 }, "agent:embedded-agent:work": { sessionId: "resume-agent-1", updatedAt: 1 }, } satisfies Record; - hoisted.loadSessionStoreMock.mockImplementation((storePath) => { - if (storePath === "/stores/embedded-agent.json") { + hoisted.listSessionRowsMock.mockImplementation((agentId) => { + if (agentId === "embedded-agent") { return embeddedAgentStore; } return {}; @@ -114,17 +113,15 @@ describe("resolveSessionKeyForRequest", () => { const result = resolveStoredSessionKeyForSessionId({ cfg: { - session: { - store: "/stores/{agentId}.json", - }, + session: {}, } satisfies OpenClawConfig, sessionId: "resume-agent-1", agentId: "embedded-agent", }); expect(result.sessionKey).toBe("agent:embedded-agent:work"); - expect(result.sessionStore).toBe(embeddedAgentStore); - expect(result.storePath).toBe("/stores/embedded-agent.json"); - expect(hoisted.loadSessionStoreMock).toHaveBeenCalledTimes(1); + expect(result.sessionStore).toEqual(embeddedAgentStore); + expect(result.agentId).toBe("embedded-agent"); + expect(hoisted.listSessionRowsMock).toHaveBeenCalledTimes(1); }); }); diff --git a/src/agents/openclaw-tools.subagents.scope.test.ts b/src/agents/openclaw-tools.subagents.scope.test.ts index 94c5eaaacff..ab322fa7cb3 100644 --- a/src/agents/openclaw-tools.subagents.scope.test.ts +++ b/src/agents/openclaw-tools.subagents.scope.test.ts @@ -1,7 +1,10 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; -import { beforeEach, describe, expect, it } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { upsertSessionEntry } from "../config/sessions/store.js"; +import type { SessionEntry } from "../config/sessions/types.js"; +import { closeOpenClawAgentDatabasesForTest } from "../state/openclaw-agent-db.js"; import { callGatewayMock, resetSubagentsConfigOverride, @@ -11,14 +14,15 @@ import { addSubagentRunForTests, resetSubagentRegistryForTests } from "./subagen import { createPerSenderSessionConfig } from "./test-helpers/session-config.js"; import { createSubagentsTool } from "./tools/subagents-tool.js"; -function writeStore(storePath: string, store: Record) { - fs.mkdirSync(path.dirname(storePath), { recursive: true }); - fs.writeFileSync(storePath, JSON.stringify(store, null, 2), "utf-8"); +function writeStore(store: Record) { + for (const [sessionKey, entry] of Object.entries(store)) { + upsertSessionEntry({ agentId: "main", sessionKey, entry }); + } } -function seedLeafOwnedChildSession(storePath: string, leafKey = "agent:main:subagent:leaf") { +async function seedLeafOwnedChildSession(_storePath: string, leafKey = "agent:main:subagent:leaf") { const childKey = `${leafKey}:subagent:child`; - writeStore(storePath, { + writeStore({ [leafKey]: { sessionId: "leaf-session", updatedAt: Date.now(), @@ -59,7 +63,7 @@ async function expectLeafSubagentControlForbidden(params: { callId: string; message?: string; }) { - const { childKey, tool } = seedLeafOwnedChildSession(params.storePath); + const { childKey, tool } = await seedLeafOwnedChildSession(params.storePath); const result = await tool.execute(params.callId, { action: params.action, target: childKey, @@ -74,26 +78,32 @@ async function expectLeafSubagentControlForbidden(params: { describe("openclaw-tools: subagents scope isolation", () => { let storePath = ""; + let stateDir = ""; - beforeEach(() => { + beforeEach(async () => { resetSubagentRegistryForTests(); resetSubagentsConfigOverride(); callGatewayMock.mockReset(); - storePath = path.join( - os.tmpdir(), - `openclaw-subagents-scope-${Date.now()}-${Math.random().toString(16).slice(2)}.json`, - ); + stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-subagents-scope-")); + vi.stubEnv("OPENCLAW_STATE_DIR", stateDir); + storePath = path.join(stateDir, "agents", "main", "sessions", "sessions.json"); setSubagentsConfigOverride({ - session: createPerSenderSessionConfig({ store: storePath }), + session: createPerSenderSessionConfig({}), }); - writeStore(storePath, {}); + writeStore({}); + }); + + afterEach(() => { + closeOpenClawAgentDatabasesForTest(); + vi.unstubAllEnvs(); + fs.rmSync(stateDir, { recursive: true, force: true }); }); it("leaf subagents do not inherit parent sibling control scope", async () => { const leafKey = "agent:main:subagent:leaf"; const siblingKey = "agent:main:subagent:unsandboxed"; - writeStore(storePath, { + writeStore({ [leafKey]: { sessionId: "leaf-session", updatedAt: Date.now(), @@ -154,7 +164,7 @@ describe("openclaw-tools: subagents scope isolation", () => { const workerKey = `${orchestratorKey}:subagent:worker`; const siblingKey = "agent:main:subagent:sibling"; - writeStore(storePath, { + writeStore({ [orchestratorKey]: { sessionId: "orchestrator-session", updatedAt: Date.now(), diff --git a/src/agents/openclaw-tools.subagents.steer-failure-clears-suppression.test.ts b/src/agents/openclaw-tools.subagents.steer-failure-clears-suppression.test.ts index 70de50f44a2..2a92e175097 100644 --- a/src/agents/openclaw-tools.subagents.steer-failure-clears-suppression.test.ts +++ b/src/agents/openclaw-tools.subagents.steer-failure-clears-suppression.test.ts @@ -1,7 +1,8 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; -import { beforeEach, describe, expect, it } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { closeOpenClawAgentDatabasesForTest } from "../state/openclaw-agent-db.js"; import { callGatewayMock, setSubagentsConfigOverride, @@ -14,21 +15,26 @@ import { import { createSubagentsTool } from "./tools/subagents-tool.js"; describe("openclaw-tools: subagents steer failure", () => { - beforeEach(() => { + let stateDir = ""; + + beforeEach(async () => { resetSubagentRegistryForTests(); callGatewayMock.mockClear(); - const storePath = path.join( - os.tmpdir(), - `openclaw-subagents-steer-${Date.now()}-${Math.random().toString(16).slice(2)}.json`, - ); + stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-subagents-steer-")); + vi.stubEnv("OPENCLAW_STATE_DIR", stateDir); + const storePath = path.join(stateDir, "agents", "main", "sessions", "sessions.json"); setSubagentsConfigOverride({ session: { mainKey: "main", scope: "per-sender", - store: storePath, }, }); - fs.writeFileSync(storePath, "{}", "utf-8"); + }); + + afterEach(() => { + closeOpenClawAgentDatabasesForTest(); + vi.unstubAllEnvs(); + fs.rmSync(stateDir, { recursive: true, force: true }); }); it("restores announce behavior when steer replacement dispatch fails", async () => { diff --git a/src/agents/pi-tools.create-openclaw-coding-tools.test.ts b/src/agents/pi-tools.create-openclaw-coding-tools.test.ts index 20fec4ad937..c8ed547f0e0 100644 --- a/src/agents/pi-tools.create-openclaw-coding-tools.test.ts +++ b/src/agents/pi-tools.create-openclaw-coding-tools.test.ts @@ -1,7 +1,9 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { describe, expect, it, vi } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { upsertSessionEntry } from "../config/sessions/store.js"; +import type { SessionEntry } from "../config/sessions/types.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { applyXaiModelCompat, @@ -12,6 +14,7 @@ import { import "./test-helpers/fast-bash-tools.js"; import "./test-helpers/fast-coding-tools.js"; import "./test-helpers/fast-openclaw-tools.js"; +import { closeOpenClawAgentDatabasesForTest } from "../state/openclaw-agent-db.js"; import type { VirtualAgentFs, VirtualAgentFsEntry } from "./filesystem/agent-filesystem.js"; import { createOpenClawTools } from "./openclaw-tools.js"; import { createOpenClawCodingTools } from "./pi-tools.js"; @@ -55,20 +58,17 @@ async function writeSessionStore( agentId: string, entries: Record, ) { - await fs.writeFile( - storeTemplate.replaceAll("{agentId}", agentId), - JSON.stringify(entries, null, 2), - "utf-8", - ); + void storeTemplate; + for (const [sessionKey, entry] of Object.entries(entries)) { + upsertSessionEntry({ agentId, sessionKey, entry: entry as SessionEntry }); + } } function createToolsForStoredSession(storeTemplate: string, sessionKey: string) { return createOpenClawCodingTools({ sessionKey, config: { - session: { - store: storeTemplate, - }, + session: {}, agents: { defaults: { subagents: { @@ -80,6 +80,11 @@ function createToolsForStoredSession(storeTemplate: string, sessionKey: string) }); } +afterEach(() => { + closeOpenClawAgentDatabasesForTest(); + vi.unstubAllEnvs(); +}); + function expectNoSubagentControlTools(tools: ReturnType) { const names = new Set(tools.map((tool) => tool.name)); expect(names.has("sessions_spawn")).toBe(false); @@ -646,7 +651,15 @@ describe("createOpenClawCodingTools", () => { it("uses stored spawnDepth to apply leaf tool policy for flat depth-2 session keys", async () => { const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-depth-policy-")); try { - const storeTemplate = path.join(tmpDir, "sessions-{agentId}.json"); + vi.stubEnv("OPENCLAW_STATE_DIR", path.join(tmpDir, ".openclaw")); + const storeTemplate = path.join( + tmpDir, + ".openclaw", + "agents", + "{agentId}", + "sessions", + "sessions.json", + ); await writeSessionStore(storeTemplate, "main", { "agent:main:subagent:flat": { sessionId: "session-flat-depth-2", @@ -665,7 +678,15 @@ describe("createOpenClawCodingTools", () => { it("applies subagent tool policy to ACP children spawned under a subagent envelope", async () => { const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-acp-subagent-policy-")); try { - const storeTemplate = path.join(tmpDir, "sessions-{agentId}.json"); + vi.stubEnv("OPENCLAW_STATE_DIR", path.join(tmpDir, ".openclaw")); + const storeTemplate = path.join( + tmpDir, + ".openclaw", + "agents", + "{agentId}", + "sessions", + "sessions.json", + ); await writeSessionStore(storeTemplate, "main", { "agent:main:acp:child": { sessionId: "session-acp-child", @@ -715,7 +736,15 @@ describe("createOpenClawCodingTools", () => { it("applies leaf tool policy for cross-agent subagent sessions when spawnDepth is missing", async () => { const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cross-agent-subagent-")); try { - const storeTemplate = path.join(tmpDir, "sessions-{agentId}.json"); + vi.stubEnv("OPENCLAW_STATE_DIR", path.join(tmpDir, ".openclaw")); + const storeTemplate = path.join( + tmpDir, + ".openclaw", + "agents", + "{agentId}", + "sessions", + "sessions.json", + ); await writeSessionStore(storeTemplate, "main", { "agent:main:subagent:parent": { sessionId: "session-main-parent", diff --git a/src/auto-reply/reply.triggers.trigger-handling.targets-active-session-native-stop.e2e.test.ts b/src/auto-reply/reply.triggers.trigger-handling.targets-active-session-native-stop.e2e.test.ts index a049a172e5c..3d346cf1b02 100644 --- a/src/auto-reply/reply.triggers.trigger-handling.targets-active-session-native-stop.e2e.test.ts +++ b/src/auto-reply/reply.triggers.trigger-handling.targets-active-session-native-stop.e2e.test.ts @@ -10,12 +10,17 @@ import { MAIN_SESSION_KEY, makeCfg, mockRunEmbeddedPiAgentOk, - requireSessionStorePath, expectBareNewOrResetAcknowledged, withTempHome, } from "../../test/helpers/auto-reply/trigger-handling-test-harness.js"; import { savePersistedAuthProfileState } from "../agents/auth-profiles/state.js"; -import { loadSessionStore, resolveSessionKey } from "../config/sessions.js"; +import { resolveSessionKey } from "../config/sessions.js"; +import { + deleteSessionEntry, + listSessionEntries, + upsertSessionEntry, +} from "../config/sessions/store.js"; +import type { SessionEntry } from "../config/sessions/types.js"; import { registerGroupIntroPromptCases } from "./reply.triggers.group-intro-prompts.cases.js"; import { registerTriggerHandlingUsageSummaryCases } from "./reply.triggers.trigger-handling.filters-usage-summary-current-model-provider.cases.js"; import { enqueueFollowupRun, getFollowupQueueDepth, type FollowupRun } from "./reply/queue.js"; @@ -158,18 +163,33 @@ async function writeDailyMemoryNotes( } } -async function seedTargetSession(storePath: string, targetSessionKey: string) { - await fs.writeFile( - storePath, - JSON.stringify({ - [targetSessionKey]: { - sessionId: "session-target", - updatedAt: Date.now(), - }, - }), +async function replaceSessionStore( + agentId: string, + store: Record, +): Promise { + for (const { sessionKey } of listSessionEntries({ agentId })) { + deleteSessionEntry({ agentId, sessionKey }); + } + for (const [sessionKey, entry] of Object.entries(store)) { + upsertSessionEntry({ agentId, sessionKey, entry }); + } +} + +function readSessionStore(agentId: string): Record { + return Object.fromEntries( + listSessionEntries({ agentId }).map(({ sessionKey, entry }) => [sessionKey, entry]), ); } +async function seedTargetSession(agentId: string, targetSessionKey: string) { + await replaceSessionStore(agentId, { + [targetSessionKey]: { + sessionId: "session-target", + updatedAt: Date.now(), + }, + }); +} + function makeNativeTelegramCommandMessage(params: { body: string; slashSessionKey: string; @@ -240,18 +260,14 @@ async function expectNextRunUsesTargetSession( } async function writeStoredModelOverride(cfg: ReturnType): Promise { - await fs.writeFile( - requireSessionStorePath(cfg), - JSON.stringify({ - [MAIN_SESSION_KEY]: { - sessionId: "main", - updatedAt: Date.now(), - providerOverride: "openai", - modelOverride: "gpt-5.4", - }, - }), - "utf-8", - ); + await replaceSessionStore("main", { + [MAIN_SESSION_KEY]: { + sessionId: "main", + updatedAt: Date.now(), + providerOverride: "openai", + modelOverride: "gpt-5.4", + }, + }); } function mockSuccessfulCompaction() { @@ -291,10 +307,6 @@ async function expectResetBlockedForNonOwner(params: { home: string }): Promise< ...cfg.commands, ownerAllowFrom: ["whatsapp:+1999"], }; - cfg.session = { - ...cfg.session, - store: join(home, "blocked-reset.sessions.json"), - }; const res = await getReplyFromConfig( { Body: "/reset", @@ -505,7 +517,6 @@ describe("trigger handling", () => { runEmbeddedPiAgentMock.mockReset(); mockEmbeddedOkPayload(); const cfg = makeCfg(home); - cfg.session = { ...cfg.session, store: join(home, `${testCase.label}.sessions.json`) }; await writeStoredModelOverride(cfg); testCase.setup(cfg); await getReplyFromConfig(BASE_MESSAGE, { isHeartbeat: true }, cfg); @@ -519,9 +530,7 @@ describe("trigger handling", () => { it("compacts the active main session", async () => { await withTempHome(async (home) => { - const storePath = join(home, "compact-main.sessions.json"); const cfg = makeCfg(home); - cfg.session = { ...cfg.session, store: storePath }; mockSuccessfulCompaction(); const request = { @@ -541,7 +550,7 @@ describe("trigger handling", () => { const text = maybeReplyText(res); expect(text?.startsWith("⚙️ Compacted")).toBe(true); expect(getCompactEmbeddedPiSessionMock()).toHaveBeenCalledOnce(); - const store = loadSessionStore(storePath); + const store = readSessionStore("main"); const sessionKey = resolveSessionKey("per-sender", request); expect(store[sessionKey]?.compactionCount).toBe(1); }); @@ -552,7 +561,6 @@ describe("trigger handling", () => { getCompactEmbeddedPiSessionMock().mockReset(); mockSuccessfulCompaction(); const cfg = makeCfg(home); - cfg.session = { ...cfg.session, store: join(home, "compact-worker.sessions.json") }; const res = await getReplyFromConfig( { Body: "/compact", @@ -577,23 +585,15 @@ describe("trigger handling", () => { it("aborts native target sessions and clears queued followups", async () => { await withTempHome(async (home) => { const cfg = makeCfg(home); - cfg.session = { ...cfg.session, store: join(home, "native-stop.sessions.json") }; getAbortEmbeddedPiRunMock().mockReset().mockReturnValue(false); - const storePath = cfg.session?.store; - if (!storePath) { - throw new Error("missing session store path"); - } const targetSessionKey = "agent:main:telegram:group:123"; const targetSessionId = "session-target"; - await fs.writeFile( - storePath, - JSON.stringify({ - [targetSessionKey]: { - sessionId: targetSessionId, - updatedAt: Date.now(), - }, - }), - ); + await replaceSessionStore("main", { + [targetSessionKey]: { + sessionId: targetSessionId, + updatedAt: Date.now(), + }, + }); const followupRun: FollowupRun = { prompt: "queued", enqueuedAt: Date.now(), @@ -641,7 +641,7 @@ describe("trigger handling", () => { const text = Array.isArray(res) ? res[0]?.text : res?.text; expect(text).toBe("⚙️ Agent was aborted."); expect(getAbortEmbeddedPiRunMock()).toHaveBeenCalledWith(targetSessionId); - const store = loadSessionStore(storePath); + const store = readSessionStore("main"); expect(store[targetSessionKey]?.abortedLastRun).toBe(true); expect(getFollowupQueueDepth(targetSessionKey)).toBe(0); }); @@ -650,14 +650,12 @@ describe("trigger handling", () => { it("applies native model changes to the target session", async () => { await withTempHome(async (home) => { const cfg = makeCfg(home); - cfg.session = { ...cfg.session, store: join(home, "native-model.sessions.json") }; const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); runEmbeddedPiAgentMock.mockReset(); - const storePath = requireSessionStorePath(cfg); const slashSessionKey = "telegram:slash:111"; const targetSessionKey = MAIN_SESSION_KEY; - await seedTargetSession(storePath, targetSessionKey); + await seedTargetSession("main", targetSessionKey); const res = await getReplyFromConfig( makeNativeTelegramCommandMessage({ @@ -671,7 +669,7 @@ describe("trigger handling", () => { expect(maybeReplyText(res)).toContain("Model set to openai/gpt-4.1-mini"); - const store = loadSessionStore(storePath); + const store = readSessionStore("main"); expect(store[targetSessionKey]?.providerOverride).toBe("openai"); expect(store[targetSessionKey]?.modelOverride).toBe("gpt-4.1-mini"); expect(store[slashSessionKey]).toBeUndefined(); @@ -699,24 +697,19 @@ describe("trigger handling", () => { }, }, }; - cfg.session = { ...cfg.session, store: join(home, "native-model-thread.sessions.json") }; const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); runEmbeddedPiAgentMock.mockReset(); - const storePath = requireSessionStorePath(cfg); const slashSessionKey = "agent:main:telegram:slash:7595562691"; const targetSessionKey = "agent:main:main:thread:7595562691:12812"; - await fs.writeFile( - storePath, - JSON.stringify({ - [targetSessionKey]: { - sessionId: "session-target", - updatedAt: Date.now(), - providerOverride: "zai", - modelOverride: "glm-5.1", - }, - }), - ); + await replaceSessionStore("main", { + [targetSessionKey]: { + sessionId: "session-target", + updatedAt: Date.now(), + providerOverride: "zai", + modelOverride: "glm-5.1", + }, + }); const res = await getReplyFromConfig( makeNativeTelegramCommandMessage({ @@ -730,7 +723,7 @@ describe("trigger handling", () => { expect(maybeReplyText(res)).toContain("Model set to deepseek/deepseek-v4-pro"); - const store = loadSessionStore(storePath); + const store = readSessionStore("main"); expect(store[targetSessionKey]?.providerOverride).toBe("deepseek"); expect(store[targetSessionKey]?.modelOverride).toBe("deepseek-v4-pro"); expect(store[slashSessionKey]).toBeUndefined(); @@ -748,10 +741,8 @@ describe("trigger handling", () => { it("applies native model auth profile overrides to the target session", async () => { await withTempHome(async (home) => { const cfg = makeCfg(home); - cfg.session = { ...cfg.session, store: join(home, "native-model-auth.sessions.json") }; const runEmbeddedPiAgentMock = getRunEmbeddedPiAgentMock(); runEmbeddedPiAgentMock.mockReset(); - const storePath = requireSessionStorePath(cfg); const authDir = join(home, ".openclaw", "agents", "main", "agent"); await fs.mkdir(authDir, { recursive: true }); await fs.writeFile( @@ -788,7 +779,7 @@ describe("trigger handling", () => { const slashSessionKey = "telegram:slash:111"; const targetSessionKey = MAIN_SESSION_KEY; - await seedTargetSession(storePath, targetSessionKey); + await seedTargetSession("main", targetSessionKey); const res = await getReplyFromConfig( makeNativeTelegramCommandMessage({ @@ -802,7 +793,7 @@ describe("trigger handling", () => { expect(maybeReplyText(res)).toContain(`Auth profile set to ${TEST_SECONDARY_PROFILE_ID}`); - const store = loadSessionStore(storePath); + const store = readSessionStore("main"); expect(store[targetSessionKey]?.authProfileOverride).toBe(TEST_SECONDARY_PROFILE_ID); expect(store[targetSessionKey]?.authProfileOverrideSource).toBe("user"); expect(store[slashSessionKey]).toBeUndefined(); diff --git a/src/auto-reply/reply/agent-runner-memory.test.ts b/src/auto-reply/reply/agent-runner-memory.test.ts index bacdd6fe975..27afd102c1b 100644 --- a/src/auto-reply/reply/agent-runner-memory.test.ts +++ b/src/auto-reply/reply/agent-runner-memory.test.ts @@ -3,18 +3,25 @@ import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { SessionEntry } from "../../config/sessions.js"; +import { appendSqliteSessionTranscriptEvent } from "../../config/sessions/transcript-store.sqlite.js"; import { clearMemoryPluginState, registerMemoryCapability, type MemoryFlushPlanResolver, } from "../../plugins/memory-state.js"; +import { closeOpenClawAgentDatabasesForTest } from "../../state/openclaw-agent-db.js"; +import { closeOpenClawStateDatabaseForTest } from "../../state/openclaw-state-db.js"; import type { TemplateContext } from "../templating.js"; import { runMemoryFlushIfNeeded, runPreflightCompactionIfNeeded, setAgentRunnerMemoryTestDeps, } from "./agent-runner-memory.js"; -import { createTestFollowupRun, writeTestSessionStore } from "./agent-runner.test-fixtures.js"; +import { + createTestFollowupRun, + readTestSessionRow, + writeTestSessionRow, +} from "./agent-runner.test-fixtures.js"; const compactEmbeddedPiSessionMock = vi.fn(); const runWithModelFallbackMock = vi.fn(); @@ -34,11 +41,18 @@ function createReplyOperation() { } as never; } +function resolveMainTranscriptPath(rootDir: string, sessionId: string): string { + return path.join(rootDir, "agents", "main", "sessions", `${sessionId}.jsonl`); +} + describe("runMemoryFlushIfNeeded", () => { let rootDir = ""; + let previousStateDir: string | undefined; beforeEach(async () => { rootDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-memory-unit-")); + previousStateDir = process.env.OPENCLAW_STATE_DIR; + process.env.OPENCLAW_STATE_DIR = rootDir; registerMemoryFlushPlanResolverForTest(() => ({ softThresholdTokens: 4_000, forceFlushTranscriptBytes: 1_000_000_000, @@ -75,17 +89,11 @@ describe("runMemoryFlushIfNeeded", () => { if (typeof params.newSessionFile === "string" && params.newSessionFile) { nextEntry.sessionFile = params.newSessionFile; } else { - const storePath = typeof params.storePath === "string" ? params.storePath : rootDir; - nextEntry.sessionFile = path.join( - path.dirname(storePath), - `${params.newSessionId}.jsonl`, - ); + nextEntry.sessionFile = resolveMainTranscriptPath(rootDir, params.newSessionId); } } params.sessionStore[sessionKey] = nextEntry; - if (typeof params.storePath === "string") { - await writeTestSessionStore(params.storePath, sessionKey, nextEntry); - } + await writeTestSessionRow(sessionKey, nextEntry); return nextEntry.compactionCount; }); setAgentRunnerMemoryTestDeps({ @@ -103,11 +111,17 @@ describe("runMemoryFlushIfNeeded", () => { afterEach(async () => { setAgentRunnerMemoryTestDeps(); clearMemoryPluginState(); + closeOpenClawAgentDatabasesForTest(); + closeOpenClawStateDatabaseForTest(); + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } await fs.rm(rootDir, { recursive: true, force: true }); }); it("runs a memory flush turn, rotates after compaction, and persists metadata", async () => { - const storePath = path.join(rootDir, "sessions.json"); const sessionKey = "main"; const sessionEntry: SessionEntry = { sessionId: "session", @@ -116,7 +130,7 @@ describe("runMemoryFlushIfNeeded", () => { compactionCount: 1, }; const sessionStore = { [sessionKey]: sessionEntry }; - await writeTestSessionStore(storePath, sessionKey, sessionEntry); + await writeTestSessionRow(sessionKey, sessionEntry); runEmbeddedPiAgentMock.mockImplementationOnce( async (params: { @@ -149,7 +163,6 @@ describe("runMemoryFlushIfNeeded", () => { sessionEntry, sessionStore, sessionKey, - storePath, isHeartbeat: false, replyOperation: createReplyOperation(), }); @@ -175,13 +188,11 @@ describe("runMemoryFlushIfNeeded", () => { nextSessionFile: expect.stringContaining("session-rotated.jsonl"), }); - const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { - main: SessionEntry; - }; - expect(persisted.main.sessionId).toBe("session-rotated"); - expect(persisted.main.compactionCount).toBe(2); - expect(persisted.main.memoryFlushCompactionCount).toBe(1); - expect(persisted.main.memoryFlushAt).toBe(1_700_000_000_000); + const persisted = readTestSessionRow(sessionKey); + expect(persisted?.sessionId).toBe("session-rotated"); + expect(persisted?.compactionCount).toBe(2); + expect(persisted?.memoryFlushCompactionCount).toBe(1); + expect(persisted?.memoryFlushAt).toBe(1_700_000_000_000); }); it("runs memory flush on the configured maintenance model without active fallbacks", async () => { @@ -317,11 +328,16 @@ describe("runMemoryFlushIfNeeded", () => { it("passes runtime policy session key to preflight compaction sandbox resolution", async () => { const sessionFile = path.join(rootDir, "session.jsonl"); - await fs.writeFile( - sessionFile, - `${JSON.stringify({ message: { role: "user", content: "x".repeat(5_000) } })}\n`, - "utf8", - ); + appendSqliteSessionTranscriptEvent({ + agentId: "main", + sessionId: "session", + transcriptPath: sessionFile, + event: { + type: "message", + id: "m1", + message: { role: "user", content: "x".repeat(5_000) }, + }, + }); registerMemoryFlushPlanResolverForTest(() => ({ softThresholdTokens: 1, forceFlushTranscriptBytes: 1_000_000_000, @@ -351,7 +367,6 @@ describe("runMemoryFlushIfNeeded", () => { sessionStore: { "agent:main:main": sessionEntry }, sessionKey: "agent:main:main", runtimePolicySessionKey: "agent:main:telegram:default:direct:12345", - storePath: path.join(rootDir, "sessions.json"), isHeartbeat: false, replyOperation: createReplyOperation(), }); @@ -367,11 +382,16 @@ describe("runMemoryFlushIfNeeded", () => { it("updates the active preflight run after transcript rotation", async () => { const sessionFile = path.join(rootDir, "session.jsonl"); const successorFile = path.join(rootDir, "session-rotated.jsonl"); - await fs.writeFile( - sessionFile, - `${JSON.stringify({ message: { role: "user", content: "x".repeat(5_000) } })}\n`, - "utf8", - ); + appendSqliteSessionTranscriptEvent({ + agentId: "main", + sessionId: "session", + transcriptPath: sessionFile, + event: { + type: "message", + id: "m1", + message: { role: "user", content: "x".repeat(5_000) }, + }, + }); registerMemoryFlushPlanResolverForTest(() => ({ softThresholdTokens: 1, forceFlushTranscriptBytes: 1_000_000_000, @@ -416,7 +436,6 @@ describe("runMemoryFlushIfNeeded", () => { sessionEntry, sessionStore, sessionKey: "agent:main:main", - storePath: path.join(rootDir, "sessions.json"), isHeartbeat: false, replyOperation, }); @@ -436,17 +455,20 @@ describe("runMemoryFlushIfNeeded", () => { it("includes recent output tokens when deciding preflight compaction", async () => { const sessionFile = path.join(rootDir, "session-usage.jsonl"); - await fs.writeFile( - sessionFile, - `${JSON.stringify({ + appendSqliteSessionTranscriptEvent({ + agentId: "main", + sessionId: "session", + transcriptPath: sessionFile, + event: { + type: "message", + id: "m1", message: { role: "assistant", content: "large answer", usage: { input: 90_000, output: 10_000 }, }, - })}\n`, - "utf8", - ); + }, + }); registerMemoryFlushPlanResolverForTest(() => ({ softThresholdTokens: 4_000, forceFlushTranscriptBytes: 1_000_000_000, @@ -474,7 +496,6 @@ describe("runMemoryFlushIfNeeded", () => { sessionEntry, sessionStore: { main: sessionEntry }, sessionKey: "main", - storePath: path.join(rootDir, "sessions.json"), isHeartbeat: false, replyOperation: createReplyOperation(), }); @@ -487,17 +508,20 @@ describe("runMemoryFlushIfNeeded", () => { it("uses the active run sessionFile when the session entry has no transcript path", async () => { const sessionFile = path.join(rootDir, "active-run-session.jsonl"); - await fs.writeFile( - sessionFile, - `${JSON.stringify({ + appendSqliteSessionTranscriptEvent({ + agentId: "main", + sessionId: "session", + transcriptPath: sessionFile, + event: { + type: "message", + id: "m1", message: { role: "assistant", content: "large answer", usage: { input: 90_000, output: 8_000 }, }, - })}\n`, - "utf8", - ); + }, + }); registerMemoryFlushPlanResolverForTest(() => ({ softThresholdTokens: 4_000, forceFlushTranscriptBytes: 1_000_000_000, @@ -524,7 +548,6 @@ describe("runMemoryFlushIfNeeded", () => { sessionEntry, sessionStore: { main: sessionEntry }, sessionKey: "main", - storePath: path.join(rootDir, "sessions.json"), isHeartbeat: false, replyOperation: createReplyOperation(), }); @@ -539,25 +562,33 @@ describe("runMemoryFlushIfNeeded", () => { it("keeps preflight compaction conservative for content appended after latest usage", async () => { const sessionFile = path.join(rootDir, "post-usage-tail-session.jsonl"); - await fs.writeFile( - sessionFile, - [ - JSON.stringify({ - message: { - role: "assistant", - content: "small answer", - usage: { input: 40_000, output: 2_000 }, - }, - }), - JSON.stringify({ - message: { - role: "tool", - content: `large interrupted tool output ${"x".repeat(450_000)}`, - }, - }), - ].join("\n"), - "utf8", - ); + appendSqliteSessionTranscriptEvent({ + agentId: "main", + sessionId: "session", + transcriptPath: sessionFile, + event: { + type: "message", + id: "m1", + message: { + role: "assistant", + content: "small answer", + usage: { input: 40_000, output: 2_000 }, + }, + }, + }); + appendSqliteSessionTranscriptEvent({ + agentId: "main", + sessionId: "session", + transcriptPath: sessionFile, + event: { + type: "message", + id: "m2", + message: { + role: "tool", + content: `large interrupted tool output ${"x".repeat(450_000)}`, + }, + }, + }); registerMemoryFlushPlanResolverForTest(() => ({ softThresholdTokens: 4_000, forceFlushTranscriptBytes: 1_000_000_000, @@ -585,7 +616,6 @@ describe("runMemoryFlushIfNeeded", () => { sessionEntry, sessionStore: { main: sessionEntry }, sessionKey: "main", - storePath: path.join(rootDir, "sessions.json"), isHeartbeat: false, replyOperation: createReplyOperation(), }); @@ -598,25 +628,33 @@ describe("runMemoryFlushIfNeeded", () => { it("combines latest usage with post-usage tail pressure for preflight compaction", async () => { const sessionFile = path.join(rootDir, "combined-tail-pressure-session.jsonl"); - await fs.writeFile( - sessionFile, - [ - JSON.stringify({ - message: { - role: "assistant", - content: "small answer", - usage: { input: 86_000, output: 2_000 }, - }, - }), - JSON.stringify({ - message: { - role: "tool", - content: `moderate interrupted tool output ${"x".repeat(36_000)}`, - }, - }), - ].join("\n"), - "utf8", - ); + appendSqliteSessionTranscriptEvent({ + agentId: "main", + sessionId: "session", + transcriptPath: sessionFile, + event: { + type: "message", + id: "m1", + message: { + role: "assistant", + content: "small answer", + usage: { input: 86_000, output: 2_000 }, + }, + }, + }); + appendSqliteSessionTranscriptEvent({ + agentId: "main", + sessionId: "session", + transcriptPath: sessionFile, + event: { + type: "message", + id: "m2", + message: { + role: "tool", + content: `moderate interrupted tool output ${"x".repeat(36_000)}`, + }, + }, + }); registerMemoryFlushPlanResolverForTest(() => ({ softThresholdTokens: 4_000, forceFlushTranscriptBytes: 1_000_000_000, @@ -644,7 +682,6 @@ describe("runMemoryFlushIfNeeded", () => { sessionEntry, sessionStore: { main: sessionEntry }, sessionKey: "main", - storePath: path.join(rootDir, "sessions.json"), isHeartbeat: false, replyOperation: createReplyOperation(), }); @@ -657,23 +694,29 @@ describe("runMemoryFlushIfNeeded", () => { it("does not count bytes from a large latest usage record as post-usage tail pressure", async () => { const sessionFile = path.join(rootDir, "large-usage-record-session.jsonl"); - await fs.writeFile( - sessionFile, - [ - JSON.stringify({ - type: "session", - id: "session", - }), - JSON.stringify({ - message: { - role: "assistant", - content: `large answer ${"x".repeat(300_000)}`, - usage: { input: 40_000, output: 2_000 }, - }, - }), - ].join("\n"), - "utf8", - ); + appendSqliteSessionTranscriptEvent({ + agentId: "main", + sessionId: "session", + transcriptPath: sessionFile, + event: { + type: "session", + id: "session", + }, + }); + appendSqliteSessionTranscriptEvent({ + agentId: "main", + sessionId: "session", + transcriptPath: sessionFile, + event: { + type: "message", + id: "m1", + message: { + role: "assistant", + content: `large answer ${"x".repeat(300_000)}`, + usage: { input: 40_000, output: 2_000 }, + }, + }, + }); registerMemoryFlushPlanResolverForTest(() => ({ softThresholdTokens: 4_000, forceFlushTranscriptBytes: 1_000_000_000, @@ -701,7 +744,6 @@ describe("runMemoryFlushIfNeeded", () => { sessionEntry, sessionStore: { main: sessionEntry }, sessionKey: "main", - storePath: path.join(rootDir, "sessions.json"), isHeartbeat: false, replyOperation: createReplyOperation(), }); @@ -712,27 +754,38 @@ describe("runMemoryFlushIfNeeded", () => { it("does not treat raw transcript metadata bytes as token pressure", async () => { const sessionFile = path.join(rootDir, "metadata-heavy-session.jsonl"); - await fs.writeFile( - sessionFile, - [ - JSON.stringify({ - type: "session", - id: "session", - }), - JSON.stringify({ - type: "custom", - payload: "x".repeat(450_000), - }), - JSON.stringify({ - message: { - role: "assistant", - content: "small answer", - usage: { input: 40_000, output: 2_000 }, - }, - }), - ].join("\n"), - "utf8", - ); + appendSqliteSessionTranscriptEvent({ + agentId: "main", + sessionId: "session", + transcriptPath: sessionFile, + event: { + type: "session", + id: "session", + }, + }); + appendSqliteSessionTranscriptEvent({ + agentId: "main", + sessionId: "session", + transcriptPath: sessionFile, + event: { + type: "custom", + payload: "x".repeat(450_000), + }, + }); + appendSqliteSessionTranscriptEvent({ + agentId: "main", + sessionId: "session", + transcriptPath: sessionFile, + event: { + type: "message", + id: "m1", + message: { + role: "assistant", + content: "small answer", + usage: { input: 40_000, output: 2_000 }, + }, + }, + }); registerMemoryFlushPlanResolverForTest(() => ({ softThresholdTokens: 4_000, forceFlushTranscriptBytes: 1_000_000_000, @@ -770,7 +823,6 @@ describe("runMemoryFlushIfNeeded", () => { sessionEntry, sessionStore: { main: sessionEntry }, sessionKey: "main", - storePath: path.join(rootDir, "sessions.json"), isHeartbeat: false, replyOperation: createReplyOperation(), }); @@ -781,11 +833,16 @@ describe("runMemoryFlushIfNeeded", () => { it("triggers preflight compaction when the active transcript exceeds the configured byte threshold", async () => { const sessionFile = path.join(rootDir, "large-session.jsonl"); - await fs.writeFile( - sessionFile, - `${JSON.stringify({ message: { role: "user", content: "x".repeat(256) } })}\n`, - "utf8", - ); + appendSqliteSessionTranscriptEvent({ + agentId: "main", + sessionId: "session", + transcriptPath: sessionFile, + event: { + type: "message", + id: "m1", + message: { role: "user", content: "x".repeat(256) }, + }, + }); const sessionEntry: SessionEntry = { sessionId: "session", sessionFile, @@ -822,7 +879,6 @@ describe("runMemoryFlushIfNeeded", () => { sessionEntry, sessionStore, sessionKey: "main", - storePath: path.join(rootDir, "sessions.json"), isHeartbeat: false, replyOperation: replyOperation as never, }); @@ -847,11 +903,16 @@ describe("runMemoryFlushIfNeeded", () => { it("keeps the active transcript byte threshold inactive unless transcript rotation is enabled", async () => { const sessionFile = path.join(rootDir, "large-session-no-rotation.jsonl"); - await fs.writeFile( - sessionFile, - `${JSON.stringify({ message: { role: "user", content: "x".repeat(256) } })}\n`, - "utf8", - ); + appendSqliteSessionTranscriptEvent({ + agentId: "main", + sessionId: "session", + transcriptPath: sessionFile, + event: { + type: "message", + id: "m1", + message: { role: "user", content: "x".repeat(256) }, + }, + }); const sessionEntry: SessionEntry = { sessionId: "session", sessionFile, diff --git a/src/auto-reply/reply/agent-runner-session-reset.test.ts b/src/auto-reply/reply/agent-runner-session-reset.test.ts index 258e31a6fe4..4017a786dac 100644 --- a/src/auto-reply/reply/agent-runner-session-reset.test.ts +++ b/src/auto-reply/reply/agent-runner-session-reset.test.ts @@ -3,24 +3,29 @@ import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { SessionEntry } from "../../config/sessions.js"; +import { closeOpenClawAgentDatabasesForTest } from "../../state/openclaw-agent-db.js"; +import { closeOpenClawStateDatabaseForTest } from "../../state/openclaw-state-db.js"; import { resetReplyRunSession, setAgentRunnerSessionResetTestDeps, } from "./agent-runner-session-reset.js"; -import { createTestFollowupRun, writeTestSessionStore } from "./agent-runner.test-fixtures.js"; +import { + createTestFollowupRun, + readTestSessionRow, + writeTestSessionRow, +} from "./agent-runner.test-fixtures.js"; const refreshQueuedFollowupSessionMock = vi.fn(); const errorMock = vi.fn(); -async function expectPathMissing(targetPath: string): Promise { - await expect(fs.access(targetPath)).rejects.toMatchObject({ code: "ENOENT" }); -} - describe("resetReplyRunSession", () => { let rootDir = ""; + let previousStateDir: string | undefined; beforeEach(async () => { rootDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-reset-run-")); + previousStateDir = process.env.OPENCLAW_STATE_DIR; + process.env.OPENCLAW_STATE_DIR = rootDir; refreshQueuedFollowupSessionMock.mockReset(); errorMock.mockReset(); setAgentRunnerSessionResetTestDeps({ @@ -32,15 +37,23 @@ describe("resetReplyRunSession", () => { afterEach(async () => { setAgentRunnerSessionResetTestDeps(); + closeOpenClawAgentDatabasesForTest(); + closeOpenClawStateDatabaseForTest(); + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } + previousStateDir = undefined; await fs.rm(rootDir, { recursive: true, force: true }); }); it("rotates the session and clears stale runtime and fallback fields", async () => { - const storePath = path.join(rootDir, "sessions.json"); + const sessionsDir = path.join(rootDir, "agents", "main", "sessions"); const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: 1, - sessionFile: path.join(rootDir, "session.jsonl"), + sessionFile: path.join(sessionsDir, "session.jsonl"), modelProvider: "qwencode", model: "qwen", contextTokens: 123, @@ -58,7 +71,7 @@ describe("resetReplyRunSession", () => { }; const sessionStore = { main: sessionEntry }; const followupRun = createTestFollowupRun(); - await writeTestSessionStore(storePath, "main", sessionEntry); + await writeTestSessionRow("main", sessionEntry); let activeSessionEntry: SessionEntry | undefined = sessionEntry; let isNewSession = false; @@ -71,7 +84,6 @@ describe("resetReplyRunSession", () => { queueKey: "main", activeSessionEntry, activeSessionStore: sessionStore, - storePath, followupRun, onActiveSessionEntry: (entry) => { activeSessionEntry = entry; @@ -100,41 +112,43 @@ describe("resetReplyRunSession", () => { }); expect(errorMock).toHaveBeenCalledWith("reset 00000000-0000-0000-0000-000000000123"); - const persisted = JSON.parse(await fs.readFile(storePath, "utf8")) as { - main: SessionEntry; - }; - expect(persisted.main.sessionId).toBe(activeSessionEntry?.sessionId); - expect(persisted.main.fallbackNoticeReason).toBeUndefined(); + const persisted = readTestSessionRow("main"); + expect(persisted?.sessionId).toBe(activeSessionEntry?.sessionId); + expect(persisted?.fallbackNoticeReason).toBeUndefined(); }); - it("cleans up the old transcript when requested", async () => { - const storePath = path.join(rootDir, "sessions.json"); - const oldTranscriptPath = path.join(rootDir, "old-session.jsonl"); - await fs.writeFile(oldTranscriptPath, "old", "utf8"); + it("rotates from the SQLite row when no in-memory store is available", async () => { + const sessionsDir = path.join(rootDir, "agents", "main", "sessions"); const sessionEntry: SessionEntry = { - sessionId: "old-session", + sessionId: "session", updatedAt: 1, - sessionFile: oldTranscriptPath, + sessionFile: path.join(sessionsDir, "session.jsonl"), + totalTokens: 42, + compactionCount: 1, }; - const sessionStore = { main: sessionEntry }; - await writeTestSessionStore(storePath, "main", sessionEntry); + await writeTestSessionRow("main", sessionEntry); - await resetReplyRunSession({ + const followupRun = createTestFollowupRun(); + let activeSessionEntry: SessionEntry | undefined; + const reset = await resetReplyRunSession({ options: { - failureLabel: "role ordering conflict", - cleanupTranscripts: true, + failureLabel: "role ordering", buildLogMessage: (next) => `reset ${next}`, }, sessionKey: "main", queueKey: "main", - activeSessionEntry: sessionEntry, - activeSessionStore: sessionStore, - storePath, - followupRun: createTestFollowupRun(), - onActiveSessionEntry: () => {}, + followupRun, + onActiveSessionEntry: (entry) => { + activeSessionEntry = entry; + }, onNewSession: () => {}, }); - await expectPathMissing(oldTranscriptPath); + expect(reset).toBe(true); + expect(activeSessionEntry?.sessionId).toBe("00000000-0000-0000-0000-000000000123"); + expect(activeSessionEntry?.totalTokens).toBeUndefined(); + expect(activeSessionEntry?.compactionCount).toBe(1); + expect(followupRun.run.sessionId).toBe(activeSessionEntry?.sessionId); + expect(readTestSessionRow("main")?.sessionId).toBe(activeSessionEntry?.sessionId); }); }); diff --git a/src/auto-reply/reply/agent-runner.test-fixtures.ts b/src/auto-reply/reply/agent-runner.test-fixtures.ts index f66ee0912c7..831258001b9 100644 --- a/src/auto-reply/reply/agent-runner.test-fixtures.ts +++ b/src/auto-reply/reply/agent-runner.test-fixtures.ts @@ -1,6 +1,9 @@ -import fs from "node:fs/promises"; -import path from "node:path"; -import type { SessionEntry } from "../../config/sessions.js"; +import { + getSessionEntry, + resolveAgentIdFromSessionKey, + type SessionEntry, + upsertSessionEntry, +} from "../../config/sessions.js"; import type { FollowupRun } from "./queue.js"; export function createTestFollowupRun(overrides: Partial = {}): FollowupRun { @@ -32,11 +35,17 @@ export function createTestFollowupRun(overrides: Partial = { } as unknown as FollowupRun; } -export async function writeTestSessionStore( - storePath: string, - sessionKey: string, - entry: SessionEntry, -): Promise { - await fs.mkdir(path.dirname(storePath), { recursive: true }); - await fs.writeFile(storePath, JSON.stringify({ [sessionKey]: entry }, null, 2), "utf8"); +export async function writeTestSessionRow(sessionKey: string, entry: SessionEntry): Promise { + upsertSessionEntry({ + agentId: resolveAgentIdFromSessionKey(sessionKey), + sessionKey, + entry, + }); +} + +export function readTestSessionRow(sessionKey: string): SessionEntry | undefined { + return getSessionEntry({ + agentId: resolveAgentIdFromSessionKey(sessionKey), + sessionKey, + }); } diff --git a/src/auto-reply/reply/session.heartbeat-no-reset.test.ts b/src/auto-reply/reply/session.heartbeat-no-reset.test.ts index 583b59cdd3f..59e1b0a9b2e 100644 --- a/src/auto-reply/reply/session.heartbeat-no-reset.test.ts +++ b/src/auto-reply/reply/session.heartbeat-no-reset.test.ts @@ -2,8 +2,10 @@ import fs from "node:fs/promises"; import path from "node:path"; import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; import type { OpenClawConfig } from "../../config/config.js"; -import { loadSessionStore, saveSessionStore } from "../../config/sessions/store.js"; +import { listSessionEntries, upsertSessionEntry } from "../../config/sessions/store.js"; +import { replaceSqliteSessionTranscriptEvents } from "../../config/sessions/transcript-store.sqlite.js"; import type { SessionEntry } from "../../config/sessions/types.js"; +import { closeOpenClawAgentDatabasesForTest } from "../../state/openclaw-agent-db.js"; import type { MsgContext } from "../templating.js"; import { initSessionState } from "./session.js"; @@ -17,10 +19,11 @@ describe("initSessionState - heartbeat should not trigger session reset", () => beforeEach(async () => { tempDir = await fs.mkdtemp("/tmp/openclaw-test-"); - storePath = path.join(tempDir, "sessions.json"); + storePath = path.join(tempDir, "agents", "main", "sessions", "sessions.json"); }); afterEach(async () => { + closeOpenClawAgentDatabasesForTest(); await fs.rm(tempDir, { recursive: true, force: true }); }); @@ -37,7 +40,6 @@ describe("initSessionState - heartbeat should not trigger session reset", () => ], }, session: { - store: storePath, reset: { mode: "idle", idleMinutes: 5, // 5 minutes idle timeout @@ -72,8 +74,11 @@ describe("initSessionState - heartbeat should not trigger session reset", () => updatedAt: number, overrides: Partial = {}, ): Promise => { - await saveSessionStore(storePath, { - "main:user123": { + void storePath; + upsertSessionEntry({ + agentId: "main", + sessionKey: "main:user123", + entry: { sessionId, updatedAt, systemSent: true, @@ -82,14 +87,27 @@ describe("initSessionState - heartbeat should not trigger session reset", () => }); }; - const expectPersistedSession = (sessionStore: Record): SessionEntry => { - const entry = sessionStore["main:user123"]; - if (!entry) { - throw new Error("Expected persisted session for main:user123"); - } - return entry; - }; + const readStoredSessions = (): Record => + Object.fromEntries( + listSessionEntries({ agentId: "main" }).map(({ sessionKey, entry }) => [sessionKey, entry]), + ); + const writeSessionHeader = (sessionFile: string, sessionId: string, startedAt: number): void => { + replaceSqliteSessionTranscriptEvents({ + agentId: "main", + sessionId, + transcriptPath: sessionFile, + events: [ + { + type: "session", + version: 3, + id: sessionId, + timestamp: new Date(startedAt).toISOString(), + cwd: tempDir, + }, + ], + }); + }; it("should NOT reset session when Provider is 'heartbeat'", async () => { // Setup: Create a session entry that is "stale" (older than idle timeout) const now = Date.now(); @@ -198,8 +216,8 @@ describe("initSessionState - heartbeat should not trigger session reset", () => expect(heartbeatResult.sessionId).toBe("daily-session-id"); expect(heartbeatResult.sessionEntry.lastInteractionAt).toBe(staleTime); - const persistedAfterHeartbeat = loadSessionStore(storePath); - expect(expectPersistedSession(persistedAfterHeartbeat).lastInteractionAt).toBe(staleTime); + const persistedAfterHeartbeat = readStoredSessions(); + expect(persistedAfterHeartbeat["main:user123"]?.lastInteractionAt).toBe(staleTime); const userResult = await initSessionState({ ctx: createBaseCtx({ @@ -214,21 +232,11 @@ describe("initSessionState - heartbeat should not trigger session reset", () => expect(userResult.sessionId).not.toBe("daily-session-id"); }); - it("resets legacy daily sessions using the JSONL header even when updatedAt is fresh", async () => { + it("resets daily sessions using the transcript header even when updatedAt is fresh", async () => { const now = Date.now(); const staleTime = now - 25 * 60 * 60 * 1000; const sessionFile = path.join(tempDir, "legacy-daily-session.jsonl"); - await fs.writeFile( - sessionFile, - `${JSON.stringify({ - type: "session", - version: 3, - id: "legacy-daily-session", - timestamp: new Date(staleTime).toISOString(), - cwd: tempDir, - })}\n`, - "utf8", - ); + writeSessionHeader(sessionFile, "legacy-daily-session", staleTime); await saveExistingSession("legacy-daily-session", now, { sessionFile, lastInteractionAt: staleTime, @@ -253,21 +261,11 @@ describe("initSessionState - heartbeat should not trigger session reset", () => expect(result.sessionId).not.toBe("legacy-daily-session"); }); - it("does not let heartbeat keep a legacy idle session fresh without lastInteractionAt", async () => { + it("does not let heartbeat keep an idle session fresh without lastInteractionAt", async () => { const now = Date.now(); const staleTime = now - 10 * 60 * 1000; const sessionFile = path.join(tempDir, "legacy-idle-session.jsonl"); - await fs.writeFile( - sessionFile, - `${JSON.stringify({ - type: "session", - version: 3, - id: "legacy-idle-session", - timestamp: new Date(staleTime).toISOString(), - cwd: tempDir, - })}\n`, - "utf8", - ); + writeSessionHeader(sessionFile, "legacy-idle-session", staleTime); await saveExistingSession("legacy-idle-session", now, { sessionFile, }); @@ -285,8 +283,8 @@ describe("initSessionState - heartbeat should not trigger session reset", () => expect(heartbeatResult.isNewSession).toBe(false); expect(heartbeatResult.sessionId).toBe("legacy-idle-session"); - const persistedAfterHeartbeat = loadSessionStore(storePath); - expect(expectPersistedSession(persistedAfterHeartbeat).lastInteractionAt).toBeUndefined(); + const persistedAfterHeartbeat = readStoredSessions(); + expect(persistedAfterHeartbeat["main:user123"]?.lastInteractionAt).toBeUndefined(); const userResult = await initSessionState({ ctx: createBaseCtx({ diff --git a/src/commands/agent-via-gateway.test.ts b/src/commands/agent-via-gateway.test.ts index 3d37203cf89..4b440fed459 100644 --- a/src/commands/agent-via-gateway.test.ts +++ b/src/commands/agent-via-gateway.test.ts @@ -44,7 +44,6 @@ function mockConfig(storePath: string, overrides?: Partial) { }, }, session: { - store: storePath, mainKey: "main", ...overrides?.session, }, @@ -57,7 +56,7 @@ async function withTempStore( overrides?: Partial, ) { const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-agent-cli-")); - const store = path.join(dir, "sessions.json"); + const store = path.join(dir, "agents", "main", "sessions", "sessions.json"); mockConfig(store, overrides); try { await fn({ dir, store }); diff --git a/src/commands/agent.runtime-config.test.ts b/src/commands/agent.runtime-config.test.ts index 6869337517e..e35d933f982 100644 --- a/src/commands/agent.runtime-config.test.ts +++ b/src/commands/agent.runtime-config.test.ts @@ -63,7 +63,7 @@ async function withTempHome(fn: (home: string) => Promise): Promise { return withTempHomeBase(fn, { prefix: "openclaw-agent-" }); } -function mockConfig(home: string, storePath: string): OpenClawConfig { +function mockConfig(home: string): OpenClawConfig { const cfg = { agents: { defaults: { @@ -72,7 +72,7 @@ function mockConfig(home: string, storePath: string): OpenClawConfig { workspace: path.join(home, "openclaw"), }, }, - session: { store: storePath, mainKey: "main" }, + session: { mainKey: "main" }, } as OpenClawConfig; loadConfigMock.mockReturnValue(cfg); return cfg; @@ -89,7 +89,6 @@ beforeEach(() => { describe("agentCommand runtime config", () => { it("sets runtime snapshots from source config before embedded agent run", async () => { await withTempHome(async (home) => { - const store = path.join(home, "sessions.json"); const loadedConfig = { agents: { defaults: { @@ -98,7 +97,7 @@ describe("agentCommand runtime config", () => { workspace: path.join(home, "openclaw"), }, }, - session: { store, mainKey: "main" }, + session: { mainKey: "main" }, models: { providers: { openai: { @@ -163,8 +162,7 @@ describe("agentCommand runtime config", () => { it("includes channel secret targets when delivery is requested", async () => { await withTempHome(async (home) => { - const store = path.join(home, "sessions.json"); - const loadedConfig = mockConfig(home, store); + const loadedConfig = mockConfig(home); loadedConfig.channels = { telegram: { botToken: { source: "env", provider: "default", id: "TELEGRAM_BOT_TOKEN" }, @@ -187,8 +185,7 @@ describe("agentCommand runtime config", () => { it("skips command secret resolution when no relevant SecretRef values exist", async () => { await withTempHome(async (home) => { - const store = path.join(home, "sessions.json"); - const loadedConfig = mockConfig(home, store); + const loadedConfig = mockConfig(home); const prepared = await resolveAgentRuntimeConfig(runtime); @@ -199,20 +196,13 @@ describe("agentCommand runtime config", () => { it("derives a fresh session from --to", async () => { await withTempHome(async (home) => { - const store = path.join(home, "sessions.json"); - const cfg = mockConfig(home, store); + const cfg = mockConfig(home); const resolved = resolveSession({ cfg, to: "+1555" }); - expect(resolved.storePath).toBe(store); - expect(resolved.sessionKey).toBeTypeOf("string"); - const sessionKey = resolved.sessionKey; - if (!sessionKey) { - throw new Error("expected session key"); - } - expect(sessionKey.length).toBeGreaterThan(0); - expect(resolved.sessionId).toBeTypeOf("string"); - expect(resolved.sessionId.length).toBeGreaterThan(0); + expect(resolved.agentId).toBe("main"); + expect(resolved.sessionKey).toBeTruthy(); + expect(resolved.sessionId).toBeTruthy(); expect(resolved.isNewSession).toBe(true); }); }); diff --git a/src/commands/agents.delete.test.ts b/src/commands/agents.delete.test.ts index cd1a7289181..4fd19b8a489 100644 --- a/src/commands/agents.delete.test.ts +++ b/src/commands/agents.delete.test.ts @@ -1,7 +1,7 @@ import fs from "node:fs/promises"; import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; -import { loadSessionStore, resolveStorePath, saveSessionStore } from "../config/sessions.js"; +import { listSessionEntries, upsertSessionEntry } from "../config/sessions/store.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { withStateDirEnv } from "../test-helpers/state-dir-env.js"; import { baseConfigSnapshot, createTestRuntime } from "./test-runtime-config-helpers.js"; @@ -39,6 +39,26 @@ import { agentsDeleteCommand } from "./agents.js"; const runtime = createTestRuntime(); +function readSessionRowsForAgent( + agentId: string, +): Record { + return Object.fromEntries( + listSessionEntries({ agentId }).map(({ sessionKey, entry }) => [ + sessionKey, + { sessionId: entry.sessionId, updatedAt: entry.updatedAt }, + ]), + ); +} + +function writeSessionRowsForAgent( + agentId: string, + sessions: Record, +): void { + for (const [sessionKey, entry] of Object.entries(sessions)) { + upsertSessionEntry({ agentId, sessionKey, entry }); + } +} + async function arrangeAgentsDeleteTest(params: { stateDir: string; cfg: OpenClawConfig; @@ -46,8 +66,7 @@ async function arrangeAgentsDeleteTest(params: { sessions: Record; }) { const deletedAgentId = params.deletedAgentId ?? "ops"; - const storePath = resolveStorePath(params.cfg.session?.store, { agentId: deletedAgentId }); - await saveSessionStore(storePath, params.sessions); + writeSessionRowsForAgent(deletedAgentId, params.sessions); await fs.mkdir(path.join(params.stateDir, `workspace-${deletedAgentId}`), { recursive: true }); await fs.mkdir(path.join(params.stateDir, "agents", deletedAgentId, "agent"), { recursive: true, @@ -61,14 +80,14 @@ async function arrangeAgentsDeleteTest(params: { resolved: params.cfg, }); - return storePath; + return deletedAgentId; } function expectSessionStore( - storePath: string, + agentId: string, sessions: Record, ) { - expect(loadSessionStore(storePath)).toEqual(sessions); + expect(readSessionRowsForAgent(agentId)).toEqual(sessions); } function readJsonLogs(): Array> { @@ -216,39 +235,6 @@ describe("agents delete command", () => { }); }); - it("preserves shared-store legacy default keys when deleting another agent", async () => { - await withStateDirEnv("openclaw-agents-delete-shared-store-", async ({ stateDir }) => { - const now = Date.now(); - const cfg: OpenClawConfig = { - session: { store: path.join(stateDir, "sessions.json") }, - agents: { - list: [ - { id: "main", default: true, workspace: path.join(stateDir, "workspace-main") }, - { id: "ops", workspace: path.join(stateDir, "workspace-ops") }, - ], - }, - }; - const storePath = await arrangeAgentsDeleteTest({ - stateDir, - cfg, - sessions: { - main: { sessionId: "sess-main", updatedAt: now + 1 }, - "quietchat:direct:u1": { sessionId: "sess-main-direct", updatedAt: now + 2 }, - "agent:ops:main": { sessionId: "sess-ops-main", updatedAt: now + 3 }, - "agent:ops:quietchat:direct:u2": { sessionId: "sess-ops-direct", updatedAt: now + 4 }, - }, - }); - - await agentsDeleteCommand({ id: "ops", force: true, json: true }, runtime); - - expect(runtime.exit).not.toHaveBeenCalled(); - expectSessionStore(storePath, { - main: { sessionId: "sess-main", updatedAt: now + 1 }, - "quietchat:direct:u1": { sessionId: "sess-main-direct", updatedAt: now + 2 }, - }); - }); - }); - it("skips workspace removal when another agent shares the same workspace (#70890)", async () => { await withStateDirEnv("openclaw-agents-delete-shared-workspace-", async ({ stateDir }) => { const sharedWorkspace = path.join(stateDir, "workspace-shared"); diff --git a/src/commands/doctor-state-integrity.test.ts b/src/commands/doctor-state-integrity.test.ts index d629c4a402d..3ec16a0af78 100644 --- a/src/commands/doctor-state-integrity.test.ts +++ b/src/commands/doctor-state-integrity.test.ts @@ -4,16 +4,21 @@ import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { HEARTBEAT_TRANSCRIPT_PROMPT } from "../auto-reply/heartbeat.js"; import type { OpenClawConfig } from "../config/config.js"; +import { resolveSessionTranscriptsDirForAgent } from "../config/sessions/paths.js"; import { - resolveStorePath, - resolveSessionTranscriptsDirForAgent, -} from "../config/sessions/paths.js"; -import { loadSessionStore, saveSessionStore } from "../config/sessions/store.js"; + deleteSessionEntry, + listSessionEntries, + upsertSessionEntry, +} from "../config/sessions/store.js"; import { replaceSqliteSessionTranscriptEvents } from "../config/sessions/transcript-store.sqlite.js"; import type { SessionEntry } from "../config/sessions/types.js"; import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js"; import { clearTuiLastSessionPointers, + readTuiLastSessionKey, + writeTuiLastSessionKey, +} from "../tui/tui-last-session.js"; +import { moveHeartbeatMainSessionEntry, resolveHeartbeatMainSessionRepairCandidate, } from "./doctor-heartbeat-main-session-repair.js"; @@ -58,12 +63,26 @@ function restoreEnv(snapshot: EnvSnapshot) { } } -function setupSessionState(cfg: OpenClawConfig, env: NodeJS.ProcessEnv, homeDir: string) { +function setupSessionState(env: NodeJS.ProcessEnv, homeDir: string) { const agentId = "main"; const sessionsDir = resolveSessionTranscriptsDirForAgent(agentId, env, () => homeDir); - const storePath = resolveStorePath(cfg.session?.store, { agentId }); fs.mkdirSync(sessionsDir, { recursive: true }); - fs.mkdirSync(path.dirname(storePath), { recursive: true }); +} + +function replaceSessionStoreForTest(store: Record): void { + const agentId = "main"; + for (const { sessionKey } of listSessionEntries({ agentId })) { + deleteSessionEntry({ agentId, sessionKey }); + } + for (const [sessionKey, entry] of Object.entries(store)) { + upsertSessionEntry({ agentId, sessionKey, entry }); + } +} + +function readSessionStoreForTest(): Record { + return Object.fromEntries( + listSessionEntries({ agentId: "main" }).map(({ sessionKey, entry }) => [sessionKey, entry]), + ); } function stateIntegrityText(): string { @@ -96,7 +115,7 @@ const OAUTH_PROMPT_MATCHER = expect.objectContaining({ }); async function runStateIntegrity(cfg: OpenClawConfig) { - setupSessionState(cfg, process.env, process.env.HOME ?? ""); + setupSessionState(process.env, process.env.HOME ?? ""); const confirmRuntimeRepair = vi.fn(async () => false); await noteStateIntegrity(cfg, { confirmRuntimeRepair, note: noteMock }); return confirmRuntimeRepair; @@ -106,9 +125,8 @@ async function writeSessionStore( cfg: OpenClawConfig, sessions: Record>, ) { - setupSessionState(cfg, process.env, process.env.HOME ?? ""); - const storePath = resolveStorePath(cfg.session?.store, { agentId: "main" }); - await saveSessionStore(storePath, sessions as Record); + setupSessionState(process.env, process.env.HOME ?? ""); + replaceSessionStoreForTest(sessions as Record); } async function runStateIntegrityText(cfg: OpenClawConfig): Promise { @@ -125,7 +143,7 @@ async function runOrphanTranscriptCheckWithQmdSessions(enabled: boolean, homeDir }, }, }; - setupSessionState(cfg, process.env, homeDir); + setupSessionState(process.env, homeDir); const sessionsDir = resolveSessionTranscriptsDirForAgent("main", process.env, () => homeDir); fs.writeFileSync(path.join(sessionsDir, "orphan-session.jsonl"), '{"type":"session"}\n'); const confirmRuntimeRepair = vi.fn(async () => false); @@ -292,8 +310,7 @@ describe("doctor state integrity oauth dir checks", () => { ); await noteStateIntegrity(cfg, { confirmRuntimeRepair, note: noteMock }); - const storePath = resolveStorePath(cfg.session?.store, { agentId: "main" }); - const persisted = loadSessionStore(storePath) as Record< + const persisted = readSessionStoreForTest() as Record< string, { abortedLastRun?: boolean; updatedAt?: number } >; @@ -365,7 +382,7 @@ describe("doctor state integrity oauth dir checks", () => { it("detects orphan transcripts and offers delete remediation", async () => { const cfg: OpenClawConfig = {}; - setupSessionState(cfg, process.env, process.env.HOME ?? ""); + setupSessionState(process.env, process.env.HOME ?? ""); const sessionsDir = resolveSessionTranscriptsDirForAgent("main", process.env, () => tempHome); fs.writeFileSync(path.join(sessionsDir, "orphan-session.jsonl"), '{"type":"session"}\n'); const confirmRuntimeRepair = vi.fn(async (params: { message: string }) => @@ -373,7 +390,7 @@ describe("doctor state integrity oauth dir checks", () => { ); await noteStateIntegrity(cfg, { confirmRuntimeRepair, note: noteMock }); expect(stateIntegrityText()).toContain( - "These .jsonl files are no longer referenced by sessions.json", + "These legacy .jsonl files are no longer referenced by SQLite session rows", ); expect(stateIntegrityText()).toContain("Examples: orphan-session.jsonl"); expect(confirmRuntimeRepair).toHaveBeenCalledWith( @@ -388,7 +405,7 @@ describe("doctor state integrity oauth dir checks", () => { it("does not auto-delete orphan transcripts from non-interactive repair mode", async () => { const cfg: OpenClawConfig = {}; - setupSessionState(cfg, process.env, process.env.HOME ?? ""); + setupSessionState(process.env, process.env.HOME ?? ""); const sessionsDir = resolveSessionTranscriptsDirForAgent("main", process.env, () => tempHome); fs.writeFileSync(path.join(sessionsDir, "orphan-session.jsonl"), '{"type":"session"}\n'); const confirmRuntimeRepair = vi.fn( @@ -422,7 +439,7 @@ describe("doctor state integrity oauth dir checks", () => { process.env.OPENCLAW_HOME = symlinkHome; process.env.OPENCLAW_STATE_DIR = path.join(symlinkHome, ".openclaw"); - setupSessionState(cfg, process.env, symlinkHome); + setupSessionState(process.env, symlinkHome); const sessionsDir = resolveSessionTranscriptsDirForAgent( "main", process.env, @@ -454,7 +471,7 @@ describe("doctor state integrity oauth dir checks", () => { const confirmRuntimeRepair = await runOrphanTranscriptCheckWithQmdSessions(true, tempHome); expect(stateIntegrityText()).not.toContain( - "These .jsonl files are no longer referenced by sessions.json", + "These legacy .jsonl files are no longer referenced by SQLite session rows", ); expect(confirmRuntimeRepair).not.toHaveBeenCalled(); }); @@ -463,7 +480,7 @@ describe("doctor state integrity oauth dir checks", () => { const confirmRuntimeRepair = await runOrphanTranscriptCheckWithQmdSessions(false, tempHome); expect(stateIntegrityText()).toContain( - "These .jsonl files are no longer referenced by sessions.json", + "These legacy .jsonl files are no longer referenced by SQLite session rows", ); expect(confirmRuntimeRepair).toHaveBeenCalled(); }); @@ -479,15 +496,15 @@ describe("doctor state integrity oauth dir checks", () => { const text = await runStateIntegrityText(cfg); expect(text).toContain("recent sessions are missing transcripts"); expect(text).toContain("openclaw doctor --fix"); - expect(text).toContain("openclaw sessions cleanup --dry-run"); - expect(text).toContain("openclaw sessions cleanup --enforce --fix-missing"); + expect(text).toContain("reset or delete the affected sessions explicitly"); + expect(text).not.toContain("openclaw sessions cleanup"); expect(text).not.toContain("--active"); expect(text).not.toContain(" ls "); }); it("moves a heartbeat-poisoned main session and clears stale TUI restore pointers", async () => { const cfg: OpenClawConfig = {}; - setupSessionState(cfg, process.env, tempHome); + setupSessionState(process.env, tempHome); const sessionsDir = resolveSessionTranscriptsDirForAgent("main", process.env, () => tempHome); const heartbeatTranscriptPath = path.join(sessionsDir, "heartbeat-session.jsonl"); replaceSqliteSessionTranscriptEvents({ @@ -506,31 +523,24 @@ describe("doctor state integrity oauth dir checks", () => { updatedAt: Date.now(), }, }); - const tuiLastSessionPath = path.join( - process.env.OPENCLAW_STATE_DIR ?? "", - "tui", - "last-session.json", - ); - fs.mkdirSync(path.dirname(tuiLastSessionPath), { recursive: true }); - fs.writeFileSync( - tuiLastSessionPath, - JSON.stringify( - { - default: { sessionKey: "agent:main:main", updatedAt: Date.now() }, - telegram: { sessionKey: "agent:main:telegram:thread", updatedAt: Date.now() }, - }, - null, - 2, - ), - ); + const stateDir = process.env.OPENCLAW_STATE_DIR ?? ""; + await writeTuiLastSessionKey({ + stateDir, + scopeKey: "default", + sessionKey: "agent:main:main", + }); + await writeTuiLastSessionKey({ + stateDir, + scopeKey: "telegram", + sessionKey: "agent:main:telegram:thread", + }); const confirmRuntimeRepair = vi.fn(async (params: { message: string }) => params.message.startsWith("Move heartbeat-owned main session"), ); await noteStateIntegrity(cfg, { confirmRuntimeRepair, note: noteMock }); - const storePath = resolveStorePath(cfg.session?.store, { agentId: "main" }); - const store = loadSessionStore(storePath); + const store = readSessionStoreForTest(); const recoveredKey = Object.keys(store).find((key) => key.startsWith("agent:main:heartbeat-recovered-"), ); @@ -538,14 +548,17 @@ describe("doctor state integrity oauth dir checks", () => { expect(recoveredKey).toBeDefined(); expect(store[recoveredKey ?? ""]?.sessionId).toBe("heartbeat-session"); - expect(fs.existsSync(tuiLastSessionPath)).toBe(false); + await expect(readTuiLastSessionKey({ stateDir, scopeKey: "default" })).resolves.toBeNull(); + await expect(readTuiLastSessionKey({ stateDir, scopeKey: "telegram" })).resolves.toBe( + "agent:main:telegram:thread", + ); expect(doctorChangesText()).toContain("Moved heartbeat-owned main session agent:main:main"); expect(doctorChangesText()).toContain("Cleared 1 stale TUI last-session pointer"); }); it("does not move a mixed main transcript that has real user activity", async () => { const cfg: OpenClawConfig = {}; - setupSessionState(cfg, process.env, tempHome); + setupSessionState(process.env, tempHome); const sessionsDir = resolveSessionTranscriptsDirForAgent("main", process.env, () => tempHome); const mixedTranscriptPath = path.join(sessionsDir, "mixed-session.jsonl"); replaceSqliteSessionTranscriptEvents({ @@ -568,8 +581,7 @@ describe("doctor state integrity oauth dir checks", () => { const confirmRuntimeRepair = vi.fn(async () => true); await noteStateIntegrity(cfg, { confirmRuntimeRepair, note: noteMock }); - const storePath = resolveStorePath(cfg.session?.store, { agentId: "main" }); - const store = loadSessionStore(storePath); + const store = readSessionStoreForTest(); expect(store["agent:main:main"]?.sessionId).toBe("mixed-session"); expect(Object.keys(store).some((key) => key.includes("heartbeat-recovered"))).toBe(false); expect(confirmRuntimeRepair).not.toHaveBeenCalledWith( @@ -706,7 +718,7 @@ describe("doctor state integrity oauth dir checks", () => { } }); - it("moves store entries and clears matching TUI pointers without touching others", () => { + it("moves store entries and clears matching TUI pointers without touching others", async () => { const store: Record = { "agent:main:main": { sessionId: "main-session", updatedAt: 1 }, }; @@ -724,26 +736,28 @@ describe("doctor state integrity oauth dir checks", () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-tui-pointer-clear-")); try { - const filePath = path.join(tempDir, "last-session.json"); - fs.writeFileSync( - filePath, - JSON.stringify({ - terminal: { sessionKey: "agent:main:main" }, - telegram: { sessionKey: "agent:main:telegram:thread" }, - }), - ); + await writeTuiLastSessionKey({ + scopeKey: "terminal", + sessionKey: "agent:main:main", + stateDir: tempDir, + }); + await writeTuiLastSessionKey({ + scopeKey: "telegram", + sessionKey: "agent:main:telegram:thread", + stateDir: tempDir, + }); expect( - clearTuiLastSessionPointers({ - filePath, + await clearTuiLastSessionPointers({ + stateDir: tempDir, sessionKeys: new Set(["agent:main:main"]), }), ).toBe(1); - const parsed = JSON.parse(fs.readFileSync(filePath, "utf8")) as Record< - string, - { sessionKey?: string } - >; - expect(parsed.terminal).toBeUndefined(); - expect(parsed.telegram?.sessionKey).toBe("agent:main:telegram:thread"); + await expect( + readTuiLastSessionKey({ scopeKey: "terminal", stateDir: tempDir }), + ).resolves.toBe(null); + await expect( + readTuiLastSessionKey({ scopeKey: "telegram", stateDir: tempDir }), + ).resolves.toBe("agent:main:telegram:thread"); } finally { fs.rmSync(tempDir, { recursive: true, force: true }); } diff --git a/src/config/io.write-config.test.ts b/src/config/io.write-config.test.ts index a7f3e4a1235..295131034d9 100644 --- a/src/config/io.write-config.test.ts +++ b/src/config/io.write-config.test.ts @@ -461,7 +461,7 @@ describe("config io write", () => { homedir: () => home, logger: { warn, error: vi.fn() }, }); - await fs.writeFile(path.join(unwritableStatePath, "plugins"), "not a directory", "utf-8"); + await fs.writeFile(path.join(unwritableStatePath, "state"), "not a directory", "utf-8"); const loadedConfig = io.loadConfig(); expect(loadedConfig.plugins?.installs?.demo).toMatchObject({ @@ -583,7 +583,7 @@ describe("config io write", () => { await io.writeConfigFile({ agents: { list: [{ id: "main", default: true }] }, gateway: { mode: "local" }, - session: { mainKey: "main", store: path.join(overrideDir, "sessions.json") }, + session: { mainKey: "main" }, }); const livePersisted = JSON.parse(await fs.readFile(liveConfigPath, "utf-8")) as { @@ -594,9 +594,9 @@ describe("config io write", () => { const overridePersisted = JSON.parse( await fs.readFile(path.join(overrideDir, "openclaw.json"), "utf-8"), ) as { - session?: { store?: unknown }; + session?: { mainKey?: unknown }; }; - expect(overridePersisted.session?.store).toBe(path.join(overrideDir, "sessions.json")); + expect(overridePersisted.session).toEqual({ mainKey: "main" }); }); }); diff --git a/src/config/sessions/sessions.test.ts b/src/config/sessions/sessions.test.ts index 73bcc75f26b..a2fbbbbf38a 100644 --- a/src/config/sessions/sessions.test.ts +++ b/src/config/sessions/sessions.test.ts @@ -16,12 +16,13 @@ import { import { evaluateSessionFreshness, resolveSessionResetPolicy } from "./reset.js"; import { resolveAndPersistSessionFile } from "./session-file.js"; import { - clearSessionStoreCacheForTest, - loadSessionStore, - saveSessionStore, - updateSessionStore, + getSessionEntry, + listSessionEntries, + patchSessionEntry, + upsertSessionEntry, } from "./store.js"; import { useTempSessionsFixture } from "./test-helpers.js"; +import { replaceSqliteSessionTranscriptEvents } from "./transcript-store.sqlite.js"; import { mergeSessionEntry, mergeSessionEntryWithPolicy, type SessionEntry } from "./types.js"; describe("session path safety", () => { @@ -74,11 +75,9 @@ describe("session path safety", () => { fs.mkdirSync(sessionsDir, { recursive: true }); fs.symlinkSync(realRoot, aliasRoot, "dir"); const viaAlias = path.join(aliasRoot, "agents", "main", "sessions", "sess-1.jsonl"); - fs.writeFileSync(path.join(sessionsDir, "sess-1.jsonl"), ""); const resolved = resolveSessionFilePath("sess-1", { sessionFile: viaAlias }, { sessionsDir }); - expect(fs.realpathSync(resolved)).toBe( - fs.realpathSync(path.join(sessionsDir, "sess-1.jsonl")), - ); + expect(fs.realpathSync(path.dirname(resolved))).toBe(fs.realpathSync(sessionsDir)); + expect(path.basename(resolved)).toBe("sess-1.jsonl"); }); }); @@ -241,26 +240,31 @@ describe("resolveSessionResetPolicy", () => { }); describe("session lifecycle timestamps", () => { - it("falls back to the JSONL session header for legacy session start time", async () => { + it("falls back to the SQLite transcript header for session start time", async () => { const dir = await fsPromises.mkdtemp("/tmp/openclaw-lifecycle-test-"); + const previousStateDir = process.env.OPENCLAW_STATE_DIR; + process.env.OPENCLAW_STATE_DIR = dir; try { - const storePath = path.join(dir, "sessions.json"); - const sessionFile = path.join(dir, "legacy-session.jsonl"); + const storePath = path.join(dir, "agents", "main", "sessions", "sessions.json"); + const sessionFile = path.join(path.dirname(storePath), "legacy-session.jsonl"); const headerTimestamp = "2026-04-20T04:30:00.000Z"; - await fsPromises.writeFile( - sessionFile, - `${JSON.stringify({ - type: "session", - version: 3, - id: "legacy-session", - timestamp: headerTimestamp, - cwd: dir, - })}\n`, - "utf8", - ); + replaceSqliteSessionTranscriptEvents({ + agentId: "main", + sessionId: "legacy-session", + transcriptPath: sessionFile, + events: [ + { + type: "session", + version: 3, + id: "legacy-session", + timestamp: headerTimestamp, + cwd: dir, + }, + ], + }); const timestamps = resolveSessionLifecycleTimestamps({ - storePath, + agentId: "main", entry: { sessionId: "legacy-session", sessionFile, @@ -270,110 +274,160 @@ describe("session lifecycle timestamps", () => { expect(timestamps.sessionStartedAt).toBe(Date.parse(headerTimestamp)); } finally { + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } + await fsPromises.rm(dir, { recursive: true, force: true }); + } + }); + + it("ignores legacy transcript files that were not imported", async () => { + const dir = await fsPromises.mkdtemp("/tmp/openclaw-lifecycle-test-"); + const previousStateDir = process.env.OPENCLAW_STATE_DIR; + process.env.OPENCLAW_STATE_DIR = dir; + try { + const storePath = path.join(dir, "agents", "main", "sessions", "sessions.json"); + const sessionFile = path.join(path.dirname(storePath), "legacy-session.jsonl"); + await fsPromises.mkdir(path.dirname(sessionFile), { recursive: true }); + await fsPromises.writeFile( + sessionFile, + `${JSON.stringify({ + type: "session", + version: 3, + id: "legacy-session", + timestamp: "2026-04-20T04:30:00.000Z", + cwd: dir, + })}\n`, + "utf8", + ); + + const timestamps = resolveSessionLifecycleTimestamps({ + agentId: "main", + entry: { + sessionId: "legacy-session", + sessionFile, + updatedAt: Date.parse("2026-04-25T08:00:00.000Z"), + }, + }); + + expect(timestamps.sessionStartedAt).toBeUndefined(); + } finally { + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } await fsPromises.rm(dir, { recursive: true, force: true }); } }); }); -describe("session store writer queue", () => { - const writerFixtureRootTracker = createSuiteTempRootTracker({ prefix: "openclaw-writer-test-" }); +describe("SQLite session store patch retries", () => { + const patchFixtureRootTracker = createSuiteTempRootTracker({ prefix: "openclaw-patch-test-" }); + const previousStateDir = process.env.OPENCLAW_STATE_DIR; async function makeTmpStore( initial: Record = {}, - ): Promise<{ dir: string; storePath: string }> { - const dir = await writerFixtureRootTracker.make("case"); - const storePath = path.join(dir, "agents", "main", "sessions", "sessions.json"); - if (Object.keys(initial).length > 0) { - await saveSessionStore(storePath, initial as Record); + options: { agentId?: string } = {}, + ): Promise<{ dir: string; agentId: string; storePath: string }> { + const dir = await patchFixtureRootTracker.make("case"); + process.env.OPENCLAW_STATE_DIR = dir; + const agentId = options.agentId ?? "main"; + const storePath = path.join(dir, "agents", agentId, "sessions", "sessions.json"); + for (const [sessionKey, entry] of Object.entries(initial)) { + upsertSessionEntry({ agentId, sessionKey, entry: entry as SessionEntry }); } - return { dir, storePath }; + return { dir, agentId, storePath }; + } + + function readSessionEntries(agentId = "main"): Record { + return Object.fromEntries( + listSessionEntries({ agentId }).map(({ sessionKey, entry }) => [sessionKey, entry]), + ); } beforeAll(async () => { - await writerFixtureRootTracker.setup(); + await patchFixtureRootTracker.setup(); }); afterAll(async () => { - await writerFixtureRootTracker.cleanup(); + await patchFixtureRootTracker.cleanup(); }); afterEach(async () => { - clearSessionStoreCacheForTest(); + if (previousStateDir === undefined) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } }); - it("serializes concurrent updateSessionStore calls without data loss", async () => { + it("serializes concurrent patchSessionEntry calls without data loss", async () => { const key = "agent:main:test"; - const { storePath } = await makeTmpStore({ - [key]: { sessionId: "s1", updatedAt: Date.now(), counter: 0 }, + const { agentId } = await makeTmpStore({ + [key]: { sessionId: "s1", updatedAt: Date.now(), heartbeatTaskState: { counter: 0 } }, }); const N = 4; await Promise.all( Array.from({ length: N }, (_, i) => - updateSessionStore(storePath, async (store) => { - const entry = store[key] as Record; - await Promise.resolve(); - entry.counter = (entry.counter as number) + 1; - entry.tag = `writer-${i}`; + patchSessionEntry({ + agentId, + sessionKey: key, + update: async (entry) => { + const current = entry.heartbeatTaskState?.counter ?? 0; + await Promise.resolve(); + return { + heartbeatTaskState: { counter: current + 1, [`patch-${i}`]: i }, + }; + }, }), ), ); - const store = loadSessionStore(storePath); - expect((store[key] as Record).counter).toBe(N); + const store = readSessionEntries(agentId); + expect(store[key]?.heartbeatTaskState?.counter).toBe(N); }); - it("persists SQLite stores even when payload is unchanged", async () => { + it("keeps SQLite rows when a patch returns no changes", async () => { const key = "agent:main:no-op-save"; - const { storePath } = await makeTmpStore({ + const { agentId } = await makeTmpStore({ [key]: { sessionId: "s-noop", updatedAt: Date.now() }, }); - await updateSessionStore(storePath, async () => { - // Intentionally no-op mutation. - }); - expect(loadSessionStore(storePath)[key]?.sessionId).toBe("s-noop"); - }); - - it("keeps session store writes atomic while skipping durable fsync inside the writer lock", async () => { - const key = "agent:main:no-fsync"; - const { storePath } = await makeTmpStore({ - [key]: { sessionId: "s-no-fsync", updatedAt: Date.now(), counter: 0 }, - }); - - const writeSpy = vi.spyOn(jsonFiles, "writeTextAtomic"); - await updateSessionStore( - storePath, - async (store) => { - const entry = store[key] as Record; - entry.counter = 1; + await patchSessionEntry({ + agentId, + sessionKey: key, + update: async () => { + // Intentionally no-op mutation. + return null; }, - { skipMaintenance: true }, - ); - - expect(writeSpy).toHaveBeenCalledTimes(1); - expect(writeSpy).toHaveBeenCalledWith( - storePath, - expect.any(String), - expect.objectContaining({ durable: false, mode: 0o600 }), - ); - writeSpy.mockRestore(); + }); + expect(getSessionEntry({ agentId, sessionKey: key })?.sessionId).toBe("s-noop"); }); - it("multiple consecutive errors do not permanently poison the queue", async () => { + it("multiple consecutive errors do not block later writes", async () => { const key = "agent:main:multi-err"; - const { storePath } = await makeTmpStore({ + const { agentId } = await makeTmpStore({ [key]: { sessionId: "s1", updatedAt: Date.now() }, }); const errors = Array.from({ length: 3 }, (_, i) => - updateSessionStore(storePath, async () => { - throw new Error(`fail-${i}`); + patchSessionEntry({ + agentId, + sessionKey: key, + update: async () => { + throw new Error(`fail-${i}`); + }, }), ); - const success = updateSessionStore(storePath, async (store) => { - store[key] = { ...store[key], modelOverride: "recovered" } as unknown as SessionEntry; + const success = patchSessionEntry({ + agentId, + sessionKey: key, + update: async () => ({ modelOverride: "recovered" }), }); for (const [index, p] of errors.entries()) { @@ -381,7 +435,7 @@ describe("session store writer queue", () => { } await success; - const store = loadSessionStore(storePath); + const store = readSessionEntries(agentId); expect(store[key]?.modelOverride).toBe("recovered"); }); @@ -433,7 +487,7 @@ describe("session store writer queue", () => { it("normalizes orphan modelProvider fields at store write boundary", async () => { const key = "agent:main:orphan-provider"; - const { storePath } = await makeTmpStore({ + const { agentId } = await makeTmpStore({ [key]: { sessionId: "sess-orphan", updatedAt: 100, @@ -441,17 +495,12 @@ describe("session store writer queue", () => { }, }); - await updateSessionStore(storePath, async (store) => { - const entry = store[key]; - entry.updatedAt = Date.now(); - }); - - const store = loadSessionStore(storePath); + const store = readSessionEntries(agentId); expect(store[key]?.modelProvider).toBeUndefined(); expect(store[key]?.model).toBeUndefined(); }); - it("preserves ACP metadata when replacing a session entry wholesale", async () => { + it("preserves ACP metadata when patching a session entry", async () => { const key = "agent:codex:acp:binding:discord:default:feedface"; const acp = { backend: "acpx", @@ -461,7 +510,7 @@ describe("session store writer queue", () => { state: "idle" as const, lastActivityAt: 100, }; - const { storePath } = await makeTmpStore({ + const { agentId } = await makeTmpStore({ [key]: { sessionId: "sess-acp", updatedAt: Date.now(), @@ -469,16 +518,19 @@ describe("session store writer queue", () => { }, }); - await updateSessionStore(storePath, (store) => { - store[key] = { - sessionId: "sess-acp", - updatedAt: Date.now(), - modelProvider: "openai-codex", - model: "gpt-5.4", - }; + await patchSessionEntry({ + agentId, + sessionKey: key, + update: () => { + return { + updatedAt: Date.now(), + modelProvider: "openai-codex", + model: "gpt-5.4", + }; + }, }); - const store = loadSessionStore(storePath); + const store = readSessionEntries(agentId); expect(store[key]?.acp).toEqual(acp); expect(store[key]?.modelProvider).toBe("openai-codex"); expect(store[key]?.model).toBe("gpt-5.4"); @@ -486,24 +538,25 @@ describe("session store writer queue", () => { it("allows explicit ACP metadata removal through the ACP session helper", async () => { const key = "agent:codex:acp:binding:discord:default:deadbeef"; - const { storePath } = await makeTmpStore({ - [key]: { - sessionId: "sess-acp-clear", - updatedAt: 100, - acp: { - backend: "acpx", - agent: "codex", - runtimeSessionName: "codex-discord", - mode: "persistent", - state: "idle", - lastActivityAt: 100, + const { agentId, storePath } = await makeTmpStore( + { + [key]: { + sessionId: "sess-acp-clear", + updatedAt: 100, + acp: { + backend: "acpx", + agent: "codex", + runtimeSessionName: "codex-discord", + mode: "persistent", + state: "idle", + lastActivityAt: 100, + }, }, }, - }); + { agentId: "codex" }, + ); const cfg = { - session: { - store: storePath, - }, + session: {}, } as OpenClawConfig; const result = await upsertAcpSessionMeta({ @@ -513,14 +566,25 @@ describe("session store writer queue", () => { }); expect(result?.acp).toBeUndefined(); - const store = loadSessionStore(storePath); - expect(store[key]?.acp).toBeUndefined(); + expect(getSessionEntry({ agentId, sessionKey: key })?.acp).toBeUndefined(); }); }); describe("resolveAndPersistSessionFile", () => { const fixture = useTempSessionsFixture("session-file-test-"); + function readFixtureSessionEntries(): Record { + return Object.fromEntries( + listSessionEntries({ agentId: "main" }).map(({ sessionKey, entry }) => [sessionKey, entry]), + ); + } + + function seedFixtureSessionEntries(store: Record): void { + for (const [sessionKey, entry] of Object.entries(store)) { + upsertSessionEntry({ agentId: "main", sessionKey, entry }); + } + } + it("persists fallback topic transcript paths for sessions without sessionFile", async () => { const sessionId = "topic-session-id"; const sessionKey = "agent:main:telegram:group:123:topic:456"; @@ -530,8 +594,8 @@ describe("resolveAndPersistSessionFile", () => { updatedAt: Date.now(), }, }; - await saveSessionStore(fixture.storePath(), store); - const sessionStore = loadSessionStore(fixture.storePath()); + seedFixtureSessionEntries(store); + const sessionStore = readFixtureSessionEntries(); const fallbackSessionFile = resolveSessionTranscriptPathInDir( sessionId, fixture.sessionsDir(), @@ -542,35 +606,34 @@ describe("resolveAndPersistSessionFile", () => { sessionId, sessionKey, sessionStore, - storePath: fixture.storePath(), sessionEntry: sessionStore[sessionKey], + agentId: "main", fallbackSessionFile, }); expect(result.sessionFile).toBe(fallbackSessionFile); - const saved = loadSessionStore(fixture.storePath()); + const saved = readFixtureSessionEntries(); expect(saved[sessionKey]?.sessionFile).toBe(fallbackSessionFile); }); it("creates and persists entry when session is not yet present", async () => { const sessionId = "new-session-id"; const sessionKey = "agent:main:telegram:group:123"; - await saveSessionStore(fixture.storePath(), {}); - const sessionStore = loadSessionStore(fixture.storePath()); + const sessionStore = readFixtureSessionEntries(); const fallbackSessionFile = resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir()); const result = await resolveAndPersistSessionFile({ sessionId, sessionKey, sessionStore, - storePath: fixture.storePath(), + agentId: "main", fallbackSessionFile, }); expect(result.sessionFile).toBe(fallbackSessionFile); expect(result.sessionEntry.sessionId).toBe(sessionId); - const saved = loadSessionStore(fixture.storePath()); + const saved = readFixtureSessionEntries(); expect(saved[sessionKey]?.sessionFile).toBe(fallbackSessionFile); }); @@ -593,15 +656,15 @@ describe("resolveAndPersistSessionFile", () => { sessionFile: previousSessionFile, }, }; - await saveSessionStore(fixture.storePath(), store); - const sessionStore = loadSessionStore(fixture.storePath()); + seedFixtureSessionEntries(store); + const sessionStore = readFixtureSessionEntries(); const result = await resolveAndPersistSessionFile({ sessionId: nextSessionId, sessionKey, sessionStore, - storePath: fixture.storePath(), sessionEntry: sessionStore[sessionKey], + agentId: "main", sessionsDir: fixture.sessionsDir(), }); @@ -609,7 +672,7 @@ describe("resolveAndPersistSessionFile", () => { expect(result.sessionFile).not.toBe(previousSessionFile); expect(result.sessionEntry.sessionFile).toBe(expectedNextSessionFile); - const saved = loadSessionStore(fixture.storePath()); + const saved = readFixtureSessionEntries(); expect(saved[sessionKey]?.sessionFile).toBe(expectedNextSessionFile); }); }); diff --git a/src/cron/isolated-agent.test-harness.ts b/src/cron/isolated-agent.test-harness.ts index 09861c17b27..feb17c11c19 100644 --- a/src/cron/isolated-agent.test-harness.ts +++ b/src/cron/isolated-agent.test-harness.ts @@ -1,18 +1,36 @@ -import fs from "node:fs/promises"; import path from "node:path"; import { withTempHome as withTempHomeBase } from "openclaw/plugin-sdk/test-env"; +import { upsertSessionEntry } from "../config/sessions/store.js"; +import type { SessionEntry } from "../config/sessions/types.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js"; import type { CronJob } from "./types.js"; export async function withTempCronHome(fn: (home: string) => Promise): Promise { - return withTempHomeBase(fn, { prefix: "openclaw-cron-" }); + return withTempHomeBase( + async (home) => { + try { + return await fn(home); + } finally { + closeOpenClawStateDatabaseForTest(); + } + }, + { prefix: "openclaw-cron-" }, + ); } -export async function writeSessionStore( +function cronTestEnv(home: string): NodeJS.ProcessEnv { + return { + ...process.env, + OPENCLAW_STATE_DIR: path.join(home, ".openclaw"), + }; +} + +export async function seedMainRouteSession( home: string, session: { lastProvider: string; lastTo: string; lastChannel?: string }, -): Promise { - return writeSessionStoreEntries(home, { +): Promise { + await seedSessionRows(home, { "agent:main:main": { sessionId: "main-session", updatedAt: Date.now(), @@ -21,22 +39,30 @@ export async function writeSessionStore( }); } -export async function writeSessionStoreEntries( +export async function seedSessionRows( home: string, entries: Record>, -): Promise { - const dir = path.join(home, ".openclaw", "sessions"); - await fs.mkdir(dir, { recursive: true }); - const storePath = path.join(dir, "sessions.json"); - await fs.writeFile(storePath, JSON.stringify(entries, null, 2), "utf-8"); - return storePath; + agentId = "main", +): Promise { + const env = cronTestEnv(home); + for (const [sessionKey, entry] of Object.entries(entries)) { + upsertSessionEntry({ + agentId, + env, + sessionKey, + entry: entry as SessionEntry, + }); + } } export function makeCfg( home: string, - storePath: string, + storePathOrOverrides: string | Partial = {}, overrides: Partial = {}, ): OpenClawConfig { + const storePath = typeof storePathOrOverrides === "string" ? storePathOrOverrides : undefined; + const resolvedOverrides = + typeof storePathOrOverrides === "string" ? overrides : storePathOrOverrides; const base: OpenClawConfig = { agents: { defaults: { @@ -44,9 +70,11 @@ export function makeCfg( workspace: path.join(home, "openclaw"), }, }, - session: { store: storePath, mainKey: "main" }, + session: { + mainKey: "main", + }, } as OpenClawConfig; - return { ...base, ...overrides }; + return { ...base, ...resolvedOverrides }; } export function makeJob(payload: CronJob["payload"]): CronJob { diff --git a/src/gateway/server-methods/agent.create-event.test.ts b/src/gateway/server-methods/agent.create-event.test.ts index 38f4c2fff60..7aa07c2c37b 100644 --- a/src/gateway/server-methods/agent.create-event.test.ts +++ b/src/gateway/server-methods/agent.create-event.test.ts @@ -2,6 +2,7 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { closeOpenClawAgentDatabasesForTest } from "../../state/openclaw-agent-db.js"; const configMocks = vi.hoisted(() => ({ storePath: "", @@ -15,7 +16,6 @@ const configMocks = vi.hoisted(() => ({ }, session: { mainKey: "main", - store: configMocks.storePath, }, })), })); @@ -48,16 +48,18 @@ describe("agent handler session create events", () => { beforeEach(async () => { tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-agent-create-event-")); - storePath = path.join(tempDir, "sessions.json"); + vi.stubEnv("OPENCLAW_STATE_DIR", tempDir); + storePath = path.join(tempDir, "agents", "main", "sessions", "sessions.json"); configMocks.storePath = storePath; configMocks.workspaceDir = tempDir; configMocks.getRuntimeConfig.mockClear(); agentIngressMocks.agentCommandFromIngress.mockClear(); agentIngressMocks.agentCommandFromIngress.mockResolvedValue({ ok: true }); - await fs.writeFile(storePath, "{}\n", "utf8"); }); afterEach(async () => { + closeOpenClawAgentDatabasesForTest(); + vi.unstubAllEnvs(); await fs.rm(tempDir, { recursive: true, force: true }); vi.restoreAllMocks(); }); diff --git a/src/gateway/server.chat.gateway-server-chat-b.test.ts b/src/gateway/server.chat.gateway-server-chat-b.test.ts index 93070ac7487..b707f0190f5 100644 --- a/src/gateway/server.chat.gateway-server-chat-b.test.ts +++ b/src/gateway/server.chat.gateway-server-chat-b.test.ts @@ -4,6 +4,8 @@ import path from "node:path"; import { afterAll, beforeAll, describe, expect, test, vi } from "vitest"; import type { GetReplyOptions } from "../auto-reply/get-reply-options.types.js"; import { clearConfigCache } from "../config/config.js"; +import { getSessionEntry } from "../config/sessions.js"; +import { replaceSqliteSessionTranscriptEvents } from "../config/sessions/transcript-store.sqlite.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js"; import { __setMaxChatHistoryMessagesBytesForTest } from "./server-constants.js"; import type { GatewayRequestContext, RespondFn } from "./server-methods/shared-types.js"; @@ -17,7 +19,7 @@ import { onceMessage, rpcReq, testState, - writeSessionStore, + seedGatewaySessionEntries, } from "./test-helpers.js"; installGatewayTestHooks({ scope: "suite" }); @@ -63,6 +65,10 @@ function createDeferred() { return { promise, resolve, reject }; } +function resolveMainTranscriptPath(root: string): string { + return path.join(root, "agents", "main", "sessions", "sess-main.jsonl"); +} + async function withGatewayChatHarness( run: (ctx: { ws: GatewaySocket; createSessionDir: () => Promise }) => Promise, ) { @@ -71,7 +77,6 @@ async function withGatewayChatHarness( const createSessionDir = async () => { const sessionDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); tempDirs.push(sessionDir); - testState.sessionStorePath = path.join(sessionDir, "sessions.json"); return sessionDir; }; @@ -80,7 +85,6 @@ async function withGatewayChatHarness( } finally { __setMaxChatHistoryMessagesBytesForTest(); clearConfigCache(); - testState.sessionStorePath = undefined; ws.close(); await Promise.all( tempDirs.map((dir) => @@ -90,8 +94,8 @@ async function withGatewayChatHarness( } } -async function writeMainSessionStore() { - await writeSessionStore({ +async function seedMainSessionEntry() { + await seedGatewaySessionEntries({ entries: { main: { sessionId: "sess-main", updatedAt: Date.now() }, }, @@ -109,7 +113,12 @@ async function writeGatewayConfig(config: Record) { } async function writeMainSessionTranscript(sessionDir: string, lines: string[]) { - await fs.writeFile(path.join(sessionDir, "sess-main.jsonl"), `${lines.join("\n")}\n`, "utf-8"); + replaceSqliteSessionTranscriptEvents({ + agentId: "main", + sessionId: "sess-main", + transcriptPath: resolveMainTranscriptPath(sessionDir), + events: lines.map((line) => JSON.parse(line) as unknown), + }); } async function fetchHistoryMessages( @@ -138,7 +147,7 @@ async function prepareMainHistoryHarness(params: { } await connectOk(params.ws); const sessionDir = await params.createSessionDir(); - await writeMainSessionStore(); + await seedMainSessionEntry(); return sessionDir; } @@ -146,11 +155,10 @@ describe("gateway server chat", () => { test("chat.history does not wait for model catalog discovery to return history", async () => { const sessionDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); try { - testState.sessionStorePath = path.join(sessionDir, "sessions.json"); testState.agentConfig = { model: { primary: "test-provider/slow-catalog-model" }, }; - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { main: { sessionId: "sess-main", @@ -206,7 +214,6 @@ describe("gateway server chat", () => { } finally { clearConfigCache(); testState.agentConfig = undefined; - testState.sessionStorePath = undefined; await fs.rm(sessionDir, { recursive: true, force: true }); } }); @@ -215,8 +222,7 @@ describe("gateway server chat", () => { const sessionDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); const dispatchRelease = createDeferred(); try { - testState.sessionStorePath = path.join(sessionDir, "sessions.json"); - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { main: { sessionId: "sess-main", @@ -329,7 +335,6 @@ describe("gateway server chat", () => { } finally { dispatchRelease.resolve(); dispatchInboundMessageMock.mockReset(); - testState.sessionStorePath = undefined; clearConfigCache(); await fs.rm(sessionDir, { recursive: true, force: true }); } @@ -339,8 +344,7 @@ describe("gateway server chat", () => { const sessionDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); const dispatchRelease = createDeferred(); try { - testState.sessionStorePath = path.join(sessionDir, "sessions.json"); - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { main: { sessionId: "sess-main", @@ -436,7 +440,6 @@ describe("gateway server chat", () => { } finally { dispatchRelease.resolve(); dispatchInboundMessageMock.mockReset(); - testState.sessionStorePath = undefined; clearConfigCache(); await fs.rm(sessionDir, { recursive: true, force: true }); } @@ -445,8 +448,7 @@ describe("gateway server chat", () => { test("chat.send starts the next WebChat turn after the prior internal run finishes", async () => { const sessionDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); try { - testState.sessionStorePath = path.join(sessionDir, "sessions.json"); - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { main: { sessionId: "sess-main", @@ -539,7 +541,6 @@ describe("gateway server chat", () => { expect(context.addChatRun).toHaveBeenCalledTimes(2); } finally { dispatchInboundMessageMock.mockReset(); - testState.sessionStorePath = undefined; clearConfigCache(); await fs.rm(sessionDir, { recursive: true, force: true }); } @@ -589,7 +590,7 @@ describe("gateway server chat", () => { ); process.env.HOME = homeDir; try { - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { main: { sessionId: "sess-main", @@ -653,7 +654,7 @@ describe("gateway server chat", () => { expect(bytes).toBeLessThanOrEqual(historyMaxBytes); expect(messages.length).toBeLessThan(45); - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { main: { sessionId: "sess-main", @@ -671,16 +672,9 @@ describe("gateway server chat", () => { }); expect(sendRes.ok).toBe(true); - const sessionStorePath = testState.sessionStorePath; - if (!sessionStorePath) { - throw new Error("expected session store path"); - } - const stored = JSON.parse(await fs.readFile(sessionStorePath, "utf-8")) as Record< - string, - { lastChannel?: string; lastTo?: string } | undefined - >; - expect(stored["agent:main:main"]?.lastChannel).toBe("whatsapp"); - expect(stored["agent:main:main"]?.lastTo).toBe("+1555"); + const stored = getSessionEntry({ agentId: "main", sessionKey: "agent:main:main" }); + expect(stored?.lastChannel).toBe("whatsapp"); + expect(stored?.lastTo).toBe("+1555"); }); }); @@ -690,7 +684,7 @@ describe("gateway server chat", () => { await connectOk(ws); await createSessionDir(); - await writeMainSessionStore(); + await seedMainSessionEntry(); testState.agentConfig = { blockStreamingDefault: "on" }; try { let capturedOpts: GetReplyOptions | undefined; @@ -818,7 +812,7 @@ describe("gateway server chat", () => { await connectOk(ws); const sessionDir = await createSessionDir(); - await writeMainSessionStore(); + await seedMainSessionEntry(); await writeMainSessionTranscript(sessionDir, [ JSON.stringify({ @@ -849,7 +843,7 @@ describe("gateway server chat", () => { await connectOk(ws); const sessionDir = await createSessionDir(); - await writeMainSessionStore(); + await seedMainSessionEntry(); const lines = [ JSON.stringify({ @@ -1059,7 +1053,7 @@ describe("gateway server chat", () => { await connectOk(ws); await createSessionDir(); - await writeMainSessionStore(); + await seedMainSessionEntry(); mockGetReplyFromConfigOnce(async (_ctx, opts) => { opts?.onAgentRunStart?.(opts.runId ?? "idem-abort-1"); diff --git a/src/gateway/server.chat.gateway-server-chat.test.ts b/src/gateway/server.chat.gateway-server-chat.test.ts index b809c57d36a..bf9daacfec1 100644 --- a/src/gateway/server.chat.gateway-server-chat.test.ts +++ b/src/gateway/server.chat.gateway-server-chat.test.ts @@ -3,6 +3,8 @@ import os from "node:os"; import path from "node:path"; import { beforeEach, describe, expect, test, vi } from "vitest"; import { WebSocket } from "ws"; +import { getSessionEntry } from "../config/sessions.js"; +import { replaceSqliteSessionTranscriptEvents } from "../config/sessions/transcript-store.sqlite.js"; import { emitAgentEvent, registerAgentRunContext } from "../infra/agent-events.js"; import { extractFirstTextBlock } from "../shared/chat-message-content.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js"; @@ -17,7 +19,7 @@ import { testState, trackConnectChallengeNonce, withGatewayServer, - writeSessionStore, + seedGatewaySessionEntries, } from "./test-helpers.js"; import { agentCommand } from "./test-helpers.runtime-state.js"; import { installConnectedControlUiServerSuite } from "./test-with-server.js"; @@ -28,6 +30,10 @@ const CHAT_RESPONSE_TIMEOUT_MS = 10_000; let ws: WebSocket; let port: number; +function resolveMainTranscriptPath(root: string): string { + return path.join(root, "agents", "main", "sessions", "sess-main.jsonl"); +} + installConnectedControlUiServerSuite((started) => { ws = started.ws; port = started.port; @@ -87,8 +93,10 @@ describe("gateway server chat", () => { messages: Array>, ): Promise => { return withMainSessionStore(async (dir) => { - const lines = messages.map((message) => JSON.stringify({ message })); - await fs.writeFile(path.join(dir, "sess-main.jsonl"), lines.join("\n"), "utf-8"); + writeMainSessionTranscript( + dir, + messages.map((message) => ({ message })), + ); const res = await rpcReq<{ messages?: unknown[] }>(ws, "chat.history", { sessionKey: "main", @@ -100,9 +108,10 @@ describe("gateway server chat", () => { const withMainSessionStore = async (run: (dir: string) => Promise): Promise => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); + const previousStateDir = process.env.OPENCLAW_STATE_DIR; try { - testState.sessionStorePath = path.join(dir, "sessions.json"); - await writeSessionStore({ + process.env.OPENCLAW_STATE_DIR = dir; + await seedGatewaySessionEntries({ entries: { main: { sessionId: "sess-main", @@ -112,11 +121,24 @@ describe("gateway server chat", () => { }); return await run(dir); } finally { - testState.sessionStorePath = undefined; + if (previousStateDir == null) { + delete process.env.OPENCLAW_STATE_DIR; + } else { + process.env.OPENCLAW_STATE_DIR = previousStateDir; + } await removeTempDir(dir); } }; + const writeMainSessionTranscript = (dir: string, events: unknown[]): void => { + replaceSqliteSessionTranscriptEvents({ + agentId: "main", + sessionId: "sess-main", + transcriptPath: resolveMainTranscriptPath(dir), + events, + }); + }; + const collectHistoryTextValues = (historyMessages: unknown[]) => historyMessages .map((message) => { @@ -206,9 +228,8 @@ describe("gateway server chat", () => { test("sessions.send accepts dashboard messages for existing sessions", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-send-")); - testState.sessionStorePath = path.join(dir, "sessions.json"); try { - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { "agent:main:dashboard:test-send": { sessionId: "sess-dashboard-send", @@ -226,16 +247,14 @@ describe("gateway server chat", () => { expect(res.payload?.runId).toBe("idem-sessions-send-1"); expect(res.payload?.messageSeq).toBe(1); } finally { - testState.sessionStorePath = undefined; await removeTempDir(dir); } }); test("sessions.steer accepts dashboard follow-up messages for existing sessions", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-steer-")); - testState.sessionStorePath = path.join(dir, "sessions.json"); try { - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { "agent:main:dashboard:test-steer": { sessionId: "sess-dashboard-steer", @@ -253,16 +272,14 @@ describe("gateway server chat", () => { expect(res.payload?.runId).toBe("idem-sessions-steer-1"); expect(res.payload?.messageSeq).toBe(1); } finally { - testState.sessionStorePath = undefined; await removeTempDir(dir); } }); test("sessions.abort stops active dashboard runs", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-abort-")); - testState.sessionStorePath = path.join(dir, "sessions.json"); try { - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { "agent:main:dashboard:test-abort": { sessionId: "sess-dashboard-abort", @@ -328,16 +345,14 @@ describe("gateway server chat", () => { expect(abortRes.payload?.abortedRunId).toBeNull(); } } finally { - testState.sessionStorePath = undefined; await removeTempDir(dir); } }); test("sessions.abort resolves active runs by runId without a caller session key", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-sessions-abort-runid-")); - testState.sessionStorePath = path.join(dir, "sessions.json"); try { - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { "agent:main:dashboard:test-abort-runid": { sessionId: "sess-dashboard-abort-runid", @@ -363,7 +378,6 @@ describe("gateway server chat", () => { expect(abortRes.payload?.abortedRunId).toBe("idem-sessions-abort-runid-1"); } } finally { - testState.sessionStorePath = undefined; await removeTempDir(dir); } }); @@ -436,7 +450,6 @@ describe("gateway server chat", () => { const sendPolicyDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); tempDirs.push(sendPolicyDir); - testState.sessionStorePath = path.join(sendPolicyDir, "sessions.json"); testState.sessionConfig = { sendPolicy: { default: "allow", @@ -449,7 +462,7 @@ describe("gateway server chat", () => { }, }; - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { "discord:group:dev": { sessionId: "sess-discord", @@ -469,13 +482,10 @@ describe("gateway server chat", () => { expect((blockedRes.error as { message?: string } | undefined)?.message ?? "").toMatch( /send blocked/i, ); - - testState.sessionStorePath = undefined; testState.sessionConfig = undefined; const agentBlockedDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); tempDirs.push(agentBlockedDir); - testState.sessionStorePath = path.join(agentBlockedDir, "sessions.json"); testState.sessionConfig = { sendPolicy: { default: "allow", @@ -483,7 +493,7 @@ describe("gateway server chat", () => { }, }; - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { "cron:job-1": { sessionId: "sess-cron", @@ -502,8 +512,6 @@ describe("gateway server chat", () => { expect(agentAllowedRes.payload?.status).toBe("accepted"); expect(agentAllowedRes.payload?.runId).toBe("idem-2"); await vi.waitFor(() => expect(agentCommand).toHaveBeenCalled()); - - testState.sessionStorePath = undefined; testState.sessionConfig = undefined; const pngB64 = @@ -572,8 +580,7 @@ describe("gateway server chat", () => { const historyDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); tempDirs.push(historyDir); - testState.sessionStorePath = path.join(historyDir, "sessions.json"); - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { main: { sessionId: "sess-main", @@ -594,7 +601,10 @@ describe("gateway server chat", () => { }), ); } - await fs.writeFile(path.join(historyDir, "sess-main.jsonl"), lines.join("\n"), "utf-8"); + writeMainSessionTranscript( + historyDir, + lines.map((line) => JSON.parse(line) as unknown), + ); const defaultRes = await rpcReq<{ messages?: unknown[] }>(ws, "chat.history", { sessionKey: "main", @@ -605,7 +615,6 @@ describe("gateway server chat", () => { expect(extractFirstTextBlock(defaultMsgs[0])).toBe("m1"); } finally { testState.agentConfig = undefined; - testState.sessionStorePath = undefined; testState.sessionConfig = undefined; if (webchatWs) { webchatWs.close(); @@ -740,17 +749,15 @@ describe("gateway server chat", () => { test("routes /btw replies through side-result events without transcript injection", async () => { await withMainSessionStore(async (dir) => { - await fs.writeFile( - path.join(dir, "sess-main.jsonl"), - `${JSON.stringify({ + writeMainSessionTranscript(dir, [ + { message: { role: "user", content: [{ type: "text", text: "main thread context" }], timestamp: Date.now(), }, - })}\n`, - "utf-8", - ); + }, + ]); dispatchInboundMessageMock.mockImplementationOnce(async (...args: unknown[]) => { const [params] = args as [ { @@ -828,17 +835,15 @@ describe("gateway server chat", () => { test("routes block-streamed /btw replies through side-result events", async () => { await withMainSessionStore(async (dir) => { - await fs.writeFile( - path.join(dir, "sess-main.jsonl"), - `${JSON.stringify({ + writeMainSessionTranscript(dir, [ + { message: { role: "assistant", content: [{ type: "text", text: "existing context" }], timestamp: Date.now(), }, - })}\n`, - "utf-8", - ); + }, + ]); dispatchInboundMessageMock.mockImplementationOnce(async (...args: unknown[]) => { const [params] = args as [ { @@ -1023,7 +1028,6 @@ describe("gateway server chat", () => { test("chat.history uses the owning agent thinkingDefault for non-default agent sessions", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); try { - testState.sessionStorePath = path.join(dir, "sessions.json"); testState.agentConfig = { model: { primary: "openai/gpt-5" }, thinkingDefault: "low", @@ -1034,7 +1038,7 @@ describe("gateway server chat", () => { { id: "alpha", thinkingDefault: "minimal" }, ], }; - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { "agent:alpha:main": { sessionId: "sess-alpha", @@ -1054,7 +1058,6 @@ describe("gateway server chat", () => { } finally { testState.agentConfig = undefined; testState.agentsConfig = undefined; - testState.sessionStorePath = undefined; await removeTempDir(dir); } }); @@ -1086,13 +1089,8 @@ describe("gateway server chat", () => { expect(waitRes.ok).toBe(true); expect(waitRes.payload?.status).toBe("ok"); - const raw = await fs.readFile(testState.sessionStorePath!, "utf-8"); - const stored = JSON.parse(raw) as { - "agent:main:main"?: { - verboseLevel?: string; - }; - }; - expect(stored["agent:main:main"]?.verboseLevel).toBeUndefined(); + const stored = getSessionEntry({ agentId: "main", sessionKey: "agent:main:main" }); + expect(stored?.verboseLevel).toBeUndefined(); } finally { scopedWs?.close(); } @@ -1127,13 +1125,8 @@ describe("gateway server chat", () => { expect(waitRes.ok).toBe(true); expect(waitRes.payload?.status).toBe("ok"); - const raw = await fs.readFile(testState.sessionStorePath!, "utf-8"); - const stored = JSON.parse(raw) as { - "agent:main:main"?: { - sessionId?: string; - }; - }; - expect(stored["agent:main:main"]?.sessionId).toBe("sess-main"); + const stored = getSessionEntry({ agentId: "main", sessionKey: "agent:main:main" }); + expect(stored?.sessionId).toBe("sess-main"); } finally { scopedWs?.close(); } @@ -1162,8 +1155,7 @@ describe("gateway server chat", () => { }); try { - testState.sessionStorePath = path.join(dir, "sessions.json"); - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { main: { sessionId: "sess-main", @@ -1194,7 +1186,6 @@ describe("gateway server chat", () => { await waitForAgentRunOk(runId); } finally { resolveAgentRun?.(); - testState.sessionStorePath = undefined; await removeTempDir(dir); } }); @@ -1273,8 +1264,7 @@ describe("gateway server chat", () => { test("agent events include sessionKey and agent.wait covers lifecycle flows", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); - testState.sessionStorePath = path.join(dir, "sessions.json"); - await writeSessionStore({ + await seedGatewaySessionEntries({ entries: { main: { sessionId: "sess-main", @@ -1410,7 +1400,6 @@ describe("gateway server chat", () => { } finally { webchatWs.close(); await removeTempDir(dir); - testState.sessionStorePath = undefined; } }); }); diff --git a/src/gateway/test-helpers.server.ts b/src/gateway/test-helpers.server.ts index bf4fcce4d43..03e1045226c 100644 --- a/src/gateway/test-helpers.server.ts +++ b/src/gateway/test-helpers.server.ts @@ -6,9 +6,10 @@ import { WebSocket } from "ws"; import "./test-helpers.mocks.js"; import { parseConfigJson5, resetConfigRuntimeState } from "../config/config.js"; import { - clearSessionStoreCacheForTest, + deleteSessionEntry, + listSessionEntries, resolveMainSessionKeyFromConfig, - saveSessionStore, + upsertSessionEntry, type SessionEntry, } from "../config/sessions.js"; import { resetAgentRunContextForTest } from "../infra/agent-events.js"; @@ -29,6 +30,7 @@ import { } from "../routing/session-key.js"; import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; +import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js"; import { resetTaskRegistryForTests } from "../tasks/runtime-internal.js"; import { resetTaskFlowRegistryForTests } from "../tasks/task-flow-runtime-internal.js"; import { captureEnv } from "../test-utils/env.js"; @@ -86,7 +88,6 @@ let tempHome: string | undefined; let tempConfigRoot: string | undefined; let tempControlUiRoot: string | undefined; let suiteConfigRootSeq = 0; -let lastSyncedSessionStorePath: string | undefined; let lastSyncedSessionConfigJson: string | undefined; let activeSuiteGatewayServerCount = 0; let activeSuiteHookScopeCount = 0; @@ -117,10 +118,7 @@ function serializeGatewayTestSessionConfig(): string | undefined { } function hasUnsyncedGatewayTestSessionConfig(): boolean { - return ( - testState.sessionStorePath !== lastSyncedSessionStorePath || - serializeGatewayTestSessionConfig() !== lastSyncedSessionConfigJson - ); + return serializeGatewayTestSessionConfig() !== lastSyncedSessionConfigJson; } async function persistTestSessionConfig(): Promise { @@ -132,7 +130,6 @@ async function persistTestSessionConfig(): Promise { configPaths.add(path.join(process.env.OPENCLAW_STATE_DIR, "openclaw.json")); } const parsedConfigs = new Map>(); - let preservedTemplateStore: string | undefined; for (const configPath of configPaths) { let config: Record = {}; try { @@ -150,19 +147,7 @@ async function persistTestSessionConfig(): Promise { config = {}; } parsedConfigs.set(configPath, config); - const session = - config.session && typeof config.session === "object" && !Array.isArray(config.session) - ? (config.session as Record) - : undefined; - const existingStore = typeof session?.store === "string" ? session.store.trim() : ""; - if (!preservedTemplateStore && existingStore.includes("{agentId}")) { - preservedTemplateStore = existingStore; - } } - const nextStoreValue = - typeof testState.sessionStorePath === "string" - ? preservedTemplateStore || testState.sessionStorePath - : preservedTemplateStore; for (const configPath of configPaths) { const config = { ...parsedConfigs.get(configPath) }; const session = @@ -170,10 +155,6 @@ async function persistTestSessionConfig(): Promise { ? { ...(config.session as Record) } : {}; delete session.mainKey; - delete session.store; - if (typeof nextStoreValue === "string" && nextStoreValue.trim().length > 0) { - session.store = nextStoreValue; - } if (testState.sessionConfig) { Object.assign(session, testState.sessionConfig); } @@ -186,25 +167,19 @@ async function persistTestSessionConfig(): Promise { await fs.writeFile(configPath, `${JSON.stringify(config, null, 2)}\n`, "utf-8"); } resetConfigRuntimeState(); - lastSyncedSessionStorePath = testState.sessionStorePath; lastSyncedSessionConfigJson = serializeGatewayTestSessionConfig(); } -export async function writeSessionStore(params: { +export async function seedGatewaySessionEntries(params: { entries: Record>; - storePath?: string; agentId?: string; mainKey?: string; }): Promise { - const storePath = params.storePath ?? testState.sessionStorePath; - if (!storePath) { - throw new Error("writeSessionStore requires testState.sessionStorePath"); - } const agentId = params.agentId ?? DEFAULT_AGENT_ID; - const store: Record> = {}; + const entriesBySessionKey: Record> = {}; for (const [requestKey, entry] of Object.entries(params.entries)) { const rawKey = requestKey.trim(); - const storeKey = + const sessionKey = rawKey === "global" || rawKey === "unknown" ? rawKey : toAgentStoreSessionKey({ @@ -212,14 +187,17 @@ export async function writeSessionStore(params: { requestKey, mainKey: params.mainKey, }); - store[storeKey] = entry; + entriesBySessionKey[sessionKey] = entry; } - // Gateway suites often reuse the same store path across tests; clear the + // Gateway suites often reuse the same config path across tests; clear the // in-process cache so handlers reload the seeded SQLite state. - clearSessionStoreCacheForTest(); await persistTestSessionConfig(); - await saveSessionStore(storePath, store as Record); - clearSessionStoreCacheForTest(); + for (const { sessionKey } of listSessionEntries({ agentId })) { + deleteSessionEntry({ agentId, sessionKey }); + } + for (const [sessionKey, entry] of Object.entries(entriesBySessionKey)) { + upsertSessionEntry({ agentId, sessionKey, entry: entry as SessionEntry }); + } } async function setupGatewayTestHome() { @@ -258,6 +236,7 @@ async function resetGatewayTestState(options: { uniqueConfigRoot: boolean }) { delete process.env.OPENCLAW_GATEWAY_TOKEN; resetTaskRegistryForTests({ persist: false }); resetTaskFlowRegistryForTests({ persist: false }); + closeOpenClawStateDatabaseForTest(); const stateDir = process.env.OPENCLAW_STATE_DIR; if (stateDir) { await fs.rm(stateDir, { @@ -321,13 +300,11 @@ async function resetGatewayTestState(options: { uniqueConfigRoot: boolean }) { testState.cronEnabled = false; testState.cronStorePath = undefined; testState.sessionConfig = undefined; - testState.sessionStorePath = undefined; testState.agentConfig = undefined; testState.agentsConfig = undefined; testState.bindingsConfig = undefined; testState.channelsConfig = undefined; testState.allowFrom = undefined; - lastSyncedSessionStorePath = testState.sessionStorePath; lastSyncedSessionConfigJson = serializeGatewayTestSessionConfig(); testIsNixMode.value = false; cronIsolatedRun.mockReset(); @@ -412,13 +389,11 @@ async function resetGatewayTestRuntimeOnly() { testState.cronEnabled = false; testState.cronStorePath = undefined; testState.sessionConfig = undefined; - testState.sessionStorePath = undefined; testState.agentConfig = undefined; testState.agentsConfig = undefined; testState.bindingsConfig = undefined; testState.channelsConfig = undefined; testState.allowFrom = undefined; - lastSyncedSessionStorePath = testState.sessionStorePath; lastSyncedSessionConfigJson = serializeGatewayTestSessionConfig(); testIsNixMode.value = false; cronIsolatedRun.mockReset(); @@ -444,7 +419,6 @@ async function resetGatewayTestRuntimeOnly() { tokensAfter: 80, }, }); - clearSessionStoreCacheForTest(); await persistTestSessionConfig(); for (const sessionKey of resolveGatewayTestMainSessionKeys()) { drainSystemEvents(sessionKey); @@ -581,7 +555,6 @@ export async function startGatewayServer(port: number, opts?: GatewayServerOptio // Tests mutate testState-backed config before server startup; discard earlier // helper reads so startup observes the current fixture state. resetConfigRuntimeState(); - clearSessionStoreCacheForTest(); const mod = await getServerModule(); const resolvedOpts = opts?.controlUiEnabled === undefined ? { ...opts, controlUiEnabled: false } : opts; @@ -1053,7 +1026,6 @@ export async function rpcReq>( // RPCs while reusing one server instance; flush caches so the next request // observes the updated test fixture state. resetConfigRuntimeState(); - clearSessionStoreCacheForTest(); const { randomUUID } = await import("node:crypto"); const id = randomUUID(); const responsePromise = onceMessage<{ diff --git a/src/infra/heartbeat-runner.typing.test.ts b/src/infra/heartbeat-runner.typing.test.ts index d38e0cef861..a865c6d4d48 100644 --- a/src/infra/heartbeat-runner.typing.test.ts +++ b/src/infra/heartbeat-runner.typing.test.ts @@ -50,7 +50,6 @@ function createHeartbeatConfig(params: { }, }, session: { - store: params.storePath, ...params.session, }, } as OpenClawConfig; diff --git a/test/helpers/auto-reply/trigger-handling-test-harness.ts b/test/helpers/auto-reply/trigger-handling-test-harness.ts index 6600ad43d6c..a9d0066fa37 100644 --- a/test/helpers/auto-reply/trigger-handling-test-harness.ts +++ b/test/helpers/auto-reply/trigger-handling-test-harness.ts @@ -278,7 +278,6 @@ export function makeCfg(home: string): OpenClawConfig { debounceMs: 0, }, }, - session: { store: join(home, "sessions.json") }, } as OpenClawConfig); } @@ -297,14 +296,6 @@ export function installTriggerHandlingReplyHarness( installTriggerHandlingE2eTestHooks(); } -export function requireSessionStorePath(cfg: { session?: { store?: string } }): string { - const storePath = cfg.session?.store; - if (!storePath) { - throw new Error("expected session store path"); - } - return storePath; -} - export async function expectInlineCommandHandledAndStripped(params: { home: string; getReplyFromConfig: typeof import("../../../src/auto-reply/reply.js").getReplyFromConfig;