chore(mcp): restore MCP files from 49c0b93

This commit is contained in:
giveen
2026-01-21 12:32:57 -07:00
parent bfc86d1ba4
commit a20444aaaa
3 changed files with 13 additions and 699 deletions

View File

@@ -13,16 +13,13 @@ Uses standard MCP configuration format:
"""
import asyncio
import atexit
import json
import os
import signal
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from .tools import create_mcp_tool
from ..tools.registry import register_tool_instance, unregister_tool
from .transport import MCPTransport, StdioTransport
@@ -36,8 +33,6 @@ class MCPServerConfig:
env: Dict[str, str] = field(default_factory=dict)
enabled: bool = True
description: str = ""
# Whether to auto-start this server when `connect_all()` is called.
start_on_launch: bool = False
@dataclass
@@ -73,45 +68,7 @@ class MCPManager:
def __init__(self, config_path: Optional[Path] = None):
self.config_path = config_path or self._find_config()
self.servers: Dict[str, MCPServer] = {}
# Track adapters we auto-started so we can stop them later
self._started_adapters: Dict[str, object] = {}
self._message_id = 0
# Control socket server attributes
self._control_server: Optional[asyncio.AbstractServer] = None
self._control_task: Optional[asyncio.Task] = None
self._control_path: Optional[Path] = None
# Auto-connect to configured MCP servers if any are enabled in config.
try:
cfg = self._load_config()
has_enabled = any(getattr(s, "enabled", True) for s in cfg.values())
if has_enabled:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# No running loop: run connect synchronously so tools register
try:
asyncio.run(self.connect_all())
except Exception:
pass
else:
# Running loop present: schedule background connect task
try:
loop.create_task(self.connect_all())
except Exception:
pass
except Exception:
pass
# Ensure we attempt to clean up vendored servers on process exit
try:
atexit.register(self._atexit_cleanup)
except Exception as e:
logging.getLogger(__name__).exception("Failed to register atexit cleanup: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to register MCP atexit cleanup: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about atexit.register failure")
def _find_config(self) -> Path:
for path in self.DEFAULT_CONFIG_PATHS:
@@ -139,15 +96,8 @@ class MCPManager:
args=config.get("args", []),
env=config.get("env", {}),
enabled=config.get("enabled", True),
start_on_launch=config.get("start_on_launch", False),
description=config.get("description", ""),
)
# Environment-based auto-start overrides (previously supported via
# LAUNCH_* variables) have been removed. MCP adapters are no
# longer auto-started by the manager; operators should install and
# run adapters manually and add them to `mcp_servers.json` if they
# want `MCPManager` to connect to them.
return servers
except json.JSONDecodeError as e:
print(f"[MCP] Error loading config: {e}")
@@ -167,78 +117,6 @@ class MCPManager:
self.config_path.parent.mkdir(parents=True, exist_ok=True)
self.config_path.write_text(json.dumps(config, indent=2), encoding="utf-8")
def _atexit_cleanup(self):
"""Synchronous atexit cleanup that attempts to stop adapters and disconnect servers.

Best-effort: every failure is logged and swallowed so interpreter
shutdown is never blocked. Escalation order per adapter:
adapter.stop_sync() -> blocking adapter.stop() -> signal the child PID.
"""
# NOTE(review): `logging` is used throughout this method but is not
# visible in the import block shown for this file - confirm it is
# imported, otherwise this fallback path raises NameError at exit.
try:
# Try to run async shutdown; if an event loop is already running this may fail,
# but it's best-effort to avoid orphaned vendored servers.
asyncio.run(self._stop_started_adapters_and_disconnect())
except Exception:
# Last-ditch: attempt to stop adapters synchronously.
# If the adapter exposes a blocking `stop()` call, call it. Otherwise, try
# to kill the underlying process by PID to avoid asyncio subprocess
# destructors running after the loop is closed.
for adapter in list(self._started_adapters.values()):
try:
# Prefer adapter-provided synchronous stop hook
stop_sync = getattr(adapter, "stop_sync", None)
if stop_sync:
try:
stop_sync()
continue
except Exception:
pass
# Fallback: try blocking stop() if present
stop = getattr(adapter, "stop", None)
if stop and not asyncio.iscoroutinefunction(stop):
try:
stop()
continue
except Exception as e:
logging.getLogger(__name__).exception(
"Error running adapter.stop(): %s", e
)
# Final fallback: kill underlying PID if available
pid = None
proc = getattr(adapter, "_process", None)
if proc is not None:
pid = getattr(proc, "pid", None)
if pid:
try:
# SIGTERM first for a graceful exit; SIGKILL follows on
# failure (exact nesting is lost in this diff view - confirm).
os.kill(pid, signal.SIGTERM)
except Exception as e:
logging.getLogger(__name__).exception("Failed to SIGTERM pid %s: %s", pid, e)
try:
os.kill(pid, signal.SIGKILL)
except Exception as e2:
logging.getLogger(__name__).exception("Failed to SIGKILL pid %s: %s", pid, e2)
except Exception as e:
logging.getLogger(__name__).exception("Error while attempting synchronous adapter stop: %s", e)
async def _stop_started_adapters_and_disconnect(self) -> None:
"""Async shutdown path: stop every adapter this manager auto-started,
then disconnect all active MCP server connections.

Errors are logged and swallowed; this coroutine never raises.
"""
# Stop any adapters we started
for _name, adapter in list(self._started_adapters.items()):
try:
stop = getattr(adapter, "stop", None)
if stop:
if asyncio.iscoroutinefunction(stop):
await stop()
else:
# run blocking stop in executor
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, stop)
except Exception as e:
logging.getLogger(__name__).exception("Error stopping adapter in async shutdown: %s", e)
self._started_adapters.clear()
# Disconnect any active MCP server connections
try:
await self.disconnect_all()
except Exception as e:
logging.getLogger(__name__).exception("Error during disconnect_all in shutdown: %s", e)
def add_server(
self,
name: str,
@@ -266,18 +144,6 @@ class MCPManager:
return True
return False
def set_enabled(self, name: str, enabled: bool) -> bool:
    """Toggle the ``enabled`` flag of a configured MCP server.

    Loads the config file, flips the flag on the named entry, and
    persists the result. Returns ``True`` when the server existed and
    was updated, ``False`` otherwise.
    """
    configured = self._load_config()
    entry = configured.get(name)
    if entry is None:
        return False
    entry.enabled = bool(enabled)
    self._save_config(configured)
    return True
def list_configured_servers(self) -> List[dict]:
servers = self._load_config()
return [
@@ -293,11 +159,9 @@ class MCPManager:
for n, s in servers.items()
]
async def connect_all(self, register: bool = True, quiet: bool = False) -> List[Any]:
async def connect_all(self) -> List[Any]:
servers_config = self._load_config()
all_tools = []
# Connect to any configured servers but DO NOT auto-start vendored adapters.
# Operators should install and start vendored MCPs manually (see third_party/).
for name, config in servers_config.items():
if not config.enabled:
continue
@@ -306,101 +170,28 @@ class MCPManager:
self.servers[name] = server
for tool_def in server.tools:
tool = create_mcp_tool(tool_def, server, self)
if register:
# Register tool into global registry so it appears in `tools list`
try:
register_tool_instance(tool)
except Exception:
pass
all_tools.append(tool)
if not quiet:
print(f"[MCP] Connected to {name} with {len(server.tools)} tools")
print(f"[MCP] Connected to {name} with {len(server.tools)} tools")
return all_tools
async def connect_server(self, name: str, register: bool = True, quiet: bool = False) -> Optional[MCPServer]:
async def connect_server(self, name: str) -> Optional[MCPServer]:
servers_config = self._load_config()
if name not in servers_config:
return None
config = servers_config[name]
# Auto-start of vendored adapters (HexStrike/MetasploitMCP) has been
# removed. Operators should install and run any third-party MCP adapters
# manually under `third_party/` and configure `mcp_servers.json`.
server = await self._connect_server(config)
if server:
self.servers[name] = server
# Register tools for this server
try:
for tool_def in server.tools:
tool = create_mcp_tool(tool_def, server, self)
if register:
try:
register_tool_instance(tool)
except Exception:
pass
except Exception:
pass
if not quiet:
try:
print(f"[MCP] Connected to {name} with {len(server.tools)} tools")
except Exception:
pass
return server
async def _connect_server(self, config: MCPServerConfig) -> Optional[MCPServer]:
transport = None
try:
env = {**os.environ, **config.env}
# Decide transport type:
# - If args contain a http/sse transport or a --server http:// URL, use SSETransport
# - Otherwise default to StdioTransport (spawn process and use stdio JSON-RPC)
use_http = False
http_url = None
args_joined = " ".join(config.args or [])
if "--transport http" in args_joined or "--transport sse" in args_joined:
# Try to extract host/port from args
try:
# naive parsing: look for --host <host> and --port <port>
host = None
port = None
for i, a in enumerate(config.args or []):
if a == "--host" and i + 1 < len(config.args):
host = config.args[i + 1]
if a == "--port" and i + 1 < len(config.args):
port = config.args[i + 1]
if host and port:
http_url = f"http://{host}:{port}/sse"
except Exception:
http_url = None
use_http = True
# If args specify a --server URL, prefer that
if not http_url:
from urllib.parse import urlparse
for i, a in enumerate(config.args or []):
if a == "--server" and i + 1 < len(config.args):
candidate = config.args[i + 1]
if isinstance(candidate, str) and candidate.startswith("http"):
# If the provided server URL doesn't include a path, default to the MCP SSE path
p = urlparse(candidate)
if p.path and p.path != "/":
http_url = candidate
else:
http_url = candidate.rstrip("/") + "/sse"
use_http = True
break
if use_http and http_url:
from .transport import SSETransport
transport = SSETransport(url=http_url)
await transport.connect()
else:
transport = StdioTransport(
command=config.command, args=config.args, env=env
)
await transport.connect()
transport = StdioTransport(
command=config.command, args=config.args, env=env
)
await transport.connect()
await transport.send(
{
@@ -468,30 +259,7 @@ class MCPManager:
server = self.servers.get(name)
if server:
await server.disconnect()
# Unregister MCP tools provided by this server
try:
for tool_def in server.tools:
mname = f"mcp_{server.name}_{tool_def.get('name')}"
try:
unregister_tool(mname)
except Exception:
pass
except Exception:
pass
del self.servers[name]
# If we started an adapter for this server, stop it as well
adapter = self._started_adapters.pop(name, None)
if adapter:
try:
stop = getattr(adapter, "stop", None)
if stop:
if asyncio.iscoroutinefunction(stop):
await stop()
else:
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, stop)
except Exception:
pass
async def disconnect_all(self):
for server in list(self.servers.values()):
@@ -518,129 +286,3 @@ class MCPManager:
def is_connected(self, name: str) -> bool:
    """Return whether *name* maps to a currently connected MCP server."""
    entry = self.servers.get(name)
    if entry is None:
        return False
    return entry.connected
async def start_control_server(self, path: Optional[str] = None) -> str:
"""Start a lightweight UNIX-domain socket control server.
The control server accepts newline-delimited JSON requests. Supported
commands:
{"cmd": "status"} -> returns connected servers and counts
{"cmd": "list_tools"} -> returns list of MCP tools (name, server, description)
{"cmd": "call_tool", "server": S, "tool": T, "args": {...}} -> invoke a tool
Returns the path of the socket in use.
"""
# Default socket location lives under the user's home directory.
if not path:
path = str(Path.home() / ".pentestagent" / "mcp.sock")
sock_path = Path(path)
# Ensure parent exists
sock_path.parent.mkdir(parents=True, exist_ok=True)
# Remove stale socket if present
try:
if sock_path.exists():
try:
sock_path.unlink()
except Exception:
pass
except Exception:
pass
# Per-connection handler: reads ONE newline-terminated JSON request,
# writes one JSON response line, then closes the connection.
async def _handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
try:
data = await reader.readline()
if not data:
return
try:
req = json.loads(data.decode("utf-8"))
except Exception:
resp = {"status": "error", "error": "invalid_json"}
writer.write((json.dumps(resp) + "\n").encode("utf-8"))
await writer.drain()
return
cmd = req.get("cmd") if isinstance(req, dict) else None
if cmd == "status":
servers = []
for name, s in self.servers.items():
servers.append({"name": name, "connected": bool(s.connected), "tool_count": len(s.tools)})
resp = {"status": "ok", "servers": servers}
writer.write((json.dumps(resp) + "\n").encode("utf-8"))
await writer.drain()
elif cmd == "list_tools":
tools = []
for sname, s in self.servers.items():
for t in s.tools:
tools.append({"name": t.get("name"), "server": sname, "description": t.get("description", "")})
resp = {"status": "ok", "tools": tools}
writer.write((json.dumps(resp) + "\n").encode("utf-8"))
await writer.drain()
elif cmd == "call_tool":
# Expecting: {"cmd":"call_tool","server":"name","tool":"tool_name","args":{...}}
server_name = req.get("server")
tool_name = req.get("tool")
arguments = req.get("args", {}) if isinstance(req.get("args", {}), dict) else {}
if not server_name or not tool_name:
writer.write((json.dumps({"status": "error", "error": "missing_parameters"}) + "\n").encode("utf-8"))
await writer.drain()
return
try:
# perform the tool call
result = await self.call_tool(server_name, tool_name, arguments)
writer.write((json.dumps({"status": "ok", "result": result}) + "\n").encode("utf-8"))
await writer.drain()
except Exception as e:
writer.write((json.dumps({"status": "error", "error": "call_failed", "message": str(e)}) + "\n").encode("utf-8"))
await writer.drain()
else:
resp = {"status": "error", "error": "unknown_cmd"}
writer.write((json.dumps(resp) + "\n").encode("utf-8"))
await writer.drain()
except Exception:
try:
writer.write((json.dumps({"status": "error", "error": "internal"}) + "\n").encode("utf-8"))
await writer.drain()
except Exception:
pass
finally:
try:
writer.close()
except Exception:
pass
# Start the asyncio unix server
loop = asyncio.get_running_loop()
server = await asyncio.start_unix_server(_handle, path=path)
self._control_server = server
self._control_path = Path(path)
# Keep server serving in background task
self._control_task = loop.create_task(server.serve_forever())
# Restrict socket access to current user where possible
try:
# NOTE(review): chmod runs after the server begins accepting, so there
# is a brief window with default socket permissions - confirm acceptable.
os.chmod(path, 0o600)
except Exception:
pass
return path
async def stop_control_server(self):
"""Tear down the control socket server started by start_control_server.

