Add pentest workflows

This commit is contained in:
GH05TCREW
2025-05-22 21:34:26 -06:00
parent 18da654676
commit d2bae28ca9
4 changed files with 426 additions and 65 deletions

4
.gitignore vendored
View File

@@ -3,3 +3,7 @@ __pycache__/
*.pyo
.env
venv/
# Reports directory - contains sensitive penetration test results
reports/
*.txt

View File

@@ -10,11 +10,61 @@ https://github.com/user-attachments/assets/73176f92-a94d-4b66-9aa7-ee06c438a741
- **MCP Server Integration**: Through the `mcp.json` configuration file, multiple MCP servers can be flexibly integrated and managed to extend the assistant's capabilities.
- **Tool Management**: Configure, connect to, and manage MCP tools through an interactive menu, including the ability to clear all configurations.
- **Tool Invocation**: The AI assistant can call tools provided by configured MCP servers (such as: nmap, metasploit, ffuf, etc.) based on user requests.
- **Automated Pentesting Workflows**: Execute predefined penetration testing workflows that systematically use configured security tools to perform comprehensive assessments.
- **Conversation History**: Supports multi-turn dialogues, remembering previous interaction content.
- **Streaming Output**: AI responses can be streamed for a better user experience.
- **Knowledge Base Enhancement (Optional)**: Supports enhancing AI responses through a local knowledge base RAG (`knowledge` directory).
- **Configurable Models**: Supports configuration of different language model parameters.
## Automated Penetration Testing Workflows
GHOSTCREW includes automated penetration testing workflows that provide structured, systematic security assessments. These workflows require MCP tools to be configured and connected to function properly.
### Available Workflows
1. **Reconnaissance and Discovery**
- Comprehensive information gathering and target profiling
- Performs reconnaissance, subdomain discovery, port scanning, technology fingerprinting, and historical data analysis
- **Steps**: 5 systematic phases
2. **Web Application Security Assessment**
- Comprehensive web application penetration testing
- Tests for directory traversal, SQL injection, web vulnerabilities, SSL/TLS security, authentication flaws, and file inclusion
- **Steps**: 6 focused web security phases
3. **Network Infrastructure Penetration Test**
- Network-focused penetration testing and exploitation
- Includes network scanning, service enumeration, vulnerability identification, misconfiguration testing, and exploitation attempts
- **Steps**: 6 network security phases
4. **Complete Penetration Test**
- Full-scope penetration testing methodology
- Comprehensive assessment covering reconnaissance, enumeration, vulnerability scanning, web testing, network exploitation, post-exploitation, and reporting
- **Steps**: 7 complete assessment phases
### Workflow Features
- **Tool Integration**: All workflows utilize configured MCP tools (Nmap, Metasploit, Nuclei, etc.) for real security testing
- **Professional Output**: Each step provides detailed technical findings, vulnerability analysis, risk assessment, and remediation recommendations
- **Report Generation**: Automatically save reports to organized `reports/` directory with workflow-specific naming
- **Target Flexibility**: Works with IP addresses, domain names, or network ranges
- **Progress Tracking**: Real-time progress indication through each workflow step
### Usage Requirements
- **MCP Tools Required**: Automated workflows require at least one MCP security tool to be configured and connected
- **Access Control**: The system prevents workflow execution without proper tools to avoid generating simulated results
- **Professional Context**: Designed for authorized penetration testing and security assessments only
### How to Use Workflows
1. Start GHOSTCREW and configure MCP tools when prompted
2. Select "Automated Penetration Testing" from the main menu
3. Choose your desired workflow type
4. Enter the target (IP, domain, or network range)
5. Confirm execution and monitor progress
6. Optionally save results to file for documentation
### Startup Effect
<p align="center">
<img width="650" alt="GHOSTCREW Terminal Startup Screen" src="https://github.com/user-attachments/assets/f92941e6-31c9-44a4-81f8-f30aa2b3ada8">
@@ -125,8 +175,11 @@ agent/
├── .venv/ # Python virtual environment (ignored by .gitignore)
├── knowledge/ # Knowledge base documents directory
│ └── ...
├── reports/ # Automated penetration test reports directory
│ └── ghostcrew_pentest_*.txt
├── .gitignore # Git ignore file configuration
├── main.py # Main program entry
├── workflows.py # Automated penetration testing workflows
├── configure_mcp.py # MCP tool configuration utility
├── mcp.json # MCP server configuration file
├── rag_embedding.py # RAG embedding related (if used)

361
main.py
View File

@@ -8,6 +8,12 @@ from colorama import init, Fore, Back, Style
from ollama import chat,Message
import tiktoken
# Import workflows
try:
from workflows import get_available_workflows, get_workflow_by_key, list_workflow_names
WORKFLOWS_AVAILABLE = True
except ImportError:
WORKFLOWS_AVAILABLE = False
init(autoreset=True)
@@ -78,6 +84,96 @@ class DefaultModelProvider(ModelProvider):
# Create model provider instance
model_provider = DefaultModelProvider()
def get_available_tools(connected_servers):
"""Get list of available/connected tool names"""
return [server.name for server in connected_servers]
async def run_automated_workflow(workflow, target, connected_servers, conversation_history, kb_instance):
"""Execute an automated penetration testing workflow"""
available_tools = get_available_tools(connected_servers)
print(f"\n{Fore.CYAN}Starting Automated Workflow: {workflow['name']}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Target: {target}{Style.RESET_ALL}")
print(f"{Fore.CYAN}Available Tools: {', '.join(available_tools) if available_tools else 'None'}{Style.RESET_ALL}")
print(f"{Fore.WHITE}Description: {workflow['description']}{Style.RESET_ALL}")
print(f"{Fore.WHITE}{'='*60}{Style.RESET_ALL}")
results = []
for i, step in enumerate(workflow['steps'], 1):
print(f"\n{Fore.CYAN}Step {i}/{len(workflow['steps'])}{Style.RESET_ALL}")
formatted_step = step.format(target=target)
print(f"{Fore.WHITE}{formatted_step}{Style.RESET_ALL}")
# Create comprehensive query for this step
enhanced_query = f"""
Execute the following penetration testing step as part of an automated workflow:
TARGET: {target}
STEP: {formatted_step}
AVAILABLE_TOOLS: {', '.join(available_tools) if available_tools else 'No tools configured'}
Please execute this step systematically using appropriate tools and provide:
1. Detailed command outputs and technical findings
2. Security vulnerabilities or issues discovered
3. Risk assessment and severity analysis
4. Specific recommendations for remediation
5. Relevant technical details and evidence
Be thorough and professional in your analysis. Use the available tools effectively.
"""
# Execute the step through the agent
result = await run_agent(enhanced_query, connected_servers, history=conversation_history, streaming=True, kb_instance=kb_instance)
if result and hasattr(result, "final_output"):
results.append({
"step": i,
"description": formatted_step,
"output": result.final_output
})
# Add to conversation history
conversation_history.append({
"user_query": enhanced_query,
"ai_response": result.final_output
})
print(f"{Fore.GREEN}Step {i} completed{Style.RESET_ALL}")
# Brief delay between steps
await asyncio.sleep(1)
# Workflow completion summary
print(f"\n{Fore.GREEN}Automated workflow completed successfully{Style.RESET_ALL}")
print(f"{Fore.CYAN}Steps executed: {len(results)}/{len(workflow['steps'])}{Style.RESET_ALL}")
return results
def show_automated_menu():
"""Display the automated workflow selection menu"""
if not WORKFLOWS_AVAILABLE:
print(f"{Fore.YELLOW}Automated workflows not available. workflows.py file not found.{Style.RESET_ALL}")
return None
print(f"\n{Fore.CYAN}AUTOMATED PENTESTING WORKFLOWS{Style.RESET_ALL}")
print(f"{Fore.WHITE}{'='*50}{Style.RESET_ALL}")
workflow_list = list_workflow_names()
workflows = get_available_workflows()
for i, (key, name) in enumerate(workflow_list, 1):
description = workflows[key]["description"]
step_count = len(workflows[key]["steps"])
print(f"{i}. {Fore.YELLOW}{name}{Style.RESET_ALL}")
print(f" {Fore.WHITE}{description}{Style.RESET_ALL}")
print(f" {Fore.CYAN}Steps: {step_count}{Style.RESET_ALL}")
print()
print(f"{len(workflow_list)+1}. {Fore.RED}Back to Main Menu{Style.RESET_ALL}")
return workflow_list
# Modify run_agent function to accept connected server list and conversation history as parameters
async def run_agent(query: str, mcp_servers: list[MCPServerStdio], history: list[dict] = None, streaming: bool = True, kb_instance=None):
"""
@@ -406,79 +502,214 @@ async def main():
# Create conversation history list
conversation_history = []
# --- Enter interactive main loop ---
# --- Main program loop ---
while True:
# Check if the user wants multi-line input
print(f"\n{Fore.GREEN}[>]{Style.RESET_ALL} ", end="")
user_query = input().strip()
# Show main menu
print(f"\n{Fore.CYAN}MAIN MENU{Style.RESET_ALL}")
print(f"1. {Fore.GREEN}Interactive Chat Mode{Style.RESET_ALL}")
# Handle special commands
if user_query.lower() in ["quit", "exit"]:
print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}")
break # Exit loop, enter finally block
# Handle empty input
if not user_query:
print(f"{Fore.YELLOW}No query entered. Please type your question.{Style.RESET_ALL}")
continue
# Handle multi-line mode request
if user_query.lower() == "multi":
print(f"{Fore.CYAN}Entering multi-line mode. Type your query across multiple lines.{Style.RESET_ALL}")
print(f"{Fore.CYAN}Press Enter on an empty line to submit.{Style.RESET_ALL}")
# Check if automated mode should be available
if WORKFLOWS_AVAILABLE and connected_servers:
print(f"2. {Fore.YELLOW}Automated Penetration Testing{Style.RESET_ALL}")
max_option = 3
elif WORKFLOWS_AVAILABLE and not connected_servers:
print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (requires MCP tools){Style.RESET_ALL}")
max_option = 3
else:
print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (workflows.py not found){Style.RESET_ALL}")
max_option = 3
print(f"3. {Fore.RED}Quit{Style.RESET_ALL}")
menu_choice = input(f"\n{Fore.GREEN}Select mode (1-3): {Style.RESET_ALL}").strip()
if menu_choice == "1":
# Interactive chat mode (existing functionality)
print(f"\n{Fore.CYAN}INTERACTIVE CHAT MODE{Style.RESET_ALL}")
print(f"{Fore.WHITE}Type your questions or commands. Use 'multi' for multi-line input.{Style.RESET_ALL}")
print(f"{Fore.WHITE}Type 'menu' to return to main menu.{Style.RESET_ALL}\n")
lines = []
while True:
line = input()
if line == "":
break
lines.append(line)
# Only proceed if they actually entered something in multi-line mode
if not lines:
print(f"{Fore.YELLOW}No query entered in multi-line mode.{Style.RESET_ALL}")
continue
# Check if the user wants multi-line input
print(f"\n{Fore.GREEN}[>]{Style.RESET_ALL} ", end="")
user_query = input().strip()
user_query = "\n".join(lines)
# Handle special commands
if user_query.lower() in ["quit", "exit"]:
print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}")
return
if user_query.lower() == "menu":
break
# Handle empty input
if not user_query:
print(f"{Fore.YELLOW}No query entered. Please type your question.{Style.RESET_ALL}")
continue
# Handle multi-line mode request
if user_query.lower() == "multi":
print(f"{Fore.CYAN}Entering multi-line mode. Type your query across multiple lines.{Style.RESET_ALL}")
print(f"{Fore.CYAN}Press Enter on an empty line to submit.{Style.RESET_ALL}")
lines = []
while True:
line = input()
if line == "":
break
lines.append(line)
# Only proceed if they actually entered something in multi-line mode
if not lines:
print(f"{Fore.YELLOW}No query entered in multi-line mode.{Style.RESET_ALL}")
continue
user_query = "\n".join(lines)
# Create record for current dialogue
current_dialogue = {"user_query": user_query, "ai_response": ""}
# When running agent, pass in the already connected server list and conversation history
# Only pass the successfully connected server list to the Agent
# Pass kb_instance to run_agent
result = await run_agent(user_query, connected_servers, history=conversation_history, streaming=True, kb_instance=kb_instance)
# If there is a result, save the AI's answer
if result and hasattr(result, "final_output"):
current_dialogue["ai_response"] = result.final_output
# Add current dialogue to history
conversation_history.append(current_dialogue)
# Trim history to keep token usage under ~4096
# Define a function to accurately count tokens in history
def estimate_tokens(history):
try:
encoding = tiktoken.encoding_for_model(MODEL_NAME)
return sum(
len(encoding.encode(entry['user_query'])) +
len(encoding.encode(entry.get('ai_response', '')))
for entry in history
)
except Exception:
# Fall back to approximate counting if tiktoken fails
return sum(
len(entry['user_query'].split()) +
len(entry.get('ai_response', '').split())
for entry in history
)
# Create record for current dialogue
current_dialogue = {"user_query": user_query, "ai_response": ""}
# When running agent, pass in the already connected server list and conversation history
# Only pass the successfully connected server list to the Agent
# Pass kb_instance to run_agent
result = await run_agent(user_query, connected_servers, history=conversation_history, streaming=True, kb_instance=kb_instance)
# If there is a result, save the AI's answer
if result and hasattr(result, "final_output"):
current_dialogue["ai_response"] = result.final_output
# Add current dialogue to history
conversation_history.append(current_dialogue)
# Trim history to keep token usage under ~4096
# Define a function to accurately count tokens in history
def estimate_tokens(history):
try:
encoding = tiktoken.encoding_for_model(MODEL_NAME)
return sum(
len(encoding.encode(entry['user_query'])) +
len(encoding.encode(entry.get('ai_response', '')))
for entry in history
)
except Exception:
# Fall back to approximate counting if tiktoken fails
return sum(
len(entry['user_query'].split()) +
len(entry.get('ai_response', '').split())
for entry in history
)
# Trim history while token count exceeds the limit
while estimate_tokens(conversation_history) > 4000:
conversation_history.pop(0)
# Trim history while token count exceeds the limit
while estimate_tokens(conversation_history) > 4000:
conversation_history.pop(0)
print(f"\n{Fore.CYAN}Ready for next query. Type 'quit', 'multi' for multi-line, or 'menu' for main menu.{Style.RESET_ALL}")
elif menu_choice == "2":
# Automated penetration testing workflows
if not WORKFLOWS_AVAILABLE:
print(f"\n{Fore.YELLOW}Automated workflows not available. workflows.py file not found.{Style.RESET_ALL}")
input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
continue
print(f"\n{Fore.CYAN}Ready for your next query. Type 'quit' to exit or 'multi' for multi-line input.{Style.RESET_ALL}")
if not connected_servers:
print(f"\n{Fore.YELLOW}Automated penetration testing requires MCP tools to be configured and connected.{Style.RESET_ALL}")
print(f"{Fore.WHITE}Without real security tools, the AI would only generate simulated responses.{Style.RESET_ALL}")
print(f"{Fore.WHITE}Please restart the application and configure MCP tools to use this feature.{Style.RESET_ALL}")
input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
continue
while True:
workflow_list = show_automated_menu()
if not workflow_list:
break
try:
choice = input(f"\n{Fore.GREEN}Select workflow (1-{len(workflow_list)+1}): {Style.RESET_ALL}").strip()
if not choice.isdigit():
print(f"{Fore.RED}Invalid input. Please enter a number.{Style.RESET_ALL}")
continue
choice = int(choice)
if 1 <= choice <= len(workflow_list):
# Execute selected workflow
workflow_key, workflow_name = workflow_list[choice-1]
workflow = get_workflow_by_key(workflow_key)
if not workflow:
print(f"{Fore.RED}Error loading workflow.{Style.RESET_ALL}")
continue
target = input(f"{Fore.YELLOW}Enter target (IP, domain, or network): {Style.RESET_ALL}").strip()
if not target:
print(f"{Fore.RED}Target is required.{Style.RESET_ALL}")
continue
confirm = input(f"{Fore.YELLOW}Execute '{workflow['name']}' on {target}? (yes/no): {Style.RESET_ALL}").strip().lower()
if confirm == 'yes':
await run_automated_workflow(workflow, target, connected_servers, conversation_history, kb_instance)
# Option to save results
export_choice = input(f"\n{Fore.CYAN}Save results to file? (yes/no): {Style.RESET_ALL}").strip().lower()
if export_choice == 'yes':
import time
# Create reports directory if it doesn't exist
reports_dir = "reports"
if not os.path.exists(reports_dir):
os.makedirs(reports_dir)
print(f"{Fore.GREEN}Created reports directory: {reports_dir}{Style.RESET_ALL}")
# Generate filename with timestamp
timestamp = int(time.time())
filename = f"{reports_dir}/ghostcrew_{workflow_key}_{timestamp}.txt"
with open(filename, 'w') as f:
f.write(f"GHOSTCREW Automated Penetration Test Report\n")
f.write("="*60 + "\n")
f.write(f"Workflow: {workflow['name']}\n")
f.write(f"Target: {target}\n")
f.write(f"Timestamp: {time.ctime()}\n")
f.write(f"Report Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write("="*60 + "\n\n")
# Save recent conversation history related to this workflow
recent_history = conversation_history[-len(workflow['steps']):]
for i, entry in enumerate(recent_history, 1):
f.write(f"STEP {i}: {workflow['steps'][i-1].format(target=target)}\n")
f.write("-"*40 + "\n")
f.write(f"Query: {entry['user_query']}\n\n")
f.write(f"Response: {entry.get('ai_response', 'No response')}\n")
f.write("="*60 + "\n\n")
print(f"{Fore.GREEN}Results saved to: {filename}{Style.RESET_ALL}")
input(f"\n{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}Workflow cancelled.{Style.RESET_ALL}")
elif choice == len(workflow_list) + 1:
# Back to main menu
break
else:
print(f"{Fore.RED}Invalid choice. Please select a valid option.{Style.RESET_ALL}")
except ValueError:
print(f"{Fore.RED}Invalid input. Please enter a number.{Style.RESET_ALL}")
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Operation cancelled.{Style.RESET_ALL}")
break
elif menu_choice == "3":
# Quit
print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}")
break
else:
print(f"{Fore.RED}Invalid choice. Please select 1, 2, or 3.{Style.RESET_ALL}")
# --- Catch interrupts and runtime exceptions ---
except KeyboardInterrupt:

73
workflows.py Normal file
View File

@@ -0,0 +1,73 @@
# GHOSTCREW Automated Penetration Testing Workflows
def get_available_workflows():
"""
Get all available automated workflows.
All workflows can use any configured tools - no restrictions.
"""
workflows = {
"reconnaissance": {
"name": "Reconnaissance and Discovery",
"description": "Comprehensive information gathering and target profiling",
"steps": [
"Perform comprehensive reconnaissance on {target}",
"Discover subdomains and DNS information",
"Scan for open ports and services",
"Identify technology stack and fingerprints",
"Gather historical data and archived content"
]
},
"web_application": {
"name": "Web Application Security Assessment",
"description": "Comprehensive web application penetration testing",
"steps": [
"Discover web directories and hidden content on {target}",
"Test for SQL injection vulnerabilities",
"Scan for web application vulnerabilities and misconfigurations",
"Analyze SSL/TLS configuration and security",
"Test for authentication and session management flaws",
"Check for file inclusion and upload vulnerabilities"
]
},
"network_infrastructure": {
"name": "Network Infrastructure Penetration Test",
"description": "Network-focused penetration testing and exploitation",
"steps": [
"Scan network range {target} for live hosts and services",
"Perform detailed service enumeration and version detection",
"Scan for known vulnerabilities in discovered services",
"Test for network service misconfigurations",
"Attempt exploitation of discovered vulnerabilities",
"Assess network segmentation and access controls"
]
},
"full_penetration_test": {
"name": "Complete Penetration Test",
"description": "Full-scope penetration testing methodology",
"steps": [
"Phase 1: Comprehensive reconnaissance and asset discovery on {target}",
"Phase 2: Service enumeration and technology fingerprinting",
"Phase 3: Vulnerability scanning and identification",
"Phase 4: Web application security testing (if applicable)",
"Phase 5: Network service exploitation attempts",
"Phase 6: Post-exploitation and privilege escalation testing",
"Phase 7: Document findings and provide comprehensive remediation guidance"
]
}
}
return workflows
def get_workflow_by_key(workflow_key):
"""Get a specific workflow by its key"""
workflows = get_available_workflows()
return workflows.get(workflow_key, None)
def list_workflow_names():
"""Get a list of all workflow names for display"""
workflows = get_available_workflows()
return [(key, workflow["name"]) for key, workflow in workflows.items()]