diff --git a/tests/test_worker.py b/tests/test_worker.py
new file mode 100644
index 00000000..a63e64c6
--- /dev/null
+++ b/tests/test_worker.py
@@ -0,0 +1,1718 @@
+import io
+import json
+import os
+import zipfile
+from unittest.mock import MagicMock, patch
+
+import pytest
+from bson.objectid import ObjectId
+
+from application.parser.schema.base import Document as DocSchema
+from application.worker import (
+    ZipExtractionError,
+    _apply_display_names_to_structure,
+    _get_display_name,
+    _is_path_safe,
+    _normalize_file_name_map,
+    _validate_zip_safety,
+    attachment_worker,
+    download_file,
+    extract_zip_recursive,
+    generate_random_string,
+    metadata_from_filename,
+    upload_index,
+)
+
+
+def _make_doc(text="content", extra_info=None):
+    """Create a real Document for tests that go through to_langchain_format."""
+    return DocSchema(text=text, extra_info=extra_info or {})
+
+
+# ──────────────────────────────────────────────────────────────────────────────
+# metadata_from_filename
+# ──────────────────────────────────────────────────────────────────────────────
+
+
+class TestMetadataFromFilename:
+    def test_returns_dict_with_title(self):
+        assert metadata_from_filename("doc.pdf") == {"title": "doc.pdf"}
+
+    def test_empty_string(self):
+        assert metadata_from_filename("") == {"title": ""}
+
+    def test_path_as_title(self):
+        assert metadata_from_filename("/a/b/c.txt") == {"title": "/a/b/c.txt"}
+
+
+# ──────────────────────────────────────────────────────────────────────────────
+# _normalize_file_name_map
+# ──────────────────────────────────────────────────────────────────────────────
+
+
+class TestNormalizeFileNameMap:
+    def test_none_returns_empty(self):
+        assert _normalize_file_name_map(None) == {}
+
+    def test_empty_string_returns_empty(self):
+        assert _normalize_file_name_map("") == {}
+
+    def test_valid_dict_passthrough(self):
+        m = {"a.txt": "Original A.txt"}
+        assert _normalize_file_name_map(m) == m
+
+    def test_valid_json_string(self):
+        m = {"a.txt": "Original A.txt"}
+        assert _normalize_file_name_map(json.dumps(m)) == m
+
+    def test_invalid_json_string_returns_empty(self):
+        assert _normalize_file_name_map("{bad json") == {}
+
+    def test_non_dict_json_returns_empty(self):
+        assert _normalize_file_name_map(json.dumps([1, 2, 3])) == {}
+
+    def test_non_dict_type_returns_empty(self):
+        assert _normalize_file_name_map(42) == {}
+
+    def test_empty_dict(self):
+        assert _normalize_file_name_map({}) == {}
+
+
+# ──────────────────────────────────────────────────────────────────────────────
+# _get_display_name
+# ──────────────────────────────────────────────────────────────────────────────
+
+
+class TestGetDisplayName:
+    def test_exact_match(self):
+        m = {"sub/file.txt": "My File"}
+        assert _get_display_name(m, "sub/file.txt") == "My File"
+
+    def test_basename_fallback(self):
+        m = {"file.txt": "My File"}
+        assert _get_display_name(m, "sub/file.txt") == "My File"
+
+    def test_no_match(self):
+        assert _get_display_name({"other.txt": "X"}, "file.txt") is None
+
+    def test_empty_map(self):
+        assert _get_display_name({}, "file.txt") is None
+
+    def test_none_map(self):
+        assert _get_display_name(None, "file.txt") is None
+
+    def test_none_rel_path(self):
+        assert _get_display_name({"a": "b"}, None) is None
+
+    def test_empty_rel_path(self):
+        assert _get_display_name({"a": "b"}, "") is None
+
+
+# ──────────────────────────────────────────────────────────────────────────────
+# _apply_display_names_to_structure
+# ──────────────────────────────────────────────────────────────────────────────
+
+
+class TestApplyDisplayNamesToStructure:
+    def test_flat_file_node(self):
+        structure = {
+            "file.txt": {"type": "text/plain", "size_bytes": 100}
+        }
+        m = {"file.txt": "Original Name.txt"}
+        result = _apply_display_names_to_structure(structure, m)
+        assert result["file.txt"]["display_name"] == "Original Name.txt"
+
+    def test_nested_directory(self):
+        structure = {
+            "sub": {
+                "file.txt": {"type": "text/plain", "size_bytes": 50}
+            }
+        }
+        m = {"sub/file.txt": "Nested File"}
+        result = _apply_display_names_to_structure(structure, m)
+        assert result["sub"]["file.txt"]["display_name"] == "Nested File"
+
+    def test_no_match_leaves_structure_unchanged(self):
+        structure = {
+            "file.txt": {"type": "text/plain", "size_bytes": 100}
+        }
+        result = _apply_display_names_to_structure(structure, {"other.txt": "X"})
+        assert "display_name" not in result["file.txt"]
+
+    def test_empty_map_returns_structure(self):
+        structure = {"a": {"type": "t", "size_bytes": 1}}
+        assert _apply_display_names_to_structure(structure, {}) is structure
+
+    def test_none_map_returns_structure(self):
+        structure = {"a": {"type": "t", "size_bytes": 1}}
+        assert _apply_display_names_to_structure(structure, None) is structure
+
+    def test_non_dict_structure_returns_as_is(self):
+        assert _apply_display_names_to_structure("not_a_dict", {"a": "b"}) == "not_a_dict"
+
+
+# ──────────────────────────────────────────────────────────────────────────────
+# generate_random_string
+# ──────────────────────────────────────────────────────────────────────────────
+
+
+class TestGenerateRandomString:
+    def test_length(self):
+        assert len(generate_random_string(10)) == 10
+
+    def test_zero_length(self):
+        assert generate_random_string(0) == ""
+
+    def test_all_ascii_letters(self):
+        import string
+        result = generate_random_string(100)
+        assert all(c in string.ascii_letters for c in result)
+
+    def test_randomness(self):
+        # Two independent draws of sufficient length should differ;
+        # a collision over 32 ASCII letters is vanishingly unlikely.
+        assert generate_random_string(32) != generate_random_string(32)
+
+
+# ──────────────────────────────────────────────────────────────────────────────
+# _is_path_safe
+# ──────────────────────────────────────────────────────────────────────────────
+
+
+class TestIsPathSafe:
+    def test_safe_subpath(self, tmp_path):
+        assert _is_path_safe(str(tmp_path), str(tmp_path / "sub" / "file.txt"))
+
+    def test_base_equals_target(self, tmp_path):
+        assert _is_path_safe(str(tmp_path), str(tmp_path))
+
+    def test_traversal_attack(self, tmp_path):
+        malicious = str(tmp_path / ".." 
/ "etc" / "passwd") + assert not _is_path_safe(str(tmp_path), malicious) + + def test_sibling_directory(self, tmp_path): + sibling = str(tmp_path.parent / "other_dir" / "file.txt") + assert not _is_path_safe(str(tmp_path), sibling) + + +# ────────────────────────────────────────────────────────────────────────────── +# _validate_zip_safety +# ────────────────────────────────────────────────────────────────────────────── + + +class TestValidateZipSafety: + def _make_zip(self, tmp_path, files=None): + """Helper to create a zip file with given files dict {name: content}.""" + zip_path = str(tmp_path / "test.zip") + files = files or {"hello.txt": b"world"} + with zipfile.ZipFile(zip_path, "w") as zf: + for name, content in files.items(): + zf.writestr(name, content) + return zip_path + + def test_valid_zip(self, tmp_path): + zip_path = self._make_zip(tmp_path) + _validate_zip_safety(zip_path, str(tmp_path / "out")) + + def test_too_many_files(self, tmp_path): + zip_path = str(tmp_path / "test.zip") + with zipfile.ZipFile(zip_path, "w") as zf: + for i in range(10001): + zf.writestr(f"file_{i}.txt", "x") + with pytest.raises(ZipExtractionError, match="too many files"): + _validate_zip_safety(zip_path, str(tmp_path / "out")) + + def test_path_traversal(self, tmp_path): + zip_path = str(tmp_path / "test.zip") + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("safe.txt", "ok") + # Manually inject a traversal path entry + extract_to = str(tmp_path / "out") + os.makedirs(extract_to, exist_ok=True) + with patch("application.worker._is_path_safe", return_value=False): + with pytest.raises(ZipExtractionError, match="path traversal"): + _validate_zip_safety(zip_path, extract_to) + + def test_bad_zip_file(self, tmp_path): + bad_path = str(tmp_path / "bad.zip") + with open(bad_path, "wb") as f: + f.write(b"this is not a zip") + with pytest.raises(ZipExtractionError, match="Invalid or corrupted"): + _validate_zip_safety(bad_path, str(tmp_path / "out")) + + def test_high_compression_ratio(self, tmp_path): + zip_path = str(tmp_path / "test.zip") + # Create a highly compressible file (repeated zeros) + big_content = b"\x00" * (1024 * 1024) # 1MB of zeros + with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf: + zf.writestr("big.txt", big_content) + compressed_size = os.path.getsize(zip_path) + ratio = len(big_content) / compressed_size + if ratio > 100: + with pytest.raises(ZipExtractionError, match="compression ratio"): + _validate_zip_safety(zip_path, str(tmp_path / "out")) + else: + # If compression ratio is acceptable, no error + _validate_zip_safety(zip_path, str(tmp_path / "out")) + + +# ────────────────────────────────────────────────────────────────────────────── +# extract_zip_recursive +# ────────────────────────────────────────────────────────────────────────────── + + +class TestExtractZipRecursive: + def test_basic_extraction(self, tmp_path): + zip_path = str(tmp_path / "test.zip") + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("hello.txt", "world") + extract_to = str(tmp_path / "out") + os.makedirs(extract_to) + extract_zip_recursive(zip_path, extract_to) + assert os.path.exists(os.path.join(extract_to, "hello.txt")) + assert not os.path.exists(zip_path) # zip removed after extraction + + def test_nested_zip_extraction(self, tmp_path): + # Create inner zip + inner_zip_bytes = io.BytesIO() + with zipfile.ZipFile(inner_zip_bytes, "w") as zf: + zf.writestr("inner.txt", "inner content") + inner_zip_bytes.seek(0) + + # Create outer zip containing inner zip + 
outer_zip_path = str(tmp_path / "outer.zip") + with zipfile.ZipFile(outer_zip_path, "w") as zf: + zf.writestr("inner.zip", inner_zip_bytes.read()) + + extract_to = str(tmp_path / "out") + os.makedirs(extract_to) + extract_zip_recursive(outer_zip_path, extract_to) + assert os.path.exists(os.path.join(extract_to, "inner.txt")) + + def test_max_depth_stops_recursion(self, tmp_path): + zip_path = str(tmp_path / "test.zip") + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("file.txt", "data") + extract_to = str(tmp_path / "out") + os.makedirs(extract_to) + # current_depth > max_depth should stop immediately + extract_zip_recursive(zip_path, extract_to, current_depth=6, max_depth=5) + # zip file should still exist (not extracted) + assert os.path.exists(zip_path) + + def test_security_failure_removes_zip(self, tmp_path): + zip_path = str(tmp_path / "test.zip") + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("ok.txt", "data") + extract_to = str(tmp_path / "out") + os.makedirs(extract_to) + with patch( + "application.worker._validate_zip_safety", + side_effect=ZipExtractionError("bad zip"), + ): + extract_zip_recursive(zip_path, extract_to) + assert not os.path.exists(zip_path) + + def test_generic_exception_does_not_crash(self, tmp_path): + zip_path = str(tmp_path / "test.zip") + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("ok.txt", "data") + extract_to = str(tmp_path / "out") + os.makedirs(extract_to) + with patch( + "application.worker._validate_zip_safety", + side_effect=RuntimeError("oops"), + ): + # Should not raise + extract_zip_recursive(zip_path, extract_to) + + +# ────────────────────────────────────────────────────────────────────────────── +# download_file +# ────────────────────────────────────────────────────────────────────────────── + + +class TestDownloadFile: + @patch("application.worker.requests.get") + def test_successful_download(self, mock_get, tmp_path): + mock_response = MagicMock() + mock_response.content = b"file content" + mock_response.raise_for_status = MagicMock() + mock_get.return_value = mock_response + + dest = str(tmp_path / "downloaded.txt") + download_file("http://example.com/file", {"key": "val"}, dest) + + mock_get.assert_called_once_with("http://example.com/file", params={"key": "val"}) + with open(dest, "rb") as f: + assert f.read() == b"file content" + + @patch("application.worker.requests.get") + def test_request_exception_raises(self, mock_get): + import requests + mock_get.side_effect = requests.RequestException("timeout") + with pytest.raises(requests.RequestException): + download_file("http://example.com/file", {}, "/tmp/nonexistent") + + +# ────────────────────────────────────────────────────────────────────────────── +# upload_index +# ────────────────────────────────────────────────────────────────────────────── + + +class TestUploadIndex: + @patch("application.worker.requests.post") + @patch("application.worker.settings") + def test_non_faiss_upload(self, mock_settings, mock_post, tmp_path): + mock_settings.VECTOR_STORE = "elasticsearch" + mock_settings.API_URL = "http://api.test" + mock_settings.INTERNAL_KEY = "secret" + mock_response = MagicMock() + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + upload_index(str(tmp_path), {"name": "test"}) + + mock_post.assert_called_once() + args, kwargs = mock_post.call_args + assert "upload_index" in args[0] + assert kwargs["headers"]["X-Internal-Key"] == "secret" + + @patch("application.worker.requests.post") + 
@patch("application.worker.settings") + def test_faiss_upload_with_files(self, mock_settings, mock_post, tmp_path): + mock_settings.VECTOR_STORE = "faiss" + mock_settings.API_URL = "http://api.test" + mock_settings.INTERNAL_KEY = "" + + # Create FAISS index files + faiss_path = tmp_path / "index.faiss" + pkl_path = tmp_path / "index.pkl" + faiss_path.write_bytes(b"faiss data") + pkl_path.write_bytes(b"pkl data") + + mock_response = MagicMock() + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + upload_index(str(tmp_path), {"name": "test"}) + + args, kwargs = mock_post.call_args + assert "file_faiss" in kwargs["files"] + assert "file_pkl" in kwargs["files"] + + @patch("application.worker.settings") + def test_faiss_missing_index_file(self, mock_settings, tmp_path): + mock_settings.VECTOR_STORE = "faiss" + mock_settings.API_URL = "http://api.test" + mock_settings.INTERNAL_KEY = "" + with pytest.raises(FileNotFoundError, match="FAISS index file"): + upload_index(str(tmp_path), {"name": "test"}) + + @patch("application.worker.settings") + def test_faiss_missing_pkl_file(self, mock_settings, tmp_path): + mock_settings.VECTOR_STORE = "faiss" + mock_settings.API_URL = "http://api.test" + mock_settings.INTERNAL_KEY = "" + (tmp_path / "index.faiss").write_bytes(b"data") + with pytest.raises(FileNotFoundError, match="FAISS pickle file"): + upload_index(str(tmp_path), {"name": "test"}) + + +# ────────────────────────────────────────────────────────────────────────────── +# run_agent_logic +# ────────────────────────────────────────────────────────────────────────────── + + +class TestRunAgentLogic: + @patch("application.worker.AgentCreator") + @patch("application.worker.RetrieverCreator") + @patch("application.worker.get_prompt", return_value="Test prompt") + @patch("application.worker.db") + def test_successful_run(self, mock_db, mock_get_prompt, mock_ret_creator, mock_agent_creator): + from application.worker import run_agent_logic + + mock_retriever = MagicMock() + mock_retriever.search.return_value = [{"text": "doc1"}] + mock_ret_creator.create_retriever.return_value = mock_retriever + + mock_agent = MagicMock() + mock_agent.gen.return_value = [ + {"answer": "Hello "}, + {"answer": "world"}, + {"sources": [{"title": "doc1"}]}, + {"tool_calls": [{"name": "tool1"}]}, + {"thought": "thinking..."}, + ] + mock_agent_creator.create_agent.return_value = mock_agent + + agent_config = { + "source": {}, + "retriever": "classic", + "chunks": 2, + "prompt_id": "default", + "key": "test-key", + "_id": ObjectId(), + "agent_type": "classic", + "user": "test_user", + } + + with patch("application.core.model_utils.get_api_key_for_provider", return_value="api-key"), \ + patch("application.core.model_utils.get_default_model_id", return_value="gpt-4"), \ + patch("application.core.model_utils.get_provider_from_model_id", return_value="openai"), \ + patch("application.core.model_utils.validate_model_id", return_value=False), \ + patch("application.utils.calculate_doc_token_budget", return_value=2000): + result = run_agent_logic(agent_config, "test input") + + assert result["answer"] == "Hello world" + assert result["sources"] == [{"title": "doc1"}] + assert result["tool_calls"] == [{"name": "tool1"}] + assert result["thought"] == "thinking..." 
+ + @patch("application.worker.db") + @patch("application.worker.get_prompt", side_effect=Exception("DB error")) + def test_exception_propagated(self, mock_prompt, mock_db): + from application.worker import run_agent_logic + + with pytest.raises(Exception, match="DB error"): + run_agent_logic({"source": {}, "key": "k", "user": "u"}, "input") + + @patch("application.worker.AgentCreator") + @patch("application.worker.RetrieverCreator") + @patch("application.worker.get_prompt", return_value="prompt") + @patch("application.worker.db") + def test_with_dbref_source(self, mock_db, mock_get_prompt, mock_ret_creator, mock_agent_creator): + from bson.dbref import DBRef + from application.worker import run_agent_logic + + source_doc = {"_id": ObjectId(), "retriever": "semantic"} + mock_db.dereference.return_value = source_doc + + mock_retriever = MagicMock() + mock_retriever.search.return_value = [] + mock_ret_creator.create_retriever.return_value = mock_retriever + + mock_agent = MagicMock() + mock_agent.gen.return_value = [{"answer": "ok"}] + mock_agent_creator.create_agent.return_value = mock_agent + + agent_config = { + "source": DBRef("sources", ObjectId()), + "key": "test-key", + "_id": ObjectId(), + "user": "test_user", + } + + with patch("application.core.model_utils.get_api_key_for_provider", return_value="k"), \ + patch("application.core.model_utils.get_default_model_id", return_value="gpt-4"), \ + patch("application.core.model_utils.get_provider_from_model_id", return_value="openai"), \ + patch("application.core.model_utils.validate_model_id", return_value=False), \ + patch("application.utils.calculate_doc_token_budget", return_value=2000): + result = run_agent_logic(agent_config, "input") + + assert result["answer"] == "ok" + + @patch("application.worker.AgentCreator") + @patch("application.worker.RetrieverCreator") + @patch("application.worker.get_prompt", return_value="prompt") + @patch("application.worker.db") + def test_retriever_failure_continues(self, mock_db, mock_get_prompt, mock_ret_creator, mock_agent_creator): + from application.worker import run_agent_logic + + mock_retriever = MagicMock() + mock_retriever.search.side_effect = Exception("search failed") + mock_ret_creator.create_retriever.return_value = mock_retriever + + mock_agent = MagicMock() + mock_agent.gen.return_value = [{"answer": "still works"}] + mock_agent_creator.create_agent.return_value = mock_agent + + agent_config = { + "source": {}, + "key": "test-key", + "_id": ObjectId(), + "user": "test_user", + } + + with patch("application.core.model_utils.get_api_key_for_provider", return_value="k"), \ + patch("application.core.model_utils.get_default_model_id", return_value="gpt-4"), \ + patch("application.core.model_utils.get_provider_from_model_id", return_value="openai"), \ + patch("application.core.model_utils.validate_model_id", return_value=False), \ + patch("application.utils.calculate_doc_token_budget", return_value=2000): + result = run_agent_logic(agent_config, "input") + + assert result["answer"] == "still works" + + +# ────────────────────────────────────────────────────────────────────────────── +# ingest_worker +# ────────────────────────────────────────────────────────────────────────────── + + +class TestIngestWorker: + def _make_task(self): + task = MagicMock() + task.update_state = MagicMock() + return task + + @patch("application.worker.upload_index") + @patch("application.worker.embed_and_store_documents") + @patch("application.worker.count_tokens_docs", return_value=500) + 
@patch("application.worker.Chunker") + @patch("application.worker.SimpleDirectoryReader") + @patch("application.worker.StorageCreator") + def test_single_file_ingest( + self, mock_sc, mock_reader_cls, mock_chunker_cls, + mock_count, mock_embed, mock_upload + ): + from application.worker import ingest_worker + + task = self._make_task() + mock_storage = MagicMock() + mock_storage.is_directory.return_value = False + mock_storage.get_file.return_value = io.BytesIO(b"file content") + mock_sc.get_storage.return_value = mock_storage + + doc = _make_doc("test content", {"title": "test.txt"}) + mock_reader = MagicMock() + mock_reader.load_data.return_value = [doc] + mock_reader.directory_structure = {} + mock_reader_cls.return_value = mock_reader + + mock_chunker = MagicMock() + mock_chunker.chunk.return_value = [doc] + mock_chunker_cls.return_value = mock_chunker + + result = ingest_worker( + task, "inputs", [".txt"], "job1", + "inputs/user1/job1/test.txt", "test.txt", "user1" + ) + + assert result["name_job"] == "job1" + assert result["filename"] == "test.txt" + assert result["user"] == "user1" + assert result["limited"] is False + mock_embed.assert_called_once() + mock_upload.assert_called_once() + + @patch("application.worker.upload_index") + @patch("application.worker.embed_and_store_documents") + @patch("application.worker.count_tokens_docs", return_value=100) + @patch("application.worker.Chunker") + @patch("application.worker.SimpleDirectoryReader") + @patch("application.worker.StorageCreator") + def test_directory_ingest( + self, mock_sc, mock_reader_cls, mock_chunker_cls, + mock_count, mock_embed, mock_upload + ): + from application.worker import ingest_worker + + task = self._make_task() + mock_storage = MagicMock() + mock_storage.is_directory.side_effect = lambda p: not p.endswith(".txt") + mock_storage.list_files.return_value = [ + "inputs/user1/job1/a.txt", + "inputs/user1/job1/b.txt", + ] + mock_storage.get_file.return_value = io.BytesIO(b"content") + mock_sc.get_storage.return_value = mock_storage + + doc = _make_doc("test content", {"title": "a.txt"}) + mock_reader = MagicMock() + mock_reader.load_data.return_value = [doc] + mock_reader.directory_structure = {} + mock_reader_cls.return_value = mock_reader + + mock_chunker = MagicMock() + mock_chunker.chunk.return_value = [doc] + mock_chunker_cls.return_value = mock_chunker + + result = ingest_worker( + task, "inputs", [".txt"], "job1", + "inputs/user1/job1", "job1", "user1" + ) + + assert result["directory"] == "inputs" + assert mock_storage.list_files.called + + @patch("application.worker.upload_index") + @patch("application.worker.embed_and_store_documents") + @patch("application.worker.count_tokens_docs", return_value=100) + @patch("application.worker.Chunker") + @patch("application.worker.SimpleDirectoryReader") + @patch("application.worker.StorageCreator") + def test_zip_file_triggers_extraction( + self, mock_sc, mock_reader_cls, mock_chunker_cls, + mock_count, mock_embed, mock_upload + ): + from application.worker import ingest_worker + + task = self._make_task() + mock_storage = MagicMock() + mock_storage.is_directory.return_value = False + + # Create a real zip in memory + zip_buf = io.BytesIO() + with zipfile.ZipFile(zip_buf, "w") as zf: + zf.writestr("inner.txt", "content") + zip_buf.seek(0) + mock_storage.get_file.return_value = zip_buf + mock_sc.get_storage.return_value = mock_storage + + doc = _make_doc("inner content") + mock_reader = MagicMock() + mock_reader.load_data.return_value = [doc] + 
mock_reader.directory_structure = {} + mock_reader_cls.return_value = mock_reader + + mock_chunker = MagicMock() + mock_chunker.chunk.return_value = [doc] + mock_chunker_cls.return_value = mock_chunker + + with patch("application.worker.extract_zip_recursive") as mock_extract: + ingest_worker( + task, "inputs", [".txt"], "job1", + "inputs/user1/job1/archive.zip", "archive.zip", "user1" + ) + mock_extract.assert_called_once() + + @patch("application.worker.upload_index") + @patch("application.worker.embed_and_store_documents") + @patch("application.worker.count_tokens_docs", return_value=100) + @patch("application.worker.Chunker") + @patch("application.worker.SimpleDirectoryReader") + @patch("application.worker.StorageCreator") + def test_file_name_map_applied( + self, mock_sc, mock_reader_cls, mock_chunker_cls, + mock_count, mock_embed, mock_upload + ): + from application.worker import ingest_worker + + task = self._make_task() + mock_storage = MagicMock() + mock_storage.is_directory.return_value = False + mock_storage.get_file.return_value = io.BytesIO(b"content") + mock_sc.get_storage.return_value = mock_storage + + doc = _make_doc("content", {"source": "safe_name.txt", "title": "safe_name.txt"}) + mock_reader = MagicMock() + mock_reader.load_data.return_value = [doc] + mock_reader.directory_structure = {} + mock_reader_cls.return_value = mock_reader + + mock_chunker = MagicMock() + mock_chunker.chunk.return_value = [doc] + mock_chunker_cls.return_value = mock_chunker + + ingest_worker( + task, "inputs", [".txt"], "job1", + "inputs/user1/job1/safe_name.txt", "safe_name.txt", "user1", + file_name_map={"safe_name.txt": "Original Name.txt"}, + ) + + assert doc.extra_info["filename"] == "Original Name.txt" + assert doc.extra_info["title"] == "Original Name.txt" + + @patch("application.worker.StorageCreator") + def test_ingest_worker_exception_propagated(self, mock_sc): + from application.worker import ingest_worker + + task = self._make_task() + mock_sc.get_storage.side_effect = Exception("storage error") + + with pytest.raises(Exception, match="storage error"): + ingest_worker( + task, "inputs", [".txt"], "job1", + "inputs/user1/job1/test.txt", "test.txt", "user1" + ) + + +# ────────────────────────────────────────────────────────────────────────────── +# remote_worker +# ────────────────────────────────────────────────────────────────────────────── + + +class TestRemoteWorker: + @patch("application.worker.shutil.rmtree") + @patch("application.worker.upload_index") + @patch("application.worker.embed_and_store_documents") + @patch("application.worker.count_tokens_docs", return_value=200) + @patch("application.worker.num_tokens_from_string", return_value=50) + @patch("application.worker.Chunker") + @patch("application.worker.RemoteCreator") + def test_upload_mode( + self, mock_rc, mock_chunker_cls, mock_num_tokens, + mock_count, mock_embed, mock_upload, mock_rmtree, tmp_path + ): + from application.worker import remote_worker + + task = MagicMock() + mock_loader = MagicMock() + doc = _make_doc("content", {"file_path": "test.md", "title": "test"}) + doc.doc_id = "doc1" + mock_loader.load_data.return_value = [doc] + mock_rc.create_loader.return_value = mock_loader + + mock_chunker = MagicMock() + mock_chunker.chunk.return_value = [doc] + mock_chunker_cls.return_value = mock_chunker + + result = remote_worker( + task, "http://example.com", "job1", "user1", "web", + directory=str(tmp_path), + ) + + assert result["name_job"] == "job1" + assert result["user"] == "user1" + assert result["limited"] 
is False + mock_upload.assert_called_once() + + @patch("application.worker.shutil.rmtree") + @patch("application.worker.upload_index") + @patch("application.worker.embed_and_store_documents") + @patch("application.worker.count_tokens_docs", return_value=100) + @patch("application.worker.num_tokens_from_string", return_value=10) + @patch("application.worker.Chunker") + @patch("application.worker.RemoteCreator") + def test_sync_mode( + self, mock_rc, mock_chunker_cls, mock_num_tokens, + mock_count, mock_embed, mock_upload, mock_rmtree, tmp_path + ): + from application.worker import remote_worker + + task = MagicMock() + mock_loader = MagicMock() + doc = _make_doc("content", {"file_path": "test.md"}) + doc.doc_id = "doc1" + mock_loader.load_data.return_value = [doc] + mock_rc.create_loader.return_value = mock_loader + + mock_chunker = MagicMock() + mock_chunker.chunk.return_value = [doc] + mock_chunker_cls.return_value = mock_chunker + + doc_id = str(ObjectId()) + result = remote_worker( + task, "http://example.com", "job1", "user1", "web", + directory=str(tmp_path), operation_mode="sync", doc_id=doc_id, + ) + + assert result["name_job"] == "job1" + + @patch("application.worker.shutil.rmtree") + @patch("application.worker.upload_index") + @patch("application.worker.embed_and_store_documents") + @patch("application.worker.count_tokens_docs", return_value=100) + @patch("application.worker.num_tokens_from_string", return_value=10) + @patch("application.worker.Chunker") + @patch("application.worker.RemoteCreator") + def test_sync_mode_invalid_doc_id( + self, mock_rc, mock_chunker_cls, mock_num_tokens, + mock_count, mock_embed, mock_upload, mock_rmtree, tmp_path + ): + from application.worker import remote_worker + + task = MagicMock() + mock_loader = MagicMock() + doc = _make_doc("content", {"file_path": "test.md"}) + doc.doc_id = "doc1" + mock_loader.load_data.return_value = [doc] + mock_rc.create_loader.return_value = mock_loader + + mock_chunker = MagicMock() + mock_chunker.chunk.return_value = [doc] + mock_chunker_cls.return_value = mock_chunker + + with pytest.raises(ValueError, match="doc_id must be provided"): + remote_worker( + task, "http://example.com", "job1", "user1", "web", + directory=str(tmp_path), operation_mode="sync", doc_id="invalid", + ) + + @patch("application.worker.RemoteCreator") + def test_exception_cleans_up(self, mock_rc, tmp_path): + from application.worker import remote_worker + + task = MagicMock() + mock_rc.create_loader.side_effect = Exception("loader error") + + with pytest.raises(Exception, match="loader error"): + remote_worker( + task, "http://example.com", "job1", "user1", "web", + directory=str(tmp_path), + ) + + +# ────────────────────────────────────────────────────────────────────────────── +# sync +# ────────────────────────────────────────────────────────────────────────────── + + +class TestSync: + @patch("application.worker.remote_worker") + def test_successful_sync(self, mock_rw): + from application.worker import sync + + task = MagicMock() + result = sync(task, "data", "job", "user", "web", "daily", "classic", "doc123") + assert result["status"] == "success" + mock_rw.assert_called_once() + + @patch("application.worker.remote_worker", side_effect=Exception("fail")) + def test_sync_error_returns_error_status(self, mock_rw): + from application.worker import sync + + task = MagicMock() + result = sync(task, "data", "job", "user", "web", "daily", "classic") + assert result["status"] == "error" + assert "fail" in result["error"] + + +# 
────────────────────────────────────────────────────────────────────────────── +# sync_worker +# ────────────────────────────────────────────────────────────────────────────── + + +class TestSyncWorker: + @patch("application.worker.sync") + @patch("application.worker.sources_collection") + def test_syncs_matching_sources(self, mock_sources, mock_sync): + from application.worker import sync_worker + + mock_sources.find.return_value = [ + { + "name": "src1", + "user": "u1", + "type": "web", + "remote_data": "http://example.com", + "retriever": "classic", + "_id": ObjectId(), + "sync_frequency": "daily", + }, + { + "name": "src2", + "user": "u2", + "type": "web", + "remote_data": "http://other.com", + "retriever": "classic", + "_id": ObjectId(), + "sync_frequency": "weekly", # won't match + }, + ] + mock_sync.return_value = {"status": "success"} + + task = MagicMock() + result = sync_worker(task, "daily") + + assert result["total_sync_count"] == 1 + assert result["sync_success"] == 1 + assert result["sync_failure"] == 0 + + @patch("application.worker.sync") + @patch("application.worker.sources_collection") + def test_counts_failures(self, mock_sources, mock_sync): + from application.worker import sync_worker + + mock_sources.find.return_value = [ + { + "name": "src1", + "user": "u1", + "type": "web", + "remote_data": "data", + "retriever": "classic", + "_id": ObjectId(), + "sync_frequency": "daily", + } + ] + mock_sync.return_value = {"status": "error", "error": "fail"} + + result = sync_worker(MagicMock(), "daily") + assert result["sync_failure"] == 1 + assert result["sync_success"] == 0 + + @patch("application.worker.sources_collection") + def test_no_matching_sources(self, mock_sources): + from application.worker import sync_worker + + mock_sources.find.return_value = [] + result = sync_worker(MagicMock(), "daily") + assert result["total_sync_count"] == 0 + + +# ────────────────────────────────────────────────────────────────────────────── +# attachment_worker +# ────────────────────────────────────────────────────────────────────────────── + + +class TestAttachmentWorker: + def test_processes_and_stores_attachment(self, mock_mongo_db): + task = MagicMock() + mock_storage = MagicMock() + mock_storage.process_file.return_value = MagicMock( + text="extracted text", + extra_info={}, + ) + file_info = { + "filename": "doc.pdf", + "attachment_id": "507f1f77bcf86cd799439011", + "path": "inputs/user1/attachments/507f1f77bcf86cd799439011/doc.pdf", + "metadata": {"storage_type": "local"}, + } + + with patch( + "application.worker.StorageCreator.get_storage", + return_value=mock_storage, + ), patch("application.worker.num_tokens_from_string", return_value=50): + result = attachment_worker(task, file_info, "user1") + + assert result["filename"] == "doc.pdf" + assert result["attachment_id"] == "507f1f77bcf86cd799439011" + assert result["token_count"] == 50 + assert result["mime_type"] == "application/pdf" + + stored = mock_mongo_db["docsgpt"]["attachments"].find_one( + {"_id": ObjectId("507f1f77bcf86cd799439011")} + ) + assert stored is not None + assert stored["content"] == "extracted text" + + def test_truncates_large_content(self, mock_mongo_db): + task = MagicMock() + mock_storage = MagicMock() + mock_storage.process_file.return_value = MagicMock( + text="x" * 300000, + extra_info={}, + ) + file_info = { + "filename": "big.txt", + "attachment_id": "507f1f77bcf86cd799439012", + "path": "inputs/user1/attachments/big.txt", + "metadata": {}, + } + + token_calls = iter([200000, 50000]) + with patch( + 
"application.worker.StorageCreator.get_storage", + return_value=mock_storage, + ), patch("application.worker.num_tokens_from_string", side_effect=token_calls): + attachment_worker(task, file_info, "user1") + + # Content should have been truncated to 250000 chars + stored = mock_mongo_db["docsgpt"]["attachments"].find_one( + {"_id": ObjectId("507f1f77bcf86cd799439012")} + ) + assert len(stored["content"]) == 250000 + + def test_merges_transcript_metadata(self, mock_mongo_db): + task = MagicMock() + mock_storage = MagicMock() + mock_storage.process_file.return_value = MagicMock( + text="transcript text", + extra_info={ + "transcript_language": "en", + "transcript_duration_s": 10.0, + "transcript_provider": "openai", + "other_key": "ignored", + }, + ) + file_info = { + "filename": "audio.wav", + "attachment_id": "507f1f77bcf86cd799439013", + "path": "inputs/user1/attachments/audio.wav", + "metadata": {"storage_type": "local"}, + } + + with patch( + "application.worker.StorageCreator.get_storage", + return_value=mock_storage, + ), patch("application.worker.num_tokens_from_string", return_value=20): + result = attachment_worker(task, file_info, "user1") + + assert result["metadata"]["transcript_language"] == "en" + assert result["metadata"]["storage_type"] == "local" + # "other_key" should not be in metadata (doesn't start with "transcript_") + assert "other_key" not in result["metadata"] + + def test_exception_propagated(self, mock_mongo_db): + task = MagicMock() + file_info = { + "filename": "bad.txt", + "attachment_id": "507f1f77bcf86cd799439014", + "path": "inputs/user1/attachments/bad.txt", + "metadata": {}, + } + + with patch( + "application.worker.StorageCreator.get_storage", + side_effect=Exception("storage fail"), + ): + with pytest.raises(Exception, match="storage fail"): + attachment_worker(task, file_info, "user1") + + +# ────────────────────────────────────────────────────────────────────────────── +# agent_webhook_worker +# ────────────────────────────────────────────────────────────────────────────── + + +class TestAgentWebhookWorker: + @patch("application.worker.run_agent_logic") + @patch("application.worker.MongoDB") + def test_successful_webhook(self, mock_mongo, mock_run_agent): + from application.worker import agent_webhook_worker + + agent_id = str(ObjectId()) + mock_db = MagicMock() + mock_agents = MagicMock() + mock_agents.find_one.return_value = {"_id": ObjectId(agent_id), "key": "k"} + mock_db.__getitem__ = MagicMock(return_value=mock_agents) + mock_mongo.get_client.return_value = {"docsgpt": mock_db} + + mock_run_agent.return_value = {"answer": "response"} + + task = MagicMock() + result = agent_webhook_worker(task, agent_id, {"query": "hello"}) + + assert result["status"] == "success" + assert result["result"]["answer"] == "response" + + @patch("application.worker.MongoDB") + def test_agent_not_found(self, mock_mongo): + from application.worker import agent_webhook_worker + + agent_id = str(ObjectId()) + mock_db = MagicMock() + mock_agents = MagicMock() + mock_agents.find_one.return_value = None + mock_db.__getitem__ = MagicMock(return_value=mock_agents) + mock_mongo.get_client.return_value = {"docsgpt": mock_db} + + task = MagicMock() + result = agent_webhook_worker(task, agent_id, {"query": "hello"}) + + assert result["status"] == "error" + assert "not found" in result["error"] + + @patch("application.worker.run_agent_logic", side_effect=Exception("logic error")) + @patch("application.worker.MongoDB") + def test_agent_logic_failure(self, mock_mongo, mock_run_agent): 
+ from application.worker import agent_webhook_worker + + agent_id = str(ObjectId()) + mock_db = MagicMock() + mock_agents = MagicMock() + mock_agents.find_one.return_value = {"_id": ObjectId(agent_id), "key": "k"} + mock_db.__getitem__ = MagicMock(return_value=mock_agents) + mock_mongo.get_client.return_value = {"docsgpt": mock_db} + + task = MagicMock() + result = agent_webhook_worker(task, agent_id, {"query": "hello"}) + + assert result["status"] == "error" + + +# ────────────────────────────────────────────────────────────────────────────── +# reingest_source_worker +# ────────────────────────────────────────────────────────────────────────────── + + +class TestReingestSourceWorker: + @patch("application.worker.StorageCreator") + @patch("application.worker.sources_collection") + def test_source_not_found(self, mock_sources, mock_sc): + from application.worker import reingest_source_worker + + mock_sources.find_one.return_value = None + task = MagicMock() + + with pytest.raises(ValueError, match="not found"): + reingest_source_worker(task, str(ObjectId()), "user1") + + @patch("application.worker.SimpleDirectoryReader") + @patch("application.worker.StorageCreator") + @patch("application.worker.sources_collection") + def test_no_changes_detected(self, mock_sources, mock_sc, mock_reader_cls): + from application.worker import reingest_source_worker + + source_id = str(ObjectId()) + structure = {"file.txt": {"type": "text/plain", "size_bytes": 100}} + mock_sources.find_one.return_value = { + "_id": ObjectId(source_id), + "user": "user1", + "file_path": "inputs/user1/source1", + "directory_structure": json.dumps(structure), + } + + mock_storage = MagicMock() + mock_storage.is_directory.return_value = True + mock_storage.list_files.return_value = ["inputs/user1/source1/file.txt"] + mock_storage.get_file.return_value = io.BytesIO(b"content") + mock_sc.get_storage.return_value = mock_storage + + mock_reader = MagicMock() + mock_reader.directory_structure = structure + mock_reader.load_data.return_value = [] + mock_reader_cls.return_value = mock_reader + + task = MagicMock() + result = reingest_source_worker(task, source_id, "user1") + + assert result["status"] == "no_changes" + assert result["added_files"] == [] + assert result["removed_files"] == [] + + @patch("application.worker.Chunker") + @patch("application.worker.SimpleDirectoryReader") + @patch("application.worker.StorageCreator") + @patch("application.worker.sources_collection") + def test_added_and_removed_files(self, mock_sources, mock_sc, mock_reader_cls, mock_chunker_cls): + from application.worker import reingest_source_worker + + source_id = str(ObjectId()) + old_structure = { + "old_file.txt": {"type": "text/plain", "size_bytes": 100}, + } + new_structure = { + "new_file.txt": {"type": "text/plain", "size_bytes": 200}, + } + + mock_sources.find_one.return_value = { + "_id": ObjectId(source_id), + "user": "user1", + "file_path": "inputs/user1/source1", + "directory_structure": json.dumps(old_structure), + } + + mock_storage = MagicMock() + mock_storage.is_directory.return_value = True + mock_storage.list_files.return_value = ["inputs/user1/source1/new_file.txt"] + mock_storage.get_file.return_value = io.BytesIO(b"content") + mock_sc.get_storage.return_value = mock_storage + + mock_reader = MagicMock() + mock_reader.directory_structure = new_structure + mock_reader.load_data.return_value = [] + mock_reader.file_token_counts = {} + mock_reader_cls.return_value = mock_reader + + mock_chunker = MagicMock() + 
mock_chunker.chunk.return_value = [] + mock_chunker_cls.return_value = mock_chunker + + mock_vector_store = MagicMock() + mock_vector_store.get_chunks.return_value = [ + {"metadata": {"source": "old_file.txt"}, "doc_id": "chunk1"} + ] + + with patch( + "application.vectorstore.vector_creator.VectorCreator.create_vectorstore", + return_value=mock_vector_store, + ): + task = MagicMock() + result = reingest_source_worker(task, source_id, "user1") + + assert result["status"] == "completed" + assert "old_file.txt" in result["removed_files"] + assert "new_file.txt" in result["added_files"] + assert result["chunks_deleted"] == 1 + + +# ────────────────────────────────────────────────────────────────────────────── +# ingest_connector +# ────────────────────────────────────────────────────────────────────────────── + + +class TestIngestConnector: + @patch("application.worker.upload_index") + @patch("application.worker.embed_and_store_documents") + @patch("application.worker.count_tokens_docs", return_value=300) + @patch("application.worker.Chunker") + @patch("application.worker.SimpleDirectoryReader") + @patch("application.worker.ConnectorCreator") + def test_successful_upload( + self, mock_cc, mock_reader_cls, mock_chunker_cls, + mock_count, mock_embed, mock_upload + ): + from application.worker import ingest_connector + + task = MagicMock() + + mock_connector = MagicMock() + mock_connector.download_to_directory.return_value = { + "files_downloaded": 2, + } + mock_cc.is_supported.return_value = True + mock_cc.create_connector.return_value = mock_connector + + doc = _make_doc("content", {"source": "/tmp/test/file.txt"}) + mock_reader = MagicMock() + mock_reader.load_data.return_value = [doc] + mock_reader.directory_structure = {} + mock_reader_cls.return_value = mock_reader + + mock_chunker = MagicMock() + mock_chunker.chunk.return_value = [doc] + mock_chunker_cls.return_value = mock_chunker + + result = ingest_connector( + task, "job1", "user1", "google_drive", + session_token="token123", + file_ids=["f1"], + ) + + assert result["status"] == "complete" + assert result["user"] == "user1" + mock_upload.assert_called_once() + + @patch("application.worker.ConnectorCreator") + def test_no_session_token(self, mock_cc): + from application.worker import ingest_connector + + task = MagicMock() + with pytest.raises(ValueError, match="requires session_token"): + ingest_connector(task, "job1", "user1", "google_drive") + + @patch("application.worker.ConnectorCreator") + def test_unsupported_connector(self, mock_cc): + from application.worker import ingest_connector + + task = MagicMock() + mock_cc.is_supported.return_value = False + mock_cc.get_supported_connectors.return_value = ["google_drive"] + + with pytest.raises(ValueError, match="Unsupported connector"): + ingest_connector( + task, "job1", "user1", "unknown", + session_token="token", + ) + + @patch("application.worker.ConnectorCreator") + def test_empty_download_result(self, mock_cc): + from application.worker import ingest_connector + + task = MagicMock() + mock_connector = MagicMock() + mock_connector.download_to_directory.return_value = { + "files_downloaded": 0, + "empty_result": True, + } + mock_cc.is_supported.return_value = True + mock_cc.create_connector.return_value = mock_connector + + result = ingest_connector( + task, "job1", "user1", "google_drive", + session_token="token", + ) + + assert result["tokens"] == 0 + + @patch("application.worker.upload_index") + @patch("application.worker.embed_and_store_documents") + 
@patch("application.worker.count_tokens_docs", return_value=100) + @patch("application.worker.Chunker") + @patch("application.worker.SimpleDirectoryReader") + @patch("application.worker.ConnectorCreator") + def test_sync_mode( + self, mock_cc, mock_reader_cls, mock_chunker_cls, + mock_count, mock_embed, mock_upload + ): + from application.worker import ingest_connector + + task = MagicMock() + mock_connector = MagicMock() + mock_connector.download_to_directory.return_value = {"files_downloaded": 1} + mock_cc.is_supported.return_value = True + mock_cc.create_connector.return_value = mock_connector + + doc = _make_doc("content", {"source": "file.txt"}) + mock_reader = MagicMock() + mock_reader.load_data.return_value = [doc] + mock_reader.directory_structure = {} + mock_reader_cls.return_value = mock_reader + + mock_chunker = MagicMock() + mock_chunker.chunk.return_value = [doc] + mock_chunker_cls.return_value = mock_chunker + + doc_id = str(ObjectId()) + result = ingest_connector( + task, "job1", "user1", "google_drive", + session_token="token", + operation_mode="sync", + doc_id=doc_id, + ) + + assert result["status"] == "complete" + + @patch("application.worker.upload_index") + @patch("application.worker.embed_and_store_documents") + @patch("application.worker.count_tokens_docs", return_value=100) + @patch("application.worker.Chunker") + @patch("application.worker.SimpleDirectoryReader") + @patch("application.worker.ConnectorCreator") + def test_invalid_operation_mode( + self, mock_cc, mock_reader_cls, mock_chunker_cls, + mock_count, mock_embed, mock_upload + ): + from application.worker import ingest_connector + + task = MagicMock() + mock_connector = MagicMock() + mock_connector.download_to_directory.return_value = {"files_downloaded": 1} + mock_cc.is_supported.return_value = True + mock_cc.create_connector.return_value = mock_connector + + doc = _make_doc("content") + mock_reader = MagicMock() + mock_reader.load_data.return_value = [doc] + mock_reader.directory_structure = {} + mock_reader_cls.return_value = mock_reader + + mock_chunker = MagicMock() + mock_chunker.chunk.return_value = [doc] + mock_chunker_cls.return_value = mock_chunker + + with pytest.raises(ValueError, match="Invalid operation_mode"): + ingest_connector( + task, "job1", "user1", "google_drive", + session_token="token", + operation_mode="invalid", + ) + + +# ────────────────────────────────────────────────────────────────────────────── +# mcp_oauth +# ────────────────────────────────────────────────────────────────────────────── + + +class TestMcpOauth: + @patch("application.worker.get_redis_instance") + def test_successful_oauth(self, mock_redis_fn): + from application.worker import mcp_oauth + + mock_redis = MagicMock() + mock_redis_fn.return_value = mock_redis + + task = MagicMock() + task.request.id = "task-123" + + mock_mcp_tool = MagicMock() + mock_mcp_tool.get_actions_metadata.return_value = [ + {"name": "tool1", "description": "A tool"} + ] + + async def fake_execute(*args): + return [] + + mock_mcp_tool._client = None + mock_mcp_tool._setup_client = MagicMock() + mock_mcp_tool._execute_with_client = MagicMock( + side_effect=lambda x: fake_execute() + ) + + with patch("application.agents.tools.mcp_tool.MCPTool", return_value=mock_mcp_tool): + result = mcp_oauth(task, {"url": "http://mcp.test"}, "user1") + + assert result["success"] is True + assert result["tools_count"] == 1 + + @patch("application.worker.get_redis_instance") + def test_oauth_discovery_failure(self, mock_redis_fn): + from application.worker 
import mcp_oauth + + mock_redis = MagicMock() + mock_redis_fn.return_value = mock_redis + + task = MagicMock() + task.request.id = "task-456" + + mock_mcp_tool = MagicMock() + mock_mcp_tool._client = None + mock_mcp_tool._setup_client = MagicMock() + + async def fail_execute(*args): + raise Exception("connection refused") + + mock_mcp_tool._execute_with_client = MagicMock( + side_effect=lambda x: fail_execute() + ) + + with patch("application.agents.tools.mcp_tool.MCPTool", return_value=mock_mcp_tool): + result = mcp_oauth(task, {"url": "http://mcp.test"}, "user1") + + assert result["success"] is False + assert "connection refused" in result["error"] + + +# ────────────────────────────────────────────────────────────────────────────── +# mcp_oauth_status +# ────────────────────────────────────────────────────────────────────────────── + + +class TestMcpOauthStatus: + @patch("application.worker.get_redis_instance") + def test_found_status(self, mock_redis_fn): + from application.worker import mcp_oauth_status + + mock_redis = MagicMock() + status = {"status": "completed", "tools_count": 3} + mock_redis.get.return_value = json.dumps(status) + mock_redis_fn.return_value = mock_redis + + task = MagicMock() + result = mcp_oauth_status(task, "task-123") + + assert result["status"] == "completed" + assert result["tools_count"] == 3 + + @patch("application.worker.get_redis_instance") + def test_not_found(self, mock_redis_fn): + from application.worker import mcp_oauth_status + + mock_redis = MagicMock() + mock_redis.get.return_value = None + mock_redis_fn.return_value = mock_redis + + task = MagicMock() + result = mcp_oauth_status(task, "task-999") + + assert result["status"] == "not_found" + + +# ────────────────────────────────────────────────────────────────────────────── +# Additional coverage for uncovered branches +# ────────────────────────────────────────────────────────────────────────────── + + +class TestValidateZipSafetyUncompressedLimit: + def test_exceeds_uncompressed_size(self, tmp_path): + """Cover line 170: uncompressed size exceeds MAX_UNCOMPRESSED_SIZE.""" + zip_path = str(tmp_path / "test.zip") + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr("small.txt", "ok") + with patch("application.worker.MAX_UNCOMPRESSED_SIZE", 1): + with pytest.raises(ZipExtractionError, match="uncompressed size exceeds"): + _validate_zip_safety(zip_path, str(tmp_path / "out")) + + +class TestRunAgentLogicValidModel: + @patch("application.worker.AgentCreator") + @patch("application.worker.RetrieverCreator") + @patch("application.worker.get_prompt", return_value="prompt") + @patch("application.worker.db") + def test_agent_default_model_used_when_valid( + self, mock_db, mock_get_prompt, mock_ret_creator, mock_agent_creator + ): + """Cover line 335: agent has valid default_model_id.""" + from application.worker import run_agent_logic + + mock_retriever = MagicMock() + mock_retriever.search.return_value = [] + mock_ret_creator.create_retriever.return_value = mock_retriever + + mock_agent = MagicMock() + mock_agent.gen.return_value = [{"answer": "ok"}] + mock_agent_creator.create_agent.return_value = mock_agent + + agent_config = { + "source": {}, + "key": "test-key", + "_id": ObjectId(), + "user": "test_user", + "default_model_id": "gpt-4o", + } + + with patch("application.core.model_utils.get_api_key_for_provider", return_value="k"), \ + patch("application.core.model_utils.get_default_model_id", return_value="gpt-4"), \ + patch("application.core.model_utils.get_provider_from_model_id", 
return_value="openai"), \ + patch("application.core.model_utils.validate_model_id", return_value=True), \ + patch("application.utils.calculate_doc_token_budget", return_value=2000): + result = run_agent_logic(agent_config, "input") + + assert result["answer"] == "ok" + # Verify it used the agent's default model, not the system default + mock_agent_creator.create_agent.assert_called_once() + call_kwargs = mock_agent_creator.create_agent.call_args + assert call_kwargs[1]["model_id"] == "gpt-4o" or call_kwargs.kwargs.get("model_id") == "gpt-4o" + + +class TestIngestWorkerDirectoryDownloadError: + @patch("application.worker.upload_index") + @patch("application.worker.embed_and_store_documents") + @patch("application.worker.count_tokens_docs", return_value=100) + @patch("application.worker.Chunker") + @patch("application.worker.SimpleDirectoryReader") + @patch("application.worker.StorageCreator") + def test_directory_file_download_error_skips( + self, mock_sc, mock_reader_cls, mock_chunker_cls, + mock_count, mock_embed, mock_upload + ): + """Cover lines 480-484: error downloading individual file in directory continues.""" + from application.worker import ingest_worker + + task = MagicMock() + mock_storage = MagicMock() + mock_storage.is_directory.side_effect = lambda p: not p.endswith(".txt") + mock_storage.list_files.return_value = [ + "inputs/user1/job1/a.txt", + "inputs/user1/job1/b.txt", + ] + # First file raises error, second succeeds + mock_storage.get_file.side_effect = [ + Exception("download failed"), + io.BytesIO(b"content"), + ] + mock_sc.get_storage.return_value = mock_storage + + doc = _make_doc("content", {"title": "b.txt"}) + mock_reader = MagicMock() + mock_reader.load_data.return_value = [doc] + mock_reader.directory_structure = {} + mock_reader_cls.return_value = mock_reader + + mock_chunker = MagicMock() + mock_chunker.chunk.return_value = [doc] + mock_chunker_cls.return_value = mock_chunker + + result = ingest_worker( + task, "inputs", [".txt"], "job1", + "inputs/user1/job1", "job1", "user1" + ) + + assert result["name_job"] == "job1" + + +class TestReingestSourceWorkerAddedFiles: + @patch("application.worker.Chunker") + @patch("application.worker.SimpleDirectoryReader") + @patch("application.worker.StorageCreator") + @patch("application.worker.sources_collection") + def test_added_files_with_chunks( + self, mock_sources, mock_sc, mock_reader_cls, mock_chunker_cls, tmp_path + ): + """Cover lines 774-850: adding chunks from new files with display names and metadata.""" + from application.worker import reingest_source_worker + + source_id = str(ObjectId()) + old_structure = {} # empty — everything is "new" + new_structure = { + "new_file.txt": {"type": "text/plain", "size_bytes": 200} + } + + mock_sources.find_one.return_value = { + "_id": ObjectId(source_id), + "user": "user1", + "file_path": "inputs/user1/source1", + "directory_structure": json.dumps(old_structure), + "file_name_map": json.dumps({"new_file.txt": "Original Name.txt"}), + } + + mock_storage = MagicMock() + mock_storage.is_directory.return_value = True + mock_storage.list_files.return_value = ["inputs/user1/source1/new_file.txt"] + mock_storage.get_file.return_value = io.BytesIO(b"new content") + mock_sc.get_storage.return_value = mock_storage + + new_doc = _make_doc("new content", {"source": "/tmp/new_file.txt"}) + + # First reader (for directory structure scan) + mock_reader1 = MagicMock() + mock_reader1.directory_structure = new_structure + mock_reader1.load_data.return_value = [] + 
mock_reader1.file_token_counts = {"new_file.txt": 50} + + # Second reader (for processing new files) + mock_reader2 = MagicMock() + mock_reader2.load_data.return_value = [new_doc] + mock_reader2.file_token_counts = {} + + mock_reader_cls.side_effect = [mock_reader1, mock_reader2] + + mock_chunker = MagicMock() + mock_chunker.chunk.return_value = [new_doc] + mock_chunker_cls.return_value = mock_chunker + + mock_vector_store = MagicMock() + mock_vector_store.get_chunks.return_value = [] + + # We need to ensure the file actually exists in the temp directory + # so os.path.isfile passes. Patch tempfile.TemporaryDirectory to use our tmp_path + temp_dir = str(tmp_path / "workdir") + os.makedirs(temp_dir) + # Create the "new_file.txt" so isfile check passes + (tmp_path / "workdir" / "new_file.txt").write_text("new content") + + with patch( + "application.vectorstore.vector_creator.VectorCreator.create_vectorstore", + return_value=mock_vector_store, + ), patch("tempfile.TemporaryDirectory") as mock_tmp: + mock_tmp.return_value.__enter__ = MagicMock(return_value=temp_dir) + mock_tmp.return_value.__exit__ = MagicMock(return_value=False) + task = MagicMock() + result = reingest_source_worker(task, source_id, "user1") + + assert result["status"] == "completed" + assert result["chunks_added"] == 1 + mock_vector_store.add_chunk.assert_called_once() + + +class TestReingestDirectoryDownloadError: + @patch("application.worker.SimpleDirectoryReader") + @patch("application.worker.StorageCreator") + @patch("application.worker.sources_collection") + def test_file_download_error_in_reingest_skips( + self, mock_sources, mock_sc, mock_reader_cls + ): + """Cover lines 631-645: error downloading file during reingest continues.""" + from application.worker import reingest_source_worker + + source_id = str(ObjectId()) + mock_sources.find_one.return_value = { + "_id": ObjectId(source_id), + "user": "user1", + "file_path": "inputs/user1/source1", + "directory_structure": "{}", + } + + mock_storage = MagicMock() + mock_storage.is_directory.side_effect = lambda p: not p.endswith(".txt") + mock_storage.list_files.return_value = [ + "inputs/user1/source1/fail.txt", + "inputs/user1/source1/ok.txt", + ] + mock_storage.get_file.side_effect = [ + Exception("download error"), + io.BytesIO(b"content"), + ] + mock_sc.get_storage.return_value = mock_storage + + mock_reader = MagicMock() + mock_reader.directory_structure = {} + mock_reader.load_data.return_value = [] + mock_reader_cls.return_value = mock_reader + + task = MagicMock() + result = reingest_source_worker(task, source_id, "user1") + + assert result["status"] == "no_changes" + + +class TestMcpOauthInitError: + @patch("application.worker.get_redis_instance") + def test_init_failure(self, mock_redis_fn): + """Cover lines 1497-1507: outer exception handler in mcp_oauth.""" + from application.worker import mcp_oauth + + mock_redis = MagicMock() + mock_redis_fn.return_value = mock_redis + + task = MagicMock() + task.request.id = "task-789" + + with patch( + "application.agents.tools.mcp_tool.MCPTool", + side_effect=Exception("init crash"), + ): + result = mcp_oauth(task, {"url": "http://mcp.test"}, "user1") + + assert result["success"] is False + assert "init" in result["error"].lower()
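+
+
+# ──────────────────────────────────────────────────────────────────────────────
+# Running the suite
+# ──────────────────────────────────────────────────────────────────────────────
+# A minimal sketch for local runs, not part of the tested behaviour. It assumes
+# pytest is invoked from the repository root and that the `mock_mongo_db`
+# fixture used above is provided by a shared conftest.py (e.g. backed by
+# mongomock):
+#
+#     pytest tests/test_worker.py -q
+#     pytest tests/test_worker.py -k "ZipSafety or PathSafe" -q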