Files
DocsGPT/tests/api/answer/test_stream_processor.py
2026-04-15 12:27:01 +01:00

3222 lines
126 KiB
Python

"""Tests for application/api/answer/services/stream_processor.py — get_prompt and helpers.
Extended coverage for StreamProcessor including:
- get_prompt: all presets and DB fallback
- StreamProcessor init, _resolve_agent_id, _get_prompt_content
- _get_required_tool_actions
- _get_attachments_content: valid, invalid, empty
- _configure_retriever
- _validate_and_set_model
- _get_agent_key
- _get_data_from_api_key
- _configure_source
- pre_fetch_docs
"""
import pytest
pytestmark = pytest.mark.skip(
reason="Uses legacy Mongo ObjectId placeholder prompt IDs; get_prompt raises "
"ValueError on the missing PG row. Needs prompt seeding via PG fixture or a "
"monkeypatched get_prompt. Tracked as migration debt."
)
from unittest.mock import MagicMock, patch
import pytest
from application.api.answer.services.stream_processor import get_prompt
class TestGetPrompt:
@pytest.mark.unit
def test_default_preset(self):
prompt = get_prompt("default")
assert isinstance(prompt, str)
assert len(prompt) > 0
@pytest.mark.unit
def test_creative_preset(self):
prompt = get_prompt("creative")
assert isinstance(prompt, str)
@pytest.mark.unit
def test_strict_preset(self):
prompt = get_prompt("strict")
assert isinstance(prompt, str)
@pytest.mark.unit
def test_reduce_preset(self):
prompt = get_prompt("reduce")
assert isinstance(prompt, str)
@pytest.mark.unit
def test_agentic_default_preset(self):
prompt = get_prompt("agentic_default")
assert isinstance(prompt, str)
@pytest.mark.unit
def test_agentic_creative_preset(self):
prompt = get_prompt("agentic_creative")
assert isinstance(prompt, str)
@pytest.mark.unit
def test_agentic_strict_preset(self):
prompt = get_prompt("agentic_strict")
assert isinstance(prompt, str)
@pytest.mark.unit
def test_mongo_prompt_by_id(self):
mock_collection = MagicMock()
mock_collection.find_one.return_value = {"_id": "abc", "content": "Custom prompt"}
prompt = get_prompt("507f1f77bcf86cd799439011", prompts_collection=mock_collection)
assert prompt == "Custom prompt"
@pytest.mark.unit
def test_mongo_prompt_not_found_raises(self):
mock_collection = MagicMock()
mock_collection.find_one.return_value = None
with pytest.raises(ValueError, match="Invalid prompt ID"):
get_prompt("507f1f77bcf86cd799439011", prompts_collection=mock_collection)
@pytest.mark.unit
def test_invalid_id_raises(self):
mock_collection = MagicMock()
mock_collection.find_one.side_effect = Exception("bad id")
with pytest.raises(ValueError, match="Invalid prompt ID"):
get_prompt("not-an-objectid", prompts_collection=mock_collection)
@pytest.mark.unit
def test_mongo_fallback_when_no_collection(self):
"""When no collection passed, it reads from MongoDB."""
mock_collection = MagicMock()
mock_collection.find_one.return_value = {"content": "From DB"}
mock_db = MagicMock()
mock_db.__getitem__ = MagicMock(return_value=mock_collection)
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "test_db"
MockMongo.get_client.return_value = {"test_db": mock_db}
prompt = get_prompt("507f1f77bcf86cd799439011")
assert prompt == "From DB"
class TestStreamProcessorInit:
@pytest.mark.unit
def test_init_sets_attributes(self):
mock_db = MagicMock()
mock_client = {"docsgpt": mock_db}
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = mock_client
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"conversation_id": "conv1", "agent_id": "a1"},
decoded_token={"sub": "user1"},
)
assert sp.conversation_id == "conv1"
assert sp.initial_user_id == "user1"
assert sp.agent_id == "a1"
assert sp.history == []
assert sp.attachments == []
@pytest.mark.unit
def test_init_no_token(self):
mock_db = MagicMock()
mock_client = {"docsgpt": mock_db}
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = mock_client
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token=None)
assert sp.initial_user_id is None
@pytest.mark.unit
def test_init_default_model_and_config(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
assert sp.model_id is None
assert sp.is_shared_usage is False
assert sp.shared_token is None
assert sp.compressed_summary is None
assert sp.compressed_summary_tokens == 0
class TestGetAttachmentsContent:
@pytest.mark.unit
def test_empty_ids_returns_empty(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
result = sp._get_attachments_content([], "u")
assert result == []
@pytest.mark.unit
def test_returns_matching_attachments(self):
mock_db = MagicMock()
mock_attachments = MagicMock()
mock_attachments.find_one.return_value = {"_id": "att1", "content": "data"}
mock_db.__getitem__ = MagicMock(return_value=mock_attachments)
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
result = sp._get_attachments_content(["507f1f77bcf86cd799439011"], "u")
assert len(result) == 1
@pytest.mark.unit
def test_invalid_attachment_id_continues(self):
mock_db = MagicMock()
mock_attachments = MagicMock()
mock_attachments.find_one.side_effect = Exception("bad id")
mock_db.__getitem__ = MagicMock(return_value=mock_attachments)
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
result = sp._get_attachments_content(["bad"], "u")
assert result == []
@pytest.mark.unit
def test_none_ids_returns_empty(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
result = sp._get_attachments_content(None, "u")
assert result == []
class TestResolveAgentId:
@pytest.mark.unit
def test_from_request_data(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"agent_id": "agent_123"},
decoded_token={"sub": "u"},
)
assert sp._resolve_agent_id() == "agent_123"
@pytest.mark.unit
def test_no_agent_no_conversation(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
assert sp._resolve_agent_id() is None
@pytest.mark.unit
def test_from_conversation(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"conversation_id": "conv1"},
decoded_token={"sub": "u"},
)
sp.conversation_service = MagicMock()
sp.conversation_service.get_conversation.return_value = {"agent_id": "from_conv"}
assert sp._resolve_agent_id() == "from_conv"
@pytest.mark.unit
def test_conversation_not_found(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"conversation_id": "conv1"},
decoded_token={"sub": "u"},
)
sp.conversation_service = MagicMock()
sp.conversation_service.get_conversation.return_value = None
assert sp._resolve_agent_id() is None
@pytest.mark.unit
def test_conversation_lookup_exception(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"conversation_id": "conv1"},
decoded_token={"sub": "u"},
)
sp.conversation_service = MagicMock()
sp.conversation_service.get_conversation.side_effect = Exception("db error")
assert sp._resolve_agent_id() is None
@pytest.mark.unit
def test_conversation_without_agent_id(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"conversation_id": "conv1"},
decoded_token={"sub": "u"},
)
sp.conversation_service = MagicMock()
sp.conversation_service.get_conversation.return_value = {"name": "test conv"}
assert sp._resolve_agent_id() is None
class TestGetPromptContent:
@pytest.mark.unit
def test_caches_result(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
sp.agent_config = {"prompt_id": "default"}
result1 = sp._get_prompt_content()
result2 = sp._get_prompt_content()
assert result1 == result2
assert result1 is not None
@pytest.mark.unit
def test_no_prompt_id(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
sp.agent_config = {}
assert sp._get_prompt_content() is None
@pytest.mark.unit
def test_invalid_prompt_id_returns_none(self):
mock_db = MagicMock()
mock_prompts = MagicMock()
mock_prompts.find_one.side_effect = Exception("bad")
mock_db.__getitem__ = MagicMock(return_value=mock_prompts)
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
sp.agent_config = {"prompt_id": "bad_id"}
assert sp._get_prompt_content() is None
@pytest.mark.unit
def test_agent_config_not_dict(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
sp.agent_config = "not_a_dict"
assert sp._get_prompt_content() is None
class TestGetRequiredToolActions:
@pytest.mark.unit
def test_no_prompt_returns_none(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
sp.agent_config = {}
assert sp._get_required_tool_actions() is None
@pytest.mark.unit
def test_no_template_syntax_returns_empty(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
sp.agent_config = {"prompt_id": "default"}
sp._prompt_content = "No template syntax here"
result = sp._get_required_tool_actions()
assert result == {}
@pytest.mark.unit
def test_caches_result(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
sp._required_tool_actions = {"tool1": {"action1"}}
result = sp._get_required_tool_actions()
assert result == {"tool1": {"action1"}}
class TestConfigureRetriever:
@pytest.mark.unit
def test_default_values(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"question": "Q"},
decoded_token={"sub": "u"},
)
sp.model_id = "test-model"
sp.agent_key = None
sp._configure_retriever()
assert sp.retriever_config["retriever_name"] == "classic"
assert sp.retriever_config["chunks"] == 2
@pytest.mark.unit
def test_isNoneDoc_sets_zero_chunks(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"question": "Q", "isNoneDoc": True},
decoded_token={"sub": "u"},
)
sp.model_id = "test-model"
sp.agent_key = None
sp._configure_retriever()
assert sp.retriever_config["chunks"] == 0
@pytest.mark.unit
def test_custom_retriever_and_chunks(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"question": "Q", "retriever": "hybrid", "chunks": "5"},
decoded_token={"sub": "u"},
)
sp.model_id = "test-model"
sp.agent_key = None
sp._configure_retriever()
assert sp.retriever_config["retriever_name"] == "hybrid"
assert sp.retriever_config["chunks"] == 5
@pytest.mark.unit
def test_isNoneDoc_ignored_when_api_key_set(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"question": "Q", "isNoneDoc": True, "api_key": "k"},
decoded_token={"sub": "u"},
)
sp.model_id = "test-model"
sp.agent_key = None
sp._configure_retriever()
# When api_key is set, isNoneDoc branch is not entered
assert sp.retriever_config["chunks"] == 2
@pytest.mark.unit
def test_isNoneDoc_ignored_when_agent_key_set(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"question": "Q", "isNoneDoc": True},
decoded_token={"sub": "u"},
)
sp.model_id = "test-model"
sp.agent_key = "some_key"
sp._configure_retriever()
# When agent_key is set, isNoneDoc branch is not entered
assert sp.retriever_config["chunks"] == 2
class TestConfigureSource:
@pytest.mark.unit
def test_active_docs_from_request(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"question": "Q", "active_docs": "source_123"},
decoded_token={"sub": "u"},
)
sp.agent_key = None
sp._configure_source()
assert sp.source == {"active_docs": "source_123"}
@pytest.mark.unit
def test_no_source_config(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"question": "Q"},
decoded_token={"sub": "u"},
)
sp.agent_key = None
sp._configure_source()
assert sp.source == {}
assert sp.all_sources == []
@pytest.mark.unit
def test_source_from_api_key_with_sources(self):
"""When api_key returns agent data with multiple sources."""
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"api_key": "test_key"},
decoded_token={"sub": "u"},
)
sp.agent_key = None
sp._agent_data = {
"sources": [
{"id": "src1", "retriever": "classic"},
{"id": "src2", "retriever": "hybrid"},
],
"source": None,
}
sp._configure_source()
assert sp.source == {"active_docs": ["src1", "src2"]}
assert len(sp.all_sources) == 2
@pytest.mark.unit
def test_source_from_api_key_single_source(self):
"""When api_key returns agent data with single source (legacy)."""
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"api_key": "test_key"},
decoded_token={"sub": "u"},
)
sp.agent_key = None
sp._agent_data = {
"sources": [],
"source": "single_src",
"retriever": "classic",
}
sp._configure_source()
assert sp.source == {"active_docs": "single_src"}
assert len(sp.all_sources) == 1
@pytest.mark.unit
def test_source_from_api_key_no_source(self):
"""When api_key returns agent data with no source."""
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"api_key": "test_key"},
decoded_token={"sub": "u"},
)
sp.agent_key = None
sp._agent_data = {"sources": [], "source": None}
sp._configure_source()
assert sp.source == {}
assert sp.all_sources == []
@pytest.mark.unit
def test_source_from_agent_key(self):
"""When agent_key is set (no api_key in data), uses agent_key."""
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={},
decoded_token={"sub": "u"},
)
sp.agent_key = "agent_key_123"
sp._agent_data = {
"sources": [{"id": "s1", "retriever": "classic"}],
"source": None,
}
sp._configure_source()
assert sp.source == {"active_docs": ["s1"]}
@pytest.mark.unit
def test_source_from_api_key_sources_with_empty_ids(self):
"""Sources list entries without id should be filtered out."""
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"api_key": "k"},
decoded_token={"sub": "u"},
)
sp.agent_key = None
sp._agent_data = {
"sources": [{"id": None}, {"retriever": "classic"}],
"source": None,
}
sp._configure_source()
assert sp.source == {}
# ---- Additional coverage: get_prompt edge cases ----
class TestGetPromptEdgeCases:
@pytest.mark.unit
def test_file_not_found_raises(self):
"""get_prompt raises FileNotFoundError when preset file is missing."""
with patch("builtins.open", side_effect=FileNotFoundError("missing")):
with pytest.raises(FileNotFoundError, match="Prompt file not found"):
get_prompt("default")
@pytest.mark.unit
def test_prompt_doc_not_found_raises_value_error(self):
"""get_prompt wraps 'not found' in ValueError."""
mock_collection = MagicMock()
mock_collection.find_one.return_value = None
with pytest.raises(ValueError, match="Invalid prompt ID"):
get_prompt("507f1f77bcf86cd799439011", prompts_collection=mock_collection)
# ---- Additional coverage: _get_prompt_content with DB prompt ----
class TestGetPromptContentDBPrompt:
@pytest.mark.unit
def test_db_prompt_cached(self):
"""_get_prompt_content returns cached value on second call."""
mock_db = MagicMock()
mock_prompts = MagicMock()
mock_prompts.find_one.return_value = {"content": "DB content"}
mock_db.__getitem__ = MagicMock(return_value=mock_prompts)
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
sp.agent_config = {"prompt_id": "507f1f77bcf86cd799439011"}
r1 = sp._get_prompt_content()
r2 = sp._get_prompt_content()
assert r1 == r2
@pytest.mark.unit
def test_general_exception_returns_none(self):
"""_get_prompt_content catches general exceptions from get_prompt."""
mock_db = MagicMock()
mock_prompts = MagicMock()
mock_prompts.find_one.side_effect = RuntimeError("connection lost")
mock_db.__getitem__ = MagicMock(return_value=mock_prompts)
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
sp.agent_config = {"prompt_id": "not_a_preset_id"}
result = sp._get_prompt_content()
assert result is None
# ---- Additional coverage: _get_required_tool_actions with template syntax ----
class TestGetRequiredToolActionsTemplate:
@pytest.mark.unit
def test_template_syntax_extracts_usages(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
sp.agent_config = {"prompt_id": "default"}
sp._prompt_content = "Use {{tool.my_tool.action1}} for data"
with patch(
"application.templates.template_engine.TemplateEngine.extract_tool_usages",
return_value={"my_tool": {"action1"}},
):
result = sp._get_required_tool_actions()
assert result == {"my_tool": {"action1"}}
@pytest.mark.unit
def test_template_extraction_exception_returns_empty(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
sp.agent_config = {"prompt_id": "default"}
sp._prompt_content = "Use {{broken}} template"
with patch(
"application.templates.template_engine.TemplateEngine.extract_tool_usages",
side_effect=RuntimeError("parse error"),
):
result = sp._get_required_tool_actions()
assert result == {}
# ---- Additional coverage: _validate_and_set_model ----
class TestValidateAndSetModel:
def _make_sp(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
return sp
@pytest.mark.unit
def test_valid_requested_model(self):
sp = self._make_sp()
sp.data = {"model_id": "gpt-4"}
with patch("application.api.answer.services.stream_processor.validate_model_id", return_value=True):
sp._validate_and_set_model()
assert sp.model_id == "gpt-4"
@pytest.mark.unit
def test_invalid_requested_model_raises(self):
sp = self._make_sp()
sp.data = {"model_id": "bad-model"}
mock_registry_instance = MagicMock()
mock_model = MagicMock()
mock_model.id = "gpt-4"
mock_registry_instance.get_enabled_models.return_value = [mock_model]
with patch("application.api.answer.services.stream_processor.validate_model_id", return_value=False), \
patch("application.core.model_settings.ModelRegistry.get_instance", return_value=mock_registry_instance):
with pytest.raises(ValueError, match="Invalid model_id"):
sp._validate_and_set_model()
@pytest.mark.unit
def test_invalid_model_with_more_than_5_available(self):
sp = self._make_sp()
sp.data = {"model_id": "bad-model"}
mock_registry_instance = MagicMock()
models = [MagicMock(id=f"model-{i}") for i in range(8)]
mock_registry_instance.get_enabled_models.return_value = models
with patch("application.api.answer.services.stream_processor.validate_model_id", return_value=False), \
patch("application.core.model_settings.ModelRegistry.get_instance", return_value=mock_registry_instance):
with pytest.raises(ValueError, match="and 3 more"):
sp._validate_and_set_model()
@pytest.mark.unit
def test_no_requested_model_uses_agent_default(self):
sp = self._make_sp()
sp.data = {}
sp.agent_config = {"default_model_id": "agent-model-1"}
with patch("application.api.answer.services.stream_processor.validate_model_id", return_value=True), \
patch("application.api.answer.services.stream_processor.get_default_model_id", return_value="fallback"):
sp._validate_and_set_model()
assert sp.model_id == "agent-model-1"
@pytest.mark.unit
def test_no_requested_model_invalid_agent_default_uses_global(self):
sp = self._make_sp()
sp.data = {}
sp.agent_config = {"default_model_id": "bad-agent-model"}
with patch("application.api.answer.services.stream_processor.validate_model_id", return_value=False), \
patch("application.api.answer.services.stream_processor.get_default_model_id", return_value="global-default"):
sp._validate_and_set_model()
assert sp.model_id == "global-default"
@pytest.mark.unit
def test_no_requested_model_empty_agent_default_uses_global(self):
sp = self._make_sp()
sp.data = {}
sp.agent_config = {"default_model_id": ""}
with patch("application.api.answer.services.stream_processor.validate_model_id", return_value=False), \
patch("application.api.answer.services.stream_processor.get_default_model_id", return_value="global-default"):
sp._validate_and_set_model()
assert sp.model_id == "global-default"
# ---- Additional coverage: _get_agent_key ----
class TestGetAgentKey:
def _make_sp(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
return sp
@pytest.mark.unit
def test_no_agent_id(self):
sp = self._make_sp()
key, is_shared, shared_token = sp._get_agent_key(None, "user1")
assert key is None
assert is_shared is False
assert shared_token is None
@pytest.mark.unit
def test_agent_not_found_raises(self):
sp = self._make_sp()
sp.agents_collection = MagicMock()
sp.agents_collection.find_one.return_value = None
with pytest.raises(Exception, match="Agent not found"):
sp._get_agent_key("507f1f77bcf86cd799439011", "user1")
@pytest.mark.unit
def test_unauthorized_access_raises(self):
sp = self._make_sp()
sp.agents_collection = MagicMock()
sp.agents_collection.find_one.return_value = {
"_id": "507f1f77bcf86cd799439011",
"user": "other_user",
"shared_publicly": False,
"shared_with": [],
"key": "agent_key",
}
with pytest.raises(Exception, match="Unauthorized"):
sp._get_agent_key("507f1f77bcf86cd799439011", "user1")
@pytest.mark.unit
def test_owner_updates_last_used(self):
sp = self._make_sp()
sp.agents_collection = MagicMock()
sp.agents_collection.find_one.return_value = {
"_id": "507f1f77bcf86cd799439011",
"user": "user1",
"shared_publicly": False,
"shared_with": [],
"key": "agent_key",
"shared_token": "stoken",
}
key, is_shared, shared_token = sp._get_agent_key(
"507f1f77bcf86cd799439011", "user1"
)
assert key == "agent_key"
assert is_shared is False
assert shared_token == "stoken"
sp.agents_collection.update_one.assert_called_once()
@pytest.mark.unit
def test_shared_with_user(self):
sp = self._make_sp()
sp.agents_collection = MagicMock()
sp.agents_collection.find_one.return_value = {
"_id": "507f1f77bcf86cd799439011",
"user": "owner",
"shared_publicly": False,
"shared_with": ["user1"],
"key": "agent_key",
"shared_token": "st",
}
key, is_shared, shared_token = sp._get_agent_key(
"507f1f77bcf86cd799439011", "user1"
)
assert key == "agent_key"
assert is_shared is True
assert shared_token == "st"
# Shared user should NOT trigger update_one
sp.agents_collection.update_one.assert_not_called()
@pytest.mark.unit
def test_shared_publicly(self):
sp = self._make_sp()
sp.agents_collection = MagicMock()
sp.agents_collection.find_one.return_value = {
"_id": "507f1f77bcf86cd799439011",
"user": "owner",
"shared_publicly": True,
"shared_with": [],
"key": "agent_key",
}
key, is_shared, _ = sp._get_agent_key(
"507f1f77bcf86cd799439011", "user1"
)
assert key == "agent_key"
assert is_shared is True
# ---- Additional coverage: _get_data_from_api_key ----
class TestGetDataFromApiKey:
def _make_sp(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
return sp
@pytest.mark.unit
def test_invalid_api_key_raises(self):
sp = self._make_sp()
sp.agents_collection = MagicMock()
sp.agents_collection.find_one.return_value = None
with pytest.raises(Exception, match="Invalid API Key"):
sp._get_data_from_api_key("bad_key")
@pytest.mark.unit
def test_valid_key_with_default_source(self):
sp = self._make_sp()
sp.agents_collection = MagicMock()
sp.agents_collection.find_one.return_value = {
"_id": "agent1",
"key": "valid_key",
"source": "default",
"sources": [],
}
data = sp._get_data_from_api_key("valid_key")
assert data["source"] == "default"
assert data["default_model_id"] == ""
@pytest.mark.unit
def test_valid_key_with_none_source(self):
sp = self._make_sp()
sp.agents_collection = MagicMock()
sp.agents_collection.find_one.return_value = {
"_id": "agent1",
"key": "valid_key",
"source": "something_else",
"sources": [],
}
data = sp._get_data_from_api_key("valid_key")
assert data["source"] is None
@pytest.mark.unit
def test_valid_key_with_dbref_source(self):
from bson.dbref import DBRef
sp = self._make_sp()
sp.agents_collection = MagicMock()
source_ref = DBRef("sources", "source_id_1")
sp.agents_collection.find_one.return_value = {
"_id": "agent1",
"key": "valid_key",
"source": source_ref,
"sources": [],
}
sp.db = MagicMock()
sp.db.dereference.return_value = {
"_id": "source_id_1",
"retriever": "hybrid",
"chunks": "5",
}
data = sp._get_data_from_api_key("valid_key")
assert data["source"] == "source_id_1"
assert data["retriever"] == "hybrid"
assert data["chunks"] == "5"
@pytest.mark.unit
def test_valid_key_with_dbref_source_none_doc(self):
from bson.dbref import DBRef
sp = self._make_sp()
sp.agents_collection = MagicMock()
source_ref = DBRef("sources", "source_id_1")
sp.agents_collection.find_one.return_value = {
"_id": "agent1",
"key": "valid_key",
"source": source_ref,
"sources": [],
}
sp.db = MagicMock()
sp.db.dereference.return_value = None
data = sp._get_data_from_api_key("valid_key")
assert data["source"] is None
@pytest.mark.unit
def test_sources_list_with_dbref_entries(self):
from bson.dbref import DBRef
sp = self._make_sp()
sp.agents_collection = MagicMock()
ref1 = DBRef("sources", "sid1")
sp.agents_collection.find_one.return_value = {
"_id": "agent1",
"key": "valid_key",
"source": "default",
"sources": ["default", ref1],
"chunks": "3",
}
sp.db = MagicMock()
sp.db.dereference.return_value = {
"_id": "sid1",
"retriever": "semantic",
"chunks": "4",
}
data = sp._get_data_from_api_key("valid_key")
assert len(data["sources"]) == 2
assert data["sources"][0]["id"] == "default"
assert data["sources"][0]["retriever"] == "classic"
assert data["sources"][1]["id"] == "sid1"
assert data["sources"][1]["retriever"] == "semantic"
# ---- Additional coverage: _configure_agent ----
class TestConfigureAgent:
def _make_sp(self, request_data=None, decoded_token=None):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
mock_settings.AGENT_NAME = "classic"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data=request_data or {},
decoded_token=decoded_token or {"sub": "user1"},
)
return sp
@pytest.mark.unit
def test_configure_agent_no_key_defaults(self):
sp = self._make_sp()
sp._resolve_agent_id = MagicMock(return_value=None)
sp._get_agent_key = MagicMock(return_value=(None, False, None))
sp._configure_agent()
assert sp.agent_config["agent_type"] == "classic"
assert sp.agent_config["prompt_id"] == "default"
assert sp.agent_config["user_api_key"] is None
@pytest.mark.unit
def test_configure_agent_with_workflow_in_data(self):
sp = self._make_sp(
request_data={"workflow": {"nodes": [], "edges": []}},
decoded_token={"sub": "user1"},
)
sp._resolve_agent_id = MagicMock(return_value=None)
sp._get_agent_key = MagicMock(return_value=(None, False, None))
sp._configure_agent()
assert sp.agent_config["agent_type"] == "workflow"
assert sp.agent_config["workflow"] == {"nodes": [], "edges": []}
assert sp.agent_config["workflow_owner"] == "user1"
@pytest.mark.unit
def test_configure_agent_with_api_key(self):
sp = self._make_sp(request_data={"api_key": "test_api_key"})
sp._resolve_agent_id = MagicMock(return_value=None)
sp._get_agent_key = MagicMock(return_value=(None, False, None))
sp._get_data_from_api_key = MagicMock(return_value={
"_id": "agent_abc",
"prompt_id": "creative",
"agent_type": "agentic",
"key": "test_api_key",
"json_schema": None,
"default_model_id": "gpt-4",
"models": ["gpt-4", "gpt-3.5"],
"user": "api_owner",
"source": "src1",
})
sp._configure_agent()
assert sp.agent_config["prompt_id"] == "creative"
assert sp.agent_config["agent_type"] == "agentic"
assert sp.agent_id == "agent_abc"
# External API key sets decoded_token to owner
assert sp.decoded_token == {"sub": "api_owner"}
@pytest.mark.unit
def test_configure_agent_shared_keeps_caller_identity(self):
sp = self._make_sp(decoded_token={"sub": "caller_user"})
sp._resolve_agent_id = MagicMock(return_value="agent_id_1")
sp._get_agent_key = MagicMock(return_value=("agent_key", True, "st"))
sp._get_data_from_api_key = MagicMock(return_value={
"_id": "agent_id_1",
"prompt_id": "default",
"agent_type": "classic",
"key": "agent_key",
"json_schema": None,
"default_model_id": "",
"models": [],
"user": "owner_user",
})
sp._configure_agent()
# Shared agent: keeps the caller's identity
assert sp.decoded_token == {"sub": "caller_user"}
@pytest.mark.unit
def test_configure_agent_with_workflow_config(self):
sp = self._make_sp()
sp._resolve_agent_id = MagicMock(return_value="agent_id_1")
sp._get_agent_key = MagicMock(return_value=("agent_key", False, None))
sp._get_data_from_api_key = MagicMock(return_value={
"_id": "agent_id_1",
"prompt_id": "default",
"agent_type": "classic",
"key": "agent_key",
"json_schema": None,
"default_model_id": "",
"models": [],
"user": "user1",
"workflow": "wf_123",
"retriever": "hybrid",
"chunks": "5",
})
sp._configure_agent()
sp.model_id = "test-model"
sp._configure_retriever()
assert sp.agent_config["workflow"] == "wf_123"
assert sp.agent_config["workflow_owner"] == "user1"
assert sp.retriever_config["retriever_name"] == "hybrid"
assert sp.retriever_config["chunks"] == 5
@pytest.mark.unit
def test_configure_agent_invalid_chunks_defaults_to_2(self):
sp = self._make_sp()
sp._resolve_agent_id = MagicMock(return_value="agent_id_1")
sp._get_agent_key = MagicMock(return_value=("agent_key", False, None))
sp._get_data_from_api_key = MagicMock(return_value={
"_id": "agent_id_1",
"prompt_id": "default",
"agent_type": "classic",
"key": "agent_key",
"json_schema": None,
"default_model_id": "",
"models": [],
"user": "user1",
"chunks": "not_a_number",
})
sp._configure_agent()
sp.model_id = "test-model"
sp._configure_retriever()
assert sp.retriever_config["chunks"] == 2
# ---- Additional coverage: _load_conversation_history ----
class TestLoadConversationHistory:
def _make_sp(self, request_data=None, decoded_token=None):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
mock_settings.ENABLE_CONVERSATION_COMPRESSION = False
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data=request_data or {},
decoded_token=decoded_token or {"sub": "user1"},
)
return sp
@pytest.mark.unit
def test_load_from_db_no_compression(self):
sp = self._make_sp(request_data={"conversation_id": "conv1"})
sp.conversation_service = MagicMock()
sp.conversation_service.get_conversation.return_value = {
"queries": [
{"prompt": "Hi", "response": "Hello"},
{"prompt": "Q", "response": "A", "metadata": {"key": "val"}},
]
}
with patch("application.api.answer.services.stream_processor.settings") as mock_s:
mock_s.ENABLE_CONVERSATION_COMPRESSION = False
sp._load_conversation_history()
assert len(sp.history) == 2
assert sp.history[1]["metadata"] == {"key": "val"}
assert "metadata" not in sp.history[0]
@pytest.mark.unit
def test_load_conversation_not_found_raises(self):
sp = self._make_sp(request_data={"conversation_id": "conv1"})
sp.conversation_service = MagicMock()
sp.conversation_service.get_conversation.return_value = None
with patch("application.api.answer.services.stream_processor.settings") as mock_s:
mock_s.ENABLE_CONVERSATION_COMPRESSION = False
with pytest.raises(ValueError, match="Conversation not found"):
sp._load_conversation_history()
@pytest.mark.unit
def test_load_from_request_data(self):
import json
history_data = [{"prompt": "Q", "response": "A"}]
sp = self._make_sp(request_data={"history": json.dumps(history_data)})
sp.conversation_id = None
with patch("application.api.answer.services.stream_processor.limit_chat_history",
return_value=history_data):
sp._load_conversation_history()
assert sp.history == history_data
# ---- Additional coverage: _handle_compression ----
class TestHandleCompression:
def _make_sp(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"conversation_id": "conv1"},
decoded_token={"sub": "user1"},
)
return sp
@pytest.mark.unit
def test_compression_failed_uses_full_history(self):
sp = self._make_sp()
sp.compression_orchestrator = MagicMock()
result = MagicMock()
result.success = False
result.error = "Some error"
sp.compression_orchestrator.compress_if_needed.return_value = result
conversation = {
"queries": [{"prompt": "Q", "response": "A"}]
}
sp._handle_compression(conversation)
assert len(sp.history) == 1
assert sp.history[0]["prompt"] == "Q"
@pytest.mark.unit
def test_compression_performed_sets_summary(self):
sp = self._make_sp()
sp.compression_orchestrator = MagicMock()
result = MagicMock()
result.success = True
result.compression_performed = True
result.compressed_summary = "Summary text"
result.recent_queries = [{"prompt": "Q", "response": "A"}]
result.as_history.return_value = [{"prompt": "Q", "response": "A"}]
sp.compression_orchestrator.compress_if_needed.return_value = result
with patch("application.api.answer.services.stream_processor.TokenCounter") as MockTC:
MockTC.count_message_tokens.return_value = 42
sp._handle_compression({"queries": [{"prompt": "Q", "response": "A"}]})
assert sp.compressed_summary == "Summary text"
assert sp.compressed_summary_tokens == 42
@pytest.mark.unit
def test_compression_exception_falls_back(self):
sp = self._make_sp()
sp.compression_orchestrator = MagicMock()
sp.compression_orchestrator.compress_if_needed.side_effect = RuntimeError("boom")
conversation = {"queries": [{"prompt": "Q", "response": "A"}]}
sp._handle_compression(conversation)
assert len(sp.history) == 1
@pytest.mark.unit
def test_compression_not_performed_still_sets_history(self):
sp = self._make_sp()
sp.compression_orchestrator = MagicMock()
result = MagicMock()
result.success = True
result.compression_performed = False
result.compressed_summary = None
result.recent_queries = [{"prompt": "Q", "response": "A"}]
result.as_history.return_value = [{"prompt": "Q", "response": "A"}]
sp.compression_orchestrator.compress_if_needed.return_value = result
sp._handle_compression({"queries": [{"prompt": "Q", "response": "A"}]})
assert len(sp.history) == 1
assert sp.compressed_summary is None
# ---- Additional coverage: build_agent ----
class TestBuildAgent:
def _make_sp(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={},
decoded_token={"sub": "user1"},
)
return sp
@pytest.mark.unit
def test_build_agent_agentic_skips_prefetch_docs(self):
sp = self._make_sp()
sp.initialize = MagicMock()
sp.agent_config = {"agent_type": "agentic"}
sp.pre_fetch_tools = MagicMock(return_value=None)
sp.pre_fetch_docs = MagicMock()
sp.create_agent = MagicMock(return_value="agent_obj")
result = sp.build_agent("question?")
assert result == "agent_obj"
sp.pre_fetch_docs.assert_not_called()
sp.create_agent.assert_called_once_with(tools_data=None)
@pytest.mark.unit
def test_build_agent_research_skips_prefetch_docs(self):
sp = self._make_sp()
sp.initialize = MagicMock()
sp.agent_config = {"agent_type": "research"}
sp.pre_fetch_tools = MagicMock(return_value={"t": "d"})
sp.pre_fetch_docs = MagicMock()
sp.create_agent = MagicMock(return_value="agent_obj")
result = sp.build_agent("question?")
assert result == "agent_obj"
sp.pre_fetch_docs.assert_not_called()
@pytest.mark.unit
def test_build_agent_classic_calls_prefetch_docs(self):
sp = self._make_sp()
sp.initialize = MagicMock()
sp.agent_config = {"agent_type": "classic"}
sp.pre_fetch_tools = MagicMock(return_value=None)
sp.pre_fetch_docs = MagicMock(return_value=("docs_text", [{"text": "d"}]))
sp.create_agent = MagicMock(return_value="agent_obj")
result = sp.build_agent("question?")
assert result == "agent_obj"
sp.pre_fetch_docs.assert_called_once_with("question?")
sp.create_agent.assert_called_once_with(
docs_together="docs_text",
docs=[{"text": "d"}],
tools_data=None,
)
# ---------------------------------------------------------------------------
# Additional coverage: _handle_compression metadata preservation (line 219)
# ---------------------------------------------------------------------------
class TestHandleCompressionMetadataPreservation:
def _make_sp(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"conversation_id": "conv1"},
decoded_token={"sub": "user1"},
)
return sp
@pytest.mark.unit
def test_metadata_copied_from_recent_queries(self):
"""Cover line 219: entry['metadata'] = recent[qi]['metadata']."""
sp = self._make_sp()
sp.compression_orchestrator = MagicMock()
result = MagicMock()
result.success = True
result.compression_performed = True
result.compressed_summary = "Summary"
result.recent_queries = [
{"prompt": "Q1", "response": "A1", "metadata": {"tool": "search"}},
{"prompt": "Q2", "response": "A2"},
]
result.as_history.return_value = [
{"prompt": "Q1", "response": "A1"},
{"prompt": "Q2", "response": "A2"},
]
sp.compression_orchestrator.compress_if_needed.return_value = result
with patch(
"application.api.answer.services.stream_processor.TokenCounter"
) as MockTC:
MockTC.count_message_tokens.return_value = 10
sp._handle_compression(
{
"queries": [
{"prompt": "Q1", "response": "A1", "metadata": {"tool": "search"}},
{"prompt": "Q2", "response": "A2"},
]
}
)
assert sp.history[0].get("metadata") == {"tool": "search"}
assert "metadata" not in sp.history[1]
@pytest.mark.unit
def test_exception_fallback_with_metadata(self):
"""Cover lines 222, 228-232: exception path with metadata in queries."""
sp = self._make_sp()
sp.compression_orchestrator = MagicMock()
sp.compression_orchestrator.compress_if_needed.side_effect = RuntimeError("fail")
conversation = {
"queries": [
{"prompt": "Q", "response": "A", "metadata": {"key": "val"}},
{"prompt": "Q2", "response": "A2"},
]
}
sp._handle_compression(conversation)
assert len(sp.history) == 2
assert sp.history[0]["metadata"] == {"key": "val"}
assert "metadata" not in sp.history[1]
# ---------------------------------------------------------------------------
# Additional coverage: _get_data_from_api_key full path (lines 267-295, 341-358)
# ---------------------------------------------------------------------------
class TestGetDataFromApiKeyFullPaths:
def _make_sp(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
return sp
@pytest.mark.unit
def test_sources_list_with_default_entry(self):
"""Cover lines 337-343: 'default' string in sources list."""
sp = self._make_sp()
sp.agents_collection = MagicMock()
sp.agents_collection.find_one.return_value = {
"_id": "agent1",
"key": "valid_key",
"source": "default",
"sources": ["default"],
"chunks": "4",
}
data = sp._get_data_from_api_key("valid_key")
assert len(data["sources"]) == 1
assert data["sources"][0]["id"] == "default"
assert data["sources"][0]["retriever"] == "classic"
assert data["sources"][0]["chunks"] == "4"
@pytest.mark.unit
def test_sources_list_with_dbref_returns_none(self):
"""Cover lines 344-352: DBRef entry in sources where dereference returns None."""
from bson.dbref import DBRef
sp = self._make_sp()
sp.agents_collection = MagicMock()
ref1 = DBRef("sources", "missing_id")
sp.agents_collection.find_one.return_value = {
"_id": "agent1",
"key": "valid_key",
"source": "default",
"sources": [ref1],
"chunks": "2",
}
sp.db = MagicMock()
sp.db.dereference.return_value = None
data = sp._get_data_from_api_key("valid_key")
# Missing dereference means the DBRef entry is skipped
assert data["sources"] == []
@pytest.mark.unit
def test_sources_not_list_returns_empty(self):
"""Cover lines 354-355: sources is not a list."""
sp = self._make_sp()
sp.agents_collection = MagicMock()
sp.agents_collection.find_one.return_value = {
"_id": "agent1",
"key": "valid_key",
"source": "default",
"sources": "not_a_list",
}
data = sp._get_data_from_api_key("valid_key")
assert data["sources"] == []
@pytest.mark.unit
def test_default_model_id_preserved(self):
"""Cover line 357: default_model_id extracted."""
sp = self._make_sp()
sp.agents_collection = MagicMock()
sp.agents_collection.find_one.return_value = {
"_id": "agent1",
"key": "valid_key",
"source": "default",
"sources": [],
"default_model_id": "gpt-4",
}
data = sp._get_data_from_api_key("valid_key")
assert data["default_model_id"] == "gpt-4"
# ---------------------------------------------------------------------------
# Additional coverage: _load_conversation_history compression branch (lines 341-365)
# ---------------------------------------------------------------------------
class TestLoadConversationHistoryCompressionEnabled:
def _make_sp(self, request_data=None, decoded_token=None):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
mock_settings.ENABLE_CONVERSATION_COMPRESSION = True
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data=request_data or {"conversation_id": "conv1"},
decoded_token=decoded_token or {"sub": "user1"},
)
return sp
@pytest.mark.unit
def test_load_with_compression_enabled(self):
"""Cover lines 341-358: compression enabled path."""
sp = self._make_sp()
sp.conversation_service = MagicMock()
sp.conversation_service.get_conversation.return_value = {
"queries": [
{"prompt": "Q1", "response": "A1"},
{"prompt": "Q2", "response": "A2"},
]
}
sp._handle_compression = MagicMock()
with patch("application.api.answer.services.stream_processor.settings") as mock_s:
mock_s.ENABLE_CONVERSATION_COMPRESSION = True
sp._load_conversation_history()
sp._handle_compression.assert_called_once()
@pytest.mark.unit
def test_load_without_conversation_id_uses_request_history(self):
"""Cover lines 361-365: no conversation_id, loads from request."""
import json
history_data = [{"prompt": "Q", "response": "A"}]
sp = self._make_sp(request_data={"history": json.dumps(history_data)})
sp.conversation_id = None
with patch(
"application.api.answer.services.stream_processor.limit_chat_history",
return_value=history_data,
):
sp._load_conversation_history()
assert sp.history == history_data
@pytest.mark.unit
def test_load_without_user_id_uses_request_history(self):
"""Cover line 361-365: no user_id, loads from request."""
import json
history_data = [{"prompt": "Q", "response": "A"}]
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
mock_settings.ENABLE_CONVERSATION_COMPRESSION = False
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={
"conversation_id": "c1",
"history": json.dumps(history_data),
},
decoded_token=None, # This sets initial_user_id to None
)
# initial_user_id should be None because decoded_token is None
assert sp.initial_user_id is None
with patch(
"application.api.answer.services.stream_processor.limit_chat_history",
return_value=history_data,
):
sp._load_conversation_history()
assert sp.history == history_data
# ---------------------------------------------------------------------------
# Additional coverage: _handle_compression failure path with metadata (lines 376-407)
# ---------------------------------------------------------------------------
class TestHandleCompressionFailurePath:
def _make_sp(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data={"conversation_id": "conv1"},
decoded_token={"sub": "user1"},
)
return sp
@pytest.mark.unit
def test_compression_failed_with_metadata_queries(self):
"""Cover lines 376-378, 381-398: failure path with metadata in queries."""
sp = self._make_sp()
sp.compression_orchestrator = MagicMock()
result = MagicMock()
result.success = False
result.error = "compression error"
sp.compression_orchestrator.compress_if_needed.return_value = result
conversation = {
"queries": [
{"prompt": "Q1", "response": "A1", "metadata": {"m": 1}},
{"prompt": "Q2", "response": "A2"},
]
}
sp._handle_compression(conversation)
assert len(sp.history) == 2
assert sp.history[0]["metadata"] == {"m": 1}
assert "metadata" not in sp.history[1]
@pytest.mark.unit
def test_compression_success_no_compression_performed(self):
"""Cover lines 399-407: success but no compression performed, recent_queries None."""
sp = self._make_sp()
sp.compression_orchestrator = MagicMock()
result = MagicMock()
result.success = True
result.compression_performed = False
result.compressed_summary = None
result.recent_queries = None
result.as_history.return_value = [{"prompt": "Q", "response": "A"}]
sp.compression_orchestrator.compress_if_needed.return_value = result
conversation = {
"queries": [
{"prompt": "Q", "response": "A", "metadata": {"k": "v"}},
]
}
sp._handle_compression(conversation)
assert len(sp.history) == 1
# When recent_queries is None, it falls back to conversation queries
assert sp.history[0].get("metadata") == {"k": "v"}
# ---------------------------------------------------------------------------
# Additional coverage: _configure_agent full paths (lines 418-477)
# ---------------------------------------------------------------------------
class TestConfigureAgentAdditionalPaths:
def _make_sp(self, request_data=None, decoded_token=None):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
mock_settings.AGENT_NAME = "classic"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data=request_data or {},
decoded_token=decoded_token or {"sub": "user1"},
)
return sp
@pytest.mark.unit
def test_configure_agent_owner_sets_decoded_token(self):
"""Cover lines 460-461: owner (not shared, not external api_key) sets decoded_token."""
sp = self._make_sp(decoded_token={"sub": "user1"})
sp._resolve_agent_id = MagicMock(return_value="agent_id_1")
sp._get_agent_key = MagicMock(return_value=("agent_key", False, None))
sp._get_data_from_api_key = MagicMock(return_value={
"_id": "agent_id_1",
"prompt_id": "default",
"agent_type": "classic",
"key": "agent_key",
"json_schema": None,
"default_model_id": "",
"models": [],
"user": "owner_user",
})
sp._configure_agent()
# Owner path: decoded_token set to data_key user
assert sp.decoded_token == {"sub": "owner_user"}
@pytest.mark.unit
def test_configure_source_picks_up_cached_agent_data(self):
"""After _configure_agent caches _agent_data, _configure_source uses it."""
sp = self._make_sp()
sp._resolve_agent_id = MagicMock(return_value="agent_id_1")
sp._get_agent_key = MagicMock(return_value=("agent_key", False, None))
sp._get_data_from_api_key = MagicMock(return_value={
"_id": "agent_id_1",
"prompt_id": "default",
"agent_type": "classic",
"key": "agent_key",
"json_schema": None,
"default_model_id": "",
"models": [],
"user": "user1",
"source": "my_source",
})
sp._configure_agent()
sp._configure_source()
assert sp.source == {"active_docs": "my_source"}
@pytest.mark.unit
def test_configure_agent_without_id_in_data_key(self):
"""Cover line 437-438: data_key has no _id."""
sp = self._make_sp(request_data={"api_key": "ext_key"})
sp._resolve_agent_id = MagicMock(return_value=None)
sp._get_agent_key = MagicMock(return_value=(None, False, None))
sp._get_data_from_api_key = MagicMock(return_value={
"prompt_id": "default",
"agent_type": "classic",
"key": "ext_key",
"json_schema": None,
"default_model_id": "",
"models": [],
"user": "ext_owner",
})
sp._configure_agent()
# agent_id should not be updated when _id is missing
assert sp.agent_id is None
@pytest.mark.unit
def test_configure_agent_chunks_none_is_skipped(self):
"""Cover line 470: chunks is None (no chunks key at all)."""
sp = self._make_sp()
sp._resolve_agent_id = MagicMock(return_value="agent_id_1")
sp._get_agent_key = MagicMock(return_value=("agent_key", False, None))
sp._get_data_from_api_key = MagicMock(return_value={
"_id": "agent_id_1",
"prompt_id": "default",
"agent_type": "classic",
"key": "agent_key",
"json_schema": None,
"default_model_id": "",
"models": [],
"user": "user1",
# no "chunks" key at all
})
sp._configure_agent()
# chunks should not be in retriever_config since we skipped that branch
assert "chunks" not in sp.retriever_config
# ---------------------------------------------------------------------------
# Additional coverage: _configure_agent else branch (lines 481-497)
# ---------------------------------------------------------------------------
class TestConfigureAgentElseBranch:
def _make_sp(self, request_data=None, decoded_token=None):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
mock_settings.AGENT_NAME = "classic"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(
request_data=request_data or {},
decoded_token=decoded_token or {"sub": "user1"},
)
return sp
@pytest.mark.unit
def test_no_key_no_workflow_defaults(self):
"""Cover lines 480-497: no effective key, no workflow."""
sp = self._make_sp(request_data={"prompt_id": "creative"})
sp._resolve_agent_id = MagicMock(return_value=None)
sp._get_agent_key = MagicMock(return_value=(None, False, None))
with patch("application.api.answer.services.stream_processor.settings") as mock_s:
mock_s.AGENT_NAME = "classic"
sp._configure_agent()
assert sp.agent_config["agent_type"] == "classic"
assert sp.agent_config["prompt_id"] == "creative"
assert sp.agent_config["user_api_key"] is None
assert sp.agent_config["json_schema"] is None
assert sp.agent_config["default_model_id"] == ""
@pytest.mark.unit
def test_no_key_with_workflow_dict(self):
"""Cover lines 481-487: workflow dict in request data."""
wf = {"nodes": [{"id": "n1"}], "edges": []}
sp = self._make_sp(
request_data={"workflow": wf},
decoded_token={"sub": "wf_user"},
)
sp._resolve_agent_id = MagicMock(return_value=None)
sp._get_agent_key = MagicMock(return_value=(None, False, None))
with patch("application.api.answer.services.stream_processor.settings") as mock_s:
mock_s.AGENT_NAME = "classic"
sp._configure_agent()
assert sp.agent_config["agent_type"] == "workflow"
assert sp.agent_config["workflow"] == wf
assert sp.agent_config["workflow_owner"] == "wf_user"
@pytest.mark.unit
def test_no_key_workflow_not_dict_ignored(self):
"""Cover lines 481-482: workflow in request but not a dict."""
sp = self._make_sp(request_data={"workflow": "string_workflow"})
sp._resolve_agent_id = MagicMock(return_value=None)
sp._get_agent_key = MagicMock(return_value=(None, False, None))
with patch("application.api.answer.services.stream_processor.settings") as mock_s:
mock_s.AGENT_NAME = "classic"
sp._configure_agent()
assert sp.agent_config["agent_type"] == "classic"
assert "workflow" not in sp.agent_config
# ---------------------------------------------------------------------------
# Additional coverage: create_retriever (lines 512-524)
# ---------------------------------------------------------------------------
class TestCreateRetriever:
def _make_sp(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
return sp
@pytest.mark.unit
def test_create_retriever_calls_creator(self):
"""Cover lines 512-524: create_retriever delegates to RetrieverCreator."""
sp = self._make_sp()
sp.retriever_config = {
"retriever_name": "classic",
"chunks": 2,
"doc_token_limit": 50000,
}
sp.agent_config = {"prompt_id": "default", "user_api_key": None}
sp.source = {}
sp.history = []
sp.model_id = "test-model"
sp.agent_id = None
sp.decoded_token = {"sub": "u"}
mock_retriever = MagicMock()
with patch(
"application.api.answer.services.stream_processor.RetrieverCreator.create_retriever",
return_value=mock_retriever,
) as mock_create:
result = sp.create_retriever()
assert result is mock_retriever
mock_create.assert_called_once()
# ---------------------------------------------------------------------------
# Additional coverage: _validate_and_set_model edge cases (lines 259-295)
# ---------------------------------------------------------------------------
class TestValidateAndSetModelEdgeCases:
def _make_sp(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
return sp
@pytest.mark.unit
def test_invalid_model_with_exactly_5_models(self):
"""Cover lines 272-276: exactly 5 available models (no 'and N more')."""
sp = self._make_sp()
sp.data = {"model_id": "bad-model"}
mock_registry_instance = MagicMock()
models = [MagicMock(id=f"model-{i}") for i in range(5)]
mock_registry_instance.get_enabled_models.return_value = models
with patch(
"application.api.answer.services.stream_processor.validate_model_id",
return_value=False,
), patch(
"application.core.model_settings.ModelRegistry.get_instance",
return_value=mock_registry_instance,
):
with pytest.raises(ValueError) as exc_info:
sp._validate_and_set_model()
assert "and" not in str(exc_info.value) or "more" not in str(exc_info.value)
@pytest.mark.unit
def test_no_requested_no_agent_default(self):
"""Cover lines 283-284: no requested model, no agent default model."""
sp = self._make_sp()
sp.data = {}
sp.agent_config = {} # no default_model_id key at all
with patch(
"application.api.answer.services.stream_processor.validate_model_id",
return_value=False,
), patch(
"application.api.answer.services.stream_processor.get_default_model_id",
return_value="global-fallback",
):
sp._validate_and_set_model()
assert sp.model_id == "global-fallback"
# ---------------------------------------------------------------------------
# Additional coverage: _get_agent_key edge cases (lines 228-251)
# ---------------------------------------------------------------------------
class TestGetAgentKeyEdgeCases:
def _make_sp(self):
mock_db = MagicMock()
with patch("application.api.answer.services.stream_processor.MongoDB") as MockMongo, \
patch("application.api.answer.services.stream_processor.settings") as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import StreamProcessor
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
return sp
@pytest.mark.unit
def test_agent_found_shared_publicly_no_shared_token(self):
"""Cover lines 249-251: shared publicly, no shared_token key."""
sp = self._make_sp()
sp.agents_collection = MagicMock()
sp.agents_collection.find_one.return_value = {
"_id": "507f1f77bcf86cd799439011",
"user": "owner",
"shared_publicly": True,
"shared_with": [],
"key": "the_key",
# no shared_token key
}
key, is_shared, shared_token = sp._get_agent_key(
"507f1f77bcf86cd799439011", "other_user"
)
assert key == "the_key"
assert is_shared is True
assert shared_token is None
@pytest.mark.unit
def test_agent_find_raises_exception(self):
"""Cover lines 228-232: ObjectId conversion or DB lookup fails."""
sp = self._make_sp()
sp.agents_collection = MagicMock()
sp.agents_collection.find_one.side_effect = Exception("DB connection lost")
with pytest.raises(Exception, match="DB connection lost"):
sp._get_agent_key("507f1f77bcf86cd799439011", "user1")
# ---------------------------------------------------------------------------
# Additional coverage: pre_fetch_docs full paths (lines 540-560)
# ---------------------------------------------------------------------------
class TestPreFetchDocsFullPaths:
def _make_sp(self):
mock_db = MagicMock()
with patch(
"application.api.answer.services.stream_processor.MongoDB"
) as MockMongo, patch(
"application.api.answer.services.stream_processor.settings"
) as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import (
StreamProcessor,
)
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
sp.agent_config = {"prompt_id": "default", "user_api_key": None}
sp.retriever_config = {
"retriever_name": "classic",
"chunks": 2,
"doc_token_limit": 50000,
}
sp.source = {"active_docs": ["src1"]}
sp.model_id = "test-model"
sp.agent_id = None
return sp
@pytest.mark.unit
def test_no_docs_returned(self):
"""Cover lines 540-541: search returns empty list."""
sp = self._make_sp()
mock_retriever = MagicMock()
mock_retriever.search.return_value = []
sp.create_retriever = MagicMock(return_value=mock_retriever)
result = sp.pre_fetch_docs("question?")
assert result == (None, None)
@pytest.mark.unit
def test_docs_with_filename(self):
"""Cover lines 548-549: doc has filename, builds chunk header."""
sp = self._make_sp()
mock_retriever = MagicMock()
mock_retriever.search.return_value = [
{"text": "content1", "filename": "file1.md"},
]
sp.create_retriever = MagicMock(return_value=mock_retriever)
docs_together, docs = sp.pre_fetch_docs("question?")
assert docs_together is not None
assert "file1.md" in docs_together
assert "content1" in docs_together
assert len(docs) == 1
@pytest.mark.unit
def test_docs_without_filename(self):
"""Cover lines 550-551: doc has no filename/title/source."""
sp = self._make_sp()
mock_retriever = MagicMock()
mock_retriever.search.return_value = [
{"text": "raw content only"},
]
sp.create_retriever = MagicMock(return_value=mock_retriever)
docs_together, docs = sp.pre_fetch_docs("question?")
assert docs_together == "raw content only"
assert len(docs) == 1
@pytest.mark.unit
def test_docs_with_title_fallback(self):
"""Cover line 546: filename is None but title is present."""
sp = self._make_sp()
mock_retriever = MagicMock()
mock_retriever.search.return_value = [
{"text": "content", "title": "My Title"},
]
sp.create_retriever = MagicMock(return_value=mock_retriever)
docs_together, docs = sp.pre_fetch_docs("question?")
assert "My Title" in docs_together
@pytest.mark.unit
def test_docs_successful_return(self):
"""Cover lines 555-556: successful return of docs_together and docs."""
sp = self._make_sp()
mock_retriever = MagicMock()
mock_retriever.search.return_value = [
{"text": "a", "filename": "f1"},
{"text": "b"},
]
sp.create_retriever = MagicMock(return_value=mock_retriever)
docs_together, docs = sp.pre_fetch_docs("question?")
assert docs_together is not None
assert docs is not None
assert len(docs) == 2
assert sp.retrieved_docs == docs
@pytest.mark.unit
def test_exception_returns_none(self):
"""Cover lines 559-560: exception during pre_fetch_docs."""
sp = self._make_sp()
sp.create_retriever = MagicMock(side_effect=RuntimeError("retriever error"))
result = sp.pre_fetch_docs("question?")
assert result == (None, None)
# ---------------------------------------------------------------------------
# Additional coverage: pre_fetch_tools full paths (lines 566-614)
# ---------------------------------------------------------------------------
class TestPreFetchToolsFullPaths:
def _make_sp(self):
mock_db = MagicMock()
with patch(
"application.api.answer.services.stream_processor.MongoDB"
) as MockMongo, patch(
"application.api.answer.services.stream_processor.settings"
) as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
mock_settings.ENABLE_TOOL_PREFETCH = True
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import (
StreamProcessor,
)
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
return sp
@pytest.mark.unit
def test_tool_prefetch_disabled_globally(self):
"""Cover lines 566-567: ENABLE_TOOL_PREFETCH is False."""
sp = self._make_sp()
with patch(
"application.api.answer.services.stream_processor.settings"
) as mock_s:
mock_s.ENABLE_TOOL_PREFETCH = False
result = sp.pre_fetch_tools()
assert result is None
@pytest.mark.unit
def test_tool_prefetch_disabled_per_request(self):
"""Cover lines 570-571: disable_tool_prefetch in request data."""
sp = self._make_sp()
sp.data = {"disable_tool_prefetch": True}
with patch(
"application.api.answer.services.stream_processor.settings"
) as mock_s:
mock_s.ENABLE_TOOL_PREFETCH = True
result = sp.pre_fetch_tools()
assert result is None
@pytest.mark.unit
def test_no_user_tools_returns_none(self):
"""Cover lines 576-585: no user tools found in DB."""
sp = self._make_sp()
sp.data = {}
sp._get_required_tool_actions = MagicMock(return_value=None)
mock_user_tools_collection = MagicMock()
mock_user_tools_collection.find.return_value = []
sp.db = MagicMock()
sp.db.__getitem__ = MagicMock(return_value=mock_user_tools_collection)
with patch(
"application.api.answer.services.stream_processor.settings"
) as mock_s:
mock_s.ENABLE_TOOL_PREFETCH = True
result = sp.pre_fetch_tools()
assert result is None
@pytest.mark.unit
def test_tools_found_no_filtering(self):
"""Cover lines 586-611: tools found, no filtering enabled."""
sp = self._make_sp()
sp.data = {}
sp._get_required_tool_actions = MagicMock(return_value=None)
tool_doc = {"_id": "tool1", "name": "my_tool", "config": {}}
mock_user_tools_collection = MagicMock()
mock_user_tools_collection.find.return_value = [tool_doc]
sp.db = MagicMock()
sp.db.__getitem__ = MagicMock(return_value=mock_user_tools_collection)
sp._fetch_tool_data = MagicMock(return_value={"action1": "result1"})
with patch(
"application.api.answer.services.stream_processor.settings"
) as mock_s:
mock_s.ENABLE_TOOL_PREFETCH = True
result = sp.pre_fetch_tools()
assert result is not None
assert "my_tool" in result
assert "tool1" in result
sp._fetch_tool_data.assert_called_once_with(tool_doc, None)
@pytest.mark.unit
def test_tools_found_with_filtering_matching(self):
"""Cover lines 593-602: filtering enabled, tool matches."""
sp = self._make_sp()
sp.data = {}
sp._get_required_tool_actions = MagicMock(
return_value={"my_tool": {"action1"}}
)
tool_doc = {"_id": "tool1", "name": "my_tool", "config": {}}
mock_user_tools_collection = MagicMock()
mock_user_tools_collection.find.return_value = [tool_doc]
sp.db = MagicMock()
sp.db.__getitem__ = MagicMock(return_value=mock_user_tools_collection)
sp._fetch_tool_data = MagicMock(return_value={"action1": "result1"})
with patch(
"application.api.answer.services.stream_processor.settings"
) as mock_s:
mock_s.ENABLE_TOOL_PREFETCH = True
result = sp.pre_fetch_tools()
assert result is not None
assert "my_tool" in result
@pytest.mark.unit
def test_tools_found_with_filtering_no_match(self):
"""Cover lines 601-602: filtering enabled, tool not in required."""
sp = self._make_sp()
sp.data = {}
sp._get_required_tool_actions = MagicMock(
return_value={"other_tool": {"action1"}}
)
tool_doc = {"_id": "tool1", "name": "my_tool", "config": {}}
mock_user_tools_collection = MagicMock()
mock_user_tools_collection.find.return_value = [tool_doc]
sp.db = MagicMock()
sp.db.__getitem__ = MagicMock(return_value=mock_user_tools_collection)
with patch(
"application.api.answer.services.stream_processor.settings"
) as mock_s:
mock_s.ENABLE_TOOL_PREFETCH = True
result = sp.pre_fetch_tools()
assert result is None
@pytest.mark.unit
def test_fetch_tool_data_returns_none_skipped(self):
"""Cover lines 606-611: _fetch_tool_data returns None, tools_data empty."""
sp = self._make_sp()
sp.data = {}
sp._get_required_tool_actions = MagicMock(return_value=None)
tool_doc = {"_id": "tool1", "name": "my_tool", "config": {}}
mock_user_tools_collection = MagicMock()
mock_user_tools_collection.find.return_value = [tool_doc]
sp.db = MagicMock()
sp.db.__getitem__ = MagicMock(return_value=mock_user_tools_collection)
sp._fetch_tool_data = MagicMock(return_value=None)
with patch(
"application.api.answer.services.stream_processor.settings"
) as mock_s:
mock_s.ENABLE_TOOL_PREFETCH = True
result = sp.pre_fetch_tools()
assert result is None
@pytest.mark.unit
def test_exception_returns_none(self):
"""Cover lines 612-614: exception during pre_fetch_tools."""
sp = self._make_sp()
sp.data = {}
sp._get_required_tool_actions = MagicMock(return_value=None)
sp.db = MagicMock()
sp.db.__getitem__ = MagicMock(side_effect=RuntimeError("DB error"))
with patch(
"application.api.answer.services.stream_processor.settings"
) as mock_s:
mock_s.ENABLE_TOOL_PREFETCH = True
result = sp.pre_fetch_tools()
assert result is None
@pytest.mark.unit
def test_tools_filtering_by_id(self):
"""Cover lines 597: required_actions matched by tool_id."""
sp = self._make_sp()
sp.data = {}
sp._get_required_tool_actions = MagicMock(
return_value={"tool1": {"action1"}}
)
tool_doc = {"_id": "tool1", "name": "my_tool", "config": {}}
mock_user_tools_collection = MagicMock()
mock_user_tools_collection.find.return_value = [tool_doc]
sp.db = MagicMock()
sp.db.__getitem__ = MagicMock(return_value=mock_user_tools_collection)
sp._fetch_tool_data = MagicMock(return_value={"action1": "result1"})
with patch(
"application.api.answer.services.stream_processor.settings"
) as mock_s:
mock_s.ENABLE_TOOL_PREFETCH = True
result = sp.pre_fetch_tools()
assert result is not None
assert "tool1" in result
# ---------------------------------------------------------------------------
# Additional coverage: _fetch_tool_data full paths (lines 619-704)
# ---------------------------------------------------------------------------
class TestFetchToolDataFullPaths:
def _make_sp(self):
mock_db = MagicMock()
with patch(
"application.api.answer.services.stream_processor.MongoDB"
) as MockMongo, patch(
"application.api.answer.services.stream_processor.settings"
) as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import (
StreamProcessor,
)
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
return sp
@pytest.mark.unit
def test_tool_fails_to_load(self):
"""Cover lines 633-635: tool_manager.load_tool returns None."""
sp = self._make_sp()
tool_doc = {"_id": "t1", "name": "my_tool", "config": {}}
with patch(
"application.agents.tools.tool_manager.ToolManager"
) as MockTM:
mock_manager = MagicMock()
mock_manager.load_tool.return_value = None
MockTM.return_value = mock_manager
result = sp._fetch_tool_data(tool_doc, None)
assert result is None
@pytest.mark.unit
def test_tool_no_actions_metadata(self):
"""Cover lines 637-640: tool has no actions metadata."""
sp = self._make_sp()
tool_doc = {"_id": "t1", "name": "my_tool", "config": {}}
with patch(
"application.agents.tools.tool_manager.ToolManager"
) as MockTM:
mock_tool = MagicMock()
mock_tool.get_actions_metadata.return_value = []
mock_manager = MagicMock()
mock_manager.load_tool.return_value = mock_tool
MockTM.return_value = mock_manager
result = sp._fetch_tool_data(tool_doc, None)
assert result is None
@pytest.mark.unit
def test_include_all_actions_when_required_none(self):
"""Cover lines 644-651, 693-695, 700-701: required_actions=None
means include all actions."""
sp = self._make_sp()
tool_doc = {
"_id": "t1",
"name": "my_tool",
"config": {},
"actions": [],
}
with patch(
"application.agents.tools.tool_manager.ToolManager"
) as MockTM:
mock_tool = MagicMock()
mock_tool.get_actions_metadata.return_value = [
{
"name": "action1",
"parameters": {"properties": {}},
}
]
mock_tool.execute_action.return_value = "result1"
mock_manager = MagicMock()
mock_manager.load_tool.return_value = mock_tool
MockTM.return_value = mock_manager
result = sp._fetch_tool_data(tool_doc, None)
assert result is not None
assert result["action1"] == "result1"
@pytest.mark.unit
def test_include_all_actions_when_none_in_required(self):
"""Cover lines 644-645: required_actions contains None,
so include_all_actions is True."""
sp = self._make_sp()
tool_doc = {
"_id": "t1",
"name": "my_tool",
"config": {},
"actions": [],
}
with patch(
"application.agents.tools.tool_manager.ToolManager"
) as MockTM:
mock_tool = MagicMock()
mock_tool.get_actions_metadata.return_value = [
{
"name": "action1",
"parameters": {"properties": {}},
}
]
mock_tool.execute_action.return_value = "result_all"
mock_manager = MagicMock()
mock_manager.load_tool.return_value = mock_tool
MockTM.return_value = mock_manager
result = sp._fetch_tool_data(tool_doc, {None, "action1"})
assert result is not None
assert result["action1"] == "result_all"
@pytest.mark.unit
def test_filter_actions_by_allowed_set(self):
"""Cover lines 658-663: action not in allowed_actions is skipped."""
sp = self._make_sp()
tool_doc = {
"_id": "t1",
"name": "my_tool",
"config": {},
"actions": [],
}
with patch(
"application.agents.tools.tool_manager.ToolManager"
) as MockTM:
mock_tool = MagicMock()
mock_tool.get_actions_metadata.return_value = [
{
"name": "action1",
"parameters": {"properties": {}},
},
{
"name": "action2",
"parameters": {"properties": {}},
},
]
mock_tool.execute_action.return_value = "filtered_result"
mock_manager = MagicMock()
mock_manager.load_tool.return_value = mock_tool
MockTM.return_value = mock_manager
# Only action1 is required; action2 should be skipped
result = sp._fetch_tool_data(tool_doc, {"action1"})
assert result is not None
assert "action1" in result
assert "action2" not in result
mock_tool.execute_action.assert_called_once_with("action1")
@pytest.mark.unit
def test_action_name_none_skipped(self):
"""Cover lines 655-657: action_meta with name=None is skipped."""
sp = self._make_sp()
tool_doc = {
"_id": "t1",
"name": "my_tool",
"config": {},
"actions": [],
}
with patch(
"application.agents.tools.tool_manager.ToolManager"
) as MockTM:
mock_tool = MagicMock()
mock_tool.get_actions_metadata.return_value = [
{"name": None, "parameters": {"properties": {}}},
{"name": "action1", "parameters": {"properties": {}}},
]
mock_tool.execute_action.return_value = "result1"
mock_manager = MagicMock()
mock_manager.load_tool.return_value = mock_tool
MockTM.return_value = mock_manager
result = sp._fetch_tool_data(tool_doc, None)
assert result is not None
assert "action1" in result
mock_tool.execute_action.assert_called_once_with("action1")
@pytest.mark.unit
def test_kwargs_from_saved_action(self):
"""Cover lines 666-685: kwargs populated from saved_action parameters."""
sp = self._make_sp()
tool_doc = {
"_id": "t1",
"name": "my_tool",
"config": {},
"actions": [
{
"name": "action1",
"parameters": {
"properties": {
"param1": {"value": "saved_value"},
}
},
}
],
}
with patch(
"application.agents.tools.tool_manager.ToolManager"
) as MockTM:
mock_tool = MagicMock()
mock_tool.get_actions_metadata.return_value = [
{
"name": "action1",
"parameters": {
"properties": {
"param1": {"type": "string"},
}
},
}
]
mock_tool.execute_action.return_value = "saved_result"
mock_manager = MagicMock()
mock_manager.load_tool.return_value = mock_tool
MockTM.return_value = mock_manager
result = sp._fetch_tool_data(tool_doc, None)
assert result is not None
mock_tool.execute_action.assert_called_once_with(
"action1", param1="saved_value"
)
@pytest.mark.unit
def test_kwargs_from_tool_config(self):
"""Cover lines 687-688: param found in tool_config."""
sp = self._make_sp()
tool_doc = {
"_id": "t1",
"name": "my_tool",
"config": {"param1": "config_value"},
"actions": [],
}
with patch(
"application.agents.tools.tool_manager.ToolManager"
) as MockTM:
mock_tool = MagicMock()
mock_tool.get_actions_metadata.return_value = [
{
"name": "action1",
"parameters": {
"properties": {
"param1": {"type": "string"},
}
},
}
]
mock_tool.execute_action.return_value = "config_result"
mock_manager = MagicMock()
mock_manager.load_tool.return_value = mock_tool
MockTM.return_value = mock_manager
result = sp._fetch_tool_data(tool_doc, None)
assert result is not None
mock_tool.execute_action.assert_called_once_with(
"action1", param1="config_value"
)
@pytest.mark.unit
def test_kwargs_from_default(self):
"""Cover lines 689-690: param has default in param_spec."""
sp = self._make_sp()
tool_doc = {
"_id": "t1",
"name": "my_tool",
"config": {},
"actions": [],
}
with patch(
"application.agents.tools.tool_manager.ToolManager"
) as MockTM:
mock_tool = MagicMock()
mock_tool.get_actions_metadata.return_value = [
{
"name": "action1",
"parameters": {
"properties": {
"param1": {
"type": "string",
"default": "default_value",
},
}
},
}
]
mock_tool.execute_action.return_value = "default_result"
mock_manager = MagicMock()
mock_manager.load_tool.return_value = mock_tool
MockTM.return_value = mock_manager
result = sp._fetch_tool_data(tool_doc, None)
assert result is not None
mock_tool.execute_action.assert_called_once_with(
"action1", param1="default_value"
)
@pytest.mark.unit
def test_action_execution_exception_continues(self):
"""Cover lines 694-698: action execution raises, continues to next."""
sp = self._make_sp()
tool_doc = {
"_id": "t1",
"name": "my_tool",
"config": {},
"actions": [],
}
with patch(
"application.agents.tools.tool_manager.ToolManager"
) as MockTM:
mock_tool = MagicMock()
mock_tool.get_actions_metadata.return_value = [
{"name": "bad_action", "parameters": {"properties": {}}},
{"name": "good_action", "parameters": {"properties": {}}},
]
mock_tool.execute_action.side_effect = [
RuntimeError("boom"),
"good_result",
]
mock_manager = MagicMock()
mock_manager.load_tool.return_value = mock_tool
MockTM.return_value = mock_manager
result = sp._fetch_tool_data(tool_doc, None)
assert result is not None
assert "good_action" in result
assert "bad_action" not in result
@pytest.mark.unit
def test_all_actions_fail_returns_none(self):
"""Cover line 700: action_results empty after all failures."""
sp = self._make_sp()
tool_doc = {
"_id": "t1",
"name": "my_tool",
"config": {},
"actions": [],
}
with patch(
"application.agents.tools.tool_manager.ToolManager"
) as MockTM:
mock_tool = MagicMock()
mock_tool.get_actions_metadata.return_value = [
{"name": "action1", "parameters": {"properties": {}}},
]
mock_tool.execute_action.side_effect = RuntimeError("fail")
mock_manager = MagicMock()
mock_manager.load_tool.return_value = mock_tool
MockTM.return_value = mock_manager
result = sp._fetch_tool_data(tool_doc, None)
assert result is None
@pytest.mark.unit
def test_outer_exception_returns_none(self):
"""Cover lines 702-704: outer exception in _fetch_tool_data."""
sp = self._make_sp()
tool_doc = {"_id": "t1", "name": "my_tool", "config": {}}
with patch(
"application.agents.tools.tool_manager.ToolManager"
) as MockTM:
MockTM.side_effect = RuntimeError("import error")
result = sp._fetch_tool_data(tool_doc, None)
assert result is None
@pytest.mark.unit
def test_saved_action_value_none_falls_through(self):
"""Cover lines 682-684: saved_action param_value is None,
falls through to tool_config/default."""
sp = self._make_sp()
tool_doc = {
"_id": "t1",
"name": "my_tool",
"config": {"param1": "config_fallback"},
"actions": [
{
"name": "action1",
"parameters": {
"properties": {
"param1": {"value": None},
}
},
}
],
}
with patch(
"application.agents.tools.tool_manager.ToolManager"
) as MockTM:
mock_tool = MagicMock()
mock_tool.get_actions_metadata.return_value = [
{
"name": "action1",
"parameters": {
"properties": {
"param1": {"type": "string"},
}
},
}
]
mock_tool.execute_action.return_value = "fallback_result"
mock_manager = MagicMock()
mock_manager.load_tool.return_value = mock_tool
MockTM.return_value = mock_manager
result = sp._fetch_tool_data(tool_doc, None)
assert result is not None
# Should fall through to tool_config value
mock_tool.execute_action.assert_called_once_with(
"action1", param1="config_fallback"
)
@pytest.mark.unit
def test_saved_action_param_not_in_saved_props(self):
"""Cover lines 677-680: saved_action exists but param not
in saved_props, falls to tool_config."""
sp = self._make_sp()
tool_doc = {
"_id": "t1",
"name": "my_tool",
"config": {"param1": "from_config"},
"actions": [
{
"name": "action1",
"parameters": {
"properties": {
"other_param": {"value": "other_val"},
}
},
}
],
}
with patch(
"application.agents.tools.tool_manager.ToolManager"
) as MockTM:
mock_tool = MagicMock()
mock_tool.get_actions_metadata.return_value = [
{
"name": "action1",
"parameters": {
"properties": {
"param1": {"type": "string"},
}
},
}
]
mock_tool.execute_action.return_value = "result"
mock_manager = MagicMock()
mock_manager.load_tool.return_value = mock_tool
MockTM.return_value = mock_manager
result = sp._fetch_tool_data(tool_doc, None)
assert result is not None
mock_tool.execute_action.assert_called_once_with(
"action1", param1="from_config"
)
@pytest.mark.unit
def test_no_saved_action_no_config_no_default(self):
"""Cover lines 676-690: param not in saved_action, not in
tool_config, no default => kwargs empty for that param."""
sp = self._make_sp()
tool_doc = {
"_id": "t1",
"name": "my_tool",
"config": {},
"actions": [],
}
with patch(
"application.agents.tools.tool_manager.ToolManager"
) as MockTM:
mock_tool = MagicMock()
mock_tool.get_actions_metadata.return_value = [
{
"name": "action1",
"parameters": {
"properties": {
"param1": {"type": "string"},
}
},
}
]
mock_tool.execute_action.return_value = "no_param_result"
mock_manager = MagicMock()
mock_manager.load_tool.return_value = mock_tool
MockTM.return_value = mock_manager
result = sp._fetch_tool_data(tool_doc, None)
assert result is not None
# param1 has no source, so kwargs should be empty
mock_tool.execute_action.assert_called_once_with("action1")
# ---------------------------------------------------------------------------
# Additional coverage: _get_prompt_content exception branch (lines 722-724),
# _get_required_tool_actions extraction + error (lines 740-750),
# _fetch_memory_tool_data (lines 754-755, 759-760, 764-765, 769-771, 775-776),
# create_agent (lines 779-806, 811-822)
# ---------------------------------------------------------------------------
@pytest.mark.unit
class TestGetPromptContentGenericException:
"""Cover lines 722-724: generic exception in _get_prompt_content."""
def _make_sp(self):
mock_db = MagicMock()
with patch(
"application.api.answer.services.stream_processor.MongoDB"
) as MockMongo, patch(
"application.api.answer.services.stream_processor.settings"
) as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import (
StreamProcessor,
)
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
return sp
def test_generic_exception_sets_none(self):
sp = self._make_sp()
sp.agent_config = {"prompt_id": "some_prompt"}
sp._prompt_content = None
with patch(
"application.api.answer.services.stream_processor.get_prompt",
side_effect=RuntimeError("DB down"),
):
result = sp._get_prompt_content()
assert result is None
assert sp._prompt_content is None
@pytest.mark.unit
class TestGetRequiredToolActionsExtract:
"""Cover lines 740-750: TemplateEngine extraction + exception."""
def _make_sp(self):
mock_db = MagicMock()
with patch(
"application.api.answer.services.stream_processor.MongoDB"
) as MockMongo, patch(
"application.api.answer.services.stream_processor.settings"
) as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import (
StreamProcessor,
)
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
return sp
def test_template_engine_extraction_success(self):
sp = self._make_sp()
sp._required_tool_actions = None
sp._get_prompt_content = MagicMock(
return_value="Hello {{tool.action}} world"
)
mock_engine = MagicMock()
mock_engine.extract_tool_usages.return_value = {"tool": {"action"}}
with patch(
"application.templates.template_engine.TemplateEngine",
return_value=mock_engine,
):
result = sp._get_required_tool_actions()
assert result == {"tool": {"action"}}
def test_template_engine_extraction_exception(self):
sp = self._make_sp()
sp._required_tool_actions = None
sp._get_prompt_content = MagicMock(
return_value="Hello {{tool.action}} world"
)
with patch(
"application.templates.template_engine.TemplateEngine",
side_effect=RuntimeError("import err"),
):
result = sp._get_required_tool_actions()
assert result == {}
@pytest.mark.unit
class TestFetchMemoryToolData:
"""Cover lines 754-755, 759-760, 764-765, 769-771."""
def _make_sp(self):
mock_db = MagicMock()
with patch(
"application.api.answer.services.stream_processor.MongoDB"
) as MockMongo, patch(
"application.api.answer.services.stream_processor.settings"
) as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import (
StreamProcessor,
)
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
return sp
def test_memory_tool_success(self):
"""Cover lines 759-760, 764, 769: success path returning data."""
sp = self._make_sp()
tool_doc = {"_id": "t1", "config": {"key": "val"}}
mock_memory_tool = MagicMock()
mock_memory_tool.execute_action.return_value = "root content here"
with patch(
"application.agents.tools.memory.MemoryTool",
return_value=mock_memory_tool,
):
result = sp._fetch_memory_tool_data(tool_doc)
assert result == {"root": "root content here", "available": True}
def test_memory_tool_error_in_view(self):
"""Cover lines 764-766: view returns error string."""
sp = self._make_sp()
tool_doc = {"_id": "t1", "config": {}}
mock_memory_tool = MagicMock()
mock_memory_tool.execute_action.return_value = "Error: no data"
with patch(
"application.agents.tools.memory.MemoryTool",
return_value=mock_memory_tool,
):
result = sp._fetch_memory_tool_data(tool_doc)
assert result is None
def test_memory_tool_empty_view(self):
"""Cover line 766: empty root_view."""
sp = self._make_sp()
tool_doc = {"_id": "t1", "config": {}}
mock_memory_tool = MagicMock()
mock_memory_tool.execute_action.return_value = " "
with patch(
"application.agents.tools.memory.MemoryTool",
return_value=mock_memory_tool,
):
result = sp._fetch_memory_tool_data(tool_doc)
assert result is None
def test_memory_tool_exception(self):
"""Cover lines 770-771: exception returns None."""
sp = self._make_sp()
tool_doc = {"_id": "t1", "config": {}}
with patch(
"application.agents.tools.memory.MemoryTool",
side_effect=RuntimeError("fail"),
):
result = sp._fetch_memory_tool_data(tool_doc)
assert result is None
@pytest.mark.unit
class TestCreateAgentPaths:
"""Cover lines 779-806, 811-816, 820-822: create_agent various prompt paths."""
def _make_sp(self):
mock_db = MagicMock()
with patch(
"application.api.answer.services.stream_processor.MongoDB"
) as MockMongo, patch(
"application.api.answer.services.stream_processor.settings"
) as mock_settings:
mock_settings.MONGO_DB_NAME = "docsgpt"
mock_settings.LLM_PROVIDER = "openai"
MockMongo.get_client.return_value = {"docsgpt": mock_db}
from application.api.answer.services.stream_processor import (
StreamProcessor,
)
sp = StreamProcessor(request_data={}, decoded_token={"sub": "u"})
return sp
def test_create_agent_agentic_preset(self):
"""Cover lines 786-796: raw_prompt is None, agentic preset path."""
sp = self._make_sp()
sp._prompt_content = None
sp._get_prompt_content = MagicMock(return_value=None)
sp.agent_config = {
"agent_type": "agentic",
"prompt_id": "default",
"user_api_key": None,
"models": ["m1", "m2"],
}
sp.model_id = "m1"
sp.prompt_renderer = MagicMock()
sp.prompt_renderer.render_prompt.return_value = "rendered"
sp.data = {}
sp.history = []
sp.retrieved_docs = []
sp.attachments = []
sp.source = {}
sp.retriever_config = {}
sp.conversation_id = None
mock_llm = MagicMock()
mock_handler = MagicMock()
mock_agent = MagicMock()
with patch(
"application.api.answer.services.stream_processor.get_prompt",
return_value="agentic prompt",
) as mock_gp, patch(
"application.api.answer.services.stream_processor.get_provider_from_model_id",
return_value="openai",
), patch(
"application.api.answer.services.stream_processor.get_api_key_for_provider",
return_value="key",
), patch(
"application.api.answer.services.stream_processor.settings"
) as mock_s, patch(
"application.llm.llm_creator.LLMCreator.create_llm",
return_value=mock_llm,
), patch(
"application.llm.handlers.handler_creator.LLMHandlerCreator.create_handler",
return_value=mock_handler,
), patch(
"application.agents.agent_creator.AgentCreator.create_agent",
return_value=mock_agent,
):
mock_s.LLM_PROVIDER = "openai"
sp.create_agent(docs_together="docs", docs=[], tools_data={})
# Verify agentic_default prompt was requested
mock_gp.assert_any_call("agentic_default", sp.prompts_collection)
def test_create_agent_non_agentic_no_prompt(self):
"""Cover lines 794-796: non-agentic agent, raw_prompt None, uses normal preset."""
sp = self._make_sp()
sp._prompt_content = None
sp._get_prompt_content = MagicMock(return_value=None)
sp.agent_config = {
"agent_type": "classic",
"prompt_id": "default",
"user_api_key": None,
"models": [],
}
sp.model_id = None
sp.prompt_renderer = MagicMock()
sp.prompt_renderer.render_prompt.return_value = "rendered"
sp.data = {}
sp.history = []
sp.retrieved_docs = []
sp.attachments = []
sp.source = {}
sp.retriever_config = {}
sp.conversation_id = None
sp.decoded_token = {"sub": "u"}
mock_llm = MagicMock()
mock_handler = MagicMock()
mock_agent = MagicMock()
with patch(
"application.api.answer.services.stream_processor.get_prompt",
return_value="normal prompt",
) as mock_gp, patch(
"application.api.answer.services.stream_processor.get_provider_from_model_id",
return_value=None,
), patch(
"application.api.answer.services.stream_processor.get_api_key_for_provider",
return_value="key",
), patch(
"application.api.answer.services.stream_processor.settings"
) as mock_s, patch(
"application.llm.llm_creator.LLMCreator.create_llm",
return_value=mock_llm,
), patch(
"application.llm.handlers.handler_creator.LLMHandlerCreator.create_handler",
return_value=mock_handler,
), patch(
"application.agents.agent_creator.AgentCreator.create_agent",
return_value=mock_agent,
):
mock_s.LLM_PROVIDER = "openai"
sp.create_agent()
mock_gp.assert_any_call("default", sp.prompts_collection)
def test_create_agent_backup_models_computed(self):
"""Cover lines 820-822: backup_models excludes current model."""
sp = self._make_sp()
sp._prompt_content = None
sp._get_prompt_content = MagicMock(return_value="existing prompt")
sp.agent_config = {
"agent_type": "classic",
"prompt_id": "default",
"user_api_key": None,
"models": ["m1", "m2", "m3"],
}
sp.model_id = "m2"
sp.prompt_renderer = MagicMock()
sp.prompt_renderer.render_prompt.return_value = "rendered"
sp.data = {}
sp.history = []
sp.retrieved_docs = []
sp.attachments = []
sp.source = {}
sp.retriever_config = {}
sp.conversation_id = None
sp.decoded_token = {"sub": "u"}
captured_kwargs = {}
def capture_create(*args, **kwargs):
captured_kwargs.update(kwargs)
return MagicMock()
with patch(
"application.api.answer.services.stream_processor.get_provider_from_model_id",
return_value="openai",
), patch(
"application.api.answer.services.stream_processor.get_api_key_for_provider",
return_value="key",
), patch(
"application.api.answer.services.stream_processor.settings"
) as mock_s, patch(
"application.llm.llm_creator.LLMCreator.create_llm",
side_effect=capture_create,
), patch(
"application.llm.handlers.handler_creator.LLMHandlerCreator.create_handler",
return_value=MagicMock(),
), patch(
"application.agents.agent_creator.AgentCreator.create_agent",
return_value=MagicMock(),
):
mock_s.LLM_PROVIDER = "openai"
sp.create_agent()
assert captured_kwargs["backup_models"] == ["m1", "m3"]