From 42fc771833e6cb606d787871a7a6678175c12677 Mon Sep 17 00:00:00 2001 From: ManishMadan2882 Date: Thu, 2 Oct 2025 02:33:12 +0530 Subject: [PATCH 1/5] tests:storage --- tests/storage/test_local_storage.py | 353 +++++++++++++++++++++++++ tests/storage/test_s3_storage.py | 382 ++++++++++++++++++++++++++++ 2 files changed, 735 insertions(+) create mode 100644 tests/storage/test_local_storage.py create mode 100644 tests/storage/test_s3_storage.py diff --git a/tests/storage/test_local_storage.py b/tests/storage/test_local_storage.py new file mode 100644 index 00000000..61f9fb97 --- /dev/null +++ b/tests/storage/test_local_storage.py @@ -0,0 +1,353 @@ +"""Tests for LocalStorage implementation +""" + +import io +import os +import pytest +from unittest.mock import patch, MagicMock, mock_open + +from application.storage.local import LocalStorage + + +@pytest.fixture +def temp_base_dir(): + """Provide a temporary base directory path for testing.""" + return "/tmp/test_storage" + + +@pytest.fixture +def local_storage(temp_base_dir): + """Create LocalStorage instance with test base directory.""" + return LocalStorage(base_dir=temp_base_dir) + + +class TestLocalStorageInitialization: + """Test LocalStorage initialization and configuration.""" + + def test_init_with_custom_base_dir(self): + """Should use provided base directory.""" + storage = LocalStorage(base_dir="/custom/path") + assert storage.base_dir == "/custom/path" + + def test_init_with_default_base_dir(self): + """Should use default base directory when none provided.""" + storage = LocalStorage() + # Default is three levels up from the file location + assert storage.base_dir is not None + assert isinstance(storage.base_dir, str) + + def test_get_full_path_with_relative_path(self, local_storage): + """Should combine base_dir with relative path.""" + result = local_storage._get_full_path("documents/test.txt") + assert result == "/tmp/test_storage/documents/test.txt" + + def test_get_full_path_with_absolute_path(self, local_storage): + """Should return absolute path unchanged.""" + result = local_storage._get_full_path("/absolute/path/test.txt") + assert result == "/absolute/path/test.txt" + + +class TestLocalStorageSaveFile: + """Test file saving functionality.""" + + @patch('os.makedirs') + @patch('builtins.open', new_callable=mock_open) + @patch('shutil.copyfileobj') + def test_save_file_creates_directory_and_saves( + self, mock_copyfileobj, mock_file, mock_makedirs, local_storage + ): + """Should create directory and save file content.""" + file_data = io.BytesIO(b"test content") + path = "documents/test.txt" + + result = local_storage.save_file(file_data, path) + + # Verify directory creation + mock_makedirs.assert_called_once_with( + "/tmp/test_storage/documents", + exist_ok=True + ) + + # Verify file write + mock_file.assert_called_once_with("/tmp/test_storage/documents/test.txt", 'wb') + mock_copyfileobj.assert_called_once_with(file_data, mock_file()) + + # Verify result + assert result == {'storage_type': 'local'} + + @patch('os.makedirs') + def test_save_file_with_save_method(self, mock_makedirs, local_storage): + """Should use save method if file_data has it.""" + file_data = MagicMock() + file_data.save = MagicMock() + path = "documents/test.txt" + + result = local_storage.save_file(file_data, path) + + # Verify save method was called + file_data.save.assert_called_once_with("/tmp/test_storage/documents/test.txt") + + # Verify result + assert result == {'storage_type': 'local'} + + @patch('os.makedirs') + @patch('builtins.open', new_callable=mock_open) + def test_save_file_with_absolute_path(self, mock_file, mock_makedirs, local_storage): + """Should handle absolute paths correctly.""" + file_data = io.BytesIO(b"test content") + path = "/absolute/path/test.txt" + + local_storage.save_file(file_data, path) + + mock_makedirs.assert_called_once_with("/absolute/path", exist_ok=True) + mock_file.assert_called_once_with("/absolute/path/test.txt", 'wb') + + +class TestLocalStorageGetFile: + """Test file retrieval functionality.""" + + @patch('os.path.exists', return_value=True) + @patch('builtins.open', new_callable=mock_open, read_data=b"file content") + def test_get_file_returns_file_handle(self, mock_file, mock_exists, local_storage): + """Should open and return file handle when file exists.""" + path = "documents/test.txt" + + result = local_storage.get_file(path) + + mock_exists.assert_called_once_with("/tmp/test_storage/documents/test.txt") + mock_file.assert_called_once_with("/tmp/test_storage/documents/test.txt", 'rb') + assert result is not None + + @patch('os.path.exists', return_value=False) + def test_get_file_raises_error_when_not_found(self, mock_exists, local_storage): + """Should raise FileNotFoundError when file doesn't exist.""" + path = "documents/nonexistent.txt" + + with pytest.raises(FileNotFoundError, match="File not found"): + local_storage.get_file(path) + + mock_exists.assert_called_once_with("/tmp/test_storage/documents/nonexistent.txt") + + +class TestLocalStorageDeleteFile: + """Test file deletion functionality.""" + + @patch('os.remove') + @patch('os.path.exists', return_value=True) + def test_delete_file_removes_existing_file(self, mock_exists, mock_remove, local_storage): + """Should delete file and return True when file exists.""" + path = "documents/test.txt" + + result = local_storage.delete_file(path) + + assert result is True + mock_exists.assert_called_once_with("/tmp/test_storage/documents/test.txt") + mock_remove.assert_called_once_with("/tmp/test_storage/documents/test.txt") + + @patch('os.path.exists', return_value=False) + def test_delete_file_returns_false_when_not_found(self, mock_exists, local_storage): + """Should return False when file doesn't exist.""" + path = "documents/nonexistent.txt" + + result = local_storage.delete_file(path) + + assert result is False + mock_exists.assert_called_once_with("/tmp/test_storage/documents/nonexistent.txt") + + +class TestLocalStorageFileExists: + """Test file existence checking.""" + + @patch('os.path.exists', return_value=True) + def test_file_exists_returns_true_when_file_found(self, mock_exists, local_storage): + """Should return True when file exists.""" + path = "documents/test.txt" + + result = local_storage.file_exists(path) + + assert result is True + mock_exists.assert_called_once_with("/tmp/test_storage/documents/test.txt") + + @patch('os.path.exists', return_value=False) + def test_file_exists_returns_false_when_not_found(self, mock_exists, local_storage): + """Should return False when file doesn't exist.""" + path = "documents/nonexistent.txt" + + result = local_storage.file_exists(path) + + assert result is False + mock_exists.assert_called_once_with("/tmp/test_storage/documents/nonexistent.txt") + + +class TestLocalStorageListFiles: + """Test directory listing functionality.""" + + @patch('os.walk') + @patch('os.path.exists', return_value=True) + def test_list_files_returns_all_files_in_directory( + self, mock_exists, mock_walk, local_storage + ): + """Should return all files in directory and subdirectories.""" + directory = "documents" + + # Mock os.walk to return files in directory structure + mock_walk.return_value = [ + ("/tmp/test_storage/documents", ["subdir"], ["file1.txt", "file2.txt"]), + ("/tmp/test_storage/documents/subdir", [], ["file3.txt"]) + ] + + result = local_storage.list_files(directory) + + assert len(result) == 3 + assert "documents/file1.txt" in result + assert "documents/file2.txt" in result + assert "documents/subdir/file3.txt" in result + + mock_exists.assert_called_once_with("/tmp/test_storage/documents") + mock_walk.assert_called_once_with("/tmp/test_storage/documents") + + @patch('os.path.exists', return_value=False) + def test_list_files_returns_empty_list_when_directory_not_found( + self, mock_exists, local_storage + ): + """Should return empty list when directory doesn't exist.""" + directory = "nonexistent" + + result = local_storage.list_files(directory) + + assert result == [] + mock_exists.assert_called_once_with("/tmp/test_storage/nonexistent") + + +class TestLocalStorageProcessFile: + """Test file processing functionality.""" + + @patch('os.path.exists', return_value=True) + def test_process_file_calls_processor_with_full_path( + self, mock_exists, local_storage + ): + """Should call processor function with full file path.""" + path = "documents/test.txt" + processor_func = MagicMock(return_value="processed") + + result = local_storage.process_file(path, processor_func, extra_arg="value") + + assert result == "processed" + processor_func.assert_called_once_with( + local_path="/tmp/test_storage/documents/test.txt", + extra_arg="value" + ) + mock_exists.assert_called_once_with("/tmp/test_storage/documents/test.txt") + + @patch('os.path.exists', return_value=False) + def test_process_file_raises_error_when_file_not_found(self, mock_exists, local_storage): + """Should raise FileNotFoundError when file doesn't exist.""" + path = "documents/nonexistent.txt" + processor_func = MagicMock() + + with pytest.raises(FileNotFoundError, match="File not found"): + local_storage.process_file(path, processor_func) + + processor_func.assert_not_called() + + +class TestLocalStorageIsDirectory: + """Test directory checking functionality.""" + + @patch('os.path.isdir', return_value=True) + def test_is_directory_returns_true_when_directory_exists( + self, mock_isdir, local_storage + ): + """Should return True when path is a directory.""" + path = "documents" + + result = local_storage.is_directory(path) + + assert result is True + mock_isdir.assert_called_once_with("/tmp/test_storage/documents") + + @patch('os.path.isdir', return_value=False) + def test_is_directory_returns_false_when_not_directory( + self, mock_isdir, local_storage + ): + """Should return False when path is not a directory or doesn't exist.""" + path = "documents/test.txt" + + result = local_storage.is_directory(path) + + assert result is False + mock_isdir.assert_called_once_with("/tmp/test_storage/documents/test.txt") + + +class TestLocalStorageRemoveDirectory: + """Test directory removal functionality.""" + + @patch('shutil.rmtree') + @patch('os.path.isdir', return_value=True) + @patch('os.path.exists', return_value=True) + def test_remove_directory_deletes_directory( + self, mock_exists, mock_isdir, mock_rmtree, local_storage + ): + """Should remove directory and return True when successful.""" + directory = "documents" + + result = local_storage.remove_directory(directory) + + assert result is True + mock_exists.assert_called_once_with("/tmp/test_storage/documents") + mock_isdir.assert_called_once_with("/tmp/test_storage/documents") + mock_rmtree.assert_called_once_with("/tmp/test_storage/documents") + + @patch('os.path.exists', return_value=False) + def test_remove_directory_returns_false_when_not_exists( + self, mock_exists, local_storage + ): + """Should return False when directory doesn't exist.""" + directory = "nonexistent" + + result = local_storage.remove_directory(directory) + + assert result is False + mock_exists.assert_called_once_with("/tmp/test_storage/nonexistent") + + @patch('os.path.isdir', return_value=False) + @patch('os.path.exists', return_value=True) + def test_remove_directory_returns_false_when_not_directory( + self, mock_exists, mock_isdir, local_storage + ): + """Should return False when path is not a directory.""" + path = "documents/test.txt" + + result = local_storage.remove_directory(path) + + assert result is False + mock_exists.assert_called_once_with("/tmp/test_storage/documents/test.txt") + mock_isdir.assert_called_once_with("/tmp/test_storage/documents/test.txt") + + @patch('shutil.rmtree', side_effect=OSError("Permission denied")) + @patch('os.path.isdir', return_value=True) + @patch('os.path.exists', return_value=True) + def test_remove_directory_returns_false_on_os_error( + self, mock_exists, mock_isdir, mock_rmtree, local_storage + ): + """Should return False when OSError occurs during removal.""" + directory = "documents" + + result = local_storage.remove_directory(directory) + + assert result is False + mock_rmtree.assert_called_once_with("/tmp/test_storage/documents") + + @patch('shutil.rmtree', side_effect=PermissionError("Access denied")) + @patch('os.path.isdir', return_value=True) + @patch('os.path.exists', return_value=True) + def test_remove_directory_returns_false_on_permission_error( + self, mock_exists, mock_isdir, mock_rmtree, local_storage + ): + """Should return False when PermissionError occurs during removal.""" + directory = "documents" + + result = local_storage.remove_directory(directory) + + assert result is False + mock_rmtree.assert_called_once_with("/tmp/test_storage/documents") diff --git a/tests/storage/test_s3_storage.py b/tests/storage/test_s3_storage.py new file mode 100644 index 00000000..a9cc3c5a --- /dev/null +++ b/tests/storage/test_s3_storage.py @@ -0,0 +1,382 @@ +"""Tests for S3 storage implementation. +""" + +import io +import pytest +from unittest.mock import patch, MagicMock +from botocore.exceptions import ClientError + +from application.storage.s3 import S3Storage + + +@pytest.fixture +def mock_boto3_client(): + """Mock boto3.client to isolate S3 client creation.""" + with patch('boto3.client') as mock_client: + s3_mock = MagicMock() + mock_client.return_value = s3_mock + yield s3_mock + + +@pytest.fixture +def s3_storage(mock_boto3_client): + """Create S3Storage instance with mocked boto3 client.""" + return S3Storage(bucket_name="test-bucket") + + +class TestS3StorageInitialization: + """Test S3Storage initialization and configuration.""" + + def test_init_with_default_bucket(self): + """Should use default bucket name when none provided.""" + with patch('boto3.client'): + storage = S3Storage() + assert storage.bucket_name == "docsgpt-test-bucket" + + def test_init_with_custom_bucket(self): + """Should use provided bucket name.""" + with patch('boto3.client'): + storage = S3Storage(bucket_name="custom-bucket") + assert storage.bucket_name == "custom-bucket" + + def test_init_creates_boto3_client(self): + """Should create boto3 S3 client with credentials from settings.""" + with patch('boto3.client') as mock_client, \ + patch('application.storage.s3.settings') as mock_settings: + + mock_settings.SAGEMAKER_ACCESS_KEY = "test-key" + mock_settings.SAGEMAKER_SECRET_KEY = "test-secret" + mock_settings.SAGEMAKER_REGION = "us-west-2" + + S3Storage() + + mock_client.assert_called_once_with( + "s3", + aws_access_key_id="test-key", + aws_secret_access_key="test-secret", + region_name="us-west-2" + ) + + +class TestS3StorageSaveFile: + """Test file saving functionality.""" + + def test_save_file_uploads_to_s3(self, s3_storage, mock_boto3_client): + """Should upload file to S3 with correct parameters.""" + file_data = io.BytesIO(b"test content") + path = "documents/test.txt" + + with patch('application.storage.s3.settings') as mock_settings: + mock_settings.SAGEMAKER_REGION = "us-east-1" + result = s3_storage.save_file(file_data, path) + + mock_boto3_client.upload_fileobj.assert_called_once_with( + file_data, + "test-bucket", + path, + ExtraArgs={"StorageClass": "INTELLIGENT_TIERING"} + ) + + assert result == { + "storage_type": "s3", + "bucket_name": "test-bucket", + "uri": "s3://test-bucket/documents/test.txt", + "region": "us-east-1" + } + + def test_save_file_with_custom_storage_class(self, s3_storage, mock_boto3_client): + """Should use custom storage class when provided.""" + file_data = io.BytesIO(b"test content") + path = "documents/test.txt" + + with patch('application.storage.s3.settings') as mock_settings: + mock_settings.SAGEMAKER_REGION = "us-east-1" + s3_storage.save_file(file_data, path, storage_class="STANDARD") + + mock_boto3_client.upload_fileobj.assert_called_once_with( + file_data, + "test-bucket", + path, + ExtraArgs={"StorageClass": "STANDARD"} + ) + + def test_save_file_propagates_client_error(self, s3_storage, mock_boto3_client): + """Should propagate ClientError when upload fails.""" + file_data = io.BytesIO(b"test content") + path = "documents/test.txt" + + mock_boto3_client.upload_fileobj.side_effect = ClientError( + {"Error": {"Code": "AccessDenied", "Message": "Access denied"}}, + "upload_fileobj" + ) + + with pytest.raises(ClientError): + s3_storage.save_file(file_data, path) + + +class TestS3StorageFileExists: + """Test file existence checking.""" + + def test_file_exists_returns_true_when_file_found(self, s3_storage, mock_boto3_client): + """Should return True when head_object succeeds.""" + path = "documents/test.txt" + mock_boto3_client.head_object.return_value = {"ContentLength": 100} + + result = s3_storage.file_exists(path) + + assert result is True + mock_boto3_client.head_object.assert_called_once_with( + Bucket="test-bucket", + Key=path + ) + + def test_file_exists_returns_false_on_client_error(self, s3_storage, mock_boto3_client): + """Should return False when head_object raises ClientError.""" + path = "documents/nonexistent.txt" + mock_boto3_client.head_object.side_effect = ClientError( + {"Error": {"Code": "NoSuchKey", "Message": "Not found"}}, + "head_object" + ) + + result = s3_storage.file_exists(path) + + assert result is False + + +class TestS3StorageGetFile: + """Test file retrieval functionality.""" + + def test_get_file_downloads_and_returns_file_object(self, s3_storage, mock_boto3_client): + """Should download file from S3 and return BytesIO object.""" + path = "documents/test.txt" + test_content = b"file content" + + mock_boto3_client.head_object.return_value = {} + + def mock_download(bucket, key, file_obj): + file_obj.write(test_content) + + mock_boto3_client.download_fileobj.side_effect = mock_download + + result = s3_storage.get_file(path) + + assert isinstance(result, io.BytesIO) + assert result.read() == test_content + mock_boto3_client.download_fileobj.assert_called_once() + + def test_get_file_raises_error_when_file_not_found(self, s3_storage, mock_boto3_client): + """Should raise FileNotFoundError when file doesn't exist.""" + path = "documents/nonexistent.txt" + mock_boto3_client.head_object.side_effect = ClientError( + {"Error": {"Code": "NoSuchKey", "Message": "Not found"}}, + "head_object" + ) + + with pytest.raises(FileNotFoundError, match="File not found"): + s3_storage.get_file(path) + + +class TestS3StorageDeleteFile: + """Test file deletion functionality.""" + + def test_delete_file_returns_true_on_success(self, s3_storage, mock_boto3_client): + """Should return True when deletion succeeds.""" + path = "documents/test.txt" + mock_boto3_client.delete_object.return_value = {} + + result = s3_storage.delete_file(path) + + assert result is True + mock_boto3_client.delete_object.assert_called_once_with( + Bucket="test-bucket", + Key=path + ) + + def test_delete_file_returns_false_on_client_error(self, s3_storage, mock_boto3_client): + """Should return False when deletion fails with ClientError.""" + path = "documents/test.txt" + mock_boto3_client.delete_object.side_effect = ClientError( + {"Error": {"Code": "AccessDenied", "Message": "Access denied"}}, + "delete_object" + ) + + result = s3_storage.delete_file(path) + + assert result is False + + +class TestS3StorageListFiles: + """Test directory listing functionality.""" + + def test_list_files_returns_all_keys_with_prefix(self, s3_storage, mock_boto3_client): + """Should return all file keys matching the directory prefix.""" + directory = "documents/" + + paginator_mock = MagicMock() + mock_boto3_client.get_paginator.return_value = paginator_mock + paginator_mock.paginate.return_value = [ + { + "Contents": [ + {"Key": "documents/file1.txt"}, + {"Key": "documents/file2.txt"}, + {"Key": "documents/subdir/file3.txt"} + ] + } + ] + + result = s3_storage.list_files(directory) + + assert len(result) == 3 + assert "documents/file1.txt" in result + assert "documents/file2.txt" in result + assert "documents/subdir/file3.txt" in result + + mock_boto3_client.get_paginator.assert_called_once_with('list_objects_v2') + paginator_mock.paginate.assert_called_once_with( + Bucket="test-bucket", + Prefix="documents/" + ) + + def test_list_files_returns_empty_list_when_no_contents(self, s3_storage, mock_boto3_client): + """Should return empty list when directory has no files.""" + directory = "empty/" + + paginator_mock = MagicMock() + mock_boto3_client.get_paginator.return_value = paginator_mock + paginator_mock.paginate.return_value = [{}] + + result = s3_storage.list_files(directory) + + assert result == [] + + +class TestS3StorageProcessFile: + """Test file processing functionality.""" + + def test_process_file_downloads_and_processes_file(self, s3_storage, mock_boto3_client): + """Should download file to temp location and call processor function.""" + path = "documents/test.txt" + + mock_boto3_client.head_object.return_value = {} + + with patch('tempfile.NamedTemporaryFile') as mock_temp: + mock_file = MagicMock() + mock_file.name = "/tmp/test_file" + mock_temp.return_value.__enter__.return_value = mock_file + + processor_func = MagicMock(return_value="processed") + result = s3_storage.process_file(path, processor_func, extra_arg="value") + + assert result == "processed" + processor_func.assert_called_once_with(local_path="/tmp/test_file", extra_arg="value") + mock_boto3_client.download_fileobj.assert_called_once() + + def test_process_file_raises_error_when_file_not_found(self, s3_storage, mock_boto3_client): + """Should raise FileNotFoundError when file doesn't exist.""" + path = "documents/nonexistent.txt" + mock_boto3_client.head_object.side_effect = ClientError( + {"Error": {"Code": "NoSuchKey", "Message": "Not found"}}, + "head_object" + ) + + processor_func = MagicMock() + + with pytest.raises(FileNotFoundError, match="File not found in S3"): + s3_storage.process_file(path, processor_func) + + +class TestS3StorageIsDirectory: + """Test directory checking functionality.""" + + def test_is_directory_returns_true_when_objects_exist(self, s3_storage, mock_boto3_client): + """Should return True when objects exist with the directory prefix.""" + path = "documents/" + + mock_boto3_client.list_objects_v2.return_value = { + "Contents": [{"Key": "documents/file1.txt"}] + } + + result = s3_storage.is_directory(path) + + assert result is True + mock_boto3_client.list_objects_v2.assert_called_once_with( + Bucket="test-bucket", + Prefix="documents/", + MaxKeys=1 + ) + + def test_is_directory_returns_false_when_no_objects_exist(self, s3_storage, mock_boto3_client): + """Should return False when no objects exist with the directory prefix.""" + path = "nonexistent/" + + mock_boto3_client.list_objects_v2.return_value = {} + + result = s3_storage.is_directory(path) + + assert result is False + + +class TestS3StorageRemoveDirectory: + """Test directory removal functionality.""" + + def test_remove_directory_deletes_all_objects(self, s3_storage, mock_boto3_client): + """Should delete all objects with the directory prefix.""" + directory = "documents/" + + paginator_mock = MagicMock() + mock_boto3_client.get_paginator.return_value = paginator_mock + paginator_mock.paginate.return_value = [ + { + "Contents": [ + {"Key": "documents/file1.txt"}, + {"Key": "documents/file2.txt"} + ] + } + ] + + mock_boto3_client.delete_objects.return_value = { + "Deleted": [ + {"Key": "documents/file1.txt"}, + {"Key": "documents/file2.txt"} + ] + } + + result = s3_storage.remove_directory(directory) + + assert result is True + mock_boto3_client.delete_objects.assert_called_once() + call_args = mock_boto3_client.delete_objects.call_args[1] + assert call_args["Bucket"] == "test-bucket" + assert len(call_args["Delete"]["Objects"]) == 2 + + def test_remove_directory_returns_false_when_empty(self, s3_storage, mock_boto3_client): + """Should return False when directory is empty (no objects to delete).""" + directory = "empty/" + + paginator_mock = MagicMock() + mock_boto3_client.get_paginator.return_value = paginator_mock + paginator_mock.paginate.return_value = [{}] + + result = s3_storage.remove_directory(directory) + + assert result is False + mock_boto3_client.delete_objects.assert_not_called() + + def test_remove_directory_returns_false_on_client_error(self, s3_storage, mock_boto3_client): + """Should return False when deletion fails with ClientError.""" + directory = "documents/" + + paginator_mock = MagicMock() + mock_boto3_client.get_paginator.return_value = paginator_mock + paginator_mock.paginate.return_value = [ + {"Contents": [{"Key": "documents/file1.txt"}]} + ] + + mock_boto3_client.delete_objects.side_effect = ClientError( + {"Error": {"Code": "AccessDenied", "Message": "Access denied"}}, + "delete_objects" + ) + + result = s3_storage.remove_directory(directory) + + assert result is False From 67268fd35a12971a15551b17611958c2e7487318 Mon Sep 17 00:00:00 2001 From: ManishMadan2882 Date: Thu, 2 Oct 2025 02:34:05 +0530 Subject: [PATCH 2/5] tests:security --- tests/security/test_encryption.py | 95 +++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 tests/security/test_encryption.py diff --git a/tests/security/test_encryption.py b/tests/security/test_encryption.py new file mode 100644 index 00000000..659924aa --- /dev/null +++ b/tests/security/test_encryption.py @@ -0,0 +1,95 @@ +import base64 + +import pytest +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + +from application.security import encryption + + +def test_derive_key_uses_secret_and_user(monkeypatch): + monkeypatch.setattr(encryption.settings, "ENCRYPTION_SECRET_KEY", "test-secret") + salt = bytes(range(16)) + + expected_kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=100000, + backend=default_backend(), + ) + expected_key = expected_kdf.derive(b"test-secret#user-123") + + derived = encryption._derive_key("user-123", salt) + + assert derived == expected_key + + +def _fake_os_urandom_factory(values): + values_iter = iter(values) + + def _fake(length): + value = next(values_iter) + assert len(value) == length + return value + + return _fake + + +def test_encrypt_and_decrypt_round_trip(monkeypatch): + monkeypatch.setattr(encryption.settings, "ENCRYPTION_SECRET_KEY", "test-secret") + salt = bytes(range(16)) + iv = bytes(range(16, 32)) + monkeypatch.setattr(encryption.os, "urandom", _fake_os_urandom_factory([salt, iv])) + + credentials = {"token": "abc123", "refresh": "xyz789"} + + encrypted = encryption.encrypt_credentials(credentials, "user-123") + + decoded = base64.b64decode(encrypted) + assert decoded[:16] == salt + assert decoded[16:32] == iv + + decrypted = encryption.decrypt_credentials(encrypted, "user-123") + + assert decrypted == credentials + + +def test_encrypt_credentials_returns_empty_for_empty_input(monkeypatch): + monkeypatch.setattr(encryption.settings, "ENCRYPTION_SECRET_KEY", "test-secret") + + assert encryption.encrypt_credentials({}, "user-123") == "" + assert encryption.encrypt_credentials(None, "user-123") == "" + + +def test_encrypt_credentials_returns_empty_on_serialization_error(monkeypatch): + monkeypatch.setattr(encryption.settings, "ENCRYPTION_SECRET_KEY", "test-secret") + monkeypatch.setattr(encryption.os, "urandom", lambda length: b"\x00" * length) + + class NonSerializable: # pragma: no cover - simple helper container + pass + + credentials = {"bad": NonSerializable()} + + assert encryption.encrypt_credentials(credentials, "user-123") == "" + + +def test_decrypt_credentials_returns_empty_for_invalid_input(monkeypatch): + monkeypatch.setattr(encryption.settings, "ENCRYPTION_SECRET_KEY", "test-secret") + + assert encryption.decrypt_credentials("", "user-123") == {} + assert encryption.decrypt_credentials("not-base64", "user-123") == {} + + invalid_payload = base64.b64encode(b"short").decode() + assert encryption.decrypt_credentials(invalid_payload, "user-123") == {} + + +def test_pad_and_unpad_are_inverse(): + original = b"secret-data" + + padded = encryption._pad_data(original) + + assert len(padded) % 16 == 0 + assert encryption._unpad_data(padded) == original + From 5de15c8413ae93154d50d6ea5d239d075bb910a2 Mon Sep 17 00:00:00 2001 From: ManishMadan2882 Date: Thu, 2 Oct 2025 02:39:53 +0530 Subject: [PATCH 3/5] (feat:11Labs) just functional part --- application/core/settings.py | 1 + application/tts/elevenlabs.py | 98 ++++++++--------------------------- 2 files changed, 23 insertions(+), 76 deletions(-) diff --git a/application/core/settings.py b/application/core/settings.py index 1adf8351..91144ae9 100644 --- a/application/core/settings.py +++ b/application/core/settings.py @@ -118,6 +118,7 @@ class Settings(BaseSettings): # Encryption settings ENCRYPTION_SECRET_KEY: str = "default-docsgpt-encryption-key" + ELEVENLABS_API_KEY: Optional[str] = None path = Path(__file__).parent.parent.absolute() settings = Settings(_env_file=path.joinpath(".env"), _env_file_encoding="utf-8") diff --git a/application/tts/elevenlabs.py b/application/tts/elevenlabs.py index 2e8159b8..0d82021e 100644 --- a/application/tts/elevenlabs.py +++ b/application/tts/elevenlabs.py @@ -1,84 +1,30 @@ -import asyncio -import websockets -import json -import base64 from io import BytesIO +import base64 from application.tts.base import BaseTTS +from application.core.settings import settings class ElevenlabsTTS(BaseTTS): - def __init__(self): - self.api_key = 'ELEVENLABS_API_KEY'# here you should put your api key - self.model = "eleven_flash_v2_5" - self.voice = "VOICE_ID" # this is the hash code for the voice not the name! - self.write_audio = 1 + def __init__(self): + from elevenlabs.client import ElevenLabs + + self.client = ElevenLabs( + api_key=settings.ELEVENLABS_API_KEY, + ) + def text_to_speech(self, text): - asyncio.run(self._text_to_speech_websocket(text)) + lang = "en" + audio = self.client.generate( + text=text, + model="eleven_multilingual_v2", + voice="Brian", + ) + audio_data = BytesIO() + for chunk in audio: + audio_data.write(chunk) + audio_bytes = audio_data.getvalue() - async def _text_to_speech_websocket(self, text): - uri = f"wss://api.elevenlabs.io/v1/text-to-speech/{self.voice}/stream-input?model_id={self.model}" - websocket = await websockets.connect(uri) - payload = { - "text": " ", - "voice_settings": { - "stability": 0.5, - "similarity_boost": 0.8, - }, - "xi_api_key": self.api_key, - } - - await websocket.send(json.dumps(payload)) - - async def listen(): - while 1: - try: - msg = await websocket.recv() - data = json.loads(msg) - - if data.get("audio"): - print("audio received") - yield base64.b64decode(data["audio"]) - elif data.get("isFinal"): - break - except websockets.exceptions.ConnectionClosed: - print("websocket closed") - break - listen_task = asyncio.create_task(self.stream(listen())) - - await websocket.send(json.dumps({"text": text})) - # this is to signal the end of the text, either use this or flush - await websocket.send(json.dumps({"text": ""})) - - await listen_task - - async def stream(self, audio_stream): - if self.write_audio: - audio_bytes = BytesIO() - async for chunk in audio_stream: - if chunk: - audio_bytes.write(chunk) - with open("output_audio.mp3", "wb") as f: - f.write(audio_bytes.getvalue()) - - else: - async for chunk in audio_stream: - pass # depends on the streamer! - - -def test_elevenlabs_websocket(): - """ - Tests the ElevenlabsTTS text_to_speech method with a sample prompt. - Prints out the base64-encoded result and writes it to 'output_audio.mp3'. - """ - # Instantiate your TTS class - tts = ElevenlabsTTS() - - # Call the method with some sample text - tts.text_to_speech("Hello from ElevenLabs WebSocket!") - - print("Saved audio to output_audio.mp3.") - - -if __name__ == "__main__": - test_elevenlabs_websocket() + # Encode to base64 + audio_base64 = base64.b64encode(audio_bytes).decode("utf-8") + return audio_base64, lang \ No newline at end of file From 946865a335e58efdaee920affe3f3fe737f292b6 Mon Sep 17 00:00:00 2001 From: ManishMadan2882 Date: Thu, 2 Oct 2025 02:40:30 +0530 Subject: [PATCH 4/5] test:TTS --- tests/tts/test_elevenlabs_tts.py | 43 ++++++++++++++++++++++++++++++++ tests/tts/test_google_tts.py | 24 ++++++++++++++++++ 2 files changed, 67 insertions(+) create mode 100644 tests/tts/test_elevenlabs_tts.py create mode 100644 tests/tts/test_google_tts.py diff --git a/tests/tts/test_elevenlabs_tts.py b/tests/tts/test_elevenlabs_tts.py new file mode 100644 index 00000000..bea61b80 --- /dev/null +++ b/tests/tts/test_elevenlabs_tts.py @@ -0,0 +1,43 @@ +import base64 +import sys +from types import ModuleType, SimpleNamespace + +from application.tts.elevenlabs import ElevenlabsTTS + + +def test_elevenlabs_text_to_speech_monkeypatched_client(monkeypatch): + monkeypatch.setattr( + "application.tts.elevenlabs.settings", + SimpleNamespace(ELEVENLABS_API_KEY="api-key"), + ) + + created = {} + + class DummyClient: + def __init__(self, api_key): + created["api_key"] = api_key + self.generate_calls = [] + + def generate(self, *, text, model, voice): + self.generate_calls.append({"text": text, "model": model, "voice": voice}) + yield b"chunk-one" + yield b"chunk-two" + + client_module = ModuleType("elevenlabs.client") + client_module.ElevenLabs = DummyClient + package_module = ModuleType("elevenlabs") + package_module.client = client_module + + monkeypatch.setitem(sys.modules, "elevenlabs", package_module) + monkeypatch.setitem(sys.modules, "elevenlabs.client", client_module) + + tts = ElevenlabsTTS() + audio_base64, lang = tts.text_to_speech("Speak") + + assert created["api_key"] == "api-key" + assert tts.client.generate_calls == [ + {"text": "Speak", "model": "eleven_multilingual_v2", "voice": "Brian"} + ] + assert lang == "en" + assert base64.b64decode(audio_base64.encode()) == b"chunk-onechunk-two" + diff --git a/tests/tts/test_google_tts.py b/tests/tts/test_google_tts.py new file mode 100644 index 00000000..ea80ab79 --- /dev/null +++ b/tests/tts/test_google_tts.py @@ -0,0 +1,24 @@ +import base64 + +from application.tts.google_tts import GoogleTTS + + +def test_google_tts_text_to_speech(monkeypatch): + captured = {} + + class DummyGTTS: + def __init__(self, *, text, lang, slow): + captured["args"] = {"text": text, "lang": lang, "slow": slow} + + def write_to_fp(self, fp): + fp.write(b"synthetic-audio") + + monkeypatch.setattr("application.tts.google_tts.gTTS", DummyGTTS) + + tts = GoogleTTS() + audio_base64, lang = tts.text_to_speech("hello world") + + assert captured["args"] == {"text": "hello world", "lang": "en", "slow": False} + assert lang == "en" + assert base64.b64decode(audio_base64.encode()) == b"synthetic-audio" + From 80aaecb5f09794908029e80be5f42066650c2af5 Mon Sep 17 00:00:00 2001 From: ManishMadan2882 Date: Thu, 2 Oct 2025 02:48:16 +0530 Subject: [PATCH 5/5] ruff-fix --- tests/security/test_encryption.py | 1 - tests/storage/test_local_storage.py | 1 - 2 files changed, 2 deletions(-) diff --git a/tests/security/test_encryption.py b/tests/security/test_encryption.py index 659924aa..9b99eb2f 100644 --- a/tests/security/test_encryption.py +++ b/tests/security/test_encryption.py @@ -1,6 +1,5 @@ import base64 -import pytest from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC diff --git a/tests/storage/test_local_storage.py b/tests/storage/test_local_storage.py index 61f9fb97..4277af3a 100644 --- a/tests/storage/test_local_storage.py +++ b/tests/storage/test_local_storage.py @@ -2,7 +2,6 @@ """ import io -import os import pytest from unittest.mock import patch, MagicMock, mock_open