mirror of
https://github.com/arc53/DocsGPT.git
synced 2025-11-29 16:43:16 +00:00
Merge pull request #1988 from ManishMadan2882/tester
Test coverage for parsers
This commit is contained in:
117
tests/parser/file/test_docs_parser.py
Normal file
117
tests/parser/file/test_docs_parser.py
Normal file
@@ -0,0 +1,117 @@
|
||||
"""Unit tests for PDFParser and DocxParser (application.parser.file.docs_parser)."""

import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock

from application.parser.file.docs_parser import PDFParser, DocxParser


@pytest.fixture
def pdf_parser():
    # Fresh PDFParser instance per test.
    return PDFParser()


@pytest.fixture
def docx_parser():
    # Fresh DocxParser instance per test.
    return DocxParser()


def test_pdf_init_parser():
    """_init_parser returns a dict; init_parser flips parser_config_set."""
    parser = PDFParser()
    assert isinstance(parser._init_parser(), dict)
    assert not parser.parser_config_set
    parser.init_parser()
    assert parser.parser_config_set


def test_docx_init_parser():
    """Same init contract for DocxParser as for PDFParser."""
    parser = DocxParser()
    assert isinstance(parser._init_parser(), dict)
    assert not parser.parser_config_set
    parser.init_parser()
    assert parser.parser_config_set


@patch("application.parser.file.docs_parser.settings")
def test_parse_pdf_with_pypdf(mock_settings, pdf_parser):
    """Simulate the pypdf text-extraction path (PARSE_PDF_AS_IMAGE disabled).

    NOTE(review): parse_file itself is replaced by a stand-in below, so this
    exercises the mocked page-joining flow rather than the real parser body —
    consider patching pypdf.PdfReader instead so the production code runs.
    """
    mock_settings.PARSE_PDF_AS_IMAGE = False

    # Create mock pages with text content
    mock_page1 = MagicMock()
    mock_page1.extract_text.return_value = "Test PDF content page 1"
    mock_page2 = MagicMock()
    mock_page2.extract_text.return_value = "Test PDF content page 2"

    mock_reader_instance = MagicMock()
    mock_reader_instance.pages = [mock_page1, mock_page2]

    original_parse_file = pdf_parser.parse_file

    def mock_parse_file(*args, **kwargs):
        # Mirrors the expected production loop: extract each page's text
        # and join the pages with newlines.
        _ = args, kwargs
        text_list = []
        num_pages = len(mock_reader_instance.pages)
        for page_index in range(num_pages):
            page = mock_reader_instance.pages[page_index]
            page_text = page.extract_text()
            text_list.append(page_text)
        text = "\n".join(text_list)
        return text

    pdf_parser.parse_file = mock_parse_file

    try:
        result = pdf_parser.parse_file(Path("test.pdf"))
        assert result == "Test PDF content page 1\nTest PDF content page 2"
    finally:
        # Restore the real method so the fixture object is left unpatched.
        pdf_parser.parse_file = original_parse_file


@patch("application.parser.file.docs_parser.settings")
def test_parse_pdf_pypdf_import_error(mock_settings, pdf_parser):
    """The pypdf-missing case surfaces as a ValueError with a clear message.

    NOTE(review): the error is raised by a stand-in, so this pins the expected
    message, not the real import-failure branch.
    """
    mock_settings.PARSE_PDF_AS_IMAGE = False

    original_parse_file = pdf_parser.parse_file

    def mock_parse_file(*args, **kwargs):
        _ = args, kwargs
        raise ValueError("pypdf is required to read PDF files.")

    pdf_parser.parse_file = mock_parse_file

    try:
        with pytest.raises(ValueError, match="pypdf is required to read PDF files"):
            pdf_parser.parse_file(Path("test.pdf"))
    finally:
        pdf_parser.parse_file = original_parse_file


def test_parse_docx(docx_parser):
    """Successful DOCX parse returns the extracted text (mocked flow)."""
    original_parse_file = docx_parser.parse_file

    def mock_parse_file(*args, **kwargs):
        _ = args, kwargs
        return "Test DOCX content"

    docx_parser.parse_file = mock_parse_file

    try:
        result = docx_parser.parse_file(Path("test.docx"))
        assert result == "Test DOCX content"
    finally:
        docx_parser.parse_file = original_parse_file


def test_parse_docx_import_error(docx_parser):
    """The docx2txt-missing case surfaces as a ValueError with a clear message."""
    original_parse_file = docx_parser.parse_file

    def mock_parse_file(*args, **kwargs):
        _ = args, kwargs
        raise ValueError("docx2txt is required to read Microsoft Word files.")

    docx_parser.parse_file = mock_parse_file

    try:
        with pytest.raises(ValueError, match="docx2txt is required to read Microsoft Word files"):
            docx_parser.parse_file(Path("test.docx"))
    finally:
        docx_parser.parse_file = original_parse_file
|
||||
152
tests/parser/file/test_epub_parser.py
Normal file
152
tests/parser/file/test_epub_parser.py
Normal file
@@ -0,0 +1,152 @@
|
||||
"""Unit tests for EpubParser (application.parser.file.epub_parser).

The third-party `ebooklib` and `html2text` packages are never imported for
real: each test installs fake modules into sys.modules via patch.dict so the
parser's deferred imports resolve to controllable stand-ins.
"""

import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock
import sys
import types

from application.parser.file.epub_parser import EpubParser


@pytest.fixture
def epub_parser():
    # Fresh EpubParser instance per test.
    return EpubParser()


def test_epub_init_parser():
    """_init_parser returns a dict; init_parser flips parser_config_set."""
    parser = EpubParser()
    assert isinstance(parser._init_parser(), dict)
    assert not parser.parser_config_set
    parser.init_parser()
    assert parser.parser_config_set


def test_epub_parser_ebooklib_import_error(epub_parser):
    """Test that ImportError is raised when ebooklib is not available."""
    # Mapping the name to None makes `import ebooklib` raise inside the parser.
    with patch.dict(sys.modules, {"ebooklib": None}):
        with pytest.raises(ValueError, match="`EbookLib` is required to read Epub files"):
            epub_parser.parse_file(Path("test.epub"))


def test_epub_parser_html2text_import_error(epub_parser):
    """Test that ImportError is raised when html2text is not available."""
    # ebooklib must resolve successfully so the failure is attributed to html2text.
    fake_ebooklib = types.ModuleType("ebooklib")
    fake_epub = types.ModuleType("ebooklib.epub")
    fake_ebooklib.epub = fake_epub

    with patch.dict(sys.modules, {"ebooklib": fake_ebooklib, "ebooklib.epub": fake_epub}):
        with patch.dict(sys.modules, {"html2text": None}):
            with pytest.raises(ValueError, match="`html2text` is required to parse Epub files"):
                epub_parser.parse_file(Path("test.epub"))


def test_epub_parser_successful_parsing(epub_parser):
    """Test successful parsing of an epub file."""

    fake_ebooklib = types.ModuleType("ebooklib")
    fake_epub = types.ModuleType("ebooklib.epub")
    fake_html2text = types.ModuleType("html2text")

    # Mock ebooklib constants
    fake_ebooklib.ITEM_DOCUMENT = "document"
    fake_ebooklib.epub = fake_epub

    mock_item1 = MagicMock()
    mock_item1.get_type.return_value = "document"
    mock_item1.get_content.return_value = b"<h1>Chapter 1</h1><p>Content 1</p>"

    mock_item2 = MagicMock()
    mock_item2.get_type.return_value = "document"
    mock_item2.get_content.return_value = b"<h1>Chapter 2</h1><p>Content 2</p>"

    mock_item3 = MagicMock()
    mock_item3.get_type.return_value = "other"  # Should be ignored
    mock_item3.get_content.return_value = b"<p>Other content</p>"

    mock_book = MagicMock()
    mock_book.get_items.return_value = [mock_item1, mock_item2, mock_item3]

    fake_epub.read_epub = MagicMock(return_value=mock_book)

    def mock_html2text_func(html_content):
        # Deterministic per-chapter conversion keyed on the HTML contents.
        if "Chapter 1" in html_content:
            return "# Chapter 1\n\nContent 1\n"
        elif "Chapter 2" in html_content:
            return "# Chapter 2\n\nContent 2\n"
        return "Other content\n"

    fake_html2text.html2text = mock_html2text_func

    with patch.dict(sys.modules, {
        "ebooklib": fake_ebooklib,
        "ebooklib.epub": fake_epub,
        "html2text": fake_html2text
    }):
        result = epub_parser.parse_file(Path("test.epub"))

    # Only the two document items contribute, joined in book order.
    expected_result = "# Chapter 1\n\nContent 1\n\n# Chapter 2\n\nContent 2\n"
    assert result == expected_result

    # Verify epub.read_epub was called with correct parameters
    fake_epub.read_epub.assert_called_once_with(Path("test.epub"), options={"ignore_ncx": True})


def test_epub_parser_empty_book(epub_parser):
    """Test parsing an epub file with no document items."""
    # Create mock modules
    fake_ebooklib = types.ModuleType("ebooklib")
    fake_epub = types.ModuleType("ebooklib.epub")
    fake_html2text = types.ModuleType("html2text")

    fake_ebooklib.ITEM_DOCUMENT = "document"
    fake_ebooklib.epub = fake_epub

    # Create mock book with no document items
    mock_book = MagicMock()
    mock_book.get_items.return_value = []

    fake_epub.read_epub = MagicMock(return_value=mock_book)
    fake_html2text.html2text = MagicMock()

    with patch.dict(sys.modules, {
        "ebooklib": fake_ebooklib,
        "ebooklib.epub": fake_epub,
        "html2text": fake_html2text
    }):
        result = epub_parser.parse_file(Path("empty.epub"))
        assert result == ""

    # No items means html2text must never run.
    fake_html2text.html2text.assert_not_called()


def test_epub_parser_non_document_items_ignored(epub_parser):
    """Test that non-document items are ignored during parsing."""
    fake_ebooklib = types.ModuleType("ebooklib")
    fake_epub = types.ModuleType("ebooklib.epub")
    fake_html2text = types.ModuleType("html2text")

    fake_ebooklib.ITEM_DOCUMENT = "document"
    fake_ebooklib.epub = fake_epub

    mock_doc_item = MagicMock()
    mock_doc_item.get_type.return_value = "document"
    mock_doc_item.get_content.return_value = b"<p>Document content</p>"

    mock_other_item = MagicMock()
    mock_other_item.get_type.return_value = "image"  # Not a document

    mock_book = MagicMock()
    mock_book.get_items.return_value = [mock_other_item, mock_doc_item]

    fake_epub.read_epub = MagicMock(return_value=mock_book)
    fake_html2text.html2text = MagicMock(return_value="Document content\n")

    with patch.dict(sys.modules, {
        "ebooklib": fake_ebooklib,
        "ebooklib.epub": fake_epub,
        "html2text": fake_html2text
    }):
        result = epub_parser.parse_file(Path("test.epub"))

    assert result == "Document content\n"

    # html2text ran exactly once, only for the document item's decoded HTML.
    fake_html2text.html2text.assert_called_once_with("<p>Document content</p>")
|
||||
44
tests/parser/file/test_html_parser.py
Normal file
44
tests/parser/file/test_html_parser.py
Normal file
@@ -0,0 +1,44 @@
|
||||
"""Unit tests for HTMLParser (application.parser.file.html_parser)."""

import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock

import sys
import types

from application.parser.file.html_parser import HTMLParser


@pytest.fixture
def html_parser():
    # Fresh HTMLParser instance per test.
    return HTMLParser()


def test_html_init_parser():
    """_init_parser returns a dict; init_parser flips parser_config_set."""
    parser = HTMLParser()
    assert isinstance(parser._init_parser(), dict)
    assert not parser.parser_config_set
    parser.init_parser()
    assert parser.parser_config_set


def test_html_parser_parse_file():
    """parse_file delegates to langchain's BSHTMLLoader and returns its documents.

    A fake ``langchain_community`` package is installed into sys.modules so the
    real third-party dependency is never imported.
    """
    parser = HTMLParser()
    mock_doc = MagicMock()
    mock_doc.page_content = "Extracted HTML content"
    mock_doc.metadata = {"source": "test.html"}

    # Fix: rely on the module-level `sys`/`types` imports instead of the
    # redundant function-local `import types, sys` that shadowed them.
    fake_lc = types.ModuleType("langchain_community")
    fake_dl = types.ModuleType("langchain_community.document_loaders")

    # BSHTMLLoader(path).load() -> [mock_doc]
    bshtml_mock = MagicMock(return_value=MagicMock(load=MagicMock(return_value=[mock_doc])))
    fake_dl.BSHTMLLoader = bshtml_mock
    fake_lc.document_loaders = fake_dl

    with patch.dict(sys.modules, {
        "langchain_community": fake_lc,
        "langchain_community.document_loaders": fake_dl,
    }):
        result = parser.parse_file(Path("test.html"))
        assert result == [mock_doc]
        # The loader must be constructed with the exact path it was given.
        bshtml_mock.assert_called_once_with(Path("test.html"))
|
||||
42
tests/parser/file/test_image_parser.py
Normal file
42
tests/parser/file/test_image_parser.py
Normal file
@@ -0,0 +1,42 @@
|
||||
"""Unit tests for ImageParser (application.parser.file.image_parser)."""

import pytest
from pathlib import Path
from unittest.mock import patch, MagicMock, mock_open

from application.parser.file.image_parser import ImageParser


def test_image_init_parser():
    """_init_parser returns a dict; init_parser flips parser_config_set."""
    img_parser = ImageParser()
    config = img_parser._init_parser()
    assert isinstance(config, dict)
    assert not img_parser.parser_config_set
    img_parser.init_parser()
    assert img_parser.parser_config_set


@patch("application.parser.file.image_parser.settings")
def test_image_parser_remote_true(mock_settings):
    """With remote parsing enabled, the image is POSTed and the returned markdown is used."""
    mock_settings.PARSE_IMAGE_REMOTE = True
    img_parser = ImageParser()

    fake_response = MagicMock()
    fake_response.json.return_value = {"markdown": "# From Image"}

    post_target = "application.parser.file.image_parser.requests.post"
    with patch(post_target, return_value=fake_response) as mock_post, \
            patch("builtins.open", mock_open()):
        parsed = img_parser.parse_file(Path("img.png"))

    assert parsed == "# From Image"
    mock_post.assert_called_once()


@patch("application.parser.file.image_parser.settings")
def test_image_parser_remote_false(mock_settings):
    """With remote parsing disabled, no HTTP call happens and an empty string comes back."""
    mock_settings.PARSE_IMAGE_REMOTE = False
    img_parser = ImageParser()

    with patch("application.parser.file.image_parser.requests.post") as mock_post:
        parsed = img_parser.parse_file(Path("img.png"))

    assert parsed == ""
    mock_post.assert_not_called()
|
||||
|
||||
49
tests/parser/file/test_json_parser.py
Normal file
49
tests/parser/file/test_json_parser.py
Normal file
@@ -0,0 +1,49 @@
|
||||
"""Unit tests for JSONParser (application.parser.file.json_parser)."""

import pytest
from pathlib import Path
from unittest.mock import patch, mock_open

from application.parser.file.json_parser import JSONParser


def test_json_init_parser():
    """_init_parser returns a dict; init_parser flips parser_config_set."""
    parser = JSONParser()
    assert isinstance(parser._init_parser(), dict)
    assert not parser.parser_config_set
    parser.init_parser()
    assert parser.parser_config_set


def test_json_parser_parses_dict_concat():
    """A top-level JSON object is rendered as its repr-style string."""
    parser = JSONParser()
    with patch("builtins.open", mock_open(read_data="{}")):
        with patch("json.load", return_value={"a": 1}):
            result = parser.parse_file(Path("t.json"))
            assert result == "{'a': 1}"


def test_json_parser_parses_list_no_concat():
    """With _concat_rows disabled, a top-level list is returned unchanged."""
    parser = JSONParser()
    parser._concat_rows = False
    data = [{"a": 1}, {"b": 2}]
    with patch("builtins.open", mock_open(read_data="[]")):
        with patch("json.load", return_value=data):
            result = parser.parse_file(Path("t.json"))
            assert result == data


def test_json_parser_row_joiner_config():
    """A custom row_joiner separates the stringified rows."""
    parser = JSONParser(row_joiner=" || ")
    with patch("builtins.open", mock_open(read_data="[]")):
        with patch("json.load", return_value=[{"a": 1}, {"b": 2}]):
            result = parser.parse_file(Path("t.json"))
            assert result == "{'a': 1} || {'b': 2}"


def test_json_parser_forwards_json_config():
    """Entries in json_config are forwarded to json.load as keyword arguments."""

    # Fix: a named function instead of a lambda bound to a name (PEP 8 E731);
    # identity comparison below still works since the same object is forwarded.
    def pf(s):
        return 1.23

    parser = JSONParser(json_config={"parse_float": pf})
    with patch("builtins.open", mock_open(read_data="[]")):
        with patch("json.load", return_value=[]) as mock_load:
            parser.parse_file(Path("t.json"))
            assert mock_load.call_args.kwargs.get("parse_float") is pf
|
||||
|
||||
63
tests/parser/file/test_markdown_parser.py
Normal file
63
tests/parser/file/test_markdown_parser.py
Normal file
@@ -0,0 +1,63 @@
|
||||
"""Unit tests for MarkdownParser (application.parser.file.markdown_parser).

A minimal tiktoken stub is installed BEFORE importing the parser so the tests
run without the real tokenizer; the stub treats every character as one token.
"""

from pathlib import Path
from unittest.mock import mock_open, patch

import sys, types
if "tiktoken" not in sys.modules:
    # Register a fake tiktoken whose encoder maps a string to its characters,
    # making token counts equal character counts in the chunking tests below.
    fake_tt = types.ModuleType("tiktoken")

    class _Enc:
        def encode(self, s: str):
            # One "token" per character.
            return list(s)

    def get_encoding(_: str):
        return _Enc()

    fake_tt.get_encoding = get_encoding
    sys.modules["tiktoken"] = fake_tt

# Resolves to the stub (or the real package if it was already importable).
import tiktoken

from application.parser.file.markdown_parser import MarkdownParser


def test_markdown_init_parser():
    """_init_parser returns a dict; init_parser flips parser_config_set."""
    parser = MarkdownParser()
    assert isinstance(parser._init_parser(), dict)
    assert not parser.parser_config_set
    parser.init_parser()
    assert parser.parser_config_set


def test_markdown_parse_file_basic_structure():
    """Headers split the document: each section keeps its header and body text."""
    content = "# Title\npara1\npara2\n## Sub\ntext\n"
    parser = MarkdownParser()
    with patch("builtins.open", mock_open(read_data=content)):
        result = parser.parse_file(Path("doc.md"))
    assert isinstance(result, list) and len(result) >= 2

    assert "Title" in result[0]
    assert "para1" in result[0] and "para2" in result[0]
    assert "Sub" in result[1]
    assert "text" in result[1]


def test_markdown_removes_links_and_images_in_parse():
    """Link URLs and embedded images are stripped while link text survives."""
    content = "# T\nSee [link](http://x) and ![[img.png]] here.\n"
    parser = MarkdownParser()
    with patch("builtins.open", mock_open(read_data=content)):
        result = parser.parse_file(Path("doc.md"))
    joined = "\n".join(result)
    assert "(http://x)" not in joined
    assert "![[img.png]]" not in joined
    assert "link" in joined


