diff --git a/application/api/internal/routes.py b/application/api/internal/routes.py
index e95b6327..6ba07431 100755
--- a/application/api/internal/routes.py
+++ b/application/api/internal/routes.py
@@ -1,5 +1,6 @@
 import os
 import datetime
+import logging
 from flask import Blueprint, request, send_from_directory
 from werkzeug.utils import secure_filename
 from bson.objectid import ObjectId
@@ -46,7 +47,7 @@ def upload_index_files():
     sync_frequency = secure_filename(request.form["sync_frequency"]) if "sync_frequency" in request.form else None
 
     storage = StorageCreator.create_storage(settings.STORAGE_TYPE)
-    
+
     if settings.VECTOR_STORE == "faiss":
         if "file_faiss" not in request.files:
             print("No file part")
@@ -60,13 +61,19 @@ def upload_index_files():
         file_pkl = request.files["file_pkl"]
         if file_pkl.filename == "":
             return {"status": "no file name"}
-        
+
         # Save index files
         storage_path_faiss = f"indexes/{str(id)}/index.faiss"
         storage_path_pkl = f"indexes/{str(id)}/index.pkl"
-        
-        storage.save_file(file_faiss, storage_path_faiss)
-        storage.save_file(file_pkl, storage_path_pkl)
+
+        try:
+            storage.save_file(file_faiss, storage_path_faiss)
+            storage.save_file(file_pkl, storage_path_pkl)
+            logging.info(f"Successfully saved FAISS index files for ID {id}")
+        except Exception as e:
+            # The error is returned to the client instead of re-raised, so keep the traceback in the log
+            logging.error(f"Error saving FAISS index files: {e}", exc_info=True)
+            return {"status": "error", "message": str(e)}
 
     existing_entry = sources_collection.find_one({"_id": ObjectId(id)})
     if existing_entry:
diff --git a/application/parser/embedding_pipeline.py b/application/parser/embedding_pipeline.py
index 0435cd14..005d3756 100755
--- a/application/parser/embedding_pipeline.py
+++ b/application/parser/embedding_pipeline.py
@@ -42,17 +42,18 @@ def embed_and_store_documents(docs, folder_name, source_id, task_status):
 
     # Initialize vector store
     if settings.VECTOR_STORE == "faiss":
+        docs_init = [docs.pop(0)]
         store = VectorCreator.create_vectorstore(
             settings.VECTOR_STORE,
             docs_init=docs_init,
-            source_id=folder_name,
+            source_id=str(source_id),
             embeddings_key=os.getenv("EMBEDDINGS_KEY"),
         )
     else:
         store = VectorCreator.create_vectorstore(
             settings.VECTOR_STORE,
-            source_id=source_id,
+            source_id=str(source_id),
             embeddings_key=os.getenv("EMBEDDINGS_KEY"),
         )
         store.delete_index()
 
@@ -82,5 +83,6 @@ def embed_and_store_documents(docs, folder_name, source_id, task_status):
 
     # Save the vector store
     if settings.VECTOR_STORE == "faiss":
+        # For FAISS, save to the temporary folder first
         store.save_local(folder_name)
     logging.info("Vector store saved successfully.")
diff --git a/application/vectorstore/faiss.py b/application/vectorstore/faiss.py
index 87ffcccb..5a38f966 100644
--- a/application/vectorstore/faiss.py
+++ b/application/vectorstore/faiss.py
@@ -1,35 +1,51 @@
+import logging
 import os
+import shutil
+import tempfile
 
 from langchain_community.vectorstores import FAISS
 
 from application.core.settings import settings
 from application.parser.schema.base import Document
 from application.vectorstore.base import BaseVectorStore
+from application.storage.storage_creator import StorageCreator
 
 
-def get_vectorstore(path: str) -> str:
-    if path:
-        vectorstore = os.path.join("application", "indexes", path)
-    else:
-        vectorstore = os.path.join("application")
-    return vectorstore
+def get_vectorstore_path(source_id: str) -> str:
+    """Return the storage-relative directory holding a source's FAISS index.
+
+    Any ``application/indexes/`` segment and trailing slashes are stripped
+    from ``source_id``; an empty ``source_id`` maps to the ``indexes`` root.
+    """
+    if not source_id:
+        return "indexes"
+    clean_id = source_id.replace("application/indexes/", "").rstrip("/")
+    return f"indexes/{clean_id}"
 
 
 class FaissStore(BaseVectorStore):
     def __init__(self, source_id: str, embeddings_key: str, docs_init=None):
