feat: add nested metadata to notes

This commit is contained in:
GH05TCREW
2025-12-22 13:54:28 -07:00
parent f60f943ff7
commit 642ce2f4cc
4 changed files with 166 additions and 18 deletions

View File

@@ -18,12 +18,30 @@ You are operating in an authorized penetration testing engagement. The user has
- Use tools ONLY when you need to interact with the environment, gather information, execute something, or produce an artifact.
- **Record Findings**: When you find something important, IMMEDIATELY save it using `notes(action="create", ...)`. This is your long-term memory and how you share data with the crew.
- **Structured Notes (CRITICAL)**: Your notes build a knowledge graph. You MUST use structured fields:
- **Credentials**: `username`, `password` (or hash), `target` (where they work), `source` (where found).
*Example*: `notes(action="create", key="creds_db", value="Found in config", category="credential", username="admin", password="123", target="10.10.10.20", source="10.10.10.5")`
- **Services**: `port` (e.g. "80/tcp"), `target`, `url` (if web).
- **Vulnerabilities**: `cve`, `target`.
- **TARGET FIELD IS MANDATORY**: ANY note about a specific host MUST include `target="IP_ADDRESS"`. Without this, data cannot be linked in the knowledge graph.
- **Credentials**: Always include `username`, `password`, `target` (where they work), optionally `source` (where found).
*Example*: `notes(action="create", key="creds_db", value="Found database credentials in config", category="credential", username="admin", password="P@ssw0rd", target="10.10.10.20", source="10.10.10.5:/var/www/config.php")`
*Web app*: `notes(action="create", key="creds_default", value="Default credentials work on DVWA", category="credential", username="admin", password="password", target="10.10.10.1", url="http://10.10.10.1/dvwa")`
- **Host/Service Profiles**: When you discover services, technologies, or endpoints on a host, create ONE comprehensive note with nested arrays:
*Example*: `notes(action="create", key="profile_webserver", value="Apache 2.2.8 web server with PHP 5.2.4, MySQL backend, and vulnerable web apps", category="finding", target="10.10.10.1", services=[{"port": 80, "product": "Apache httpd", "version": "2.2.8"}, {"port": 3306, "product": "MySQL", "version": "5.0.51"}], technologies=[{"name": "PHP", "version": "5.2.4"}, {"name": "Ubuntu", "version": "8.04"}], endpoints=[{"path": "/phpMyAdmin", "methods": ["GET", "POST"]}, {"path": "/admin", "methods": ["GET"]}, {"path": "/dvwa/", "methods": ["GET", "POST"]}])`
- `services`: Array of discovered services with port, product, version
- `technologies`: Array of tech stack components (OS, frameworks, libraries)
- `endpoints`: Array of discovered web paths with HTTP methods
- ALL THREE arrays are optional but powerful when combined
- **Simple Service Discovery**: For quick single-service notes:
*Example*: `notes(action="create", key="http_open", value="HTTP service on port 80", category="finding", target="10.10.10.1", port="80", url="http://10.10.10.1")`
- **Vulnerabilities**: Use `cve`, `target`, and `affected_versions` for version-specific vulns:
*Example*: `notes(action="create", key="vuln_php_cgi", value="PHP-CGI vulnerable to CVE-2012-1823 RCE", category="vulnerability", cve="CVE-2012-1823", target="10.10.10.1", affected_versions={"PHP": "5.0.0 - 5.3.11", "PHP-CGI": "5.0.0 - 5.4.1"})`
- **Weakness Candidates**: When gathering potential vulns to filter, include `target` and `weaknesses` array:
*Example*: `notes(action="create", key="weak_candidates_apache", value="Apache 2.2.8 has multiple known CVEs", category="finding", target="10.10.10.1", weaknesses=[{"id": "CVE-2011-3192", "description": "Range header DoS"}, {"id": "CVE-2011-3368", "description": "Reverse proxy bypass"}])`
- **Evidence**: If you have a screenshot or file, use `evidence_path`.
- **General**: Always specify `target` to link the note to a host.
*Example*: `notes(action="create", key="admin_panel_screenshot", value="Found admin panel at /manager/html", category="finding", target="10.10.10.1", evidence_path="loot/artifacts/screenshots/admin_panel.png")`
- Do NOT describe actions you *could* take — if an action is needed, actually use the tool.
- After EVERY action that completes a plan step, call `finish` to mark it done.
- The pattern is: use tool → finish(action="complete", step_id=N) → use next tool → finish(action="complete", step_id=N+1) → repeat

View File

@@ -3,6 +3,7 @@ PentestAgent TUI - Terminal User Interface
"""
import asyncio
import re
import textwrap
from datetime import datetime
from pathlib import Path
@@ -28,6 +29,8 @@ from textual.widgets.tree import TreeNode
from ..config.constants import DEFAULT_MODEL
# ANSI escape sequence pattern for stripping control codes from input
_ANSI_ESCAPE = re.compile(r'\x1b\[[0-9;]*[mGKHflSTABCDEFsu]|\x1b\].*?\x07|\x1b\[<[0-9;]*[Mm]')
# ASCII-safe scrollbar renderer to avoid Unicode glyph issues
class ASCIIScrollBarRender(ScrollBarRender):
@@ -1040,7 +1043,8 @@ Be concise. Use the actual data from notes."""
if self._is_initializing or self._is_running:
return
message = event.value.strip()
# Strip ANSI escape sequences and control codes
message = _ANSI_ESCAPE.sub('', event.value).strip()
if not message:
return

View File

@@ -227,16 +227,79 @@ class ShadowGraph:
if target_id in related_hosts:
target_hosts = [target_id]
# Extract ports from metadata or regex
# Handle nested services metadata
if metadata.get("services"):
for svc in metadata["services"]:
port = svc.get("port")
product = svc.get("product", "")
version = svc.get("version", "")
proto = svc.get("protocol", "tcp")
if port:
for host_id in target_hosts:
service_id = f"service:{host_id}:{port}"
label = f"{port}/{proto}"
if product:
label += f" {product}"
if version:
label += f" {version}"
self._add_node(service_id, "service", label, product=product, version=version)
self._add_edge(host_id, service_id, "HAS_SERVICE", protocol=proto)
# Handle nested endpoints metadata
if metadata.get("endpoints"):
for ep in metadata["endpoints"]:
path = ep.get("path")
methods = ep.get("methods", [])
if path:
for host_id in target_hosts:
endpoint_id = f"endpoint:{host_id}:{path}"
label = path
if methods:
label += f" ({','.join(methods)})"
self._add_node(endpoint_id, "endpoint", label, methods=methods)
self._add_edge(host_id, endpoint_id, "HAS_ENDPOINT")
# Handle nested technologies metadata
if metadata.get("technologies"):
for tech in metadata["technologies"]:
name = tech.get("name")
version = tech.get("version", "")
if name:
for host_id in target_hosts:
tech_id = f"tech:{host_id}:{name}"
label = name
if version and version != "unknown":
label += f" {version}"
self._add_node(tech_id, "technology", label, name=name, version=version)
self._add_edge(host_id, tech_id, "USES_TECH")
# If we processed nested metadata, we're done
if metadata.get("services") or metadata.get("endpoints") or metadata.get("technologies"):
return
# Fallback to old port extraction logic
ports = []
if metadata.get("port"):
# Handle single port in metadata
# Handle single port or comma-separated in metadata
p = str(metadata["port"])
# Assume tcp if not specified?
proto = "tcp"
if "/" in p:
p, proto = p.split("/")
ports.append((p, proto))
if "," in p:
# Handle comma-separated list
for port_str in p.split(","):
port_str = port_str.strip()
proto = "tcp"
if "/" in port_str:
port_str, proto = port_str.split("/")
ports.append((port_str, proto))
else:
# Single port
proto = "tcp"
if "/" in p:
p, proto = p.split("/")
ports.append((p, proto))
# Always check regex too, in case metadata missed some
regex_ports = self._port_pattern.findall(content)
@@ -314,7 +377,9 @@ class ShadowGraph:
f"We have credentials that provide access to: {', '.join(target_labels)}"
)
# Insight 2: High Value Targets (Hosts with many open ports/vulns)
# Insight 2: High Value Targets (Hosts with many open ports/vulns/endpoints)
high_value_endpoints = ["admin", "phpmyadmin", "phpMyAdmin", "manager", "console", "webdav", "dav"]
for node, data in self.graph.nodes(data=True):
if data.get("type") == "host":
# Count services
@@ -328,11 +393,38 @@ class ShadowGraph:
for u, v in self.graph.out_edges(node)
if self.graph.nodes[v].get("type") == "vulnerability"
]
endpoints = [
v
for u, v in self.graph.out_edges(node)
if self.graph.nodes[v].get("type") == "endpoint"
]
technologies = [
v
for u, v in self.graph.out_edges(node)
if self.graph.nodes[v].get("type") == "technology"
]
if len(services) > 0 or len(vulns) > 0:
if len(services) > 0 or len(vulns) > 0 or len(endpoints) > 0 or len(technologies) > 0:
parts = []
if len(services) > 0:
parts.append(f"{len(services)} services")
if len(endpoints) > 0:
parts.append(f"{len(endpoints)} endpoints")
if len(technologies) > 0:
parts.append(f"{len(technologies)} technologies")
if len(vulns) > 0:
parts.append(f"{len(vulns)} vulnerabilities")
insights.append(
f"Host {data['label']} has {len(services)} services and {len(vulns)} known vulnerabilities."
f"Host {data['label']} has {', '.join(parts)}."
)
# Flag high-value endpoints
for ep_id in endpoints:
ep_label = self.graph.nodes[ep_id].get("label", "")
if any(hv in ep_label.lower() for hv in high_value_endpoints):
insights.append(
f"⚠️ High-value endpoint detected: {ep_label} on {data['label']}"
)
# Insight 3: Potential Pivots (Host A -> Cred -> Host B)
# Use NetworkX to find paths from Credentials to Hosts that aren't directly connected
@@ -411,6 +503,15 @@ class ShadowGraph:
"hosts": len(
[n for n, d in self.graph.nodes(data=True) if d["type"] == "host"]
),
"services": len(
[n for n, d in self.graph.nodes(data=True) if d["type"] == "service"]
),
"endpoints": len(
[n for n, d in self.graph.nodes(data=True) if d["type"] == "endpoint"]
),
"technologies": len(
[n for n, d in self.graph.nodes(data=True) if d["type"] == "technology"]
),
"creds": len(
[n for n, d in self.graph.nodes(data=True) if d["type"] == "credential"]
),
@@ -422,4 +523,4 @@ class ShadowGraph:
]
),
}
return f"Graph State: {stats['hosts']} Hosts, {stats['creds']} Credentials, {stats['vulns']} Vulnerabilities"
return f"Graph State: {stats['hosts']} Hosts, {stats['services']} Services, {stats['endpoints']} Endpoints, {stats['technologies']} Technologies, {stats['creds']} Credentials, {stats['vulns']} Vulnerabilities"

