Merge pull request #2000 from ManishMadan2882/main

Test coverage:  TTS, Security and Storage layers
This commit is contained in:
Alex
2025-10-02 00:11:22 +01:00
committed by GitHub
7 changed files with 918 additions and 76 deletions

View File

@@ -118,6 +118,7 @@ class Settings(BaseSettings):
# Encryption settings # Encryption settings
ENCRYPTION_SECRET_KEY: str = "default-docsgpt-encryption-key" ENCRYPTION_SECRET_KEY: str = "default-docsgpt-encryption-key"
ELEVENLABS_API_KEY: Optional[str] = None
path = Path(__file__).parent.parent.absolute() path = Path(__file__).parent.parent.absolute()
settings = Settings(_env_file=path.joinpath(".env"), _env_file_encoding="utf-8") settings = Settings(_env_file=path.joinpath(".env"), _env_file_encoding="utf-8")

View File

@@ -1,84 +1,30 @@
import asyncio
import websockets
import json
import base64
from io import BytesIO from io import BytesIO
import base64
from application.tts.base import BaseTTS from application.tts.base import BaseTTS
from application.core.settings import settings
class ElevenlabsTTS(BaseTTS): class ElevenlabsTTS(BaseTTS):
def __init__(self): def __init__(self):
self.api_key = 'ELEVENLABS_API_KEY'# here you should put your api key from elevenlabs.client import ElevenLabs
self.model = "eleven_flash_v2_5"
self.voice = "VOICE_ID" # this is the hash code for the voice not the name! self.client = ElevenLabs(
self.write_audio = 1 api_key=settings.ELEVENLABS_API_KEY,
)
def text_to_speech(self, text): def text_to_speech(self, text):
asyncio.run(self._text_to_speech_websocket(text)) lang = "en"
audio = self.client.generate(
text=text,
model="eleven_multilingual_v2",
voice="Brian",
)
audio_data = BytesIO()
for chunk in audio:
audio_data.write(chunk)
audio_bytes = audio_data.getvalue()
async def _text_to_speech_websocket(self, text): # Encode to base64
uri = f"wss://api.elevenlabs.io/v1/text-to-speech/{self.voice}/stream-input?model_id={self.model}" audio_base64 = base64.b64encode(audio_bytes).decode("utf-8")
websocket = await websockets.connect(uri) return audio_base64, lang
payload = {
"text": " ",
"voice_settings": {
"stability": 0.5,
"similarity_boost": 0.8,
},
"xi_api_key": self.api_key,
}
await websocket.send(json.dumps(payload))
async def listen():
while 1:
try:
msg = await websocket.recv()
data = json.loads(msg)
if data.get("audio"):
print("audio received")
yield base64.b64decode(data["audio"])
elif data.get("isFinal"):
break
except websockets.exceptions.ConnectionClosed:
print("websocket closed")
break
listen_task = asyncio.create_task(self.stream(listen()))
await websocket.send(json.dumps({"text": text}))
# this is to signal the end of the text, either use this or flush
await websocket.send(json.dumps({"text": ""}))
await listen_task
async def stream(self, audio_stream):
if self.write_audio:
audio_bytes = BytesIO()
async for chunk in audio_stream:
if chunk:
audio_bytes.write(chunk)
with open("output_audio.mp3", "wb") as f:
f.write(audio_bytes.getvalue())
else:
async for chunk in audio_stream:
pass # depends on the streamer!
def test_elevenlabs_websocket():
"""
Tests the ElevenlabsTTS text_to_speech method with a sample prompt.
Prints out the base64-encoded result and writes it to 'output_audio.mp3'.
"""
# Instantiate your TTS class
tts = ElevenlabsTTS()
# Call the method with some sample text
tts.text_to_speech("Hello from ElevenLabs WebSocket!")
print("Saved audio to output_audio.mp3.")
if __name__ == "__main__":
test_elevenlabs_websocket()

View File

@@ -0,0 +1,94 @@
import base64
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from application.security import encryption
def test_derive_key_uses_secret_and_user(monkeypatch):
monkeypatch.setattr(encryption.settings, "ENCRYPTION_SECRET_KEY", "test-secret")
salt = bytes(range(16))
expected_kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
backend=default_backend(),
)
expected_key = expected_kdf.derive(b"test-secret#user-123")
derived = encryption._derive_key("user-123", salt)
assert derived == expected_key
def _fake_os_urandom_factory(values):
values_iter = iter(values)
def _fake(length):
value = next(values_iter)
assert len(value) == length
return value
return _fake
def test_encrypt_and_decrypt_round_trip(monkeypatch):
monkeypatch.setattr(encryption.settings, "ENCRYPTION_SECRET_KEY", "test-secret")
salt = bytes(range(16))
iv = bytes(range(16, 32))
monkeypatch.setattr(encryption.os, "urandom", _fake_os_urandom_factory([salt, iv]))
credentials = {"token": "abc123", "refresh": "xyz789"}
encrypted = encryption.encrypt_credentials(credentials, "user-123")
decoded = base64.b64decode(encrypted)
assert decoded[:16] == salt
assert decoded[16:32] == iv
decrypted = encryption.decrypt_credentials(encrypted, "user-123")
assert decrypted == credentials
def test_encrypt_credentials_returns_empty_for_empty_input(monkeypatch):
monkeypatch.setattr(encryption.settings, "ENCRYPTION_SECRET_KEY", "test-secret")
assert encryption.encrypt_credentials({}, "user-123") == ""
assert encryption.encrypt_credentials(None, "user-123") == ""
def test_encrypt_credentials_returns_empty_on_serialization_error(monkeypatch):
monkeypatch.setattr(encryption.settings, "ENCRYPTION_SECRET_KEY", "test-secret")
monkeypatch.setattr(encryption.os, "urandom", lambda length: b"\x00" * length)
class NonSerializable: # pragma: no cover - simple helper container
pass
credentials = {"bad": NonSerializable()}
assert encryption.encrypt_credentials(credentials, "user-123") == ""
def test_decrypt_credentials_returns_empty_for_invalid_input(monkeypatch):
monkeypatch.setattr(encryption.settings, "ENCRYPTION_SECRET_KEY", "test-secret")
assert encryption.decrypt_credentials("", "user-123") == {}
assert encryption.decrypt_credentials("not-base64", "user-123") == {}
invalid_payload = base64.b64encode(b"short").decode()
assert encryption.decrypt_credentials(invalid_payload, "user-123") == {}
def test_pad_and_unpad_are_inverse():
original = b"secret-data"
padded = encryption._pad_data(original)
assert len(padded) % 16 == 0
assert encryption._unpad_data(padded) == original

View File

@@ -0,0 +1,352 @@
"""Tests for LocalStorage implementation
"""
import io
import pytest
from unittest.mock import patch, MagicMock, mock_open
from application.storage.local import LocalStorage
@pytest.fixture
def temp_base_dir():
"""Provide a temporary base directory path for testing."""
return "/tmp/test_storage"
@pytest.fixture
def local_storage(temp_base_dir):
"""Create LocalStorage instance with test base directory."""
return LocalStorage(base_dir=temp_base_dir)
class TestLocalStorageInitialization:
"""Test LocalStorage initialization and configuration."""
def test_init_with_custom_base_dir(self):
"""Should use provided base directory."""
storage = LocalStorage(base_dir="/custom/path")
assert storage.base_dir == "/custom/path"
def test_init_with_default_base_dir(self):
"""Should use default base directory when none provided."""
storage = LocalStorage()
# Default is three levels up from the file location
assert storage.base_dir is not None
assert isinstance(storage.base_dir, str)
def test_get_full_path_with_relative_path(self, local_storage):
"""Should combine base_dir with relative path."""
result = local_storage._get_full_path("documents/test.txt")
assert result == "/tmp/test_storage/documents/test.txt"
def test_get_full_path_with_absolute_path(self, local_storage):
"""Should return absolute path unchanged."""
result = local_storage._get_full_path("/absolute/path/test.txt")
assert result == "/absolute/path/test.txt"
class TestLocalStorageSaveFile:
"""Test file saving functionality."""
@patch('os.makedirs')
@patch('builtins.open', new_callable=mock_open)
@patch('shutil.copyfileobj')
def test_save_file_creates_directory_and_saves(
self, mock_copyfileobj, mock_file, mock_makedirs, local_storage
):
"""Should create directory and save file content."""
file_data = io.BytesIO(b"test content")
path = "documents/test.txt"
result = local_storage.save_file(file_data, path)
# Verify directory creation
mock_makedirs.assert_called_once_with(
"/tmp/test_storage/documents",
exist_ok=True
)
# Verify file write
mock_file.assert_called_once_with("/tmp/test_storage/documents/test.txt", 'wb')
mock_copyfileobj.assert_called_once_with(file_data, mock_file())
# Verify result
assert result == {'storage_type': 'local'}
@patch('os.makedirs')
def test_save_file_with_save_method(self, mock_makedirs, local_storage):
"""Should use save method if file_data has it."""
file_data = MagicMock()
file_data.save = MagicMock()
path = "documents/test.txt"
result = local_storage.save_file(file_data, path)
# Verify save method was called
file_data.save.assert_called_once_with("/tmp/test_storage/documents/test.txt")
# Verify result
assert result == {'storage_type': 'local'}
@patch('os.makedirs')
@patch('builtins.open', new_callable=mock_open)
def test_save_file_with_absolute_path(self, mock_file, mock_makedirs, local_storage):
"""Should handle absolute paths correctly."""
file_data = io.BytesIO(b"test content")
path = "/absolute/path/test.txt"
local_storage.save_file(file_data, path)
mock_makedirs.assert_called_once_with("/absolute/path", exist_ok=True)
mock_file.assert_called_once_with("/absolute/path/test.txt", 'wb')
class TestLocalStorageGetFile:
"""Test file retrieval functionality."""
@patch('os.path.exists', return_value=True)
@patch('builtins.open', new_callable=mock_open, read_data=b"file content")
def test_get_file_returns_file_handle(self, mock_file, mock_exists, local_storage):
"""Should open and return file handle when file exists."""
path = "documents/test.txt"
result = local_storage.get_file(path)
mock_exists.assert_called_once_with("/tmp/test_storage/documents/test.txt")
mock_file.assert_called_once_with("/tmp/test_storage/documents/test.txt", 'rb')
assert result is not None
@patch('os.path.exists', return_value=False)
def test_get_file_raises_error_when_not_found(self, mock_exists, local_storage):
"""Should raise FileNotFoundError when file doesn't exist."""
path = "documents/nonexistent.txt"
with pytest.raises(FileNotFoundError, match="File not found"):
local_storage.get_file(path)
mock_exists.assert_called_once_with("/tmp/test_storage/documents/nonexistent.txt")
class TestLocalStorageDeleteFile:
"""Test file deletion functionality."""
@patch('os.remove')
@patch('os.path.exists', return_value=True)
def test_delete_file_removes_existing_file(self, mock_exists, mock_remove, local_storage):
"""Should delete file and return True when file exists."""
path = "documents/test.txt"
result = local_storage.delete_file(path)
assert result is True
mock_exists.assert_called_once_with("/tmp/test_storage/documents/test.txt")
mock_remove.assert_called_once_with("/tmp/test_storage/documents/test.txt")
@patch('os.path.exists', return_value=False)
def test_delete_file_returns_false_when_not_found(self, mock_exists, local_storage):
"""Should return False when file doesn't exist."""
path = "documents/nonexistent.txt"
result = local_storage.delete_file(path)
assert result is False
mock_exists.assert_called_once_with("/tmp/test_storage/documents/nonexistent.txt")
class TestLocalStorageFileExists:
"""Test file existence checking."""
@patch('os.path.exists', return_value=True)
def test_file_exists_returns_true_when_file_found(self, mock_exists, local_storage):
"""Should return True when file exists."""
path = "documents/test.txt"
result = local_storage.file_exists(path)
assert result is True
mock_exists.assert_called_once_with("/tmp/test_storage/documents/test.txt")
@patch('os.path.exists', return_value=False)
def test_file_exists_returns_false_when_not_found(self, mock_exists, local_storage):
"""Should return False when file doesn't exist."""
path = "documents/nonexistent.txt"
result = local_storage.file_exists(path)
assert result is False
mock_exists.assert_called_once_with("/tmp/test_storage/documents/nonexistent.txt")
class TestLocalStorageListFiles:
"""Test directory listing functionality."""
@patch('os.walk')
@patch('os.path.exists', return_value=True)
def test_list_files_returns_all_files_in_directory(
self, mock_exists, mock_walk, local_storage
):
"""Should return all files in directory and subdirectories."""
directory = "documents"
# Mock os.walk to return files in directory structure
mock_walk.return_value = [
("/tmp/test_storage/documents", ["subdir"], ["file1.txt", "file2.txt"]),
("/tmp/test_storage/documents/subdir", [], ["file3.txt"])
]
result = local_storage.list_files(directory)
assert len(result) == 3
assert "documents/file1.txt" in result
assert "documents/file2.txt" in result
assert "documents/subdir/file3.txt" in result
mock_exists.assert_called_once_with("/tmp/test_storage/documents")
mock_walk.assert_called_once_with("/tmp/test_storage/documents")
@patch('os.path.exists', return_value=False)
def test_list_files_returns_empty_list_when_directory_not_found(
self, mock_exists, local_storage
):
"""Should return empty list when directory doesn't exist."""
directory = "nonexistent"
result = local_storage.list_files(directory)
assert result == []
mock_exists.assert_called_once_with("/tmp/test_storage/nonexistent")
class TestLocalStorageProcessFile:
"""Test file processing functionality."""
@patch('os.path.exists', return_value=True)
def test_process_file_calls_processor_with_full_path(
self, mock_exists, local_storage
):
"""Should call processor function with full file path."""
path = "documents/test.txt"
processor_func = MagicMock(return_value="processed")
result = local_storage.process_file(path, processor_func, extra_arg="value")
assert result == "processed"
processor_func.assert_called_once_with(
local_path="/tmp/test_storage/documents/test.txt",
extra_arg="value"
)
mock_exists.assert_called_once_with("/tmp/test_storage/documents/test.txt")
@patch('os.path.exists', return_value=False)
def test_process_file_raises_error_when_file_not_found(self, mock_exists, local_storage):
"""Should raise FileNotFoundError when file doesn't exist."""
path = "documents/nonexistent.txt"
processor_func = MagicMock()
with pytest.raises(FileNotFoundError, match="File not found"):
local_storage.process_file(path, processor_func)
processor_func.assert_not_called()
class TestLocalStorageIsDirectory:
"""Test directory checking functionality."""
@patch('os.path.isdir', return_value=True)
def test_is_directory_returns_true_when_directory_exists(
self, mock_isdir, local_storage
):
"""Should return True when path is a directory."""
path = "documents"
result = local_storage.is_directory(path)
assert result is True
mock_isdir.assert_called_once_with("/tmp/test_storage/documents")
@patch('os.path.isdir', return_value=False)
def test_is_directory_returns_false_when_not_directory(
self, mock_isdir, local_storage
):
"""Should return False when path is not a directory or doesn't exist."""
path = "documents/test.txt"
result = local_storage.is_directory(path)
assert result is False
mock_isdir.assert_called_once_with("/tmp/test_storage/documents/test.txt")
class TestLocalStorageRemoveDirectory:
"""Test directory removal functionality."""
@patch('shutil.rmtree')
@patch('os.path.isdir', return_value=True)
@patch('os.path.exists', return_value=True)
def test_remove_directory_deletes_directory(
self, mock_exists, mock_isdir, mock_rmtree, local_storage
):
"""Should remove directory and return True when successful."""
directory = "documents"
result = local_storage.remove_directory(directory)
assert result is True
mock_exists.assert_called_once_with("/tmp/test_storage/documents")
mock_isdir.assert_called_once_with("/tmp/test_storage/documents")
mock_rmtree.assert_called_once_with("/tmp/test_storage/documents")
@patch('os.path.exists', return_value=False)
def test_remove_directory_returns_false_when_not_exists(
self, mock_exists, local_storage
):
"""Should return False when directory doesn't exist."""
directory = "nonexistent"
result = local_storage.remove_directory(directory)
assert result is False
mock_exists.assert_called_once_with("/tmp/test_storage/nonexistent")
@patch('os.path.isdir', return_value=False)
@patch('os.path.exists', return_value=True)
def test_remove_directory_returns_false_when_not_directory(
self, mock_exists, mock_isdir, local_storage
):
"""Should return False when path is not a directory."""
path = "documents/test.txt"
result = local_storage.remove_directory(path)
assert result is False
mock_exists.assert_called_once_with("/tmp/test_storage/documents/test.txt")
mock_isdir.assert_called_once_with("/tmp/test_storage/documents/test.txt")
@patch('shutil.rmtree', side_effect=OSError("Permission denied"))
@patch('os.path.isdir', return_value=True)
@patch('os.path.exists', return_value=True)
def test_remove_directory_returns_false_on_os_error(
self, mock_exists, mock_isdir, mock_rmtree, local_storage
):
"""Should return False when OSError occurs during removal."""
directory = "documents"
result = local_storage.remove_directory(directory)
assert result is False
mock_rmtree.assert_called_once_with("/tmp/test_storage/documents")
@patch('shutil.rmtree', side_effect=PermissionError("Access denied"))
@patch('os.path.isdir', return_value=True)
@patch('os.path.exists', return_value=True)
def test_remove_directory_returns_false_on_permission_error(
self, mock_exists, mock_isdir, mock_rmtree, local_storage
):
"""Should return False when PermissionError occurs during removal."""
directory = "documents"
result = local_storage.remove_directory(directory)
assert result is False
mock_rmtree.assert_called_once_with("/tmp/test_storage/documents")

