feat: Added support for SSE transport for MCP servers.

Tested with n8n MCP server node.
This commit is contained in:
famez
2026-02-22 13:49:46 +01:00
parent da17546de4
commit 86aa316007
21 changed files with 308 additions and 274 deletions

View File

@@ -0,0 +1,4 @@
# SSE examples
## Description
In this section, you will find some examples to integrate SSE MCP servers with the agent.

View File

@@ -0,0 +1,47 @@
# N8N MCP integration
## Steps
1. Adjust the mcp_servers.json to point to your MCP server in n8n.
``` json
{
"mcpServers": {
"n8n": {
"type": "sse",
"url": "http://192.168.0.19:5678/mcp/64c33586-ce0b-493c-b0af-106b90f843bf"
}
}
}
```
For authentication:
``` json
{
"mcpServers": {
"n8n": {
"type": "sse",
"url": "http://192.168.0.19:5678/mcp/64c33586-ce0b-493c-b0af-106b90f843bf",
"bearer": "aeiou"
}
}
}
```
2. Execute the docker commands.
```bash
docker-compose build
docker-compose run --rm pentestagent
```
The n8n MCP server should be available:
![alt text](image.png)
![alt text](image-1.png)
![alt text](image-2.png)
![alt text](image-3.png)

View File

@@ -0,0 +1,22 @@
services:
pentestagent:
build:
context: ../../../
dockerfile: Dockerfile
container_name: pentestagent_with_n8n_mcp
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- PENTESTAGENT_MODEL=${PENTESTAGENT_MODEL}
- PENTESTAGENT_DEBUG=${PENTESTAGENT_DEBUG:-false}
volumes:
- ./mcp_servers.json:/app/mcp_servers.json
networks:
- pentestagent-net
stdin_open: true
tty: true
networks:
pentestagent-net:
driver: bridge

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 41 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 27 KiB

View File

@@ -0,0 +1,8 @@
{
"mcpServers": {
"n8n": {
"type": "sse",
"url": "http://192.168.0.19:5678/mcp/64c33586-ce0b-493c-b0af-106b90f843bf"
}
}
}

View File

@@ -0,0 +1,4 @@
# STDIO examples
## Description
In this section, you will find some examples to integrate STDIO MCP servers with the agent.

View File

@@ -10,7 +10,7 @@ WORKDIR /app
RUN pip install --no-cache-dir -r MCP-Kali-Server/requirements.txt
COPY mcp_servers.json /app/pentestagent/mcp/
COPY --chown=pentestagent:pentestagent mcp_servers.json /app/pentestagent/mcp/
# Expose any needed ports
EXPOSE 8080

View File

Before

Width:  |  Height:  |  Size: 364 KiB

After

Width:  |  Height:  |  Size: 364 KiB

View File

Before

Width:  |  Height:  |  Size: 46 KiB

After

Width:  |  Height:  |  Size: 46 KiB

View File

@@ -2,7 +2,7 @@
"mcpServers": {
"kali": {
"command": "python",
"args": ["MCP-Kali-Server/mcp_server.py", "--server", "http://192.168.56.108:5000"]
"args": ["MCP-Kali-Server/mcp_server.py", "--server", "http://192.168.0.22:5000"]
}
}
}

View File

