Create utils.py

This commit is contained in:
Fr1ngg
2025-08-04 16:45:21 +03:00
committed by GitHub
parent f40d0a69c5
commit e42286cb18

235
utils.py Normal file
View File

@@ -0,0 +1,235 @@
import re
import uuid
import secrets
import string
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any, Tuple
import logging
logger = logging.getLogger(__name__)
def generate_username() -> str:
"""Generate random username for RemnaWave"""
return f"user_{secrets.token_hex(8)}"
def generate_password() -> str:
"""Generate random password"""
alphabet = string.ascii_letters + string.digits
return ''.join(secrets.choice(alphabet) for _ in range(12))
def generate_promocode() -> str:
"""Generate random promocode"""
alphabet = string.ascii_uppercase + string.digits
return ''.join(secrets.choice(alphabet) for _ in range(8))
def is_valid_email(email: str) -> bool:
"""Validate email format"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
def is_valid_amount(amount_str: str) -> Tuple[bool, float]:
"""Validate and parse amount"""
try:
amount = float(amount_str.replace(',', '.'))
if amount <= 0:
return False, 0
if amount > 100000: # Max amount limit
return False, 0
return True, amount
except ValueError:
return False, 0
def format_date(date: datetime, lang: str = 'ru') -> str:
"""Format date for display"""
if lang == 'ru':
months = [
'января', 'февраля', 'марта', 'апреля', 'мая', 'июня',
'июля', 'августа', 'сентября', 'октября', 'ноября', 'декабря'
]
return f"{date.day} {months[date.month-1]} {date.year}"
else:
return date.strftime("%B %d, %Y")
def format_datetime(date: datetime, lang: str = 'ru') -> str:
"""Format datetime for display"""
if lang == 'ru':
return date.strftime("%d.%m.%Y %H:%M")
else:
return date.strftime("%m/%d/%Y %H:%M")
def calculate_expiry_date(days: int) -> str:
"""Calculate expiry date in ISO format"""
expiry = datetime.utcnow() + timedelta(days=days)
return expiry.isoformat() + 'Z'
def parse_telegram_id(text: str) -> Optional[int]:
"""Parse Telegram ID from text"""
try:
telegram_id = int(text.strip())
if telegram_id > 0:
return telegram_id
except ValueError:
pass
return None
def format_traffic(gb: int, lang: str = 'ru') -> str:
"""Format traffic limit for display"""
if gb == 0:
return "Безлимитный" if lang == 'ru' else "Unlimited"
else:
return f"{gb} ГБ" if lang == 'ru' else f"{gb} GB"
def paginate_list(items: List[Any], page: int, per_page: int = 10) -> Tuple[List[Any], int]:
"""Paginate list of items"""
total_pages = (len(items) + per_page - 1) // per_page
start_idx = (page - 1) * per_page
end_idx = start_idx + per_page
return items[start_idx:end_idx], total_pages
def escape_markdown(text: str) -> str:
"""Escape markdown special characters"""
special_chars = ['_', '*', '[', ']', '(', ')', '~', '`', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!']
for char in special_chars:
text = text.replace(char, f'\\{char}')
return text
def truncate_text(text: str, max_length: int = 4000) -> str:
"""Truncate text to fit Telegram message limits"""
if len(text) <= max_length:
return text
return text[:max_length-3] + "..."
def validate_squad_uuid(uuid_str: str) -> bool:
"""Validate UUID format"""
try:
uuid.UUID(uuid_str)
return True
except ValueError:
return False
def format_subscription_info(subscription: Dict[str, Any], lang: str = 'ru') -> str:
"""Format subscription information for display"""
from translations import t
traffic = format_traffic(subscription['traffic_limit_gb'], lang)
info = t('subscription_info', lang,
name=subscription['name'],
price=subscription['price'],
days=subscription['duration_days'],
traffic=traffic,
description=subscription.get('description', '')
)
return info
def format_user_subscription_info(user_sub: Dict[str, Any], subscription: Dict[str, Any],
expires_at: datetime, lang: str = 'ru') -> str:
"""Format user subscription information"""
from translations import t
traffic = format_traffic(subscription['traffic_limit_gb'], lang)
# Check if expired
now = datetime.utcnow()
if expires_at < now:
status = t('subscription_expired', lang)
else:
status = t('subscription_active', lang, date=format_date(expires_at, lang))
info = f"📋 {subscription['name']}\n"
info += f"{subscription['duration_days']} дней\n" if lang == 'ru' else f"{subscription['duration_days']} days\n"
info += f"📊 {traffic}\n"
info += f"🕒 {status}\n"
if subscription.get('description'):
info += f"\n{subscription['description']}"
return info
def validate_promocode_format(code: str) -> bool:
"""Validate promocode format"""
if not code:
return False
if len(code) < 3 or len(code) > 20:
return False
if not re.match(r'^[A-Z0-9]+$', code.upper()):
return False
return True
def calculate_discount(original_price: float, promocode: Dict[str, Any]) -> float:
"""Calculate discount amount"""
if promocode.get('discount_percent'):
return original_price * (promocode['discount_percent'] / 100)
else:
return min(promocode.get('discount_amount', 0), original_price)
def format_payment_status(status: str, lang: str = 'ru') -> str:
"""Format payment status for display"""
status_map = {
'pending': 'В ожидании' if lang == 'ru' else 'Pending',
'completed': 'Завершен' if lang == 'ru' else 'Completed',
'cancelled': 'Отменен' if lang == 'ru' else 'Cancelled',
'failed': 'Ошибка' if lang == 'ru' else 'Failed'
}
return status_map.get(status, status)
def clean_phone_number(phone: str) -> str:
"""Clean and format phone number"""
# Remove all non-digit characters
digits = re.sub(r'\D', '', phone)
# Handle Russian phone numbers
if digits.startswith('8') and len(digits) == 11:
digits = '7' + digits[1:]
elif digits.startswith('9') and len(digits) == 10:
digits = '7' + digits
return digits
def format_bytes(bytes_value: int) -> str:
"""Format bytes to human readable format"""
if bytes_value == 0:
return "0 B"
units = ['B', 'KB', 'MB', 'GB', 'TB']
unit_index = 0
value = float(bytes_value)
while value >= 1024 and unit_index < len(units) - 1:
value /= 1024
unit_index += 1
if unit_index == 0:
return f"{int(value)} {units[unit_index]}"
else:
return f"{value:.1f} {units[unit_index]}"
def get_subscription_connection_url(base_url: str, short_uuid: str) -> str:
"""Generate subscription connection URL"""
return f"{base_url.rstrip('/')}/api/sub/{short_uuid}"
def log_user_action(user_id: int, action: str, details: str = ""):
"""Log user action"""
logger.info(f"User {user_id} - {action}: {details}")
class States:
"""State constants for FSM"""
WAITING_LANGUAGE = "waiting_language"
WAITING_AMOUNT = "waiting_amount"
WAITING_PROMOCODE = "waiting_promocode"
# Admin states
ADMIN_CREATE_SUB_NAME = "admin_create_sub_name"
ADMIN_CREATE_SUB_DESC = "admin_create_sub_desc"
ADMIN_CREATE_SUB_PRICE = "admin_create_sub_price"
ADMIN_CREATE_SUB_DAYS = "admin_create_sub_days"
ADMIN_CREATE_SUB_TRAFFIC = "admin_create_sub_traffic"
ADMIN_CREATE_SUB_SQUAD = "admin_create_sub_squad"
ADMIN_ADD_BALANCE_USER = "admin_add_balance_user"
ADMIN_ADD_BALANCE_AMOUNT = "admin_add_balance_amount"
ADMIN_CREATE_PROMO_CODE = "admin_create_promo_code"
ADMIN_CREATE_PROMO_DISCOUNT = "admin_create_promo_discount"
ADMIN_CREATE_PROMO_LIMIT = "admin_create_promo_limit"