mcp: wait for SSE endpoint discovery in connect() to avoid writer races

This commit is contained in:
giveen
2026-01-14 16:56:21 -07:00
parent 09723c8ed9
commit 5fd6bd7c11

View File

@@ -179,6 +179,7 @@ class SSETransport(MCPTransport):
self._sse_task: Optional[asyncio.Task] = None
self._pending: dict[str, asyncio.Future] = {}
self._pending_lock = asyncio.Lock()
self._endpoint_ready: Optional[asyncio.Event] = None
@property
def is_connected(self) -> bool:
@@ -200,12 +201,21 @@ class SSETransport(MCPTransport):
resp = await self.session.get(self.url, timeout=None)
# Store response and start background reader
self._sse_response = resp
# event used to signal when endpoint announced
self._endpoint_ready = asyncio.Event()
self._sse_task = asyncio.create_task(self._sse_listener(resp))
# Wait a short time for the endpoint to be discovered to avoid races
try:
await asyncio.wait_for(self._endpoint_ready.wait(), timeout=5.0)
except asyncio.TimeoutError:
# If endpoint not discovered, continue; send() will try discovery
pass
except Exception:
# If opening the SSE stream fails, still mark connected so
# send() can attempt POST discovery and report meaningful errors.
self._sse_response = None
self._sse_task = None
self._endpoint_ready = None
self._connected = True
except ImportError as e:
@@ -384,6 +394,12 @@ class SSETransport(MCPTransport):
self._post_url = f"{p.scheme}://{p.netloc}/{endpoint.lstrip('/')}"
except Exception:
pass
# Notify connect() that endpoint is ready
try:
if self._endpoint_ready and not self._endpoint_ready.is_set():
self._endpoint_ready.set()
except Exception:
pass
else:
# Try to parse as JSON and resolve pending futures
try: