fix(crew): prevent zombie workers and improve TUI robustness

This commit is contained in:
GH05TCREW
2025-12-13 10:11:33 -07:00
parent 1990c0fcb5
commit 0d668a14af
4 changed files with 66 additions and 9 deletions

View File

@@ -20,6 +20,7 @@ class AgentStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETE = "complete"
WARNING = "warning" # Completed but hit max iterations
ERROR = "error"
CANCELLED = "cancelled"

View File

@@ -109,16 +109,20 @@ class WorkerPool:
worker_runtime = LocalRuntime()
await worker_runtime.start()
from ...config.constants import WORKER_MAX_ITERATIONS
agent = GhostCrewAgent(
llm=self.llm,
tools=self.tools,
runtime=worker_runtime, # Use isolated runtime
target=self.target,
rag_engine=self.rag_engine,
max_iterations=WORKER_MAX_ITERATIONS,
)
try:
final_response = ""
hit_max_iterations = False
async for response in agent.agent_loop(worker.task):
# Track tool calls
if response.tool_calls:
@@ -136,19 +140,34 @@ class WorkerPool:
# Capture final response (text without tool calls)
if response.content and not response.tool_calls:
final_response = response.content
# Check if max iterations was hit
if response.metadata and response.metadata.get("max_iterations_reached"):
hit_max_iterations = True
worker.result = final_response or "No findings."
worker.status = AgentStatus.COMPLETE
worker.completed_at = time.time()
self._results[worker.id] = worker.result
self._emit(
worker.id,
"complete",
{
"summary": worker.result[:200],
},
)
if hit_max_iterations:
worker.status = AgentStatus.WARNING
self._emit(
worker.id,
"warning",
{
"summary": worker.result[:200],
"reason": "Max iterations reached",
},
)
else:
worker.status = AgentStatus.COMPLETE
self._emit(
worker.id,
"complete",
{
"summary": worker.result[:200],
},
)
except asyncio.CancelledError:
worker.status = AgentStatus.CANCELLED

View File

@@ -57,6 +57,7 @@ DEFAULT_MAX_TOKENS = 4096
# Agent iteration limits.
# Each value can be overridden through the named environment variable; the
# value must parse as an integer, otherwise int() raises ValueError at import.
DEFAULT_MAX_ITERATIONS = int(os.getenv("GHOSTCREW_MAX_ITERATIONS", "50"))
WORKER_MAX_ITERATIONS = int(os.getenv("GHOSTCREW_WORKER_MAX_ITERATIONS", "10"))
# File Extensions
KNOWLEDGE_TEXT_EXTENSIONS = [".txt", ".md"]

View File

@@ -1310,7 +1310,7 @@ Be concise. Use the actual data from notes."""
wtype = worker.get("worker_type", "worker")
findings = worker.get("findings", 0)
# Simple 3-state icons: working (braille), done (checkmark), error (X)
# 4-state icons: working (braille), done (checkmark), warning (!), error (X)
if status in ("running", "pending"):
# Animated braille spinner for all in-progress states
icon = self._spinner_frames[self._spinner_frame % len(self._spinner_frames)]
@@ -1318,6 +1318,9 @@ Be concise. Use the actual data from notes."""
elif status == "complete":
icon = ""
color = "#22c55e" # green
elif status == "warning":
icon = "!"
color = "#f59e0b" # amber/orange
else: # error, cancelled, unknown
icon = ""
color = "#ef4444" # red
@@ -1360,6 +1363,14 @@ Be concise. Use the actual data from notes."""
)
self._crew_findings_count += findings_count
self._update_crew_stats()
elif event_type == "warning":
# Worker hit max iterations but has results
self._update_crew_worker(worker_id, status="warning")
reason = data.get("reason", "Partial completion")
worker = self._crew_workers.get(worker_id, {})
wtype = worker.get("worker_type", "worker")
self._add_system(f"[!] {wtype.upper()} stopped: {reason}")
self._update_crew_stats()
elif event_type == "error":
self._update_crew_worker(worker_id, status="error")
worker = self._crew_workers.get(worker_id, {})
@@ -1671,6 +1682,14 @@ Be concise. Use the actual data from notes."""
self._is_running = False
def action_quit_app(self) -> None:
    """Stop in-flight work (if any) and exit the application.

    When a run is active this sets the stop flag, cancels the current
    worker if it has not finished, and schedules ``_cancel_crew()`` for an
    active crew without waiting for it. ``self.exit()`` is always called,
    even if stopping the running work raises; in that case the exception
    still propagates to the caller after ``exit()`` has been requested.
    """
    try:
        if self._is_running:
            self._should_stop = True

            current = self._current_worker
            if current and not current.is_finished:
                current.cancel()

            if self._current_crew:
                # Schedule the cancel but don't wait for it - we're exiting.
                cancel_task = asyncio.create_task(self._cancel_crew())
                # The event loop only keeps weak references to tasks, so hold
                # a strong one until the task finishes; otherwise it may be
                # garbage-collected before the crew is actually cancelled.
                pending = getattr(self, "_background_tasks", None)
                if pending is None:
                    pending = self._background_tasks = set()
                pending.add(cancel_task)
                cancel_task.add_done_callback(pending.discard)
    finally:
        # Quitting must not be blocked by a failure while stopping work.
        self.exit()
def action_stop_agent(self) -> None:
@@ -1682,6 +1701,10 @@ Be concise. Use the actual data from notes."""
if self._current_worker and not self._current_worker.is_finished:
self._current_worker.cancel()
# Cancel crew orchestrator if running
if self._current_crew:
asyncio.create_task(self._cancel_crew())
# Clean up agent state to prevent stale tool responses
if self.agent:
self.agent.cleanup_after_cancel()
@@ -1690,6 +1713,19 @@ Be concise. Use the actual data from notes."""
if self.mcp_manager:
asyncio.create_task(self._reconnect_mcp_after_cancel())
async def _cancel_crew(self) -> None:
    """Cancel the crew orchestrator and mark unfinished workers as cancelled.

    Best effort: a failure inside the orchestrator's ``cancel()`` is logged
    at debug level and never propagated. Regardless of whether ``cancel()``
    succeeded, the crew handle is cleared and every worker whose UI status
    is still "running" or "pending" is switched to "cancelled", so the UI
    does not keep showing workers as active after a stop/quit request.
    Does nothing when there is no current crew.
    """
    import logging

    log = logging.getLogger(__name__)

    crew = self._current_crew
    if not crew:
        return

    try:
        await crew.cancel()
    except Exception:
        # Best effort - debug level so nothing is printed over the UI.
        log.debug("Crew cancellation failed", exc_info=True)
    finally:
        # Only clear the handle if it still refers to the crew we cancelled;
        # a new crew may have been started while we were awaiting.
        if self._current_crew is crew:
            self._current_crew = None

    # Mark all running workers as cancelled in the UI. Iterate a snapshot in
    # case the updater touches the underlying dict.
    for worker_id, worker in list(self._crew_workers.items()):
        if worker.get("status") not in ("running", "pending"):
            continue
        try:
            self._update_crew_worker(worker_id, status="cancelled")
        except Exception:
            log.debug(
                "Could not mark worker %s as cancelled", worker_id, exc_info=True
            )
async def _reconnect_mcp_after_cancel(self) -> None:
"""Reconnect MCP servers after cancellation to restore clean state."""
await asyncio.sleep(0.5) # Brief delay for cancellation to propagate