From e1b6a6af071c30afe0471a9f5e3ad5466712ff32 Mon Sep 17 00:00:00 2001 From: giveen Date: Wed, 21 Jan 2026 12:01:36 -0700 Subject: [PATCH] feat(mcp): add generic stdio adapter under pentestagent/mcp and update example config --- mcp_servers.json.example | 10 ++ pentestagent/mcp/stdio_adapter.py | 137 ++++++++++++++++++ .../hexstrike/hexstrike_stdio_adapter.py | 136 +++++++++++++++++ third_party/mcp/stdio_adapter.py | 137 ++++++++++++++++++ 4 files changed, 420 insertions(+) create mode 100644 mcp_servers.json.example create mode 100644 pentestagent/mcp/stdio_adapter.py create mode 100644 third_party/hexstrike/hexstrike_stdio_adapter.py create mode 100644 third_party/mcp/stdio_adapter.py diff --git a/mcp_servers.json.example b/mcp_servers.json.example new file mode 100644 index 0000000..785f282 --- /dev/null +++ b/mcp_servers.json.example @@ -0,0 +1,10 @@ +{ + "mcpServers": { + "hexstrike-local": { + "command": "python3", + "args": ["-u", "pentestagent/mcp/stdio_adapter.py"], + "description": "Generic StdIO adapter bridge to HTTP API (set STDIO_TARGET)", + "enabled": true + } + } +} diff --git a/pentestagent/mcp/stdio_adapter.py b/pentestagent/mcp/stdio_adapter.py new file mode 100644 index 0000000..ae804b2 --- /dev/null +++ b/pentestagent/mcp/stdio_adapter.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +"""Generic stdio JSON-RPC adapter bridge to an HTTP API. + +Configure via environment variables: +- `STDIO_TARGET` (default: "http://127.0.0.1:8888") +- `STDIO_TOOLS` (JSON list of tool descriptors, default: `[{"name":"http_api","description":"Generic HTTP proxy"}]`) + +The adapter implements the minimal MCP/stdio surface required by +`pentestagent`'s `StdioTransport`: +- handle `initialize` and `notifications/initialized` +- respond to `tools/list` +- handle `tools/call` and forward to HTTP endpoints + +`tools/call` arguments format (generic): + {"path": "/api/foo", "method": "POST", "params": {...}, "body": {...} } + +This file is intentionally small and dependency-light; it uses `requests` +when available and returns response JSON or text. +""" +from __future__ import annotations + +import json +import os +import sys +from typing import Any, Dict, List + +try: + import requests +except Exception: + requests = None + + +TARGET = os.environ.get("STDIO_TARGET", "http://127.0.0.1:8888").rstrip("/") +_tools_env = os.environ.get("STDIO_TOOLS") +if _tools_env: + try: + TOOLS: List[Dict[str, str]] = json.loads(_tools_env) + except Exception: + TOOLS = [{"name": "http_api", "description": "Generic HTTP proxy"}] +else: + TOOLS = [{"name": "http_api", "description": "Generic HTTP proxy"}] + + +def _send(resp: Dict[str, Any]) -> None: + print(json.dumps(resp, separators=(",", ":")), flush=True) + + +def send_response(req_id: Any, result: Any = None, error: Any = None) -> None: + resp: Dict[str, Any] = {"jsonrpc": "2.0", "id": req_id} + if error is not None: + resp["error"] = {"code": -32000, "message": str(error)} + else: + resp["result"] = result if result is not None else {} + _send(resp) + + +def handle_tools_list(req_id: Any) -> None: + send_response(req_id, {"tools": TOOLS}) + + +def _http_forward(path: str, method: str = "POST", params: Dict[str, Any] | None = None, body: Any | None = None) -> Any: + if requests is None: + raise RuntimeError("`requests` not installed in adapter process") + url = path if path.startswith("http") else TARGET + (path if path.startswith("/") else "/" + path) + method = (method or "POST").upper() + if method == "GET": + r = requests.get(url, params=params or {}, timeout=60) + else: + r = requests.request(method, url, json=body or {}, params=params or {}, timeout=300) + try: + return r.json() + except Exception: + return r.text + + +def handle_tools_call(req: Dict[str, Any]) -> None: + req_id = req.get("id") + params = req.get("params", {}) or {} + name = params.get("name") + arguments = params.get("arguments") or {} + + # Validate tool + if not any(t.get("name") == name for t in TOOLS): + send_response(req_id, error=f"unknown tool '{name}'") + return + + path = arguments.get("path") + if not path: + send_response(req_id, error="missing 'path' in arguments") + return + + method = arguments.get("method", "POST") + body = arguments.get("body") + qparams = arguments.get("params") + + try: + content = _http_forward(path, method=method, params=qparams, body=body) + send_response(req_id, {"content": content}) + except Exception as e: + send_response(req_id, error=str(e)) + + +def main() -> None: + while True: + line = sys.stdin.readline() + if not line: + break + line = line.strip() + if not line: + continue + try: + req = json.loads(line) + except Exception: + continue + + method = req.get("method") + req_id = req.get("id") + + if method == "initialize": + send_response(req_id, {"capabilities": {}}) + elif method == "notifications/initialized": + # ignore notification + continue + elif method == "tools/list": + handle_tools_list(req_id) + elif method == "tools/call": + handle_tools_call(req) + else: + if req_id is not None: + send_response(req_id, error=f"unsupported method '{method}'") + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + pass diff --git a/third_party/hexstrike/hexstrike_stdio_adapter.py b/third_party/hexstrike/hexstrike_stdio_adapter.py new file mode 100644 index 0000000..e467ef3 --- /dev/null +++ b/third_party/hexstrike/hexstrike_stdio_adapter.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 +"""Small stdio JSON-RPC bridge for HexStrike HTTP API. + +This adapter implements the minimal MCP stdio JSON-RPC surface +expected by `pentestagent`'s `StdioTransport`: + +- Responds to `initialize` with a simple result +- Accepts `notifications/initialized` (notification) +- Implements `tools/list` returning one proxy tool: `http_api` +- Implements `tools/call` for the `http_api` tool and forwards + requests to the HexStrike HTTP API (configured via env var + `HEXSTRIKE_SERVER`, default `http://127.0.0.1:8888`). + +Usage (mcp_servers.json): + { + "mcpServers": { + "hexstrike-local": { + "command": "python3", + "args": ["-u", "third_party/hexstrike/hexstrike_stdio_adapter.py"], + "description": "StdIO adapter bridge to HexStrike HTTP API" + } + } + } +""" +from __future__ import annotations + +import json +import os +import sys +from typing import Any, Dict + +try: + import requests +except Exception: + requests = None + + +BASE = os.environ.get("HEXSTRIKE_SERVER", "http://127.0.0.1:8888").rstrip("/") + + +def send_response(req_id: Any, result: Any = None, error: Any = None) -> None: + resp: Dict[str, Any] = {"jsonrpc": "2.0", "id": req_id} + if error is not None: + resp["error"] = {"code": -32000, "message": str(error)} + else: + resp["result"] = result if result is not None else {} + print(json.dumps(resp, separators=(",", ":")), flush=True) + + +def handle_tools_list(req_id: Any) -> None: + tools = [{"name": "http_api", "description": "Proxy to HexStrike HTTP API"}] + send_response(req_id, {"tools": tools}) + + +def forward_http(path: str, method: str = "POST", params: Dict[str, Any] | None = None, body: Any | None = None) -> Any: + if requests is None: + raise RuntimeError("requests library is not available in adapter process") + url = path if path.startswith("http") else BASE + (path if path.startswith("/") else "/" + path) + method = (method or "POST").upper() + if method == "GET": + r = requests.get(url, params=params or {}, timeout=60) + else: + r = requests.post(url, json=body or {}, params=params or {}, timeout=300) + try: + return r.json() + except Exception: + return r.text + + +def handle_tools_call(req: Dict[str, Any]) -> None: + req_id = req.get("id") + params = req.get("params", {}) or {} + name = params.get("name") + arguments = params.get("arguments") or {} + + if name != "http_api": + send_response(req_id, error=f"unknown tool '{name}'") + return + + path = arguments.get("path") + if not path: + send_response(req_id, error="missing 'path' in arguments") + return + + method = arguments.get("method", "POST") + body = arguments.get("body") + qparams = arguments.get("params") + + try: + content = forward_http(path, method=method, params=qparams, body=body) + # Manager expects: result -> {"content": ...} + send_response(req_id, {"content": content}) + except Exception as e: + send_response(req_id, error=str(e)) + + +def main() -> None: + # Read newline-delimited JSON-RPC messages from stdin. + # Keep process alive until stdin is closed. + while True: + line = sys.stdin.readline() + if not line: + break + line = line.strip() + if not line: + continue + try: + req = json.loads(line) + except Exception: + # ignore invalid json + continue + + method = req.get("method") + # Notifications have no id + req_id = req.get("id") + + if method == "initialize": + send_response(req_id, {"capabilities": {}}) + elif method == "notifications/initialized": + # notification — no response + continue + elif method == "tools/list": + handle_tools_list(req_id) + elif method == "tools/call": + handle_tools_call(req) + else: + # unknown method + if req_id is not None: + send_response(req_id, error=f"unsupported method '{method}'") + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + pass diff --git a/third_party/mcp/stdio_adapter.py b/third_party/mcp/stdio_adapter.py new file mode 100644 index 0000000..ae804b2 --- /dev/null +++ b/third_party/mcp/stdio_adapter.py @@ -0,0 +1,137 @@ +#!/usr/bin/env python3 +"""Generic stdio JSON-RPC adapter bridge to an HTTP API. + +Configure via environment variables: +- `STDIO_TARGET` (default: "http://127.0.0.1:8888") +- `STDIO_TOOLS` (JSON list of tool descriptors, default: `[{"name":"http_api","description":"Generic HTTP proxy"}]`) + +The adapter implements the minimal MCP/stdio surface required by +`pentestagent`'s `StdioTransport`: +- handle `initialize` and `notifications/initialized` +- respond to `tools/list` +- handle `tools/call` and forward to HTTP endpoints + +`tools/call` arguments format (generic): + {"path": "/api/foo", "method": "POST", "params": {...}, "body": {...} } + +This file is intentionally small and dependency-light; it uses `requests` +when available and returns response JSON or text. +""" +from __future__ import annotations + +import json +import os +import sys +from typing import Any, Dict, List + +try: + import requests +except Exception: + requests = None + + +TARGET = os.environ.get("STDIO_TARGET", "http://127.0.0.1:8888").rstrip("/") +_tools_env = os.environ.get("STDIO_TOOLS") +if _tools_env: + try: + TOOLS: List[Dict[str, str]] = json.loads(_tools_env) + except Exception: + TOOLS = [{"name": "http_api", "description": "Generic HTTP proxy"}] +else: + TOOLS = [{"name": "http_api", "description": "Generic HTTP proxy"}] + + +def _send(resp: Dict[str, Any]) -> None: + print(json.dumps(resp, separators=(",", ":")), flush=True) + + +def send_response(req_id: Any, result: Any = None, error: Any = None) -> None: + resp: Dict[str, Any] = {"jsonrpc": "2.0", "id": req_id} + if error is not None: + resp["error"] = {"code": -32000, "message": str(error)} + else: + resp["result"] = result if result is not None else {} + _send(resp) + + +def handle_tools_list(req_id: Any) -> None: + send_response(req_id, {"tools": TOOLS}) + + +def _http_forward(path: str, method: str = "POST", params: Dict[str, Any] | None = None, body: Any | None = None) -> Any: + if requests is None: + raise RuntimeError("`requests` not installed in adapter process") + url = path if path.startswith("http") else TARGET + (path if path.startswith("/") else "/" + path) + method = (method or "POST").upper() + if method == "GET": + r = requests.get(url, params=params or {}, timeout=60) + else: + r = requests.request(method, url, json=body or {}, params=params or {}, timeout=300) + try: + return r.json() + except Exception: + return r.text + + +def handle_tools_call(req: Dict[str, Any]) -> None: + req_id = req.get("id") + params = req.get("params", {}) or {} + name = params.get("name") + arguments = params.get("arguments") or {} + + # Validate tool + if not any(t.get("name") == name for t in TOOLS): + send_response(req_id, error=f"unknown tool '{name}'") + return + + path = arguments.get("path") + if not path: + send_response(req_id, error="missing 'path' in arguments") + return + + method = arguments.get("method", "POST") + body = arguments.get("body") + qparams = arguments.get("params") + + try: + content = _http_forward(path, method=method, params=qparams, body=body) + send_response(req_id, {"content": content}) + except Exception as e: + send_response(req_id, error=str(e)) + + +def main() -> None: + while True: + line = sys.stdin.readline() + if not line: + break + line = line.strip() + if not line: + continue + try: + req = json.loads(line) + except Exception: + continue + + method = req.get("method") + req_id = req.get("id") + + if method == "initialize": + send_response(req_id, {"capabilities": {}}) + elif method == "notifications/initialized": + # ignore notification + continue + elif method == "tools/list": + handle_tools_list(req_id) + elif method == "tools/call": + handle_tools_call(req) + else: + if req_id is not None: + send_response(req_id, error=f"unsupported method '{method}'") + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + pass