diff --git a/internal/runtime/executor/qwen_executor.go b/internal/runtime/executor/qwen_executor.go index ec02460e..146be5c1 100644 --- a/internal/runtime/executor/qwen_executor.go +++ b/internal/runtime/executor/qwen_executor.go @@ -153,17 +153,6 @@ func wrapQwenError(ctx context.Context, httpCode int, body []byte) (errCode int, return errCode, retryAfter } -func qwenShouldAttemptImmediateRefreshRetry(auth *cliproxyauth.Auth) bool { - if auth == nil || auth.Metadata == nil { - return false - } - if provider := strings.TrimSpace(auth.Provider); provider != "" && !strings.EqualFold(provider, "qwen") { - return false - } - refreshToken, _ := auth.Metadata["refresh_token"].(string) - return strings.TrimSpace(refreshToken) != "" -} - // ensureQwenSystemMessage ensures the request has a single system message at the beginning. // It always injects the default system prompt and merges any user-provided system messages // into the injected system message content to satisfy Qwen's strict message ordering rules. @@ -340,7 +329,6 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req return resp, err } - qwenImmediateRetryAttempted := false for { if errRate := checkQwenRateLimit(authID); errRate != nil { helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID)) @@ -398,26 +386,6 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b) helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) - if errCode == http.StatusTooManyRequests && !qwenImmediateRetryAttempted && qwenShouldAttemptImmediateRefreshRetry(auth) { - helps.LogWithRequestID(ctx).WithFields(log.Fields{ - "auth_id": redactAuthID(authID), - "model": req.Model, - }).Info("qwen 429 encountered, refreshing token for immediate retry") - - qwenImmediateRetryAttempted = true - refreshFn := e.refreshForImmediateRetry - if refreshFn == nil { - refreshFn = e.Refresh - } - refreshedAuth, errRefresh := refreshFn(ctx, auth) - if errRefresh != nil { - helps.LogWithRequestID(ctx).WithError(errRefresh).WithField("auth_id", redactAuthID(authID)).Warn("qwen 429 refresh failed; skipping immediate retry") - } else if refreshedAuth != nil { - auth = refreshedAuth - continue - } - } - err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter} return resp, err } @@ -488,7 +456,6 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut return nil, err } - qwenImmediateRetryAttempted := false for { if errRate := checkQwenRateLimit(authID); errRate != nil { helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID)) @@ -546,26 +513,6 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b) helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) - if errCode == http.StatusTooManyRequests && !qwenImmediateRetryAttempted && qwenShouldAttemptImmediateRefreshRetry(auth) { - helps.LogWithRequestID(ctx).WithFields(log.Fields{ - "auth_id": redactAuthID(authID), - "model": req.Model, - }).Info("qwen 429 encountered, refreshing token for immediate retry (stream)") - - qwenImmediateRetryAttempted = true - refreshFn := e.refreshForImmediateRetry - if refreshFn == nil { - refreshFn = e.Refresh - } - refreshedAuth, errRefresh := refreshFn(ctx, auth) - if errRefresh != nil { - helps.LogWithRequestID(ctx).WithError(errRefresh).WithField("auth_id", redactAuthID(authID)).Warn("qwen 429 refresh failed; skipping immediate retry (stream)") - } else if refreshedAuth != nil { - auth = refreshedAuth - continue - } - } - err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter} return nil, err } diff --git a/internal/runtime/executor/qwen_executor_test.go b/internal/runtime/executor/qwen_executor_test.go index 97b4757e..f6363f66 100644 --- a/internal/runtime/executor/qwen_executor_test.go +++ b/internal/runtime/executor/qwen_executor_test.go @@ -216,7 +216,7 @@ func TestQwenCreds_NormalizesResourceURL(t *testing.T) { } } -func TestQwenExecutorExecute_429RefreshAndRetry(t *testing.T) { +func TestQwenExecutorExecute_429DoesNotRefreshOrRetry(t *testing.T) { qwenRateLimiter.Lock() qwenRateLimiter.requests = make(map[string][]time.Time) qwenRateLimiter.Unlock() @@ -272,27 +272,31 @@ func TestQwenExecutorExecute_429RefreshAndRetry(t *testing.T) { } ctx := context.Background() - resp, err := exec.Execute(ctx, auth, cliproxyexecutor.Request{ + _, err := exec.Execute(ctx, auth, cliproxyexecutor.Request{ Model: "qwen-max", Payload: []byte(`{"model":"qwen-max","messages":[{"role":"user","content":"hi"}]}`), }, cliproxyexecutor.Options{ SourceFormat: sdktranslator.FromString("openai"), }) - if err != nil { - t.Fatalf("Execute() error = %v", err) + if err == nil { + t.Fatalf("Execute() expected error, got nil") } - if len(resp.Payload) == 0 { - t.Fatalf("Execute() payload is empty") + status, ok := err.(statusErr) + if !ok { + t.Fatalf("Execute() error type = %T, want statusErr", err) } - if atomic.LoadInt32(&calls) != 2 { - t.Fatalf("upstream calls = %d, want 2", atomic.LoadInt32(&calls)) + if status.StatusCode() != http.StatusTooManyRequests { + t.Fatalf("Execute() status code = %d, want %d", status.StatusCode(), http.StatusTooManyRequests) } - if atomic.LoadInt32(&refresherCalls) != 1 { - t.Fatalf("refresher calls = %d, want 1", atomic.LoadInt32(&refresherCalls)) + if atomic.LoadInt32(&calls) != 1 { + t.Fatalf("upstream calls = %d, want 1", atomic.LoadInt32(&calls)) + } + if atomic.LoadInt32(&refresherCalls) != 0 { + t.Fatalf("refresher calls = %d, want 0", atomic.LoadInt32(&refresherCalls)) } } -func TestQwenExecutorExecuteStream_429RefreshAndRetry(t *testing.T) { +func TestQwenExecutorExecuteStream_429DoesNotRefreshOrRetry(t *testing.T) { qwenRateLimiter.Lock() qwenRateLimiter.requests = make(map[string][]time.Time) qwenRateLimiter.Unlock() @@ -351,32 +355,26 @@ func TestQwenExecutorExecuteStream_429RefreshAndRetry(t *testing.T) { } ctx := context.Background() - stream, err := exec.ExecuteStream(ctx, auth, cliproxyexecutor.Request{ + _, err := exec.ExecuteStream(ctx, auth, cliproxyexecutor.Request{ Model: "qwen-max", Payload: []byte(`{"model":"qwen-max","stream":true,"messages":[{"role":"user","content":"hi"}]}`), }, cliproxyexecutor.Options{ SourceFormat: sdktranslator.FromString("openai"), }) - if err != nil { - t.Fatalf("ExecuteStream() error = %v", err) + if err == nil { + t.Fatalf("ExecuteStream() expected error, got nil") } - if atomic.LoadInt32(&calls) != 2 { - t.Fatalf("upstream calls = %d, want 2", atomic.LoadInt32(&calls)) + status, ok := err.(statusErr) + if !ok { + t.Fatalf("ExecuteStream() error type = %T, want statusErr", err) } - if atomic.LoadInt32(&refresherCalls) != 1 { - t.Fatalf("refresher calls = %d, want 1", atomic.LoadInt32(&refresherCalls)) + if status.StatusCode() != http.StatusTooManyRequests { + t.Fatalf("ExecuteStream() status code = %d, want %d", status.StatusCode(), http.StatusTooManyRequests) } - - var sawPayload bool - for chunk := range stream.Chunks { - if chunk.Err != nil { - t.Fatalf("stream chunk error = %v", chunk.Err) - } - if len(chunk.Payload) > 0 { - sawPayload = true - } + if atomic.LoadInt32(&calls) != 1 { + t.Fatalf("upstream calls = %d, want 1", atomic.LoadInt32(&calls)) } - if !sawPayload { - t.Fatalf("stream did not produce any payload chunks") + if atomic.LoadInt32(&refresherCalls) != 0 { + t.Fatalf("refresher calls = %d, want 0", atomic.LoadInt32(&refresherCalls)) } }