# Mirror of https://github.com/GH05TCREW/pentestagent.git
# Synced 2026-03-07 14:23:20 +00:00 (3444 lines, 129 KiB, Python)
"""
|
|
PentestAgent TUI - Terminal User Interface
|
|
"""
|
|
|
|
import asyncio
|
|
import re
|
|
import textwrap
|
|
import logging
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast
|
|
|
|
from rich.text import Text
|
|
from textual import on, work
|
|
from textual.app import App, ComposeResult
|
|
from textual.binding import Binding
|
|
from textual.containers import (
|
|
Center,
|
|
Container,
|
|
Horizontal,
|
|
ScrollableContainer,
|
|
Vertical,
|
|
)
|
|
from textual.reactive import reactive
|
|
from textual.screen import ModalScreen
|
|
from textual.scrollbar import ScrollBar, ScrollBarRender
|
|
from textual.timer import Timer
|
|
from textual.widgets import Button, Input, Static, Tree
|
|
from textual.widgets.tree import TreeNode
|
|
|
|
from ..config.constants import DEFAULT_MODEL
|
|
|
|
# Pattern used to strip terminal control codes from input. It is an
# alternation of three shapes:
#   1. ESC [ <digits/semicolons> followed by one of the listed final letters
#   2. ESC ] <anything, non-greedy> terminated by BEL (\x07)
#   3. ESC [ < <digits/semicolons> followed by "M" or "m"
_ANSI_ESCAPE = re.compile(
    r"\x1b\[[0-9;]*[mGKHflSTABCDEFsu]"
    r"|\x1b\].*?\x07"
    r"|\x1b\[<[0-9;]*[Mm]"
)
|
|
|
|
|
|
# Scrollbar renderer restricted to ASCII so terminals with poor Unicode glyph
# support do not show broken characters.
class ASCIIScrollBarRender(ScrollBarRender):
    """Scrollbar renderer whose glyph tables contain only plain spaces."""

    BLANK_GLYPH = " "
    # Both bar tables hold eight identical single-space entries.
    VERTICAL_BARS = [" "] * 8
    HORIZONTAL_BARS = [" "] * 8


# Install the ASCII renderer on the ScrollBar class itself, so it applies to
# every scrollbar rather than to one instance.
ScrollBar.renderer = ASCIIScrollBarRender
|
|
|
|
|
|
# Tree variant with ASCII-only node icons (kept for PowerShell compatibility).
class CrewTree(Tree):
    """Tree widget whose expand/collapse icons are plain ASCII."""

    # Icon drawn in front of a node, and its expanded counterpart.
    ICON_NODE = "> "
    ICON_NODE_EXPANDED = "v "
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
from ..agents.pa_agent import PentestAgentAgent
|
|
|
|
|
|
def wrap_text_lines(text: str, width: int = 80) -> List[str]:
    """
    Split *text* on newlines and wrap every line longer than *width*.

    Lines that already fit are passed through untouched. Longer lines go
    through ``textwrap.wrap`` with word- and hyphen-breaking disabled, so a
    single unbreakable token may still exceed *width*.

    Args:
        text: The text to wrap
        width: Maximum width per line (default 80 for safe terminal fit)

    Returns:
        List of wrapped lines
    """
    wrapped_lines: List[str] = []
    for raw_line in text.split("\n"):
        if len(raw_line) > width:
            pieces = textwrap.wrap(
                raw_line,
                width=width,
                break_long_words=False,
                break_on_hyphens=False,
            )
            # textwrap returns [] for whitespace-only input; keep one empty
            # line so the original line break is not lost.
            wrapped_lines += pieces or [""]
        else:
            wrapped_lines.append(raw_line)
    return wrapped_lines
|
|
|
|
|
|
# ----- Help Screen -----
|
|
|
|
|
|
class HelpScreen(ModalScreen):
    """Modal screen listing modes, key bindings and slash commands."""

    # Escape and "q" both trigger the "dismiss" action defined below.
    BINDINGS = [
        Binding(key, "dismiss", "Close")
        for key in ("escape", "q")
    ]

    CSS = """
    HelpScreen {
        align: center middle;
        scrollbar-background: #1a1a1a;
        scrollbar-background-hover: #1a1a1a;
        scrollbar-background-active: #1a1a1a;
        scrollbar-color: #3a3a3a;
        scrollbar-color-hover: #3a3a3a;
        scrollbar-color-active: #3a3a3a;
        scrollbar-corner-color: #1a1a1a;
        scrollbar-size: 1 1;
    }

    #help-container {
        width: 110;
        height: 30;
        background: #121212;
        border: solid #3a3a3a;
        padding: 1 2;
        layout: vertical;
    }

    #help-title {
        text-align: center;
        text-style: bold;
        color: #d4d4d4;
        margin-bottom: 1;
    }

    #help-content {
        color: #9a9a9a;
    }

    #help-close {
        margin-top: 1;
        width: auto;
        min-width: 10;
        background: #1a1a1a;
        color: #9a9a9a;
        border: none;
    }

    #help-close:hover {
        background: #262626;
    }

    #help-close:focus {
        background: #262626;
        text-style: none;
    }
    """

    def compose(self) -> ComposeResult:
        """Yield the help container: title, help text and a Close button."""
        title = Static("PentestAgent Help", id="help-title")
        body = Static(self._get_help_text(), id="help-content")
        close_row = Center(Button("Close", id="help-close"))
        yield Container(title, body, close_row, id="help-container")

    def _get_help_text(self) -> str:
        """Return the help body: a markup header followed by an aligned command list."""
        intro = "".join(
            [
                "[bold]Modes:[/] Assist | Agent | Crew\n",
                "[bold]Keys:[/] Enter=Send Up/Down=History Ctrl+Q=Quit\n\n",
                "[bold]Commands:[/]\n",
            ]
        )

        commands = [
            ("/agent <task>", "Run in agent mode"),
            ("/crew <task>", "Run multi-agent crew mode"),
            ("/target <host>", "Set target"),
            ("/prompt", "Show system prompt"),
            ("/memory", "Show memory stats"),
            ("/token", "Show token usage & cost"),
            ("/notes", "Show saved notes"),
            ("/report", "Generate report"),
            ("/help", "Show help"),
            ("/clear", "Clear chat"),
            ("/tools", "List tools"),
            ("/mcp", "List mcp servers"),
            ("/quit", "Exit"),
        ]

        # Pad every command to the longest one plus three columns so the
        # dashes line up.
        column = max(len(name) for name, _ in commands) + 3
        rows = [f" {name.ljust(column)}- {summary}" for name, summary in commands]

        return intro + "\n".join(rows)

    def action_dismiss(self) -> None:
        """Close the modal by popping it off the app's screen stack."""
        self.app.pop_screen()

    @on(Button.Pressed, "#help-close")
    def close_help(self) -> None:
        """Close the modal when the Close button is pressed."""
        self.app.pop_screen()
|
|
|
|
|
|
class WorkspaceHelpScreen(ModalScreen):
    """Help modal for workspace commands."""

    BINDINGS = [
        Binding("escape", "dismiss", "Close"),
        Binding("q", "dismiss", "Close"),
    ]

    CSS = """
    WorkspaceHelpScreen {
        align: center middle;
        scrollbar-background: #1a1a1a;
        scrollbar-background-hover: #1a1a1a;
        scrollbar-background-active: #1a1a1a;
        scrollbar-color: #3a3a3a;
        scrollbar-color-hover: #3a3a3a;
        scrollbar-color-active: #3a3a3a;
        scrollbar-corner-color: #1a1a1a;
        scrollbar-size: 1 1;
    }

    #help-container {
        width: 60;
        height: 26;
        background: #121212;
        border: solid #3a3a3a;
        padding: 1 2;
        layout: vertical;
    }

    #help-title {
        text-align: center;
        text-style: bold;
        color: #d4d4d4;
        margin-bottom: 1;
    }

    #help-content {
        color: #9a9a9a;
    }

    #help-close {
        margin-top: 1;
        width: auto;
        min-width: 10;
        background: #1a1a1a;
        color: #9a9a9a;
        border: none;
    }

    #help-close:hover {
        background: #262626;
    }

    #help-close:focus {
        background: #262626;
        text-style: none;
    }
    """

    # Single source of truth for the usage line and the command table. Both
    # compose() and _get_help_text() read these, so the rich-table view and
    # the plain-text view cannot drift apart.
    _USAGE_LINE = "Usage: /workspace <action> or /workspace <name>"
    _WORKSPACE_COMMANDS = (
        ("/workspace", "Show active"),
        ("/workspace list", "List all workspaces"),
        ("/workspace info [NAME]", "Show workspace metadata"),
        ("/workspace note <text>", "Add operator note"),
        ("/workspace clear", "Deactivate workspace"),
        ("/workspace NAME", "Create or activate workspace"),
        ("/workspace help", "Show this help"),
    )

    def compose(self) -> ComposeResult:
        """Yield the help container: title, usage line, command table, Close button."""
        from rich.table import Table

        # Build a two-column table to prevent wrapping
        table = Table.grid(padding=(0, 3))
        table.add_column(justify="left", ratio=2)
        table.add_column(justify="left", ratio=3)

        # Header and usage
        header = Text("Workspace Commands", style="bold")
        usage = Text(self._USAGE_LINE)

        # Compose table rows
        table.add_row(Text("Commands:", style="bold"), Text(""))
        for left, right in self._WORKSPACE_COMMANDS:
            table.add_row(left, right)

        yield Container(
            Static(header, id="help-title"),
            Static(usage, id="help-usage"),
            Static(table, id="help-content"),
            Center(Button("Close", id="help-close"), id="help-center"),
            id="help-container",
        )

    def _get_help_text(self) -> str:
        """Return a plain-text rendering of the same usage line and command table.

        Commands are padded to a fixed-width left column. A command too long
        for that column is placed on its own line, with its description on
        the next line.
        """
        header = self._USAGE_LINE + "\n"

        # Build two-column layout with fixed left column width
        left_width = 44
        lines = [header, "Commands:\n"]
        for left, right in self._WORKSPACE_COMMANDS:
            if len(left) >= left_width - 2:
                # if left is long, place on its own line
                lines.append(f" {left}\n {right}")
            else:
                pad = " " * (left_width - len(left))
                lines.append(f" {left}{pad}{right}")

        return "\n".join(lines)

    def action_dismiss(self) -> None:
        """Close the modal by popping it off the app's screen stack."""
        self.app.pop_screen()

    @on(Button.Pressed, "#help-close")
    def close_help(self) -> None:
        """Close the modal when the Close button is pressed."""
        self.app.pop_screen()
|
|
|
|
|
|
class ToolsScreen(ModalScreen):
    """Interactive tools browser — split-pane layout.

    Left pane: tree of tools. Right pane: full description (scrollable).
    Selecting another tool replaces the right-pane content. Close returns
    to the main screen.
    """

    BINDINGS = [Binding("escape", "dismiss", "Close"), Binding("q", "dismiss", "Close")]

    CSS = """
    ToolsScreen { align: center middle; }
    """

    def __init__(self, tools: List[Any]) -> None:
        """Store the tools to list; each is shown by its ``name`` attribute or ``str()``."""
        super().__init__()
        self.tools = tools

    @staticmethod
    def _report_failure(log_message: str, notice: str, notify_context: str, exc: Exception) -> None:
        """Log *exc* with traceback and try to warn the operator.

        Must be called from inside an ``except`` block so that
        ``Logger.exception`` can pick up the active traceback. A failure of
        the notifier itself (including its import) is logged, never raised.

        Args:
            log_message: ``%s``-style log template for the primary failure.
            notice: Operator-facing message prefix; ``": <exc>"`` is appended.
            notify_context: Short phrase naming the failure, used when the
                notifier itself fails.
            exc: The exception being reported.
        """
        log = logging.getLogger(__name__)
        log.exception(log_message, exc)
        try:
            from ..interface.notifier import notify

            notify("warning", f"{notice}: {exc}")
        except Exception as notify_exc:
            log.exception("Failed to notify operator about %s: %s", notify_context, notify_exc)

    def compose(self) -> ComposeResult:
        """Yield the split view (tree left, description right) and a Close button."""
        with Container(id="tools-container"):
            with Horizontal(id="tools-split"):
                with Vertical(id="tools-left"):
                    yield Static("Tools", id="tools-title")
                    yield Tree("TOOLS", id="tools-tree")

                with Vertical(id="tools-right"):
                    yield Static("Description", id="tools-desc-title")
                    yield ScrollableContainer(
                        Static("Select a tool to view details.", id="tools-desc"),
                        id="tools-desc-scroll",
                    )

            yield Center(Button("Close", id="tools-close"))

    def on_mount(self) -> None:
        """Populate the tree with one node per tool and give it focus."""
        try:
            tree = self.query_one("#tools-tree", Tree)
        except Exception as exc:
            self._report_failure(
                "Failed to query tools tree: %s",
                "TUI: failed to initialize tools tree",
                "tools tree init failure",
                exc,
            )
            return

        root = tree.root
        root.allow_expand = True
        # show_root is a property of the Tree widget, not of its root node;
        # setting it on the node (as before) had no effect on what is drawn.
        tree.show_root = False

        # Populate tool nodes
        for tool in self.tools:
            root.add(getattr(tool, "name", str(tool)), data={"tool": tool})

        try:
            tree.focus()
        except Exception as exc:
            self._report_failure(
                "Failed to focus tools tree: %s",
                "TUI: failed to focus tools tree",
                "tools tree focus failure",
                exc,
            )

    @on(Tree.NodeSelected, "#tools-tree")
    def on_tool_selected(self, event: Tree.NodeSelected) -> None:
        """Show the selected tool's name and description in the right pane.

        The description is the first truthy value among the tool's
        ``description``, ``summary``, ``help_text`` and ``__doc__``
        attributes, falling back to a fixed placeholder.
        """
        node = event.node
        try:
            tool = node.data.get("tool") if node.data else None
            name = node.label or (getattr(tool, "name", str(tool)) if tool else "Unknown")

            # Prefer Tool.description (registered tools use this), then fall back
            desc = None
            if tool is not None:
                desc = (
                    getattr(tool, "description", None)
                    or getattr(tool, "summary", None)
                    or getattr(tool, "help_text", None)
                    or getattr(tool, "__doc__", None)
                )
            if not desc:
                desc = "No description available."

            # Update right-hand description pane
            try:
                desc_widget = self.query_one("#tools-desc", Static)
                text = Text()
                text.append(f"{name}\n", style="bold #d4d4d4")
                text.append(str(desc), style="#d4d4d4")
                desc_widget.update(text)
            except Exception as exc:
                self._report_failure(
                    "Failed to update tool description pane: %s",
                    "TUI: failed to update tool description",
                    "tool desc update failure",
                    exc,
                )
        except Exception as exc:
            self._report_failure(
                "Unhandled error in on_tool_selected: %s",
                "TUI: error handling tool selection",
                "tool selection error",
                exc,
            )

    @on(Button.Pressed, "#tools-close")
    def close_tools(self) -> None:
        """Close the modal when the Close button is pressed."""
        self.app.pop_screen()
|
|
|
|
|
|
|
|
class MCPScreen(ModalScreen):
    """Interactive MCP browser — split-pane layout.

    Left pane: tree of MCP servers with their tools as child nodes.
    Right pane: details of the selected server or tool, plus a button that
    toggles the selected server's enabled flag.
    """

    BINDINGS = [
        Binding("escape", "dismiss", "Close"),
        Binding("q", "dismiss", "Close"),
    ]

    CSS = """
    MCPScreen { align: center middle; }
    """

    # Imported in the class body, so the name is bound in the class namespace
    # and is available when the __init__ annotation below is evaluated.
    from ..mcp import MCPManager

    def __init__(self, mcp_manager: MCPManager) -> None:
        """Store the manager and start with nothing selected."""
        super().__init__()
        self.mcp_manager = mcp_manager
        self.selected_server = None
        self.selected_tool = None

    # ------------------------------------------------------------

    def compose(self) -> ComposeResult:
        """Yield the split view (server tree left, details right) and a Close button."""
        with Container(id="mcp-container"):
            with Horizontal(id="mcp-split"):

                # LEFT SIDE
                with Vertical(id="mcp-left"):
                    yield Static("MCP Servers", id="mcp-title")
                    yield Tree("MCP Servers", id="mcp-tree")

                # RIGHT SIDE
                with Vertical(id="mcp-right"):
                    yield Static("Description", id="mcp-desc-title")
                    # ---- Toggle Button ----
                    yield Button("Enabled: 🔴", id="mcp-toggle-enabled")

                    yield ScrollableContainer(
                        Static("Select a MCP server to view details.", id="mcp-desc"),
                        id="mcp-desc-scroll",
                    )

            yield Center(Button("Close", id="mcp-close"))

    # ------------------------------------------------------------

    def on_mount(self) -> None:
        """Fill the tree from the manager, hide the toggle button, focus the tree."""
        try:
            tree = self.query_one("#mcp-tree", Tree)
        except Exception as e:
            logging.getLogger(__name__).exception("Failed to query MCP tree: %s", e)
            return

        root = tree.root
        root.allow_expand = True
        # show_root is a property of the Tree widget, not of its root node;
        # setting it on the node (as before) had no effect on what is drawn.
        tree.show_root = False

        for server in self.mcp_manager.get_all_servers():
            server_node = root.add(server.name, data={"server": server})
            for tool in server.tools:
                server_node.add(tool['name'], data={"tool": tool})

        # Hide toggle button initially
        toggle_btn = self.query_one("#mcp-toggle-enabled", Button)
        toggle_btn.display = False

        try:
            tree.focus()
        except Exception as e:
            logging.getLogger(__name__).exception("Failed to focus MCP tree: %s", e)

    # ------------------------------------------------------------

    @on(Tree.NodeSelected, "#mcp-tree")
    def on_mcp_selected(self, event: Tree.NodeSelected) -> None:
        """Record which server or tool was selected and redraw the right pane."""
        node = event.node

        self.selected_server = node.data.get("server") if node.data else None
        self.selected_tool = node.data.get("tool") if node.data else None

        self._render_selection()

    def _render_selection(self) -> None:
        """Draw the right pane from ``selected_server`` / ``selected_tool``.

        A selected server takes precedence over a selected tool. The toggle
        button is only displayed while a server is selected.
        """
        desc_widget = self.query_one("#mcp-desc", Static)
        toggle_btn = self.query_one("#mcp-toggle-enabled", Button)

        text = Text()

        if self.selected_server is not None:
            mcp = self.selected_server

            text.append(f"{mcp.name}\n", style="bold #d4d4d4")
            text.append(f"{mcp.config.description}\n\n", style="#d4d4d4")

            text.append(f"Command: {mcp.config.command}\n", style="#9a9a9a")
            text.append(f"Args: {mcp.config.args}\n\n", style="#9a9a9a")

            enabled_icon = "🟢" if mcp.config.enabled else "🔴"
            connected_icon = "🟢" if mcp.connected else "🔴"

            text.append(f"Connected: {connected_icon}\n", style="#9a9a9a")

            if mcp.last_error:
                text.append(f"\nLast error: {mcp.last_error}\n", style="#e91b1b")

            desc_widget.update(text)

            toggle_btn.display = True
            toggle_btn.label = f"Enabled: {enabled_icon}"

        elif self.selected_tool is not None:
            text.append(f"{self.selected_tool['description']}\n", style="#d4d4d4")
            desc_widget.update(text)
            toggle_btn.display = False

        else:
            desc_widget.update("Choose a server.")
            toggle_btn.display = False

    # ------------------------------------------------------------

    @on(Button.Pressed, "#mcp-toggle-enabled")
    def toggle_enabled(self) -> None:
        """Flip the selected server's enabled flag via the manager, then redraw.

        Does nothing when no server is selected. If the manager call fails,
        the error is logged, the operator is notified on a best-effort
        basis, and the pane is left as it was.
        """
        mcp = self.selected_server
        if mcp is None:
            return

        try:
            if mcp.config.enabled:
                self.mcp_manager.disable(mcp.name)
            else:
                self.mcp_manager.enable(mcp.name)
        except Exception as e:
            log = logging.getLogger(__name__)
            log.exception("Toggle failed: %s", e)
            try:
                from ..interface.notifier import notify

                notify("error", f"Failed to toggle {mcp.name}: {e}")
            except Exception as notify_exc:
                # Notification is best-effort, but leave a trace rather than
                # swallowing the failure silently.
                log.exception("Failed to notify operator about toggle failure: %s", notify_exc)
            return

        # Refresh UI
        self._refresh_description()

    # ------------------------------------------------------------

    def _refresh_description(self):
        """Rebuild right-hand side using current selection."""
        if self.selected_server:
            # Same effect as re-selecting the server node: a server node
            # carries no tool, so the tool selection is cleared.
            self.selected_tool = None
            self._render_selection()

    # ------------------------------------------------------------

    @on(Button.Pressed, "#mcp-close")
    def close_mcp(self) -> None:
        """Close the modal when the Close button is pressed."""
        self.app.pop_screen()
|
|
|
|
|
|
|
|
# ----- Main Chat Message Widgets -----
|
|
|
|
|
|
class ThinkingMessage(Static):
    """Chat widget that displays the agent's thinking/reasoning text."""

    def __init__(self, content: str, **kwargs):
        """Keep *content* for rendering; other keyword arguments go to ``Static``."""
        super().__init__(**kwargs)
        self.thinking_content = content

    def render(self) -> Text:
        """Return a "* Thinking" header followed by the content wrapped at 90 columns."""
        header_style = "#9a9a9a"
        body_style = "#6b6b6b italic"

        rendered = Text()
        rendered.append("* ", style=header_style)
        rendered.append("Thinking\n", style=f"bold {header_style}")

        # Each wrapped line becomes its own styled, indented row.
        body_lines = wrap_text_lines(self.thinking_content, width=90)
        for body_line in body_lines:
            rendered.append(f" {body_line}\n", style=body_style)

        return rendered
|
|
|
|
|
|
class ToolMessage(Static):
    """Chat widget announcing a tool call and its arguments."""

    # Icon and colour used for the header line (pa theme).
    TOOL_ICON = "$"
    TOOL_COLOR = "#9a9a9a"  # spirit gray

    def __init__(self, tool_name: str, args: str = "", **kwargs):
        """Keep the tool name and argument text; other keyword arguments go to ``Static``."""
        super().__init__(**kwargs)
        self.tool_name = tool_name
        self.tool_args = args

    def render(self) -> Text:
        """Return "<icon> <tool name>" plus, when present, the args wrapped at 110 columns."""
        rendered = Text()
        rendered.append(f"{self.TOOL_ICON} ", style=self.TOOL_COLOR)
        rendered.append(f"{self.tool_name}", style=self.TOOL_COLOR)
        rendered.append("\n", style="")

        # No argument text means the header stands alone.
        if not self.tool_args:
            return rendered

        for arg_line in wrap_text_lines(self.tool_args, width=110):
            rendered.append(f" {arg_line}\n", style="#6b6b6b")
        return rendered
|
|
|
|
|
|
class ToolResultMessage(Static):
    """Chat widget showing the output produced by a tool."""

    RESULT_ICON = "#"
    RESULT_COLOR = "#7a7a7a"

    def __init__(self, tool_name: str, result: str = "", **kwargs):
        """Keep the tool name and result text; other keyword arguments go to ``Static``."""
        super().__init__(**kwargs)
        self.tool_name = tool_name
        self.result = result

    def render(self) -> Text:
        """Return "<icon> <tool name> output" plus, when present, the result wrapped at 110 columns."""
        rendered = Text()
        rendered.append(f"{self.RESULT_ICON} ", style=self.RESULT_COLOR)
        rendered.append(f"{self.tool_name} output", style=self.RESULT_COLOR)
        rendered.append("\n", style="")

        # An empty result leaves just the header line.
        if not self.result:
            return rendered

        for output_line in wrap_text_lines(self.result, width=110):
            rendered.append(f" {output_line}\n", style="#5a5a5a")
        return rendered
|
|
|
|
|
|
class AssistantMessage(Static):
    """Chat widget for a response from the assistant."""

    def __init__(self, content: str, **kwargs):
        """Keep *content* for rendering; other keyword arguments go to ``Static``."""
        super().__init__(**kwargs)
        self.message_content = content

    def render(self) -> Text:
        """Return a ">> PentestAgent" header followed by the content wrapped at 90 columns."""
        body_style = "#d4d4d4"

        rendered = Text()
        rendered.append(">> ", style="#9a9a9a")
        rendered.append("PentestAgent\n", style=f"bold {body_style}")

        # Each wrapped line becomes its own indented row.
        body_lines = wrap_text_lines(self.message_content, width=90)
        for body_line in body_lines:
            rendered.append(f" {body_line}\n", style=body_style)

        return rendered
|
|
|
|
|
|
class UserMessage(Static):
    """Chat widget for a message typed by the user."""

    def __init__(self, content: str, **kwargs):
        """Keep *content* for rendering; other keyword arguments go to ``Static``."""
        super().__init__(**kwargs)
        self.message_content = content

    def render(self) -> Text:
        """Return a "> You" header followed by the message, which is not wrapped."""
        body_style = "#d4d4d4"

        rendered = Text()
        rendered.append("> ", style="#9a9a9a")
        rendered.append("You\n", style=f"bold {body_style}")
        rendered.append(f" {self.message_content}\n", style=body_style)
        return rendered
|
|
|
|
|
|
class SystemMessage(Static):
    """Chat widget for system messages, drawn in a subdued colour."""

    def __init__(self, content: str, **kwargs):
        """Keep *content* for rendering; other keyword arguments go to ``Static``."""
        super().__init__(**kwargs)
        self.message_content = content

    def render(self) -> Text:
        """Return the content split on newlines, one indented row per line."""
        subtle = "#6b6b6b"  # phantom - subtle system text
        rendered = Text()
        rows = self.message_content.split("\n")
        for row in rows:
            rendered.append(f" {row}\n", style=subtle)
        return rendered
|
|
|
|
|
|
# ----- Status Bar -----
|
|
|
|
|
|
class StatusBar(Static):
    """Status bar showing the current mode and an animated status label."""

    status = reactive("idle")
    mode = reactive("assist")  # render() distinguishes "crew", "agent" and everything else

    def __init__(self, **kwargs):
        """Start at animation frame 0 with no timer; keyword arguments go to ``Static``."""
        super().__init__(**kwargs)
        self._frame = 0
        self._timer: Optional[Timer] = None

    def on_mount(self) -> None:
        """Start the 0.2-second animation timer."""
        self._timer = self.set_interval(0.2, self._tick)

    def _tick(self) -> None:
        """Advance the four-frame dot animation; redraw unless idle or complete."""
        self._frame = (self._frame + 1) % 4
        if self.status in ("idle", "complete"):
            return
        self.refresh()

    def render(self) -> Text:
        """Return " <mode> | <status label>", plus an ESC hint while work is in progress."""
        # One to four dots, padded to a fixed four columns so the label
        # does not change width between frames.
        trail = ("." * (self._frame + 1)).ljust(4)

        # PA theme status colors (muted, ethereal)
        busy = "#9a9a9a"
        known_statuses = {
            "idle": ("Ready", "#6b6b6b"),
            "initializing": (f"Initializing{trail}", busy),
            "thinking": (f"Thinking{trail}", busy),
            "running": (f"Running{trail}", busy),
            "processing": (f"Processing{trail}", busy),
            "waiting": ("Waiting for input", busy),
            "complete": ("Complete", "#4a9f6e"),
            "error": ("Error", "#9f4a4a"),
        }
        # An unrecognised status is shown verbatim in the idle colour.
        fallback = (self.status, "#6b6b6b")
        label, color = known_statuses.get(self.status, fallback)

        # Mode prefix (ASCII-safe symbols); any mode other than crew/agent
        # is shown as Assist.
        mode_prefixes = {"crew": " :: Crew ", "agent": " >> Agent "}
        prefix = mode_prefixes.get(self.mode, " >> Assist ")

        rendered = Text()
        rendered.append(prefix, style="#9a9a9a")
        rendered.append(f"| {label}", style=color)

        no_hint_statuses = ["idle", "initializing", "complete", "error"]
        if self.status not in no_hint_statuses:
            rendered.append(" ESC to stop", style="#525252")

        return rendered
|
|
|
|
|
|
class MemoryDiagnostics(Static):
    """Live memory diagnostics widget mounted into the chat area.

    Redraws on a timer; each redraw reads the agent's LLM memory stats
    and renders a compact panel of bars and counters.
    """

    def __init__(self, **kwargs):
        """Start with no timer; keyword arguments go to ``Static``."""
        super().__init__(**kwargs)
        self._timer: Optional[Timer] = None

    def on_mount(self) -> None:
        """Start redrawing the widget every 0.8 seconds."""
        self._timer = self.set_interval(0.8, self.refresh)

    def on_unmount(self) -> None:
        """Stop the redraw timer, if one was started."""
        if self._timer:
            self._timer.stop()

    def _bar(self, ratio: float, width: int = 20) -> str:
        """Return a *width*-character block bar filled in proportion to *ratio* (clamped to 0..1)."""
        clamped = max(0, min(1.0, ratio))
        filled = int(clamped * width)
        empty = width - filled
        return "█" * filled + "░" * empty

    def render(self) -> Text:
        """Build the diagnostics panel from the app's agent.

        Shows a placeholder when the app has no agent or the agent has no
        LLM. Any exception raised while gathering or formatting stats is
        appended to the panel as an error line instead of propagating.
        """
        panel = Text()
        grey = "#9a9a9a"
        title_style = "bold #d4d4d4"

        try:
            agent = getattr(self.app, "agent", None)
            if not agent or not getattr(agent, "llm", None):
                panel.append("Memory Diagnostics\n", style=title_style)
                panel.append("Agent not initialized", style=grey)
                return panel

            # Gather everything before drawing so the header only appears
            # once the stats calls have succeeded.
            stats = agent.llm.get_memory_stats()
            msgs = len(agent.conversation_history)
            llm_msgs = agent._format_messages_for_llm()
            current_tokens = agent.llm.memory.get_total_tokens(llm_msgs)

            # Missing or falsy budget/threshold fall back to 1 / the budget.
            budget = stats.get("token_budget") or 1
            thresh = stats.get("summarize_threshold") or budget
            recent_keep = stats.get("recent_to_keep", 5)
            has_summary = stats.get("has_summary", False)
            summarized_count = stats.get("summarized_message_count", 0)

            panel.append("Memory Diagnostics\n", style=title_style)

            # All bars share one width; labels are padded to the longest one.
            bar_width = 28
            label_width = max(len(name) for name in ("Tokens:", "Messages:", "Retention:"))

            # Tokens line
            token_bar = self._bar(current_tokens / max(1, budget), width=bar_width)
            token_label = "Tokens:".ljust(label_width)
            panel.append(
                f"{token_label} [{token_bar}] {current_tokens:,} / {budget:,}\n",
                style=grey,
            )

            # Messages line (scale messages to an expected max window)
            expected_msgs_max = max(1, recent_keep * 6)
            message_bar = self._bar(min(1.0, msgs / expected_msgs_max), width=bar_width)
            message_label = "Messages:".ljust(label_width)
            panel.append(f"{message_label} [{message_bar}] {msgs} active\n", style=grey)

            # Retention / recent
            # NOTE(review): this ratio is 1.0 for any recent_keep >= 1, so the
            # bar is always full — confirm what it was meant to be scaled against.
            keep_ratio = min(1.0, recent_keep / max(1, recent_keep))
            keep_bar = self._bar(keep_ratio, width=bar_width)
            keep_label = "Retention:".ljust(label_width)
            panel.append(
                f"{keep_label} [{keep_bar}] keeping last {recent_keep}\n",
                style=grey,
            )

            # Summary status
            if has_summary:
                emoji, summary_state = "🟢", "active"
            else:
                emoji, summary_state = "🔴", "inactive"
            panel.append(f"Summary: {emoji} {summary_state}\n", style=grey)

            # Summarized / threshold
            panel.append(f"Summarized: {summarized_count} / {thresh:,}\n", style=grey)
            panel.append(f"Threshold: {thresh:,}\n", style=grey)

        except Exception as e:
            panel.append(f"Memory diagnostics error: {e}", style=grey)

        return panel
|
|
|
|
|
|
class TokenDiagnostics(Static):
|
|
"""Live token/cost diagnostics panel mounted into the chat area.
|
|
|
|
Reads persisted daily usage from the token_tracker, computes cost
|
|
using environment variables, and displays a simple ASCII progress bar.
|
|
"""
|
|
|
|
def __init__(self, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self._timer: Optional[Timer] = None
|
|
|
|
def on_mount(self) -> None:
|
|
# Refresh periodically for a lively display
|
|
self._timer = self.set_interval(1.0, self.refresh)
|
|
|
|
def on_unmount(self) -> None:
|
|
if self._timer:
|
|
self._timer.stop()
|
|
|
|
def _bar(self, ratio: float, width: int = 28) -> str:
|
|
"""Block-style usage bar matching MemoryDiagnostics visuals."""
|
|
r = max(0.0, min(1.0, ratio))
|
|
filled = int(r * width)
|
|
return "█" * filled + "░" * (width - filled)
|
|
|
|
def render(self) -> Text:
|
|
text = Text()
|
|
try:
|
|
import os
|
|
|
|
# Lazy import of token_tracker (best-effort)
|
|
try:
|
|
from ..tools import token_tracker
|
|
except Exception as e:
|
|
logging.getLogger(__name__).exception("Failed to import token_tracker: %s", e)
|
|
try:
|
|
from ..interface.notifier import notify
|
|
|
|
notify("warning", f"TUI: token tracker import failed: {e}")
|
|
except Exception as e:
|
|
logging.getLogger(__name__).exception("Failed to notify operator about token_tracker import failure: %s", e)
|
|
token_tracker = None
|
|
|
|
text.append("Token Usage Diagnostics\n", style="bold #d4d4d4")
|
|
|
|
if not token_tracker:
|
|
text.append(
|
|
"Token tracker not available (tools/token_tracker).\n",
|
|
style="#9a9a9a",
|
|
)
|
|
return text
|
|
|
|
stats = token_tracker.get_stats_sync()
|
|
|
|
# If a reset is pending (date changed), perform a reset now so daily
|
|
# usage is accurate and visible to the user.
|
|
reset_occurred = False
|
|
if stats.get("reset_pending"):
|
|
try:
|
|
token_tracker.record_usage_sync(0, 0)
|
|
stats = token_tracker.get_stats_sync()
|
|
reset_occurred = True
|
|
except Exception as e:
|
|
logging.getLogger(__name__).exception("Token tracker reset failed: %s", e)
|
|
try:
|
|
from ..interface.notifier import notify
|
|
|
|
notify("warning", f"Token tracker reset failed: {e}")
|
|
except Exception as e:
|
|
logging.getLogger(__name__).exception("Failed to notify operator about token tracker reset failure: %s", e)
|
|
|
|
# Extract values
|
|
last_in = int(stats.get("last_input_tokens", 0) or 0)
|
|
last_out = int(stats.get("last_output_tokens", 0) or 0)
|
|
last_total = int(stats.get("last_total_tokens", 0) or 0)
|
|
daily_usage = int(stats.get("daily_usage", 0) or 0)
|
|
last_reset = stats.get("last_reset_date")
|
|
current_date = stats.get("current_date")
|
|
|
|
# (env parsing moved below)
|
|
|
|
# Environment cost config
|
|
def _parse_env(name: str):
|
|
v = os.getenv(name)
|
|
if v is None or v == "":
|
|
return None
|
|
try:
|
|
return float(v)
|
|
except Exception as e:
|
|
logging.getLogger(__name__).debug("Failed to parse env var %s: %s", name, e)
|
|
return "INVALID"
|
|
|
|
unified = _parse_env("COST_PER_MILLION")
|
|
input_cost_per_m = _parse_env("INPUT_COST_PER_MILLION")
|
|
output_cost_per_m = _parse_env("OUTPUT_COST_PER_MILLION")
|
|
daily_limit = _parse_env("DAILY_TOKEN_LIMIT")
|
|
|
|
# Determine if any env-based limits exist
|
|
has_env_limits = any(
|
|
v is not None
|
|
for v in (unified, input_cost_per_m, output_cost_per_m, daily_limit)
|
|
)
|
|
|
|
# If nothing has been recorded yet (no tokens, no daily usage)
|
|
# and no env limits are configured, show the concise sentinel only.
|
|
if last_total == 0 and daily_usage == 0 and not has_env_limits:
|
|
text.append("No token usage recorded\n", style="#9a9a9a")
|
|
return text
|
|
|
|
# Validate env vars
|
|
env_errors = []
|
|
if unified == "INVALID":
|
|
env_errors.append("COST_PER_MILLION is not numeric")
|
|
if input_cost_per_m == "INVALID":
|
|
env_errors.append("INPUT_COST_PER_MILLION is not numeric")
|
|
if output_cost_per_m == "INVALID":
|
|
env_errors.append("OUTPUT_COST_PER_MILLION is not numeric")
|
|
if daily_limit == "INVALID":
|
|
env_errors.append("DAILY_TOKEN_LIMIT is not numeric")
|
|
|
|
if env_errors:
|
|
text.append("Environment configuration errors:\n", style="#ef4444")
|
|
for e in env_errors:
|
|
text.append(f" - {e}\n", style="#9a9a9a")
|
|
text.append(
|
|
"\nSet environment variables correctly to compute costs.\n",
|
|
style="#9a9a9a",
|
|
)
|
|
return text
|
|
|
|
# Compute costs
|
|
if unified is not None:
|
|
# Use unified cost for both input and output
|
|
input_cost = (last_in / 1_000_000.0) * float(unified)
|
|
output_cost = (last_out / 1_000_000.0) * float(unified)
|
|
else:
|
|
# Require per-direction costs to be present to compute
|
|
if input_cost_per_m is None or output_cost_per_m is None:
|
|
text.append(
|
|
"Cost vars missing. Set COST_PER_MILLION or both INPUT_COST_PER_MILLION and OUTPUT_COST_PER_MILLION.\n",
|
|
style="#9a9a9a",
|
|
)
|
|
# Still show numeric token stats below
|
|
input_cost = output_cost = None
|
|
else:
|
|
input_cost = (last_in / 1_000_000.0) * float(input_cost_per_m)
|
|
output_cost = (last_out / 1_000_000.0) * float(output_cost_per_m)
|
|
|
|
total_cost = None
|
|
if input_cost is not None and output_cost is not None:
|
|
total_cost = input_cost + output_cost
|
|
|
|
# Daily budget calculations per spec
|
|
# Derive daily usage excluding last command (in case tracker already included it)
|
|
daily_without_last = max(daily_usage - last_total, 0)
|
|
new_daily_total = daily_without_last + last_total
|
|
|
|
remaining_tokens = None
|
|
percent_used = None
|
|
if daily_limit is not None:
|
|
try:
|
|
dl = float(daily_limit)
|
|
remaining_tokens = max(int(dl - new_daily_total), 0)
|
|
percent_used = (new_daily_total / max(1.0, dl)) * 100.0
|
|
except Exception as e:
|
|
logging.getLogger(__name__).exception("Failed to compute daily limit values: %s", e)
|
|
try:
|
|
from ..interface.notifier import notify
|
|
|
|
notify("warning", f"TUI: failed to compute daily token limit: {e}")
|
|
except Exception as e:
|
|
logging.getLogger(__name__).exception("Failed to notify operator about daily limit computation failure: %s", e)
|
|
remaining_tokens = None
|
|
|
|
# Render structured panel with aligned labels and block bars
|
|
bar_width = 28
|
|
labels = [
|
|
"Last command:",
|
|
"Cost:",
|
|
"Daily usage:",
|
|
"Remaining:",
|
|
"Usage:",
|
|
"Last reset:",
|
|
"Current date:",
|
|
"Reset occurred:",
|
|
]
|
|
label_width = max(len(label_text) for label_text in labels)
|
|
|
|
# Last command tokens
|
|
label = "Last command:".ljust(label_width)
|
|
text.append(
|
|
f"{label} in={last_in:,} out={last_out:,} total={last_total:,}\n",
|
|
style="#9a9a9a",
|
|
)
|
|
|
|
# Cost line
|
|
label = "Cost:".ljust(label_width)
|
|
if input_cost is not None and output_cost is not None:
|
|
text.append(
|
|
f"{label} in=${input_cost:.6f} out=${output_cost:.6f} total=${total_cost:.6f}\n",
|
|
style="#9a9a9a",
|
|
)
|
|
else:
|
|
text.append(
|
|
f"{label} not computed (missing env vars)\n",
|
|
style="#9a9a9a",
|
|
)
|
|
|
|
# Daily usage
|
|
label = "Daily usage:".ljust(label_width)
|
|
text.append(f"{label} {new_daily_total:,}\n", style="#9a9a9a")
|
|
|
|
# Remaining tokens
|
|
label = "Remaining:".ljust(label_width)
|
|
if remaining_tokens is not None:
|
|
text.append(f"{label} {remaining_tokens:,}\n", style="#9a9a9a")
|
|
else:
|
|
text.append(
|
|
f"{label} N/A (DAILY_TOKEN_LIMIT not set)\n",
|
|
style="#9a9a9a",
|
|
)
|
|
|
|
# Usage percent + bar
|
|
label = "Usage:".ljust(label_width)
|
|
if percent_used is not None:
|
|
bar = self._bar(percent_used / 100.0, width=bar_width)
|
|
text.append(
|
|
f"{label} [{bar}] {percent_used:.1f}%\n",
|
|
style="#9a9a9a",
|
|
)
|
|
else:
|
|
text.append(f"{label} N/A\n", style="#9a9a9a")
|
|
|
|
# Dates
|
|
label = "Last reset:".ljust(label_width)
|
|
text.append(f"{label} {last_reset}\n", style="#9a9a9a")
|
|
label = "Current date:".ljust(label_width)
|
|
text.append(f"{label} {current_date}\n", style="#9a9a9a")
|
|
|
|
# Reset occurrence
|
|
label = "Reset occurred:".ljust(label_width)
|
|
text.append(
|
|
f"{label} {'Yes' if reset_occurred else 'No'}\n",
|
|
style="#9a9a9a",
|
|
)
|
|
|
|
except Exception as e:
|
|
text.append(f"Token diagnostics error: {e}\n", style="#9a9a9a")
|
|
|
|
return text
|
|
|
|
|
|
# ----- Main TUI App -----
|
|
|
|
|
|
class PentestAgentTUI(App):
|
|
"""Main PentestAgent TUI Application"""
|
|
|
|
# ═══════════════════════════════════════════════════════════
|
|
# PA THEME - Ethereal grays
|
|
# ═══════════════════════════════════════════════════════════
|
|
# Void: #0a0a0a (terminal black - the darkness)
|
|
# Shadow: #121212 (subtle surface)
|
|
# Mist: #1a1a1a (panels, elevated)
|
|
# Whisper: #262626 (default borders)
|
|
# Fog: #3a3a3a (hover states)
|
|
# Apparition: #525252 (focus states)
|
|
# Phantom: #6b6b6b (secondary text)
|
|
# Spirit: #9a9a9a (normal text)
|
|
# Specter: #d4d4d4 (primary text)
|
|
# Ectoplasm: #f0f0f0 (highlights)
|
|
# ═══════════════════════════════════════════════════════════
|
|
|
|
# Application-wide Textual stylesheet. Colours follow the gray palette listed
# in the theme comment that precedes this block.
CSS = """
Screen {
    background: #0a0a0a;
}

#main-container {
    width: 100%;
    height: 100%;
    layout: horizontal;
}

/* Chat area - takes full width normally, fills remaining space with sidebar */
#chat-area {
    width: 1fr;
    height: 100%;
}

#chat-area.with-sidebar {
    width: 1fr;
}

#chat-scroll {
    width: 100%;
    height: 1fr;
    background: transparent;
    padding: 1 2;
    scrollbar-background: #1a1a1a;
    scrollbar-background-hover: #1a1a1a;
    scrollbar-background-active: #1a1a1a;
    scrollbar-color: #3a3a3a;
    scrollbar-color-hover: #3a3a3a;
    scrollbar-color-active: #3a3a3a;
    scrollbar-corner-color: #1a1a1a;
    scrollbar-size: 1 1;
}

#input-container {
    width: 100%;
    height: 3;
    background: transparent;
    border: round #262626;
    margin: 0 2;
    padding: 0;
    layout: horizontal;
    align-vertical: middle;
}

#input-container:focus-within {
    border: round #525252;
}

#input-container:focus-within #chat-prompt {
    color: #d4d4d4;
}

#chat-prompt {
    width: auto;
    height: 100%;
    padding: 0 0 0 1;
    color: #6b6b6b;
    content-align-vertical: middle;
}

#chat-input {
    width: 1fr;
    height: 100%;
    background: transparent;
    border: none;
    padding: 0;
    margin: 0;
    color: #d4d4d4;
}

#chat-input:focus {
    border: none;
}

#chat-input > .input--placeholder {
    color: #6b6b6b;
    text-style: italic;
}

#status-bar {
    width: 100%;
    height: 1;
    background: transparent;
    padding: 0 3;
    margin: 0;
}

.message {
    margin-bottom: 1;
}

/* Sidebar - hidden by default */
#sidebar {
    width: 28;
    height: 100%;
    display: none;
    padding-right: 1;
}

#sidebar.visible {
    display: block;
}

#workers-tree {
    height: 1fr;
    background: transparent;
    border: round #262626;
    padding: 0 1;
    margin-bottom: 0;
}

#workers-tree:focus {
    border: round #3a3a3a;
}

#crew-stats {
    height: auto;
    max-height: 10;
    background: transparent;
    border: round #262626;
    border-title-color: #9a9a9a;
    border-title-style: bold;
    padding: 0 1;
    margin-top: 0;
}

Tree {
    background: transparent;
    color: #d4d4d4;
    scrollbar-background: #1a1a1a;
    scrollbar-background-hover: #1a1a1a;
    scrollbar-background-active: #1a1a1a;
    scrollbar-color: #3a3a3a;
    scrollbar-color-hover: #3a3a3a;
    scrollbar-color-active: #3a3a3a;
    scrollbar-size: 1 1;
}

Tree > .tree--cursor {
    background: transparent;
}

Tree > .tree--highlight {
    background: transparent;
}

Tree > .tree--highlight-line {
    background: transparent;
}

.tree--node-label {
    padding: 0 1;
}

.tree--node:hover .tree--node-label {
    background: transparent;
}

.tree--node.-selected .tree--node-label {
    background: transparent;
    color: #d4d4d4;
}
"""

# Keyboard shortcuts. Quit and the two stop keys are priority bindings;
# ctrl+c, tab and the history arrows are declared with show=False.
BINDINGS = [
    Binding("ctrl+q", "quit_app", "Quit", priority=True),
    Binding("ctrl+c", "stop_agent", "Stop", priority=True, show=False),
    Binding("escape", "stop_agent", "Stop", priority=True),
    Binding("f1", "show_help", "Help"),
    Binding("tab", "focus_next", "Next", show=False),
    Binding("up", "history_up", "Prev", show=False),
    Binding("down", "history_down", "Next", show=False),
]

# Application title and subtitle strings.
TITLE = "PentestAgent"
SUB_TITLE = "AI Penetration Testing"
|
|
|
|
def __init__(
    self,
    target: Optional[str] = None,
    model: Optional[str] = None,
    use_docker: bool = False,
    **kwargs,
):
    """Create the TUI application and reset all per-session state.

    Args:
        target: Initial engagement target, if any.
        model: Model identifier; ``DEFAULT_MODEL`` is used when this is falsy.
        use_docker: Stored on the instance; consulted when the runtime is
            selected during agent initialization.
        **kwargs: Forwarded unchanged to the base-class constructor.
    """
    super().__init__(**kwargs)

    # Launch options
    self.target = target
    self.model = model if model else DEFAULT_MODEL
    self.use_docker = use_docker

    # Agent components (populated by the initialization worker)
    self.agent: Optional["PentestAgentAgent"] = None
    self.runtime = None
    self.mcp_manager = None
    self.all_tools = []
    self.rag_engine = None  # RAG engine

    # Run state
    self._mode = "assist"  # "assist", "agent", or "crew"
    self._is_running = False
    self._is_initializing = True  # Block input during init
    self._should_stop = False
    # Worker handle returned by `@work` or an `asyncio.Task` (keep generic)
    self._current_worker: Optional[Any] = None  # Track running worker for cancellation
    self._current_crew = None  # Track crew orchestrator for cancellation

    # Crew mode bookkeeping
    self._crew_workers: Dict[str, Dict[str, Any]] = {}
    self._crew_worker_nodes: Dict[str, TreeNode] = {}
    self._crew_orchestrator_node: Optional[TreeNode] = None
    self._crew_findings_count = 0
    self._viewing_worker_id: Optional[str] = None
    self._worker_events: Dict[str, List[Dict]] = {}
    self._crew_start_time: Optional[float] = None
    self._crew_tokens_used: int = 0
    self._crew_stats_timer: Optional[Timer] = None

    # Spinner animation: ten single-character Braille frames
    self._spinner_timer: Optional[Timer] = None
    self._spinner_frame: int = 0
    self._spinner_frames = list("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏")  # Braille dots spinner

    # Command history; the index starts at 0 for the empty history
    self._cmd_history: List[str] = []
    self._history_index: int = 0
|
|
|
|
def compose(self) -> ComposeResult:
    """Yield the widget tree: a chat column on the left and a sidebar on the right.

    The chat column holds the header, the scrollable message area, the
    status bar and the input row. The sidebar holds the crew tree and the
    crew statistics panel.
    """
    with Horizontal(id="main-container"):
        # Chat area (left side)
        with Vertical(id="chat-area"):
            # Persistent header shown above the chat scroll so the operator
            # always sees runtime/mode/target information.
            header = Static("", id="header")
            yield header
            chat_scroll = ScrollableContainer(id="chat-scroll")
            yield chat_scroll
            status_bar = StatusBar(id="status-bar")
            yield status_bar
            with Horizontal(id="input-container"):
                prompt = Static("> ", id="chat-prompt")
                yield prompt
                entry = Input(placeholder="Enter task or type /help", id="chat-input")
                yield entry

        # Sidebar (right side, hidden by default)
        with Vertical(id="sidebar"):
            crew_tree = CrewTree("CREW", id="workers-tree")
            yield crew_tree
            crew_stats = Static("", id="crew-stats")
            yield crew_stats
|
|
|
|
async def on_mount(self) -> None:
    """Register the notifier callback, then start agent initialization.

    A failure to register the callback is logged and reported on a
    best-effort basis; it never prevents initialization from starting.
    """
    log = logging.getLogger(__name__)

    # Register notifier callback so other modules can emit operator-visible messages
    try:
        from .notifier import register_callback

        register_callback(self._notifier_callback)
    except Exception as e:
        log.exception("Failed to register TUI notifier callback: %s", e)
        try:
            from .notifier import notify

            notify("warning", f"TUI: failed to register notifier callback: {e}")
        except Exception as ne:
            log.exception(
                "Failed to notify operator about notifier registration failure: %s", ne
            )

    # Call the textual worker - the decorator returns a Worker, not a coroutine
    _ = cast(Any, self._initialize_agent())
|
|
|
|
@work(thread=False)
async def _initialize_agent(self) -> None:
    """Build the RAG engine, runtime, LLM, agent and MCP manager.

    Steps, in order:
      1. Index a knowledge directory (local ``knowledge`` if it contains
         files, otherwise the bundled sources) into a RAG engine.
      2. Start the Docker or local runtime.
      3. Create the LLM and the agent with all registered tools.
      4. Create the MCP manager and connect its servers in the background.
      5. Publish the "ready" banner in the persistent header.

    Any failure is reported in the chat and the status is set to "error".
    Input is always unblocked before returning.
    """
    log = logging.getLogger(__name__)
    self._set_status("initializing")

    try:
        import os

        from ..agents.pa_agent import PentestAgentAgent
        from ..knowledge import RAGEngine
        from ..llm import LLM, ModelConfig
        from ..mcp import MCPManager
        from ..runtime.docker_runtime import DockerRuntime
        from ..runtime.runtime import LocalRuntime
        from ..tools import get_all_tools, register_tool_instance

        # RAG Engine - auto-load knowledge sources
        rag_doc_count = 0
        knowledge_path = None

        # Check local knowledge dir first (must have files, not just exist)
        local_knowledge = Path("knowledge")
        bundled_path = Path(__file__).parent.parent / "knowledge" / "sources"

        if local_knowledge.exists() and any(local_knowledge.rglob("*.*")):
            knowledge_path = local_knowledge
        elif bundled_path.exists():
            knowledge_path = bundled_path

        if knowledge_path:
            try:
                # Determine embedding method: env var > auto-detect
                embeddings_setting = os.getenv("PENTESTAGENT_EMBEDDINGS", "").lower()
                if embeddings_setting == "local":
                    use_local = True
                elif embeddings_setting == "openai":
                    use_local = False
                else:
                    # Auto: use OpenAI if key available, else local
                    use_local = not os.getenv("OPENAI_API_KEY")

                self.rag_engine = RAGEngine(
                    knowledge_path=knowledge_path, use_local_embeddings=use_local
                )
                # Indexing is run in a worker thread via asyncio.to_thread
                await asyncio.to_thread(self.rag_engine.index)
                rag_doc_count = self.rag_engine.get_document_count()
            except Exception as e:
                self._add_system(f"[!] RAG: {e}")
                self.rag_engine = None

        # Runtime - Docker or Local
        if self.use_docker:
            self._add_system("+ Starting Docker container...")
            self.runtime = DockerRuntime()
        else:
            self.runtime = LocalRuntime()
        await self.runtime.start()

        # LLM
        # Validate model/runtime presence and provide a user-friendly error
        if not self.model:
            self._add_system(
                "[!] No model configured. Set PENTESTAGENT_MODEL environment variable or create a .env file (see .env.example)."
            )
            self._set_status("error")
            self._is_initializing = False
            return

        if not self.runtime:
            self._add_system("[!] Runtime failed to initialize.")
            self._set_status("error")
            self._is_initializing = False
            return

        llm = LLM(
            model=self.model,
            config=ModelConfig(temperature=0.7),
            rag_engine=self.rag_engine,
        )

        # Tools
        self.all_tools = get_all_tools()  # Ensure tools are loaded

        # Agent
        self.agent = PentestAgentAgent(
            llm=llm,
            tools=self.all_tools,
            runtime=self.runtime,
            target=self.target,
            rag_engine=self.rag_engine,
        )

        # MCP - connect in the background so startup is not blocked
        try:
            self.mcp_manager = MCPManager()

            async def load_mcp() -> None:
                # Failures are logged here because nothing awaits this task;
                # otherwise they would only surface as "exception was never
                # retrieved" warnings.
                try:
                    tools = await self.mcp_manager.connect_all()
                    self.agent.add_tools(tools)
                    for tool in tools:
                        register_tool_instance(tool)
                    self.all_tools = get_all_tools()
                    self._update_header()
                except Exception as mcp_err:
                    log.exception("Background MCP tool loading failed: %s", mcp_err)

            try:
                loop = asyncio.get_running_loop()
                # Keep a strong reference: the event loop only holds weak
                # references to tasks, so an unreferenced task may be
                # garbage-collected before it finishes.
                self._mcp_load_task = loop.create_task(load_mcp())
            except RuntimeError:
                # No running loop (unlikely in Textual worker), run to completion here
                try:
                    asyncio.run(load_mcp())
                except Exception as run_err:
                    log.exception("Synchronous MCP tool loading failed: %s", run_err)
            mcp_server_count = len(self.mcp_manager.list_configured_servers())
        except Exception as mcp_init_err:
            # MCP is optional: record the failure and continue without it
            log.exception("MCP manager initialization failed: %s", mcp_init_err)
            self.mcp_manager = None
            mcp_server_count = 0

        self._set_status("idle", "assist")
        self._is_initializing = False  # Allow input now

        runtime_str = "Docker" if self.use_docker else "Local"
        # Update persistent header instead of adding an ad-hoc system
        # message to the chat. This keeps the info visible at top.
        self.mcp_server_count = mcp_server_count
        self.rag_doc_count = rag_doc_count
        ready_banner = (
            f"+ PentestAgent ready\n"
            f" Model: {self.model} | Tools: {len(self.all_tools)} | MCP: {mcp_server_count} | RAG: {rag_doc_count}\n"
            f" Runtime: {runtime_str} | Mode: Assist (use /agent or /crew for autonomous modes)"
        )
        try:
            self._update_header(model_line=ready_banner)
        except Exception:
            # Fallback to previous behavior if header widget isn't available
            self._add_system(ready_banner)

        # Also update header with target if present
        if self.target:
            try:
                self._update_header(target=self.target)
            except Exception:
                self._add_system(f" Target: {self.target}")

    except Exception as e:
        import traceback

        self._add_system(f"[!] Init failed: {e}\n{traceback.format_exc()}")
        self._set_status("error")
        self._is_initializing = False  # Allow input even on error
|
|
|
|
def _set_status(self, status: str, mode: Optional[str] = None) -> None:
    """Write ``status`` (and ``mode``, when truthy) to the status bar widget.

    When ``mode`` is given it is also recorded on ``self._mode``. Any
    failure is logged and reported through the notifier; nothing is raised.
    """
    log = logging.getLogger(__name__)
    try:
        status_bar = self.query_one("#status-bar", StatusBar)
        status_bar.status = status
        if not mode:
            return
        status_bar.mode = mode
        self._mode = mode
    except Exception as e:
        log.exception("Failed to update status bar: %s", e)
        try:
            from .notifier import notify

            notify("warning", f"TUI: failed to update status bar: {e}")
        except Exception as ne:
            log.exception(
                "Failed to notify operator about status bar update failure: %s", ne
            )
|
|
|
|
def _show_notification(self, level: str, message: str) -> None:
|
|
"""Display a short operator-visible notification in the chat area."""
|
|
try:
|
|
# Prepend a concise system message so it is visible in the chat
|
|
prefix = "[!]" if level.lower() in ("error", "critical") else "[!]"
|
|
self._add_system(f"{prefix} {message}")
|
|
# Set status bar to error briefly for emphasis
|
|
if level.lower() in ("error", "critical"):
|
|
self._set_status("error")
|
|
except Exception as e:
|
|
logging.getLogger(__name__).exception("Failed to show notification in TUI: %s", e)
|
|
|
|
def _notifier_callback(self, level: str, message: str) -> None:
|
|
"""Callback wired to `pentestagent.interface.notifier`.
|
|
|
|
This will be registered on mount so other modules can emit notifications.
|
|
"""
|
|
try:
|
|
# textual apps typically run in the main thread; try to schedule update
|
|
# using call_from_thread if available, otherwise call directly.
|
|
if hasattr(self, "call_from_thread"):
|
|
try:
|
|
self.call_from_thread(self._show_notification, level, message)
|
|
return
|
|
except Exception as e:
|
|
logging.getLogger(__name__).exception("call_from_thread failed in notifier callback: %s", e)
|
|
# Fall through to direct call
|
|
self._show_notification(level, message)
|
|
except Exception as e:
|
|
logging.getLogger(__name__).exception("Exception in notifier callback handling: %s", e)
|
|
|
|
def _add_message(self, widget: Static) -> None:
    """Mount ``widget`` in the chat scroll area and scroll to the end.

    The widget is tagged with the ``message`` CSS class before mounting.
    Failures are logged and reported through the notifier; nothing is raised.
    """
    log = logging.getLogger(__name__)
    try:
        chat = self.query_one("#chat-scroll", ScrollableContainer)
        widget.add_class("message")
        chat.mount(widget)
        chat.scroll_end(animate=False)
    except Exception as e:
        log.exception("Failed to add message to chat: %s", e)
        try:
            from .notifier import notify

            notify("warning", f"TUI: failed to add chat message: {e}")
        except Exception as ne:
            log.exception(
                "Failed to notify operator about add_message failure: %s", ne
            )
|
|
|
|
def _add_system(self, content: str) -> None:
    """Append ``content`` to the chat as a ``SystemMessage`` widget."""
    system_widget = SystemMessage(content)
    self._add_message(system_widget)
|
|
|
|
def _add_user(self, content: str) -> None:
    """Append ``content`` to the chat as a ``UserMessage`` widget."""
    user_widget = UserMessage(content)
    self._add_message(user_widget)
|
|
|
|
def _add_assistant(self, content: str) -> None:
    """Append ``content`` to the chat as an ``AssistantMessage`` widget."""
    assistant_widget = AssistantMessage(content)
    self._add_message(assistant_widget)
|
|
|
|
def _add_thinking(self, content: str) -> None:
    """Append ``content`` to the chat as a ``ThinkingMessage`` widget."""
    thinking_widget = ThinkingMessage(content)
    self._add_message(thinking_widget)
|
|
|
|
def _add_tool(self, name: str, action: str = "") -> None:
    """Append a ``ToolMessage`` built from ``name`` and ``action`` to the chat."""
    tool_widget = ToolMessage(name, action)
    self._add_message(tool_widget)
|
|
|
|
def _add_tool_result(self, name: str, result: str) -> None:
|
|
"""Display tool execution result"""
|
|
# Hide tool output - LLM will synthesize it in its response
|
|
# This prevents duplication and keeps the chat clean
|
|
pass
|
|
|
|
def _show_system_prompt(self) -> None:
|
|
"""Display the current system prompt"""
|
|
if self.agent:
|
|
prompt = self.agent.get_system_prompt()
|
|
self._add_system(f"=== System Prompt ===\n{prompt}")
|
|
else:
|
|
self._add_system("Agent not initialized")
|
|
|
|
def _show_memory_stats(self) -> None:
    """Mount a live memory diagnostics widget into the chat area.

    A new ``MemoryDiagnostics`` panel with a unique id is mounted in the
    chat scroll container and scrolled into view. If the container cannot
    be found, the failure is logged and reported, a system line is added,
    and nothing is mounted.
    """
    log = logging.getLogger(__name__)

    def warn_operator(text: str, failure_log: str) -> None:
        # Best-effort operator notification; a failure here is only logged.
        try:
            from .notifier import notify

            notify("warning", text)
        except Exception as ne:
            log.exception(failure_log, ne)

    try:
        chat = self.query_one("#chat-scroll", ScrollableContainer)
    except Exception as e:
        log.exception("Failed to query chat-scroll for memory diagnostics: %s", e)
        warn_operator(
            f"TUI: memory diagnostics unavailable: {e}",
            "Failed to notify operator about memory diagnostics availability: %s",
        )
        self._add_system("Agent not initialized")
        return

    # Mount a new diagnostics panel with a unique ID and scroll into view
    try:
        import uuid

        panel_id = f"memory-diagnostics-{uuid.uuid4().hex}"
    except Exception as e:
        log.exception("Failed to generate memory diagnostics panel id: %s", e)
        panel_id = None

    panel = MemoryDiagnostics(id=panel_id)
    chat.mount(panel)
    try:
        chat.scroll_end(animate=False)
    except Exception as e:
        log.exception("Failed to scroll to memory diagnostics panel: %s", e)
        warn_operator(
            f"TUI: failed to scroll to memory diagnostics panel: {e}",
            "Failed to notify operator about scroll failure: %s",
        )
|
|
|
|
def _show_token_stats(self) -> None:
    """Mount a live token diagnostics widget into the chat area.

    A new ``TokenDiagnostics`` panel with a unique id is mounted in the
    chat scroll container and scrolled into view. If the container cannot
    be found, the failure is logged and reported, a system line is added,
    and nothing is mounted.
    """
    log = logging.getLogger(__name__)

    def warn_operator(text: str, failure_log: str) -> None:
        # Best-effort operator notification; a failure here is only logged.
        try:
            from ..interface.notifier import notify

            notify("warning", text)
        except Exception as ne:
            log.exception(failure_log, ne)

    try:
        chat = self.query_one("#chat-scroll", ScrollableContainer)
    except Exception as e:
        log.exception("Failed to query chat-scroll for token diagnostics: %s", e)
        warn_operator(
            f"TUI: token diagnostics unavailable: {e}",
            "Failed to notify operator about token diagnostics availability: %s",
        )
        self._add_system("Agent not initialized")
        return

    # Mount a new diagnostics panel with a unique ID and scroll into view
    try:
        import uuid

        panel_id = f"token-diagnostics-{uuid.uuid4().hex}"
    except Exception as e:
        log.exception("Failed to generate token diagnostics panel id: %s", e)
        warn_operator(
            f"TUI: failed to generate token diagnostics panel id: {e}",
            "Failed to notify operator about token diagnostics panel id generation failure: %s",
        )
        panel_id = None

    panel = TokenDiagnostics(id=panel_id)
    chat.mount(panel)
    try:
        chat.scroll_end(animate=False)
    except Exception as e:
        log.exception("Failed to scroll to token diagnostics panel: %s", e)
        warn_operator(
            f"TUI: failed to scroll to token diagnostics panel: {e}",
            "Failed to notify operator about token diagnostics scroll failure: %s",
        )
|
|
|
|
async def _show_notes(self) -> None:
    """Show every saved note as a single system message.

    Each note is listed by key. A value containing a newline is printed on
    its own lines, with continuation lines indented. When no notes exist, a
    short explanatory message is shown instead.
    """
    from ..tools.notes import get_all_notes

    saved = await get_all_notes()
    if not saved:
        self._add_system(
            "=== Notes ===\nNo notes saved.\n\nThe AI can save key findings using the notes tool."
        )
        return

    def render(key, value) -> str:
        # Single-line values stay inline; multi-line values go under the key
        if "\n" not in value:
            return f"[{key}] {value}"
        indented = str(value).replace("\n", "\n ")
        return f"\n[{key}]\n {indented}"

    report = [f"=== Notes ({len(saved)} entries) ==="]
    report.extend(render(key, value) for key, value in saved.items())
    report.append("\nFile: loot/notes.json")
    report.append("Reports: loot/reports/")

    self._add_system("\n".join(report))
|
|
|
|
def _build_prior_context(self) -> str:
|
|
"""Build a summary of prior findings for crew mode.
|
|
|
|
Extracts:
|
|
- Tool results (nmap scans, etc.) - the actual findings
|
|
- Assistant analyses - interpretations and summaries
|
|
- Last user task - what they were working on
|
|
|
|
Excludes:
|
|
- Raw user messages (noise)
|
|
- Tool call declarations (just names/args, not results)
|
|
- Very short responses
|
|
"""
|
|
if not self.agent or not self.agent.conversation_history:
|
|
return ""
|
|
|
|
findings = []
|
|
last_user_task = ""
|
|
|
|
for msg in self.agent.conversation_history:
|
|
# Track user tasks/questions
|
|
if msg.role == "user" and msg.content:
|
|
last_user_task = msg.content[:200]
|
|
|
|
# Extract tool results (the actual findings)
|
|
elif msg.tool_results:
|
|
for result in msg.tool_results:
|
|
if result.success and result.result:
|
|
content = (
|
|
result.result[:1500]
|
|
if len(result.result) > 1500
|
|
else result.result
|
|
)
|
|
findings.append(f"[{result.tool_name}]\n{content}")
|
|
|
|
# Include assistant analyses (but not tool call messages)
|
|
elif msg.role == "assistant" and msg.content and not msg.tool_calls:
|
|
if len(msg.content) > 50:
|
|
findings.append(f"[Analysis]\n{msg.content[:1000]}")
|
|
|
|
if not findings and not last_user_task:
|
|
return ""
|
|
|
|
# Build context with last user task + recent findings
|
|
parts = []
|
|
if last_user_task:
|
|
parts.append(f"Last task: {last_user_task}")
|
|
if findings:
|
|
parts.append("Findings:\n" + "\n\n".join(findings[-5:]))
|
|
|
|
context = "\n\n".join(parts)
|
|
if len(context) > 4000:
|
|
context = context[:4000] + "\n... (truncated)"
|
|
|
|
return context
|
|
|
|
def _set_target(self, cmd: str) -> None:
|
|
"""Set the target for the engagement"""
|
|
# Remove /target prefix
|
|
target = cmd[7:].strip()
|
|
|
|
if not target:
|
|
if self.target:
|
|
self._add_system(
|
|
f"Current target: {self.target}\nUsage: /target <host>"
|
|
)
|
|
else:
|
|
self._add_system(
|
|
"No target set.\nUsage: /target <host>\nExample: /target 192.168.1.1"
|
|
)
|
|
return
|
|
|
|
self.target = target
|
|
|
|
# Update agent's target if agent exists
|
|
if self.agent:
|
|
self.agent.target = target
|
|
try:
|
|
from pentestagent.agents.base_agent import AgentMessage
|
|
|
|
# Inform the agent (LLM) that the operator changed the target so
|
|
# subsequent generations use the new value instead of older
|
|
# workspace-bound targets from conversation history.
|
|
self.agent.conversation_history.append(
|
|
AgentMessage(role="system", content=f"Operator set target to {target}")
|
|
)
|
|
# Track manual target override so workspace restores can remove
|
|
# or supersede this message when appropriate.
|
|
try:
|
|
setattr(self.agent, "_manual_target", target)
|
|
except Exception:
|
|
pass
|
|
except Exception as e:
|
|
logging.getLogger(__name__).exception(
|
|
"Failed to append target change to agent history: %s", e
|
|
)
|
|
|
|
# Persist to active workspace if present
|
|
try:
|
|
from pentestagent.workspaces.manager import WorkspaceManager
|
|
|
|
wm = WorkspaceManager()
|
|
active = wm.get_active()
|
|
if active:
|
|
try:
|
|
wm.set_last_target(active, target)
|
|
except Exception as e:
|
|
logging.getLogger(__name__).exception("Failed to persist last target for workspace %s: %s", active, e)
|
|
try:
|
|
from pentestagent.interface.notifier import notify
|
|
|
|
notify("warning", f"Failed to persist last target for workspace {active}: {e}")
|
|
except Exception as ne:
|
|
logging.getLogger(__name__).exception("Failed to notify operator about target persist error: %s", ne)
|
|
except Exception as e:
|
|
logging.getLogger(__name__).exception("Failed to access WorkspaceManager to persist last target: %s", e)
|
|
try:
|
|
from pentestagent.interface.notifier import notify
|
|
|
|
notify("warning", f"TUI: failed to persist last target: {e}")
|
|
except Exception as ne:
|
|
logging.getLogger(__name__).exception("Failed to notify operator about WorkspaceManager access failure: %s", ne)
|
|
|
|
# Update displayed Target in the UI
|
|
try:
|
|
self._apply_target_display(target)
|
|
except Exception as e:
|
|
logging.getLogger(__name__).exception("Failed to apply target display: %s", e)
|
|
try:
|
|
from pentestagent.interface.notifier import notify
|
|
|
|
notify("warning", f"Failed to update target display: {e}")
|
|
except Exception as ne:
|
|
logging.getLogger(__name__).exception("Failed to notify operator about target display error: %s", ne)
|
|
# Update the initial ready SystemMessage (if present) so Target appears under Runtime
|
|
try:
|
|
scroll = self.query_one("#chat-scroll", ScrollableContainer)
|
|
updated = False
|
|
for child in scroll.children:
|
|
if isinstance(child, SystemMessage) and "PentestAgent ready" in getattr(
|
|
child, "message_content", ""
|
|
):
|
|
# Replace existing Target line if present, otherwise append
|
|
try:
|
|
if "Target:" in child.message_content:
|
|
# replace the first Target line
|
|
child.message_content = re.sub(
|
|
r"(?m)^\s*Target:.*$",
|
|
f" Target: {target}",
|
|
child.message_content,
|
|
count=1,
|
|
)
|
|
else:
|
|
child.message_content = (
|
|
child.message_content + f"\n Target: {target}"
|
|
)
|
|
try:
|
|
child.refresh()
|
|
except Exception as e:
|
|
logging.getLogger(__name__).exception("Failed to refresh child message after target update: %s", e)
|
|
try:
|
|
from pentestagent.interface.notifier import notify
|
|
|
|
notify("warning", f"Failed to refresh UI after target update: {e}")
|
|
except Exception as ne:
|
|
logging.getLogger(__name__).exception("Failed to notify operator about UI refresh error: %s", ne)
|
|
except Exception as e:
|
|
# Fallback to append if regex replacement fails, and surface warning
|
|
logging.getLogger(__name__).exception("Failed to update SystemMessage target line: %s", e)
|
|
try:
|
|
from pentestagent.interface.notifier import notify
|
|
|
|
notify("warning", f"Failed to update target display: {e}")
|
|
except Exception as ne:
|
|
logging.getLogger(__name__).exception("Failed to notify operator about target update error: %s", ne)
|
|
child.message_content = (
|
|
child.message_content + f"\n Target: {target}"
|
|
)
|
|
updated = True
|
|
break
|
|
if not updated:
|
|
# If we couldn't find an existing banner SystemMessage to
|
|
# update, update the persistent header instead of inserting
|
|
# additional in-chat system messages to avoid duplicates.
|
|
try:
|
|
self._update_header(target=target)
|
|
except Exception:
|
|
logging.getLogger(__name__).exception("Failed to update persistent header with target")
|
|
except Exception as e:
|
|
logging.getLogger(__name__).exception("Failed while applying target display: %s", e)
|
|
try:
|
|
from pentestagent.interface.notifier import notify
|
|
|
|
notify("warning", f"TUI: failed while updating target display: {e}")
|
|
except Exception as ne:
|
|
logging.getLogger(__name__).exception("Failed to notify operator about target display outer failure: %s", ne)
|
|
# Last resort: append a subtle system line
|
|
self._add_system(f" Target: {target}")
|
|
|
|
@work(exclusive=True)
async def _run_report_generation(self) -> None:
    """Generate a Markdown pentest report from saved notes and the conversation.

    The LLM is prompted with the notes, a log of the last 30 tool
    actions/results, and the target. If a Mermaid attack graph can be built
    from the notes it is appended. The report is written to
    ``loot/reports/report_<timestamp>.md``. Progress and errors are shown
    as system messages.
    """
    from pathlib import Path

    from ..tools.notes import get_all_notes

    if not self.agent or not self.agent.llm:
        self._add_system("[!] Agent not initialized")
        return

    notes = await get_all_notes()
    if not notes:
        self._add_system(
            "No notes found. PentestAgent saves findings using the notes tool during testing."
        )
        return

    self._add_system("Generating report...")

    # Format notes
    notes_text = "\n".join(f"### {key}\n{body}\n" for key, body in notes.items())

    # Build conversation summary from full history
    actions = []
    for entry in self.agent.conversation_history or []:
        if entry.role == "assistant" and entry.tool_calls:
            actions.extend(f"- Tool: {call.name}" for call in entry.tool_calls)
        elif entry.role == "tool_result" and entry.tool_results:
            for outcome in entry.tool_results:
                # Include truncated result
                raw = outcome.result or ""
                snippet = raw if len(raw) <= 200 else raw[:200] + "..."
                actions.append(f" Result: {snippet}")
    # Last 30 actions; an empty list joins to ""
    conversation_summary = "\n".join(actions[-30:])

    activity_log = conversation_summary or "N/A"
    target_label = self.target or "Not specified"

    report_prompt = f"""Generate a penetration test report in Markdown from the notes below.

# Notes
{notes_text}

# Activity Log
{activity_log}

# Target
{target_label}

Output a report with:
1. Executive Summary (2-3 sentences)
2. Findings (use notes, include severity: Critical/High/Medium/Low/Info)
3. Recommendations

Be concise. Use the actual data from notes."""

    try:
        report_content = await self.agent.llm.simple_completion(
            prompt=report_prompt,
            system="You are a penetration tester writing a security report. Be concise and factual.",
        )

        if not report_content or not report_content.strip():
            self._add_system(
                "[!] Report generation returned empty. Check LLM connection."
            )
            return

        # Save to loot/reports/
        reports_dir = Path("loot/reports")
        reports_dir.mkdir(parents=True, exist_ok=True)

        # Append Shadow Graph if available
        try:
            from ..knowledge.graph import ShadowGraph
            from ..tools.notes import get_all_notes_sync

            # Rehydrate graph from notes
            graph = ShadowGraph()
            notes = get_all_notes_sync()
            if notes:
                graph.update_from_notes(notes)
                mermaid_code = graph.to_mermaid()

                if mermaid_code:
                    graph_section = (
                        "\n\n## Attack Graph (Visual)\n\n```mermaid\n"
                        + mermaid_code
                        + "\n```\n"
                    )
                    report_content += graph_section
        except Exception as e:
            self._add_system(f"[!] Graph generation error: {e}")

        stamp = datetime.now().strftime("%Y-%m-%d_%H%M%S")
        report_path = reports_dir / f"report_{stamp}.md"
        report_path.write_text(report_content, encoding="utf-8")

        self._add_system(f"+ Report saved: {report_path}")

    except Exception as e:
        self._add_system(f"[!] Report error: {e}")
|
|
|
|
@on(Input.Submitted, "#chat-input")
async def handle_submit(self, event: Input.Submitted) -> None:
    """Route a submitted chat line to a slash command or to assist mode.

    Submissions are dropped while the app is initializing or a run is in
    progress. The cleaned text is recorded in the command history (skipping
    an immediate repeat), the input box is emptied, and the text is then
    either handed to ``_handle_command`` (leading ``/``) or echoed as a user
    message and passed to ``_run_assist``.
    """
    busy = self._is_initializing or self._is_running
    if busy:
        return

    # Remove ANSI escape sequences, then surrounding whitespace.
    text = _ANSI_ESCAPE.sub("", event.value).strip()
    if text == "":
        return

    # History: append unless this repeats the most recent entry.
    is_repeat = bool(self._cmd_history) and self._cmd_history[-1] == text
    if not is_repeat:
        self._cmd_history.append(text)
    # The index always points one past the newest entry (the blank slot).
    self._history_index = len(self._cmd_history)

    event.input.value = ""

    if text.startswith("/"):
        # Slash commands never reach the assistant.
        await self._handle_command(text)
        return

    self._add_user(text)

    # Assist mode does not use the crew sidebar.
    self._hide_sidebar()

    # Assist mode is the default. The call returns the handle that is kept
    # in _current_worker (the original comment notes @work returns a Worker,
    # so it must not be wrapped in asyncio.create_task).
    can_start = self.agent and not self._is_running
    if can_start:
        self._current_worker = self._run_assist(text)
|
|
|
|
async def _handle_command(self, cmd: str) -> None:
    """Dispatch a slash command typed into the chat input.

    Fixed commands (/help, /clear, /tools, /quit, /prompt, /memory, /token,
    /notes, /report) and the /mcp prefix are matched against the lower-cased
    command; /target, /workspace, /agent and /crew are matched against the
    original text and are therefore case-sensitive. Anything else produces
    an "Unknown command" system message.

    Args:
        cmd: The raw command line, including the leading slash.

    NOTE(review): the nesting below was reconstructed from a listing whose
    indentation had been stripped; confirm the deeper try/except levels in
    the /workspace branch against the upstream file.
    """
    cmd_lower = cmd.lower().strip()
    cmd_original = cmd.strip()

    if cmd_lower in ["/help", "/h", "/?"]:
        await self.push_screen(HelpScreen())
    elif cmd_lower == "/clear":
        scroll = self.query_one("#chat-scroll", ScrollableContainer)
        await scroll.remove_children()
        self._hide_sidebar()
        # Clear agent conversation history for fresh start
        if self.agent:
            self.agent.conversation_history.clear()
        self._add_system("Chat cleared")
    elif cmd_lower == "/tools":
        # Open the interactive tools browser (split-pane).
        try:
            await self.push_screen(ToolsScreen(tools=self.all_tools))
        except Exception:
            # Fallback: list tools in the system area if UI push fails
            from ..runtime.runtime import detect_environment

            names = [t.name for t in self.all_tools]
            msg = f"Tools ({len(names)}): " + ", ".join(names)

            # Add detected CLI tools
            env = detect_environment()
            if env.available_tools:
                # Group by category
                by_category = {}
                for tool_info in env.available_tools:
                    if tool_info.category not in by_category:
                        by_category[tool_info.category] = []
                    by_category[tool_info.category].append(tool_info.name)

                # One "category: a, b, c" line per category, alphabetically.
                cli_sections = []
                for category in sorted(by_category.keys()):
                    tools_list = ", ".join(sorted(by_category[category]))
                    cli_sections.append(f"{category}: {tools_list}")

                msg += f"\n\nCLI Tools ({len(env.available_tools)}):\n" + "\n".join(
                    cli_sections
                )

            self._add_system(msg)

    elif cmd_lower.startswith("/mcp"):
        # Prefix match: the original (un-lowered) text is passed on so that
        # server names and commands keep their case.
        await self._parse_mcp_command(cmd_original)
    elif cmd_lower in ["/quit", "/exit", "/q"]:
        self.exit()
    elif cmd_lower == "/prompt":
        self._show_system_prompt()
    elif cmd_lower == "/memory":
        self._show_memory_stats()
    elif cmd_lower == "/token":
        self._show_token_stats()
    elif cmd_lower == "/notes":
        await self._show_notes()
    elif cmd_lower == "/report":
        # Call the textual worker - decorator returns a Worker
        _ = cast(Any, self._run_report_generation())
    elif cmd_original.startswith("/target"):
        # Case-sensitive prefix match (uses the original text).
        self._set_target(cmd_original)
    elif cmd_original.startswith("/workspace"):
        # Support lightweight workspace management from the TUI
        try:

            from pentestagent.workspaces.manager import (
                WorkspaceError,
                WorkspaceManager,
            )
            from pentestagent.workspaces.utils import resolve_knowledge_paths

            wm = WorkspaceManager()
            rest = cmd_original[len("/workspace") :].strip()

            # Bare "/workspace": report the active workspace and re-apply
            # its stored "last_target", if any.
            if not rest:
                active = wm.get_active()
                if not active:
                    self._add_system("No active workspace.")
                else:
                    # restore last target if present
                    last = wm.get_meta_field(active, "last_target")
                    if last:
                        self.target = last
                        if self.agent:
                            self.agent.target = last
                            # If the operator set a manual target while no
                            # workspace was active, remove/supersede that
                            # system message so the LLM uses the workspace
                            # target instead of the stale manual one.
                            try:
                                manual = getattr(self.agent, "_manual_target", None)
                                if manual:
                                    self.agent.conversation_history = [
                                        m
                                        for m in self.agent.conversation_history
                                        if not (
                                            m.role == "system"
                                            and isinstance(m.content, str)
                                            and m.content.strip().startswith("Operator set target to")
                                        )
                                    ]
                                    try:
                                        delattr(self.agent, "_manual_target")
                                    except Exception:
                                        pass
                            except Exception:
                                pass
                            # Record the restored target in the conversation
                            # as a system message; failures are ignored.
                            try:
                                from pentestagent.agents.base_agent import AgentMessage

                                self.agent.conversation_history.append(
                                    AgentMessage(
                                        role="system",
                                        content=f"Workspace active; restored last target: {last}",
                                    )
                                )
                            except Exception:
                                pass
                        try:
                            self._apply_target_display(last)
                        except Exception as e:
                            logging.getLogger(__name__).exception("Failed to apply target display when restoring last target: %s", e)
                            try:
                                from pentestagent.interface.notifier import notify

                                notify("warning", f"TUI: failed restoring last target display: {e}")
                            except Exception:
                                logging.getLogger(__name__).exception("Failed to notify operator about restore-last-target failure")
                    self._add_system(f"Active workspace: {active}")
                return

            # First whitespace-separated token selects the sub-command;
            # anything that is not a known verb is treated as a workspace
            # name further down.
            parts = rest.split()
            verb = parts[0].lower()

            if verb == "help":
                try:
                    await self.push_screen(WorkspaceHelpScreen())
                except Exception:
                    # Fallback: show inline help text
                    self._add_system(
                        "Usage: /workspace <action>\nCommands: list, info, note, clear, help, <name>"
                    )
                return

            if verb == "list":
                wss = wm.list_workspaces()
                if not wss:
                    self._add_system("No workspaces found.")
                    return
                out = []
                active = wm.get_active()
                # The active workspace is marked with a leading "* ".
                for name in sorted(wss):
                    prefix = "* " if name == active else " "
                    out.append(f"{prefix}{name}")
                self._add_system("\n".join(out))
                return

            if verb == "info":
                # Explicit name if given, otherwise the active workspace.
                name = parts[1] if len(parts) > 1 else wm.get_active()
                if not name:
                    self._add_system("No workspace specified and no active workspace.")
                    return
                try:
                    meta = wm.get_meta(name)
                    created = meta.get("created_at")
                    last_active = meta.get("last_active_at")
                    targets = meta.get("targets", [])
                    kp = resolve_knowledge_paths()
                    ks = "workspace" if kp.get("using_workspace") else "global"
                    self._add_system(
                        f"Name: {name}\nCreated: {created}\nLast active: {last_active}\nTargets: {len(targets)}\nKnowledge scope: {ks}"
                    )
                except Exception as e:
                    self._add_system(f"Error retrieving workspace info: {e}")
                return

            if verb == "note":
                # By default, use the active workspace; allow explicit override via --workspace/-w.
                name = wm.get_active()
                i = 1
                # Parse optional workspace selector flags before the note text.
                while i < len(parts):
                    part = parts[i]
                    if part in ("--workspace", "-w"):
                        if i + 1 >= len(parts):
                            self._add_system("Usage: /workspace note [--workspace NAME] <text>")
                            return
                        name = parts[i + 1]
                        i += 2
                        continue
                    # First non-option token marks the start of the note text
                    break
                if not name:
                    self._add_system("No active workspace. Set one with /workspace <name>.")
                    return
                # The note is rebuilt from the split tokens, so runs of
                # whitespace in the typed text collapse to single spaces.
                text = " ".join(parts[i:])
                if not text:
                    self._add_system("Usage: /workspace note [--workspace NAME] <text>")
                    return
                try:
                    wm.set_operator_note(name, text)
                    self._add_system(f"Operator note saved for workspace '{name}'.")
                except Exception as e:
                    self._add_system(f"Error saving note: {e}")
                return

            if verb == "clear":
                active = wm.get_active()
                if not active:
                    self._add_system("No active workspace.")
                    return
                marker = wm.active_marker()
                try:
                    # Deactivation is done by deleting the marker path.
                    if marker.exists():
                        marker.unlink()
                    # Clear TUI and agent target when workspace is deactivated
                    self.target = ""
                    try:
                        self._apply_target_display("")
                    except Exception:
                        pass
                    if self.agent:
                        try:
                            # Clear agent's target and any manual override
                            self.agent.target = ""
                            try:
                                if hasattr(self.agent, "_manual_target"):
                                    delattr(self.agent, "_manual_target")
                            except Exception:
                                pass
                            from pentestagent.agents.base_agent import AgentMessage

                            self.agent.conversation_history.append(
                                AgentMessage(
                                    role="system",
                                    content=(
                                        f"Workspace '{active}' deactivated; cleared target"
                                    ),
                                )
                            )
                        except Exception:
                            logging.getLogger(__name__).exception(
                                "Failed to clear agent target on workspace clear"
                            )
                    self._add_system(f"Workspace '{active}' deactivated.")
                except Exception as e:
                    self._add_system(f"Error deactivating workspace: {e}")
                return

            # Default: treat rest as workspace name -> create (only if missing) and set active
            name = rest
            try:
                existed = wm.workspace_path(name).exists()
                if not existed:
                    wm.create(name)
                wm.set_active(name)
                # restore last target if set on workspace
                last = wm.get_meta_field(name, "last_target")
                if last:
                    self.target = last
                    if self.agent:
                        self.agent.target = last
                    try:
                        self._apply_target_display(last)
                    except Exception as e:
                        logging.getLogger(__name__).exception("Failed to apply target display when activating workspace: %s", e)
                        try:
                            from pentestagent.interface.notifier import notify

                            notify("warning", f"TUI: failed to restore workspace target display: {e}")
                        except Exception:
                            logging.getLogger(__name__).exception("Failed to notify operator about workspace target restore failure")

                if existed:
                    self._add_system(f"Workspace '{name}' set active.")
                else:
                    self._add_system(f"Workspace '{name}' created and set active.")
            except WorkspaceError as e:
                self._add_system(f"Error: {e}")
            except Exception as e:
                self._add_system(f"Error creating workspace: {e}")
        except Exception as e:
            # Catch-all for the whole /workspace branch, including failures
            # of the imports at its top.
            self._add_system(f"Workspace command error: {e}")
        return
    elif cmd_original.startswith("/agent"):
        await self._parse_agent_command(cmd_original)
    elif cmd_original.startswith("/crew"):
        await self._parse_crew_command(cmd_original)
    else:
        self._add_system(f"Unknown command: {cmd}\nType /help for commands.")
|
|
|
|
async def _parse_agent_command(self, cmd: str) -> None:
    """Parse a ``/agent <task>`` line and start an agent-mode run.

    With no task text a usage message is shown and nothing else happens.
    Otherwise the command is echoed as a user message, the crew sidebar is
    hidden, and, if an agent exists and no run is active, ``_run_agent_mode``
    is started and its handle stored in ``_current_worker``.

    Args:
        cmd: The full command line, beginning with ``/agent``.
    """
    # Strip the prefix by its own length rather than a magic offset.
    task = cmd[len("/agent"):].strip()

    # Single emptiness check: the original repeated it after aliasing the
    # same value, which could never trigger.
    if not task:
        self._add_system(
            "Usage: /agent <task>\n"
            "Example: /agent scan 192.168.1.1\n"
            " /agent enumerate SSH on target"
        )
        return

    self._add_user(f"/agent {task}")
    self._add_system(">> Agent Mode")

    # Hide crew sidebar when entering agent mode
    self._hide_sidebar()

    if self.agent and not self._is_running:
        # Keep the returned handle (do not wrap in asyncio.create_task;
        # per the original comment, @work returns a Worker).
        self._current_worker = self._run_agent_mode(task)
|
|
|
|
async def _parse_crew_command(self, cmd: str) -> None:
    """Parse a ``/crew <task>`` line and start a crew-mode run.

    Without a task the usage text (including the worker type list) is shown.
    With a task, and only when no run is active, the command is echoed, the
    sidebar is shown and ``_run_crew_mode`` is started; its handle is kept in
    ``_current_worker``. While a run is active the command is ignored.
    """
    target = cmd[len("/crew"):].strip()

    if target == "":
        usage_lines = (
            "Usage: /crew <task>",
            "Example: /crew https://example.com",
            " /crew 192.168.1.100",
            "",
            "Crew mode spawns specialized workers in parallel:",
            " - recon: Reconnaissance and mapping",
            " - sqli: SQL injection testing",
            " - xss: Cross-site scripting testing",
            " - ssrf: Server-side request forgery",
            " - auth: Authentication testing",
            " - idor: Insecure direct object references",
            " - info: Information disclosure",
        )
        self._add_system("\n".join(usage_lines))
        return

    if self._is_running:
        # A run is already in progress; nothing is echoed or scheduled.
        return

    self._add_user(f"/crew {target}")
    self._show_sidebar()
    # Keep the returned handle (do not wrap in asyncio.create_task; per the
    # original comment, @work returns a Worker).
    self._current_worker = self._run_crew_mode(target)
|
|
|
|
async def _parse_mcp_command(self, cmd: str) -> None:
    """Parse and execute a ``/mcp`` command.

    Supported forms:
        ``/mcp list``                          open the MCP browser screen
        ``/mcp add <name> <command> [args...]`` register a server, connect to
                                               it and expose its tools

    Every outcome is reported with a system message: usage errors, a missing
    MCP manager, failures while adding or connecting, and unknown actions.
    (The original raised on a missing manager or agent, let connection
    errors escape, and silently ignored unknown actions.)

    Args:
        cmd: The full command line, beginning with ``/mcp``.
    """
    # Remove the /mcp prefix
    rest = cmd[len("/mcp"):].strip()

    if not rest:
        self._add_system(
            "Usage: /mcp <command>\n"
            "Example: /mcp list \n"
            " /mcp add"
        )
        return

    parts = rest.split()
    # The verb is matched as a whole word, case-insensitively.
    verb = parts[0].lower()

    if verb == "list":
        if not self.mcp_manager:
            self._add_system("[!] MCP manager not available.")
            return
        # Open the interactive mcp browser (split-pane).
        try:
            await self.push_screen(MCPScreen(mcp_manager=self.mcp_manager))
        except Exception as e:
            self._add_system(f"[!] Failed to open MCP browser: {e}")
        return

    if verb == "add":
        spec = parts[1:]
        if len(spec) < 2:
            self._add_system("Usage: /mcp add <name> <command> [args...]")
            return
        if not self.mcp_manager:
            self._add_system("[!] MCP manager not available.")
            return

        name, command, *mcp_args = spec
        try:
            from ..tools import get_all_tools, register_tool_instance

            self.mcp_manager.add_server(
                name=name,
                command=command,
                args=mcp_args,
            )
            server = await self.mcp_manager.connect_server(name)
            self.mcp_server_count = len(self.mcp_manager.list_configured_servers())

            tools = self.mcp_manager.create_mcp_tools_from_server(server)
            # The agent may not exist yet; the tools are still registered.
            if self.agent:
                self.agent.add_tools(tools)
            for tool in tools:
                register_tool_instance(tool)

            self.all_tools = get_all_tools()
            self._update_header()
        except Exception as e:
            # Report the failure in the chat instead of letting it
            # propagate out of the command handler.
            self._add_system(f"[!] MCP add error: {e}")
            return

        self._add_system(f"+ MCP server '{name}' added.")
        return

    self._add_system(
        f"Unknown /mcp command: {parts[0]}\nUsage: /mcp list | /mcp add <name> <command> [args...]"
    )
|
|
|
|
|
|
def _show_sidebar(self) -> None:
    """Show the sidebar for crew mode.

    Makes the ``#sidebar`` widget visible, resets all per-run crew state
    (worker nodes, workers, events, findings, tokens, start time), restarts
    the stats and spinner timers, and adds a fresh "CREW" node to the
    workers tree. Any failure is reported as a "[!] Sidebar error" system
    message.

    NOTE(review): indentation was reconstructed from a listing with stripped
    whitespace; confirm whether ``_viewing_worker_id = None`` sits inside the
    ``if self._crew_orchestrator_node`` block upstream.
    """
    try:
        import time

        sidebar = self.query_one("#sidebar")
        sidebar.add_class("visible")

        chat_area = self.query_one("#chat-area")
        chat_area.add_class("with-sidebar")

        # Setup tree
        tree = self.query_one("#workers-tree", CrewTree)
        tree.root.expand()
        tree.show_root = False

        # Clear old nodes
        tree.root.remove_children()
        self._crew_worker_nodes.clear()
        self._crew_workers.clear()
        self._worker_events.clear()
        self._crew_findings_count = 0

        # Start tracking time and tokens
        self._crew_start_time = time.time()
        self._crew_tokens_used = 0

        # Start stats timer (update every second)
        if self._crew_stats_timer:
            # Stop any existing timer before replacing it.
            self._crew_stats_timer.stop()
        self._crew_stats_timer = self.set_interval(1.0, self._update_crew_stats)

        # Start spinner timer for running workers (faster interval for smooth animation)
        if self._spinner_timer:
            self._spinner_timer.stop()
        self._spinner_timer = self.set_interval(0.15, self._update_spinner)

        # Add crew root node (no orchestrator - just "CREW" header)
        self._crew_orchestrator_node = tree.root.add(
            "CREW", data={"type": "crew", "id": "crew"}
        )
        if self._crew_orchestrator_node:
            try:
                self._crew_orchestrator_node.expand()
                tree.select_node(self._crew_orchestrator_node)
            except Exception as e:
                # Expansion/selection is cosmetic: log, try to notify, and
                # carry on with the rest of the setup.
                logging.getLogger(__name__).exception("Failed to expand/select crew orchestrator node: %s", e)
                try:
                    from .notifier import notify

                    notify("warning", f"TUI: failed to expand crew sidebar node: {e}")
                except Exception:
                    logging.getLogger(__name__).exception("Failed to notify operator about crew node expansion failure")
        # No individual worker is being viewed at the start of a run.
        self._viewing_worker_id = None

        # Update stats
        self._update_crew_stats()
    except Exception as e:
        self._add_system(f"[!] Sidebar error: {e}")
|
|
|
|
def _apply_target_display(self, target: str) -> None:
    """Update or insert the Target line in the system/banner area.

    Looks in ``#chat-scroll`` for the first ``SystemMessage`` whose content
    contains "PentestAgent ready" and rewrites (or appends) its ``Target:``
    line, then refreshes the persistent header. Every step is best-effort:
    failures are logged, reported through the notifier where the original
    did so, and never raised.

    Args:
        target: The target text to display; inserted literally.
    """
    log = logging.getLogger(__name__)
    try:
        scroll = self.query_one("#chat-scroll", ScrollableContainer)
        for child in scroll.children:
            is_banner = isinstance(child, SystemMessage) and "PentestAgent ready" in getattr(
                child, "message_content", ""
            )
            if not is_banner:
                continue
            # Replace existing Target line if present, otherwise append
            try:
                if "Target:" in child.message_content:
                    # A callable replacement inserts the target verbatim. A
                    # replacement *string* would have its backslashes
                    # interpreted by re.sub, so a target such as
                    # "\\server\share" raised re.error.
                    child.message_content = re.sub(
                        r"(?m)^\s*Target:.*$",
                        lambda _match: f" Target: {target}",
                        child.message_content,
                        count=1,
                    )
                else:
                    child.message_content = (
                        child.message_content + f"\n Target: {target}"
                    )
                try:
                    child.refresh()
                except Exception as e:
                    log.exception("Failed to refresh child message: %s", e)
                    try:
                        from pentestagent.interface.notifier import notify

                        notify("warning", f"TUI: failed to refresh UI element: {e}")
                    except Exception:
                        log.exception("Failed to notify operator about child refresh failure")
            except Exception as e:
                log.exception("Failed to update SystemMessage target line: %s", e)
                try:
                    from pentestagent.interface.notifier import notify

                    notify("warning", f"Failed to update target display: {e}")
                except Exception:
                    log.exception("Failed to notify operator about target update error")
                # Last resort kept from the original: append a Target line.
                child.message_content = (
                    child.message_content + f"\n Target: {target}"
                )
            # Only the first banner message is touched.
            break
    except Exception as e:
        log.exception("Failed updating in-scroll target display: %s", e)

    # Always update the persistent header so the target is always visible.
    # (The original also called this a second time when no banner was
    # found; one call covers both cases.)
    try:
        self._update_header(target=target)
    except Exception:
        log.exception("Failed to update persistent header with target")
|
|
|
|
def _update_header(self, model_line: Optional[str] = None, target: Optional[str] = None) -> None:
    """Compose and update the persistent header widget.

    Args:
        model_line: Text for the first part of the header. When empty or
            ``None`` a status block is built from the current attributes
            (model, tool count, MCP count, RAG count, runtime, mode).
        target: Target to show on its own line. ``None`` means "use
            ``self.target``"; an empty value omits the line.

    The update is best-effort: any failure (for example the ``#header``
    widget not being available) is logged at debug level and swallowed.
    """
    try:
        header = self.query_one("#header", Static)
        # Build header text from provided pieces or current state
        lines: List[str] = []
        if model_line:
            lines.append(model_line)
        else:
            # try to recreate a compact model/runtime line
            runtime_str = "Docker" if getattr(self, "use_docker", False) else "Local"
            # `or []` also covers all_tools being None, which previously
            # aborted the whole update via TypeError.
            tools_count = len(getattr(self, "all_tools", None) or [])
            mode = getattr(self, "_mode", "assist")
            if mode == "assist":
                mode = "assist (use /agent or /crew for autonomous modes)"
            lines.append(
                f"+ PentestAgent ready\n Model: {getattr(self, 'model', '')} | Tools: {tools_count} | MCP: {getattr(self, 'mcp_server_count', '')} | RAG: {getattr(self, 'rag_doc_count', '')}\n Runtime: {runtime_str} | Mode: {mode}"
            )
        # Ensure target line is present/updated
        if target is None:
            target = getattr(self, "target", "")
        if target:
            # append target on its own line
            lines.append(f" Target: {target}")

        header.update("\n".join(lines))
    except Exception:
        # The original carried a second, unreachable `except Exception`
        # clause after this one; it has been dropped. Debug level keeps the
        # best-effort behaviour without hiding the cause completely.
        logging.getLogger(__name__).debug("Header update skipped", exc_info=True)
|
|
|
|
def _hide_sidebar(self) -> None:
    """Hide the sidebar.

    Stops and clears the crew stats timer, then removes the "visible" class
    from ``#sidebar`` and the "with-sidebar" class from ``#chat-area``.
    Failures are logged and reported through the notifier (best-effort);
    nothing is raised.
    """
    try:
        # Stop stats timer
        if self._crew_stats_timer:
            self._crew_stats_timer.stop()
            self._crew_stats_timer = None

        sidebar = self.query_one("#sidebar")
        sidebar.remove_class("visible")

        chat_area = self.query_one("#chat-area")
        chat_area.remove_class("with-sidebar")
    except Exception as e:
        # Messages name this operation; the original used a generic
        # "Sidebar error" text here.
        logging.getLogger(__name__).exception("Failed to hide sidebar: %s", e)
        try:
            from .notifier import notify

            notify("warning", f"TUI: failed to hide sidebar: {e}")
        except Exception:
            logging.getLogger(__name__).exception("Failed to notify operator about hide_sidebar failure")
|
|
|
|
def _update_crew_stats(self) -> None:
    """Update crew stats panel.

    Renders two lines into the ``#crew-stats`` widget: elapsed time since
    ``_crew_start_time`` (as ``Ns``, ``Nm Ns`` or ``Nh Nm``) and tokens used
    (raw below 1000, otherwise ``N.Nk``). A ``--`` placeholder is shown when
    a value is not available. Failures are logged and reported through the
    notifier (best-effort); nothing is raised.
    """
    try:
        import time

        text = Text()

        # Elapsed time
        text.append("Time: ", style="bold #d4d4d4")
        if self._crew_start_time:
            elapsed = time.time() - self._crew_start_time
            if elapsed < 60:
                time_str = f"{int(elapsed)}s"
            elif elapsed < 3600:
                mins = int(elapsed // 60)
                secs = int(elapsed % 60)
                time_str = f"{mins}m {secs}s"
            else:
                hrs = int(elapsed // 3600)
                mins = int((elapsed % 3600) // 60)
                time_str = f"{hrs}h {mins}m"
            text.append(time_str, style="#9a9a9a")
        else:
            text.append("--", style="#525252")

        text.append("\n")

        # Tokens used
        text.append("Tokens: ", style="bold #d4d4d4")
        if self._crew_tokens_used > 0:
            if self._crew_tokens_used >= 1000:
                token_str = f"{self._crew_tokens_used / 1000:.1f}k"
            else:
                token_str = str(self._crew_tokens_used)
            text.append(token_str, style="#9a9a9a")
        else:
            text.append("--", style="#525252")

        stats = self.query_one("#crew-stats", Static)
        stats.update(text)
        stats.border_title = "# Stats"
    except Exception as e:
        # These messages previously said "hide sidebar", which pointed at
        # the wrong function when reading logs.
        logging.getLogger(__name__).exception("Failed to update crew stats: %s", e)
        try:
            from .notifier import notify

            notify("warning", f"TUI: failed to update crew stats: {e}")
        except Exception:
            logging.getLogger(__name__).exception("Failed to notify operator about crew stats update failure")
|
|
|
|
def _update_spinner(self) -> None:
    """Update spinner animation for running workers.

    Advances ``_spinner_frame`` and re-renders the tree label of every
    worker whose status is "running". When no worker is running, the
    spinner timer is stopped and cleared. Failures are logged and reported
    through the notifier (best-effort); nothing is raised.
    """
    try:
        # Advance spinner frame
        self._spinner_frame += 1

        # Only update labels for running workers (efficient)
        has_running = False
        for worker_id, worker in self._crew_workers.items():
            if worker.get("status") != "running":
                continue
            has_running = True
            # Update the tree node label
            if worker_id in self._crew_worker_nodes:
                node = self._crew_worker_nodes[worker_id]
                node.set_label(self._format_worker_label(worker_id))

        # Stop spinner if no workers are running (save resources)
        if not has_running and self._spinner_timer:
            self._spinner_timer.stop()
            self._spinner_timer = None
    except Exception as e:
        # These messages previously said "crew stats", which pointed at the
        # wrong function when reading logs.
        logging.getLogger(__name__).exception("Failed to update spinner: %s", e)
        try:
            from .notifier import notify

            notify("warning", f"TUI: failed to update spinner: {e}")
        except Exception:
            logging.getLogger(__name__).exception("Failed to notify operator about spinner update failure")
|
|
|
|
def _add_crew_worker(self, worker_id: str, worker_type: str, task: str) -> None:
    """Add a worker to the sidebar tree.

    Registers the worker in ``_crew_workers`` with status "pending" and zero
    findings, then, if the crew root node exists, adds a child tree node for
    it, expands the root and refreshes the stats panel. UI failures are
    logged and reported through the notifier (best-effort); the worker
    record is kept either way.

    Args:
        worker_id: Identifier used as the key for the worker and its node.
        worker_type: Stored under "worker_type" in the worker record.
        task: Stored under "task" in the worker record.
    """
    self._crew_workers[worker_id] = {
        "worker_type": worker_type,
        "task": task,
        "status": "pending",
        "findings": 0,
    }

    try:
        label = self._format_worker_label(worker_id)
        if self._crew_orchestrator_node:
            node = self._crew_orchestrator_node.add(
                label, data={"type": "worker", "id": worker_id}
            )
            self._crew_worker_nodes[worker_id] = node
            try:
                self._crew_orchestrator_node.expand()
            except Exception as e:
                logging.getLogger(__name__).exception("Failed to expand crew orchestrator node: %s", e)
                try:
                    from .notifier import notify

                    notify("warning", f"TUI: failed to expand crew node: {e}")
                except Exception:
                    logging.getLogger(__name__).exception("Failed to notify operator about crew node expansion failure")
            self._update_crew_stats()
    except Exception as e:
        # These messages previously said "update spinner", which pointed at
        # the wrong function when reading logs.
        logging.getLogger(__name__).exception("Failed to add crew worker node: %s", e)
        try:
            from .notifier import notify

            notify("warning", f"TUI: failed to add crew worker node: {e}")
        except Exception:
            logging.getLogger(__name__).exception("Failed to notify operator about add_crew_worker failure")
|
|
|
|
def _update_crew_worker(self, worker_id: str, **updates) -> None:
    """Update a worker's state.

    Merges ``updates`` into the worker's record, restarts the spinner timer
    when the worker becomes "running" and no timer is active, re-renders the
    worker's tree label and refreshes the stats panel. Unknown worker ids
    are ignored. UI failures are logged and reported through the notifier
    (best-effort); nothing is raised for them.

    Args:
        worker_id: Key of the worker in ``_crew_workers``.
        **updates: Fields to merge into the worker record.
    """
    if worker_id not in self._crew_workers:
        return

    self._crew_workers[worker_id].update(updates)

    # Restart spinner if a worker started running
    if updates.get("status") == "running" and not self._spinner_timer:
        self._spinner_timer = self.set_interval(0.15, self._update_spinner)

    try:
        if worker_id in self._crew_worker_nodes:
            label = self._format_worker_label(worker_id)
            self._crew_worker_nodes[worker_id].set_label(label)
        self._update_crew_stats()
    except Exception as e:
        # These messages previously said "add crew worker node", which
        # pointed at the wrong function when reading logs.
        logging.getLogger(__name__).exception("Failed to update crew worker display: %s", e)
        try:
            from .notifier import notify

            notify("warning", f"TUI: failed to update crew worker display: {e}")
        except Exception:
            logging.getLogger(__name__).exception("Failed to notify operator about update_crew_worker failure")
|
|
|
|
def _format_worker_label(self, worker_id: str) -> Text:
    """Build the tree label for a worker: status icon plus upper-cased type.

    Icon and colour by status:
        running / pending -> current spinner frame, white
        complete          -> check mark, green (plus " [N]" when N findings > 0)
        warning           -> "!", amber
        anything else     -> cross, red (error, cancelled, failed, unknown)

    An unknown ``worker_id`` is rendered as a pending worker of type "worker".
    """
    info = self._crew_workers.get(worker_id, {})
    state = info.get("status", "pending")
    kind = info.get("worker_type", "worker")
    found = info.get("findings", 0)

    if state in ("running", "pending"):
        # Animated spinner frame for all in-progress states.
        frames = self._spinner_frames
        glyph, tint = frames[self._spinner_frame % len(frames)], "#d4d4d4"
    elif state == "complete":
        glyph, tint = "✓", "#22c55e"
    elif state == "warning":
        glyph, tint = "!", "#f59e0b"
    else:
        # error, cancelled, unknown
        glyph, tint = "✗", "#ef4444"

    label = Text()
    label.append(f"{glyph} ", style=tint)
    label.append(kind.upper(), style="bold")

    # Only completed workers show a findings count; failed ones already
    # carry the cross icon and get no suffix.
    if state == "complete" and found > 0:
        label.append(f" [{found}]", style="#22c55e")

    return label
|
|
|
|
def _handle_worker_event(
    self, worker_id: str, event_type: str, data: Dict[str, Any]
) -> None:
    """Apply a crew worker event to the sidebar state.

    Handled event types: "spawn", "status", "tool", "tokens", "complete",
    "warning", "failed" and "error"; other types are ignored. "warning",
    "failed" and "error" also post a "[!] ..." system message. Any exception
    raised while handling is reported as a "[!] Worker event error" system
    message.
    """

    def worker_kind() -> str:
        # Upper-cased worker type for chat messages ("WORKER" if unknown).
        record = self._crew_workers.get(worker_id, {})
        return record.get("worker_type", "worker").upper()

    try:
        if event_type == "spawn":
            kind = data.get("worker_type", "unknown")
            task = data.get("task", "")
            self._add_crew_worker(worker_id, kind, task)

        elif event_type == "status":
            self._update_crew_worker(worker_id, status=data.get("status", "running"))

        elif event_type == "tool":
            # Each tool use becomes a child node under the worker.
            self._add_tool_to_worker(worker_id, data.get("tool", "unknown"))

        elif event_type == "tokens":
            # Accumulate token usage.
            self._crew_tokens_used += data.get("tokens", 0)

        elif event_type == "complete":
            found = data.get("findings_count", 0)
            self._update_crew_worker(worker_id, status="complete", findings=found)
            self._crew_findings_count += found
            self._update_crew_stats()

        elif event_type == "warning":
            # Worker hit max iterations but has results.
            self._update_crew_worker(worker_id, status="warning")
            why = data.get("reason", "Partial completion")
            self._add_system(f"[!] {worker_kind()} stopped: {why}")
            self._update_crew_stats()

        elif event_type == "failed":
            # Worker determined the task infeasible.
            self._update_crew_worker(worker_id, status="failed")
            why = data.get("reason", "Task infeasible")
            self._add_system(f"[!] {worker_kind()} failed: {why}")
            self._update_crew_stats()

        elif event_type == "error":
            self._update_crew_worker(worker_id, status="error")
            kind_label = worker_kind()
            detail = data.get("error", "Unknown error")
            # Errors are shown in chat because they are important.
            self._add_system(f"[!] {kind_label} failed: {detail}")
    except Exception as exc:
        self._add_system(f"[!] Worker event error: {exc}")
|
|
|
|
def _add_tool_to_worker(self, worker_id: str, tool_name: str) -> None:
    """Add a tool usage as child node under worker in tree.

    Does nothing when the worker has no tree node. Failures are logged and
    reported through the notifier (best-effort); nothing is raised.

    Args:
        worker_id: Key of the worker's node in ``_crew_worker_nodes``.
        tool_name: Name shown on the new leaf node.
    """
    try:
        node = self._crew_worker_nodes.get(worker_id)
        if node:
            node.add_leaf(f" {tool_name}")
            node.expand()
    except Exception as e:
        # These messages previously said "update crew worker display",
        # which pointed at the wrong function when reading logs.
        logging.getLogger(__name__).exception("Failed to add tool to worker: %s", e)
        try:
            from .notifier import notify

            notify("warning", f"TUI: failed to add tool to worker: {e}")
        except Exception:
            logging.getLogger(__name__).exception("Failed to notify operator about add_tool_to_worker failure")
|
|
|
|
@on(Tree.NodeSelected, "#workers-tree")
def on_worker_tree_selected(self, event: Tree.NodeSelected) -> None:
    """Track which worker (if any) is selected in the workers tree.

    Selecting the "crew" node clears ``_viewing_worker_id``; selecting a
    "worker" node stores that node's "id". Nodes without data, or with any
    other type, leave the current value untouched.
    """
    payload = event.node.data
    if not payload:
        return

    kind = payload.get("type")
    if kind == "crew":
        self._viewing_worker_id = None
    elif kind == "worker":
        self._viewing_worker_id = payload.get("id")
|
|
|
|
@work(thread=False)
async def _run_crew_mode(self, target: str) -> None:
    """Run a crew task for *target* and stream orchestrator updates to the UI.

    Builds an ``LLM`` and a ``CrewOrchestrator``, then consumes
    ``crew.run(target)``. Each update is a dict dispatched on its ``"phase"``
    key:

    * ``"starting"``   - set the status to thinking/crew
    * ``"tokens"``     - add ``update["tokens"]`` to the crew token counter
    * ``"thinking"``   - show the orchestrator's reasoning text
    * ``"tool_call"``  - show the orchestration tool call and its arguments
    * ``"tool_result"``- ignored here (tracked through worker events)
    * ``"complete"``   - show ``update["report"]`` as the assistant message
    * ``"error"``      - show ``update["error"]`` as a system message

    When a report was produced and ``self.agent`` exists, the task and the
    report are appended to ``self.agent.conversation_history`` so later
    assist-mode turns can reference the crew run.

    Args:
        target: Task/target description handed to the orchestrator.

    ``asyncio.CancelledError`` and any other ``Exception`` are handled here
    (crew cancelled, message shown, status updated) and are not re-raised.
    The stats and spinner timers are stopped and ``_is_running`` is cleared
    on every exit path.
    """
    log = logging.getLogger(__name__)

    self._is_running = True
    self._should_stop = False
    self._set_status("thinking", "crew")

    try:
        from ..agents.base_agent import AgentMessage
        from ..agents.crew import CrewOrchestrator
        from ..llm import LLM, ModelConfig

        # Build prior context from assist/agent conversation history
        prior_context = self._build_prior_context()

        # Ensure model/runtime are available for static analysis
        assert self.model is not None
        assert self.runtime is not None

        llm = LLM(model=self.model, config=ModelConfig(temperature=0.7))

        crew = CrewOrchestrator(
            llm=llm,
            tools=self.all_tools,
            runtime=self.runtime,
            on_worker_event=self._handle_worker_event,
            rag_engine=self.rag_engine,
            target=target,
            prior_context=prior_context,
        )
        self._current_crew = crew  # Track for cancellation

        self._add_system(f"@ Task: {target}")

        # Track crew results for memory
        crew_report = None

        async for update in crew.run(target):
            if self._should_stop:
                await crew.cancel()
                self._add_system("[!] Stopped by user")
                break

            phase = update.get("phase", "")

            if phase == "starting":
                self._set_status("thinking", "crew")

            elif phase == "tokens":
                # Track orchestrator token usage; treat a missing/None
                # count as zero so the counter never breaks on "+=".
                self._crew_tokens_used += update.get("tokens", 0) or 0
                self._update_crew_stats()

            elif phase == "thinking":
                # Show the orchestrator's reasoning
                content = update.get("content", "")
                if content:
                    self._add_thinking(content)

            elif phase == "tool_call":
                # Show orchestration tool calls
                tool = update.get("tool", "")
                args = update.get("args", {})
                self._add_tool(tool, str(args))

            elif phase == "tool_result":
                # Tool results are tracked via worker events
                pass

            elif phase == "complete":
                crew_report = update.get("report", "")
                if crew_report:
                    self._add_assistant(crew_report)

            elif phase == "error":
                error = update.get("error", "Unknown error")
                self._add_system(f"[!] Crew error: {error}")

        # Add crew results to main agent's conversation history
        # so assist mode can reference what happened
        if self.agent and crew_report:
            # Add the crew task as a user message
            self.agent.conversation_history.append(
                AgentMessage(
                    role="user",
                    content=f"[CREW MODE] Run parallel analysis on target: {target}",
                )
            )
            # Add the crew report as assistant response
            self.agent.conversation_history.append(
                AgentMessage(role="assistant", content=crew_report)
            )

        # NOTE(review): this completion status/message is also emitted when
        # the loop was left via the "Stopped by user" break - confirm that
        # is intended.
        self._set_status("complete", "crew")
        self._add_system("+ Crew task complete.")

        # Clear crew reference
        self._current_crew = None

    except asyncio.CancelledError:
        # Cancel crew workers first
        if self._current_crew:
            await self._current_crew.cancel()
            self._current_crew = None
        self._add_system("[!] Cancelled")
        self._set_status("idle", "crew")

    except Exception as e:
        import traceback

        # Render the message up front. The nested handler below uses its own
        # exception name, but capturing here also keeps the report tied to
        # the original failure regardless of what cleanup does.
        error_text = f"[!] Crew error: {e}\n{traceback.format_exc()}"

        # Cancel crew workers on error too
        if self._current_crew:
            try:
                await self._current_crew.cancel()
            except Exception as cancel_err:
                # A distinct name is required: "except ... as e" would
                # unbind the outer "e" when this handler finishes.
                log.exception("Failed to cancel crew after error: %s", cancel_err)
                try:
                    from .notifier import notify

                    notify(
                        "warning",
                        f"TUI: failed to cancel crew after error: {cancel_err}",
                    )
                except Exception:
                    log.exception(
                        "Failed to notify operator about crew cancel failure"
                    )
            self._current_crew = None

        self._add_system(error_text)
        self._set_status("error")

    finally:
        self._is_running = False
        # Stop timers on every exit path (success, cancel, error) in one
        # place instead of repeating the teardown in each branch.
        if self._crew_stats_timer:
            self._crew_stats_timer.stop()
            self._crew_stats_timer = None
        if self._spinner_timer:
            self._spinner_timer.stop()
            self._spinner_timer = None
|
|
|
|
@work(thread=False)
async def _run_assist(self, message: str) -> None:
    """Handle one assist-mode request and render the agent's responses.

    Iterates ``self.agent.assist(message)``. For each response the text is
    shown as "thinking" when it is accompanied by tool calls and as the
    assistant message otherwise; tool calls and tool results are then listed,
    skipping the internal ``finish`` tool in both. Does nothing beyond a
    system notice when no agent is available.

    Args:
        message: The user's input to forward to the agent.
    """
    agent = self.agent
    if not agent:
        self._add_system("[!] Agent not ready")
        return

    self._is_running = True
    self._should_stop = False
    self._set_status("thinking", "assist")

    try:
        async for response in agent.assist(message):
            if self._should_stop:
                self._add_system("[!] Stopped by user")
                break

            self._set_status("processing")

            # Text first: it is the plan/reasoning when tool calls follow,
            # and the actual answer when they do not.
            if response.content:
                text = response.content.strip()
                render = (
                    self._add_thinking
                    if response.tool_calls
                    else self._add_assistant
                )
                render(text)

            # Tool calls ('finish' is internal control and stays hidden).
            if response.tool_calls:
                for call in response.tool_calls:
                    if call.name != "finish":
                        self._add_tool(call.name, str(call.arguments))

            # Tool results, shown once execution has completed. The
            # 'finish' result is the final summary and is shown separately.
            if response.tool_results:
                for outcome in response.tool_results:
                    if outcome.tool_name == "finish":
                        continue
                    if outcome.success:
                        shown = outcome.result or "Done"
                    else:
                        shown = f"Error: {outcome.error}"
                    self._add_tool_result(outcome.tool_name, shown)

        self._set_status("idle", "assist")

    except asyncio.CancelledError:
        self._add_system("[!] Cancelled")
        self._set_status("idle", "assist")
    except Exception as e:
        self._add_system(f"[!] Error: {e}")
        self._set_status("error")
    finally:
        self._is_running = False
|
|
|
|
@work(thread=False)
async def _run_agent_mode(self, task: str) -> None:
    """Run *task* autonomously until the agent completes, waits, or is stopped.

    Iterates ``self.agent.agent_loop(task)`` and renders each response:
    text is shown as "thinking" when it comes with tool calls or is flagged
    ``metadata["intermediate"]``, otherwise as the assistant message; every
    tool call is listed (including ``finish``); tool results are listed
    except for ``finish``. The loop ends when the user requests a stop or the
    agent state becomes ``WAITING_INPUT`` or ``COMPLETE``. Afterwards the UI
    reports completion and returns to assist mode.

    Args:
        task: The task description handed to the agent loop.

    ``asyncio.CancelledError`` and any other ``Exception`` are handled here
    and not re-raised; ``_is_running`` is always cleared.
    """
    if not self.agent:
        self._add_system("[!] Agent not ready")
        return

    self._is_running = True
    self._should_stop = False

    self._set_status("thinking", "agent")

    try:
        from ..agents.base_agent import AgentState

        async for response in self.agent.agent_loop(task):
            if self._should_stop:
                self._add_system("[!] Stopped by user")
                break

            self._set_status("processing")

            # Show thinking/plan FIRST if there's content with tool calls
            if response.content:
                content = response.content.strip()
                # Content that comes with tool calls, or that is marked as
                # intermediate, is reasoning. Anything else is a final
                # answer. The former "task_complete" check rendered both
                # of its branches identically, so it has been folded away.
                if response.tool_calls or response.metadata.get("intermediate"):
                    self._add_thinking(content)
                else:
                    self._add_assistant(content)

            # Show tool calls AFTER thinking (all tools, including finish)
            if response.tool_calls:
                for call in response.tool_calls:
                    self._add_tool(call.name, str(call.arguments))

            # Show tool results
            if response.tool_results:
                for result in response.tool_results:
                    if result.tool_name == "finish":
                        # Redundant with the tool call display above
                        continue

                    if result.success:
                        self._add_tool_result(
                            result.tool_name, result.result or "Done"
                        )
                    else:
                        self._add_tool_result(
                            result.tool_name, f"Error: {result.error}"
                        )

            # Check state (read once per iteration)
            state = self.agent.get_state()
            if state == AgentState.WAITING_INPUT:
                self._set_status("waiting")
                self._add_system("? Awaiting input...")
                break
            if state == AgentState.COMPLETE:
                break

            self._set_status("thinking")

        # NOTE(review): this completion status/message is also emitted after
        # a user stop or a WAITING_INPUT break - confirm that is intended.
        self._set_status("complete", "agent")
        self._add_system("+ Agent task complete. Back to assist mode.")

        # Return to assist mode
        await asyncio.sleep(1)
        self._set_status("idle", "assist")

    except asyncio.CancelledError:
        self._add_system("[!] Cancelled")
        self._set_status("idle", "assist")
    except Exception as e:
        self._add_system(f"[!] Error: {e}")
        self._set_status("error")
    finally:
        self._is_running = False
|
|
|
|
def action_quit_app(self) -> None:
    """Request a stop of any running work, then exit the application.

    While a run is active this sets the stop flag, cancels the current
    worker if it has not reported itself done, and schedules crew
    cancellation without waiting for it. ``self.exit()`` is always called.
    """
    if self._is_running:
        self._should_stop = True

        worker = self._current_worker
        if worker:
            # "done" and "cancel" are looked up defensively; a worker object
            # lacking "done" is treated as still running.
            is_done = getattr(worker, "done", lambda: False)
            if not is_done():
                cancel = getattr(worker, "cancel", None)
                if cancel:
                    cancel()

        if self._current_crew:
            # Schedule cancel but don't wait - we're exiting
            asyncio.create_task(self._cancel_crew())

    self.exit()
|
|
|
|
def action_stop_agent(self) -> None:
    """Interrupt the current run, if any, and schedule follow-up cleanup.

    No-op when nothing is running. Otherwise: sets the stop flag, tells the
    operator, cancels the current worker, schedules crew cancellation, asks
    the agent to clean up after the cancel, and schedules an MCP reconnect.
    """
    if not self._is_running:
        return

    self._should_stop = True
    self._add_system("[!] Stopping...")

    # Cancel the running worker to interrupt blocking awaits
    worker = self._current_worker
    if worker:
        is_done = getattr(worker, "done", lambda: False)
        if not is_done():
            cancel = getattr(worker, "cancel", None)
            if cancel:
                cancel()

    # Cancel crew orchestrator if running
    if self._current_crew:
        asyncio.create_task(self._cancel_crew())

    # Clean up agent state to prevent stale tool responses
    if self.agent:
        self.agent.cleanup_after_cancel()

    # Reconnect MCP servers (they may be in a bad state after cancellation)
    if self.mcp_manager:
        asyncio.create_task(self._reconnect_mcp_after_cancel())
|
|
|
|
async def _cancel_crew(self) -> None:
    """Cancel the active crew orchestrator and flag its workers in the UI.

    When a crew is active it is cancelled and the reference cleared, then
    every tracked worker whose status is ``"running"`` or ``"pending"`` is
    updated to ``"cancelled"``. Failures are logged and reported to the
    operator on a best-effort basis; nothing is raised to the caller.
    """
    log = logging.getLogger(__name__)
    try:
        crew = self._current_crew
        if crew:
            await crew.cancel()
            self._current_crew = None

            # Mark all running workers as cancelled in the UI
            for wid, info in self._crew_workers.items():
                if info.get("status") in ("running", "pending"):
                    self._update_crew_worker(wid, status="cancelled")
    except Exception as e:
        log.exception("Failed to cancel crew orchestrator cleanly: %s", e)
        try:
            from .notifier import notify

            notify("warning", f"TUI: failed during crew cancellation: {e}")
        except Exception:
            log.exception("Failed to notify operator about crew cancellation failure")
|
|
|
|
async def _reconnect_mcp_after_cancel(self) -> None:
    """Reconnect all MCP servers shortly after a cancellation.

    Waits half a second so the cancellation can propagate, then calls
    ``reconnect_all()`` on the MCP manager if one is configured. Failures
    are logged and reported to the operator; nothing is raised.
    """
    # Brief delay for cancellation to propagate
    await asyncio.sleep(0.5)

    log = logging.getLogger(__name__)
    try:
        manager = self.mcp_manager
        if manager:
            await manager.reconnect_all()
    except Exception as e:
        log.exception("Failed to reconnect MCP servers after cancel: %s", e)
        try:
            from .notifier import notify

            notify("warning", f"TUI: failed to reconnect MCP servers after cancel: {e}")
        except Exception:
            log.exception("Failed to notify operator about MCP reconnect failure")
|
|
|
|
def action_show_help(self) -> None:
    """Open the help screen on top of the current screen."""
    help_screen = HelpScreen()
    self.push_screen(help_screen)
|
|
|
|
# ----- History navigation -----
|
|
|
|
def action_history_up(self) -> None:
    """Step one entry back in the command history and show it in the input.

    The history index is decremented unless it is already at zero, and the
    entry at that index is written into the ``#chat-input`` field. Nothing
    happens when the history is empty. If the input widget cannot be found
    the failure is logged, the operator is notified, and the method returns.
    """
    log = logging.getLogger(__name__)
    try:
        chat_input = self.query_one("#chat-input", Input)
    except Exception as e:
        log.exception("Failed to query chat input for history up: %s", e)
        try:
            from .notifier import notify

            notify("warning", f"TUI: history navigation failed: {e}")
        except Exception:
            log.exception("Failed to notify operator about history_up failure")
        return

    history = self._cmd_history
    if not history:
        return

    # Move back but not below zero
    if self._history_index > 0:
        self._history_index -= 1

    chat_input.value = history[self._history_index]
|
|
|
|
def action_history_down(self) -> None:
    """Step one entry forward in the command history, clearing past the end.

    While the index is before the last entry it is incremented and that
    entry is written into the ``#chat-input`` field. Otherwise the index is
    parked at ``len(history)`` and the field is emptied. Nothing happens
    when the history is empty. If the input widget cannot be found the
    failure is logged, the operator is notified, and the method returns.
    """
    log = logging.getLogger(__name__)
    try:
        chat_input = self.query_one("#chat-input", Input)
    except Exception as e:
        log.exception("Failed to query chat input for history down: %s", e)
        try:
            from .notifier import notify

            notify("warning", f"TUI: history navigation failed: {e}")
        except Exception:
            log.exception("Failed to notify operator about history_down failure")
        return

    history = self._cmd_history
    if not history:
        return

    if self._history_index >= len(history) - 1:
        # Past the end: clear input and set index to end
        self._history_index = len(history)
        chat_input.value = ""
        return

    self._history_index += 1
    chat_input.value = history[self._history_index]
|
|
|
|
async def on_unmount(self) -> None:
    """Release external resources when the app is unmounted.

    Disconnects all MCP servers (followed by a short pause) and stops the
    runtime, each only if present. The two steps are independent: a failure
    in one is logged and reported to the operator on a best-effort basis and
    does not prevent the other from running.
    """
    log = logging.getLogger(__name__)

    manager = self.mcp_manager
    if manager:
        try:
            await manager.disconnect_all()
            await asyncio.sleep(0.1)
        except Exception as e:
            log.exception("Failed to disconnect MCP manager on unmount: %s", e)
            try:
                from .notifier import notify

                notify("warning", f"TUI: error during shutdown disconnect: {e}")
            except Exception:
                log.exception("Failed to notify operator about MCP disconnect failure")

    runtime = self.runtime
    if runtime:
        try:
            await runtime.stop()
        except Exception as e:
            log.exception("Failed to stop runtime on unmount: %s", e)
            try:
                from .notifier import notify

                notify("warning", f"TUI: runtime stop error during shutdown: {e}")
            except Exception:
                log.exception("Failed to notify operator about runtime stop failure")
|
|
|
|
|
|
# ----- Entry Point -----
|
|
|
|
|
|
def run_tui(
    target: Optional[str] = None,
    model: Optional[str] = None,
    use_docker: bool = False,
):
    """Create the PentestAgent TUI application and run it.

    Args:
        target: Optional target passed through to ``PentestAgentTUI``.
        model: Optional model name passed through to ``PentestAgentTUI``.
        use_docker: Passed through to ``PentestAgentTUI`` unchanged.
    """
    PentestAgentTUI(target=target, model=model, use_docker=use_docker).run()
|
|
|
|
|
|
# Script entry point: launch the TUI with its default arguments when this
# module is executed directly rather than imported.
if __name__ == "__main__":
    run_tui()
|