async def _replan(self, failed_step: Any) -> Optional[AgentMessage]:
    """Generate a replacement plan after a step was marked as failed.

    The LLM is offered a throwaway ``create_plan`` tool and asked either for
    a fresh list of steps or for a verdict that the task is infeasible. When
    usable steps come back, ``self._task_plan.steps`` is replaced in place
    with newly numbered ``PlanStep`` objects.

    Args:
        failed_step: The plan step that failed. Its ``id``, ``description``
            and ``result`` attributes are quoted in the replanning prompt.

    Returns:
        * An assistant message with metadata ``{"replan_impossible": True}``
          when the model declares the task infeasible.
        * An assistant message with metadata ``{"replanned": True}`` that
          lists the new plan.
        * An assistant message with metadata ``{"replan_failed": True}``
          when any exception is raised while replanning.
        * ``None`` when the model produced no usable steps.

    NOTE(review): on the ``None`` and ``replan_failed`` paths the plan is
    left untouched, so the failed step is still present and
    ``has_failure()`` will keep reporting it. Confirm that the calling loop
    bounds how often it retries.
    """
    from ..tools.finish import PlanStep
    from ..tools.registry import Tool, ToolSchema

    # 1. Snapshot the current plan so the model can see what was tried.
    old_plan_str = "\n".join(
        f"{s.id}. {s.description} ({s.status})" for s in self._task_plan.steps
    )

    # 2. Temporary tool used only to get structured output from the model;
    #    it is never executed, hence the no-op execute_fn.
    plan_generator_tool = Tool(
        name="create_plan",
        description="Create a NEW step-by-step plan. Call this with the steps needed.",
        schema=ToolSchema(
            properties={
                "feasible": {
                    "type": "boolean",
                    "description": "Can the task be completed with a new plan? Set false if impossible/out-of-scope.",
                },
                "steps": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "List of actionable steps (required if feasible=true).",
                },
                "reason": {
                    "type": "string",
                    "description": "Reason for the new plan OR reason why it's impossible.",
                },
            },
            required=["feasible", "reason"],
        ),
        execute_fn=lambda args, runtime: None,
        category="planning",
    )

    replan_prompt = f"""The previous plan failed at step {failed_step.id}.

Failed Step: {failed_step.description}
Reason: {failed_step.result}

Previous Plan:
{old_plan_str}

Original Request: {self._task_plan.original_request}

Task: Generate a NEW plan (v2) that addresses this failure.
- If the failure invalidates the entire approach, try a different tactical approach.
- If the task is IMPOSSIBLE or OUT OF SCOPE (e.g., requires installing software on a remote target, physical access, or permissions you don't have), set feasible=False.
- Do NOT propose steps that violate standard pentest constraints (no installing agents/services on targets unless exploited).

Call create_plan with the new steps OR feasible=False."""

    try:
        response = await self.llm.generate(
            system_prompt="You are a tactical planning assistant. The previous plan failed. Create a new one or declare it impossible.",
            messages=[{"role": "user", "content": replan_prompt}],
            tools=[plan_generator_tool],
        )

        steps = []
        feasible = True
        reason = ""

        # Only the first tool call is considered, as before.
        if response.tool_calls:
            args = self._parse_arguments(response.tool_calls[0])

            # Tool arguments are model-generated: a boolean may arrive as
            # the string "false", which would otherwise be truthy.
            raw_feasible = args.get("feasible", True)
            if isinstance(raw_feasible, str):
                feasible = raw_feasible.strip().lower() not in ("false", "no", "0")
            else:
                feasible = bool(raw_feasible)

            reason = args.get("reason", "")

            raw_steps = args.get("steps") if feasible else None
            if isinstance(raw_steps, str):
                # A bare string would otherwise be enumerated per character.
                raw_steps = raw_steps.splitlines()
            if isinstance(raw_steps, (list, tuple)):
                # Drop blank entries so the plan has no empty steps.
                steps = [
                    text for text in (str(item).strip() for item in raw_steps) if text
                ]

        if not feasible:
            return AgentMessage(
                role="assistant",
                content=f"Task determined to be infeasible after failure.\nReason: {reason}",
                metadata={"replan_impossible": True},
            )

        if not steps:
            return None

        # Replace the plan wholesale; step ids restart at 1.
        self._task_plan.steps = [
            PlanStep(id=index, description=text)
            for index, text in enumerate(steps, start=1)
        ]

        plan_display = [f"Plan v2 (Replanned) - {reason}:"]
        plan_display.extend(
            f" {step.id}. {step.description}" for step in self._task_plan.steps
        )

        return AgentMessage(
            role="assistant",
            content="\n".join(plan_display),
            metadata={"replanned": True},
        )

    except Exception as e:
        # Best-effort boundary: report the failure as a message instead of
        # crashing the agent loop.
        return AgentMessage(
            role="assistant",
            content=f"Replanning failed: {e}",
            metadata={"replan_failed": True},
        )
async def formulate_strategy_fn(arguments: dict, runtime: "Runtime") -> str:
    """Format a strategic Course of Action (COA) decision as Markdown.

    The function performs no action beyond building the text that records
    the decision; errors are reported as ``"Error: ..."`` strings rather
    than raised.

    Args:
        arguments: Tool arguments. Reads ``problem`` (required),
            ``rationale``, ``feasible`` (default ``True``), ``candidates``
            (list of dicts with ``id``/``name``/``pros``/``cons``/``risk``)
            and ``selected_id``.
        runtime: Unused; present to match the tool ``execute_fn`` signature.

    Returns:
        A Markdown summary of the selected COA, a "TERMINATE MISSION"
        notice when ``feasible`` is false, or an error string when required
        arguments are missing or ``selected_id`` matches no candidate.
    """
    problem = arguments.get("problem", "")
    rationale = arguments.get("rationale", "")

    if not problem:
        return "Error: problem is required."

    # Arguments are model-generated: accept "false"/"no"/"0" strings as a
    # negative answer instead of treating every non-empty string as True.
    feasible = arguments.get("feasible", True)
    if isinstance(feasible, str):
        feasible = feasible.strip().lower() not in ("false", "no", "0")

    if not feasible:
        return f"## Strategic Decision: TERMINATE MISSION\n**Problem:** {problem}\n**Rationale:** {rationale}\n\nMission marked as infeasible."

    # Ignore malformed (non-dict) entries rather than crashing on `.get`.
    raw_candidates = arguments.get("candidates") or []
    candidates = [c for c in raw_candidates if isinstance(c, dict)]
    selected_id = arguments.get("selected_id", "")

    if not candidates or not selected_id:
        return "Error: candidates and selected_id are required when feasible=True."

    def is_selected(candidate: dict) -> bool:
        # Compare as strings so an integer id (1) matches selected_id "1".
        candidate_id = candidate.get("id")
        return candidate_id is not None and str(candidate_id) == str(selected_id)

    selected = next((c for c in candidates if is_selected(c)), None)
    if not selected:
        return f"Error: Selected ID '{selected_id}' not found in candidates."

    # Format the decision for the log/history.
    output = [
        f"## Strategic Decision: {selected.get('name', 'Unknown')}",
        f"**Problem:** {problem}",
        "",
        "**Considered Options:**",
    ]

    for c in candidates:
        mark = "(SELECTED)" if is_selected(c) else ""
        output.append(f"- **{c.get('name')}** {mark}")
        output.append(f" - Pros: {c.get('pros')}")
        output.append(f" - Cons: {c.get('cons')}")
        output.append(f" - Risk: {c.get('risk')}")

    output.append("")
    output.append(f"**Rationale:** {rationale}")

    return "\n".join(output)
