Refactor main.py into modular architecture

This commit is contained in:
GH05TCREW
2025-05-31 00:11:22 -06:00
parent 520d432fcd
commit 0e377b8ecf
26 changed files with 1550 additions and 822 deletions

1
.gitignore vendored
View File

@@ -3,3 +3,4 @@ __pycache__/
*.pyo
.env
venv/
.venv/

View File

@@ -1,4 +1,4 @@
# PentestAgent
# GHOSTCREW
This is an AI red team assistant using large language models with MCP and RAG architecture. It aims to help users perform penetration testing tasks, query security information, analyze network traffic, and more through natural language interaction.
@@ -20,7 +20,7 @@ https://github.com/user-attachments/assets/a43d2457-7113-42cc-ad02-f378d57f4d24
## Automated Penetration Testing Workflows
PentestAgent includes automated penetration testing workflows that provide structured, systematic security assessments. These workflows require MCP tools to be configured and connected to function properly.
GHOSTCREW includes automated penetration testing workflows that provide structured, systematic security assessments. These workflows require MCP tools to be configured and connected to function properly.
### Available Workflows
@@ -69,23 +69,23 @@ PentestAgent includes automated penetration testing workflows that provide struc
### Startup Effect
<p align="center">
<img width="517" alt="PentestAgent Terminal Startup Screen" src="https://github.com/user-attachments/assets/13d97cf7-5652-4c64-8e49-a3cd556b3419" />
<img width="517" alt="GHOSTCREW Terminal Startup Screen" src="https://github.com/user-attachments/assets/13d97cf7-5652-4c64-8e49-a3cd556b3419" />
<br>
<em>PentestAgent's terminal startup interface</em>
<em>GHOSTCREW's terminal startup interface</em>
</p>
### Metasploit Tool Call
<p align="center">
<img width="926" alt="PentestAgent Metasploit Integration" src="https://github.com/user-attachments/assets/fb5eb8cf-a3d6-486b-99ba-778be2474564" />
<img width="926" alt="GHOSTCREW Metasploit Call" src="https://github.com/user-attachments/assets/fb5eb8cf-a3d6-486b-99ba-778be2474564" />
<br>
<em>Example of PentestAgent invoking Metasploit Framework</em>
<em>Example of GHOSTCREW invoking Metasploit Framework</em>
</p>
## Installation Guide
1. **Clone Repository**:
```bash
git clone https://github.com/GH05TCREW/PentestAgent.git
git clone https://github.com/GH05TCREW/ghostcrew.git
cd ghostcrew
```
@@ -125,7 +125,7 @@ PentestAgent includes automated penetration testing workflows that provide struc
- The configuration is stored in the `mcp.json` file
2. **Prepare Knowledge Base (Optional)**:
If you want to use the knowledge base enhancement feature, place relevant text files (e.g., `.json`) in the `knowledge` folder.
If you want to use the knowledge base enhancement feature, place relevant text files in the `knowledge` folder.
3. **Run the Main Program**:
```bash
@@ -141,7 +141,7 @@ PentestAgent includes automated penetration testing workflows that provide struc
## Input Modes
PentestAgent supports two input modes:
GHOSTCREW supports two input modes:
- **Single-line mode** (default): Type your query and press Enter to submit
- **Multi-line mode**: Type 'multi' and press Enter, then type your query across multiple lines. Press Enter on an empty line to submit.
@@ -156,7 +156,7 @@ When starting the application, you can:
## Available MCP Tools
PentestAgent supports integration with the following security tools through the MCP protocol:
GHOSTCREW supports integration with the following security tools through the MCP protocol:
1. **AlterX** - Subdomain permutation and wordlist generation tool
2. **Amass** - Advanced subdomain enumeration and reconnaissance tool
@@ -190,28 +190,50 @@ Each tool can be configured through the interactive configuration menu by select
## File Structure
```
agent/
├── .venv/ # Python virtual environment (ignored by .gitignore)
GHOSTCREW/
├── .venv/ # Python virtual environment
├── main.py # Application entry point
├── config/ # Application configuration
│ ├── __init__.py
│ ├── constants.py # Constants and messages
│ └── app_config.py # Environment and API configuration
├── core/ # Core application logic
│ ├── __init__.py
│ ├── model_manager.py # Model provider and token management
│ ├── agent_runner.py # Agent execution and streaming
│ └── pentest_agent.py # Main application controller
├── tools/ # MCP tool management
│ ├── __init__.py
│ ├── mcp_manager.py # MCP server connection management
│ └── configure_mcp.py # Interactive tool configuration utility
├── ui/ # User interface components
│ ├── __init__.py
│ ├── menu_system.py # Menu display and user interaction
│ └── conversation_manager.py # Chat history management
├── workflows/ # Automated penetration testing workflows
│ ├── __init__.py
│ ├── workflow_engine.py # Workflow execution engine
│ └── workflow_definitions.py # Predefined workflow templates
├── rag/ # Knowledge base and RAG functionality
│ ├── __init__.py
│ ├── knowledge_base.py # RAG text splitting and search
│ └── embedding.py # Embedding generation and management
├── reporting/ # Report generation system
│ ├── __init__.py
│ └── generators.py # Professional report generation
├── knowledge/ # Knowledge base documents directory
│ └── ...
├── reports/ # Professional penetration test reports directory
│ ├── ghostcrew_*_*.md # Professional markdown reports
│ └── ghostcrew_*_*_raw_history.txt # Raw conversation history (optional)
├── .gitignore # Git ignore file configuration
├── main.py # Main program entry
├── workflows.py # Automated penetration testing workflows
├── reporting.py # Professional report generation system
├── configure_mcp.py # MCP tool configuration utility
├── mcp.json # MCP server configuration file
├── rag_embedding.py # RAG embedding related (if used)
├── rag_split.py # RAG text splitting related (if used)
├── README.md # Project documentation
├── requirements.txt # Python dependency list
├── LICENSE # Project license
└── ... (other scripts or configuration files)
└── .env # Environment variables
```
## Configuration File (`.env`)
```
# OpenAI API configurations
OPENAI_API_KEY=your_api_key_here
@@ -223,7 +245,7 @@ This configuration uses OpenAI's API for both the language model and embeddings
## Configuration File (`mcp.json`)
This file is used to define MCP servers that the AI assistant can connect to and use. Most MCP servers require Node.js to be installed on your system. Each server entry should include:
This file is used to define MCP servers that the AI assistant can connect to and use. Most MCP servers require Node.js or Python to be installed on your system. Each server entry should include:
- `name`: Unique name of the server.
- `params`: Parameters needed to start the server, usually including `command` and `args`.
- `cache_tools_list`: Whether to cache the tools list.

