From a4507008c17b9b3dd8f4c7940861f3782065521d Mon Sep 17 00:00:00 2001 From: Manish Madan Date: Wed, 8 Oct 2025 23:07:30 +0530 Subject: [PATCH] complete_stream: Stop response streaming (#2031) * (feat:pause-stream) generator exit * (feat:pause-stream) close request * (feat:pause-stream) finally close; google anthropic --------- Co-authored-by: GH Action - Upstream Sync --- application/api/answer/routes/base.py | 37 +++++++++++++++++++++++++++ application/llm/anthropic.py | 8 ++++-- application/llm/docsgpt_provider.py | 18 +++++++++---- application/llm/google_ai.py | 26 +++++++++++-------- application/llm/openai.py | 22 +++++++++------- 5 files changed, 84 insertions(+), 27 deletions(-) diff --git a/application/api/answer/routes/base.py b/application/api/answer/routes/base.py index 8eb5aafc..5162ab77 100644 --- a/application/api/answer/routes/base.py +++ b/application/api/answer/routes/base.py @@ -193,6 +193,43 @@ class BaseAnswerResource: data = json.dumps({"type": "end"}) yield f"data: {data}\n\n" + except GeneratorExit: + # Client aborted the connection + logger.info( + f"Stream aborted by client for question: {question[:50]}... " + ) + # Save partial response to database before exiting + if should_save_conversation and response_full: + try: + if isNoneDoc: + for doc in source_log_docs: + doc["source"] = "None" + llm = LLMCreator.create_llm( + settings.LLM_PROVIDER, + api_key=settings.API_KEY, + user_api_key=user_api_key, + decoded_token=decoded_token, + ) + self.conversation_service.save_conversation( + conversation_id, + question, + response_full, + thought, + source_log_docs, + tool_calls, + llm, + self.gpt_model, + decoded_token, + index=index, + api_key=user_api_key, + agent_id=agent_id, + is_shared_usage=is_shared_usage, + shared_token=shared_token, + attachment_ids=attachment_ids, + ) + except Exception as e: + logger.error(f"Error saving partial response: {str(e)}", exc_info=True) + raise except Exception as e: logger.error(f"Error in stream: {str(e)}", exc_info=True) data = json.dumps( diff --git a/application/llm/anthropic.py b/application/llm/anthropic.py index 1fa3b5b2..b55dd855 100644 --- a/application/llm/anthropic.py +++ b/application/llm/anthropic.py @@ -46,5 +46,9 @@ class AnthropicLLM(BaseLLM): stream=True, ) - for completion in stream_response: - yield completion.completion + try: + for completion in stream_response: + yield completion.completion + finally: + if hasattr(stream_response, 'close'): + stream_response.close() diff --git a/application/llm/docsgpt_provider.py b/application/llm/docsgpt_provider.py index 001035c4..dbbcbfd2 100644 --- a/application/llm/docsgpt_provider.py +++ b/application/llm/docsgpt_provider.py @@ -121,11 +121,19 @@ class DocsGPTAPILLM(BaseLLM): model="docsgpt", messages=messages, stream=stream, **kwargs ) - for line in response: - if len(line.choices) > 0 and line.choices[0].delta.content is not None and len(line.choices[0].delta.content) > 0: - yield line.choices[0].delta.content - elif len(line.choices) > 0: - yield line.choices[0] + try: + for line in response: + if ( + len(line.choices) > 0 + and line.choices[0].delta.content is not None + and len(line.choices[0].delta.content) > 0 + ): + yield line.choices[0].delta.content + elif len(line.choices) > 0: + yield line.choices[0] + finally: + if hasattr(response, 'close'): + response.close() def _supports_tools(self): return True \ No newline at end of file diff --git a/application/llm/google_ai.py b/application/llm/google_ai.py index b88e1d9f..9263c873 100644 --- a/application/llm/google_ai.py +++ b/application/llm/google_ai.py @@ -373,17 +373,21 @@ class GoogleLLM(BaseLLM): config=config, ) - for chunk in response: - if hasattr(chunk, "candidates") and chunk.candidates: - for candidate in chunk.candidates: - if candidate.content and candidate.content.parts: - for part in candidate.content.parts: - if part.function_call: - yield part - elif part.text: - yield part.text - elif hasattr(chunk, "text"): - yield chunk.text + try: + for chunk in response: + if hasattr(chunk, "candidates") and chunk.candidates: + for candidate in chunk.candidates: + if candidate.content and candidate.content.parts: + for part in candidate.content.parts: + if part.function_call: + yield part + elif part.text: + yield part.text + elif hasattr(chunk, "text"): + yield chunk.text + finally: + if hasattr(response, 'close'): + response.close() def _supports_tools(self): """Return whether this LLM supports function calling.""" diff --git a/application/llm/openai.py b/application/llm/openai.py index 618aa238..cd69cea9 100644 --- a/application/llm/openai.py +++ b/application/llm/openai.py @@ -170,15 +170,19 @@ class OpenAILLM(BaseLLM): response = self.client.chat.completions.create(**request_params) - for line in response: - if ( - len(line.choices) > 0 - and line.choices[0].delta.content is not None - and len(line.choices[0].delta.content) > 0 - ): - yield line.choices[0].delta.content - elif len(line.choices) > 0: - yield line.choices[0] + try: + for line in response: + if ( + len(line.choices) > 0 + and line.choices[0].delta.content is not None + and len(line.choices[0].delta.content) > 0 + ): + yield line.choices[0].delta.content + elif len(line.choices) > 0: + yield line.choices[0] + finally: + if hasattr(response, 'close'): + response.close() def _supports_tools(self): return True