mirror of
https://github.com/GH05TCREW/pentestagent.git
synced 2026-03-07 14:23:20 +00:00
tui: show Target under header + live Memory Diagnostics
- Update /target to show Target under initial ready header - Add MemoryDiagnostics widget with aligned tokens/messages/retention bars - Mount diagnostics on /memory command
This commit is contained in:
@@ -359,6 +359,93 @@ class StatusBar(Static):
|
||||
return text
|
||||
|
||||
|
||||
class MemoryDiagnostics(Static):
|
||||
"""Live memory diagnostics widget mounted into the chat area.
|
||||
|
||||
This widget polls the agent's LLM memory stats periodically and
|
||||
renders a compact, updating diagnostics panel.
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._timer: Optional[Timer] = None
|
||||
|
||||
def on_mount(self) -> None:
|
||||
# Refresh periodically for a lively display
|
||||
self._timer = self.set_interval(0.8, self.refresh)
|
||||
|
||||
def on_unmount(self) -> None:
|
||||
if self._timer:
|
||||
self._timer.stop()
|
||||
|
||||
def _bar(self, ratio: float, width: int = 20) -> str:
|
||||
filled = int(max(0, min(1.0, ratio)) * width)
|
||||
return "█" * filled + "░" * (width - filled)
|
||||
|
||||
def render(self) -> Text:
|
||||
text = Text()
|
||||
|
||||
try:
|
||||
app = self.app
|
||||
agent = getattr(app, "agent", None)
|
||||
if not agent or not getattr(agent, "llm", None):
|
||||
text.append("Memory Diagnostics\n", style="bold #d4d4d4")
|
||||
text.append("Agent not initialized", style="#9a9a9a")
|
||||
return text
|
||||
|
||||
stats = agent.llm.get_memory_stats()
|
||||
msgs = len(agent.conversation_history)
|
||||
llm_msgs = agent._format_messages_for_llm()
|
||||
current_tokens = agent.llm.memory.get_total_tokens(llm_msgs)
|
||||
|
||||
budget = stats.get("token_budget") or 1
|
||||
thresh = stats.get("summarize_threshold") or budget
|
||||
recent_keep = stats.get("recent_to_keep", 5)
|
||||
has_summary = stats.get("has_summary", False)
|
||||
summarized_count = stats.get("summarized_message_count", 0)
|
||||
|
||||
# Header
|
||||
text.append("Memory Diagnostics\n", style="bold #d4d4d4")
|
||||
|
||||
# Use a consistent bar width for all bars and align labels
|
||||
bar_width = 28
|
||||
labels = ["Tokens:", "Messages:", "Retention:"]
|
||||
label_width = max(len(l) for l in labels)
|
||||
|
||||
# Tokens line
|
||||
ratio = current_tokens / max(1, budget)
|
||||
bar = self._bar(ratio, width=bar_width)
|
||||
label = "Tokens:".ljust(label_width)
|
||||
text.append(f"{label} [{bar}] {current_tokens:,} / {budget:,}\n", style="#9a9a9a")
|
||||
|
||||
# Messages line (scale messages to an expected max window)
|
||||
expected_msgs_max = max(1, recent_keep * 6)
|
||||
mratio = min(1.0, msgs / expected_msgs_max)
|
||||
mbar = self._bar(mratio, width=bar_width)
|
||||
label = "Messages:".ljust(label_width)
|
||||
text.append(f"{label} [{mbar}] {msgs} active\n", style="#9a9a9a")
|
||||
|
||||
# Retention / recent
|
||||
k_ratio = min(1.0, recent_keep / max(1, recent_keep))
|
||||
keep_bar = self._bar(k_ratio, width=bar_width)
|
||||
label = "Retention:".ljust(label_width)
|
||||
text.append(f"{label} [{keep_bar}] keeping last {recent_keep}\n", style="#9a9a9a")
|
||||
|
||||
# Summary status
|
||||
summary_state = "active" if has_summary else "inactive"
|
||||
emoji = "🟢" if has_summary else "🔴"
|
||||
text.append(f"Summary: {emoji} {summary_state}\n", style="#9a9a9a")
|
||||
|
||||
# Summarized / threshold
|
||||
text.append(f"Summarized: {summarized_count} / {thresh:,}\n", style="#9a9a9a")
|
||||
text.append(f"Threshold: {thresh:,}\n", style="#9a9a9a")
|
||||
|
||||
except Exception as e:
|
||||
text.append(f"Memory diagnostics error: {e}", style="#9a9a9a")
|
||||
|
||||
return text
|
||||
|
||||
|
||||
# ----- Main TUI App -----
|
||||
|
||||
|
||||
@@ -803,28 +890,22 @@ class PentestAgentTUI(App):
|
||||
self._add_system("Agent not initialized")
|
||||
|
||||
def _show_memory_stats(self) -> None:
|
||||
"""Display memory usage statistics"""
|
||||
if self.agent and self.agent.llm:
|
||||
stats = self.agent.llm.get_memory_stats()
|
||||
messages_count = len(self.agent.conversation_history)
|
||||
|
||||
# Format messages for token counting
|
||||
llm_messages = self.agent._format_messages_for_llm()
|
||||
current_tokens = self.agent.llm.memory.get_total_tokens(llm_messages)
|
||||
|
||||
info = (
|
||||
f"=== Memory Stats ===\n"
|
||||
f"Messages: {messages_count}\n"
|
||||
f"Current tokens: {current_tokens:,}\n"
|
||||
f"Token budget: {stats['token_budget']:,}\n"
|
||||
f"Summarize at: {stats['summarize_threshold']:,} tokens\n"
|
||||
f"Recent to keep: {stats['recent_to_keep']} messages\n"
|
||||
f"Has summary: {stats['has_summary']}\n"
|
||||
f"Summarized: {stats['summarized_message_count']} messages"
|
||||
)
|
||||
self._add_system(info)
|
||||
else:
|
||||
"""Mount a live memory diagnostics widget into the chat area."""
|
||||
try:
|
||||
scroll = self.query_one("#chat-scroll", ScrollableContainer)
|
||||
except Exception:
|
||||
self._add_system("Agent not initialized")
|
||||
return
|
||||
|
||||
# Remove any existing MemoryDiagnostics widgets to avoid duplicates
|
||||
try:
|
||||
for w in list(scroll.query(MemoryDiagnostics)):
|
||||
w.remove()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
widget = MemoryDiagnostics(id="memory-diagnostics")
|
||||
scroll.mount(widget)
|
||||
|
||||
async def _show_notes(self) -> None:
|
||||
"""Display saved notes"""
|
||||
@@ -927,8 +1008,35 @@ class PentestAgentTUI(App):
|
||||
# Update agent's target if agent exists
|
||||
if self.agent:
|
||||
self.agent.target = target
|
||||
|
||||
self._add_system(f"@ Target set: {target}")
|
||||
# Update the initial ready SystemMessage (if present) so Target appears under Runtime
|
||||
try:
|
||||
scroll = self.query_one("#chat-scroll", ScrollableContainer)
|
||||
updated = False
|
||||
for child in scroll.children:
|
||||
if isinstance(child, SystemMessage) and "PentestAgent ready" in getattr(child, "message_content", ""):
|
||||
# Append Target line if not already present
|
||||
if "Target:" not in child.message_content:
|
||||
child.message_content = child.message_content + f"\n Target: {target}"
|
||||
try:
|
||||
child.refresh()
|
||||
except Exception:
|
||||
pass
|
||||
updated = True
|
||||
break
|
||||
if not updated:
|
||||
# Fallback: add system line near top by inserting at beginning
|
||||
try:
|
||||
first = scroll.children[0] if scroll.children else None
|
||||
msg = SystemMessage(f" Target: {target}")
|
||||
if first:
|
||||
scroll.mount_before(msg, first)
|
||||
else:
|
||||
scroll.mount(msg)
|
||||
except Exception:
|
||||
self._add_system(f" Target: {target}")
|
||||
except Exception:
|
||||
# Last resort: append a subtle system line
|
||||
self._add_system(f" Target: {target}")
|
||||
|
||||
@work(exclusive=True)
|
||||
async def _run_report_generation(self) -> None:
|
||||
|
||||
Reference in New Issue
Block a user