mirror of
https://github.com/arc53/DocsGPT.git
synced 2025-12-02 01:53:14 +00:00
feat: finalize remote mcp
This commit is contained in:
@@ -1,97 +1,85 @@
|
||||
"""
|
||||
Simple encryption utility for securely storing sensitive credentials.
|
||||
Uses XOR encryption with a key derived from app secret and user ID.
|
||||
Note: This is basic obfuscation. For production, consider using cryptography library.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
import os
|
||||
import json
|
||||
import os
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.ciphers import algorithms, Cipher, modes
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
|
||||
from application.core.settings import settings
|
||||
|
||||
|
||||
def _get_encryption_key(user_id: str) -> bytes:
|
||||
"""
|
||||
Generate a consistent encryption key for a specific user.
|
||||
Uses app secret + user ID to create a unique key per user.
|
||||
"""
|
||||
# Get app secret from environment or use a default (in production, always use env)
|
||||
app_secret = os.environ.get(
|
||||
"APP_SECRET_KEY", "default-docsgpt-secret-key-change-in-production"
|
||||
def _derive_key(user_id: str, salt: bytes) -> bytes:
|
||||
app_secret = settings.ENCRYPTION_SECRET_KEY
|
||||
|
||||
password = f"{app_secret}#{user_id}".encode()
|
||||
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(),
|
||||
length=32,
|
||||
salt=salt,
|
||||
iterations=100000,
|
||||
backend=default_backend(),
|
||||
)
|
||||
|
||||
# Combine app secret with user ID for user-specific encryption
|
||||
combined = f"{app_secret}#{user_id}"
|
||||
|
||||
# Create a 32-byte key
|
||||
key_material = hashlib.sha256(combined.encode()).digest()
|
||||
|
||||
return key_material
|
||||
|
||||
|
||||
def _xor_encrypt_decrypt(data: bytes, key: bytes) -> bytes:
|
||||
"""Simple XOR encryption/decryption."""
|
||||
result = bytearray()
|
||||
for i, byte in enumerate(data):
|
||||
result.append(byte ^ key[i % len(key)])
|
||||
return bytes(result)
|
||||
return kdf.derive(password)
|
||||
|
||||
|
||||
def encrypt_credentials(credentials: dict, user_id: str) -> str:
|
||||
"""
|
||||
Encrypt credentials dictionary for secure storage.
|
||||
|
||||
Args:
|
||||
credentials: Dictionary containing sensitive data
|
||||
user_id: User ID for creating user-specific encryption key
|
||||
|
||||
Returns:
|
||||
Base64 encoded encrypted string
|
||||
"""
|
||||
if not credentials:
|
||||
return ""
|
||||
|
||||
try:
|
||||
key = _get_encryption_key(user_id)
|
||||
salt = os.urandom(16)
|
||||
iv = os.urandom(16)
|
||||
key = _derive_key(user_id, salt)
|
||||
|
||||
# Convert dict to JSON string and encrypt
|
||||
json_str = json.dumps(credentials)
|
||||
encrypted_data = _xor_encrypt_decrypt(json_str.encode(), key)
|
||||
|
||||
# Return base64 encoded for storage
|
||||
return base64.b64encode(encrypted_data).decode()
|
||||
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
|
||||
encryptor = cipher.encryptor()
|
||||
|
||||
padded_data = _pad_data(json_str.encode())
|
||||
encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
|
||||
|
||||
result = salt + iv + encrypted_data
|
||||
return base64.b64encode(result).decode()
|
||||
except Exception as e:
|
||||
# If encryption fails, store empty string (will require re-auth)
|
||||
print(f"Warning: Failed to encrypt credentials: {e}")
|
||||
return ""
|
||||
|
||||
|
||||
def decrypt_credentials(encrypted_data: str, user_id: str) -> dict:
|
||||
"""
|
||||
Decrypt credentials from storage.
|
||||
|
||||
Args:
|
||||
encrypted_data: Base64 encoded encrypted string
|
||||
user_id: User ID for creating user-specific encryption key
|
||||
|
||||
Returns:
|
||||
Dictionary containing decrypted credentials
|
||||
"""
|
||||
if not encrypted_data:
|
||||
return {}
|
||||
|
||||
try:
|
||||
key = _get_encryption_key(user_id)
|
||||
data = base64.b64decode(encrypted_data.encode())
|
||||
|
||||
# Decode and decrypt
|
||||
encrypted_bytes = base64.b64decode(encrypted_data.encode())
|
||||
decrypted_data = _xor_encrypt_decrypt(encrypted_bytes, key)
|
||||
salt = data[:16]
|
||||
iv = data[16:32]
|
||||
encrypted_content = data[32:]
|
||||
|
||||
key = _derive_key(user_id, salt)
|
||||
|
||||
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
|
||||
decryptor = cipher.decryptor()
|
||||
|
||||
decrypted_padded = decryptor.update(encrypted_content) + decryptor.finalize()
|
||||
decrypted_data = _unpad_data(decrypted_padded)
|
||||
|
||||
# Parse JSON back to dict
|
||||
return json.loads(decrypted_data.decode())
|
||||
|
||||
except Exception as e:
|
||||
# If decryption fails, return empty dict (will require re-auth)
|
||||
print(f"Warning: Failed to decrypt credentials: {e}")
|
||||
return {}
|
||||
|
||||
|
||||
def _pad_data(data: bytes) -> bytes:
|
||||
block_size = 16
|
||||
padding_len = block_size - (len(data) % block_size)
|
||||
padding = bytes([padding_len]) * padding_len
|
||||
return data + padding
|
||||
|
||||
|
||||
def _unpad_data(data: bytes) -> bytes:
|
||||
padding_len = data[-1]
|
||||
return data[:-padding_len]
|
||||
|
||||
Reference in New Issue
Block a user