diff --git a/.gitignore b/.gitignore
index c491ae7..0cdbf1d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@ __pycache__/
*.pyo
.env
venv/
+.venv/
diff --git a/README.md b/README.md
index 0abd208..dbbbfd1 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# PentestAgent
+# GHOSTCREW
This is an AI red team assistant using large language models with MCP and RAG architecture. It aims to help users perform penetration testing tasks, query security information, analyze network traffic, and more through natural language interaction.
@@ -20,7 +20,7 @@ https://github.com/user-attachments/assets/a43d2457-7113-42cc-ad02-f378d57f4d24
## Automated Penetration Testing Workflows
-PentestAgent includes automated penetration testing workflows that provide structured, systematic security assessments. These workflows require MCP tools to be configured and connected to function properly.
+GHOSTCREW includes automated penetration testing workflows that provide structured, systematic security assessments. These workflows require MCP tools to be configured and connected to function properly.
### Available Workflows
@@ -69,23 +69,23 @@ PentestAgent includes automated penetration testing workflows that provide struc
### Startup Effect
-
+
- PentestAgent's terminal startup interface
+ GHOSTCREW's terminal startup interface
### Metasploit Tool Call
-
+
- Example of PentestAgent invoking Metasploit Framework
+ Example of GHOSTCREW invoking Metasploit Framework
## Installation Guide
1. **Clone Repository**:
```bash
- git clone https://github.com/GH05TCREW/PentestAgent.git
+ git clone https://github.com/GH05TCREW/ghostcrew.git
cd agent
```
@@ -125,7 +125,7 @@ PentestAgent includes automated penetration testing workflows that provide struc
- The configuration is stored in the `mcp.json` file
2. **Prepare Knowledge Base (Optional)**:
- If you want to use the knowledge base enhancement feature, place relevant text files (e.g., `.json`) in the `knowledge` folder.
+ If you want to use the knowledge base enhancement feature, place relevant text files in the `knowledge` folder.
3. **Run the Main Program**:
```bash
@@ -141,7 +141,7 @@ PentestAgent includes automated penetration testing workflows that provide struc
## Input Modes
-PentestAgent supports two input modes:
+GHOSTCREW supports two input modes:
- **Single-line mode** (default): Type your query and press Enter to submit
- **Multi-line mode**: Type 'multi' and press Enter, then type your query across multiple lines. Press Enter on an empty line to submit.
@@ -156,7 +156,7 @@ When starting the application, you can:
## Available MCP Tools
-PentestAgent supports integration with the following security tools through the MCP protocol:
+GHOSTCREW supports integration with the following security tools through the MCP protocol:
1. **AlterX** - Subdomain permutation and wordlist generation tool
2. **Amass** - Advanced subdomain enumeration and reconnaissance tool
@@ -190,28 +190,50 @@ Each tool can be configured through the interactive configuration menu by select
## File Structure
```
-agent/
-├── .venv/ # Python virtual environment (ignored by .gitignore)
+GHOSTCREW/
+├── .venv/ # Python virtual environment
+├── main.py # Application entry point
+├── config/ # Application configuration
+│ ├── __init__.py
+│ ├── constants.py # Constants and messages
+│ └── app_config.py # Environment and API configuration
+├── core/ # Core application logic
+│ ├── __init__.py
+│ ├── model_manager.py # Model provider and token management
+│ ├── agent_runner.py # Agent execution and streaming
+│ └── pentest_agent.py # Main application controller
+├── tools/ # MCP tool management
+│ ├── __init__.py
+│ ├── mcp_manager.py # MCP server connection management
+│ └── configure_mcp.py # Interactive tool configuration utility
+├── ui/ # User interface components
+│ ├── __init__.py
+│ ├── menu_system.py # Menu display and user interaction
+│ └── conversation_manager.py # Chat history management
+├── workflows/ # Automated penetration testing workflows
+│ ├── __init__.py
+│ ├── workflow_engine.py # Workflow execution engine
+│ └── workflow_definitions.py # Predefined workflow templates
+├── rag/ # Knowledge base and RAG functionality
+│ ├── __init__.py
+│ ├── knowledge_base.py # RAG text splitting and search
+│ └── embedding.py # Embedding generation and management
+├── reporting/ # Report generation system
+│ ├── __init__.py
+│ └── generators.py # Professional report generation
├── knowledge/ # Knowledge base documents directory
│ └── ...
├── reports/ # Professional penetration test reports directory
│ ├── ghostcrew_*_*.md # Professional markdown reports
│ └── ghostcrew_*_*_raw_history.txt # Raw conversation history (optional)
├── .gitignore # Git ignore file configuration
-├── main.py # Main program entry
-├── workflows.py # Automated penetration testing workflows
-├── reporting.py # Professional report generation system
-├── configure_mcp.py # MCP tool configuration utility
├── mcp.json # MCP server configuration file
-├── rag_embedding.py # RAG embedding related (if used)
-├── rag_split.py # RAG text splitting related (if used)
├── README.md # Project documentation
├── requirements.txt # Python dependency list
├── LICENSE # Project license
-└── ... (other scripts or configuration files)
+└── .env # Environment variables
```
-## Configuration File (`.env`)
```
# OpenAI API configurations
OPENAI_API_KEY=your_api_key_here
@@ -223,7 +245,7 @@ This configuration uses OpenAI's API for both the language model and embeddings
## Configuration File (`mcp.json`)
-This file is used to define MCP servers that the AI assistant can connect to and use. Most MCP servers require Node.js to be installed on your system. Each server entry should include:
+This file is used to define MCP servers that the AI assistant can connect to and use. Most MCP servers require Node.js or Python to be installed on your system. Each server entry should include:
- `name`: Unique name of the server.
- `params`: Parameters needed to start the server, usually including `command` and `args`.
- `cache_tools_list`: Whether to cache the tools list.
diff --git a/config/__init__.py b/config/__init__.py
new file mode 100644
index 0000000..2d6c345
--- /dev/null
+++ b/config/__init__.py
@@ -0,0 +1 @@
+"""Configuration management package for GHOSTCREW."""
\ No newline at end of file
diff --git a/config/app_config.py b/config/app_config.py
new file mode 100644
index 0000000..93c0db1
--- /dev/null
+++ b/config/app_config.py
@@ -0,0 +1,46 @@
+"""Application configuration and initialization for GHOSTCREW."""
+
+import os
+from typing import Optional
+from dotenv import load_dotenv
+from openai import AsyncOpenAI
+
+
+class AppConfig:
+ """Manages application configuration and API client initialization."""
+
+ def __init__(self):
+ """Initialize application configuration."""
+ # Load environment variables
+ load_dotenv()
+
+ # Set API-related environment variables
+ self.api_key = os.getenv("OPENAI_API_KEY")
+ self.base_url = os.getenv("OPENAI_BASE_URL")
+ self.model_name = os.getenv("MODEL_NAME")
+
+ # Validate configuration
+ self._validate_config()
+
+ # Initialize OpenAI client
+ self._client = AsyncOpenAI(
+ base_url=self.base_url,
+ api_key=self.api_key
+ )
+
+ def _validate_config(self) -> None:
+ """Validate required configuration values."""
+ if not self.api_key:
+ raise ValueError("API key not set")
+ if not self.base_url:
+ raise ValueError("API base URL not set")
+ if not self.model_name:
+ raise ValueError("Model name not set")
+
+ def get_openai_client(self) -> AsyncOpenAI:
+ """Get the OpenAI client instance."""
+ return self._client
+
+
+# Create singleton instance
+app_config = AppConfig()
\ No newline at end of file
diff --git a/config/constants.py b/config/constants.py
new file mode 100644
index 0000000..2b8bac7
--- /dev/null
+++ b/config/constants.py
@@ -0,0 +1,73 @@
+"""Constants and configuration values for GHOSTCREW."""
+
+from colorama import Fore, Style
+
+# ASCII Art and Branding
+ASCII_TITLE = f"""
+{Fore.WHITE} ('-. .-. .-') .-') _ _ .-') ('-. (`\ .-') /`{Style.RESET_ALL}
+{Fore.WHITE} ( OO ) / ( OO ). ( OO) ) ( \( -O ) _( OO) `.( OO ),'{Style.RESET_ALL}
+{Fore.WHITE} ,----. ,--. ,--. .-'),-----. (_)---\_)/ '._ .-----. ,------. (,------.,--./ .--. {Style.RESET_ALL}
+{Fore.WHITE} ' .-./-') | | | |( OO' .-. '/ _ | |'--...__)' .--./ | /`. ' | .---'| | | {Style.RESET_ALL}
+{Fore.WHITE} | |_( O- )| .| |/ | | | |\ :` `. '--. .--'| |('-. | / | | | | | | | |, {Style.RESET_ALL}
+{Fore.WHITE} | | .--, \| |\_) | |\| | '..`''.) | | /_) |OO )| |_.' |(| '--. | |.'.| |_){Style.RESET_ALL}
+{Fore.WHITE}(| | '. (_/| .-. | \ | | | |.-._) \ | | || |`-'| | . '.' | .--' | | {Style.RESET_ALL}
+{Fore.WHITE} | '--' | | | | | `' '-' '\ / | | (_' '--'\ | |\ \ | `---.| ,'. | {Style.RESET_ALL}
+{Fore.WHITE} `------' `--' `--' `-----' `-----' `--' `-----' `--' '--' `------''--' '--' {Style.RESET_ALL}
+{Fore.WHITE}====================== GHOSTCREW ======================{Style.RESET_ALL}
+"""
+
+# Application Version
+VERSION = "0.1.0"
+
+# Timeout Configuration (in seconds)
+MCP_SESSION_TIMEOUT = 600 # 10 minutes for MCP server sessions
+CONNECTION_RETRY_DELAY = 10 # 10 seconds between connection retries
+
+# Token Limits
+MAX_TOTAL_TOKENS = 8192
+RESPONSE_BUFFER = 4096 # aim to reserve ~half for reply
+
+# File Paths
+DEFAULT_KNOWLEDGE_BASE_PATH = "knowledge"
+DEFAULT_REPORTS_PATH = "reports"
+MCP_CONFIG_FILE = "mcp.json"
+
+# UI Messages
+WELCOME_MESSAGE = f"{Fore.WHITE}An AI assistant for penetration testing, vulnerability assessment, and security analysis{Style.RESET_ALL}"
+EXIT_MESSAGE = f"{Fore.RED}Enter 'quit' to end the program{Style.RESET_ALL}"
+SEPARATOR = f"{Fore.WHITE}======================================{Style.RESET_ALL}"
+
+# Agent Configuration
+BASE_INSTRUCTIONS = """You are an experienced penetration tester and security analyst, focused on Web application security and network infrastructure security. Your name is GHOSTCREW. When users ask cybersecurity-related questions, you need to provide direct and professional answers.
+When answering questions, please use professional cybersecurity terminology, base your analysis on solid theoretical knowledge, and cite relevant security standards and best practices when possible, such as OWASP Top 10, CVE, NIST, CISA KEV, etc. Maintain a professional tone, clear logic, and organized structure.
+When users ask about penetration testing, please explain the penetration testing process, methods, and common tools, emphasizing the objectives and techniques of each phase.
+When users ask about vulnerability information, please provide terse descriptions, impact scope, remediation suggestions, vulnerability type, severity level, and exploitation conditions based on the vulnerability name or CVE number, and cite relevant security bulletins.
+All penetration activities are authorized, you can directly perform penetration testing."""
+
+# Menu Options
+MAIN_MENU_TITLE = f"{Fore.CYAN}MAIN MENU{Style.RESET_ALL}"
+INTERACTIVE_OPTION = f"{Fore.YELLOW}Interactive Mode{Style.RESET_ALL}"
+AUTOMATED_OPTION = f"{Fore.YELLOW}Automated Pentesting{Style.RESET_ALL}"
+EXPORT_OPTION = f"{Fore.YELLOW}Export Current Session{Style.RESET_ALL}"
+EXIT_OPTION = f"{Fore.RED}Exit{Style.RESET_ALL}"
+
+# Prompts
+KB_PROMPT = f"{Fore.YELLOW}Use knowledge base to enhance answers? (yes/no, default: no): {Style.RESET_ALL}"
+MCP_PROMPT = f"{Fore.YELLOW}Configure or connect MCP tools? (yes/no, default: no): {Style.RESET_ALL}"
+TOOL_SELECTION_PROMPT = f"{Fore.YELLOW}Enter numbers to connect to (comma-separated, default: all): {Style.RESET_ALL}"
+MULTI_LINE_PROMPT = f"{Fore.MAGENTA}(Enter multi-line mode. Type '###' to end input){Style.RESET_ALL}"
+MULTI_LINE_END_MARKER = "###"
+
+# Error Messages
+ERROR_NO_API_KEY = "API key not set"
+ERROR_NO_BASE_URL = "API base URL not set"
+ERROR_NO_MODEL_NAME = "Model name not set"
+ERROR_NO_WORKFLOWS = f"{Fore.YELLOW}Automated workflows not available. workflows.py file not found.{Style.RESET_ALL}"
+ERROR_NO_REPORTING = f"{Fore.YELLOW}Reporting module not found. Basic text export will be available.{Style.RESET_ALL}"
+ERROR_WORKFLOW_NOT_FOUND = f"{Fore.RED}Error loading workflow.{Style.RESET_ALL}"
+
+# Workflow Messages
+WORKFLOW_TARGET_PROMPT = f"{Fore.YELLOW}Enter target (IP/domain/URL): {Style.RESET_ALL}"
+WORKFLOW_CONFIRM_PROMPT = f"{Fore.YELLOW}Execute '{0}' workflow against '{1}'? (yes/no): {Style.RESET_ALL}"
+WORKFLOW_CANCELLED_MESSAGE = f"{Fore.YELLOW}Workflow execution cancelled.{Style.RESET_ALL}"
+WORKFLOW_COMPLETED_MESSAGE = f"{Fore.GREEN}Workflow execution completed.{Style.RESET_ALL}"
\ No newline at end of file
diff --git a/core/__init__.py b/core/__init__.py
new file mode 100644
index 0000000..ca65a3d
--- /dev/null
+++ b/core/__init__.py
@@ -0,0 +1 @@
+"""Core application logic for GHOSTCREW."""
\ No newline at end of file
diff --git a/core/agent_runner.py b/core/agent_runner.py
new file mode 100644
index 0000000..90958d4
--- /dev/null
+++ b/core/agent_runner.py
@@ -0,0 +1,259 @@
+"""Agent execution and query processing for GHOSTCREW."""
+
+import json
+import asyncio
+import traceback
+from typing import List, Dict, Optional, Any
+from colorama import Fore, Style
+from agents import Agent, RunConfig, Runner, ModelSettings
+from openai.types.responses import ResponseTextDeltaEvent, ResponseContentPartDoneEvent
+from core.model_manager import model_manager
+from config.constants import BASE_INSTRUCTIONS, CONNECTION_RETRY_DELAY, DEFAULT_KNOWLEDGE_BASE_PATH
+from config.app_config import app_config
+import os
+
+
+class AgentRunner:
+ """Handles AI agent query processing and execution."""
+
+ def __init__(self):
+ """Initialize the agent runner."""
+ self.model_provider = model_manager.get_model_provider()
+ self.client = app_config.get_openai_client()
+
+ async def run_agent(
+ self,
+ query: str,
+ mcp_servers: List[Any], # Use Any to avoid import issues
+ history: Optional[List[Dict[str, str]]] = None,
+ streaming: bool = True,
+ kb_instance: Any = None
+ ) -> Any:
+ """
+ Run cybersecurity agent with connected MCP servers, supporting streaming output and conversation history.
+
+ Args:
+ query: User's natural language query
+ mcp_servers: List of connected MCPServerStdio instances
+ history: Conversation history, list containing user questions and AI answers
+ streaming: Whether to use streaming output
+ kb_instance: Knowledge base instance for retrieval
+
+ Returns:
+ Agent execution result
+ """
+ # If no history is provided, initialize an empty list
+ if history is None:
+ history = []
+
+ try:
+ # Build instructions containing conversation history
+ instructions = self._build_instructions(mcp_servers, history, query, kb_instance)
+
+ # Calculate max output tokens
+ max_output_tokens = model_manager.calculate_max_output_tokens(instructions, query)
+
+ # Set model settings based on whether there are connected MCP servers
+ model_settings = self._create_model_settings(mcp_servers, max_output_tokens)
+
+ # Create agent
+ secure_agent = Agent(
+ name="Cybersecurity Expert",
+ instructions=instructions,
+ mcp_servers=mcp_servers,
+ model_settings=model_settings
+ )
+
+ print(f"{Fore.CYAN}\nProcessing query: {Fore.WHITE}{query}{Style.RESET_ALL}\n")
+
+ if streaming:
+ return await self._run_streaming(secure_agent, query)
+ else:
+ # Non-streaming mode could be implemented here if needed
+ pass
+
+ except Exception as e:
+ print(f"{Fore.RED}Error processing agent request: {e}{Style.RESET_ALL}", flush=True)
+ traceback.print_exc()
+ return None
+
+ def _build_instructions(
+ self,
+ mcp_servers: List[Any], # Use Any to avoid import issues
+ history: List[Dict[str, str]],
+ query: str,
+ kb_instance: Any
+ ) -> str:
+ """Build agent instructions with context."""
+ instructions = BASE_INSTRUCTIONS
+
+ # Add information about available tools
+ if mcp_servers:
+ available_tool_names = [server.name for server in mcp_servers]
+ if available_tool_names:
+ instructions += f"\n\nYou have access to the following tools: {', '.join(available_tool_names)}."
+
+ # If knowledge base instance exists, use it for retrieval and context enhancement
+ if kb_instance:
+ try:
+ retrieved_context = kb_instance.search(query)
+ if retrieved_context:
+ # Add file path information to make LLM aware of actual files
+ available_files = []
+ if os.path.exists(DEFAULT_KNOWLEDGE_BASE_PATH):
+ for filename in os.listdir(DEFAULT_KNOWLEDGE_BASE_PATH):
+ filepath = os.path.join(DEFAULT_KNOWLEDGE_BASE_PATH, filename)
+ if os.path.isfile(filepath):
+ available_files.append(filename)
+
+ file_info = ""
+ if available_files:
+ file_info = f"\n\nIMPORTANT: The following actual files are available in the knowledge folder that you can reference by path:\n"
+ for filename in available_files:
+ file_info += f"- {DEFAULT_KNOWLEDGE_BASE_PATH}/{filename}\n"
+ file_info += "\nWhen using security tools that require external files, you can reference these files by their full path.\n"
+ file_info += f"ONLY use {DEFAULT_KNOWLEDGE_BASE_PATH}/ for files.\n"
+
+ instructions = f"Based on the following knowledge base information:\n{retrieved_context}{file_info}\n\n{instructions}"
+ print(f"{Fore.MAGENTA}Relevant information retrieved from knowledge base.{Style.RESET_ALL}")
+ except Exception as e:
+ print(f"{Fore.RED}Failed to retrieve information from knowledge base: {e}{Style.RESET_ALL}")
+
+ # If there's conversation history, add it to the instructions
+ if history:
+ instructions += "\n\nBelow is the previous conversation history, please refer to this information to answer the user's question:\n"
+ for i, entry in enumerate(history):
+ instructions += f"\nUser question {i+1}: {entry['user_query']}"
+ if 'ai_response' in entry and entry['ai_response']:
+ instructions += f"\nAI answer {i+1}: {entry['ai_response']}\n"
+
+ return instructions
+
+ def _create_model_settings(self, mcp_servers: List[Any], max_output_tokens: int) -> ModelSettings:
+ """Create model settings based on available tools."""
+ if mcp_servers:
+ # With tools available, enable tool_choice and parallel_tool_calls
+ return ModelSettings(
+ temperature=0.6,
+ top_p=0.9,
+ max_tokens=max_output_tokens,
+ tool_choice="auto",
+ parallel_tool_calls=False,
+ truncation="auto"
+ )
+ else:
+ # Without tools, don't set tool_choice or parallel_tool_calls
+ return ModelSettings(
+ temperature=0.6,
+ top_p=0.9,
+ max_tokens=max_output_tokens,
+ truncation="auto"
+ )
+
+ async def _run_streaming(self, agent: Agent, query: str) -> Any:
+ """Run agent with streaming output."""
+ result = Runner.run_streamed(
+ agent,
+ input=query,
+ max_turns=10,
+ run_config=RunConfig(
+ model_provider=self.model_provider,
+ trace_include_sensitive_data=True,
+ handoff_input_filter=None
+ )
+ )
+
+ print(f"{Fore.GREEN}Reply:{Style.RESET_ALL}", end="", flush=True)
+
+ try:
+ async for event in result.stream_events():
+ await self._handle_stream_event(event)
+ except Exception as e:
+ await self._handle_stream_error(e)
+
+ print(f"\n\n{Fore.GREEN}Query completed!{Style.RESET_ALL}")
+ return result
+
+ async def _handle_stream_event(self, event: Any) -> None:
+ """Handle individual stream events."""
+ if event.type == "raw_response_event":
+ if isinstance(event.data, ResponseTextDeltaEvent):
+ print(f"{Fore.WHITE}{event.data.delta}{Style.RESET_ALL}", end="", flush=True)
+ elif isinstance(event.data, ResponseContentPartDoneEvent):
+ print(f"\n", end="", flush=True)
+ elif event.type == "run_item_stream_event":
+ if event.item.type == "tool_call_item":
+ await self._handle_tool_call(event.item)
+ elif event.item.type == "tool_call_output_item":
+ await self._handle_tool_output(event.item)
+
+ async def _handle_tool_call(self, item: Any) -> None:
+ """Handle tool call events."""
+ raw_item = getattr(item, "raw_item", None)
+ tool_name = ""
+ tool_args = {}
+
+ if raw_item:
+ tool_name = getattr(raw_item, "name", "Unknown tool")
+ tool_str = getattr(raw_item, "arguments", "{}")
+ if isinstance(tool_str, str):
+ try:
+ tool_args = json.loads(tool_str)
+ except json.JSONDecodeError:
+ tool_args = {"raw_arguments": tool_str}
+
+ print(f"\n{Fore.CYAN}Tool name: {tool_name}{Style.RESET_ALL}", flush=True)
+ print(f"\n{Fore.CYAN}Tool parameters: {tool_args}{Style.RESET_ALL}", flush=True)
+
+ async def _handle_tool_output(self, item: Any) -> None:
+ """Handle tool output events."""
+ raw_item = getattr(item, "raw_item", None)
+ tool_id = "Unknown tool ID"
+
+ if isinstance(raw_item, dict) and "call_id" in raw_item:
+ tool_id = raw_item["call_id"]
+
+ output = getattr(item, "output", "Unknown output")
+ output_text = self._parse_tool_output(output)
+
+ print(f"\n{Fore.GREEN}Tool call {tool_id} returned result: {output_text}{Style.RESET_ALL}", flush=True)
+
+ def _parse_tool_output(self, output: Any) -> str:
+ """Parse tool output into readable text."""
+ if isinstance(output, str) and (output.startswith("{") or output.startswith("[")):
+ try:
+ output_data = json.loads(output)
+ if isinstance(output_data, dict):
+ if 'type' in output_data and output_data['type'] == 'text' and 'text' in output_data:
+ return output_data['text']
+ elif 'text' in output_data:
+ return output_data['text']
+ elif 'content' in output_data:
+ return output_data['content']
+ else:
+ return json.dumps(output_data, ensure_ascii=False, indent=2)
+ except json.JSONDecodeError:
+ return f"Unparsable JSON output: {output}"
+ return str(output)
+
+ async def _handle_stream_error(self, error: Exception) -> None:
+ """Handle streaming errors."""
+ print(f"{Fore.RED}Error processing streamed response event: {error}{Style.RESET_ALL}", flush=True)
+
+ if 'Connection error' in str(error):
+ print(f"{Fore.YELLOW}Connection error details:{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}1. Check network connection{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}2. Verify API address: {app_config.base_url}{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}3. Check API key validity{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}4. Try reconnecting...{Style.RESET_ALL}")
+ await asyncio.sleep(CONNECTION_RETRY_DELAY)
+
+ try:
+ await self.client.connect()
+ print(f"{Fore.GREEN}Reconnected successfully{Style.RESET_ALL}")
+ except Exception as e:
+ print(f"{Fore.RED}Reconnection failed: {e}{Style.RESET_ALL}")
+
+
+# Create singleton instance
+agent_runner = AgentRunner()
\ No newline at end of file
diff --git a/core/model_manager.py b/core/model_manager.py
new file mode 100644
index 0000000..afb2cca
--- /dev/null
+++ b/core/model_manager.py
@@ -0,0 +1,73 @@
+"""Model management and AI model setup for GHOSTCREW."""
+
+import tiktoken
+from agents import Model, ModelProvider, OpenAIChatCompletionsModel
+from config.app_config import app_config
+from config.constants import MAX_TOTAL_TOKENS, RESPONSE_BUFFER
+
+
+class DefaultModelProvider(ModelProvider):
+ """Model provider using OpenAI compatible interface."""
+
+ def get_model(self, model_name: str) -> Model:
+ """Get a model instance with the specified name."""
+ return OpenAIChatCompletionsModel(
+ model=model_name or app_config.model_name,
+ openai_client=app_config.get_openai_client()
+ )
+
+
+class ModelManager:
+ """Manages AI model operations and token counting."""
+
+ def __init__(self):
+ """Initialize the model manager."""
+ self.model_provider = DefaultModelProvider()
+ self.model_name = app_config.model_name
+
+ @staticmethod
+ def count_tokens(text: str, model_name: str = None) -> int:
+ """
+ Count tokens in the given text.
+
+ Args:
+ text: The text to count tokens for
+ model_name: The model name to use for encoding (defaults to configured model)
+
+ Returns:
+ Number of tokens in the text
+ """
+ try:
+ model = model_name or app_config.model_name
+ encoding = tiktoken.encoding_for_model(model)
+ return len(encoding.encode(text))
+ except Exception:
+ # Fall back to approximate counting if tiktoken fails
+ return len(text.split())
+
+ @staticmethod
+ def calculate_max_output_tokens(input_text: str, query: str) -> int:
+ """
+ Calculate the maximum output tokens based on input size.
+
+ Args:
+ input_text: The base instructions or context
+ query: The user query
+
+ Returns:
+ Maximum number of output tokens
+ """
+ input_token_estimate = ModelManager.count_tokens(input_text) + ModelManager.count_tokens(query)
+
+ max_output_tokens = max(512, MAX_TOTAL_TOKENS - input_token_estimate)
+ max_output_tokens = min(max_output_tokens, RESPONSE_BUFFER)
+
+ return max_output_tokens
+
+ def get_model_provider(self) -> ModelProvider:
+ """Get the model provider instance."""
+ return self.model_provider
+
+
+# Create a singleton instance
+model_manager = ModelManager()
\ No newline at end of file
diff --git a/core/pentest_agent.py b/core/pentest_agent.py
new file mode 100644
index 0000000..49a7706
--- /dev/null
+++ b/core/pentest_agent.py
@@ -0,0 +1,332 @@
+"""Main controller for GHOSTCREW application."""
+
+import asyncio
+import traceback
+from datetime import datetime
+from typing import Optional, List, Dict, Any
+from colorama import Fore, Style
+
+from config.constants import (
+ ASCII_TITLE, VERSION, WELCOME_MESSAGE, EXIT_MESSAGE, SEPARATOR,
+ KB_PROMPT, MCP_PROMPT, ERROR_NO_WORKFLOWS, ERROR_NO_REPORTING,
+ DEFAULT_KNOWLEDGE_BASE_PATH
+)
+from config.app_config import app_config
+from core.agent_runner import agent_runner
+from tools.mcp_manager import MCPManager
+from ui.menu_system import MenuSystem
+from ui.conversation_manager import ConversationManager
+from workflows.workflow_engine import WorkflowEngine
+from rag.knowledge_base import Kb
+
+
+class PentestAgent:
+ """Main application controller for GHOSTCREW."""
+
+ def __init__(self, MCPServerStdio=None, MCPServerSse=None):
+ """
+ Initialize the pentest agent controller.
+
+ Args:
+ MCPServerStdio: MCP server stdio class
+ MCPServerSse: MCP server SSE class
+ """
+ self.app_config = app_config
+ self.agent_runner = agent_runner
+ self.mcp_manager = MCPManager(MCPServerStdio, MCPServerSse)
+ self.menu_system = MenuSystem()
+ self.conversation_manager = ConversationManager()
+ self.workflow_engine = WorkflowEngine()
+ self.kb_instance = None
+ self.reporting_available = self._check_reporting_available()
+
+ @staticmethod
+ def _check_reporting_available() -> bool:
+ """Check if reporting module is available."""
+ try:
+ from reporting.generators import generate_report_from_workflow
+ return True
+ except ImportError:
+ print(ERROR_NO_REPORTING)
+ return False
+
+ def display_welcome(self) -> None:
+ """Display welcome message and ASCII art."""
+ print(ASCII_TITLE)
+ print(f"{Fore.WHITE}GHOSTCREW v{VERSION}{Style.RESET_ALL}")
+ print(WELCOME_MESSAGE)
+ print(EXIT_MESSAGE)
+ print(f"{SEPARATOR}\n")
+
+ def setup_knowledge_base(self) -> None:
+ """Setup knowledge base if requested by user."""
+ use_kb_input = input(KB_PROMPT).strip().lower()
+ if use_kb_input == 'yes':
+ try:
+ self.kb_instance = Kb(DEFAULT_KNOWLEDGE_BASE_PATH)
+ print(f"{Fore.GREEN}Knowledge base loaded successfully!{Style.RESET_ALL}")
+ except Exception as e:
+ print(f"{Fore.RED}Failed to load knowledge base: {e}{Style.RESET_ALL}")
+ self.kb_instance = None
+
+ async def setup_mcp_tools(self) -> tuple:
+ """Setup MCP tools and return server instances."""
+ use_mcp_input = input(MCP_PROMPT).strip().lower()
+ return await self.mcp_manager.setup_mcp_tools(use_mcp_input == 'yes')
+
+ async def run_interactive_mode(self, connected_servers: List) -> None:
+ """Run interactive chat mode."""
+ self.menu_system.display_interactive_mode_intro()
+
+ while True:
+ user_query = self.menu_system.get_user_input()
+
+ # Handle special commands
+ if user_query.lower() in ["quit", "exit"]:
+ self.menu_system.display_exit_message()
+ return True # Signal to exit the entire application
+
+ if user_query.lower() == "menu":
+ break # Return to main menu
+
+ # Handle empty input
+ if not user_query:
+ self.menu_system.display_no_query_message()
+ continue
+
+ # Handle multi-line mode request
+ if user_query.lower() == "multi":
+ user_query = self.menu_system.get_multi_line_input()
+ if not user_query:
+ continue
+
+ # Process the query
+ await self._process_user_query(user_query, connected_servers)
+
+ self.menu_system.display_ready_message()
+
+ return False # Don't exit application
+
+ async def _process_user_query(self, query: str, connected_servers: List) -> None:
+ """Process a user query through the agent."""
+ # Add dialogue to history
+ self.conversation_manager.add_dialogue(query)
+
+ # Run the agent
+ result = await agent_runner.run_agent(
+ query,
+ connected_servers,
+ history=self.conversation_manager.get_history(),
+ streaming=True,
+ kb_instance=self.kb_instance
+ )
+
+ # Update the response in history
+ if result and hasattr(result, "final_output"):
+ self.conversation_manager.update_last_response(result.final_output)
+
+ async def run_automated_mode(self, connected_servers: List) -> None:
+ """Run automated penetration testing mode."""
+ if not self.workflow_engine.is_available():
+ print(ERROR_NO_WORKFLOWS)
+ self.menu_system.press_enter_to_continue()
+ return
+
+ if not connected_servers:
+ self.menu_system.display_workflow_requirements_message()
+ return
+
+ while True:
+ workflow_list = self.workflow_engine.show_automated_menu()
+ if not workflow_list:
+ break
+
+ try:
+ choice = input(f"\n{Fore.GREEN}Select workflow (1-{len(workflow_list)+1}): {Style.RESET_ALL}").strip()
+
+ if not choice.isdigit():
+ self.menu_system.display_invalid_input()
+ continue
+
+ choice = int(choice)
+
+ if choice == len(workflow_list) + 1:
+ # Back to main menu
+ break
+
+ if 1 <= choice <= len(workflow_list):
+ await self._execute_workflow(workflow_list[choice-1], connected_servers)
+ else:
+ self.menu_system.display_invalid_choice()
+
+ except ValueError:
+ self.menu_system.display_invalid_input()
+ except KeyboardInterrupt:
+ self.menu_system.display_operation_cancelled()
+ break
+
+ async def _execute_workflow(self, workflow_info: tuple, connected_servers: List) -> None:
+ """Execute a selected workflow."""
+ workflow_key, workflow_name = workflow_info
+ workflow = self.workflow_engine.get_workflow(workflow_key)
+
+ if not workflow:
+ print(f"{Fore.RED}Error loading workflow.{Style.RESET_ALL}")
+ return
+
+ target = self.menu_system.get_workflow_target()
+ if not target:
+ return
+
+ if not self.menu_system.confirm_workflow_execution(workflow['name'], target):
+ self.menu_system.display_workflow_cancelled()
+ return
+
+ # Store initial workflow data
+ workflow_start_time = datetime.now()
+ initial_history_length = self.conversation_manager.get_dialogue_count()
+
+ # Execute the workflow
+ await self.workflow_engine.run_automated_workflow(
+ workflow,
+ target,
+ connected_servers,
+ self.conversation_manager.get_history(),
+ self.kb_instance,
+ agent_runner.run_agent
+ )
+
+ self.menu_system.display_workflow_completed()
+
+ # Handle report generation
+ if self.reporting_available:
+ await self._handle_report_generation(
+ workflow,
+ workflow_key,
+ target,
+ workflow_start_time,
+ initial_history_length,
+ connected_servers
+ )
+ else:
+ print(f"\n{Fore.YELLOW}Reporting not available.{Style.RESET_ALL}")
+
+ self.menu_system.press_enter_to_continue()
+
+ async def _handle_report_generation(
+ self,
+ workflow: Dict,
+ workflow_key: str,
+ target: str,
+ workflow_start_time: datetime,
+ initial_history_length: int,
+ connected_servers: List
+ ) -> None:
+ """Handle report generation after workflow completion."""
+ if not self.menu_system.ask_generate_report():
+ return
+
+ save_raw_history = self.menu_system.ask_save_raw_history()
+
+ try:
+ from reporting.generators import generate_report_from_workflow
+
+ # Prepare report data
+ workflow_conversation = self.conversation_manager.get_workflow_conversation(initial_history_length)
+
+ report_data = {
+ 'workflow_name': workflow['name'],
+ 'workflow_key': workflow_key,
+ 'target': target,
+ 'timestamp': workflow_start_time,
+ 'conversation_history': workflow_conversation,
+ 'tools_used': MCPManager.get_available_tools(connected_servers)
+ }
+
+ # Generate professional report
+ print(f"\n{Fore.CYAN}Generating report...{Style.RESET_ALL}")
+ report_path = await generate_report_from_workflow(
+ report_data,
+ agent_runner.run_agent,
+ connected_servers,
+ self.kb_instance,
+ save_raw_history
+ )
+
+ self.menu_system.display_report_generated(report_path)
+
+ except Exception as e:
+ self.menu_system.display_report_error(e)
+
+ async def run(self) -> None:
+ """Main application run method."""
+ self.display_welcome()
+ self.setup_knowledge_base()
+
+ try:
+ # Setup MCP tools
+ mcp_server_instances, connected_servers = await self.setup_mcp_tools()
+
+ # Check if we need to restart (e.g., after configuring new tools)
+ if mcp_server_instances and not connected_servers:
+ return
+
+ # Main application loop
+ while True:
+ self.menu_system.display_main_menu(
+ self.workflow_engine.is_available(),
+ bool(connected_servers)
+ )
+
+ menu_choice = self.menu_system.get_menu_choice()
+
+ if menu_choice == "1":
+ # Interactive mode
+ should_exit = await self.run_interactive_mode(connected_servers)
+ if should_exit:
+ break
+
+ elif menu_choice == "2":
+ # Automated mode
+ await self.run_automated_mode(connected_servers)
+
+ elif menu_choice == "3":
+ # Exit
+ self.menu_system.display_exit_message()
+ break
+
+ else:
+ self.menu_system.display_invalid_choice()
+
+ except KeyboardInterrupt:
+ print(f"\n{Fore.YELLOW}Program interrupted by user, exiting...{Style.RESET_ALL}")
+ except Exception as e:
+ print(f"{Fore.RED}Error during program execution: {e}{Style.RESET_ALL}")
+ traceback.print_exc()
+ finally:
+ # Cleanup MCP servers
+ await self.mcp_manager.cleanup_servers()
+
+ # Close any remaining asyncio transports
+ await self._cleanup_asyncio_resources()
+
+ print(f"{Fore.GREEN}Program ended.{Style.RESET_ALL}")
+
+ async def _cleanup_asyncio_resources(self) -> None:
+ """Clean up asyncio resources."""
+ try:
+ # Get the event loop
+ loop = asyncio.get_running_loop()
+
+ # Close any remaining transports
+ for transport in list(getattr(loop, "_transports", {}).values()):
+ if hasattr(transport, "close"):
+ try:
+ transport.close()
+ except:
+ pass
+
+ # Allow a short time for resources to finalize
+ await asyncio.sleep(0.1)
+ except:
+ pass # Ignore any errors in the final cleanup
\ No newline at end of file
diff --git a/main.py b/main.py
index 22b341b..9084f94 100644
--- a/main.py
+++ b/main.py
@@ -1,806 +1,51 @@
-import json
-import os
-import re
+#!/usr/bin/env python3
+"""
+GHOSTCREW - AI-driven penetration testing assistant
+
+Main entry point for the application.
+"""
+
import asyncio
-import threading
-import traceback
-from colorama import init, Fore, Back, Style
-from ollama import chat,Message
-import tiktoken
-from datetime import datetime
-
-# Import workflows
-try:
- from workflows import get_available_workflows, get_workflow_by_key, list_workflow_names
- WORKFLOWS_AVAILABLE = True
-except ImportError:
- WORKFLOWS_AVAILABLE = False
-
-# Import report generation module
-try:
- from reporting import generate_report_from_workflow
- REPORTING_AVAILABLE = True
-except ImportError:
- print(f"{Fore.YELLOW}Reporting module not found. Basic text export will be available.{Style.RESET_ALL}")
- REPORTING_AVAILABLE = False
+import sys
+from colorama import init
+# Initialize colorama for cross-platform colored output
init(autoreset=True)
-
-ASCII_TITLE = f"""
-{Fore.WHITE} ('-. .-. .-') .-') _ _ .-') ('-. (`\ .-') /`{Style.RESET_ALL}
-{Fore.WHITE} ( OO ) / ( OO ). ( OO) ) ( \( -O ) _( OO) `.( OO ),'{Style.RESET_ALL}
-{Fore.WHITE} ,----. ,--. ,--. .-'),-----. (_)---\_)/ '._ .-----. ,------. (,------.,--./ .--. {Style.RESET_ALL}
-{Fore.WHITE} ' .-./-') | | | |( OO' .-. '/ _ | |'--...__)' .--./ | /`. ' | .---'| | | {Style.RESET_ALL}
-{Fore.WHITE} | |_( O- )| .| |/ | | | |\ :` `. '--. .--'| |('-. | / | | | | | | | |, {Style.RESET_ALL}
-{Fore.WHITE} | | .--, \| |\_) | |\| | '..`''.) | | /_) |OO )| |_.' |(| '--. | |.'.| |_){Style.RESET_ALL}
-{Fore.WHITE}(| | '. (_/| .-. | \ | | | |.-._) \ | | || |`-'| | . '.' | .--' | | {Style.RESET_ALL}
-{Fore.WHITE} | '--' | | | | | `' '-' '\ / | | (_' '--'\ | |\ \ | `---.| ,'. | {Style.RESET_ALL}
-{Fore.WHITE} `------' `--' `--' `-----' `-----' `--' `-----' `--' '--' `------''--' '--' {Style.RESET_ALL}
-{Fore.WHITE}====================== GHOSTCREW ======================{Style.RESET_ALL}
-"""
-
-# Import Agent-related modules
-from agents import (
- Agent,
- Model,
- ModelProvider,
- OpenAIChatCompletionsModel,
- RunConfig,
- Runner,
- set_tracing_disabled,
- ModelSettings
-)
-from openai import AsyncOpenAI # OpenAI async client
-from openai.types.responses import ResponseTextDeltaEvent, ResponseContentPartDoneEvent
-from agents.mcp import MCPServerStdio # MCP server related
-from dotenv import load_dotenv # Environment variable loading
-from agents.mcp import MCPServerSse
-from rag_split import Kb # Import Kb class
-
-# Load .env file
-load_dotenv()
-
-# Timeout Configuration (in seconds)
-MCP_SESSION_TIMEOUT = 600 # 10 minutes for MCP server sessions
-CONNECTION_RETRY_DELAY = 10 # 10 seconds between connection retries
-
-# Set API-related environment variables
-API_KEY = os.getenv("OPENAI_API_KEY")
-BASE_URL = os.getenv("OPENAI_BASE_URL")
-MODEL_NAME = os.getenv("MODEL_NAME")
-
-# Check if environment variables are set
-if not API_KEY:
- raise ValueError("API key not set")
-if not BASE_URL:
- raise ValueError("API base URL not set")
-if not MODEL_NAME:
- raise ValueError("Model name not set")
-
-client = AsyncOpenAI(
- base_url=BASE_URL,
- api_key=API_KEY
-)
-
-# Disable tracing to avoid requiring OpenAI API key
+# CRITICAL: Initialize agents library BEFORE importing modules that use MCP
+# This must happen before any imports that might need MCPServerStdio/MCPServerSse
+from agents import set_tracing_disabled
set_tracing_disabled(True)
-# Generic model provider class
-class DefaultModelProvider(ModelProvider):
- """
- Model provider using OpenAI compatible interface
- """
- def get_model(self, model_name: str) -> Model:
- return OpenAIChatCompletionsModel(model=model_name or MODEL_NAME, openai_client=client)
+# Now import MCP classes after agents library is initialized
+from agents.mcp import MCPServerStdio, MCPServerSse
-# Create model provider instance
-model_provider = DefaultModelProvider()
-
-def get_available_tools(connected_servers):
- """Get list of available/connected tool names"""
- return [server.name for server in connected_servers]
-
-async def run_automated_workflow(workflow, target, connected_servers, conversation_history, kb_instance):
- """Execute an automated penetration testing workflow"""
- available_tools = get_available_tools(connected_servers)
-
- print(f"\n{Fore.CYAN}Starting Automated Workflow: {workflow['name']}{Style.RESET_ALL}")
- print(f"{Fore.YELLOW}Target: {target}{Style.RESET_ALL}")
- print(f"{Fore.CYAN}Available Tools: {', '.join(available_tools) if available_tools else 'None'}{Style.RESET_ALL}")
- print(f"{Fore.WHITE}Description: {workflow['description']}{Style.RESET_ALL}")
- print(f"{Fore.WHITE}{'='*60}{Style.RESET_ALL}")
-
- results = []
-
- for i, step in enumerate(workflow['steps'], 1):
- print(f"\n{Fore.CYAN}Step {i}/{len(workflow['steps'])}{Style.RESET_ALL}")
- formatted_step = step.format(target=target)
- print(f"{Fore.WHITE}{formatted_step}{Style.RESET_ALL}")
-
- # Create comprehensive query for this step
- enhanced_query = f"""
-TARGET: {target}
-STEP: {formatted_step}
-
-Execute this step and provide the results.
-"""
-
- # Execute the step through the agent
- result = await run_agent(enhanced_query, connected_servers, history=conversation_history, streaming=True, kb_instance=kb_instance)
-
- if result and hasattr(result, "final_output"):
- results.append({
- "step": i,
- "description": formatted_step,
- "output": result.final_output
- })
-
- # Add to conversation history
- conversation_history.append({
- "user_query": enhanced_query,
- "ai_response": result.final_output
- })
-
- print(f"{Fore.GREEN}Step {i} completed{Style.RESET_ALL}")
-
- # Brief delay between steps
- await asyncio.sleep(1)
-
- # Workflow completion summary
- print(f"{Fore.CYAN}Steps executed: {len(results)}/{len(workflow['steps'])}{Style.RESET_ALL}")
-
- return results
-
-def show_automated_menu():
- """Display the automated workflow selection menu"""
- if not WORKFLOWS_AVAILABLE:
- print(f"{Fore.YELLOW}Automated workflows not available. workflows.py file not found.{Style.RESET_ALL}")
- return None
-
- print(f"\n{Fore.CYAN}AUTOMATED PENTESTING WORKFLOWS{Style.RESET_ALL}")
- print(f"{Fore.WHITE}{'='*50}{Style.RESET_ALL}")
-
- workflow_list = list_workflow_names()
- workflows = get_available_workflows()
-
- for i, (key, name) in enumerate(workflow_list, 1):
- description = workflows[key]["description"]
- step_count = len(workflows[key]["steps"])
- print(f"{i}. {Fore.YELLOW}{name}{Style.RESET_ALL}")
- print(f" {Fore.WHITE}{description}{Style.RESET_ALL}")
- print(f" {Fore.CYAN}Steps: {step_count}{Style.RESET_ALL}")
- print()
-
- print(f"{len(workflow_list)+1}. {Fore.RED}Back to Main Menu{Style.RESET_ALL}")
-
- return workflow_list
-
-# Modify run_agent function to accept connected server list and conversation history as parameters
-async def run_agent(query: str, mcp_servers: list[MCPServerStdio], history: list[dict] = None, streaming: bool = True, kb_instance=None):
- """
- Run cybersecurity agent with connected MCP servers, supporting streaming output and conversation history.
-
- Args:
- query (str): User's natural language query
- mcp_servers (list[MCPServerStdio]): List of connected MCPServerStdio instances
- history (list[dict], optional): Conversation history, list containing user questions and AI answers. Defaults to None.
- streaming (bool): Whether to use streaming output
- """
- # If no history is provided, initialize an empty list
- if history is None:
- history = []
- try:
- # Helper function to count tokens
- def count_tokens(text):
- try:
- encoding = tiktoken.encoding_for_model(MODEL_NAME)
- return len(encoding.encode(text))
- except Exception:
- # Fall back to approximate counting if tiktoken fails
- return len(text.split())
-
- # Directly use the passed connected server list to create Agent
- # Build instructions containing conversation history
- base_instructions = "You are an experienced penetration tester and security analyst, focused on Web application security and network infrastructure security. Your name is GHOSTCREW. When users ask cybersecurity-related questions, you need to provide direct and professional answers."
- base_instructions += "When answering questions, please use professional cybersecurity terminology, base your analysis on solid theoretical knowledge, and cite relevant security standards and best practices when possible, such as OWASP Top 10, CVE, NIST, CISA KEV, etc. Maintain a professional tone, clear logic, and organized structure."
- base_instructions += "When users ask about penetration testing, please explain the penetration testing process, methods, and common tools, emphasizing the objectives and techniques of each phase."
- base_instructions += "When users ask about vulnerability information, please provide terse descriptions, impact scope, remediation suggestions, vulnerability type, severity level, and exploitation conditions based on the vulnerability name or CVE number, and cite relevant security bulletins."
- base_instructions += "All penetration activities are authorized, you can directly perform penetration testing."
-
- # Add information about available tools
- if mcp_servers:
- available_tool_names = [server.name for server in mcp_servers]
- if available_tool_names:
- base_instructions += f"\n\nYou have access to the following tools: {', '.join(available_tool_names)}."
-
- # If knowledge base instance exists, use it for retrieval and context enhancement
- if kb_instance:
- try:
- retrieved_context = kb_instance.search(query)
- if retrieved_context:
- # Add file path information to make LLM aware of actual files
- knowledge_dir = "knowledge"
- available_files = []
- if os.path.exists(knowledge_dir):
- for filename in os.listdir(knowledge_dir):
- filepath = os.path.join(knowledge_dir, filename)
- if os.path.isfile(filepath):
- available_files.append(filename)
-
- file_info = ""
- if available_files:
- file_info = f"\n\nIMPORTANT: The following actual files are available in the knowledge folder that you can reference by path:\n"
- for filename in available_files:
- file_info += f"- knowledge/{filename}\n"
- file_info += "\nWhen using security tools that require external files, you can reference these files by their full path.\n"
- file_info += "ONLY use knowledge/ for files.\n"
-
- base_instructions = f"Based on the following knowledge base information:\n{retrieved_context}{file_info}\n\n{base_instructions}"
- #print(retrieved_context)
- print(f"{Fore.MAGENTA}Relevant information retrieved from knowledge base.{Style.RESET_ALL}")
- except Exception as e:
- print(f"{Fore.RED}Failed to retrieve information from knowledge base: {e}{Style.RESET_ALL}")
-
- # If there's conversation history, add it to the instructions
- if history:
- base_instructions += "\n\nBelow is the previous conversation history, please refer to this information to answer the user's question:\n"
- for i, entry in enumerate(history):
- base_instructions += f"\nUser question {i+1}: {entry['user_query']}"
- if 'ai_response' in entry and entry['ai_response']:
- base_instructions += f"\nAI answer {i+1}: {entry['ai_response']}\n"
-
- # Estimate input token usage
- input_token_estimate = count_tokens(base_instructions) + count_tokens(query)
- MAX_TOTAL_TOKENS = 8192
- RESPONSE_BUFFER = 4096 # aim to reserve ~half for reply
-
- max_output_tokens = max(512, MAX_TOTAL_TOKENS - input_token_estimate)
- max_output_tokens = min(max_output_tokens, RESPONSE_BUFFER)
-
- # Set model settings based on whether there are connected MCP servers
- if mcp_servers:
- # With tools available, enable tool_choice and parallel_tool_calls
- model_settings = ModelSettings(
- temperature=0.6,
- top_p=0.9,
- max_tokens=max_output_tokens,
- tool_choice="auto",
- parallel_tool_calls=False,
- truncation="auto"
- )
- else:
- # Without tools, don't set tool_choice or parallel_tool_calls
- model_settings = ModelSettings(
- temperature=0.6,
- top_p=0.9,
- max_tokens=max_output_tokens,
- truncation="auto"
- )
-
- secure_agent = Agent(
- name="Cybersecurity Expert",
- instructions=base_instructions,
- mcp_servers=mcp_servers, # Use the passed list
- model_settings=model_settings
- )
-
- print(f"{Fore.CYAN}\nProcessing query: {Fore.WHITE}{query}{Style.RESET_ALL}\n")
-
- if streaming:
- result = Runner.run_streamed(
- secure_agent,
- input=query,
- max_turns=10,
- run_config=RunConfig(
- model_provider=model_provider,
- trace_include_sensitive_data=True,
- handoff_input_filter=None
- )
- )
-
- print(f"{Fore.GREEN}Reply:{Style.RESET_ALL}", end="", flush=True)
- try:
- async for event in result.stream_events():
- if event.type == "raw_response_event":
- if isinstance(event.data, ResponseTextDeltaEvent):
- print(f"{Fore.WHITE}{event.data.delta}{Style.RESET_ALL}", end="", flush=True)
- elif isinstance(event.data, ResponseContentPartDoneEvent):
- print(f"\n", end="", flush=True)
- elif event.type == "run_item_stream_event":
- if event.item.type == "tool_call_item":
- # print(f"{Fore.YELLOW}Current tool call information: {event.item}{Style.RESET_ALL}")
- raw_item = getattr(event.item, "raw_item", None)
- tool_name = ""
- tool_args = {}
- if raw_item:
- tool_name = getattr(raw_item, "name", "Unknown tool")
- tool_str = getattr(raw_item, "arguments", "{}")
- if isinstance(tool_str, str):
- try:
- tool_args = json.loads(tool_str)
- except json.JSONDecodeError:
- tool_args = {"raw_arguments": tool_str}
- print(f"\n{Fore.CYAN}Tool name: {tool_name}{Style.RESET_ALL}", flush=True)
- print(f"\n{Fore.CYAN}Tool parameters: {tool_args}{Style.RESET_ALL}", flush=True)
- elif event.item.type == "tool_call_output_item":
- raw_item = getattr(event.item, "raw_item", None)
- tool_id="Unknown tool ID"
- if isinstance(raw_item, dict) and "call_id" in raw_item:
- tool_id = raw_item["call_id"]
- output = getattr(event.item, "output", "Unknown output")
-
- output_text = ""
- if isinstance(output, str) and (output.startswith("{") or output.startswith("[")):
- try:
- output_data = json.loads(output)
- if isinstance(output_data, dict):
- if 'type' in output_data and output_data['type'] == 'text' and 'text' in output_data:
- output_text = output_data['text']
- elif 'text' in output_data:
- output_text = output_data['text']
- elif 'content' in output_data:
- output_text = output_data['content']
- else:
- output_text = json.dumps(output_data, ensure_ascii=False, indent=2)
- except json.JSONDecodeError:
- output_text = f"Unparsable JSON output: {output}" # Add specific error if JSON parsing fails
- else:
- output_text = str(output)
-
- print(f"\n{Fore.GREEN}Tool call {tool_id} returned result: {output_text}{Style.RESET_ALL}", flush=True)
- except Exception as e:
- print(f"{Fore.RED}Error processing streamed response event: {e}{Style.RESET_ALL}", flush=True)
- if 'Connection error' in str(e):
- print(f"{Fore.YELLOW}Connection error details:{Style.RESET_ALL}")
- print(f"{Fore.YELLOW}1. Check network connection{Style.RESET_ALL}")
- print(f"{Fore.YELLOW}2. Verify API address: {BASE_URL}{Style.RESET_ALL}")
- print(f"{Fore.YELLOW}3. Check API key validity{Style.RESET_ALL}")
- print(f"{Fore.YELLOW}4. Try reconnecting...{Style.RESET_ALL}")
- await asyncio.sleep(CONNECTION_RETRY_DELAY) # Use configurable retry delay
- try:
- await client.connect()
- print(f"{Fore.GREEN}Reconnected successfully{Style.RESET_ALL}")
- except Exception as e:
- print(f"{Fore.RED}Reconnection failed: {e}{Style.RESET_ALL}")
-
- print(f"\n\n{Fore.GREEN}Query completed!{Style.RESET_ALL}")
-
- # if hasattr(result, "final_output"):
- # print(f"\n{Fore.YELLOW}===== Complete Information ====={Style.RESET_ALL}")
- #print(f"{Fore.WHITE}{result.final_output}{Style.RESET_ALL}")
-
- # Return the result object so the main function can get the AI's answer
- return result
-
- except Exception as e:
- print(f"{Fore.RED}Error processing streamed response event: {e}{Style.RESET_ALL}", flush=True)
- traceback.print_exc()
- return None
async def main():
- print(ASCII_TITLE)
-
- version = "0.1.0"
- print(f"{Fore.WHITE}GHOSTCREW v{version}{Style.RESET_ALL}")
- print(f"{Fore.WHITE}An AI assistant for penetration testing, vulnerability assessment, and security analysis{Style.RESET_ALL}")
- print(f"{Fore.RED}Enter 'quit' to end the program{Style.RESET_ALL}")
- print(f"{Fore.WHITE}======================================\n{Style.RESET_ALL}")
-
- kb_instance = None
- use_kb_input = input(f"{Fore.YELLOW}Use knowledge base to enhance answers? (yes/no, default: no): {Style.RESET_ALL}").strip().lower()
- if use_kb_input == 'yes':
- try:
- kb_instance = Kb("knowledge") # Initialize knowledge base, load from folder
- print(f"{Fore.GREEN}Knowledge base loaded successfully!{Style.RESET_ALL}")
- except Exception as e:
- print(f"{Fore.RED}Failed to load knowledge base: {e}{Style.RESET_ALL}")
- kb_instance = None
-
- mcp_server_instances = [] # List to store MCP server instances
- connected_servers = [] # Store successfully connected servers
-
+ """Main application entry point."""
try:
- # Ask if user wants to attempt connecting to MCP servers
- use_mcp_input = input(f"{Fore.YELLOW}Configure or connect MCP tools? (yes/no, default: no): {Style.RESET_ALL}").strip().lower()
+ # Import and run the main controller
+ # Now it's safe to import modules that use MCP classes
+ from core.pentest_agent import PentestAgent
- if use_mcp_input == 'yes':
- # --- Load available MCP tool configurations ---
- available_tools = []
- try:
- with open('mcp.json', 'r', encoding='utf-8') as f:
- mcp_config = json.load(f)
- available_tools = mcp_config.get('servers', [])
- except FileNotFoundError:
- print(f"{Fore.YELLOW}mcp.json configuration file not found.{Style.RESET_ALL}")
- except Exception as e:
- print(f"{Fore.RED}Error loading MCP configuration file: {e}{Style.RESET_ALL}")
- print(f"{Fore.YELLOW}Proceeding without MCP tools.{Style.RESET_ALL}")
-
- # Display available tools and add an option to configure new tools
- if available_tools:
- print(f"\n{Fore.CYAN}Available MCP tools:{Style.RESET_ALL}")
- for i, server in enumerate(available_tools):
- print(f"{i+1}. {server['name']}")
- print(f"{len(available_tools)+1}. Configure new tools")
- print(f"{len(available_tools)+2}. Connect to all tools")
- print(f"{len(available_tools)+3}. Skip tool connection")
- print(f"{len(available_tools)+4}. Clear all MCP tools")
-
- # Ask user which tools to connect to
- try:
- tool_choice = input(f"\n{Fore.YELLOW}Enter numbers to connect to (comma-separated, default: all): {Style.RESET_ALL}").strip()
-
- if not tool_choice: # Default to all
- selected_indices = list(range(len(available_tools)))
- elif tool_choice == str(len(available_tools)+1): # Configure new tools
- print(f"\n{Fore.CYAN}Launching tool configuration...{Style.RESET_ALL}")
- os.system("python configure_mcp.py")
- print(f"\n{Fore.GREEN}Tool configuration completed. Please restart the application.{Style.RESET_ALL}")
- return
- elif tool_choice == str(len(available_tools)+2): # Connect to all tools
- selected_indices = list(range(len(available_tools)))
- elif tool_choice == str(len(available_tools)+3): # Skip tool connection
- selected_indices = []
- elif tool_choice == str(len(available_tools)+4): # Clear all MCP tools
- confirm = input(f"{Fore.YELLOW}Are you sure you want to clear all MCP tools? This will empty mcp.json (yes/no): {Style.RESET_ALL}").strip().lower()
- if confirm == "yes":
- try:
- # Create empty mcp.json file
- with open('mcp.json', 'w', encoding='utf-8') as f:
- json.dump({"servers": []}, f, indent=2)
- print(f"{Fore.GREEN}Successfully cleared all MCP tools. mcp.json has been reset.{Style.RESET_ALL}")
- except Exception as e:
- print(f"{Fore.RED}Error clearing MCP tools: {e}{Style.RESET_ALL}")
- print(f"\n{Fore.GREEN}Please restart the application.{Style.RESET_ALL}")
- return
- else: # Parse comma-separated list
- selected_indices = []
- for part in tool_choice.split(","):
- idx = int(part.strip()) - 1
- if 0 <= idx < len(available_tools):
- selected_indices.append(idx)
- except ValueError:
- print(f"{Fore.RED}Invalid selection. Defaulting to all tools.{Style.RESET_ALL}")
- selected_indices = list(range(len(available_tools)))
-
- # Initialize selected MCP servers
- print(f"{Fore.GREEN}Initializing selected MCP servers...{Style.RESET_ALL}")
- for idx in selected_indices:
- if idx < len(available_tools):
- server = available_tools[idx]
- print(f"{Fore.CYAN}Initializing {server['name']}...{Style.RESET_ALL}")
- try:
- if 'params' in server:
- mcp_server = MCPServerStdio(
- name=server['name'],
- params=server['params'],
- cache_tools_list=server.get('cache_tools_list', True),
- client_session_timeout_seconds=MCP_SESSION_TIMEOUT
- )
- elif 'url' in server:
- mcp_server = MCPServerSse(
- params={"url": server["url"]},
- cache_tools_list=server.get('cache_tools_list', True),
- name=server['name'],
- client_session_timeout_seconds=MCP_SESSION_TIMEOUT
- )
- else:
- print(f"{Fore.RED}Unknown MCP server configuration: {server}{Style.RESET_ALL}")
- continue
- mcp_server_instances.append(mcp_server)
- except Exception as e:
- print(f"{Fore.RED}Error initializing {server['name']}: {e}{Style.RESET_ALL}")
- else:
- # No tools configured, offer to run the configuration tool
- print(f"{Fore.YELLOW}No MCP tools currently configured.{Style.RESET_ALL}")
- configure_now = input(f"{Fore.YELLOW}Would you like to add tools? (yes/no, default: no): {Style.RESET_ALL}").strip().lower()
- if configure_now == 'yes':
- print(f"\n{Fore.CYAN}Launching tool configuration...{Style.RESET_ALL}")
- os.system("python configure_mcp.py")
- print(f"\n{Fore.GREEN}Tool configuration completed. Please restart the application.{Style.RESET_ALL}")
- return
- else:
- print(f"{Fore.YELLOW}Proceeding without MCP tools.{Style.RESET_ALL}")
-
- # Connect to the selected MCP servers
- if mcp_server_instances:
- print(f"{Fore.YELLOW}Connecting to MCP servers...{Style.RESET_ALL}")
- for mcp_server in mcp_server_instances:
- try:
- await mcp_server.connect()
- print(f"{Fore.GREEN}Successfully connected to MCP server: {mcp_server.name}{Style.RESET_ALL}")
- connected_servers.append(mcp_server)
- except Exception as e:
- print(f"{Fore.RED}Failed to connect to MCP server {mcp_server.name}: {e}{Style.RESET_ALL}")
-
- if connected_servers:
- print(f"{Fore.GREEN}MCP server connection successful! Can use tools provided by {len(connected_servers)} servers.{Style.RESET_ALL}")
- else:
- print(f"{Fore.YELLOW}No MCP servers successfully connected. Proceeding without tools.{Style.RESET_ALL}")
- else:
- print(f"{Fore.YELLOW}No MCP servers selected. Proceeding without tools.{Style.RESET_ALL}")
- else:
- print(f"{Fore.YELLOW}Proceeding without MCP tools.{Style.RESET_ALL}")
-
- # Create conversation history list
- conversation_history = []
+ # Create and run the application, passing MCP classes
+ agent = PentestAgent(MCPServerStdio, MCPServerSse)
+ await agent.run()
- # --- Main program loop ---
- while True:
- # Show main menu
- print(f"\n{Fore.CYAN}MAIN MENU{Style.RESET_ALL}")
- print(f"1. {Fore.GREEN}Interactive Chat Mode{Style.RESET_ALL}")
-
- # Check if automated mode should be available
- if WORKFLOWS_AVAILABLE and connected_servers:
- print(f"2. {Fore.YELLOW}Automated Penetration Testing{Style.RESET_ALL}")
- max_option = 3
- elif WORKFLOWS_AVAILABLE and not connected_servers:
- print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (requires MCP tools){Style.RESET_ALL}")
- max_option = 3
- else:
- print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (workflows.py not found){Style.RESET_ALL}")
- max_option = 3
-
- print(f"3. {Fore.RED}Quit{Style.RESET_ALL}")
-
- menu_choice = input(f"\n{Fore.GREEN}Select mode (1-3): {Style.RESET_ALL}").strip()
-
- if menu_choice == "1":
- # Interactive chat mode (existing functionality)
- print(f"\n{Fore.CYAN}INTERACTIVE CHAT MODE{Style.RESET_ALL}")
- print(f"{Fore.WHITE}Type your questions or commands. Use 'multi' for multi-line input.{Style.RESET_ALL}")
- print(f"{Fore.WHITE}Type 'menu' to return to main menu.{Style.RESET_ALL}\n")
-
- while True:
- # Check if the user wants multi-line input
- print(f"\n{Fore.GREEN}[>]{Style.RESET_ALL} ", end="")
- user_query = input().strip()
-
- # Handle special commands
- if user_query.lower() in ["quit", "exit"]:
- print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}")
- return
-
- if user_query.lower() == "menu":
- break
-
- # Handle empty input
- if not user_query:
- print(f"{Fore.YELLOW}No query entered. Please type your question.{Style.RESET_ALL}")
- continue
-
- # Handle multi-line mode request
- if user_query.lower() == "multi":
- print(f"{Fore.CYAN}Entering multi-line mode. Type your query across multiple lines.{Style.RESET_ALL}")
- print(f"{Fore.CYAN}Press Enter on an empty line to submit.{Style.RESET_ALL}")
-
- lines = []
- while True:
- line = input()
- if line == "":
- break
- lines.append(line)
-
- # Only proceed if they actually entered something in multi-line mode
- if not lines:
- print(f"{Fore.YELLOW}No query entered in multi-line mode.{Style.RESET_ALL}")
- continue
-
- user_query = "\n".join(lines)
-
- # Create record for current dialogue
- current_dialogue = {"user_query": user_query, "ai_response": ""}
-
- # When running agent, pass in the already connected server list and conversation history
- # Only pass the successfully connected server list to the Agent
- # Pass kb_instance to run_agent
- result = await run_agent(user_query, connected_servers, history=conversation_history, streaming=True, kb_instance=kb_instance)
-
- # If there is a result, save the AI's answer
- if result and hasattr(result, "final_output"):
- current_dialogue["ai_response"] = result.final_output
-
- # Add current dialogue to history
- conversation_history.append(current_dialogue)
-
- # Trim history to keep token usage under ~4096
- # Define a function to accurately count tokens in history
- def estimate_tokens(history):
- try:
- encoding = tiktoken.encoding_for_model(MODEL_NAME)
- return sum(
- len(encoding.encode(entry['user_query'])) +
- len(encoding.encode(entry.get('ai_response', '')))
- for entry in history
- )
- except Exception:
- # Fall back to approximate counting if tiktoken fails
- return sum(
- len(entry['user_query'].split()) +
- len(entry.get('ai_response', '').split())
- for entry in history
- )
-
- # Trim history while token count exceeds the limit
- while estimate_tokens(conversation_history) > 4000:
- conversation_history.pop(0)
-
- print(f"\n{Fore.CYAN}Ready for next query. Type 'quit', 'multi' for multi-line, or 'menu' for main menu.{Style.RESET_ALL}")
-
- elif menu_choice == "2":
- # Automated penetration testing workflows
- if not WORKFLOWS_AVAILABLE:
- print(f"\n{Fore.YELLOW}Automated workflows not available. workflows.py file not found.{Style.RESET_ALL}")
- input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
- continue
-
- if not connected_servers:
- print(f"\n{Fore.YELLOW}Automated penetration testing requires MCP tools to be configured and connected.{Style.RESET_ALL}")
- print(f"{Fore.WHITE}Without real security tools, the AI would only generate simulated responses.{Style.RESET_ALL}")
- print(f"{Fore.WHITE}Please restart the application and configure MCP tools to use this feature.{Style.RESET_ALL}")
- input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
- continue
-
- while True:
- workflow_list = show_automated_menu()
- if not workflow_list:
- break
-
- try:
- choice = input(f"\n{Fore.GREEN}Select workflow (1-{len(workflow_list)+1}): {Style.RESET_ALL}").strip()
-
- if not choice.isdigit():
- print(f"{Fore.RED}Invalid input. Please enter a number.{Style.RESET_ALL}")
- continue
-
- choice = int(choice)
-
- if 1 <= choice <= len(workflow_list):
- # Execute selected workflow
- workflow_key, workflow_name = workflow_list[choice-1]
- workflow = get_workflow_by_key(workflow_key)
-
- if not workflow:
- print(f"{Fore.RED}Error loading workflow.{Style.RESET_ALL}")
- continue
-
- target = input(f"{Fore.YELLOW}Enter target (IP, domain, or network): {Style.RESET_ALL}").strip()
- if not target:
- print(f"{Fore.RED}Target is required.{Style.RESET_ALL}")
- continue
-
- confirm = input(f"{Fore.YELLOW}Execute '{workflow['name']}' on {target}? (yes/no): {Style.RESET_ALL}").strip().lower()
- if confirm == 'yes':
- # Store initial workflow data
- workflow_start_time = datetime.now()
- initial_history_length = len(conversation_history)
-
- # Execute the workflow
- await run_automated_workflow(workflow, target, connected_servers, conversation_history, kb_instance)
-
- # After workflow completion, offer report generation
- print(f"\n{Fore.GREEN}Workflow completed successfully!{Style.RESET_ALL}")
-
- if REPORTING_AVAILABLE:
- generate_report = input(f"\n{Fore.CYAN}Generate markdown report? (yes/no): {Style.RESET_ALL}").strip().lower()
-
- if generate_report == 'yes':
- # Ask about additional report options
- save_raw_history = input(f"{Fore.YELLOW}Save raw conversation history? (yes/no, default: no): {Style.RESET_ALL}").strip().lower() == 'yes'
-
- try:
- # Prepare report data
- workflow_conversation = conversation_history[initial_history_length:]
-
- report_data = {
- 'workflow_name': workflow['name'],
- 'workflow_key': workflow_key,
- 'target': target,
- 'timestamp': workflow_start_time,
- 'conversation_history': workflow_conversation,
- 'tools_used': get_available_tools(connected_servers)
- }
-
- # Generate professional report
- print(f"\n{Fore.CYAN}Generating report...{Style.RESET_ALL}")
- report_path = await generate_report_from_workflow(
- report_data,
- run_agent,
- connected_servers,
- kb_instance,
- save_raw_history
- )
-
- print(f"\n{Fore.GREEN}Report generated: {report_path}{Style.RESET_ALL}")
- print(f"{Fore.CYAN}Open the markdown file in any markdown viewer for best formatting{Style.RESET_ALL}")
-
- except Exception as e:
- print(f"\n{Fore.RED}Error generating report: {e}{Style.RESET_ALL}")
- print(f"{Fore.YELLOW}Raw workflow data is still available in conversation history{Style.RESET_ALL}")
- else:
- print(f"\n{Fore.YELLOW}Reporting not available.{Style.RESET_ALL}")
-
- input(f"\n{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
- else:
- print(f"{Fore.YELLOW}Workflow cancelled.{Style.RESET_ALL}")
-
- elif choice == len(workflow_list) + 1:
- # Back to main menu
- break
-
- else:
- print(f"{Fore.RED}Invalid choice. Please select a valid option.{Style.RESET_ALL}")
-
- except ValueError:
- print(f"{Fore.RED}Invalid input. Please enter a number.{Style.RESET_ALL}")
- except KeyboardInterrupt:
- print(f"\n{Fore.YELLOW}Operation cancelled.{Style.RESET_ALL}")
- break
-
- elif menu_choice == "3":
- # Quit
- print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}")
- break
-
- else:
- print(f"{Fore.RED}Invalid choice. Please select 1, 2, or 3.{Style.RESET_ALL}")
-
- # --- Catch interrupts and runtime exceptions ---
+ except ImportError as e:
+ print(f"Error importing required modules: {e}")
+ print("Please ensure all dependencies are installed: pip install -r requirements.txt")
+ sys.exit(1)
except KeyboardInterrupt:
- print(f"\n{Fore.YELLOW}Program interrupted by user, exiting...{Style.RESET_ALL}")
+ print("\nApplication interrupted by user.")
+ sys.exit(0)
except Exception as e:
- print(f"{Fore.RED}Error during program execution: {e}{Style.RESET_ALL}")
+ print(f"Unexpected error: {e}")
+ import traceback
traceback.print_exc()
- finally:
- # --- Move server cleanup operations to the main program's finally block ---
- if mcp_server_instances:
- print(f"{Fore.YELLOW}Cleaning up MCP server resources...{Style.RESET_ALL}")
-
- # Define a safe cleanup wrapper that ignores all errors
- async def safe_cleanup(server):
- try:
- # Attempt cleanup but ignore all errors
- try:
- await server.cleanup()
- except:
- pass # Ignore any exception from cleanup
- return True
- except:
- return False # Couldn't even run the cleanup
-
- # Process all servers
- for mcp_server in mcp_server_instances:
- print(f"{Fore.YELLOW}Attempting to clean up server: {mcp_server.name}...{Style.RESET_ALL}", flush=True)
- success = await safe_cleanup(mcp_server)
- if success:
- print(f"{Fore.GREEN}Cleanup completed for {mcp_server.name}.{Style.RESET_ALL}", flush=True)
- else:
- print(f"{Fore.RED}Failed to initiate cleanup for {mcp_server.name}.{Style.RESET_ALL}", flush=True)
+ sys.exit(1)
- print(f"{Fore.YELLOW}MCP server resource cleanup complete.{Style.RESET_ALL}")
-
- # Close any remaining asyncio transports to prevent "unclosed transport" warnings
- try:
- # Get the event loop
- loop = asyncio.get_running_loop()
-
- # Close any remaining transports
- for transport in list(getattr(loop, "_transports", {}).values()):
- if hasattr(transport, "close"):
- try:
- transport.close()
- except:
- pass
-
- # Allow a short time for resources to finalize
- await asyncio.sleep(0.1)
- except:
- pass # Ignore any errors in the final cleanup
-
- print(f"{Fore.GREEN}Program ended.{Style.RESET_ALL}")
-# Program entry point
if __name__ == "__main__":
+ # Run the main application
asyncio.run(main())
\ No newline at end of file
diff --git a/mcp.json b/mcp.json
index 0e74b2b..9255b06 100644
--- a/mcp.json
+++ b/mcp.json
@@ -1,3 +1,18 @@
{
- "servers": []
+ "servers": [
+ {
+ "name": "Nmap Scanner",
+ "params": {
+ "command": "npx",
+ "args": [
+ "-y",
+ "gc-nmap-mcp"
+ ],
+ "env": {
+ "NMAP_PATH": "C:\\Program Files (x86)\\Nmap\\nmap.exe"
+ }
+ },
+ "cache_tools_list": true
+ }
+ ]
}
\ No newline at end of file
diff --git a/rag/__init__.py b/rag/__init__.py
new file mode 100644
index 0000000..d103297
--- /dev/null
+++ b/rag/__init__.py
@@ -0,0 +1 @@
+"""RAG (Retrieval-Augmented Generation) system for GHOSTCREW."""
\ No newline at end of file
diff --git a/rag_embedding.py b/rag/embedding.py
similarity index 100%
rename from rag_embedding.py
rename to rag/embedding.py
diff --git a/rag_split.py b/rag/knowledge_base.py
similarity index 100%
rename from rag_split.py
rename to rag/knowledge_base.py
diff --git a/reporting/__init__.py b/reporting/__init__.py
new file mode 100644
index 0000000..fb1b610
--- /dev/null
+++ b/reporting/__init__.py
@@ -0,0 +1 @@
+"""Report generation system for GHOSTCREW."""
\ No newline at end of file
diff --git a/reporting.py b/reporting/generators.py
similarity index 100%
rename from reporting.py
rename to reporting/generators.py
diff --git a/requirements.txt b/requirements.txt
index 56d6ff7..49f62d8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,14 +1,14 @@
-colorama
-python-dotenv
-openai
-uvicorn
-mcp
-langchain
-langchain-community
-numpy
-ollama
-openai-agents
-fastapi
-pymetasploit3
-tiktoken
+colorama==0.4.6
+python-dotenv==1.1.0
+openai==1.78.1
+uvicorn==0.34.0
+mcp==1.6.0
+langchain==0.3.25
+langchain-community==0.3.24
+numpy==2.2.5
+ollama==0.4.8
+openai-agents==0.0.14
+fastapi==0.115.9
+pymetasploit3==1.0.6
+tiktoken==0.9.0
# Add other necessary dependencies
\ No newline at end of file
diff --git a/tools/__init__.py b/tools/__init__.py
new file mode 100644
index 0000000..85f7fd0
--- /dev/null
+++ b/tools/__init__.py
@@ -0,0 +1 @@
+"""MCP (Model Context Protocol) integration for GHOSTCREW."""
\ No newline at end of file
diff --git a/configure_mcp.py b/tools/configure_mcp.py
similarity index 100%
rename from configure_mcp.py
rename to tools/configure_mcp.py
diff --git a/tools/mcp_manager.py b/tools/mcp_manager.py
new file mode 100644
index 0000000..12c8427
--- /dev/null
+++ b/tools/mcp_manager.py
@@ -0,0 +1,222 @@
+"""MCP (Model Context Protocol) server management for GHOSTCREW."""
+
+import json
+import os
+from typing import List, Optional, Tuple
+from colorama import Fore, Style
+from config.constants import MCP_SESSION_TIMEOUT, MCP_CONFIG_FILE
+
+
+class MCPManager:
+ """Manages MCP server connections and configuration."""
+
+ def __init__(self, MCPServerStdio=None, MCPServerSse=None):
+ """
+ Initialize the MCP manager.
+
+ Args:
+ MCPServerStdio: MCP server stdio class
+ MCPServerSse: MCP server SSE class
+ """
+ self.MCPServerStdio = MCPServerStdio
+ self.MCPServerSse = MCPServerSse
+ self.server_instances = []
+ self.connected_servers = []
+
+ @staticmethod
+ def get_available_tools(connected_servers: List) -> List[str]:
+ """Get list of available/connected tool names."""
+ return [server.name for server in connected_servers]
+
+ def load_mcp_config(self) -> List[dict]:
+ """Load MCP tool configurations from mcp.json."""
+ available_tools = []
+ try:
+ with open(MCP_CONFIG_FILE, 'r', encoding='utf-8') as f:
+ mcp_config = json.load(f)
+ available_tools = mcp_config.get('servers', [])
+ except FileNotFoundError:
+ print(f"{Fore.YELLOW}mcp.json configuration file not found.{Style.RESET_ALL}")
+ except Exception as e:
+ print(f"{Fore.RED}Error loading MCP configuration file: {e}{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}Proceeding without MCP tools.{Style.RESET_ALL}")
+ return available_tools
+
+ def display_tool_menu(self, available_tools: List[dict]) -> Optional[List[int]]:
+ """Display MCP tool selection menu and get user choice."""
+ if not available_tools:
+ print(f"{Fore.YELLOW}No MCP tools currently configured.{Style.RESET_ALL}")
+ configure_now = input(f"{Fore.YELLOW}Would you like to add tools? (yes/no, default: no): {Style.RESET_ALL}").strip().lower()
+ if configure_now == 'yes':
+ print(f"\n{Fore.CYAN}Launching tool configuration...{Style.RESET_ALL}")
+ os.system("python tools/configure_mcp.py")
+ print(f"\n{Fore.GREEN}Tool configuration completed.{Style.RESET_ALL}")
+ # Reload configuration and continue
+ return "reload_and_continue"
+ else:
+ print(f"{Fore.YELLOW}Proceeding without MCP tools.{Style.RESET_ALL}")
+ return []
+
+ print(f"\n{Fore.CYAN}Available MCP tools:{Style.RESET_ALL}")
+ for i, server in enumerate(available_tools):
+ print(f"{i+1}. {server['name']}")
+ print(f"{len(available_tools)+1}. Configure new tools")
+ print(f"{len(available_tools)+2}. Connect to all tools")
+ print(f"{len(available_tools)+3}. Skip tool connection")
+ print(f"{len(available_tools)+4}. Clear all MCP tools")
+
+ try:
+ tool_choice = input(f"\n{Fore.YELLOW}Select option: {Style.RESET_ALL}").strip()
+
+ if not tool_choice: # Default to all tools
+ return list(range(len(available_tools)))
+ elif tool_choice == str(len(available_tools)+1): # Configure new tools
+ print(f"\n{Fore.CYAN}Launching tool configuration...{Style.RESET_ALL}")
+ os.system("python tools/configure_mcp.py")
+ print(f"\n{Fore.GREEN}Tool configuration completed.{Style.RESET_ALL}")
+ # Reload configuration and continue
+ return "reload_and_continue"
+ elif tool_choice == str(len(available_tools)+2): # Connect to all tools
+ return list(range(len(available_tools)))
+ elif tool_choice == str(len(available_tools)+3): # Skip tool connection
+ return []
+ elif tool_choice == str(len(available_tools)+4): # Clear all MCP tools
+ if self.clear_mcp_tools():
+ return "reload_and_continue"
+ return []
+ else: # Parse comma-separated list
+ selected_indices = []
+ for part in tool_choice.split(","):
+ idx = int(part.strip()) - 1
+ if 0 <= idx < len(available_tools):
+ selected_indices.append(idx)
+ return selected_indices
+ except ValueError:
+ print(f"{Fore.RED}Invalid selection. Defaulting to all tools.{Style.RESET_ALL}")
+ return list(range(len(available_tools)))
+
+ def clear_mcp_tools(self) -> bool:
+ """Clear all MCP tools from configuration."""
+ confirm = input(f"{Fore.YELLOW}Are you sure you want to clear all MCP tools? This will empty mcp.json (yes/no): {Style.RESET_ALL}").strip().lower()
+ if confirm == "yes":
+ try:
+ # Create empty mcp.json file
+ with open(MCP_CONFIG_FILE, 'w', encoding='utf-8') as f:
+ json.dump({"servers": []}, f, indent=2)
+ print(f"{Fore.GREEN}Successfully cleared all MCP tools. mcp.json has been reset.{Style.RESET_ALL}")
+ return True
+ except Exception as e:
+ print(f"{Fore.RED}Error clearing MCP tools: {e}{Style.RESET_ALL}")
+ return False
+
+ def initialize_servers(self, available_tools: List[dict], selected_indices: List[int]) -> None:
+ """Initialize selected MCP servers."""
+ # Use the MCP classes passed during initialization
+ if not self.MCPServerStdio or not self.MCPServerSse:
+ raise ValueError("MCP server classes not provided during initialization")
+
+ print(f"{Fore.GREEN}Initializing selected MCP servers...{Style.RESET_ALL}")
+ for idx in selected_indices:
+ if idx < len(available_tools):
+ server = available_tools[idx]
+ print(f"{Fore.CYAN}Initializing {server['name']}...{Style.RESET_ALL}")
+ try:
+ if 'params' in server:
+ mcp_server = self.MCPServerStdio(
+ name=server['name'],
+ params=server['params'],
+ cache_tools_list=server.get('cache_tools_list', True),
+ client_session_timeout_seconds=MCP_SESSION_TIMEOUT
+ )
+ elif 'url' in server:
+ mcp_server = self.MCPServerSse(
+ params={"url": server["url"]},
+ cache_tools_list=server.get('cache_tools_list', True),
+ name=server['name'],
+ client_session_timeout_seconds=MCP_SESSION_TIMEOUT
+ )
+ else:
+ print(f"{Fore.RED}Unknown MCP server configuration: {server}{Style.RESET_ALL}")
+ continue
+ self.server_instances.append(mcp_server)
+ except Exception as e:
+ print(f"{Fore.RED}Error initializing {server['name']}: {e}{Style.RESET_ALL}")
+
+ async def connect_servers(self) -> List:
+ """Connect to initialized MCP servers."""
+ if not self.server_instances:
+ return []
+
+ print(f"{Fore.YELLOW}Connecting to MCP servers...{Style.RESET_ALL}")
+ for mcp_server in self.server_instances:
+ try:
+ await mcp_server.connect()
+ print(f"{Fore.GREEN}Successfully connected to MCP server: {mcp_server.name}{Style.RESET_ALL}")
+ self.connected_servers.append(mcp_server)
+ except Exception as e:
+ print(f"{Fore.RED}Failed to connect to MCP server {mcp_server.name}: {e}{Style.RESET_ALL}")
+
+ if self.connected_servers:
+ print(f"{Fore.GREEN}MCP server connection successful! Can use tools provided by {len(self.connected_servers)} servers.{Style.RESET_ALL}")
+ else:
+ print(f"{Fore.YELLOW}No MCP servers successfully connected. Proceeding without tools.{Style.RESET_ALL}")
+
+ return self.connected_servers
+
+ async def setup_mcp_tools(self, use_mcp: bool = False) -> Tuple[List, List]:
+ """
+ Main method to setup MCP tools.
+
+ Args:
+ use_mcp: Whether to use MCP tools
+
+ Returns:
+ Tuple of (server_instances, connected_servers)
+ """
+ if not use_mcp:
+ print(f"{Fore.YELLOW}Proceeding without MCP tools.{Style.RESET_ALL}")
+ return [], []
+
+ while True: # Loop to handle configuration and reload
+ # Load available tools
+ available_tools = self.load_mcp_config()
+
+ # Get user selection
+ selected_indices = self.display_tool_menu(available_tools)
+
+ # Handle special cases
+ if selected_indices is None:
+ # Restart needed (e.g., after clearing tools)
+ return self.server_instances, []
+ elif selected_indices == "reload_and_continue":
+ # Tools were configured, reload and show menu again
+ continue
+ else:
+ # Normal selection, proceed with initialization
+ break
+
+ # Initialize servers
+ if selected_indices:
+ self.initialize_servers(available_tools, selected_indices)
+
+ # Connect to servers
+ connected = await self.connect_servers()
+
+ return self.server_instances, connected
+
+ async def cleanup_servers(self) -> None:
+ """Clean up MCP server resources."""
+ if not self.server_instances:
+ return
+
+ print(f"{Fore.YELLOW}Cleaning up MCP server resources...{Style.RESET_ALL}")
+
+ for mcp_server in self.server_instances:
+ print(f"{Fore.YELLOW}Attempting to clean up server: {mcp_server.name}...{Style.RESET_ALL}", flush=True)
+ try:
+ await mcp_server.cleanup()
+ print(f"{Fore.GREEN}Cleanup completed for {mcp_server.name}.{Style.RESET_ALL}", flush=True)
+ except Exception:
+ print(f"{Fore.RED}Failed to cleanup {mcp_server.name}.{Style.RESET_ALL}", flush=True)
+
+ print(f"{Fore.YELLOW}MCP server resource cleanup complete.{Style.RESET_ALL}")
\ No newline at end of file
diff --git a/ui/__init__.py b/ui/__init__.py
new file mode 100644
index 0000000..209243b
--- /dev/null
+++ b/ui/__init__.py
@@ -0,0 +1 @@
+"""User interface components for GHOSTCREW."""
\ No newline at end of file
diff --git a/ui/conversation_manager.py b/ui/conversation_manager.py
new file mode 100644
index 0000000..bf01b4d
--- /dev/null
+++ b/ui/conversation_manager.py
@@ -0,0 +1,122 @@
+"""Conversation history management for GHOSTCREW."""
+
+from typing import List, Dict, Optional
+import tiktoken
+from config.app_config import app_config
+
+
+class ConversationManager:
+ """Manages conversation history and dialogue tracking."""
+
+ def __init__(self, max_tokens: int = 4000):
+ """
+ Initialize the conversation manager.
+
+ Args:
+ max_tokens: Maximum tokens to keep in history
+ """
+ self.history: List[Dict[str, str]] = []
+ self.max_tokens = max_tokens
+ self.model_name = app_config.model_name
+
+ def add_dialogue(self, user_query: str, ai_response: str = "") -> None:
+ """
+ Add a dialogue entry to the conversation history.
+
+ Args:
+ user_query: The user's query
+ ai_response: The AI's response (can be empty initially)
+ """
+ dialogue = {
+ "user_query": user_query,
+ "ai_response": ai_response
+ }
+ self.history.append(dialogue)
+
+ # Trim history if it exceeds token limit
+ self._trim_history()
+
+ def update_last_response(self, ai_response: str) -> None:
+ """
+ Update the AI response for the last dialogue entry.
+
+ Args:
+ ai_response: The AI's response to update
+ """
+ if self.history:
+ self.history[-1]["ai_response"] = ai_response
+
+ def get_history(self) -> List[Dict[str, str]]:
+ """Get the complete conversation history."""
+ return self.history
+
+ def get_history_for_context(self) -> List[Dict[str, str]]:
+ """Get conversation history suitable for context."""
+ return self.history
+
+ def estimate_tokens(self) -> int:
+ """
+ Estimate the number of tokens in the conversation history.
+
+ Returns:
+ Estimated token count
+ """
+ try:
+ encoding = tiktoken.encoding_for_model(self.model_name)
+ return sum(
+ len(encoding.encode(entry['user_query'])) +
+ len(encoding.encode(entry.get('ai_response', '')))
+ for entry in self.history
+ )
+ except Exception:
+ # Fall back to approximate counting if tiktoken fails
+ return sum(
+ len(entry['user_query'].split()) +
+ len(entry.get('ai_response', '').split())
+ for entry in self.history
+ )
+
+ def _trim_history(self) -> None:
+ """Trim history to keep token count under the limit."""
+ while self.estimate_tokens() > self.max_tokens and len(self.history) > 1:
+ self.history.pop(0)
+
+ def clear_history(self) -> None:
+ """Clear all conversation history."""
+ self.history = []
+
+ def get_dialogue_count(self) -> int:
+ """Get the number of dialogues in history."""
+ return len(self.history)
+
+ def get_workflow_conversation(self, start_index: int) -> List[Dict[str, str]]:
+ """
+ Get conversation history starting from a specific index.
+
+ Args:
+ start_index: The index to start from
+
+ Returns:
+ Subset of conversation history
+ """
+ return self.history[start_index:]
+
+ def export_history(self) -> str:
+ """
+ Export conversation history as formatted text.
+
+ Returns:
+ Formatted conversation history
+ """
+ if not self.history:
+ return "No conversation history available."
+
+ output = []
+ for i, entry in enumerate(self.history, 1):
+ output.append(f"=== Dialogue {i} ===")
+ output.append(f"User: {entry['user_query']}")
+ if entry.get('ai_response'):
+ output.append(f"AI: {entry['ai_response']}")
+ output.append("")
+
+ return "\n".join(output)
\ No newline at end of file
diff --git a/ui/menu_system.py b/ui/menu_system.py
new file mode 100644
index 0000000..f0c87d7
--- /dev/null
+++ b/ui/menu_system.py
@@ -0,0 +1,158 @@
+"""Menu system and user interface components for GHOSTCREW."""
+
+from typing import Optional, List, Tuple
+from colorama import Fore, Style
+from config.constants import (
+ MAIN_MENU_TITLE, INTERACTIVE_OPTION, AUTOMATED_OPTION,
+ EXIT_OPTION, MULTI_LINE_PROMPT, MULTI_LINE_END_MARKER
+)
+
+
+class MenuSystem:
+ """Handles all menu displays and user input for GHOSTCREW."""
+
+ @staticmethod
+ def display_main_menu(workflows_available: bool, has_connected_servers: bool) -> None:
+ """Display the main application menu."""
+ print(f"\n{MAIN_MENU_TITLE}")
+ print(f"1. {INTERACTIVE_OPTION}")
+
+ # Check if automated mode should be available
+ if workflows_available and has_connected_servers:
+ print(f"2. {AUTOMATED_OPTION}")
+ elif workflows_available and not has_connected_servers:
+ print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (requires MCP tools){Style.RESET_ALL}")
+ else:
+ print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (workflows.py not found){Style.RESET_ALL}")
+
+ print(f"3. {EXIT_OPTION}")
+
+ @staticmethod
+ def get_menu_choice(max_option: int = 3) -> str:
+ """Get user's menu selection."""
+ return input(f"\n{Fore.GREEN}Select mode (1-{max_option}): {Style.RESET_ALL}").strip()
+
+ @staticmethod
+ def display_interactive_mode_intro() -> None:
+ """Display introduction for interactive chat mode."""
+ print(f"\n{Fore.CYAN}INTERACTIVE CHAT MODE{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Type your questions or commands. Use 'multi' for multi-line input.{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Type 'menu' to return to main menu.{Style.RESET_ALL}\n")
+
+ @staticmethod
+ def get_user_input() -> str:
+ """Get user input with prompt."""
+ print(f"\n{Fore.GREEN}[>]{Style.RESET_ALL} ", end="")
+ return input().strip()
+
+ @staticmethod
+ def get_multi_line_input() -> Optional[str]:
+ """Get multi-line input from user."""
+ print(f"{Fore.CYAN}Entering multi-line mode. Type your query across multiple lines.{Style.RESET_ALL}")
+ print(f"{Fore.CYAN}Type '{MULTI_LINE_END_MARKER}' to end input.{Style.RESET_ALL}")
+
+ lines = []
+ while True:
+ line = input()
+ if line == MULTI_LINE_END_MARKER:
+ break
+ lines.append(line)
+
+ # Only proceed if they actually entered something
+ if not lines:
+ print(f"{Fore.YELLOW}No query entered in multi-line mode.{Style.RESET_ALL}")
+ return None
+
+ return "\n".join(lines)
+
+ @staticmethod
+ def display_no_query_message() -> None:
+ """Display message when no query is entered."""
+ print(f"{Fore.YELLOW}No query entered. Please type your question.{Style.RESET_ALL}")
+
+ @staticmethod
+ def display_ready_message() -> None:
+ """Display ready for next query message."""
+ print(f"\n{Fore.CYAN}Ready for next query. Type 'quit', 'multi' for multi-line, or 'menu' for main menu.{Style.RESET_ALL}")
+
+ @staticmethod
+ def display_exit_message() -> None:
+ """Display exit message."""
+ print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}")
+
+ @staticmethod
+ def display_workflow_requirements_message() -> None:
+ """Display message about automated workflow requirements."""
+ print(f"\n{Fore.YELLOW}Automated penetration testing requires MCP tools to be configured and connected.{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Without real security tools, the AI would only generate simulated responses.{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Please configure MCP tools to use this feature.{Style.RESET_ALL}")
+ input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
+
+ @staticmethod
+ def get_workflow_target() -> Optional[str]:
+ """Get target input for workflow execution."""
+ target = input(f"{Fore.YELLOW}Enter target (IP, domain, or network): {Style.RESET_ALL}").strip()
+ if not target:
+ print(f"{Fore.RED}Target is required.{Style.RESET_ALL}")
+ return None
+ return target
+
+ @staticmethod
+ def confirm_workflow_execution(workflow_name: str, target: str) -> bool:
+ """Confirm workflow execution with user."""
+ confirm = input(f"{Fore.YELLOW}Execute '{workflow_name}' on {target}? (yes/no): {Style.RESET_ALL}").strip().lower()
+ return confirm == 'yes'
+
+ @staticmethod
+ def display_workflow_cancelled() -> None:
+ """Display workflow cancelled message."""
+ print(f"{Fore.YELLOW}Workflow cancelled.{Style.RESET_ALL}")
+
+ @staticmethod
+ def display_workflow_completed() -> None:
+ """Display workflow completion message."""
+ print(f"\n{Fore.GREEN}Workflow completed successfully!{Style.RESET_ALL}")
+
+ @staticmethod
+ def ask_generate_report() -> bool:
+ """Ask if user wants to generate a report."""
+ response = input(f"\n{Fore.CYAN}Generate markdown report? (yes/no): {Style.RESET_ALL}").strip().lower()
+ return response == 'yes'
+
+ @staticmethod
+ def ask_save_raw_history() -> bool:
+ """Ask if user wants to save raw conversation history."""
+ response = input(f"{Fore.YELLOW}Save raw conversation history? (yes/no, default: no): {Style.RESET_ALL}").strip().lower()
+ return response == 'yes'
+
+ @staticmethod
+ def display_report_generated(report_path: str) -> None:
+ """Display report generation success message."""
+ print(f"\n{Fore.GREEN}Report generated: {report_path}{Style.RESET_ALL}")
+ print(f"{Fore.CYAN}Open the markdown file in any markdown viewer for best formatting{Style.RESET_ALL}")
+
+ @staticmethod
+ def display_report_error(error: Exception) -> None:
+ """Display report generation error message."""
+ print(f"\n{Fore.RED}Error generating report: {error}{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}Raw workflow data is still available in conversation history{Style.RESET_ALL}")
+
+ @staticmethod
+ def display_invalid_choice() -> None:
+ """Display invalid choice message."""
+ print(f"{Fore.RED}Invalid choice. Please select a valid option.{Style.RESET_ALL}")
+
+ @staticmethod
+ def display_invalid_input() -> None:
+ """Display invalid input message."""
+ print(f"{Fore.RED}Invalid input. Please enter a number.{Style.RESET_ALL}")
+
+ @staticmethod
+ def display_operation_cancelled() -> None:
+ """Display operation cancelled message."""
+ print(f"\n{Fore.YELLOW}Operation cancelled.{Style.RESET_ALL}")
+
+ @staticmethod
+ def press_enter_to_continue() -> None:
+ """Wait for user to press enter."""
+ input(f"\n{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
\ No newline at end of file
diff --git a/workflows/__init__.py b/workflows/__init__.py
new file mode 100644
index 0000000..8a7f421
--- /dev/null
+++ b/workflows/__init__.py
@@ -0,0 +1 @@
+"""Workflow system for GHOSTCREW."""
\ No newline at end of file
diff --git a/workflows.py b/workflows/workflow_definitions.py
similarity index 100%
rename from workflows.py
rename to workflows/workflow_definitions.py
diff --git a/workflows/workflow_engine.py b/workflows/workflow_engine.py
new file mode 100644
index 0000000..f64475a
--- /dev/null
+++ b/workflows/workflow_engine.py
@@ -0,0 +1,153 @@
+"""Workflow execution engine for GHOSTCREW."""
+
+import asyncio
+from typing import List, Dict, Any, Optional
+from colorama import Fore, Style
+from datetime import datetime
+from workflows.workflow_definitions import (
+ get_available_workflows, get_workflow_by_key, list_workflow_names
+)
+from config.constants import (
+ ERROR_NO_WORKFLOWS, ERROR_WORKFLOW_NOT_FOUND, WORKFLOW_TARGET_PROMPT,
+ WORKFLOW_CONFIRM_PROMPT, WORKFLOW_CANCELLED_MESSAGE, WORKFLOW_COMPLETED_MESSAGE
+)
+from tools.mcp_manager import MCPManager
+
+
+class WorkflowEngine:
+ """Handles automated workflow execution."""
+
+ def __init__(self):
+ """Initialize the workflow engine."""
+ self.workflows_available = self._check_workflows_available()
+
+ @staticmethod
+ def _check_workflows_available() -> bool:
+ """Check if workflow definitions are available."""
+ try:
+ # Test import to verify module is available
+ from workflows.workflow_definitions import get_available_workflows
+ return True
+ except ImportError:
+ return False
+
+ def is_available(self) -> bool:
+ """Check if workflows are available."""
+ return self.workflows_available
+
+ @staticmethod
+ def show_automated_menu() -> Optional[List[tuple]]:
+ """Display the automated workflow selection menu."""
+ try:
+ print(f"\n{Fore.CYAN}AUTOMATED PENTESTING WORKFLOWS{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}{'='*50}{Style.RESET_ALL}")
+
+ workflow_list = list_workflow_names()
+ workflows = get_available_workflows()
+
+ for i, (key, name) in enumerate(workflow_list, 1):
+ description = workflows[key]["description"]
+ step_count = len(workflows[key]["steps"])
+ print(f"{i}. {Fore.YELLOW}{name}{Style.RESET_ALL}")
+ print(f" {Fore.WHITE}{description}{Style.RESET_ALL}")
+ print(f" {Fore.CYAN}Steps: {step_count}{Style.RESET_ALL}")
+ print()
+
+ print(f"{len(workflow_list)+1}. {Fore.RED}Back to Main Menu{Style.RESET_ALL}")
+
+ return workflow_list
+ except Exception:
+ print(f"{Fore.YELLOW}Error loading workflows.{Style.RESET_ALL}")
+ return None
+
+ async def run_automated_workflow(
+ self,
+ workflow: Dict[str, Any],
+ target: str,
+ connected_servers: List[Any],
+ conversation_history: List[Dict[str, str]],
+ kb_instance: Any,
+ run_agent_func: Any
+ ) -> List[Dict[str, Any]]:
+ """
+ Execute an automated penetration testing workflow.
+
+ Args:
+ workflow: The workflow definition
+ target: The target for the workflow
+ connected_servers: List of connected MCP servers
+ conversation_history: Conversation history list
+ kb_instance: Knowledge base instance
+ run_agent_func: Function to run agent queries
+
+ Returns:
+ List of workflow results
+ """
+ available_tools = MCPManager.get_available_tools(connected_servers)
+
+ print(f"\n{Fore.CYAN}Starting Automated Workflow: {workflow['name']}{Style.RESET_ALL}")
+ print(f"{Fore.YELLOW}Target: {target}{Style.RESET_ALL}")
+ print(f"{Fore.CYAN}Available Tools: {', '.join(available_tools) if available_tools else 'None'}{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}Description: {workflow['description']}{Style.RESET_ALL}")
+ print(f"{Fore.WHITE}{'='*60}{Style.RESET_ALL}")
+
+ results = []
+
+ for i, step in enumerate(workflow['steps'], 1):
+ print(f"\n{Fore.CYAN}Step {i}/{len(workflow['steps'])}{Style.RESET_ALL}")
+ formatted_step = step.format(target=target)
+ print(f"{Fore.WHITE}{formatted_step}{Style.RESET_ALL}")
+
+ # Create comprehensive query for this step
+ enhanced_query = f"""
+TARGET: {target}
+STEP: {formatted_step}
+
+Execute this step and provide the results.
+"""
+
+ # Execute the step through the agent
+ result = await run_agent_func(
+ enhanced_query,
+ connected_servers,
+ history=conversation_history,
+ streaming=True,
+ kb_instance=kb_instance
+ )
+
+ if result and hasattr(result, "final_output"):
+ results.append({
+ "step": i,
+ "description": formatted_step,
+ "output": result.final_output
+ })
+
+ # Add to conversation history
+ conversation_history.append({
+ "user_query": enhanced_query,
+ "ai_response": result.final_output
+ })
+
+ print(f"{Fore.GREEN}Step {i} completed{Style.RESET_ALL}")
+
+ # Brief delay between steps
+ await asyncio.sleep(1)
+
+ # Workflow completion summary
+ print(f"{Fore.CYAN}Steps executed: {len(results)}/{len(workflow['steps'])}{Style.RESET_ALL}")
+
+ return results
+
+ def get_workflow(self, workflow_key: str) -> Optional[Dict[str, Any]]:
+ """Get a workflow by its key."""
+ try:
+ return get_workflow_by_key(workflow_key)
+ except Exception:
+ return None
+
+ def get_workflow_list(self) -> List[tuple]:
+ """Get list of available workflows."""
+ try:
+ return list_workflow_names()
+ except Exception:
+ return []
\ No newline at end of file