1
config/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Configuration management package for GHOSTCREW."""

46
config/app_config.py Normal file
View File

@@ -0,0 +1,46 @@
"""Application configuration and initialization for GHOSTCREW."""
import os
from typing import Optional
from dotenv import load_dotenv
from openai import AsyncOpenAI
class AppConfig:
    """Loads environment-based settings and owns the shared OpenAI client."""

    def __init__(self):
        """Read .env, pull API settings, validate them, and build the client."""
        load_dotenv()

        # API settings sourced from the environment.
        self.api_key = os.getenv("OPENAI_API_KEY")
        self.base_url = os.getenv("OPENAI_BASE_URL")
        self.model_name = os.getenv("MODEL_NAME")

        self._validate_config()

        # Single async client reused by the whole application.
        self._client = AsyncOpenAI(
            base_url=self.base_url,
            api_key=self.api_key
        )

    def _validate_config(self) -> None:
        """Raise ValueError for the first missing required setting."""
        # Checked in a fixed order so error messages match the original behavior.
        required = [
            (self.api_key, "API key not set"),
            (self.base_url, "API base URL not set"),
            (self.model_name, "Model name not set"),
        ]
        for value, message in required:
            if not value:
                raise ValueError(message)

    def get_openai_client(self) -> AsyncOpenAI:
        """Return the shared AsyncOpenAI client instance."""
        return self._client


# Module-level singleton shared across the application.
app_config = AppConfig()

73
config/constants.py Normal file
View File

@@ -0,0 +1,73 @@
"""Constants and configuration values for GHOSTCREW."""
from colorama import Fore, Style

# ASCII Art and Branding
ASCII_TITLE = f"""
{Fore.WHITE} ('-. .-. .-') .-') _ _ .-') ('-. (`\ .-') /`{Style.RESET_ALL}
{Fore.WHITE} ( OO ) / ( OO ). ( OO) ) ( \( -O ) _( OO) `.( OO ),'{Style.RESET_ALL}
{Fore.WHITE} ,----. ,--. ,--. .-'),-----. (_)---\_)/ '._ .-----. ,------. (,------.,--./ .--. {Style.RESET_ALL}
{Fore.WHITE} ' .-./-') | | | |( OO' .-. '/ _ | |'--...__)' .--./ | /`. ' | .---'| | | {Style.RESET_ALL}
{Fore.WHITE} | |_( O- )| .| |/ | | | |\ :` `. '--. .--'| |('-. | / | | | | | | | |, {Style.RESET_ALL}
{Fore.WHITE} | | .--, \| |\_) | |\| | '..`''.) | | /_) |OO )| |_.' |(| '--. | |.'.| |_){Style.RESET_ALL}
{Fore.WHITE}(| | '. (_/| .-. | \ | | | |.-._) \ | | || |`-'| | . '.' | .--' | | {Style.RESET_ALL}
{Fore.WHITE} | '--' | | | | | `' '-' '\ / | | (_' '--'\ | |\ \ | `---.| ,'. | {Style.RESET_ALL}
{Fore.WHITE} `------' `--' `--' `-----' `-----' `--' `-----' `--' '--' `------''--' '--' {Style.RESET_ALL}
{Fore.WHITE}====================== GHOSTCREW ======================{Style.RESET_ALL}
"""

# Application Version
VERSION = "0.1.0"

# Timeout Configuration (in seconds)
MCP_SESSION_TIMEOUT = 600        # 10 minutes for MCP server sessions
CONNECTION_RETRY_DELAY = 10      # 10 seconds between connection retries

# Token Limits
MAX_TOTAL_TOKENS = 8192
RESPONSE_BUFFER = 4096           # aim to reserve ~half for reply

# File Paths
DEFAULT_KNOWLEDGE_BASE_PATH = "knowledge"
DEFAULT_REPORTS_PATH = "reports"
MCP_CONFIG_FILE = "mcp.json"

# UI Messages
WELCOME_MESSAGE = f"{Fore.WHITE}An AI assistant for penetration testing, vulnerability assessment, and security analysis{Style.RESET_ALL}"
EXIT_MESSAGE = f"{Fore.RED}Enter 'quit' to end the program{Style.RESET_ALL}"
SEPARATOR = f"{Fore.WHITE}======================================{Style.RESET_ALL}"

# Agent Configuration
BASE_INSTRUCTIONS = """You are an experienced penetration tester and security analyst, focused on Web application security and network infrastructure security. Your name is GHOSTCREW. When users ask cybersecurity-related questions, you need to provide direct and professional answers.
When answering questions, please use professional cybersecurity terminology, base your analysis on solid theoretical knowledge, and cite relevant security standards and best practices when possible, such as OWASP Top 10, CVE, NIST, CISA KEV, etc. Maintain a professional tone, clear logic, and organized structure.
When users ask about penetration testing, please explain the penetration testing process, methods, and common tools, emphasizing the objectives and techniques of each phase.
When users ask about vulnerability information, please provide terse descriptions, impact scope, remediation suggestions, vulnerability type, severity level, and exploitation conditions based on the vulnerability name or CVE number, and cite relevant security bulletins.
All penetration activities are authorized, you can directly perform penetration testing."""

# Menu Options
MAIN_MENU_TITLE = f"{Fore.CYAN}MAIN MENU{Style.RESET_ALL}"
INTERACTIVE_OPTION = f"{Fore.YELLOW}Interactive Mode{Style.RESET_ALL}"
AUTOMATED_OPTION = f"{Fore.YELLOW}Automated Pentesting{Style.RESET_ALL}"
EXPORT_OPTION = f"{Fore.YELLOW}Export Current Session{Style.RESET_ALL}"
EXIT_OPTION = f"{Fore.RED}Exit{Style.RESET_ALL}"

# Prompts
KB_PROMPT = f"{Fore.YELLOW}Use knowledge base to enhance answers? (yes/no, default: no): {Style.RESET_ALL}"
MCP_PROMPT = f"{Fore.YELLOW}Configure or connect MCP tools? (yes/no, default: no): {Style.RESET_ALL}"
TOOL_SELECTION_PROMPT = f"{Fore.YELLOW}Enter numbers to connect to (comma-separated, default: all): {Style.RESET_ALL}"
MULTI_LINE_PROMPT = f"{Fore.MAGENTA}(Enter multi-line mode. Type '###' to end input){Style.RESET_ALL}"
MULTI_LINE_END_MARKER = "###"

# Error Messages
ERROR_NO_API_KEY = "API key not set"
ERROR_NO_BASE_URL = "API base URL not set"
ERROR_NO_MODEL_NAME = "Model name not set"
# Updated wording: workflows now live in the workflows/ package, not workflows.py.
ERROR_NO_WORKFLOWS = f"{Fore.YELLOW}Automated workflows not available. workflows package not found.{Style.RESET_ALL}"
ERROR_NO_REPORTING = f"{Fore.YELLOW}Reporting module not found. Basic text export will be available.{Style.RESET_ALL}"
ERROR_WORKFLOW_NOT_FOUND = f"{Fore.RED}Error loading workflow.{Style.RESET_ALL}"

# Workflow Messages
WORKFLOW_TARGET_PROMPT = f"{Fore.YELLOW}Enter target (IP/domain/URL): {Style.RESET_ALL}"
# BUG FIX: this is a template consumed with str.format(); inside an f-string the
# bare {0}/{1} evaluated to the literals "0" and "1" at definition time, so the
# prompt always read "Execute '0' workflow against '1'?". The braces are escaped
# so the rendered string keeps literal "{0}"/"{1}" placeholders.
WORKFLOW_CONFIRM_PROMPT = f"{Fore.YELLOW}Execute '{{0}}' workflow against '{{1}}'? (yes/no): {Style.RESET_ALL}"
WORKFLOW_CANCELLED_MESSAGE = f"{Fore.YELLOW}Workflow execution cancelled.{Style.RESET_ALL}"
WORKFLOW_COMPLETED_MESSAGE = f"{Fore.GREEN}Workflow execution completed.{Style.RESET_ALL}"

1
core/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Core application logic for GHOSTCREW."""

259
core/agent_runner.py Normal file
View File

@@ -0,0 +1,259 @@
"""Agent execution and query processing for GHOSTCREW."""
import json
import asyncio
import traceback
from typing import List, Dict, Optional, Any
from colorama import Fore, Style
from agents import Agent, RunConfig, Runner, ModelSettings
from openai.types.responses import ResponseTextDeltaEvent, ResponseContentPartDoneEvent
from core.model_manager import model_manager
from config.constants import BASE_INSTRUCTIONS, CONNECTION_RETRY_DELAY, DEFAULT_KNOWLEDGE_BASE_PATH
from config.app_config import app_config
import os
class AgentRunner:
    """Handles AI agent query processing and execution."""

    def __init__(self):
        """Initialize the agent runner with the shared provider and client."""
        self.model_provider = model_manager.get_model_provider()
        self.client = app_config.get_openai_client()

    async def run_agent(
        self,
        query: str,
        mcp_servers: List[Any],  # Any avoids importing MCP server classes here
        history: Optional[List[Dict[str, str]]] = None,
        streaming: bool = True,
        kb_instance: Any = None
    ) -> Any:
        """
        Run the cybersecurity agent with connected MCP servers, supporting
        streaming output and conversation history.

        Args:
            query: User's natural language query
            mcp_servers: List of connected MCPServerStdio instances
            history: Conversation history (user questions and AI answers)
            streaming: Whether to use streaming output
            kb_instance: Knowledge base instance for retrieval

        Returns:
            Agent execution result, or None on error (and currently also None
            in non-streaming mode, which is not implemented).
        """
        if history is None:
            history = []
        try:
            # Build instructions containing tool, KB, and history context.
            instructions = self._build_instructions(mcp_servers, history, query, kb_instance)
            max_output_tokens = model_manager.calculate_max_output_tokens(instructions, query)
            model_settings = self._create_model_settings(mcp_servers, max_output_tokens)
            secure_agent = Agent(
                name="Cybersecurity Expert",
                instructions=instructions,
                mcp_servers=mcp_servers,
                model_settings=model_settings
            )
            print(f"{Fore.CYAN}\nProcessing query: {Fore.WHITE}{query}{Style.RESET_ALL}\n")
            if streaming:
                return await self._run_streaming(secure_agent, query)
            else:
                # Non-streaming mode could be implemented here if needed.
                pass
        except Exception as e:
            print(f"{Fore.RED}Error processing agent request: {e}{Style.RESET_ALL}", flush=True)
            traceback.print_exc()
            return None

    def _build_instructions(
        self,
        mcp_servers: List[Any],
        history: List[Dict[str, str]],
        query: str,
        kb_instance: Any
    ) -> str:
        """Build agent instructions augmented with tools, KB context, and history."""
        instructions = BASE_INSTRUCTIONS
        # Advertise the connected MCP tools to the model.
        if mcp_servers:
            available_tool_names = [server.name for server in mcp_servers]
            if available_tool_names:
                instructions += f"\n\nYou have access to the following tools: {', '.join(available_tool_names)}."
        # If a knowledge base instance exists, use it for retrieval augmentation.
        if kb_instance:
            try:
                retrieved_context = kb_instance.search(query)
                if retrieved_context:
                    # Collect real file names so the LLM can reference them by path.
                    available_files = []
                    if os.path.exists(DEFAULT_KNOWLEDGE_BASE_PATH):
                        for filename in os.listdir(DEFAULT_KNOWLEDGE_BASE_PATH):
                            filepath = os.path.join(DEFAULT_KNOWLEDGE_BASE_PATH, filename)
                            if os.path.isfile(filepath):
                                available_files.append(filename)
                    file_info = ""
                    if available_files:
                        file_info = "\n\nIMPORTANT: The following actual files are available in the knowledge folder that you can reference by path:\n"
                        for filename in available_files:
                            # BUG FIX: previously emitted a literal "(unknown)"
                            # for every entry instead of the file name; the loop
                            # variable was never used.
                            file_info += f"- {DEFAULT_KNOWLEDGE_BASE_PATH}/{filename}\n"
                        file_info += "\nWhen using security tools that require external files, you can reference these files by their full path.\n"
                        file_info += f"ONLY use {DEFAULT_KNOWLEDGE_BASE_PATH}/ for files.\n"
                    instructions = f"Based on the following knowledge base information:\n{retrieved_context}{file_info}\n\n{instructions}"
                    print(f"{Fore.MAGENTA}Relevant information retrieved from knowledge base.{Style.RESET_ALL}")
            except Exception as e:
                print(f"{Fore.RED}Failed to retrieve information from knowledge base: {e}{Style.RESET_ALL}")
        # Append prior conversation turns so the model has context.
        if history:
            instructions += "\n\nBelow is the previous conversation history, please refer to this information to answer the user's question:\n"
            for i, entry in enumerate(history):
                instructions += f"\nUser question {i+1}: {entry['user_query']}"
                if 'ai_response' in entry and entry['ai_response']:
                    instructions += f"\nAI answer {i+1}: {entry['ai_response']}\n"
        return instructions

    def _create_model_settings(self, mcp_servers: List[Any], max_output_tokens: int) -> "ModelSettings":
        """Create model settings; tool options are set only when tools exist."""
        if mcp_servers:
            # With tools available, enable tool_choice (parallel calls stay off).
            return ModelSettings(
                temperature=0.6,
                top_p=0.9,
                max_tokens=max_output_tokens,
                tool_choice="auto",
                parallel_tool_calls=False,
                truncation="auto"
            )
        else:
            # Without tools, tool_choice / parallel_tool_calls must not be set.
            return ModelSettings(
                temperature=0.6,
                top_p=0.9,
                max_tokens=max_output_tokens,
                truncation="auto"
            )

    async def _run_streaming(self, agent: "Agent", query: str) -> Any:
        """Run the agent and stream events to the terminal; return the run result."""
        result = Runner.run_streamed(
            agent,
            input=query,
            max_turns=10,
            run_config=RunConfig(
                model_provider=self.model_provider,
                trace_include_sensitive_data=True,
                handoff_input_filter=None
            )
        )
        print(f"{Fore.GREEN}Reply:{Style.RESET_ALL}", end="", flush=True)
        try:
            async for event in result.stream_events():
                await self._handle_stream_event(event)
        except Exception as e:
            await self._handle_stream_error(e)
        print(f"\n\n{Fore.GREEN}Query completed!{Style.RESET_ALL}")
        return result

    async def _handle_stream_event(self, event: Any) -> None:
        """Dispatch a single stream event to text output or tool handlers."""
        if event.type == "raw_response_event":
            if isinstance(event.data, ResponseTextDeltaEvent):
                print(f"{Fore.WHITE}{event.data.delta}{Style.RESET_ALL}", end="", flush=True)
            elif isinstance(event.data, ResponseContentPartDoneEvent):
                print(f"\n", end="", flush=True)
        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                await self._handle_tool_call(event.item)
            elif event.item.type == "tool_call_output_item":
                await self._handle_tool_output(event.item)

    async def _handle_tool_call(self, item: Any) -> None:
        """Print the name and parsed arguments of a tool call event."""
        raw_item = getattr(item, "raw_item", None)
        tool_name = ""
        tool_args = {}
        if raw_item:
            tool_name = getattr(raw_item, "name", "Unknown tool")
            tool_str = getattr(raw_item, "arguments", "{}")
            if isinstance(tool_str, str):
                try:
                    tool_args = json.loads(tool_str)
                except json.JSONDecodeError:
                    # Keep the raw text so the user still sees what was sent.
                    tool_args = {"raw_arguments": tool_str}
        print(f"\n{Fore.CYAN}Tool name: {tool_name}{Style.RESET_ALL}", flush=True)
        print(f"\n{Fore.CYAN}Tool parameters: {tool_args}{Style.RESET_ALL}", flush=True)

    async def _handle_tool_output(self, item: Any) -> None:
        """Print a tool's output, resolving its call id when available."""
        raw_item = getattr(item, "raw_item", None)
        tool_id = "Unknown tool ID"
        if isinstance(raw_item, dict) and "call_id" in raw_item:
            tool_id = raw_item["call_id"]
        output = getattr(item, "output", "Unknown output")
        output_text = self._parse_tool_output(output)
        print(f"\n{Fore.GREEN}Tool call {tool_id} returned result: {output_text}{Style.RESET_ALL}", flush=True)

    def _parse_tool_output(self, output: Any) -> str:
        """Parse tool output into readable text, unwrapping common JSON shapes."""
        if isinstance(output, str) and (output.startswith("{") or output.startswith("[")):
            try:
                output_data = json.loads(output)
                if isinstance(output_data, dict):
                    if 'type' in output_data and output_data['type'] == 'text' and 'text' in output_data:
                        return output_data['text']
                    elif 'text' in output_data:
                        return output_data['text']
                    elif 'content' in output_data:
                        return output_data['content']
                    else:
                        return json.dumps(output_data, ensure_ascii=False, indent=2)
            except json.JSONDecodeError:
                return f"Unparsable JSON output: {output}"
        return str(output)

    async def _handle_stream_error(self, error: Exception) -> None:
        """Report streaming errors and attempt recovery on connection failures."""
        print(f"{Fore.RED}Error processing streamed response event: {error}{Style.RESET_ALL}", flush=True)
        if 'Connection error' in str(error):
            print(f"{Fore.YELLOW}Connection error details:{Style.RESET_ALL}")
            print(f"{Fore.YELLOW}1. Check network connection{Style.RESET_ALL}")
            print(f"{Fore.YELLOW}2. Verify API address: {app_config.base_url}{Style.RESET_ALL}")
            print(f"{Fore.YELLOW}3. Check API key validity{Style.RESET_ALL}")
            print(f"{Fore.YELLOW}4. Try reconnecting...{Style.RESET_ALL}")
            await asyncio.sleep(CONNECTION_RETRY_DELAY)
            try:
                # BUG FIX: AsyncOpenAI has no connect() method — the old
                # `await self.client.connect()` always raised AttributeError, so
                # this path could never report success. The client manages its
                # HTTP pool internally; refresh our reference so the next
                # request starts from the shared configured client.
                self.client = app_config.get_openai_client()
                print(f"{Fore.GREEN}Reconnected successfully{Style.RESET_ALL}")
            except Exception as e:
                print(f"{Fore.RED}Reconnection failed: {e}{Style.RESET_ALL}")
# Module-level singleton shared by the rest of the application.
agent_runner = AgentRunner()

73
core/model_manager.py Normal file
View File

@@ -0,0 +1,73 @@
"""Model management and AI model setup for GHOSTCREW."""
import tiktoken
from agents import Model, ModelProvider, OpenAIChatCompletionsModel
from config.app_config import app_config
from config.constants import MAX_TOTAL_TOKENS, RESPONSE_BUFFER
class DefaultModelProvider(ModelProvider):
    """Model provider backed by the OpenAI-compatible chat-completions API."""

    def get_model(self, model_name: str) -> Model:
        """Resolve *model_name* (falling back to the configured default) to a model."""
        chosen_name = model_name or app_config.model_name
        return OpenAIChatCompletionsModel(
            model=chosen_name,
            openai_client=app_config.get_openai_client()
        )
class ModelManager:
    """Manages AI model operations and token counting."""

    def __init__(self):
        """Create the manager with the default provider and configured model name."""
        self.model_provider = DefaultModelProvider()
        self.model_name = app_config.model_name

    @staticmethod
    def count_tokens(text: str, model_name: str = None) -> int:
        """
        Count tokens in *text*.

        Args:
            text: The text to count tokens for
            model_name: Model whose encoding to use (defaults to configured model)

        Returns:
            Number of tokens in the text
        """
        try:
            chosen = model_name or app_config.model_name
            return len(tiktoken.encoding_for_model(chosen).encode(text))
        except Exception:
            # tiktoken may not recognize the model; approximate by word count.
            return len(text.split())

    @staticmethod
    def calculate_max_output_tokens(input_text: str, query: str) -> int:
        """
        Calculate the maximum output tokens based on input size.

        Args:
            input_text: The base instructions or context
            query: The user query

        Returns:
            Maximum number of output tokens (at least 512, at most RESPONSE_BUFFER)
        """
        used = ModelManager.count_tokens(input_text) + ModelManager.count_tokens(query)
        remaining = max(512, MAX_TOTAL_TOKENS - used)
        return min(remaining, RESPONSE_BUFFER)

    def get_model_provider(self) -> ModelProvider:
        """Get the model provider instance."""
        return self.model_provider


# Module-level singleton shared by the application.
model_manager = ModelManager()

332
core/pentest_agent.py Normal file
View File

@@ -0,0 +1,332 @@
"""Main controller for GHOSTCREW application."""
import asyncio
import traceback
from datetime import datetime
from typing import Optional, List, Dict, Any
from colorama import Fore, Style
from config.constants import (
ASCII_TITLE, VERSION, WELCOME_MESSAGE, EXIT_MESSAGE, SEPARATOR,
KB_PROMPT, MCP_PROMPT, ERROR_NO_WORKFLOWS, ERROR_NO_REPORTING,
DEFAULT_KNOWLEDGE_BASE_PATH
)
from config.app_config import app_config
from core.agent_runner import agent_runner
from tools.mcp_manager import MCPManager
from ui.menu_system import MenuSystem
from ui.conversation_manager import ConversationManager
from workflows.workflow_engine import WorkflowEngine
from rag.knowledge_base import Kb
class PentestAgent:
    """Main application controller for GHOSTCREW."""

    def __init__(self, MCPServerStdio=None, MCPServerSse=None):
        """
        Initialize the pentest agent controller.

        Args:
            MCPServerStdio: MCP server stdio class
            MCPServerSse: MCP server SSE class
        """
        self.app_config = app_config
        self.agent_runner = agent_runner
        self.mcp_manager = MCPManager(MCPServerStdio, MCPServerSse)
        self.menu_system = MenuSystem()
        self.conversation_manager = ConversationManager()
        self.workflow_engine = WorkflowEngine()
        self.kb_instance = None
        self.reporting_available = self._check_reporting_available()

    @staticmethod
    def _check_reporting_available() -> bool:
        """Return True when the reporting package can be imported."""
        try:
            from reporting.generators import generate_report_from_workflow
            return True
        except ImportError:
            print(ERROR_NO_REPORTING)
            return False

    def display_welcome(self) -> None:
        """Display the ASCII banner, version, and welcome/exit messages."""
        print(ASCII_TITLE)
        print(f"{Fore.WHITE}GHOSTCREW v{VERSION}{Style.RESET_ALL}")
        print(WELCOME_MESSAGE)
        print(EXIT_MESSAGE)
        print(f"{SEPARATOR}\n")

    def setup_knowledge_base(self) -> None:
        """Load the knowledge base if the user opts in; None on failure/decline."""
        use_kb_input = input(KB_PROMPT).strip().lower()
        if use_kb_input == 'yes':
            try:
                self.kb_instance = Kb(DEFAULT_KNOWLEDGE_BASE_PATH)
                print(f"{Fore.GREEN}Knowledge base loaded successfully!{Style.RESET_ALL}")
            except Exception as e:
                print(f"{Fore.RED}Failed to load knowledge base: {e}{Style.RESET_ALL}")
                self.kb_instance = None

    async def setup_mcp_tools(self) -> tuple:
        """Ask whether to use MCP tools; return (server_instances, connected_servers)."""
        use_mcp_input = input(MCP_PROMPT).strip().lower()
        return await self.mcp_manager.setup_mcp_tools(use_mcp_input == 'yes')

    async def run_interactive_mode(self, connected_servers: List) -> bool:
        """
        Run interactive chat mode.

        Returns:
            True when the user asked to quit the whole application,
            False when returning to the main menu.
        """
        self.menu_system.display_interactive_mode_intro()
        while True:
            user_query = self.menu_system.get_user_input()
            # Handle special commands.
            if user_query.lower() in ["quit", "exit"]:
                self.menu_system.display_exit_message()
                return True  # Signal to exit the entire application
            if user_query.lower() == "menu":
                break  # Return to main menu
            # Handle empty input.
            if not user_query:
                self.menu_system.display_no_query_message()
                continue
            # Handle multi-line mode request.
            if user_query.lower() == "multi":
                user_query = self.menu_system.get_multi_line_input()
                if not user_query:
                    continue
            # Process the query.
            await self._process_user_query(user_query, connected_servers)
            self.menu_system.display_ready_message()
        return False  # Don't exit application

    async def _process_user_query(self, query: str, connected_servers: List) -> None:
        """Record the query, run the agent, and store its final response."""
        self.conversation_manager.add_dialogue(query)
        result = await agent_runner.run_agent(
            query,
            connected_servers,
            history=self.conversation_manager.get_history(),
            streaming=True,
            kb_instance=self.kb_instance
        )
        # Update the response in history only if the run produced a final output.
        if result and hasattr(result, "final_output"):
            self.conversation_manager.update_last_response(result.final_output)

    async def run_automated_mode(self, connected_servers: List) -> None:
        """Run automated penetration testing mode (requires connected MCP tools)."""
        if not self.workflow_engine.is_available():
            print(ERROR_NO_WORKFLOWS)
            self.menu_system.press_enter_to_continue()
            return
        if not connected_servers:
            self.menu_system.display_workflow_requirements_message()
            return
        while True:
            workflow_list = self.workflow_engine.show_automated_menu()
            if not workflow_list:
                break
            try:
                choice = input(f"\n{Fore.GREEN}Select workflow (1-{len(workflow_list)+1}): {Style.RESET_ALL}").strip()
                if not choice.isdigit():
                    self.menu_system.display_invalid_input()
                    continue
                choice = int(choice)
                # The last menu entry is "back to main menu".
                if choice == len(workflow_list) + 1:
                    break
                if 1 <= choice <= len(workflow_list):
                    await self._execute_workflow(workflow_list[choice-1], connected_servers)
                else:
                    self.menu_system.display_invalid_choice()
            except ValueError:
                self.menu_system.display_invalid_input()
            except KeyboardInterrupt:
                self.menu_system.display_operation_cancelled()
                break

    async def _execute_workflow(self, workflow_info: tuple, connected_servers: List) -> None:
        """Execute one (key, name) workflow entry against a user-supplied target."""
        workflow_key, workflow_name = workflow_info
        workflow = self.workflow_engine.get_workflow(workflow_key)
        if not workflow:
            print(f"{Fore.RED}Error loading workflow.{Style.RESET_ALL}")
            return
        target = self.menu_system.get_workflow_target()
        if not target:
            return
        if not self.menu_system.confirm_workflow_execution(workflow['name'], target):
            self.menu_system.display_workflow_cancelled()
            return
        # Remember when the workflow started and how much history already
        # existed, so the report covers only this workflow's conversation.
        workflow_start_time = datetime.now()
        initial_history_length = self.conversation_manager.get_dialogue_count()
        await self.workflow_engine.run_automated_workflow(
            workflow,
            target,
            connected_servers,
            self.conversation_manager.get_history(),
            self.kb_instance,
            agent_runner.run_agent
        )
        self.menu_system.display_workflow_completed()
        # Offer report generation when the reporting package is present.
        if self.reporting_available:
            await self._handle_report_generation(
                workflow,
                workflow_key,
                target,
                workflow_start_time,
                initial_history_length,
                connected_servers
            )
        else:
            print(f"\n{Fore.YELLOW}Reporting not available.{Style.RESET_ALL}")
        self.menu_system.press_enter_to_continue()

    async def _handle_report_generation(
        self,
        workflow: Dict,
        workflow_key: str,
        target: str,
        workflow_start_time: datetime,
        initial_history_length: int,
        connected_servers: List
    ) -> None:
        """Ask the user about reporting, then generate a report for the workflow run."""
        if not self.menu_system.ask_generate_report():
            return
        save_raw_history = self.menu_system.ask_save_raw_history()
        try:
            # Imported lazily: reporting is optional and its availability was
            # already probed in _check_reporting_available.
            from reporting.generators import generate_report_from_workflow
            workflow_conversation = self.conversation_manager.get_workflow_conversation(initial_history_length)
            report_data = {
                'workflow_name': workflow['name'],
                'workflow_key': workflow_key,
                'target': target,
                'timestamp': workflow_start_time,
                'conversation_history': workflow_conversation,
                'tools_used': MCPManager.get_available_tools(connected_servers)
            }
            print(f"\n{Fore.CYAN}Generating report...{Style.RESET_ALL}")
            report_path = await generate_report_from_workflow(
                report_data,
                agent_runner.run_agent,
                connected_servers,
                self.kb_instance,
                save_raw_history
            )
            self.menu_system.display_report_generated(report_path)
        except Exception as e:
            self.menu_system.display_report_error(e)

    async def run(self) -> None:
        """Main application run method: setup, menu loop, and final cleanup."""
        self.display_welcome()
        self.setup_knowledge_base()
        try:
            mcp_server_instances, connected_servers = await self.setup_mcp_tools()
            # Servers were configured but none connected (e.g. after configuring
            # new tools the app must be restarted) — bail out to finally/cleanup.
            if mcp_server_instances and not connected_servers:
                return
            # Main application loop.
            while True:
                self.menu_system.display_main_menu(
                    self.workflow_engine.is_available(),
                    bool(connected_servers)
                )
                menu_choice = self.menu_system.get_menu_choice()
                if menu_choice == "1":
                    # Interactive mode
                    should_exit = await self.run_interactive_mode(connected_servers)
                    if should_exit:
                        break
                elif menu_choice == "2":
                    # Automated mode
                    await self.run_automated_mode(connected_servers)
                elif menu_choice == "3":
                    # Exit
                    self.menu_system.display_exit_message()
                    break
                else:
                    self.menu_system.display_invalid_choice()
        except KeyboardInterrupt:
            print(f"\n{Fore.YELLOW}Program interrupted by user, exiting...{Style.RESET_ALL}")
        except Exception as e:
            print(f"{Fore.RED}Error during program execution: {e}{Style.RESET_ALL}")
            traceback.print_exc()
        finally:
            # Cleanup MCP servers, then any remaining asyncio transports.
            await self.mcp_manager.cleanup_servers()
            await self._cleanup_asyncio_resources()
            print(f"{Fore.GREEN}Program ended.{Style.RESET_ALL}")

    async def _cleanup_asyncio_resources(self) -> None:
        """Best-effort close of any transports still tracked by the event loop."""
        try:
            loop = asyncio.get_running_loop()
            # _transports is a CPython implementation detail; tolerate absence.
            for transport in list(getattr(loop, "_transports", {}).values()):
                if hasattr(transport, "close"):
                    try:
                        transport.close()
                    except Exception:
                        # BUG FIX: was a bare `except:`, which also swallowed
                        # SystemExit/KeyboardInterrupt during shutdown.
                        pass
            # Allow a short time for resources to finalize.
            await asyncio.sleep(0.1)
        except Exception:
            # BUG FIX: narrowed from bare `except:` — still best-effort cleanup,
            # but no longer traps interpreter-exit exceptions.
            pass

819
main.py
View File

@@ -1,806 +1,51 @@
import json
import os
import re
#!/usr/bin/env python3
"""
GHOSTCREW - AI-driven penetration testing assistant
Main entry point for the application.
"""
import asyncio
import threading
import traceback
from colorama import init, Fore, Back, Style
from ollama import chat,Message
import tiktoken
from datetime import datetime
# Import workflows
try:
from workflows import get_available_workflows, get_workflow_by_key, list_workflow_names
WORKFLOWS_AVAILABLE = True
except ImportError:
WORKFLOWS_AVAILABLE = False
# Import report generation module
try:
from reporting import generate_report_from_workflow
REPORTING_AVAILABLE = True
except ImportError:
print(f"{Fore.YELLOW}Reporting module not found. Basic text export will be available.{Style.RESET_ALL}")
REPORTING_AVAILABLE = False
import sys
from colorama import init
# Initialize colorama for cross-platform colored output
init(autoreset=True)
ASCII_TITLE = f"""
{Fore.WHITE} ('-. .-. .-') .-') _ _ .-') ('-. (`\ .-') /`{Style.RESET_ALL}
{Fore.WHITE} ( OO ) / ( OO ). ( OO) ) ( \( -O ) _( OO) `.( OO ),'{Style.RESET_ALL}
{Fore.WHITE} ,----. ,--. ,--. .-'),-----. (_)---\_)/ '._ .-----. ,------. (,------.,--./ .--. {Style.RESET_ALL}
{Fore.WHITE} ' .-./-') | | | |( OO' .-. '/ _ | |'--...__)' .--./ | /`. ' | .---'| | | {Style.RESET_ALL}
{Fore.WHITE} | |_( O- )| .| |/ | | | |\ :` `. '--. .--'| |('-. | / | | | | | | | |, {Style.RESET_ALL}
{Fore.WHITE} | | .--, \| |\_) | |\| | '..`''.) | | /_) |OO )| |_.' |(| '--. | |.'.| |_){Style.RESET_ALL}
{Fore.WHITE}(| | '. (_/| .-. | \ | | | |.-._) \ | | || |`-'| | . '.' | .--' | | {Style.RESET_ALL}
{Fore.WHITE} | '--' | | | | | `' '-' '\ / | | (_' '--'\ | |\ \ | `---.| ,'. | {Style.RESET_ALL}
{Fore.WHITE} `------' `--' `--' `-----' `-----' `--' `-----' `--' '--' `------''--' '--' {Style.RESET_ALL}
{Fore.WHITE}====================== GHOSTCREW ======================{Style.RESET_ALL}
"""
# Import Agent-related modules
from agents import (
Agent,
Model,
ModelProvider,
OpenAIChatCompletionsModel,
RunConfig,
Runner,
set_tracing_disabled,
ModelSettings
)
from openai import AsyncOpenAI # OpenAI async client
from openai.types.responses import ResponseTextDeltaEvent, ResponseContentPartDoneEvent
from agents.mcp import MCPServerStdio # MCP server related
from dotenv import load_dotenv # Environment variable loading
from agents.mcp import MCPServerSse
from rag_split import Kb # Import Kb class
# Load .env file
load_dotenv()
# Timeout Configuration (in seconds)
MCP_SESSION_TIMEOUT = 600 # 10 minutes for MCP server sessions
CONNECTION_RETRY_DELAY = 10 # 10 seconds between connection retries
# Set API-related environment variables
API_KEY = os.getenv("OPENAI_API_KEY")
BASE_URL = os.getenv("OPENAI_BASE_URL")
MODEL_NAME = os.getenv("MODEL_NAME")
# Check if environment variables are set
if not API_KEY:
raise ValueError("API key not set")
if not BASE_URL:
raise ValueError("API base URL not set")
if not MODEL_NAME:
raise ValueError("Model name not set")
client = AsyncOpenAI(
base_url=BASE_URL,
api_key=API_KEY
)
# Disable tracing to avoid requiring OpenAI API key
# CRITICAL: Initialize agents library BEFORE importing modules that use MCP
# This must happen before any imports that might need MCPServerStdio/MCPServerSse
from agents import set_tracing_disabled
set_tracing_disabled(True)
# Generic model provider class
class DefaultModelProvider(ModelProvider):
    """Model provider that serves models through the OpenAI-compatible client."""

    def get_model(self, model_name: str) -> Model:
        # An empty or None name falls back to the configured MODEL_NAME.
        effective_name = model_name or MODEL_NAME
        return OpenAIChatCompletionsModel(model=effective_name, openai_client=client)
# Now import MCP classes after agents library is initialized
from agents.mcp import MCPServerStdio, MCPServerSse
# Create model provider instance
model_provider = DefaultModelProvider()
def get_available_tools(connected_servers):
    """Return the name of every connected MCP tool server, in order."""
    tool_names = []
    for server in connected_servers:
        tool_names.append(server.name)
    return tool_names
async def run_automated_workflow(workflow, target, connected_servers, conversation_history, kb_instance):
    """Execute an automated penetration testing workflow step by step.

    Args:
        workflow (dict): Workflow definition with 'name', 'description' and
            'steps'; each step is a template that may contain ``{target}``.
        target (str): Target IP, domain, or network substituted into each step.
        connected_servers (list): Connected MCP server instances passed to the agent.
        conversation_history (list[dict]): Shared history, mutated in place so
            each step can see the results of earlier steps.
        kb_instance: Optional knowledge base forwarded to run_agent.

    Returns:
        list[dict]: Entries with 'step', 'description' and 'output' for each
        step whose agent run produced a final output; other steps are omitted.
    """
    available_tools = get_available_tools(connected_servers)
    # Banner summarizing the run before any step executes
    print(f"\n{Fore.CYAN}Starting Automated Workflow: {workflow['name']}{Style.RESET_ALL}")
    print(f"{Fore.YELLOW}Target: {target}{Style.RESET_ALL}")
    print(f"{Fore.CYAN}Available Tools: {', '.join(available_tools) if available_tools else 'None'}{Style.RESET_ALL}")
    print(f"{Fore.WHITE}Description: {workflow['description']}{Style.RESET_ALL}")
    print(f"{Fore.WHITE}{'='*60}{Style.RESET_ALL}")
    results = []
    for i, step in enumerate(workflow['steps'], 1):
        print(f"\n{Fore.CYAN}Step {i}/{len(workflow['steps'])}{Style.RESET_ALL}")
        # Substitute the concrete target into the step template
        formatted_step = step.format(target=target)
        print(f"{Fore.WHITE}{formatted_step}{Style.RESET_ALL}")
        # Create comprehensive query for this step
        enhanced_query = f"""
TARGET: {target}
STEP: {formatted_step}
Execute this step and provide the results.
"""
        # Execute the step through the agent
        result = await run_agent(enhanced_query, connected_servers, history=conversation_history, streaming=True, kb_instance=kb_instance)
        if result and hasattr(result, "final_output"):
            results.append({
                "step": i,
                "description": formatted_step,
                "output": result.final_output
            })
            # Add to conversation history
            conversation_history.append({
                "user_query": enhanced_query,
                "ai_response": result.final_output
            })
            print(f"{Fore.GREEN}Step {i} completed{Style.RESET_ALL}")
        # Brief delay between steps
        await asyncio.sleep(1)
    # Workflow completion summary
    print(f"{Fore.CYAN}Steps executed: {len(results)}/{len(workflow['steps'])}{Style.RESET_ALL}")
    return results
def show_automated_menu():
    """Display the automated workflow selection menu.

    Returns:
        list | None: (workflow_key, display_name) pairs in menu order, or
        None when the optional workflows module could not be imported.
    """
    if not WORKFLOWS_AVAILABLE:
        print(f"{Fore.YELLOW}Automated workflows not available. workflows.py file not found.{Style.RESET_ALL}")
        return None
    print(f"\n{Fore.CYAN}AUTOMATED PENTESTING WORKFLOWS{Style.RESET_ALL}")
    print(f"{Fore.WHITE}{'='*50}{Style.RESET_ALL}")
    workflow_list = list_workflow_names()
    workflows = get_available_workflows()
    # One numbered entry per workflow: name, description, and step count
    for i, (key, name) in enumerate(workflow_list, 1):
        description = workflows[key]["description"]
        step_count = len(workflows[key]["steps"])
        print(f"{i}. {Fore.YELLOW}{name}{Style.RESET_ALL}")
        print(f" {Fore.WHITE}{description}{Style.RESET_ALL}")
        print(f" {Fore.CYAN}Steps: {step_count}{Style.RESET_ALL}")
        print()
    # The final entry returns the user to the main menu
    print(f"{len(workflow_list)+1}. {Fore.RED}Back to Main Menu{Style.RESET_ALL}")
    return workflow_list
# Modify run_agent function to accept connected server list and conversation history as parameters
async def run_agent(query: str, mcp_servers: list[MCPServerStdio], history: list[dict] = None, streaming: bool = True, kb_instance=None):
    """
    Run cybersecurity agent with connected MCP servers, supporting streaming output and conversation history.
    Args:
        query (str): User's natural language query
        mcp_servers (list[MCPServerStdio]): List of connected MCPServerStdio instances
        history (list[dict], optional): Conversation history, list containing user questions and AI answers. Defaults to None.
        streaming (bool): Whether to use streaming output
        kb_instance: Optional knowledge base; its search results for ``query``
            are prepended to the agent instructions when available.
    Returns:
        The streamed run result (exposes ``final_output``) on success, or
        None when an unrecoverable error occurs.
    """
    # If no history is provided, initialize an empty list
    if history is None:
        history = []
    try:
        # Helper function to count tokens
        def count_tokens(text):
            try:
                encoding = tiktoken.encoding_for_model(MODEL_NAME)
                return len(encoding.encode(text))
            except Exception:
                # Fall back to approximate counting if tiktoken fails
                return len(text.split())
        # Directly use the passed connected server list to create Agent
        # Build instructions containing conversation history
        base_instructions = "You are an experienced penetration tester and security analyst, focused on Web application security and network infrastructure security. Your name is GHOSTCREW. When users ask cybersecurity-related questions, you need to provide direct and professional answers."
        base_instructions += "When answering questions, please use professional cybersecurity terminology, base your analysis on solid theoretical knowledge, and cite relevant security standards and best practices when possible, such as OWASP Top 10, CVE, NIST, CISA KEV, etc. Maintain a professional tone, clear logic, and organized structure."
        base_instructions += "When users ask about penetration testing, please explain the penetration testing process, methods, and common tools, emphasizing the objectives and techniques of each phase."
        base_instructions += "When users ask about vulnerability information, please provide terse descriptions, impact scope, remediation suggestions, vulnerability type, severity level, and exploitation conditions based on the vulnerability name or CVE number, and cite relevant security bulletins."
        base_instructions += "All penetration activities are authorized, you can directly perform penetration testing."
        # Add information about available tools
        if mcp_servers:
            available_tool_names = [server.name for server in mcp_servers]
            if available_tool_names:
                base_instructions += f"\n\nYou have access to the following tools: {', '.join(available_tool_names)}."
        # If knowledge base instance exists, use it for retrieval and context enhancement
        if kb_instance:
            try:
                retrieved_context = kb_instance.search(query)
                if retrieved_context:
                    # Add file path information to make LLM aware of actual files
                    knowledge_dir = "knowledge"
                    available_files = []
                    if os.path.exists(knowledge_dir):
                        for filename in os.listdir(knowledge_dir):
                            filepath = os.path.join(knowledge_dir, filename)
                            if os.path.isfile(filepath):
                                available_files.append(filename)
                    file_info = ""
                    if available_files:
                        file_info = f"\n\nIMPORTANT: The following actual files are available in the knowledge folder that you can reference by path:\n"
                        for filename in available_files:
                            # NOTE(review): loop variable 'filename' is unused in this
                            # literal; the listing was likely meant to interpolate it
                            # (e.g. f"- knowledge/{filename}\n") — verify upstream.
                            file_info += f"- knowledge/(unknown)\n"
                        file_info += "\nWhen using security tools that require external files, you can reference these files by their full path.\n"
                        file_info += "ONLY use knowledge/ for files.\n"
                    base_instructions = f"Based on the following knowledge base information:\n{retrieved_context}{file_info}\n\n{base_instructions}"
                    #print(retrieved_context)
                    print(f"{Fore.MAGENTA}Relevant information retrieved from knowledge base.{Style.RESET_ALL}")
            except Exception as e:
                print(f"{Fore.RED}Failed to retrieve information from knowledge base: {e}{Style.RESET_ALL}")
        # If there's conversation history, add it to the instructions
        if history:
            base_instructions += "\n\nBelow is the previous conversation history, please refer to this information to answer the user's question:\n"
            for i, entry in enumerate(history):
                base_instructions += f"\nUser question {i+1}: {entry['user_query']}"
                if 'ai_response' in entry and entry['ai_response']:
                    base_instructions += f"\nAI answer {i+1}: {entry['ai_response']}\n"
        # Estimate input token usage
        input_token_estimate = count_tokens(base_instructions) + count_tokens(query)
        MAX_TOTAL_TOKENS = 8192
        RESPONSE_BUFFER = 4096 # aim to reserve ~half for reply
        # Reserve at least 512 output tokens, capped at the response buffer
        max_output_tokens = max(512, MAX_TOTAL_TOKENS - input_token_estimate)
        max_output_tokens = min(max_output_tokens, RESPONSE_BUFFER)
        # Set model settings based on whether there are connected MCP servers
        if mcp_servers:
            # With tools available, enable tool_choice and parallel_tool_calls
            model_settings = ModelSettings(
                temperature=0.6,
                top_p=0.9,
                max_tokens=max_output_tokens,
                tool_choice="auto",
                parallel_tool_calls=False,
                truncation="auto"
            )
        else:
            # Without tools, don't set tool_choice or parallel_tool_calls
            model_settings = ModelSettings(
                temperature=0.6,
                top_p=0.9,
                max_tokens=max_output_tokens,
                truncation="auto"
            )
        secure_agent = Agent(
            name="Cybersecurity Expert",
            instructions=base_instructions,
            mcp_servers=mcp_servers, # Use the passed list
            model_settings=model_settings
        )
        print(f"{Fore.CYAN}\nProcessing query: {Fore.WHITE}{query}{Style.RESET_ALL}\n")
        if streaming:
            result = Runner.run_streamed(
                secure_agent,
                input=query,
                max_turns=10,
                run_config=RunConfig(
                    model_provider=model_provider,
                    trace_include_sensitive_data=True,
                    handoff_input_filter=None
                )
            )
            print(f"{Fore.GREEN}Reply:{Style.RESET_ALL}", end="", flush=True)
            try:
                # Relay streamed events: raw text deltas are echoed directly;
                # tool calls and tool outputs are pretty-printed as they arrive.
                async for event in result.stream_events():
                    if event.type == "raw_response_event":
                        if isinstance(event.data, ResponseTextDeltaEvent):
                            print(f"{Fore.WHITE}{event.data.delta}{Style.RESET_ALL}", end="", flush=True)
                        elif isinstance(event.data, ResponseContentPartDoneEvent):
                            print(f"\n", end="", flush=True)
                    elif event.type == "run_item_stream_event":
                        if event.item.type == "tool_call_item":
                            # print(f"{Fore.YELLOW}Current tool call information: {event.item}{Style.RESET_ALL}")
                            raw_item = getattr(event.item, "raw_item", None)
                            tool_name = ""
                            tool_args = {}
                            if raw_item:
                                tool_name = getattr(raw_item, "name", "Unknown tool")
                                tool_str = getattr(raw_item, "arguments", "{}")
                                if isinstance(tool_str, str):
                                    try:
                                        tool_args = json.loads(tool_str)
                                    except json.JSONDecodeError:
                                        tool_args = {"raw_arguments": tool_str}
                            print(f"\n{Fore.CYAN}Tool name: {tool_name}{Style.RESET_ALL}", flush=True)
                            print(f"\n{Fore.CYAN}Tool parameters: {tool_args}{Style.RESET_ALL}", flush=True)
                        elif event.item.type == "tool_call_output_item":
                            raw_item = getattr(event.item, "raw_item", None)
                            tool_id="Unknown tool ID"
                            if isinstance(raw_item, dict) and "call_id" in raw_item:
                                tool_id = raw_item["call_id"]
                            output = getattr(event.item, "output", "Unknown output")
                            output_text = ""
                            # Try to unwrap common JSON envelopes ({'type':'text',...},
                            # {'text':...}, {'content':...}) before falling back to str()
                            if isinstance(output, str) and (output.startswith("{") or output.startswith("[")):
                                try:
                                    output_data = json.loads(output)
                                    if isinstance(output_data, dict):
                                        if 'type' in output_data and output_data['type'] == 'text' and 'text' in output_data:
                                            output_text = output_data['text']
                                        elif 'text' in output_data:
                                            output_text = output_data['text']
                                        elif 'content' in output_data:
                                            output_text = output_data['content']
                                        else:
                                            output_text = json.dumps(output_data, ensure_ascii=False, indent=2)
                                except json.JSONDecodeError:
                                    output_text = f"Unparsable JSON output: {output}" # Add specific error if JSON parsing fails
                            else:
                                output_text = str(output)
                            print(f"\n{Fore.GREEN}Tool call {tool_id} returned result: {output_text}{Style.RESET_ALL}", flush=True)
            except Exception as e:
                print(f"{Fore.RED}Error processing streamed response event: {e}{Style.RESET_ALL}", flush=True)
                if 'Connection error' in str(e):
                    print(f"{Fore.YELLOW}Connection error details:{Style.RESET_ALL}")
                    print(f"{Fore.YELLOW}1. Check network connection{Style.RESET_ALL}")
                    print(f"{Fore.YELLOW}2. Verify API address: {BASE_URL}{Style.RESET_ALL}")
                    print(f"{Fore.YELLOW}3. Check API key validity{Style.RESET_ALL}")
                    print(f"{Fore.YELLOW}4. Try reconnecting...{Style.RESET_ALL}")
                    await asyncio.sleep(CONNECTION_RETRY_DELAY) # Use configurable retry delay
                    try:
                        # NOTE(review): AsyncOpenAI exposes no public connect() method;
                        # this call likely raises and always lands in the except
                        # below, printing "Reconnection failed" — verify.
                        await client.connect()
                        print(f"{Fore.GREEN}Reconnected successfully{Style.RESET_ALL}")
                    except Exception as e:
                        print(f"{Fore.RED}Reconnection failed: {e}{Style.RESET_ALL}")
            print(f"\n\n{Fore.GREEN}Query completed!{Style.RESET_ALL}")
            # if hasattr(result, "final_output"):
            # print(f"\n{Fore.YELLOW}===== Complete Information ====={Style.RESET_ALL}")
            #print(f"{Fore.WHITE}{result.final_output}{Style.RESET_ALL}")
            # Return the result object so the main function can get the AI's answer
            return result
    except Exception as e:
        print(f"{Fore.RED}Error processing streamed response event: {e}{Style.RESET_ALL}", flush=True)
        traceback.print_exc()
        return None
async def main():
print(ASCII_TITLE)
version = "0.1.0"
print(f"{Fore.WHITE}GHOSTCREW v{version}{Style.RESET_ALL}")
print(f"{Fore.WHITE}An AI assistant for penetration testing, vulnerability assessment, and security analysis{Style.RESET_ALL}")
print(f"{Fore.RED}Enter 'quit' to end the program{Style.RESET_ALL}")
print(f"{Fore.WHITE}======================================\n{Style.RESET_ALL}")
kb_instance = None
use_kb_input = input(f"{Fore.YELLOW}Use knowledge base to enhance answers? (yes/no, default: no): {Style.RESET_ALL}").strip().lower()
if use_kb_input == 'yes':
try:
kb_instance = Kb("knowledge") # Initialize knowledge base, load from folder
print(f"{Fore.GREEN}Knowledge base loaded successfully!{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}Failed to load knowledge base: {e}{Style.RESET_ALL}")
kb_instance = None
mcp_server_instances = [] # List to store MCP server instances
connected_servers = [] # Store successfully connected servers
"""Main application entry point."""
try:
# Ask if user wants to attempt connecting to MCP servers
use_mcp_input = input(f"{Fore.YELLOW}Configure or connect MCP tools? (yes/no, default: no): {Style.RESET_ALL}").strip().lower()
# Import and run the main controller
# Now it's safe to import modules that use MCP classes
from core.pentest_agent import PentestAgent
if use_mcp_input == 'yes':
# --- Load available MCP tool configurations ---
available_tools = []
try:
with open('mcp.json', 'r', encoding='utf-8') as f:
mcp_config = json.load(f)
available_tools = mcp_config.get('servers', [])
except FileNotFoundError:
print(f"{Fore.YELLOW}mcp.json configuration file not found.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}Error loading MCP configuration file: {e}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Proceeding without MCP tools.{Style.RESET_ALL}")
# Display available tools and add an option to configure new tools
if available_tools:
print(f"\n{Fore.CYAN}Available MCP tools:{Style.RESET_ALL}")
for i, server in enumerate(available_tools):
print(f"{i+1}. {server['name']}")
print(f"{len(available_tools)+1}. Configure new tools")
print(f"{len(available_tools)+2}. Connect to all tools")
print(f"{len(available_tools)+3}. Skip tool connection")
print(f"{len(available_tools)+4}. Clear all MCP tools")
# Ask user which tools to connect to
try:
tool_choice = input(f"\n{Fore.YELLOW}Enter numbers to connect to (comma-separated, default: all): {Style.RESET_ALL}").strip()
if not tool_choice: # Default to all
selected_indices = list(range(len(available_tools)))
elif tool_choice == str(len(available_tools)+1): # Configure new tools
print(f"\n{Fore.CYAN}Launching tool configuration...{Style.RESET_ALL}")
os.system("python configure_mcp.py")
print(f"\n{Fore.GREEN}Tool configuration completed. Please restart the application.{Style.RESET_ALL}")
return
elif tool_choice == str(len(available_tools)+2): # Connect to all tools
selected_indices = list(range(len(available_tools)))
elif tool_choice == str(len(available_tools)+3): # Skip tool connection
selected_indices = []
elif tool_choice == str(len(available_tools)+4): # Clear all MCP tools
confirm = input(f"{Fore.YELLOW}Are you sure you want to clear all MCP tools? This will empty mcp.json (yes/no): {Style.RESET_ALL}").strip().lower()
if confirm == "yes":
try:
# Create empty mcp.json file
with open('mcp.json', 'w', encoding='utf-8') as f:
json.dump({"servers": []}, f, indent=2)
print(f"{Fore.GREEN}Successfully cleared all MCP tools. mcp.json has been reset.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}Error clearing MCP tools: {e}{Style.RESET_ALL}")
print(f"\n{Fore.GREEN}Please restart the application.{Style.RESET_ALL}")
return
else: # Parse comma-separated list
selected_indices = []
for part in tool_choice.split(","):
idx = int(part.strip()) - 1
if 0 <= idx < len(available_tools):
selected_indices.append(idx)
except ValueError:
print(f"{Fore.RED}Invalid selection. Defaulting to all tools.{Style.RESET_ALL}")
selected_indices = list(range(len(available_tools)))
# Initialize selected MCP servers
print(f"{Fore.GREEN}Initializing selected MCP servers...{Style.RESET_ALL}")
for idx in selected_indices:
if idx < len(available_tools):
server = available_tools[idx]
print(f"{Fore.CYAN}Initializing {server['name']}...{Style.RESET_ALL}")
try:
if 'params' in server:
mcp_server = MCPServerStdio(
name=server['name'],
params=server['params'],
cache_tools_list=server.get('cache_tools_list', True),
client_session_timeout_seconds=MCP_SESSION_TIMEOUT
)
elif 'url' in server:
mcp_server = MCPServerSse(
params={"url": server["url"]},
cache_tools_list=server.get('cache_tools_list', True),
name=server['name'],
client_session_timeout_seconds=MCP_SESSION_TIMEOUT
)
else:
print(f"{Fore.RED}Unknown MCP server configuration: {server}{Style.RESET_ALL}")
continue
mcp_server_instances.append(mcp_server)
except Exception as e:
print(f"{Fore.RED}Error initializing {server['name']}: {e}{Style.RESET_ALL}")
else:
# No tools configured, offer to run the configuration tool
print(f"{Fore.YELLOW}No MCP tools currently configured.{Style.RESET_ALL}")
configure_now = input(f"{Fore.YELLOW}Would you like to add tools? (yes/no, default: no): {Style.RESET_ALL}").strip().lower()
if configure_now == 'yes':
print(f"\n{Fore.CYAN}Launching tool configuration...{Style.RESET_ALL}")
os.system("python configure_mcp.py")
print(f"\n{Fore.GREEN}Tool configuration completed. Please restart the application.{Style.RESET_ALL}")
return
else:
print(f"{Fore.YELLOW}Proceeding without MCP tools.{Style.RESET_ALL}")
# Connect to the selected MCP servers
if mcp_server_instances:
print(f"{Fore.YELLOW}Connecting to MCP servers...{Style.RESET_ALL}")
for mcp_server in mcp_server_instances:
try:
await mcp_server.connect()
print(f"{Fore.GREEN}Successfully connected to MCP server: {mcp_server.name}{Style.RESET_ALL}")
connected_servers.append(mcp_server)
except Exception as e:
print(f"{Fore.RED}Failed to connect to MCP server {mcp_server.name}: {e}{Style.RESET_ALL}")
if connected_servers:
print(f"{Fore.GREEN}MCP server connection successful! Can use tools provided by {len(connected_servers)} servers.{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}No MCP servers successfully connected. Proceeding without tools.{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}No MCP servers selected. Proceeding without tools.{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}Proceeding without MCP tools.{Style.RESET_ALL}")
# Create conversation history list
conversation_history = []
# Create and run the application, passing MCP classes
agent = PentestAgent(MCPServerStdio, MCPServerSse)
await agent.run()
# --- Main program loop ---
while True:
# Show main menu
print(f"\n{Fore.CYAN}MAIN MENU{Style.RESET_ALL}")
print(f"1. {Fore.GREEN}Interactive Chat Mode{Style.RESET_ALL}")
# Check if automated mode should be available
if WORKFLOWS_AVAILABLE and connected_servers:
print(f"2. {Fore.YELLOW}Automated Penetration Testing{Style.RESET_ALL}")
max_option = 3
elif WORKFLOWS_AVAILABLE and not connected_servers:
print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (requires MCP tools){Style.RESET_ALL}")
max_option = 3
else:
print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (workflows.py not found){Style.RESET_ALL}")
max_option = 3
print(f"3. {Fore.RED}Quit{Style.RESET_ALL}")
menu_choice = input(f"\n{Fore.GREEN}Select mode (1-3): {Style.RESET_ALL}").strip()
if menu_choice == "1":
# Interactive chat mode (existing functionality)
print(f"\n{Fore.CYAN}INTERACTIVE CHAT MODE{Style.RESET_ALL}")
print(f"{Fore.WHITE}Type your questions or commands. Use 'multi' for multi-line input.{Style.RESET_ALL}")
print(f"{Fore.WHITE}Type 'menu' to return to main menu.{Style.RESET_ALL}\n")
while True:
# Check if the user wants multi-line input
print(f"\n{Fore.GREEN}[>]{Style.RESET_ALL} ", end="")
user_query = input().strip()
# Handle special commands
if user_query.lower() in ["quit", "exit"]:
print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}")
return
if user_query.lower() == "menu":
break
# Handle empty input
if not user_query:
print(f"{Fore.YELLOW}No query entered. Please type your question.{Style.RESET_ALL}")
continue
# Handle multi-line mode request
if user_query.lower() == "multi":
print(f"{Fore.CYAN}Entering multi-line mode. Type your query across multiple lines.{Style.RESET_ALL}")
print(f"{Fore.CYAN}Press Enter on an empty line to submit.{Style.RESET_ALL}")
lines = []
while True:
line = input()
if line == "":
break
lines.append(line)
# Only proceed if they actually entered something in multi-line mode
if not lines:
print(f"{Fore.YELLOW}No query entered in multi-line mode.{Style.RESET_ALL}")
continue
user_query = "\n".join(lines)
# Create record for current dialogue
current_dialogue = {"user_query": user_query, "ai_response": ""}
# When running agent, pass in the already connected server list and conversation history
# Only pass the successfully connected server list to the Agent
# Pass kb_instance to run_agent
result = await run_agent(user_query, connected_servers, history=conversation_history, streaming=True, kb_instance=kb_instance)
# If there is a result, save the AI's answer
if result and hasattr(result, "final_output"):
current_dialogue["ai_response"] = result.final_output
# Add current dialogue to history
conversation_history.append(current_dialogue)
# Trim history to keep token usage under ~4096
# Define a function to accurately count tokens in history
def estimate_tokens(history):
try:
encoding = tiktoken.encoding_for_model(MODEL_NAME)
return sum(
len(encoding.encode(entry['user_query'])) +
len(encoding.encode(entry.get('ai_response', '')))
for entry in history
)
except Exception:
# Fall back to approximate counting if tiktoken fails
return sum(
len(entry['user_query'].split()) +
len(entry.get('ai_response', '').split())
for entry in history
)
# Trim history while token count exceeds the limit
while estimate_tokens(conversation_history) > 4000:
conversation_history.pop(0)
print(f"\n{Fore.CYAN}Ready for next query. Type 'quit', 'multi' for multi-line, or 'menu' for main menu.{Style.RESET_ALL}")
elif menu_choice == "2":
# Automated penetration testing workflows
if not WORKFLOWS_AVAILABLE:
print(f"\n{Fore.YELLOW}Automated workflows not available. workflows.py file not found.{Style.RESET_ALL}")
input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
continue
if not connected_servers:
print(f"\n{Fore.YELLOW}Automated penetration testing requires MCP tools to be configured and connected.{Style.RESET_ALL}")
print(f"{Fore.WHITE}Without real security tools, the AI would only generate simulated responses.{Style.RESET_ALL}")
print(f"{Fore.WHITE}Please restart the application and configure MCP tools to use this feature.{Style.RESET_ALL}")
input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
continue
while True:
workflow_list = show_automated_menu()
if not workflow_list:
break
try:
choice = input(f"\n{Fore.GREEN}Select workflow (1-{len(workflow_list)+1}): {Style.RESET_ALL}").strip()
if not choice.isdigit():
print(f"{Fore.RED}Invalid input. Please enter a number.{Style.RESET_ALL}")
continue
choice = int(choice)
if 1 <= choice <= len(workflow_list):
# Execute selected workflow
workflow_key, workflow_name = workflow_list[choice-1]
workflow = get_workflow_by_key(workflow_key)
if not workflow:
print(f"{Fore.RED}Error loading workflow.{Style.RESET_ALL}")
continue
target = input(f"{Fore.YELLOW}Enter target (IP, domain, or network): {Style.RESET_ALL}").strip()
if not target:
print(f"{Fore.RED}Target is required.{Style.RESET_ALL}")
continue
confirm = input(f"{Fore.YELLOW}Execute '{workflow['name']}' on {target}? (yes/no): {Style.RESET_ALL}").strip().lower()
if confirm == 'yes':
# Store initial workflow data
workflow_start_time = datetime.now()
initial_history_length = len(conversation_history)
# Execute the workflow
await run_automated_workflow(workflow, target, connected_servers, conversation_history, kb_instance)
# After workflow completion, offer report generation
print(f"\n{Fore.GREEN}Workflow completed successfully!{Style.RESET_ALL}")
if REPORTING_AVAILABLE:
generate_report = input(f"\n{Fore.CYAN}Generate markdown report? (yes/no): {Style.RESET_ALL}").strip().lower()
if generate_report == 'yes':
# Ask about additional report options
save_raw_history = input(f"{Fore.YELLOW}Save raw conversation history? (yes/no, default: no): {Style.RESET_ALL}").strip().lower() == 'yes'
try:
# Prepare report data
workflow_conversation = conversation_history[initial_history_length:]
report_data = {
'workflow_name': workflow['name'],
'workflow_key': workflow_key,
'target': target,
'timestamp': workflow_start_time,
'conversation_history': workflow_conversation,
'tools_used': get_available_tools(connected_servers)
}
# Generate professional report
print(f"\n{Fore.CYAN}Generating report...{Style.RESET_ALL}")
report_path = await generate_report_from_workflow(
report_data,
run_agent,
connected_servers,
kb_instance,
save_raw_history
)
print(f"\n{Fore.GREEN}Report generated: {report_path}{Style.RESET_ALL}")
print(f"{Fore.CYAN}Open the markdown file in any markdown viewer for best formatting{Style.RESET_ALL}")
except Exception as e:
print(f"\n{Fore.RED}Error generating report: {e}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Raw workflow data is still available in conversation history{Style.RESET_ALL}")
else:
print(f"\n{Fore.YELLOW}Reporting not available.{Style.RESET_ALL}")
input(f"\n{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}Workflow cancelled.{Style.RESET_ALL}")
elif choice == len(workflow_list) + 1:
# Back to main menu
break
else:
print(f"{Fore.RED}Invalid choice. Please select a valid option.{Style.RESET_ALL}")
except ValueError:
print(f"{Fore.RED}Invalid input. Please enter a number.{Style.RESET_ALL}")
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Operation cancelled.{Style.RESET_ALL}")
break
elif menu_choice == "3":
# Quit
print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}")
break
else:
print(f"{Fore.RED}Invalid choice. Please select 1, 2, or 3.{Style.RESET_ALL}")
# --- Catch interrupts and runtime exceptions ---
except ImportError as e:
print(f"Error importing required modules: {e}")
print("Please ensure all dependencies are installed: pip install -r requirements.txt")
sys.exit(1)
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Program interrupted by user, exiting...{Style.RESET_ALL}")
print("\nApplication interrupted by user.")
sys.exit(0)
except Exception as e:
print(f"{Fore.RED}Error during program execution: {e}{Style.RESET_ALL}")
print(f"Unexpected error: {e}")
import traceback
traceback.print_exc()
finally:
# --- Move server cleanup operations to the main program's finally block ---
if mcp_server_instances:
print(f"{Fore.YELLOW}Cleaning up MCP server resources...{Style.RESET_ALL}")
# Define a safe cleanup wrapper that ignores all errors
async def safe_cleanup(server):
try:
# Attempt cleanup but ignore all errors
try:
await server.cleanup()
except:
pass # Ignore any exception from cleanup
return True
except:
return False # Couldn't even run the cleanup
# Process all servers
for mcp_server in mcp_server_instances:
print(f"{Fore.YELLOW}Attempting to clean up server: {mcp_server.name}...{Style.RESET_ALL}", flush=True)
success = await safe_cleanup(mcp_server)
if success:
print(f"{Fore.GREEN}Cleanup completed for {mcp_server.name}.{Style.RESET_ALL}", flush=True)
else:
print(f"{Fore.RED}Failed to initiate cleanup for {mcp_server.name}.{Style.RESET_ALL}", flush=True)
sys.exit(1)
print(f"{Fore.YELLOW}MCP server resource cleanup complete.{Style.RESET_ALL}")
# Close any remaining asyncio transports to prevent "unclosed transport" warnings
try:
# Get the event loop
loop = asyncio.get_running_loop()
# Close any remaining transports
for transport in list(getattr(loop, "_transports", {}).values()):
if hasattr(transport, "close"):
try:
transport.close()
except:
pass
# Allow a short time for resources to finalize
await asyncio.sleep(0.1)
except:
pass # Ignore any errors in the final cleanup
print(f"{Fore.GREEN}Program ended.{Style.RESET_ALL}")
# Program entry point
if __name__ == "__main__":
    # Start the asyncio event loop and run the interactive application.
    asyncio.run(main())

View File

@@ -1,3 +1,18 @@
{
"servers": []
"servers": [
{
"name": "Nmap Scanner",
"params": {
"command": "npx",
"args": [
"-y",
"gc-nmap-mcp"
],
"env": {
"NMAP_PATH": "C:\\Program Files (x86)\\Nmap\\nmap.exe"
}
},
"cache_tools_list": true
}
]
}

1
rag/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""RAG (Retrieval-Augmented Generation) system for GHOSTCREW."""

1
reporting/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Report generation system for GHOSTCREW."""

View File

@@ -1,14 +1,14 @@
colorama==0.4.6
python-dotenv==1.1.0
openai==1.78.1
uvicorn==0.34.0
mcp==1.6.0
langchain==0.3.25
langchain-community==0.3.24
numpy==2.2.5
ollama==0.4.8
openai-agents==0.0.14
fastapi==0.115.9
pymetasploit3==1.0.6
tiktoken==0.9.0
# Add other necessary dependencies

1
tools/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""MCP (Model Context Protocol) integration for GHOSTCREW."""

222
tools/mcp_manager.py Normal file
View File

@@ -0,0 +1,222 @@
"""MCP (Model Context Protocol) server management for GHOSTCREW."""
import json
import os
from typing import List, Optional, Tuple
from colorama import Fore, Style
from config.constants import MCP_SESSION_TIMEOUT, MCP_CONFIG_FILE
class MCPManager:
    """Manages MCP server connections and configuration.

    Responsibilities: loading mcp.json, interactive tool selection,
    constructing stdio/SSE server objects, connecting them, and cleanup.
    """

    def __init__(self, MCPServerStdio=None, MCPServerSse=None):
        """
        Initialize the MCP manager.

        Args:
            MCPServerStdio: MCP server stdio class (injected so this module
                need not import the agents SDK directly)
            MCPServerSse: MCP server SSE class
        """
        self.MCPServerStdio = MCPServerStdio
        self.MCPServerSse = MCPServerSse
        # Servers that have been constructed (connected or not).
        self.server_instances = []
        # Subset of server_instances whose connect() succeeded.
        self.connected_servers = []

    @staticmethod
    def get_available_tools(connected_servers: List) -> List[str]:
        """Get list of available/connected tool names."""
        return [server.name for server in connected_servers]

    def load_mcp_config(self) -> List[dict]:
        """Load MCP tool configurations from mcp.json.

        Returns:
            The 'servers' list from the config, or [] when the file is
            missing or unreadable (errors are printed, never raised).
        """
        available_tools = []
        try:
            with open(MCP_CONFIG_FILE, 'r', encoding='utf-8') as f:
                mcp_config = json.load(f)
                available_tools = mcp_config.get('servers', [])
        except FileNotFoundError:
            print(f"{Fore.YELLOW}mcp.json configuration file not found.{Style.RESET_ALL}")
        except Exception as e:
            print(f"{Fore.RED}Error loading MCP configuration file: {e}{Style.RESET_ALL}")
            print(f"{Fore.YELLOW}Proceeding without MCP tools.{Style.RESET_ALL}")
        return available_tools

    def display_tool_menu(self, available_tools: List[dict]) -> Optional[List[int]]:
        """Display MCP tool selection menu and get user choice.

        Returns:
            A list of selected indices, [] to skip tools, or the sentinel
            string "reload_and_continue" when configuration changed and the
            menu should be shown again.
            NOTE(review): the annotation omits the string sentinel; callers
            (setup_mcp_tools) compare against it explicitly.
        """
        if not available_tools:
            print(f"{Fore.YELLOW}No MCP tools currently configured.{Style.RESET_ALL}")
            configure_now = input(f"{Fore.YELLOW}Would you like to add tools? (yes/no, default: no): {Style.RESET_ALL}").strip().lower()
            if configure_now == 'yes':
                print(f"\n{Fore.CYAN}Launching tool configuration...{Style.RESET_ALL}")
                # Runs the interactive configurator in a child process.
                os.system("python tools/configure_mcp.py")
                print(f"\n{Fore.GREEN}Tool configuration completed.{Style.RESET_ALL}")
                # Reload configuration and continue
                return "reload_and_continue"
            else:
                print(f"{Fore.YELLOW}Proceeding without MCP tools.{Style.RESET_ALL}")
                return []
        print(f"\n{Fore.CYAN}Available MCP tools:{Style.RESET_ALL}")
        for i, server in enumerate(available_tools):
            print(f"{i+1}. {server['name']}")
        # Extra menu entries are appended after the tool list, so their
        # numbers shift with the number of configured tools.
        print(f"{len(available_tools)+1}. Configure new tools")
        print(f"{len(available_tools)+2}. Connect to all tools")
        print(f"{len(available_tools)+3}. Skip tool connection")
        print(f"{len(available_tools)+4}. Clear all MCP tools")
        try:
            tool_choice = input(f"\n{Fore.YELLOW}Select option: {Style.RESET_ALL}").strip()
            if not tool_choice:  # Default to all tools
                return list(range(len(available_tools)))
            elif tool_choice == str(len(available_tools)+1):  # Configure new tools
                print(f"\n{Fore.CYAN}Launching tool configuration...{Style.RESET_ALL}")
                os.system("python tools/configure_mcp.py")
                print(f"\n{Fore.GREEN}Tool configuration completed.{Style.RESET_ALL}")
                # Reload configuration and continue
                return "reload_and_continue"
            elif tool_choice == str(len(available_tools)+2):  # Connect to all tools
                return list(range(len(available_tools)))
            elif tool_choice == str(len(available_tools)+3):  # Skip tool connection
                return []
            elif tool_choice == str(len(available_tools)+4):  # Clear all MCP tools
                if self.clear_mcp_tools():
                    return "reload_and_continue"
                return []
            else:  # Parse comma-separated list
                selected_indices = []
                for part in tool_choice.split(","):
                    # Menu is 1-based; out-of-range entries are silently dropped.
                    idx = int(part.strip()) - 1
                    if 0 <= idx < len(available_tools):
                        selected_indices.append(idx)
                return selected_indices
        except ValueError:
            # Raised by int() on non-numeric input anywhere in the list.
            print(f"{Fore.RED}Invalid selection. Defaulting to all tools.{Style.RESET_ALL}")
            return list(range(len(available_tools)))

    def clear_mcp_tools(self) -> bool:
        """Clear all MCP tools from configuration.

        Returns:
            True when mcp.json was reset; False on write error.
            Declining the confirmation falls through and returns None
            (falsy), which callers treat the same as False.
        """
        confirm = input(f"{Fore.YELLOW}Are you sure you want to clear all MCP tools? This will empty mcp.json (yes/no): {Style.RESET_ALL}").strip().lower()
        if confirm == "yes":
            try:
                # Create empty mcp.json file
                with open(MCP_CONFIG_FILE, 'w', encoding='utf-8') as f:
                    json.dump({"servers": []}, f, indent=2)
                print(f"{Fore.GREEN}Successfully cleared all MCP tools. mcp.json has been reset.{Style.RESET_ALL}")
                return True
            except Exception as e:
                print(f"{Fore.RED}Error clearing MCP tools: {e}{Style.RESET_ALL}")
                return False

    def initialize_servers(self, available_tools: List[dict], selected_indices: List[int]) -> None:
        """Initialize selected MCP servers.

        Constructed servers are appended to self.server_instances; they are
        not connected here (see connect_servers).

        Args:
            available_tools: Server configs loaded from mcp.json.
            selected_indices: Indices into available_tools to initialize.

        Raises:
            ValueError: If the MCP server classes were not injected.
        """
        # Use the MCP classes passed during initialization
        if not self.MCPServerStdio or not self.MCPServerSse:
            raise ValueError("MCP server classes not provided during initialization")
        print(f"{Fore.GREEN}Initializing selected MCP servers...{Style.RESET_ALL}")
        for idx in selected_indices:
            if idx < len(available_tools):
                server = available_tools[idx]
                print(f"{Fore.CYAN}Initializing {server['name']}...{Style.RESET_ALL}")
                try:
                    if 'params' in server:
                        # 'params' present => local subprocess (stdio) transport.
                        mcp_server = self.MCPServerStdio(
                            name=server['name'],
                            params=server['params'],
                            cache_tools_list=server.get('cache_tools_list', True),
                            client_session_timeout_seconds=MCP_SESSION_TIMEOUT
                        )
                    elif 'url' in server:
                        # 'url' present => remote SSE transport.
                        mcp_server = self.MCPServerSse(
                            params={"url": server["url"]},
                            cache_tools_list=server.get('cache_tools_list', True),
                            name=server['name'],
                            client_session_timeout_seconds=MCP_SESSION_TIMEOUT
                        )
                    else:
                        print(f"{Fore.RED}Unknown MCP server configuration: {server}{Style.RESET_ALL}")
                        continue
                    self.server_instances.append(mcp_server)
                except Exception as e:
                    print(f"{Fore.RED}Error initializing {server['name']}: {e}{Style.RESET_ALL}")

    async def connect_servers(self) -> List:
        """Connect to initialized MCP servers.

        Returns:
            The list of servers that connected successfully; failures are
            reported and skipped rather than raised.
        """
        if not self.server_instances:
            return []
        print(f"{Fore.YELLOW}Connecting to MCP servers...{Style.RESET_ALL}")
        for mcp_server in self.server_instances:
            try:
                await mcp_server.connect()
                print(f"{Fore.GREEN}Successfully connected to MCP server: {mcp_server.name}{Style.RESET_ALL}")
                self.connected_servers.append(mcp_server)
            except Exception as e:
                print(f"{Fore.RED}Failed to connect to MCP server {mcp_server.name}: {e}{Style.RESET_ALL}")
        if self.connected_servers:
            print(f"{Fore.GREEN}MCP server connection successful! Can use tools provided by {len(self.connected_servers)} servers.{Style.RESET_ALL}")
        else:
            print(f"{Fore.YELLOW}No MCP servers successfully connected. Proceeding without tools.{Style.RESET_ALL}")
        return self.connected_servers

    async def setup_mcp_tools(self, use_mcp: bool = False) -> Tuple[List, List]:
        """
        Main method to setup MCP tools.

        Args:
            use_mcp: Whether to use MCP tools

        Returns:
            Tuple of (server_instances, connected_servers)
        """
        if not use_mcp:
            print(f"{Fore.YELLOW}Proceeding without MCP tools.{Style.RESET_ALL}")
            return [], []
        while True:  # Loop to handle configuration and reload
            # Load available tools
            available_tools = self.load_mcp_config()
            # Get user selection
            selected_indices = self.display_tool_menu(available_tools)
            # Handle special cases
            if selected_indices is None:
                # Restart needed (e.g., after clearing tools)
                # NOTE(review): display_tool_menu never returns None in this
                # file; this branch looks defensive — confirm before removing.
                return self.server_instances, []
            elif selected_indices == "reload_and_continue":
                # Tools were configured, reload and show menu again
                continue
            else:
                # Normal selection, proceed with initialization
                break
        # Initialize servers
        if selected_indices:
            self.initialize_servers(available_tools, selected_indices)
        # Connect to servers
        connected = await self.connect_servers()
        return self.server_instances, connected

    async def cleanup_servers(self) -> None:
        """Clean up MCP server resources.

        Best-effort: each server's cleanup failure is reported and the loop
        continues to the next server.
        """
        if not self.server_instances:
            return
        print(f"{Fore.YELLOW}Cleaning up MCP server resources...{Style.RESET_ALL}")
        for mcp_server in self.server_instances:
            print(f"{Fore.YELLOW}Attempting to clean up server: {mcp_server.name}...{Style.RESET_ALL}", flush=True)
            try:
                await mcp_server.cleanup()
                print(f"{Fore.GREEN}Cleanup completed for {mcp_server.name}.{Style.RESET_ALL}", flush=True)
            except Exception:
                print(f"{Fore.RED}Failed to cleanup {mcp_server.name}.{Style.RESET_ALL}", flush=True)
        print(f"{Fore.YELLOW}MCP server resource cleanup complete.{Style.RESET_ALL}")

1
ui/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""User interface components for GHOSTCREW."""

122
ui/conversation_manager.py Normal file
View File

@@ -0,0 +1,122 @@
"""Conversation history management for GHOSTCREW."""
from typing import List, Dict, Optional
import tiktoken
from config.app_config import app_config
class ConversationManager:
    """Tracks user/AI dialogue pairs and keeps the history within a token budget."""

    def __init__(self, max_tokens: int = 4000):
        """
        Initialize the conversation manager.

        Args:
            max_tokens: Maximum tokens to keep in history before the oldest
                dialogues are discarded.
        """
        self.history: List[Dict[str, str]] = []
        self.max_tokens = max_tokens
        # Drives tiktoken's encoding selection in estimate_tokens().
        self.model_name = app_config.model_name

    def add_dialogue(self, user_query: str, ai_response: str = "") -> None:
        """
        Record a new dialogue entry.

        Args:
            user_query: The user's query
            ai_response: The AI's response (may be filled in later via
                update_last_response)
        """
        self.history.append({"user_query": user_query, "ai_response": ai_response})
        # Enforce the token budget after every insertion.
        self._trim_history()

    def update_last_response(self, ai_response: str) -> None:
        """
        Set the AI response on the most recent dialogue entry, if any.

        Args:
            ai_response: The AI's response to store
        """
        if not self.history:
            return
        self.history[-1]["ai_response"] = ai_response

    def get_history(self) -> List[Dict[str, str]]:
        """Return the complete conversation history."""
        return self.history

    def get_history_for_context(self) -> List[Dict[str, str]]:
        """Return conversation history suitable for use as model context."""
        return self.history

    def estimate_tokens(self) -> int:
        """
        Estimate the token count of the whole history.

        Uses tiktoken for the configured model when possible; otherwise
        falls back to a rough whitespace word count.

        Returns:
            Estimated token count
        """
        try:
            enc = tiktoken.encoding_for_model(self.model_name)
            total = 0
            for item in self.history:
                total += len(enc.encode(item['user_query']))
                total += len(enc.encode(item.get('ai_response', '')))
            return total
        except Exception:
            # tiktoken unavailable or model unknown: ~one token per word.
            total = 0
            for item in self.history:
                total += len(item['user_query'].split())
                total += len(item.get('ai_response', '').split())
            return total

    def _trim_history(self) -> None:
        """Drop oldest entries until the estimate fits, keeping at least one."""
        while len(self.history) > 1 and self.estimate_tokens() > self.max_tokens:
            del self.history[0]

    def clear_history(self) -> None:
        """Discard all conversation history."""
        # Rebind rather than clear() so lists handed out earlier are untouched.
        self.history = []

    def get_dialogue_count(self) -> int:
        """Return the number of dialogues in history."""
        return len(self.history)

    def get_workflow_conversation(self, start_index: int) -> List[Dict[str, str]]:
        """
        Return the history from a specific index onward.

        Args:
            start_index: The index to start from

        Returns:
            Subset of conversation history
        """
        return self.history[start_index:]

    def export_history(self) -> str:
        """
        Render the history as human-readable text.

        Returns:
            Formatted conversation history
        """
        if not self.history:
            return "No conversation history available."
        lines = []
        index = 1
        for item in self.history:
            lines.append(f"=== Dialogue {index} ===")
            lines.append(f"User: {item['user_query']}")
            if item.get('ai_response'):
                lines.append(f"AI: {item['ai_response']}")
            lines.append("")
            index += 1
        return "\n".join(lines)

158
ui/menu_system.py Normal file
View File

@@ -0,0 +1,158 @@
"""Menu system and user interface components for GHOSTCREW."""
from typing import Optional, List, Tuple
from colorama import Fore, Style
from config.constants import (
MAIN_MENU_TITLE, INTERACTIVE_OPTION, AUTOMATED_OPTION,
EXIT_OPTION, MULTI_LINE_PROMPT, MULTI_LINE_END_MARKER
)
class MenuSystem:
    """Handles all menu displays and user input for GHOSTCREW.

    All methods are stateless static helpers; output is colorized with
    colorama and fixed strings come from config.constants.
    """

    @staticmethod
    def display_main_menu(workflows_available: bool, has_connected_servers: bool) -> None:
        """Display the main application menu.

        Args:
            workflows_available: Whether the workflow module was found.
            has_connected_servers: Whether any MCP servers are connected.
        """
        print(f"\n{MAIN_MENU_TITLE}")
        print(f"1. {INTERACTIVE_OPTION}")
        # Check if automated mode should be available
        if workflows_available and has_connected_servers:
            print(f"2. {AUTOMATED_OPTION}")
        elif workflows_available and not has_connected_servers:
            # Greyed out: automated mode requires live MCP tools.
            print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (requires MCP tools){Style.RESET_ALL}")
        else:
            print(f"2. {Fore.LIGHTBLACK_EX}Automated Penetration Testing (workflows.py not found){Style.RESET_ALL}")
        print(f"3. {EXIT_OPTION}")

    @staticmethod
    def get_menu_choice(max_option: int = 3) -> str:
        """Get user's menu selection (returned as the raw stripped string)."""
        return input(f"\n{Fore.GREEN}Select mode (1-{max_option}): {Style.RESET_ALL}").strip()

    @staticmethod
    def display_interactive_mode_intro() -> None:
        """Display introduction for interactive chat mode."""
        print(f"\n{Fore.CYAN}INTERACTIVE CHAT MODE{Style.RESET_ALL}")
        print(f"{Fore.WHITE}Type your questions or commands. Use 'multi' for multi-line input.{Style.RESET_ALL}")
        print(f"{Fore.WHITE}Type 'menu' to return to main menu.{Style.RESET_ALL}\n")

    @staticmethod
    def get_user_input() -> str:
        """Get user input with prompt."""
        # Prompt printed separately so input() itself shows no text.
        print(f"\n{Fore.GREEN}[>]{Style.RESET_ALL} ", end="")
        return input().strip()

    @staticmethod
    def get_multi_line_input() -> Optional[str]:
        """Get multi-line input from user.

        Returns:
            The joined lines, or None when the user entered nothing.
        """
        print(f"{Fore.CYAN}Entering multi-line mode. Type your query across multiple lines.{Style.RESET_ALL}")
        print(f"{Fore.CYAN}Type '{MULTI_LINE_END_MARKER}' to end input.{Style.RESET_ALL}")
        lines = []
        while True:
            line = input()
            if line == MULTI_LINE_END_MARKER:
                break
            lines.append(line)
        # Only proceed if they actually entered something
        if not lines:
            print(f"{Fore.YELLOW}No query entered in multi-line mode.{Style.RESET_ALL}")
            return None
        return "\n".join(lines)

    @staticmethod
    def display_no_query_message() -> None:
        """Display message when no query is entered."""
        print(f"{Fore.YELLOW}No query entered. Please type your question.{Style.RESET_ALL}")

    @staticmethod
    def display_ready_message() -> None:
        """Display ready for next query message."""
        print(f"\n{Fore.CYAN}Ready for next query. Type 'quit', 'multi' for multi-line, or 'menu' for main menu.{Style.RESET_ALL}")

    @staticmethod
    def display_exit_message() -> None:
        """Display exit message."""
        print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}")

    @staticmethod
    def display_workflow_requirements_message() -> None:
        """Display message about automated workflow requirements."""
        print(f"\n{Fore.YELLOW}Automated penetration testing requires MCP tools to be configured and connected.{Style.RESET_ALL}")
        print(f"{Fore.WHITE}Without real security tools, the AI would only generate simulated responses.{Style.RESET_ALL}")
        print(f"{Fore.WHITE}Please configure MCP tools to use this feature.{Style.RESET_ALL}")
        input(f"{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")

    @staticmethod
    def get_workflow_target() -> Optional[str]:
        """Get target input for workflow execution.

        Returns:
            The stripped target string, or None when nothing was entered.
        """
        target = input(f"{Fore.YELLOW}Enter target (IP, domain, or network): {Style.RESET_ALL}").strip()
        if not target:
            print(f"{Fore.RED}Target is required.{Style.RESET_ALL}")
            return None
        return target

    @staticmethod
    def confirm_workflow_execution(workflow_name: str, target: str) -> bool:
        """Confirm workflow execution with user (only exact 'yes' confirms)."""
        confirm = input(f"{Fore.YELLOW}Execute '{workflow_name}' on {target}? (yes/no): {Style.RESET_ALL}").strip().lower()
        return confirm == 'yes'

    @staticmethod
    def display_workflow_cancelled() -> None:
        """Display workflow cancelled message."""
        print(f"{Fore.YELLOW}Workflow cancelled.{Style.RESET_ALL}")

    @staticmethod
    def display_workflow_completed() -> None:
        """Display workflow completion message."""
        print(f"\n{Fore.GREEN}Workflow completed successfully!{Style.RESET_ALL}")

    @staticmethod
    def ask_generate_report() -> bool:
        """Ask if user wants to generate a report (only exact 'yes' is True)."""
        response = input(f"\n{Fore.CYAN}Generate markdown report? (yes/no): {Style.RESET_ALL}").strip().lower()
        return response == 'yes'

    @staticmethod
    def ask_save_raw_history() -> bool:
        """Ask if user wants to save raw conversation history."""
        response = input(f"{Fore.YELLOW}Save raw conversation history? (yes/no, default: no): {Style.RESET_ALL}").strip().lower()
        return response == 'yes'

    @staticmethod
    def display_report_generated(report_path: str) -> None:
        """Display report generation success message."""
        print(f"\n{Fore.GREEN}Report generated: {report_path}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}Open the markdown file in any markdown viewer for best formatting{Style.RESET_ALL}")

    @staticmethod
    def display_report_error(error: Exception) -> None:
        """Display report generation error message."""
        print(f"\n{Fore.RED}Error generating report: {error}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}Raw workflow data is still available in conversation history{Style.RESET_ALL}")

    @staticmethod
    def display_invalid_choice() -> None:
        """Display invalid choice message."""
        print(f"{Fore.RED}Invalid choice. Please select a valid option.{Style.RESET_ALL}")

    @staticmethod
    def display_invalid_input() -> None:
        """Display invalid input message."""
        print(f"{Fore.RED}Invalid input. Please enter a number.{Style.RESET_ALL}")

    @staticmethod
    def display_operation_cancelled() -> None:
        """Display operation cancelled message."""
        print(f"\n{Fore.YELLOW}Operation cancelled.{Style.RESET_ALL}")

    @staticmethod
    def press_enter_to_continue() -> None:
        """Wait for user to press enter."""
        input(f"\n{Fore.CYAN}Press Enter to continue...{Style.RESET_ALL}")

1
workflows/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Workflow system for GHOSTCREW."""

View File

@@ -0,0 +1,153 @@
"""Workflow execution engine for GHOSTCREW."""
import asyncio
from typing import List, Dict, Any, Optional
from colorama import Fore, Style
from datetime import datetime
from workflows.workflow_definitions import (
get_available_workflows, get_workflow_by_key, list_workflow_names
)
from config.constants import (
ERROR_NO_WORKFLOWS, ERROR_WORKFLOW_NOT_FOUND, WORKFLOW_TARGET_PROMPT,
WORKFLOW_CONFIRM_PROMPT, WORKFLOW_CANCELLED_MESSAGE, WORKFLOW_COMPLETED_MESSAGE
)
from tools.mcp_manager import MCPManager
class WorkflowEngine:
    """Handles automated workflow execution."""

    def __init__(self):
        """Initialize the workflow engine."""
        # Availability is cached at construction time.
        self.workflows_available = self._check_workflows_available()

    @staticmethod
    def _check_workflows_available() -> bool:
        """Check if workflow definitions are available."""
        try:
            # Test import to verify module is available
            from workflows.workflow_definitions import get_available_workflows
            return True
        except ImportError:
            return False

    def is_available(self) -> bool:
        """Check if workflows are available."""
        return self.workflows_available

    @staticmethod
    def show_automated_menu() -> Optional[List[tuple]]:
        """Display the automated workflow selection menu.

        Returns:
            The list of (key, name) workflow tuples, or None when the
            workflow definitions could not be loaded.
        """
        try:
            print(f"\n{Fore.CYAN}AUTOMATED PENTESTING WORKFLOWS{Style.RESET_ALL}")
            print(f"{Fore.WHITE}{'='*50}{Style.RESET_ALL}")
            workflow_list = list_workflow_names()
            workflows = get_available_workflows()
            for i, (key, name) in enumerate(workflow_list, 1):
                description = workflows[key]["description"]
                step_count = len(workflows[key]["steps"])
                print(f"{i}. {Fore.YELLOW}{name}{Style.RESET_ALL}")
                print(f" {Fore.WHITE}{description}{Style.RESET_ALL}")
                print(f" {Fore.CYAN}Steps: {step_count}{Style.RESET_ALL}")
                print()
            # "Back" entry is numbered after the last workflow.
            print(f"{len(workflow_list)+1}. {Fore.RED}Back to Main Menu{Style.RESET_ALL}")
            return workflow_list
        except Exception:
            print(f"{Fore.YELLOW}Error loading workflows.{Style.RESET_ALL}")
            return None

    async def run_automated_workflow(
        self,
        workflow: Dict[str, Any],
        target: str,
        connected_servers: List[Any],
        conversation_history: List[Dict[str, str]],
        kb_instance: Any,
        run_agent_func: Any
    ) -> List[Dict[str, Any]]:
        """
        Execute an automated penetration testing workflow.

        Each workflow step is formatted with the target and sent through
        run_agent_func sequentially; results are appended to both the
        returned list and the shared conversation_history.

        Args:
            workflow: The workflow definition (expects 'name', 'description',
                'steps' keys)
            target: The target for the workflow
            connected_servers: List of connected MCP servers
            conversation_history: Conversation history list (mutated in place)
            kb_instance: Knowledge base instance
            run_agent_func: Async function used to run agent queries

        Returns:
            List of workflow results ({'step', 'description', 'output'} dicts)
        """
        available_tools = MCPManager.get_available_tools(connected_servers)
        print(f"\n{Fore.CYAN}Starting Automated Workflow: {workflow['name']}{Style.RESET_ALL}")
        print(f"{Fore.YELLOW}Target: {target}{Style.RESET_ALL}")
        print(f"{Fore.CYAN}Available Tools: {', '.join(available_tools) if available_tools else 'None'}{Style.RESET_ALL}")
        print(f"{Fore.WHITE}Description: {workflow['description']}{Style.RESET_ALL}")
        print(f"{Fore.WHITE}{'='*60}{Style.RESET_ALL}")
        results = []
        for i, step in enumerate(workflow['steps'], 1):
            print(f"\n{Fore.CYAN}Step {i}/{len(workflow['steps'])}{Style.RESET_ALL}")
            # Steps are templates with a {target} placeholder.
            formatted_step = step.format(target=target)
            print(f"{Fore.WHITE}{formatted_step}{Style.RESET_ALL}")
            # Create comprehensive query for this step
            enhanced_query = f"""
