diff --git a/application/vectorstore/qdrant.py b/application/vectorstore/qdrant.py
index 61a9d63d..4435378f 100644
--- a/application/vectorstore/qdrant.py
+++ b/application/vectorstore/qdrant.py
@@ -1,5 +1,7 @@
+import logging
 from application.vectorstore.base import BaseVectorStore
 from application.core.settings import settings
+from application.vectorstore.document_class import Document
 
 
 class QdrantStore(BaseVectorStore):
@@ -7,18 +9,22 @@ class QdrantStore(BaseVectorStore):
         from qdrant_client import models
         from langchain_community.vectorstores.qdrant import Qdrant
 
+        # Store the source_id for use in add_chunk
+        self._source_id = str(source_id).replace("application/indexes/", "").rstrip("/")
+
         self._filter = models.Filter(
             must=[
                 models.FieldCondition(
                     key="metadata.source_id",
-                    match=models.MatchValue(value=source_id.replace("application/indexes/", "").rstrip("/")),
+                    match=models.MatchValue(value=self._source_id),
                 )
             ]
         )
 
+        embedding = self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key)
         self._docsearch = Qdrant.construct_instance(
             ["TEXT_TO_OBTAIN_EMBEDDINGS_DIMENSION"],
-            embedding=self._get_embeddings(settings.EMBEDDINGS_NAME, embeddings_key),
+            embedding=embedding,
             collection_name=settings.QDRANT_COLLECTION_NAME,
             location=settings.QDRANT_LOCATION,
             url=settings.QDRANT_URL,
@@ -32,6 +38,32 @@ class QdrantStore(BaseVectorStore):
             path=settings.QDRANT_PATH,
             distance_func=settings.QDRANT_DISTANCE_FUNC,
         )
+        try:
+            collections = self._docsearch.client.get_collections()
+            collection_exists = settings.QDRANT_COLLECTION_NAME in [
+                collection.name for collection in collections.collections
+            ]
+
+            if not collection_exists:
+                self._docsearch.client.recreate_collection(
+                    collection_name=settings.QDRANT_COLLECTION_NAME,
+                    vectors_config=models.VectorParams(size=embedding.client[1].word_embedding_dimension, distance=models.Distance.COSINE),
+                )
+
+            # Ensure the required index exists for metadata.source_id
+            try:
+                self._docsearch.client.create_payload_index(
+                    collection_name=settings.QDRANT_COLLECTION_NAME,
+                    field_name="metadata.source_id",
+                    field_schema=models.PayloadSchemaType.KEYWORD,
+                )
+            except Exception as index_error:
+                # Index might already exist, which is fine
+                if "already exists" not in str(index_error).lower():
+                    logging.warning(f"Could not create index for metadata.source_id: {index_error}")
+
+        except Exception as e:
+            logging.warning(f"Could not check for collection: {e}")
 
     def search(self, *args, **kwargs):
         return self._docsearch.similarity_search(filter=self._filter, *args, **kwargs)
@@ -46,3 +78,60 @@ class QdrantStore(BaseVectorStore):
         return self._docsearch.client.delete(
             collection_name=settings.QDRANT_COLLECTION_NAME, points_selector=self._filter
         )
+
+    def get_chunks(self):
+        try:
+            from qdrant_client import models
+
+            chunks = []
+            offset = None
+            while True:
+                records, offset = self._docsearch.client.scroll(
+                    collection_name=settings.QDRANT_COLLECTION_NAME,
+                    scroll_filter=self._filter,
+                    limit=10,
+                    with_payload=True,
+                    with_vectors=False,
+                    offset=offset,
+                )
+                for record in records:
+                    doc_id = record.id
+                    text = record.payload.get("page_content")
+                    metadata = record.payload.get("metadata")
+                    chunks.append(
+                        {"doc_id": doc_id, "text": text, "metadata": metadata}
+                    )
+                if offset is None:
+                    break
+            return chunks
+        except Exception as e:
+            logging.error(f"Error getting chunks: {e}", exc_info=True)
+            return []
+
+    def add_chunk(self, text, metadata=None):
+        import uuid
+        metadata = metadata or {}
+
+        # Create a copy to avoid modifying the original metadata
+        final_metadata = metadata.copy()
+
+        # Ensure the source_id is in the metadata so the chunk can be found by filters
+        final_metadata["source_id"] = self._source_id
+
+        doc = Document(page_content=text, metadata=final_metadata)
+        # Generate a unique ID for the document
+        doc_id = str(uuid.uuid4())
+        doc.id = doc_id
+        doc_ids = self._docsearch.add_documents([doc])
+        return doc_ids[0] if doc_ids else doc_id
+
+    def delete_chunk(self, chunk_id):
+        try:
+            self._docsearch.client.delete(
+                collection_name=settings.QDRANT_COLLECTION_NAME,
+                points_selector=[chunk_id],
+            )
+            return True
+        except Exception as e:
+            logging.error(f"Error deleting chunk: {e}", exc_info=True)
+            return False
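
A minimal usage sketch of the new chunk-management methods. The constructor arguments, the example source_id, and the metadata key below are assumptions inferred from this diff rather than confirmed by it, and the Qdrant and embedding settings are assumed to be configured already:

    # Usage sketch; QdrantStore(source_id, embeddings_key) is assumed from the hunks above.
    from application.vectorstore.qdrant import QdrantStore

    store = QdrantStore(
        source_id="application/indexes/local/default/",  # hypothetical source_id
        embeddings_key="embeddings_key",                 # hypothetical key name
    )

    # add_chunk copies the metadata and injects self._source_id, so the new point
    # matches the same metadata.source_id filter that search() and delete() use.
    chunk_id = store.add_chunk("Qdrant stores vectors with payloads.", metadata={"title": "intro"})

    # get_chunks pages through client.scroll() and returns dicts with doc_id, text, metadata.
    for chunk in store.get_chunks():
        print(chunk["doc_id"], chunk["metadata"])

    # delete_chunk removes a single point by ID and returns True/False.
    store.delete_chunk(chunk_id)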