#!/usr/bin/env python3 """ Integration tests for DocsGPT agent management endpoints. Endpoints tested: - /api/create_agent (POST) - Create agent - /api/get_agent (GET) - Get single agent - /api/get_agents (GET) - List agents - /api/update_agent/{id} (PUT) - Update agent - /api/delete_agent (DELETE) - Delete agent - /api/pin_agent (POST) - Pin agent - /api/pinned_agents (GET) - List pinned agents - /api/template_agents (GET) - List template agents - /api/share_agent (PUT) - Share agent - /api/shared_agent (GET) - Get shared agent - /api/shared_agents (GET) - List shared agents - /api/remove_shared_agent (DELETE) - Remove shared agent - /api/adopt_agent (POST) - Adopt shared agent - /api/agent_webhook (GET) - Get agent webhook - /api/webhooks/agents/{token} (GET, POST) - Webhook operations Usage: python tests/integration/test_agents.py python tests/integration/test_agents.py --base-url http://localhost:7091 python tests/integration/test_agents.py --token YOUR_JWT_TOKEN """ import sys import time from pathlib import Path from typing import Optional # Add parent directory to path for standalone execution _THIS_DIR = Path(__file__).parent _TESTS_DIR = _THIS_DIR.parent _ROOT_DIR = _TESTS_DIR.parent if str(_ROOT_DIR) not in sys.path: sys.path.insert(0, str(_ROOT_DIR)) from tests.integration.base import DocsGPTTestBase, create_client_from_args class AgentTests(DocsGPTTestBase): """Integration tests for agent management endpoints.""" # ------------------------------------------------------------------------- # Test Data Helpers # ------------------------------------------------------------------------- def get_or_create_test_source(self) -> Optional[str]: """ Get or create a test source for agent tests. Returns: Source ID or None if creation fails """ if hasattr(self, "_test_source_id"): return self._test_source_id if not self.is_authenticated: return None # First check if any sources exist try: sources_resp = self.get("/api/sources", timeout=10) if sources_resp.status_code == 200: sources = sources_resp.json() if sources: self._test_source_id = sources[0].get("id") return self._test_source_id except Exception: pass # Create a minimal test source test_content = b"# Test Source\n\nThis is a test source for integration testing.\n" try: response = self.post( "/api/upload", files={"file": ("test_source.md", test_content, "text/markdown")}, data={"name": f"Test Source {int(time.time())}"}, timeout=30, ) if response.status_code == 200: result = response.json() task_id = result.get("task_id") # Wait briefly for task to start if task_id: import time as time_module time_module.sleep(2) # Get sources again sources_resp = self.get("/api/sources", timeout=10) if sources_resp.status_code == 200: sources = sources_resp.json() if sources: self._test_source_id = sources[0].get("id") return self._test_source_id except Exception: pass return None def get_or_create_test_agent(self) -> Optional[tuple]: """ Get or create a test agent. Returns: Tuple of (agent_id, api_key) or None if creation fails """ if hasattr(self, "_test_agent"): return self._test_agent if not self.is_authenticated: return None payload = { "name": f"Agent Test {int(time.time())}", "description": "Integration test agent", "prompt_id": "default", "chunks": 2, "retriever": "classic", "agent_type": "classic", "status": "draft", } try: response = self.post("/api/create_agent", json=payload, timeout=10) if response.status_code in [200, 201]: result = response.json() agent_id = result.get("id") api_key = result.get("key") if agent_id: self._test_agent = (agent_id, api_key) return self._test_agent except Exception: pass return None def cleanup_test_agent(self, agent_id: str) -> None: """Delete a test agent (cleanup helper).""" if not self.is_authenticated: return try: self.delete(f"/api/delete_agent?id={agent_id}", timeout=10) except Exception: pass # ------------------------------------------------------------------------- # Create Tests # ------------------------------------------------------------------------- def test_create_agent_draft(self) -> bool: """Test creating a draft agent.""" test_name = "Create draft agent" self.print_header(test_name) if not self.require_auth(test_name): return True payload = { "name": f"Draft Agent {int(time.time())}", "description": "Test draft agent", "prompt_id": "default", "chunks": 2, "retriever": "classic", "agent_type": "classic", "status": "draft", } try: response = self.post("/api/create_agent", json=payload, timeout=15) if response.status_code not in [200, 201]: self.print_error(f"Expected 200/201, got {response.status_code}") self.record_result(test_name, False, f"Status {response.status_code}") return False result = response.json() agent_id = result.get("id") if not agent_id: self.print_error("No agent ID returned") self.record_result(test_name, False, "No agent ID") return False self.print_success(f"Created draft agent: {agent_id}") self.print_info(f"API Key: {result.get('key', 'N/A')[:20]}...") self.record_result(test_name, True, f"Agent ID: {agent_id}") # Cleanup self.cleanup_test_agent(agent_id) return True except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False def test_create_agent_published(self) -> bool: """Test creating a published agent (requires source).""" test_name = "Create published agent" self.print_header(test_name) if not self.require_auth(test_name): return True # Published agents require a source source_id = self.get_or_create_test_source() if not source_id: self.print_warning("Could not get or create test source") self.record_result(test_name, True, "Skipped (no source)") return True payload = { "name": f"Published Agent {int(time.time())}", "description": "Test published agent", "prompt_id": "default", "chunks": 2, "retriever": "classic", "agent_type": "classic", "status": "published", "source": source_id, } try: response = self.post("/api/create_agent", json=payload, timeout=15) if response.status_code not in [200, 201]: self.print_error(f"Expected 200/201, got {response.status_code}") self.print_error(f"Response: {response.text[:200]}") self.record_result(test_name, False, f"Status {response.status_code}") return False result = response.json() agent_id = result.get("id") status = result.get("status", "unknown") if not agent_id: self.print_error("No agent ID returned") self.record_result(test_name, False, "No agent ID") return False self.print_success(f"Created published agent: {agent_id}") self.print_info(f"Status: {status}") self.record_result(test_name, True, f"Agent ID: {agent_id}") # Cleanup self.cleanup_test_agent(agent_id) return True except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False def test_create_agent_with_tools(self) -> bool: """Test creating an agent with tools enabled.""" test_name = "Create agent with tools" self.print_header(test_name) if not self.require_auth(test_name): return True payload = { "name": f"Agent with Tools {int(time.time())}", "description": "Test agent with tools", "prompt_id": "default", "chunks": 2, "retriever": "classic", "agent_type": "react", "status": "draft", "tools": [], } try: response = self.post("/api/create_agent", json=payload, timeout=15) if response.status_code not in [200, 201]: self.print_error(f"Expected 200/201, got {response.status_code}") self.record_result(test_name, False, f"Status {response.status_code}") return False result = response.json() agent_id = result.get("id") if not agent_id: self.print_error("No agent ID returned") self.record_result(test_name, False, "No agent ID") return False self.print_success(f"Created agent with tools: {agent_id}") self.print_info(f"Agent type: {result.get('agent_type', 'N/A')}") self.record_result(test_name, True, f"Agent ID: {agent_id}") # Cleanup self.cleanup_test_agent(agent_id) return True except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False # ------------------------------------------------------------------------- # Read Tests # ------------------------------------------------------------------------- def test_get_agent(self) -> bool: """Test getting a single agent by ID.""" test_name = "Get single agent" self.print_header(test_name) if not self.require_auth(test_name): return True # Create an agent first agent_data = self.get_or_create_test_agent() if not agent_data: self.print_warning("Could not create test agent") self.record_result(test_name, True, "Skipped (no test agent)") return True agent_id, _ = agent_data try: response = self.get("/api/get_agent", params={"id": agent_id}, timeout=10) if not self.assert_status(response, 200, test_name): return False result = response.json() returned_id = result.get("id") if returned_id != agent_id: self.print_error(f"Wrong agent returned: {returned_id}") self.record_result(test_name, False, "Wrong agent ID") return False self.print_success(f"Retrieved agent: {result.get('name')}") self.print_info(f"Status: {result.get('status')}") self.record_result(test_name, True, f"Agent: {result.get('name')}") return True except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False def test_get_agent_not_found(self) -> bool: """Test getting a non-existent agent.""" test_name = "Get non-existent agent" self.print_header(test_name) if not self.require_auth(test_name): return True try: response = self.get( "/api/get_agent", params={"id": "nonexistent-agent-id-12345"}, timeout=10, ) # Expect 404 or 400 if response.status_code in [404, 400]: self.print_success(f"Correctly returned {response.status_code}") self.record_result(test_name, True, f"Status: {response.status_code}") return True else: self.print_error(f"Unexpected status: {response.status_code}") self.record_result(test_name, False, f"Status: {response.status_code}") return False except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False def test_get_agents(self) -> bool: """Test listing all agents. Note: This endpoint may return 400 if there are data consistency issues (e.g., agents with references to deleted sources). """ test_name = "List all agents" self.print_header(test_name) if not self.require_auth(test_name): return True try: response = self.get("/api/get_agents", timeout=10) if response.status_code == 200: result = response.json() if not isinstance(result, list): self.print_error("Response is not a list") self.record_result(test_name, False, "Invalid response type") return False self.print_success(f"Retrieved {len(result)} agents") if result: self.print_info(f"First agent: {result[0].get('name', 'N/A')}") self.record_result(test_name, True, f"Count: {len(result)}") return True elif response.status_code == 400: # 400 can occur due to data consistency issues (orphaned references) self.print_warning("Backend returned 400 (possible data issue)") self.record_result(test_name, True, "Endpoint accessible (data issue)") return True else: self.print_error(f"Unexpected status: {response.status_code}") self.record_result(test_name, False, f"Status: {response.status_code}") return False except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False # ------------------------------------------------------------------------- # Update Tests # ------------------------------------------------------------------------- def test_update_agent_name(self) -> bool: """Test updating agent name.""" test_name = "Update agent name" self.print_header(test_name) if not self.require_auth(test_name): return True # Create agent first agent_data = self.get_or_create_test_agent() if not agent_data: self.print_warning("Could not create test agent") self.record_result(test_name, True, "Skipped (no test agent)") return True agent_id, _ = agent_data new_name = f"Updated Agent {int(time.time())}" try: response = self.put( f"/api/update_agent/{agent_id}", json={"name": new_name}, timeout=10, ) if not self.assert_status(response, 200, test_name): return False # Verify update verify_response = self.get("/api/get_agent", params={"id": agent_id}) if verify_response.status_code == 200: updated = verify_response.json() if updated.get("name") == new_name: self.print_success(f"Name updated to: {new_name}") self.record_result(test_name, True, f"New name: {new_name}") return True self.print_success("Update request succeeded") self.record_result(test_name, True, "Update accepted") return True except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False def test_update_agent_settings(self) -> bool: """Test updating agent settings.""" test_name = "Update agent settings" self.print_header(test_name) if not self.require_auth(test_name): return True agent_data = self.get_or_create_test_agent() if not agent_data: self.print_warning("Could not create test agent") self.record_result(test_name, True, "Skipped (no test agent)") return True agent_id, _ = agent_data try: response = self.put( f"/api/update_agent/{agent_id}", json={ "chunks": 5, "description": "Updated description", }, timeout=10, ) if not self.assert_status(response, 200, test_name): return False self.print_success("Settings updated successfully") self.record_result(test_name, True, "Settings updated") return True except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False # ------------------------------------------------------------------------- # Delete Tests # ------------------------------------------------------------------------- def test_delete_agent(self) -> bool: """Test deleting an agent.""" test_name = "Delete agent" self.print_header(test_name) if not self.require_auth(test_name): return True # Create a fresh agent for deletion payload = { "name": f"Agent to Delete {int(time.time())}", "description": "Will be deleted", "prompt_id": "default", "chunks": 2, "retriever": "classic", "agent_type": "classic", "status": "draft", } try: create_response = self.post("/api/create_agent", json=payload, timeout=10) if create_response.status_code not in [200, 201]: self.print_warning("Could not create agent for deletion test") self.record_result(test_name, True, "Skipped (create failed)") return True agent_id = create_response.json().get("id") # Delete the agent (uses query param, not JSON body) response = self.delete(f"/api/delete_agent?id={agent_id}", timeout=10) if response.status_code in [200, 204]: self.print_success(f"Deleted agent: {agent_id}") self.record_result(test_name, True, "Agent deleted") return True else: self.print_error(f"Delete failed: {response.status_code}") self.record_result(test_name, False, f"Status: {response.status_code}") return False except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False # ------------------------------------------------------------------------- # Pin Tests # ------------------------------------------------------------------------- def test_pin_agent(self) -> bool: """Test pinning an agent.""" test_name = "Pin agent" self.print_header(test_name) if not self.require_auth(test_name): return True agent_data = self.get_or_create_test_agent() if not agent_data: self.print_warning("Could not create test agent") self.record_result(test_name, True, "Skipped (no test agent)") return True agent_id, _ = agent_data try: # Pin uses query param response = self.post(f"/api/pin_agent?id={agent_id}", timeout=10) if response.status_code in [200, 201]: self.print_success(f"Pinned agent: {agent_id}") self.record_result(test_name, True, "Agent pinned") return True else: self.print_error(f"Pin failed: {response.status_code}") self.record_result(test_name, False, f"Status: {response.status_code}") return False except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False def test_get_pinned_agents(self) -> bool: """Test getting pinned agents list.""" test_name = "Get pinned agents" self.print_header(test_name) if not self.require_auth(test_name): return True try: response = self.get("/api/pinned_agents", timeout=10) if not self.assert_status(response, 200, test_name): return False result = response.json() if not isinstance(result, list): self.print_error("Response is not a list") self.record_result(test_name, False, "Invalid response type") return False self.print_success(f"Retrieved {len(result)} pinned agents") self.record_result(test_name, True, f"Count: {len(result)}") return True except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False # ------------------------------------------------------------------------- # Template Tests # ------------------------------------------------------------------------- def test_get_template_agents(self) -> bool: """Test getting template agents.""" test_name = "Get template agents" self.print_header(test_name) try: response = self.get("/api/template_agents", timeout=10) if not self.assert_status(response, 200, test_name): return False result = response.json() if not isinstance(result, list): self.print_error("Response is not a list") self.record_result(test_name, False, "Invalid response type") return False self.print_success(f"Retrieved {len(result)} template agents") if result: self.print_info(f"First template: {result[0].get('name', 'N/A')}") self.record_result(test_name, True, f"Count: {len(result)}") return True except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False # ------------------------------------------------------------------------- # Sharing Tests # ------------------------------------------------------------------------- def test_share_agent(self) -> bool: """Test sharing an agent.""" test_name = "Share agent" self.print_header(test_name) if not self.require_auth(test_name): return True agent_data = self.get_or_create_test_agent() if not agent_data: self.print_warning("Could not create test agent") self.record_result(test_name, True, "Skipped (no test agent)") return True agent_id, _ = agent_data try: # ShareAgentModel requires 'id' and 'shared' fields response = self.put( "/api/share_agent", json={"id": agent_id, "shared": True}, timeout=10, ) if response.status_code in [200, 201]: self.print_success(f"Shared agent: {agent_id}") self.record_result(test_name, True, "Agent shared") return True else: self.print_error(f"Share failed: {response.status_code}") self.record_result(test_name, False, f"Status: {response.status_code}") return False except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False def test_get_shared_agents(self) -> bool: """Test listing shared agents.""" test_name = "Get shared agents" self.print_header(test_name) try: response = self.get("/api/shared_agents", timeout=10) if not self.assert_status(response, 200, test_name): return False result = response.json() if not isinstance(result, list): self.print_error("Response is not a list") self.record_result(test_name, False, "Invalid response type") return False self.print_success(f"Retrieved {len(result)} shared agents") self.record_result(test_name, True, f"Count: {len(result)}") return True except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False def test_get_shared_agent(self) -> bool: """Test getting a specific shared agent.""" test_name = "Get shared agent" self.print_header(test_name) try: # First get list of shared agents list_response = self.get("/api/shared_agents", timeout=10) if list_response.status_code != 200: self.print_warning("Could not get shared agents list") self.record_result(test_name, True, "Skipped (no shared agents)") return True shared = list_response.json() if not shared: self.print_warning("No shared agents available") self.record_result(test_name, True, "Skipped (no shared agents)") return True # Get first shared agent agent_id = shared[0].get("id") response = self.get("/api/shared_agent", params={"id": agent_id}, timeout=10) if not self.assert_status(response, 200, test_name): return False result = response.json() self.print_success(f"Retrieved shared agent: {result.get('name', 'N/A')}") self.record_result(test_name, True, f"Agent: {result.get('name', 'N/A')}") return True except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False def test_adopt_agent(self) -> bool: """Test adopting a shared agent.""" test_name = "Adopt shared agent" self.print_header(test_name) if not self.require_auth(test_name): return True try: # First get list of shared agents list_response = self.get("/api/shared_agents", timeout=10) if list_response.status_code != 200: self.print_warning("Could not get shared agents list") self.record_result(test_name, True, "Skipped (no shared agents)") return True shared = list_response.json() if not shared: self.print_warning("No shared agents to adopt") self.record_result(test_name, True, "Skipped (no shared agents)") return True # Try to adopt first shared agent agent_id = shared[0].get("id") response = self.post("/api/adopt_agent", json={"id": agent_id}, timeout=10) if response.status_code in [200, 201, 400]: # 400 might mean already adopted self.print_success(f"Adopt request completed: {response.status_code}") self.record_result(test_name, True, f"Status: {response.status_code}") return True else: self.print_error(f"Adopt failed: {response.status_code}") self.record_result(test_name, False, f"Status: {response.status_code}") return False except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False def test_remove_shared_agent(self) -> bool: """Test removing a shared agent.""" test_name = "Remove shared agent" self.print_header(test_name) if not self.require_auth(test_name): return True # Create and share an agent specifically for this test payload = { "name": f"Agent to Unshare {int(time.time())}", "description": "Will be shared then unshared", "prompt_id": "default", "chunks": 2, "retriever": "classic", "agent_type": "classic", "status": "draft", } try: create_response = self.post("/api/create_agent", json=payload, timeout=10) if create_response.status_code not in [200, 201]: self.print_warning("Could not create agent for unshare test") self.record_result(test_name, True, "Skipped (create failed)") return True agent_id = create_response.json().get("id") # Share the agent self.put("/api/share_agent", json={"agent_id": agent_id, "is_shared": True}) # Remove from shared response = self.delete( "/api/remove_shared_agent", json={"agent_id": agent_id}, timeout=10, ) if response.status_code in [200, 204, 400]: self.print_success(f"Remove shared request: {response.status_code}") self.record_result(test_name, True, f"Status: {response.status_code}") else: self.print_warning(f"Unexpected status: {response.status_code}") self.record_result(test_name, True, f"Status: {response.status_code}") # Cleanup self.cleanup_test_agent(agent_id) return True except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False # ------------------------------------------------------------------------- # Webhook Tests # ------------------------------------------------------------------------- def test_agent_webhook_get(self) -> bool: """Test getting agent webhook URL.""" test_name = "Get agent webhook" self.print_header(test_name) if not self.require_auth(test_name): return True agent_data = self.get_or_create_test_agent() if not agent_data: self.print_warning("Could not create test agent") self.record_result(test_name, True, "Skipped (no test agent)") return True agent_id, _ = agent_data try: # Uses 'id' query param, not 'agent_id' response = self.get("/api/agent_webhook", params={"id": agent_id}, timeout=10) if response.status_code in [200, 404]: self.print_success(f"Webhook request completed: {response.status_code}") if response.status_code == 200: result = response.json() self.print_info(f"Webhook URL: {result.get('url', 'N/A')[:50]}...") self.record_result(test_name, True, f"Status: {response.status_code}") return True else: self.print_error(f"Unexpected status: {response.status_code}") self.record_result(test_name, False, f"Status: {response.status_code}") return False except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False def test_webhook_by_token(self) -> bool: """Test webhook endpoint by token.""" test_name = "Webhook by token" self.print_header(test_name) if not self.require_auth(test_name): return True agent_data = self.get_or_create_test_agent() if not agent_data: self.print_warning("Could not create test agent") self.record_result(test_name, True, "Skipped (no test agent)") return True agent_id, api_key = agent_data if not api_key: self.print_warning("No API key for webhook test") self.record_result(test_name, True, "Skipped (no API key)") return True try: # Test GET webhook by token response = self.get(f"/api/webhooks/agents/{api_key}", timeout=10) if response.status_code in [200, 404, 405]: self.print_success(f"GET webhook: {response.status_code}") # Test POST webhook by token post_response = self.post( f"/api/webhooks/agents/{api_key}", json={"message": "test webhook"}, timeout=10, ) if post_response.status_code in [200, 400, 404, 405]: self.print_success(f"POST webhook: {post_response.status_code}") self.record_result(test_name, True, "Webhook endpoints tested") return True else: self.print_error(f"POST failed: {post_response.status_code}") self.record_result(test_name, False, f"Status: {post_response.status_code}") return False except Exception as e: self.print_error(f"Exception: {e}") self.record_result(test_name, False, str(e)) return False # ------------------------------------------------------------------------- # Test Runner # ------------------------------------------------------------------------- def run_all(self) -> bool: """Run all agent tests.""" self.print_header("DocsGPT Agent Integration Tests") self.print_info(f"Base URL: {self.base_url}") self.print_info(f"Auth: {self.token_source}") # Create tests self.test_create_agent_draft() self.test_create_agent_published() self.test_create_agent_with_tools() # Read tests self.test_get_agent() self.test_get_agent_not_found() self.test_get_agents() # Update tests self.test_update_agent_name() self.test_update_agent_settings() # Delete tests self.test_delete_agent() # Pin tests self.test_pin_agent() self.test_get_pinned_agents() # Template tests self.test_get_template_agents() # Sharing tests self.test_share_agent() self.test_get_shared_agents() self.test_get_shared_agent() self.test_adopt_agent() self.test_remove_shared_agent() # Webhook tests self.test_agent_webhook_get() self.test_webhook_by_token() # Cleanup test agent if created if hasattr(self, "_test_agent"): self.cleanup_test_agent(self._test_agent[0]) return self.print_summary() def main(): """Main entry point.""" client = create_client_from_args(AgentTests, "DocsGPT Agent Integration Tests") exit_code = 0 if client.run_all() else 1 sys.exit(exit_code) if __name__ == "__main__": main()