Merge pull request #958 from Fr1ngg/revert-957-xzisus-bedolaga/add-promo-offer-display-on-subscription-page

Revert "Add promo offer display to subscription miniapp"
This commit is contained in:
Egor
2025-10-09 05:07:44 +03:00
committed by GitHub
4 changed files with 95 additions and 1492 deletions

View File

@@ -110,31 +110,6 @@ async def list_discount_offers(
return result.scalars().all()
async def list_active_discount_offers_for_user(
db: AsyncSession,
user_id: int,
) -> List[DiscountOffer]:
"""Return active (not yet claimed) offers for a user."""
now = datetime.utcnow()
stmt = (
select(DiscountOffer)
.options(
selectinload(DiscountOffer.user),
selectinload(DiscountOffer.subscription),
)
.where(
DiscountOffer.user_id == user_id,
DiscountOffer.is_active == True, # noqa: E712
DiscountOffer.expires_at > now,
)
.order_by(DiscountOffer.expires_at.asc())
)
result = await db.execute(stmt)
return result.scalars().all()
async def count_discount_offers(
db: AsyncSession,
*,

View File

@@ -1,8 +1,6 @@
from __future__ import annotations
import logging
import re
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple, Union
from fastapi import APIRouter, Depends, HTTPException, status
@@ -10,29 +8,15 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database.crud.discount_offer import (
get_latest_claimed_offer_for_user,
get_offer_by_id,
list_active_discount_offers_for_user,
mark_offer_claimed,
)
from app.database.crud.promo_group import get_auto_assign_promo_groups
from app.database.crud.promo_offer_template import get_promo_offer_template_by_id
from app.database.crud.server_squad import get_server_squad_by_uuid
from app.database.crud.promo_group import get_auto_assign_promo_groups
from app.database.crud.transaction import get_user_total_spent_kopeks
from app.database.crud.user import get_user_by_telegram_id
from app.database.models import (
PromoGroup,
PromoOfferTemplate,
Subscription,
Transaction,
User,
)
from app.database.models import PromoGroup, Subscription, Transaction, User
from app.services.remnawave_service import (
RemnaWaveConfigurationError,
RemnaWaveService,
)
from app.services.promo_offer_service import promo_offer_service
from app.services.subscription_service import SubscriptionService
from app.utils.subscription_utils import get_happ_cryptolink_redirect_link
from app.utils.telegram_webapp import (
@@ -42,13 +26,10 @@ from app.utils.telegram_webapp import (
from ..dependencies import get_db_session
from ..schemas.miniapp import (
MiniAppAutoPromoGroupLevel,
MiniAppConnectedServer,
MiniAppDevice,
MiniAppAutoPromoGroupLevel,
MiniAppPromoGroup,
MiniAppPromoOffer,
MiniAppPromoOfferClaimRequest,
MiniAppPromoOfferClaimResponse,
MiniAppSubscriptionRequest,
MiniAppSubscriptionResponse,
MiniAppSubscriptionUser,
@@ -85,353 +66,6 @@ def _format_limit_label(limit: Optional[int]) -> str:
return f"{limit} GB"
_TEMPLATE_ID_PATTERN = re.compile(r"promo_template_(?P<template_id>\d+)$")
_OFFER_TYPE_ICONS = {
"extend_discount": "💎",
"purchase_discount": "🎯",
"test_access": "🧪",
}
_EFFECT_TYPE_ICONS = {
"percent_discount": "🎁",
"test_access": "🧪",
"balance_bonus": "💰",
}
_DEFAULT_OFFER_ICON = "🎉"
def _extract_template_id(notification_type: Optional[str]) -> Optional[int]:
if not notification_type:
return None
match = _TEMPLATE_ID_PATTERN.match(notification_type)
if not match:
return None
try:
return int(match.group("template_id"))
except (TypeError, ValueError):
return None
def _extract_offer_extra(offer: Any) -> Dict[str, Any]:
extra = getattr(offer, "extra_data", None)
return extra if isinstance(extra, dict) else {}
def _extract_offer_type(offer: Any, template: Optional[PromoOfferTemplate]) -> Optional[str]:
extra = _extract_offer_extra(offer)
offer_type = extra.get("offer_type") if isinstance(extra.get("offer_type"), str) else None
if offer_type:
return offer_type
template_type = getattr(template, "offer_type", None)
return template_type if isinstance(template_type, str) else None
def _normalize_effect_type(effect_type: Optional[str]) -> str:
normalized = (effect_type or "percent_discount").strip().lower()
if normalized == "balance_bonus":
return "percent_discount"
return normalized or "percent_discount"
def _determine_offer_icon(offer_type: Optional[str], effect_type: str) -> str:
if offer_type and offer_type in _OFFER_TYPE_ICONS:
return _OFFER_TYPE_ICONS[offer_type]
if effect_type in _EFFECT_TYPE_ICONS:
return _EFFECT_TYPE_ICONS[effect_type]
return _DEFAULT_OFFER_ICON
def _extract_offer_test_squad_uuids(offer: Any) -> List[str]:
extra = _extract_offer_extra(offer)
raw = extra.get("test_squad_uuids") or extra.get("squads") or []
if isinstance(raw, str):
raw = [raw]
uuids: List[str] = []
try:
for item in raw:
if not item:
continue
uuids.append(str(item))
except TypeError:
return []
return uuids
def _format_offer_message(
template: Optional[PromoOfferTemplate],
offer: Any,
*,
server_name: Optional[str] = None,
) -> Optional[str]:
message_template: Optional[str] = None
if template and isinstance(template.message_text, str):
message_template = template.message_text
else:
extra = _extract_offer_extra(offer)
raw_message = extra.get("message_text") or extra.get("text")
if isinstance(raw_message, str):
message_template = raw_message
if not message_template:
return None
extra = _extract_offer_extra(offer)
discount_percent = getattr(offer, "discount_percent", None)
try:
discount_percent = int(discount_percent)
except (TypeError, ValueError):
discount_percent = None
replacements: Dict[str, Any] = {}
if discount_percent is not None:
replacements.setdefault("discount_percent", discount_percent)
for key in ("valid_hours", "active_discount_hours", "test_duration_hours"):
value = extra.get(key)
if value is None and template is not None:
template_value = getattr(template, key, None)
else:
template_value = None
replacements.setdefault(key, value if value is not None else template_value)
if replacements.get("active_discount_hours") is None and template:
replacements["active_discount_hours"] = getattr(template, "valid_hours", None)
if replacements.get("test_duration_hours") is None and template:
replacements["test_duration_hours"] = getattr(template, "test_duration_hours", None)
if server_name:
replacements.setdefault("server_name", server_name)
for key, value in extra.items():
if (
isinstance(key, str)
and key not in replacements
and isinstance(value, (str, int, float))
):
replacements[key] = value
try:
return message_template.format(**replacements)
except Exception: # pragma: no cover - fallback for malformed templates
return message_template
def _extract_offer_duration_hours(
offer: Any,
template: Optional[PromoOfferTemplate],
effect_type: str,
) -> Optional[int]:
extra = _extract_offer_extra(offer)
if effect_type == "test_access":
source = extra.get("test_duration_hours")
if source is None and template is not None:
source = getattr(template, "test_duration_hours", None)
else:
source = extra.get("active_discount_hours")
if source is None and template is not None:
source = getattr(template, "active_discount_hours", None)
try:
if source is None:
return None
hours = int(float(source))
return hours if hours > 0 else None
except (TypeError, ValueError):
return None
def _format_bonus_label(amount_kopeks: int) -> Optional[str]:
if amount_kopeks <= 0:
return None
try:
return settings.format_price(amount_kopeks)
except Exception: # pragma: no cover - defensive
return f"{amount_kopeks / 100:.2f}"
async def _build_promo_offer_models(
db: AsyncSession,
available_offers: List[Any],
active_offer: Optional[Any],
*,
user: User,
active_discount_percent: int,
active_discount_expires_at: Optional[datetime],
) -> List[MiniAppPromoOffer]:
promo_offers: List[MiniAppPromoOffer] = []
template_cache: Dict[int, Optional[PromoOfferTemplate]] = {}
candidates: List[Any] = [offer for offer in available_offers if offer]
if active_offer:
candidates.append(active_offer)
squad_map: Dict[str, MiniAppConnectedServer] = {}
if candidates:
all_uuids: List[str] = []
for offer in candidates:
all_uuids.extend(_extract_offer_test_squad_uuids(offer))
if all_uuids:
unique = list(dict.fromkeys(all_uuids))
resolved = await _resolve_connected_servers(db, unique)
squad_map = {server.uuid: server for server in resolved}
async def get_template(template_id: Optional[int]) -> Optional[PromoOfferTemplate]:
if not template_id:
return None
if template_id not in template_cache:
template_cache[template_id] = await get_promo_offer_template_by_id(db, template_id)
return template_cache[template_id]
def build_test_squads(offer: Any) -> List[MiniAppConnectedServer]:
test_squads: List[MiniAppConnectedServer] = []
for uuid in _extract_offer_test_squad_uuids(offer):
resolved = squad_map.get(uuid)
if resolved:
test_squads.append(
MiniAppConnectedServer(uuid=resolved.uuid, name=resolved.name)
)
else:
test_squads.append(MiniAppConnectedServer(uuid=uuid, name=uuid))
return test_squads
def resolve_title(
offer: Any,
template: Optional[PromoOfferTemplate],
offer_type: Optional[str],
) -> Optional[str]:
extra = _extract_offer_extra(offer)
if isinstance(extra.get("title"), str) and extra["title"].strip():
return extra["title"].strip()
if template and template.name:
return template.name
if offer_type:
return offer_type.replace("_", " ").title()
return None
for offer in available_offers:
template_id = _extract_template_id(getattr(offer, "notification_type", None))
template = await get_template(template_id)
effect_type = _normalize_effect_type(getattr(offer, "effect_type", None))
offer_type = _extract_offer_type(offer, template)
test_squads = build_test_squads(offer)
server_name = test_squads[0].name if test_squads else None
message_text = _format_offer_message(template, offer, server_name=server_name)
bonus_label = _format_bonus_label(int(getattr(offer, "bonus_amount_kopeks", 0) or 0))
discount_percent = getattr(offer, "discount_percent", 0)
try:
discount_percent = int(discount_percent)
except (TypeError, ValueError):
discount_percent = 0
extra = _extract_offer_extra(offer)
button_text = None
if isinstance(extra.get("button_text"), str) and extra["button_text"].strip():
button_text = extra["button_text"].strip()
elif template and isinstance(template.button_text, str):
button_text = template.button_text
promo_offers.append(
MiniAppPromoOffer(
id=int(getattr(offer, "id", 0) or 0),
status="pending",
notification_type=getattr(offer, "notification_type", None),
offer_type=offer_type,
effect_type=effect_type,
discount_percent=max(0, discount_percent),
bonus_amount_kopeks=int(getattr(offer, "bonus_amount_kopeks", 0) or 0),
bonus_amount_label=bonus_label,
expires_at=getattr(offer, "expires_at", None),
claimed_at=getattr(offer, "claimed_at", None),
is_active=bool(getattr(offer, "is_active", False)),
template_id=template_id,
template_name=getattr(template, "name", None),
button_text=button_text,
title=resolve_title(offer, template, offer_type),
message_text=message_text,
icon=_determine_offer_icon(offer_type, effect_type),
test_squads=test_squads,
)
)
active_offer_record = active_offer
if active_offer_record:
template_id = _extract_template_id(getattr(active_offer_record, "notification_type", None))
template = await get_template(template_id)
effect_type = _normalize_effect_type(getattr(active_offer_record, "effect_type", None))
offer_type = _extract_offer_type(active_offer_record, template)
show_active = False
discount_value = active_discount_percent
if discount_value > 0:
show_active = True
elif effect_type == "test_access":
show_active = True
if show_active:
test_squads = build_test_squads(active_offer_record)
server_name = test_squads[0].name if test_squads else None
message_text = _format_offer_message(template, active_offer_record, server_name=server_name)
bonus_label = _format_bonus_label(int(getattr(active_offer_record, "bonus_amount_kopeks", 0) or 0))
started_at = getattr(active_offer_record, "claimed_at", None)
expires_at = active_discount_expires_at
duration_seconds: Optional[int] = None
duration_hours = _extract_offer_duration_hours(active_offer_record, template, effect_type)
if expires_at is None and duration_hours and started_at:
expires_at = started_at + timedelta(hours=duration_hours)
if expires_at and started_at:
try:
duration_seconds = int((expires_at - started_at).total_seconds())
except Exception: # pragma: no cover - defensive
duration_seconds = None
if discount_value <= 0:
try:
discount_value = int(getattr(active_offer_record, "discount_percent", 0) or 0)
except (TypeError, ValueError):
discount_value = 0
extra = _extract_offer_extra(active_offer_record)
button_text = None
if isinstance(extra.get("button_text"), str) and extra["button_text"].strip():
button_text = extra["button_text"].strip()
elif template and isinstance(template.button_text, str):
button_text = template.button_text
promo_offers.insert(
0,
MiniAppPromoOffer(
id=int(getattr(active_offer_record, "id", 0) or 0),
status="active",
notification_type=getattr(active_offer_record, "notification_type", None),
offer_type=offer_type,
effect_type=effect_type,
discount_percent=max(0, discount_value),
bonus_amount_kopeks=int(getattr(active_offer_record, "bonus_amount_kopeks", 0) or 0),
bonus_amount_label=bonus_label,
expires_at=getattr(active_offer_record, "expires_at", None),
claimed_at=started_at,
is_active=False,
template_id=template_id,
template_name=getattr(template, "name", None),
button_text=button_text,
title=resolve_title(active_offer_record, template, offer_type),
message_text=message_text,
icon=_determine_offer_icon(offer_type, effect_type),
test_squads=test_squads,
active_discount_expires_at=expires_at,
active_discount_started_at=started_at,
active_discount_duration_seconds=duration_seconds,
),
)
return promo_offers
def _bytes_to_gb(bytes_value: Optional[int]) -> float:
if not bytes_value:
return 0.0
@@ -727,38 +361,6 @@ async def get_subscription_details(
)
)
active_discount_percent = 0
try:
active_discount_percent = int(getattr(user, "promo_offer_discount_percent", 0) or 0)
except (TypeError, ValueError):
active_discount_percent = 0
active_discount_expires_at = getattr(user, "promo_offer_discount_expires_at", None)
now = datetime.utcnow()
if active_discount_expires_at and active_discount_expires_at <= now:
active_discount_expires_at = None
active_discount_percent = 0
available_promo_offers = await list_active_discount_offers_for_user(db, user.id)
promo_offer_source = getattr(user, "promo_offer_discount_source", None)
active_claimed_offer = None
if promo_offer_source or active_discount_percent > 0:
active_claimed_offer = await get_latest_claimed_offer_for_user(
db,
user.id,
promo_offer_source,
)
promo_offers = await _build_promo_offer_models(
db,
available_promo_offers,
active_claimed_offer,
user=user,
active_discount_percent=active_discount_percent,
active_discount_expires_at=active_discount_expires_at,
)
response_user = MiniAppSubscriptionUser(
telegram_id=user.telegram_id,
username=user.username,
@@ -785,9 +387,6 @@ async def get_subscription_details(
traffic_limit_label=_format_limit_label(traffic_limit),
lifetime_used_traffic_gb=lifetime_used,
has_active_subscription=status_actual in {"active", "trial"},
promo_offer_discount_percent=active_discount_percent,
promo_offer_discount_expires_at=active_discount_expires_at,
promo_offer_discount_source=promo_offer_source,
)
return MiniAppSubscriptionResponse(
@@ -811,7 +410,6 @@ async def get_subscription_details(
balance_rubles=round(user.balance_rubles, 2),
balance_currency=balance_currency,
transactions=[_serialize_transaction(tx) for tx in transactions],
promo_offers=promo_offers,
promo_group=(
MiniAppPromoGroup(
id=promo_group.id,
@@ -830,151 +428,6 @@ async def get_subscription_details(
branding=settings.get_miniapp_branding(),
)
@router.post(
"/promo-offers/{offer_id}/claim",
response_model=MiniAppPromoOfferClaimResponse,
)
async def claim_promo_offer(
offer_id: int,
payload: MiniAppPromoOfferClaimRequest,
db: AsyncSession = Depends(get_db_session),
) -> MiniAppPromoOfferClaimResponse:
try:
webapp_data = parse_webapp_init_data(payload.init_data, settings.BOT_TOKEN)
except TelegramWebAppAuthError as error:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED,
detail={"code": "unauthorized", "message": str(error)},
) from error
telegram_user = webapp_data.get("user")
if not isinstance(telegram_user, dict) or "id" not in telegram_user:
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
detail={"code": "invalid_user", "message": "Invalid Telegram user payload"},
)
try:
telegram_id = int(telegram_user["id"])
except (TypeError, ValueError):
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
detail={"code": "invalid_user", "message": "Invalid Telegram user identifier"},
) from None
user = await get_user_by_telegram_id(db, telegram_id)
if not user:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
detail={"code": "user_not_found", "message": "User not found"},
)
offer = await get_offer_by_id(db, offer_id)
if not offer or offer.user_id != user.id:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
detail={"code": "offer_not_found", "message": "Offer not found"},
)
now = datetime.utcnow()
if offer.claimed_at is not None:
raise HTTPException(
status.HTTP_409_CONFLICT,
detail={"code": "already_claimed", "message": "Offer already claimed"},
)
if not offer.is_active or offer.expires_at <= now:
offer.is_active = False
await db.commit()
raise HTTPException(
status.HTTP_410_GONE,
detail={"code": "offer_expired", "message": "Offer expired"},
)
effect_type = _normalize_effect_type(getattr(offer, "effect_type", None))
if effect_type == "test_access":
success, newly_added, expires_at, error_code = await promo_offer_service.grant_test_access(
db,
user,
offer,
)
if not success:
code = error_code or "claim_failed"
message_map = {
"subscription_missing": "Active subscription required",
"squads_missing": "No squads configured for test access",
"already_connected": "Servers already connected",
"remnawave_sync_failed": "Failed to apply servers",
}
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
detail={"code": code, "message": message_map.get(code, "Unable to activate offer")},
)
await mark_offer_claimed(
db,
offer,
details={
"context": "test_access_claim",
"new_squads": newly_added,
"expires_at": expires_at.isoformat() if expires_at else None,
},
)
return MiniAppPromoOfferClaimResponse(success=True, code="test_access_claimed")
discount_percent = int(getattr(offer, "discount_percent", 0) or 0)
if discount_percent <= 0:
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
detail={"code": "invalid_discount", "message": "Offer does not contain discount"},
)
user.promo_offer_discount_percent = discount_percent
user.promo_offer_discount_source = offer.notification_type
user.updated_at = now
extra_data = _extract_offer_extra(offer)
raw_duration = extra_data.get("active_discount_hours")
template_id = extra_data.get("template_id")
if raw_duration in (None, "") and template_id:
try:
template = await get_promo_offer_template_by_id(db, int(template_id))
except (TypeError, ValueError):
template = None
if template and template.active_discount_hours:
raw_duration = template.active_discount_hours
else:
template = None
try:
duration_hours = int(raw_duration) if raw_duration is not None else None
except (TypeError, ValueError):
duration_hours = None
if duration_hours and duration_hours > 0:
discount_expires_at = now + timedelta(hours=duration_hours)
else:
discount_expires_at = None
user.promo_offer_discount_expires_at = discount_expires_at
await mark_offer_claimed(
db,
offer,
details={
"context": "discount_claim",
"discount_percent": discount_percent,
"discount_expires_at": discount_expires_at.isoformat() if discount_expires_at else None,
},
)
await db.refresh(user)
return MiniAppPromoOfferClaimResponse(success=True, code="discount_claimed")
def _safe_int(value: Any) -> int:
try:
return int(value)

View File

@@ -34,9 +34,6 @@ class MiniAppSubscriptionUser(BaseModel):
traffic_limit_label: str
lifetime_used_traffic_gb: float = 0.0
has_active_subscription: bool = False
promo_offer_discount_percent: int = 0
promo_offer_discount_expires_at: Optional[datetime] = None
promo_offer_discount_source: Optional[str] = None
class MiniAppPromoGroup(BaseModel):
@@ -90,39 +87,6 @@ class MiniAppTransaction(BaseModel):
completed_at: Optional[datetime] = None
class MiniAppPromoOffer(BaseModel):
id: int
status: str
notification_type: Optional[str] = None
offer_type: Optional[str] = None
effect_type: Optional[str] = None
discount_percent: int = 0
bonus_amount_kopeks: int = 0
bonus_amount_label: Optional[str] = None
expires_at: Optional[datetime] = None
claimed_at: Optional[datetime] = None
is_active: bool = False
template_id: Optional[int] = None
template_name: Optional[str] = None
button_text: Optional[str] = None
title: Optional[str] = None
message_text: Optional[str] = None
icon: Optional[str] = None
test_squads: List[MiniAppConnectedServer] = Field(default_factory=list)
active_discount_expires_at: Optional[datetime] = None
active_discount_started_at: Optional[datetime] = None
active_discount_duration_seconds: Optional[int] = None
class MiniAppPromoOfferClaimRequest(BaseModel):
init_data: str = Field(..., alias="initData")
class MiniAppPromoOfferClaimResponse(BaseModel):
success: bool = True
code: Optional[str] = None
class MiniAppSubscriptionResponse(BaseModel):
success: bool = True
subscription_id: int
@@ -145,7 +109,6 @@ class MiniAppSubscriptionResponse(BaseModel):
balance_rubles: float = 0.0
balance_currency: Optional[str] = None
transactions: List[MiniAppTransaction] = Field(default_factory=list)
promo_offers: List[MiniAppPromoOffer] = Field(default_factory=list)
promo_group: Optional[MiniAppPromoGroup] = None
auto_assign_promo_groups: List[MiniAppAutoPromoGroupLevel] = Field(default_factory=list)
total_spent_kopeks: int = 0

File diff suppressed because it is too large Load Diff