From db06dcfa63ac6ed31761c86862a5f4a1044959e1 Mon Sep 17 00:00:00 2001 From: giveen Date: Wed, 21 Jan 2026 11:27:04 -0700 Subject: [PATCH] chore(mcp): remove hexstrike/metasploit adapters and helper scripts --- pentestagent/mcp/hexstrike_adapter.py | 338 -------------------- pentestagent/mcp/metasploit_adapter.py | 414 ------------------------- requirements-hexstrike.txt | 3 - scripts/add_hexstrike_subtree.sh | 32 -- scripts/install_hexstrike_deps.ps1 | 45 --- scripts/install_hexstrike_deps.sh | 42 --- 6 files changed, 874 deletions(-) delete mode 100644 pentestagent/mcp/hexstrike_adapter.py delete mode 100644 pentestagent/mcp/metasploit_adapter.py delete mode 100644 requirements-hexstrike.txt delete mode 100755 scripts/add_hexstrike_subtree.sh delete mode 100644 scripts/install_hexstrike_deps.ps1 delete mode 100644 scripts/install_hexstrike_deps.sh diff --git a/pentestagent/mcp/hexstrike_adapter.py b/pentestagent/mcp/hexstrike_adapter.py deleted file mode 100644 index 5e82eb9..0000000 --- a/pentestagent/mcp/hexstrike_adapter.py +++ /dev/null @@ -1,338 +0,0 @@ -"""Adapter to manage a vendored HexStrike MCP server. - -This adapter provides a simple programmatic API to start/stop the vendored -HexStrike server (expected under ``third_party/hexstrike``) and to perform a -health check before returning control to the caller. - -The adapter is intentionally lightweight (no Docker) and uses an async -subprocess so the server can run in the background while the TUI/runtime -operates. -""" - -import asyncio -import os -import shutil -import signal -import time -from pathlib import Path -from typing import Optional - -try: - import aiohttp -except Exception: - aiohttp = None - - -from ..workspaces.utils import get_loot_file - - -class HexstrikeAdapter: - """Manage a vendored HexStrike server under `third_party/hexstrike`. - - Usage: - adapter = HexstrikeAdapter() - await adapter.start() - # ... use MCPManager to connect to the server - await adapter.stop() - """ - - def __init__( - self, - host: str = "127.0.0.1", - port: int = 8888, - python_cmd: str = "python3", - server_script: Optional[Path] = None, - cwd: Optional[Path] = None, - env: Optional[dict] = None, - ) -> None: - self.host = host - self.port = int(port) - self.python_cmd = python_cmd - self.server_script = ( - server_script - or Path("third_party/hexstrike/hexstrike_server.py") - ) - self.cwd = cwd or Path.cwd() - self.env = {**os.environ, **(env or {})} - - self._process: Optional[asyncio.subprocess.Process] = None - self._reader_task: Optional[asyncio.Task] = None - - def _build_command(self): - return [self.python_cmd, str(self.server_script), "--port", str(self.port)] - - async def start(self, background: bool = True, timeout: int = 30) -> bool: - """Start the vendored HexStrike server. - - Returns True if the server started and passed health check within - `timeout` seconds. - """ - if not self.server_script.exists(): - raise FileNotFoundError( - f"HexStrike server script not found at {self.server_script}." - ) - - if self._process and self._process.returncode is None: - return await self.health_check(timeout=1) - - cmd = self._build_command() - - # Resolve python command if possible - resolved = shutil.which(self.python_cmd) or self.python_cmd - - self._process = await asyncio.create_subprocess_exec( - resolved, - *cmd[1:], - cwd=str(self.cwd), - env=self.env, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - start_new_session=True, - ) - - # Log PID for debugging and management - try: - pid = getattr(self._process, "pid", None) - if pid: - log_file = get_loot_file("artifacts/hexstrike.log") - with log_file.open("a") as fh: - fh.write(f"[HexstrikeAdapter] started pid={pid}\n") - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Failed to write hexstrike start PID to log: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Failed to write hexstrike PID to log: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about hexstrike PID log failure") - - # Start a background reader task to capture logs - loop = asyncio.get_running_loop() - self._reader_task = loop.create_task(self._capture_output()) - - # Wait for health check - try: - return await self.health_check(timeout=timeout) - except Exception: - return False - - async def _capture_output(self) -> None: - """Capture stdout/stderr from the server and append to the log file.""" - if not self._process or not self._process.stdout: - return - - try: - log_file = get_loot_file("artifacts/hexstrike.log") - with log_file.open("ab") as fh: - while True: - line = await self._process.stdout.readline() - if not line: - break - fh.write(line) - fh.flush() - except asyncio.CancelledError: - return - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Error capturing hexstrike output: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"HexStrike log capture failed: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about hexstrike log capture failure") - - async def stop(self, timeout: int = 5) -> None: - """Stop the server process gracefully.""" - proc = self._process - if not proc: - return - - try: - proc.terminate() - await asyncio.wait_for(proc.wait(), timeout=timeout) - except asyncio.TimeoutError: - try: - proc.kill() - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Failed to kill hexstrike after timeout: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Failed to kill hexstrike after timeout: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about hexstrike kill failure") - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Error stopping hexstrike process: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Error stopping hexstrike process: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about hexstrike stop error") - - self._process = None - - if self._reader_task and not self._reader_task.done(): - self._reader_task.cancel() - try: - await self._reader_task - except Exception as e: - import logging - logging.getLogger(__name__).exception("Error awaiting hexstrike reader task: %s", e) - try: - from ..interface.notifier import notify - notify("warning", f"Error awaiting hexstrike reader task: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about hexstrike reader await failure") - - def stop_sync(self, timeout: int = 5) -> None: - """Synchronous stop helper for use during process-exit cleanup. - - This forcefully terminates the underlying subprocess PID if the - async event loop is no longer available. - """ - proc = self._process - if not proc: - return - - # Try to terminate gracefully first - try: - pid = getattr(proc, "pid", None) - if pid: - # Kill the whole process group if possible (handles children) - try: - pgid = os.getpgid(pid) - os.killpg(pgid, signal.SIGTERM) - except Exception: - try: - os.kill(pid, signal.SIGTERM) - except Exception: - import logging - - logging.getLogger(__name__).exception("Failed to SIGTERM hexstrike pid: %s", pid) - try: - from ..interface.notifier import notify - - notify("warning", f"Failed to SIGTERM hexstrike pid {pid}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about hexstrike SIGTERM failure") - - # wait briefly for process to exit - end = time.time() + float(timeout) - while time.time() < end: - ret = getattr(proc, "returncode", None) - if ret is not None: - break - time.sleep(0.1) - - # If still running, force kill the process group - try: - pgid = os.getpgid(pid) - os.killpg(pgid, signal.SIGKILL) - except Exception: - try: - os.kill(pid, signal.SIGKILL) - except Exception: - import logging - - logging.getLogger(__name__).exception("Failed to SIGKILL hexstrike pid: %s", pid) - try: - from ..interface.notifier import notify - - notify("warning", f"Failed to SIGKILL hexstrike pid {pid}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about hexstrike SIGKILL failure") - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Error during hexstrike stop_sync cleanup: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Error during hexstrike stop_sync cleanup: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about hexstrike stop_sync cleanup error") - - def __del__(self): - try: - self.stop_sync() - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Exception during HexstrikeAdapter.__del__: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Error during HexstrikeAdapter cleanup: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about hexstrike __del__ error") - # Clear references - try: - self._process = None - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Failed to clear HexstrikeAdapter process reference: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Failed to clear hexstrike process reference: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about hexstrike process-clear failure") - - async def health_check(self, timeout: int = 5) -> bool: - """Check the server health endpoint. Returns True if healthy.""" - url = f"http://{self.host}:{self.port}/health" - - if aiohttp: - try: - async with aiohttp.ClientSession() as session: - async with session.get(url, timeout=timeout) as resp: - return resp.status == 200 - except Exception as e: - import logging - - logging.getLogger(__name__).exception("HexstrikeAdapter health_check (aiohttp) failed: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"HexStrike health check failed: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about hexstrike health check failure") - return False - - # Fallback: synchronous urllib in thread - import urllib.request - - def _check(): - try: - with urllib.request.urlopen(url, timeout=timeout) as r: - return r.status == 200 - except Exception as e: - import logging - - logging.getLogger(__name__).exception("HexstrikeAdapter health_check (urllib) failed: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"HexStrike health check failed: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about hexstrike urllib health check failure") - return False - - loop = asyncio.get_running_loop() - return await loop.run_in_executor(None, _check) - - def is_running(self) -> bool: - return self._process is not None and self._process.returncode is None - - -__all__ = ["HexstrikeAdapter"] diff --git a/pentestagent/mcp/metasploit_adapter.py b/pentestagent/mcp/metasploit_adapter.py deleted file mode 100644 index 39f0a7c..0000000 --- a/pentestagent/mcp/metasploit_adapter.py +++ /dev/null @@ -1,414 +0,0 @@ -"""Adapter to manage a vendored Metasploit MCP server. - -This follows the same lightweight pattern as the Hexstrike adapter: it -expects the MetasploitMCP repository to be vendored under -``third_party/MetasploitMCP`` (or a custom path provided by the caller). -The adapter starts the server as a background subprocess and performs a -health check on a configurable port. -""" - -import asyncio -import os -import shutil -import signal -import time -from pathlib import Path -from typing import Optional - -try: - import aiohttp -except Exception: - aiohttp = None - - -from ..workspaces.utils import get_loot_file - - -class MetasploitAdapter: - """Manage a vendored Metasploit MCP server under `third_party/MetasploitMCP`. - - Usage: - adapter = MetasploitAdapter() - await adapter.start() - # ... use MCPManager to connect to the server - await adapter.stop() - """ - - def __init__( - self, - host: str = "127.0.0.1", - port: int = 7777, - python_cmd: str = "python3", - server_script: Optional[Path] = None, - cwd: Optional[Path] = None, - env: Optional[dict] = None, - transport: str = "http", - ) -> None: - self.host = host - self.port = int(port) - self.python_cmd = python_cmd - # Vendored project uses 'MetasploitMCP.py' as the main entrypoint - self.server_script = ( - server_script or Path("third_party/MetasploitMCP/MetasploitMCP.py") - ) - self.cwd = cwd or Path.cwd() - self.env = {**os.environ, **(env or {})} - self.transport = transport - - self._process: Optional[asyncio.subprocess.Process] = None - self._reader_task: Optional[asyncio.Task] = None - self._msfrpcd_proc: Optional[asyncio.subprocess.Process] = None - - def _build_command(self): - cmd = [self.python_cmd, str(self.server_script)] - # Prefer explicit transport when starting vendored server from adapter - if self.transport: - cmd += ["--transport", str(self.transport)] - # When running HTTP, ensure host/port are provided - if str(self.transport).lower() in ("http", "sse"): - cmd += ["--host", str(self.host), "--port", str(self.port)] - else: - # For other transports, allow default args - cmd += ["--port", str(self.port)] - return cmd - - async def _start_msfrpcd_if_needed(self) -> None: - """Start `msfrpcd` if it's not already reachable at MSF_SERVER:MSF_PORT. - - This starts `msfrpcd` as a child process (no sudo) using MSF_* env - values if available. It's intentionally conservative: if the RPC - endpoint is already listening we won't try to start a new daemon. - """ - try: - msf_server = str(self.env.get("MSF_SERVER", "127.0.0.1")) - msf_port = int(self.env.get("MSF_PORT", 55553)) - except Exception: - msf_server = "127.0.0.1" - msf_port = 55553 - - # Quick socket check to see if msfrpcd is already listening - import socket - - try: - with socket.create_connection((msf_server, msf_port), timeout=1): - return - except Exception: - pass - - # If msfrpcd not available on path, skip starting - if not shutil.which("msfrpcd"): - return - - msf_user = str(self.env.get("MSF_USER", "msf")) - msf_password = str(self.env.get("MSF_PASSWORD", "")) - msf_ssl = str(self.env.get("MSF_SSL", "false")).lower() in ("1", "true", "yes", "y") - - # Build args for msfrpcd (no sudo). Use -S (SSL optional) flag only if requested. - args = ["msfrpcd", "-U", msf_user, "-P", msf_password, "-a", msf_server, "-p", str(msf_port)] - if msf_ssl: - args.append("-S") - - try: - resolved = shutil.which("msfrpcd") or "msfrpcd" - self._msfrpcd_proc = await asyncio.create_subprocess_exec( - resolved, - *args[1:], - cwd=str(self.cwd), - env=self.env, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - start_new_session=True, - ) - # Start reader to capture msfrpcd logs - loop = asyncio.get_running_loop() - loop.create_task(self._capture_msfrpcd_output()) - - # Poll the msfrpcd TCP socket until it's accepting connections or timeout - import socket - deadline = asyncio.get_event_loop().time() + 10.0 - while asyncio.get_event_loop().time() < deadline: - try: - with socket.create_connection((msf_server, msf_port), timeout=1): - return - except Exception: - await asyncio.sleep(0.5) - # If we fallthrough, msfrpcd didn't become ready in time - return - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Failed to start msfrpcd: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Failed to start msfrpcd: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd start failure") - return - - async def _capture_msfrpcd_output(self) -> None: - if not self._msfrpcd_proc or not self._msfrpcd_proc.stdout: - return - try: - log_file = get_loot_file("artifacts/msfrpcd.log") - with log_file.open("ab") as fh: - while True: - line = await self._msfrpcd_proc.stdout.readline() - if not line: - break - fh.write(b"[msfrpcd] " + line) - fh.flush() - except asyncio.CancelledError: - return - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Error capturing msfrpcd output: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"msfrpcd log capture failed: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd log capture failure") - - async def start(self, background: bool = True, timeout: int = 30) -> bool: - """Start the vendored Metasploit MCP server. - - Returns True if the server started and passed health check within - `timeout` seconds. - """ - if not self.server_script.exists(): - raise FileNotFoundError( - f"Metasploit MCP server script not found at {self.server_script}." - ) - - if self._process and self._process.returncode is None: - return await self.health_check(timeout=1) - - # If running in HTTP/SSE mode, ensure msfrpcd is started and reachable - if str(self.transport).lower() in ("http", "sse"): - try: - await self._start_msfrpcd_if_needed() - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Error starting msfrpcd: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Error starting msfrpcd: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd error") - - cmd = self._build_command() - resolved = shutil.which(self.python_cmd) or self.python_cmd - - self._process = await asyncio.create_subprocess_exec( - resolved, - *cmd[1:], - cwd=str(self.cwd), - env=self.env, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - start_new_session=True, - ) - - # Log PID - try: - pid = getattr(self._process, "pid", None) - if pid: - log_file = get_loot_file("artifacts/metasploit_mcp.log") - with log_file.open("a") as fh: - fh.write(f"[MetasploitAdapter] started pid={pid}\n") - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Failed to write metasploit start PID to log: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Failed to write metasploit PID to log: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about metasploit PID log failure") - - # Start background reader - loop = asyncio.get_running_loop() - self._reader_task = loop.create_task(self._capture_output()) - - try: - return await self.health_check(timeout=timeout) - except Exception as e: - import logging - - logging.getLogger(__name__).exception("MetasploitAdapter health_check raised: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Metasploit health check failed: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about metasploit health check failure") - return False - - async def _capture_output(self) -> None: - if not self._process or not self._process.stdout: - return - - try: - log_file = get_loot_file("artifacts/metasploit_mcp.log") - with log_file.open("ab") as fh: - while True: - line = await self._process.stdout.readline() - if not line: - break - fh.write(line) - fh.flush() - except asyncio.CancelledError: - return - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Error capturing metasploit output: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Metasploit log capture failed: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about metasploit log capture failure") - - async def stop(self, timeout: int = 5) -> None: - proc = self._process - if not proc: - return - - try: - proc.terminate() - await asyncio.wait_for(proc.wait(), timeout=timeout) - except asyncio.TimeoutError: - try: - proc.kill() - except Exception: - pass - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Error waiting for process termination: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Error stopping metasploit adapter: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about metasploit stop error") - - self._process = None - - if self._reader_task and not self._reader_task.done(): - self._reader_task.cancel() - try: - await self._reader_task - except Exception as e: - import logging - logging.getLogger(__name__).exception("Failed to kill msfrpcd during stop: %s", e) - try: - from ..interface.notifier import notify - notify("warning", f"Failed to kill msfrpcd: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd kill failure") - - # Stop msfrpcd if we started it - try: - msf_proc = self._msfrpcd_proc - if msf_proc: - try: - msf_proc.terminate() - await asyncio.wait_for(msf_proc.wait(), timeout=timeout) - except asyncio.TimeoutError: - try: - msf_proc.kill() - except Exception: - pass - except Exception as e: - import logging - - logging.getLogger(__name__).exception("Error stopping metasploit adapter cleanup: %s", e) - try: - from ..interface.notifier import notify - - notify("warning", f"Error stopping metasploit adapter: {e}") - except Exception: - logging.getLogger(__name__).exception("Failed to notify operator about metasploit adapter cleanup error") - finally: - self._msfrpcd_proc = None - - def stop_sync(self, timeout: int = 5) -> None: - proc = self._process - if not proc: - return - - try: - pid = getattr(proc, "pid", None) - if pid: - try: - pgid = os.getpgid(pid) - os.killpg(pgid, signal.SIGTERM) - except Exception: - try: - os.kill(pid, signal.SIGTERM) - except Exception: - pass - - end = time.time() + float(timeout) - while time.time() < end: - ret = getattr(proc, "returncode", None) - if ret is not None: - break - time.sleep(0.1) - - try: - pgid = os.getpgid(pid) - os.killpg(pgid, signal.SIGKILL) - except Exception: - try: - os.kill(pid, signal.SIGKILL) - except Exception: - pass - except Exception: - pass - - def __del__(self): - try: - self.stop_sync() - except Exception: - pass - try: - self._process = None - except Exception: - pass - - async def health_check(self, timeout: int = 5) -> bool: - url = f"http://{self.host}:{self.port}/health" - - if aiohttp: - try: - async with aiohttp.ClientSession() as session: - async with session.get(url, timeout=timeout) as resp: - return resp.status == 200 - except Exception: - return False - - import urllib.request - - def _check(): - try: - with urllib.request.urlopen(url, timeout=timeout) as r: - return r.status == 200 - except Exception: - return False - - loop = asyncio.get_running_loop() - return await loop.run_in_executor(None, _check) - - def is_running(self) -> bool: - return self._process is not None and self._process.returncode is None - - -__all__ = ["MetasploitAdapter"] diff --git a/requirements-hexstrike.txt b/requirements-hexstrike.txt deleted file mode 100644 index 024e8b7..0000000 --- a/requirements-hexstrike.txt +++ /dev/null @@ -1,3 +0,0 @@ -# Wrapper requirements file for vendored HexStrike dependencies -# This delegates to the vendored requirements in third_party/hexstrike. --r third_party/hexstrike/requirements.txt diff --git a/scripts/add_hexstrike_subtree.sh b/scripts/add_hexstrike_subtree.sh deleted file mode 100755 index 69dc060..0000000 --- a/scripts/add_hexstrike_subtree.sh +++ /dev/null @@ -1,32 +0,0 @@ -#!/usr/bin/env bash -# Helper script to vendor HexStrike into this repo using git subtree. -# Run from repository root. - -set -euo pipefail - -REPO_URL="https://github.com/0x4m4/hexstrike-ai.git" -PREFIX="third_party/hexstrike" -BRANCH="main" - -echo "This will add HexStrike as a git subtree under ${PREFIX}." -echo "If the subtree already exists, the script will pull and rebase the subtree instead.\n" - -if [ -d "${PREFIX}" ]; then - echo "Detected existing subtree at ${PREFIX}." - if [ "${FORCE_SUBTREE_PULL:-false}" = "true" ]; then - echo "FORCE_SUBTREE_PULL=true: pulling latest changes into existing subtree..." - git subtree pull --prefix="${PREFIX}" "${REPO_URL}" "${BRANCH}" --squash || { - echo "git subtree pull failed; attempting without --squash..." - git subtree pull --prefix="${PREFIX}" "${REPO_URL}" "${BRANCH}" || exit 1 - } - echo "Subtree at ${PREFIX} updated." - else - echo "To update the existing subtree run:" - echo " FORCE_SUBTREE_PULL=true bash scripts/add_hexstrike_subtree.sh" - echo "Or run manually: git subtree pull --prefix=\"${PREFIX}\" ${REPO_URL} ${BRANCH} --squash" - fi -else - echo "Adding subtree for the first time..." - git subtree add --prefix="${PREFIX}" "${REPO_URL}" "${BRANCH}" --squash - echo "HexStrike subtree added under ${PREFIX}." -fi diff --git a/scripts/install_hexstrike_deps.ps1 b/scripts/install_hexstrike_deps.ps1 deleted file mode 100644 index 68bc45a..0000000 --- a/scripts/install_hexstrike_deps.ps1 +++ /dev/null @@ -1,45 +0,0 @@ -<# -Install vendored HexStrike Python dependencies (Windows/PowerShell). - -This mirrors `scripts/install_hexstrike_deps.sh` for Windows users. -#> -Set-StrictMode -Version Latest - -Write-Host "Installing vendored HexStrike dependencies (Windows)..." - -# Load .env if present (simple parser: ignore comments/blank lines) -if (Test-Path -Path ".env") { - Write-Host "Sourcing .env" - Get-Content .env | ForEach-Object { - $line = $_.Trim() - if ($line -and -not $line.StartsWith("#") -and $line.Contains("=")) { - $parts = $line -split "=", 2 - $name = $parts[0].Trim() - $value = $parts[1].Trim() - # Only set if not empty - if ($name) { $env:$name = $value } - } - } -} - -$req = Join-Path -Path (Get-Location) -ChildPath "third_party/hexstrike/requirements.txt" - -if (-not (Test-Path -Path $req)) { - Write-Host "Cannot find $req. Is the HexStrike subtree present?" -ForegroundColor Yellow - exit 1 -} - -# Prefer venv python if present -$python = "python" -if (Test-Path -Path ".\venv\Scripts\python.exe") { - $python = Join-Path -Path (Get-Location) -ChildPath ".\venv\Scripts\python.exe" -} - -Write-Host "Using Python: $python" - -& $python -m pip install --upgrade pip -& $python -m pip install -r $req - -Write-Host "HexStrike dependencies installed. Note: many external tools are not included and must be installed separately as described in third_party/hexstrike/requirements.txt." -ForegroundColor Green - -exit 0 diff --git a/scripts/install_hexstrike_deps.sh b/scripts/install_hexstrike_deps.sh deleted file mode 100644 index 14a6cef..0000000 --- a/scripts/install_hexstrike_deps.sh +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env bash -set -euo pipefail - -# Install vendored HexStrike Python dependencies. -# This script will source a local .env if present so any environment -# variables (proxies/indices/LLM keys) are respected during installation. - -HERE=$(dirname "${BASH_SOURCE[0]}") -ROOT=$(cd "$HERE/.." && pwd) - -cd "$ROOT" - -if [ -f ".env" ]; then - echo "Sourcing .env" - # export all vars from .env (ignore comments and blank lines) - set -a - # shellcheck disable=SC1091 - source .env - set +a -fi - -REQ=third_party/hexstrike/requirements.txt - -if [ ! -f "$REQ" ]; then - echo "Cannot find $REQ. Is the HexStrike subtree present?" - exit 1 -fi - -echo "Installing HexStrike requirements from $REQ" - -# Prefer using the active venv python if present -PY=$(which python || true) -if [ -n "${VIRTUAL_ENV:-}" ]; then - PY="$VIRTUAL_ENV/bin/python" -fi - -"$PY" -m pip install --upgrade pip -"$PY" -m pip install -r "$REQ" - -echo "HexStrike dependencies installed. Note: many external tools are not included and must be installed separately as described in third_party/hexstrike/requirements.txt." - -exit 0