Update database.py

This commit is contained in:
Fr1ngg
2025-08-12 02:33:44 +03:00
committed by GitHub
parent 23606dc2d4
commit 8d021c1530

View File

@@ -2149,3 +2149,289 @@ class Database:
except Exception as e:
logger.error(f"Error getting users with insufficient autopay balance: {e}")
return []
async def get_all_user_subscriptions_admin(self, offset: int = 0, limit: int = 20,
filter_type: str = "all") -> tuple[List[Dict], int]:
async with self.session_factory() as session:
try:
from sqlalchemy import select, func, desc, and_, or_
from datetime import datetime, timedelta
base_query = select(
UserSubscription,
Subscription.name.label('subscription_name'),
Subscription.price.label('subscription_price'),
Subscription.is_trial.label('is_trial'),
Subscription.is_imported.label('is_imported'),
User.username.label('user_username'),
User.first_name.label('user_first_name'),
User.telegram_id.label('user_telegram_id')
).select_from(
UserSubscription.__table__.join(
Subscription.__table__, UserSubscription.subscription_id == Subscription.id
).join(
User.__table__, UserSubscription.user_id == User.telegram_id
)
)
current_time = datetime.utcnow()
if filter_type == "active":
base_query = base_query.where(
and_(
UserSubscription.is_active == True,
UserSubscription.expires_at > current_time
)
)
elif filter_type == "expired":
base_query = base_query.where(
or_(
UserSubscription.is_active == False,
UserSubscription.expires_at <= current_time
)
)
elif filter_type == "expiring":
expiring_date = current_time + timedelta(days=7)
base_query = base_query.where(
and_(
UserSubscription.is_active == True,
UserSubscription.expires_at > current_time,
UserSubscription.expires_at <= expiring_date
)
)
elif filter_type == "autopay":
base_query = base_query.where(UserSubscription.auto_pay_enabled == True)
elif filter_type == "trial":
base_query = base_query.where(Subscription.is_trial == True)
elif filter_type == "imported":
base_query = base_query.where(Subscription.is_imported == True)
count_query = select(func.count()).select_from(base_query.subquery())
total_count_result = await session.execute(count_query)
total_count = total_count_result.scalar() or 0
data_query = base_query.order_by(desc(UserSubscription.created_at)).offset(offset).limit(limit)
result = await session.execute(data_query)
subscriptions_data = []
for row in result.fetchall():
user_sub = row[0]
if user_sub.expires_at <= current_time:
status = "expired"
elif not user_sub.is_active:
status = "inactive"
else:
days_left = (user_sub.expires_at - current_time).days
if days_left <= 3:
status = "expiring_soon"
elif days_left <= 7:
status = "expiring"
else:
status = "active"
subscriptions_data.append({
'id': user_sub.id,
'user_id': row.user_telegram_id,
'user_username': row.user_username or 'N/A',
'user_first_name': row.user_first_name or 'N/A',
'subscription_name': row.subscription_name,
'subscription_price': row.subscription_price,
'short_uuid': user_sub.short_uuid,
'expires_at': user_sub.expires_at,
'is_active': user_sub.is_active,
'auto_pay_enabled': user_sub.auto_pay_enabled,
'auto_pay_days_before': user_sub.auto_pay_days_before,
'created_at': user_sub.created_at,
'is_trial': row.is_trial,
'is_imported': row.is_imported,
'status': status,
'days_left': (user_sub.expires_at - current_time).days if user_sub.expires_at > current_time else 0
})
return subscriptions_data, total_count
except Exception as e:
logger.error(f"Error getting admin user subscriptions: {e}")
return [], 0
async def get_user_subscription_detail_admin(self, subscription_id: int) -> Optional[Dict]:
async with self.session_factory() as session:
try:
from sqlalchemy import select
query = select(
UserSubscription,
Subscription.name.label('subscription_name'),
Subscription.description.label('subscription_description'),
Subscription.price.label('subscription_price'),
Subscription.duration_days.label('subscription_duration'),
Subscription.traffic_limit_gb.label('subscription_traffic_limit'),
Subscription.is_trial.label('is_trial'),
Subscription.is_imported.label('is_imported'),
User.username.label('user_username'),
User.first_name.label('user_first_name'),
User.telegram_id.label('user_telegram_id'),
User.balance.label('user_balance')
).select_from(
UserSubscription.__table__.join(
Subscription.__table__, UserSubscription.subscription_id == Subscription.id
).join(
User.__table__, UserSubscription.user_id == User.telegram_id
)
).where(UserSubscription.id == subscription_id)
result = await session.execute(query)
row = result.fetchone()
if not row:
return None
user_sub = row[0]
current_time = datetime.utcnow()
if user_sub.expires_at <= current_time:
status = "expired"
status_emoji = ""
elif not user_sub.is_active:
status = "inactive"
status_emoji = ""
else:
days_left = (user_sub.expires_at - current_time).days
if days_left <= 1:
status = "expiring_today"
status_emoji = "🔴"
elif days_left <= 3:
status = "expiring_soon"
status_emoji = "🟡"
elif days_left <= 7:
status = "expiring"
status_emoji = "🟠"
else:
status = "active"
status_emoji = "🟢"
return {
'id': user_sub.id,
'user_id': row.user_telegram_id,
'user_username': row.user_username or 'N/A',
'user_first_name': row.user_first_name or 'N/A',
'user_balance': row.user_balance,
'subscription_name': row.subscription_name,
'subscription_description': row.subscription_description,
'subscription_price': row.subscription_price,
'subscription_duration': row.subscription_duration,
'subscription_traffic_limit': row.subscription_traffic_limit,
'short_uuid': user_sub.short_uuid,
'expires_at': user_sub.expires_at,
'is_active': user_sub.is_active,
'auto_pay_enabled': user_sub.auto_pay_enabled,
'auto_pay_days_before': user_sub.auto_pay_days_before,
'created_at': user_sub.created_at,
'updated_at': user_sub.updated_at,
'is_trial': row.is_trial,
'is_imported': row.is_imported,
'status': status,
'status_emoji': status_emoji,
'days_left': (user_sub.expires_at - current_time).days if user_sub.expires_at > current_time else 0
}
except Exception as e:
logger.error(f"Error getting user subscription detail: {e}")
return None
async def get_user_subscriptions_stats_admin(self) -> Dict[str, Any]:
async with self.session_factory() as session:
try:
from sqlalchemy import select, func, and_, or_
from datetime import datetime, timedelta
current_time = datetime.utcnow()
total_subs = await session.execute(
select(func.count(UserSubscription.id))
)
total_subscriptions = total_subs.scalar() or 0
active_subs = await session.execute(
select(func.count(UserSubscription.id)).where(
and_(
UserSubscription.is_active == True,
UserSubscription.expires_at > current_time
)
)
)
active_subscriptions = active_subs.scalar() or 0
expired_subs = await session.execute(
select(func.count(UserSubscription.id)).where(
or_(
UserSubscription.is_active == False,
UserSubscription.expires_at <= current_time
)
)
)
expired_subscriptions = expired_subs.scalar() or 0
autopay_subs = await session.execute(
select(func.count(UserSubscription.id)).where(
UserSubscription.auto_pay_enabled == True
)
)
autopay_subscriptions = autopay_subs.scalar() or 0
expiring_date = current_time + timedelta(days=7)
expiring_subs = await session.execute(
select(func.count(UserSubscription.id)).where(
and_(
UserSubscription.is_active == True,
UserSubscription.expires_at > current_time,
UserSubscription.expires_at <= expiring_date
)
)
)
expiring_subscriptions = expiring_subs.scalar() or 0
trial_subs = await session.execute(
select(func.count(UserSubscription.id))
.select_from(
UserSubscription.__table__.join(
Subscription.__table__, UserSubscription.subscription_id == Subscription.id
)
)
.where(Subscription.is_trial == True)
)
trial_subscriptions = trial_subs.scalar() or 0
imported_subs = await session.execute(
select(func.count(UserSubscription.id))
.select_from(
UserSubscription.__table__.join(
Subscription.__table__, UserSubscription.subscription_id == Subscription.id
)
)
.where(Subscription.is_imported == True)
)
imported_subscriptions = imported_subs.scalar() or 0
return {
'total_subscriptions': total_subscriptions,
'active_subscriptions': active_subscriptions,
'expired_subscriptions': expired_subscriptions,
'autopay_subscriptions': autopay_subscriptions,
'expiring_subscriptions': expiring_subscriptions,
'trial_subscriptions': trial_subscriptions,
'imported_subscriptions': imported_subscriptions
}
except Exception as e:
logger.error(f"Error getting user subscriptions stats: {e}")
return {
'total_subscriptions': 0,
'active_subscriptions': 0,
'expired_subscriptions': 0,
'autopay_subscriptions': 0,
'expiring_subscriptions': 0,
'trial_subscriptions': 0,
'imported_subscriptions': 0
}