diff --git a/pentestagent/agents/prompts/pa_agent.jinja b/pentestagent/agents/prompts/pa_agent.jinja index 5af3418..1cf62ff 100644 --- a/pentestagent/agents/prompts/pa_agent.jinja +++ b/pentestagent/agents/prompts/pa_agent.jinja @@ -18,12 +18,30 @@ You are operating in an authorized penetration testing engagement. The user has - Use tools ONLY when you need to interact with the environment, gather information, execute something, or produce an artifact. - **Record Findings**: When you find something important, IMMEDIATELY save it using `notes(action="create", ...)`. This is your long-term memory and how you share data with the crew. - **Structured Notes (CRITICAL)**: Your notes build a knowledge graph. You MUST use structured fields: - - **Credentials**: `username`, `password` (or hash), `target` (where they work), `source` (where found). - *Example*: `notes(action="create", key="creds_db", value="Found in config", category="credential", username="admin", password="123", target="10.10.10.20", source="10.10.10.5")` - - **Services**: `port` (e.g. "80/tcp"), `target`, `url` (if web). - - **Vulnerabilities**: `cve`, `target`. + - **TARGET FIELD IS MANDATORY**: ANY note about a specific host MUST include `target="IP_ADDRESS"`. Without this, data cannot be linked in the knowledge graph. + + - **Credentials**: Always include `username`, `password`, `target` (where they work), optionally `source` (where found). + *Example*: `notes(action="create", key="creds_db", value="Found database credentials in config", category="credential", username="admin", password="P@ssw0rd", target="10.10.10.20", source="10.10.10.5:/var/www/config.php")` + *Web app*: `notes(action="create", key="creds_default", value="Default credentials work on DVWA", category="credential", username="admin", password="password", target="10.10.10.1", url="http://10.10.10.1/dvwa")` + + - **Host/Service Profiles**: When you discover services, technologies, or endpoints on a host, create ONE comprehensive note with nested arrays: + *Example*: `notes(action="create", key="profile_webserver", value="Apache 2.2.8 web server with PHP 5.2.4, MySQL backend, and vulnerable web apps", category="finding", target="10.10.10.1", services=[{"port": 80, "product": "Apache httpd", "version": "2.2.8"}, {"port": 3306, "product": "MySQL", "version": "5.0.51"}], technologies=[{"name": "PHP", "version": "5.2.4"}, {"name": "Ubuntu", "version": "8.04"}], endpoints=[{"path": "/phpMyAdmin", "methods": ["GET", "POST"]}, {"path": "/admin", "methods": ["GET"]}, {"path": "/dvwa/", "methods": ["GET", "POST"]}])` + - `services`: Array of discovered services with port, product, version + - `technologies`: Array of tech stack components (OS, frameworks, libraries) + - `endpoints`: Array of discovered web paths with HTTP methods + - ALL THREE arrays are optional but powerful when combined + + - **Simple Service Discovery**: For quick single-service notes: + *Example*: `notes(action="create", key="http_open", value="HTTP service on port 80", category="finding", target="10.10.10.1", port="80", url="http://10.10.10.1")` + + - **Vulnerabilities**: Use `cve`, `target`, and `affected_versions` for version-specific vulns: + *Example*: `notes(action="create", key="vuln_php_cgi", value="PHP-CGI vulnerable to CVE-2012-1823 RCE", category="vulnerability", cve="CVE-2012-1823", target="10.10.10.1", affected_versions={"PHP": "5.0.0 - 5.3.11", "PHP-CGI": "5.0.0 - 5.4.1"})` + + - **Weakness Candidates**: When gathering potential vulns to filter, include `target` and `weaknesses` array: + *Example*: `notes(action="create", key="weak_candidates_apache", value="Apache 2.2.8 has multiple known CVEs", category="finding", target="10.10.10.1", weaknesses=[{"id": "CVE-2011-3192", "description": "Range header DoS"}, {"id": "CVE-2011-3368", "description": "Reverse proxy bypass"}])` + - **Evidence**: If you have a screenshot or file, use `evidence_path`. - - **General**: Always specify `target` to link the note to a host. + *Example*: `notes(action="create", key="admin_panel_screenshot", value="Found admin panel at /manager/html", category="finding", target="10.10.10.1", evidence_path="loot/artifacts/screenshots/admin_panel.png")` - Do NOT describe actions you *could* take — if an action is needed, actually use the tool. - After EVERY action that completes a plan step, call `finish` to mark it done. - The pattern is: use tool → finish(action="complete", step_id=N) → use next tool → finish(action="complete", step_id=N+1) → repeat diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index 6f938c8..a035e78 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -3,6 +3,7 @@ PentestAgent TUI - Terminal User Interface """ import asyncio +import re import textwrap from datetime import datetime from pathlib import Path @@ -28,6 +29,8 @@ from textual.widgets.tree import TreeNode from ..config.constants import DEFAULT_MODEL +# ANSI escape sequence pattern for stripping control codes from input +_ANSI_ESCAPE = re.compile(r'\x1b\[[0-9;]*[mGKHflSTABCDEFsu]|\x1b\].*?\x07|\x1b\[<[0-9;]*[Mm]') # ASCII-safe scrollbar renderer to avoid Unicode glyph issues class ASCIIScrollBarRender(ScrollBarRender): @@ -1040,7 +1043,8 @@ Be concise. Use the actual data from notes.""" if self._is_initializing or self._is_running: return - message = event.value.strip() + # Strip ANSI escape sequences and control codes + message = _ANSI_ESCAPE.sub('', event.value).strip() if not message: return diff --git a/pentestagent/knowledge/graph.py b/pentestagent/knowledge/graph.py index 1aee1d0..6cf297d 100644 --- a/pentestagent/knowledge/graph.py +++ b/pentestagent/knowledge/graph.py @@ -227,16 +227,79 @@ class ShadowGraph: if target_id in related_hosts: target_hosts = [target_id] - # Extract ports from metadata or regex + # Handle nested services metadata + if metadata.get("services"): + for svc in metadata["services"]: + port = svc.get("port") + product = svc.get("product", "") + version = svc.get("version", "") + proto = svc.get("protocol", "tcp") + + if port: + for host_id in target_hosts: + service_id = f"service:{host_id}:{port}" + label = f"{port}/{proto}" + if product: + label += f" {product}" + if version: + label += f" {version}" + + self._add_node(service_id, "service", label, product=product, version=version) + self._add_edge(host_id, service_id, "HAS_SERVICE", protocol=proto) + + # Handle nested endpoints metadata + if metadata.get("endpoints"): + for ep in metadata["endpoints"]: + path = ep.get("path") + methods = ep.get("methods", []) + if path: + for host_id in target_hosts: + endpoint_id = f"endpoint:{host_id}:{path}" + label = path + if methods: + label += f" ({','.join(methods)})" + + self._add_node(endpoint_id, "endpoint", label, methods=methods) + self._add_edge(host_id, endpoint_id, "HAS_ENDPOINT") + + # Handle nested technologies metadata + if metadata.get("technologies"): + for tech in metadata["technologies"]: + name = tech.get("name") + version = tech.get("version", "") + if name: + for host_id in target_hosts: + tech_id = f"tech:{host_id}:{name}" + label = name + if version and version != "unknown": + label += f" {version}" + + self._add_node(tech_id, "technology", label, name=name, version=version) + self._add_edge(host_id, tech_id, "USES_TECH") + + # If we processed nested metadata, we're done + if metadata.get("services") or metadata.get("endpoints") or metadata.get("technologies"): + return + + # Fallback to old port extraction logic ports = [] if metadata.get("port"): - # Handle single port in metadata + # Handle single port or comma-separated in metadata p = str(metadata["port"]) - # Assume tcp if not specified? - proto = "tcp" - if "/" in p: - p, proto = p.split("/") - ports.append((p, proto)) + if "," in p: + # Handle comma-separated list + for port_str in p.split(","): + port_str = port_str.strip() + proto = "tcp" + if "/" in port_str: + port_str, proto = port_str.split("/") + ports.append((port_str, proto)) + else: + # Single port + proto = "tcp" + if "/" in p: + p, proto = p.split("/") + ports.append((p, proto)) # Always check regex too, in case metadata missed some regex_ports = self._port_pattern.findall(content) @@ -314,7 +377,9 @@ class ShadowGraph: f"We have credentials that provide access to: {', '.join(target_labels)}" ) - # Insight 2: High Value Targets (Hosts with many open ports/vulns) + # Insight 2: High Value Targets (Hosts with many open ports/vulns/endpoints) + high_value_endpoints = ["admin", "phpmyadmin", "phpMyAdmin", "manager", "console", "webdav", "dav"] + for node, data in self.graph.nodes(data=True): if data.get("type") == "host": # Count services @@ -328,11 +393,38 @@ class ShadowGraph: for u, v in self.graph.out_edges(node) if self.graph.nodes[v].get("type") == "vulnerability" ] + endpoints = [ + v + for u, v in self.graph.out_edges(node) + if self.graph.nodes[v].get("type") == "endpoint" + ] + technologies = [ + v + for u, v in self.graph.out_edges(node) + if self.graph.nodes[v].get("type") == "technology" + ] - if len(services) > 0 or len(vulns) > 0: + if len(services) > 0 or len(vulns) > 0 or len(endpoints) > 0 or len(technologies) > 0: + parts = [] + if len(services) > 0: + parts.append(f"{len(services)} services") + if len(endpoints) > 0: + parts.append(f"{len(endpoints)} endpoints") + if len(technologies) > 0: + parts.append(f"{len(technologies)} technologies") + if len(vulns) > 0: + parts.append(f"{len(vulns)} vulnerabilities") insights.append( - f"Host {data['label']} has {len(services)} services and {len(vulns)} known vulnerabilities." + f"Host {data['label']} has {', '.join(parts)}." ) + + # Flag high-value endpoints + for ep_id in endpoints: + ep_label = self.graph.nodes[ep_id].get("label", "") + if any(hv in ep_label.lower() for hv in high_value_endpoints): + insights.append( + f"⚠️ High-value endpoint detected: {ep_label} on {data['label']}" + ) # Insight 3: Potential Pivots (Host A -> Cred -> Host B) # Use NetworkX to find paths from Credentials to Hosts that aren't directly connected @@ -411,6 +503,15 @@ class ShadowGraph: "hosts": len( [n for n, d in self.graph.nodes(data=True) if d["type"] == "host"] ), + "services": len( + [n for n, d in self.graph.nodes(data=True) if d["type"] == "service"] + ), + "endpoints": len( + [n for n, d in self.graph.nodes(data=True) if d["type"] == "endpoint"] + ), + "technologies": len( + [n for n, d in self.graph.nodes(data=True) if d["type"] == "technology"] + ), "creds": len( [n for n, d in self.graph.nodes(data=True) if d["type"] == "credential"] ), @@ -422,4 +523,4 @@ class ShadowGraph: ] ), } - return f"Graph State: {stats['hosts']} Hosts, {stats['creds']} Credentials, {stats['vulns']} Vulnerabilities" + return f"Graph State: {stats['hosts']} Hosts, {stats['services']} Services, {stats['endpoints']} Endpoints, {stats['technologies']} Technologies, {stats['creds']} Credentials, {stats['vulns']} Vulnerabilities" diff --git a/pentestagent/tools/notes/__init__.py b/pentestagent/tools/notes/__init__.py index ab7189d..1c5391b 100644 --- a/pentestagent/tools/notes/__init__.py +++ b/pentestagent/tools/notes/__init__.py @@ -156,6 +156,26 @@ _load_notes_unlocked() "type": "string", "description": "Path to a screenshot or downloaded file supporting this finding", }, + "services": { + "type": "array", + "description": "Array of service objects with port, product, version (e.g., [{'port': 80, 'product': 'Apache', 'version': '2.2.8'}])", + }, + "technologies": { + "type": "array", + "description": "Array of technology objects with name, version (e.g., [{'name': 'PHP', 'version': '5.2.4'}])", + }, + "endpoints": { + "type": "array", + "description": "Array of endpoint objects with path, methods (e.g., [{'path': '/admin', 'methods': ['GET', 'POST']}])", + }, + "weaknesses": { + "type": "array", + "description": "Array of weakness objects for WG stage (e.g., [{'id': 'CVE-2024-1234', 'description': '...'}])", + }, + "affected_versions": { + "type": "object", + "description": "Version range constraints for vulnerabilities (e.g., {'PHP': '5.0.0 - 5.2.17'})", + }, }, required=["action"], ), @@ -192,7 +212,7 @@ async def notes(arguments: dict, runtime) -> str: confidence = arguments.get("confidence", "medium") status = arguments.get("status", "confirmed") - # Extract structured metadata + # Extract structured metadata (supports nested structures) metadata = {} for field in [ "source", @@ -203,6 +223,11 @@ async def notes(arguments: dict, runtime) -> str: "cve", "url", "evidence_path", + "services", # Array of service dicts: [{"port": 80, "product": "Apache", "version": "2.2.8"}] + "technologies", # Array of tech dicts: [{"name": "PHP", "version": "5.2.4"}] + "endpoints", # Array of endpoint dicts: [{"path": "/admin", "methods": ["GET", "POST"]}] + "weaknesses", # Array of weakness dicts for WG stage + "affected_versions", # Dict for version ranges ]: if field in arguments: metadata[field] = arguments[field]