From 8950d92682324b0b3d9cff38041fd6b7f7919ee2 Mon Sep 17 00:00:00 2001 From: ChrAlpha <53332481+ChrAlpha@users.noreply.github.com> Date: Thu, 15 Jan 2026 18:30:01 +0800 Subject: [PATCH] feat(openai): implement endpoint resolution and response handling for Chat and Responses models --- sdk/api/handlers/openai/endpoint_compat.go | 37 ++++ sdk/api/handlers/openai/openai_handlers.go | 168 ++++++++++++++++++ .../openai/openai_responses_handlers.go | 149 +++++++++++++++- 3 files changed, 353 insertions(+), 1 deletion(-) create mode 100644 sdk/api/handlers/openai/endpoint_compat.go diff --git a/sdk/api/handlers/openai/endpoint_compat.go b/sdk/api/handlers/openai/endpoint_compat.go new file mode 100644 index 00000000..56fac508 --- /dev/null +++ b/sdk/api/handlers/openai/endpoint_compat.go @@ -0,0 +1,37 @@ +package openai + +import "github.com/router-for-me/CLIProxyAPI/v6/internal/registry" + +const ( + openAIChatEndpoint = "/chat/completions" + openAIResponsesEndpoint = "/responses" +) + +func resolveEndpointOverride(modelName, requestedEndpoint string) (string, bool) { + if modelName == "" { + return "", false + } + info := registry.GetGlobalRegistry().GetModelInfo(modelName) + if info == nil || len(info.SupportedEndpoints) == 0 { + return "", false + } + if endpointListContains(info.SupportedEndpoints, requestedEndpoint) { + return "", false + } + if requestedEndpoint == openAIChatEndpoint && endpointListContains(info.SupportedEndpoints, openAIResponsesEndpoint) { + return openAIResponsesEndpoint, true + } + if requestedEndpoint == openAIResponsesEndpoint && endpointListContains(info.SupportedEndpoints, openAIChatEndpoint) { + return openAIChatEndpoint, true + } + return "", false +} + +func endpointListContains(items []string, value string) bool { + for _, item := range items { + if item == value { + return true + } + } + return false +} \ No newline at end of file diff --git a/sdk/api/handlers/openai/openai_handlers.go b/sdk/api/handlers/openai/openai_handlers.go index 09471ce1..c8aaba78 100644 --- a/sdk/api/handlers/openai/openai_handlers.go +++ b/sdk/api/handlers/openai/openai_handlers.go @@ -17,6 +17,7 @@ import ( . "github.com/router-for-me/CLIProxyAPI/v6/internal/constant" "github.com/router-for-me/CLIProxyAPI/v6/internal/interfaces" "github.com/router-for-me/CLIProxyAPI/v6/internal/registry" + codexconverter "github.com/router-for-me/CLIProxyAPI/v6/internal/translator/codex/openai/chat-completions" responsesconverter "github.com/router-for-me/CLIProxyAPI/v6/internal/translator/openai/openai/responses" "github.com/router-for-me/CLIProxyAPI/v6/sdk/api/handlers" "github.com/tidwall/gjson" @@ -112,6 +113,23 @@ func (h *OpenAIAPIHandler) ChatCompletions(c *gin.Context) { streamResult := gjson.GetBytes(rawJSON, "stream") stream := streamResult.Type == gjson.True + modelName := gjson.GetBytes(rawJSON, "model").String() + if overrideEndpoint, ok := resolveEndpointOverride(modelName, openAIChatEndpoint); ok && overrideEndpoint == openAIResponsesEndpoint { + originalChat := rawJSON + if shouldTreatAsResponsesFormat(rawJSON) { + // Already responses-style payload; no conversion needed. + } else { + rawJSON = codexconverter.ConvertOpenAIRequestToCodex(modelName, rawJSON, stream) + } + stream = gjson.GetBytes(rawJSON, "stream").Bool() + if stream { + h.handleStreamingResponseViaResponses(c, rawJSON, originalChat) + } else { + h.handleNonStreamingResponseViaResponses(c, rawJSON, originalChat) + } + return + } + // Some clients send OpenAI Responses-format payloads to /v1/chat/completions. // Convert them to Chat Completions so downstream translators preserve tool metadata. if shouldTreatAsResponsesFormat(rawJSON) { @@ -245,6 +263,76 @@ func convertCompletionsRequestToChatCompletions(rawJSON []byte) []byte { return []byte(out) } +func convertResponsesObjectToChatCompletion(ctx context.Context, modelName string, originalChatJSON, responsesRequestJSON, responsesPayload []byte) []byte { + if len(responsesPayload) == 0 { + return nil + } + wrapped := wrapResponsesPayloadAsCompleted(responsesPayload) + if len(wrapped) == 0 { + return nil + } + var param any + converted := codexconverter.ConvertCodexResponseToOpenAINonStream(ctx, modelName, originalChatJSON, responsesRequestJSON, wrapped, ¶m) + if converted == "" { + return nil + } + return []byte(converted) +} + +func wrapResponsesPayloadAsCompleted(payload []byte) []byte { + if gjson.GetBytes(payload, "type").Exists() { + return payload + } + if gjson.GetBytes(payload, "object").String() != "response" { + return payload + } + wrapped := `{"type":"response.completed","response":{}}` + wrapped, _ = sjson.SetRaw(wrapped, "response", string(payload)) + return []byte(wrapped) +} + +func writeConvertedResponsesChunk(c *gin.Context, ctx context.Context, modelName string, originalChatJSON, responsesRequestJSON, chunk []byte, param *any) { + outputs := codexconverter.ConvertCodexResponseToOpenAI(ctx, modelName, originalChatJSON, responsesRequestJSON, chunk, param) + for _, out := range outputs { + if out == "" { + continue + } + _, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", out) + } +} + +func (h *OpenAIAPIHandler) forwardResponsesAsChatStream(c *gin.Context, flusher http.Flusher, cancel func(error), data <-chan []byte, errs <-chan *interfaces.ErrorMessage, ctx context.Context, modelName string, originalChatJSON, responsesRequestJSON []byte, param *any) { + h.ForwardStream(c, flusher, cancel, data, errs, handlers.StreamForwardOptions{ + WriteChunk: func(chunk []byte) { + outputs := codexconverter.ConvertCodexResponseToOpenAI(ctx, modelName, originalChatJSON, responsesRequestJSON, chunk, param) + for _, out := range outputs { + if out == "" { + continue + } + _, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", out) + } + }, + WriteTerminalError: func(errMsg *interfaces.ErrorMessage) { + if errMsg == nil { + return + } + status := http.StatusInternalServerError + if errMsg.StatusCode > 0 { + status = errMsg.StatusCode + } + errText := http.StatusText(status) + if errMsg.Error != nil && errMsg.Error.Error() != "" { + errText = errMsg.Error.Error() + } + body := handlers.BuildErrorResponseBody(status, errText) + _, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", string(body)) + }, + WriteDone: func() { + _, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n") + }, + }) +} + // convertChatCompletionsResponseToCompletions converts chat completions API response back to completions format. // This ensures the completions endpoint returns data in the expected format. // @@ -435,6 +523,25 @@ func (h *OpenAIAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSON [] cliCancel() } +func (h *OpenAIAPIHandler) handleNonStreamingResponseViaResponses(c *gin.Context, rawJSON []byte, originalChatJSON []byte) { + c.Header("Content-Type", "application/json") + + modelName := gjson.GetBytes(rawJSON, "model").String() + cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) + resp, errMsg := h.ExecuteWithAuthManager(cliCtx, OpenaiResponse, modelName, rawJSON, h.GetAlt(c)) + if errMsg != nil { + h.WriteErrorResponse(c, errMsg) + cliCancel(errMsg.Error) + return + } + converted := convertResponsesObjectToChatCompletion(cliCtx, modelName, originalChatJSON, rawJSON, resp) + if converted == nil { + converted = resp + } + _, _ = c.Writer.Write(converted) + cliCancel() +} + // handleStreamingResponse handles streaming responses for Gemini models. // It establishes a streaming connection with the backend service and forwards // the response chunks to the client in real-time using Server-Sent Events. @@ -509,6 +616,67 @@ func (h *OpenAIAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON []byt } } +func (h *OpenAIAPIHandler) handleStreamingResponseViaResponses(c *gin.Context, rawJSON []byte, originalChatJSON []byte) { + flusher, ok := c.Writer.(http.Flusher) + if !ok { + c.JSON(http.StatusInternalServerError, handlers.ErrorResponse{ + Error: handlers.ErrorDetail{ + Message: "Streaming not supported", + Type: "server_error", + }, + }) + return + } + + modelName := gjson.GetBytes(rawJSON, "model").String() + cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) + dataChan, errChan := h.ExecuteStreamWithAuthManager(cliCtx, OpenaiResponse, modelName, rawJSON, h.GetAlt(c)) + var param any + + setSSEHeaders := func() { + c.Header("Content-Type", "text/event-stream") + c.Header("Cache-Control", "no-cache") + c.Header("Connection", "keep-alive") + c.Header("Access-Control-Allow-Origin", "*") + } + + // Peek for first usable chunk + for { + select { + case <-c.Request.Context().Done(): + cliCancel(c.Request.Context().Err()) + return + case errMsg, ok := <-errChan: + if !ok { + errChan = nil + continue + } + h.WriteErrorResponse(c, errMsg) + if errMsg != nil { + cliCancel(errMsg.Error) + } else { + cliCancel(nil) + } + return + case chunk, ok := <-dataChan: + if !ok { + setSSEHeaders() + _, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n") + flusher.Flush() + cliCancel(nil) + return + } + + setSSEHeaders() + writeConvertedResponsesChunk(c, cliCtx, modelName, originalChatJSON, rawJSON, chunk, ¶m) + flusher.Flush() + + h.forwardResponsesAsChatStream(c, flusher, func(err error) { cliCancel(err) }, dataChan, errChan, cliCtx, modelName, originalChatJSON, rawJSON, ¶m) + return + } + } +} + // handleCompletionsNonStreamingResponse handles non-streaming completions responses. // It converts completions request to chat completions format, sends to backend, // then converts the response back to completions format before sending to client. diff --git a/sdk/api/handlers/openai/openai_responses_handlers.go b/sdk/api/handlers/openai/openai_responses_handlers.go index 31099f81..e6c29001 100644 --- a/sdk/api/handlers/openai/openai_responses_handlers.go +++ b/sdk/api/handlers/openai/openai_responses_handlers.go @@ -16,6 +16,7 @@ import ( . "github.com/router-for-me/CLIProxyAPI/v6/internal/constant" "github.com/router-for-me/CLIProxyAPI/v6/internal/interfaces" "github.com/router-for-me/CLIProxyAPI/v6/internal/registry" + responsesconverter "github.com/router-for-me/CLIProxyAPI/v6/internal/translator/openai/openai/responses" "github.com/router-for-me/CLIProxyAPI/v6/sdk/api/handlers" "github.com/tidwall/gjson" ) @@ -83,7 +84,21 @@ func (h *OpenAIResponsesAPIHandler) Responses(c *gin.Context) { // Check if the client requested a streaming response. streamResult := gjson.GetBytes(rawJSON, "stream") - if streamResult.Type == gjson.True { + stream := streamResult.Type == gjson.True + + modelName := gjson.GetBytes(rawJSON, "model").String() + if overrideEndpoint, ok := resolveEndpointOverride(modelName, openAIResponsesEndpoint); ok && overrideEndpoint == openAIChatEndpoint { + chatJSON := responsesconverter.ConvertOpenAIResponsesRequestToOpenAIChatCompletions(modelName, rawJSON, stream) + stream = gjson.GetBytes(chatJSON, "stream").Bool() + if stream { + h.handleStreamingResponseViaChat(c, rawJSON, chatJSON) + } else { + h.handleNonStreamingResponseViaChat(c, rawJSON, chatJSON) + } + return + } + + if stream { h.handleStreamingResponse(c, rawJSON) } else { h.handleNonStreamingResponse(c, rawJSON) @@ -116,6 +131,28 @@ func (h *OpenAIResponsesAPIHandler) handleNonStreamingResponse(c *gin.Context, r cliCancel() } +func (h *OpenAIResponsesAPIHandler) handleNonStreamingResponseViaChat(c *gin.Context, originalResponsesJSON, chatJSON []byte) { + c.Header("Content-Type", "application/json") + + modelName := gjson.GetBytes(chatJSON, "model").String() + cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) + resp, errMsg := h.ExecuteWithAuthManager(cliCtx, OpenAI, modelName, chatJSON, "") + if errMsg != nil { + h.WriteErrorResponse(c, errMsg) + cliCancel(errMsg.Error) + return + } + var param any + converted := responsesconverter.ConvertOpenAIChatCompletionsResponseToOpenAIResponsesNonStream(cliCtx, modelName, originalResponsesJSON, originalResponsesJSON, resp, ¶m) + if converted == "" { + _, _ = c.Writer.Write(resp) + cliCancel() + return + } + _, _ = c.Writer.Write([]byte(converted)) + cliCancel() +} + // handleStreamingResponse handles streaming responses for Gemini models. // It establishes a streaming connection with the backend service and forwards // the response chunks to the client in real-time using Server-Sent Events. @@ -196,6 +233,116 @@ func (h *OpenAIResponsesAPIHandler) handleStreamingResponse(c *gin.Context, rawJ } } +func (h *OpenAIResponsesAPIHandler) handleStreamingResponseViaChat(c *gin.Context, originalResponsesJSON, chatJSON []byte) { + flusher, ok := c.Writer.(http.Flusher) + if !ok { + c.JSON(http.StatusInternalServerError, handlers.ErrorResponse{ + Error: handlers.ErrorDetail{ + Message: "Streaming not supported", + Type: "server_error", + }, + }) + return + } + + modelName := gjson.GetBytes(chatJSON, "model").String() + cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) + dataChan, errChan := h.ExecuteStreamWithAuthManager(cliCtx, OpenAI, modelName, chatJSON, "") + var param any + + setSSEHeaders := func() { + c.Header("Content-Type", "text/event-stream") + c.Header("Cache-Control", "no-cache") + c.Header("Connection", "keep-alive") + c.Header("Access-Control-Allow-Origin", "*") + } + + for { + select { + case <-c.Request.Context().Done(): + cliCancel(c.Request.Context().Err()) + return + case errMsg, ok := <-errChan: + if !ok { + errChan = nil + continue + } + h.WriteErrorResponse(c, errMsg) + if errMsg != nil { + cliCancel(errMsg.Error) + } else { + cliCancel(nil) + } + return + case chunk, ok := <-dataChan: + if !ok { + setSSEHeaders() + _, _ = c.Writer.Write([]byte("\n")) + flusher.Flush() + cliCancel(nil) + return + } + + setSSEHeaders() + writeChatAsResponsesChunk(c, cliCtx, modelName, originalResponsesJSON, chunk, ¶m) + flusher.Flush() + + h.forwardChatAsResponsesStream(c, flusher, func(err error) { cliCancel(err) }, dataChan, errChan, cliCtx, modelName, originalResponsesJSON, ¶m) + return + } + } +} + +func writeChatAsResponsesChunk(c *gin.Context, ctx context.Context, modelName string, originalResponsesJSON, chunk []byte, param *any) { + outputs := responsesconverter.ConvertOpenAIChatCompletionsResponseToOpenAIResponses(ctx, modelName, originalResponsesJSON, originalResponsesJSON, chunk, param) + for _, out := range outputs { + if out == "" { + continue + } + if bytes.HasPrefix([]byte(out), []byte("event:")) { + _, _ = c.Writer.Write([]byte("\n")) + } + _, _ = c.Writer.Write([]byte(out)) + _, _ = c.Writer.Write([]byte("\n")) + } +} + +func (h *OpenAIResponsesAPIHandler) forwardChatAsResponsesStream(c *gin.Context, flusher http.Flusher, cancel func(error), data <-chan []byte, errs <-chan *interfaces.ErrorMessage, ctx context.Context, modelName string, originalResponsesJSON []byte, param *any) { + h.ForwardStream(c, flusher, cancel, data, errs, handlers.StreamForwardOptions{ + WriteChunk: func(chunk []byte) { + outputs := responsesconverter.ConvertOpenAIChatCompletionsResponseToOpenAIResponses(ctx, modelName, originalResponsesJSON, originalResponsesJSON, chunk, param) + for _, out := range outputs { + if out == "" { + continue + } + if bytes.HasPrefix([]byte(out), []byte("event:")) { + _, _ = c.Writer.Write([]byte("\n")) + } + _, _ = c.Writer.Write([]byte(out)) + _, _ = c.Writer.Write([]byte("\n")) + } + }, + WriteTerminalError: func(errMsg *interfaces.ErrorMessage) { + if errMsg == nil { + return + } + status := http.StatusInternalServerError + if errMsg.StatusCode > 0 { + status = errMsg.StatusCode + } + errText := http.StatusText(status) + if errMsg.Error != nil && errMsg.Error.Error() != "" { + errText = errMsg.Error.Error() + } + body := handlers.BuildErrorResponseBody(status, errText) + _, _ = fmt.Fprintf(c.Writer, "\nevent: error\ndata: %s\n\n", string(body)) + }, + WriteDone: func() { + _, _ = c.Writer.Write([]byte("\n")) + }, + }) +} + func (h *OpenAIResponsesAPIHandler) forwardResponsesStream(c *gin.Context, flusher http.Flusher, cancel func(error), data <-chan []byte, errs <-chan *interfaces.ErrorMessage) { h.ForwardStream(c, flusher, cancel, data, errs, handlers.StreamForwardOptions{ WriteChunk: func(chunk []byte) {