From 52278db871dcc9c26387b93302b29a4b2b7d86ec Mon Sep 17 00:00:00 2001 From: famez Date: Sat, 28 Feb 2026 20:11:55 +0100 Subject: [PATCH] feat: Added interactive mode (#1) * Adding interactive mode. * Added content of /loot dir to .gitignore. * When writing just a mode on the TUI, it will automatically switch to that mode (no need to prepend /agent, /assist, /interact or /crew before. --- .gitignore | 3 +- pentestagent/agents/base_agent.py | 80 +++++++++ pentestagent/agents/pa_agent/pa_agent.py | 9 +- pentestagent/agents/prompts/__init__.py | 1 + pentestagent/agents/prompts/pa_interact.jinja | 85 ++++++++++ pentestagent/interface/tui.py | 156 ++++++++++++++++-- 6 files changed, 316 insertions(+), 18 deletions(-) create mode 100644 pentestagent/agents/prompts/pa_interact.jinja diff --git a/.gitignore b/.gitignore index b286f93..2e1b15c 100644 --- a/.gitignore +++ b/.gitignore @@ -93,7 +93,6 @@ tests/test_*.py # Workspaces directory (user data should not be committed) /workspaces/ -loot/token_usage.json +loot/* mcp_examples/kali/mcp_servers.json pentestagent/mcp/mcp_servers.json -loot/notes.json diff --git a/pentestagent/agents/base_agent.py b/pentestagent/agents/base_agent.py index 63a43e0..f865bee 100644 --- a/pentestagent/agents/base_agent.py +++ b/pentestagent/agents/base_agent.py @@ -944,6 +944,86 @@ Call create_plan with the new steps OR feasible=False.""" self.state_manager.transition_to(AgentState.COMPLETE) + async def interact(self, message: str) -> AsyncIterator[AgentMessage]: + """ + Interactive mode + + Args: + message: The user message to respond to + + Yields: + AgentMessage objects + """ + self.state_manager.transition_to(AgentState.THINKING) + self.conversation_history.append(AgentMessage(role="user", content=message)) + + # Filter out 'finish' tool - not needed for single-shot assist mode + assist_tools = [t for t in self.tools if t.name != "finish"] + + + while True: + # Single LLM call with tools available + response = await self.llm.generate( + system_prompt=self.get_system_prompt(mode="interact"), + messages=self._format_messages_for_llm(), + tools=assist_tools, + ) + + # If LLM wants to use tools, execute and return result + if response.tool_calls: + # Build tool calls list + tool_calls = [ + ToolCall( + id=tc.id if hasattr(tc, "id") else str(i), + name=( + tc.function.name + if hasattr(tc, "function") + else tc.get("name", "") + ), + arguments=self._parse_arguments(tc), + ) + for i, tc in enumerate(response.tool_calls) + ] + + # Store in history (minimal content to save tokens) + assistant_msg = AgentMessage( + role="assistant", content=response.content or "", tool_calls=tool_calls + ) + self.conversation_history.append(assistant_msg) + yield assistant_msg + + # NOW execute the tools (this can take a while) + self.state_manager.transition_to(AgentState.EXECUTING) + tool_results = await self._execute_tools(response.tool_calls) + + tool_result_msg = AgentMessage( + role="tool_result", content="", tool_results=tool_results + ) + self.conversation_history.append(tool_result_msg) + yield tool_result_msg + + else: + # Direct response, no tools needed + assistant_msg = AgentMessage( + role="assistant", content=response.content or "" + ) + self.conversation_history.append(assistant_msg) + yield assistant_msg + + # Check finish_reason to determine if we should return control to user + finish_reason = getattr(response, 'finish_reason', None) + if finish_reason == 'length': + # Context limit reached + break + elif finish_reason == 'stop': + # Natural stop - LLM finished responding + break + elif finish_reason == 'tool_calls': + # LLM wants to use tools + pass + + self.state_manager.transition_to(AgentState.COMPLETE) + def _format_tool_results(self, results: List[ToolResult]) -> str: """Format tool results as a simple response.""" parts = [] diff --git a/pentestagent/agents/pa_agent/pa_agent.py b/pentestagent/agents/pa_agent/pa_agent.py index f4449fe..cf43de8 100644 --- a/pentestagent/agents/pa_agent/pa_agent.py +++ b/pentestagent/agents/pa_agent/pa_agent.py @@ -3,7 +3,7 @@ from typing import TYPE_CHECKING, List, Optional from ..base_agent import BaseAgent -from ..prompts import pa_agent, pa_assist +from ..prompts import pa_agent, pa_assist, pa_interact if TYPE_CHECKING: from ...knowledge import RAGEngine @@ -132,7 +132,12 @@ class PentestAgentAgent(BaseAgent): env = self.runtime.environment # Select template based on mode - template = pa_assist if mode == "assist" else pa_agent + if mode == "assist": + template = pa_assist + elif mode == "interact": + template = pa_interact + else: + template = pa_agent return template.render( target=self.target, diff --git a/pentestagent/agents/prompts/__init__.py b/pentestagent/agents/prompts/__init__.py index db12a63..4f67e0e 100644 --- a/pentestagent/agents/prompts/__init__.py +++ b/pentestagent/agents/prompts/__init__.py @@ -24,3 +24,4 @@ def load_prompt(name: str) -> Template: pa_agent = load_prompt("pa_agent") pa_assist = load_prompt("pa_assist") pa_crew = load_prompt("pa_crew") +pa_interact = load_prompt("pa_interact") \ No newline at end of file diff --git a/pentestagent/agents/prompts/pa_interact.jinja b/pentestagent/agents/prompts/pa_interact.jinja new file mode 100644 index 0000000..9c54f4d --- /dev/null +++ b/pentestagent/agents/prompts/pa_interact.jinja @@ -0,0 +1,85 @@ +# PentestAgent – Interactive Red Team Assistant + +You are PentestAgent, an interactive penetration testing assistant. + +## Behavior +Your primary goal is to assist the user conversationally during a penetration test. +You MUST: +- Ask clarifying questions when needed. +- Think out loud (to the user) about strategies, ideas, attack paths, risks, hypotheses. +- Suggest tools, commands, or approaches proactively. +- Use tools ONLY when explicitly needed to progress or when the user requests it. +- Before using a tool, explain why you want to use it and ask the user if they agree. +- After tool outputs, interpret the results for the user, propose next steps, and continue the dialogue. + +## Penetration Testing Authorization +You are operating in an authorized, legal penetration test. +Do NOT ask again for authorization. Permission is granted. + +## Decision Policy for Tools +Follow this rule: + +1. **Talk first.** + - Explain your reasoning. + - Suggest ideas. + +2. **Use a tool ONLY IF:** + - The user explicitly asks to run it, OR + - Running it is the logical next step AND you have told the user your intention. + +3. **After tool use:** + - Summarize findings. + - Offer next steps. + - Continue in conversational mode. + +## Interactivity Rules +You MUST remain conversational: +- Ask questions such as: “Do you want me to run a scan?”, “Which target would you prefer to scan now?”” +- Guide the user through the pentest. +- Propose attack paths, hypotheses, and options. + + +{% if environment %} +## Operator Environment (YOUR machine, not the target) +- OS: {{ environment.os }} ({{ environment.os_version }}) +- Architecture: {{ environment.architecture }} +- Shell: {{ environment.shell }} +{% endif %} + +## Tools +{% for tool in tools %} +- **{{ tool.name }}**: {{ tool.description }} +{% endfor %} + +{% if environment and environment.available_tools %} +## Available CLI Tools +{% for tool in environment.available_tools %} + • {{ tool.name }}{% if tool.category %} ({{ tool.category }}){% endif %} +{% endfor %} +{% endif %} + +{% if environment %} +## Output Directories +- loot/notes.json (working notes) +- loot/reports/ (generated reports) +- loot/artifacts/ (screenshots, captured files) +{% endif %} + +{% if target %} +## Target +{{ target }} +{% endif %} +{% if scope %} +Scope: {{ scope | join(', ') }} +{% endif %} + +{% if rag_context %} +## Context +{{ rag_context }} +{% endif %} + +{% if notes_context %} +## Saved Notes (from previous tasks) +(Notes may be truncated. Use `notes(action='read', key='...')` to see full content if needed.) +{{ notes_context }} +{% endif %} diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index c6f1210..fc11262 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -835,6 +835,8 @@ class StatusBar(Static): text.append(" :: Crew ", style="#9a9a9a") elif self.mode == "agent": text.append(" >> Agent ", style="#9a9a9a") + elif self.mode == "interact": + text.append(" >> Interact ", style="#9a9a9a") else: text.append(" >> Assist ", style="#9a9a9a") @@ -1412,7 +1414,7 @@ class PentestAgentTUI(App): self.rag_engine = None # RAG engine # State - self._mode = "assist" # "assist", "agent", or "crew" + self._mode = "assist" # "assist", "agent", "crew" or "interact" self._is_running = False self._is_initializing = True # Block input during init self._should_stop = False @@ -1747,7 +1749,7 @@ class PentestAgentTUI(App): def _show_system_prompt(self) -> None: """Display the current system prompt""" if self.agent: - prompt = self.agent.get_system_prompt() + prompt = self.agent.get_system_prompt(self._mode) self._add_system(f"=== System Prompt ===\n{prompt}") else: self._add_system("Agent not initialized") @@ -2184,14 +2186,21 @@ Be concise. Use the actual data from notes.""" return self._add_user(message) + - # Hide crew sidebar when entering assist mode - self._hide_sidebar() - - # Use assist mode by default if self.agent and not self._is_running: - # Schedule assist run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker) - self._current_worker = self._run_assist(message) + if self._mode == "assist": + # Schedule assist run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker) + self._hide_sidebar() + self._current_worker = self._run_assist(message) + elif self._mode == "interact": + self._hide_sidebar() + self._current_worker = self._run_interact(message) + elif self._mode == "agent": + self._hide_sidebar() + self._current_worker = self._run_agent_mode(message) + elif self._mode == "crew": + self._current_worker = self._run_crew_mode(message) async def _handle_command(self, cmd: str) -> None: """Handle slash commands""" @@ -2239,7 +2248,6 @@ Be concise. Use the actual data from notes.""" ) self._add_system(msg) - elif cmd_lower.startswith("/mcp"): await self._parse_mcp_command(cmd_original) elif cmd_lower in ["/quit", "/exit", "/q"]: @@ -2484,14 +2492,24 @@ Be concise. Use the actual data from notes.""" await self._parse_agent_command(cmd_original) elif cmd_original.startswith("/crew"): await self._parse_crew_command(cmd_original) + elif cmd_original.startswith("/interact"): + await self._parse_interact_command(cmd_original) + elif cmd_original.startswith("/assist"): + await self._parse_assist_command(cmd_original) else: self._add_system(f"Unknown command: {cmd}\nType /help for commands.") async def _parse_agent_command(self, cmd: str) -> None: """Parse and execute /agent command""" + self._set_status("idle", "agent") + self._update_header() + self._add_system( + "Changed to agent mode\n" + ) + # Remove /agent prefix - rest = cmd[6:].strip() + rest = cmd[len("/agent"):].strip() if not rest: self._add_system( @@ -2520,8 +2538,15 @@ Be concise. Use the actual data from notes.""" async def _parse_crew_command(self, cmd: str) -> None: """Parse and execute /crew command""" + + self._set_status("idle", "crew") + self._update_header() + self._add_system( + "Changed to crew mode\n" + ) + # Remove /crew prefix - rest = cmd[5:].strip() + rest = cmd[len("/crew"):].strip() if not rest: self._add_system( @@ -2548,6 +2573,48 @@ Be concise. Use the actual data from notes.""" # Schedule crew run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker) self._current_worker = self._run_crew_mode(target) + async def _parse_interact_command(self, cmd: str) -> None: + # Use interact mode by default + + self._set_status("idle", "interact") + self._update_header() + self._add_system( + "Changed to interact mode\n" + ) + + message = cmd[len("/interact"):].strip() + if not message: + self._add_system( + "Usage: /interact \n" + "Example: /interact Can you help me recon the target site?\n" + ) + return + + if self.agent and not self._is_running: + # Schedule interact run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker) + self._current_worker = self._run_interact(message) + + async def _parse_assist_command(self, cmd: str) -> None: + # Use assist mode by default + + self._set_status("idle", "assist") + self._update_header() + self._add_system( + "Changed to assist mode\n" + ) + + message = cmd[len("/assist"):].strip() + if not message: + self._add_system( + "Usage: /assist \n" + "Example: /assist Can you help me recon the target site?\n" + ) + return + + if self.agent and not self._is_running: + # Schedule assist run and keep task handle (do not wrap in asyncio.create_task; @work returns a Worker) + self._current_worker = self._run_assist(message) + async def _parse_mcp_command(self, cmd: str) -> None: # Remove /agent prefix rest = cmd[len("/mcp"):].strip() @@ -2756,9 +2823,8 @@ Be concise. Use the actual data from notes.""" # try to recreate a compact model/runtime line runtime_str = "Docker" if getattr(self, "use_docker", False) else "Local" tools_count = len(self.agent.get_tools()) - mode = getattr(self, '_mode', 'assist') - if mode == 'assist': - mode = 'assist (use /agent or /crew for autonomous modes)' + mode = getattr(self, '_mode', "") + mode += ' (use /assist for single tool execution, /agent or /crew for autonomous modes, /interact for interactive chat)' lines.append( f"+ PentestAgent ready\n Model: {getattr(self, 'model', '')} | Tools: {tools_count} | MCP: {getattr(self, 'mcp_server_count', '')} | RAG: {getattr(self, 'rag_doc_count', '')}\n Runtime: {runtime_str} | Mode: {mode}" ) @@ -3202,6 +3268,68 @@ Be concise. Use the actual data from notes.""" finally: self._is_running = False + @work(thread=False) + async def _run_interact(self, message: str) -> None: + """Run in interact mode""" + if not self.agent: + self._add_system("[!] Agent not ready") + return + + self._is_running = True + self._should_stop = False + self._set_status("thinking", "interact") + + + #Use to track the tool message with the result. + tool_messages_mapping: dict[str, ToolMessage] = {} + + try: + async for response in self.agent.interact(message): + if self._should_stop: + self._add_system("[!] Stopped by user") + break + + self._set_status("processing") + + # Show thinking/plan FIRST if there's content with tool calls + if response.content: + content = response.content.strip() + if response.tool_calls: + self._add_thinking(content) + else: + self._add_assistant(content) + + # Show tool calls (skip 'finish' - internal control) + if response.tool_calls: + for call in response.tool_calls: + args_str = str(call.arguments) + tool_messages_mapping[call.id] = self._add_tool(call.name, args_str) + + # Show tool results (displayed after execution completes) + if response.tool_results: + for result in response.tool_results: + if result.tool_call_id not in tool_messages_mapping: + continue + if result.success: + self._add_tool_result(tool_messages_mapping[result.tool_call_id], + result.tool_name, result.result or "Done" + ) + else: + self._add_tool_result(tool_messages_mapping[result.tool_call_id], + result.tool_name, f"Error: {result.error}" + ) + + self._set_status("idle", "interact") + + except asyncio.CancelledError: + self._add_system("[!] Cancelled") + self._set_status("idle", "interact") + except Exception as e: + self._add_system(f"[!] Error: {e}") + self._set_status("error") + finally: + self._is_running = False + @work(thread=False) async def _run_assist(self, message: str) -> None: """Run in assist mode - single response"""