mirror of
https://github.com/router-for-me/CLIProxyAPIPlus.git
synced 2026-03-22 17:21:49 +00:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| | 851712a49e | | |
| | 9e34323a40 | | |
| | 70897247b2 | | |
| | 9c341f5aa5 | | |
| | e3e741d0be | | |
| | 7c7c5fd967 | | |
| | fe8c7a62aa | | |
| | 2af4a8dc12 | | |
| | 0f53b952b2 | | |
| | f30ffd5f5e | | |
@@ -148,87 +148,108 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au
|
||||
baseURLs := antigravityBaseURLFallbackOrder(auth)
|
||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
|
||||
var lastStatus int
|
||||
var lastBody []byte
|
||||
var lastErr error
|
||||
attempts := antigravityRetryAttempts(auth, e.cfg)
|
||||
|
||||
for idx, baseURL := range baseURLs {
|
||||
httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, false, opts.Alt, baseURL)
|
||||
if errReq != nil {
|
||||
err = errReq
|
||||
return resp, err
|
||||
}
|
||||
attemptLoop:
|
||||
for attempt := 0; attempt < attempts; attempt++ {
|
||||
var lastStatus int
|
||||
var lastBody []byte
|
||||
var lastErr error
|
||||
|
||||
httpResp, errDo := httpClient.Do(httpReq)
|
||||
if errDo != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errDo)
|
||||
if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) {
|
||||
return resp, errDo
|
||||
for idx, baseURL := range baseURLs {
|
||||
httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, false, opts.Alt, baseURL)
|
||||
if errReq != nil {
|
||||
err = errReq
|
||||
return resp, err
|
||||
}
|
||||
lastStatus = 0
|
||||
lastBody = nil
|
||||
lastErr = errDo
|
||||
if idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
|
||||
httpResp, errDo := httpClient.Do(httpReq)
|
||||
if errDo != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errDo)
|
||||
if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) {
|
||||
return resp, errDo
|
||||
}
|
||||
lastStatus = 0
|
||||
lastBody = nil
|
||||
lastErr = errDo
|
||||
if idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
err = errDo
|
||||
return resp, err
|
||||
}
|
||||
err = errDo
|
||||
return resp, err
|
||||
|
||||
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||
bodyBytes, errRead := io.ReadAll(httpResp.Body)
|
||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||
}
|
||||
if errRead != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errRead)
|
||||
err = errRead
|
||||
return resp, err
|
||||
}
|
||||
appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
|
||||
|
||||
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
|
||||
log.Debugf("antigravity executor: upstream error status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), bodyBytes))
|
||||
lastStatus = httpResp.StatusCode
|
||||
lastBody = append([]byte(nil), bodyBytes...)
|
||||
lastErr = nil
|
||||
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
if antigravityShouldRetryNoCapacity(httpResp.StatusCode, bodyBytes) {
|
||||
if idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: no capacity on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
if attempt+1 < attempts {
|
||||
delay := antigravityNoCapacityRetryDelay(attempt)
|
||||
log.Debugf("antigravity executor: no capacity for model %s, retrying in %s (attempt %d/%d)", baseModel, delay, attempt+1, attempts)
|
||||
if errWait := antigravityWait(ctx, delay); errWait != nil {
|
||||
return resp, errWait
|
||||
}
|
||||
continue attemptLoop
|
||||
}
|
||||
}
|
||||
sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
|
||||
if httpResp.StatusCode == http.StatusTooManyRequests {
|
||||
if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil {
|
||||
sErr.retryAfter = retryAfter
|
||||
}
|
||||
}
|
||||
err = sErr
|
||||
return resp, err
|
||||
}
|
||||
|
||||
reporter.publish(ctx, parseAntigravityUsage(bodyBytes))
|
||||
var param any
|
||||
converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bodyBytes, ¶m)
|
||||
resp = cliproxyexecutor.Response{Payload: []byte(converted)}
|
||||
reporter.ensurePublished(ctx)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||
bodyBytes, errRead := io.ReadAll(httpResp.Body)
|
||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||
}
|
||||
if errRead != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errRead)
|
||||
err = errRead
|
||||
return resp, err
|
||||
}
|
||||
appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
|
||||
|
||||
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
|
||||
log.Debugf("antigravity executor: upstream error status: %d, body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), bodyBytes))
|
||||
lastStatus = httpResp.StatusCode
|
||||
lastBody = append([]byte(nil), bodyBytes...)
|
||||
lastErr = nil
|
||||
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
|
||||
if httpResp.StatusCode == http.StatusTooManyRequests {
|
||||
if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil {
|
||||
switch {
|
||||
case lastStatus != 0:
|
||||
sErr := statusErr{code: lastStatus, msg: string(lastBody)}
|
||||
if lastStatus == http.StatusTooManyRequests {
|
||||
if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil {
|
||||
sErr.retryAfter = retryAfter
|
||||
}
|
||||
}
|
||||
err = sErr
|
||||
return resp, err
|
||||
case lastErr != nil:
|
||||
err = lastErr
|
||||
default:
|
||||
err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"}
|
||||
}
|
||||
|
||||
reporter.publish(ctx, parseAntigravityUsage(bodyBytes))
|
||||
var param any
|
||||
converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bodyBytes, ¶m)
|
||||
resp = cliproxyexecutor.Response{Payload: []byte(converted)}
|
||||
reporter.ensurePublished(ctx)
|
||||
return resp, nil
|
||||
return resp, err
|
||||
}
|
||||
|
||||
switch {
|
||||
case lastStatus != 0:
|
||||
sErr := statusErr{code: lastStatus, msg: string(lastBody)}
|
||||
if lastStatus == http.StatusTooManyRequests {
|
||||
if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil {
|
||||
sErr.retryAfter = retryAfter
|
||||
}
|
||||
}
|
||||
err = sErr
|
||||
case lastErr != nil:
|
||||
err = lastErr
|
||||
default:
|
||||
err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"}
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
@@ -268,150 +289,171 @@ func (e *AntigravityExecutor) executeClaudeNonStream(ctx context.Context, auth *
|
||||
baseURLs := antigravityBaseURLFallbackOrder(auth)
|
||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
|
||||
var lastStatus int
|
||||
var lastBody []byte
|
||||
var lastErr error
|
||||
attempts := antigravityRetryAttempts(auth, e.cfg)
|
||||
|
||||
for idx, baseURL := range baseURLs {
|
||||
httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL)
|
||||
if errReq != nil {
|
||||
err = errReq
|
||||
return resp, err
|
||||
}
|
||||
attemptLoop:
|
||||
for attempt := 0; attempt < attempts; attempt++ {
|
||||
var lastStatus int
|
||||
var lastBody []byte
|
||||
var lastErr error
|
||||
|
||||
httpResp, errDo := httpClient.Do(httpReq)
|
||||
if errDo != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errDo)
|
||||
if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) {
|
||||
return resp, errDo
|
||||
for idx, baseURL := range baseURLs {
|
||||
httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL)
|
||||
if errReq != nil {
|
||||
err = errReq
|
||||
return resp, err
|
||||
}
|
||||
lastStatus = 0
|
||||
lastBody = nil
|
||||
lastErr = errDo
|
||||
if idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
err = errDo
|
||||
return resp, err
|
||||
}
|
||||
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
|
||||
bodyBytes, errRead := io.ReadAll(httpResp.Body)
|
||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||
}
|
||||
if errRead != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errRead)
|
||||
if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) {
|
||||
err = errRead
|
||||
return resp, err
|
||||
}
|
||||
if errCtx := ctx.Err(); errCtx != nil {
|
||||
err = errCtx
|
||||
return resp, err
|
||||
|
||||
httpResp, errDo := httpClient.Do(httpReq)
|
||||
if errDo != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errDo)
|
||||
if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) {
|
||||
return resp, errDo
|
||||
}
|
||||
lastStatus = 0
|
||||
lastBody = nil
|
||||
lastErr = errRead
|
||||
lastErr = errDo
|
||||
if idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
err = errRead
|
||||
err = errDo
|
||||
return resp, err
|
||||
}
|
||||
appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
|
||||
lastStatus = httpResp.StatusCode
|
||||
lastBody = append([]byte(nil), bodyBytes...)
|
||||
lastErr = nil
|
||||
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
|
||||
bodyBytes, errRead := io.ReadAll(httpResp.Body)
|
||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||
}
|
||||
if errRead != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errRead)
|
||||
if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) {
|
||||
err = errRead
|
||||
return resp, err
|
||||
}
|
||||
if errCtx := ctx.Err(); errCtx != nil {
|
||||
err = errCtx
|
||||
return resp, err
|
||||
}
|
||||
lastStatus = 0
|
||||
lastBody = nil
|
||||
lastErr = errRead
|
||||
if idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
err = errRead
|
||||
return resp, err
|
||||
}
|
||||
appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
|
||||
lastStatus = httpResp.StatusCode
|
||||
lastBody = append([]byte(nil), bodyBytes...)
|
||||
lastErr = nil
|
||||
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
if antigravityShouldRetryNoCapacity(httpResp.StatusCode, bodyBytes) {
|
||||
if idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: no capacity on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
if attempt+1 < attempts {
|
||||
delay := antigravityNoCapacityRetryDelay(attempt)
|
||||
log.Debugf("antigravity executor: no capacity for model %s, retrying in %s (attempt %d/%d)", baseModel, delay, attempt+1, attempts)
|
||||
if errWait := antigravityWait(ctx, delay); errWait != nil {
|
||||
return resp, errWait
|
||||
}
|
||||
continue attemptLoop
|
||||
}
|
||||
}
|
||||
sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
|
||||
if httpResp.StatusCode == http.StatusTooManyRequests {
|
||||
if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil {
|
||||
sErr.retryAfter = retryAfter
|
||||
}
|
||||
}
|
||||
err = sErr
|
||||
return resp, err
|
||||
}
|
||||
sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
|
||||
if httpResp.StatusCode == http.StatusTooManyRequests {
|
||||
if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil {
|
||||
|
||||
out := make(chan cliproxyexecutor.StreamChunk)
|
||||
go func(resp *http.Response) {
|
||||
defer close(out)
|
||||
defer func() {
|
||||
if errClose := resp.Body.Close(); errClose != nil {
|
||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||
}
|
||||
}()
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
scanner.Buffer(nil, streamScannerBuffer)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
appendAPIResponseChunk(ctx, e.cfg, line)
|
||||
|
||||
// Filter usage metadata for all models
|
||||
// Only retain usage statistics in the terminal chunk
|
||||
line = FilterSSEUsageMetadata(line)
|
||||
|
||||
payload := jsonPayload(line)
|
||||
if payload == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if detail, ok := parseAntigravityStreamUsage(payload); ok {
|
||||
reporter.publish(ctx, detail)
|
||||
}
|
||||
|
||||
out <- cliproxyexecutor.StreamChunk{Payload: payload}
|
||||
}
|
||||
if errScan := scanner.Err(); errScan != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errScan)
|
||||
reporter.publishFailure(ctx)
|
||||
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||
} else {
|
||||
reporter.ensurePublished(ctx)
|
||||
}
|
||||
}(httpResp)
|
||||
|
||||
var buffer bytes.Buffer
|
||||
for chunk := range out {
|
||||
if chunk.Err != nil {
|
||||
return resp, chunk.Err
|
||||
}
|
||||
if len(chunk.Payload) > 0 {
|
||||
_, _ = buffer.Write(chunk.Payload)
|
||||
_, _ = buffer.Write([]byte("\n"))
|
||||
}
|
||||
}
|
||||
resp = cliproxyexecutor.Response{Payload: e.convertStreamToNonStream(buffer.Bytes())}
|
||||
|
||||
reporter.publish(ctx, parseAntigravityUsage(resp.Payload))
|
||||
var param any
|
||||
converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, resp.Payload, ¶m)
|
||||
resp = cliproxyexecutor.Response{Payload: []byte(converted)}
|
||||
reporter.ensurePublished(ctx)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
switch {
|
||||
case lastStatus != 0:
|
||||
sErr := statusErr{code: lastStatus, msg: string(lastBody)}
|
||||
if lastStatus == http.StatusTooManyRequests {
|
||||
if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil {
|
||||
sErr.retryAfter = retryAfter
|
||||
}
|
||||
}
|
||||
err = sErr
|
||||
return resp, err
|
||||
case lastErr != nil:
|
||||
err = lastErr
|
||||
default:
|
||||
err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"}
|
||||
}
|
||||
|
||||
out := make(chan cliproxyexecutor.StreamChunk)
|
||||
go func(resp *http.Response) {
|
||||
defer close(out)
|
||||
defer func() {
|
||||
if errClose := resp.Body.Close(); errClose != nil {
|
||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||
}
|
||||
}()
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
scanner.Buffer(nil, streamScannerBuffer)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
appendAPIResponseChunk(ctx, e.cfg, line)
|
||||
|
||||
// Filter usage metadata for all models
|
||||
// Only retain usage statistics in the terminal chunk
|
||||
line = FilterSSEUsageMetadata(line)
|
||||
|
||||
payload := jsonPayload(line)
|
||||
if payload == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if detail, ok := parseAntigravityStreamUsage(payload); ok {
|
||||
reporter.publish(ctx, detail)
|
||||
}
|
||||
|
||||
out <- cliproxyexecutor.StreamChunk{Payload: payload}
|
||||
}
|
||||
if errScan := scanner.Err(); errScan != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errScan)
|
||||
reporter.publishFailure(ctx)
|
||||
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||
} else {
|
||||
reporter.ensurePublished(ctx)
|
||||
}
|
||||
}(httpResp)
|
||||
|
||||
var buffer bytes.Buffer
|
||||
for chunk := range out {
|
||||
if chunk.Err != nil {
|
||||
return resp, chunk.Err
|
||||
}
|
||||
if len(chunk.Payload) > 0 {
|
||||
_, _ = buffer.Write(chunk.Payload)
|
||||
_, _ = buffer.Write([]byte("\n"))
|
||||
}
|
||||
}
|
||||
resp = cliproxyexecutor.Response{Payload: e.convertStreamToNonStream(buffer.Bytes())}
|
||||
|
||||
reporter.publish(ctx, parseAntigravityUsage(resp.Payload))
|
||||
var param any
|
||||
converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, resp.Payload, ¶m)
|
||||
resp = cliproxyexecutor.Response{Payload: []byte(converted)}
|
||||
reporter.ensurePublished(ctx)
|
||||
|
||||
return resp, nil
|
||||
return resp, err
|
||||
}
|
||||
|
||||
switch {
|
||||
case lastStatus != 0:
|
||||
sErr := statusErr{code: lastStatus, msg: string(lastBody)}
|
||||
if lastStatus == http.StatusTooManyRequests {
|
||||
if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil {
|
||||
sErr.retryAfter = retryAfter
|
||||
}
|
||||
}
|
||||
err = sErr
|
||||
case lastErr != nil:
|
||||
err = lastErr
|
||||
default:
|
||||
err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"}
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
@@ -635,139 +677,160 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya
|
||||
baseURLs := antigravityBaseURLFallbackOrder(auth)
|
||||
httpClient := newProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
|
||||
|
||||
var lastStatus int
|
||||
var lastBody []byte
|
||||
var lastErr error
|
||||
attempts := antigravityRetryAttempts(auth, e.cfg)
|
||||
|
||||
for idx, baseURL := range baseURLs {
|
||||
httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL)
|
||||
if errReq != nil {
|
||||
err = errReq
|
||||
return nil, err
|
||||
}
|
||||
httpResp, errDo := httpClient.Do(httpReq)
|
||||
if errDo != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errDo)
|
||||
if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) {
|
||||
return nil, errDo
|
||||
attemptLoop:
|
||||
for attempt := 0; attempt < attempts; attempt++ {
|
||||
var lastStatus int
|
||||
var lastBody []byte
|
||||
var lastErr error
|
||||
|
||||
for idx, baseURL := range baseURLs {
|
||||
httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, translated, true, opts.Alt, baseURL)
|
||||
if errReq != nil {
|
||||
err = errReq
|
||||
return nil, err
|
||||
}
|
||||
lastStatus = 0
|
||||
lastBody = nil
|
||||
lastErr = errDo
|
||||
if idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
err = errDo
|
||||
return nil, err
|
||||
}
|
||||
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
|
||||
bodyBytes, errRead := io.ReadAll(httpResp.Body)
|
||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||
}
|
||||
if errRead != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errRead)
|
||||
if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) {
|
||||
err = errRead
|
||||
return nil, err
|
||||
}
|
||||
if errCtx := ctx.Err(); errCtx != nil {
|
||||
err = errCtx
|
||||
return nil, err
|
||||
httpResp, errDo := httpClient.Do(httpReq)
|
||||
if errDo != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errDo)
|
||||
if errors.Is(errDo, context.Canceled) || errors.Is(errDo, context.DeadlineExceeded) {
|
||||
return nil, errDo
|
||||
}
|
||||
lastStatus = 0
|
||||
lastBody = nil
|
||||
lastErr = errRead
|
||||
lastErr = errDo
|
||||
if idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
log.Debugf("antigravity executor: request error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
err = errRead
|
||||
err = errDo
|
||||
return nil, err
|
||||
}
|
||||
appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
|
||||
lastStatus = httpResp.StatusCode
|
||||
lastBody = append([]byte(nil), bodyBytes...)
|
||||
lastErr = nil
|
||||
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
|
||||
if httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices {
|
||||
bodyBytes, errRead := io.ReadAll(httpResp.Body)
|
||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||
}
|
||||
if errRead != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errRead)
|
||||
if errors.Is(errRead, context.Canceled) || errors.Is(errRead, context.DeadlineExceeded) {
|
||||
err = errRead
|
||||
return nil, err
|
||||
}
|
||||
if errCtx := ctx.Err(); errCtx != nil {
|
||||
err = errCtx
|
||||
return nil, err
|
||||
}
|
||||
lastStatus = 0
|
||||
lastBody = nil
|
||||
lastErr = errRead
|
||||
if idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: read error on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
err = errRead
|
||||
return nil, err
|
||||
}
|
||||
appendAPIResponseChunk(ctx, e.cfg, bodyBytes)
|
||||
lastStatus = httpResp.StatusCode
|
||||
lastBody = append([]byte(nil), bodyBytes...)
|
||||
lastErr = nil
|
||||
if httpResp.StatusCode == http.StatusTooManyRequests && idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: rate limited on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
if antigravityShouldRetryNoCapacity(httpResp.StatusCode, bodyBytes) {
|
||||
if idx+1 < len(baseURLs) {
|
||||
log.Debugf("antigravity executor: no capacity on base url %s, retrying with fallback base url: %s", baseURL, baseURLs[idx+1])
|
||||
continue
|
||||
}
|
||||
if attempt+1 < attempts {
|
||||
delay := antigravityNoCapacityRetryDelay(attempt)
|
||||
log.Debugf("antigravity executor: no capacity for model %s, retrying in %s (attempt %d/%d)", baseModel, delay, attempt+1, attempts)
|
||||
if errWait := antigravityWait(ctx, delay); errWait != nil {
|
||||
return nil, errWait
|
||||
}
|
||||
continue attemptLoop
|
||||
}
|
||||
}
|
||||
sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
|
||||
if httpResp.StatusCode == http.StatusTooManyRequests {
|
||||
if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil {
|
||||
sErr.retryAfter = retryAfter
|
||||
}
|
||||
}
|
||||
err = sErr
|
||||
return nil, err
|
||||
}
|
||||
sErr := statusErr{code: httpResp.StatusCode, msg: string(bodyBytes)}
|
||||
if httpResp.StatusCode == http.StatusTooManyRequests {
|
||||
if retryAfter, parseErr := parseRetryDelay(bodyBytes); parseErr == nil && retryAfter != nil {
|
||||
|
||||
out := make(chan cliproxyexecutor.StreamChunk)
|
||||
stream = out
|
||||
go func(resp *http.Response) {
|
||||
defer close(out)
|
||||
defer func() {
|
||||
if errClose := resp.Body.Close(); errClose != nil {
|
||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||
}
|
||||
}()
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
scanner.Buffer(nil, streamScannerBuffer)
|
||||
var param any
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
appendAPIResponseChunk(ctx, e.cfg, line)
|
||||
|
||||
// Filter usage metadata for all models
|
||||
// Only retain usage statistics in the terminal chunk
|
||||
line = FilterSSEUsageMetadata(line)
|
||||
|
||||
payload := jsonPayload(line)
|
||||
if payload == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if detail, ok := parseAntigravityStreamUsage(payload); ok {
|
||||
reporter.publish(ctx, detail)
|
||||
}
|
||||
|
||||
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(payload), ¶m)
|
||||
for i := range chunks {
|
||||
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
||||
}
|
||||
}
|
||||
tail := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, []byte("[DONE]"), ¶m)
|
||||
for i := range tail {
|
||||
out <- cliproxyexecutor.StreamChunk{Payload: []byte(tail[i])}
|
||||
}
|
||||
if errScan := scanner.Err(); errScan != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errScan)
|
||||
reporter.publishFailure(ctx)
|
||||
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||
} else {
|
||||
reporter.ensurePublished(ctx)
|
||||
}
|
||||
}(httpResp)
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
switch {
|
||||
case lastStatus != 0:
|
||||
sErr := statusErr{code: lastStatus, msg: string(lastBody)}
|
||||
if lastStatus == http.StatusTooManyRequests {
|
||||
if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil {
|
||||
sErr.retryAfter = retryAfter
|
||||
}
|
||||
}
|
||||
err = sErr
|
||||
return nil, err
|
||||
case lastErr != nil:
|
||||
err = lastErr
|
||||
default:
|
||||
err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"}
|
||||
}
|
||||
|
||||
out := make(chan cliproxyexecutor.StreamChunk)
|
||||
stream = out
|
||||
go func(resp *http.Response) {
|
||||
defer close(out)
|
||||
defer func() {
|
||||
if errClose := resp.Body.Close(); errClose != nil {
|
||||
log.Errorf("antigravity executor: close response body error: %v", errClose)
|
||||
}
|
||||
}()
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
scanner.Buffer(nil, streamScannerBuffer)
|
||||
var param any
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
appendAPIResponseChunk(ctx, e.cfg, line)
|
||||
|
||||
// Filter usage metadata for all models
|
||||
// Only retain usage statistics in the terminal chunk
|
||||
line = FilterSSEUsageMetadata(line)
|
||||
|
||||
payload := jsonPayload(line)
|
||||
if payload == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if detail, ok := parseAntigravityStreamUsage(payload); ok {
|
||||
reporter.publish(ctx, detail)
|
||||
}
|
||||
|
||||
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, bytes.Clone(payload), ¶m)
|
||||
for i := range chunks {
|
||||
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
|
||||
}
|
||||
}
|
||||
tail := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, []byte("[DONE]"), ¶m)
|
||||
for i := range tail {
|
||||
out <- cliproxyexecutor.StreamChunk{Payload: []byte(tail[i])}
|
||||
}
|
||||
if errScan := scanner.Err(); errScan != nil {
|
||||
recordAPIResponseError(ctx, e.cfg, errScan)
|
||||
reporter.publishFailure(ctx)
|
||||
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||
} else {
|
||||
reporter.ensurePublished(ctx)
|
||||
}
|
||||
}(httpResp)
|
||||
return stream, nil
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch {
|
||||
case lastStatus != 0:
|
||||
sErr := statusErr{code: lastStatus, msg: string(lastBody)}
|
||||
if lastStatus == http.StatusTooManyRequests {
|
||||
if retryAfter, parseErr := parseRetryDelay(lastBody); parseErr == nil && retryAfter != nil {
|
||||
sErr.retryAfter = retryAfter
|
||||
}
|
||||
}
|
||||
err = sErr
|
||||
case lastErr != nil:
|
||||
err = lastErr
|
||||
default:
|
||||
err = statusErr{code: http.StatusServiceUnavailable, msg: "antigravity executor: no base url available"}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -1384,14 +1447,70 @@ func resolveUserAgent(auth *cliproxyauth.Auth) string {
|
||||
return defaultAntigravityAgent
|
||||
}
|
||||
|
||||
func antigravityRetryAttempts(auth *cliproxyauth.Auth, cfg *config.Config) int {
|
||||
retry := 0
|
||||
if cfg != nil {
|
||||
retry = cfg.RequestRetry
|
||||
}
|
||||
if auth != nil {
|
||||
if override, ok := auth.RequestRetryOverride(); ok {
|
||||
retry = override
|
||||
}
|
||||
}
|
||||
if retry < 0 {
|
||||
retry = 0
|
||||
}
|
||||
attempts := retry + 1
|
||||
if attempts < 1 {
|
||||
return 1
|
||||
}
|
||||
return attempts
|
||||
}
|
||||
|
||||
// antigravityShouldRetryNoCapacity reports whether an upstream response looks
// like a "no capacity" failure: the status must be 503 Service Unavailable and
// the non-empty body must contain the phrase "no capacity available"
// (matched case-insensitively).
func antigravityShouldRetryNoCapacity(statusCode int, body []byte) bool {
	switch {
	case statusCode != http.StatusServiceUnavailable:
		return false
	case len(body) == 0:
		return false
	}
	lowered := strings.ToLower(string(body))
	return strings.Contains(lowered, "no capacity available")
}
|
||||
|
||||
// antigravityNoCapacityRetryDelay returns how long to wait before the next
// retry after a "no capacity" response.
//
// The delay grows linearly with the zero-based attempt index, 250ms per step
// (attempt 0 -> 250ms, attempt 1 -> 500ms, ...), and is capped at 2s. Negative
// attempt values are treated as 0.
func antigravityNoCapacityRetryDelay(attempt int) time.Duration {
	const (
		step     = 250 * time.Millisecond
		maxDelay = 2 * time.Second
	)
	if attempt < 0 {
		attempt = 0
	}
	// Clamp before multiplying: for very large attempt values attempt+1 (or the
	// product) would overflow and could yield a zero or negative delay that
	// slips past the cap.
	if attempt >= int(maxDelay/step)-1 {
		return maxDelay
	}
	return time.Duration(attempt+1) * step
}
|
||||
|
||||
// antigravityWait blocks for the given duration or until ctx is done,
// whichever happens first.
//
// It returns nil once the duration has elapsed, and ctx.Err() if the context
// finishes first. A zero or negative duration returns nil immediately without
// consulting ctx.
func antigravityWait(ctx context.Context, wait time.Duration) error {
	if wait > 0 {
		pause := time.NewTimer(wait)
		// Release the timer when leaving early on cancellation.
		defer pause.Stop()

		select {
		case <-pause.C:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}
|
||||
|
||||
// antigravityBaseURLFallbackOrder returns the base URLs to try, in order.
// When resolveCustomAntigravityBaseURL(auth) yields a non-empty value, that
// single URL is the only candidate; otherwise a built-in list is returned.
//
// NOTE(review): this listing appears to come from a rendered diff, so the
// entries in the built-in list may interleave removed and added lines (the
// sandbox URL shows up twice and the prod URL appears both live and commented
// out) — confirm the intended order against the repository.
func antigravityBaseURLFallbackOrder(auth *cliproxyauth.Auth) []string {
|
||||
// A custom base URL, when present, replaces the whole fallback list.
if base := resolveCustomAntigravityBaseURL(auth); base != "" {
|
||||
return []string{base}
|
||||
}
|
||||
// Built-in candidates, tried first to last.
return []string{
|
||||
antigravitySandboxBaseURLDaily,
|
||||
antigravityBaseURLDaily,
|
||||
antigravityBaseURLProd,
|
||||
antigravitySandboxBaseURLDaily,
|
||||
// antigravityBaseURLProd,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -163,7 +163,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||
log.Errorf("response body close error: %v", errClose)
|
||||
@@ -295,7 +295,7 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||
log.Errorf("response body close error: %v", errClose)
|
||||
}
|
||||
|
||||
@@ -150,7 +150,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||
return resp, err
|
||||
}
|
||||
@@ -265,7 +265,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
|
||||
return nil, readErr
|
||||
}
|
||||
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data))
|
||||
err = statusErr{code: httpResp.StatusCode, msg: string(data)}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -227,7 +227,7 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
|
||||
|
||||
lastStatus = httpResp.StatusCode
|
||||
lastBody = append([]byte(nil), data...)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data))
|
||||
if httpResp.StatusCode == 429 {
|
||||
if idx+1 < len(models) {
|
||||
log.Debugf("gemini cli executor: rate limited, retrying with next model: %s", models[idx+1])
|
||||
@@ -360,7 +360,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
|
||||
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||
lastStatus = httpResp.StatusCode
|
||||
lastBody = append([]byte(nil), data...)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data))
|
||||
if httpResp.StatusCode == 429 {
|
||||
if idx+1 < len(models) {
|
||||
log.Debugf("gemini cli executor: rate limited, retrying with next model: %s", models[idx+1])
|
||||
|
||||
@@ -188,7 +188,7 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||
return resp, err
|
||||
}
|
||||
@@ -282,7 +282,7 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||
log.Errorf("gemini executor: close response body error: %v", errClose)
|
||||
}
|
||||
@@ -402,7 +402,7 @@ func (e *GeminiExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Aut
|
||||
}
|
||||
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
log.Debugf("request error, error status: %d, error body: %s", resp.StatusCode, summarizeErrorBody(resp.Header.Get("Content-Type"), data))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", resp.StatusCode, summarizeErrorBody(resp.Header.Get("Content-Type"), data))
|
||||
return cliproxyexecutor.Response{}, statusErr{code: resp.StatusCode, msg: string(data)}
|
||||
}
|
||||
|
||||
|
||||
@@ -389,7 +389,7 @@ func (e *GeminiVertexExecutor) executeWithServiceAccount(ctx context.Context, au
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||
return resp, err
|
||||
}
|
||||
@@ -503,7 +503,7 @@ func (e *GeminiVertexExecutor) executeWithAPIKey(ctx context.Context, auth *clip
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||
return resp, err
|
||||
}
|
||||
@@ -601,7 +601,7 @@ func (e *GeminiVertexExecutor) executeStreamWithServiceAccount(ctx context.Conte
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||
log.Errorf("vertex executor: close response body error: %v", errClose)
|
||||
}
|
||||
@@ -725,7 +725,7 @@ func (e *GeminiVertexExecutor) executeStreamWithAPIKey(ctx context.Context, auth
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||
log.Errorf("vertex executor: close response body error: %v", errClose)
|
||||
}
|
||||
@@ -838,7 +838,7 @@ func (e *GeminiVertexExecutor) countTokensWithServiceAccount(ctx context.Context
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
return cliproxyexecutor.Response{}, statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||
}
|
||||
data, errRead := io.ReadAll(httpResp.Body)
|
||||
@@ -922,7 +922,7 @@ func (e *GeminiVertexExecutor) countTokensWithAPIKey(ctx context.Context, auth *
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
return cliproxyexecutor.Response{}, statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||
}
|
||||
data, errRead := io.ReadAll(httpResp.Body)
|
||||
|
||||
@@ -142,7 +142,7 @@ func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("iflow request error: status %d body %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||
return resp, err
|
||||
}
|
||||
@@ -244,7 +244,7 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
|
||||
log.Errorf("iflow executor: close response body error: %v", errClose)
|
||||
}
|
||||
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||
log.Debugf("iflow streaming error: status %d body %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), data))
|
||||
err = statusErr{code: httpResp.StatusCode, msg: string(data)}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -12,7 +12,10 @@ import (
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/logging"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -332,6 +335,12 @@ func summarizeErrorBody(contentType string, body []byte) string {
|
||||
}
|
||||
return "[html body omitted]"
|
||||
}
|
||||
|
||||
// Try to extract error message from JSON response
|
||||
if message := extractJSONErrorMessage(body); message != "" {
|
||||
return message
|
||||
}
|
||||
|
||||
return string(body)
|
||||
}
|
||||
|
||||
@@ -358,3 +367,25 @@ func extractHTMLTitle(body []byte) string {
|
||||
}
|
||||
return strings.Join(strings.Fields(title), " ")
|
||||
}
|
||||
|
||||
// extractJSONErrorMessage attempts to extract error.message from JSON error responses
|
||||
func extractJSONErrorMessage(body []byte) string {
|
||||
result := gjson.GetBytes(body, "error.message")
|
||||
if result.Exists() && result.String() != "" {
|
||||
return result.String()
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// logWithRequestID returns a logrus Entry with request_id field populated from context.
|
||||
// If no request ID is found in context, it returns the standard logger.
|
||||
func logWithRequestID(ctx context.Context) *log.Entry {
|
||||
if ctx == nil {
|
||||
return log.NewEntry(log.StandardLogger())
|
||||
}
|
||||
requestID := logging.GetRequestID(ctx)
|
||||
if requestID == "" {
|
||||
return log.NewEntry(log.StandardLogger())
|
||||
}
|
||||
return log.WithField("request_id", requestID)
|
||||
}
|
||||
|
||||
@@ -146,7 +146,7 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||
return resp, err
|
||||
}
|
||||
@@ -239,7 +239,7 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||
log.Errorf("openai compat executor: close response body error: %v", errClose)
|
||||
}
|
||||
|
||||
@@ -133,7 +133,7 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
|
||||
return resp, err
|
||||
}
|
||||
@@ -222,7 +222,7 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
|
||||
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
|
||||
b, _ := io.ReadAll(httpResp.Body)
|
||||
appendAPIResponseChunk(ctx, e.cfg, b)
|
||||
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
logWithRequestID(ctx).Debugf("request error, error status: %d, error message: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
|
||||
if errClose := httpResp.Body.Close(); errClose != nil {
|
||||
log.Errorf("qwen executor: close response body error: %v", errClose)
|
||||
}
|
||||
|
||||
@@ -499,6 +499,16 @@ func shortenToolNameIfNeeded(name string) string {
|
||||
return name[:limit]
|
||||
}
|
||||
|
||||
// ensureKiroInputSchema returns parameters unchanged when it is non-nil.
// For a nil value it returns a minimal schema: an "object" type with an
// empty "properties" map.
func ensureKiroInputSchema(parameters interface{}) interface{} {
	if parameters == nil {
		noProperties := map[string]interface{}{}
		return map[string]interface{}{
			"type":       "object",
			"properties": noProperties,
		}
	}
	return parameters
}
|
||||
|
||||
// convertClaudeToolsToKiro converts Claude tools to Kiro format
|
||||
func convertClaudeToolsToKiro(tools gjson.Result) []KiroToolWrapper {
|
||||
var kiroTools []KiroToolWrapper
|
||||
@@ -509,7 +519,12 @@ func convertClaudeToolsToKiro(tools gjson.Result) []KiroToolWrapper {
|
||||
for _, tool := range tools.Array() {
|
||||
name := tool.Get("name").String()
|
||||
description := tool.Get("description").String()
|
||||
inputSchema := tool.Get("input_schema").Value()
|
||||
inputSchemaResult := tool.Get("input_schema")
|
||||
var inputSchema interface{}
|
||||
if inputSchemaResult.Exists() && inputSchemaResult.Type != gjson.Null {
|
||||
inputSchema = inputSchemaResult.Value()
|
||||
}
|
||||
inputSchema = ensureKiroInputSchema(inputSchema)
|
||||
|
||||
// Shorten tool name if it exceeds 64 characters (common with MCP tools)
|
||||
originalName := name
|
||||
|
||||
@@ -314,7 +314,7 @@ func ConvertOpenAIToolsToKiroFormat(tools []map[string]interface{}) []KiroToolWr
|
||||
|
||||
name := kirocommon.GetString(fn, "name")
|
||||
description := kirocommon.GetString(fn, "description")
|
||||
parameters := fn["parameters"]
|
||||
parameters := ensureKiroInputSchema(fn["parameters"])
|
||||
|
||||
if name == "" {
|
||||
continue
|
||||
@@ -368,4 +368,4 @@ func ConvertClaudeToolUseToOpenAI(toolUseID, toolName string, input map[string]i
|
||||
// LogStreamEvent logs a streaming event for debugging
|
||||
func LogStreamEvent(eventType, data string) {
|
||||
log.Debugf("kiro-openai: stream event type=%s, data_len=%d", eventType, len(data))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -381,6 +381,16 @@ func shortenToolNameIfNeeded(name string) string {
|
||||
return name[:limit]
|
||||
}
|
||||
|
||||
// ensureKiroInputSchema guarantees a non-nil schema value: a non-nil
// parameters value is handed back as-is, while nil is replaced by an
// "object" schema with an empty "properties" map.
func ensureKiroInputSchema(parameters interface{}) interface{} {
	schema := parameters
	if schema == nil {
		schema = map[string]interface{}{
			"type":       "object",
			"properties": map[string]interface{}{},
		}
	}
	return schema
}
|
||||
|
||||
// convertOpenAIToolsToKiro converts OpenAI tools to Kiro format
|
||||
func convertOpenAIToolsToKiro(tools gjson.Result) []KiroToolWrapper {
|
||||
var kiroTools []KiroToolWrapper
|
||||
@@ -401,7 +411,12 @@ func convertOpenAIToolsToKiro(tools gjson.Result) []KiroToolWrapper {
|
||||
|
||||
name := fn.Get("name").String()
|
||||
description := fn.Get("description").String()
|
||||
parameters := fn.Get("parameters").Value()
|
||||
parametersResult := fn.Get("parameters")
|
||||
var parameters interface{}
|
||||
if parametersResult.Exists() && parametersResult.Type != gjson.Null {
|
||||
parameters = parametersResult.Value()
|
||||
}
|
||||
parameters = ensureKiroInputSchema(parameters)
|
||||
|
||||
// Shorten tool name if it exceeds 64 characters (common with MCP tools)
|
||||
originalName := name
|
||||
|
||||
@@ -167,6 +167,16 @@ func SynthesizeGeminiVirtualAuths(primary *coreauth.Auth, metadata map[string]an
|
||||
"virtual_parent_id": primary.ID,
|
||||
"type": metadata["type"],
|
||||
}
|
||||
if v, ok := metadata["disable_cooling"]; ok {
|
||||
metadataCopy["disable_cooling"] = v
|
||||
} else if v, ok := metadata["disable-cooling"]; ok {
|
||||
metadataCopy["disable_cooling"] = v
|
||||
}
|
||||
if v, ok := metadata["request_retry"]; ok {
|
||||
metadataCopy["request_retry"] = v
|
||||
} else if v, ok := metadata["request-retry"]; ok {
|
||||
metadataCopy["request_retry"] = v
|
||||
}
|
||||
proxy := strings.TrimSpace(primary.ProxyURL)
|
||||
if proxy != "" {
|
||||
metadataCopy["proxy_url"] = proxy
|
||||
|
||||
@@ -69,10 +69,12 @@ func TestFileSynthesizer_Synthesize_ValidAuthFile(t *testing.T) {
|
||||
|
||||
// Create a valid auth file
|
||||
authData := map[string]any{
|
||||
"type": "claude",
|
||||
"email": "test@example.com",
|
||||
"proxy_url": "http://proxy.local",
|
||||
"prefix": "test-prefix",
|
||||
"type": "claude",
|
||||
"email": "test@example.com",
|
||||
"proxy_url": "http://proxy.local",
|
||||
"prefix": "test-prefix",
|
||||
"disable_cooling": true,
|
||||
"request_retry": 2,
|
||||
}
|
||||
data, _ := json.Marshal(authData)
|
||||
err := os.WriteFile(filepath.Join(tempDir, "claude-auth.json"), data, 0644)
|
||||
@@ -108,6 +110,12 @@ func TestFileSynthesizer_Synthesize_ValidAuthFile(t *testing.T) {
|
||||
if auths[0].ProxyURL != "http://proxy.local" {
|
||||
t.Errorf("expected proxy_url http://proxy.local, got %s", auths[0].ProxyURL)
|
||||
}
|
||||
if v, ok := auths[0].Metadata["disable_cooling"].(bool); !ok || !v {
|
||||
t.Errorf("expected disable_cooling true, got %v", auths[0].Metadata["disable_cooling"])
|
||||
}
|
||||
if v, ok := auths[0].Metadata["request_retry"].(float64); !ok || int(v) != 2 {
|
||||
t.Errorf("expected request_retry 2, got %v", auths[0].Metadata["request_retry"])
|
||||
}
|
||||
if auths[0].Status != coreauth.StatusActive {
|
||||
t.Errorf("expected status active, got %s", auths[0].Status)
|
||||
}
|
||||
@@ -336,9 +344,11 @@ func TestSynthesizeGeminiVirtualAuths_MultiProject(t *testing.T) {
|
||||
},
|
||||
}
|
||||
metadata := map[string]any{
|
||||
"project_id": "project-a, project-b, project-c",
|
||||
"email": "test@example.com",
|
||||
"type": "gemini",
|
||||
"project_id": "project-a, project-b, project-c",
|
||||
"email": "test@example.com",
|
||||
"type": "gemini",
|
||||
"request_retry": 2,
|
||||
"disable_cooling": true,
|
||||
}
|
||||
|
||||
virtuals := SynthesizeGeminiVirtualAuths(primary, metadata, now)
|
||||
@@ -376,6 +386,12 @@ func TestSynthesizeGeminiVirtualAuths_MultiProject(t *testing.T) {
|
||||
if v.ProxyURL != "http://proxy.local" {
|
||||
t.Errorf("expected proxy_url http://proxy.local, got %s", v.ProxyURL)
|
||||
}
|
||||
if vv, ok := v.Metadata["disable_cooling"].(bool); !ok || !vv {
|
||||
t.Errorf("expected disable_cooling true, got %v", v.Metadata["disable_cooling"])
|
||||
}
|
||||
if vv, ok := v.Metadata["request_retry"].(int); !ok || vv != 2 {
|
||||
t.Errorf("expected request_retry 2, got %v", v.Metadata["request_retry"])
|
||||
}
|
||||
if v.Attributes["runtime_only"] != "true" {
|
||||
t.Error("expected runtime_only=true")
|
||||
}
|
||||
|
||||
@@ -73,9 +73,7 @@ func (s *FileTokenStore) Save(ctx context.Context, auth *cliproxyauth.Auth) (str
|
||||
return "", fmt.Errorf("auth filestore: marshal metadata failed: %w", errMarshal)
|
||||
}
|
||||
if existing, errRead := os.ReadFile(path); errRead == nil {
|
||||
// Use metadataEqualIgnoringTimestamps to skip writes when only timestamp fields change.
|
||||
// This prevents the token refresh loop caused by timestamp/expired/expires_in changes.
|
||||
if metadataEqualIgnoringTimestamps(existing, raw, auth.Provider) {
|
||||
if jsonEqual(existing, raw) {
|
||||
return path, nil
|
||||
}
|
||||
file, errOpen := os.OpenFile(path, os.O_WRONLY|os.O_TRUNC, 0o600)
|
||||
@@ -308,8 +306,7 @@ func (s *FileTokenStore) baseDirSnapshot() string {
|
||||
return s.baseDir
|
||||
}
|
||||
|
||||
// DEPRECATED: Use metadataEqualIgnoringTimestamps for comparing auth metadata.
|
||||
// This function is kept for backward compatibility but can cause refresh loops.
|
||||
// jsonEqual compares two JSON blobs by parsing them into Go objects and deep comparing.
|
||||
func jsonEqual(a, b []byte) bool {
|
||||
var objA any
|
||||
var objB any
|
||||
@@ -322,41 +319,6 @@ func jsonEqual(a, b []byte) bool {
|
||||
return deepEqualJSON(objA, objB)
|
||||
}
|
||||
|
||||
// metadataEqualIgnoringTimestamps compares two metadata JSON blobs,
|
||||
// ignoring fields that change on every refresh but don't affect functionality.
|
||||
// This prevents unnecessary file writes that would trigger watcher events and
|
||||
// create refresh loops.
|
||||
// The provider parameter controls whether access_token is ignored: providers like
|
||||
// Google OAuth (gemini, gemini-cli) can re-fetch tokens when needed, while others
|
||||
// like iFlow require the refreshed token to be persisted.
|
||||
func metadataEqualIgnoringTimestamps(a, b []byte, provider string) bool {
|
||||
var objA, objB map[string]any
|
||||
if err := json.Unmarshal(a, &objA); err != nil {
|
||||
return false
|
||||
}
|
||||
if err := json.Unmarshal(b, &objB); err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// Fields to ignore: these change on every refresh but don't affect authentication logic.
|
||||
// - timestamp, expired, expires_in, last_refresh: time-related fields that change on refresh
|
||||
ignoredFields := []string{"timestamp", "expired", "expires_in", "last_refresh"}
|
||||
|
||||
// For providers that can re-fetch tokens when needed (e.g., Google OAuth),
|
||||
// we ignore access_token to avoid unnecessary file writes.
|
||||
switch provider {
|
||||
case "gemini", "gemini-cli", "antigravity":
|
||||
ignoredFields = append(ignoredFields, "access_token")
|
||||
}
|
||||
|
||||
for _, field := range ignoredFields {
|
||||
delete(objA, field)
|
||||
delete(objB, field)
|
||||
}
|
||||
|
||||
return deepEqualJSON(objA, objB)
|
||||
}
|
||||
|
||||
func deepEqualJSON(a, b any) bool {
|
||||
switch valA := a.(type) {
|
||||
case map[string]any:
|
||||
|
||||
@@ -61,6 +61,15 @@ func SetQuotaCooldownDisabled(disable bool) {
|
||||
quotaCooldownDisabled.Store(disable)
|
||||
}
|
||||
|
||||
func quotaCooldownDisabledForAuth(auth *Auth) bool {
|
||||
if auth != nil {
|
||||
if override, ok := auth.DisableCoolingOverride(); ok {
|
||||
return override
|
||||
}
|
||||
}
|
||||
return quotaCooldownDisabled.Load()
|
||||
}
|
||||
|
||||
// Result captures execution outcome used to adjust auth state.
|
||||
type Result struct {
|
||||
// AuthID references the auth that produced this result.
|
||||
@@ -468,20 +477,16 @@ func (m *Manager) Execute(ctx context.Context, providers []string, req cliproxye
|
||||
return cliproxyexecutor.Response{}, &Error{Code: "provider_not_found", Message: "no provider supplied"}
|
||||
}
|
||||
|
||||
retryTimes, maxWait := m.retrySettings()
|
||||
attempts := retryTimes + 1
|
||||
if attempts < 1 {
|
||||
attempts = 1
|
||||
}
|
||||
_, maxWait := m.retrySettings()
|
||||
|
||||
var lastErr error
|
||||
for attempt := 0; attempt < attempts; attempt++ {
|
||||
for attempt := 0; ; attempt++ {
|
||||
resp, errExec := m.executeMixedOnce(ctx, normalized, req, opts)
|
||||
if errExec == nil {
|
||||
return resp, nil
|
||||
}
|
||||
lastErr = errExec
|
||||
wait, shouldRetry := m.shouldRetryAfterError(errExec, attempt, attempts, normalized, req.Model, maxWait)
|
||||
wait, shouldRetry := m.shouldRetryAfterError(errExec, attempt, normalized, req.Model, maxWait)
|
||||
if !shouldRetry {
|
||||
break
|
||||
}
|
||||
@@ -503,20 +508,16 @@ func (m *Manager) ExecuteCount(ctx context.Context, providers []string, req clip
|
||||
return cliproxyexecutor.Response{}, &Error{Code: "provider_not_found", Message: "no provider supplied"}
|
||||
}
|
||||
|
||||
retryTimes, maxWait := m.retrySettings()
|
||||
attempts := retryTimes + 1
|
||||
if attempts < 1 {
|
||||
attempts = 1
|
||||
}
|
||||
_, maxWait := m.retrySettings()
|
||||
|
||||
var lastErr error
|
||||
for attempt := 0; attempt < attempts; attempt++ {
|
||||
for attempt := 0; ; attempt++ {
|
||||
resp, errExec := m.executeCountMixedOnce(ctx, normalized, req, opts)
|
||||
if errExec == nil {
|
||||
return resp, nil
|
||||
}
|
||||
lastErr = errExec
|
||||
wait, shouldRetry := m.shouldRetryAfterError(errExec, attempt, attempts, normalized, req.Model, maxWait)
|
||||
wait, shouldRetry := m.shouldRetryAfterError(errExec, attempt, normalized, req.Model, maxWait)
|
||||
if !shouldRetry {
|
||||
break
|
||||
}
|
||||
@@ -538,20 +539,16 @@ func (m *Manager) ExecuteStream(ctx context.Context, providers []string, req cli
|
||||
return nil, &Error{Code: "provider_not_found", Message: "no provider supplied"}
|
||||
}
|
||||
|
||||
retryTimes, maxWait := m.retrySettings()
|
||||
attempts := retryTimes + 1
|
||||
if attempts < 1 {
|
||||
attempts = 1
|
||||
}
|
||||
_, maxWait := m.retrySettings()
|
||||
|
||||
var lastErr error
|
||||
for attempt := 0; attempt < attempts; attempt++ {
|
||||
for attempt := 0; ; attempt++ {
|
||||
chunks, errStream := m.executeStreamMixedOnce(ctx, normalized, req, opts)
|
||||
if errStream == nil {
|
||||
return chunks, nil
|
||||
}
|
||||
lastErr = errStream
|
||||
wait, shouldRetry := m.shouldRetryAfterError(errStream, attempt, attempts, normalized, req.Model, maxWait)
|
||||
wait, shouldRetry := m.shouldRetryAfterError(errStream, attempt, normalized, req.Model, maxWait)
|
||||
if !shouldRetry {
|
||||
break
|
||||
}
|
||||
@@ -1034,11 +1031,15 @@ func (m *Manager) retrySettings() (int, time.Duration) {
|
||||
return int(m.requestRetry.Load()), time.Duration(m.maxRetryInterval.Load())
|
||||
}
|
||||
|
||||
func (m *Manager) closestCooldownWait(providers []string, model string) (time.Duration, bool) {
|
||||
func (m *Manager) closestCooldownWait(providers []string, model string, attempt int) (time.Duration, bool) {
|
||||
if m == nil || len(providers) == 0 {
|
||||
return 0, false
|
||||
}
|
||||
now := time.Now()
|
||||
defaultRetry := int(m.requestRetry.Load())
|
||||
if defaultRetry < 0 {
|
||||
defaultRetry = 0
|
||||
}
|
||||
providerSet := make(map[string]struct{}, len(providers))
|
||||
for i := range providers {
|
||||
key := strings.TrimSpace(strings.ToLower(providers[i]))
|
||||
@@ -1061,6 +1062,16 @@ func (m *Manager) closestCooldownWait(providers []string, model string) (time.Du
|
||||
if _, ok := providerSet[providerKey]; !ok {
|
||||
continue
|
||||
}
|
||||
effectiveRetry := defaultRetry
|
||||
if override, ok := auth.RequestRetryOverride(); ok {
|
||||
effectiveRetry = override
|
||||
}
|
||||
if effectiveRetry < 0 {
|
||||
effectiveRetry = 0
|
||||
}
|
||||
if attempt >= effectiveRetry {
|
||||
continue
|
||||
}
|
||||
blocked, reason, next := isAuthBlockedForModel(auth, model, now)
|
||||
if !blocked || next.IsZero() || reason == blockReasonDisabled {
|
||||
continue
|
||||
@@ -1077,8 +1088,8 @@ func (m *Manager) closestCooldownWait(providers []string, model string) (time.Du
|
||||
return minWait, found
|
||||
}
|
||||
|
||||
func (m *Manager) shouldRetryAfterError(err error, attempt, maxAttempts int, providers []string, model string, maxWait time.Duration) (time.Duration, bool) {
|
||||
if err == nil || attempt >= maxAttempts-1 {
|
||||
func (m *Manager) shouldRetryAfterError(err error, attempt int, providers []string, model string, maxWait time.Duration) (time.Duration, bool) {
|
||||
if err == nil {
|
||||
return 0, false
|
||||
}
|
||||
if maxWait <= 0 {
|
||||
@@ -1087,7 +1098,7 @@ func (m *Manager) shouldRetryAfterError(err error, attempt, maxAttempts int, pro
|
||||
if status := statusCodeFromError(err); status == http.StatusOK {
|
||||
return 0, false
|
||||
}
|
||||
wait, found := m.closestCooldownWait(providers, model)
|
||||
wait, found := m.closestCooldownWait(providers, model, attempt)
|
||||
if !found || wait > maxWait {
|
||||
return 0, false
|
||||
}
|
||||
@@ -1176,7 +1187,7 @@ func (m *Manager) MarkResult(ctx context.Context, result Result) {
|
||||
if result.RetryAfter != nil {
|
||||
next = now.Add(*result.RetryAfter)
|
||||
} else {
|
||||
cooldown, nextLevel := nextQuotaCooldown(backoffLevel)
|
||||
cooldown, nextLevel := nextQuotaCooldown(backoffLevel, quotaCooldownDisabledForAuth(auth))
|
||||
if cooldown > 0 {
|
||||
next = now.Add(cooldown)
|
||||
}
|
||||
@@ -1193,7 +1204,7 @@ func (m *Manager) MarkResult(ctx context.Context, result Result) {
|
||||
shouldSuspendModel = true
|
||||
setModelQuota = true
|
||||
case 408, 500, 502, 503, 504:
|
||||
if quotaCooldownDisabled.Load() {
|
||||
if quotaCooldownDisabledForAuth(auth) {
|
||||
state.NextRetryAfter = time.Time{}
|
||||
} else {
|
||||
next := now.Add(1 * time.Minute)
|
||||
@@ -1439,7 +1450,7 @@ func applyAuthFailureState(auth *Auth, resultErr *Error, retryAfter *time.Durati
|
||||
if retryAfter != nil {
|
||||
next = now.Add(*retryAfter)
|
||||
} else {
|
||||
cooldown, nextLevel := nextQuotaCooldown(auth.Quota.BackoffLevel)
|
||||
cooldown, nextLevel := nextQuotaCooldown(auth.Quota.BackoffLevel, quotaCooldownDisabledForAuth(auth))
|
||||
if cooldown > 0 {
|
||||
next = now.Add(cooldown)
|
||||
}
|
||||
@@ -1449,7 +1460,7 @@ func applyAuthFailureState(auth *Auth, resultErr *Error, retryAfter *time.Durati
|
||||
auth.NextRetryAfter = next
|
||||
case 408, 500, 502, 503, 504:
|
||||
auth.StatusMessage = "transient upstream error"
|
||||
if quotaCooldownDisabled.Load() {
|
||||
if quotaCooldownDisabledForAuth(auth) {
|
||||
auth.NextRetryAfter = time.Time{}
|
||||
} else {
|
||||
auth.NextRetryAfter = now.Add(1 * time.Minute)
|
||||
@@ -1462,11 +1473,11 @@ func applyAuthFailureState(auth *Auth, resultErr *Error, retryAfter *time.Durati
|
||||
}
|
||||
|
||||
// nextQuotaCooldown returns the next cooldown duration and updated backoff level for repeated quota errors.
|
||||
func nextQuotaCooldown(prevLevel int) (time.Duration, int) {
|
||||
func nextQuotaCooldown(prevLevel int, disableCooling bool) (time.Duration, int) {
|
||||
if prevLevel < 0 {
|
||||
prevLevel = 0
|
||||
}
|
||||
if quotaCooldownDisabled.Load() {
|
||||
if disableCooling {
|
||||
return 0, prevLevel
|
||||
}
|
||||
cooldown := quotaBackoffBase * time.Duration(1<<prevLevel)
|
||||
@@ -1642,6 +1653,9 @@ func (m *Manager) persist(ctx context.Context, auth *Auth) error {
|
||||
if m.store == nil || auth == nil {
|
||||
return nil
|
||||
}
|
||||
if shouldSkipPersist(ctx) {
|
||||
return nil
|
||||
}
|
||||
if auth.Attributes != nil {
|
||||
if v := strings.ToLower(strings.TrimSpace(auth.Attributes["runtime_only"])); v == "true" {
|
||||
return nil
|
||||
|
||||
97
sdk/cliproxy/auth/conductor_overrides_test.go
Normal file
97
sdk/cliproxy/auth/conductor_overrides_test.go
Normal file
@@ -0,0 +1,97 @@
|
||||
package auth
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestManager_ShouldRetryAfterError_RespectsAuthRequestRetryOverride(t *testing.T) {
|
||||
m := NewManager(nil, nil, nil)
|
||||
m.SetRetryConfig(3, 30*time.Second)
|
||||
|
||||
model := "test-model"
|
||||
next := time.Now().Add(5 * time.Second)
|
||||
|
||||
auth := &Auth{
|
||||
ID: "auth-1",
|
||||
Provider: "claude",
|
||||
Metadata: map[string]any{
|
||||
"request_retry": float64(0),
|
||||
},
|
||||
ModelStates: map[string]*ModelState{
|
||||
model: {
|
||||
Unavailable: true,
|
||||
Status: StatusError,
|
||||
NextRetryAfter: next,
|
||||
},
|
||||
},
|
||||
}
|
||||
if _, errRegister := m.Register(context.Background(), auth); errRegister != nil {
|
||||
t.Fatalf("register auth: %v", errRegister)
|
||||
}
|
||||
|
||||
_, maxWait := m.retrySettings()
|
||||
wait, shouldRetry := m.shouldRetryAfterError(&Error{HTTPStatus: 500, Message: "boom"}, 0, []string{"claude"}, model, maxWait)
|
||||
if shouldRetry {
|
||||
t.Fatalf("expected shouldRetry=false for request_retry=0, got true (wait=%v)", wait)
|
||||
}
|
||||
|
||||
auth.Metadata["request_retry"] = float64(1)
|
||||
if _, errUpdate := m.Update(context.Background(), auth); errUpdate != nil {
|
||||
t.Fatalf("update auth: %v", errUpdate)
|
||||
}
|
||||
|
||||
wait, shouldRetry = m.shouldRetryAfterError(&Error{HTTPStatus: 500, Message: "boom"}, 0, []string{"claude"}, model, maxWait)
|
||||
if !shouldRetry {
|
||||
t.Fatalf("expected shouldRetry=true for request_retry=1, got false")
|
||||
}
|
||||
if wait <= 0 {
|
||||
t.Fatalf("expected wait > 0, got %v", wait)
|
||||
}
|
||||
|
||||
_, shouldRetry = m.shouldRetryAfterError(&Error{HTTPStatus: 500, Message: "boom"}, 1, []string{"claude"}, model, maxWait)
|
||||
if shouldRetry {
|
||||
t.Fatalf("expected shouldRetry=false on attempt=1 for request_retry=1, got true")
|
||||
}
|
||||
}
|
||||
|
||||
func TestManager_MarkResult_RespectsAuthDisableCoolingOverride(t *testing.T) {
|
||||
prev := quotaCooldownDisabled.Load()
|
||||
quotaCooldownDisabled.Store(false)
|
||||
t.Cleanup(func() { quotaCooldownDisabled.Store(prev) })
|
||||
|
||||
m := NewManager(nil, nil, nil)
|
||||
|
||||
auth := &Auth{
|
||||
ID: "auth-1",
|
||||
Provider: "claude",
|
||||
Metadata: map[string]any{
|
||||
"disable_cooling": true,
|
||||
},
|
||||
}
|
||||
if _, errRegister := m.Register(context.Background(), auth); errRegister != nil {
|
||||
t.Fatalf("register auth: %v", errRegister)
|
||||
}
|
||||
|
||||
model := "test-model"
|
||||
m.MarkResult(context.Background(), Result{
|
||||
AuthID: "auth-1",
|
||||
Provider: "claude",
|
||||
Model: model,
|
||||
Success: false,
|
||||
Error: &Error{HTTPStatus: 500, Message: "boom"},
|
||||
})
|
||||
|
||||
updated, ok := m.GetByID("auth-1")
|
||||
if !ok || updated == nil {
|
||||
t.Fatalf("expected auth to be present")
|
||||
}
|
||||
state := updated.ModelStates[model]
|
||||
if state == nil {
|
||||
t.Fatalf("expected model state to be present")
|
||||
}
|
||||
if !state.NextRetryAfter.IsZero() {
|
||||
t.Fatalf("expected NextRetryAfter to be zero when disable_cooling=true, got %v", state.NextRetryAfter)
|
||||
}
|
||||
}
|
||||
24
sdk/cliproxy/auth/persist_policy.go
Normal file
24
sdk/cliproxy/auth/persist_policy.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package auth
|
||||
|
||||
import "context"
|
||||
|
||||
// skipPersistContextKey is the unexported key type under which the skip-persist
// flag is stored in a context.
type skipPersistContextKey struct{}

// WithSkipPersist returns a context derived from ctx that carries the skip-persist flag,
// which disables persistence for Manager Update/Register calls made with it.
// A nil ctx is treated as context.Background().
//
// It is meant for code reacting to file watcher events: the file on disk is already the
// source of truth there, and persisting again would create a write-back loop.
func WithSkipPersist(ctx context.Context) context.Context {
	parent := ctx
	if parent == nil {
		parent = context.Background()
	}
	return context.WithValue(parent, skipPersistContextKey{}, true)
}

// shouldSkipPersist reports whether ctx carries a true skip-persist flag.
// A nil ctx, a missing value, or a non-bool value all yield false.
func shouldSkipPersist(ctx context.Context) bool {
	if ctx == nil {
		return false
	}
	// A failed type assertion leaves skip at its zero value, false.
	skip, _ := ctx.Value(skipPersistContextKey{}).(bool)
	return skip
}
|
||||
62
sdk/cliproxy/auth/persist_policy_test.go
Normal file
62
sdk/cliproxy/auth/persist_policy_test.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package auth
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type countingStore struct {
|
||||
saveCount atomic.Int32
|
||||
}
|
||||
|
||||
func (s *countingStore) List(context.Context) ([]*Auth, error) { return nil, nil }
|
||||
|
||||
func (s *countingStore) Save(context.Context, *Auth) (string, error) {
|
||||
s.saveCount.Add(1)
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func (s *countingStore) Delete(context.Context, string) error { return nil }
|
||||
|
||||
func TestWithSkipPersist_DisablesUpdatePersistence(t *testing.T) {
|
||||
store := &countingStore{}
|
||||
mgr := NewManager(store, nil, nil)
|
||||
auth := &Auth{
|
||||
ID: "auth-1",
|
||||
Provider: "antigravity",
|
||||
Metadata: map[string]any{"type": "antigravity"},
|
||||
}
|
||||
|
||||
if _, err := mgr.Update(context.Background(), auth); err != nil {
|
||||
t.Fatalf("Update returned error: %v", err)
|
||||
}
|
||||
if got := store.saveCount.Load(); got != 1 {
|
||||
t.Fatalf("expected 1 Save call, got %d", got)
|
||||
}
|
||||
|
||||
ctxSkip := WithSkipPersist(context.Background())
|
||||
if _, err := mgr.Update(ctxSkip, auth); err != nil {
|
||||
t.Fatalf("Update(skipPersist) returned error: %v", err)
|
||||
}
|
||||
if got := store.saveCount.Load(); got != 1 {
|
||||
t.Fatalf("expected Save call count to remain 1, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWithSkipPersist_DisablesRegisterPersistence(t *testing.T) {
|
||||
store := &countingStore{}
|
||||
mgr := NewManager(store, nil, nil)
|
||||
auth := &Auth{
|
||||
ID: "auth-1",
|
||||
Provider: "antigravity",
|
||||
Metadata: map[string]any{"type": "antigravity"},
|
||||
}
|
||||
|
||||
if _, err := mgr.Register(WithSkipPersist(context.Background()), auth); err != nil {
|
||||
t.Fatalf("Register(skipPersist) returned error: %v", err)
|
||||
}
|
||||
if got := store.saveCount.Load(); got != 0 {
|
||||
t.Fatalf("expected 0 Save calls, got %d", got)
|
||||
}
|
||||
}
|
||||
@@ -194,6 +194,108 @@ func (a *Auth) ProxyInfo() string {
|
||||
return "via proxy"
|
||||
}
|
||||
|
||||
// DisableCoolingOverride returns the auth-file scoped disable_cooling override when present.
|
||||
// The value is read from metadata key "disable_cooling" (or legacy "disable-cooling").
|
||||
func (a *Auth) DisableCoolingOverride() (bool, bool) {
|
||||
if a == nil || a.Metadata == nil {
|
||||
return false, false
|
||||
}
|
||||
if val, ok := a.Metadata["disable_cooling"]; ok {
|
||||
if parsed, okParse := parseBoolAny(val); okParse {
|
||||
return parsed, true
|
||||
}
|
||||
}
|
||||
if val, ok := a.Metadata["disable-cooling"]; ok {
|
||||
if parsed, okParse := parseBoolAny(val); okParse {
|
||||
return parsed, true
|
||||
}
|
||||
}
|
||||
return false, false
|
||||
}
|
||||
|
||||
// RequestRetryOverride returns the auth-file scoped request_retry override when present.
|
||||
// The value is read from metadata key "request_retry" (or legacy "request-retry").
|
||||
func (a *Auth) RequestRetryOverride() (int, bool) {
|
||||
if a == nil || a.Metadata == nil {
|
||||
return 0, false
|
||||
}
|
||||
if val, ok := a.Metadata["request_retry"]; ok {
|
||||
if parsed, okParse := parseIntAny(val); okParse {
|
||||
if parsed < 0 {
|
||||
parsed = 0
|
||||
}
|
||||
return parsed, true
|
||||
}
|
||||
}
|
||||
if val, ok := a.Metadata["request-retry"]; ok {
|
||||
if parsed, okParse := parseIntAny(val); okParse {
|
||||
if parsed < 0 {
|
||||
parsed = 0
|
||||
}
|
||||
return parsed, true
|
||||
}
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// parseBoolAny converts a loosely typed value into a bool.
//
// Accepted inputs:
//   - bool: returned as is;
//   - float64: true when non-zero;
//   - json.Number: true when its Int64 value is non-zero (fails if Int64 fails);
//   - string: trimmed, then parsed with strconv.ParseBool; blank strings fail.
//
// The second result reports whether the value could be interpreted; when it is false
// the first result is always false.
func parseBoolAny(val any) (bool, bool) {
	switch v := val.(type) {
	case bool:
		return v, true
	case float64:
		return v != 0, true
	case json.Number:
		if n, err := v.Int64(); err == nil {
			return n != 0, true
		}
	case string:
		if text := strings.TrimSpace(v); text != "" {
			if flag, err := strconv.ParseBool(text); err == nil {
				return flag, true
			}
		}
	}
	// Unsupported type or unparsable value.
	return false, false
}
|
||||
|
||||
// parseIntAny converts a loosely typed value into an int.
//
// Accepted inputs:
//   - int, int32, int64: converted directly;
//   - float64: truncated toward zero;
//   - json.Number: converted through its Int64 value (fails if Int64 fails);
//   - string: trimmed, then parsed with strconv.Atoi; blank strings fail.
//
// The second result reports whether the value could be interpreted. Values that an int
// cannot represent (NaN, ±Inf, floats beyond the int64 range, or 64-bit values that
// overflow a 32-bit int) are rejected with (0, false) instead of being converted,
// because Go leaves the result of such conversions implementation-dependent or
// silently truncates them.
func parseIntAny(val any) (int, bool) {
	switch typed := val.(type) {
	case int:
		return typed, true
	case int32:
		return int(typed), true
	case int64:
		return int64ToInt(typed)
	case float64:
		// 2^63: the first float64 magnitude that no longer fits in an int64.
		const int64Limit = 9223372036854775808.0
		// typed != typed is true only for NaN; ±Inf fails the range comparisons.
		if typed != typed || typed >= int64Limit || typed < -int64Limit {
			return 0, false
		}
		return int64ToInt(int64(typed))
	case json.Number:
		parsed, err := typed.Int64()
		if err != nil {
			return 0, false
		}
		return int64ToInt(parsed)
	case string:
		trimmed := strings.TrimSpace(typed)
		if trimmed == "" {
			return 0, false
		}
		parsed, err := strconv.Atoi(trimmed)
		if err != nil {
			return 0, false
		}
		return parsed, true
	default:
		return 0, false
	}
}

// int64ToInt narrows n to int, reporting false when the value does not survive the
// round trip (only possible where int is narrower than 64 bits).
func int64ToInt(n int64) (int, bool) {
	narrowed := int(n)
	if int64(narrowed) != n {
		return 0, false
	}
	return narrowed, true
}
|
||||
|
||||
func (a *Auth) AccountInfo() (string, string) {
|
||||
if a == nil {
|
||||
return "", ""
|
||||
|
||||
@@ -135,6 +135,7 @@ func (s *Service) ensureAuthUpdateQueue(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (s *Service) consumeAuthUpdates(ctx context.Context) {
|
||||
ctx = coreauth.WithSkipPersist(ctx)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
||||
Reference in New Issue
Block a user