- Restyle Token Usage diagnostics to match Memory panel (block bars, aligned labels)

- Add /token command to help and align all help entries via two-column layout
- Increase help modal height so Close button remains visible (no scroll)
- Refresh .env.example: remove unused LLM_PROVIDER, switch to OLLAMA_BASE_URL, add embeddings/debug/context budgeting, pricing vars, and daily token limit
This commit is contained in:
giveen
2026-01-11 14:46:25 -07:00
parent 4619e43d62
commit 567e25ed8c
2 changed files with 326 additions and 25 deletions

View File

@@ -15,17 +15,54 @@ TAVILY_API_KEY=
# Other providers: azure/, bedrock/, groq/, ollama/, together_ai/ (see litellm docs)
PENTESTAGENT_MODEL=gpt-5
# Ollama local/remote API base
# Provider selection:
# Note: The app determines provider from `PENTESTAGENT_MODEL` prefix
# (e.g., `ollama/...`, `gpt-5`, `claude-...`, `gemini/...`). No separate
# `LLM_PROVIDER` variable is used.
# Ollama base URL (set this when using an `ollama/...` model)
# Example: http://127.0.0.1:11434 or http://192.168.0.165:11434
# Set this when using Ollama as the provider so LiteLLM/clients point to the correct host
# OLLAMA_API_BASE=http://127.0.0.1:11434
OLLAMA_BASE_URL=http://127.0.0.1:11434
# Example local model string (uncomment to use instead of gpt-5)
# PENTESTAGENT_MODEL="ollama/qwen2.5:7b-instruct"
# Embeddings (for RAG knowledge base)
# Options: openai, local (default: openai if OPENAI_API_KEY set, else local)
# PENTESTAGENT_EMBEDDINGS=local
PENTESTAGENT_EMBEDDINGS=local
# Settings
# PENTESTAGENT_DEBUG=false
PENTESTAGENT_DEBUG=true
# Optional: manually declare model/context and daily token budgeting
# Useful when provider metadata isn't available or you want to enforce local limits.
# Set the model's maximum context window (in tokens). Example values:
# - Gemini large: 131072
# - Gemini flash: 65536
# - Ollama local model: 8192
# PENTESTAGENT_MODEL_MAX_CONTEXT=131072
# Optional daily token budget tracking (integers, tokens):
# - Set the total token allowance you want to track per day
# - Set the current used amount (optional; defaults to 0)
# PENTESTAGENT_DAILY_TOKEN_BUDGET=500000
# PENTESTAGENT_DAILY_TOKEN_USED=0
# ---------------------------------------------------------------------------
# Example pricing & daily token limit used by `/token` diagnostics
# Uncomment and adjust to enable cost calculations.
# Per 1M tokens pricing (USD):
# Example (input at $2.00 / 1M, output at $12.00 / 1M)
INPUT_COST_PER_MILLION=2.0
OUTPUT_COST_PER_MILLION=12.0
# Optional unified override (applies to both input and output)
# COST_PER_MILLION=14.0
# Example daily budget (tokens)
DAILY_TOKEN_LIMIT=1000000
# ---------------------------------------------------------------------------
# Agent max iterations (regular agent + crew workers, default: 30)
# PENTESTAGENT_AGENT_MAX_ITERATIONS=30

View File

