import logging import os from application.core.settings import settings from application.llm.llm_creator import LLMCreator from application.retriever.base import BaseRetriever from application.vectorstore.vector_creator import VectorCreator class ClassicRAG(BaseRetriever): def __init__( self, source, chat_history=None, prompt="", chunks=2, token_limit=150, gpt_model="docsgpt", user_api_key=None, llm_name=settings.LLM_PROVIDER, api_key=settings.API_KEY, decoded_token=None, ): """Initialize ClassicRAG retriever with vectorstore sources and LLM configuration""" self.original_question = source.get("question", "") self.chat_history = chat_history if chat_history is not None else [] self.prompt = prompt if isinstance(chunks, str): try: self.chunks = int(chunks) except ValueError: logging.warning( f"Invalid chunks value '{chunks}', using default value 2" ) self.chunks = 2 else: self.chunks = chunks user_identifier = user_api_key if user_api_key else "default" logging.info( f"ClassicRAG initialized with chunks={self.chunks}, user_api_key={user_identifier}, " f"sources={'active_docs' in source and source['active_docs'] is not None}" ) self.gpt_model = gpt_model self.token_limit = ( token_limit if token_limit < settings.LLM_TOKEN_LIMITS.get( self.gpt_model, settings.DEFAULT_MAX_HISTORY ) else settings.LLM_TOKEN_LIMITS.get( self.gpt_model, settings.DEFAULT_MAX_HISTORY ) ) self.user_api_key = user_api_key self.llm_name = llm_name self.api_key = api_key self.llm = LLMCreator.create_llm( self.llm_name, api_key=self.api_key, user_api_key=self.user_api_key, decoded_token=decoded_token, ) if "active_docs" in source and source["active_docs"] is not None: if isinstance(source["active_docs"], list): self.vectorstores = source["active_docs"] else: self.vectorstores = [source["active_docs"]] else: self.vectorstores = [] self.question = self._rephrase_query() self.decoded_token = decoded_token self._validate_vectorstore_config() def _validate_vectorstore_config(self): """Validate vectorstore IDs and remove any empty/invalid entries""" if not self.vectorstores: logging.warning("No vectorstores configured for retrieval") return invalid_ids = [ vs_id for vs_id in self.vectorstores if not vs_id or not vs_id.strip() ] if invalid_ids: logging.warning(f"Found invalid vectorstore IDs: {invalid_ids}") self.vectorstores = [ vs_id for vs_id in self.vectorstores if vs_id and vs_id.strip() ] def _rephrase_query(self): """Rephrase user query with chat history context for better retrieval""" if ( not self.original_question or not self.chat_history or self.chat_history == [] or self.chunks == 0 or not self.vectorstores ): return self.original_question prompt = ( "Given the following conversation history:\n" f"{self.chat_history}\n\n" "Rephrase the following user question to be a standalone search query " "that captures all relevant context from the conversation:\n" ) messages = [ {"role": "system", "content": prompt}, {"role": "user", "content": self.original_question}, ] try: rephrased_query = self.llm.gen(model=self.gpt_model, messages=messages) print(f"Rephrased query: {rephrased_query}") return rephrased_query if rephrased_query else self.original_question except Exception as e: logging.error(f"Error rephrasing query: {e}", exc_info=True) return self.original_question def _get_data(self): """Retrieve relevant documents from configured vectorstores""" if self.chunks == 0 or not self.vectorstores: logging.info( f"ClassicRAG._get_data: Skipping retrieval - chunks={self.chunks}, " f"vectorstores_count={len(self.vectorstores) if self.vectorstores else 0}" ) return [] all_docs = [] chunks_per_source = max(1, self.chunks // len(self.vectorstores)) logging.info( f"ClassicRAG._get_data: Starting retrieval with chunks={self.chunks}, " f"vectorstores={self.vectorstores}, chunks_per_source={chunks_per_source}, " f"query='{self.question[:50]}...'" ) for vectorstore_id in self.vectorstores: if vectorstore_id: try: docsearch = VectorCreator.create_vectorstore( settings.VECTOR_STORE, vectorstore_id, settings.EMBEDDINGS_KEY ) docs_temp = docsearch.search(self.question, k=chunks_per_source) for doc in docs_temp: if hasattr(doc, "page_content") and hasattr(doc, "metadata"): page_content = doc.page_content metadata = doc.metadata else: page_content = doc.get("text", doc.get("page_content", "")) metadata = doc.get("metadata", {}) title = metadata.get( "title", metadata.get("post_title", page_content) ) if not isinstance(title, str): title = str(title) title = title.split("/")[-1] filename = ( metadata.get("filename") or metadata.get("file_name") or metadata.get("source") ) if isinstance(filename, str): filename = os.path.basename(filename) or filename else: filename = title if not filename: filename = title source_path = metadata.get("source") or vectorstore_id all_docs.append( { "title": title, "text": page_content, "source": source_path, "filename": filename, } ) except Exception as e: logging.error( f"Error searching vectorstore {vectorstore_id}: {e}", exc_info=True, ) continue logging.info( f"ClassicRAG._get_data: Retrieval complete - retrieved {len(all_docs)} documents " f"(requested chunks={self.chunks}, chunks_per_source={chunks_per_source})" ) return all_docs def search(self, query: str = ""): """Search for documents using optional query override""" if query: self.original_question = query self.question = self._rephrase_query() return self._get_data() def get_params(self): """Return current retriever configuration parameters""" return { "question": self.original_question, "rephrased_question": self.question, "sources": self.vectorstores, "chunks": self.chunks, "token_limit": self.token_limit, "gpt_model": self.gpt_model, "user_api_key": self.user_api_key, }