diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index fade6a9..9fe16bb 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -359,6 +359,93 @@ class StatusBar(Static): return text +class MemoryDiagnostics(Static): + """Live memory diagnostics widget mounted into the chat area. + + This widget polls the agent's LLM memory stats periodically and + renders a compact, updating diagnostics panel. + """ + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._timer: Optional[Timer] = None + + def on_mount(self) -> None: + # Refresh periodically for a lively display + self._timer = self.set_interval(0.8, self.refresh) + + def on_unmount(self) -> None: + if self._timer: + self._timer.stop() + + def _bar(self, ratio: float, width: int = 20) -> str: + filled = int(max(0, min(1.0, ratio)) * width) + return "█" * filled + "░" * (width - filled) + + def render(self) -> Text: + text = Text() + + try: + app = self.app + agent = getattr(app, "agent", None) + if not agent or not getattr(agent, "llm", None): + text.append("Memory Diagnostics\n", style="bold #d4d4d4") + text.append("Agent not initialized", style="#9a9a9a") + return text + + stats = agent.llm.get_memory_stats() + msgs = len(agent.conversation_history) + llm_msgs = agent._format_messages_for_llm() + current_tokens = agent.llm.memory.get_total_tokens(llm_msgs) + + budget = stats.get("token_budget") or 1 + thresh = stats.get("summarize_threshold") or budget + recent_keep = stats.get("recent_to_keep", 5) + has_summary = stats.get("has_summary", False) + summarized_count = stats.get("summarized_message_count", 0) + + # Header + text.append("Memory Diagnostics\n", style="bold #d4d4d4") + + # Use a consistent bar width for all bars and align labels + bar_width = 28 + labels = ["Tokens:", "Messages:", "Retention:"] + label_width = max(len(l) for l in labels) + + # Tokens line + ratio = current_tokens / max(1, budget) + bar = self._bar(ratio, width=bar_width) + label = "Tokens:".ljust(label_width) + text.append(f"{label} [{bar}] {current_tokens:,} / {budget:,}\n", style="#9a9a9a") + + # Messages line (scale messages to an expected max window) + expected_msgs_max = max(1, recent_keep * 6) + mratio = min(1.0, msgs / expected_msgs_max) + mbar = self._bar(mratio, width=bar_width) + label = "Messages:".ljust(label_width) + text.append(f"{label} [{mbar}] {msgs} active\n", style="#9a9a9a") + + # Retention / recent + k_ratio = min(1.0, recent_keep / max(1, recent_keep)) + keep_bar = self._bar(k_ratio, width=bar_width) + label = "Retention:".ljust(label_width) + text.append(f"{label} [{keep_bar}] keeping last {recent_keep}\n", style="#9a9a9a") + + # Summary status + summary_state = "active" if has_summary else "inactive" + emoji = "🟢" if has_summary else "🔴" + text.append(f"Summary: {emoji} {summary_state}\n", style="#9a9a9a") + + # Summarized / threshold + text.append(f"Summarized: {summarized_count} / {thresh:,}\n", style="#9a9a9a") + text.append(f"Threshold: {thresh:,}\n", style="#9a9a9a") + + except Exception as e: + text.append(f"Memory diagnostics error: {e}", style="#9a9a9a") + + return text + + # ----- Main TUI App ----- @@ -803,28 +890,22 @@ class PentestAgentTUI(App): self._add_system("Agent not initialized") def _show_memory_stats(self) -> None: - """Display memory usage statistics""" - if self.agent and self.agent.llm: - stats = self.agent.llm.get_memory_stats() - messages_count = len(self.agent.conversation_history) - - # Format messages for token counting - llm_messages = self.agent._format_messages_for_llm() - current_tokens = self.agent.llm.memory.get_total_tokens(llm_messages) - - info = ( - f"=== Memory Stats ===\n" - f"Messages: {messages_count}\n" - f"Current tokens: {current_tokens:,}\n" - f"Token budget: {stats['token_budget']:,}\n" - f"Summarize at: {stats['summarize_threshold']:,} tokens\n" - f"Recent to keep: {stats['recent_to_keep']} messages\n" - f"Has summary: {stats['has_summary']}\n" - f"Summarized: {stats['summarized_message_count']} messages" - ) - self._add_system(info) - else: + """Mount a live memory diagnostics widget into the chat area.""" + try: + scroll = self.query_one("#chat-scroll", ScrollableContainer) + except Exception: self._add_system("Agent not initialized") + return + + # Remove any existing MemoryDiagnostics widgets to avoid duplicates + try: + for w in list(scroll.query(MemoryDiagnostics)): + w.remove() + except Exception: + pass + + widget = MemoryDiagnostics(id="memory-diagnostics") + scroll.mount(widget) async def _show_notes(self) -> None: """Display saved notes""" @@ -927,8 +1008,35 @@ class PentestAgentTUI(App): # Update agent's target if agent exists if self.agent: self.agent.target = target - - self._add_system(f"@ Target set: {target}") + # Update the initial ready SystemMessage (if present) so Target appears under Runtime + try: + scroll = self.query_one("#chat-scroll", ScrollableContainer) + updated = False + for child in scroll.children: + if isinstance(child, SystemMessage) and "PentestAgent ready" in getattr(child, "message_content", ""): + # Append Target line if not already present + if "Target:" not in child.message_content: + child.message_content = child.message_content + f"\n Target: {target}" + try: + child.refresh() + except Exception: + pass + updated = True + break + if not updated: + # Fallback: add system line near top by inserting at beginning + try: + first = scroll.children[0] if scroll.children else None + msg = SystemMessage(f" Target: {target}") + if first: + scroll.mount_before(msg, first) + else: + scroll.mount(msg) + except Exception: + self._add_system(f" Target: {target}") + except Exception: + # Last resort: append a subtle system line + self._add_system(f" Target: {target}") @work(exclusive=True) async def _run_report_generation(self) -> None: