env to json

This commit is contained in:
firewookie
2025-09-28 13:34:43 +05:00
parent 7e20140a00
commit ffa51986a8
8 changed files with 1066 additions and 24 deletions

View File

@@ -30,13 +30,23 @@ logger = logging.getLogger(__name__)
async def get_promo_groups_with_counts(
db: AsyncSession,
*,
offset: int = 0,
limit: Optional[int] = None,
) -> List[Tuple[PromoGroup, int]]:
result = await db.execute(
query = (
select(PromoGroup, func.count(User.id))
.outerjoin(User, User.promo_group_id == PromoGroup.id)
.group_by(PromoGroup.id)
.order_by(PromoGroup.is_default.desc(), PromoGroup.name)
)
if offset:
query = query.offset(offset)
if limit is not None:
query = query.limit(limit)
result = await db.execute(query)
return result.all()
@@ -44,6 +54,11 @@ async def get_promo_group_by_id(db: AsyncSession, group_id: int) -> Optional[Pro
return await db.get(PromoGroup, group_id)
async def count_promo_groups(db: AsyncSession) -> int:
result = await db.execute(select(func.count(PromoGroup.id)))
return int(result.scalar_one())
async def get_default_promo_group(db: AsyncSession) -> Optional[PromoGroup]:
result = await db.execute(
select(PromoGroup).where(PromoGroup.is_default.is_(True))
@@ -61,6 +76,7 @@ async def create_promo_group(
period_discounts: Optional[Dict[int, int]] = None,
auto_assign_total_spent_kopeks: Optional[int] = None,
apply_discounts_to_addons: bool = True,
is_default: bool = False,
) -> PromoGroup:
normalized_period_discounts = _normalize_period_discounts(period_discounts)
@@ -70,6 +86,9 @@ async def create_promo_group(
else None
)
existing_default = await get_default_promo_group(db)
should_be_default = existing_default is None or is_default
promo_group = PromoGroup(
name=name.strip(),
server_discount_percent=max(0, min(100, server_discount_percent)),
@@ -78,16 +97,26 @@ async def create_promo_group(
period_discounts=normalized_period_discounts or None,
auto_assign_total_spent_kopeks=auto_assign_total_spent_kopeks,
apply_discounts_to_addons=bool(apply_discounts_to_addons),
is_default=False,
is_default=should_be_default,
)
db.add(promo_group)
await db.flush()
if should_be_default and existing_default and existing_default.id != promo_group.id:
await db.execute(
update(PromoGroup)
.where(PromoGroup.id != promo_group.id)
.values(is_default=False)
)
await db.commit()
await db.refresh(promo_group)
logger.info(
"Создана промогруппа '%s' с скидками (servers=%s%%, traffic=%s%%, devices=%s%%, periods=%s) и порогом автоприсвоения %s₽, скидки на доп. услуги: %s",
"Создана промогруппа '%s' (default=%s) с скидками (servers=%s%%, traffic=%s%%, devices=%s%%, periods=%s) и порогом автоприсвоения %s₽, скидки на доп. услуги: %s",
promo_group.name,
promo_group.is_default,
promo_group.server_discount_percent,
promo_group.traffic_discount_percent,
promo_group.device_discount_percent,
@@ -110,6 +139,7 @@ async def update_promo_group(
period_discounts: Optional[Dict[int, int]] = None,
auto_assign_total_spent_kopeks: Optional[int] = None,
apply_discounts_to_addons: Optional[bool] = None,
is_default: Optional[bool] = None,
) -> PromoGroup:
if name is not None:
group.name = name.strip()
@@ -127,6 +157,36 @@ async def update_promo_group(
if apply_discounts_to_addons is not None:
group.apply_discounts_to_addons = bool(apply_discounts_to_addons)
if is_default is not None:
if is_default:
group.is_default = True
await db.flush()
await db.execute(
update(PromoGroup)
.where(PromoGroup.id != group.id)
.values(is_default=False)
)
else:
if group.is_default:
group.is_default = False
await db.flush()
replacement = await db.execute(
select(PromoGroup)
.where(PromoGroup.id != group.id)
.order_by(PromoGroup.id)
.limit(1)
)
new_default = replacement.scalars().first()
if new_default:
await db.execute(
update(PromoGroup)
.where(PromoGroup.id == new_default.id)
.values(is_default=True)
)
else:
# Не допускаем состояния без базовой промогруппы
group.is_default = True
await db.commit()
await db.refresh(group)

View File

@@ -0,0 +1,279 @@
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from aiogram import Bot
from aiogram.types import InlineKeyboardMarkup
from app.database.database import AsyncSessionLocal
from app.database.models import BroadcastHistory
from app.handlers.admin.messages import (
create_broadcast_keyboard,
get_custom_users,
get_target_users,
)
logger = logging.getLogger(__name__)
VALID_MEDIA_TYPES = {"photo", "video", "document"}
@dataclass(slots=True)
class BroadcastMediaConfig:
type: str
file_id: str
caption: Optional[str] = None
@dataclass(slots=True)
class BroadcastConfig:
target: str
message_text: str
selected_buttons: list[str]
media: Optional[BroadcastMediaConfig] = None
initiator_name: Optional[str] = None
@dataclass(slots=True)
class _BroadcastTask:
task: asyncio.Task
cancel_event: asyncio.Event
class BroadcastService:
"""Handles broadcast execution triggered from the admin web API."""
def __init__(self) -> None:
self._bot: Optional[Bot] = None
self._tasks: dict[int, _BroadcastTask] = {}
self._lock = asyncio.Lock()
def set_bot(self, bot: Bot) -> None:
self._bot = bot
def is_running(self, broadcast_id: int) -> bool:
task_entry = self._tasks.get(broadcast_id)
return bool(task_entry and not task_entry.task.done())
async def start_broadcast(self, broadcast_id: int, config: BroadcastConfig) -> None:
if self._bot is None:
logger.error("Невозможно запустить рассылку %s: бот не инициализирован", broadcast_id)
await self._mark_failed(broadcast_id)
return
cancel_event = asyncio.Event()
async with self._lock:
if broadcast_id in self._tasks and not self._tasks[broadcast_id].task.done():
logger.warning("Рассылка %s уже запущена", broadcast_id)
return
task = asyncio.create_task(
self._run_broadcast(broadcast_id, config, cancel_event),
name=f"broadcast-{broadcast_id}",
)
self._tasks[broadcast_id] = _BroadcastTask(task=task, cancel_event=cancel_event)
task.add_done_callback(lambda _: self._tasks.pop(broadcast_id, None))
async def request_stop(self, broadcast_id: int) -> bool:
async with self._lock:
task_entry = self._tasks.get(broadcast_id)
if not task_entry:
return False
task_entry.cancel_event.set()
return True
async def _run_broadcast(
self,
broadcast_id: int,
config: BroadcastConfig,
cancel_event: asyncio.Event,
) -> None:
sent_count = 0
failed_count = 0
try:
if cancel_event.is_set():
await self._mark_cancelled(broadcast_id, sent_count, failed_count)
return
async with AsyncSessionLocal() as session:
broadcast = await session.get(BroadcastHistory, broadcast_id)
if not broadcast:
logger.error("Запись рассылки %s не найдена в БД", broadcast_id)
return
broadcast.status = "in_progress"
broadcast.sent_count = 0
broadcast.failed_count = 0
await session.commit()
recipients = await self._fetch_recipients(config.target)
async with AsyncSessionLocal() as session:
broadcast = await session.get(BroadcastHistory, broadcast_id)
if not broadcast:
logger.error("Запись рассылки %s удалена до запуска", broadcast_id)
return
broadcast.total_count = len(recipients)
await session.commit()
if cancel_event.is_set():
await self._mark_cancelled(broadcast_id, sent_count, failed_count)
return
if not recipients:
logger.info("Рассылка %s: получатели не найдены", broadcast_id)
await self._mark_finished(broadcast_id, sent_count, failed_count, cancelled=False)
return
keyboard = self._build_keyboard(config.selected_buttons)
for index, user in enumerate(recipients, start=1):
if cancel_event.is_set():
await self._mark_cancelled(broadcast_id, sent_count, failed_count)
return
telegram_id = getattr(user, "telegram_id", None)
if telegram_id is None:
failed_count += 1
continue
try:
await self._deliver_message(telegram_id, config, keyboard)
sent_count += 1
except Exception as exc: # noqa: BLE001
failed_count += 1
logger.error(
"Ошибка отправки рассылки %s пользователю %s: %s",
broadcast_id,
telegram_id,
exc,
)
if index % 20 == 0:
await asyncio.sleep(1)
await self._mark_finished(broadcast_id, sent_count, failed_count, cancelled=False)
except asyncio.CancelledError:
await self._mark_cancelled(broadcast_id, sent_count, failed_count)
raise
except Exception as exc: # noqa: BLE001
logger.exception("Критическая ошибка при выполнении рассылки %s: %s", broadcast_id, exc)
await self._mark_failed(broadcast_id, sent_count, failed_count)
async def _fetch_recipients(self, target: str):
async with AsyncSessionLocal() as session:
if target.startswith("custom_"):
criteria = target[len("custom_"):]
return await get_custom_users(session, criteria)
return await get_target_users(session, target)
def _build_keyboard(self, selected_buttons: Optional[list[str]]) -> Optional[InlineKeyboardMarkup]:
if selected_buttons is None:
selected_buttons = []
return create_broadcast_keyboard(selected_buttons)
async def _deliver_message(
self,
telegram_id: int,
config: BroadcastConfig,
keyboard: Optional[InlineKeyboardMarkup],
) -> None:
if not self._bot:
raise RuntimeError("Телеграм-бот не инициализирован")
if config.media and config.media.type in VALID_MEDIA_TYPES:
caption = config.media.caption or config.message_text
if config.media.type == "photo":
await self._bot.send_photo(
chat_id=telegram_id,
photo=config.media.file_id,
caption=caption,
reply_markup=keyboard,
)
elif config.media.type == "video":
await self._bot.send_video(
chat_id=telegram_id,
video=config.media.file_id,
caption=caption,
reply_markup=keyboard,
)
elif config.media.type == "document":
await self._bot.send_document(
chat_id=telegram_id,
document=config.media.file_id,
caption=caption,
reply_markup=keyboard,
)
return
await self._bot.send_message(
chat_id=telegram_id,
text=config.message_text,
reply_markup=keyboard,
)
async def _mark_finished(
self,
broadcast_id: int,
sent_count: int,
failed_count: int,
*,
cancelled: bool,
) -> None:
async with AsyncSessionLocal() as session:
broadcast = await session.get(BroadcastHistory, broadcast_id)
if not broadcast:
return
broadcast.sent_count = sent_count
broadcast.failed_count = failed_count
broadcast.status = "cancelled" if cancelled else (
"completed" if failed_count == 0 else "partial"
)
broadcast.completed_at = datetime.utcnow()
await session.commit()
async def _mark_cancelled(
self,
broadcast_id: int,
sent_count: int,
failed_count: int,
) -> None:
await self._mark_finished(
broadcast_id,
sent_count,
failed_count,
cancelled=True,
)
async def _mark_failed(
self,
broadcast_id: int,
sent_count: int = 0,
failed_count: int = 0,
) -> None:
async with AsyncSessionLocal() as session:
broadcast = await session.get(BroadcastHistory, broadcast_id)
if not broadcast:
return
broadcast.sent_count = sent_count
broadcast.failed_count = failed_count or broadcast.failed_count
broadcast.status = "failed"
broadcast.completed_at = datetime.utcnow()
await session.commit()
broadcast_service = BroadcastService()

View File

@@ -0,0 +1,149 @@
from __future__ import annotations
from datetime import datetime
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.models import BroadcastHistory
from app.services.broadcast_service import (
BroadcastConfig,
BroadcastMediaConfig,
broadcast_service,
)
from ..dependencies import get_db_session, require_api_token
from ..schemas.broadcasts import (
BroadcastCreateRequest,
BroadcastListResponse,
BroadcastResponse,
)
router = APIRouter()
def _serialize_broadcast(broadcast: BroadcastHistory) -> BroadcastResponse:
return BroadcastResponse(
id=broadcast.id,
target_type=broadcast.target_type,
message_text=broadcast.message_text,
has_media=broadcast.has_media,
media_type=broadcast.media_type,
media_file_id=broadcast.media_file_id,
media_caption=broadcast.media_caption,
total_count=broadcast.total_count,
sent_count=broadcast.sent_count,
failed_count=broadcast.failed_count,
status=broadcast.status,
admin_id=broadcast.admin_id,
admin_name=broadcast.admin_name,
created_at=broadcast.created_at,
completed_at=broadcast.completed_at,
)
@router.post("", response_model=BroadcastResponse, status_code=status.HTTP_201_CREATED)
async def create_broadcast(
payload: BroadcastCreateRequest,
token: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> BroadcastResponse:
message_text = payload.message_text.strip()
if not message_text:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Message text must not be empty")
media_payload = payload.media
broadcast = BroadcastHistory(
target_type=payload.target,
message_text=message_text,
has_media=media_payload is not None,
media_type=media_payload.type if media_payload else None,
media_file_id=media_payload.file_id if media_payload else None,
media_caption=media_payload.caption if media_payload else None,
total_count=0,
sent_count=0,
failed_count=0,
status="queued",
admin_id=None,
admin_name=getattr(token, "name", None) or getattr(token, "created_by", None),
)
db.add(broadcast)
await db.commit()
await db.refresh(broadcast)
media_config = None
if media_payload:
media_config = BroadcastMediaConfig(
type=media_payload.type,
file_id=media_payload.file_id,
caption=media_payload.caption or message_text,
)
config = BroadcastConfig(
target=payload.target,
message_text=message_text,
selected_buttons=payload.selected_buttons,
media=media_config,
initiator_name=getattr(token, "name", None) or getattr(token, "created_by", None),
)
await broadcast_service.start_broadcast(broadcast.id, config)
await db.refresh(broadcast)
return _serialize_broadcast(broadcast)
@router.get("", response_model=BroadcastListResponse)
async def list_broadcasts(
_: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
) -> BroadcastListResponse:
total = await db.scalar(select(func.count(BroadcastHistory.id))) or 0
result = await db.execute(
select(BroadcastHistory)
.order_by(BroadcastHistory.created_at.desc())
.offset(offset)
.limit(limit)
)
broadcasts = result.scalars().all()
return BroadcastListResponse(
items=[_serialize_broadcast(item) for item in broadcasts],
total=int(total),
limit=limit,
offset=offset,
)
@router.post("/{broadcast_id}/stop", response_model=BroadcastResponse)
async def stop_broadcast(
broadcast_id: int,
_: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> BroadcastResponse:
broadcast = await db.get(BroadcastHistory, broadcast_id)
if not broadcast:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Broadcast not found")
if broadcast.status not in {"queued", "in_progress", "cancelling"}:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Broadcast is not running")
is_running = await broadcast_service.request_stop(broadcast_id)
if is_running:
broadcast.status = "cancelling"
else:
broadcast.status = "cancelled"
broadcast.completed_at = datetime.utcnow()
await db.commit()
await db.refresh(broadcast)
return _serialize_broadcast(broadcast)

View File

@@ -2,11 +2,13 @@ from __future__ import annotations
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Response, Security, status
from fastapi import APIRouter, Depends, HTTPException, Query, Response, Security, status
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.crud.promo_group import (
count_promo_group_members,
count_promo_groups,
create_promo_group,
delete_promo_group,
get_promo_group_by_id,
@@ -18,6 +20,7 @@ from app.database.models import PromoGroup
from ..dependencies import get_db_session, require_api_token
from ..schemas.promo_groups import (
PromoGroupCreateRequest,
PromoGroupListResponse,
PromoGroupResponse,
PromoGroupUpdateRequest,
)
@@ -54,13 +57,26 @@ def _serialize(group: PromoGroup, members_count: int = 0) -> PromoGroupResponse:
)
@router.get("", response_model=list[PromoGroupResponse])
@router.get("", response_model=PromoGroupListResponse)
async def list_promo_groups(
_: Any = Security(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> list[PromoGroupResponse]:
groups_with_counts = await get_promo_groups_with_counts(db)
return [_serialize(group, members_count=count) for group, count in groups_with_counts]
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
) -> PromoGroupListResponse:
total = await count_promo_groups(db)
groups_with_counts = await get_promo_groups_with_counts(
db,
offset=offset,
limit=limit,
)
return PromoGroupListResponse(
items=[_serialize(group, members_count=count) for group, count in groups_with_counts],
total=total,
limit=limit,
offset=offset,
)
@router.get("/{group_id}", response_model=PromoGroupResponse)
@@ -83,16 +99,24 @@ async def create_promo_group_endpoint(
_: Any = Security(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> PromoGroupResponse:
group = await create_promo_group(
db,
name=payload.name,
server_discount_percent=payload.server_discount_percent,
traffic_discount_percent=payload.traffic_discount_percent,
try:
group = await create_promo_group(
db,
name=payload.name,
server_discount_percent=payload.server_discount_percent,
traffic_discount_percent=payload.traffic_discount_percent,
device_discount_percent=payload.device_discount_percent,
period_discounts=payload.period_discounts,
auto_assign_total_spent_kopeks=payload.auto_assign_total_spent_kopeks,
apply_discounts_to_addons=payload.apply_discounts_to_addons,
is_default=payload.is_default,
)
except IntegrityError as exc:
await db.rollback()
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
"Promo group with this name already exists",
) from exc
return _serialize(group, members_count=0)
@@ -107,17 +131,25 @@ async def update_promo_group_endpoint(
if not group:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Promo group not found")
group = await update_promo_group(
db,
group,
name=payload.name,
server_discount_percent=payload.server_discount_percent,
traffic_discount_percent=payload.traffic_discount_percent,
try:
group = await update_promo_group(
db,
group,
name=payload.name,
server_discount_percent=payload.server_discount_percent,
traffic_discount_percent=payload.traffic_discount_percent,
device_discount_percent=payload.device_discount_percent,
period_discounts=payload.period_discounts,
auto_assign_total_spent_kopeks=payload.auto_assign_total_spent_kopeks,
apply_discounts_to_addons=payload.apply_discounts_to_addons,
is_default=payload.is_default,
)
except IntegrityError as exc:
await db.rollback()
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
"Promo group with this name already exists",
) from exc
members_count = await count_promo_group_members(db, group_id)
return _serialize(group, members_count=members_count)

View File

@@ -0,0 +1,301 @@
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Response, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.crud.promocode import (
create_promocode,
delete_promocode,
get_promocode_by_code,
get_promocode_statistics,
get_promocodes_count,
get_promocodes_list,
update_promocode,
)
from app.database.models import PromoCode, PromoCodeType, PromoCodeUse
from ..dependencies import get_db_session, require_api_token
from ..schemas.promocodes import (
PromoCodeCreateRequest,
PromoCodeDetailResponse,
PromoCodeListResponse,
PromoCodeRecentUse,
PromoCodeResponse,
PromoCodeUpdateRequest,
)
router = APIRouter()
def _normalize_datetime(value: Optional[datetime]) -> Optional[datetime]:
if value is None:
return None
if value.tzinfo is not None and value.utcoffset() is not None:
return value.astimezone(timezone.utc).replace(tzinfo=None)
if value.tzinfo is not None:
return value.replace(tzinfo=None)
return value
def _serialize_promocode(promocode: PromoCode) -> PromoCodeResponse:
promo_type = PromoCodeType(promocode.type)
return PromoCodeResponse(
id=promocode.id,
code=promocode.code,
type=promo_type,
balance_bonus_kopeks=promocode.balance_bonus_kopeks,
balance_bonus_rubles=round(promocode.balance_bonus_kopeks / 100, 2),
subscription_days=promocode.subscription_days,
max_uses=promocode.max_uses,
current_uses=promocode.current_uses,
uses_left=promocode.uses_left,
is_active=promocode.is_active,
is_valid=promocode.is_valid,
valid_from=promocode.valid_from,
valid_until=promocode.valid_until,
created_by=promocode.created_by,
created_at=promocode.created_at,
updated_at=promocode.updated_at,
)
def _serialize_recent_use(use: PromoCodeUse) -> PromoCodeRecentUse:
return PromoCodeRecentUse(
id=use.id,
user_id=use.user_id,
user_username=getattr(use, "user_username", None),
user_full_name=getattr(use, "user_full_name", None),
user_telegram_id=getattr(use, "user_telegram_id", None),
used_at=use.used_at,
)
def _validate_create_payload(payload: PromoCodeCreateRequest) -> None:
code = payload.code.strip()
if not code:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Code must not be empty")
normalized_valid_from = _normalize_datetime(payload.valid_from)
normalized_valid_until = _normalize_datetime(payload.valid_until)
if payload.type == PromoCodeType.BALANCE and payload.balance_bonus_kopeks <= 0:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Balance bonus must be positive for balance promo codes")
if payload.type in {PromoCodeType.SUBSCRIPTION_DAYS, PromoCodeType.TRIAL_SUBSCRIPTION} and payload.subscription_days <= 0:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Subscription days must be positive for this promo code type")
if normalized_valid_from and normalized_valid_until and normalized_valid_from > normalized_valid_until:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "valid_from cannot be greater than valid_until")
def _validate_update_payload(payload: PromoCodeUpdateRequest, promocode: PromoCode) -> None:
if payload.code is not None and not payload.code.strip():
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Code must not be empty")
if payload.type is not None:
new_type = payload.type
else:
new_type = PromoCodeType(promocode.type)
balance_bonus = (
payload.balance_bonus_kopeks
if payload.balance_bonus_kopeks is not None
else promocode.balance_bonus_kopeks
)
subscription_days = (
payload.subscription_days
if payload.subscription_days is not None
else promocode.subscription_days
)
if new_type == PromoCodeType.BALANCE and balance_bonus <= 0:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Balance bonus must be positive for balance promo codes")
if new_type in {PromoCodeType.SUBSCRIPTION_DAYS, PromoCodeType.TRIAL_SUBSCRIPTION} and subscription_days <= 0:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Subscription days must be positive for this promo code type")
valid_from = (
_normalize_datetime(payload.valid_from)
if payload.valid_from is not None
else promocode.valid_from
)
valid_until = (
_normalize_datetime(payload.valid_until)
if payload.valid_until is not None
else promocode.valid_until
)
if valid_from and valid_until and valid_from > valid_until:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "valid_from cannot be greater than valid_until")
if payload.max_uses is not None and payload.max_uses != 0 and payload.max_uses < promocode.current_uses:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "max_uses cannot be less than current uses")
@router.get("", response_model=PromoCodeListResponse)
async def list_promocodes(
_: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
is_active: Optional[bool] = Query(default=None),
) -> PromoCodeListResponse:
total = await get_promocodes_count(db, is_active=is_active) or 0
promocodes = await get_promocodes_list(db, offset=offset, limit=limit, is_active=is_active)
return PromoCodeListResponse(
items=[_serialize_promocode(promocode) for promocode in promocodes],
total=int(total),
limit=limit,
offset=offset,
)
@router.get("/{promocode_id}", response_model=PromoCodeDetailResponse)
async def get_promocode(
promocode_id: int,
_: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> PromoCodeDetailResponse:
promocode = await db.get(PromoCode, promocode_id)
if not promocode:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Promo code not found")
stats = await get_promocode_statistics(db, promocode_id)
base = _serialize_promocode(promocode)
recent_uses = [
_serialize_recent_use(use)
for use in stats.get("recent_uses", [])
]
return PromoCodeDetailResponse(
**base.dict(),
total_uses=stats.get("total_uses", 0),
today_uses=stats.get("today_uses", 0),
recent_uses=recent_uses,
)
@router.post("", response_model=PromoCodeResponse, status_code=status.HTTP_201_CREATED)
async def create_promocode_endpoint(
payload: PromoCodeCreateRequest,
_: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> PromoCodeResponse:
_validate_create_payload(payload)
normalized_code = payload.code.strip().upper()
normalized_valid_from = _normalize_datetime(payload.valid_from)
normalized_valid_until = _normalize_datetime(payload.valid_until)
existing = await get_promocode_by_code(db, normalized_code)
if existing:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Promo code with this code already exists")
creator_id = (
payload.created_by
if payload.created_by is not None and payload.created_by > 0
else None
)
promocode = await create_promocode(
db,
code=normalized_code,
type=payload.type,
balance_bonus_kopeks=payload.balance_bonus_kopeks,
subscription_days=payload.subscription_days,
max_uses=payload.max_uses,
valid_until=normalized_valid_until,
created_by=creator_id,
)
update_fields = {}
if normalized_valid_from is not None:
update_fields["valid_from"] = normalized_valid_from
if payload.is_active is not None and payload.is_active != promocode.is_active:
update_fields["is_active"] = payload.is_active
if normalized_valid_until is not None:
update_fields["valid_until"] = normalized_valid_until
if update_fields:
promocode = await update_promocode(db, promocode, **update_fields)
return _serialize_promocode(promocode)
@router.patch("/{promocode_id}", response_model=PromoCodeResponse)
async def update_promocode_endpoint(
promocode_id: int,
payload: PromoCodeUpdateRequest,
_: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> PromoCodeResponse:
promocode = await db.get(PromoCode, promocode_id)
if not promocode:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Promo code not found")
_validate_update_payload(payload, promocode)
updates: dict[str, Any] = {}
if payload.code is not None:
normalized_code = payload.code.strip().upper()
if normalized_code != promocode.code:
existing = await get_promocode_by_code(db, normalized_code)
if existing and existing.id != promocode_id:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Promo code with this code already exists")
updates["code"] = normalized_code
if payload.type is not None:
updates["type"] = payload.type.value
if payload.balance_bonus_kopeks is not None:
updates["balance_bonus_kopeks"] = payload.balance_bonus_kopeks
if payload.subscription_days is not None:
updates["subscription_days"] = payload.subscription_days
if payload.max_uses is not None:
updates["max_uses"] = payload.max_uses
if payload.valid_from is not None:
updates["valid_from"] = _normalize_datetime(payload.valid_from)
if payload.valid_until is not None:
updates["valid_until"] = _normalize_datetime(payload.valid_until)
if payload.is_active is not None:
updates["is_active"] = payload.is_active
if not updates:
return _serialize_promocode(promocode)
promocode = await update_promocode(db, promocode, **updates)
return _serialize_promocode(promocode)
@router.delete(
"/{promocode_id}",
status_code=status.HTTP_204_NO_CONTENT,
response_class=Response,
)
async def delete_promocode_endpoint(
promocode_id: int,
_: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> Response:
promocode = await db.get(PromoCode, promocode_id)
if not promocode:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Promo code not found")
success = await delete_promocode(db, promocode)
if not success:
raise HTTPException(status.HTTP_400_BAD_REQUEST, "Failed to delete promo code")
return Response(status_code=status.HTTP_204_NO_CONTENT)

View File

@@ -0,0 +1,110 @@
from __future__ import annotations
from datetime import datetime
from typing import ClassVar, Optional
from pydantic import BaseModel, Field, validator
from app.keyboards.admin import BROADCAST_BUTTONS, DEFAULT_BROADCAST_BUTTONS
class BroadcastMedia(BaseModel):
type: str = Field(pattern=r"^(photo|video|document)$")
file_id: str
caption: Optional[str] = None
class BroadcastCreateRequest(BaseModel):
target: str
message_text: str = Field(..., min_length=1, max_length=4000)
selected_buttons: list[str] = Field(
default_factory=lambda: list(DEFAULT_BROADCAST_BUTTONS)
)
media: Optional[BroadcastMedia] = None
_ALLOWED_TARGETS: ClassVar[set[str]] = {
"all",
"active",
"trial",
"no",
"expiring",
"expired",
"active_zero",
"trial_zero",
"zero",
}
_CUSTOM_TARGETS: ClassVar[set[str]] = {
"today",
"week",
"month",
"active_today",
"inactive_week",
"inactive_month",
"referrals",
"direct",
}
_TARGET_ALIASES: ClassVar[dict[str, str]] = {
"no_sub": "no",
}
@validator("target")
def validate_target(cls, value: str) -> str:
normalized = value.strip().lower()
normalized = cls._TARGET_ALIASES.get(normalized, normalized)
if normalized in cls._ALLOWED_TARGETS:
return normalized
if normalized.startswith("custom_"):
criteria = normalized[len("custom_"):]
if criteria in cls._CUSTOM_TARGETS:
return normalized
raise ValueError("Unsupported target value")
@validator("selected_buttons", pre=True)
def validate_selected_buttons(cls, value):
if value is None:
return []
if not isinstance(value, (list, tuple)):
raise TypeError("selected_buttons must be an array")
seen = set()
ordered: list[str] = []
for raw_button in value:
button = str(raw_button).strip()
if not button:
continue
if button not in BROADCAST_BUTTONS:
raise ValueError(f"Unsupported button '{button}'")
if button in seen:
continue
ordered.append(button)
seen.add(button)
return ordered
class BroadcastResponse(BaseModel):
id: int
target_type: str
message_text: str
has_media: bool
media_type: Optional[str] = None
media_file_id: Optional[str] = None
media_caption: Optional[str] = None
total_count: int
sent_count: int
failed_count: int
status: str
admin_id: Optional[int] = None
admin_name: Optional[str] = None
created_at: datetime
completed_at: Optional[datetime] = None
class BroadcastListResponse(BaseModel):
items: list[BroadcastResponse]
total: int
limit: int
offset: int

View File

@@ -3,7 +3,23 @@ from __future__ import annotations
from datetime import datetime
from typing import Dict, Optional
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, validator
def _normalize_period_discounts(value: Optional[Dict[object, object]]) -> Optional[Dict[int, int]]:
if value is None:
return None
normalized: Dict[int, int] = {}
if isinstance(value, dict):
for raw_key, raw_value in value.items():
try:
key = int(raw_key)
normalized[key] = int(raw_value)
except (TypeError, ValueError):
continue
return normalized or None
class PromoGroupResponse(BaseModel):
@@ -21,21 +37,43 @@ class PromoGroupResponse(BaseModel):
updated_at: datetime
class PromoGroupCreateRequest(BaseModel):
class _PromoGroupBase(BaseModel):
period_discounts: Optional[Dict[int, int]] = Field(
default=None,
description=(
"Словарь скидок по длительности подписки. Ключ — количество месяцев, "
"значение — скидка в процентах. Например: {1: 10, 6: 20}."
),
example={1: 10, 6: 20},
)
@validator("period_discounts", pre=True)
def validate_period_discounts(cls, value): # noqa: D401,B902
return _normalize_period_discounts(value)
class PromoGroupCreateRequest(_PromoGroupBase):
name: str
server_discount_percent: int = 0
traffic_discount_percent: int = 0
device_discount_percent: int = 0
period_discounts: Optional[Dict[int, int]] = None
auto_assign_total_spent_kopeks: Optional[int] = None
apply_discounts_to_addons: bool = True
is_default: bool = False
class PromoGroupUpdateRequest(BaseModel):
class PromoGroupUpdateRequest(_PromoGroupBase):
name: Optional[str] = None
server_discount_percent: Optional[int] = None
traffic_discount_percent: Optional[int] = None
device_discount_percent: Optional[int] = None
period_discounts: Optional[Dict[int, int]] = None
auto_assign_total_spent_kopeks: Optional[int] = None
apply_discounts_to_addons: Optional[bool] = None
is_default: Optional[bool] = None
class PromoGroupListResponse(BaseModel):
items: list[PromoGroupResponse]
total: int
limit: int
offset: int

View File

@@ -0,0 +1,73 @@
from __future__ import annotations
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
from app.database.models import PromoCodeType
class PromoCodeResponse(BaseModel):
id: int
code: str
type: PromoCodeType
balance_bonus_kopeks: int
balance_bonus_rubles: float
subscription_days: int
max_uses: int
current_uses: int
uses_left: int
is_active: bool
is_valid: bool
valid_from: datetime
valid_until: Optional[datetime] = None
created_by: Optional[int] = None
created_at: datetime
updated_at: datetime
class PromoCodeListResponse(BaseModel):
items: list[PromoCodeResponse]
total: int
limit: int
offset: int
class PromoCodeCreateRequest(BaseModel):
code: str
type: PromoCodeType
balance_bonus_kopeks: int = 0
subscription_days: int = 0
max_uses: int = Field(default=1, ge=0)
valid_from: Optional[datetime] = None
valid_until: Optional[datetime] = None
is_active: bool = True
created_by: Optional[int] = None
class PromoCodeUpdateRequest(BaseModel):
code: Optional[str] = None
type: Optional[PromoCodeType] = None
balance_bonus_kopeks: Optional[int] = None
subscription_days: Optional[int] = None
max_uses: Optional[int] = Field(default=None, ge=0)
valid_from: Optional[datetime] = None
valid_until: Optional[datetime] = None
is_active: Optional[bool] = None
class PromoCodeRecentUse(BaseModel):
id: int
user_id: int
user_username: Optional[str] = None
user_full_name: Optional[str] = None
user_telegram_id: Optional[int] = None
used_at: datetime
class PromoCodeDetailResponse(PromoCodeResponse):
total_uses: int
today_uses: int
recent_uses: list[PromoCodeRecentUse] = Field(default_factory=list)