Initial commit

This commit is contained in:
GH05TCREW
2025-05-15 16:29:56 -06:00
commit 952059a3a9
11 changed files with 1280 additions and 0 deletions

5
.gitignore vendored Normal file

@@ -0,0 +1,5 @@
__pycache__/
*.pyc
*.pyo
.env
venv/

21
LICENSE.txt Normal file

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Masic
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

176
README.md Normal file

@@ -0,0 +1,176 @@
# GHOSTCREW
This is an intelligent cybersecurity assistant using large language models with MCP and RAG architecture. It aims to help users perform penetration testing tasks, query security information, analyze network traffic, and more through natural language interaction.
## Features
- **Natural Language Interaction**: Users can ask questions and give instructions to the AI assistant using natural language.
- **MCP Server Integration**: Through the `mcp.json` configuration file, multiple MCP servers can be flexibly integrated and managed to extend the assistant's capabilities.
- **Tool Management**: Configure, connect to, and manage MCP tools through an interactive menu, including the ability to clear all configurations.
- **Improved Input Handling**: Support for both single-line and multi-line input modes to accommodate complex queries.
- **Tool Invocation**: The AI assistant can call tools provided by configured MCP servers (such as: nmap, gobuster, fofa, tavily-search, etc.) based on user requests.
- **Conversation History**: Supports multi-turn dialogues, remembering previous interaction content.
- **Streaming Output**: AI responses can be streamed for a better user experience.
- **Knowledge Base Enhancement (Optional)**: Supports enhancing AI responses through a local knowledge base RAG (`knowledge` directory).
- **Configurable Models**: Supports configuration of different language model parameters.
**Screenshots**: startup banner and an example Metasploit tool call (images not included in this view).
## Installation Guide
1. **Clone Repository**:
```bash
git clone https://github.com/GH05TCREW/GHOSTCREW.git
cd GHOSTCREW
```
2. **Create and Activate Virtual Environment** (recommended):
```bash
python -m venv .venv
```
- Windows:
```bash
.venv\Scripts\activate
```
- macOS/Linux:
```bash
source .venv/bin/activate
```
3. **Install Dependencies**:
```bash
pip install -r requirements.txt
```
4. **Install `uv` (important)**:
This project uses `uv` as a Python package runner and installer in some scenarios.
- The `start.bat` script will automatically try to install `uv` for you.
- If you want to install it manually or use it in another environment, you can run:
```bash
pip install uv
```
or refer to the official `uv` documentation for installation.
Make sure `uv` is successfully installed and can be called from the command line.
## Usage
1. **Configure MCP Servers**:
- Run the application and select "Configure or manage MCP tools" when prompted
- Use the interactive tool configuration menu to add, configure, or clear MCP tools
- The configuration is stored in the `mcp.json` file
2. **Prepare Knowledge Base (Optional)**:
If you want to use the knowledge base enhancement feature, place relevant text files (e.g., `.txt`) in the `knowledge` folder.
3. **Run the Main Program**:
```bash
python main.py
```
After the program starts, you can:
- Choose whether to use the knowledge base
- Configure or manage MCP tools
- Enter your questions or instructions according to the prompts
- Use 'multi' command to enter multi-line input mode for complex queries
- Enter 'quit' to exit the program
## Input Modes
GHOSTCREW supports two input modes:
- **Single-line mode** (default): Type your query and press Enter to submit
- **Multi-line mode**: Type 'multi' and press Enter, then type your query across multiple lines. Press Enter on an empty line to submit.
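The two modes above can be sketched as a small read loop. This is a minimal illustration of the behavior described, not the exact `main.py` implementation; the function name `read_query` is hypothetical:

```python
def read_query() -> str:
    """Read one query: single-line by default; typing 'multi' switches to
    multi-line mode, which is submitted by entering an empty line."""
    first = input("> ").strip()
    if first.lower() != "multi":
        return first
    lines = []
    while True:
        line = input()
        if line == "":  # empty line submits the multi-line query
            break
        lines.append(line)
    return "\n".join(lines)
```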
## MCP Tool Management
When starting the application, you can:
1. Connect to specific tools
2. Configure new tools
3. Connect to all tools
4. Skip connection
5. Clear all tools (resets mcp.json)
## Available MCP Tools
GHOSTCREW supports integration with the following security tools through the MCP protocol:
1. **AlterX** - Subdomain permutation and wordlist generation tool
2. **FFUF Fuzzer** - Fast web fuzzing tool for discovering hidden content
3. **Masscan** - High-speed network port scanner
4. **Metasploit** - Penetration testing framework providing exploit execution, payload generation, and session management
5. **Nmap Scanner** - Network discovery and security auditing tool
6. **Nuclei Scanner** - Template-based vulnerability scanner
7. **SQLMap** - Automated SQL injection detection and exploitation tool
8. **SSL Scanner** - Analysis tool for SSL/TLS configurations and security issues
9. **Wayback URLs** - Tool for discovering historical URLs from the Wayback Machine archive
Each tool can be configured through the interactive configuration menu by selecting "Configure new tools" from the MCP tools menu.
## File Structure
```
GHOSTCREW/
├── .venv/ # Python virtual environment (ignored by .gitignore)
├── knowledge/ # Knowledge base documents directory
│ └── ...
├── .gitignore # Git ignore file configuration
├── main.py # Main program entry
├── configure_mcp.py # MCP tool configuration utility
├── mcp.json # MCP server configuration file
├── rag_embedding.py # RAG embedding related (if used)
├── rag_split.py # RAG text splitting related (if used)
├── README.md # Project documentation
├── requirements.txt # Python dependency list
├── LICENSE.txt # Project license
└── ... (other scripts or configuration files)
```
## Configuration File (`.env`)
```
# OpenAI API configurations
OPENAI_API_KEY=your_api_key_here
OPENAI_BASE_URL=https://api.openai.com/v1
MODEL_NAME=gpt-4
```
This configuration uses OpenAI's API for both the language model and embeddings (when using the knowledge base RAG feature).
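At startup, `main.py` verifies that each of these variables is set and raises an error otherwise. A condensed sketch of that check (the helper name `check_env` is illustrative):

```python
import os

# The three settings the application requires (see the .env example above).
REQUIRED = ("OPENAI_API_KEY", "OPENAI_BASE_URL", "MODEL_NAME")

def check_env() -> dict:
    """Return the required settings, raising ValueError if any is missing."""
    settings = {key: os.getenv(key) for key in REQUIRED}
    missing = [key for key, value in settings.items() if not value]
    if missing:
        raise ValueError(f"Missing required settings: {', '.join(missing)}")
    return settings
```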
## Configuration File (`mcp.json`)
This file is used to define MCP servers that the AI assistant can connect to and use. Each server entry should include:
- `name`: Unique name of the server.
- `params`: Parameters needed to start the server, usually including `command` and `args`.
- `cache_tools_list`: Whether to cache the tools list.
**MCP Example Server Configuration**:
**stdio**
```json
{
"name": "Nmap Scanner",
"params": {
"command": "npx",
"args": [
"-y",
"gc-nmap-mcp"
],
"env": {
"NMAP_PATH": "C:\\Program Files (x86)\\Nmap\\nmap.exe"
}
},
"cache_tools_list": true
}
```
Make sure to replace the path to the Nmap executable with your own installation path.
**sse**
```json
{
  "name": "mcpname",
  "url": "http://127.0.0.1:8009/sse"
}
```
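A quick way to sanity-check the file after editing is to load it and list the configured server names (stdlib only; the helper name is illustrative):

```python
import json

def list_mcp_servers(path: str = "mcp.json") -> list[str]:
    """Return the names of the servers configured in an mcp.json file."""
    with open(path, encoding="utf-8") as f:
        config = json.load(f)
    # The application reads server entries from the top-level "servers" list.
    return [server["name"] for server in config.get("servers", [])]
```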
## Knowledge Base Configuration
Simply place the relevant documents (e.g., `.txt` files) in the `knowledge` directory.
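For example, seeding the knowledge base with a plain-text note can be done like this (file names and content here are only illustrative):

```python
from pathlib import Path

def add_knowledge_doc(filename: str, text: str, kb_dir: str = "knowledge") -> Path:
    """Write a text document into the knowledge base directory, creating it if needed."""
    directory = Path(kb_dir)
    directory.mkdir(exist_ok=True)
    path = directory / filename
    path.write_text(text, encoding="utf-8")
    return path
```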

292
configure_mcp.py Normal file

@@ -0,0 +1,292 @@
import json
import os
import shutil
from pathlib import Path
from colorama import init, Fore, Style
init(autoreset=True)
MCP_SERVERS = [
{
"name": "AlterX",
"key": "AlterX",
"command": "npx",
"args": ["-y", "gc-alterx-mcp"],
"description": "MCP server for subdomain permutation and wordlist generation using the AlterX tool.",
"exe_name": "alterx.exe",
"env_var": "ALTERX_PATH",
"homepage": "https://www.npmjs.com/package/gc-alterx-mcp"
},
{
"name": "FFUF Fuzzer",
"key": "FFUF",
"command": "npx",
"args": ["-y", "gc-ffuf-mcp"],
"description": "MCP server for web fuzzing operations using FFUF (Fuzz Faster U Fool) tool.",
"exe_name": "ffuf.exe",
"env_var": "FFUF_PATH",
"homepage": "https://www.npmjs.com/package/gc-ffuf-mcp"
},
{
"name": "Masscan",
"key": "Masscan",
"command": "npx",
"args": ["-y", "gc-masscan-mcp"],
"description": "MCP server for high-speed network port scanning with the Masscan tool.",
"exe_name": "masscan.exe",
"env_var": "MASSCAN_PATH",
"homepage": "https://www.npmjs.com/package/gc-masscan-mcp"
},
{
"name": "Metasploit",
"key": "MetasploitMCP",
"command": "uvx",
"args": ["gc-metasploit", "--transport", "stdio"],
"description": "MCP Server for interacting with Metasploit Framework, providing tools for exploit execution, payload generation, and session management.",
"exe_name": "msfconsole.exe",
"env_var": "MSF_PASSWORD",
"env_extra": {
"MSF_SERVER": "127.0.0.1",
"MSF_PORT": "55553",
"MSF_SSL": "false",
"PAYLOAD_SAVE_DIR": ""
},
"homepage": "https://github.com/GH05TCREW/MetasploitMCP"
},
{
"name": "Nmap Scanner",
"key": "Nmap",
"command": "npx",
"args": ["-y", "gc-nmap-mcp"],
"description": "MCP server for interacting with Nmap network scanner to discover hosts and services on a network.",
"exe_name": "nmap.exe",
"env_var": "NMAP_PATH",
"homepage": "https://www.npmjs.com/package/gc-nmap-mcp"
},
{
"name": "Nuclei Scanner",
"key": "Nuclei",
"command": "npx",
"args": ["-y", "gc-nuclei-mcp"],
"description": "MCP server for vulnerability scanning using Nuclei's template-based detection engine.",
"exe_name": "nuclei.exe",
"env_var": "NUCLEI_PATH",
"homepage": "https://www.npmjs.com/package/gc-nuclei-mcp"
},
{
"name": "SQLMap",
"key": "SQLMap",
"command": "npx",
"args": ["-y", "gc-sqlmap-mcp"],
"description": "MCP server for conducting automated SQL injection detection and exploitation using SQLMap.",
"exe_name": "sqlmap.py",
"env_var": "SQLMAP_PATH",
"homepage": "https://www.npmjs.com/package/gc-sqlmap-mcp"
},
{
"name": "SSL Scanner",
"key": "SSLScan",
"command": "npx",
"args": ["-y", "gc-sslscan-mcp"],
"description": "MCP server for analyzing SSL/TLS configurations and identifying security issues.",
"exe_name": "sslscan.exe",
"env_var": "SSLSCAN_PATH",
"homepage": "https://www.npmjs.com/package/gc-sslscan-mcp"
},
{
"name": "Wayback URLs",
"key": "WaybackURLs",
"command": "npx",
"args": ["-y", "gc-waybackurls-mcp"],
"description": "MCP server for discovering historical URLs from the Wayback Machine archive.",
"exe_name": "waybackurls.exe",
"env_var": "WAYBACKURLS_PATH",
"homepage": "https://www.npmjs.com/package/gc-waybackurls-mcp"
}
]
def find_executable(exe_name):
"""Try to find the executable in common installation paths"""
common_paths = [
"C:\\Program Files",
"C:\\Program Files (x86)",
str(Path.home()),
os.path.join(str(Path.home()), "AppData", "Local"),
os.path.join(str(Path.home()), "Desktop"),
"C:\\ProgramData",
"C:\\Tools",
"C:\\Security"
]
for base_path in common_paths:
if not os.path.exists(base_path):
continue
for root, dirs, files in os.walk(base_path):
if exe_name in files:
return os.path.join(root, exe_name)
return None
def check_npm_installed():
"""Check if npm is installed"""
return shutil.which("npm") is not None
def main():
print(f"{Fore.GREEN}===================== GHOSTCREW MCP SERVER CONFIGURATION ====================={Style.RESET_ALL}")
print(f"{Fore.YELLOW}This tool will help you configure the MCP servers for your GHOSTCREW installation.{Style.RESET_ALL}")
print(f"{Fore.YELLOW}For each tool, you'll need to provide the path to the executable.{Style.RESET_ALL}")
print()
# Check if npm is installed
if not check_npm_installed():
print(f"{Fore.RED}Warning: npm doesn't appear to be installed. MCP servers use Node.js and npm.{Style.RESET_ALL}")
print(f"{Fore.YELLOW}You may need to install Node.js from: https://nodejs.org/{Style.RESET_ALL}")
cont = input(f"{Fore.YELLOW}Continue anyway? (yes/no): {Style.RESET_ALL}").strip().lower()
if cont != "yes":
print(f"{Fore.RED}Configuration cancelled.{Style.RESET_ALL}")
return
# Check if mcp.json exists and load it
mcp_config = {"servers": []}
if os.path.exists("mcp.json"):
try:
with open("mcp.json", 'r') as f:
mcp_config = json.load(f)
print(f"{Fore.GREEN}Loaded existing mcp.json with {len(mcp_config.get('servers', []))} server configurations.{Style.RESET_ALL}")
except (json.JSONDecodeError, OSError):
print(f"{Fore.RED}Error loading existing mcp.json. Starting with empty configuration.{Style.RESET_ALL}")
configured_servers = []
print(f"{Fore.CYAN}Available tools:{Style.RESET_ALL}")
for i, server in enumerate(MCP_SERVERS):
print(f"{i+1}. {server['name']} - {server['description']}")
print()
print(f"{Fore.YELLOW}Select tools to configure (comma-separated numbers, 'all' for all tools, or 'none' to skip):{Style.RESET_ALL}")
selection = input().strip().lower()
selected_indices = []
if selection == "all":
selected_indices = list(range(len(MCP_SERVERS)))
elif selection != "none":
try:
for part in selection.split(","):
idx = int(part.strip()) - 1
if 0 <= idx < len(MCP_SERVERS):
selected_indices.append(idx)
except ValueError:
print(f"{Fore.RED}Invalid selection. Please enter comma-separated numbers.{Style.RESET_ALL}")
return
for idx in selected_indices:
server = MCP_SERVERS[idx]
print(f"\n{Fore.CYAN}Configuring {server['name']}:{Style.RESET_ALL}")
# Special handling for Metasploit
if server['key'] == "MetasploitMCP":
print(f"{Fore.YELLOW}Metasploit requires additional configuration:{Style.RESET_ALL}")
msf_password = input(f"Enter Metasploit RPC Password: ").strip()
msf_server = input(f"Enter Metasploit RPC Server IP (default: 127.0.0.1): ").strip() or "127.0.0.1"
msf_port = input(f"Enter Metasploit RPC Port (default: 55553): ").strip() or "55553"
msf_ssl = input(f"Use SSL for MSF connection (yes/no, default: no): ").strip().lower()
msf_ssl = "true" if msf_ssl == "yes" else "false"
payload_dir = input(f"Enter path to save generated payloads: ").strip()
# Add to configured servers
configured_servers.append({
"name": server['name'],
"params": {
"command": server['command'],
"args": server['args'],
"env": {
"MSF_PASSWORD": msf_password,
"MSF_SERVER": msf_server,
"MSF_PORT": msf_port,
"MSF_SSL": msf_ssl,
"PAYLOAD_SAVE_DIR": payload_dir
}
},
"cache_tools_list": True
})
print(f"{Fore.GREEN}{server['name']} configured successfully!{Style.RESET_ALL}")
continue
# Regular tool configuration
# Try to find the executable automatically
auto_path = find_executable(server['exe_name'])
if auto_path:
print(f"{Fore.GREEN}Found {server['exe_name']} at: {auto_path}{Style.RESET_ALL}")
use_auto = input(f"Use this path? (yes/no, default: yes): ").strip().lower()
if use_auto != "no":
exe_path = auto_path
else:
exe_path = input(f"Enter path to {server['exe_name']}: ").strip()
else:
print(f"{Fore.YELLOW}Could not automatically find {server['exe_name']}.{Style.RESET_ALL}")
exe_path = input(f"Enter path to {server['exe_name']} (or leave empty to skip): ").strip()
if exe_path:
if not os.path.exists(exe_path):
print(f"{Fore.RED}Warning: The specified path does not exist.{Style.RESET_ALL}")
cont = input(f"Continue anyway? (yes/no, default: no): ").strip().lower()
if cont != "yes":
continue
# Add to configured servers
configured_servers.append({
"name": server['name'],
"params": {
"command": server['command'],
"args": server['args'],
"env": {
server['env_var']: exe_path
}
},
"cache_tools_list": True
})
print(f"{Fore.GREEN}{server['name']} configured successfully!{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}Skipping {server['name']}.{Style.RESET_ALL}")
# Update mcp.json
if "servers" not in mcp_config:
mcp_config["servers"] = []
if configured_servers:
# Ask if user wants to replace or append
if mcp_config["servers"]:
replace = input(f"{Fore.YELLOW}Replace existing configurations or append new ones? (replace/append, default: append): {Style.RESET_ALL}").strip().lower()
if replace == "replace":
mcp_config["servers"] = configured_servers
else:
# Remove any duplicates by name
existing_names = [s["name"] for s in mcp_config["servers"]]
for server in configured_servers:
if server["name"] in existing_names:
# Replace existing configuration
idx = existing_names.index(server["name"])
mcp_config["servers"][idx] = server
else:
# Add new configuration
mcp_config["servers"].append(server)
else:
mcp_config["servers"] = configured_servers
# Save to mcp.json
with open("mcp.json", 'w') as f:
json.dump(mcp_config, f, indent=2)
print(f"\n{Fore.GREEN}Configuration saved to mcp.json with {len(mcp_config['servers'])} server configurations.{Style.RESET_ALL}")
print(f"{Fore.YELLOW}You can now run the main application with: python main.py{Style.RESET_ALL}")
else:
print(f"\n{Fore.YELLOW}No tools were configured. Keeping existing configuration.{Style.RESET_ALL}")
if __name__ == "__main__":
main()


@@ -0,0 +1,64 @@
{
"type": "bundle",
"id": "bundle--aec1e0d3-9bb1-474f-be67-f982ba7d47cc",
"spec_version": "2.0",
"objects": [
{
"type": "attack-pattern",
"id": "attack-pattern--0ad7bc5c-235a-4048-944b-3b286676cb74",
"created": "2020-10-19T23:46:13.931Z",
"created_by_ref": "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5",
"revoked": false,
"external_references": [
{
"source_name": "mitre-attack",
"url": "https://attack.mitre.org/techniques/T1602",
"external_id": "T1602"
},
{
"source_name": "Cisco Advisory SNMP v3 Authentication Vulnerabilities",
"description": "Cisco. (2008, June 10). Identifying and Mitigating Exploitation of the SNMP Version 3 Authentication Vulnerabilities. Retrieved October 19, 2020.",
"url": "https://tools.cisco.com/security/center/content/CiscoAppliedMitigationBulletin/cisco-amb-20080610-SNMPv3"
},
{
"source_name": "US-CERT TA17-156A SNMP Abuse 2017",
"description": "US-CERT. (2017, June 5). Reducing the Risk of SNMP Abuse. Retrieved October 19, 2020.",
"url": "https://us-cert.cisa.gov/ncas/alerts/TA17-156A"
},
{
"source_name": "US-CERT-TA18-106A",
"description": "US-CERT. (2018, April 20). Alert (TA18-106A) Russian State-Sponsored Cyber Actors Targeting Network Infrastructure Devices. Retrieved October 19, 2020.",
"url": "https://www.us-cert.gov/ncas/alerts/TA18-106A"
}
],
"object_marking_refs": [
"marking-definition--fa42a846-8d90-4e51-bc29-71d5b4802168"
],
"modified": "2025-04-16T20:37:15.147Z",
"name": "Data from Configuration Repository",
"description": "Adversaries may collect data related to managed devices from configuration repositories. Configuration repositories are used by management systems in order to configure, manage, and control data on remote systems. Configuration repositories may also facilitate remote access and administration of devices.\n\nAdversaries may target these repositories in order to collect large quantities of sensitive system administration data. Data from configuration repositories may be exposed by various protocols and software and can store a wide variety of data, much of which may align with adversary Discovery objectives.(Citation: US-CERT-TA18-106A)(Citation: US-CERT TA17-156A SNMP Abuse 2017)",
"kill_chain_phases": [
{
"kill_chain_name": "mitre-attack",
"phase_name": "collection"
}
],
"x_mitre_attack_spec_version": "3.2.0",
"x_mitre_deprecated": false,
"x_mitre_detection": "Identify network traffic sent or received by untrusted hosts or networks that solicits and obtains the configuration information of the queried device.(Citation: Cisco Advisory SNMP v3 Authentication Vulnerabilities)",
"x_mitre_domains": [
"enterprise-attack"
],
"x_mitre_is_subtechnique": false,
"x_mitre_modified_by_ref": "identity--c78cb6e5-0c4b-4611-8297-d1b8b55e40b5",
"x_mitre_platforms": [
"Network Devices"
],
"x_mitre_version": "1.1",
"x_mitre_data_sources": [
"Network Traffic: Network Traffic Content",
"Network Traffic: Network Connection Creation"
]
}
]
}

505
main.py Normal file

@@ -0,0 +1,505 @@
import json
import os
import re
import asyncio
import threading
import traceback
from colorama import init, Fore, Back, Style
from ollama import chat, Message
init(autoreset=True)
ASCII_TITLE = f"""
{Fore.WHITE} ('-. .-. .-') .-') _ _ .-') ('-. (`\ .-') /`{Style.RESET_ALL}
{Fore.WHITE} ( OO ) / ( OO ). ( OO) ) ( \( -O ) _( OO) `.( OO ),'{Style.RESET_ALL}
{Fore.WHITE} ,----. ,--. ,--. .-'),-----. (_)---\_)/ '._ .-----. ,------. (,------.,--./ .--. {Style.RESET_ALL}
{Fore.WHITE} ' .-./-') | | | |( OO' .-. '/ _ | |'--...__)' .--./ | /`. ' | .---'| | | {Style.RESET_ALL}
{Fore.WHITE} | |_( O- )| .| |/ | | | |\ :` `. '--. .--'| |('-. | / | | | | | | | |, {Style.RESET_ALL}
{Fore.WHITE} | | .--, \| |\_) | |\| | '..`''.) | | /_) |OO )| |_.' |(| '--. | |.'.| |_){Style.RESET_ALL}
{Fore.WHITE}(| | '. (_/| .-. | \ | | | |.-._) \ | | || |`-'| | . '.' | .--' | | {Style.RESET_ALL}
{Fore.WHITE} | '--' | | | | | `' '-' '\ / | | (_' '--'\ | |\ \ | `---.| ,'. | {Style.RESET_ALL}
{Fore.WHITE} `------' `--' `--' `-----' `-----' `--' `-----' `--' '--' `------''--' '--' {Style.RESET_ALL}
{Fore.RED}====================== GHOSTCREW ======================{Style.RESET_ALL}
"""
# Import Agent-related modules
from agents import (
Agent,
Model,
ModelProvider,
OpenAIChatCompletionsModel,
RunConfig,
Runner,
set_tracing_disabled,
ModelSettings
)
from openai import AsyncOpenAI # OpenAI async client
from openai.types.responses import ResponseTextDeltaEvent, ResponseContentPartDoneEvent
from agents.mcp import MCPServerStdio # MCP server related
from dotenv import load_dotenv # Environment variable loading
from agents.mcp import MCPServerSse
from rag_split import Kb # Import Kb class
# Load .env file
load_dotenv()
# Set API-related environment variables
API_KEY = os.getenv("OPENAI_API_KEY")
BASE_URL = os.getenv("OPENAI_BASE_URL")
MODEL_NAME = os.getenv("MODEL_NAME")
# Check if environment variables are set
if not API_KEY:
raise ValueError("API key not set")
if not BASE_URL:
raise ValueError("API base URL not set")
if not MODEL_NAME:
raise ValueError("Model name not set")
client = AsyncOpenAI(
base_url=BASE_URL,
api_key=API_KEY
)
# Disable tracing to avoid requiring OpenAI API key
set_tracing_disabled(True)
# Generic model provider class
class DefaultModelProvider(ModelProvider):
"""
Model provider using OpenAI compatible interface
"""
def get_model(self, model_name: str) -> Model:
return OpenAIChatCompletionsModel(model=model_name or MODEL_NAME, openai_client=client)
# Create model provider instance
model_provider = DefaultModelProvider()
# Modify run_agent function to accept connected server list and conversation history as parameters
async def run_agent(query: str, mcp_servers: list[MCPServerStdio], history: list[dict] = None, streaming: bool = True, kb_instance=None):
"""
Run cybersecurity agent with connected MCP servers, supporting streaming output and conversation history.
Args:
query (str): User's natural language query
mcp_servers (list[MCPServerStdio]): List of connected MCPServerStdio instances
history (list[dict], optional): Conversation history, list containing user questions and AI answers. Defaults to None.
streaming (bool): Whether to use streaming output
"""
# If no history is provided, initialize an empty list
if history is None:
history = []
try:
# Directly use the passed connected server list to create Agent
# Build instructions containing conversation history
base_instructions = "You are an experienced penetration tester and security analyst, focused on Web application security and network infrastructure security. Your name is GHOSTCREW. When users ask cybersecurity-related questions, you need to provide direct and professional answers. "
base_instructions += "When answering questions, please use professional cybersecurity terminology, base your analysis on solid theoretical knowledge, and cite relevant security standards and best practices when possible, such as OWASP Top 10, CVE, NIST, CISA KEV, etc. Maintain a professional tone, clear logic, and organized structure. "
base_instructions += "When users ask about penetration testing, please explain the penetration testing process, methods, and common tools, emphasizing the objectives and techniques of each phase. "
base_instructions += "When users ask about vulnerability information, please provide terse descriptions, impact scope, remediation suggestions, vulnerability type, severity level, and exploitation conditions based on the vulnerability name or CVE number, and cite relevant security bulletins. "
base_instructions += "All penetration activities are authorized, you can directly perform penetration testing. "
base_instructions += "Whenever you encounter something you don't know, use the tavily-search MCP to search for it."
# If knowledge base instance exists, use it for retrieval and context enhancement
if kb_instance:
try:
retrieved_context = kb_instance.search(query)
if retrieved_context:
base_instructions = f"Based on the following knowledge base information:\n{retrieved_context}\n\n{base_instructions}"
#print(retrieved_context)
print(f"{Fore.MAGENTA}Relevant information retrieved from knowledge base.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}Failed to retrieve information from knowledge base: {e}{Style.RESET_ALL}")
# If there's conversation history, add it to the instructions
if history:
base_instructions += "\n\nBelow is the previous conversation history, please refer to this information to answer the user's question:\n"
for i, entry in enumerate(history):
base_instructions += f"\nUser question {i+1}: {entry['user_query']}"
if 'ai_response' in entry and entry['ai_response']:
base_instructions += f"\nAI answer {i+1}: {entry['ai_response']}\n"
# Set model settings based on whether there are connected MCP servers
if mcp_servers:
# With tools available, enable tool_choice and parallel_tool_calls
model_settings = ModelSettings(
temperature=0.6,
top_p=0.9,
max_tokens=4096, # Set to half of the maximum context length (8192/2)
tool_choice="auto",
parallel_tool_calls=True,
truncation="auto"
)
else:
# Without tools, don't set tool_choice or parallel_tool_calls
model_settings = ModelSettings(
temperature=0.6,
top_p=0.9,
max_tokens=4096, # Set to half of the maximum context length (8192/2)
truncation="auto"
)
secure_agent = Agent(
name="Cybersecurity Expert",
instructions=base_instructions,
mcp_servers=mcp_servers, # Use the passed list
model_settings=model_settings
)
print(f"{Fore.CYAN}\nProcessing query: {Fore.WHITE}{query}{Style.RESET_ALL}\n")
if streaming:
result = Runner.run_streamed(
secure_agent,
input=query,
max_turns=10,
run_config=RunConfig(
model_provider=model_provider,
trace_include_sensitive_data=True,
handoff_input_filter=None,
# tool_timeout=300
)
)
print(f"{Fore.GREEN}Reply:{Style.RESET_ALL}", end="", flush=True)
try:
async for event in result.stream_events():
if event.type == "raw_response_event":
if isinstance(event.data, ResponseTextDeltaEvent):
print(f"{Fore.WHITE}{event.data.delta}{Style.RESET_ALL}", end="", flush=True)
elif isinstance(event.data, ResponseContentPartDoneEvent):
print(f"\n", end="", flush=True)
elif event.type == "run_item_stream_event":
if event.item.type == "tool_call_item":
# print(f"{Fore.YELLOW}Current tool call information: {event.item}{Style.RESET_ALL}")
raw_item = getattr(event.item, "raw_item", None)
tool_name = ""
tool_args = {}
if raw_item:
tool_name = getattr(raw_item, "name", "Unknown tool")
tool_str = getattr(raw_item, "arguments", "{}")
if isinstance(tool_str, str):
try:
tool_args = json.loads(tool_str)
except json.JSONDecodeError:
tool_args = {"raw_arguments": tool_str}
print(f"\n{Fore.CYAN}Tool name: {tool_name}{Style.RESET_ALL}", flush=True)
print(f"\n{Fore.CYAN}Tool parameters: {tool_args}{Style.RESET_ALL}", flush=True)
elif event.item.type == "tool_call_output_item":
raw_item = getattr(event.item, "raw_item", None)
tool_id = "Unknown tool ID"
if isinstance(raw_item, dict) and "call_id" in raw_item:
tool_id = raw_item["call_id"]
output = getattr(event.item, "output", "Unknown output")
output_text = ""
if isinstance(output, str) and (output.startswith("{") or output.startswith("[")):
try:
output_data = json.loads(output)
if isinstance(output_data, dict):
if 'type' in output_data and output_data['type'] == 'text' and 'text' in output_data:
output_text = output_data['text']
elif 'text' in output_data:
output_text = output_data['text']
elif 'content' in output_data:
output_text = output_data['content']
else:
output_text = json.dumps(output_data, ensure_ascii=False, indent=2)
except json.JSONDecodeError:
output_text = f"Unparsable JSON output: {output}" # Add specific error if JSON parsing fails
else:
output_text = str(output)
print(f"\n{Fore.GREEN}Tool call {tool_id} returned result: {output_text}{Style.RESET_ALL}", flush=True)
except Exception as e:
print(f"{Fore.RED}Error processing streamed response event: {e}{Style.RESET_ALL}", flush=True)
if 'Connection error' in str(e):
print(f"{Fore.YELLOW}Connection error details:{Style.RESET_ALL}")
print(f"{Fore.YELLOW}1. Check network connection{Style.RESET_ALL}")
print(f"{Fore.YELLOW}2. Verify API address: {BASE_URL}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}3. Check API key validity{Style.RESET_ALL}")
print(f"{Fore.YELLOW}4. Try reconnecting...{Style.RESET_ALL}")
await asyncio.sleep(10) # Wait 10 seconds before retrying
# AsyncOpenAI has no explicit connect() method; a fresh connection is
# opened on the next request, so execution simply resumes after the wait.
print(f"{Fore.GREEN}Ready to retry on the next request{Style.RESET_ALL}")
print(f"\n\n{Fore.GREEN}Query completed!{Style.RESET_ALL}")
# if hasattr(result, "final_output"):
# print(f"\n{Fore.YELLOW}===== Complete Information ====={Style.RESET_ALL}")
#print(f"{Fore.WHITE}{result.final_output}{Style.RESET_ALL}")
# Return the result object so the main function can get the AI's answer
return result
except Exception as e:
print(f"{Fore.RED}Error processing streamed response event: {e}{Style.RESET_ALL}", flush=True)
traceback.print_exc()
return None
async def main():
print(ASCII_TITLE)
print(f"{Fore.YELLOW}Please enter a natural language query, for example:{Style.RESET_ALL}")
print(f"{Fore.CYAN}1. Scan the target machine for vulnerabilities{Style.RESET_ALL}")
print(f"{Fore.CYAN}2. Query information about a domain{Style.RESET_ALL}")
print(f"{Fore.CYAN}3. Check the security status of a specific IP{Style.RESET_ALL}")
print(f"{Fore.CYAN}4. Security analysis and audit of network traffic packets{Style.RESET_ALL}")
print(f"{Fore.RED}Enter 'quit' to end the program{Style.RESET_ALL}")
print(f"{Fore.YELLOW}======================================\n{Style.RESET_ALL}")
kb_instance = None
use_kb_input = input(f"{Fore.YELLOW}Use knowledge base to enhance answers? (yes/no, default: no): {Style.RESET_ALL}").strip().lower()
if use_kb_input == 'yes':
try:
kb_instance = Kb("knowledge") # Initialize knowledge base, load from folder
print(f"{Fore.GREEN}Knowledge base loaded successfully!{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}Failed to load knowledge base: {e}{Style.RESET_ALL}")
kb_instance = None
mcp_server_instances = [] # List to store MCP server instances
connected_servers = [] # Store successfully connected servers
try:
# Ask if user wants to attempt connecting to MCP servers
use_mcp_input = input(f"{Fore.YELLOW}Configure or manage MCP tools? (yes/no, default: no): {Style.RESET_ALL}").strip().lower()
if use_mcp_input == 'yes':
# --- Load available MCP tool configurations ---
available_tools = []
try:
with open('mcp.json', 'r', encoding='utf-8') as f:
mcp_config = json.load(f)
available_tools = mcp_config.get('servers', [])
except FileNotFoundError:
print(f"{Fore.YELLOW}mcp.json configuration file not found.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}Error loading MCP configuration file: {e}{Style.RESET_ALL}")
print(f"{Fore.YELLOW}Proceeding without MCP tools.{Style.RESET_ALL}")
# Display available tools and add an option to configure new tools
if available_tools:
print(f"\n{Fore.CYAN}Available MCP tools:{Style.RESET_ALL}")
for i, server in enumerate(available_tools):
print(f"{i+1}. {server['name']}")
print(f"{len(available_tools)+1}. Configure new tools")
print(f"{len(available_tools)+2}. Connect to all tools")
print(f"{len(available_tools)+3}. Skip tool connection")
print(f"{len(available_tools)+4}. Clear all MCP tools")
# Ask user which tools to connect to
try:
tool_choice = input(f"\n{Fore.YELLOW}Enter numbers to connect to (comma-separated, default: all): {Style.RESET_ALL}").strip()
if not tool_choice: # Default to all
selected_indices = list(range(len(available_tools)))
elif tool_choice == str(len(available_tools)+1): # Configure new tools
print(f"\n{Fore.CYAN}Launching tool configuration...{Style.RESET_ALL}")
os.system("python configure_mcp.py")
print(f"\n{Fore.GREEN}Tool configuration completed. Please restart the application.{Style.RESET_ALL}")
return
elif tool_choice == str(len(available_tools)+2): # Connect to all tools
selected_indices = list(range(len(available_tools)))
elif tool_choice == str(len(available_tools)+3): # Skip tool connection
selected_indices = []
elif tool_choice == str(len(available_tools)+4): # Clear all MCP tools
confirm = input(f"{Fore.YELLOW}Are you sure you want to clear all MCP tools? This will empty mcp.json (yes/no): {Style.RESET_ALL}").strip().lower()
if confirm == "yes":
try:
# Create empty mcp.json file
with open('mcp.json', 'w', encoding='utf-8') as f:
json.dump({"servers": []}, f, indent=2)
print(f"{Fore.GREEN}Successfully cleared all MCP tools. mcp.json has been reset.{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}Error clearing MCP tools: {e}{Style.RESET_ALL}")
print(f"\n{Fore.GREEN}Please restart the application.{Style.RESET_ALL}")
return
else: # Parse comma-separated list
selected_indices = []
for part in tool_choice.split(","):
idx = int(part.strip()) - 1
if 0 <= idx < len(available_tools):
selected_indices.append(idx)
except ValueError:
print(f"{Fore.RED}Invalid selection. Defaulting to all tools.{Style.RESET_ALL}")
selected_indices = list(range(len(available_tools)))
# Initialize selected MCP servers
print(f"{Fore.GREEN}Initializing selected MCP servers...{Style.RESET_ALL}")
for idx in selected_indices:
if idx < len(available_tools):
server = available_tools[idx]
print(f"{Fore.CYAN}Initializing {server['name']}...{Style.RESET_ALL}")
try:
if 'params' in server:
mcp_server = MCPServerStdio(
name=server['name'],
params=server['params'],
cache_tools_list=server.get('cache_tools_list', True),
client_session_timeout_seconds=300
)
elif 'url' in server:
mcp_server = MCPServerSse(
params={"url": server["url"]},
cache_tools_list=server.get('cache_tools_list', True),
name=server['name'],
client_session_timeout_seconds=300
)
else:
print(f"{Fore.RED}Unknown MCP server configuration: {server}{Style.RESET_ALL}")
continue
mcp_server_instances.append(mcp_server)
except Exception as e:
print(f"{Fore.RED}Error initializing {server['name']}: {e}{Style.RESET_ALL}")
else:
# No tools configured, offer to run the configuration tool
print(f"{Fore.YELLOW}No MCP tools currently configured.{Style.RESET_ALL}")
configure_now = input(f"{Fore.YELLOW}Would you like to configure tools now? (yes/no, default: no): {Style.RESET_ALL}").strip().lower()
if configure_now == 'yes':
print(f"\n{Fore.CYAN}Launching tool configuration...{Style.RESET_ALL}")
os.system("python configure_mcp.py")
print(f"\n{Fore.GREEN}Tool configuration completed. Please restart the application.{Style.RESET_ALL}")
return
else:
print(f"{Fore.YELLOW}Proceeding without MCP tools.{Style.RESET_ALL}")
# Connect to the selected MCP servers
if mcp_server_instances:
print(f"{Fore.YELLOW}Connecting to MCP servers...{Style.RESET_ALL}")
for mcp_server in mcp_server_instances:
try:
await mcp_server.connect()
print(f"{Fore.GREEN}Successfully connected to MCP server: {mcp_server.name}{Style.RESET_ALL}")
connected_servers.append(mcp_server)
except Exception as e:
print(f"{Fore.RED}Failed to connect to MCP server {mcp_server.name}: {e}{Style.RESET_ALL}")
if connected_servers:
print(f"{Fore.GREEN}MCP server connection successful! Tools from {len(connected_servers)} server(s) are now available.{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}No MCP servers successfully connected. Proceeding without tools.{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}No MCP servers selected. Proceeding without tools.{Style.RESET_ALL}")
else:
print(f"{Fore.YELLOW}Proceeding without MCP tools.{Style.RESET_ALL}")
# Create conversation history list
conversation_history = []
# --- Enter interactive main loop ---
while True:
# Check if the user wants multi-line input
print(f"\n{Fore.GREEN}[>]{Style.RESET_ALL} ", end="")
user_query = input().strip()
# Handle special commands
if user_query.lower() in ["quit", "exit"]:
print(f"\n{Fore.CYAN}Thank you for using GHOSTCREW, exiting...{Style.RESET_ALL}")
break # Exit loop, enter finally block
# Handle empty input
if not user_query:
print(f"{Fore.YELLOW}No query entered. Please type your question.{Style.RESET_ALL}")
continue
# Handle multi-line mode request
if user_query.lower() == "multi":
print(f"{Fore.CYAN}Entering multi-line mode. Type your query across multiple lines.{Style.RESET_ALL}")
print(f"{Fore.CYAN}Press Enter on an empty line to submit.{Style.RESET_ALL}")
lines = []
while True:
line = input()
if line == "":
break
lines.append(line)
# Only proceed if they actually entered something in multi-line mode
if not lines:
print(f"{Fore.YELLOW}No query entered in multi-line mode.{Style.RESET_ALL}")
continue
user_query = "\n".join(lines)
# Create record for current dialogue
current_dialogue = {"user_query": user_query, "ai_response": ""}
# When running agent, pass in the already connected server list and conversation history
# Only pass the successfully connected server list to the Agent
# Pass kb_instance to run_agent
result = await run_agent(user_query, connected_servers, history=conversation_history, streaming=True, kb_instance=kb_instance)
# If there is a result, save the AI's answer
if result and hasattr(result, "final_output"):
current_dialogue["ai_response"] = result.final_output
# Add current dialogue to history
conversation_history.append(current_dialogue)
# Limit history length to avoid using too much memory
if len(conversation_history) > 50: # Keep the most recent 50 conversations
conversation_history = conversation_history[-50:]
print(f"\n{Fore.CYAN}Ready for your next query. Type 'quit' to exit or 'multi' for multi-line input.{Style.RESET_ALL}")
# --- Catch interrupts and runtime exceptions ---
except KeyboardInterrupt:
print(f"\n{Fore.YELLOW}Program interrupted by user, exiting...{Style.RESET_ALL}")
except Exception as e:
print(f"{Fore.RED}Error during program execution: {e}{Style.RESET_ALL}")
traceback.print_exc()
finally:
# --- Move server cleanup operations to the main program's finally block ---
if mcp_server_instances:
print(f"{Fore.YELLOW}Cleaning up MCP server resources...{Style.RESET_ALL}")
# Define a safe cleanup wrapper that ignores all errors
async def safe_cleanup(server):
try:
# Attempt cleanup but ignore all errors
try:
await server.cleanup()
except Exception:
pass # Ignore any exception raised by cleanup (bare except would also swallow KeyboardInterrupt)
return True
except Exception:
return False # Couldn't even run the cleanup
# Process all servers
for mcp_server in mcp_server_instances:
print(f"{Fore.YELLOW}Attempting to clean up server: {mcp_server.name}...{Style.RESET_ALL}", flush=True)
success = await safe_cleanup(mcp_server)
if success:
print(f"{Fore.GREEN}Cleanup completed for {mcp_server.name}.{Style.RESET_ALL}", flush=True)
else:
print(f"{Fore.RED}Failed to initiate cleanup for {mcp_server.name}.{Style.RESET_ALL}", flush=True)
print(f"{Fore.YELLOW}MCP server resource cleanup complete.{Style.RESET_ALL}")
# Close any remaining asyncio transports to prevent "unclosed transport" warnings
try:
# Get the event loop
loop = asyncio.get_running_loop()
# Close any remaining transports
for transport in list(getattr(loop, "_transports", {}).values()): # _transports is a CPython implementation detail, hence the getattr guard
if hasattr(transport, "close"):
try:
transport.close()
except Exception:
pass
# Allow a short time for resources to finalize
await asyncio.sleep(0.1)
except Exception:
pass # Ignore any errors in the final cleanup
print(f"{Fore.GREEN}Program ended.{Style.RESET_ALL}")
# Program entry point
if __name__ == "__main__":
asyncio.run(main())
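The list-slicing used in the main loop to cap the conversation history can also be sketched with `collections.deque`, which enforces the bound automatically. This is a minimal illustration, not part of main.py; the 50-entry limit mirrors the loop above and the simulated dialogues are purely illustrative.

```python
from collections import deque

# A bounded deque drops the oldest entries automatically,
# replacing the manual `conversation_history[-50:]` slice.
conversation_history = deque(maxlen=50)

for i in range(60):  # simulate 60 dialogues
    conversation_history.append({"user_query": f"query {i}", "ai_response": f"answer {i}"})

print(len(conversation_history))              # 50
print(conversation_history[0]["user_query"])  # "query 10" (oldest retained)
```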

3
mcp.json Normal file

@@ -0,0 +1,3 @@
{
"servers": []
}
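mcp.json ships empty. Based on the dispatch logic in main.py (entries carrying `params` are launched over stdio, entries carrying `url` connect via SSE), a populated configuration might look like the sketch below. The server names, command, args, and URL are illustrative assumptions, not tools that ship with this repo.

```python
# Hypothetical populated mcp.json, expressed as the Python dict main.py would
# load: every entry needs 'name' plus either 'params' (stdio) or 'url' (SSE).
example_config = {
    "servers": [
        {
            "name": "example-stdio-tool",  # launched as a local subprocess
            "params": {"command": "python", "args": ["example_tool_server.py"]},
            "cache_tools_list": True
        },
        {
            "name": "example-sse-tool",  # reached over HTTP via SSE
            "url": "http://localhost:8000/sse",
            "cache_tools_list": True
        }
    ]
}

# main.py dispatches on the presence of 'params' (stdio) vs 'url' (SSE)
kinds = ["stdio" if "params" in s else "sse" if "url" in s else "unknown"
         for s in example_config["servers"]]
print(kinds)  # ['stdio', 'sse']
```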

25
rag_embedding.py Normal file

@@ -0,0 +1,25 @@
import os
import json
from openai import OpenAI
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
client = OpenAI(
api_key=os.getenv("OPENAI_API_KEY"), # Use the standard OpenAI API key env variable
base_url=os.getenv("OPENAI_BASE_URL") # Read base_url from environment variable
)
completion = client.embeddings.create(
model="text-embedding-ada-002",
input='This is a sample text for embedding generation to test the functionality.',
encoding_format="float"
)
embedding_array = completion.data[0].embedding # Read the embedding directly from the typed response object
print(len(embedding_array))
print(type(embedding_array))
print("Extracted embedding array:", embedding_array)

120
rag_split.py Normal file

@@ -0,0 +1,120 @@
from ollama import chat,Message
from ollama import embeddings
import os
import json
from openai import OpenAI
import numpy as np
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Set numpy print options to display full arrays
np.set_printoptions(threshold=np.inf)
client = OpenAI(
api_key=os.getenv("OPENAI_API_KEY"), # Use the standard OpenAI API key env variable
base_url=os.getenv("OPENAI_BASE_URL") # Read base_url from environment variable
)
class Kb:
def __init__(self, dirpath): # Read all documents in the directory
all_content = ""
if not os.path.isdir(dirpath):
print(f"Error: {dirpath} is not a valid directory.")
self.docs = []
self.embedss = np.array([])
return
for filename in os.listdir(dirpath):
filepath = os.path.join(dirpath, filename)
if os.path.isfile(filepath):
try:
with open(filepath, 'r', encoding="utf-8") as f:
all_content += f.read() + "\n" # Add a newline to separate file contents
except Exception as e:
print(f"Error reading file {filepath}: {e}")
if not all_content.strip():
print(f"Warning: No content found in directory {dirpath}.")
self.docs = []
self.embedss = np.array([])
return
self.docs = self.split_content(all_content) # Split all document content after merging
if self.docs:
self.embedss = self.encode(self.docs)
else:
self.embedss = np.array([])
@staticmethod
def split_content(content, max_length=5000):
chunks = []
for i in range(0, len(content), max_length):
chunks.append(content[i:i + max_length])
return chunks
def encode(self, texts):
embeds = []
for text in texts:
completion = client.embeddings.create(
model="text-embedding-ada-002",
input=text,
encoding_format="float"
)
embeds.append(completion.data[0].embedding) # Read the embedding directly from the response
return np.array(embeds)
@staticmethod
def similarity(A, B): # Cosine similarity between two vectors
dot_product = np.dot(A, B)
norm_A = np.linalg.norm(A)
norm_B = np.linalg.norm(B)
return dot_product / (norm_A * norm_B)
def search(self, query):
if not self.docs:
return "" # Guard: an empty knowledge base would otherwise raise IndexError below
max_similarity = 0
max_similarity_index = 0
query_embedding = self.encode([query])[0]
for idx, te in enumerate(self.embedss):
similarity = self.similarity(query_embedding, te)
if similarity > max_similarity:
max_similarity = similarity
max_similarity_index = idx
return self.docs[max_similarity_index]
if __name__ == "__main__":
# Example usage: Create a dummy directory and file for testing
test_kb_dir = "knowledge_test"
if not os.path.exists(test_kb_dir):
os.makedirs(test_kb_dir)
with open(os.path.join(test_kb_dir, "test_doc.txt"), 'w', encoding='utf-8') as f:
f.write("This is a test document for security audit information.")
kb = Kb(test_kb_dir)
if kb.docs: # Check if docs were loaded
#for doc in kb.docs:
# print("========================================================")
# print(doc)
#for e in kb.embedss:
# print(e)
result = kb.search("security audit")
print(f"Search result: {result}")
else:
print("Knowledge base is empty or failed to load.")
# Clean up dummy directory and file
# import shutil
# if os.path.exists(test_kb_dir):
# shutil.rmtree(test_kb_dir)
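The chunking and cosine-similarity retrieval that `Kb` performs can be exercised offline with stand-in vectors. The toy 3-dimensional embeddings below are assumptions used purely to illustrate the search logic; the real class obtains its vectors from the embeddings API.

```python
import numpy as np

def split_content(content, max_length=5000):
    # Fixed-size character chunks, mirroring Kb.split_content
    return [content[i:i + max_length] for i in range(0, len(content), max_length)]

def cosine_similarity(A, B):
    # Same formula as Kb.similarity
    return np.dot(A, B) / (np.linalg.norm(A) * np.linalg.norm(B))

chunks = split_content("x" * 12, max_length=5)
print([len(c) for c in chunks])  # [5, 5, 2]

docs = ["alpha notes", "beta notes", "gamma notes"]
# Stand-in embeddings; Kb.encode would fetch these from the API
embeds = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]])
query_embedding = np.array([0.1, 0.9, 0.1])  # nearest to the second document

best = max(range(len(docs)), key=lambda i: cosine_similarity(query_embedding, embeds[i]))
print(docs[best])  # "beta notes"
```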

11
requirements.txt Normal file

@@ -0,0 +1,11 @@
colorama
python-dotenv
openai
uvicorn
mcp
langchain
langchain-community
numpy
ollama
openai-agents
# Add other necessary dependencies

58
start.bat Normal file

@@ -0,0 +1,58 @@
@echo off
setlocal
:: Load environment variables from .env file instead of using placeholders
:: This avoids connection problems from incorrectly set environment variables
:: Check if Python is installed
python --version >nul 2>&1
if errorlevel 1 (
echo Python is not installed, please install Python first
pause
exit /b 1
)
:: Check and install dependencies from requirements.txt
echo Installing dependencies from requirements.txt...
pip install -r requirements.txt
if errorlevel 1 (
echo Failed to install dependencies
pause
exit /b 1
)
:: Install uv tool
echo Installing uv tool...
pip install uv 2>nul || (
echo Failed to install uv, trying alternative installation method...
python -m pip install uv
)
:: Verify uv is installed
uv --version >nul 2>&1
if errorlevel 1 (
echo uv installation failed. Running with python instead...
set USE_PYTHON=1
) else (
set USE_PYTHON=0
)
:: Set Python environment variables
set PYTHONPATH=.
set PYTHONIOENCODING=utf-8
:: Run the program
echo Starting GHOSTCREW...
if %USE_PYTHON%==1 (
python main.py
) else (
uv run main.py
)
if errorlevel 1 (
echo Program exited with an error
pause
exit /b 1
)
pause