mcp: add daemon mode + disconnect; harden StdioTransport cleanup

This commit is contained in:
giveen
2026-01-21 10:55:30 -07:00
parent c5c6fee8da
commit f3f3b0956b
4 changed files with 261 additions and 19 deletions

View File

@@ -85,7 +85,14 @@ Examples:
)
# tools list
tools_subparsers.add_parser("list", help="List all available tools")
tools_list = tools_subparsers.add_parser(
"list", help="List all available tools"
)
tools_list.add_argument(
"--include-mcp",
action="store_true",
help="Temporarily connect to configured MCP servers and include their tools",
)
# tools info
tools_info = tools_subparsers.add_parser("info", help="Show tool details")
@@ -127,6 +134,32 @@ Examples:
# mcp test
mcp_test = mcp_subparsers.add_parser("test", help="Test MCP server connection")
mcp_test.add_argument("name", help="Server name to test")
# mcp connect (keep manager connected and register tools)
mcp_connect = mcp_subparsers.add_parser(
"connect", help="Connect to an MCP server and keep connection alive"
)
mcp_connect.add_argument(
"name",
nargs="?",
default="all",
help="Server name to connect (or 'all' to connect all configured)",
)
mcp_connect.add_argument(
"--detach",
action="store_true",
help="Run as background daemon (writes PID file at ~/.pentestagent/mcp.pid)",
)
# mcp disconnect
mcp_disconnect = mcp_subparsers.add_parser(
"disconnect", help="Disconnect from an MCP server"
)
mcp_disconnect.add_argument(
"name",
nargs="?",
default="all",
help="Server name to disconnect (or 'all' to disconnect all)",
)
# workspace management
ws_parser = subparsers.add_parser(
@@ -160,7 +193,27 @@ def handle_tools_command(args: argparse.Namespace):
console = Console()
if args.tools_command == "list":
tools = get_all_tools()
# Optionally include MCP-discovered tools by connecting temporarily
manager = None
if getattr(args, "include_mcp", False):
from ..mcp.manager import MCPManager
manager = MCPManager()
try:
asyncio.run(manager.connect_all())
except Exception:
pass
try:
tools = get_all_tools()
finally:
# If we temporarily connected to MCP servers, disconnect them to
# ensure subprocess transports are closed before the event loop exits.
if manager is not None:
try:
asyncio.run(manager.disconnect_all())
except Exception:
pass
if not tools:
console.print("[yellow]No tools found[/]")
@@ -320,6 +373,152 @@ def handle_mcp_command(args: argparse.Namespace):
asyncio.run(test_server())
elif args.mcp_command == "connect":
# Connect and keep the manager running so MCP tools remain registered
name = args.name
detach = getattr(args, "detach", False)
console.print(f"[bold]Connecting to MCP server: {name}[/]\n")
async def run_connect():
# Long-running connect: connect requested server(s) and wait for signal
import signal
stop_event = asyncio.Event()
def _signal_handler():
try:
stop_event.set()
except Exception:
pass
loop = asyncio.get_running_loop()
for s in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(s, _signal_handler)
except Exception:
# Not all platforms support add_signal_handler (e.g., Windows)
pass
if name == "all":
await manager.connect_all()
else:
server = await manager.connect_server(name)
if not server:
console.print(f"[red]Failed to connect: {name}[/]")
return
console.print("[green]Connected. Press Ctrl-C to stop and disconnect.[/]")
await stop_event.wait()
console.print("\n[yellow]Shutting down connections...[/]")
try:
await manager.disconnect_all()
except Exception:
pass
# If detach requested, perform a simple double-fork to daemonize
if detach:
import os
from pathlib import Path
pid_dir = Path.home() / ".pentestagent"
pid_dir.mkdir(parents=True, exist_ok=True)
pidfile = pid_dir / "mcp.pid"
# Simple double-fork daemonization (POSIX only)
try:
pid = os.fork()
if pid > 0:
# parent exits
console.print(f"[green]MCP manager detached (pid: {pid}). PID file: {pidfile}[/]")
return
except OSError as e:
console.print(f"[red]Fork failed: {e}[/]")
return
os.setsid()
try:
pid2 = os.fork()
if pid2 > 0:
# first child exits
os._exit(0)
except OSError:
pass
# child continues as daemon
# detach std file descriptors
try:
with open(os.devnull, "rb") as devnull_in, open(os.devnull, "wb") as devnull_out:
os.dup2(devnull_in.fileno(), 0)
os.dup2(devnull_out.fileno(), 1)
os.dup2(devnull_out.fileno(), 2)
except Exception:
pass
# write pidfile
try:
with open(pidfile, "w") as f:
f.write(str(os.getpid()))
except Exception:
pass
# Run the connect loop in the daemon
try:
asyncio.run(run_connect())
finally:
try:
if pidfile.exists():
pidfile.unlink()
except Exception:
pass
else:
try:
asyncio.run(run_connect())
except KeyboardInterrupt:
console.print("[yellow]Interrupted by user[/]")
elif args.mcp_command == "disconnect":
name = args.name
# If a background daemon was created via --detach, try to read its pidfile
from pathlib import Path
pid_dir = Path.home() / ".pentestagent"
pidfile = pid_dir / "mcp.pid"
if pidfile.exists():
try:
pid_text = pidfile.read_text().strip()
pid = int(pid_text)
import os, signal, time
try:
os.kill(pid, signal.SIGTERM)
# give it a moment to exit
time.sleep(0.5)
except ProcessLookupError:
pass
try:
pidfile.unlink()
except Exception:
pass
console.print(f"[green]Sent SIGTERM to daemon (pid: {pid}). PID file removed.[/]")
return
except Exception:
# Fall back to in-process disconnect below
pass
async def run_disconnect():
if name == "all":
await manager.disconnect_all()
console.print("[green]Disconnected all MCP servers[/]")
else:
await manager.disconnect_server(name)
console.print(f"[green]Disconnected MCP server: {name}[/]")
asyncio.run(run_disconnect())
else:
console.print("[yellow]Use 'pentestagent mcp --help' for available commands[/]")

View File

@@ -22,6 +22,7 @@ from pathlib import Path
from typing import Any, Dict, List, Optional
from .tools import create_mcp_tool
from ..tools.registry import register_tool_instance, unregister_tool
from .transport import MCPTransport, StdioTransport
@@ -288,6 +289,11 @@ class MCPManager:
self.servers[name] = server
for tool_def in server.tools:
tool = create_mcp_tool(tool_def, server, self)
# Register tool into global registry so it appears in `tools list`
try:
register_tool_instance(tool)
except Exception:
pass
all_tools.append(tool)
print(f"[MCP] Connected to {name} with {len(server.tools)} tools")
return all_tools
@@ -334,6 +340,16 @@ class MCPManager:
server = await self._connect_server(config)
if server:
self.servers[name] = server
# Register tools for this server
try:
for tool_def in server.tools:
tool = create_mcp_tool(tool_def, server, self)
try:
register_tool_instance(tool)
except Exception:
pass
except Exception:
pass
return server
async def _connect_server(self, config: MCPServerConfig) -> Optional[MCPServer]:
@@ -457,6 +473,16 @@ class MCPManager:
server = self.servers.get(name)
if server:
await server.disconnect()
# Unregister MCP tools provided by this server
try:
for tool_def in server.tools:
mname = f"mcp_{server.name}_{tool_def.get('name')}"
try:
unregister_tool(mname)
except Exception:
pass
except Exception:
pass
del self.servers[name]
# If we started an adapter for this server, stop it as well
adapter = self._started_adapters.pop(name, None)

View File

@@ -4,24 +4,10 @@
"command": "python3",
"args": [
"third_party/hexstrike/hexstrike_mcp.py",
"--server",
"http://127.0.0.1:8888"
"--timeout",
"300"
],
"description": "HexStrike AI (vendored) - local server",
"timeout": 300,
"enabled": true,
"start_on_launch": false
}
,
"metasploit-local": {
"command": "python3",
"args": [
"third_party/MetasploitMCP/MetasploitMCP.py",
"--server",
"http://127.0.0.1:7777"
],
"description": "Metasploit MCP (vendored) - local server",
"timeout": 300,
"description": "HexStrike AI (vendored) - local MCP adapter (spawned via stdio)",
"enabled": true,
"start_on_launch": false
}

View File

@@ -160,6 +160,37 @@ class StdioTransport(MCPTransport):
except Exception:
pass
# Additional aggressive cleanup to avoid scheduling callbacks on a
# closed event loop during interpreter shutdown. Some Python
# implementations keep an internal transport around that attempts to
# call into the loop during object finalization; proactively close it
# while the loop is still running.
try:
loop = asyncio.get_running_loop()
except Exception:
loop = None
try:
# Accessing the private _transport attribute is a pragmatic
# measure to ensure the underlying pipe transport is closed
# immediately instead of relying on destructor behavior.
if loop is not None and not loop.is_closed() and hasattr(proc, "_transport"):
try:
proc._transport.close()
except Exception:
pass
except Exception:
pass
# Drop references to pipe objects to allow GC without scheduling
# destructor callbacks later.
try:
proc.stdin = None
proc.stdout = None
proc.stderr = None
except Exception:
pass
class SSETransport(MCPTransport):
"""MCP transport over Server-Sent Events (HTTP)."""