chore(mcp): remove hexstrike/metasploit adapters and helper scripts

This commit is contained in:
giveen
2026-01-21 11:27:04 -07:00
parent 30e2476d01
commit db06dcfa63
6 changed files with 0 additions and 874 deletions

View File

@@ -1,338 +0,0 @@
"""Adapter to manage a vendored HexStrike MCP server.
This adapter provides a simple programmatic API to start/stop the vendored
HexStrike server (expected under ``third_party/hexstrike``) and to perform a
health check before returning control to the caller.
The adapter is intentionally lightweight (no Docker) and uses an async
subprocess so the server can run in the background while the TUI/runtime
operates.
"""
import asyncio
import os
import shutil
import signal
import time
from pathlib import Path
from typing import Optional
try:
import aiohttp
except Exception:
aiohttp = None
from ..workspaces.utils import get_loot_file
class HexstrikeAdapter:
"""Manage a vendored HexStrike server under `third_party/hexstrike`.
Usage:
adapter = HexstrikeAdapter()
await adapter.start()
# ... use MCPManager to connect to the server
await adapter.stop()
"""
def __init__(
self,
host: str = "127.0.0.1",
port: int = 8888,
python_cmd: str = "python3",
server_script: Optional[Path] = None,
cwd: Optional[Path] = None,
env: Optional[dict] = None,
) -> None:
self.host = host
self.port = int(port)
self.python_cmd = python_cmd
self.server_script = (
server_script
or Path("third_party/hexstrike/hexstrike_server.py")
)
self.cwd = cwd or Path.cwd()
self.env = {**os.environ, **(env or {})}
self._process: Optional[asyncio.subprocess.Process] = None
self._reader_task: Optional[asyncio.Task] = None
def _build_command(self):
return [self.python_cmd, str(self.server_script), "--port", str(self.port)]
async def start(self, background: bool = True, timeout: int = 30) -> bool:
"""Start the vendored HexStrike server.
Returns True if the server started and passed health check within
`timeout` seconds.
"""
if not self.server_script.exists():
raise FileNotFoundError(
f"HexStrike server script not found at {self.server_script}."
)
if self._process and self._process.returncode is None:
return await self.health_check(timeout=1)
cmd = self._build_command()
# Resolve python command if possible
resolved = shutil.which(self.python_cmd) or self.python_cmd
self._process = await asyncio.create_subprocess_exec(
resolved,
*cmd[1:],
cwd=str(self.cwd),
env=self.env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
start_new_session=True,
)
# Log PID for debugging and management
try:
pid = getattr(self._process, "pid", None)
if pid:
log_file = get_loot_file("artifacts/hexstrike.log")
with log_file.open("a") as fh:
fh.write(f"[HexstrikeAdapter] started pid={pid}\n")
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to write hexstrike start PID to log: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to write hexstrike PID to log: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike PID log failure")
# Start a background reader task to capture logs
loop = asyncio.get_running_loop()
self._reader_task = loop.create_task(self._capture_output())
# Wait for health check
try:
return await self.health_check(timeout=timeout)
except Exception:
return False
async def _capture_output(self) -> None:
"""Capture stdout/stderr from the server and append to the log file."""
if not self._process or not self._process.stdout:
return
try:
log_file = get_loot_file("artifacts/hexstrike.log")
with log_file.open("ab") as fh:
while True:
line = await self._process.stdout.readline()
if not line:
break
fh.write(line)
fh.flush()
except asyncio.CancelledError:
return
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error capturing hexstrike output: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"HexStrike log capture failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike log capture failure")
async def stop(self, timeout: int = 5) -> None:
"""Stop the server process gracefully."""
proc = self._process
if not proc:
return
try:
proc.terminate()
await asyncio.wait_for(proc.wait(), timeout=timeout)
except asyncio.TimeoutError:
try:
proc.kill()
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to kill hexstrike after timeout: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to kill hexstrike after timeout: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike kill failure")
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error stopping hexstrike process: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error stopping hexstrike process: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike stop error")
self._process = None
if self._reader_task and not self._reader_task.done():
self._reader_task.cancel()
try:
await self._reader_task
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error awaiting hexstrike reader task: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error awaiting hexstrike reader task: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike reader await failure")
def stop_sync(self, timeout: int = 5) -> None:
"""Synchronous stop helper for use during process-exit cleanup.
This forcefully terminates the underlying subprocess PID if the
async event loop is no longer available.
"""
proc = self._process
if not proc:
return
# Try to terminate gracefully first
try:
pid = getattr(proc, "pid", None)
if pid:
# Kill the whole process group if possible (handles children)
try:
pgid = os.getpgid(pid)
os.killpg(pgid, signal.SIGTERM)
except Exception:
try:
os.kill(pid, signal.SIGTERM)
except Exception:
import logging
logging.getLogger(__name__).exception("Failed to SIGTERM hexstrike pid: %s", pid)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to SIGTERM hexstrike pid {pid}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike SIGTERM failure")
# wait briefly for process to exit
end = time.time() + float(timeout)
while time.time() < end:
ret = getattr(proc, "returncode", None)
if ret is not None:
break
time.sleep(0.1)
# If still running, force kill the process group
try:
pgid = os.getpgid(pid)
os.killpg(pgid, signal.SIGKILL)
except Exception:
try:
os.kill(pid, signal.SIGKILL)
except Exception:
import logging
logging.getLogger(__name__).exception("Failed to SIGKILL hexstrike pid: %s", pid)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to SIGKILL hexstrike pid {pid}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike SIGKILL failure")
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error during hexstrike stop_sync cleanup: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error during hexstrike stop_sync cleanup: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike stop_sync cleanup error")
def __del__(self):
try:
self.stop_sync()
except Exception as e:
import logging
logging.getLogger(__name__).exception("Exception during HexstrikeAdapter.__del__: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error during HexstrikeAdapter cleanup: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike __del__ error")
# Clear references
try:
self._process = None
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to clear HexstrikeAdapter process reference: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to clear hexstrike process reference: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike process-clear failure")
async def health_check(self, timeout: int = 5) -> bool:
"""Check the server health endpoint. Returns True if healthy."""
url = f"http://{self.host}:{self.port}/health"
if aiohttp:
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=timeout) as resp:
return resp.status == 200
except Exception as e:
import logging
logging.getLogger(__name__).exception("HexstrikeAdapter health_check (aiohttp) failed: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"HexStrike health check failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike health check failure")
return False
# Fallback: synchronous urllib in thread
import urllib.request
def _check():
try:
with urllib.request.urlopen(url, timeout=timeout) as r:
return r.status == 200
except Exception as e:
import logging
logging.getLogger(__name__).exception("HexstrikeAdapter health_check (urllib) failed: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"HexStrike health check failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about hexstrike urllib health check failure")
return False
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _check)
def is_running(self) -> bool:
return self._process is not None and self._process.returncode is None
__all__ = ["HexstrikeAdapter"]

View File

@@ -1,414 +0,0 @@
"""Adapter to manage a vendored Metasploit MCP server.
This follows the same lightweight pattern as the Hexstrike adapter: it
expects the MetasploitMCP repository to be vendored under
``third_party/MetasploitMCP`` (or a custom path provided by the caller).
The adapter starts the server as a background subprocess and performs a
health check on a configurable port.
"""
import asyncio
import os
import shutil
import signal
import time
from pathlib import Path
from typing import Optional
try:
import aiohttp
except Exception:
aiohttp = None
from ..workspaces.utils import get_loot_file
class MetasploitAdapter:
"""Manage a vendored Metasploit MCP server under `third_party/MetasploitMCP`.
Usage:
adapter = MetasploitAdapter()
await adapter.start()
# ... use MCPManager to connect to the server
await adapter.stop()
"""
def __init__(
self,
host: str = "127.0.0.1",
port: int = 7777,
python_cmd: str = "python3",
server_script: Optional[Path] = None,
cwd: Optional[Path] = None,
env: Optional[dict] = None,
transport: str = "http",
) -> None:
self.host = host
self.port = int(port)
self.python_cmd = python_cmd
# Vendored project uses 'MetasploitMCP.py' as the main entrypoint
self.server_script = (
server_script or Path("third_party/MetasploitMCP/MetasploitMCP.py")
)
self.cwd = cwd or Path.cwd()
self.env = {**os.environ, **(env or {})}
self.transport = transport
self._process: Optional[asyncio.subprocess.Process] = None
self._reader_task: Optional[asyncio.Task] = None
self._msfrpcd_proc: Optional[asyncio.subprocess.Process] = None
def _build_command(self):
cmd = [self.python_cmd, str(self.server_script)]
# Prefer explicit transport when starting vendored server from adapter
if self.transport:
cmd += ["--transport", str(self.transport)]
# When running HTTP, ensure host/port are provided
if str(self.transport).lower() in ("http", "sse"):
cmd += ["--host", str(self.host), "--port", str(self.port)]
else:
# For other transports, allow default args
cmd += ["--port", str(self.port)]
return cmd
async def _start_msfrpcd_if_needed(self) -> None:
"""Start `msfrpcd` if it's not already reachable at MSF_SERVER:MSF_PORT.
This starts `msfrpcd` as a child process (no sudo) using MSF_* env
values if available. It's intentionally conservative: if the RPC
endpoint is already listening we won't try to start a new daemon.
"""
try:
msf_server = str(self.env.get("MSF_SERVER", "127.0.0.1"))
msf_port = int(self.env.get("MSF_PORT", 55553))
except Exception:
msf_server = "127.0.0.1"
msf_port = 55553
# Quick socket check to see if msfrpcd is already listening
import socket
try:
with socket.create_connection((msf_server, msf_port), timeout=1):
return
except Exception:
pass
# If msfrpcd not available on path, skip starting
if not shutil.which("msfrpcd"):
return
msf_user = str(self.env.get("MSF_USER", "msf"))
msf_password = str(self.env.get("MSF_PASSWORD", ""))
msf_ssl = str(self.env.get("MSF_SSL", "false")).lower() in ("1", "true", "yes", "y")
# Build args for msfrpcd (no sudo). Use -S (SSL optional) flag only if requested.
args = ["msfrpcd", "-U", msf_user, "-P", msf_password, "-a", msf_server, "-p", str(msf_port)]
if msf_ssl:
args.append("-S")
try:
resolved = shutil.which("msfrpcd") or "msfrpcd"
self._msfrpcd_proc = await asyncio.create_subprocess_exec(
resolved,
*args[1:],
cwd=str(self.cwd),
env=self.env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
start_new_session=True,
)
# Start reader to capture msfrpcd logs
loop = asyncio.get_running_loop()
loop.create_task(self._capture_msfrpcd_output())
# Poll the msfrpcd TCP socket until it's accepting connections or timeout
import socket
deadline = asyncio.get_event_loop().time() + 10.0
while asyncio.get_event_loop().time() < deadline:
try:
with socket.create_connection((msf_server, msf_port), timeout=1):
return
except Exception:
await asyncio.sleep(0.5)
# If we fallthrough, msfrpcd didn't become ready in time
return
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to start msfrpcd: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to start msfrpcd: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd start failure")
return
async def _capture_msfrpcd_output(self) -> None:
if not self._msfrpcd_proc or not self._msfrpcd_proc.stdout:
return
try:
log_file = get_loot_file("artifacts/msfrpcd.log")
with log_file.open("ab") as fh:
while True:
line = await self._msfrpcd_proc.stdout.readline()
if not line:
break
fh.write(b"[msfrpcd] " + line)
fh.flush()
except asyncio.CancelledError:
return
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error capturing msfrpcd output: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"msfrpcd log capture failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd log capture failure")
async def start(self, background: bool = True, timeout: int = 30) -> bool:
"""Start the vendored Metasploit MCP server.
Returns True if the server started and passed health check within
`timeout` seconds.
"""
if not self.server_script.exists():
raise FileNotFoundError(
f"Metasploit MCP server script not found at {self.server_script}."
)
if self._process and self._process.returncode is None:
return await self.health_check(timeout=1)
# If running in HTTP/SSE mode, ensure msfrpcd is started and reachable
if str(self.transport).lower() in ("http", "sse"):
try:
await self._start_msfrpcd_if_needed()
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error starting msfrpcd: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error starting msfrpcd: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd error")
cmd = self._build_command()
resolved = shutil.which(self.python_cmd) or self.python_cmd
self._process = await asyncio.create_subprocess_exec(
resolved,
*cmd[1:],
cwd=str(self.cwd),
env=self.env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
start_new_session=True,
)
# Log PID
try:
pid = getattr(self._process, "pid", None)
if pid:
log_file = get_loot_file("artifacts/metasploit_mcp.log")
with log_file.open("a") as fh:
fh.write(f"[MetasploitAdapter] started pid={pid}\n")
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to write metasploit start PID to log: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to write metasploit PID to log: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about metasploit PID log failure")
# Start background reader
loop = asyncio.get_running_loop()
self._reader_task = loop.create_task(self._capture_output())
try:
return await self.health_check(timeout=timeout)
except Exception as e:
import logging
logging.getLogger(__name__).exception("MetasploitAdapter health_check raised: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Metasploit health check failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about metasploit health check failure")
return False
async def _capture_output(self) -> None:
if not self._process or not self._process.stdout:
return
try:
log_file = get_loot_file("artifacts/metasploit_mcp.log")
with log_file.open("ab") as fh:
while True:
line = await self._process.stdout.readline()
if not line:
break
fh.write(line)
fh.flush()
except asyncio.CancelledError:
return
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error capturing metasploit output: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Metasploit log capture failed: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about metasploit log capture failure")
async def stop(self, timeout: int = 5) -> None:
proc = self._process
if not proc:
return
try:
proc.terminate()
await asyncio.wait_for(proc.wait(), timeout=timeout)
except asyncio.TimeoutError:
try:
proc.kill()
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error waiting for process termination: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error stopping metasploit adapter: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about metasploit stop error")
self._process = None
if self._reader_task and not self._reader_task.done():
self._reader_task.cancel()
try:
await self._reader_task
except Exception as e:
import logging
logging.getLogger(__name__).exception("Failed to kill msfrpcd during stop: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Failed to kill msfrpcd: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about msfrpcd kill failure")
# Stop msfrpcd if we started it
try:
msf_proc = self._msfrpcd_proc
if msf_proc:
try:
msf_proc.terminate()
await asyncio.wait_for(msf_proc.wait(), timeout=timeout)
except asyncio.TimeoutError:
try:
msf_proc.kill()
except Exception:
pass
except Exception as e:
import logging
logging.getLogger(__name__).exception("Error stopping metasploit adapter cleanup: %s", e)
try:
from ..interface.notifier import notify
notify("warning", f"Error stopping metasploit adapter: {e}")
except Exception:
logging.getLogger(__name__).exception("Failed to notify operator about metasploit adapter cleanup error")
finally:
self._msfrpcd_proc = None
def stop_sync(self, timeout: int = 5) -> None:
proc = self._process
if not proc:
return
try:
pid = getattr(proc, "pid", None)
if pid:
try:
pgid = os.getpgid(pid)
os.killpg(pgid, signal.SIGTERM)
except Exception:
try:
os.kill(pid, signal.SIGTERM)
except Exception:
pass
end = time.time() + float(timeout)
while time.time() < end:
ret = getattr(proc, "returncode", None)
if ret is not None:
break
time.sleep(0.1)
try:
pgid = os.getpgid(pid)
os.killpg(pgid, signal.SIGKILL)
except Exception:
try:
os.kill(pid, signal.SIGKILL)
except Exception:
pass
except Exception:
pass
def __del__(self):
try:
self.stop_sync()
except Exception:
pass
try:
self._process = None
except Exception:
pass
async def health_check(self, timeout: int = 5) -> bool:
url = f"http://{self.host}:{self.port}/health"
if aiohttp:
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=timeout) as resp:
return resp.status == 200
except Exception:
return False
import urllib.request
def _check():
try:
with urllib.request.urlopen(url, timeout=timeout) as r:
return r.status == 200
except Exception:
return False
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _check)
def is_running(self) -> bool:
return self._process is not None and self._process.returncode is None
__all__ = ["MetasploitAdapter"]

