Merge pull request #36 from famez/main

Added interact mode + possibility to enable and disable tools
This commit is contained in:
Masic
2026-03-01 14:23:45 -07:00
committed by GitHub
7 changed files with 444 additions and 31 deletions

3
.gitignore vendored
View File

@@ -93,7 +93,6 @@ tests/test_*.py
# Workspaces directory (user data should not be committed)
/workspaces/
loot/token_usage.json
loot/*
mcp_examples/kali/mcp_servers.json
pentestagent/mcp/mcp_servers.json
loot/notes.json

View File

@@ -241,7 +241,7 @@ class BaseAgent(ABC):
response = await self.llm.generate(
system_prompt=self.get_system_prompt(),
messages=self._format_messages_for_llm(),
tools=self.tools,
tools=[t for t in self.tools if t.t.enabled], #Only, enabled tools.
)
# Case 1: Empty response (Error)
@@ -870,7 +870,8 @@ Call create_plan with the new steps OR feasible=False."""
self.conversation_history.append(AgentMessage(role="user", content=message))
# Filter out 'finish' tool - not needed for single-shot assist mode
assist_tools = [t for t in self.tools if t.name != "finish"]
# Pass to the agent only the enabled tools
assist_tools = [t for t in self.tools if t.name != "finish" and t.enabled]
# Single LLM call with tools available
response = await self.llm.generate(
@@ -944,6 +945,87 @@ Call create_plan with the new steps OR feasible=False."""
self.state_manager.transition_to(AgentState.COMPLETE)
async def interact(self, message: str) -> AsyncIterator[AgentMessage]:
"""
Interactive mode
Args:
message: The user message to respond to
Yields:
AgentMessage objects
"""
self.state_manager.transition_to(AgentState.THINKING)
self.conversation_history.append(AgentMessage(role="user", content=message))
# Filter out 'finish' tool - not needed for interact mode, we read finish_reason
# Pass to the agent only enabled tools.
interact_tools = [t for t in self.tools if t.name != "finish" and t.enabled]
while True:
# Single LLM call with tools available
response = await self.llm.generate(
system_prompt=self.get_system_prompt(mode="interact"),
messages=self._format_messages_for_llm(),
tools=interact_tools,
)
# If LLM wants to use tools, execute and return result
if response.tool_calls:
# Build tool calls list
tool_calls = [
ToolCall(
id=tc.id if hasattr(tc, "id") else str(i),
name=(
tc.function.name
if hasattr(tc, "function")
else tc.get("name", "")
),
arguments=self._parse_arguments(tc),
)
for i, tc in enumerate(response.tool_calls)
]
# Store in history (minimal content to save tokens)
assistant_msg = AgentMessage(
role="assistant", content=response.content or "", tool_calls=tool_calls
)
self.conversation_history.append(assistant_msg)
yield assistant_msg
# NOW execute the tools (this can take a while)
self.state_manager.transition_to(AgentState.EXECUTING)
tool_results = await self._execute_tools(response.tool_calls)
tool_result_msg = AgentMessage(
role="tool_result", content="", tool_results=tool_results
)
self.conversation_history.append(tool_result_msg)
yield tool_result_msg
else:
# Direct response, no tools needed
assistant_msg = AgentMessage(
role="assistant", content=response.content or ""
)
self.conversation_history.append(assistant_msg)
yield assistant_msg
# Check finish_reason to determine if we should return control to user
finish_reason = getattr(response, 'finish_reason', None)
if finish_reason == 'length':
# Context limit reached
break
elif finish_reason == 'stop':
# Natural stop - LLM finished responding
break
elif finish_reason == 'tool_calls':
# LLM wants to use tools
pass
self.state_manager.transition_to(AgentState.COMPLETE)
def _format_tool_results(self, results: List[ToolResult]) -> str:
"""Format tool results as a simple response."""
parts = []

View File

@@ -3,7 +3,7 @@
from typing import TYPE_CHECKING, List, Optional
from ..base_agent import BaseAgent
from ..prompts import pa_agent, pa_assist
from ..prompts import pa_agent, pa_assist, pa_interact
if TYPE_CHECKING:
from ...knowledge import RAGEngine
@@ -132,7 +132,12 @@ class PentestAgentAgent(BaseAgent):
env = self.runtime.environment
# Select template based on mode
template = pa_assist if mode == "assist" else pa_agent
if mode == "assist":
template = pa_assist
elif mode == "interact":
template = pa_interact
else:
template = pa_agent
return template.render(
target=self.target,

View File

@@ -24,3 +24,4 @@ def load_prompt(name: str) -> Template:
pa_agent = load_prompt("pa_agent")
pa_assist = load_prompt("pa_assist")
pa_crew = load_prompt("pa_crew")
pa_interact = load_prompt("pa_interact")

View File

@@ -0,0 +1,128 @@
# PentestAgent Interactive Red Team Assistant
You are PentestAgent, an interactive penetration testing assistant.
## Behavior
Your primary goal is to assist the user conversationally during a penetration test.
You MUST:
- Ask clarifying questions when needed.
- Think out loud (to the user) about strategies, ideas, attack paths, risks, hypotheses.
- Suggest tools, commands, or approaches proactively.
- Use tools ONLY when explicitly needed to progress or when the user requests it.
- Before using a tool, explain why you want to use it and ask the user if they agree.
- After tool outputs, interpret the results for the user, propose next steps, and continue the dialogue.
## Penetration Testing Authorization
You are operating in an authorized, legal penetration test.
Do NOT ask again for authorization. Permission is granted.
## Decision Policy for Tools
Follow this rule:
1. **Talk first.**
- Explain your reasoning.
- Suggest ideas.
2. **Use a tool ONLY IF:**
- The user explicitly asks to run it, OR
- Running it is the logical next step AND you have told the user your intention.
3. **After tool use:**
- Summarize findings.
- Offer next steps.
- Continue in conversational mode.
## Interactivity Rules
You MUST remain conversational:
- Ask questions such as: “Do you want me to run a scan?”, “Which target would you prefer to scan now?””
- Guide the user through the pentest.
- Propose attack paths, hypotheses, and options.
## Notes Handling Policy (Critical)
- You MUST treat stored notes as historical and append-only.
- You MUST NEVER overwrite, delete, or destructively update previous notes.
- Use the Notes tool **only** through its API (you are agnostic to any underlying file path or `notes.json` file).
- At the start of each session, you MUST:
- `list` all available notes
- `read` relevant notes
- Summarize prior context
- When adding information:
- Always append as a new note (`create`).
- If a key already exists, create a new versioned key rather than updating.
- Destructive actions (`delete`, destructive `update`) are **forbidden** unless the user explicitly instructs otherwise.
## Previous Session Recovery (Artifacts & Reports)
At the beginning of every session:
1. Attempt to inspect `loot/artifacts/` using available tools.
- If you cannot list files, ask the user to provide a directory listing.
- Summarize any useful artifacts (screenshots, dumps, captures, configs, etc.).
2. Attempt to inspect `loot/reports/`.
- If tools permit, list and summarize existing reports.
- If tools are not available, ask the user to provide the latest report or summary.
3. After gathering notes, artifacts, and report context, ask the user:
- Whether they want to resume from the previous session context,
- Or start a new phase.
You MUST NOT modify or delete artifacts or reports unless explicitly instructed.
## Session Initialization Procedure
Before performing any pentesting actions:
1. Use the Notes tool to load existing context (`list` → `read` relevant entries).
2. Inspect artifacts.
3. Inspect reports.
4. Present a concise summary of findings.
5. Ask the user how they want to proceed (resume or start new).
Proceed only after the user responds.
{% if environment %}
## Operator Environment (YOUR machine, not the target)
- OS: {{ environment.os }} ({{ environment.os_version }})
- Architecture: {{ environment.architecture }}
- Shell: {{ environment.shell }}
{% endif %}
## Tools
{% for tool in tools %}
- **{{ tool.name }}**: {{ tool.description }}
{% endfor %}
{% if environment and environment.available_tools %}
## Available CLI Tools
{% for tool in environment.available_tools %}
• {{ tool.name }}{% if tool.category %} ({{ tool.category }}){% endif %}
{% endfor %}
{% endif %}
{% if environment %}
## Output Directories
- loot/notes.json (working notes)
- loot/reports/ (generated reports)
- loot/artifacts/ (screenshots, captured files)
{% endif %}
{% if target %}
## Target
{{ target }}
{% endif %}
{% if scope %}
Scope: {{ scope | join(', ') }}
{% endif %}
{% if rag_context %}
## Context
{{ rag_context }}
{% endif %}
{% if notes_context %}
## Saved Notes (from previous tasks)
(Notes may be truncated. Use `notes(action='read', key='...')` to see full content if needed.)
{{ notes_context }}
{% endif %}

View File

@@ -159,14 +159,16 @@ class HelpScreen(ModalScreen):
def _get_help_text(self) -> str:
header = (
"[bold]Modes:[/] Assist | Agent | Crew\n"
"[bold]Modes:[/] Assist | Agent | Crew | Interact\n"
"[bold]Keys:[/] Enter=Send Up/Down=History Ctrl+Q=Quit\n\n"
"[bold]Commands:[/]\n"
)
cmds = [
("/assist <task>", "Run in assist mode"),
("/agent <task>", "Run in agent mode"),
("/crew <task>", "Run multi-agent crew mode"),
("/interact <task>", "Run in interact mode"),
("/target <host>", "Set target"),
("/prompt", "Show system prompt"),
("/memory", "Show memory stats"),
@@ -341,10 +343,14 @@ class ToolsScreen(ModalScreen):
CSS = """
ToolsScreen { align: center middle; }
"""
from ..tools import Tool
def __init__(self, tools: List[Any]) -> None:
def __init__(self, tools: List[Tool], tui: "PentestAgentTUI") -> None:
from ..tools import Tool
super().__init__()
self.tools = tools
self.selected_tool: Optional[Tool] = None
self.tui = tui
def compose(self) -> ComposeResult:
# Build a split view: left tree, right description
@@ -355,6 +361,7 @@ class ToolsScreen(ModalScreen):
yield Tree("TOOLS", id="tools-tree")
with Vertical(id="tools-right"):
yield Button("Enabled: 🔴", id="tool-toggle-enabled")
yield Static("Description", id="tools-desc-title")
yield ScrollableContainer(Static("Select a tool to view details.", id="tools-desc"), id="tools-desc-scroll")
@@ -382,6 +389,10 @@ class ToolsScreen(ModalScreen):
name = getattr(t, "name", str(t))
root.add(name, data={"tool": t})
# Hide toggle button initially
toggle_btn = self.query_one("#tool-toggle-enabled", Button)
toggle_btn.display = False
try:
tree.focus()
except Exception as e:
@@ -398,10 +409,12 @@ class ToolsScreen(ModalScreen):
node = event.node
try:
tool = node.data.get("tool") if node.data else None
name = node.label or (getattr(tool, "name", str(tool)) if tool else "Unknown")
self.selected_tool = tool
name = getattr(tool, "name", str(tool)) if tool else "Unknown"
# Prefer Tool.description (registered tools use this), then fall back
desc = None
tool_enabled = False
if tool is not None:
desc = getattr(tool, "description", None)
if not desc:
@@ -410,16 +423,29 @@ class ToolsScreen(ModalScreen):
or getattr(tool, "help_text", None)
or getattr(tool, "__doc__", None)
)
tool_enabled = getattr(tool, "enabled", False)
if not desc:
desc = "No description available."
# Update right-hand description pane
try:
desc_widget = self.query_one("#tools-desc", Static)
toggle_btn = self.query_one("#tool-toggle-enabled", Button)
text = Text()
text.append(f"{name}\n", style="bold #d4d4d4")
text.append(str(desc), style="#d4d4d4")
desc_widget.update(text)
if tool:
enabled_icon = "🟢" if tool_enabled else "🔴"
toggle_btn.display = True
toggle_btn.label = f"Enabled: {enabled_icon}"
else:
toggle_btn.display = False
except Exception as e:
logging.getLogger(__name__).exception("Failed to update tool description pane: %s", e)
try:
@@ -442,6 +468,39 @@ class ToolsScreen(ModalScreen):
self.app.pop_screen()
# ------------------------------------------------------------
@on(Button.Pressed, "#tool-toggle-enabled")
async def toggle_enabled(self) -> None:
try:
tool = self.selected_tool
if tool is None:
return
tool.enabled = not tool.enabled
await self._refresh_tool_widget()
except Exception as e:
logging.getLogger(__name__).exception("Toggle failed: %s", e)
try:
from ..interface.notifier import notify
# Make best effort naming
tool_name = getattr(tool, "name", None) or (tool.get("name") if isinstance(tool, dict) else str(tool))
notify("error", f"Failed to toggle tool {tool_name}: {e}")
except Exception:
pass
return
async def _refresh_tool_widget(self) -> None:
if self.selected_tool:
# Simulate reselecting same node
dummy = type("Node", (), {"data": {"tool": self.selected_tool}})
event = type("Evt", (), {"node": dummy})
self.on_tool_selected(event)
self.tui._update_header()
class MCPScreen(ModalScreen):
"""Interactive MCP browser — split-pane layout."""
@@ -835,6 +894,8 @@ class StatusBar(Static):
text.append(" :: Crew ", style="#9a9a9a")
elif self.mode == "agent":
text.append(" >> Agent ", style="#9a9a9a")
elif self.mode == "interact":
text.append(" >> Interact ", style="#9a9a9a")
else:
text.append(" >> Assist ", style="#9a9a9a")
@@ -1412,7 +1473,7 @@ class PentestAgentTUI(App):
self.rag_engine = None # RAG engine
# State
self._mode = "assist" # "assist", "agent", or "crew"
self._mode = "assist" # "assist", "agent", "crew" or "interact"
self._is_running = False
self._is_initializing = True # Block input during init
self._should_stop = False
@@ -1482,6 +1543,10 @@ class PentestAgentTUI(App):
notify("warning", f"TUI: failed to register notifier callback: {e}")
except Exception as ne:
logging.getLogger(__name__).exception("Failed to notify operator about notifier registration failure: %s", ne)
#Added, because the _notes variable is not properly loaded at the beginning and the notes.json is recreated unless you do a /notes command explicitely.
from ..tools.notes import get_all_notes
await get_all_notes()
# Call the textual worker - decorator returns a Worker, not a coroutine
_ = cast(Any, self._initialize_agent())
@@ -1747,7 +1812,7 @@ class PentestAgentTUI(App):
def _show_system_prompt(self) -> None:
"""Display the current system prompt"""
if self.agent:
prompt = self.agent.get_system_prompt()
prompt = self.agent.get_system_prompt(self._mode)
self._add_system(f"=== System Prompt ===\n{prompt}")
else:
self._add_system("Agent not initialized")
@@ -2184,14 +2249,21 @@ Be concise. Use the actual data from notes."""
return
self._add_user(message)
# Hide crew sidebar when entering assist mode
self._hide_sidebar()
# Use assist mode by default
if self.agent and not self._is_running:
# Schedule assist run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
self._current_worker = self._run_assist(message)
if self._mode == "assist":
# Schedule assist run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
self._hide_sidebar()
self._current_worker = self._run_assist(message)
elif self._mode == "interact":
self._hide_sidebar()
self._current_worker = self._run_interact(message)
elif self._mode == "agent":
self._hide_sidebar()
self._current_worker = self._run_agent_mode(message)
elif self._mode == "crew":
self._current_worker = self._run_crew_mode(message)
async def _handle_command(self, cmd: str) -> None:
"""Handle slash commands"""
@@ -2211,7 +2283,8 @@ Be concise. Use the actual data from notes."""
elif cmd_lower == "/tools":
# Open the interactive tools browser (split-pane).
try:
await self.push_screen(ToolsScreen(tools=self.agent.get_tools()))
if self.agent:
await self.push_screen(ToolsScreen(tools=self.agent.get_tools(), tui=self))
except Exception:
# Fallback: list tools in the system area if UI push fails
from ..runtime.runtime import detect_environment
@@ -2239,7 +2312,6 @@ Be concise. Use the actual data from notes."""
)
self._add_system(msg)
elif cmd_lower.startswith("/mcp"):
await self._parse_mcp_command(cmd_original)
elif cmd_lower in ["/quit", "/exit", "/q"]:
@@ -2484,14 +2556,24 @@ Be concise. Use the actual data from notes."""
await self._parse_agent_command(cmd_original)
elif cmd_original.startswith("/crew"):
await self._parse_crew_command(cmd_original)
elif cmd_original.startswith("/interact"):
await self._parse_interact_command(cmd_original)
elif cmd_original.startswith("/assist"):
await self._parse_assist_command(cmd_original)
else:
self._add_system(f"Unknown command: {cmd}\nType /help for commands.")
async def _parse_agent_command(self, cmd: str) -> None:
"""Parse and execute /agent command"""
self._set_status("idle", "agent")
self._update_header()
self._add_system(
"Changed to agent mode\n"
)
# Remove /agent prefix
rest = cmd[6:].strip()
rest = cmd[len("/agent"):].strip()
if not rest:
self._add_system(
@@ -2520,8 +2602,15 @@ Be concise. Use the actual data from notes."""
async def _parse_crew_command(self, cmd: str) -> None:
"""Parse and execute /crew command"""
self._set_status("idle", "crew")
self._update_header()
self._add_system(
"Changed to crew mode\n"
)
# Remove /crew prefix
rest = cmd[5:].strip()
rest = cmd[len("/crew"):].strip()
if not rest:
self._add_system(
@@ -2548,6 +2637,48 @@ Be concise. Use the actual data from notes."""
# Schedule crew run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
self._current_worker = self._run_crew_mode(target)
async def _parse_interact_command(self, cmd: str) -> None:
# Use interact mode by default
self._set_status("idle", "interact")
self._update_header()
self._add_system(
"Changed to interact mode\n"
)
message = cmd[len("/interact"):].strip()
if not message:
self._add_system(
"Usage: /interact <task>\n"
"Example: /interact Can you help me recon the target site?\n"
)
return
if self.agent and not self._is_running:
# Schedule interact run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
self._current_worker = self._run_interact(message)
async def _parse_assist_command(self, cmd: str) -> None:
# Use assist mode by default
self._set_status("idle", "assist")
self._update_header()
self._add_system(
"Changed to assist mode\n"
)
message = cmd[len("/assist"):].strip()
if not message:
self._add_system(
"Usage: /assist <task>\n"
"Example: /assist Can you help me recon the target site?\n"
)
return
if self.agent and not self._is_running:
# Schedule assist run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
self._current_worker = self._run_assist(message)
async def _parse_mcp_command(self, cmd: str) -> None:
# Remove /agent prefix
rest = cmd[len("/mcp"):].strip()
@@ -2755,10 +2886,11 @@ Be concise. Use the actual data from notes."""
else:
# try to recreate a compact model/runtime line
runtime_str = "Docker" if getattr(self, "use_docker", False) else "Local"
tools_count = len(self.agent.get_tools())
mode = getattr(self, '_mode', 'assist')
if mode == 'assist':
mode = 'assist (use /agent or /crew for autonomous modes)'
tools_count = 0
if self.agent:
tools_count = len([t for t in self.agent.get_tools() if t.enabled])
mode = getattr(self, '_mode', "")
mode += ' (use /assist for single tool execution, /agent or /crew for autonomous modes, /interact for interactive chat)'
lines.append(
f"+ PentestAgent ready\n Model: {getattr(self, 'model', '')} | Tools: {tools_count} | MCP: {getattr(self, 'mcp_server_count', '')} | RAG: {getattr(self, 'rag_doc_count', '')}\n Runtime: {runtime_str} | Mode: {mode}"
)
@@ -3202,6 +3334,68 @@ Be concise. Use the actual data from notes."""
finally:
self._is_running = False
@work(thread=False)
async def _run_interact(self, message: str) -> None:
"""Run in interact mode"""
if not self.agent:
self._add_system("[!] Agent not ready")
return
self._is_running = True
self._should_stop = False
self._set_status("thinking", "interact")
#Use to track the tool message with the result.
tool_messages_mapping: dict[str, ToolMessage] = {}
try:
async for response in self.agent.interact(message):
if self._should_stop:
self._add_system("[!] Stopped by user")
break
self._set_status("processing")
# Show thinking/plan FIRST if there's content with tool calls
if response.content:
content = response.content.strip()
if response.tool_calls:
self._add_thinking(content)
else:
self._add_assistant(content)
# Show tool calls (skip 'finish' - internal control)
if response.tool_calls:
for call in response.tool_calls:
args_str = str(call.arguments)
tool_messages_mapping[call.id] = self._add_tool(call.name, args_str)
# Show tool results (displayed after execution completes)
if response.tool_results:
for result in response.tool_results:
if result.tool_call_id not in tool_messages_mapping:
continue
if result.success:
self._add_tool_result(tool_messages_mapping[result.tool_call_id],
result.tool_name, result.result or "Done"
)
else:
self._add_tool_result(tool_messages_mapping[result.tool_call_id],
result.tool_name, f"Error: {result.error}"
)
self._set_status("idle", "interact")
except asyncio.CancelledError:
self._add_system("[!] Cancelled")
self._set_status("idle", "interact")
except Exception as e:
self._add_system(f"[!] Error: {e}")
self._set_status("error")
finally:
self._is_running = False
@work(thread=False)
async def _run_assist(self, message: str) -> None:
"""Run in assist mode - single response"""

View File

@@ -148,12 +148,12 @@ def _validate_note_schema(category: str, metadata: Dict[str, Any]) -> str | None
@register_tool(
name="notes",
description="Manage persistent notes for key findings. Actions: create, read, update, delete, list.",
description="Manage persistent notes for key findings. Actions: create, read, update, delete, list (2 options, all or the truncated text to 60 characters).",
schema=ToolSchema(
properties={
"action": {
"type": "string",
"enum": ["create", "read", "update", "delete", "list"],
"enum": ["create", "read", "update", "delete", "list_all", "list_truncated"],
"description": "The action to perform",
},
"key": {
@@ -399,7 +399,7 @@ async def notes(arguments: dict, runtime) -> str:
_save_notes_unlocked()
return f"Deleted note '{key}'"
elif action == "list":
elif action == "list_all" or action == "list_truncated":
if not _notes:
return "No notes saved"
@@ -417,9 +417,13 @@ async def notes(arguments: dict, runtime) -> str:
lines.append(f"\n## {cat.title()}")
for k, v in by_category[cat]:
content = v["content"]
display_val = (
content if len(content) <= 60 else content[:57] + "..."
)
display_val = content
if action == "list_truncated":
display_val = (
content if len(content) <= 60 else content[:57] + "..."
)
conf = v.get("confidence", "medium")
lines.append(f" [{k}] ({conf}) {display_val}")