fix(executor): add transient 429 resource exhausted handling with retry logic

This commit is contained in:
Luis Pater
2026-04-08 02:48:53 +08:00
parent c8b7e2b8d6
commit 91e7591955
2 changed files with 154 additions and 7 deletions

View File

@@ -261,6 +261,28 @@ func classifyAntigravity429(body []byte) antigravity429Category {
return antigravity429Unknown
}
func antigravityHasQuotaResetDelayOrModelInfo(body []byte) bool {
if len(body) == 0 {
return false
}
details := gjson.GetBytes(body, "error.details")
if !details.Exists() || !details.IsArray() {
return false
}
for _, detail := range details.Array() {
if detail.Get("@type").String() != "type.googleapis.com/google.rpc.ErrorInfo" {
continue
}
if strings.TrimSpace(detail.Get("metadata.quotaResetDelay").String()) != "" {
return true
}
if strings.TrimSpace(detail.Get("metadata.model").String()) != "" {
return true
}
}
return false
}
func antigravityCreditsRetryEnabled(cfg *config.Config) bool {
return cfg != nil && cfg.QuotaExceeded.AntigravityCredits
}
@@ -362,6 +384,12 @@ func shouldMarkAntigravityCreditsExhausted(statusCode int, body []byte, reqErr e
lowerBody := strings.ToLower(string(body))
for _, keyword := range antigravityCreditsExhaustedKeywords {
if strings.Contains(lowerBody, keyword) {
if keyword == "resource has been exhausted" &&
statusCode == http.StatusTooManyRequests &&
classifyAntigravity429(body) == antigravity429Unknown &&
!antigravityHasQuotaResetDelayOrModelInfo(body) {
return false
}
return true
}
}
@@ -575,6 +603,14 @@ attemptLoop:
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
continue
}
if antigravityShouldRetryTransientResourceExhausted429(httpResp.StatusCode, bodyBytes) && attempt+1 < attempts {
delay := antigravityTransient429RetryDelay(attempt)
log.Debugf("antigravity executor: transient 429 resource exhausted for model %s, retrying in %s (attempt %d/%d)", baseModel, delay, attempt+1, attempts)
if errWait := antigravityWait(ctx, delay); errWait != nil {
return resp, errWait
}
continue attemptLoop
}
if antigravityShouldRetryNoCapacity(httpResp.StatusCode, bodyBytes) {
if idx+1 < len(baseURLs) {
log.Debugf("antigravity executor: no capacity on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
@@ -742,6 +778,14 @@ attemptLoop:
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
continue
}
if antigravityShouldRetryTransientResourceExhausted429(httpResp.StatusCode, bodyBytes) && attempt+1 < attempts {
delay := antigravityTransient429RetryDelay(attempt)
log.Debugf("antigravity executor: transient 429 resource exhausted for model %s, retrying in %s (attempt %d/%d)", baseModel, delay, attempt+1, attempts)
if errWait := antigravityWait(ctx, delay); errWait != nil {
return resp, errWait
}
continue attemptLoop
}
if antigravityShouldRetryNoCapacity(httpResp.StatusCode, bodyBytes) {
if idx+1 < len(baseURLs) {
log.Debugf("antigravity executor: no capacity on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
@@ -1158,6 +1202,14 @@ attemptLoop:
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
continue
}
if antigravityShouldRetryTransientResourceExhausted429(httpResp.StatusCode, bodyBytes) && attempt+1 < attempts {
delay := antigravityTransient429RetryDelay(attempt)
log.Debugf("antigravity executor: transient 429 resource exhausted for model %s, retrying in %s (attempt %d/%d)", baseModel, delay, attempt+1, attempts)
if errWait := antigravityWait(ctx, delay); errWait != nil {
return nil, errWait
}
continue attemptLoop
}
if antigravityShouldRetryNoCapacity(httpResp.StatusCode, bodyBytes) {
if idx+1 < len(baseURLs) {
log.Debugf("antigravity executor: no capacity on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
@@ -1774,6 +1826,24 @@ func antigravityShouldRetryNoCapacity(statusCode int, body []byte) bool {
return strings.Contains(msg, "no capacity available")
}
func antigravityShouldRetryTransientResourceExhausted429(statusCode int, body []byte) bool {
if statusCode != http.StatusTooManyRequests {
return false
}
if len(body) == 0 {
return false
}
if classifyAntigravity429(body) != antigravity429Unknown {
return false
}
status := strings.TrimSpace(gjson.GetBytes(body, "error.status").String())
if !strings.EqualFold(status, "RESOURCE_EXHAUSTED") {
return false
}
msg := strings.ToLower(string(body))
return strings.Contains(msg, "resource has been exhausted")
}
func antigravityNoCapacityRetryDelay(attempt int) time.Duration {
if attempt < 0 {
attempt = 0
@@ -1785,6 +1855,17 @@ func antigravityNoCapacityRetryDelay(attempt int) time.Duration {
return delay
}
func antigravityTransient429RetryDelay(attempt int) time.Duration {
if attempt < 0 {
attempt = 0
}
delay := time.Duration(attempt+1) * 100 * time.Millisecond
if delay > 500*time.Millisecond {
delay = 500 * time.Millisecond
}
return delay
}
func antigravityWait(ctx context.Context, wait time.Duration) error {
if wait <= 0 {
return nil

View File

@@ -82,20 +82,86 @@ func TestInjectEnabledCreditTypes(t *testing.T) {
}
func TestShouldMarkAntigravityCreditsExhausted(t *testing.T) {
for _, body := range [][]byte{
[]byte(`{"error":{"message":"Insufficient GOOGLE_ONE_AI credits"}}`),
[]byte(`{"error":{"message":"minimumCreditAmountForUsage requirement not met"}}`),
[]byte(`{"error":{"message":"Resource has been exhausted"}}`),
} {
if !shouldMarkAntigravityCreditsExhausted(http.StatusForbidden, body, nil) {
t.Run("credit errors are marked", func(t *testing.T) {
for _, body := range [][]byte{
[]byte(`{"error":{"message":"Insufficient GOOGLE_ONE_AI credits"}}`),
[]byte(`{"error":{"message":"minimumCreditAmountForUsage requirement not met"}}`),
} {
if !shouldMarkAntigravityCreditsExhausted(http.StatusForbidden, body, nil) {
t.Fatalf("shouldMarkAntigravityCreditsExhausted(%s) = false, want true", string(body))
}
}
})
t.Run("transient 429 resource exhausted is not marked", func(t *testing.T) {
body := []byte(`{"error":{"code":429,"message":"Resource has been exhausted (e.g. check quota).","status":"RESOURCE_EXHAUSTED"}}`)
if shouldMarkAntigravityCreditsExhausted(http.StatusTooManyRequests, body, nil) {
t.Fatalf("shouldMarkAntigravityCreditsExhausted(%s) = true, want false", string(body))
}
})
t.Run("resource exhausted with quota metadata is still marked", func(t *testing.T) {
body := []byte(`{"error":{"code":429,"message":"Resource has been exhausted","status":"RESOURCE_EXHAUSTED","details":[{"@type":"type.googleapis.com/google.rpc.ErrorInfo","metadata":{"quotaResetDelay":"1h","model":"claude-sonnet-4-6"}}]}}`)
if !shouldMarkAntigravityCreditsExhausted(http.StatusTooManyRequests, body, nil) {
t.Fatalf("shouldMarkAntigravityCreditsExhausted(%s) = false, want true", string(body))
}
}
})
if shouldMarkAntigravityCreditsExhausted(http.StatusServiceUnavailable, []byte(`{"error":{"message":"credits exhausted"}}`), nil) {
t.Fatal("shouldMarkAntigravityCreditsExhausted() = true for 5xx, want false")
}
}
func TestAntigravityExecute_RetriesTransient429ResourceExhausted(t *testing.T) {
resetAntigravityCreditsRetryState()
t.Cleanup(resetAntigravityCreditsRetryState)
var requestCount int
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++
switch requestCount {
case 1:
w.WriteHeader(http.StatusTooManyRequests)
_, _ = w.Write([]byte(`{"error":{"code":429,"message":"Resource has been exhausted (e.g. check quota).","status":"RESOURCE_EXHAUSTED"}}`))
case 2:
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"response":{"candidates":[{"content":{"role":"model","parts":[{"text":"ok"}]}}],"usageMetadata":{"promptTokenCount":1,"candidatesTokenCount":1,"totalTokenCount":2}}}`))
default:
t.Fatalf("unexpected request count %d", requestCount)
}
}))
defer server.Close()
exec := NewAntigravityExecutor(&config.Config{RequestRetry: 1})
auth := &cliproxyauth.Auth{
ID: "auth-transient-429",
Attributes: map[string]string{
"base_url": server.URL,
},
Metadata: map[string]any{
"access_token": "token",
"project_id": "project-1",
"expired": time.Now().Add(1 * time.Hour).Format(time.RFC3339),
},
}
resp, err := exec.Execute(context.Background(), auth, cliproxyexecutor.Request{
Model: "gemini-2.5-flash",
Payload: []byte(`{"request":{"contents":[{"role":"user","parts":[{"text":"hi"}]}]}}`),
}, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FormatAntigravity,
})
if err != nil {
t.Fatalf("Execute() error = %v", err)
}
if len(resp.Payload) == 0 {
t.Fatal("Execute() returned empty payload")
}
if requestCount != 2 {
t.Fatalf("request count = %d, want 2", requestCount)
}
}
func TestAntigravityExecute_RetriesQuotaExhaustedWithCredits(t *testing.T) {
resetAntigravityCreditsRetryState()
t.Cleanup(resetAntigravityCreditsRetryState)