"""Knowledge indexer for GhostCrew."""
|
|
|
|
import json
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, List
|
|
|
|
from .rag import Document


@dataclass
class IndexingResult:
    """Result of an indexing operation."""

    total_files: int
    indexed_files: int
    total_chunks: int
    errors: List[str]


class KnowledgeIndexer:
    """Indexes knowledge sources for the RAG engine."""

    # Supported file extensions
    TEXT_EXTENSIONS = [".txt", ".md", ".rst"]
    DATA_EXTENSIONS = [".json", ".yaml", ".yml"]

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        """
        Initialize the knowledge indexer.

        Args:
            chunk_size: Maximum chunk size in characters
            chunk_overlap: Number of characters of overlap between consecutive chunks
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def index_directory(self, directory: Path) -> tuple[List[Document], IndexingResult]:
        """
        Index all supported files in a directory.

        Args:
            directory: The directory to index

        Returns:
            Tuple of (documents, indexing_result)
        """
        documents = []
        errors = []
        total_files = 0
        indexed_files = 0

        if not directory.exists():
            return documents, IndexingResult(
                0, 0, 0, [f"Directory not found: {directory}"]
            )

        for file_path in directory.rglob("*"):
            if not file_path.is_file():
                continue

            total_files += 1

            try:
                file_docs = self.index_file(file_path)
                if file_docs:
                    documents.extend(file_docs)
                    indexed_files += 1
            except Exception as e:
                errors.append(f"Error indexing {file_path}: {e}")

        result = IndexingResult(
            total_files=total_files,
            indexed_files=indexed_files,
            total_chunks=len(documents),
            errors=errors,
        )

        return documents, result

    def index_file(self, file_path: Path) -> List[Document]:
        """
        Index a single file.

        Args:
            file_path: The file to index

        Returns:
            List of Document objects
        """
        suffix = file_path.suffix.lower()

        if suffix in self.TEXT_EXTENSIONS:
            return self._index_text_file(file_path)
        elif suffix in self.DATA_EXTENSIONS:
            return self._index_data_file(file_path)
        else:
            return []

    def _index_text_file(self, file_path: Path) -> List[Document]:
        """Index a text file."""
        content = file_path.read_text(encoding="utf-8", errors="ignore")
        return self._chunk_text(content, str(file_path))

    def _index_data_file(self, file_path: Path) -> List[Document]:
        """Index a JSON/YAML file."""
        content = file_path.read_text(encoding="utf-8")

        if file_path.suffix.lower() == ".json":
            data = json.loads(content)
        else:
            try:
                import yaml

                data = yaml.safe_load(content)
            except ImportError:
                # PyYAML is an optional dependency; skip YAML files if missing
                return []

        return self._process_data(data, str(file_path))

    def _chunk_text(self, text: str, source: str) -> List[Document]:
        """Split text into chunks."""
        chunks = []

        # Try to split by sections (headers in markdown)
        sections = self._split_by_sections(text)

        for section in sections:
            if len(section) <= self.chunk_size:
                if section.strip():
                    chunks.append(Document(content=section.strip(), source=source))
            else:
                # Further split large sections
                sub_chunks = self._split_by_paragraphs(section)
                for sub in sub_chunks:
                    if sub.strip():
                        chunks.append(Document(content=sub.strip(), source=source))

        return chunks

    def _split_by_sections(self, text: str) -> List[str]:
        """Split text by markdown headers."""
        # Split by level 1-3 headers (# Header)
        sections = re.split(r"\n(?=#{1,3}\s)", text)

        if len(sections) == 1:
            # No headers found, return original
            return [text]

        return sections
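
    # Illustration (not executed): splitting "Intro\n## A\nbody\n### B\nmore"
    # yields ["Intro", "## A\nbody", "### B\nmore"]; each level 1-3 header
    # starts a new section, and text before the first header is preserved.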

    def _split_by_paragraphs(self, text: str) -> List[str]:
        """Split text by paragraphs with overlap."""
        paragraphs = text.split("\n\n")
        chunks = []
        current_chunk = ""

        for para in paragraphs:
            if len(current_chunk) + len(para) + 2 <= self.chunk_size:
                current_chunk += para + "\n\n"
            else:
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())
                # Start the new chunk with the tail of the previous one so
                # consecutive chunks share chunk_overlap characters of context
                overlap = (
                    current_chunk[-self.chunk_overlap:]
                    if current_chunk and self.chunk_overlap > 0
                    else ""
                )
                current_chunk = overlap + para + "\n\n"

        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        return chunks
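
    # Illustration (not executed): with chunk_size=15 and chunk_overlap=4,
    # "one\n\ntwo\n\nthree" becomes ["one\n\ntwo", "wo\n\nthree"]; the second
    # chunk starts with the 4-character tail ("wo\n\n") of the first.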

    def _process_data(self, data: Any, source: str) -> List[Document]:
        """Process JSON/YAML data into documents."""
        documents = []

        if isinstance(data, list):
            for i, item in enumerate(data):
                doc = Document(
                    content=json.dumps(item, indent=2),
                    source=source,
                    metadata={"index": i, "type": "array_item"},
                )
                documents.append(doc)

        elif isinstance(data, dict):
            # Dicts wrapping a list under "entries", "items", or "data" are
            # indexed per item; any other dict becomes a single document
            items = data.get("entries") or data.get("items") or data.get("data")
            if isinstance(items, list):
                for i, item in enumerate(items):
                    doc = Document(
                        content=json.dumps(item, indent=2),
                        source=source,
                        metadata={"index": i, "type": "data_item"},
                    )
                    documents.append(doc)
            else:
                doc = Document(
                    content=json.dumps(data, indent=2),
                    source=source,
                    metadata={"type": "object"},
                )
                documents.append(doc)

        else:
            doc = Document(
                content=str(data), source=source, metadata={"type": "primitive"}
            )
            documents.append(doc)

        return documents
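
    # Illustration (not executed): {"entries": [{"id": 1}, {"id": 2}]} yields
    # two Documents tagged {"type": "data_item"}, while {"name": "nmap"}
    # yields a single Document tagged {"type": "object"}.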

    def create_knowledge_structure(self, base_path: Path) -> None:
        """
        Create the default knowledge directory structure.

        Args:
            base_path: Base path for knowledge directory
        """
        directories = [
            base_path / "cves",
            base_path / "wordlists",
            base_path / "exploits",
            base_path / "methodologies",
            base_path / "custom",
        ]

        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)

        # Create placeholder files
        (base_path / "methodologies" / "README.md").write_text(
            "# Penetration Testing Methodologies\n\n"
            "Add methodology documents here.\n"
        )

        (base_path / "wordlists" / "common.txt").write_text(
            "# Common wordlist\nadmin\npassword\nroot\nuser\n"
        )
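

# Minimal usage sketch (an assumption, not part of the original module):
# run with `python -m <package>.indexer` so the relative import of Document
# resolves; the "knowledge" directory name below is illustrative.
if __name__ == "__main__":
    indexer = KnowledgeIndexer(chunk_size=1000, chunk_overlap=200)
    indexer.create_knowledge_structure(Path("knowledge"))
    docs, result = indexer.index_directory(Path("knowledge"))
    print(
        f"Indexed {result.indexed_files}/{result.total_files} files "
        f"into {len(docs)} chunks ({len(result.errors)} errors)"
    )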