feat: enforce strict schema validation for notes

This commit is contained in:
GH05TCREW
2025-12-22 16:31:22 -07:00
parent 9b14094a4a
commit 0fc28f304c
3 changed files with 156 additions and 156 deletions

View File

@@ -17,31 +17,23 @@ You are operating in an authorized penetration testing engagement. The user has
- Think through the task step-by-step.
- Use tools ONLY when you need to interact with the environment, gather information, execute something, or produce an artifact.
- **Record Findings**: When you find something important, IMMEDIATELY save it using `notes(action="create", ...)`. This is your long-term memory and how you share data with the crew.
- **Structured Notes (CRITICAL)**: Your notes build a knowledge graph. You MUST use structured fields:
- **TARGET FIELD IS MANDATORY**: ANY note about a specific host MUST include `target="IP_ADDRESS"`. Without this, data cannot be linked in the knowledge graph.
- **Structured Notes (CRITICAL)**: Your notes build a knowledge graph. You MUST use structured fields correctly or the tool will reject them with validation errors.
- **TARGET FIELD IS MANDATORY**: For categories `credential`, `vulnerability`, and `finding`, you MUST include `target="IP_ADDRESS"`. Without this, the note will be rejected.
- **Credentials**: Always include `username`, `password`, `target` (where they work), optionally `source` (where found).
*Example*: `notes(action="create", key="creds_db", value="Found database credentials in config", category="credential", username="admin", password="P@ssw0rd", target="10.10.10.20", source="10.10.10.5:/var/www/config.php")`
*Web app*: `notes(action="create", key="creds_default", value="Default credentials work on DVWA", category="credential", username="admin", password="password", target="10.10.10.1", url="http://10.10.10.1/dvwa")`
- **Credentials** (category="credential"): MUST have `username` + (`password` OR `protocol`), and `target`.
*With password*: `notes(action="create", key="creds_db", value="Found database credentials in config", category="credential", username="admin", password="P@ssw0rd", target="10.10.10.20", source="10.10.10.5")`
*With protocol*: `notes(action="create", key="creds_ssh_key", value="Found SSH key for root user", category="credential", username="root", protocol="ssh", target="10.10.10.1", source="10.10.10.5")`
- **Host/Service Profiles**: When you discover services, technologies, or endpoints on a host, create ONE comprehensive note with nested arrays:
*Example*: `notes(action="create", key="profile_webserver", value="Apache 2.2.8 web server with PHP 5.2.4, MySQL backend, and vulnerable web apps", category="finding", target="10.10.10.1", services=[{"port": 80, "product": "Apache httpd", "version": "2.2.8"}, {"port": 3306, "product": "MySQL", "version": "5.0.51"}], technologies=[{"name": "PHP", "version": "5.2.4"}, {"name": "Ubuntu", "version": "8.04"}], endpoints=[{"path": "/phpMyAdmin", "methods": ["GET", "POST"]}, {"path": "/admin", "methods": ["GET"]}, {"path": "/dvwa/", "methods": ["GET", "POST"]}])`
- `services`: Array of discovered services with port, product, version
- `technologies`: Array of tech stack components (OS, frameworks, libraries)
- `endpoints`: Array of discovered web paths with HTTP methods
- ALL THREE arrays are optional but powerful when combined
- **Host/Service Profiles** (category="finding"): MUST have `target` + at least one of (`services`, `endpoints`, `technologies`, or `port`).
*Comprehensive*: `notes(action="create", key="profile_webserver", value="Apache 2.2.8 web server with PHP 5.2.4, MySQL backend, and vulnerable web apps", category="finding", target="10.10.10.1", services=[{"port": 80, "product": "Apache httpd", "version": "2.2.8"}, {"port": 3306, "product": "MySQL", "version": "5.0.51"}], technologies=[{"name": "PHP", "version": "5.2.4"}, {"name": "Ubuntu", "version": "8.04"}], endpoints=[{"path": "/phpMyAdmin", "methods": ["GET", "POST"]}, {"path": "/admin", "methods": ["GET"]}, {"path": "/dvwa/", "methods": ["GET", "POST"]}])`
*Simple single service*: `notes(action="create", key="http_open", value="HTTP service on port 80", category="finding", target="10.10.10.1", port="80", url="http://10.10.10.1")`
- **Simple Service Discovery**: For quick single-service notes:
*Example*: `notes(action="create", key="http_open", value="HTTP service on port 80", category="finding", target="10.10.10.1", port="80", url="http://10.10.10.1")`
- **Vulnerabilities** (category="vulnerability"): MUST have `target` + (`cve` OR `weaknesses` array).
*With CVE*: `notes(action="create", key="vuln_php_cgi", value="PHP-CGI vulnerable to CVE-2012-1823 RCE", category="vulnerability", cve="CVE-2012-1823", target="10.10.10.1", affected_versions={"PHP": "5.0.0 - 5.3.11", "PHP-CGI": "5.0.0 - 5.4.1"})`
*With weaknesses array*: `notes(action="create", key="vuln_apache_multiple", value="Apache 2.2.8 has multiple exploitable vulnerabilities", category="vulnerability", target="10.10.10.1", weaknesses=[{"id": "CVE-2011-3192", "description": "Range header DoS"}, {"id": "CVE-2011-3368", "description": "Reverse proxy bypass"}])`
- **Vulnerabilities**: Use `cve`, `target`, and `affected_versions` for version-specific vulns:
*Example*: `notes(action="create", key="vuln_php_cgi", value="PHP-CGI vulnerable to CVE-2012-1823 RCE", category="vulnerability", cve="CVE-2012-1823", target="10.10.10.1", affected_versions={"PHP": "5.0.0 - 5.3.11", "PHP-CGI": "5.0.0 - 5.4.1"})`
- **Weakness Candidates**: When gathering potential vulns to filter, include `target` and `weaknesses` array:
*Example*: `notes(action="create", key="weak_candidates_apache", value="Apache 2.2.8 has multiple known CVEs", category="finding", target="10.10.10.1", weaknesses=[{"id": "CVE-2011-3192", "description": "Range header DoS"}, {"id": "CVE-2011-3368", "description": "Reverse proxy bypass"}])`
- **Evidence**: If you have a screenshot or file, use `evidence_path`.
*Example*: `notes(action="create", key="admin_panel_screenshot", value="Found admin panel at /manager/html", category="finding", target="10.10.10.1", evidence_path="loot/artifacts/screenshots/admin_panel.png")`
- **Evidence/Artifacts** (category="info" or "artifact"): No validation, optional `target`.
*Example*: `notes(action="create", key="admin_panel_screenshot", value="Found admin panel at /manager/html", category="artifact", target="10.10.10.1", evidence_path="loot/artifacts/screenshots/admin_panel.png", url="http://10.10.10.1/manager/html")`
- Do NOT describe actions you *could* take — if an action is needed, actually use the tool.
- After EVERY action that completes a plan step, call `finish` to mark it done.
- The pattern is: use tool → finish(action="complete", step_id=N) → use next tool → finish(action="complete", step_id=N+1) → repeat

View File

@@ -126,12 +126,20 @@ class ShadowGraph:
self._add_node(node_id, "host", ip)
hosts.append(node_id)
# 2. Handle specific categories
if category == "credential":
# 2. Process structured metadata regardless of category
# Category is organizational only - the metadata structure determines graph entities
# Process credential data if present
if category == "credential" or metadata.get("username") or metadata.get("password"):
self._process_credential(key, content, hosts, metadata, status)
elif category == "finding":
self._process_finding(key, content, hosts, metadata, status)
elif category == "vulnerability":
# Process services/endpoints/technologies if present
if (metadata.get("services") or metadata.get("endpoints") or
metadata.get("technologies") or metadata.get("port")):
self._process_services_and_tech(key, content, hosts, metadata, status)
# Process vulnerability data if present
if category == "vulnerability" or metadata.get("cve") or metadata.get("weaknesses"):
self._process_vulnerability(key, content, hosts, metadata, status)
# 3. Link note to hosts (provenance)
@@ -161,50 +169,34 @@ class ShadowGraph:
if status in ["closed", "filtered"]:
return
# Extract username from metadata or regex
# Extract username (with fallback for legacy notes)
username = metadata.get("username")
if not username:
user_match = self._user_pattern.search(content)
username = user_match.group(1) if user_match else None
username = user_match.group(1) if user_match else "unknown"
cred_id = f"cred:{key}"
label = f"Creds ({username})" if username else "Credentials"
label = f"Creds ({username})" if username != "unknown" else "Credentials"
self._add_node(cred_id, "credential", label)
# Check for "found on" source host
# Get source host from metadata (validated notes have this if needed)
source_host = None
if metadata.get("source"):
source_ip = metadata["source"]
source_host = f"host:{source_ip}"
else:
source_match = self._source_pattern.search(content)
if source_match:
source_ip = source_match.group(1)
source_host = f"host:{source_ip}"
if source_host:
# Add CONTAINS edge: Host -> Cred
if self.graph.has_node(source_host):
self._add_edge(source_host, cred_id, "CONTAINS")
# Link cred to hosts it belongs to (or works on)
# Link cred to target hosts (AUTH_ACCESS)
for host_id in related_hosts:
# If this host is the source, skip adding it as a target unless explicitly clear?
# For now, if we identified it as source, assume it's NOT the target unless it's the only one?
# Let's just exclude the source host from being an AUTH_ACCESS target to avoid loops,
# unless we want to represent local privesc (which is valid).
# But for pivoting, we care about A -> Cred -> B.
# If we found a source, and this host is that source, treat it as CONTAINS (already done).
# Otherwise, treat as AUTH_ACCESS.
# Don't create AUTH_ACCESS to the source host (already has CONTAINS edge)
if source_host and host_id == source_host:
continue
# If the note says "ssh", assume SSH access
protocol = "ssh" if "ssh" in content.lower() else "unknown"
protocol = metadata.get("protocol", "unknown")
self._add_edge(cred_id, host_id, "AUTH_ACCESS", protocol=protocol)
def _process_finding(
def _process_services_and_tech(
self,
key: str,
content: str,
@@ -212,121 +204,93 @@ class ShadowGraph:
metadata: Dict[str, Any],
status: str,
) -> None:
"""Process a finding note (e.g., open ports)."""
"""Process services, endpoints, and technologies metadata (from any note type)."""
# Skip if status is closed/filtered
if status in ["closed", "filtered"]:
return
# Filter related_hosts: If we have explicit target metadata, ONLY use that.
# Otherwise, use all related hosts (fallback to regex behavior).
# Target is validated for finding category, trusted for others
target_hosts = related_hosts
if metadata.get("target"):
target_ip = metadata["target"]
target_id = f"host:{target_ip}"
# Only use the target if it's in the related_hosts list (sanity check)
if target_id in related_hosts:
target_hosts = [target_id]
# Handle nested services metadata
# Process structured service metadata
if metadata.get("services"):
for svc in metadata["services"]:
port = svc.get("port")
if not port:
continue
product = svc.get("product", "")
version = svc.get("version", "")
proto = svc.get("protocol", "tcp")
if port:
for host_id in target_hosts:
service_id = f"service:{host_id}:{port}"
label = f"{port}/{proto}"
if product:
label += f" {product}"
if version:
label += f" {version}"
for host_id in target_hosts:
service_id = f"service:{host_id}:{port}"
label = f"{port}/{proto}"
if product:
label += f" {product}"
if version:
label += f" {version}"
self._add_node(
service_id,
"service",
label,
product=product,
version=version,
)
self._add_edge(
host_id, service_id, "HAS_SERVICE", protocol=proto
)
self._add_node(
service_id,
"service",
label,
product=product,
version=version,
)
self._add_edge(host_id, service_id, "HAS_SERVICE", protocol=proto)
# Handle nested endpoints metadata
# Process structured endpoint metadata
if metadata.get("endpoints"):
for ep in metadata["endpoints"]:
path = ep.get("path")
if not path:
continue
methods = ep.get("methods", [])
if path:
for host_id in target_hosts:
endpoint_id = f"endpoint:{host_id}:{path}"
label = path
if methods:
label += f" ({','.join(methods)})"
for host_id in target_hosts:
endpoint_id = f"endpoint:{host_id}:{path}"
label = path
if methods:
label += f" ({','.join(methods)})"
self._add_node(endpoint_id, "endpoint", label, methods=methods)
self._add_edge(host_id, endpoint_id, "HAS_ENDPOINT")
self._add_node(endpoint_id, "endpoint", label, methods=methods)
self._add_edge(host_id, endpoint_id, "HAS_ENDPOINT")
# Handle nested technologies metadata
# Process structured technology metadata
if metadata.get("technologies"):
for tech in metadata["technologies"]:
name = tech.get("name")
if not name:
continue
version = tech.get("version", "")
if name:
for host_id in target_hosts:
tech_id = f"tech:{host_id}:{name}"
label = name
if version and version != "unknown":
label += f" {version}"
for host_id in target_hosts:
tech_id = f"tech:{host_id}:{name}"
label = name
if version and version != "unknown":
label += f" {version}"
self._add_node(
tech_id, "technology", label, name=name, version=version
)
self._add_edge(host_id, tech_id, "USES_TECH")
self._add_node(
tech_id, "technology", label, name=name, version=version
)
self._add_edge(host_id, tech_id, "USES_TECH")
# If we processed nested metadata, we're done
if (
metadata.get("services")
or metadata.get("endpoints")
or metadata.get("technologies")
):
return
# Fallback to old port extraction logic
ports = []
if metadata.get("port"):
# Handle single port or comma-separated in metadata
p = str(metadata["port"])
if "," in p:
# Handle comma-separated list
for port_str in p.split(","):
port_str = port_str.strip()
proto = "tcp"
if "/" in port_str:
port_str, proto = port_str.split("/")
ports.append((port_str, proto))
else:
# Single port
proto = "tcp"
if "/" in p:
p, proto = p.split("/")
ports.append((p, proto))
# Always check regex too, in case metadata missed some
regex_ports = self._port_pattern.findall(content)
for p, proto in regex_ports:
if (p, proto) not in ports:
ports.append((p, proto))
for port, proto in ports:
# Handle simple port field (for quick single-service notes or legacy data)
if metadata.get("port") and not metadata.get("services"):
port_str = str(metadata["port"])
proto = "tcp"
if "/" in port_str:
port_str, proto = port_str.split("/")
for host_id in target_hosts:
service_id = f"service:{host_id}:{port}"
# Add URL to label if present
label = f"{port}/{proto}"
service_id = f"service:{host_id}:{port_str}"
label = f"{port_str}/{proto}"
if metadata.get("url"):
label += f" ({metadata['url']})"
@@ -346,7 +310,7 @@ class ShadowGraph:
if status in ["closed", "filtered"]:
return
# Filter related_hosts: If we have explicit target metadata, ONLY use that.
# Target is validated for vulnerability category, so we can trust it
target_hosts = related_hosts
if metadata.get("target"):
target_ip = metadata["target"]
@@ -356,14 +320,14 @@ class ShadowGraph:
vuln_id = f"vuln:{key}"
# Try to extract CVE from metadata or regex
# Get label from CVE or first weakness ID
label = "Vulnerability"
if metadata.get("cve"):
label = metadata["cve"]
else:
cve_match = re.search(r"CVE-\d{4}-\d{4,7}", content, re.IGNORECASE)
if cve_match:
label = cve_match.group(0)
elif metadata.get("weaknesses") and len(metadata["weaknesses"]) > 0:
# Use first weakness ID as label
first_weakness = metadata["weaknesses"][0]
label = first_weakness.get("id", "Vulnerability")
self._add_node(vuln_id, "vulnerability", label)
@@ -392,16 +356,6 @@ class ShadowGraph:
)
# Insight 2: High Value Targets (Hosts with many open ports/vulns/endpoints)
high_value_endpoints = [
"admin",
"phpmyadmin",
"phpMyAdmin",
"manager",
"console",
"webdav",
"dav",
]
for node, data in self.graph.nodes(data=True):
if data.get("type") == "host":
# Count services
@@ -443,14 +397,6 @@ class ShadowGraph:
parts.append(f"{len(vulns)} vulnerabilities")
insights.append(f"Host {data['label']} has {', '.join(parts)}.")
# Flag high-value endpoints
for ep_id in endpoints:
ep_label = self.graph.nodes[ep_id].get("label", "")
if any(hv in ep_label.lower() for hv in high_value_endpoints):
insights.append(
f"⚠️ High-value endpoint detected: {ep_label} on {data['label']}"
)
# Insight 3: Potential Pivots (Host A -> Cred -> Host B)
# Use NetworkX to find paths from Credentials to Hosts that aren't directly connected
attack_paths = self._find_attack_paths()

