Merge pull request #20 from giveen/bug-fix

Bug fix
This commit is contained in:
Masic
2026-01-20 21:40:15 -07:00
committed by GitHub
4 changed files with 266 additions and 38 deletions

3
.gitignore vendored
View File

@@ -90,6 +90,9 @@ tests/tmp/
tests/*.local.py
scripts/test_*.sh
*.test.sh
# Ignore all test_*.py scripts in tests/
tests/test_*.py
*.test.sh
# Workspaces directory (user data should not be committed)
/workspaces/

View File

@@ -489,7 +489,53 @@ class BaseAgent(ABC):
)
)
else:
result = await tool.execute(arguments, self.runtime)
# If the tool is the generic terminal fallback but the LLM
# requested a specific tool name (e.g. 'nmap'), construct
# a sensible command string from the provided arguments
# so the terminal tool can execute it. This handles
# LLMs that call semantic tool names which aren't
# registered as explicit tools.
if tool.name == "terminal" and name != "terminal":
# If the LLM already provided a raw 'command', prefer it
if isinstance(arguments, dict) and "command" in arguments:
terminal_args = arguments
else:
# Build a best-effort command: tool name + positional
# argument values joined by space. This is intentionally
# simple to avoid over-guessing flag semantics.
cmd_parts = [name]
if isinstance(arguments, dict):
# Prefer common keys like 'target' or 'hosts' first
for k in ("target", "host", "hosts", "hosts_list", "hosts[]"):
if k in arguments:
v = arguments[k]
if isinstance(v, (list, tuple)):
cmd_parts.extend([str(x) for x in v])
else:
cmd_parts.append(str(v))
# Append any remaining values
for k, v in arguments.items():
if k in ("target", "host", "hosts", "hosts_list", "hosts[]"):
continue
if v is True:
cmd_parts.append(f"--{k}")
elif v is False or v is None:
continue
elif isinstance(v, (list, tuple)):
cmd_parts.extend([str(x) for x in v])
else:
cmd_parts.append(str(v))
elif isinstance(arguments, (list, tuple)):
cmd_parts.extend([str(x) for x in arguments])
else:
# Fallback: append as single positional
cmd_parts.append(str(arguments))
terminal_args = {"command": " ".join(cmd_parts)}
result = await tool.execute(terminal_args, self.runtime)
else:
result = await tool.execute(arguments, self.runtime)
results.append(
ToolResult(
tool_call_id=tool_call_id,

View File

@@ -1228,6 +1228,9 @@ class PentestAgentTUI(App):
with Horizontal(id="main-container"):
# Chat area (left side)
with Vertical(id="chat-area"):
# Persistent header shown above the chat scroll so operator
# always sees runtime/mode/target information.
yield Static("", id="header")
yield ScrollableContainer(id="chat-scroll")
yield StatusBar(id="status-bar")
with Horizontal(id="input-container"):
@@ -1380,15 +1383,28 @@ class PentestAgentTUI(App):
tools_str += f", +{len(self.all_tools) - 5} more"
runtime_str = "Docker" if self.use_docker else "Local"
self._add_system(
f"+ PentestAgent ready\n"
f" Model: {self.model} | Tools: {len(self.all_tools)} | MCP: {mcp_server_count} | RAG: {rag_doc_count}\n"
f" Runtime: {runtime_str} | Mode: Assist (use /agent or /crew for autonomous modes)"
)
# Update persistent header instead of adding an ad-hoc system
# message to the chat. This keeps the info visible at top.
try:
self._update_header(model_line=(
f"+ PentestAgent ready\n"
f" Model: {self.model} | Tools: {len(self.all_tools)} | MCP: {mcp_server_count} | RAG: {rag_doc_count}\n"
f" Runtime: {runtime_str} | Mode: Assist (use /agent or /crew for autonomous modes)"
))
except Exception:
# Fallback to previous behavior if header widget isn't available
self._add_system(
f"+ PentestAgent ready\n"
f" Model: {self.model} | Tools: {len(self.all_tools)} | MCP: {mcp_server_count} | RAG: {rag_doc_count}\n"
f" Runtime: {runtime_str} | Mode: Assist (use /agent or /crew for autonomous modes)"
)
# Show target if provided (but don't auto-start)
# Also update header with target if present
if self.target:
self._add_system(f" Target: {self.target}")
try:
self._update_header(target=self.target)
except Exception:
self._add_system(f" Target: {self.target}")
except Exception as e:
import traceback
@@ -1669,6 +1685,25 @@ class PentestAgentTUI(App):
# Update agent's target if agent exists
if self.agent:
self.agent.target = target
try:
from pentestagent.agents.base_agent import AgentMessage
# Inform the agent (LLM) that the operator changed the target so
# subsequent generations use the new value instead of older
# workspace-bound targets from conversation history.
self.agent.conversation_history.append(
AgentMessage(role="system", content=f"Operator set target to {target}")
)
# Track manual target override so workspace restores can remove
# or supersede this message when appropriate.
try:
setattr(self.agent, "_manual_target", target)
except Exception:
pass
except Exception as e:
logging.getLogger(__name__).exception(
"Failed to append target change to agent history: %s", e
)
# Persist to active workspace if present
try:
@@ -1754,23 +1789,13 @@ class PentestAgentTUI(App):
updated = True
break
if not updated:
# Fallback: add system line near top by inserting at beginning
# If we couldn't find an existing banner SystemMessage to
# update, update the persistent header instead of inserting
# additional in-chat system messages to avoid duplicates.
try:
first = scroll.children[0] if scroll.children else None
msg = SystemMessage(f" Target: {target}")
if first:
scroll.mount_before(msg, first)
else:
scroll.mount(msg)
except Exception as e:
logging.getLogger(__name__).exception("Failed to mount target system message: %s", e)
try:
from pentestagent.interface.notifier import notify
notify("warning", f"TUI: failed to display target: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about target display failure: %s", ne)
self._add_system(f" Target: {target}")
self._update_header(target=target)
except Exception:
logging.getLogger(__name__).exception("Failed to update persistent header with target")
except Exception as e:
logging.getLogger(__name__).exception("Failed while applying target display: %s", e)
try:
@@ -1919,8 +1944,8 @@ Be concise. Use the actual data from notes."""
# Use assist mode by default
if self.agent and not self._is_running:
# Schedule assist run and keep task handle
self._current_worker = asyncio.create_task(self._run_assist(message))
# Schedule assist run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
self._current_worker = self._run_assist(message)
async def _handle_command(self, cmd: str) -> None:
"""Handle slash commands"""
@@ -2007,6 +2032,39 @@ Be concise. Use the actual data from notes."""
self.target = last
if self.agent:
self.agent.target = last
# If the operator set a manual target while no
# workspace was active, remove/supersede that
# system message so the LLM uses the workspace
# target instead of the stale manual one.
try:
manual = getattr(self.agent, "_manual_target", None)
if manual:
self.agent.conversation_history = [
m
for m in self.agent.conversation_history
if not (
m.role == "system"
and isinstance(m.content, str)
and m.content.strip().startswith("Operator set target to")
)
]
try:
delattr(self.agent, "_manual_target")
except Exception:
pass
except Exception:
pass
try:
from pentestagent.agents.base_agent import AgentMessage
self.agent.conversation_history.append(
AgentMessage(
role="system",
content=f"Workspace active; restored last target: {last}",
)
)
except Exception:
pass
try:
self._apply_target_display(last)
except Exception as e:
@@ -2104,6 +2162,35 @@ Be concise. Use the actual data from notes."""
try:
if marker.exists():
marker.unlink()
# Clear TUI and agent target when workspace is deactivated
self.target = ""
try:
self._apply_target_display("")
except Exception:
pass
if self.agent:
try:
# Clear agent's target and any manual override
self.agent.target = ""
try:
if hasattr(self.agent, "_manual_target"):
delattr(self.agent, "_manual_target")
except Exception:
pass
from pentestagent.agents.base_agent import AgentMessage
self.agent.conversation_history.append(
AgentMessage(
role="system",
content=(
f"Workspace '{active}' deactivated; cleared target"
),
)
)
except Exception:
logging.getLogger(__name__).exception(
"Failed to clear agent target on workspace clear"
)
self._add_system(f"Workspace '{active}' deactivated.")
except Exception as e:
self._add_system(f"Error deactivating workspace: {e}")
@@ -2179,7 +2266,8 @@ Be concise. Use the actual data from notes."""
if self.agent and not self._is_running:
# Schedule agent mode and keep task handle
self._current_worker = asyncio.create_task(self._run_agent_mode(task))
# Schedule agent run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
self._current_worker = self._run_agent_mode(task)
async def _parse_crew_command(self, cmd: str) -> None:
"""Parse and execute /crew command"""
@@ -2208,7 +2296,8 @@ Be concise. Use the actual data from notes."""
self._add_user(f"/crew {target}")
self._show_sidebar()
# Schedule crew mode and keep handle
self._current_worker = asyncio.create_task(self._run_crew_mode(target))
# Schedule crew run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
self._current_worker = self._run_crew_mode(target)
def _show_sidebar(self) -> None:
"""Show the sidebar for crew mode."""
@@ -2315,16 +2404,44 @@ Be concise. Use the actual data from notes."""
)
updated = True
break
if not updated:
try:
first = scroll.children[0] if scroll.children else None
msg = SystemMessage(f" Target: {target}")
if first:
scroll.mount_before(msg, first)
else:
scroll.mount(msg)
except Exception:
self._add_system(f" Target: {target}")
if not updated:
try:
self._update_header(target=target)
except Exception:
logging.getLogger(__name__).exception("Failed to update persistent header with target")
except Exception as e:
logging.getLogger(__name__).exception("Failed updating in-scroll target display: %s", e)
# Also update the persistent header so the target is always visible
try:
self._update_header(target=target)
except Exception:
pass
def _update_header(self, model_line: Optional[str] = None, target: Optional[str] = None) -> None:
"""Compose and update the persistent header widget."""
try:
header = self.query_one("#header", Static)
# Build header text from provided pieces or current state
lines: List[str] = []
if model_line:
lines.append(model_line)
else:
# try to recreate a compact model/runtime line
runtime_str = "Docker" if getattr(self, "use_docker", False) else "Local"
tools_count = len(getattr(self, "all_tools", [])) if hasattr(self, "all_tools") else 0
lines.append(
f"+ PentestAgent ready\n Model: {getattr(self, 'model', '')} | Tools: {tools_count} | RAG: {getattr(self, 'rag_doc_count', '')}\n Runtime: {runtime_str} | Mode: {getattr(self, '_mode', 'assist')}"
)
# Ensure target line is present/updated
if target is None:
target = getattr(self, "target", "")
if target:
# append target on its own line
lines.append(f" Target: {target}")
header.update("\n".join(lines))
except Exception:
pass
except Exception:
self._add_system(f" Target: {target}")

View File

@@ -52,6 +52,68 @@ async def terminal(arguments: dict, runtime: "Runtime") -> str:
else:
full_command = command
# If the provided command appears to be flags-only (starts with '-')
# it's likely the LLM intended to call a specific tool (e.g. `nmap`)
# but omitted the binary name. Detect this pattern and prefix with
# the `nmap` binary when available in the runtime environment.
try:
cmd_prefix = ""
cmd_rest = full_command
if "&&" in full_command:
left, right = full_command.split("&&", 1)
cmd_prefix = left + "&& "
cmd_rest = right
if cmd_rest.lstrip().startswith("-"):
try:
available = getattr(runtime, "environment", None)
tool_names = [t.name for t in available.available_tools] if available else []
except Exception:
tool_names = []
# Heuristic mapping of common flags to likely binaries
import re
lower = cmd_rest.lower()
chosen = None
# nmap indicators
if re.search(r"\b-p\b|\b-p[0-9]|-s[sv]|\b-on\b|\b-o[nag]\b", lower):
if "nmap" in tool_names:
chosen = "nmap"
# gobuster indicators (wordlist, dirscan)
if not chosen and re.search(r"\b-w\b|--wordlist|\b-u\b|--url", lower):
if "gobuster" in tool_names:
chosen = "gobuster"
# rustscan/masscan indicators (large-port ranges, fast scan)
if not chosen and re.search(r"\b--rate\b|\b--ping\b|\b--range\b|\b-z\b", lower):
for alt in ("rustscan", "masscan"):
if alt in tool_names:
chosen = alt
break
# web fetchers
if not chosen and re.search(r"https?://", cmd_rest):
for alt in ("curl", "wget"):
if alt in tool_names:
chosen = alt
break
# Fallback prefer nmap if present, then first interesting tool
if not chosen:
for alt in ("nmap", "rustscan", "masscan", "gobuster", "nikto", "ffuf", "dirb", "curl", "wget"):
if alt in tool_names:
chosen = alt
break
if chosen:
full_command = cmd_prefix + chosen + " " + cmd_rest.lstrip()
except Exception:
# Best-effort; do not fail if introspection errors occur
pass
result = await runtime.execute_command(full_command, timeout=timeout)
# Format the output