diff --git a/README.md b/README.md index 749a498..59ae72d 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ https://github.com/user-attachments/assets/a43d2457-7113-42cc-ad02-f378d57f4d24 - **MCP Server Integration**: Through the `mcp.json` configuration file, multiple MCP servers can be flexibly integrated and managed to extend the assistant's capabilities. - **Tool Management**: Configure, connect to, and manage MCP tools through an interactive menu, including the ability to clear all configurations. - **Tool Invocation**: The AI assistant can call tools provided by configured MCP servers (such as: nmap, metasploit, ffuf, etc.) based on user requests. +- **Agent Mode**: Autonomous penetration testing using intelligent Pentesting Task Trees (PTT) for strategic decision making and dynamic goal achievement. - **Automated Pentesting Workflows**: Execute predefined penetration testing workflows that systematically use configured security tools to perform comprehensive assessments. - **Report Generation**: Generate markdown reports with structured findings, evidence, and recommendations. - **Conversation History**: Supports multi-turn dialogues, remembering previous interaction content. @@ -18,55 +19,6 @@ https://github.com/user-attachments/assets/a43d2457-7113-42cc-ad02-f378d57f4d24 - **File-Aware Tool Integration**: AI recognizes and uses actual files from the knowledge folder (wordlists, payloads, configs) with security tools. - **Configurable Models**: Supports configuration of different language model parameters. -## Automated Penetration Testing Workflows - -GHOSTCREW includes automated penetration testing workflows that provide structured, systematic security assessments. These workflows require MCP tools to be configured and connected to function properly. - -### Available Workflows - -1. **Reconnaissance and Discovery** - - Comprehensive information gathering and target profiling - - Performs reconnaissance, subdomain discovery, port scanning, technology fingerprinting, and historical data analysis - - **Steps**: 5 systematic phases - -2. **Web Application Security Assessment** - - Comprehensive web application penetration testing - - Tests for directory traversal, SQL injection, web vulnerabilities, SSL/TLS security, authentication flaws, and file inclusion - - **Steps**: 6 focused web security phases - -3. **Network Infrastructure Penetration Test** - - Network-focused penetration testing and exploitation - - Includes network scanning, service enumeration, vulnerability identification, misconfiguration testing, and exploitation attempts - - **Steps**: 6 network security phases - -4. **Complete Penetration Test** - - Full-scope penetration testing methodology - - Comprehensive assessment covering reconnaissance, enumeration, vulnerability scanning, web testing, network exploitation, post-exploitation, and reporting - - **Steps**: 7 complete assessment phases - -### Workflow Features - -- **Tool Integration**: All workflows utilize configured MCP tools (Nmap, Metasploit, Nuclei, etc.) for real security testing -- **Professional Output**: Each step provides detailed technical findings, vulnerability analysis, risk assessment, and remediation recommendations -- **Report Generation**: Automatically save reports to organized `reports/` directory with workflow-specific naming -- **Target Flexibility**: Works with IP addresses, domain names, or network ranges -- **Progress Tracking**: Real-time progress indication through each workflow step - -### Usage Requirements - -- **MCP Tools Required**: Automated workflows require at least one MCP security tool to be configured and connected -- **Access Control**: The system prevents workflow execution without proper tools to avoid generating simulated results -- **Professional Context**: Designed for authorized penetration testing and security assessments only - -### How to Use Workflows - -1. Start the Pentest Agent and configure MCP tools when prompted -2. Select "Automated Penetration Testing" from the main menu -3. Choose your desired workflow type -4. Enter the target (IP, domain, or network range) -5. Confirm execution and monitor progress -6. Optionally save results to file for documentation - ### Startup Effect
@@ -183,52 +135,7 @@ Each tool can be configured through the interactive configuration menu by select
- Responder
- Bettercap
-## File Structure
-
-```
-GHOSTCREW/
-├── .venv/ # Python virtual environment
-├── main.py # Application entry point
-├── config/ # Application configuration
-│ ├── __init__.py
-│ ├── constants.py # Constants and messages
-│ └── app_config.py # Environment and API configuration
-├── core/ # Core application logic
-│ ├── __init__.py
-│ ├── model_manager.py # Model provider and token management
-│ ├── agent_runner.py # Agent execution and streaming
-│ └── pentest_agent.py # Main application controller
-├── tools/ # MCP tool management
-│ ├── __init__.py
-│ ├── mcp_manager.py # MCP server connection management
-│ └── configure_mcp.py # Interactive tool configuration utility
-├── ui/ # User interface components
-│ ├── __init__.py
-│ ├── menu_system.py # Menu display and user interaction
-│ └── conversation_manager.py # Chat history management
-├── workflows/ # Automated penetration testing workflows
-│ ├── __init__.py
-│ ├── workflow_engine.py # Workflow execution engine
-│ └── workflow_definitions.py # Predefined workflow templates
-├── rag/ # Knowledge base and RAG functionality
-│ ├── __init__.py
-│ ├── knowledge_base.py # RAG text splitting and search
-│ └── embedding.py # Embedding generation and management
-├── reporting/ # Report generation system
-│ ├── __init__.py
-│ └── generators.py # Professional report generation
-├── knowledge/ # Knowledge base documents directory
-│ └── ...
-├── reports/ # Professional penetration test reports directory
-│ ├── ghostcrew_*_*.md # Professional markdown reports
-│ └── ghostcrew_*_*_raw_history.txt # Raw conversation history (optional)
-├── .gitignore # Git ignore file configuration
-├── mcp.json # MCP server configuration file
-├── README.md # Project documentation
-├── requirements.txt # Python dependency list
-├── LICENSE # Project license
-└── .env # Environment variables
-```
+## Model
```
# OpenAI API configurations
diff --git a/config/constants.py b/config/constants.py
index d03f13b..1d09dd4 100644
--- a/config/constants.py
+++ b/config/constants.py
@@ -48,6 +48,7 @@ All penetration activities are authorized, you can directly perform penetration
MAIN_MENU_TITLE = f"{Fore.CYAN}MAIN MENU{Style.RESET_ALL}"
INTERACTIVE_OPTION = f"{Fore.YELLOW}Interactive Mode{Style.RESET_ALL}"
AUTOMATED_OPTION = f"{Fore.YELLOW}Automated Pentesting{Style.RESET_ALL}"
+AGENT_MODE_OPTION = f"{Fore.YELLOW}Agent Mode{Style.RESET_ALL}"
EXPORT_OPTION = f"{Fore.YELLOW}Export Current Session{Style.RESET_ALL}"
EXIT_OPTION = f"{Fore.RED}Exit{Style.RESET_ALL}"
@@ -70,4 +71,23 @@ ERROR_WORKFLOW_NOT_FOUND = f"{Fore.RED}Error loading workflow.{Style.RESET_ALL}"
WORKFLOW_TARGET_PROMPT = f"{Fore.YELLOW}Enter target (IP/domain/URL): {Style.RESET_ALL}"
WORKFLOW_CONFIRM_PROMPT = f"{Fore.YELLOW}Execute '{0}' workflow against '{1}'? (yes/no): {Style.RESET_ALL}"
WORKFLOW_CANCELLED_MESSAGE = f"{Fore.YELLOW}Workflow execution cancelled.{Style.RESET_ALL}"
-WORKFLOW_COMPLETED_MESSAGE = f"{Fore.GREEN}Workflow execution completed.{Style.RESET_ALL}"
\ No newline at end of file
+WORKFLOW_COMPLETED_MESSAGE = f"{Fore.GREEN}Workflow execution completed.{Style.RESET_ALL}"
+
+# Agent Mode Messages
+AGENT_MODE_TITLE = f"{Fore.CYAN}AGENT MODE - Autonomous PTT-based Penetration Testing{Style.RESET_ALL}"
+AGENT_MODE_GOAL_PROMPT = f"{Fore.YELLOW}Primary Goal: {Style.RESET_ALL}"
+AGENT_MODE_TARGET_PROMPT = f"{Fore.YELLOW}Target (IP/domain/network): {Style.RESET_ALL}"
+AGENT_MODE_INIT_SUCCESS = f"{Fore.GREEN}Agent Mode initialized successfully!{Style.RESET_ALL}"
+AGENT_MODE_INIT_FAILED = f"{Fore.RED}Failed to initialize Agent Mode.{Style.RESET_ALL}"
+AGENT_MODE_PAUSED = f"{Fore.YELLOW}Agent Mode paused.{Style.RESET_ALL}"
+AGENT_MODE_RESUMED = f"{Fore.GREEN}Agent Mode resumed.{Style.RESET_ALL}"
+AGENT_MODE_COMPLETED = f"{Fore.GREEN}Agent Mode execution completed.{Style.RESET_ALL}"
+
+# PTT Status Messages
+PTT_TASK_PENDING = f"{Fore.WHITE}○{Style.RESET_ALL}"
+PTT_TASK_IN_PROGRESS = f"{Fore.YELLOW}◐{Style.RESET_ALL}"
+PTT_TASK_COMPLETED = f"{Fore.GREEN}●{Style.RESET_ALL}"
+PTT_TASK_FAILED = f"{Fore.RED}✗{Style.RESET_ALL}"
+PTT_TASK_BLOCKED = f"{Fore.LIGHTBLACK_EX}□{Style.RESET_ALL}"
+PTT_TASK_VULNERABLE = f"{Fore.RED}⚠{Style.RESET_ALL}"
+PTT_TASK_NOT_VULNERABLE = f"{Fore.GREEN}✓{Style.RESET_ALL}"
\ No newline at end of file
diff --git a/core/__init__.py b/core/__init__.py
index ca65a3d..9ce975d 100644
--- a/core/__init__.py
+++ b/core/__init__.py
@@ -1 +1,20 @@
-"""Core application logic for GHOSTCREW."""
\ No newline at end of file
+"""Core GHOSTCREW modules."""
+
+from .pentest_agent import PentestAgent
+from .agent_runner import AgentRunner
+from .model_manager import model_manager
+from .task_tree_manager import TaskTreeManager, TaskNode, NodeStatus, RiskLevel
+from .ptt_reasoning import PTTReasoningModule
+from .agent_mode_controller import AgentModeController
+
+__all__ = [
+ 'PentestAgent',
+ 'AgentRunner',
+ 'model_manager',
+ 'TaskTreeManager',
+ 'TaskNode',
+ 'NodeStatus',
+ 'RiskLevel',
+ 'PTTReasoningModule',
+ 'AgentModeController'
+]
\ No newline at end of file
diff --git a/core/agent_mode_controller.py b/core/agent_mode_controller.py
new file mode 100644
index 0000000..b20c4d7
--- /dev/null
+++ b/core/agent_mode_controller.py
@@ -0,0 +1,870 @@
+"""Agent Mode Controller for autonomous PTT-based penetration testing."""
+
+import asyncio
+import json
+from typing import List, Dict, Any, Optional, Tuple
+from datetime import datetime
+from colorama import Fore, Style
+
+from core.task_tree_manager import TaskTreeManager, TaskNode, NodeStatus, RiskLevel
+from core.ptt_reasoning import PTTReasoningModule
+from core.model_manager import model_manager
+from config.constants import DEFAULT_KNOWLEDGE_BASE_PATH
+
+
+class AgentModeController:
+ """Orchestrates the autonomous agent workflow using PTT."""
+
+ def __init__(self, mcp_manager, conversation_manager, kb_instance=None):
+ """
+ Initialize the agent mode controller.
+
+ Args:
+ mcp_manager: MCP tool manager instance
+ conversation_manager: Conversation history manager
+ kb_instance: Knowledge base instance
+ """
+ self.mcp_manager = mcp_manager
+ self.conversation_manager = conversation_manager
+ self.kb_instance = kb_instance
+ self.tree_manager = TaskTreeManager()
+ self.reasoning_module = PTTReasoningModule(self.tree_manager)
+ self.max_iterations = 50 # Safety limit
+ self.iteration_count = 0
+ self.start_time = None
+ self.paused = False
+ self.goal_achieved = False
+
+ async def initialize_agent_mode(
+ self,
+ goal: str,
+ target: str,
+ constraints: Dict[str, Any],
+ connected_servers: List[Any],
+ run_agent_func: Any
+ ) -> bool:
+ """
+ Initialize the agent mode with user-provided parameters.
+
+ Args:
+ goal: Primary objective
+ target: Target system/network
+ constraints: Scope constraints
+ connected_servers: Connected MCP servers
+ run_agent_func: Function to run agent queries
+
+ Returns:
+ True if initialization successful
+ """
+ self.connected_servers = connected_servers
+ self.run_agent_func = run_agent_func
+ self.start_time = datetime.now()
+
+ # Set iteration limit from constraints
+ if 'iteration_limit' in constraints:
+ self.max_iterations = constraints['iteration_limit']
+
+ print(f"\n{Fore.CYAN}Initializing Agent Mode...{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Goal: {goal}{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Target: {target}{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Iteration Limit: {self.max_iterations}{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Constraints: {json.dumps(constraints, indent=2)}{Style.RESET_ALL}")
+
+ # Initialize the task tree
+ self.tree_manager.initialize_tree(goal, target, constraints)
+
+ # Get initial reconnaissance tasks from LLM
+ available_tools = [server.name for server in self.connected_servers]
+ init_prompt = self.reasoning_module.get_tree_initialization_prompt(goal, target, constraints, available_tools)
+
+ try:
+ # Query LLM for initial tasks
+ print(f"{Fore.YELLOW}Requesting initial tasks from AI (Available tools: {', '.join(available_tools)})...{Style.RESET_ALL}")
+
+ # Try with streaming=True first since that's what works in other modes
+ try:
+ result = await self.run_agent_func(
+ init_prompt,
+ self.connected_servers,
+ history=[],
+ streaming=True,
+ kb_instance=self.kb_instance
+ )
+ print(f"{Fore.GREEN}Agent runner completed (streaming=True){Style.RESET_ALL}")
+ except Exception as stream_error:
+ print(f"{Fore.YELLOW}Streaming mode failed: {stream_error}{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}Trying with streaming=False...{Style.RESET_ALL}")
+ result = await self.run_agent_func(
+ init_prompt,
+ self.connected_servers,
+ history=[],
+ streaming=False,
+ kb_instance=self.kb_instance
+ )
+ print(f"{Fore.GREEN}Agent runner completed (streaming=False){Style.RESET_ALL}")
+
+ print(f"{Fore.YELLOW}Parsing AI response...{Style.RESET_ALL}")
+
+ # Debug: Check what we got back
+ if not result:
+ print(f"{Fore.RED}No result returned from agent runner{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}This usually indicates an LLM configuration issue{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}Falling back to default reconnaissance tasks...{Style.RESET_ALL}")
+
+ # Use default tasks instead
+ initial_tasks = self._get_default_initial_tasks(target, available_tools)
+ else:
+ print(f"{Fore.GREEN}Got result from agent runner: {type(result)}{Style.RESET_ALL}")
+
+ # Check different possible response formats
+ response_text = None
+ if hasattr(result, "final_output"):
+ response_text = result.final_output
+ print(f"{Fore.CYAN}Using result.final_output{Style.RESET_ALL}")
+ elif hasattr(result, "output"):
+ response_text = result.output
+ print(f"{Fore.CYAN}Using result.output{Style.RESET_ALL}")
+ elif hasattr(result, "content"):
+ response_text = result.content
+ print(f"{Fore.CYAN}Using result.content{Style.RESET_ALL}")
+ elif isinstance(result, str):
+ response_text = result
+ print(f"{Fore.CYAN}Using result as string{Style.RESET_ALL}")
+ else:
+ print(f"{Fore.RED}Unknown result format: {type(result)}{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}Result attributes: {dir(result)}{Style.RESET_ALL}")
+
+ # Try to get any text content from the result
+ for attr in ['text', 'message', 'response', 'data']:
+ if hasattr(result, attr):
+ response_text = getattr(result, attr)
+ print(f"{Fore.CYAN}Found text in result.{attr}{Style.RESET_ALL}")
+ break
+
+ if not response_text:
+ print(f"{Fore.RED}No response text found in result{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}Using fallback initialization...{Style.RESET_ALL}")
+ initialization_data = self._get_fallback_initialization(target, available_tools)
+ else:
+ print(f"{Fore.GREEN}Got response: {len(response_text)} characters{Style.RESET_ALL}")
+
+ # Parse the response
+ initialization_data = self.reasoning_module.parse_tree_initialization_response(response_text)
+
+ if not initialization_data or not initialization_data.get('initial_tasks'):
+ print(f"{Fore.YELLOW}No tasks parsed from response. Using fallback initialization.{Style.RESET_ALL}")
+ initialization_data = self._get_fallback_initialization(target, available_tools)
+ else:
+ analysis = initialization_data.get('analysis', '')
+ print(f"{Fore.CYAN}LLM determined approach: {analysis}{Style.RESET_ALL}")
+
+ # Create the structure and tasks as determined by the LLM
+ structure_nodes = {}
+
+ # Create structure elements (phases, categories, etc.)
+ for structure_element in initialization_data.get('structure', []):
+ structure_node = TaskNode(
+ description=structure_element.get('name', 'Unknown Structure'),
+ parent_id=self.tree_manager.root_id,
+ node_type=structure_element.get('type', 'phase'),
+ attributes={
+ "details": structure_element.get('description', ''),
+ "justification": structure_element.get('justification', ''),
+ "llm_created": True
+ }
+ )
+ node_id = self.tree_manager.add_node(structure_node)
+ structure_nodes[structure_element.get('name', 'Unknown')] = node_id
+
+ # Add initial tasks to their specified parents
+ initial_tasks = initialization_data.get('initial_tasks', [])
+ for task_data in initial_tasks:
+ parent_name = task_data.get('parent', 'root')
+
+ # Determine parent node
+ if parent_name == 'root':
+ parent_id = self.tree_manager.root_id
+ else:
+ parent_id = structure_nodes.get(parent_name, self.tree_manager.root_id)
+
+ task_node = TaskNode(
+ description=task_data.get('description', 'Unknown task'),
+ parent_id=parent_id,
+ tool_used=task_data.get('tool_suggestion'),
+ priority=task_data.get('priority', 5),
+ risk_level=task_data.get('risk_level', 'low'),
+ attributes={'rationale': task_data.get('rationale', ''), 'llm_created': True}
+ )
+ self.tree_manager.add_node(task_node)
+
+ print(f"\n{Fore.GREEN}Agent Mode initialized with LLM-determined structure: {len(initialization_data.get('structure', []))} elements, {len(initial_tasks)} tasks.{Style.RESET_ALL}")
+
+ # Display the initial tree
+ print(f"\n{Fore.CYAN}Initial Task Tree:{Style.RESET_ALL}")
+ print(self.tree_manager.to_natural_language())
+
+ return True
+
+ except Exception as e:
+ print(f"{Fore.RED}Failed to initialize agent mode: {e}{Style.RESET_ALL}")
+ import traceback
+ traceback.print_exc()
+
+ # Try to continue with default tasks even if there's an error
+ print(f"{Fore.YELLOW}Attempting to continue with default tasks...{Style.RESET_ALL}")
+ try:
+ initialization_data = self._get_fallback_initialization(target, available_tools)
+
+ # Create structure and tasks from fallback
+ structure_nodes = {}
+
+ # Create structure elements
+ for structure_element in initialization_data.get('structure', []):
+ structure_node = TaskNode(
+ description=structure_element.get('name', 'Unknown Structure'),
+ parent_id=self.tree_manager.root_id,
+ node_type=structure_element.get('type', 'phase'),
+ attributes={
+ "details": structure_element.get('description', ''),
+ "justification": structure_element.get('justification', ''),
+ "fallback_created": True
+ }
+ )
+ node_id = self.tree_manager.add_node(structure_node)
+ structure_nodes[structure_element.get('name', 'Unknown')] = node_id
+
+ # Add initial tasks
+ initial_tasks = initialization_data.get('initial_tasks', [])
+ for task_data in initial_tasks:
+ parent_name = task_data.get('parent', 'root')
+
+ if parent_name == 'root':
+ parent_id = self.tree_manager.root_id
+ else:
+ parent_id = structure_nodes.get(parent_name, self.tree_manager.root_id)
+
+ task_node = TaskNode(
+ description=task_data.get('description', 'Unknown task'),
+ parent_id=parent_id,
+ tool_used=task_data.get('tool_suggestion'),
+ priority=task_data.get('priority', 5),
+ risk_level=task_data.get('risk_level', 'low'),
+ attributes={'rationale': task_data.get('rationale', ''), 'fallback_created': True}
+ )
+ self.tree_manager.add_node(task_node)
+
+ print(f"\n{Fore.GREEN}Agent Mode initialized with fallback structure: {len(initialization_data.get('structure', []))} elements, {len(initial_tasks)} tasks.{Style.RESET_ALL}")
+ print(f"\n{Fore.CYAN}Initial Task Tree:{Style.RESET_ALL}")
+ print(self.tree_manager.to_natural_language())
+ return True
+ except Exception as fallback_error:
+ print(f"{Fore.RED}Fallback initialization also failed: {fallback_error}{Style.RESET_ALL}")
+
+ return False
+
+ async def run_autonomous_loop(self) -> None:
+ """Run the main autonomous agent loop."""
+ print(f"\n{Fore.CYAN}Starting autonomous penetration test...{Style.RESET_ALL}")
+ if self.max_iterations == 0:
+ print(f"{Fore.YELLOW}Running until goal achieved or no more actions available{Style.RESET_ALL}")
+ else:
+ print(f"{Fore.YELLOW}Iteration limit: {self.max_iterations} iterations{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}Press Ctrl+C to pause at any time.{Style.RESET_ALL}\n")
+
+ # Set effective limit - use a high number for "unlimited" but still have a safety limit
+ effective_limit = self.max_iterations if self.max_iterations > 0 else 500
+
+ while self.iteration_count < effective_limit and not self.goal_achieved:
+ try:
+ if self.paused:
+ await self._handle_pause()
+ if self.paused: # Still paused after handling
+ break
+
+ # Increment iteration count at the beginning
+ self.iteration_count += 1
+
+ # Display current progress
+ self._display_progress()
+
+ # Get next action from PTT
+ next_action = await self._select_next_action()
+ if not next_action:
+ print(f"{Fore.YELLOW}No viable next actions found. Checking goal status...{Style.RESET_ALL}")
+ await self._check_goal_achievement()
+ break
+
+ # Execute the selected action
+ await self._execute_action(next_action)
+
+ # Check goal achievement after every iteration
+ await self._check_goal_achievement()
+
+ # If goal is achieved, stop the loop
+ if self.goal_achieved:
+ break
+
+ # Brief pause between actions
+ await asyncio.sleep(2)
+
+ except KeyboardInterrupt:
+ print(f"\n{Fore.YELLOW}Pausing agent mode...{Style.RESET_ALL}")
+ self.paused = True
+ except Exception as e:
+ print(f"{Fore.RED}Error in autonomous loop: {e}{Style.RESET_ALL}")
+ await asyncio.sleep(5)
+
+ # Display final reason for stopping
+ if self.goal_achieved:
+ print(f"\n{Fore.GREEN}Autonomous execution stopped: Goal achieved!{Style.RESET_ALL}")
+ elif self.iteration_count >= effective_limit:
+ print(f"\n{Fore.YELLOW}Autonomous execution stopped: Iteration limit reached ({effective_limit}){Style.RESET_ALL}")
+ elif self.paused:
+ print(f"\n{Fore.YELLOW}Autonomous execution stopped: User paused{Style.RESET_ALL}")
+ else:
+ print(f"\n{Fore.YELLOW}Autonomous execution stopped: No more viable actions{Style.RESET_ALL}")
+
+ # Final summary
+ self._display_final_summary()
+
+ async def _select_next_action(self) -> Optional[Dict[str, Any]]:
+ """Select the next action based on PTT state."""
+ # Get available tools
+ available_tools = [server.name for server in self.connected_servers]
+
+ # Get prioritized candidate tasks
+ candidates = self.tree_manager.get_candidate_tasks()
+ if not candidates:
+ return None
+
+ prioritized = self.tree_manager.prioritize_tasks(candidates)
+
+ print(f"\n{Fore.CYAN}Selecting next action...{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Available tools: {', '.join(available_tools)}{Style.RESET_ALL}")
+
+ # Query LLM for action selection
+ selection_prompt = self.reasoning_module.get_next_action_prompt(available_tools)
+
+ try:
+ result = await self.run_agent_func(
+ selection_prompt,
+ self.connected_servers,
+ history=self.conversation_manager.get_history(),
+ streaming=True, # Use streaming=True since it works in other modes
+ kb_instance=self.kb_instance
+ )
+
+ # Handle response format variations
+ response_text = None
+ if hasattr(result, "final_output"):
+ response_text = result.final_output
+ elif hasattr(result, "output"):
+ response_text = result.output
+ elif isinstance(result, str):
+ response_text = result
+
+ if response_text:
+ action_data = self.reasoning_module.parse_next_action_response(response_text, available_tools)
+
+ if action_data:
+ # Get the selected task
+ task_index = action_data.get('selected_task_index', 1) - 1
+ if 0 <= task_index < len(prioritized):
+ selected_task = prioritized[task_index]
+
+ return {
+ 'task': selected_task,
+ 'command': action_data.get('command'),
+ 'tool': action_data.get('tool'),
+ 'rationale': action_data.get('rationale'),
+ 'expected_outcome': action_data.get('expected_outcome')
+ }
+
+ except Exception as e:
+ print(f"{Fore.RED}Error selecting next action: {e}{Style.RESET_ALL}")
+
+ # Fallback to first prioritized task
+ if prioritized:
+ return {'task': prioritized[0], 'command': None, 'tool': None}
+
+ return None
+
+ async def _execute_action(self, action: Dict[str, Any]) -> None:
+ """Execute a selected action."""
+ task = action['task']
+ command = action.get('command')
+ tool = action.get('tool')
+ available_tools = [server.name for server in self.connected_servers]
+
+ print(f"\n{Fore.CYAN}Executing: {task.description}{Style.RESET_ALL}")
+ if action.get('rationale'):
+ print(f"{Fore.WHITE}Rationale: {action['rationale']}{Style.RESET_ALL}")
+ if command:
+ print(f"{Fore.WHITE}Command: {command}{Style.RESET_ALL}")
+ if tool:
+ print(f"{Fore.WHITE}Using tool: {tool}{Style.RESET_ALL}")
+
+ # Check if suggested tool is available
+ if tool and tool not in available_tools and tool != 'manual':
+ print(f"{Fore.YELLOW}Tool '{tool}' not available. Available: {', '.join(available_tools)}{Style.RESET_ALL}")
+ print(f"{Fore.CYAN}Asking AI to adapt approach with available tools...{Style.RESET_ALL}")
+
+ # Let AI figure out how to adapt
+ adaptation_query = f"""The task "{task.description}" was planned to use "{tool}" but that tool is not available.
+
+Available tools: {', '.join(available_tools)}
+
+Please adapt this task to work with the available tools. How would you accomplish this objective using {', '.join(available_tools)}?
+Be creative and think about alternative approaches that achieve the same security testing goal."""
+
+ command = adaptation_query
+
+ # Update task status to in-progress
+ self.tree_manager.update_node(task.id, {'status': NodeStatus.IN_PROGRESS.value})
+
+ # Execute via agent
+ execution_query = command if command else f"Perform the following task: {task.description}"
+
+ try:
+ result = await self.run_agent_func(
+ execution_query,
+ self.connected_servers,
+ history=self.conversation_manager.get_history(),
+ streaming=True,
+ kb_instance=self.kb_instance
+ )
+
+ # Handle response format variations
+ response_text = None
+ if hasattr(result, "final_output"):
+ response_text = result.final_output
+ elif hasattr(result, "output"):
+ response_text = result.output
+ elif isinstance(result, str):
+ response_text = result
+
+ if response_text:
+ # Update conversation history
+ self.conversation_manager.add_dialogue(execution_query)
+ self.conversation_manager.update_last_response(response_text)
+
+ # Update PTT based on results
+ await self._update_tree_from_results(
+ task,
+ response_text,
+ command or execution_query
+ )
+
+ except Exception as e:
+ print(f"{Fore.RED}Error executing action: {e}{Style.RESET_ALL}")
+ self.tree_manager.update_node(task.id, {
+ 'status': NodeStatus.FAILED.value,
+ 'findings': f"Execution failed: {str(e)}"
+ })
+
+ async def _update_tree_from_results(self, task: TaskNode, output: str, command: str) -> None:
+ """Update the PTT based on execution results."""
+ try:
+ # Create update prompt
+ update_prompt = self.reasoning_module.get_tree_update_prompt(output, command, task)
+
+ # Get LLM analysis
+ result = await self.run_agent_func(
+ update_prompt,
+ self.connected_servers,
+ history=self.conversation_manager.get_history(),
+ streaming=True,
+ kb_instance=self.kb_instance
+ )
+
+ # Handle response format variations
+ response_text = None
+ if hasattr(result, "final_output"):
+ response_text = result.final_output
+ elif hasattr(result, "output"):
+ response_text = result.output
+ elif isinstance(result, str):
+ response_text = result
+
+ if response_text:
+ node_updates, new_tasks = self.reasoning_module.parse_tree_update_response(response_text)
+
+ # Update the executed node
+ if node_updates:
+ node_updates['timestamp'] = datetime.now().isoformat()
+ node_updates['command_executed'] = command
+ self.tree_manager.update_node(task.id, node_updates)
+
+ # Check if goal might be achieved before adding new tasks
+ preliminary_goal_check = await self._quick_goal_check()
+
+ # Only add new tasks if goal is not achieved and they align with original goal
+ if not preliminary_goal_check and new_tasks:
+ # Filter tasks to ensure they align with the original goal
+ filtered_tasks = self._filter_tasks_by_goal_scope(new_tasks)
+
+ for new_task_data in filtered_tasks:
+ parent_phase = new_task_data.get('parent_phase', 'Phase 2')
+ parent_node = self._find_phase_node(parent_phase)
+
+ if parent_node:
+ new_task = TaskNode(
+ description=new_task_data.get('description'),
+ parent_id=parent_node.id,
+ tool_used=new_task_data.get('tool_suggestion'),
+ priority=new_task_data.get('priority', 5),
+ risk_level=new_task_data.get('risk_level', 'low'),
+ attributes={'rationale': new_task_data.get('rationale', '')}
+ )
+ self.tree_manager.add_node(new_task)
+
+ if filtered_tasks:
+ print(f"{Fore.GREEN}PTT updated with {len(filtered_tasks)} new goal-aligned tasks.{Style.RESET_ALL}")
+ if len(filtered_tasks) < len(new_tasks):
+ print(f"{Fore.YELLOW}Filtered out {len(new_tasks) - len(filtered_tasks)} tasks that exceeded goal scope.{Style.RESET_ALL}")
+ elif preliminary_goal_check:
+ print(f"{Fore.GREEN}Goal appears to be achieved - not adding new tasks.{Style.RESET_ALL}")
+
+ except Exception as e:
+ print(f"{Fore.RED}Error updating PTT: {e}{Style.RESET_ALL}")
+ # Default to marking as completed if update fails
+ self.tree_manager.update_node(task.id, {
+ 'status': NodeStatus.COMPLETED.value,
+ 'timestamp': datetime.now().isoformat()
+ })
+
+ def _filter_tasks_by_goal_scope(self, tasks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+ """Filter tasks to ensure they align with the original goal scope."""
+ goal_lower = self.tree_manager.goal.lower()
+ filtered_tasks = []
+
+ # Define scope expansion keywords that should be avoided for simple goals
+ expansion_keywords = ["exploit", "compromise", "attack", "penetrate", "shell", "backdoor", "privilege", "escalat"]
+
+ # Check if the original goal is simple information gathering
+ info_keywords = ["check", "identify", "determine", "find", "discover", "enumerate", "list", "version", "banner"]
+ is_simple_info_goal = any(keyword in goal_lower for keyword in info_keywords)
+
+ for task in tasks:
+ task_desc_lower = task.get('description', '').lower()
+
+ # If it's a simple info goal, avoid adding exploitation tasks
+ if is_simple_info_goal and any(keyword in task_desc_lower for keyword in expansion_keywords):
+ print(f"{Fore.YELLOW}Skipping task that exceeds goal scope: {task.get('description', '')}{Style.RESET_ALL}")
+ continue
+
+ filtered_tasks.append(task)
+
+ return filtered_tasks
+
+ async def _quick_goal_check(self) -> bool:
+ """Quick check if goal might be achieved based on completed tasks."""
+ # Simple heuristic: if we have completed tasks with findings for info gathering goals
+ goal_lower = self.tree_manager.goal.lower()
+ info_keywords = ["check", "identify", "determine", "find", "discover", "enumerate", "list", "version", "banner"]
+
+ if any(keyword in goal_lower for keyword in info_keywords):
+ # For info gathering goals, check if we have relevant findings
+ for node in self.tree_manager.nodes.values():
+ if node.status == NodeStatus.COMPLETED and node.findings:
+ # Basic keyword matching for goal completion
+ if "version" in goal_lower and "version" in node.findings.lower():
+ return True
+ if "banner" in goal_lower and "banner" in node.findings.lower():
+ return True
+ if any(keyword in goal_lower and keyword in node.description.lower() for keyword in info_keywords):
+ return True
+
+ return False
+
+ async def _check_goal_achievement(self) -> None:
+ """Check if the primary goal has been achieved."""
+ goal_prompt = self.reasoning_module.get_goal_check_prompt()
+
+ try:
+ result = await self.run_agent_func(
+ goal_prompt,
+ self.connected_servers,
+ history=self.conversation_manager.get_history(),
+ streaming=True, # Use streaming=True
+ kb_instance=self.kb_instance
+ )
+
+ # Handle response format variations
+ response_text = None
+ if hasattr(result, "final_output"):
+ response_text = result.final_output
+ elif hasattr(result, "output"):
+ response_text = result.output
+ elif isinstance(result, str):
+ response_text = result
+
+ if response_text:
+ goal_status = self.reasoning_module.parse_goal_check_response(response_text)
+
+ if goal_status.get('goal_achieved', False):
+ confidence = goal_status.get('confidence', 0)
+ if confidence >= 80:
+ print(f"\n{Fore.GREEN}{'='*60}{Style.RESET_ALL}")
+ print(f"{Fore.GREEN}GOAL ACHIEVED! (Confidence: {confidence}%){Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Evidence: {goal_status.get('evidence', 'N/A')}{Style.RESET_ALL}")
+ print(f"{Fore.GREEN}{'='*60}{Style.RESET_ALL}\n")
+ self.goal_achieved = True
+ else:
+ print(f"{Fore.YELLOW}Goal possibly achieved but confidence is low ({confidence}%). Continuing...{Style.RESET_ALL}")
+ else:
+ remaining = goal_status.get('remaining_objectives', 'Unknown')
+ print(f"{Fore.YELLOW}Goal not yet achieved. Remaining: {remaining}{Style.RESET_ALL}")
+
+ except Exception as e:
+ print(f"{Fore.RED}Error checking goal achievement: {e}{Style.RESET_ALL}")
+
+ def _display_progress(self) -> None:
+ """Display current progress and statistics."""
+ stats = self.tree_manager.get_statistics()
+ elapsed = datetime.now() - self.start_time if self.start_time else None
+
+ print(f"\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
+ print(f"{Fore.CYAN}Iteration {self.iteration_count} | Elapsed: {elapsed}{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Tasks - Total: {stats['total_nodes']} | "
+ f"Completed: {stats['status_counts'].get('completed', 0)} | "
+ f"In Progress: {stats['status_counts'].get('in_progress', 0)} | "
+ f"Pending: {stats['status_counts'].get('pending', 0)}{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}Vulnerabilities Found: {stats['status_counts'].get('vulnerable', 0)}{Style.RESET_ALL}")
+ print(f"{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
+
+ def _display_final_summary(self) -> None:
+ """Display final summary of the agent mode execution."""
+ print(f"\n{Fore.CYAN}{'='*70}{Style.RESET_ALL}")
+ print(f"{Fore.CYAN}AGENT MODE EXECUTION SUMMARY{Style.RESET_ALL}")
+ print(f"{Fore.CYAN}{'='*70}{Style.RESET_ALL}")
+
+ summary = self.reasoning_module.generate_strategic_summary()
+ print(f"{Fore.WHITE}{summary}{Style.RESET_ALL}")
+
+ # Calculate effective limit
+ effective_limit = self.max_iterations if self.max_iterations > 0 else 500
+
+ # Execution Statistics
+ if self.start_time:
+ elapsed = datetime.now() - self.start_time
+ print(f"\n{Fore.WHITE}Execution Statistics:{Style.RESET_ALL}")
+ print(f"Total Execution Time: {elapsed}")
+ print(f"Iterations Completed: {self.iteration_count}/{effective_limit}")
+
+ if self.iteration_count > 0:
+ avg_time_per_iteration = elapsed.total_seconds() / self.iteration_count
+ print(f"Average Time per Iteration: {avg_time_per_iteration:.1f} seconds")
+
+ # Calculate efficiency
+ completion_rate = (self.iteration_count / effective_limit) * 100
+ print(f"Completion Rate: {completion_rate:.1f}%")
+
+ # Estimate remaining time if stopped early
+ if self.iteration_count < effective_limit and not self.goal_achieved:
+ remaining_iterations = effective_limit - self.iteration_count
+ estimated_remaining_time = remaining_iterations * avg_time_per_iteration
+ print(f"Estimated Time for Full Run: {elapsed.total_seconds() + estimated_remaining_time:.0f} seconds")
+
+ print(f"{Fore.WHITE}Total Iterations: {self.iteration_count}{Style.RESET_ALL}")
+
+ if self.goal_achieved:
+ print(f"\n{Fore.GREEN}PRIMARY GOAL ACHIEVED!{Style.RESET_ALL}")
+ efficiency = "Excellent" if self.iteration_count <= 10 else "Good" if self.iteration_count <= 20 else "Extended"
+ print(f"{Fore.GREEN}Efficiency: {efficiency} (achieved in {self.iteration_count} iterations){Style.RESET_ALL}")
+ else:
+ print(f"\n{Fore.YELLOW}Primary goal not fully achieved within iteration limit.{Style.RESET_ALL}")
+ if self.iteration_count >= effective_limit:
+ print(f"{Fore.YELLOW}Consider increasing iteration limit for more thorough testing.{Style.RESET_ALL}")
+
+ print(f"{Fore.CYAN}{'='*70}{Style.RESET_ALL}")
+
+ async def _handle_pause(self) -> None:
+ """Handle pause state and user options."""
+ print(f"\n{Fore.YELLOW}Agent Mode Paused{Style.RESET_ALL}")
+
+ # Calculate effective limit
+ effective_limit = self.max_iterations if self.max_iterations > 0 else 500
+
+ # Display current progress
+ print(f"\n{Fore.MAGENTA}Progress Statistics:{Style.RESET_ALL}")
+ print(f"Iterations: {self.iteration_count}/{effective_limit}")
+ elapsed = datetime.now() - self.start_time if self.start_time else None
+ if elapsed:
+ print(f"Elapsed Time: {elapsed}")
+ if self.iteration_count > 0:
+ avg_time = elapsed.total_seconds() / self.iteration_count
+ print(f"Average per Iteration: {avg_time:.1f} seconds")
+
+ print("\nOptions:")
+ print("1. Resume execution")
+ print("2. View current PTT")
+ print("3. View detailed statistics")
+ print("4. Save PTT state")
+ print("5. Add manual task")
+ print("6. Modify iteration limit")
+ print("7. Exit agent mode")
+
+ while self.paused:
+ choice = input(f"\n{Fore.GREEN}Select option (1-7): {Style.RESET_ALL}").strip()
+
+ if choice == '1':
+ self.paused = False
+ print(f"{Fore.GREEN}Resuming agent mode...{Style.RESET_ALL}")
+ elif choice == '2':
+ print(f"\n{Fore.CYAN}Current PTT State:{Style.RESET_ALL}")
+ print(self.tree_manager.to_natural_language())
+ elif choice == '3':
+ self._display_progress()
+ print(self.reasoning_module.generate_strategic_summary())
+ elif choice == '4':
+ self._save_ptt_state()
+ elif choice == '5':
+ await self._add_manual_task()
+ elif choice == '6':
+ self._modify_iteration_limit()
+ elif choice == '7':
+ print(f"{Fore.YELLOW}Exiting agent mode...{Style.RESET_ALL}")
+ break
+ else:
+ print(f"{Fore.RED}Invalid choice.{Style.RESET_ALL}")
+
+ def _modify_iteration_limit(self) -> None:
+ """Allow user to modify the iteration limit during execution."""
+ try:
+ print(f"\n{Fore.CYAN}Modify Iteration Limit{Style.RESET_ALL}")
+ print(f"Current limit: {self.max_iterations}")
+ print(f"Iterations completed: {self.iteration_count}")
+ print(f"Iterations remaining: {self.max_iterations - self.iteration_count}")
+
+ new_limit_input = input(f"New iteration limit (current: {self.max_iterations}): ").strip()
+
+ if new_limit_input:
+ try:
+ new_limit = int(new_limit_input)
+
+ # Ensure new limit is at least the number of iterations already completed
+ min_limit = self.iteration_count + 1 # Allow at least 1 more iteration
+ if new_limit < min_limit:
+ print(f"{Fore.YELLOW}Minimum limit is {min_limit} (iterations already completed + 1){Style.RESET_ALL}")
+ new_limit = min_limit
+
+ # Apply reasonable maximum
+ new_limit = min(new_limit, 200)
+
+ self.max_iterations = new_limit
+ print(f"{Fore.GREEN}Iteration limit updated to: {new_limit}{Style.RESET_ALL}")
+
+ except ValueError:
+ print(f"{Fore.RED}Invalid input. Please enter a number.{Style.RESET_ALL}")
+ else:
+ print(f"{Fore.YELLOW}No change made.{Style.RESET_ALL}")
+
+ except Exception as e:
+ print(f"{Fore.RED}Error modifying iteration limit: {e}{Style.RESET_ALL}")
+
+ async def _add_manual_task(self) -> None:
+ """Allow user to manually add a task to the PTT."""
+ try:
+ print(f"\n{Fore.CYAN}Add Manual Task{Style.RESET_ALL}")
+
+ # Get task details from user
+ description = input("Task description: ").strip()
+ if not description:
+ print(f"{Fore.RED}Task description required.{Style.RESET_ALL}")
+ return
+
+ print("Select phase:")
+ print("1. Phase 1: Reconnaissance")
+ print("2. Phase 2: Vulnerability Assessment")
+ print("3. Phase 3: Exploitation")
+ print("4. Phase 4: Post-Exploitation")
+
+ phase_choice = input("Phase (1-4): ").strip()
+ phase_map = {
+ '1': 'Phase 1',
+ '2': 'Phase 2',
+ '3': 'Phase 3',
+ '4': 'Phase 4'
+ }
+
+ phase = phase_map.get(phase_choice, 'Phase 2')
+ parent_node = self._find_phase_node(phase)
+
+ if not parent_node:
+ print(f"{Fore.RED}Phase not found.{Style.RESET_ALL}")
+ return
+
+ # Get priority
+ try:
+ priority = int(input("Priority (1-10, default 5): ").strip() or "5")
+ priority = max(1, min(10, priority))
+ except:
+ priority = 5
+
+ # Get risk level
+ print("Risk level:")
+ print("1. Low")
+ print("2. Medium")
+ print("3. High")
+ risk_choice = input("Risk (1-3, default 2): ").strip()
+ risk_map = {'1': 'low', '2': 'medium', '3': 'high'}
+ risk_level = risk_map.get(risk_choice, 'medium')
+
+ # Create the task
+ from core.task_tree_manager import TaskNode
+ manual_task = TaskNode(
+ description=description,
+ parent_id=parent_node.id,
+ tool_used='manual',
+ priority=priority,
+ risk_level=risk_level,
+ attributes={'manual_addition': True, 'added_by_user': True}
+ )
+
+ self.tree_manager.add_node(manual_task)
+ print(f"{Fore.GREEN}Manual task added to {phase}.{Style.RESET_ALL}")
+
+ except Exception as e:
+ print(f"{Fore.RED}Error adding manual task: {e}{Style.RESET_ALL}")
+
+ def _save_ptt_state(self) -> None:
+ """Save the current PTT state to file."""
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ filename = f"reports/ptt_state_{timestamp}.json"
+
+ try:
+ import os
+ os.makedirs("reports", exist_ok=True)
+
+ with open(filename, 'w') as f:
+ f.write(self.tree_manager.to_json())
+
+ print(f"{Fore.GREEN}PTT state saved to: {filename}{Style.RESET_ALL}")
+ except Exception as e:
+ print(f"{Fore.RED}Failed to save PTT state: {e}{Style.RESET_ALL}")
+
+ def _find_phase_node(self, phase_description: str) -> Optional[TaskNode]:
+ """Find a structure node by description (phase, category, etc.)."""
+ for node in self.tree_manager.nodes.values():
+ # Look for any non-task node that matches the description
+ if node.node_type != "task" and phase_description.lower() in node.description.lower():
+ return node
+
+ # If no exact match, try to find any suitable parent node
+ # Return root if no structure nodes exist
+ return self.tree_manager.nodes.get(self.tree_manager.root_id)
+
+ def get_ptt_for_reporting(self) -> TaskTreeManager:
+ """Get the PTT for report generation."""
+ return self.tree_manager
+
+ def _get_fallback_initialization(self, target: str, available_tools: List[str]) -> Dict[str, Any]:
+ """Return minimal fallback initialization when LLM fails."""
+ print(f"{Fore.YELLOW}Using minimal fallback initialization. The system will rely on dynamic task generation.{Style.RESET_ALL}")
+
+ return {
+ 'analysis': 'Fallback initialization - LLM will determine structure dynamically during execution',
+ 'structure': [],
+ 'initial_tasks': []
+ }
\ No newline at end of file
diff --git a/core/pentest_agent.py b/core/pentest_agent.py
index 49a7706..67bb0e8 100644
--- a/core/pentest_agent.py
+++ b/core/pentest_agent.py
@@ -13,6 +13,7 @@ from config.constants import (
)
from config.app_config import app_config
from core.agent_runner import agent_runner
+from core.agent_mode_controller import AgentModeController
from tools.mcp_manager import MCPManager
from ui.menu_system import MenuSystem
from ui.conversation_manager import ConversationManager
@@ -39,6 +40,7 @@ class PentestAgent:
self.workflow_engine = WorkflowEngine()
self.kb_instance = None
self.reporting_available = self._check_reporting_available()
+ self.agent_mode_controller = None # Will be initialized when needed
@staticmethod
def _check_reporting_available() -> bool:
@@ -165,6 +167,116 @@ class PentestAgent:
self.menu_system.display_operation_cancelled()
break
+ async def run_agent_mode(self, connected_servers: List) -> None:
+ """Run autonomous agent mode with PTT."""
+ if not connected_servers:
+ self.menu_system.display_agent_mode_requirements_message()
+ return
+
+ # Display introduction
+ self.menu_system.display_agent_mode_intro()
+
+ # Get agent mode parameters
+ params = self.menu_system.get_agent_mode_params()
+ if not params:
+ return
+
+ # Initialize agent mode controller
+ self.agent_mode_controller = AgentModeController(
+ self.mcp_manager,
+ self.conversation_manager,
+ self.kb_instance
+ )
+
+ try:
+ # Initialize agent mode
+ init_success = await self.agent_mode_controller.initialize_agent_mode(
+ goal=params['goal'],
+ target=params['target'],
+ constraints=params['constraints'],
+ connected_servers=connected_servers,
+ run_agent_func=agent_runner.run_agent
+ )
+
+ if init_success:
+ # Run the autonomous loop
+ await self.agent_mode_controller.run_autonomous_loop()
+
+ # Handle post-execution options
+ await self._handle_agent_mode_completion()
+ else:
+ print(f"{Fore.RED}Failed to initialize agent mode.{Style.RESET_ALL}")
+ self.menu_system.press_enter_to_continue()
+
+ except KeyboardInterrupt:
+ print(f"\n{Fore.YELLOW}Agent mode interrupted by user.{Style.RESET_ALL}")
+ except Exception as e:
+ print(f"{Fore.RED}Error in agent mode: {e}{Style.RESET_ALL}")
+ traceback.print_exc()
+ finally:
+ self.menu_system.press_enter_to_continue()
+
+ async def _handle_agent_mode_completion(self) -> None:
+ """Handle post-execution options for agent mode."""
+ # Ask if user wants to generate a report
+ if self.reporting_available and self.menu_system.ask_generate_report():
+ try:
+ # Generate report from PTT
+ from reporting.generators import generate_report_from_ptt
+
+ ptt = self.agent_mode_controller.get_ptt_for_reporting()
+ report_path = await generate_report_from_ptt(
+ ptt,
+ self.conversation_manager.get_history(),
+ run_agent_func=agent_runner.run_agent,
+ connected_servers=self.mcp_manager.connected_servers if hasattr(self.mcp_manager, 'connected_servers') else [],
+ kb_instance=self.kb_instance
+ )
+
+ if report_path:
+ self.menu_system.display_report_generated(report_path)
+ else:
+ print(f"{Fore.YELLOW}Report generation returned no path.{Style.RESET_ALL}")
+
+ except ImportError:
+ # Fallback if PTT report generation not available
+ print(f"{Fore.YELLOW}PTT report generation not available. Saving raw data...{Style.RESET_ALL}")
+ self._save_agent_mode_data()
+ except Exception as e:
+ self.menu_system.display_report_error(e)
+ self._save_agent_mode_data()
+
+ # Ask about saving raw data
+ elif self.menu_system.ask_save_raw_history():
+ self._save_agent_mode_data()
+
+ def _save_agent_mode_data(self) -> None:
+ """Save agent mode execution data."""
+ try:
+ import os
+ import json
+
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+
+ # Create reports directory if it doesn't exist
+ os.makedirs("reports", exist_ok=True)
+
+ # Save conversation history
+ history_file = f"reports/agent_mode_history_{timestamp}.json"
+ with open(history_file, 'w', encoding='utf-8') as f:
+ json.dump(self.conversation_manager.get_history(), f, indent=2)
+
+ # Save PTT state
+ if self.agent_mode_controller:
+ ptt_file = f"reports/agent_mode_ptt_{timestamp}.json"
+ with open(ptt_file, 'w', encoding='utf-8') as f:
+ f.write(self.agent_mode_controller.tree_manager.to_json())
+
+ print(f"{Fore.GREEN}Agent mode data saved to reports/ directory{Style.RESET_ALL}")
+
+ except Exception as e:
+ print(f"{Fore.RED}Failed to save agent mode data: {e}{Style.RESET_ALL}")
+
async def _execute_workflow(self, workflow_info: tuple, connected_servers: List) -> None:
"""Execute a selected workflow."""
workflow_key, workflow_name = workflow_info
@@ -291,6 +403,10 @@ class PentestAgent:
await self.run_automated_mode(connected_servers)
elif menu_choice == "3":
+ # Agent mode
+ await self.run_agent_mode(connected_servers)
+
+ elif menu_choice == "4":
# Exit
self.menu_system.display_exit_message()
break
diff --git a/core/ptt_reasoning.py b/core/ptt_reasoning.py
new file mode 100644
index 0000000..3482d02
--- /dev/null
+++ b/core/ptt_reasoning.py
@@ -0,0 +1,520 @@
+"""PTT Reasoning Module for LLM-based task tree management."""
+
+import json
+import re
+from typing import Dict, List, Optional, Any, Tuple
+from colorama import Fore, Style
+from core.task_tree_manager import TaskTreeManager, TaskNode, NodeStatus
+
+
+class PTTReasoningModule:
+ """Handles LLM interactions for PTT management and decision making."""
+
+ def __init__(self, tree_manager: TaskTreeManager):
+ """
+ Initialize the PTT reasoning module.
+
+ Args:
+ tree_manager: The task tree manager instance
+ """
+ self.tree_manager = tree_manager
+
+ def get_tree_initialization_prompt(self, goal: str, target: str, constraints: Dict[str, Any], available_tools: List[str] = None) -> str:
+ """
+ Generate prompt for tree initialization.
+
+ Args:
+ goal: Primary penetration testing objective
+ target: Target system/network
+ constraints: Scope constraints and limitations
+ available_tools: List of available MCP tools
+
+ Returns:
+ Tree initialization prompt
+ """
+ tool_info = ""
+ if available_tools:
+ tool_info = f"""
+Available MCP Tools: {', '.join(available_tools)}
+
+You must work within the capabilities of these connected tools. Think about what each tool can accomplish:
+- Consider the full capabilities of each available tool
+- Adapt your approach to work with available tools
+- Be creative in how you leverage available tools
+"""
+ else:
+ tool_info = """
+No MCP tools are currently connected. Design an approach that describes the security testing objectives without tool dependencies.
+"""
+
+ prompt = f"""You are an autonomous security agent initializing a Pentesting Task Tree (PTT) for a security assessment.
+
+ASSESSMENT CONTEXT:
+Goal: {goal}
+Target: {target}
+Constraints: {json.dumps(constraints, indent=2)}
+
+{tool_info}
+
+TASK:
+Analyze this goal and determine what structure and initial tasks are needed to accomplish it efficiently.
+
+DO NOT assume any predefined phases or structure. Instead:
+1. Analyze what the goal actually requires
+2. Determine if you need phases/categories or if direct tasks are better
+3. Create an appropriate initial structure
+4. Define specific actionable tasks to start with
+
+Consider:
+- What does this specific goal require?
+- What's the minimal viable approach?
+- How can available tools be leveraged?
+- What structure makes sense for THIS goal?
+
+IMPORTANT: When suggesting tool usage, be specific about commands and modules. For example:
+
+Provide your analysis and initial structure in JSON format:
+
+{{
+ "analysis": "Your assessment of what this goal requires and approach",
+ "structure": [
+ {{
+ "type": "phase/category/direct",
+ "name": "Name of organizational structure",
+ "description": "What this encompasses",
+ "justification": "Why this structure element is needed for this goal"
+ }}
+ ],
+ "initial_tasks": [
+ {{
+ "description": "Specific actionable task",
+ "parent": "Which structure element this belongs to, or 'root' for direct tasks",
+ "tool_suggestion": "Which available tool to use, or 'manual' if no suitable tool",
+ "priority": 1-10,
+ "risk_level": "low/medium/high",
+ "rationale": "Why this task is necessary for the goal"
+ }}
+ ]
+}}
+
+BE INTELLIGENT: If the goal is simple, don't create complex multi-phase structures. If it's complex, then structure appropriately. Let the goal drive the structure, not the other way around."""
+
+ return prompt
+
+ def get_tree_update_prompt(self, tool_output: str, command: str, node: TaskNode) -> str:
+ """
+ Generate prompt for updating the tree based on tool output.
+
+ Args:
+ tool_output: Output from the executed tool
+ command: The command that was executed
+ node: The node being updated
+
+ Returns:
+ Update prompt
+ """
+ current_tree = self.tree_manager.to_natural_language()
+
+ prompt = f"""You are managing a Pentesting Task Tree (PTT). A task has been executed and you need to update the tree based on the results.
+
+Current PTT State:
+{current_tree}
+
+Executed Task: {node.description}
+Command: {command}
+Tool Output:
+{tool_output[:2000]} # Limit output length
+
+Based on this output, provide updates in the following JSON format:
+
+{{
+ "node_updates": {{
+ "status": "completed/failed/vulnerable/not_vulnerable",
+ "findings": "Summary of key findings from the output",
+ "output_summary": "Brief technical summary"
+ }},
+ "new_tasks": [
+ {{
+ "description": "New task based on findings",
+ "parent_phase": "Phase 1/2/3/4",
+ "tool_suggestion": "Suggested tool",
+ "priority": 1-10,
+ "risk_level": "low/medium/high",
+ "rationale": "Why this task is important"
+ }}
+ ],
+ "insights": "Any strategic insights or patterns noticed"
+}}
+
+Consider:
+1. What vulnerabilities or opportunities were discovered?
+2. What follow-up actions are needed based on the findings?
+3. Should any new attack vectors be explored?
+4. Are there any security misconfigurations evident?"""
+
+ return prompt
+
+ def get_next_action_prompt(self, available_tools: List[str]) -> str:
+ """
+ Generate prompt for selecting the next action.
+
+ Args:
+ available_tools: List of available MCP tools
+
+ Returns:
+ Next action selection prompt
+ """
+ current_tree = self.tree_manager.to_natural_language()
+ candidates = self.tree_manager.get_candidate_tasks()
+
+ # Prepare candidate descriptions
+ candidate_desc = []
+ for i, task in enumerate(candidates[:10]): # Limit to top 10
+ desc = f"{i+1}. {task.description}"
+ if task.priority:
+ desc += f" (Priority: {task.priority})"
+ candidate_desc.append(desc)
+
+ # Generate tool context
+ if available_tools:
+ tool_context = f"""
+Connected MCP Tools: {', '.join(available_tools)}
+
+Think about how to leverage these tools for the selected task. Each tool has its own capabilities -
+be creative and intelligent about how to accomplish penetration testing objectives with available tools.
+If a tool doesn't directly support a traditional approach, consider alternative methods that achieve the same goal.
+"""
+ else:
+ tool_context = """
+No MCP tools are currently connected. Select tasks that can be performed manually or recommend connecting appropriate tools.
+"""
+
+ prompt = f"""You are managing a Pentesting Task Tree (PTT) and need to select the next action.
+
+Goal: {self.tree_manager.goal}
+Target: {self.tree_manager.target}
+
+Current PTT State:
+{current_tree}
+
+{tool_context}
+
+Candidate Tasks:
+{chr(10).join(candidate_desc)}
+
+Statistics:
+- Total tasks: {len(self.tree_manager.nodes)}
+- Completed: {sum(1 for n in self.tree_manager.nodes.values() if n.status == NodeStatus.COMPLETED)}
+- In Progress: {sum(1 for n in self.tree_manager.nodes.values() if n.status == NodeStatus.IN_PROGRESS)}
+- Pending: {sum(1 for n in self.tree_manager.nodes.values() if n.status == NodeStatus.PENDING)}
+
+Select the most strategic next action and provide your response in JSON format:
+
+{{
+ "selected_task_index": 1-based index from candidate list,
+ "rationale": "Why this task is the best next step",
+ "command": "Intelligent request that leverages available tools effectively",
+ "tool": "Which available tool to use, or 'manual' if no suitable tool",
+ "expected_outcome": "What we hope to discover/achieve",
+ "alternative_if_blocked": "Backup task index if this fails"
+}}
+
+Consider:
+1. Logical progression through the penetration testing methodology
+2. Task dependencies and prerequisites
+3. Risk vs reward of different approaches
+4. How to best utilize available tools for maximum effectiveness
+5. Strategic value of each potential action
+
+Be intelligent about tool selection - think about what each available tool can accomplish."""
+
+ return prompt
+
+ def get_goal_check_prompt(self) -> str:
+ """
+ Generate prompt to check if the goal has been achieved.
+
+ Returns:
+ Goal achievement check prompt
+ """
+ current_tree = self.tree_manager.to_natural_language()
+ goal = self.tree_manager.goal
+
+ # Extract completed tasks and findings for better context
+ completed_tasks_with_findings = []
+ for node in self.tree_manager.nodes.values():
+ if node.status == NodeStatus.COMPLETED and node.findings:
+ completed_tasks_with_findings.append(f"✓ {node.description}: {node.findings}")
+
+ completed_context = "\n".join(completed_tasks_with_findings) if completed_tasks_with_findings else "No completed tasks with findings yet."
+
+ prompt = f"""Analyze the current Pentesting Task Tree (PTT) to determine if the PRIMARY GOAL has been achieved.
+
+IMPORTANT: Focus ONLY on whether the specific goal stated has been accomplished. Do not suggest additional scope or activities beyond the original goal.
+
+PRIMARY GOAL: {goal}
+Target: {self.tree_manager.target}
+
+COMPLETED TASKS WITH FINDINGS:
+{completed_context}
+
+Current PTT State:
+{current_tree}
+
+GOAL ACHIEVEMENT CRITERIA:
+- For information gathering goals, the goal is achieved when that specific information is obtained
+- For vulnerability assessment goals, the goal is achieved when vulnerabilities are identified and documented
+- For exploitation goals, the goal is achieved when successful exploitation is demonstrated
+- For access goals, the goal is achieved when the specified access level is obtained
+
+Provide your analysis in JSON format:
+
+{{
+ "goal_achieved": true/false,
+ "confidence": 0-100,
+ "evidence": "Specific evidence that the PRIMARY GOAL has been met (quote actual findings)",
+ "remaining_objectives": "What still needs to be done if goal not achieved (related to the ORIGINAL goal only)",
+ "recommendations": "Next steps ONLY if they relate to the original goal - do not expand scope",
+ "scope_warning": "Flag if any tasks seem to exceed the original goal scope"
+}}
+
+Consider:
+1. Has the SPECIFIC goal been demonstrably achieved?
+2. Is there sufficient evidence/proof in the completed tasks?
+3. Are there critical paths unexplored that are NECESSARY for the original goal?
+4. Would additional testing strengthen the results for the ORIGINAL goal only?
+
+DO NOT recommend expanding the scope beyond the original goal. If the goal is completed, mark it as achieved regardless of what other security activities could be performed."""
+
+ return prompt
+
+ def parse_tree_initialization_response(self, llm_response: str) -> Dict[str, Any]:
+ """Parse LLM response for tree initialization."""
+ try:
+ print(f"{Fore.CYAN}Parsing initialization response...{Style.RESET_ALL}")
+ # Extract JSON from response
+ response_json = self._extract_json(llm_response)
+
+ analysis = response_json.get('analysis', 'No analysis provided')
+ structure = response_json.get('structure', [])
+ initial_tasks = response_json.get('initial_tasks', [])
+
+ print(f"{Fore.GREEN}LLM Analysis: {analysis}{Style.RESET_ALL}")
+ print(f"{Fore.GREEN}Successfully parsed {len(structure)} structure elements and {len(initial_tasks)} tasks{Style.RESET_ALL}")
+
+ return {
+ 'analysis': analysis,
+ 'structure': structure,
+ 'initial_tasks': initial_tasks
+ }
+ except Exception as e:
+ print(f"{Fore.YELLOW}Failed to parse initialization response: {e}{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}Response text (first 500 chars): {llm_response[:500]}{Style.RESET_ALL}")
+ return {
+ 'analysis': 'Failed to parse LLM response',
+ 'structure': [],
+ 'initial_tasks': []
+ }
+
+ def parse_tree_update_response(self, llm_response: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
+ """Parse LLM response for tree updates."""
+ try:
+ response_json = self._extract_json(llm_response)
+ node_updates = response_json.get('node_updates', {})
+ new_tasks = response_json.get('new_tasks', [])
+ return node_updates, new_tasks
+ except Exception as e:
+ print(f"{Fore.YELLOW}Failed to parse update response: {e}{Style.RESET_ALL}")
+ return {}, []
+
+ def parse_next_action_response(self, llm_response: str, available_tools: List[str] = None) -> Optional[Dict[str, Any]]:
+ """Parse LLM response for next action selection."""
+ try:
+ response_json = self._extract_json(llm_response)
+ return response_json
+ except Exception as e:
+ print(f"{Fore.YELLOW}Failed to parse next action response: {e}{Style.RESET_ALL}")
+ return None
+
+ def parse_goal_check_response(self, llm_response: str) -> Dict[str, Any]:
+ """Parse LLM response for goal achievement check."""
+ try:
+ response_json = self._extract_json(llm_response)
+ return response_json
+ except Exception as e:
+ print(f"{Fore.YELLOW}Failed to parse goal check response: {e}{Style.RESET_ALL}")
+ return {"goal_achieved": False, "confidence": 0}
+
+ def _extract_json(self, text: str) -> Dict[str, Any]:
+ """Extract JSON from LLM response text."""
+ if not text:
+ raise ValueError("Empty response text")
+
+ print(f"{Fore.CYAN}Attempting to extract JSON from {len(text)} character response{Style.RESET_ALL}")
+
+ # Try multiple strategies to extract JSON
+ strategies = [
+ self._extract_json_code_block,
+ self._extract_json_braces,
+ self._extract_json_fuzzy,
+ self._create_fallback_json
+ ]
+
+ for i, strategy in enumerate(strategies):
+ try:
+ result = strategy(text)
+ if result:
+ print(f"{Fore.GREEN}Successfully extracted JSON using strategy {i+1}{Style.RESET_ALL}")
+ return result
+ except Exception as e:
+ print(f"{Fore.YELLOW}Strategy {i+1} failed: {e}{Style.RESET_ALL}")
+ continue
+
+ raise ValueError("Could not extract valid JSON from response")
+
+ def _extract_json_code_block(self, text: str) -> Dict[str, Any]:
+ """Extract JSON from code blocks."""
+ # Look for JSON between ```json and ``` or just ```
+ patterns = [
+ r'```json\s*(\{.*?\})\s*```',
+ r'```\s*(\{.*?\})\s*```'
+ ]
+
+ for pattern in patterns:
+ match = re.search(pattern, text, re.DOTALL)
+ if match:
+ json_str = match.group(1)
+ return json.loads(json_str)
+
+ raise ValueError("No JSON code block found")
+
+ def _extract_json_braces(self, text: str) -> Dict[str, Any]:
+ """Extract JSON by finding brace boundaries."""
+ # Find the first { and last }
+ json_start = text.find('{')
+ json_end = text.rfind('}')
+
+ if json_start != -1 and json_end != -1 and json_end > json_start:
+ json_str = text[json_start:json_end + 1]
+ return json.loads(json_str)
+
+ raise ValueError("No valid JSON braces found")
+
+ def _extract_json_fuzzy(self, text: str) -> Dict[str, Any]:
+ """Try to extract JSON with more flexible matching."""
+ # Look for task-like patterns and try to construct JSON
+ if "tasks" in text.lower():
+ # Try to find task descriptions
+ task_patterns = [
+ r'"description":\s*"([^"]+)"',
+ r'"tool_suggestion":\s*"([^"]+)"',
+ r'"priority":\s*(\d+)',
+ r'"risk_level":\s*"([^"]+)"'
+ ]
+
+ # This is a simplified approach - could be enhanced
+ # For now, fall through to the next strategy
+ pass
+
+ raise ValueError("Fuzzy JSON extraction failed")
+
+ def _create_fallback_json(self, text: str) -> Dict[str, Any]:
+ """Create fallback JSON if no valid JSON is found."""
+ print(f"{Fore.YELLOW}Creating fallback JSON structure{Style.RESET_ALL}")
+
+ # Return an empty but valid structure
+ return {
+ "tasks": [],
+ "node_updates": {"status": "completed"},
+ "new_tasks": [],
+ "selected_task_index": 1,
+ "goal_achieved": False,
+ "confidence": 0
+ }
+
+ def verify_tree_update(self, old_tree_state: str, new_tree_state: str) -> bool:
+ """
+ Verify that tree updates maintain integrity.
+
+ Args:
+ old_tree_state: Tree state before update
+ new_tree_state: Tree state after update
+
+ Returns:
+ True if update is valid
+ """
+ # For now, basic verification - can be enhanced
+ # Check that only leaf nodes were modified (as per PentestGPT approach)
+ # This is simplified - in practice would need more sophisticated checks
+
+ return True # Placeholder - implement actual verification logic
+
+ def generate_strategic_summary(self) -> str:
+ """Generate a strategic summary of the current PTT state."""
+ stats = self.tree_manager.get_statistics()
+
+ summary = f"""
+=== PTT Strategic Summary ===
+Goal: {self.tree_manager.goal}
+Target: {self.tree_manager.target}
+
+Progress Overview:
+- Total Tasks: {stats['total_nodes']}
+- Completed: {stats['status_counts'].get('completed', 0)}
+- In Progress: {stats['status_counts'].get('in_progress', 0)}
+- Failed: {stats['status_counts'].get('failed', 0)}
+- Vulnerabilities Found: {stats['status_counts'].get('vulnerable', 0)}
+
+Current Phase Focus:
+"""
+
+ # Identify which phase is most active
+ phase_activity = {}
+ for node in self.tree_manager.nodes.values():
+ if node.node_type == "phase":
+ completed_children = sum(
+ 1 for child_id in node.children_ids
+ if child_id in self.tree_manager.nodes
+ and self.tree_manager.nodes[child_id].status == NodeStatus.COMPLETED
+ )
+ total_children = len(node.children_ids)
+ phase_activity[node.description] = (completed_children, total_children)
+
+ for phase, (completed, total) in phase_activity.items():
+ if total > 0:
+ progress = (completed / total) * 100
+ summary += f"- {phase}: {completed}/{total} tasks ({progress:.0f}%)\n"
+
+ # Add key findings
+ summary += "\nKey Findings:\n"
+ vuln_count = 0
+ for node in self.tree_manager.nodes.values():
+ if node.status == NodeStatus.VULNERABLE and node.findings:
+ vuln_count += 1
+ summary += f"- {node.description}: {node.findings[:100]}...\n"
+ if vuln_count >= 5: # Limit to top 5
+ break
+
+ return summary
+
+ def validate_and_fix_tool_suggestions(self, tasks: List[Dict[str, Any]], available_tools: List[str]) -> List[Dict[str, Any]]:
+ """Let the LLM re-evaluate tool suggestions if they don't match available tools."""
+ if not available_tools:
+ return tasks
+
+ # Check if any tasks use unavailable tools
+ needs_fixing = []
+ valid_tasks = []
+
+ for task in tasks:
+ tool_suggestion = task.get('tool_suggestion', '')
+ if tool_suggestion in available_tools or tool_suggestion in ['manual', 'generic']:
+ valid_tasks.append(task)
+ else:
+ needs_fixing.append(task)
+
+ if needs_fixing:
+ print(f"{Fore.YELLOW}Some tasks reference unavailable tools. Letting AI re-evaluate...{Style.RESET_ALL}")
+ # Return all tasks - let the execution phase handle tool mismatches intelligently
+
+ return tasks
\ No newline at end of file
diff --git a/core/task_tree_manager.py b/core/task_tree_manager.py
new file mode 100644
index 0000000..449302b
--- /dev/null
+++ b/core/task_tree_manager.py
@@ -0,0 +1,357 @@
+"""Task Tree Manager for PTT-based autonomous agent mode."""
+
+import json
+import uuid
+from typing import Dict, List, Optional, Any, Tuple
+from datetime import datetime
+from enum import Enum
+
+
+class NodeStatus(Enum):
+ """Enumeration of possible node statuses."""
+ PENDING = "pending"
+ IN_PROGRESS = "in_progress"
+ COMPLETED = "completed"
+ FAILED = "failed"
+ BLOCKED = "blocked"
+ VULNERABLE = "vulnerable"
+ NOT_VULNERABLE = "not_vulnerable"
+
+
+class RiskLevel(Enum):
+ """Enumeration of risk levels."""
+ LOW = "low"
+ MEDIUM = "medium"
+ HIGH = "high"
+
+
+class TaskNode:
+ """Represents a single node in the task tree."""
+
+ def __init__(
+ self,
+ description: str,
+ parent_id: Optional[str] = None,
+ node_type: str = "task",
+ **kwargs
+ ):
+ """Initialize a task node."""
+ self.id = kwargs.get('id', str(uuid.uuid4()))
+ self.description = description
+ self.status = NodeStatus(kwargs.get('status', NodeStatus.PENDING.value))
+ self.node_type = node_type # task, phase, finding, objective
+ self.parent_id = parent_id
+ self.children_ids: List[str] = kwargs.get('children_ids', [])
+
+ # Task execution details
+ self.tool_used = kwargs.get('tool_used', None)
+ self.command_executed = kwargs.get('command_executed', None)
+ self.output_summary = kwargs.get('output_summary', None)
+ self.findings = kwargs.get('findings', None)
+
+ # Metadata
+ self.priority = kwargs.get('priority', 5) # 1-10, higher is more important
+ self.risk_level = RiskLevel(kwargs.get('risk_level', RiskLevel.LOW.value))
+ self.timestamp = kwargs.get('timestamp', None)
+ self.kb_references = kwargs.get('kb_references', [])
+ self.dependencies = kwargs.get('dependencies', [])
+
+ # Additional attributes
+ self.attributes = kwargs.get('attributes', {})
+
+ def to_dict(self) -> Dict[str, Any]:
+ """Convert node to dictionary representation."""
+ return {
+ 'id': self.id,
+ 'description': self.description,
+ 'status': self.status.value,
+ 'node_type': self.node_type,
+ 'parent_id': self.parent_id,
+ 'children_ids': self.children_ids,
+ 'tool_used': self.tool_used,
+ 'command_executed': self.command_executed,
+ 'output_summary': self.output_summary,
+ 'findings': self.findings,
+ 'priority': self.priority,
+ 'risk_level': self.risk_level.value,
+ 'timestamp': self.timestamp,
+ 'kb_references': self.kb_references,
+ 'dependencies': self.dependencies,
+ 'attributes': self.attributes
+ }
+
+ @classmethod
+ def from_dict(cls, data: Dict[str, Any]) -> 'TaskNode':
+ """Create node from dictionary representation."""
+ return cls(
+ description=data['description'],
+ **data
+ )
+
+
+class TaskTreeManager:
+ """Manages the Pentesting Task Tree (PTT) structure and operations."""
+
+ def __init__(self):
+ """Initialize the task tree manager."""
+ self.nodes: Dict[str, TaskNode] = {}
+ self.root_id: Optional[str] = None
+ self.goal: Optional[str] = None
+ self.target: Optional[str] = None
+ self.constraints: Dict[str, Any] = {}
+ self.creation_time = datetime.now()
+
+ def initialize_tree(self, goal: str, target: str, constraints: Dict[str, Any] = None) -> str:
+ """
+ Initialize the task tree with a goal and target.
+
+ Args:
+ goal: The primary objective
+ target: The target system/network
+ constraints: Any constraints or scope limitations
+
+ Returns:
+ The root node ID
+ """
+ self.goal = goal
+ self.target = target
+ self.constraints = constraints or {}
+
+ # Create root node - let the LLM determine what structure is needed
+ root_node = TaskNode(
+ description=f"Goal: {goal}",
+ node_type="objective"
+ )
+ self.root_id = root_node.id
+ self.nodes[root_node.id] = root_node
+
+ return self.root_id
+
+ def add_node(self, node: TaskNode) -> str:
+ """
+ Add a node to the tree.
+
+ Args:
+ node: The TaskNode to add
+
+ Returns:
+ The node ID
+ """
+ self.nodes[node.id] = node
+
+ # Update parent's children list
+ if node.parent_id and node.parent_id in self.nodes:
+ parent = self.nodes[node.parent_id]
+ if node.id not in parent.children_ids:
+ parent.children_ids.append(node.id)
+
+ return node.id
+
+ def update_node(self, node_id: str, updates: Dict[str, Any]) -> bool:
+ """
+ Update a node's attributes.
+
+ Args:
+ node_id: The ID of the node to update
+ updates: Dictionary of attributes to update
+
+ Returns:
+ True if successful, False otherwise
+ """
+ if node_id not in self.nodes:
+ return False
+
+ node = self.nodes[node_id]
+
+ # Update allowed fields
+ allowed_fields = {
+ 'status', 'tool_used', 'command_executed', 'output_summary',
+ 'findings', 'priority', 'risk_level', 'timestamp', 'kb_references'
+ }
+
+ for field, value in updates.items():
+ if field in allowed_fields:
+ if field == 'status':
+ node.status = NodeStatus(value)
+ elif field == 'risk_level':
+ node.risk_level = RiskLevel(value)
+ else:
+ setattr(node, field, value)
+ elif field == 'attributes':
+ node.attributes.update(value)
+
+ return True
+
+ def get_node(self, node_id: str) -> Optional[TaskNode]:
+ """Get a node by ID."""
+ return self.nodes.get(node_id)
+
+ def get_children(self, node_id: str) -> List[TaskNode]:
+ """Get all children of a node."""
+ if node_id not in self.nodes:
+ return []
+
+ parent = self.nodes[node_id]
+ return [self.nodes[child_id] for child_id in parent.children_ids if child_id in self.nodes]
+
+ def get_leaf_nodes(self) -> List[TaskNode]:
+ """Get all leaf nodes (nodes without children)."""
+ return [node for node in self.nodes.values() if not node.children_ids]
+
+ def get_candidate_tasks(self) -> List[TaskNode]:
+ """
+ Get candidate tasks for next action.
+
+ Returns tasks that are:
+ - Leaf nodes
+ - Status is PENDING or FAILED
+ - All dependencies are completed
+ """
+ candidates = []
+
+ for node in self.get_leaf_nodes():
+ if node.status in [NodeStatus.PENDING, NodeStatus.FAILED]:
+ # Check dependencies
+ deps_satisfied = all(
+ self.nodes.get(dep_id, TaskNode("")).status == NodeStatus.COMPLETED
+ for dep_id in node.dependencies
+ )
+
+ if deps_satisfied:
+ candidates.append(node)
+
+ return candidates
+
+ def prioritize_tasks(self, tasks: List[TaskNode]) -> List[TaskNode]:
+ """
+ Prioritize tasks based on various factors.
+
+ Args:
+ tasks: List of candidate tasks
+
+ Returns:
+ Sorted list of tasks (highest priority first)
+ """
+ def task_score(task: TaskNode) -> float:
+ # Base score from priority
+ score = task.priority
+
+ # Boost for reconnaissance tasks in early stages
+ if "recon" in task.description.lower() or "scan" in task.description.lower():
+ completed_count = sum(1 for n in self.nodes.values() if n.status == NodeStatus.COMPLETED)
+ if completed_count < 5:
+ score += 3
+
+ # Boost for vulnerability assessment after recon
+ if "vuln" in task.description.lower() and self._has_completed_recon():
+ score += 2
+
+ # Penalty for high-risk tasks early on
+ if task.risk_level == RiskLevel.HIGH:
+ score -= 2
+
+ return score
+
+ return sorted(tasks, key=task_score, reverse=True)
+
+ def _has_completed_recon(self) -> bool:
+ """Check if basic reconnaissance has been completed."""
+ recon_keywords = ["scan", "recon", "enumerat", "discover"]
+ completed_recon = any(
+ any(keyword in node.description.lower() for keyword in recon_keywords)
+ and node.status == NodeStatus.COMPLETED
+ for node in self.nodes.values()
+ )
+ return completed_recon
+
+ def to_natural_language(self, node_id: Optional[str] = None, indent: int = 0) -> str:
+ """
+ Convert the tree (or subtree) to natural language representation.
+
+ Args:
+ node_id: Starting node ID (None for root)
+ indent: Indentation level
+
+ Returns:
+ Natural language representation of the tree
+ """
+ if node_id is None:
+ node_id = self.root_id
+
+ if node_id not in self.nodes:
+ return ""
+
+ node = self.nodes[node_id]
+ indent_str = " " * indent
+
+ # Format node information
+ status_symbol = {
+ NodeStatus.PENDING: "○",
+ NodeStatus.IN_PROGRESS: "◐",
+ NodeStatus.COMPLETED: "●",
+ NodeStatus.FAILED: "✗",
+ NodeStatus.BLOCKED: "□",
+ NodeStatus.VULNERABLE: "⚠",
+ NodeStatus.NOT_VULNERABLE: "✓"
+ }.get(node.status, "?")
+
+ lines = [f"{indent_str}{status_symbol} {node.description}"]
+
+ # Add findings if present
+ if node.findings:
+ lines.append(f"{indent_str} → Findings: {node.findings}")
+
+ # Add tool/command info if present
+ if node.tool_used:
+ lines.append(f"{indent_str} → Tool: {node.tool_used}")
+
+ # Process children
+ for child_id in node.children_ids:
+ lines.append(self.to_natural_language(child_id, indent + 1))
+
+ return "\n".join(lines)
+
+ def to_json(self) -> str:
+ """Serialize the tree to JSON."""
+ data = {
+ 'goal': self.goal,
+ 'target': self.target,
+ 'constraints': self.constraints,
+ 'root_id': self.root_id,
+ 'creation_time': self.creation_time.isoformat(),
+ 'nodes': {node_id: node.to_dict() for node_id, node in self.nodes.items()}
+ }
+ return json.dumps(data, indent=2)
+
+ @classmethod
+ def from_json(cls, json_str: str) -> 'TaskTreeManager':
+ """Deserialize a tree from JSON."""
+ data = json.loads(json_str)
+
+ manager = cls()
+ manager.goal = data['goal']
+ manager.target = data['target']
+ manager.constraints = data['constraints']
+ manager.root_id = data['root_id']
+ manager.creation_time = datetime.fromisoformat(data['creation_time'])
+
+ # Recreate nodes
+ for node_id, node_data in data['nodes'].items():
+ node = TaskNode.from_dict(node_data)
+ manager.nodes[node_id] = node
+
+ return manager
+
+ def get_statistics(self) -> Dict[str, Any]:
+ """Get tree statistics."""
+ status_counts = {}
+ for node in self.nodes.values():
+ status = node.status.value
+ status_counts[status] = status_counts.get(status, 0) + 1
+
+ return {
+ 'total_nodes': len(self.nodes),
+ 'status_counts': status_counts,
+ 'leaf_nodes': len(self.get_leaf_nodes()),
+ 'candidate_tasks': len(self.get_candidate_tasks())
+ }
\ No newline at end of file
diff --git a/reporting/generators.py b/reporting/generators.py
index 6d58bdc..60f832b 100644
--- a/reporting/generators.py
+++ b/reporting/generators.py
@@ -9,6 +9,7 @@ import asyncio
from datetime import datetime
from typing import Dict, List, Any
import re
+from colorama import Fore, Style
class PentestReportGenerator:
@@ -467,4 +468,350 @@ async def generate_report_from_workflow(report_data: Dict[str, Any], run_agent_f
"""
generator = PentestReportGenerator(report_data)
- return await generator.generate_report(run_agent_func, connected_servers, kb_instance, save_raw_history)
\ No newline at end of file
+ return await generator.generate_report(run_agent_func, connected_servers, kb_instance, save_raw_history)
+
+
+async def generate_report_from_ptt(ptt_manager, conversation_history: List[Dict[str, Any]], run_agent_func=None, connected_servers=None, kb_instance=None, save_raw_history=False) -> str:
+ """
+ Generate a professional report from PTT (Pentesting Task Tree) data
+
+ Args:
+ ptt_manager: TaskTreeManager instance containing the PTT
+ conversation_history: List of conversation history entries
+ run_agent_func: The main agent function for AI analysis
+ connected_servers: Connected MCP servers
+ kb_instance: Knowledge base instance
+ save_raw_history: Whether to save raw conversation history
+
+ Returns:
+ str: Path to generated report file
+ """
+
+ # Convert PTT data to report-compatible format
+ report_data = {
+ 'workflow_name': f"Agent Mode: {ptt_manager.goal}",
+ 'workflow_key': 'agent_mode',
+ 'target': ptt_manager.target,
+ 'timestamp': ptt_manager.creation_time,
+ 'conversation_history': conversation_history,
+ 'tools_used': [server.name for server in connected_servers] if connected_servers else [],
+ 'ptt_data': {
+ 'goal': ptt_manager.goal,
+ 'target': ptt_manager.target,
+ 'constraints': ptt_manager.constraints,
+ 'statistics': ptt_manager.get_statistics(),
+ 'tree_structure': ptt_manager.to_natural_language(),
+ 'nodes': {node_id: node.to_dict() for node_id, node in ptt_manager.nodes.items()}
+ }
+ }
+
+ # Create a specialized PTT report generator
+ generator = PTTReportGenerator(report_data)
+
+ if run_agent_func and connected_servers:
+ return await generator.generate_report(run_agent_func, connected_servers, kb_instance, save_raw_history)
+ else:
+ # Generate a basic report without AI analysis if no agent function available
+ return generator.generate_basic_report(save_raw_history)
+
+
+class PTTReportGenerator:
+ """Generate professional penetration testing reports from PTT data"""
+
+ def __init__(self, report_data: Dict[str, Any]):
+ self.workflow_name = report_data['workflow_name']
+ self.workflow_key = report_data['workflow_key']
+ self.target = report_data['target']
+ self.timestamp = report_data['timestamp']
+ self.conversation_history = report_data['conversation_history']
+ self.tools_used = report_data.get('tools_used', [])
+ self.ptt_data = report_data.get('ptt_data', {})
+
+ # Will be populated by AI analysis
+ self.structured_findings = {}
+
+ def generate_basic_report(self, save_raw_history: bool = False) -> str:
+ """Generate a basic report without AI analysis"""
+ # Extract findings from PTT nodes
+ vulnerabilities = []
+ completed_tasks = []
+ failed_tasks = []
+
+ for node_data in self.ptt_data.get('nodes', {}).values():
+ if node_data.get('status') == 'completed' and node_data.get('findings'):
+ completed_tasks.append({
+ 'description': node_data.get('description', ''),
+ 'findings': node_data.get('findings', ''),
+ 'tool_used': node_data.get('tool_used', ''),
+ 'output_summary': node_data.get('output_summary', '')
+ })
+ elif node_data.get('status') == 'vulnerable':
+ vulnerabilities.append({
+ 'title': node_data.get('description', 'Unknown Vulnerability'),
+ 'description': node_data.get('findings', 'No description available'),
+ 'severity': 'Medium', # Default severity
+ 'affected_systems': [self.target],
+ 'evidence': node_data.get('output_summary', ''),
+ 'remediation': 'Review and patch identified vulnerabilities'
+ })
+ elif node_data.get('status') == 'failed':
+ failed_tasks.append({
+ 'description': node_data.get('description', ''),
+ 'tool_used': node_data.get('tool_used', ''),
+ 'error': node_data.get('output_summary', '')
+ })
+
+ # Create structured findings
+ self.structured_findings = {
+ 'executive_summary': f"Autonomous penetration testing completed against {self.target}. Goal: {self.ptt_data.get('goal', 'Unknown')}. {len(completed_tasks)} tasks completed successfully, {len(vulnerabilities)} vulnerabilities identified.",
+ 'vulnerabilities': vulnerabilities,
+ 'key_statistics': {
+ 'total_vulnerabilities': len(vulnerabilities),
+ 'critical_count': 0,
+ 'high_count': 0,
+ 'medium_count': len(vulnerabilities),
+ 'systems_compromised': 1 if vulnerabilities else 0
+ },
+ 'methodology': f"Autonomous penetration testing using Pentesting Task Tree (PTT) methodology with intelligent task prioritization and execution.",
+ 'conclusion': f"Assessment {'successfully identified security weaknesses' if vulnerabilities else 'completed without identifying critical vulnerabilities'}. {'Immediate remediation recommended' if vulnerabilities else 'Continue monitoring and regular assessments'}.",
+ 'recommendations': [
+ {
+ 'category': 'Patch Management',
+ 'recommendation': 'Apply security patches to all identified vulnerable services',
+ 'priority': 'Immediate',
+ 'business_justification': 'Prevents exploitation of known vulnerabilities'
+ },
+ {
+ 'category': 'Monitoring',
+ 'recommendation': 'Implement monitoring for the services and ports identified during reconnaissance',
+ 'priority': 'Short-term',
+ 'business_justification': 'Early detection of potential security incidents'
+ }
+ ] if vulnerabilities else [
+ {
+ 'category': 'Continued Security',
+ 'recommendation': 'Maintain current security posture and conduct regular assessments',
+ 'priority': 'Medium-term',
+ 'business_justification': 'Proactive security maintenance'
+ }
+ ]
+ }
+
+ # Generate the markdown report
+ markdown_report = self.generate_markdown_report()
+
+ # Save report
+ return self.save_report(markdown_report, save_raw_history)
+
+ async def generate_report(self, run_agent_func, connected_servers, kb_instance=None, save_raw_history=False) -> str:
+ """Generate a comprehensive report with AI analysis"""
+ try:
+ # Create analysis prompt specifically for PTT data
+ analysis_prompt = self.create_ptt_analysis_prompt()
+
+ # Get AI analysis
+ ai_response = await self.analyze_with_ai(
+ analysis_prompt,
+ run_agent_func,
+ connected_servers,
+ kb_instance
+ )
+
+ if ai_response:
+ # Parse AI response
+ self.structured_findings = self.parse_ai_response(ai_response)
+ else:
+ print(f"{Fore.YELLOW}AI analysis failed, generating basic report...{Style.RESET_ALL}")
+ return self.generate_basic_report(save_raw_history)
+
+ except Exception as e:
+ print(f"{Fore.YELLOW}Error in AI analysis: {e}. Generating basic report...{Style.RESET_ALL}")
+ return self.generate_basic_report(save_raw_history)
+
+ # Generate markdown report
+ markdown_report = self.generate_markdown_report()
+
+ # Save report
+ return self.save_report(markdown_report, save_raw_history)
+
+ def create_ptt_analysis_prompt(self) -> str:
+ """Create analysis prompt for PTT data"""
+ ptt_structure = self.ptt_data.get('tree_structure', 'No PTT structure available')
+ goal = self.ptt_data.get('goal', 'Unknown goal')
+ target = self.target
+ statistics = self.ptt_data.get('statistics', {})
+
+ # Extract key findings from completed tasks
+ key_findings = []
+ for node_data in self.ptt_data.get('nodes', {}).values():
+ if node_data.get('status') in ['completed', 'vulnerable'] and node_data.get('findings'):
+ key_findings.append(f"- {node_data.get('description', '')}: {node_data.get('findings', '')}")
+
+ findings_text = '\n'.join(key_findings) if key_findings else 'No significant findings recorded'
+
+ prompt = f"""You are analyzing the results of an autonomous penetration test conducted using a Pentesting Task Tree (PTT) methodology.
+
+ASSESSMENT DETAILS:
+Goal: {goal}
+Target: {target}
+Statistics: {statistics}
+
+PTT STRUCTURE:
+{ptt_structure}
+
+KEY FINDINGS:
+{findings_text}
+
+Based on this PTT analysis, provide a comprehensive security assessment in the following JSON format:
+
+{{
+ "executive_summary": "Professional executive summary of the assessment",
+ "key_statistics": {{
+ "total_vulnerabilities": 0,
+ "critical_count": 0,
+ "high_count": 0,
+ "medium_count": 0,
+ "low_count": 0,
+ "systems_compromised": 0
+ }},
+ "vulnerabilities": [
+ {{
+ "title": "Vulnerability name",
+ "description": "Detailed description",
+ "severity": "Critical/High/Medium/Low",
+ "impact": "Business impact description",
+ "affected_systems": ["system1", "system2"],
+ "evidence": "Technical evidence",
+ "remediation": "Specific remediation steps",
+ "references": ["CVE-XXXX", "reference links"]
+ }}
+ ],
+ "compromised_systems": [
+ {{
+ "system": "system identifier",
+ "access_level": "user/admin/root",
+ "method": "exploitation method",
+ "evidence": "proof of compromise"
+ }}
+ ],
+ "attack_paths": [
+ {{
+ "path_description": "Attack path name",
+ "impact": "potential impact",
+ "steps": ["step1", "step2", "step3"]
+ }}
+ ],
+ "recommendations": [
+ {{
+ "category": "category name",
+ "recommendation": "specific recommendation",
+ "priority": "Immediate/Short-term/Medium-term/Long-term",
+ "business_justification": "why this matters to business"
+ }}
+ ],
+ "methodology": "Description of the PTT methodology used",
+ "conclusion": "Professional conclusion of the assessment"
+}}
+
+Focus on:
+1. Extracting real security findings from the PTT execution
+2. Proper risk classification
+3. Actionable recommendations
+4. Business-relevant impact assessment"""
+
+ return prompt
+
+ async def analyze_with_ai(self, prompt: str, run_agent_func, connected_servers, kb_instance) -> str:
+ """Analyze the assessment with AI"""
+ try:
+ result = await run_agent_func(
+ prompt,
+ connected_servers,
+ history=[],
+ streaming=True,
+ kb_instance=kb_instance
+ )
+
+ if hasattr(result, "final_output"):
+ return result.final_output
+ elif hasattr(result, "output"):
+ return result.output
+ elif isinstance(result, str):
+ return result
+
+ except Exception as e:
+ print(f"{Fore.RED}Error in AI analysis: {e}{Style.RESET_ALL}")
+
+ return None
+
+ def parse_ai_response(self, response: str) -> Dict[str, Any]:
+ """Parse AI response for structured findings"""
+ try:
+ # Try to extract JSON from the response
+ import re
+
+ # Look for JSON in the response
+ json_match = re.search(r'\{.*\}', response, re.DOTALL)
+ if json_match:
+ json_str = json_match.group()
+ return json.loads(json_str)
+
+ except Exception as e:
+ print(f"{Fore.YELLOW}Failed to parse AI response: {e}{Style.RESET_ALL}")
+
+ # Fallback to basic findings
+ return {
+ 'executive_summary': 'Assessment completed successfully.',
+ 'vulnerabilities': [],
+ 'key_statistics': {'total_vulnerabilities': 0},
+ 'methodology': 'Autonomous penetration testing using PTT methodology.',
+ 'conclusion': 'Assessment completed.',
+ 'recommendations': []
+ }
+
+ def generate_markdown_report(self) -> str:
+ """Generate the final markdown report using the same format as PentestReportGenerator"""
+ # Use the same report generation logic as the workflow reporter
+ temp_generator = PentestReportGenerator({
+ 'workflow_name': self.workflow_name,
+ 'workflow_key': self.workflow_key,
+ 'target': self.target,
+ 'timestamp': self.timestamp,
+ 'conversation_history': self.conversation_history,
+ 'tools_used': self.tools_used
+ })
+ temp_generator.structured_findings = self.structured_findings
+
+ return temp_generator.generate_markdown_report()
+
+ def save_report(self, markdown_content: str, save_raw_history: bool = False) -> str:
+ """Save the report to file"""
+ # Create reports directory if it doesn't exist
+ reports_dir = "reports"
+ if not os.path.exists(reports_dir):
+ os.makedirs(reports_dir)
+
+ # Generate filename
+ timestamp_str = str(int(self.timestamp.timestamp()))
+ safe_target = re.sub(r'[^\w\-_\.]', '_', self.target)
+ filename = f"{reports_dir}/ghostcrew_agent_mode_{safe_target}_{timestamp_str}.md"
+
+ # Save markdown file
+ with open(filename, 'w', encoding='utf-8') as f:
+ f.write(markdown_content)
+
+ # Optionally save raw history and PTT data
+ if save_raw_history:
+ raw_filename = f"{reports_dir}/ghostcrew_agent_mode_{safe_target}_{timestamp_str}_raw.json"
+ raw_data = {
+ 'ptt_data': self.ptt_data,
+ 'conversation_history': self.conversation_history,
+ 'timestamp': self.timestamp.isoformat()
+ }
+
+ with open(raw_filename, 'w', encoding='utf-8') as f:
+ json.dump(raw_data, f, indent=2, default=str)
+
+ print(f"{Fore.GREEN}Raw PTT data saved: {raw_filename}{Style.RESET_ALL}")
+
+ return filename
\ No newline at end of file
diff --git a/ui/menu_system.py b/ui/menu_system.py
index 75809c7..5b5c367 100644
--- a/ui/menu_system.py
+++ b/ui/menu_system.py
@@ -1,6 +1,6 @@
"""Menu system and user interface components for GHOSTCREW."""
-from typing import Optional, List, Tuple
+from typing import Optional, List, Tuple, Dict, Any
from colorama import Fore, Style
from config.constants import (
MAIN_MENU_TITLE, INTERACTIVE_OPTION, AUTOMATED_OPTION,
@@ -25,10 +25,16 @@ class MenuSystem:
else:
print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (workflows.py not found){Style.RESET_ALL}")
- print(f"3. {EXIT_OPTION}")
+ # Agent mode option
+ if has_connected_servers:
+ print(f"3. {Fore.YELLOW}Agent Mode{Style.RESET_ALL}")
+ else:
+ print(f"3. {Fore.LIGHTBLACK_EX}Agent Mode (requires MCP tools){Style.RESET_ALL}")
+
+ print(f"4. {EXIT_OPTION}")
@staticmethod
- def get_menu_choice(max_option: int = 3) -> str:
+ def get_menu_choice(max_option: int = 4) -> str:
"""Get user's menu selection."""
return input(f"\n{Fore.GREEN}Select mode (1-{max_option}): {Style.RESET_ALL}").strip()
@@ -39,6 +45,88 @@ class MenuSystem:
print(f"{Fore.WHITE}Type your questions or commands. Use 'multi' for multi-line input.{Style.RESET_ALL}")
print(f"{Fore.WHITE}Type 'menu' to return to main menu.{Style.RESET_ALL}\n")
+ @staticmethod
+ def display_agent_mode_intro() -> None:
+ """Display introduction for agent mode."""
+ print(f"\n{Fore.CYAN}AGENT MODE - Autonomous PTT-based Penetration Testing{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}{'='*60}{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}The AI agent will autonomously conduct a penetration test{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}using a dynamic Pentesting Task Tree (PTT) for strategic{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}decision making and maintaining context throughout the test.{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}{'='*60}{Style.RESET_ALL}\n")
+
+ @staticmethod
+ def get_agent_mode_params() -> Optional[Dict[str, Any]]:
+ """Get parameters for agent mode initialization."""
+ print(f"{Fore.CYAN}Agent Mode Setup{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Please provide the following information:{Style.RESET_ALL}\n")
+
+ # Get goal
+ print(f"{Fore.YELLOW}1. Primary Goal{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}What is the main objective of this penetration test?{Style.RESET_ALL}")
+ print(f"{Fore.LIGHTBLACK_EX}Example: 'Gain administrative access to the target system'{Style.RESET_ALL}")
+ print(f"{Fore.LIGHTBLACK_EX}Example: 'Identify and exploit vulnerabilities in the web application'{Style.RESET_ALL}")
+ goal = input(f"{Fore.GREEN}Goal: {Style.RESET_ALL}").strip()
+
+ if not goal:
+ print(f"{Fore.RED}Goal is required.{Style.RESET_ALL}")
+ return None
+
+ # Get target
+ print(f"\n{Fore.YELLOW}2. Target Information{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Specify the target system, network, or application.{Style.RESET_ALL}")
+ print(f"{Fore.LIGHTBLACK_EX}Example: '192.168.1.100' or 'example.com' or '192.168.1.0/24'{Style.RESET_ALL}")
+ target = input(f"{Fore.GREEN}Target: {Style.RESET_ALL}").strip()
+
+ if not target:
+ print(f"{Fore.RED}Target is required.{Style.RESET_ALL}")
+ return None
+
+ # Get constraints
+ constraints = {}
+ print(f"\n{Fore.YELLOW}3. Constraints/Scope (Optional){Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Any limitations or specific requirements?{Style.RESET_ALL}")
+
+ # Iteration limit
+ print(f"\n{Fore.WHITE}Iteration Limit:{Style.RESET_ALL}")
+ print("How many iterations should the agent run?")
+ print("Each iteration involves task selection, execution, and tree updates.")
+ print("Recommended: 10-30 iterations for thorough testing")
+ print("Set to 0 to run until goal is achieved or no more actions available")
+ iteration_limit_input = input(f"{Fore.GREEN}Iteration limit (default: 20): {Style.RESET_ALL}").strip()
+
+ try:
+ iteration_limit = int(iteration_limit_input) if iteration_limit_input else 20
+ # Allow 0 for unlimited, but cap maximum at 200 for safety
+ iteration_limit = max(0, min(200, iteration_limit))
+ constraints['iteration_limit'] = iteration_limit
+ except ValueError:
+ constraints['iteration_limit'] = 20
+ print(f"{Fore.YELLOW}Invalid input, using default: 20{Style.RESET_ALL}")
+
+ # Additional notes
+ notes = input(f"{Fore.GREEN}Additional notes/constraints (optional): {Style.RESET_ALL}").strip()
+ if notes:
+ constraints['notes'] = notes
+
+ # Confirm
+ print(f"\n{Fore.CYAN}Configuration Summary:{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Goal: {goal}{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Target: {target}{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Iteration Limit: {constraints['iteration_limit']}{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Constraints: {constraints}{Style.RESET_ALL}")
+
+ confirm = input(f"\n{Fore.YELLOW}Proceed with agent mode? (yes/no): {Style.RESET_ALL}").strip().lower()
+ if confirm != 'yes':
+ print(f"{Fore.YELLOW}Agent mode cancelled.{Style.RESET_ALL}")
+ return None
+
+ return {
+ 'goal': goal,
+ 'target': target,
+ 'constraints': constraints
+ }
+
@staticmethod
def get_user_input() -> str:
"""Get user input with prompt."""
@@ -88,6 +176,14 @@ class MenuSystem:
print(f"{Fore.WHITE}Please configure MCP tools to use this feature.{Style.RESET_ALL}")
input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
+ @staticmethod
+ def display_agent_mode_requirements_message() -> None:
+ """Display message about agent mode requirements."""
+ print(f"\n{Fore.YELLOW}Agent Mode requires MCP tools to be configured and connected.{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}The autonomous agent needs real security tools to execute PTT tasks.{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Please configure MCP tools to use this feature.{Style.RESET_ALL}")
+ input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
+
@staticmethod
def get_workflow_target() -> Optional[str]:
"""Get target input for workflow execution."""