diff --git a/internal/runtime/executor/qwen_executor.go b/internal/runtime/executor/qwen_executor.go index 146be5c1..07ad0b3b 100644 --- a/internal/runtime/executor/qwen_executor.go +++ b/internal/runtime/executor/qwen_executor.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "strconv" "strings" "sync" "time" @@ -153,6 +154,40 @@ func wrapQwenError(ctx context.Context, httpCode int, body []byte) (errCode int, return errCode, retryAfter } +func qwenDisableCooling(cfg *config.Config, auth *cliproxyauth.Auth) bool { + if auth != nil { + if override, ok := auth.DisableCoolingOverride(); ok { + return override + } + } + if cfg == nil { + return false + } + return cfg.DisableCooling +} + +func parseRetryAfterHeader(header http.Header, now time.Time) *time.Duration { + raw := strings.TrimSpace(header.Get("Retry-After")) + if raw == "" { + return nil + } + if seconds, err := strconv.Atoi(raw); err == nil { + if seconds <= 0 { + return nil + } + d := time.Duration(seconds) * time.Second + return &d + } + if at, err := http.ParseTime(raw); err == nil { + if !at.After(now) { + return nil + } + d := at.Sub(now) + return &d + } + return nil +} + // ensureQwenSystemMessage ensures the request has a single system message at the beginning. // It always injects the default system prompt and merges any user-provided system messages // into the injected system message content to satisfy Qwen's strict message ordering rules. @@ -384,6 +419,13 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req } errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b) + if errCode == http.StatusTooManyRequests && retryAfter == nil { + retryAfter = parseRetryAfterHeader(httpResp.Header, time.Now()) + } + if errCode == http.StatusTooManyRequests && retryAfter == nil && qwenDisableCooling(e.cfg, auth) && isQwenQuotaError(b) { + defaultRetryAfter := time.Second + retryAfter = &defaultRetryAfter + } helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter} @@ -511,6 +553,13 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut } errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b) + if errCode == http.StatusTooManyRequests && retryAfter == nil { + retryAfter = parseRetryAfterHeader(httpResp.Header, time.Now()) + } + if errCode == http.StatusTooManyRequests && retryAfter == nil && qwenDisableCooling(e.cfg, auth) && isQwenQuotaError(b) { + defaultRetryAfter := time.Second + retryAfter = &defaultRetryAfter + } helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter} diff --git a/internal/runtime/executor/qwen_executor_test.go b/internal/runtime/executor/qwen_executor_test.go index f6363f66..f19cc8ca 100644 --- a/internal/runtime/executor/qwen_executor_test.go +++ b/internal/runtime/executor/qwen_executor_test.go @@ -378,3 +378,237 @@ func TestQwenExecutorExecuteStream_429DoesNotRefreshOrRetry(t *testing.T) { t.Fatalf("refresher calls = %d, want 0", atomic.LoadInt32(&refresherCalls)) } } + +func TestQwenExecutorExecute_429RetryAfterHeaderPropagatesToStatusErr(t *testing.T) { + qwenRateLimiter.Lock() + qwenRateLimiter.requests = make(map[string][]time.Time) + qwenRateLimiter.Unlock() + + var calls int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&calls, 1) + if r.URL.Path != "/v1/chat/completions" { + w.WriteHeader(http.StatusNotFound) + return + } + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Retry-After", "2") + w.WriteHeader(http.StatusTooManyRequests) + _, _ = w.Write([]byte(`{"error":{"code":"rate_limit_exceeded","message":"rate limited","type":"rate_limit_exceeded"}}`)) + })) + defer srv.Close() + + exec := NewQwenExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + ID: "auth-test", + Provider: "qwen", + Attributes: map[string]string{ + "base_url": srv.URL + "/v1", + }, + Metadata: map[string]any{ + "access_token": "test-token", + }, + } + ctx := context.Background() + + _, err := exec.Execute(ctx, auth, cliproxyexecutor.Request{ + Model: "qwen-max", + Payload: []byte(`{"model":"qwen-max","messages":[{"role":"user","content":"hi"}]}`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("openai"), + }) + if err == nil { + t.Fatalf("Execute() expected error, got nil") + } + status, ok := err.(statusErr) + if !ok { + t.Fatalf("Execute() error type = %T, want statusErr", err) + } + if status.StatusCode() != http.StatusTooManyRequests { + t.Fatalf("Execute() status code = %d, want %d", status.StatusCode(), http.StatusTooManyRequests) + } + if status.RetryAfter() == nil { + t.Fatalf("Execute() RetryAfter is nil, want non-nil") + } + if got := *status.RetryAfter(); got != 2*time.Second { + t.Fatalf("Execute() RetryAfter = %v, want %v", got, 2*time.Second) + } + if atomic.LoadInt32(&calls) != 1 { + t.Fatalf("upstream calls = %d, want 1", atomic.LoadInt32(&calls)) + } +} + +func TestQwenExecutorExecuteStream_429RetryAfterHeaderPropagatesToStatusErr(t *testing.T) { + qwenRateLimiter.Lock() + qwenRateLimiter.requests = make(map[string][]time.Time) + qwenRateLimiter.Unlock() + + var calls int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&calls, 1) + if r.URL.Path != "/v1/chat/completions" { + w.WriteHeader(http.StatusNotFound) + return + } + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Retry-After", "2") + w.WriteHeader(http.StatusTooManyRequests) + _, _ = w.Write([]byte(`{"error":{"code":"rate_limit_exceeded","message":"rate limited","type":"rate_limit_exceeded"}}`)) + })) + defer srv.Close() + + exec := NewQwenExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + ID: "auth-test", + Provider: "qwen", + Attributes: map[string]string{ + "base_url": srv.URL + "/v1", + }, + Metadata: map[string]any{ + "access_token": "test-token", + }, + } + ctx := context.Background() + + _, err := exec.ExecuteStream(ctx, auth, cliproxyexecutor.Request{ + Model: "qwen-max", + Payload: []byte(`{"model":"qwen-max","stream":true,"messages":[{"role":"user","content":"hi"}]}`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("openai"), + }) + if err == nil { + t.Fatalf("ExecuteStream() expected error, got nil") + } + status, ok := err.(statusErr) + if !ok { + t.Fatalf("ExecuteStream() error type = %T, want statusErr", err) + } + if status.StatusCode() != http.StatusTooManyRequests { + t.Fatalf("ExecuteStream() status code = %d, want %d", status.StatusCode(), http.StatusTooManyRequests) + } + if status.RetryAfter() == nil { + t.Fatalf("ExecuteStream() RetryAfter is nil, want non-nil") + } + if got := *status.RetryAfter(); got != 2*time.Second { + t.Fatalf("ExecuteStream() RetryAfter = %v, want %v", got, 2*time.Second) + } + if atomic.LoadInt32(&calls) != 1 { + t.Fatalf("upstream calls = %d, want 1", atomic.LoadInt32(&calls)) + } +} + +func TestQwenExecutorExecute_429QuotaExhausted_DisableCoolingSetsDefaultRetryAfter(t *testing.T) { + qwenRateLimiter.Lock() + qwenRateLimiter.requests = make(map[string][]time.Time) + qwenRateLimiter.Unlock() + + var calls int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&calls, 1) + if r.URL.Path != "/v1/chat/completions" { + w.WriteHeader(http.StatusNotFound) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusTooManyRequests) + _, _ = w.Write([]byte(`{"error":{"code":"quota_exceeded","message":"quota exceeded","type":"quota_exceeded"}}`)) + })) + defer srv.Close() + + exec := NewQwenExecutor(&config.Config{DisableCooling: true}) + auth := &cliproxyauth.Auth{ + ID: "auth-test", + Provider: "qwen", + Attributes: map[string]string{ + "base_url": srv.URL + "/v1", + }, + Metadata: map[string]any{ + "access_token": "test-token", + }, + } + ctx := context.Background() + + _, err := exec.Execute(ctx, auth, cliproxyexecutor.Request{ + Model: "qwen-max", + Payload: []byte(`{"model":"qwen-max","messages":[{"role":"user","content":"hi"}]}`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("openai"), + }) + if err == nil { + t.Fatalf("Execute() expected error, got nil") + } + status, ok := err.(statusErr) + if !ok { + t.Fatalf("Execute() error type = %T, want statusErr", err) + } + if status.StatusCode() != http.StatusTooManyRequests { + t.Fatalf("Execute() status code = %d, want %d", status.StatusCode(), http.StatusTooManyRequests) + } + if status.RetryAfter() == nil { + t.Fatalf("Execute() RetryAfter is nil, want non-nil") + } + if got := *status.RetryAfter(); got != time.Second { + t.Fatalf("Execute() RetryAfter = %v, want %v", got, time.Second) + } + if atomic.LoadInt32(&calls) != 1 { + t.Fatalf("upstream calls = %d, want 1", atomic.LoadInt32(&calls)) + } +} + +func TestQwenExecutorExecuteStream_429QuotaExhausted_DisableCoolingSetsDefaultRetryAfter(t *testing.T) { + qwenRateLimiter.Lock() + qwenRateLimiter.requests = make(map[string][]time.Time) + qwenRateLimiter.Unlock() + + var calls int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&calls, 1) + if r.URL.Path != "/v1/chat/completions" { + w.WriteHeader(http.StatusNotFound) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusTooManyRequests) + _, _ = w.Write([]byte(`{"error":{"code":"quota_exceeded","message":"quota exceeded","type":"quota_exceeded"}}`)) + })) + defer srv.Close() + + exec := NewQwenExecutor(&config.Config{DisableCooling: true}) + auth := &cliproxyauth.Auth{ + ID: "auth-test", + Provider: "qwen", + Attributes: map[string]string{ + "base_url": srv.URL + "/v1", + }, + Metadata: map[string]any{ + "access_token": "test-token", + }, + } + ctx := context.Background() + + _, err := exec.ExecuteStream(ctx, auth, cliproxyexecutor.Request{ + Model: "qwen-max", + Payload: []byte(`{"model":"qwen-max","stream":true,"messages":[{"role":"user","content":"hi"}]}`), + }, cliproxyexecutor.Options{ + SourceFormat: sdktranslator.FromString("openai"), + }) + if err == nil { + t.Fatalf("ExecuteStream() expected error, got nil") + } + status, ok := err.(statusErr) + if !ok { + t.Fatalf("ExecuteStream() error type = %T, want statusErr", err) + } + if status.StatusCode() != http.StatusTooManyRequests { + t.Fatalf("ExecuteStream() status code = %d, want %d", status.StatusCode(), http.StatusTooManyRequests) + } + if status.RetryAfter() == nil { + t.Fatalf("ExecuteStream() RetryAfter is nil, want non-nil") + } + if got := *status.RetryAfter(); got != time.Second { + t.Fatalf("ExecuteStream() RetryAfter = %v, want %v", got, time.Second) + } + if atomic.LoadInt32(&calls) != 1 { + t.Fatalf("upstream calls = %d, want 1", atomic.LoadInt32(&calls)) + } +}