View File

@@ -1,3 +0,0 @@
# Wrapper requirements file for vendored HexStrike dependencies
# This delegates to the vendored requirements in third_party/hexstrike.
-r third_party/hexstrike/requirements.txt

View File

@@ -1,32 +0,0 @@
#!/usr/bin/env bash
# Helper script to vendor HexStrike into this repo using git subtree.
# Run from repository root.
set -euo pipefail
REPO_URL="https://github.com/0x4m4/hexstrike-ai.git"
PREFIX="third_party/hexstrike"
BRANCH="main"
echo "This will add HexStrike as a git subtree under ${PREFIX}."
echo "If the subtree already exists, the script will pull and rebase the subtree instead.\n"
if [ -d "${PREFIX}" ]; then
echo "Detected existing subtree at ${PREFIX}."
if [ "${FORCE_SUBTREE_PULL:-false}" = "true" ]; then
echo "FORCE_SUBTREE_PULL=true: pulling latest changes into existing subtree..."
git subtree pull --prefix="${PREFIX}" "${REPO_URL}" "${BRANCH}" --squash || {
echo "git subtree pull failed; attempting without --squash..."
git subtree pull --prefix="${PREFIX}" "${REPO_URL}" "${BRANCH}" || exit 1
}
echo "Subtree at ${PREFIX} updated."
else
echo "To update the existing subtree run:"
echo " FORCE_SUBTREE_PULL=true bash scripts/add_hexstrike_subtree.sh"
echo "Or run manually: git subtree pull --prefix=\"${PREFIX}\" ${REPO_URL} ${BRANCH} --squash"
fi
else
echo "Adding subtree for the first time..."
git subtree add --prefix="${PREFIX}" "${REPO_URL}" "${BRANCH}" --squash
echo "HexStrike subtree added under ${PREFIX}."
fi

View File

@@ -1,45 +0,0 @@
<#
Install vendored HexStrike Python dependencies (Windows/PowerShell).
This mirrors `scripts/install_hexstrike_deps.sh` for Windows users.
#>
Set-StrictMode -Version Latest
Write-Host "Installing vendored HexStrike dependencies (Windows)..."
# Load .env if present (simple parser: ignore comments/blank lines)
if (Test-Path -Path ".env") {
Write-Host "Sourcing .env"
Get-Content .env | ForEach-Object {
$line = $_.Trim()
if ($line -and -not $line.StartsWith("#") -and $line.Contains("=")) {
$parts = $line -split "=", 2
$name = $parts[0].Trim()
$value = $parts[1].Trim()
# Only set if not empty
if ($name) { $env:$name = $value }
}
}
}
$req = Join-Path -Path (Get-Location) -ChildPath "third_party/hexstrike/requirements.txt"
if (-not (Test-Path -Path $req)) {
Write-Host "Cannot find $req. Is the HexStrike subtree present?" -ForegroundColor Yellow
exit 1
}
# Prefer venv python if present
$python = "python"
if (Test-Path -Path ".\venv\Scripts\python.exe") {
$python = Join-Path -Path (Get-Location) -ChildPath ".\venv\Scripts\python.exe"
}
Write-Host "Using Python: $python"
& $python -m pip install --upgrade pip
& $python -m pip install -r $req
Write-Host "HexStrike dependencies installed. Note: many external tools are not included and must be installed separately as described in third_party/hexstrike/requirements.txt." -ForegroundColor Green
exit 0

View File

@@ -1,42 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
# Install vendored HexStrike Python dependencies.
# This script will source a local .env if present so any environment
# variables (proxies/indices/LLM keys) are respected during installation.
HERE=$(dirname "${BASH_SOURCE[0]}")
ROOT=$(cd "$HERE/.." && pwd)
cd "$ROOT"
if [ -f ".env" ]; then
echo "Sourcing .env"
# export all vars from .env (ignore comments and blank lines)
set -a
# shellcheck disable=SC1091
source .env
set +a
fi
REQ=third_party/hexstrike/requirements.txt
if [ ! -f "$REQ" ]; then
echo "Cannot find $REQ. Is the HexStrike subtree present?"
exit 1
fi
echo "Installing HexStrike requirements from $REQ"
# Prefer using the active venv python if present
PY=$(which python || true)
if [ -n "${VIRTUAL_ENV:-}" ]; then
PY="$VIRTUAL_ENV/bin/python"
fi
"$PY" -m pip install --upgrade pip
"$PY" -m pip install -r "$REQ"
echo "HexStrike dependencies installed. Note: many external tools are not included and must be installed separately as described in third_party/hexstrike/requirements.txt."
exit 0