feat(cursor): auto-migrate sessions to healthy account on quota exhaustion

When a Cursor account's quota is exhausted, sessions bound to it can now
seamlessly continue on a different account:

Layer 1 — Checkpoint decoupling:
  Key checkpoints by conversationId (not authID:conversationId). Store
  authID inside savedCheckpoint. On lookup, if auth changed, discard the
  stale checkpoint and flatten conversation history into userText.

Layer 2 — Cross-account session cleanup:
  When a request arrives for a conversation whose session belongs to a
  different (now-exhausted) auth, close the old H2 stream and remove
  the stale session to free resources.

Layer 3 — H2Stream.Err() exposure:
  New Err() method on H2Stream so callers can inspect RST_STREAM,
  GOAWAY, or other stream-level errors after closure.

Layer 4 — processH2SessionFrames error propagation:
  Now returns an error instead of a bare return. Connect EndStream errors
  (quota, rate limit) are propagated instead of being logged and swallowed.

Layer 5 — Pre-response transparent retry:
  If the stream fails before any data is sent to the client, return an
  error to the conductor so it retries with a different auth — fully
  transparent to the client.

Layer 6 — Post-response error logging:
  If the stream fails after data was already sent, log a warning. The
  conductor's existing cooldown mechanism ensures the next request routes
  to a healthy account.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
MrHuangJser
2026-03-27 10:50:32 +08:00
parent 8902e1cccb
commit 40dee4453a
2 changed files with 128 additions and 23 deletions

View File

@@ -205,6 +205,10 @@ func (s *H2Stream) Data() <-chan []byte { return s.dataCh }
// Done reports stream termination: the returned channel is closed once
// the stream has ended, after which no further data will arrive.
func (s *H2Stream) Done() <-chan struct{} {
	return s.doneCh
}
// Err returns the error (if any) that caused the stream to close.
// Returns nil for a clean shutdown (EOF / StreamEnded).
// NOTE(review): s.err is read here without synchronization — presumably
// only safe to call after Done() is closed; confirm the goroutine that
// sets s.err does so before closing doneCh (happens-before via close).
func (s *H2Stream) Err() error { return s.err }
// Close tears down the connection.
func (s *H2Stream) Close() {
s.conn.Close()

View File

@@ -51,6 +51,7 @@ type CursorExecutor struct {
// savedCheckpoint holds the server-issued conversation checkpoint for a
// single conversation, tagged with the auth that produced it so that an
// auth migration (quota failover) can be detected and the stale
// checkpoint discarded.
type savedCheckpoint struct {
data []byte // raw ConversationStateStructure protobuf bytes
blobStore map[string][]byte // blobs referenced by the checkpoint
authID string // auth that produced this checkpoint (checkpoint is auth-specific)
updatedAt time.Time // last write time; used by the cleanup loop to expire entries — TODO confirm
}
@@ -126,6 +127,19 @@ func (e *CursorExecutor) cleanupLoop() {
}
}
// findSessionByConversationLocked returns the key of any session whose
// conversation component matches convId, regardless of which auth owns
// it. Used to locate (and subsequently clean up) stale sessions left
// behind by a previous auth after quota failover. Caller must hold e.mu.
//
// Session keys are built as authID + ":" + conversationId, so a key that
// ends in ":"+convId belongs to this conversation. NOTE(review): this
// assumes conversation IDs never end with ":"+<another conversation's
// full ID> — true for UUID-style IDs; confirm if ID format changes.
func (e *CursorExecutor) findSessionByConversationLocked(convId string) string {
	want := ":" + convId
	for key := range e.sessions {
		if strings.HasSuffix(key, want) {
			return key
		}
	}
	return ""
}
// PrepareRequest implements ProviderExecutor (for HttpRequest support).
func (e *CursorExecutor) PrepareRequest(req *http.Request, auth *cliproxyauth.Auth) error {
token := cursorAccessToken(auth)
@@ -250,7 +264,7 @@ func (e *CursorExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
// Collect full text from streaming response
var fullText strings.Builder
processH2SessionFrames(sessionCtx, stream, params.BlobStore, nil,
if streamErr := processH2SessionFrames(sessionCtx, stream, params.BlobStore, nil,
func(text string, isThinking bool) {
fullText.WriteString(text)
},
@@ -258,7 +272,9 @@ func (e *CursorExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
nil,
nil, // tokenUsage - non-streaming
nil, // onCheckpoint - non-streaming doesn't persist
)
); streamErr != nil && fullText.Len() == 0 {
return resp, fmt.Errorf("cursor: stream error: %w", streamErr)
}
id := "chatcmpl-" + uuid.New().String()[:28]
created := time.Now().Unix()
@@ -324,9 +340,10 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
authID := auth.ID // e.g. "cursor.json" or "cursor-account2.json"
log.Debugf("cursor: conversationId=%s authID=%s", conversationId, authID)
// Include authID in keys for multi-account isolation
// Session key includes authID (H2 stream is auth-specific, not transferable).
// Checkpoint key uses conversationId only — allows detecting auth migration.
sessionKey := authID + ":" + conversationId
checkpointKey := sessionKey // same isolation
checkpointKey := conversationId
needsTranslate := from.String() != "" && from.String() != "openai"
// Check if we can resume an existing session with tool results
@@ -336,6 +353,20 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
if hasSession {
delete(e.sessions, sessionKey)
}
// If no session found for current auth, check for stale sessions from
// a different auth on the same conversation (quota failover scenario).
// Clean them up since the H2 stream belongs to the old account.
if !hasSession {
if oldKey := e.findSessionByConversationLocked(conversationId); oldKey != "" {
oldSession := e.sessions[oldKey]
log.Infof("cursor: cleaning up stale session from auth %s for conv=%s (auth migrated to %s)", oldSession.authID, conversationId, authID)
oldSession.cancel()
if oldSession.stream != nil {
oldSession.stream.Close()
}
delete(e.sessions, oldKey)
}
}
e.mu.Unlock()
if hasSession && session.stream != nil && session.authID == authID {
@@ -347,23 +378,33 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
}
}
// Clean up any stale session for this key
// Clean up any stale session for this key (or from a previous auth on same conversation)
e.mu.Lock()
if old, ok := e.sessions[sessionKey]; ok {
old.cancel()
delete(e.sessions, sessionKey)
} else if oldKey := e.findSessionByConversationLocked(conversationId); oldKey != "" {
old := e.sessions[oldKey]
old.cancel()
if old.stream != nil {
old.stream.Close()
}
delete(e.sessions, oldKey)
}
e.mu.Unlock()
// Look up saved checkpoint for this conversation + account
// Look up saved checkpoint for this conversation (keyed by conversationId only).
// Checkpoint is auth-specific: if auth changed (e.g. quota exhaustion failover),
// the old checkpoint is useless on the new account — discard and flatten.
e.mu.Lock()
saved, hasCheckpoint := e.checkpoints[checkpointKey]
e.mu.Unlock()
params := buildRunRequestParams(parsed, conversationId)
if hasCheckpoint && saved.data != nil {
log.Debugf("cursor: using saved checkpoint (%d bytes) for key=%s", len(saved.data), checkpointKey)
if hasCheckpoint && saved.data != nil && saved.authID == authID {
// Same auth — use checkpoint normally
log.Debugf("cursor: using saved checkpoint (%d bytes) for conv=%s auth=%s", len(saved.data), checkpointKey, authID)
params.RawCheckpoint = saved.data
// Merge saved blobStore into params
if params.BlobStore == nil {
@@ -374,6 +415,17 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
params.BlobStore[k] = v
}
}
} else if hasCheckpoint && saved.data != nil && saved.authID != authID {
// Auth changed (quota failover) — checkpoint is not portable across accounts.
// Discard and flatten conversation history into userText.
log.Infof("cursor: auth migrated (%s → %s) for conv=%s, discarding checkpoint and flattening context", saved.authID, authID, checkpointKey)
e.mu.Lock()
delete(e.checkpoints, checkpointKey)
e.mu.Unlock()
if len(parsed.ToolResults) > 0 || len(parsed.Turns) > 0 {
flattenConversationIntoUserText(parsed)
params = buildRunRequestParams(parsed, conversationId)
}
} else if len(parsed.ToolResults) > 0 || len(parsed.Turns) > 0 {
// Fallback: no checkpoint available (cold resume / proxy restart).
// Flatten the full conversation history (including tool interactions) into userText.
@@ -458,6 +510,21 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
}
}
// Pre-response error detection for transparent failover:
// If the stream fails before any chunk is emitted (e.g. quota exceeded),
// ExecuteStream returns an error so the conductor retries with a different auth.
streamErrCh := make(chan error, 1)
firstChunkSent := make(chan struct{}, 1) // buffered: goroutine won't block signaling
origEmitToOut := emitToOut
emitToOut = func(chunk cliproxyexecutor.StreamChunk) {
select {
case firstChunkSent <- struct{}{}:
default:
}
origEmitToOut(chunk)
}
go func() {
var resumeOutCh chan cliproxyexecutor.StreamChunk
_ = resumeOutCh
@@ -466,7 +533,7 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
usage := &cursorTokenUsage{}
usage.setInputEstimate(len(payload))
processH2SessionFrames(sessionCtx, stream, params.BlobStore, params.McpTools,
streamErr := processH2SessionFrames(sessionCtx, stream, params.BlobStore, params.McpTools,
func(text string, isThinking bool) {
if isThinking {
if !thinkingActive {
@@ -537,19 +604,43 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
toolResultCh,
usage,
func(cpData []byte) {
// Save checkpoint for this conversation
// Save checkpoint keyed by conversationId, tagged with authID for migration detection
e.mu.Lock()
e.checkpoints[checkpointKey] = &savedCheckpoint{
data: cpData,
blobStore: params.BlobStore,
authID: authID,
updatedAt: time.Now(),
}
e.mu.Unlock()
log.Debugf("cursor: saved checkpoint (%d bytes) for key=%s", len(cpData), checkpointKey)
log.Debugf("cursor: saved checkpoint (%d bytes) for conv=%s auth=%s", len(cpData), checkpointKey, authID)
},
)
// processH2SessionFrames returned — stream is done
// processH2SessionFrames returned — stream is done.
// Check if error happened before any chunks were emitted.
if streamErr != nil {
select {
case <-firstChunkSent:
// Chunks were already sent to client — can't transparently retry.
// Next request will failover via conductor's cooldown mechanism.
log.Warnf("cursor: stream error after data sent (auth=%s conv=%s): %v", authID, conversationId, streamErr)
default:
// No data sent yet — propagate error for transparent conductor retry.
log.Warnf("cursor: stream error before data sent (auth=%s conv=%s): %v — signaling retry", authID, conversationId, streamErr)
streamErrCh <- streamErr
outMu.Lock()
if currentOut != nil {
close(currentOut)
currentOut = nil
}
outMu.Unlock()
sessionCancel()
stream.Close()
return
}
}
if thinkingActive {
sendChunkSwitchable(`{"content":"</think>"}`, "")
}
@@ -584,7 +675,16 @@ func (e *CursorExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
stream.Close()
}()
return &cliproxyexecutor.StreamResult{Chunks: chunks}, nil
// Wait for either the first chunk or a pre-response error.
// If the stream fails before emitting any data (e.g. quota exceeded),
// return an error so the conductor retries with a different auth.
select {
case streamErr := <-streamErrCh:
return nil, fmt.Errorf("cursor: stream failed before response: %w", streamErr)
case <-firstChunkSent:
// Data started flowing — return stream to client
return &cliproxyexecutor.StreamResult{Chunks: chunks}, nil
}
}
// resumeWithToolResults injects tool results into the running processH2SessionFrames
@@ -701,7 +801,7 @@ func processH2SessionFrames(
toolResultCh <-chan []toolResultInfo, // nil for no tool result injection; non-nil to wait for results
tokenUsage *cursorTokenUsage, // tracks accumulated token usage (may be nil)
onCheckpoint func(data []byte), // called when server sends conversation_checkpoint_update
) {
) error {
var buf bytes.Buffer
rejectReason := "Tool not available in this environment. Use the MCP tools provided instead."
log.Debugf("cursor: processH2SessionFrames started for streamID=%s, waiting for data...", stream.ID())
@@ -709,11 +809,11 @@ func processH2SessionFrames(
select {
case <-ctx.Done():
log.Debugf("cursor: processH2SessionFrames exiting: context done")
return
return ctx.Err()
case data, ok := <-stream.Data():
if !ok {
log.Debugf("cursor: processH2SessionFrames[%s]: exiting: stream data channel closed", stream.ID())
return
return stream.Err() // may be RST_STREAM, GOAWAY, or nil for clean close
}
// Log first 20 bytes of raw data for debugging
previewLen := min(20, len(data))
@@ -740,6 +840,7 @@ func processH2SessionFrames(
if flags&cursorproto.ConnectEndStreamFlag != 0 {
if err := cursorproto.ParseConnectEndStream(payload); err != nil {
log.Warnf("cursor: connect end stream error: %v", err)
return err // propagate server-side errors (quota, rate limit, etc.)
}
continue
}
@@ -765,7 +866,7 @@ func processH2SessionFrames(
case cursorproto.ServerMsgTurnEnded:
log.Debugf("cursor: TurnEnded received, stream will finish")
return
return nil // clean completion
case cursorproto.ServerMsgHeartbeat:
// Server heartbeat, ignore silently
@@ -818,7 +919,7 @@ func processH2SessionFrames(
onMcpExec(pending)
if toolResultCh == nil {
return
return nil
}
// Inline mode: wait for tool result while handling KV/heartbeat
@@ -828,16 +929,16 @@ func processH2SessionFrames(
for {
select {
case <-ctx.Done():
return
return ctx.Err()
case results, ok := <-toolResultCh:
if !ok {
return
return nil
}
toolResults = results
break waitLoop
case waitData, ok := <-stream.Data():
if !ok {
return
return stream.Err()
}
buf.Write(waitData)
for {
@@ -875,7 +976,7 @@ func processH2SessionFrames(
}
}
case <-stream.Done():
return
return stream.Err()
}
}
@@ -916,7 +1017,7 @@ func processH2SessionFrames(
case <-stream.Done():
log.Debugf("cursor: processH2SessionFrames exiting: stream done")
return
return stream.Err()
}
}
}