+        """Build a new index from ``docs_init`` or load the stored one for ``source_id``."""
         super().__init__()
         self.source_id = source_id
-        self.path = get_vectorstore(source_id)
+        self.storage = StorageCreator.get_storage()
+        self.storage_path = get_vectorstore_path(source_id)
         self.embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key)
 
         try:
             if docs_init:
                 self.docsearch = FAISS.from_documents(docs_init, self.embeddings)
+            elif self.storage.__class__.__name__ == "LocalStorage":
+                # Local storage: FAISS can read the index directory in place
+                local_path = self.storage._get_full_path(self.storage_path)
+                self.docsearch = FAISS.load_local(
+                    local_path, self.embeddings, allow_dangerous_deserialization=True
+                )
             else:
-                self.docsearch = FAISS.load_local(
-                    self.path, self.embeddings, allow_dangerous_deserialization=True
-                )
-        except Exception:
+                # Remote storage (S3, etc.): download the index files first
+                self.docsearch = self._load_from_remote_storage()
+        except Exception as e:
+            logging.error(f"Error initializing FAISS store: {e}")
             raise
 
         self.assert_embedding_dimensions(self.embeddings)
@@ -40,8 +56,29 @@ class FaissStore(BaseVectorStore):
     def add_texts(self, *args, **kwargs):
         return self.docsearch.add_texts(*args, **kwargs)
 
-    def save_local(self, *args, **kwargs):
-        return self.docsearch.save_local(*args, **kwargs)
+    def save_local(self, folder_path=None):
+        """Write the index to disk and mirror it to remote storage when needed.
+
+        Args:
+            folder_path: Directory to write the index files into. When
+                omitted, the index is saved to this store's storage path.
+        """
+        is_local_storage = self.storage.__class__.__name__ == "LocalStorage"
+
+        if not folder_path and not is_local_storage:
+            # No local destination: stage in a temp dir, then upload
+            with tempfile.TemporaryDirectory() as temp_dir:
+                self.docsearch.save_local(temp_dir)
+                self._upload_index_to_remote(temp_dir)
+            return
+
+        local_path = folder_path or self.storage._get_full_path(self.storage_path)
+        os.makedirs(local_path, exist_ok=True)
+        self.docsearch.save_local(local_path)
+
+        if not is_local_storage:
+            # An explicit folder was given with remote storage: upload from it
+            self._upload_index_to_remote(local_path)
 
     def delete_index(self, *args, **kwargs):
         return self.docsearch.delete(*args, **kwargs)
@@ -80,10 +117,53 @@ class FaissStore(BaseVectorStore):
         metadata = metadata or {}
         doc = Document(text=text, extra_info=metadata).to_langchain_format()
         doc_id = self.docsearch.add_documents([doc])
-        self.save_local(self.path)
+        self.save_local()
         return doc_id
 
     def delete_chunk(self, chunk_id):
         self.delete_index([chunk_id])
-        self.save_local(self.path)
+        self.save_local()
         return True
