diff --git a/pentestagent/agents/base_agent.py b/pentestagent/agents/base_agent.py index d03a4d3..dc4b5ef 100644 --- a/pentestagent/agents/base_agent.py +++ b/pentestagent/agents/base_agent.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING, Any, AsyncIterator, List, Optional from ..config.constants import AGENT_MAX_ITERATIONS from ..workspaces.manager import TargetManager, WorkspaceManager +from ..workspaces import validation from .state import AgentState, AgentStateManager if TYPE_CHECKING: @@ -441,97 +442,14 @@ class BaseAgent(ABC): wm = WorkspaceManager() active = wm.get_active() - def _gather_candidate_targets(obj) -> list: - """Extract candidate target strings from arguments (shallow).""" - candidates = [] - if isinstance(obj, str): - candidates.append(obj) - elif isinstance(obj, dict): - for k, v in obj.items(): - if k.lower() in ( - "target", - "host", - "hostname", - "ip", - "address", - "url", - "hosts", - "targets", - ): - if isinstance(v, (list, tuple)): - for it in v: - if isinstance(it, str): - candidates.append(it) - elif isinstance(v, str): - candidates.append(v) - return candidates - - def _is_target_in_scope(candidate: str, allowed: list) -> bool: - """Check if candidate target is covered by any allowed target (IP/CIDR/hostname).""" - import ipaddress - - try: - # normalize candidate - norm = TargetManager.normalize_target(candidate) - except Exception: - return False - - # If candidate is IP or CIDR, handle appropriately - try: - if "/" in norm: - cand_net = ipaddress.ip_network(norm, strict=False) - # If any allowed contains this network or equals it - for a in allowed: - try: - if "/" in a: - an = ipaddress.ip_network(a, strict=False) - if cand_net.subnet_of(an) or cand_net == an: - return True - else: - # allowed is IP or hostname; only accept if allowed is - # a single IP that exactly matches a single-address candidate - try: - allowed_ip = ipaddress.ip_address(a) - except Exception: - # not an IP (likely hostname) - skip - continue - # If candidate network represents exactly one address, - # allow it when that address equals the allowed IP - if cand_net.num_addresses == 1 and cand_net.network_address == allowed_ip: - return True - except Exception: - continue - return False - else: - cand_ip = ipaddress.ip_address(norm) - for a in allowed: - try: - if "/" in a: - an = ipaddress.ip_network(a, strict=False) - if cand_ip in an: - return True - else: - if TargetManager.normalize_target(a) == norm: - return True - except Exception: - # hostname allowed entries fall through - if isinstance(a, str) and a.lower() == norm.lower(): - return True - return False - except Exception: - # candidate is likely hostname - for a in allowed: - if a.lower() == norm.lower(): - return True - return False - + # Use centralized validation helpers for target extraction and scope checks + candidates = validation.gather_candidate_targets(arguments) out_of_scope = [] if active: allowed = wm.list_targets(active) - candidates = _gather_candidate_targets(arguments) for c in candidates: try: - if not _is_target_in_scope(c, allowed): + if not validation.is_target_in_scope(c, allowed): out_of_scope.append(c) except Exception: out_of_scope.append(c)