From 695191d88814d571104c855b21c734860cf15d77 Mon Sep 17 00:00:00 2001 From: Harshit Ranjan <159914116+HarshitR2004@users.noreply.github.com> Date: Fri, 31 Oct 2025 19:59:35 +0530 Subject: [PATCH] added error saving vector store (#2081) * added error saving vector store * fixed code formatting * added tests for embedding pipeline --- application/parser/embedding_pipeline.py | 42 ++++-- tests/parser/file/test_embedding_pipeline.py | 138 +++++++++++++++++++ 2 files changed, 168 insertions(+), 12 deletions(-) create mode 100644 tests/parser/file/test_embedding_pipeline.py diff --git a/application/parser/embedding_pipeline.py b/application/parser/embedding_pipeline.py index 7511f3df..a777b469 100755 --- a/application/parser/embedding_pipeline.py +++ b/application/parser/embedding_pipeline.py @@ -1,5 +1,6 @@ import os import logging +from typing import List, Any from retry import retry from tqdm import tqdm from application.core.settings import settings @@ -22,13 +23,16 @@ def sanitize_content(content: str) -> str: @retry(tries=10, delay=60) -def add_text_to_store_with_retry(store, doc, source_id): - """ - Add a document's text and metadata to the vector store with retry logic. +def add_text_to_store_with_retry(store: Any, doc: Any, source_id: str) -> None: + """Add a document's text and metadata to the vector store with retry logic. + Args: store: The vector store object. doc: The document to be added. source_id: Unique identifier for the source. + + Raises: + Exception: If document addition fails after all retry attempts. """ try: # Sanitize content to remove NUL characters that cause ingestion failures @@ -41,18 +45,21 @@ def add_text_to_store_with_retry(store, doc, source_id): raise -def embed_and_store_documents(docs, folder_name, source_id, task_status): - """ - Embeds documents and stores them in a vector store. 
+def embed_and_store_documents(docs: List[Any], folder_name: str, source_id: str, task_status: Any) -> None: + """Embeds documents and stores them in a vector store. Args: - docs (list): List of documents to be embedded and stored. - folder_name (str): Directory to save the vector store. - source_id (str): Unique identifier for the source. + docs: List of documents to be embedded and stored. + folder_name: Directory to save the vector store. + source_id: Unique identifier for the source. task_status: Task state manager for progress updates. Returns: None + + Raises: + OSError: If unable to create folder or save vector store. + Exception: If vector store creation fails (embedding errors are logged and stop the loop, not raised). """ # Ensure the folder exists if not os.path.exists(folder_name): @@ -95,10 +102,21 @@ def embed_and_store_documents(docs, folder_name, source_id, task_status): except Exception as e: logging.error(f"Error embedding document {idx}: {e}", exc_info=True) logging.info(f"Saving progress at document {idx} out of {total_docs}") - store.save_local(folder_name) + try: + store.save_local(folder_name) + logging.info("Progress saved successfully") + except Exception as save_error: + logging.error(f"CRITICAL: Failed to save progress: {save_error}", exc_info=True) + # Swallow the save error; the loop exits below and the final save is still attempted break # Save the vector store if settings.VECTOR_STORE == "faiss": - store.save_local(folder_name) - logging.info("Vector store saved successfully.") + try: + store.save_local(folder_name) + logging.info("Vector store saved successfully.") + except Exception as e: + logging.error(f"CRITICAL: Failed to save final vector store: {e}", exc_info=True) + raise OSError(f"Unable to save vector store to {folder_name}: {e}") from e + else: + logging.info("Vector store saved successfully.") diff --git a/tests/parser/file/test_embedding_pipeline.py b/tests/parser/file/test_embedding_pipeline.py new file mode 100644 index 00000000..b0524256 --- /dev/null +++ 
b/tests/parser/file/test_embedding_pipeline.py @@ -0,0 +1,138 @@ +import pytest +import logging +from unittest.mock import patch, MagicMock + +from application.parser.embedding_pipeline import ( + sanitize_content, + add_text_to_store_with_retry, + embed_and_store_documents, +) + + + +def test_sanitize_content_removes_nulls(): + content = "This\x00is\x00a\x00test" + result = sanitize_content(content) + assert "\x00" not in result + assert result == "Thisisatest" + + +def test_sanitize_content_empty_or_none(): + assert sanitize_content("") == "" + assert sanitize_content(None) is None + + + +def test_add_text_to_store_with_retry_success(): + store = MagicMock() + doc = MagicMock() + doc.page_content = "Test content" + doc.metadata = {} + + add_text_to_store_with_retry(store, doc, "123") + + store.add_texts.assert_called_once_with( + ["Test content"], metadatas=[{"source_id": "123"}] + ) + + +@pytest.fixture +def mock_settings(monkeypatch): + mock_settings = MagicMock() + monkeypatch.setattr( + "application.parser.embedding_pipeline.settings", mock_settings + ) + return mock_settings + + +@pytest.fixture +def mock_vector_creator(monkeypatch): + mock_creator = MagicMock() + monkeypatch.setattr( + "application.parser.embedding_pipeline.VectorCreator", mock_creator + ) + return mock_creator + + + +def test_embed_and_store_documents_creates_folder(tmp_path, mock_settings, mock_vector_creator): + mock_settings.VECTOR_STORE = "faiss" + + docs = [MagicMock(page_content="doc1", metadata={}), MagicMock(page_content="doc2", metadata={})] + folder_name = tmp_path / "test_store" + source_id = "xyz" + task_status = MagicMock() + + mock_store = MagicMock() + mock_vector_creator.create_vectorstore.return_value = mock_store + + embed_and_store_documents(docs, str(folder_name), source_id, task_status) + + assert folder_name.exists() + mock_vector_creator.create_vectorstore.assert_called_once() + mock_store.save_local.assert_called_once_with(str(folder_name)) + 
task_status.update_state.assert_called() + + +def test_embed_and_store_documents_non_faiss(tmp_path, mock_settings, mock_vector_creator): + mock_settings.VECTOR_STORE = "chromadb" + + docs = [MagicMock(page_content="doc1", metadata={}), MagicMock(page_content="doc2", metadata={})] + folder_name = tmp_path / "chromadb_store" + source_id = "test123" + task_status = MagicMock() + + mock_store = MagicMock() + mock_vector_creator.create_vectorstore.return_value = mock_store + + embed_and_store_documents(docs, str(folder_name), source_id, task_status) + + mock_store.delete_index.assert_called_once() + task_status.update_state.assert_called() + assert folder_name.exists() + + +@patch("application.parser.embedding_pipeline.add_text_to_store_with_retry") +def test_embed_and_store_documents_partial_failure( + mock_add_retry, tmp_path, mock_settings, mock_vector_creator, caplog +): + mock_settings.VECTOR_STORE = "faiss" + + docs = [MagicMock(page_content="good", metadata={}), MagicMock(page_content="bad", metadata={})] + folder_name = tmp_path / "partial_fail" + source_id = "id123" + task_status = MagicMock() + + mock_store = MagicMock() + mock_vector_creator.create_vectorstore.return_value = mock_store + + # First document succeeds, second fails + def side_effect(*args, **kwargs): + if "bad" in args[1].page_content: + raise Exception("Embedding failed") + mock_add_retry.side_effect = side_effect + + with caplog.at_level(logging.ERROR): + embed_and_store_documents(docs, str(folder_name), source_id, task_status) + + assert "Error embedding document" in caplog.text + mock_store.save_local.assert_called() + + +def test_embed_and_store_documents_save_fails_raises_oserror( + tmp_path, mock_settings, mock_vector_creator +): + mock_settings.VECTOR_STORE = "faiss" + + docs = [MagicMock(page_content="good", metadata={})] + folder_name = tmp_path / "save_fail" + source_id = "id789" + task_status = MagicMock() + + mock_store = MagicMock() + mock_store.save_local.side_effect = 
Exception("Disk full") + mock_vector_creator.create_vectorstore.return_value = mock_store + + with pytest.raises(OSError, match="Unable to save vector store"): + embed_and_store_documents(docs, str(folder_name), source_id, task_status) +