Расширение фильтров

This commit is contained in:
gy9vin
2025-09-30 12:39:15 +03:00
parent 400cc5a32e
commit c344f418c5
5 changed files with 871 additions and 17 deletions

View File

@@ -2,8 +2,8 @@ import logging
import secrets
import string
from datetime import datetime, timedelta
from typing import Optional, List
from sqlalchemy import select, and_, or_, func
from typing import Optional, List, Dict
from sqlalchemy import select, and_, or_, func, case, nullslast
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
@@ -273,7 +273,11 @@ async def get_users_list(
limit: int = 50,
search: Optional[str] = None,
status: Optional[UserStatus] = None,
order_by_balance: bool = False
order_by_balance: bool = False,
order_by_traffic: bool = False,
order_by_last_activity: bool = False,
order_by_total_spent: bool = False,
order_by_purchase_count: bool = False
) -> List[User]:
query = select(User).options(selectinload(User.subscription))
@@ -293,10 +297,71 @@ async def get_users_list(
conditions.append(User.telegram_id == int(search))
query = query.where(or_(*conditions))
# Сортировка по балансу в порядке убывания, если order_by_balance=True
if order_by_balance:
query = query.order_by(User.balance_kopeks.desc())
sort_flags = [
order_by_balance,
order_by_traffic,
order_by_last_activity,
order_by_total_spent,
order_by_purchase_count,
]
if sum(int(flag) for flag in sort_flags) > 1:
logger.debug(
"Выбрано несколько сортировок пользователей — применяется приоритет: трафик > траты > покупки > баланс > активность"
)
transactions_stats = None
if order_by_total_spent or order_by_purchase_count:
from app.database.models import Transaction
transactions_stats = (
select(
Transaction.user_id.label("user_id"),
func.coalesce(
func.sum(
case(
(
Transaction.type == TransactionType.SUBSCRIPTION_PAYMENT.value,
Transaction.amount_kopeks,
),
else_=0,
)
),
0,
).label("total_spent"),
func.coalesce(
func.sum(
case(
(
Transaction.type == TransactionType.SUBSCRIPTION_PAYMENT.value,
1,
),
else_=0,
)
),
0,
).label("purchase_count"),
)
.where(Transaction.is_completed.is_(True))
.group_by(Transaction.user_id)
.subquery()
)
query = query.outerjoin(transactions_stats, transactions_stats.c.user_id == User.id)
if order_by_traffic:
traffic_sort = func.coalesce(Subscription.traffic_used_gb, 0.0)
query = query.outerjoin(Subscription, Subscription.user_id == User.id)
query = query.order_by(traffic_sort.desc(), User.created_at.desc())
elif order_by_total_spent:
order_column = func.coalesce(transactions_stats.c.total_spent, 0)
query = query.order_by(order_column.desc(), User.created_at.desc())
elif order_by_purchase_count:
order_column = func.coalesce(transactions_stats.c.purchase_count, 0)
query = query.order_by(order_column.desc(), User.created_at.desc())
elif order_by_balance:
query = query.order_by(User.balance_kopeks.desc(), User.created_at.desc())
elif order_by_last_activity:
query = query.order_by(nullslast(User.last_activity.desc()), User.created_at.desc())
else:
query = query.order_by(User.created_at.desc())
@@ -334,6 +399,62 @@ async def get_users_count(
return result.scalar()
async def get_users_spending_stats(
db: AsyncSession,
user_ids: List[int]
) -> Dict[int, Dict[str, int]]:
if not user_ids:
return {}
from app.database.models import Transaction
stats_query = (
select(
Transaction.user_id,
func.coalesce(
func.sum(
case(
(
Transaction.type == TransactionType.SUBSCRIPTION_PAYMENT.value,
Transaction.amount_kopeks,
),
else_=0,
)
),
0,
).label("total_spent"),
func.coalesce(
func.sum(
case(
(
Transaction.type == TransactionType.SUBSCRIPTION_PAYMENT.value,
1,
),
else_=0,
)
),
0,
).label("purchase_count"),
)
.where(
Transaction.user_id.in_(user_ids),
Transaction.is_completed.is_(True),
)
.group_by(Transaction.user_id)
)
result = await db.execute(stats_query)
rows = result.all()
return {
row.user_id: {
"total_spent": int(row.total_spent or 0),
"purchase_count": int(row.purchase_count or 0),
}
for row in rows
}
async def get_referrals(db: AsyncSession, user_id: int) -> List[User]:
result = await db.execute(
select(User)

View File

@@ -81,7 +81,7 @@ async def show_users_filters(
state: FSMContext
):
text = "⚙️ <b>Фильтры пользователей</b>\n\nВыберите фильтр для отображения пользователей:"
text = ("⚙️ <b>Фильтры пользователей</b>\n\nВыберите фильтр для отображения пользователей:\n")
await callback.message.edit_text(
text,
@@ -289,6 +289,464 @@ async def show_users_list_by_balance(
await callback.answer()
@admin_required
@error_handler
async def show_users_list_by_traffic(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext,
page: int = 1
):
await state.set_state(AdminStates.viewing_user_from_traffic_list)
user_service = UserService()
users_data = await user_service.get_users_page(
db, page=page, limit=10, order_by_traffic=True
)
if not users_data["users"]:
await callback.message.edit_text(
"📶 Пользователи с трафиком не найдены",
reply_markup=get_admin_users_keyboard(db_user.language)
)
await callback.answer()
return
text = f"👥 <b>Список пользователей по использованному трафику</b> (стр. {page}/{users_data['total_pages']})\n\n"
text += "Нажмите на пользователя для управления:"
keyboard = []
for user in users_data["users"]:
if user.status == UserStatus.ACTIVE.value:
status_emoji = ""
elif user.status == UserStatus.BLOCKED.value:
status_emoji = "🚫"
else:
status_emoji = "🗑️"
if user.subscription:
sub = user.subscription
if sub.is_trial:
subscription_emoji = "🎁"
elif sub.is_active:
subscription_emoji = "💎"
else:
subscription_emoji = ""
used = sub.traffic_used_gb or 0.0
if sub.traffic_limit_gb and sub.traffic_limit_gb > 0:
limit_display = f"{sub.traffic_limit_gb}"
else:
limit_display = "♾️"
traffic_display = f"{used:.1f}/{limit_display} ГБ"
else:
subscription_emoji = ""
traffic_display = "нет подписки"
button_text = f"{status_emoji} {subscription_emoji} {user.full_name}"
button_text += f" | 📶 {traffic_display}"
if user.balance_kopeks > 0:
button_text += f" | 💰 {settings.format_price(user.balance_kopeks)}"
if len(button_text) > 60:
short_name = user.full_name
if len(short_name) > 20:
short_name = short_name[:17] + "..."
button_text = f"{status_emoji} {subscription_emoji} {short_name}"
button_text += f" | 📶 {traffic_display}"
keyboard.append([
types.InlineKeyboardButton(
text=button_text,
callback_data=f"admin_user_manage_{user.id}"
)
])
if users_data["total_pages"] > 1:
pagination_row = get_admin_pagination_keyboard(
users_data["current_page"],
users_data["total_pages"],
"admin_users_traffic_list",
"admin_users",
db_user.language
).inline_keyboard[0]
keyboard.append(pagination_row)
keyboard.extend([
[
types.InlineKeyboardButton(text="🔍 Поиск", callback_data="admin_users_search"),
types.InlineKeyboardButton(text="📊 Статистика", callback_data="admin_users_stats")
],
[
types.InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_users")
]
])
await callback.message.edit_text(
text,
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard)
)
await callback.answer()
@admin_required
@error_handler
async def show_users_list_by_last_activity(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext,
page: int = 1
):
await state.set_state(AdminStates.viewing_user_from_last_activity_list)
user_service = UserService()
users_data = await user_service.get_users_page(
db,
page=page,
limit=10,
order_by_last_activity=True,
)
if not users_data["users"]:
await callback.message.edit_text(
"🕒 Пользователи с активностью не найдены",
reply_markup=get_admin_users_keyboard(db_user.language)
)
await callback.answer()
return
text = f"👥 <b>Пользователи по активности</b> (стр. {page}/{users_data['total_pages']})\n\n"
text += "Нажмите на пользователя для управления:"
keyboard = []
for user in users_data["users"]:
if user.status == UserStatus.ACTIVE.value:
status_emoji = ""
elif user.status == UserStatus.BLOCKED.value:
status_emoji = "🚫"
else:
status_emoji = "🗑️"
activity_display = (
format_time_ago(user.last_activity)
if user.last_activity
else "неизвестно"
)
subscription_emoji = ""
if user.subscription:
if user.subscription.is_trial:
subscription_emoji = "🎁"
elif user.subscription.is_active:
subscription_emoji = "💎"
else:
subscription_emoji = ""
button_text = f"{status_emoji} {subscription_emoji} {user.full_name}"
button_text += f" | 🕒 {activity_display}"
keyboard.append([
types.InlineKeyboardButton(
text=button_text,
callback_data=f"admin_user_manage_{user.id}"
)
])
if users_data["total_pages"] > 1:
pagination_row = get_admin_pagination_keyboard(
users_data["current_page"],
users_data["total_pages"],
"admin_users_activity_list",
"admin_users",
db_user.language
).inline_keyboard[0]
keyboard.append(pagination_row)
keyboard.extend([
[
types.InlineKeyboardButton(text="🔍 Поиск", callback_data="admin_users_search"),
types.InlineKeyboardButton(text="📊 Статистика", callback_data="admin_users_stats")
],
[
types.InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_users")
]
])
await callback.message.edit_text(
text,
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard)
)
await callback.answer()
@admin_required
@error_handler
async def show_users_list_by_spending(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext,
page: int = 1
):
await state.set_state(AdminStates.viewing_user_from_spending_list)
user_service = UserService()
users_data = await user_service.get_users_page(
db,
page=page,
limit=10,
order_by_total_spent=True,
)
users = users_data["users"]
if not users:
await callback.message.edit_text(
"💳 Пользователи с тратами не найдены",
reply_markup=get_admin_users_keyboard(db_user.language)
)
await callback.answer()
return
spending_map = await user_service.get_user_spending_stats_map(
db,
[user.id for user in users],
)
text = f"👥 <b>Пользователи по сумме трат</b> (стр. {page}/{users_data['total_pages']})\n\n"
text += "Нажмите на пользователя для управления:"
keyboard = []
for user in users:
stats = spending_map.get(
user.id,
{"total_spent": 0, "purchase_count": 0},
)
total_spent = stats.get("total_spent", 0)
purchases = stats.get("purchase_count", 0)
status_emoji = "" if user.status == UserStatus.ACTIVE.value else "🚫" if user.status == UserStatus.BLOCKED.value else "🗑️"
button_text = (
f"{status_emoji} {user.full_name}"
f" | 💳 {settings.format_price(total_spent)}"
f" | 🛒 {purchases}"
)
keyboard.append([
types.InlineKeyboardButton(
text=button_text,
callback_data=f"admin_user_manage_{user.id}"
)
])
if users_data["total_pages"] > 1:
pagination_row = get_admin_pagination_keyboard(
users_data["current_page"],
users_data["total_pages"],
"admin_users_spending_list",
"admin_users",
db_user.language
).inline_keyboard[0]
keyboard.append(pagination_row)
keyboard.extend([
[
types.InlineKeyboardButton(text="🔍 Поиск", callback_data="admin_users_search"),
types.InlineKeyboardButton(text="📊 Статистика", callback_data="admin_users_stats")
],
[
types.InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_users")
]
])
await callback.message.edit_text(
text,
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard)
)
await callback.answer()
@admin_required
@error_handler
async def show_users_list_by_purchases(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext,
page: int = 1
):
await state.set_state(AdminStates.viewing_user_from_purchases_list)
user_service = UserService()
users_data = await user_service.get_users_page(
db,
page=page,
limit=10,
order_by_purchase_count=True,
)
users = users_data["users"]
if not users:
await callback.message.edit_text(
"🛒 Пользователи с покупками не найдены",
reply_markup=get_admin_users_keyboard(db_user.language)
)
await callback.answer()
return
spending_map = await user_service.get_user_spending_stats_map(
db,
[user.id for user in users],
)
text = f"👥 <b>Пользователи по количеству покупок</b> (стр. {page}/{users_data['total_pages']})\n\n"
text += "Нажмите на пользователя для управления:"
keyboard = []
for user in users:
stats = spending_map.get(
user.id,
{"total_spent": 0, "purchase_count": 0},
)
total_spent = stats.get("total_spent", 0)
purchases = stats.get("purchase_count", 0)
status_emoji = "" if user.status == UserStatus.ACTIVE.value else "🚫" if user.status == UserStatus.BLOCKED.value else "🗑️"
button_text = (
f"{status_emoji} {user.full_name}"
f" | 🛒 {purchases}"
f" | 💳 {settings.format_price(total_spent)}"
)
keyboard.append([
types.InlineKeyboardButton(
text=button_text,
callback_data=f"admin_user_manage_{user.id}"
)
])
if users_data["total_pages"] > 1:
pagination_row = get_admin_pagination_keyboard(
users_data["current_page"],
users_data["total_pages"],
"admin_users_purchases_list",
"admin_users",
db_user.language
).inline_keyboard[0]
keyboard.append(pagination_row)
keyboard.extend([
[
types.InlineKeyboardButton(text="🔍 Поиск", callback_data="admin_users_search"),
types.InlineKeyboardButton(text="📊 Статистика", callback_data="admin_users_stats")
],
[
types.InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_users")
]
])
await callback.message.edit_text(
text,
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard)
)
await callback.answer()
@admin_required
@error_handler
async def show_users_list_by_campaign(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext,
page: int = 1
):
await state.set_state(AdminStates.viewing_user_from_campaign_list)
user_service = UserService()
users_data = await user_service.get_users_by_campaign_page(
db,
page=page,
limit=10,
)
users = users_data.get("users", [])
campaign_map = users_data.get("campaigns", {})
if not users:
await callback.message.edit_text(
"📢 Пользователи с кампанией не найдены",
reply_markup=get_admin_users_keyboard(db_user.language)
)
await callback.answer()
return
text = f"👥 <b>Пользователи по кампании регистрации</b> (стр. {page}/{users_data['total_pages']})\n\n"
text += "Нажмите на пользователя для управления:"
keyboard = []
for user in users:
info = campaign_map.get(user.id, {})
campaign_name = info.get("campaign_name") or "Без кампании"
registered_at = info.get("registered_at")
registered_display = format_datetime(registered_at) if registered_at else "неизвестно"
status_emoji = "" if user.status == UserStatus.ACTIVE.value else "🚫" if user.status == UserStatus.BLOCKED.value else "🗑️"
button_text = (
f"{status_emoji} {user.full_name}"
f" | 📢 {campaign_name}"
f" | 📅 {registered_display}"
)
keyboard.append([
types.InlineKeyboardButton(
text=button_text,
callback_data=f"admin_user_manage_{user.id}"
)
])
if users_data["total_pages"] > 1:
pagination_row = get_admin_pagination_keyboard(
users_data["current_page"],
users_data["total_pages"],
"admin_users_campaign_list",
"admin_users",
db_user.language
).inline_keyboard[0]
keyboard.append(pagination_row)
keyboard.extend([
[
types.InlineKeyboardButton(text="🔍 Поиск", callback_data="admin_users_search"),
types.InlineKeyboardButton(text="📊 Статистика", callback_data="admin_users_stats")
],
[
types.InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_users")
]
])
await callback.message.edit_text(
text,
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=keyboard)
)
await callback.answer()
@admin_required
@error_handler
async def handle_users_list_pagination_fixed(
@@ -323,6 +781,91 @@ async def handle_users_balance_list_pagination(
await show_users_list_by_balance(callback, db_user, db, state, 1)
@admin_required
@error_handler
async def handle_users_traffic_list_pagination(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext
):
try:
callback_parts = callback.data.split('_')
page = int(callback_parts[-1])
await show_users_list_by_traffic(callback, db_user, db, state, page)
except (ValueError, IndexError) as e:
logger.error(f"Ошибка парсинга номера страницы: {e}")
await show_users_list_by_traffic(callback, db_user, db, state, 1)
@admin_required
@error_handler
async def handle_users_activity_list_pagination(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext
):
try:
callback_parts = callback.data.split('_')
page = int(callback_parts[-1])
await show_users_list_by_last_activity(callback, db_user, db, state, page)
except (ValueError, IndexError) as e:
logger.error(f"Ошибка парсинга номера страницы: {e}")
await show_users_list_by_last_activity(callback, db_user, db, state, 1)
@admin_required
@error_handler
async def handle_users_spending_list_pagination(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext
):
try:
callback_parts = callback.data.split('_')
page = int(callback_parts[-1])
await show_users_list_by_spending(callback, db_user, db, state, page)
except (ValueError, IndexError) as e:
logger.error(f"Ошибка парсинга номера страницы: {e}")
await show_users_list_by_spending(callback, db_user, db, state, 1)
@admin_required
@error_handler
async def handle_users_purchases_list_pagination(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext
):
try:
callback_parts = callback.data.split('_')
page = int(callback_parts[-1])
await show_users_list_by_purchases(callback, db_user, db, state, page)
except (ValueError, IndexError) as e:
logger.error(f"Ошибка парсинга номера страницы: {e}")
await show_users_list_by_purchases(callback, db_user, db, state, 1)
@admin_required
@error_handler
async def handle_users_campaign_list_pagination(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext
):
try:
callback_parts = callback.data.split('_')
page = int(callback_parts[-1])
await show_users_list_by_campaign(callback, db_user, db, state, page)
except (ValueError, IndexError) as e:
logger.error(f"Ошибка парсинга номера страницы: {e}")
await show_users_list_by_campaign(callback, db_user, db, state, 1)
@admin_required
@error_handler
async def start_user_search(
@@ -867,6 +1410,16 @@ async def show_user_management(
current_state = await state.get_state()
if current_state == AdminStates.viewing_user_from_balance_list:
back_callback = "admin_users_balance_filter"
elif current_state == AdminStates.viewing_user_from_traffic_list:
back_callback = "admin_users_traffic_filter"
elif current_state == AdminStates.viewing_user_from_last_activity_list:
back_callback = "admin_users_activity_filter"
elif current_state == AdminStates.viewing_user_from_spending_list:
back_callback = "admin_users_spending_filter"
elif current_state == AdminStates.viewing_user_from_purchases_list:
back_callback = "admin_users_purchases_filter"
elif current_state == AdminStates.viewing_user_from_campaign_list:
back_callback = "admin_users_campaign_filter"
# Базовая клавиатура профиля
kb = get_user_management_keyboard(user.id, user.status, db_user.language, back_callback)
@@ -3317,6 +3870,31 @@ def register_handlers(dp: Dispatcher):
F.data.startswith("admin_users_balance_list_page_")
)
dp.callback_query.register(
handle_users_traffic_list_pagination,
F.data.startswith("admin_users_traffic_list_page_")
)
dp.callback_query.register(
handle_users_activity_list_pagination,
F.data.startswith("admin_users_activity_list_page_")
)
dp.callback_query.register(
handle_users_spending_list_pagination,
F.data.startswith("admin_users_spending_list_page_")
)
dp.callback_query.register(
handle_users_purchases_list_pagination,
F.data.startswith("admin_users_purchases_list_page_")
)
dp.callback_query.register(
handle_users_campaign_list_pagination,
F.data.startswith("admin_users_campaign_list_page_")
)
dp.callback_query.register(
start_user_search,
F.data == "admin_users_search"
@@ -3522,9 +4100,27 @@ def register_handlers(dp: Dispatcher):
)
dp.callback_query.register(
show_users_list_by_balance,
F.data.startswith("admin_users_balance_list_page_")
show_users_list_by_traffic,
F.data == "admin_users_traffic_filter"
)
dp.callback_query.register(
show_users_list_by_last_activity,
F.data == "admin_users_activity_filter"
)
dp.callback_query.register(
show_users_list_by_spending,
F.data == "admin_users_spending_filter"
)
dp.callback_query.register(
show_users_list_by_purchases,
F.data == "admin_users_purchases_filter"
)
dp.callback_query.register(
show_users_list_by_campaign,
F.data == "admin_users_campaign_filter"
)

View File

@@ -175,6 +175,21 @@ def get_admin_users_filters_keyboard(language: str = "ru") -> InlineKeyboardMark
[
InlineKeyboardButton(text="💰 По балансу", callback_data="admin_users_balance_filter")
],
[
InlineKeyboardButton(text="📶 По трафику", callback_data="admin_users_traffic_filter")
],
[
InlineKeyboardButton(text="🕒 По активности", callback_data="admin_users_activity_filter")
],
[
InlineKeyboardButton(text="💳 По сумме трат", callback_data="admin_users_spending_filter")
],
[
InlineKeyboardButton(text="🛒 По количеству покупок", callback_data="admin_users_purchases_filter")
],
[
InlineKeyboardButton(text="📢 По кампании", callback_data="admin_users_campaign_filter")
],
[
InlineKeyboardButton(text="⬅️ Назад", callback_data="admin_users")
]

View File

@@ -2,13 +2,14 @@ import logging
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any, Tuple
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import delete, select, update
from sqlalchemy import delete, select, update, func
from aiogram import Bot
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError
from app.database.crud.user import (
get_user_by_id, get_user_by_telegram_id, get_users_list,
get_users_count, get_users_statistics, get_inactive_users,
add_user_balance, subtract_user_balance, update_user, delete_user
add_user_balance, subtract_user_balance, update_user, delete_user,
get_users_spending_stats
)
from app.database.crud.promo_group import get_promo_group_by_id
from app.database.crud.transaction import get_user_transactions_count
@@ -18,7 +19,8 @@ from app.database.models import (
ReferralEarning, SubscriptionServer, YooKassaPayment, BroadcastHistory,
CryptoBotPayment, SubscriptionConversion, UserMessage, WelcomeText,
SentNotification, PromoGroup, MulenPayPayment, Pal24Payment,
AdvertisingCampaign, PaymentMethod
AdvertisingCampaign, AdvertisingCampaignRegistration, PaymentMethod,
TransactionType
)
from app.config import settings
@@ -141,20 +143,32 @@ class UserService:
"has_next": False,
"has_prev": False
}
async def get_users_page(
self,
db: AsyncSession,
page: int = 1,
limit: int = 20,
status: Optional[UserStatus] = None,
order_by_balance: bool = False
order_by_balance: bool = False,
order_by_traffic: bool = False,
order_by_last_activity: bool = False,
order_by_total_spent: bool = False,
order_by_purchase_count: bool = False
) -> Dict[str, Any]:
try:
offset = (page - 1) * limit
users = await get_users_list(
db, offset=offset, limit=limit, status=status, order_by_balance=order_by_balance
db,
offset=offset,
limit=limit,
status=status,
order_by_balance=order_by_balance,
order_by_traffic=order_by_traffic,
order_by_last_activity=order_by_last_activity,
order_by_total_spent=order_by_total_spent,
order_by_purchase_count=order_by_purchase_count,
)
total_count = await get_users_count(db, status=status)
@@ -179,7 +193,110 @@ class UserService:
"has_next": False,
"has_prev": False
}
async def get_user_spending_stats_map(
self,
db: AsyncSession,
user_ids: List[int]
) -> Dict[int, Dict[str, int]]:
try:
return await get_users_spending_stats(db, user_ids)
except Exception as e:
logger.error(f"Ошибка получения статистики трат пользователей: {e}")
return {}
async def get_users_by_campaign_page(
self,
db: AsyncSession,
page: int = 1,
limit: int = 20
) -> Dict[str, Any]:
try:
offset = (page - 1) * limit
campaign_ranked = (
select(
AdvertisingCampaignRegistration.user_id.label("user_id"),
AdvertisingCampaignRegistration.campaign_id.label("campaign_id"),
AdvertisingCampaignRegistration.created_at.label("created_at"),
func.row_number()
.over(
partition_by=AdvertisingCampaignRegistration.user_id,
order_by=AdvertisingCampaignRegistration.created_at.desc(),
)
.label("rn"),
)
.cte("campaign_ranked")
)
latest_campaign = (
select(
campaign_ranked.c.user_id,
campaign_ranked.c.campaign_id,
campaign_ranked.c.created_at,
)
.where(campaign_ranked.c.rn == 1)
.subquery()
)
query = (
select(
User,
AdvertisingCampaign.name.label("campaign_name"),
latest_campaign.c.created_at,
)
.join(latest_campaign, latest_campaign.c.user_id == User.id)
.join(
AdvertisingCampaign,
AdvertisingCampaign.id == latest_campaign.c.campaign_id,
)
.order_by(
AdvertisingCampaign.name.asc(),
latest_campaign.c.created_at.desc(),
)
.offset(offset)
.limit(limit)
)
result = await db.execute(query)
rows = result.all()
users = [row[0] for row in rows]
campaign_map = {
row[0].id: {
"campaign_name": row[1],
"registered_at": row[2],
}
for row in rows
}
total_stmt = select(func.count()).select_from(latest_campaign)
total_result = await db.execute(total_stmt)
total_count = total_result.scalar() or 0
total_pages = (total_count + limit - 1) // limit if total_count else 1
return {
"users": users,
"campaigns": campaign_map,
"current_page": page,
"total_pages": total_pages,
"total_count": total_count,
"has_next": page < total_pages,
"has_prev": page > 1,
}
except Exception as e:
logger.error(f"Ошибка получения пользователей по кампаниям: {e}")
return {
"users": [],
"campaigns": {},
"current_page": 1,
"total_pages": 1,
"total_count": 0,
"has_next": False,
"has_prev": False,
}
async def update_user_balance(
self,
db: AsyncSession,

View File

@@ -108,6 +108,11 @@ class AdminStates(StatesGroup):
# Состояния для отслеживания источника перехода
viewing_user_from_balance_list = State()
viewing_user_from_traffic_list = State()
viewing_user_from_last_activity_list = State()
viewing_user_from_spending_list = State()
viewing_user_from_purchases_list = State()
viewing_user_from_campaign_list = State()
class SupportStates(StatesGroup):
waiting_for_message = State()