mirror of
https://github.com/router-for-me/CLIProxyAPIPlus.git
synced 2026-04-12 09:14:15 +00:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1d8e68ad15 | ||
|
|
0ab1f5412f | ||
|
|
9ded75d335 | ||
|
|
f135fdf7fc | ||
|
|
828df80088 | ||
|
|
c585caa0ce | ||
|
|
5bb69fa4ab |
@@ -7,6 +7,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -153,15 +154,38 @@ func wrapQwenError(ctx context.Context, httpCode int, body []byte) (errCode int,
|
|||||||
return errCode, retryAfter
|
return errCode, retryAfter
|
||||||
}
|
}
|
||||||
|
|
||||||
func qwenShouldAttemptImmediateRefreshRetry(auth *cliproxyauth.Auth) bool {
|
func qwenDisableCooling(cfg *config.Config, auth *cliproxyauth.Auth) bool {
|
||||||
if auth == nil || auth.Metadata == nil {
|
if auth != nil {
|
||||||
|
if override, ok := auth.DisableCoolingOverride(); ok {
|
||||||
|
return override
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if cfg == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if provider := strings.TrimSpace(auth.Provider); provider != "" && !strings.EqualFold(provider, "qwen") {
|
return cfg.DisableCooling
|
||||||
return false
|
}
|
||||||
|
|
||||||
|
func parseRetryAfterHeader(header http.Header, now time.Time) *time.Duration {
|
||||||
|
raw := strings.TrimSpace(header.Get("Retry-After"))
|
||||||
|
if raw == "" {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
refreshToken, _ := auth.Metadata["refresh_token"].(string)
|
if seconds, err := strconv.Atoi(raw); err == nil {
|
||||||
return strings.TrimSpace(refreshToken) != ""
|
if seconds <= 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
d := time.Duration(seconds) * time.Second
|
||||||
|
return &d
|
||||||
|
}
|
||||||
|
if at, err := http.ParseTime(raw); err == nil {
|
||||||
|
if !at.After(now) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
d := at.Sub(now)
|
||||||
|
return &d
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensureQwenSystemMessage ensures the request has a single system message at the beginning.
|
// ensureQwenSystemMessage ensures the request has a single system message at the beginning.
|
||||||
@@ -340,7 +364,6 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
|
|||||||
return resp, err
|
return resp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
qwenImmediateRetryAttempted := false
|
|
||||||
for {
|
for {
|
||||||
if errRate := checkQwenRateLimit(authID); errRate != nil {
|
if errRate := checkQwenRateLimit(authID); errRate != nil {
|
||||||
helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID))
|
helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID))
|
||||||
@@ -396,27 +419,14 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
|
|||||||
}
|
}
|
||||||
|
|
||||||
errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b)
|
errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b)
|
||||||
helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
if errCode == http.StatusTooManyRequests && retryAfter == nil {
|
||||||
|
retryAfter = parseRetryAfterHeader(httpResp.Header, time.Now())
|
||||||
if errCode == http.StatusTooManyRequests && !qwenImmediateRetryAttempted && qwenShouldAttemptImmediateRefreshRetry(auth) {
|
|
||||||
helps.LogWithRequestID(ctx).WithFields(log.Fields{
|
|
||||||
"auth_id": redactAuthID(authID),
|
|
||||||
"model": req.Model,
|
|
||||||
}).Info("qwen 429 encountered, refreshing token for immediate retry")
|
|
||||||
|
|
||||||
qwenImmediateRetryAttempted = true
|
|
||||||
refreshFn := e.refreshForImmediateRetry
|
|
||||||
if refreshFn == nil {
|
|
||||||
refreshFn = e.Refresh
|
|
||||||
}
|
|
||||||
refreshedAuth, errRefresh := refreshFn(ctx, auth)
|
|
||||||
if errRefresh != nil {
|
|
||||||
helps.LogWithRequestID(ctx).WithError(errRefresh).WithField("auth_id", redactAuthID(authID)).Warn("qwen 429 refresh failed; skipping immediate retry")
|
|
||||||
} else if refreshedAuth != nil {
|
|
||||||
auth = refreshedAuth
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
if errCode == http.StatusTooManyRequests && retryAfter == nil && qwenDisableCooling(e.cfg, auth) && isQwenQuotaError(b) {
|
||||||
|
defaultRetryAfter := time.Second
|
||||||
|
retryAfter = &defaultRetryAfter
|
||||||
|
}
|
||||||
|
helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||||
|
|
||||||
err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter}
|
err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter}
|
||||||
return resp, err
|
return resp, err
|
||||||
@@ -488,7 +498,6 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
qwenImmediateRetryAttempted := false
|
|
||||||
for {
|
for {
|
||||||
if errRate := checkQwenRateLimit(authID); errRate != nil {
|
if errRate := checkQwenRateLimit(authID); errRate != nil {
|
||||||
helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID))
|
helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID))
|
||||||
@@ -544,27 +553,14 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
|
|||||||
}
|
}
|
||||||
|
|
||||||
errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b)
|
errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b)
|
||||||
helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
if errCode == http.StatusTooManyRequests && retryAfter == nil {
|
||||||
|
retryAfter = parseRetryAfterHeader(httpResp.Header, time.Now())
|
||||||
if errCode == http.StatusTooManyRequests && !qwenImmediateRetryAttempted && qwenShouldAttemptImmediateRefreshRetry(auth) {
|
|
||||||
helps.LogWithRequestID(ctx).WithFields(log.Fields{
|
|
||||||
"auth_id": redactAuthID(authID),
|
|
||||||
"model": req.Model,
|
|
||||||
}).Info("qwen 429 encountered, refreshing token for immediate retry (stream)")
|
|
||||||
|
|
||||||
qwenImmediateRetryAttempted = true
|
|
||||||
refreshFn := e.refreshForImmediateRetry
|
|
||||||
if refreshFn == nil {
|
|
||||||
refreshFn = e.Refresh
|
|
||||||
}
|
|
||||||
refreshedAuth, errRefresh := refreshFn(ctx, auth)
|
|
||||||
if errRefresh != nil {
|
|
||||||
helps.LogWithRequestID(ctx).WithError(errRefresh).WithField("auth_id", redactAuthID(authID)).Warn("qwen 429 refresh failed; skipping immediate retry (stream)")
|
|
||||||
} else if refreshedAuth != nil {
|
|
||||||
auth = refreshedAuth
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
if errCode == http.StatusTooManyRequests && retryAfter == nil && qwenDisableCooling(e.cfg, auth) && isQwenQuotaError(b) {
|
||||||
|
defaultRetryAfter := time.Second
|
||||||
|
retryAfter = &defaultRetryAfter
|
||||||
|
}
|
||||||
|
helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||||
|
|
||||||
err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter}
|
err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter}
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@@ -216,7 +216,7 @@ func TestQwenCreds_NormalizesResourceURL(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestQwenExecutorExecute_429RefreshAndRetry(t *testing.T) {
|
func TestQwenExecutorExecute_429DoesNotRefreshOrRetry(t *testing.T) {
|
||||||
qwenRateLimiter.Lock()
|
qwenRateLimiter.Lock()
|
||||||
qwenRateLimiter.requests = make(map[string][]time.Time)
|
qwenRateLimiter.requests = make(map[string][]time.Time)
|
||||||
qwenRateLimiter.Unlock()
|
qwenRateLimiter.Unlock()
|
||||||
@@ -272,27 +272,31 @@ func TestQwenExecutorExecute_429RefreshAndRetry(t *testing.T) {
|
|||||||
}
|
}
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
resp, err := exec.Execute(ctx, auth, cliproxyexecutor.Request{
|
_, err := exec.Execute(ctx, auth, cliproxyexecutor.Request{
|
||||||
Model: "qwen-max",
|
Model: "qwen-max",
|
||||||
Payload: []byte(`{"model":"qwen-max","messages":[{"role":"user","content":"hi"}]}`),
|
Payload: []byte(`{"model":"qwen-max","messages":[{"role":"user","content":"hi"}]}`),
|
||||||
}, cliproxyexecutor.Options{
|
}, cliproxyexecutor.Options{
|
||||||
SourceFormat: sdktranslator.FromString("openai"),
|
SourceFormat: sdktranslator.FromString("openai"),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err == nil {
|
||||||
t.Fatalf("Execute() error = %v", err)
|
t.Fatalf("Execute() expected error, got nil")
|
||||||
}
|
}
|
||||||
if len(resp.Payload) == 0 {
|
status, ok := err.(statusErr)
|
||||||
t.Fatalf("Execute() payload is empty")
|
if !ok {
|
||||||
|
t.Fatalf("Execute() error type = %T, want statusErr", err)
|
||||||
}
|
}
|
||||||
if atomic.LoadInt32(&calls) != 2 {
|
if status.StatusCode() != http.StatusTooManyRequests {
|
||||||
t.Fatalf("upstream calls = %d, want 2", atomic.LoadInt32(&calls))
|
t.Fatalf("Execute() status code = %d, want %d", status.StatusCode(), http.StatusTooManyRequests)
|
||||||
}
|
}
|
||||||
if atomic.LoadInt32(&refresherCalls) != 1 {
|
if atomic.LoadInt32(&calls) != 1 {
|
||||||
t.Fatalf("refresher calls = %d, want 1", atomic.LoadInt32(&refresherCalls))
|
t.Fatalf("upstream calls = %d, want 1", atomic.LoadInt32(&calls))
|
||||||
|
}
|
||||||
|
if atomic.LoadInt32(&refresherCalls) != 0 {
|
||||||
|
t.Fatalf("refresher calls = %d, want 0", atomic.LoadInt32(&refresherCalls))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestQwenExecutorExecuteStream_429RefreshAndRetry(t *testing.T) {
|
func TestQwenExecutorExecuteStream_429DoesNotRefreshOrRetry(t *testing.T) {
|
||||||
qwenRateLimiter.Lock()
|
qwenRateLimiter.Lock()
|
||||||
qwenRateLimiter.requests = make(map[string][]time.Time)
|
qwenRateLimiter.requests = make(map[string][]time.Time)
|
||||||
qwenRateLimiter.Unlock()
|
qwenRateLimiter.Unlock()
|
||||||
@@ -351,32 +355,260 @@ func TestQwenExecutorExecuteStream_429RefreshAndRetry(t *testing.T) {
|
|||||||
}
|
}
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
stream, err := exec.ExecuteStream(ctx, auth, cliproxyexecutor.Request{
|
_, err := exec.ExecuteStream(ctx, auth, cliproxyexecutor.Request{
|
||||||
Model: "qwen-max",
|
Model: "qwen-max",
|
||||||
Payload: []byte(`{"model":"qwen-max","stream":true,"messages":[{"role":"user","content":"hi"}]}`),
|
Payload: []byte(`{"model":"qwen-max","stream":true,"messages":[{"role":"user","content":"hi"}]}`),
|
||||||
}, cliproxyexecutor.Options{
|
}, cliproxyexecutor.Options{
|
||||||
SourceFormat: sdktranslator.FromString("openai"),
|
SourceFormat: sdktranslator.FromString("openai"),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err == nil {
|
||||||
t.Fatalf("ExecuteStream() error = %v", err)
|
t.Fatalf("ExecuteStream() expected error, got nil")
|
||||||
}
|
}
|
||||||
if atomic.LoadInt32(&calls) != 2 {
|
status, ok := err.(statusErr)
|
||||||
t.Fatalf("upstream calls = %d, want 2", atomic.LoadInt32(&calls))
|
if !ok {
|
||||||
|
t.Fatalf("ExecuteStream() error type = %T, want statusErr", err)
|
||||||
}
|
}
|
||||||
if atomic.LoadInt32(&refresherCalls) != 1 {
|
if status.StatusCode() != http.StatusTooManyRequests {
|
||||||
t.Fatalf("refresher calls = %d, want 1", atomic.LoadInt32(&refresherCalls))
|
t.Fatalf("ExecuteStream() status code = %d, want %d", status.StatusCode(), http.StatusTooManyRequests)
|
||||||
}
|
}
|
||||||
|
if atomic.LoadInt32(&calls) != 1 {
|
||||||
var sawPayload bool
|
t.Fatalf("upstream calls = %d, want 1", atomic.LoadInt32(&calls))
|
||||||
for chunk := range stream.Chunks {
|
|
||||||
if chunk.Err != nil {
|
|
||||||
t.Fatalf("stream chunk error = %v", chunk.Err)
|
|
||||||
}
|
|
||||||
if len(chunk.Payload) > 0 {
|
|
||||||
sawPayload = true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if !sawPayload {
|
if atomic.LoadInt32(&refresherCalls) != 0 {
|
||||||
t.Fatalf("stream did not produce any payload chunks")
|
t.Fatalf("refresher calls = %d, want 0", atomic.LoadInt32(&refresherCalls))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestQwenExecutorExecute_429RetryAfterHeaderPropagatesToStatusErr(t *testing.T) {
|
||||||
|
qwenRateLimiter.Lock()
|
||||||
|
qwenRateLimiter.requests = make(map[string][]time.Time)
|
||||||
|
qwenRateLimiter.Unlock()
|
||||||
|
|
||||||
|
var calls int32
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
atomic.AddInt32(&calls, 1)
|
||||||
|
if r.URL.Path != "/v1/chat/completions" {
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.Header().Set("Retry-After", "2")
|
||||||
|
w.WriteHeader(http.StatusTooManyRequests)
|
||||||
|
_, _ = w.Write([]byte(`{"error":{"code":"rate_limit_exceeded","message":"rate limited","type":"rate_limit_exceeded"}}`))
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
exec := NewQwenExecutor(&config.Config{})
|
||||||
|
auth := &cliproxyauth.Auth{
|
||||||
|
ID: "auth-test",
|
||||||
|
Provider: "qwen",
|
||||||
|
Attributes: map[string]string{
|
||||||
|
"base_url": srv.URL + "/v1",
|
||||||
|
},
|
||||||
|
Metadata: map[string]any{
|
||||||
|
"access_token": "test-token",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
_, err := exec.Execute(ctx, auth, cliproxyexecutor.Request{
|
||||||
|
Model: "qwen-max",
|
||||||
|
Payload: []byte(`{"model":"qwen-max","messages":[{"role":"user","content":"hi"}]}`),
|
||||||
|
}, cliproxyexecutor.Options{
|
||||||
|
SourceFormat: sdktranslator.FromString("openai"),
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Execute() expected error, got nil")
|
||||||
|
}
|
||||||
|
status, ok := err.(statusErr)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Execute() error type = %T, want statusErr", err)
|
||||||
|
}
|
||||||
|
if status.StatusCode() != http.StatusTooManyRequests {
|
||||||
|
t.Fatalf("Execute() status code = %d, want %d", status.StatusCode(), http.StatusTooManyRequests)
|
||||||
|
}
|
||||||
|
if status.RetryAfter() == nil {
|
||||||
|
t.Fatalf("Execute() RetryAfter is nil, want non-nil")
|
||||||
|
}
|
||||||
|
if got := *status.RetryAfter(); got != 2*time.Second {
|
||||||
|
t.Fatalf("Execute() RetryAfter = %v, want %v", got, 2*time.Second)
|
||||||
|
}
|
||||||
|
if atomic.LoadInt32(&calls) != 1 {
|
||||||
|
t.Fatalf("upstream calls = %d, want 1", atomic.LoadInt32(&calls))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestQwenExecutorExecuteStream_429RetryAfterHeaderPropagatesToStatusErr(t *testing.T) {
|
||||||
|
qwenRateLimiter.Lock()
|
||||||
|
qwenRateLimiter.requests = make(map[string][]time.Time)
|
||||||
|
qwenRateLimiter.Unlock()
|
||||||
|
|
||||||
|
var calls int32
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
atomic.AddInt32(&calls, 1)
|
||||||
|
if r.URL.Path != "/v1/chat/completions" {
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.Header().Set("Retry-After", "2")
|
||||||
|
w.WriteHeader(http.StatusTooManyRequests)
|
||||||
|
_, _ = w.Write([]byte(`{"error":{"code":"rate_limit_exceeded","message":"rate limited","type":"rate_limit_exceeded"}}`))
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
exec := NewQwenExecutor(&config.Config{})
|
||||||
|
auth := &cliproxyauth.Auth{
|
||||||
|
ID: "auth-test",
|
||||||
|
Provider: "qwen",
|
||||||
|
Attributes: map[string]string{
|
||||||
|
"base_url": srv.URL + "/v1",
|
||||||
|
},
|
||||||
|
Metadata: map[string]any{
|
||||||
|
"access_token": "test-token",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
_, err := exec.ExecuteStream(ctx, auth, cliproxyexecutor.Request{
|
||||||
|
Model: "qwen-max",
|
||||||
|
Payload: []byte(`{"model":"qwen-max","stream":true,"messages":[{"role":"user","content":"hi"}]}`),
|
||||||
|
}, cliproxyexecutor.Options{
|
||||||
|
SourceFormat: sdktranslator.FromString("openai"),
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("ExecuteStream() expected error, got nil")
|
||||||
|
}
|
||||||
|
status, ok := err.(statusErr)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("ExecuteStream() error type = %T, want statusErr", err)
|
||||||
|
}
|
||||||
|
if status.StatusCode() != http.StatusTooManyRequests {
|
||||||
|
t.Fatalf("ExecuteStream() status code = %d, want %d", status.StatusCode(), http.StatusTooManyRequests)
|
||||||
|
}
|
||||||
|
if status.RetryAfter() == nil {
|
||||||
|
t.Fatalf("ExecuteStream() RetryAfter is nil, want non-nil")
|
||||||
|
}
|
||||||
|
if got := *status.RetryAfter(); got != 2*time.Second {
|
||||||
|
t.Fatalf("ExecuteStream() RetryAfter = %v, want %v", got, 2*time.Second)
|
||||||
|
}
|
||||||
|
if atomic.LoadInt32(&calls) != 1 {
|
||||||
|
t.Fatalf("upstream calls = %d, want 1", atomic.LoadInt32(&calls))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestQwenExecutorExecute_429QuotaExhausted_DisableCoolingSetsDefaultRetryAfter(t *testing.T) {
|
||||||
|
qwenRateLimiter.Lock()
|
||||||
|
qwenRateLimiter.requests = make(map[string][]time.Time)
|
||||||
|
qwenRateLimiter.Unlock()
|
||||||
|
|
||||||
|
var calls int32
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
atomic.AddInt32(&calls, 1)
|
||||||
|
if r.URL.Path != "/v1/chat/completions" {
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.WriteHeader(http.StatusTooManyRequests)
|
||||||
|
_, _ = w.Write([]byte(`{"error":{"code":"quota_exceeded","message":"quota exceeded","type":"quota_exceeded"}}`))
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
exec := NewQwenExecutor(&config.Config{DisableCooling: true})
|
||||||
|
auth := &cliproxyauth.Auth{
|
||||||
|
ID: "auth-test",
|
||||||
|
Provider: "qwen",
|
||||||
|
Attributes: map[string]string{
|
||||||
|
"base_url": srv.URL + "/v1",
|
||||||
|
},
|
||||||
|
Metadata: map[string]any{
|
||||||
|
"access_token": "test-token",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
_, err := exec.Execute(ctx, auth, cliproxyexecutor.Request{
|
||||||
|
Model: "qwen-max",
|
||||||
|
Payload: []byte(`{"model":"qwen-max","messages":[{"role":"user","content":"hi"}]}`),
|
||||||
|
}, cliproxyexecutor.Options{
|
||||||
|
SourceFormat: sdktranslator.FromString("openai"),
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("Execute() expected error, got nil")
|
||||||
|
}
|
||||||
|
status, ok := err.(statusErr)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("Execute() error type = %T, want statusErr", err)
|
||||||
|
}
|
||||||
|
if status.StatusCode() != http.StatusTooManyRequests {
|
||||||
|
t.Fatalf("Execute() status code = %d, want %d", status.StatusCode(), http.StatusTooManyRequests)
|
||||||
|
}
|
||||||
|
if status.RetryAfter() == nil {
|
||||||
|
t.Fatalf("Execute() RetryAfter is nil, want non-nil")
|
||||||
|
}
|
||||||
|
if got := *status.RetryAfter(); got != time.Second {
|
||||||
|
t.Fatalf("Execute() RetryAfter = %v, want %v", got, time.Second)
|
||||||
|
}
|
||||||
|
if atomic.LoadInt32(&calls) != 1 {
|
||||||
|
t.Fatalf("upstream calls = %d, want 1", atomic.LoadInt32(&calls))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestQwenExecutorExecuteStream_429QuotaExhausted_DisableCoolingSetsDefaultRetryAfter(t *testing.T) {
|
||||||
|
qwenRateLimiter.Lock()
|
||||||
|
qwenRateLimiter.requests = make(map[string][]time.Time)
|
||||||
|
qwenRateLimiter.Unlock()
|
||||||
|
|
||||||
|
var calls int32
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
atomic.AddInt32(&calls, 1)
|
||||||
|
if r.URL.Path != "/v1/chat/completions" {
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.WriteHeader(http.StatusTooManyRequests)
|
||||||
|
_, _ = w.Write([]byte(`{"error":{"code":"quota_exceeded","message":"quota exceeded","type":"quota_exceeded"}}`))
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
exec := NewQwenExecutor(&config.Config{DisableCooling: true})
|
||||||
|
auth := &cliproxyauth.Auth{
|
||||||
|
ID: "auth-test",
|
||||||
|
Provider: "qwen",
|
||||||
|
Attributes: map[string]string{
|
||||||
|
"base_url": srv.URL + "/v1",
|
||||||
|
},
|
||||||
|
Metadata: map[string]any{
|
||||||
|
"access_token": "test-token",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
_, err := exec.ExecuteStream(ctx, auth, cliproxyexecutor.Request{
|
||||||
|
Model: "qwen-max",
|
||||||
|
Payload: []byte(`{"model":"qwen-max","stream":true,"messages":[{"role":"user","content":"hi"}]}`),
|
||||||
|
}, cliproxyexecutor.Options{
|
||||||
|
SourceFormat: sdktranslator.FromString("openai"),
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("ExecuteStream() expected error, got nil")
|
||||||
|
}
|
||||||
|
status, ok := err.(statusErr)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("ExecuteStream() error type = %T, want statusErr", err)
|
||||||
|
}
|
||||||
|
if status.StatusCode() != http.StatusTooManyRequests {
|
||||||
|
t.Fatalf("ExecuteStream() status code = %d, want %d", status.StatusCode(), http.StatusTooManyRequests)
|
||||||
|
}
|
||||||
|
if status.RetryAfter() == nil {
|
||||||
|
t.Fatalf("ExecuteStream() RetryAfter is nil, want non-nil")
|
||||||
|
}
|
||||||
|
if got := *status.RetryAfter(); got != time.Second {
|
||||||
|
t.Fatalf("ExecuteStream() RetryAfter = %v, want %v", got, time.Second)
|
||||||
|
}
|
||||||
|
if atomic.LoadInt32(&calls) != 1 {
|
||||||
|
t.Fatalf("upstream calls = %d, want 1", atomic.LoadInt32(&calls))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user