Files
moltbot/src/cli/gateway-cli/run-loop.ts
2026-04-28 18:42:49 -07:00

538 lines
21 KiB
TypeScript

import net from "node:net";
import type { startGatewayServer } from "../../gateway/server.js";
import { formatErrorMessage } from "../../infra/errors.js";
import { acquireGatewayLock } from "../../infra/gateway-lock.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import type { RuntimeEnv } from "../../runtime.js";
const gatewayLog = createSubsystemLogger("gateway");
const LAUNCHD_SUPERVISED_RESTART_EXIT_DELAY_MS = 1500;
const DEFAULT_RESTART_DRAIN_TIMEOUT_MS = 300_000;
const RESTART_DRAIN_STILL_PENDING_WARN_MS = 30_000;
const UPDATE_RESPAWN_HEALTH_TIMEOUT_MS = 10_000;
const UPDATE_RESPAWN_HEALTH_POLL_MS = 200;
type GatewayRunSignalAction = "stop" | "restart";
type RestartDrainTimeoutMs = number | undefined;
type EmbeddedRunsModule = typeof import("../../agents/pi-embedded-runner/runs.js");
type RuntimeConfigModule = typeof import("../../config/config.js");
type ProcessRespawnModule = typeof import("../../infra/process-respawn.js");
type RestartSentinelModule = typeof import("../../infra/restart-sentinel.js");
type RestartModule = typeof import("../../infra/restart.js");
type SupervisorMarkersModule = typeof import("../../infra/supervisor-markers.js");
type DiagnosticStabilityBundleModule =
typeof import("../../logging/diagnostic-stability-bundle.js");
type BundledRuntimeDepsActivityModule =
typeof import("../../plugins/bundled-runtime-deps-activity.js");
type CommandQueueModule = typeof import("../../process/command-queue.js");
type RuntimeInternalModule = typeof import("../../tasks/runtime-internal.js");
let embeddedRunsModule: Promise<EmbeddedRunsModule> | undefined;
let runtimeConfigModule: Promise<RuntimeConfigModule> | undefined;
let processRespawnModule: Promise<ProcessRespawnModule> | undefined;
let restartSentinelModule: Promise<RestartSentinelModule> | undefined;
let restartModule: Promise<RestartModule> | undefined;
let supervisorMarkersModule: Promise<SupervisorMarkersModule> | undefined;
let diagnosticStabilityBundleModule: Promise<DiagnosticStabilityBundleModule> | undefined;
let bundledRuntimeDepsActivityModule: Promise<BundledRuntimeDepsActivityModule> | undefined;
let commandQueueModule: Promise<CommandQueueModule> | undefined;
let runtimeInternalModule: Promise<RuntimeInternalModule> | undefined;
const loadEmbeddedRunsModule = () =>
(embeddedRunsModule ??= import("../../agents/pi-embedded-runner/runs.js"));
const loadRuntimeConfigModule = () => (runtimeConfigModule ??= import("../../config/config.js"));
const loadProcessRespawnModule = () =>
(processRespawnModule ??= import("../../infra/process-respawn.js"));
const loadRestartSentinelModule = () =>
(restartSentinelModule ??= import("../../infra/restart-sentinel.js"));
const loadRestartModule = () => (restartModule ??= import("../../infra/restart.js"));
const loadSupervisorMarkersModule = () =>
(supervisorMarkersModule ??= import("../../infra/supervisor-markers.js"));
const loadDiagnosticStabilityBundleModule = () =>
(diagnosticStabilityBundleModule ??= import("../../logging/diagnostic-stability-bundle.js"));
const loadBundledRuntimeDepsActivityModule = () =>
(bundledRuntimeDepsActivityModule ??= import("../../plugins/bundled-runtime-deps-activity.js"));
const loadCommandQueueModule = () =>
(commandQueueModule ??= import("../../process/command-queue.js"));
const loadRuntimeInternalModule = () =>
(runtimeInternalModule ??= import("../../tasks/runtime-internal.js"));
function createRestartIterationHook(onRestart: () => Promise<void> | void): () => Promise<boolean> {
let isFirstIteration = true;
return async () => {
if (isFirstIteration) {
isFirstIteration = false;
return false;
}
await onRestart();
return true;
};
}
async function waitForGatewayPortReady(host: string, port: number): Promise<boolean> {
return await new Promise<boolean>((resolve) => {
const socket = net.createConnection({ host, port });
let settled = false;
const finish = (value: boolean) => {
if (settled) {
return;
}
settled = true;
clearTimeout(timer);
socket.removeAllListeners();
socket.destroy();
resolve(value);
};
const timer = setTimeout(() => {
finish(false);
}, UPDATE_RESPAWN_HEALTH_POLL_MS);
socket.once("connect", () => finish(true));
socket.once("error", () => finish(false));
});
}
async function waitForHealthyGatewayChild(
port: number,
_pid?: number,
host = "127.0.0.1",
timeoutMs = UPDATE_RESPAWN_HEALTH_TIMEOUT_MS,
): Promise<boolean> {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
if (await waitForGatewayPortReady(host, port)) {
return true;
}
await new Promise<void>((resolve) => {
setTimeout(resolve, UPDATE_RESPAWN_HEALTH_POLL_MS);
});
}
return false;
}
export async function runGatewayLoop(params: {
start: (params?: {
startupStartedAt?: number;
}) => Promise<Awaited<ReturnType<typeof startGatewayServer>>>;
runtime: RuntimeEnv;
lockPort?: number;
healthHost?: string;
waitForHealthyChild?: (port: number, pid?: number, host?: string) => Promise<boolean>;
}) {
let startupStartedAt = Date.now();
let lock = await acquireGatewayLock({ port: params.lockPort });
let server: Awaited<ReturnType<typeof startGatewayServer>> | null = null;
let shuttingDown = false;
let restartResolver: (() => void) | null = null;
const waitForHealthyChild = params.waitForHealthyChild ?? waitForHealthyGatewayChild;
const cleanupSignals = () => {
process.removeListener("SIGTERM", onSigterm);
process.removeListener("SIGINT", onSigint);
process.removeListener("SIGUSR1", onSigusr1);
};
const exitProcess = (code: number) => {
cleanupSignals();
params.runtime.exit(code);
};
const writeStabilityBundle = async (reason: string, error?: unknown) => {
const { writeDiagnosticStabilityBundleForFailureSync } =
await loadDiagnosticStabilityBundleModule();
const result = writeDiagnosticStabilityBundleForFailureSync(reason, error);
if ("message" in result) {
gatewayLog.warn(result.message);
}
};
const releaseLockIfHeld = async (): Promise<boolean> => {
if (!lock) {
return false;
}
await lock.release();
lock = null;
return true;
};
const reacquireLockForInProcessRestart = async (): Promise<boolean> => {
try {
startupStartedAt = Date.now();
lock = await acquireGatewayLock({ port: params.lockPort });
return true;
} catch (err) {
gatewayLog.error(`failed to reacquire gateway lock for in-process restart: ${String(err)}`);
exitProcess(1);
return false;
}
};
const handleRestartAfterServerClose = async (restartReason?: string) => {
const hadLock = await releaseLockIfHeld();
const isUpdateRestart = restartReason === "update.run";
const { respawnGatewayProcessForUpdate, restartGatewayProcessWithFreshPid } =
await loadProcessRespawnModule();
const { detectRespawnSupervisor } = await loadSupervisorMarkersModule();
const { markUpdateRestartSentinelFailure } = await loadRestartSentinelModule();
if (isUpdateRestart) {
const respawn = respawnGatewayProcessForUpdate();
if (respawn.mode === "spawned") {
const port = params.lockPort;
const healthy =
typeof port === "number"
? await waitForHealthyChild(port, respawn.pid, params.healthHost ?? "127.0.0.1")
: false;
if (healthy) {
gatewayLog.info(
`restart mode: update process respawn (spawned pid ${respawn.pid ?? "unknown"})`,
);
exitProcess(0);
return;
}
gatewayLog.warn(
`update respawn child did not become healthy (${respawn.pid ?? "unknown"}); falling back to in-process restart`,
);
try {
respawn.child?.kill();
} catch {
// Best-effort; parent fallback keeps the gateway reachable for recovery.
}
await markUpdateRestartSentinelFailure("restart-unhealthy").catch((err) => {
gatewayLog.warn(`failed to mark update restart sentinel unhealthy: ${String(err)}`);
});
if (hadLock && !(await reacquireLockForInProcessRestart())) {
return;
}
shuttingDown = false;
restartResolver?.();
return;
}
if (respawn.mode === "supervised") {
gatewayLog.info("restart mode: update process respawn (supervisor restart)");
if (detectRespawnSupervisor(process.env, process.platform) === "launchd") {
await new Promise((resolve) => {
setTimeout(resolve, LAUNCHD_SUPERVISED_RESTART_EXIT_DELAY_MS);
});
}
exitProcess(0);
return;
}
if (respawn.mode === "failed") {
gatewayLog.warn(
`update respawn failed (${respawn.detail ?? "unknown error"}); falling back to in-process restart`,
);
await markUpdateRestartSentinelFailure("restart-unhealthy").catch((err) => {
gatewayLog.warn(`failed to mark update restart sentinel unhealthy: ${String(err)}`);
});
} else {
gatewayLog.info(
`restart mode: in-process restart (${respawn.detail ?? "OPENCLAW_NO_RESPAWN"})`,
);
}
if (hadLock && !(await reacquireLockForInProcessRestart())) {
return;
}
shuttingDown = false;
restartResolver?.();
return;
}
// Release the lock BEFORE spawning so the child can acquire it immediately.
const respawn = restartGatewayProcessWithFreshPid();
if (respawn.mode === "spawned" || respawn.mode === "supervised") {
const modeLabel =
respawn.mode === "spawned"
? `spawned pid ${respawn.pid ?? "unknown"}`
: "supervisor restart";
gatewayLog.info(`restart mode: full process restart (${modeLabel})`);
if (
respawn.mode === "supervised" &&
detectRespawnSupervisor(process.env, process.platform) === "launchd"
) {
// A short clean-exit pause keeps rapid SIGUSR1/config restarts from
// tripping launchd crash-loop throttling before KeepAlive relaunches.
await new Promise((resolve) => {
setTimeout(resolve, LAUNCHD_SUPERVISED_RESTART_EXIT_DELAY_MS);
});
}
exitProcess(0);
return;
}
if (respawn.mode === "failed") {
await writeStabilityBundle("gateway.restart_respawn_failed");
gatewayLog.warn(
`full process restart failed (${respawn.detail ?? "unknown error"}); falling back to in-process restart`,
);
} else {
gatewayLog.info(
`restart mode: in-process restart (${respawn.detail ?? "OPENCLAW_NO_RESPAWN"})`,
);
}
if (hadLock && !(await reacquireLockForInProcessRestart())) {
return;
}
shuttingDown = false;
restartResolver?.();
};
const handleStopAfterServerClose = async () => {
await releaseLockIfHeld();
exitProcess(0);
};
const SUPERVISOR_STOP_TIMEOUT_MS = 30_000;
const SHUTDOWN_TIMEOUT_MS = SUPERVISOR_STOP_TIMEOUT_MS - 5_000;
const resolveRestartDrainTimeoutMs = async (): Promise<RestartDrainTimeoutMs> => {
try {
const [{ getRuntimeConfig }, { resolveGatewayRestartDeferralTimeoutMs }] = await Promise.all([
loadRuntimeConfigModule(),
loadRestartModule(),
]);
const timeoutMs = getRuntimeConfig().gateway?.reload?.deferralTimeoutMs;
return resolveGatewayRestartDeferralTimeoutMs(timeoutMs);
} catch {
return DEFAULT_RESTART_DRAIN_TIMEOUT_MS;
}
};
const request = (action: GatewayRunSignalAction, signal: string, restartReason?: string) => {
if (shuttingDown) {
gatewayLog.info(`received ${signal} during shutdown; ignoring`);
return;
}
shuttingDown = true;
const isRestart = action === "restart";
gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`);
let forceExitTimer: ReturnType<typeof setTimeout> | null = null;
const armForceExitTimer = (forceExitMs: number) => {
if (forceExitTimer) {
return;
}
forceExitTimer = setTimeout(() => {
gatewayLog.error("shutdown timed out; exiting without full cleanup");
void (async () => {
try {
await writeStabilityBundle(
isRestart ? "gateway.restart_shutdown_timeout" : "gateway.stop_shutdown_timeout",
);
} finally {
// Keep the in-process watchdog below the supervisor stop budget so this
// path wins before launchd/systemd escalates to a hard kill. Exit
// non-zero on any timeout so supervised installs restart cleanly.
exitProcess(1);
}
})();
}, forceExitMs);
};
const clearForceExitTimer = () => {
if (!forceExitTimer) {
return;
}
clearTimeout(forceExitTimer);
forceExitTimer = null;
};
void (async () => {
const restartDrainTimeoutMs = isRestart ? await resolveRestartDrainTimeoutMs() : 0;
if (!isRestart) {
armForceExitTimer(SHUTDOWN_TIMEOUT_MS);
} else if (restartDrainTimeoutMs !== undefined) {
// Allow extra time for draining active turns on explicitly capped restarts.
armForceExitTimer(restartDrainTimeoutMs + SHUTDOWN_TIMEOUT_MS);
}
const formatRestartDrainBudget = () =>
restartDrainTimeoutMs === undefined
? "without a timeout"
: `with timeout ${restartDrainTimeoutMs}ms`;
const armCloseForceExitTimerForIndefiniteRestart = () => {
if (isRestart && restartDrainTimeoutMs === undefined) {
armForceExitTimer(SHUTDOWN_TIMEOUT_MS);
}
};
try {
// On restart, wait for in-flight agent turns to finish before
// tearing down the server so buffered messages are delivered.
if (isRestart) {
const [
{ abortEmbeddedPiRun, getActiveEmbeddedRunCount, waitForActiveEmbeddedRuns },
{ getActiveBundledRuntimeDepsInstallCount, waitForBundledRuntimeDepsInstallIdle },
{ getActiveTaskCount, markGatewayDraining, waitForActiveTasks },
] = await Promise.all([
loadEmbeddedRunsModule(),
loadBundledRuntimeDepsActivityModule(),
loadCommandQueueModule(),
]);
const createStillPendingDrainLogger = () =>
setInterval(() => {
gatewayLog.warn(
`still draining ${getActiveTaskCount()} active task(s), ${getActiveEmbeddedRunCount()} active embedded run(s), and ${getActiveBundledRuntimeDepsInstallCount()} runtime deps install(s) before restart`,
);
}, RESTART_DRAIN_STILL_PENDING_WARN_MS);
// Reject new enqueues immediately during the drain window so
// sessions get an explicit restart error instead of silent task loss.
markGatewayDraining();
const activeTasks = getActiveTaskCount();
const activeRuns = getActiveEmbeddedRunCount();
const activeRuntimeDepsInstalls = getActiveBundledRuntimeDepsInstallCount();
// Best-effort abort for compacting runs so long compaction operations
// don't hold session write locks across restart boundaries.
if (activeRuns > 0) {
abortEmbeddedPiRun(undefined, { mode: "compacting" });
}
if (activeTasks > 0 || activeRuns > 0 || activeRuntimeDepsInstalls > 0) {
gatewayLog.info(
`draining ${activeTasks} active task(s), ${activeRuns} active embedded run(s), and ${activeRuntimeDepsInstalls} runtime deps install(s) before restart ${formatRestartDrainBudget()}`,
);
const stillPendingDrainLogger = createStillPendingDrainLogger();
const [tasksDrain, runsDrain, runtimeDepsDrain] = await Promise.all([
activeTasks > 0
? waitForActiveTasks(restartDrainTimeoutMs)
: Promise.resolve({ drained: true }),
activeRuns > 0
? waitForActiveEmbeddedRuns(restartDrainTimeoutMs)
: Promise.resolve({ drained: true }),
activeRuntimeDepsInstalls > 0
? waitForBundledRuntimeDepsInstallIdle(restartDrainTimeoutMs)
: Promise.resolve({ drained: true }),
]).finally(() => clearInterval(stillPendingDrainLogger));
if (tasksDrain.drained && runsDrain.drained && runtimeDepsDrain.drained) {
gatewayLog.info("all active work drained");
} else {
gatewayLog.warn("drain timeout reached; proceeding with restart");
// Final best-effort abort to avoid carrying active runs into the
// next lifecycle when drain time budget is exhausted.
abortEmbeddedPiRun(undefined, { mode: "all" });
}
}
}
armCloseForceExitTimerForIndefiniteRestart();
await server?.close({
reason: isRestart ? "gateway restarting" : "gateway stopping",
restartExpectedMs: isRestart ? 1500 : null,
});
} catch (err) {
gatewayLog.error(`shutdown error: ${String(err)}`);
} finally {
clearForceExitTimer();
server = null;
if (isRestart) {
await handleRestartAfterServerClose(restartReason);
} else {
await handleStopAfterServerClose();
}
}
})();
};
const onSigterm = () => {
gatewayLog.info("signal SIGTERM received");
void (async () => {
const { consumeGatewayRestartIntentSync } = await loadRestartModule();
request(consumeGatewayRestartIntentSync() ? "restart" : "stop", "SIGTERM");
})();
};
const onSigint = () => {
gatewayLog.info("signal SIGINT received");
request("stop", "SIGINT");
};
const onSigusr1 = () => {
gatewayLog.info("signal SIGUSR1 received");
void (async () => {
const {
consumeGatewaySigusr1RestartAuthorization,
isGatewaySigusr1RestartExternallyAllowed,
markGatewaySigusr1RestartHandled,
peekGatewaySigusr1RestartReason,
scheduleGatewaySigusr1Restart,
} = await loadRestartModule();
const authorized = consumeGatewaySigusr1RestartAuthorization();
if (!authorized) {
if (!isGatewaySigusr1RestartExternallyAllowed()) {
gatewayLog.warn(
"SIGUSR1 restart ignored (not authorized; commands.restart=false or use gateway tool).",
);
return;
}
if (shuttingDown) {
gatewayLog.info("received SIGUSR1 during shutdown; ignoring");
return;
}
// External SIGUSR1 requests should still reuse the in-process restart
// scheduler so idle drain and restart coalescing stay consistent.
scheduleGatewaySigusr1Restart({ delayMs: 0, reason: "SIGUSR1" });
return;
}
const restartReason = peekGatewaySigusr1RestartReason();
markGatewaySigusr1RestartHandled();
request("restart", "SIGUSR1", restartReason);
})();
};
process.on("SIGTERM", onSigterm);
process.on("SIGINT", onSigint);
process.on("SIGUSR1", onSigusr1);
try {
const onIteration = createRestartIterationHook(async () => {
// After an in-process restart (SIGUSR1), reset command-queue lane state.
// Interrupted tasks from the previous lifecycle may have left `active`
// counts elevated (their finally blocks never ran), permanently blocking
// new work from draining. The same boundary also discards stale restart
// deferral timers and reloads the task registry from durable state so
// cancelled/completed work is not kept alive by old in-memory maps.
const [
{ resetAllLanes },
{ resetGatewayRestartStateForInProcessRestart },
{ reloadTaskRegistryFromStore },
] = await Promise.all([
loadCommandQueueModule(),
loadRestartModule(),
loadRuntimeInternalModule(),
]);
resetAllLanes();
resetGatewayRestartStateForInProcessRestart();
reloadTaskRegistryFromStore();
});
// Keep process alive; SIGUSR1 triggers an in-process restart (no supervisor required).
// SIGTERM/SIGINT still exit after a graceful shutdown.
let isFirstStart = true;
for (;;) {
await onIteration();
try {
server = await params.start({ startupStartedAt });
isFirstStart = false;
} catch (err) {
// On initial startup, let the error propagate so the outer handler
// can report "Gateway failed to start" and exit non-zero. Only
// swallow errors on subsequent in-process restarts to keep the
// process alive (a crash would lose macOS TCC permissions). (#35862)
if (isFirstStart) {
throw err;
}
server = null;
// Release the gateway lock so that `daemon restart/stop` (which
// discovers PIDs via the gateway port) can still manage the process.
// Without this, the process holds the lock but is not listening,
// forcing manual cleanup. (#35862)
await releaseLockIfHeld();
const errMsg = formatErrorMessage(err);
const errStack = err instanceof Error && err.stack ? `\n${err.stack}` : "";
await writeStabilityBundle("gateway.restart_startup_failed", err);
gatewayLog.error(
`gateway startup failed: ${errMsg}. ` +
`Process will stay alive; fix the issue and restart.${errStack}`,
);
}
await new Promise<void>((resolve) => {
restartResolver = resolve;
});
}
} finally {
await releaseLockIfHeld();
cleanupSignals();
}
}