diff --git a/pentestagent/mcp/manager.py b/pentestagent/mcp/manager.py index fed13d0..c7b96c1 100644 --- a/pentestagent/mcp/manager.py +++ b/pentestagent/mcp/manager.py @@ -13,16 +13,13 @@ Uses standard MCP configuration format: """ import asyncio -import atexit import json import os -import signal from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional from .tools import create_mcp_tool -from ..tools.registry import register_tool_instance, unregister_tool from .transport import MCPTransport, StdioTransport @@ -36,8 +33,6 @@ class MCPServerConfig: env: Dict[str, str] = field(default_factory=dict) enabled: bool = True description: str = "" - # Whether to auto-start this server when `connect_all()` is called. - start_on_launch: bool = False @dataclass @@ -73,45 +68,7 @@ class MCPManager: def __init__(self, config_path: Optional[Path] = None): self.config_path = config_path or self._find_config() self.servers: Dict[str, MCPServer] = {} - # Track adapters we auto-started so we can stop them later - self._started_adapters: Dict[str, object] = {} self._message_id = 0 - # Control socket server attributes - self._control_server: Optional[asyncio.AbstractServer] = None - self._control_task: Optional[asyncio.Task] = None - self._control_path: Optional[Path] = None - # Auto-connect to configured MCP servers if any are enabled in config. - try: - cfg = self._load_config() - has_enabled = any(getattr(s, "enabled", True) for s in cfg.values()) - if has_enabled: - try: - loop = asyncio.get_running_loop() - except RuntimeError: - # No running loop: run connect synchronously so tools register - try: - asyncio.run(self.connect_all()) - except Exception: - pass - else: - # Running loop present: schedule background connect task - try: - loop.create_task(self.connect_all()) - except Exception: - pass - except Exception: - pass - # Ensure we attempt to clean up vendored servers on process exit - try: - atexit.register(self._atexit_cleanup) - except Exception as e: - logging.getLogger(__name__).exception("Failed to register atexit cleanup: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Failed to register MCP atexit cleanup: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about atexit.register failure") def _find_config(self) -> Path: for path in self.DEFAULT_CONFIG_PATHS: @@ -139,15 +96,8 @@ class MCPManager: args=config.get("args", []), env=config.get("env", {}), enabled=config.get("enabled", True), - start_on_launch=config.get("start_on_launch", False), description=config.get("description", ""), ) - # Environment-based auto-start overrides (previously supported via - # LAUNCH_* variables) have been removed. MCP adapters are no - # longer auto-started by the manager; operators should install and - # run adapters manually and add them to `mcp_servers.json` if they - # want `MCPManager` to connect to them. - return servers except json.JSONDecodeError as e: print(f"[MCP] Error loading config: {e}") @@ -167,78 +117,6 @@ class MCPManager: self.config_path.parent.mkdir(parents=True, exist_ok=True) self.config_path.write_text(json.dumps(config, indent=2), encoding="utf-8") - def _atexit_cleanup(self): - """Synchronous atexit cleanup that attempts to stop adapters and disconnect servers.""" - try: - # Try to run async shutdown; if an event loop is already running this may fail, - # but it's best-effort to avoid orphaned vendored servers. - asyncio.run(self._stop_started_adapters_and_disconnect()) - except Exception: - # Last-ditch: attempt to stop adapters synchronously. - # If the adapter exposes a blocking `stop()` call, call it. Otherwise, try - # to kill the underlying process by PID to avoid asyncio subprocess - # destructors running after the loop is closed. - for adapter in list(self._started_adapters.values()): - try: - # Prefer adapter-provided synchronous stop hook - stop_sync = getattr(adapter, "stop_sync", None) - if stop_sync: - try: - stop_sync() - continue - except Exception: - pass - - # Fallback: try blocking stop() if present - stop = getattr(adapter, "stop", None) - if stop and not asyncio.iscoroutinefunction(stop): - try: - stop() - continue - except Exception as e: - logging.getLogger(__name__).exception( - "Error running adapter.stop(): %s", e - ) - - # Final fallback: kill underlying PID if available - pid = None - proc = getattr(adapter, "_process", None) - if proc is not None: - pid = getattr(proc, "pid", None) - if pid: - try: - os.kill(pid, signal.SIGTERM) - except Exception as e: - logging.getLogger(__name__).exception("Failed to SIGTERM pid %s: %s", pid, e) - try: - os.kill(pid, signal.SIGKILL) - except Exception as e2: - logging.getLogger(__name__).exception("Failed to SIGKILL pid %s: %s", pid, e2) - except Exception as e: - logging.getLogger(__name__).exception("Error while attempting synchronous adapter stop: %s", e) - - async def _stop_started_adapters_and_disconnect(self) -> None: - # Stop any adapters we started - for _name, adapter in list(self._started_adapters.items()): - try: - stop = getattr(adapter, "stop", None) - if stop: - if asyncio.iscoroutinefunction(stop): - await stop() - else: - # run blocking stop in executor - loop = asyncio.get_running_loop() - await loop.run_in_executor(None, stop) - except Exception as e: - logging.getLogger(__name__).exception("Error stopping adapter in async shutdown: %s", e) - self._started_adapters.clear() - - # Disconnect any active MCP server connections - try: - await self.disconnect_all() - except Exception as e: - logging.getLogger(__name__).exception("Error during disconnect_all in shutdown: %s", e) - def add_server( self, name: str, @@ -266,18 +144,6 @@ class MCPManager: return True return False - def set_enabled(self, name: str, enabled: bool) -> bool: - """Enable or disable a configured MCP server in the config file. - - Returns True if the server existed and was updated, False otherwise. - """ - servers = self._load_config() - if name not in servers: - return False - servers[name].enabled = bool(enabled) - self._save_config(servers) - return True - def list_configured_servers(self) -> List[dict]: servers = self._load_config() return [ @@ -293,11 +159,9 @@ class MCPManager: for n, s in servers.items() ] - async def connect_all(self, register: bool = True, quiet: bool = False) -> List[Any]: + async def connect_all(self) -> List[Any]: servers_config = self._load_config() all_tools = [] - # Connect to any configured servers but DO NOT auto-start vendored adapters. - # Operators should install and start vendored MCPs manually (see third_party/). for name, config in servers_config.items(): if not config.enabled: continue @@ -306,101 +170,28 @@ class MCPManager: self.servers[name] = server for tool_def in server.tools: tool = create_mcp_tool(tool_def, server, self) - if register: - # Register tool into global registry so it appears in `tools list` - try: - register_tool_instance(tool) - except Exception: - pass all_tools.append(tool) - if not quiet: - print(f"[MCP] Connected to {name} with {len(server.tools)} tools") + print(f"[MCP] Connected to {name} with {len(server.tools)} tools") return all_tools - async def connect_server(self, name: str, register: bool = True, quiet: bool = False) -> Optional[MCPServer]: + async def connect_server(self, name: str) -> Optional[MCPServer]: servers_config = self._load_config() if name not in servers_config: return None config = servers_config[name] - # Auto-start of vendored adapters (HexStrike/MetasploitMCP) has been - # removed. Operators should install and run any third-party MCP adapters - # manually under `third_party/` and configure `mcp_servers.json`. - server = await self._connect_server(config) if server: self.servers[name] = server - # Register tools for this server - try: - for tool_def in server.tools: - tool = create_mcp_tool(tool_def, server, self) - if register: - try: - register_tool_instance(tool) - except Exception: - pass - except Exception: - pass - if not quiet: - try: - print(f"[MCP] Connected to {name} with {len(server.tools)} tools") - except Exception: - pass return server async def _connect_server(self, config: MCPServerConfig) -> Optional[MCPServer]: transport = None try: env = {**os.environ, **config.env} - - # Decide transport type: - # - If args contain a http/sse transport or a --server http:// URL, use SSETransport - # - Otherwise default to StdioTransport (spawn process and use stdio JSON-RPC) - use_http = False - http_url = None - args_joined = " ".join(config.args or []) - if "--transport http" in args_joined or "--transport sse" in args_joined: - # Try to extract host/port from args - try: - # naive parsing: look for --host and --port - host = None - port = None - for i, a in enumerate(config.args or []): - if a == "--host" and i + 1 < len(config.args): - host = config.args[i + 1] - if a == "--port" and i + 1 < len(config.args): - port = config.args[i + 1] - if host and port: - http_url = f"http://{host}:{port}/sse" - except Exception: - http_url = None - use_http = True - # If args specify a --server URL, prefer that - if not http_url: - from urllib.parse import urlparse - - for i, a in enumerate(config.args or []): - if a == "--server" and i + 1 < len(config.args): - candidate = config.args[i + 1] - if isinstance(candidate, str) and candidate.startswith("http"): - # If the provided server URL doesn't include a path, default to the MCP SSE path - p = urlparse(candidate) - if p.path and p.path != "/": - http_url = candidate - else: - http_url = candidate.rstrip("/") + "/sse" - use_http = True - break - - if use_http and http_url: - from .transport import SSETransport - - transport = SSETransport(url=http_url) - await transport.connect() - else: - transport = StdioTransport( - command=config.command, args=config.args, env=env - ) - await transport.connect() + transport = StdioTransport( + command=config.command, args=config.args, env=env + ) + await transport.connect() await transport.send( { @@ -468,30 +259,7 @@ class MCPManager: server = self.servers.get(name) if server: await server.disconnect() - # Unregister MCP tools provided by this server - try: - for tool_def in server.tools: - mname = f"mcp_{server.name}_{tool_def.get('name')}" - try: - unregister_tool(mname) - except Exception: - pass - except Exception: - pass del self.servers[name] - # If we started an adapter for this server, stop it as well - adapter = self._started_adapters.pop(name, None) - if adapter: - try: - stop = getattr(adapter, "stop", None) - if stop: - if asyncio.iscoroutinefunction(stop): - await stop() - else: - loop = asyncio.get_running_loop() - await loop.run_in_executor(None, stop) - except Exception: - pass async def disconnect_all(self): for server in list(self.servers.values()): @@ -518,129 +286,3 @@ class MCPManager: def is_connected(self, name: str) -> bool: server = self.servers.get(name) return server is not None and server.connected - - async def start_control_server(self, path: Optional[str] = None) -> str: - """Start a lightweight UNIX-domain socket control server. - - The control server accepts newline-delimited JSON requests. Supported - commands: - {"cmd": "status"} -> returns connected servers and counts - {"cmd": "list_tools"} -> returns list of MCP tools (name, server, description) - - Returns the path of the socket in use. - """ - if not path: - path = str(Path.home() / ".pentestagent" / "mcp.sock") - - sock_path = Path(path) - # Ensure parent exists - sock_path.parent.mkdir(parents=True, exist_ok=True) - - # Remove stale socket if present - try: - if sock_path.exists(): - try: - sock_path.unlink() - except Exception: - pass - except Exception: - pass - - async def _handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): - try: - data = await reader.readline() - if not data: - return - try: - req = json.loads(data.decode("utf-8")) - except Exception: - resp = {"status": "error", "error": "invalid_json"} - writer.write((json.dumps(resp) + "\n").encode("utf-8")) - await writer.drain() - return - - cmd = req.get("cmd") if isinstance(req, dict) else None - if cmd == "status": - servers = [] - for name, s in self.servers.items(): - servers.append({"name": name, "connected": bool(s.connected), "tool_count": len(s.tools)}) - resp = {"status": "ok", "servers": servers} - writer.write((json.dumps(resp) + "\n").encode("utf-8")) - await writer.drain() - elif cmd == "list_tools": - tools = [] - for sname, s in self.servers.items(): - for t in s.tools: - tools.append({"name": t.get("name"), "server": sname, "description": t.get("description", "")}) - resp = {"status": "ok", "tools": tools} - writer.write((json.dumps(resp) + "\n").encode("utf-8")) - await writer.drain() - elif cmd == "call_tool": - # Expecting: {"cmd":"call_tool","server":"name","tool":"tool_name","args":{...}} - server_name = req.get("server") - tool_name = req.get("tool") - arguments = req.get("args", {}) if isinstance(req.get("args", {}), dict) else {} - if not server_name or not tool_name: - writer.write((json.dumps({"status": "error", "error": "missing_parameters"}) + "\n").encode("utf-8")) - await writer.drain() - return - try: - # perform the tool call - result = await self.call_tool(server_name, tool_name, arguments) - writer.write((json.dumps({"status": "ok", "result": result}) + "\n").encode("utf-8")) - await writer.drain() - except Exception as e: - writer.write((json.dumps({"status": "error", "error": "call_failed", "message": str(e)}) + "\n").encode("utf-8")) - await writer.drain() - - else: - resp = {"status": "error", "error": "unknown_cmd"} - writer.write((json.dumps(resp) + "\n").encode("utf-8")) - await writer.drain() - except Exception: - try: - writer.write((json.dumps({"status": "error", "error": "internal"}) + "\n").encode("utf-8")) - await writer.drain() - except Exception: - pass - finally: - try: - writer.close() - except Exception: - pass - - # Start the asyncio unix server - loop = asyncio.get_running_loop() - server = await asyncio.start_unix_server(_handle, path=path) - self._control_server = server - self._control_path = Path(path) - - # Keep server serving in background task - self._control_task = loop.create_task(server.serve_forever()) - # Restrict socket access to current user where possible - try: - os.chmod(path, 0o600) - except Exception: - pass - return path - - async def stop_control_server(self): - try: - if self._control_task: - self._control_task.cancel() - self._control_task = None - if self._control_server: - self._control_server.close() - try: - await self._control_server.wait_closed() - except Exception: - pass - self._control_server = None - if self._control_path and self._control_path.exists(): - try: - self._control_path.unlink() - except Exception: - pass - self._control_path = None - except Exception: - pass diff --git a/pentestagent/mcp/mcp_servers.json b/pentestagent/mcp/mcp_servers.json index fdc45a8..da39e4f 100644 --- a/pentestagent/mcp/mcp_servers.json +++ b/pentestagent/mcp/mcp_servers.json @@ -1,18 +1,3 @@ { - "mcpServers": { - "hexstrike-local": { - "command": "python3", - "args": [ - "-u", - "pentestagent/mcp/stdio_adapter.py" - ], - "env": { - "STDIO_TARGET": "http://127.0.0.1:8888", - "STDIO_TOOLS": "[{\"name\":\"tools_api\",\"description\":\"Tools API\"},{\"name\":\"tools_web\",\"description\":\"Web tools\"},{\"name\":\"tools_network\",\"description\":\"Network tools\"},{\"name\":\"tools_cloud\",\"description\":\"Cloud tools\"},{\"name\":\"tools_exploit\",\"description\":\"Exploit tools\"},{\"name\":\"tools_binary\",\"description\":\"Binary tools\"},{\"name\":\"tools_forensics\",\"description\":\"Forensics tools\"},{\"name\":\"tools_parameters\",\"description\":\"Tool parameter helpers\"},{\"name\":\"tools_web_advanced\",\"description\":\"Advanced web tools\"},{\"name\":\"tools_web_frameworks\",\"description\":\"Web framework helpers\"},{\"name\":\"ai\",\"description\":\"AI assistant\"},{\"name\":\"intelligence\",\"description\":\"Intelligence services\"},{\"name\":\"processes\",\"description\":\"Process management\"},{\"name\":\"files\",\"description\":\"File operations\"},{\"name\":\"python_env\",\"description\":\"Python env manager\"}]" - }, - "description": "HexStrike (stdio->HTTP) adapter - forwards MCP tools/call to HexStrike HTTP API", - "enabled": true, - "start_on_launch": false - } - } + "mcpServers": {} } diff --git a/pentestagent/mcp/transport.py b/pentestagent/mcp/transport.py index 9510bbc..52e0883 100644 --- a/pentestagent/mcp/transport.py +++ b/pentestagent/mcp/transport.py @@ -160,37 +160,6 @@ class StdioTransport(MCPTransport): except Exception: pass - # Additional aggressive cleanup to avoid scheduling callbacks on a - # closed event loop during interpreter shutdown. Some Python - # implementations keep an internal transport around that attempts to - # call into the loop during object finalization; proactively close it - # while the loop is still running. - try: - loop = asyncio.get_running_loop() - except Exception: - loop = None - - try: - # Accessing the private _transport attribute is a pragmatic - # measure to ensure the underlying pipe transport is closed - # immediately instead of relying on destructor behavior. - if loop is not None and not loop.is_closed() and hasattr(proc, "_transport"): - try: - proc._transport.close() - except Exception: - pass - except Exception: - pass - - # Drop references to pipe objects to allow GC without scheduling - # destructor callbacks later. - try: - proc.stdin = None - proc.stdout = None - proc.stderr = None - except Exception: - pass - class SSETransport(MCPTransport): """MCP transport over Server-Sent Events (HTTP).""" @@ -205,12 +174,6 @@ class SSETransport(MCPTransport): self.url = url self.session: Optional[Any] = None # aiohttp.ClientSession self._connected = False - self._post_url: Optional[str] = None - self._sse_response: Optional[Any] = None - self._sse_task: Optional[asyncio.Task] = None - self._pending: dict[str, asyncio.Future] = {} - self._pending_lock = asyncio.Lock() - self._endpoint_ready: Optional[asyncio.Event] = None @property def is_connected(self) -> bool: @@ -223,40 +186,6 @@ class SSETransport(MCPTransport): import aiohttp self.session = aiohttp.ClientSession() - - # Open a persistent SSE connection so we can receive async - # responses delivered over the event stream. Keep the response - # object alive and run a background task to parse events. - try: - # Do not use a short timeout; keep the connection open. - resp = await self.session.get(self.url, timeout=None) - # Store response and start background reader - self._sse_response = resp - # event used to signal when endpoint announced - self._endpoint_ready = asyncio.Event() - self._sse_task = asyncio.create_task(self._sse_listener(resp)) - # Wait a short time for the endpoint to be discovered to avoid races - try: - await asyncio.wait_for(self._endpoint_ready.wait(), timeout=5.0) - except asyncio.TimeoutError: - # If endpoint not discovered, continue; send() will try discovery - pass - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Failed opening SSE stream: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Failed opening SSE stream: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about SSE open failure") - # If opening the SSE stream fails, still mark connected so - # send() can attempt POST discovery and report meaningful errors. - self._sse_response = None - self._sse_task = None - self._endpoint_ready = None - self._connected = True except ImportError as e: raise RuntimeError( @@ -276,265 +205,23 @@ class SSETransport(MCPTransport): if not self.session: raise RuntimeError("Transport not connected") - if not self.session: - raise RuntimeError("Transport not connected") - - # Ensure we have a POST endpoint. If discovery hasn't completed yet, - # try a quick synchronous discovery attempt before posting so we don't - # accidentally POST to the SSE listen endpoint which returns 405. - if not self._post_url: - try: - await self._discover_post_url(timeout=2.0) - except Exception: - pass - - post_target = self._post_url or self.url - try: async with self.session.post( - post_target, json=message, headers={"Content-Type": "application/json"} + self.url, json=message, headers={"Content-Type": "application/json"} ) as response: - status = response.status - if status == 200: - return await response.json() - if status == 202: - # Asynchronous response: wait for matching SSE event with the same id - if "id" not in message: - return {} - msg_id = str(message["id"]) - fut = asyncio.get_running_loop().create_future() - async with self._pending_lock: - self._pending[msg_id] = fut - try: - result = await asyncio.wait_for(fut, timeout=15.0) - return result - finally: - async with self._pending_lock: - self._pending.pop(msg_id, None) - # Other statuses are errors - raise RuntimeError(f"HTTP error: {status}") + if response.status != 200: + raise RuntimeError(f"HTTP error: {response.status}") + + return await response.json() except Exception as e: raise RuntimeError(f"SSE request failed: {e}") from e - async def _discover_post_url(self, timeout: float = 2.0) -> None: - """Attempt a short GET to the SSE endpoint to find the advertised POST URL. - - This is a fallback used when the background listener hasn't yet - extracted the `endpoint` event. It reads a few lines with a short - timeout and sets `self._post_url` if found. - """ - if not self.session: - return - - try: - async with self.session.get(self.url, timeout=timeout) as resp: - if resp.status != 200: - return - # Read up to a few lines looking for `data:` - for _ in range(20): - line = await resp.content.readline() - if not line: - break - try: - text = line.decode(errors="ignore").strip() - except Exception: - continue - if text.startswith("data:"): - endpoint = text.split("data:", 1)[1].strip() - from urllib.parse import urlparse - - p = urlparse(self.url) - if endpoint.startswith("http"): - self._post_url = endpoint - elif endpoint.startswith("/"): - self._post_url = f"{p.scheme}://{p.netloc}{endpoint}" - else: - self._post_url = f"{p.scheme}://{p.netloc}/{endpoint.lstrip('/')}" - return - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Error during SSE POST endpoint discovery: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Error during SSE POST endpoint discovery: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about SSE discovery error") - return - async def disconnect(self): """Close the HTTP session.""" - # Cancel listener and close SSE response - try: - if self._sse_task: - self._sse_task.cancel() - try: - await self._sse_task - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Error awaiting SSE listener task during disconnect: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Error awaiting SSE listener task during disconnect: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about SSE listener await failure") - self._sse_task = None - except Exception: - import logging - - logging.getLogger(__name__).exception("Error cancelling SSE listener task during disconnect") - try: - from ..interface.notifier import notify - - notify("warning", "Error cancelling SSE listener task during disconnect") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about SSE listener cancellation error") - - try: - if self._sse_response: - try: - await self._sse_response.release() - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Error releasing SSE response during disconnect: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Error releasing SSE response during disconnect: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about SSE response release error") - self._sse_response = None - except Exception: - import logging - - logging.getLogger(__name__).exception("Error handling SSE response during disconnect") - try: - from ..interface.notifier import notify - - notify("warning", "Error handling SSE response during disconnect") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about SSE response handling error") - - # Fail any pending requests - async with self._pending_lock: - for fut in list(self._pending.values()): - if not fut.done(): - fut.set_exception(RuntimeError("Transport disconnected")) - self._pending.clear() - if self.session: await self.session.close() self.session = None - self._connected = False - - async def _sse_listener(self, resp: Any): - """Background task that reads SSE events and resolves pending futures. - - The listener expects SSE-formatted events where `data:` lines may - contain JSON payloads. If a JSON object contains an `id` field that - matches a pending request, the corresponding future is completed with - that JSON value. - """ - try: - # Read the stream line-by-line, accumulating event blocks - event_lines: list[str] = [] - async for raw in resp.content: - try: - line = raw.decode(errors="ignore").rstrip("\r\n") - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Failed to decode SSE raw chunk: %s", e) - continue - if line == "": - # End of event; process accumulated lines - event_name = None - data_lines: list[str] = [] - for evt_line in event_lines: - if evt_line.startswith("event:"): - event_name = evt_line.split(":", 1)[1].strip() - elif evt_line.startswith("data:"): - data_lines.append(evt_line.split(":", 1)[1].lstrip()) - - if data_lines: - data_text = "\n".join(data_lines) - # If this is an endpoint announcement, record POST URL - if event_name == "endpoint": - try: - from urllib.parse import urlparse - - p = urlparse(self.url) - endpoint = data_text.strip() - if endpoint.startswith("http"): - self._post_url = endpoint - elif endpoint.startswith("/"): - self._post_url = f"{p.scheme}://{p.netloc}{endpoint}" - else: - self._post_url = f"{p.scheme}://{p.netloc}/{endpoint.lstrip('/')}" - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Failed parsing SSE endpoint announcement: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Failed parsing SSE endpoint announcement: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about SSE endpoint parse failure") - # Notify connect() that endpoint is ready - try: - if self._endpoint_ready and not self._endpoint_ready.is_set(): - self._endpoint_ready.set() - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Failed to set SSE endpoint ready event: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Failed to set SSE endpoint ready event: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about SSE endpoint ready event failure") - else: - # Try to parse as JSON and resolve pending futures - try: - obj = json.loads(data_text) - if isinstance(obj, dict) and "id" in obj: - msg_id = str(obj.get("id")) - async with self._pending_lock: - fut = self._pending.get(msg_id) - if fut and not fut.done(): - fut.set_result(obj) - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Failed parsing SSE event JSON or resolving pending future: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Failed parsing SSE event JSON or resolving pending future: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about SSE event parse/future failure") - - event_lines = [] - else: - event_lines.append(line) - except asyncio.CancelledError: - return - except Exception: - # On error, fail pending futures - async with self._pending_lock: - for fut in list(self._pending.values()): - if not fut.done(): - fut.set_exception(RuntimeError("SSE listener error")) - self._pending.clear() - finally: - # Ensure we mark disconnected state self._connected = False