Merge pull request #28 from famez/main

MCP improvements
This commit is contained in:
Masic
2026-02-12 21:08:45 -07:00
committed by GitHub
13 changed files with 322 additions and 78 deletions

3
.gitignore vendored
View File

@@ -93,3 +93,6 @@ tests/test_*.py
# Workspaces directory (user data should not be committed)
/workspaces/
loot/token_usage.json
mcp_examples/kali/mcp_servers.json
pentestagent/mcp/mcp_servers.json

View File

@@ -1,15 +0,0 @@
This branch `mcp-cleanup` contains a focused cleanup that disables automatic
installation and auto-start of vendored MCP adapters (HexStrike, MetasploitMCP,
etc.). Operators should manually run installer scripts under `third_party/` and
configure `mcp_servers.json` when they want to enable MCP-backed tools.
Files changed (summary):
- `pentestagent/mcp/manager.py` — removed LAUNCH_* auto-start overrides and vendored auto-start logic.
- `pentestagent/interface/tui.py` and `pentestagent/interface/cli.py` — disabled automatic MCP auto-connect.
- `scripts/setup.sh` and `scripts/setup.ps1` — removed automatic vendored MCP install/start steps and added manual instructions.
- `README.md` — documented the manual MCP install workflow.
This commit is intentionally small and only intended to make the branch visible
for review. The functional changes are in the files listed above.
If you want a different summary or formatting, tell me and I'll update it.

4
mcp_examples/README.md Normal file
View File

@@ -0,0 +1,4 @@
# MCP examples
## Description
In this section, you will find some examples to integrate MCP with the agent.

View File

@@ -0,0 +1,19 @@
FROM pentestagent:latest
RUN git clone https://github.com/Wh0am123/MCP-Kali-Server.git
WORKDIR /app/MCP-Kali-Server/
RUN git checkout 61272cde048c236cc61c75e2fac3c06243c633dd
WORKDIR /app
RUN pip install --no-cache-dir -r MCP-Kali-Server/requirements.txt
COPY mcp_servers.json /app/pentestagent/mcp/
# Expose any needed ports
EXPOSE 8080
# Default command
CMD ["python", "-m", "pentestagent"]

View File

@@ -0,0 +1,48 @@
# KALI MCP integration
## Steps
1. Compile base Docker image.
```bash
docker build ../../ -t pentestagent:latest
```
2. Configure .env file with API tokens and models for AI.
```
OPENAI_API_KEY=sk-...
PENTESTAGENT_MODEL=gpt-5
```
3. Configure the IP address or hostname in the mcp_servers.json file:
``` json
{
"mcpServers": {
"kali": {
"command": "python",
"args": ["MCP-Kali-Server/mcp_server.py", "--server", "http://<hostname>:5000"]
}
}
}
```
4. Launch docker-compose
```bash
# Build
docker-compose build
# Run
docker-compose run --rm pentestagent
```
At this point, you will be able to check the MCP servers with the /mcp list command
![alt text](image.png)
On server side, there should be a connection incoming:
![alt text](image-1.png)

View File

@@ -0,0 +1,21 @@
services:
pentestagent:
build:
context: .
dockerfile: Dockerfile
container_name: pentestagent_with_kali_mcp
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- PENTESTAGENT_MODEL=${PENTESTAGENT_MODEL}
- PENTESTAGENT_DEBUG=${PENTESTAGENT_DEBUG:-false}
networks:
- pentestagent-net
stdin_open: true
tty: true
networks:
pentestagent-net:
driver: bridge

Binary file not shown.

After

Width:  |  Height:  |  Size: 364 KiB

BIN
mcp_examples/kali/image.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

View File

@@ -0,0 +1,8 @@
{
"mcpServers": {
"kali": {
"command": "python",
"args": ["MCP-Kali-Server/mcp_server.py", "--server", "http://192.168.56.108:5000"]
}
}
}

View File

@@ -838,6 +838,18 @@ Call create_plan with the new steps OR feasible=False."""
def add_tools(self, tools : List["Tool"]):
self.tools.extend(tools)
def delete_tools(self, tools: List["Tool"]):
"""
Remove tools from the agent by name.
Args:
tools: List of tool names to remove
"""
self.tools = [t for t in self.tools if t.name not in [tool.name for tool in tools]]
def get_tools(self) -> List["Tool"]:
return self.tools
async def assist(self, message: str) -> AsyncIterator[AgentMessage]:
"""
Assist mode - single LLM call, single tool execution if needed.

View File

@@ -442,57 +442,69 @@ class ToolsScreen(ModalScreen):
self.app.pop_screen()
class MCPScreen(ModalScreen):
"""Interactive MCP browser — split-pane layout.
"""Interactive MCP browser — split-pane layout."""
Left pane: tree of MCP servers. Right pane: full description (scrollable).
"""
BINDINGS = [Binding("escape", "dismiss", "Close"), Binding("q", "dismiss", "Close")]
BINDINGS = [
Binding("escape", "dismiss", "Close"),
Binding("q", "dismiss", "Close"),
]
CSS = """
MCPScreen { align: center middle; }
"""
from ..mcp import MCPManager
def __init__(self, mcp_manager: MCPManager) -> None:
from ..mcp import MCPManager
from ..agents.pa_agent import PentestAgentAgent
def __init__(self, mcp_manager: MCPManager, agent: PentestAgentAgent, tui: "PentestAgentTUI") -> None:
super().__init__()
self.mcp_manager = mcp_manager
self.agent = agent
self.tui = tui
self.selected_server = None
self.selected_tool = None
# ------------------------------------------------------------
def compose(self) -> ComposeResult:
# Build a split view: left tree, right description
with Container(id="mcp-container"):
with Horizontal(id="mcp-split"):
# LEFT SIDE
with Vertical(id="mcp-left"):
yield Static("MCP Servers", id="mcp-title")
yield Tree("MCP Servers", id="mcp-tree")
# RIGHT SIDE
with Vertical(id="mcp-right"):
yield Static("Description", id="mcp-desc-title")
yield ScrollableContainer(Static("Select a MCP server to view details.", id="mcp-desc"), id="mcp-desc-scroll")
# ---- Toggle Button ----
yield Button("Enabled: 🔴", id="mcp-toggle-enabled")
yield ScrollableContainer(
Static("Select a MCP server to view details.", id="mcp-desc"),
id="mcp-desc-scroll"
)
yield Center(Button("Close", id="mcp-close"))
# ------------------------------------------------------------
def on_mount(self) -> None:
try:
tree = self.query_one("#mcp-tree", Tree)
except Exception as e:
logging.getLogger(__name__).exception("Failed to query MCP tree: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to initialize MCP tree: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about MCP tree init failure: %s", e)
return
root = tree.root
root.allow_expand = True
root.show_root = False
# Populate tool nodes
servers = self.mcp_manager.get_all_servers()
for server in servers:
@@ -500,67 +512,117 @@ class MCPScreen(ModalScreen):
for tool in server.tools:
server_node.add(tool['name'], data={"tool": tool})
# Hide toggle button initially
toggle_btn = self.query_one("#mcp-toggle-enabled", Button)
toggle_btn.display = False
try:
tree.focus()
except Exception as e:
logging.getLogger(__name__).exception("Failed to focus MCP tree: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to focus MCP tree: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about MCP tree focus failure: %s", e)
# ------------------------------------------------------------
@on(Tree.NodeSelected, "#mcp-tree")
def on_mcp_selected(self, event: Tree.NodeSelected) -> None:
node = event.node
self.selected_server = node.data.get("server") if node.data else None
self.selected_tool = node.data.get("tool") if node.data else None
desc_widget = self.query_one("#mcp-desc", Static)
toggle_btn = self.query_one("#mcp-toggle-enabled", Button)
text = Text()
if self.selected_server is not None:
mcp = self.selected_server
text.append(f"{mcp.name}\n", style="bold #d4d4d4")
text.append(f"{mcp.config.description}\n\n", style="#d4d4d4")
text.append(f"Command: {mcp.config.command}\n", style="#9a9a9a")
text.append(f"Args: {mcp.config.args}\n\n", style="#9a9a9a")
enabled_icon = "🟢" if mcp.config.enabled else "🔴"
connected_icon = "🟢" if mcp.connected else "🔴"
text.append(f"Connected: {connected_icon}\n", style="#9a9a9a")
if mcp.last_error:
text.append(f"\nLast error: {mcp.last_error}\n", style="#e91b1b")
logs = mcp.get_logs()
if logs:
text.append(f"\n{logs}\n", style="#86e41a")
desc_widget.update(text)
toggle_btn.display = True
toggle_btn.label = f"Enabled: {enabled_icon}"
elif self.selected_tool is not None:
text.append(f"{self.selected_tool['description']}\n", style="#d4d4d4")
desc_widget.update(text)
toggle_btn.display = False
else:
desc_widget.update("Choose a server.")
toggle_btn.display = False
# ------------------------------------------------------------
@on(Button.Pressed, "#mcp-toggle-enabled")
async def toggle_enabled(self) -> None:
mcp = self.selected_server
if mcp is None:
return
try:
mcp = node.data.get("server") if node.data else None
tool = node.data.get("tool") if node.data else None
new_value = not mcp.config.enabled
# Update right-hand description pane
try:
desc_widget = self.query_one("#mcp-desc", Static)
text = Text()
if mcp is not None:
text.append(f"{mcp.name}\n", style="bold #d4d4d4")
text.append(f"{mcp.config.description}\n", style="#d4d4d4")
text.append(f"Command: {mcp.config.command}\n", style="#9a9a9a")
text.append(f"Args: {mcp.config.args}\n", style="#9a9a9a")
enabled_icon = "🟢" if mcp.config.enabled else "🔴"
text.append(f"Enabled: {enabled_icon}\n", style="#9a9a9a")
connected_icon = "🟢" if mcp.config.enabled else "🔴"
text.append(f"Connected: {connected_icon}\n", style="#9a9a9a")
elif tool is not None:
text.append(f"{tool['description']}\n", style="#d4d4d4")
else:
text.append(f"Choose a server\n", style="#d4d4d4")
desc_widget.update(text)
if new_value:
await self.mcp_manager.enable(mcp.name)
tools = self.mcp_manager.create_mcp_tools_from_server(mcp)
self.agent.add_tools(tools)
else:
tools = self.mcp_manager.create_mcp_tools_from_server(mcp)
self.agent.delete_tools(tools)
await self.mcp_manager.disable(mcp.name)
self.tui._update_header()
except Exception as e:
logging.getLogger(__name__).exception("Failed to update mcp description pane: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to update mcp description: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about mcp desc update failure: %s", e)
except Exception as e:
logging.getLogger(__name__).exception("Unhandled error in on_mcp_selected: %s", e)
logging.getLogger(__name__).exception("Toggle failed: %s", e)
try:
from ..interface.notifier import notify
notify("error", f"Failed to toggle {mcp.name}: {e}")
except Exception:
pass
return
notify("warning", f"TUI: error handling mcp selection: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about mcp selection error: %s", e)
# Refresh UI
self._refresh_description()
# ------------------------------------------------------------
def _refresh_description(self):
"""Rebuild right-hand side using current selection."""
if self.selected_server:
# Simulate reselecting same node
dummy = type("Node", (), {"data": {"server": self.selected_server}})
event = type("Evt", (), {"node": dummy})
self.on_mcp_selected(event)
# ------------------------------------------------------------
@on(Button.Pressed, "#mcp-close")
def close_mcp(self) -> None:
self.app.pop_screen()
# ----- Main Chat Message Widgets -----
@@ -2099,7 +2161,7 @@ Be concise. Use the actual data from notes."""
elif cmd_lower == "/tools":
# Open the interactive tools browser (split-pane).
try:
await self.push_screen(ToolsScreen(tools=self.all_tools))
await self.push_screen(ToolsScreen(tools=self.agent.get_tools()))
except Exception:
# Fallback: list tools in the system area if UI push fails
from ..runtime.runtime import detect_environment
@@ -2455,7 +2517,7 @@ Be concise. Use the actual data from notes."""
# Open the interactive mcp browser (split-pane).
try:
await self.push_screen(MCPScreen(mcp_manager=self.mcp_manager))
await self.push_screen(MCPScreen(mcp_manager=self.mcp_manager, agent=self.agent, tui=self))
except Exception:
pass
elif action.startswith("add"):
@@ -2629,7 +2691,7 @@ Be concise. Use the actual data from notes."""
else:
# try to recreate a compact model/runtime line
runtime_str = "Docker" if getattr(self, "use_docker", False) else "Local"
tools_count = len(getattr(self, "all_tools", [])) if hasattr(self, "all_tools") else 0
tools_count = len(self.agent.get_tools())
mode = getattr(self, '_mode', 'assist')
if mode == 'assist':
mode = 'assist (use /agent or /crew for autonomous modes)'

View File

@@ -41,19 +41,36 @@ class MCPServer:
name: str
config: MCPServerConfig
transport: MCPTransport
transport: Optional[MCPTransport]
tools: List[dict] = field(default_factory=list)
connected: bool = False
# Lock for serializing all communication with this server
# Prevents message ID collisions and transport interleaving
_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
last_error: str = "" #If something went wrong, get the last error message.
async def disconnect(self):
"""Disconnect from the server."""
if self.connected:
await self.transport.disconnect()
if self.transport:
await self.transport.disconnect()
self.connected = False
def is_enabled(self) -> bool:
return self.config.enabled
def enable(self):
self.config.enabled = True
def disable(self):
self.config.enabled = False
def get_logs(self) -> str:
if not self.transport:
return ""
return self.transport.get_logs()
class MCPManager:
"""Manages MCP server connections and exposes tools to agents."""
@@ -236,7 +253,14 @@ class MCPManager:
except Exception:
pass
print(f"[MCP] Failed to connect to {config.name}: {e}")
return None
return MCPServer(
name=config.name,
config=config,
transport=None,
tools=[],
connected=False,
last_error=str(e)
)
async def call_tool(self, server_name: str, tool_name: str, arguments: dict) -> Any:
server = self.servers.get(server_name)
@@ -293,3 +317,25 @@ class MCPManager:
def is_connected(self, name: str) -> bool:
server = self.servers.get(name)
return server is not None and server.connected
async def enable(self, name: str):
server = self.servers.get(name)
if not server:
return
server.enable()
server = await self._connect_server(server.config)
if server: #Do a shadow copy.
self.servers[name].name = server.name
self.servers[name].config = server.config
self.servers[name].connected = server.connected
self.servers[name].last_error = server.last_error
self.servers[name].tools = server.tools
self.servers[name].transport = server.transport
async def disable(self, name: str):
server = self.servers.get(name)
if not server:
return
await server.disconnect()
server.disable()

View File

@@ -31,6 +31,10 @@ class MCPTransport(ABC):
"""Check if the transport is connected."""
pass
@abstractmethod
async def get_logs(self) -> str:
pass
class StdioTransport(MCPTransport):
"""MCP transport over stdio (for npx/uvx commands)."""
@@ -49,11 +53,28 @@ class StdioTransport(MCPTransport):
self.env = env
self.process: Optional[asyncio.subprocess.Process] = None
self._lock = asyncio.Lock()
self.logstask = None
self.logs = ""
def get_logs(self) -> str:
return self.logs
@property
def is_connected(self) -> bool:
"""Check if the process is running."""
return self.process is not None and self.process.returncode is None
async def _read_stderr_loop(self):
try:
while True:
line = await self.process.stderr.readline()
if not line:
break
self.logs += line.decode().rstrip() + "\n"
except asyncio.CancelledError:
# Optional: do any cleanup here
pass
async def connect(self):
"""Start the MCP server process."""
@@ -84,6 +105,9 @@ class StdioTransport(MCPTransport):
limit=1024 * 1024, # 1MB buffer limit for large MCP responses
)
self.logstask = asyncio.create_task(self._read_stderr_loop())
async def send(self, message: dict, timeout: float = 15.0) -> dict:
"""
Send a JSON-RPC message and wait for response.
@@ -129,6 +153,15 @@ class StdioTransport(MCPTransport):
if not self.process:
return
if self.logstask:
self.logstask.cancel()
try:
await self.logstask
except asyncio.CancelledError:
pass # Task was successfully cancelled
proc = self.process
self.process = None
@@ -180,6 +213,9 @@ class SSETransport(MCPTransport):
"""Check if the session is active."""
return self._connected and self.session is not None
def get_logs(self) -> str:
return ""
async def connect(self):
"""Connect to the SSE endpoint."""
try: