feat: view and add document chunks for mongodb and faiss

This commit is contained in:
Siddhant Rai
2025-02-07 19:39:07 +05:30
parent f97b56a87b
commit 0379b81d43
11 changed files with 558 additions and 56 deletions

View File

@@ -1478,7 +1478,7 @@ class GetFeedbackAnalytics(Resource):
)
except Exception as err:
return make_response(jsonify({"success": False, "error": str(err)}), 400)
end_date = datetime.datetime.now(datetime.timezone.utc)
if filter_option == "last_hour":
@@ -1525,11 +1525,8 @@ class GetFeedbackAnalytics(Resource):
{"$match": {"queries.feedback": {"$exists": True}}},
{
"$group": {
"_id": {
"time": date_field,
"feedback": "$queries.feedback"
},
"count": {"$sum": 1}
"_id": {"time": date_field, "feedback": "$queries.feedback"},
"count": {"$sum": 1},
}
},
{
@@ -1540,7 +1537,7 @@ class GetFeedbackAnalytics(Resource):
"$cond": [
{"$eq": ["$_id.feedback", "LIKE"]},
"$count",
0
0,
]
}
},
@@ -1549,13 +1546,13 @@ class GetFeedbackAnalytics(Resource):
"$cond": [
{"$eq": ["$_id.feedback", "DISLIKE"]},
"$count",
0
0,
]
}
}
},
}
},
{"$sort": {"_id": 1}}
{"$sort": {"_id": 1}},
]
feedback_data = conversations_collection.aggregate(pipeline)
@@ -1574,7 +1571,7 @@ class GetFeedbackAnalytics(Resource):
for entry in feedback_data:
daily_feedback[entry["_id"]] = {
"positive": entry["positive"],
"negative": entry["negative"]
"negative": entry["negative"],
}
except Exception as err:
@@ -2031,3 +2028,128 @@ class DeleteTool(Resource):
return {"success": False, "error": str(err)}, 400
return {"success": True}, 200
def get_vector_store(source_id):
"""
Get the Vector Store
Args:
source_id (str): source id of the document
"""
store = VectorCreator.create_vectorstore(
settings.VECTOR_STORE,
source_id=source_id,
embeddings_key=os.getenv("EMBEDDINGS_KEY"),
)
return store
@user_ns.route("/api/get_chunks")
class GetChunks(Resource):
@api.doc(
description="Retrieves all chunks associated with a document",
params={"id": "The document ID"},
)
def get(self):
doc_id = request.args.get("id")
page = int(request.args.get("page", 1))
per_page = int(request.args.get("per_page", 10))
if not ObjectId.is_valid(doc_id):
return make_response(jsonify({"error": "Invalid doc_id"}), 400)
try:
store = get_vector_store(doc_id)
chunks = store.get_chunks()
total_chunks = len(chunks)
start = (page - 1) * per_page
end = start + per_page
paginated_chunks = chunks[start:end]
return make_response(
jsonify(
{
"page": page,
"per_page": per_page,
"total": total_chunks,
"chunks": paginated_chunks,
}
),
200,
)
except Exception as e:
return make_response(jsonify({"error": str(e)}), 500)
@user_ns.route("/api/add_chunk")
class AddChunk(Resource):
@api.expect(
api.model(
"AddChunkModel",
{
"id": fields.String(required=True, description="Document ID"),
"text": fields.String(required=True, description="Text of the chunk"),
"metadata": fields.Raw(
required=False,
description="Metadata associated with the chunk",
),
},
)
)
@api.doc(
description="Adds a new chunk to the document",
)
def post(self):
data = request.get_json()
required_fields = ["id", "text"]
missing_fields = check_required_fields(data, required_fields)
if missing_fields:
return missing_fields
doc_id = data.get("id")
text = data.get("text")
metadata = data.get("metadata", {})
if not ObjectId.is_valid(doc_id):
return make_response(jsonify({"error": "Invalid doc_id"}), 400)
try:
store = get_vector_store(doc_id)
chunk_id = store.add_chunk(text, metadata)
return make_response(
jsonify({"message": "Chunk added successfully", "chunk_id": chunk_id}),
201,
)
except Exception as e:
return make_response(jsonify({"error": str(e)}), 500)
@user_ns.route("/api/delete_chunk")
class DeleteChunk(Resource):
@api.doc(
description="Deletes a specific chunk from the document.",
params={"id": "The document ID", "chunk_id": "The ID of the chunk to delete"},
)
def delete(self):
doc_id = request.args.get("id")
chunk_id = request.args.get("chunk_id")
if not ObjectId.is_valid(doc_id):
return make_response(jsonify({"error": "Invalid doc_id"}), 400)
try:
store = get_vector_store(doc_id)
deleted = store.delete_chunk(chunk_id)
if deleted:
return make_response(
jsonify({"message": "Chunk deleted successfully"}), 200
)
else:
return make_response(
jsonify({"message": "Chunk not found or could not be deleted"}),
404,
)
except Exception as e:
return make_response(jsonify({"error": str(e)}), 500)

