feat(crew): refactor to use finish tool for agent result synthesis

This commit is contained in:
GH05TCREW
2025-12-26 08:09:06 -07:00
parent 9b24dd23d2
commit f34c7fe391
5 changed files with 122 additions and 20 deletions

View File

@@ -254,8 +254,21 @@ class CrewOrchestrator:
}
)
if tc_name == "synthesize_findings":
if tc_name == "finish":
# Check if finish tool used tokens for synthesis
if (
hasattr(self.pool, "finish_tokens")
and self.pool.finish_tokens > 0
):
yield {
"phase": "tokens",
"tokens": self.pool.finish_tokens,
}
self.pool.finish_tokens = (
0 # Reset for next use
)
final_report = result
break # Exit immediately after finish
except Exception as e:
error_msg = f"Error: {e}"
@@ -284,12 +297,47 @@ class CrewOrchestrator:
if final_report:
break
else:
# No tool calls - the content IS the final report (Direct Answer)
# No tool calls - check if agents were spawned
content = response.content or ""
if content:
final_report = content
# Record it in history but don't yield as "thinking"
self._messages.append({"role": "assistant", "content": content})
# If agents were spawned, call finish to synthesize results
# Otherwise, use the response directly as final report
if self.pool and self.pool.get_workers():
# Agents exist - call finish to synthesize
yield {"phase": "thinking", "content": content}
finish_tool = next(
(t for t in crew_tools if t.name == "finish"), None
)
if finish_tool:
try:
final_report = await finish_tool.execute(
{"context": content}, self.runtime
)
# Track tokens from auto-finish synthesis
if (
hasattr(self.pool, "finish_tokens")
and self.pool.finish_tokens > 0
):
yield {
"phase": "tokens",
"tokens": self.pool.finish_tokens,
}
self.pool.finish_tokens = 0
break
except Exception as e:
yield {
"phase": "error",
"error": f"Auto-finish failed: {e}",
}
break
else:
final_report = content
break
else:
# No agents - response is the final answer
final_report = content
break
self.state = CrewState.COMPLETE

View File

@@ -80,33 +80,48 @@ def create_crew_tools(pool: "WorkerPool", llm: "LLM") -> List[Tool]:
return f"Cancelled {agent_id}"
return f"Could not cancel {agent_id} (not running or not found)"
async def synthesize_findings_fn(arguments: dict, runtime: "Runtime") -> str:
"""Compile all agent results into a unified report."""
async def finish_fn(arguments: dict, runtime: "Runtime") -> str:
"""Finish crew task: wait for agents, synthesize results, return final summary."""
context = arguments.get("context", "")
# Step 1: Wait for all agents to complete
await pool.wait_for(None)
# Step 2: Gather agent results
workers = pool.get_workers()
if not workers:
return "No agents have run yet."
return context or "Task completed. No agents were spawned."
results_text = []
for w in workers:
if w.result:
results_text.append(f"## {w.task}\n{w.result}")
results_text.append(f"## {w.id}: {w.task}\n{w.result}")
elif w.error:
results_text.append(f"## {w.task}\nError: {w.error}")
results_text.append(f"## {w.id}: {w.task}\nError: {w.error}")
if not results_text:
return "No results to synthesize."
return context or "Task completed. Agents ran but produced no results."
prompt = f"""Synthesize these agent findings into a unified penetration test report.
Present concrete findings. Be factual and concise about what was discovered.
# Step 3: Use LLM to synthesize findings into final summary
prompt = f"""Synthesize these agent findings into a clear, concise summary.
{chr(10).join(results_text)}"""
Context: {context or 'Penetration test task completed.'}
Agent Results:
{chr(10).join(results_text)}
Provide a unified summary of what was accomplished and key findings."""
response = await llm.generate(
system_prompt="Synthesize penetration test findings into a clear, actionable report.",
system_prompt="You are synthesizing penetration test results. Be factual, clear, and concise.",
messages=[{"role": "user", "content": prompt}],
tools=[],
)
# Store token usage for orchestrator to report
if response.usage:
pool.finish_tokens = response.usage.get("total_tokens", 0)
return response.content
async def formulate_strategy_fn(arguments: dict, runtime: "Runtime") -> str:
@@ -228,10 +243,19 @@ Present concrete findings. Be factual and concise about what was discovered.
category="orchestration",
),
Tool(
name="synthesize_findings",
description="Compile all agent results into a unified penetration test report. Call this after all agents have completed.",
schema=ToolSchema(type="object", properties={}, required=[]),
execute_fn=synthesize_findings_fn,
name="finish",
description="Complete the crew task. Automatically waits for all agents, synthesizes their results using LLM, and returns final summary. Call this when task objectives are met.",
schema=ToolSchema(
type="object",
properties={
"context": {
"type": "string",
"description": "Optional context about the task for synthesis (e.g., 'Tested SSH access on localhost'). Helps frame the summary.",
}
},
required=[],
),
execute_fn=finish_fn,
category="orchestration",
),
Tool(

View File

@@ -34,7 +34,7 @@ You manage agents using these tools:
- **get_agent_status**: Check on a specific agent
- **cancel_agent**: Stop an agent if needed
- **formulate_strategy**: Define and select a Course of Action (COA). Use this for strategic decisions.
- **synthesize_findings**: Compile all results into a final concise report (call this when done)
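- **finish**: Complete the task. Automatically waits for all agents, synthesizes their results, and returns the final summary. Optionally pass `context` to frame the synthesis (call this when done).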
{% if worker_tools %}
## Worker Agent Tools
@@ -73,8 +73,9 @@ You are responsible for STRATEGIC adaptation.
4. THEN spawn new agents to execute the selected COA.
## Guidelines
- Do not ask the user questions or request input. Make all decisions autonomously.
- Leverage any prior intelligence from earlier reconnaissance
- Be strategic - spawn 2-4 agents in parallel for efficiency
- Each agent task should be specific and actionable
- Adapt your approach based on what agents discover
- Call synthesize_findings when you have enough information for a report
- When done, call `finish`, passing a summary of the task as `context` (brief for simple tasks, detailed for complex ones)

View File

@@ -1498,6 +1498,12 @@ Be concise. Use the actual data from notes."""
if phase == "starting":
self._set_status("thinking", "crew")
elif phase == "tokens":
# Track orchestrator token usage
tokens = update.get("tokens", 0)
self._crew_tokens_used += tokens
self._update_crew_stats()
elif phase == "thinking":
# Show the orchestrator's reasoning
content = update.get("content", "")

View File

@@ -469,6 +469,13 @@ class LocalRuntime(Runtime):
if proc.returncode is None:
proc.terminate()
await proc.wait()
# Close pipes to prevent warnings
if proc.stdin:
proc.stdin.close()
if proc.stdout:
proc.stdout.close()
if proc.stderr:
proc.stderr.close()
except Exception:
pass
self._active_processes.clear()
@@ -553,10 +560,17 @@ class LocalRuntime(Runtime):
env=env,
)
# Track process for cleanup
self._active_processes.append(process)
stdout, stderr = await asyncio.wait_for(
process.communicate(), timeout=timeout
)
# Remove from tracking after completion
if process in self._active_processes:
self._active_processes.remove(process)
# Decode and strip ANSI codes
stdout_str = stdout.decode(errors="replace")
stderr_str = stderr.decode(errors="replace")
@@ -571,15 +585,24 @@ class LocalRuntime(Runtime):
)
except asyncio.TimeoutError:
# Clean up timed-out process
if process in self._active_processes:
self._active_processes.remove(process)
return CommandResult(
exit_code=-1,
stdout="",
stderr=f"Command timed out after {timeout} seconds",
)
except asyncio.CancelledError:
# Clean up cancelled process
if process in self._active_processes:
self._active_processes.remove(process)
# Handle Ctrl+C gracefully
return CommandResult(exit_code=-1, stdout="", stderr="Command cancelled")
except Exception as e:
# Clean up on error
if process in self._active_processes:
self._active_processes.remove(process)
return CommandResult(exit_code=-1, stdout="", stderr=str(e))
async def browser_action(self, action: str, **kwargs) -> dict: