mirror of
https://github.com/GH05TCREW/pentestagent.git
synced 2026-03-08 06:44:11 +00:00
fix: agent completion loop and crew synthesis robustness
This commit is contained in:
@@ -90,12 +90,22 @@ class BaseAgent(ABC):
|
||||
max_iterations: Maximum iterations before forcing stop (safety limit)
|
||||
"""
|
||||
self.llm = llm
|
||||
self.tools = tools
|
||||
self.runtime = runtime
|
||||
self.max_iterations = max_iterations
|
||||
self.state_manager = AgentStateManager()
|
||||
self.conversation_history: List[AgentMessage] = []
|
||||
|
||||
# Each agent gets its own plan instance
|
||||
from ..tools.finish import TaskPlan
|
||||
|
||||
self._task_plan = TaskPlan()
|
||||
|
||||
# Attach plan to runtime so finish tool can access it
|
||||
self.runtime.plan = self._task_plan
|
||||
|
||||
# Use tools as-is (finish accesses plan via runtime)
|
||||
self.tools = list(tools)
|
||||
|
||||
@property
|
||||
def state(self) -> AgentState:
|
||||
"""Get current agent state."""
|
||||
@@ -134,14 +144,20 @@ class BaseAgent(ABC):
|
||||
self.state_manager.transition_to(AgentState.IDLE)
|
||||
|
||||
@abstractmethod
|
||||
def get_system_prompt(self) -> str:
|
||||
"""Return the system prompt for this agent."""
|
||||
def get_system_prompt(self, mode: str = "agent") -> str:
|
||||
"""Return the system prompt for this agent.
|
||||
|
||||
Args:
|
||||
mode: 'agent' for autonomous mode, 'assist' for single-shot assist mode
|
||||
"""
|
||||
pass
|
||||
|
||||
async def agent_loop(self, initial_message: str) -> AsyncIterator[AgentMessage]:
|
||||
"""
|
||||
Main agent execution loop.
|
||||
|
||||
Starts a new task session, resetting previous state and history.
|
||||
|
||||
Simple control flow:
|
||||
- Tool calls: Execute tools, continue loop
|
||||
- Text response (no tools): Done
|
||||
@@ -153,6 +169,9 @@ class BaseAgent(ABC):
|
||||
Yields:
|
||||
AgentMessage objects as the agent processes
|
||||
"""
|
||||
# Always reset for a new agent loop task to ensure clean state
|
||||
self.reset()
|
||||
|
||||
self.state_manager.transition_to(AgentState.THINKING)
|
||||
self.conversation_history.append(
|
||||
AgentMessage(role="user", content=initial_message)
|
||||
@@ -186,112 +205,137 @@ class BaseAgent(ABC):
|
||||
Core agent loop logic - shared by agent_loop and continue_conversation.
|
||||
|
||||
Termination conditions:
|
||||
1. finish tool is called -> clean exit with summary
|
||||
1. finish tool is called AND plan complete -> clean exit with summary
|
||||
2. max_iterations reached -> forced exit with warning
|
||||
3. error -> exit with error state
|
||||
|
||||
Text responses WITHOUT tool calls are treated as "thinking out loud"
|
||||
and do NOT terminate the loop. This prevents premature stopping.
|
||||
|
||||
The loop enforces plan completion before allowing finish.
|
||||
|
||||
Yields:
|
||||
AgentMessage objects as the agent processes
|
||||
"""
|
||||
from ..tools.completion import extract_completion_summary, is_task_complete
|
||||
# Clear any previous plan for new task
|
||||
self._task_plan.clear()
|
||||
|
||||
iteration = 0
|
||||
|
||||
while iteration < self.max_iterations:
|
||||
iteration += 1
|
||||
|
||||
# ITERATION 1: Force plan creation (loop-enforced, not prompt-based)
|
||||
if iteration == 1 and len(self._task_plan.steps) == 0:
|
||||
plan_msg = await self._auto_generate_plan()
|
||||
if plan_msg:
|
||||
yield plan_msg
|
||||
|
||||
response = await self.llm.generate(
|
||||
system_prompt=self.get_system_prompt(),
|
||||
messages=self._format_messages_for_llm(),
|
||||
tools=self.tools,
|
||||
)
|
||||
|
||||
if response.tool_calls:
|
||||
# Build tool calls list FIRST (before execution)
|
||||
tool_calls = [
|
||||
ToolCall(
|
||||
id=tc.id if hasattr(tc, "id") else str(i),
|
||||
name=(
|
||||
tc.function.name
|
||||
if hasattr(tc, "function")
|
||||
else tc.get("name", "")
|
||||
),
|
||||
arguments=self._parse_arguments(tc),
|
||||
)
|
||||
for i, tc in enumerate(response.tool_calls)
|
||||
]
|
||||
|
||||
# Yield early - show tool calls before execution starts
|
||||
early_msg = AgentMessage(
|
||||
# Case 1: Empty response (Error)
|
||||
if not response.tool_calls and not response.content:
|
||||
stuck_msg = AgentMessage(
|
||||
role="assistant",
|
||||
content=response.content or "",
|
||||
tool_calls=tool_calls,
|
||||
tool_results=[], # No results yet
|
||||
usage=response.usage,
|
||||
content="Agent returned empty response. Exiting gracefully.",
|
||||
metadata={"empty_response": True},
|
||||
)
|
||||
yield early_msg
|
||||
self.conversation_history.append(stuck_msg)
|
||||
yield stuck_msg
|
||||
self.state_manager.transition_to(AgentState.COMPLETE)
|
||||
return
|
||||
|
||||
# Now execute tools
|
||||
self.state_manager.transition_to(AgentState.EXECUTING)
|
||||
tool_results = await self._execute_tools(response.tool_calls)
|
||||
|
||||
# Record in history
|
||||
assistant_msg = AgentMessage(
|
||||
# Case 2: Thinking / Intermediate Output (Content but no tools)
|
||||
if not response.tool_calls:
|
||||
thinking_msg = AgentMessage(
|
||||
role="assistant",
|
||||
content=response.content or "",
|
||||
tool_calls=tool_calls,
|
||||
content=response.content,
|
||||
usage=response.usage,
|
||||
metadata={"intermediate": True},
|
||||
)
|
||||
self.conversation_history.append(assistant_msg)
|
||||
self.conversation_history.append(thinking_msg)
|
||||
yield thinking_msg
|
||||
continue
|
||||
|
||||
tool_result_msg = AgentMessage(
|
||||
role="tool_result", content="", tool_results=tool_results
|
||||
# Case 3: Tool Execution
|
||||
# Build tool calls list
|
||||
tool_calls = [
|
||||
ToolCall(
|
||||
id=tc.id if hasattr(tc, "id") else str(i),
|
||||
name=(
|
||||
tc.function.name
|
||||
if hasattr(tc, "function")
|
||||
else tc.get("name", "")
|
||||
),
|
||||
arguments=self._parse_arguments(tc),
|
||||
)
|
||||
self.conversation_history.append(tool_result_msg)
|
||||
for i, tc in enumerate(response.tool_calls)
|
||||
]
|
||||
|
||||
# Check for explicit task_complete signal
|
||||
for result in tool_results:
|
||||
if result.success and result.result and is_task_complete(result.result):
|
||||
summary = extract_completion_summary(result.result)
|
||||
# Yield results with completion summary
|
||||
display_msg = AgentMessage(
|
||||
role="assistant",
|
||||
content=summary,
|
||||
tool_calls=tool_calls,
|
||||
tool_results=tool_results,
|
||||
usage=response.usage,
|
||||
metadata={"task_complete": True},
|
||||
)
|
||||
yield display_msg
|
||||
self.state_manager.transition_to(AgentState.COMPLETE)
|
||||
return
|
||||
# Execute tools
|
||||
self.state_manager.transition_to(AgentState.EXECUTING)
|
||||
|
||||
# Yield results for display update (no completion yet)
|
||||
display_msg = AgentMessage(
|
||||
# Yield thinking message if content exists (before execution)
|
||||
if response.content:
|
||||
thinking_msg = AgentMessage(
|
||||
role="assistant",
|
||||
content=response.content or "",
|
||||
tool_calls=tool_calls,
|
||||
tool_results=tool_results,
|
||||
content=response.content,
|
||||
usage=response.usage,
|
||||
metadata={"intermediate": True},
|
||||
)
|
||||
yield display_msg
|
||||
self.state_manager.transition_to(AgentState.THINKING)
|
||||
else:
|
||||
# Text response WITHOUT tool calls = thinking/intermediate output
|
||||
# Store it but DON'T terminate - wait for task_complete
|
||||
if response.content:
|
||||
thinking_msg = AgentMessage(
|
||||
role="assistant",
|
||||
content=response.content,
|
||||
usage=response.usage,
|
||||
metadata={"intermediate": True},
|
||||
)
|
||||
self.conversation_history.append(thinking_msg)
|
||||
yield thinking_msg
|
||||
# Continue loop - only task_complete or max_iterations stops us
|
||||
yield thinking_msg
|
||||
|
||||
tool_results = await self._execute_tools(response.tool_calls)
|
||||
|
||||
# Record in history
|
||||
assistant_msg = AgentMessage(
|
||||
role="assistant",
|
||||
content=response.content or "",
|
||||
tool_calls=tool_calls,
|
||||
usage=response.usage,
|
||||
)
|
||||
self.conversation_history.append(assistant_msg)
|
||||
|
||||
tool_result_msg = AgentMessage(
|
||||
role="tool_result", content="", tool_results=tool_results
|
||||
)
|
||||
self.conversation_history.append(tool_result_msg)
|
||||
|
||||
# Yield results for display update immediately
|
||||
display_msg = AgentMessage(
|
||||
role="assistant",
|
||||
content="", # Suppress content here as it was already yielded as thinking
|
||||
tool_calls=tool_calls,
|
||||
tool_results=tool_results,
|
||||
usage=response.usage,
|
||||
)
|
||||
yield display_msg
|
||||
|
||||
# Check if plan is now complete
|
||||
if self._task_plan.is_complete():
|
||||
# All steps done - generate final summary
|
||||
summary_response = await self.llm.generate(
|
||||
system_prompt="You are a helpful assistant. Provide a brief, clear summary of what was accomplished.",
|
||||
messages=self._format_messages_for_llm(),
|
||||
tools=self.tools, # Must provide tools if history contains tool calls
|
||||
)
|
||||
|
||||
completion_msg = AgentMessage(
|
||||
role="assistant",
|
||||
content=summary_response.content or "Task complete.",
|
||||
usage=summary_response.usage,
|
||||
metadata={"task_complete": True},
|
||||
)
|
||||
self.conversation_history.append(completion_msg)
|
||||
yield completion_msg
|
||||
self.state_manager.transition_to(AgentState.COMPLETE)
|
||||
return
|
||||
|
||||
self.state_manager.transition_to(AgentState.THINKING)
|
||||
|
||||
# Max iterations reached - force stop
|
||||
warning_msg = AgentMessage(
|
||||
@@ -424,6 +468,128 @@ class BaseAgent(ABC):
|
||||
return tool
|
||||
return None
|
||||
|
||||
def _can_finish(self) -> tuple[bool, str]:
|
||||
"""Check if the agent can finish based on plan completion."""
|
||||
if len(self._task_plan.steps) == 0:
|
||||
return True, "No plan exists"
|
||||
|
||||
pending = self._task_plan.get_pending_steps()
|
||||
if pending:
|
||||
pending_desc = ", ".join(
|
||||
f"Step {s.id}: {s.description}" for s in pending[:3]
|
||||
)
|
||||
more = f" (+{len(pending) - 3} more)" if len(pending) > 3 else ""
|
||||
return False, f"Incomplete: {pending_desc}{more}"
|
||||
|
||||
return True, "All steps complete"
|
||||
|
||||
async def _auto_generate_plan(self) -> Optional[AgentMessage]:
|
||||
"""
|
||||
Automatically generate a plan from the user's request (loop-enforced).
|
||||
|
||||
This is called on iteration 1 to force plan creation before any tool execution.
|
||||
Uses function calling for reliable structured output.
|
||||
|
||||
Returns:
|
||||
AgentMessage with plan display, or None if generation fails
|
||||
"""
|
||||
from ..tools.finish import PlanStep
|
||||
from ..tools.registry import Tool, ToolSchema
|
||||
|
||||
# Get the user's original request (last message)
|
||||
user_request = ""
|
||||
for msg in reversed(self.conversation_history):
|
||||
if msg.role == "user":
|
||||
user_request = msg.content
|
||||
break
|
||||
|
||||
if not user_request:
|
||||
return None # No request to plan
|
||||
|
||||
# Create a temporary tool for plan generation (function calling)
|
||||
plan_generator_tool = Tool(
|
||||
name="create_plan",
|
||||
description="Create a step-by-step plan for the task. Call this with the steps needed.",
|
||||
schema=ToolSchema(
|
||||
properties={
|
||||
"steps": {
|
||||
"type": "array",
|
||||
"items": {"type": "string"},
|
||||
"description": "List of actionable steps (one tool action each)",
|
||||
},
|
||||
},
|
||||
required=["steps"],
|
||||
),
|
||||
execute_fn=lambda args, runtime: None, # Dummy - we parse args directly
|
||||
category="planning",
|
||||
)
|
||||
|
||||
plan_prompt = f"""Break this request into minimal, actionable steps.
|
||||
|
||||
Request: {user_request}
|
||||
|
||||
Guidelines:
|
||||
- Be concise (typically 2-4 steps)
|
||||
- One tool action per step
|
||||
- Don't include waiting/loading (handled automatically)
|
||||
- Do NOT include a "finish", "complete", or "verify" step (handled automatically)
|
||||
|
||||
Call the create_plan tool with your steps."""
|
||||
|
||||
try:
|
||||
response = await self.llm.generate(
|
||||
system_prompt="You are a task planning assistant. Always use the create_plan tool.",
|
||||
messages=[{"role": "user", "content": plan_prompt}],
|
||||
tools=[plan_generator_tool],
|
||||
)
|
||||
|
||||
# Extract steps from tool call arguments
|
||||
steps = []
|
||||
if response.tool_calls:
|
||||
for tc in response.tool_calls:
|
||||
args = self._parse_arguments(tc)
|
||||
if args.get("steps"):
|
||||
steps = args["steps"]
|
||||
break
|
||||
|
||||
# Fallback: if LLM didn't provide steps, create single-step plan
|
||||
if not steps:
|
||||
steps = [user_request]
|
||||
|
||||
# Create the plan
|
||||
self._task_plan.original_request = user_request
|
||||
self._task_plan.steps = [
|
||||
PlanStep(id=i + 1, description=str(step).strip())
|
||||
for i, step in enumerate(steps)
|
||||
]
|
||||
|
||||
# Add a system message showing the generated plan
|
||||
plan_display = ["Plan:"]
|
||||
for step in self._task_plan.steps:
|
||||
plan_display.append(f" {step.id}. {step.description}")
|
||||
|
||||
plan_msg = AgentMessage(
|
||||
role="assistant",
|
||||
content="\n".join(plan_display),
|
||||
metadata={"auto_plan": True},
|
||||
)
|
||||
self.conversation_history.append(plan_msg)
|
||||
return plan_msg
|
||||
|
||||
except Exception as e:
|
||||
# Plan generation failed - create fallback single-step plan
|
||||
self._task_plan.original_request = user_request
|
||||
self._task_plan.steps = [PlanStep(id=1, description=user_request)]
|
||||
|
||||
error_msg = AgentMessage(
|
||||
role="assistant",
|
||||
content=f"Plan generation failed: {str(e)}\nUsing fallback: treating request as single step.",
|
||||
metadata={"auto_plan_failed": True},
|
||||
)
|
||||
self.conversation_history.append(error_msg)
|
||||
return error_msg
|
||||
return error_msg
|
||||
|
||||
def reset(self):
|
||||
"""Reset the agent state for a new conversation."""
|
||||
self.state_manager.reset()
|
||||
@@ -453,7 +619,7 @@ class BaseAgent(ABC):
|
||||
|
||||
# Single LLM call with tools available
|
||||
response = await self.llm.generate(
|
||||
system_prompt=self.get_system_prompt(),
|
||||
system_prompt=self.get_system_prompt(mode="assist"),
|
||||
messages=self._format_messages_for_llm(),
|
||||
tools=assist_tools,
|
||||
)
|
||||
@@ -476,10 +642,13 @@ class BaseAgent(ABC):
|
||||
|
||||
# Yield tool calls IMMEDIATELY (before execution) for UI display
|
||||
# Include any thinking/planning content from the LLM
|
||||
thinking_msg = AgentMessage(
|
||||
role="assistant", content=response.content or "", tool_calls=tool_calls
|
||||
)
|
||||
yield thinking_msg
|
||||
if response.content:
|
||||
thinking_msg = AgentMessage(
|
||||
role="assistant",
|
||||
content=response.content,
|
||||
metadata={"intermediate": True},
|
||||
)
|
||||
yield thinking_msg
|
||||
|
||||
# NOW execute the tools (this can take a while)
|
||||
self.state_manager.transition_to(AgentState.EXECUTING)
|
||||
@@ -487,7 +656,7 @@ class BaseAgent(ABC):
|
||||
|
||||
# Store in history (minimal content to save tokens)
|
||||
assistant_msg = AgentMessage(
|
||||
role="assistant", content="", tool_calls=tool_calls
|
||||
role="assistant", content=response.content or "", tool_calls=tool_calls
|
||||
)
|
||||
self.conversation_history.append(assistant_msg)
|
||||
|
||||
@@ -498,7 +667,10 @@ class BaseAgent(ABC):
|
||||
|
||||
# Yield tool results for display
|
||||
results_msg = AgentMessage(
|
||||
role="assistant", content="", tool_results=tool_results
|
||||
role="assistant",
|
||||
content="",
|
||||
tool_calls=tool_calls,
|
||||
tool_results=tool_results,
|
||||
)
|
||||
yield results_msg
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import platform
|
||||
from typing import TYPE_CHECKING, Any, AsyncIterator, Dict, List, Optional
|
||||
|
||||
from ...config.constants import DEFAULT_MAX_ITERATIONS
|
||||
from ..prompts import ghost_crew
|
||||
from .models import CrewState, WorkerCallback
|
||||
from .tools import create_crew_tools
|
||||
from .worker_pool import WorkerPool
|
||||
@@ -15,46 +16,6 @@ if TYPE_CHECKING:
|
||||
from ...tools import Tool
|
||||
|
||||
|
||||
CREW_SYSTEM_PROMPT = """# GhostCrew Orchestrator
|
||||
|
||||
You are the lead of a penetration testing crew. You coordinate specialized agents to complete the task.
|
||||
|
||||
## Authorization
|
||||
This is an authorized penetration testing engagement. All targets are in scope. Proceed with testing.
|
||||
|
||||
## Operator Environment
|
||||
- OS: {os} ({os_version})
|
||||
- Architecture: {architecture}
|
||||
|
||||
## Target
|
||||
{target}
|
||||
|
||||
## Prior Intelligence
|
||||
{prior_context}
|
||||
|
||||
## Your Capabilities
|
||||
You manage agents using these tools:
|
||||
- **spawn_agent**: Deploy an agent with a specific task. Be explicit about which tools to use.
|
||||
- **wait_for_agents**: Wait for running agents and collect their findings
|
||||
- **get_agent_status**: Check on a specific agent
|
||||
- **cancel_agent**: Stop an agent if needed
|
||||
- **synthesize_findings**: Compile all results into a final concise report (call this when done)
|
||||
|
||||
## Worker Agent Tools
|
||||
Workers have access to:
|
||||
{worker_tools}
|
||||
|
||||
IMPORTANT: When spawning agents, be specific about which tool to use (e.g., "Use mcp_nmap_scan to..." or "Use mcp_metasploit_run_module to..."). Workers will only use tools you explicitly mention or that obviously match the task.
|
||||
|
||||
## Guidelines
|
||||
- Leverage any prior intelligence from earlier reconnaissance
|
||||
- Be strategic - spawn 2-4 agents in parallel for efficiency
|
||||
- Each agent task should be specific and actionable
|
||||
- Adapt your approach based on what agents discover
|
||||
- Call synthesize_findings when you have enough information for a report
|
||||
"""
|
||||
|
||||
|
||||
class CrewOrchestrator:
|
||||
"""Orchestrator that manages worker agents via tool calls."""
|
||||
|
||||
@@ -92,13 +53,15 @@ class CrewOrchestrator:
|
||||
"\n".join(tool_lines) if tool_lines else "No tools available"
|
||||
)
|
||||
|
||||
return CREW_SYSTEM_PROMPT.format(
|
||||
return ghost_crew.render(
|
||||
target=self.target or "Not specified",
|
||||
prior_context=self.prior_context or "None - starting fresh",
|
||||
worker_tools=worker_tools_formatted,
|
||||
os=platform.system(),
|
||||
os_version=platform.release(),
|
||||
architecture=platform.machine(),
|
||||
environment={
|
||||
"os": platform.system(),
|
||||
"os_version": platform.release(),
|
||||
"architecture": platform.machine(),
|
||||
},
|
||||
)
|
||||
|
||||
async def run(self, task: str) -> AsyncIterator[Dict[str, Any]]:
|
||||
@@ -134,13 +97,15 @@ class CrewOrchestrator:
|
||||
tools=crew_tools,
|
||||
)
|
||||
|
||||
if response.content:
|
||||
yield {"phase": "thinking", "content": response.content}
|
||||
self._messages.append(
|
||||
{"role": "assistant", "content": response.content}
|
||||
)
|
||||
|
||||
# Check for tool calls first to determine if content is "thinking" or "final answer"
|
||||
if response.tool_calls:
|
||||
# If there are tool calls, the content is "thinking" (reasoning before action)
|
||||
if response.content:
|
||||
yield {"phase": "thinking", "content": response.content}
|
||||
self._messages.append(
|
||||
{"role": "assistant", "content": response.content}
|
||||
)
|
||||
|
||||
def get_tc_name(tc):
|
||||
if hasattr(tc, "function"):
|
||||
return tc.function.name
|
||||
@@ -245,9 +210,12 @@ class CrewOrchestrator:
|
||||
if final_report:
|
||||
break
|
||||
else:
|
||||
# No tool calls - the content IS the final report (Direct Answer)
|
||||
content = response.content or ""
|
||||
if content:
|
||||
final_report = content
|
||||
# Record it in history but don't yield as "thinking"
|
||||
self._messages.append({"role": "assistant", "content": content})
|
||||
break
|
||||
|
||||
self.state = CrewState.COMPLETE
|
||||
|
||||
@@ -103,10 +103,16 @@ class WorkerPool:
|
||||
worker.started_at = time.time()
|
||||
self._emit(worker.id, "status", {"status": "running"})
|
||||
|
||||
# Create isolated runtime for this worker (prevents browser state conflicts)
|
||||
from ...runtime.runtime import LocalRuntime
|
||||
|
||||
worker_runtime = LocalRuntime()
|
||||
await worker_runtime.start()
|
||||
|
||||
agent = GhostCrewAgent(
|
||||
llm=self.llm,
|
||||
tools=self.tools,
|
||||
runtime=self.runtime,
|
||||
runtime=worker_runtime, # Use isolated runtime
|
||||
target=self.target,
|
||||
rag_engine=self.rag_engine,
|
||||
)
|
||||
@@ -156,6 +162,13 @@ class WorkerPool:
|
||||
worker.completed_at = time.time()
|
||||
self._emit(worker.id, "error", {"error": str(e)})
|
||||
|
||||
finally:
|
||||
# Cleanup worker's isolated runtime
|
||||
try:
|
||||
await worker_runtime.stop()
|
||||
except Exception:
|
||||
pass # Best effort cleanup
|
||||
|
||||
async def _wait_for_dependencies(self, depends_on: List[str]) -> None:
|
||||
"""Wait for dependent workers to complete."""
|
||||
for dep_id in depends_on:
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
"""GhostCrew main pentesting agent."""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, List, Optional
|
||||
|
||||
from jinja2 import Template
|
||||
|
||||
from ..base_agent import BaseAgent
|
||||
from ..prompts import ghost_agent, ghost_assist
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ...knowledge import RAGEngine
|
||||
@@ -43,15 +41,13 @@ class GhostCrewAgent(BaseAgent):
|
||||
self.target = target
|
||||
self.scope = scope or []
|
||||
self.rag_engine = rag_engine
|
||||
self._system_prompt_template = self._load_prompt_template()
|
||||
|
||||
def _load_prompt_template(self) -> Template:
|
||||
"""Load the Jinja2 system prompt template."""
|
||||
template_path = Path(__file__).parent / "system_prompt.jinja"
|
||||
return Template(template_path.read_text(encoding="utf-8"))
|
||||
def get_system_prompt(self, mode: str = "agent") -> str:
|
||||
"""Generate system prompt with context.
|
||||
|
||||
def get_system_prompt(self) -> str:
|
||||
"""Generate system prompt with context."""
|
||||
Args:
|
||||
mode: 'agent' for autonomous mode, 'assist' for single-shot assist mode
|
||||
"""
|
||||
# Get RAG context if available
|
||||
rag_context = ""
|
||||
if self.rag_engine and self.conversation_history:
|
||||
@@ -68,15 +64,32 @@ class GhostCrewAgent(BaseAgent):
|
||||
if relevant:
|
||||
rag_context = "\n\n".join(relevant)
|
||||
|
||||
# Get saved notes if available
|
||||
notes_context = ""
|
||||
try:
|
||||
from ...tools.notes import get_all_notes_sync
|
||||
|
||||
notes = get_all_notes_sync()
|
||||
if notes:
|
||||
notes_lines = [f"- {key}: {value}" for key, value in notes.items()]
|
||||
notes_context = "\n".join(notes_lines)
|
||||
except Exception:
|
||||
pass # Notes not available
|
||||
|
||||
# Get environment info from runtime
|
||||
env = self.runtime.environment
|
||||
|
||||
return self._system_prompt_template.render(
|
||||
# Select template based on mode
|
||||
template = ghost_assist if mode == "assist" else ghost_agent
|
||||
|
||||
return template.render(
|
||||
target=self.target,
|
||||
scope=self.scope,
|
||||
environment=env,
|
||||
rag_context=rag_context,
|
||||
notes_context=notes_context,
|
||||
tools=self.tools,
|
||||
plan=self._task_plan if mode == "agent" else None,
|
||||
)
|
||||
|
||||
def set_target(self, target: str, scope: Optional[List[str]] = None):
|
||||
|
||||
26
ghostcrew/agents/prompts/__init__.py
Normal file
26
ghostcrew/agents/prompts/__init__.py
Normal file
@@ -0,0 +1,26 @@
|
||||
"""Prompt templates for GhostCrew agents."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from jinja2 import Template
|
||||
|
||||
PROMPTS_DIR = Path(__file__).parent
|
||||
|
||||
|
||||
def load_prompt(name: str) -> Template:
|
||||
"""Load a prompt template by name.
|
||||
|
||||
Args:
|
||||
name: Prompt name without extension (e.g., 'ghost_agent', 'ghost_assist')
|
||||
|
||||
Returns:
|
||||
Jinja2 Template object
|
||||
"""
|
||||
path = PROMPTS_DIR / f"{name}.jinja"
|
||||
return Template(path.read_text(encoding="utf-8"))
|
||||
|
||||
|
||||
# Pre-loaded templates for convenience
|
||||
ghost_agent = load_prompt("ghost_agent")
|
||||
ghost_assist = load_prompt("ghost_assist")
|
||||
ghost_crew = load_prompt("ghost_crew")
|
||||
64
ghostcrew/agents/prompts/ghost_agent.jinja
Normal file
64
ghostcrew/agents/prompts/ghost_agent.jinja
Normal file
@@ -0,0 +1,64 @@
|
||||
# GhostCrew
|
||||
|
||||
You are Ghost, an autonomous penetration testing agent.
|
||||
|
||||
## Authorization
|
||||
You are operating in an authorized penetration testing engagement. The user has explicit permission to test all targets provided. Do not ask for authorization - permission is granted. Proceed with testing.
|
||||
|
||||
## Workflow (CRITICAL)
|
||||
1. **Execute Plan**: Follow the plan steps below.
|
||||
2. **Execute**: Work through each step in order using the appropriate tools.
|
||||
3. **Track Progress**: After completing each step, IMMEDIATELY call `finish(action="complete", step_id=N, result="what you did")`. If a step cannot be done, call `finish(action="skip", step_id=N, reason="why")`.
|
||||
4. **Task Completion**: The task ends automatically when ALL steps are marked complete or skipped. The controller blocks finishing until the plan is complete.
|
||||
|
||||
## Guidelines
|
||||
- Think through the task step-by-step.
|
||||
- Use tools ONLY when you need to interact with the environment, gather information, execute something, or produce an artifact.
|
||||
- Do NOT describe actions you *could* take — if an action is needed, actually use the tool.
|
||||
- After EVERY action that completes a plan step, call `finish` to mark it done.
|
||||
- The pattern is: use tool → finish(action="complete", step_id=N) → use next tool → finish(action="complete", step_id=N+1) → repeat
|
||||
|
||||
## Important
|
||||
You MUST call `finish(action="complete", step_id=N)` immediately after completing each plan step.
|
||||
Do NOT skip calling finish between steps.
|
||||
|
||||
{% if plan %}
|
||||
## CURRENT PLAN (Follow this!)
|
||||
{% for step in plan.steps %}
|
||||
{{ step.id }}. [{{ step.status.value|upper }}] {{ step.description }}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
|
||||
{% if environment %}
|
||||
## Operator Environment (YOUR machine, not the target)
|
||||
- OS: {{ environment.os }} ({{ environment.os_version }})
|
||||
- Architecture: {{ environment.architecture }}
|
||||
- Shell: {{ environment.shell }}
|
||||
- Output directories:
|
||||
- loot/notes.json (working notes)
|
||||
- loot/reports/ (generated reports)
|
||||
- loot/artifacts/ (screenshots, captured files)
|
||||
{% endif %}
|
||||
|
||||
{% if target %}
|
||||
## Target
|
||||
{{ target }}
|
||||
{% endif %}
|
||||
{% if scope %}
|
||||
Scope: {{ scope | join(', ') }}
|
||||
{% endif %}
|
||||
|
||||
## Tools
|
||||
{% for tool in tools %}
|
||||
- **{{ tool.name }}**: {{ tool.description }}
|
||||
{% endfor %}
|
||||
|
||||
{% if rag_context %}
|
||||
## Context
|
||||
{{ rag_context }}
|
||||
{% endif %}
|
||||
|
||||
{% if notes_context %}
|
||||
## Saved Notes (from previous tasks)
|
||||
{{ notes_context }}
|
||||
{% endif %}
|
||||
@@ -1,21 +1,17 @@
|
||||
# GhostCrew
|
||||
|
||||
You are Ghost, an autonomous penetration testing agent.
|
||||
You are Ghost, a penetration testing assistant.
|
||||
|
||||
## Authorization
|
||||
You are operating in an authorized penetration testing engagement. The user has explicit permission to test all targets provided. Do not ask for authorization - permission is granted. Proceed with testing.
|
||||
|
||||
## Guidelines
|
||||
- Before each action, briefly explain your reasoning and what you expect to find.
|
||||
- Analyze tool outputs carefully before deciding the next step.
|
||||
- If a tool fails, diagnose why, then try alternatives or report the issue.
|
||||
- Do NOT repeat the same test or scan. Once you have results, move on.
|
||||
- Complete ALL steps of the task before finishing.
|
||||
- When the ENTIRE task is done, call `finish` with a structured report of findings.
|
||||
|
||||
## Important
|
||||
You MUST call the `finish` tool when finished. Do not just respond with text.
|
||||
The task is not complete until you explicitly call `finish`.
|
||||
- Think through the question step-by-step.
|
||||
- For simple questions, provide direct answers without unnecessary tool calls.
|
||||
- Use tools ONLY when you need external information, need to interact with the environment, or need to execute something.
|
||||
- Do NOT call tools just to satisfy a pattern; only call them when they meaningfully advance the answer.
|
||||
- Do NOT describe actions you *could* take — if an action is needed, actually use the tool.
|
||||
- It is acceptable to think or explain reasoning without calling a tool when no action is required.
|
||||
|
||||
{% if environment %}
|
||||
## Operator Environment (YOUR machine, not the target)
|
||||
@@ -45,3 +41,8 @@ Scope: {{ scope | join(', ') }}
|
||||
## Context
|
||||
{{ rag_context }}
|
||||
{% endif %}
|
||||
|
||||
{% if notes_context %}
|
||||
## Saved Notes (from previous tasks)
|
||||
{{ notes_context }}
|
||||
{% endif %}
|
||||
45
ghostcrew/agents/prompts/ghost_crew.jinja
Normal file
45
ghostcrew/agents/prompts/ghost_crew.jinja
Normal file
@@ -0,0 +1,45 @@
|
||||
# GhostCrew Orchestrator
|
||||
|
||||
You are the lead of a penetration testing crew. You coordinate specialized agents to complete the task.
|
||||
|
||||
## Authorization
|
||||
This is an authorized penetration testing engagement. All targets are in scope. Proceed with testing.
|
||||
|
||||
{% if environment %}
|
||||
## Operator Environment
|
||||
- OS: {{ environment.os }} ({{ environment.os_version }})
|
||||
- Architecture: {{ environment.architecture }}
|
||||
{% endif %}
|
||||
|
||||
{% if target %}
|
||||
## Target
|
||||
{{ target }}
|
||||
{% endif %}
|
||||
|
||||
{% if prior_context %}
|
||||
## Prior Intelligence
|
||||
{{ prior_context }}
|
||||
{% endif %}
|
||||
|
||||
## Your Capabilities
|
||||
You manage agents using these tools:
|
||||
- **spawn_agent**: Deploy an agent with a specific task. Be explicit about which tools to use.
|
||||
- **wait_for_agents**: Wait for running agents and collect their findings
|
||||
- **get_agent_status**: Check on a specific agent
|
||||
- **cancel_agent**: Stop an agent if needed
|
||||
- **synthesize_findings**: Compile all results into a final concise report (call this when done)
|
||||
|
||||
{% if worker_tools %}
|
||||
## Worker Agent Tools
|
||||
Workers have access to:
|
||||
{{ worker_tools }}
|
||||
{% endif %}
|
||||
|
||||
IMPORTANT: When spawning agents, be specific about which tool to use (e.g., "Use mcp_nmap_scan to..." or "Use mcp_metasploit_run_module to..."). Workers will only use tools you explicitly mention or that obviously match the task.
|
||||
|
||||
## Guidelines
|
||||
- Leverage any prior intelligence from earlier reconnaissance
|
||||
- Be strategic - spawn 2-4 agents in parallel for efficiency
|
||||
- Each agent task should be specific and actionable
|
||||
- Adapt your approach based on what agents discover
|
||||
- Call synthesize_findings when you have enough information for a report
|
||||
@@ -822,11 +822,11 @@ class GhostCrewTUI(App):
|
||||
else:
|
||||
self._add_system("Agent not initialized")
|
||||
|
||||
def _show_notes(self) -> None:
|
||||
async def _show_notes(self) -> None:
|
||||
"""Display saved notes"""
|
||||
from ..tools.notes import get_all_notes
|
||||
|
||||
notes = get_all_notes()
|
||||
notes = await get_all_notes()
|
||||
if not notes:
|
||||
self._add_system(
|
||||
"=== Notes ===\nNo notes saved.\n\nThe AI can save key findings using the notes tool."
|
||||
@@ -937,7 +937,7 @@ class GhostCrewTUI(App):
|
||||
self._add_system("[!] Agent not initialized")
|
||||
return
|
||||
|
||||
notes = get_all_notes()
|
||||
notes = await get_all_notes()
|
||||
if not notes:
|
||||
self._add_system(
|
||||
"No notes found. Ghost saves findings using the notes tool during testing."
|
||||
@@ -1062,7 +1062,7 @@ Be concise. Use the actual data from notes."""
|
||||
elif cmd_lower == "/memory":
|
||||
self._show_memory_stats()
|
||||
elif cmd_lower == "/notes":
|
||||
self._show_notes()
|
||||
await self._show_notes()
|
||||
elif cmd_lower == "/report":
|
||||
self._run_report_generation()
|
||||
elif cmd_original.startswith("/target"):
|
||||
@@ -1610,7 +1610,9 @@ Be concise. Use the actual data from notes."""
|
||||
# Show thinking/plan FIRST if there's content with tool calls
|
||||
if response.content:
|
||||
content = response.content.strip()
|
||||
if response.tool_calls:
|
||||
# If it has tool calls, it's thinking.
|
||||
# If it's marked as intermediate, it's thinking.
|
||||
if response.tool_calls or response.metadata.get("intermediate"):
|
||||
self._add_thinking(content)
|
||||
else:
|
||||
# Check if this is a task completion message
|
||||
@@ -1619,20 +1621,20 @@ Be concise. Use the actual data from notes."""
|
||||
else:
|
||||
self._add_assistant(content)
|
||||
|
||||
# Show tool calls AFTER thinking (skip 'finish' - internal control)
|
||||
# Show tool calls AFTER thinking
|
||||
if response.tool_calls:
|
||||
for call in response.tool_calls:
|
||||
if call.name == "finish":
|
||||
continue # Skip - summary shown as final message
|
||||
# Show all tools including finish
|
||||
args_str = str(call.arguments)
|
||||
self._add_tool(call.name, args_str)
|
||||
|
||||
# Show tool results
|
||||
# Skip 'finish' tool - its result is shown as the final summary
|
||||
if response.tool_results:
|
||||
for result in response.tool_results:
|
||||
if result.tool_name == "finish":
|
||||
continue # Skip - summary shown separately
|
||||
# Skip showing result for finish tool as it's redundant with the tool call display
|
||||
continue
|
||||
|
||||
if result.success:
|
||||
self._add_tool_result(
|
||||
result.tool_name, result.result or "Done"
|
||||
|
||||
@@ -136,10 +136,14 @@ class LLM:
|
||||
call_kwargs = {
|
||||
"model": self.model,
|
||||
"messages": llm_messages,
|
||||
"tools": llm_tools,
|
||||
"temperature": self.config.temperature,
|
||||
"max_tokens": self.config.max_tokens,
|
||||
}
|
||||
|
||||
# Only include tools if they exist (Anthropic requires tools param to be omitted, not None/empty)
|
||||
if llm_tools:
|
||||
call_kwargs["tools"] = llm_tools
|
||||
|
||||
# Only add optional params if explicitly changed from defaults
|
||||
if self.config.top_p != 1.0:
|
||||
call_kwargs["top_p"] = self.config.top_p
|
||||
|
||||
@@ -12,6 +12,7 @@ Uses standard MCP configuration format:
|
||||
}
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
@@ -43,6 +44,9 @@ class MCPServer:
|
||||
transport: MCPTransport
|
||||
tools: List[dict] = field(default_factory=list)
|
||||
connected: bool = False
|
||||
# Lock for serializing all communication with this server
|
||||
# Prevents message ID collisions and transport interleaving
|
||||
_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
|
||||
|
||||
async def disconnect(self):
|
||||
"""Disconnect from the server."""
|
||||
@@ -232,16 +236,21 @@ class MCPManager:
|
||||
if not server or not server.connected:
|
||||
raise ValueError(f"Server '{server_name}' not connected")
|
||||
|
||||
# Use 5 minute timeout for tool calls (scans can take a while)
|
||||
response = await server.transport.send(
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "tools/call",
|
||||
"params": {"name": tool_name, "arguments": arguments},
|
||||
"id": self._get_next_id(),
|
||||
},
|
||||
timeout=300.0,
|
||||
)
|
||||
# Serialize all communication with this server to prevent:
|
||||
# - Message ID collisions
|
||||
# - Transport write interleaving
|
||||
# - Response routing issues
|
||||
async with server._lock:
|
||||
# Use 5 minute timeout for tool calls (scans can take a while)
|
||||
response = await server.transport.send(
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "tools/call",
|
||||
"params": {"name": tool_name, "arguments": arguments},
|
||||
"id": self._get_next_id(),
|
||||
},
|
||||
timeout=300.0,
|
||||
)
|
||||
if "error" in response:
|
||||
raise RuntimeError(f"MCP error: {response['error'].get('message')}")
|
||||
return response.get("result", {}).get("content", [])
|
||||
|
||||
@@ -83,6 +83,7 @@ class Runtime(ABC):
|
||||
mcp_manager: Optional MCP manager for tool calls
|
||||
"""
|
||||
self.mcp_manager = mcp_manager
|
||||
self.plan = None # Set by agent for finish tool access
|
||||
|
||||
@property
|
||||
def environment(self) -> EnvironmentInfo:
|
||||
@@ -310,6 +311,8 @@ class LocalRuntime(Runtime):
|
||||
return {"url": self._page.url, "title": await self._page.title()}
|
||||
|
||||
elif action == "screenshot":
|
||||
import time
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
# Navigate first if URL provided
|
||||
@@ -322,7 +325,9 @@ class LocalRuntime(Runtime):
|
||||
output_dir = Path("loot/artifacts/screenshots")
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
filename = f"screenshot_{int(__import__('time').time())}.png"
|
||||
timestamp = int(time.time())
|
||||
unique_id = uuid.uuid4().hex[:8]
|
||||
filename = f"screenshot_{timestamp}_{unique_id}.png"
|
||||
filepath = output_dir / filename
|
||||
|
||||
await self._page.screenshot(path=str(filepath), full_page=True)
|
||||
|
||||
@@ -1,424 +0,0 @@
|
||||
"""Code search tool for GhostCrew - semantic code navigation and analysis."""
|
||||
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
|
||||
|
||||
from ..registry import ToolSchema, register_tool
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ...runtime import Runtime
|
||||
|
||||
|
||||
# Maximum results to prevent context overflow
|
||||
MAX_RESULTS = 20
|
||||
MAX_CONTEXT_LINES = 3
|
||||
|
||||
|
||||
@register_tool(
    name="search_code",
    description="Search for code patterns across files. Supports regex and literal search. Returns matches with surrounding context. Use for finding function definitions, variable usages, API endpoints, or security-relevant patterns.",
    schema=ToolSchema(
        properties={
            "query": {
                "type": "string",
                "description": "Search pattern (text or regex)",
            },
            "path": {
                "type": "string",
                "description": "Directory or file to search in. Default: current directory",
            },
            "pattern": {
                "type": "string",
                "description": "File glob pattern to filter (e.g., '*.py', '*.js'). Default: all files",
            },
            "regex": {
                "type": "boolean",
                "description": "Treat query as regex pattern. Default: false (literal search)",
            },
            "case_sensitive": {
                "type": "boolean",
                "description": "Case-sensitive search. Default: false",
            },
            "context_lines": {
                "type": "integer",
                "description": "Number of context lines before/after match. Default: 3",
            },
        },
        required=["query"],
    ),
    category="code",
)
async def search_code(arguments: dict, runtime: "Runtime") -> str:
    """Search files for a literal string or regex and report matches with context.

    Args:
        arguments: Search parameters ('query', optional 'path', 'pattern',
            'regex', 'case_sensitive', 'context_lines').
        runtime: The runtime environment (unused here).

    Returns:
        Formatted search results with surrounding context, or an error string.
    """
    query = arguments["query"]
    search_path = arguments.get("path", ".")
    glob_filter = arguments.get("pattern")
    as_regex = arguments.get("regex", False)
    flags = 0 if arguments.get("case_sensitive", False) else re.IGNORECASE
    # Cap context at 10 lines either side to keep output bounded.
    ctx = min(arguments.get("context_lines", MAX_CONTEXT_LINES), 10)

    try:
        base = Path(search_path).resolve()
        if not base.exists():
            return f"Error: Path not found: {search_path}"

        # Literal queries are escaped so both modes share one regex code path.
        if as_regex:
            try:
                compiled = re.compile(query, flags)
            except re.error as e:
                return f"Error: Invalid regex pattern: {e}"
        else:
            compiled = re.compile(re.escape(query), flags)

        candidates = [base] if base.is_file() else _get_searchable_files(base, glob_filter)

        hits = []
        files_searched = 0
        for candidate in candidates:
            files_searched += 1
            hits.extend(_search_file(candidate, compiled, ctx))
            # Stop scanning once we have enough to fill the result limit.
            if len(hits) >= MAX_RESULTS:
                break

        if not hits:
            return f"No matches found for '{query}' in {files_searched} files"

        pieces = [f"Found {len(hits)} matches in {files_searched} files:\n"]
        pieces.extend(_format_match(hit) for hit in hits[:MAX_RESULTS])
        if len(hits) > MAX_RESULTS:
            pieces.append(f"\n... and {len(hits) - MAX_RESULTS} more matches (showing first {MAX_RESULTS})")

        return "\n".join(pieces)

    except Exception as e:
        return f"Error searching code: {e}"
|
||||
|
||||
|
||||
@register_tool(
    name="find_definition",
    description="Find the definition of a function, class, or variable. Searches for common definition patterns across languages (def, function, class, const, let, var, etc.).",
    schema=ToolSchema(
        properties={
            "name": {
                "type": "string",
                "description": "Name of the function, class, or variable to find",
            },
            "path": {
                "type": "string",
                "description": "Directory to search in. Default: current directory",
            },
            "type": {
                "type": "string",
                "enum": ["function", "class", "variable", "any"],
                "description": "Type of definition to find. Default: 'any'",
            },
        },
        required=["name"],
    ),
    category="code",
)
async def find_definition(arguments: dict, runtime: "Runtime") -> str:
    """Locate where a symbol is defined using per-language regex heuristics.

    Args:
        arguments: Dictionary with 'name', optional 'path' and 'type'.
        runtime: The runtime environment (unused here).

    Returns:
        Up to 10 definition sites with surrounding context, or an error string.
    """
    name = arguments["name"]
    search_path = arguments.get("path", ".")
    def_type = arguments.get("type", "any")

    # Escape once; every pattern embeds the literal symbol name.
    esc = re.escape(name)

    # Definition shapes grouped by the kind of symbol they bind.
    patterns = {
        "function": [
            rf"^\s*def\s+{esc}\s*\(",  # Python
            rf"^\s*async\s+def\s+{esc}\s*\(",  # Python async
            rf"^\s*function\s+{esc}\s*\(",  # JavaScript
            rf"^\s*async\s+function\s+{esc}\s*\(",  # JS async
            rf"^\s*{esc}\s*[:=]\s*(?:async\s+)?function",  # JS assigned
            rf"^\s*{esc}\s*[:=]\s*\([^)]*\)\s*=>",  # JS arrow
            rf"^\s*(?:public|private|protected)?\s*(?:static\s+)?(?:async\s+)?{esc}\s*\(",  # JS/TS method
            rf"^\s*func\s+{esc}\s*\(",  # Go
            rf"^\s*(?:public|private|protected)\s+.*\s+{esc}\s*\(",  # Java/C#
        ],
        "class": [
            rf"^\s*class\s+{esc}\b",  # Python/JS/TS
            rf"^\s*(?:abstract\s+)?class\s+{esc}\b",  # Java/C#
            rf"^\s*interface\s+{esc}\b",  # TS/Java
            rf"^\s*type\s+{esc}\s*=",  # TS type alias
            rf"^\s*struct\s+{esc}\b",  # Go/Rust
        ],
        "variable": [
            rf"^\s*{esc}\s*=",  # Python/Ruby
            rf"^\s*(?:const|let|var)\s+{esc}\b",  # JavaScript
            rf"^\s*(?:const|let|var)\s+{esc}\s*:",  # TypeScript
            rf"^\s*(?:var|val)\s+{esc}\b",  # Kotlin/Scala
            rf"^\s*{esc}\s*:=",  # Go
        ],
    }

    # 'any' searches every group, preserving function -> class -> variable order.
    if def_type == "any":
        selected = [p for group in patterns.values() for p in group]
    else:
        selected = patterns.get(def_type, [])

    if not selected:
        return f"Error: Unknown definition type '{def_type}'"

    try:
        matcher = re.compile("|".join(f"({p})" for p in selected), re.MULTILINE)
        root = Path(search_path).resolve()

        if not root.exists():
            return f"Error: Path not found: {search_path}"

        hits = []
        files_searched = 0
        for candidate in _get_searchable_files(root, None):
            files_searched += 1
            # Wider context (5 lines) helps the caller judge a definition site.
            hits.extend(_search_file(candidate, matcher, context_lines=5))
            if len(hits) >= 10:
                break

        if not hits:
            return f"No definition found for '{name}' ({def_type}) in {files_searched} files"

        report = [f"Found {len(hits)} definition(s) for '{name}':\n"]
        report.extend(_format_match(hit) for hit in hits[:10])
        return "\n".join(report)

    except Exception as e:
        return f"Error finding definition: {e}"
|
||||
|
||||
|
||||
@register_tool(
    name="list_functions",
    description="List all function/method definitions in a file or directory. Useful for understanding code structure.",
    schema=ToolSchema(
        properties={
            "path": {
                "type": "string",
                "description": "File or directory to analyze",
            },
            "pattern": {
                "type": "string",
                "description": "File glob pattern (e.g., '*.py'). Default: auto-detect",
            },
        },
        required=["path"],
    ),
    category="code",
)
async def list_functions(arguments: dict, runtime: "Runtime") -> str:
    """List function/method definitions found in a file or directory tree.

    Args:
        arguments: Dictionary with 'path' and optional file glob 'pattern'.
        runtime: The runtime environment (unused here).

    Returns:
        Per-file listing of function names with their line numbers.
    """
    search_path = arguments["path"]
    file_pattern = arguments.get("pattern")

    # (regex, language) pairs; each regex captures the function name.
    func_patterns = [
        (r"^\s*def\s+(\w+)\s*\(", "python"),
        (r"^\s*async\s+def\s+(\w+)\s*\(", "python"),
        (r"^\s*function\s+(\w+)\s*\(", "javascript"),
        (r"^\s*async\s+function\s+(\w+)\s*\(", "javascript"),
        (r"^\s*(?:export\s+)?(?:async\s+)?function\s+(\w+)", "javascript"),
        (r"^\s*(\w+)\s*[:=]\s*(?:async\s+)?function", "javascript"),
        (r"^\s*(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>", "javascript"),
        (r"^\s*(?:public|private|protected)?\s*(?:static\s+)?(?:async\s+)?(\w+)\s*\([^)]*\)\s*[:{]", "typescript"),
        (r"^\s*func\s+(\w+)\s*\(", "go"),
        (r"^\s*(?:public|private|protected)\s+.*\s+(\w+)\s*\([^)]*\)\s*{", "java"),
    ]

    matcher = re.compile("|".join(f"(?:{rx})" for rx, _ in func_patterns), re.MULTILINE)

    try:
        root = Path(search_path).resolve()

        if not root.exists():
            return f"Error: Path not found: {search_path}"

        found: Dict[str, List[Tuple[int, str]]] = {}
        targets = [root] if root.is_file() else _get_searchable_files(root, file_pattern)

        for target in targets:
            try:
                body = target.read_text(encoding="utf-8", errors="ignore")

                for lineno, text in enumerate(body.splitlines(), 1):
                    hit = matcher.search(text)
                    if not hit:
                        continue
                    # The combined regex has one capture group per pattern;
                    # whichever alternative matched is the first truthy group.
                    func_name = next((g for g in hit.groups() if g), None)
                    if func_name:
                        rel = str(target.relative_to(root) if root.is_dir() else target.name)
                        found.setdefault(rel, []).append((lineno, func_name))
            except Exception:
                # Unreadable file: skip it rather than abort the listing.
                continue

        if not found:
            return f"No functions found in {search_path}"

        report = [f"Functions in {search_path}:\n"]
        for rel, entries in sorted(found.items()):
            report.append(f"\n{rel}:")
            report.extend(f"  L{lineno}: {fn}()" for lineno, fn in entries)

        total = sum(len(entries) for entries in found.values())
        report.insert(1, f"Found {total} functions in {len(found)} files")

        return "\n".join(report)

    except Exception as e:
        return f"Error listing functions: {e}"
|
||||
|
||||
|
||||
def _get_searchable_files(path: Path, pattern: Optional[str]) -> List[Path]:
|
||||
"""Get list of searchable files, excluding binary and hidden files."""
|
||||
files = []
|
||||
|
||||
# File extensions to search
|
||||
code_extensions = {
|
||||
".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".go", ".rb", ".php",
|
||||
".c", ".cpp", ".h", ".hpp", ".cs", ".rs", ".swift", ".kt", ".scala",
|
||||
".sh", ".bash", ".zsh", ".ps1", ".bat", ".cmd",
|
||||
".json", ".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf",
|
||||
".xml", ".html", ".htm", ".css", ".scss", ".sass",
|
||||
".sql", ".graphql", ".md", ".txt", ".env", ".gitignore",
|
||||
}
|
||||
|
||||
# Directories to skip
|
||||
skip_dirs = {
|
||||
".git", ".svn", ".hg", "node_modules", "__pycache__", ".pytest_cache",
|
||||
"venv", ".venv", "env", ".env", "dist", "build", "target", ".idea",
|
||||
".vscode", "coverage", ".tox", "eggs", "*.egg-info",
|
||||
}
|
||||
|
||||
for root, dirs, filenames in os.walk(path):
|
||||
# Skip hidden and common non-code directories
|
||||
dirs[:] = [d for d in dirs if d not in skip_dirs and not d.startswith(".")]
|
||||
|
||||
for filename in filenames:
|
||||
filepath = Path(root) / filename
|
||||
|
||||
# Skip hidden files
|
||||
if filename.startswith("."):
|
||||
continue
|
||||
|
||||
# Apply glob pattern if specified
|
||||
if pattern and not filepath.match(pattern):
|
||||
continue
|
||||
|
||||
# Check extension
|
||||
if filepath.suffix.lower() in code_extensions or not filepath.suffix:
|
||||
files.append(filepath)
|
||||
|
||||
return files
|
||||
|
||||
|
||||
def _search_file(
|
||||
filepath: Path,
|
||||
pattern: re.Pattern,
|
||||
context_lines: int
|
||||
) -> List[dict]:
|
||||
"""Search a single file for pattern matches."""
|
||||
matches = []
|
||||
|
||||
try:
|
||||
content = filepath.read_text(encoding="utf-8", errors="ignore")
|
||||
lines = content.splitlines()
|
||||
|
||||
for i, line in enumerate(lines):
|
||||
if pattern.search(line):
|
||||
# Get context
|
||||
start = max(0, i - context_lines)
|
||||
end = min(len(lines), i + context_lines + 1)
|
||||
|
||||
context = []
|
||||
for j in range(start, end):
|
||||
prefix = "→ " if j == i else " "
|
||||
context.append((j + 1, prefix, lines[j]))
|
||||
|
||||
matches.append({
|
||||
"file": str(filepath),
|
||||
"line": i + 1,
|
||||
"match": line.strip(),
|
||||
"context": context,
|
||||
})
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return matches
|
||||
|
||||
|
||||
def _format_match(match: dict) -> str:
|
||||
"""Format a search match for display."""
|
||||
output = [f"\n{'─' * 50}"]
|
||||
output.append(f"📄 {match['file']}:{match['line']}")
|
||||
output.append("")
|
||||
|
||||
for line_num, prefix, text in match["context"]:
|
||||
output.append(f"{line_num:4d} {prefix}{text}")
|
||||
|
||||
return "\n".join(output)
|
||||
@@ -1,176 +0,0 @@
|
||||
"""Task completion tool for GhostCrew agent loop control."""
|
||||
|
||||
import json
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from ..registry import ToolSchema, register_tool
|
||||
|
||||
# Sentinel value to signal task completion
|
||||
TASK_COMPLETE_SIGNAL = "__TASK_COMPLETE__"
|
||||
|
||||
|
||||
@register_tool(
    name="finish",
    description="Signal that the current task is finished. Call this when you have completed ALL steps of the user's request. Provide a structured report of what was accomplished.",
    schema=ToolSchema(
        properties={
            "status": {
                "type": "string",
                "enum": ["success", "partial", "failed"],
                "description": "Overall task status: 'success' (all objectives met), 'partial' (some objectives met), 'failed' (unable to complete)",
            },
            "summary": {
                "type": "string",
                "description": "Brief summary of what was accomplished and any key findings",
            },
            "findings": {
                "type": "array",
                "items": {"type": "string"},
                "description": "List of key findings, vulnerabilities discovered, or important observations",
            },
            "artifacts": {
                "type": "array",
                "items": {"type": "string"},
                "description": "List of files created (PoCs, scripts, screenshots, reports)",
            },
            "recommendations": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Suggested next steps or follow-up actions",
            },
        },
        required=["status", "summary"],
    ),
    category="control",
)
async def finish(arguments: dict, runtime) -> str:
    """Signal task completion with a structured report.

    The agent framework treats the returned sentinel string as an explicit
    termination signal instead of relying on free-form LLM text output.

    Args:
        arguments: Structured completion data (status, summary, findings,
            artifacts, recommendations).
        runtime: The runtime environment (unused).

    Returns:
        TASK_COMPLETE_SIGNAL followed by ':' and the JSON-encoded report.
    """
    # Missing fields fall back to benign defaults so a minimal call still
    # produces a well-formed report.
    completion = CompletionReport(
        status=arguments.get("status", "success"),
        summary=arguments.get("summary", "Task completed."),
        findings=arguments.get("findings", []),
        artifacts=arguments.get("artifacts", []),
        recommendations=arguments.get("recommendations", []),
    )
    return TASK_COMPLETE_SIGNAL + ":" + completion.to_json()
|
||||
|
||||
|
||||
class CompletionReport:
    """Structured completion report for task results.

    Round-trips through JSON (to_json/from_json) and renders a
    human-readable summary via format_display.
    """

    def __init__(
        self,
        status: str = "success",
        summary: str = "",
        findings: Optional[List[str]] = None,
        artifacts: Optional[List[str]] = None,
        recommendations: Optional[List[str]] = None,
    ):
        self.status = status
        self.summary = summary
        # `x or []` keeps each instance with its own list when None/empty is passed.
        self.findings = findings or []
        self.artifacts = artifacts or []
        self.recommendations = recommendations or []

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return {
            key: getattr(self, key)
            for key in ("status", "summary", "findings", "artifacts", "recommendations")
        }

    def to_json(self) -> str:
        """Convert to JSON string."""
        return json.dumps(self.to_dict())

    @classmethod
    def from_json(cls, json_str: str) -> "CompletionReport":
        """Create from JSON string."""
        return cls(**json.loads(json_str))

    def format_display(self) -> str:
        """Format for human-readable display."""
        icon = {"success": "✓", "partial": "◐", "failed": "✗"}.get(self.status, "•")
        lines = [f"{icon} Status: {self.status.upper()}", "", f"Summary: {self.summary}"]

        # Each non-empty section renders as a blank line, a title, then bullets.
        for title, bullet, items in (
            ("Findings:", "  • ", self.findings),
            ("Artifacts:", "  📄 ", self.artifacts),
            ("Recommendations:", "  → ", self.recommendations),
        ):
            if items:
                lines.append("")
                lines.append(title)
                lines.extend(f"{bullet}{item}" for item in items)

        return "\n".join(lines)
|
||||
|
||||
|
||||
def is_task_complete(result: str) -> bool:
    """Return True when *result* carries the task-completion sentinel prefix."""
    return result[: len(TASK_COMPLETE_SIGNAL)] == TASK_COMPLETE_SIGNAL
|
||||
|
||||
|
||||
def extract_completion_summary(result: str) -> str:
    """Extract the summary from a task_complete result (legacy support).

    Non-completion results pass through unchanged.
    """
    if not is_task_complete(result):
        return result

    # Strip the "<SIGNAL>:" prefix; the +1 skips the colon separator.
    payload = result[len(TASK_COMPLETE_SIGNAL) + 1:]
    try:
        # New format: JSON-encoded CompletionReport.
        return CompletionReport.from_json(payload).summary
    except (json.JSONDecodeError, TypeError):
        # Legacy format: the payload itself is the plain-text summary.
        return payload
|
||||
|
||||
|
||||
def extract_completion_report(result: str) -> Optional[CompletionReport]:
    """Extract the full structured report from a task_complete result.

    Returns None when *result* is not a completion signal.
    """
    if not is_task_complete(result):
        return None

    payload = result[len(TASK_COMPLETE_SIGNAL) + 1:]
    try:
        return CompletionReport.from_json(payload)
    except (json.JSONDecodeError, TypeError):
        # Legacy plain-text payload: wrap it in a minimal report.
        return CompletionReport(status="success", summary=payload)
|
||||
@@ -1,352 +0,0 @@
|
||||
"""Filesystem tool for GhostCrew - precise file reading and editing."""
|
||||
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, List, Optional
|
||||
|
||||
from ..registry import ToolSchema, register_tool
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ...runtime import Runtime
|
||||
|
||||
|
||||
# Safety: Restrict operations to workspace
|
||||
_WORKSPACE_ROOT: Optional[Path] = None
|
||||
|
||||
|
||||
def set_workspace_root(path: Path) -> None:
    """Record the resolved workspace root used for path-containment checks.

    All subsequent file operations are validated against this root.
    """
    global _WORKSPACE_ROOT
    _WORKSPACE_ROOT = path.resolve()
|
||||
|
||||
|
||||
def _validate_path(filepath: str) -> Path:
    """Resolve *filepath* and reject it if it escapes the workspace root.

    Raises:
        ValueError: If a workspace root is set and the resolved path lies
            outside it.
    """
    resolved = Path(filepath).resolve()

    # Without a configured root, every path is allowed.
    if not _WORKSPACE_ROOT:
        return resolved

    try:
        resolved.relative_to(_WORKSPACE_ROOT)
    except ValueError:
        raise ValueError(f"Path '{filepath}' is outside workspace root")

    return resolved
|
||||
|
||||
|
||||
@register_tool(
    name="read_file",
    description="Read contents of a file. Can read entire file or specific line range. Use this to examine source code, configs, or any text file.",
    schema=ToolSchema(
        properties={
            "path": {
                "type": "string",
                "description": "Path to the file to read",
            },
            "start_line": {
                "type": "integer",
                "description": "Starting line number (1-indexed). If omitted, reads from beginning.",
            },
            "end_line": {
                "type": "integer",
                "description": "Ending line number (1-indexed, inclusive). If omitted, reads to end.",
            },
        },
        required=["path"],
    ),
    category="filesystem",
)
async def read_file(arguments: dict, runtime: "Runtime") -> str:
    """Read a file's contents, optionally within a line range.

    Args:
        arguments: Dictionary with 'path' and optional 'start_line', 'end_line'.
        runtime: The runtime environment (unused here).

    Returns:
        File contents with line numbers, or an error message.
    """
    filepath = arguments["path"]
    start_line = arguments.get("start_line")
    end_line = arguments.get("end_line")

    try:
        path = _validate_path(filepath)

        if not path.exists():
            return f"Error: File not found: {filepath}"

        if not path.is_file():
            return f"Error: Not a file: {filepath}"

        # Read file content; fall back to latin-1 (which accepts any byte)
        # for files that are not valid UTF-8.
        try:
            content = path.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            content = path.read_text(encoding="latin-1")

        lines = content.splitlines()
        total_lines = len(lines)

        # FIX: an empty file previously fell through to the confusing
        # "Invalid line range None-None (file has 0 lines)" error.
        if total_lines == 0:
            return f"File is empty: {filepath}"

        # Convert the 1-indexed inclusive range to 0-indexed slice bounds.
        start_idx = (start_line - 1) if start_line else 0
        end_idx = end_line if end_line else total_lines

        # Clamp to valid range
        start_idx = max(0, min(start_idx, total_lines))
        end_idx = max(0, min(end_idx, total_lines))

        if start_idx >= end_idx:
            return f"Error: Invalid line range {start_line}-{end_line} (file has {total_lines} lines)"

        # Format output with line numbers
        selected_lines = lines[start_idx:end_idx]
        output_lines = []
        for i, line in enumerate(selected_lines, start=start_idx + 1):
            output_lines.append(f"{i:4d} | {line}")

        header = f"File: {filepath} (lines {start_idx + 1}-{end_idx} of {total_lines})"
        return f"{header}\n{'─' * 60}\n" + "\n".join(output_lines)

    except ValueError as e:
        # Raised by _validate_path for paths outside the workspace root.
        return f"Error: {e}"
    except Exception as e:
        return f"Error reading file: {e}"
|
||||
|
||||
|
||||
@register_tool(
    name="write_file",
    description="Write content to a file. Creates the file if it doesn't exist, or overwrites if it does. Use for creating PoCs, scripts, or config files.",
    schema=ToolSchema(
        properties={
            "path": {
                "type": "string",
                "description": "Path to the file to write",
            },
            "content": {
                "type": "string",
                "description": "Content to write to the file",
            },
            "append": {
                "type": "boolean",
                "description": "If true, append to file instead of overwriting. Default: false",
            },
        },
        required=["path", "content"],
    ),
    category="filesystem",
)
async def write_file(arguments: dict, runtime: "Runtime") -> str:
    """Write (or append) content to a file, creating parent directories.

    Args:
        arguments: Dictionary with 'path', 'content', and optional 'append'.
        runtime: The runtime environment (unused here).

    Returns:
        Success or error message.
    """
    filepath = arguments["path"]
    content = arguments["content"]
    append = arguments.get("append", False)

    try:
        target = _validate_path(filepath)

        # Ensure the destination directory exists before opening the file.
        target.parent.mkdir(parents=True, exist_ok=True)

        with open(target, "a" if append else "w", encoding="utf-8") as handle:
            handle.write(content)

        action = "Appended to" if append else "Wrote"
        return f"{action} {len(content)} bytes to {filepath}"

    except ValueError as e:
        # Raised by _validate_path for paths outside the workspace root.
        return f"Error: {e}"
    except Exception as e:
        return f"Error writing file: {e}"
|
||||
|
||||
|
||||
@register_tool(
    name="replace_in_file",
    description="Replace text in a file. Finds exact match of 'old_string' and replaces with 'new_string'. Include surrounding context in old_string to ensure unique match.",
    schema=ToolSchema(
        properties={
            "path": {
                "type": "string",
                "description": "Path to the file to edit",
            },
            "old_string": {
                "type": "string",
                "description": "Exact text to find and replace (include context lines for unique match)",
            },
            "new_string": {
                "type": "string",
                "description": "Text to replace old_string with",
            },
        },
        required=["path", "old_string", "new_string"],
    ),
    category="filesystem",
)
async def replace_in_file(arguments: dict, runtime: "Runtime") -> str:
    """
    Replace text in a file.

    The file is written back with the same encoding it was read with, so a
    latin-1 file is no longer silently transcoded to UTF-8 on edit.

    Args:
        arguments: Dictionary with 'path', 'old_string', 'new_string'
        runtime: The runtime environment

    Returns:
        Success or error message with diff preview
    """
    filepath = arguments["path"]
    old_string = arguments["old_string"]
    new_string = arguments["new_string"]

    try:
        path = _validate_path(filepath)

        if not path.exists():
            return f"Error: File not found: {filepath}"

        # Read current content, remembering which encoding succeeded so the
        # write-back below preserves it instead of forcing UTF-8.
        encoding = "utf-8"
        try:
            content = path.read_text(encoding=encoding)
        except UnicodeDecodeError:
            encoding = "latin-1"
            content = path.read_text(encoding=encoding)

        # Require exactly one occurrence so the edit is unambiguous.
        count = content.count(old_string)

        if count == 0:
            return f"Error: String not found in {filepath}. Make sure old_string matches exactly (including whitespace)."

        if count > 1:
            return f"Error: Found {count} matches in {filepath}. Include more context in old_string to make it unique."

        # Perform replacement (first match only; count == 1 was verified above)
        new_content = content.replace(old_string, new_string, 1)
        path.write_text(new_content, encoding=encoding)

        # Show what changed, truncating long strings for readability
        old_preview = old_string[:100] + "..." if len(old_string) > 100 else old_string
        new_preview = new_string[:100] + "..." if len(new_string) > 100 else new_string

        return f"Replaced in {filepath}:\n- {repr(old_preview)}\n+ {repr(new_preview)}"

    except ValueError as e:
        return f"Error: {e}"
    except Exception as e:
        return f"Error replacing in file: {e}"
|
||||
|
||||
|
||||
@register_tool(
    name="list_directory",
    description="List contents of a directory. Shows files and subdirectories with basic info.",
    schema=ToolSchema(
        properties={
            "path": {
                "type": "string",
                "description": "Path to the directory to list. Default: current directory",
            },
            "recursive": {
                "type": "boolean",
                "description": "If true, list recursively (max 3 levels). Default: false",
            },
            "pattern": {
                "type": "string",
                "description": "Glob pattern to filter results (e.g., '*.py', '*.js')",
            },
        },
        required=[],
    ),
    category="filesystem",
)
async def list_directory(arguments: dict, runtime: "Runtime") -> str:
    """
    List directory contents.

    Hidden entries (names starting with '.') are excluded. In recursive mode
    hidden directories are pruned from traversal entirely, so their contents
    are not listed either.

    Args:
        arguments: Dictionary with optional 'path', 'recursive', 'pattern'
        runtime: The runtime environment

    Returns:
        Directory listing
    """
    dirpath = arguments.get("path", ".")
    recursive = arguments.get("recursive", False)
    pattern = arguments.get("pattern")

    try:
        path = _validate_path(dirpath)

        if not path.exists():
            return f"Error: Directory not found: {dirpath}"

        if not path.is_dir():
            return f"Error: Not a directory: {dirpath}"

        entries = []

        if recursive:
            # Recursive listing with depth limit
            for root, dirs, files in os.walk(path):
                # Prune hidden directories in place so os.walk does not
                # descend into them. Previously only the directory *name*
                # was skipped from output while its contents were still
                # walked and listed (e.g. files under .git/).
                dirs[:] = [d for d in dirs if not d.startswith('.')]

                root_path = Path(root)
                depth = len(root_path.relative_to(path).parts)
                if depth > 3:
                    dirs.clear()  # Don't go deeper
                    continue

                prefix = " " * depth

                for d in sorted(dirs):
                    entries.append(f"{prefix}{d}/")

                for f in sorted(files):
                    if pattern and not Path(f).match(pattern):
                        continue
                    if not f.startswith('.'):
                        file_path = root_path / f
                        size = file_path.stat().st_size
                        entries.append(f"{prefix}{f} ({_format_size(size)})")
        else:
            # Single-level listing
            for item in sorted(path.iterdir()):
                if item.name.startswith('.'):
                    continue
                if pattern and not item.match(pattern):
                    continue

                if item.is_dir():
                    entries.append(f"{item.name}/")
                else:
                    size = item.stat().st_size
                    entries.append(f"{item.name} ({_format_size(size)})")

        if not entries:
            return f"Directory {dirpath} is empty" + (f" (pattern: {pattern})" if pattern else "")

        header = f"Directory: {dirpath}" + (f" (pattern: {pattern})" if pattern else "")
        return f"{header}\n{'─' * 40}\n" + "\n".join(entries)

    except ValueError as e:
        return f"Error: {e}"
    except Exception as e:
        return f"Error listing directory: {e}"
|
||||
|
||||
|
||||
def _format_size(size: int) -> str:
|
||||
"""Format file size in human-readable form."""
|
||||
for unit in ['B', 'KB', 'MB', 'GB']:
|
||||
if size < 1024:
|
||||
return f"{size:.1f}{unit}" if unit != 'B' else f"{size}B"
|
||||
size /= 1024
|
||||
return f"{size:.1f}TB"
|
||||
244
ghostcrew/tools/finish/__init__.py
Normal file
244
ghostcrew/tools/finish/__init__.py
Normal file
@@ -0,0 +1,244 @@
|
||||
"""Task completion tool for GhostCrew agent loop control."""
|
||||
|
||||
import json
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
||||
|
||||
from ..registry import Tool as Tool
|
||||
from ..registry import ToolSchema, register_tool
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ...runtime import Runtime
|
||||
from ..planning import TaskPlan
|
||||
|
||||
# Sentinel value to signal task completion
|
||||
TASK_COMPLETE_SIGNAL = "__TASK_COMPLETE__"
|
||||
|
||||
|
||||
class StepStatus(str, Enum):
    """Lifecycle status of a single plan step."""

    PENDING = "pending"  # not yet started
    IN_PROGRESS = "in_progress"  # currently being worked
    COMPLETE = "complete"  # finished successfully
    SKIPPED = "skipped"  # deemed not applicable
|
||||
|
||||
|
||||
@dataclass
class PlanStep:
    """One step of an agent task plan, with its status and outcome."""

    id: int
    description: str
    status: StepStatus = StepStatus.PENDING
    result: Optional[str] = None

    def to_dict(self) -> dict:
        """Serialize this step into a JSON-friendly dictionary."""
        payload = dict(
            id=self.id,
            description=self.description,
            status=self.status.value,
            result=self.result,
        )
        return payload
|
||||
|
||||
|
||||
@dataclass
class TaskPlan:
    """Ordered collection of PlanSteps for one agent task."""

    steps: list[PlanStep] = field(default_factory=list)
    original_request: str = ""

    def is_complete(self) -> bool:
        """True when every step is either complete or skipped."""
        finished = (StepStatus.COMPLETE, StepStatus.SKIPPED)
        return all(step.status in finished for step in self.steps)

    def clear(self) -> None:
        """Reset the plan in place (list identity preserved for aliases)."""
        self.steps.clear()
        self.original_request = ""

    def get_current_step(self) -> Optional[PlanStep]:
        """Return the first pending step, or None when nothing is pending."""
        return next(
            (step for step in self.steps if step.status == StepStatus.PENDING),
            None,
        )

    def get_pending_steps(self) -> list[PlanStep]:
        """Return every still-pending step, in plan order."""
        return [step for step in self.steps if step.status == StepStatus.PENDING]
|
||||
|
||||
|
||||
@register_tool(
    name="finish",
    description="Mark plan steps as complete or skipped. Actions: 'complete' (mark step done), 'skip' (step not applicable). Once all steps are complete/skipped, task automatically finishes.",
    schema=ToolSchema(
        properties={
            "action": {
                "type": "string",
                "enum": ["complete", "skip"],
                "description": "Action: 'complete' (mark step done) or 'skip' (step not applicable)",
            },
            "step_id": {
                "type": "integer",
                "description": "Step number (1-indexed)",
            },
            "result": {
                "type": "string",
                "description": "[complete only] What was accomplished",
            },
            "reason": {
                "type": "string",
                "description": "[skip only] Why step is being skipped",
            },
        },
        required=["action", "step_id"],
    ),
    category="workflow",
)
async def finish(arguments: dict, runtime: "Runtime") -> str:
    """Mark plan steps as complete or skipped. Accesses plan via runtime.plan."""
    action = arguments.get("action", "")
    step_id = arguments.get("step_id")

    # The agent attaches the active plan to the runtime before the loop runs.
    plan = getattr(runtime, "plan", None)
    if plan is None or not plan.steps:
        return "No active plan."

    # Locate the referenced step by its id.
    target = None
    for candidate in plan.steps:
        if candidate.id == step_id:
            target = candidate
            break
    if target is None:
        valid_ids = [s.id for s in plan.steps]
        return f"Error: Step {step_id} not found. Valid IDs: {valid_ids}"

    if action == "complete":
        outcome = arguments.get("result", "")
        target.status = StepStatus.COMPLETE
        target.result = outcome

        response = [f"Step {step_id} complete"]
        if outcome:
            response.append(f"Result: {outcome}")

        # Report overall progress: either everything is done, or point at
        # the next pending step.
        if plan.is_complete():
            response.append("All steps complete")
        else:
            upcoming = plan.get_current_step()
            if upcoming:
                response.append(f"Next: Step {upcoming.id}")

        return "\n".join(response)

    if action == "skip":
        reason = arguments.get("reason", "")
        if not reason:
            return "Error: 'reason' required for skip action."

        target.status = StepStatus.SKIPPED
        target.result = f"Skipped: {reason}"
        return f"Step {step_id} skipped: {reason}"

    return f"Error: Unknown action '{action}'. Use 'complete' or 'skip'."
|
||||
|
||||
|
||||
class CompletionReport:
    """Structured completion report for task results."""

    def __init__(
        self,
        status: str = "success",
        summary: str = "",
        findings: Optional[List[str]] = None,
        artifacts: Optional[List[str]] = None,
        recommendations: Optional[List[str]] = None,
    ):
        # Keep the caller's list objects when provided; substitute fresh
        # empty lists otherwise.
        self.status = status
        self.summary = summary
        self.findings = findings if findings else []
        self.artifacts = artifacts if artifacts else []
        self.recommendations = recommendations if recommendations else []

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the report into a plain dictionary."""
        return dict(
            status=self.status,
            summary=self.summary,
            findings=self.findings,
            artifacts=self.artifacts,
            recommendations=self.recommendations,
        )

    def to_json(self) -> str:
        """Serialize the report to a JSON string."""
        return json.dumps(self.to_dict())

    @classmethod
    def from_json(cls, json_str: str) -> "CompletionReport":
        """Rebuild a report from a JSON string produced by to_json()."""
        return cls(**json.loads(json_str))

    def format_display(self) -> str:
        """Render the report as human-readable multi-line text."""
        # Status indicator glyph, with a neutral bullet for unknown statuses.
        icon = {"success": "✓", "partial": "◐", "failed": "✗"}.get(self.status, "•")
        out = [f"{icon} Status: {self.status.upper()}", "", f"Summary: {self.summary}"]

        # Each optional section is emitted only when it has content.
        sections = (
            ("Findings:", "  • ", self.findings),
            ("Artifacts:", "  📄 ", self.artifacts),
            ("Recommendations:", "  → ", self.recommendations),
        )
        for title, bullet, items in sections:
            if items:
                out.append("")
                out.append(title)
                out.extend(f"{bullet}{item}" for item in items)

        return "\n".join(out)
|
||||
|
||||
|
||||
def is_task_complete(result: str) -> bool:
    """Return True when *result* begins with the task-completion sentinel."""
    return result[: len(TASK_COMPLETE_SIGNAL)] == TASK_COMPLETE_SIGNAL
|
||||
|
||||
|
||||
def extract_completion_summary(result: str) -> str:
    """Extract the summary from a task_complete result (legacy support)."""
    if not is_task_complete(result):
        # Not a completion signal — pass the result through unchanged.
        return result

    # Strip the sentinel plus the ':' separator that follows it.
    payload = result[len(TASK_COMPLETE_SIGNAL) + 1 :]
    try:
        # New format: a JSON-encoded CompletionReport.
        return CompletionReport.from_json(payload).summary
    except (json.JSONDecodeError, TypeError):
        # Legacy format: the payload itself is the summary string.
        return payload
|
||||
|
||||
|
||||
def extract_completion_report(result: str) -> Optional["CompletionReport"]:
    """Extract the full structured report from a task_complete result."""
    if not is_task_complete(result):
        return None

    # Strip the sentinel plus the ':' separator that follows it.
    payload = result[len(TASK_COMPLETE_SIGNAL) + 1 :]
    try:
        return CompletionReport.from_json(payload)
    except (json.JSONDecodeError, TypeError):
        # Legacy plain-string payload — wrap it in a minimal report.
        return CompletionReport(status="success", summary=payload)
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Notes tool for GhostCrew - persistent key findings storage."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Dict
|
||||
@@ -9,28 +10,42 @@ from ..registry import ToolSchema, register_tool
|
||||
# Notes storage - kept at loot root for easy access
|
||||
_notes: Dict[str, str] = {}
|
||||
_notes_file: Path = Path("loot/notes.json")
|
||||
# Lock for safe concurrent access from multiple agents (asyncio since agents are async tasks)
|
||||
_notes_lock = asyncio.Lock()
|
||||
|
||||
|
||||
def _load_notes() -> None:
|
||||
"""Load notes from file."""
|
||||
def _load_notes_unlocked() -> None:
|
||||
"""Load notes from file (caller must hold lock)."""
|
||||
global _notes
|
||||
if _notes_file.exists():
|
||||
try:
|
||||
_notes = json.loads(_notes_file.read_text())
|
||||
_notes = json.loads(_notes_file.read_text(encoding="utf-8"))
|
||||
except (json.JSONDecodeError, IOError):
|
||||
_notes = {}
|
||||
|
||||
|
||||
def _save_notes() -> None:
|
||||
"""Save notes to file."""
|
||||
def _save_notes_unlocked() -> None:
|
||||
"""Save notes to file (caller must hold lock)."""
|
||||
_notes_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
_notes_file.write_text(json.dumps(_notes, indent=2))
|
||||
_notes_file.write_text(json.dumps(_notes, indent=2), encoding="utf-8")
|
||||
|
||||
|
||||
def get_all_notes() -> Dict[str, str]:
|
||||
async def get_all_notes() -> Dict[str, str]:
|
||||
"""Get all notes (for TUI /notes command)."""
|
||||
if not _notes:
|
||||
_load_notes()
|
||||
async with _notes_lock:
|
||||
if not _notes:
|
||||
_load_notes_unlocked()
|
||||
return _notes.copy()
|
||||
|
||||
|
||||
def get_all_notes_sync() -> Dict[str, str]:
|
||||
"""Get all notes synchronously (read-only, best effort for prompts)."""
|
||||
# If notes are empty, try to load from disk (safe read)
|
||||
if not _notes and _notes_file.exists():
|
||||
try:
|
||||
return json.loads(_notes_file.read_text(encoding="utf-8"))
|
||||
except (json.JSONDecodeError, IOError):
|
||||
pass
|
||||
return _notes.copy()
|
||||
|
||||
|
||||
@@ -38,11 +53,12 @@ def set_notes_file(path: Path) -> None:
|
||||
"""Set custom notes file path."""
|
||||
global _notes_file
|
||||
_notes_file = path
|
||||
_load_notes()
|
||||
# Can't use async here, so load without lock (called at init time)
|
||||
_load_notes_unlocked()
|
||||
|
||||
|
||||
# Load notes on module import
|
||||
_load_notes()
|
||||
# Load notes on module import (init time, no contention yet)
|
||||
_load_notes_unlocked()
|
||||
|
||||
|
||||
@register_tool(
|
||||
@@ -83,58 +99,59 @@ async def notes(arguments: dict, runtime) -> str:
|
||||
key = arguments.get("key", "").strip()
|
||||
value = arguments.get("value", "")
|
||||
|
||||
if action == "create":
|
||||
if not key:
|
||||
return "Error: key is required for create"
|
||||
if not value:
|
||||
return "Error: value is required for create"
|
||||
if key in _notes:
|
||||
return f"Error: note '{key}' already exists. Use 'update' to modify."
|
||||
async with _notes_lock:
|
||||
if action == "create":
|
||||
if not key:
|
||||
return "Error: key is required for create"
|
||||
if not value:
|
||||
return "Error: value is required for create"
|
||||
if key in _notes:
|
||||
return f"Error: note '{key}' already exists. Use 'update' to modify."
|
||||
|
||||
_notes[key] = value
|
||||
_save_notes()
|
||||
return f"Created note '{key}'"
|
||||
_notes[key] = value
|
||||
_save_notes_unlocked()
|
||||
return f"Created note '{key}'"
|
||||
|
||||
elif action == "read":
|
||||
if not key:
|
||||
return "Error: key is required for read"
|
||||
if key not in _notes:
|
||||
return f"Note '{key}' not found"
|
||||
elif action == "read":
|
||||
if not key:
|
||||
return "Error: key is required for read"
|
||||
if key not in _notes:
|
||||
return f"Note '{key}' not found"
|
||||
|
||||
return f"[{key}] {_notes[key]}"
|
||||
return f"[{key}] {_notes[key]}"
|
||||
|
||||
elif action == "update":
|
||||
if not key:
|
||||
return "Error: key is required for update"
|
||||
if not value:
|
||||
return "Error: value is required for update"
|
||||
elif action == "update":
|
||||
if not key:
|
||||
return "Error: key is required for update"
|
||||
if not value:
|
||||
return "Error: value is required for update"
|
||||
|
||||
existed = key in _notes
|
||||
_notes[key] = value
|
||||
_save_notes()
|
||||
return f"{'Updated' if existed else 'Created'} note '{key}'"
|
||||
existed = key in _notes
|
||||
_notes[key] = value
|
||||
_save_notes_unlocked()
|
||||
return f"{'Updated' if existed else 'Created'} note '{key}'"
|
||||
|
||||
elif action == "delete":
|
||||
if not key:
|
||||
return "Error: key is required for delete"
|
||||
if key not in _notes:
|
||||
return f"Note '{key}' not found"
|
||||
elif action == "delete":
|
||||
if not key:
|
||||
return "Error: key is required for delete"
|
||||
if key not in _notes:
|
||||
return f"Note '{key}' not found"
|
||||
|
||||
del _notes[key]
|
||||
_save_notes()
|
||||
return f"Deleted note '{key}'"
|
||||
del _notes[key]
|
||||
_save_notes_unlocked()
|
||||
return f"Deleted note '{key}'"
|
||||
|
||||
elif action == "list":
|
||||
if not _notes:
|
||||
return "No notes saved"
|
||||
elif action == "list":
|
||||
if not _notes:
|
||||
return "No notes saved"
|
||||
|
||||
lines = [f"Notes ({len(_notes)} entries):"]
|
||||
for k, v in _notes.items():
|
||||
# Truncate long values for display
|
||||
display_val = v if len(v) <= 60 else v[:57] + "..."
|
||||
lines.append(f" [{k}] {display_val}")
|
||||
lines = [f"Notes ({len(_notes)} entries):"]
|
||||
for k, v in _notes.items():
|
||||
# Truncate long values for display
|
||||
display_val = v if len(v) <= 60 else v[:57] + "..."
|
||||
lines.append(f" [{k}] {display_val}")
|
||||
|
||||
return "\n".join(lines)
|
||||
return "\n".join(lines)
|
||||
|
||||
else:
|
||||
return f"Unknown action: {action}"
|
||||
else:
|
||||
return f"Unknown action: {action}"
|
||||
|
||||
Reference in New Issue
Block a user