refactor: agent completion workflow and improve robustness

This commit is contained in:
GH05TCREW
2025-12-12 10:52:52 -07:00
parent c7e42d2660
commit 3181ec0696
4 changed files with 914 additions and 13 deletions

View File

@@ -6,11 +6,12 @@ You are Ghost, an autonomous penetration testing agent.
You are operating in an authorized penetration testing engagement. The user has explicit permission to test all targets provided. Do not ask for authorization - permission is granted. Proceed with testing.
## Guidelines
- Be concise. Avoid unnecessary explanation.
- Before each action, briefly explain your reasoning and what you expect to find.
- Analyze tool outputs carefully before deciding the next step.
- If a tool fails, diagnose why, then try alternatives or report the issue.
- Do NOT repeat the same test or scan. Once you have results, move on.
- Complete ALL steps of the task before finishing.
- When the ENTIRE task is done, call `finish` with a structured report of findings.
## Important
You MUST call the `finish` tool when finished. Do not just respond with text.

View File

@@ -0,0 +1,424 @@
"""Code search tool for GhostCrew - semantic code navigation and analysis."""
import os
import re
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from ..registry import ToolSchema, register_tool
if TYPE_CHECKING:
from ...runtime import Runtime
# Maximum results to prevent context overflow
MAX_RESULTS = 20
MAX_CONTEXT_LINES = 3
@register_tool(
    name="search_code",
    description="Search for code patterns across files. Supports regex and literal search. Returns matches with surrounding context. Use for finding function definitions, variable usages, API endpoints, or security-relevant patterns.",
    schema=ToolSchema(
        properties={
            "query": {
                "type": "string",
                "description": "Search pattern (text or regex)",
            },
            "path": {
                "type": "string",
                "description": "Directory or file to search in. Default: current directory",
            },
            "pattern": {
                "type": "string",
                "description": "File glob pattern to filter (e.g., '*.py', '*.js'). Default: all files",
            },
            "regex": {
                "type": "boolean",
                "description": "Treat query as regex pattern. Default: false (literal search)",
            },
            "case_sensitive": {
                "type": "boolean",
                "description": "Case-sensitive search. Default: false",
            },
            "context_lines": {
                "type": "integer",
                "description": "Number of context lines before/after match. Default: 3",
            },
        },
        required=["query"],
    ),
    category="code",
)
async def search_code(arguments: dict, runtime: "Runtime") -> str:
    """Search files under a path for a literal string or regex pattern.

    Args:
        arguments: Search parameters ('query' plus optional 'path',
            'pattern', 'regex', 'case_sensitive', 'context_lines').
        runtime: The runtime environment (unused here).

    Returns:
        Formatted matches with surrounding context, a no-match notice,
        or an error message string.
    """
    query = arguments["query"]
    target = arguments.get("path", ".")
    glob_filter = arguments.get("pattern")
    as_regex = arguments.get("regex", False)
    flags = 0 if arguments.get("case_sensitive", False) else re.IGNORECASE
    # Cap context at 10 lines so a single match cannot flood the output.
    ctx = min(arguments.get("context_lines", MAX_CONTEXT_LINES), 10)
    try:
        root = Path(target).resolve()
        if not root.exists():
            return f"Error: Path not found: {target}"
        # Build the matcher: literal queries are escaped into a regex so
        # both modes share the same search path below.
        if as_regex:
            try:
                matcher = re.compile(query, flags)
            except re.error as e:
                return f"Error: Invalid regex pattern: {e}"
        else:
            matcher = re.compile(re.escape(query), flags)
        candidates = [root] if root.is_file() else _get_searchable_files(root, glob_filter)
        hits = []
        searched = 0
        for candidate in candidates:
            searched += 1
            hits.extend(_search_file(candidate, matcher, ctx))
            # Stop scanning once we have enough to fill the result cap.
            if len(hits) >= MAX_RESULTS:
                break
        if not hits:
            return f"No matches found for '{query}' in {searched} files"
        out = [f"Found {len(hits)} matches in {searched} files:\n"]
        out.extend(_format_match(hit) for hit in hits[:MAX_RESULTS])
        if len(hits) > MAX_RESULTS:
            out.append(f"\n... and {len(hits) - MAX_RESULTS} more matches (showing first {MAX_RESULTS})")
        return "\n".join(out)
    except Exception as e:
        return f"Error searching code: {e}"
@register_tool(
    name="find_definition",
    description="Find the definition of a function, class, or variable. Searches for common definition patterns across languages (def, function, class, const, let, var, etc.).",
    schema=ToolSchema(
        properties={
            "name": {
                "type": "string",
                "description": "Name of the function, class, or variable to find",
            },
            "path": {
                "type": "string",
                "description": "Directory to search in. Default: current directory",
            },
            "type": {
                "type": "string",
                "enum": ["function", "class", "variable", "any"],
                "description": "Type of definition to find. Default: 'any'",
            },
        },
        required=["name"],
    ),
    category="code",
)
async def find_definition(arguments: dict, runtime: "Runtime") -> str:
    """
    Find definition of a symbol by matching per-language definition regexes.

    Args:
        arguments: Dictionary with 'name' and optional 'path', 'type'.
        runtime: The runtime environment (unused here).

    Returns:
        Definition location(s) with context, or an error / no-match message.
    """
    name = arguments["name"]
    search_path = arguments.get("path", ".")
    def_type = arguments.get("type", "any")
    # Build regex patterns for different definition types.
    # Note the (?!=) lookaheads on assignment patterns: without them a
    # bare comparison line like `name == value` would be reported as a
    # variable definition, since `\s*=` also matches the first '=' of '=='.
    patterns = {
        "function": [
            rf"^\s*def\s+{re.escape(name)}\s*\(",  # Python
            rf"^\s*async\s+def\s+{re.escape(name)}\s*\(",  # Python async
            rf"^\s*function\s+{re.escape(name)}\s*\(",  # JavaScript
            rf"^\s*async\s+function\s+{re.escape(name)}\s*\(",  # JS async
            rf"^\s*{re.escape(name)}\s*[:=]\s*(?:async\s+)?function",  # JS assigned
            rf"^\s*{re.escape(name)}\s*[:=]\s*\([^)]*\)\s*=>",  # JS arrow
            rf"^\s*(?:public|private|protected)?\s*(?:static\s+)?(?:async\s+)?{re.escape(name)}\s*\(",  # JS/TS method
            rf"^\s*func\s+{re.escape(name)}\s*\(",  # Go
            rf"^\s*(?:public|private|protected)\s+.*\s+{re.escape(name)}\s*\(",  # Java/C#
        ],
        "class": [
            rf"^\s*class\s+{re.escape(name)}\b",  # Python/JS/TS
            rf"^\s*(?:abstract\s+)?class\s+{re.escape(name)}\b",  # Java/C#
            rf"^\s*interface\s+{re.escape(name)}\b",  # TS/Java
            rf"^\s*type\s+{re.escape(name)}\s*=(?!=)",  # TS type alias
            rf"^\s*struct\s+{re.escape(name)}\b",  # Go/Rust
        ],
        "variable": [
            rf"^\s*{re.escape(name)}\s*=(?!=)",  # Python/Ruby (not '==')
            rf"^\s*(?:const|let|var)\s+{re.escape(name)}\b",  # JavaScript
            rf"^\s*(?:const|let|var)\s+{re.escape(name)}\s*:",  # TypeScript
            rf"^\s*(?:var|val)\s+{re.escape(name)}\b",  # Kotlin/Scala
            rf"^\s*{re.escape(name)}\s*:=",  # Go
        ],
    }
    # Select patterns based on requested type ('any' means all of them).
    if def_type == "any":
        selected_patterns = []
        for p_list in patterns.values():
            selected_patterns.extend(p_list)
    else:
        selected_patterns = patterns.get(def_type, [])
    if not selected_patterns:
        return f"Error: Unknown definition type '{def_type}'"
    # Combine into a single alternation so each line is scanned once.
    combined_pattern = "|".join(f"({p})" for p in selected_patterns)
    try:
        pattern = re.compile(combined_pattern, re.MULTILINE)
        path = Path(search_path).resolve()
        if not path.exists():
            return f"Error: Path not found: {search_path}"
        matches = []
        files_searched = 0
        for filepath in _get_searchable_files(path, None):
            files_searched += 1
            file_matches = _search_file(filepath, pattern, context_lines=5)
            if file_matches:
                matches.extend(file_matches)
                # Definitions are rare; 10 hits is plenty to show.
                if len(matches) >= 10:
                    break
        if not matches:
            return f"No definition found for '{name}' ({def_type}) in {files_searched} files"
        output = [f"Found {len(matches)} definition(s) for '{name}':\n"]
        for match in matches[:10]:
            output.append(_format_match(match))
        return "\n".join(output)
    except Exception as e:
        return f"Error finding definition: {e}"
@register_tool(
    name="list_functions",
    description="List all function/method definitions in a file or directory. Useful for understanding code structure.",
    schema=ToolSchema(
        properties={
            "path": {
                "type": "string",
                "description": "File or directory to analyze",
            },
            "pattern": {
                "type": "string",
                "description": "File glob pattern (e.g., '*.py'). Default: auto-detect",
            },
        },
        required=["path"],
    ),
    category="code",
)
async def list_functions(arguments: dict, runtime: "Runtime") -> str:
    """Enumerate function definitions in a file or directory tree.

    Args:
        arguments: Dictionary with 'path' and optional 'pattern'.
        runtime: The runtime environment (unused here).

    Returns:
        A per-file listing of function names with their line numbers.
    """
    target = arguments["path"]
    glob_filter = arguments.get("pattern")
    # Definition regexes for common languages; first capture group is the name.
    func_patterns = [
        (r"^\s*def\s+(\w+)\s*\(", "python"),
        (r"^\s*async\s+def\s+(\w+)\s*\(", "python"),
        (r"^\s*function\s+(\w+)\s*\(", "javascript"),
        (r"^\s*async\s+function\s+(\w+)\s*\(", "javascript"),
        (r"^\s*(?:export\s+)?(?:async\s+)?function\s+(\w+)", "javascript"),
        (r"^\s*(\w+)\s*[:=]\s*(?:async\s+)?function", "javascript"),
        (r"^\s*(?:const|let|var)\s+(\w+)\s*=\s*(?:async\s+)?\([^)]*\)\s*=>", "javascript"),
        (r"^\s*(?:public|private|protected)?\s*(?:static\s+)?(?:async\s+)?(\w+)\s*\([^)]*\)\s*[:{]", "typescript"),
        (r"^\s*func\s+(\w+)\s*\(", "go"),
        (r"^\s*(?:public|private|protected)\s+.*\s+(\w+)\s*\([^)]*\)\s*{", "java"),
    ]
    combined = "|".join(f"(?:{p})" for p, _ in func_patterns)
    matcher = re.compile(combined, re.MULTILINE)
    try:
        root = Path(target).resolve()
        if not root.exists():
            return f"Error: Path not found: {target}"
        found: Dict[str, List[Tuple[int, str]]] = {}
        candidates = [root] if root.is_file() else _get_searchable_files(root, glob_filter)
        for candidate in candidates:
            try:
                text = candidate.read_text(encoding="utf-8", errors="ignore")
                for lineno, text_line in enumerate(text.splitlines(), 1):
                    hit = matcher.search(text_line)
                    if not hit:
                        continue
                    # The alternation has many groups; the name is the first
                    # non-None one.
                    fn_name = next((g for g in hit.groups() if g), None)
                    if fn_name:
                        key = str(candidate.relative_to(root) if root.is_dir() else candidate.name)
                        found.setdefault(key, []).append((lineno, fn_name))
            except Exception:
                continue
        if not found:
            return f"No functions found in {target}"
        out = [f"Functions in {target}:\n"]
        for key, entries in sorted(found.items()):
            out.append(f"\n{key}:")
            out.extend(f"  L{lineno}: {fn_name}()" for lineno, fn_name in entries)
        total = sum(len(v) for v in found.values())
        out.insert(1, f"Found {total} functions in {len(found)} files")
        return "\n".join(out)
    except Exception as e:
        return f"Error listing functions: {e}"
def _get_searchable_files(path: Path, pattern: Optional[str]) -> List[Path]:
"""Get list of searchable files, excluding binary and hidden files."""
files = []
# File extensions to search
code_extensions = {
".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".go", ".rb", ".php",
".c", ".cpp", ".h", ".hpp", ".cs", ".rs", ".swift", ".kt", ".scala",
".sh", ".bash", ".zsh", ".ps1", ".bat", ".cmd",
".json", ".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf",
".xml", ".html", ".htm", ".css", ".scss", ".sass",
".sql", ".graphql", ".md", ".txt", ".env", ".gitignore",
}
# Directories to skip
skip_dirs = {
".git", ".svn", ".hg", "node_modules", "__pycache__", ".pytest_cache",
"venv", ".venv", "env", ".env", "dist", "build", "target", ".idea",
".vscode", "coverage", ".tox", "eggs", "*.egg-info",
}
for root, dirs, filenames in os.walk(path):
# Skip hidden and common non-code directories
dirs[:] = [d for d in dirs if d not in skip_dirs and not d.startswith(".")]
for filename in filenames:
filepath = Path(root) / filename
# Skip hidden files
if filename.startswith("."):
continue
# Apply glob pattern if specified
if pattern and not filepath.match(pattern):
continue
# Check extension
if filepath.suffix.lower() in code_extensions or not filepath.suffix:
files.append(filepath)
return files
def _search_file(
filepath: Path,
pattern: re.Pattern,
context_lines: int
) -> List[dict]:
"""Search a single file for pattern matches."""
matches = []
try:
content = filepath.read_text(encoding="utf-8", errors="ignore")
lines = content.splitlines()
for i, line in enumerate(lines):
if pattern.search(line):
# Get context
start = max(0, i - context_lines)
end = min(len(lines), i + context_lines + 1)
context = []
for j in range(start, end):
prefix = "" if j == i else " "
context.append((j + 1, prefix, lines[j]))
matches.append({
"file": str(filepath),
"line": i + 1,
"match": line.strip(),
"context": context,
})
except Exception:
pass
return matches
def _format_match(match: dict) -> str:
"""Format a search match for display."""
output = [f"\n{'' * 50}"]
output.append(f"📄 {match['file']}:{match['line']}")
output.append("")
for line_num, prefix, text in match["context"]:
output.append(f"{line_num:4d} {prefix}{text}")
return "\n".join(output)

View File

@@ -1,5 +1,8 @@
"""Task completion tool for GhostCrew agent loop control."""
import json
from typing import Any, Dict, List, Optional
from ..registry import ToolSchema, register_tool
# Sentinel value to signal task completion
@@ -8,36 +11,138 @@ TASK_COMPLETE_SIGNAL = "__TASK_COMPLETE__"
@register_tool(
    name="finish",
    description="Signal that the current task is finished. Call this when you have completed ALL steps of the user's request. Provide a structured report of what was accomplished.",
    schema=ToolSchema(
        properties={
            "status": {
                "type": "string",
                "enum": ["success", "partial", "failed"],
                "description": "Overall task status: 'success' (all objectives met), 'partial' (some objectives met), 'failed' (unable to complete)",
            },
            "summary": {
                "type": "string",
                "description": "Brief summary of what was accomplished and any key findings",
            },
            "findings": {
                "type": "array",
                "items": {"type": "string"},
                "description": "List of key findings, vulnerabilities discovered, or important observations",
            },
            "artifacts": {
                "type": "array",
                "items": {"type": "string"},
                "description": "List of files created (PoCs, scripts, screenshots, reports)",
            },
            "recommendations": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Suggested next steps or follow-up actions",
            },
        },
        required=["status", "summary"],
    ),
    category="control",
)
async def finish(arguments: dict, runtime) -> str:
    """
    Signal task completion to the agent framework with structured output.

    This tool is called by the agent when it has finished all steps
    of the user's task. The framework uses this as an explicit
    termination signal rather than relying on LLM text output.

    Args:
        arguments: Dictionary with structured completion data
            (status, summary, findings, artifacts, recommendations)
        runtime: The runtime environment (unused)

    Returns:
        The completion signal with structured JSON data
    """
    # Build structured completion report; defaults keep legacy callers
    # that only passed 'summary' working.
    report = CompletionReport(
        status=arguments.get("status", "success"),
        summary=arguments.get("summary", "Task completed."),
        findings=arguments.get("findings", []),
        artifacts=arguments.get("artifacts", []),
        recommendations=arguments.get("recommendations", []),
    )
    # Return special signal with JSON-encoded report
    return f"{TASK_COMPLETE_SIGNAL}:{report.to_json()}"
