mcp: add MetasploitMCP adapter, installer, and config; add LAUNCH_METASPLOIT_MCP env var

This commit is contained in:
giveen
2026-01-14 12:57:53 -07:00
parent 57a0e6e7c8
commit 97df933f42
5 changed files with 383 additions and 41 deletions

View File

@@ -110,28 +110,50 @@ class MCPManager:
start_on_launch=config.get("start_on_launch", False),
description=config.get("description", ""),
)
# Allow override via environment variable: LAUNCH_HEXTRIKE or LAUNCH_HEXSTRIKE
# If set to a truthy value (1,true,y), force-enable auto-start for vendored HexStrike.
# If set to a falsy value (0,false,n), force-disable auto-start for vendored HexStrike.
launch_env = os.environ.get("LAUNCH_HEXTRIKE") or os.environ.get("LAUNCH_HEXSTRIKE")
if launch_env is not None:
# Allow override via environment variables for vendored MCP servers.
# Per-adapter overrides supported:
# - Hexstrike: LAUNCH_HEXTRIKE or LAUNCH_HEXSTRIKE
# - Metasploit: LAUNCH_METASPLOIT_MCP
# If set to a truthy value (1,true,y), force-enable auto-start for matching vendored server.
# If set to a falsy value (0,false,n), force-disable auto-start for matching vendored server.
def _apply_launch_override(env_names, match_fn):
launch_env = None
for e in env_names:
launch_env = os.environ.get(e)
if launch_env is not None:
break
if launch_env is None:
return
v = str(launch_env).strip().lower()
enable = v in ("1", "true", "yes", "y")
disable = v in ("0", "false", "no", "n")
for name, cfg in servers.items():
lowered = name.lower() if name else ""
is_hex = (
"hexstrike" in lowered
or (cfg.command and "third_party/hexstrike" in str(cfg.command))
or any("third_party/hexstrike" in str(a) for a in (cfg.args or []))
)
if not is_hex:
try:
if not match_fn(name, cfg):
continue
if enable:
cfg.start_on_launch = True
elif disable:
cfg.start_on_launch = False
except Exception:
continue
if enable:
cfg.start_on_launch = True
elif disable:
cfg.start_on_launch = False
# Hexstrike override
_apply_launch_override(["LAUNCH_HEXTRIKE", "LAUNCH_HEXSTRIKE"],
lambda name, cfg: (
(name or "").lower().find("hexstrike") != -1
or (cfg.command and "third_party/hexstrike" in str(cfg.command))
or any("third_party/hexstrike" in str(a) for a in (cfg.args or []))
))
# Metasploit override
_apply_launch_override(["LAUNCH_METASPLOIT_MCP"],
lambda name, cfg: (
(name or "").lower().find("metasploit") != -1
or (cfg.command and "third_party/MetasploitMCP" in str(cfg.command))
or any("third_party/MetasploitMCP" in str(a) for a in (cfg.args or []))
))
return servers
except json.JSONDecodeError as e:
@@ -277,14 +299,21 @@ class MCPManager:
async def connect_all(self) -> List[Any]:
servers_config = self._load_config()
# Respect explicit LAUNCH_HEXTRIKE/LAUNCH_HEXSTRIKE env override.
# If set to a falsy value (0/false/no/n) we will skip connecting to vendored HexStrike servers.
launch_env = os.environ.get("LAUNCH_HEXTRIKE") or os.environ.get("LAUNCH_HEXSTRIKE")
launch_disabled = False
if launch_env is not None:
v = str(launch_env).strip().lower()
# Respect explicit LAUNCH_* env overrides for vendored MCP servers.
# If set to a falsy value (0/false/no/n) we will skip connecting to matching vendored servers.
launch_hex_env = os.environ.get("LAUNCH_HEXTRIKE") or os.environ.get("LAUNCH_HEXSTRIKE")
launch_hex_disabled = False
if launch_hex_env is not None:
v = str(launch_hex_env).strip().lower()
if v in ("0", "false", "no", "n"):
launch_disabled = True
launch_hex_disabled = True
launch_msf_env = os.environ.get("LAUNCH_METASPLOIT_MCP")
launch_msf_disabled = False
if launch_msf_env is not None:
v = str(launch_msf_env).strip().lower()
if v in ("0", "false", "no", "n"):
launch_msf_disabled = True
all_tools = []
for name, config in servers_config.items():
@@ -300,27 +329,49 @@ class MCPManager:
if launch_disabled and is_hex:
print(f"[MCP] Skipping auto-connection for {name} due to LAUNCH_HEXTRIKE={launch_env}")
continue
# Optionally auto-start vendored servers (e.g., HexStrike subtree)
# Optionally auto-start vendored servers (e.g., HexStrike subtree or MetasploitMCP)
if getattr(config, "start_on_launch", False):
try:
# Attempt to auto-start vendored HexStrike if present
if "third_party/hexstrike" in " ".join(config.args) or (
config.command and "third_party/hexstrike" in config.command
):
try:
from .hexstrike_adapter import HexstrikeAdapter
args_joined = " ".join(config.args or [])
cmd_str = config.command or ""
adapter = HexstrikeAdapter()
started = await adapter.start()
if started:
# remember adapter so we can stop it later
try:
self._started_adapters[name] = adapter
except Exception:
pass
print(f"[MCP] Auto-started vendored server for {name}")
except Exception as e:
print(f"[MCP] Failed to auto-start vendored server {name}: {e}")
# Hexstrike auto-start
if "third_party/hexstrike" in args_joined or (cmd_str and "third_party/hexstrike" in cmd_str):
if not launch_hex_disabled:
try:
from .hexstrike_adapter import HexstrikeAdapter
adapter = HexstrikeAdapter()
started = await adapter.start()
if started:
try:
self._started_adapters[name] = adapter
except Exception:
pass
print(f"[MCP] Auto-started vendored server for {name}")
except Exception as e:
print(f"[MCP] Failed to auto-start vendored server {name}: {e}")
else:
print(f"[MCP] Skipping auto-start for {name} due to LAUNCH_HEXTRIKE override")
# Metasploit auto-start
if "third_party/MetasploitMCP" in args_joined or (cmd_str and "third_party/MetasploitMCP" in cmd_str) or (name and "metasploit" in name.lower()):
if not launch_msf_disabled:
try:
from .metasploit_adapter import MetasploitAdapter
adapter = MetasploitAdapter()
started = await adapter.start()
if started:
try:
self._started_adapters[name] = adapter
except Exception:
pass
print(f"[MCP] Auto-started vendored server for {name}")
except Exception as e:
print(f"[MCP] Failed to auto-start vendored server {name}: {e}")
else:
print(f"[MCP] Skipping auto-start for {name} due to LAUNCH_METASPLOIT_MCP override")
except Exception:
pass
server = await self._connect_server(config)