@@ -110,10 +110,11 @@ class HelpScreen(ModalScreen):
#help-container {
width: 60;
height: 23;
height: 26;
background: #121212;
border: solid #3a3a3a;
padding: 1 2;
layout: vertical;
}
#help-title {
@@ -127,6 +128,7 @@ class HelpScreen(ModalScreen):
color: #9a9a9a;
}
#help-close {
margin-top: 1;
width: auto;
@@ -155,21 +157,35 @@ class HelpScreen(ModalScreen):
)
def _get_help_text(self) -> str:
return """[bold]Modes:[/] Assist | Agent | Crew
[bold]Keys:[/] Enter=Send Up/Down=History Ctrl+Q=Quit
header = (
"[bold]Modes:[/] Assist | Agent | Crew\n"
"[bold]Keys:[/] Enter=Send Up/Down=History Ctrl+Q=Quit\n\n"
"[bold]Commands:[/]\n"
)
[bold]Commands:[/]
/agent <task> - Run in agent mode
/crew <task> - Run multi-agent crew mode
/target <host> - Set target
/prompt - Show system prompt
/memory - Show memory stats
/notes - Show saved notes
/report - Generate report
/help - Show help
/clear - Clear chat
/tools - List tools
/quit - Exit"""
cmds = [
("/agent <task>", "Run in agent mode"),
("/crew <task>", "Run multi-agent crew mode"),
("/target <host>", "Set target"),
("/prompt", "Show system prompt"),
("/memory", "Show memory stats"),
("/token", "Show token usage & cost"),
("/notes", "Show saved notes"),
("/report", "Generate report"),
("/help", "Show help"),
("/clear", "Clear chat"),
("/tools", "List tools"),
("/quit", "Exit"),
]
# Determine consistent width for command column so the dash aligns
cmd_col_width = max(len(c) for c, _ in cmds) + 3 # padding before dash
lines = []
for cmd, desc in cmds:
pad = " " * (cmd_col_width - len(cmd))
lines.append(f" {cmd}{pad}- {desc}")
return header + "\n".join(lines)
def action_dismiss(self) -> None:
    """Dismiss the help modal by popping it off the app's screen stack."""
    self.app.pop_screen()