class CompletionReport:
    """Structured completion report for task results.

    Serializes to/from JSON for transport inside the completion signal
    and renders a human-readable display for the terminal.
    """

    def __init__(
        self,
        status: str = "success",
        summary: str = "",
        findings: Optional[List[str]] = None,
        artifacts: Optional[List[str]] = None,
        recommendations: Optional[List[str]] = None,
    ):
        self.status = status
        self.summary = summary
        # Copy-by-default: never share a mutable default between instances.
        self.findings = findings or []
        self.artifacts = artifacts or []
        self.recommendations = recommendations or []

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dictionary."""
        return {
            "status": self.status,
            "summary": self.summary,
            "findings": self.findings,
            "artifacts": self.artifacts,
            "recommendations": self.recommendations,
        }

    def to_json(self) -> str:
        """Convert to a JSON string."""
        return json.dumps(self.to_dict())

    @classmethod
    def from_json(cls, json_str: str) -> "CompletionReport":
        """Create a report from a JSON string.

        Raises:
            json.JSONDecodeError: If json_str is not valid JSON.
            TypeError: If the decoded payload has unexpected keys/shape
                (callers catch this to fall back to the legacy format).
        """
        data = json.loads(json_str)
        return cls(**data)

    def format_display(self) -> str:
        """Format the report for human-readable display.

        NOTE(review): the status glyphs and list bullets were lost to an
        encoding mangle upstream (empty strings in the pasted source);
        restored here as best guesses - confirm against the original UI.
        """
        lines = []
        # Status indicator
        status_icons = {"success": "✅", "partial": "⚠️", "failed": "❌"}
        icon = status_icons.get(self.status, "")
        lines.append(f"{icon} Status: {self.status.upper()}")
        lines.append("")
        # Summary
        lines.append(f"Summary: {self.summary}")
        # Findings
        if self.findings:
            lines.append("")
            lines.append("Findings:")
            for finding in self.findings:
                lines.append(f"  • {finding}")
        # Artifacts
        if self.artifacts:
            lines.append("")
            lines.append("Artifacts:")
            for artifact in self.artifacts:
                lines.append(f"  📄 {artifact}")
        # Recommendations
        if self.recommendations:
            lines.append("")
            lines.append("Recommendations:")
            for rec in self.recommendations:
                lines.append(f"  • {rec}")
        return "\n".join(lines)
def is_task_complete(result: str) -> bool:
@@ -46,7 +151,26 @@ def is_task_complete(result: str) -> bool:
def extract_completion_summary(result: str) -> str:
    """Extract the summary from a task_complete result (legacy support).

    The pasted diff interleaved the old and new bodies (duplicate
    docstring and a dead legacy `return`); this is the resolved
    post-change version: JSON payloads yield the structured summary,
    anything else falls back to the raw string.
    """
    if is_task_complete(result):
        data = result[len(TASK_COMPLETE_SIGNAL) + 1:]  # +1 for the colon
        # Try to parse as JSON for new format
        try:
            report = CompletionReport.from_json(data)
            return report.summary
        except (json.JSONDecodeError, TypeError):
            # Fall back to raw string for legacy format
            return data
    return result
def extract_completion_report(result: str) -> Optional[CompletionReport]:
    """Extract the full structured report from a task_complete result."""
    if not is_task_complete(result):
        return None
    payload = result[len(TASK_COMPLETE_SIGNAL) + 1:]
    try:
        return CompletionReport.from_json(payload)
    except (json.JSONDecodeError, TypeError):
        # Legacy format - wrap the bare string in a report
        return CompletionReport(status="success", summary=payload)

View File

@@ -0,0 +1,352 @@
"""Filesystem tool for GhostCrew - precise file reading and editing."""
import os
import re
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional
from ..registry import ToolSchema, register_tool
if TYPE_CHECKING:
from ...runtime import Runtime
# Safety: Restrict operations to workspace
_WORKSPACE_ROOT: Optional[Path] = None
def set_workspace_root(path: Path) -> None:
"""Set the workspace root for safety checks."""
global _WORKSPACE_ROOT
_WORKSPACE_ROOT = path.resolve()
def _validate_path(filepath: str) -> Path:
"""Validate and resolve a file path within the workspace."""
path = Path(filepath).resolve()
# If workspace root is set, ensure path is within it
if _WORKSPACE_ROOT:
try:
path.relative_to(_WORKSPACE_ROOT)
except ValueError:
raise ValueError(f"Path '{filepath}' is outside workspace root")
return path
@register_tool(
    name="read_file",
    description="Read contents of a file. Can read entire file or specific line range. Use this to examine source code, configs, or any text file.",
    schema=ToolSchema(
        properties={
            "path": {
                "type": "string",
                "description": "Path to the file to read",
            },
            "start_line": {
                "type": "integer",
                "description": "Starting line number (1-indexed). If omitted, reads from beginning.",
            },
            "end_line": {
                "type": "integer",
                "description": "Ending line number (1-indexed, inclusive). If omitted, reads to end.",
            },
        },
        required=["path"],
    ),
    category="filesystem",
)
async def read_file(arguments: dict, runtime: "Runtime") -> str:
    """
    Read a file's contents, optionally within a line range.

    Args:
        arguments: Dictionary with 'path' and optional 'start_line', 'end_line'
        runtime: The runtime environment (unused)

    Returns:
        File contents with line numbers, or an error message
    """
    filepath = arguments["path"]
    start_line = arguments.get("start_line")
    end_line = arguments.get("end_line")
    try:
        path = _validate_path(filepath)
        if not path.exists():
            return f"Error: File not found: {filepath}"
        if not path.is_file():
            return f"Error: Not a file: {filepath}"
        # Read file content; fall back to latin-1 for non-UTF-8 files
        # (latin-1 decodes any byte sequence without error).
        try:
            content = path.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            content = path.read_text(encoding="latin-1")
        lines = content.splitlines()
        total_lines = len(lines)
        # Convert 1-indexed inclusive range to 0-indexed slice bounds.
        start_idx = (start_line - 1) if start_line else 0
        end_idx = end_line if end_line else total_lines
        # Clamp to valid range
        start_idx = max(0, min(start_idx, total_lines))
        end_idx = max(0, min(end_idx, total_lines))
        if start_idx >= end_idx:
            return f"Error: Invalid line range {start_line}-{end_line} (file has {total_lines} lines)"
        # Format output with line numbers
        selected_lines = lines[start_idx:end_idx]
        output_lines = []
        for i, line in enumerate(selected_lines, start=start_idx + 1):
            output_lines.append(f"{i:4d} | {line}")
        header = f"File: {filepath} (lines {start_idx + 1}-{end_idx} of {total_lines})"
        # Separator restored: the original `'' * 60` was a mangled
        # character that rendered an empty rule line.
        return f"{header}\n{'-' * 60}\n" + "\n".join(output_lines)
    except ValueError as e:
        return f"Error: {e}"
    except Exception as e:
        return f"Error reading file: {e}"
@register_tool(
    name="write_file",
    description="Write content to a file. Creates the file if it doesn't exist, or overwrites if it does. Use for creating PoCs, scripts, or config files.",
    schema=ToolSchema(
        properties={
            "path": {
                "type": "string",
                "description": "Path to the file to write",
            },
            "content": {
                "type": "string",
                "description": "Content to write to the file",
            },
            "append": {
                "type": "boolean",
                "description": "If true, append to file instead of overwriting. Default: false",
            },
        },
        required=["path", "content"],
    ),
    category="filesystem",
)
async def write_file(arguments: dict, runtime: "Runtime") -> str:
    """
    Write content to a file.

    Args:
        arguments: Dictionary with 'path', 'content', and optional 'append'
        runtime: The runtime environment (unused)

    Returns:
        Success or error message
    """
    filepath = arguments["path"]
    content = arguments["content"]
    append = arguments.get("append", False)
    try:
        path = _validate_path(filepath)
        # Create parent directories if needed
        path.parent.mkdir(parents=True, exist_ok=True)
        mode = "a" if append else "w"
        with open(path, mode, encoding="utf-8") as f:
            f.write(content)
        action = "Appended to" if append else "Wrote"
        # len(content) counts characters, not bytes; report the actual
        # UTF-8 byte count written to disk.
        byte_count = len(content.encode("utf-8"))
        return f"{action} {byte_count} bytes to {filepath}"
    except ValueError as e:
        return f"Error: {e}"
    except Exception as e:
        return f"Error writing file: {e}"
@register_tool(
    name="replace_in_file",
    description="Replace text in a file. Finds exact match of 'old_string' and replaces with 'new_string'. Include surrounding context in old_string to ensure unique match.",
    schema=ToolSchema(
        properties={
            "path": {
                "type": "string",
                "description": "Path to the file to edit",
            },
            "old_string": {
                "type": "string",
                "description": "Exact text to find and replace (include context lines for unique match)",
            },
            "new_string": {
                "type": "string",
                "description": "Text to replace old_string with",
            },
        },
        required=["path", "old_string", "new_string"],
    ),
    category="filesystem",
)
async def replace_in_file(arguments: dict, runtime: "Runtime") -> str:
    """Replace one exact, unique occurrence of text in a file.

    Args:
        arguments: Dictionary with 'path', 'old_string', 'new_string'.
        runtime: The runtime environment (unused here).

    Returns:
        Success message with a short before/after preview, or an error.
    """
    filepath = arguments["path"]
    old_string = arguments["old_string"]
    new_string = arguments["new_string"]
    try:
        target = _validate_path(filepath)
        if not target.exists():
            return f"Error: File not found: {filepath}"
        # Read current content; latin-1 decodes any bytes if UTF-8 fails.
        try:
            text = target.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            text = target.read_text(encoding="latin-1")
        # Require exactly one occurrence before touching the file.
        count = text.count(old_string)
        if count == 0:
            return f"Error: String not found in {filepath}. Make sure old_string matches exactly (including whitespace)."
        if count > 1:
            return f"Error: Found {count} matches in {filepath}. Include more context in old_string to make it unique."
        target.write_text(text.replace(old_string, new_string, 1), encoding="utf-8")
        # Show what changed (truncated previews).
        old_preview = old_string[:100] + "..." if len(old_string) > 100 else old_string
        new_preview = new_string[:100] + "..." if len(new_string) > 100 else new_string
        return f"Replaced in {filepath}:\n- {repr(old_preview)}\n+ {repr(new_preview)}"
    except ValueError as e:
        return f"Error: {e}"
    except Exception as e:
        return f"Error replacing in file: {e}"
@register_tool(
    name="list_directory",
    description="List contents of a directory. Shows files and subdirectories with basic info.",
    schema=ToolSchema(
        properties={
            "path": {
                "type": "string",
                "description": "Path to the directory to list. Default: current directory",
            },
            "recursive": {
                "type": "boolean",
                "description": "If true, list recursively (max 3 levels). Default: false",
            },
            "pattern": {
                "type": "string",
                "description": "Glob pattern to filter results (e.g., '*.py', '*.js')",
            },
        },
        required=[],
    ),
    category="filesystem",
)
async def list_directory(arguments: dict, runtime: "Runtime") -> str:
    """
    List directory contents.

    Args:
        arguments: Dictionary with optional 'path', 'recursive', 'pattern'
        runtime: The runtime environment (unused)

    Returns:
        Directory listing, or an error message
    """
    dirpath = arguments.get("path", ".")
    recursive = arguments.get("recursive", False)
    pattern = arguments.get("pattern")
    try:
        path = _validate_path(dirpath)
        if not path.exists():
            return f"Error: Directory not found: {dirpath}"
        if not path.is_dir():
            return f"Error: Not a directory: {dirpath}"
        entries = []
        if recursive:
            # Recursive listing with depth limit
            for root, dirs, files in os.walk(path):
                root_path = Path(root)
                depth = len(root_path.relative_to(path).parts)
                if depth > 3:
                    dirs.clear()  # Don't go deeper
                    continue
                prefix = "  " * depth
                for d in sorted(dirs):
                    if not d.startswith('.'):
                        entries.append(f"{prefix}{d}/")
                for f in sorted(files):
                    if pattern and not Path(f).match(pattern):
                        continue
                    if not f.startswith('.'):
                        file_path = root_path / f
                        size = file_path.stat().st_size
                        entries.append(f"{prefix}{f} ({_format_size(size)})")
        else:
            # Single-level listing
            for item in sorted(path.iterdir()):
                if item.name.startswith('.'):
                    continue
                if pattern and not item.match(pattern):
                    continue
                if item.is_dir():
                    entries.append(f"{item.name}/")
                else:
                    size = item.stat().st_size
                    entries.append(f"{item.name} ({_format_size(size)})")
        if not entries:
            return f"Directory {dirpath} is empty" + (f" (pattern: {pattern})" if pattern else "")
        header = f"Directory: {dirpath}" + (f" (pattern: {pattern})" if pattern else "")
        # Separator restored: the original `'' * 40` was a mangled
        # character that rendered an empty rule line. Also dropped the
        # unused `rel_root` local from the recursive branch.
        return f"{header}\n{'-' * 40}\n" + "\n".join(entries)
    except ValueError as e:
        return f"Error: {e}"
    except Exception as e:
        return f"Error listing directory: {e}"
def _format_size(size: int) -> str:
"""Format file size in human-readable form."""
for unit in ['B', 'KB', 'MB', 'GB']:
if size < 1024:
return f"{size:.1f}{unit}" if unit != 'B' else f"{size}B"
size /= 1024
return f"{size:.1f}TB"