Cancels the serve_forever task, closes the listening server, unlinks
the socket file, and resets the related attributes. Best-effort: all
exceptions are swallowed so shutdown never fails here.
"""
try:
if self._control_task:
self._control_task.cancel()
self._control_task = None
if self._control_server:
self._control_server.close()
try:
await self._control_server.wait_closed()
except Exception:
pass
self._control_server = None
# Remove the socket file so a later start does not find a stale path.
if self._control_path and self._control_path.exists():
try:
self._control_path.unlink()
except Exception:
pass
self._control_path = None
except Exception:
pass

View File

@@ -1,18 +1,3 @@
{
"mcpServers": {
"hexstrike-local": {
"command": "python3",
"args": [
"-u",
"pentestagent/mcp/stdio_adapter.py"
],
"env": {
"STDIO_TARGET": "http://127.0.0.1:8888",
"STDIO_TOOLS": "[{\"name\":\"tools_api\",\"description\":\"Tools API\"},{\"name\":\"tools_web\",\"description\":\"Web tools\"},{\"name\":\"tools_network\",\"description\":\"Network tools\"},{\"name\":\"tools_cloud\",\"description\":\"Cloud tools\"},{\"name\":\"tools_exploit\",\"description\":\"Exploit tools\"},{\"name\":\"tools_binary\",\"description\":\"Binary tools\"},{\"name\":\"tools_forensics\",\"description\":\"Forensics tools\"},{\"name\":\"tools_parameters\",\"description\":\"Tool parameter helpers\"},{\"name\":\"tools_web_advanced\",\"description\":\"Advanced web tools\"},{\"name\":\"tools_web_frameworks\",\"description\":\"Web framework helpers\"},{\"name\":\"ai\",\"description\":\"AI assistant\"},{\"name\":\"intelligence\",\"description\":\"Intelligence services\"},{\"name\":\"processes\",\"description\":\"Process management\"},{\"name\":\"files\",\"description\":\"File operations\"},{\"name\":\"python_env\",\"description\":\"Python env manager\"}]"
},
"description": "HexStrike (stdio->HTTP) adapter - forwards MCP tools/call to HexStrike HTTP API",
"enabled": true,
"start_on_launch": false
}
}
"mcpServers": {}
}

View File

@@ -160,37 +160,6 @@ class StdioTransport(MCPTransport):
except Exception:
pass
# Additional aggressive cleanup to avoid scheduling callbacks on a
# closed event loop during interpreter shutdown. Some Python
# implementations keep an internal transport around that attempts to
# call into the loop during object finalization; proactively close it
# while the loop is still running.
try:
loop = asyncio.get_running_loop()
except Exception:
loop = None
try:
# Accessing the private _transport attribute is a pragmatic
# measure to ensure the underlying pipe transport is closed
# immediately instead of relying on destructor behavior.
if loop is not None and not loop.is_closed() and hasattr(proc, "_transport"):
try:
proc._transport.close()
except Exception:
pass
except Exception:
pass
# Drop references to pipe objects to allow GC without scheduling
# destructor callbacks later.
try:
proc.stdin = None
proc.stdout = None
proc.stderr = None
except Exception:
pass
class SSETransport(MCPTransport):
"""MCP transport over Server-Sent Events (HTTP)."""
@@ -205,12 +174,6 @@ class SSETransport(MCPTransport):
self.url = url
self.session: Optional[Any] = None # aiohttp.ClientSession
self._connected = False
self._post_url: Optional[str] = None
self._sse_response: Optional[Any] = None
self._sse_task: Optional[asyncio.Task] = None
self._pending: dict[str, asyncio.Future] = {}
self._pending_lock = asyncio.Lock()
self._endpoint_ready: Optional[asyncio.Event] = None
@property
def is_connected(self) -> bool:
@@ -223,40 +186,6 @@ class SSETransport(MCPTransport):
import aiohttp
self.session = aiohttp.ClientSession()
# Open a persistent SSE connection so we can receive async
# responses delivered over the event stream. Keep the response
# object alive and run a background task to parse events.
try:
# Do not use a short timeout; keep the connection open.
resp = await self.session.get(self.url, timeout=None)
# Store response and start background reader
self._sse_response = resp
# event used to signal when endpoint announced
self._endpoint_ready = asyncio.Event()
self._sse_task = asyncio.create_task(self._sse_listener(resp))
# Wait a short time for the endpoint to be discovered to avoid races
try:
await asyncio.wait_for(self._endpoint_ready.wait(), timeout=5.0)
except asyncio.TimeoutError:
# If endpoint not discovered, continue; send() will try discovery
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed opening SSE stream: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed opening SSE stream: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE open failure")
# If opening the SSE stream fails, still mark connected so
# send() can attempt POST discovery and report meaningful errors.
self._sse_response = None
self._sse_task = None
self._endpoint_ready = None
self._connected = True
except ImportError as e:
raise RuntimeError(
@@ -276,265 +205,23 @@ class SSETransport(MCPTransport):
if not self.session:
raise RuntimeError("Transport not connected")
if not self.session:
raise RuntimeError("Transport not connected")
# Ensure we have a POST endpoint. If discovery hasn't completed yet,
# try a quick synchronous discovery attempt before posting so we don't
# accidentally POST to the SSE listen endpoint which returns 405.
if not self._post_url:
try:
await self._discover_post_url(timeout=2.0)
except Exception:
pass
post_target = self._post_url or self.url
try:
async with self.session.post(
post_target, json=message, headers={"Content-Type": "application/json"}
self.url, json=message, headers={"Content-Type": "application/json"}
) as response:
status = response.status
if status == 200:
return await response.json()
if status == 202:
# Asynchronous response: wait for matching SSE event with the same id
if "id" not in message:
return {}
msg_id = str(message["id"])
fut = asyncio.get_running_loop().create_future()
async with self._pending_lock:
self._pending[msg_id] = fut
try:
result = await asyncio.wait_for(fut, timeout=15.0)
return result
finally:
async with self._pending_lock:
self._pending.pop(msg_id, None)
# Other statuses are errors
raise RuntimeError(f"HTTP error: {status}")
if response.status != 200:
raise RuntimeError(f"HTTP error: {response.status}")
return await response.json()
except Exception as e:
raise RuntimeError(f"SSE request failed: {e}") from e
async def _discover_post_url(self, timeout: float = 2.0) -> None:
"""Attempt a short GET to the SSE endpoint to find the advertised POST URL.
This is a fallback used when the background listener hasn't yet
extracted the `endpoint` event. It reads a few lines with a short
timeout and sets `self._post_url` if found.
"""
# No session means the transport was never connected; nothing to do.
if not self.session:
return
try:
async with self.session.get(self.url, timeout=timeout) as resp:
if resp.status != 200:
return
# Read up to a few lines looking for `data:`
for _ in range(20):
line = await resp.content.readline()
if not line:
break
try:
text = line.decode(errors="ignore").strip()
except Exception:
continue
if text.startswith("data:"):
endpoint = text.split("data:", 1)[1].strip()
from urllib.parse import urlparse
p = urlparse(self.url)
# Normalize the announced endpoint: absolute URLs are taken
# as-is; relative paths are resolved against the SSE URL's origin.
if endpoint.startswith("http"):
self._post_url = endpoint
elif endpoint.startswith("/"):
self._post_url = f"{p.scheme}://{p.netloc}{endpoint}"
else:
self._post_url = f"{p.scheme}://{p.netloc}/{endpoint.lstrip('/')}"
return
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error during SSE POST endpoint discovery: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error during SSE POST endpoint discovery: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE discovery error")
return
async def disconnect(self):
"""Close the HTTP session.

Shutdown order: cancel the SSE listener task, release the SSE
response, fail all pending request futures, then close the aiohttp
session and clear the connected flag. Every step is best-effort.
"""
# Cancel listener and close SSE response
try:
if self._sse_task:
self._sse_task.cancel()
try:
await self._sse_task
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error awaiting SSE listener task during disconnect: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error awaiting SSE listener task during disconnect: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE listener await failure")
self._sse_task = None
except Exception:
import logging
logging.getLogger(__name__).exception("Error cancelling SSE listener task during disconnect")
try:
from ..interface.notifier import notify
notify("warning", "Error cancelling SSE listener task during disconnect")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE listener cancellation error")
try:
if self._sse_response:
try:
await self._sse_response.release()
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error releasing SSE response during disconnect: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error releasing SSE response during disconnect: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE response release error")
self._sse_response = None
except Exception:
import logging
logging.getLogger(__name__).exception("Error handling SSE response during disconnect")
try:
from ..interface.notifier import notify
notify("warning", "Error handling SSE response during disconnect")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE response handling error")
# Fail any pending requests
# Waiters blocked in send() get a RuntimeError instead of hanging.
async with self._pending_lock:
for fut in list(self._pending.values()):
if not fut.done():
fut.set_exception(RuntimeError("Transport disconnected"))
self._pending.clear()
if self.session:
await self.session.close()
self.session = None
self._connected = False
async def _sse_listener(self, resp: Any):
"""Background task that reads SSE events and resolves pending futures.
The listener expects SSE-formatted events where `data:` lines may
contain JSON payloads. If a JSON object contains an `id` field that
matches a pending request, the corresponding future is completed with
that JSON value. An `event: endpoint` announcement instead records the
POST URL in `self._post_url` and signals `self._endpoint_ready`.
"""
try:
# Read the stream line-by-line, accumulating event blocks
event_lines: list[str] = []
async for raw in resp.content:
try:
line = raw.decode(errors="ignore").rstrip("\r\n")
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to decode SSE raw chunk: %s", e)
continue
if line == "":
# End of event; process accumulated lines
# (SSE framing: a blank line terminates an event block.)
event_name = None
data_lines: list[str] = []
for evt_line in event_lines:
if evt_line.startswith("event:"):
event_name = evt_line.split(":", 1)[1].strip()
elif evt_line.startswith("data:"):
data_lines.append(evt_line.split(":", 1)[1].lstrip())
if data_lines:
data_text = "\n".join(data_lines)
# If this is an endpoint announcement, record POST URL
if event_name == "endpoint":
try:
from urllib.parse import urlparse
p = urlparse(self.url)
endpoint = data_text.strip()
if endpoint.startswith("http"):
self._post_url = endpoint
elif endpoint.startswith("/"):
self._post_url = f"{p.scheme}://{p.netloc}{endpoint}"
else:
self._post_url = f"{p.scheme}://{p.netloc}/{endpoint.lstrip('/')}"
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed parsing SSE endpoint announcement: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed parsing SSE endpoint announcement: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE endpoint parse failure")
# Notify connect() that endpoint is ready
try:
if self._endpoint_ready and not self._endpoint_ready.is_set():
self._endpoint_ready.set()
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to set SSE endpoint ready event: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to set SSE endpoint ready event: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE endpoint ready event failure")
else:
# Try to parse as JSON and resolve pending futures
try:
obj = json.loads(data_text)
if isinstance(obj, dict) and "id" in obj:
msg_id = str(obj.get("id"))
async with self._pending_lock:
fut = self._pending.get(msg_id)
if fut and not fut.done():
fut.set_result(obj)
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed parsing SSE event JSON or resolving pending future: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed parsing SSE event JSON or resolving pending future: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about SSE event parse/future failure")
event_lines = []
else:
event_lines.append(line)
except asyncio.CancelledError:
return
except Exception:
# On error, fail pending futures
# so send() callers waiting on them do not block forever.
async with self._pending_lock:
for fut in list(self._pending.values()):
if not fut.done():
fut.set_exception(RuntimeError("SSE listener error"))
self._pending.clear()
finally:
# Ensure we mark disconnected state
self._connected = False