mirror of
https://github.com/GH05TCREW/pentestagent.git
synced 2026-03-06 22:04:08 +00:00
feat: Added interactive mode (#1)
* Adding interactive mode. * Added content of /loot dir to .gitignore. * When writing just a mode on the TUI, it will automatically switch to that mode (no need to prepend /agent, /assist, /interact or /crew before.
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@@ -93,7 +93,6 @@ tests/test_*.py
|
||||
|
||||
# Workspaces directory (user data should not be committed)
|
||||
/workspaces/
|
||||
loot/token_usage.json
|
||||
loot/*
|
||||
mcp_examples/kali/mcp_servers.json
|
||||
pentestagent/mcp/mcp_servers.json
|
||||
loot/notes.json
|
||||
|
||||
@@ -944,6 +944,86 @@ Call create_plan with the new steps OR feasible=False."""
|
||||
|
||||
self.state_manager.transition_to(AgentState.COMPLETE)
|
||||
|
||||
async def interact(self, message: str) -> AsyncIterator[AgentMessage]:
|
||||
"""
|
||||
Interactive mode
|
||||
|
||||
Args:
|
||||
message: The user message to respond to
|
||||
|
||||
Yields:
|
||||
AgentMessage objects
|
||||
"""
|
||||
self.state_manager.transition_to(AgentState.THINKING)
|
||||
self.conversation_history.append(AgentMessage(role="user", content=message))
|
||||
|
||||
# Filter out 'finish' tool - not needed for single-shot assist mode
|
||||
assist_tools = [t for t in self.tools if t.name != "finish"]
|
||||
|
||||
|
||||
while True:
|
||||
# Single LLM call with tools available
|
||||
response = await self.llm.generate(
|
||||
system_prompt=self.get_system_prompt(mode="interact"),
|
||||
messages=self._format_messages_for_llm(),
|
||||
tools=assist_tools,
|
||||
)
|
||||
|
||||
# If LLM wants to use tools, execute and return result
|
||||
if response.tool_calls:
|
||||
# Build tool calls list
|
||||
tool_calls = [
|
||||
ToolCall(
|
||||
id=tc.id if hasattr(tc, "id") else str(i),
|
||||
name=(
|
||||
tc.function.name
|
||||
if hasattr(tc, "function")
|
||||
else tc.get("name", "")
|
||||
),
|
||||
arguments=self._parse_arguments(tc),
|
||||
)
|
||||
for i, tc in enumerate(response.tool_calls)
|
||||
]
|
||||
|
||||
# Store in history (minimal content to save tokens)
|
||||
assistant_msg = AgentMessage(
|
||||
role="assistant", content=response.content or "", tool_calls=tool_calls
|
||||
)
|
||||
self.conversation_history.append(assistant_msg)
|
||||
yield assistant_msg
|
||||
|
||||
# NOW execute the tools (this can take a while)
|
||||
self.state_manager.transition_to(AgentState.EXECUTING)
|
||||
tool_results = await self._execute_tools(response.tool_calls)
|
||||
|
||||
tool_result_msg = AgentMessage(
|
||||
role="tool_result", content="", tool_results=tool_results
|
||||
)
|
||||
self.conversation_history.append(tool_result_msg)
|
||||
yield tool_result_msg
|
||||
|
||||
else:
|
||||
# Direct response, no tools needed
|
||||
assistant_msg = AgentMessage(
|
||||
role="assistant", content=response.content or ""
|
||||
)
|
||||
self.conversation_history.append(assistant_msg)
|
||||
yield assistant_msg
|
||||
|
||||
# Check finish_reason to determine if we should return control to user
|
||||
finish_reason = getattr(response, 'finish_reason', None)
|
||||
if finish_reason == 'length':
|
||||
# Context limit reached
|
||||
break
|
||||
elif finish_reason == 'stop':
|
||||
# Natural stop - LLM finished responding
|
||||
break
|
||||
elif finish_reason == 'tool_calls':
|
||||
# LLM wants to use tools
|
||||
pass
|
||||
|
||||
self.state_manager.transition_to(AgentState.COMPLETE)
|
||||
|
||||
def _format_tool_results(self, results: List[ToolResult]) -> str:
|
||||
"""Format tool results as a simple response."""
|
||||
parts = []
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
from typing import TYPE_CHECKING, List, Optional
|
||||
|
||||
from ..base_agent import BaseAgent
|
||||
from ..prompts import pa_agent, pa_assist
|
||||
from ..prompts import pa_agent, pa_assist, pa_interact
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ...knowledge import RAGEngine
|
||||
@@ -132,7 +132,12 @@ class PentestAgentAgent(BaseAgent):
|
||||
env = self.runtime.environment
|
||||
|
||||
# Select template based on mode
|
||||
template = pa_assist if mode == "assist" else pa_agent
|
||||
if mode == "assist":
|
||||
template = pa_assist
|
||||
elif mode == "interact":
|
||||
template = pa_interact
|
||||
else:
|
||||
template = pa_agent
|
||||
|
||||
return template.render(
|
||||
target=self.target,
|
||||
|
||||
@@ -24,3 +24,4 @@ def load_prompt(name: str) -> Template:
|
||||
pa_agent = load_prompt("pa_agent")
|
||||
pa_assist = load_prompt("pa_assist")
|
||||
pa_crew = load_prompt("pa_crew")
|
||||
pa_interact = load_prompt("pa_interact")
|
||||
85
pentestagent/agents/prompts/pa_interact.jinja
Normal file
85
pentestagent/agents/prompts/pa_interact.jinja
Normal file
@@ -0,0 +1,85 @@
|
||||
# PentestAgent – Interactive Red Team Assistant
|
||||
|
||||
You are PentestAgent, an interactive penetration testing assistant.
|
||||
|
||||
## Behavior
|
||||
Your primary goal is to assist the user conversationally during a penetration test.
|
||||
You MUST:
|
||||
- Ask clarifying questions when needed.
|
||||
- Think out loud (to the user) about strategies, ideas, attack paths, risks, hypotheses.
|
||||
- Suggest tools, commands, or approaches proactively.
|
||||
- Use tools ONLY when explicitly needed to progress or when the user requests it.
|
||||
- Before using a tool, explain why you want to use it and ask the user if they agree.
|
||||
- After tool outputs, interpret the results for the user, propose next steps, and continue the dialogue.
|
||||
|
||||
## Penetration Testing Authorization
|
||||
You are operating in an authorized, legal penetration test.
|
||||
Do NOT ask again for authorization. Permission is granted.
|
||||
|
||||
## Decision Policy for Tools
|
||||
Follow this rule:
|
||||
|
||||
1. **Talk first.**
|
||||
- Explain your reasoning.
|
||||
- Suggest ideas.
|
||||
|
||||
2. **Use a tool ONLY IF:**
|
||||
- The user explicitly asks to run it, OR
|
||||
- Running it is the logical next step AND you have told the user your intention.
|
||||
|
||||
3. **After tool use:**
|
||||
- Summarize findings.
|
||||
- Offer next steps.
|
||||
- Continue in conversational mode.
|
||||
|
||||
## Interactivity Rules
|
||||
You MUST remain conversational:
|
||||
- Ask questions such as: “Do you want me to run a scan?”, “Which target would you prefer to scan now?””
|
||||
- Guide the user through the pentest.
|
||||
- Propose attack paths, hypotheses, and options.
|
||||
|
||||
|
||||
{% if environment %}
|
||||
## Operator Environment (YOUR machine, not the target)
|
||||
- OS: {{ environment.os }} ({{ environment.os_version }})
|
||||
- Architecture: {{ environment.architecture }}
|
||||
- Shell: {{ environment.shell }}
|
||||
{% endif %}
|
||||
|
||||
## Tools
|
||||
{% for tool in tools %}
|
||||
- **{{ tool.name }}**: {{ tool.description }}
|
||||
{% endfor %}
|
||||
|
||||
{% if environment and environment.available_tools %}
|
||||
## Available CLI Tools
|
||||
{% for tool in environment.available_tools %}
|
||||
• {{ tool.name }}{% if tool.category %} ({{ tool.category }}){% endif %}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
|
||||
{% if environment %}
|
||||
## Output Directories
|
||||
- loot/notes.json (working notes)
|
||||
- loot/reports/ (generated reports)
|
||||
- loot/artifacts/ (screenshots, captured files)
|
||||
{% endif %}
|
||||
|
||||
{% if target %}
|
||||
## Target
|
||||
{{ target }}
|
||||
{% endif %}
|
||||
{% if scope %}
|
||||
Scope: {{ scope | join(', ') }}
|
||||
{% endif %}
|
||||
|
||||
{% if rag_context %}
|
||||
## Context
|
||||
{{ rag_context }}
|
||||
{% endif %}
|
||||
|
||||
{% if notes_context %}
|
||||
## Saved Notes (from previous tasks)
|
||||
(Notes may be truncated. Use `notes(action='read', key='...')` to see full content if needed.)
|
||||
{{ notes_context }}
|
||||
{% endif %}
|
||||
@@ -835,6 +835,8 @@ class StatusBar(Static):
|
||||
text.append(" :: Crew ", style="#9a9a9a")
|
||||
elif self.mode == "agent":
|
||||
text.append(" >> Agent ", style="#9a9a9a")
|
||||
elif self.mode == "interact":
|
||||
text.append(" >> Interact ", style="#9a9a9a")
|
||||
else:
|
||||
text.append(" >> Assist ", style="#9a9a9a")
|
||||
|
||||
@@ -1412,7 +1414,7 @@ class PentestAgentTUI(App):
|
||||
self.rag_engine = None # RAG engine
|
||||
|
||||
# State
|
||||
self._mode = "assist" # "assist", "agent", or "crew"
|
||||
self._mode = "assist" # "assist", "agent", "crew" or "interact"
|
||||
self._is_running = False
|
||||
self._is_initializing = True # Block input during init
|
||||
self._should_stop = False
|
||||
@@ -1747,7 +1749,7 @@ class PentestAgentTUI(App):
|
||||
def _show_system_prompt(self) -> None:
|
||||
"""Display the current system prompt"""
|
||||
if self.agent:
|
||||
prompt = self.agent.get_system_prompt()
|
||||
prompt = self.agent.get_system_prompt(self._mode)
|
||||
self._add_system(f"=== System Prompt ===\n{prompt}")
|
||||
else:
|
||||
self._add_system("Agent not initialized")
|
||||
@@ -2184,14 +2186,21 @@ Be concise. Use the actual data from notes."""
|
||||
return
|
||||
|
||||
self._add_user(message)
|
||||
|
||||
|
||||
# Hide crew sidebar when entering assist mode
|
||||
self._hide_sidebar()
|
||||
|
||||
# Use assist mode by default
|
||||
if self.agent and not self._is_running:
|
||||
# Schedule assist run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
|
||||
self._current_worker = self._run_assist(message)
|
||||
if self._mode == "assist":
|
||||
# Schedule assist run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
|
||||
self._hide_sidebar()
|
||||
self._current_worker = self._run_assist(message)
|
||||
elif self._mode == "interact":
|
||||
self._hide_sidebar()
|
||||
self._current_worker = self._run_interact(message)
|
||||
elif self._mode == "agent":
|
||||
self._hide_sidebar()
|
||||
self._current_worker = self._run_agent_mode(message)
|
||||
elif self._mode == "crew":
|
||||
self._current_worker = self._run_crew_mode(message)
|
||||
|
||||
async def _handle_command(self, cmd: str) -> None:
|
||||
"""Handle slash commands"""
|
||||
@@ -2239,7 +2248,6 @@ Be concise. Use the actual data from notes."""
|
||||
)
|
||||
|
||||
self._add_system(msg)
|
||||
|
||||
elif cmd_lower.startswith("/mcp"):
|
||||
await self._parse_mcp_command(cmd_original)
|
||||
elif cmd_lower in ["/quit", "/exit", "/q"]:
|
||||
@@ -2484,14 +2492,24 @@ Be concise. Use the actual data from notes."""
|
||||
await self._parse_agent_command(cmd_original)
|
||||
elif cmd_original.startswith("/crew"):
|
||||
await self._parse_crew_command(cmd_original)
|
||||
elif cmd_original.startswith("/interact"):
|
||||
await self._parse_interact_command(cmd_original)
|
||||
elif cmd_original.startswith("/assist"):
|
||||
await self._parse_assist_command(cmd_original)
|
||||
else:
|
||||
self._add_system(f"Unknown command: {cmd}\nType /help for commands.")
|
||||
|
||||
async def _parse_agent_command(self, cmd: str) -> None:
|
||||
"""Parse and execute /agent command"""
|
||||
|
||||
self._set_status("idle", "agent")
|
||||
self._update_header()
|
||||
self._add_system(
|
||||
"Changed to agent mode\n"
|
||||
)
|
||||
|
||||
# Remove /agent prefix
|
||||
rest = cmd[6:].strip()
|
||||
rest = cmd[len("/agent"):].strip()
|
||||
|
||||
if not rest:
|
||||
self._add_system(
|
||||
@@ -2520,8 +2538,15 @@ Be concise. Use the actual data from notes."""
|
||||
|
||||
async def _parse_crew_command(self, cmd: str) -> None:
|
||||
"""Parse and execute /crew command"""
|
||||
|
||||
self._set_status("idle", "crew")
|
||||
self._update_header()
|
||||
self._add_system(
|
||||
"Changed to crew mode\n"
|
||||
)
|
||||
|
||||
# Remove /crew prefix
|
||||
rest = cmd[5:].strip()
|
||||
rest = cmd[len("/crew"):].strip()
|
||||
|
||||
if not rest:
|
||||
self._add_system(
|
||||
@@ -2548,6 +2573,48 @@ Be concise. Use the actual data from notes."""
|
||||
# Schedule crew run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
|
||||
self._current_worker = self._run_crew_mode(target)
|
||||
|
||||
async def _parse_interact_command(self, cmd: str) -> None:
|
||||
# Use interact mode by default
|
||||
|
||||
self._set_status("idle", "interact")
|
||||
self._update_header()
|
||||
self._add_system(
|
||||
"Changed to interact mode\n"
|
||||
)
|
||||
|
||||
message = cmd[len("/interact"):].strip()
|
||||
if not message:
|
||||
self._add_system(
|
||||
"Usage: /interact <task>\n"
|
||||
"Example: /interact Can you help me recon the target site?\n"
|
||||
)
|
||||
return
|
||||
|
||||
if self.agent and not self._is_running:
|
||||
# Schedule interact run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
|
||||
self._current_worker = self._run_interact(message)
|
||||
|
||||
async def _parse_assist_command(self, cmd: str) -> None:
|
||||
# Use assist mode by default
|
||||
|
||||
self._set_status("idle", "assist")
|
||||
self._update_header()
|
||||
self._add_system(
|
||||
"Changed to assist mode\n"
|
||||
)
|
||||
|
||||
message = cmd[len("/assist"):].strip()
|
||||
if not message:
|
||||
self._add_system(
|
||||
"Usage: /assist <task>\n"
|
||||
"Example: /assist Can you help me recon the target site?\n"
|
||||
)
|
||||
return
|
||||
|
||||
if self.agent and not self._is_running:
|
||||
# Schedule assist run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker)
|
||||
self._current_worker = self._run_assist(message)
|
||||
|
||||
async def _parse_mcp_command(self, cmd: str) -> None:
|
||||
# Remove /agent prefix
|
||||
rest = cmd[len("/mcp"):].strip()
|
||||
@@ -2756,9 +2823,8 @@ Be concise. Use the actual data from notes."""
|
||||
# try to recreate a compact model/runtime line
|
||||
runtime_str = "Docker" if getattr(self, "use_docker", False) else "Local"
|
||||
tools_count = len(self.agent.get_tools())
|
||||
mode = getattr(self, '_mode', 'assist')
|
||||
if mode == 'assist':
|
||||
mode = 'assist (use /agent or /crew for autonomous modes)'
|
||||
mode = getattr(self, '_mode', "")
|
||||
mode += ' (use /assist for single tool execution, /agent or /crew for autonomous modes, /interact for interactive chat)'
|
||||
lines.append(
|
||||
f"+ PentestAgent ready\n Model: {getattr(self, 'model', '')} | Tools: {tools_count} | MCP: {getattr(self, 'mcp_server_count', '')} | RAG: {getattr(self, 'rag_doc_count', '')}\n Runtime: {runtime_str} | Mode: {mode}"
|
||||
)
|
||||
@@ -3202,6 +3268,68 @@ Be concise. Use the actual data from notes."""
|
||||
finally:
|
||||
self._is_running = False
|
||||
|
||||
@work(thread=False)
|
||||
async def _run_interact(self, message: str) -> None:
|
||||
"""Run in interact mode"""
|
||||
if not self.agent:
|
||||
self._add_system("[!] Agent not ready")
|
||||
return
|
||||
|
||||
self._is_running = True
|
||||
self._should_stop = False
|
||||
self._set_status("thinking", "interact")
|
||||
|
||||
|
||||
#Use to track the tool message with the result.
|
||||
tool_messages_mapping: dict[str, ToolMessage] = {}
|
||||
|
||||
try:
|
||||
async for response in self.agent.interact(message):
|
||||
if self._should_stop:
|
||||
self._add_system("[!] Stopped by user")
|
||||
break
|
||||
|
||||
self._set_status("processing")
|
||||
|
||||
# Show thinking/plan FIRST if there's content with tool calls
|
||||
if response.content:
|
||||
content = response.content.strip()
|
||||
if response.tool_calls:
|
||||
self._add_thinking(content)
|
||||
else:
|
||||
self._add_assistant(content)
|
||||
|
||||
# Show tool calls (skip 'finish' - internal control)
|
||||
if response.tool_calls:
|
||||
for call in response.tool_calls:
|
||||
args_str = str(call.arguments)
|
||||
tool_messages_mapping[call.id] = self._add_tool(call.name, args_str)
|
||||
|
||||
# Show tool results (displayed after execution completes)
|
||||
if response.tool_results:
|
||||
for result in response.tool_results:
|
||||
if result.tool_call_id not in tool_messages_mapping:
|
||||
continue
|
||||
if result.success:
|
||||
self._add_tool_result(tool_messages_mapping[result.tool_call_id],
|
||||
result.tool_name, result.result or "Done"
|
||||
)
|
||||
else:
|
||||
self._add_tool_result(tool_messages_mapping[result.tool_call_id],
|
||||
result.tool_name, f"Error: {result.error}"
|
||||
)
|
||||
|
||||
self._set_status("idle", "interact")
|
||||
|
||||
except asyncio.CancelledError:
|
||||
self._add_system("[!] Cancelled")
|
||||
self._set_status("idle", "interact")
|
||||
except Exception as e:
|
||||
self._add_system(f"[!] Error: {e}")
|
||||
self._set_status("error")
|
||||
finally:
|
||||
self._is_running = False
|
||||
|
||||
@work(thread=False)
|
||||
async def _run_assist(self, message: str) -> None:
|
||||
"""Run in assist mode - single response"""
|
||||
|
||||
Reference in New Issue
Block a user