Merge pull request #27 from famez/main

More fixes and MCP improvement
This commit is contained in:
Masic
2026-02-08 14:16:34 -07:00
committed by GitHub
7 changed files with 213 additions and 22 deletions

View File

@@ -5,7 +5,6 @@ __pycache__
.pytest_cache
.mypy_cache
.git
loot
dist
build
*.egg-info

3
.gitignore vendored
View File

@@ -32,7 +32,6 @@ env/
# IDE
.idea/
.vscode/
*.swp
*.swo
*~
@@ -61,8 +60,6 @@ dmypy.json
*.log
logs/
# Output
loot/
# Secrets
secrets/

14
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,14 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Debug pentestagent (module)",
"type": "python",
"request": "launch",
"module": "pentestagent",
"cwd": "/app",
"console": "integratedTerminal",
"justMyCode": true
}
]
}

View File

@@ -119,6 +119,7 @@ PentestAgent has three modes, accessible via commands in the TUI:
/report Generate report from session
/memory Show token/memory usage
/prompt Show system prompt
/mcp <list/add> Visualizes or adds a new MCP server.
/clear Clear chat and history
/quit Exit (also /exit, /q)
/help Show help (also /h, /?)
@@ -146,10 +147,7 @@ PentestAgent includes built-in tools and supports MCP (Model Context Protocol) f
### MCP Integration
PentestAgent supports MCP (Model Context Protocol) servers, but automatic
installation and auto-start of vendored MCP adapters has been removed. Operators
should run the installers and setup scripts under `third_party/` manually and
then configure `mcp_servers.json` for any MCP servers they intend to use. Example
PentestAgent supports MCP (Model Context Protocol) servers. Configure `mcp_servers.json` for any MCP servers they intend to use. Example
config (place under `mcp_servers.json`):
```json
@@ -214,14 +212,3 @@ Only use against systems you have explicit authorization to test. Unauthorized a
## License
MIT
## HexStrike Integration & Thanks
This branch vendors an optional integration with HexStrike (a powerful MCP-enabled scoring and tooling framework). HexStrike acts as a force-multiplier for PentestAgent by exposing a rich set of automated pentesting tools and workflows that the agent can call via MCP — greatly expanding available capabilities with minimal setup.
Special thanks and credit to the HexStrike project and its author: https://github.com/0x4m4/hexstrike-ai
- Notes:
- HexStrike is vendored under `third_party/hexstrike` and is opt-in; follow `scripts/install_hexstrike_deps.sh` or the vendor README to install its dependencies and start the service manually.
- Automatic background install/start of vendored MCP adapters has been removed; operators should use the provided third-party scripts and then update `mcp_servers.json`.
- This update also includes several TUI fixes (improved background worker handling and safer task cancellation) to stabilize the terminal UI while using long-running MCP tools.

0
loot/.gitkeep Normal file
View File

View File

@@ -176,6 +176,7 @@ class HelpScreen(ModalScreen):
("/help", "Show help"),
("/clear", "Clear chat"),
("/tools", "List tools"),
("/mcp", "List mcp servers"),
("/quit", "Exit"),
]
@@ -441,6 +442,125 @@ class ToolsScreen(ModalScreen):
self.app.pop_screen()
class MCPScreen(ModalScreen):
"""Interactive MCP browser — split-pane layout.
Left pane: tree of MCP servers. Right pane: full description (scrollable).
"""
BINDINGS = [Binding("escape", "dismiss", "Close"), Binding("q", "dismiss", "Close")]
CSS = """
MCPScreen { align: center middle; }
"""
from ..mcp import MCPManager
def __init__(self, mcp_manager: MCPManager) -> None:
super().__init__()
self.mcp_manager = mcp_manager
def compose(self) -> ComposeResult:
# Build a split view: left tree, right description
with Container(id="mcp-container"):
with Horizontal(id="mcp-split"):
with Vertical(id="mcp-left"):
yield Static("MCP Servers", id="mcp-title")
yield Tree("MCP Servers", id="mcp-tree")
with Vertical(id="mcp-right"):
yield Static("Description", id="mcp-desc-title")
yield ScrollableContainer(Static("Select a MCP server to view details.", id="mcp-desc"), id="mcp-desc-scroll")
yield Center(Button("Close", id="mcp-close"))
def on_mount(self) -> None:
try:
tree = self.query_one("#mcp-tree", Tree)
except Exception as e:
logging.getLogger(__name__).exception("Failed to query MCP tree: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to initialize MCP tree: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about MCP tree init failure: %s", e)
return
root = tree.root
root.allow_expand = True
root.show_root = False
# Populate tool nodes
servers = self.mcp_manager.get_all_servers()
for server in servers:
server_node = root.add(server.name, data={"server": server})
for tool in server.tools:
server_node.add(tool['name'], data={"tool": tool})
try:
tree.focus()
except Exception as e:
logging.getLogger(__name__).exception("Failed to focus MCP tree: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to focus MCP tree: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about MCP tree focus failure: %s", e)
@on(Tree.NodeSelected, "#mcp-tree")
def on_mcp_selected(self, event: Tree.NodeSelected) -> None:
node = event.node
try:
mcp = node.data.get("server") if node.data else None
tool = node.data.get("tool") if node.data else None
# Update right-hand description pane
try:
desc_widget = self.query_one("#mcp-desc", Static)
text = Text()
if mcp is not None:
text.append(f"{mcp.name}\n", style="bold #d4d4d4")
text.append(f"{mcp.config.description}\n", style="#d4d4d4")
text.append(f"Command: {mcp.config.command}\n", style="#9a9a9a")
text.append(f"Args: {mcp.config.args}\n", style="#9a9a9a")
enabled_icon = "🟢" if mcp.config.enabled else "🔴"
text.append(f"Enabled: {enabled_icon}\n", style="#9a9a9a")
connected_icon = "🟢" if mcp.config.enabled else "🔴"
text.append(f"Connected: {connected_icon}\n", style="#9a9a9a")
elif tool is not None:
text.append(f"{tool['description']}\n", style="#d4d4d4")
else:
text.append(f"Choose a server\n", style="#d4d4d4")
desc_widget.update(text)
except Exception as e:
logging.getLogger(__name__).exception("Failed to update mcp description pane: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to update mcp description: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about mcp desc update failure: %s", e)
except Exception as e:
logging.getLogger(__name__).exception("Unhandled error in on_mcp_selected: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: error handling mcp selection: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about mcp selection error: %s", e)
@on(Button.Pressed, "#mcp-close")
def close_mcp(self) -> None:
self.app.pop_screen()
# ----- Main Chat Message Widgets -----
@@ -2007,6 +2127,9 @@ Be concise. Use the actual data from notes."""
)
self._add_system(msg)
elif cmd_lower.startswith("/mcp"):
await self._parse_mcp_command(cmd_original)
elif cmd_lower in ["/quit", "/exit", "/q"]:
self.exit()
elif cmd_lower == "/prompt":
@@ -2313,6 +2436,70 @@ Be concise. Use the actual data from notes."""
# Schedule crew run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
self._current_worker = self._run_crew_mode(target)
async def _parse_mcp_command(self, cmd: str) -> None:
# Remove /agent prefix
rest = cmd[len("/mcp"):].strip()
if not rest:
self._add_system(
"Usage: /mcp <command>\n"
"Example: /mcp list \n"
" /mcp add"
)
return
action = rest
if action == 'list':
if self.mcp_manager:
# Open the interactive mcp browser (split-pane).
try:
await self.push_screen(MCPScreen(mcp_manager=self.mcp_manager))
except Exception:
pass
elif action.startswith("add"):
from ..tools import get_all_tools, register_tool_instance
args = rest[len("add"):].strip()
# Parse the args string into individual components
parts = args.split()
if len(parts) < 2:
self._add_system("Usage: /mcp add <name> <command> [args...]")
return
name = parts[0]
command = parts[1]
mcp_args = parts[2:] if len(parts) > 2 else []
self.mcp_manager.add_server(
name=name,
command=command,
args=mcp_args,
)
server = await self.mcp_manager.connect_server(name)
self.mcp_server_count = len(self.mcp_manager.list_configured_servers())
tools = self.mcp_manager.create_mcp_tools_from_server(server)
self.agent.add_tools(tools)
for tool in tools:
register_tool_instance(tool)
self.all_tools = get_all_tools()
self._update_header()
if not action:
self._add_system("Error: No action provided. Usage: /mcp <command>")
return
def _show_sidebar(self) -> None:
"""Show the sidebar for crew mode."""
try:

View File

@@ -158,6 +158,14 @@ class MCPManager:
}
for n, s in servers.items()
]
def create_mcp_tools_from_server(self, server: MCPServer) -> List["Tool"]:
all_tools = []
for tool_def in server.tools:
tool = create_mcp_tool(tool_def, server, self)
all_tools.append(tool)
return all_tools
async def connect_all(self) -> List["Tool"]:
servers_config = self._load_config()
@@ -168,9 +176,8 @@ class MCPManager:
server = await self._connect_server(config)
if server:
self.servers[name] = server
for tool_def in server.tools:
tool = create_mcp_tool(tool_def, server, self)
all_tools.append(tool)
tools = self.create_mcp_tools_from_server(server)
all_tools.extend(tools)
print(f"[MCP] Connected to {name} with {len(server.tools)} tools")
return all_tools