chore: apply ruff fixes to project files; exclude third_party from ruff

This commit is contained in:
giveen
2026-01-19 10:31:57 -07:00
parent acb5ca021e
commit 08e9d53dd8
24 changed files with 960 additions and 447 deletions

View File

@@ -0,0 +1,83 @@
import tarfile
from pathlib import Path
import pytest
from pentestagent.workspaces.utils import import_workspace
def make_tar_with_dir(source_dir: Path, archive_path: Path, store_subpath: Path = None):
# Create a tar.gz archive containing the contents of source_dir.
with tarfile.open(archive_path, "w:gz") as tf:
for p in source_dir.rglob("*"):
rel = p.relative_to(source_dir.parent)
# Optionally store paths under a custom subpath
arcname = str(rel)
if store_subpath:
# Prepend the store_subpath (e.g., workspaces/name/...)
arcname = str(store_subpath / p.relative_to(source_dir))
tf.add(str(p), arcname=arcname)
def test_import_workspace_nested(tmp_path):
# Create a workspace dir structure under a temporary dir
src_root = tmp_path / "src"
ws_name = "import-test"
ws_dir = src_root / "workspaces" / ws_name
ws_dir.mkdir(parents=True)
# write meta.yaml
meta = ws_dir / "meta.yaml"
meta.write_text("name: import-test\n")
# add a file
(ws_dir / "notes.txt").write_text("hello")
archive = tmp_path / "ws_nested.tar.gz"
# Create archive that stores workspaces/<name>/...
make_tar_with_dir(ws_dir, archive, store_subpath=Path("workspaces") / ws_name)
dest_root = tmp_path / "dest"
dest_root.mkdir()
name = import_workspace(archive, root=dest_root)
assert name == ws_name
dest_ws = dest_root / "workspaces" / ws_name
assert dest_ws.exists()
assert (dest_ws / "meta.yaml").exists()
def test_import_workspace_flat(tmp_path):
# Create a folder that is directly the workspace (not nested under workspaces/)
src = tmp_path / "srcflat"
src.mkdir()
(src / "meta.yaml").write_text("name: flat-test\n")
(src / "data.txt").write_text("x")
archive = tmp_path / "ws_flat.tar.gz"
# Archive the src folder contents directly (no workspaces/ prefix)
with tarfile.open(archive, "w:gz") as tf:
for p in src.rglob("*"):
tf.add(str(p), arcname=str(p.relative_to(src.parent)))
dest_root = tmp_path / "dest2"
dest_root.mkdir()
name = import_workspace(archive, root=dest_root)
assert name == "flat-test"
assert (dest_root / "workspaces" / "flat-test" / "meta.yaml").exists()
def test_import_workspace_missing_meta(tmp_path):
# Archive without meta.yaml
src = tmp_path / "empty"
src.mkdir()
(src / "file.txt").write_text("x")
archive = tmp_path / "no_meta.tar.gz"
with tarfile.open(archive, "w:gz") as tf:
for p in src.rglob("*"):
tf.add(str(p), arcname=str(p.relative_to(src.parent)))
dest_root = tmp_path / "dest3"
dest_root.mkdir()
with pytest.raises(ValueError):
import_workspace(archive, root=dest_root)

View File

@@ -0,0 +1,82 @@
def test_workspace_meta_write_failure_emits_notification(tmp_path, monkeypatch):
"""Simulate a meta.yaml write failure and ensure notifier receives a warning."""
from pentestagent.interface import notifier
from pentestagent.workspaces.manager import WorkspaceManager
captured = []
def cb(level, message):
captured.append((level, message))
notifier.register_callback(cb)
wm = WorkspaceManager(root=tmp_path)
# Create workspace first so initial meta is written successfully
wm.create("testws")
# Patch _write_meta to raise when called during set_active's meta update
def bad_write(self, name, meta):
raise RuntimeError("disk error")
monkeypatch.setattr(WorkspaceManager, "_write_meta", bad_write)
# Calling set_active should attempt to update meta and trigger notification
wm.set_active("testws")
assert len(captured) >= 1
# Find a warning notification
assert any("Failed to update workspace meta" in m for _, m in captured)
def test_rag_index_save_failure_emits_notification(tmp_path, monkeypatch):
"""Simulate RAG save failure during index persistence and ensure notifier gets a warning."""
from pentestagent.interface import notifier
from pentestagent.knowledge.rag import RAGEngine
captured = []
def cb(level, message):
captured.append((level, message))
notifier.register_callback(cb)
# Prepare a small knowledge tree under tmp_path
ws = tmp_path / "workspaces" / "ws1"
src = ws / "knowledge" / "sources"
src.mkdir(parents=True, exist_ok=True)
f = src / "doc.txt"
f.write_text("hello world")
# Patch resolve_knowledge_paths in the RAG module to point to our tmp workspace
def fake_resolve(root=None):
return {
"using_workspace": True,
"sources": src,
"embeddings": ws / "knowledge" / "embeddings",
}
monkeypatch.setattr("pentestagent.knowledge.rag.resolve_knowledge_paths", fake_resolve)
# Ensure embeddings generation returns deterministic array (avoid external calls)
import numpy as np
monkeypatch.setattr(
"pentestagent.knowledge.rag.get_embeddings",
lambda texts, model=None: np.zeros((len(texts), 8)),
)
# Patch save_index to raise
def bad_save(self, path):
raise RuntimeError("write failed")
monkeypatch.setattr(RAGEngine, "save_index", bad_save)
rag = RAGEngine() # uses default knowledge_path -> resolve_knowledge_paths
# Force indexing which will attempt to save and trigger notifier
rag.index(force=True)
assert len(captured) >= 1
assert any("Failed to save RAG index" in m or "persist RAG index" in m for _, m in captured)

