From 0a79b5d54e3e3df2c56611af27d46ec919b4454d Mon Sep 17 00:00:00 2001 From: giveen Date: Tue, 20 Jan 2026 11:46:32 -0700 Subject: [PATCH 1/8] WIP: save local edits (tui.py, .gitignore) --- .gitignore | 3 +++ pentestagent/interface/tui.py | 10 ++++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index c1f632e..a2a10dd 100644 --- a/.gitignore +++ b/.gitignore @@ -90,6 +90,9 @@ tests/tmp/ tests/*.local.py scripts/test_*.sh *.test.sh +# Ignore all test_*.py scripts in tests/ +tests/test_*.py +*.test.sh # Workspaces directory (user data should not be committed) /workspaces/ diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index 1e8870a..eeb0cab 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -1919,8 +1919,8 @@ Be concise. Use the actual data from notes.""" # Use assist mode by default if self.agent and not self._is_running: - # Schedule assist run and keep task handle - self._current_worker = asyncio.create_task(self._run_assist(message)) + # Schedule assist run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker) + self._current_worker = self._run_assist(message) async def _handle_command(self, cmd: str) -> None: """Handle slash commands""" @@ -2179,7 +2179,8 @@ Be concise. Use the actual data from notes.""" if self.agent and not self._is_running: # Schedule agent mode and keep task handle - self._current_worker = asyncio.create_task(self._run_agent_mode(task)) + # Schedule agent run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker) + self._current_worker = self._run_agent_mode(task) async def _parse_crew_command(self, cmd: str) -> None: """Parse and execute /crew command""" @@ -2208,7 +2209,8 @@ Be concise. Use the actual data from notes.""" self._add_user(f"/crew {target}") self._show_sidebar() # Schedule crew mode and keep handle - self._current_worker = asyncio.create_task(self._run_crew_mode(target)) + # Schedule crew run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker) + self._current_worker = self._run_crew_mode(target) def _show_sidebar(self) -> None: """Show the sidebar for crew mode.""" From 5e6e4cd44ce8ac28586ad48d6cf48734ca0b3a83 Mon Sep 17 00:00:00 2001 From: giveen Date: Tue, 20 Jan 2026 12:18:17 -0700 Subject: [PATCH 2/8] fix: heuristics for flags-only terminal commands; route semantic tools to terminal --- pentestagent/agents/base_agent.py | 48 ++++++++++++++++++- pentestagent/tools/terminal/__init__.py | 62 +++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 1 deletion(-) diff --git a/pentestagent/agents/base_agent.py b/pentestagent/agents/base_agent.py index f96ddff..e9172b5 100644 --- a/pentestagent/agents/base_agent.py +++ b/pentestagent/agents/base_agent.py @@ -489,7 +489,53 @@ class BaseAgent(ABC): ) ) else: - result = await tool.execute(arguments, self.runtime) + # If the tool is the generic terminal fallback but the LLM + # requested a specific tool name (e.g. 'nmap'), construct + # a sensible command string from the provided arguments + # so the terminal tool can execute it. This handles + # LLMs that call semantic tool names which aren't + # registered as explicit tools. + if tool.name == "terminal" and name != "terminal": + # If the LLM already provided a raw 'command', prefer it + if isinstance(arguments, dict) and "command" in arguments: + terminal_args = arguments + else: + # Build a best-effort command: tool name + positional + # argument values joined by space. This is intentionally + # simple to avoid over-guessing flag semantics. + cmd_parts = [name] + if isinstance(arguments, dict): + # Prefer common keys like 'target' or 'hosts' first + for k in ("target", "host", "hosts", "hosts_list", "hosts[]"): + if k in arguments: + v = arguments[k] + if isinstance(v, (list, tuple)): + cmd_parts.extend([str(x) for x in v]) + else: + cmd_parts.append(str(v)) + # Append any remaining values + for k, v in arguments.items(): + if k in ("target", "host", "hosts", "hosts_list", "hosts[]"): + continue + if v is True: + cmd_parts.append(f"--{k}") + elif v is False or v is None: + continue + elif isinstance(v, (list, tuple)): + cmd_parts.extend([str(x) for x in v]) + else: + cmd_parts.append(str(v)) + elif isinstance(arguments, (list, tuple)): + cmd_parts.extend([str(x) for x in arguments]) + else: + # Fallback: append as single positional + cmd_parts.append(str(arguments)) + + terminal_args = {"command": " ".join(cmd_parts)} + + result = await tool.execute(terminal_args, self.runtime) + else: + result = await tool.execute(arguments, self.runtime) results.append( ToolResult( tool_call_id=tool_call_id, diff --git a/pentestagent/tools/terminal/__init__.py b/pentestagent/tools/terminal/__init__.py index 4a07c9e..19334d0 100644 --- a/pentestagent/tools/terminal/__init__.py +++ b/pentestagent/tools/terminal/__init__.py @@ -52,6 +52,68 @@ async def terminal(arguments: dict, runtime: "Runtime") -> str: else: full_command = command + # If the provided command appears to be flags-only (starts with '-') + # it's likely the LLM intended to call a specific tool (e.g. `nmap`) + # but omitted the binary name. Detect this pattern and prefix with + # the `nmap` binary when available in the runtime environment. + try: + cmd_prefix = "" + cmd_rest = full_command + if "&&" in full_command: + left, right = full_command.split("&&", 1) + cmd_prefix = left + "&& " + cmd_rest = right + + if cmd_rest.lstrip().startswith("-"): + try: + available = getattr(runtime, "environment", None) + tool_names = [t.name for t in available.available_tools] if available else [] + except Exception: + tool_names = [] + + # Heuristic mapping of common flags to likely binaries + import re + + lower = cmd_rest.lower() + chosen = None + + # nmap indicators + if re.search(r"\b-p\b|\b-p[0-9]|-s[sv]|\b-on\b|\b-o[nag]\b", lower): + if "nmap" in tool_names: + chosen = "nmap" + + # gobuster indicators (wordlist, dirscan) + if not chosen and re.search(r"\b-w\b|--wordlist|\b-u\b|--url", lower): + if "gobuster" in tool_names: + chosen = "gobuster" + + # rustscan/masscan indicators (large-port ranges, fast scan) + if not chosen and re.search(r"\b--rate\b|\b--ping\b|\b--range\b|\b-z\b", lower): + for alt in ("rustscan", "masscan"): + if alt in tool_names: + chosen = alt + break + + # web fetchers + if not chosen and re.search(r"https?://", cmd_rest): + for alt in ("curl", "wget"): + if alt in tool_names: + chosen = alt + break + + # Fallback prefer nmap if present, then first interesting tool + if not chosen: + for alt in ("nmap", "rustscan", "masscan", "gobuster", "nikto", "ffuf", "dirb", "curl", "wget"): + if alt in tool_names: + chosen = alt + break + + if chosen: + full_command = cmd_prefix + chosen + " " + cmd_rest.lstrip() + except Exception: + # Best-effort; do not fail if introspection errors occur + pass + result = await runtime.execute_command(full_command, timeout=timeout) # Format the output From ea9c69fe222faef130219265c1b6ee07ec18bfb2 Mon Sep 17 00:00:00 2001 From: giveen Date: Tue, 20 Jan 2026 12:26:37 -0700 Subject: [PATCH 3/8] fix(tui): append agent history when target set/restored so LLM sees changes --- pentestagent/interface/tui.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index eeb0cab..29d65f7 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -1669,6 +1669,19 @@ class PentestAgentTUI(App): # Update agent's target if agent exists if self.agent: self.agent.target = target + try: + from pentestagent.agents.base_agent import AgentMessage + + # Inform the agent (LLM) that the operator changed the target so + # subsequent generations use the new value instead of older + # workspace-bound targets from conversation history. + self.agent.conversation_history.append( + AgentMessage(role="system", content=f"Operator set target to {target}") + ) + except Exception as e: + logging.getLogger(__name__).exception( + "Failed to append target change to agent history: %s", e + ) # Persist to active workspace if present try: @@ -2007,6 +2020,17 @@ Be concise. Use the actual data from notes.""" self.target = last if self.agent: self.agent.target = last + try: + from pentestagent.agents.base_agent import AgentMessage + + self.agent.conversation_history.append( + AgentMessage( + role="system", + content=f"Workspace active; restored last target: {last}", + ) + ) + except Exception: + pass try: self._apply_target_display(last) except Exception as e: From 00a7449293ce37029b70103e592ac970bdd4c9e5 Mon Sep 17 00:00:00 2001 From: giveen Date: Tue, 20 Jan 2026 12:31:48 -0700 Subject: [PATCH 4/8] fix(tui): ensure workspace restores supersede manual target messages --- pentestagent/interface/tui.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index 29d65f7..df4b970 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -1678,6 +1678,12 @@ class PentestAgentTUI(App): self.agent.conversation_history.append( AgentMessage(role="system", content=f"Operator set target to {target}") ) + # Track manual target override so workspace restores can remove + # or supersede this message when appropriate. + try: + setattr(self.agent, "_manual_target", target) + except Exception: + pass except Exception as e: logging.getLogger(__name__).exception( "Failed to append target change to agent history: %s", e @@ -2020,6 +2026,28 @@ Be concise. Use the actual data from notes.""" self.target = last if self.agent: self.agent.target = last + # If the operator set a manual target while no + # workspace was active, remove/supersede that + # system message so the LLM uses the workspace + # target instead of the stale manual one. + try: + manual = getattr(self.agent, "_manual_target", None) + if manual: + self.agent.conversation_history = [ + m + for m in self.agent.conversation_history + if not ( + m.role == "system" + and isinstance(m.content, str) + and m.content.strip().startswith("Operator set target to") + ) + ] + try: + delattr(self.agent, "_manual_target") + except Exception: + pass + except Exception: + pass try: from pentestagent.agents.base_agent import AgentMessage From 6b4a56847943571466355351763d526dcd4b5ad6 Mon Sep 17 00:00:00 2001 From: giveen Date: Tue, 20 Jan 2026 12:32:33 -0700 Subject: [PATCH 5/8] fix(tui): clear target and agent state when deactivating workspace --- pentestagent/interface/tui.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index df4b970..370f81c 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -2156,6 +2156,35 @@ Be concise. Use the actual data from notes.""" try: if marker.exists(): marker.unlink() + # Clear TUI and agent target when workspace is deactivated + self.target = "" + try: + self._apply_target_display("") + except Exception: + pass + if self.agent: + try: + # Clear agent's target and any manual override + self.agent.target = "" + try: + if hasattr(self.agent, "_manual_target"): + delattr(self.agent, "_manual_target") + except Exception: + pass + from pentestagent.agents.base_agent import AgentMessage + + self.agent.conversation_history.append( + AgentMessage( + role="system", + content=( + f"Workspace '{active}' deactivated; cleared target" + ), + ) + ) + except Exception: + logging.getLogger(__name__).exception( + "Failed to clear agent target on workspace clear" + ) self._add_system(f"Workspace '{active}' deactivated.") except Exception as e: self._add_system(f"Error deactivating workspace: {e}") From cabae0fcd6f50f3c225e8edcd522634427de0d69 Mon Sep 17 00:00:00 2001 From: giveen Date: Tue, 20 Jan 2026 12:34:08 -0700 Subject: [PATCH 6/8] feat(tui): add persistent header showing runtime/mode/target and keep it updated --- pentestagent/interface/tui.py | 63 +++++++++++++++++++++++++++++++---- 1 file changed, 56 insertions(+), 7 deletions(-) diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index 370f81c..2840fea 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -1228,6 +1228,9 @@ class PentestAgentTUI(App): with Horizontal(id="main-container"): # Chat area (left side) with Vertical(id="chat-area"): + # Persistent header shown above the chat scroll so operator + # always sees runtime/mode/target information. + yield Static("", id="header") yield ScrollableContainer(id="chat-scroll") yield StatusBar(id="status-bar") with Horizontal(id="input-container"): @@ -1380,15 +1383,28 @@ class PentestAgentTUI(App): tools_str += f", +{len(self.all_tools) - 5} more" runtime_str = "Docker" if self.use_docker else "Local" - self._add_system( - f"+ PentestAgent ready\n" - f" Model: {self.model} | Tools: {len(self.all_tools)} | MCP: {mcp_server_count} | RAG: {rag_doc_count}\n" - f" Runtime: {runtime_str} | Mode: Assist (use /agent or /crew for autonomous modes)" - ) + # Update persistent header instead of adding an ad-hoc system + # message to the chat. This keeps the info visible at top. + try: + self._update_header(model_line=( + f"+ PentestAgent ready\n" + f" Model: {self.model} | Tools: {len(self.all_tools)} | MCP: {mcp_server_count} | RAG: {rag_doc_count}\n" + f" Runtime: {runtime_str} | Mode: Assist (use /agent or /crew for autonomous modes)" + )) + except Exception: + # Fallback to previous behavior if header widget isn't available + self._add_system( + f"+ PentestAgent ready\n" + f" Model: {self.model} | Tools: {len(self.all_tools)} | MCP: {mcp_server_count} | RAG: {rag_doc_count}\n" + f" Runtime: {runtime_str} | Mode: Assist (use /agent or /crew for autonomous modes)" + ) - # Show target if provided (but don't auto-start) + # Also update header with target if present if self.target: - self._add_system(f" Target: {self.target}") + try: + self._update_header(target=self.target) + except Exception: + self._add_system(f" Target: {self.target}") except Exception as e: import traceback @@ -2408,6 +2424,39 @@ Be concise. Use the actual data from notes.""" scroll.mount(msg) except Exception: self._add_system(f" Target: {target}") + except Exception as e: + logging.getLogger(__name__).exception("Failed updating in-scroll target display: %s", e) + # Also update the persistent header so the target is always visible + try: + self._update_header(target=target) + except Exception: + pass + + def _update_header(self, model_line: Optional[str] = None, target: Optional[str] = None) -> None: + """Compose and update the persistent header widget.""" + try: + header = self.query_one("#header", Static) + # Build header text from provided pieces or current state + lines: List[str] = [] + if model_line: + lines.append(model_line) + else: + # try to recreate a compact model/runtime line + runtime_str = "Docker" if getattr(self, "use_docker", False) else "Local" + tools_count = len(getattr(self, "all_tools", [])) if hasattr(self, "all_tools") else 0 + lines.append( + f"+ PentestAgent ready\n Model: {getattr(self, 'model', '')} | Tools: {tools_count} | RAG: {getattr(self, 'rag_doc_count', '')}\n Runtime: {runtime_str} | Mode: {getattr(self, '_mode', 'assist')}" + ) + # Ensure target line is present/updated + if target is None: + target = getattr(self, "target", "") + if target: + # append target on its own line + lines.append(f" Target: {target}") + + header.update("\n".join(lines)) + except Exception: + pass except Exception: self._add_system(f" Target: {target}") From 0219d8367fa27c53c23035d564a1a404b54f0083 Mon Sep 17 00:00:00 2001 From: giveen Date: Tue, 20 Jan 2026 12:36:22 -0700 Subject: [PATCH 7/8] fix(tui): avoid mount_before AttributeError by falling back to mount --- pentestagent/interface/tui.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index 2840fea..7555a66 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -1794,7 +1794,12 @@ class PentestAgentTUI(App): first = scroll.children[0] if scroll.children else None msg = SystemMessage(f" Target: {target}") if first: - scroll.mount_before(msg, first) + # Textual's ScrollableContainer may not implement + # mount_before across versions; fall back to mount. + try: + scroll.mount_before(msg, first) + except Exception: + scroll.mount(msg) else: scroll.mount(msg) except Exception as e: @@ -2419,7 +2424,10 @@ Be concise. Use the actual data from notes.""" first = scroll.children[0] if scroll.children else None msg = SystemMessage(f" Target: {target}") if first: - scroll.mount_before(msg, first) + try: + scroll.mount_before(msg, first) + except Exception: + scroll.mount(msg) else: scroll.mount(msg) except Exception: From 9de59f1d0088f4ac54de32d270ff370371ee05ce Mon Sep 17 00:00:00 2001 From: giveen Date: Tue, 20 Jan 2026 12:38:32 -0700 Subject: [PATCH 8/8] refactor(tui): use persistent header for target display; remove in-chat target duplicates --- pentestagent/interface/tui.py | 47 +++++++++-------------------------- 1 file changed, 12 insertions(+), 35 deletions(-) diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index 7555a66..d1ca258 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -1789,28 +1789,13 @@ class PentestAgentTUI(App): updated = True break if not updated: - # Fallback: add system line near top by inserting at beginning + # If we couldn't find an existing banner SystemMessage to + # update, update the persistent header instead of inserting + # additional in-chat system messages to avoid duplicates. try: - first = scroll.children[0] if scroll.children else None - msg = SystemMessage(f" Target: {target}") - if first: - # Textual's ScrollableContainer may not implement - # mount_before across versions; fall back to mount. - try: - scroll.mount_before(msg, first) - except Exception: - scroll.mount(msg) - else: - scroll.mount(msg) - except Exception as e: - logging.getLogger(__name__).exception("Failed to mount target system message: %s", e) - try: - from pentestagent.interface.notifier import notify - - notify("warning", f"TUI: failed to display target: {e}") - except Exception as ne: - logging.getLogger(__name__).exception("Failed to notify operator about target display failure: %s", ne) - self._add_system(f" Target: {target}") + self._update_header(target=target) + except Exception: + logging.getLogger(__name__).exception("Failed to update persistent header with target") except Exception as e: logging.getLogger(__name__).exception("Failed while applying target display: %s", e) try: @@ -2419,22 +2404,14 @@ Be concise. Use the actual data from notes.""" ) updated = True break - if not updated: - try: - first = scroll.children[0] if scroll.children else None - msg = SystemMessage(f" Target: {target}") - if first: - try: - scroll.mount_before(msg, first) - except Exception: - scroll.mount(msg) - else: - scroll.mount(msg) - except Exception: - self._add_system(f" Target: {target}") + if not updated: + try: + self._update_header(target=target) + except Exception: + logging.getLogger(__name__).exception("Failed to update persistent header with target") except Exception as e: logging.getLogger(__name__).exception("Failed updating in-scroll target display: %s", e) - # Also update the persistent header so the target is always visible + # Also update the persistent header so the target is always visible try: self._update_header(target=target) except Exception: