mirror of
https://github.com/moltbot/moltbot.git
synced 2026-04-23 14:45:46 +00:00
test: dedupe telegram polling session harness
This commit is contained in:
@@ -44,6 +44,95 @@ function makeBot() {
|
||||
};
|
||||
}
|
||||
|
||||
function installPollingStallWatchdogHarness() {
|
||||
let watchdog: (() => void) | undefined;
|
||||
const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => {
|
||||
watchdog = fn as () => void;
|
||||
return 1 as unknown as ReturnType<typeof setInterval>;
|
||||
});
|
||||
const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {});
|
||||
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => {
|
||||
void Promise.resolve().then(() => (fn as () => void)());
|
||||
return 1 as unknown as ReturnType<typeof setTimeout>;
|
||||
});
|
||||
const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {});
|
||||
const dateNowSpy = vi
|
||||
.spyOn(Date, "now")
|
||||
.mockImplementationOnce(() => 0)
|
||||
.mockImplementation(() => 120_001);
|
||||
|
||||
return {
|
||||
async waitForWatchdog() {
|
||||
for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) {
|
||||
await Promise.resolve();
|
||||
}
|
||||
expect(watchdog).toBeTypeOf("function");
|
||||
return watchdog;
|
||||
},
|
||||
restore() {
|
||||
setIntervalSpy.mockRestore();
|
||||
clearIntervalSpy.mockRestore();
|
||||
setTimeoutSpy.mockRestore();
|
||||
clearTimeoutSpy.mockRestore();
|
||||
dateNowSpy.mockRestore();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function expectTelegramBotTransportSequence(firstTransport: unknown, secondTransport: unknown) {
|
||||
expect(createTelegramBotMock).toHaveBeenCalledTimes(2);
|
||||
expect(createTelegramBotMock.mock.calls[0]?.[0]?.telegramTransport).toBe(firstTransport);
|
||||
expect(createTelegramBotMock.mock.calls[1]?.[0]?.telegramTransport).toBe(secondTransport);
|
||||
}
|
||||
|
||||
function makeTelegramTransport() {
|
||||
return { fetch: globalThis.fetch, sourceFetch: globalThis.fetch };
|
||||
}
|
||||
|
||||
function mockRestartAfterPollingError(error: unknown, abort: AbortController) {
|
||||
let firstCycle = true;
|
||||
runMock.mockImplementation(() => {
|
||||
if (firstCycle) {
|
||||
firstCycle = false;
|
||||
return {
|
||||
task: async () => {
|
||||
throw error;
|
||||
},
|
||||
stop: vi.fn(async () => undefined),
|
||||
isRunning: () => false,
|
||||
};
|
||||
}
|
||||
return {
|
||||
task: async () => {
|
||||
abort.abort();
|
||||
},
|
||||
stop: vi.fn(async () => undefined),
|
||||
isRunning: () => false,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
function createPollingSessionWithTransportRestart(params: {
|
||||
abortSignal: AbortSignal;
|
||||
telegramTransport: ReturnType<typeof makeTelegramTransport>;
|
||||
createTelegramTransport: () => ReturnType<typeof makeTelegramTransport>;
|
||||
}) {
|
||||
return new TelegramPollingSession({
|
||||
token: "tok",
|
||||
config: {},
|
||||
accountId: "default",
|
||||
runtime: undefined,
|
||||
proxyFetch: undefined,
|
||||
abortSignal: params.abortSignal,
|
||||
runnerOptions: {},
|
||||
getLastUpdateId: () => null,
|
||||
persistUpdateId: async () => undefined,
|
||||
log: () => undefined,
|
||||
telegramTransport: params.telegramTransport,
|
||||
createTelegramTransport: params.createTelegramTransport,
|
||||
});
|
||||
}
|
||||
|
||||
describe("TelegramPollingSession", () => {
|
||||
beforeEach(async () => {
|
||||
vi.resetModules();
|
||||
@@ -153,21 +242,7 @@ describe("TelegramPollingSession", () => {
|
||||
};
|
||||
});
|
||||
|
||||
let watchdog: (() => void) | undefined;
|
||||
const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => {
|
||||
watchdog = fn as () => void;
|
||||
return 1 as unknown as ReturnType<typeof setInterval>;
|
||||
});
|
||||
const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {});
|
||||
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => {
|
||||
void Promise.resolve().then(() => (fn as () => void)());
|
||||
return 1 as unknown as ReturnType<typeof setTimeout>;
|
||||
});
|
||||
const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {});
|
||||
const dateNowSpy = vi
|
||||
.spyOn(Date, "now")
|
||||
.mockImplementationOnce(() => 0)
|
||||
.mockImplementation(() => 120_001);
|
||||
const watchdogHarness = installPollingStallWatchdogHarness();
|
||||
|
||||
const log = vi.fn();
|
||||
const session = new TelegramPollingSession({
|
||||
@@ -186,10 +261,7 @@ describe("TelegramPollingSession", () => {
|
||||
|
||||
try {
|
||||
const runPromise = session.runUntilAbort();
|
||||
for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) {
|
||||
await Promise.resolve();
|
||||
}
|
||||
expect(watchdog).toBeTypeOf("function");
|
||||
const watchdog = await watchdogHarness.waitForWatchdog();
|
||||
watchdog?.();
|
||||
await runPromise;
|
||||
|
||||
@@ -199,11 +271,7 @@ describe("TelegramPollingSession", () => {
|
||||
expect(log).toHaveBeenCalledWith(expect.stringContaining("Polling stall detected"));
|
||||
expect(log).toHaveBeenCalledWith(expect.stringContaining("polling stall detected"));
|
||||
} finally {
|
||||
setIntervalSpy.mockRestore();
|
||||
clearIntervalSpy.mockRestore();
|
||||
setTimeoutSpy.mockRestore();
|
||||
clearTimeoutSpy.mockRestore();
|
||||
dateNowSpy.mockRestore();
|
||||
watchdogHarness.restore();
|
||||
}
|
||||
});
|
||||
|
||||
@@ -239,21 +307,7 @@ describe("TelegramPollingSession", () => {
|
||||
};
|
||||
});
|
||||
|
||||
let watchdog: (() => void) | undefined;
|
||||
const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => {
|
||||
watchdog = fn as () => void;
|
||||
return 1 as unknown as ReturnType<typeof setInterval>;
|
||||
});
|
||||
const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {});
|
||||
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => {
|
||||
void Promise.resolve().then(() => (fn as () => void)());
|
||||
return 1 as unknown as ReturnType<typeof setTimeout>;
|
||||
});
|
||||
const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {});
|
||||
const dateNowSpy = vi
|
||||
.spyOn(Date, "now")
|
||||
.mockImplementationOnce(() => 0)
|
||||
.mockImplementation(() => 120_001);
|
||||
const watchdogHarness = installPollingStallWatchdogHarness();
|
||||
|
||||
const transport1 = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch };
|
||||
const transport2 = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch };
|
||||
@@ -276,22 +330,14 @@ describe("TelegramPollingSession", () => {
|
||||
});
|
||||
|
||||
const runPromise = session.runUntilAbort();
|
||||
for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) {
|
||||
await Promise.resolve();
|
||||
}
|
||||
const watchdog = await watchdogHarness.waitForWatchdog();
|
||||
watchdog?.();
|
||||
await runPromise;
|
||||
|
||||
expect(createTelegramBotMock).toHaveBeenCalledTimes(2);
|
||||
expect(createTelegramBotMock.mock.calls[0]?.[0]?.telegramTransport).toBe(transport1);
|
||||
expect(createTelegramBotMock.mock.calls[1]?.[0]?.telegramTransport).toBe(transport2);
|
||||
expectTelegramBotTransportSequence(transport1, transport2);
|
||||
expect(createTelegramTransport).toHaveBeenCalledTimes(1);
|
||||
} finally {
|
||||
setIntervalSpy.mockRestore();
|
||||
clearIntervalSpy.mockRestore();
|
||||
setTimeoutSpy.mockRestore();
|
||||
clearTimeoutSpy.mockRestore();
|
||||
dateNowSpy.mockRestore();
|
||||
watchdogHarness.restore();
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
@@ -299,52 +345,21 @@ describe("TelegramPollingSession", () => {
|
||||
it("rebuilds the transport after a recoverable polling error", async () => {
|
||||
const abort = new AbortController();
|
||||
const recoverableError = new Error("recoverable polling error");
|
||||
const transport1 = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch };
|
||||
const transport2 = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch };
|
||||
const transport1 = makeTelegramTransport();
|
||||
const transport2 = makeTelegramTransport();
|
||||
const createTelegramTransport = vi.fn(() => transport2);
|
||||
createTelegramBotMock.mockReturnValueOnce(makeBot()).mockReturnValueOnce(makeBot());
|
||||
mockRestartAfterPollingError(recoverableError, abort);
|
||||
|
||||
let firstCycle = true;
|
||||
runMock.mockImplementation(() => {
|
||||
if (firstCycle) {
|
||||
firstCycle = false;
|
||||
return {
|
||||
task: async () => {
|
||||
throw recoverableError;
|
||||
},
|
||||
stop: vi.fn(async () => undefined),
|
||||
isRunning: () => false,
|
||||
};
|
||||
}
|
||||
return {
|
||||
task: async () => {
|
||||
abort.abort();
|
||||
},
|
||||
stop: vi.fn(async () => undefined),
|
||||
isRunning: () => false,
|
||||
};
|
||||
});
|
||||
|
||||
const session = new TelegramPollingSession({
|
||||
token: "tok",
|
||||
config: {},
|
||||
accountId: "default",
|
||||
runtime: undefined,
|
||||
proxyFetch: undefined,
|
||||
const session = createPollingSessionWithTransportRestart({
|
||||
abortSignal: abort.signal,
|
||||
runnerOptions: {},
|
||||
getLastUpdateId: () => null,
|
||||
persistUpdateId: async () => undefined,
|
||||
log: () => undefined,
|
||||
telegramTransport: transport1,
|
||||
createTelegramTransport,
|
||||
});
|
||||
|
||||
await session.runUntilAbort();
|
||||
|
||||
expect(createTelegramBotMock).toHaveBeenCalledTimes(2);
|
||||
expect(createTelegramBotMock.mock.calls[0]?.[0]?.telegramTransport).toBe(transport1);
|
||||
expect(createTelegramBotMock.mock.calls[1]?.[0]?.telegramTransport).toBe(transport2);
|
||||
expectTelegramBotTransportSequence(transport1, transport2);
|
||||
expect(createTelegramTransport).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
@@ -357,55 +372,21 @@ describe("TelegramPollingSession", () => {
|
||||
method: "getUpdates",
|
||||
},
|
||||
);
|
||||
const transport1 = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch };
|
||||
const createTelegramTransport = vi.fn(() => ({
|
||||
fetch: globalThis.fetch,
|
||||
sourceFetch: globalThis.fetch,
|
||||
}));
|
||||
const transport1 = makeTelegramTransport();
|
||||
const createTelegramTransport = vi.fn(() => makeTelegramTransport());
|
||||
createTelegramBotMock.mockReturnValueOnce(makeBot()).mockReturnValueOnce(makeBot());
|
||||
isRecoverableTelegramNetworkErrorMock.mockReturnValue(false);
|
||||
mockRestartAfterPollingError(conflictError, abort);
|
||||
|
||||
let firstCycle = true;
|
||||
runMock.mockImplementation(() => {
|
||||
if (firstCycle) {
|
||||
firstCycle = false;
|
||||
return {
|
||||
task: async () => {
|
||||
throw conflictError;
|
||||
},
|
||||
stop: vi.fn(async () => undefined),
|
||||
isRunning: () => false,
|
||||
};
|
||||
}
|
||||
return {
|
||||
task: async () => {
|
||||
abort.abort();
|
||||
},
|
||||
stop: vi.fn(async () => undefined),
|
||||
isRunning: () => false,
|
||||
};
|
||||
});
|
||||
|
||||
const session = new TelegramPollingSession({
|
||||
token: "tok",
|
||||
config: {},
|
||||
accountId: "default",
|
||||
runtime: undefined,
|
||||
proxyFetch: undefined,
|
||||
const session = createPollingSessionWithTransportRestart({
|
||||
abortSignal: abort.signal,
|
||||
runnerOptions: {},
|
||||
getLastUpdateId: () => null,
|
||||
persistUpdateId: async () => undefined,
|
||||
log: () => undefined,
|
||||
telegramTransport: transport1,
|
||||
createTelegramTransport,
|
||||
});
|
||||
|
||||
await session.runUntilAbort();
|
||||
|
||||
expect(createTelegramBotMock).toHaveBeenCalledTimes(2);
|
||||
expect(createTelegramBotMock.mock.calls[0]?.[0]?.telegramTransport).toBe(transport1);
|
||||
expect(createTelegramBotMock.mock.calls[1]?.[0]?.telegramTransport).toBe(transport1);
|
||||
expectTelegramBotTransportSequence(transport1, transport1);
|
||||
expect(createTelegramTransport).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user