chore: remove unused folder

This commit is contained in:
GH05TCREW
2025-12-19 22:28:41 -07:00
parent c9866a407b
commit f60f943ff7
3 changed files with 0 additions and 2826 deletions

View File

@@ -1,336 +0,0 @@
"""Worker pool for managing concurrent agent execution."""
import asyncio
import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from .models import AgentStatus, AgentWorker, WorkerCallback
if TYPE_CHECKING:
from ...llm import LLM
from ...runtime import Runtime
from ...tools import Tool
class WorkerPool:
"""Manages concurrent execution of worker agents."""
def __init__(
self,
llm: "LLM",
tools: List["Tool"],
runtime: "Runtime",
target: str = "",
rag_engine: Any = None,
on_worker_event: Optional[WorkerCallback] = None,
):
self.llm = llm
self.tools = tools
self.runtime = runtime
self.target = target
self.rag_engine = rag_engine
self.on_worker_event = on_worker_event
self._workers: Dict[str, AgentWorker] = {}
self._tasks: Dict[str, asyncio.Task] = {}
self._results: Dict[str, str] = {}
self._next_id = 0
self._lock = asyncio.Lock()
def _emit(self, worker_id: str, event: str, data: Dict[str, Any]) -> None:
"""Emit event to callback if registered."""
if self.on_worker_event:
self.on_worker_event(worker_id, event, data)
def _generate_id(self) -> str:
"""Generate unique worker ID."""
worker_id = f"agent-{self._next_id}"
self._next_id += 1
return worker_id
async def spawn(
self,
task: str,
priority: int = 1,
depends_on: Optional[List[str]] = None,
) -> str:
"""
Spawn a new worker agent.
Args:
task: The task description for the agent
priority: Higher priority runs first (for future use)
depends_on: List of agent IDs that must complete first
Returns:
The worker ID
"""
async with self._lock:
worker_id = self._generate_id()
worker = AgentWorker(
id=worker_id,
task=task,
priority=priority,
depends_on=depends_on or [],
)
self._workers[worker_id] = worker
# Emit spawn event for UI
self._emit(
worker_id,
"spawn",
{
"worker_type": worker_id,
"task": task,
},
)
# Start the agent task
self._tasks[worker_id] = asyncio.create_task(self._run_worker(worker))
return worker_id
async def _run_worker(self, worker: AgentWorker) -> None:
"""Run a single worker agent."""
from ..pa_agent import PentestAgentAgent
# Wait for dependencies
if worker.depends_on:
await self._wait_for_dependencies(worker.depends_on)
worker.status = AgentStatus.RUNNING
worker.started_at = time.time()
self._emit(worker.id, "status", {"status": "running"})
# Create isolated runtime for this worker (prevents browser state conflicts)
from ...runtime.runtime import LocalRuntime
worker_runtime = LocalRuntime()
await worker_runtime.start()
from ...config.constants import WORKER_MAX_ITERATIONS
agent = PentestAgentAgent(
llm=self.llm,
tools=self.tools,
runtime=worker_runtime, # Use isolated runtime
target=self.target,
rag_engine=self.rag_engine,
max_iterations=WORKER_MAX_ITERATIONS,
)
try:
final_response = ""
hit_max_iterations = False
is_infeasible = False
async for response in agent.agent_loop(worker.task):
# Track tool calls
if response.tool_calls:
for tc in response.tool_calls:
if tc.name not in worker.tools_used:
worker.tools_used.append(tc.name)
self._emit(worker.id, "tool", {"tool": tc.name})
# Track tokens (avoid double counting)
if response.usage:
total = response.usage.get("total_tokens", 0)
is_intermediate = response.metadata.get("intermediate", False)
has_tools = bool(response.tool_calls)
# Same logic as CLI to avoid double counting
should_count = False
if is_intermediate:
should_count = True
worker.last_msg_intermediate = True
elif has_tools:
if not getattr(worker, "last_msg_intermediate", False):
should_count = True
worker.last_msg_intermediate = False
else:
should_count = True
worker.last_msg_intermediate = False
if should_count and total > 0:
self._emit(worker.id, "tokens", {"tokens": total})
# Capture final response (text without tool calls)
if response.content and not response.tool_calls:
final_response = response.content
# Check metadata flags
if response.metadata:
if response.metadata.get("max_iterations_reached"):
hit_max_iterations = True
if response.metadata.get("replan_impossible"):
is_infeasible = True
# Prioritize structured results from the plan over chatty summaries
plan_summary = ""
plan = getattr(worker_runtime, "plan", None)
if plan and plan.steps:
from ...tools.finish import StepStatus
# Include ALL steps regardless of status - skips and failures are valuable context
# Note: PlanStep stores failure/skip reasons in the 'result' field
steps_with_info = [s for s in plan.steps if s.result]
if steps_with_info:
summary_lines = []
for s in steps_with_info:
status_marker = {
StepStatus.COMPLETE: "",
StepStatus.SKIP: "",
StepStatus.FAIL: "",
}.get(s.status, "·")
info = s.result or "No details"
summary_lines.append(f"{status_marker} {s.description}: {info}")
plan_summary = "\n".join(summary_lines)
# Use plan summary if available, otherwise fallback to chat response
worker.result = plan_summary or final_response or "No findings."
worker.completed_at = time.time()
self._results[worker.id] = worker.result
if is_infeasible:
worker.status = AgentStatus.FAILED
self._emit(
worker.id,
"failed",
{
"summary": worker.result[:200],
"reason": "Task determined infeasible",
},
)
elif hit_max_iterations:
worker.status = AgentStatus.WARNING
self._emit(
worker.id,
"warning",
{
"summary": worker.result[:200],
"reason": "Max iterations reached",
},
)
else:
worker.status = AgentStatus.COMPLETE
self._emit(
worker.id,
"complete",
{
"summary": worker.result[:200],
},
)
except asyncio.CancelledError:
worker.status = AgentStatus.CANCELLED
worker.completed_at = time.time()
self._emit(worker.id, "cancelled", {})
raise
except Exception as e:
worker.error = str(e)
worker.status = AgentStatus.ERROR
worker.completed_at = time.time()
self._emit(worker.id, "error", {"error": str(e)})
finally:
# Cleanup worker's isolated runtime
try:
await worker_runtime.stop()
except Exception:
pass # Best effort cleanup
async def _wait_for_dependencies(self, depends_on: List[str]) -> None:
"""Wait for dependent workers to complete."""
for dep_id in depends_on:
if dep_id in self._tasks:
try:
await self._tasks[dep_id]
except (asyncio.CancelledError, Exception):
pass # Dependency failed, but we continue
async def wait_for(self, agent_ids: Optional[List[str]] = None) -> Dict[str, Any]:
"""
Wait for specified agents (or all) to complete.
Args:
agent_ids: List of agent IDs to wait for. None = wait for all.
Returns:
Dict mapping agent_id to result/error
"""
if agent_ids is None:
agent_ids = list(self._tasks.keys())
results = {}
for agent_id in agent_ids:
if agent_id in self._tasks:
try:
await self._tasks[agent_id]
except (asyncio.CancelledError, Exception):
pass
worker = self._workers.get(agent_id)
if worker:
results[agent_id] = {
"task": worker.task,
"status": worker.status.value,
"result": worker.result,
"error": worker.error,
"tools_used": worker.tools_used,
}
return results
def get_status(self, agent_id: str) -> Optional[Dict[str, Any]]:
"""Get status of a specific agent."""
worker = self._workers.get(agent_id)
if not worker:
return None
return worker.to_dict()
def get_all_status(self) -> Dict[str, Dict[str, Any]]:
"""Get status of all agents."""
return {wid: w.to_dict() for wid, w in self._workers.items()}
async def cancel(self, agent_id: str) -> bool:
"""Cancel a running agent."""
if agent_id not in self._tasks:
return False
task = self._tasks[agent_id]
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
return True
return False
async def cancel_all(self) -> None:
"""Cancel all running agents."""
for task in self._tasks.values():
if not task.done():
task.cancel()
# Wait for all to finish
if self._tasks:
await asyncio.gather(*self._tasks.values(), return_exceptions=True)
def get_results(self) -> Dict[str, str]:
"""Get results from all completed agents."""
return dict(self._results)
def get_workers(self) -> List[AgentWorker]:
"""Get all workers."""
return list(self._workers.values())
def reset(self) -> None:
"""Reset the pool for a new task."""
self._workers.clear()
self._tasks.clear()
self._results.clear()
self._next_id = 0

View File

@@ -1,686 +0,0 @@
"""Non-interactive CLI mode for PentestAgent."""
import asyncio
import time
from datetime import datetime
from pathlib import Path
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.text import Text
console = Console()
# PA theme colors (matching TUI)
PA_PRIMARY = "#d4d4d4" # light gray - primary text
PA_SECONDARY = "#9a9a9a" # medium gray - secondary text
PA_DIM = "#6b6b6b" # dim gray - muted text
PA_BORDER = "#3a3a3a" # dark gray - borders
PA_ACCENT = "#7a7a7a" # accent gray
async def run_cli(
target: str,
model: str,
task: str = None,
report: str = None,
max_loops: int = 50,
use_docker: bool = False,
mode: str = "agent",
):
"""
Run PentestAgent in non-interactive mode.
Args:
target: Target to test
model: LLM model to use
task: Optional task description
report: Report path ("auto" for loot/reports/<target>_<timestamp>.md)
max_loops: Max agent loops before stopping
use_docker: Run tools in Docker container
mode: Execution mode ("agent" or "crew")
"""
from ..agents.pa_agent import PentestAgentAgent
from ..knowledge import RAGEngine
from ..llm import LLM
from ..runtime.docker_runtime import DockerRuntime
from ..runtime.runtime import LocalRuntime
from ..tools import get_all_tools
# Startup panel
start_text = Text()
start_text.append("PENTESTAGENT", style=f"bold {PA_PRIMARY}")
start_text.append(" - Non-interactive Mode\n\n", style=PA_DIM)
start_text.append("Target: ", style=PA_SECONDARY)
start_text.append(f"{target}\n", style=PA_PRIMARY)
start_text.append("Model: ", style=PA_SECONDARY)
start_text.append(f"{model}\n", style=PA_PRIMARY)
start_text.append("Mode: ", style=PA_SECONDARY)
start_text.append(f"{mode.title()}\n", style=PA_PRIMARY)
start_text.append("Runtime: ", style=PA_SECONDARY)
start_text.append(f"{'Docker' if use_docker else 'Local'}\n", style=PA_PRIMARY)
start_text.append("Max loops: ", style=PA_SECONDARY)
start_text.append(f"{max_loops}\n", style=PA_PRIMARY)
task_msg = task or f"Perform a penetration test on {target}"
start_text.append("Task: ", style=PA_SECONDARY)
start_text.append(task_msg, style=PA_PRIMARY)
console.print()
console.print(
Panel(
start_text, title=f"[{PA_SECONDARY}]Starting", border_style=PA_BORDER
)
)
console.print()
# Initialize RAG if knowledge exists
rag = None
knowledge_path = Path("knowledge")
if knowledge_path.exists():
try:
rag = RAGEngine(knowledge_path=knowledge_path)
rag.index()
except Exception:
pass
# Initialize MCP if config exists (silently skip failures)
mcp_manager = None
mcp_count = 0
try:
from ..mcp import MCPManager
from ..tools import register_tool_instance
mcp_manager = MCPManager()
if mcp_manager.config_path.exists():
mcp_tools = await mcp_manager.connect_all()
for tool in mcp_tools:
register_tool_instance(tool)
mcp_count = len(mcp_tools)
if mcp_count > 0:
console.print(f"[{PA_DIM}]Loaded {mcp_count} MCP tools[/]")
except Exception:
pass # MCP is optional, continue without it
# Initialize runtime - Docker or Local
if use_docker:
console.print(f"[{PA_DIM}]Starting Docker container...[/]")
runtime = DockerRuntime(mcp_manager=mcp_manager)
else:
runtime = LocalRuntime(mcp_manager=mcp_manager)
await runtime.start()
llm = LLM(model=model, rag_engine=rag)
tools = get_all_tools()
# Stats tracking
start_time = time.time()
tool_count = 0
iteration = 0
findings_count = 0 # Count of notes/findings recorded
findings = [] # Store actual findings text
total_tokens = 0 # Track total token usage
messages = [] # Store agent messages
tool_log = [] # Log of tools executed (ts, name, command, result, exit_code)
last_content = ""
last_msg_intermediate = False # Track if previous message was intermediate (to avoid double counting tokens)
stopped_reason = None
def print_status(msg: str, style: str = PA_DIM):
elapsed = int(time.time() - start_time)
mins, secs = divmod(elapsed, 60)
timestamp = f"[{mins:02d}:{secs:02d}]"
console.print(f"[{PA_DIM}]{timestamp}[/] [{style}]{msg}[/]")
def display_message(content: str, title: str) -> bool:
"""Display a message panel if it hasn't been shown yet."""
nonlocal last_content
if content and content != last_content:
console.print()
console.print(
Panel(
Markdown(content),
title=f"[{PA_PRIMARY}]{title}",
border_style=PA_BORDER,
)
)
console.print()
last_content = content
return True
return False
def generate_report() -> str:
"""Generate markdown report."""
elapsed = int(time.time() - start_time)
mins, secs = divmod(elapsed, 60)
status_text = "Complete"
if stopped_reason:
status_text = f"Interrupted ({stopped_reason})"
lines = [
"# PentestAgent Penetration Test Report",
"",
"## Executive Summary",
"",
]
# Add AI summary at top if available
# If the last finding is a full report (Crew mode), use it as the main body
# and avoid adding duplicate headers
main_content = ""
if findings:
main_content = findings[-1]
# If it's a full report (starts with #), don't add our own headers if possible
if not main_content.strip().startswith("#"):
lines.append(main_content)
lines.append("")
else:
# It's a full report, so we might want to replace the default header
# or just append it. Let's append it but skip the "Executive Summary" header above if we could.
# For now, just append it.
lines.append(main_content)
lines.append("")
else:
lines.append("*Assessment incomplete - no analysis generated.*")
lines.append("")
# Engagement details table
lines.extend(
[
"## Engagement Details",
"",
"| Field | Value |",
"|-------|-------|",
f"| **Target** | `{target}` |",
f"| **Task** | {task_msg} |",
f"| **Date** | {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} |",
f"| **Duration** | {mins}m {secs}s |",
f"| **Commands Executed** | {tool_count} |",
f"| **Status** | {status_text} |",
"",
"---",
"",
"## Commands Executed",
"",
]
)
# Detailed command log
for i, entry in enumerate(tool_log, 1):
ts = entry.get("ts", "??:??")
name = entry.get("name", "unknown")
command = entry.get("command", "")
result = entry.get("result", "")
exit_code = entry.get("exit_code")
lines.append(f"### {i}. {name} `[{ts}]`")
lines.append("")
if command:
lines.append("**Command:**")
lines.append("```")
lines.append(command)
lines.append("```")
lines.append("")
if exit_code is not None:
lines.append(f"**Exit Code:** `{exit_code}`")
lines.append("")
if result:
lines.append("**Output:**")
lines.append("```")
# Limit output to 2000 chars per command for report size
if len(result) > 2000:
lines.append(result[:2000])
lines.append(f"\n... (truncated, {len(result)} total chars)")
else:
lines.append(result)
lines.append("```")
lines.append("")
# Findings section
# Only show if there are other findings besides the final report we already showed
other_findings = findings[:-1] if findings and len(findings) > 1 else []
if other_findings:
lines.extend(
[
"---",
"",
"## Detailed Findings",
"",
]
)
for i, finding in enumerate(other_findings, 1):
if len(other_findings) > 1:
lines.append(f"### Finding {i}")
lines.append("")
lines.append(finding)
lines.append("")
# Footer
lines.extend(
[
"---",
"",
f"*Report generated by PentestAgent on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*",
]
)
return "\n".join(lines)
def save_report():
"""Save report to file."""
if not report:
return
# Determine path
if report == "auto":
reports_dir = Path("loot/reports")
reports_dir.mkdir(parents=True, exist_ok=True)
safe_target = target.replace("://", "_").replace("/", "_").replace(":", "_")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report_path = reports_dir / f"{safe_target}_{timestamp}.md"
else:
report_path = Path(report)
report_path.parent.mkdir(parents=True, exist_ok=True)
content = generate_report()
report_path.write_text(content, encoding="utf-8")
console.print(f"[{PA_SECONDARY}]Report saved: {report_path}[/]")
async def generate_summary():
"""Ask the LLM to summarize findings when stopped early."""
if not tool_log:
return None
print_status("Generating summary...", PA_SECONDARY)
# Build context from tool results (use full results, not truncated)
context_lines = ["Summarize the penetration test findings so far:\n"]
context_lines.append(f"Target: {target}")
context_lines.append(f"Tools executed: {tool_count}\n")
for entry in tool_log[-10:]: # Last 10 tools
name = entry.get("name", "unknown")
command = entry.get("command", "")
result = entry.get("result", "")[:500] # Limit for context window
context_lines.append(f"- **{name}**: `{command}`")
if result:
context_lines.append(f" Output: {result}")
context_lines.append(
"\nProvide a brief summary of what was discovered and any security concerns found."
)
try:
response = await llm.generate(
system_prompt="You are a penetration testing assistant. Summarize the findings concisely.",
messages=[{"role": "user", "content": "\n".join(context_lines)}],
tools=[],
)
return response.content
except Exception:
return None
async def print_summary(interrupted: bool = False):
nonlocal messages
# Generate summary if we don't have messages yet
if not messages and tool_log:
summary = await generate_summary()
if summary:
messages.append(summary)
elapsed = int(time.time() - start_time)
mins, secs = divmod(elapsed, 60)
title = "Interrupted" if interrupted else "Finished"
status = "PARTIAL RESULTS" if interrupted else "COMPLETE"
if stopped_reason:
status = f"STOPPED ({stopped_reason})"
final_text = Text()
final_text.append(f"{status}\n\n", style=f"bold {PA_PRIMARY}")
final_text.append("Duration: ", style=PA_DIM)
final_text.append(f"{mins}m {secs}s\n", style=PA_SECONDARY)
final_text.append("Loops: ", style=PA_DIM)
final_text.append(f"{iteration}/{max_loops}\n", style=PA_SECONDARY)
final_text.append("Tools: ", style=PA_DIM)
final_text.append(f"{tool_count}\n", style=PA_SECONDARY)
if total_tokens > 0:
final_text.append("Tokens: ", style=PA_DIM)
final_text.append(f"{total_tokens:,}\n", style=PA_SECONDARY)
if findings_count > 0:
final_text.append("Findings: ", style=PA_DIM)
final_text.append(f"{findings_count}", style=PA_SECONDARY)
console.print()
console.print(
Panel(
final_text,
title=f"[{PA_SECONDARY}]{title}",
border_style=PA_BORDER,
)
)
# Show summary/messages only if it's new content (not just displayed)
if messages:
display_message(messages[-1], "Summary")
# Save report
save_report()
print_status("Initializing...")
try:
if mode == "crew":
from ..agents.crew import CrewOrchestrator
def on_worker_event(worker_id: str, event_type: str, data: dict):
nonlocal tool_count, findings_count, total_tokens
if event_type == "spawn":
task = data.get("task", "")
print_status(f"Spawned worker {worker_id}: {task}", PA_ACCENT)
elif event_type == "tool":
tool_name = data.get("tool", "unknown")
tool_count += 1
print_status(
f"Worker {worker_id} using tool: {tool_name}", PA_DIM
)
# Log tool usage (limited info available from event)
elapsed = int(time.time() - start_time)
mins, secs = divmod(elapsed, 60)
ts = f"{mins:02d}:{secs:02d}"
tool_log.append(
{
"ts": ts,
"name": tool_name,
"command": f"(Worker {worker_id})",
"result": "",
"exit_code": None,
}
)
elif event_type == "tokens":
tokens = data.get("tokens", 0)
total_tokens += tokens
elif event_type == "complete":
f_count = data.get("findings_count", 0)
findings_count += f_count
print_status(
f"Worker {worker_id} complete ({f_count} findings)", "green"
)
elif event_type == "failed":
reason = data.get("reason", "unknown")
print_status(f"Worker {worker_id} failed: {reason}", "red")
elif event_type == "status":
status = data.get("status", "")
print_status(f"Worker {worker_id} status: {status}", PA_DIM)
elif event_type == "warning":
reason = data.get("reason", "unknown")
print_status(f"Worker {worker_id} warning: {reason}", "yellow")
elif event_type == "error":
error = data.get("error", "unknown")
print_status(f"Worker {worker_id} error: {error}", "red")
elif event_type == "cancelled":
print_status(f"Worker {worker_id} cancelled", "yellow")
crew = CrewOrchestrator(
llm=llm,
tools=tools,
runtime=runtime,
on_worker_event=on_worker_event,
rag_engine=rag,
target=target,
)
async for update in crew.run(task_msg):
iteration += 1
phase = update.get("phase", "")
if phase == "starting":
print_status("Crew orchestrator starting...", PA_PRIMARY)
elif phase == "thinking":
content = update.get("content", "")
if content:
display_message(content, "PentestAgent Plan")
elif phase == "tool_call":
tool = update.get("tool", "")
args = update.get("args", {})
print_status(f"Orchestrator calling: {tool}", PA_ACCENT)
elif phase == "complete":
report_content = update.get("report", "")
if report_content:
messages.append(report_content)
findings.append(
report_content
) # Add to findings so it appears in the saved report
display_message(report_content, "Crew Report")
elif phase == "error":
error = update.get("error", "Unknown error")
print_status(f"Crew error: {error}", "red")
if iteration >= max_loops:
stopped_reason = "max loops reached"
raise StopIteration()
else:
# Default Agent Mode
agent = PentestAgentAgent(
llm=llm,
tools=tools,
runtime=runtime,
target=target,
rag_engine=rag,
)
async for response in agent.agent_loop(task_msg):
iteration += 1
# Track token usage
if response.usage:
usage = response.usage.get("total_tokens", 0)
is_intermediate = response.metadata.get("intermediate", False)
has_tools = bool(response.tool_calls)
# Logic to avoid double counting:
# 1. Intermediate messages (thinking) always count
# 2. Tool messages count ONLY if not preceded by intermediate message
if is_intermediate:
total_tokens += usage
last_msg_intermediate = True
elif has_tools:
if not last_msg_intermediate:
total_tokens += usage
last_msg_intermediate = False
else:
# Other messages (like plan)
total_tokens += usage
last_msg_intermediate = False
# Show tool calls and results as they happen
if response.tool_calls:
for i, call in enumerate(response.tool_calls):
tool_count += 1
name = getattr(call, "name", None) or getattr(
call.function, "name", "tool"
)
# Track findings (notes tool)
if name == "notes":
findings_count += 1
try:
args = getattr(call, "arguments", None) or getattr(
call.function, "arguments", "{}"
)
if isinstance(args, str):
import json
args = json.loads(args)
if isinstance(args, dict):
note_content = (
args.get("value", "")
or args.get("content", "")
or args.get("note", "")
)
if note_content:
findings.append(note_content)
except Exception:
pass
elapsed = int(time.time() - start_time)
mins, secs = divmod(elapsed, 60)
ts = f"{mins:02d}:{secs:02d}"
# Get result if available
if response.tool_results and i < len(response.tool_results):
tr = response.tool_results[i]
result_text = tr.result or tr.error or ""
if result_text:
# Truncate for display
preview = result_text[:200].replace("\n", " ")
if len(result_text) > 200:
preview += "..."
# Parse args for command extraction
command_text = ""
exit_code = None
try:
args = getattr(call, "arguments", None) or getattr(
call.function, "arguments", "{}"
)
if isinstance(args, str):
import json
args = json.loads(args)
if isinstance(args, dict):
command_text = args.get("command", "")
except Exception:
pass
# Extract exit code from result
if response.tool_results and i < len(response.tool_results):
tr = response.tool_results[i]
full_result = tr.result or tr.error or ""
# Try to parse exit code
if "Exit Code:" in full_result:
try:
import re
match = re.search(
r"Exit Code:\s*(\d+)", full_result
)
if match:
exit_code = int(match.group(1))
except Exception:
pass
else:
full_result = ""
# Store full data for report (not truncated)
tool_log.append(
{
"ts": ts,
"name": name,
"command": command_text,
"result": full_result,
"exit_code": exit_code,
}
)
# Metasploit-style output with better spacing
console.print() # Blank line before each tool
print_status(f"$ {name} ({tool_count})", PA_ACCENT)
# Show command/args on separate indented line (truncated for display)
if command_text:
display_cmd = command_text[:80]
if len(command_text) > 80:
display_cmd += "..."
console.print(f" [{PA_DIM}]{display_cmd}[/]")
# Show result on separate line with status indicator
if response.tool_results and i < len(response.tool_results):
tr = response.tool_results[i]
if tr.error:
console.print(
f" [{PA_DIM}][!] {tr.error[:100]}[/]"
)
elif tr.result:
# Show exit code or brief result
result_line = tr.result[:100].replace("\n", " ")
if exit_code == 0 or "success" in result_line.lower():
console.print(f" [{PA_DIM}][+] OK[/]")
elif exit_code is not None and exit_code != 0:
console.print(
f" [{PA_DIM}][-] Exit {exit_code}[/]"
)
else:
console.print(
f" [{PA_DIM}][*] {result_line[:60]}...[/]"
)
# Print assistant content immediately (analysis/findings)
if response.content:
if display_message(response.content, "PentestAgent"):
messages.append(response.content)
# Check max loops limit
if iteration >= max_loops:
stopped_reason = "max loops reached"
console.print()
print_status(f"Max loops limit reached ({max_loops})", "yellow")
raise StopIteration()
# In agent mode, ensure the final message is treated as the main finding (Executive Summary)
if mode != "crew" and messages:
findings.append(messages[-1])
await print_summary(interrupted=False)
except StopIteration:
await print_summary(interrupted=True)
except (KeyboardInterrupt, asyncio.CancelledError):
stopped_reason = "user interrupt"
await print_summary(interrupted=True)
except Exception as e:
console.print(f"\n[red]Error: {e}[/]")
stopped_reason = f"error: {e}"
await print_summary(interrupted=True)
finally:
# Cleanup MCP connections first
if mcp_manager:
try:
await mcp_manager.disconnect_all()
await asyncio.sleep(0.1) # Allow transports to close cleanly
except Exception:
pass
# Then stop runtime
if runtime:
try:
await runtime.stop()
except Exception:
pass

File diff suppressed because it is too large Load Diff