Merge branch 'dev'

This commit is contained in:
famez
2026-02-12 19:16:36 +01:00
11 changed files with 293 additions and 62 deletions

3
.gitignore vendored
View File

@@ -93,3 +93,6 @@ tests/test_*.py
# Workspaces directory (user data should not be committed)
/workspaces/
loot/token_usage.json
mcp_examples/kali/mcp_servers.json
pentestagent/mcp/mcp_servers.json

4
mcp_examples/README.md Normal file
View File

@@ -0,0 +1,4 @@
# MCP examples
## Description
In this section, you will find some examples to integrate MCP with the agent.

View File

@@ -0,0 +1,19 @@
FROM pentestagent:latest
RUN git clone https://github.com/Wh0am123/MCP-Kali-Server.git
WORKDIR /app/MCP-Kali-Server/
RUN git checkout 61272cde048c236cc61c75e2fac3c06243c633dd
WORKDIR /app
RUN pip install --no-cache-dir -r MCP-Kali-Server/requirements.txt
COPY mcp_servers.json /app/pentestagent/mcp/
# Expose any needed ports
EXPOSE 8080
# Default command
CMD ["python", "-m", "pentestagent"]

View File

@@ -0,0 +1,49 @@
# KALI MCP integration
## Steps
Brief description of what this project does.
1. Compile base Docker image.
```bash
docker build ../../ -t pentestagent:latest
```
2. Configure .env file with API tokens and models for AI.
```
OPENAI_API_KEY=sk-...
PENTESTAGENT_MODEL=gpt-5
```
3. Configure the IP address or hostname in the mcp_servers.json file:
``` json
{
"mcpServers": {
"kali": {
"command": "python",
"args": ["MCP-Kali-Server/mcp_server.py", "--server", "http://<hostname>:5000"]
}
}
}
```
4. Launch docker-compose
```bash
# Build
docker-compose build
# Run
docker-compose run --rm pentestagent
```
At this point, you will be able to check the MCP servers with the /mcp list command
![alt text](image.png)
On server side, there should be a connection incoming:
![alt text](image-1.png)

View File

@@ -0,0 +1,21 @@
services:
pentestagent:
build:
context: .
dockerfile: Dockerfile
container_name: pentestagent_with_kali_mcp
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- PENTESTAGENT_MODEL=${PENTESTAGENT_MODEL}
- PENTESTAGENT_DEBUG=${PENTESTAGENT_DEBUG:-false}
networks:
- pentestagent-net
stdin_open: true
tty: true
networks:
pentestagent-net:
driver: bridge

Binary file not shown.

After

Width:  |  Height:  |  Size: 364 KiB

BIN
mcp_examples/kali/image.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

View File

@@ -838,6 +838,18 @@ Call create_plan with the new steps OR feasible=False."""
def add_tools(self, tools : List["Tool"]):
self.tools.extend(tools)
def delete_tools(self, tools: List["Tool"]):
"""
Remove tools from the agent by name.
Args:
tools: List of tool names to remove
"""
self.tools = [t for t in self.tools if t.name not in [tool.name for tool in tools]]
def get_tools(self) -> List["Tool"]:
return self.tools
async def assist(self, message: str) -> AsyncIterator[AgentMessage]:
"""
Assist mode - single LLM call, single tool execution if needed.

View File

@@ -442,57 +442,69 @@ class ToolsScreen(ModalScreen):
self.app.pop_screen()
class MCPScreen(ModalScreen):
"""Interactive MCP browser — split-pane layout.
"""Interactive MCP browser — split-pane layout."""
Left pane: tree of MCP servers. Right pane: full description (scrollable).
"""
BINDINGS = [Binding("escape", "dismiss", "Close"), Binding("q", "dismiss", "Close")]
BINDINGS = [
Binding("escape", "dismiss", "Close"),
Binding("q", "dismiss", "Close"),
]
CSS = """
MCPScreen { align: center middle; }
"""
from ..mcp import MCPManager
def __init__(self, mcp_manager: MCPManager) -> None:
from ..mcp import MCPManager
from ..agents.pa_agent import PentestAgentAgent
def __init__(self, mcp_manager: MCPManager, agent: PentestAgentAgent, tui: "PentestAgentTUI") -> None:
super().__init__()
self.mcp_manager = mcp_manager
self.agent = agent
self.tui = tui
self.selected_server = None
self.selected_tool = None
# ------------------------------------------------------------
def compose(self) -> ComposeResult:
# Build a split view: left tree, right description
with Container(id="mcp-container"):
with Horizontal(id="mcp-split"):
# LEFT SIDE
with Vertical(id="mcp-left"):
yield Static("MCP Servers", id="mcp-title")
yield Tree("MCP Servers", id="mcp-tree")
# RIGHT SIDE
with Vertical(id="mcp-right"):
yield Static("Description", id="mcp-desc-title")
yield ScrollableContainer(Static("Select a MCP server to view details.", id="mcp-desc"), id="mcp-desc-scroll")
# ---- Toggle Button ----
yield Button("Enabled: 🔴", id="mcp-toggle-enabled")
yield ScrollableContainer(
Static("Select a MCP server to view details.", id="mcp-desc"),
id="mcp-desc-scroll"
)
yield Center(Button("Close", id="mcp-close"))
# ------------------------------------------------------------
def on_mount(self) -> None:
try:
tree = self.query_one("#mcp-tree", Tree)
except Exception as e:
logging.getLogger(__name__).exception("Failed to query MCP tree: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to initialize MCP tree: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about MCP tree init failure: %s", e)
return
root = tree.root
root.allow_expand = True
root.show_root = False
# Populate tool nodes
servers = self.mcp_manager.get_all_servers()
for server in servers:
@@ -500,69 +512,117 @@ class MCPScreen(ModalScreen):
for tool in server.tools:
server_node.add(tool['name'], data={"tool": tool})
# Hide toggle button initially
toggle_btn = self.query_one("#mcp-toggle-enabled", Button)
toggle_btn.display = False
try:
tree.focus()
except Exception as e:
logging.getLogger(__name__).exception("Failed to focus MCP tree: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to focus MCP tree: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about MCP tree focus failure: %s", e)
# ------------------------------------------------------------
@on(Tree.NodeSelected, "#mcp-tree")
def on_mcp_selected(self, event: Tree.NodeSelected) -> None:
node = event.node
self.selected_server = node.data.get("server") if node.data else None
self.selected_tool = node.data.get("tool") if node.data else None
desc_widget = self.query_one("#mcp-desc", Static)
toggle_btn = self.query_one("#mcp-toggle-enabled", Button)
text = Text()
if self.selected_server is not None:
mcp = self.selected_server
text.append(f"{mcp.name}\n", style="bold #d4d4d4")
text.append(f"{mcp.config.description}\n\n", style="#d4d4d4")
text.append(f"Command: {mcp.config.command}\n", style="#9a9a9a")
text.append(f"Args: {mcp.config.args}\n\n", style="#9a9a9a")
enabled_icon = "🟢" if mcp.config.enabled else "🔴"
connected_icon = "🟢" if mcp.connected else "🔴"
text.append(f"Connected: {connected_icon}\n", style="#9a9a9a")
if mcp.last_error:
text.append(f"\nLast error: {mcp.last_error}\n", style="#e91b1b")
logs = mcp.get_logs()
if logs:
text.append(f"\n{logs}\n", style="#86e41a")
desc_widget.update(text)
toggle_btn.display = True
toggle_btn.label = f"Enabled: {enabled_icon}"
elif self.selected_tool is not None:
text.append(f"{self.selected_tool['description']}\n", style="#d4d4d4")
desc_widget.update(text)
toggle_btn.display = False
else:
desc_widget.update("Choose a server.")
toggle_btn.display = False
# ------------------------------------------------------------
@on(Button.Pressed, "#mcp-toggle-enabled")
async def toggle_enabled(self) -> None:
mcp = self.selected_server
if mcp is None:
return
try:
mcp = node.data.get("server") if node.data else None
tool = node.data.get("tool") if node.data else None
new_value = not mcp.config.enabled
# Update right-hand description pane
try:
desc_widget = self.query_one("#mcp-desc", Static)
text = Text()
if mcp is not None:
text.append(f"{mcp.name}\n", style="bold #d4d4d4")
text.append(f"{mcp.config.description}\n", style="#d4d4d4")
text.append(f"Command: {mcp.config.command}\n", style="#9a9a9a")
text.append(f"Args: {mcp.config.args}\n", style="#9a9a9a")
enabled_icon = "🟢" if mcp.config.enabled else "🔴"
text.append(f"Enabled: {enabled_icon}\n", style="#9a9a9a")
connected_icon = "🟢" if mcp.connected else "🔴"
text.append(f"Connected: {connected_icon}\n", style="#9a9a9a")
if mcp.last_error:
text.append(f"Last error message: {mcp.last_error}\n", style="#e91b1b")
elif tool is not None:
text.append(f"{tool['description']}\n", style="#d4d4d4")
else:
text.append(f"Choose a server\n", style="#d4d4d4")
desc_widget.update(text)
if new_value:
await self.mcp_manager.enable(mcp.name)
tools = self.mcp_manager.create_mcp_tools_from_server(mcp)
self.agent.add_tools(tools)
else:
tools = self.mcp_manager.create_mcp_tools_from_server(mcp)
self.agent.delete_tools(tools)
await self.mcp_manager.disable(mcp.name)
self.tui._update_header()
except Exception as e:
logging.getLogger(__name__).exception("Failed to update mcp description pane: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to update mcp description: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about mcp desc update failure: %s", e)
except Exception as e:
logging.getLogger(__name__).exception("Unhandled error in on_mcp_selected: %s", e)
logging.getLogger(__name__).exception("Toggle failed: %s", e)
try:
from ..interface.notifier import notify
notify("error", f"Failed to toggle {mcp.name}: {e}")
except Exception:
pass
return
notify("warning", f"TUI: error handling mcp selection: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about mcp selection error: %s", e)
# Refresh UI
self._refresh_description()
# ------------------------------------------------------------
def _refresh_description(self):
"""Rebuild right-hand side using current selection."""
if self.selected_server:
# Simulate reselecting same node
dummy = type("Node", (), {"data": {"server": self.selected_server}})
event = type("Evt", (), {"node": dummy})
self.on_mcp_selected(event)
# ------------------------------------------------------------
@on(Button.Pressed, "#mcp-close")
def close_mcp(self) -> None:
self.app.pop_screen()
# ----- Main Chat Message Widgets -----
@@ -2101,7 +2161,7 @@ Be concise. Use the actual data from notes."""
elif cmd_lower == "/tools":
# Open the interactive tools browser (split-pane).
try:
await self.push_screen(ToolsScreen(tools=self.all_tools))
await self.push_screen(ToolsScreen(tools=self.agent.get_tools()))
except Exception:
# Fallback: list tools in the system area if UI push fails
from ..runtime.runtime import detect_environment
@@ -2457,7 +2517,7 @@ Be concise. Use the actual data from notes."""
# Open the interactive mcp browser (split-pane).
try:
await self.push_screen(MCPScreen(mcp_manager=self.mcp_manager))
await self.push_screen(MCPScreen(mcp_manager=self.mcp_manager, agent=self.agent, tui=self))
except Exception:
pass
elif action.startswith("add"):
@@ -2631,7 +2691,7 @@ Be concise. Use the actual data from notes."""
else:
# try to recreate a compact model/runtime line
runtime_str = "Docker" if getattr(self, "use_docker", False) else "Local"
tools_count = len(getattr(self, "all_tools", [])) if hasattr(self, "all_tools") else 0
tools_count = len(self.agent.get_tools())
mode = getattr(self, '_mode', 'assist')
if mode == 'assist':
mode = 'assist (use /agent or /crew for autonomous modes)'

View File

@@ -66,6 +66,11 @@ class MCPServer:
def disable(self):
self.config.enabled = False
def get_logs(self) -> str:
if not self.transport:
return ""
return self.transport.get_logs()
class MCPManager:
"""Manages MCP server connections and exposes tools to agents."""
@@ -312,3 +317,25 @@ class MCPManager:
def is_connected(self, name: str) -> bool:
server = self.servers.get(name)
return server is not None and server.connected
async def enable(self, name: str):
server = self.servers.get(name)
if not server:
return
server.enable()
server = await self._connect_server(server.config)
if server: #Do a shadow copy.
self.servers[name].name = server.name
self.servers[name].config = server.config
self.servers[name].connected = server.connected
self.servers[name].last_error = server.last_error
self.servers[name].tools = server.tools
self.servers[name].transport = server.transport
async def disable(self, name: str):
server = self.servers.get(name)
if not server:
return
await server.disconnect()
server.disable()

View File

@@ -31,6 +31,10 @@ class MCPTransport(ABC):
"""Check if the transport is connected."""
pass
@abstractmethod
async def get_logs(self) -> str:
pass
class StdioTransport(MCPTransport):
"""MCP transport over stdio (for npx/uvx commands)."""
@@ -49,11 +53,28 @@ class StdioTransport(MCPTransport):
self.env = env
self.process: Optional[asyncio.subprocess.Process] = None
self._lock = asyncio.Lock()
self.logstask = None
self.logs = ""
def get_logs(self) -> str:
return self.logs
@property
def is_connected(self) -> bool:
"""Check if the process is running."""
return self.process is not None and self.process.returncode is None
async def _read_stderr_loop(self):
try:
while True:
line = await self.process.stderr.readline()
if not line:
break
self.logs += line.decode().rstrip() + "\n"
except asyncio.CancelledError:
# Optional: do any cleanup here
pass
async def connect(self):
"""Start the MCP server process."""
@@ -84,6 +105,9 @@ class StdioTransport(MCPTransport):
limit=1024 * 1024, # 1MB buffer limit for large MCP responses
)
self.logstask = asyncio.create_task(self._read_stderr_loop())
async def send(self, message: dict, timeout: float = 15.0) -> dict:
"""
Send a JSON-RPC message and wait for response.
@@ -129,6 +153,15 @@ class StdioTransport(MCPTransport):
if not self.process:
return
if self.logstask:
self.logstask.cancel()
try:
await self.logstask
except asyncio.CancelledError:
pass # Task was successfully cancelled
proc = self.process
self.process = None
@@ -180,6 +213,9 @@ class SSETransport(MCPTransport):
"""Check if the session is active."""
return self._connected and self.session is not None
def get_logs(self) -> str:
return ""
async def connect(self):
"""Connect to the SSE endpoint."""
try: