From f34c7fe3911a62625c4f96a4fc16ce2b86bfba85 Mon Sep 17 00:00:00 2001 From: GH05TCREW Date: Fri, 26 Dec 2025 08:09:06 -0700 Subject: [PATCH] feat(crew): refactor to use finish tool for agent result synthesis --- pentestagent/agents/crew/orchestrator.py | 56 +++++++++++++++++++++-- pentestagent/agents/crew/tools.py | 52 +++++++++++++++------ pentestagent/agents/prompts/pa_crew.jinja | 5 +- pentestagent/interface/tui.py | 6 +++ pentestagent/runtime/runtime.py | 23 ++++++++++ 5 files changed, 122 insertions(+), 20 deletions(-) diff --git a/pentestagent/agents/crew/orchestrator.py b/pentestagent/agents/crew/orchestrator.py index b35af77..7c164fb 100644 --- a/pentestagent/agents/crew/orchestrator.py +++ b/pentestagent/agents/crew/orchestrator.py @@ -254,8 +254,21 @@ class CrewOrchestrator: } ) - if tc_name == "synthesize_findings": + if tc_name == "finish": + # Check if finish tool used tokens for synthesis + if ( + hasattr(self.pool, "finish_tokens") + and self.pool.finish_tokens > 0 + ): + yield { + "phase": "tokens", + "tokens": self.pool.finish_tokens, + } + self.pool.finish_tokens = ( + 0 # Reset for next use + ) final_report = result + break # Exit immediately after finish except Exception as e: error_msg = f"Error: {e}" @@ -284,12 +297,47 @@ class CrewOrchestrator: if final_report: break else: - # No tool calls - the content IS the final report (Direct Answer) + # No tool calls - check if agents were spawned content = response.content or "" if content: - final_report = content - # Record it in history but don't yield as "thinking" self._messages.append({"role": "assistant", "content": content}) + + # If agents were spawned, call finish to synthesize results + # Otherwise, use the response directly as final report + if self.pool and self.pool.get_workers(): + # Agents exist - call finish to synthesize + yield {"phase": "thinking", "content": content} + finish_tool = next( + (t for t in crew_tools if t.name == "finish"), None + ) + if finish_tool: + try: + final_report = await finish_tool.execute( + {"context": content}, self.runtime + ) + # Track tokens from auto-finish synthesis + if ( + hasattr(self.pool, "finish_tokens") + and self.pool.finish_tokens > 0 + ): + yield { + "phase": "tokens", + "tokens": self.pool.finish_tokens, + } + self.pool.finish_tokens = 0 + break + except Exception as e: + yield { + "phase": "error", + "error": f"Auto-finish failed: {e}", + } + break + else: + final_report = content + break + else: + # No agents - response is the final answer + final_report = content break self.state = CrewState.COMPLETE diff --git a/pentestagent/agents/crew/tools.py b/pentestagent/agents/crew/tools.py index a3b70ad..e32f837 100644 --- a/pentestagent/agents/crew/tools.py +++ b/pentestagent/agents/crew/tools.py @@ -80,33 +80,48 @@ def create_crew_tools(pool: "WorkerPool", llm: "LLM") -> List[Tool]: return f"Cancelled {agent_id}" return f"Could not cancel {agent_id} (not running or not found)" - async def synthesize_findings_fn(arguments: dict, runtime: "Runtime") -> str: - """Compile all agent results into a unified report.""" + async def finish_fn(arguments: dict, runtime: "Runtime") -> str: + """Finish crew task: wait for agents, synthesize results, return final summary.""" + context = arguments.get("context", "") + + # Step 1: Wait for all agents to complete + await pool.wait_for(None) + + # Step 2: Gather agent results workers = pool.get_workers() if not workers: - return "No agents have run yet." + return context or "Task completed. No agents were spawned." results_text = [] for w in workers: if w.result: - results_text.append(f"## {w.task}\n{w.result}") + results_text.append(f"## {w.id}: {w.task}\n{w.result}") elif w.error: - results_text.append(f"## {w.task}\nError: {w.error}") + results_text.append(f"## {w.id}: {w.task}\nError: {w.error}") if not results_text: - return "No results to synthesize." + return context or "Task completed. Agents ran but produced no results." - prompt = f"""Synthesize these agent findings into a unified penetration test report. -Present concrete findings. Be factual and concise about what was discovered. + # Step 3: Use LLM to synthesize findings into final summary + prompt = f"""Synthesize these agent findings into a clear, concise summary. -{chr(10).join(results_text)}""" +Context: {context or 'Penetration test task completed.'} + +Agent Results: +{chr(10).join(results_text)} + +Provide a unified summary of what was accomplished and key findings.""" response = await llm.generate( - system_prompt="Synthesize penetration test findings into a clear, actionable report.", + system_prompt="You are synthesizing penetration test results. Be factual, clear, and concise.", messages=[{"role": "user", "content": prompt}], tools=[], ) + # Store token usage for orchestrator to report + if response.usage: + pool.finish_tokens = response.usage.get("total_tokens", 0) + return response.content async def formulate_strategy_fn(arguments: dict, runtime: "Runtime") -> str: @@ -228,10 +243,19 @@ Present concrete findings. Be factual and concise about what was discovered. category="orchestration", ), Tool( - name="synthesize_findings", - description="Compile all agent results into a unified penetration test report. Call this after all agents have completed.", - schema=ToolSchema(type="object", properties={}, required=[]), - execute_fn=synthesize_findings_fn, + name="finish", + description="Complete the crew task. Automatically waits for all agents, synthesizes their results using LLM, and returns final summary. Call this when task objectives are met.", + schema=ToolSchema( + type="object", + properties={ + "context": { + "type": "string", + "description": "Optional context about the task for synthesis (e.g., 'Tested SSH access on localhost'). Helps frame the summary.", + } + }, + required=[], + ), + execute_fn=finish_fn, category="orchestration", ), Tool( diff --git a/pentestagent/agents/prompts/pa_crew.jinja b/pentestagent/agents/prompts/pa_crew.jinja index aeb4375..df088c1 100644 --- a/pentestagent/agents/prompts/pa_crew.jinja +++ b/pentestagent/agents/prompts/pa_crew.jinja @@ -34,7 +34,7 @@ You manage agents using these tools: - **get_agent_status**: Check on a specific agent - **cancel_agent**: Stop an agent if needed - **formulate_strategy**: Define and select a Course of Action (COA). Use this for strategic decisions. -- **synthesize_findings**: Compile all results into a final concise report (call this when done) +- **finish**: Mark task complete. Automatically waits for all agents and accepts your summary. {% if worker_tools %} ## Worker Agent Tools @@ -73,8 +73,9 @@ You are responsible for STRATEGIC adaptation. 4. THEN spawn new agents to execute the selected COA. ## Guidelines +- Do not ask the user questions or request input. Make all decisions autonomously. - Leverage any prior intelligence from earlier reconnaissance - Be strategic - spawn 2-4 agents in parallel for efficiency - Each agent task should be specific and actionable - Adapt your approach based on what agents discover -- Call synthesize_findings when you have enough information for a report +- When done, call `finish` with a summary (brief for simple tasks, detailed for complex ones) diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index e4b1a96..6583820 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -1498,6 +1498,12 @@ Be concise. Use the actual data from notes.""" if phase == "starting": self._set_status("thinking", "crew") + elif phase == "tokens": + # Track orchestrator token usage + tokens = update.get("tokens", 0) + self._crew_tokens_used += tokens + self._update_crew_stats() + elif phase == "thinking": # Show the orchestrator's reasoning content = update.get("content", "") diff --git a/pentestagent/runtime/runtime.py b/pentestagent/runtime/runtime.py index 8623be2..c68c316 100644 --- a/pentestagent/runtime/runtime.py +++ b/pentestagent/runtime/runtime.py @@ -469,6 +469,13 @@ class LocalRuntime(Runtime): if proc.returncode is None: proc.terminate() await proc.wait() + # Close pipes to prevent warnings + if proc.stdin: + proc.stdin.close() + if proc.stdout: + proc.stdout.close() + if proc.stderr: + proc.stderr.close() except Exception: pass self._active_processes.clear() @@ -553,10 +560,17 @@ class LocalRuntime(Runtime): env=env, ) + # Track process for cleanup + self._active_processes.append(process) + stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=timeout ) + # Remove from tracking after completion + if process in self._active_processes: + self._active_processes.remove(process) + # Decode and strip ANSI codes stdout_str = stdout.decode(errors="replace") stderr_str = stderr.decode(errors="replace") @@ -571,15 +585,24 @@ class LocalRuntime(Runtime): ) except asyncio.TimeoutError: + # Clean up timed-out process + if process in self._active_processes: + self._active_processes.remove(process) return CommandResult( exit_code=-1, stdout="", stderr=f"Command timed out after {timeout} seconds", ) except asyncio.CancelledError: + # Clean up cancelled process + if process in self._active_processes: + self._active_processes.remove(process) # Handle Ctrl+C gracefully return CommandResult(exit_code=-1, stdout="", stderr="Command cancelled") except Exception as e: + # Clean up on error + if process in self._active_processes: + self._active_processes.remove(process) return CommandResult(exit_code=-1, stdout="", stderr=str(e)) async def browser_action(self, action: str, **kwargs) -> dict: