diff --git a/.dockerignore b/.dockerignore index 987e4a3..3895fe7 100644 --- a/.dockerignore +++ b/.dockerignore @@ -5,7 +5,6 @@ __pycache__ .pytest_cache .mypy_cache .git -loot dist build *.egg-info diff --git a/.gitignore b/.gitignore index a2a10dd..fa250f8 100644 --- a/.gitignore +++ b/.gitignore @@ -32,7 +32,6 @@ env/ # IDE .idea/ -.vscode/ *.swp *.swo *~ @@ -61,8 +60,6 @@ dmypy.json *.log logs/ -# Output -loot/ # Secrets secrets/ diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..1e205f9 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,14 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Debug pentestagent (module)", + "type": "python", + "request": "launch", + "module": "pentestagent", + "cwd": "/app", + "console": "integratedTerminal", + "justMyCode": true + } + ] +} diff --git a/README.md b/README.md index 5f8b4ba..d77a46a 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,7 @@ PentestAgent has three modes, accessible via commands in the TUI: /report Generate report from session /memory Show token/memory usage /prompt Show system prompt +/mcp Visualizes or adds a new MCP server. /clear Clear chat and history /quit Exit (also /exit, /q) /help Show help (also /h, /?) @@ -146,10 +147,7 @@ PentestAgent includes built-in tools and supports MCP (Model Context Protocol) f ### MCP Integration -PentestAgent supports MCP (Model Context Protocol) servers, but automatic -installation and auto-start of vendored MCP adapters has been removed. Operators -should run the installers and setup scripts under `third_party/` manually and -then configure `mcp_servers.json` for any MCP servers they intend to use. Example +PentestAgent supports MCP (Model Context Protocol) servers. Configure `mcp_servers.json` for any MCP servers they intend to use. Example config (place under `mcp_servers.json`): ```json @@ -214,14 +212,3 @@ Only use against systems you have explicit authorization to test. Unauthorized a ## License MIT - -## HexStrike Integration & Thanks - -This branch vendors an optional integration with HexStrike (a powerful MCP-enabled scoring and tooling framework). HexStrike acts as a force-multiplier for PentestAgent by exposing a rich set of automated pentesting tools and workflows that the agent can call via MCP — greatly expanding available capabilities with minimal setup. - -Special thanks and credit to the HexStrike project and its author: https://github.com/0x4m4/hexstrike-ai - -- Notes: -- HexStrike is vendored under `third_party/hexstrike` and is opt-in; follow `scripts/install_hexstrike_deps.sh` or the vendor README to install its dependencies and start the service manually. -- Automatic background install/start of vendored MCP adapters has been removed; operators should use the provided third-party scripts and then update `mcp_servers.json`. -- This update also includes several TUI fixes (improved background worker handling and safer task cancellation) to stabilize the terminal UI while using long-running MCP tools. diff --git a/loot/.gitkeep b/loot/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index 472eaef..1fdb66d 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -176,6 +176,7 @@ class HelpScreen(ModalScreen): ("/help", "Show help"), ("/clear", "Clear chat"), ("/tools", "List tools"), + ("/mcp", "List mcp servers"), ("/quit", "Exit"), ] @@ -441,6 +442,125 @@ class ToolsScreen(ModalScreen): self.app.pop_screen() +class MCPScreen(ModalScreen): + """Interactive MCP browser — split-pane layout. + + Left pane: tree of MCP servers. Right pane: full description (scrollable). + + """ + + BINDINGS = [Binding("escape", "dismiss", "Close"), Binding("q", "dismiss", "Close")] + + CSS = """ + MCPScreen { align: center middle; } + """ + from ..mcp import MCPManager + + def __init__(self, mcp_manager: MCPManager) -> None: + super().__init__() + self.mcp_manager = mcp_manager + + def compose(self) -> ComposeResult: + # Build a split view: left tree, right description + with Container(id="mcp-container"): + with Horizontal(id="mcp-split"): + with Vertical(id="mcp-left"): + yield Static("MCP Servers", id="mcp-title") + yield Tree("MCP Servers", id="mcp-tree") + + with Vertical(id="mcp-right"): + yield Static("Description", id="mcp-desc-title") + yield ScrollableContainer(Static("Select a MCP server to view details.", id="mcp-desc"), id="mcp-desc-scroll") + + yield Center(Button("Close", id="mcp-close")) + + def on_mount(self) -> None: + try: + tree = self.query_one("#mcp-tree", Tree) + except Exception as e: + logging.getLogger(__name__).exception("Failed to query MCP tree: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: failed to initialize MCP tree: {e}") + except Exception as e: + logging.getLogger(__name__).exception("Failed to notify operator about MCP tree init failure: %s", e) + return + + root = tree.root + root.allow_expand = True + root.show_root = False + + # Populate tool nodes + + servers = self.mcp_manager.get_all_servers() + + for server in servers: + server_node = root.add(server.name, data={"server": server}) + for tool in server.tools: + server_node.add(tool['name'], data={"tool": tool}) + + try: + tree.focus() + except Exception as e: + logging.getLogger(__name__).exception("Failed to focus MCP tree: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: failed to focus MCP tree: {e}") + except Exception as e: + logging.getLogger(__name__).exception("Failed to notify operator about MCP tree focus failure: %s", e) + + @on(Tree.NodeSelected, "#mcp-tree") + def on_mcp_selected(self, event: Tree.NodeSelected) -> None: + node = event.node + try: + mcp = node.data.get("server") if node.data else None + tool = node.data.get("tool") if node.data else None + + # Update right-hand description pane + try: + desc_widget = self.query_one("#mcp-desc", Static) + text = Text() + if mcp is not None: + text.append(f"{mcp.name}\n", style="bold #d4d4d4") + text.append(f"{mcp.config.description}\n", style="#d4d4d4") + text.append(f"Command: {mcp.config.command}\n", style="#9a9a9a") + text.append(f"Args: {mcp.config.args}\n", style="#9a9a9a") + + enabled_icon = "🟢" if mcp.config.enabled else "🔴" + text.append(f"Enabled: {enabled_icon}\n", style="#9a9a9a") + + connected_icon = "🟢" if mcp.config.enabled else "🔴" + text.append(f"Connected: {connected_icon}\n", style="#9a9a9a") + elif tool is not None: + text.append(f"{tool['description']}\n", style="#d4d4d4") + else: + text.append(f"Choose a server\n", style="#d4d4d4") + desc_widget.update(text) + + except Exception as e: + logging.getLogger(__name__).exception("Failed to update mcp description pane: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: failed to update mcp description: {e}") + except Exception as e: + logging.getLogger(__name__).exception("Failed to notify operator about mcp desc update failure: %s", e) + except Exception as e: + logging.getLogger(__name__).exception("Unhandled error in on_mcp_selected: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: error handling mcp selection: {e}") + except Exception as e: + logging.getLogger(__name__).exception("Failed to notify operator about mcp selection error: %s", e) + + @on(Button.Pressed, "#mcp-close") + def close_mcp(self) -> None: + self.app.pop_screen() + + # ----- Main Chat Message Widgets ----- @@ -2007,6 +2127,9 @@ Be concise. Use the actual data from notes.""" ) self._add_system(msg) + + elif cmd_lower.startswith("/mcp"): + await self._parse_mcp_command(cmd_original) elif cmd_lower in ["/quit", "/exit", "/q"]: self.exit() elif cmd_lower == "/prompt": @@ -2313,6 +2436,70 @@ Be concise. Use the actual data from notes.""" # Schedule crew run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker) self._current_worker = self._run_crew_mode(target) + async def _parse_mcp_command(self, cmd: str) -> None: + # Remove /agent prefix + rest = cmd[len("/mcp"):].strip() + + if not rest: + self._add_system( + "Usage: /mcp \n" + "Example: /mcp list \n" + " /mcp add" + ) + return + + action = rest + + if action == 'list': + if self.mcp_manager: + + # Open the interactive mcp browser (split-pane). + try: + await self.push_screen(MCPScreen(mcp_manager=self.mcp_manager)) + except Exception: + pass + elif action.startswith("add"): + + from ..tools import get_all_tools, register_tool_instance + + args = rest[len("add"):].strip() + + # Parse the args string into individual components + parts = args.split() + if len(parts) < 2: + self._add_system("Usage: /mcp add [args...]") + return + + name = parts[0] + command = parts[1] + mcp_args = parts[2:] if len(parts) > 2 else [] + + self.mcp_manager.add_server( + name=name, + command=command, + args=mcp_args, + ) + + server = await self.mcp_manager.connect_server(name) + + self.mcp_server_count = len(self.mcp_manager.list_configured_servers()) + + tools = self.mcp_manager.create_mcp_tools_from_server(server) + + self.agent.add_tools(tools) + + for tool in tools: + register_tool_instance(tool) + + self.all_tools = get_all_tools() + self._update_header() + + + if not action: + self._add_system("Error: No action provided. Usage: /mcp ") + return + + def _show_sidebar(self) -> None: """Show the sidebar for crew mode.""" try: diff --git a/pentestagent/mcp/manager.py b/pentestagent/mcp/manager.py index 461b055..210863a 100644 --- a/pentestagent/mcp/manager.py +++ b/pentestagent/mcp/manager.py @@ -158,6 +158,14 @@ class MCPManager: } for n, s in servers.items() ] + + def create_mcp_tools_from_server(self, server: MCPServer) -> List["Tool"]: + all_tools = [] + for tool_def in server.tools: + tool = create_mcp_tool(tool_def, server, self) + all_tools.append(tool) + return all_tools + async def connect_all(self) -> List["Tool"]: servers_config = self._load_config() @@ -168,9 +176,8 @@ class MCPManager: server = await self._connect_server(config) if server: self.servers[name] = server - for tool_def in server.tools: - tool = create_mcp_tool(tool_def, server, self) - all_tools.append(tool) + tools = self.create_mcp_tools_from_server(server) + all_tools.extend(tools) print(f"[MCP] Connected to {name} with {len(server.tools)} tools") return all_tools