diff --git a/pentestagent/interface/cli.py b/pentestagent/interface/cli.py index eaeaeb0..9f360fd 100644 --- a/pentestagent/interface/cli.py +++ b/pentestagent/interface/cli.py @@ -9,6 +9,9 @@ from rich.console import Console from rich.markdown import Markdown from rich.panel import Panel from rich.text import Text +import json +import ast +import re from ..config.constants import AGENT_MAX_ITERATIONS, ORCHESTRATOR_MAX_ITERATIONS @@ -136,10 +139,90 @@ async def run_cli( console.print(f"[{PA_DIM}]{timestamp}[/] [{style}]{msg}[/]") def display_message(content: str, title: str) -> bool: - """Display a message panel if it hasn't been shown yet.""" + """Display a message panel if it hasn't been shown yet. + + This will attempt to detect JSON or Python-dict-like content and + pretty-print it inside a fenced JSON code block so it's readable + in the terminal. Falls back to rendering as Markdown otherwise. + """ nonlocal last_content - if content and content != last_content: - console.print() + if not content or content == last_content: + return False + + # Try to detect JSON first and recursively unescape nested JSON strings + pretty_md = None + + def _parse_nested(obj): + """Recursively parse nested JSON strings inside dicts/lists.""" + if isinstance(obj, str): + # Quick JSON parse + try: + parsed = json.loads(obj) + return _parse_nested(parsed) + except Exception: + # Attempt to find a JSON substring (handles escaped inner JSON) + m = re.search(r"(\{[\s\S]*\})", obj) + if m: + try: + parsed = json.loads(m.group(1)) + return _parse_nested(parsed) + except Exception: + return obj + return obj + elif isinstance(obj, dict): + return {k: _parse_nested(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [_parse_nested(v) for v in obj] + else: + return obj + + try: + parsed = json.loads(content) + parsed = _parse_nested(parsed) + pretty = json.dumps(parsed, indent=2, ensure_ascii=False) + pretty_md = f"```json\n{pretty}\n```" + except Exception: + # Not valid JSON — try Python literal (e.g. single-quoted dict) + try: + parsed = ast.literal_eval(content) + parsed = _parse_nested(parsed) + if isinstance(parsed, (dict, list)): + pretty = json.dumps(parsed, indent=2, ensure_ascii=False) + pretty_md = f"```json\n{pretty}\n```" + except Exception: + pretty_md = None + + console.print() + if pretty_md is not None: + # Use the already-parsed structure (may have been unescaped + # / normalized by _parse_nested) to prefer a human-readable + # `summary` field. Fall back to pretty JSON when no summary. + parsed_for_summary = None + try: + parsed_for_summary = parsed # set in the JSON branch above + except NameError: + try: + parsed_for_summary = ast.literal_eval(content) + except Exception: + parsed_for_summary = None + + if isinstance(parsed_for_summary, dict) and isinstance(parsed_for_summary.get("summary"), str): + console.print( + Panel( + Markdown(parsed_for_summary.get("summary")), + title=f"[{PA_PRIMARY}]{title}", + border_style=PA_BORDER, + ) + ) + else: + console.print( + Panel( + Markdown(pretty_md), + title=f"[{PA_PRIMARY}]{title}", + border_style=PA_BORDER, + ) + ) + else: console.print( Panel( Markdown(content), @@ -147,10 +230,9 @@ async def run_cli( border_style=PA_BORDER, ) ) - console.print() - last_content = content - return True - return False + console.print() + last_content = content + return True def generate_report() -> str: """Generate markdown report.""" @@ -325,7 +407,16 @@ async def run_cli( messages=[{"role": "user", "content": "\n".join(context_lines)}], tools=[], ) - return response.content + content = response.content or "" + # Prefer structured JSON 'summary' if present + try: + parsed = json.loads(content) + if isinstance(parsed, dict) and isinstance(parsed.get("summary"), str): + return parsed.get("summary") + except Exception: + pass + + return content except Exception: return None diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index 6583820..a06322d 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -156,7 +156,7 @@ class HelpScreen(ModalScreen): def _get_help_text(self) -> str: return """[bold]Modes:[/] Assist | Agent | Crew -[bold]Keys:[/] Enter=Send Ctrl+C=Stop Ctrl+Q=Quit F1=Help + [bold]Keys:[/] Enter=Send Up/Down=History Ctrl+C=Stop Ctrl+Q=Quit F1=Help [bold]Commands:[/] /agent - Run in agent mode @@ -553,6 +553,8 @@ class PentestAgentTUI(App): Binding("escape", "stop_agent", "Stop", priority=True), Binding("f1", "show_help", "Help"), Binding("tab", "focus_next", "Next", show=False), + Binding("up", "history_up", "Prev", show=False), + Binding("down", "history_down", "Next", show=False), ] TITLE = "PentestAgent" @@ -610,6 +612,10 @@ class PentestAgentTUI(App): "⠏", ] # Braille dots spinner + # Command history + self._cmd_history: List[str] = [] + self._history_index: int = 0 + def compose(self) -> ComposeResult: with Horizontal(id="main-container"): # Chat area (left side) @@ -1041,6 +1047,12 @@ Be concise. Use the actual data from notes.""" if not message: return + # Save to history (de-duplicate consecutive duplicates) + if not (self._cmd_history and self._cmd_history[-1] == message): + self._cmd_history.append(message) + # Reset index to one past the end (blank) + self._history_index = len(self._cmd_history) + event.input.value = "" # Commands @@ -1795,6 +1807,42 @@ Be concise. Use the actual data from notes.""" def action_show_help(self) -> None: self.push_screen(HelpScreen()) + # ----- History navigation ----- + + def action_history_up(self) -> None: + """Recall previous input into the chat field.""" + try: + inp = self.query_one("#chat-input", Input) + except Exception: + return + + if not self._cmd_history: + return + + # Move back but not below zero + if self._history_index > 0: + self._history_index -= 1 + + inp.value = self._cmd_history[self._history_index] + + def action_history_down(self) -> None: + """Recall next input (or clear when at end).""" + try: + inp = self.query_one("#chat-input", Input) + except Exception: + return + + if not self._cmd_history: + return + + if self._history_index < len(self._cmd_history) - 1: + self._history_index += 1 + inp.value = self._cmd_history[self._history_index] + else: + # Past the end: clear input and set index to end + self._history_index = len(self._cmd_history) + inp.value = "" + async def on_unmount(self) -> None: """Cleanup""" if self.mcp_manager: