mirror of
https://github.com/BEDOLAGA-DEV/remnawave-bedolaga-telegram-bot.git
synced 2026-01-20 11:50:27 +00:00
236 lines
7.9 KiB
Python
236 lines
7.9 KiB
Python
import re
|
||
import uuid
|
||
import secrets
|
||
import string
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional, List, Dict, Any, Tuple
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
def generate_username() -> str:
|
||
"""Generate random username for RemnaWave"""
|
||
return f"user_{secrets.token_hex(8)}"
|
||
|
||
def generate_password() -> str:
|
||
"""Generate random password"""
|
||
alphabet = string.ascii_letters + string.digits
|
||
return ''.join(secrets.choice(alphabet) for _ in range(12))
|
||
|
||
def generate_promocode() -> str:
|
||
"""Generate random promocode"""
|
||
alphabet = string.ascii_uppercase + string.digits
|
||
return ''.join(secrets.choice(alphabet) for _ in range(8))
|
||
|
||
def is_valid_email(email: str) -> bool:
|
||
"""Validate email format"""
|
||
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
|
||
return re.match(pattern, email) is not None
|
||
|
||
def is_valid_amount(amount_str: str) -> Tuple[bool, float]:
|
||
"""Validate and parse amount"""
|
||
try:
|
||
amount = float(amount_str.replace(',', '.'))
|
||
if amount <= 0:
|
||
return False, 0
|
||
if amount > 100000: # Max amount limit
|
||
return False, 0
|
||
return True, amount
|
||
except ValueError:
|
||
return False, 0
|
||
|
||
def format_date(date: datetime, lang: str = 'ru') -> str:
|
||
"""Format date for display"""
|
||
if lang == 'ru':
|
||
months = [
|
||
'января', 'февраля', 'марта', 'апреля', 'мая', 'июня',
|
||
'июля', 'августа', 'сентября', 'октября', 'ноября', 'декабря'
|
||
]
|
||
return f"{date.day} {months[date.month-1]} {date.year}"
|
||
else:
|
||
return date.strftime("%B %d, %Y")
|
||
|
||
def format_datetime(date: datetime, lang: str = 'ru') -> str:
|
||
"""Format datetime for display"""
|
||
if lang == 'ru':
|
||
return date.strftime("%d.%m.%Y %H:%M")
|
||
else:
|
||
return date.strftime("%m/%d/%Y %H:%M")
|
||
|
||
def calculate_expiry_date(days: int) -> str:
|
||
"""Calculate expiry date in ISO format"""
|
||
expiry = datetime.utcnow() + timedelta(days=days)
|
||
return expiry.isoformat() + 'Z'
|
||
|
||
def parse_telegram_id(text: str) -> Optional[int]:
|
||
"""Parse Telegram ID from text"""
|
||
try:
|
||
telegram_id = int(text.strip())
|
||
if telegram_id > 0:
|
||
return telegram_id
|
||
except ValueError:
|
||
pass
|
||
return None
|
||
|
||
def format_traffic(gb: int, lang: str = 'ru') -> str:
|
||
"""Format traffic limit for display"""
|
||
if gb == 0:
|
||
return "Безлимитный" if lang == 'ru' else "Unlimited"
|
||
else:
|
||
return f"{gb} ГБ" if lang == 'ru' else f"{gb} GB"
|
||
|
||
def paginate_list(items: List[Any], page: int, per_page: int = 10) -> Tuple[List[Any], int]:
|
||
"""Paginate list of items"""
|
||
total_pages = (len(items) + per_page - 1) // per_page
|
||
start_idx = (page - 1) * per_page
|
||
end_idx = start_idx + per_page
|
||
return items[start_idx:end_idx], total_pages
|
||
|
||
def escape_markdown(text: str) -> str:
|
||
"""Escape markdown special characters"""
|
||
special_chars = ['_', '*', '[', ']', '(', ')', '~', '`', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!']
|
||
for char in special_chars:
|
||
text = text.replace(char, f'\\{char}')
|
||
return text
|
||
|
||
def truncate_text(text: str, max_length: int = 4000) -> str:
|
||
"""Truncate text to fit Telegram message limits"""
|
||
if len(text) <= max_length:
|
||
return text
|
||
return text[:max_length-3] + "..."
|
||
|
||
def validate_squad_uuid(uuid_str: str) -> bool:
|
||
"""Validate UUID format"""
|
||
try:
|
||
uuid.UUID(uuid_str)
|
||
return True
|
||
except ValueError:
|
||
return False
|
||
|
||
def format_subscription_info(subscription: Dict[str, Any], lang: str = 'ru') -> str:
|
||
"""Format subscription information for display"""
|
||
from translations import t
|
||
|
||
traffic = format_traffic(subscription['traffic_limit_gb'], lang)
|
||
|
||
info = t('subscription_info', lang,
|
||
name=subscription['name'],
|
||
price=subscription['price'],
|
||
days=subscription['duration_days'],
|
||
traffic=traffic,
|
||
description=subscription.get('description', '')
|
||
)
|
||
|
||
return info
|
||
|
||
def format_user_subscription_info(user_sub: Dict[str, Any], subscription: Dict[str, Any],
|
||
expires_at: datetime, lang: str = 'ru') -> str:
|
||
"""Format user subscription information"""
|
||
from translations import t
|
||
|
||
traffic = format_traffic(subscription['traffic_limit_gb'], lang)
|
||
|
||
# Check if expired
|
||
now = datetime.utcnow()
|
||
if expires_at < now:
|
||
status = t('subscription_expired', lang)
|
||
else:
|
||
status = t('subscription_active', lang, date=format_date(expires_at, lang))
|
||
|
||
info = f"📋 {subscription['name']}\n"
|
||
info += f"⏱ {subscription['duration_days']} дней\n" if lang == 'ru' else f"⏱ {subscription['duration_days']} days\n"
|
||
info += f"📊 {traffic}\n"
|
||
info += f"🕒 {status}\n"
|
||
|
||
if subscription.get('description'):
|
||
info += f"\n{subscription['description']}"
|
||
|
||
return info
|
||
|
||
def validate_promocode_format(code: str) -> bool:
|
||
"""Validate promocode format"""
|
||
if not code:
|
||
return False
|
||
if len(code) < 3 or len(code) > 20:
|
||
return False
|
||
if not re.match(r'^[A-Z0-9]+$', code.upper()):
|
||
return False
|
||
return True
|
||
|
||
def calculate_discount(original_price: float, promocode: Dict[str, Any]) -> float:
|
||
"""Calculate discount amount"""
|
||
if promocode.get('discount_percent'):
|
||
return original_price * (promocode['discount_percent'] / 100)
|
||
else:
|
||
return min(promocode.get('discount_amount', 0), original_price)
|
||
|
||
def format_payment_status(status: str, lang: str = 'ru') -> str:
|
||
"""Format payment status for display"""
|
||
status_map = {
|
||
'pending': 'В ожидании' if lang == 'ru' else 'Pending',
|
||
'completed': 'Завершен' if lang == 'ru' else 'Completed',
|
||
'cancelled': 'Отменен' if lang == 'ru' else 'Cancelled',
|
||
'failed': 'Ошибка' if lang == 'ru' else 'Failed'
|
||
}
|
||
return status_map.get(status, status)
|
||
|
||
def clean_phone_number(phone: str) -> str:
|
||
"""Clean and format phone number"""
|
||
# Remove all non-digit characters
|
||
digits = re.sub(r'\D', '', phone)
|
||
|
||
# Handle Russian phone numbers
|
||
if digits.startswith('8') and len(digits) == 11:
|
||
digits = '7' + digits[1:]
|
||
elif digits.startswith('9') and len(digits) == 10:
|
||
digits = '7' + digits
|
||
|
||
return digits
|
||
|
||
def format_bytes(bytes_value: int) -> str:
|
||
"""Format bytes to human readable format"""
|
||
if bytes_value == 0:
|
||
return "0 B"
|
||
|
||
units = ['B', 'KB', 'MB', 'GB', 'TB']
|
||
unit_index = 0
|
||
value = float(bytes_value)
|
||
|
||
while value >= 1024 and unit_index < len(units) - 1:
|
||
value /= 1024
|
||
unit_index += 1
|
||
|
||
if unit_index == 0:
|
||
return f"{int(value)} {units[unit_index]}"
|
||
else:
|
||
return f"{value:.1f} {units[unit_index]}"
|
||
|
||
def get_subscription_connection_url(base_url: str, short_uuid: str) -> str:
|
||
"""Generate subscription connection URL"""
|
||
return f"{base_url.rstrip('/')}/api/sub/{short_uuid}"
|
||
|
||
def log_user_action(user_id: int, action: str, details: str = ""):
|
||
"""Log user action"""
|
||
logger.info(f"User {user_id} - {action}: {details}")
|
||
|
||
class States:
|
||
"""State constants for FSM"""
|
||
WAITING_LANGUAGE = "waiting_language"
|
||
WAITING_AMOUNT = "waiting_amount"
|
||
WAITING_PROMOCODE = "waiting_promocode"
|
||
|
||
# Admin states
|
||
ADMIN_CREATE_SUB_NAME = "admin_create_sub_name"
|
||
ADMIN_CREATE_SUB_DESC = "admin_create_sub_desc"
|
||
ADMIN_CREATE_SUB_PRICE = "admin_create_sub_price"
|
||
ADMIN_CREATE_SUB_DAYS = "admin_create_sub_days"
|
||
ADMIN_CREATE_SUB_TRAFFIC = "admin_create_sub_traffic"
|
||
ADMIN_CREATE_SUB_SQUAD = "admin_create_sub_squad"
|
||
|
||
ADMIN_ADD_BALANCE_USER = "admin_add_balance_user"
|
||
ADMIN_ADD_BALANCE_AMOUNT = "admin_add_balance_amount"
|
||
|
||
ADMIN_CREATE_PROMO_CODE = "admin_create_promo_code"
|
||
ADMIN_CREATE_PROMO_DISCOUNT = "admin_create_promo_discount"
|
||
ADMIN_CREATE_PROMO_LIMIT = "admin_create_promo_limit"
|