From 24c8b24b1f367669d208cf93e07895b3d9b896f2 Mon Sep 17 00:00:00 2001 From: ManishMadan2882 Date: Wed, 23 Apr 2025 00:52:22 +0530 Subject: [PATCH] Revert "(fix:indexes) look for the right path" This reverts commit 5ad34e2216e3052f075059afbf2026043047bf06. --- application/api/internal/routes.py | 16 ++-- application/parser/embedding_pipeline.py | 6 +- application/vectorstore/faiss.py | 110 ++++------------------- application/worker.py | 30 +++---- 4 files changed, 34 insertions(+), 128 deletions(-) diff --git a/application/api/internal/routes.py b/application/api/internal/routes.py index 6ba07431..e95b6327 100755 --- a/application/api/internal/routes.py +++ b/application/api/internal/routes.py @@ -1,6 +1,5 @@ import os import datetime -import logging from flask import Blueprint, request, send_from_directory from werkzeug.utils import secure_filename from bson.objectid import ObjectId @@ -47,7 +46,7 @@ def upload_index_files(): sync_frequency = secure_filename(request.form["sync_frequency"]) if "sync_frequency" in request.form else None storage = StorageCreator.create_storage(settings.STORAGE_TYPE) - + if settings.VECTOR_STORE == "faiss": if "file_faiss" not in request.files: print("No file part") @@ -61,18 +60,13 @@ def upload_index_files(): file_pkl = request.files["file_pkl"] if file_pkl.filename == "": return {"status": "no file name"} - + # Save index files storage_path_faiss = f"indexes/{str(id)}/index.faiss" storage_path_pkl = f"indexes/{str(id)}/index.pkl" - - try: - storage.save_file(file_faiss, storage_path_faiss) - storage.save_file(file_pkl, storage_path_pkl) - logging.info(f"Successfully saved FAISS index files for ID {id}") - except Exception as e: - logging.error(f"Error saving FAISS index files: {e}") - return {"status": "error", "message": str(e)} + + storage.save_file(file_faiss, storage_path_faiss) + storage.save_file(file_pkl, storage_path_pkl) existing_entry = sources_collection.find_one({"_id": ObjectId(id)}) if existing_entry: diff --git a/application/parser/embedding_pipeline.py b/application/parser/embedding_pipeline.py index 005d3756..0435cd14 100755 --- a/application/parser/embedding_pipeline.py +++ b/application/parser/embedding_pipeline.py @@ -42,18 +42,17 @@ def embed_and_store_documents(docs, folder_name, source_id, task_status): # Initialize vector store if settings.VECTOR_STORE == "faiss": - docs_init = [docs.pop(0)] store = VectorCreator.create_vectorstore( settings.VECTOR_STORE, docs_init=docs_init, - source_id=str(source_id), + source_id=folder_name, embeddings_key=os.getenv("EMBEDDINGS_KEY"), ) else: store = VectorCreator.create_vectorstore( settings.VECTOR_STORE, - source_id=str(source_id), + source_id=source_id, embeddings_key=os.getenv("EMBEDDINGS_KEY"), ) store.delete_index() @@ -83,6 +82,5 @@ def embed_and_store_documents(docs, folder_name, source_id, task_status): # Save the vector store if settings.VECTOR_STORE == "faiss": - # For FAISS, save to the temporary folder first store.save_local(folder_name) logging.info("Vector store saved successfully.") diff --git a/application/vectorstore/faiss.py b/application/vectorstore/faiss.py index 5a38f966..87ffcccb 100644 --- a/application/vectorstore/faiss.py +++ b/application/vectorstore/faiss.py @@ -1,45 +1,35 @@ import os -import tempfile -import logging from langchain_community.vectorstores import FAISS from application.core.settings import settings from application.parser.schema.base import Document from application.vectorstore.base import BaseVectorStore -from application.storage.storage_creator import StorageCreator -def get_vectorstore_path(source_id: str) -> str: - if source_id: - clean_id = source_id.replace("application/indexes/", "").rstrip("/") - return f"indexes/{clean_id}" +def get_vectorstore(path: str) -> str: + if path: + vectorstore = os.path.join("application", "indexes", path) else: - return "indexes" + vectorstore = os.path.join("application") + return vectorstore + class FaissStore(BaseVectorStore): def __init__(self, source_id: str, embeddings_key: str, docs_init=None): super().__init__() self.source_id = source_id - self.storage = StorageCreator.get_storage() - self.storage_path = get_vectorstore_path(source_id) + self.path = get_vectorstore(source_id) self.embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key) try: if docs_init: self.docsearch = FAISS.from_documents(docs_init, self.embeddings) else: - if self.storage.__class__.__name__ == "LocalStorage": - # For local storage, we can use the path directly - local_path = self.storage._get_full_path(self.storage_path) - self.docsearch = FAISS.load_local( - local_path, self.embeddings, allow_dangerous_deserialization=True - ) - else: - # For non-local storage (S3, etc.), download files to temp directory first - self.docsearch = self._load_from_remote_storage() - except Exception as e: - logging.error(f"Error initializing FAISS store: {e}") + self.docsearch = FAISS.load_local( + self.path, self.embeddings, allow_dangerous_deserialization=True + ) + except Exception: raise self.assert_embedding_dimensions(self.embeddings) @@ -50,26 +40,8 @@ class FaissStore(BaseVectorStore): def add_texts(self, *args, **kwargs): return self.docsearch.add_texts(*args, **kwargs) - def save_local(self, folder_path=None): - path_to_use = folder_path or self.storage_path - - if folder_path or self.storage.__class__.__name__ == "LocalStorage": - # If it's a local path or temp dir, save directly - local_path = path_to_use - if self.storage.__class__.__name__ == "LocalStorage" and not folder_path: - local_path = self.storage._get_full_path(path_to_use) - - os.makedirs(os.path.dirname(local_path) if os.path.dirname(local_path) else local_path, exist_ok=True) - - self.docsearch.save_local(local_path) - - if folder_path and self.storage.__class__.__name__ != "LocalStorage": - self._upload_index_to_remote(folder_path) - else: - # For remote storage, save to temp dir first, then upload - with tempfile.TemporaryDirectory() as temp_dir: - self.docsearch.save_local(temp_dir) - self._upload_index_to_remote(temp_dir) + def save_local(self, *args, **kwargs): + return self.docsearch.save_local(*args, **kwargs) def delete_index(self, *args, **kwargs): return self.docsearch.delete(*args, **kwargs) @@ -108,62 +80,10 @@ class FaissStore(BaseVectorStore): metadata = metadata or {} doc = Document(text=text, extra_info=metadata).to_langchain_format() doc_id = self.docsearch.add_documents([doc]) - self.save_local() + self.save_local(self.path) return doc_id def delete_chunk(self, chunk_id): self.delete_index([chunk_id]) - self.save_local() + self.save_local(self.path) return True - - def _load_from_remote_storage(self): - with tempfile.TemporaryDirectory() as temp_dir: - try: - # Check if both index files exist in remote storage - faiss_path = f"{self.storage_path}/index.faiss" - pkl_path = f"{self.storage_path}/index.pkl" - - if not self.storage.file_exists(faiss_path) or not self.storage.file_exists(pkl_path): - raise FileNotFoundError(f"FAISS index files not found at {self.storage_path}") - - # Download both files to temp directory - faiss_file = self.storage.get_file(faiss_path) - pkl_file = self.storage.get_file(pkl_path) - - local_faiss_path = os.path.join(temp_dir, "index.faiss") - local_pkl_path = os.path.join(temp_dir, "index.pkl") - - with open(local_faiss_path, 'wb') as f: - f.write(faiss_file.read()) - - with open(local_pkl_path, 'wb') as f: - f.write(pkl_file.read()) - - # Load the index from the temp directory - return FAISS.load_local( - temp_dir, self.embeddings, allow_dangerous_deserialization=True - ) - except Exception as e: - logging.error(f"Error loading FAISS index from remote storage: {e}") - raise - - def _upload_index_to_remote(self, local_folder): - try: - # Get paths to the index files - local_faiss_path = os.path.join(local_folder, "index.faiss") - local_pkl_path = os.path.join(local_folder, "index.pkl") - - remote_faiss_path = f"{self.storage_path}/index.faiss" - remote_pkl_path = f"{self.storage_path}/index.pkl" - - # Upload both files to remote storage - with open(local_faiss_path, 'rb') as f: - self.storage.save_file(f, remote_faiss_path) - - with open(local_pkl_path, 'rb') as f: - self.storage.save_file(f, remote_pkl_path) - - logging.info(f"Successfully uploaded FAISS index to {self.storage_path}") - except Exception as e: - logging.error(f"Error uploading FAISS index to remote storage: {e}") - raise diff --git a/application/worker.py b/application/worker.py index 5d32cf66..f8076260 100755 --- a/application/worker.py +++ b/application/worker.py @@ -89,12 +89,9 @@ def download_file(url, params, dest_path): def upload_index(full_path, file_data): try: if settings.VECTOR_STORE == "faiss": - faiss_path = os.path.join(full_path, "index.faiss") - pkl_path = os.path.join(full_path, "index.pkl") - files = { - "file_faiss": open(faiss_path, "rb"), - "file_pkl": open(pkl_path, "rb"), + "file_faiss": open(full_path + "/index.faiss", "rb"), + "file_pkl": open(full_path + "/index.pkl", "rb"), } response = requests.post( urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data @@ -107,11 +104,8 @@ def upload_index(full_path, file_data): except requests.RequestException as e: logging.error(f"Error uploading index: {e}") raise - except FileNotFoundError as e: - logging.error(f"File not found: {e}") - raise finally: - if settings.VECTOR_STORE == "faiss" and 'files' in locals(): + if settings.VECTOR_STORE == "faiss": for file in files.values(): file.close() @@ -143,23 +137,23 @@ def ingest_worker( storage = StorageCreator.create_storage(settings.STORAGE_TYPE) temp_dir = tempfile.mkdtemp() full_path = os.path.join(temp_dir, name_job) - + if not os.path.exists(full_path): os.makedirs(full_path) logging.info(f"Ingest file: {directory}/{user}/{name_job}/{filename}", extra={"user": user, "job": name_job}) file_data = {"name": name_job, "file": filename, "user": user} - + try: file_path = f"{directory}/{user}/{name_job}/{filename}" - + try: file_obj = storage.get_file(file_path) - + local_file_path = os.path.join(full_path, filename) with open(local_file_path, 'wb') as f: shutil.copyfileobj(file_obj, f) - + # check if file is .zip and extract it if filename.endswith(".zip"): extract_zip_recursive( @@ -194,7 +188,7 @@ def ingest_worker( vector_dir = os.path.join(temp_dir, "vector_store") os.makedirs(vector_dir, exist_ok=True) - + embed_and_store_documents(docs, vector_dir, str(id), self) tokens = count_tokens_docs(docs) self.update_state(state="PROGRESS", meta={"current": 100}) @@ -209,11 +203,11 @@ def ingest_worker( "id": str(id), "type": "local", }) - + mongo = MongoDB.get_client() db = mongo["docsgpt"] sources_collection = db["sources"] - + sources_collection.insert_one({ "_id": id, "name": name_job, @@ -234,7 +228,7 @@ def ingest_worker( "user": user, "limited": False, } - + finally: if os.path.exists(temp_dir): shutil.rmtree(temp_dir)