+
+    def _load_from_remote_storage(self):
+        """Download both index files from storage and load them from a temp copy.
+
+        Raises:
+            FileNotFoundError: If either index file is missing from storage.
+        """
+        faiss_path = f"{self.storage_path}/index.faiss"
+        pkl_path = f"{self.storage_path}/index.pkl"
+
+        try:
+            if not self.storage.file_exists(faiss_path) or not self.storage.file_exists(pkl_path):
+                raise FileNotFoundError(f"FAISS index files not found at {self.storage_path}")
+
+            with tempfile.TemporaryDirectory() as temp_dir:
+                for remote_path, filename in (
+                    (faiss_path, "index.faiss"),
+                    (pkl_path, "index.pkl"),
+                ):
+                    remote_file = self.storage.get_file(remote_path)
+                    with open(os.path.join(temp_dir, filename), "wb") as f:
+                        # Stream in chunks instead of reading the whole file into memory
+                        shutil.copyfileobj(remote_file, f)
+
+                # Load while the temporary copy still exists
+                return FAISS.load_local(
+                    temp_dir, self.embeddings, allow_dangerous_deserialization=True
+                )
+        except Exception as e:
+            logging.error(f"Error loading FAISS index from remote storage: {e}")
+            raise
+
+    def _upload_index_to_remote(self, local_folder):
+        """Upload ``index.faiss`` and ``index.pkl`` from ``local_folder`` to storage."""
+        try:
+            for filename in ("index.faiss", "index.pkl"):
+                with open(os.path.join(local_folder, filename), "rb") as f:
+                    self.storage.save_file(f, f"{self.storage_path}/{filename}")
+
+            logging.info(f"Successfully uploaded FAISS index to {self.storage_path}")
+        except Exception as e:
+            logging.error(f"Error uploading FAISS index to remote storage: {e}")
+            raise
diff --git a/application/worker.py b/application/worker.py
index f8076260..5d32cf66 100755
--- a/application/worker.py
+++ b/application/worker.py
@@ -89,9 +89,10 @@ def download_file(url, params, dest_path):
 def upload_index(full_path, file_data):
+    files = {}
     try:
         if settings.VECTOR_STORE == "faiss":
-            files = {
-                "file_faiss": open(full_path + "/index.faiss", "rb"),
-                "file_pkl": open(full_path + "/index.pkl", "rb"),
-            }
+            # Register each handle as soon as it is opened so ``finally`` can
+            # close it even when opening the second file fails
+            files["file_faiss"] = open(os.path.join(full_path, "index.faiss"), "rb")
+            files["file_pkl"] = open(os.path.join(full_path, "index.pkl"), "rb")
             response = requests.post(
                 urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
@@ -104,8 +105,11 @@ def upload_index(full_path, file_data):
     except requests.RequestException as e:
         logging.error(f"Error uploading index: {e}")
         raise
+    except FileNotFoundError as e:
+        logging.error(f"File not found: {e}")
+        raise
     finally:
         if settings.VECTOR_STORE == "faiss":
             for file in files.values():
                 file.close()
 
@@ -137,23 +141,23 @@ def ingest_worker(
     storage = StorageCreator.create_storage(settings.STORAGE_TYPE)
     temp_dir = tempfile.mkdtemp()
     full_path = os.path.join(temp_dir, name_job)
-    
+
     if not os.path.exists(full_path):
         os.makedirs(full_path)
 
     logging.info(f"Ingest file: {directory}/{user}/{name_job}/{filename}", extra={"user": user, "job": name_job})
     file_data = {"name": name_job, "file": filename, "user": user}
-    
+
     try:
         file_path = f"{directory}/{user}/{name_job}/{filename}"
-        
+
         try:
             file_obj = storage.get_file(file_path)
-            
+
             local_file_path = os.path.join(full_path, filename)
             with open(local_file_path, 'wb') as f:
                 shutil.copyfileobj(file_obj, f)
-            
+
             # check if file is .zip and extract it
             if filename.endswith(".zip"):
                 extract_zip_recursive(
@@ -188,7 +192,7 @@ def ingest_worker(
 
             vector_dir = os.path.join(temp_dir, "vector_store")
             os.makedirs(vector_dir, exist_ok=True)
-            
+
             embed_and_store_documents(docs, vector_dir, str(id), self)
             tokens = count_tokens_docs(docs)
             self.update_state(state="PROGRESS", meta={"current": 100})
@@ -203,11 +207,11 @@ def ingest_worker(
                 "id": str(id),
                 "type": "local",
             })
-            
+
             mongo = MongoDB.get_client()
             db = mongo["docsgpt"]
             sources_collection = db["sources"]
-            
+
             sources_collection.insert_one({
                 "_id": id,
                 "name": name_job,
@@ -228,7 +232,7 @@ def ingest_worker(
             "user": user,
             "limited": False,
         }
-    
+
     finally:
         if os.path.exists(temp_dir):
             shutil.rmtree(temp_dir)