* feat: implement URL validation to prevent SSRF

* feat: add zip extraction security

* ruff fixes
This commit is contained in:
Alex
2025-12-24 15:05:35 +00:00
committed by GitHub
parent 83e7a928f1
commit 98e949d2fd
11 changed files with 929 additions and 27 deletions

View File

@@ -63,10 +63,111 @@ current_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
# Zip extraction security limits
MAX_UNCOMPRESSED_SIZE = 500 * 1024 * 1024 # 500 MB max uncompressed size
MAX_FILE_COUNT = 10000 # Maximum number of files to extract
MAX_COMPRESSION_RATIO = 100 # Maximum compression ratio (to detect zip bombs)
class ZipExtractionError(Exception):
"""Raised when zip extraction fails due to security constraints."""
pass
def _is_path_safe(base_path: str, target_path: str) -> bool:
"""
Check if target_path is safely within base_path (prevents zip slip attacks).
Args:
base_path: The base directory where extraction should occur.
target_path: The full path where a file would be extracted.
Returns:
True if the path is safe, False otherwise.
"""
# Resolve to absolute paths and check containment
base_resolved = os.path.realpath(base_path)
target_resolved = os.path.realpath(target_path)
return target_resolved.startswith(base_resolved + os.sep) or target_resolved == base_resolved
def _validate_zip_safety(zip_path: str, extract_to: str) -> None:
"""
Validate a zip file for security issues before extraction.
Checks for:
- Zip bombs (excessive compression ratio or uncompressed size)
- Too many files
- Path traversal attacks (zip slip)
Args:
zip_path: Path to the zip file.
extract_to: Destination directory.
Raises:
ZipExtractionError: If the zip file fails security validation.
"""
try:
with zipfile.ZipFile(zip_path, "r") as zip_ref:
# Get compressed size
compressed_size = os.path.getsize(zip_path)
# Calculate total uncompressed size and file count
total_uncompressed = 0
file_count = 0
for info in zip_ref.infolist():
file_count += 1
# Check file count limit
if file_count > MAX_FILE_COUNT:
raise ZipExtractionError(
f"Zip file contains too many files (>{MAX_FILE_COUNT}). "
"This may be a zip bomb attack."
)
# Accumulate uncompressed size
total_uncompressed += info.file_size
# Check total uncompressed size
if total_uncompressed > MAX_UNCOMPRESSED_SIZE:
raise ZipExtractionError(
f"Zip file uncompressed size exceeds limit "
f"({total_uncompressed / (1024*1024):.1f} MB > "
f"{MAX_UNCOMPRESSED_SIZE / (1024*1024):.1f} MB). "
"This may be a zip bomb attack."
)
# Check for path traversal (zip slip)
target_path = os.path.join(extract_to, info.filename)
if not _is_path_safe(extract_to, target_path):
raise ZipExtractionError(
f"Zip file contains path traversal attempt: {info.filename}"
)
# Check compression ratio (only if compressed size is meaningful)
if compressed_size > 0 and total_uncompressed > 0:
compression_ratio = total_uncompressed / compressed_size
if compression_ratio > MAX_COMPRESSION_RATIO:
raise ZipExtractionError(
f"Zip file has suspicious compression ratio ({compression_ratio:.1f}:1 > "
f"{MAX_COMPRESSION_RATIO}:1). This may be a zip bomb attack."
)
except zipfile.BadZipFile as e:
raise ZipExtractionError(f"Invalid or corrupted zip file: {e}")
def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
"""
Recursively extract zip files with a limit on recursion depth.
Recursively extract zip files with security protections.
Security measures:
- Limits recursion depth to prevent infinite loops
- Validates uncompressed size to prevent zip bombs
- Limits number of files to prevent resource exhaustion
- Checks compression ratio to detect zip bombs
- Validates paths to prevent zip slip attacks
Args:
zip_path (str): Path to the zip file to be extracted.
@@ -77,20 +178,33 @@ def extract_zip_recursive(zip_path, extract_to, current_depth=0, max_depth=5):
if current_depth > max_depth:
logging.warning(f"Reached maximum recursion depth of {max_depth}")
return
try:
# Validate zip file safety before extraction
_validate_zip_safety(zip_path, extract_to)
# Safe to extract
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(extract_to)
os.remove(zip_path) # Remove the zip file after extracting
except ZipExtractionError as e:
logging.error(f"Zip security validation failed for {zip_path}: {e}")
# Remove the potentially malicious zip file
try:
os.remove(zip_path)
except OSError:
pass
return
except Exception as e:
logging.error(f"Error extracting zip file {zip_path}: {e}", exc_info=True)
return
# Check for nested zip files and extract them
# Check for nested zip files and extract them
for root, dirs, files in os.walk(extract_to):
for file in files:
if file.endswith(".zip"):
# If a nested zip file is found, extract it recursively
file_path = os.path.join(root, file)
extract_zip_recursive(file_path, root, current_depth + 1, max_depth)