mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-25 23:47:20 +00:00
test: stabilize gateway thread harness
This commit is contained in:
@@ -24,7 +24,12 @@ import { drainSystemEvents, peekSystemEvents } from "../infra/system-events.js";
|
||||
import { rawDataToString } from "../infra/ws.js";
|
||||
import { resetLogger, setLoggerOverride } from "../logging.js";
|
||||
import { clearGatewaySubagentRuntime } from "../plugins/runtime/index.js";
|
||||
import { DEFAULT_AGENT_ID, toAgentStoreSessionKey } from "../routing/session-key.js";
|
||||
import {
|
||||
DEFAULT_AGENT_ID,
|
||||
normalizeMainKey,
|
||||
parseAgentSessionKey,
|
||||
toAgentStoreSessionKey,
|
||||
} from "../routing/session-key.js";
|
||||
import { captureEnv } from "../test-utils/env.js";
|
||||
import { getDeterministicFreePortBlock } from "../test-utils/ports.js";
|
||||
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
|
||||
@@ -75,8 +80,42 @@ let gatewayEnvSnapshot: ReturnType<typeof captureEnv> | undefined;
|
||||
let tempHome: string | undefined;
|
||||
let tempConfigRoot: string | undefined;
|
||||
let suiteConfigRootSeq = 0;
|
||||
let lastSyncedSessionStorePath: string | undefined;
|
||||
let lastSyncedSessionConfigJson: string | undefined;
|
||||
|
||||
async function persistTestSessionStorePath(storePath: string): Promise<void> {
|
||||
function resolveGatewayTestMainSessionKeys(): string[] {
|
||||
const resolved = resolveMainSessionKeyFromConfig();
|
||||
const keys = new Set<string>();
|
||||
if (resolved) {
|
||||
keys.add(resolved);
|
||||
}
|
||||
if (resolved !== "global") {
|
||||
const parsed = parseAgentSessionKey(resolved);
|
||||
const agentId = parsed?.agentId ?? DEFAULT_AGENT_ID;
|
||||
keys.add(`agent:${agentId}:main`);
|
||||
const configuredMainKey = normalizeMainKey(
|
||||
(testState.sessionConfig as { mainKey?: unknown } | undefined)?.mainKey as string | undefined,
|
||||
);
|
||||
keys.add(`agent:${agentId}:${configuredMainKey}`);
|
||||
}
|
||||
return [...keys];
|
||||
}
|
||||
|
||||
function serializeGatewayTestSessionConfig(): string | undefined {
|
||||
if (!testState.sessionConfig) {
|
||||
return undefined;
|
||||
}
|
||||
return JSON.stringify(testState.sessionConfig);
|
||||
}
|
||||
|
||||
function hasUnsyncedGatewayTestSessionConfig(): boolean {
|
||||
return (
|
||||
testState.sessionStorePath !== lastSyncedSessionStorePath ||
|
||||
serializeGatewayTestSessionConfig() !== lastSyncedSessionConfigJson
|
||||
);
|
||||
}
|
||||
|
||||
async function persistTestSessionConfig(): Promise<void> {
|
||||
const configPaths = new Set<string>();
|
||||
if (process.env.OPENCLAW_CONFIG_PATH) {
|
||||
configPaths.add(process.env.OPENCLAW_CONFIG_PATH);
|
||||
@@ -112,20 +151,30 @@ async function persistTestSessionStorePath(storePath: string): Promise<void> {
|
||||
preservedTemplateStore = existingStore;
|
||||
}
|
||||
}
|
||||
const nextStoreValue = preservedTemplateStore || storePath;
|
||||
const nextStoreValue =
|
||||
typeof testState.sessionStorePath === "string"
|
||||
? preservedTemplateStore || testState.sessionStorePath
|
||||
: preservedTemplateStore;
|
||||
for (const configPath of configPaths) {
|
||||
const config = { ...parsedConfigs.get(configPath) };
|
||||
const session =
|
||||
config.session && typeof config.session === "object" && !Array.isArray(config.session)
|
||||
? { ...(config.session as Record<string, unknown>) }
|
||||
: {};
|
||||
session.store = nextStoreValue;
|
||||
if (typeof nextStoreValue === "string" && nextStoreValue.trim().length > 0) {
|
||||
session.store = nextStoreValue;
|
||||
}
|
||||
if (testState.sessionConfig) {
|
||||
Object.assign(session, testState.sessionConfig);
|
||||
}
|
||||
config.session = session;
|
||||
await fs.mkdir(path.dirname(configPath), { recursive: true });
|
||||
await fs.writeFile(configPath, `${JSON.stringify(config, null, 2)}\n`, "utf-8");
|
||||
}
|
||||
clearRuntimeConfigSnapshot();
|
||||
clearConfigCache();
|
||||
lastSyncedSessionStorePath = testState.sessionStorePath;
|
||||
lastSyncedSessionConfigJson = serializeGatewayTestSessionConfig();
|
||||
}
|
||||
|
||||
export async function writeSessionStore(params: {
|
||||
@@ -155,7 +204,7 @@ export async function writeSessionStore(params: {
|
||||
// Gateway suites often reuse the same store path across tests while writing the
|
||||
// file directly; clear the in-process cache so handlers reload the seeded state.
|
||||
clearSessionStoreCacheForTest();
|
||||
await persistTestSessionStorePath(storePath);
|
||||
await persistTestSessionConfig();
|
||||
await fs.mkdir(path.dirname(storePath), { recursive: true });
|
||||
await fs.writeFile(storePath, JSON.stringify(store, null, 2), "utf-8");
|
||||
clearSessionStoreCacheForTest();
|
||||
@@ -248,6 +297,8 @@ async function resetGatewayTestState(options: { uniqueConfigRoot: boolean }) {
|
||||
testState.bindingsConfig = undefined;
|
||||
testState.channelsConfig = undefined;
|
||||
testState.allowFrom = undefined;
|
||||
lastSyncedSessionStorePath = testState.sessionStorePath;
|
||||
lastSyncedSessionConfigJson = serializeGatewayTestSessionConfig();
|
||||
testIsNixMode.value = false;
|
||||
cronIsolatedRun.mockReset();
|
||||
cronIsolatedRun.mockResolvedValue({ status: "ok", summary: "ok" });
|
||||
@@ -261,7 +312,9 @@ async function resetGatewayTestState(options: { uniqueConfigRoot: boolean }) {
|
||||
embeddedRunMock.abortCalls = [];
|
||||
embeddedRunMock.waitCalls = [];
|
||||
embeddedRunMock.waitResults.clear();
|
||||
drainSystemEvents(resolveMainSessionKeyFromConfig());
|
||||
for (const sessionKey of resolveGatewayTestMainSessionKeys()) {
|
||||
drainSystemEvents(sessionKey);
|
||||
}
|
||||
resetAgentRunContextForTest();
|
||||
const mod = await getServerModule();
|
||||
mod.__resetModelCatalogCacheForTest();
|
||||
@@ -810,6 +863,15 @@ export async function rpcReq<T extends Record<string, unknown>>(
|
||||
params?: unknown,
|
||||
timeoutMs?: number,
|
||||
) {
|
||||
if (hasUnsyncedGatewayTestSessionConfig()) {
|
||||
await persistTestSessionConfig();
|
||||
}
|
||||
// Gateway suites often mutate testState-backed config/session inputs between
|
||||
// RPCs while reusing one server instance; flush caches so the next request
|
||||
// observes the updated test fixture state.
|
||||
clearRuntimeConfigSnapshot();
|
||||
clearConfigCache();
|
||||
clearSessionStoreCacheForTest();
|
||||
const { randomUUID } = await import("node:crypto");
|
||||
const id = randomUUID();
|
||||
ws.send(JSON.stringify({ type: "req", id, method, params }));
|
||||
@@ -833,12 +895,14 @@ export async function rpcReq<T extends Record<string, unknown>>(
|
||||
}
|
||||
|
||||
export async function waitForSystemEvent(timeoutMs = 2000) {
|
||||
const sessionKey = resolveMainSessionKeyFromConfig();
|
||||
const sessionKeys = resolveGatewayTestMainSessionKeys();
|
||||
const deadline = Date.now() + timeoutMs;
|
||||
while (Date.now() < deadline) {
|
||||
const events = peekSystemEvents(sessionKey);
|
||||
if (events.length > 0) {
|
||||
return events;
|
||||
for (const sessionKey of sessionKeys) {
|
||||
const events = peekSystemEvents(sessionKey);
|
||||
if (events.length > 0) {
|
||||
return events;
|
||||
}
|
||||
}
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user