From 0bb8465254b43fdbc1e1c527860e1f168e8acb73 Mon Sep 17 00:00:00 2001 From: famez Date: Sun, 8 Feb 2026 18:33:03 +0100 Subject: [PATCH] Added MCP list command --- pentestagent/interface/tui.py | 146 ++++++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index 472eaef..5856206 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -176,6 +176,7 @@ class HelpScreen(ModalScreen): ("/help", "Show help"), ("/clear", "Clear chat"), ("/tools", "List tools"), + ("/mcp", "List mcp servers"), ("/quit", "Exit"), ] @@ -441,6 +442,120 @@ class ToolsScreen(ModalScreen): self.app.pop_screen() +class MCPScreen(ModalScreen): + """Interactive MCP browser — split-pane layout. + + Left pane: tree of MCP servers. Right pane: full description (scrollable). + + """ + + BINDINGS = [Binding("escape", "dismiss", "Close"), Binding("q", "dismiss", "Close")] + + CSS = """ + MCPScreen { align: center middle; } + """ + from ..mcp import MCPManager + + def __init__(self, mcp_manager: MCPManager) -> None: + super().__init__() + self.mcp_manager = mcp_manager + + def compose(self) -> ComposeResult: + # Build a split view: left tree, right description + with Container(id="mcp-container"): + with Horizontal(id="mcp-split"): + with Vertical(id="mcp-left"): + yield Static("MCP Servers", id="mcp-title") + yield Tree("MCP Servers", id="mcp-tree") + + with Vertical(id="mcp-right"): + yield Static("Description", id="mcp-desc-title") + yield ScrollableContainer(Static("Select a MCP server to view details.", id="mcp-desc"), id="mcp-desc-scroll") + + yield Center(Button("Close", id="mcp-close")) + + def on_mount(self) -> None: + try: + tree = self.query_one("#mcp-tree", Tree) + except Exception as e: + logging.getLogger(__name__).exception("Failed to query MCP tree: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: failed to initialize MCP tree: {e}") + except Exception as e: + logging.getLogger(__name__).exception("Failed to notify operator about MCP tree init failure: %s", e) + return + + root = tree.root + root.allow_expand = True + root.show_root = False + + # Populate tool nodes + + servers = self.mcp_manager.get_all_servers() + + for server in servers: + root.add(server.name, data={"server": server}) + + try: + tree.focus() + except Exception as e: + logging.getLogger(__name__).exception("Failed to focus MCP tree: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: failed to focus MCP tree: {e}") + except Exception as e: + logging.getLogger(__name__).exception("Failed to notify operator about MCP tree focus failure: %s", e) + + @on(Tree.NodeSelected, "#mcp-tree") + def on_mcp_selected(self, event: Tree.NodeSelected) -> None: + node = event.node + try: + mcp = node.data.get("server") if node.data else None + + # Update right-hand description pane + try: + desc_widget = self.query_one("#mcp-desc", Static) + text = Text() + if mcp is not None: + text.append(f"{mcp.name}\n", style="bold #d4d4d4") + text.append(f"{mcp.config.description}\n", style="#d4d4d4") + text.append(f"Command: {mcp.config.command}\n", style="#9a9a9a") + text.append(f"Args: {mcp.config.args}\n", style="#9a9a9a") + + enabled_icon = "🟢" if mcp.config.enabled else "🔴" + text.append(f"Enabled: {enabled_icon}\n", style="#9a9a9a") + + connected_icon = "🟢" if mcp.config.enabled else "🔴" + text.append(f"Connected: {connected_icon}\n", style="#9a9a9a") + else: + text.append(f"Choose a server\n", style="#d4d4d4") + desc_widget.update(text) + + except Exception as e: + logging.getLogger(__name__).exception("Failed to update mcp description pane: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: failed to update mcp description: {e}") + except Exception as e: + logging.getLogger(__name__).exception("Failed to notify operator about mcp desc update failure: %s", e) + except Exception as e: + logging.getLogger(__name__).exception("Unhandled error in on_mcp_selected: %s", e) + try: + from ..interface.notifier import notify + + notify("warning", f"TUI: error handling mcp selection: {e}") + except Exception as e: + logging.getLogger(__name__).exception("Failed to notify operator about mcp selection error: %s", e) + + @on(Button.Pressed, "#mcp-close") + def close_mcp(self) -> None: + self.app.pop_screen() + + # ----- Main Chat Message Widgets ----- @@ -2007,6 +2122,9 @@ Be concise. Use the actual data from notes.""" ) self._add_system(msg) + + elif cmd_lower.startswith("/mcp"): + await self._parse_mcp_command(cmd_original) elif cmd_lower in ["/quit", "/exit", "/q"]: self.exit() elif cmd_lower == "/prompt": @@ -2313,6 +2431,34 @@ Be concise. Use the actual data from notes.""" # Schedule crew run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker) self._current_worker = self._run_crew_mode(target) + async def _parse_mcp_command(self, cmd: str) -> None: + # Remove /agent prefix + rest = cmd[len("/mcp"):].strip() + + if not rest: + self._add_system( + "Usage: /mcp \n" + "Example: /mcp list \n" + " /mcp add" + ) + return + + action = rest + + if action == 'list': + if self.mcp_manager: + + # Open the interactive mcp browser (split-pane). + try: + await self.push_screen(MCPScreen(mcp_manager=self.mcp_manager)) + except Exception: + pass + + if not action: + self._add_system("Error: No action provided. Usage: /mcp ") + return + + def _show_sidebar(self) -> None: """Show the sidebar for crew mode.""" try: