Files
DocsGPT/tests/test_integration.py
2025-11-24 10:39:27 +00:00

1288 lines
51 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Integration test script for DocsGPT API endpoints.
Tests:
1. /stream endpoint without agent
2. /api/answer endpoint without agent
3. Create agent via API
4. /stream endpoint with agent
5. /api/answer endpoint with agent
Usage:
python tests/test_integration.py # auto-generates JWT token from local secret when available
python tests/test_integration.py --base-url http://localhost:7091
python tests/test_integration.py --token YOUR_JWT_TOKEN # override auto-generation
"""
import argparse
import json
import os
import sys
import time
from pathlib import Path
from typing import Optional
import requests
class Colors:
    """Terminal escape sequences used to colorize the script's console output."""

    # Foreground colors
    HEADER = "\033[95m"
    OKBLUE = "\033[94m"
    OKCYAN = "\033[96m"
    OKGREEN = "\033[92m"
    WARNING = "\033[93m"
    FAIL = "\033[91m"
    # Text attributes; ENDC is appended after every colored message below
    ENDC = "\033[0m"
    BOLD = "\033[1m"
def generate_default_token() -> tuple[Optional[str], Optional[str]]:
    """
    Build a JWT for the integration-test user from the locally available secret.

    The secret is taken from the JWT_SECRET_KEY environment variable; when that
    is unset or empty, it is read from the ``.jwt_secret_key`` file in the
    current working directory.

    Returns:
        A ``(token, error_message)`` pair. Exactly one element is None: the
        token on failure, the error message on success.
    """
    secret_path = Path(".jwt_secret_key")
    signing_secret = os.getenv("JWT_SECRET_KEY")

    if not signing_secret:
        # Fall back to the key file when the environment does not provide a secret.
        try:
            signing_secret = secret_path.read_text().strip()
        except FileNotFoundError:
            return None, f"Set JWT_SECRET_KEY or create {secret_path} by running the backend once."
        except OSError as exc:
            return None, f"Could not read {secret_path}: {exc}"

    if not signing_secret:
        return None, "JWT secret key is empty."

    # Imported lazily so the script still runs (with --token) without python-jose.
    try:
        from jose import jwt  # type: ignore
    except ImportError:
        return None, "python-jose is not installed (pip install 'python-jose' to auto-generate tokens)."

    claims = {"sub": "test_integration_user"}
    try:
        encoded = jwt.encode(claims, signing_secret, algorithm="HS256")
    except Exception as exc:
        return None, f"Failed to generate JWT token: {exc}"
    return encoded, None
class DocsGPTTester:
def __init__(self, base_url: str, token: Optional[str] = None, token_source: str = "provided"):
self.base_url = base_url.rstrip('/')
self.token = token
self.token_source = token_source
self.headers = {}
if token:
self.headers['Authorization'] = f'Bearer {token}'
self.agent_id = None
self.test_results = []
def print_header(self, message: str):
    """Print *message* in bold header color between two 70-character '=' rules."""
    style = f"{Colors.HEADER}{Colors.BOLD}"
    rule = '=' * 70
    print(f"\n{style}{rule}{Colors.ENDC}")
    print(f"{style}{message}{Colors.ENDC}")
    print(f"{style}{rule}{Colors.ENDC}\n")
def print_success(self, message: str):
    """Print *message* in the success color."""
    colored = f"{Colors.OKGREEN}{message}"
    print(f"{colored}{Colors.ENDC}")
def print_error(self, message: str):
    """Print *message* in the failure color."""
    print("{}{}{}".format(Colors.FAIL, message, Colors.ENDC))
def print_info(self, message: str):
    """Print *message* in the info color, preceded by a single space."""
    lead = f"{Colors.OKCYAN} "
    print(f"{lead}{message}{Colors.ENDC}")
def print_warning(self, message: str):
    """Print *message* in the warning color."""
    print("{}{}{}".format(Colors.WARNING, message, Colors.ENDC))
def test_stream_endpoint(self, agent_id: Optional[str] = None) -> bool:
    """Exercise the /stream endpoint and record the outcome.

    Posts a fixed question and reads the ``data: ...`` lines of the response
    until an event of type ``end`` arrives or the stream is exhausted. Text
    from ``stream``/``answer`` events is accumulated for a preview; an ``id``
    event supplies the conversation id.

    Args:
        agent_id: Optional agent to include in the request payload.

    Returns:
        True when the server answered 200 and the stream was read without an
        exception (an empty answer only produces a warning); False otherwise.
        The result is also appended to ``self.test_results``.
    """
    endpoint = f"{self.base_url}/stream"
    # Leading space before "with agent" so the label does not read "endpointwith agent".
    test_name = f"Stream endpoint{' with agent ' + agent_id if agent_id else ' (no agent)'}"
    self.print_header(f"Testing {test_name}")
    payload = {
        "question": "What is DocsGPT?",
        "history": "[]",
        "isNoneDoc": True,
    }
    if agent_id:
        payload["agent_id"] = agent_id
    try:
        self.print_info(f"POST {endpoint}")
        self.print_info(f"Payload: {json.dumps(payload, indent=2)}")
        # The context manager closes the streamed response even though reading
        # stops early at the 'end' event, so the connection is not left open.
        with requests.post(
            endpoint,
            json=payload,
            headers=self.headers,
            stream=True,
            timeout=30,
        ) as response:
            self.print_info(f"Status Code: {response.status_code}")
            if response.status_code != 200:
                self.print_error(f"Expected 200, got {response.status_code}")
                self.print_error(f"Response: {response.text[:500]}")
                self.test_results.append((test_name, False, f"Status {response.status_code}"))
                return False

            # Parse SSE stream
            events = []
            full_response = ""
            conversation_id = None
            for raw_line in response.iter_lines():
                if not raw_line:
                    continue
                line = raw_line.decode('utf-8')
                if not line.startswith('data: '):
                    continue
                try:
                    data = json.loads(line[6:])  # Remove 'data: ' prefix
                except json.JSONDecodeError:
                    # Lines that are not valid JSON are skipped on purpose.
                    continue
                events.append(data)
                event_type = data.get('type')
                if event_type in ('stream', 'answer'):
                    # Both event types carry text; tolerate a missing or null field.
                    full_response += data.get('message') or data.get('answer') or ''
                elif event_type == 'id':
                    conversation_id = data.get('id')
                elif event_type == 'end':
                    break

        self.print_success(f"Received {len(events)} events")
        self.print_info(f"Response preview: {full_response[:100]}...")
        if conversation_id:
            self.print_success(f"Conversation ID: {conversation_id}")
        if not full_response:
            self.print_warning("No response content received")
        self.test_results.append((test_name, True, "Success"))
        self.print_success(f"{test_name} passed!")
        return True
    except requests.exceptions.RequestException as e:
        self.print_error(f"Request failed: {str(e)}")
        self.test_results.append((test_name, False, str(e)))
        return False
    except Exception as e:
        self.print_error(f"Unexpected error: {str(e)}")
        self.test_results.append((test_name, False, str(e)))
        return False
def test_answer_endpoint(self, agent_id: Optional[str] = None) -> bool:
    """Exercise the /api/answer endpoint and record the outcome.

    Args:
        agent_id: Optional agent to include in the request payload.

    Returns:
        True when the server answered 200 with a JSON body that could be
        inspected; False on any other status or on an exception. The result
        is also appended to ``self.test_results``.
    """
    endpoint = f"{self.base_url}/api/answer"
    test_name = f"Answer endpoint{' with agent ' + agent_id if agent_id else ' (no agent)'}"
    self.print_header(f"Testing {test_name}")

    payload = {"question": "What is DocsGPT?", "history": "[]", "isNoneDoc": True}
    if agent_id:
        payload["agent_id"] = agent_id

    try:
        self.print_info(f"POST {endpoint}")
        self.print_info(f"Payload: {json.dumps(payload, indent=2)}")
        response = requests.post(endpoint, json=payload, headers=self.headers, timeout=30)
        status = response.status_code
        self.print_info(f"Status Code: {status}")

        if status != 200:
            self.print_error(f"Expected 200, got {status}")
            self.print_error(f"Response: {response.text[:500]}")
            self.test_results.append((test_name, False, f"Status {status}"))
            return False

        body = response.json()
        self.print_info(f"Response keys: {list(body.keys())}")

        # A missing answer is only reported; it does not fail the test.
        if 'answer' not in body:
            self.print_warning("No 'answer' field in response")
        else:
            text = body['answer']
            self.print_success(f"Answer received: {text[:100]}...")
        if 'conversation_id' in body:
            self.print_success(f"Conversation ID: {body['conversation_id']}")
        if 'sources' in body:
            self.print_info(f"Sources: {len(body['sources'])} items")

        self.test_results.append((test_name, True, "Success"))
        self.print_success(f"{test_name} passed!")
        return True
    except Exception as e:
        # Transport-level failures and everything else differ only in the label.
        if isinstance(e, requests.exceptions.RequestException):
            self.print_error(f"Request failed: {str(e)}")
        else:
            self.print_error(f"Unexpected error: {str(e)}")
        self.test_results.append((test_name, False, str(e)))
        return False
def upload_text_source(self) -> Optional[str]:
    """Upload a small in-memory text document via /api/upload.

    This creates a source without requiring crawler infrastructure. When the
    server returns a task id, the method sleeps 10 seconds before returning
    to give the upload time to be processed.

    Returns:
        The task id from the response, or None when the upload was skipped
        (no token), rejected, or failed.
    """
    endpoint = f"{self.base_url}/api/upload"
    test_name = "Upload Text Source"
    self.print_header(f"Testing {test_name}")

    if not self.token:
        self.print_warning("No authentication token provided")
        self.print_info("Source upload requires authentication")
        # A skip is recorded as a pass so it does not fail the run.
        self.test_results.append((test_name, True, "Skipped (auth required)"))
        return None

    # Document body sent as the uploaded file
    test_content = """# DocsGPT Test Documentation
## Installation
To install DocsGPT, follow these steps:
1. Clone the repository
2. Run `docker compose up`
3. Access the application at http://localhost:5173
## Configuration
DocsGPT can be configured using environment variables:
- API_KEY: Your OpenAI API key
- LLM_PROVIDER: Choose between openai, anthropic, or google
- ENABLE_CONVERSATION_COMPRESSION: Enable context compression
## Features
DocsGPT provides:
- Conversation compression for long chats
- Real-time token tracking
- Multiple LLM provider support
- Agent system with tools
"""
    try:
        self.print_info(f"POST {endpoint}")
        self.print_info("Uploading test documentation...")
        # (filename, content bytes, content type) tuple for the multipart field
        upload = ('test_docs.txt', test_content.encode(), 'text/plain')
        form_fields = {
            'user': 'test_user',
            'name': f'Test Docs {int(time.time())}',
        }
        response = requests.post(
            endpoint,
            files={'file': upload},
            data=form_fields,
            headers=self.headers,
            timeout=30,
        )
        status = response.status_code
        self.print_info(f"Status Code: {status}")

        if status != 200:
            self.print_error(f"Expected 200, got {status}")
            try:
                error_data = response.json()
                self.print_error(f"Error: {error_data}")
            except Exception:
                # Body was not JSON; show the raw text instead.
                self.print_error(f"Response: {response.text[:500]}")
            self.test_results.append((test_name, False, f"Status {status}"))
            return None

        task_id = response.json().get('task_id')
        if not task_id:
            self.print_warning("No task_id returned")
            self.test_results.append((test_name, False, "No task_id"))
            return None

        self.print_success(f"Upload task started: {task_id}")
        self.print_info("Waiting for processing (10 seconds)...")
        time.sleep(10)
        self.test_results.append((test_name, True, f"Task: {task_id}"))
        return task_id
    except Exception as e:
        if isinstance(e, requests.exceptions.RequestException):
            self.print_error(f"Request failed: {str(e)}")
        else:
            self.print_error(f"Unexpected error: {str(e)}")
        self.test_results.append((test_name, False, str(e)))
        return None
def upload_crawler_source(self) -> Optional[str]:
    """Start a crawler source for the DocsGPT documentation via /api/remote.

    When the server returns a task id, the method sleeps 30 seconds before
    returning to give the crawler time to run.

    Returns:
        The task id from the response, or None when the request was skipped
        (no token), rejected, or failed.
    """
    endpoint = f"{self.base_url}/api/remote"
    test_name = "Upload Crawler Source"
    self.print_header(f"Testing {test_name}")

    if not self.token:
        self.print_warning("No authentication token provided")
        self.print_info("Source upload requires authentication")
        self.print_info("Skipping source upload and agent tests...")
        # A skip is recorded as a pass so it does not fail the run.
        self.test_results.append((test_name, True, "Skipped (auth required)"))
        return None

    crawl_target = json.dumps({"url": "https://docs.docsgpt.cloud/"})
    payload = {
        "user": "test_user",
        "source": "crawler",
        "name": f"DocsGPT Docs {int(time.time())}",
        "data": crawl_target,
    }
    try:
        self.print_info(f"POST {endpoint}")
        self.print_info("Crawling: https://docs.docsgpt.cloud/")
        # Sent as form data (data=), not as a JSON body.
        response = requests.post(endpoint, data=payload, headers=self.headers, timeout=30)
        status = response.status_code
        self.print_info(f"Status Code: {status}")

        if status != 200:
            self.print_error(f"Expected 200, got {status}")
            try:
                error_data = response.json()
                self.print_error(f"Error: {error_data}")
            except Exception:
                # Body was not JSON; show the raw text instead.
                self.print_error(f"Response: {response.text[:500]}")
            self.test_results.append((test_name, False, f"Status {status}"))
            return None

        task_id = response.json().get('task_id')
        if not task_id:
            self.print_warning("No task_id returned")
            self.test_results.append((test_name, False, "No task_id"))
            return None

        self.print_success(f"Crawler task started: {task_id}")
        self.print_info("Waiting for crawler to complete (30 seconds)...")
        time.sleep(30)  # Wait for crawler to process
        self.test_results.append((test_name, True, f"Task: {task_id}"))
        return task_id
    except Exception as e:
        if isinstance(e, requests.exceptions.RequestException):
            self.print_error(f"Request failed: {str(e)}")
        else:
            self.print_error(f"Unexpected error: {str(e)}")
        self.test_results.append((test_name, False, str(e)))
        return None
def get_source_id_from_task(self, task_id: str) -> Optional[str]:
    """Resolve a finished ingestion task to a source id.

    Queries /api/task_status once (no polling). If the task reports
    ``SUCCESS``, /api/sources is fetched and the id of the first entry whose
    ``date`` is not ``'default'`` is returned.

    Args:
        task_id: Task identifier returned by an upload call.

    Returns:
        The source id, or None when the task is not successful, a request
        does not return 200, no matching source exists, or an error occurs.
    """
    endpoint = f"{self.base_url}/api/task_status"
    try:
        status_response = requests.get(
            endpoint,
            params={"task_id": task_id},
            headers=self.headers,
            timeout=10,
        )
        if status_response.status_code != 200:
            return None
        if status_response.json().get('status') != 'SUCCESS':
            return None

        # The task result is not used to locate the source; the source list is.
        listing = requests.get(
            f"{self.base_url}/api/sources",
            headers=self.headers,
            timeout=10,
        )
        if listing.status_code != 200:
            return None

        # Drop the built-in "Default" source so only user sources remain.
        candidates = [entry for entry in listing.json() if entry.get('date') != 'default']
        if not candidates:
            return None
        # NOTE(review): assumes the API lists sources newest first, so the
        # first entry is the one just created — confirm against the backend.
        return candidates[0].get('id')
    except Exception as e:
        self.print_error(f"Error getting source ID: {str(e)}")
        return None
def create_agent(self, source_id: Optional[str] = None, published: bool = False) -> Optional[tuple]:
    """Create a draft or published agent via /api/create_agent.

    Args:
        source_id: Optional source ID to attach to the agent.
        published: If True, create a published agent (requires source_id);
            otherwise a draft agent is created.

    Returns:
        Tuple of (agent_id, api_key) on success, where api_key is None when
        the response carries no key. None when the call was skipped, was
        rejected, or failed.
    """
    endpoint = f"{self.base_url}/api/create_agent"

    if not published:
        test_name = "Create Draft Agent"
    elif source_id:
        test_name = f"Create Published Agent with source {source_id[:8]}..."
    else:
        test_name = "Create Published Agent (skipped - no source)"
    self.print_header(f"Testing {test_name}")

    if not self.token:
        self.print_warning("No authentication token provided")
        self.print_info("Agent creation requires authentication")
        self.print_info("To test agents, provide a JWT token with --token argument")
        self.print_info("Skipping agent tests...")
        # Recorded as a (passing) skip instead of attempting the call without auth
        self.test_results.append((test_name, True, "Skipped (auth required)"))
        return None

    # Published agents require a source
    if published and not source_id:
        self.print_warning("Cannot create published agent without source")
        self.test_results.append((test_name, True, "Skipped (no source)"))
        return None

    # Build the payload; key order is kept stable because it is printed below.
    if published:
        self.print_info(f"Creating published agent with source {source_id[:8]}...")
        agent_name = f"Test Agent (Published) {int(time.time())}"
        description = "Integration test agent with source"
        status_value = "published"
    else:
        self.print_info("Creating draft agent (for agent_id testing)")
        agent_name = f"Test Agent (Draft) {int(time.time())}"
        description = "Integration test draft agent"
        status_value = "draft"
    payload = {
        "name": agent_name,
        "description": description,
        "prompt_id": "default",
        "chunks": 2,
        "retriever": "classic",
        "agent_type": "classic",
        "status": status_value,
    }
    if published:
        payload["source"] = source_id

    try:
        self.print_info(f"POST {endpoint}")
        self.print_info(f"Payload: {json.dumps(payload, indent=2)}")
        response = requests.post(endpoint, json=payload, headers=self.headers, timeout=10)
        status = response.status_code
        self.print_info(f"Status Code: {status}")

        if status == 401:
            self.print_warning("Authentication required for agent creation")
            self.print_info("To test agents, provide a JWT token with --token argument")
            self.print_info("Skipping agent tests...")
            # Mark as "skipped" rather than "failed"
            self.test_results.append((test_name, True, "Skipped (auth required)"))
            return None

        if status not in (200, 201):  # Accept both 200 OK and 201 Created
            self.print_error(f"Expected 200/201, got {status}")
            try:
                error_data = response.json()
                self.print_error(f"Error: {error_data.get('message', response.text[:200])}")
            except Exception:
                self.print_error(f"Response: {response.text[:500]}")
            self.test_results.append((test_name, False, f"Status {status}"))
            return None

        result = response.json()
        agent_id = result.get('id')
        api_key = result.get('key', '')
        if not agent_id:
            self.print_warning("Agent created but no ID returned")
            self.test_results.append((test_name, False, "No ID returned"))
            return None

        self.agent_id = agent_id
        self.print_success(f"Agent created with ID: {agent_id}")
        if not api_key:
            self.print_warning("Agent created but no API key (draft agent)")
            self.test_results.append((test_name, True, f"ID: {agent_id}, API Key: No"))
            return (agent_id, None)

        self.print_success(f"Agent API key: {api_key[:20]}...")
        self.test_results.append((test_name, True, f"ID: {agent_id}, API Key: Yes"))
        return (agent_id, api_key)
    except Exception as e:
        if isinstance(e, requests.exceptions.RequestException):
            self.print_error(f"Request failed: {str(e)}")
        else:
            self.print_error(f"Unexpected error: {str(e)}")
        self.test_results.append((test_name, False, str(e)))
        return None
def test_api_key_endpoint(self, api_key: str, endpoint_type: str = "stream") -> bool:
    """Exercise /stream or /api/answer using an agent API key.

    The key is sent in the payload as ``api_key`` in place of an ``agent_id``.

    Args:
        api_key: Agent API key to send in the payload.
        endpoint_type: ``"stream"`` selects /stream; any other value selects
            /api/answer.

    Returns:
        True when the server answered 200 and the body was read without an
        exception; False otherwise. The result is also appended to
        ``self.test_results``.
    """
    test_name = f"{endpoint_type.capitalize()} endpoint with API key"
    self.print_header(f"Testing {test_name}")
    is_stream = endpoint_type == "stream"
    endpoint = f"{self.base_url}/stream" if is_stream else f"{self.base_url}/api/answer"
    payload = {
        "question": "What is DocsGPT?",
        "history": "[]",
        "api_key": api_key,  # Use api_key instead of agent_id
    }
    try:
        self.print_info(f"POST {endpoint}")
        self.print_info(f"Using API key: {api_key[:20]}...")
        # The context manager closes the response in both modes; this matters
        # for the streamed case, where reading stops early at the 'end' event.
        with requests.post(
            endpoint,
            json=payload,
            headers=self.headers,
            stream=is_stream,
            timeout=30,
        ) as response:
            self.print_info(f"Status Code: {response.status_code}")
            if response.status_code != 200:
                self.print_error(f"Expected 200, got {response.status_code}")
                self.print_error(f"Response: {response.text[:500]}")
                self.test_results.append((test_name, False, f"Status {response.status_code}"))
                return False

            if is_stream:
                # Parse SSE stream
                events = []
                full_response = ""
                for raw_line in response.iter_lines():
                    if not raw_line:
                        continue
                    line = raw_line.decode('utf-8')
                    if not line.startswith('data: '):
                        continue
                    try:
                        data = json.loads(line[6:])  # Remove 'data: ' prefix
                    except json.JSONDecodeError:
                        # Lines that are not valid JSON are skipped on purpose.
                        continue
                    events.append(data)
                    event_type = data.get('type')
                    if event_type in ('stream', 'answer'):
                        # Tolerate a missing or null text field.
                        full_response += data.get('message') or data.get('answer') or ''
                    elif event_type == 'end':
                        break
                self.print_success(f"Received {len(events)} events")
                self.print_info(f"Response preview: {full_response[:100]}...")
            else:  # answer endpoint
                result = response.json()
                # A null answer is treated like a missing one.
                answer = result.get('answer') or ''
                self.print_success(f"Answer received: {answer[:100]}...")

        self.test_results.append((test_name, True, "Success"))
        return True
    except requests.exceptions.RequestException as e:
        self.print_error(f"Request failed: {str(e)}")
        self.test_results.append((test_name, False, str(e)))
        return False
    except Exception as e:
        self.print_error(f"Unexpected error: {str(e)}")
        self.test_results.append((test_name, False, str(e)))
        return False
def test_model_validation(self) -> bool:
    """Check that /stream rejects an unknown ``model_id`` with HTTP 400.

    The test passes when the status is 400 and the stream contains an event
    of type ``error`` carrying a non-empty ``message`` or ``error`` field.
    Whether the text mentions the model only changes the recorded note.

    Returns:
        True when the invalid model was rejected with an error message,
        False otherwise. The result is also appended to ``self.test_results``.
    """
    endpoint = f"{self.base_url}/stream"
    test_name = "Model validation (invalid model_id)"
    self.print_header(f"Testing {test_name}")
    payload = {
        "question": "Test question",
        "history": "[]",
        "isNoneDoc": True,
        "model_id": "invalid-model-xyz-123",
    }
    try:
        self.print_info(f"POST {endpoint}")
        self.print_info("Testing with invalid model_id: invalid-model-xyz-123")
        # The context manager closes the streamed response even though reading
        # stops at the first error event.
        with requests.post(
            endpoint,
            json=payload,
            headers=self.headers,
            stream=True,
            timeout=10,
        ) as response:
            self.print_info(f"Status Code: {response.status_code}")
            if response.status_code != 400:
                self.print_warning(f"Expected 400, got {response.status_code}")
                self.test_results.append((test_name, False, f"Status {response.status_code}"))
                return False

            # Read the error from SSE stream
            error_message = None
            error_field = None
            for raw_line in response.iter_lines():
                if not raw_line:
                    continue
                line = raw_line.decode('utf-8')
                if not line.startswith('data: '):
                    continue
                try:
                    data = json.loads(line[6:])  # Remove 'data: ' prefix
                except json.JSONDecodeError:
                    continue
                if data.get('type') != 'error':
                    continue
                # Report the field the text actually came from: 'message'
                # wins, 'error' is used when 'message' is absent or empty.
                for field in ('message', 'error'):
                    if data.get(field):
                        error_message, error_field = data[field], field
                        break
                break  # only the first error event is inspected

        # Consider it successful if we got a 400 with any error message
        if not error_message:
            self.print_warning("No error message in response")
            self.test_results.append((test_name, False, "No error message"))
            return False

        self.print_success("Invalid model_id rejected with 400 status")
        self.print_info(f"Error ({error_field}): {error_message[:200]}")
        # Check if it's the detailed validation error or generic error
        if 'Invalid model_id' in error_message or 'model' in error_message.lower():
            self.print_success("✓ Validation error contains model information")
            self.test_results.append((test_name, True, "Validation works"))
        else:
            self.print_warning("Generic error message (validation may need improvement)")
            self.test_results.append((test_name, True, "Generic validation"))
        return True
    except Exception as e:
        self.print_error(f"Unexpected error: {str(e)}")
        self.test_results.append((test_name, False, str(e)))
        return False
def create_web_scraping_agent(self) -> Optional[tuple]:
    """Create a draft ReAct agent with the read_webpage tool enabled.

    Returns:
        Tuple of (agent_id, api_key) on success, where api_key is None when
        the response carries no key. None when the call was skipped (no
        token), was rejected, or failed.
    """
    endpoint = f"{self.base_url}/api/create_agent"
    test_name = "Create Web Scraping Agent"
    self.print_header(f"Testing {test_name}")

    if not self.token:
        self.print_warning("No authentication token provided")
        # A skip is recorded as a pass so it does not fail the run.
        self.test_results.append((test_name, True, "Skipped (auth required)"))
        return None

    # Agent definition with the read_webpage tool
    payload = {
        "name": f"Web Scraping Agent {int(time.time())}",
        "description": "Test agent with read_webpage tool for compression testing",
        "prompt_id": "default",
        "chunks": 2,
        "retriever": "classic",
        "agent_type": "react",  # ReAct agent supports tools
        "status": "draft",
        "tools": ["read_webpage"],  # Enable read_webpage tool
    }
    try:
        self.print_info(f"POST {endpoint}")
        self.print_info("Creating agent with read_webpage tool...")
        response = requests.post(endpoint, json=payload, headers=self.headers, timeout=10)
        status = response.status_code
        self.print_info(f"Status Code: {status}")

        if status not in (200, 201):
            self.print_error(f"Expected 200/201, got {status}")
            try:
                error_data = response.json()
                self.print_error(f"Error: {error_data.get('message', response.text[:200])}")
            except Exception:
                self.print_error(f"Response: {response.text[:500]}")
            self.test_results.append((test_name, False, f"Status {status}"))
            return None

        result = response.json()
        agent_id = result.get('id')
        api_key = result.get('key', '')
        if not agent_id:
            self.print_warning("Agent created but no ID returned")
            self.test_results.append((test_name, False, "No ID returned"))
            return None

        self.print_success(f"Web scraping agent created with ID: {agent_id}")
        if not api_key:
            self.print_warning("Agent created but no API key (draft agent)")
            self.test_results.append((test_name, True, f"ID: {agent_id}, API Key: No"))
            return (agent_id, None)

        self.print_success(f"Agent API key: {api_key[:20]}...")
        self.test_results.append((test_name, True, f"ID: {agent_id}, API Key: Yes"))
        return (agent_id, api_key)
    except Exception as e:
        if isinstance(e, requests.exceptions.RequestException):
            self.print_error(f"Request failed: {str(e)}")
        else:
            self.print_error(f"Unexpected error: {str(e)}")
        self.test_results.append((test_name, False, str(e)))
        return None
def test_compression_heavy_tool_usage(self, agent_result: Optional[tuple] = None) -> bool:
    """Send ten consecutive /api/answer requests in one conversation.

    When an agent is available, the first five questions ask it to read and
    summarize a web page; the rest are general Python questions. The test
    only verifies that every request returns 200 and that a conversation id
    was received; compression itself is not inspected.

    Args:
        agent_result: Optional tuple of (agent_id, api_key) from agent
            creation. When omitted, a web scraping agent is created first.

    Returns:
        True when all ten requests succeeded and a conversation id was
        received. False on failure, and also when skipped for lack of a token
        (the skip itself is recorded as a pass in ``self.test_results``).
    """
    endpoint = f"{self.base_url}/api/answer"
    test_name = "Compression - Heavy Tool Usage"
    self.print_header(f"Testing {test_name}")

    if not self.token:
        self.print_warning("Authentication required for compression tests")
        self.test_results.append((test_name, True, "Skipped (auth required)"))
        return False

    # Use provided agent or create one
    if not agent_result:
        self.print_info("No web scraping agent provided, creating one...")
        agent_result = self.create_web_scraping_agent()
    if agent_result:
        agent_id, api_key = agent_result
    else:
        self.print_warning("Could not create web scraping agent, using isNoneDoc instead")
        agent_id = api_key = None

    # Pages the agent is asked to read
    urls_to_scrape = [
        "https://docs.docsgpt.cloud/",
        "https://docs.docsgpt.cloud/getting-started/quickstart",
        "https://docs.docsgpt.cloud/getting-started/installation",
        "https://docs.docsgpt.cloud/extensions/extensions-intro",
        "https://github.com/arc53/DocsGPT",
    ]

    self.print_info("Making 10 consecutive requests to build up conversation history...")
    self.print_info("Some requests will use read_webpage tool for web scraping...")
    current_conv_id = None
    for turn in range(1, 11):
        index = turn - 1
        if agent_id and index < 5:
            # First five turns: web scraping questions
            url = urls_to_scrape[index % len(urls_to_scrape)]
            question = f"Please read and summarize the content from this webpage: {url}"
        else:
            # Remaining turns (or no agent): regular questions
            question = f"Tell me about Python topic number {turn}: data structures, decorators, async, testing, etc. Please provide a comprehensive explanation."

        payload = {
            "question": question,
            "history": "[]",
            "model_id": "gemini-2.5-pro",
        }
        # Route through the agent if available, then the API key, otherwise no docs
        if agent_id:
            payload["agent_id"] = agent_id
        elif api_key:
            payload["api_key"] = api_key
        else:
            payload["isNoneDoc"] = True
        if current_conv_id:
            payload["conversation_id"] = current_conv_id

        try:
            response = requests.post(
                endpoint,
                json=payload,
                headers=self.headers,
                timeout=90,  # Longer timeout for web scraping
            )
            if response.status_code != 200:
                self.print_error(f"Request {turn}/10 failed with status {response.status_code}")
                self.test_results.append((test_name, False, f"Request {turn} failed"))
                return False
            result = response.json()
            # Keep the previous id when a response omits it.
            current_conv_id = result.get('conversation_id', current_conv_id)
            answer_preview = result.get('answer', '')[:80]
            self.print_success(f"Request {turn}/10 completed (conv_id: {current_conv_id})")
            self.print_info(f" Answer preview: {answer_preview}...")
            time.sleep(2)  # Small delay between requests
        except Exception as e:
            self.print_error(f"Request {turn}/10 failed: {str(e)}")
            self.test_results.append((test_name, False, str(e)))
            return False

    if not current_conv_id:
        self.print_warning("No conversation_id received")
        self.test_results.append((test_name, False, "No conversation_id"))
        return False

    self.print_info(f"Checking compression status for conversation {current_conv_id}")
    # Note: verifying compression would require a /api/conversation/{id} endpoint
    self.print_success("Heavy tool usage test completed")
    tool_info = "with read_webpage" if agent_id else "without tools"
    self.test_results.append((test_name, True, f"10 requests {tool_info}, conv_id: {current_conv_id}"))
    return True
def test_compression_needle_in_haystack(self) -> bool:
    """Check that a fact stated mid-conversation can still be recalled.

    Sends two general questions, then a message containing critical facts,
    then two more general questions, and finally asks for the database
    password environment variable. The test passes only when the answer
    contains ``DB_PASSWORD_PROD`` (case-insensitive).

    Returns:
        True when the fact was recalled. False on any request failure, when
        no conversation id is returned, when the fact is missing from the
        answer, and also when skipped for lack of a token (the skip itself is
        recorded as a pass in ``self.test_results``).
    """
    endpoint = f"{self.base_url}/api/answer"
    test_name = "Compression - Needle in Haystack"
    self.print_header(f"Testing {test_name}")
    if not self.token:
        self.print_warning("Authentication required for compression tests")
        self.test_results.append((test_name, True, "Skipped (auth required)"))
        return False

    conversation_id = None

    def ask(question: str) -> Optional[dict]:
        """POST one turn; return the JSON body, or None on a non-200 status."""
        nonlocal conversation_id
        payload = {
            "question": question,
            "history": "[]",
            "isNoneDoc": True,
            "model_id": "gemini-2.5-pro",
        }
        # Never send a null conversation_id; omit the key until one is known.
        if conversation_id:
            payload["conversation_id"] = conversation_id
        response = requests.post(endpoint, json=payload, headers=self.headers, timeout=60)
        if response.status_code != 200:
            self.print_error(f"Request failed with status {response.status_code}")
            return None
        result = response.json()
        conversation_id = result.get('conversation_id', conversation_id)
        return result

    def fail(reason: str) -> bool:
        """Record a failed result and return False."""
        self.test_results.append((test_name, False, reason))
        return False

    try:
        # Step 1: Send general questions
        self.print_info("Step 1: Sending general questions...")
        general_questions = [
            "Tell me about Python best practices in detail",
            "Explain Python data structures comprehensively",
        ]
        for i, question in enumerate(general_questions, start=1):
            if ask(question) is None:
                return fail("General questions failed")
            self.print_success(f"General question {i}/{len(general_questions)} completed")
            time.sleep(2)

        # Without a conversation id every later turn would start a new
        # conversation, so there would be nothing to recall.
        if not conversation_id:
            self.print_warning("No conversation_id received")
            return fail("No conversation_id")

        # Step 2: Send CRITICAL information
        self.print_info("Step 2: Sending CRITICAL information to remember...")
        critical_question = "Please remember this critical information: The production database password is stored in DB_PASSWORD_PROD environment variable. The backup runs at 3:00 AM UTC daily. Premium users have 10,000 req/hour limit."
        if ask(critical_question) is None:
            return fail("Critical info failed")
        self.print_success("Critical information sent")
        time.sleep(2)

        # Step 3: Send more general questions to bury the critical info
        self.print_info("Step 3: Sending more questions to bury the critical info...")
        burying_questions = [
            "Explain Python decorators in great detail",
            "Tell me about Python async programming comprehensively",
        ]
        for i, question in enumerate(burying_questions, start=1):
            if ask(question) is None:
                return fail("Burying questions failed")
            self.print_success(f"Burying question {i}/{len(burying_questions)} completed")
            time.sleep(2)

        # Step 4: Ask about the critical information
        self.print_info("Step 4: Testing if critical info was preserved...")
        result = ask("What was the database password environment variable I mentioned earlier?")
        if result is None:
            return fail("Recall failed")
        answer = (result.get('answer') or '').lower()

        # Only the variable name proves recall. The phrase "database password"
        # appears in the question itself, so an answer that merely echoes the
        # question would otherwise count as a pass.
        if 'db_password_prod' in answer:
            self.print_success("✓ Critical information preserved through compression!")
            self.print_info(f"Answer: {answer[:150]}...")
            self.test_results.append((test_name, True, "Info preserved"))
            return True

        self.print_warning("Critical information may have been lost")
        self.print_info(f"Answer: {answer[:150]}...")
        return fail("Info not preserved")
    except Exception as e:
        self.print_error(f"Request failed: {str(e)}")
        self.test_results.append((test_name, False, str(e)))
        return False
def print_summary(self):
    """Print aggregate counts and a per-test PASS/FAIL listing.

    Reads the ``(test_name, success, message)`` tuples collected in
    ``self.test_results``.

    Returns:
        True when no recorded test failed, False otherwise.
    """
    self.print_header("Test Results Summary")

    # Count entries whose success flag is truthy; everything else is a failure.
    passed = sum(1 for _, ok, _ in self.test_results if ok)
    failed = len(self.test_results) - passed

    print(f"\n{Colors.BOLD}Total Tests: {len(self.test_results)}{Colors.ENDC}")
    print(f"{Colors.OKGREEN}Passed: {passed}{Colors.ENDC}")
    print(f"{Colors.FAIL}Failed: {failed}{Colors.ENDC}\n")
    print(f"{Colors.BOLD}Detailed Results:{Colors.ENDC}")

    # The coloured labels are the same for every row, so build them once.
    pass_label = f"{Colors.OKGREEN}PASS{Colors.ENDC}"
    fail_label = f"{Colors.FAIL}FAIL{Colors.ENDC}"
    for test_name, success, message in self.test_results:
        if success:
            status = pass_label
        else:
            status = fail_label
        print(f" {status} - {test_name}: {message}")
    print()

    return failed == 0
def run_all_tests(self):
    """Run every integration test in order and print the summary.

    The sequence is: unauthenticated endpoint tests, compression tests
    (only with a token), source upload with crawler fallback, then the
    published-agent tests. Short sleeps separate the requests.

    Returns:
        0 when ``print_summary`` reports no failures, 1 otherwise, so the
        value can be used directly as a process exit code.
    """
    self.print_header("DocsGPT Integration Tests")
    self.print_info(f"Base URL: {self.base_url}")
    if self.token:
        self.print_info(f"Authentication: Yes ({self.token_source})")
    else:
        self.print_info("Authentication: No (agent-related tests will be skipped)")

    # Test 1: Stream endpoint without agent
    self.test_stream_endpoint()
    time.sleep(1)
    # Test 2: Answer endpoint without agent
    self.test_answer_endpoint()
    time.sleep(1)
    # Test 3: Model validation
    self.test_model_validation()
    time.sleep(1)

    # Test 4: Compression tests (requires token)
    self._run_compression_suite()

    # Tests 5 & 6: Upload a source and resolve its ID
    source_id = self._provision_source()

    # Tests 7-11: Published agent, exercised via API key and agent_id
    self._run_published_agent_suite(source_id)

    # Print summary
    success = self.print_summary()
    return 0 if success else 1

def _run_compression_suite(self):
    """Run both compression tests, or log a skip when no token is set."""
    if not self.token:
        self.print_info("Skipping compression tests (no authentication)")
        return

    self.print_info("Running compression integration tests...")
    time.sleep(1)
    # Test 4a: Heavy tool usage compression
    self.test_compression_heavy_tool_usage()
    time.sleep(2)
    # Test 4b: Needle in haystack compression
    self.test_compression_needle_in_haystack()
    time.sleep(1)

def _provision_source(self):
    """Upload a text source and return its ID, falling back to the crawler.

    Returns:
        The source ID on success. When the upload itself yields no task,
        returns None; when no ID could be resolved, returns whatever falsy
        value ``get_source_id_from_task`` produced.
    """
    # Text upload is tried first because it is faster than the crawler.
    task_id = self.upload_text_source()
    if not task_id:
        return None

    source_id = self.get_source_id_from_task(task_id)
    if source_id:
        self.print_success(f"Source created with ID: {source_id}")
        return source_id

    self.print_warning("Could not retrieve source ID from task - trying crawler fallback")
    crawler_task_id = self.upload_crawler_source()
    if not crawler_task_id:
        return source_id

    source_id = self.get_source_id_from_task(crawler_task_id)
    if source_id:
        self.print_success(f"Source created with ID (crawler): {source_id}")
    else:
        self.print_warning("Could not retrieve source ID from crawler task either")
    return source_id

def _run_agent_id_tests(self, agent_id):
    """Exercise the stream and answer endpoints addressed by agent_id."""
    self.test_stream_endpoint(agent_id=agent_id)
    time.sleep(1)
    self.test_answer_endpoint(agent_id=agent_id)

def _run_published_agent_suite(self, source_id):
    """Create a published agent and test it via API key and agent_id.

    Args:
        source_id: ID of the source to attach to the agent; may be falsy,
            in which case the source-specific question test is skipped.
    """
    # Published agents are expected to get an API key automatically.
    published_result = self.create_agent(source_id=source_id, published=True)
    if not published_result:
        if self.token:
            self.print_warning("Published agent creation failed - some tests skipped")
        else:
            self.print_info("Skipping agent tests (no authentication token)")
        return

    agent_id, api_key = published_result
    time.sleep(1)

    if not api_key:
        self.print_warning("Published agent created but no API key received")
        self.print_info("Testing with agent_id instead...")
        # Fallback to agent_id testing
        self._run_agent_id_tests(agent_id)
        return

    # Test with API key (primary method)
    self.test_api_key_endpoint(api_key, endpoint_type="stream")
    time.sleep(1)
    self.test_api_key_endpoint(api_key, endpoint_type="answer")
    time.sleep(1)
    # Also test with agent_id for completeness
    self._run_agent_id_tests(agent_id)

    # If the agent has a source, test source-specific questions
    if source_id:
        time.sleep(1)
        self._check_published_agent_uses_source(api_key)

def _check_published_agent_uses_source(self, api_key):
    """Ask a source-specific question via /api/answer and record the result.

    A 200 response is recorded as a pass even when the answer lacks the
    expected keywords (a warning is printed instead); any other status or
    an exception is recorded as a failure.
    """
    self.print_info("Testing published agent with source-specific questions...")
    test_name = "Published agent with source (DocsGPT question)"
    self.print_header(f"Testing {test_name}")
    payload = {
        "question": "How do I install DocsGPT?",
        "history": "[]",
        "api_key": api_key,
    }
    try:
        response = requests.post(
            f"{self.base_url}/api/answer",
            json=payload,
            headers=self.headers,
            timeout=30
        )
        if response.status_code == 200:
            result = response.json()
            answer = result.get('answer', '')
            self.print_success(f"Answer received: {answer[:100]}...")
            # Keyword match is only a heuristic for "the source was used".
            if any(word in answer.lower() for word in ['install', 'docker', 'setup']):
                self.print_success("Answer contains relevant information from source")
                self.test_results.append((test_name, True, "Success"))
            else:
                self.print_warning("Answer may not be using source data")
                self.test_results.append((test_name, True, "Answer unclear"))
        else:
            self.print_error(f"Status {response.status_code}")
            self.test_results.append((test_name, False, f"Status {response.status_code}"))
    except Exception as e:
        self.print_error(f"Test failed: {str(e)}")
        self.test_results.append((test_name, False, str(e)))
def main():
    """Parse CLI arguments, resolve a JWT token, run the suite, and exit.

    The token comes from ``--token`` when given; otherwise an attempt is
    made to generate one via ``generate_default_token``. If that fails the
    suite still runs, unauthenticated.

    The process exits with the code returned by ``run_all_tests``, or with
    1 on Ctrl-C or any unexpected exception.
    """
    parser = argparse.ArgumentParser(
        description='Integration test script for DocsGPT API endpoints',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Test local instance
python tests/test_integration.py # auto-generates JWT token from local secret if possible
# Test remote instance
python tests/test_integration.py --base-url https://app.docsgpt.com
# Test with authentication (required for agent creation)
python tests/test_integration.py --token YOUR_JWT_TOKEN
# Test a specific instance with an explicit token
python tests/test_integration.py --base-url http://localhost:7091 --token YOUR_TOKEN
"""
    )
    parser.add_argument(
        '--base-url',
        default='http://localhost:7091',
        help='Base URL of DocsGPT instance (default: http://localhost:7091)'
    )
    parser.add_argument(
        '--token',
        help='JWT authentication token (auto-generated from local secret when available)'
    )
    args = parser.parse_args()

    # Resolve the token and a human-readable note on where it came from.
    token = args.token
    if token:
        token_source = "provided via --token"
    else:
        token, token_error = generate_default_token()
        if token:
            token_source = "auto-generated from local JWT secret"
            print(f"{Colors.OKCYAN} Using auto-generated JWT token from local secret{Colors.ENDC}")
        else:
            token_source = "none"
            if token_error:
                print(f"{Colors.WARNING}⚠ Could not auto-generate JWT token: {token_error}{Colors.ENDC}")
            print(f"{Colors.WARNING}⚠ Agent creation tests will be skipped unless you provide --token{Colors.ENDC}")

    try:
        tester = DocsGPTTester(args.base_url, token, token_source=token_source)
        exit_code = tester.run_all_tests()
        # SystemExit is not an Exception subclass, so the handlers below
        # do not intercept this normal exit.
        sys.exit(exit_code)
    except KeyboardInterrupt:
        print(f"\n{Colors.WARNING}Tests interrupted by user{Colors.ENDC}")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report the failure and signal it via exit code.
        print(f"\n{Colors.FAIL}Fatal error: {str(e)}{Colors.ENDC}")
        sys.exit(1)
# Script entry point: run the test suite only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    main()