diff --git a/pentestagent/agents/prompts/pa_agent.jinja b/pentestagent/agents/prompts/pa_agent.jinja index 1cf62ff..4ab9660 100644 --- a/pentestagent/agents/prompts/pa_agent.jinja +++ b/pentestagent/agents/prompts/pa_agent.jinja @@ -17,31 +17,23 @@ You are operating in an authorized penetration testing engagement. The user has - Think through the task step-by-step. - Use tools ONLY when you need to interact with the environment, gather information, execute something, or produce an artifact. - **Record Findings**: When you find something important, IMMEDIATELY save it using `notes(action="create", ...)`. This is your long-term memory and how you share data with the crew. -- **Structured Notes (CRITICAL)**: Your notes build a knowledge graph. You MUST use structured fields: - - **TARGET FIELD IS MANDATORY**: ANY note about a specific host MUST include `target="IP_ADDRESS"`. Without this, data cannot be linked in the knowledge graph. +- **Structured Notes (CRITICAL)**: Your notes build a knowledge graph. You MUST use structured fields correctly or the tool will reject them with validation errors. + - **TARGET FIELD IS MANDATORY**: For categories `credential`, `vulnerability`, and `finding`, you MUST include `target="IP_ADDRESS"`. Without this, the note will be rejected. - - **Credentials**: Always include `username`, `password`, `target` (where they work), optionally `source` (where found). - *Example*: `notes(action="create", key="creds_db", value="Found database credentials in config", category="credential", username="admin", password="P@ssw0rd", target="10.10.10.20", source="10.10.10.5:/var/www/config.php")` - *Web app*: `notes(action="create", key="creds_default", value="Default credentials work on DVWA", category="credential", username="admin", password="password", target="10.10.10.1", url="http://10.10.10.1/dvwa")` + - **Credentials** (category="credential"): MUST have `username` + (`password` OR `protocol`), and `target`. + *With password*: `notes(action="create", key="creds_db", value="Found database credentials in config", category="credential", username="admin", password="P@ssw0rd", target="10.10.10.20", source="10.10.10.5")` + *With protocol*: `notes(action="create", key="creds_ssh_key", value="Found SSH key for root user", category="credential", username="root", protocol="ssh", target="10.10.10.1", source="10.10.10.5")` - - **Host/Service Profiles**: When you discover services, technologies, or endpoints on a host, create ONE comprehensive note with nested arrays: - *Example*: `notes(action="create", key="profile_webserver", value="Apache 2.2.8 web server with PHP 5.2.4, MySQL backend, and vulnerable web apps", category="finding", target="10.10.10.1", services=[{"port": 80, "product": "Apache httpd", "version": "2.2.8"}, {"port": 3306, "product": "MySQL", "version": "5.0.51"}], technologies=[{"name": "PHP", "version": "5.2.4"}, {"name": "Ubuntu", "version": "8.04"}], endpoints=[{"path": "/phpMyAdmin", "methods": ["GET", "POST"]}, {"path": "/admin", "methods": ["GET"]}, {"path": "/dvwa/", "methods": ["GET", "POST"]}])` - - `services`: Array of discovered services with port, product, version - - `technologies`: Array of tech stack components (OS, frameworks, libraries) - - `endpoints`: Array of discovered web paths with HTTP methods - - ALL THREE arrays are optional but powerful when combined + - **Host/Service Profiles** (category="finding"): MUST have `target` + at least one of (`services`, `endpoints`, `technologies`, or `port`). + *Comprehensive*: `notes(action="create", key="profile_webserver", value="Apache 2.2.8 web server with PHP 5.2.4, MySQL backend, and vulnerable web apps", category="finding", target="10.10.10.1", services=[{"port": 80, "product": "Apache httpd", "version": "2.2.8"}, {"port": 3306, "product": "MySQL", "version": "5.0.51"}], technologies=[{"name": "PHP", "version": "5.2.4"}, {"name": "Ubuntu", "version": "8.04"}], endpoints=[{"path": "/phpMyAdmin", "methods": ["GET", "POST"]}, {"path": "/admin", "methods": ["GET"]}, {"path": "/dvwa/", "methods": ["GET", "POST"]}])` + *Simple single service*: `notes(action="create", key="http_open", value="HTTP service on port 80", category="finding", target="10.10.10.1", port="80", url="http://10.10.10.1")` - - **Simple Service Discovery**: For quick single-service notes: - *Example*: `notes(action="create", key="http_open", value="HTTP service on port 80", category="finding", target="10.10.10.1", port="80", url="http://10.10.10.1")` + - **Vulnerabilities** (category="vulnerability"): MUST have `target` + (`cve` OR `weaknesses` array). + *With CVE*: `notes(action="create", key="vuln_php_cgi", value="PHP-CGI vulnerable to CVE-2012-1823 RCE", category="vulnerability", cve="CVE-2012-1823", target="10.10.10.1", affected_versions={"PHP": "5.0.0 - 5.3.11", "PHP-CGI": "5.0.0 - 5.4.1"})` + *With weaknesses array*: `notes(action="create", key="vuln_apache_multiple", value="Apache 2.2.8 has multiple exploitable vulnerabilities", category="vulnerability", target="10.10.10.1", weaknesses=[{"id": "CVE-2011-3192", "description": "Range header DoS"}, {"id": "CVE-2011-3368", "description": "Reverse proxy bypass"}])` - - **Vulnerabilities**: Use `cve`, `target`, and `affected_versions` for version-specific vulns: - *Example*: `notes(action="create", key="vuln_php_cgi", value="PHP-CGI vulnerable to CVE-2012-1823 RCE", category="vulnerability", cve="CVE-2012-1823", target="10.10.10.1", affected_versions={"PHP": "5.0.0 - 5.3.11", "PHP-CGI": "5.0.0 - 5.4.1"})` - - - **Weakness Candidates**: When gathering potential vulns to filter, include `target` and `weaknesses` array: - *Example*: `notes(action="create", key="weak_candidates_apache", value="Apache 2.2.8 has multiple known CVEs", category="finding", target="10.10.10.1", weaknesses=[{"id": "CVE-2011-3192", "description": "Range header DoS"}, {"id": "CVE-2011-3368", "description": "Reverse proxy bypass"}])` - - - **Evidence**: If you have a screenshot or file, use `evidence_path`. - *Example*: `notes(action="create", key="admin_panel_screenshot", value="Found admin panel at /manager/html", category="finding", target="10.10.10.1", evidence_path="loot/artifacts/screenshots/admin_panel.png")` + - **Evidence/Artifacts** (category="info" or "artifact"): No validation, optional `target`. + *Example*: `notes(action="create", key="admin_panel_screenshot", value="Found admin panel at /manager/html", category="artifact", target="10.10.10.1", evidence_path="loot/artifacts/screenshots/admin_panel.png", url="http://10.10.10.1/manager/html")` - Do NOT describe actions you *could* take — if an action is needed, actually use the tool. - After EVERY action that completes a plan step, call `finish` to mark it done. - The pattern is: use tool → finish(action="complete", step_id=N) → use next tool → finish(action="complete", step_id=N+1) → repeat diff --git a/pentestagent/knowledge/graph.py b/pentestagent/knowledge/graph.py index 96e2023..da0d6c1 100644 --- a/pentestagent/knowledge/graph.py +++ b/pentestagent/knowledge/graph.py @@ -126,12 +126,20 @@ class ShadowGraph: self._add_node(node_id, "host", ip) hosts.append(node_id) - # 2. Handle specific categories - if category == "credential": + # 2. Process structured metadata regardless of category + # Category is organizational only - the metadata structure determines graph entities + + # Process credential data if present + if category == "credential" or metadata.get("username") or metadata.get("password"): self._process_credential(key, content, hosts, metadata, status) - elif category == "finding": - self._process_finding(key, content, hosts, metadata, status) - elif category == "vulnerability": + + # Process services/endpoints/technologies if present + if (metadata.get("services") or metadata.get("endpoints") or + metadata.get("technologies") or metadata.get("port")): + self._process_services_and_tech(key, content, hosts, metadata, status) + + # Process vulnerability data if present + if category == "vulnerability" or metadata.get("cve") or metadata.get("weaknesses"): self._process_vulnerability(key, content, hosts, metadata, status) # 3. Link note to hosts (provenance) @@ -161,50 +169,34 @@ class ShadowGraph: if status in ["closed", "filtered"]: return - # Extract username from metadata or regex + # Extract username (with fallback for legacy notes) username = metadata.get("username") if not username: user_match = self._user_pattern.search(content) - username = user_match.group(1) if user_match else None + username = user_match.group(1) if user_match else "unknown" cred_id = f"cred:{key}" - label = f"Creds ({username})" if username else "Credentials" + label = f"Creds ({username})" if username != "unknown" else "Credentials" self._add_node(cred_id, "credential", label) - # Check for "found on" source host + # Get source host from metadata (validated notes have this if needed) source_host = None if metadata.get("source"): source_ip = metadata["source"] source_host = f"host:{source_ip}" - else: - source_match = self._source_pattern.search(content) - if source_match: - source_ip = source_match.group(1) - source_host = f"host:{source_ip}" - - if source_host: - # Add CONTAINS edge: Host -> Cred if self.graph.has_node(source_host): self._add_edge(source_host, cred_id, "CONTAINS") - # Link cred to hosts it belongs to (or works on) + # Link cred to target hosts (AUTH_ACCESS) for host_id in related_hosts: - # If this host is the source, skip adding it as a target unless explicitly clear? - # For now, if we identified it as source, assume it's NOT the target unless it's the only one? - # Let's just exclude the source host from being an AUTH_ACCESS target to avoid loops, - # unless we want to represent local privesc (which is valid). - # But for pivoting, we care about A -> Cred -> B. - - # If we found a source, and this host is that source, treat it as CONTAINS (already done). - # Otherwise, treat as AUTH_ACCESS. + # Don't create AUTH_ACCESS to the source host (already has CONTAINS edge) if source_host and host_id == source_host: continue - # If the note says "ssh", assume SSH access - protocol = "ssh" if "ssh" in content.lower() else "unknown" + protocol = metadata.get("protocol", "unknown") self._add_edge(cred_id, host_id, "AUTH_ACCESS", protocol=protocol) - def _process_finding( + def _process_services_and_tech( self, key: str, content: str, @@ -212,121 +204,93 @@ class ShadowGraph: metadata: Dict[str, Any], status: str, ) -> None: - """Process a finding note (e.g., open ports).""" + """Process services, endpoints, and technologies metadata (from any note type).""" # Skip if status is closed/filtered if status in ["closed", "filtered"]: return - # Filter related_hosts: If we have explicit target metadata, ONLY use that. - # Otherwise, use all related hosts (fallback to regex behavior). + # Target is validated for finding category, trusted for others target_hosts = related_hosts if metadata.get("target"): target_ip = metadata["target"] target_id = f"host:{target_ip}" - # Only use the target if it's in the related_hosts list (sanity check) if target_id in related_hosts: target_hosts = [target_id] - # Handle nested services metadata + # Process structured service metadata if metadata.get("services"): for svc in metadata["services"]: port = svc.get("port") + if not port: + continue + product = svc.get("product", "") version = svc.get("version", "") proto = svc.get("protocol", "tcp") - if port: - for host_id in target_hosts: - service_id = f"service:{host_id}:{port}" - label = f"{port}/{proto}" - if product: - label += f" {product}" - if version: - label += f" {version}" + for host_id in target_hosts: + service_id = f"service:{host_id}:{port}" + label = f"{port}/{proto}" + if product: + label += f" {product}" + if version: + label += f" {version}" - self._add_node( - service_id, - "service", - label, - product=product, - version=version, - ) - self._add_edge( - host_id, service_id, "HAS_SERVICE", protocol=proto - ) + self._add_node( + service_id, + "service", + label, + product=product, + version=version, + ) + self._add_edge(host_id, service_id, "HAS_SERVICE", protocol=proto) - # Handle nested endpoints metadata + # Process structured endpoint metadata if metadata.get("endpoints"): for ep in metadata["endpoints"]: path = ep.get("path") + if not path: + continue + methods = ep.get("methods", []) - if path: - for host_id in target_hosts: - endpoint_id = f"endpoint:{host_id}:{path}" - label = path - if methods: - label += f" ({','.join(methods)})" + for host_id in target_hosts: + endpoint_id = f"endpoint:{host_id}:{path}" + label = path + if methods: + label += f" ({','.join(methods)})" - self._add_node(endpoint_id, "endpoint", label, methods=methods) - self._add_edge(host_id, endpoint_id, "HAS_ENDPOINT") + self._add_node(endpoint_id, "endpoint", label, methods=methods) + self._add_edge(host_id, endpoint_id, "HAS_ENDPOINT") - # Handle nested technologies metadata + # Process structured technology metadata if metadata.get("technologies"): for tech in metadata["technologies"]: name = tech.get("name") + if not name: + continue + version = tech.get("version", "") - if name: - for host_id in target_hosts: - tech_id = f"tech:{host_id}:{name}" - label = name - if version and version != "unknown": - label += f" {version}" + for host_id in target_hosts: + tech_id = f"tech:{host_id}:{name}" + label = name + if version and version != "unknown": + label += f" {version}" - self._add_node( - tech_id, "technology", label, name=name, version=version - ) - self._add_edge(host_id, tech_id, "USES_TECH") + self._add_node( + tech_id, "technology", label, name=name, version=version + ) + self._add_edge(host_id, tech_id, "USES_TECH") - # If we processed nested metadata, we're done - if ( - metadata.get("services") - or metadata.get("endpoints") - or metadata.get("technologies") - ): - return - - # Fallback to old port extraction logic - ports = [] - if metadata.get("port"): - # Handle single port or comma-separated in metadata - p = str(metadata["port"]) - if "," in p: - # Handle comma-separated list - for port_str in p.split(","): - port_str = port_str.strip() - proto = "tcp" - if "/" in port_str: - port_str, proto = port_str.split("/") - ports.append((port_str, proto)) - else: - # Single port - proto = "tcp" - if "/" in p: - p, proto = p.split("/") - ports.append((p, proto)) - - # Always check regex too, in case metadata missed some - regex_ports = self._port_pattern.findall(content) - for p, proto in regex_ports: - if (p, proto) not in ports: - ports.append((p, proto)) - - for port, proto in ports: + # Handle simple port field (for quick single-service notes or legacy data) + if metadata.get("port") and not metadata.get("services"): + port_str = str(metadata["port"]) + proto = "tcp" + if "/" in port_str: + port_str, proto = port_str.split("/") + for host_id in target_hosts: - service_id = f"service:{host_id}:{port}" - - # Add URL to label if present - label = f"{port}/{proto}" + service_id = f"service:{host_id}:{port_str}" + label = f"{port_str}/{proto}" if metadata.get("url"): label += f" ({metadata['url']})" @@ -346,7 +310,7 @@ class ShadowGraph: if status in ["closed", "filtered"]: return - # Filter related_hosts: If we have explicit target metadata, ONLY use that. + # Target is validated for vulnerability category, so we can trust it target_hosts = related_hosts if metadata.get("target"): target_ip = metadata["target"] @@ -356,14 +320,14 @@ class ShadowGraph: vuln_id = f"vuln:{key}" - # Try to extract CVE from metadata or regex + # Get label from CVE or first weakness ID label = "Vulnerability" if metadata.get("cve"): label = metadata["cve"] - else: - cve_match = re.search(r"CVE-\d{4}-\d{4,7}", content, re.IGNORECASE) - if cve_match: - label = cve_match.group(0) + elif metadata.get("weaknesses") and len(metadata["weaknesses"]) > 0: + # Use first weakness ID as label + first_weakness = metadata["weaknesses"][0] + label = first_weakness.get("id", "Vulnerability") self._add_node(vuln_id, "vulnerability", label) @@ -392,16 +356,6 @@ class ShadowGraph: ) # Insight 2: High Value Targets (Hosts with many open ports/vulns/endpoints) - high_value_endpoints = [ - "admin", - "phpmyadmin", - "phpMyAdmin", - "manager", - "console", - "webdav", - "dav", - ] - for node, data in self.graph.nodes(data=True): if data.get("type") == "host": # Count services @@ -443,14 +397,6 @@ class ShadowGraph: parts.append(f"{len(vulns)} vulnerabilities") insights.append(f"Host {data['label']} has {', '.join(parts)}.") - # Flag high-value endpoints - for ep_id in endpoints: - ep_label = self.graph.nodes[ep_id].get("label", "") - if any(hv in ep_label.lower() for hv in high_value_endpoints): - insights.append( - f"⚠️ High-value endpoint detected: {ep_label} on {data['label']}" - ) - # Insight 3: Potential Pivots (Host A -> Cred -> Host B) # Use NetworkX to find paths from Credentials to Hosts that aren't directly connected attack_paths = self._find_attack_paths() diff --git a/pentestagent/tools/notes/__init__.py b/pentestagent/tools/notes/__init__.py index 1c5391b..651262f 100644 --- a/pentestagent/tools/notes/__init__.py +++ b/pentestagent/tools/notes/__init__.py @@ -84,6 +84,58 @@ def set_notes_file(path: Path) -> None: _load_notes_unlocked() +# Validation schema - declarative rules for note structure +HOST_SPECIFIC_FIELDS = {"services", "endpoints", "technologies", "port"} + +CATEGORY_REQUIREMENTS = { + "credential": { + "required": ["username", "target"], + "one_of": [["password", "protocol"]], + }, + "vulnerability": { + "required": ["target"], + "one_of": [["cve", "weaknesses"]], + }, + "finding": { + "required": ["target"], + "one_of": [["services", "endpoints", "technologies", "port"]], + }, +} + + +def _validate_note_schema(category: str, metadata: Dict[str, Any]) -> str | None: + """ + Validate note schema based on declarative rules. + + Returns: + Error message if validation fails, None if valid + """ + # Check if note has host-specific structured data + has_host_data = bool(HOST_SPECIFIC_FIELDS & metadata.keys()) + + # If note has host-specific data, require target + if has_host_data and not metadata.get("target"): + fields = ", ".join(f"'{f}'" for f in HOST_SPECIFIC_FIELDS if f in metadata) + return f"Error: 'target' field is required when providing host-specific data ({fields})." + + # Apply category-specific validation rules + if category in CATEGORY_REQUIREMENTS: + rules = CATEGORY_REQUIREMENTS[category] + + # Check required fields + for field in rules.get("required", []): + if not metadata.get(field): + return f"Error: '{field}' field is required for category '{category}'." + + # Check one_of constraints (at least one field from each group must be present) + for field_group in rules.get("one_of", []): + if not any(metadata.get(field) for field in field_group): + field_list = "' or '".join(field_group) + return f"Error: At least one of '{field_list}' is required for category '{category}'." + + return None + + @register_tool( name="notes", description="Manage persistent notes for key findings. Actions: create, read, update, delete, list.", @@ -241,6 +293,11 @@ async def notes(arguments: dict, runtime) -> str: if key in _notes: return f"Error: note '{key}' already exists. Use 'update' to modify." + # Validate schema based on category + validation_error = _validate_note_schema(category, metadata) + if validation_error: + return validation_error + _notes[key] = { "content": value, "category": category, @@ -274,6 +331,11 @@ async def notes(arguments: dict, runtime) -> str: existed = key in _notes + # Validate schema based on category + validation_error = _validate_note_schema(category, metadata) + if validation_error: + return validation_error + # Merge metadata if updating? For now, overwrite to keep it simple and consistent with content _notes[key] = { "content": value,