From cd036f873c5e6a5f049bc856ae885f2862dccb83 Mon Sep 17 00:00:00 2001 From: GH05TCREW Date: Fri, 12 Dec 2025 10:56:31 -0700 Subject: [PATCH] fix: agent completion loop and crew synthesis robustness --- ghostcrew/agents/base_agent.py | 336 ++++++++++---- ghostcrew/agents/crew/orchestrator.py | 68 +-- ghostcrew/agents/crew/worker_pool.py | 15 +- .../agents/ghostcrew_agent/ghostcrew_agent.py | 35 +- ghostcrew/agents/prompts/__init__.py | 26 ++ ghostcrew/agents/prompts/ghost_agent.jinja | 64 +++ .../ghost_assist.jinja} | 23 +- ghostcrew/agents/prompts/ghost_crew.jinja | 45 ++ ghostcrew/interface/tui.py | 22 +- ghostcrew/llm/llm.py | 6 +- ghostcrew/mcp/manager.py | 29 +- ghostcrew/runtime/runtime.py | 7 +- ghostcrew/tools/codesearch/__init__.py | 424 ------------------ ghostcrew/tools/completion/__init__.py | 176 -------- ghostcrew/tools/filesystem/__init__.py | 352 --------------- ghostcrew/tools/finish/__init__.py | 244 ++++++++++ ghostcrew/tools/notes/__init__.py | 129 +++--- 17 files changed, 816 insertions(+), 1185 deletions(-) create mode 100644 ghostcrew/agents/prompts/__init__.py create mode 100644 ghostcrew/agents/prompts/ghost_agent.jinja rename ghostcrew/agents/{ghostcrew_agent/system_prompt.jinja => prompts/ghost_assist.jinja} (56%) create mode 100644 ghostcrew/agents/prompts/ghost_crew.jinja delete mode 100644 ghostcrew/tools/codesearch/__init__.py delete mode 100644 ghostcrew/tools/completion/__init__.py delete mode 100644 ghostcrew/tools/filesystem/__init__.py create mode 100644 ghostcrew/tools/finish/__init__.py diff --git a/ghostcrew/agents/base_agent.py b/ghostcrew/agents/base_agent.py index a743c51..7d82521 100644 --- a/ghostcrew/agents/base_agent.py +++ b/ghostcrew/agents/base_agent.py @@ -90,12 +90,22 @@ class BaseAgent(ABC): max_iterations: Maximum iterations before forcing stop (safety limit) """ self.llm = llm - self.tools = tools self.runtime = runtime self.max_iterations = max_iterations self.state_manager = AgentStateManager() 
self.conversation_history: List[AgentMessage] = [] + # Each agent gets its own plan instance + from ..tools.finish import TaskPlan + + self._task_plan = TaskPlan() + + # Attach plan to runtime so finish tool can access it + self.runtime.plan = self._task_plan + + # Use tools as-is (finish accesses plan via runtime) + self.tools = list(tools) + @property def state(self) -> AgentState: """Get current agent state.""" @@ -134,14 +144,20 @@ class BaseAgent(ABC): self.state_manager.transition_to(AgentState.IDLE) @abstractmethod - def get_system_prompt(self) -> str: - """Return the system prompt for this agent.""" + def get_system_prompt(self, mode: str = "agent") -> str: + """Return the system prompt for this agent. + + Args: + mode: 'agent' for autonomous mode, 'assist' for single-shot assist mode + """ pass async def agent_loop(self, initial_message: str) -> AsyncIterator[AgentMessage]: """ Main agent execution loop. + Starts a new task session, resetting previous state and history. + Simple control flow: - Tool calls: Execute tools, continue loop - Text response (no tools): Done @@ -153,6 +169,9 @@ class BaseAgent(ABC): Yields: AgentMessage objects as the agent processes """ + # Always reset for a new agent loop task to ensure clean state + self.reset() + self.state_manager.transition_to(AgentState.THINKING) self.conversation_history.append( AgentMessage(role="user", content=initial_message) @@ -186,112 +205,137 @@ class BaseAgent(ABC): Core agent loop logic - shared by agent_loop and continue_conversation. Termination conditions: - 1. finish tool is called -> clean exit with summary + 1. finish tool is called AND plan complete -> clean exit with summary 2. max_iterations reached -> forced exit with warning 3. error -> exit with error state Text responses WITHOUT tool calls are treated as "thinking out loud" and do NOT terminate the loop. This prevents premature stopping. + The loop enforces plan completion before allowing finish. 
+ Yields: AgentMessage objects as the agent processes """ - from ..tools.completion import extract_completion_summary, is_task_complete + # Clear any previous plan for new task + self._task_plan.clear() iteration = 0 while iteration < self.max_iterations: iteration += 1 + # ITERATION 1: Force plan creation (loop-enforced, not prompt-based) + if iteration == 1 and len(self._task_plan.steps) == 0: + plan_msg = await self._auto_generate_plan() + if plan_msg: + yield plan_msg + response = await self.llm.generate( system_prompt=self.get_system_prompt(), messages=self._format_messages_for_llm(), tools=self.tools, ) - if response.tool_calls: - # Build tool calls list FIRST (before execution) - tool_calls = [ - ToolCall( - id=tc.id if hasattr(tc, "id") else str(i), - name=( - tc.function.name - if hasattr(tc, "function") - else tc.get("name", "") - ), - arguments=self._parse_arguments(tc), - ) - for i, tc in enumerate(response.tool_calls) - ] - - # Yield early - show tool calls before execution starts - early_msg = AgentMessage( + # Case 1: Empty response (Error) + if not response.tool_calls and not response.content: + stuck_msg = AgentMessage( role="assistant", - content=response.content or "", - tool_calls=tool_calls, - tool_results=[], # No results yet - usage=response.usage, + content="Agent returned empty response. 
Exiting gracefully.", + metadata={"empty_response": True}, ) - yield early_msg + self.conversation_history.append(stuck_msg) + yield stuck_msg + self.state_manager.transition_to(AgentState.COMPLETE) + return - # Now execute tools - self.state_manager.transition_to(AgentState.EXECUTING) - tool_results = await self._execute_tools(response.tool_calls) - - # Record in history - assistant_msg = AgentMessage( + # Case 2: Thinking / Intermediate Output (Content but no tools) + if not response.tool_calls: + thinking_msg = AgentMessage( role="assistant", - content=response.content or "", - tool_calls=tool_calls, + content=response.content, usage=response.usage, + metadata={"intermediate": True}, ) - self.conversation_history.append(assistant_msg) + self.conversation_history.append(thinking_msg) + yield thinking_msg + continue - tool_result_msg = AgentMessage( - role="tool_result", content="", tool_results=tool_results + # Case 3: Tool Execution + # Build tool calls list + tool_calls = [ + ToolCall( + id=tc.id if hasattr(tc, "id") else str(i), + name=( + tc.function.name + if hasattr(tc, "function") + else tc.get("name", "") + ), + arguments=self._parse_arguments(tc), ) - self.conversation_history.append(tool_result_msg) + for i, tc in enumerate(response.tool_calls) + ] - # Check for explicit task_complete signal - for result in tool_results: - if result.success and result.result and is_task_complete(result.result): - summary = extract_completion_summary(result.result) - # Yield results with completion summary - display_msg = AgentMessage( - role="assistant", - content=summary, - tool_calls=tool_calls, - tool_results=tool_results, - usage=response.usage, - metadata={"task_complete": True}, - ) - yield display_msg - self.state_manager.transition_to(AgentState.COMPLETE) - return + # Execute tools + self.state_manager.transition_to(AgentState.EXECUTING) - # Yield results for display update (no completion yet) - display_msg = AgentMessage( + # Yield thinking message if content 
exists (before execution) + if response.content: + thinking_msg = AgentMessage( role="assistant", - content=response.content or "", - tool_calls=tool_calls, - tool_results=tool_results, + content=response.content, usage=response.usage, + metadata={"intermediate": True}, ) - yield display_msg - self.state_manager.transition_to(AgentState.THINKING) - else: - # Text response WITHOUT tool calls = thinking/intermediate output - # Store it but DON'T terminate - wait for task_complete - if response.content: - thinking_msg = AgentMessage( - role="assistant", - content=response.content, - usage=response.usage, - metadata={"intermediate": True}, - ) - self.conversation_history.append(thinking_msg) - yield thinking_msg - # Continue loop - only task_complete or max_iterations stops us + yield thinking_msg + + tool_results = await self._execute_tools(response.tool_calls) + + # Record in history + assistant_msg = AgentMessage( + role="assistant", + content=response.content or "", + tool_calls=tool_calls, + usage=response.usage, + ) + self.conversation_history.append(assistant_msg) + + tool_result_msg = AgentMessage( + role="tool_result", content="", tool_results=tool_results + ) + self.conversation_history.append(tool_result_msg) + + # Yield results for display update immediately + display_msg = AgentMessage( + role="assistant", + content="", # Suppress content here as it was already yielded as thinking + tool_calls=tool_calls, + tool_results=tool_results, + usage=response.usage, + ) + yield display_msg + + # Check if plan is now complete + if self._task_plan.is_complete(): + # All steps done - generate final summary + summary_response = await self.llm.generate( + system_prompt="You are a helpful assistant. 
Provide a brief, clear summary of what was accomplished.", + messages=self._format_messages_for_llm(), + tools=self.tools, # Must provide tools if history contains tool calls + ) + + completion_msg = AgentMessage( + role="assistant", + content=summary_response.content or "Task complete.", + usage=summary_response.usage, + metadata={"task_complete": True}, + ) + self.conversation_history.append(completion_msg) + yield completion_msg + self.state_manager.transition_to(AgentState.COMPLETE) + return + + self.state_manager.transition_to(AgentState.THINKING) # Max iterations reached - force stop warning_msg = AgentMessage( @@ -424,6 +468,128 @@ class BaseAgent(ABC): return tool return None + def _can_finish(self) -> tuple[bool, str]: + """Check if the agent can finish based on plan completion.""" + if len(self._task_plan.steps) == 0: + return True, "No plan exists" + + pending = self._task_plan.get_pending_steps() + if pending: + pending_desc = ", ".join( + f"Step {s.id}: {s.description}" for s in pending[:3] + ) + more = f" (+{len(pending) - 3} more)" if len(pending) > 3 else "" + return False, f"Incomplete: {pending_desc}{more}" + + return True, "All steps complete" + + async def _auto_generate_plan(self) -> Optional[AgentMessage]: + """ + Automatically generate a plan from the user's request (loop-enforced). + + This is called on iteration 1 to force plan creation before any tool execution. + Uses function calling for reliable structured output. 
+ + Returns: + AgentMessage with plan display, or None if generation fails + """ + from ..tools.finish import PlanStep + from ..tools.registry import Tool, ToolSchema + + # Get the user's original request (last message) + user_request = "" + for msg in reversed(self.conversation_history): + if msg.role == "user": + user_request = msg.content + break + + if not user_request: + return None # No request to plan + + # Create a temporary tool for plan generation (function calling) + plan_generator_tool = Tool( + name="create_plan", + description="Create a step-by-step plan for the task. Call this with the steps needed.", + schema=ToolSchema( + properties={ + "steps": { + "type": "array", + "items": {"type": "string"}, + "description": "List of actionable steps (one tool action each)", + }, + }, + required=["steps"], + ), + execute_fn=lambda args, runtime: None, # Dummy - we parse args directly + category="planning", + ) + + plan_prompt = f"""Break this request into minimal, actionable steps. + +Request: {user_request} + +Guidelines: +- Be concise (typically 2-4 steps) +- One tool action per step +- Don't include waiting/loading (handled automatically) +- Do NOT include a "finish", "complete", or "verify" step (handled automatically) + +Call the create_plan tool with your steps.""" + + try: + response = await self.llm.generate( + system_prompt="You are a task planning assistant. 
Always use the create_plan tool.", + messages=[{"role": "user", "content": plan_prompt}], + tools=[plan_generator_tool], + ) + + # Extract steps from tool call arguments + steps = [] + if response.tool_calls: + for tc in response.tool_calls: + args = self._parse_arguments(tc) + if args.get("steps"): + steps = args["steps"] + break + + # Fallback: if LLM didn't provide steps, create single-step plan + if not steps: + steps = [user_request] + + # Create the plan + self._task_plan.original_request = user_request + self._task_plan.steps = [ + PlanStep(id=i + 1, description=str(step).strip()) + for i, step in enumerate(steps) + ] + + # Add a system message showing the generated plan + plan_display = ["Plan:"] + for step in self._task_plan.steps: + plan_display.append(f" {step.id}. {step.description}") + + plan_msg = AgentMessage( + role="assistant", + content="\n".join(plan_display), + metadata={"auto_plan": True}, + ) + self.conversation_history.append(plan_msg) + return plan_msg + + except Exception as e: + # Plan generation failed - create fallback single-step plan + self._task_plan.original_request = user_request + self._task_plan.steps = [PlanStep(id=1, description=user_request)] + + error_msg = AgentMessage( + role="assistant", + content=f"Plan generation failed: {str(e)}\nUsing fallback: treating request as single step.", + metadata={"auto_plan_failed": True}, + ) + self.conversation_history.append(error_msg) + return error_msg + return error_msg + def reset(self): """Reset the agent state for a new conversation.""" self.state_manager.reset() @@ -453,7 +619,7 @@ class BaseAgent(ABC): # Single LLM call with tools available response = await self.llm.generate( - system_prompt=self.get_system_prompt(), + system_prompt=self.get_system_prompt(mode="assist"), messages=self._format_messages_for_llm(), tools=assist_tools, ) @@ -476,10 +642,13 @@ class BaseAgent(ABC): # Yield tool calls IMMEDIATELY (before execution) for UI display # Include any thinking/planning content 
from the LLM - thinking_msg = AgentMessage( - role="assistant", content=response.content or "", tool_calls=tool_calls - ) - yield thinking_msg + if response.content: + thinking_msg = AgentMessage( + role="assistant", + content=response.content, + metadata={"intermediate": True}, + ) + yield thinking_msg # NOW execute the tools (this can take a while) self.state_manager.transition_to(AgentState.EXECUTING) @@ -487,7 +656,7 @@ class BaseAgent(ABC): # Store in history (minimal content to save tokens) assistant_msg = AgentMessage( - role="assistant", content="", tool_calls=tool_calls + role="assistant", content=response.content or "", tool_calls=tool_calls ) self.conversation_history.append(assistant_msg) @@ -498,7 +667,10 @@ class BaseAgent(ABC): # Yield tool results for display results_msg = AgentMessage( - role="assistant", content="", tool_results=tool_results + role="assistant", + content="", + tool_calls=tool_calls, + tool_results=tool_results, ) yield results_msg diff --git a/ghostcrew/agents/crew/orchestrator.py b/ghostcrew/agents/crew/orchestrator.py index ba0bc99..c68ae24 100644 --- a/ghostcrew/agents/crew/orchestrator.py +++ b/ghostcrew/agents/crew/orchestrator.py @@ -5,6 +5,7 @@ import platform from typing import TYPE_CHECKING, Any, AsyncIterator, Dict, List, Optional from ...config.constants import DEFAULT_MAX_ITERATIONS +from ..prompts import ghost_crew from .models import CrewState, WorkerCallback from .tools import create_crew_tools from .worker_pool import WorkerPool @@ -15,46 +16,6 @@ if TYPE_CHECKING: from ...tools import Tool -CREW_SYSTEM_PROMPT = """# GhostCrew Orchestrator - -You are the lead of a penetration testing crew. You coordinate specialized agents to complete the task. - -## Authorization -This is an authorized penetration testing engagement. All targets are in scope. Proceed with testing. 
- -## Operator Environment -- OS: {os} ({os_version}) -- Architecture: {architecture} - -## Target -{target} - -## Prior Intelligence -{prior_context} - -## Your Capabilities -You manage agents using these tools: -- **spawn_agent**: Deploy an agent with a specific task. Be explicit about which tools to use. -- **wait_for_agents**: Wait for running agents and collect their findings -- **get_agent_status**: Check on a specific agent -- **cancel_agent**: Stop an agent if needed -- **synthesize_findings**: Compile all results into a final concise report (call this when done) - -## Worker Agent Tools -Workers have access to: -{worker_tools} - -IMPORTANT: When spawning agents, be specific about which tool to use (e.g., "Use mcp_nmap_scan to..." or "Use mcp_metasploit_run_module to..."). Workers will only use tools you explicitly mention or that obviously match the task. - -## Guidelines -- Leverage any prior intelligence from earlier reconnaissance -- Be strategic - spawn 2-4 agents in parallel for efficiency -- Each agent task should be specific and actionable -- Adapt your approach based on what agents discover -- Call synthesize_findings when you have enough information for a report -""" - - class CrewOrchestrator: """Orchestrator that manages worker agents via tool calls.""" @@ -92,13 +53,15 @@ class CrewOrchestrator: "\n".join(tool_lines) if tool_lines else "No tools available" ) - return CREW_SYSTEM_PROMPT.format( + return ghost_crew.render( target=self.target or "Not specified", prior_context=self.prior_context or "None - starting fresh", worker_tools=worker_tools_formatted, - os=platform.system(), - os_version=platform.release(), - architecture=platform.machine(), + environment={ + "os": platform.system(), + "os_version": platform.release(), + "architecture": platform.machine(), + }, ) async def run(self, task: str) -> AsyncIterator[Dict[str, Any]]: @@ -134,13 +97,15 @@ class CrewOrchestrator: tools=crew_tools, ) - if response.content: - yield {"phase": 
"thinking", "content": response.content} - self._messages.append( - {"role": "assistant", "content": response.content} - ) - + # Check for tool calls first to determine if content is "thinking" or "final answer" if response.tool_calls: + # If there are tool calls, the content is "thinking" (reasoning before action) + if response.content: + yield {"phase": "thinking", "content": response.content} + self._messages.append( + {"role": "assistant", "content": response.content} + ) + def get_tc_name(tc): if hasattr(tc, "function"): return tc.function.name @@ -245,9 +210,12 @@ class CrewOrchestrator: if final_report: break else: + # No tool calls - the content IS the final report (Direct Answer) content = response.content or "" if content: final_report = content + # Record it in history but don't yield as "thinking" + self._messages.append({"role": "assistant", "content": content}) break self.state = CrewState.COMPLETE diff --git a/ghostcrew/agents/crew/worker_pool.py b/ghostcrew/agents/crew/worker_pool.py index 7009e6d..40c6ac1 100644 --- a/ghostcrew/agents/crew/worker_pool.py +++ b/ghostcrew/agents/crew/worker_pool.py @@ -103,10 +103,16 @@ class WorkerPool: worker.started_at = time.time() self._emit(worker.id, "status", {"status": "running"}) + # Create isolated runtime for this worker (prevents browser state conflicts) + from ...runtime.runtime import LocalRuntime + + worker_runtime = LocalRuntime() + await worker_runtime.start() + agent = GhostCrewAgent( llm=self.llm, tools=self.tools, - runtime=self.runtime, + runtime=worker_runtime, # Use isolated runtime target=self.target, rag_engine=self.rag_engine, ) @@ -156,6 +162,13 @@ class WorkerPool: worker.completed_at = time.time() self._emit(worker.id, "error", {"error": str(e)}) + finally: + # Cleanup worker's isolated runtime + try: + await worker_runtime.stop() + except Exception: + pass # Best effort cleanup + async def _wait_for_dependencies(self, depends_on: List[str]) -> None: """Wait for dependent workers to 
complete.""" for dep_id in depends_on: diff --git a/ghostcrew/agents/ghostcrew_agent/ghostcrew_agent.py b/ghostcrew/agents/ghostcrew_agent/ghostcrew_agent.py index 5c6b25d..ead7d7f 100644 --- a/ghostcrew/agents/ghostcrew_agent/ghostcrew_agent.py +++ b/ghostcrew/agents/ghostcrew_agent/ghostcrew_agent.py @@ -1,11 +1,9 @@ """GhostCrew main pentesting agent.""" -from pathlib import Path from typing import TYPE_CHECKING, List, Optional -from jinja2 import Template - from ..base_agent import BaseAgent +from ..prompts import ghost_agent, ghost_assist if TYPE_CHECKING: from ...knowledge import RAGEngine @@ -43,15 +41,13 @@ class GhostCrewAgent(BaseAgent): self.target = target self.scope = scope or [] self.rag_engine = rag_engine - self._system_prompt_template = self._load_prompt_template() - def _load_prompt_template(self) -> Template: - """Load the Jinja2 system prompt template.""" - template_path = Path(__file__).parent / "system_prompt.jinja" - return Template(template_path.read_text(encoding="utf-8")) + def get_system_prompt(self, mode: str = "agent") -> str: + """Generate system prompt with context. 
- def get_system_prompt(self) -> str: - """Generate system prompt with context.""" + Args: + mode: 'agent' for autonomous mode, 'assist' for single-shot assist mode + """ # Get RAG context if available rag_context = "" if self.rag_engine and self.conversation_history: @@ -68,15 +64,32 @@ class GhostCrewAgent(BaseAgent): if relevant: rag_context = "\n\n".join(relevant) + # Get saved notes if available + notes_context = "" + try: + from ...tools.notes import get_all_notes_sync + + notes = get_all_notes_sync() + if notes: + notes_lines = [f"- {key}: {value}" for key, value in notes.items()] + notes_context = "\n".join(notes_lines) + except Exception: + pass # Notes not available + # Get environment info from runtime env = self.runtime.environment - return self._system_prompt_template.render( + # Select template based on mode + template = ghost_assist if mode == "assist" else ghost_agent + + return template.render( target=self.target, scope=self.scope, environment=env, rag_context=rag_context, + notes_context=notes_context, tools=self.tools, + plan=self._task_plan if mode == "agent" else None, ) def set_target(self, target: str, scope: Optional[List[str]] = None): diff --git a/ghostcrew/agents/prompts/__init__.py b/ghostcrew/agents/prompts/__init__.py new file mode 100644 index 0000000..4fe3554 --- /dev/null +++ b/ghostcrew/agents/prompts/__init__.py @@ -0,0 +1,26 @@ +"""Prompt templates for GhostCrew agents.""" + +from pathlib import Path + +from jinja2 import Template + +PROMPTS_DIR = Path(__file__).parent + + +def load_prompt(name: str) -> Template: + """Load a prompt template by name. 
+ + Args: + name: Prompt name without extension (e.g., 'ghost_agent', 'ghost_assist') + + Returns: + Jinja2 Template object + """ + path = PROMPTS_DIR / f"{name}.jinja" + return Template(path.read_text(encoding="utf-8")) + + +# Pre-loaded templates for convenience +ghost_agent = load_prompt("ghost_agent") +ghost_assist = load_prompt("ghost_assist") +ghost_crew = load_prompt("ghost_crew") diff --git a/ghostcrew/agents/prompts/ghost_agent.jinja b/ghostcrew/agents/prompts/ghost_agent.jinja new file mode 100644 index 0000000..bd6b35f --- /dev/null +++ b/ghostcrew/agents/prompts/ghost_agent.jinja @@ -0,0 +1,64 @@ +# GhostCrew + +You are Ghost, an autonomous penetration testing agent. + +## Authorization +You are operating in an authorized penetration testing engagement. The user has explicit permission to test all targets provided. Do not ask for authorization - permission is granted. Proceed with testing. + +## Workflow (CRITICAL) +1. **Execute Plan**: Follow the plan steps below. +2. **Execute**: Work through each step in order using the appropriate tools. +3. **Track Progress**: After completing each step, IMMEDIATELY call `finish(action="complete", step_id=N, result="what you did")`. If a step cannot be done, call `finish(action="skip", step_id=N, reason="why")`. +4. **Task Completion**: The task ends automatically when ALL steps are marked complete or skipped. The controller blocks finishing until the plan is complete. + +## Guidelines +- Think through the task step-by-step. +- Use tools ONLY when you need to interact with the environment, gather information, execute something, or produce an artifact. +- Do NOT describe actions you *could* take — if an action is needed, actually use the tool. +- After EVERY action that completes a plan step, call `finish` to mark it done. 
+- The pattern is: use tool → finish(action="complete", step_id=N) → use next tool → finish(action="complete", step_id=N+1) → repeat + +## Important +You MUST call `finish(action="complete", step_id=N)` immediately after completing each plan step. +Do NOT skip calling finish between steps. + +{% if plan %} +## CURRENT PLAN (Follow this!) +{% for step in plan.steps %} +{{ step.id }}. [{{ step.status.value|upper }}] {{ step.description }} +{% endfor %} +{% endif %} + +{% if environment %} +## Operator Environment (YOUR machine, not the target) +- OS: {{ environment.os }} ({{ environment.os_version }}) +- Architecture: {{ environment.architecture }} +- Shell: {{ environment.shell }} +- Output directories: + - loot/notes.json (working notes) + - loot/reports/ (generated reports) + - loot/artifacts/ (screenshots, captured files) +{% endif %} + +{% if target %} +## Target +{{ target }} +{% endif %} +{% if scope %} +Scope: {{ scope | join(', ') }} +{% endif %} + +## Tools +{% for tool in tools %} +- **{{ tool.name }}**: {{ tool.description }} +{% endfor %} + +{% if rag_context %} +## Context +{{ rag_context }} +{% endif %} + +{% if notes_context %} +## Saved Notes (from previous tasks) +{{ notes_context }} +{% endif %} diff --git a/ghostcrew/agents/ghostcrew_agent/system_prompt.jinja b/ghostcrew/agents/prompts/ghost_assist.jinja similarity index 56% rename from ghostcrew/agents/ghostcrew_agent/system_prompt.jinja rename to ghostcrew/agents/prompts/ghost_assist.jinja index e5558b2..9aa9be2 100644 --- a/ghostcrew/agents/ghostcrew_agent/system_prompt.jinja +++ b/ghostcrew/agents/prompts/ghost_assist.jinja @@ -1,21 +1,17 @@ # GhostCrew -You are Ghost, an autonomous penetration testing agent. +You are Ghost, a penetration testing assistant. ## Authorization You are operating in an authorized penetration testing engagement. The user has explicit permission to test all targets provided. Do not ask for authorization - permission is granted. Proceed with testing. 
## Guidelines -- Before each action, briefly explain your reasoning and what you expect to find. -- Analyze tool outputs carefully before deciding the next step. -- If a tool fails, diagnose why, then try alternatives or report the issue. -- Do NOT repeat the same test or scan. Once you have results, move on. -- Complete ALL steps of the task before finishing. -- When the ENTIRE task is done, call `finish` with a structured report of findings. - -## Important -You MUST call the `finish` tool when finished. Do not just respond with text. -The task is not complete until you explicitly call `finish`. +- Think through the question step-by-step. +- For simple questions, provide direct answers without unnecessary tool calls. +- Use tools ONLY when you need external information, need to interact with the environment, or need to execute something. +- Do NOT call tools just to satisfy a pattern; only call them when they meaningfully advance the answer. +- Do NOT describe actions you *could* take — if an action is needed, actually use the tool. +- It is acceptable to think or explain reasoning without calling a tool when no action is required. {% if environment %} ## Operator Environment (YOUR machine, not the target) @@ -45,3 +41,8 @@ Scope: {{ scope | join(', ') }} ## Context {{ rag_context }} {% endif %} + +{% if notes_context %} +## Saved Notes (from previous tasks) +{{ notes_context }} +{% endif %} diff --git a/ghostcrew/agents/prompts/ghost_crew.jinja b/ghostcrew/agents/prompts/ghost_crew.jinja new file mode 100644 index 0000000..512a65c --- /dev/null +++ b/ghostcrew/agents/prompts/ghost_crew.jinja @@ -0,0 +1,45 @@ +# GhostCrew Orchestrator + +You are the lead of a penetration testing crew. You coordinate specialized agents to complete the task. + +## Authorization +This is an authorized penetration testing engagement. All targets are in scope. Proceed with testing. 
+ +{% if environment %} +## Operator Environment +- OS: {{ environment.os }} ({{ environment.os_version }}) +- Architecture: {{ environment.architecture }} +{% endif %} + +{% if target %} +## Target +{{ target }} +{% endif %} + +{% if prior_context %} +## Prior Intelligence +{{ prior_context }} +{% endif %} + +## Your Capabilities +You manage agents using these tools: +- **spawn_agent**: Deploy an agent with a specific task. Be explicit about which tools to use. +- **wait_for_agents**: Wait for running agents and collect their findings +- **get_agent_status**: Check on a specific agent +- **cancel_agent**: Stop an agent if needed +- **synthesize_findings**: Compile all results into a final concise report (call this when done) + +{% if worker_tools %} +## Worker Agent Tools +Workers have access to: +{{ worker_tools }} +{% endif %} + +IMPORTANT: When spawning agents, be specific about which tool to use (e.g., "Use mcp_nmap_scan to..." or "Use mcp_metasploit_run_module to..."). Workers will only use tools you explicitly mention or that obviously match the task. + +## Guidelines +- Leverage any prior intelligence from earlier reconnaissance +- Be strategic - spawn 2-4 agents in parallel for efficiency +- Each agent task should be specific and actionable +- Adapt your approach based on what agents discover +- Call synthesize_findings when you have enough information for a report diff --git a/ghostcrew/interface/tui.py b/ghostcrew/interface/tui.py index b118086..8f33649 100644 --- a/ghostcrew/interface/tui.py +++ b/ghostcrew/interface/tui.py @@ -822,11 +822,11 @@ class GhostCrewTUI(App): else: self._add_system("Agent not initialized") - def _show_notes(self) -> None: + async def _show_notes(self) -> None: """Display saved notes""" from ..tools.notes import get_all_notes - notes = get_all_notes() + notes = await get_all_notes() if not notes: self._add_system( "=== Notes ===\nNo notes saved.\n\nThe AI can save key findings using the notes tool." 
@@ -937,7 +937,7 @@ class GhostCrewTUI(App): self._add_system("[!] Agent not initialized") return - notes = get_all_notes() + notes = await get_all_notes() if not notes: self._add_system( "No notes found. Ghost saves findings using the notes tool during testing." @@ -1062,7 +1062,7 @@ Be concise. Use the actual data from notes.""" elif cmd_lower == "/memory": self._show_memory_stats() elif cmd_lower == "/notes": - self._show_notes() + await self._show_notes() elif cmd_lower == "/report": self._run_report_generation() elif cmd_original.startswith("/target"): @@ -1610,7 +1610,9 @@ Be concise. Use the actual data from notes.""" # Show thinking/plan FIRST if there's content with tool calls if response.content: content = response.content.strip() - if response.tool_calls: + # If it has tool calls, it's thinking. + # If it's marked as intermediate, it's thinking. + if response.tool_calls or response.metadata.get("intermediate"): self._add_thinking(content) else: # Check if this is a task completion message @@ -1619,20 +1621,20 @@ Be concise. 
Use the actual data from notes.""" else: self._add_assistant(content) - # Show tool calls AFTER thinking (skip 'finish' - internal control) + # Show tool calls AFTER thinking if response.tool_calls: for call in response.tool_calls: - if call.name == "finish": - continue # Skip - summary shown as final message + # Show all tools including finish args_str = str(call.arguments) self._add_tool(call.name, args_str) # Show tool results - # Skip 'finish' tool - its result is shown as the final summary if response.tool_results: for result in response.tool_results: if result.tool_name == "finish": - continue # Skip - summary shown separately + # Skip showing result for finish tool as it's redundant with the tool call display + continue + if result.success: self._add_tool_result( result.tool_name, result.result or "Done" diff --git a/ghostcrew/llm/llm.py b/ghostcrew/llm/llm.py index 10f644c..be80b0a 100644 --- a/ghostcrew/llm/llm.py +++ b/ghostcrew/llm/llm.py @@ -136,10 +136,14 @@ class LLM: call_kwargs = { "model": self.model, "messages": llm_messages, - "tools": llm_tools, "temperature": self.config.temperature, "max_tokens": self.config.max_tokens, } + + # Only include tools if they exist (Anthropic requires tools param to be omitted, not None/empty) + if llm_tools: + call_kwargs["tools"] = llm_tools + # Only add optional params if explicitly changed from defaults if self.config.top_p != 1.0: call_kwargs["top_p"] = self.config.top_p diff --git a/ghostcrew/mcp/manager.py b/ghostcrew/mcp/manager.py index 83893e5..02c6f29 100644 --- a/ghostcrew/mcp/manager.py +++ b/ghostcrew/mcp/manager.py @@ -12,6 +12,7 @@ Uses standard MCP configuration format: } """ +import asyncio import json import os from dataclasses import dataclass, field @@ -43,6 +44,9 @@ class MCPServer: transport: MCPTransport tools: List[dict] = field(default_factory=list) connected: bool = False + # Lock for serializing all communication with this server + # Prevents message ID collisions and transport 
interleaving + _lock: asyncio.Lock = field(default_factory=asyncio.Lock) async def disconnect(self): """Disconnect from the server.""" @@ -232,16 +236,21 @@ class MCPManager: if not server or not server.connected: raise ValueError(f"Server '{server_name}' not connected") - # Use 5 minute timeout for tool calls (scans can take a while) - response = await server.transport.send( - { - "jsonrpc": "2.0", - "method": "tools/call", - "params": {"name": tool_name, "arguments": arguments}, - "id": self._get_next_id(), - }, - timeout=300.0, - ) + # Serialize all communication with this server to prevent: + # - Message ID collisions + # - Transport write interleaving + # - Response routing issues + async with server._lock: + # Use 5 minute timeout for tool calls (scans can take a while) + response = await server.transport.send( + { + "jsonrpc": "2.0", + "method": "tools/call", + "params": {"name": tool_name, "arguments": arguments}, + "id": self._get_next_id(), + }, + timeout=300.0, + ) if "error" in response: raise RuntimeError(f"MCP error: {response['error'].get('message')}") return response.get("result", {}).get("content", []) diff --git a/ghostcrew/runtime/runtime.py b/ghostcrew/runtime/runtime.py index 4e6a493..f796145 100644 --- a/ghostcrew/runtime/runtime.py +++ b/ghostcrew/runtime/runtime.py @@ -83,6 +83,7 @@ class Runtime(ABC): mcp_manager: Optional MCP manager for tool calls """ self.mcp_manager = mcp_manager + self.plan = None # Set by agent for finish tool access @property def environment(self) -> EnvironmentInfo: @@ -310,6 +311,8 @@ class LocalRuntime(Runtime): return {"url": self._page.url, "title": await self._page.title()} elif action == "screenshot": + import time + import uuid from pathlib import Path # Navigate first if URL provided @@ -322,7 +325,9 @@ class LocalRuntime(Runtime): output_dir = Path("loot/artifacts/screenshots") output_dir.mkdir(parents=True, exist_ok=True) - filename = f"screenshot_{int(__import__('time').time())}.png" + timestamp = 
int(time.time()) + unique_id = uuid.uuid4().hex[:8] + filename = f"screenshot_{timestamp}_{unique_id}.png" filepath = output_dir / filename await self._page.screenshot(path=str(filepath), full_page=True) diff --git a/ghostcrew/tools/codesearch/__init__.py b/ghostcrew/tools/codesearch/__init__.py deleted file mode 100644 index 6751246..0000000 --- a/ghostcrew/tools/codesearch/__init__.py +++ /dev/null @@ -1,424 +0,0 @@ -"""Code search tool for GhostCrew - semantic code navigation and analysis.""" - -import os -import re -import subprocess -from pathlib import Path -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple - -from ..registry import ToolSchema, register_tool - -if TYPE_CHECKING: - from ...runtime import Runtime - - -# Maximum results to prevent context overflow -MAX_RESULTS = 20 -MAX_CONTEXT_LINES = 3 - - -@register_tool( - name="search_code", - description="Search for code patterns across files. Supports regex and literal search. Returns matches with surrounding context. Use for finding function definitions, variable usages, API endpoints, or security-relevant patterns.", - schema=ToolSchema( - properties={ - "query": { - "type": "string", - "description": "Search pattern (text or regex)", - }, - "path": { - "type": "string", - "description": "Directory or file to search in. Default: current directory", - }, - "pattern": { - "type": "string", - "description": "File glob pattern to filter (e.g., '*.py', '*.js'). Default: all files", - }, - "regex": { - "type": "boolean", - "description": "Treat query as regex pattern. Default: false (literal search)", - }, - "case_sensitive": { - "type": "boolean", - "description": "Case-sensitive search. Default: false", - }, - "context_lines": { - "type": "integer", - "description": "Number of context lines before/after match. Default: 3", - }, - }, - required=["query"], - ), - category="code", -) -async def search_code(arguments: dict, runtime: "Runtime") -> str: - """ - Search for code patterns across files. 
- - Args: - arguments: Search parameters - runtime: The runtime environment - - Returns: - Formatted search results with context - """ - query = arguments["query"] - search_path = arguments.get("path", ".") - file_pattern = arguments.get("pattern") - use_regex = arguments.get("regex", False) - case_sensitive = arguments.get("case_sensitive", False) - context_lines = min(arguments.get("context_lines", MAX_CONTEXT_LINES), 10) - - try: - path = Path(search_path).resolve() - - if not path.exists(): - return f"Error: Path not found: {search_path}" - - # Compile regex pattern - flags = 0 if case_sensitive else re.IGNORECASE - if use_regex: - try: - pattern = re.compile(query, flags) - except re.error as e: - return f"Error: Invalid regex pattern: {e}" - else: - # Escape literal string for regex matching - pattern = re.compile(re.escape(query), flags) - - # Find matching files - matches = [] - files_searched = 0 - - if path.is_file(): - files_to_search = [path] - else: - files_to_search = _get_searchable_files(path, file_pattern) - - for filepath in files_to_search: - files_searched += 1 - file_matches = _search_file(filepath, pattern, context_lines) - if file_matches: - matches.extend(file_matches) - - if len(matches) >= MAX_RESULTS: - break - - if not matches: - return f"No matches found for '{query}' in {files_searched} files" - - # Format results - output = [f"Found {len(matches)} matches in {files_searched} files:\n"] - - for match in matches[:MAX_RESULTS]: - output.append(_format_match(match)) - - if len(matches) > MAX_RESULTS: - output.append(f"\n... and {len(matches) - MAX_RESULTS} more matches (showing first {MAX_RESULTS})") - - return "\n".join(output) - - except Exception as e: - return f"Error searching code: {e}" - - -@register_tool( - name="find_definition", - description="Find the definition of a function, class, or variable. 
Searches for common definition patterns across languages (def, function, class, const, let, var, etc.).", - schema=ToolSchema( - properties={ - "name": { - "type": "string", - "description": "Name of the function, class, or variable to find", - }, - "path": { - "type": "string", - "description": "Directory to search in. Default: current directory", - }, - "type": { - "type": "string", - "enum": ["function", "class", "variable", "any"], - "description": "Type of definition to find. Default: 'any'", - }, - }, - required=["name"], - ), - category="code", -) -async def find_definition(arguments: dict, runtime: "Runtime") -> str: - """ - Find definition of a symbol. - - Args: - arguments: Search parameters - runtime: The runtime environment - - Returns: - Definition location(s) with context - """ - name = arguments["name"] - search_path = arguments.get("path", ".") - def_type = arguments.get("type", "any") - - # Build regex patterns for different definition types - patterns = { - "function": [ - rf"^\s*def\s+{re.escape(name)}\s*\(", # Python - rf"^\s*async\s+def\s+{re.escape(name)}\s*\(", # Python async - rf"^\s*function\s+{re.escape(name)}\s*\(", # JavaScript - rf"^\s*async\s+function\s+{re.escape(name)}\s*\(", # JS async - rf"^\s*{re.escape(name)}\s*[:=]\s*(?:async\s+)?function", # JS assigned - rf"^\s*{re.escape(name)}\s*[:=]\s*\([^)]*\)\s*=>", # JS arrow - rf"^\s*(?:public|private|protected)?\s*(?:static\s+)?(?:async\s+)?{re.escape(name)}\s*\(", # JS/TS method - rf"^\s*func\s+{re.escape(name)}\s*\(", # Go - rf"^\s*(?:public|private|protected)\s+.*\s+{re.escape(name)}\s*\(", # Java/C# - ], - "class": [ - rf"^\s*class\s+{re.escape(name)}\b", # Python/JS/TS - rf"^\s*(?:abstract\s+)?class\s+{re.escape(name)}\b", # Java/C# - rf"^\s*interface\s+{re.escape(name)}\b", # TS/Java - rf"^\s*type\s+{re.escape(name)}\s*=", # TS type alias - rf"^\s*struct\s+{re.escape(name)}\b", # Go/Rust - ], - "variable": [ - rf"^\s*{re.escape(name)}\s*=", # Python/Ruby - 
rf"^\s*(?:const|let|var)\s+{re.escape(name)}\b", # JavaScript - rf"^\s*(?:const|let|var)\s+{re.escape(name)}\s*:", # TypeScript - rf"^\s*(?:var|val)\s+{re.escape(name)}\b", # Kotlin/Scala - rf"^\s*{re.escape(name)}\s*:=", # Go - ], - } - - # Select patterns based on type - if def_type == "any": - selected_patterns = [] - for p_list in patterns.values(): - selected_patterns.extend(p_list) - else: - selected_patterns = patterns.get(def_type, []) - - if not selected_patterns: - return f"Error: Unknown definition type '{def_type}'" - - # Combine patterns - combined_pattern = "|".join(f"({p})" for p in selected_patterns) - - try: - pattern = re.compile(combined_pattern, re.MULTILINE) - path = Path(search_path).resolve() - - if not path.exists(): - return f"Error: Path not found: {search_path}" - - matches = [] - files_searched = 0 - - for filepath in _get_searchable_files(path, None): - files_searched += 1 - file_matches = _search_file(filepath, pattern, context_lines=5) - if file_matches: - matches.extend(file_matches) - - if len(matches) >= 10: - break - - if not matches: - return f"No definition found for '{name}' ({def_type}) in {files_searched} files" - - output = [f"Found {len(matches)} definition(s) for '{name}':\n"] - for match in matches[:10]: - output.append(_format_match(match)) - - return "\n".join(output) - - except Exception as e: - return f"Error finding definition: {e}" - - -@register_tool( - name="list_functions", - description="List all function/method definitions in a file or directory. Useful for understanding code structure.", - schema=ToolSchema( - properties={ - "path": { - "type": "string", - "description": "File or directory to analyze", - }, - "pattern": { - "type": "string", - "description": "File glob pattern (e.g., '*.py'). Default: auto-detect", - }, - }, - required=["path"], - ), - category="code", -) -async def list_functions(arguments: dict, runtime: "Runtime") -> str: - """ - List function definitions in files. 
- - Args: - arguments: Search parameters - runtime: The runtime environment - - Returns: - List of functions with file and line numbers - """ - search_path = arguments["path"] - file_pattern = arguments.get("pattern") - - # Patterns for function definitions - func_patterns = [ - (r"^\s*def\s+(\w+)\s*\(", "python"), - (r"^\s*async\s+def\s+(\w+)\s*\(", "python"), - (r"^\s*function\s+(\w+)\s*\(", "javascript"), - (r"^\s*async\s+function\s+(\w+)\s*\(", "javascript"), - (r"^\s*(?:export\s+)?(?:async\s+)?function\s+(\w+)", "javascript"), - (r"^\s*(\w+)\s*[:=]\s*(?:async\s+)?function", "javascript"), - (r"^\s*(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>", "javascript"), - (r"^\s*(?:public|private|protected)?\s*(?:static\s+)?(?:async\s+)?(\w+)\s*\([^)]*\)\s*[:{]", "typescript"), - (r"^\s*func\s+(\w+)\s*\(", "go"), - (r"^\s*(?:public|private|protected)\s+.*\s+(\w+)\s*\([^)]*\)\s*{", "java"), - ] - - combined = "|".join(f"(?:{p})" for p, _ in func_patterns) - pattern = re.compile(combined, re.MULTILINE) - - try: - path = Path(search_path).resolve() - - if not path.exists(): - return f"Error: Path not found: {search_path}" - - results: Dict[str, List[Tuple[int, str]]] = {} - - if path.is_file(): - files_to_search = [path] - else: - files_to_search = _get_searchable_files(path, file_pattern) - - for filepath in files_to_search: - try: - content = filepath.read_text(encoding="utf-8", errors="ignore") - lines = content.splitlines() - - for i, line in enumerate(lines, 1): - match = pattern.search(line) - if match: - # Find the first non-None group (function name) - func_name = next((g for g in match.groups() if g), None) - if func_name: - rel_path = str(filepath.relative_to(path) if path.is_dir() else filepath.name) - if rel_path not in results: - results[rel_path] = [] - results[rel_path].append((i, func_name)) - except Exception: - continue - - if not results: - return f"No functions found in {search_path}" - - output = [f"Functions in {search_path}:\n"] - - for 
filepath, funcs in sorted(results.items()): - output.append(f"\n{filepath}:") - for line_num, func_name in funcs: - output.append(f" L{line_num}: {func_name}()") - - total = sum(len(f) for f in results.values()) - output.insert(1, f"Found {total} functions in {len(results)} files") - - return "\n".join(output) - - except Exception as e: - return f"Error listing functions: {e}" - - -def _get_searchable_files(path: Path, pattern: Optional[str]) -> List[Path]: - """Get list of searchable files, excluding binary and hidden files.""" - files = [] - - # File extensions to search - code_extensions = { - ".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".go", ".rb", ".php", - ".c", ".cpp", ".h", ".hpp", ".cs", ".rs", ".swift", ".kt", ".scala", - ".sh", ".bash", ".zsh", ".ps1", ".bat", ".cmd", - ".json", ".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf", - ".xml", ".html", ".htm", ".css", ".scss", ".sass", - ".sql", ".graphql", ".md", ".txt", ".env", ".gitignore", - } - - # Directories to skip - skip_dirs = { - ".git", ".svn", ".hg", "node_modules", "__pycache__", ".pytest_cache", - "venv", ".venv", "env", ".env", "dist", "build", "target", ".idea", - ".vscode", "coverage", ".tox", "eggs", "*.egg-info", - } - - for root, dirs, filenames in os.walk(path): - # Skip hidden and common non-code directories - dirs[:] = [d for d in dirs if d not in skip_dirs and not d.startswith(".")] - - for filename in filenames: - filepath = Path(root) / filename - - # Skip hidden files - if filename.startswith("."): - continue - - # Apply glob pattern if specified - if pattern and not filepath.match(pattern): - continue - - # Check extension - if filepath.suffix.lower() in code_extensions or not filepath.suffix: - files.append(filepath) - - return files - - -def _search_file( - filepath: Path, - pattern: re.Pattern, - context_lines: int -) -> List[dict]: - """Search a single file for pattern matches.""" - matches = [] - - try: - content = filepath.read_text(encoding="utf-8", errors="ignore") - 
lines = content.splitlines() - - for i, line in enumerate(lines): - if pattern.search(line): - # Get context - start = max(0, i - context_lines) - end = min(len(lines), i + context_lines + 1) - - context = [] - for j in range(start, end): - prefix = "→ " if j == i else " " - context.append((j + 1, prefix, lines[j])) - - matches.append({ - "file": str(filepath), - "line": i + 1, - "match": line.strip(), - "context": context, - }) - except Exception: - pass - - return matches - - -def _format_match(match: dict) -> str: - """Format a search match for display.""" - output = [f"\n{'─' * 50}"] - output.append(f"📄 {match['file']}:{match['line']}") - output.append("") - - for line_num, prefix, text in match["context"]: - output.append(f"{line_num:4d} {prefix}{text}") - - return "\n".join(output) diff --git a/ghostcrew/tools/completion/__init__.py b/ghostcrew/tools/completion/__init__.py deleted file mode 100644 index 378f534..0000000 --- a/ghostcrew/tools/completion/__init__.py +++ /dev/null @@ -1,176 +0,0 @@ -"""Task completion tool for GhostCrew agent loop control.""" - -import json -from typing import Any, Dict, List, Optional - -from ..registry import ToolSchema, register_tool - -# Sentinel value to signal task completion -TASK_COMPLETE_SIGNAL = "__TASK_COMPLETE__" - - -@register_tool( - name="finish", - description="Signal that the current task is finished. Call this when you have completed ALL steps of the user's request. 
Provide a structured report of what was accomplished.", - schema=ToolSchema( - properties={ - "status": { - "type": "string", - "enum": ["success", "partial", "failed"], - "description": "Overall task status: 'success' (all objectives met), 'partial' (some objectives met), 'failed' (unable to complete)", - }, - "summary": { - "type": "string", - "description": "Brief summary of what was accomplished and any key findings", - }, - "findings": { - "type": "array", - "items": {"type": "string"}, - "description": "List of key findings, vulnerabilities discovered, or important observations", - }, - "artifacts": { - "type": "array", - "items": {"type": "string"}, - "description": "List of files created (PoCs, scripts, screenshots, reports)", - }, - "recommendations": { - "type": "array", - "items": {"type": "string"}, - "description": "Suggested next steps or follow-up actions", - }, - }, - required=["status", "summary"], - ), - category="control", -) -async def finish(arguments: dict, runtime) -> str: - """ - Signal task completion to the agent framework with structured output. - - This tool is called by the agent when it has finished all steps - of the user's task. The framework uses this as an explicit - termination signal rather than relying on LLM text output. 
- - Args: - arguments: Dictionary with structured completion data - runtime: The runtime environment (unused) - - Returns: - The completion signal with structured JSON data - """ - # Build structured completion report - report = CompletionReport( - status=arguments.get("status", "success"), - summary=arguments.get("summary", "Task completed."), - findings=arguments.get("findings", []), - artifacts=arguments.get("artifacts", []), - recommendations=arguments.get("recommendations", []), - ) - - # Return special signal with JSON-encoded report - return f"{TASK_COMPLETE_SIGNAL}:{report.to_json()}" - - -class CompletionReport: - """Structured completion report for task results.""" - - def __init__( - self, - status: str = "success", - summary: str = "", - findings: Optional[List[str]] = None, - artifacts: Optional[List[str]] = None, - recommendations: Optional[List[str]] = None, - ): - self.status = status - self.summary = summary - self.findings = findings or [] - self.artifacts = artifacts or [] - self.recommendations = recommendations or [] - - def to_dict(self) -> Dict[str, Any]: - """Convert to dictionary.""" - return { - "status": self.status, - "summary": self.summary, - "findings": self.findings, - "artifacts": self.artifacts, - "recommendations": self.recommendations, - } - - def to_json(self) -> str: - """Convert to JSON string.""" - return json.dumps(self.to_dict()) - - @classmethod - def from_json(cls, json_str: str) -> "CompletionReport": - """Create from JSON string.""" - data = json.loads(json_str) - return cls(**data) - - def format_display(self) -> str: - """Format for human-readable display.""" - lines = [] - - # Status indicator - status_icons = {"success": "✓", "partial": "◐", "failed": "✗"} - icon = status_icons.get(self.status, "•") - lines.append(f"{icon} Status: {self.status.upper()}") - lines.append("") - - # Summary - lines.append(f"Summary: {self.summary}") - - # Findings - if self.findings: - lines.append("") - lines.append("Findings:") - for 
finding in self.findings: - lines.append(f" • {finding}") - - # Artifacts - if self.artifacts: - lines.append("") - lines.append("Artifacts:") - for artifact in self.artifacts: - lines.append(f" 📄 {artifact}") - - # Recommendations - if self.recommendations: - lines.append("") - lines.append("Recommendations:") - for rec in self.recommendations: - lines.append(f" → {rec}") - - return "\n".join(lines) - - -def is_task_complete(result: str) -> bool: - """Check if a tool result signals task completion.""" - return result.startswith(TASK_COMPLETE_SIGNAL) - - -def extract_completion_summary(result: str) -> str: - """Extract the summary from a task_complete result (legacy support).""" - if is_task_complete(result): - data = result[len(TASK_COMPLETE_SIGNAL) + 1:] # +1 for the colon - # Try to parse as JSON for new format - try: - report = CompletionReport.from_json(data) - return report.summary - except (json.JSONDecodeError, TypeError): - # Fall back to raw string for legacy format - return data - return result - - -def extract_completion_report(result: str) -> Optional[CompletionReport]: - """Extract the full structured report from a task_complete result.""" - if is_task_complete(result): - data = result[len(TASK_COMPLETE_SIGNAL) + 1:] - try: - return CompletionReport.from_json(data) - except (json.JSONDecodeError, TypeError): - # Legacy format - wrap in report - return CompletionReport(status="success", summary=data) - return None diff --git a/ghostcrew/tools/filesystem/__init__.py b/ghostcrew/tools/filesystem/__init__.py deleted file mode 100644 index 2146a0e..0000000 --- a/ghostcrew/tools/filesystem/__init__.py +++ /dev/null @@ -1,352 +0,0 @@ -"""Filesystem tool for GhostCrew - precise file reading and editing.""" - -import os -import re -from pathlib import Path -from typing import TYPE_CHECKING, List, Optional - -from ..registry import ToolSchema, register_tool - -if TYPE_CHECKING: - from ...runtime import Runtime - - -# Safety: Restrict operations to workspace 
-_WORKSPACE_ROOT: Optional[Path] = None - - -def set_workspace_root(path: Path) -> None: - """Set the workspace root for safety checks.""" - global _WORKSPACE_ROOT - _WORKSPACE_ROOT = path.resolve() - - -def _validate_path(filepath: str) -> Path: - """Validate and resolve a file path within the workspace.""" - path = Path(filepath).resolve() - - # If workspace root is set, ensure path is within it - if _WORKSPACE_ROOT: - try: - path.relative_to(_WORKSPACE_ROOT) - except ValueError: - raise ValueError(f"Path '{filepath}' is outside workspace root") - - return path - - -@register_tool( - name="read_file", - description="Read contents of a file. Can read entire file or specific line range. Use this to examine source code, configs, or any text file.", - schema=ToolSchema( - properties={ - "path": { - "type": "string", - "description": "Path to the file to read", - }, - "start_line": { - "type": "integer", - "description": "Starting line number (1-indexed). If omitted, reads from beginning.", - }, - "end_line": { - "type": "integer", - "description": "Ending line number (1-indexed, inclusive). If omitted, reads to end.", - }, - }, - required=["path"], - ), - category="filesystem", -) -async def read_file(arguments: dict, runtime: "Runtime") -> str: - """ - Read a file's contents, optionally within a line range. 
- - Args: - arguments: Dictionary with 'path' and optional 'start_line', 'end_line' - runtime: The runtime environment - - Returns: - File contents with line numbers - """ - filepath = arguments["path"] - start_line = arguments.get("start_line") - end_line = arguments.get("end_line") - - try: - path = _validate_path(filepath) - - if not path.exists(): - return f"Error: File not found: {filepath}" - - if not path.is_file(): - return f"Error: Not a file: {filepath}" - - # Read file content - try: - content = path.read_text(encoding="utf-8") - except UnicodeDecodeError: - content = path.read_text(encoding="latin-1") - - lines = content.splitlines() - total_lines = len(lines) - - # Handle line range - start_idx = (start_line - 1) if start_line else 0 - end_idx = end_line if end_line else total_lines - - # Clamp to valid range - start_idx = max(0, min(start_idx, total_lines)) - end_idx = max(0, min(end_idx, total_lines)) - - if start_idx >= end_idx: - return f"Error: Invalid line range {start_line}-{end_line} (file has {total_lines} lines)" - - # Format output with line numbers - selected_lines = lines[start_idx:end_idx] - output_lines = [] - for i, line in enumerate(selected_lines, start=start_idx + 1): - output_lines.append(f"{i:4d} | {line}") - - header = f"File: {filepath} (lines {start_idx + 1}-{end_idx} of {total_lines})" - return f"{header}\n{'─' * 60}\n" + "\n".join(output_lines) - - except ValueError as e: - return f"Error: {e}" - except Exception as e: - return f"Error reading file: {e}" - - -@register_tool( - name="write_file", - description="Write content to a file. Creates the file if it doesn't exist, or overwrites if it does. 
Use for creating PoCs, scripts, or config files.", - schema=ToolSchema( - properties={ - "path": { - "type": "string", - "description": "Path to the file to write", - }, - "content": { - "type": "string", - "description": "Content to write to the file", - }, - "append": { - "type": "boolean", - "description": "If true, append to file instead of overwriting. Default: false", - }, - }, - required=["path", "content"], - ), - category="filesystem", -) -async def write_file(arguments: dict, runtime: "Runtime") -> str: - """ - Write content to a file. - - Args: - arguments: Dictionary with 'path', 'content', and optional 'append' - runtime: The runtime environment - - Returns: - Success or error message - """ - filepath = arguments["path"] - content = arguments["content"] - append = arguments.get("append", False) - - try: - path = _validate_path(filepath) - - # Create parent directories if needed - path.parent.mkdir(parents=True, exist_ok=True) - - mode = "a" if append else "w" - with open(path, mode, encoding="utf-8") as f: - f.write(content) - - action = "Appended to" if append else "Wrote" - return f"{action} {len(content)} bytes to {filepath}" - - except ValueError as e: - return f"Error: {e}" - except Exception as e: - return f"Error writing file: {e}" - - -@register_tool( - name="replace_in_file", - description="Replace text in a file. Finds exact match of 'old_string' and replaces with 'new_string'. 
Include surrounding context in old_string to ensure unique match.", - schema=ToolSchema( - properties={ - "path": { - "type": "string", - "description": "Path to the file to edit", - }, - "old_string": { - "type": "string", - "description": "Exact text to find and replace (include context lines for unique match)", - }, - "new_string": { - "type": "string", - "description": "Text to replace old_string with", - }, - }, - required=["path", "old_string", "new_string"], - ), - category="filesystem", -) -async def replace_in_file(arguments: dict, runtime: "Runtime") -> str: - """ - Replace text in a file. - - Args: - arguments: Dictionary with 'path', 'old_string', 'new_string' - runtime: The runtime environment - - Returns: - Success or error message with diff preview - """ - filepath = arguments["path"] - old_string = arguments["old_string"] - new_string = arguments["new_string"] - - try: - path = _validate_path(filepath) - - if not path.exists(): - return f"Error: File not found: {filepath}" - - # Read current content - try: - content = path.read_text(encoding="utf-8") - except UnicodeDecodeError: - content = path.read_text(encoding="latin-1") - - # Count occurrences - count = content.count(old_string) - - if count == 0: - return f"Error: String not found in {filepath}. Make sure old_string matches exactly (including whitespace)." - - if count > 1: - return f"Error: Found {count} matches in {filepath}. Include more context in old_string to make it unique." - - # Perform replacement - new_content = content.replace(old_string, new_string, 1) - path.write_text(new_content, encoding="utf-8") - - # Show what changed - old_preview = old_string[:100] + "..." if len(old_string) > 100 else old_string - new_preview = new_string[:100] + "..." 
if len(new_string) > 100 else new_string - - return f"Replaced in {filepath}:\n- {repr(old_preview)}\n+ {repr(new_preview)}" - - except ValueError as e: - return f"Error: {e}" - except Exception as e: - return f"Error replacing in file: {e}" - - -@register_tool( - name="list_directory", - description="List contents of a directory. Shows files and subdirectories with basic info.", - schema=ToolSchema( - properties={ - "path": { - "type": "string", - "description": "Path to the directory to list. Default: current directory", - }, - "recursive": { - "type": "boolean", - "description": "If true, list recursively (max 3 levels). Default: false", - }, - "pattern": { - "type": "string", - "description": "Glob pattern to filter results (e.g., '*.py', '*.js')", - }, - }, - required=[], - ), - category="filesystem", -) -async def list_directory(arguments: dict, runtime: "Runtime") -> str: - """ - List directory contents. - - Args: - arguments: Dictionary with optional 'path', 'recursive', 'pattern' - runtime: The runtime environment - - Returns: - Directory listing - """ - dirpath = arguments.get("path", ".") - recursive = arguments.get("recursive", False) - pattern = arguments.get("pattern") - - try: - path = _validate_path(dirpath) - - if not path.exists(): - return f"Error: Directory not found: {dirpath}" - - if not path.is_dir(): - return f"Error: Not a directory: {dirpath}" - - entries = [] - - if recursive: - # Recursive listing with depth limit - for root, dirs, files in os.walk(path): - root_path = Path(root) - depth = len(root_path.relative_to(path).parts) - if depth > 3: - dirs.clear() # Don't go deeper - continue - - rel_root = root_path.relative_to(path) - prefix = " " * depth - - for d in sorted(dirs): - if not d.startswith('.'): - entries.append(f"{prefix}{d}/") - - for f in sorted(files): - if pattern and not Path(f).match(pattern): - continue - if not f.startswith('.'): - file_path = root_path / f - size = file_path.stat().st_size - 
entries.append(f"{prefix}{f} ({_format_size(size)})") - else: - # Single-level listing - for item in sorted(path.iterdir()): - if item.name.startswith('.'): - continue - if pattern and not item.match(pattern): - continue - - if item.is_dir(): - entries.append(f"{item.name}/") - else: - size = item.stat().st_size - entries.append(f"{item.name} ({_format_size(size)})") - - if not entries: - return f"Directory {dirpath} is empty" + (f" (pattern: {pattern})" if pattern else "") - - header = f"Directory: {dirpath}" + (f" (pattern: {pattern})" if pattern else "") - return f"{header}\n{'─' * 40}\n" + "\n".join(entries) - - except ValueError as e: - return f"Error: {e}" - except Exception as e: - return f"Error listing directory: {e}" - - -def _format_size(size: int) -> str: - """Format file size in human-readable form.""" - for unit in ['B', 'KB', 'MB', 'GB']: - if size < 1024: - return f"{size:.1f}{unit}" if unit != 'B' else f"{size}B" - size /= 1024 - return f"{size:.1f}TB" diff --git a/ghostcrew/tools/finish/__init__.py b/ghostcrew/tools/finish/__init__.py new file mode 100644 index 0000000..487ba58 --- /dev/null +++ b/ghostcrew/tools/finish/__init__.py @@ -0,0 +1,243 @@ +"""Task completion tool for GhostCrew agent loop control.""" + +import json +from dataclasses import dataclass, field +from enum import Enum +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +from ..registry import Tool as Tool +from ..registry import ToolSchema, register_tool + +if TYPE_CHECKING: + from ...runtime import Runtime + +# Sentinel value to signal task completion +TASK_COMPLETE_SIGNAL = "__TASK_COMPLETE__" + + +class StepStatus(str, Enum): + PENDING = "pending" + IN_PROGRESS = "in_progress" + COMPLETE = "complete" + SKIPPED = "skipped" + + +@dataclass +class PlanStep: + id: int + description: str + status: StepStatus = StepStatus.PENDING + result: Optional[str] = None + + def to_dict(self) -> dict: + return { + "id": self.id, + "description":
self.description, + "status": self.status.value, + "result": self.result, + } + + +@dataclass +class TaskPlan: + steps: list[PlanStep] = field(default_factory=list) + original_request: str = "" + + def is_complete(self) -> bool: + return all( + s.status in (StepStatus.COMPLETE, StepStatus.SKIPPED) for s in self.steps + ) + + def clear(self) -> None: + self.steps.clear() + self.original_request = "" + + def get_current_step(self) -> Optional[PlanStep]: + for s in self.steps: + if s.status == StepStatus.PENDING: + return s + return None + + def get_pending_steps(self) -> list[PlanStep]: + return [s for s in self.steps if s.status == StepStatus.PENDING] + + +@register_tool( + name="finish", + description="Mark plan steps as complete or skipped. Actions: 'complete' (mark step done), 'skip' (step not applicable). Once all steps are complete/skipped, task automatically finishes.", + schema=ToolSchema( + properties={ + "action": { + "type": "string", + "enum": ["complete", "skip"], + "description": "Action: 'complete' (mark step done) or 'skip' (step not applicable)", + }, + "step_id": { + "type": "integer", + "description": "Step number (1-indexed)", + }, + "result": { + "type": "string", + "description": "[complete only] What was accomplished", + }, + "reason": { + "type": "string", + "description": "[skip only] Why step is being skipped", + }, + }, + required=["action", "step_id"], + ), + category="workflow", +) +async def finish(arguments: dict, runtime: "Runtime") -> str: + """Mark plan steps as complete or skipped. Accesses plan via runtime.plan.""" + action = arguments.get("action", "") + step_id = arguments.get("step_id") + + # Access plan from runtime (set by agent) + plan = getattr(runtime, "plan", None) + if plan is None or len(plan.steps) == 0: + return "No active plan." + + # Find the step + step = next((s for s in plan.steps if s.id == step_id), None) + if not step: + valid_ids = [s.id for s in plan.steps] + return f"Error: Step {step_id} not found. 
Valid IDs: {valid_ids}" + + if action == "complete": + result = arguments.get("result", "") + step.status = StepStatus.COMPLETE + step.result = result + + # Build response + lines = [f"Step {step_id} complete"] + if result: + lines.append(f"Result: {result}") + + # Check if all done + if plan.is_complete(): + lines.append("All steps complete") + else: + next_step = plan.get_current_step() + if next_step: + lines.append(f"Next: Step {next_step.id}") + + return "\n".join(lines) + + elif action == "skip": + reason = arguments.get("reason", "") + if not reason: + return "Error: 'reason' required for skip action." + + step.status = StepStatus.SKIPPED + step.result = f"Skipped: {reason}" + return f"Step {step_id} skipped: {reason}" + + else: + return f"Error: Unknown action '{action}'. Use 'complete' or 'skip'." + + +class CompletionReport: + """Structured completion report for task results.""" + + def __init__( + self, + status: str = "success", + summary: str = "", + findings: Optional[List[str]] = None, + artifacts: Optional[List[str]] = None, + recommendations: Optional[List[str]] = None, + ): + self.status = status + self.summary = summary + self.findings = findings or [] + self.artifacts = artifacts or [] + self.recommendations = recommendations or [] + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary.""" + return { + "status": self.status, + "summary": self.summary, + "findings": self.findings, + "artifacts": self.artifacts, + "recommendations": self.recommendations, + } + + def to_json(self) -> str: + """Convert to JSON string.""" + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> "CompletionReport": + """Create from JSON string.""" + data = json.loads(json_str) + return cls(**data) + + def format_display(self) -> str: + """Format for human-readable display.""" + lines = [] + + # Status indicator + status_icons = {"success": "✓", "partial": "◐", "failed": "✗"} + icon = status_icons.get(self.status, "•") + 
lines.append(f"{icon} Status: {self.status.upper()}") + lines.append("") + + # Summary + lines.append(f"Summary: {self.summary}") + + # Findings + if self.findings: + lines.append("") + lines.append("Findings:") + for finding in self.findings: + lines.append(f" • {finding}") + + # Artifacts + if self.artifacts: + lines.append("") + lines.append("Artifacts:") + for artifact in self.artifacts: + lines.append(f" 📄 {artifact}") + + # Recommendations + if self.recommendations: + lines.append("") + lines.append("Recommendations:") + for rec in self.recommendations: + lines.append(f" → {rec}") + + return "\n".join(lines) + + +def is_task_complete(result: str) -> bool: + """Check if a tool result signals task completion.""" + return result.startswith(TASK_COMPLETE_SIGNAL) + + +def extract_completion_summary(result: str) -> str: + """Extract the summary from a task_complete result (legacy support).""" + if is_task_complete(result): + data = result[len(TASK_COMPLETE_SIGNAL) + 1 :] # +1 for the colon + # Try to parse as JSON for new format + try: + report = CompletionReport.from_json(data) + return report.summary + except (json.JSONDecodeError, TypeError): + # Fall back to raw string for legacy format + return data + return result + + +def extract_completion_report(result: str) -> Optional[CompletionReport]: + """Extract the full structured report from a task_complete result.""" + if is_task_complete(result): + data = result[len(TASK_COMPLETE_SIGNAL) + 1 :] + try: + return CompletionReport.from_json(data) + except (json.JSONDecodeError, TypeError): + # Legacy format - wrap in report + return CompletionReport(status="success", summary=data) + return None diff --git a/ghostcrew/tools/notes/__init__.py b/ghostcrew/tools/notes/__init__.py index 8cb5386..8e1edf2 100644 --- a/ghostcrew/tools/notes/__init__.py +++ b/ghostcrew/tools/notes/__init__.py @@ -1,5 +1,6 @@ """Notes tool for GhostCrew - persistent key findings storage.""" +import asyncio import json from pathlib import Path 
# Notes storage - kept at loot root for easy access
_notes: Dict[str, str] = {}
_notes_file: Path = Path("loot/notes.json")
# Lock for safe concurrent access from multiple agents (asyncio since agents are async tasks)
_notes_lock = asyncio.Lock()


def _load_notes_unlocked() -> None:
    """Refresh the in-memory cache from disk (caller must hold the lock)."""
    global _notes
    if not _notes_file.exists():
        return
    try:
        _notes = json.loads(_notes_file.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, IOError):
        # Corrupt or unreadable file: start over with an empty cache.
        _notes = {}


def _save_notes_unlocked() -> None:
    """Persist the in-memory cache to disk (caller must hold the lock)."""
    _notes_file.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(_notes, indent=2)
    _notes_file.write_text(serialized, encoding="utf-8")


async def get_all_notes() -> Dict[str, str]:
    """Get all notes (for TUI /notes command)."""
    async with _notes_lock:
        # Lazily fall back to disk when nothing is cached yet.
        if not _notes:
            _load_notes_unlocked()
        return dict(_notes)


def get_all_notes_sync() -> Dict[str, str]:
    """Get all notes synchronously (read-only, best effort for prompts)."""
    # Empty cache but a file on disk: read it directly without touching the
    # shared cache (safe, lock-free read path).
    if not _notes and _notes_file.exists():
        try:
            return json.loads(_notes_file.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, IOError):
            pass
    return dict(_notes)


def set_notes_file(path: Path) -> None:
    """Set custom notes file path."""
    global _notes_file
    _notes_file = path
    # Can't use async here, so load without lock (called at init time)
    _load_notes_unlocked()


# Load notes on module import (init time, no contention yet)
+_load_notes_unlocked() @register_tool( @@ -83,58 +99,59 @@ async def notes(arguments: dict, runtime) -> str: key = arguments.get("key", "").strip() value = arguments.get("value", "") - if action == "create": - if not key: - return "Error: key is required for create" - if not value: - return "Error: value is required for create" - if key in _notes: - return f"Error: note '{key}' already exists. Use 'update' to modify." + async with _notes_lock: + if action == "create": + if not key: + return "Error: key is required for create" + if not value: + return "Error: value is required for create" + if key in _notes: + return f"Error: note '{key}' already exists. Use 'update' to modify." - _notes[key] = value - _save_notes() - return f"Created note '{key}'" + _notes[key] = value + _save_notes_unlocked() + return f"Created note '{key}'" - elif action == "read": - if not key: - return "Error: key is required for read" - if key not in _notes: - return f"Note '{key}' not found" + elif action == "read": + if not key: + return "Error: key is required for read" + if key not in _notes: + return f"Note '{key}' not found" - return f"[{key}] {_notes[key]}" + return f"[{key}] {_notes[key]}" - elif action == "update": - if not key: - return "Error: key is required for update" - if not value: - return "Error: value is required for update" + elif action == "update": + if not key: + return "Error: key is required for update" + if not value: + return "Error: value is required for update" - existed = key in _notes - _notes[key] = value - _save_notes() - return f"{'Updated' if existed else 'Created'} note '{key}'" + existed = key in _notes + _notes[key] = value + _save_notes_unlocked() + return f"{'Updated' if existed else 'Created'} note '{key}'" - elif action == "delete": - if not key: - return "Error: key is required for delete" - if key not in _notes: - return f"Note '{key}' not found" + elif action == "delete": + if not key: + return "Error: key is required for delete" + if key not in 
_notes: + return f"Note '{key}' not found" - del _notes[key] - _save_notes() - return f"Deleted note '{key}'" + del _notes[key] + _save_notes_unlocked() + return f"Deleted note '{key}'" - elif action == "list": - if not _notes: - return "No notes saved" + elif action == "list": + if not _notes: + return "No notes saved" - lines = [f"Notes ({len(_notes)} entries):"] - for k, v in _notes.items(): - # Truncate long values for display - display_val = v if len(v) <= 60 else v[:57] + "..." - lines.append(f" [{k}] {display_val}") + lines = [f"Notes ({len(_notes)} entries):"] + for k, v in _notes.items(): + # Truncate long values for display + display_val = v if len(v) <= 60 else v[:57] + "..." + lines.append(f" [{k}] {display_val}") - return "\n".join(lines) + return "\n".join(lines) - else: - return f"Unknown action: {action}" + else: + return f"Unknown action: {action}"