added error saving vector store (#2081)

* added error saving vector store

* fixed code formating

* added tests for embedding pipeline
This commit is contained in:
Harshit Ranjan
2025-10-31 19:59:35 +05:30
committed by GitHub
parent e086c79da0
commit 695191d888
2 changed files with 168 additions and 12 deletions

View File

@@ -1,5 +1,6 @@
import os import os
import logging import logging
from typing import List, Any
from retry import retry from retry import retry
from tqdm import tqdm from tqdm import tqdm
from application.core.settings import settings from application.core.settings import settings
@@ -22,13 +23,16 @@ def sanitize_content(content: str) -> str:
@retry(tries=10, delay=60) @retry(tries=10, delay=60)
def add_text_to_store_with_retry(store, doc, source_id): def add_text_to_store_with_retry(store: Any, doc: Any, source_id: str) -> None:
""" """Add a document's text and metadata to the vector store with retry logic.
Add a document's text and metadata to the vector store with retry logic.
Args: Args:
store: The vector store object. store: The vector store object.
doc: The document to be added. doc: The document to be added.
source_id: Unique identifier for the source. source_id: Unique identifier for the source.
Raises:
Exception: If document addition fails after all retry attempts.
""" """
try: try:
# Sanitize content to remove NUL characters that cause ingestion failures # Sanitize content to remove NUL characters that cause ingestion failures
@@ -41,18 +45,21 @@ def add_text_to_store_with_retry(store, doc, source_id):
raise raise
def embed_and_store_documents(docs, folder_name, source_id, task_status): def embed_and_store_documents(docs: List[Any], folder_name: str, source_id: str, task_status: Any) -> None:
""" """Embeds documents and stores them in a vector store.
Embeds documents and stores them in a vector store.
Args: Args:
docs (list): List of documents to be embedded and stored. docs: List of documents to be embedded and stored.
folder_name (str): Directory to save the vector store. folder_name: Directory to save the vector store.
source_id (str): Unique identifier for the source. source_id: Unique identifier for the source.
task_status: Task state manager for progress updates. task_status: Task state manager for progress updates.
Returns: Returns:
None None
Raises:
OSError: If unable to create folder or save vector store.
Exception: If vector store creation or document embedding fails.
""" """
# Ensure the folder exists # Ensure the folder exists
if not os.path.exists(folder_name): if not os.path.exists(folder_name):
@@ -95,10 +102,21 @@ def embed_and_store_documents(docs, folder_name, source_id, task_status):
except Exception as e: except Exception as e:
logging.error(f"Error embedding document {idx}: {e}", exc_info=True) logging.error(f"Error embedding document {idx}: {e}", exc_info=True)
logging.info(f"Saving progress at document {idx} out of {total_docs}") logging.info(f"Saving progress at document {idx} out of {total_docs}")
try:
store.save_local(folder_name) store.save_local(folder_name)
logging.info("Progress saved successfully")
except Exception as save_error:
logging.error(f"CRITICAL: Failed to save progress: {save_error}", exc_info=True)
# Continue without breaking to attempt final save
break break
# Save the vector store # Save the vector store
if settings.VECTOR_STORE == "faiss": if settings.VECTOR_STORE == "faiss":
try:
store.save_local(folder_name) store.save_local(folder_name)
logging.info("Vector store saved successfully.") logging.info("Vector store saved successfully.")
except Exception as e:
logging.error(f"CRITICAL: Failed to save final vector store: {e}", exc_info=True)
raise OSError(f"Unable to save vector store to {folder_name}: {e}") from e
else:
logging.info("Vector store saved successfully.")

View File

@@ -0,0 +1,138 @@
import pytest
import logging
from unittest.mock import patch, MagicMock
from application.parser.embedding_pipeline import (
sanitize_content,
add_text_to_store_with_retry,
embed_and_store_documents,
)
def test_sanitize_content_removes_nulls():
content = "This\x00is\x00a\x00test"
result = sanitize_content(content)
assert "\x00" not in result
assert result == "Thisisatest"
def test_sanitize_content_empty_or_none():
assert sanitize_content("") == ""
assert sanitize_content(None) is None
def test_add_text_to_store_with_retry_success():
store = MagicMock()
doc = MagicMock()
doc.page_content = "Test content"
doc.metadata = {}
add_text_to_store_with_retry(store, doc, "123")
store.add_texts.assert_called_once_with(
["Test content"], metadatas=[{"source_id": "123"}]
)
@pytest.fixture
def mock_settings(monkeypatch):
mock_settings = MagicMock()
monkeypatch.setattr(
"application.parser.embedding_pipeline.settings", mock_settings
)
return mock_settings
@pytest.fixture
def mock_vector_creator(monkeypatch):
mock_creator = MagicMock()
monkeypatch.setattr(
"application.parser.embedding_pipeline.VectorCreator", mock_creator
)
return mock_creator
def test_embed_and_store_documents_creates_folder(tmp_path, mock_settings, mock_vector_creator):
mock_settings.VECTOR_STORE = "faiss"
docs = [MagicMock(page_content="doc1", metadata={}), MagicMock(page_content="doc2", metadata={})]
folder_name = tmp_path / "test_store"
source_id = "xyz"
task_status = MagicMock()
mock_store = MagicMock()
mock_vector_creator.create_vectorstore.return_value = mock_store
embed_and_store_documents(docs, str(folder_name), source_id, task_status)
assert folder_name.exists()
mock_vector_creator.create_vectorstore.assert_called_once()
mock_store.save_local.assert_called_once_with(str(folder_name))
task_status.update_state.assert_called()
def test_embed_and_store_documents_non_faiss(tmp_path, mock_settings, mock_vector_creator):
mock_settings.VECTOR_STORE = "chromadb"
docs = [MagicMock(page_content="doc1", metadata={}), MagicMock(page_content="doc2", metadata={})]
folder_name = tmp_path / "chromadb_store"
source_id = "test123"
task_status = MagicMock()
mock_store = MagicMock()
mock_vector_creator.create_vectorstore.return_value = mock_store
embed_and_store_documents(docs, str(folder_name), source_id, task_status)
mock_store.delete_index.assert_called_once()
task_status.update_state.assert_called()
assert folder_name.exists()
@patch("application.parser.embedding_pipeline.add_text_to_store_with_retry")
def test_embed_and_store_documents_partial_failure(
mock_add_retry, tmp_path, mock_settings, mock_vector_creator, caplog
):
mock_settings.VECTOR_STORE = "faiss"
docs = [MagicMock(page_content="good", metadata={}), MagicMock(page_content="bad", metadata={})]
folder_name = tmp_path / "partial_fail"
source_id = "id123"
task_status = MagicMock()
mock_store = MagicMock()
mock_vector_creator.create_vectorstore.return_value = mock_store
# First document succeeds, second fails
def side_effect(*args, **kwargs):
if "bad" in args[1].page_content:
raise Exception("Embedding failed")
mock_add_retry.side_effect = side_effect
with caplog.at_level(logging.ERROR):
embed_and_store_documents(docs, str(folder_name), source_id, task_status)
assert "Error embedding document" in caplog.text
mock_store.save_local.assert_called()
def test_embed_and_store_documents_save_fails_raises_oserror(
tmp_path, mock_settings, mock_vector_creator
):
mock_settings.VECTOR_STORE = "faiss"
docs = [MagicMock(page_content="good", metadata={})]
folder_name = tmp_path / "save_fail"
source_id = "id789"
task_status = MagicMock()
mock_store = MagicMock()
mock_store.save_local.side_effect = Exception("Disk full")
mock_vector_creator.create_vectorstore.return_value = mock_store
with pytest.raises(OSError, match="Unable to save vector store"):
embed_and_store_documents(docs, str(folder_name), source_id, task_status)