From d2bae28ca9ed0a7c3bda2e0f7789df2f485f9092 Mon Sep 17 00:00:00 2001 From: GH05TCREW Date: Thu, 22 May 2025 21:34:26 -0600 Subject: [PATCH] Add pentest workflows --- .gitignore | 4 + README.md | 53 ++++++++ main.py | 361 +++++++++++++++++++++++++++++++++++++++++---------- workflows.py | 73 +++++++++++ 4 files changed, 426 insertions(+), 65 deletions(-) create mode 100644 workflows.py diff --git a/.gitignore b/.gitignore index c491ae7..33d5fa7 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,7 @@ __pycache__/ *.pyo .env venv/ + +# Reports directory - contains sensitive penetration test results +reports/ +*.txt diff --git a/README.md b/README.md index 59aa06e..4fca9df 100644 --- a/README.md +++ b/README.md @@ -10,11 +10,61 @@ https://github.com/user-attachments/assets/73176f92-a94d-4b66-9aa7-ee06c438a741 - **MCP Server Integration**: Through the `mcp.json` configuration file, multiple MCP servers can be flexibly integrated and managed to extend the assistant's capabilities. - **Tool Management**: Configure, connect to, and manage MCP tools through an interactive menu, including the ability to clear all configurations. - **Tool Invocation**: The AI assistant can call tools provided by configured MCP servers (such as: nmap, metasploit, ffuf, etc.) based on user requests. +- **Automated Pentesting Workflows**: Execute predefined penetration testing workflows that systematically use configured security tools to perform comprehensive assessments. - **Conversation History**: Supports multi-turn dialogues, remembering previous interaction content. - **Streaming Output**: AI responses can be streamed for a better user experience. - **Knowledge Base Enhancement (Optional)**: Supports enhancing AI responses through a local knowledge base RAG (`knowledge` directory). - **Configurable Models**: Supports configuration of different language model parameters. +## Automated Penetration Testing Workflows + +GHOSTCREW includes automated penetration testing workflows that provide structured, systematic security assessments. These workflows require MCP tools to be configured and connected to function properly. + +### Available Workflows + +1. **Reconnaissance and Discovery** + - Comprehensive information gathering and target profiling + - Performs reconnaissance, subdomain discovery, port scanning, technology fingerprinting, and historical data analysis + - **Steps**: 5 systematic phases + +2. **Web Application Security Assessment** + - Comprehensive web application penetration testing + - Tests for directory traversal, SQL injection, web vulnerabilities, SSL/TLS security, authentication flaws, and file inclusion + - **Steps**: 6 focused web security phases + +3. **Network Infrastructure Penetration Test** + - Network-focused penetration testing and exploitation + - Includes network scanning, service enumeration, vulnerability identification, misconfiguration testing, and exploitation attempts + - **Steps**: 6 network security phases + +4. **Complete Penetration Test** + - Full-scope penetration testing methodology + - Comprehensive assessment covering reconnaissance, enumeration, vulnerability scanning, web testing, network exploitation, post-exploitation, and reporting + - **Steps**: 7 complete assessment phases + +### Workflow Features + +- **Tool Integration**: All workflows utilize configured MCP tools (Nmap, Metasploit, Nuclei, etc.) for real security testing +- **Professional Output**: Each step provides detailed technical findings, vulnerability analysis, risk assessment, and remediation recommendations +- **Report Generation**: Automatically save reports to organized `reports/` directory with workflow-specific naming +- **Target Flexibility**: Works with IP addresses, domain names, or network ranges +- **Progress Tracking**: Real-time progress indication through each workflow step + +### Usage Requirements + +- **MCP Tools Required**: Automated workflows require at least one MCP security tool to be configured and connected +- **Access Control**: The system prevents workflow execution without proper tools to avoid generating simulated results +- **Professional Context**: Designed for authorized penetration testing and security assessments only + +### How to Use Workflows + +1. Start GHOSTCREW and configure MCP tools when prompted +2. Select "Automated Penetration Testing" from the main menu +3. Choose your desired workflow type +4. Enter the target (IP, domain, or network range) +5. Confirm execution and monitor progress +6. Optionally save results to file for documentation + ### Startup Effect

GHOSTCREW Terminal Startup Screen @@ -125,8 +175,11 @@ agent/ ├── .venv/ # Python virtual environment (ignored by .gitignore) ├── knowledge/ # Knowledge base documents directory │ └── ... +├── reports/ # Automated penetration test reports directory +│ └── ghostcrew_pentest_*.txt ├── .gitignore # Git ignore file configuration ├── main.py # Main program entry +├── workflows.py # Automated penetration testing workflows ├── configure_mcp.py # MCP tool configuration utility ├── mcp.json # MCP server configuration file ├── rag_embedding.py # RAG embedding related (if used) diff --git a/main.py b/main.py index 02722ae..a27e472 100644 --- a/main.py +++ b/main.py @@ -8,6 +8,12 @@ from colorama import init, Fore, Back, Style from ollama import chat,Message import tiktoken +# Import workflows +try: + from workflows import get_available_workflows, get_workflow_by_key, list_workflow_names + WORKFLOWS_AVAILABLE = True +except ImportError: + WORKFLOWS_AVAILABLE = False init(autoreset=True) @@ -78,6 +84,96 @@ class DefaultModelProvider(ModelProvider): # Create model provider instance model_provider = DefaultModelProvider() +def get_available_tools(connected_servers): + """Get list of available/connected tool names""" + return [server.name for server in connected_servers] + +async def run_automated_workflow(workflow, target, connected_servers, conversation_history, kb_instance): + """Execute an automated penetration testing workflow""" + available_tools = get_available_tools(connected_servers) + + print(f"\n{Fore.CYAN}Starting Automated Workflow: {workflow['name']}{Style.RESET_ALL}") + print(f"{Fore.YELLOW}Target: {target}{Style.RESET_ALL}") + print(f"{Fore.CYAN}Available Tools: {', '.join(available_tools) if available_tools else 'None'}{Style.RESET_ALL}") + print(f"{Fore.WHITE}Description: {workflow['description']}{Style.RESET_ALL}") + print(f"{Fore.WHITE}{'='*60}{Style.RESET_ALL}") + + results = [] + + for i, step in enumerate(workflow['steps'], 1): + print(f"\n{Fore.CYAN}Step {i}/{len(workflow['steps'])}{Style.RESET_ALL}") + formatted_step = step.format(target=target) + print(f"{Fore.WHITE}{formatted_step}{Style.RESET_ALL}") + + # Create comprehensive query for this step + enhanced_query = f""" +Execute the following penetration testing step as part of an automated workflow: + +TARGET: {target} +STEP: {formatted_step} +AVAILABLE_TOOLS: {', '.join(available_tools) if available_tools else 'No tools configured'} + +Please execute this step systematically using appropriate tools and provide: +1. Detailed command outputs and technical findings +2. Security vulnerabilities or issues discovered +3. Risk assessment and severity analysis +4. Specific recommendations for remediation +5. Relevant technical details and evidence + +Be thorough and professional in your analysis. Use the available tools effectively. +""" + + # Execute the step through the agent + result = await run_agent(enhanced_query, connected_servers, history=conversation_history, streaming=True, kb_instance=kb_instance) + + if result and hasattr(result, "final_output"): + results.append({ + "step": i, + "description": formatted_step, + "output": result.final_output + }) + + # Add to conversation history + conversation_history.append({ + "user_query": enhanced_query, + "ai_response": result.final_output + }) + + print(f"{Fore.GREEN}Step {i} completed{Style.RESET_ALL}") + + # Brief delay between steps + await asyncio.sleep(1) + + # Workflow completion summary + print(f"\n{Fore.GREEN}Automated workflow completed successfully{Style.RESET_ALL}") + print(f"{Fore.CYAN}Steps executed: {len(results)}/{len(workflow['steps'])}{Style.RESET_ALL}") + + return results + +def show_automated_menu(): + """Display the automated workflow selection menu""" + if not WORKFLOWS_AVAILABLE: + print(f"{Fore.YELLOW}Automated workflows not available. workflows.py file not found.{Style.RESET_ALL}") + return None + + print(f"\n{Fore.CYAN}AUTOMATED PENTESTING WORKFLOWS{Style.RESET_ALL}") + print(f"{Fore.WHITE}{'='*50}{Style.RESET_ALL}") + + workflow_list = list_workflow_names() + workflows = get_available_workflows() + + for i, (key, name) in enumerate(workflow_list, 1): + description = workflows[key]["description"] + step_count = len(workflows[key]["steps"]) + print(f"{i}. {Fore.YELLOW}{name}{Style.RESET_ALL}") + print(f" {Fore.WHITE}{description}{Style.RESET_ALL}") + print(f" {Fore.CYAN}Steps: {step_count}{Style.RESET_ALL}") + print() + + print(f"{len(workflow_list)+1}. {Fore.RED}Back to Main Menu{Style.RESET_ALL}") + + return workflow_list + # Modify run_agent function to accept connected server list and conversation history as parameters async def run_agent(query: str, mcp_servers: list[MCPServerStdio], history: list[dict] = None, streaming: bool = True, kb_instance=None): """ @@ -406,79 +502,214 @@ async def main(): # Create conversation history list conversation_history = [] - # --- Enter interactive main loop --- + # --- Main program loop --- while True: - # Check if the user wants multi-line input - print(f"\n{Fore.GREEN}[>]{Style.RESET_ALL} ", end="") - user_query = input().strip() + # Show main menu + print(f"\n{Fore.CYAN}MAIN MENU{Style.RESET_ALL}") + print(f"1. {Fore.GREEN}Interactive Chat Mode{Style.RESET_ALL}") - # Handle special commands - if user_query.lower() in ["quit", "exit"]: - print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}") - break # Exit loop, enter finally block - - # Handle empty input - if not user_query: - print(f"{Fore.YELLOW}No query entered. Please type your question.{Style.RESET_ALL}") - continue - - # Handle multi-line mode request - if user_query.lower() == "multi": - print(f"{Fore.CYAN}Entering multi-line mode. Type your query across multiple lines.{Style.RESET_ALL}") - print(f"{Fore.CYAN}Press Enter on an empty line to submit.{Style.RESET_ALL}") + # Check if automated mode should be available + if WORKFLOWS_AVAILABLE and connected_servers: + print(f"2. {Fore.YELLOW}Automated Penetration Testing{Style.RESET_ALL}") + max_option = 3 + elif WORKFLOWS_AVAILABLE and not connected_servers: + print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (requires MCP tools){Style.RESET_ALL}") + max_option = 3 + else: + print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (workflows.py not found){Style.RESET_ALL}") + max_option = 3 + + print(f"3. {Fore.RED}Quit{Style.RESET_ALL}") + + menu_choice = input(f"\n{Fore.GREEN}Select mode (1-3): {Style.RESET_ALL}").strip() + + if menu_choice == "1": + # Interactive chat mode (existing functionality) + print(f"\n{Fore.CYAN}INTERACTIVE CHAT MODE{Style.RESET_ALL}") + print(f"{Fore.WHITE}Type your questions or commands. Use 'multi' for multi-line input.{Style.RESET_ALL}") + print(f"{Fore.WHITE}Type 'menu' to return to main menu.{Style.RESET_ALL}\n") - lines = [] while True: - line = input() - if line == "": - break - lines.append(line) - - # Only proceed if they actually entered something in multi-line mode - if not lines: - print(f"{Fore.YELLOW}No query entered in multi-line mode.{Style.RESET_ALL}") - continue + # Check if the user wants multi-line input + print(f"\n{Fore.GREEN}[>]{Style.RESET_ALL} ", end="") + user_query = input().strip() - user_query = "\n".join(lines) + # Handle special commands + if user_query.lower() in ["quit", "exit"]: + print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}") + return + + if user_query.lower() == "menu": + break + + # Handle empty input + if not user_query: + print(f"{Fore.YELLOW}No query entered. Please type your question.{Style.RESET_ALL}") + continue + + # Handle multi-line mode request + if user_query.lower() == "multi": + print(f"{Fore.CYAN}Entering multi-line mode. Type your query across multiple lines.{Style.RESET_ALL}") + print(f"{Fore.CYAN}Press Enter on an empty line to submit.{Style.RESET_ALL}") + + lines = [] + while True: + line = input() + if line == "": + break + lines.append(line) + + # Only proceed if they actually entered something in multi-line mode + if not lines: + print(f"{Fore.YELLOW}No query entered in multi-line mode.{Style.RESET_ALL}") + continue + + user_query = "\n".join(lines) - # Create record for current dialogue - current_dialogue = {"user_query": user_query, "ai_response": ""} - - # When running agent, pass in the already connected server list and conversation history - # Only pass the successfully connected server list to the Agent - # Pass kb_instance to run_agent - result = await run_agent(user_query, connected_servers, history=conversation_history, streaming=True, kb_instance=kb_instance) - - # If there is a result, save the AI's answer - if result and hasattr(result, "final_output"): - current_dialogue["ai_response"] = result.final_output - - # Add current dialogue to history - conversation_history.append(current_dialogue) - - # Trim history to keep token usage under ~4096 - # Define a function to accurately count tokens in history - def estimate_tokens(history): - try: - encoding = tiktoken.encoding_for_model(MODEL_NAME) - return sum( - len(encoding.encode(entry['user_query'])) + - len(encoding.encode(entry.get('ai_response', ''))) - for entry in history - ) - except Exception: - # Fall back to approximate counting if tiktoken fails - return sum( - len(entry['user_query'].split()) + - len(entry.get('ai_response', '').split()) - for entry in history - ) + # Create record for current dialogue + current_dialogue = {"user_query": user_query, "ai_response": ""} + + # When running agent, pass in the already connected server list and conversation history + # Only pass the successfully connected server list to the Agent + # Pass kb_instance to run_agent + result = await run_agent(user_query, connected_servers, history=conversation_history, streaming=True, kb_instance=kb_instance) + + # If there is a result, save the AI's answer + if result and hasattr(result, "final_output"): + current_dialogue["ai_response"] = result.final_output + + # Add current dialogue to history + conversation_history.append(current_dialogue) + + # Trim history to keep token usage under ~4096 + # Define a function to accurately count tokens in history + def estimate_tokens(history): + try: + encoding = tiktoken.encoding_for_model(MODEL_NAME) + return sum( + len(encoding.encode(entry['user_query'])) + + len(encoding.encode(entry.get('ai_response', ''))) + for entry in history + ) + except Exception: + # Fall back to approximate counting if tiktoken fails + return sum( + len(entry['user_query'].split()) + + len(entry.get('ai_response', '').split()) + for entry in history + ) - # Trim history while token count exceeds the limit - while estimate_tokens(conversation_history) > 4000: - conversation_history.pop(0) + # Trim history while token count exceeds the limit + while estimate_tokens(conversation_history) > 4000: + conversation_history.pop(0) + + print(f"\n{Fore.CYAN}Ready for next query. Type 'quit', 'multi' for multi-line, or 'menu' for main menu.{Style.RESET_ALL}") + + elif menu_choice == "2": + # Automated penetration testing workflows + if not WORKFLOWS_AVAILABLE: + print(f"\n{Fore.YELLOW}Automated workflows not available. workflows.py file not found.{Style.RESET_ALL}") + input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}") + continue - print(f"\n{Fore.CYAN}Ready for your next query. Type 'quit' to exit or 'multi' for multi-line input.{Style.RESET_ALL}") + if not connected_servers: + print(f"\n{Fore.YELLOW}Automated penetration testing requires MCP tools to be configured and connected.{Style.RESET_ALL}") + print(f"{Fore.WHITE}Without real security tools, the AI would only generate simulated responses.{Style.RESET_ALL}") + print(f"{Fore.WHITE}Please restart the application and configure MCP tools to use this feature.{Style.RESET_ALL}") + input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}") + continue + + while True: + workflow_list = show_automated_menu() + if not workflow_list: + break + + try: + choice = input(f"\n{Fore.GREEN}Select workflow (1-{len(workflow_list)+1}): {Style.RESET_ALL}").strip() + + if not choice.isdigit(): + print(f"{Fore.RED}Invalid input. Please enter a number.{Style.RESET_ALL}") + continue + + choice = int(choice) + + if 1 <= choice <= len(workflow_list): + # Execute selected workflow + workflow_key, workflow_name = workflow_list[choice-1] + workflow = get_workflow_by_key(workflow_key) + + if not workflow: + print(f"{Fore.RED}Error loading workflow.{Style.RESET_ALL}") + continue + + target = input(f"{Fore.YELLOW}Enter target (IP, domain, or network): {Style.RESET_ALL}").strip() + if not target: + print(f"{Fore.RED}Target is required.{Style.RESET_ALL}") + continue + + confirm = input(f"{Fore.YELLOW}Execute '{workflow['name']}' on {target}? (yes/no): {Style.RESET_ALL}").strip().lower() + if confirm == 'yes': + await run_automated_workflow(workflow, target, connected_servers, conversation_history, kb_instance) + + # Option to save results + export_choice = input(f"\n{Fore.CYAN}Save results to file? (yes/no): {Style.RESET_ALL}").strip().lower() + if export_choice == 'yes': + import time + + # Create reports directory if it doesn't exist + reports_dir = "reports" + if not os.path.exists(reports_dir): + os.makedirs(reports_dir) + print(f"{Fore.GREEN}Created reports directory: {reports_dir}{Style.RESET_ALL}") + + # Generate filename with timestamp + timestamp = int(time.time()) + filename = f"{reports_dir}/ghostcrew_{workflow_key}_{timestamp}.txt" + + with open(filename, 'w') as f: + f.write(f"GHOSTCREW Automated Penetration Test Report\n") + f.write("="*60 + "\n") + f.write(f"Workflow: {workflow['name']}\n") + f.write(f"Target: {target}\n") + f.write(f"Timestamp: {time.ctime()}\n") + f.write(f"Report Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n") + f.write("="*60 + "\n\n") + + # Save recent conversation history related to this workflow + recent_history = conversation_history[-len(workflow['steps']):] + for i, entry in enumerate(recent_history, 1): + f.write(f"STEP {i}: {workflow['steps'][i-1].format(target=target)}\n") + f.write("-"*40 + "\n") + f.write(f"Query: {entry['user_query']}\n\n") + f.write(f"Response: {entry.get('ai_response', 'No response')}\n") + f.write("="*60 + "\n\n") + + print(f"{Fore.GREEN}Results saved to: {filename}{Style.RESET_ALL}") + + input(f"\n{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}") + else: + print(f"{Fore.YELLOW}Workflow cancelled.{Style.RESET_ALL}") + + elif choice == len(workflow_list) + 1: + # Back to main menu + break + + else: + print(f"{Fore.RED}Invalid choice. Please select a valid option.{Style.RESET_ALL}") + + except ValueError: + print(f"{Fore.RED}Invalid input. Please enter a number.{Style.RESET_ALL}") + except KeyboardInterrupt: + print(f"\n{Fore.YELLOW}Operation cancelled.{Style.RESET_ALL}") + break + + elif menu_choice == "3": + # Quit + print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}") + break + + else: + print(f"{Fore.RED}Invalid choice. Please select 1, 2, or 3.{Style.RESET_ALL}") # --- Catch interrupts and runtime exceptions --- except KeyboardInterrupt: diff --git a/workflows.py b/workflows.py new file mode 100644 index 0000000..5ef9130 --- /dev/null +++ b/workflows.py @@ -0,0 +1,73 @@ +# GHOSTCREW Automated Penetration Testing Workflows + +def get_available_workflows(): + """ + Get all available automated workflows. + All workflows can use any configured tools - no restrictions. + """ + + workflows = { + "reconnaissance": { + "name": "Reconnaissance and Discovery", + "description": "Comprehensive information gathering and target profiling", + "steps": [ + "Perform comprehensive reconnaissance on {target}", + "Discover subdomains and DNS information", + "Scan for open ports and services", + "Identify technology stack and fingerprints", + "Gather historical data and archived content" + ] + }, + + "web_application": { + "name": "Web Application Security Assessment", + "description": "Comprehensive web application penetration testing", + "steps": [ + "Discover web directories and hidden content on {target}", + "Test for SQL injection vulnerabilities", + "Scan for web application vulnerabilities and misconfigurations", + "Analyze SSL/TLS configuration and security", + "Test for authentication and session management flaws", + "Check for file inclusion and upload vulnerabilities" + ] + }, + + "network_infrastructure": { + "name": "Network Infrastructure Penetration Test", + "description": "Network-focused penetration testing and exploitation", + "steps": [ + "Scan network range {target} for live hosts and services", + "Perform detailed service enumeration and version detection", + "Scan for known vulnerabilities in discovered services", + "Test for network service misconfigurations", + "Attempt exploitation of discovered vulnerabilities", + "Assess network segmentation and access controls" + ] + }, + + "full_penetration_test": { + "name": "Complete Penetration Test", + "description": "Full-scope penetration testing methodology", + "steps": [ + "Phase 1: Comprehensive reconnaissance and asset discovery on {target}", + "Phase 2: Service enumeration and technology fingerprinting", + "Phase 3: Vulnerability scanning and identification", + "Phase 4: Web application security testing (if applicable)", + "Phase 5: Network service exploitation attempts", + "Phase 6: Post-exploitation and privilege escalation testing", + "Phase 7: Document findings and provide comprehensive remediation guidance" + ] + } + } + + return workflows + +def get_workflow_by_key(workflow_key): + """Get a specific workflow by its key""" + workflows = get_available_workflows() + return workflows.get(workflow_key, None) + +def list_workflow_names(): + """Get a list of all workflow names for display""" + workflows = get_available_workflows() + return [(key, workflow["name"]) for key, workflow in workflows.items()] \ No newline at end of file