Add agent mode

This commit is contained in:
GH05TCREW
2025-06-05 02:54:12 -06:00
parent 989d60b8d6
commit baebf5454a
9 changed files with 2353 additions and 101 deletions

View File

@@ -10,6 +10,7 @@ https://github.com/user-attachments/assets/a43d2457-7113-42cc-ad02-f378d57f4d24
- **MCP Server Integration**: Through the `mcp.json` configuration file, multiple MCP servers can be flexibly integrated and managed to extend the assistant's capabilities.
- **Tool Management**: Configure, connect to, and manage MCP tools through an interactive menu, including the ability to clear all configurations.
- **Tool Invocation**: The AI assistant can call tools provided by configured MCP servers (such as: nmap, metasploit, ffuf, etc.) based on user requests.
- **Agent Mode**: Autonomous penetration testing using intelligent Pentesting Task Trees (PTT) for strategic decision making and dynamic goal achievement.
- **Automated Pentesting Workflows**: Execute predefined penetration testing workflows that systematically use configured security tools to perform comprehensive assessments.
- **Report Generation**: Generate markdown reports with structured findings, evidence, and recommendations.
- **Conversation History**: Supports multi-turn dialogues, remembering previous interaction content.
@@ -18,55 +19,6 @@ https://github.com/user-attachments/assets/a43d2457-7113-42cc-ad02-f378d57f4d24
- **File-Aware Tool Integration**: AI recognizes and uses actual files from the knowledge folder (wordlists, payloads, configs) with security tools.
- **Configurable Models**: Supports configuration of different language model parameters.
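As an illustration, an `mcp.json` entry for a single server might look like the sketch below. The server name, command, and arguments here are hypothetical, and the exact schema depends on GHOSTCREW's configuration loader and the MCP server you install; consult that server's documentation for real values.

```json
{
  "servers": [
    {
      "name": "nmap",
      "command": "uvx",
      "args": ["mcp-server-nmap"]
    }
  ]
}
```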
## Automated Penetration Testing Workflows
GHOSTCREW includes automated penetration testing workflows that provide structured, systematic security assessments. These workflows require MCP tools to be configured and connected to function properly.
### Available Workflows
1. **Reconnaissance and Discovery**
- Comprehensive information gathering and target profiling
- Performs reconnaissance, subdomain discovery, port scanning, technology fingerprinting, and historical data analysis
- **Steps**: 5 systematic phases
2. **Web Application Security Assessment**
- Comprehensive web application penetration testing
- Tests for directory traversal, SQL injection, web vulnerabilities, SSL/TLS security, authentication flaws, and file inclusion
- **Steps**: 6 focused web security phases
3. **Network Infrastructure Penetration Test**
- Network-focused penetration testing and exploitation
- Includes network scanning, service enumeration, vulnerability identification, misconfiguration testing, and exploitation attempts
- **Steps**: 6 network security phases
4. **Complete Penetration Test**
- Full-scope penetration testing methodology
- Comprehensive assessment covering reconnaissance, enumeration, vulnerability scanning, web testing, network exploitation, post-exploitation, and reporting
- **Steps**: 7 complete assessment phases
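The four workflows above can be pictured as simple step templates. The sketch below is illustrative only; the actual templates live in `workflows/workflow_definitions.py` and their structure may differ.

```python
# Hypothetical representation of one workflow template, mirroring the
# "Reconnaissance and Discovery" phases listed above.
RECON_WORKFLOW = {
    "name": "Reconnaissance and Discovery",
    "steps": [
        "Passive reconnaissance and target profiling",
        "Subdomain discovery",
        "Port scanning",
        "Technology fingerprinting",
        "Historical data analysis",
    ],
}

def describe(workflow: dict) -> str:
    """Render a one-line summary suitable for a menu entry."""
    return f"{workflow['name']} ({len(workflow['steps'])} steps)"
```

For example, `describe(RECON_WORKFLOW)` yields `"Reconnaissance and Discovery (5 steps)"`, matching the "5 systematic phases" noted above.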
### Workflow Features
- **Tool Integration**: All workflows utilize configured MCP tools (Nmap, Metasploit, Nuclei, etc.) for real security testing
- **Professional Output**: Each step provides detailed technical findings, vulnerability analysis, risk assessment, and remediation recommendations
- **Report Generation**: Automatically save reports to organized `reports/` directory with workflow-specific naming
- **Target Flexibility**: Works with IP addresses, domain names, or network ranges
- **Progress Tracking**: Real-time progress indication through each workflow step
### Usage Requirements
- **MCP Tools Required**: Automated workflows require at least one MCP security tool to be configured and connected
- **Access Control**: The system prevents workflow execution without proper tools to avoid generating simulated results
- **Professional Context**: Designed for authorized penetration testing and security assessments only
### How to Use Workflows
1. Start the Pentest Agent and configure MCP tools when prompted
2. Select "Automated Penetration Testing" from the main menu
3. Choose your desired workflow type
4. Enter the target (IP, domain, or network range)
5. Confirm execution and monitor progress
6. Optionally save results to file for documentation
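Saved reports land in the `reports/` directory with workflow-specific names (see the `ghostcrew_*_*.md` pattern in the file structure below). A minimal sketch of such a naming helper follows; the actual scheme in `reporting/generators.py` may differ, and `report_path` is a hypothetical name.

```python
from datetime import datetime
from pathlib import Path

def report_path(workflow: str, when: datetime) -> Path:
    """Build a reports/ path following a ghostcrew_<workflow>_<timestamp>.md
    convention. Illustrative only; the real generator may name files differently."""
    slug = workflow.lower().replace(" ", "_")
    stamp = when.strftime("%Y%m%d_%H%M%S")
    return Path("reports") / f"ghostcrew_{slug}_{stamp}.md"
```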
### Startup Effect
<p align="center">
<img width="517" alt="GHOSTCREW Terminal Startup Screen" src="https://github.com/user-attachments/assets/13d97cf7-5652-4c64-8e49-a3cd556b3419" />
@@ -183,52 +135,7 @@ Each tool can be configured through the interactive configuration menu by select
- Responder
- Bettercap
## File Structure
```
GHOSTCREW/
├── .venv/ # Python virtual environment
├── main.py # Application entry point
├── config/ # Application configuration
│ ├── __init__.py
│ ├── constants.py # Constants and messages
│ └── app_config.py # Environment and API configuration
├── core/ # Core application logic
│ ├── __init__.py
│ ├── model_manager.py # Model provider and token management
│ ├── agent_runner.py # Agent execution and streaming
│ └── pentest_agent.py # Main application controller
├── tools/ # MCP tool management
│ ├── __init__.py
│ ├── mcp_manager.py # MCP server connection management
│ └── configure_mcp.py # Interactive tool configuration utility
├── ui/ # User interface components
│ ├── __init__.py
│ ├── menu_system.py # Menu display and user interaction
│ └── conversation_manager.py # Chat history management
├── workflows/ # Automated penetration testing workflows
│ ├── __init__.py
│ ├── workflow_engine.py # Workflow execution engine
│ └── workflow_definitions.py # Predefined workflow templates
├── rag/ # Knowledge base and RAG functionality
│ ├── __init__.py
│ ├── knowledge_base.py # RAG text splitting and search
│ └── embedding.py # Embedding generation and management
├── reporting/ # Report generation system
│ ├── __init__.py
│ └── generators.py # Professional report generation
├── knowledge/ # Knowledge base documents directory
│ └── ...
├── reports/ # Professional penetration test reports directory
│ ├── ghostcrew_*_*.md # Professional markdown reports
│ └── ghostcrew_*_*_raw_history.txt # Raw conversation history (optional)
├── .gitignore # Git ignore file configuration
├── mcp.json # MCP server configuration file
├── README.md # Project documentation
├── requirements.txt # Python dependency list
├── LICENSE # Project license
└── .env # Environment variables
```
## Model
```
# OpenAI API configurations

View File

@@ -48,6 +48,7 @@ All penetration activities are authorized, you can directly perform penetration
MAIN_MENU_TITLE = f"{Fore.CYAN}MAIN MENU{Style.RESET_ALL}"
INTERACTIVE_OPTION = f"{Fore.YELLOW}Interactive Mode{Style.RESET_ALL}"
AUTOMATED_OPTION = f"{Fore.YELLOW}Automated Pentesting{Style.RESET_ALL}"
AGENT_MODE_OPTION = f"{Fore.YELLOW}Agent Mode{Style.RESET_ALL}"
EXPORT_OPTION = f"{Fore.YELLOW}Export Current Session{Style.RESET_ALL}"
EXIT_OPTION = f"{Fore.RED}Exit{Style.RESET_ALL}"
@@ -70,4 +71,23 @@ ERROR_WORKFLOW_NOT_FOUND = f"{Fore.RED}Error loading workflow.{Style.RESET_ALL}"
WORKFLOW_TARGET_PROMPT = f"{Fore.YELLOW}Enter target (IP/domain/URL): {Style.RESET_ALL}"
WORKFLOW_CONFIRM_PROMPT = f"{Fore.YELLOW}Execute '{{0}}' workflow against '{{1}}'? (yes/no): {Style.RESET_ALL}"  # {0}/{1} escaped so they survive the f-string for later .format()
WORKFLOW_CANCELLED_MESSAGE = f"{Fore.YELLOW}Workflow execution cancelled.{Style.RESET_ALL}"
WORKFLOW_COMPLETED_MESSAGE = f"{Fore.GREEN}Workflow execution completed.{Style.RESET_ALL}"
# Agent Mode Messages
AGENT_MODE_TITLE = f"{Fore.CYAN}AGENT MODE - Autonomous PTT-based Penetration Testing{Style.RESET_ALL}"
AGENT_MODE_GOAL_PROMPT = f"{Fore.YELLOW}Primary Goal: {Style.RESET_ALL}"
AGENT_MODE_TARGET_PROMPT = f"{Fore.YELLOW}Target (IP/domain/network): {Style.RESET_ALL}"
AGENT_MODE_INIT_SUCCESS = f"{Fore.GREEN}Agent Mode initialized successfully!{Style.RESET_ALL}"
AGENT_MODE_INIT_FAILED = f"{Fore.RED}Failed to initialize Agent Mode.{Style.RESET_ALL}"
AGENT_MODE_PAUSED = f"{Fore.YELLOW}Agent Mode paused.{Style.RESET_ALL}"
AGENT_MODE_RESUMED = f"{Fore.GREEN}Agent Mode resumed.{Style.RESET_ALL}"
AGENT_MODE_COMPLETED = f"{Fore.GREEN}Agent Mode execution completed.{Style.RESET_ALL}"
# PTT Status Messages
PTT_TASK_PENDING = f"{Fore.WHITE}{Style.RESET_ALL}"
PTT_TASK_IN_PROGRESS = f"{Fore.YELLOW}{Style.RESET_ALL}"
PTT_TASK_COMPLETED = f"{Fore.GREEN}{Style.RESET_ALL}"
PTT_TASK_FAILED = f"{Fore.RED}{Style.RESET_ALL}"
PTT_TASK_BLOCKED = f"{Fore.LIGHTBLACK_EX}{Style.RESET_ALL}"
PTT_TASK_VULNERABLE = f"{Fore.RED}{Style.RESET_ALL}"
PTT_TASK_NOT_VULNERABLE = f"{Fore.GREEN}{Style.RESET_ALL}"

View File

@@ -1 +1,20 @@
"""Core application logic for GHOSTCREW."""
"""Core GHOSTCREW modules."""
from .pentest_agent import PentestAgent
from .agent_runner import AgentRunner
from .model_manager import model_manager
from .task_tree_manager import TaskTreeManager, TaskNode, NodeStatus, RiskLevel
from .ptt_reasoning import PTTReasoningModule
from .agent_mode_controller import AgentModeController
__all__ = [
'PentestAgent',
'AgentRunner',
'model_manager',
'TaskTreeManager',
'TaskNode',
'NodeStatus',
'RiskLevel',
'PTTReasoningModule',
'AgentModeController'
]
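The controller below constructs `TaskNode` objects and checks `NodeStatus` values imported from `core.task_tree_manager`. As a rough guide to those types, here is a minimal sketch inferred purely from how the controller uses them (field names and status strings follow that usage; the real module may define more):

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional
import uuid

class NodeStatus(Enum):
    # Status strings referenced by AgentModeController, e.g.
    # NodeStatus.IN_PROGRESS.value and the 'completed'/'pending'
    # keys of get_statistics()['status_counts'].
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"
    VULNERABLE = "vulnerable"
    NOT_VULNERABLE = "not_vulnerable"

@dataclass
class TaskNode:
    # Fields inferred from how the controller constructs nodes;
    # this is a sketch, not the actual task_tree_manager definition.
    description: str
    parent_id: Optional[str] = None
    node_type: str = "task"
    tool_used: Optional[str] = None
    priority: int = 5
    risk_level: str = "low"
    status: str = NodeStatus.PENDING.value
    findings: str = ""
    attributes: Dict[str, Any] = field(default_factory=dict)
    id: str = field(default_factory=lambda: uuid.uuid4().hex)
```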

View File

@@ -0,0 +1,870 @@
"""Agent Mode Controller for autonomous PTT-based penetration testing."""
import asyncio
import json
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
from colorama import Fore, Style
from core.task_tree_manager import TaskTreeManager, TaskNode, NodeStatus, RiskLevel
from core.ptt_reasoning import PTTReasoningModule
from core.model_manager import model_manager
from config.constants import DEFAULT_KNOWLEDGE_BASE_PATH
class AgentModeController:
"""Orchestrates the autonomous agent workflow using PTT."""
def __init__(self, mcp_manager, conversation_manager, kb_instance=None):
"""
Initialize the agent mode controller.
Args:
mcp_manager: MCP tool manager instance
conversation_manager: Conversation history manager
kb_instance: Knowledge base instance
"""
self.mcp_manager = mcp_manager
self.conversation_manager = conversation_manager
self.kb_instance = kb_instance
self.tree_manager = TaskTreeManager()
self.reasoning_module = PTTReasoningModule(self.tree_manager)
self.max_iterations = 50 # Safety limit
self.iteration_count = 0
self.start_time = None
self.paused = False
self.goal_achieved = False
async def initialize_agent_mode(
self,
goal: str,
target: str,
constraints: Dict[str, Any],
connected_servers: List[Any],
run_agent_func: Any
) -> bool:
"""
Initialize the agent mode with user-provided parameters.
Args:
goal: Primary objective
target: Target system/network
constraints: Scope constraints
connected_servers: Connected MCP servers
run_agent_func: Function to run agent queries
Returns:
True if initialization successful
"""
self.connected_servers = connected_servers
self.run_agent_func = run_agent_func
self.start_time = datetime.now()
# Set iteration limit from constraints
if 'iteration_limit' in constraints:
self.max_iterations = constraints['iteration_limit']
print(f"\n{Fore.CYAN}Initializing Agent Mode...{Style.RESET_ALL}")
print(f"{Fore.WHITE}Goal: {goal}{Style.RESET_ALL}")
print(f"{Fore.WHITE}Target: {target}{Style.RESET_ALL}")
print(f"{Fore.WHITE}Iteration Limit: {self.max_iterations}{Style.RESET_ALL}")
print(f"{Fore.WHITE}Constraints: {json.dumps(constraints, indent=2)}{Style.RESET_ALL}")
# Initialize the task tree
self.tree_manager.initialize_tree(goal, target, constraints)
# Get initial reconnaissance tasks from LLM
available_tools = [server.name for server in self.connected_servers]
init_prompt = self.reasoning_module.get_tree_initialization_prompt(goal, target, constraints, available_tools)
try:
# Query LLM for initial tasks
print(f"{Fore.YELLOW}Requesting initial tasks from AI (Available tools: {', '.join(available_tools)})...{Style.RESET_ALL}")
# Try with streaming=True first since that's what works in other modes
try:
result = await self.run_agent_func(
init_prompt,
self.connected_servers,
history=[],
streaming=True,
kb_instance=self.kb_instance
)
print(f"{Fore.GREEN}Agent runner completed (streaming=True){Style.RESET_ALL}")
except Exception as stream_error:
print(f"{Fore.YELLOW}Streaming mode failed: {stream_error}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Trying with streaming=False...{Style.RESET_ALL}")
result = await self.run_agent_func(
init_prompt,
self.connected_servers,
history=[],
streaming=False,
kb_instance=self.kb_instance
)
print(f"{Fore.GREEN}Agent runner completed (streaming=False){Style.RESET_ALL}")
print(f"{Fore.YELLOW}Parsing AI response...{Style.RESET_ALL}")
# Debug: Check what we got back
if not result:
print(f"{Fore.RED}No result returned from agent runner{Style.RESET_ALL}")
print(f"{Fore.YELLOW}This usually indicates an LLM configuration issue{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Falling back to default reconnaissance tasks...{Style.RESET_ALL}")
# Fall back to the built-in initialization so initialization_data is defined downstream
initialization_data = self._get_fallback_initialization(target, available_tools)
else:
print(f"{Fore.GREEN}Got result from agent runner: {type(result)}{Style.RESET_ALL}")
# Check different possible response formats
response_text = None
if hasattr(result, "final_output"):
response_text = result.final_output
print(f"{Fore.CYAN}Using result.final_output{Style.RESET_ALL}")
elif hasattr(result, "output"):
response_text = result.output
print(f"{Fore.CYAN}Using result.output{Style.RESET_ALL}")
elif hasattr(result, "content"):
response_text = result.content
print(f"{Fore.CYAN}Using result.content{Style.RESET_ALL}")
elif isinstance(result, str):
response_text = result
print(f"{Fore.CYAN}Using result as string{Style.RESET_ALL}")
else:
print(f"{Fore.RED}Unknown result format: {type(result)}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Result attributes: {dir(result)}{Style.RESET_ALL}")
# Try to get any text content from the result
for attr in ['text', 'message', 'response', 'data']:
if hasattr(result, attr):
response_text = getattr(result, attr)
print(f"{Fore.CYAN}Found text in result.{attr}{Style.RESET_ALL}")
break
if not response_text:
print(f"{Fore.RED}No response text found in result{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Using fallback initialization...{Style.RESET_ALL}")
initialization_data = self._get_fallback_initialization(target, available_tools)
else:
print(f"{Fore.GREEN}Got response: {len(response_text)} characters{Style.RESET_ALL}")
# Parse the response
initialization_data = self.reasoning_module.parse_tree_initialization_response(response_text)
if not initialization_data or not initialization_data.get('initial_tasks'):
print(f"{Fore.YELLOW}No tasks parsed from response. Using fallback initialization.{Style.RESET_ALL}")
initialization_data = self._get_fallback_initialization(target, available_tools)
else:
analysis = initialization_data.get('analysis', '')
print(f"{Fore.CYAN}LLM determined approach: {analysis}{Style.RESET_ALL}")
# Create the structure and tasks as determined by the LLM
structure_nodes = {}
# Create structure elements (phases, categories, etc.)
for structure_element in initialization_data.get('structure', []):
structure_node = TaskNode(
description=structure_element.get('name', 'Unknown Structure'),
parent_id=self.tree_manager.root_id,
node_type=structure_element.get('type', 'phase'),
attributes={
"details": structure_element.get('description', ''),
"justification": structure_element.get('justification', ''),
"llm_created": True
}
)
node_id = self.tree_manager.add_node(structure_node)
structure_nodes[structure_element.get('name', 'Unknown')] = node_id
# Add initial tasks to their specified parents
initial_tasks = initialization_data.get('initial_tasks', [])
for task_data in initial_tasks:
parent_name = task_data.get('parent', 'root')
# Determine parent node
if parent_name == 'root':
parent_id = self.tree_manager.root_id
else:
parent_id = structure_nodes.get(parent_name, self.tree_manager.root_id)
task_node = TaskNode(
description=task_data.get('description', 'Unknown task'),
parent_id=parent_id,
tool_used=task_data.get('tool_suggestion'),
priority=task_data.get('priority', 5),
risk_level=task_data.get('risk_level', 'low'),
attributes={'rationale': task_data.get('rationale', ''), 'llm_created': True}
)
self.tree_manager.add_node(task_node)
print(f"\n{Fore.GREEN}Agent Mode initialized with LLM-determined structure: {len(initialization_data.get('structure', []))} elements, {len(initial_tasks)} tasks.{Style.RESET_ALL}")
# Display the initial tree
print(f"\n{Fore.CYAN}Initial Task Tree:{Style.RESET_ALL}")
print(self.tree_manager.to_natural_language())
return True
except Exception as e:
print(f"{Fore.RED}Failed to initialize agent mode: {e}{Style.RESET_ALL}")
import traceback
traceback.print_exc()
# Try to continue with default tasks even if there's an error
print(f"{Fore.YELLOW}Attempting to continue with default tasks...{Style.RESET_ALL}")
try:
initialization_data = self._get_fallback_initialization(target, available_tools)
# Create structure and tasks from fallback
structure_nodes = {}
# Create structure elements
for structure_element in initialization_data.get('structure', []):
structure_node = TaskNode(
description=structure_element.get('name', 'Unknown Structure'),
parent_id=self.tree_manager.root_id,
node_type=structure_element.get('type', 'phase'),
attributes={
"details": structure_element.get('description', ''),
"justification": structure_element.get('justification', ''),
"fallback_created": True
}
)
node_id = self.tree_manager.add_node(structure_node)
structure_nodes[structure_element.get('name', 'Unknown')] = node_id
# Add initial tasks
initial_tasks = initialization_data.get('initial_tasks', [])
for task_data in initial_tasks:
parent_name = task_data.get('parent', 'root')
if parent_name == 'root':
parent_id = self.tree_manager.root_id
else:
parent_id = structure_nodes.get(parent_name, self.tree_manager.root_id)
task_node = TaskNode(
description=task_data.get('description', 'Unknown task'),
parent_id=parent_id,
tool_used=task_data.get('tool_suggestion'),
priority=task_data.get('priority', 5),
risk_level=task_data.get('risk_level', 'low'),
attributes={'rationale': task_data.get('rationale', ''), 'fallback_created': True}
)
self.tree_manager.add_node(task_node)
print(f"\n{Fore.GREEN}Agent Mode initialized with fallback structure: {len(initialization_data.get('structure', []))} elements, {len(initial_tasks)} tasks.{Style.RESET_ALL}")
print(f"\n{Fore.CYAN}Initial Task Tree:{Style.RESET_ALL}")
print(self.tree_manager.to_natural_language())
return True
except Exception as fallback_error:
print(f"{Fore.RED}Fallback initialization also failed: {fallback_error}{Style.RESET_ALL}")
return False
async def run_autonomous_loop(self) -> None:
"""Run the main autonomous agent loop."""
print(f"\n{Fore.CYAN}Starting autonomous penetration test...{Style.RESET_ALL}")
if self.max_iterations == 0:
print(f"{Fore.YELLOW}Running until goal achieved or no more actions available{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}Iteration limit: {self.max_iterations} iterations{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Press Ctrl+C to pause at any time.{Style.RESET_ALL}\n")
# Set effective limit - use a high number for "unlimited" but still have a safety limit
effective_limit = self.max_iterations if self.max_iterations > 0 else 500
while self.iteration_count < effective_limit and not self.goal_achieved:
try:
if self.paused:
await self._handle_pause()
if self.paused: # Still paused after handling
break
# Increment iteration count at the beginning
self.iteration_count += 1
# Display current progress
self._display_progress()
# Get next action from PTT
next_action = await self._select_next_action()
if not next_action:
print(f"{Fore.YELLOW}No viable next actions found. Checking goal status...{Style.RESET_ALL}")
await self._check_goal_achievement()
break
# Execute the selected action
await self._execute_action(next_action)
# Check goal achievement after every iteration
await self._check_goal_achievement()
# If goal is achieved, stop the loop
if self.goal_achieved:
break
# Brief pause between actions
await asyncio.sleep(2)
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Pausing agent mode...{Style.RESET_ALL}")
self.paused = True
except Exception as e:
print(f"{Fore.RED}Error in autonomous loop: {e}{Style.RESET_ALL}")
await asyncio.sleep(5)
# Display final reason for stopping
if self.goal_achieved:
print(f"\n{Fore.GREEN}Autonomous execution stopped: Goal achieved!{Style.RESET_ALL}")
elif self.iteration_count >= effective_limit:
print(f"\n{Fore.YELLOW}Autonomous execution stopped: Iteration limit reached ({effective_limit}){Style.RESET_ALL}")
elif self.paused:
print(f"\n{Fore.YELLOW}Autonomous execution stopped: User paused{Style.RESET_ALL}")
else:
print(f"\n{Fore.YELLOW}Autonomous execution stopped: No more viable actions{Style.RESET_ALL}")
# Final summary
self._display_final_summary()
async def _select_next_action(self) -> Optional[Dict[str, Any]]:
"""Select the next action based on PTT state."""
# Get available tools
available_tools = [server.name for server in self.connected_servers]
# Get prioritized candidate tasks
candidates = self.tree_manager.get_candidate_tasks()
if not candidates:
return None
prioritized = self.tree_manager.prioritize_tasks(candidates)
print(f"\n{Fore.CYAN}Selecting next action...{Style.RESET_ALL}")
print(f"{Fore.WHITE}Available tools: {', '.join(available_tools)}{Style.RESET_ALL}")
# Query LLM for action selection
selection_prompt = self.reasoning_module.get_next_action_prompt(available_tools)
try:
result = await self.run_agent_func(
selection_prompt,
self.connected_servers,
history=self.conversation_manager.get_history(),
streaming=True, # Use streaming=True since it works in other modes
kb_instance=self.kb_instance
)
# Handle response format variations
response_text = None
if hasattr(result, "final_output"):
response_text = result.final_output
elif hasattr(result, "output"):
response_text = result.output
elif isinstance(result, str):
response_text = result
if response_text:
action_data = self.reasoning_module.parse_next_action_response(response_text, available_tools)
if action_data:
# Get the selected task
task_index = action_data.get('selected_task_index', 1) - 1
if 0 <= task_index < len(prioritized):
selected_task = prioritized[task_index]
return {
'task': selected_task,
'command': action_data.get('command'),
'tool': action_data.get('tool'),
'rationale': action_data.get('rationale'),
'expected_outcome': action_data.get('expected_outcome')
}
except Exception as e:
print(f"{Fore.RED}Error selecting next action: {e}{Style.RESET_ALL}")
# Fallback to first prioritized task
if prioritized:
return {'task': prioritized[0], 'command': None, 'tool': None}
return None
async def _execute_action(self, action: Dict[str, Any]) -> None:
"""Execute a selected action."""
task = action['task']
command = action.get('command')
tool = action.get('tool')
available_tools = [server.name for server in self.connected_servers]
print(f"\n{Fore.CYAN}Executing: {task.description}{Style.RESET_ALL}")
if action.get('rationale'):
print(f"{Fore.WHITE}Rationale: {action['rationale']}{Style.RESET_ALL}")
if command:
print(f"{Fore.WHITE}Command: {command}{Style.RESET_ALL}")
if tool:
print(f"{Fore.WHITE}Using tool: {tool}{Style.RESET_ALL}")
# Check if suggested tool is available
if tool and tool not in available_tools and tool != 'manual':
print(f"{Fore.YELLOW}Tool '{tool}' not available. Available: {', '.join(available_tools)}{Style.RESET_ALL}")
print(f"{Fore.CYAN}Asking AI to adapt approach with available tools...{Style.RESET_ALL}")
# Let AI figure out how to adapt
adaptation_query = f"""The task "{task.description}" was planned to use "{tool}" but that tool is not available.
Available tools: {', '.join(available_tools)}
Please adapt this task to work with the available tools. How would you accomplish this objective using {', '.join(available_tools)}?
Be creative and think about alternative approaches that achieve the same security testing goal."""
command = adaptation_query
# Update task status to in-progress
self.tree_manager.update_node(task.id, {'status': NodeStatus.IN_PROGRESS.value})
# Execute via agent
execution_query = command if command else f"Perform the following task: {task.description}"
try:
result = await self.run_agent_func(
execution_query,
self.connected_servers,
history=self.conversation_manager.get_history(),
streaming=True,
kb_instance=self.kb_instance
)
# Handle response format variations
response_text = None
if hasattr(result, "final_output"):
response_text = result.final_output
elif hasattr(result, "output"):
response_text = result.output
elif isinstance(result, str):
response_text = result
if response_text:
# Update conversation history
self.conversation_manager.add_dialogue(execution_query)
self.conversation_manager.update_last_response(response_text)
# Update PTT based on results
await self._update_tree_from_results(
task,
response_text,
command or execution_query
)
except Exception as e:
print(f"{Fore.RED}Error executing action: {e}{Style.RESET_ALL}")
self.tree_manager.update_node(task.id, {
'status': NodeStatus.FAILED.value,
'findings': f"Execution failed: {str(e)}"
})
async def _update_tree_from_results(self, task: TaskNode, output: str, command: str) -> None:
"""Update the PTT based on execution results."""
try:
# Create update prompt
update_prompt = self.reasoning_module.get_tree_update_prompt(output, command, task)
# Get LLM analysis
result = await self.run_agent_func(
update_prompt,
self.connected_servers,
history=self.conversation_manager.get_history(),
streaming=True,
kb_instance=self.kb_instance
)
# Handle response format variations
response_text = None
if hasattr(result, "final_output"):
response_text = result.final_output
elif hasattr(result, "output"):
response_text = result.output
elif isinstance(result, str):
response_text = result
if response_text:
node_updates, new_tasks = self.reasoning_module.parse_tree_update_response(response_text)
# Update the executed node
if node_updates:
node_updates['timestamp'] = datetime.now().isoformat()
node_updates['command_executed'] = command
self.tree_manager.update_node(task.id, node_updates)
# Check if goal might be achieved before adding new tasks
preliminary_goal_check = await self._quick_goal_check()
# Only add new tasks if goal is not achieved and they align with original goal
if not preliminary_goal_check and new_tasks:
# Filter tasks to ensure they align with the original goal
filtered_tasks = self._filter_tasks_by_goal_scope(new_tasks)
for new_task_data in filtered_tasks:
parent_phase = new_task_data.get('parent_phase', 'Phase 2')
parent_node = self._find_phase_node(parent_phase)
if parent_node:
new_task = TaskNode(
description=new_task_data.get('description'),
parent_id=parent_node.id,
tool_used=new_task_data.get('tool_suggestion'),
priority=new_task_data.get('priority', 5),
risk_level=new_task_data.get('risk_level', 'low'),
attributes={'rationale': new_task_data.get('rationale', '')}
)
self.tree_manager.add_node(new_task)
if filtered_tasks:
print(f"{Fore.GREEN}PTT updated with {len(filtered_tasks)} new goal-aligned tasks.{Style.RESET_ALL}")
if len(filtered_tasks) < len(new_tasks):
print(f"{Fore.YELLOW}Filtered out {len(new_tasks) - len(filtered_tasks)} tasks that exceeded goal scope.{Style.RESET_ALL}")
elif preliminary_goal_check:
print(f"{Fore.GREEN}Goal appears to be achieved - not adding new tasks.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}Error updating PTT: {e}{Style.RESET_ALL}")
# Default to marking as completed if update fails
self.tree_manager.update_node(task.id, {
'status': NodeStatus.COMPLETED.value,
'timestamp': datetime.now().isoformat()
})
def _filter_tasks_by_goal_scope(self, tasks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Filter tasks to ensure they align with the original goal scope."""
goal_lower = self.tree_manager.goal.lower()
filtered_tasks = []
# Define scope expansion keywords that should be avoided for simple goals
expansion_keywords = ["exploit", "compromise", "attack", "penetrate", "shell", "backdoor", "privilege", "escalat"]
# Check if the original goal is simple information gathering
info_keywords = ["check", "identify", "determine", "find", "discover", "enumerate", "list", "version", "banner"]
is_simple_info_goal = any(keyword in goal_lower for keyword in info_keywords)
for task in tasks:
task_desc_lower = task.get('description', '').lower()
# If it's a simple info goal, avoid adding exploitation tasks
if is_simple_info_goal and any(keyword in task_desc_lower for keyword in expansion_keywords):
print(f"{Fore.YELLOW}Skipping task that exceeds goal scope: {task.get('description', '')}{Style.RESET_ALL}")
continue
filtered_tasks.append(task)
return filtered_tasks
async def _quick_goal_check(self) -> bool:
"""Quick check if goal might be achieved based on completed tasks."""
# Simple heuristic: if we have completed tasks with findings for info gathering goals
goal_lower = self.tree_manager.goal.lower()
info_keywords = ["check", "identify", "determine", "find", "discover", "enumerate", "list", "version", "banner"]
if any(keyword in goal_lower for keyword in info_keywords):
# For info gathering goals, check if we have relevant findings
for node in self.tree_manager.nodes.values():
if node.status == NodeStatus.COMPLETED and node.findings:
# Basic keyword matching for goal completion
if "version" in goal_lower and "version" in node.findings.lower():
return True
if "banner" in goal_lower and "banner" in node.findings.lower():
return True
if any(keyword in goal_lower and keyword in node.description.lower() for keyword in info_keywords):
return True
return False
async def _check_goal_achievement(self) -> None:
"""Check if the primary goal has been achieved."""
goal_prompt = self.reasoning_module.get_goal_check_prompt()
try:
result = await self.run_agent_func(
goal_prompt,
self.connected_servers,
history=self.conversation_manager.get_history(),
streaming=True, # Use streaming=True
kb_instance=self.kb_instance
)
# Handle response format variations
response_text = None
if hasattr(result, "final_output"):
response_text = result.final_output
elif hasattr(result, "output"):
response_text = result.output
elif isinstance(result, str):
response_text = result
if response_text:
goal_status = self.reasoning_module.parse_goal_check_response(response_text)
if goal_status.get('goal_achieved', False):
confidence = goal_status.get('confidence', 0)
if confidence >= 80:
print(f"\n{Fore.GREEN}{'='*60}{Style.RESET_ALL}")
print(f"{Fore.GREEN}GOAL ACHIEVED! (Confidence: {confidence}%){Style.RESET_ALL}")
print(f"{Fore.WHITE}Evidence: {goal_status.get('evidence', 'N/A')}{Style.RESET_ALL}")
print(f"{Fore.GREEN}{'='*60}{Style.RESET_ALL}\n")
self.goal_achieved = True
else:
print(f"{Fore.YELLOW}Goal possibly achieved but confidence is low ({confidence}%). Continuing...{Style.RESET_ALL}")
else:
remaining = goal_status.get('remaining_objectives', 'Unknown')
print(f"{Fore.YELLOW}Goal not yet achieved. Remaining: {remaining}{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}Error checking goal achievement: {e}{Style.RESET_ALL}")
def _display_progress(self) -> None:
"""Display current progress and statistics."""
stats = self.tree_manager.get_statistics()
elapsed = datetime.now() - self.start_time if self.start_time else None
print(f"\n{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
print(f"{Fore.CYAN}Iteration {self.iteration_count} | Elapsed: {elapsed}{Style.RESET_ALL}")
print(f"{Fore.WHITE}Tasks - Total: {stats['total_nodes']} | "
f"Completed: {stats['status_counts'].get('completed', 0)} | "
f"In Progress: {stats['status_counts'].get('in_progress', 0)} | "
f"Pending: {stats['status_counts'].get('pending', 0)}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Vulnerabilities Found: {stats['status_counts'].get('vulnerable', 0)}{Style.RESET_ALL}")
print(f"{Fore.CYAN}{'='*60}{Style.RESET_ALL}")
def _display_final_summary(self) -> None:
"""Display final summary of the agent mode execution."""
print(f"\n{Fore.CYAN}{'='*70}{Style.RESET_ALL}")
print(f"{Fore.CYAN}AGENT MODE EXECUTION SUMMARY{Style.RESET_ALL}")
print(f"{Fore.CYAN}{'='*70}{Style.RESET_ALL}")
summary = self.reasoning_module.generate_strategic_summary()
print(f"{Fore.WHITE}{summary}{Style.RESET_ALL}")
# Calculate effective limit
effective_limit = self.max_iterations if self.max_iterations > 0 else 500
# Execution Statistics
if self.start_time:
elapsed = datetime.now() - self.start_time
print(f"\n{Fore.WHITE}Execution Statistics:{Style.RESET_ALL}")
print(f"Total Execution Time: {elapsed}")
print(f"Iterations Completed: {self.iteration_count}/{effective_limit}")
if self.iteration_count > 0:
avg_time_per_iteration = elapsed.total_seconds() / self.iteration_count
print(f"Average Time per Iteration: {avg_time_per_iteration:.1f} seconds")
# Calculate efficiency
completion_rate = (self.iteration_count / effective_limit) * 100
print(f"Completion Rate: {completion_rate:.1f}%")
# Estimate remaining time if stopped early
if self.iteration_count < effective_limit and not self.goal_achieved:
remaining_iterations = effective_limit - self.iteration_count
estimated_remaining_time = remaining_iterations * avg_time_per_iteration
print(f"Estimated Time for Full Run: {elapsed.total_seconds() + estimated_remaining_time:.0f} seconds")
print(f"{Fore.WHITE}Total Iterations: {self.iteration_count}{Style.RESET_ALL}")
if self.goal_achieved:
print(f"\n{Fore.GREEN}PRIMARY GOAL ACHIEVED!{Style.RESET_ALL}")
efficiency = "Excellent" if self.iteration_count <= 10 else "Good" if self.iteration_count <= 20 else "Extended"
print(f"{Fore.GREEN}Efficiency: {efficiency} (achieved in {self.iteration_count} iterations){Style.RESET_ALL}")
else:
print(f"\n{Fore.YELLOW}Primary goal not fully achieved within iteration limit.{Style.RESET_ALL}")
if self.iteration_count >= effective_limit:
print(f"{Fore.YELLOW}Consider increasing iteration limit for more thorough testing.{Style.RESET_ALL}")
print(f"{Fore.CYAN}{'='*70}{Style.RESET_ALL}")
async def _handle_pause(self) -> None:
"""Handle pause state and user options."""
print(f"\n{Fore.YELLOW}Agent Mode Paused{Style.RESET_ALL}")
# Calculate effective limit
effective_limit = self.max_iterations if self.max_iterations > 0 else 500
# Display current progress
print(f"\n{Fore.MAGENTA}Progress Statistics:{Style.RESET_ALL}")
print(f"Iterations: {self.iteration_count}/{effective_limit}")
elapsed = datetime.now() - self.start_time if self.start_time else None
if elapsed:
print(f"Elapsed Time: {elapsed}")
if self.iteration_count > 0:
avg_time = elapsed.total_seconds() / self.iteration_count
print(f"Average per Iteration: {avg_time:.1f} seconds")
print("\nOptions:")
print("1. Resume execution")
print("2. View current PTT")
print("3. View detailed statistics")
print("4. Save PTT state")
print("5. Add manual task")
print("6. Modify iteration limit")
print("7. Exit agent mode")
while self.paused:
choice = input(f"\n{Fore.GREEN}Select option (1-7): {Style.RESET_ALL}").strip()
if choice == '1':
self.paused = False
print(f"{Fore.GREEN}Resuming agent mode...{Style.RESET_ALL}")
elif choice == '2':
print(f"\n{Fore.CYAN}Current PTT State:{Style.RESET_ALL}")
print(self.tree_manager.to_natural_language())
elif choice == '3':
self._display_progress()
print(self.reasoning_module.generate_strategic_summary())
elif choice == '4':
self._save_ptt_state()
elif choice == '5':
await self._add_manual_task()
elif choice == '6':
self._modify_iteration_limit()
elif choice == '7':
print(f"{Fore.YELLOW}Exiting agent mode...{Style.RESET_ALL}")
break
else:
print(f"{Fore.RED}Invalid choice.{Style.RESET_ALL}")
def _modify_iteration_limit(self) -> None:
"""Allow user to modify the iteration limit during execution."""
try:
print(f"\n{Fore.CYAN}Modify Iteration Limit{Style.RESET_ALL}")
print(f"Current limit: {self.max_iterations}")
print(f"Iterations completed: {self.iteration_count}")
print(f"Iterations remaining: {self.max_iterations - self.iteration_count}")
new_limit_input = input(f"New iteration limit (current: {self.max_iterations}): ").strip()
if new_limit_input:
try:
new_limit = int(new_limit_input)
# Ensure new limit is at least the number of iterations already completed
min_limit = self.iteration_count + 1 # Allow at least 1 more iteration
if new_limit < min_limit:
print(f"{Fore.YELLOW}Minimum limit is {min_limit} (iterations already completed + 1){Style.RESET_ALL}")
new_limit = min_limit
# Apply reasonable maximum
new_limit = min(new_limit, 200)
self.max_iterations = new_limit
print(f"{Fore.GREEN}Iteration limit updated to: {new_limit}{Style.RESET_ALL}")
except ValueError:
print(f"{Fore.RED}Invalid input. Please enter a number.{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}No change made.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}Error modifying iteration limit: {e}{Style.RESET_ALL}")
async def _add_manual_task(self) -> None:
"""Allow user to manually add a task to the PTT."""
try:
print(f"\n{Fore.CYAN}Add Manual Task{Style.RESET_ALL}")
# Get task details from user
description = input("Task description: ").strip()
if not description:
print(f"{Fore.RED}Task description required.{Style.RESET_ALL}")
return
print("Select phase:")
print("1. Phase 1: Reconnaissance")
print("2. Phase 2: Vulnerability Assessment")
print("3. Phase 3: Exploitation")
print("4. Phase 4: Post-Exploitation")
phase_choice = input("Phase (1-4): ").strip()
phase_map = {
'1': 'Phase 1',
'2': 'Phase 2',
'3': 'Phase 3',
'4': 'Phase 4'
}
phase = phase_map.get(phase_choice, 'Phase 2')
parent_node = self._find_phase_node(phase)
if not parent_node:
print(f"{Fore.RED}Phase not found.{Style.RESET_ALL}")
return
# Get priority
try:
priority = int(input("Priority (1-10, default 5): ").strip() or "5")
priority = max(1, min(10, priority))
except ValueError:
priority = 5
# Get risk level
print("Risk level:")
print("1. Low")
print("2. Medium")
print("3. High")
risk_choice = input("Risk (1-3, default 2): ").strip()
risk_map = {'1': 'low', '2': 'medium', '3': 'high'}
risk_level = risk_map.get(risk_choice, 'medium')
# Create the task
from core.task_tree_manager import TaskNode
manual_task = TaskNode(
description=description,
parent_id=parent_node.id,
tool_used='manual',
priority=priority,
risk_level=risk_level,
attributes={'manual_addition': True, 'added_by_user': True}
)
self.tree_manager.add_node(manual_task)
print(f"{Fore.GREEN}Manual task added to {phase}.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}Error adding manual task: {e}{Style.RESET_ALL}")
def _save_ptt_state(self) -> None:
"""Save the current PTT state to file."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"reports/ptt_state_{timestamp}.json"
try:
import os
os.makedirs("reports", exist_ok=True)
with open(filename, 'w') as f:
f.write(self.tree_manager.to_json())
print(f"{Fore.GREEN}PTT state saved to: {filename}{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}Failed to save PTT state: {e}{Style.RESET_ALL}")
def _find_phase_node(self, phase_description: str) -> Optional[TaskNode]:
"""Find a structure node by description (phase, category, etc.)."""
for node in self.tree_manager.nodes.values():
# Look for any non-task node that matches the description
if node.node_type != "task" and phase_description.lower() in node.description.lower():
return node
# Fall back to the root node if no matching structure node exists
return self.tree_manager.nodes.get(self.tree_manager.root_id)
def get_ptt_for_reporting(self) -> TaskTreeManager:
"""Get the PTT for report generation."""
return self.tree_manager
def _get_fallback_initialization(self, target: str, available_tools: List[str]) -> Dict[str, Any]:
"""Return minimal fallback initialization when LLM fails."""
print(f"{Fore.YELLOW}Using minimal fallback initialization. The system will rely on dynamic task generation.{Style.RESET_ALL}")
return {
'analysis': 'Fallback initialization - LLM will determine structure dynamically during execution',
'structure': [],
'initial_tasks': []
}
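The goal-check handling earlier in this controller (parse the LLM's JSON verdict, then declare success only at 80%+ confidence) can be isolated into a small standalone sketch. The `evaluate_goal_check` helper, the threshold constant, and the sample payloads below are illustrative, not part of the repository:

```python
import json

# Illustrative constant; the controller above hard-codes 80 inline.
GOAL_CONFIDENCE_THRESHOLD = 80

def evaluate_goal_check(response_text: str) -> bool:
    """Return True only when the verdict reports the goal achieved
    with confidence at or above the threshold."""
    status = json.loads(response_text)
    if not status.get('goal_achieved', False):
        return False
    return status.get('confidence', 0) >= GOAL_CONFIDENCE_THRESHOLD

high = '{"goal_achieved": true, "confidence": 92, "evidence": "admin shell obtained"}'
low = '{"goal_achieved": true, "confidence": 55}'
print(evaluate_goal_check(high))  # True
print(evaluate_goal_check(low))   # False
```

Gating on confidence as well as the boolean keeps a single overconfident-sounding but weakly supported reply from ending the run early.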


@@ -13,6 +13,7 @@ from config.constants import (
)
from config.app_config import app_config
from core.agent_runner import agent_runner
from core.agent_mode_controller import AgentModeController
from tools.mcp_manager import MCPManager
from ui.menu_system import MenuSystem
from ui.conversation_manager import ConversationManager
@@ -39,6 +40,7 @@ class PentestAgent:
self.workflow_engine = WorkflowEngine()
self.kb_instance = None
self.reporting_available = self._check_reporting_available()
self.agent_mode_controller = None # Will be initialized when needed
@staticmethod
def _check_reporting_available() -> bool:
@@ -165,6 +167,116 @@ class PentestAgent:
self.menu_system.display_operation_cancelled()
break
async def run_agent_mode(self, connected_servers: List) -> None:
"""Run autonomous agent mode with PTT."""
if not connected_servers:
self.menu_system.display_agent_mode_requirements_message()
return
# Display introduction
self.menu_system.display_agent_mode_intro()
# Get agent mode parameters
params = self.menu_system.get_agent_mode_params()
if not params:
return
# Initialize agent mode controller
self.agent_mode_controller = AgentModeController(
self.mcp_manager,
self.conversation_manager,
self.kb_instance
)
try:
# Initialize agent mode
init_success = await self.agent_mode_controller.initialize_agent_mode(
goal=params['goal'],
target=params['target'],
constraints=params['constraints'],
connected_servers=connected_servers,
run_agent_func=agent_runner.run_agent
)
if init_success:
# Run the autonomous loop
await self.agent_mode_controller.run_autonomous_loop()
# Handle post-execution options
await self._handle_agent_mode_completion()
else:
print(f"{Fore.RED}Failed to initialize agent mode.{Style.RESET_ALL}")
self.menu_system.press_enter_to_continue()
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Agent mode interrupted by user.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}Error in agent mode: {e}{Style.RESET_ALL}")
traceback.print_exc()
finally:
self.menu_system.press_enter_to_continue()
async def _handle_agent_mode_completion(self) -> None:
"""Handle post-execution options for agent mode."""
# Ask if user wants to generate a report
if self.reporting_available and self.menu_system.ask_generate_report():
try:
# Generate report from PTT
from reporting.generators import generate_report_from_ptt
ptt = self.agent_mode_controller.get_ptt_for_reporting()
report_path = await generate_report_from_ptt(
ptt,
self.conversation_manager.get_history(),
run_agent_func=agent_runner.run_agent,
connected_servers=self.mcp_manager.connected_servers if hasattr(self.mcp_manager, 'connected_servers') else [],
kb_instance=self.kb_instance
)
if report_path:
self.menu_system.display_report_generated(report_path)
else:
print(f"{Fore.YELLOW}Report generation returned no path.{Style.RESET_ALL}")
except ImportError:
# Fallback if PTT report generation not available
print(f"{Fore.YELLOW}PTT report generation not available. Saving raw data...{Style.RESET_ALL}")
self._save_agent_mode_data()
except Exception as e:
self.menu_system.display_report_error(e)
self._save_agent_mode_data()
# Ask about saving raw data
elif self.menu_system.ask_save_raw_history():
self._save_agent_mode_data()
def _save_agent_mode_data(self) -> None:
"""Save agent mode execution data."""
try:
import os
import json
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Create reports directory if it doesn't exist
os.makedirs("reports", exist_ok=True)
# Save conversation history
history_file = f"reports/agent_mode_history_{timestamp}.json"
with open(history_file, 'w', encoding='utf-8') as f:
json.dump(self.conversation_manager.get_history(), f, indent=2)
# Save PTT state
if self.agent_mode_controller:
ptt_file = f"reports/agent_mode_ptt_{timestamp}.json"
with open(ptt_file, 'w', encoding='utf-8') as f:
f.write(self.agent_mode_controller.tree_manager.to_json())
print(f"{Fore.GREEN}Agent mode data saved to reports/ directory{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}Failed to save agent mode data: {e}{Style.RESET_ALL}")
async def _execute_workflow(self, workflow_info: tuple, connected_servers: List) -> None:
"""Execute a selected workflow."""
workflow_key, workflow_name = workflow_info
@@ -291,6 +403,10 @@ class PentestAgent:
await self.run_automated_mode(connected_servers)
elif menu_choice == "3":
# Agent mode
await self.run_agent_mode(connected_servers)
elif menu_choice == "4":
# Exit
self.menu_system.display_exit_message()
break

core/ptt_reasoning.py Normal file

@@ -0,0 +1,520 @@
"""PTT Reasoning Module for LLM-based task tree management."""
import json
import re
from typing import Dict, List, Optional, Any, Tuple
from colorama import Fore, Style
from core.task_tree_manager import TaskTreeManager, TaskNode, NodeStatus
class PTTReasoningModule:
"""Handles LLM interactions for PTT management and decision making."""
def __init__(self, tree_manager: TaskTreeManager):
"""
Initialize the PTT reasoning module.
Args:
tree_manager: The task tree manager instance
"""
self.tree_manager = tree_manager
def get_tree_initialization_prompt(self, goal: str, target: str, constraints: Dict[str, Any], available_tools: List[str] = None) -> str:
"""
Generate prompt for tree initialization.
Args:
goal: Primary penetration testing objective
target: Target system/network
constraints: Scope constraints and limitations
available_tools: List of available MCP tools
Returns:
Tree initialization prompt
"""
tool_info = ""
if available_tools:
tool_info = f"""
Available MCP Tools: {', '.join(available_tools)}
You must work within the capabilities of these connected tools. Think about what each tool can accomplish:
- Consider the full capabilities of each available tool
- Adapt your approach to work with available tools
- Be creative in how you leverage available tools
"""
else:
tool_info = """
No MCP tools are currently connected. Design an approach that describes the security testing objectives without tool dependencies.
"""
prompt = f"""You are an autonomous security agent initializing a Pentesting Task Tree (PTT) for a security assessment.
ASSESSMENT CONTEXT:
Goal: {goal}
Target: {target}
Constraints: {json.dumps(constraints, indent=2)}
{tool_info}
TASK:
Analyze this goal and determine what structure and initial tasks are needed to accomplish it efficiently.
DO NOT assume any predefined phases or structure. Instead:
1. Analyze what the goal actually requires
2. Determine if you need phases/categories or if direct tasks are better
3. Create an appropriate initial structure
4. Define specific actionable tasks to start with
Consider:
- What does this specific goal require?
- What's the minimal viable approach?
- How can available tools be leveraged?
- What structure makes sense for THIS goal?
IMPORTANT: When suggesting tool usage, be specific about the commands and modules to run.
Provide your analysis and initial structure in JSON format:
{{
"analysis": "Your assessment of what this goal requires and approach",
"structure": [
{{
"type": "phase/category/direct",
"name": "Name of organizational structure",
"description": "What this encompasses",
"justification": "Why this structure element is needed for this goal"
}}
],
"initial_tasks": [
{{
"description": "Specific actionable task",
"parent": "Which structure element this belongs to, or 'root' for direct tasks",
"tool_suggestion": "Which available tool to use, or 'manual' if no suitable tool",
"priority": 1-10,
"risk_level": "low/medium/high",
"rationale": "Why this task is necessary for the goal"
}}
]
}}
BE INTELLIGENT: If the goal is simple, don't create complex multi-phase structures. If it's complex, then structure appropriately. Let the goal drive the structure, not the other way around."""
return prompt
def get_tree_update_prompt(self, tool_output: str, command: str, node: TaskNode) -> str:
"""
Generate prompt for updating the tree based on tool output.
Args:
tool_output: Output from the executed tool
command: The command that was executed
node: The node being updated
Returns:
Update prompt
"""
current_tree = self.tree_manager.to_natural_language()
prompt = f"""You are managing a Pentesting Task Tree (PTT). A task has been executed and you need to update the tree based on the results.
Current PTT State:
{current_tree}
Executed Task: {node.description}
Command: {command}
Tool Output:
{tool_output[:2000]}
Based on this output, provide updates in the following JSON format:
{{
"node_updates": {{
"status": "completed/failed/vulnerable/not_vulnerable",
"findings": "Summary of key findings from the output",
"output_summary": "Brief technical summary"
}},
"new_tasks": [
{{
"description": "New task based on findings",
"parent_phase": "Phase 1/2/3/4",
"tool_suggestion": "Suggested tool",
"priority": 1-10,
"risk_level": "low/medium/high",
"rationale": "Why this task is important"
}}
],
"insights": "Any strategic insights or patterns noticed"
}}
Consider:
1. What vulnerabilities or opportunities were discovered?
2. What follow-up actions are needed based on the findings?
3. Should any new attack vectors be explored?
4. Are there any security misconfigurations evident?"""
return prompt
def get_next_action_prompt(self, available_tools: List[str]) -> str:
"""
Generate prompt for selecting the next action.
Args:
available_tools: List of available MCP tools
Returns:
Next action selection prompt
"""
current_tree = self.tree_manager.to_natural_language()
candidates = self.tree_manager.get_candidate_tasks()
# Prepare candidate descriptions
candidate_desc = []
for i, task in enumerate(candidates[:10]): # Limit to top 10
desc = f"{i+1}. {task.description}"
if task.priority:
desc += f" (Priority: {task.priority})"
candidate_desc.append(desc)
# Generate tool context
if available_tools:
tool_context = f"""
Connected MCP Tools: {', '.join(available_tools)}
Think about how to leverage these tools for the selected task. Each tool has its own capabilities -
be creative and intelligent about how to accomplish penetration testing objectives with available tools.
If a tool doesn't directly support a traditional approach, consider alternative methods that achieve the same goal.
"""
else:
tool_context = """
No MCP tools are currently connected. Select tasks that can be performed manually or recommend connecting appropriate tools.
"""
prompt = f"""You are managing a Pentesting Task Tree (PTT) and need to select the next action.
Goal: {self.tree_manager.goal}
Target: {self.tree_manager.target}
Current PTT State:
{current_tree}
{tool_context}
Candidate Tasks:
{chr(10).join(candidate_desc)}
Statistics:
- Total tasks: {len(self.tree_manager.nodes)}
- Completed: {sum(1 for n in self.tree_manager.nodes.values() if n.status == NodeStatus.COMPLETED)}
- In Progress: {sum(1 for n in self.tree_manager.nodes.values() if n.status == NodeStatus.IN_PROGRESS)}
- Pending: {sum(1 for n in self.tree_manager.nodes.values() if n.status == NodeStatus.PENDING)}
Select the most strategic next action and provide your response in JSON format:
{{
"selected_task_index": 1-based index from candidate list,
"rationale": "Why this task is the best next step",
"command": "Intelligent request that leverages available tools effectively",
"tool": "Which available tool to use, or 'manual' if no suitable tool",
"expected_outcome": "What we hope to discover/achieve",
"alternative_if_blocked": "Backup task index if this fails"
}}
Consider:
1. Logical progression through the penetration testing methodology
2. Task dependencies and prerequisites
3. Risk vs reward of different approaches
4. How to best utilize available tools for maximum effectiveness
5. Strategic value of each potential action
Be intelligent about tool selection - think about what each available tool can accomplish."""
return prompt
def get_goal_check_prompt(self) -> str:
"""
Generate prompt to check if the goal has been achieved.
Returns:
Goal achievement check prompt
"""
current_tree = self.tree_manager.to_natural_language()
goal = self.tree_manager.goal
# Extract completed tasks and findings for better context
completed_tasks_with_findings = []
for node in self.tree_manager.nodes.values():
if node.status == NodeStatus.COMPLETED and node.findings:
completed_tasks_with_findings.append(f"{node.description}: {node.findings}")
completed_context = "\n".join(completed_tasks_with_findings) if completed_tasks_with_findings else "No completed tasks with findings yet."
prompt = f"""Analyze the current Pentesting Task Tree (PTT) to determine if the PRIMARY GOAL has been achieved.
IMPORTANT: Focus ONLY on whether the specific goal stated has been accomplished. Do not suggest additional scope or activities beyond the original goal.
PRIMARY GOAL: {goal}
Target: {self.tree_manager.target}
COMPLETED TASKS WITH FINDINGS:
{completed_context}
Current PTT State:
{current_tree}
GOAL ACHIEVEMENT CRITERIA:
- For information gathering goals, the goal is achieved when that specific information is obtained
- For vulnerability assessment goals, the goal is achieved when vulnerabilities are identified and documented
- For exploitation goals, the goal is achieved when successful exploitation is demonstrated
- For access goals, the goal is achieved when the specified access level is obtained
Provide your analysis in JSON format:
{{
"goal_achieved": true/false,
"confidence": 0-100,
"evidence": "Specific evidence that the PRIMARY GOAL has been met (quote actual findings)",
"remaining_objectives": "What still needs to be done if goal not achieved (related to the ORIGINAL goal only)",
"recommendations": "Next steps ONLY if they relate to the original goal - do not expand scope",
"scope_warning": "Flag if any tasks seem to exceed the original goal scope"
}}
Consider:
1. Has the SPECIFIC goal been demonstrably achieved?
2. Is there sufficient evidence/proof in the completed tasks?
3. Are there critical paths unexplored that are NECESSARY for the original goal?
4. Would additional testing strengthen the results for the ORIGINAL goal only?
DO NOT recommend expanding the scope beyond the original goal. If the goal is completed, mark it as achieved regardless of what other security activities could be performed."""
return prompt
def parse_tree_initialization_response(self, llm_response: str) -> Dict[str, Any]:
"""Parse LLM response for tree initialization."""
try:
print(f"{Fore.CYAN}Parsing initialization response...{Style.RESET_ALL}")
# Extract JSON from response
response_json = self._extract_json(llm_response)
analysis = response_json.get('analysis', 'No analysis provided')
structure = response_json.get('structure', [])
initial_tasks = response_json.get('initial_tasks', [])
print(f"{Fore.GREEN}LLM Analysis: {analysis}{Style.RESET_ALL}")
print(f"{Fore.GREEN}Successfully parsed {len(structure)} structure elements and {len(initial_tasks)} tasks{Style.RESET_ALL}")
return {
'analysis': analysis,
'structure': structure,
'initial_tasks': initial_tasks
}
except Exception as e:
print(f"{Fore.YELLOW}Failed to parse initialization response: {e}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Response text (first 500 chars): {llm_response[:500]}{Style.RESET_ALL}")
return {
'analysis': 'Failed to parse LLM response',
'structure': [],
'initial_tasks': []
}
def parse_tree_update_response(self, llm_response: str) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
"""Parse LLM response for tree updates."""
try:
response_json = self._extract_json(llm_response)
node_updates = response_json.get('node_updates', {})
new_tasks = response_json.get('new_tasks', [])
return node_updates, new_tasks
except Exception as e:
print(f"{Fore.YELLOW}Failed to parse update response: {e}{Style.RESET_ALL}")
return {}, []
def parse_next_action_response(self, llm_response: str, available_tools: List[str] = None) -> Optional[Dict[str, Any]]:
"""Parse LLM response for next action selection."""
try:
response_json = self._extract_json(llm_response)
return response_json
except Exception as e:
print(f"{Fore.YELLOW}Failed to parse next action response: {e}{Style.RESET_ALL}")
return None
def parse_goal_check_response(self, llm_response: str) -> Dict[str, Any]:
"""Parse LLM response for goal achievement check."""
try:
response_json = self._extract_json(llm_response)
return response_json
except Exception as e:
print(f"{Fore.YELLOW}Failed to parse goal check response: {e}{Style.RESET_ALL}")
return {"goal_achieved": False, "confidence": 0}
def _extract_json(self, text: str) -> Dict[str, Any]:
"""Extract JSON from LLM response text."""
if not text:
raise ValueError("Empty response text")
print(f"{Fore.CYAN}Attempting to extract JSON from {len(text)} character response{Style.RESET_ALL}")
# Try multiple strategies to extract JSON
strategies = [
self._extract_json_code_block,
self._extract_json_braces,
self._extract_json_fuzzy,
self._create_fallback_json
]
for i, strategy in enumerate(strategies):
try:
result = strategy(text)
if result:
print(f"{Fore.GREEN}Successfully extracted JSON using strategy {i+1}{Style.RESET_ALL}")
return result
except Exception as e:
print(f"{Fore.YELLOW}Strategy {i+1} failed: {e}{Style.RESET_ALL}")
continue
raise ValueError("Could not extract valid JSON from response")
def _extract_json_code_block(self, text: str) -> Dict[str, Any]:
"""Extract JSON from code blocks."""
# Look for JSON between ```json and ``` or just ```
patterns = [
r'```json\s*(\{.*?\})\s*```',
r'```\s*(\{.*?\})\s*```'
]
for pattern in patterns:
match = re.search(pattern, text, re.DOTALL)
if match:
json_str = match.group(1)
return json.loads(json_str)
raise ValueError("No JSON code block found")
def _extract_json_braces(self, text: str) -> Dict[str, Any]:
"""Extract JSON by finding brace boundaries."""
# Find the first { and last }
json_start = text.find('{')
json_end = text.rfind('}')
if json_start != -1 and json_end != -1 and json_end > json_start:
json_str = text[json_start:json_end + 1]
return json.loads(json_str)
raise ValueError("No valid JSON braces found")
def _extract_json_fuzzy(self, text: str) -> Dict[str, Any]:
"""Try to extract JSON with more flexible matching."""
# Look for task-like patterns and try to reconstruct JSON.
# Placeholder: a field-by-field scan (e.g. for "description",
# "tool_suggestion", "priority", "risk_level") could rebuild a
# partial structure here; for now, defer to the fallback strategy.
raise ValueError("Fuzzy JSON extraction failed")
def _create_fallback_json(self, text: str) -> Dict[str, Any]:
"""Create fallback JSON if no valid JSON is found."""
print(f"{Fore.YELLOW}Creating fallback JSON structure{Style.RESET_ALL}")
# Return an empty but valid structure
return {
"tasks": [],
"node_updates": {"status": "completed"},
"new_tasks": [],
"selected_task_index": 1,
"goal_achieved": False,
"confidence": 0
}
def verify_tree_update(self, old_tree_state: str, new_tree_state: str) -> bool:
"""
Verify that tree updates maintain integrity.
Args:
old_tree_state: Tree state before update
new_tree_state: Tree state after update
Returns:
True if update is valid
"""
# For now, basic verification - can be enhanced
# Check that only leaf nodes were modified (as per PentestGPT approach)
# This is simplified - in practice would need more sophisticated checks
return True # Placeholder - implement actual verification logic
def generate_strategic_summary(self) -> str:
"""Generate a strategic summary of the current PTT state."""
stats = self.tree_manager.get_statistics()
summary = f"""
=== PTT Strategic Summary ===
Goal: {self.tree_manager.goal}
Target: {self.tree_manager.target}
Progress Overview:
- Total Tasks: {stats['total_nodes']}
- Completed: {stats['status_counts'].get('completed', 0)}
- In Progress: {stats['status_counts'].get('in_progress', 0)}
- Failed: {stats['status_counts'].get('failed', 0)}
- Vulnerabilities Found: {stats['status_counts'].get('vulnerable', 0)}
Current Phase Focus:
"""
# Identify which phase is most active
phase_activity = {}
for node in self.tree_manager.nodes.values():
if node.node_type == "phase":
completed_children = sum(
1 for child_id in node.children_ids
if child_id in self.tree_manager.nodes
and self.tree_manager.nodes[child_id].status == NodeStatus.COMPLETED
)
total_children = len(node.children_ids)
phase_activity[node.description] = (completed_children, total_children)
for phase, (completed, total) in phase_activity.items():
if total > 0:
progress = (completed / total) * 100
summary += f"- {phase}: {completed}/{total} tasks ({progress:.0f}%)\n"
# Add key findings
summary += "\nKey Findings:\n"
vuln_count = 0
for node in self.tree_manager.nodes.values():
if node.status == NodeStatus.VULNERABLE and node.findings:
vuln_count += 1
summary += f"- {node.description}: {node.findings[:100]}...\n"
if vuln_count >= 5: # Limit to top 5
break
return summary
def validate_and_fix_tool_suggestions(self, tasks: List[Dict[str, Any]], available_tools: List[str]) -> List[Dict[str, Any]]:
"""Let the LLM re-evaluate tool suggestions if they don't match available tools."""
if not available_tools:
return tasks
# Flag tasks that reference tools which are not connected
needs_fixing = [
task for task in tasks
if task.get('tool_suggestion', '') not in available_tools
and task.get('tool_suggestion', '') not in ('manual', 'generic')
]
if needs_fixing:
print(f"{Fore.YELLOW}Some tasks reference unavailable tools. Letting AI re-evaluate...{Style.RESET_ALL}")
# Return all tasks - let the execution phase handle tool mismatches intelligently
return tasks
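The multi-strategy `_extract_json` cascade above (fenced json code block first, then outermost-brace slicing, then fallback) can be condensed into a standalone two-stage sketch; `extract_json` here is an illustration, not the module's actual method:

```python
import json
import re

def extract_json(text: str) -> dict:
    """Minimal two-stage extractor: try a fenced ```json block first,
    then fall back to slicing between the first '{' and last '}'."""
    match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
    if match:
        return json.loads(match.group(1))
    start, end = text.find('{'), text.rfind('}')
    if start != -1 and end > start:
        return json.loads(text[start:end + 1])
    raise ValueError("No JSON found in response")

reply = 'Here is my analysis:\n```json\n{"goal_achieved": false, "confidence": 40}\n```'
print(extract_json(reply))
```

Layering strategies this way makes the agent tolerant of LLMs that wrap their JSON in prose or markdown fences instead of replying with raw JSON.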

core/task_tree_manager.py Normal file

@@ -0,0 +1,357 @@
"""Task Tree Manager for PTT-based autonomous agent mode."""
import json
import uuid
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime
from enum import Enum
class NodeStatus(Enum):
"""Enumeration of possible node statuses."""
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
BLOCKED = "blocked"
VULNERABLE = "vulnerable"
NOT_VULNERABLE = "not_vulnerable"
class RiskLevel(Enum):
"""Enumeration of risk levels."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
class TaskNode:
"""Represents a single node in the task tree."""
def __init__(
self,
description: str,
parent_id: Optional[str] = None,
node_type: str = "task",
**kwargs
):
"""Initialize a task node."""
self.id = kwargs.get('id', str(uuid.uuid4()))
self.description = description
self.status = NodeStatus(kwargs.get('status', NodeStatus.PENDING.value))
self.node_type = node_type # task, phase, finding, objective
self.parent_id = parent_id
self.children_ids: List[str] = kwargs.get('children_ids', [])
# Task execution details
self.tool_used = kwargs.get('tool_used', None)
self.command_executed = kwargs.get('command_executed', None)
self.output_summary = kwargs.get('output_summary', None)
self.findings = kwargs.get('findings', None)
# Metadata
self.priority = kwargs.get('priority', 5) # 1-10, higher is more important
self.risk_level = RiskLevel(kwargs.get('risk_level', RiskLevel.LOW.value))
self.timestamp = kwargs.get('timestamp', None)
self.kb_references = kwargs.get('kb_references', [])
self.dependencies = kwargs.get('dependencies', [])
# Additional attributes
self.attributes = kwargs.get('attributes', {})
def to_dict(self) -> Dict[str, Any]:
"""Convert node to dictionary representation."""
return {
'id': self.id,
'description': self.description,
'status': self.status.value,
'node_type': self.node_type,
'parent_id': self.parent_id,
'children_ids': self.children_ids,
'tool_used': self.tool_used,
'command_executed': self.command_executed,
'output_summary': self.output_summary,
'findings': self.findings,
'priority': self.priority,
'risk_level': self.risk_level.value,
'timestamp': self.timestamp,
'kb_references': self.kb_references,
'dependencies': self.dependencies,
'attributes': self.attributes
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'TaskNode':
"""Create node from dictionary representation."""
# Copy the dict and pop 'description' so it is not passed twice
# (it is both an explicit parameter and a key inside **data).
data = dict(data)
description = data.pop('description')
return cls(description=description, **data)
class TaskTreeManager:
"""Manages the Pentesting Task Tree (PTT) structure and operations."""
def __init__(self):
"""Initialize the task tree manager."""
self.nodes: Dict[str, TaskNode] = {}
self.root_id: Optional[str] = None
self.goal: Optional[str] = None
self.target: Optional[str] = None
self.constraints: Dict[str, Any] = {}
self.creation_time = datetime.now()
def initialize_tree(self, goal: str, target: str, constraints: Dict[str, Any] = None) -> str:
"""
Initialize the task tree with a goal and target.
Args:
goal: The primary objective
target: The target system/network
constraints: Any constraints or scope limitations
Returns:
The root node ID
"""
self.goal = goal
self.target = target
self.constraints = constraints or {}
# Create root node - let the LLM determine what structure is needed
root_node = TaskNode(
description=f"Goal: {goal}",
node_type="objective"
)
self.root_id = root_node.id
self.nodes[root_node.id] = root_node
return self.root_id
def add_node(self, node: TaskNode) -> str:
"""
Add a node to the tree.
Args:
node: The TaskNode to add
Returns:
The node ID
"""
self.nodes[node.id] = node
# Update parent's children list
if node.parent_id and node.parent_id in self.nodes:
parent = self.nodes[node.parent_id]
if node.id not in parent.children_ids:
parent.children_ids.append(node.id)
return node.id
def update_node(self, node_id: str, updates: Dict[str, Any]) -> bool:
"""
Update a node's attributes.
Args:
node_id: The ID of the node to update
updates: Dictionary of attributes to update
Returns:
True if successful, False otherwise
"""
if node_id not in self.nodes:
return False
node = self.nodes[node_id]
# Update allowed fields
allowed_fields = {
'status', 'tool_used', 'command_executed', 'output_summary',
'findings', 'priority', 'risk_level', 'timestamp', 'kb_references'
}
for field, value in updates.items():
if field in allowed_fields:
if field == 'status':
node.status = NodeStatus(value)
elif field == 'risk_level':
node.risk_level = RiskLevel(value)
else:
setattr(node, field, value)
elif field == 'attributes':
node.attributes.update(value)
return True
def get_node(self, node_id: str) -> Optional[TaskNode]:
"""Get a node by ID."""
return self.nodes.get(node_id)
def get_children(self, node_id: str) -> List[TaskNode]:
"""Get all children of a node."""
if node_id not in self.nodes:
return []
parent = self.nodes[node_id]
return [self.nodes[child_id] for child_id in parent.children_ids if child_id in self.nodes]
def get_leaf_nodes(self) -> List[TaskNode]:
"""Get all leaf nodes (nodes without children)."""
return [node for node in self.nodes.values() if not node.children_ids]
def get_candidate_tasks(self) -> List[TaskNode]:
"""
Get candidate tasks for next action.
Returns tasks that are:
- Leaf nodes
- Status is PENDING or FAILED
- All dependencies are completed
"""
candidates = []
for node in self.get_leaf_nodes():
if node.status in [NodeStatus.PENDING, NodeStatus.FAILED]:
# Check dependencies
deps_satisfied = all(
dep_id in self.nodes and self.nodes[dep_id].status == NodeStatus.COMPLETED
for dep_id in node.dependencies
)
if deps_satisfied:
candidates.append(node)
return candidates
def prioritize_tasks(self, tasks: List[TaskNode]) -> List[TaskNode]:
"""
Prioritize tasks based on various factors.
Args:
tasks: List of candidate tasks
Returns:
Sorted list of tasks (highest priority first)
"""
def task_score(task: TaskNode) -> float:
# Base score from priority
score = task.priority
# Boost for reconnaissance tasks in early stages
if "recon" in task.description.lower() or "scan" in task.description.lower():
completed_count = sum(1 for n in self.nodes.values() if n.status == NodeStatus.COMPLETED)
if completed_count < 5:
score += 3
# Boost for vulnerability assessment after recon
if "vuln" in task.description.lower() and self._has_completed_recon():
score += 2
# Penalty for high-risk tasks early on
if task.risk_level == RiskLevel.HIGH:
score -= 2
return score
return sorted(tasks, key=task_score, reverse=True)
def _has_completed_recon(self) -> bool:
"""Check if basic reconnaissance has been completed."""
recon_keywords = ["scan", "recon", "enumerat", "discover"]
completed_recon = any(
any(keyword in node.description.lower() for keyword in recon_keywords)
and node.status == NodeStatus.COMPLETED
for node in self.nodes.values()
)
return completed_recon
def to_natural_language(self, node_id: Optional[str] = None, indent: int = 0) -> str:
"""
Convert the tree (or subtree) to natural language representation.
Args:
node_id: Starting node ID (None for root)
indent: Indentation level
Returns:
Natural language representation of the tree
"""
if node_id is None:
node_id = self.root_id
if node_id not in self.nodes:
return ""
node = self.nodes[node_id]
indent_str = " " * indent
# Format node information
status_symbol = {
NodeStatus.PENDING: "[ ]",
NodeStatus.IN_PROGRESS: "[~]",
NodeStatus.COMPLETED: "[+]",
NodeStatus.FAILED: "[x]",
NodeStatus.BLOCKED: "[B]",
NodeStatus.VULNERABLE: "[!]",
NodeStatus.NOT_VULNERABLE: "[-]"
lines = [f"{indent_str}{status_symbol} {node.description}"]
# Add findings if present
if node.findings:
lines.append(f"{indent_str} → Findings: {node.findings}")
# Add tool/command info if present
if node.tool_used:
lines.append(f"{indent_str} → Tool: {node.tool_used}")
# Process children
for child_id in node.children_ids:
lines.append(self.to_natural_language(child_id, indent + 1))
return "\n".join(lines)
def to_json(self) -> str:
"""Serialize the tree to JSON."""
data = {
'goal': self.goal,
'target': self.target,
'constraints': self.constraints,
'root_id': self.root_id,
'creation_time': self.creation_time.isoformat(),
'nodes': {node_id: node.to_dict() for node_id, node in self.nodes.items()}
}
return json.dumps(data, indent=2)
@classmethod
def from_json(cls, json_str: str) -> 'TaskTreeManager':
"""Deserialize a tree from JSON."""
data = json.loads(json_str)
manager = cls()
manager.goal = data['goal']
manager.target = data['target']
manager.constraints = data['constraints']
manager.root_id = data['root_id']
manager.creation_time = datetime.fromisoformat(data['creation_time'])
# Recreate nodes
for node_id, node_data in data['nodes'].items():
node = TaskNode.from_dict(node_data)
manager.nodes[node_id] = node
return manager
def get_statistics(self) -> Dict[str, Any]:
"""Get tree statistics."""
status_counts = {}
for node in self.nodes.values():
status = node.status.value
status_counts[status] = status_counts.get(status, 0) + 1
return {
'total_nodes': len(self.nodes),
'status_counts': status_counts,
'leaf_nodes': len(self.get_leaf_nodes()),
'candidate_tasks': len(self.get_candidate_tasks())
}
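The dependency gating in `get_candidate_tasks` can be sketched standalone: a leaf task becomes a candidate only when its status is pending/failed and every dependency is completed. This sketch uses plain dicts as illustrative stand-ins for `TaskNode` objects:

```python
# Illustrative stand-in data: name -> {status, deps}
tasks = {
    'scan':    {'status': 'completed', 'deps': []},
    'exploit': {'status': 'pending',   'deps': ['scan']},
    'pivot':   {'status': 'pending',   'deps': ['exploit']},
}

def candidates(tasks):
    """Return tasks that are runnable now: pending/failed with all deps completed."""
    return [
        name for name, t in tasks.items()
        if t['status'] in ('pending', 'failed')
        and all(tasks[d]['status'] == 'completed' for d in t['deps'])
    ]

print(candidates(tasks))  # ['exploit'] -- 'pivot' stays blocked behind 'exploit'
```

This is why the agent loop makes progress one frontier at a time: completing `exploit` on the next iteration would unblock `pivot` automatically.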

View File

@@ -9,6 +9,7 @@ import asyncio
from datetime import datetime
from typing import Dict, List, Any
import re
from colorama import Fore, Style
class PentestReportGenerator:
@@ -467,4 +468,350 @@ async def generate_report_from_workflow(report_data: Dict[str, Any], run_agent_f
"""
generator = PentestReportGenerator(report_data)
return await generator.generate_report(run_agent_func, connected_servers, kb_instance, save_raw_history)
async def generate_report_from_ptt(ptt_manager, conversation_history: List[Dict[str, Any]], run_agent_func=None, connected_servers=None, kb_instance=None, save_raw_history=False) -> str:
"""
Generate a professional report from PTT (Pentesting Task Tree) data
Args:
ptt_manager: TaskTreeManager instance containing the PTT
conversation_history: List of conversation history entries
run_agent_func: The main agent function for AI analysis
connected_servers: Connected MCP servers
kb_instance: Knowledge base instance
save_raw_history: Whether to save raw conversation history
Returns:
str: Path to generated report file
"""
# Convert PTT data to report-compatible format
report_data = {
'workflow_name': f"Agent Mode: {ptt_manager.goal}",
'workflow_key': 'agent_mode',
'target': ptt_manager.target,
'timestamp': ptt_manager.creation_time,
'conversation_history': conversation_history,
'tools_used': [server.name for server in connected_servers] if connected_servers else [],
'ptt_data': {
'goal': ptt_manager.goal,
'target': ptt_manager.target,
'constraints': ptt_manager.constraints,
'statistics': ptt_manager.get_statistics(),
'tree_structure': ptt_manager.to_natural_language(),
'nodes': {node_id: node.to_dict() for node_id, node in ptt_manager.nodes.items()}
}
}
# Create a specialized PTT report generator
generator = PTTReportGenerator(report_data)
if run_agent_func and connected_servers:
return await generator.generate_report(run_agent_func, connected_servers, kb_instance, save_raw_history)
else:
# Generate a basic report without AI analysis if no agent function available
return generator.generate_basic_report(save_raw_history)
class PTTReportGenerator:
"""Generate professional penetration testing reports from PTT data"""
def __init__(self, report_data: Dict[str, Any]):
self.workflow_name = report_data['workflow_name']
self.workflow_key = report_data['workflow_key']
self.target = report_data['target']
self.timestamp = report_data['timestamp']
self.conversation_history = report_data['conversation_history']
self.tools_used = report_data.get('tools_used', [])
self.ptt_data = report_data.get('ptt_data', {})
# Will be populated by AI analysis
self.structured_findings = {}
def generate_basic_report(self, save_raw_history: bool = False) -> str:
"""Generate a basic report without AI analysis"""
# Extract findings from PTT nodes
vulnerabilities = []
completed_tasks = []
failed_tasks = []
for node_data in self.ptt_data.get('nodes', {}).values():
if node_data.get('status') == 'completed' and node_data.get('findings'):
completed_tasks.append({
'description': node_data.get('description', ''),
'findings': node_data.get('findings', ''),
'tool_used': node_data.get('tool_used', ''),
'output_summary': node_data.get('output_summary', '')
})
elif node_data.get('status') == 'vulnerable':
vulnerabilities.append({
'title': node_data.get('description', 'Unknown Vulnerability'),
'description': node_data.get('findings', 'No description available'),
'severity': 'Medium', # Default severity
'affected_systems': [self.target],
'evidence': node_data.get('output_summary', ''),
'remediation': 'Review and patch identified vulnerabilities'
})
elif node_data.get('status') == 'failed':
failed_tasks.append({
'description': node_data.get('description', ''),
'tool_used': node_data.get('tool_used', ''),
'error': node_data.get('output_summary', '')
})
# Create structured findings
self.structured_findings = {
'executive_summary': f"Autonomous penetration testing completed against {self.target}. Goal: {self.ptt_data.get('goal', 'Unknown')}. {len(completed_tasks)} tasks completed successfully, {len(vulnerabilities)} vulnerabilities identified.",
'vulnerabilities': vulnerabilities,
'key_statistics': {
'total_vulnerabilities': len(vulnerabilities),
'critical_count': 0,
'high_count': 0,
'medium_count': len(vulnerabilities),
'low_count': 0,
'systems_compromised': 1 if vulnerabilities else 0
},
'methodology': "Autonomous penetration testing using Pentesting Task Tree (PTT) methodology with intelligent task prioritization and execution.",
'conclusion': f"Assessment {'successfully identified security weaknesses' if vulnerabilities else 'completed without identifying critical vulnerabilities'}. {'Immediate remediation recommended' if vulnerabilities else 'Continue monitoring and regular assessments'}.",
'recommendations': [
{
'category': 'Patch Management',
'recommendation': 'Apply security patches to all identified vulnerable services',
'priority': 'Immediate',
'business_justification': 'Prevents exploitation of known vulnerabilities'
},
{
'category': 'Monitoring',
'recommendation': 'Implement monitoring for the services and ports identified during reconnaissance',
'priority': 'Short-term',
'business_justification': 'Early detection of potential security incidents'
}
] if vulnerabilities else [
{
'category': 'Continued Security',
'recommendation': 'Maintain current security posture and conduct regular assessments',
'priority': 'Medium-term',
'business_justification': 'Proactive security maintenance'
}
]
}
# Generate the markdown report
markdown_report = self.generate_markdown_report()
# Save report
return self.save_report(markdown_report, save_raw_history)
async def generate_report(self, run_agent_func, connected_servers, kb_instance=None, save_raw_history=False) -> str:
"""Generate a comprehensive report with AI analysis"""
try:
# Create analysis prompt specifically for PTT data
analysis_prompt = self.create_ptt_analysis_prompt()
# Get AI analysis
ai_response = await self.analyze_with_ai(
analysis_prompt,
run_agent_func,
connected_servers,
kb_instance
)
if ai_response:
# Parse AI response
self.structured_findings = self.parse_ai_response(ai_response)
else:
print(f"{Fore.YELLOW}AI analysis failed, generating basic report...{Style.RESET_ALL}")
return self.generate_basic_report(save_raw_history)
except Exception as e:
print(f"{Fore.YELLOW}Error in AI analysis: {e}. Generating basic report...{Style.RESET_ALL}")
return self.generate_basic_report(save_raw_history)
# Generate markdown report
markdown_report = self.generate_markdown_report()
# Save report
return self.save_report(markdown_report, save_raw_history)
def create_ptt_analysis_prompt(self) -> str:
"""Create analysis prompt for PTT data"""
ptt_structure = self.ptt_data.get('tree_structure', 'No PTT structure available')
goal = self.ptt_data.get('goal', 'Unknown goal')
target = self.target
statistics = self.ptt_data.get('statistics', {})
# Extract key findings from completed tasks
key_findings = []
for node_data in self.ptt_data.get('nodes', {}).values():
if node_data.get('status') in ['completed', 'vulnerable'] and node_data.get('findings'):
key_findings.append(f"- {node_data.get('description', '')}: {node_data.get('findings', '')}")
findings_text = '\n'.join(key_findings) if key_findings else 'No significant findings recorded'
prompt = f"""You are analyzing the results of an autonomous penetration test conducted using a Pentesting Task Tree (PTT) methodology.
ASSESSMENT DETAILS:
Goal: {goal}
Target: {target}
Statistics: {statistics}
PTT STRUCTURE:
{ptt_structure}
KEY FINDINGS:
{findings_text}
Based on this PTT analysis, provide a comprehensive security assessment in the following JSON format:
{{
"executive_summary": "Professional executive summary of the assessment",
"key_statistics": {{
"total_vulnerabilities": 0,
"critical_count": 0,
"high_count": 0,
"medium_count": 0,
"low_count": 0,
"systems_compromised": 0
}},
"vulnerabilities": [
{{
"title": "Vulnerability name",
"description": "Detailed description",
"severity": "Critical/High/Medium/Low",
"impact": "Business impact description",
"affected_systems": ["system1", "system2"],
"evidence": "Technical evidence",
"remediation": "Specific remediation steps",
"references": ["CVE-XXXX", "reference links"]
}}
],
"compromised_systems": [
{{
"system": "system identifier",
"access_level": "user/admin/root",
"method": "exploitation method",
"evidence": "proof of compromise"
}}
],
"attack_paths": [
{{
"path_description": "Attack path name",
"impact": "potential impact",
"steps": ["step1", "step2", "step3"]
}}
],
"recommendations": [
{{
"category": "category name",
"recommendation": "specific recommendation",
"priority": "Immediate/Short-term/Medium-term/Long-term",
"business_justification": "why this matters to business"
}}
],
"methodology": "Description of the PTT methodology used",
"conclusion": "Professional conclusion of the assessment"
}}
Focus on:
1. Extracting real security findings from the PTT execution
2. Proper risk classification
3. Actionable recommendations
4. Business-relevant impact assessment"""
return prompt
async def analyze_with_ai(self, prompt: str, run_agent_func, connected_servers, kb_instance) -> str:
"""Analyze the assessment with AI"""
try:
result = await run_agent_func(
prompt,
connected_servers,
history=[],
streaming=True,
kb_instance=kb_instance
)
if hasattr(result, "final_output"):
return result.final_output
elif hasattr(result, "output"):
return result.output
elif isinstance(result, str):
return result
except Exception as e:
print(f"{Fore.RED}Error in AI analysis: {e}{Style.RESET_ALL}")
return None
def parse_ai_response(self, response: str) -> Dict[str, Any]:
"""Parse AI response for structured findings"""
try:
# Try to extract JSON from the response
# Look for JSON in the response
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
json_str = json_match.group()
return json.loads(json_str)
except Exception as e:
print(f"{Fore.YELLOW}Failed to parse AI response: {e}{Style.RESET_ALL}")
# Fallback to basic findings
return {
'executive_summary': 'Assessment completed successfully.',
'vulnerabilities': [],
'key_statistics': {'total_vulnerabilities': 0},
'methodology': 'Autonomous penetration testing using PTT methodology.',
'conclusion': 'Assessment completed.',
'recommendations': []
}
def generate_markdown_report(self) -> str:
"""Generate the final markdown report using the same format as PentestReportGenerator"""
# Use the same report generation logic as the workflow reporter
temp_generator = PentestReportGenerator({
'workflow_name': self.workflow_name,
'workflow_key': self.workflow_key,
'target': self.target,
'timestamp': self.timestamp,
'conversation_history': self.conversation_history,
'tools_used': self.tools_used
})
temp_generator.structured_findings = self.structured_findings
return temp_generator.generate_markdown_report()
def save_report(self, markdown_content: str, save_raw_history: bool = False) -> str:
"""Save the report to file"""
# Create reports directory if it doesn't exist
reports_dir = "reports"
if not os.path.exists(reports_dir):
os.makedirs(reports_dir)
# Generate filename
timestamp_str = str(int(self.timestamp.timestamp()))
safe_target = re.sub(r'[^\w\-_\.]', '_', self.target)
filename = f"{reports_dir}/ghostcrew_agent_mode_{safe_target}_{timestamp_str}.md"
# Save markdown file
with open(filename, 'w', encoding='utf-8') as f:
f.write(markdown_content)
# Optionally save raw history and PTT data
if save_raw_history:
raw_filename = f"{reports_dir}/ghostcrew_agent_mode_{safe_target}_{timestamp_str}_raw.json"
raw_data = {
'ptt_data': self.ptt_data,
'conversation_history': self.conversation_history,
'timestamp': self.timestamp.isoformat()
}
with open(raw_filename, 'w', encoding='utf-8') as f:
json.dump(raw_data, f, indent=2, default=str)
print(f"{Fore.GREEN}Raw PTT data saved: {raw_filename}{Style.RESET_ALL}")
return filename
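`parse_ai_response` relies on a greedy regex to pull the outermost JSON object out of an LLM reply that may wrap it in prose. A standalone sketch of that approach (the sample reply is illustrative):

```python
import json
import re

# An LLM reply that wraps a JSON object in conversational prose.
reply = 'Here is the analysis:\n{"vulnerabilities": [], "key_statistics": {"total_vulnerabilities": 0}}\nThanks.'

# Greedy {.*} with DOTALL spans from the first '{' to the last '}',
# so nested objects are captured in one match.
match = re.search(r'\{.*\}', reply, re.DOTALL)
data = json.loads(match.group()) if match else {}
print(data["key_statistics"]["total_vulnerabilities"])  # 0
```

The greedy match works when the reply contains a single JSON object; a stray `}` in trailing prose would make `json.loads` fail, which is why the caller catches the exception and falls back to basic findings.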

View File

@@ -1,6 +1,6 @@
"""Menu system and user interface components for GHOSTCREW."""
from typing import Optional, List, Tuple, Dict, Any
from colorama import Fore, Style
from config.constants import (
MAIN_MENU_TITLE, INTERACTIVE_OPTION, AUTOMATED_OPTION,
@@ -25,10 +25,16 @@ class MenuSystem:
else:
print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (workflows.py not found){Style.RESET_ALL}")
# Agent mode option
if has_connected_servers:
print(f"3. {Fore.YELLOW}Agent Mode{Style.RESET_ALL}")
else:
print(f"3. {Fore.LIGHTBLACK_EX}Agent Mode (requires MCP tools){Style.RESET_ALL}")
print(f"4. {EXIT_OPTION}")
@staticmethod
def get_menu_choice(max_option: int = 4) -> str:
"""Get user's menu selection."""
return input(f"\n{Fore.GREEN}Select mode (1-{max_option}): {Style.RESET_ALL}").strip()
@@ -39,6 +45,88 @@ class MenuSystem:
print(f"{Fore.WHITE}Type your questions or commands. Use 'multi' for multi-line input.{Style.RESET_ALL}")
print(f"{Fore.WHITE}Type 'menu' to return to main menu.{Style.RESET_ALL}\n")
@staticmethod
def display_agent_mode_intro() -> None:
"""Display introduction for agent mode."""
print(f"\n{Fore.CYAN}AGENT MODE - Autonomous PTT-based Penetration Testing{Style.RESET_ALL}")
print(f"{Fore.WHITE}{'='*60}{Style.RESET_ALL}")
print(f"{Fore.WHITE}The AI agent will autonomously conduct a penetration test{Style.RESET_ALL}")
print(f"{Fore.WHITE}using a dynamic Pentesting Task Tree (PTT) for strategic{Style.RESET_ALL}")
print(f"{Fore.WHITE}decision making and maintaining context throughout the test.{Style.RESET_ALL}")
print(f"{Fore.WHITE}{'='*60}{Style.RESET_ALL}\n")
@staticmethod
def get_agent_mode_params() -> Optional[Dict[str, Any]]:
"""Get parameters for agent mode initialization."""
print(f"{Fore.CYAN}Agent Mode Setup{Style.RESET_ALL}")
print(f"{Fore.WHITE}Please provide the following information:{Style.RESET_ALL}\n")
# Get goal
print(f"{Fore.YELLOW}1. Primary Goal{Style.RESET_ALL}")
print(f"{Fore.WHITE}What is the main objective of this penetration test?{Style.RESET_ALL}")
print(f"{Fore.LIGHTBLACK_EX}Example: 'Gain administrative access to the target system'{Style.RESET_ALL}")
print(f"{Fore.LIGHTBLACK_EX}Example: 'Identify and exploit vulnerabilities in the web application'{Style.RESET_ALL}")
goal = input(f"{Fore.GREEN}Goal: {Style.RESET_ALL}").strip()
if not goal:
print(f"{Fore.RED}Goal is required.{Style.RESET_ALL}")
return None
# Get target
print(f"\n{Fore.YELLOW}2. Target Information{Style.RESET_ALL}")
print(f"{Fore.WHITE}Specify the target system, network, or application.{Style.RESET_ALL}")
print(f"{Fore.LIGHTBLACK_EX}Example: '192.168.1.100' or 'example.com' or '192.168.1.0/24'{Style.RESET_ALL}")
target = input(f"{Fore.GREEN}Target: {Style.RESET_ALL}").strip()
if not target:
print(f"{Fore.RED}Target is required.{Style.RESET_ALL}")
return None
# Get constraints
constraints = {}
print(f"\n{Fore.YELLOW}3. Constraints/Scope (Optional){Style.RESET_ALL}")
print(f"{Fore.WHITE}Any limitations or specific requirements?{Style.RESET_ALL}")
# Iteration limit
print(f"\n{Fore.WHITE}Iteration Limit:{Style.RESET_ALL}")
print("How many iterations should the agent run?")
print("Each iteration involves task selection, execution, and tree updates.")
print("Recommended: 10-30 iterations for thorough testing")
print("Set to 0 to run until goal is achieved or no more actions available")
iteration_limit_input = input(f"{Fore.GREEN}Iteration limit (default: 20): {Style.RESET_ALL}").strip()
try:
iteration_limit = int(iteration_limit_input) if iteration_limit_input else 20
# Allow 0 for unlimited, but cap maximum at 200 for safety
iteration_limit = max(0, min(200, iteration_limit))
constraints['iteration_limit'] = iteration_limit
except ValueError:
constraints['iteration_limit'] = 20
print(f"{Fore.YELLOW}Invalid input, using default: 20{Style.RESET_ALL}")
# Additional notes
notes = input(f"{Fore.GREEN}Additional notes/constraints (optional): {Style.RESET_ALL}").strip()
if notes:
constraints['notes'] = notes
# Confirm
print(f"\n{Fore.CYAN}Configuration Summary:{Style.RESET_ALL}")
print(f"{Fore.WHITE}Goal: {goal}{Style.RESET_ALL}")
print(f"{Fore.WHITE}Target: {target}{Style.RESET_ALL}")
print(f"{Fore.WHITE}Iteration Limit: {constraints['iteration_limit']}{Style.RESET_ALL}")
print(f"{Fore.WHITE}Constraints: {constraints}{Style.RESET_ALL}")
confirm = input(f"\n{Fore.YELLOW}Proceed with agent mode? (yes/no): {Style.RESET_ALL}").strip().lower()
if confirm != 'yes':
print(f"{Fore.YELLOW}Agent mode cancelled.{Style.RESET_ALL}")
return None
return {
'goal': goal,
'target': target,
'constraints': constraints
}
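The iteration-limit handling above reduces to a small clamp: blank input falls back to the default of 20, 0 means "run until the goal is reached", and everything else is clamped into [0, 200]. A sketch (the function name is illustrative):

```python
def clamp_iterations(raw: str, default: int = 20, cap: int = 200) -> int:
    """Parse a user-entered iteration limit; 0 means unlimited, cap bounds the rest."""
    try:
        value = int(raw) if raw.strip() else default
    except ValueError:
        return default
    return max(0, min(cap, value))

print(clamp_iterations(""))     # 20  (blank -> default)
print(clamp_iterations("500"))  # 200 (capped for safety)
print(clamp_iterations("0"))    # 0   (run until done)
```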
@staticmethod
def get_user_input() -> str:
"""Get user input with prompt."""
@@ -88,6 +176,14 @@ class MenuSystem:
print(f"{Fore.WHITE}Please configure MCP tools to use this feature.{Style.RESET_ALL}")
input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
@staticmethod
def display_agent_mode_requirements_message() -> None:
"""Display message about agent mode requirements."""
print(f"\n{Fore.YELLOW}Agent Mode requires MCP tools to be configured and connected.{Style.RESET_ALL}")
print(f"{Fore.WHITE}The autonomous agent needs real security tools to execute PTT tasks.{Style.RESET_ALL}")
print(f"{Fore.WHITE}Please configure MCP tools to use this feature.{Style.RESET_ALL}")
input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
@staticmethod
def get_workflow_target() -> Optional[str]:
"""Get target input for workflow execution."""