View File

@@ -12,5 +12,18 @@
"enabled": true,
"start_on_launch": false
}
,
"metasploit-local": {
"command": "python3",
"args": [
"third_party/MetasploitMCP/metasploit_mcp.py",
"--server",
"http://127.0.0.1:7777"
],
"description": "Metasploit MCP (vendored) - local server",
"timeout": 300,
"enabled": false,
"start_on_launch": false
}
}
}

View File

@@ -0,0 +1,222 @@
"""Adapter to manage a vendored Metasploit MCP server.
This follows the same lightweight pattern as the Hexstrike adapter: it
expects the MetasploitMCP repository to be vendored under
``third_party/MetasploitMCP`` (or a custom path provided by the caller).
The adapter starts the server as a background subprocess and performs a
health check on a configurable port.
"""
import asyncio
import os
import shutil
from pathlib import Path
from typing import Optional
import time
import signal
try:
import aiohttp
except Exception:
aiohttp = None
LOOT_DIR = Path("loot/artifacts")
LOOT_DIR.mkdir(parents=True, exist_ok=True)
LOG_FILE = LOOT_DIR / "metasploit_mcp.log"
class MetasploitAdapter:
"""Manage a vendored Metasploit MCP server under `third_party/MetasploitMCP`.
Usage:
adapter = MetasploitAdapter()
await adapter.start()
# ... use MCPManager to connect to the server
await adapter.stop()
"""
def __init__(
self,
host: str = "127.0.0.1",
port: int = 7777,
python_cmd: str = "python3",
server_script: Optional[Path] = None,
cwd: Optional[Path] = None,
env: Optional[dict] = None,
) -> None:
self.host = host
self.port = int(port)
self.python_cmd = python_cmd
self.server_script = (
server_script or Path("third_party/MetasploitMCP/metasploit_mcp.py")
)
self.cwd = cwd or Path.cwd()
self.env = {**os.environ, **(env or {})}
self._process: Optional[asyncio.subprocess.Process] = None
self._reader_task: Optional[asyncio.Task] = None
def _build_command(self):
return [self.python_cmd, str(self.server_script), "--port", str(self.port)]
async def start(self, background: bool = True, timeout: int = 30) -> bool:
"""Start the vendored Metasploit MCP server.
Returns True if the server started and passed health check within
`timeout` seconds.
"""
if not self.server_script.exists():
raise FileNotFoundError(
f"Metasploit MCP server script not found at {self.server_script}."
)
if self._process and self._process.returncode is None:
return await self.health_check(timeout=1)
cmd = self._build_command()
resolved = shutil.which(self.python_cmd) or self.python_cmd
self._process = await asyncio.create_subprocess_exec(
resolved,
*cmd[1:],
cwd=str(self.cwd),
env=self.env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
start_new_session=True,
)
# Log PID
try:
pid = getattr(self._process, "pid", None)
if pid:
with LOG_FILE.open("a") as fh:
fh.write(f"[MetasploitAdapter] started pid={pid}\n")
except Exception:
pass
# Start background reader
loop = asyncio.get_running_loop()
self._reader_task = loop.create_task(self._capture_output())
try:
return await self.health_check(timeout=timeout)
except Exception:
return False
async def _capture_output(self) -> None:
if not self._process or not self._process.stdout:
return
try:
with LOG_FILE.open("ab") as fh:
while True:
line = await self._process.stdout.readline()
if not line:
break
fh.write(line)
fh.flush()
except asyncio.CancelledError:
pass
except Exception:
pass
async def stop(self, timeout: int = 5) -> None:
proc = self._process
if not proc:
return
try:
proc.terminate()
await asyncio.wait_for(proc.wait(), timeout=timeout)
except asyncio.TimeoutError:
try:
proc.kill()
except Exception:
pass
except Exception:
pass
self._process = None
if self._reader_task and not self._reader_task.done():
self._reader_task.cancel()
try:
await self._reader_task
except Exception:
pass
def stop_sync(self, timeout: int = 5) -> None:
proc = self._process
if not proc:
return
try:
pid = getattr(proc, "pid", None)
if pid:
try:
pgid = os.getpgid(pid)
os.killpg(pgid, signal.SIGTERM)
except Exception:
try:
os.kill(pid, signal.SIGTERM)
except Exception:
pass
end = time.time() + float(timeout)
while time.time() < end:
ret = getattr(proc, "returncode", None)
if ret is not None:
break
time.sleep(0.1)
try:
pgid = os.getpgid(pid)
os.killpg(pgid, signal.SIGKILL)
except Exception:
try:
os.kill(pid, signal.SIGKILL)
except Exception:
pass
except Exception:
pass
def __del__(self):
try:
self.stop_sync()
except Exception:
pass
try:
self._process = None
except Exception:
pass
async def health_check(self, timeout: int = 5) -> bool:
url = f"http://{self.host}:{self.port}/health"
if aiohttp:
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=timeout) as resp:
return resp.status == 200
except Exception:
return False
import urllib.request
def _check():
try:
with urllib.request.urlopen(url, timeout=timeout) as r:
return r.status == 200
except Exception:
return False
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _check)
def is_running(self) -> bool:
return self._process is not None and self._process.returncode is None
__all__ = ["MetasploitAdapter"]