@@ -447,7 +447,7 @@ def handle_mcp_command(args: argparse.Namespace):
console.print(f"\nConfig file: {manager.config_path}")
elif args.mcp_command == "add":
manager.add_server(
manager.add_stdio_server(
name=args.name,
command=args.command,
args=args.args or [],

View File

@@ -455,7 +455,7 @@ class MCPScreen(ModalScreen):
MCPScreen { align: center middle; }
"""
from ..mcp import MCPManager
from ..mcp import MCPManager, MCPServerConfig, StdioServerConfig, SSEServerConfig
from ..agents.pa_agent import PentestAgentAgent
def __init__(self, mcp_manager: MCPManager, agent: PentestAgentAgent, tui: "PentestAgentTUI") -> None:
@@ -479,15 +479,14 @@ class MCPScreen(ModalScreen):
# RIGHT SIDE
with Vertical(id="mcp-right"):
yield Static("Description", id="mcp-desc-title")
# ---- Toggle Button ----
yield Button("Enabled: 🔴", id="mcp-toggle-enabled")
yield Static("Description", id="mcp-desc-title")
yield ScrollableContainer(
Static("Select a MCP server to view details.", id="mcp-desc"),
id="mcp-desc-scroll"
)
yield Center(Button("Close", id="mcp-close"))
@@ -525,6 +524,8 @@ class MCPScreen(ModalScreen):
@on(Tree.NodeSelected, "#mcp-tree")
def on_mcp_selected(self, event: Tree.NodeSelected) -> None:
from ..mcp import MCPServer, StdioServerConfig, SSEServerConfig
node = event.node
self.selected_server = node.data.get("server") if node.data else None
@@ -536,13 +537,19 @@ class MCPScreen(ModalScreen):
text = Text()
if self.selected_server is not None:
mcp = self.selected_server
mcp : MCPServer = self.selected_server
text.append(f"{mcp.name}\n", style="bold #d4d4d4")
text.append(f"{mcp.config.description}\n\n", style="#d4d4d4")
text.append(f"Command: {mcp.config.command}\n", style="#9a9a9a")
text.append(f"Args: {mcp.config.args}\n\n", style="#9a9a9a")
text.append(f"Type: {mcp.config.type}\n", style="#9a9a9a")
if isinstance(mcp.config, SSEServerConfig):
text.append(f"URL: {mcp.config.url}\n", style="#9a9a9a")
elif isinstance(mcp.config, StdioServerConfig):
text.append(f"Command: {mcp.config.command}\n", style="#9a9a9a")
text.append(f"Args: {mcp.config.args}\n\n", style="#9a9a9a")
enabled_icon = "🟢" if mcp.config.enabled else "🔴"
connected_icon = "🟢" if mcp.connected else "🔴"
@@ -1591,16 +1598,23 @@ class PentestAgentTUI(App):
try:
loop = asyncio.get_running_loop()
loop.create_task(load_mcp())
except RuntimeError:
except RuntimeError as e:
# No running loop (unlikely in Textual worker), run in thread
import traceback
self._add_system(f"[!] Init failed: {e}\n{traceback.format_exc()}")
self._set_status("error")
try:
asyncio.run(load_mcp())
except Exception:
pass
except Exception as e:
import traceback
self._add_system(f"[!] Init failed: {e}\n{traceback.format_exc()}")
self._set_status("error")
mcp_server_count = len(self.mcp_manager.list_configured_servers())
except Exception:
except Exception as e:
self.mcp_manager = None
mcp_server_count = 0
self._add_system(f"[!] Init failed: {e}\n{traceback.format_exc()}")
self._set_status("error")
@@ -2565,32 +2579,46 @@ Be concise. Use the actual data from notes."""
# Parse the args string into individual components
parts = args.split()
if len(parts) < 2:
self._add_system("Usage: /mcp add <name> <command> [args...]")
self._add_system("Usage: /mcp add <type> <name> <command|url> [args...]")
return
name = parts[0]
command = parts[1]
mcp_type = parts[0]
name = parts[1]
command_or_url = parts[1]
mcp_args = parts[2:] if len(parts) > 2 else []
self.mcp_manager.add_server(
name=name,
command=command,
args=mcp_args,
)
if not self.mcp_manager:
return
if mcp_type == "sse":
self.mcp_manager.add_sse_server(
name=name,
url=command_or_url,
)
else:
self.mcp_manager.add_stdio_server(
name=name,
command=command_or_url,
args=mcp_args,
)
server = await self.mcp_manager.connect_server(name)
self.mcp_server_count = len(self.mcp_manager.list_configured_servers())
tools = self.mcp_manager.create_mcp_tools_from_server(server)
if server:
self.agent.add_tools(tools)
tools = self.mcp_manager.create_mcp_tools_from_server(server)
if self.agent:
self.agent.add_tools(tools)
for tool in tools:
register_tool_instance(tool)
for tool in tools:
register_tool_instance(tool)
self.all_tools = get_all_tools()
self._update_header()
self.all_tools = get_all_tools()
self._update_header()
if not action:

View File

@@ -1,17 +1,17 @@
"""MCP (Model Context Protocol) integration for PentestAgent."""
from .discovery import MCPDiscovery
from .manager import MCPManager, MCPServer, MCPServerConfig
from .manager import MCPManager, MCPServer, MCPServerConfig, StdioServerConfig, SSEServerConfig
from .tools import create_mcp_tool
from .transport import MCPTransport, SSETransport, StdioTransport
__all__ = [
"MCPManager",
"MCPServerConfig",
"StdioServerConfig",
"SSEServerConfig",
"MCPServer",
"MCPTransport",
"StdioTransport",
"SSETransport",
"create_mcp_tool",
"MCPDiscovery",
]

View File

@@ -1,204 +0,0 @@
"""MCP tool discovery for PentestAgent."""
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
@dataclass
class DiscoveredServer:
"""A discovered MCP server."""
name: str
description: str
type: str # "stdio" or "sse"
command: Optional[str] = None
args: Optional[List[str]] = None
url: Optional[str] = None
tools: List[dict] = None
def __post_init__(self):
if self.tools is None:
self.tools = []
class MCPDiscovery:
"""Discovers available MCP servers and tools."""
# Known MCP servers for security tools
KNOWN_SERVERS = [
{
"name": "nmap",
"description": "Network scanning and host discovery",
"type": "stdio",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-nmap"],
},
{
"name": "filesystem",
"description": "File system operations",
"type": "stdio",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem"],
},
{
"name": "fetch",
"description": "HTTP requests and web fetching",
"type": "stdio",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-fetch"],
},
]
def __init__(self, config_path: Path = Path("mcp.json")):
"""
Initialize MCP discovery.
Args:
config_path: Path to the MCP configuration file
"""
self.config_path = config_path
def discover_local(self) -> List[DiscoveredServer]:
"""
Discover locally installed MCP servers.
Returns:
List of discovered servers
"""
discovered = []
# Check for npm global packages
# Check for Python packages
# This is a simplified implementation
for server_info in self.KNOWN_SERVERS:
discovered.append(DiscoveredServer(**server_info))
return discovered
def load_from_config(self) -> List[Dict[str, Any]]:
"""
Load server configurations from file.
Returns:
List of server configurations
"""
if not self.config_path.exists():
return []
try:
config = json.loads(self.config_path.read_text(encoding="utf-8"))
return config.get("servers", [])
except json.JSONDecodeError:
return []
def add_server_to_config(
self,
name: str,
server_type: str,
command: Optional[str] = None,
args: Optional[List[str]] = None,
url: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
) -> bool:
"""
Add a server to the configuration file.
Args:
name: Server name
server_type: "stdio" or "sse"
command: Command for stdio servers
args: Arguments for stdio servers
url: URL for SSE servers
env: Environment variables
Returns:
True if added successfully
"""
# Load existing config
if self.config_path.exists():
try:
config = json.loads(self.config_path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
config = {"servers": []}
else:
config = {"servers": []}
# Check if server already exists
for existing in config["servers"]:
if existing.get("name") == name:
return False
# Build server config
server_config = {"name": name, "type": server_type, "enabled": True}
if server_type == "stdio":
server_config["command"] = command
server_config["args"] = args or []
if env:
server_config["env"] = env
elif server_type == "sse":
server_config["url"] = url
config["servers"].append(server_config)
# Save config
self.config_path.write_text(json.dumps(config, indent=2), encoding="utf-8")
return True
def remove_server_from_config(self, name: str) -> bool:
"""
Remove a server from the configuration file.
Args:
name: Server name to remove
Returns:
True if removed successfully
"""
if not self.config_path.exists():
return False
try:
config = json.loads(self.config_path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return False
original_count = len(config.get("servers", []))
config["servers"] = [
s for s in config.get("servers", []) if s.get("name") != name
]
if len(config["servers"]) == original_count:
return False
self.config_path.write_text(json.dumps(config, indent=2), encoding="utf-8")
return True
def generate_default_config(self) -> Dict[str, Any]:
"""
Generate a default MCP configuration.
Returns:
Default configuration dictionary
"""
return {
"servers": [
{
"name": "filesystem",
"type": "stdio",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
"enabled": True,
}
]
}
def save_default_config(self):
"""Save the default configuration to file."""
config = self.generate_default_config()
self.config_path.write_text(json.dumps(config, indent=2), encoding="utf-8")

View File

@@ -18,23 +18,49 @@ import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from abc import ABC
from .tools import create_mcp_tool
from .transport import MCPTransport, StdioTransport
from .transport import MCPTransport, StdioTransport, SSETransport
@dataclass
class MCPServerConfig:
"""Configuration for an MCP server."""
class MCPServerConfig(ABC):
"""Base configuration for an MCP server."""
type: str = field(init=False)
name: str
command: str
args: List[str] = field(default_factory=list)
env: Dict[str, str] = field(default_factory=dict)
enabled: bool = True
description: str = ""
@dataclass
class StdioServerConfig(MCPServerConfig):
"""Configuration for a stdio-based MCP server."""
command: str = ""
args: List[str] = field(default_factory=list)
env: Dict[str, str] = field(default_factory=dict)
def __post_init__(self):
self.type = "stdio"
@dataclass
class SSEServerConfig(MCPServerConfig):
"""Configuration for an SSE-based MCP server."""
url: str = ""
bearer: str = ""
def __post_init__(self):
self.type = "sse"
def set_bearer(self, bearer: str) -> None:
self.bearer = bearer
"""Configuration for an MCP server."""
@dataclass
class MCPServer:
"""Represents a connected MCP server."""
@@ -104,18 +130,39 @@ class MCPManager:
raw = json.loads(self.config_path.read_text(encoding="utf-8"))
servers = {}
mcp_servers = raw.get("mcpServers", {})
for name, config in mcp_servers.items():
if not config.get("command"):
continue
servers[name] = MCPServerConfig(
name=name,
command=config["command"],
args=config.get("args", []),
env=config.get("env", {}),
enabled=config.get("enabled", True),
description=config.get("description", ""),
)
if config.get("type") and config["type"] == "sse":
if not config.get("url"):
continue #Improper configuration
servers[name] = SSEServerConfig(
name=name,
url=config.get("url", ""),
enabled=config.get("enabled", True),
description=config.get("description", ""),
)
if config.get("bearer"):
servers[name].set_bearer(config["bearer"])
else:
if not config.get("command"):
continue
servers[name] = StdioServerConfig(
name=name,
command=config.get("command", ""),
args=config.get("args", []),
env=config.get("env", {}),
enabled=config.get("enabled", True),
description=config.get("description", ""),
)
return servers
except json.JSONDecodeError as e:
print(f"[MCP] Error loading config: {e}")
return {}
@@ -123,18 +170,29 @@ class MCPManager:
def _save_config(self, servers: Dict[str, MCPServerConfig]):
config = {"mcpServers": {}}
for name, server in servers.items():
server_config = {"command": server.command, "args": server.args}
if server.env:
server_config["env"] = server.env
server_config: dict[str, Any] = {"type": server.type}
if server.description:
server_config["description"] = server.description
if not server.enabled:
server_config["enabled"] = False
if isinstance(server, SSEServerConfig):
server_config["url"] = server.url
if server.bearer:
server_config["bearer"] = server.bearer
elif isinstance(server, StdioServerConfig):
server_config["command"] = server.command
server_config["args"] = server.args
if server.env:
server_config["env"] = server.env
config["mcpServers"][name] = server_config
self.config_path.parent.mkdir(parents=True, exist_ok=True)
self.config_path.write_text(json.dumps(config, indent=2), encoding="utf-8")
def add_server(
def add_stdio_server(
self,
name: str,
command: str,
@@ -143,15 +201,30 @@ class MCPManager:
description: str = "",
):
servers = self._load_config()
servers[name] = MCPServerConfig(
servers[name] = StdioServerConfig(
name=name,
command=command,
command=command or "",
args=args or [],
env=env or {},
description=description,
)
self._save_config(servers)
print(f"[MCP] Added server: {name}")
print(f"[MCP] Added server: {name}, stdio type")
def add_sse_server(
self,
name: str,
url: str,
description: str = "",
):
servers = self._load_config()
servers[name] = SSEServerConfig(
name=name,
url=url or "",
description=description,
)
self._save_config(servers)
print(f"[MCP] Added server: {name}, sse type")
def remove_server(self, name: str) -> bool:
servers = self._load_config()
@@ -166,9 +239,11 @@ class MCPManager:
return [
{
"name": n,
"command": s.command,
"args": s.args,
"env": s.env,
"type": s.type,
"command": getattr(s, 'command', None),
"url": getattr(s, 'url', None),
"args": getattr(s, 'args', None),
"env": getattr(s, 'env', None),
"enabled": s.enabled,
"description": s.description,
"connected": n in self.servers and self.servers[n].connected,
@@ -211,10 +286,18 @@ class MCPManager:
async def _connect_server(self, config: MCPServerConfig) -> Optional[MCPServer]:
transport = None
try:
env = {**os.environ, **config.env}
transport = StdioTransport(
command=config.command, args=config.args, env=env
)
if isinstance(config, SSEServerConfig):
transport = SSETransport(url=config.url, bearer=config.bearer)
elif isinstance(config, StdioServerConfig):
env = {**os.environ, **config.env}
transport = StdioTransport(
command=config.command, args=config.args, env=env
)
if transport is None:
raise RuntimeError("Failed to create transport")
await transport.connect()
await transport.send(
@@ -222,7 +305,7 @@ class MCPManager:
"jsonrpc": "2.0",
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"protocolVersion": "2025-11-25",
"capabilities": {},
"clientInfo": {"name": "pentestagent", "version": "0.2.0"},
},

View File

@@ -32,7 +32,7 @@ class MCPTransport(ABC):
pass
@abstractmethod
async def get_logs(self) -> str:
def get_logs(self) -> str:
pass
@@ -197,7 +197,7 @@ class StdioTransport(MCPTransport):
class SSETransport(MCPTransport):
"""MCP transport over Server-Sent Events (HTTP)."""
def __init__(self, url: str):
def __init__(self, url: str, bearer: str = ""):
"""
Initialize SSE transport.
@@ -207,6 +207,8 @@ class SSETransport(MCPTransport):
self.url = url
self.session: Optional[Any] = None # aiohttp.ClientSession
self._connected = False
self._logs = ""
self._bearer = bearer
@property
def is_connected(self) -> bool:
@@ -214,21 +216,29 @@ class SSETransport(MCPTransport):
return self._connected and self.session is not None
def get_logs(self) -> str:
return ""
return self._logs
async def connect(self):
"""Connect to the SSE endpoint."""
try:
import aiohttp
self.session = aiohttp.ClientSession()
headers={
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream"
}
if self._bearer:
headers["Authorization"] = f"Bearer {self._bearer}"
self.session = aiohttp.ClientSession(headers = headers)
self._connected = True
except ImportError as e:
raise RuntimeError(
"aiohttp is required for SSE transport. Install with: pip install aiohttp"
) from e
async def send(self, message: dict) -> dict:
async def send(self, message: dict, timeout: float = 15.0) -> dict:
"""
Send a message via HTTP POST.
@@ -242,16 +252,48 @@ class SSETransport(MCPTransport):
raise RuntimeError("Transport not connected")
try:
import aiohttp
headers_str = "\n".join(f"{k}: {v}" for k, v in self.session.headers.items())
self._logs += f"Request headers: {headers_str}" + "\n"
async with self.session.post(
self.url, json=message, headers={"Content-Type": "application/json"}
self.url, json=message, timeout=aiohttp.ClientTimeout(total=10)
) as response:
if response.status != 200:
if response.status != 200 and response.status != 202:
raise RuntimeError(f"HTTP error: {response.status}")
return await response.json()
#https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#session-management
mcp_session_id = response.headers.get("mcp-session-id")
if mcp_session_id:
self.session.headers.update({ "mcp-session-id" : mcp_session_id })
content_type = response.headers.get("Content-Type", "")
headers_str = "\n".join(f"{k}: {v}" for k, v in response.headers.items())
self._logs += f"Response headers: {headers_str}\nResponse status: {response.status}\n"
if "application/json" in content_type:
return await response.json()
elif "text/event-stream" in content_type:
text = await response.text()
# Parse SSE format: "data: {json}\n"
for line in text.split("\n"):
if line.startswith("data:"):
try:
return json.loads(line[len("data:"):].strip())
except json.JSONDecodeError:
pass
raise RuntimeError("No valid data field in SSE response")
return dict()
except ImportError as e:
raise RuntimeError(
"aiohttp is required for SSE transport. Install with: pip install aiohttp"
) from e
except Exception as e:
raise RuntimeError(f"SSE request failed: {e}") from e
async def disconnect(self):
"""Close the HTTP session."""
@@ -291,7 +333,7 @@ class WebSocketTransport(MCPTransport):
except ImportError as e:
raise RuntimeError("aiohttp is required for WebSocket transport") from e
async def send(self, message: dict) -> dict:
async def send(self, message: dict, timeout: float = 15.0) -> dict:
"""
Send a message via WebSocket.