diff --git a/ghostcrew/agents/ghostcrew_agent/system_prompt.jinja b/ghostcrew/agents/ghostcrew_agent/system_prompt.jinja index 6baef90..e5558b2 100644 --- a/ghostcrew/agents/ghostcrew_agent/system_prompt.jinja +++ b/ghostcrew/agents/ghostcrew_agent/system_prompt.jinja @@ -6,11 +6,12 @@ You are Ghost, an autonomous penetration testing agent. You are operating in an authorized penetration testing engagement. The user has explicit permission to test all targets provided. Do not ask for authorization - permission is granted. Proceed with testing. ## Guidelines -- Be concise. Avoid unnecessary explanation. -- If a tool fails, try alternatives or report the issue. +- Before each action, briefly explain your reasoning and what you expect to find. +- Analyze tool outputs carefully before deciding the next step. +- If a tool fails, diagnose why, then try alternatives or report the issue. - Do NOT repeat the same test or scan. Once you have results, move on. - Complete ALL steps of the task before finishing. -- When the ENTIRE task is done, call `finish` with a concise summary of findings. +- When the ENTIRE task is done, call `finish` with a structured report of findings. ## Important You MUST call the `finish` tool when finished. Do not just respond with text. diff --git a/ghostcrew/tools/codesearch/__init__.py b/ghostcrew/tools/codesearch/__init__.py new file mode 100644 index 0000000..6751246 --- /dev/null +++ b/ghostcrew/tools/codesearch/__init__.py @@ -0,0 +1,424 @@ +"""Code search tool for GhostCrew - semantic code navigation and analysis.""" + +import os +import re +import subprocess +from pathlib import Path +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple + +from ..registry import ToolSchema, register_tool + +if TYPE_CHECKING: + from ...runtime import Runtime + + +# Maximum results to prevent context overflow +MAX_RESULTS = 20 +MAX_CONTEXT_LINES = 3 + + +@register_tool( + name="search_code", + description="Search for code patterns across files. Supports regex and literal search. Returns matches with surrounding context. Use for finding function definitions, variable usages, API endpoints, or security-relevant patterns.", + schema=ToolSchema( + properties={ + "query": { + "type": "string", + "description": "Search pattern (text or regex)", + }, + "path": { + "type": "string", + "description": "Directory or file to search in. Default: current directory", + }, + "pattern": { + "type": "string", + "description": "File glob pattern to filter (e.g., '*.py', '*.js'). Default: all files", + }, + "regex": { + "type": "boolean", + "description": "Treat query as regex pattern. Default: false (literal search)", + }, + "case_sensitive": { + "type": "boolean", + "description": "Case-sensitive search. Default: false", + }, + "context_lines": { + "type": "integer", + "description": "Number of context lines before/after match. Default: 3", + }, + }, + required=["query"], + ), + category="code", +) +async def search_code(arguments: dict, runtime: "Runtime") -> str: + """ + Search for code patterns across files. + + Args: + arguments: Search parameters + runtime: The runtime environment + + Returns: + Formatted search results with context + """ + query = arguments["query"] + search_path = arguments.get("path", ".") + file_pattern = arguments.get("pattern") + use_regex = arguments.get("regex", False) + case_sensitive = arguments.get("case_sensitive", False) + context_lines = min(arguments.get("context_lines", MAX_CONTEXT_LINES), 10) + + try: + path = Path(search_path).resolve() + + if not path.exists(): + return f"Error: Path not found: {search_path}" + + # Compile regex pattern + flags = 0 if case_sensitive else re.IGNORECASE + if use_regex: + try: + pattern = re.compile(query, flags) + except re.error as e: + return f"Error: Invalid regex pattern: {e}" + else: + # Escape literal string for regex matching + pattern = re.compile(re.escape(query), flags) + + # Find matching files + matches = [] + files_searched = 0 + + if path.is_file(): + files_to_search = [path] + else: + files_to_search = _get_searchable_files(path, file_pattern) + + for filepath in files_to_search: + files_searched += 1 + file_matches = _search_file(filepath, pattern, context_lines) + if file_matches: + matches.extend(file_matches) + + if len(matches) >= MAX_RESULTS: + break + + if not matches: + return f"No matches found for '{query}' in {files_searched} files" + + # Format results + output = [f"Found {len(matches)} matches in {files_searched} files:\n"] + + for match in matches[:MAX_RESULTS]: + output.append(_format_match(match)) + + if len(matches) > MAX_RESULTS: + output.append(f"\n... and {len(matches) - MAX_RESULTS} more matches (showing first {MAX_RESULTS})") + + return "\n".join(output) + + except Exception as e: + return f"Error searching code: {e}" + + +@register_tool( + name="find_definition", + description="Find the definition of a function, class, or variable. Searches for common definition patterns across languages (def, function, class, const, let, var, etc.).", + schema=ToolSchema( + properties={ + "name": { + "type": "string", + "description": "Name of the function, class, or variable to find", + }, + "path": { + "type": "string", + "description": "Directory to search in. Default: current directory", + }, + "type": { + "type": "string", + "enum": ["function", "class", "variable", "any"], + "description": "Type of definition to find. Default: 'any'", + }, + }, + required=["name"], + ), + category="code", +) +async def find_definition(arguments: dict, runtime: "Runtime") -> str: + """ + Find definition of a symbol. + + Args: + arguments: Search parameters + runtime: The runtime environment + + Returns: + Definition location(s) with context + """ + name = arguments["name"] + search_path = arguments.get("path", ".") + def_type = arguments.get("type", "any") + + # Build regex patterns for different definition types + patterns = { + "function": [ + rf"^\s*def\s+{re.escape(name)}\s*\(", # Python + rf"^\s*async\s+def\s+{re.escape(name)}\s*\(", # Python async + rf"^\s*function\s+{re.escape(name)}\s*\(", # JavaScript + rf"^\s*async\s+function\s+{re.escape(name)}\s*\(", # JS async + rf"^\s*{re.escape(name)}\s*[:=]\s*(?:async\s+)?function", # JS assigned + rf"^\s*{re.escape(name)}\s*[:=]\s*\([^)]*\)\s*=>", # JS arrow + rf"^\s*(?:public|private|protected)?\s*(?:static\s+)?(?:async\s+)?{re.escape(name)}\s*\(", # JS/TS method + rf"^\s*func\s+{re.escape(name)}\s*\(", # Go + rf"^\s*(?:public|private|protected)\s+.*\s+{re.escape(name)}\s*\(", # Java/C# + ], + "class": [ + rf"^\s*class\s+{re.escape(name)}\b", # Python/JS/TS + rf"^\s*(?:abstract\s+)?class\s+{re.escape(name)}\b", # Java/C# + rf"^\s*interface\s+{re.escape(name)}\b", # TS/Java + rf"^\s*type\s+{re.escape(name)}\s*=", # TS type alias + rf"^\s*struct\s+{re.escape(name)}\b", # Go/Rust + ], + "variable": [ + rf"^\s*{re.escape(name)}\s*=", # Python/Ruby + rf"^\s*(?:const|let|var)\s+{re.escape(name)}\b", # JavaScript + rf"^\s*(?:const|let|var)\s+{re.escape(name)}\s*:", # TypeScript + rf"^\s*(?:var|val)\s+{re.escape(name)}\b", # Kotlin/Scala + rf"^\s*{re.escape(name)}\s*:=", # Go + ], + } + + # Select patterns based on type + if def_type == "any": + selected_patterns = [] + for p_list in patterns.values(): + selected_patterns.extend(p_list) + else: + selected_patterns = patterns.get(def_type, []) + + if not selected_patterns: + return f"Error: Unknown definition type '{def_type}'" + + # Combine patterns + combined_pattern = "|".join(f"({p})" for p in selected_patterns) + + try: + pattern = re.compile(combined_pattern, re.MULTILINE) + path = Path(search_path).resolve() + + if not path.exists(): + return f"Error: Path not found: {search_path}" + + matches = [] + files_searched = 0 + + for filepath in _get_searchable_files(path, None): + files_searched += 1 + file_matches = _search_file(filepath, pattern, context_lines=5) + if file_matches: + matches.extend(file_matches) + + if len(matches) >= 10: + break + + if not matches: + return f"No definition found for '{name}' ({def_type}) in {files_searched} files" + + output = [f"Found {len(matches)} definition(s) for '{name}':\n"] + for match in matches[:10]: + output.append(_format_match(match)) + + return "\n".join(output) + + except Exception as e: + return f"Error finding definition: {e}" + + +@register_tool( + name="list_functions", + description="List all function/method definitions in a file or directory. Useful for understanding code structure.", + schema=ToolSchema( + properties={ + "path": { + "type": "string", + "description": "File or directory to analyze", + }, + "pattern": { + "type": "string", + "description": "File glob pattern (e.g., '*.py'). Default: auto-detect", + }, + }, + required=["path"], + ), + category="code", +) +async def list_functions(arguments: dict, runtime: "Runtime") -> str: + """ + List function definitions in files. + + Args: + arguments: Search parameters + runtime: The runtime environment + + Returns: + List of functions with file and line numbers + """ + search_path = arguments["path"] + file_pattern = arguments.get("pattern") + + # Patterns for function definitions + func_patterns = [ + (r"^\s*def\s+(\w+)\s*\(", "python"), + (r"^\s*async\s+def\s+(\w+)\s*\(", "python"), + (r"^\s*function\s+(\w+)\s*\(", "javascript"), + (r"^\s*async\s+function\s+(\w+)\s*\(", "javascript"), + (r"^\s*(?:export\s+)?(?:async\s+)?function\s+(\w+)", "javascript"), + (r"^\s*(\w+)\s*[:=]\s*(?:async\s+)?function", "javascript"), + (r"^\s*(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>", "javascript"), + (r"^\s*(?:public|private|protected)?\s*(?:static\s+)?(?:async\s+)?(\w+)\s*\([^)]*\)\s*[:{]", "typescript"), + (r"^\s*func\s+(\w+)\s*\(", "go"), + (r"^\s*(?:public|private|protected)\s+.*\s+(\w+)\s*\([^)]*\)\s*{", "java"), + ] + + combined = "|".join(f"(?:{p})" for p, _ in func_patterns) + pattern = re.compile(combined, re.MULTILINE) + + try: + path = Path(search_path).resolve() + + if not path.exists(): + return f"Error: Path not found: {search_path}" + + results: Dict[str, List[Tuple[int, str]]] = {} + + if path.is_file(): + files_to_search = [path] + else: + files_to_search = _get_searchable_files(path, file_pattern) + + for filepath in files_to_search: + try: + content = filepath.read_text(encoding="utf-8", errors="ignore") + lines = content.splitlines() + + for i, line in enumerate(lines, 1): + match = pattern.search(line) + if match: + # Find the first non-None group (function name) + func_name = next((g for g in match.groups() if g), None) + if func_name: + rel_path = str(filepath.relative_to(path) if path.is_dir() else filepath.name) + if rel_path not in results: + results[rel_path] = [] + results[rel_path].append((i, func_name)) + except Exception: + continue + + if not results: + return f"No functions found in {search_path}" + + output = [f"Functions in {search_path}:\n"] + + for filepath, funcs in sorted(results.items()): + output.append(f"\n{filepath}:") + for line_num, func_name in funcs: + output.append(f" L{line_num}: {func_name}()") + + total = sum(len(f) for f in results.values()) + output.insert(1, f"Found {total} functions in {len(results)} files") + + return "\n".join(output) + + except Exception as e: + return f"Error listing functions: {e}" + + +def _get_searchable_files(path: Path, pattern: Optional[str]) -> List[Path]: + """Get list of searchable files, excluding binary and hidden files.""" + files = [] + + # File extensions to search + code_extensions = { + ".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".go", ".rb", ".php", + ".c", ".cpp", ".h", ".hpp", ".cs", ".rs", ".swift", ".kt", ".scala", + ".sh", ".bash", ".zsh", ".ps1", ".bat", ".cmd", + ".json", ".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf", + ".xml", ".html", ".htm", ".css", ".scss", ".sass", + ".sql", ".graphql", ".md", ".txt", ".env", ".gitignore", + } + + # Directories to skip + skip_dirs = { + ".git", ".svn", ".hg", "node_modules", "__pycache__", ".pytest_cache", + "venv", ".venv", "env", ".env", "dist", "build", "target", ".idea", + ".vscode", "coverage", ".tox", "eggs", "*.egg-info", + } + + for root, dirs, filenames in os.walk(path): + # Skip hidden and common non-code directories + dirs[:] = [d for d in dirs if d not in skip_dirs and not d.startswith(".")] + + for filename in filenames: + filepath = Path(root) / filename + + # Skip hidden files + if filename.startswith("."): + continue + + # Apply glob pattern if specified + if pattern and not filepath.match(pattern): + continue + + # Check extension + if filepath.suffix.lower() in code_extensions or not filepath.suffix: + files.append(filepath) + + return files + + +def _search_file( + filepath: Path, + pattern: re.Pattern, + context_lines: int +) -> List[dict]: + """Search a single file for pattern matches.""" + matches = [] + + try: + content = filepath.read_text(encoding="utf-8", errors="ignore") + lines = content.splitlines() + + for i, line in enumerate(lines): + if pattern.search(line): + # Get context + start = max(0, i - context_lines) + end = min(len(lines), i + context_lines + 1) + + context = [] + for j in range(start, end): + prefix = "→ " if j == i else " " + context.append((j + 1, prefix, lines[j])) + + matches.append({ + "file": str(filepath), + "line": i + 1, + "match": line.strip(), + "context": context, + }) + except Exception: + pass + + return matches + + +def _format_match(match: dict) -> str: + """Format a search match for display.""" + output = [f"\n{'─' * 50}"] + output.append(f"📄 {match['file']}:{match['line']}") + output.append("") + + for line_num, prefix, text in match["context"]: + output.append(f"{line_num:4d} {prefix}{text}") + + return "\n".join(output) diff --git a/ghostcrew/tools/completion/__init__.py b/ghostcrew/tools/completion/__init__.py index d8aac01..378f534 100644 --- a/ghostcrew/tools/completion/__init__.py +++ b/ghostcrew/tools/completion/__init__.py @@ -1,5 +1,8 @@ """Task completion tool for GhostCrew agent loop control.""" +import json +from typing import Any, Dict, List, Optional + from ..registry import ToolSchema, register_tool # Sentinel value to signal task completion @@ -8,36 +11,138 @@ TASK_COMPLETE_SIGNAL = "__TASK_COMPLETE__" @register_tool( name="finish", - description="Signal that the current task is finished. Call this when you have completed ALL steps of the user's request. Include a concise summary of what was accomplished.", + description="Signal that the current task is finished. Call this when you have completed ALL steps of the user's request. Provide a structured report of what was accomplished.", schema=ToolSchema( properties={ + "status": { + "type": "string", + "enum": ["success", "partial", "failed"], + "description": "Overall task status: 'success' (all objectives met), 'partial' (some objectives met), 'failed' (unable to complete)", + }, "summary": { "type": "string", "description": "Brief summary of what was accomplished and any key findings", }, + "findings": { + "type": "array", + "items": {"type": "string"}, + "description": "List of key findings, vulnerabilities discovered, or important observations", + }, + "artifacts": { + "type": "array", + "items": {"type": "string"}, + "description": "List of files created (PoCs, scripts, screenshots, reports)", + }, + "recommendations": { + "type": "array", + "items": {"type": "string"}, + "description": "Suggested next steps or follow-up actions", + }, }, - required=["summary"], + required=["status", "summary"], ), category="control", ) async def finish(arguments: dict, runtime) -> str: """ - Signal task completion to the agent framework. + Signal task completion to the agent framework with structured output. This tool is called by the agent when it has finished all steps of the user's task. The framework uses this as an explicit termination signal rather than relying on LLM text output. Args: - arguments: Dictionary with 'summary' key + arguments: Dictionary with structured completion data runtime: The runtime environment (unused) Returns: - The completion signal with summary + The completion signal with structured JSON data """ - summary = arguments.get("summary", "Task completed.") - # Return special signal that the framework recognizes - return f"{TASK_COMPLETE_SIGNAL}:{summary}" + # Build structured completion report + report = CompletionReport( + status=arguments.get("status", "success"), + summary=arguments.get("summary", "Task completed."), + findings=arguments.get("findings", []), + artifacts=arguments.get("artifacts", []), + recommendations=arguments.get("recommendations", []), + ) + + # Return special signal with JSON-encoded report + return f"{TASK_COMPLETE_SIGNAL}:{report.to_json()}" + + +class CompletionReport: + """Structured completion report for task results.""" + + def __init__( + self, + status: str = "success", + summary: str = "", + findings: Optional[List[str]] = None, + artifacts: Optional[List[str]] = None, + recommendations: Optional[List[str]] = None, + ): + self.status = status + self.summary = summary + self.findings = findings or [] + self.artifacts = artifacts or [] + self.recommendations = recommendations or [] + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary.""" + return { + "status": self.status, + "summary": self.summary, + "findings": self.findings, + "artifacts": self.artifacts, + "recommendations": self.recommendations, + } + + def to_json(self) -> str: + """Convert to JSON string.""" + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> "CompletionReport": + """Create from JSON string.""" + data = json.loads(json_str) + return cls(**data) + + def format_display(self) -> str: + """Format for human-readable display.""" + lines = [] + + # Status indicator + status_icons = {"success": "✓", "partial": "◐", "failed": "✗"} + icon = status_icons.get(self.status, "•") + lines.append(f"{icon} Status: {self.status.upper()}") + lines.append("") + + # Summary + lines.append(f"Summary: {self.summary}") + + # Findings + if self.findings: + lines.append("") + lines.append("Findings:") + for finding in self.findings: + lines.append(f" • {finding}") + + # Artifacts + if self.artifacts: + lines.append("") + lines.append("Artifacts:") + for artifact in self.artifacts: + lines.append(f" 📄 {artifact}") + + # Recommendations + if self.recommendations: + lines.append("") + lines.append("Recommendations:") + for rec in self.recommendations: + lines.append(f" → {rec}") + + return "\n".join(lines) def is_task_complete(result: str) -> bool: @@ -46,7 +151,26 @@ def is_task_complete(result: str) -> bool: def extract_completion_summary(result: str) -> str: - """Extract the summary from a task_complete result.""" + """Extract the summary from a task_complete result (legacy support).""" if is_task_complete(result): - return result[len(TASK_COMPLETE_SIGNAL) + 1:] # +1 for the colon + data = result[len(TASK_COMPLETE_SIGNAL) + 1:] # +1 for the colon + # Try to parse as JSON for new format + try: + report = CompletionReport.from_json(data) + return report.summary + except (json.JSONDecodeError, TypeError): + # Fall back to raw string for legacy format + return data return result + + +def extract_completion_report(result: str) -> Optional[CompletionReport]: + """Extract the full structured report from a task_complete result.""" + if is_task_complete(result): + data = result[len(TASK_COMPLETE_SIGNAL) + 1:] + try: + return CompletionReport.from_json(data) + except (json.JSONDecodeError, TypeError): + # Legacy format - wrap in report + return CompletionReport(status="success", summary=data) + return None diff --git a/ghostcrew/tools/filesystem/__init__.py b/ghostcrew/tools/filesystem/__init__.py new file mode 100644 index 0000000..2146a0e --- /dev/null +++ b/ghostcrew/tools/filesystem/__init__.py @@ -0,0 +1,352 @@ +"""Filesystem tool for GhostCrew - precise file reading and editing.""" + +import os +import re +from pathlib import Path +from typing import TYPE_CHECKING, List, Optional + +from ..registry import ToolSchema, register_tool + +if TYPE_CHECKING: + from ...runtime import Runtime + + +# Safety: Restrict operations to workspace +_WORKSPACE_ROOT: Optional[Path] = None + + +def set_workspace_root(path: Path) -> None: + """Set the workspace root for safety checks.""" + global _WORKSPACE_ROOT + _WORKSPACE_ROOT = path.resolve() + + +def _validate_path(filepath: str) -> Path: + """Validate and resolve a file path within the workspace.""" + path = Path(filepath).resolve() + + # If workspace root is set, ensure path is within it + if _WORKSPACE_ROOT: + try: + path.relative_to(_WORKSPACE_ROOT) + except ValueError: + raise ValueError(f"Path '{filepath}' is outside workspace root") + + return path + + +@register_tool( + name="read_file", + description="Read contents of a file. Can read entire file or specific line range. Use this to examine source code, configs, or any text file.", + schema=ToolSchema( + properties={ + "path": { + "type": "string", + "description": "Path to the file to read", + }, + "start_line": { + "type": "integer", + "description": "Starting line number (1-indexed). If omitted, reads from beginning.", + }, + "end_line": { + "type": "integer", + "description": "Ending line number (1-indexed, inclusive). If omitted, reads to end.", + }, + }, + required=["path"], + ), + category="filesystem", +) +async def read_file(arguments: dict, runtime: "Runtime") -> str: + """ + Read a file's contents, optionally within a line range. + + Args: + arguments: Dictionary with 'path' and optional 'start_line', 'end_line' + runtime: The runtime environment + + Returns: + File contents with line numbers + """ + filepath = arguments["path"] + start_line = arguments.get("start_line") + end_line = arguments.get("end_line") + + try: + path = _validate_path(filepath) + + if not path.exists(): + return f"Error: File not found: {filepath}" + + if not path.is_file(): + return f"Error: Not a file: {filepath}" + + # Read file content + try: + content = path.read_text(encoding="utf-8") + except UnicodeDecodeError: + content = path.read_text(encoding="latin-1") + + lines = content.splitlines() + total_lines = len(lines) + + # Handle line range + start_idx = (start_line - 1) if start_line else 0 + end_idx = end_line if end_line else total_lines + + # Clamp to valid range + start_idx = max(0, min(start_idx, total_lines)) + end_idx = max(0, min(end_idx, total_lines)) + + if start_idx >= end_idx: + return f"Error: Invalid line range {start_line}-{end_line} (file has {total_lines} lines)" + + # Format output with line numbers + selected_lines = lines[start_idx:end_idx] + output_lines = [] + for i, line in enumerate(selected_lines, start=start_idx + 1): + output_lines.append(f"{i:4d} | {line}") + + header = f"File: {filepath} (lines {start_idx + 1}-{end_idx} of {total_lines})" + return f"{header}\n{'─' * 60}\n" + "\n".join(output_lines) + + except ValueError as e: + return f"Error: {e}" + except Exception as e: + return f"Error reading file: {e}" + + +@register_tool( + name="write_file", + description="Write content to a file. Creates the file if it doesn't exist, or overwrites if it does. Use for creating PoCs, scripts, or config files.", + schema=ToolSchema( + properties={ + "path": { + "type": "string", + "description": "Path to the file to write", + }, + "content": { + "type": "string", + "description": "Content to write to the file", + }, + "append": { + "type": "boolean", + "description": "If true, append to file instead of overwriting. Default: false", + }, + }, + required=["path", "content"], + ), + category="filesystem", +) +async def write_file(arguments: dict, runtime: "Runtime") -> str: + """ + Write content to a file. + + Args: + arguments: Dictionary with 'path', 'content', and optional 'append' + runtime: The runtime environment + + Returns: + Success or error message + """ + filepath = arguments["path"] + content = arguments["content"] + append = arguments.get("append", False) + + try: + path = _validate_path(filepath) + + # Create parent directories if needed + path.parent.mkdir(parents=True, exist_ok=True) + + mode = "a" if append else "w" + with open(path, mode, encoding="utf-8") as f: + f.write(content) + + action = "Appended to" if append else "Wrote" + return f"{action} {len(content)} bytes to {filepath}" + + except ValueError as e: + return f"Error: {e}" + except Exception as e: + return f"Error writing file: {e}" + + +@register_tool( + name="replace_in_file", + description="Replace text in a file. Finds exact match of 'old_string' and replaces with 'new_string'. Include surrounding context in old_string to ensure unique match.", + schema=ToolSchema( + properties={ + "path": { + "type": "string", + "description": "Path to the file to edit", + }, + "old_string": { + "type": "string", + "description": "Exact text to find and replace (include context lines for unique match)", + }, + "new_string": { + "type": "string", + "description": "Text to replace old_string with", + }, + }, + required=["path", "old_string", "new_string"], + ), + category="filesystem", +) +async def replace_in_file(arguments: dict, runtime: "Runtime") -> str: + """ + Replace text in a file. + + Args: + arguments: Dictionary with 'path', 'old_string', 'new_string' + runtime: The runtime environment + + Returns: + Success or error message with diff preview + """ + filepath = arguments["path"] + old_string = arguments["old_string"] + new_string = arguments["new_string"] + + try: + path = _validate_path(filepath) + + if not path.exists(): + return f"Error: File not found: {filepath}" + + # Read current content + try: + content = path.read_text(encoding="utf-8") + except UnicodeDecodeError: + content = path.read_text(encoding="latin-1") + + # Count occurrences + count = content.count(old_string) + + if count == 0: + return f"Error: String not found in {filepath}. Make sure old_string matches exactly (including whitespace)." + + if count > 1: + return f"Error: Found {count} matches in {filepath}. Include more context in old_string to make it unique." + + # Perform replacement + new_content = content.replace(old_string, new_string, 1) + path.write_text(new_content, encoding="utf-8") + + # Show what changed + old_preview = old_string[:100] + "..." if len(old_string) > 100 else old_string + new_preview = new_string[:100] + "..." if len(new_string) > 100 else new_string + + return f"Replaced in {filepath}:\n- {repr(old_preview)}\n+ {repr(new_preview)}" + + except ValueError as e: + return f"Error: {e}" + except Exception as e: + return f"Error replacing in file: {e}" + + +@register_tool( + name="list_directory", + description="List contents of a directory. Shows files and subdirectories with basic info.", + schema=ToolSchema( + properties={ + "path": { + "type": "string", + "description": "Path to the directory to list. Default: current directory", + }, + "recursive": { + "type": "boolean", + "description": "If true, list recursively (max 3 levels). Default: false", + }, + "pattern": { + "type": "string", + "description": "Glob pattern to filter results (e.g., '*.py', '*.js')", + }, + }, + required=[], + ), + category="filesystem", +) +async def list_directory(arguments: dict, runtime: "Runtime") -> str: + """ + List directory contents. + + Args: + arguments: Dictionary with optional 'path', 'recursive', 'pattern' + runtime: The runtime environment + + Returns: + Directory listing + """ + dirpath = arguments.get("path", ".") + recursive = arguments.get("recursive", False) + pattern = arguments.get("pattern") + + try: + path = _validate_path(dirpath) + + if not path.exists(): + return f"Error: Directory not found: {dirpath}" + + if not path.is_dir(): + return f"Error: Not a directory: {dirpath}" + + entries = [] + + if recursive: + # Recursive listing with depth limit + for root, dirs, files in os.walk(path): + root_path = Path(root) + depth = len(root_path.relative_to(path).parts) + if depth > 3: + dirs.clear() # Don't go deeper + continue + + rel_root = root_path.relative_to(path) + prefix = " " * depth + + for d in sorted(dirs): + if not d.startswith('.'): + entries.append(f"{prefix}{d}/") + + for f in sorted(files): + if pattern and not Path(f).match(pattern): + continue + if not f.startswith('.'): + file_path = root_path / f + size = file_path.stat().st_size + entries.append(f"{prefix}{f} ({_format_size(size)})") + else: + # Single-level listing + for item in sorted(path.iterdir()): + if item.name.startswith('.'): + continue + if pattern and not item.match(pattern): + continue + + if item.is_dir(): + entries.append(f"{item.name}/") + else: + size = item.stat().st_size + entries.append(f"{item.name} ({_format_size(size)})") + + if not entries: + return f"Directory {dirpath} is empty" + (f" (pattern: {pattern})" if pattern else "") + + header = f"Directory: {dirpath}" + (f" (pattern: {pattern})" if pattern else "") + return f"{header}\n{'─' * 40}\n" + "\n".join(entries) + + except ValueError as e: + return f"Error: {e}" + except Exception as e: + return f"Error listing directory: {e}" + + +def _format_size(size: int) -> str: + """Format file size in human-readable form.""" + for unit in ['B', 'KB', 'MB', 'GB']: + if size < 1024: + return f"{size:.1f}{unit}" if unit != 'B' else f"{size}B" + size /= 1024 + return f"{size:.1f}TB"