diff --git a/pentestagent/mcp/transport.py b/pentestagent/mcp/transport.py index 5839ed6..d8d509a 100644 --- a/pentestagent/mcp/transport.py +++ b/pentestagent/mcp/transport.py @@ -179,6 +179,7 @@ class SSETransport(MCPTransport): self._sse_task: Optional[asyncio.Task] = None self._pending: dict[str, asyncio.Future] = {} self._pending_lock = asyncio.Lock() + self._endpoint_ready: Optional[asyncio.Event] = None @property def is_connected(self) -> bool: @@ -200,12 +201,21 @@ class SSETransport(MCPTransport): resp = await self.session.get(self.url, timeout=None) # Store response and start background reader self._sse_response = resp + # event used to signal when endpoint announced + self._endpoint_ready = asyncio.Event() self._sse_task = asyncio.create_task(self._sse_listener(resp)) + # Wait a short time for the endpoint to be discovered to avoid races + try: + await asyncio.wait_for(self._endpoint_ready.wait(), timeout=5.0) + except asyncio.TimeoutError: + # If endpoint not discovered, continue; send() will try discovery + pass except Exception: # If opening the SSE stream fails, still mark connected so # send() can attempt POST discovery and report meaningful errors. self._sse_response = None self._sse_task = None + self._endpoint_ready = None self._connected = True except ImportError as e: @@ -384,6 +394,12 @@ class SSETransport(MCPTransport): self._post_url = f"{p.scheme}://{p.netloc}/{endpoint.lstrip('/')}" except Exception: pass + # Notify connect() that endpoint is ready + try: + if self._endpoint_ready and not self._endpoint_ready.is_set(): + self._endpoint_ready.set() + except Exception: + pass else: # Try to parse as JSON and resolve pending futures try: