diff --git a/README.md b/README.md index 362fdf4..9010bd3 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,16 @@ GhostCrew has three modes, accessible via commands in the TUI: Press `Esc` to stop a running agent. `Ctrl+Q` to quit. +## Playbooks + +GhostCrew includes prebuilt **attack playbooks** for black-box security testing. Playbooks define a structured approach to specific security assessments. + +**Run a playbook:** + +```bash +ghostcrew run -t example.com --playbook thp3_web +``` + ## Tools GhostCrew includes built-in tools and supports MCP (Model Context Protocol) for extensibility. @@ -178,6 +188,7 @@ ghostcrew/ knowledge/ # RAG system and shadow graph llm/ # LiteLLM wrapper mcp/ # MCP client and server configs + playbooks/ # Attack playbooks runtime/ # Execution environment tools/ # Built-in tools ``` diff --git a/README_zh.md b/README_zh.md index 6a63c1f..8703a5b 100644 --- a/README_zh.md +++ b/README_zh.md @@ -178,6 +178,7 @@ ghostcrew/ knowledge/ # RAG 系统和影子图 llm/ # LiteLLM 包装器 mcp/ # MCP 客户端和服务器配置 + playbooks/ # 攻击剧本 runtime/ # 执行环境 tools/ # 内置工具 ``` diff --git a/ghostcrew/interface/cli.py b/ghostcrew/interface/cli.py index 1766d7a..0ce35c3 100644 --- a/ghostcrew/interface/cli.py +++ b/ghostcrew/interface/cli.py @@ -27,6 +27,7 @@ async def run_cli( report: str = None, max_loops: int = 50, use_docker: bool = False, + mode: str = "agent", ): """ Run GhostCrew in non-interactive mode. @@ -38,6 +39,7 @@ async def run_cli( report: Report path ("auto" for loot/reports/_.md) max_loops: Max agent loops before stopping use_docker: Run tools in Docker container + mode: Execution mode ("agent" or "crew") """ from ..agents.ghostcrew_agent import GhostCrewAgent from ..knowledge import RAGEngine @@ -54,6 +56,8 @@ async def run_cli( start_text.append(f"{target}\n", style=GHOST_PRIMARY) start_text.append("Model: ", style=GHOST_SECONDARY) start_text.append(f"{model}\n", style=GHOST_PRIMARY) + start_text.append("Mode: ", style=GHOST_SECONDARY) + start_text.append(f"{mode.title()}\n", style=GHOST_PRIMARY) start_text.append("Runtime: ", style=GHOST_SECONDARY) start_text.append(f"{'Docker' if use_docker else 'Local'}\n", style=GHOST_PRIMARY) start_text.append("Max loops: ", style=GHOST_SECONDARY) @@ -110,14 +114,6 @@ async def run_cli( llm = LLM(model=model, rag_engine=rag) tools = get_all_tools() - agent = GhostCrewAgent( - llm=llm, - tools=tools, - runtime=runtime, - target=target, - rag_engine=rag, - ) - # Stats tracking start_time = time.time() tool_count = 0 @@ -137,6 +133,23 @@ async def run_cli( timestamp = f"[{mins:02d}:{secs:02d}]" console.print(f"[{GHOST_DIM}]{timestamp}[/] [{style}]{msg}[/]") + def display_message(content: str, title: str) -> bool: + """Display a message panel if it hasn't been shown yet.""" + nonlocal last_content + if content and content != last_content: + console.print() + console.print( + Panel( + Markdown(content), + title=f"[{GHOST_PRIMARY}]{title}", + border_style=GHOST_BORDER, + ) + ) + console.print() + last_content = content + return True + return False + def generate_report() -> str: """Generate markdown report.""" elapsed = int(time.time() - start_time) @@ -154,9 +167,21 @@ async def run_cli( ] # Add AI summary at top if available + # If the last finding is a full report (Crew mode), use it as the main body + # and avoid adding duplicate headers + main_content = "" if findings: - lines.append(findings[-1]) - lines.append("") + main_content = findings[-1] + # If it's a full report (starts with #), don't add our own headers if possible + if not main_content.strip().startswith("#"): + lines.append(main_content) + lines.append("") + else: + # It's a full report, so we might want to replace the default header + # or just append it. Let's append it but skip the "Executive Summary" header above if we could. + # For now, just append it. + lines.append(main_content) + lines.append("") else: lines.append("*Assessment incomplete - no analysis generated.*") lines.append("") @@ -217,27 +242,25 @@ async def run_cli( lines.append("") # Findings section - lines.extend( - [ - "---", - "", - "## Analysis", - "", - ] - ) + # Only show if there are other findings besides the final report we already showed + other_findings = findings[:-1] if findings and len(findings) > 1 else [] - if findings: - for i, finding in enumerate(findings, 1): - if len(findings) > 1: - lines.append(f"### Analysis {i}") + if other_findings: + lines.extend( + [ + "---", + "", + "## Detailed Findings", + "", + ] + ) + + for i, finding in enumerate(other_findings, 1): + if len(other_findings) > 1: + lines.append(f"### Finding {i}") lines.append("") lines.append(finding) lines.append("") - else: - lines.append( - "*No AI analysis generated. Try running with higher `--max` value.*" - ) - lines.append("") # Footer lines.extend( @@ -348,57 +371,201 @@ async def run_cli( ) # Show summary/messages only if it's new content (not just displayed) - if messages and messages[-1] != last_content: - console.print() - console.print( - Panel( - Markdown(messages[-1]), - title=f"[{GHOST_PRIMARY}]Summary", - border_style=GHOST_BORDER, - ) - ) + if messages: + display_message(messages[-1], "Summary") # Save report save_report() - print_status("Initializing agent...") + print_status("Initializing...") try: - async for response in agent.agent_loop(task_msg): - iteration += 1 + if mode == "crew": + from ..agents.crew import CrewOrchestrator - # Track token usage - if response.usage: - usage = response.usage.get("total_tokens", 0) - is_intermediate = response.metadata.get("intermediate", False) - has_tools = bool(response.tool_calls) + def on_worker_event(worker_id: str, event_type: str, data: dict): + nonlocal tool_count, findings_count, total_tokens - # Logic to avoid double counting: - # 1. Intermediate messages (thinking) always count - # 2. Tool messages count ONLY if not preceded by intermediate message - if is_intermediate: - total_tokens += usage - last_msg_intermediate = True - elif has_tools: - if not last_msg_intermediate: - total_tokens += usage - last_msg_intermediate = False - else: - # Other messages (like plan) - total_tokens += usage - last_msg_intermediate = False + if event_type == "spawn": + task = data.get("task", "") + print_status(f"Spawned worker {worker_id}: {task}", GHOST_ACCENT) - # Show tool calls and results as they happen - if response.tool_calls: - for i, call in enumerate(response.tool_calls): + elif event_type == "tool": + tool_name = data.get("tool", "unknown") tool_count += 1 - name = getattr(call, "name", None) or getattr( - call.function, "name", "tool" + print_status( + f"Worker {worker_id} using tool: {tool_name}", GHOST_DIM ) - # Track findings (notes tool) - if name == "notes": - findings_count += 1 + # Log tool usage (limited info available from event) + elapsed = int(time.time() - start_time) + mins, secs = divmod(elapsed, 60) + ts = f"{mins:02d}:{secs:02d}" + + tool_log.append( + { + "ts": ts, + "name": tool_name, + "command": f"(Worker {worker_id})", + "result": "", + "exit_code": None, + } + ) + + elif event_type == "tokens": + tokens = data.get("tokens", 0) + total_tokens += tokens + + elif event_type == "complete": + f_count = data.get("findings_count", 0) + findings_count += f_count + print_status( + f"Worker {worker_id} complete ({f_count} findings)", "green" + ) + + elif event_type == "failed": + reason = data.get("reason", "unknown") + print_status(f"Worker {worker_id} failed: {reason}", "red") + + elif event_type == "status": + status = data.get("status", "") + print_status(f"Worker {worker_id} status: {status}", GHOST_DIM) + + elif event_type == "warning": + reason = data.get("reason", "unknown") + print_status(f"Worker {worker_id} warning: {reason}", "yellow") + + elif event_type == "error": + error = data.get("error", "unknown") + print_status(f"Worker {worker_id} error: {error}", "red") + + elif event_type == "cancelled": + print_status(f"Worker {worker_id} cancelled", "yellow") + + crew = CrewOrchestrator( + llm=llm, + tools=tools, + runtime=runtime, + on_worker_event=on_worker_event, + rag_engine=rag, + target=target, + ) + + async for update in crew.run(task_msg): + iteration += 1 + phase = update.get("phase", "") + + if phase == "starting": + print_status("Crew orchestrator starting...", GHOST_PRIMARY) + + elif phase == "thinking": + content = update.get("content", "") + if content: + display_message(content, "GhostCrew Plan") + + elif phase == "tool_call": + tool = update.get("tool", "") + args = update.get("args", {}) + print_status(f"Orchestrator calling: {tool}", GHOST_ACCENT) + + elif phase == "complete": + report_content = update.get("report", "") + if report_content: + messages.append(report_content) + findings.append( + report_content + ) # Add to findings so it appears in the saved report + display_message(report_content, "Crew Report") + + elif phase == "error": + error = update.get("error", "Unknown error") + print_status(f"Crew error: {error}", "red") + + if iteration >= max_loops: + stopped_reason = "max loops reached" + raise StopIteration() + + else: + # Default Agent Mode + agent = GhostCrewAgent( + llm=llm, + tools=tools, + runtime=runtime, + target=target, + rag_engine=rag, + ) + + async for response in agent.agent_loop(task_msg): + iteration += 1 + + # Track token usage + if response.usage: + usage = response.usage.get("total_tokens", 0) + is_intermediate = response.metadata.get("intermediate", False) + has_tools = bool(response.tool_calls) + + # Logic to avoid double counting: + # 1. Intermediate messages (thinking) always count + # 2. Tool messages count ONLY if not preceded by intermediate message + if is_intermediate: + total_tokens += usage + last_msg_intermediate = True + elif has_tools: + if not last_msg_intermediate: + total_tokens += usage + last_msg_intermediate = False + else: + # Other messages (like plan) + total_tokens += usage + last_msg_intermediate = False + + # Show tool calls and results as they happen + if response.tool_calls: + for i, call in enumerate(response.tool_calls): + tool_count += 1 + name = getattr(call, "name", None) or getattr( + call.function, "name", "tool" + ) + + # Track findings (notes tool) + if name == "notes": + findings_count += 1 + try: + args = getattr(call, "arguments", None) or getattr( + call.function, "arguments", "{}" + ) + if isinstance(args, str): + import json + + args = json.loads(args) + if isinstance(args, dict): + note_content = ( + args.get("value", "") + or args.get("content", "") + or args.get("note", "") + ) + if note_content: + findings.append(note_content) + except Exception: + pass + + elapsed = int(time.time() - start_time) + mins, secs = divmod(elapsed, 60) + ts = f"{mins:02d}:{secs:02d}" + + # Get result if available + if response.tool_results and i < len(response.tool_results): + tr = response.tool_results[i] + result_text = tr.result or tr.error or "" + if result_text: + # Truncate for display + preview = result_text[:200].replace("\n", " ") + if len(result_text) > 200: + preview += "..." + + # Parse args for command extraction + command_text = "" + exit_code = None try: args = getattr(call, "arguments", None) or getattr( call.function, "arguments", "{}" @@ -408,125 +575,87 @@ async def run_cli( args = json.loads(args) if isinstance(args, dict): - note_content = args.get("content", "") or args.get( - "note", "" - ) - if note_content: - findings.append(note_content) + command_text = args.get("command", "") except Exception: pass - elapsed = int(time.time() - start_time) - mins, secs = divmod(elapsed, 60) - ts = f"{mins:02d}:{secs:02d}" + # Extract exit code from result + if response.tool_results and i < len(response.tool_results): + tr = response.tool_results[i] + full_result = tr.result or tr.error or "" + # Try to parse exit code + if "Exit Code:" in full_result: + try: + import re - # Get result if available - if response.tool_results and i < len(response.tool_results): - tr = response.tool_results[i] - result_text = tr.result or tr.error or "" - if result_text: - # Truncate for display - preview = result_text[:200].replace("\n", " ") - if len(result_text) > 200: - preview += "..." + match = re.search( + r"Exit Code:\s*(\d+)", full_result + ) + if match: + exit_code = int(match.group(1)) + except Exception: + pass + else: + full_result = "" - # Parse args for command extraction - command_text = "" - exit_code = None - try: - args = getattr(call, "arguments", None) or getattr( - call.function, "arguments", "{}" + # Store full data for report (not truncated) + tool_log.append( + { + "ts": ts, + "name": name, + "command": command_text, + "result": full_result, + "exit_code": exit_code, + } ) - if isinstance(args, str): - import json - args = json.loads(args) - if isinstance(args, dict): - command_text = args.get("command", "") - except Exception: - pass + # Metasploit-style output with better spacing + console.print() # Blank line before each tool + print_status(f"$ {name} ({tool_count})", GHOST_ACCENT) - # Extract exit code from result - if response.tool_results and i < len(response.tool_results): - tr = response.tool_results[i] - full_result = tr.result or tr.error or "" - # Try to parse exit code - if "Exit Code:" in full_result: - try: - import re + # Show command/args on separate indented line (truncated for display) + if command_text: + display_cmd = command_text[:80] + if len(command_text) > 80: + display_cmd += "..." + console.print(f" [{GHOST_DIM}]{display_cmd}[/]") - match = re.search(r"Exit Code:\s*(\d+)", full_result) - if match: - exit_code = int(match.group(1)) - except Exception: - pass - else: - full_result = "" - - # Store full data for report (not truncated) - tool_log.append( - { - "ts": ts, - "name": name, - "command": command_text, - "result": full_result, - "exit_code": exit_code, - } - ) - - # Metasploit-style output with better spacing - console.print() # Blank line before each tool - print_status(f"$ {name} ({tool_count})", GHOST_ACCENT) - - # Show command/args on separate indented line (truncated for display) - if command_text: - display_cmd = command_text[:80] - if len(command_text) > 80: - display_cmd += "..." - console.print(f" [{GHOST_DIM}]{display_cmd}[/]") - - # Show result on separate line with status indicator - if response.tool_results and i < len(response.tool_results): - tr = response.tool_results[i] - if tr.error: - console.print( - f" [{GHOST_DIM}][!] {tr.error[:100]}[/]" - ) - elif tr.result: - # Show exit code or brief result - result_line = tr.result[:100].replace("\n", " ") - if exit_code == 0 or "success" in result_line.lower(): - console.print(f" [{GHOST_DIM}][+] OK[/]") - elif exit_code is not None and exit_code != 0: + # Show result on separate line with status indicator + if response.tool_results and i < len(response.tool_results): + tr = response.tool_results[i] + if tr.error: console.print( - f" [{GHOST_DIM}][-] Exit {exit_code}[/]" - ) - else: - console.print( - f" [{GHOST_DIM}][*] {result_line[:60]}...[/]" + f" [{GHOST_DIM}][!] {tr.error[:100]}[/]" ) + elif tr.result: + # Show exit code or brief result + result_line = tr.result[:100].replace("\n", " ") + if exit_code == 0 or "success" in result_line.lower(): + console.print(f" [{GHOST_DIM}][+] OK[/]") + elif exit_code is not None and exit_code != 0: + console.print( + f" [{GHOST_DIM}][-] Exit {exit_code}[/]" + ) + else: + console.print( + f" [{GHOST_DIM}][*] {result_line[:60]}...[/]" + ) - # Print assistant content immediately (analysis/findings) - if response.content and response.content != last_content: - last_content = response.content - messages.append(response.content) + # Print assistant content immediately (analysis/findings) + if response.content: + if display_message(response.content, "GhostCrew"): + messages.append(response.content) - console.print() - console.print( - Panel( - Markdown(response.content), - title=f"[{GHOST_PRIMARY}]GhostCrew", - border_style=GHOST_BORDER, - ) - ) - console.print() + # Check max loops limit + if iteration >= max_loops: + stopped_reason = "max loops reached" + console.print() + print_status(f"Max loops limit reached ({max_loops})", "yellow") + raise StopIteration() - # Check max loops limit - if iteration >= max_loops: - stopped_reason = "max loops reached" - console.print() - print_status(f"Max loops limit reached ({max_loops})", "yellow") - raise StopIteration() + # In agent mode, ensure the final message is treated as the main finding (Executive Summary) + if mode != "crew" and messages: + findings.append(messages[-1]) await print_summary(interrupted=False) diff --git a/ghostcrew/interface/main.py b/ghostcrew/interface/main.py index a23bd7f..deff2a3 100644 --- a/ghostcrew/interface/main.py +++ b/ghostcrew/interface/main.py @@ -43,6 +43,11 @@ Examples: action="store_true", help="Run tools inside Docker container (requires Docker)", ) + runtime_parent.add_argument( + "--playbook", + "-p", + help="Playbook to execute (e.g., thp3_web)", + ) # TUI subcommand subparsers.add_parser( @@ -53,7 +58,7 @@ Examples: run_parser = subparsers.add_parser( "run", parents=[runtime_parent], help="Run in headless mode" ) - run_parser.add_argument("task", nargs="+", help="Task to run") + run_parser.add_argument("task", nargs="*", help="Task to run") run_parser.add_argument( "--report", "-r", @@ -266,8 +271,32 @@ def main(): print("Error: --target is required for run mode") return - # Join task arguments - task_description = " ".join(args.task) + # Handle playbook or task + task_description = "" + mode = "agent" + if args.playbook: + from ..playbooks import get_playbook + + try: + playbook = get_playbook(args.playbook) + task_description = playbook.get_task() + mode = getattr(playbook, "mode", "agent") + + # Use playbook's max_loops if defined + if hasattr(playbook, "max_loops"): + args.max_loops = playbook.max_loops + + print(f"Loaded playbook: {playbook.name}") + print(f"Description: {playbook.description}") + print(f"Mode: {mode}") + except ValueError as e: + print(f"Error: {e}") + return + elif args.task: + task_description = " ".join(args.task) + else: + print("Error: Either task (positional) or --playbook is required") + return try: asyncio.run( @@ -278,6 +307,7 @@ def main(): report=args.report, max_loops=args.max_loops, use_docker=args.docker, + mode=mode, ) ) except KeyboardInterrupt: diff --git a/ghostcrew/playbooks/__init__.py b/ghostcrew/playbooks/__init__.py new file mode 100644 index 0000000..888b7f6 --- /dev/null +++ b/ghostcrew/playbooks/__init__.py @@ -0,0 +1,52 @@ +import importlib +import inspect +import pkgutil +from pathlib import Path +from typing import Dict, Type + +from .base_playbook import BasePlaybook + +# Registry of available playbooks +PLAYBOOKS: Dict[str, Type[BasePlaybook]] = {} + + +def _discover_playbooks(): + """Dynamically discover and register playbooks in the current package.""" + package_dir = Path(__file__).parent + + # Iterate over all modules in the current package + for _, module_name, _ in pkgutil.iter_modules([str(package_dir)]): + # Skip base_playbook to avoid circular imports or registering the base class + if module_name == "base_playbook": + continue + + try: + # Import the module + module = importlib.import_module(f".{module_name}", package=__package__) + + # Find all classes in the module that inherit from BasePlaybook + for _, obj in inspect.getmembers(module, inspect.isclass): + if issubclass(obj, BasePlaybook) and obj is not BasePlaybook: + # Register the playbook using its defined name + if hasattr(obj, "name"): + PLAYBOOKS[obj.name] = obj + except Exception as e: + print(f"Error loading playbook module {module_name}: {e}") + + +# Run discovery on import +_discover_playbooks() + + +def get_playbook(name: str) -> BasePlaybook: + """Get a playbook instance by name.""" + if name not in PLAYBOOKS: + raise ValueError( + f"Playbook '{name}' not found. Available: {list(PLAYBOOKS.keys())}" + ) + return PLAYBOOKS[name]() + + +def list_playbooks() -> list[str]: + """List available playbook names.""" + return list(PLAYBOOKS.keys()) diff --git a/ghostcrew/playbooks/base_playbook.py b/ghostcrew/playbooks/base_playbook.py new file mode 100644 index 0000000..b885978 --- /dev/null +++ b/ghostcrew/playbooks/base_playbook.py @@ -0,0 +1,33 @@ +from dataclasses import dataclass, field +from typing import List + + +@dataclass +class Phase: + name: str + objective: str + techniques: List[str] + + +class BasePlaybook: + """Base class for all playbooks.""" + + name: str = "base_playbook" + description: str = "Base playbook description" + mode: str = "agent" # "agent" or "crew" + max_loops: int = 50 + phases: List[Phase] = field(default_factory=list) + + def get_task(self) -> str: + """Convert playbook into a structured task description.""" + task = f"{self.description}\n\n" + + for phase in self.phases: + task += f"Phase: {phase.name}\n" + task += f"Objective: {phase.objective}\n" + task += "Techniques:\n" + for i, technique in enumerate(phase.techniques, 1): + task += f" {i}. {technique}\n" + task += "\n" + + return task diff --git a/ghostcrew/playbooks/thp3_network.py b/ghostcrew/playbooks/thp3_network.py new file mode 100644 index 0000000..a8f02fe --- /dev/null +++ b/ghostcrew/playbooks/thp3_network.py @@ -0,0 +1,35 @@ +from ghostcrew.playbooks.base_playbook import BasePlaybook, Phase + + +class THP3NetworkPlaybook(BasePlaybook): + name = "thp3_network" + description = "Network Compromise and Lateral Movement" + mode = "crew" + + phases = [ + Phase( + name="Initial Access", + objective="Gain initial foothold on the network", + techniques=[ + "Password spraying against external services (OWA, VPN)", + "LLMNR/NBT-NS poisoning (Responder)", + ], + ), + Phase( + name="Enumeration & Privilege Escalation", + objective="Map internal network and elevate privileges", + techniques=[ + "Active Directory enumeration (PowerView, BloodHound)", + "Identify privilege escalation paths (unquoted paths, weak perms)", + "Credential dumping (Mimikatz, local files)", + ], + ), + Phase( + name="Lateral Movement & Objectives", + objective="Move through network to reach high-value targets", + techniques=[ + "Lateral movement (WMI, PsExec, DCOM)", + "Access high-value targets and data repositories", + ], + ), + ] diff --git a/ghostcrew/playbooks/thp3_recon.py b/ghostcrew/playbooks/thp3_recon.py new file mode 100644 index 0000000..91c0d5d --- /dev/null +++ b/ghostcrew/playbooks/thp3_recon.py @@ -0,0 +1,29 @@ +from ghostcrew.playbooks.base_playbook import BasePlaybook, Phase + + +class THP3ReconPlaybook(BasePlaybook): + name = "thp3_recon" + description = "Red Team Reconnaissance" + mode = "crew" + + phases = [ + Phase( + name="Passive Reconnaissance", + objective="Gather information without direct interaction", + techniques=[ + "OSINT infrastructure identification (Shodan, Censys)", + "Subdomain discovery (Sublist3r, Amass)", + "Cloud infrastructure scanning and misconfiguration checks", + "Code repository search for leaked credentials", + ], + ), + Phase( + name="Active Reconnaissance", + objective="Interact with target to map attack surface", + techniques=[ + "Port scanning and service identification (Nmap)", + "Web service screenshotting", + "Email address harvesting", + ], + ), + ] diff --git a/ghostcrew/playbooks/thp3_web.py b/ghostcrew/playbooks/thp3_web.py new file mode 100644 index 0000000..7e09299 --- /dev/null +++ b/ghostcrew/playbooks/thp3_web.py @@ -0,0 +1,30 @@ +from ghostcrew.playbooks.base_playbook import BasePlaybook, Phase + + +class THP3WebPlaybook(BasePlaybook): + name = "thp3_web" + description = "Web Application Exploitation" + mode = "crew" + + phases = [ + Phase( + name="Discovery", + objective="Understand application attack surface", + techniques=[ + "Identify technologies and frameworks (Wappalyzer, BuiltWith)", + "Enumerate endpoints and parameters (Gobuster, Dirbuster)", + ], + ), + Phase( + name="Exploitation", + objective="Identify and exploit web vulnerabilities", + techniques=[ + "Cross-Site Scripting (XSS) (Blind, DOM-based)", + "SQL and NoSQL Injection", + "Deserialization vulnerabilities (Node.js, Java, PHP)", + "Server-Side Template Injection", + "Server-Side Request Forgery (SSRF)", + "XML External Entity (XXE)", + ], + ), + ]