TARGET: {target}
STEP: {formatted_step}
Execute this step and provide the results.
"""
            # Execute the step through the agent
            result = await run_agent_func(
                enhanced_query,
                connected_servers,
                history=conversation_history,
                streaming=True,
                kb_instance=kb_instance
            )
            # Steps whose result lacks final_output are silently skipped.
            if result and hasattr(result, "final_output"):
                results.append({
                    "step": i,
                    "description": formatted_step,
                    "output": result.final_output
                })
                # Add to conversation history
                conversation_history.append({
                    "user_query": enhanced_query,
                    "ai_response": result.final_output
                })
                print(f"{Fore.GREEN}Step {i} completed{Style.RESET_ALL}")
            # Brief delay between steps
            await asyncio.sleep(1)
        # Workflow completion summary
        print(f"{Fore.CYAN}Steps executed: {len(results)}/{len(workflow['steps'])}{Style.RESET_ALL}")
        return results

    def get_workflow(self, workflow_key: str) -> Optional[Dict[str, Any]]:
        """Get a workflow by its key (None when lookup fails)."""
        try:
            return get_workflow_by_key(workflow_key)
        except Exception:
            return None

    def get_workflow_list(self) -> List[tuple]:
        """Get list of available workflows ([] when lookup fails)."""
        try:
            return list_workflow_names()
        except Exception:
            return []