Matrix: persist clean shutdown sync state

This commit is contained in:
Gustavo Madeira Santana
2026-03-19 08:31:44 -04:00
parent 16129272dc
commit 75e6c8fe9c
5 changed files with 103 additions and 17 deletions

View File

@@ -91,6 +91,50 @@ describe("FileBackedMatrixSyncStore", () => {
},
]);
expect(savedSync?.roomsData.join?.["!room:example.org"]).toBeTruthy();
expect(secondStore.hasSavedSyncFromCleanShutdown()).toBe(false);
});
// Verifies that persisted sync data alone is not reported as coming from a
// clean shutdown: a store re-created from the same file only reports
// hasSavedSyncFromCleanShutdown() after markCleanShutdown() has been flushed.
it("only treats sync state as restart-safe after a clean shutdown persist", async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-"));
tempDirs.push(tempDir);
const storagePath = path.join(tempDir, "bot-storage.json");
// Phase 1: write sync data and flush it without marking a clean shutdown.
const firstStore = new FileBackedMatrixSyncStore(storagePath);
await firstStore.setSyncData(createSyncResponse("s123"));
await firstStore.flush();
// A second store reading the same file sees the saved sync but no clean marker.
const afterDirtyPersist = new FileBackedMatrixSyncStore(storagePath);
expect(afterDirtyPersist.hasSavedSync()).toBe(true);
expect(afterDirtyPersist.hasSavedSyncFromCleanShutdown()).toBe(false);
// Phase 2: mark the clean shutdown on the original store and flush again.
firstStore.markCleanShutdown();
await firstStore.flush();
// A store constructed after that flush reports both flags as true.
const afterCleanShutdown = new FileBackedMatrixSyncStore(storagePath);
expect(afterCleanShutdown.hasSavedSync()).toBe(true);
expect(afterCleanShutdown.hasSavedSyncFromCleanShutdown()).toBe(true);
});
// Verifies that new sync data written after a clean restart clears the
// persisted clean-shutdown marker while the newer sync token is kept.
it("clears the clean-shutdown marker once fresh sync data arrives", async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-"));
tempDirs.push(tempDir);
const storagePath = path.join(tempDir, "bot-storage.json");
// Phase 1: persist sync data together with the clean-shutdown marker.
const firstStore = new FileBackedMatrixSyncStore(storagePath);
await firstStore.setSyncData(createSyncResponse("s123"));
firstStore.markCleanShutdown();
await firstStore.flush();
// Phase 2: a restarted store loads the marker, then receives fresh sync data.
const restartedStore = new FileBackedMatrixSyncStore(storagePath);
expect(restartedStore.hasSavedSyncFromCleanShutdown()).toBe(true);
await restartedStore.setSyncData(createSyncResponse("s456"));
await restartedStore.flush();
// Phase 3: the next load still has saved sync (token "s456") but no clean marker.
const afterNewSync = new FileBackedMatrixSyncStore(storagePath);
expect(afterNewSync.hasSavedSync()).toBe(true);
expect(afterNewSync.hasSavedSyncFromCleanShutdown()).toBe(false);
await expect(afterNewSync.getSavedSyncToken()).resolves.toBe("s456");
});
it("coalesces background persistence until the debounce window elapses", async () => {

View File

@@ -17,6 +17,7 @@ type PersistedMatrixSyncStore = {
version: number;
savedSync: ISyncData | null;
clientOptions?: IStoredClientOpts;
cleanShutdown?: boolean;
};
function createAsyncLock() {
@@ -76,6 +77,7 @@ function readPersistedStore(raw: string): PersistedMatrixSyncStore | null {
version?: unknown;
savedSync?: unknown;
clientOptions?: unknown;
cleanShutdown?: unknown;
};
const savedSync = toPersistedSyncData(parsed.savedSync);
if (parsed.version === STORE_VERSION) {
@@ -85,6 +87,7 @@ function readPersistedStore(raw: string): PersistedMatrixSyncStore | null {
clientOptions: isRecord(parsed.clientOptions)
? (parsed.clientOptions as IStoredClientOpts)
: undefined,
cleanShutdown: parsed.cleanShutdown === true,
};
}
@@ -93,6 +96,7 @@ function readPersistedStore(raw: string): PersistedMatrixSyncStore | null {
return {
version: STORE_VERSION,
savedSync: toPersistedSyncData(parsed),
cleanShutdown: false,
};
} catch {
return null;
@@ -119,6 +123,8 @@ export class FileBackedMatrixSyncStore extends MemoryStore {
private savedSync: ISyncData | null = null;
private savedClientOptions: IStoredClientOpts | undefined;
private readonly hadSavedSyncOnLoad: boolean;
private readonly hadCleanShutdownOnLoad: boolean;
private cleanShutdown = false;
private dirty = false;
private persistTimer: NodeJS.Timeout | null = null;
private persistPromise: Promise<void> | null = null;
@@ -128,11 +134,13 @@ export class FileBackedMatrixSyncStore extends MemoryStore {
let restoredSavedSync: ISyncData | null = null;
let restoredClientOptions: IStoredClientOpts | undefined;
let restoredCleanShutdown = false;
try {
const raw = readFileSync(this.storagePath, "utf8");
const persisted = readPersistedStore(raw);
restoredSavedSync = persisted?.savedSync ?? null;
restoredClientOptions = persisted?.clientOptions;
restoredCleanShutdown = persisted?.cleanShutdown === true;
} catch {
// Missing or unreadable sync cache should not block startup.
}
@@ -140,6 +148,8 @@ export class FileBackedMatrixSyncStore extends MemoryStore {
this.savedSync = restoredSavedSync;
this.savedClientOptions = restoredClientOptions;
this.hadSavedSyncOnLoad = restoredSavedSync !== null;
this.hadCleanShutdownOnLoad = this.hadSavedSyncOnLoad && restoredCleanShutdown;
this.cleanShutdown = this.hadCleanShutdownOnLoad;
if (this.savedSync) {
this.accumulator.accumulate(syncDataToSyncResponse(this.savedSync), true);
@@ -154,6 +164,10 @@ export class FileBackedMatrixSyncStore extends MemoryStore {
return this.hadSavedSyncOnLoad;
}
/**
 * Reports the load-time clean-shutdown flag.
 *
 * The value is fixed in the constructor: it is true only when a saved sync was
 * restored from disk and the persisted `cleanShutdown` field was `true`. It
 * does not change when `markCleanShutdown()` is called on this instance.
 *
 * @returns `true` if the sync state loaded at construction was written by a clean shutdown.
 */
hasSavedSyncFromCleanShutdown(): boolean {
return this.hadCleanShutdownOnLoad;
}
/**
 * Returns the in-memory saved sync snapshot.
 *
 * @returns A promise resolving to a `cloneJson` copy of the held sync data, or
 * `null` when no sync data is held.
 */
override getSavedSync(): Promise<ISyncData | null> {
  if (!this.savedSync) {
    return Promise.resolve(null);
  }
  return Promise.resolve(cloneJson(this.savedSync));
}
@@ -205,9 +219,15 @@ export class FileBackedMatrixSyncStore extends MemoryStore {
await super.deleteAllData();
this.savedSync = null;
this.savedClientOptions = undefined;
this.cleanShutdown = false;
await fs.rm(this.storagePath, { force: true }).catch(() => undefined);
}
/**
 * Flags the in-memory state as written by a clean shutdown and marks the store
 * dirty. This method does not write anything itself; in the visible callers a
 * `flush()` (or the shutdown persist) follows it.
 */
markCleanShutdown(): void {
  this.dirty = true;
  this.cleanShutdown = true;
}
async flush(): Promise<void> {
if (this.persistTimer) {
clearTimeout(this.persistTimer);
@@ -224,6 +244,7 @@ export class FileBackedMatrixSyncStore extends MemoryStore {
}
private markDirtyAndSchedulePersist(): void {
this.cleanShutdown = false;
this.dirty = true;
if (this.persistTimer) {
return;
@@ -242,6 +263,7 @@ export class FileBackedMatrixSyncStore extends MemoryStore {
const payload: PersistedMatrixSyncStore = {
version: STORE_VERSION,
savedSync: this.savedSync ? cloneJson(this.savedSync) : null,
cleanShutdown: this.cleanShutdown === true,
...(this.savedClientOptions ? { clientOptions: cloneJson(this.savedClientOptions) } : {}),
};
try {

View File

@@ -17,17 +17,17 @@ const hoisted = vi.hoisted(() => {
debug: vi.fn(),
};
const stopThreadBindingManager = vi.fn();
const stopSharedClientInstance = vi.fn();
const releaseSharedClientInstance = vi.fn(async () => true);
const setActiveMatrixClient = vi.fn();
return {
callOrder,
client,
createMatrixRoomMessageHandler,
logger,
releaseSharedClientInstance,
resolveTextChunkLimit,
setActiveMatrixClient,
startClientError: null as Error | null,
stopSharedClientInstance,
stopThreadBindingManager,
};
});
@@ -127,7 +127,10 @@ vi.mock("../client.js", () => ({
hoisted.callOrder.push("start-client");
return hoisted.client;
}),
stopSharedClientInstance: hoisted.stopSharedClientInstance,
}));
vi.mock("../client/shared.js", () => ({
releaseSharedClientInstance: hoisted.releaseSharedClientInstance,
}));
vi.mock("../config-update.js", () => ({
@@ -206,8 +209,8 @@ describe("monitorMatrixProvider", () => {
hoisted.callOrder.length = 0;
hoisted.startClientError = null;
hoisted.resolveTextChunkLimit.mockReset().mockReturnValue(4000);
hoisted.releaseSharedClientInstance.mockReset().mockResolvedValue(true);
hoisted.setActiveMatrixClient.mockReset();
hoisted.stopSharedClientInstance.mockReset();
hoisted.stopThreadBindingManager.mockReset();
hoisted.client.hasPersistedSyncState.mockReset().mockReturnValue(false);
hoisted.createMatrixRoomMessageHandler.mockReset().mockReturnValue(vi.fn());
@@ -251,12 +254,13 @@ describe("monitorMatrixProvider", () => {
await expect(monitorMatrixProvider()).rejects.toThrow("start failed");
expect(hoisted.stopThreadBindingManager).toHaveBeenCalledTimes(1);
expect(hoisted.stopSharedClientInstance).toHaveBeenCalledTimes(1);
expect(hoisted.releaseSharedClientInstance).toHaveBeenCalledTimes(1);
expect(hoisted.releaseSharedClientInstance).toHaveBeenCalledWith(hoisted.client, "persist");
expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(1, hoisted.client, "default");
expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(2, null, "default");
});
it("disables cold-start backlog dropping when sync state already exists", async () => {
it("disables cold-start backlog dropping only when sync state is cleanly persisted", async () => {
hoisted.client.hasPersistedSyncState.mockReturnValue(true);
const { monitorMatrixProvider } = await import("./index.js");
const abortController = new AbortController();

View File

@@ -17,8 +17,8 @@ import {
resolveMatrixAuth,
resolveMatrixAuthContext,
resolveSharedMatrixClient,
stopSharedClientInstance,
} from "../client.js";
import { releaseSharedClientInstance } from "../client/shared.js";
import { createMatrixThreadBindingManager } from "../thread-bindings.js";
import { registerMatrixAutoJoin } from "./auto-join.js";
import { resolveMatrixMonitorConfig } from "./config.js";
@@ -131,7 +131,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
setActiveMatrixClient(client, auth.accountId);
let cleanedUp = false;
let threadBindingManager: { accountId: string; stop: () => void } | null = null;
const cleanup = () => {
const cleanup = async () => {
if (cleanedUp) {
return;
}
@@ -139,7 +139,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
try {
threadBindingManager?.stop();
} finally {
stopSharedClientInstance(client);
await releaseSharedClientInstance(client, "persist");
setActiveMatrixClient(null, auth.accountId);
}
};
@@ -273,19 +273,32 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
});
await new Promise<void>((resolve) => {
const onAbort = () => {
logVerboseMessage("matrix: stopping client");
cleanup();
resolve();
// Runs the async cleanup on abort and then settles the surrounding promise.
// A cleanup failure is logged as a warning instead of being rethrown, and
// resolve() sits in `finally` so the monitor promise settles either way.
const stopAndResolve = async () => {
try {
logVerboseMessage("matrix: stopping client");
await cleanup();
} catch (err) {
// Cleanup errors are reported but do not reject the monitor promise.
logger.warn("matrix: failed during monitor shutdown cleanup", {
error: String(err),
});
} finally {
resolve();
}
};
if (opts.abortSignal?.aborted) {
onAbort();
void stopAndResolve();
return;
}
opts.abortSignal?.addEventListener("abort", onAbort, { once: true });
opts.abortSignal?.addEventListener(
"abort",
() => {
void stopAndResolve();
},
{ once: true },
);
});
} catch (err) {
cleanup();
await cleanup();
throw err;
}
}

View File

@@ -350,7 +350,9 @@ export class MatrixClient {
}
hasPersistedSyncState(): boolean {
return this.syncStore?.hasSavedSync() === true;
// Only trust restart replay when the previous process completed a final
// sync-store persist. A stale cursor can make Matrix re-surface old events.
return this.syncStore?.hasSavedSyncFromCleanShutdown() === true;
}
private async ensureStartedForCryptoControlPlane(): Promise<void> {
@@ -367,6 +369,7 @@ export class MatrixClient {
}
this.decryptBridge.stop();
// Final persist on shutdown
this.syncStore?.markCleanShutdown();
this.stopPersistPromise = Promise.all([
persistIdbToDisk({
snapshotPath: this.idbSnapshotPath,