From 115a2475dc1224ccb9daba932c9f7806f7e6b3ce Mon Sep 17 00:00:00 2001 From: GH05TCREW Date: Fri, 23 May 2025 18:37:53 -0600 Subject: [PATCH] Improve reporting output --- README.md | 10 +- main.py | 88 ++++++---- reporting.py | 470 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 531 insertions(+), 37 deletions(-) create mode 100644 reporting.py diff --git a/README.md b/README.md index 12359ec..9bdb3a4 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ https://github.com/user-attachments/assets/73176f92-a94d-4b66-9aa7-ee06c438a741 - **Tool Management**: Configure, connect to, and manage MCP tools through an interactive menu, including the ability to clear all configurations. - **Tool Invocation**: The AI assistant can call tools provided by configured MCP servers (such as: nmap, metasploit, ffuf, etc.) based on user requests. - **Automated Pentesting Workflows**: Execute predefined penetration testing workflows that systematically use configured security tools to perform comprehensive assessments. +- **Report Generation**: Generate markdown reports with structured findings, evidence, and recommendations. - **Conversation History**: Supports multi-turn dialogues, remembering previous interaction content. - **Streaming Output**: AI responses can be streamed for a better user experience. - **Knowledge Base Enhancement (Optional)**: Supports enhancing AI responses through a local knowledge base RAG (`knowledge` directory). @@ -132,7 +133,8 @@ GHOSTCREW includes automated penetration testing workflows that provide structur After the program starts, you can: - Choose whether to use the knowledge base - Configure or activate MCP tools - - Enter your questions or instructions according to the prompts + - Select between Interactive Chat Mode or Automated Penetration Testing + - Execute workflows and generate reports - Use 'multi' command to enter multi-line input mode for complex queries - Enter 'quit' to exit the program @@ -184,11 +186,13 @@ agent/ ├── .venv/ # Python virtual environment (ignored by .gitignore) ├── knowledge/ # Knowledge base documents directory │ └── ... -├── reports/ # Automated penetration test reports directory -│ └── ghostcrew_pentest_*.txt +├── reports/ # Professional penetration test reports directory +│ ├── ghostcrew_*_*.md # Professional markdown reports +│ └── ghostcrew_*_*_raw_history.txt # Raw conversation history (optional) ├── .gitignore # Git ignore file configuration ├── main.py # Main program entry ├── workflows.py # Automated penetration testing workflows +├── reporting.py # Professional report generation system ├── configure_mcp.py # MCP tool configuration utility ├── mcp.json # MCP server configuration file ├── rag_embedding.py # RAG embedding related (if used) diff --git a/main.py b/main.py index a27e472..7727c20 100644 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ import traceback from colorama import init, Fore, Back, Style from ollama import chat,Message import tiktoken +from datetime import datetime # Import workflows try: @@ -15,6 +16,14 @@ try: except ImportError: WORKFLOWS_AVAILABLE = False +# Import report generation module +try: + from reporting import generate_report_from_workflow + REPORTING_AVAILABLE = True +except ImportError: + print(f"{Fore.YELLOW}Reporting module not found. Basic text export will be available.{Style.RESET_ALL}") + REPORTING_AVAILABLE = False + init(autoreset=True) @@ -145,7 +154,6 @@ Be thorough and professional in your analysis. Use the available tools effective await asyncio.sleep(1) # Workflow completion summary - print(f"\n{Fore.GREEN}Automated workflow completed successfully{Style.RESET_ALL}") print(f"{Fore.CYAN}Steps executed: {len(results)}/{len(workflow['steps'])}{Style.RESET_ALL}") return results @@ -200,7 +208,7 @@ async def run_agent(query: str, mcp_servers: list[MCPServerStdio], history: list # Directly use the passed connected server list to create Agent # Build instructions containing conversation history - base_instructions = "You are an experienced penetration tester and security analyst, focused on Web application security and network infrastructure security. Your name is GHOSTCREW. When users ask cybersecurity-related questions, you need to provide direct andprofessional answers." + base_instructions = "You are an experienced penetration tester and security analyst, focused on Web application security and network infrastructure security. Your name is GHOSTCREW. When users ask cybersecurity-related questions, you need to provide direct and professional answers." base_instructions += "When answering questions, please use professional cybersecurity terminology, base your analysis on solid theoretical knowledge, and cite relevant security standards and best practices when possible, such as OWASP Top 10, CVE, NIST, CISA KEV, etc. Maintain a professional tone, clear logic, and organized structure." base_instructions += "When users ask about penetration testing, please explain the penetration testing process, methods, and common tools, emphasizing the objectives and techniques of each phase." base_instructions += "When users ask about vulnerability information, please provide terse descriptions, impact scope, remediation suggestions, vulnerability type, severity level, and exploitation conditions based on the vulnerability name or CVE number, and cite relevant security bulletins." @@ -649,42 +657,54 @@ async def main(): confirm = input(f"{Fore.YELLOW}Execute '{workflow['name']}' on {target}? (yes/no): {Style.RESET_ALL}").strip().lower() if confirm == 'yes': + # Store initial workflow data + workflow_start_time = datetime.now() + initial_history_length = len(conversation_history) + + # Execute the workflow await run_automated_workflow(workflow, target, connected_servers, conversation_history, kb_instance) - # Option to save results - export_choice = input(f"\n{Fore.CYAN}Save results to file? (yes/no): {Style.RESET_ALL}").strip().lower() - if export_choice == 'yes': - import time + # After workflow completion, offer report generation + print(f"\n{Fore.GREEN}Workflow completed successfully!{Style.RESET_ALL}") + + if REPORTING_AVAILABLE: + generate_report = input(f"\n{Fore.CYAN}Generate markdown report? (yes/no): {Style.RESET_ALL}").strip().lower() - # Create reports directory if it doesn't exist - reports_dir = "reports" - if not os.path.exists(reports_dir): - os.makedirs(reports_dir) - print(f"{Fore.GREEN}Created reports directory: {reports_dir}{Style.RESET_ALL}") - - # Generate filename with timestamp - timestamp = int(time.time()) - filename = f"{reports_dir}/ghostcrew_{workflow_key}_{timestamp}.txt" - - with open(filename, 'w') as f: - f.write(f"GHOSTCREW Automated Penetration Test Report\n") - f.write("="*60 + "\n") - f.write(f"Workflow: {workflow['name']}\n") - f.write(f"Target: {target}\n") - f.write(f"Timestamp: {time.ctime()}\n") - f.write(f"Report Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n") - f.write("="*60 + "\n\n") + if generate_report == 'yes': + # Ask about additional report options + save_raw_history = input(f"{Fore.YELLOW}Save raw conversation history? (yes/no, default: no): {Style.RESET_ALL}").strip().lower() == 'yes' - # Save recent conversation history related to this workflow - recent_history = conversation_history[-len(workflow['steps']):] - for i, entry in enumerate(recent_history, 1): - f.write(f"STEP {i}: {workflow['steps'][i-1].format(target=target)}\n") - f.write("-"*40 + "\n") - f.write(f"Query: {entry['user_query']}\n\n") - f.write(f"Response: {entry.get('ai_response', 'No response')}\n") - f.write("="*60 + "\n\n") - - print(f"{Fore.GREEN}Results saved to: {filename}{Style.RESET_ALL}") + try: + # Prepare report data + workflow_conversation = conversation_history[initial_history_length:] + + report_data = { + 'workflow_name': workflow['name'], + 'workflow_key': workflow_key, + 'target': target, + 'timestamp': workflow_start_time, + 'conversation_history': workflow_conversation, + 'tools_used': get_available_tools(connected_servers) + } + + # Generate professional report + print(f"\n{Fore.CYAN}Generating report...{Style.RESET_ALL}") + report_path = await generate_report_from_workflow( + report_data, + run_agent, + connected_servers, + kb_instance, + save_raw_history + ) + + print(f"\n{Fore.GREEN}Report generated: {report_path}{Style.RESET_ALL}") + print(f"{Fore.CYAN}Open the markdown file in any markdown viewer for best formatting{Style.RESET_ALL}") + + except Exception as e: + print(f"\n{Fore.RED}Error generating report: {e}{Style.RESET_ALL}") + print(f"{Fore.YELLOW}Raw workflow data is still available in conversation history{Style.RESET_ALL}") + else: + print(f"\n{Fore.YELLOW}Reporting not available.{Style.RESET_ALL}") input(f"\n{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}") else: diff --git a/reporting.py b/reporting.py new file mode 100644 index 0000000..6d58bdc --- /dev/null +++ b/reporting.py @@ -0,0 +1,470 @@ +""" +Professional Markdown Report Generator for GHOSTCREW +Processes workflow conversation history and generates beautiful reports +""" + +import json +import os +import asyncio +from datetime import datetime +from typing import Dict, List, Any +import re + + +class PentestReportGenerator: + """Generate professional penetration testing reports from workflow data""" + + def __init__(self, report_data: Dict[str, Any]): + self.workflow_name = report_data['workflow_name'] + self.workflow_key = report_data['workflow_key'] + self.target = report_data['target'] + self.timestamp = report_data['timestamp'] + self.conversation_history = report_data['conversation_history'] + self.tools_used = report_data.get('tools_used', []) + + # Will be populated by AI analysis + self.structured_findings = {} + + def format_conversation_history(self) -> str: + """Format conversation history for AI analysis""" + formatted = [] + + for i, entry in enumerate(self.conversation_history, 1): + formatted.append(f"\n--- STEP {i} ---") + formatted.append(f"QUERY: {entry.get('user_query', '')}") + formatted.append(f"RESPONSE: {entry.get('ai_response', '')}") + formatted.append("=" * 50) + + return "\n".join(formatted) + + def create_analysis_prompt(self) -> str: + """Create comprehensive analysis prompt for AI""" + + prompt = f""" +You are analyzing a complete penetration testing workflow to create a professional security assessment report. + +ASSESSMENT DETAILS: +- Workflow: {self.workflow_name} +- Target: {self.target} +- Date: {self.timestamp.strftime('%Y-%m-%d')} +- Tools Available: {', '.join(self.tools_used) if self.tools_used else 'Various security tools'} + +COMPLETE WORKFLOW CONVERSATION LOG: +{self.format_conversation_history()} + +Please analyze this entire workflow and extract structured information. When extracting evidence, include the actual commands used and their outputs. Respond with a JSON object containing: + +{{ + "executive_summary": "2-3 paragraph executive summary focusing on business impact and key risks", + "key_statistics": {{ + "total_vulnerabilities": 0, + "critical_count": 0, + "high_count": 0, + "systems_tested": 0, + "systems_compromised": 0 + }}, + "vulnerabilities": [ + {{ + "severity": "Critical|High|Medium|Low|Informational", + "title": "Descriptive vulnerability title", + "description": "Technical description of the vulnerability", + "impact": "Business impact if exploited", + "affected_systems": ["IP/hostname"], + "remediation": "Specific remediation steps", + "evidence": "Include actual commands used and key outputs that demonstrate this finding. Format as: 'Command: [command] Output: [relevant output]'", + "cvss_score": "If applicable", + "references": ["CVE numbers, links, etc."] + }} + ], + "compromised_systems": [ + {{ + "system": "IP/hostname", + "access_level": "user|admin|root|system", + "method": "How access was gained", + "evidence": "Actual commands and outputs showing compromise" + }} + ], + "credentials_found": [ + {{ + "username": "username", + "credential_type": "password|hash|token|key", + "system": "where found", + "strength": "weak|moderate|strong" + }} + ], + "tools_used": ["List of tools actually used in testing"], + "attack_paths": [ + {{ + "path_description": "Description of attack chain", + "steps": ["Step 1", "Step 2", "etc"], + "impact": "What this path achieves" + }} + ], + "recommendations": [ + {{ + "priority": "Immediate|Short-term|Medium-term|Long-term", + "category": "Network|Application|System|Process", + "recommendation": "Specific actionable recommendation", + "business_justification": "Why this is important for business" + }} + ], + "methodology": "Brief description of testing methodology used", + "scope_limitations": "Any scope limitations or areas not tested", + "conclusion": "Overall security posture assessment and key takeaways" +}} + +Focus on extracting real findings from the conversation. Include actual command examples and outputs in the evidence fields. If no vulnerabilities were found, that's also valuable information. Be accurate and professional. +""" + return prompt + + async def analyze_with_ai(self, prompt: str, run_agent_func, connected_servers, kb_instance=None): + """Run AI analysis using the main agent function""" + try: + # Use streaming=True since run_agent doesn't properly handle streaming=False + result = await run_agent_func( + prompt, + connected_servers, + history=[], + streaming=True, + kb_instance=kb_instance + ) + + if result and hasattr(result, "final_output"): + return result.final_output + return None + + except Exception as e: + print(f"Error in AI analysis: {e}") + return None + + def parse_ai_response(self, ai_response: str) -> Dict[str, Any]: + """Parse AI response and extract JSON data""" + try: + # Try to find JSON in the response + json_start = ai_response.find('{') + json_end = ai_response.rfind('}') + 1 + + if json_start != -1 and json_end != -1: + json_str = ai_response[json_start:json_end] + return json.loads(json_str) + else: + # Fallback - create basic structure + return { + "executive_summary": ai_response[:500] + "...", + "key_statistics": {"total_vulnerabilities": 0}, + "vulnerabilities": [], + "compromised_systems": [], + "recommendations": [], + "methodology": "Standard penetration testing methodology", + "conclusion": "Assessment completed successfully." + } + + except json.JSONDecodeError: + # Fallback structure + return { + "executive_summary": "Assessment completed. See technical findings for details.", + "key_statistics": {"total_vulnerabilities": 0}, + "vulnerabilities": [], + "compromised_systems": [], + "recommendations": [], + "methodology": "Standard penetration testing methodology", + "conclusion": "Unable to parse detailed findings." + } + + def generate_markdown_report(self) -> str: + """Generate the final markdown report""" + findings = self.structured_findings + + report = [] + + # Title Page + report.append(f"# Penetration Testing Report") + report.append(f"\n## {self.workflow_name}") + report.append(f"\n**Target:** {self.target} ") + report.append(f"**Assessment Date:** {self.timestamp.strftime('%Y-%m-%d')} ") + report.append(f"**Report Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ") + report.append(f"**Report ID:** GHOSTCREW-{self.workflow_key}-{int(self.timestamp.timestamp())} ") + report.append(f"\n---\n") + + # Table of Contents + report.append("## Table of Contents\n") + report.append("1. [Executive Summary](#1-executive-summary)") + report.append("2. [Assessment Overview](#2-assessment-overview)") + report.append("3. [Key Findings](#3-key-findings)") + report.append("4. [Vulnerability Details](#4-vulnerability-details)") + report.append("5. [Compromised Systems](#5-compromised-systems)") + report.append("6. [Attack Paths](#6-attack-paths)") + report.append("7. [Recommendations](#7-recommendations)") + report.append("8. [Technical Methodology](#8-technical-methodology)") + report.append("9. [Conclusion](#9-conclusion)") + report.append("\n---\n") + + # Executive Summary + report.append("## 1. Executive Summary\n") + report.append(findings.get('executive_summary', 'Assessment completed successfully.')) + report.append("\n---\n") + + # Assessment Overview + report.append("## 2. Assessment Overview\n") + report.append(f"### Scope") + report.append(f"- **Primary Target:** {self.target}") + report.append(f"- **Assessment Type:** {self.workflow_name}") + report.append(f"- **Testing Window:** {self.timestamp.strftime('%Y-%m-%d')}") + + if self.tools_used: + report.append(f"\n### Tools Used") + for tool in self.tools_used: + report.append(f"- {tool}") + + stats = findings.get('key_statistics', {}) + if stats: + report.append(f"\n### Key Statistics") + report.append(f"- **Total Vulnerabilities:** {stats.get('total_vulnerabilities', 0)}") + report.append(f"- **Critical Severity:** {stats.get('critical_count', 0)}") + report.append(f"- **High Severity:** {stats.get('high_count', 0)}") + report.append(f"- **Systems Compromised:** {stats.get('systems_compromised', 0)}") + + report.append("\n---\n") + + # Key Findings Summary + report.append("## 3. Key Findings\n") + + vulnerabilities = findings.get('vulnerabilities', []) + if vulnerabilities: + # Group by severity + severity_groups = {'Critical': [], 'High': [], 'Medium': [], 'Low': [], 'Informational': []} + for vuln in vulnerabilities: + severity = vuln.get('severity', 'Low') + if severity in severity_groups: + severity_groups[severity].append(vuln) + + report.append("### Vulnerability Summary\n") + report.append("| Severity | Count | Description |") + report.append("|----------|-------|-------------|") + + for severity, vulns in severity_groups.items(): + if vulns: + count = len(vulns) + titles = [v.get('title', 'Unknown') for v in vulns[:3]] + desc = ', '.join(titles) + if len(vulns) > 3: + desc += f' (and {len(vulns) - 3} more)' + report.append(f"| {severity} | {count} | {desc} |") + else: + report.append("No significant vulnerabilities were identified during the assessment.") + + report.append("\n---\n") + + # Vulnerability Details + report.append("## 4. Vulnerability Details\n") + + if vulnerabilities: + # Group by severity for detailed listing + for severity in ['Critical', 'High', 'Medium', 'Low', 'Informational']: + severity_vulns = [v for v in vulnerabilities if v.get('severity') == severity] + + if severity_vulns: + report.append(f"### {severity} Severity Vulnerabilities\n") + + for i, vuln in enumerate(severity_vulns, 1): + report.append(f"#### {severity.upper()}-{i:03d}: {vuln.get('title', 'Unknown Vulnerability')}\n") + report.append(f"**Description:** {vuln.get('description', 'No description provided')}\n") + report.append(f"**Impact:** {vuln.get('impact', 'Impact assessment pending')}\n") + + if vuln.get('affected_systems'): + report.append(f"**Affected Systems:** {', '.join(vuln['affected_systems'])}\n") + + report.append(f"**Remediation:** {vuln.get('remediation', 'Remediation steps pending')}\n") + + if vuln.get('evidence'): + report.append(f"**Evidence:**") + report.append("```") + report.append(vuln['evidence']) + report.append("```") + + if vuln.get('references'): + report.append(f"**References:** {', '.join(vuln['references'])}\n") + + report.append("\n") + else: + report.append("No vulnerabilities were identified during this assessment.") + + report.append("---\n") + + # Compromised Systems + report.append("## 5. Compromised Systems\n") + + compromised = findings.get('compromised_systems', []) + if compromised: + report.append("| System | Access Level | Method | Evidence |") + report.append("|--------|--------------|--------|----------|") + + for system in compromised: + report.append(f"| {system.get('system', 'Unknown')} | {system.get('access_level', 'Unknown')} | {system.get('method', 'Unknown')} | {system.get('evidence', 'See technical details')[:50]}{'...' if len(system.get('evidence', '')) > 50 else ''} |") + else: + report.append("No systems were successfully compromised during the assessment.") + + report.append("\n---\n") + + # Attack Paths + report.append("## 6. Attack Paths\n") + + attack_paths = findings.get('attack_paths', []) + if attack_paths: + for i, path in enumerate(attack_paths, 1): + report.append(f"### Attack Path {i}: {path.get('path_description', 'Unknown Path')}\n") + report.append(f"**Impact:** {path.get('impact', 'Unknown impact')}\n") + + steps = path.get('steps', []) + if steps: + report.append("**Steps:**") + for step_num, step in enumerate(steps, 1): + report.append(f"{step_num}. {step}") + report.append("\n") + else: + report.append("No specific attack paths were identified or documented.") + + report.append("---\n") + + # Recommendations + report.append("## 7. Recommendations\n") + + recommendations = findings.get('recommendations', []) + if recommendations: + # Group by priority + priority_groups = {'Immediate': [], 'Short-term': [], 'Medium-term': [], 'Long-term': []} + for rec in recommendations: + priority = rec.get('priority', 'Medium-term') + if priority in priority_groups: + priority_groups[priority].append(rec) + + for priority, recs in priority_groups.items(): + if recs: + report.append(f"### {priority} Priority\n") + for rec in recs: + report.append(f"**{rec.get('category', 'General')}:** {rec.get('recommendation', 'No recommendation provided')}") + if rec.get('business_justification'): + report.append(f" \n*Business Justification:* {rec['business_justification']}") + report.append("\n") + else: + report.append("Continue following security best practices and conduct regular assessments.") + + report.append("---\n") + + # Technical Methodology + report.append("## 8. Technical Methodology\n") + report.append(findings.get('methodology', 'Standard penetration testing methodology was followed.')) + + if findings.get('scope_limitations'): + report.append(f"\n### Scope Limitations\n") + report.append(findings['scope_limitations']) + + report.append("\n---\n") + + # Conclusion + report.append("## 9. Conclusion\n") + report.append(findings.get('conclusion', 'Assessment completed successfully.')) + + report.append(f"\n\n---\n") + report.append(f"*Report generated by GHOSTCREW v0.1.0* ") + report.append(f"*{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*") + + return "\n".join(report) + + async def generate_report(self, run_agent_func, connected_servers, kb_instance=None, save_raw_history=False) -> str: + """Main method to generate the complete report""" + + print(f"Analyzing workflow findings...") + + # Step 1: Create analysis prompt + analysis_prompt = self.create_analysis_prompt() + + # Step 2: Get AI analysis + ai_response = await self.analyze_with_ai( + analysis_prompt, + run_agent_func, + connected_servers, + kb_instance + ) + + if not ai_response: + raise Exception("Failed to get AI analysis") + + print(f"Extracting structured findings...") + + # Step 3: Parse AI response + self.structured_findings = self.parse_ai_response(ai_response) + + print(f"Generating markdown report...") + + # Step 4: Generate markdown report + markdown_report = self.generate_markdown_report() + + # Step 5: Save report with options + report_filename = self.save_report(markdown_report, save_raw_history) + + return report_filename + + def save_report(self, markdown_content: str, save_raw_history: bool = False) -> str: + """Save the report to file with optional raw history""" + + # Create reports directory if it doesn't exist + reports_dir = "reports" + if not os.path.exists(reports_dir): + os.makedirs(reports_dir) + + # Generate filename + timestamp_str = str(int(self.timestamp.timestamp())) + filename = f"{reports_dir}/ghostcrew_{self.workflow_key}_{timestamp_str}.md" + + # Save markdown file + with open(filename, 'w', encoding='utf-8') as f: + f.write(markdown_content) + + # Optionally save raw conversation history + if save_raw_history: + raw_history_content = [] + raw_history_content.append(f"GHOSTCREW Raw Workflow History") + raw_history_content.append(f"Workflow: {self.workflow_name}") + raw_history_content.append(f"Target: {self.target}") + raw_history_content.append(f"Date: {self.timestamp.strftime('%Y-%m-%d %H:%M:%S')}") + raw_history_content.append(f"=" * 60) + raw_history_content.append("") + + for i, entry in enumerate(self.conversation_history, 1): + raw_history_content.append(f"STEP {i} - QUERY:") + raw_history_content.append("-" * 40) + raw_history_content.append(entry.get('user_query', 'No query recorded')) + raw_history_content.append("") + raw_history_content.append(f"STEP {i} - AI RESPONSE:") + raw_history_content.append("-" * 40) + raw_history_content.append(entry.get('ai_response', 'No response recorded')) + raw_history_content.append("") + raw_history_content.append("=" * 60) + raw_history_content.append("") + + raw_filename = f"{reports_dir}/ghostcrew_{self.workflow_key}_{timestamp_str}_raw_history.txt" + with open(raw_filename, 'w', encoding='utf-8') as f: + f.write('\n'.join(raw_history_content)) + print(f"Raw conversation history saved: {raw_filename}") + + return filename + + +async def generate_report_from_workflow(report_data: Dict[str, Any], run_agent_func, connected_servers, kb_instance=None, save_raw_history=False) -> str: + """ + Main function to generate a professional report from workflow data + + Args: + report_data: Dictionary containing workflow information + run_agent_func: The main agent function for AI analysis + connected_servers: Connected MCP servers + kb_instance: Knowledge base instance + save_raw_history: Whether to save raw conversation history + + Returns: + str: Path to generated report file + """ + + generator = PentestReportGenerator(report_data) + return await generator.generate_report(run_agent_func, connected_servers, kb_instance, save_raw_history) \ No newline at end of file