mirror of
https://github.com/router-for-me/CLIProxyAPIPlus.git
synced 2026-03-09 15:25:17 +00:00
Merge remote-tracking branch 'upstream/dev' into dev
# Conflicts: # internal/runtime/executor/antigravity_executor.go
This commit is contained in:
@@ -55,8 +55,78 @@ const (
|
||||
var (
|
||||
randSource = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
randSourceMutex sync.Mutex
|
||||
// antigravityPrimaryModelsCache keeps the latest non-empty model list fetched
|
||||
// from any antigravity auth. Empty fetches never overwrite this cache.
|
||||
antigravityPrimaryModelsCache struct {
|
||||
mu sync.RWMutex
|
||||
models []*registry.ModelInfo
|
||||
}
|
||||
)
|
||||
|
||||
func cloneAntigravityModels(models []*registry.ModelInfo) []*registry.ModelInfo {
|
||||
if len(models) == 0 {
|
||||
return nil
|
||||
}
|
||||
out := make([]*registry.ModelInfo, 0, len(models))
|
||||
for _, model := range models {
|
||||
if model == nil || strings.TrimSpace(model.ID) == "" {
|
||||
continue
|
||||
}
|
||||
out = append(out, cloneAntigravityModelInfo(model))
|
||||
}
|
||||
if len(out) == 0 {
|
||||
return nil
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func cloneAntigravityModelInfo(model *registry.ModelInfo) *registry.ModelInfo {
|
||||
if model == nil {
|
||||
return nil
|
||||
}
|
||||
clone := *model
|
||||
if len(model.SupportedGenerationMethods) > 0 {
|
||||
clone.SupportedGenerationMethods = append([]string(nil), model.SupportedGenerationMethods...)
|
||||
}
|
||||
if len(model.SupportedParameters) > 0 {
|
||||
clone.SupportedParameters = append([]string(nil), model.SupportedParameters...)
|
||||
}
|
||||
if model.Thinking != nil {
|
||||
thinkingClone := *model.Thinking
|
||||
if len(model.Thinking.Levels) > 0 {
|
||||
thinkingClone.Levels = append([]string(nil), model.Thinking.Levels...)
|
||||
}
|
||||
clone.Thinking = &thinkingClone
|
||||
}
|
||||
return &clone
|
||||
}
|
||||
|
||||
func storeAntigravityPrimaryModels(models []*registry.ModelInfo) bool {
|
||||
cloned := cloneAntigravityModels(models)
|
||||
if len(cloned) == 0 {
|
||||
return false
|
||||
}
|
||||
antigravityPrimaryModelsCache.mu.Lock()
|
||||
antigravityPrimaryModelsCache.models = cloned
|
||||
antigravityPrimaryModelsCache.mu.Unlock()
|
||||
return true
|
||||
}
|
||||
|
||||
func loadAntigravityPrimaryModels() []*registry.ModelInfo {
|
||||
antigravityPrimaryModelsCache.mu.RLock()
|
||||
cloned := cloneAntigravityModels(antigravityPrimaryModelsCache.models)
|
||||
antigravityPrimaryModelsCache.mu.RUnlock()
|
||||
return cloned
|
||||
}
|
||||
|
||||
func fallbackAntigravityPrimaryModels() []*registry.ModelInfo {
|
||||
models := loadAntigravityPrimaryModels()
|
||||
if len(models) > 0 {
|
||||
log.Debugf("antigravity executor: using cached primary model list (%d models)", len(models))
|
||||
}
|
||||
return models
|
||||
}
|
||||
|
||||
// AntigravityExecutor proxies requests to the antigravity upstream.
|
||||
type AntigravityExecutor struct {
|
||||
cfg *config.Config
|
||||
@@ -1072,7 +1142,7 @@ func FetchAntigravityModels(ctx context.Context, auth *cliproxyauth.Auth, cfg *c
|
||||
exec := &AntigravityExecutor{cfg: cfg}
|
||||
token, updatedAuth, errToken := exec.ensureAccessToken(ctx, auth)
|
||||
if errToken != nil || token == "" {
|
||||
return nil
|
||||
return fallbackAntigravityPrimaryModels()
|
||||
}
|
||||
if updatedAuth != nil {
|
||||
auth = updatedAuth
|
||||
@@ -1096,7 +1166,7 @@ func FetchAntigravityModels(ctx context.Context, auth *cliproxyauth.Auth, cfg *c
|
||||
|
||||
httpReq, errReq := http.NewRequestWithContext(ctx, http.MethodPost, modelsURL, bytes.NewReader(payload))
|
||||
if errReq != nil {
|
||||
return nil
|
||||
return fallbackAntigravityPrimaryModels()
|
||||
}
|
||||
httpReq.Close = true
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
@@ -1109,14 +1179,13 @@ func FetchAntigravityModels(ctx context.Context, auth *cliproxyauth.Auth, cfg *c
|
||||
httpResp, errDo := httpClient.Do(httpReq)
|
||||
if errDo != nil {
|
||||
if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) {
|
||||
return nil
|
||||
return fallbackAntigravityPrimaryModels()
|
||||
}
|
||||
if idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: models request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
log.Errorf("antigravity executor: models request failed: %v", errDo)
|
||||
return nil
|
||||
return fallbackAntigravityPrimaryModels()
|
||||
}
|
||||
|
||||
bodyBytes, errRead := io.ReadAll(httpResp.Body)
|
||||
@@ -1128,21 +1197,27 @@ func FetchAntigravityModels(ctx context.Context, auth *cliproxyauth.Auth, cfg *c
|
||||
log.Debugf("antigravity executor: models read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
log.Errorf("antigravity executor: models read body failed: %v", errRead)
|
||||
return nil
|
||||
return fallbackAntigravityPrimaryModels()
|
||||
}
|
||||
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
|
||||
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: models request rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
log.Errorf("antigravity executor: models request error status %d: %s", httpResp.StatusCode, string(bodyBytes))
|
||||
return nil
|
||||
if idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: models request failed with status %d on base url %s, retrying with fallback base url: %s", httpResp.StatusCode, baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
return fallbackAntigravityPrimaryModels()
|
||||
}
|
||||
|
||||
result := gjson.GetBytes(bodyBytes, "models")
|
||||
if !result.Exists() {
|
||||
return nil
|
||||
if idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: models field missing on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
return fallbackAntigravityPrimaryModels()
|
||||
}
|
||||
|
||||
now := time.Now().Unix()
|
||||
@@ -1210,9 +1285,18 @@ func FetchAntigravityModels(ctx context.Context, auth *cliproxyauth.Auth, cfg *c
|
||||
}
|
||||
models = append(models, modelInfo)
|
||||
}
|
||||
if len(models) == 0 {
|
||||
if idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: empty models list on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
log.Debug("antigravity executor: fetched empty model list; retaining cached primary model list")
|
||||
return fallbackAntigravityPrimaryModels()
|
||||
}
|
||||
storeAntigravityPrimaryModels(models)
|
||||
return models
|
||||
}
|
||||
return nil
|
||||
return fallbackAntigravityPrimaryModels()
|
||||
}
|
||||
|
||||
func (e *AntigravityExecutor) ensureAccessToken(ctx context.Context, auth *cliproxyauth.Auth) (string, *cliproxyauth.Auth, error) {
|
||||
|
||||
@@ -0,0 +1,90 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/registry"
|
||||
)
|
||||
|
||||
func resetAntigravityPrimaryModelsCacheForTest() {
|
||||
antigravityPrimaryModelsCache.mu.Lock()
|
||||
antigravityPrimaryModelsCache.models = nil
|
||||
antigravityPrimaryModelsCache.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestStoreAntigravityPrimaryModels_EmptyDoesNotOverwrite(t *testing.T) {
|
||||
resetAntigravityPrimaryModelsCacheForTest()
|
||||
t.Cleanup(resetAntigravityPrimaryModelsCacheForTest)
|
||||
|
||||
seed := []*registry.ModelInfo{
|
||||
{ID: "claude-sonnet-4-5"},
|
||||
{ID: "gemini-2.5-pro"},
|
||||
}
|
||||
if updated := storeAntigravityPrimaryModels(seed); !updated {
|
||||
t.Fatal("expected non-empty model list to update primary cache")
|
||||
}
|
||||
|
||||
if updated := storeAntigravityPrimaryModels(nil); updated {
|
||||
t.Fatal("expected nil model list not to overwrite primary cache")
|
||||
}
|
||||
if updated := storeAntigravityPrimaryModels([]*registry.ModelInfo{}); updated {
|
||||
t.Fatal("expected empty model list not to overwrite primary cache")
|
||||
}
|
||||
|
||||
got := loadAntigravityPrimaryModels()
|
||||
if len(got) != 2 {
|
||||
t.Fatalf("expected cached model count 2, got %d", len(got))
|
||||
}
|
||||
if got[0].ID != "claude-sonnet-4-5" || got[1].ID != "gemini-2.5-pro" {
|
||||
t.Fatalf("unexpected cached model ids: %q, %q", got[0].ID, got[1].ID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadAntigravityPrimaryModels_ReturnsClone(t *testing.T) {
|
||||
resetAntigravityPrimaryModelsCacheForTest()
|
||||
t.Cleanup(resetAntigravityPrimaryModelsCacheForTest)
|
||||
|
||||
if updated := storeAntigravityPrimaryModels([]*registry.ModelInfo{{
|
||||
ID: "gpt-5",
|
||||
DisplayName: "GPT-5",
|
||||
SupportedGenerationMethods: []string{"generateContent"},
|
||||
SupportedParameters: []string{"temperature"},
|
||||
Thinking: ®istry.ThinkingSupport{
|
||||
Levels: []string{"high"},
|
||||
},
|
||||
}}); !updated {
|
||||
t.Fatal("expected model cache update")
|
||||
}
|
||||
|
||||
got := loadAntigravityPrimaryModels()
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("expected one cached model, got %d", len(got))
|
||||
}
|
||||
got[0].ID = "mutated-id"
|
||||
if len(got[0].SupportedGenerationMethods) > 0 {
|
||||
got[0].SupportedGenerationMethods[0] = "mutated-method"
|
||||
}
|
||||
if len(got[0].SupportedParameters) > 0 {
|
||||
got[0].SupportedParameters[0] = "mutated-parameter"
|
||||
}
|
||||
if got[0].Thinking != nil && len(got[0].Thinking.Levels) > 0 {
|
||||
got[0].Thinking.Levels[0] = "mutated-level"
|
||||
}
|
||||
|
||||
again := loadAntigravityPrimaryModels()
|
||||
if len(again) != 1 {
|
||||
t.Fatalf("expected one cached model after mutation, got %d", len(again))
|
||||
}
|
||||
if again[0].ID != "gpt-5" {
|
||||
t.Fatalf("expected cached model id to remain %q, got %q", "gpt-5", again[0].ID)
|
||||
}
|
||||
if len(again[0].SupportedGenerationMethods) == 0 || again[0].SupportedGenerationMethods[0] != "generateContent" {
|
||||
t.Fatalf("expected cached generation methods to be unmutated, got %v", again[0].SupportedGenerationMethods)
|
||||
}
|
||||
if len(again[0].SupportedParameters) == 0 || again[0].SupportedParameters[0] != "temperature" {
|
||||
t.Fatalf("expected cached supported parameters to be unmutated, got %v", again[0].SupportedParameters)
|
||||
}
|
||||
if again[0].Thinking == nil || len(again[0].Thinking.Levels) == 0 || again[0].Thinking.Levels[0] != "high" {
|
||||
t.Fatalf("expected cached model thinking levels to be unmutated, got %v", again[0].Thinking)
|
||||
}
|
||||
}
|
||||
@@ -156,7 +156,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||
err = newCodexStatusErr(httpResp.StatusCode, b)
|
||||
return resp, err
|
||||
}
|
||||
data, err := io.ReadAll(httpResp.Body)
|
||||
@@ -260,7 +260,7 @@ func (e *CodexExecutor) executeCompact(ctx context.Context, auth *cliproxyauth.A
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||
err = newCodexStatusErr(httpResp.StatusCode, b)
|
||||
return resp, err
|
||||
}
|
||||
data, err := io.ReadAll(httpResp.Body)
|
||||
@@ -358,7 +358,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
|
||||
}
|
||||
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data))
|
||||
err = statusErr{code: httpResp.StatusCode, msg: string(data)}
|
||||
err = newCodexStatusErr(httpResp.StatusCode, data)
|
||||
return nil, err
|
||||
}
|
||||
out := make(chan cliproxyexecutor.StreamChunk)
|
||||
@@ -673,6 +673,35 @@ func applyCodexHeaders(r *http.Request, auth *cliproxyauth.Auth, token string, s
|
||||
util.ApplyCustomHeadersFromAttrs(r, attrs)
|
||||
}
|
||||
|
||||
func newCodexStatusErr(statusCode int, body []byte) statusErr {
|
||||
err := statusErr{code: statusCode, msg: string(body)}
|
||||
if retryAfter := parseCodexRetryAfter(statusCode, body, time.Now()); retryAfter != nil {
|
||||
err.retryAfter = retryAfter
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func parseCodexRetryAfter(statusCode int, errorBody []byte, now time.Time) *time.Duration {
|
||||
if statusCode != http.StatusTooManyRequests || len(errorBody) == 0 {
|
||||
return nil
|
||||
}
|
||||
if strings.TrimSpace(gjson.GetBytes(errorBody, "error.type").String()) != "usage_limit_reached" {
|
||||
return nil
|
||||
}
|
||||
if resetsAt := gjson.GetBytes(errorBody, "error.resets_at").Int(); resetsAt > 0 {
|
||||
resetAtTime := time.Unix(resetsAt, 0)
|
||||
if resetAtTime.After(now) {
|
||||
retryAfter := resetAtTime.Sub(now)
|
||||
return &retryAfter
|
||||
}
|
||||
}
|
||||
if resetsInSeconds := gjson.GetBytes(errorBody, "error.resets_in_seconds").Int(); resetsInSeconds > 0 {
|
||||
retryAfter := time.Duration(resetsInSeconds) * time.Second
|
||||
return &retryAfter
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func codexCreds(a *cliproxyauth.Auth) (apiKey, baseURL string) {
|
||||
if a == nil {
|
||||
return "", ""
|
||||
|
||||
65
internal/runtime/executor/codex_executor_retry_test.go
Normal file
65
internal/runtime/executor/codex_executor_retry_test.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package executor
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestParseCodexRetryAfter(t *testing.T) {
|
||||
now := time.Unix(1_700_000_000, 0)
|
||||
|
||||
t.Run("resets_in_seconds", func(t *testing.T) {
|
||||
body := []byte(`{"error":{"type":"usage_limit_reached","resets_in_seconds":123}}`)
|
||||
retryAfter := parseCodexRetryAfter(http.StatusTooManyRequests, body, now)
|
||||
if retryAfter == nil {
|
||||
t.Fatalf("expected retryAfter, got nil")
|
||||
}
|
||||
if *retryAfter != 123*time.Second {
|
||||
t.Fatalf("retryAfter = %v, want %v", *retryAfter, 123*time.Second)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("prefers resets_at", func(t *testing.T) {
|
||||
resetAt := now.Add(5 * time.Minute).Unix()
|
||||
body := []byte(`{"error":{"type":"usage_limit_reached","resets_at":` + itoa(resetAt) + `,"resets_in_seconds":1}}`)
|
||||
retryAfter := parseCodexRetryAfter(http.StatusTooManyRequests, body, now)
|
||||
if retryAfter == nil {
|
||||
t.Fatalf("expected retryAfter, got nil")
|
||||
}
|
||||
if *retryAfter != 5*time.Minute {
|
||||
t.Fatalf("retryAfter = %v, want %v", *retryAfter, 5*time.Minute)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("fallback when resets_at is past", func(t *testing.T) {
|
||||
resetAt := now.Add(-1 * time.Minute).Unix()
|
||||
body := []byte(`{"error":{"type":"usage_limit_reached","resets_at":` + itoa(resetAt) + `,"resets_in_seconds":77}}`)
|
||||
retryAfter := parseCodexRetryAfter(http.StatusTooManyRequests, body, now)
|
||||
if retryAfter == nil {
|
||||
t.Fatalf("expected retryAfter, got nil")
|
||||
}
|
||||
if *retryAfter != 77*time.Second {
|
||||
t.Fatalf("retryAfter = %v, want %v", *retryAfter, 77*time.Second)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("non-429 status code", func(t *testing.T) {
|
||||
body := []byte(`{"error":{"type":"usage_limit_reached","resets_in_seconds":30}}`)
|
||||
if got := parseCodexRetryAfter(http.StatusBadRequest, body, now); got != nil {
|
||||
t.Fatalf("expected nil for non-429, got %v", *got)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("non usage_limit_reached error type", func(t *testing.T) {
|
||||
body := []byte(`{"error":{"type":"server_error","resets_in_seconds":30}}`)
|
||||
if got := parseCodexRetryAfter(http.StatusTooManyRequests, body, now); got != nil {
|
||||
t.Fatalf("expected nil for non-usage_limit_reached, got %v", *got)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func itoa(v int64) string {
|
||||
return strconv.FormatInt(v, 10)
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
qwenauth "github.com/router-for-me/CLIProxyAPI/v6/internal/auth/qwen"
|
||||
@@ -22,9 +23,151 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
qwenUserAgent = "QwenCode/0.10.3 (darwin; arm64)"
|
||||
qwenUserAgent = "QwenCode/0.10.3 (darwin; arm64)"
|
||||
qwenRateLimitPerMin = 60 // 60 requests per minute per credential
|
||||
qwenRateLimitWindow = time.Minute // sliding window duration
|
||||
)
|
||||
|
||||
// qwenBeijingLoc caches the Beijing timezone to avoid repeated LoadLocation syscalls.
|
||||
var qwenBeijingLoc = func() *time.Location {
|
||||
loc, err := time.LoadLocation("Asia/Shanghai")
|
||||
if err != nil || loc == nil {
|
||||
log.Warnf("qwen: failed to load Asia/Shanghai timezone: %v, using fixed UTC+8", err)
|
||||
return time.FixedZone("CST", 8*3600)
|
||||
}
|
||||
return loc
|
||||
}()
|
||||
|
||||
// qwenQuotaCodes is a package-level set of error codes that indicate quota exhaustion.
|
||||
var qwenQuotaCodes = map[string]struct{}{
|
||||
"insufficient_quota": {},
|
||||
"quota_exceeded": {},
|
||||
}
|
||||
|
||||
// qwenRateLimiter tracks request timestamps per credential for rate limiting.
|
||||
// Qwen has a limit of 60 requests per minute per account.
|
||||
var qwenRateLimiter = struct {
|
||||
sync.Mutex
|
||||
requests map[string][]time.Time // authID -> request timestamps
|
||||
}{
|
||||
requests: make(map[string][]time.Time),
|
||||
}
|
||||
|
||||
// redactAuthID returns a redacted version of the auth ID for safe logging.
|
||||
// Keeps a small prefix/suffix to allow correlation across events.
|
||||
func redactAuthID(id string) string {
|
||||
if id == "" {
|
||||
return ""
|
||||
}
|
||||
if len(id) <= 8 {
|
||||
return id
|
||||
}
|
||||
return id[:4] + "..." + id[len(id)-4:]
|
||||
}
|
||||
|
||||
// checkQwenRateLimit checks if the credential has exceeded the rate limit.
|
||||
// Returns nil if allowed, or a statusErr with retryAfter if rate limited.
|
||||
func checkQwenRateLimit(authID string) error {
|
||||
if authID == "" {
|
||||
// Empty authID should not bypass rate limiting in production
|
||||
// Use debug level to avoid log spam for certain auth flows
|
||||
log.Debug("qwen rate limit check: empty authID, skipping rate limit")
|
||||
return nil
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
windowStart := now.Add(-qwenRateLimitWindow)
|
||||
|
||||
qwenRateLimiter.Lock()
|
||||
defer qwenRateLimiter.Unlock()
|
||||
|
||||
// Get and filter timestamps within the window
|
||||
timestamps := qwenRateLimiter.requests[authID]
|
||||
var validTimestamps []time.Time
|
||||
for _, ts := range timestamps {
|
||||
if ts.After(windowStart) {
|
||||
validTimestamps = append(validTimestamps, ts)
|
||||
}
|
||||
}
|
||||
|
||||
// Always prune expired entries to prevent memory leak
|
||||
// Delete empty entries, otherwise update with pruned slice
|
||||
if len(validTimestamps) == 0 {
|
||||
delete(qwenRateLimiter.requests, authID)
|
||||
}
|
||||
|
||||
// Check if rate limit exceeded
|
||||
if len(validTimestamps) >= qwenRateLimitPerMin {
|
||||
// Calculate when the oldest request will expire
|
||||
oldestInWindow := validTimestamps[0]
|
||||
retryAfter := oldestInWindow.Add(qwenRateLimitWindow).Sub(now)
|
||||
if retryAfter < time.Second {
|
||||
retryAfter = time.Second
|
||||
}
|
||||
retryAfterSec := int(retryAfter.Seconds())
|
||||
return statusErr{
|
||||
code: http.StatusTooManyRequests,
|
||||
msg: fmt.Sprintf(`{"error":{"code":"rate_limit_exceeded","message":"Qwen rate limit: %d requests/minute exceeded, retry after %ds","type":"rate_limit_exceeded"}}`, qwenRateLimitPerMin, retryAfterSec),
|
||||
retryAfter: &retryAfter,
|
||||
}
|
||||
}
|
||||
|
||||
// Record this request and update the map with pruned timestamps
|
||||
validTimestamps = append(validTimestamps, now)
|
||||
qwenRateLimiter.requests[authID] = validTimestamps
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// isQwenQuotaError checks if the error response indicates a quota exceeded error.
|
||||
// Qwen returns HTTP 403 with error.code="insufficient_quota" when daily quota is exhausted.
|
||||
func isQwenQuotaError(body []byte) bool {
|
||||
code := strings.ToLower(gjson.GetBytes(body, "error.code").String())
|
||||
errType := strings.ToLower(gjson.GetBytes(body, "error.type").String())
|
||||
|
||||
// Primary check: exact match on error.code or error.type (most reliable)
|
||||
if _, ok := qwenQuotaCodes[code]; ok {
|
||||
return true
|
||||
}
|
||||
if _, ok := qwenQuotaCodes[errType]; ok {
|
||||
return true
|
||||
}
|
||||
|
||||
// Fallback: check message only if code/type don't match (less reliable)
|
||||
msg := strings.ToLower(gjson.GetBytes(body, "error.message").String())
|
||||
if strings.Contains(msg, "insufficient_quota") || strings.Contains(msg, "quota exceeded") ||
|
||||
strings.Contains(msg, "free allocated quota exceeded") {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// wrapQwenError wraps an HTTP error response, detecting quota errors and mapping them to 429.
|
||||
// Returns the appropriate status code and retryAfter duration for statusErr.
|
||||
// Only checks for quota errors when httpCode is 403 or 429 to avoid false positives.
|
||||
func wrapQwenError(ctx context.Context, httpCode int, body []byte) (errCode int, retryAfter *time.Duration) {
|
||||
errCode = httpCode
|
||||
// Only check quota errors for expected status codes to avoid false positives
|
||||
// Qwen returns 403 for quota errors, 429 for rate limits
|
||||
if (httpCode == http.StatusForbidden || httpCode == http.StatusTooManyRequests) && isQwenQuotaError(body) {
|
||||
errCode = http.StatusTooManyRequests // Map to 429 to trigger quota logic
|
||||
cooldown := timeUntilNextDay()
|
||||
retryAfter = &cooldown
|
||||
logWithRequestID(ctx).Warnf("qwen quota exceeded (http %d -> %d), cooling down until tomorrow (%v)", httpCode, errCode, cooldown)
|
||||
}
|
||||
return errCode, retryAfter
|
||||
}
|
||||
|
||||
// timeUntilNextDay returns duration until midnight Beijing time (UTC+8).
|
||||
// Qwen's daily quota resets at 00:00 Beijing time.
|
||||
func timeUntilNextDay() time.Duration {
|
||||
now := time.Now()
|
||||
nowLocal := now.In(qwenBeijingLoc)
|
||||
tomorrow := time.Date(nowLocal.Year(), nowLocal.Month(), nowLocal.Day()+1, 0, 0, 0, 0, qwenBeijingLoc)
|
||||
return tomorrow.Sub(now)
|
||||
}
|
||||
|
||||
// QwenExecutor is a stateless executor for Qwen Code using OpenAI-compatible chat completions.
|
||||
// If access token is unavailable, it falls back to legacy via ClientAdapter.
|
||||
type QwenExecutor struct {
|
||||
@@ -67,6 +210,17 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
|
||||
if opts.Alt == "responses/compact" {
|
||||
return resp, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
||||
}
|
||||
|
||||
// Check rate limit before proceeding
|
||||
var authID string
|
||||
if auth != nil {
|
||||
authID = auth.ID
|
||||
}
|
||||
if err := checkQwenRateLimit(authID); err != nil {
|
||||
logWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID))
|
||||
return resp, err
|
||||
}
|
||||
|
||||
baseModel := thinking.ParseSuffix(req.Model).ModelName
|
||||
|
||||
token, baseURL := qwenCreds(auth)
|
||||
@@ -102,9 +256,8 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
|
||||
return resp, err
|
||||
}
|
||||
applyQwenHeaders(httpReq, token, false)
|
||||
var authID, authLabel, authType, authValue string
|
||||
var authLabel, authType, authValue string
|
||||
if auth != nil {
|
||||
authID = auth.ID
|
||||
authLabel = auth.Label
|
||||
authType, authValue = auth.AccountInfo()
|
||||
}
|
||||
@@ -135,8 +288,10 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||
|
||||
errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b)
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter}
|
||||
return resp, err
|
||||
}
|
||||
data, err := io.ReadAll(httpResp.Body)
|
||||
@@ -158,6 +313,17 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
|
||||
if opts.Alt == "responses/compact" {
|
||||
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
||||
}
|
||||
|
||||
// Check rate limit before proceeding
|
||||
var authID string
|
||||
if auth != nil {
|
||||
authID = auth.ID
|
||||
}
|
||||
if err := checkQwenRateLimit(authID); err != nil {
|
||||
logWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
baseModel := thinking.ParseSuffix(req.Model).ModelName
|
||||
|
||||
token, baseURL := qwenCreds(auth)
|
||||
@@ -200,9 +366,8 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
|
||||
return nil, err
|
||||
}
|
||||
applyQwenHeaders(httpReq, token, true)
|
||||
var authID, authLabel, authType, authValue string
|
||||
var authLabel, authType, authValue string
|
||||
if auth != nil {
|
||||
authID = auth.ID
|
||||
authLabel = auth.Label
|
||||
authType, authValue = auth.AccountInfo()
|
||||
}
|
||||
@@ -228,11 +393,13 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
|
||||
errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b)
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||
log.Errorf("qwen executor: close response body error: %v", errClose)
|
||||
}
|
||||
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||
err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter}
|
||||
return nil, err
|
||||
}
|
||||
out := make(chan cliproxyexecutor.StreamChunk)
|
||||
|
||||
Reference in New Issue
Block a user