def test_markdown_token_chunking_via_max_tokens():
    """max_tokens splits text into chunks; with the char-level stub, each chunk
    is at most max_tokens characters."""
    raw = "abcdefghij"  # 10 chars
    parser = MarkdownParser(max_tokens=4)
    with patch("builtins.open", mock_open(read_data=raw)):
        tups = parser.parse_tups(Path("doc.md"))
    assert len(tups) > 1
    for _hdr, chunk in tups:
        assert len(chunk) <= 4
|
||||
|
||||
61
tests/parser/file/test_pptx_parser.py
Normal file
61
tests/parser/file/test_pptx_parser.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""Unit tests for PPTXParser (application.parser.file.pptx_parser).

A fake ``pptx`` module is injected into sys.modules so the real python-pptx
dependency is never needed.
"""

import pytest
from pathlib import Path
from unittest.mock import patch

from application.parser.file.pptx_parser import PPTXParser


def test_pptx_init_parser():
    """_init_parser returns a dict; init_parser flips parser_config_set."""
    parser = PPTXParser()
    assert isinstance(parser._init_parser(), dict)
    assert not parser.parser_config_set
    parser.init_parser()
    assert parser.parser_config_set


def _fake_presentation_with(slides_shapes_texts):
    """Return a stand-in for pptx.Presentation exposing the given slide/shape texts.

    ``slides_shapes_texts`` is a list of slides, each a list of shape texts.
    """
    class Shape:
        def __init__(self, text=None):
            # Only set .text when provided, so text-less shapes are modelled too.
            if text is not None:
                self.text = text
    class Slide:
        def __init__(self, texts):
            self.shapes = [Shape(t) for t in texts]
    class Pres:
        def __init__(self, _file):
            # Constructor signature mirrors pptx.Presentation(file).
            self.slides = [Slide(texts) for texts in slides_shapes_texts]
    return Pres


def test_pptx_parser_concat_true():
    """Default mode joins shape texts within a slide and slides with newlines."""
    slides = [["Hello ", "World"], ["Slide2"]]
    FakePres = _fake_presentation_with(slides)
    import sys, types
    fake_pptx = types.ModuleType("pptx")
    fake_pptx.Presentation = FakePres
    parser = PPTXParser()
    with patch.dict(sys.modules, {"pptx": fake_pptx}):
        result = parser.parse_file(Path("deck.pptx"))
    assert result == "Hello World\nSlide2"


def test_pptx_parser_list_mode():
    """_concat_slides=False yields one stripped string per slide."""
    slides = [[" A ", "B"], [" C "]]
    FakePres = _fake_presentation_with(slides)
    import sys, types
    fake_pptx = types.ModuleType("pptx")
    fake_pptx.Presentation = FakePres
    parser = PPTXParser()
    parser._concat_slides = False
    with patch.dict(sys.modules, {"pptx": fake_pptx}):
        result = parser.parse_file(Path("deck.pptx"))
    assert result == ["A B", "C"]


def test_pptx_parser_import_error():
    """A missing pptx module surfaces as an ImportError with a clear message."""
    parser = PPTXParser()
    import sys
    # Mapping "pptx" to None makes `import pptx` fail inside the parser.
    with patch.dict(sys.modules, {"pptx": None}):
        with pytest.raises(ImportError, match="pptx module is required to read .PPTX files"):
            parser.parse_file(Path("missing.pptx"))
|
||||
|
||||
284
tests/parser/file/test_rst_parser.py
Normal file
284
tests/parser/file/test_rst_parser.py
Normal file
@@ -0,0 +1,284 @@
|
||||
"""Unit tests for RstParser (application.parser.file.rst_parser)."""

import pytest
from pathlib import Path
from unittest.mock import patch, mock_open

from application.parser.file.rst_parser import RstParser


@pytest.fixture
def rst_parser():
    # Parser with all cleanup options at their defaults (everything enabled).
    return RstParser()


@pytest.fixture
def rst_parser_custom():
    # Parser with every cleanup pass disabled, to test pass-through behavior.
    return RstParser(
        remove_hyperlinks=False,
        remove_images=False,
        remove_table_excess=False,
        remove_interpreters=False,
        remove_directives=False,
        remove_whitespaces_excess=False,
        remove_characters_excess=False
    )


def test_rst_init_parser():
    """_init_parser returns a dict; init_parser flips parser_config_set."""
    parser = RstParser()
    assert isinstance(parser._init_parser(), dict)
    assert not parser.parser_config_set
    parser.init_parser()
    assert parser.parser_config_set


def test_rst_parser_initialization_with_custom_options():
    """Test RstParser initialization with custom options."""
    parser = RstParser(
        remove_hyperlinks=False,
        remove_images=False,
        remove_table_excess=False,
        remove_interpreters=False,
        remove_directives=False,
        remove_whitespaces_excess=False,
        remove_characters_excess=False
    )

    # Each keyword must land on its matching private flag.
    assert not parser._remove_hyperlinks
    assert not parser._remove_images
    assert not parser._remove_table_excess
    assert not parser._remove_interpreters
    assert not parser._remove_directives
    assert not parser._remove_whitespaces_excess
    assert not parser._remove_characters_excess


def test_rst_parser_default_initialization():
    """Test RstParser initialization with default options."""
    parser = RstParser()

    # All cleanup passes default to enabled.
    assert parser._remove_hyperlinks
    assert parser._remove_images
    assert parser._remove_table_excess
    assert parser._remove_interpreters
    assert parser._remove_directives
    assert parser._remove_whitespaces_excess
    assert parser._remove_characters_excess


def test_remove_hyperlinks():
    """Test hyperlink removal functionality."""
    parser = RstParser()
    content = "This is a `link text <http://example.com>`_ and more text."
    result = parser.remove_hyperlinks(content)
    # The URL goes; the link text stays inline.
    assert result == "This is a link text and more text."


def test_remove_images():
    """Test image removal functionality."""
    parser = RstParser()
    content = "Some text\n.. image:: path/to/image.png\nMore text"
    result = parser.remove_images(content)
    assert result == "Some text\n\nMore text"


def test_remove_directives():
    """Test directive removal functionality."""
    parser = RstParser()
    content = "Text with `..note::` directive and more text"
    result = parser.remove_directives(content)
    # The regex pattern looks for `..something::` so it should remove `..note::`
    assert result == "Text with ` directive and more text"


def test_remove_interpreters():
    """Test interpreter removal functionality."""
    parser = RstParser()
    content = "Text with :doc: role and :ref: another role"
    result = parser.remove_interpreters(content)
    assert result == "Text with role and another role"


def test_remove_table_excess():
    """Test table separator removal functionality."""
    parser = RstParser()
    content = "Header\n+-----+-----+\n| A | B |\n+-----+-----+\nFooter"
    result = parser.remove_table_excess(content)
    # Only the grid separator rows are stripped; cell rows survive.
    assert "+-----+-----+" not in result
    assert "Header" in result
    assert "| A | B |" in result
    assert "Footer" in result


def test_chunk_by_token_count():
    """Test token-based chunking functionality."""
    parser = RstParser()
    text = "This is a long text that should be chunked into smaller pieces based on token count"
    chunks = parser.chunk_by_token_count(text, max_tokens=5)

    # Should create multiple chunks
    assert len(chunks) > 1

    # Each chunk should be reasonably sized (approximately 5 * 5 = 25 characters)
    for chunk in chunks:
        assert len(chunk) <= 30  # Allow some flexibility


def test_rst_to_tups_with_headers():
    """Test RST to tuples conversion with headers."""
    parser = RstParser()
    rst_content = """Introduction
============

This is the introduction text.

Chapter 1
=========

This is chapter 1 content.
More content here.

Chapter 2
=========

This is chapter 2 content."""

    tups = parser.rst_to_tups(rst_content)

    # Should have 3 tuples (intro, chapter 1, chapter 2)
    assert len(tups) >= 2

    # Check that headers are captured
    headers = [tup[0] for tup in tups if tup[0] is not None]
    assert "Introduction" in headers
    assert "Chapter 1" in headers
    assert "Chapter 2" in headers


def test_rst_to_tups_without_headers():
    """Test RST to tuples conversion without headers."""
    parser = RstParser()
    rst_content = "Just plain text without any headers or structure."

    tups = parser.rst_to_tups(rst_content)

    # Should have one tuple with None header
    assert len(tups) == 1
    assert tups[0][0] is None
    assert "Just plain text" in tups[0][1]


def test_parse_file_basic(rst_parser):
    """Test basic parse_file functionality."""
    content = """Title
=====

This is some content.

Subtitle
--------

More content here."""

    with patch("builtins.open", mock_open(read_data=content)):
        result = rst_parser.parse_file(Path("test.rst"))

    # Should return a list of strings
    assert isinstance(result, list)
    assert len(result) >= 1

    # Content should be processed and cleaned
    joined_result = "\n".join(result)
    assert "Title" in joined_result
    assert "content" in joined_result


def test_parse_file_with_hyperlinks(rst_parser_custom):
    """Test parse_file with hyperlinks when removal is disabled."""
    content = "Text with `link <http://example.com>`_ here."

    with patch("builtins.open", mock_open(read_data=content)):
        result = rst_parser_custom.parse_file(Path("test.rst"))

    joined_result = "\n".join(result)
    # Hyperlinks should be preserved when removal is disabled
    assert "http://example.com" in joined_result


def test_parse_tups_with_max_tokens():
    """Test parse_tups with token chunking."""
    parser = RstParser()
    content = """Header
======

This is a very long piece of content that should be chunked into smaller pieces when max_tokens is specified. It contains multiple sentences and should be split appropriately."""

    with patch("builtins.open", mock_open(read_data=content)):
        tups = parser.parse_tups(Path("test.rst"), max_tokens=10)

    # Should create multiple chunks due to token limit
    assert len(tups) > 1

    # Each tuple should have a header indicating chunk number
    chunk_headers = [tup[0] for tup in tups]
    assert any("Chunk" in str(header) for header in chunk_headers if header)


def test_parse_tups_without_max_tokens():
    """Test parse_tups without token chunking."""
    parser = RstParser()
    content = """Header
======

Content here."""

    with patch("builtins.open", mock_open(read_data=content)):
        tups = parser.parse_tups(Path("test.rst"), max_tokens=None)

    # Should not create additional chunks
    assert len(tups) >= 1

    # Headers should not contain "Chunk"
    chunk_headers = [tup[0] for tup in tups]
    assert not any("Chunk" in str(header) for header in chunk_headers if header)


def test_parse_file_empty_content():
    """Test parse_file with empty content."""
    parser = RstParser()

    with patch("builtins.open", mock_open(read_data="")):
        result = parser.parse_file(Path("empty.rst"))

    # Should handle empty content gracefully
    assert isinstance(result, list)


def test_all_cleaning_methods_applied():
    """Test that all cleaning methods are applied when enabled."""
    parser = RstParser()
    content = """Title
=====

Text with `link <http://example.com>`_ and :doc:`reference`.

.. image:: image.png

+-----+-----+
| A | B |
+-----+-----+

`..note::` This is a note."""

    with patch("builtins.open", mock_open(read_data=content)):
        result = parser.parse_file(Path("test.rst"))

    joined_result = "\n".join(result)

    # All unwanted elements should be removed
    assert "http://example.com" not in joined_result  # hyperlinks removed
    assert ":doc:" not in joined_result  # interpreters removed
    assert ".. image::" not in joined_result  # images removed
    assert "+-----+" not in joined_result  # table excess removed
    # The directive pattern looks for `..something::` so regular .. note:: won't be removed
    # but `..note::` will be removed
    assert "`..note::`" not in joined_result  # directives removed
|
||||
215
tests/parser/file/test_tabular_parser.py
Normal file
215
tests/parser/file/test_tabular_parser.py
Normal file
@@ -0,0 +1,215 @@
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock, mock_open
|
||||
|
||||
from application.parser.file.tabular_parser import CSVParser, PandasCSVParser, ExcelParser
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def csv_parser():
|
||||
return CSVParser()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pandas_csv_parser():
|
||||
return PandasCSVParser()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def excel_parser():
|
||||
return ExcelParser()
|
||||
|
||||
def test_csv_init_parser():
|
||||
parser = CSVParser()
|
||||
assert isinstance(parser._init_parser(), dict)
|
||||
assert not parser.parser_config_set
|
||||
parser.init_parser()
|
||||
assert parser.parser_config_set
|
||||
|
||||
|
||||
def test_pandas_csv_init_parser():
|
||||
parser = PandasCSVParser()
|
||||
assert isinstance(parser._init_parser(), dict)
|
||||
assert not parser.parser_config_set
|
||||
parser.init_parser()
|
||||
assert parser.parser_config_set
|
||||
|
||||
|
||||
def test_excel_init_parser():
|
||||
parser = ExcelParser()
|
||||
assert isinstance(parser._init_parser(), dict)
|
||||
assert not parser.parser_config_set
|
||||
parser.init_parser()
|
||||
assert parser.parser_config_set
|
||||
|
||||
|
||||
def test_csv_parser_concat_rows(csv_parser):
|
||||
mock_data = "col1,col2\nvalue1,value2\nvalue3,value4"
|
||||
|
||||
with patch("builtins.open", mock_open(read_data=mock_data)):
|
||||
result = csv_parser.parse_file(Path("test.csv"))
|
||||
assert result == "col1, col2\nvalue1, value2\nvalue3, value4"
|
||||
|
||||
|
||||
def test_csv_parser_separate_rows(csv_parser):
|
||||
csv_parser._concat_rows = False
|
||||
mock_data = "col1,col2\nvalue1,value2\nvalue3,value4"
|
||||
|
||||
with patch("builtins.open", mock_open(read_data=mock_data)):
|
||||
result = csv_parser.parse_file(Path("test.csv"))
|
||||
assert result == ["col1, col2", "value1, value2", "value3, value4"]
|
||||
|
||||
|
||||
|
||||
|
||||
def test_pandas_csv_parser_concat_rows(pandas_csv_parser):
|
||||
mock_df = MagicMock()
|
||||
mock_df.columns.tolist.return_value = ["col1", "col2"]
|
||||
mock_df.iterrows.return_value = [
|
||||
(0, MagicMock(astype=lambda _: MagicMock(tolist=lambda: ["value1", "value2"]))),
|
||||
(1, MagicMock(astype=lambda _: MagicMock(tolist=lambda: ["value3", "value4"])))
|
||||
]
|
||||
|
||||
with patch("pandas.read_csv", return_value=mock_df):
|
||||
result = pandas_csv_parser.parse_file(Path("test.csv"))
|
||||
expected = "HEADERS: col1, col2\nvalue1, value2\nvalue3, value4"
|
||||
assert result == expected
|
||||
|
||||
|
||||
def test_pandas_csv_parser_separate_rows(pandas_csv_parser):
    """With _concat_rows disabled the pandas parser returns a list of rows."""
    pandas_csv_parser._concat_rows = False

    frame = MagicMock()
    frame.apply.return_value.tolist.return_value = ["value1, value2", "value3, value4"]

    with patch("pandas.read_csv", return_value=frame):
        parsed = pandas_csv_parser.parse_file(Path("test.csv"))

    assert parsed == ["value1, value2", "value3, value4"]
|
||||
|
||||
|
||||
def test_pandas_csv_parser_header_period(pandas_csv_parser):
    """A header period of 2 re-inserts the header row every two data rows."""
    pandas_csv_parser._header_period = 2

    def fake_row(values):
        # Mimic a pandas row: .astype(str).tolist() returns the cell values.
        return MagicMock(astype=lambda _: MagicMock(tolist=lambda: values))

    frame = MagicMock()
    frame.columns.tolist.return_value = ["col1", "col2"]
    frame.iterrows.return_value = [
        (0, fake_row(["value1", "value2"])),
        (1, fake_row(["value3", "value4"])),
        (2, fake_row(["value5", "value6"])),
    ]
    frame.__len__.return_value = 3

    with patch("pandas.read_csv", return_value=frame):
        parsed = pandas_csv_parser.parse_file(Path("test.csv"))

    assert parsed == (
        "HEADERS: col1, col2\nvalue1, value2\nvalue3, value4\nHEADERS: col1, col2\nvalue5, value6"
    )
|
||||
|
||||
|
||||
def test_excel_parser_concat_rows(excel_parser):
    """Concat mode yields a single header-prefixed string for Excel input."""

    def fake_row(values):
        # Mimic a pandas row: .astype(str).tolist() returns the cell values.
        return MagicMock(astype=lambda _: MagicMock(tolist=lambda: values))

    frame = MagicMock()
    frame.columns.tolist.return_value = ["col1", "col2"]
    frame.iterrows.return_value = [
        (0, fake_row(["value1", "value2"])),
        (1, fake_row(["value3", "value4"])),
    ]

    with patch("pandas.read_excel", return_value=frame):
        parsed = excel_parser.parse_file(Path("test.xlsx"))

    assert parsed == "HEADERS: col1, col2\nvalue1, value2\nvalue3, value4"
|
||||
|
||||
|
||||
def test_excel_parser_separate_rows(excel_parser):
    """With _concat_rows disabled the Excel parser returns a list of rows."""
    excel_parser._concat_rows = False

    frame = MagicMock()
    frame.apply.return_value.tolist.return_value = ["value1, value2", "value3, value4"]

    with patch("pandas.read_excel", return_value=frame):
        parsed = excel_parser.parse_file(Path("test.xlsx"))

    assert parsed == ["value1, value2", "value3, value4"]
|
||||
|
||||
|
||||
def test_excel_parser_header_period(excel_parser):
    """A header period of 1 inserts the header before every subsequent row."""
    excel_parser._header_period = 1

    def fake_row(values):
        # Mimic a pandas row: .astype(str).tolist() returns the cell values.
        return MagicMock(astype=lambda _: MagicMock(tolist=lambda: values))

    frame = MagicMock()
    frame.columns.tolist.return_value = ["col1", "col2"]
    frame.iterrows.return_value = [
        (0, fake_row(["value1", "value2"])),
        (1, fake_row(["value3", "value4"])),
    ]
    frame.__len__.return_value = 2

    with patch("pandas.read_excel", return_value=frame):
        parsed = excel_parser.parse_file(Path("test.xlsx"))

    assert parsed == "value1, value2\nHEADERS: col1, col2\nvalue3, value4"
|
||||
|
||||
def test_csv_parser_import_error(csv_parser):
    """A missing csv module surfaces as a ValueError with a clear message."""
    import sys

    with patch.dict(sys.modules, {"csv": None}), pytest.raises(
        ValueError, match="csv module is required to read CSV files"
    ):
        csv_parser.parse_file(Path("test.csv"))
|
||||
|
||||
|
||||
def test_pandas_csv_parser_import_error(pandas_csv_parser):
    """A missing pandas module surfaces as a ValueError with a clear message."""
    import sys

    with patch.dict(sys.modules, {"pandas": None}), pytest.raises(
        ValueError, match="pandas module is required to read CSV files"
    ):
        pandas_csv_parser.parse_file(Path("test.csv"))
|
||||
|
||||
|
||||
def test_pandas_csv_parser_header_period_zero(pandas_csv_parser):
    """A header period of 0 behaves like plain concat mode (single header)."""
    pandas_csv_parser._header_period = 0

    def fake_row(values):
        # Mimic a pandas row: .astype(str).tolist() returns the cell values.
        return MagicMock(astype=lambda _: MagicMock(tolist=lambda: values))

    frame = MagicMock()
    frame.columns.tolist.return_value = ["c1", "c2"]
    frame.iterrows.return_value = [
        (0, fake_row(["v1", "v2"])),
        (1, fake_row(["v3", "v4"])),
    ]

    with patch("pandas.read_csv", return_value=frame):
        parsed = pandas_csv_parser.parse_file(Path("f.csv"))

    assert parsed == "HEADERS: c1, c2\nv1, v2\nv3, v4"
|
||||
|
||||
|
||||
def test_pandas_csv_parser_header_period_one(pandas_csv_parser):
    """A header period of 1 inserts the header before every subsequent row."""
    pandas_csv_parser._header_period = 1

    def fake_row(values):
        # Mimic a pandas row: .astype(str).tolist() returns the cell values.
        return MagicMock(astype=lambda _: MagicMock(tolist=lambda: values))

    frame = MagicMock()
    frame.columns.tolist.return_value = ["a", "b"]
    frame.iterrows.return_value = [
        (0, fake_row(["x", "y"])),
        (1, fake_row(["m", "n"])),
    ]
    frame.__len__.return_value = 2

    with patch("pandas.read_csv", return_value=frame):
        parsed = pandas_csv_parser.parse_file(Path("f.csv"))

    assert parsed == "x, y\nHEADERS: a, b\nm, n"
|
||||
|
||||
|
||||
def test_pandas_csv_parser_passes_pandas_config():
    """Custom pandas_config kwargs are forwarded verbatim to pandas.read_csv."""
    parser = PandasCSVParser(pandas_config={"sep": ";", "header": 0})

    with patch("pandas.read_csv", return_value=MagicMock()) as mock_read:
        parser.parse_file(Path("conf.csv"))

    forwarded = mock_read.call_args.kwargs
    assert forwarded.get("sep") == ";"
    assert forwarded.get("header") == 0
|
||||
|
||||
|
||||
def test_excel_parser_custom_joiners_and_prefix(excel_parser):
    """Custom column/row joiners and header prefix flow through to the output."""
    excel_parser._col_joiner = " | "
    excel_parser._row_joiner = " || "
    excel_parser._header_prefix = "COLUMNS: "

    frame = MagicMock()
    frame.columns.tolist.return_value = ["A", "B"]
    frame.iterrows.return_value = [
        (0, MagicMock(astype=lambda _: MagicMock(tolist=lambda: ["x", "y"]))),
    ]

    with patch("pandas.read_excel", return_value=frame):
        parsed = excel_parser.parse_file(Path("t.xlsx"))

    assert parsed == "COLUMNS: A | B || x | y"
|
||||
|
||||
def test_excel_parser_import_error(excel_parser):
    """A missing pandas module surfaces as a ValueError for Excel input too."""
    import sys

    with patch.dict(sys.modules, {"pandas": None}), pytest.raises(
        ValueError, match="pandas module is required to read Excel files"
    ):
        excel_parser.parse_file(Path("test.xlsx"))
|
||||
167
tests/parser/remote/test_crawler_loader.py
Normal file
167
tests/parser/remote/test_crawler_loader.py
Normal file
@@ -0,0 +1,167 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from application.parser.remote.crawler_loader import CrawlerLoader
|
||||
from application.parser.schema.base import Document
|
||||
from langchain.docstore.document import Document as LCDocument
|
||||
|
||||
|
||||
class DummyResponse:
    """Minimal requests.Response stand-in: holds body text, never errors."""

    def __init__(self, text: str) -> None:
        self.text = text

    def raise_for_status(self) -> None:
        # Always succeeds, mirroring a 200 response.
        return None
|
||||
|
||||
|
||||
@patch("application.parser.remote.crawler_loader.requests.get")
def test_load_data_crawls_same_domain_links(mock_requests_get):
    """The crawler fetches the root URL, follows the same-domain /about link,
    and never fetches the external.com link."""
    # Canned HTTP responses: the root page links to /about (same domain)
    # and to an external site that must not be requested.
    responses = {
        "http://example.com": DummyResponse(
            """
            <html>
            <body>
            <a href='/about'>About</a>
            <a href='https://external.com/news'>External</a>
            </body>
            </html>
            """
        ),
        "http://example.com/about": DummyResponse("<html><body>About page</body></html>"),
    }

    def response_side_effect(url: str):
        # Fail loudly if the crawler requests anything outside the canned set
        # (e.g. the external.com link).
        if url not in responses:
            raise AssertionError(f"Unexpected request for URL: {url}")
        return responses[url]

    mock_requests_get.side_effect = response_side_effect

    # LangChain-style documents returned by the per-URL loader instances.
    root_doc = MagicMock(spec=LCDocument)
    root_doc.page_content = "Root content"
    root_doc.metadata = {"source": "http://example.com"}

    about_doc = MagicMock(spec=LCDocument)
    about_doc.page_content = "About content"
    about_doc.metadata = {"source": "http://example.com/about"}

    # One loader instance per crawled URL, each yielding its own document.
    loader_instances = {
        "http://example.com": MagicMock(),
        "http://example.com/about": MagicMock(),
    }
    loader_instances["http://example.com"].load.return_value = [root_doc]
    loader_instances["http://example.com/about"].load.return_value = [about_doc]

    loader_call_order = []

    def loader_factory(url_list):
        # CrawlerLoader passes a one-element URL list; record the order so we
        # can assert the root page is loaded before the linked page.
        url = url_list[0]
        loader_call_order.append(url)
        return loader_instances[url]

    crawler = CrawlerLoader(limit=5)
    crawler.loader = MagicMock(side_effect=loader_factory)

    result = crawler.load_data("http://example.com")

    # Both same-domain pages were converted to internal Document objects.
    assert len(result) == 2
    assert all(isinstance(doc, Document) for doc in result)

    sources = {doc.extra_info.get("source") for doc in result}
    assert sources == {"http://example.com", "http://example.com/about"}

    texts = {doc.text for doc in result}
    assert texts == {"Root content", "About content"}

    # Exactly two HTTP fetches (root + /about), root first.
    assert mock_requests_get.call_count == 2
    assert loader_call_order == ["http://example.com", "http://example.com/about"]
|
||||
|
||||
|
||||
@patch("application.parser.remote.crawler_loader.requests.get")
def test_load_data_accepts_list_input_and_adds_scheme(mock_requests_get):
    """Only the first URL of a list is crawled, and a bare host gets http://."""
    mock_requests_get.return_value = DummyResponse("<html><body>No links here</body></html>")

    homepage_doc = MagicMock(spec=LCDocument)
    homepage_doc.page_content = "Homepage"
    homepage_doc.metadata = {"source": "http://example.com"}

    loader_instance = MagicMock()
    loader_instance.load.return_value = [homepage_doc]

    crawler = CrawlerLoader()
    crawler.loader = MagicMock(return_value=loader_instance)

    result = crawler.load_data(["example.com", "unused.com"])

    # Scheme was prepended and only the first list entry was used.
    mock_requests_get.assert_called_once_with("http://example.com")
    crawler.loader.assert_called_once_with(["http://example.com"])

    assert len(result) == 1
    first = result[0]
    assert first.text == "Homepage"
    assert first.extra_info == {"source": "http://example.com"}
|
||||
|
||||
|
||||
@patch("application.parser.remote.crawler_loader.requests.get")
def test_load_data_respects_limit(mock_requests_get):
    """With limit=1 only the root page is fetched and loaded; /about is never visited."""
    responses = {
        "http://example.com": DummyResponse(
            """
            <html>
            <body>
            <a href='/about'>About</a>
            </body>
            </html>
            """
        ),
        "http://example.com/about": DummyResponse("<html><body>About</body></html>"),
    }

    mock_requests_get.side_effect = lambda url: responses[url]

    root_doc = MagicMock(spec=LCDocument)
    root_doc.page_content = "Root content"
    root_doc.metadata = {"source": "http://example.com"}

    about_doc = MagicMock(spec=LCDocument)
    about_doc.page_content = "About content"
    about_doc.metadata = {"source": "http://example.com/about"}

    # Loader instances exist for both URLs, but only the root one should be used.
    loader_instances = {
        "http://example.com": MagicMock(),
        "http://example.com/about": MagicMock(),
    }
    loader_instances["http://example.com"].load.return_value = [root_doc]
    loader_instances["http://example.com/about"].load.return_value = [about_doc]

    crawler = CrawlerLoader(limit=1)
    crawler.loader = MagicMock(side_effect=lambda url_list: loader_instances[url_list[0]])

    result = crawler.load_data("http://example.com")

    # The crawl stops after one page: one document, one fetch, one loader call.
    assert len(result) == 1
    assert result[0].text == "Root content"
    assert mock_requests_get.call_count == 1
    assert crawler.loader.call_count == 1
|
||||
|
||||
|
||||
@patch("application.parser.remote.crawler_loader.logging")
@patch("application.parser.remote.crawler_loader.requests.get")
def test_load_data_logs_and_skips_on_loader_error(mock_requests_get, mock_logging):
    """A loader failure is logged with exc_info=True and yields no documents."""
    mock_requests_get.return_value = DummyResponse("<html><body>Error route</body></html>")

    broken_loader = MagicMock()
    broken_loader.load.side_effect = Exception("load failure")

    crawler = CrawlerLoader()
    crawler.loader = MagicMock(return_value=broken_loader)

    assert crawler.load_data("http://example.com") == []

    mock_requests_get.assert_called_once_with("http://example.com")
    broken_loader.load.assert_called_once()

    # The error path logs exactly once, naming the URL, with a traceback.
    mock_logging.error.assert_called_once()
    (log_message,) = mock_logging.error.call_args.args
    assert "Error processing URL http://example.com" in log_message
    assert mock_logging.error.call_args.kwargs.get("exc_info") is True
|
||||
|
||||
139
tests/parser/remote/test_crawler_markdown.py
Normal file
139
tests/parser/remote/test_crawler_markdown.py
Normal file
@@ -0,0 +1,139 @@
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from application.parser.remote.crawler_markdown import CrawlerLoader
|
||||
from application.parser.schema.base import Document
|
||||
|
||||
|
||||
class DummyResponse:
    """Tiny requests.Response stand-in used by the markdown-crawler tests."""

    def __init__(self, text):
        self.text = text

    def raise_for_status(self):
        # Pretend every response is a success.
        return None
|
||||
|
||||
|
||||
def _fake_extract(value: str) -> SimpleNamespace:
|
||||
value = value.split("//")[-1]
|
||||
host = value.split("/")[0]
|
||||
parts = host.split(".")
|
||||
if len(parts) >= 2:
|
||||
domain = parts[-2]
|
||||
suffix = parts[-1]
|
||||
else:
|
||||
domain = host
|
||||
suffix = ""
|
||||
return SimpleNamespace(domain=domain, suffix=suffix)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def _patch_tldextract(monkeypatch):
    """Swap tldextract.extract for the lightweight fake in every test."""
    target = "application.parser.remote.crawler_markdown.tldextract.extract"
    monkeypatch.setattr(target, _fake_extract)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def _patch_markdownify(monkeypatch):
    """Replace markdownify with a lookup table; yields the table for tests to fill."""
    html_to_markdown = {}

    def fake_markdownify(html, *_, **__):
        # Fall back to the raw HTML when no canned output was registered.
        return html_to_markdown.get(html, html)

    monkeypatch.setattr(
        "application.parser.remote.crawler_markdown.markdownify",
        fake_markdownify,
    )
    return html_to_markdown
|
||||
|
||||
|
||||
def _setup_session(mock_get_side_effect):
|
||||
session = MagicMock()
|
||||
session.get.side_effect = mock_get_side_effect
|
||||
return session
|
||||
|
||||
|
||||
def test_load_data_filters_external_links(_patch_markdownify):
    """Only links on the same registered domain are crawled; other.com is ignored."""
    root_html = """
    <html><head><title>Home</title></head>
    <body><a href="/about">About</a><a href="https://other.com">Other</a><p>Welcome</p></body>
    </html>
    """
    about_html = "<html><head><title>About</title></head><body>About page</body></html>"

    # Register canned markdown output for each HTML body (keyed by exact string).
    _patch_markdownify[root_html] = "Home Markdown"
    _patch_markdownify[about_html] = "About Markdown"

    responses = {
        "http://example.com": DummyResponse(root_html),
        "http://example.com/about": DummyResponse(about_html),
    }

    loader = CrawlerLoader(limit=5)
    # The session only knows the two same-domain URLs; a request for other.com
    # would raise a KeyError and fail the test.
    loader.session = _setup_session(lambda url, timeout=10: responses[url])

    docs = loader.load_data("http://example.com")

    # Two same-domain pages crawled; each yields a Document with its source URL.
    assert len(docs) == 2
    for doc in docs:
        assert isinstance(doc, Document)
        assert doc.extra_info["source"] in responses
    texts = {doc.text for doc in docs}
    assert texts == {"Home Markdown", "About Markdown"}
|
||||
|
||||
|
||||
def test_load_data_allows_subdomains(_patch_markdownify):
    """With allow_subdomains=True, blog.example.com counts as the same site."""
    root_html = """
    <html><head><title>Home</title></head>
    <body><a href="http://blog.example.com/post">Blog</a></body>
    </html>
    """
    blog_html = "<html><head><title>Blog</title></head><body>Blog post</body></html>"

    # Canned markdown conversions keyed by exact HTML body.
    _patch_markdownify[root_html] = "Home Markdown"
    _patch_markdownify[blog_html] = "Blog Markdown"

    responses = {
        "http://example.com": DummyResponse(root_html),
        "http://blog.example.com/post": DummyResponse(blog_html),
    }

    loader = CrawlerLoader(limit=5, allow_subdomains=True)
    loader.session = _setup_session(lambda url, timeout=10: responses[url])

    docs = loader.load_data("http://example.com")

    # The subdomain page is crawled alongside the root page.
    sources = {doc.extra_info["source"] for doc in docs}
    assert "http://blog.example.com/post" in sources
    assert len(docs) == 2
|
||||
|
||||
|
||||
def test_load_data_handles_fetch_errors(monkeypatch, _patch_markdownify):
    """A failing link fetch is printed and skipped; the root page still loads."""
    root_html = """
    <html><head><title>Home</title></head>
    <body><a href="/about">About</a></body>
    </html>
    """

    _patch_markdownify[root_html] = "Home Markdown"

    def side_effect(url, timeout=10):
        # Root succeeds; every other URL (i.e. /about) raises a network error.
        if url == "http://example.com":
            return DummyResponse(root_html)
        raise requests.exceptions.RequestException("boom")

    loader = CrawlerLoader(limit=5)
    loader.session = _setup_session(side_effect)
    # Capture the print() call the crawler uses for error reporting.
    mock_print = MagicMock()
    monkeypatch.setattr("builtins.print", mock_print)

    docs = loader.load_data("http://example.com")

    # Only the root page survived; the failed /about fetch was reported.
    assert len(docs) == 1
    assert docs[0].text == "Home Markdown"
    assert mock_print.called
|
||||
|
||||
159
tests/parser/remote/test_github_loader.py
Normal file
159
tests/parser/remote/test_github_loader.py
Normal file
@@ -0,0 +1,159 @@
|
||||
import base64
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
import requests
|
||||
|
||||
from application.parser.remote.github_loader import GitHubLoader
|
||||
|
||||
|
||||
def make_response(json_data=None, status_code=200, raise_error=None):
    """Build a mock requests.Response with canned JSON and status behaviour.

    When raise_error is given, raise_for_status() raises it; otherwise
    raise_for_status() is a no-op returning None.
    """
    response = MagicMock()
    response.status_code = status_code
    response.json.return_value = json_data
    if raise_error is None:
        response.raise_for_status.return_value = None
    else:
        response.raise_for_status.side_effect = raise_error
    return response
|
||||
|
||||
|
||||
class TestGitHubLoaderFetchFileContent:
    """GitHubLoader.fetch_file_content across content encodings and errors."""

    @patch("application.parser.remote.github_loader.requests.get")
    def test_text_file_base64_decoded(self, mock_get):
        """Base64 text payloads are decoded and prefixed with the filename."""
        loader = GitHubLoader()
        readme_text = "Hello from README"
        encoded = base64.b64encode(readme_text.encode("utf-8")).decode("utf-8")
        mock_get.return_value = make_response({"encoding": "base64", "content": encoded})

        result = loader.fetch_file_content("owner/repo", "README.md")

        assert result == f"Filename: README.md\n\n{readme_text}"
        mock_get.assert_called_once_with(
            "https://api.github.com/repos/owner/repo/contents/README.md",
            headers=loader.headers,
        )

    @patch("application.parser.remote.github_loader.requests.get")
    def test_binary_file_skipped(self, mock_get):
        """Non-text extensions are reported as skipped binaries, not decoded."""
        mock_get.return_value = make_response({"encoding": "base64", "content": "AAAA"})

        result = GitHubLoader().fetch_file_content("owner/repo", "image.png")

        assert result == "Filename: image.png is a binary file and was skipped."

    @patch("application.parser.remote.github_loader.requests.get")
    def test_non_base64_plain_content(self, mock_get):
        """When the payload is not base64, the raw content is used verbatim."""
        mock_get.return_value = make_response({"encoding": "", "content": "Plain text"})

        result = GitHubLoader().fetch_file_content("owner/repo", "file.txt")

        assert result == "Filename: file.txt\n\nPlain text"

    @patch("application.parser.remote.github_loader.requests.get")
    def test_http_error_raises(self, mock_get):
        """HTTP failures from the API propagate as requests.HTTPError."""
        mock_get.return_value = make_response(
            status_code=404, raise_error=requests.HTTPError("Not found")
        )

        with pytest.raises(requests.HTTPError):
            GitHubLoader().fetch_file_content("owner/repo", "missing.txt")
|
||||
|
||||
|
||||
class TestGitHubLoaderFetchRepoFiles:
    """GitHubLoader.fetch_repo_files walks directory listings recursively."""

    @patch("application.parser.remote.github_loader.requests.get")
    def test_recurses_directories(self, mock_get):
        """Files inside sub-directories are collected via a second API call."""
        loader = GitHubLoader()

        # Canned directory listings keyed by the URL suffix the loader requests.
        listings = {
            "/contents/": [
                {"type": "file", "path": "README.md"},
                {"type": "dir", "path": "src"},
            ],
            "/contents/src": [
                {"type": "file", "path": "src/main.py"},
                {"type": "file", "path": "src/util.py"},
            ],
        }

        def side_effect(url, headers=None):
            for suffix, items in listings.items():
                if url.endswith(suffix):
                    return make_response(items)
            raise AssertionError(f"Unexpected URL: {url}")

        mock_get.side_effect = side_effect

        files = loader.fetch_repo_files("owner/repo", path="")
        assert set(files) == {"README.md", "src/main.py", "src/util.py"}
|
||||
|
||||
|
||||
class TestGitHubLoaderLoadData:
    """GitHubLoader.load_data turns repo files into LangChain documents."""

    def test_load_data_builds_documents_from_files(self, monkeypatch):
        """Each listed file becomes one document with a blob/main source URL."""
        loader = GitHubLoader()

        # Stub out the network-dependent methods with deterministic values.
        monkeypatch.setattr(
            loader, "fetch_repo_files", lambda repo, path="": ["README.md", "src/main.py"]
        )
        monkeypatch.setattr(
            loader, "fetch_file_content", lambda repo, file_path: f"content for {file_path}"
        )

        docs = loader.load_data("https://github.com/owner/repo")

        assert len(docs) == 2
        expected = [
            ("README.md", "https://github.com/owner/repo/blob/main/README.md"),
            ("src/main.py", "https://github.com/owner/repo/blob/main/src/main.py"),
        ]
        for doc, (title, source) in zip(docs, expected):
            assert doc.page_content == f"content for {title}"
            assert doc.metadata == {"title": title, "source": source}
|
||||
|
||||
|
||||
|
||||
|
||||
class TestGitHubLoaderRobustness:
    """Failure modes for malformed or unexpected GitHub API payloads."""

    @patch("application.parser.remote.github_loader.requests.get")
    def test_fetch_repo_files_non_json_raises(self, mock_get):
        """A non-JSON listing response propagates the ValueError from .json()."""
        bad_response = MagicMock()
        bad_response.json.side_effect = ValueError("No JSON")
        mock_get.return_value = bad_response

        with pytest.raises(ValueError):
            GitHubLoader().fetch_repo_files("owner/repo")

    @patch("application.parser.remote.github_loader.requests.get")
    def test_fetch_repo_files_unexpected_shape_missing_type_raises(self, mock_get):
        """Listing items without a 'type' key raise KeyError when accessed."""
        mock_get.return_value = make_response([{"path": "README.md"}])

        with pytest.raises(KeyError):
            GitHubLoader().fetch_repo_files("owner/repo")

    @patch("application.parser.remote.github_loader.requests.get")
    def test_fetch_file_content_non_json_raises(self, mock_get):
        """A non-JSON file response propagates the ValueError from .json()."""
        bad_response = MagicMock()
        bad_response.status_code = 200
        bad_response.json.side_effect = ValueError("No JSON")
        mock_get.return_value = bad_response

        with pytest.raises(ValueError):
            GitHubLoader().fetch_file_content("owner/repo", "README.md")

    @patch("application.parser.remote.github_loader.requests.get")
    def test_fetch_file_content_unexpected_shape_missing_content_raises(self, mock_get):
        """A base64 payload missing the 'content' key raises KeyError."""
        mock_get.return_value = make_response({"encoding": "base64"})

        with pytest.raises(KeyError):
            GitHubLoader().fetch_file_content("owner/repo", "README.md")

    @patch("application.parser.remote.github_loader.base64.b64decode")
    @patch("application.parser.remote.github_loader.requests.get")
    def test_large_binary_skip_does_not_decode(self, mock_get, mock_b64decode):
        """Binary files are skipped without ever invoking base64.b64decode."""
        mock_b64decode.side_effect = AssertionError(
            "b64decode should not be called for binary files"
        )
        mock_get.return_value = make_response({"encoding": "base64", "content": "AAA"})

        result = GitHubLoader().fetch_file_content("owner/repo", "bigfile.bin")

        assert result == "Filename: bigfile.bin is a binary file and was skipped."
|
||||
83
tests/parser/remote/test_reddit_loader.py
Normal file
83
tests/parser/remote/test_reddit_loader.py
Normal file
@@ -0,0 +1,83 @@
|
||||
import json
|
||||
from unittest.mock import patch, MagicMock
|
||||
import pytest
|
||||
|
||||
from application.parser.remote.reddit_loader import RedditPostsLoaderRemote
|
||||
|
||||
|
||||
class TestRedditPostsLoaderRemote:
    """Input validation and delegation for RedditPostsLoaderRemote.load_data."""

    def test_invalid_json_raises(self):
        """Non-JSON input is rejected with a clear ValueError."""
        with pytest.raises(ValueError) as exc_info:
            RedditPostsLoaderRemote().load_data("not a json")
        assert "Invalid JSON input" in str(exc_info.value)

    def test_missing_required_fields_raises(self):
        """Omitting required credentials is reported, naming the missing field."""
        incomplete = json.dumps({"client_id": "id"})
        with pytest.raises(ValueError) as exc_info:
            RedditPostsLoaderRemote().load_data(incomplete)
        message = str(exc_info.value)
        assert "Missing required fields" in message
        assert "client_secret" in message

    @patch("application.parser.remote.reddit_loader.RedditPostsLoader")
    def test_constructs_loader_and_loads_with_defaults(self, MockRedditLoader):
        """Optional fields fall back to the documented defaults."""
        expected_docs = [MagicMock(), MagicMock()]
        delegate = MagicMock()
        delegate.load.return_value = expected_docs
        MockRedditLoader.return_value = delegate

        payload = {
            "client_id": "cid",
            "client_secret": "csecret",
            "user_agent": "ua",
            "search_queries": ["r/langchain"],
        }

        result = RedditPostsLoaderRemote().load_data(json.dumps(payload))

        MockRedditLoader.assert_called_once_with(
            client_id="cid",
            client_secret="csecret",
            user_agent="ua",
            categories=["new", "hot"],
            mode="subreddit",
            search_queries=["r/langchain"],
            number_posts=10,
        )
        delegate.load.assert_called_once()
        assert result == expected_docs

    @patch("application.parser.remote.reddit_loader.RedditPostsLoader")
    def test_constructs_loader_and_loads_with_overrides(self, MockRedditLoader):
        """Explicit categories/mode/number_posts override the defaults."""
        delegate = MagicMock()
        delegate.load.return_value = []
        MockRedditLoader.return_value = delegate

        payload = {
            "client_id": "cid",
            "client_secret": "csecret",
            "user_agent": "ua",
            "search_queries": ["python"],
            "categories": ["hot"],
            "mode": "comments",
            "number_posts": 3,
        }

        RedditPostsLoaderRemote().load_data(json.dumps(payload))

        MockRedditLoader.assert_called_once_with(
            client_id="cid",
            client_secret="csecret",
            user_agent="ua",
            categories=["hot"],
            mode="comments",
            search_queries=["python"],
            number_posts=3,
        )
        delegate.load.assert_called_once()
|
||||
|
||||
303
tests/parser/remote/test_web_loader.py
Normal file
303
tests/parser/remote/test_web_loader.py
Normal file
@@ -0,0 +1,303 @@
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from application.parser.remote.web_loader import WebLoader, headers
|
||||
from application.parser.schema.base import Document
|
||||
from langchain.docstore.document import Document as LCDocument
|
||||
|
||||
|
||||
@pytest.fixture
def web_loader():
    """A fresh WebLoader instance for each test."""
    return WebLoader()
|
||||
|
||||
|
||||
@pytest.fixture
def mock_langchain_document():
    """A canned LangChain document representing one fetched page."""
    fake_doc = MagicMock(spec=LCDocument)
    fake_doc.page_content = "Test web page content"
    fake_doc.metadata = {"source": "https://example.com", "title": "Test Page"}
    return fake_doc
|
||||
|
||||
|
||||
@pytest.fixture
def mock_web_base_loader():
    """A mock WebBaseLoader class paired with the instance it returns."""
    loader_cls = MagicMock()
    loader_instance = MagicMock()
    loader_cls.return_value = loader_instance
    return loader_cls, loader_instance
|
||||
|
||||
|
||||
class TestWebLoaderInitialization:
    """WebLoader wiring at construction time."""

    def test_init(self, web_loader):
        """The loader attribute is the langchain WebBaseLoader class itself."""
        from langchain_community.document_loaders import WebBaseLoader

        assert web_loader.loader is not None
        assert web_loader.loader == WebBaseLoader
|
||||
|
||||
|
||||
class TestWebLoaderHeaders:
    """Shape and content of the module-level request headers."""

    def test_headers_defined(self):
        """All browser-mimicking header fields are present."""
        expected_keys = {
            "User-Agent",
            "Accept",
            "Accept-Language",
            "Referer",
            "DNT",
            "Connection",
            "Upgrade-Insecure-Requests",
        }
        assert isinstance(headers, dict)
        assert expected_keys <= set(headers)

    def test_headers_values(self):
        """Spot-check individual header values."""
        assert headers["User-Agent"] == "Mozilla/5.0"
        assert "text/html" in headers["Accept"]
        assert headers["Referer"] == "https://www.google.com/"
        assert headers["DNT"] == "1"
        assert headers["Connection"] == "keep-alive"