View File

@@ -84,6 +84,58 @@ def set_notes_file(path: Path) -> None:
_load_notes_unlocked()
# Validation schema - declarative rules for note structure
HOST_SPECIFIC_FIELDS = {"services", "endpoints", "technologies", "port"}
CATEGORY_REQUIREMENTS = {
"credential": {
"required": ["username", "target"],
"one_of": [["password", "protocol"]],
},
"vulnerability": {
"required": ["target"],
"one_of": [["cve", "weaknesses"]],
},
"finding": {
"required": ["target"],
"one_of": [["services", "endpoints", "technologies", "port"]],
},
}
def _validate_note_schema(category: str, metadata: Dict[str, Any]) -> str | None:
"""
Validate note schema based on declarative rules.
Returns:
Error message if validation fails, None if valid
"""
# Check if note has host-specific structured data
has_host_data = bool(HOST_SPECIFIC_FIELDS & metadata.keys())
# If note has host-specific data, require target
if has_host_data and not metadata.get("target"):
fields = ", ".join(f"'{f}'" for f in HOST_SPECIFIC_FIELDS if f in metadata)
return f"Error: 'target' field is required when providing host-specific data ({fields})."
# Apply category-specific validation rules
if category in CATEGORY_REQUIREMENTS:
rules = CATEGORY_REQUIREMENTS[category]
# Check required fields
for field in rules.get("required", []):
if not metadata.get(field):
return f"Error: '{field}' field is required for category '{category}'."
# Check one_of constraints (at least one field from each group must be present)
for field_group in rules.get("one_of", []):
if not any(metadata.get(field) for field in field_group):
field_list = "' or '".join(field_group)
return f"Error: At least one of '{field_list}' is required for category '{category}'."
return None
@register_tool(
name="notes",
description="Manage persistent notes for key findings. Actions: create, read, update, delete, list.",
@@ -241,6 +293,11 @@ async def notes(arguments: dict, runtime) -> str:
if key in _notes:
return f"Error: note '{key}' already exists. Use 'update' to modify."
# Validate schema based on category
validation_error = _validate_note_schema(category, metadata)
if validation_error:
return validation_error
_notes[key] = {
"content": value,
"category": category,
@@ -274,6 +331,11 @@ async def notes(arguments: dict, runtime) -> str:
existed = key in _notes
# Validate schema based on category
validation_error = _validate_note_schema(category, metadata)
if validation_error:
return validation_error
# Merge metadata if updating? For now, overwrite to keep it simple and consistent with content
_notes[key] = {
"content": value,