diff --git a/pentestagent/agents/base_agent.py b/pentestagent/agents/base_agent.py index 7ecaf25..d03a4d3 100644 --- a/pentestagent/agents/base_agent.py +++ b/pentestagent/agents/base_agent.py @@ -5,11 +5,8 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any, AsyncIterator, List, Optional from ..config.constants import AGENT_MAX_ITERATIONS +from ..workspaces.manager import TargetManager, WorkspaceManager from .state import AgentState, AgentStateManager -from types import MappingProxyType - -from ..workspaces.manager import WorkspaceManager, TargetManager, WorkspaceError -from ..workspaces.utils import resolve_knowledge_paths if TYPE_CHECKING: from ..llm import LLM @@ -83,94 +80,56 @@ class BaseAgent(ABC): tools: List["Tool"], runtime: "Runtime", max_iterations: int = AGENT_MAX_ITERATIONS, + **kwargs, ): """ - Initialize the base agent. + Initialize base agent state. Args: - llm: The LLM instance for generating responses - tools: List of tools available to the agent - runtime: The runtime environment for tool execution - max_iterations: Maximum iterations before forcing stop (safety limit) + llm: LLM instance used for generation + tools: Available tool list + runtime: Runtime used for tool execution + max_iterations: Safety limit for iterations """ self.llm = llm + self.tools = tools self.runtime = runtime self.max_iterations = max_iterations + + # Agent runtime state self.state_manager = AgentStateManager() self.conversation_history: List[AgentMessage] = [] - # Each agent gets its own plan instance - from ..tools.finish import TaskPlan + # Task planning structure (used by finish tool) + try: + from ..tools.finish import TaskPlan - self._task_plan = TaskPlan() + self._task_plan = TaskPlan() + except Exception: + # Fallback simple plan structure + class _SimplePlan: + def __init__(self): + self.steps = [] + self.original_request = "" - # Attach plan to runtime so finish tool can access it - self.runtime.plan = self._task_plan + def 
clear(self): + self.steps.clear() - # Use tools as-is (finish accesses plan via runtime) - self.tools = list(tools) + def is_complete(self): + return True - @property - def workspace_context(self): - """Return a read-only workspace context built at access time. + def has_failure(self): + return False - Uses WorkspaceManager.get_active() as the single source of truth - and does not cache state between calls. - """ - wm = WorkspaceManager() - active = wm.get_active() - if not active: - return None + self._task_plan = _SimplePlan() - targets = wm.list_targets(active) + # Expose plan to runtime so tools like `finish` can access it + try: + self.runtime.plan = self._task_plan + except Exception: + pass - kp = resolve_knowledge_paths() - knowledge_scope = "workspace" if kp.get("using_workspace") else "global" - - ctx = { - "name": active, - "targets": list(targets), - "has_targets": bool(targets), - "knowledge_scope": knowledge_scope, - } - - return MappingProxyType(ctx) - - @property - def state(self) -> AgentState: - """Get current agent state.""" - return self.state_manager.current_state - - @state.setter - def state(self, value: AgentState): - """Set agent state.""" - self.state_manager.transition_to(value) - - def cleanup_after_cancel(self) -> None: - """ - Clean up agent state after a cancellation. - - Removes the cancelled request and any pending tool calls from - conversation history to prevent stale responses from contaminating - the next conversation. 
- """ - # Remove incomplete messages from the end of conversation - while self.conversation_history: - last_msg = self.conversation_history[-1] - # Remove assistant message with tool calls (incomplete tool execution) - if last_msg.role == "assistant" and last_msg.tool_calls: - self.conversation_history.pop() - # Remove orphaned tool_result messages - elif last_msg.role == "tool": - self.conversation_history.pop() - # Remove the user message that triggered the cancelled request - elif last_msg.role == "user": - self.conversation_history.pop() - break # Stop after removing the user message - else: - break - - # Reset state to idle + # Ensure agent starts idle self.state_manager.transition_to(AgentState.IDLE) @abstractmethod @@ -529,8 +488,16 @@ class BaseAgent(ABC): if cand_net.subnet_of(an) or cand_net == an: return True else: - # allowed is IP/hostname - if ipaddress.ip_address(a) == list(cand_net.hosts())[0]: + # allowed is IP or hostname; only accept if allowed is + # a single IP that exactly matches a single-address candidate + try: + allowed_ip = ipaddress.ip_address(a) + except Exception: + # not an IP (likely hostname) - skip + continue + # If candidate network represents exactly one address, + # allow it when that address equals the allowed IP + if cand_net.num_addresses == 1 and cand_net.network_address == allowed_ip: return True except Exception: continue diff --git a/pentestagent/interface/main.py b/pentestagent/interface/main.py index 391b8ee..351992d 100644 --- a/pentestagent/interface/main.py +++ b/pentestagent/interface/main.py @@ -2,6 +2,7 @@ import argparse import asyncio +from pathlib import Path from ..config.constants import AGENT_MAX_ITERATIONS, DEFAULT_MODEL from .cli import run_cli @@ -325,10 +326,13 @@ def handle_mcp_command(args: argparse.Namespace): def handle_workspace_command(args: argparse.Namespace): """Handle workspace lifecycle commands and actions.""" - import shutil - from pentestagent.workspaces.manager import WorkspaceManager, 
WorkspaceError - from pentestagent.workspaces.utils import export_workspace, import_workspace, resolve_knowledge_paths + from pentestagent.workspaces.manager import WorkspaceError, WorkspaceManager + from pentestagent.workspaces.utils import ( + export_workspace, + import_workspace, + resolve_knowledge_paths, + ) wm = WorkspaceManager() @@ -400,14 +404,32 @@ def handle_workspace_command(args: argparse.Namespace): return if action == "note": - # Append operator note to active workspace (or specified) - name = rest[0] if rest and not rest[0].startswith("--") else wm.get_active() + # Append operator note to active workspace (or specified via --workspace/-w) + active = wm.get_active() + name = active + + text_parts = rest or [] + i = 0 + # Parse optional workspace selector flags before the note text. + while i < len(text_parts): + part = text_parts[i] + if part in ("--workspace", "-w"): + if i + 1 >= len(text_parts): + print("Usage: workspace note [--workspace NAME] <text>") + return + name = text_parts[i + 1] + i += 2 + continue + # First non-option token marks the start of the note text + break + if not name: print("No active workspace. 
Set one with /workspace <name>.") return - text = " ".join(rest[1:]) if rest and rest[0] == name else " ".join(rest) + + text = " ".join(text_parts[i:]) if not text: - print("Usage: workspace note <text>") + print("Usage: workspace note [--workspace NAME] <text>") return try: wm.set_operator_note(name, text) @@ -501,7 +523,7 @@ def handle_workspaces_list(): def handle_target_command(args: argparse.Namespace): """Handle target add/list commands.""" - from pentestagent.workspaces.manager import WorkspaceManager, WorkspaceError + from pentestagent.workspaces.manager import WorkspaceError, WorkspaceManager wm = WorkspaceManager() active = wm.get_active() diff --git a/pentestagent/interface/notifier.py b/pentestagent/interface/notifier.py new file mode 100644 index 0000000..ae01ead --- /dev/null +++ b/pentestagent/interface/notifier.py @@ -0,0 +1,40 @@ +"""Simple notifier bridge for UI notifications. + +Modules can call `notify(level, message)` to emit operator-visible +notifications. A UI (TUI) may register a callback via `register_callback()` +to receive notifications and display them. If no callback is registered, +notifications are logged. +""" +import logging +from typing import Callable, Optional + +_callback: Optional[Callable[[str, str], None]] = None + + +def register_callback(cb: Callable[[str, str], None]) -> None: + """Register a callback to receive notifications. + + Callback receives (level, message). + """ + global _callback + _callback = cb + + +def notify(level: str, message: str) -> None: + """Emit a notification. 
If UI callback registered, call it; otherwise log.""" + global _callback + if _callback: + try: + _callback(level, message) + return + except Exception: + logging.getLogger(__name__).exception("Notifier callback failed") + + # Fallback to logging + log = logging.getLogger("pentestagent.notifier") + if level.lower() in ("error", "critical"): + log.error(message) + elif level.lower() in ("warn", "warning"): + log.warning(message) + else: + log.info(message) diff --git a/pentestagent/interface/tui.py b/pentestagent/interface/tui.py index 6282cfa..6bd22ec 100644 --- a/pentestagent/interface/tui.py +++ b/pentestagent/interface/tui.py @@ -1194,6 +1194,14 @@ class PentestAgentTUI(App): async def on_mount(self) -> None: """Initialize on mount""" + # Register notifier callback so other modules can emit operator-visible messages + try: + from .notifier import register_callback + + register_callback(self._notifier_callback) + except Exception: + pass + # Call the textual worker - decorator returns a Worker, not a coroutine _ = cast(Any, self._initialize_agent()) @@ -1340,6 +1348,37 @@ class PentestAgentTUI(App): except Exception: pass + def _show_notification(self, level: str, message: str) -> None: + """Display a short operator-visible notification in the chat area.""" + try: + # Prepend a concise system message so it is visible in the chat + prefix = "[!]" if level.lower() in ("error", "critical") else "[!]" + self._add_system(f"{prefix} {message}") + # Set status bar to error briefly for emphasis + if level.lower() in ("error", "critical"): + self._set_status("error") + except Exception: + pass + + def _notifier_callback(self, level: str, message: str) -> None: + """Callback wired to `pentestagent.interface.notifier`. + + This will be registered on mount so other modules can emit notifications. + """ + try: + # textual apps typically run in the main thread; try to schedule update + # using call_from_thread if available, otherwise call directly. 
+ if hasattr(self, "call_from_thread"): + try: + self.call_from_thread(self._show_notification, level, message) + return + except Exception: + # Fall through to direct call + pass + self._show_notification(level, message) + except Exception: + pass + def _add_message(self, widget: Static) -> None: """Add a message widget to chat""" try: @@ -1798,9 +1837,12 @@ Be concise. Use the actual data from notes.""" elif cmd_original.startswith("/workspace"): # Support lightweight workspace management from the TUI try: - from pentestagent.workspaces.manager import WorkspaceManager, WorkspaceError + + from pentestagent.workspaces.manager import ( + WorkspaceError, + WorkspaceManager, + ) from pentestagent.workspaces.utils import resolve_knowledge_paths - from pathlib import Path wm = WorkspaceManager() rest = cmd_original[len("/workspace") :].strip() @@ -1869,13 +1911,27 @@ Be concise. Use the actual data from notes.""" return if verb == "note": - name = parts[1] if len(parts) > 1 and not parts[1].startswith("--") else wm.get_active() + # By default, use the active workspace; allow explicit override via --workspace/-w. + name = wm.get_active() + i = 1 + # Parse optional workspace selector flags before the note text. + while i < len(parts): + part = parts[i] + if part in ("--workspace", "-w"): + if i + 1 >= len(parts): + self._add_system("Usage: /workspace note [--workspace NAME] <text>") + return + name = parts[i + 1] + i += 2 + continue + # First non-option token marks the start of the note text + break if not name: self._add_system("No active workspace. 
Set one with /workspace <name>.") return - text = " ".join(parts[1:]) if len(parts) > 1 and parts[1] == name else " ".join(parts[1:]) + text = " ".join(parts[i:]) if not text: - self._add_system("Usage: /workspace note <text>") + self._add_system("Usage: /workspace note [--workspace NAME] <text>") return try: wm.set_operator_note(name, text) diff --git a/pentestagent/knowledge/indexer.py b/pentestagent/knowledge/indexer.py index 3f4ce73..c04c252 100644 --- a/pentestagent/knowledge/indexer.py +++ b/pentestagent/knowledge/indexer.py @@ -5,8 +5,8 @@ from dataclasses import dataclass from pathlib import Path from typing import Any, List -from .rag import Document from ..workspaces.utils import resolve_knowledge_paths +from .rag import Document @dataclass diff --git a/pentestagent/knowledge/rag.py b/pentestagent/knowledge/rag.py index 2523a96..3616d76 100644 --- a/pentestagent/knowledge/rag.py +++ b/pentestagent/knowledge/rag.py @@ -1,14 +1,15 @@ """RAG (Retrieval Augmented Generation) engine for PentestAgent.""" import json +import logging from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, List, Optional import numpy as np -from .embeddings import get_embeddings from ..workspaces.utils import resolve_knowledge_paths +from .embeddings import get_embeddings @dataclass @@ -84,12 +85,26 @@ class RAGEngine: try: self.load_index(idx_path) return - except Exception: - # Fall through to re-index if loading fails - pass - except Exception: - # Non-fatal — continue to index from sources - pass + except Exception as e: + logging.getLogger(__name__).exception( + "Failed to load persisted RAG index at %s, will re-index: %s", + idx_path, + e, + ) + try: + from ..interface.notifier import notify + + notify( + "warning", + f"Failed to load persisted RAG index at {idx_path}: {e}", + ) + except Exception: + pass + except Exception as e: + # Non-fatal — continue to index from sources, but log the error + logging.getLogger(__name__).exception( + "Error while checking 
for persisted workspace index: %s", e + ) # Process all files in knowledge directory if sources_base.exists(): @@ -133,7 +148,9 @@ class RAGEngine: ) ) except Exception as e: - print(f"[RAG] Error processing {file}: {e}") + logging.getLogger(__name__).exception( + "[RAG] Error processing %s: %s", file, e + ) self.documents = chunks @@ -161,11 +178,26 @@ class RAGEngine: idx_path = emb_dir / "index.pkl" try: self.save_index(idx_path) + except Exception as e: + logging.getLogger(__name__).exception( + "Failed to save RAG index to %s: %s", idx_path, e + ) + try: + from ..interface.notifier import notify + + notify("warning", f"Failed to save RAG index to {idx_path}: {e}") + except Exception: + pass + except Exception as e: + logging.getLogger(__name__).exception( + "Error while attempting to persist RAG index: %s", e + ) + try: + from ..interface.notifier import notify + + notify("warning", f"Error while attempting to persist RAG index: {e}") except Exception: - # ignore save failures pass - except Exception: - pass def _chunk_text( self, text: str, source: str, chunk_size: int = 1000, overlap: int = 200 diff --git a/pentestagent/mcp/hexstrike_adapter.py b/pentestagent/mcp/hexstrike_adapter.py index fbc7601..97bc483 100644 --- a/pentestagent/mcp/hexstrike_adapter.py +++ b/pentestagent/mcp/hexstrike_adapter.py @@ -12,11 +12,10 @@ operates. import asyncio import os import shutil -import sys -from pathlib import Path -from typing import Optional import signal import time +from pathlib import Path +from typing import Optional try: import aiohttp @@ -27,7 +26,6 @@ except Exception: from ..workspaces.utils import get_loot_file - class HexstrikeAdapter: """Manage a vendored HexStrike server under `third_party/hexstrike`. 
diff --git a/pentestagent/mcp/manager.py b/pentestagent/mcp/manager.py index bd4f533..55bb029 100644 --- a/pentestagent/mcp/manager.py +++ b/pentestagent/mcp/manager.py @@ -13,9 +13,9 @@ Uses standard MCP configuration format: """ import asyncio +import atexit import json import os -import atexit import signal from dataclasses import dataclass, field from pathlib import Path @@ -223,7 +223,7 @@ class MCPManager: async def _stop_started_adapters_and_disconnect(self) -> None: # Stop any adapters we started - for name, adapter in list(self._started_adapters.items()): + for _name, adapter in list(self._started_adapters.items()): try: stop = getattr(adapter, "stop", None) if stop: diff --git a/pentestagent/mcp/metasploit_adapter.py b/pentestagent/mcp/metasploit_adapter.py index f83c4ff..93437ae 100644 --- a/pentestagent/mcp/metasploit_adapter.py +++ b/pentestagent/mcp/metasploit_adapter.py @@ -10,10 +10,10 @@ health check on a configurable port. import asyncio import os import shutil +import signal +import time from pathlib import Path from typing import Optional -import time -import signal try: import aiohttp @@ -141,7 +141,8 @@ class MetasploitAdapter: if not self._msfrpcd_proc or not self._msfrpcd_proc.stdout: return try: - with LOG_FILE.open("ab") as fh: + log_file = get_loot_file("artifacts/msfrpcd.log") + with log_file.open("ab") as fh: while True: line = await self._msfrpcd_proc.stdout.readline() if not line: diff --git a/pentestagent/mcp/transport.py b/pentestagent/mcp/transport.py index d8d509a..58d80eb 100644 --- a/pentestagent/mcp/transport.py +++ b/pentestagent/mcp/transport.py @@ -371,11 +371,11 @@ class SSETransport(MCPTransport): # End of event; process accumulated lines event_name = None data_lines: list[str] = [] - for l in event_lines: - if l.startswith("event:"): - event_name = l.split(":", 1)[1].strip() - elif l.startswith("data:"): - data_lines.append(l.split(":", 1)[1].lstrip()) + for evt_line in event_lines: + if evt_line.startswith("event:"): + 
event_name = evt_line.split(":", 1)[1].strip() + elif evt_line.startswith("data:"): + data_lines.append(evt_line.split(":", 1)[1].lstrip()) if data_lines: data_text = "\n".join(data_lines) diff --git a/pentestagent/runtime/runtime.py b/pentestagent/runtime/runtime.py index fcbb677..355bf34 100644 --- a/pentestagent/runtime/runtime.py +++ b/pentestagent/runtime/runtime.py @@ -4,7 +4,6 @@ import platform import shutil from abc import ABC, abstractmethod from dataclasses import dataclass, field -from pathlib import Path from typing import TYPE_CHECKING, List, Optional if TYPE_CHECKING: @@ -654,7 +653,6 @@ class LocalRuntime(Runtime): elif action == "screenshot": import time import uuid - from pathlib import Path # Navigate first if URL provided if kwargs.get("url"): diff --git a/pentestagent/workspaces/__init__.py b/pentestagent/workspaces/__init__.py index bb6cc20..5713c7b 100644 --- a/pentestagent/workspaces/__init__.py +++ b/pentestagent/workspaces/__init__.py @@ -1,3 +1,3 @@ -from .manager import WorkspaceManager, TargetManager, WorkspaceError +from .manager import TargetManager, WorkspaceError, WorkspaceManager __all__ = ["WorkspaceManager", "TargetManager", "WorkspaceError"] diff --git a/pentestagent/workspaces/manager.py b/pentestagent/workspaces/manager.py index b7a869f..6ac7e91 100644 --- a/pentestagent/workspaces/manager.py +++ b/pentestagent/workspaces/manager.py @@ -6,10 +6,11 @@ Design goals: - No in-memory caching: all operations read/write files directly - Lightweight hostname validation; accept IPs, CIDRs, hostnames """ -from pathlib import Path +import ipaddress +import logging import re import time -import ipaddress +from pathlib import Path from typing import List import yaml @@ -50,7 +51,7 @@ class TargetManager: # fallback to hostname validation (light) if TargetManager.HOST_RE.match(v) and ".." 
not in v: return v.lower() - raise WorkspaceError(f"Invalid target: {value}") + raise WorkspaceError(f"Invalid target: {value}") from None @staticmethod def validate(value: str) -> bool: @@ -118,7 +119,7 @@ class WorkspaceManager: data.setdefault("targets", []) return data except Exception as e: - raise WorkspaceError(f"Failed to read meta for {name}: {e}") + raise WorkspaceError(f"Failed to read meta for {name}: {e}") from e def _write_meta(self, name: str, meta: dict): mp = self.meta_path(name) @@ -138,9 +139,19 @@ class WorkspaceManager: meta.setdefault("operator_notes", "") meta.setdefault("tool_runs", []) self._write_meta(name, meta) - except Exception: - # Non-fatal - don't block activation on meta write errors - pass + except Exception as e: + # Non-fatal - don't block activation on meta write errors, but log for visibility + logging.getLogger(__name__).exception( + "Failed to update meta.yaml for workspace '%s': %s", name, e + ) + try: + # Emit operator-visible notification if UI present + from ..interface.notifier import notify + + notify("warning", f"Failed to update workspace meta for '{name}': {e}") + except Exception: + # ignore notifier failures + pass def set_operator_note(self, name: str, note: str) -> dict: """Append or set operator_notes for a workspace (plain text).""" diff --git a/pentestagent/workspaces/utils.py b/pentestagent/workspaces/utils.py index e107966..dddf29c 100644 --- a/pentestagent/workspaces/utils.py +++ b/pentestagent/workspaces/utils.py @@ -3,8 +3,9 @@ All functions are file-backed and do not cache the active workspace selection. This module will emit a single warning per run if no active workspace is set. 
""" -from pathlib import Path import logging +import shutil +from pathlib import Path from typing import Optional from .manager import WorkspaceManager @@ -59,9 +60,16 @@ def resolve_knowledge_paths(root: Optional[Path] = None) -> dict: if workspace_base and workspace_base.exists(): # prefer workspace if it has any content (explicit opt-in) try: - if any(workspace_base.rglob("*")): + # Use a non-recursive check to avoid walking the entire directory tree + if any(workspace_base.iterdir()): use_workspace = True - except Exception: + # Also allow an explicit opt-in marker file .use_workspace + elif (workspace_base / ".use_workspace").exists(): + use_workspace = True + except Exception as e: + logging.getLogger(__name__).exception( + "Error while checking workspace knowledge directory: %s", e + ) use_workspace = False if use_workspace: @@ -162,14 +170,20 @@ def import_workspace(archive: Path, root: Optional[Path] = None) -> str: candidate_root = p / name break if candidate_root and candidate_root.exists(): - # move candidate_root to dest + # move candidate_root to dest (use shutil.move to support cross-filesystem) dest.parent.mkdir(parents=True, exist_ok=True) - candidate_root.replace(dest) + try: + shutil.move(str(candidate_root), str(dest)) + except Exception as e: + raise RuntimeError(f"Failed to move workspace subtree into place: {e}") from e else: # Otherwise, assume contents are directly the workspace folder # move the parent of meta_file (or its containing dir) src = meta_file.parent dest.parent.mkdir(parents=True, exist_ok=True) - src.replace(dest) + try: + shutil.move(str(src), str(dest)) + except Exception as e: + raise RuntimeError(f"Failed to move extracted workspace into place: {e}") from e return name diff --git a/pentestagent/workspaces/validation.py b/pentestagent/workspaces/validation.py new file mode 100644 index 0000000..6b44906 --- /dev/null +++ b/pentestagent/workspaces/validation.py @@ -0,0 +1,108 @@ +"""Workspace target validation utilities. 
+ +Provides helpers to extract candidate targets from arbitrary tool arguments +and to determine whether a candidate target is covered by the allowed +workspace targets (IP, CIDR, hostname). +""" +import ipaddress +import logging +from typing import Any, List + +from .manager import TargetManager + + +def gather_candidate_targets(obj: Any) -> List[str]: + """Extract candidate target strings from arguments (shallow). + + This intentionally performs a shallow inspection to keep the function + fast and predictable; nested structures should be handled by callers + if required. + """ + candidates: List[str] = [] + if isinstance(obj, str): + candidates.append(obj) + elif isinstance(obj, dict): + for k, v in obj.items(): + if k.lower() in ( + "target", + "host", + "hostname", + "ip", + "address", + "url", + "hosts", + "targets", + ): + if isinstance(v, (list, tuple)): + for it in v: + if isinstance(it, str): + candidates.append(it) + elif isinstance(v, str): + candidates.append(v) + return candidates + + +def is_target_in_scope(candidate: str, allowed: List[str]) -> bool: + """Check whether `candidate` is covered by any entry in `allowed`. + + Allowed entries may be IPs, CIDRs, or hostnames/labels. Candidate may + also be an IP, CIDR, or hostname. The function normalizes inputs and + performs robust comparisons for networks and addresses. 
+ """ + try: + norm = TargetManager.normalize_target(candidate) + except Exception: + return False + + # If candidate is a network (contains '/'), treat as network + try: + if "/" in norm: + cand_net = ipaddress.ip_network(norm, strict=False) + for a in allowed: + try: + if "/" in a: + an = ipaddress.ip_network(a, strict=False) + if cand_net.subnet_of(an) or cand_net == an: + return True + else: + # allowed is IP or hostname; accept only when candidate + # network represents exactly one address equal to allowed IP + try: + allowed_ip = ipaddress.ip_address(a) + except Exception: + # not an IP (likely hostname) - skip + continue + if cand_net.num_addresses == 1 and cand_net.network_address == allowed_ip: + return True + except Exception: + continue + return False + else: + # candidate is a single IP/hostname + try: + cand_ip = ipaddress.ip_address(norm) + for a in allowed: + try: + if "/" in a: + an = ipaddress.ip_network(a, strict=False) + if cand_ip in an: + return True + else: + if TargetManager.normalize_target(a) == norm: + return True + except Exception: + if isinstance(a, str) and a.lower() == norm.lower(): + return True + return False + except Exception: + # candidate is likely a hostname; compare case-insensitively + for a in allowed: + try: + if a.lower() == norm.lower(): + return True + except Exception: + continue + return False + except Exception as e: + logging.getLogger(__name__).exception("Error checking target scope: %s", e) + return False diff --git a/pyproject.toml b/pyproject.toml index 51f5ddf..1146ac8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -126,6 +126,8 @@ known_first_party = ["pentestagent"] line-length = 88 target-version = "py310" +exclude = ["third_party/"] + [tool.ruff.lint] select = [ "E", # pycodestyle errors diff --git a/tests/test_import_workspace.py b/tests/test_import_workspace.py new file mode 100644 index 0000000..3ee9786 --- /dev/null +++ b/tests/test_import_workspace.py @@ -0,0 +1,83 @@ +import tarfile +from pathlib 
import Path + +import pytest + +from pentestagent.workspaces.utils import import_workspace + + +def make_tar_with_dir(source_dir: Path, archive_path: Path, store_subpath: Path = None): + # Create a tar.gz archive containing the contents of source_dir. + with tarfile.open(archive_path, "w:gz") as tf: + for p in source_dir.rglob("*"): + rel = p.relative_to(source_dir.parent) + # Optionally store paths under a custom subpath + arcname = str(rel) + if store_subpath: + # Prepend the store_subpath (e.g., workspaces/name/...) + arcname = str(store_subpath / p.relative_to(source_dir)) + tf.add(str(p), arcname=arcname) + + +def test_import_workspace_nested(tmp_path): + # Create a workspace dir structure under a temporary dir + src_root = tmp_path / "src" + ws_name = "import-test" + ws_dir = src_root / "workspaces" / ws_name + ws_dir.mkdir(parents=True) + # write meta.yaml + meta = ws_dir / "meta.yaml" + meta.write_text("name: import-test\n") + # add a file + (ws_dir / "notes.txt").write_text("hello") + + archive = tmp_path / "ws_nested.tar.gz" + # Create archive that stores workspaces//... 
+ make_tar_with_dir(ws_dir, archive, store_subpath=Path("workspaces") / ws_name) + + dest_root = tmp_path / "dest" + dest_root.mkdir() + + name = import_workspace(archive, root=dest_root) + assert name == ws_name + dest_ws = dest_root / "workspaces" / ws_name + assert dest_ws.exists() + assert (dest_ws / "meta.yaml").exists() + + +def test_import_workspace_flat(tmp_path): + # Create a folder that is directly the workspace (not nested under workspaces/) + src = tmp_path / "srcflat" + src.mkdir() + (src / "meta.yaml").write_text("name: flat-test\n") + (src / "data.txt").write_text("x") + + archive = tmp_path / "ws_flat.tar.gz" + # Archive the src folder contents directly (no workspaces/ prefix) + with tarfile.open(archive, "w:gz") as tf: + for p in src.rglob("*"): + tf.add(str(p), arcname=str(p.relative_to(src.parent))) + + dest_root = tmp_path / "dest2" + dest_root.mkdir() + + name = import_workspace(archive, root=dest_root) + assert name == "flat-test" + assert (dest_root / "workspaces" / "flat-test" / "meta.yaml").exists() + + +def test_import_workspace_missing_meta(tmp_path): + # Archive without meta.yaml + src = tmp_path / "empty" + src.mkdir() + (src / "file.txt").write_text("x") + archive = tmp_path / "no_meta.tar.gz" + with tarfile.open(archive, "w:gz") as tf: + for p in src.rglob("*"): + tf.add(str(p), arcname=str(p.relative_to(src.parent))) + + dest_root = tmp_path / "dest3" + dest_root.mkdir() + + with pytest.raises(ValueError): + import_workspace(archive, root=dest_root) diff --git a/tests/test_notifications.py b/tests/test_notifications.py new file mode 100644 index 0000000..f5d1690 --- /dev/null +++ b/tests/test_notifications.py @@ -0,0 +1,82 @@ + + +def test_workspace_meta_write_failure_emits_notification(tmp_path, monkeypatch): + """Simulate a meta.yaml write failure and ensure notifier receives a warning.""" + from pentestagent.interface import notifier + from pentestagent.workspaces.manager import WorkspaceManager + + captured = [] + + def cb(level, 
message): + captured.append((level, message)) + + notifier.register_callback(cb) + + wm = WorkspaceManager(root=tmp_path) + # Create workspace first so initial meta is written successfully + wm.create("testws") + + # Patch _write_meta to raise when called during set_active's meta update + def bad_write(self, name, meta): + raise RuntimeError("disk error") + + monkeypatch.setattr(WorkspaceManager, "_write_meta", bad_write) + + # Calling set_active should attempt to update meta and trigger notification + wm.set_active("testws") + + assert len(captured) >= 1 + # Find a warning notification + assert any("Failed to update workspace meta" in m for _, m in captured) + + +def test_rag_index_save_failure_emits_notification(tmp_path, monkeypatch): + """Simulate RAG save failure during index persistence and ensure notifier gets a warning.""" + from pentestagent.interface import notifier + from pentestagent.knowledge.rag import RAGEngine + + captured = [] + + def cb(level, message): + captured.append((level, message)) + + notifier.register_callback(cb) + + # Prepare a small knowledge tree under tmp_path + ws = tmp_path / "workspaces" / "ws1" + src = ws / "knowledge" / "sources" + src.mkdir(parents=True, exist_ok=True) + f = src / "doc.txt" + f.write_text("hello world") + + + # Patch resolve_knowledge_paths in the RAG module to point to our tmp workspace + def fake_resolve(root=None): + return { + "using_workspace": True, + "sources": src, + "embeddings": ws / "knowledge" / "embeddings", + } + + monkeypatch.setattr("pentestagent.knowledge.rag.resolve_knowledge_paths", fake_resolve) + + # Ensure embeddings generation returns deterministic array (avoid external calls) + import numpy as np + + monkeypatch.setattr( + "pentestagent.knowledge.rag.get_embeddings", + lambda texts, model=None: np.zeros((len(texts), 8)), + ) + + # Patch save_index to raise + def bad_save(self, path): + raise RuntimeError("write failed") + + monkeypatch.setattr(RAGEngine, "save_index", bad_save) + + rag = 
RAGEngine() # uses default knowledge_path -> resolve_knowledge_paths + # Force indexing which will attempt to save and trigger notifier + rag.index(force=True) + + assert len(captured) >= 1 + assert any("Failed to save RAG index" in m or "persist RAG index" in m for _, m in captured) diff --git a/tests/test_rag_workspace_integration.py b/tests/test_rag_workspace_integration.py index 1ffca3d..1848fca 100644 --- a/tests/test_rag_workspace_integration.py +++ b/tests/test_rag_workspace_integration.py @@ -1,11 +1,8 @@ -import os from pathlib import Path -import pytest - -from pentestagent.workspaces.manager import WorkspaceManager -from pentestagent.knowledge.rag import RAGEngine from pentestagent.knowledge.indexer import KnowledgeIndexer +from pentestagent.knowledge.rag import RAGEngine +from pentestagent.workspaces.manager import WorkspaceManager def test_rag_and_indexer_use_workspace(tmp_path, monkeypatch): diff --git a/tests/test_target_scope.py b/tests/test_target_scope.py new file mode 100644 index 0000000..7421135 --- /dev/null +++ b/tests/test_target_scope.py @@ -0,0 +1,63 @@ +from types import SimpleNamespace + +import pytest + +from pentestagent.agents.base_agent import BaseAgent +from pentestagent.workspaces.manager import WorkspaceManager + + +class DummyTool: + def __init__(self, name="dummy"): + self.name = name + + async def execute(self, arguments, runtime): + return "ok" + + +class SimpleAgent(BaseAgent): + def get_system_prompt(self, mode: str = "agent") -> str: + return "" + + +@pytest.mark.asyncio +async def test_ip_and_cidr_containment(tmp_path, monkeypatch): + # Use tmp_path as project root so WorkspaceManager writes here + monkeypatch.chdir(tmp_path) + + wm = WorkspaceManager(root=tmp_path) + name = "scope-test" + wm.create(name) + wm.set_active(name) + + tool = DummyTool("dummy") + agent = SimpleAgent(llm=object(), tools=[tool], runtime=SimpleNamespace()) + + # Helper to run execute_tools with a candidate target + async def 
run_with_candidate(candidate): + call = {"id": "1", "name": "dummy", "arguments": {"target": candidate}} + results = await agent._execute_tools([call]) + return results[0] + + # 1) Allowed single IP, candidate same IP + wm.add_targets(name, ["192.0.2.5"]) + res = await run_with_candidate("192.0.2.5") + assert res.success is True + + # 2) Allowed single IP, candidate single-address CIDR (/32) -> allowed + res = await run_with_candidate("192.0.2.5/32") + assert res.success is True + + # 3) Allowed CIDR, candidate IP inside -> allowed + wm.add_targets(name, ["198.51.100.0/24"]) + res = await run_with_candidate("198.51.100.25") + assert res.success is True + + # 4) Allowed CIDR, candidate subnet inside -> allowed + wm.add_targets(name, ["203.0.113.0/24"]) + res = await run_with_candidate("203.0.113.128/25") + assert res.success is True + + # 5) Allowed single IP, candidate larger network -> not allowed + wm.add_targets(name, ["192.0.2.5"]) + res = await run_with_candidate("192.0.2.0/24") + assert res.success is False diff --git a/tests/test_target_scope_edges.py b/tests/test_target_scope_edges.py new file mode 100644 index 0000000..1c407a7 --- /dev/null +++ b/tests/test_target_scope_edges.py @@ -0,0 +1,56 @@ +from pentestagent.workspaces import validation +from pentestagent.workspaces.manager import TargetManager + + +def test_ip_in_cidr_containment(): + allowed = ["10.0.0.0/8"] + assert validation.is_target_in_scope("10.1.2.3", allowed) + + +def test_cidr_within_cidr(): + allowed = ["10.0.0.0/8"] + assert validation.is_target_in_scope("10.1.0.0/16", allowed) + + +def test_cidr_equal_allowed(): + allowed = ["10.0.0.0/8"] + assert validation.is_target_in_scope("10.0.0.0/8", allowed) + + +def test_cidr_larger_than_allowed_is_out_of_scope(): + allowed = ["10.0.0.0/24"] + assert not validation.is_target_in_scope("10.0.0.0/16", allowed) + + +def test_single_ip_vs_single_address_cidr(): + allowed = ["192.168.1.5"] + # Candidate expressed as a /32 network should be allowed 
when it represents the same single address + assert validation.is_target_in_scope("192.168.1.5/32", allowed) + + +def test_hostname_case_insensitive_match(): + allowed = ["example.com"] + assert validation.is_target_in_scope("Example.COM", allowed) + + +def test_hostname_vs_ip_not_match(): + allowed = ["example.com"] + assert not validation.is_target_in_scope("93.184.216.34", allowed) + + +def test_gather_candidate_targets_shallow_behavior(): + # shallow extraction: list of strings is extracted + args = {"targets": ["1.2.3.4", "example.com"]} + assert set(validation.gather_candidate_targets(args)) == {"1.2.3.4", "example.com"} + + # nested dicts inside lists are NOT traversed by the shallow extractor + args2 = {"hosts": [{"ip": "5.6.7.8"}]} + assert validation.gather_candidate_targets(args2) == [] + + # direct string argument returns itself + assert validation.gather_candidate_targets("8.8.8.8") == ["8.8.8.8"] + + +def test_normalize_target_accepts_hostnames_and_ips(): + assert TargetManager.normalize_target("example.com") == "example.com" + assert TargetManager.normalize_target("8.8.8.8") == "8.8.8.8" diff --git a/tests/test_workspace.py b/tests/test_workspace.py index 1d7c48e..7b75d71 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -1,9 +1,8 @@ -import os from pathlib import Path import pytest -from pentestagent.workspaces.manager import WorkspaceManager, WorkspaceError +from pentestagent.workspaces.manager import WorkspaceError, WorkspaceManager def test_invalid_workspace_names(tmp_path: Path): @@ -19,7 +18,7 @@ def test_invalid_workspace_names(tmp_path: Path): def test_create_and_idempotent(tmp_path: Path): wm = WorkspaceManager(root=tmp_path) name = "eng1" - meta = wm.create(name) + wm.create(name) assert (tmp_path / "workspaces" / name).exists() assert (tmp_path / "workspaces" / name / "meta.yaml").exists() # create again should not raise and should return meta diff --git a/third_party/hexstrike/hexstrike_mcp.py 
b/third_party/hexstrike/hexstrike_mcp.py index 23b083b..c816d91 100644 --- a/third_party/hexstrike/hexstrike_mcp.py +++ b/third_party/hexstrike/hexstrike_mcp.py @@ -17,17 +17,17 @@ Architecture: MCP Client for AI agent communication with HexStrike server Framework: FastMCP integration for tool orchestration """ -import sys -import os import argparse import logging -from typing import Dict, Any, Optional -import requests +import sys import time from datetime import datetime +from typing import Any, Dict, Optional +import requests from mcp.server.fastmcp import FastMCP + class HexStrikeColors: """Enhanced color palette matching the server's ModernVisualEngine.COLORS""" @@ -447,9 +447,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"☁️ Starting Prowler {provider} security assessment") result = hexstrike_client.safe_post("api/tools/prowler", data) if result.get("success"): - logger.info(f"✅ Prowler assessment completed") + logger.info("✅ Prowler assessment completed") else: - logger.error(f"❌ Prowler assessment failed") + logger.error("❌ Prowler assessment failed") return result @mcp.tool() @@ -517,9 +517,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"☁️ Starting Scout Suite {provider} assessment") result = hexstrike_client.safe_post("api/tools/scout-suite", data) if result.get("success"): - logger.info(f"✅ Scout Suite assessment completed") + logger.info("✅ Scout Suite assessment completed") else: - logger.error(f"❌ Scout Suite assessment failed") + logger.error("❌ Scout Suite assessment failed") return result @mcp.tool() @@ -575,12 +575,12 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: "regions": regions, "additional_args": additional_args } - logger.info(f"☁️ Starting Pacu AWS exploitation") + logger.info("☁️ Starting Pacu AWS exploitation") result = hexstrike_client.safe_post("api/tools/pacu", data) if result.get("success"): - logger.info(f"✅ Pacu exploitation 
completed") + logger.info("✅ Pacu exploitation completed") else: - logger.error(f"❌ Pacu exploitation failed") + logger.error("❌ Pacu exploitation failed") return result @mcp.tool() @@ -611,12 +611,12 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: "report": report, "additional_args": additional_args } - logger.info(f"☁️ Starting kube-hunter Kubernetes scan") + logger.info("☁️ Starting kube-hunter Kubernetes scan") result = hexstrike_client.safe_post("api/tools/kube-hunter", data) if result.get("success"): - logger.info(f"✅ kube-hunter scan completed") + logger.info("✅ kube-hunter scan completed") else: - logger.error(f"❌ kube-hunter scan failed") + logger.error("❌ kube-hunter scan failed") return result @mcp.tool() @@ -642,12 +642,12 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: "output_format": output_format, "additional_args": additional_args } - logger.info(f"☁️ Starting kube-bench CIS benchmark") + logger.info("☁️ Starting kube-bench CIS benchmark") result = hexstrike_client.safe_post("api/tools/kube-bench", data) if result.get("success"): - logger.info(f"✅ kube-bench benchmark completed") + logger.info("✅ kube-bench benchmark completed") else: - logger.error(f"❌ kube-bench benchmark failed") + logger.error("❌ kube-bench benchmark failed") return result @mcp.tool() @@ -672,12 +672,12 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: "output_file": output_file, "additional_args": additional_args } - logger.info(f"🐳 Starting Docker Bench Security assessment") + logger.info("🐳 Starting Docker Bench Security assessment") result = hexstrike_client.safe_post("api/tools/docker-bench-security", data) if result.get("success"): - logger.info(f"✅ Docker Bench Security completed") + logger.info("✅ Docker Bench Security completed") else: - logger.error(f"❌ Docker Bench Security failed") + logger.error("❌ Docker Bench Security failed") return result @mcp.tool() @@ -736,9 +736,9 @@ def 
setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🛡️ Starting Falco runtime monitoring for {duration}s") result = hexstrike_client.safe_post("api/tools/falco", data) if result.get("success"): - logger.info(f"✅ Falco monitoring completed") + logger.info("✅ Falco monitoring completed") else: - logger.error(f"❌ Falco monitoring failed") + logger.error("❌ Falco monitoring failed") return result @mcp.tool() @@ -770,9 +770,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🔍 Starting Checkov IaC scan: {directory}") result = hexstrike_client.safe_post("api/tools/checkov", data) if result.get("success"): - logger.info(f"✅ Checkov scan completed") + logger.info("✅ Checkov scan completed") else: - logger.error(f"❌ Checkov scan failed") + logger.error("❌ Checkov scan failed") return result @mcp.tool() @@ -804,9 +804,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🔍 Starting Terrascan IaC scan: {iac_dir}") result = hexstrike_client.safe_post("api/tools/terrascan", data) if result.get("success"): - logger.info(f"✅ Terrascan scan completed") + logger.info("✅ Terrascan scan completed") else: - logger.error(f"❌ Terrascan scan failed") + logger.error("❌ Terrascan scan failed") return result # ============================================================================ @@ -932,9 +932,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🎯 Generating {payload_type} payload: {size} bytes") result = hexstrike_client.safe_post("api/payloads/generate", data) if result.get("success"): - logger.info(f"✅ Payload generated successfully") + logger.info("✅ Payload generated successfully") else: - logger.error(f"❌ Failed to generate payload") + logger.error("❌ Failed to generate payload") return result # ============================================================================ @@ -988,9 +988,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> 
FastMCP: logger.info(f"🐍 Executing Python script in env {env_name}") result = hexstrike_client.safe_post("api/python/execute", data) if result.get("success"): - logger.info(f"✅ Python script executed successfully") + logger.info("✅ Python script executed successfully") else: - logger.error(f"❌ Python script execution failed") + logger.error("❌ Python script execution failed") return result # ============================================================================ @@ -1167,9 +1167,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🔐 Starting John the Ripper: {hash_file}") result = hexstrike_client.safe_post("api/tools/john", data) if result.get("success"): - logger.info(f"✅ John the Ripper completed") + logger.info("✅ John the Ripper completed") else: - logger.error(f"❌ John the Ripper failed") + logger.error("❌ John the Ripper failed") return result @mcp.tool() @@ -1337,9 +1337,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🔐 Starting Hashcat attack: mode {attack_mode}") result = hexstrike_client.safe_post("api/tools/hashcat", data) if result.get("success"): - logger.info(f"✅ Hashcat attack completed") + logger.info("✅ Hashcat attack completed") else: - logger.error(f"❌ Hashcat attack failed") + logger.error("❌ Hashcat attack failed") return result @mcp.tool() @@ -1690,9 +1690,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🔍 Starting arp-scan: {target if target else 'local network'}") result = hexstrike_client.safe_post("api/tools/arp-scan", data) if result.get("success"): - logger.info(f"✅ arp-scan completed") + logger.info("✅ arp-scan completed") else: - logger.error(f"❌ arp-scan failed") + logger.error("❌ arp-scan failed") return result @mcp.tool() @@ -1727,9 +1727,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🔍 Starting Responder on interface: {interface}") result = 
hexstrike_client.safe_post("api/tools/responder", data) if result.get("success"): - logger.info(f"✅ Responder completed") + logger.info("✅ Responder completed") else: - logger.error(f"❌ Responder failed") + logger.error("❌ Responder failed") return result @mcp.tool() @@ -1755,9 +1755,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🧠 Starting Volatility analysis: {plugin}") result = hexstrike_client.safe_post("api/tools/volatility", data) if result.get("success"): - logger.info(f"✅ Volatility analysis completed") + logger.info("✅ Volatility analysis completed") else: - logger.error(f"❌ Volatility analysis failed") + logger.error("❌ Volatility analysis failed") return result @mcp.tool() @@ -1787,9 +1787,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🚀 Starting MSFVenom payload generation: {payload}") result = hexstrike_client.safe_post("api/tools/msfvenom", data) if result.get("success"): - logger.info(f"✅ MSFVenom payload generated") + logger.info("✅ MSFVenom payload generated") else: - logger.error(f"❌ MSFVenom payload generation failed") + logger.error("❌ MSFVenom payload generation failed") return result # ============================================================================ @@ -2071,9 +2071,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🔧 Starting Pwntools exploit: {exploit_type}") result = hexstrike_client.safe_post("api/tools/pwntools", data) if result.get("success"): - logger.info(f"✅ Pwntools exploit completed") + logger.info("✅ Pwntools exploit completed") else: - logger.error(f"❌ Pwntools exploit failed") + logger.error("❌ Pwntools exploit failed") return result @mcp.tool() @@ -2097,9 +2097,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🔧 Starting one_gadget analysis: {libc_path}") result = hexstrike_client.safe_post("api/tools/one-gadget", data) if result.get("success"): - logger.info(f"✅ 
one_gadget analysis completed") + logger.info("✅ one_gadget analysis completed") else: - logger.error(f"❌ one_gadget analysis failed") + logger.error("❌ one_gadget analysis failed") return result @mcp.tool() @@ -2157,9 +2157,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🔧 Starting GDB-PEDA analysis: {binary or f'PID {attach_pid}' or core_file}") result = hexstrike_client.safe_post("api/tools/gdb-peda", data) if result.get("success"): - logger.info(f"✅ GDB-PEDA analysis completed") + logger.info("✅ GDB-PEDA analysis completed") else: - logger.error(f"❌ GDB-PEDA analysis failed") + logger.error("❌ GDB-PEDA analysis failed") return result @mcp.tool() @@ -2191,9 +2191,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🔧 Starting angr analysis: {binary}") result = hexstrike_client.safe_post("api/tools/angr", data) if result.get("success"): - logger.info(f"✅ angr analysis completed") + logger.info("✅ angr analysis completed") else: - logger.error(f"❌ angr analysis failed") + logger.error("❌ angr analysis failed") return result @mcp.tool() @@ -2225,9 +2225,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🔧 Starting ropper analysis: {binary}") result = hexstrike_client.safe_post("api/tools/ropper", data) if result.get("success"): - logger.info(f"✅ ropper analysis completed") + logger.info("✅ ropper analysis completed") else: - logger.error(f"❌ ropper analysis failed") + logger.error("❌ ropper analysis failed") return result @mcp.tool() @@ -2256,9 +2256,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🔧 Starting pwninit setup: {binary}") result = hexstrike_client.safe_post("api/tools/pwninit", data) if result.get("success"): - logger.info(f"✅ pwninit setup completed") + logger.info("✅ pwninit setup completed") else: - logger.error(f"❌ pwninit setup failed") + logger.error("❌ pwninit setup failed") return result @mcp.tool() 
@@ -2667,9 +2667,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🎯 Starting Dalfox XSS scan: {url if url else 'pipe mode'}") result = hexstrike_client.safe_post("api/tools/dalfox", data) if result.get("success"): - logger.info(f"✅ Dalfox XSS scan completed") + logger.info("✅ Dalfox XSS scan completed") else: - logger.error(f"❌ Dalfox XSS scan failed") + logger.error("❌ Dalfox XSS scan failed") return result @mcp.tool() @@ -2922,7 +2922,7 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: if payload_info.get("risk_level") == "HIGH": results["summary"]["high_risk_payloads"] += 1 - logger.info(f"✅ Attack suite generated:") + logger.info("✅ Attack suite generated:") logger.info(f" ├─ Total payloads: {results['summary']['total_payloads']}") logger.info(f" ├─ High-risk payloads: {results['summary']['high_risk_payloads']}") logger.info(f" └─ Test cases: {results['summary']['test_cases']}") @@ -2967,7 +2967,7 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: endpoint_count = len(result.get("results", [])) logger.info(f"✅ API endpoint testing completed: {endpoint_count} endpoints tested") else: - logger.info(f"✅ API endpoint discovery completed") + logger.info("✅ API endpoint discovery completed") else: logger.error("❌ API fuzzing failed") @@ -3032,7 +3032,7 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: "target_url": target_url } - logger.info(f"🔍 Starting JWT security analysis") + logger.info("🔍 Starting JWT security analysis") result = hexstrike_client.safe_post("api/tools/jwt_analyzer", data) if result.get("success"): @@ -3089,7 +3089,7 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.warning(f" ├─ [{severity}] {issue_type}") if endpoint_count > 0: - logger.info(f"📊 Discovered endpoints:") + logger.info("📊 Discovered endpoints:") for endpoint in analysis.get("endpoints_found", [])[:5]: # Show first 5 method = endpoint.get("method", "GET") 
path = endpoint.get("path", "/") @@ -3183,7 +3183,7 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: "audit_coverage": "comprehensive" if len(audit_results["tests_performed"]) >= 3 else "partial" } - logger.info(f"✅ Comprehensive API audit completed:") + logger.info("✅ Comprehensive API audit completed:") logger.info(f" ├─ Tests performed: {audit_results['summary']['tests_performed']}") logger.info(f" ├─ Total vulnerabilities: {audit_results['summary']['total_vulnerabilities']}") logger.info(f" └─ Coverage: {audit_results['summary']['audit_coverage']}") @@ -3220,9 +3220,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🧠 Starting Volatility3 analysis: {plugin}") result = hexstrike_client.safe_post("api/tools/volatility3", data) if result.get("success"): - logger.info(f"✅ Volatility3 analysis completed") + logger.info("✅ Volatility3 analysis completed") else: - logger.error(f"❌ Volatility3 analysis failed") + logger.error("❌ Volatility3 analysis failed") return result @mcp.tool() @@ -3248,9 +3248,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"📁 Starting Foremost file carving: {input_file}") result = hexstrike_client.safe_post("api/tools/foremost", data) if result.get("success"): - logger.info(f"✅ Foremost carving completed") + logger.info("✅ Foremost carving completed") else: - logger.error(f"❌ Foremost carving failed") + logger.error("❌ Foremost carving failed") return result @mcp.tool() @@ -3308,9 +3308,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"📷 Starting ExifTool analysis: {file_path}") result = hexstrike_client.safe_post("api/tools/exiftool", data) if result.get("success"): - logger.info(f"✅ ExifTool analysis completed") + logger.info("✅ ExifTool analysis completed") else: - logger.error(f"❌ ExifTool analysis failed") + logger.error("❌ ExifTool analysis failed") return result @mcp.tool() @@ -3335,12 +3335,12 @@ def 
setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: "append_data": append_data, "additional_args": additional_args } - logger.info(f"🔐 Starting HashPump attack") + logger.info("🔐 Starting HashPump attack") result = hexstrike_client.safe_post("api/tools/hashpump", data) if result.get("success"): - logger.info(f"✅ HashPump attack completed") + logger.info("✅ HashPump attack completed") else: - logger.error(f"❌ HashPump attack failed") + logger.error("❌ HashPump attack failed") return result # ============================================================================ @@ -3383,9 +3383,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🕷️ Starting Hakrawler crawling: {url}") result = hexstrike_client.safe_post("api/tools/hakrawler", data) if result.get("success"): - logger.info(f"✅ Hakrawler crawling completed") + logger.info("✅ Hakrawler crawling completed") else: - logger.error(f"❌ Hakrawler crawling failed") + logger.error("❌ Hakrawler crawling failed") return result @mcp.tool() @@ -3416,12 +3416,12 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: "output_file": output_file, "additional_args": additional_args } - logger.info(f"🌐 Starting HTTPx probing") + logger.info("🌐 Starting HTTPx probing") result = hexstrike_client.safe_post("api/tools/httpx", data) if result.get("success"): - logger.info(f"✅ HTTPx probing completed") + logger.info("✅ HTTPx probing completed") else: - logger.error(f"❌ HTTPx probing failed") + logger.error("❌ HTTPx probing failed") return result @mcp.tool() @@ -3449,9 +3449,9 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: logger.info(f"🔍 Starting ParamSpider discovery: {domain}") result = hexstrike_client.safe_post("api/tools/paramspider", data) if result.get("success"): - logger.info(f"✅ ParamSpider discovery completed") + logger.info("✅ ParamSpider discovery completed") else: - logger.error(f"❌ ParamSpider discovery failed") + logger.error("❌ 
ParamSpider discovery failed") return result # ============================================================================ @@ -3486,12 +3486,12 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: "output_file": output_file, "additional_args": additional_args } - logger.info(f"🔍 Starting Burp Suite scan") + logger.info("🔍 Starting Burp Suite scan") result = hexstrike_client.safe_post("api/tools/burpsuite", data) if result.get("success"): - logger.info(f"✅ Burp Suite scan completed") + logger.info("✅ Burp Suite scan completed") else: - logger.error(f"❌ Burp Suite scan failed") + logger.error("❌ Burp Suite scan failed") return result @mcp.tool() @@ -3794,7 +3794,7 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: Returns: Server health information with tool availability and telemetry """ - logger.info(f"🏥 Checking HexStrike AI server health") + logger.info("🏥 Checking HexStrike AI server health") result = hexstrike_client.check_health() if result.get("status") == "healthy": logger.info(f"✅ Server is healthy - {result.get('total_tools_available', 0)} tools available") @@ -3810,7 +3810,7 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: Returns: Cache performance statistics """ - logger.info(f"💾 Getting cache statistics") + logger.info("💾 Getting cache statistics") result = hexstrike_client.safe_get("api/cache/stats") if "hit_rate" in result: logger.info(f"📊 Cache hit rate: {result.get('hit_rate', 'unknown')}") @@ -3824,12 +3824,12 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: Returns: Cache clear operation results """ - logger.info(f"🧹 Clearing server cache") + logger.info("🧹 Clearing server cache") result = hexstrike_client.safe_post("api/cache/clear", {}) if result.get("success"): - logger.info(f"✅ Cache cleared successfully") + logger.info("✅ Cache cleared successfully") else: - logger.error(f"❌ Failed to clear cache") + logger.error("❌ Failed to clear cache") return result 
@mcp.tool() @@ -3840,7 +3840,7 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: Returns: System performance and usage telemetry """ - logger.info(f"📈 Getting system telemetry") + logger.info("📈 Getting system telemetry") result = hexstrike_client.safe_get("api/telemetry") if "commands_executed" in result: logger.info(f"📊 Commands executed: {result.get('commands_executed', 0)}") @@ -3993,7 +3993,7 @@ def setup_mcp_server(hexstrike_client: HexStrikeClient) -> FastMCP: execution_time = result.get("execution_time", 0) logger.info(f"✅ Command completed successfully in {execution_time:.2f}s") else: - logger.warning(f"⚠️ Command completed with errors") + logger.warning("⚠️ Command completed with errors") return result except Exception as e: @@ -5433,7 +5433,7 @@ def main(): logger.debug("🔍 Debug logging enabled") # MCP compatibility: No banner output to avoid JSON parsing issues - logger.info(f"🚀 Starting HexStrike AI MCP Client v6.0") + logger.info("🚀 Starting HexStrike AI MCP Client v6.0") logger.info(f"🔗 Connecting to: {args.server}") try: diff --git a/third_party/hexstrike/hexstrike_server.py b/third_party/hexstrike/hexstrike_server.py index baa5db4..9e9182b 100644 --- a/third_party/hexstrike/hexstrike_server.py +++ b/third_party/hexstrike/hexstrike_server.py @@ -19,51 +19,39 @@ Framework: FastMCP integration for AI agent communication """ import argparse +import base64 +import hashlib import json import logging import os +import queue +import re +import shutil +import signal +import socket import subprocess import sys -import traceback import threading import time -import hashlib -import pickle -import base64 -import queue -from concurrent.futures import ThreadPoolExecutor -from datetime import datetime, timedelta -from typing import Dict, Any, Optional -from collections import OrderedDict -import shutil -import venv -import zipfile -from pathlib import Path -from flask import Flask, request, jsonify -import psutil -import signal -import 
requests -import re -import socket +import traceback import urllib.parse +import venv +from collections import OrderedDict +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field +from datetime import datetime, timedelta from enum import Enum -from typing import List, Set, Tuple -import asyncio -import aiohttp -from urllib.parse import urljoin, urlparse, parse_qs +from pathlib import Path +from typing import Any, Dict, List, Optional, Set +from urllib.parse import urljoin, urlparse + +import psutil +import requests from bs4 import BeautifulSoup -import selenium +from flask import Flask, jsonify, request from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By -from selenium.webdriver.support.ui import WebDriverWait -from selenium.webdriver.support import expected_conditions as EC -from selenium.common.exceptions import TimeoutException, WebDriverException -import mitmproxy -from mitmproxy import http as mitmhttp -from mitmproxy.tools.dump import DumpMaster -from mitmproxy.options import Options as MitmOptions # ============================================================================ # LOGGING CONFIGURATION (MUST BE FIRST) @@ -289,7 +277,7 @@ class ModernVisualEngine: dashboard_lines = [ f"{ModernVisualEngine.COLORS['PRIMARY_BORDER']}╭─────────────────────────────────────────────────────────────────────────────╮", f"│ {ModernVisualEngine.COLORS['ACCENT_LINE']}📊 HEXSTRIKE LIVE DASHBOARD{ModernVisualEngine.COLORS['PRIMARY_BORDER']} │", - f"├─────────────────────────────────────────────────────────────────────────────┤" + "├─────────────────────────────────────────────────────────────────────────────┤" ] for pid, proc_info in processes.items(): @@ -1548,12 +1536,9 @@ decision_engine = IntelligentDecisionEngine() # INTELLIGENT ERROR HANDLING AND RECOVERY SYSTEM (v11.0 ENHANCEMENT) # ============================================================================ 
-from enum import Enum from dataclasses import dataclass -from typing import Callable, Union -import traceback -import time -import random +from enum import Enum + class ErrorType(Enum): """Enumeration of different error types for intelligent handling""" @@ -3967,7 +3952,7 @@ class CTFChallengeAutomator: step_result["output"] += f"[MANUAL] {step['description']}\n" step_result["success"] = True elif tool == "custom": - step_result["output"] += f"[CUSTOM] Custom implementation required\n" + step_result["output"] += "[CUSTOM] Custom implementation required\n" step_result["success"] = True else: command = ctf_tools.get_tool_command(tool, challenge.target or challenge.name) @@ -5406,7 +5391,7 @@ class EnhancedProcessManager: if current_workers < self.process_pool.max_workers: self.process_pool._scale_up(1) - logger.info(f"📈 Auto-scaled up due to available resources and demand") + logger.info("📈 Auto-scaled up due to available resources and demand") def get_comprehensive_stats(self) -> Dict[str, Any]: """Get comprehensive system and process statistics""" @@ -5956,52 +5941,52 @@ class CVEIntelligenceManager: """Fetch latest CVEs from NVD and other real sources""" try: logger.info(f"🔍 Fetching CVEs from last {hours} hours with severity: {severity_filter}") - + # Calculate date range for CVE search end_date = datetime.now() start_date = end_date - timedelta(hours=hours) - + # Format dates for NVD API (ISO 8601 format) start_date_str = start_date.strftime('%Y-%m-%dT%H:%M:%S.000') end_date_str = end_date.strftime('%Y-%m-%dT%H:%M:%S.000') - + # NVD API endpoint nvd_url = "https://services.nvd.nist.gov/rest/json/cves/2.0" - + # Parse severity filter severity_levels = [s.strip().upper() for s in severity_filter.split(",")] - + all_cves = [] - + # Query NVD API with rate limiting compliance params = { 'lastModStartDate': start_date_str, 'lastModEndDate': end_date_str, 'resultsPerPage': 100 } - + try: # Add delay to respect NVD rate limits (6 seconds between requests for 
unauthenticated) import time - + logger.info(f"🌐 Querying NVD API: {nvd_url}") response = requests.get(nvd_url, params=params, timeout=30) - + if response.status_code == 200: nvd_data = response.json() vulnerabilities = nvd_data.get('vulnerabilities', []) - + logger.info(f"📊 Retrieved {len(vulnerabilities)} vulnerabilities from NVD") - + for vuln_item in vulnerabilities: cve_data = vuln_item.get('cve', {}) cve_id = cve_data.get('id', 'Unknown') - + # Extract CVSS scores and determine severity metrics = cve_data.get('metrics', {}) cvss_score = 0.0 severity = "UNKNOWN" - + # Try CVSS v3.1 first, then v3.0, then v2.0 if 'cvssMetricV31' in metrics and metrics['cvssMetricV31']: cvss_data = metrics['cvssMetricV31'][0]['cvssData'] @@ -6023,11 +6008,11 @@ class CVEIntelligenceManager: severity = "MEDIUM" else: severity = "LOW" - + # Filter by severity if specified if severity not in severity_levels and severity_levels != ['ALL']: continue - + # Extract description descriptions = cve_data.get('descriptions', []) description = "No description available" @@ -6035,13 +6020,13 @@ class CVEIntelligenceManager: if desc.get('lang') == 'en': description = desc.get('value', description) break - + # Extract references references = [] ref_data = cve_data.get('references', []) for ref in ref_data[:5]: # Limit to first 5 references references.append(ref.get('url', '')) - + # Extract affected software (CPE data) affected_software = [] configurations = cve_data.get('configurations', []) @@ -6059,7 +6044,7 @@ class CVEIntelligenceManager: product = parts[4] version = parts[5] if parts[5] != '*' else 'all versions' affected_software.append(f"{vendor} {product} {version}") - + cve_entry = { "cve_id": cve_id, "description": description, @@ -6071,19 +6056,19 @@ class CVEIntelligenceManager: "references": references, "source": "NVD" } - + all_cves.append(cve_entry) - + else: logger.warning(f"⚠️ NVD API returned status code: {response.status_code}") - + except 
requests.exceptions.RequestException as e: logger.error(f"❌ Error querying NVD API: {str(e)}") - + # If no CVEs found from NVD, try alternative sources or provide informative response if not all_cves: logger.info("🔄 No recent CVEs found in specified timeframe, checking for any recent critical CVEs...") - + # Try a broader search for recent critical CVEs (last 7 days) try: broader_start = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%S.000') @@ -6093,18 +6078,18 @@ class CVEIntelligenceManager: 'cvssV3Severity': 'CRITICAL', 'resultsPerPage': 20 } - + time.sleep(6) # Rate limit compliance response = requests.get(nvd_url, params=broader_params, timeout=30) - + if response.status_code == 200: nvd_data = response.json() vulnerabilities = nvd_data.get('vulnerabilities', []) - + for vuln_item in vulnerabilities[:10]: # Limit to 10 most recent cve_data = vuln_item.get('cve', {}) cve_id = cve_data.get('id', 'Unknown') - + # Extract basic info for recent critical CVEs descriptions = cve_data.get('descriptions', []) description = "No description available" @@ -6112,12 +6097,12 @@ class CVEIntelligenceManager: if desc.get('lang') == 'en': description = desc.get('value', description) break - + metrics = cve_data.get('metrics', {}) cvss_score = 0.0 if 'cvssMetricV31' in metrics and metrics['cvssMetricV31']: cvss_score = metrics['cvssMetricV31'][0]['cvssData'].get('baseScore', 0.0) - + cve_entry = { "cve_id": cve_id, "description": description, @@ -6129,14 +6114,14 @@ class CVEIntelligenceManager: "references": [f"https://nvd.nist.gov/vuln/detail/{cve_id}"], "source": "NVD (Recent Critical)" } - + all_cves.append(cve_entry) - + except Exception as broader_e: logger.warning(f"⚠️ Broader search also failed: {str(broader_e)}") - + logger.info(f"✅ Successfully retrieved {len(all_cves)} CVEs") - + return { "success": True, "cves": all_cves, @@ -6146,7 +6131,7 @@ class CVEIntelligenceManager: "data_sources": ["NVD API v2.0"], "search_period": f"{start_date_str} to 
{end_date_str}" } - + except Exception as e: logger.error(f"💥 Error fetching CVEs: {str(e)}") return { @@ -6160,16 +6145,15 @@ class CVEIntelligenceManager: """Analyze CVE exploitability using real CVE data and threat intelligence""" try: logger.info(f"🔬 Analyzing exploitability for {cve_id}") - + # Fetch detailed CVE data from NVD - nvd_url = f"https://services.nvd.nist.gov/rest/json/cves/2.0" + nvd_url = "https://services.nvd.nist.gov/rest/json/cves/2.0" params = {'cveId': cve_id} - - import time - + + try: response = requests.get(nvd_url, params=params, timeout=30) - + if response.status_code != 200: logger.warning(f"⚠️ NVD API returned status {response.status_code} for {cve_id}") return { @@ -6177,10 +6161,10 @@ class CVEIntelligenceManager: "error": f"Failed to fetch CVE data: HTTP {response.status_code}", "cve_id": cve_id } - + nvd_data = response.json() vulnerabilities = nvd_data.get('vulnerabilities', []) - + if not vulnerabilities: logger.warning(f"⚠️ No data found for CVE {cve_id}") return { @@ -6188,9 +6172,9 @@ class CVEIntelligenceManager: "error": f"CVE {cve_id} not found in NVD database", "cve_id": cve_id } - + cve_data = vulnerabilities[0].get('cve', {}) - + # Extract CVSS metrics for exploitability analysis metrics = cve_data.get('metrics', {}) cvss_score = 0.0 @@ -6200,7 +6184,7 @@ class CVEIntelligenceManager: privileges_required = "UNKNOWN" user_interaction = "UNKNOWN" exploitability_subscore = 0.0 - + # Analyze CVSS v3.1 metrics (preferred) if 'cvssMetricV31' in metrics and metrics['cvssMetricV31']: cvss_data = metrics['cvssMetricV31'][0]['cvssData'] @@ -6211,7 +6195,7 @@ class CVEIntelligenceManager: privileges_required = cvss_data.get('privilegesRequired', 'UNKNOWN') user_interaction = cvss_data.get('userInteraction', 'UNKNOWN') exploitability_subscore = cvss_data.get('exploitabilityScore', 0.0) - + elif 'cvssMetricV30' in metrics and metrics['cvssMetricV30']: cvss_data = metrics['cvssMetricV30'][0]['cvssData'] cvss_score = 
cvss_data.get('baseScore', 0.0) @@ -6221,17 +6205,17 @@ class CVEIntelligenceManager: privileges_required = cvss_data.get('privilegesRequired', 'UNKNOWN') user_interaction = cvss_data.get('userInteraction', 'UNKNOWN') exploitability_subscore = cvss_data.get('exploitabilityScore', 0.0) - + # Calculate exploitability score based on CVSS metrics exploitability_score = 0.0 - + # Base exploitability on CVSS exploitability subscore if available if exploitability_subscore > 0: exploitability_score = min(exploitability_subscore / 3.9, 1.0) # Normalize to 0-1 else: # Calculate based on individual CVSS components score_components = 0.0 - + # Attack Vector scoring if attack_vector == "NETWORK": score_components += 0.4 @@ -6241,25 +6225,25 @@ class CVEIntelligenceManager: score_components += 0.2 elif attack_vector == "PHYSICAL": score_components += 0.1 - + # Attack Complexity scoring if attack_complexity == "LOW": score_components += 0.3 elif attack_complexity == "HIGH": score_components += 0.1 - + # Privileges Required scoring if privileges_required == "NONE": score_components += 0.2 elif privileges_required == "LOW": score_components += 0.1 - + # User Interaction scoring if user_interaction == "NONE": score_components += 0.1 - + exploitability_score = min(score_components, 1.0) - + # Determine exploitability level if exploitability_score >= 0.8: exploitability_level = "HIGH" @@ -6269,7 +6253,7 @@ class CVEIntelligenceManager: exploitability_level = "LOW" else: exploitability_level = "VERY_LOW" - + # Extract description for additional context descriptions = cve_data.get('descriptions', []) description = "" @@ -6277,7 +6261,7 @@ class CVEIntelligenceManager: if desc.get('lang') == 'en': description = desc.get('value', '') break - + # Analyze description for exploit indicators exploit_keywords = [ 'remote code execution', 'rce', 'buffer overflow', 'stack overflow', @@ -6286,31 +6270,31 @@ class CVEIntelligenceManager: 'privilege escalation', 'directory traversal', 'path 
traversal', 'deserialization', 'xxe', 'ssrf', 'csrf', 'xss' ] - + description_lower = description.lower() exploit_indicators = [kw for kw in exploit_keywords if kw in description_lower] - + # Adjust exploitability based on vulnerability type if any(kw in description_lower for kw in ['remote code execution', 'rce', 'buffer overflow']): exploitability_score = min(exploitability_score + 0.2, 1.0) elif any(kw in description_lower for kw in ['authentication bypass', 'privilege escalation']): exploitability_score = min(exploitability_score + 0.15, 1.0) - + # Check for public exploit availability indicators public_exploits = False exploit_maturity = "UNKNOWN" - + # Look for exploit references in CVE references references = cve_data.get('references', []) exploit_sources = ['exploit-db.com', 'github.com', 'packetstormsecurity.com', 'metasploit'] - + for ref in references: ref_url = ref.get('url', '').lower() if any(source in ref_url for source in exploit_sources): public_exploits = True exploit_maturity = "PROOF_OF_CONCEPT" break - + # Determine weaponization level weaponization_level = "LOW" if public_exploits and exploitability_score > 0.7: @@ -6319,14 +6303,14 @@ class CVEIntelligenceManager: weaponization_level = "MEDIUM" elif exploitability_score > 0.8: weaponization_level = "MEDIUM" - + # Active exploitation assessment active_exploitation = False if exploitability_score > 0.8 and public_exploits: active_exploitation = True elif severity in ["CRITICAL", "HIGH"] and attack_vector == "NETWORK": active_exploitation = True - + # Priority recommendation if exploitability_score > 0.8 and severity == "CRITICAL": priority = "IMMEDIATE" @@ -6336,11 +6320,11 @@ class CVEIntelligenceManager: priority = "MEDIUM" else: priority = "LOW" - + # Extract publication and modification dates published_date = cve_data.get('published', '') last_modified = cve_data.get('lastModified', '') - + analysis = { "success": True, "cve_id": cve_id, @@ -6373,11 +6357,11 @@ class CVEIntelligenceManager: 
"data_source": "NVD API v2.0", "analysis_timestamp": datetime.now().isoformat() } - + logger.info(f"✅ Completed exploitability analysis for {cve_id}: {exploitability_level} ({exploitability_score:.2f})") - + return analysis - + except requests.exceptions.RequestException as e: logger.error(f"❌ Network error analyzing {cve_id}: {str(e)}") return { @@ -6385,7 +6369,7 @@ class CVEIntelligenceManager: "error": f"Network error: {str(e)}", "cve_id": cve_id } - + except Exception as e: logger.error(f"💥 Error analyzing CVE {cve_id}: {str(e)}") return { @@ -6398,14 +6382,14 @@ class CVEIntelligenceManager: """Search for existing exploits from real sources""" try: logger.info(f"🔎 Searching existing exploits for {cve_id}") - + all_exploits = [] sources_searched = [] - + # 1. Search GitHub for PoCs and exploits try: logger.info(f"🔍 Searching GitHub for {cve_id} exploits...") - + # GitHub Search API github_search_url = "https://api.github.com/search/repositories" github_params = { @@ -6414,18 +6398,18 @@ class CVEIntelligenceManager: 'order': 'desc', 'per_page': 10 } - + github_response = requests.get(github_search_url, params=github_params, timeout=15) - + if github_response.status_code == 200: github_data = github_response.json() repositories = github_data.get('items', []) - + for repo in repositories[:5]: # Limit to top 5 results # Check if CVE is actually mentioned in repo name or description repo_name = repo.get('name', '').lower() repo_desc = repo.get('description', '').lower() - + if cve_id.lower() in repo_name or cve_id.lower() in repo_desc: exploit_entry = { "source": "github", @@ -6443,51 +6427,51 @@ class CVEIntelligenceManager: "verified": False, "reliability": "UNVERIFIED" } - + # Assess reliability based on repo metrics stars = repo.get('stargazers_count', 0) forks = repo.get('forks_count', 0) - + if stars >= 50 or forks >= 10: exploit_entry["reliability"] = "GOOD" elif stars >= 20 or forks >= 5: exploit_entry["reliability"] = "FAIR" - + 
all_exploits.append(exploit_entry) - + sources_searched.append("github") logger.info(f"✅ Found {len([e for e in all_exploits if e['source'] == 'github'])} GitHub repositories") - + else: logger.warning(f"⚠️ GitHub search failed with status {github_response.status_code}") - + except requests.exceptions.RequestException as e: logger.error(f"❌ GitHub search error: {str(e)}") - + # 2. Search Exploit-DB via searchsploit-like functionality try: logger.info(f"🔍 Searching for {cve_id} in exploit databases...") - + # Since we can't directly access Exploit-DB API, we'll use a web search approach # or check if the CVE references contain exploit-db links - + # First, get CVE data to check references nvd_url = "https://services.nvd.nist.gov/rest/json/cves/2.0" nvd_params = {'cveId': cve_id} - + import time time.sleep(1) # Rate limiting - + nvd_response = requests.get(nvd_url, params=nvd_params, timeout=20) - + if nvd_response.status_code == 200: nvd_data = nvd_response.json() vulnerabilities = nvd_data.get('vulnerabilities', []) - + if vulnerabilities: cve_data = vulnerabilities[0].get('cve', {}) references = cve_data.get('references', []) - + # Check references for exploit sources exploit_sources = { 'exploit-db.com': 'exploit-db', @@ -6495,18 +6479,18 @@ class CVEIntelligenceManager: 'metasploit': 'metasploit', 'rapid7.com': 'rapid7' } - + for ref in references: ref_url = ref.get('url', '') ref_url_lower = ref_url.lower() - + for source_domain, source_name in exploit_sources.items(): if source_domain in ref_url_lower: exploit_entry = { "source": source_name, "exploit_id": f"{source_name}-ref", "title": f"Referenced exploit for {cve_id}", - "description": f"Exploit reference found in CVE data", + "description": "Exploit reference found in CVE data", "author": "Various", "date_published": cve_data.get('published', ''), "type": "reference", @@ -6516,31 +6500,31 @@ class CVEIntelligenceManager: "reliability": "GOOD" if source_name == "exploit-db" else "FAIR" } 
all_exploits.append(exploit_entry) - + if source_name not in sources_searched: sources_searched.append(source_name) - + except Exception as e: logger.error(f"❌ Exploit database search error: {str(e)}") - + # 3. Search for Metasploit modules try: logger.info(f"🔍 Searching for Metasploit modules for {cve_id}...") - + # Search GitHub for Metasploit modules containing the CVE msf_search_url = "https://api.github.com/search/code" msf_params = { 'q': f'{cve_id} filename:*.rb repo:rapid7/metasploit-framework', 'per_page': 5 } - + time.sleep(1) # Rate limiting msf_response = requests.get(msf_search_url, params=msf_params, timeout=15) - + if msf_response.status_code == 200: msf_data = msf_response.json() code_results = msf_data.get('items', []) - + for code_item in code_results: file_path = code_item.get('path', '') if 'exploits/' in file_path or 'auxiliary/' in file_path: @@ -6558,24 +6542,24 @@ class CVEIntelligenceManager: "reliability": "EXCELLENT" } all_exploits.append(exploit_entry) - + if code_results and "metasploit" not in sources_searched: sources_searched.append("metasploit") - + elif msf_response.status_code == 403: logger.warning("⚠️ GitHub API rate limit reached for code search") else: logger.warning(f"⚠️ Metasploit search failed with status {msf_response.status_code}") - + except requests.exceptions.RequestException as e: logger.error(f"❌ Metasploit search error: {str(e)}") - + # Add default sources to searched list default_sources = ["exploit-db", "github", "metasploit", "packetstorm"] for source in default_sources: if source not in sources_searched: sources_searched.append(source) - + # Sort exploits by reliability and date reliability_order = {"EXCELLENT": 4, "GOOD": 3, "FAIR": 2, "UNVERIFIED": 1} all_exploits.sort(key=lambda x: ( @@ -6583,9 +6567,9 @@ class CVEIntelligenceManager: x.get("stars", 0), x.get("date_published", "") ), reverse=True) - + logger.info(f"✅ Found {len(all_exploits)} total exploits from {len(sources_searched)} sources") - + return { 
"success": True, "cve_id": cve_id, @@ -6600,7 +6584,7 @@ class CVEIntelligenceManager: }, "search_timestamp": datetime.now().isoformat() } - + except Exception as e: logger.error(f"💥 Error searching exploits for {cve_id}: {str(e)}") return { @@ -7163,12 +7147,12 @@ def send_exploit(target_url, command): try: cve_id = cve_data.get("cve_id", "") description = cve_data.get("description", "").lower() - + logger.info(f"🛠️ Generating specific exploit for {cve_id}") # Enhanced vulnerability classification using real CVE data vuln_type, specific_details = self._analyze_vulnerability_details(description, cve_data) - + # Generate real, specific exploit based on CVE details if vuln_type == "sql_injection": exploit_code = self._generate_sql_injection_exploit(cve_data, target_info, specific_details) @@ -7293,7 +7277,7 @@ exec(base64.b64decode('{base64.b64encode(code.encode()).decode()}')) def _analyze_vulnerability_details(self, description, cve_data): """Analyze CVE data to extract specific vulnerability details""" import re # Import at the top of the method - + vuln_type = "generic" specific_details = { "endpoints": [], @@ -7303,10 +7287,10 @@ exec(base64.b64decode('{base64.b64encode(code.encode()).decode()}')) "version": "unknown", "attack_vector": "unknown" } - + # Extract specific details from description description_lower = description.lower() - + # SQL Injection detection and details if any(keyword in description_lower for keyword in ["sql injection", "sqli"]): vuln_type = "sql_injection" @@ -7318,7 +7302,7 @@ exec(base64.b64decode('{base64.b64encode(code.encode()).decode()}')) param_matches = re.findall(r'(?:via|parameter|param)\s+([a-zA-Z_][a-zA-Z0-9_]*)', description) if param_matches: specific_details["parameters"] = param_matches - + # XSS detection elif any(keyword in description_lower for keyword in ["cross-site scripting", "xss"]): vuln_type = "xss" @@ -7329,12 +7313,12 @@ exec(base64.b64decode('{base64.b64encode(code.encode()).decode()}')) 
specific_details["xss_type"] = "reflected" else: specific_details["xss_type"] = "unknown" - + # XXE detection elif any(keyword in description_lower for keyword in ["xxe", "xml external entity"]): vuln_type = "xxe" specific_details["payload_location"] = "xml" - + # File read/traversal detection elif any(keyword in description_lower for keyword in ["file read", "directory traversal", "path traversal", "arbitrary file", "file disclosure", "local file inclusion", "lfi", "file inclusion"]): vuln_type = "file_read" @@ -7344,34 +7328,34 @@ exec(base64.b64decode('{base64.b64encode(code.encode()).decode()}')) specific_details["traversal_type"] = "lfi" else: specific_details["traversal_type"] = "file_read" - + # Extract parameter names for LFI param_matches = re.findall(r'(?:via|parameter|param)\s+([a-zA-Z_][a-zA-Z0-9_]*)', description) if param_matches: specific_details["parameters"] = param_matches - + # Authentication bypass elif any(keyword in description_lower for keyword in ["authentication bypass", "auth bypass", "login bypass"]): vuln_type = "authentication_bypass" - + # RCE detection elif any(keyword in description_lower for keyword in ["remote code execution", "rce", "command injection"]): vuln_type = "rce" - + # Deserialization elif any(keyword in description_lower for keyword in ["deserialization", "unserialize", "pickle"]): vuln_type = "deserialization" - + # Buffer overflow elif any(keyword in description_lower for keyword in ["buffer overflow", "heap overflow", "stack overflow"]): vuln_type = "buffer_overflow" - + # Extract software and version info software_match = re.search(r'(\w+(?:\s+\w+)*)\s+v?(\d+(?:\.\d+)*)', description) if software_match: specific_details["software"] = software_match.group(1) specific_details["version"] = software_match.group(2) - + return vuln_type, specific_details def _generate_sql_injection_exploit(self, cve_data, target_info, details): @@ -7379,7 +7363,7 @@ exec(base64.b64decode('{base64.b64encode(code.encode()).decode()}')) 
cve_id = cve_data.get("cve_id", "") endpoint = details.get("endpoints", ["/vulnerable.php"])[0] if details.get("endpoints") else "/vulnerable.php" parameter = details.get("parameters", ["id"])[0] if details.get("parameters") else "id" - + return f'''#!/usr/bin/env python3 # SQL Injection Exploit for {cve_id} # Vulnerability: {cve_data.get("description", "")[:100]}... @@ -7509,7 +7493,7 @@ if __name__ == "__main__": """Generate specific XSS exploit based on CVE details""" cve_id = cve_data.get("cve_id", "") xss_type = details.get("xss_type", "reflected") - + return f'''#!/usr/bin/env python3 # Cross-Site Scripting (XSS) Exploit for {cve_id} # Type: {xss_type.title()} XSS @@ -7628,7 +7612,7 @@ if __name__ == "__main__": cve_id = cve_data.get("cve_id", "") parameter = details.get("parameters", ["portal_type"])[0] if details.get("parameters") else "portal_type" traversal_type = details.get("traversal_type", "file_read") - + return f'''#!/usr/bin/env python3 # Local File Inclusion (LFI) Exploit for {cve_id} # Vulnerability: {cve_data.get("description", "")[:100]}... @@ -7774,7 +7758,7 @@ if __name__ == "__main__": """Generate intelligent generic exploit based on CVE analysis""" cve_id = cve_data.get("cve_id", "") description = cve_data.get("description", "") - + return f'''#!/usr/bin/env python3 # Generic Exploit for {cve_id} # Vulnerability: {description[:150]}... 
@@ -7882,7 +7866,7 @@ if __name__ == "__main__": def _generate_specific_instructions(self, vuln_type, cve_data, target_info, details): """Generate specific usage instructions based on vulnerability type""" cve_id = cve_data.get("cve_id", "") - + base_instructions = f"""# Exploit for {cve_id} # Vulnerability Type: {vuln_type} # Software: {details.get('software', 'Unknown')} {details.get('version', '')} @@ -7929,7 +7913,7 @@ python3 exploit.py """ - Test for filter bypasses""" elif vuln_type == "file_read": - return base_instructions + f""" + return base_instructions + """ ## File Read/Directory Traversal: - Test with: python3 exploit.py http://target.com file_parameter @@ -7941,7 +7925,7 @@ python3 exploit.py """ - Test Windows paths: ..\\..\\..\\windows\\system32\\drivers\\etc\\hosts - Use URL encoding for bypasses""" - return base_instructions + f""" + return base_instructions + """ ## General Testing: - Run: python3 exploit.py @@ -7952,7 +7936,7 @@ python3 exploit.py """ def _generate_rce_exploit(self, cve_data, target_info, details): """Generate RCE exploit based on CVE details""" cve_id = cve_data.get("cve_id", "") - + return f'''#!/usr/bin/env python3 # Remote Code Execution Exploit for {cve_id} # Vulnerability: {cve_data.get("description", "")[:100]}... @@ -8080,7 +8064,7 @@ if __name__ == "__main__": def _generate_xxe_exploit(self, cve_data, target_info, details): """Generate XXE exploit based on CVE details""" cve_id = cve_data.get("cve_id", "") - + return f'''#!/usr/bin/env python3 # XXE (XML External Entity) Exploit for {cve_id} # Vulnerability: {cve_data.get("description", "")[:100]}... @@ -8167,7 +8151,7 @@ if __name__ == "__main__": def _generate_deserialization_exploit(self, cve_data, target_info, details): """Generate deserialization exploit based on CVE details""" cve_id = cve_data.get("cve_id", "") - + return f'''#!/usr/bin/env python3 # Deserialization Exploit for {cve_id} # Vulnerability: {cve_data.get("description", "")[:100]}... 
@@ -8253,7 +8237,7 @@ if __name__ == "__main__": def _generate_auth_bypass_exploit(self, cve_data, target_info, details): """Generate authentication bypass exploit""" cve_id = cve_data.get("cve_id", "") - + return f'''#!/usr/bin/env python3 # Authentication Bypass Exploit for {cve_id} # Vulnerability: {cve_data.get("description", "")[:100]}... @@ -8367,7 +8351,7 @@ if __name__ == "__main__": """Generate buffer overflow exploit""" cve_id = cve_data.get("cve_id", "") arch = target_info.get("target_arch", "x64") - + return f'''#!/usr/bin/env python3 # Buffer Overflow Exploit for {cve_id} # Architecture: {arch} @@ -10522,7 +10506,7 @@ def prowler(): logger.info(f"☁️ Starting Prowler {provider} security assessment") result = execute_command(command) result["output_directory"] = output_dir - logger.info(f"📊 Prowler assessment completed") + logger.info("📊 Prowler assessment completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in prowler endpoint: {str(e)}") @@ -10612,7 +10596,7 @@ def scout_suite(): logger.info(f"☁️ Starting Scout Suite {provider} assessment") result = execute_command(command) result["report_directory"] = report_dir - logger.info(f"📊 Scout Suite assessment completed") + logger.info("📊 Scout Suite assessment completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in scout-suite endpoint: {str(e)}") @@ -10688,7 +10672,7 @@ def pacu(): if additional_args: command += f" {additional_args}" - logger.info(f"☁️ Starting Pacu AWS exploitation") + logger.info("☁️ Starting Pacu AWS exploitation") result = execute_command(command) # Cleanup @@ -10697,7 +10681,7 @@ def pacu(): except: pass - logger.info(f"📊 Pacu exploitation completed") + logger.info("📊 Pacu exploitation completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in pacu endpoint: {str(e)}") @@ -10739,9 +10723,9 @@ def kube_hunter(): if additional_args: command += f" {additional_args}" - logger.info(f"☁️ Starting kube-hunter 
Kubernetes scan") + logger.info("☁️ Starting kube-hunter Kubernetes scan") result = execute_command(command) - logger.info(f"📊 kube-hunter scan completed") + logger.info("📊 kube-hunter scan completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in kube-hunter endpoint: {str(e)}") @@ -10775,9 +10759,9 @@ def kube_bench(): if additional_args: command += f" {additional_args}" - logger.info(f"☁️ Starting kube-bench CIS benchmark") + logger.info("☁️ Starting kube-bench CIS benchmark") result = execute_command(command) - logger.info(f"📊 kube-bench benchmark completed") + logger.info("📊 kube-bench benchmark completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in kube-bench endpoint: {str(e)}") @@ -10807,10 +10791,10 @@ def docker_bench_security(): if additional_args: command += f" {additional_args}" - logger.info(f"🐳 Starting Docker Bench Security assessment") + logger.info("🐳 Starting Docker Bench Security assessment") result = execute_command(command) result["output_file"] = output_file - logger.info(f"📊 Docker Bench Security completed") + logger.info("📊 Docker Bench Security completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in docker-bench-security endpoint: {str(e)}") @@ -10877,7 +10861,7 @@ def falco(): logger.info(f"🛡️ Starting Falco runtime monitoring for {duration}s") result = execute_command(command) - logger.info(f"📊 Falco monitoring completed") + logger.info("📊 Falco monitoring completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in falco endpoint: {str(e)}") @@ -10914,7 +10898,7 @@ def checkov(): logger.info(f"🔍 Starting Checkov IaC scan: {directory}") result = execute_command(command) - logger.info(f"📊 Checkov scan completed") + logger.info("📊 Checkov scan completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in checkov endpoint: {str(e)}") @@ -10948,7 +10932,7 @@ def terrascan(): logger.info(f"🔍 Starting Terrascan IaC 
scan: {iac_dir}") result = execute_command(command) - logger.info(f"📊 Terrascan scan completed") + logger.info("📊 Terrascan scan completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in terrascan endpoint: {str(e)}") @@ -11115,7 +11099,7 @@ def hydra(): "error": "Username/username_file and password/password_file are required" }), 400 - command = f"hydra -t 4" + command = "hydra -t 4" if username: command += f" -l {username}" @@ -11158,7 +11142,7 @@ def john(): "error": "Hash file parameter is required" }), 400 - command = f"john" + command = "john" if format_type: command += f" --format={format_type}" @@ -11173,7 +11157,7 @@ def john(): logger.info(f"🔐 Starting John the Ripper: {hash_file}") result = execute_command(command) - logger.info(f"📊 John the Ripper completed") + logger.info("📊 John the Ripper completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in john endpoint: {str(e)}") @@ -11253,7 +11237,7 @@ def ffuf(): "error": "URL parameter is required" }), 400 - command = f"ffuf" + command = "ffuf" if mode == "directory": command += f" -u {url}/FUZZ -w {wordlist}" @@ -11396,7 +11380,7 @@ def hashcat(): logger.info(f"🔐 Starting Hashcat attack: mode {attack_mode}") result = execute_command(command) - logger.info(f"📊 Hashcat attack completed") + logger.info("📊 Hashcat attack completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in hashcat endpoint: {str(e)}") @@ -11509,7 +11493,7 @@ def rustscan(): command += f" -p {ports}" if scripts: - command += f" -- -sC -sV" + command += " -- -sC -sV" if additional_args: command += f" {additional_args}" @@ -11818,7 +11802,7 @@ def arp_scan(): logger.info(f"🔍 Starting arp-scan: {target if target else 'local network'}") result = execute_command(command) - logger.info(f"📊 arp-scan completed") + logger.info("📊 arp-scan completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in arp-scan endpoint: {str(e)}") @@ -11860,7 +11844,7 
@@ def responder(): logger.info(f"🔍 Starting Responder on interface: {interface}") result = execute_command(command) - logger.info(f"📊 Responder completed") + logger.info("📊 Responder completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in responder endpoint: {str(e)}") @@ -11900,7 +11884,7 @@ def volatility(): logger.info(f"🧠 Starting Volatility analysis: {plugin}") result = execute_command(command) - logger.info(f"📊 Volatility analysis completed") + logger.info("📊 Volatility analysis completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in volatility endpoint: {str(e)}") @@ -11945,7 +11929,7 @@ def msfvenom(): logger.info(f"🚀 Starting MSFVenom payload generation: {payload}") result = execute_command(command) - logger.info(f"📊 MSFVenom payload generated") + logger.info("📊 MSFVenom payload generated") return jsonify(result) except Exception as e: logger.error(f"💥 Error in msfvenom endpoint: {str(e)}") @@ -12064,7 +12048,7 @@ def binwalk(): "error": "File path parameter is required" }), 400 - command = f"binwalk" + command = "binwalk" if extract: command += " -e" @@ -12225,7 +12209,7 @@ def objdump(): "error": "Binary parameter is required" }), 400 - command = f"objdump" + command = "objdump" if disassemble: command += " -d" @@ -12360,7 +12344,7 @@ p.interactive() except: pass - logger.info(f"📊 Pwntools exploit completed") + logger.info("📊 Pwntools exploit completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in pwntools endpoint: {str(e)}") @@ -12386,7 +12370,7 @@ def one_gadget(): logger.info(f"🔧 Starting one_gadget analysis: {libc_path}") result = execute_command(command) - logger.info(f"📊 one_gadget analysis completed") + logger.info("📊 one_gadget analysis completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in one_gadget endpoint: {str(e)}") @@ -12489,7 +12473,7 @@ quit except: pass - logger.info(f"📊 GDB-PEDA analysis completed") + logger.info("📊 
GDB-PEDA analysis completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in gdb-peda endpoint: {str(e)}") @@ -12580,7 +12564,7 @@ for func_addr, func in cfg.functions.items(): except: pass - logger.info(f"📊 angr analysis completed") + logger.info("📊 angr analysis completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in angr endpoint: {str(e)}") @@ -12627,7 +12611,7 @@ def ropper(): logger.info(f"🔧 Starting ropper analysis: {binary}") result = execute_command(command) - logger.info(f"📊 ropper analysis completed") + logger.info("📊 ropper analysis completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in ropper endpoint: {str(e)}") @@ -12664,7 +12648,7 @@ def pwninit(): logger.info(f"🔧 Starting pwninit setup: {binary}") result = execute_command(command) - logger.info(f"📊 pwninit setup completed") + logger.info("📊 pwninit setup completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in pwninit endpoint: {str(e)}") @@ -13126,7 +13110,7 @@ def dalfox(): logger.info(f"🎯 Starting Dalfox XSS scan: {url if url else 'pipe mode'}") result = execute_command(command) - logger.info(f"📊 Dalfox XSS scan completed") + logger.info("📊 Dalfox XSS scan completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in dalfox endpoint: {str(e)}") @@ -13389,7 +13373,7 @@ class HTTPTestingFramework: def _apply_match_replace(self, url: str, data, headers: dict): import re - from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse + from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse original_url = url out_headers = dict(headers) out_data = data @@ -13439,7 +13423,7 @@ class HTTPTestingFramework: params: list = None, payloads: list = None, base_data: dict = None, max_requests: int = 100) -> dict: """Simple fuzzing: iterate payloads over each parameter individually (Sniper).""" - from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse 
+ from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse params = params or [] payloads = payloads or ["'\"<>`, ${7*7}"] base_data = base_data or {} @@ -14526,7 +14510,7 @@ def execute_python_script(): result["env_name"] = env_name result["script_filename"] = filename - logger.info(f"📊 Python script execution completed") + logger.info("📊 Python script execution completed") return jsonify(result) except Exception as e: @@ -14881,7 +14865,7 @@ def api_fuzzer(): logger.info(f"🔍 Starting API endpoint discovery: {base_url}") result = execute_command(command) - logger.info(f"📊 API endpoint discovery completed") + logger.info("📊 API endpoint discovery completed") return jsonify({ "success": True, @@ -15016,7 +15000,7 @@ def jwt_analyzer(): "error": "JWT token parameter is required" }), 400 - logger.info(f"🔍 Starting JWT security analysis") + logger.info("🔍 Starting JWT security analysis") results = { "token": jwt_token[:50] + "..." if len(jwt_token) > 50 else jwt_token, @@ -15081,7 +15065,7 @@ def jwt_analyzer(): "description": f"Token decoding failed: {str(decode_error)}" }) - except Exception as e: + except Exception: results["vulnerabilities"].append({ "type": "invalid_format", "severity": "HIGH", @@ -15264,7 +15248,7 @@ def volatility3(): logger.info(f"🧠 Starting Volatility3 analysis: {plugin}") result = execute_command(command) - logger.info(f"📊 Volatility3 analysis completed") + logger.info("📊 Volatility3 analysis completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in volatility3 endpoint: {str(e)}") @@ -15304,7 +15288,7 @@ def foremost(): logger.info(f"📁 Starting Foremost file carving: {input_file}") result = execute_command(command) result["output_directory"] = output_dir - logger.info(f"📊 Foremost carving completed") + logger.info("📊 Foremost carving completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in foremost endpoint: {str(e)}") @@ -15377,7 +15361,7 @@ def exiftool(): "error": "File path 
parameter is required" }), 400 - command = f"exiftool" + command = "exiftool" if output_format: command += f" -{output_format}" @@ -15392,7 +15376,7 @@ def exiftool(): logger.info(f"📷 Starting ExifTool analysis: {file_path}") result = execute_command(command) - logger.info(f"📊 ExifTool analysis completed") + logger.info("📊 ExifTool analysis completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in exiftool endpoint: {str(e)}") @@ -15422,9 +15406,9 @@ def hashpump(): if additional_args: command += f" {additional_args}" - logger.info(f"🔐 Starting HashPump attack") + logger.info("🔐 Starting HashPump attack") result = execute_command(command) - logger.info(f"📊 HashPump attack completed") + logger.info("📊 HashPump attack completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in hashpump endpoint: {str(e)}") @@ -15481,7 +15465,7 @@ def hakrawler(): logger.info(f"🕷️ Starting Hakrawler crawling: {url}") result = execute_command(command) - logger.info(f"📊 Hakrawler crawling completed") + logger.info("📊 Hakrawler crawling completed") return jsonify(result) except Exception as e: logger.error(f"💥 Error in hakrawler endpoint: {str(e)}")