diff --git a/.env.example b/.env.example
index 94fd3b3..6a5e9b7 100644
--- a/.env.example
+++ b/.env.example
@@ -15,17 +15,54 @@ TAVILY_API_KEY=
 # Other providers: azure/, bedrock/, groq/, ollama/, together_ai/ (see litellm docs)
 PENTESTAGENT_MODEL=gpt-5
 
-# Ollama local/remote API base
+# Provider selection:
+# Note: The app determines provider from `PENTESTAGENT_MODEL` prefix
+# (e.g., `ollama/...`, `gpt-5`, `claude-...`, `gemini/...`). No separate
+# `LLM_PROVIDER` variable is used.
+
+# Ollama API base (set this when using an `ollama/...` model; LiteLLM reads OLLAMA_API_BASE)
 # Example: http://127.0.0.1:11434 or http://192.168.0.165:11434
-# Set this when using Ollama as the provider so LiteLLM/clients point to the correct host
-# OLLAMA_API_BASE=http://127.0.0.1:11434
+OLLAMA_API_BASE=http://127.0.0.1:11434
+
+# Example local model string (uncomment to use instead of gpt-5)
+# PENTESTAGENT_MODEL="ollama/qwen2.5:7b-instruct"
 
 # Embeddings (for RAG knowledge base)
 # Options: openai, local (default: openai if OPENAI_API_KEY set, else local)
-# PENTESTAGENT_EMBEDDINGS=local
+PENTESTAGENT_EMBEDDINGS=local
 
 # Settings
-PENTESTAGENT_DEBUG=false
+PENTESTAGENT_DEBUG=false
+
+# Optional: manually declare model/context and daily token budgeting
+# Useful when provider metadata isn't available or you want to enforce local limits.
+# Set the model's maximum context window (in tokens). Example values:
+# - Gemini large: 131072
+# - Gemini flash: 65536
+# - Ollama local model: 8192
+# PENTESTAGENT_MODEL_MAX_CONTEXT=131072
+
+# Optional daily token budget tracking (integers, tokens):
+# - Set the total token allowance you want to track per day
+# - Set the current used amount (optional; defaults to 0)
+# PENTESTAGENT_DAILY_TOKEN_BUDGET=500000
+# PENTESTAGENT_DAILY_TOKEN_USED=0
+
+# ---------------------------------------------------------------------------
+# Example pricing & daily token limit used by `/token` diagnostics
+# Uncomment and adjust to enable cost calculations.
+ +# Per 1M tokens pricing (USD): +# Example (input at $2.00 / 1M, output at $12.00 / 1M) +INPUT_COST_PER_MILLION=2.0 +OUTPUT_COST_PER_MILLION=12.0 + +# Optional unified override (applies to both input and output) +# COST_PER_MILLION=14.0 + +# Example daily budget (tokens) +DAILY_TOKEN_LIMIT=1000000 +# --------------------------------------------------------------------------- # Agent max iterations (regular agent + crew workers, default: 30) # PENTESTAGENT_AGENT_MAX_ITERATIONS=30 diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index 9fe16bb..152c1f6 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -110,10 +110,11 @@ class HelpScreen(ModalScreen): #help-container { width: 60; - height: 23; + height: 26; background: #121212; border: solid #3a3a3a; padding: 1 2; + layout: vertical; } #help-title { @@ -127,6 +128,7 @@ class HelpScreen(ModalScreen): color: #9a9a9a; } + #help-close { margin-top: 1; width: auto; @@ -155,21 +157,35 @@ class HelpScreen(ModalScreen): ) def _get_help_text(self) -> str: - return """[bold]Modes:[/] Assist | Agent | Crew -[bold]Keys:[/] Enter=Send Up/Down=History Ctrl+Q=Quit + header = ( + "[bold]Modes:[/] Assist | Agent | Crew\n" + "[bold]Keys:[/] Enter=Send Up/Down=History Ctrl+Q=Quit\n\n" + "[bold]Commands:[/]\n" + ) -[bold]Commands:[/] - /agent - Run in agent mode - /crew - Run multi-agent crew mode - /target - Set target - /prompt - Show system prompt - /memory - Show memory stats - /notes - Show saved notes - /report - Generate report - /help - Show help - /clear - Clear chat - /tools - List tools - /quit - Exit""" + cmds = [ + ("/agent ", "Run in agent mode"), + ("/crew ", "Run multi-agent crew mode"), + ("/target ", "Set target"), + ("/prompt", "Show system prompt"), + ("/memory", "Show memory stats"), + ("/token", "Show token usage & cost"), + ("/notes", "Show saved notes"), + ("/report", "Generate report"), + ("/help", "Show help"), + ("/clear", "Clear chat"), + ("/tools", 
"List tools"), + ("/quit", "Exit"), + ] + + # Determine consistent width for command column so the dash aligns + cmd_col_width = max(len(c) for c, _ in cmds) + 3 # padding before dash + lines = [] + for cmd, desc in cmds: + pad = " " * (cmd_col_width - len(cmd)) + lines.append(f" {cmd}{pad}- {desc}") + + return header + "\n".join(lines) def action_dismiss(self) -> None: self.app.pop_screen() @@ -446,6 +462,228 @@ class MemoryDiagnostics(Static): return text +class TokenDiagnostics(Static): + """Live token/cost diagnostics panel mounted into the chat area. + + Reads persisted daily usage from the token_tracker, computes cost + using environment variables, and displays a simple ASCII progress bar. + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._timer: Optional[Timer] = None + + def on_mount(self) -> None: + # Refresh periodically for a lively display + self._timer = self.set_interval(1.0, self.refresh) + + def on_unmount(self) -> None: + if self._timer: + self._timer.stop() + + def _bar(self, ratio: float, width: int = 28) -> str: + """Block-style usage bar matching MemoryDiagnostics visuals.""" + r = max(0.0, min(1.0, ratio)) + filled = int(r * width) + return "█" * filled + "░" * (width - filled) + + def render(self) -> Text: + text = Text() + try: + import os + + # Lazy import of token_tracker (best-effort) + try: + from ..tools import token_tracker + except Exception: + token_tracker = None + + text.append("Token Usage Diagnostics\n", style="bold #d4d4d4") + + if not token_tracker: + text.append("Token tracker not available (tools/token_tracker).\n", style="#9a9a9a") + return text + + stats = token_tracker.get_stats_sync() + + # If a reset is pending (date changed), perform a reset now so daily + # usage is accurate and visible to the user. 
+ reset_occurred = False + if stats.get("reset_pending"): + try: + token_tracker.record_usage_sync(0, 0) + stats = token_tracker.get_stats_sync() + reset_occurred = True + except Exception: + pass + + # Extract values + last_in = int(stats.get("last_input_tokens", 0) or 0) + last_out = int(stats.get("last_output_tokens", 0) or 0) + last_total = int(stats.get("last_total_tokens", 0) or 0) + daily_usage = int(stats.get("daily_usage", 0) or 0) + last_reset = stats.get("last_reset_date") + current_date = stats.get("current_date") + + # (env parsing moved below) + + # Environment cost config + def _parse_env(name: str): + v = os.getenv(name) + if v is None or v == "": + return None + try: + return float(v) + except Exception: + return "INVALID" + + unified = _parse_env("COST_PER_MILLION") + input_cost_per_m = _parse_env("INPUT_COST_PER_MILLION") + output_cost_per_m = _parse_env("OUTPUT_COST_PER_MILLION") + daily_limit = _parse_env("DAILY_TOKEN_LIMIT") + + # Determine if any env-based limits exist + has_env_limits = any( + v is not None + for v in (unified, input_cost_per_m, output_cost_per_m, daily_limit) + ) + + # If nothing has been recorded yet (no tokens, no daily usage) + # and no env limits are configured, show the concise sentinel only. 
+ if last_total == 0 and daily_usage == 0 and not has_env_limits: + text.append("No token usage recorded\n", style="#9a9a9a") + return text + + # Validate env vars + env_errors = [] + if unified == "INVALID": + env_errors.append("COST_PER_MILLION is not numeric") + if input_cost_per_m == "INVALID": + env_errors.append("INPUT_COST_PER_MILLION is not numeric") + if output_cost_per_m == "INVALID": + env_errors.append("OUTPUT_COST_PER_MILLION is not numeric") + if daily_limit == "INVALID": + env_errors.append("DAILY_TOKEN_LIMIT is not numeric") + + if env_errors: + text.append("Environment configuration errors:\n", style="#ef4444") + for e in env_errors: + text.append(f" - {e}\n", style="#9a9a9a") + text.append("\nSet environment variables correctly to compute costs.\n", style="#9a9a9a") + return text + + # Compute costs + if unified is not None: + # Use unified cost for both input and output + input_cost = (last_in / 1_000_000.0) * float(unified) + output_cost = (last_out / 1_000_000.0) * float(unified) + else: + # Require per-direction costs to be present to compute + if input_cost_per_m is None or output_cost_per_m is None: + text.append("Cost vars missing. 
Set COST_PER_MILLION or both INPUT_COST_PER_MILLION and OUTPUT_COST_PER_MILLION.\n", style="#9a9a9a") + # Still show numeric token stats below + input_cost = output_cost = None + else: + input_cost = (last_in / 1_000_000.0) * float(input_cost_per_m) + output_cost = (last_out / 1_000_000.0) * float(output_cost_per_m) + + total_cost = None + if input_cost is not None and output_cost is not None: + total_cost = input_cost + output_cost + + # Daily budget calculations per spec + # Derive daily usage excluding last command (in case tracker already included it) + daily_without_last = max(daily_usage - last_total, 0) + new_daily_total = daily_without_last + last_total + + remaining_tokens = None + percent_used = None + if daily_limit is not None: + try: + dl = float(daily_limit) + remaining_tokens = max(int(dl - new_daily_total), 0) + percent_used = (new_daily_total / max(1.0, dl)) * 100.0 + except Exception: + remaining_tokens = None + + # Render structured panel with aligned labels and block bars + bar_width = 28 + labels = [ + "Last command:", + "Cost:", + "Daily usage:", + "Remaining:", + "Usage:", + "Last reset:", + "Current date:", + "Reset occurred:", + ] + label_width = max(len(l) for l in labels) + + # Last command tokens + label = "Last command:".ljust(label_width) + text.append( + f"{label} in={last_in:,} out={last_out:,} total={last_total:,}\n", + style="#9a9a9a", + ) + + # Cost line + label = "Cost:".ljust(label_width) + if input_cost is not None and output_cost is not None: + text.append( + f"{label} in=${input_cost:.6f} out=${output_cost:.6f} total=${total_cost:.6f}\n", + style="#9a9a9a", + ) + else: + text.append( + f"{label} not computed (missing env vars)\n", + style="#9a9a9a", + ) + + # Daily usage + label = "Daily usage:".ljust(label_width) + text.append(f"{label} {new_daily_total:,}\n", style="#9a9a9a") + + # Remaining tokens + label = "Remaining:".ljust(label_width) + if remaining_tokens is not None: + text.append(f"{label} {remaining_tokens:,}\n", 
style="#9a9a9a") + else: + text.append( + f"{label} N/A (DAILY_TOKEN_LIMIT not set)\n", + style="#9a9a9a", + ) + + # Usage percent + bar + label = "Usage:".ljust(label_width) + if percent_used is not None: + bar = self._bar(percent_used / 100.0, width=bar_width) + text.append( + f"{label} [{bar}] {percent_used:.1f}%\n", + style="#9a9a9a", + ) + else: + text.append(f"{label} N/A\n", style="#9a9a9a") + + # Dates + label = "Last reset:".ljust(label_width) + text.append(f"{label} {last_reset}\n", style="#9a9a9a") + label = "Current date:".ljust(label_width) + text.append(f"{label} {current_date}\n", style="#9a9a9a") + + # Reset occurrence + label = "Reset occurred:".ljust(label_width) + text.append( + f"{label} {'Yes' if reset_occurred else 'No'}\n", + style="#9a9a9a", + ) + + except Exception as e: + text.append(f"Token diagnostics error: {e}\n", style="#9a9a9a") + + return text + + # ----- Main TUI App ----- @@ -896,16 +1134,40 @@ class PentestAgentTUI(App): except Exception: self._add_system("Agent not initialized") return - - # Remove any existing MemoryDiagnostics widgets to avoid duplicates + # Mount a new diagnostics panel with a unique ID and scroll into view try: - for w in list(scroll.query(MemoryDiagnostics)): - w.remove() + import uuid + panel_id = f"memory-diagnostics-{uuid.uuid4().hex}" + except Exception: + panel_id = None + + widget = MemoryDiagnostics(id=panel_id) + scroll.mount(widget) + try: + scroll.scroll_end(animate=False) except Exception: pass - widget = MemoryDiagnostics(id="memory-diagnostics") + def _show_token_stats(self) -> None: + """Mount a live token diagnostics widget into the chat area.""" + try: + scroll = self.query_one("#chat-scroll", ScrollableContainer) + except Exception: + self._add_system("Agent not initialized") + return + # Mount a new diagnostics panel with a unique ID and scroll into view + try: + import uuid + panel_id = f"token-diagnostics-{uuid.uuid4().hex}" + except Exception: + panel_id = None + + widget = 
TokenDiagnostics(id=panel_id) scroll.mount(widget) + try: + scroll.scroll_end(animate=False) + except Exception: + pass async def _show_notes(self) -> None: """Display saved notes""" @@ -1224,6 +1486,8 @@ Be concise. Use the actual data from notes.""" self._show_system_prompt() elif cmd_lower == "/memory": self._show_memory_stats() + elif cmd_lower == "/token": + self._show_token_stats() elif cmd_lower == "/notes": await self._show_notes() elif cmd_lower == "/report":