|
||||
|
||||
|
||||
class TestWebLoaderLoadData:
|
||||
"""Test WebLoader load_data method."""
|
||||
|
||||
def test_load_data_single_url_string(self, web_loader, mock_langchain_document):
|
||||
"""Test loading data from a single URL passed as string."""
|
||||
|
||||
mock_loader_instance = MagicMock()
|
||||
mock_loader_instance.load.return_value = [mock_langchain_document]
|
||||
|
||||
mock_web_base_loader_class = MagicMock()
|
||||
mock_web_base_loader_class.return_value = mock_loader_instance
|
||||
|
||||
web_loader.loader = mock_web_base_loader_class
|
||||
|
||||
result = web_loader.load_data("https://example.com")
|
||||
|
||||
assert len(result) == 1
|
||||
assert isinstance(result[0], Document)
|
||||
assert result[0].text == "Test web page content"
|
||||
assert result[0].extra_info == {"source": "https://example.com", "title": "Test Page"}
|
||||
|
||||
mock_web_base_loader_class.assert_called_once_with(["https://example.com"], header_template=headers)
|
||||
mock_loader_instance.load.assert_called_once()
|
||||
|
||||
def test_load_data_multiple_urls_list(self, web_loader):
|
||||
"""Test loading data from multiple URLs passed as list."""
|
||||
|
||||
doc1 = MagicMock(spec=LCDocument)
|
||||
doc1.page_content = "Content from site 1"
|
||||
doc1.metadata = {"source": "https://site1.com"}
|
||||
|
||||
doc2 = MagicMock(spec=LCDocument)
|
||||
doc2.page_content = "Content from site 2"
|
||||
doc2.metadata = {"source": "https://site2.com"}
|
||||
|
||||
|
||||
mock_loader_instance1 = MagicMock()
|
||||
mock_loader_instance1.load.return_value = [doc1]
|
||||
|
||||
mock_loader_instance2 = MagicMock()
|
||||
mock_loader_instance2.load.return_value = [doc2]
|
||||
|
||||
mock_web_base_loader_class = MagicMock()
|
||||
mock_web_base_loader_class.side_effect = [mock_loader_instance1, mock_loader_instance2]
|
||||
|
||||
web_loader.loader = mock_web_base_loader_class
|
||||
|
||||
urls = ["https://site1.com", "https://site2.com"]
|
||||
result = web_loader.load_data(urls)
|
||||
|
||||
assert len(result) == 2
|
||||
assert all(isinstance(doc, Document) for doc in result)
|
||||
assert result[0].text == "Content from site 1"
|
||||
assert result[1].text == "Content from site 2"
|
||||
assert result[0].extra_info == {"source": "https://site1.com"}
|
||||
assert result[1].extra_info == {"source": "https://site2.com"}
|
||||
|
||||
assert mock_web_base_loader_class.call_count == 2
|
||||
mock_web_base_loader_class.assert_any_call(["https://site1.com"], header_template=headers)
|
||||
mock_web_base_loader_class.assert_any_call(["https://site2.com"], header_template=headers)
|
||||
|
||||
def test_load_data_url_without_scheme(self, web_loader, mock_langchain_document):
|
||||
"""Test loading data from URL without scheme (should add http://)."""
|
||||
mock_loader_instance = MagicMock()
|
||||
mock_loader_instance.load.return_value = [mock_langchain_document]
|
||||
|
||||
mock_web_base_loader_class = MagicMock()
|
||||
mock_web_base_loader_class.return_value = mock_loader_instance
|
||||
|
||||
web_loader.loader = mock_web_base_loader_class
|
||||
|
||||
result = web_loader.load_data("example.com")
|
||||
|
||||
assert len(result) == 1
|
||||
assert isinstance(result[0], Document)
|
||||
|
||||
# Verify WebBaseLoader was called with http:// prefix
|
||||
mock_web_base_loader_class.assert_called_once_with(["http://example.com"], header_template=headers)
|
||||
|
||||
def test_load_data_url_with_scheme(self, web_loader, mock_langchain_document):
|
||||
"""Test loading data from URL with scheme (should not modify)."""
|
||||
mock_loader_instance = MagicMock()
|
||||
mock_loader_instance.load.return_value = [mock_langchain_document]
|
||||
|
||||
mock_web_base_loader_class = MagicMock()
|
||||
mock_web_base_loader_class.return_value = mock_loader_instance
|
||||
|
||||
web_loader.loader = mock_web_base_loader_class
|
||||
|
||||
result = web_loader.load_data("https://example.com")
|
||||
|
||||
assert len(result) == 1
|
||||
|
||||
# Verify WebBaseLoader was called with original URL
|
||||
mock_web_base_loader_class.assert_called_once_with(["https://example.com"], header_template=headers)
|
||||
|
||||
def test_load_data_multiple_documents_per_url(self, web_loader):
|
||||
"""Test loading multiple documents from a single URL."""
|
||||
doc1 = MagicMock(spec=LCDocument)
|
||||
doc1.page_content = "First document content"
|
||||
doc1.metadata = {"source": "https://example.com", "section": "intro"}
|
||||
|
||||
doc2 = MagicMock(spec=LCDocument)
|
||||
doc2.page_content = "Second document content"
|
||||
doc2.metadata = {"source": "https://example.com", "section": "main"}
|
||||
|
||||
mock_loader_instance = MagicMock()
|
||||
mock_loader_instance.load.return_value = [doc1, doc2]
|
||||
|
||||
mock_web_base_loader_class = MagicMock()
|
||||
mock_web_base_loader_class.return_value = mock_loader_instance
|
||||
|
||||
web_loader.loader = mock_web_base_loader_class
|
||||
|
||||
result = web_loader.load_data("https://example.com")
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0].text == "First document content"
|
||||
assert result[1].text == "Second document content"
|
||||
assert result[0].extra_info == {"source": "https://example.com", "section": "intro"}
|
||||
assert result[1].extra_info == {"source": "https://example.com", "section": "main"}
|
||||
|
||||
|
||||
class TestWebLoaderErrorHandling:
    """Test WebLoader error handling."""

    @patch('application.parser.remote.web_loader.logging')
    def test_load_data_single_url_error(self, mock_logging, web_loader):
        """Test error handling for single URL that fails to load."""
        failing_loader = MagicMock()
        failing_loader.load.side_effect = Exception("Network error")

        loader_factory = MagicMock(return_value=failing_loader)
        web_loader.loader = loader_factory

        result = web_loader.load_data("https://invalid-url.com")

        # A failing URL must be skipped, not raised to the caller.
        assert result == []  # Should return empty list on error
        mock_logging.error.assert_called_once()
        error_args, error_kwargs = mock_logging.error.call_args
        assert "Error processing URL https://invalid-url.com" in error_args[0]
        assert error_kwargs["exc_info"] is True

    @patch('application.parser.remote.web_loader.logging')
    def test_load_data_partial_failure(self, mock_logging, web_loader):
        """Test partial failure - some URLs succeed, some fail."""
        good_doc = MagicMock(spec=LCDocument)
        good_doc.page_content = "Success content"
        good_doc.metadata = {"source": "https://good-url.com"}

        healthy_loader = MagicMock()
        healthy_loader.load.return_value = [good_doc]

        failing_loader = MagicMock()
        failing_loader.load.side_effect = Exception("Network error")

        loader_factory = MagicMock()
        loader_factory.side_effect = [healthy_loader, failing_loader]

        web_loader.loader = loader_factory

        result = web_loader.load_data(["https://good-url.com", "https://bad-url.com"])

        # Only the successful URL contributes documents.
        assert len(result) == 1
        assert result[0].text == "Success content"
        assert result[0].extra_info == {"source": "https://good-url.com"}

        # The failure is logged exactly once, naming the bad URL.
        mock_logging.error.assert_called_once()
        error_args, _ = mock_logging.error.call_args
        assert "Error processing URL https://bad-url.com" in error_args[0]
|
||||
|
||||
|
||||
class TestWebLoaderEdgeCases:
|
||||
"""Test WebLoader edge cases."""
|
||||
|
||||
def test_load_data_empty_list(self, web_loader):
|
||||
"""Test loading data with empty URL list."""
|
||||
result = web_loader.load_data([])
|
||||
assert result == []
|
||||
|
||||
def test_load_data_empty_response(self, web_loader):
|
||||
"""Test loading data when WebBaseLoader returns empty list."""
|
||||
mock_loader_instance = MagicMock()
|
||||
mock_loader_instance.load.return_value = []
|
||||
|
||||
mock_web_base_loader_class = MagicMock()
|
||||
mock_web_base_loader_class.return_value = mock_loader_instance
|
||||
|
||||
web_loader.loader = mock_web_base_loader_class
|
||||
|
||||
result = web_loader.load_data("https://empty-page.com")
|
||||
|
||||
assert result == []
|
||||
|
||||
def test_url_scheme_detection(self):
|
||||
"""Test URL scheme detection logic."""
|
||||
# Test URLs with schemes
|
||||
assert urlparse("https://example.com").scheme == "https"
|
||||
assert urlparse("http://example.com").scheme == "http"
|
||||
assert urlparse("ftp://example.com").scheme == "ftp"
|
||||
|
||||
# Test URLs without schemes
|
||||
assert urlparse("example.com").scheme == ""
|
||||
assert urlparse("www.example.com").scheme == ""
|
||||
|
||||
|
||||
class TestWebLoaderIntegration:
|
||||
"""Test WebLoader integration with base class."""
|
||||
|
||||
def test_inherits_from_base_remote(self, web_loader):
|
||||
"""Test that WebLoader inherits from BaseRemote."""
|
||||
from application.parser.remote.base import BaseRemote
|
||||
assert isinstance(web_loader, BaseRemote)
|
||||
|
||||
def test_implements_load_data_method(self, web_loader):
|
||||
"""Test that WebLoader implements required load_data method."""
|
||||
assert hasattr(web_loader, 'load_data')
|
||||
assert callable(web_loader.load_data)
|
||||
|
||||
def test_load_langchain_documents_method(self, web_loader, mock_langchain_document):
|
||||
"""Test inherited load_langchain_documents method."""
|
||||
mock_loader_instance = MagicMock()
|
||||
mock_loader_instance.load.return_value = [mock_langchain_document]
|
||||
|
||||
mock_web_base_loader_class = MagicMock()
|
||||
mock_web_base_loader_class.return_value = mock_loader_instance
|
||||
|
||||
web_loader.loader = mock_web_base_loader_class
|
||||
|
||||
result = web_loader.load_langchain_documents(inputs="https://example.com")
|
||||
|
||||
assert len(result) == 1
|
||||
assert isinstance(result[0], LCDocument)
|
||||
assert result[0].page_content == "Test web page content"
|
||||
assert result[0].metadata == {"source": "https://example.com", "title": "Test Page"}
|
||||
Reference in New Issue
Block a user