View File

@@ -0,0 +1,16 @@
#!/usr/bin/env bash
# Helper script to vendor MetasploitMCP into this repo using git subtree.
# Run from repository root.
set -euo pipefail
REPO_URL="https://github.com/GH05TCREW/MetasploitMCP.git"
PREFIX="third_party/MetasploitMCP"
BRANCH="main"
echo "This will add MetasploitMCP as a git subtree under ${PREFIX}."
echo "If you already have a subtree, use 'git subtree pull' instead.\n"
git subtree add --prefix="${PREFIX}" "${REPO_URL}" "${BRANCH}" --squash
echo "MetasploitMCP subtree added under ${PREFIX}."

View File

@@ -0,0 +1,40 @@
#!/usr/bin/env bash
set -euo pipefail
# Install vendored MetasploitMCP Python dependencies.
# This script will source a local .env if present so any environment
# variables (proxies/indices/LLM keys) are respected during installation.
HERE=$(dirname "${BASH_SOURCE[0]}")
ROOT=$(cd "$HERE/.." && pwd)
cd "$ROOT"
if [ -f ".env" ]; then
echo "Sourcing .env"
set -a
# shellcheck disable=SC1091
source .env
set +a
fi
REQ=third_party/MetasploitMCP/requirements.txt
if [ ! -f "$REQ" ]; then
echo "Cannot find $REQ. Is the MetasploitMCP subtree present?"
exit 1
fi
echo "Installing MetasploitMCP requirements from $REQ"
PY=$(which python || true)
if [ -n "${VIRTUAL_ENV:-}" ]; then
PY="$VIRTUAL_ENV/bin/python"
fi
"$PY" -m pip install --upgrade pip
"$PY" -m pip install -r "$REQ"
echo "MetasploitMCP dependencies installed. Note: external components may still be required."
exit 0