Update database.py

This commit is contained in:
Egor
2025-08-15 04:49:48 +03:00
committed by GitHub
parent 27b7bfccba
commit 97a7384381

View File

@@ -662,28 +662,38 @@ class Database:
async def get_stats(self) -> dict:
async with self.session_factory() as session:
try:
from sqlalchemy import select, func
from sqlalchemy import select, func, and_
total_users = await session.execute(
select(func.count(User.id))
)
total_users = total_users.scalar()
total_subs_non_trial = await session.execute(
select(func.count(UserSubscription.id))
.join(Subscription, UserSubscription.subscription_id == Subscription.id)
.where(Subscription.is_trial == False)
)
total_subs_non_trial = total_subs_non_trial.scalar()
total_payments = await session.execute(
select(func.sum(Payment.amount)).where(
Payment.status == 'completed',
Payment.payment_type != 'trial'
and_(
Payment.status == 'completed',
Payment.payment_type.in_([
'topup', # Обычные пополнения
'subscription', # Покупка подписок
'subscription_extend', # Продление подписок
'promocode', # Активация промокодов
'admin_topup', # Пополнения администратором
'stars', # Пополнения через Telegram Stars
'autopay' # Автоплатежи
])
)
)
)
total_payments = total_payments.scalar() or 0
return {
'total_users': total_users,
'total_subscriptions_non_trial': total_subs_non_trial,
@@ -2437,62 +2447,96 @@ class Database:
}
async def get_lucky_game_admin_stats(self) -> dict:
try:
query = """
SELECT
COUNT(*) as total_games,
COUNT(CASE WHEN is_winner = 1 THEN 1 END) as total_wins,
COUNT(DISTINCT user_id) as unique_players,
SUM(reward_amount) as total_rewards,
AVG(reward_amount) as avg_reward,
COUNT(CASE WHEN DATE(played_at) = DATE('now') THEN 1 END) as games_today,
COUNT(CASE WHEN DATE(played_at) = DATE('now') AND is_winner = 1 THEN 1 END) as wins_today,
MAX(played_at) as last_game,
MIN(played_at) as first_game
FROM lucky_games
"""
async with self.get_connection() as conn:
async with conn.execute(query) as cursor:
row = await cursor.fetchone()
if row:
stats = {
'total_games': row[0] or 0,
'total_wins': row[1] or 0,
'unique_players': row[2] or 0,
'total_rewards': row[3] or 0.0,
'avg_reward': row[4] or 0.0,
'games_today': row[5] or 0,
'wins_today': row[6] or 0,
'last_game': row[7],
'first_game': row[8]
}
if stats['total_games'] > 0:
stats['win_rate'] = (stats['total_wins'] / stats['total_games']) * 100
stats['win_rate_today'] = (stats['wins_today'] / stats['games_today']) * 100 if stats['games_today'] > 0 else 0
else:
stats['win_rate'] = 0
stats['win_rate_today'] = 0
return stats
async with self.session_factory() as session:
try:
from sqlalchemy import select, func, and_, text
from datetime import date
total_games = await session.execute(
select(func.count(LuckyGame.id))
)
total_games = total_games.scalar() or 0
total_wins = await session.execute(
select(func.count(LuckyGame.id)).where(LuckyGame.is_winner == True)
)
total_wins = total_wins.scalar() or 0
unique_players = await session.execute(
select(func.count(func.distinct(LuckyGame.user_id)))
)
unique_players = unique_players.scalar() or 0
total_rewards = await session.execute(
select(func.sum(LuckyGame.reward_amount))
)
total_rewards = total_rewards.scalar() or 0.0
avg_reward = await session.execute(
select(func.avg(LuckyGame.reward_amount)).where(LuckyGame.is_winner == True)
)
avg_reward = avg_reward.scalar() or 0.0
today = date.today()
games_today = await session.execute(
select(func.count(LuckyGame.id)).where(
func.date(LuckyGame.played_at) == today
)
)
games_today = games_today.scalar() or 0
return {
'total_games': 0, 'total_wins': 0, 'unique_players': 0,
'total_rewards': 0.0, 'avg_reward': 0.0, 'games_today': 0,
'wins_today': 0, 'win_rate': 0, 'win_rate_today': 0,
'last_game': None, 'first_game': None
}
except Exception as e:
logger.error(f"Error getting lucky game admin stats: {e}")
return {
'total_games': 0, 'total_wins': 0, 'unique_players': 0,
'total_rewards': 0.0, 'avg_reward': 0.0, 'games_today': 0,
'wins_today': 0, 'win_rate': 0, 'win_rate_today': 0,
'last_game': None, 'first_game': None
}
wins_today = await session.execute(
select(func.count(LuckyGame.id)).where(
and_(
func.date(LuckyGame.played_at) == today,
LuckyGame.is_winner == True
)
)
)
wins_today = wins_today.scalar() or 0
first_game = await session.execute(
select(LuckyGame.played_at).order_by(LuckyGame.played_at.asc()).limit(1)
)
first_game_date = first_game.scalar()
last_game = await session.execute(
select(LuckyGame.played_at).order_by(LuckyGame.played_at.desc()).limit(1)
)
last_game_date = last_game.scalar()
win_rate = (total_wins / total_games * 100) if total_games > 0 else 0
win_rate_today = (wins_today / games_today * 100) if games_today > 0 else 0
return {
'total_games': total_games,
'total_wins': total_wins,
'unique_players': unique_players,
'total_rewards': total_rewards,
'avg_reward': avg_reward,
'games_today': games_today,
'wins_today': wins_today,
'win_rate': win_rate,
'win_rate_today': win_rate_today,
'first_game': first_game_date.isoformat() if first_game_date else None,
'last_game': last_game_date.isoformat() if last_game_date else None
}
except Exception as e:
logger.error(f"Error getting lucky game admin stats: {e}")
return {
'total_games': 0,
'total_wins': 0,
'unique_players': 0,
'total_rewards': 0.0,
'avg_reward': 0.0,
'games_today': 0,
'wins_today': 0,
'win_rate': 0,
'win_rate_today': 0,
'first_game': None,
'last_game': None
}
async def get_lucky_game_top_players(self, limit: int = 5) -> List[dict]:
try: