mirror of
https://github.com/moltbot/moltbot.git
synced 2026-05-02 02:57:51 +00:00
fix: harden bonjour retry recovery
This commit is contained in:
@@ -264,14 +264,16 @@ describe("gateway bonjour advertiser", () => {
|
|||||||
await Promise.resolve();
|
await Promise.resolve();
|
||||||
expect(logWarn).toHaveBeenCalledWith(expect.stringContaining("advertise failed"));
|
expect(logWarn).toHaveBeenCalledWith(expect.stringContaining("advertise failed"));
|
||||||
|
|
||||||
// watchdog should attempt re-advertise at the 60s interval tick
|
// watchdog first retries, then recreates the advertiser after the service
|
||||||
|
// stays unhealthy across multiple 5s ticks.
|
||||||
await vi.advanceTimersByTimeAsync(15_000);
|
await vi.advanceTimersByTimeAsync(15_000);
|
||||||
expect(advertise).toHaveBeenCalledTimes(2);
|
expect(advertise).toHaveBeenCalledTimes(3);
|
||||||
|
expect(createService).toHaveBeenCalledTimes(2);
|
||||||
|
|
||||||
await started.stop();
|
await started.stop();
|
||||||
|
|
||||||
await vi.advanceTimersByTimeAsync(60_000);
|
await vi.advanceTimersByTimeAsync(60_000);
|
||||||
expect(advertise).toHaveBeenCalledTimes(2);
|
expect(advertise).toHaveBeenCalledTimes(3);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("handles advertise throwing synchronously", async () => {
|
it("handles advertise throwing synchronously", async () => {
|
||||||
@@ -338,6 +340,44 @@ describe("gateway bonjour advertiser", () => {
|
|||||||
expect(shutdown).toHaveBeenCalledTimes(2);
|
expect(shutdown).toHaveBeenCalledTimes(2);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Regression test: a service that moves from "probing" to "announcing" without
// ever reaching "announced" should be reported as stuck and recreated once.
// The mocked service starts in "probing"; the advertise mock below drives the
// later state changes.
it("treats probing-to-announcing churn as one unhealthy window", async () => {
|
||||||
|
enableAdvertiserUnitMode();
|
||||||
|
vi.useFakeTimers();
|
||||||
|
|
||||||
|
// Shared state holder handed to the mocked service; it starts in "probing".
const stateRef = { value: "probing" };
|
||||||
|
let advertiseCount = 0;
|
||||||
|
const destroy = vi.fn().mockResolvedValue(undefined);
|
||||||
|
// Each advertise() call is counted: the 2nd call flips the state to
// "announcing", and the 3rd and later calls flip it to "announced".
const advertise = vi.fn().mockImplementation(() => {
|
||||||
|
advertiseCount += 1;
|
||||||
|
if (advertiseCount === 2) {
|
||||||
|
stateRef.value = "announcing";
|
||||||
|
}
|
||||||
|
if (advertiseCount >= 3) {
|
||||||
|
stateRef.value = "announced";
|
||||||
|
}
|
||||||
|
return Promise.resolve();
|
||||||
|
});
|
||||||
|
mockCiaoService({ advertise, destroy, stateRef });
|
||||||
|
|
||||||
|
const started = await startGatewayBonjourAdvertiser({
|
||||||
|
gatewayPort: 18789,
|
||||||
|
sshPort: 2222,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Right after startup: one service created and one advertise attempt.
expect(createService).toHaveBeenCalledTimes(1);
|
||||||
|
expect(advertise).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
// Advance fake time by 15s so the advertiser's timers get to run.
await vi.advanceTimersByTimeAsync(15_000);
|
||||||
|
|
||||||
|
// Expected outcome: a "stuck in announcing" warning and a single recreation
// (second createService call, third advertise call, one destroy, one shutdown).
expect(logWarn).toHaveBeenCalledWith(expect.stringContaining("service stuck in announcing"));
|
||||||
|
expect(createService).toHaveBeenCalledTimes(2);
|
||||||
|
expect(advertise).toHaveBeenCalledTimes(3);
|
||||||
|
expect(destroy).toHaveBeenCalledTimes(1);
|
||||||
|
expect(shutdown).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
await started.stop();
|
||||||
|
});
|
||||||
|
|
||||||
it("normalizes hostnames with domains for service names", async () => {
|
it("normalizes hostnames with domains for service names", async () => {
|
||||||
// Allow advertiser to run in unit tests.
|
// Allow advertiser to run in unit tests.
|
||||||
delete process.env.VITEST;
|
delete process.env.VITEST;
|
||||||
|
|||||||
@@ -90,6 +90,10 @@ function serviceSummary(label: string, svc: BonjourService): string {
|
|||||||
return `${label} fqdn=${fqdn} host=${hostname} port=${port} state=${state}`;
|
return `${label} fqdn=${fqdn} host=${hostname} port=${port} state=${state}`;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Reports whether a tracked service state is the "announced" state.
 *
 * @param state - A Bonjour service state, or the "unknown" placeholder.
 * @returns `true` only when the state's string form is exactly "announced".
 */
function isAnnouncedState(state: BonjourServiceState | "unknown") {
|
||||||
|
// Coerce with String(...) so the check is a plain string comparison.
return String(state) === "announced";
|
||||||
|
}
|
||||||
|
|
||||||
export async function startGatewayBonjourAdvertiser(
|
export async function startGatewayBonjourAdvertiser(
|
||||||
opts: GatewayBonjourAdvertiseOpts,
|
opts: GatewayBonjourAdvertiseOpts,
|
||||||
): Promise<GatewayBonjourAdvertiser> {
|
): Promise<GatewayBonjourAdvertiser> {
|
||||||
@@ -181,6 +185,20 @@ export async function startGatewayBonjourAdvertiser(
|
|||||||
if (!cycle) {
|
if (!cycle) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
const responder = cycle.responder as {
|
||||||
|
advertiseService?: (...args: unknown[]) => unknown;
|
||||||
|
announce?: (...args: unknown[]) => unknown;
|
||||||
|
probe?: (...args: unknown[]) => unknown;
|
||||||
|
republishService?: (...args: unknown[]) => unknown;
|
||||||
|
};
|
||||||
|
const noopAsync = async () => {};
|
||||||
|
// ciao schedules its own 2s retry timers after failed probe/announce attempts.
|
||||||
|
// Those callbacks target the original responder instance, so disarm it before
|
||||||
|
// destroy/shutdown to prevent a dead cycle from re-entering advertise/probe.
|
||||||
|
responder.advertiseService = noopAsync;
|
||||||
|
responder.announce = noopAsync;
|
||||||
|
responder.probe = noopAsync;
|
||||||
|
responder.republishService = noopAsync;
|
||||||
for (const { svc } of cycle.services) {
|
for (const { svc } of cycle.services) {
|
||||||
try {
|
try {
|
||||||
await svc.destroy();
|
await svc.destroy();
|
||||||
@@ -257,8 +275,12 @@ export async function startGatewayBonjourAdvertiser(
|
|||||||
const nextState: BonjourServiceState | "unknown" =
|
const nextState: BonjourServiceState | "unknown" =
|
||||||
typeof svc.serviceState === "string" ? svc.serviceState : "unknown";
|
typeof svc.serviceState === "string" ? svc.serviceState : "unknown";
|
||||||
const current = stateTracker.get(label);
|
const current = stateTracker.get(label);
|
||||||
if (!current || current.state !== nextState) {
|
const nextEnteredAt =
|
||||||
stateTracker.set(label, { state: nextState, sinceMs: now });
|
current && !isAnnouncedState(current.state) && !isAnnouncedState(nextState)
|
||||||
|
? current.sinceMs
|
||||||
|
: now;
|
||||||
|
if (!current || current.state !== nextState || current.sinceMs !== nextEnteredAt) {
|
||||||
|
stateTracker.set(label, { state: nextState, sinceMs: nextEnteredAt });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -299,12 +321,12 @@ export async function startGatewayBonjourAdvertiser(
|
|||||||
}
|
}
|
||||||
const tracked = stateTracker.get(label);
|
const tracked = stateTracker.get(label);
|
||||||
if (
|
if (
|
||||||
stateUnknown === "announcing" &&
|
stateUnknown !== "announced" &&
|
||||||
tracked &&
|
tracked &&
|
||||||
Date.now() - tracked.sinceMs >= STUCK_ANNOUNCING_MS
|
Date.now() - tracked.sinceMs >= STUCK_ANNOUNCING_MS
|
||||||
) {
|
) {
|
||||||
void recreateAdvertiser(
|
void recreateAdvertiser(
|
||||||
`service stuck announcing for ${Date.now() - tracked.sinceMs}ms (${serviceSummary(
|
`service stuck in ${stateUnknown} for ${Date.now() - tracked.sinceMs}ms (${serviceSummary(
|
||||||
label,
|
label,
|
||||||
svc,
|
svc,
|
||||||
)})`,
|
)})`,
|
||||||
|
|||||||
Reference in New Issue
Block a user