View File

@@ -1,11 +1,8 @@
import os
from pathlib import Path
import pytest
from pentestagent.workspaces.manager import WorkspaceManager
from pentestagent.knowledge.rag import RAGEngine
from pentestagent.knowledge.indexer import KnowledgeIndexer
from pentestagent.knowledge.rag import RAGEngine
from pentestagent.workspaces.manager import WorkspaceManager
def test_rag_and_indexer_use_workspace(tmp_path, monkeypatch):

View File

@@ -0,0 +1,63 @@
from types import SimpleNamespace
import pytest
from pentestagent.agents.base_agent import BaseAgent
from pentestagent.workspaces.manager import WorkspaceManager
class DummyTool:
def __init__(self, name="dummy"):
self.name = name
async def execute(self, arguments, runtime):
return "ok"
class SimpleAgent(BaseAgent):
def get_system_prompt(self, mode: str = "agent") -> str:
return ""
@pytest.mark.asyncio
async def test_ip_and_cidr_containment(tmp_path, monkeypatch):
# Use tmp_path as project root so WorkspaceManager writes here
monkeypatch.chdir(tmp_path)
wm = WorkspaceManager(root=tmp_path)
name = "scope-test"
wm.create(name)
wm.set_active(name)
tool = DummyTool("dummy")
agent = SimpleAgent(llm=object(), tools=[tool], runtime=SimpleNamespace())
# Helper to run execute_tools with a candidate target
async def run_with_candidate(candidate):
call = {"id": "1", "name": "dummy", "arguments": {"target": candidate}}
results = await agent._execute_tools([call])
return results[0]
# 1) Allowed single IP, candidate same IP
wm.add_targets(name, ["192.0.2.5"])
res = await run_with_candidate("192.0.2.5")
assert res.success is True
# 2) Allowed single IP, candidate single-address CIDR (/32) -> allowed
res = await run_with_candidate("192.0.2.5/32")
assert res.success is True
# 3) Allowed CIDR, candidate IP inside -> allowed
wm.add_targets(name, ["198.51.100.0/24"])
res = await run_with_candidate("198.51.100.25")
assert res.success is True
# 4) Allowed CIDR, candidate subnet inside -> allowed
wm.add_targets(name, ["203.0.113.0/24"])
res = await run_with_candidate("203.0.113.128/25")
assert res.success is True
# 5) Allowed single IP, candidate larger network -> not allowed
wm.add_targets(name, ["192.0.2.5"])
res = await run_with_candidate("192.0.2.0/24")
assert res.success is False

View File

@@ -0,0 +1,56 @@
from pentestagent.workspaces import validation
from pentestagent.workspaces.manager import TargetManager
def test_ip_in_cidr_containment():
allowed = ["10.0.0.0/8"]
assert validation.is_target_in_scope("10.1.2.3", allowed)
def test_cidr_within_cidr():
allowed = ["10.0.0.0/8"]
assert validation.is_target_in_scope("10.1.0.0/16", allowed)
def test_cidr_equal_allowed():
allowed = ["10.0.0.0/8"]
assert validation.is_target_in_scope("10.0.0.0/8", allowed)
def test_cidr_larger_than_allowed_is_out_of_scope():
allowed = ["10.0.0.0/24"]
assert not validation.is_target_in_scope("10.0.0.0/16", allowed)
def test_single_ip_vs_single_address_cidr():
allowed = ["192.168.1.5"]
# Candidate expressed as a /32 network should be allowed when it represents the same single address
assert validation.is_target_in_scope("192.168.1.5/32", allowed)
def test_hostname_case_insensitive_match():
allowed = ["example.com"]
assert validation.is_target_in_scope("Example.COM", allowed)
def test_hostname_vs_ip_not_match():
allowed = ["example.com"]
assert not validation.is_target_in_scope("93.184.216.34", allowed)
def test_gather_candidate_targets_shallow_behavior():
# shallow extraction: list of strings is extracted
args = {"targets": ["1.2.3.4", "example.com"]}
assert set(validation.gather_candidate_targets(args)) == {"1.2.3.4", "example.com"}
# nested dicts inside lists are NOT traversed by the shallow extractor
args2 = {"hosts": [{"ip": "5.6.7.8"}]}
assert validation.gather_candidate_targets(args2) == []
# direct string argument returns itself
assert validation.gather_candidate_targets("8.8.8.8") == ["8.8.8.8"]
def test_normalize_target_accepts_hostnames_and_ips():
assert TargetManager.normalize_target("example.com") == "example.com"
assert TargetManager.normalize_target("8.8.8.8") == "8.8.8.8"

View File

@@ -1,9 +1,8 @@
import os
from pathlib import Path
import pytest
from pentestagent.workspaces.manager import WorkspaceManager, WorkspaceError
from pentestagent.workspaces.manager import WorkspaceError, WorkspaceManager
def test_invalid_workspace_names(tmp_path: Path):
@@ -19,7 +18,7 @@ def test_invalid_workspace_names(tmp_path: Path):
def test_create_and_idempotent(tmp_path: Path):
wm = WorkspaceManager(root=tmp_path)
name = "eng1"
meta = wm.create(name)
wm.create(name)
assert (tmp_path / "workspaces" / name).exists()
assert (tmp_path / "workspaces" / name / "meta.yaml").exists()
# create again should not raise and should return meta