mirror of
https://github.com/arc53/DocsGPT.git
synced 2025-11-29 08:33:20 +00:00
315 lines
9.9 KiB
Python
315 lines
9.9 KiB
Python
"""
|
|
Tests for token management and compression features.
|
|
|
|
NOTE: These tests are for future planned features that are not yet implemented.
|
|
They are skipped until the following modules are created:
|
|
- application.compression (DocumentCompressor, HistoryCompressor, etc.)
|
|
- application.core.token_budget (TokenBudgetManager)
|
|
"""
|
|
# ruff: noqa: F821
|
|
import pytest
|
|
|
|
pytest.skip(
|
|
"Token management features not yet implemented - planned for future release",
|
|
allow_module_level=True,
|
|
)
|
|
|
|
|
|
class TestTokenBudgetManager:
|
|
"""Test TokenBudgetManager functionality"""
|
|
|
|
def test_calculate_budget(self):
|
|
"""Test budget calculation"""
|
|
manager = TokenBudgetManager(model_id="gpt-4o")
|
|
budget = manager.calculate_budget()
|
|
|
|
assert budget.total_budget > 0
|
|
assert budget.system_prompt > 0
|
|
assert budget.chat_history > 0
|
|
assert budget.retrieved_docs > 0
|
|
|
|
def test_measure_usage(self):
|
|
"""Test token usage measurement"""
|
|
manager = TokenBudgetManager(model_id="gpt-4o")
|
|
|
|
usage = manager.measure_usage(
|
|
system_prompt="You are a helpful assistant.",
|
|
current_query="What is Python?",
|
|
chat_history=[
|
|
{"prompt": "Hello", "response": "Hi there!"},
|
|
{"prompt": "How are you?", "response": "I'm doing well, thanks!"},
|
|
],
|
|
)
|
|
|
|
assert usage.total > 0
|
|
assert usage.system_prompt > 0
|
|
assert usage.current_query > 0
|
|
assert usage.chat_history > 0
|
|
|
|
def test_compression_recommendation(self):
|
|
"""Test compression recommendation generation"""
|
|
manager = TokenBudgetManager(model_id="gpt-4o")
|
|
|
|
# Create scenario with excessive history
|
|
large_history = [
|
|
{"prompt": f"Question {i}" * 100, "response": f"Answer {i}" * 100}
|
|
for i in range(100)
|
|
]
|
|
|
|
budget, usage, recommendation = manager.check_and_recommend(
|
|
system_prompt="You are a helpful assistant.",
|
|
current_query="What is Python?",
|
|
chat_history=large_history,
|
|
)
|
|
|
|
# Should recommend compression
|
|
assert recommendation.needs_compression()
|
|
assert recommendation.compress_history
|
|
|
|
|
|
class TestHistoryCompressor:
|
|
"""Test HistoryCompressor functionality"""
|
|
|
|
def test_sliding_window_compression(self):
|
|
"""Test sliding window compression strategy"""
|
|
compressor = HistoryCompressor()
|
|
|
|
history = [
|
|
{"prompt": f"Question {i}", "response": f"Answer {i}"} for i in range(20)
|
|
]
|
|
|
|
compressed, metadata = compressor.compress(
|
|
history, target_tokens=500, strategy="sliding_window"
|
|
)
|
|
|
|
assert len(compressed) < len(history)
|
|
assert metadata["original_messages"] == 20
|
|
assert metadata["compressed_messages"] < 20
|
|
assert metadata["strategy"] == "sliding_window"
|
|
|
|
def test_preserve_tool_calls(self):
|
|
"""Test that tool calls are preserved during compression"""
|
|
compressor = HistoryCompressor()
|
|
|
|
history = [
|
|
{"prompt": "Question 1", "response": "Answer 1"},
|
|
{
|
|
"prompt": "Use a tool",
|
|
"response": "Tool used",
|
|
"tool_calls": [{"tool_name": "search", "result": "Found something"}],
|
|
},
|
|
{"prompt": "Question 3", "response": "Answer 3"},
|
|
]
|
|
|
|
compressed, metadata = compressor.compress(
|
|
history, target_tokens=200, strategy="sliding_window", preserve_tool_calls=True
|
|
)
|
|
|
|
# Tool call message should be preserved
|
|
has_tool_calls = any("tool_calls" in msg for msg in compressed)
|
|
assert has_tool_calls
|
|
|
|
|
|
class TestDocumentCompressor:
|
|
"""Test DocumentCompressor functionality"""
|
|
|
|
def test_rerank_compression(self):
|
|
"""Test re-ranking compression strategy"""
|
|
compressor = DocumentCompressor()
|
|
|
|
docs = [
|
|
{"text": f"Document {i} with some content here" * 20, "title": f"Doc {i}"}
|
|
for i in range(10)
|
|
]
|
|
|
|
compressed, metadata = compressor.compress(
|
|
docs, target_tokens=500, query="Document 5", strategy="rerank"
|
|
)
|
|
|
|
assert len(compressed) < len(docs)
|
|
assert metadata["original_docs"] == 10
|
|
assert metadata["strategy"] == "rerank"
|
|
|
|
def test_excerpt_extraction(self):
|
|
"""Test excerpt extraction strategy"""
|
|
compressor = DocumentCompressor()
|
|
|
|
docs = [
|
|
{
|
|
"text": "This is a long document. " * 100
|
|
+ "Python is great. "
|
|
+ "More text here. " * 100,
|
|
"title": "Python Guide",
|
|
}
|
|
]
|
|
|
|
compressed, metadata = compressor.compress(
|
|
docs, target_tokens=300, query="Python", strategy="excerpt"
|
|
)
|
|
|
|
assert metadata["excerpts_created"] > 0
|
|
# Excerpt should contain the query term
|
|
assert "python" in compressed[0]["text"].lower()
|
|
|
|
|
|
class TestToolResultCompressor:
|
|
"""Test ToolResultCompressor functionality"""
|
|
|
|
def test_truncate_large_results(self):
|
|
"""Test truncation of large tool results"""
|
|
compressor = ToolResultCompressor()
|
|
|
|
tool_results = [
|
|
{
|
|
"tool_name": "search",
|
|
"result": "Very long result " * 1000,
|
|
"arguments": {},
|
|
}
|
|
]
|
|
|
|
compressed, metadata = compressor.compress(
|
|
tool_results, target_tokens=100, strategy="truncate"
|
|
)
|
|
|
|
assert metadata["results_truncated"] > 0
|
|
# Result should be shorter
|
|
compressed_result_len = len(str(compressed[0]["result"]))
|
|
original_result_len = len(tool_results[0]["result"])
|
|
assert compressed_result_len < original_result_len
|
|
|
|
def test_extract_json_fields(self):
|
|
"""Test extraction of key fields from JSON results"""
|
|
compressor = ToolResultCompressor()
|
|
|
|
tool_results = [
|
|
{
|
|
"tool_name": "api_call",
|
|
"result": {
|
|
"data": {"important": "value"},
|
|
"metadata": {"verbose": "information" * 100},
|
|
"debug": {"lots": "of data" * 100},
|
|
},
|
|
"arguments": {},
|
|
}
|
|
]
|
|
|
|
compressed, metadata = compressor.compress(
|
|
tool_results, target_tokens=100, strategy="extract"
|
|
)
|
|
|
|
# Should keep important fields, discard verbose ones
|
|
assert "data" in compressed[0]["result"]
|
|
|
|
|
|
class TestPromptOptimizer:
|
|
"""Test PromptOptimizer functionality"""
|
|
|
|
def test_compress_tool_descriptions(self):
|
|
"""Test compression of tool descriptions"""
|
|
optimizer = PromptOptimizer()
|
|
|
|
tools = [
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": f"tool_{i}",
|
|
"description": "This is a very long description " * 50,
|
|
"parameters": {},
|
|
},
|
|
}
|
|
for i in range(10)
|
|
]
|
|
|
|
optimized, metadata = optimizer.optimize_tools(
|
|
tools, target_tokens=500, strategy="compress"
|
|
)
|
|
|
|
assert metadata["optimized_tokens"] < metadata["original_tokens"]
|
|
assert metadata["descriptions_compressed"] > 0
|
|
|
|
def test_lazy_load_tools(self):
|
|
"""Test lazy loading of tools based on query"""
|
|
optimizer = PromptOptimizer()
|
|
|
|
tools = [
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": "search_tool",
|
|
"description": "Search for information",
|
|
"parameters": {},
|
|
},
|
|
},
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": "calculate_tool",
|
|
"description": "Perform calculations",
|
|
"parameters": {},
|
|
},
|
|
},
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": "other_tool",
|
|
"description": "Do something else",
|
|
"parameters": {},
|
|
},
|
|
},
|
|
]
|
|
|
|
optimized, metadata = optimizer.optimize_tools(
|
|
tools, target_tokens=200, query="I want to search for something", strategy="lazy_load"
|
|
)
|
|
|
|
# Should prefer search tool
|
|
assert len(optimized) < len(tools)
|
|
tool_names = [t["function"]["name"] for t in optimized]
|
|
# Search tool should be included due to query relevance
|
|
assert any("search" in name for name in tool_names)
|
|
|
|
|
|
def test_integration_compression_workflow():
|
|
"""Test complete compression workflow"""
|
|
# Simulate a scenario with large inputs
|
|
manager = TokenBudgetManager(model_id="gpt-4o")
|
|
history_compressor = HistoryCompressor()
|
|
doc_compressor = DocumentCompressor()
|
|
|
|
# Large chat history
|
|
history = [
|
|
{"prompt": f"Question {i}" * 50, "response": f"Answer {i}" * 50}
|
|
for i in range(50)
|
|
]
|
|
|
|
# Large documents
|
|
docs = [
|
|
{"text": f"Document {i} content" * 100, "title": f"Doc {i}"} for i in range(20)
|
|
]
|
|
|
|
# Check budget
|
|
budget, usage, recommendation = manager.check_and_recommend(
|
|
system_prompt="You are a helpful assistant.",
|
|
current_query="What is Python?",
|
|
chat_history=history,
|
|
retrieved_docs=docs,
|
|
)
|
|
|
|
# Should need compression
|
|
assert recommendation.needs_compression()
|
|
|
|
# Apply compression
|
|
if recommendation.compress_history:
|
|
compressed_history, hist_meta = history_compressor.compress(
|
|
history, recommendation.target_history_tokens or budget.chat_history
|
|
)
|
|
assert len(compressed_history) < len(history)
|
|
|
|
if recommendation.compress_docs:
|
|
compressed_docs, doc_meta = doc_compressor.compress(
|
|
docs,
|
|
recommendation.target_docs_tokens or budget.retrieved_docs,
|
|
query="Python",
|
|
)
|
|
assert len(compressed_docs) < len(docs)
|