mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-20 21:23:23 +00:00
fix(whatsapp): preserve watchdog message age across reconnects
This commit is contained in:
@@ -250,6 +250,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Discord/ACP: forward worker abort signals into ACP turns so timed-out Discord jobs cancel the running turn instead of silently leaving the bound ACP session working in the background.
|
||||
- Gateway/openresponses: preserve assistant commentary and session continuity across hosted-tool `/v1/responses` turns, and emit streamed tool-call payloads before finalization so client tool loops stay resumable. (#52171) Thanks @CharZhou.
|
||||
- Android/Talk: serialize `TalkModeManager` player teardown so rapid interrupt/restart cycles stop double-releasing or overlapping TTS playback. (#52310) Thanks @Kaneki-x.
|
||||
- WhatsApp/reconnect: preserve the last inbound timestamp across reconnect attempts so the watchdog can still recycle linked-but-dead listeners after a restart instead of leaving them stuck connected forever.
|
||||
|
||||
### Breaking
|
||||
|
||||
|
||||
@@ -184,7 +184,7 @@ describe("web auto-reply connection", () => {
|
||||
if (!completedQuickly) {
|
||||
await vi.waitFor(
|
||||
() => {
|
||||
expect(listenerFactory).toHaveBeenCalledTimes(2);
|
||||
expect(listenerFactory.mock.calls.length).toBeGreaterThanOrEqual(2);
|
||||
},
|
||||
{ timeout: 250, interval: 2 },
|
||||
);
|
||||
@@ -264,7 +264,7 @@ describe("web auto-reply connection", () => {
|
||||
await Promise.resolve();
|
||||
await vi.waitFor(
|
||||
() => {
|
||||
expect(listenerFactory).toHaveBeenCalledTimes(2);
|
||||
expect(listenerFactory.mock.calls.length).toBeGreaterThanOrEqual(2);
|
||||
},
|
||||
{ timeout: 250, interval: 2 },
|
||||
);
|
||||
@@ -278,6 +278,92 @@ describe("web auto-reply connection", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps watchdog message age across reconnects", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const sleep = vi.fn(async () => {});
|
||||
const closeResolvers: Array<(reason: unknown) => void> = [];
|
||||
let capturedOnMessage:
|
||||
| ((msg: import("./inbound.js").WebInboundMessage) => Promise<void>)
|
||||
| undefined;
|
||||
const listenerFactory = vi.fn(
|
||||
async (opts: {
|
||||
onMessage: (msg: import("./inbound.js").WebInboundMessage) => Promise<void>;
|
||||
}) => {
|
||||
capturedOnMessage = opts.onMessage;
|
||||
let resolveClose: (reason: unknown) => void = () => {};
|
||||
const onClose = new Promise<unknown>((res) => {
|
||||
resolveClose = res;
|
||||
closeResolvers.push(res);
|
||||
});
|
||||
return {
|
||||
close: vi.fn(),
|
||||
onClose,
|
||||
signalClose: (reason?: unknown) => resolveClose(reason),
|
||||
};
|
||||
},
|
||||
);
|
||||
const { controller, run } = startMonitorWebChannel({
|
||||
monitorWebChannelFn: monitorWebChannel as never,
|
||||
listenerFactory,
|
||||
sleep,
|
||||
heartbeatSeconds: 60,
|
||||
messageTimeoutMs: 30,
|
||||
watchdogCheckMs: 5,
|
||||
});
|
||||
|
||||
await Promise.resolve();
|
||||
expect(listenerFactory).toHaveBeenCalledTimes(1);
|
||||
await vi.waitFor(
|
||||
() => {
|
||||
expect(capturedOnMessage).toBeTypeOf("function");
|
||||
},
|
||||
{ timeout: 250, interval: 2 },
|
||||
);
|
||||
|
||||
const reply = vi.fn().mockResolvedValue(undefined);
|
||||
const sendComposing = vi.fn();
|
||||
const sendMedia = vi.fn();
|
||||
|
||||
void capturedOnMessage?.(
|
||||
makeInboundMessage({
|
||||
body: "hi",
|
||||
from: "+1",
|
||||
to: "+2",
|
||||
id: "m1",
|
||||
sendComposing,
|
||||
reply,
|
||||
sendMedia,
|
||||
}),
|
||||
);
|
||||
await Promise.resolve();
|
||||
|
||||
closeResolvers.shift()?.({ status: 499, isLoggedOut: false, error: "first-close" });
|
||||
await vi.waitFor(
|
||||
() => {
|
||||
expect(listenerFactory).toHaveBeenCalledTimes(2);
|
||||
},
|
||||
{ timeout: 250, interval: 2 },
|
||||
);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(200);
|
||||
await Promise.resolve();
|
||||
await vi.waitFor(
|
||||
() => {
|
||||
expect(listenerFactory.mock.calls.length).toBeGreaterThanOrEqual(3);
|
||||
},
|
||||
{ timeout: 250, interval: 2 },
|
||||
);
|
||||
|
||||
controller.abort();
|
||||
closeResolvers.at(-1)?.({ status: 499, isLoggedOut: false, error: "aborted" });
|
||||
await Promise.resolve();
|
||||
await run;
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("processes inbound messages without batching and preserves timestamps", async () => {
|
||||
await withEnvAsync({ TZ: "Europe/Vienna" }, async () => {
|
||||
const originalMax = process.getMaxListeners();
|
||||
|
||||
@@ -151,7 +151,10 @@ export async function monitorWebChannel(
|
||||
const startedAt = Date.now();
|
||||
let heartbeat: NodeJS.Timeout | null = null;
|
||||
let watchdogTimer: NodeJS.Timeout | null = null;
|
||||
let lastMessageAt: number | null = null;
|
||||
// Preserve the last known inbound timestamp across reconnects so the watchdog
|
||||
// can still detect a listener that comes back "connected" but never receives
|
||||
// another message after a restart cycle.
|
||||
let lastMessageAt: number | null = status.lastMessageAt ?? null;
|
||||
let handledMessages = 0;
|
||||
let _lastInboundMsg: WebInboundMsg | null = null;
|
||||
let unregisterUnhandled: (() => void) | null = null;
|
||||
|
||||
Reference in New Issue
Block a user