feat(gitlab): add duo streaming transport

This commit is contained in:
LuxVTZ
2026-03-10 18:39:25 +04:00
parent 54c3eb1b1e
commit c631df8c3b
3 changed files with 843 additions and 27 deletions

View File

@@ -1,6 +1,7 @@
package executor
import (
"bufio"
"bytes"
"context"
"encoding/json"
@@ -14,6 +15,7 @@ import (
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
"github.com/router-for-me/CLIProxyAPI/v6/internal/registry"
"github.com/router-for-me/CLIProxyAPI/v6/internal/thinking"
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor"
sdktranslator "github.com/router-for-me/CLIProxyAPI/v6/sdk/translator"
@@ -26,6 +28,7 @@ const (
gitLabAuthMethodPAT = "pat"
gitLabChatEndpoint = "/api/v4/chat/completions"
gitLabCodeSuggestionsEndpoint = "/api/v4/code_suggestions/completions"
gitLabSSEStreamingHeader = "X-Supports-Sse-Streaming"
)
type GitLabExecutor struct {
@@ -40,6 +43,15 @@ type gitLabPrompt struct {
CodeSuggestionContext []map[string]any
}
// gitLabOpenAIStreamState tracks per-stream bookkeeping while converting
// GitLab Duo SSE events into OpenAI chat.completion.chunk payloads.
type gitLabOpenAIStreamState struct {
	ID           string // synthetic response id ("gitlab-<nanos>"), set on first initialization
	Model        string // model name taken from stream metadata, else the fallback model
	Created      int64  // unix seconds, fixed at first initialization
	LastFullText string // last text seen; used to derive incremental deltas from cumulative snapshots
	Started      bool   // whether the initial assistant-role chunk has been emitted
	Finished     bool   // whether the terminal chunk and [DONE] marker have been emitted
}
// NewGitLabExecutor constructs a GitLab Duo executor bound to the given configuration.
func NewGitLabExecutor(cfg *config.Config) *GitLabExecutor {
	executor := &GitLabExecutor{cfg: cfg}
	return executor
}
@@ -62,7 +74,7 @@ func (e *GitLabExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
return resp, err
}
text, err := e.invoke(ctx, auth, prompt)
text, err := e.invokeText(ctx, auth, prompt)
if err != nil {
return resp, err
}
@@ -101,11 +113,16 @@ func (e *GitLabExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
return nil, statusErr{code: http.StatusBadRequest, msg: "gitlab duo executor: request has no usable text content"}
}
text, err := e.invoke(ctx, auth, prompt)
if result, streamErr := e.requestCodeSuggestionsStream(ctx, auth, prompt, translated, req, opts, reporter); streamErr == nil {
return result, nil
} else if !shouldFallbackToCodeSuggestions(streamErr) {
return nil, streamErr
}
text, err := e.invokeText(ctx, auth, prompt)
if err != nil {
return nil, err
}
responseModel := gitLabResolvedModel(auth, req.Model)
openAIResponse := buildGitLabOpenAIResponse(responseModel, text, translated)
reporter.publish(ctx, parseOpenAIUsage(openAIResponse))
@@ -215,7 +232,7 @@ func (e *GitLabExecutor) translateToOpenAI(req cliproxyexecutor.Request, opts cl
return sdktranslator.TranslateRequest(opts.SourceFormat, sdktranslator.FromString("openai"), baseModel, req.Payload, opts.Stream), nil
}
func (e *GitLabExecutor) invoke(ctx context.Context, auth *cliproxyauth.Auth, prompt gitLabPrompt) (string, error) {
func (e *GitLabExecutor) invokeText(ctx context.Context, auth *cliproxyauth.Auth, prompt gitLabPrompt) (string, error) {
if text, err := e.requestChat(ctx, auth, prompt); err == nil {
return text, nil
} else if !shouldFallbackToCodeSuggestions(err) {
@@ -257,27 +274,189 @@ func (e *GitLabExecutor) requestCodeSuggestions(ctx context.Context, auth *clipr
return e.doJSONTextRequest(ctx, auth, gitLabCodeSuggestionsEndpoint, body)
}
// requestCodeSuggestionsStream issues a streaming (SSE) request against the
// code-suggestions endpoint and returns a StreamResult whose chunks are the
// upstream events translated into the caller's source format. Upstream
// GitLab events are first normalized into OpenAI chat.completion.chunk
// payloads (see normalizeGitLabStreamChunk) before translation.
func (e *GitLabExecutor) requestCodeSuggestionsStream(
	ctx context.Context,
	auth *cliproxyauth.Auth,
	prompt gitLabPrompt,
	translated []byte,
	req cliproxyexecutor.Request,
	opts cliproxyexecutor.Options,
	reporter *usageReporter,
) (*cliproxyexecutor.StreamResult, error) {
	// The code-suggestions API needs content above the cursor; fall back to
	// the instruction text when the prompt carries none.
	contentAbove := strings.TrimSpace(prompt.ContentAboveCursor)
	if contentAbove == "" {
		contentAbove = prompt.Instruction
	}
	body := map[string]any{
		"current_file": map[string]any{
			"file_name":            prompt.FileName,
			"content_above_cursor": contentAbove,
			"content_below_cursor": "",
		},
		"intent":           "generation",
		"generation_type":  "small_file",
		"user_instruction": prompt.Instruction,
		"stream":           true,
	}
	if len(prompt.CodeSuggestionContext) > 0 {
		body["context"] = prompt.CodeSuggestionContext
	}
	httpResp, bodyRaw, err := e.doJSONRequest(ctx, auth, gitLabCodeSuggestionsEndpoint, body, "text/event-stream")
	if err != nil {
		return nil, err
	}
	// Non-2xx: consume the body for diagnostics and surface a statusErr so
	// the caller can decide whether to fall back to the chat path.
	if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
		defer func() { _ = httpResp.Body.Close() }()
		respBody, readErr := io.ReadAll(httpResp.Body)
		if readErr != nil {
			recordAPIResponseError(ctx, e.cfg, readErr)
			return nil, readErr
		}
		appendAPIResponseChunk(ctx, e.cfg, respBody)
		return nil, statusErr{code: httpResp.StatusCode, msg: strings.TrimSpace(string(respBody))}
	}
	responseModel := gitLabResolvedModel(auth, req.Model)
	out := make(chan cliproxyexecutor.StreamChunk, 16)
	go func() {
		defer close(out)
		defer func() { _ = httpResp.Body.Close() }()
		scanner := bufio.NewScanner(httpResp.Body)
		// Allow very large SSE lines (up to 50 MiB).
		scanner.Buffer(nil, 52_428_800)
		var (
			param     any    // translator state threaded across TranslateStream calls
			eventName string // most recent "event:" field, consumed by the next "data:" line
			state     gitLabOpenAIStreamState
		)
		for scanner.Scan() {
			line := bytes.Clone(scanner.Bytes())
			appendAPIResponseChunk(ctx, e.cfg, line)
			trimmed := bytes.TrimSpace(line)
			if len(trimmed) == 0 {
				continue
			}
			// Remember SSE event names; they qualify the following data payload.
			if bytes.HasPrefix(trimmed, []byte("event:")) {
				eventName = strings.TrimSpace(string(trimmed[len("event:"):]))
				continue
			}
			if !bytes.HasPrefix(trimmed, []byte("data:")) {
				continue
			}
			payload := bytes.TrimSpace(trimmed[len("data:"):])
			normalized := normalizeGitLabStreamChunk(eventName, payload, responseModel, &state)
			// The event name applies only to the data line that follows it.
			eventName = ""
			for _, item := range normalized {
				if detail, ok := parseOpenAIStreamUsage(item); ok {
					reporter.publish(ctx, detail)
				}
				chunks := sdktranslator.TranslateStream(
					ctx,
					sdktranslator.FromString("openai"),
					opts.SourceFormat,
					req.Model,
					opts.OriginalRequest,
					translated,
					item,
					&param,
				)
				for i := range chunks {
					out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
				}
			}
		}
		if errScan := scanner.Err(); errScan != nil {
			recordAPIResponseError(ctx, e.cfg, errScan)
			reporter.publishFailure(ctx)
			out <- cliproxyexecutor.StreamChunk{Err: errScan}
			return
		}
		// If upstream never sent an explicit terminator, synthesize the
		// closing chunk and [DONE] so clients see a complete stream.
		if !state.Finished {
			for _, item := range finalizeGitLabStream(responseModel, &state) {
				chunks := sdktranslator.TranslateStream(
					ctx,
					sdktranslator.FromString("openai"),
					opts.SourceFormat,
					req.Model,
					opts.OriginalRequest,
					translated,
					item,
					&param,
				)
				for i := range chunks {
					out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
				}
			}
		}
		reporter.ensurePublished(ctx)
	}()
	return &cliproxyexecutor.StreamResult{
		Headers: cloneGitLabStreamHeaders(httpResp.Header, bodyRaw),
		Chunks:  out,
	}, nil
}
// doJSONTextRequest posts payload as JSON to endpoint and returns the trimmed
// text extracted from a successful (2xx) response body.
func (e *GitLabExecutor) doJSONTextRequest(ctx context.Context, auth *cliproxyauth.Auth, endpoint string, payload map[string]any) (string, error) {
	resp, _, err := e.doJSONRequest(ctx, auth, endpoint, payload, "application/json")
	if err != nil {
		return "", err
	}
	defer func() { _ = resp.Body.Close() }()
	raw, readErr := io.ReadAll(resp.Body)
	if readErr != nil {
		recordAPIResponseError(ctx, e.cfg, readErr)
		return "", readErr
	}
	appendAPIResponseChunk(ctx, e.cfg, raw)
	if code := resp.StatusCode; code < 200 || code >= 300 {
		return "", statusErr{code: code, msg: strings.TrimSpace(string(raw))}
	}
	parsed, parseErr := parseGitLabTextResponse(endpoint, raw)
	if parseErr != nil {
		return "", parseErr
	}
	return strings.TrimSpace(parsed), nil
}
func (e *GitLabExecutor) doJSONRequest(
ctx context.Context,
auth *cliproxyauth.Auth,
endpoint string,
payload map[string]any,
accept string,
) (*http.Response, []byte, error) {
token := gitLabPrimaryToken(auth)
baseURL := gitLabBaseURL(auth)
if token == "" || baseURL == "" {
return "", statusErr{code: http.StatusUnauthorized, msg: "gitlab duo executor: missing credentials"}
return nil, nil, statusErr{code: http.StatusUnauthorized, msg: "gitlab duo executor: missing credentials"}
}
body, err := json.Marshal(payload)
if err != nil {
return "", fmt.Errorf("gitlab duo executor: marshal request failed: %w", err)
return nil, nil, fmt.Errorf("gitlab duo executor: marshal request failed: %w", err)
}
url := strings.TrimRight(baseURL, "/") + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return "", err
return nil, nil, err
}
req.Header.Set("Authorization", "Bearer "+token)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
req.Header.Set("Accept", accept)
req.Header.Set("User-Agent", "CLIProxyAPI/GitLab-Duo")
applyGitLabRequestHeaders(req, auth)
if strings.EqualFold(accept, "text/event-stream") {
req.Header.Set("Cache-Control", "no-cache")
req.Header.Set(gitLabSSEStreamingHeader, "true")
req.Header.Set("Accept-Encoding", "identity")
}
var authID, authLabel, authType, authValue string
if auth != nil {
@@ -301,27 +480,10 @@ func (e *GitLabExecutor) doJSONTextRequest(ctx context.Context, auth *cliproxyau
resp, err := httpClient.Do(req)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return "", err
return nil, body, err
}
defer func() { _ = resp.Body.Close() }()
recordAPIResponseMetadata(ctx, e.cfg, resp.StatusCode, resp.Header.Clone())
respBody, err := io.ReadAll(resp.Body)
if err != nil {
recordAPIResponseError(ctx, e.cfg, err)
return "", err
}
appendAPIResponseChunk(ctx, e.cfg, respBody)
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return "", statusErr{code: resp.StatusCode, msg: strings.TrimSpace(string(respBody))}
}
text, err := parseGitLabTextResponse(endpoint, respBody)
if err != nil {
return "", err
}
return strings.TrimSpace(text), nil
return resp, body, nil
}
func (e *GitLabExecutor) refreshOAuthToken(ctx context.Context, client *gitlab.AuthClient, auth *cliproxyauth.Auth, baseURL string) (*gitlab.TokenResponse, error) {
@@ -455,6 +617,236 @@ func parseGitLabTextResponse(endpoint string, body []byte) (string, error) {
return "", fmt.Errorf("gitlab duo executor: upstream returned no text payload")
}
// applyGitLabRequestHeaders decorates req with the auth entry's custom
// headers and any Duo gateway headers stored in the auth metadata.
func applyGitLabRequestHeaders(req *http.Request, auth *cliproxyauth.Auth) {
	if req == nil {
		return
	}
	if auth != nil {
		util.ApplyCustomHeadersFromAttrs(req, auth.Attributes)
	}
	for name, headerValue := range gitLabGatewayHeaders(auth) {
		if name != "" && headerValue != "" {
			req.Header.Set(name, headerValue)
		}
	}
}
// gitLabGatewayHeaders extracts the "duo_gateway_headers" map from the auth
// metadata, trimming whitespace and dropping empty keys or values. It returns
// nil when no usable headers are present.
func gitLabGatewayHeaders(auth *cliproxyauth.Auth) map[string]string {
	if auth == nil || auth.Metadata == nil {
		return nil
	}
	raw, found := auth.Metadata["duo_gateway_headers"]
	if !found {
		return nil
	}
	headers := map[string]string{}
	addHeader := func(key, value string) {
		key = strings.TrimSpace(key)
		value = strings.TrimSpace(value)
		if key != "" && value != "" {
			headers[key] = value
		}
	}
	switch m := raw.(type) {
	case map[string]string:
		for key, value := range m {
			addHeader(key, value)
		}
	case map[string]any:
		for key, value := range m {
			// Non-string metadata values are stringified before trimming.
			addHeader(key, fmt.Sprint(value))
		}
	}
	if len(headers) == 0 {
		return nil
	}
	return headers
}
func cloneGitLabStreamHeaders(headers http.Header, _ []byte) http.Header {
cloned := headers.Clone()
if cloned == nil {
cloned = make(http.Header)
}
cloned.Set("Content-Type", "text/event-stream")
return cloned
}
// normalizeGitLabStreamChunk converts one upstream SSE data payload into zero
// or more OpenAI-style "data: ..." chunk lines. Payloads that already look
// like chat.completion chunks are forwarded untouched; GitLab-native event
// shapes are re-emitted through the stream state.
func normalizeGitLabStreamChunk(eventName string, payload []byte, fallbackModel string, state *gitLabOpenAIStreamState) [][]byte {
	payload = bytes.TrimSpace(payload)
	if len(payload) == 0 {
		return nil
	}
	if bytes.Equal(payload, []byte("[DONE]")) {
		return finalizeGitLabStream(fallbackModel, state)
	}
	parsed := gjson.ParseBytes(payload)
	if parsed.Exists() {
		// Already OpenAI-shaped: forward verbatim with an SSE data prefix.
		alreadyOpenAI := parsed.Get("object").String() == "chat.completion.chunk" ||
			parsed.Get("choices.0.delta").Exists() ||
			parsed.Get("choices.0.finish_reason").Exists()
		if alreadyOpenAI {
			return [][]byte{append([]byte("data: "), bytes.Clone(payload)...)}
		}
	}
	state.ensureInitialized(fallbackModel, parsed)
	switch strings.TrimSpace(eventName) {
	case "stream_end":
		return finalizeGitLabStream(fallbackModel, state)
	case "stream_start":
		// Some stream_start events carry inline text; emit it when present.
		text := extractGitLabStreamText(parsed)
		if text == "" {
			return nil
		}
		return state.emitText(text)
	}
	if doneField := parsed.Get("done"); doneField.Exists() && doneField.Bool() {
		return finalizeGitLabStream(fallbackModel, state)
	}
	if reason := strings.TrimSpace(parsed.Get("finish_reason").String()); reason != "" {
		chunks := state.emitText(extractGitLabStreamText(parsed))
		return append(chunks, state.finish(reason)...)
	}
	return state.emitText(extractGitLabStreamText(parsed))
}
// extractGitLabStreamText pulls the first non-blank text fragment from a
// parsed stream payload, probing known OpenAI and GitLab field locations in
// priority order. Returns "" when no field carries usable text.
func extractGitLabStreamText(root gjson.Result) string {
	candidates := []string{
		"choices.0.delta.content",
		"choices.0.text",
		"delta.content",
		"content_chunk",
		"content",
		"text",
		"response",
		"completion",
	}
	for _, path := range candidates {
		value := root.Get(path).String()
		if strings.TrimSpace(value) != "" {
			return value
		}
	}
	return ""
}
// finalizeGitLabStream emits the closing chunk and "[DONE]" marker for a
// stream, initializing the state first so the final chunk carries valid
// id/model/created fields. Safe to call with a nil state.
func finalizeGitLabStream(fallbackModel string, state *gitLabOpenAIStreamState) [][]byte {
	if state != nil {
		state.ensureInitialized(fallbackModel, gjson.Result{})
		return state.finish("stop")
	}
	return nil
}
// ensureInitialized lazily fills in the stream identity fields (ID, Created,
// Model) the first time data arrives, preferring model names found in the
// payload and falling back to fallbackModel.
func (s *gitLabOpenAIStreamState) ensureInitialized(fallbackModel string, root gjson.Result) {
	if s == nil {
		return
	}
	now := time.Now()
	if s.ID == "" {
		s.ID = fmt.Sprintf("gitlab-%d", now.UnixNano())
	}
	if s.Created == 0 {
		s.Created = now.Unix()
	}
	if s.Model == "" {
		// Probe known payload locations for a model name before falling back.
		for _, path := range []string{"model.name", "model", "metadata.model_name"} {
			if name := strings.TrimSpace(root.Get(path).String()); name != "" {
				s.Model = name
				break
			}
		}
		if s.Model == "" {
			s.Model = fallbackModel
		}
	}
}
// emitText converts a cumulative or incremental text fragment into OpenAI
// delta chunks, prepending the assistant-role chunk the first time any
// content is produced. Returns nil when there is nothing new to send.
func (s *gitLabOpenAIStreamState) emitText(text string) [][]byte {
	if s == nil || strings.TrimSpace(text) == "" {
		return nil
	}
	delta := s.nextDelta(text)
	if delta == "" {
		return nil
	}
	var chunks [][]byte
	if !s.Started {
		s.Started = true
		chunks = append(chunks, s.buildChunk(map[string]any{"role": "assistant"}, ""))
	}
	return append(chunks, s.buildChunk(map[string]any{"content": delta}, ""))
}
// finish emits the terminal chunk carrying reason as the finish_reason plus
// the SSE "[DONE]" marker. Calls after the first are no-ops.
func (s *gitLabOpenAIStreamState) finish(reason string) [][]byte {
	if s == nil || s.Finished {
		return nil
	}
	s.Started = true
	s.Finished = true
	finalChunk := s.buildChunk(map[string]any{}, reason)
	return [][]byte{finalChunk, []byte("data: [DONE]")}
}
// nextDelta derives the new text to emit given the latest fragment. The
// upstream may stream either cumulative snapshots (each payload repeats all
// prior text) or plain increments; prefix comparison against LastFullText
// distinguishes the two.
func (s *gitLabOpenAIStreamState) nextDelta(text string) string {
	if s == nil {
		return text
	}
	if strings.TrimSpace(text) == "" {
		return ""
	}
	switch {
	case s.LastFullText == "":
		s.LastFullText = text
		return text
	case text == s.LastFullText:
		// Duplicate snapshot: nothing new to emit.
		return ""
	case strings.HasPrefix(text, s.LastFullText):
		// Cumulative snapshot: emit only the appended suffix.
		delta := strings.TrimPrefix(text, s.LastFullText)
		s.LastFullText = text
		return delta
	default:
		// Plain incremental fragment: accumulate and emit as-is.
		s.LastFullText += text
		return text
	}
}
// buildChunk serializes one OpenAI chat.completion.chunk carrying delta (and
// an optional finish_reason) as an SSE "data: ..." line.
func (s *gitLabOpenAIStreamState) buildChunk(delta map[string]any, finishReason string) []byte {
	choice := map[string]any{
		"index": 0,
		"delta": delta,
	}
	if finishReason != "" {
		choice["finish_reason"] = finishReason
	}
	payload := map[string]any{
		"id":      s.ID,
		"object":  "chat.completion.chunk",
		"created": s.Created,
		"model":   s.Model,
		"choices": []map[string]any{choice},
	}
	// Marshal of a map of JSON-safe values cannot fail; error deliberately dropped.
	encoded, _ := json.Marshal(payload)
	return append([]byte("data: "), encoded...)
}
func shouldFallbackToCodeSuggestions(err error) bool {
if err == nil {
return false

View File

@@ -3,8 +3,10 @@ package executor
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
@@ -153,3 +155,147 @@ func TestGitLabExecutorRefreshUpdatesMetadata(t *testing.T) {
t.Fatalf("expected refreshed model metadata, got %#v", got)
}
}
// TestGitLabExecutorExecuteStreamUsesCodeSuggestionsSSE verifies that
// streaming requests hit the code-suggestions endpoint with the SSE headers
// and stream flag set, and that upstream GitLab events are translated into
// OpenAI stream chunks terminated by "data: [DONE]".
func TestGitLabExecutorExecuteStreamUsesCodeSuggestionsSSE(t *testing.T) {
	var gotAccept, gotStreamingHeader, gotEncoding string
	var gotStreamFlag bool
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// The handler runs on the server goroutine, where t.Fatalf is not
		// allowed (it only stops the calling goroutine); report via Errorf.
		if r.URL.Path != gitLabCodeSuggestionsEndpoint {
			t.Errorf("unexpected path %q", r.URL.Path)
			http.NotFound(w, r)
			return
		}
		gotAccept = r.Header.Get("Accept")
		gotStreamingHeader = r.Header.Get(gitLabSSEStreamingHeader)
		gotEncoding = r.Header.Get("Accept-Encoding")
		gotStreamFlag = gjson.GetBytes(readBody(t, r), "stream").Bool()
		w.Header().Set("Content-Type", "text/event-stream")
		// Emit a minimal GitLab Duo SSE conversation: start, two content
		// chunks, then an explicit end event.
		_, _ = w.Write([]byte("event: stream_start\n"))
		_, _ = w.Write([]byte("data: {\"model\":{\"name\":\"claude-sonnet-4-5\"}}\n\n"))
		_, _ = w.Write([]byte("event: content_chunk\n"))
		_, _ = w.Write([]byte("data: {\"content\":\"hello\"}\n\n"))
		_, _ = w.Write([]byte("event: content_chunk\n"))
		_, _ = w.Write([]byte("data: {\"content\":\" world\"}\n\n"))
		_, _ = w.Write([]byte("event: stream_end\n"))
		_, _ = w.Write([]byte("data: {}\n\n"))
	}))
	defer srv.Close()
	exec := NewGitLabExecutor(&config.Config{})
	auth := &cliproxyauth.Auth{
		Provider: "gitlab",
		Metadata: map[string]any{
			"base_url":     srv.URL,
			"access_token": "oauth-access",
			"model_name":   "claude-sonnet-4-5",
		},
	}
	req := cliproxyexecutor.Request{
		Model:   "gitlab-duo",
		Payload: []byte(`{"model":"gitlab-duo","stream":true,"messages":[{"role":"user","content":"hello"}]}`),
	}
	result, err := exec.ExecuteStream(context.Background(), auth, req, cliproxyexecutor.Options{
		SourceFormat: sdktranslator.FromString("openai"),
	})
	if err != nil {
		t.Fatalf("ExecuteStream() error = %v", err)
	}
	lines := collectStreamLines(t, result)
	if gotAccept != "text/event-stream" {
		t.Fatalf("Accept = %q, want text/event-stream", gotAccept)
	}
	if gotStreamingHeader != "true" {
		t.Fatalf("%s = %q, want true", gitLabSSEStreamingHeader, gotStreamingHeader)
	}
	if gotEncoding != "identity" {
		t.Fatalf("Accept-Encoding = %q, want identity", gotEncoding)
	}
	if !gotStreamFlag {
		t.Fatalf("expected upstream request to set stream=true")
	}
	if len(lines) < 4 {
		t.Fatalf("expected translated stream chunks, got %d", len(lines))
	}
	joined := strings.Join(lines, "\n")
	if !strings.Contains(joined, `"content":"hello"`) {
		t.Fatalf("expected hello delta in stream, got %q", joined)
	}
	if !strings.Contains(joined, `"content":" world"`) {
		t.Fatalf("expected world delta in stream, got %q", joined)
	}
	if last := lines[len(lines)-1]; last != "data: [DONE]" {
		t.Fatalf("expected stream terminator, got %q", last)
	}
}
// TestGitLabExecutorExecuteStreamFallsBackToSyntheticChat verifies that when
// the streaming code-suggestions endpoint rejects the request, ExecuteStream
// falls back to the chat endpoint exactly once and synthesizes a stream from
// its response.
func TestGitLabExecutorExecuteStreamFallsBackToSyntheticChat(t *testing.T) {
	chatCalls := 0
	streamCalls := 0
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		switch r.URL.Path {
		case gitLabCodeSuggestionsEndpoint:
			streamCalls++
			// 403 triggers the fallback path in ExecuteStream.
			http.Error(w, "feature unavailable", http.StatusForbidden)
		case gitLabChatEndpoint:
			chatCalls++
			_, _ = w.Write([]byte(`"chat fallback response"`))
		default:
			// Handlers run on the server goroutine; t.Fatalf there does not
			// stop the test, so report with Errorf and answer 404.
			t.Errorf("unexpected path %q", r.URL.Path)
			http.NotFound(w, r)
		}
	}))
	defer srv.Close()
	exec := NewGitLabExecutor(&config.Config{})
	auth := &cliproxyauth.Auth{
		Provider: "gitlab",
		Metadata: map[string]any{
			"base_url":     srv.URL,
			"access_token": "oauth-access",
			"model_name":   "claude-sonnet-4-5",
		},
	}
	req := cliproxyexecutor.Request{
		Model:   "gitlab-duo",
		Payload: []byte(`{"model":"gitlab-duo","stream":true,"messages":[{"role":"user","content":"hello"}]}`),
	}
	result, err := exec.ExecuteStream(context.Background(), auth, req, cliproxyexecutor.Options{
		SourceFormat: sdktranslator.FromString("openai"),
	})
	if err != nil {
		t.Fatalf("ExecuteStream() error = %v", err)
	}
	lines := collectStreamLines(t, result)
	if streamCalls != 1 {
		t.Fatalf("expected streaming endpoint once, got %d", streamCalls)
	}
	if chatCalls != 1 {
		t.Fatalf("expected chat fallback once, got %d", chatCalls)
	}
	if joined := strings.Join(lines, "\n"); !strings.Contains(joined, `"content":"chat fallback response"`) {
		t.Fatalf("expected fallback content in stream, got %q", joined)
	}
}
// collectStreamLines drains a stream result into a slice of chunk payload
// strings, failing the test on any chunk-level error.
func collectStreamLines(t *testing.T, result *cliproxyexecutor.StreamResult) []string {
	t.Helper()
	var lines []string
	for chunk := range result.Chunks {
		if chunk.Err != nil {
			t.Fatalf("unexpected stream error: %v", chunk.Err)
		}
		lines = append(lines, string(chunk.Payload))
	}
	return lines
}
func readBody(t *testing.T, r *http.Request) []byte {
t.Helper()
defer func() { _ = r.Body.Close() }()
body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatalf("ReadAll() error = %v", err)
}
return body
}