Set feasible=False to terminate if no options exist.", + schema=ToolSchema( + type="object", + properties={ + "problem": { + "type": "string", + "description": "The strategic problem or blocker encountered.", + }, + "feasible": { + "type": "boolean", + "description": "Whether a feasible solution exists. Set to False to terminate the mission.", + "default": True, + }, + "candidates": { + "type": "array", + "items": { + "type": "object", + "properties": { + "id": {"type": "string"}, + "name": {"type": "string"}, + "pros": {"type": "string"}, + "cons": {"type": "string"}, + "risk": { + "type": "string", + "enum": ["Low", "Medium", "High", "Critical"], + }, + }, + "required": ["id", "name", "pros", "cons", "risk"], + }, + "description": "List of potential Courses of Action (COAs). Required if feasible=True.", + }, + "selected_id": { + "type": "string", + "description": "The ID of the selected COA. Required if feasible=True.", + }, + "rationale": { + "type": "string", + "description": "Why this COA was selected over others (or why mission is infeasible).", + }, + }, + required=["problem", "rationale"], + ), + execute_fn=formulate_strategy_fn, + category="orchestration", + ), ] return tools diff --git a/ghostcrew/agents/crew/worker_pool.py b/ghostcrew/agents/crew/worker_pool.py index 61f484c..4025d4c 100644 --- a/ghostcrew/agents/crew/worker_pool.py +++ b/ghostcrew/agents/crew/worker_pool.py @@ -123,6 +123,8 @@ class WorkerPool: try: final_response = "" hit_max_iterations = False + is_infeasible = False + async for response in agent.agent_loop(worker.task): # Track tool calls if response.tool_calls: @@ -141,17 +143,28 @@ class WorkerPool: if response.content and not response.tool_calls: final_response = response.content - # Check if max iterations was hit - if response.metadata and response.metadata.get( - "max_iterations_reached" - ): - hit_max_iterations = True + # Check metadata flags + if response.metadata: + if response.metadata.get("max_iterations_reached"): + 
hit_max_iterations = True + if response.metadata.get("replan_impossible"): + is_infeasible = True worker.result = final_response or "No findings." worker.completed_at = time.time() self._results[worker.id] = worker.result - if hit_max_iterations: + if is_infeasible: + worker.status = AgentStatus.FAILED + self._emit( + worker.id, + "failed", + { + "summary": worker.result[:200], + "reason": "Task determined infeasible", + }, + ) + elif hit_max_iterations: worker.status = AgentStatus.WARNING self._emit( worker.id, diff --git a/ghostcrew/agents/prompts/ghost_agent.jinja b/ghostcrew/agents/prompts/ghost_agent.jinja index eceec6e..b5a39aa 100644 --- a/ghostcrew/agents/prompts/ghost_agent.jinja +++ b/ghostcrew/agents/prompts/ghost_agent.jinja @@ -8,7 +8,9 @@ You are operating in an authorized penetration testing engagement. The user has ## Workflow (CRITICAL) 1. **Execute Plan**: Follow the plan steps below. 2. **Execute**: Work through each step in order using the appropriate tools. -3. **Track Progress**: After completing each step, IMMEDIATELY call `finish(action="complete", step_id=N, result="what you did")`. If a step cannot be done, call `finish(action="skip", step_id=N, reason="why")`. +3. **Track Progress**: After completing each step, IMMEDIATELY call `finish(action="complete", step_id=N, result="what you did")`. + - If a step cannot be done but the plan is still valid, call `finish(action="skip", step_id=N, reason="why")`. + - If a step fails in a way that INVALIDATES the plan (e.g., assumption wrong), call `finish(action="fail", step_id=N, reason="why")`. This triggers replanning. 4. **Task Completion**: The task ends automatically when ALL steps are marked complete or skipped. The controller blocks finishing until the plan is complete. 
## Guidelines diff --git a/ghostcrew/agents/prompts/ghost_crew.jinja b/ghostcrew/agents/prompts/ghost_crew.jinja index 9645186..1b231b4 100644 --- a/ghostcrew/agents/prompts/ghost_crew.jinja +++ b/ghostcrew/agents/prompts/ghost_crew.jinja @@ -33,6 +33,7 @@ You manage agents using these tools: - **wait_for_agents**: Wait for running agents and collect their findings - **get_agent_status**: Check on a specific agent - **cancel_agent**: Stop an agent if needed +- **formulate_strategy**: Define and select a Course of Action (COA). Use this for strategic decisions. - **synthesize_findings**: Compile all results into a final concise report (call this when done) {% if worker_tools %} @@ -48,6 +49,17 @@ Workers also have access to these CLI tools via terminal: IMPORTANT: When spawning agents, be specific about which tool to use (e.g., "Use mcp_nmap_scan to..." or "Use mcp_metasploit_run_module to..."). Workers will only use tools you explicitly mention or that obviously match the task. +## Strategy & Replanning (COA) +You are responsible for STRATEGIC adaptation. +- If a worker fails due to a tactical issue (e.g., port closed), they will handle it. +- If a worker fails due to a STRATEGIC issue (e.g., "Approach invalid: Target is not domain joined"), you must REPLAN. +- Do not just retry the same task. +- **Use the `formulate_strategy` tool** to explicitly reason through options: + 1. Define the problem (e.g., "SMB exploit failed, target patched"). + 2. List 2-3 candidate Courses of Action (COAs). + 3. Select the best one based on risk and probability of success. + 4. THEN spawn new agents to execute the selected COA. + ## Guidelines - Leverage any prior intelligence from earlier reconnaissance - Be strategic - spawn 2-4 agents in parallel for efficiency diff --git a/ghostcrew/interface/tui.py b/ghostcrew/interface/tui.py index a2794d5..eedb4d2 100644 --- a/ghostcrew/interface/tui.py +++ b/ghostcrew/interface/tui.py @@ -1393,6 +1393,14 @@ Be concise. 
Use the actual data from notes.""" wtype = worker.get("worker_type", "worker") self._add_system(f"[!] {wtype.upper()} stopped: {reason}") self._update_crew_stats() + elif event_type == "failed": + # Worker determined task infeasible + self._update_crew_worker(worker_id, status="failed") + reason = data.get("reason", "Task infeasible") + worker = self._crew_workers.get(worker_id, {}) + wtype = worker.get("worker_type", "worker") + self._add_system(f"[!] {wtype.upper()} failed: {reason}") + self._update_crew_stats() elif event_type == "error": self._update_crew_worker(worker_id, status="error") worker = self._crew_workers.get(worker_id, {}) diff --git a/ghostcrew/knowledge/graph.py b/ghostcrew/knowledge/graph.py index c17f162..6b08f17 100644 --- a/ghostcrew/knowledge/graph.py +++ b/ghostcrew/knowledge/graph.py @@ -79,16 +79,23 @@ class ShadowGraph: content = note_data category = "info" metadata = {} + status = "confirmed" else: content = note_data.get("content", "") category = note_data.get("category", "info") metadata = note_data.get("metadata", {}) + status = note_data.get("status", "confirmed") - self._process_note(key, content, category, metadata) + self._process_note(key, content, category, metadata, status) self._processed_notes.add(key) def _process_note( - self, key: str, content: str, category: str, metadata: Dict[str, Any] + self, + key: str, + content: str, + category: str, + metadata: Dict[str, Any], + status: str, ) -> None: """Extract entities and relationships from a single note.""" @@ -121,11 +128,11 @@ class ShadowGraph: # 2. 
Handle specific categories if category == "credential": - self._process_credential(key, content, hosts, metadata) + self._process_credential(key, content, hosts, metadata, status) elif category == "finding": - self._process_finding(key, content, hosts, metadata) + self._process_finding(key, content, hosts, metadata, status) elif category == "vulnerability": - self._process_vulnerability(key, content, hosts, metadata) + self._process_vulnerability(key, content, hosts, metadata, status) # 3. Link note to hosts (provenance) # We don't add the note itself as a node usually, but we could. @@ -142,9 +149,18 @@ class ShadowGraph: self.graph.add_edge(source, target, type=edge_type, **kwargs) def _process_credential( - self, key: str, content: str, related_hosts: List[str], metadata: Dict[str, Any] + self, + key: str, + content: str, + related_hosts: List[str], + metadata: Dict[str, Any], + status: str, ) -> None: """Process a credential note.""" + # Skip if status is closed/filtered (invalid creds) + if status in ["closed", "filtered"]: + return + # Extract username from metadata or regex username = metadata.get("username") if not username: @@ -189,9 +205,18 @@ class ShadowGraph: self._add_edge(cred_id, host_id, "AUTH_ACCESS", protocol=protocol) def _process_finding( - self, key: str, content: str, related_hosts: List[str], metadata: Dict[str, Any] + self, + key: str, + content: str, + related_hosts: List[str], + metadata: Dict[str, Any], + status: str, ) -> None: """Process a finding note (e.g., open ports).""" + # Skip if status is closed/filtered + if status in ["closed", "filtered"]: + return + # Filter related_hosts: If we have explicit target metadata, ONLY use that. # Otherwise, use all related hosts (fallback to regex behavior). 
target_hosts = related_hosts @@ -232,9 +257,18 @@ class ShadowGraph: self._add_edge(host_id, service_id, "HAS_SERVICE", protocol=proto) def _process_vulnerability( - self, key: str, content: str, related_hosts: List[str], metadata: Dict[str, Any] + self, + key: str, + content: str, + related_hosts: List[str], + metadata: Dict[str, Any], + status: str, ) -> None: """Process a vulnerability note.""" + # Skip if status is closed/filtered (patched or not vulnerable) + if status in ["closed", "filtered"]: + return + # Filter related_hosts: If we have explicit target metadata, ONLY use that. target_hosts = related_hosts if metadata.get("target"): diff --git a/ghostcrew/runtime/runtime.py b/ghostcrew/runtime/runtime.py index 35b183a..677ef74 100644 --- a/ghostcrew/runtime/runtime.py +++ b/ghostcrew/runtime/runtime.py @@ -384,24 +384,42 @@ class LocalRuntime(Runtime): async def execute_command(self, command: str, timeout: int = 300) -> CommandResult: """Execute a command locally.""" import asyncio + import os + import re import subprocess + # Regex to strip ANSI escape codes + ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") + + # Set environment variables to discourage ANSI output + env = os.environ.copy() + env["TERM"] = "dumb" + env["NO_COLOR"] = "1" + try: process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=subprocess.DEVNULL, # Prevent interactive prompts + env=env, ) stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=timeout ) + # Decode and strip ANSI codes + stdout_str = stdout.decode(errors="replace") + stderr_str = stderr.decode(errors="replace") + + stdout_clean = ansi_escape.sub("", stdout_str) + stderr_clean = ansi_escape.sub("", stderr_str) + return CommandResult( exit_code=process.returncode or 0, - stdout=stdout.decode(errors="replace"), - stderr=stderr.decode(errors="replace"), + stdout=stdout_clean, + stderr=stderr_clean, ) except 
asyncio.TimeoutError: diff --git a/ghostcrew/tools/finish/__init__.py b/ghostcrew/tools/finish/__init__.py index 487ba58..f768c4b 100644 --- a/ghostcrew/tools/finish/__init__.py +++ b/ghostcrew/tools/finish/__init__.py @@ -20,7 +20,8 @@ class StepStatus(str, Enum): PENDING = "pending" IN_PROGRESS = "in_progress" COMPLETE = "complete" - SKIPPED = "skipped" + SKIP = "skip" + FAIL = "fail" @dataclass @@ -46,9 +47,12 @@ class TaskPlan: def is_complete(self) -> bool: return all( - s.status in (StepStatus.COMPLETE, StepStatus.SKIPPED) for s in self.steps + s.status in (StepStatus.COMPLETE, StepStatus.SKIP) for s in self.steps ) + def has_failure(self) -> bool: + return any(s.status == StepStatus.FAIL for s in self.steps) + def clear(self) -> None: self.steps.clear() self.original_request = "" @@ -65,13 +69,13 @@ class TaskPlan: @register_tool( name="finish", - description="Mark plan steps as complete or skipped. Actions: 'complete' (mark step done), 'skip' (step not applicable). Once all steps are complete/skipped, task automatically finishes.", + description="Mark plan steps as complete, skipped, or failed. Actions: 'complete' (mark step done), 'skip' (step not applicable), 'fail' (step failed, invalidating plan). Once all steps are complete/skipped, task automatically finishes.", schema=ToolSchema( properties={ "action": { "type": "string", - "enum": ["complete", "skip"], - "description": "Action: 'complete' (mark step done) or 'skip' (step not applicable)", + "enum": ["complete", "skip", "fail"], + "description": "Action: 'complete', 'skip', or 'fail'", }, "step_id": { "type": "integer", @@ -83,7 +87,7 @@ class TaskPlan: }, "reason": { "type": "string", - "description": "[skip only] Why step is being skipped", + "description": "[skip/fail only] Why step is being skipped or failed", }, }, required=["action", "step_id"], @@ -131,12 +135,21 @@ async def finish(arguments: dict, runtime: "Runtime") -> str: if not reason: return "Error: 'reason' required for skip action." 
- step.status = StepStatus.SKIPPED + step.status = StepStatus.SKIP step.result = f"Skipped: {reason}" return f"Step {step_id} skipped: {reason}" + elif action == "fail": + reason = arguments.get("reason", "") + if not reason: + return "Error: 'reason' required for fail action." + + step.status = StepStatus.FAIL + step.result = f"Failed: {reason}" + return f"Step {step_id} marked as FAILED: {reason}. Initiating replanning..." + else: - return f"Error: Unknown action '{action}'. Use 'complete' or 'skip'." + return f"Error: Unknown action '{action}'. Use 'complete', 'skip', or 'fail'." class CompletionReport: diff --git a/ghostcrew/tools/notes/__init__.py b/ghostcrew/tools/notes/__init__.py index f01a5cb..7d03e43 100644 --- a/ghostcrew/tools/notes/__init__.py +++ b/ghostcrew/tools/notes/__init__.py @@ -119,6 +119,11 @@ _load_notes_unlocked() "enum": ["high", "medium", "low"], "description": "Confidence level (default: medium)", }, + "status": { + "type": "string", + "enum": ["open", "closed", "filtered", "confirmed", "potential"], + "description": "Status of the finding (e.g., 'open' for ports, 'closed' for dead services). 
Default: confirmed/open.", + }, "source": { "type": "string", "description": "Source IP/Hostname where the finding originated (e.g., where creds were found)", @@ -185,6 +190,7 @@ async def notes(arguments: dict, runtime) -> str: category = "info" confidence = arguments.get("confidence", "medium") + status = arguments.get("status", "confirmed") # Extract structured metadata metadata = {} @@ -214,10 +220,11 @@ async def notes(arguments: dict, runtime) -> str: "content": value, "category": category, "confidence": confidence, + "status": status, "metadata": metadata, } _save_notes_unlocked() - return f"Created note '{key}' ({category})" + return f"Created note '{key}' ({category}, {status})" elif action == "read": if not key: @@ -231,7 +238,8 @@ async def notes(arguments: dict, runtime) -> str: if note.get("metadata") else "" ) - return f"[{key}] ({note['category']}, {note['confidence']}) {note['content']}{meta_str}" + status_str = f", {note.get('status', 'confirmed')}" + return f"[{key}] ({note['category']}, {note['confidence']}{status_str}) {note['content']}{meta_str}" elif action == "update": if not key: @@ -246,6 +254,7 @@ async def notes(arguments: dict, runtime) -> str: "content": value, "category": category, "confidence": confidence, + "status": status, "metadata": metadata, } _save_notes_unlocked() diff --git a/tests/test_notes.py b/tests/test_notes.py index a17a0f2..bfd1556 100644 --- a/tests/test_notes.py +++ b/tests/test_notes.py @@ -62,8 +62,8 @@ async def test_read_note(): }, runtime=None) assert "Content to read" in result - # The format is "[key] (category, confidence) content" - assert "(info, medium)" in result + # The format is "[key] (category, confidence, status) content" + assert "(info, medium, confirmed)" in result @pytest.mark.asyncio async def test_update_note():