From 2f9c72c1cfb3a828bdcf5ac76467f79b5af92bc0 Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 9 Sep 2024 15:46:18 +0100 Subject: [PATCH] feat: migrate store to source_id --- application/api/answer/routes.py | 4 ++-- application/api/internal/routes.py | 6 +++--- application/api/user/routes.py | 20 ++++++++++---------- application/parser/open_ai_func.py | 8 ++++---- application/vectorstore/elasticsearch.py | 10 +++++----- application/vectorstore/mongodb.py | 8 ++++---- application/vectorstore/qdrant.py | 6 +++--- application/worker.py | 2 -- frontend/src/api/endpoints.ts | 2 +- scripts/migrate_to_v1_vectorstore.py | 17 +++++++++++------ 10 files changed, 43 insertions(+), 40 deletions(-) diff --git a/application/api/answer/routes.py b/application/api/answer/routes.py index caca7c67..de9b8bb3 100644 --- a/application/api/answer/routes.py +++ b/application/api/answer/routes.py @@ -21,7 +21,7 @@ logger = logging.getLogger(__name__) mongo = MongoClient(settings.MONGO_URI) db = mongo["docsgpt"] conversations_collection = db["conversations"] -vectors_collection = db["vectors"] +sources_collection = db["sources"] prompts_collection = db["prompts"] api_key_collection = db["api_keys"] answer = Blueprint("answer", __name__) @@ -91,7 +91,7 @@ def get_data_from_api_key(api_key): def get_retriever(source_id: str): - doc = vectors_collection.find_one({"_id": ObjectId(source_id)}) + doc = sources_collection.find_one({"_id": ObjectId(source_id)}) if doc is None: raise Exception("Source document does not exist", 404) retriever_name = None if "retriever" not in doc else doc["retriever"] diff --git a/application/api/internal/routes.py b/application/api/internal/routes.py index f6eef4c4..cea6c8ca 100755 --- a/application/api/internal/routes.py +++ b/application/api/internal/routes.py @@ -9,7 +9,7 @@ from application.core.settings import settings mongo = MongoClient(settings.MONGO_URI) db = mongo["docsgpt"] conversations_collection = db["conversations"] -vectors_collection = db["vectors"] +sources_collection = db["sources"] current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) @@ -60,8 +60,8 @@ def upload_index_files(): os.makedirs(save_dir) file_faiss.save(os.path.join(save_dir, "index.faiss")) file_pkl.save(os.path.join(save_dir, "index.pkl")) - # create entry in vectors_collection - vectors_collection.insert_one( + # create entry in sources_collection + sources_collection.insert_one( { "_id": ObjectId(id), "user": user, diff --git a/application/api/user/routes.py b/application/api/user/routes.py index 2f422d4e..5ac6e741 100644 --- a/application/api/user/routes.py +++ b/application/api/user/routes.py @@ -17,7 +17,7 @@ from application.vectorstore.vector_creator import VectorCreator mongo = MongoClient(settings.MONGO_URI) db = mongo["docsgpt"] conversations_collection = db["conversations"] -vectors_collection = db["vectors"] +sources_collection = db["sources"] prompts_collection = db["prompts"] feedback_collection = db["feedback"] api_key_collection = db["api_keys"] @@ -106,7 +106,7 @@ def delete_by_ids(): return {"status": "error"} if settings.VECTOR_STORE == "faiss": - result = vectors_collection.delete_index(ids=ids) + result = sources_collection.delete_index(ids=ids) if result: return {"status": "ok"} return {"status": "error"} @@ -116,9 +116,9 @@ def delete_by_ids(): def delete_old(): """Delete old indexes.""" import shutil - path = request.args.get("path") - doc = vectors_collection.find_one({ - "_id": ObjectId(path), + source_id = request.args.get("source_id") + doc = sources_collection.find_one({ + "_id": ObjectId(source_id), "user": "local", }) if(doc is None): @@ -129,10 +129,10 @@ def delete_old(): except FileNotFoundError: pass else: - vetorstore = VectorCreator.create_vectorstore(settings.VECTOR_STORE, path=str(doc["_id"])) + vetorstore = VectorCreator.create_vectorstore(settings.VECTOR_STORE, source_id=str(doc["_id"])) vetorstore.delete_index() - vectors_collection.delete_one({ - "_id": ObjectId(path), + sources_collection.delete_one({ + "_id": ObjectId(source_id), }) return {"status": "ok"} @@ -244,8 +244,8 @@ def combined_json(): } ] # structure: name, language, version, description, fullName, date, docLink - # append data from vectors_collection in sorted order in descending order of date - for index in vectors_collection.find({"user": user}).sort("date", -1): + # append data from sources_collection in sorted order in descending order of date + for index in sources_collection.find({"user": user}).sort("date", -1): data.append( { "id": str(index["_id"]), diff --git a/application/parser/open_ai_func.py b/application/parser/open_ai_func.py index 30daee2e..84f92db9 100755 --- a/application/parser/open_ai_func.py +++ b/application/parser/open_ai_func.py @@ -12,8 +12,8 @@ from retry import retry @retry(tries=10, delay=60) def store_add_texts_with_retry(store, i, id): - # add store to the metadata - i.metadata["store"] = str(id) + # add source_id to the metadata + i.metadata["source_id"] = str(id) store.add_texts([i.page_content], metadatas=[i.metadata]) # store_pine.add_texts([i.page_content], metadatas=[i.metadata]) @@ -34,13 +34,13 @@ def call_openai_api(docs, folder_name, id, task_status): store = VectorCreator.create_vectorstore( settings.VECTOR_STORE, docs_init=docs_init, - path=f"{folder_name}", + source_id=f"{folder_name}", embeddings_key=os.getenv("EMBEDDINGS_KEY"), ) else: store = VectorCreator.create_vectorstore( settings.VECTOR_STORE, - path=str(id), + source_id=str(id), embeddings_key=os.getenv("EMBEDDINGS_KEY"), ) # Uncomment for MPNet embeddings diff --git a/application/vectorstore/elasticsearch.py b/application/vectorstore/elasticsearch.py index 061292b0..e393e4a5 100644 --- a/application/vectorstore/elasticsearch.py +++ b/application/vectorstore/elasticsearch.py @@ -9,9 +9,9 @@ import elasticsearch class ElasticsearchStore(BaseVectorStore): _es_connection = None # Class attribute to hold the Elasticsearch connection - def __init__(self, path, embeddings_key, index_name=settings.ELASTIC_INDEX): + def __init__(self, source_id, embeddings_key, index_name=settings.ELASTIC_INDEX): super().__init__() - self.path = path.replace("application/indexes/", "").rstrip("/") + self.source_id = source_id.replace("application/indexes/", "").rstrip("/") self.embeddings_key = embeddings_key self.index_name = index_name @@ -81,7 +81,7 @@ class ElasticsearchStore(BaseVectorStore): embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, self.embeddings_key) vector = embeddings.embed_query(question) knn = { - "filter": [{"match": {"metadata.store.keyword": self.path}}], + "filter": [{"match": {"metadata.source_id.keyword": self.source_id}}], "field": "vector", "k": k, "num_candidates": 100, @@ -100,7 +100,7 @@ class ElasticsearchStore(BaseVectorStore): } } ], - "filter": [{"match": {"metadata.store.keyword": self.path}}], + "filter": [{"match": {"metadata.source_id.keyword": self.source_id}}], } }, "rank": {"rrf": {}}, @@ -209,4 +209,4 @@ class ElasticsearchStore(BaseVectorStore): def delete_index(self): self._es_connection.delete_by_query(index=self.index_name, query={"match": { - "metadata.store.keyword": self.path}},) + "metadata.source_id.keyword": self.source_id}},) diff --git a/application/vectorstore/mongodb.py b/application/vectorstore/mongodb.py index 337fc41f..32bca489 100644 --- a/application/vectorstore/mongodb.py +++ b/application/vectorstore/mongodb.py @@ -5,7 +5,7 @@ from application.vectorstore.document_class import Document class MongoDBVectorStore(BaseVectorStore): def __init__( self, - path: str = "", + source_id: str = "", embeddings_key: str = "embeddings", collection: str = "documents", index_name: str = "vector_search_index", @@ -18,7 +18,7 @@ class MongoDBVectorStore(BaseVectorStore): self._embedding_key = embedding_key self._embeddings_key = embeddings_key self._mongo_uri = settings.MONGO_URI - self._path = path.replace("application/indexes/", "").rstrip("/") + self._source_id = source_id.replace("application/indexes/", "").rstrip("/") self._embedding = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key) try: @@ -46,7 +46,7 @@ class MongoDBVectorStore(BaseVectorStore): "numCandidates": k * 10, "index": self._index_name, "filter": { - "store": {"$eq": self._path} + "source_id": {"$eq": self._source_id} } } } @@ -123,4 +123,4 @@ class MongoDBVectorStore(BaseVectorStore): return result_ids def delete_index(self, *args, **kwargs): - self._collection.delete_many({"store": self._path}) \ No newline at end of file + self._collection.delete_many({"source_id": self._source_id}) \ No newline at end of file diff --git a/application/vectorstore/qdrant.py b/application/vectorstore/qdrant.py index 482d06a1..3f94505f 100644 --- a/application/vectorstore/qdrant.py +++ b/application/vectorstore/qdrant.py @@ -5,12 +5,12 @@ from qdrant_client import models class QdrantStore(BaseVectorStore): - def __init__(self, path: str = "", embeddings_key: str = "embeddings"): + def __init__(self, source_id: str = "", embeddings_key: str = "embeddings"): self._filter = models.Filter( must=[ models.FieldCondition( - key="metadata.store", - match=models.MatchValue(value=path.replace("application/indexes/", "").rstrip("/")), + key="metadata.source_id", + match=models.MatchValue(value=source_id.replace("application/indexes/", "").rstrip("/")), ) ] ) diff --git a/application/worker.py b/application/worker.py index 7abf0a02..40e66431 100755 --- a/application/worker.py +++ b/application/worker.py @@ -145,7 +145,6 @@ def ingest_worker(self, directory, formats, name_job, filename, user, retriever= "file_pkl": open(full_path + "/index.pkl", "rb"), } response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data) - response = requests.get(urljoin(settings.API_URL, "/api/delete_old?name=" + name_job + "&?user=" + user)) else: response = requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data) @@ -197,7 +196,6 @@ def remote_worker(self, source_data, name_job, user, loader, directory="temp", r } requests.post(urljoin(settings.API_URL, "/api/upload_index"), files=files, data=file_data) - requests.get(urljoin(settings.API_URL, "/api/delete_old?name=" + name_job + "&?user=" + user)) else: requests.post(urljoin(settings.API_URL, "/api/upload_index"), data=file_data) diff --git a/frontend/src/api/endpoints.ts b/frontend/src/api/endpoints.ts index af2fb920..c06ac3d2 100644 --- a/frontend/src/api/endpoints.ts +++ b/frontend/src/api/endpoints.ts @@ -10,7 +10,7 @@ const endpoints = { DELETE_PROMPT: '/api/delete_prompt', UPDATE_PROMPT: '/api/update_prompt', SINGLE_PROMPT: (id: string) => `/api/get_single_prompt?id=${id}`, - DELETE_PATH: (docPath: string) => `/api/delete_old?path=${docPath}`, + DELETE_PATH: (docPath: string) => `/api/delete_old?source_id=${docPath}`, TASK_STATUS: (task_id: string) => `/api/task_status?task_id=${task_id}`, }, CONVERSATION: { diff --git a/scripts/migrate_to_v1_vectorstore.py b/scripts/migrate_to_v1_vectorstore.py index 3a5a82f0..9a709795 100644 --- a/scripts/migrate_to_v1_vectorstore.py +++ b/scripts/migrate_to_v1_vectorstore.py @@ -5,6 +5,7 @@ def migrate_to_v1_vectorstore_mongo(): client = pymongo.MongoClient("mongodb://localhost:27017/") db = client["docsgpt"] vectors_collection = db["vectors"] + sources_collection = db["sources"] for vector in vectors_collection.find(): if "location" in vector: @@ -14,6 +15,12 @@ def migrate_to_v1_vectorstore_mongo(): vector["remote_data"] = None vectors_collection.update_one({"_id": vector["_id"]}, {"$set": vector}) + # move data from vectors_collection to sources_collection + for vector in vectors_collection.find(): + sources_collection.insert_one(vector) + + vectors_collection.drop() + client.close() def migrate_faiss_to_v1_vectorstore(): @@ -36,13 +43,11 @@ def migrate_mongo_atlas_vector_to_v1_vectorstore(): db = client["docsgpt"] vectors_collection = db["vectors"] + # mongodb atlas collection + documents_collection = db["documents"] + for vector in vectors_collection.find(): - if "location" in vector: - del vector["location"] - if "retriever" not in vector: - vector["retriever"] = "classic" - vector["remote_data"] = None - vectors_collection.update_one({"_id": vector["_id"]}, {"$set": vector}) + documents_collection.update_many({"store": vector["user"] + "/" + vector["name"]}, {"$set": {"source_id": str(vector["_id"])}}) client.close()