feat(cursor): implement StatusError for conductor cooldown integration

Cursor executor errors were plain fmt.Errorf — the conductor couldn't
extract HTTP status codes, so exhausted accounts never entered cooldown.

Changes:
- Add ConnectError struct to proto/connect.go: ParseConnectEndStream now
  returns *ConnectError with Code/Message fields for precise matching
- Add cursorStatusErr implementing StatusError + RetryAfter interfaces
- Add classifyCursorError() with two-layer classification:
  Layer 1: exact match on ConnectError.Code (gRPC standard codes)
    resource_exhausted → 429, unauthenticated → 401,
    permission_denied → 403, unavailable → 503, internal → 500
  Layer 2: fuzzy string match for H2 errors (RST_STREAM → 502)
- Log all ConnectError code/message pairs for observing real server
  error codes (we have no samples yet)
- Wrap Execute and ExecuteStream error returns with classifyCursorError

Now the conductor properly marks Cursor auths as cooldown on quota errors,
enabling exponential backoff and round-robin failover.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
MrHuangJser
2026-03-27 11:42:22 +08:00
parent 40dee4453a
commit 1b7447b682
2 changed files with 71 additions and 3 deletions

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/sha256"
"errors"
"crypto/tls"
"encoding/base64"
"encoding/hex"
@@ -140,6 +141,60 @@ func (e *CursorExecutor) findSessionByConversationLocked(convId string) string {
return ""
}
// cursorStatusErr implements the StatusError and RetryAfter interfaces so the
// conductor can classify Cursor errors (e.g. 429 → quota cooldown).
type cursorStatusErr struct {
code int
msg string
}
func (e cursorStatusErr) Error() string { return e.msg }
func (e cursorStatusErr) StatusCode() int { return e.code }
func (e cursorStatusErr) RetryAfter() *time.Duration { return nil } // no retry-after info from Cursor; conductor uses exponential backoff
// classifyCursorError maps Cursor Connect/H2 errors to HTTP status codes.
// Layer 1: precise match on ConnectError.Code (gRPC standard codes).
// Layer 2: fuzzy string match for H2 frame errors and unknown formats.
// Unclassified errors pass through unchanged.
func classifyCursorError(err error) error {
if err == nil {
return nil
}
// Layer 1: structured ConnectError from ParseConnectEndStream
var ce *cursorproto.ConnectError
if errors.As(err, &ce) {
log.Infof("cursor: Connect error code=%q message=%q", ce.Code, ce.Message)
switch ce.Code {
case "resource_exhausted":
return cursorStatusErr{code: 429, msg: err.Error()}
case "unauthenticated":
return cursorStatusErr{code: 401, msg: err.Error()}
case "permission_denied":
return cursorStatusErr{code: 403, msg: err.Error()}
case "unavailable":
return cursorStatusErr{code: 503, msg: err.Error()}
case "internal":
return cursorStatusErr{code: 500, msg: err.Error()}
default:
// Unknown Connect code — log for observation, treat as 502
return cursorStatusErr{code: 502, msg: err.Error()}
}
}
// Layer 2: fuzzy match for H2 errors and unstructured messages
msg := strings.ToLower(err.Error())
switch {
case strings.Contains(msg, "rate limit") || strings.Contains(msg, "quota") ||
strings.Contains(msg, "too many"):
return cursorStatusErr{code: 429, msg: err.Error()}
case strings.Contains(msg, "rst_stream") || strings.Contains(msg, "goaway"):
return cursorStatusErr{code: 502, msg: err.Error()}
}
return err
}
// PrepareRequest implements ProviderExecutor (for HttpRequest support).
func (e *CursorExecutor) PrepareRequest(req *http.Request, auth *cliproxyauth.Auth) error {
token := cursorAccessToken(auth)
@@ -273,7 +328,7 @@ func (e *CursorExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
nil, // tokenUsage - non-streaming
nil, // onCheckpoint - non-streaming doesn't persist
); streamErr != nil && fullText.Len() == 0 {
return resp, fmt.Errorf("cursor: stream error: %w", streamErr)
return resp, classifyCursorError(fmt.Errorf("cursor: stream error: %w", streamErr))
}
id := "chatcmpl-" + uuid.New().String()[:28]
@@ -680,7 +735,7 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
// return an error so the conductor retries with a different auth.
select {
case streamErr := <-streamErrCh:
return nil, fmt.Errorf("cursor: stream failed before response: %w", streamErr)
return nil, classifyCursorError(fmt.Errorf("cursor: stream failed before response: %w", streamErr))
case <-firstChunkSent:
// Data started flowing — return stream to client
return &cliproxyexecutor.StreamResult{Chunks: chunks}, nil