View File

@@ -1,8 +1,12 @@
from langchain_community.vectorstores import FAISS
from application.vectorstore.base import BaseVectorStore
from application.core.settings import settings
import os
from langchain_community.vectorstores import FAISS
from application.core.settings import settings
from application.parser.schema.base import Document
from application.vectorstore.base import BaseVectorStore
def get_vectorstore(path: str) -> str:
if path:
vectorstore = os.path.join("application", "indexes", path)
@@ -10,9 +14,11 @@ def get_vectorstore(path: str) -> str:
vectorstore = os.path.join("application")
return vectorstore
class FaissStore(BaseVectorStore):
def __init__(self, source_id: str, embeddings_key: str, docs_init=None):
super().__init__()
self.source_id = source_id
self.path = get_vectorstore(source_id)
embeddings = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key)
@@ -20,7 +26,9 @@ class FaissStore(BaseVectorStore):
if docs_init:
self.docsearch = FAISS.from_documents(docs_init, embeddings)
else:
self.docsearch = FAISS.load_local(self.path, embeddings, allow_dangerous_deserialization=True)
self.docsearch = FAISS.load_local(
self.path, embeddings, allow_dangerous_deserialization=True
)
except Exception:
raise
@@ -40,11 +48,53 @@ class FaissStore(BaseVectorStore):
def assert_embedding_dimensions(self, embeddings):
"""Check that the word embedding dimension of the docsearch index matches the dimension of the word embeddings used."""
if settings.EMBEDDINGS_NAME == "huggingface_sentence-transformers/all-mpnet-base-v2":
word_embedding_dimension = getattr(embeddings, 'dimension', None)
if (
settings.EMBEDDINGS_NAME
== "huggingface_sentence-transformers/all-mpnet-base-v2"
):
word_embedding_dimension = getattr(embeddings, "dimension", None)
if word_embedding_dimension is None:
raise AttributeError("'dimension' attribute not found in embeddings instance.")
raise AttributeError(
"'dimension' attribute not found in embeddings instance."
)
docsearch_index_dimension = self.docsearch.index.d
if word_embedding_dimension != docsearch_index_dimension:
raise ValueError(f"Embedding dimension mismatch: embeddings.dimension ({word_embedding_dimension}) != docsearch index dimension ({docsearch_index_dimension})")
raise ValueError(
f"Embedding dimension mismatch: embeddings.dimension ({word_embedding_dimension}) != docsearch index dimension ({docsearch_index_dimension})"
)
def get_chunks(self):
chunks = []
if self.docsearch:
for doc_id, doc in self.docsearch.docstore._dict.items():
chunk_data = {
"doc_id": doc_id,
"text": doc.page_content,
"metadata": doc.metadata,
}
chunks.append(chunk_data)
return chunks
def add_chunk(self, text, metadata=None):
metadata = metadata or {}
doc = Document(text=text, extra_info=metadata).to_langchain_format()
doc_id = self.docsearch.add_documents([doc])
self.save_local(self.path)
return doc_id
def delete_chunk(self, chunk_id):
docstore = self.docsearch.docstore._dict
if chunk_id not in docstore:
return False
del docstore[chunk_id]
documents = list(docstore.values())
if documents:
self.docsearch = FAISS.from_documents(documents, self.embeddings)
else:
self.docsearch = FAISS.from_texts([" "], self.embeddings)
self.save_local()
return True

View File

@@ -124,3 +124,52 @@ class MongoDBVectorStore(BaseVectorStore):
def delete_index(self, *args, **kwargs):
self._collection.delete_many({"source_id": self._source_id})
def get_chunks(self):
try:
chunks = []
cursor = self._collection.find({"source_id": self._source_id})
for doc in cursor:
doc_id = str(doc.get("_id"))
text = doc.get(self._text_key)
metadata = {
k: v
for k, v in doc.items()
if k
not in ["_id", self._text_key, self._embedding_key, "source_id"]
}
if text:
chunks.append(
{"doc_id": doc_id, "text": text, "metadata": metadata}
)
return chunks
except Exception as e:
return []
def add_chunk(self, text, metadata=None):
metadata = metadata or {}
embeddings = self._embedding.embed_documents([text])
if not embeddings:
raise ValueError("Could not generate embedding for chunk")
chunk_data = {
self._text_key: text,
self._embedding_key: embeddings[0],
"source_id": self._source_id,
**metadata,
}
result = self._collection.insert_one(chunk_data)
return str(result.inserted_id)
def delete_chunk(self, chunk_id):
try:
from bson.objectid import ObjectId
object_id = ObjectId(chunk_id)
result = self._collection.delete_one({"_id": object_id})
return result.deleted_count > 0
except Exception as e:
print(f"Error deleting chunk: {e}")
return False