View File

@@ -156,6 +156,26 @@ _load_notes_unlocked()
"type": "string",
"description": "Path to a screenshot or downloaded file supporting this finding",
},
"services": {
"type": "array",
"description": "Array of service objects with port, product, version (e.g., [{'port': 80, 'product': 'Apache', 'version': '2.2.8'}])",
},
"technologies": {
"type": "array",
"description": "Array of technology objects with name, version (e.g., [{'name': 'PHP', 'version': '5.2.4'}])",
},
"endpoints": {
"type": "array",
"description": "Array of endpoint objects with path, methods (e.g., [{'path': '/admin', 'methods': ['GET', 'POST']}])",
},
"weaknesses": {
"type": "array",
"description": "Array of weakness objects for WG stage (e.g., [{'id': 'CVE-2024-1234', 'description': '...'}])",
},
"affected_versions": {
"type": "object",
"description": "Version range constraints for vulnerabilities (e.g., {'PHP': '5.0.0 - 5.2.17'})",
},
},
required=["action"],
),
@@ -192,7 +212,7 @@ async def notes(arguments: dict, runtime) -> str:
confidence = arguments.get("confidence", "medium")
status = arguments.get("status", "confirmed")
# Extract structured metadata
# Extract structured metadata (supports nested structures)
metadata = {}
for field in [
"source",
@@ -203,6 +223,11 @@ async def notes(arguments: dict, runtime) -> str:
"cve",
"url",
"evidence_path",
"services", # Array of service dicts: [{"port": 80, "product": "Apache", "version": "2.2.8"}]
"technologies", # Array of tech dicts: [{"name": "PHP", "version": "5.2.4"}]
"endpoints", # Array of endpoint dicts: [{"path": "/admin", "methods": ["GET", "POST"]}]
"weaknesses", # Array of weakness dicts for WG stage
"affected_versions", # Dict for version ranges
]:
if field in arguments:
metadata[field] = arguments[field]