Files
pentestagent/pentestagent/interface/tui.py

3155 lines
120 KiB
Python

"""
PentestAgent TUI - Terminal User Interface
"""
import asyncio
import re
import textwrap
import logging
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast
from rich.text import Text
from textual import on, work
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import (
Center,
Container,
Horizontal,
ScrollableContainer,
Vertical,
)
from textual.reactive import reactive
from textual.screen import ModalScreen
from textual.scrollbar import ScrollBar, ScrollBarRender
from textual.timer import Timer
from textual.widgets import Button, Input, Static, Tree
from textual.widgets.tree import TreeNode
from ..config.constants import DEFAULT_MODEL
# ANSI escape sequence pattern for stripping control codes from input
_ANSI_ESCAPE = re.compile(
r"\x1b\[[0-9;]*[mGKHflSTABCDEFsu]|\x1b\].*?\x07|\x1b\[<[0-9;]*[Mm]"
)
# ASCII-safe scrollbar renderer to avoid Unicode glyph issues
class ASCIIScrollBarRender(ScrollBarRender):
"""Scrollbar renderer using ASCII-safe characters."""
BLANK_GLYPH = " "
VERTICAL_BARS = [" ", " ", " ", " ", " ", " ", " ", " "]
HORIZONTAL_BARS = [" ", " ", " ", " ", " ", " ", " ", " "]
# Apply ASCII scrollbar globally
ScrollBar.renderer = ASCIIScrollBarRender
# Custom Tree with ASCII-safe icons for PowerShell compatibility
class CrewTree(Tree):
"""Tree widget with ASCII-compatible expand/collapse icons."""
ICON_NODE = "> "
ICON_NODE_EXPANDED = "v "
if TYPE_CHECKING:
from ..agents.pa_agent import PentestAgentAgent
def wrap_text_lines(text: str, width: int = 80) -> List[str]:
"""
Wrap text content preserving line breaks and wrapping long lines.
Args:
text: The text to wrap
width: Maximum width per line (default 80 for safe terminal fit)
Returns:
List of wrapped lines
"""
result = []
for line in text.split("\n"):
if len(line) <= width:
result.append(line)
else:
# Wrap long lines
wrapped = textwrap.wrap(
line, width=width, break_long_words=False, break_on_hyphens=False
)
result.extend(wrapped if wrapped else [""])
return result
# ----- Help Screen -----
class HelpScreen(ModalScreen):
"""Help modal"""
BINDINGS = [
Binding("escape", "dismiss", "Close"),
Binding("q", "dismiss", "Close"),
]
CSS = """
HelpScreen {
align: center middle;
scrollbar-background: #1a1a1a;
scrollbar-background-hover: #1a1a1a;
scrollbar-background-active: #1a1a1a;
scrollbar-color: #3a3a3a;
scrollbar-color-hover: #3a3a3a;
scrollbar-color-active: #3a3a3a;
scrollbar-corner-color: #1a1a1a;
scrollbar-size: 1 1;
}
#help-container {
width: 110;
height: 30;
background: #121212;
border: solid #3a3a3a;
padding: 1 2;
layout: vertical;
}
#help-title {
text-align: center;
text-style: bold;
color: #d4d4d4;
margin-bottom: 1;
}
#help-content {
color: #9a9a9a;
}
#help-close {
margin-top: 1;
width: auto;
min-width: 10;
background: #1a1a1a;
color: #9a9a9a;
border: none;
}
#help-close:hover {
background: #262626;
}
#help-close:focus {
background: #262626;
text-style: none;
}
"""
def compose(self) -> ComposeResult:
yield Container(
Static("PentestAgent Help", id="help-title"),
Static(self._get_help_text(), id="help-content"),
Center(Button("Close", id="help-close")),
id="help-container",
)
def _get_help_text(self) -> str:
header = (
"[bold]Modes:[/] Assist | Agent | Crew\n"
"[bold]Keys:[/] Enter=Send Up/Down=History Ctrl+Q=Quit\n\n"
"[bold]Commands:[/]\n"
)
cmds = [
("/agent <task>", "Run in agent mode"),
("/crew <task>", "Run multi-agent crew mode"),
("/target <host>", "Set target"),
("/prompt", "Show system prompt"),
("/memory", "Show memory stats"),
("/token", "Show token usage & cost"),
("/notes", "Show saved notes"),
("/report", "Generate report"),
("/help", "Show help"),
("/clear", "Clear chat"),
("/tools", "List tools"),
("/quit", "Exit"),
]
# Determine consistent width for command column so the dash aligns
cmd_col_width = max(len(c) for c, _ in cmds) + 3 # padding before dash
lines = []
for cmd, desc in cmds:
pad = " " * (cmd_col_width - len(cmd))
lines.append(f" {cmd}{pad}- {desc}")
return header + "\n".join(lines)
def action_dismiss(self) -> None:
self.app.pop_screen()
@on(Button.Pressed, "#help-close")
def close_help(self) -> None:
self.app.pop_screen()
class WorkspaceHelpScreen(ModalScreen):
"""Help modal for workspace commands."""
BINDINGS = [
Binding("escape", "dismiss", "Close"),
Binding("q", "dismiss", "Close"),
]
CSS = """
WorkspaceHelpScreen {
align: center middle;
scrollbar-background: #1a1a1a;
scrollbar-background-hover: #1a1a1a;
scrollbar-background-active: #1a1a1a;
scrollbar-color: #3a3a3a;
scrollbar-color-hover: #3a3a3a;
scrollbar-color-active: #3a3a3a;
scrollbar-corner-color: #1a1a1a;
scrollbar-size: 1 1;
}
#help-container {
width: 60;
height: 26;
background: #121212;
border: solid #3a3a3a;
padding: 1 2;
layout: vertical;
}
#help-title {
text-align: center;
text-style: bold;
color: #d4d4d4;
margin-bottom: 1;
}
#help-content {
color: #9a9a9a;
}
#help-close {
margin-top: 1;
width: auto;
min-width: 10;
background: #1a1a1a;
color: #9a9a9a;
border: none;
}
#help-close:hover {
background: #262626;
}
#help-close:focus {
background: #262626;
text-style: none;
}
"""
def compose(self) -> ComposeResult:
from rich.table import Table
from rich.text import Text
# Build a two-column table to prevent wrapping
table = Table.grid(padding=(0, 3))
table.add_column(justify="left", ratio=2)
table.add_column(justify="left", ratio=3)
# Header and usage
header = Text("Workspace Commands", style="bold")
usage = Text("Usage: /workspace <action> or /workspace <name>")
# Commands list
cmds = [
("/workspace", "Show active"),
("/workspace list", "List all workspaces"),
("/workspace info [NAME]", "Show workspace metadata"),
("/workspace note <text>", "Add operator note"),
("/workspace clear", "Deactivate workspace"),
("/workspace NAME", "Create or activate workspace"),
("/workspace help", "Show this help"),
]
# Compose table rows
table.add_row(Text("Commands:", style="bold"), Text(""))
for left, right in cmds:
table.add_row(left, right)
yield Container(
Static(header, id="help-title"),
Static(usage, id="help-usage"),
Static(table, id="help-content"),
Center(Button("Close", id="help-close"), id="help-center"),
id="help-container",
)
def _get_help_text(self) -> str:
header = "Usage: /workspace <action> or /workspace <name>\n"
cmds = [
("/workspace", "Show active"),
("/workspace list", "List all workspaces"),
("/workspace info [NAME]", "Show workspace metadata"),
("/workspace note <text>", "Add operator note"),
("/workspace clear", "Deactivate workspace"),
("/workspace NAME", "Create or activate workspace"),
("/workspace help", "Show this help"),
]
# Build two-column layout with fixed left column width
left_width = 44
lines = [header, "Commands:\n"]
for left, right in cmds:
if len(left) >= left_width - 2:
# if left is long, place on its own line
lines.append(f" {left}\n {right}")
else:
pad = " " * (left_width - len(left))
lines.append(f" {left}{pad}{right}")
return "\n".join(lines)
def action_dismiss(self) -> None:
self.app.pop_screen()
@on(Button.Pressed, "#help-close")
def close_help(self) -> None:
self.app.pop_screen()
class ToolsScreen(ModalScreen):
"""Interactive tools browser — split-pane layout.
Left pane: tree of tools. Right pane: full description (scrollable).
Selecting another tool replaces the right-pane content. Close returns
to the main screen.
"""
BINDINGS = [Binding("escape", "dismiss", "Close"), Binding("q", "dismiss", "Close")]
CSS = """
ToolsScreen { align: center middle; }
"""
def __init__(self, tools: List[Any]) -> None:
super().__init__()
self.tools = tools
def compose(self) -> ComposeResult:
# Build a split view: left tree, right description
with Container(id="tools-container"):
with Horizontal(id="tools-split"):
with Vertical(id="tools-left"):
yield Static("Tools", id="tools-title")
yield Tree("TOOLS", id="tools-tree")
with Vertical(id="tools-right"):
yield Static("Description", id="tools-desc-title")
yield ScrollableContainer(Static("Select a tool to view details.", id="tools-desc"), id="tools-desc-scroll")
yield Center(Button("Close", id="tools-close"))
def on_mount(self) -> None:
try:
tree = self.query_one("#tools-tree", Tree)
except Exception as e:
logging.getLogger(__name__).exception("Failed to query tools tree: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to initialize tools tree: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about tools tree init failure: %s", e)
return
root = tree.root
root.allow_expand = True
root.show_root = False
# Populate tool nodes
for t in self.tools:
name = getattr(t, "name", str(t))
root.add(name, data={"tool": t})
try:
tree.focus()
except Exception as e:
logging.getLogger(__name__).exception("Failed to focus tools tree: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to focus tools tree: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about tools tree focus failure: %s", e)
@on(Tree.NodeSelected, "#tools-tree")
def on_tool_selected(self, event: Tree.NodeSelected) -> None:
node = event.node
try:
tool = node.data.get("tool") if node.data else None
name = node.label or (getattr(tool, "name", str(tool)) if tool else "Unknown")
# Prefer Tool.description (registered tools use this), then fall back
desc = None
if tool is not None:
desc = getattr(tool, "description", None)
if not desc:
desc = (
getattr(tool, "summary", None)
or getattr(tool, "help_text", None)
or getattr(tool, "__doc__", None)
)
if not desc:
desc = "No description available."
# Update right-hand description pane
try:
desc_widget = self.query_one("#tools-desc", Static)
text = Text()
text.append(f"{name}\n", style="bold #d4d4d4")
text.append(str(desc), style="#d4d4d4")
desc_widget.update(text)
except Exception as e:
logging.getLogger(__name__).exception("Failed to update tool description pane: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to update tool description: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about tool desc update failure: %s", e)
except Exception as e:
logging.getLogger(__name__).exception("Unhandled error in on_tool_selected: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: error handling tool selection: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about tool selection error: %s", e)
@on(Button.Pressed, "#tools-close")
def close_tools(self) -> None:
self.app.pop_screen()
# ----- Main Chat Message Widgets -----
class ThinkingMessage(Static):
"""Thinking/reasoning message"""
def __init__(self, content: str, **kwargs):
super().__init__(**kwargs)
self.thinking_content = content
def render(self) -> Text:
text = Text()
text.append("* ", style="#9a9a9a")
text.append("Thinking\n", style="bold #9a9a9a")
# Wrap content
for line in wrap_text_lines(self.thinking_content, width=90):
text.append(f" {line}\n", style="#6b6b6b italic")
return text
class ToolMessage(Static):
"""Tool execution message"""
# Standard tool icon and color (pa theme)
TOOL_ICON = "$"
TOOL_COLOR = "#9a9a9a" # spirit gray
def __init__(self, tool_name: str, args: str = "", **kwargs):
super().__init__(**kwargs)
self.tool_name = tool_name
self.tool_args = args
def render(self) -> Text:
text = Text()
text.append(f"{self.TOOL_ICON} ", style=self.TOOL_COLOR)
text.append(f"{self.tool_name}", style=self.TOOL_COLOR)
text.append("\n", style="")
# Wrap args
if self.tool_args:
for line in wrap_text_lines(self.tool_args, width=110):
text.append(f" {line}\n", style="#6b6b6b")
return text
class ToolResultMessage(Static):
"""Tool result/output message"""
RESULT_ICON = "#"
RESULT_COLOR = "#7a7a7a"
def __init__(self, tool_name: str, result: str = "", **kwargs):
super().__init__(**kwargs)
self.tool_name = tool_name
self.result = result
def render(self) -> Text:
text = Text()
text.append(f"{self.RESULT_ICON} ", style=self.RESULT_COLOR)
text.append(f"{self.tool_name} output", style=self.RESULT_COLOR)
text.append("\n", style="")
if self.result:
for line in wrap_text_lines(self.result, width=110):
text.append(f" {line}\n", style="#5a5a5a")
return text
class AssistantMessage(Static):
"""Assistant response message"""
def __init__(self, content: str, **kwargs):
super().__init__(**kwargs)
self.message_content = content
def render(self) -> Text:
text = Text()
text.append(">> ", style="#9a9a9a")
text.append("PentestAgent\n", style="bold #d4d4d4")
# Wrap content
for line in wrap_text_lines(self.message_content, width=90):
text.append(f" {line}\n", style="#d4d4d4")
return text
class UserMessage(Static):
"""User message"""
def __init__(self, content: str, **kwargs):
super().__init__(**kwargs)
self.message_content = content
def render(self) -> Text:
text = Text()
text.append("> ", style="#9a9a9a")
text.append("You\n", style="bold #d4d4d4")
text.append(f" {self.message_content}\n", style="#d4d4d4")
return text
class SystemMessage(Static):
"""System message"""
def __init__(self, content: str, **kwargs):
super().__init__(**kwargs)
self.message_content = content
def render(self) -> Text:
text = Text()
for line in self.message_content.split("\n"):
text.append(f" {line}\n", style="#6b6b6b") # phantom - subtle system text
return text
# ----- Status Bar -----
class StatusBar(Static):
"""Animated status bar"""
status = reactive("idle")
mode = reactive("assist") # "assist" or "agent"
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._frame = 0
self._timer: Optional[Timer] = None
def on_mount(self) -> None:
self._timer = self.set_interval(0.2, self._tick)
def _tick(self) -> None:
self._frame = (self._frame + 1) % 4
if self.status not in ["idle", "complete"]:
self.refresh()
def render(self) -> Text:
dots = "." * (self._frame + 1)
# Use fixed-width labels (pad dots to 4 chars so text doesn't jump)
dots_padded = dots.ljust(4)
# PA theme status colors (muted, ethereal)
status_map = {
"idle": ("Ready", "#6b6b6b"),
"initializing": (f"Initializing{dots_padded}", "#9a9a9a"),
"thinking": (f"Thinking{dots_padded}", "#9a9a9a"),
"running": (f"Running{dots_padded}", "#9a9a9a"),
"processing": (f"Processing{dots_padded}", "#9a9a9a"),
"waiting": ("Waiting for input", "#9a9a9a"),
"complete": ("Complete", "#4a9f6e"),
"error": ("Error", "#9f4a4a"),
}
label, color = status_map.get(self.status, (self.status, "#6b6b6b"))
text = Text()
# Show mode (ASCII-safe symbols)
if self.mode == "crew":
text.append(" :: Crew ", style="#9a9a9a")
elif self.mode == "agent":
text.append(" >> Agent ", style="#9a9a9a")
else:
text.append(" >> Assist ", style="#9a9a9a")
text.append(f"| {label}", style=color)
if self.status not in ["idle", "initializing", "complete", "error"]:
text.append(" ESC to stop", style="#525252")
return text
class MemoryDiagnostics(Static):
"""Live memory diagnostics widget mounted into the chat area.
This widget polls the agent's LLM memory stats periodically and
renders a compact, updating diagnostics panel.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._timer: Optional[Timer] = None
def on_mount(self) -> None:
# Refresh periodically for a lively display
self._timer = self.set_interval(0.8, self.refresh)
def on_unmount(self) -> None:
if self._timer:
self._timer.stop()
def _bar(self, ratio: float, width: int = 20) -> str:
filled = int(max(0, min(1.0, ratio)) * width)
return "" * filled + "" * (width - filled)
def render(self) -> Text:
text = Text()
try:
app = self.app
agent = getattr(app, "agent", None)
if not agent or not getattr(agent, "llm", None):
text.append("Memory Diagnostics\n", style="bold #d4d4d4")
text.append("Agent not initialized", style="#9a9a9a")
return text
stats = agent.llm.get_memory_stats()
msgs = len(agent.conversation_history)
llm_msgs = agent._format_messages_for_llm()
current_tokens = agent.llm.memory.get_total_tokens(llm_msgs)
budget = stats.get("token_budget") or 1
thresh = stats.get("summarize_threshold") or budget
recent_keep = stats.get("recent_to_keep", 5)
has_summary = stats.get("has_summary", False)
summarized_count = stats.get("summarized_message_count", 0)
# Header
text.append("Memory Diagnostics\n", style="bold #d4d4d4")
# Use a consistent bar width for all bars and align labels
bar_width = 28
labels = ["Tokens:", "Messages:", "Retention:"]
label_width = max(len(label_text) for label_text in labels)
# Tokens line
ratio = current_tokens / max(1, budget)
bar = self._bar(ratio, width=bar_width)
label = "Tokens:".ljust(label_width)
text.append(
f"{label} [{bar}] {current_tokens:,} / {budget:,}\n", style="#9a9a9a"
)
# Messages line (scale messages to an expected max window)
expected_msgs_max = max(1, recent_keep * 6)
mratio = min(1.0, msgs / expected_msgs_max)
mbar = self._bar(mratio, width=bar_width)
label = "Messages:".ljust(label_width)
text.append(f"{label} [{mbar}] {msgs} active\n", style="#9a9a9a")
# Retention / recent
k_ratio = min(1.0, recent_keep / max(1, recent_keep))
keep_bar = self._bar(k_ratio, width=bar_width)
label = "Retention:".ljust(label_width)
text.append(
f"{label} [{keep_bar}] keeping last {recent_keep}\n", style="#9a9a9a"
)
# Summary status
summary_state = "active" if has_summary else "inactive"
emoji = "🟢" if has_summary else "🔴"
text.append(f"Summary: {emoji} {summary_state}\n", style="#9a9a9a")
# Summarized / threshold
text.append(
f"Summarized: {summarized_count} / {thresh:,}\n", style="#9a9a9a"
)
text.append(f"Threshold: {thresh:,}\n", style="#9a9a9a")
except Exception as e:
text.append(f"Memory diagnostics error: {e}", style="#9a9a9a")
return text
class TokenDiagnostics(Static):
"""Live token/cost diagnostics panel mounted into the chat area.
Reads persisted daily usage from the token_tracker, computes cost
using environment variables, and displays a simple ASCII progress bar.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._timer: Optional[Timer] = None
def on_mount(self) -> None:
# Refresh periodically for a lively display
self._timer = self.set_interval(1.0, self.refresh)
def on_unmount(self) -> None:
if self._timer:
self._timer.stop()
def _bar(self, ratio: float, width: int = 28) -> str:
"""Block-style usage bar matching MemoryDiagnostics visuals."""
r = max(0.0, min(1.0, ratio))
filled = int(r * width)
return "" * filled + "" * (width - filled)
def render(self) -> Text:
text = Text()
try:
import os
# Lazy import of token_tracker (best-effort)
try:
from ..tools import token_tracker
except Exception as e:
logging.getLogger(__name__).exception("Failed to import token_tracker: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: token tracker import failed: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about token_tracker import failure: %s", e)
token_tracker = None
text.append("Token Usage Diagnostics\n", style="bold #d4d4d4")
if not token_tracker:
text.append(
"Token tracker not available (tools/token_tracker).\n",
style="#9a9a9a",
)
return text
stats = token_tracker.get_stats_sync()
# If a reset is pending (date changed), perform a reset now so daily
# usage is accurate and visible to the user.
reset_occurred = False
if stats.get("reset_pending"):
try:
token_tracker.record_usage_sync(0, 0)
stats = token_tracker.get_stats_sync()
reset_occurred = True
except Exception as e:
logging.getLogger(__name__).exception("Token tracker reset failed: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Token tracker reset failed: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about token tracker reset failure: %s", e)
# Extract values
last_in = int(stats.get("last_input_tokens", 0) or 0)
last_out = int(stats.get("last_output_tokens", 0) or 0)
last_total = int(stats.get("last_total_tokens", 0) or 0)
daily_usage = int(stats.get("daily_usage", 0) or 0)
last_reset = stats.get("last_reset_date")
current_date = stats.get("current_date")
# (env parsing moved below)
# Environment cost config
def _parse_env(name: str):
v = os.getenv(name)
if v is None or v == "":
return None
try:
return float(v)
except Exception as e:
logging.getLogger(__name__).debug("Failed to parse env var %s: %s", name, e)
return "INVALID"
unified = _parse_env("COST_PER_MILLION")
input_cost_per_m = _parse_env("INPUT_COST_PER_MILLION")
output_cost_per_m = _parse_env("OUTPUT_COST_PER_MILLION")
daily_limit = _parse_env("DAILY_TOKEN_LIMIT")
# Determine if any env-based limits exist
has_env_limits = any(
v is not None
for v in (unified, input_cost_per_m, output_cost_per_m, daily_limit)
)
# If nothing has been recorded yet (no tokens, no daily usage)
# and no env limits are configured, show the concise sentinel only.
if last_total == 0 and daily_usage == 0 and not has_env_limits:
text.append("No token usage recorded\n", style="#9a9a9a")
return text
# Validate env vars
env_errors = []
if unified == "INVALID":
env_errors.append("COST_PER_MILLION is not numeric")
if input_cost_per_m == "INVALID":
env_errors.append("INPUT_COST_PER_MILLION is not numeric")
if output_cost_per_m == "INVALID":
env_errors.append("OUTPUT_COST_PER_MILLION is not numeric")
if daily_limit == "INVALID":
env_errors.append("DAILY_TOKEN_LIMIT is not numeric")
if env_errors:
text.append("Environment configuration errors:\n", style="#ef4444")
for e in env_errors:
text.append(f" - {e}\n", style="#9a9a9a")
text.append(
"\nSet environment variables correctly to compute costs.\n",
style="#9a9a9a",
)
return text
# Compute costs
if unified is not None:
# Use unified cost for both input and output
input_cost = (last_in / 1_000_000.0) * float(unified)
output_cost = (last_out / 1_000_000.0) * float(unified)
else:
# Require per-direction costs to be present to compute
if input_cost_per_m is None or output_cost_per_m is None:
text.append(
"Cost vars missing. Set COST_PER_MILLION or both INPUT_COST_PER_MILLION and OUTPUT_COST_PER_MILLION.\n",
style="#9a9a9a",
)
# Still show numeric token stats below
input_cost = output_cost = None
else:
input_cost = (last_in / 1_000_000.0) * float(input_cost_per_m)
output_cost = (last_out / 1_000_000.0) * float(output_cost_per_m)
total_cost = None
if input_cost is not None and output_cost is not None:
total_cost = input_cost + output_cost
# Daily budget calculations per spec
# Derive daily usage excluding last command (in case tracker already included it)
daily_without_last = max(daily_usage - last_total, 0)
new_daily_total = daily_without_last + last_total
remaining_tokens = None
percent_used = None
if daily_limit is not None:
try:
dl = float(daily_limit)
remaining_tokens = max(int(dl - new_daily_total), 0)
percent_used = (new_daily_total / max(1.0, dl)) * 100.0
except Exception as e:
logging.getLogger(__name__).exception("Failed to compute daily limit values: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to compute daily token limit: {e}")
except Exception as e:
logging.getLogger(__name__).exception("Failed to notify operator about daily limit computation failure: %s", e)
remaining_tokens = None
# Render structured panel with aligned labels and block bars
bar_width = 28
labels = [
"Last command:",
"Cost:",
"Daily usage:",
"Remaining:",
"Usage:",
"Last reset:",
"Current date:",
"Reset occurred:",
]
label_width = max(len(label_text) for label_text in labels)
# Last command tokens
label = "Last command:".ljust(label_width)
text.append(
f"{label} in={last_in:,} out={last_out:,} total={last_total:,}\n",
style="#9a9a9a",
)
# Cost line
label = "Cost:".ljust(label_width)
if input_cost is not None and output_cost is not None:
text.append(
f"{label} in=${input_cost:.6f} out=${output_cost:.6f} total=${total_cost:.6f}\n",
style="#9a9a9a",
)
else:
text.append(
f"{label} not computed (missing env vars)\n",
style="#9a9a9a",
)
# Daily usage
label = "Daily usage:".ljust(label_width)
text.append(f"{label} {new_daily_total:,}\n", style="#9a9a9a")
# Remaining tokens
label = "Remaining:".ljust(label_width)
if remaining_tokens is not None:
text.append(f"{label} {remaining_tokens:,}\n", style="#9a9a9a")
else:
text.append(
f"{label} N/A (DAILY_TOKEN_LIMIT not set)\n",
style="#9a9a9a",
)
# Usage percent + bar
label = "Usage:".ljust(label_width)
if percent_used is not None:
bar = self._bar(percent_used / 100.0, width=bar_width)
text.append(
f"{label} [{bar}] {percent_used:.1f}%\n",
style="#9a9a9a",
)
else:
text.append(f"{label} N/A\n", style="#9a9a9a")
# Dates
label = "Last reset:".ljust(label_width)
text.append(f"{label} {last_reset}\n", style="#9a9a9a")
label = "Current date:".ljust(label_width)
text.append(f"{label} {current_date}\n", style="#9a9a9a")
# Reset occurrence
label = "Reset occurred:".ljust(label_width)
text.append(
f"{label} {'Yes' if reset_occurred else 'No'}\n",
style="#9a9a9a",
)
except Exception as e:
text.append(f"Token diagnostics error: {e}\n", style="#9a9a9a")
return text
# ----- Main TUI App -----
class PentestAgentTUI(App):
"""Main PentestAgent TUI Application"""
# ═══════════════════════════════════════════════════════════
# PA THEME - Ethereal grays
# ═══════════════════════════════════════════════════════════
# Void: #0a0a0a (terminal black - the darkness)
# Shadow: #121212 (subtle surface)
# Mist: #1a1a1a (panels, elevated)
# Whisper: #262626 (default borders)
# Fog: #3a3a3a (hover states)
# Apparition: #525252 (focus states)
# Phantom: #6b6b6b (secondary text)
# Spirit: #9a9a9a (normal text)
# Specter: #d4d4d4 (primary text)
# Ectoplasm: #f0f0f0 (highlights)
# ═══════════════════════════════════════════════════════════
CSS = """
Screen {
background: #0a0a0a;
}
#main-container {
width: 100%;
height: 100%;
layout: horizontal;
}
/* Chat area - takes full width normally, fills remaining space with sidebar */
#chat-area {
width: 1fr;
height: 100%;
}
#chat-area.with-sidebar {
width: 1fr;
}
#chat-scroll {
width: 100%;
height: 1fr;
background: transparent;
padding: 1 2;
scrollbar-background: #1a1a1a;
scrollbar-background-hover: #1a1a1a;
scrollbar-background-active: #1a1a1a;
scrollbar-color: #3a3a3a;
scrollbar-color-hover: #3a3a3a;
scrollbar-color-active: #3a3a3a;
scrollbar-corner-color: #1a1a1a;
scrollbar-size: 1 1;
}
#input-container {
width: 100%;
height: 3;
background: transparent;
border: round #262626;
margin: 0 2;
padding: 0;
layout: horizontal;
align-vertical: middle;
}
#input-container:focus-within {
border: round #525252;
}
#input-container:focus-within #chat-prompt {
color: #d4d4d4;
}
#chat-prompt {
width: auto;
height: 100%;
padding: 0 0 0 1;
color: #6b6b6b;
content-align-vertical: middle;
}
#chat-input {
width: 1fr;
height: 100%;
background: transparent;
border: none;
padding: 0;
margin: 0;
color: #d4d4d4;
}
#chat-input:focus {
border: none;
}
#chat-input > .input--placeholder {
color: #6b6b6b;
text-style: italic;
}
#status-bar {
width: 100%;
height: 1;
background: transparent;
padding: 0 3;
margin: 0;
}
.message {
margin-bottom: 1;
}
/* Sidebar - hidden by default */
#sidebar {
width: 28;
height: 100%;
display: none;
padding-right: 1;
}
#sidebar.visible {
display: block;
}
#workers-tree {
height: 1fr;
background: transparent;
border: round #262626;
padding: 0 1;
margin-bottom: 0;
}
#workers-tree:focus {
border: round #3a3a3a;
}
#crew-stats {
height: auto;
max-height: 10;
background: transparent;
border: round #262626;
border-title-color: #9a9a9a;
border-title-style: bold;
padding: 0 1;
margin-top: 0;
}
Tree {
background: transparent;
color: #d4d4d4;
scrollbar-background: #1a1a1a;
scrollbar-background-hover: #1a1a1a;
scrollbar-background-active: #1a1a1a;
scrollbar-color: #3a3a3a;
scrollbar-color-hover: #3a3a3a;
scrollbar-color-active: #3a3a3a;
scrollbar-size: 1 1;
}
Tree > .tree--cursor {
background: transparent;
}
Tree > .tree--highlight {
background: transparent;
}
Tree > .tree--highlight-line {
background: transparent;
}
.tree--node-label {
padding: 0 1;
}
.tree--node:hover .tree--node-label {
background: transparent;
}
.tree--node.-selected .tree--node-label {
background: transparent;
color: #d4d4d4;
}
"""
BINDINGS = [
Binding("ctrl+q", "quit_app", "Quit", priority=True),
Binding("ctrl+c", "stop_agent", "Stop", priority=True, show=False),
Binding("escape", "stop_agent", "Stop", priority=True),
Binding("f1", "show_help", "Help"),
Binding("tab", "focus_next", "Next", show=False),
Binding("up", "history_up", "Prev", show=False),
Binding("down", "history_down", "Next", show=False),
]
TITLE = "PentestAgent"
SUB_TITLE = "AI Penetration Testing"
def __init__(
self,
target: Optional[str] = None,
model: Optional[str] = None,
use_docker: bool = False,
**kwargs,
):
super().__init__(**kwargs)
self.target = target
self.model = model or DEFAULT_MODEL
self.use_docker = use_docker
# Agent components
self.agent: Optional["PentestAgentAgent"] = None
self.runtime = None
self.mcp_manager = None
self.all_tools = []
self.rag_engine = None # RAG engine
# State
self._mode = "assist" # "assist", "agent", or "crew"
self._is_running = False
self._is_initializing = True # Block input during init
self._should_stop = False
# Worker handle returned by `@work` or an `asyncio.Task` (keep generic)
self._current_worker: Optional[Any] = None # Track running worker for cancellation
self._current_crew = None # Track crew orchestrator for cancellation
# Crew mode state
self._crew_workers: Dict[str, Dict[str, Any]] = {}
self._crew_worker_nodes: Dict[str, TreeNode] = {}
self._crew_orchestrator_node: Optional[TreeNode] = None
self._crew_findings_count = 0
self._viewing_worker_id: Optional[str] = None
self._worker_events: Dict[str, List[Dict]] = {}
self._crew_start_time: Optional[float] = None
self._crew_tokens_used: int = 0
self._crew_stats_timer: Optional[Timer] = None
self._spinner_timer: Optional[Timer] = None
self._spinner_frame: int = 0
self._spinner_frames = [
"",
"",
"",
"",
"",
"",
"",
"",
"",
"",
] # Braille dots spinner
# Command history
self._cmd_history: List[str] = []
self._history_index: int = 0
def compose(self) -> ComposeResult:
with Horizontal(id="main-container"):
# Chat area (left side)
with Vertical(id="chat-area"):
yield ScrollableContainer(id="chat-scroll")
yield StatusBar(id="status-bar")
with Horizontal(id="input-container"):
yield Static("> ", id="chat-prompt")
yield Input(placeholder="Enter task or type /help", id="chat-input")
# Sidebar (right side, hidden by default)
with Vertical(id="sidebar"):
yield CrewTree("CREW", id="workers-tree")
yield Static("", id="crew-stats")
async def on_mount(self) -> None:
"""Initialize on mount"""
# Register notifier callback so other modules can emit operator-visible messages
try:
from .notifier import register_callback
register_callback(self._notifier_callback)
except Exception as e:
logging.getLogger(__name__).exception("Failed to register TUI notifier callback: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to register notifier callback: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about notifier registration failure: %s", ne)
# Call the textual worker - decorator returns a Worker, not a coroutine
_ = cast(Any, self._initialize_agent())
@work(thread=False)
async def _initialize_agent(self) -> None:
"""Initialize agent"""
self._set_status("initializing")
try:
import os
from ..agents.pa_agent import PentestAgentAgent
from ..knowledge import RAGEngine
from ..llm import LLM, ModelConfig
from ..mcp import MCPManager
from ..runtime.docker_runtime import DockerRuntime
from ..runtime.runtime import LocalRuntime
from ..tools import get_all_tools, register_tool_instance
# RAG Engine - auto-load knowledge sources
rag_doc_count = 0
knowledge_path = None
# Check local knowledge dir first (must have files, not just exist)
local_knowledge = Path("knowledge")
bundled_path = Path(__file__).parent.parent / "knowledge" / "sources"
if local_knowledge.exists() and any(local_knowledge.rglob("*.*")):
knowledge_path = local_knowledge
elif bundled_path.exists():
knowledge_path = bundled_path
if knowledge_path:
try:
# Determine embedding method: env var > auto-detect
embeddings_setting = os.getenv(
"PENTESTAGENT_EMBEDDINGS", ""
).lower()
if embeddings_setting == "local":
use_local = True
elif embeddings_setting == "openai":
use_local = False
else:
# Auto: use OpenAI if key available, else local
use_local = not os.getenv("OPENAI_API_KEY")
self.rag_engine = RAGEngine(
knowledge_path=knowledge_path, use_local_embeddings=use_local
)
await asyncio.to_thread(self.rag_engine.index)
rag_doc_count = self.rag_engine.get_document_count()
except Exception as e:
self._add_system(f"[!] RAG: {e}")
self.rag_engine = None
# MCP - auto-load only if enabled in environment
mcp_server_count = 0
import os
launch_hexstrike = os.getenv("LAUNCH_HEXTRIKE", "false").lower() == "true"
launch_metasploit = os.getenv("LAUNCH_METASPLOIT_MCP", "false").lower() == "true"
if launch_hexstrike or launch_metasploit:
try:
self.mcp_manager = MCPManager()
if self.mcp_manager.config_path.exists():
mcp_tools = await self.mcp_manager.connect_all()
for tool in mcp_tools:
register_tool_instance(tool)
mcp_server_count = len(self.mcp_manager.servers)
except Exception as e:
self._add_system(f"[!] MCP: {e}")
else:
self.mcp_manager = None
mcp_server_count = 0
# Runtime - Docker or Local
if self.use_docker:
self._add_system("+ Starting Docker container...")
self.runtime = DockerRuntime(mcp_manager=self.mcp_manager)
else:
self.runtime = LocalRuntime(mcp_manager=self.mcp_manager)
await self.runtime.start()
# LLM
# Validate model/runtime presence and provide a user-friendly error
if not self.model:
self._add_system(
"[!] No model configured. Set PENTESTAGENT_MODEL environment variable or create a .env file (see .env.example)."
)
self._set_status("error")
self._is_initializing = False
return
if not self.runtime:
self._add_system("[!] Runtime failed to initialize.")
self._set_status("error")
self._is_initializing = False
return
llm = LLM(
model=self.model,
config=ModelConfig(temperature=0.7),
rag_engine=self.rag_engine,
)
# Tools
self.all_tools = get_all_tools() # Ensure tools are loaded
# Agent
self.agent = PentestAgentAgent(
llm=llm,
tools=self.all_tools,
runtime=self.runtime,
target=self.target,
rag_engine=self.rag_engine,
)
self._set_status("idle", "assist")
self._is_initializing = False # Allow input now
# Show ready message
tools_str = ", ".join(t.name for t in self.all_tools[:5])
if len(self.all_tools) > 5:
tools_str += f", +{len(self.all_tools) - 5} more"
runtime_str = "Docker" if self.use_docker else "Local"
self._add_system(
f"+ PentestAgent ready\n"
f" Model: {self.model} | Tools: {len(self.all_tools)} | MCP: {mcp_server_count} | RAG: {rag_doc_count}\n"
f" Runtime: {runtime_str} | Mode: Assist (use /agent or /crew for autonomous modes)"
)
# Show target if provided (but don't auto-start)
if self.target:
self._add_system(f" Target: {self.target}")
except Exception as e:
import traceback
self._add_system(f"[!] Init failed: {e}\n{traceback.format_exc()}")
self._set_status("error")
self._is_initializing = False # Allow input even on error
def _set_status(self, status: str, mode: Optional[str] = None) -> None:
"""Update status bar"""
try:
bar = self.query_one("#status-bar", StatusBar)
bar.status = status
if mode:
bar.mode = mode
self._mode = mode
except Exception as e:
logging.getLogger(__name__).exception("Failed to update status bar: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to update status bar: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about status bar update failure: %s", ne)
def _show_notification(self, level: str, message: str) -> None:
"""Display a short operator-visible notification in the chat area."""
try:
# Prepend a concise system message so it is visible in the chat
prefix = "[!]" if level.lower() in ("error", "critical") else "[!]"
self._add_system(f"{prefix} {message}")
# Set status bar to error briefly for emphasis
if level.lower() in ("error", "critical"):
self._set_status("error")
except Exception as e:
logging.getLogger(__name__).exception("Failed to show notification in TUI: %s", e)
def _notifier_callback(self, level: str, message: str) -> None:
"""Callback wired to `pentestagent.interface.notifier`.
This will be registered on mount so other modules can emit notifications.
"""
try:
# textual apps typically run in the main thread; try to schedule update
# using call_from_thread if available, otherwise call directly.
if hasattr(self, "call_from_thread"):
try:
self.call_from_thread(self._show_notification, level, message)
return
except Exception as e:
logging.getLogger(__name__).exception("call_from_thread failed in notifier callback: %s", e)
# Fall through to direct call
self._show_notification(level, message)
except Exception as e:
logging.getLogger(__name__).exception("Exception in notifier callback handling: %s", e)
def _add_message(self, widget: Static) -> None:
"""Add a message widget to chat"""
try:
scroll = self.query_one("#chat-scroll", ScrollableContainer)
widget.add_class("message")
scroll.mount(widget)
scroll.scroll_end(animate=False)
except Exception as e:
logging.getLogger(__name__).exception("Failed to add message to chat: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to add chat message: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about add_message failure: %s", ne)
def _add_system(self, content: str) -> None:
self._add_message(SystemMessage(content))
def _add_user(self, content: str) -> None:
self._add_message(UserMessage(content))
def _add_assistant(self, content: str) -> None:
self._add_message(AssistantMessage(content))
def _add_thinking(self, content: str) -> None:
self._add_message(ThinkingMessage(content))
def _add_tool(self, name: str, action: str = "") -> None:
self._add_message(ToolMessage(name, action))
def _add_tool_result(self, name: str, result: str) -> None:
"""Display tool execution result"""
# Hide tool output - LLM will synthesize it in its response
# This prevents duplication and keeps the chat clean
pass
def _show_system_prompt(self) -> None:
"""Display the current system prompt"""
if self.agent:
prompt = self.agent.get_system_prompt()
self._add_system(f"=== System Prompt ===\n{prompt}")
else:
self._add_system("Agent not initialized")
def _show_memory_stats(self) -> None:
"""Mount a live memory diagnostics widget into the chat area."""
try:
scroll = self.query_one("#chat-scroll", ScrollableContainer)
except Exception as e:
logging.getLogger(__name__).exception("Failed to query chat-scroll for memory diagnostics: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: memory diagnostics unavailable: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about memory diagnostics availability: %s", ne)
self._add_system("Agent not initialized")
return
# Mount a new diagnostics panel with a unique ID and scroll into view
try:
import uuid
panel_id = f"memory-diagnostics-{uuid.uuid4().hex}"
except Exception as e:
logging.getLogger(__name__).exception("Failed to generate memory diagnostics panel id: %s", e)
panel_id = None
widget = MemoryDiagnostics(id=panel_id)
scroll.mount(widget)
try:
scroll.scroll_end(animate=False)
except Exception as e:
logging.getLogger(__name__).exception("Failed to scroll to memory diagnostics panel: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to scroll to memory diagnostics panel: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about scroll failure: %s", ne)
def _show_token_stats(self) -> None:
"""Mount a live token diagnostics widget into the chat area."""
try:
scroll = self.query_one("#chat-scroll", ScrollableContainer)
except Exception as e:
logging.getLogger(__name__).exception("Failed to query chat-scroll for token diagnostics: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: token diagnostics unavailable: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about token diagnostics availability: %s", ne)
self._add_system("Agent not initialized")
return
# Mount a new diagnostics panel with a unique ID and scroll into view
try:
import uuid
panel_id = f"token-diagnostics-{uuid.uuid4().hex}"
except Exception as e:
logging.getLogger(__name__).exception("Failed to generate token diagnostics panel id: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to generate token diagnostics panel id: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about token diagnostics panel id generation failure: %s", ne)
panel_id = None
widget = TokenDiagnostics(id=panel_id)
scroll.mount(widget)
try:
scroll.scroll_end(animate=False)
except Exception as e:
logging.getLogger(__name__).exception("Failed to scroll to token diagnostics panel: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"TUI: failed to scroll to token diagnostics panel: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about token diagnostics scroll failure: %s", ne)
async def _show_notes(self) -> None:
"""Display saved notes"""
from ..tools.notes import get_all_notes
notes = await get_all_notes()
if not notes:
self._add_system(
"=== Notes ===\nNo notes saved.\n\nThe AI can save key findings using the notes tool."
)
return
lines = [f"=== Notes ({len(notes)} entries) ==="]
for key, value in notes.items():
# Show full value, indent multi-line content
if "\n" in value:
indented = str(value).replace("\n", "\n ")
lines.append(f"\n[{key}]\n {indented}")
else:
lines.append(f"[{key}] {value}")
lines.append("\nFile: loot/notes.json")
lines.append("Reports: loot/reports/")
self._add_system("\n".join(lines))
def _build_prior_context(self) -> str:
"""Build a summary of prior findings for crew mode.
Extracts:
- Tool results (nmap scans, etc.) - the actual findings
- Assistant analyses - interpretations and summaries
- Last user task - what they were working on
Excludes:
- Raw user messages (noise)
- Tool call declarations (just names/args, not results)
- Very short responses
"""
if not self.agent or not self.agent.conversation_history:
return ""
findings = []
last_user_task = ""
for msg in self.agent.conversation_history:
# Track user tasks/questions
if msg.role == "user" and msg.content:
last_user_task = msg.content[:200]
# Extract tool results (the actual findings)
elif msg.tool_results:
for result in msg.tool_results:
if result.success and result.result:
content = (
result.result[:1500]
if len(result.result) > 1500
else result.result
)
findings.append(f"[{result.tool_name}]\n{content}")
# Include assistant analyses (but not tool call messages)
elif msg.role == "assistant" and msg.content and not msg.tool_calls:
if len(msg.content) > 50:
findings.append(f"[Analysis]\n{msg.content[:1000]}")
if not findings and not last_user_task:
return ""
# Build context with last user task + recent findings
parts = []
if last_user_task:
parts.append(f"Last task: {last_user_task}")
if findings:
parts.append("Findings:\n" + "\n\n".join(findings[-5:]))
context = "\n\n".join(parts)
if len(context) > 4000:
context = context[:4000] + "\n... (truncated)"
return context
def _set_target(self, cmd: str) -> None:
"""Set the target for the engagement"""
# Remove /target prefix
target = cmd[7:].strip()
if not target:
if self.target:
self._add_system(
f"Current target: {self.target}\nUsage: /target <host>"
)
else:
self._add_system(
"No target set.\nUsage: /target <host>\nExample: /target 192.168.1.1"
)
return
self.target = target
# Update agent's target if agent exists
if self.agent:
self.agent.target = target
try:
from pentestagent.agents.base_agent import AgentMessage
# Inform the agent (LLM) that the operator changed the target so
# subsequent generations use the new value instead of older
# workspace-bound targets from conversation history.
self.agent.conversation_history.append(
AgentMessage(role="system", content=f"Operator set target to {target}")
)
# Track manual target override so workspace restores can remove
# or supersede this message when appropriate.
try:
setattr(self.agent, "_manual_target", target)
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception(
"Failed to append target change to agent history: %s", e
)
# Persist to active workspace if present
try:
from pentestagent.workspaces.manager import WorkspaceManager
wm = WorkspaceManager()
active = wm.get_active()
if active:
try:
wm.set_last_target(active, target)
except Exception as e:
logging.getLogger(__name__).exception("Failed to persist last target for workspace %s: %s", active, e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"Failed to persist last target for workspace {active}: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about target persist error: %s", ne)
except Exception as e:
logging.getLogger(__name__).exception("Failed to access WorkspaceManager to persist last target: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"TUI: failed to persist last target: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about WorkspaceManager access failure: %s", ne)
# Update displayed Target in the UI
try:
self._apply_target_display(target)
except Exception as e:
logging.getLogger(__name__).exception("Failed to apply target display: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"Failed to update target display: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about target display error: %s", ne)
# Update the initial ready SystemMessage (if present) so Target appears under Runtime
try:
scroll = self.query_one("#chat-scroll", ScrollableContainer)
updated = False
for child in scroll.children:
if isinstance(child, SystemMessage) and "PentestAgent ready" in getattr(
child, "message_content", ""
):
# Replace existing Target line if present, otherwise append
try:
if "Target:" in child.message_content:
# replace the first Target line
child.message_content = re.sub(
r"(?m)^\s*Target:.*$",
f" Target: {target}",
child.message_content,
count=1,
)
else:
child.message_content = (
child.message_content + f"\n Target: {target}"
)
try:
child.refresh()
except Exception as e:
logging.getLogger(__name__).exception("Failed to refresh child message after target update: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"Failed to refresh UI after target update: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about UI refresh error: %s", ne)
except Exception as e:
# Fallback to append if regex replacement fails, and surface warning
logging.getLogger(__name__).exception("Failed to update SystemMessage target line: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"Failed to update target display: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about target update error: %s", ne)
child.message_content = (
child.message_content + f"\n Target: {target}"
)
updated = True
break
if not updated:
# Fallback: add system line near top by inserting at beginning
try:
first = scroll.children[0] if scroll.children else None
msg = SystemMessage(f" Target: {target}")
if first:
scroll.mount_before(msg, first)
else:
scroll.mount(msg)
except Exception as e:
logging.getLogger(__name__).exception("Failed to mount target system message: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"TUI: failed to display target: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about target display failure: %s", ne)
self._add_system(f" Target: {target}")
except Exception as e:
logging.getLogger(__name__).exception("Failed while applying target display: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"TUI: failed while updating target display: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about target display outer failure: %s", ne)
# Last resort: append a subtle system line
self._add_system(f" Target: {target}")
@work(exclusive=True)
async def _run_report_generation(self) -> None:
"""Generate a pentest report from notes and conversation"""
from pathlib import Path
from ..tools.notes import get_all_notes
if not self.agent or not self.agent.llm:
self._add_system("[!] Agent not initialized")
return
notes = await get_all_notes()
if not notes:
self._add_system(
"No notes found. PentestAgent saves findings using the notes tool during testing."
)
return
self._add_system("Generating report...")
# Format notes
notes_text = "\n".join(f"### {k}\n{v}\n" for k, v in notes.items())
# Build conversation summary from full history
conversation_summary = ""
if self.agent.conversation_history:
# Summarize key actions from conversation
actions = []
for msg in self.agent.conversation_history:
if msg.role == "assistant" and msg.tool_calls:
for tc in msg.tool_calls:
actions.append(f"- Tool: {tc.name}")
elif msg.role == "tool_result" and msg.tool_results:
for tr in msg.tool_results:
# Include truncated result
result = tr.result or ""
output = result[:200] + "..." if len(result) > 200 else result
actions.append(f" Result: {output}")
if actions:
conversation_summary = "\n".join(actions[-30:]) # Last 30 actions
report_prompt = f"""Generate a penetration test report in Markdown from the notes below.
# Notes
{notes_text}
# Activity Log
{conversation_summary if conversation_summary else "N/A"}
# Target
{self.target or "Not specified"}
Output a report with:
1. Executive Summary (2-3 sentences)
2. Findings (use notes, include severity: Critical/High/Medium/Low/Info)
3. Recommendations
Be concise. Use the actual data from notes."""
try:
report_content = await self.agent.llm.simple_completion(
prompt=report_prompt,
system="You are a penetration tester writing a security report. Be concise and factual.",
)
if not report_content or not report_content.strip():
self._add_system(
"[!] Report generation returned empty. Check LLM connection."
)
return
# Save to loot/reports/
reports_dir = Path("loot/reports")
reports_dir.mkdir(parents=True, exist_ok=True)
# Append Shadow Graph if available
try:
from ..knowledge.graph import ShadowGraph
from ..tools.notes import get_all_notes_sync
# Rehydrate graph from notes
graph = ShadowGraph()
notes = get_all_notes_sync()
if notes:
graph.update_from_notes(notes)
mermaid_code = graph.to_mermaid()
if mermaid_code:
report_content += (
"\n\n## Attack Graph (Visual)\n\n```mermaid\n"
+ mermaid_code
+ "\n```\n"
)
except Exception as e:
self._add_system(f"[!] Graph generation error: {e}")
timestamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
report_path = reports_dir / f"report_{timestamp}.md"
report_path.write_text(report_content, encoding="utf-8")
self._add_system(f"+ Report saved: {report_path}")
except Exception as e:
self._add_system(f"[!] Report error: {e}")
@on(Input.Submitted, "#chat-input")
async def handle_submit(self, event: Input.Submitted) -> None:
"""Handle input submission"""
# Block input while initializing or AI is processing
if self._is_initializing or self._is_running:
return
# Strip ANSI escape sequences and control codes
message = _ANSI_ESCAPE.sub("", event.value).strip()
if not message:
return
# Save to history (de-duplicate consecutive duplicates)
if not (self._cmd_history and self._cmd_history[-1] == message):
self._cmd_history.append(message)
# Reset index to one past the end (blank)
self._history_index = len(self._cmd_history)
event.input.value = ""
# Commands
if message.startswith("/"):
await self._handle_command(message)
return
self._add_user(message)
# Hide crew sidebar when entering assist mode
self._hide_sidebar()
# Use assist mode by default
if self.agent and not self._is_running:
# Schedule assist run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
self._current_worker = self._run_assist(message)
async def _handle_command(self, cmd: str) -> None:
"""Handle slash commands"""
cmd_lower = cmd.lower().strip()
cmd_original = cmd.strip()
if cmd_lower in ["/help", "/h", "/?"]:
await self.push_screen(HelpScreen())
elif cmd_lower == "/clear":
scroll = self.query_one("#chat-scroll", ScrollableContainer)
await scroll.remove_children()
self._hide_sidebar()
# Clear agent conversation history for fresh start
if self.agent:
self.agent.conversation_history.clear()
self._add_system("Chat cleared")
elif cmd_lower == "/tools":
# Open the interactive tools browser (split-pane).
try:
await self.push_screen(ToolsScreen(tools=self.all_tools))
except Exception:
# Fallback: list tools in the system area if UI push fails
from ..runtime.runtime import detect_environment
names = [t.name for t in self.all_tools]
msg = f"Tools ({len(names)}): " + ", ".join(names)
# Add detected CLI tools
env = detect_environment()
if env.available_tools:
# Group by category
by_category = {}
for tool_info in env.available_tools:
if tool_info.category not in by_category:
by_category[tool_info.category] = []
by_category[tool_info.category].append(tool_info.name)
cli_sections = []
for category in sorted(by_category.keys()):
tools_list = ", ".join(sorted(by_category[category]))
cli_sections.append(f"{category}: {tools_list}")
msg += f"\n\nCLI Tools ({len(env.available_tools)}):\n" + "\n".join(
cli_sections
)
self._add_system(msg)
elif cmd_lower in ["/quit", "/exit", "/q"]:
self.exit()
elif cmd_lower == "/prompt":
self._show_system_prompt()
elif cmd_lower == "/memory":
self._show_memory_stats()
elif cmd_lower == "/token":
self._show_token_stats()
elif cmd_lower == "/notes":
await self._show_notes()
elif cmd_lower == "/report":
# Call the textual worker - decorator returns a Worker
_ = cast(Any, self._run_report_generation())
elif cmd_original.startswith("/target"):
self._set_target(cmd_original)
elif cmd_original.startswith("/workspace"):
# Support lightweight workspace management from the TUI
try:
from pentestagent.workspaces.manager import (
WorkspaceError,
WorkspaceManager,
)
from pentestagent.workspaces.utils import resolve_knowledge_paths
wm = WorkspaceManager()
rest = cmd_original[len("/workspace") :].strip()
if not rest:
active = wm.get_active()
if not active:
self._add_system("No active workspace.")
else:
# restore last target if present
last = wm.get_meta_field(active, "last_target")
if last:
self.target = last
if self.agent:
self.agent.target = last
# If the operator set a manual target while no
# workspace was active, remove/supersede that
# system message so the LLM uses the workspace
# target instead of the stale manual one.
try:
manual = getattr(self.agent, "_manual_target", None)
if manual:
self.agent.conversation_history = [
m
for m in self.agent.conversation_history
if not (
m.role == "system"
and isinstance(m.content, str)
and m.content.strip().startswith("Operator set target to")
)
]
try:
delattr(self.agent, "_manual_target")
except Exception:
pass
except Exception:
pass
try:
from pentestagent.agents.base_agent import AgentMessage
self.agent.conversation_history.append(
AgentMessage(
role="system",
content=f"Workspace active; restored last target: {last}",
)
)
except Exception:
pass
try:
self._apply_target_display(last)
except Exception as e:
logging.getLogger(__name__).exception("Failed to apply target display when restoring last target: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"TUI: failed restoring last target display: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about restore-last-target failure")
self._add_system(f"Active workspace: {active}")
return
parts = rest.split()
verb = parts[0].lower()
if verb == "help":
try:
await self.push_screen(WorkspaceHelpScreen())
except Exception:
# Fallback: show inline help text
self._add_system(
"Usage: /workspace <action>\nCommands: list, info, note, clear, help, <name>"
)
return
if verb == "list":
wss = wm.list_workspaces()
if not wss:
self._add_system("No workspaces found.")
return
out = []
active = wm.get_active()
for name in sorted(wss):
prefix = "* " if name == active else " "
out.append(f"{prefix}{name}")
self._add_system("\n".join(out))
return
if verb == "info":
name = parts[1] if len(parts) > 1 else wm.get_active()
if not name:
self._add_system("No workspace specified and no active workspace.")
return
try:
meta = wm.get_meta(name)
created = meta.get("created_at")
last_active = meta.get("last_active_at")
targets = meta.get("targets", [])
kp = resolve_knowledge_paths()
ks = "workspace" if kp.get("using_workspace") else "global"
self._add_system(
f"Name: {name}\nCreated: {created}\nLast active: {last_active}\nTargets: {len(targets)}\nKnowledge scope: {ks}"
)
except Exception as e:
self._add_system(f"Error retrieving workspace info: {e}")
return
if verb == "note":
# By default, use the active workspace; allow explicit override via --workspace/-w.
name = wm.get_active()
i = 1
# Parse optional workspace selector flags before the note text.
while i < len(parts):
part = parts[i]
if part in ("--workspace", "-w"):
if i + 1 >= len(parts):
self._add_system("Usage: /workspace note [--workspace NAME] <text>")
return
name = parts[i + 1]
i += 2
continue
# First non-option token marks the start of the note text
break
if not name:
self._add_system("No active workspace. Set one with /workspace <name>.")
return
text = " ".join(parts[i:])
if not text:
self._add_system("Usage: /workspace note [--workspace NAME] <text>")
return
try:
wm.set_operator_note(name, text)
self._add_system(f"Operator note saved for workspace '{name}'.")
except Exception as e:
self._add_system(f"Error saving note: {e}")
return
if verb == "clear":
active = wm.get_active()
if not active:
self._add_system("No active workspace.")
return
marker = wm.active_marker()
try:
if marker.exists():
marker.unlink()
# Clear TUI and agent target when workspace is deactivated
self.target = ""
try:
self._apply_target_display("")
except Exception:
pass
if self.agent:
try:
# Clear agent's target and any manual override
self.agent.target = ""
try:
if hasattr(self.agent, "_manual_target"):
delattr(self.agent, "_manual_target")
except Exception:
pass
from pentestagent.agents.base_agent import AgentMessage
self.agent.conversation_history.append(
AgentMessage(
role="system",
content=(
f"Workspace '{active}' deactivated; cleared target"
),
)
)
except Exception:
logging.getLogger(__name__).exception(
"Failed to clear agent target on workspace clear"
)
self._add_system(f"Workspace '{active}' deactivated.")
except Exception as e:
self._add_system(f"Error deactivating workspace: {e}")
return
# Default: treat rest as workspace name -> create (only if missing) and set active
name = rest
try:
existed = wm.workspace_path(name).exists()
if not existed:
wm.create(name)
wm.set_active(name)
# restore last target if set on workspace
last = wm.get_meta_field(name, "last_target")
if last:
self.target = last
if self.agent:
self.agent.target = last
try:
self._apply_target_display(last)
except Exception as e:
logging.getLogger(__name__).exception("Failed to apply target display when activating workspace: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"TUI: failed to restore workspace target display: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about workspace target restore failure")
if existed:
self._add_system(f"Workspace '{name}' set active.")
else:
self._add_system(f"Workspace '{name}' created and set active.")
except WorkspaceError as e:
self._add_system(f"Error: {e}")
except Exception as e:
self._add_system(f"Error creating workspace: {e}")
except Exception as e:
self._add_system(f"Workspace command error: {e}")
return
elif cmd_original.startswith("/agent"):
await self._parse_agent_command(cmd_original)
elif cmd_original.startswith("/crew"):
await self._parse_crew_command(cmd_original)
else:
self._add_system(f"Unknown command: {cmd}\nType /help for commands.")
async def _parse_agent_command(self, cmd: str) -> None:
"""Parse and execute /agent command"""
# Remove /agent prefix
rest = cmd[6:].strip()
if not rest:
self._add_system(
"Usage: /agent <task>\n"
"Example: /agent scan 192.168.1.1\n"
" /agent enumerate SSH on target"
)
return
task = rest
if not task:
self._add_system("Error: No task provided. Usage: /agent <task>")
return
self._add_user(f"/agent {task}")
self._add_system(">> Agent Mode")
# Hide crew sidebar when entering agent mode
self._hide_sidebar()
if self.agent and not self._is_running:
# Schedule agent mode and keep task handle
# Schedule agent run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
self._current_worker = self._run_agent_mode(task)
async def _parse_crew_command(self, cmd: str) -> None:
"""Parse and execute /crew command"""
# Remove /crew prefix
rest = cmd[5:].strip()
if not rest:
self._add_system(
"Usage: /crew <task>\n"
"Example: /crew https://example.com\n"
" /crew 192.168.1.100\n\n"
"Crew mode spawns specialized workers in parallel:\n"
" - recon: Reconnaissance and mapping\n"
" - sqli: SQL injection testing\n"
" - xss: Cross-site scripting testing\n"
" - ssrf: Server-side request forgery\n"
" - auth: Authentication testing\n"
" - idor: Insecure direct object references\n"
" - info: Information disclosure"
)
return
target = rest
if not self._is_running:
self._add_user(f"/crew {target}")
self._show_sidebar()
# Schedule crew mode and keep handle
# Schedule crew run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
self._current_worker = self._run_crew_mode(target)
def _show_sidebar(self) -> None:
"""Show the sidebar for crew mode."""
try:
import time
sidebar = self.query_one("#sidebar")
sidebar.add_class("visible")
chat_area = self.query_one("#chat-area")
chat_area.add_class("with-sidebar")
# Setup tree
tree = self.query_one("#workers-tree", CrewTree)
tree.root.expand()
tree.show_root = False
# Clear old nodes
tree.root.remove_children()
self._crew_worker_nodes.clear()
self._crew_workers.clear()
self._worker_events.clear()
self._crew_findings_count = 0
# Start tracking time and tokens
self._crew_start_time = time.time()
self._crew_tokens_used = 0
# Start stats timer (update every second)
if self._crew_stats_timer:
self._crew_stats_timer.stop()
self._crew_stats_timer = self.set_interval(1.0, self._update_crew_stats)
# Start spinner timer for running workers (faster interval for smooth animation)
if self._spinner_timer:
self._spinner_timer.stop()
self._spinner_timer = self.set_interval(0.15, self._update_spinner)
# Add crew root node (no orchestrator - just "CREW" header)
self._crew_orchestrator_node = tree.root.add(
"CREW", data={"type": "crew", "id": "crew"}
)
if self._crew_orchestrator_node:
try:
self._crew_orchestrator_node.expand()
tree.select_node(self._crew_orchestrator_node)
except Exception as e:
logging.getLogger(__name__).exception("Failed to expand/select crew orchestrator node: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to expand crew sidebar node: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about crew node expansion failure")
self._viewing_worker_id = None
# Update stats
self._update_crew_stats()
except Exception as e:
self._add_system(f"[!] Sidebar error: {e}")
def _apply_target_display(self, target: str) -> None:
"""Update or insert the Target line in the system/banner area."""
try:
scroll = self.query_one("#chat-scroll", ScrollableContainer)
updated = False
for child in scroll.children:
if isinstance(child, SystemMessage) and "PentestAgent ready" in getattr(
child, "message_content", ""
):
# Replace existing Target line if present, otherwise append
try:
if "Target:" in child.message_content:
child.message_content = re.sub(
r"(?m)^\s*Target:.*$",
f" Target: {target}",
child.message_content,
count=1,
)
else:
child.message_content = (
child.message_content + f"\n Target: {target}"
)
try:
child.refresh()
except Exception as e:
logging.getLogger(__name__).exception("Failed to refresh child message: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"TUI: failed to refresh UI element: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about child refresh failure")
except Exception as e:
logging.getLogger(__name__).exception("Failed to update SystemMessage target line: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"Failed to update target display: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about target update error")
child.message_content = (
child.message_content + f"\n Target: {target}"
)
updated = True
break
if not updated:
try:
first = scroll.children[0] if scroll.children else None
msg = SystemMessage(f" Target: {target}")
if first:
scroll.mount_before(msg, first)
else:
scroll.mount(msg)
except Exception:
self._add_system(f" Target: {target}")
except Exception:
self._add_system(f" Target: {target}")
def _hide_sidebar(self) -> None:
"""Hide the sidebar."""
try:
# Stop stats timer
if self._crew_stats_timer:
self._crew_stats_timer.stop()
self._crew_stats_timer = None
sidebar = self.query_one("#sidebar")
sidebar.remove_class("visible")
chat_area = self.query_one("#chat-area")
chat_area.remove_class("with-sidebar")
except Exception as e:
logging.getLogger(__name__).exception("Sidebar error: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: sidebar error: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about sidebar error")
def _update_crew_stats(self) -> None:
"""Update crew stats panel."""
try:
import time
text = Text()
# Elapsed time
text.append("Time: ", style="bold #d4d4d4")
if self._crew_start_time:
elapsed = time.time() - self._crew_start_time
if elapsed < 60:
time_str = f"{int(elapsed)}s"
elif elapsed < 3600:
mins = int(elapsed // 60)
secs = int(elapsed % 60)
time_str = f"{mins}m {secs}s"
else:
hrs = int(elapsed // 3600)
mins = int((elapsed % 3600) // 60)
time_str = f"{hrs}h {mins}m"
text.append(time_str, style="#9a9a9a")
else:
text.append("--", style="#525252")
text.append("\n")
# Tokens used
text.append("Tokens: ", style="bold #d4d4d4")
if self._crew_tokens_used > 0:
if self._crew_tokens_used >= 1000:
token_str = f"{self._crew_tokens_used / 1000:.1f}k"
else:
token_str = str(self._crew_tokens_used)
text.append(token_str, style="#9a9a9a")
else:
text.append("--", style="#525252")
stats = self.query_one("#crew-stats", Static)
stats.update(text)
stats.border_title = "# Stats"
except Exception as e:
logging.getLogger(__name__).exception("Failed to hide sidebar: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to hide sidebar: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hide_sidebar failure")
def _update_spinner(self) -> None:
"""Update spinner animation for running workers."""
try:
# Advance spinner frame
self._spinner_frame += 1
# Only update labels for running workers (efficient)
has_running = False
for worker_id, worker in self._crew_workers.items():
if worker.get("status") == "running":
has_running = True
# Update the tree node label
if worker_id in self._crew_worker_nodes:
node = self._crew_worker_nodes[worker_id]
node.set_label(self._format_worker_label(worker_id))
# Stop spinner if no workers are running (save resources)
if not has_running and self._spinner_timer:
self._spinner_timer.stop()
self._spinner_timer = None
except Exception as e:
logging.getLogger(__name__).exception("Failed to update crew stats: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to update crew stats: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about crew stats update failure")
def _add_crew_worker(self, worker_id: str, worker_type: str, task: str) -> None:
"""Add a worker to the sidebar tree."""
self._crew_workers[worker_id] = {
"worker_type": worker_type,
"task": task,
"status": "pending",
"findings": 0,
}
try:
label = self._format_worker_label(worker_id)
if self._crew_orchestrator_node:
node = self._crew_orchestrator_node.add(
label, data={"type": "worker", "id": worker_id}
)
self._crew_worker_nodes[worker_id] = node
try:
self._crew_orchestrator_node.expand()
except Exception as e:
logging.getLogger(__name__).exception("Failed to expand crew orchestrator node: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to expand crew node: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about crew node expansion failure")
self._update_crew_stats()
except Exception as e:
logging.getLogger(__name__).exception("Failed to update spinner: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to update spinner: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about spinner update failure")
def _update_crew_worker(self, worker_id: str, **updates) -> None:
"""Update a worker's state."""
if worker_id not in self._crew_workers:
return
self._crew_workers[worker_id].update(updates)
# Restart spinner if a worker started running
if updates.get("status") == "running" and not self._spinner_timer:
self._spinner_timer = self.set_interval(0.15, self._update_spinner)
try:
if worker_id in self._crew_worker_nodes:
label = self._format_worker_label(worker_id)
self._crew_worker_nodes[worker_id].set_label(label)
self._update_crew_stats()
except Exception as e:
logging.getLogger(__name__).exception("Failed to add crew worker node: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to add crew worker node: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about add_crew_worker failure")
def _format_worker_label(self, worker_id: str) -> Text:
"""Format worker label for tree."""
worker = self._crew_workers.get(worker_id, {})
status = worker.get("status", "pending")
wtype = worker.get("worker_type", "worker")
findings = worker.get("findings", 0)
# 4-state icons: working (braille), done (checkmark), warning (!), error (X)
if status in ("running", "pending"):
# Animated braille spinner for all in-progress states
icon = self._spinner_frames[self._spinner_frame % len(self._spinner_frames)]
color = "#d4d4d4" # white
elif status == "complete":
icon = ""
color = "#22c55e" # green
elif status == "warning":
icon = "!"
color = "#f59e0b" # amber/orange
else: # error, cancelled, unknown
icon = ""
color = "#ef4444" # red
text = Text()
text.append(f"{icon} ", style=color)
text.append(wtype.upper(), style="bold")
if status == "complete" and findings > 0:
text.append(f" [{findings}]", style="#22c55e") # green
elif status in ("error", "cancelled"):
# Don't append " !" here since we already have the X icon
pass
return text
def _handle_worker_event(
self, worker_id: str, event_type: str, data: Dict[str, Any]
) -> None:
"""Handle worker events from CrewAgent - updates tree sidebar only."""
try:
if event_type == "spawn":
worker_type = data.get("worker_type", "unknown")
task = data.get("task", "")
self._add_crew_worker(worker_id, worker_type, task)
elif event_type == "status":
status = data.get("status", "running")
self._update_crew_worker(worker_id, status=status)
elif event_type == "tool":
# Add tool as child node under the agent
tool_name = data.get("tool", "unknown")
self._add_tool_to_worker(worker_id, tool_name)
elif event_type == "tokens":
# Track token usage
tokens = data.get("tokens", 0)
self._crew_tokens_used += tokens
elif event_type == "complete":
findings_count = data.get("findings_count", 0)
self._update_crew_worker(
worker_id, status="complete", findings=findings_count
)
self._crew_findings_count += findings_count
self._update_crew_stats()
elif event_type == "warning":
# Worker hit max iterations but has results
self._update_crew_worker(worker_id, status="warning")
reason = data.get("reason", "Partial completion")
worker = self._crew_workers.get(worker_id, {})
wtype = worker.get("worker_type", "worker")
self._add_system(f"[!] {wtype.upper()} stopped: {reason}")
self._update_crew_stats()
elif event_type == "failed":
# Worker determined task infeasible
self._update_crew_worker(worker_id, status="failed")
reason = data.get("reason", "Task infeasible")
worker = self._crew_workers.get(worker_id, {})
wtype = worker.get("worker_type", "worker")
self._add_system(f"[!] {wtype.upper()} failed: {reason}")
self._update_crew_stats()
elif event_type == "error":
self._update_crew_worker(worker_id, status="error")
worker = self._crew_workers.get(worker_id, {})
wtype = worker.get("worker_type", "worker")
error_msg = data.get("error", "Unknown error")
# Only show errors in chat - they're important
self._add_system(f"[!] {wtype.upper()} failed: {error_msg}")
except Exception as e:
self._add_system(f"[!] Worker event error: {e}")
def _add_tool_to_worker(self, worker_id: str, tool_name: str) -> None:
"""Add a tool usage as child node under worker in tree."""
try:
node = self._crew_worker_nodes.get(worker_id)
if node:
node.add_leaf(f" {tool_name}")
node.expand()
except Exception as e:
logging.getLogger(__name__).exception("Failed to update crew worker display: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to update crew worker display: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about update_crew_worker failure")
@on(Tree.NodeSelected, "#workers-tree")
def on_worker_tree_selected(self, event: Tree.NodeSelected) -> None:
"""Handle tree node selection."""
node = event.node
if node.data:
node_type = node.data.get("type")
if node_type == "crew":
self._viewing_worker_id = None
elif node_type == "worker":
self._viewing_worker_id = node.data.get("id")
@work(thread=False)
async def _run_crew_mode(self, target: str) -> None:
"""Run crew mode with sidebar."""
self._is_running = True
self._should_stop = False
self._set_status("thinking", "crew")
try:
from ..agents.base_agent import AgentMessage
from ..agents.crew import CrewOrchestrator
from ..llm import LLM, ModelConfig
# Build prior context from assist/agent conversation history
prior_context = self._build_prior_context()
# Ensure model/runtime are available for static analysis
assert self.model is not None
assert self.runtime is not None
llm = LLM(model=self.model, config=ModelConfig(temperature=0.7))
crew = CrewOrchestrator(
llm=llm,
tools=self.all_tools,
runtime=self.runtime,
on_worker_event=self._handle_worker_event,
rag_engine=self.rag_engine,
target=target,
prior_context=prior_context,
)
self._current_crew = crew # Track for cancellation
self._add_system(f"@ Task: {target}")
# Track crew results for memory
crew_report = None
async for update in crew.run(target):
if self._should_stop:
await crew.cancel()
self._add_system("[!] Stopped by user")
break
phase = update.get("phase", "")
if phase == "starting":
self._set_status("thinking", "crew")
elif phase == "tokens":
# Track orchestrator token usage
tokens = update.get("tokens", 0)
self._crew_tokens_used += tokens
self._update_crew_stats()
elif phase == "thinking":
# Show the orchestrator's reasoning
content = update.get("content", "")
if content:
self._add_thinking(content)
elif phase == "tool_call":
# Show orchestration tool calls
tool = update.get("tool", "")
args = update.get("args", {})
self._add_tool(tool, str(args))
elif phase == "tool_result":
# Tool results are tracked via worker events
pass
elif phase == "complete":
crew_report = update.get("report", "")
if crew_report:
self._add_assistant(crew_report)
elif phase == "error":
error = update.get("error", "Unknown error")
self._add_system(f"[!] Crew error: {error}")
# Add crew results to main agent's conversation history
# so assist mode can reference what happened
if self.agent and crew_report:
# Add the crew task as a user message
self.agent.conversation_history.append(
AgentMessage(
role="user",
content=f"[CREW MODE] Run parallel analysis on target: {target}",
)
)
# Add the crew report as assistant response
self.agent.conversation_history.append(
AgentMessage(role="assistant", content=crew_report)
)
self._set_status("complete", "crew")
self._add_system("+ Crew task complete.")
# Stop timers
if self._crew_stats_timer:
self._crew_stats_timer.stop()
self._crew_stats_timer = None
if self._spinner_timer:
self._spinner_timer.stop()
self._spinner_timer = None
# Clear crew reference
self._current_crew = None
except asyncio.CancelledError:
# Cancel crew workers first
if self._current_crew:
await self._current_crew.cancel()
self._current_crew = None
self._add_system("[!] Cancelled")
self._set_status("idle", "crew")
# Stop timers on cancel
if self._crew_stats_timer:
self._crew_stats_timer.stop()
self._crew_stats_timer = None
if self._spinner_timer:
self._spinner_timer.stop()
self._spinner_timer = None
except Exception as e:
import traceback
# Cancel crew workers on error too
if self._current_crew:
try:
await self._current_crew.cancel()
except Exception as e:
logging.getLogger(__name__).exception("Failed to add tool to worker node: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to add tool to worker: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about add_tool_to_worker failure")
self._current_crew = None
self._add_system(f"[!] Crew error: {e}\n{traceback.format_exc()}")
self._set_status("error")
# Stop timers on error too
if self._crew_stats_timer:
self._crew_stats_timer.stop()
self._crew_stats_timer = None
if self._spinner_timer:
self._spinner_timer.stop()
self._spinner_timer = None
finally:
self._is_running = False
@work(thread=False)
async def _run_assist(self, message: str) -> None:
"""Run in assist mode - single response"""
if not self.agent:
self._add_system("[!] Agent not ready")
return
self._is_running = True
self._should_stop = False
self._set_status("thinking", "assist")
try:
async for response in self.agent.assist(message):
if self._should_stop:
self._add_system("[!] Stopped by user")
break
self._set_status("processing")
# Show thinking/plan FIRST if there's content with tool calls
if response.content:
content = response.content.strip()
if response.tool_calls:
self._add_thinking(content)
else:
self._add_assistant(content)
# Show tool calls (skip 'finish' - internal control)
if response.tool_calls:
for call in response.tool_calls:
if call.name == "finish":
continue # Skip - summary shown as final message
args_str = str(call.arguments)
self._add_tool(call.name, args_str)
# Show tool results (displayed after execution completes)
# Skip 'finish' tool - its result is shown as the final summary
if response.tool_results:
for result in response.tool_results:
if result.tool_name == "finish":
continue # Skip - summary shown separately
if result.success:
self._add_tool_result(
result.tool_name, result.result or "Done"
)
else:
self._add_tool_result(
result.tool_name, f"Error: {result.error}"
)
self._set_status("idle", "assist")
except asyncio.CancelledError:
self._add_system("[!] Cancelled")
self._set_status("idle", "assist")
except Exception as e:
self._add_system(f"[!] Error: {e}")
self._set_status("error")
finally:
self._is_running = False
@work(thread=False)
async def _run_agent_mode(self, task: str) -> None:
"""Run in agent mode - autonomous until task complete or user stops"""
if not self.agent:
self._add_system("[!] Agent not ready")
return
self._is_running = True
self._should_stop = False
self._set_status("thinking", "agent")
try:
async for response in self.agent.agent_loop(task):
if self._should_stop:
self._add_system("[!] Stopped by user")
break
self._set_status("processing")
# Show thinking/plan FIRST if there's content with tool calls
if response.content:
content = response.content.strip()
# If it has tool calls, it's thinking.
# If it's marked as intermediate, it's thinking.
if response.tool_calls or response.metadata.get("intermediate"):
self._add_thinking(content)
else:
# Check if this is a task completion message
if response.metadata.get("task_complete"):
self._add_assistant(content)
else:
self._add_assistant(content)
# Show tool calls AFTER thinking
if response.tool_calls:
for call in response.tool_calls:
# Show all tools including finish
args_str = str(call.arguments)
self._add_tool(call.name, args_str)
# Show tool results
if response.tool_results:
for result in response.tool_results:
if result.tool_name == "finish":
# Skip showing result for finish tool as it's redundant with the tool call display
continue
if result.success:
self._add_tool_result(
result.tool_name, result.result or "Done"
)
else:
self._add_tool_result(
result.tool_name, f"Error: {result.error}"
)
# Check state
if self.agent.state.value == "waiting_input":
self._set_status("waiting")
self._add_system("? Awaiting input...")
break
elif self.agent.state.value == "complete":
break
self._set_status("thinking")
self._set_status("complete", "agent")
self._add_system("+ Agent task complete. Back to assist mode.")
# Return to assist mode
await asyncio.sleep(1)
self._set_status("idle", "assist")
except asyncio.CancelledError:
self._add_system("[!] Cancelled")
self._set_status("idle", "assist")
except Exception as e:
self._add_system(f"[!] Error: {e}")
self._set_status("error")
finally:
self._is_running = False
def action_quit_app(self) -> None:
# Stop any running tasks first
if self._is_running:
self._should_stop = True
if self._current_worker and not getattr(self._current_worker, "done", lambda: False)():
cancel = getattr(self._current_worker, "cancel", None)
if cancel:
cancel()
if self._current_crew:
# Schedule cancel but don't wait - we're exiting
asyncio.create_task(self._cancel_crew())
self.exit()
def action_stop_agent(self) -> None:
if self._is_running:
self._should_stop = True
self._add_system("[!] Stopping...")
# Cancel the running worker to interrupt blocking awaits
if self._current_worker and not getattr(self._current_worker, "done", lambda: False)():
cancel = getattr(self._current_worker, "cancel", None)
if cancel:
cancel()
# Cancel crew orchestrator if running
if self._current_crew:
asyncio.create_task(self._cancel_crew())
# Clean up agent state to prevent stale tool responses
if self.agent:
self.agent.cleanup_after_cancel()
# Reconnect MCP servers (they may be in a bad state after cancellation)
if self.mcp_manager:
asyncio.create_task(self._reconnect_mcp_after_cancel())
async def _cancel_crew(self) -> None:
"""Cancel crew orchestrator and all workers."""
try:
if self._current_crew:
await self._current_crew.cancel()
self._current_crew = None
# Mark all running workers as cancelled in the UI
for worker_id, worker in self._crew_workers.items():
if worker.get("status") in ("running", "pending"):
self._update_crew_worker(worker_id, status="cancelled")
except Exception as e:
logging.getLogger(__name__).exception("Failed to cancel crew orchestrator cleanly: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed during crew cancellation: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about crew cancellation failure")
async def _reconnect_mcp_after_cancel(self) -> None:
"""Reconnect MCP servers after cancellation to restore clean state."""
await asyncio.sleep(0.5) # Brief delay for cancellation to propagate
try:
if self.mcp_manager:
await self.mcp_manager.reconnect_all()
except Exception as e:
logging.getLogger(__name__).exception("Failed to reconnect MCP servers after cancel: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: failed to reconnect MCP servers after cancel: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about MCP reconnect failure")
def action_show_help(self) -> None:
self.push_screen(HelpScreen())
# ----- History navigation -----
def action_history_up(self) -> None:
"""Recall previous input into the chat field."""
try:
inp = self.query_one("#chat-input", Input)
except Exception as e:
logging.getLogger(__name__).exception("Failed to query chat input for history up: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: history navigation failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about history_up failure")
return
if not self._cmd_history:
return
# Move back but not below zero
if self._history_index > 0:
self._history_index -= 1
inp.value = self._cmd_history[self._history_index]
def action_history_down(self) -> None:
"""Recall next input (or clear when at end)."""
try:
inp = self.query_one("#chat-input", Input)
except Exception as e:
logging.getLogger(__name__).exception("Failed to query chat input for history down: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: history navigation failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about history_down failure")
return
if not self._cmd_history:
return
if self._history_index < len(self._cmd_history) - 1:
self._history_index += 1
inp.value = self._cmd_history[self._history_index]
else:
# Past the end: clear input and set index to end
self._history_index = len(self._cmd_history)
inp.value = ""
async def on_unmount(self) -> None:
"""Cleanup"""
if self.mcp_manager:
try:
await self.mcp_manager.disconnect_all()
await asyncio.sleep(0.1)
except Exception as e:
logging.getLogger(__name__).exception("Failed to disconnect MCP manager on unmount: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: error during shutdown disconnect: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about MCP disconnect failure")
if self.runtime:
try:
await self.runtime.stop()
except Exception as e:
logging.getLogger(__name__).exception("Failed to stop runtime on unmount: %s", e)
try:
from .notifier import notify
notify("warning", f"TUI: runtime stop error during shutdown: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about runtime stop failure")
# ----- Entry Point -----
def run_tui(
target: Optional[str] = None,
model: Optional[str] = None,
use_docker: bool = False,
):
"""Run the PentestAgent TUI"""
app = PentestAgentTUI(
target=target,
model=model,
use_docker=use_docker,
)
app.run()
if __name__ == "__main__":
run_tui()