diff --git a/CHANGELOG.md b/CHANGELOG.md index c110e2f612f..c7252c469cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ Docs: https://docs.openclaw.ai - Android/Nodes: harden `app.update` by requiring HTTPS and gateway-host URL matching plus SHA-256 verification, stream URL camera downloads to disk with size guards to avoid memory spikes, and stop signing release builds with debug keys. (#13541) Thanks @smartprogrammer93. - Auto-reply/Threading: auto-inject implicit reply threading so `replyToMode` works without requiring model-emitted `[[reply_to_current]]`, while preserving `replyToMode: "off"` behavior for implicit Slack replies and keeping block-streaming chunk coalescing stable under `replyToMode: "first"`. (#14976) Thanks @Diaspar4u. - Sandbox: pass configured `sandbox.docker.env` variables to sandbox containers at `docker create` time. (#15138) Thanks @stevebot-alive. +- Gateway/Restart: clear stale command-queue and heartbeat wake runtime state after SIGUSR1 in-process restarts to prevent zombie gateway behavior where queued work stops draining. (#15195) Thanks @joeykrug. - Onboarding/CLI: restore terminal state without resuming paused `stdin`, so onboarding exits cleanly after choosing Web UI and the installer returns instead of appearing stuck. - Auth/OpenAI Codex: share OAuth login handling across onboarding and `models auth login --provider openai-codex`, keep onboarding alive when OAuth fails, and surface a direct OAuth help note instead of terminating the wizard. (#15406, follow-up to #14552) Thanks @zhiluo20. - Onboarding/Providers: add vLLM as an onboarding provider with model discovery, auth profile wiring, and non-interactive auth-choice validation. (#12577) Thanks @gejifeng. 
/**
 * Execute `file` with `args` and hand back whatever it printed on stdout,
 * decoded as UTF-8. stdin is ignored and stderr is discarded.
 *
 * This is deliberately best-effort: when the child fails (non-zero exit,
 * missing binary, ...) the stdout attached to the thrown error is returned if
 * there is any, otherwise an empty string. It never throws.
 *
 * @param file executable name or path
 * @param args argument vector passed verbatim (no shell involved)
 * @returns captured stdout text, possibly empty
 */
function runFile(file, args) {
  let captured = "";
  try {
    captured = execFileSync(file, args, {
      encoding: "utf8",
      stdio: ["ignore", "pipe", "ignore"],
    });
  } catch (failure) {
    // A failed child may still have produced useful partial output.
    const partial = failure ? failure.stdout : undefined;
    if (typeof partial === "string") {
      captured = partial;
    } else if (partial && Buffer.isBuffer(partial)) {
      captured = partial.toString("utf8");
    } else {
      captured = "";
    }
  }
  return captured;
}
/**
 * Redact obvious credentials from a process command line so the diagnostic
 * JSON can be shared without leaking secrets.
 *
 * Three shapes are recognised (flag/key names are matched case-insensitively):
 *   1. `--token VALUE`, `--api-key VALUE`, `--password VALUE`, `--secret VALUE`,
 *      `--authorization VALUE` (value separated by whitespace)
 *   2. `token=VALUE`, `api_key=VALUE`, ... (also covers `--token=VALUE`)
 *   3. `Bearer VALUE` anywhere else (e.g. inside a header argument)
 *
 * In shapes 1 and 2 an optional `Bearer ` scheme word is consumed together
 * with the value; otherwise only the word "Bearer" would be redacted and the
 * actual token that follows it would survive.
 *
 * Every removed value is replaced by a visible `<redacted>` marker so the
 * argument structure of the command stays readable and the next argument is
 * not mistaken for the flag's value.
 *
 * @param cmd raw command line as reported by `ps`
 * @returns the command line with secret values replaced by `<redacted>`
 */
function sanitizeCommand(cmd) {
  const REDACTED = "<redacted>";
  // Replacer keeps the captured prefix (flag/key/scheme) and swaps the value.
  const keepPrefix = (_match, prefix) => prefix + REDACTED;
  return cmd
    .replace(
      /(--(?:token|api[-_]?key|password|secret|authorization)\s+)(?:Bearer\s+)?[^\s]+/gi,
      keepPrefix,
    )
    .replace(
      /((?:token|api[-_]?key|password|secret|authorization)=)(?:Bearer\s+)?[^\s]+/gi,
      keepPrefix,
    )
    .replace(/(Bearer\s+)[A-Za-z0-9._~+/=-]+/g, keepPrefix);
}
+ lines = runFile("ps", ["-U", username, "-o", "pid=,command="]).split("\n"); +} else if (pgrepUnavailable) { + // pgrep not installed and no username — full scan as last resort. + lines = runFile("ps", ["-axo", "pid=,command="]).split("\n"); +} else { + // pgrep ran successfully but found no matches — no orphans. + lines = []; +} + +const includePattern = /codex|claude/i; + +const excludePatterns = [ + /openclaw-gateway/i, + /signal-cli/i, + /node_modules\/\.bin\/openclaw/i, + /recover-orphaned-processes\.sh/i, +]; + +const orphaned = []; + +for (const rawLine of lines) { + const line = rawLine.trim(); + if (!line) { + continue; + } + const match = line.match(/^(\d+)\s+(.+)$/); + if (!match) { + continue; + } + + const pid = Number(match[1]); + const cmd = match[2]; + if (!Number.isInteger(pid) || pid <= 0 || pid === process.pid) { + continue; + } + if (!includePattern.test(cmd)) { + continue; + } + if (excludePatterns.some((pattern) => pattern.test(cmd))) { + continue; + } + + orphaned.push({ + pid, + cmd: sanitizeCommand(cmd), + cwd: resolveCwd(pid), + started: resolveStarted(pid), + }); +} + +process.stdout.write( + JSON.stringify({ + orphaned, + ts: new Date().toISOString(), + }) + "\n", +); +NODE diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts new file mode 100644 index 00000000000..928e02cc5e9 --- /dev/null +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -0,0 +1,119 @@ +import { describe, expect, it, vi } from "vitest"; + +const acquireGatewayLock = vi.fn(async () => ({ + release: vi.fn(async () => {}), +})); +const consumeGatewaySigusr1RestartAuthorization = vi.fn(() => true); +const isGatewaySigusr1RestartExternallyAllowed = vi.fn(() => false); +const getActiveTaskCount = vi.fn(() => 0); +const waitForActiveTasks = vi.fn(async () => ({ drained: true })); +const resetAllLanes = vi.fn(); +const DRAIN_TIMEOUT_LOG = "drain timeout reached; proceeding with restart"; +const gatewayLog = { + info: vi.fn(), + warn: vi.fn(), + 
/**
 * Test helper: detach every listener currently registered for `signal` that
 * is not a member of `existing`.
 *
 * Callers snapshot the listeners before the code under test runs and pass the
 * snapshot here afterwards, so only handlers installed in between are removed.
 *
 * @param signal   process signal whose listeners are inspected
 * @param existing listeners that were present beforehand and must be kept
 */
function removeNewSignalListeners(
  signal: NodeJS.Signals,
  existing: Set<(...args: unknown[]) => void>,
) {
  const registered = process.listeners(signal) as Array<(...args: unknown[]) => void>;
  registered
    .filter((candidate) => !existing.has(candidate))
    .forEach((added) => {
      process.removeListener(signal, added);
    });
}
unknown[]) => void>, + ); + + const loopPromise = import("./run-loop.js").then(({ runGatewayLoop }) => + runGatewayLoop({ + start, + runtime: { + exit: vi.fn(), + } as { exit: (code: number) => never }, + }), + ); + + try { + await vi.waitFor(() => { + expect(start).toHaveBeenCalledTimes(1); + }); + + process.emit("SIGUSR1"); + + await vi.waitFor(() => { + expect(start).toHaveBeenCalledTimes(2); + }); + + expect(waitForActiveTasks).toHaveBeenCalledWith(30_000); + expect(gatewayLog.warn).toHaveBeenCalledWith(DRAIN_TIMEOUT_LOG); + expect(closeFirst).toHaveBeenCalledWith({ + reason: "gateway restarting", + restartExpectedMs: 1500, + }); + expect(resetAllLanes).toHaveBeenCalledTimes(1); + + process.emit("SIGUSR1"); + + await expect(loopPromise).rejects.toThrow("stop-loop"); + expect(closeSecond).toHaveBeenCalledWith({ + reason: "gateway restarting", + restartExpectedMs: 1500, + }); + expect(resetAllLanes).toHaveBeenCalledTimes(2); + } finally { + removeNewSignalListeners("SIGTERM", beforeSigterm); + removeNewSignalListeners("SIGINT", beforeSigint); + removeNewSignalListeners("SIGUSR1", beforeSigusr1); + } + }); +}); diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 9486e199e35..ec582fdcb8d 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -6,7 +6,12 @@ import { isGatewaySigusr1RestartExternallyAllowed, } from "../../infra/restart.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; -import { getActiveTaskCount, waitForActiveTasks } from "../../process/command-queue.js"; +import { + getActiveTaskCount, + resetAllLanes, + waitForActiveTasks, +} from "../../process/command-queue.js"; +import { createRestartIterationHook } from "../../process/restart-recovery.js"; const gatewayLog = createSubsystemLogger("gateway"); @@ -111,10 +116,21 @@ export async function runGatewayLoop(params: { process.on("SIGUSR1", onSigusr1); try { + const onIteration = createRestartIterationHook(() => { + // 
After an in-process restart (SIGUSR1), reset command-queue lane state. + // Interrupted tasks from the previous lifecycle may have left `active` + // counts elevated (their finally blocks never ran), permanently blocking + // new work from draining. This must happen here — at the restart + // coordinator level — rather than inside individual subsystem init + // functions, to avoid surprising cross-cutting side effects. + resetAllLanes(); + }); + // Keep process alive; SIGUSR1 triggers an in-process restart (no supervisor required). // SIGTERM/SIGINT still exit after a graceful shutdown. // eslint-disable-next-line no-constant-condition while (true) { + onIteration(); server = await params.start(); await new Promise((resolve) => { restartResolver = resolve; diff --git a/src/infra/heartbeat-wake.test.ts b/src/infra/heartbeat-wake.test.ts index b3f8e0d32f7..63d47523023 100644 --- a/src/infra/heartbeat-wake.test.ts +++ b/src/infra/heartbeat-wake.test.ts @@ -173,6 +173,59 @@ describe("heartbeat-wake", () => { expect(handler).toHaveBeenCalledWith({ reason: "exec-event" }); }); + it("resets running/scheduled flags when new handler is registered", async () => { + vi.useFakeTimers(); + + // Simulate a handler that's mid-execution when SIGUSR1 fires. + // We do this by having the handler hang forever (never resolve). + let resolveHang: () => void; + const hangPromise = new Promise((r) => { + resolveHang = r; + }); + const handlerA = vi + .fn() + .mockReturnValue(hangPromise.then(() => ({ status: "ran" as const, durationMs: 1 }))); + setHeartbeatWakeHandler(handlerA); + + // Trigger the handler — it starts running but never finishes + requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); + await vi.advanceTimersByTimeAsync(1); + expect(handlerA).toHaveBeenCalledTimes(1); + + // Now simulate SIGUSR1: register a new handler while handlerA is still running. + // Without the fix, `running` would stay true and handlerB would never fire. 
+ const handlerB = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + setHeartbeatWakeHandler(handlerB); + + // handlerB should be able to fire (running was reset) + requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); + await vi.advanceTimersByTimeAsync(1); + expect(handlerB).toHaveBeenCalledTimes(1); + + // Clean up the hanging promise + resolveHang!(); + await Promise.resolve(); + }); + + it("clears stale retry cooldown when a new handler is registered", async () => { + vi.useFakeTimers(); + const handlerA = vi.fn().mockResolvedValue({ status: "skipped", reason: "requests-in-flight" }); + setHeartbeatWakeHandler(handlerA); + + requestHeartbeatNow({ reason: "interval", coalesceMs: 0 }); + await vi.advanceTimersByTimeAsync(1); + expect(handlerA).toHaveBeenCalledTimes(1); + + // Simulate SIGUSR1 startup with a fresh wake handler. + const handlerB = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 }); + setHeartbeatWakeHandler(handlerB); + + requestHeartbeatNow({ reason: "manual", coalesceMs: 0 }); + await vi.advanceTimersByTimeAsync(1); + expect(handlerB).toHaveBeenCalledTimes(1); + expect(handlerB).toHaveBeenCalledWith({ reason: "manual" }); + }); + it("drains pending wake once a handler is registered", async () => { vi.useFakeTimers(); diff --git a/src/infra/heartbeat-wake.ts b/src/infra/heartbeat-wake.ts index 72f97378f67..6297b5ffb68 100644 --- a/src/infra/heartbeat-wake.ts +++ b/src/infra/heartbeat-wake.ts @@ -146,6 +146,23 @@ export function setHeartbeatWakeHandler(next: HeartbeatWakeHandler | null): () = handlerGeneration += 1; const generation = handlerGeneration; handler = next; + if (next) { + // New lifecycle starting (e.g. after SIGUSR1 in-process restart). + // Clear any timer metadata from the previous lifecycle so stale retry + // cooldowns do not delay a fresh handler. 
+ if (timer) { + clearTimeout(timer); + } + timer = null; + timerDueAt = null; + timerKind = null; + // Reset module-level execution state that may be stale from interrupted + // runs in the previous lifecycle. Without this, `running === true` from + // an interrupted heartbeat blocks all future schedule() attempts, and + // `scheduled === true` can cause spurious immediate re-runs. + running = false; + scheduled = false; + } if (handler && pendingWake) { schedule(DEFAULT_COALESCE_MS, "normal"); } diff --git a/src/macos/gateway-daemon.ts b/src/macos/gateway-daemon.ts index eb02c060640..38fd5485ff0 100644 --- a/src/macos/gateway-daemon.ts +++ b/src/macos/gateway-daemon.ts @@ -52,6 +52,8 @@ async function main() { { consumeGatewaySigusr1RestartAuthorization, isGatewaySigusr1RestartExternallyAllowed }, { defaultRuntime }, { enableConsoleCapture, setConsoleTimestampPrefix }, + commandQueueMod, + { createRestartIterationHook }, ] = await Promise.all([ import("../config/config.js"), import("../gateway/server.js"), @@ -61,6 +63,8 @@ async function main() { import("../infra/restart.js"), import("../runtime.js"), import("../logging.js"), + import("../process/command-queue.js"), + import("../process/restart-recovery.js"), ] as const); enableConsoleCapture(); @@ -132,14 +136,32 @@ async function main() { `gateway: received ${signal}; ${isRestart ? "restarting" : "shutting down"}`, ); + const DRAIN_TIMEOUT_MS = 30_000; + const SHUTDOWN_TIMEOUT_MS = 5_000; + const forceExitMs = isRestart ? 
DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS : SHUTDOWN_TIMEOUT_MS; forceExitTimer = setTimeout(() => { defaultRuntime.error("gateway: shutdown timed out; exiting without full cleanup"); cleanupSignals(); process.exit(0); - }, 5000); + }, forceExitMs); void (async () => { try { + if (isRestart) { + const activeTasks = commandQueueMod.getActiveTaskCount(); + if (activeTasks > 0) { + defaultRuntime.log( + `gateway: draining ${activeTasks} active task(s) before restart (timeout ${DRAIN_TIMEOUT_MS}ms)`, + ); + const { drained } = await commandQueueMod.waitForActiveTasks(DRAIN_TIMEOUT_MS); + if (drained) { + defaultRuntime.log("gateway: all active tasks drained"); + } else { + defaultRuntime.log("gateway: drain timeout reached; proceeding with restart"); + } + } + } + await server?.close({ reason: isRestart ? "gateway restarting" : "gateway stopping", restartExpectedMs: isRestart ? 1500 : null, @@ -196,8 +218,17 @@ async function main() { } throw err; } + const onIteration = createRestartIterationHook(() => { + // After an in-process restart (SIGUSR1), reset command-queue lane state. + // Interrupted tasks from the previous lifecycle may have left `active` + // counts elevated (their finally blocks never ran), permanently blocking + // new work from draining. 
+ commandQueueMod.resetAllLanes(); + }); + // eslint-disable-next-line no-constant-condition while (true) { + onIteration(); try { server = await startGatewayServer(port, { bind }); } catch (err) { @@ -210,7 +241,7 @@ async function main() { }); } } finally { - await (lock as GatewayLockHandle | null)?.release(); + await lock?.release(); cleanupSignals(); } } diff --git a/src/process/command-queue.test.ts b/src/process/command-queue.test.ts index 60034b43929..5c0b20930af 100644 --- a/src/process/command-queue.test.ts +++ b/src/process/command-queue.test.ts @@ -23,6 +23,7 @@ import { enqueueCommandInLane, getActiveTaskCount, getQueueSize, + resetAllLanes, setCommandLaneConcurrency, waitForActiveTasks, } from "./command-queue.js"; @@ -36,6 +37,12 @@ describe("command queue", () => { diagnosticMocks.diag.error.mockClear(); }); + it("resetAllLanes is safe when no lanes have been created", () => { + expect(getActiveTaskCount()).toBe(0); + expect(() => resetAllLanes()).not.toThrow(); + expect(getActiveTaskCount()).toBe(0); + }); + it("runs tasks one at a time in order", async () => { let active = 0; let maxActive = 0; @@ -162,6 +169,49 @@ describe("command queue", () => { await task; }); + it("resetAllLanes drains queued work immediately after reset", async () => { + const lane = `reset-test-${Date.now()}-${Math.random().toString(16).slice(2)}`; + setCommandLaneConcurrency(lane, 1); + + let resolve1!: () => void; + const blocker = new Promise((r) => { + resolve1 = r; + }); + + // Start a task that blocks the lane + const task1 = enqueueCommandInLane(lane, async () => { + await blocker; + }); + + await vi.waitFor(() => { + expect(getActiveTaskCount()).toBeGreaterThanOrEqual(1); + }); + + // Enqueue another task — it should be stuck behind the blocker + let task2Ran = false; + const task2 = enqueueCommandInLane(lane, async () => { + task2Ran = true; + }); + + await vi.waitFor(() => { + expect(getQueueSize(lane)).toBeGreaterThanOrEqual(2); + }); + 
/**
 * Record that a task finished, unless it belongs to an older lane generation.
 *
 * A task remembers the lane generation it was started under. If the lane's
 * generation has changed since then, the completion is ignored and the lane's
 * active-task set is left untouched.
 *
 * @param state          lane whose bookkeeping should be updated
 * @param taskId         id of the task that just settled
 * @param taskGeneration lane generation captured when the task was started
 * @returns `true` when the completion was applied to the current generation,
 *          `false` when it was stale and ignored
 */
function completeTask(state: LaneState, taskId: number, taskGeneration: number): boolean {
  const isCurrentGeneration = taskGeneration === state.generation;
  if (isCurrentGeneration) {
    state.activeTaskIds.delete(taskId);
  }
  return isCurrentGeneration;
}
/**
 * Total amount of work known to the command queue across every lane:
 * entries still waiting in each lane's queue plus tasks currently tracked as
 * active.
 *
 * @returns sum over all lanes of `queue.length + activeTaskIds.size`
 */
export function getTotalQueueSize() {
  return Array.from(lanes.values()).reduce(
    (sum, laneState) => sum + laneState.queue.length + laneState.activeTaskIds.size,
    0,
  );
}
/**
 * Put every lane back into an idle state. Intended for in-process restarts
 * (SIGUSR1), where tasks from the previous lifecycle may never report
 * completion and would otherwise keep their lane looking busy.
 *
 * For each lane this bumps `generation` (so late completions from old tasks
 * are recognised as stale), empties `activeTaskIds`, and clears the
 * `draining` flag. Queued entries are left in place on purpose: they are
 * pending work that should still run.
 *
 * Once all lanes have been reset, every lane that still has queued entries
 * is drained right away instead of waiting for a later enqueue to kick it.
 */
export function resetAllLanes(): void {
  // Pass 1: bring every lane to a clean state before any draining starts.
  lanes.forEach((laneState) => {
    laneState.generation += 1;
    laneState.activeTaskIds.clear();
    laneState.draining = false;
  });

  // Pass 2: snapshot the lanes with preserved work, then pump each of them.
  const lanesWithBacklog = Array.from(lanes.values())
    .filter((laneState) => laneState.queue.length > 0)
    .map((laneState) => laneState.lane);
  lanesWithBacklog.forEach((laneName) => {
    drainLane(laneName);
  });
}
/**
 * Build a per-iteration callback for an in-process restart loop.
 *
 * The returned function is meant to be called once at the top of every loop
 * iteration. Its first invocation is treated as the initial startup: nothing
 * happens and it returns `false`. Every later invocation is treated as a
 * restart: `onRestart` is invoked and the function returns `true`.
 *
 * @param onRestart recovery work to perform on each restart iteration
 * @returns hook reporting whether the current iteration was a restart
 */
export function createRestartIterationHook(onRestart: () => void): () => boolean {
  let startupSeen = false;
  return () => {
    const isRestart = startupSeen;
    startupSeen = true;
    if (isRestart) {
      onRestart();
    }
    return isRestart;
  };
}