Revert "(fix:indexes) look for the right path"
This reverts commit 5ad34e2216.
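In effect, the revert swaps a storage-backed index-path helper for the original local-filesystem helper. A minimal sketch of the difference, using both helpers as reconstructed from the FaissStore hunk below ("abc123" is a hypothetical source id; output shown for a POSIX filesystem):

import os

def get_vectorstore_path(source_id: str) -> str:  # removed by this revert
    if source_id:
        clean_id = source_id.replace("application/indexes/", "").rstrip("/")
        return f"indexes/{clean_id}"
    return "indexes"

def get_vectorstore(path: str) -> str:  # restored by this revert
    if path:
        vectorstore = os.path.join("application", "indexes", path)
    else:
        vectorstore = os.path.join("application")
    return vectorstore

print(get_vectorstore_path("application/indexes/abc123/"))  # indexes/abc123
print(get_vectorstore("abc123"))                            # application/indexes/abc123

The removed helper returned a key relative to the configured storage backend; the restored one returns a path relative to the repository root on local disk.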
@@ -1,6 +1,5 @@
 import os
 import datetime
-import logging
 from flask import Blueprint, request, send_from_directory
 from werkzeug.utils import secure_filename
 from bson.objectid import ObjectId
@@ -47,7 +46,7 @@ def upload_index_files():
     sync_frequency = secure_filename(request.form["sync_frequency"]) if "sync_frequency" in request.form else None
+
     storage = StorageCreator.create_storage(settings.STORAGE_TYPE)
-
 
     if settings.VECTOR_STORE == "faiss":
         if "file_faiss" not in request.files:
             print("No file part")
@@ -61,18 +60,13 @@ def upload_index_files():
         file_pkl = request.files["file_pkl"]
         if file_pkl.filename == "":
             return {"status": "no file name"}
 
         # Save index files
         storage_path_faiss = f"indexes/{str(id)}/index.faiss"
         storage_path_pkl = f"indexes/{str(id)}/index.pkl"
 
-        try:
-            storage.save_file(file_faiss, storage_path_faiss)
-            storage.save_file(file_pkl, storage_path_pkl)
-            logging.info(f"Successfully saved FAISS index files for ID {id}")
-        except Exception as e:
-            logging.error(f"Error saving FAISS index files: {e}")
-            return {"status": "error", "message": str(e)}
+        storage.save_file(file_faiss, storage_path_faiss)
+        storage.save_file(file_pkl, storage_path_pkl)
 
         existing_entry = sources_collection.find_one({"_id": ObjectId(id)})
         if existing_entry:
@@ -42,18 +42,17 @@ def embed_and_store_documents(docs, folder_name, source_id, task_status):
 
     # Initialize vector store
     if settings.VECTOR_STORE == "faiss":
-
         docs_init = [docs.pop(0)]
         store = VectorCreator.create_vectorstore(
             settings.VECTOR_STORE,
             docs_init=docs_init,
-            source_id=str(source_id),
+            source_id=folder_name,
             embeddings_key=os.getenv("EMBEDDINGS_KEY"),
         )
     else:
         store = VectorCreator.create_vectorstore(
             settings.VECTOR_STORE,
-            source_id=str(source_id),
+            source_id=source_id,
             embeddings_key=os.getenv("EMBEDDINGS_KEY"),
         )
     store.delete_index()
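One subtlety in the restored FAISS branch: source_id=folder_name passes an absolute temp-directory path (vector_dir in the ingest_worker hunk further down), and os.path.join discards everything before an absolute component, so the restored get_vectorstore helper resolves straight to the temp folder. A quick illustration ("/tmp/job/vector_store" is a hypothetical path):

import os

# An absolute final component makes os.path.join drop the preceding parts.
print(os.path.join("application", "indexes", "/tmp/job/vector_store"))
# -> /tmp/job/vector_store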
@@ -83,6 +82,5 @@ def embed_and_store_documents(docs, folder_name, source_id, task_status):
 
     # Save the vector store
     if settings.VECTOR_STORE == "faiss":
-        # For FAISS, save to the temporary folder first
         store.save_local(folder_name)
     logging.info("Vector store saved successfully.")
@@ -1,45 +1,35 @@
 import os
-import tempfile
 import logging
 
 from langchain_community.vectorstores import FAISS
 
 from application.core.settings import settings
 from application.parser.schema.base import Document
 from application.vectorstore.base import BaseVectorStore
-from application.storage.storage_creator import StorageCreator
 
 
-def get_vectorstore_path(source_id: str) -> str:
-    if source_id:
-        clean_id = source_id.replace("application/indexes/", "").rstrip("/")
-        return f"indexes/{clean_id}"
-    return "indexes"
+def get_vectorstore(path: str) -> str:
+    if path:
+        vectorstore = os.path.join("application", "indexes", path)
+    else:
+        vectorstore = os.path.join("application")
+    return vectorstore
 
 
 class FaissStore(BaseVectorStore):
     def __init__(self, source_id: str, embeddings_key: str, docs_init=None):
         super().__init__()
-        self.source_id = source_id
-        self.storage = StorageCreator.get_storage()
-        self.storage_path = get_vectorstore_path(source_id)
+        self.path = get_vectorstore(source_id)
         self.embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key)
 
         try:
             if docs_init:
                 self.docsearch = FAISS.from_documents(docs_init, self.embeddings)
             else:
-                if self.storage.__class__.__name__ == "LocalStorage":
-                    # For local storage, we can use the path directly
-                    local_path = self.storage._get_full_path(self.storage_path)
-                    self.docsearch = FAISS.load_local(
-                        local_path, self.embeddings, allow_dangerous_deserialization=True
-                    )
-                else:
-                    # For non-local storage (S3, etc.), download files to temp directory first
-                    self.docsearch = self._load_from_remote_storage()
-        except Exception as e:
-            logging.error(f"Error initializing FAISS store: {e}")
+                self.docsearch = FAISS.load_local(
+                    self.path, self.embeddings, allow_dangerous_deserialization=True
+                )
+        except Exception:
             raise
 
         self.assert_embedding_dimensions(self.embeddings)
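Note that the restored handler is a pass-through: a bare except Exception: raise re-raises the original exception unchanged, so it behaves exactly like having no handler at all, whereas the removed version at least logged the failure before re-raising. A minimal demonstration:

def load():
    raise RuntimeError("index missing")  # stand-in for a failing FAISS.load_local

def load_with_passthrough():
    try:
        load()
    except Exception:
        raise  # re-raises unchanged

for fn in (load, load_with_passthrough):
    try:
        fn()
    except RuntimeError as e:
        print(type(e).__name__, e)  # identical outcome either way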
@@ -50,26 +40,8 @@ class FaissStore(BaseVectorStore):
     def add_texts(self, *args, **kwargs):
         return self.docsearch.add_texts(*args, **kwargs)
 
-    def save_local(self, folder_path=None):
-        path_to_use = folder_path or self.storage_path
-
-        if folder_path or self.storage.__class__.__name__ == "LocalStorage":
-            # If it's a local path or temp dir, save directly
-            local_path = path_to_use
-            if self.storage.__class__.__name__ == "LocalStorage" and not folder_path:
-                local_path = self.storage._get_full_path(path_to_use)
-
-            os.makedirs(os.path.dirname(local_path) if os.path.dirname(local_path) else local_path, exist_ok=True)
-
-            self.docsearch.save_local(local_path)
-
-            if folder_path and self.storage.__class__.__name__ != "LocalStorage":
-                self._upload_index_to_remote(folder_path)
-        else:
-            # For remote storage, save to temp dir first, then upload
-            with tempfile.TemporaryDirectory() as temp_dir:
-                self.docsearch.save_local(temp_dir)
-                self._upload_index_to_remote(temp_dir)
+    def save_local(self, *args, **kwargs):
+        return self.docsearch.save_local(*args, **kwargs)
 
     def delete_index(self, *args, **kwargs):
         return self.docsearch.delete(*args, **kwargs)
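With save_local reduced to a delegate, callers must name the target folder themselves, which is why the next hunk switches add_chunk and delete_chunk back to self.save_local(self.path). A stand-in sketch of the restored call shape (DocSearchStub, FaissStoreAfterRevert, and the id are illustrative, not the real classes):

class DocSearchStub:
    def save_local(self, folder_path):
        print(f"FAISS.save_local -> {folder_path}")

class FaissStoreAfterRevert:  # minimal stand-in for the reverted FaissStore
    def __init__(self):
        self.docsearch = DocSearchStub()
        self.path = "application/indexes/abc123"  # hypothetical resolved path

    def save_local(self, *args, **kwargs):  # restored passthrough
        return self.docsearch.save_local(*args, **kwargs)

store = FaissStoreAfterRevert()
store.save_local(store.path)  # the caller supplies the folder explicitly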
@@ -108,62 +80,10 @@ class FaissStore(BaseVectorStore):
         metadata = metadata or {}
         doc = Document(text=text, extra_info=metadata).to_langchain_format()
         doc_id = self.docsearch.add_documents([doc])
-        self.save_local()
+        self.save_local(self.path)
         return doc_id
 
     def delete_chunk(self, chunk_id):
         self.delete_index([chunk_id])
-        self.save_local()
+        self.save_local(self.path)
         return True
-
-    def _load_from_remote_storage(self):
-        with tempfile.TemporaryDirectory() as temp_dir:
-            try:
-                # Check if both index files exist in remote storage
-                faiss_path = f"{self.storage_path}/index.faiss"
-                pkl_path = f"{self.storage_path}/index.pkl"
-
-                if not self.storage.file_exists(faiss_path) or not self.storage.file_exists(pkl_path):
-                    raise FileNotFoundError(f"FAISS index files not found at {self.storage_path}")
-
-                # Download both files to temp directory
-                faiss_file = self.storage.get_file(faiss_path)
-                pkl_file = self.storage.get_file(pkl_path)
-
-                local_faiss_path = os.path.join(temp_dir, "index.faiss")
-                local_pkl_path = os.path.join(temp_dir, "index.pkl")
-
-                with open(local_faiss_path, 'wb') as f:
-                    f.write(faiss_file.read())
-
-                with open(local_pkl_path, 'wb') as f:
-                    f.write(pkl_file.read())
-
-                # Load the index from the temp directory
-                return FAISS.load_local(
-                    temp_dir, self.embeddings, allow_dangerous_deserialization=True
-                )
-            except Exception as e:
-                logging.error(f"Error loading FAISS index from remote storage: {e}")
-                raise
-
-    def _upload_index_to_remote(self, local_folder):
-        try:
-            # Get paths to the index files
-            local_faiss_path = os.path.join(local_folder, "index.faiss")
-            local_pkl_path = os.path.join(local_folder, "index.pkl")
-
-            remote_faiss_path = f"{self.storage_path}/index.faiss"
-            remote_pkl_path = f"{self.storage_path}/index.pkl"
-
-            # Upload both files to remote storage
-            with open(local_faiss_path, 'rb') as f:
-                self.storage.save_file(f, remote_faiss_path)
-
-            with open(local_pkl_path, 'rb') as f:
-                self.storage.save_file(f, remote_pkl_path)
-
-            logging.info(f"Successfully uploaded FAISS index to {self.storage_path}")
-        except Exception as e:
-            logging.error(f"Error uploading FAISS index to remote storage: {e}")
-            raise
@@ -89,12 +89,9 @@ def download_file(url, params, dest_path):
 def upload_index(full_path, file_data):
     try:
         if settings.VECTOR_STORE == "faiss":
-            faiss_path = os.path.join(full_path, "index.faiss")
-            pkl_path = os.path.join(full_path, "index.pkl")
-
             files = {
-                "file_faiss": open(faiss_path, "rb"),
-                "file_pkl": open(pkl_path, "rb"),
+                "file_faiss": open(full_path + "/index.faiss", "rb"),
+                "file_pkl": open(full_path + "/index.pkl", "rb"),
             }
         response = requests.post(
             urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data
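The reverted lines also trade os.path.join for plain string concatenation when opening the index files. Both yield a working path on POSIX systems; the joined form was merely platform-neutral. A small comparison ("/tmp/job" is a hypothetical upload folder):

import os

full_path = "/tmp/job"
print(os.path.join(full_path, "index.faiss"))  # /tmp/job/index.faiss
print(full_path + "/index.faiss")              # /tmp/job/index.faiss (assumes '/' as separator)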
@@ -107,11 +104,8 @@ def upload_index(full_path, file_data):
     except requests.RequestException as e:
         logging.error(f"Error uploading index: {e}")
         raise
-    except FileNotFoundError as e:
-        logging.error(f"File not found: {e}")
-        raise
     finally:
-        if settings.VECTOR_STORE == "faiss" and 'files' in locals():
+        if settings.VECTOR_STORE == "faiss":
             for file in files.values():
                 file.close()
 
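Dropping the 'files' in locals() guard matters when the dict literal itself fails: if an open call raises, files is never bound, and the restored finally block then raises UnboundLocalError on top of the original error. A minimal reproduction of the case the removed guard covered (the missing path is hypothetical):

def upload_without_guard():
    try:
        files = {"file_faiss": open("missing/index.faiss", "rb")}  # raises FileNotFoundError
    finally:
        for f in files.values():  # 'files' was never assigned
            f.close()

try:
    upload_without_guard()
except UnboundLocalError as e:
    print(type(e).__name__)  # masks the original FileNotFoundError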
@@ -143,23 +137,23 @@ def ingest_worker(
     storage = StorageCreator.create_storage(settings.STORAGE_TYPE)
     temp_dir = tempfile.mkdtemp()
     full_path = os.path.join(temp_dir, name_job)
-
+
     if not os.path.exists(full_path):
         os.makedirs(full_path)
 
     logging.info(f"Ingest file: {directory}/{user}/{name_job}/{filename}", extra={"user": user, "job": name_job})
     file_data = {"name": name_job, "file": filename, "user": user}
-
+
     try:
         file_path = f"{directory}/{user}/{name_job}/{filename}"
-
+
         try:
             file_obj = storage.get_file(file_path)
-
+
            local_file_path = os.path.join(full_path, filename)
            with open(local_file_path, 'wb') as f:
                shutil.copyfileobj(file_obj, f)
-
+
            # check if file is .zip and extract it
            if filename.endswith(".zip"):
                extract_zip_recursive(
@@ -194,7 +188,7 @@ def ingest_worker(
 
         vector_dir = os.path.join(temp_dir, "vector_store")
         os.makedirs(vector_dir, exist_ok=True)
-
+
         embed_and_store_documents(docs, vector_dir, str(id), self)
         tokens = count_tokens_docs(docs)
         self.update_state(state="PROGRESS", meta={"current": 100})
@@ -209,11 +203,11 @@ def ingest_worker(
             "id": str(id),
             "type": "local",
         })
-
+
         mongo = MongoDB.get_client()
         db = mongo["docsgpt"]
         sources_collection = db["sources"]
-
+
         sources_collection.insert_one({
             "_id": id,
             "name": name_job,
@@ -234,7 +228,7 @@ def ingest_worker(
             "user": user,
             "limited": False,
         }
-
+
     finally:
         if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)