From 0d668a14afede762f170ab825f8a615781912e46 Mon Sep 17 00:00:00 2001 From: GH05TCREW Date: Sat, 13 Dec 2025 10:11:33 -0700 Subject: [PATCH] fix(crew): prevent zombie workers and improve TUI robustness --- ghostcrew/agents/crew/models.py | 1 + ghostcrew/agents/crew/worker_pool.py | 35 +++++++++++++++++++------ ghostcrew/config/constants.py | 1 + ghostcrew/interface/tui.py | 38 +++++++++++++++++++++++++++- 4 files changed, 66 insertions(+), 9 deletions(-) diff --git a/ghostcrew/agents/crew/models.py b/ghostcrew/agents/crew/models.py index 935120a..228a31e 100644 --- a/ghostcrew/agents/crew/models.py +++ b/ghostcrew/agents/crew/models.py @@ -20,6 +20,7 @@ class AgentStatus(Enum): PENDING = "pending" RUNNING = "running" COMPLETE = "complete" + WARNING = "warning" # Completed but hit max iterations ERROR = "error" CANCELLED = "cancelled" diff --git a/ghostcrew/agents/crew/worker_pool.py b/ghostcrew/agents/crew/worker_pool.py index 40c6ac1..69b35a7 100644 --- a/ghostcrew/agents/crew/worker_pool.py +++ b/ghostcrew/agents/crew/worker_pool.py @@ -109,16 +109,20 @@ class WorkerPool: worker_runtime = LocalRuntime() await worker_runtime.start() + from ...config.constants import WORKER_MAX_ITERATIONS + agent = GhostCrewAgent( llm=self.llm, tools=self.tools, runtime=worker_runtime, # Use isolated runtime target=self.target, rag_engine=self.rag_engine, + max_iterations=WORKER_MAX_ITERATIONS, ) try: final_response = "" + hit_max_iterations = False async for response in agent.agent_loop(worker.task): # Track tool calls if response.tool_calls: @@ -136,19 +140,34 @@ class WorkerPool: # Capture final response (text without tool calls) if response.content and not response.tool_calls: final_response = response.content + + # Check if max iterations was hit + if response.metadata and response.metadata.get("max_iterations_reached"): + hit_max_iterations = True worker.result = final_response or "No findings." - worker.status = AgentStatus.COMPLETE worker.completed_at = time.time() self._results[worker.id] = worker.result - self._emit( - worker.id, - "complete", - { - "summary": worker.result[:200], - }, - ) + if hit_max_iterations: + worker.status = AgentStatus.WARNING + self._emit( + worker.id, + "warning", + { + "summary": worker.result[:200], + "reason": "Max iterations reached", + }, + ) + else: + worker.status = AgentStatus.COMPLETE + self._emit( + worker.id, + "complete", + { + "summary": worker.result[:200], + }, + ) except asyncio.CancelledError: worker.status = AgentStatus.CANCELLED diff --git a/ghostcrew/config/constants.py b/ghostcrew/config/constants.py index c777ed3..95a558f 100644 --- a/ghostcrew/config/constants.py +++ b/ghostcrew/config/constants.py @@ -57,6 +57,7 @@ DEFAULT_MAX_TOKENS = 4096 # Agent Defaults DEFAULT_MAX_ITERATIONS = int(os.environ.get("GHOSTCREW_MAX_ITERATIONS", "50")) +WORKER_MAX_ITERATIONS = int(os.environ.get("GHOSTCREW_WORKER_MAX_ITERATIONS", "10")) # File Extensions KNOWLEDGE_TEXT_EXTENSIONS = [".txt", ".md"] diff --git a/ghostcrew/interface/tui.py b/ghostcrew/interface/tui.py index 8f33649..dd3abdb 100644 --- a/ghostcrew/interface/tui.py +++ b/ghostcrew/interface/tui.py @@ -1310,7 +1310,7 @@ Be concise. Use the actual data from notes.""" wtype = worker.get("worker_type", "worker") findings = worker.get("findings", 0) - # Simple 3-state icons: working (braille), done (checkmark), error (X) + # 4-state icons: working (braille), done (checkmark), warning (!), error (X) if status in ("running", "pending"): # Animated braille spinner for all in-progress states icon = self._spinner_frames[self._spinner_frame % len(self._spinner_frames)] @@ -1318,6 +1318,9 @@ Be concise. Use the actual data from notes.""" elif status == "complete": icon = "✓" color = "#22c55e" # green + elif status == "warning": + icon = "!" + color = "#f59e0b" # amber/orange else: # error, cancelled, unknown icon = "✗" color = "#ef4444" # red @@ -1360,6 +1363,14 @@ Be concise. Use the actual data from notes.""" ) self._crew_findings_count += findings_count self._update_crew_stats() + elif event_type == "warning": + # Worker hit max iterations but has results + self._update_crew_worker(worker_id, status="warning") + reason = data.get("reason", "Partial completion") + worker = self._crew_workers.get(worker_id, {}) + wtype = worker.get("worker_type", "worker") + self._add_system(f"[!] {wtype.upper()} stopped: {reason}") + self._update_crew_stats() elif event_type == "error": self._update_crew_worker(worker_id, status="error") worker = self._crew_workers.get(worker_id, {}) @@ -1671,6 +1682,14 @@ Be concise. Use the actual data from notes.""" self._is_running = False def action_quit_app(self) -> None: + # Stop any running tasks first + if self._is_running: + self._should_stop = True + if self._current_worker and not self._current_worker.is_finished: + self._current_worker.cancel() + if self._current_crew: + # Schedule cancel but don't wait - we're exiting + asyncio.create_task(self._cancel_crew()) self.exit() def action_stop_agent(self) -> None: @@ -1682,6 +1701,10 @@ Be concise. Use the actual data from notes.""" if self._current_worker and not self._current_worker.is_finished: self._current_worker.cancel() + # Cancel crew orchestrator if running + if self._current_crew: + asyncio.create_task(self._cancel_crew()) + # Clean up agent state to prevent stale tool responses if self.agent: self.agent.cleanup_after_cancel() @@ -1690,6 +1713,19 @@ Be concise. Use the actual data from notes.""" if self.mcp_manager: asyncio.create_task(self._reconnect_mcp_after_cancel()) + async def _cancel_crew(self) -> None: + """Cancel crew orchestrator and all workers.""" + try: + if self._current_crew: + await self._current_crew.cancel() + self._current_crew = None + # Mark all running workers as cancelled in the UI + for worker_id, worker in self._crew_workers.items(): + if worker.get("status") in ("running", "pending"): + self._update_crew_worker(worker_id, status="cancelled") + except Exception: + pass # Best effort + async def _reconnect_mcp_after_cancel(self) -> None: """Reconnect MCP servers after cancellation to restore clean state.""" await asyncio.sleep(0.5) # Brief delay for cancellation to propagate