diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 1f055c5c..39721ca7 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -543,6 +543,20 @@ func (m *Manager) executeStreamWithModelPool(ctx context.Context, executor Provi return m.wrapStreamResult(ctx, auth.Clone(), provider, routeModel, streamResult.Headers, nil, errCh), nil } + if closed && len(buffered) == 0 { + emptyErr := &Error{Code: "empty_stream", Message: "upstream stream closed before first payload", Retryable: true} + result := Result{AuthID: auth.ID, Provider: provider, Model: routeModel, Success: false, Error: emptyErr} + m.MarkResult(ctx, result) + if idx < len(execModels)-1 { + lastErr = emptyErr + continue + } + errCh := make(chan cliproxyexecutor.StreamChunk, 1) + errCh <- cliproxyexecutor.StreamChunk{Err: emptyErr} + close(errCh) + return m.wrapStreamResult(ctx, auth.Clone(), provider, routeModel, streamResult.Headers, nil, errCh), nil + } + remaining := streamResult.Chunks if closed { closedCh := make(chan cliproxyexecutor.StreamChunk) diff --git a/sdk/cliproxy/auth/openai_compat_pool_test.go b/sdk/cliproxy/auth/openai_compat_pool_test.go index d873fd38..5a5ecb4f 100644 --- a/sdk/cliproxy/auth/openai_compat_pool_test.go +++ b/sdk/cliproxy/auth/openai_compat_pool_test.go @@ -21,6 +21,7 @@ type openAICompatPoolExecutor struct { executeErrors map[string]error countErrors map[string]error streamFirstErrors map[string]error + streamPayloads map[string][]cliproxyexecutor.StreamChunk } func (e *openAICompatPoolExecutor) Identifier() string { return e.id } @@ -46,14 +47,22 @@ func (e *openAICompatPoolExecutor) ExecuteStream(ctx context.Context, auth *Auth e.mu.Lock() e.streamModels = append(e.streamModels, req.Model) err := e.streamFirstErrors[req.Model] + payloadChunks, hasCustomChunks := e.streamPayloads[req.Model] + chunks := append([]cliproxyexecutor.StreamChunk(nil), payloadChunks...) e.mu.Unlock() - ch := make(chan cliproxyexecutor.StreamChunk, 1) + ch := make(chan cliproxyexecutor.StreamChunk, max(1, len(chunks))) if err != nil { ch <- cliproxyexecutor.StreamChunk{Err: err} close(ch) return &cliproxyexecutor.StreamResult{Headers: http.Header{"X-Model": {req.Model}}, Chunks: ch}, nil } - ch <- cliproxyexecutor.StreamChunk{Payload: []byte(req.Model)} + if !hasCustomChunks { + ch <- cliproxyexecutor.StreamChunk{Payload: []byte(req.Model)} + } else { + for _, chunk := range chunks { + ch <- chunk + } + } close(ch) return &cliproxyexecutor.StreamResult{Headers: http.Header{"X-Model": {req.Model}}, Chunks: ch}, nil } @@ -261,6 +270,42 @@ func TestManagerExecute_OpenAICompatAliasPoolFallsBackWithinSameAuth(t *testing. } } +func TestManagerExecuteStream_OpenAICompatAliasPoolRetriesOnEmptyBootstrap(t *testing.T) { + alias := "claude-opus-4.66" + executor := &openAICompatPoolExecutor{ + id: "pool", + streamPayloads: map[string][]cliproxyexecutor.StreamChunk{ + "qwen3.5-plus": {}, + }, + } + m := newOpenAICompatPoolTestManager(t, alias, []internalconfig.OpenAICompatibilityModel{ + {Name: "qwen3.5-plus", Alias: alias}, + {Name: "glm-5", Alias: alias}, + }, executor) + + streamResult, err := m.ExecuteStream(context.Background(), []string{"pool"}, cliproxyexecutor.Request{Model: alias}, cliproxyexecutor.Options{}) + if err != nil { + t.Fatalf("execute stream: %v", err) + } + var payload []byte + for chunk := range streamResult.Chunks { + if chunk.Err != nil { + t.Fatalf("unexpected stream error: %v", chunk.Err) + } + payload = append(payload, chunk.Payload...) + } + if string(payload) != "glm-5" { + t.Fatalf("payload = %q, want %q", string(payload), "glm-5") + } + got := executor.StreamModels() + want := []string{"qwen3.5-plus", "glm-5"} + for i := range want { + if got[i] != want[i] { + t.Fatalf("stream call %d model = %q, want %q", i, got[i], want[i]) + } + } +} + func TestManagerExecuteStream_OpenAICompatAliasPoolFallsBackBeforeFirstByte(t *testing.T) { alias := "claude-opus-4.66" executor := &openAICompatPoolExecutor{