mirror of
https://github.com/GH05TCREW/pentestagent.git
synced 2026-04-26 17:56:02 +00:00
refactor: overhaul codebase for v0.2.0
This commit is contained in:
508
ghostcrew/interface/cli.py
Normal file
508
ghostcrew/interface/cli.py
Normal file
@@ -0,0 +1,508 @@
|
||||
"""Non-interactive CLI mode for GhostCrew."""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from rich.console import Console
|
||||
from rich.markdown import Markdown
|
||||
from rich.panel import Panel
|
||||
from rich.text import Text
|
||||
|
||||
console = Console()
|
||||
|
||||
# Ghost theme colors (matching TUI)
|
||||
GHOST_PRIMARY = "#d4d4d4" # light gray - primary text
|
||||
GHOST_SECONDARY = "#9a9a9a" # medium gray - secondary text
|
||||
GHOST_DIM = "#6b6b6b" # dim gray - muted text
|
||||
GHOST_BORDER = "#3a3a3a" # dark gray - borders
|
||||
GHOST_ACCENT = "#7a7a7a" # accent gray
|
||||
|
||||
|
||||
async def run_cli(
|
||||
target: str,
|
||||
model: str,
|
||||
task: str = None,
|
||||
report: str = None,
|
||||
max_tools: int = 50,
|
||||
use_docker: bool = False,
|
||||
):
|
||||
"""
|
||||
Run GhostCrew in non-interactive mode.
|
||||
|
||||
Args:
|
||||
target: Target to test
|
||||
model: LLM model to use
|
||||
task: Optional task description
|
||||
report: Report path ("auto" for loot/<target>_<timestamp>.md)
|
||||
max_tools: Max tool calls before stopping
|
||||
use_docker: Run tools in Docker container
|
||||
"""
|
||||
from ..agents.ghostcrew_agent import GhostCrewAgent
|
||||
from ..knowledge import RAGEngine
|
||||
from ..llm import LLM
|
||||
from ..runtime.docker_runtime import DockerRuntime
|
||||
from ..runtime.runtime import LocalRuntime
|
||||
from ..tools import get_all_tools
|
||||
|
||||
# Startup panel
|
||||
start_text = Text()
|
||||
start_text.append("GHOSTCREW", style=f"bold {GHOST_PRIMARY}")
|
||||
start_text.append(" - Non-interactive Mode\n\n", style=GHOST_DIM)
|
||||
start_text.append("Target: ", style=GHOST_SECONDARY)
|
||||
start_text.append(f"{target}\n", style=GHOST_PRIMARY)
|
||||
start_text.append("Model: ", style=GHOST_SECONDARY)
|
||||
start_text.append(f"{model}\n", style=GHOST_PRIMARY)
|
||||
start_text.append("Runtime: ", style=GHOST_SECONDARY)
|
||||
start_text.append(f"{'Docker' if use_docker else 'Local'}\n", style=GHOST_PRIMARY)
|
||||
start_text.append("Max calls: ", style=GHOST_SECONDARY)
|
||||
start_text.append(f"{max_tools}\n", style=GHOST_PRIMARY)
|
||||
|
||||
task_msg = task or f"Perform a penetration test on {target}"
|
||||
start_text.append("Task: ", style=GHOST_SECONDARY)
|
||||
start_text.append(task_msg, style=GHOST_PRIMARY)
|
||||
|
||||
console.print()
|
||||
console.print(
|
||||
Panel(
|
||||
start_text, title=f"[{GHOST_SECONDARY}]Starting", border_style=GHOST_BORDER
|
||||
)
|
||||
)
|
||||
console.print()
|
||||
|
||||
# Initialize RAG if knowledge exists
|
||||
rag = None
|
||||
knowledge_path = Path("knowledge")
|
||||
if knowledge_path.exists():
|
||||
try:
|
||||
rag = RAGEngine(knowledge_path=knowledge_path)
|
||||
rag.index()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Initialize MCP if config exists (silently skip failures)
|
||||
mcp_manager = None
|
||||
mcp_count = 0
|
||||
try:
|
||||
from ..mcp import MCPManager
|
||||
from ..tools import register_tool_instance
|
||||
|
||||
mcp_manager = MCPManager()
|
||||
if mcp_manager.config_path.exists():
|
||||
mcp_tools = await mcp_manager.connect_all()
|
||||
for tool in mcp_tools:
|
||||
register_tool_instance(tool)
|
||||
mcp_count = len(mcp_tools)
|
||||
if mcp_count > 0:
|
||||
console.print(f"[{GHOST_DIM}]Loaded {mcp_count} MCP tools[/]")
|
||||
except Exception:
|
||||
pass # MCP is optional, continue without it
|
||||
|
||||
# Initialize runtime - Docker or Local
|
||||
if use_docker:
|
||||
console.print(f"[{GHOST_DIM}]Starting Docker container...[/]")
|
||||
runtime = DockerRuntime(mcp_manager=mcp_manager)
|
||||
else:
|
||||
runtime = LocalRuntime(mcp_manager=mcp_manager)
|
||||
await runtime.start()
|
||||
|
||||
llm = LLM(model=model, rag_engine=rag)
|
||||
tools = get_all_tools()
|
||||
|
||||
agent = GhostCrewAgent(
|
||||
llm=llm,
|
||||
tools=tools,
|
||||
runtime=runtime,
|
||||
target=target,
|
||||
rag_engine=rag,
|
||||
)
|
||||
|
||||
# Stats tracking
|
||||
start_time = time.time()
|
||||
tool_count = 0
|
||||
iteration = 0
|
||||
findings = [] # Store findings for report
|
||||
tool_log = [] # Log of tools executed (ts, name, command, result, exit_code)
|
||||
last_content = ""
|
||||
stopped_reason = None
|
||||
|
||||
def print_status(msg: str, style: str = GHOST_DIM):
|
||||
elapsed = int(time.time() - start_time)
|
||||
mins, secs = divmod(elapsed, 60)
|
||||
timestamp = f"[{mins:02d}:{secs:02d}]"
|
||||
console.print(f"[{GHOST_DIM}]{timestamp}[/] [{style}]{msg}[/]")
|
||||
|
||||
def generate_report() -> str:
|
||||
"""Generate markdown report."""
|
||||
elapsed = int(time.time() - start_time)
|
||||
mins, secs = divmod(elapsed, 60)
|
||||
|
||||
status_text = "Complete"
|
||||
if stopped_reason:
|
||||
status_text = f"Interrupted ({stopped_reason})"
|
||||
|
||||
lines = [
|
||||
"# GhostCrew Penetration Test Report",
|
||||
"",
|
||||
"## Executive Summary",
|
||||
"",
|
||||
]
|
||||
|
||||
# Add AI summary at top if available
|
||||
if findings:
|
||||
lines.append(findings[-1])
|
||||
lines.append("")
|
||||
else:
|
||||
lines.append("*Assessment incomplete - no analysis generated.*")
|
||||
lines.append("")
|
||||
|
||||
# Engagement details table
|
||||
lines.extend(
|
||||
[
|
||||
"## Engagement Details",
|
||||
"",
|
||||
"| Field | Value |",
|
||||
"|-------|-------|",
|
||||
f"| **Target** | `{target}` |",
|
||||
f"| **Task** | {task_msg} |",
|
||||
f"| **Date** | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} |",
|
||||
f"| **Duration** | {mins}m {secs}s |",
|
||||
f"| **Commands Executed** | {tool_count} |",
|
||||
f"| **Status** | {status_text} |",
|
||||
"",
|
||||
"---",
|
||||
"",
|
||||
"## Commands Executed",
|
||||
"",
|
||||
]
|
||||
)
|
||||
|
||||
# Detailed command log
|
||||
for i, entry in enumerate(tool_log, 1):
|
||||
ts = entry.get("ts", "??:??")
|
||||
name = entry.get("name", "unknown")
|
||||
command = entry.get("command", "")
|
||||
result = entry.get("result", "")
|
||||
exit_code = entry.get("exit_code")
|
||||
|
||||
lines.append(f"### {i}. {name} `[{ts}]`")
|
||||
lines.append("")
|
||||
|
||||
if command:
|
||||
lines.append("**Command:**")
|
||||
lines.append("```")
|
||||
lines.append(command)
|
||||
lines.append("```")
|
||||
lines.append("")
|
||||
|
||||
if exit_code is not None:
|
||||
lines.append(f"**Exit Code:** `{exit_code}`")
|
||||
lines.append("")
|
||||
|
||||
if result:
|
||||
lines.append("**Output:**")
|
||||
lines.append("```")
|
||||
# Limit output to 2000 chars per command for report size
|
||||
if len(result) > 2000:
|
||||
lines.append(result[:2000])
|
||||
lines.append(f"\n... (truncated, {len(result)} total chars)")
|
||||
else:
|
||||
lines.append(result)
|
||||
lines.append("```")
|
||||
lines.append("")
|
||||
|
||||
# Findings section
|
||||
lines.extend(
|
||||
[
|
||||
"---",
|
||||
"",
|
||||
"## Analysis",
|
||||
"",
|
||||
]
|
||||
)
|
||||
|
||||
if findings:
|
||||
for i, finding in enumerate(findings, 1):
|
||||
if len(findings) > 1:
|
||||
lines.append(f"### Analysis {i}")
|
||||
lines.append("")
|
||||
lines.append(finding)
|
||||
lines.append("")
|
||||
else:
|
||||
lines.append(
|
||||
"*No AI analysis generated. Try running with higher `--max` value.*"
|
||||
)
|
||||
lines.append("")
|
||||
|
||||
# Footer
|
||||
lines.extend(
|
||||
[
|
||||
"---",
|
||||
"",
|
||||
f"*Report generated by GhostCrew on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*",
|
||||
]
|
||||
)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def save_report():
|
||||
"""Save report to file."""
|
||||
if not report:
|
||||
return
|
||||
|
||||
# Determine path
|
||||
if report == "auto":
|
||||
loot_dir = Path("loot")
|
||||
loot_dir.mkdir(exist_ok=True)
|
||||
safe_target = target.replace("://", "_").replace("/", "_").replace(":", "_")
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
report_path = loot_dir / f"{safe_target}_{timestamp}.md"
|
||||
else:
|
||||
report_path = Path(report)
|
||||
report_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
content = generate_report()
|
||||
report_path.write_text(content, encoding="utf-8")
|
||||
console.print(f"[{GHOST_SECONDARY}]Report saved: {report_path}[/]")
|
||||
|
||||
async def generate_summary():
|
||||
"""Ask the LLM to summarize findings when stopped early."""
|
||||
if not tool_log:
|
||||
return None
|
||||
|
||||
print_status("Generating summary...", GHOST_SECONDARY)
|
||||
|
||||
# Build context from tool results (use full results, not truncated)
|
||||
context_lines = ["Summarize the penetration test findings so far:\n"]
|
||||
context_lines.append(f"Target: {target}")
|
||||
context_lines.append(f"Tools executed: {tool_count}\n")
|
||||
|
||||
for entry in tool_log[-10:]: # Last 10 tools
|
||||
name = entry.get("name", "unknown")
|
||||
command = entry.get("command", "")
|
||||
result = entry.get("result", "")[:500] # Limit for context window
|
||||
context_lines.append(f"- **{name}**: `{command}`")
|
||||
if result:
|
||||
context_lines.append(f" Output: {result}")
|
||||
|
||||
context_lines.append(
|
||||
"\nProvide a brief summary of what was discovered and any security concerns found."
|
||||
)
|
||||
|
||||
try:
|
||||
response = await llm.generate(
|
||||
system_prompt="You are a penetration testing assistant. Summarize the findings concisely.",
|
||||
messages=[{"role": "user", "content": "\n".join(context_lines)}],
|
||||
tools=[],
|
||||
)
|
||||
return response.content
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
async def print_summary(interrupted: bool = False):
|
||||
nonlocal findings
|
||||
|
||||
# Generate summary if we don't have findings yet
|
||||
if not findings and tool_log:
|
||||
summary = await generate_summary()
|
||||
if summary:
|
||||
findings.append(summary)
|
||||
|
||||
elapsed = int(time.time() - start_time)
|
||||
mins, secs = divmod(elapsed, 60)
|
||||
|
||||
title = "Interrupted" if interrupted else "Finished"
|
||||
status = "PARTIAL RESULTS" if interrupted else "COMPLETE"
|
||||
if stopped_reason:
|
||||
status = f"STOPPED ({stopped_reason})"
|
||||
|
||||
final_text = Text()
|
||||
final_text.append(f"{status}\n\n", style=f"bold {GHOST_PRIMARY}")
|
||||
final_text.append("Duration: ", style=GHOST_DIM)
|
||||
final_text.append(f"{mins}m {secs}s\n", style=GHOST_SECONDARY)
|
||||
final_text.append("Iterations: ", style=GHOST_DIM)
|
||||
final_text.append(f"{iteration}\n", style=GHOST_SECONDARY)
|
||||
final_text.append("Tools: ", style=GHOST_DIM)
|
||||
final_text.append(f"{tool_count}/{max_tools}\n", style=GHOST_SECONDARY)
|
||||
|
||||
if findings:
|
||||
final_text.append("Findings: ", style=GHOST_DIM)
|
||||
final_text.append(f"{len(findings)}", style=GHOST_SECONDARY)
|
||||
|
||||
console.print()
|
||||
console.print(
|
||||
Panel(
|
||||
final_text,
|
||||
title=f"[{GHOST_SECONDARY}]{title}",
|
||||
border_style=GHOST_BORDER,
|
||||
)
|
||||
)
|
||||
|
||||
# Show summary/findings
|
||||
if findings:
|
||||
console.print()
|
||||
console.print(
|
||||
Panel(
|
||||
Markdown(findings[-1]),
|
||||
title=f"[{GHOST_PRIMARY}]Summary",
|
||||
border_style=GHOST_BORDER,
|
||||
)
|
||||
)
|
||||
|
||||
# Save report
|
||||
save_report()
|
||||
|
||||
print_status("Initializing agent...")
|
||||
|
||||
try:
|
||||
async for response in agent.agent_loop(task_msg):
|
||||
iteration += 1
|
||||
|
||||
# Show tool calls and results as they happen
|
||||
if response.tool_calls:
|
||||
for i, call in enumerate(response.tool_calls):
|
||||
tool_count += 1
|
||||
name = getattr(call, "name", None) or getattr(
|
||||
call.function, "name", "tool"
|
||||
)
|
||||
|
||||
elapsed = int(time.time() - start_time)
|
||||
mins, secs = divmod(elapsed, 60)
|
||||
ts = f"{mins:02d}:{secs:02d}"
|
||||
|
||||
# Get result if available
|
||||
if response.tool_results and i < len(response.tool_results):
|
||||
tr = response.tool_results[i]
|
||||
result_text = tr.result or tr.error or ""
|
||||
if result_text:
|
||||
# Truncate for display
|
||||
preview = result_text[:200].replace("\n", " ")
|
||||
if len(result_text) > 200:
|
||||
preview += "..."
|
||||
|
||||
# Parse args for command extraction
|
||||
command_text = ""
|
||||
exit_code = None
|
||||
try:
|
||||
args = getattr(call, "arguments", None) or getattr(
|
||||
call.function, "arguments", "{}"
|
||||
)
|
||||
if isinstance(args, str):
|
||||
import json
|
||||
|
||||
args = json.loads(args)
|
||||
if isinstance(args, dict):
|
||||
command_text = args.get("command", "")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Extract exit code from result
|
||||
if response.tool_results and i < len(response.tool_results):
|
||||
tr = response.tool_results[i]
|
||||
full_result = tr.result or tr.error or ""
|
||||
# Try to parse exit code
|
||||
if "Exit Code:" in full_result:
|
||||
try:
|
||||
import re
|
||||
|
||||
match = re.search(r"Exit Code:\s*(\d+)", full_result)
|
||||
if match:
|
||||
exit_code = int(match.group(1))
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
full_result = ""
|
||||
|
||||
# Store full data for report (not truncated)
|
||||
tool_log.append(
|
||||
{
|
||||
"ts": ts,
|
||||
"name": name,
|
||||
"command": command_text,
|
||||
"result": full_result,
|
||||
"exit_code": exit_code,
|
||||
}
|
||||
)
|
||||
|
||||
# Metasploit-style output with better spacing
|
||||
console.print() # Blank line before each tool
|
||||
print_status(f"$ {name} ({tool_count}/{max_tools})", GHOST_ACCENT)
|
||||
|
||||
# Show command/args on separate indented line (truncated for display)
|
||||
if command_text:
|
||||
display_cmd = command_text[:80]
|
||||
if len(command_text) > 80:
|
||||
display_cmd += "..."
|
||||
console.print(f" [{GHOST_DIM}]{display_cmd}[/]")
|
||||
|
||||
# Show result on separate line with status indicator
|
||||
if response.tool_results and i < len(response.tool_results):
|
||||
tr = response.tool_results[i]
|
||||
if tr.error:
|
||||
console.print(
|
||||
f" [{GHOST_DIM}][!] {tr.error[:100]}[/]"
|
||||
)
|
||||
elif tr.result:
|
||||
# Show exit code or brief result
|
||||
result_line = tr.result[:100].replace("\n", " ")
|
||||
if exit_code == 0 or "success" in result_line.lower():
|
||||
console.print(f" [{GHOST_DIM}][+] OK[/]")
|
||||
elif exit_code is not None and exit_code != 0:
|
||||
console.print(
|
||||
f" [{GHOST_DIM}][-] Exit {exit_code}[/]"
|
||||
)
|
||||
else:
|
||||
console.print(
|
||||
f" [{GHOST_DIM}][*] {result_line[:60]}...[/]"
|
||||
)
|
||||
|
||||
# Check max tools limit
|
||||
if tool_count >= max_tools:
|
||||
stopped_reason = "max calls reached"
|
||||
console.print()
|
||||
print_status(f"Max calls limit reached ({max_tools})", "yellow")
|
||||
raise StopIteration()
|
||||
|
||||
# Print assistant content immediately (analysis/findings)
|
||||
if response.content and response.content != last_content:
|
||||
last_content = response.content
|
||||
findings.append(response.content)
|
||||
|
||||
console.print()
|
||||
console.print(
|
||||
Panel(
|
||||
Markdown(response.content),
|
||||
title=f"[{GHOST_PRIMARY}]GhostCrew",
|
||||
border_style=GHOST_BORDER,
|
||||
)
|
||||
)
|
||||
console.print()
|
||||
|
||||
await print_summary(interrupted=False)
|
||||
|
||||
except StopIteration:
|
||||
await print_summary(interrupted=True)
|
||||
except (KeyboardInterrupt, asyncio.CancelledError):
|
||||
stopped_reason = "user interrupt"
|
||||
await print_summary(interrupted=True)
|
||||
except Exception as e:
|
||||
console.print(f"\n[red]Error: {e}[/]")
|
||||
stopped_reason = f"error: {e}"
|
||||
await print_summary(interrupted=True)
|
||||
|
||||
finally:
|
||||
# Cleanup MCP connections first
|
||||
if mcp_manager:
|
||||
try:
|
||||
await mcp_manager.disconnect_all()
|
||||
await asyncio.sleep(0.1) # Allow transports to close cleanly
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Then stop runtime
|
||||
if runtime:
|
||||
try:
|
||||
await runtime.stop()
|
||||
except Exception:
|
||||
pass
|
||||
Reference in New Issue
Block a user