refactor: use workspaces.validation utilities for target extraction and scope checks

This commit is contained in:
giveen
2026-01-19 10:39:03 -07:00
parent 14ec8af4a4
commit 63233dc392

View File

@@ -6,6 +6,7 @@ from typing import TYPE_CHECKING, Any, AsyncIterator, List, Optional
from ..config.constants import AGENT_MAX_ITERATIONS
from ..workspaces.manager import TargetManager, WorkspaceManager
from ..workspaces import validation
from .state import AgentState, AgentStateManager
if TYPE_CHECKING:
@@ -441,97 +442,14 @@ class BaseAgent(ABC):
wm = WorkspaceManager()
active = wm.get_active()
def _gather_candidate_targets(obj) -> list:
"""Extract candidate target strings from arguments (shallow)."""
candidates = []
if isinstance(obj, str):
candidates.append(obj)
elif isinstance(obj, dict):
for k, v in obj.items():
if k.lower() in (
"target",
"host",
"hostname",
"ip",
"address",
"url",
"hosts",
"targets",
):
if isinstance(v, (list, tuple)):
for it in v:
if isinstance(it, str):
candidates.append(it)
elif isinstance(v, str):
candidates.append(v)
return candidates
def _is_target_in_scope(candidate: str, allowed: list) -> bool:
"""Check if candidate target is covered by any allowed target (IP/CIDR/hostname)."""
import ipaddress
try:
# normalize candidate
norm = TargetManager.normalize_target(candidate)
except Exception:
return False
# If candidate is IP or CIDR, handle appropriately
try:
if "/" in norm:
cand_net = ipaddress.ip_network(norm, strict=False)
# If any allowed contains this network or equals it
for a in allowed:
try:
if "/" in a:
an = ipaddress.ip_network(a, strict=False)
if cand_net.subnet_of(an) or cand_net == an:
return True
else:
# allowed is IP or hostname; only accept if allowed is
# a single IP that exactly matches a single-address candidate
try:
allowed_ip = ipaddress.ip_address(a)
except Exception:
# not an IP (likely hostname) - skip
continue
# If candidate network represents exactly one address,
# allow it when that address equals the allowed IP
if cand_net.num_addresses == 1 and cand_net.network_address == allowed_ip:
return True
except Exception:
continue
return False
else:
cand_ip = ipaddress.ip_address(norm)
for a in allowed:
try:
if "/" in a:
an = ipaddress.ip_network(a, strict=False)
if cand_ip in an:
return True
else:
if TargetManager.normalize_target(a) == norm:
return True
except Exception:
# hostname allowed entries fall through
if isinstance(a, str) and a.lower() == norm.lower():
return True
return False
except Exception:
# candidate is likely hostname
for a in allowed:
if a.lower() == norm.lower():
return True
return False
# Use centralized validation helpers for target extraction and scope checks
candidates = validation.gather_candidate_targets(arguments)
out_of_scope = []
if active:
allowed = wm.list_targets(active)
candidates = _gather_candidate_targets(arguments)
for c in candidates:
try:
if not _is_target_in_scope(c, allowed):
if not validation.is_target_in_scope(c, allowed):
out_of_scope.append(c)
except Exception:
out_of_scope.append(c)