diff --git a/ghostcrew/agents/crew/worker_pool.py b/ghostcrew/agents/crew/worker_pool.py deleted file mode 100644 index 059b624..0000000 --- a/ghostcrew/agents/crew/worker_pool.py +++ /dev/null @@ -1,336 +0,0 @@ -"""Worker pool for managing concurrent agent execution.""" - -import asyncio -import time -from typing import TYPE_CHECKING, Any, Dict, List, Optional - -from .models import AgentStatus, AgentWorker, WorkerCallback - -if TYPE_CHECKING: - from ...llm import LLM - from ...runtime import Runtime - from ...tools import Tool - - -class WorkerPool: - """Manages concurrent execution of worker agents.""" - - def __init__( - self, - llm: "LLM", - tools: List["Tool"], - runtime: "Runtime", - target: str = "", - rag_engine: Any = None, - on_worker_event: Optional[WorkerCallback] = None, - ): - self.llm = llm - self.tools = tools - self.runtime = runtime - self.target = target - self.rag_engine = rag_engine - self.on_worker_event = on_worker_event - - self._workers: Dict[str, AgentWorker] = {} - self._tasks: Dict[str, asyncio.Task] = {} - self._results: Dict[str, str] = {} - self._next_id = 0 - self._lock = asyncio.Lock() - - def _emit(self, worker_id: str, event: str, data: Dict[str, Any]) -> None: - """Emit event to callback if registered.""" - if self.on_worker_event: - self.on_worker_event(worker_id, event, data) - - def _generate_id(self) -> str: - """Generate unique worker ID.""" - worker_id = f"agent-{self._next_id}" - self._next_id += 1 - return worker_id - - async def spawn( - self, - task: str, - priority: int = 1, - depends_on: Optional[List[str]] = None, - ) -> str: - """ - Spawn a new worker agent. 
- - Args: - task: The task description for the agent - priority: Higher priority runs first (for future use) - depends_on: List of agent IDs that must complete first - - Returns: - The worker ID - """ - async with self._lock: - worker_id = self._generate_id() - - worker = AgentWorker( - id=worker_id, - task=task, - priority=priority, - depends_on=depends_on or [], - ) - self._workers[worker_id] = worker - - # Emit spawn event for UI - self._emit( - worker_id, - "spawn", - { - "worker_type": worker_id, - "task": task, - }, - ) - - # Start the agent task - self._tasks[worker_id] = asyncio.create_task(self._run_worker(worker)) - - return worker_id - - async def _run_worker(self, worker: AgentWorker) -> None: - """Run a single worker agent.""" - from ..pa_agent import PentestAgentAgent - - # Wait for dependencies - if worker.depends_on: - await self._wait_for_dependencies(worker.depends_on) - - worker.status = AgentStatus.RUNNING - worker.started_at = time.time() - self._emit(worker.id, "status", {"status": "running"}) - - # Create isolated runtime for this worker (prevents browser state conflicts) - from ...runtime.runtime import LocalRuntime - - worker_runtime = LocalRuntime() - await worker_runtime.start() - - from ...config.constants import WORKER_MAX_ITERATIONS - - agent = PentestAgentAgent( - llm=self.llm, - tools=self.tools, - runtime=worker_runtime, # Use isolated runtime - target=self.target, - rag_engine=self.rag_engine, - max_iterations=WORKER_MAX_ITERATIONS, - ) - - try: - final_response = "" - hit_max_iterations = False - is_infeasible = False - - async for response in agent.agent_loop(worker.task): - # Track tool calls - if response.tool_calls: - for tc in response.tool_calls: - if tc.name not in worker.tools_used: - worker.tools_used.append(tc.name) - self._emit(worker.id, "tool", {"tool": tc.name}) - - # Track tokens (avoid double counting) - if response.usage: - total = response.usage.get("total_tokens", 0) - is_intermediate = 
response.metadata.get("intermediate", False) - has_tools = bool(response.tool_calls) - - # Same logic as CLI to avoid double counting - should_count = False - if is_intermediate: - should_count = True - worker.last_msg_intermediate = True - elif has_tools: - if not getattr(worker, "last_msg_intermediate", False): - should_count = True - worker.last_msg_intermediate = False - else: - should_count = True - worker.last_msg_intermediate = False - - if should_count and total > 0: - self._emit(worker.id, "tokens", {"tokens": total}) - - # Capture final response (text without tool calls) - if response.content and not response.tool_calls: - final_response = response.content - - # Check metadata flags - if response.metadata: - if response.metadata.get("max_iterations_reached"): - hit_max_iterations = True - if response.metadata.get("replan_impossible"): - is_infeasible = True - - # Prioritize structured results from the plan over chatty summaries - plan_summary = "" - plan = getattr(worker_runtime, "plan", None) - if plan and plan.steps: - from ...tools.finish import StepStatus - - # Include ALL steps regardless of status - skips and failures are valuable context - # Note: PlanStep stores failure/skip reasons in the 'result' field - steps_with_info = [s for s in plan.steps if s.result] - if steps_with_info: - summary_lines = [] - for s in steps_with_info: - status_marker = { - StepStatus.COMPLETE: "✓", - StepStatus.SKIP: "⊘", - StepStatus.FAIL: "✗", - }.get(s.status, "·") - info = s.result or "No details" - summary_lines.append(f"{status_marker} {s.description}: {info}") - plan_summary = "\n".join(summary_lines) - - # Use plan summary if available, otherwise fallback to chat response - worker.result = plan_summary or final_response or "No findings." 
- - worker.completed_at = time.time() - self._results[worker.id] = worker.result - - if is_infeasible: - worker.status = AgentStatus.FAILED - self._emit( - worker.id, - "failed", - { - "summary": worker.result[:200], - "reason": "Task determined infeasible", - }, - ) - elif hit_max_iterations: - worker.status = AgentStatus.WARNING - self._emit( - worker.id, - "warning", - { - "summary": worker.result[:200], - "reason": "Max iterations reached", - }, - ) - else: - worker.status = AgentStatus.COMPLETE - self._emit( - worker.id, - "complete", - { - "summary": worker.result[:200], - }, - ) - - except asyncio.CancelledError: - worker.status = AgentStatus.CANCELLED - worker.completed_at = time.time() - self._emit(worker.id, "cancelled", {}) - raise - - except Exception as e: - worker.error = str(e) - worker.status = AgentStatus.ERROR - worker.completed_at = time.time() - self._emit(worker.id, "error", {"error": str(e)}) - - finally: - # Cleanup worker's isolated runtime - try: - await worker_runtime.stop() - except Exception: - pass # Best effort cleanup - - async def _wait_for_dependencies(self, depends_on: List[str]) -> None: - """Wait for dependent workers to complete.""" - for dep_id in depends_on: - if dep_id in self._tasks: - try: - await self._tasks[dep_id] - except (asyncio.CancelledError, Exception): - pass # Dependency failed, but we continue - - async def wait_for(self, agent_ids: Optional[List[str]] = None) -> Dict[str, Any]: - """ - Wait for specified agents (or all) to complete. - - Args: - agent_ids: List of agent IDs to wait for. None = wait for all. 
- - Returns: - Dict mapping agent_id to result/error - """ - if agent_ids is None: - agent_ids = list(self._tasks.keys()) - - results = {} - for agent_id in agent_ids: - if agent_id in self._tasks: - try: - await self._tasks[agent_id] - except (asyncio.CancelledError, Exception): - pass - - worker = self._workers.get(agent_id) - if worker: - results[agent_id] = { - "task": worker.task, - "status": worker.status.value, - "result": worker.result, - "error": worker.error, - "tools_used": worker.tools_used, - } - - return results - - def get_status(self, agent_id: str) -> Optional[Dict[str, Any]]: - """Get status of a specific agent.""" - worker = self._workers.get(agent_id) - if not worker: - return None - return worker.to_dict() - - def get_all_status(self) -> Dict[str, Dict[str, Any]]: - """Get status of all agents.""" - return {wid: w.to_dict() for wid, w in self._workers.items()} - - async def cancel(self, agent_id: str) -> bool: - """Cancel a running agent.""" - if agent_id not in self._tasks: - return False - - task = self._tasks[agent_id] - if not task.done(): - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - return True - return False - - async def cancel_all(self) -> None: - """Cancel all running agents.""" - for task in self._tasks.values(): - if not task.done(): - task.cancel() - - # Wait for all to finish - if self._tasks: - await asyncio.gather(*self._tasks.values(), return_exceptions=True) - - def get_results(self) -> Dict[str, str]: - """Get results from all completed agents.""" - return dict(self._results) - - def get_workers(self) -> List[AgentWorker]: - """Get all workers.""" - return list(self._workers.values()) - - def reset(self) -> None: - """Reset the pool for a new task.""" - self._workers.clear() - self._tasks.clear() - self._results.clear() - self._next_id = 0 diff --git a/ghostcrew/interface/cli.py b/ghostcrew/interface/cli.py deleted file mode 100644 index 29f9929..0000000 --- a/ghostcrew/interface/cli.py +++ 
/dev/null @@ -1,686 +0,0 @@ -"""Non-interactive CLI mode for PentestAgent.""" - -import asyncio -import time -from datetime import datetime -from pathlib import Path - -from rich.console import Console -from rich.markdown import Markdown -from rich.panel import Panel -from rich.text import Text - -console = Console() - -# PA theme colors (matching TUI) -PA_PRIMARY = "#d4d4d4" # light gray - primary text -PA_SECONDARY = "#9a9a9a" # medium gray - secondary text -PA_DIM = "#6b6b6b" # dim gray - muted text -PA_BORDER = "#3a3a3a" # dark gray - borders -PA_ACCENT = "#7a7a7a" # accent gray - - -async def run_cli( - target: str, - model: str, - task: str = None, - report: str = None, - max_loops: int = 50, - use_docker: bool = False, - mode: str = "agent", -): - """ - Run PentestAgent in non-interactive mode. - - Args: - target: Target to test - model: LLM model to use - task: Optional task description - report: Report path ("auto" for loot/reports/_.md) - max_loops: Max agent loops before stopping - use_docker: Run tools in Docker container - mode: Execution mode ("agent" or "crew") - """ - from ..agents.pa_agent import PentestAgentAgent - from ..knowledge import RAGEngine - from ..llm import LLM - from ..runtime.docker_runtime import DockerRuntime - from ..runtime.runtime import LocalRuntime - from ..tools import get_all_tools - - # Startup panel - start_text = Text() - start_text.append("PENTESTAGENT", style=f"bold {PA_PRIMARY}") - start_text.append(" - Non-interactive Mode\n\n", style=PA_DIM) - start_text.append("Target: ", style=PA_SECONDARY) - start_text.append(f"{target}\n", style=PA_PRIMARY) - start_text.append("Model: ", style=PA_SECONDARY) - start_text.append(f"{model}\n", style=PA_PRIMARY) - start_text.append("Mode: ", style=PA_SECONDARY) - start_text.append(f"{mode.title()}\n", style=PA_PRIMARY) - start_text.append("Runtime: ", style=PA_SECONDARY) - start_text.append(f"{'Docker' if use_docker else 'Local'}\n", style=PA_PRIMARY) - start_text.append("Max loops: 
", style=PA_SECONDARY) - start_text.append(f"{max_loops}\n", style=PA_PRIMARY) - - task_msg = task or f"Perform a penetration test on {target}" - start_text.append("Task: ", style=PA_SECONDARY) - start_text.append(task_msg, style=PA_PRIMARY) - - console.print() - console.print( - Panel( - start_text, title=f"[{PA_SECONDARY}]Starting", border_style=PA_BORDER - ) - ) - console.print() - - # Initialize RAG if knowledge exists - rag = None - knowledge_path = Path("knowledge") - if knowledge_path.exists(): - try: - rag = RAGEngine(knowledge_path=knowledge_path) - rag.index() - except Exception: - pass - - # Initialize MCP if config exists (silently skip failures) - mcp_manager = None - mcp_count = 0 - try: - from ..mcp import MCPManager - from ..tools import register_tool_instance - - mcp_manager = MCPManager() - if mcp_manager.config_path.exists(): - mcp_tools = await mcp_manager.connect_all() - for tool in mcp_tools: - register_tool_instance(tool) - mcp_count = len(mcp_tools) - if mcp_count > 0: - console.print(f"[{PA_DIM}]Loaded {mcp_count} MCP tools[/]") - except Exception: - pass # MCP is optional, continue without it - - # Initialize runtime - Docker or Local - if use_docker: - console.print(f"[{PA_DIM}]Starting Docker container...[/]") - runtime = DockerRuntime(mcp_manager=mcp_manager) - else: - runtime = LocalRuntime(mcp_manager=mcp_manager) - await runtime.start() - - llm = LLM(model=model, rag_engine=rag) - tools = get_all_tools() - - # Stats tracking - start_time = time.time() - tool_count = 0 - iteration = 0 - findings_count = 0 # Count of notes/findings recorded - findings = [] # Store actual findings text - total_tokens = 0 # Track total token usage - messages = [] # Store agent messages - tool_log = [] # Log of tools executed (ts, name, command, result, exit_code) - last_content = "" - last_msg_intermediate = False # Track if previous message was intermediate (to avoid double counting tokens) - stopped_reason = None - - def print_status(msg: str, style: 
str = PA_DIM): - elapsed = int(time.time() - start_time) - mins, secs = divmod(elapsed, 60) - timestamp = f"[{mins:02d}:{secs:02d}]" - console.print(f"[{PA_DIM}]{timestamp}[/] [{style}]{msg}[/]") - - def display_message(content: str, title: str) -> bool: - """Display a message panel if it hasn't been shown yet.""" - nonlocal last_content - if content and content != last_content: - console.print() - console.print( - Panel( - Markdown(content), - title=f"[{PA_PRIMARY}]{title}", - border_style=PA_BORDER, - ) - ) - console.print() - last_content = content - return True - return False - - def generate_report() -> str: - """Generate markdown report.""" - elapsed = int(time.time() - start_time) - mins, secs = divmod(elapsed, 60) - - status_text = "Complete" - if stopped_reason: - status_text = f"Interrupted ({stopped_reason})" - - lines = [ - "# PentestAgent Penetration Test Report", - "", - "## Executive Summary", - "", - ] - - # Add AI summary at top if available - # If the last finding is a full report (Crew mode), use it as the main body - # and avoid adding duplicate headers - main_content = "" - if findings: - main_content = findings[-1] - # If it's a full report (starts with #), don't add our own headers if possible - if not main_content.strip().startswith("#"): - lines.append(main_content) - lines.append("") - else: - # It's a full report, so we might want to replace the default header - # or just append it. Let's append it but skip the "Executive Summary" header above if we could. - # For now, just append it. 
- lines.append(main_content) - lines.append("") - else: - lines.append("*Assessment incomplete - no analysis generated.*") - lines.append("") - - # Engagement details table - lines.extend( - [ - "## Engagement Details", - "", - "| Field | Value |", - "|-------|-------|", - f"| **Target** | `{target}` |", - f"| **Task** | {task_msg} |", - f"| **Date** | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} |", - f"| **Duration** | {mins}m {secs}s |", - f"| **Commands Executed** | {tool_count} |", - f"| **Status** | {status_text} |", - "", - "---", - "", - "## Commands Executed", - "", - ] - ) - - # Detailed command log - for i, entry in enumerate(tool_log, 1): - ts = entry.get("ts", "??:??") - name = entry.get("name", "unknown") - command = entry.get("command", "") - result = entry.get("result", "") - exit_code = entry.get("exit_code") - - lines.append(f"### {i}. {name} `[{ts}]`") - lines.append("") - - if command: - lines.append("**Command:**") - lines.append("```") - lines.append(command) - lines.append("```") - lines.append("") - - if exit_code is not None: - lines.append(f"**Exit Code:** `{exit_code}`") - lines.append("") - - if result: - lines.append("**Output:**") - lines.append("```") - # Limit output to 2000 chars per command for report size - if len(result) > 2000: - lines.append(result[:2000]) - lines.append(f"\n... 
(truncated, {len(result)} total chars)") - else: - lines.append(result) - lines.append("```") - lines.append("") - - # Findings section - # Only show if there are other findings besides the final report we already showed - other_findings = findings[:-1] if findings and len(findings) > 1 else [] - - if other_findings: - lines.extend( - [ - "---", - "", - "## Detailed Findings", - "", - ] - ) - - for i, finding in enumerate(other_findings, 1): - if len(other_findings) > 1: - lines.append(f"### Finding {i}") - lines.append("") - lines.append(finding) - lines.append("") - - # Footer - lines.extend( - [ - "---", - "", - f"*Report generated by PentestAgent on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*", - ] - ) - - return "\n".join(lines) - - def save_report(): - """Save report to file.""" - if not report: - return - - # Determine path - if report == "auto": - reports_dir = Path("loot/reports") - reports_dir.mkdir(parents=True, exist_ok=True) - safe_target = target.replace("://", "_").replace("/", "_").replace(":", "_") - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - report_path = reports_dir / f"{safe_target}_{timestamp}.md" - else: - report_path = Path(report) - report_path.parent.mkdir(parents=True, exist_ok=True) - - content = generate_report() - report_path.write_text(content, encoding="utf-8") - console.print(f"[{PA_SECONDARY}]Report saved: {report_path}[/]") - - async def generate_summary(): - """Ask the LLM to summarize findings when stopped early.""" - if not tool_log: - return None - - print_status("Generating summary...", PA_SECONDARY) - - # Build context from tool results (use full results, not truncated) - context_lines = ["Summarize the penetration test findings so far:\n"] - context_lines.append(f"Target: {target}") - context_lines.append(f"Tools executed: {tool_count}\n") - - for entry in tool_log[-10:]: # Last 10 tools - name = entry.get("name", "unknown") - command = entry.get("command", "") - result = entry.get("result", "")[:500] # Limit 
for context window - context_lines.append(f"- **{name}**: `{command}`") - if result: - context_lines.append(f" Output: {result}") - - context_lines.append( - "\nProvide a brief summary of what was discovered and any security concerns found." - ) - - try: - response = await llm.generate( - system_prompt="You are a penetration testing assistant. Summarize the findings concisely.", - messages=[{"role": "user", "content": "\n".join(context_lines)}], - tools=[], - ) - return response.content - except Exception: - return None - - async def print_summary(interrupted: bool = False): - nonlocal messages - - # Generate summary if we don't have messages yet - if not messages and tool_log: - summary = await generate_summary() - if summary: - messages.append(summary) - - elapsed = int(time.time() - start_time) - mins, secs = divmod(elapsed, 60) - - title = "Interrupted" if interrupted else "Finished" - status = "PARTIAL RESULTS" if interrupted else "COMPLETE" - if stopped_reason: - status = f"STOPPED ({stopped_reason})" - - final_text = Text() - final_text.append(f"{status}\n\n", style=f"bold {PA_PRIMARY}") - final_text.append("Duration: ", style=PA_DIM) - final_text.append(f"{mins}m {secs}s\n", style=PA_SECONDARY) - final_text.append("Loops: ", style=PA_DIM) - final_text.append(f"{iteration}/{max_loops}\n", style=PA_SECONDARY) - final_text.append("Tools: ", style=PA_DIM) - final_text.append(f"{tool_count}\n", style=PA_SECONDARY) - - if total_tokens > 0: - final_text.append("Tokens: ", style=PA_DIM) - final_text.append(f"{total_tokens:,}\n", style=PA_SECONDARY) - - if findings_count > 0: - final_text.append("Findings: ", style=PA_DIM) - final_text.append(f"{findings_count}", style=PA_SECONDARY) - - console.print() - console.print( - Panel( - final_text, - title=f"[{PA_SECONDARY}]{title}", - border_style=PA_BORDER, - ) - ) - - # Show summary/messages only if it's new content (not just displayed) - if messages: - display_message(messages[-1], "Summary") - - # Save report - 
save_report() - - print_status("Initializing...") - - try: - if mode == "crew": - from ..agents.crew import CrewOrchestrator - - def on_worker_event(worker_id: str, event_type: str, data: dict): - nonlocal tool_count, findings_count, total_tokens - - if event_type == "spawn": - task = data.get("task", "") - print_status(f"Spawned worker {worker_id}: {task}", PA_ACCENT) - - elif event_type == "tool": - tool_name = data.get("tool", "unknown") - tool_count += 1 - print_status( - f"Worker {worker_id} using tool: {tool_name}", PA_DIM - ) - - # Log tool usage (limited info available from event) - elapsed = int(time.time() - start_time) - mins, secs = divmod(elapsed, 60) - ts = f"{mins:02d}:{secs:02d}" - - tool_log.append( - { - "ts": ts, - "name": tool_name, - "command": f"(Worker {worker_id})", - "result": "", - "exit_code": None, - } - ) - - elif event_type == "tokens": - tokens = data.get("tokens", 0) - total_tokens += tokens - - elif event_type == "complete": - f_count = data.get("findings_count", 0) - findings_count += f_count - print_status( - f"Worker {worker_id} complete ({f_count} findings)", "green" - ) - - elif event_type == "failed": - reason = data.get("reason", "unknown") - print_status(f"Worker {worker_id} failed: {reason}", "red") - - elif event_type == "status": - status = data.get("status", "") - print_status(f"Worker {worker_id} status: {status}", PA_DIM) - - elif event_type == "warning": - reason = data.get("reason", "unknown") - print_status(f"Worker {worker_id} warning: {reason}", "yellow") - - elif event_type == "error": - error = data.get("error", "unknown") - print_status(f"Worker {worker_id} error: {error}", "red") - - elif event_type == "cancelled": - print_status(f"Worker {worker_id} cancelled", "yellow") - - crew = CrewOrchestrator( - llm=llm, - tools=tools, - runtime=runtime, - on_worker_event=on_worker_event, - rag_engine=rag, - target=target, - ) - - async for update in crew.run(task_msg): - iteration += 1 - phase = update.get("phase", "") 
- - if phase == "starting": - print_status("Crew orchestrator starting...", PA_PRIMARY) - - elif phase == "thinking": - content = update.get("content", "") - if content: - display_message(content, "PentestAgent Plan") - - elif phase == "tool_call": - tool = update.get("tool", "") - args = update.get("args", {}) - print_status(f"Orchestrator calling: {tool}", PA_ACCENT) - - elif phase == "complete": - report_content = update.get("report", "") - if report_content: - messages.append(report_content) - findings.append( - report_content - ) # Add to findings so it appears in the saved report - display_message(report_content, "Crew Report") - - elif phase == "error": - error = update.get("error", "Unknown error") - print_status(f"Crew error: {error}", "red") - - if iteration >= max_loops: - stopped_reason = "max loops reached" - raise StopIteration() - - else: - # Default Agent Mode - agent = PentestAgentAgent( - llm=llm, - tools=tools, - runtime=runtime, - target=target, - rag_engine=rag, - ) - - async for response in agent.agent_loop(task_msg): - iteration += 1 - - # Track token usage - if response.usage: - usage = response.usage.get("total_tokens", 0) - is_intermediate = response.metadata.get("intermediate", False) - has_tools = bool(response.tool_calls) - - # Logic to avoid double counting: - # 1. Intermediate messages (thinking) always count - # 2. 
Tool messages count ONLY if not preceded by intermediate message - if is_intermediate: - total_tokens += usage - last_msg_intermediate = True - elif has_tools: - if not last_msg_intermediate: - total_tokens += usage - last_msg_intermediate = False - else: - # Other messages (like plan) - total_tokens += usage - last_msg_intermediate = False - - # Show tool calls and results as they happen - if response.tool_calls: - for i, call in enumerate(response.tool_calls): - tool_count += 1 - name = getattr(call, "name", None) or getattr( - call.function, "name", "tool" - ) - - # Track findings (notes tool) - if name == "notes": - findings_count += 1 - try: - args = getattr(call, "arguments", None) or getattr( - call.function, "arguments", "{}" - ) - if isinstance(args, str): - import json - - args = json.loads(args) - if isinstance(args, dict): - note_content = ( - args.get("value", "") - or args.get("content", "") - or args.get("note", "") - ) - if note_content: - findings.append(note_content) - except Exception: - pass - - elapsed = int(time.time() - start_time) - mins, secs = divmod(elapsed, 60) - ts = f"{mins:02d}:{secs:02d}" - - # Get result if available - if response.tool_results and i < len(response.tool_results): - tr = response.tool_results[i] - result_text = tr.result or tr.error or "" - if result_text: - # Truncate for display - preview = result_text[:200].replace("\n", " ") - if len(result_text) > 200: - preview += "..." 
- - # Parse args for command extraction - command_text = "" - exit_code = None - try: - args = getattr(call, "arguments", None) or getattr( - call.function, "arguments", "{}" - ) - if isinstance(args, str): - import json - - args = json.loads(args) - if isinstance(args, dict): - command_text = args.get("command", "") - except Exception: - pass - - # Extract exit code from result - if response.tool_results and i < len(response.tool_results): - tr = response.tool_results[i] - full_result = tr.result or tr.error or "" - # Try to parse exit code - if "Exit Code:" in full_result: - try: - import re - - match = re.search( - r"Exit Code:\s*(\d+)", full_result - ) - if match: - exit_code = int(match.group(1)) - except Exception: - pass - else: - full_result = "" - - # Store full data for report (not truncated) - tool_log.append( - { - "ts": ts, - "name": name, - "command": command_text, - "result": full_result, - "exit_code": exit_code, - } - ) - - # Metasploit-style output with better spacing - console.print() # Blank line before each tool - print_status(f"$ {name} ({tool_count})", PA_ACCENT) - - # Show command/args on separate indented line (truncated for display) - if command_text: - display_cmd = command_text[:80] - if len(command_text) > 80: - display_cmd += "..." - console.print(f" [{PA_DIM}]{display_cmd}[/]") - - # Show result on separate line with status indicator - if response.tool_results and i < len(response.tool_results): - tr = response.tool_results[i] - if tr.error: - console.print( - f" [{PA_DIM}][!] 
{tr.error[:100]}[/]" - ) - elif tr.result: - # Show exit code or brief result - result_line = tr.result[:100].replace("\n", " ") - if exit_code == 0 or "success" in result_line.lower(): - console.print(f" [{PA_DIM}][+] OK[/]") - elif exit_code is not None and exit_code != 0: - console.print( - f" [{PA_DIM}][-] Exit {exit_code}[/]" - ) - else: - console.print( - f" [{PA_DIM}][*] {result_line[:60]}...[/]" - ) - - # Print assistant content immediately (analysis/findings) - if response.content: - if display_message(response.content, "PentestAgent"): - messages.append(response.content) - - # Check max loops limit - if iteration >= max_loops: - stopped_reason = "max loops reached" - console.print() - print_status(f"Max loops limit reached ({max_loops})", "yellow") - raise StopIteration() - - # In agent mode, ensure the final message is treated as the main finding (Executive Summary) - if mode != "crew" and messages: - findings.append(messages[-1]) - - await print_summary(interrupted=False) - - except StopIteration: - await print_summary(interrupted=True) - except (KeyboardInterrupt, asyncio.CancelledError): - stopped_reason = "user interrupt" - await print_summary(interrupted=True) - except Exception as e: - console.print(f"\n[red]Error: {e}[/]") - stopped_reason = f"error: {e}" - await print_summary(interrupted=True) - - finally: - # Cleanup MCP connections first - if mcp_manager: - try: - await mcp_manager.disconnect_all() - await asyncio.sleep(0.1) # Allow transports to close cleanly - except Exception: - pass - - # Then stop runtime - if runtime: - try: - await runtime.stop() - except Exception: - pass diff --git a/ghostcrew/interface/tui.py b/ghostcrew/interface/tui.py deleted file mode 100644 index f9897b1..0000000 --- a/ghostcrew/interface/tui.py +++ /dev/null @@ -1,1804 +0,0 @@ -""" -PentestAgent TUI - Terminal User Interface -""" - -import asyncio -import textwrap -from datetime import datetime -from pathlib import Path -from typing import TYPE_CHECKING, Any, 
Dict, List, Optional - -from rich.text import Text -from textual import on, work -from textual.app import App, ComposeResult -from textual.binding import Binding -from textual.containers import ( - Center, - Container, - Horizontal, - ScrollableContainer, - Vertical, -) -from textual.reactive import reactive -from textual.screen import ModalScreen -from textual.scrollbar import ScrollBar, ScrollBarRender -from textual.timer import Timer -from textual.widgets import Button, Input, Static, Tree -from textual.widgets.tree import TreeNode - -from ..config.constants import DEFAULT_MODEL - - -# ASCII-safe scrollbar renderer to avoid Unicode glyph issues -class ASCIIScrollBarRender(ScrollBarRender): - """Scrollbar renderer using ASCII-safe characters.""" - - BLANK_GLYPH = " " - VERTICAL_BARS = [" ", " ", " ", " ", " ", " ", " ", " "] - HORIZONTAL_BARS = [" ", " ", " ", " ", " ", " ", " ", " "] - - -# Apply ASCII scrollbar globally -ScrollBar.renderer = ASCIIScrollBarRender - - -# Custom Tree with ASCII-safe icons for PowerShell compatibility -class CrewTree(Tree): - """Tree widget with ASCII-compatible expand/collapse icons.""" - - ICON_NODE = "> " - ICON_NODE_EXPANDED = "v " - - -if TYPE_CHECKING: - from ..agents.pa_agent import PentestAgentAgent - - -def wrap_text_lines(text: str, width: int = 80) -> List[str]: - """ - Wrap text content preserving line breaks and wrapping long lines. 
- - Args: - text: The text to wrap - width: Maximum width per line (default 80 for safe terminal fit) - - Returns: - List of wrapped lines - """ - result = [] - for line in text.split("\n"): - if len(line) <= width: - result.append(line) - else: - # Wrap long lines - wrapped = textwrap.wrap( - line, width=width, break_long_words=False, break_on_hyphens=False - ) - result.extend(wrapped if wrapped else [""]) - return result - - -# ----- Help Screen ----- - - -class HelpScreen(ModalScreen): - """Help modal""" - - BINDINGS = [ - Binding("escape", "dismiss", "Close"), - Binding("q", "dismiss", "Close"), - ] - - CSS = """ - HelpScreen { - align: center middle; - scrollbar-background: #1a1a1a; - scrollbar-background-hover: #1a1a1a; - scrollbar-background-active: #1a1a1a; - scrollbar-color: #3a3a3a; - scrollbar-color-hover: #3a3a3a; - scrollbar-color-active: #3a3a3a; - scrollbar-corner-color: #1a1a1a; - scrollbar-size: 1 1; - } - - #help-container { - width: 60; - height: 23; - background: #121212; - border: solid #3a3a3a; - padding: 1 2; - } - - #help-title { - text-align: center; - text-style: bold; - color: #d4d4d4; - margin-bottom: 1; - } - - #help-content { - color: #9a9a9a; - } - - #help-close { - margin-top: 1; - width: auto; - min-width: 10; - background: #1a1a1a; - color: #9a9a9a; - border: none; - } - - #help-close:hover { - background: #262626; - } - - #help-close:focus { - background: #262626; - text-style: none; - } - """ - - def compose(self) -> ComposeResult: - yield Container( - Static("PentestAgent Help", id="help-title"), - Static(self._get_help_text(), id="help-content"), - Center(Button("Close", id="help-close")), - id="help-container", - ) - - def _get_help_text(self) -> str: - return """[bold]Modes:[/] Assist | Agent | Crew -[bold]Keys:[/] Enter=Send Ctrl+C=Stop Ctrl+Q=Quit F1=Help - -[bold]Commands:[/] - /agent - Run in agent mode - /crew - Run multi-agent crew mode - /target - Set target - /prompt - Show system prompt - /memory - Show memory stats 
- /notes - Show saved notes - /report - Generate report - /help - Show help - /clear - Clear chat - /tools - List tools - /quit - Exit""" - - def action_dismiss(self) -> None: - self.app.pop_screen() - - @on(Button.Pressed, "#help-close") - def close_help(self) -> None: - self.app.pop_screen() - - -# ----- Main Chat Message Widgets ----- - - -class ThinkingMessage(Static): - """Thinking/reasoning message""" - - def __init__(self, content: str, **kwargs): - super().__init__(**kwargs) - self.thinking_content = content - - def render(self) -> Text: - text = Text() - text.append("| ", style="#3a3a3a") - text.append("* ", style="#9a9a9a") - text.append("Thinking\n", style="bold #9a9a9a") - - # Wrap content - use 70 chars to account for sidebar + prefix - for line in wrap_text_lines(self.thinking_content, width=70): - text.append("| ", style="#3a3a3a") - text.append(f"{line}\n", style="#6b6b6b italic") - - return text - - -class ToolMessage(Static): - """Tool execution message""" - - # Standard tool icon and color (pa theme) - TOOL_ICON = "$" - TOOL_COLOR = "#9a9a9a" # spirit gray - - def __init__(self, tool_name: str, args: str = "", **kwargs): - super().__init__(**kwargs) - self.tool_name = tool_name - self.tool_args = args - - def render(self) -> Text: - text = Text() - text.append("| ", style="#3a3a3a") - text.append(f"{self.TOOL_ICON} ", style=self.TOOL_COLOR) - text.append(f"{self.tool_name}", style=self.TOOL_COLOR) - text.append("\n", style="") - - # Wrap args and show each line with vertical bar - if self.tool_args: - for line in wrap_text_lines(self.tool_args, width=100): - text.append("| ", style="#3a3a3a") - text.append(f"{line}\n", style="#6b6b6b") - - return text - - -class ToolResultMessage(Static): - """Tool result/output message""" - - RESULT_ICON = "#" - RESULT_COLOR = "#7a7a7a" - - def __init__(self, tool_name: str, result: str = "", **kwargs): - super().__init__(**kwargs) - self.tool_name = tool_name - self.result = result - - def render(self) -> Text: 
class AssistantMessage(Static):
    """Chat-log widget for a PentestAgent (assistant) response."""

    def __init__(self, content: str, **kwargs):
        super().__init__(**kwargs)
        self.message_content = content

    def render(self) -> Text:
        out = Text()
        out.append("| ", style="#525252")
        out.append(">> ", style="#9a9a9a")
        out.append("PentestAgent\n", style="bold #d4d4d4")

        # Wrap content - use 70 chars to account for sidebar + prefix.
        for row in wrap_text_lines(self.message_content, width=70):
            out.append("| ", style="#525252")
            out.append(f"{row}\n", style="#d4d4d4")

        return out


class UserMessage(Static):
    """Chat-log widget for a message typed by the user (rendered unwrapped)."""

    def __init__(self, content: str, **kwargs):
        super().__init__(**kwargs)
        self.message_content = content

    def render(self) -> Text:
        out = Text()
        # Header row, then the raw message on a single line.
        for chunk, style in (
            ("| ", "#6b6b6b"),  # phantom border
            ("> ", "#9a9a9a"),
            ("You\n", "bold #d4d4d4"),  # specter
            ("| ", "#6b6b6b"),  # phantom border
            (f"{self.message_content}\n", "#d4d4d4"),  # specter
        ):
            out.append(chunk, style=style)
        return out


class SystemMessage(Static):
    """Chat-log widget for subtle system/status text (no gutter bar)."""

    def __init__(self, content: str, **kwargs):
        super().__init__(**kwargs)
        self.message_content = content

    def render(self) -> Text:
        out = Text()
        # split("\n") (not splitlines) preserves a trailing blank row.
        for row in self.message_content.split("\n"):
            out.append(f"  {row}\n", style="#6b6b6b")  # phantom - subtle system text
        return out


# ----- Status Bar -----


class StatusBar(Static):
    """One-line animated status indicator with a mode prefix.

    ``status`` and ``mode`` are Textual reactives, so assigning them
    triggers a re-render automatically.
    """

    status = reactive("idle")
    mode = reactive("assist")  # "assist" or "agent"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._frame = 0  # current frame of the 4-step dot animation
        self._timer: Optional[Timer] = None

    def on_mount(self) -> None:
        # Drive the dot animation at 5 fps.
        self._timer = self.set_interval(0.2, self._tick)

    def _tick(self) -> None:
        # Advance the animation; repaint only while something is in progress.
        self._frame = (self._frame + 1) % 4
        if self.status not in ["idle", "complete"]:
            self.refresh()

    def render(self) -> Text:
        dots = "." * (self._frame + 1)
        # Pad dots to 4 chars so the label width stays fixed (no jumping).
        dots_padded = dots.ljust(4)

        # PA theme status colors (muted, ethereal)
        status_map = {
            "idle": ("Ready", "#6b6b6b"),
            "initializing": (f"Initializing{dots_padded}", "#9a9a9a"),
            "thinking": (f"Thinking{dots_padded}", "#9a9a9a"),
            "running": (f"Running{dots_padded}", "#9a9a9a"),
            "processing": (f"Processing{dots_padded}", "#9a9a9a"),
            "waiting": ("Waiting for input", "#9a9a9a"),
            "complete": ("Complete", "#4a9f6e"),
            "error": ("Error", "#9f4a4a"),
        }
        label, color = status_map.get(self.status, (self.status, "#6b6b6b"))

        out = Text()

        # Mode prefix (ASCII-safe symbols); anything non-crew/agent is Assist.
        mode_prefix = {"crew": " :: Crew ", "agent": " >> Agent "}
        out.append(mode_prefix.get(self.mode, " >> Assist "), style="#9a9a9a")

        out.append(f"| {label}", style=color)

        # Offer the stop hint only for interruptible, in-flight states.
        if self.status not in ["idle", "initializing", "complete", "error"]:
            out.append(" ESC to stop", style="#525252")

        return out
(normal text) - # Specter: #d4d4d4 (primary text) - # Ectoplasm: #f0f0f0 (highlights) - # ═══════════════════════════════════════════════════════════ - - CSS = """ - Screen { - background: #0a0a0a; - } - - #main-container { - width: 100%; - height: 100%; - layout: horizontal; - } - - /* Chat area - takes full width normally, fills remaining space with sidebar */ - #chat-area { - width: 1fr; - height: 100%; - } - - #chat-area.with-sidebar { - width: 1fr; - } - - #chat-scroll { - width: 100%; - height: 1fr; - background: transparent; - padding: 1 2; - scrollbar-background: #1a1a1a; - scrollbar-background-hover: #1a1a1a; - scrollbar-background-active: #1a1a1a; - scrollbar-color: #3a3a3a; - scrollbar-color-hover: #3a3a3a; - scrollbar-color-active: #3a3a3a; - scrollbar-corner-color: #1a1a1a; - scrollbar-size: 1 1; - } - - #input-container { - width: 100%; - height: 3; - background: transparent; - border: round #262626; - margin: 0 2; - padding: 0; - layout: horizontal; - align-vertical: middle; - } - - #input-container:focus-within { - border: round #525252; - } - - #input-container:focus-within #chat-prompt { - color: #d4d4d4; - } - - #chat-prompt { - width: auto; - height: 100%; - padding: 0 0 0 1; - color: #6b6b6b; - content-align-vertical: middle; - } - - #chat-input { - width: 1fr; - height: 100%; - background: transparent; - border: none; - padding: 0; - margin: 0; - color: #d4d4d4; - } - - #chat-input:focus { - border: none; - } - - #chat-input > .input--placeholder { - color: #6b6b6b; - text-style: italic; - } - - #status-bar { - width: 100%; - height: 1; - background: transparent; - padding: 0 3; - margin: 0; - } - - .message { - margin-bottom: 1; - } - - /* Sidebar - hidden by default */ - #sidebar { - width: 28; - height: 100%; - display: none; - padding-right: 1; - } - - #sidebar.visible { - display: block; - } - - #workers-tree { - height: 1fr; - background: transparent; - border: round #262626; - padding: 0 1; - margin-bottom: 0; - } - - 
#workers-tree:focus { - border: round #3a3a3a; - } - - #crew-stats { - height: auto; - max-height: 10; - background: transparent; - border: round #262626; - border-title-color: #9a9a9a; - border-title-style: bold; - padding: 0 1; - margin-top: 0; - } - - Tree { - background: transparent; - color: #d4d4d4; - scrollbar-background: #1a1a1a; - scrollbar-background-hover: #1a1a1a; - scrollbar-background-active: #1a1a1a; - scrollbar-color: #3a3a3a; - scrollbar-color-hover: #3a3a3a; - scrollbar-color-active: #3a3a3a; - scrollbar-size: 1 1; - } - - Tree > .tree--cursor { - background: transparent; - } - - Tree > .tree--highlight { - background: transparent; - } - - Tree > .tree--highlight-line { - background: transparent; - } - - .tree--node-label { - padding: 0 1; - } - - .tree--node:hover .tree--node-label { - background: transparent; - } - - .tree--node.-selected .tree--node-label { - background: transparent; - color: #d4d4d4; - } - """ - - BINDINGS = [ - Binding("ctrl+q", "quit_app", "Quit", priority=True), - Binding("ctrl+c", "stop_agent", "Stop", priority=True, show=False), - Binding("escape", "stop_agent", "Stop", priority=True), - Binding("f1", "show_help", "Help"), - Binding("tab", "focus_next", "Next", show=False), - ] - - TITLE = "PentestAgent" - SUB_TITLE = "AI Penetration Testing" - - def __init__( - self, - target: Optional[str] = None, - model: str = None, - use_docker: bool = False, - **kwargs, - ): - super().__init__(**kwargs) - self.target = target - self.model = model or DEFAULT_MODEL - self.use_docker = use_docker - - # Agent components - self.agent: Optional["PentestAgentAgent"] = None - self.runtime = None - self.mcp_manager = None - self.all_tools = [] - self.rag_engine = None # RAG engine - - # State - self._mode = "assist" # "assist", "agent", or "crew" - self._is_running = False - self._is_initializing = True # Block input during init - self._should_stop = False - self._current_worker = None # Track running worker for cancellation - 
self._current_crew = None # Track crew orchestrator for cancellation - - # Crew mode state - self._crew_workers: Dict[str, Dict[str, Any]] = {} - self._crew_worker_nodes: Dict[str, TreeNode] = {} - self._crew_orchestrator_node: Optional[TreeNode] = None - self._crew_findings_count = 0 - self._viewing_worker_id: Optional[str] = None - self._worker_events: Dict[str, List[Dict]] = {} - self._crew_start_time: Optional[float] = None - self._crew_tokens_used: int = 0 - self._crew_stats_timer: Optional[Timer] = None - self._spinner_timer: Optional[Timer] = None - self._spinner_frame: int = 0 - self._spinner_frames = [ - "⠋", - "⠙", - "⠹", - "⠸", - "⠼", - "⠴", - "⠦", - "⠧", - "⠇", - "⠏", - ] # Braille dots spinner - - def compose(self) -> ComposeResult: - with Horizontal(id="main-container"): - # Chat area (left side) - with Vertical(id="chat-area"): - yield ScrollableContainer(id="chat-scroll") - yield StatusBar(id="status-bar") - with Horizontal(id="input-container"): - yield Static("> ", id="chat-prompt") - yield Input(placeholder="Enter task or type /help", id="chat-input") - - # Sidebar (right side, hidden by default) - with Vertical(id="sidebar"): - yield CrewTree("CREW", id="workers-tree") - yield Static("", id="crew-stats") - - async def on_mount(self) -> None: - """Initialize on mount""" - self._initialize_agent() - - @work(thread=False) - async def _initialize_agent(self) -> None: - """Initialize agent""" - self._set_status("initializing") - - try: - import os - - from ..agents.pa_agent import PentestAgentAgent - from ..knowledge import RAGEngine - from ..llm import LLM, ModelConfig - from ..mcp import MCPManager - from ..runtime.docker_runtime import DockerRuntime - from ..runtime.runtime import LocalRuntime - from ..tools import get_all_tools, register_tool_instance - - # RAG Engine - auto-load knowledge sources - rag_doc_count = 0 - knowledge_path = None - - # Check local knowledge dir first (must have files, not just exist) - local_knowledge = 
Path("knowledge") - bundled_path = Path(__file__).parent.parent / "knowledge" / "sources" - - if local_knowledge.exists() and any(local_knowledge.rglob("*.*")): - knowledge_path = local_knowledge - elif bundled_path.exists(): - knowledge_path = bundled_path - - if knowledge_path: - try: - # Determine embedding method: env var > auto-detect - embeddings_setting = os.getenv("PENTESTAGENT_EMBEDDINGS", "").lower() - if embeddings_setting == "local": - use_local = True - elif embeddings_setting == "openai": - use_local = False - else: - # Auto: use OpenAI if key available, else local - use_local = not os.getenv("OPENAI_API_KEY") - - self.rag_engine = RAGEngine( - knowledge_path=knowledge_path, use_local_embeddings=use_local - ) - await asyncio.to_thread(self.rag_engine.index) - rag_doc_count = self.rag_engine.get_document_count() - except Exception as e: - self._add_system(f"[!] RAG: {e}") - self.rag_engine = None - - # MCP - auto-load if config exists - mcp_server_count = 0 - try: - self.mcp_manager = MCPManager() - if self.mcp_manager.config_path.exists(): - mcp_tools = await self.mcp_manager.connect_all() - for tool in mcp_tools: - register_tool_instance(tool) - mcp_server_count = len(self.mcp_manager.servers) - except Exception as e: - self._add_system(f"[!] 
MCP: {e}") - - # Runtime - Docker or Local - if self.use_docker: - self._add_system("+ Starting Docker container...") - self.runtime = DockerRuntime(mcp_manager=self.mcp_manager) - else: - self.runtime = LocalRuntime(mcp_manager=self.mcp_manager) - await self.runtime.start() - - # LLM - llm = LLM( - model=self.model, - config=ModelConfig(temperature=0.7), - rag_engine=self.rag_engine, - ) - - # Tools - self.all_tools = get_all_tools() - - # Agent - self.agent = PentestAgentAgent( - llm=llm, - tools=self.all_tools, - runtime=self.runtime, - target=self.target, - rag_engine=self.rag_engine, - ) - - self._set_status("idle", "assist") - self._is_initializing = False # Allow input now - - # Show ready message - tools_str = ", ".join(t.name for t in self.all_tools[:5]) - if len(self.all_tools) > 5: - tools_str += f", +{len(self.all_tools) - 5} more" - - runtime_str = "Docker" if self.use_docker else "Local" - self._add_system( - f"+ PentestAgent ready\n" - f" Model: {self.model} | Tools: {len(self.all_tools)} | MCP: {mcp_server_count} | RAG: {rag_doc_count}\n" - f" Runtime: {runtime_str} | Mode: Assist (use /agent or /crew for autonomous modes)" - ) - - # Show target if provided (but don't auto-start) - if self.target: - self._add_system(f" Target: {self.target}") - - except Exception as e: - import traceback - - self._add_system(f"[!] 
Init failed: {e}\n{traceback.format_exc()}") - self._set_status("error") - self._is_initializing = False # Allow input even on error - - def _set_status(self, status: str, mode: Optional[str] = None) -> None: - """Update status bar""" - try: - bar = self.query_one("#status-bar", StatusBar) - bar.status = status - if mode: - bar.mode = mode - self._mode = mode - except Exception: - pass - - def _add_message(self, widget: Static) -> None: - """Add a message widget to chat""" - try: - scroll = self.query_one("#chat-scroll", ScrollableContainer) - widget.add_class("message") - scroll.mount(widget) - scroll.scroll_end(animate=False) - except Exception: - pass - - def _add_system(self, content: str) -> None: - self._add_message(SystemMessage(content)) - - def _add_user(self, content: str) -> None: - self._add_message(UserMessage(content)) - - def _add_assistant(self, content: str) -> None: - self._add_message(AssistantMessage(content)) - - def _add_thinking(self, content: str) -> None: - self._add_message(ThinkingMessage(content)) - - def _add_tool(self, name: str, action: str = "") -> None: - self._add_message(ToolMessage(name, action)) - - def _add_tool_result(self, name: str, result: str) -> None: - """Display tool execution result""" - # Hide tool output - LLM will synthesize it in its response - # This prevents duplication and keeps the chat clean - pass - - def _show_system_prompt(self) -> None: - """Display the current system prompt""" - if self.agent: - prompt = self.agent.get_system_prompt() - self._add_system(f"=== System Prompt ===\n{prompt}") - else: - self._add_system("Agent not initialized") - - def _show_memory_stats(self) -> None: - """Display memory usage statistics""" - if self.agent and self.agent.llm: - stats = self.agent.llm.get_memory_stats() - messages_count = len(self.agent.conversation_history) - - # Format messages for token counting - llm_messages = self.agent._format_messages_for_llm() - current_tokens = 
self.agent.llm.memory.get_total_tokens(llm_messages) - - info = ( - f"=== Memory Stats ===\n" - f"Messages: {messages_count}\n" - f"Current tokens: {current_tokens:,}\n" - f"Token budget: {stats['token_budget']:,}\n" - f"Summarize at: {stats['summarize_threshold']:,} tokens\n" - f"Recent to keep: {stats['recent_to_keep']} messages\n" - f"Has summary: {stats['has_summary']}\n" - f"Summarized: {stats['summarized_message_count']} messages" - ) - self._add_system(info) - else: - self._add_system("Agent not initialized") - - async def _show_notes(self) -> None: - """Display saved notes""" - from ..tools.notes import get_all_notes - - notes = await get_all_notes() - if not notes: - self._add_system( - "=== Notes ===\nNo notes saved.\n\nThe AI can save key findings using the notes tool." - ) - return - - lines = [f"=== Notes ({len(notes)} entries) ==="] - for key, value in notes.items(): - # Show full value, indent multi-line content - if "\n" in value: - indented = value.replace("\n", "\n ") - lines.append(f"\n[{key}]\n {indented}") - else: - lines.append(f"[{key}] {value}") - lines.append("\nFile: loot/notes.json") - lines.append("Reports: loot/reports/") - - self._add_system("\n".join(lines)) - - def _build_prior_context(self) -> str: - """Build a summary of prior findings for crew mode. - - Extracts: - - Tool results (nmap scans, etc.) 
- the actual findings - - Assistant analyses - interpretations and summaries - - Last user task - what they were working on - - Excludes: - - Raw user messages (noise) - - Tool call declarations (just names/args, not results) - - Very short responses - """ - if not self.agent or not self.agent.conversation_history: - return "" - - findings = [] - last_user_task = "" - - for msg in self.agent.conversation_history: - # Track user tasks/questions - if msg.role == "user" and msg.content: - last_user_task = msg.content[:200] - - # Extract tool results (the actual findings) - elif msg.tool_results: - for result in msg.tool_results: - if result.success and result.result: - content = ( - result.result[:1500] - if len(result.result) > 1500 - else result.result - ) - findings.append(f"[{result.tool_name}]\n{content}") - - # Include assistant analyses (but not tool call messages) - elif msg.role == "assistant" and msg.content and not msg.tool_calls: - if len(msg.content) > 50: - findings.append(f"[Analysis]\n{msg.content[:1000]}") - - if not findings and not last_user_task: - return "" - - # Build context with last user task + recent findings - parts = [] - if last_user_task: - parts.append(f"Last task: {last_user_task}") - if findings: - parts.append("Findings:\n" + "\n\n".join(findings[-5:])) - - context = "\n\n".join(parts) - if len(context) > 4000: - context = context[:4000] + "\n... 
(truncated)" - - return context - - def _set_target(self, cmd: str) -> None: - """Set the target for the engagement""" - # Remove /target prefix - target = cmd[7:].strip() - - if not target: - if self.target: - self._add_system( - f"Current target: {self.target}\nUsage: /target " - ) - else: - self._add_system( - "No target set.\nUsage: /target \nExample: /target 192.168.1.1" - ) - return - - self.target = target - - # Update agent's target if agent exists - if self.agent: - self.agent.target = target - - self._add_system(f"@ Target set: {target}") - - @work(exclusive=True) - async def _run_report_generation(self) -> None: - """Generate a pentest report from notes and conversation""" - from pathlib import Path - - from ..tools.notes import get_all_notes - - if not self.agent or not self.agent.llm: - self._add_system("[!] Agent not initialized") - return - - notes = await get_all_notes() - if not notes: - self._add_system( - "No notes found. PentestAgent saves findings using the notes tool during testing." - ) - return - - self._add_system("Generating report...") - - # Format notes - notes_text = "\n".join(f"### {k}\n{v}\n" for k, v in notes.items()) - - # Build conversation summary from full history - conversation_summary = "" - if self.agent.conversation_history: - # Summarize key actions from conversation - actions = [] - for msg in self.agent.conversation_history: - if msg.role == "assistant" and msg.tool_calls: - for tc in msg.tool_calls: - actions.append(f"- Tool: {tc.name}") - elif msg.role == "tool_result" and msg.tool_results: - for tr in msg.tool_results: - # Include truncated result - result = tr.result or "" - output = result[:200] + "..." if len(result) > 200 else result - actions.append(f" Result: {output}") - if actions: - conversation_summary = "\n".join(actions[-30:]) # Last 30 actions - - report_prompt = f"""Generate a penetration test report in Markdown from the notes below. 
- -# Notes -{notes_text} - -# Activity Log -{conversation_summary if conversation_summary else "N/A"} - -# Target -{self.target or "Not specified"} - -Output a report with: -1. Executive Summary (2-3 sentences) -2. Findings (use notes, include severity: Critical/High/Medium/Low/Info) -3. Recommendations - -Be concise. Use the actual data from notes.""" - - try: - report_content = await self.agent.llm.simple_completion( - prompt=report_prompt, - system="You are a penetration tester writing a security report. Be concise and factual.", - ) - - if not report_content or not report_content.strip(): - self._add_system( - "[!] Report generation returned empty. Check LLM connection." - ) - return - - # Save to loot/reports/ - reports_dir = Path("loot/reports") - reports_dir.mkdir(parents=True, exist_ok=True) - - # Append Shadow Graph if available - try: - from ..knowledge.graph import ShadowGraph - from ..tools.notes import get_all_notes_sync - - # Rehydrate graph from notes - graph = ShadowGraph() - notes = get_all_notes_sync() - if notes: - graph.update_from_notes(notes) - mermaid_code = graph.to_mermaid() - - if mermaid_code: - report_content += ( - "\n\n## Attack Graph (Visual)\n\n```mermaid\n" - + mermaid_code - + "\n```\n" - ) - except Exception as e: - self._add_system(f"[!] Graph generation error: {e}") - - timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S") - report_path = reports_dir / f"report_{timestamp}.md" - report_path.write_text(report_content, encoding="utf-8") - - self._add_system(f"+ Report saved: {report_path}") - - except Exception as e: - self._add_system(f"[!] 
Report error: {e}") - - @on(Input.Submitted, "#chat-input") - async def handle_submit(self, event: Input.Submitted) -> None: - """Handle input submission""" - # Block input while initializing or AI is processing - if self._is_initializing or self._is_running: - return - - message = event.value.strip() - if not message: - return - - event.input.value = "" - - # Commands - if message.startswith("/"): - await self._handle_command(message) - return - - self._add_user(message) - - # Hide crew sidebar when entering assist mode - self._hide_sidebar() - - # Use assist mode by default - if self.agent and not self._is_running: - self._current_worker = self._run_assist(message) - - async def _handle_command(self, cmd: str) -> None: - """Handle slash commands""" - cmd_lower = cmd.lower().strip() - cmd_original = cmd.strip() - - if cmd_lower in ["/help", "/h", "/?"]: - await self.push_screen(HelpScreen()) - elif cmd_lower == "/clear": - scroll = self.query_one("#chat-scroll", ScrollableContainer) - await scroll.remove_children() - self._hide_sidebar() - # Clear agent conversation history for fresh start - if self.agent: - self.agent.conversation_history.clear() - self._add_system("Chat cleared") - elif cmd_lower == "/tools": - names = [t.name for t in self.all_tools] - self._add_system(f"Tools ({len(names)}): " + ", ".join(names)) - elif cmd_lower in ["/quit", "/exit", "/q"]: - self.exit() - elif cmd_lower == "/prompt": - self._show_system_prompt() - elif cmd_lower == "/memory": - self._show_memory_stats() - elif cmd_lower == "/notes": - await self._show_notes() - elif cmd_lower == "/report": - self._run_report_generation() - elif cmd_original.startswith("/target"): - self._set_target(cmd_original) - elif cmd_original.startswith("/agent"): - await self._parse_agent_command(cmd_original) - elif cmd_original.startswith("/crew"): - await self._parse_crew_command(cmd_original) - else: - self._add_system(f"Unknown command: {cmd}\nType /help for commands.") - - async def 
_parse_agent_command(self, cmd: str) -> None: - """Parse and execute /agent command""" - - # Remove /agent prefix - rest = cmd[6:].strip() - - if not rest: - self._add_system( - "Usage: /agent \n" - "Example: /agent scan 192.168.1.1\n" - " /agent enumerate SSH on target" - ) - return - - task = rest - - if not task: - self._add_system("Error: No task provided. Usage: /agent ") - return - - self._add_user(f"/agent {task}") - self._add_system(">> Agent Mode") - - # Hide crew sidebar when entering agent mode - self._hide_sidebar() - - if self.agent and not self._is_running: - self._current_worker = self._run_agent_mode(task) - - async def _parse_crew_command(self, cmd: str) -> None: - """Parse and execute /crew command""" - # Remove /crew prefix - rest = cmd[5:].strip() - - if not rest: - self._add_system( - "Usage: /crew \n" - "Example: /crew https://example.com\n" - " /crew 192.168.1.100\n\n" - "Crew mode spawns specialized workers in parallel:\n" - " - recon: Reconnaissance and mapping\n" - " - sqli: SQL injection testing\n" - " - xss: Cross-site scripting testing\n" - " - ssrf: Server-side request forgery\n" - " - auth: Authentication testing\n" - " - idor: Insecure direct object references\n" - " - info: Information disclosure" - ) - return - - target = rest - - if not self._is_running: - self._add_user(f"/crew {target}") - self._show_sidebar() - self._current_worker = self._run_crew_mode(target) - - def _show_sidebar(self) -> None: - """Show the sidebar for crew mode.""" - try: - import time - - sidebar = self.query_one("#sidebar") - sidebar.add_class("visible") - - chat_area = self.query_one("#chat-area") - chat_area.add_class("with-sidebar") - - # Setup tree - tree = self.query_one("#workers-tree", CrewTree) - tree.root.expand() - tree.show_root = False - - # Clear old nodes - tree.root.remove_children() - self._crew_worker_nodes.clear() - self._crew_workers.clear() - self._worker_events.clear() - self._crew_findings_count = 0 - - # Start tracking time and 
tokens - self._crew_start_time = time.time() - self._crew_tokens_used = 0 - - # Start stats timer (update every second) - if self._crew_stats_timer: - self._crew_stats_timer.stop() - self._crew_stats_timer = self.set_interval(1.0, self._update_crew_stats) - - # Start spinner timer for running workers (faster interval for smooth animation) - if self._spinner_timer: - self._spinner_timer.stop() - self._spinner_timer = self.set_interval(0.15, self._update_spinner) - - # Add crew root node (no orchestrator - just "CREW" header) - self._crew_orchestrator_node = tree.root.add( - "CREW", data={"type": "crew", "id": "crew"} - ) - self._crew_orchestrator_node.expand() - tree.select_node(self._crew_orchestrator_node) - self._viewing_worker_id = None - - # Update stats - self._update_crew_stats() - except Exception as e: - self._add_system(f"[!] Sidebar error: {e}") - - def _hide_sidebar(self) -> None: - """Hide the sidebar.""" - try: - # Stop stats timer - if self._crew_stats_timer: - self._crew_stats_timer.stop() - self._crew_stats_timer = None - - sidebar = self.query_one("#sidebar") - sidebar.remove_class("visible") - - chat_area = self.query_one("#chat-area") - chat_area.remove_class("with-sidebar") - except Exception: - pass - - def _update_crew_stats(self) -> None: - """Update crew stats panel.""" - try: - import time - - text = Text() - - # Elapsed time - text.append("Time: ", style="bold #d4d4d4") - if self._crew_start_time: - elapsed = time.time() - self._crew_start_time - if elapsed < 60: - time_str = f"{int(elapsed)}s" - elif elapsed < 3600: - mins = int(elapsed // 60) - secs = int(elapsed % 60) - time_str = f"{mins}m {secs}s" - else: - hrs = int(elapsed // 3600) - mins = int((elapsed % 3600) // 60) - time_str = f"{hrs}h {mins}m" - text.append(time_str, style="#9a9a9a") - else: - text.append("--", style="#525252") - - text.append("\n") - - # Tokens used - text.append("Tokens: ", style="bold #d4d4d4") - if self._crew_tokens_used > 0: - if self._crew_tokens_used >= 
1000: - token_str = f"{self._crew_tokens_used / 1000:.1f}k" - else: - token_str = str(self._crew_tokens_used) - text.append(token_str, style="#9a9a9a") - else: - text.append("--", style="#525252") - - stats = self.query_one("#crew-stats", Static) - stats.update(text) - stats.border_title = "# Stats" - except Exception: - pass - - def _update_spinner(self) -> None: - """Update spinner animation for running workers.""" - try: - # Advance spinner frame - self._spinner_frame += 1 - - # Only update labels for running workers (efficient) - has_running = False - for worker_id, worker in self._crew_workers.items(): - if worker.get("status") == "running": - has_running = True - # Update the tree node label - if worker_id in self._crew_worker_nodes: - node = self._crew_worker_nodes[worker_id] - node.set_label(self._format_worker_label(worker_id)) - - # Stop spinner if no workers are running (save resources) - if not has_running and self._spinner_timer: - self._spinner_timer.stop() - self._spinner_timer = None - except Exception: - pass - - def _add_crew_worker(self, worker_id: str, worker_type: str, task: str) -> None: - """Add a worker to the sidebar tree.""" - self._crew_workers[worker_id] = { - "worker_type": worker_type, - "task": task, - "status": "pending", - "findings": 0, - } - - try: - label = self._format_worker_label(worker_id) - node = self._crew_orchestrator_node.add( - label, data={"type": "worker", "id": worker_id} - ) - self._crew_worker_nodes[worker_id] = node - self._crew_orchestrator_node.expand() - self._update_crew_stats() - except Exception: - pass - - def _update_crew_worker(self, worker_id: str, **updates) -> None: - """Update a worker's state.""" - if worker_id not in self._crew_workers: - return - - self._crew_workers[worker_id].update(updates) - - # Restart spinner if a worker started running - if updates.get("status") == "running" and not self._spinner_timer: - self._spinner_timer = self.set_interval(0.15, self._update_spinner) - - try: - if 
worker_id in self._crew_worker_nodes: - label = self._format_worker_label(worker_id) - self._crew_worker_nodes[worker_id].set_label(label) - self._update_crew_stats() - except Exception: - pass - - def _format_worker_label(self, worker_id: str) -> Text: - """Format worker label for tree.""" - worker = self._crew_workers.get(worker_id, {}) - status = worker.get("status", "pending") - wtype = worker.get("worker_type", "worker") - findings = worker.get("findings", 0) - - # 4-state icons: working (braille), done (checkmark), warning (!), error (X) - if status in ("running", "pending"): - # Animated braille spinner for all in-progress states - icon = self._spinner_frames[self._spinner_frame % len(self._spinner_frames)] - color = "#d4d4d4" # white - elif status == "complete": - icon = "✓" - color = "#22c55e" # green - elif status == "warning": - icon = "!" - color = "#f59e0b" # amber/orange - else: # error, cancelled, unknown - icon = "✗" - color = "#ef4444" # red - - text = Text() - text.append(f"{icon} ", style=color) - text.append(wtype.upper(), style="bold") - - if status == "complete" and findings > 0: - text.append(f" [{findings}]", style="#22c55e") # green - elif status in ("error", "cancelled"): - # Don't append " !" 
here since we already have the X icon - pass - - return text - - def _handle_worker_event( - self, worker_id: str, event_type: str, data: Dict[str, Any] - ) -> None: - """Handle worker events from CrewAgent - updates tree sidebar only.""" - try: - if event_type == "spawn": - worker_type = data.get("worker_type", "unknown") - task = data.get("task", "") - self._add_crew_worker(worker_id, worker_type, task) - elif event_type == "status": - status = data.get("status", "running") - self._update_crew_worker(worker_id, status=status) - elif event_type == "tool": - # Add tool as child node under the agent - tool_name = data.get("tool", "unknown") - self._add_tool_to_worker(worker_id, tool_name) - elif event_type == "tokens": - # Track token usage - tokens = data.get("tokens", 0) - self._crew_tokens_used += tokens - elif event_type == "complete": - findings_count = data.get("findings_count", 0) - self._update_crew_worker( - worker_id, status="complete", findings=findings_count - ) - self._crew_findings_count += findings_count - self._update_crew_stats() - elif event_type == "warning": - # Worker hit max iterations but has results - self._update_crew_worker(worker_id, status="warning") - reason = data.get("reason", "Partial completion") - worker = self._crew_workers.get(worker_id, {}) - wtype = worker.get("worker_type", "worker") - self._add_system(f"[!] {wtype.upper()} stopped: {reason}") - self._update_crew_stats() - elif event_type == "failed": - # Worker determined task infeasible - self._update_crew_worker(worker_id, status="failed") - reason = data.get("reason", "Task infeasible") - worker = self._crew_workers.get(worker_id, {}) - wtype = worker.get("worker_type", "worker") - self._add_system(f"[!] 
{wtype.upper()} failed: {reason}") - self._update_crew_stats() - elif event_type == "error": - self._update_crew_worker(worker_id, status="error") - worker = self._crew_workers.get(worker_id, {}) - wtype = worker.get("worker_type", "worker") - error_msg = data.get("error", "Unknown error") - # Only show errors in chat - they're important - self._add_system(f"[!] {wtype.upper()} failed: {error_msg}") - except Exception as e: - self._add_system(f"[!] Worker event error: {e}") - - def _add_tool_to_worker(self, worker_id: str, tool_name: str) -> None: - """Add a tool usage as child node under worker in tree.""" - try: - node = self._crew_worker_nodes.get(worker_id) - if node: - node.add_leaf(f" {tool_name}") - node.expand() - except Exception: - pass - - @on(Tree.NodeSelected, "#workers-tree") - def on_worker_tree_selected(self, event: Tree.NodeSelected) -> None: - """Handle tree node selection.""" - node = event.node - if node.data: - node_type = node.data.get("type") - if node_type == "crew": - self._viewing_worker_id = None - elif node_type == "worker": - self._viewing_worker_id = node.data.get("id") - - @work(thread=False) - async def _run_crew_mode(self, target: str) -> None: - """Run crew mode with sidebar.""" - self._is_running = True - self._should_stop = False - self._set_status("thinking", "crew") - - try: - from ..agents.base_agent import AgentMessage - from ..agents.crew import CrewOrchestrator - from ..llm import LLM, ModelConfig - - # Build prior context from assist/agent conversation history - prior_context = self._build_prior_context() - - llm = LLM(model=self.model, config=ModelConfig(temperature=0.7)) - - crew = CrewOrchestrator( - llm=llm, - tools=self.all_tools, - runtime=self.runtime, - on_worker_event=self._handle_worker_event, - rag_engine=self.rag_engine, - target=self.target, - prior_context=prior_context, - ) - self._current_crew = crew # Track for cancellation - - self._add_system(f"@ Task: {target}") - - # Track crew results for memory - 
crew_report = None - - async for update in crew.run(target): - if self._should_stop: - await crew.cancel() - self._add_system("[!] Stopped by user") - break - - phase = update.get("phase", "") - - if phase == "starting": - self._set_status("thinking", "crew") - - elif phase == "thinking": - # Show the orchestrator's reasoning - content = update.get("content", "") - if content: - self._add_thinking(content) - - elif phase == "tool_call": - # Show orchestration tool calls - tool = update.get("tool", "") - args = update.get("args", {}) - self._add_tool(tool, str(args)) - - elif phase == "tool_result": - # Tool results are tracked via worker events - pass - - elif phase == "complete": - crew_report = update.get("report", "") - if crew_report: - self._add_assistant(crew_report) - - elif phase == "error": - error = update.get("error", "Unknown error") - self._add_system(f"[!] Crew error: {error}") - - # Add crew results to main agent's conversation history - # so assist mode can reference what happened - if self.agent and crew_report: - # Add the crew task as a user message - self.agent.conversation_history.append( - AgentMessage( - role="user", - content=f"[CREW MODE] Run parallel analysis on target: {target}", - ) - ) - # Add the crew report as assistant response - self.agent.conversation_history.append( - AgentMessage(role="assistant", content=crew_report) - ) - - self._set_status("complete", "crew") - self._add_system("+ Crew task complete.") - - # Stop timers - if self._crew_stats_timer: - self._crew_stats_timer.stop() - self._crew_stats_timer = None - if self._spinner_timer: - self._spinner_timer.stop() - self._spinner_timer = None - - # Clear crew reference - self._current_crew = None - - except asyncio.CancelledError: - # Cancel crew workers first - if self._current_crew: - await self._current_crew.cancel() - self._current_crew = None - self._add_system("[!] 
Cancelled") - self._set_status("idle", "crew") - # Stop timers on cancel - if self._crew_stats_timer: - self._crew_stats_timer.stop() - self._crew_stats_timer = None - if self._spinner_timer: - self._spinner_timer.stop() - self._spinner_timer = None - - except Exception as e: - import traceback - - # Cancel crew workers on error too - if self._current_crew: - try: - await self._current_crew.cancel() - except Exception: - pass - self._current_crew = None - self._add_system(f"[!] Crew error: {e}\n{traceback.format_exc()}") - self._set_status("error") - # Stop timers on error too - if self._crew_stats_timer: - self._crew_stats_timer.stop() - self._crew_stats_timer = None - if self._spinner_timer: - self._spinner_timer.stop() - self._spinner_timer = None - finally: - self._is_running = False - - @work(thread=False) - async def _run_assist(self, message: str) -> None: - """Run in assist mode - single response""" - if not self.agent: - self._add_system("[!] Agent not ready") - return - - self._is_running = True - self._should_stop = False - self._set_status("thinking", "assist") - - try: - async for response in self.agent.assist(message): - if self._should_stop: - self._add_system("[!] 
Stopped by user") - break - - self._set_status("processing") - - # Show thinking/plan FIRST if there's content with tool calls - if response.content: - content = response.content.strip() - if response.tool_calls: - self._add_thinking(content) - else: - self._add_assistant(content) - - # Show tool calls (skip 'finish' - internal control) - if response.tool_calls: - for call in response.tool_calls: - if call.name == "finish": - continue # Skip - summary shown as final message - args_str = str(call.arguments) - self._add_tool(call.name, args_str) - - # Show tool results (displayed after execution completes) - # Skip 'finish' tool - its result is shown as the final summary - if response.tool_results: - for result in response.tool_results: - if result.tool_name == "finish": - continue # Skip - summary shown separately - if result.success: - self._add_tool_result( - result.tool_name, result.result or "Done" - ) - else: - self._add_tool_result( - result.tool_name, f"Error: {result.error}" - ) - - self._set_status("idle", "assist") - - except asyncio.CancelledError: - self._add_system("[!] Cancelled") - self._set_status("idle", "assist") - except Exception as e: - self._add_system(f"[!] Error: {e}") - self._set_status("error") - finally: - self._is_running = False - - @work(thread=False) - async def _run_agent_mode(self, task: str) -> None: - """Run in agent mode - autonomous until task complete or user stops""" - if not self.agent: - self._add_system("[!] Agent not ready") - return - - self._is_running = True - self._should_stop = False - - self._set_status("thinking", "agent") - - try: - async for response in self.agent.agent_loop(task): - if self._should_stop: - self._add_system("[!] Stopped by user") - break - - self._set_status("processing") - - # Show thinking/plan FIRST if there's content with tool calls - if response.content: - content = response.content.strip() - # If it has tool calls, it's thinking. - # If it's marked as intermediate, it's thinking. 
- if response.tool_calls or response.metadata.get("intermediate"): - self._add_thinking(content) - else: - # Check if this is a task completion message - if response.metadata.get("task_complete"): - self._add_assistant(content) - else: - self._add_assistant(content) - - # Show tool calls AFTER thinking - if response.tool_calls: - for call in response.tool_calls: - # Show all tools including finish - args_str = str(call.arguments) - self._add_tool(call.name, args_str) - - # Show tool results - if response.tool_results: - for result in response.tool_results: - if result.tool_name == "finish": - # Skip showing result for finish tool as it's redundant with the tool call display - continue - - if result.success: - self._add_tool_result( - result.tool_name, result.result or "Done" - ) - else: - self._add_tool_result( - result.tool_name, f"Error: {result.error}" - ) - - # Check state - if self.agent.state.value == "waiting_input": - self._set_status("waiting") - self._add_system("? Awaiting input...") - break - elif self.agent.state.value == "complete": - break - - self._set_status("thinking") - - self._set_status("complete", "agent") - self._add_system("+ Agent task complete. Back to assist mode.") - - # Return to assist mode - await asyncio.sleep(1) - self._set_status("idle", "assist") - - except asyncio.CancelledError: - self._add_system("[!] Cancelled") - self._set_status("idle", "assist") - except Exception as e: - self._add_system(f"[!] 
Error: {e}") - self._set_status("error") - finally: - self._is_running = False - - def action_quit_app(self) -> None: - # Stop any running tasks first - if self._is_running: - self._should_stop = True - if self._current_worker and not self._current_worker.is_finished: - self._current_worker.cancel() - if self._current_crew: - # Schedule cancel but don't wait - we're exiting - asyncio.create_task(self._cancel_crew()) - self.exit() - - def action_stop_agent(self) -> None: - if self._is_running: - self._should_stop = True - self._add_system("[!] Stopping...") - - # Cancel the running worker to interrupt blocking awaits - if self._current_worker and not self._current_worker.is_finished: - self._current_worker.cancel() - - # Cancel crew orchestrator if running - if self._current_crew: - asyncio.create_task(self._cancel_crew()) - - # Clean up agent state to prevent stale tool responses - if self.agent: - self.agent.cleanup_after_cancel() - - # Reconnect MCP servers (they may be in a bad state after cancellation) - if self.mcp_manager: - asyncio.create_task(self._reconnect_mcp_after_cancel()) - - async def _cancel_crew(self) -> None: - """Cancel crew orchestrator and all workers.""" - try: - if self._current_crew: - await self._current_crew.cancel() - self._current_crew = None - # Mark all running workers as cancelled in the UI - for worker_id, worker in self._crew_workers.items(): - if worker.get("status") in ("running", "pending"): - self._update_crew_worker(worker_id, status="cancelled") - except Exception: - pass # Best effort - - async def _reconnect_mcp_after_cancel(self) -> None: - """Reconnect MCP servers after cancellation to restore clean state.""" - await asyncio.sleep(0.5) # Brief delay for cancellation to propagate - try: - await self.mcp_manager.reconnect_all() - except Exception: - pass # Best effort - don't crash if reconnect fails - - def action_show_help(self) -> None: - self.push_screen(HelpScreen()) - - async def on_unmount(self) -> None: - 
"""Cleanup""" - if self.mcp_manager: - try: - await self.mcp_manager.disconnect_all() - await asyncio.sleep(0.1) - except Exception: - pass - - if self.runtime: - try: - await self.runtime.stop() - except Exception: - pass - - -# ----- Entry Point ----- - - -def run_tui( - target: Optional[str] = None, - model: str = None, - use_docker: bool = False, -): - """Run the PentestAgent TUI""" - app = PentestAgentTUI( - target=target, - model=model, - use_docker=use_docker, - ) - app.run() - - -if __name__ == "__main__": - run_tui()