fix(telegram): serialize thread binding persists

This commit is contained in:
Vincent Koc
2026-03-20 00:28:52 -07:00
parent 9f8af3604d
commit f1e012e0fc
3 changed files with 144 additions and 27 deletions

View File

@@ -15,12 +15,13 @@ import {
describe("telegram thread bindings", () => {
let stateDirOverride: string | undefined;
beforeEach(() => {
__testing.resetTelegramThreadBindingsForTests();
beforeEach(async () => {
await __testing.resetTelegramThreadBindingsForTests();
});
afterEach(() => {
afterEach(async () => {
vi.useRealTimers();
await __testing.resetTelegramThreadBindingsForTests();
if (stateDirOverride) {
delete process.env.OPENCLAW_STATE_DIR;
fs.rmSync(stateDirOverride, { recursive: true, force: true });
@@ -90,7 +91,7 @@ describe("telegram thread bindings", () => {
"./thread-bindings.js?scope=shared-b",
);
bindingsA.__testing.resetTelegramThreadBindingsForTests();
await bindingsA.__testing.resetTelegramThreadBindingsForTests();
try {
const managerA = bindingsA.createTelegramThreadBindingManager({
@@ -123,7 +124,7 @@ describe("telegram thread bindings", () => {
?.getByConversationId("-100200300:topic:44")?.targetSessionKey,
).toBe("agent:main:subagent:child-shared");
} finally {
bindingsA.__testing.resetTelegramThreadBindingsForTests();
await bindingsA.__testing.resetTelegramThreadBindingsForTests();
}
});
@@ -237,7 +238,7 @@ describe("telegram thread bindings", () => {
reason: "test-detach",
});
__testing.resetTelegramThreadBindingsForTests();
await __testing.resetTelegramThreadBindingsForTests();
const reloaded = createTelegramThreadBindingManager({
accountId: "default",
@@ -247,4 +248,45 @@ describe("telegram thread bindings", () => {
expect(reloaded.getByConversationId("8460800771")).toBeUndefined();
});
it("flushes pending lifecycle update persists before test reset", async () => {
stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-"));
process.env.OPENCLAW_STATE_DIR = stateDirOverride;
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z"));
createTelegramThreadBindingManager({
accountId: "persist-reset",
persist: true,
enableSweeper: false,
});
await getSessionBindingService().bind({
targetSessionKey: "agent:main:subagent:child-3",
targetKind: "subagent",
conversation: {
channel: "telegram",
accountId: "persist-reset",
conversationId: "-100200300:topic:99",
},
});
setTelegramThreadBindingIdleTimeoutBySessionKey({
accountId: "persist-reset",
targetSessionKey: "agent:main:subagent:child-3",
idleTimeoutMs: 90_000,
});
await __testing.resetTelegramThreadBindingsForTests();
const statePath = path.join(
resolveStateDir(process.env, os.homedir),
"telegram",
"thread-bindings-persist-reset.json",
);
const persisted = JSON.parse(fs.readFileSync(statePath, "utf8")) as {
bindings?: Array<{ idleTimeoutMs?: number }>;
};
expect(persisted.bindings?.[0]?.idleTimeoutMs).toBe(90_000);
});
});

View File

@@ -67,6 +67,7 @@ export type TelegramThreadBindingManager = {
type TelegramThreadBindingsState = {
managersByAccountId: Map<string, TelegramThreadBindingManager>;
bindingsByAccountConversation: Map<string, TelegramThreadBindingRecord>;
persistQueueByAccountId: Map<string, Promise<void>>;
};
/**
@@ -80,10 +81,12 @@ const threadBindingsState = resolveGlobalSingleton<TelegramThreadBindingsState>(
() => ({
managersByAccountId: new Map<string, TelegramThreadBindingManager>(),
bindingsByAccountConversation: new Map<string, TelegramThreadBindingRecord>(),
persistQueueByAccountId: new Map<string, Promise<void>>(),
}),
);
const MANAGERS_BY_ACCOUNT_ID = threadBindingsState.managersByAccountId;
const BINDINGS_BY_ACCOUNT_CONVERSATION = threadBindingsState.bindingsByAccountConversation;
const PERSIST_QUEUE_BY_ACCOUNT_ID = threadBindingsState.persistQueueByAccountId;
function normalizeDurationMs(raw: unknown, fallback: number): number {
if (typeof raw !== "number" || !Number.isFinite(raw)) {
@@ -323,16 +326,18 @@ function loadBindingsFromDisk(accountId: string): TelegramThreadBindingRecord[]
async function persistBindingsToDisk(params: {
accountId: string;
persist: boolean;
bindings?: TelegramThreadBindingRecord[];
}): Promise<void> {
if (!params.persist) {
return;
}
const bindings = [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter(
(entry) => entry.accountId === params.accountId,
);
const payload: StoredTelegramBindingState = {
version: STORE_VERSION,
bindings,
bindings:
params.bindings ??
[...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter(
(entry) => entry.accountId === params.accountId,
),
};
await writeJsonAtomic(resolveBindingsPath(params.accountId), payload, {
mode: 0o600,
@@ -341,6 +346,48 @@ async function persistBindingsToDisk(params: {
});
}
function listBindingsForAccount(accountId: string): TelegramThreadBindingRecord[] {
return [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter(
(entry) => entry.accountId === accountId,
);
}
function enqueuePersistBindings(params: {
accountId: string;
persist: boolean;
bindings?: TelegramThreadBindingRecord[];
}): Promise<void> {
if (!params.persist) {
return Promise.resolve();
}
const previous = PERSIST_QUEUE_BY_ACCOUNT_ID.get(params.accountId) ?? Promise.resolve();
const next = previous
.catch(() => undefined)
.then(async () => {
await persistBindingsToDisk(params);
});
PERSIST_QUEUE_BY_ACCOUNT_ID.set(params.accountId, next);
void next.finally(() => {
if (PERSIST_QUEUE_BY_ACCOUNT_ID.get(params.accountId) === next) {
PERSIST_QUEUE_BY_ACCOUNT_ID.delete(params.accountId);
}
});
return next;
}
function persistBindingsSafely(params: {
accountId: string;
persist: boolean;
bindings?: TelegramThreadBindingRecord[];
reason: string;
}): void {
void enqueuePersistBindings(params).catch((err) => {
logVerbose(
`telegram thread bindings persist failed (${params.accountId}, ${params.reason}): ${String(err)}`,
);
});
}
function normalizeTimestampMs(raw: unknown): number {
if (typeof raw !== "number" || !Number.isFinite(raw)) {
return Date.now();
@@ -414,9 +461,6 @@ export function createTelegramThreadBindingManager(
});
}
const listBindingsForAccount = () =>
[...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter((entry) => entry.accountId === accountId);
let sweepTimer: NodeJS.Timeout | null = null;
const manager: TelegramThreadBindingManager = {
@@ -441,11 +485,11 @@ export function createTelegramThreadBindingManager(
if (!targetSessionKey) {
return [];
}
return listBindingsForAccount().filter(
return listBindingsForAccount(accountId).filter(
(entry) => entry.targetSessionKey === targetSessionKey,
);
},
listBindings: () => listBindingsForAccount(),
listBindings: () => listBindingsForAccount(accountId),
touchConversation: (conversationIdRaw, at) => {
const conversationId = normalizeConversationId(conversationIdRaw);
if (!conversationId) {
@@ -461,7 +505,12 @@ export function createTelegramThreadBindingManager(
lastActivityAt: normalizeTimestampMs(at ?? Date.now()),
};
BINDINGS_BY_ACCOUNT_CONVERSATION.set(key, nextRecord);
void persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() });
persistBindingsSafely({
accountId,
persist: manager.shouldPersistMutations(),
bindings: listBindingsForAccount(accountId),
reason: "touch",
});
return nextRecord;
},
unbindConversation: (unbindParams) => {
@@ -475,7 +524,12 @@ export function createTelegramThreadBindingManager(
return null;
}
BINDINGS_BY_ACCOUNT_CONVERSATION.delete(key);
void persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() });
persistBindingsSafely({
accountId,
persist: manager.shouldPersistMutations(),
bindings: listBindingsForAccount(accountId),
reason: "unbind-conversation",
});
return removed;
},
unbindBySessionKey: (unbindParams) => {
@@ -484,7 +538,7 @@ export function createTelegramThreadBindingManager(
return [];
}
const removed: TelegramThreadBindingRecord[] = [];
for (const entry of listBindingsForAccount()) {
for (const entry of listBindingsForAccount(accountId)) {
if (entry.targetSessionKey !== targetSessionKey) {
continue;
}
@@ -496,7 +550,12 @@ export function createTelegramThreadBindingManager(
removed.push(entry);
}
if (removed.length > 0) {
void persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() });
persistBindingsSafely({
accountId,
persist: manager.shouldPersistMutations(),
bindings: listBindingsForAccount(accountId),
reason: "unbind-session",
});
}
return removed;
},
@@ -544,7 +603,11 @@ export function createTelegramThreadBindingManager(
resolveBindingKey({ accountId, conversationId }),
record,
);
await persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() });
await enqueuePersistBindings({
accountId,
persist: manager.shouldPersistMutations(),
bindings: listBindingsForAccount(accountId),
});
logVerbose(
`telegram: bound conversation ${conversationId} -> ${targetSessionKey} (${summarizeLifecycleForLog(
record,
@@ -605,7 +668,11 @@ export function createTelegramThreadBindingManager(
sendFarewell: false,
});
if (removed.length > 0) {
await persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() });
await enqueuePersistBindings({
accountId,
persist: manager.shouldPersistMutations(),
bindings: listBindingsForAccount(accountId),
});
}
return removed.map((entry) =>
toSessionBindingRecord(entry, {
@@ -627,7 +694,11 @@ export function createTelegramThreadBindingManager(
sendFarewell: false,
});
if (removed) {
await persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() });
await enqueuePersistBindings({
accountId,
persist: manager.shouldPersistMutations(),
bindings: listBindingsForAccount(accountId),
});
}
return removed
? [
@@ -644,7 +715,7 @@ export function createTelegramThreadBindingManager(
if (sweeperEnabled) {
sweepTimer = setInterval(() => {
const now = Date.now();
for (const record of listBindingsForAccount()) {
for (const record of listBindingsForAccount(accountId)) {
const idleExpired = shouldExpireByIdle({
now,
record,
@@ -699,9 +770,11 @@ function updateTelegramBindingsBySessionKey(params: {
updated.push(next);
}
if (updated.length > 0) {
void persistBindingsToDisk({
persistBindingsSafely({
accountId: params.manager.accountId,
persist: params.manager.shouldPersistMutations(),
bindings: listBindingsForAccount(params.manager.accountId),
reason: "session-lifecycle-update",
});
}
return updated;
@@ -750,10 +823,12 @@ export function setTelegramThreadBindingMaxAgeBySessionKey(params: {
}
export const __testing = {
resetTelegramThreadBindingsForTests() {
async resetTelegramThreadBindingsForTests() {
for (const manager of MANAGERS_BY_ACCOUNT_ID.values()) {
manager.stop();
}
await Promise.allSettled(PERSIST_QUEUE_BY_ACCOUNT_ID.values());
PERSIST_QUEUE_BY_ACCOUNT_ID.clear();
MANAGERS_BY_ACCOUNT_ID.clear();
BINDINGS_BY_ACCOUNT_CONVERSATION.clear();
},

View File

@@ -22,12 +22,12 @@ vi.mock("../../../../extensions/matrix/src/matrix/send.js", async () => {
};
});
beforeEach(() => {
beforeEach(async () => {
sessionBindingTesting.resetSessionBindingAdaptersForTests();
discordThreadBindingTesting.resetThreadBindingsForTests();
feishuThreadBindingTesting.resetFeishuThreadBindingsForTests();
resetMatrixThreadBindingsForTests();
telegramThreadBindingTesting.resetTelegramThreadBindingsForTests();
await telegramThreadBindingTesting.resetTelegramThreadBindingsForTests();
});
for (const entry of sessionBindingContractRegistry) {