View File

@@ -0,0 +1,382 @@
"""Tests for S3 storage implementation.
"""
import io
import pytest
from unittest.mock import patch, MagicMock
from botocore.exceptions import ClientError
from application.storage.s3 import S3Storage
@pytest.fixture
def mock_boto3_client():
"""Mock boto3.client to isolate S3 client creation."""
with patch('boto3.client') as mock_client:
s3_mock = MagicMock()
mock_client.return_value = s3_mock
yield s3_mock
@pytest.fixture
def s3_storage(mock_boto3_client):
"""Create S3Storage instance with mocked boto3 client."""
return S3Storage(bucket_name="test-bucket")
class TestS3StorageInitialization:
"""Test S3Storage initialization and configuration."""
def test_init_with_default_bucket(self):
"""Should use default bucket name when none provided."""
with patch('boto3.client'):
storage = S3Storage()
assert storage.bucket_name == "docsgpt-test-bucket"
def test_init_with_custom_bucket(self):
"""Should use provided bucket name."""
with patch('boto3.client'):
storage = S3Storage(bucket_name="custom-bucket")
assert storage.bucket_name == "custom-bucket"
def test_init_creates_boto3_client(self):
"""Should create boto3 S3 client with credentials from settings."""
with patch('boto3.client') as mock_client, \
patch('application.storage.s3.settings') as mock_settings:
mock_settings.SAGEMAKER_ACCESS_KEY = "test-key"
mock_settings.SAGEMAKER_SECRET_KEY = "test-secret"
mock_settings.SAGEMAKER_REGION = "us-west-2"
S3Storage()
mock_client.assert_called_once_with(
"s3",
aws_access_key_id="test-key",
aws_secret_access_key="test-secret",
region_name="us-west-2"
)
class TestS3StorageSaveFile:
"""Test file saving functionality."""
def test_save_file_uploads_to_s3(self, s3_storage, mock_boto3_client):
"""Should upload file to S3 with correct parameters."""
file_data = io.BytesIO(b"test content")
path = "documents/test.txt"
with patch('application.storage.s3.settings') as mock_settings:
mock_settings.SAGEMAKER_REGION = "us-east-1"
result = s3_storage.save_file(file_data, path)
mock_boto3_client.upload_fileobj.assert_called_once_with(
file_data,
"test-bucket",
path,
ExtraArgs={"StorageClass": "INTELLIGENT_TIERING"}
)
assert result == {
"storage_type": "s3",
"bucket_name": "test-bucket",
"uri": "s3://test-bucket/documents/test.txt",
"region": "us-east-1"
}
def test_save_file_with_custom_storage_class(self, s3_storage, mock_boto3_client):
"""Should use custom storage class when provided."""
file_data = io.BytesIO(b"test content")
path = "documents/test.txt"
with patch('application.storage.s3.settings') as mock_settings:
mock_settings.SAGEMAKER_REGION = "us-east-1"
s3_storage.save_file(file_data, path, storage_class="STANDARD")
mock_boto3_client.upload_fileobj.assert_called_once_with(
file_data,
"test-bucket",
path,
ExtraArgs={"StorageClass": "STANDARD"}
)
def test_save_file_propagates_client_error(self, s3_storage, mock_boto3_client):
"""Should propagate ClientError when upload fails."""
file_data = io.BytesIO(b"test content")
path = "documents/test.txt"
mock_boto3_client.upload_fileobj.side_effect = ClientError(
{"Error": {"Code": "AccessDenied", "Message": "Access denied"}},
"upload_fileobj"
)
with pytest.raises(ClientError):
s3_storage.save_file(file_data, path)
class TestS3StorageFileExists:
"""Test file existence checking."""
def test_file_exists_returns_true_when_file_found(self, s3_storage, mock_boto3_client):
"""Should return True when head_object succeeds."""
path = "documents/test.txt"
mock_boto3_client.head_object.return_value = {"ContentLength": 100}
result = s3_storage.file_exists(path)
assert result is True
mock_boto3_client.head_object.assert_called_once_with(
Bucket="test-bucket",
Key=path
)
def test_file_exists_returns_false_on_client_error(self, s3_storage, mock_boto3_client):
"""Should return False when head_object raises ClientError."""
path = "documents/nonexistent.txt"
mock_boto3_client.head_object.side_effect = ClientError(
{"Error": {"Code": "NoSuchKey", "Message": "Not found"}},
"head_object"
)
result = s3_storage.file_exists(path)
assert result is False
class TestS3StorageGetFile:
"""Test file retrieval functionality."""
def test_get_file_downloads_and_returns_file_object(self, s3_storage, mock_boto3_client):
"""Should download file from S3 and return BytesIO object."""
path = "documents/test.txt"
test_content = b"file content"
mock_boto3_client.head_object.return_value = {}
def mock_download(bucket, key, file_obj):
file_obj.write(test_content)
mock_boto3_client.download_fileobj.side_effect = mock_download
result = s3_storage.get_file(path)
assert isinstance(result, io.BytesIO)
assert result.read() == test_content
mock_boto3_client.download_fileobj.assert_called_once()
def test_get_file_raises_error_when_file_not_found(self, s3_storage, mock_boto3_client):
"""Should raise FileNotFoundError when file doesn't exist."""
path = "documents/nonexistent.txt"
mock_boto3_client.head_object.side_effect = ClientError(
{"Error": {"Code": "NoSuchKey", "Message": "Not found"}},
"head_object"
)
with pytest.raises(FileNotFoundError, match="File not found"):
s3_storage.get_file(path)
class TestS3StorageDeleteFile:
"""Test file deletion functionality."""
def test_delete_file_returns_true_on_success(self, s3_storage, mock_boto3_client):
"""Should return True when deletion succeeds."""
path = "documents/test.txt"
mock_boto3_client.delete_object.return_value = {}
result = s3_storage.delete_file(path)
assert result is True
mock_boto3_client.delete_object.assert_called_once_with(
Bucket="test-bucket",
Key=path
)
def test_delete_file_returns_false_on_client_error(self, s3_storage, mock_boto3_client):
"""Should return False when deletion fails with ClientError."""
path = "documents/test.txt"
mock_boto3_client.delete_object.side_effect = ClientError(
{"Error": {"Code": "AccessDenied", "Message": "Access denied"}},
"delete_object"
)
result = s3_storage.delete_file(path)
assert result is False
class TestS3StorageListFiles:
"""Test directory listing functionality."""
def test_list_files_returns_all_keys_with_prefix(self, s3_storage, mock_boto3_client):
"""Should return all file keys matching the directory prefix."""
directory = "documents/"
paginator_mock = MagicMock()
mock_boto3_client.get_paginator.return_value = paginator_mock
paginator_mock.paginate.return_value = [
{
"Contents": [
{"Key": "documents/file1.txt"},
{"Key": "documents/file2.txt"},
{"Key": "documents/subdir/file3.txt"}
]
}
]
result = s3_storage.list_files(directory)
assert len(result) == 3
assert "documents/file1.txt" in result
assert "documents/file2.txt" in result
assert "documents/subdir/file3.txt" in result
mock_boto3_client.get_paginator.assert_called_once_with('list_objects_v2')
paginator_mock.paginate.assert_called_once_with(
Bucket="test-bucket",
Prefix="documents/"
)
def test_list_files_returns_empty_list_when_no_contents(self, s3_storage, mock_boto3_client):
"""Should return empty list when directory has no files."""
directory = "empty/"
paginator_mock = MagicMock()
mock_boto3_client.get_paginator.return_value = paginator_mock
paginator_mock.paginate.return_value = [{}]
result = s3_storage.list_files(directory)
assert result == []
class TestS3StorageProcessFile:
"""Test file processing functionality."""
def test_process_file_downloads_and_processes_file(self, s3_storage, mock_boto3_client):
"""Should download file to temp location and call processor function."""
path = "documents/test.txt"
mock_boto3_client.head_object.return_value = {}
with patch('tempfile.NamedTemporaryFile') as mock_temp:
mock_file = MagicMock()
mock_file.name = "/tmp/test_file"
mock_temp.return_value.__enter__.return_value = mock_file
processor_func = MagicMock(return_value="processed")
result = s3_storage.process_file(path, processor_func, extra_arg="value")
assert result == "processed"
processor_func.assert_called_once_with(local_path="/tmp/test_file", extra_arg="value")
mock_boto3_client.download_fileobj.assert_called_once()
def test_process_file_raises_error_when_file_not_found(self, s3_storage, mock_boto3_client):
"""Should raise FileNotFoundError when file doesn't exist."""
path = "documents/nonexistent.txt"
mock_boto3_client.head_object.side_effect = ClientError(
{"Error": {"Code": "NoSuchKey", "Message": "Not found"}},
"head_object"
)
processor_func = MagicMock()
with pytest.raises(FileNotFoundError, match="File not found in S3"):
s3_storage.process_file(path, processor_func)
class TestS3StorageIsDirectory:
"""Test directory checking functionality."""
def test_is_directory_returns_true_when_objects_exist(self, s3_storage, mock_boto3_client):
"""Should return True when objects exist with the directory prefix."""
path = "documents/"
mock_boto3_client.list_objects_v2.return_value = {
"Contents": [{"Key": "documents/file1.txt"}]
}
result = s3_storage.is_directory(path)
assert result is True
mock_boto3_client.list_objects_v2.assert_called_once_with(
Bucket="test-bucket",
Prefix="documents/",
MaxKeys=1
)
def test_is_directory_returns_false_when_no_objects_exist(self, s3_storage, mock_boto3_client):
"""Should return False when no objects exist with the directory prefix."""
path = "nonexistent/"
mock_boto3_client.list_objects_v2.return_value = {}
result = s3_storage.is_directory(path)
assert result is False
class TestS3StorageRemoveDirectory:
"""Test directory removal functionality."""
def test_remove_directory_deletes_all_objects(self, s3_storage, mock_boto3_client):
"""Should delete all objects with the directory prefix."""
directory = "documents/"
paginator_mock = MagicMock()
mock_boto3_client.get_paginator.return_value = paginator_mock
paginator_mock.paginate.return_value = [
{
"Contents": [
{"Key": "documents/file1.txt"},
{"Key": "documents/file2.txt"}
]
}
]
mock_boto3_client.delete_objects.return_value = {
"Deleted": [
{"Key": "documents/file1.txt"},
{"Key": "documents/file2.txt"}
]
}
result = s3_storage.remove_directory(directory)
assert result is True
mock_boto3_client.delete_objects.assert_called_once()
call_args = mock_boto3_client.delete_objects.call_args[1]
assert call_args["Bucket"] == "test-bucket"
assert len(call_args["Delete"]["Objects"]) == 2
def test_remove_directory_returns_false_when_empty(self, s3_storage, mock_boto3_client):
"""Should return False when directory is empty (no objects to delete)."""
directory = "empty/"
paginator_mock = MagicMock()
mock_boto3_client.get_paginator.return_value = paginator_mock
paginator_mock.paginate.return_value = [{}]
result = s3_storage.remove_directory(directory)
assert result is False
mock_boto3_client.delete_objects.assert_not_called()
def test_remove_directory_returns_false_on_client_error(self, s3_storage, mock_boto3_client):
"""Should return False when deletion fails with ClientError."""
directory = "documents/"
paginator_mock = MagicMock()
mock_boto3_client.get_paginator.return_value = paginator_mock
paginator_mock.paginate.return_value = [
{"Contents": [{"Key": "documents/file1.txt"}]}
]
mock_boto3_client.delete_objects.side_effect = ClientError(
{"Error": {"Code": "AccessDenied", "Message": "Access denied"}},
"delete_objects"
)
result = s3_storage.remove_directory(directory)
assert result is False

View File

@@ -0,0 +1,43 @@
import base64
import sys
from types import ModuleType, SimpleNamespace
from application.tts.elevenlabs import ElevenlabsTTS
def test_elevenlabs_text_to_speech_monkeypatched_client(monkeypatch):
monkeypatch.setattr(
"application.tts.elevenlabs.settings",
SimpleNamespace(ELEVENLABS_API_KEY="api-key"),
)
created = {}
class DummyClient:
def __init__(self, api_key):
created["api_key"] = api_key
self.generate_calls = []
def generate(self, *, text, model, voice):
self.generate_calls.append({"text": text, "model": model, "voice": voice})
yield b"chunk-one"
yield b"chunk-two"
client_module = ModuleType("elevenlabs.client")
client_module.ElevenLabs = DummyClient
package_module = ModuleType("elevenlabs")
package_module.client = client_module
monkeypatch.setitem(sys.modules, "elevenlabs", package_module)
monkeypatch.setitem(sys.modules, "elevenlabs.client", client_module)
tts = ElevenlabsTTS()
audio_base64, lang = tts.text_to_speech("Speak")
assert created["api_key"] == "api-key"
assert tts.client.generate_calls == [
{"text": "Speak", "model": "eleven_multilingual_v2", "voice": "Brian"}
]
assert lang == "en"
assert base64.b64decode(audio_base64.encode()) == b"chunk-onechunk-two"

View File

@@ -0,0 +1,24 @@
import base64
from application.tts.google_tts import GoogleTTS
def test_google_tts_text_to_speech(monkeypatch):
captured = {}
class DummyGTTS:
def __init__(self, *, text, lang, slow):
captured["args"] = {"text": text, "lang": lang, "slow": slow}
def write_to_fp(self, fp):
fp.write(b"synthetic-audio")
monkeypatch.setattr("application.tts.google_tts.gTTS", DummyGTTS)
tts = GoogleTTS()
audio_base64, lang = tts.text_to_speech("hello world")
assert captured["args"] == {"text": "hello world", "lang": "en", "slow": False}
assert lang == "en"
assert base64.b64decode(audio_base64.encode()) == b"synthetic-audio"