From e11637dc62da1e0c8ff3960dcd1dad32fef60d4c Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Mon, 1 Sep 2025 11:00:47 +0800 Subject: [PATCH] Refactor translator packages for OpenAI Chat Completions - Renamed `openai` packages to `chat_completions` across translator modules. - Introduced `openai_responses_handlers` with handlers for `/v1/models` and OpenAI-compatible chat completions endpoints. - Updated constants and registry identifiers for OpenAI response type. - Simplified request/response conversions and added detailed retry/error handling. - Added `golang.org/x/crypto` for additional cryptographic functions. --- .../openai/openai_responses_handlers.go | 217 +----------------- internal/api/server.go | 2 + internal/client/codex_client.go | 6 +- 3 files changed, 9 insertions(+), 216 deletions(-) diff --git a/internal/api/handlers/openai/openai_responses_handlers.go b/internal/api/handlers/openai/openai_responses_handlers.go index bc92e4da..43ddf70b 100644 --- a/internal/api/handlers/openai/openai_responses_handlers.go +++ b/internal/api/handlers/openai/openai_responses_handlers.go @@ -64,13 +64,13 @@ func (h *OpenAIResponsesAPIHandler) OpenAIResponsesModels(c *gin.Context) { }) } -// ChatCompletions handles the /v1/chat/completions endpoint. +// Responses handles the /v1/responses endpoint. // It determines whether the request is for a streaming or non-streaming response // and calls the appropriate handler based on the model provider. // // Parameters: // - c: The Gin context containing the HTTP request and response -func (h *OpenAIResponsesAPIHandler) ChatCompletions(c *gin.Context) { +func (h *OpenAIResponsesAPIHandler) Responses(c *gin.Context) { rawJSON, err := c.GetRawData() // If data retrieval fails, return a 400 Bad Request error. if err != nil { @@ -93,36 +93,6 @@ func (h *OpenAIResponsesAPIHandler) ChatCompletions(c *gin.Context) { } -// Completions handles the /v1/completions endpoint. -// It determines whether the request is for a streaming or non-streaming response -// and calls the appropriate handler based on the model provider. -// This endpoint follows the OpenAIResponses completions API specification. -// -// Parameters: -// - c: The Gin context containing the HTTP request and response -func (h *OpenAIResponsesAPIHandler) Completions(c *gin.Context) { - rawJSON, err := c.GetRawData() - // If data retrieval fails, return a 400 Bad Request error. - if err != nil { - c.JSON(http.StatusBadRequest, handlers.ErrorResponse{ - Error: handlers.ErrorDetail{ - Message: fmt.Sprintf("Invalid request: %v", err), - Type: "invalid_request_error", - }, - }) - return - } - - // Check if the client requested a streaming response. - streamResult := gjson.GetBytes(rawJSON, "stream") - if streamResult.Type == gjson.True { - h.handleCompletionsStreamingResponse(c, rawJSON) - } else { - h.handleCompletionsNonStreamingResponse(c, rawJSON) - } - -} - // handleNonStreamingResponse handles non-streaming chat completion responses // for Gemini models. It selects a client from the pool, sends the request, and // aggregates the response before sending it back to the client in OpenAIResponses format. @@ -257,14 +227,13 @@ outLoop: // Process incoming response chunks. case chunk, okStream := <-respChan: if !okStream { - // Stream is closed, send the final [DONE] message. - _, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n") flusher.Flush() cliCancel() return } - _, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", string(chunk)) + _, _ = c.Writer.Write(chunk) + _, _ = c.Writer.Write([]byte("\n")) flusher.Flush() // Handle errors from the backend. case err, okError := <-errChan: @@ -294,181 +263,3 @@ outLoop: } } } - -// handleCompletionsNonStreamingResponse handles non-streaming completions responses. -// It converts completions request to chat completions format, sends to backend, -// then converts the response back to completions format before sending to client. -// -// Parameters: -// - c: The Gin context containing the HTTP request and response -// - rawJSON: The raw JSON bytes of the OpenAIResponses-compatible completions request -func (h *OpenAIResponsesAPIHandler) handleCompletionsNonStreamingResponse(c *gin.Context, rawJSON []byte) { - c.Header("Content-Type", "application/json") - - // Convert completions request to chat completions format - chatCompletionsJSON := convertCompletionsRequestToChatCompletions(rawJSON) - - modelName := gjson.GetBytes(chatCompletionsJSON, "model").String() - cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) - - var cliClient interfaces.Client - defer func() { - if cliClient != nil { - if mutex := cliClient.GetRequestMutex(); mutex != nil { - mutex.Unlock() - } - } - }() - - retryCount := 0 - for retryCount <= h.Cfg.RequestRetry { - var errorResponse *interfaces.ErrorMessage - cliClient, errorResponse = h.GetClient(modelName) - if errorResponse != nil { - c.Status(errorResponse.StatusCode) - _, _ = fmt.Fprint(c.Writer, errorResponse.Error.Error()) - cliCancel() - return - } - - // Send the converted chat completions request - resp, err := cliClient.SendRawMessage(cliCtx, modelName, chatCompletionsJSON, "") - if err != nil { - switch err.StatusCode { - case 429: - if h.Cfg.QuotaExceeded.SwitchProject { - log.Debugf("quota exceeded, switch client") - continue // Restart the client selection process - } - case 403, 408, 500, 502, 503, 504: - log.Debugf("http status code %d, switch client", err.StatusCode) - retryCount++ - continue - default: - // Forward other errors directly to the client - c.Status(err.StatusCode) - _, _ = c.Writer.Write([]byte(err.Error.Error())) - cliCancel(err.Error) - } - break - } else { - // Convert chat completions response back to completions format - completionsResp := convertChatCompletionsResponseToCompletions(resp) - _, _ = c.Writer.Write(completionsResp) - cliCancel(completionsResp) - break - } - } -} - -// handleCompletionsStreamingResponse handles streaming completions responses. -// It converts completions request to chat completions format, streams from backend, -// then converts each response chunk back to completions format before sending to client. -// -// Parameters: -// - c: The Gin context containing the HTTP request and response -// - rawJSON: The raw JSON bytes of the OpenAIResponses-compatible completions request -func (h *OpenAIResponsesAPIHandler) handleCompletionsStreamingResponse(c *gin.Context, rawJSON []byte) { - c.Header("Content-Type", "text/event-stream") - c.Header("Cache-Control", "no-cache") - c.Header("Connection", "keep-alive") - c.Header("Access-Control-Allow-Origin", "*") - - // Get the http.Flusher interface to manually flush the response. - flusher, ok := c.Writer.(http.Flusher) - if !ok { - c.JSON(http.StatusInternalServerError, handlers.ErrorResponse{ - Error: handlers.ErrorDetail{ - Message: "Streaming not supported", - Type: "server_error", - }, - }) - return - } - - // Convert completions request to chat completions format - chatCompletionsJSON := convertCompletionsRequestToChatCompletions(rawJSON) - - modelName := gjson.GetBytes(chatCompletionsJSON, "model").String() - cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) - - var cliClient interfaces.Client - defer func() { - // Ensure the client's mutex is unlocked on function exit. - if cliClient != nil { - if mutex := cliClient.GetRequestMutex(); mutex != nil { - mutex.Unlock() - } - } - }() - - retryCount := 0 -outLoop: - for retryCount <= h.Cfg.RequestRetry { - var errorResponse *interfaces.ErrorMessage - cliClient, errorResponse = h.GetClient(modelName) - if errorResponse != nil { - c.Status(errorResponse.StatusCode) - _, _ = fmt.Fprint(c.Writer, errorResponse.Error.Error()) - flusher.Flush() - cliCancel() - return - } - - // Send the converted chat completions request and receive response chunks - respChan, errChan := cliClient.SendRawMessageStream(cliCtx, modelName, chatCompletionsJSON, "") - - for { - select { - // Handle client disconnection. - case <-c.Request.Context().Done(): - if c.Request.Context().Err().Error() == "context canceled" { - log.Debugf("client disconnected: %v", c.Request.Context().Err()) - cliCancel() // Cancel the backend request. - return - } - // Process incoming response chunks. - case chunk, okStream := <-respChan: - if !okStream { - // Stream is closed, send the final [DONE] message. - _, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n") - flusher.Flush() - cliCancel() - return - } - - // Convert chat completions chunk to completions chunk format - completionsChunk := convertChatCompletionsStreamChunkToCompletions(chunk) - // Skip this chunk if it has no meaningful content (empty text) - if completionsChunk != nil { - _, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", string(completionsChunk)) - flusher.Flush() - } - // Handle errors from the backend. - case err, okError := <-errChan: - if okError { - switch err.StatusCode { - case 429: - if h.Cfg.QuotaExceeded.SwitchProject { - log.Debugf("quota exceeded, switch client") - continue outLoop // Restart the client selection process - } - case 403, 408, 500, 502, 503, 504: - log.Debugf("http status code %d, switch client", err.StatusCode) - retryCount++ - continue outLoop - default: - // Forward other errors directly to the client - c.Status(err.StatusCode) - _, _ = fmt.Fprint(c.Writer, err.Error.Error()) - flusher.Flush() - cliCancel(err.Error) - } - return - } - // Send a keep-alive signal to the client. - case <-time.After(500 * time.Millisecond): - } - } - } -} diff --git a/internal/api/server.go b/internal/api/server.go index 10eba22e..a1586077 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -107,6 +107,7 @@ func (s *Server) setupRoutes() { geminiHandlers := gemini.NewGeminiAPIHandler(s.handlers) geminiCLIHandlers := gemini.NewGeminiCLIAPIHandler(s.handlers) claudeCodeHandlers := claude.NewClaudeCodeAPIHandler(s.handlers) + openaiResponsesHandlers := openai.NewOpenAIResponsesAPIHandler(s.handlers) // OpenAI compatible API routes v1 := s.engine.Group("/v1") @@ -116,6 +117,7 @@ func (s *Server) setupRoutes() { v1.POST("/chat/completions", openaiHandlers.ChatCompletions) v1.POST("/completions", openaiHandlers.Completions) v1.POST("/messages", claudeCodeHandlers.ClaudeMessages) + v1.POST("/responses", openaiResponsesHandlers.Responses) } // Gemini compatible API routes diff --git a/internal/client/codex_client.go b/internal/client/codex_client.go index 2c3430b5..c64b23a2 100644 --- a/internal/client/codex_client.go +++ b/internal/client/codex_client.go @@ -31,7 +31,7 @@ import ( ) const ( - chatGPTEndpoint = "https://chatgpt.com/backend-api" + chatGPTEndpoint = "https://chatgpt.com/backend-api/codex" ) // CodexClient implements the Client interface for OpenAI API @@ -128,7 +128,7 @@ func (c *CodexClient) SendRawMessage(ctx context.Context, modelName string, rawJ handlerType := handler.HandlerType() rawJSON = translator.Request(handlerType, c.Type(), modelName, rawJSON, false) - respBody, err := c.APIRequest(ctx, modelName, "/codex/responses", rawJSON, alt, false) + respBody, err := c.APIRequest(ctx, modelName, "/responses", rawJSON, alt, false) if err != nil { if err.StatusCode == 429 { now := time.Now() @@ -193,7 +193,7 @@ func (c *CodexClient) SendRawMessageStream(ctx context.Context, modelName string } var err *interfaces.ErrorMessage - stream, err = c.APIRequest(ctx, modelName, "/codex/responses", rawJSON, alt, true) + stream, err = c.APIRequest(ctx, modelName, "/responses", rawJSON, alt, true) if err != nil { if err.StatusCode == 429 { now := time.Now()