mirror of
https://github.com/BEDOLAGA-DEV/remnawave-bedolaga-telegram-bot.git
synced 2026-01-28 07:40:23 +00:00
- Add pyproject.toml with uv and ruff configuration - Pin Python version to 3.13 via .python-version - Add Makefile commands: lint, format, fix - Apply ruff formatting to entire codebase - Remove unused imports (base64 in yookassa/simple_subscription) - Update .gitignore for new config files
345 lines
11 KiB
Python
345 lines
11 KiB
Python
import json
|
||
import logging
|
||
from datetime import timedelta
|
||
from typing import Any
|
||
|
||
import redis.asyncio as redis
|
||
|
||
from app.config import settings
|
||
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class CacheService:
|
||
def __init__(self):
|
||
self.redis_client: redis.Redis | None = None
|
||
self._connected = False
|
||
|
||
async def connect(self):
|
||
try:
|
||
self.redis_client = redis.from_url(settings.REDIS_URL)
|
||
await self.redis_client.ping()
|
||
self._connected = True
|
||
logger.info('✅ Подключение к Redis кешу установлено')
|
||
except Exception as e:
|
||
logger.warning(f'⚠️ Не удалось подключиться к Redis: {e}')
|
||
self._connected = False
|
||
|
||
async def disconnect(self):
|
||
if self.redis_client:
|
||
await self.redis_client.close()
|
||
self._connected = False
|
||
|
||
async def get(self, key: str) -> Any | None:
|
||
if not self._connected:
|
||
return None
|
||
|
||
try:
|
||
value = await self.redis_client.get(key)
|
||
if value:
|
||
return json.loads(value)
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f'Ошибка получения из кеша {key}: {e}')
|
||
return None
|
||
|
||
async def set(self, key: str, value: Any, expire: int | timedelta = None) -> bool:
|
||
if not self._connected:
|
||
return False
|
||
|
||
try:
|
||
serialized_value = json.dumps(value, default=str)
|
||
|
||
if isinstance(expire, timedelta):
|
||
expire = int(expire.total_seconds())
|
||
|
||
await self.redis_client.set(key, serialized_value, ex=expire)
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f'Ошибка записи в кеш {key}: {e}')
|
||
return False
|
||
|
||
async def setnx(self, key: str, value: Any, expire: int | timedelta = None) -> bool:
|
||
"""Атомарная операция SET IF NOT EXISTS.
|
||
|
||
Устанавливает значение только если ключ не существует.
|
||
Возвращает True если значение было установлено, False если ключ уже существовал.
|
||
"""
|
||
if not self._connected:
|
||
return False
|
||
|
||
try:
|
||
serialized_value = json.dumps(value, default=str)
|
||
|
||
if isinstance(expire, timedelta):
|
||
expire = int(expire.total_seconds())
|
||
|
||
# SET с NX возвращает True если установлено, None если ключ существует
|
||
result = await self.redis_client.set(key, serialized_value, ex=expire, nx=True)
|
||
return result is True
|
||
except Exception as e:
|
||
logger.error(f'Ошибка setnx в кеш {key}: {e}')
|
||
return False
|
||
|
||
async def delete(self, key: str) -> bool:
|
||
if not self._connected:
|
||
return False
|
||
|
||
try:
|
||
deleted = await self.redis_client.delete(key)
|
||
return deleted > 0
|
||
except Exception as e:
|
||
logger.error(f'Ошибка удаления из кеша {key}: {e}')
|
||
return False
|
||
|
||
async def delete_pattern(self, pattern: str) -> int:
|
||
if not self._connected:
|
||
return 0
|
||
|
||
try:
|
||
keys = await self.redis_client.keys(pattern)
|
||
if not keys:
|
||
return 0
|
||
|
||
deleted = await self.redis_client.delete(*keys)
|
||
return int(deleted)
|
||
except Exception as e:
|
||
logger.error(f'Ошибка удаления ключей по шаблону {pattern}: {e}')
|
||
return 0
|
||
|
||
async def exists(self, key: str) -> bool:
|
||
if not self._connected:
|
||
return False
|
||
|
||
try:
|
||
return await self.redis_client.exists(key)
|
||
except Exception as e:
|
||
logger.error(f'Ошибка проверки существования в кеше {key}: {e}')
|
||
return False
|
||
|
||
async def expire(self, key: str, seconds: int) -> bool:
|
||
if not self._connected:
|
||
return False
|
||
|
||
try:
|
||
return await self.redis_client.expire(key, seconds)
|
||
except Exception as e:
|
||
logger.error(f'Ошибка установки TTL для {key}: {e}')
|
||
return False
|
||
|
||
async def get_keys(self, pattern: str = '*') -> list:
|
||
if not self._connected:
|
||
return []
|
||
|
||
try:
|
||
keys = await self.redis_client.keys(pattern)
|
||
return [key.decode() if isinstance(key, bytes) else key for key in keys]
|
||
except Exception as e:
|
||
logger.error(f'Ошибка получения ключей по паттерну {pattern}: {e}')
|
||
return []
|
||
|
||
async def flush_all(self) -> bool:
|
||
if not self._connected:
|
||
return False
|
||
|
||
try:
|
||
await self.redis_client.flushall()
|
||
logger.info('🗑️ Кеш полностью очищен')
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f'Ошибка очистки кеша: {e}')
|
||
return False
|
||
|
||
async def increment(self, key: str, amount: int = 1) -> int | None:
|
||
if not self._connected:
|
||
return None
|
||
|
||
try:
|
||
return await self.redis_client.incrby(key, amount)
|
||
except Exception as e:
|
||
logger.error(f'Ошибка инкремента {key}: {e}')
|
||
return None
|
||
|
||
async def set_hash(self, name: str, mapping: dict, expire: int = None) -> bool:
|
||
if not self._connected:
|
||
return False
|
||
|
||
try:
|
||
await self.redis_client.hset(name, mapping=mapping)
|
||
if expire:
|
||
await self.redis_client.expire(name, expire)
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f'Ошибка записи хеша {name}: {e}')
|
||
return False
|
||
|
||
async def get_hash(self, name: str, key: str = None) -> dict | str | None:
|
||
if not self._connected:
|
||
return None
|
||
|
||
try:
|
||
if key:
|
||
value = await self.redis_client.hget(name, key)
|
||
return value.decode() if value else None
|
||
hash_data = await self.redis_client.hgetall(name)
|
||
return {k.decode(): v.decode() for k, v in hash_data.items()}
|
||
except Exception as e:
|
||
logger.error(f'Ошибка получения хеша {name}: {e}')
|
||
return None
|
||
|
||
async def lpush(self, key: str, value: Any) -> bool:
|
||
"""Добавить элемент в начало списка (очереди)."""
|
||
if not self._connected:
|
||
return False
|
||
|
||
try:
|
||
serialized = json.dumps(value, default=str)
|
||
await self.redis_client.lpush(key, serialized)
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f'Ошибка добавления в очередь {key}: {e}')
|
||
return False
|
||
|
||
async def rpop(self, key: str) -> Any | None:
|
||
"""Извлечь элемент из конца списка (FIFO очередь)."""
|
||
if not self._connected:
|
||
return None
|
||
|
||
try:
|
||
value = await self.redis_client.rpop(key)
|
||
if value:
|
||
return json.loads(value)
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f'Ошибка извлечения из очереди {key}: {e}')
|
||
return None
|
||
|
||
async def llen(self, key: str) -> int:
|
||
"""Получить длину списка (очереди)."""
|
||
if not self._connected:
|
||
return 0
|
||
|
||
try:
|
||
return await self.redis_client.llen(key)
|
||
except Exception as e:
|
||
logger.error(f'Ошибка получения длины очереди {key}: {e}')
|
||
return 0
|
||
|
||
async def lrange(self, key: str, start: int = 0, end: int = -1) -> list:
|
||
"""Получить элементы списка без удаления."""
|
||
if not self._connected:
|
||
return []
|
||
|
||
try:
|
||
items = await self.redis_client.lrange(key, start, end)
|
||
return [json.loads(item) for item in items]
|
||
except Exception as e:
|
||
logger.error(f'Ошибка чтения очереди {key}: {e}')
|
||
return []
|
||
|
||
|
||
cache = CacheService()
|
||
|
||
|
||
def cache_key(*parts) -> str:
|
||
return ':'.join(str(part) for part in parts)
|
||
|
||
|
||
async def cached_function(key: str, expire: int = 300):
|
||
def decorator(func):
|
||
async def wrapper(*args, **kwargs):
|
||
cache_result = await cache.get(key)
|
||
if cache_result is not None:
|
||
return cache_result
|
||
|
||
result = await func(*args, **kwargs)
|
||
await cache.set(key, result, expire)
|
||
return result
|
||
|
||
return wrapper
|
||
|
||
return decorator
|
||
|
||
|
||
class UserCache:
|
||
@staticmethod
|
||
async def get_user_data(user_id: int) -> dict | None:
|
||
key = cache_key('user', user_id)
|
||
return await cache.get(key)
|
||
|
||
@staticmethod
|
||
async def set_user_data(user_id: int, data: dict, expire: int = 3600) -> bool:
|
||
key = cache_key('user', user_id)
|
||
return await cache.set(key, data, expire)
|
||
|
||
@staticmethod
|
||
async def delete_user_data(user_id: int) -> bool:
|
||
key = cache_key('user', user_id)
|
||
return await cache.delete(key)
|
||
|
||
@staticmethod
|
||
async def get_user_session(user_id: int, session_key: str) -> Any | None:
|
||
key = cache_key('session', user_id, session_key)
|
||
return await cache.get(key)
|
||
|
||
@staticmethod
|
||
async def set_user_session(user_id: int, session_key: str, data: Any, expire: int = 1800) -> bool:
|
||
key = cache_key('session', user_id, session_key)
|
||
return await cache.set(key, data, expire)
|
||
|
||
@staticmethod
|
||
async def delete_user_session(user_id: int, session_key: str) -> bool:
|
||
key = cache_key('session', user_id, session_key)
|
||
return await cache.delete(key)
|
||
|
||
|
||
class SystemCache:
|
||
@staticmethod
|
||
async def get_system_stats() -> dict | None:
|
||
return await cache.get('system:stats')
|
||
|
||
@staticmethod
|
||
async def set_system_stats(stats: dict, expire: int = 300) -> bool:
|
||
return await cache.set('system:stats', stats, expire)
|
||
|
||
@staticmethod
|
||
async def get_nodes_status() -> list | None:
|
||
return await cache.get('remnawave:nodes')
|
||
|
||
@staticmethod
|
||
async def set_nodes_status(nodes: list, expire: int = 60) -> bool:
|
||
return await cache.set('remnawave:nodes', nodes, expire)
|
||
|
||
@staticmethod
|
||
async def get_daily_stats(date: str) -> dict | None:
|
||
key = cache_key('stats', 'daily', date)
|
||
return await cache.get(key)
|
||
|
||
@staticmethod
|
||
async def set_daily_stats(date: str, stats: dict) -> bool:
|
||
key = cache_key('stats', 'daily', date)
|
||
return await cache.set(key, stats, 86400) # 24 часа
|
||
|
||
|
||
class RateLimitCache:
|
||
@staticmethod
|
||
async def is_rate_limited(user_id: int, action: str, limit: int, window: int) -> bool:
|
||
key = cache_key('rate_limit', user_id, action)
|
||
current = await cache.get(key)
|
||
|
||
if current is None:
|
||
await cache.set(key, 1, window)
|
||
return False
|
||
|
||
if current >= limit:
|
||
return True
|
||
|
||
await cache.increment(key)
|
||
return False
|
||
|
||
@staticmethod
|
||
async def reset_rate_limit(user_id: int, action: str) -> bool:
|
||
key = cache_key('rate_limit', user_id, action)
|
||
return await cache.delete(key)
|