diff --git a/internal/runtime/executor/qwen_executor.go b/internal/runtime/executor/qwen_executor.go index 5c8ff039..ec02460e 100644 --- a/internal/runtime/executor/qwen_executor.go +++ b/internal/runtime/executor/qwen_executor.go @@ -153,6 +153,17 @@ func wrapQwenError(ctx context.Context, httpCode int, body []byte) (errCode int, return errCode, retryAfter } +func qwenShouldAttemptImmediateRefreshRetry(auth *cliproxyauth.Auth) bool { + if auth == nil || auth.Metadata == nil { + return false + } + if provider := strings.TrimSpace(auth.Provider); provider != "" && !strings.EqualFold(provider, "qwen") { + return false + } + refreshToken, _ := auth.Metadata["refresh_token"].(string) + return strings.TrimSpace(refreshToken) != "" +} + // ensureQwenSystemMessage ensures the request has a single system message at the beginning. // It always injects the default system prompt and merges any user-provided system messages // into the injected system message content to satisfy Qwen's strict message ordering rules. @@ -255,7 +266,8 @@ func ensureQwenSystemMessage(payload []byte) ([]byte, error) { // QwenExecutor is a stateless executor for Qwen Code using OpenAI-compatible chat completions. // If access token is unavailable, it falls back to legacy via ClientAdapter. 
type QwenExecutor struct { - cfg *config.Config + cfg *config.Config + refreshForImmediateRetry func(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) } func NewQwenExecutor(cfg *config.Config) *QwenExecutor { return &QwenExecutor{cfg: cfg} } @@ -295,23 +307,13 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req return resp, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"} } - // Check rate limit before proceeding var authID string if auth != nil { authID = auth.ID } - if err := checkQwenRateLimit(authID); err != nil { - helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID)) - return resp, err - } baseModel := thinking.ParseSuffix(req.Model).ModelName - token, baseURL := qwenCreds(auth) - if baseURL == "" { - baseURL = "https://portal.qwen.ai/v1" - } - reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth) defer reporter.TrackFailure(ctx, &err) @@ -338,68 +340,107 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req return resp, err } - url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) - if err != nil { - return resp, err - } - applyQwenHeaders(httpReq, token, false) - var attrs map[string]string - if auth != nil { - attrs = auth.Attributes - } - util.ApplyCustomHeadersFromAttrs(httpReq, attrs) - var authLabel, authType, authValue string - if auth != nil { - authLabel = auth.Label - authType, authValue = auth.AccountInfo() - } - helps.RecordAPIRequest(ctx, e.cfg, helps.UpstreamRequestLog{ - URL: url, - Method: http.MethodPost, - Headers: httpReq.Header.Clone(), - Body: body, - Provider: e.Identifier(), - AuthID: authID, - AuthLabel: authLabel, - AuthType: authType, - AuthValue: authValue, - }) + qwenImmediateRetryAttempted := false + for { + if errRate := 
checkQwenRateLimit(authID); errRate != nil { + helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID)) + return resp, errRate + } - httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - httpResp, err := httpClient.Do(httpReq) - if err != nil { - helps.RecordAPIResponseError(ctx, e.cfg, err) - return resp, err - } - defer func() { + token, baseURL := qwenCreds(auth) + if baseURL == "" { + baseURL = "https://portal.qwen.ai/v1" + } + + url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" + httpReq, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + if errReq != nil { + return resp, errReq + } + applyQwenHeaders(httpReq, token, false) + var attrs map[string]string + if auth != nil { + attrs = auth.Attributes + } + util.ApplyCustomHeadersFromAttrs(httpReq, attrs) + var authLabel, authType, authValue string + if auth != nil { + authLabel = auth.Label + authType, authValue = auth.AccountInfo() + } + helps.RecordAPIRequest(ctx, e.cfg, helps.UpstreamRequestLog{ + URL: url, + Method: http.MethodPost, + Headers: httpReq.Header.Clone(), + Body: body, + Provider: e.Identifier(), + AuthID: authID, + AuthLabel: authLabel, + AuthType: authType, + AuthValue: authValue, + }) + + httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpResp, errDo := httpClient.Do(httpReq) + if errDo != nil { + helps.RecordAPIResponseError(ctx, e.cfg, errDo) + return resp, errDo + } + + helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + b, _ := io.ReadAll(httpResp.Body) + helps.AppendAPIResponseChunk(ctx, e.cfg, b) + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("qwen executor: close response body error: %v", errClose) + } + + errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b) + helps.LogWithRequestID(ctx).Debugf("request error, error status: 
%d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + + if errCode == http.StatusTooManyRequests && !qwenImmediateRetryAttempted && qwenShouldAttemptImmediateRefreshRetry(auth) { + helps.LogWithRequestID(ctx).WithFields(log.Fields{ + "auth_id": redactAuthID(authID), + "model": req.Model, + }).Info("qwen 429 encountered, refreshing token for immediate retry") + + qwenImmediateRetryAttempted = true + refreshFn := e.refreshForImmediateRetry + if refreshFn == nil { + refreshFn = e.Refresh + } + refreshedAuth, errRefresh := refreshFn(ctx, auth) + if errRefresh != nil { + helps.LogWithRequestID(ctx).WithError(errRefresh).WithField("auth_id", redactAuthID(authID)).Warn("qwen 429 refresh failed; skipping immediate retry") + } else if refreshedAuth != nil { + auth = refreshedAuth + continue + } + } + + err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter} + return resp, err + } + + data, errRead := io.ReadAll(httpResp.Body) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("qwen executor: close response body error: %v", errClose) } - }() - helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { - b, _ := io.ReadAll(httpResp.Body) - helps.AppendAPIResponseChunk(ctx, e.cfg, b) + if errRead != nil { + helps.RecordAPIResponseError(ctx, e.cfg, errRead) + return resp, errRead + } - errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b) - helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) - err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter} - return resp, err + helps.AppendAPIResponseChunk(ctx, e.cfg, data) + reporter.Publish(ctx, helps.ParseOpenAIUsage(data)) + + var param any + // Note: 
TranslateNonStream uses req.Model (original with suffix) to preserve + // the original model name in the response for client compatibility. + out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, &param) + resp = cliproxyexecutor.Response{Payload: out, Headers: httpResp.Header.Clone()} + return resp, nil } - data, err := io.ReadAll(httpResp.Body) - if err != nil { - helps.RecordAPIResponseError(ctx, e.cfg, err) - return resp, err - } - helps.AppendAPIResponseChunk(ctx, e.cfg, data) - reporter.Publish(ctx, helps.ParseOpenAIUsage(data)) - var param any - // Note: TranslateNonStream uses req.Model (original with suffix) to preserve - // the original model name in the response for client compatibility. - out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, &param) - resp = cliproxyexecutor.Response{Payload: out, Headers: httpResp.Header.Clone()} - return resp, nil } func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) { @@ -407,23 +448,13 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"} } - // Check rate limit before proceeding var authID string if auth != nil { authID = auth.ID } - if err := checkQwenRateLimit(authID); err != nil { - helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID)) - return nil, err - } baseModel := thinking.ParseSuffix(req.Model).ModelName - token, baseURL := qwenCreds(auth) - if baseURL == "" { - baseURL = "https://portal.qwen.ai/v1" - } - reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth) defer reporter.TrackFailure(ctx, &err) @@ -457,86 +488,122 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut 
return nil, err } - url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" - httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) - if err != nil { - return nil, err - } - applyQwenHeaders(httpReq, token, true) - var attrs map[string]string - if auth != nil { - attrs = auth.Attributes - } - util.ApplyCustomHeadersFromAttrs(httpReq, attrs) - var authLabel, authType, authValue string - if auth != nil { - authLabel = auth.Label - authType, authValue = auth.AccountInfo() - } - helps.RecordAPIRequest(ctx, e.cfg, helps.UpstreamRequestLog{ - URL: url, - Method: http.MethodPost, - Headers: httpReq.Header.Clone(), - Body: body, - Provider: e.Identifier(), - AuthID: authID, - AuthLabel: authLabel, - AuthType: authType, - AuthValue: authValue, - }) - - httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) - httpResp, err := httpClient.Do(httpReq) - if err != nil { - helps.RecordAPIResponseError(ctx, e.cfg, err) - return nil, err - } - helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { - b, _ := io.ReadAll(httpResp.Body) - helps.AppendAPIResponseChunk(ctx, e.cfg, b) - - errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b) - helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) - if errClose := httpResp.Body.Close(); errClose != nil { - log.Errorf("qwen executor: close response body error: %v", errClose) + qwenImmediateRetryAttempted := false + for { + if errRate := checkQwenRateLimit(authID); errRate != nil { + helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID)) + return nil, errRate } - err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter} - return nil, err - } - out := make(chan 
cliproxyexecutor.StreamChunk) - go func() { - defer close(out) - defer func() { + + token, baseURL := qwenCreds(auth) + if baseURL == "" { + baseURL = "https://portal.qwen.ai/v1" + } + + url := strings.TrimSuffix(baseURL, "/") + "/chat/completions" + httpReq, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) + if errReq != nil { + return nil, errReq + } + applyQwenHeaders(httpReq, token, true) + var attrs map[string]string + if auth != nil { + attrs = auth.Attributes + } + util.ApplyCustomHeadersFromAttrs(httpReq, attrs) + var authLabel, authType, authValue string + if auth != nil { + authLabel = auth.Label + authType, authValue = auth.AccountInfo() + } + helps.RecordAPIRequest(ctx, e.cfg, helps.UpstreamRequestLog{ + URL: url, + Method: http.MethodPost, + Headers: httpReq.Header.Clone(), + Body: body, + Provider: e.Identifier(), + AuthID: authID, + AuthLabel: authLabel, + AuthType: authType, + AuthValue: authValue, + }) + + httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0) + httpResp, errDo := httpClient.Do(httpReq) + if errDo != nil { + helps.RecordAPIResponseError(ctx, e.cfg, errDo) + return nil, errDo + } + + helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) + if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 { + b, _ := io.ReadAll(httpResp.Body) + helps.AppendAPIResponseChunk(ctx, e.cfg, b) if errClose := httpResp.Body.Close(); errClose != nil { log.Errorf("qwen executor: close response body error: %v", errClose) } + + errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b) + helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) + + if errCode == http.StatusTooManyRequests && !qwenImmediateRetryAttempted && qwenShouldAttemptImmediateRefreshRetry(auth) { + helps.LogWithRequestID(ctx).WithFields(log.Fields{ + 
"auth_id": redactAuthID(authID), + "model": req.Model, + }).Info("qwen 429 encountered, refreshing token for immediate retry (stream)") + + qwenImmediateRetryAttempted = true + refreshFn := e.refreshForImmediateRetry + if refreshFn == nil { + refreshFn = e.Refresh + } + refreshedAuth, errRefresh := refreshFn(ctx, auth) + if errRefresh != nil { + helps.LogWithRequestID(ctx).WithError(errRefresh).WithField("auth_id", redactAuthID(authID)).Warn("qwen 429 refresh failed; skipping immediate retry (stream)") + } else if refreshedAuth != nil { + auth = refreshedAuth + continue + } + } + + err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter} + return nil, err + } + + out := make(chan cliproxyexecutor.StreamChunk) + go func() { + defer close(out) + defer func() { + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("qwen executor: close response body error: %v", errClose) + } + }() + scanner := bufio.NewScanner(httpResp.Body) + scanner.Buffer(nil, 52_428_800) // 50MB + var param any + for scanner.Scan() { + line := scanner.Bytes() + helps.AppendAPIResponseChunk(ctx, e.cfg, line) + if detail, ok := helps.ParseOpenAIStreamUsage(line); ok { + reporter.Publish(ctx, detail) + } + chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, bytes.Clone(line), &param) + for i := range chunks { + out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]} + } + } + doneChunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, []byte("[DONE]"), &param) + for i := range doneChunks { + out <- cliproxyexecutor.StreamChunk{Payload: doneChunks[i]} + } + if errScan := scanner.Err(); errScan != nil { + helps.RecordAPIResponseError(ctx, e.cfg, errScan) + reporter.PublishFailure(ctx) + out <- cliproxyexecutor.StreamChunk{Err: errScan} + } }() - scanner := bufio.NewScanner(httpResp.Body) - scanner.Buffer(nil, 52_428_800) // 50MB - var param any - for scanner.Scan() { - line := scanner.Bytes() - 
helps.AppendAPIResponseChunk(ctx, e.cfg, line) - if detail, ok := helps.ParseOpenAIStreamUsage(line); ok { - reporter.Publish(ctx, detail) - } - chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, bytes.Clone(line), &param) - for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]} - } - } - doneChunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, []byte("[DONE]"), &param) - for i := range doneChunks { - out <- cliproxyexecutor.StreamChunk{Payload: doneChunks[i]} - } - if errScan := scanner.Err(); errScan != nil { - helps.RecordAPIResponseError(ctx, e.cfg, errScan) - reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} - } - }() - return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil + return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil + } } func (e *QwenExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { diff --git a/internal/runtime/executor/qwen_executor_test.go b/internal/runtime/executor/qwen_executor_test.go index cf9ed21f..97b4757e 100644 --- a/internal/runtime/executor/qwen_executor_test.go +++ b/internal/runtime/executor/qwen_executor_test.go @@ -3,10 +3,16 @@ package executor import ( "context" "net/http" + "net/http/httptest" + "sync/atomic" "testing" + "time" + "github.com/router-for-me/CLIProxyAPI/v6/internal/config" "github.com/router-for-me/CLIProxyAPI/v6/internal/thinking" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth" + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" + sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator" "github.com/tidwall/gjson" ) @@ -209,3 +215,168 @@ func TestQwenCreds_NormalizesResourceURL(t *testing.T) { }) } } + +func 
TestQwenExecutorExecute_429RefreshAndRetry(t *testing.T) { + qwenRateLimiter.Lock() + qwenRateLimiter.requests = make(map[string][]time.Time) + qwenRateLimiter.Unlock() + + var calls int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&calls, 1) + if r.URL.Path != "/v1/chat/completions" { + w.WriteHeader(http.StatusNotFound) + return + } + switch r.Header.Get("Authorization") { + case "Bearer old-token": + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusTooManyRequests) + _, _ = w.Write([]byte(`{"error":{"code":"quota_exceeded","message":"quota exceeded","type":"quota_exceeded"}}`)) + return + case "Bearer new-token": + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"id":"chatcmpl-test","object":"chat.completion","created":1,"model":"qwen-max","choices":[{"index":0,"message":{"role":"assistant","content":"hi"},"finish_reason":"stop"}],"usage":{"prompt_tokens":1,"completion_tokens":1,"total_tokens":2}}`)) + return + default: + w.WriteHeader(http.StatusUnauthorized) + return + } + })) + defer srv.Close() + + exec := NewQwenExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + ID: "auth-test", + Provider: "qwen", + Attributes: map[string]string{ + "base_url": srv.URL + "/v1", + }, + Metadata: map[string]any{ + "access_token": "old-token", + "refresh_token": "refresh-token", + }, + } + + var refresherCalls int32 + exec.refreshForImmediateRetry = func(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) { + atomic.AddInt32(&refresherCalls, 1) + refreshed := auth.Clone() + if refreshed.Metadata == nil { + refreshed.Metadata = make(map[string]any) + } + refreshed.Metadata["access_token"] = "new-token" + refreshed.Metadata["refresh_token"] = "refresh-token-2" + return refreshed, nil + } + ctx := context.Background() + + resp, err := exec.Execute(ctx, auth, cliproxyexecutor.Request{ + Model: 
"qwen-max", + Payload: []byte(`{"model":"qwen-max","messages":[{"role":"user","content":"hi"}]}`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("openai"), + }) + if err != nil { + t.Fatalf("Execute() error = %v", err) + } + if len(resp.Payload) == 0 { + t.Fatalf("Execute() payload is empty") + } + if atomic.LoadInt32(&calls) != 2 { + t.Fatalf("upstream calls = %d, want 2", atomic.LoadInt32(&calls)) + } + if atomic.LoadInt32(&refresherCalls) != 1 { + t.Fatalf("refresher calls = %d, want 1", atomic.LoadInt32(&refresherCalls)) + } +} + +func TestQwenExecutorExecuteStream_429RefreshAndRetry(t *testing.T) { + qwenRateLimiter.Lock() + qwenRateLimiter.requests = make(map[string][]time.Time) + qwenRateLimiter.Unlock() + + var calls int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&calls, 1) + if r.URL.Path != "/v1/chat/completions" { + w.WriteHeader(http.StatusNotFound) + return + } + switch r.Header.Get("Authorization") { + case "Bearer old-token": + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusTooManyRequests) + _, _ = w.Write([]byte(`{"error":{"code":"quota_exceeded","message":"quota exceeded","type":"quota_exceeded"}}`)) + return + case "Bearer new-token": + w.Header().Set("Content-Type", "text/event-stream") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("data: {\"id\":\"chatcmpl-test\",\"object\":\"chat.completion.chunk\",\"created\":1,\"model\":\"qwen-max\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"},\"finish_reason\":null}]}\n")) + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } + return + default: + w.WriteHeader(http.StatusUnauthorized) + return + } + })) + defer srv.Close() + + exec := NewQwenExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + ID: "auth-test", + Provider: "qwen", + Attributes: map[string]string{ + "base_url": srv.URL + "/v1", + }, + Metadata: map[string]any{ + "access_token": 
"old-token", + "refresh_token": "refresh-token", + }, + } + + var refresherCalls int32 + exec.refreshForImmediateRetry = func(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) { + atomic.AddInt32(&refresherCalls, 1) + refreshed := auth.Clone() + if refreshed.Metadata == nil { + refreshed.Metadata = make(map[string]any) + } + refreshed.Metadata["access_token"] = "new-token" + refreshed.Metadata["refresh_token"] = "refresh-token-2" + return refreshed, nil + } + ctx := context.Background() + + stream, err := exec.ExecuteStream(ctx, auth, cliproxyexecutor.Request{ + Model: "qwen-max", + Payload: []byte(`{"model":"qwen-max","stream":true,"messages":[{"role":"user","content":"hi"}]}`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("openai"), + }) + if err != nil { + t.Fatalf("ExecuteStream() error = %v", err) + } + if atomic.LoadInt32(&calls) != 2 { + t.Fatalf("upstream calls = %d, want 2", atomic.LoadInt32(&calls)) + } + if atomic.LoadInt32(&refresherCalls) != 1 { + t.Fatalf("refresher calls = %d, want 1", atomic.LoadInt32(&refresherCalls)) + } + + var sawPayload bool + for chunk := range stream.Chunks { + if chunk.Err != nil { + t.Fatalf("stream chunk error = %v", chunk.Err) + } + if len(chunk.Payload) > 0 { + sawPayload = true + } + } + if !sawPayload { + t.Fatalf("stream did not produce any payload chunks") + } +}