diff --git a/.gitignore b/.gitignore index c491ae7..33d5fa7 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,7 @@ __pycache__/ *.pyo .env venv/ + +# Reports directory - contains sensitive penetration test results +reports/ +*.txt diff --git a/README.md b/README.md index 59aa06e..4fca9df 100644 --- a/README.md +++ b/README.md @@ -10,11 +10,61 @@ https://github.com/user-attachments/assets/73176f92-a94d-4b66-9aa7-ee06c438a741 - **MCP Server Integration**: Through the `mcp.json` configuration file, multiple MCP servers can be flexibly integrated and managed to extend the assistant's capabilities. - **Tool Management**: Configure, connect to, and manage MCP tools through an interactive menu, including the ability to clear all configurations. - **Tool Invocation**: The AI assistant can call tools provided by configured MCP servers (such as: nmap, metasploit, ffuf, etc.) based on user requests. +- **Automated Pentesting Workflows**: Execute predefined penetration testing workflows that systematically use configured security tools to perform comprehensive assessments. - **Conversation History**: Supports multi-turn dialogues, remembering previous interaction content. - **Streaming Output**: AI responses can be streamed for a better user experience. - **Knowledge Base Enhancement (Optional)**: Supports enhancing AI responses through a local knowledge base RAG (`knowledge` directory). - **Configurable Models**: Supports configuration of different language model parameters. +## Automated Penetration Testing Workflows + +GHOSTCREW includes automated penetration testing workflows that provide structured, systematic security assessments. These workflows require MCP tools to be configured and connected to function properly. + +### Available Workflows + +1. **Reconnaissance and Discovery** + - Comprehensive information gathering and target profiling + - Performs reconnaissance, subdomain discovery, port scanning, technology fingerprinting, and historical data analysis + - **Steps**: 5 systematic phases + +2. **Web Application Security Assessment** + - Comprehensive web application penetration testing + - Tests for directory traversal, SQL injection, web vulnerabilities, SSL/TLS security, authentication flaws, and file inclusion + - **Steps**: 6 focused web security phases + +3. **Network Infrastructure Penetration Test** + - Network-focused penetration testing and exploitation + - Includes network scanning, service enumeration, vulnerability identification, misconfiguration testing, and exploitation attempts + - **Steps**: 6 network security phases + +4. **Complete Penetration Test** + - Full-scope penetration testing methodology + - Comprehensive assessment covering reconnaissance, enumeration, vulnerability scanning, web testing, network exploitation, post-exploitation, and reporting + - **Steps**: 7 complete assessment phases + +### Workflow Features + +- **Tool Integration**: All workflows utilize configured MCP tools (Nmap, Metasploit, Nuclei, etc.) for real security testing +- **Professional Output**: Each step provides detailed technical findings, vulnerability analysis, risk assessment, and remediation recommendations +- **Report Generation**: Automatically save reports to organized `reports/` directory with workflow-specific naming +- **Target Flexibility**: Works with IP addresses, domain names, or network ranges +- **Progress Tracking**: Real-time progress indication through each workflow step + +### Usage Requirements + +- **MCP Tools Required**: Automated workflows require at least one MCP security tool to be configured and connected +- **Access Control**: The system prevents workflow execution without proper tools to avoid generating simulated results +- **Professional Context**: Designed for authorized penetration testing and security assessments only + +### How to Use Workflows + +1. Start GHOSTCREW and configure MCP tools when prompted +2. Select "Automated Penetration Testing" from the main menu +3. Choose your desired workflow type +4. Enter the target (IP, domain, or network range) +5. Confirm execution and monitor progress +6. Optionally save results to file for documentation + ### Startup Effect
@@ -125,8 +175,11 @@ agent/
├── .venv/ # Python virtual environment (ignored by .gitignore)
├── knowledge/ # Knowledge base documents directory
│ └── ...
+├── reports/ # Automated penetration test reports directory
+│ └── ghostcrew_pentest_*.txt
├── .gitignore # Git ignore file configuration
├── main.py # Main program entry
+├── workflows.py # Automated penetration testing workflows
├── configure_mcp.py # MCP tool configuration utility
├── mcp.json # MCP server configuration file
├── rag_embedding.py # RAG embedding related (if used)
diff --git a/main.py b/main.py
index 02722ae..a27e472 100644
--- a/main.py
+++ b/main.py
@@ -8,6 +8,12 @@ from colorama import init, Fore, Back, Style
from ollama import chat,Message
import tiktoken
+# Import workflows
+try:
+ from workflows import get_available_workflows, get_workflow_by_key, list_workflow_names
+ WORKFLOWS_AVAILABLE = True
+except ImportError:
+ WORKFLOWS_AVAILABLE = False
init(autoreset=True)
@@ -78,6 +84,96 @@ class DefaultModelProvider(ModelProvider):
# Create model provider instance
model_provider = DefaultModelProvider()
+def get_available_tools(connected_servers):
+ """Get list of available/connected tool names"""
+ return [server.name for server in connected_servers]
+
+async def run_automated_workflow(workflow, target, connected_servers, conversation_history, kb_instance):
+ """Execute an automated penetration testing workflow"""
+ available_tools = get_available_tools(connected_servers)
+
+ print(f"\n{Fore.CYAN}Starting Automated Workflow: {workflow['name']}{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}Target: {target}{Style.RESET_ALL}")
+ print(f"{Fore.CYAN}Available Tools: {', '.join(available_tools) if available_tools else 'None'}{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Description: {workflow['description']}{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}{'='*60}{Style.RESET_ALL}")
+
+ results = []
+
+ for i, step in enumerate(workflow['steps'], 1):
+ print(f"\n{Fore.CYAN}Step {i}/{len(workflow['steps'])}{Style.RESET_ALL}")
+ formatted_step = step.format(target=target)
+ print(f"{Fore.WHITE}{formatted_step}{Style.RESET_ALL}")
+
+ # Create comprehensive query for this step
+ enhanced_query = f"""
+Execute the following penetration testing step as part of an automated workflow:
+
+TARGET: {target}
+STEP: {formatted_step}
+AVAILABLE_TOOLS: {', '.join(available_tools) if available_tools else 'No tools configured'}
+
+Please execute this step systematically using appropriate tools and provide:
+1. Detailed command outputs and technical findings
+2. Security vulnerabilities or issues discovered
+3. Risk assessment and severity analysis
+4. Specific recommendations for remediation
+5. Relevant technical details and evidence
+
+Be thorough and professional in your analysis. Use the available tools effectively.
+"""
+
+ # Execute the step through the agent
+ result = await run_agent(enhanced_query, connected_servers, history=conversation_history, streaming=True, kb_instance=kb_instance)
+
+ if result and hasattr(result, "final_output"):
+ results.append({
+ "step": i,
+ "description": formatted_step,
+ "output": result.final_output
+ })
+
+ # Add to conversation history
+ conversation_history.append({
+ "user_query": enhanced_query,
+ "ai_response": result.final_output
+ })
+
+ print(f"{Fore.GREEN}Step {i} completed{Style.RESET_ALL}")
+
+ # Brief delay between steps
+ await asyncio.sleep(1)
+
+ # Workflow completion summary
+ print(f"\n{Fore.GREEN}Automated workflow completed successfully{Style.RESET_ALL}")
+ print(f"{Fore.CYAN}Steps executed: {len(results)}/{len(workflow['steps'])}{Style.RESET_ALL}")
+
+ return results
+
+def show_automated_menu():
+ """Display the automated workflow selection menu"""
+ if not WORKFLOWS_AVAILABLE:
+ print(f"{Fore.YELLOW}Automated workflows not available. workflows.py file not found.{Style.RESET_ALL}")
+ return None
+
+ print(f"\n{Fore.CYAN}AUTOMATED PENTESTING WORKFLOWS{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}{'='*50}{Style.RESET_ALL}")
+
+ workflow_list = list_workflow_names()
+ workflows = get_available_workflows()
+
+ for i, (key, name) in enumerate(workflow_list, 1):
+ description = workflows[key]["description"]
+ step_count = len(workflows[key]["steps"])
+ print(f"{i}. {Fore.YELLOW}{name}{Style.RESET_ALL}")
+ print(f" {Fore.WHITE}{description}{Style.RESET_ALL}")
+ print(f" {Fore.CYAN}Steps: {step_count}{Style.RESET_ALL}")
+ print()
+
+ print(f"{len(workflow_list)+1}. {Fore.RED}Back to Main Menu{Style.RESET_ALL}")
+
+ return workflow_list
+
# Modify run_agent function to accept connected server list and conversation history as parameters
async def run_agent(query: str, mcp_servers: list[MCPServerStdio], history: list[dict] = None, streaming: bool = True, kb_instance=None):
"""
@@ -406,79 +502,214 @@ async def main():
# Create conversation history list
conversation_history = []
- # --- Enter interactive main loop ---
+ # --- Main program loop ---
while True:
- # Check if the user wants multi-line input
- print(f"\n{Fore.GREEN}[>]{Style.RESET_ALL} ", end="")
- user_query = input().strip()
+ # Show main menu
+ print(f"\n{Fore.CYAN}MAIN MENU{Style.RESET_ALL}")
+ print(f"1. {Fore.GREEN}Interactive Chat Mode{Style.RESET_ALL}")
- # Handle special commands
- if user_query.lower() in ["quit", "exit"]:
- print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}")
- break # Exit loop, enter finally block
-
- # Handle empty input
- if not user_query:
- print(f"{Fore.YELLOW}No query entered. Please type your question.{Style.RESET_ALL}")
- continue
-
- # Handle multi-line mode request
- if user_query.lower() == "multi":
- print(f"{Fore.CYAN}Entering multi-line mode. Type your query across multiple lines.{Style.RESET_ALL}")
- print(f"{Fore.CYAN}Press Enter on an empty line to submit.{Style.RESET_ALL}")
+ # Check if automated mode should be available
+ if WORKFLOWS_AVAILABLE and connected_servers:
+ print(f"2. {Fore.YELLOW}Automated Penetration Testing{Style.RESET_ALL}")
+ max_option = 3
+ elif WORKFLOWS_AVAILABLE and not connected_servers:
+ print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (requires MCP tools){Style.RESET_ALL}")
+ max_option = 3
+ else:
+ print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (workflows.py not found){Style.RESET_ALL}")
+ max_option = 3
+
+ print(f"3. {Fore.RED}Quit{Style.RESET_ALL}")
+
+ menu_choice = input(f"\n{Fore.GREEN}Select mode (1-3): {Style.RESET_ALL}").strip()
+
+ if menu_choice == "1":
+ # Interactive chat mode (existing functionality)
+ print(f"\n{Fore.CYAN}INTERACTIVE CHAT MODE{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Type your questions or commands. Use 'multi' for multi-line input.{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Type 'menu' to return to main menu.{Style.RESET_ALL}\n")
- lines = []
while True:
- line = input()
- if line == "":
- break
- lines.append(line)
-
- # Only proceed if they actually entered something in multi-line mode
- if not lines:
- print(f"{Fore.YELLOW}No query entered in multi-line mode.{Style.RESET_ALL}")
- continue
+ # Check if the user wants multi-line input
+ print(f"\n{Fore.GREEN}[>]{Style.RESET_ALL} ", end="")
+ user_query = input().strip()
- user_query = "\n".join(lines)
+ # Handle special commands
+ if user_query.lower() in ["quit", "exit"]:
+ print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}")
+ return
+
+ if user_query.lower() == "menu":
+ break
+
+ # Handle empty input
+ if not user_query:
+ print(f"{Fore.YELLOW}No query entered. Please type your question.{Style.RESET_ALL}")
+ continue
+
+ # Handle multi-line mode request
+ if user_query.lower() == "multi":
+ print(f"{Fore.CYAN}Entering multi-line mode. Type your query across multiple lines.{Style.RESET_ALL}")
+ print(f"{Fore.CYAN}Press Enter on an empty line to submit.{Style.RESET_ALL}")
+
+ lines = []
+ while True:
+ line = input()
+ if line == "":
+ break
+ lines.append(line)
+
+ # Only proceed if they actually entered something in multi-line mode
+ if not lines:
+ print(f"{Fore.YELLOW}No query entered in multi-line mode.{Style.RESET_ALL}")
+ continue
+
+ user_query = "\n".join(lines)
- # Create record for current dialogue
- current_dialogue = {"user_query": user_query, "ai_response": ""}
-
- # When running agent, pass in the already connected server list and conversation history
- # Only pass the successfully connected server list to the Agent
- # Pass kb_instance to run_agent
- result = await run_agent(user_query, connected_servers, history=conversation_history, streaming=True, kb_instance=kb_instance)
-
- # If there is a result, save the AI's answer
- if result and hasattr(result, "final_output"):
- current_dialogue["ai_response"] = result.final_output
-
- # Add current dialogue to history
- conversation_history.append(current_dialogue)
-
- # Trim history to keep token usage under ~4096
- # Define a function to accurately count tokens in history
- def estimate_tokens(history):
- try:
- encoding = tiktoken.encoding_for_model(MODEL_NAME)
- return sum(
- len(encoding.encode(entry['user_query'])) +
- len(encoding.encode(entry.get('ai_response', '')))
- for entry in history
- )
- except Exception:
- # Fall back to approximate counting if tiktoken fails
- return sum(
- len(entry['user_query'].split()) +
- len(entry.get('ai_response', '').split())
- for entry in history
- )
+ # Create record for current dialogue
+ current_dialogue = {"user_query": user_query, "ai_response": ""}
+
+ # When running agent, pass in the already connected server list and conversation history
+ # Only pass the successfully connected server list to the Agent
+ # Pass kb_instance to run_agent
+ result = await run_agent(user_query, connected_servers, history=conversation_history, streaming=True, kb_instance=kb_instance)
+
+ # If there is a result, save the AI's answer
+ if result and hasattr(result, "final_output"):
+ current_dialogue["ai_response"] = result.final_output
+
+ # Add current dialogue to history
+ conversation_history.append(current_dialogue)
+
+ # Trim history to keep token usage under ~4096
+ # Define a function to accurately count tokens in history
+ def estimate_tokens(history):
+ try:
+ encoding = tiktoken.encoding_for_model(MODEL_NAME)
+ return sum(
+ len(encoding.encode(entry['user_query'])) +
+ len(encoding.encode(entry.get('ai_response', '')))
+ for entry in history
+ )
+ except Exception:
+ # Fall back to approximate counting if tiktoken fails
+ return sum(
+ len(entry['user_query'].split()) +
+ len(entry.get('ai_response', '').split())
+ for entry in history
+ )
- # Trim history while token count exceeds the limit
- while estimate_tokens(conversation_history) > 4000:
- conversation_history.pop(0)
+ # Trim history while token count exceeds the limit
+ while estimate_tokens(conversation_history) > 4000:
+ conversation_history.pop(0)
+
+ print(f"\n{Fore.CYAN}Ready for next query. Type 'quit', 'multi' for multi-line, or 'menu' for main menu.{Style.RESET_ALL}")
+
+ elif menu_choice == "2":
+ # Automated penetration testing workflows
+ if not WORKFLOWS_AVAILABLE:
+ print(f"\n{Fore.YELLOW}Automated workflows not available. workflows.py file not found.{Style.RESET_ALL}")
+ input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
+ continue
- print(f"\n{Fore.CYAN}Ready for your next query. Type 'quit' to exit or 'multi' for multi-line input.{Style.RESET_ALL}")
+ if not connected_servers:
+ print(f"\n{Fore.YELLOW}Automated penetration testing requires MCP tools to be configured and connected.{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Without real security tools, the AI would only generate simulated responses.{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Please restart the application and configure MCP tools to use this feature.{Style.RESET_ALL}")
+ input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
+ continue
+
+ while True:
+ workflow_list = show_automated_menu()
+ if not workflow_list:
+ break
+
+ try:
+ choice = input(f"\n{Fore.GREEN}Select workflow (1-{len(workflow_list)+1}): {Style.RESET_ALL}").strip()
+
+ if not choice.isdigit():
+ print(f"{Fore.RED}Invalid input. Please enter a number.{Style.RESET_ALL}")
+ continue
+
+ choice = int(choice)
+
+ if 1 <= choice <= len(workflow_list):
+ # Execute selected workflow
+ workflow_key, workflow_name = workflow_list[choice-1]
+ workflow = get_workflow_by_key(workflow_key)
+
+ if not workflow:
+ print(f"{Fore.RED}Error loading workflow.{Style.RESET_ALL}")
+ continue
+
+ target = input(f"{Fore.YELLOW}Enter target (IP, domain, or network): {Style.RESET_ALL}").strip()
+ if not target:
+ print(f"{Fore.RED}Target is required.{Style.RESET_ALL}")
+ continue
+
+ confirm = input(f"{Fore.YELLOW}Execute '{workflow['name']}' on {target}? (yes/no): {Style.RESET_ALL}").strip().lower()
+ if confirm == 'yes':
+ await run_automated_workflow(workflow, target, connected_servers, conversation_history, kb_instance)
+
+ # Option to save results
+ export_choice = input(f"\n{Fore.CYAN}Save results to file? (yes/no): {Style.RESET_ALL}").strip().lower()
+ if export_choice == 'yes':
+ import time
+
+ # Create reports directory if it doesn't exist
+ reports_dir = "reports"
+ if not os.path.exists(reports_dir):
+ os.makedirs(reports_dir)
+ print(f"{Fore.GREEN}Created reports directory: {reports_dir}{Style.RESET_ALL}")
+
+ # Generate filename with timestamp
+ timestamp = int(time.time())
+ filename = f"{reports_dir}/ghostcrew_{workflow_key}_{timestamp}.txt"
+
+ with open(filename, 'w') as f:
+ f.write(f"GHOSTCREW Automated Penetration Test Report\n")
+ f.write("="*60 + "\n")
+ f.write(f"Workflow: {workflow['name']}\n")
+ f.write(f"Target: {target}\n")
+ f.write(f"Timestamp: {time.ctime()}\n")
+ f.write(f"Report Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
+ f.write("="*60 + "\n\n")
+
+ # Save recent conversation history related to this workflow
+ recent_history = conversation_history[-len(workflow['steps']):]
+ for i, entry in enumerate(recent_history, 1):
+ f.write(f"STEP {i}: {workflow['steps'][i-1].format(target=target)}\n")
+ f.write("-"*40 + "\n")
+ f.write(f"Query: {entry['user_query']}\n\n")
+ f.write(f"Response: {entry.get('ai_response', 'No response')}\n")
+ f.write("="*60 + "\n\n")
+
+ print(f"{Fore.GREEN}Results saved to: {filename}{Style.RESET_ALL}")
+
+ input(f"\n{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
+ else:
+ print(f"{Fore.YELLOW}Workflow cancelled.{Style.RESET_ALL}")
+
+ elif choice == len(workflow_list) + 1:
+ # Back to main menu
+ break
+
+ else:
+ print(f"{Fore.RED}Invalid choice. Please select a valid option.{Style.RESET_ALL}")
+
+ except ValueError:
+ print(f"{Fore.RED}Invalid input. Please enter a number.{Style.RESET_ALL}")
+ except KeyboardInterrupt:
+ print(f"\n{Fore.YELLOW}Operation cancelled.{Style.RESET_ALL}")
+ break
+
+ elif menu_choice == "3":
+ # Quit
+ print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}")
+ break
+
+ else:
+ print(f"{Fore.RED}Invalid choice. Please select 1, 2, or 3.{Style.RESET_ALL}")
# --- Catch interrupts and runtime exceptions ---
except KeyboardInterrupt:
diff --git a/workflows.py b/workflows.py
new file mode 100644
index 0000000..5ef9130
--- /dev/null
+++ b/workflows.py
@@ -0,0 +1,73 @@
+# GHOSTCREW Automated Penetration Testing Workflows
+
+def get_available_workflows():
+ """
+ Get all available automated workflows.
+ All workflows can use any configured tools - no restrictions.
+ """
+
+ workflows = {
+ "reconnaissance": {
+ "name": "Reconnaissance and Discovery",
+ "description": "Comprehensive information gathering and target profiling",
+ "steps": [
+ "Perform comprehensive reconnaissance on {target}",
+ "Discover subdomains and DNS information",
+ "Scan for open ports and services",
+ "Identify technology stack and fingerprints",
+ "Gather historical data and archived content"
+ ]
+ },
+
+ "web_application": {
+ "name": "Web Application Security Assessment",
+ "description": "Comprehensive web application penetration testing",
+ "steps": [
+ "Discover web directories and hidden content on {target}",
+ "Test for SQL injection vulnerabilities",
+ "Scan for web application vulnerabilities and misconfigurations",
+ "Analyze SSL/TLS configuration and security",
+ "Test for authentication and session management flaws",
+ "Check for file inclusion and upload vulnerabilities"
+ ]
+ },
+
+ "network_infrastructure": {
+ "name": "Network Infrastructure Penetration Test",
+ "description": "Network-focused penetration testing and exploitation",
+ "steps": [
+ "Scan network range {target} for live hosts and services",
+ "Perform detailed service enumeration and version detection",
+ "Scan for known vulnerabilities in discovered services",
+ "Test for network service misconfigurations",
+ "Attempt exploitation of discovered vulnerabilities",
+ "Assess network segmentation and access controls"
+ ]
+ },
+
+ "full_penetration_test": {
+ "name": "Complete Penetration Test",
+ "description": "Full-scope penetration testing methodology",
+ "steps": [
+ "Phase 1: Comprehensive reconnaissance and asset discovery on {target}",
+ "Phase 2: Service enumeration and technology fingerprinting",
+ "Phase 3: Vulnerability scanning and identification",
+ "Phase 4: Web application security testing (if applicable)",
+ "Phase 5: Network service exploitation attempts",
+ "Phase 6: Post-exploitation and privilege escalation testing",
+ "Phase 7: Document findings and provide comprehensive remediation guidance"
+ ]
+ }
+ }
+
+ return workflows
+
+def get_workflow_by_key(workflow_key):
+ """Get a specific workflow by its key"""
+ workflows = get_available_workflows()
+ return workflows.get(workflow_key, None)
+
+def list_workflow_names():
+ """Get a list of all workflow names for display"""
+ workflows = get_available_workflows()
+ return [(key, workflow["name"]) for key, workflow in workflows.items()]
\ No newline at end of file