From ffa51986a8aeb5e8839d1e741d17c8b0c3b72b7f Mon Sep 17 00:00:00 2001 From: firewookie Date: Sun, 28 Sep 2025 13:34:43 +0500 Subject: [PATCH] env to json --- app/database/crud/promo_group.py | 66 ++++++- app/services/broadcast_service.py | 279 ++++++++++++++++++++++++++ app/webapi/routes/broadcasts.py | 149 ++++++++++++++ app/webapi/routes/promo_groups.py | 64 ++++-- app/webapi/routes/promocodes.py | 301 +++++++++++++++++++++++++++++ app/webapi/schemas/broadcasts.py | 110 +++++++++++ app/webapi/schemas/promo_groups.py | 48 ++++- app/webapi/schemas/promocodes.py | 73 +++++++ 8 files changed, 1066 insertions(+), 24 deletions(-) create mode 100644 app/services/broadcast_service.py create mode 100644 app/webapi/routes/broadcasts.py create mode 100644 app/webapi/routes/promocodes.py create mode 100644 app/webapi/schemas/broadcasts.py create mode 100644 app/webapi/schemas/promocodes.py diff --git a/app/database/crud/promo_group.py b/app/database/crud/promo_group.py index 9296dd48..b6f4f6c1 100644 --- a/app/database/crud/promo_group.py +++ b/app/database/crud/promo_group.py @@ -30,13 +30,23 @@ logger = logging.getLogger(__name__) async def get_promo_groups_with_counts( db: AsyncSession, + *, + offset: int = 0, + limit: Optional[int] = None, ) -> List[Tuple[PromoGroup, int]]: - result = await db.execute( + query = ( select(PromoGroup, func.count(User.id)) .outerjoin(User, User.promo_group_id == PromoGroup.id) .group_by(PromoGroup.id) .order_by(PromoGroup.is_default.desc(), PromoGroup.name) ) + + if offset: + query = query.offset(offset) + if limit is not None: + query = query.limit(limit) + + result = await db.execute(query) return result.all() @@ -44,6 +54,11 @@ async def get_promo_group_by_id(db: AsyncSession, group_id: int) -> Optional[Pro return await db.get(PromoGroup, group_id) +async def count_promo_groups(db: AsyncSession) -> int: + result = await db.execute(select(func.count(PromoGroup.id))) + return int(result.scalar_one()) + + async def get_default_promo_group(db: AsyncSession) -> Optional[PromoGroup]: result = await db.execute( select(PromoGroup).where(PromoGroup.is_default.is_(True)) @@ -61,6 +76,7 @@ async def create_promo_group( period_discounts: Optional[Dict[int, int]] = None, auto_assign_total_spent_kopeks: Optional[int] = None, apply_discounts_to_addons: bool = True, + is_default: bool = False, ) -> PromoGroup: normalized_period_discounts = _normalize_period_discounts(period_discounts) @@ -70,6 +86,9 @@ async def create_promo_group( else None ) + existing_default = await get_default_promo_group(db) + should_be_default = existing_default is None or is_default + promo_group = PromoGroup( name=name.strip(), server_discount_percent=max(0, min(100, server_discount_percent)), @@ -78,16 +97,26 @@ async def create_promo_group( period_discounts=normalized_period_discounts or None, auto_assign_total_spent_kopeks=auto_assign_total_spent_kopeks, apply_discounts_to_addons=bool(apply_discounts_to_addons), - is_default=False, + is_default=should_be_default, ) db.add(promo_group) + await db.flush() + + if should_be_default and existing_default and existing_default.id != promo_group.id: + await db.execute( + update(PromoGroup) + .where(PromoGroup.id != promo_group.id) + .values(is_default=False) + ) + await db.commit() await db.refresh(promo_group) logger.info( - "Создана промогруппа '%s' с скидками (servers=%s%%, traffic=%s%%, devices=%s%%, periods=%s) и порогом автоприсвоения %s₽, скидки на доп. услуги: %s", + "Создана промогруппа '%s' (default=%s) с скидками (servers=%s%%, traffic=%s%%, devices=%s%%, periods=%s) и порогом автоприсвоения %s₽, скидки на доп. услуги: %s", promo_group.name, + promo_group.is_default, promo_group.server_discount_percent, promo_group.traffic_discount_percent, promo_group.device_discount_percent, @@ -110,6 +139,7 @@ async def update_promo_group( period_discounts: Optional[Dict[int, int]] = None, auto_assign_total_spent_kopeks: Optional[int] = None, apply_discounts_to_addons: Optional[bool] = None, + is_default: Optional[bool] = None, ) -> PromoGroup: if name is not None: group.name = name.strip() @@ -127,6 +157,36 @@ async def update_promo_group( if apply_discounts_to_addons is not None: group.apply_discounts_to_addons = bool(apply_discounts_to_addons) + if is_default is not None: + if is_default: + group.is_default = True + await db.flush() + await db.execute( + update(PromoGroup) + .where(PromoGroup.id != group.id) + .values(is_default=False) + ) + else: + if group.is_default: + group.is_default = False + await db.flush() + replacement = await db.execute( + select(PromoGroup) + .where(PromoGroup.id != group.id) + .order_by(PromoGroup.id) + .limit(1) + ) + new_default = replacement.scalars().first() + if new_default: + await db.execute( + update(PromoGroup) + .where(PromoGroup.id == new_default.id) + .values(is_default=True) + ) + else: + # Не допускаем состояния без базовой промогруппы + group.is_default = True + await db.commit() await db.refresh(group) diff --git a/app/services/broadcast_service.py b/app/services/broadcast_service.py new file mode 100644 index 00000000..ece01852 --- /dev/null +++ b/app/services/broadcast_service.py @@ -0,0 +1,279 @@ +from __future__ import annotations + +import asyncio +import logging +from dataclasses import dataclass +from datetime import datetime +from typing import Optional + +from aiogram import Bot +from aiogram.types import InlineKeyboardMarkup + +from app.database.database import AsyncSessionLocal +from app.database.models import BroadcastHistory +from app.handlers.admin.messages import ( + create_broadcast_keyboard, + get_custom_users, + get_target_users, +) + + +logger = logging.getLogger(__name__) + + +VALID_MEDIA_TYPES = {"photo", "video", "document"} + + +@dataclass(slots=True) +class BroadcastMediaConfig: + type: str + file_id: str + caption: Optional[str] = None + + +@dataclass(slots=True) +class BroadcastConfig: + target: str + message_text: str + selected_buttons: list[str] + media: Optional[BroadcastMediaConfig] = None + initiator_name: Optional[str] = None + + +@dataclass(slots=True) +class _BroadcastTask: + task: asyncio.Task + cancel_event: asyncio.Event + + +class BroadcastService: + """Handles broadcast execution triggered from the admin web API.""" + + def __init__(self) -> None: + self._bot: Optional[Bot] = None + self._tasks: dict[int, _BroadcastTask] = {} + self._lock = asyncio.Lock() + + def set_bot(self, bot: Bot) -> None: + self._bot = bot + + def is_running(self, broadcast_id: int) -> bool: + task_entry = self._tasks.get(broadcast_id) + return bool(task_entry and not task_entry.task.done()) + + async def start_broadcast(self, broadcast_id: int, config: BroadcastConfig) -> None: + if self._bot is None: + logger.error("Невозможно запустить рассылку %s: бот не инициализирован", broadcast_id) + await self._mark_failed(broadcast_id) + return + + cancel_event = asyncio.Event() + + async with self._lock: + if broadcast_id in self._tasks and not self._tasks[broadcast_id].task.done(): + logger.warning("Рассылка %s уже запущена", broadcast_id) + return + + task = asyncio.create_task( + self._run_broadcast(broadcast_id, config, cancel_event), + name=f"broadcast-{broadcast_id}", + ) + self._tasks[broadcast_id] = _BroadcastTask(task=task, cancel_event=cancel_event) + task.add_done_callback(lambda _: self._tasks.pop(broadcast_id, None)) + + async def request_stop(self, broadcast_id: int) -> bool: + async with self._lock: + task_entry = self._tasks.get(broadcast_id) + if not task_entry: + return False + + task_entry.cancel_event.set() + return True + + async def _run_broadcast( + self, + broadcast_id: int, + config: BroadcastConfig, + cancel_event: asyncio.Event, + ) -> None: + sent_count = 0 + failed_count = 0 + + try: + if cancel_event.is_set(): + await self._mark_cancelled(broadcast_id, sent_count, failed_count) + return + + async with AsyncSessionLocal() as session: + broadcast = await session.get(BroadcastHistory, broadcast_id) + if not broadcast: + logger.error("Запись рассылки %s не найдена в БД", broadcast_id) + return + + broadcast.status = "in_progress" + broadcast.sent_count = 0 + broadcast.failed_count = 0 + await session.commit() + + recipients = await self._fetch_recipients(config.target) + + async with AsyncSessionLocal() as session: + broadcast = await session.get(BroadcastHistory, broadcast_id) + if not broadcast: + logger.error("Запись рассылки %s удалена до запуска", broadcast_id) + return + + broadcast.total_count = len(recipients) + await session.commit() + + if cancel_event.is_set(): + await self._mark_cancelled(broadcast_id, sent_count, failed_count) + return + + if not recipients: + logger.info("Рассылка %s: получатели не найдены", broadcast_id) + await self._mark_finished(broadcast_id, sent_count, failed_count, cancelled=False) + return + + keyboard = self._build_keyboard(config.selected_buttons) + + for index, user in enumerate(recipients, start=1): + if cancel_event.is_set(): + await self._mark_cancelled(broadcast_id, sent_count, failed_count) + return + + telegram_id = getattr(user, "telegram_id", None) + if telegram_id is None: + failed_count += 1 + continue + + try: + await self._deliver_message(telegram_id, config, keyboard) + sent_count += 1 + except Exception as exc: # noqa: BLE001 + failed_count += 1 + logger.error( + "Ошибка отправки рассылки %s пользователю %s: %s", + broadcast_id, + telegram_id, + exc, + ) + + if index % 20 == 0: + await asyncio.sleep(1) + + await self._mark_finished(broadcast_id, sent_count, failed_count, cancelled=False) + + except asyncio.CancelledError: + await self._mark_cancelled(broadcast_id, sent_count, failed_count) + raise + except Exception as exc: # noqa: BLE001 + logger.exception("Критическая ошибка при выполнении рассылки %s: %s", broadcast_id, exc) + await self._mark_failed(broadcast_id, sent_count, failed_count) + + async def _fetch_recipients(self, target: str): + async with AsyncSessionLocal() as session: + if target.startswith("custom_"): + criteria = target[len("custom_"):] + return await get_custom_users(session, criteria) + return await get_target_users(session, target) + + def _build_keyboard(self, selected_buttons: Optional[list[str]]) -> Optional[InlineKeyboardMarkup]: + if selected_buttons is None: + selected_buttons = [] + return create_broadcast_keyboard(selected_buttons) + + async def _deliver_message( + self, + telegram_id: int, + config: BroadcastConfig, + keyboard: Optional[InlineKeyboardMarkup], + ) -> None: + if not self._bot: + raise RuntimeError("Телеграм-бот не инициализирован") + + if config.media and config.media.type in VALID_MEDIA_TYPES: + caption = config.media.caption or config.message_text + if config.media.type == "photo": + await self._bot.send_photo( + chat_id=telegram_id, + photo=config.media.file_id, + caption=caption, + reply_markup=keyboard, + ) + elif config.media.type == "video": + await self._bot.send_video( + chat_id=telegram_id, + video=config.media.file_id, + caption=caption, + reply_markup=keyboard, + ) + elif config.media.type == "document": + await self._bot.send_document( + chat_id=telegram_id, + document=config.media.file_id, + caption=caption, + reply_markup=keyboard, + ) + return + + await self._bot.send_message( + chat_id=telegram_id, + text=config.message_text, + reply_markup=keyboard, + ) + + async def _mark_finished( + self, + broadcast_id: int, + sent_count: int, + failed_count: int, + *, + cancelled: bool, + ) -> None: + async with AsyncSessionLocal() as session: + broadcast = await session.get(BroadcastHistory, broadcast_id) + if not broadcast: + return + + broadcast.sent_count = sent_count + broadcast.failed_count = failed_count + broadcast.status = "cancelled" if cancelled else ( + "completed" if failed_count == 0 else "partial" + ) + broadcast.completed_at = datetime.utcnow() + await session.commit() + + async def _mark_cancelled( + self, + broadcast_id: int, + sent_count: int, + failed_count: int, + ) -> None: + await self._mark_finished( + broadcast_id, + sent_count, + failed_count, + cancelled=True, + ) + + async def _mark_failed( + self, + broadcast_id: int, + sent_count: int = 0, + failed_count: int = 0, + ) -> None: + async with AsyncSessionLocal() as session: + broadcast = await session.get(BroadcastHistory, broadcast_id) + if not broadcast: + return + + broadcast.sent_count = sent_count + broadcast.failed_count = failed_count or broadcast.failed_count + broadcast.status = "failed" + broadcast.completed_at = datetime.utcnow() + await session.commit() + + +broadcast_service = BroadcastService() + diff --git a/app/webapi/routes/broadcasts.py b/app/webapi/routes/broadcasts.py new file mode 100644 index 00000000..d60a52e3 --- /dev/null +++ b/app/webapi/routes/broadcasts.py @@ -0,0 +1,149 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database.models import BroadcastHistory +from app.services.broadcast_service import ( + BroadcastConfig, + BroadcastMediaConfig, + broadcast_service, +) + +from ..dependencies import get_db_session, require_api_token +from ..schemas.broadcasts import ( + BroadcastCreateRequest, + BroadcastListResponse, + BroadcastResponse, +) + + +router = APIRouter() + + +def _serialize_broadcast(broadcast: BroadcastHistory) -> BroadcastResponse: + return BroadcastResponse( + id=broadcast.id, + target_type=broadcast.target_type, + message_text=broadcast.message_text, + has_media=broadcast.has_media, + media_type=broadcast.media_type, + media_file_id=broadcast.media_file_id, + media_caption=broadcast.media_caption, + total_count=broadcast.total_count, + sent_count=broadcast.sent_count, + failed_count=broadcast.failed_count, + status=broadcast.status, + admin_id=broadcast.admin_id, + admin_name=broadcast.admin_name, + created_at=broadcast.created_at, + completed_at=broadcast.completed_at, + ) + + +@router.post("", response_model=BroadcastResponse, status_code=status.HTTP_201_CREATED) +async def create_broadcast( + payload: BroadcastCreateRequest, + token: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> BroadcastResponse: + message_text = payload.message_text.strip() + if not message_text: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Message text must not be empty") + + media_payload = payload.media + + broadcast = BroadcastHistory( + target_type=payload.target, + message_text=message_text, + has_media=media_payload is not None, + media_type=media_payload.type if media_payload else None, + media_file_id=media_payload.file_id if media_payload else None, + media_caption=media_payload.caption if media_payload else None, + total_count=0, + sent_count=0, + failed_count=0, + status="queued", + admin_id=None, + admin_name=getattr(token, "name", None) or getattr(token, "created_by", None), + ) + db.add(broadcast) + await db.commit() + await db.refresh(broadcast) + + media_config = None + if media_payload: + media_config = BroadcastMediaConfig( + type=media_payload.type, + file_id=media_payload.file_id, + caption=media_payload.caption or message_text, + ) + + config = BroadcastConfig( + target=payload.target, + message_text=message_text, + selected_buttons=payload.selected_buttons, + media=media_config, + initiator_name=getattr(token, "name", None) or getattr(token, "created_by", None), + ) + + await broadcast_service.start_broadcast(broadcast.id, config) + await db.refresh(broadcast) + + return _serialize_broadcast(broadcast) + + +@router.get("", response_model=BroadcastListResponse) +async def list_broadcasts( + _: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), + limit: int = Query(50, ge=1, le=200), + offset: int = Query(0, ge=0), +) -> BroadcastListResponse: + total = await db.scalar(select(func.count(BroadcastHistory.id))) or 0 + + result = await db.execute( + select(BroadcastHistory) + .order_by(BroadcastHistory.created_at.desc()) + .offset(offset) + .limit(limit) + ) + broadcasts = result.scalars().all() + + return BroadcastListResponse( + items=[_serialize_broadcast(item) for item in broadcasts], + total=int(total), + limit=limit, + offset=offset, + ) + + +@router.post("/{broadcast_id}/stop", response_model=BroadcastResponse) +async def stop_broadcast( + broadcast_id: int, + _: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> BroadcastResponse: + broadcast = await db.get(BroadcastHistory, broadcast_id) + if not broadcast: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Broadcast not found") + + if broadcast.status not in {"queued", "in_progress", "cancelling"}: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Broadcast is not running") + + is_running = await broadcast_service.request_stop(broadcast_id) + + if is_running: + broadcast.status = "cancelling" + else: + broadcast.status = "cancelled" + broadcast.completed_at = datetime.utcnow() + + await db.commit() + await db.refresh(broadcast) + + return _serialize_broadcast(broadcast) diff --git a/app/webapi/routes/promo_groups.py b/app/webapi/routes/promo_groups.py index 98ac34bf..cec284d8 100644 --- a/app/webapi/routes/promo_groups.py +++ b/app/webapi/routes/promo_groups.py @@ -2,11 +2,13 @@ from __future__ import annotations from typing import Any -from fastapi import APIRouter, Depends, HTTPException, Response, Security, status +from fastapi import APIRouter, Depends, HTTPException, Query, Response, Security, status +from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession from app.database.crud.promo_group import ( count_promo_group_members, + count_promo_groups, create_promo_group, delete_promo_group, get_promo_group_by_id, @@ -18,6 +20,7 @@ from app.database.models import PromoGroup from ..dependencies import get_db_session, require_api_token from ..schemas.promo_groups import ( PromoGroupCreateRequest, + PromoGroupListResponse, PromoGroupResponse, PromoGroupUpdateRequest, ) @@ -54,13 +57,26 @@ def _serialize(group: PromoGroup, members_count: int = 0) -> PromoGroupResponse: ) -@router.get("", response_model=list[PromoGroupResponse]) +@router.get("", response_model=PromoGroupListResponse) async def list_promo_groups( _: Any = Security(require_api_token), db: AsyncSession = Depends(get_db_session), -) -> list[PromoGroupResponse]: - groups_with_counts = await get_promo_groups_with_counts(db) - return [_serialize(group, members_count=count) for group, count in groups_with_counts] + limit: int = Query(50, ge=1, le=200), + offset: int = Query(0, ge=0), +) -> PromoGroupListResponse: + total = await count_promo_groups(db) + groups_with_counts = await get_promo_groups_with_counts( + db, + offset=offset, + limit=limit, + ) + + return PromoGroupListResponse( + items=[_serialize(group, members_count=count) for group, count in groups_with_counts], + total=total, + limit=limit, + offset=offset, + ) @router.get("/{group_id}", response_model=PromoGroupResponse) @@ -83,16 +99,24 @@ async def create_promo_group_endpoint( _: Any = Security(require_api_token), db: AsyncSession = Depends(get_db_session), ) -> PromoGroupResponse: - group = await create_promo_group( - db, - name=payload.name, - server_discount_percent=payload.server_discount_percent, - traffic_discount_percent=payload.traffic_discount_percent, + try: + group = await create_promo_group( + db, + name=payload.name, + server_discount_percent=payload.server_discount_percent, + traffic_discount_percent=payload.traffic_discount_percent, device_discount_percent=payload.device_discount_percent, period_discounts=payload.period_discounts, auto_assign_total_spent_kopeks=payload.auto_assign_total_spent_kopeks, apply_discounts_to_addons=payload.apply_discounts_to_addons, + is_default=payload.is_default, ) + except IntegrityError as exc: + await db.rollback() + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "Promo group with this name already exists", + ) from exc return _serialize(group, members_count=0) @@ -107,17 +131,25 @@ async def update_promo_group_endpoint( if not group: raise HTTPException(status.HTTP_404_NOT_FOUND, "Promo group not found") - group = await update_promo_group( - db, - group, - name=payload.name, - server_discount_percent=payload.server_discount_percent, - traffic_discount_percent=payload.traffic_discount_percent, + try: + group = await update_promo_group( + db, + group, + name=payload.name, + server_discount_percent=payload.server_discount_percent, + traffic_discount_percent=payload.traffic_discount_percent, device_discount_percent=payload.device_discount_percent, period_discounts=payload.period_discounts, auto_assign_total_spent_kopeks=payload.auto_assign_total_spent_kopeks, apply_discounts_to_addons=payload.apply_discounts_to_addons, + is_default=payload.is_default, ) + except IntegrityError as exc: + await db.rollback() + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "Promo group with this name already exists", + ) from exc members_count = await count_promo_group_members(db, group_id) return _serialize(group, members_count=members_count) diff --git a/app/webapi/routes/promocodes.py b/app/webapi/routes/promocodes.py new file mode 100644 index 00000000..a39c70b0 --- /dev/null +++ b/app/webapi/routes/promocodes.py @@ -0,0 +1,301 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any, Optional + +from fastapi import APIRouter, Depends, HTTPException, Query, Response, status +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database.crud.promocode import ( + create_promocode, + delete_promocode, + get_promocode_by_code, + get_promocode_statistics, + get_promocodes_count, + get_promocodes_list, + update_promocode, +) +from app.database.models import PromoCode, PromoCodeType, PromoCodeUse + +from ..dependencies import get_db_session, require_api_token +from ..schemas.promocodes import ( + PromoCodeCreateRequest, + PromoCodeDetailResponse, + PromoCodeListResponse, + PromoCodeRecentUse, + PromoCodeResponse, + PromoCodeUpdateRequest, +) + +router = APIRouter() + + +def _normalize_datetime(value: Optional[datetime]) -> Optional[datetime]: + if value is None: + return None + + if value.tzinfo is not None and value.utcoffset() is not None: + return value.astimezone(timezone.utc).replace(tzinfo=None) + + if value.tzinfo is not None: + return value.replace(tzinfo=None) + + return value + + +def _serialize_promocode(promocode: PromoCode) -> PromoCodeResponse: + promo_type = PromoCodeType(promocode.type) + return PromoCodeResponse( + id=promocode.id, + code=promocode.code, + type=promo_type, + balance_bonus_kopeks=promocode.balance_bonus_kopeks, + balance_bonus_rubles=round(promocode.balance_bonus_kopeks / 100, 2), + subscription_days=promocode.subscription_days, + max_uses=promocode.max_uses, + current_uses=promocode.current_uses, + uses_left=promocode.uses_left, + is_active=promocode.is_active, + is_valid=promocode.is_valid, + valid_from=promocode.valid_from, + valid_until=promocode.valid_until, + created_by=promocode.created_by, + created_at=promocode.created_at, + updated_at=promocode.updated_at, + ) + + +def _serialize_recent_use(use: PromoCodeUse) -> PromoCodeRecentUse: + return PromoCodeRecentUse( + id=use.id, + user_id=use.user_id, + user_username=getattr(use, "user_username", None), + user_full_name=getattr(use, "user_full_name", None), + user_telegram_id=getattr(use, "user_telegram_id", None), + used_at=use.used_at, + ) + + +def _validate_create_payload(payload: PromoCodeCreateRequest) -> None: + code = payload.code.strip() + if not code: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Code must not be empty") + + normalized_valid_from = _normalize_datetime(payload.valid_from) + normalized_valid_until = _normalize_datetime(payload.valid_until) + + if payload.type == PromoCodeType.BALANCE and payload.balance_bonus_kopeks <= 0: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Balance bonus must be positive for balance promo codes") + + if payload.type in {PromoCodeType.SUBSCRIPTION_DAYS, PromoCodeType.TRIAL_SUBSCRIPTION} and payload.subscription_days <= 0: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Subscription days must be positive for this promo code type") + + if normalized_valid_from and normalized_valid_until and normalized_valid_from > normalized_valid_until: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "valid_from cannot be greater than valid_until") + + +def _validate_update_payload(payload: PromoCodeUpdateRequest, promocode: PromoCode) -> None: + if payload.code is not None and not payload.code.strip(): + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Code must not be empty") + + if payload.type is not None: + new_type = payload.type + else: + new_type = PromoCodeType(promocode.type) + + balance_bonus = ( + payload.balance_bonus_kopeks + if payload.balance_bonus_kopeks is not None + else promocode.balance_bonus_kopeks + ) + subscription_days = ( + payload.subscription_days + if payload.subscription_days is not None + else promocode.subscription_days + ) + + if new_type == PromoCodeType.BALANCE and balance_bonus <= 0: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Balance bonus must be positive for balance promo codes") + + if new_type in {PromoCodeType.SUBSCRIPTION_DAYS, PromoCodeType.TRIAL_SUBSCRIPTION} and subscription_days <= 0: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Subscription days must be positive for this promo code type") + + valid_from = ( + _normalize_datetime(payload.valid_from) + if payload.valid_from is not None + else promocode.valid_from + ) + valid_until = ( + _normalize_datetime(payload.valid_until) + if payload.valid_until is not None + else promocode.valid_until + ) + + if valid_from and valid_until and valid_from > valid_until: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "valid_from cannot be greater than valid_until") + + if payload.max_uses is not None and payload.max_uses != 0 and payload.max_uses < promocode.current_uses: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "max_uses cannot be less than current uses") + + +@router.get("", response_model=PromoCodeListResponse) +async def list_promocodes( + _: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), + limit: int = Query(50, ge=1, le=200), + offset: int = Query(0, ge=0), + is_active: Optional[bool] = Query(default=None), +) -> PromoCodeListResponse: + total = await get_promocodes_count(db, is_active=is_active) or 0 + promocodes = await get_promocodes_list(db, offset=offset, limit=limit, is_active=is_active) + + return PromoCodeListResponse( + items=[_serialize_promocode(promocode) for promocode in promocodes], + total=int(total), + limit=limit, + offset=offset, + ) + + +@router.get("/{promocode_id}", response_model=PromoCodeDetailResponse) +async def get_promocode( + promocode_id: int, + _: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> PromoCodeDetailResponse: + promocode = await db.get(PromoCode, promocode_id) + if not promocode: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Promo code not found") + + stats = await get_promocode_statistics(db, promocode_id) + base = _serialize_promocode(promocode) + recent_uses = [ + _serialize_recent_use(use) + for use in stats.get("recent_uses", []) + ] + + return PromoCodeDetailResponse( + **base.dict(), + total_uses=stats.get("total_uses", 0), + today_uses=stats.get("today_uses", 0), + recent_uses=recent_uses, + ) + + +@router.post("", response_model=PromoCodeResponse, status_code=status.HTTP_201_CREATED) +async def create_promocode_endpoint( + payload: PromoCodeCreateRequest, + _: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> PromoCodeResponse: + _validate_create_payload(payload) + + normalized_code = payload.code.strip().upper() + normalized_valid_from = _normalize_datetime(payload.valid_from) + normalized_valid_until = _normalize_datetime(payload.valid_until) + + existing = await get_promocode_by_code(db, normalized_code) + if existing: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Promo code with this code already exists") + + creator_id = ( + payload.created_by + if payload.created_by is not None and payload.created_by > 0 + else None + ) + + promocode = await create_promocode( + db, + code=normalized_code, + type=payload.type, + balance_bonus_kopeks=payload.balance_bonus_kopeks, + subscription_days=payload.subscription_days, + max_uses=payload.max_uses, + valid_until=normalized_valid_until, + created_by=creator_id, + ) + + update_fields = {} + if normalized_valid_from is not None: + update_fields["valid_from"] = normalized_valid_from + if payload.is_active is not None and payload.is_active != promocode.is_active: + update_fields["is_active"] = payload.is_active + if normalized_valid_until is not None: + update_fields["valid_until"] = normalized_valid_until + + if update_fields: + promocode = await update_promocode(db, promocode, **update_fields) + + return _serialize_promocode(promocode) + + +@router.patch("/{promocode_id}", response_model=PromoCodeResponse) +async def update_promocode_endpoint( + promocode_id: int, + payload: PromoCodeUpdateRequest, + _: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> PromoCodeResponse: + promocode = await db.get(PromoCode, promocode_id) + if not promocode: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Promo code not found") + + _validate_update_payload(payload, promocode) + + updates: dict[str, Any] = {} + + if payload.code is not None: + normalized_code = payload.code.strip().upper() + if normalized_code != promocode.code: + existing = await get_promocode_by_code(db, normalized_code) + if existing and existing.id != promocode_id: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Promo code with this code already exists") + updates["code"] = normalized_code + + if payload.type is not None: + updates["type"] = payload.type.value + + if payload.balance_bonus_kopeks is not None: + updates["balance_bonus_kopeks"] = payload.balance_bonus_kopeks + + if payload.subscription_days is not None: + updates["subscription_days"] = payload.subscription_days + + if payload.max_uses is not None: + updates["max_uses"] = payload.max_uses + + if payload.valid_from is not None: + updates["valid_from"] = _normalize_datetime(payload.valid_from) + + if payload.valid_until is not None: + updates["valid_until"] = _normalize_datetime(payload.valid_until) + + if payload.is_active is not None: + updates["is_active"] = payload.is_active + + if not updates: + return _serialize_promocode(promocode) + + promocode = await update_promocode(db, promocode, **updates) + return _serialize_promocode(promocode) + + +@router.delete( + "/{promocode_id}", + status_code=status.HTTP_204_NO_CONTENT, + response_class=Response, +) +async def delete_promocode_endpoint( + promocode_id: int, + _: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> Response: + promocode = await db.get(PromoCode, promocode_id) + if not promocode: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Promo code not found") + + success = await delete_promocode(db, promocode) + if not success: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Failed to delete promo code") + + return Response(status_code=status.HTTP_204_NO_CONTENT) diff --git a/app/webapi/schemas/broadcasts.py b/app/webapi/schemas/broadcasts.py new file mode 100644 index 00000000..bc616f5b --- /dev/null +++ b/app/webapi/schemas/broadcasts.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +from datetime import datetime +from typing import ClassVar, Optional + +from pydantic import BaseModel, Field, validator + +from app.keyboards.admin import BROADCAST_BUTTONS, DEFAULT_BROADCAST_BUTTONS + + +class BroadcastMedia(BaseModel): + type: str = Field(pattern=r"^(photo|video|document)$") + file_id: str + caption: Optional[str] = None + + +class BroadcastCreateRequest(BaseModel): + target: str + message_text: str = Field(..., min_length=1, max_length=4000) + selected_buttons: list[str] = Field( + default_factory=lambda: list(DEFAULT_BROADCAST_BUTTONS) + ) + media: Optional[BroadcastMedia] = None + + _ALLOWED_TARGETS: ClassVar[set[str]] = { + "all", + "active", + "trial", + "no", + "expiring", + "expired", + "active_zero", + "trial_zero", + "zero", + } + _CUSTOM_TARGETS: ClassVar[set[str]] = { + "today", + "week", + "month", + "active_today", + "inactive_week", + "inactive_month", + "referrals", + "direct", + } + _TARGET_ALIASES: ClassVar[dict[str, str]] = { + "no_sub": "no", + } + + @validator("target") + def validate_target(cls, value: str) -> str: + normalized = value.strip().lower() + normalized = cls._TARGET_ALIASES.get(normalized, normalized) + + if normalized in cls._ALLOWED_TARGETS: + return normalized + + if normalized.startswith("custom_"): + criteria = normalized[len("custom_"):] + if criteria in cls._CUSTOM_TARGETS: + return normalized + + raise ValueError("Unsupported target value") + + @validator("selected_buttons", pre=True) + def validate_selected_buttons(cls, value): + if value is None: + return [] + if not isinstance(value, (list, tuple)): + raise TypeError("selected_buttons must be an array") + + seen = set() + ordered: list[str] = [] + for raw_button in value: + button = str(raw_button).strip() + if not button: + continue + if button not in BROADCAST_BUTTONS: + raise ValueError(f"Unsupported button '{button}'") + if button in seen: + continue + ordered.append(button) + seen.add(button) + return ordered + + +class BroadcastResponse(BaseModel): + id: int + target_type: str + message_text: str + has_media: bool + media_type: Optional[str] = None + media_file_id: Optional[str] = None + media_caption: Optional[str] = None + total_count: int + sent_count: int + failed_count: int + status: str + admin_id: Optional[int] = None + admin_name: Optional[str] = None + created_at: datetime + completed_at: Optional[datetime] = None + + +class BroadcastListResponse(BaseModel): + items: list[BroadcastResponse] + total: int + limit: int + offset: int + diff --git a/app/webapi/schemas/promo_groups.py b/app/webapi/schemas/promo_groups.py index af0be4a5..ffd2e68e 100644 --- a/app/webapi/schemas/promo_groups.py +++ b/app/webapi/schemas/promo_groups.py @@ -3,7 +3,23 @@ from __future__ import annotations from datetime import datetime from typing import Dict, Optional -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, validator + + +def _normalize_period_discounts(value: Optional[Dict[object, object]]) -> Optional[Dict[int, int]]: + if value is None: + return None + + normalized: Dict[int, int] = {} + if isinstance(value, dict): + for raw_key, raw_value in value.items(): + try: + key = int(raw_key) + normalized[key] = int(raw_value) + except (TypeError, ValueError): + continue + + return normalized or None class PromoGroupResponse(BaseModel): @@ -21,21 +37,43 @@ class PromoGroupResponse(BaseModel): updated_at: datetime -class PromoGroupCreateRequest(BaseModel): +class _PromoGroupBase(BaseModel): + period_discounts: Optional[Dict[int, int]] = Field( + default=None, + description=( + "Словарь скидок по длительности подписки. Ключ — количество месяцев, " + "значение — скидка в процентах. Например: {1: 10, 6: 20}." + ), + example={1: 10, 6: 20}, + ) + + @validator("period_discounts", pre=True) + def validate_period_discounts(cls, value): # noqa: D401,B902 + return _normalize_period_discounts(value) + + +class PromoGroupCreateRequest(_PromoGroupBase): name: str server_discount_percent: int = 0 traffic_discount_percent: int = 0 device_discount_percent: int = 0 - period_discounts: Optional[Dict[int, int]] = None auto_assign_total_spent_kopeks: Optional[int] = None apply_discounts_to_addons: bool = True + is_default: bool = False -class PromoGroupUpdateRequest(BaseModel): +class PromoGroupUpdateRequest(_PromoGroupBase): name: Optional[str] = None server_discount_percent: Optional[int] = None traffic_discount_percent: Optional[int] = None device_discount_percent: Optional[int] = None - period_discounts: Optional[Dict[int, int]] = None auto_assign_total_spent_kopeks: Optional[int] = None apply_discounts_to_addons: Optional[bool] = None + is_default: Optional[bool] = None + + +class PromoGroupListResponse(BaseModel): + items: list[PromoGroupResponse] + total: int + limit: int + offset: int diff --git a/app/webapi/schemas/promocodes.py b/app/webapi/schemas/promocodes.py new file mode 100644 index 00000000..fdf9ac66 --- /dev/null +++ b/app/webapi/schemas/promocodes.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel, Field + +from app.database.models import PromoCodeType + + +class PromoCodeResponse(BaseModel): + id: int + code: str + type: PromoCodeType + balance_bonus_kopeks: int + balance_bonus_rubles: float + subscription_days: int + max_uses: int + current_uses: int + uses_left: int + is_active: bool + is_valid: bool + valid_from: datetime + valid_until: Optional[datetime] = None + created_by: Optional[int] = None + created_at: datetime + updated_at: datetime + + +class PromoCodeListResponse(BaseModel): + items: list[PromoCodeResponse] + total: int + limit: int + offset: int + + +class PromoCodeCreateRequest(BaseModel): + code: str + type: PromoCodeType + balance_bonus_kopeks: int = 0 + subscription_days: int = 0 + max_uses: int = Field(default=1, ge=0) + valid_from: Optional[datetime] = None + valid_until: Optional[datetime] = None + is_active: bool = True + created_by: Optional[int] = None + + +class PromoCodeUpdateRequest(BaseModel): + code: Optional[str] = None + type: Optional[PromoCodeType] = None + balance_bonus_kopeks: Optional[int] = None + subscription_days: Optional[int] = None + max_uses: Optional[int] = Field(default=None, ge=0) + valid_from: Optional[datetime] = None + valid_until: Optional[datetime] = None + is_active: Optional[bool] = None + + +class PromoCodeRecentUse(BaseModel): + id: int + user_id: int + user_username: Optional[str] = None + user_full_name: Optional[str] = None + user_telegram_id: Optional[int] = None + used_at: datetime + + +class PromoCodeDetailResponse(PromoCodeResponse): + total_uses: int + today_uses: int + recent_uses: list[PromoCodeRecentUse] = Field(default_factory=list) +