refactor: update CLI, add token tracking

This commit is contained in:
GH05TCREW
2025-12-15 21:26:04 -07:00
parent ec30a07e4e
commit c457c2943b
5 changed files with 207 additions and 102 deletions

View File

@@ -597,6 +597,7 @@ Call the create_plan tool with your steps."""
role="assistant",
content="\n".join(plan_display),
metadata={"auto_plan": True},
usage=response.usage,
)
self.conversation_history.append(plan_msg)
return plan_msg

View File

@@ -167,6 +167,12 @@ class CrewOrchestrator:
tools=crew_tools,
)
# Track tokens for orchestrator
if response.usage:
total = response.usage.get("total_tokens", 0)
if total > 0:
yield {"phase": "tokens", "tokens": total}
# Check for tool calls first to determine if content is "thinking" or "final answer"
if response.tool_calls:
# If there are tool calls, the content is "thinking" (reasoning before action)

View File

@@ -133,10 +133,26 @@ class WorkerPool:
worker.tools_used.append(tc.name)
self._emit(worker.id, "tool", {"tool": tc.name})
# Track tokens
# Track tokens (avoid double counting)
if response.usage:
total = response.usage.get("total_tokens", 0)
if total > 0:
is_intermediate = response.metadata.get("intermediate", False)
has_tools = bool(response.tool_calls)
# Same logic as CLI to avoid double counting
should_count = False
if is_intermediate:
should_count = True
worker.last_msg_intermediate = True
elif has_tools:
if not getattr(worker, "last_msg_intermediate", False):
should_count = True
worker.last_msg_intermediate = False
else:
should_count = True
worker.last_msg_intermediate = False
if should_count and total > 0:
self._emit(worker.id, "tokens", {"tokens": total})
# Capture final response (text without tool calls)
@@ -150,7 +166,22 @@ class WorkerPool:
if response.metadata.get("replan_impossible"):
is_infeasible = True
worker.result = final_response or "No findings."
# Prioritize structured results from the plan over chatty summaries
plan_summary = ""
plan = getattr(worker_runtime, "plan", None)
if plan and plan.steps:
completed_steps = [
s for s in plan.steps if s.status == "complete" and s.result
]
if completed_steps:
summary_lines = []
for s in completed_steps:
summary_lines.append(f"- {s.description}: {s.result}")
plan_summary = "\n".join(summary_lines)
# Use plan summary if available, otherwise fallback to chat response
worker.result = plan_summary or final_response or "No findings."
worker.completed_at = time.time()
self._results[worker.id] = worker.result

View File

@@ -25,7 +25,7 @@ async def run_cli(
model: str,
task: str = None,
report: str = None,
max_tools: int = 50,
max_loops: int = 50,
use_docker: bool = False,
):
"""
@@ -36,7 +36,7 @@ async def run_cli(
model: LLM model to use
task: Optional task description
report: Report path ("auto" for loot/reports/<target>_<timestamp>.md)
max_tools: Max tool calls before stopping
max_loops: Max agent loops before stopping
use_docker: Run tools in Docker container
"""
from ..agents.ghostcrew_agent import GhostCrewAgent
@@ -56,8 +56,8 @@ async def run_cli(
start_text.append(f"{model}\n", style=GHOST_PRIMARY)
start_text.append("Runtime: ", style=GHOST_SECONDARY)
start_text.append(f"{'Docker' if use_docker else 'Local'}\n", style=GHOST_PRIMARY)
start_text.append("Max calls: ", style=GHOST_SECONDARY)
start_text.append(f"{max_tools}\n", style=GHOST_PRIMARY)
start_text.append("Max loops: ", style=GHOST_SECONDARY)
start_text.append(f"{max_loops}\n", style=GHOST_PRIMARY)
task_msg = task or f"Perform a penetration test on {target}"
start_text.append("Task: ", style=GHOST_SECONDARY)
@@ -122,9 +122,13 @@ async def run_cli(
start_time = time.time()
tool_count = 0
iteration = 0
findings = [] # Store findings for report
findings_count = 0 # Count of notes/findings recorded
findings = [] # Store actual findings text
total_tokens = 0 # Track total token usage
messages = [] # Store agent messages
tool_log = [] # Log of tools executed (ts, name, command, result, exit_code)
last_content = ""
last_msg_intermediate = False # Track if previous message was intermediate (to avoid double counting tokens)
stopped_reason = None
def print_status(msg: str, style: str = GHOST_DIM):
@@ -301,13 +305,13 @@ async def run_cli(
return None
async def print_summary(interrupted: bool = False):
nonlocal findings
nonlocal messages
# Generate summary if we don't have findings yet
if not findings and tool_log:
# Generate summary if we don't have messages yet
if not messages and tool_log:
summary = await generate_summary()
if summary:
findings.append(summary)
messages.append(summary)
elapsed = int(time.time() - start_time)
mins, secs = divmod(elapsed, 60)
@@ -321,14 +325,18 @@ async def run_cli(
final_text.append(f"{status}\n\n", style=f"bold {GHOST_PRIMARY}")
final_text.append("Duration: ", style=GHOST_DIM)
final_text.append(f"{mins}m {secs}s\n", style=GHOST_SECONDARY)
final_text.append("Iterations: ", style=GHOST_DIM)
final_text.append(f"{iteration}\n", style=GHOST_SECONDARY)
final_text.append("Loops: ", style=GHOST_DIM)
final_text.append(f"{iteration}/{max_loops}\n", style=GHOST_SECONDARY)
final_text.append("Tools: ", style=GHOST_DIM)
final_text.append(f"{tool_count}/{max_tools}\n", style=GHOST_SECONDARY)
final_text.append(f"{tool_count}\n", style=GHOST_SECONDARY)
if findings:
if total_tokens > 0:
final_text.append("Tokens: ", style=GHOST_DIM)
final_text.append(f"{total_tokens:,}\n", style=GHOST_SECONDARY)
if findings_count > 0:
final_text.append("Findings: ", style=GHOST_DIM)
final_text.append(f"{len(findings)}", style=GHOST_SECONDARY)
final_text.append(f"{findings_count}", style=GHOST_SECONDARY)
console.print()
console.print(
@@ -339,12 +347,12 @@ async def run_cli(
)
)
# Show summary/findings
if findings:
# Show the summary/last message only if it has not already been displayed
if messages and messages[-1] != last_content:
console.print()
console.print(
Panel(
Markdown(findings[-1]),
Markdown(messages[-1]),
title=f"[{GHOST_PRIMARY}]Summary",
border_style=GHOST_BORDER,
)
@@ -359,6 +367,27 @@ async def run_cli(
async for response in agent.agent_loop(task_msg):
iteration += 1
# Track token usage
if response.usage:
usage = response.usage.get("total_tokens", 0)
is_intermediate = response.metadata.get("intermediate", False)
has_tools = bool(response.tool_calls)
# Logic to avoid double counting:
# 1. Intermediate messages (thinking) always count
# 2. Tool messages count ONLY if not preceded by intermediate message
if is_intermediate:
total_tokens += usage
last_msg_intermediate = True
elif has_tools:
if not last_msg_intermediate:
total_tokens += usage
last_msg_intermediate = False
else:
# Other messages (like plan)
total_tokens += usage
last_msg_intermediate = False
# Show tool calls and results as they happen
if response.tool_calls:
for i, call in enumerate(response.tool_calls):
@@ -367,6 +396,26 @@ async def run_cli(
call.function, "name", "tool"
)
# Track findings (notes tool)
if name == "notes":
findings_count += 1
try:
args = getattr(call, "arguments", None) or getattr(
call.function, "arguments", "{}"
)
if isinstance(args, str):
import json
args = json.loads(args)
if isinstance(args, dict):
note_content = args.get("content", "") or args.get(
"note", ""
)
if note_content:
findings.append(note_content)
except Exception:
pass
elapsed = int(time.time() - start_time)
mins, secs = divmod(elapsed, 60)
ts = f"{mins:02d}:{secs:02d}"
@@ -427,7 +476,7 @@ async def run_cli(
# Metasploit-style output with better spacing
console.print() # Blank line before each tool
print_status(f"$ {name} ({tool_count}/{max_tools})", GHOST_ACCENT)
print_status(f"$ {name} ({tool_count})", GHOST_ACCENT)
# Show command/args on separate indented line (truncated for display)
if command_text:
@@ -457,17 +506,10 @@ async def run_cli(
f" [{GHOST_DIM}][*] {result_line[:60]}...[/]"
)
# Check max tools limit
if tool_count >= max_tools:
stopped_reason = "max calls reached"
console.print()
print_status(f"Max calls limit reached ({max_tools})", "yellow")
raise StopIteration()
# Print assistant content immediately (analysis/findings)
if response.content and response.content != last_content:
last_content = response.content
findings.append(response.content)
messages.append(response.content)
console.print()
console.print(
@@ -479,6 +521,13 @@ async def run_cli(
)
console.print()
# Check max loops limit
if iteration >= max_loops:
stopped_reason = "max loops reached"
console.print()
print_status(f"Max loops limit reached ({max_loops})", "yellow")
raise StopIteration()
await print_summary(interrupted=False)
except StopIteration:

View File

@@ -8,24 +8,70 @@ from .cli import run_cli
from .tui import run_tui
def parse_arguments() -> argparse.Namespace:
def parse_arguments():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="GhostCrew - AI Penetration Testing",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
ghostcrew Launch TUI
ghostcrew -t 192.168.1.1 Launch TUI with target
ghostcrew -n -t example.com Non-interactive run
ghostcrew tools list List available tools
ghostcrew mcp list List MCP servers
ghostcrew tui Launch TUI
ghostcrew tui -t 192.168.1.1 Launch TUI with target
ghostcrew run -t localhost --task "scan" Headless run
ghostcrew tools list List available tools
ghostcrew mcp list List MCP servers
""",
)
parser.add_argument("--version", action="version", version="GhostCrew 0.2.0")
# Subcommands
subparsers = parser.add_subparsers(dest="command", help="Commands")
# Common arguments for runtime modes
runtime_parent = argparse.ArgumentParser(add_help=False)
runtime_parent.add_argument("--target", "-t", help="Target (IP, hostname, or URL)")
runtime_parent.add_argument(
"--model",
"-m",
default=DEFAULT_MODEL,
help="LLM model (set GHOSTCREW_MODEL in .env)",
)
runtime_parent.add_argument(
"--docker",
"-d",
action="store_true",
help="Run tools inside Docker container (requires Docker)",
)
# TUI subcommand
subparsers.add_parser(
"tui", parents=[runtime_parent], help="Launch TUI (Interactive Mode)"
)
# Run subcommand (Headless)
run_parser = subparsers.add_parser(
"run", parents=[runtime_parent], help="Run in headless mode"
)
run_parser.add_argument("task", nargs="+", help="Task to run")
run_parser.add_argument(
"--report",
"-r",
nargs="?",
const="auto",
help=(
"Generate report. "
"If used without value, auto-generates path under loot/reports/. "
"If omitted, no report is generated."
),
)
run_parser.add_argument(
"--max-loops",
type=int,
default=50,
help="Max agent loops before stopping (default: 50)",
)
# Tools subcommand
tools_parser = subparsers.add_parser("tools", help="Manage tools")
tools_subparsers = tools_parser.add_subparsers(
@@ -51,7 +97,7 @@ Examples:
mcp_add.add_argument("name", help="Server name")
mcp_add.add_argument("command", help="Command to run (e.g., npx)")
mcp_add.add_argument("args", nargs="*", help="Command arguments")
mcp_add.add_argument("--description", "-d", default="", help="Server description")
mcp_add.add_argument("--description", default="", help="Server description")
# mcp remove
mcp_remove = mcp_subparsers.add_parser("remove", help="Remove an MCP server")
@@ -61,54 +107,7 @@ Examples:
mcp_test = mcp_subparsers.add_parser("test", help="Test MCP server connection")
mcp_test.add_argument("name", help="Server name to test")
# Target option
parser.add_argument("--target", "-t", help="Target (IP, hostname, or URL)")
# Non-interactive mode
parser.add_argument(
"-n",
"--headless",
action="store_true",
help="Run without TUI (requires --target)",
)
# Task for non-interactive mode
parser.add_argument("--task", help="Task to run in non-interactive mode")
# Report output (saves to loot/reports/ by default)
parser.add_argument(
"--report",
"-r",
nargs="?",
const="auto",
help="Generate report (default: loot/reports/<target>_<timestamp>.md)",
)
# Max tool calls limit
parser.add_argument(
"--max", type=int, default=50, help="Max calls before stopping (default: 50)"
)
# Model options
parser.add_argument(
"--model",
"-m",
default=DEFAULT_MODEL,
help="LLM model (set GHOSTCREW_MODEL in .env)",
)
# Docker mode
parser.add_argument(
"--docker",
"-d",
action="store_true",
help="Run tools inside Docker container (requires Docker)",
)
# Version
parser.add_argument("--version", action="version", version="GhostCrew 0.2.0")
return parser.parse_args()
return parser, parser.parse_args()
def handle_tools_command(args: argparse.Namespace):
@@ -242,7 +241,7 @@ def handle_mcp_command(args: argparse.Namespace):
def main():
"""Main entry point."""
args = parse_arguments()
parser, args = parse_arguments()
# Handle subcommands
if args.command == "tools":
@@ -253,36 +252,55 @@ def main():
handle_mcp_command(args)
return
# Check model configuration
if not args.model:
print("Error: No model configured.")
print("Set GHOSTCREW_MODEL in .env file or use --model flag.")
print(
"Example: GHOSTCREW_MODEL=gpt-5 or GHOSTCREW_MODEL=claude-sonnet-4-20250514"
)
return
# Determine interface mode
if args.headless:
if not args.target:
print("Error: --target is required for headless mode")
if args.command == "run":
# Check model configuration
if not args.model:
print("Error: No model configured.")
print("Set GHOSTCREW_MODEL in .env file or use --model flag.")
print(
"Example: GHOSTCREW_MODEL=gpt-5 or GHOSTCREW_MODEL=claude-sonnet-4-20250514"
)
return
if not args.target:
print("Error: --target is required for run mode")
return
# Join task arguments
task_description = " ".join(args.task)
try:
asyncio.run(
run_cli(
target=args.target,
model=args.model,
task=args.task,
task=task_description,
report=args.report,
max_tools=args.max,
max_loops=args.max_loops,
use_docker=args.docker,
)
)
except KeyboardInterrupt:
print("\n[!] Interrupted by user.")
else:
# TUI doesn't need asyncio.run - it runs its own event loop
return
if args.command == "tui":
# Check model configuration
if not args.model:
print("Error: No model configured.")
print("Set GHOSTCREW_MODEL in .env file or use --model flag.")
print(
"Example: GHOSTCREW_MODEL=gpt-5 or GHOSTCREW_MODEL=claude-sonnet-4-20250514"
)
return
run_tui(target=args.target, model=args.model, use_docker=args.docker)
return
# If no command provided, default to TUI
if args.command is None:
run_tui(target=None, model=DEFAULT_MODEL, use_docker=False)
return
if __name__ == "__main__":