@@ -446,6 +462,228 @@ class MemoryDiagnostics(Static):
return text
class TokenDiagnostics(Static):
    """Live token/cost diagnostics panel mounted into the chat area.

    Reads persisted daily usage from the token_tracker, computes cost
    using environment variables, and displays a block-style progress bar
    matching the MemoryDiagnostics panel.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Interval timer handle so we can cancel it on unmount.
        self._timer: Optional[Timer] = None

    def on_mount(self) -> None:
        # Refresh periodically for a lively display
        self._timer = self.set_interval(1.0, self.refresh)

    def on_unmount(self) -> None:
        # Stop the refresh timer so a removed panel doesn't keep ticking.
        if self._timer:
            self._timer.stop()

    def _bar(self, ratio: float, width: int = 28) -> str:
        """Block-style usage bar matching MemoryDiagnostics visuals.

        ``ratio`` is clamped to [0, 1]; the bar is ``width`` characters of
        filled blocks followed by light-shade blocks.
        """
        r = max(0.0, min(1.0, ratio))
        filled = int(r * width)
        # FIX: the fill/empty glyphs were lost to an encoding error and the
        # bar rendered as an empty string; restore the block characters.
        return "█" * filled + "░" * (width - filled)

    def render(self) -> Text:
        """Render the diagnostics panel.

        Best-effort throughout: any failure degrades to an explanatory
        message rather than raising into the Textual render loop.
        """
        text = Text()
        try:
            import os

            # Lazy import of token_tracker (best-effort)
            try:
                from ..tools import token_tracker
            except Exception:
                token_tracker = None

            text.append("Token Usage Diagnostics\n", style="bold #d4d4d4")
            if not token_tracker:
                text.append("Token tracker not available (tools/token_tracker).\n", style="#9a9a9a")
                return text

            stats = token_tracker.get_stats_sync()

            # If a reset is pending (date changed), perform a reset now so daily
            # usage is accurate and visible to the user.
            reset_occurred = False
            if stats.get("reset_pending"):
                try:
                    token_tracker.record_usage_sync(0, 0)
                    stats = token_tracker.get_stats_sync()
                    reset_occurred = True
                except Exception:
                    pass

            # Extract values (coerce None/falsy to 0 so formatting never fails)
            last_in = int(stats.get("last_input_tokens", 0) or 0)
            last_out = int(stats.get("last_output_tokens", 0) or 0)
            last_total = int(stats.get("last_total_tokens", 0) or 0)
            daily_usage = int(stats.get("daily_usage", 0) or 0)
            last_reset = stats.get("last_reset_date")
            current_date = stats.get("current_date")

            # Environment cost config.  Returns None when unset/empty, a float
            # when numeric, or the sentinel "INVALID" for a non-numeric value.
            def _parse_env(name: str):
                v = os.getenv(name)
                if v is None or v == "":
                    return None
                try:
                    return float(v)
                except Exception:
                    return "INVALID"

            unified = _parse_env("COST_PER_MILLION")
            input_cost_per_m = _parse_env("INPUT_COST_PER_MILLION")
            output_cost_per_m = _parse_env("OUTPUT_COST_PER_MILLION")
            daily_limit = _parse_env("DAILY_TOKEN_LIMIT")

            # Determine if any env-based limits exist
            has_env_limits = any(
                v is not None
                for v in (unified, input_cost_per_m, output_cost_per_m, daily_limit)
            )

            # If nothing has been recorded yet (no tokens, no daily usage)
            # and no env limits are configured, show the concise sentinel only.
            if last_total == 0 and daily_usage == 0 and not has_env_limits:
                text.append("No token usage recorded\n", style="#9a9a9a")
                return text

            # Validate env vars
            env_errors = []
            if unified == "INVALID":
                env_errors.append("COST_PER_MILLION is not numeric")
            if input_cost_per_m == "INVALID":
                env_errors.append("INPUT_COST_PER_MILLION is not numeric")
            if output_cost_per_m == "INVALID":
                env_errors.append("OUTPUT_COST_PER_MILLION is not numeric")
            if daily_limit == "INVALID":
                env_errors.append("DAILY_TOKEN_LIMIT is not numeric")
            if env_errors:
                text.append("Environment configuration errors:\n", style="#ef4444")
                for e in env_errors:
                    text.append(f" - {e}\n", style="#9a9a9a")
                text.append("\nSet environment variables correctly to compute costs.\n", style="#9a9a9a")
                return text

            # Compute costs
            if unified is not None:
                # Use unified cost for both input and output
                input_cost = (last_in / 1_000_000.0) * float(unified)
                output_cost = (last_out / 1_000_000.0) * float(unified)
            else:
                # Require per-direction costs to be present to compute
                if input_cost_per_m is None or output_cost_per_m is None:
                    text.append("Cost vars missing. Set COST_PER_MILLION or both INPUT_COST_PER_MILLION and OUTPUT_COST_PER_MILLION.\n", style="#9a9a9a")
                    # Still show numeric token stats below
                    input_cost = output_cost = None
                else:
                    input_cost = (last_in / 1_000_000.0) * float(input_cost_per_m)
                    output_cost = (last_out / 1_000_000.0) * float(output_cost_per_m)
            total_cost = None
            if input_cost is not None and output_cost is not None:
                total_cost = input_cost + output_cost

            # Daily budget calculations per spec
            # Derive daily usage excluding last command (in case tracker already included it)
            daily_without_last = max(daily_usage - last_total, 0)
            new_daily_total = daily_without_last + last_total
            remaining_tokens = None
            percent_used = None
            if daily_limit is not None:
                try:
                    dl = float(daily_limit)
                    remaining_tokens = max(int(dl - new_daily_total), 0)
                    percent_used = (new_daily_total / max(1.0, dl)) * 100.0
                except Exception:
                    remaining_tokens = None

            # Render structured panel with aligned labels and block bars
            bar_width = 28
            labels = [
                "Last command:",
                "Cost:",
                "Daily usage:",
                "Remaining:",
                "Usage:",
                "Last reset:",
                "Current date:",
                "Reset occurred:",
            ]
            label_width = max(len(l) for l in labels)

            # Last command tokens
            label = "Last command:".ljust(label_width)
            text.append(
                f"{label} in={last_in:,} out={last_out:,} total={last_total:,}\n",
                style="#9a9a9a",
            )
            # Cost line
            label = "Cost:".ljust(label_width)
            if input_cost is not None and output_cost is not None:
                text.append(
                    f"{label} in=${input_cost:.6f} out=${output_cost:.6f} total=${total_cost:.6f}\n",
                    style="#9a9a9a",
                )
            else:
                text.append(
                    f"{label} not computed (missing env vars)\n",
                    style="#9a9a9a",
                )
            # Daily usage
            label = "Daily usage:".ljust(label_width)
            text.append(f"{label} {new_daily_total:,}\n", style="#9a9a9a")
            # Remaining tokens
            label = "Remaining:".ljust(label_width)
            if remaining_tokens is not None:
                text.append(f"{label} {remaining_tokens:,}\n", style="#9a9a9a")
            else:
                text.append(
                    f"{label} N/A (DAILY_TOKEN_LIMIT not set)\n",
                    style="#9a9a9a",
                )
            # Usage percent + bar
            label = "Usage:".ljust(label_width)
            if percent_used is not None:
                bar = self._bar(percent_used / 100.0, width=bar_width)
                text.append(
                    f"{label} [{bar}] {percent_used:.1f}%\n",
                    style="#9a9a9a",
                )
            else:
                text.append(f"{label} N/A\n", style="#9a9a9a")
            # Dates
            label = "Last reset:".ljust(label_width)
            text.append(f"{label} {last_reset}\n", style="#9a9a9a")
            label = "Current date:".ljust(label_width)
            text.append(f"{label} {current_date}\n", style="#9a9a9a")
            # Reset occurrence
            label = "Reset occurred:".ljust(label_width)
            text.append(
                f"{label} {'Yes' if reset_occurred else 'No'}\n",
                style="#9a9a9a",
            )
        except Exception as e:
            text.append(f"Token diagnostics error: {e}\n", style="#9a9a9a")
        return text
# ----- Main TUI App -----
@@ -896,16 +1134,40 @@ class PentestAgentTUI(App):
except Exception:
self._add_system("Agent not initialized")
return
# Remove any existing MemoryDiagnostics widgets to avoid duplicates
# Mount a new diagnostics panel with a unique ID and scroll into view
try:
for w in list(scroll.query(MemoryDiagnostics)):
w.remove()
import uuid
panel_id = f"memory-diagnostics-{uuid.uuid4().hex}"
except Exception:
panel_id = None
widget = MemoryDiagnostics(id=panel_id)
scroll.mount(widget)
try:
scroll.scroll_end(animate=False)
except Exception:
pass
widget = MemoryDiagnostics(id="memory-diagnostics")
def _show_token_stats(self) -> None:
    """Mount a live token diagnostics widget into the chat area."""
    try:
        chat_area = self.query_one("#chat-scroll", ScrollableContainer)
    except Exception:
        self._add_system("Agent not initialized")
        return
    # Give each panel a unique ID so repeated /token invocations never
    # collide, then scroll it into view (best-effort).
    panel_id = None
    try:
        import uuid
        panel_id = f"token-diagnostics-{uuid.uuid4().hex}"
    except Exception:
        pass
    panel = TokenDiagnostics(id=panel_id)
    chat_area.mount(panel)
    try:
        chat_area.scroll_end(animate=False)
    except Exception:
        pass
async def _show_notes(self) -> None:
"""Display saved notes"""
@@ -1224,6 +1486,8 @@ Be concise. Use the actual data from notes."""
self._show_system_prompt()
elif cmd_lower == "/memory":
self._show_memory_stats()
elif cmd_lower == "/token":
self._show_token_stats()
elif cmd_lower == "/notes":
await self._show_notes()
elif cmd_lower == "/report":