diff --git a/app/database/crud/subscription_event.py b/app/database/crud/subscription_event.py new file mode 100644 index 00000000..078c1753 --- /dev/null +++ b/app/database/crud/subscription_event.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any, Dict, Iterable, Optional, Tuple + +from sqlalchemy import and_, func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database.models import SubscriptionEvent + + +async def create_subscription_event( + db: AsyncSession, + *, + user_id: int, + event_type: str, + subscription_id: Optional[int] = None, + transaction_id: Optional[int] = None, + amount_kopeks: Optional[int] = None, + currency: Optional[str] = None, + message: Optional[str] = None, + occurred_at: Optional[datetime] = None, + extra: Optional[Dict[str, Any]] = None, +) -> SubscriptionEvent: + event = SubscriptionEvent( + user_id=user_id, + event_type=event_type, + subscription_id=subscription_id, + transaction_id=transaction_id, + amount_kopeks=amount_kopeks, + currency=currency, + message=message, + occurred_at=occurred_at or datetime.utcnow(), + extra=extra or None, + ) + db.add(event) + await db.commit() + await db.refresh(event) + return event + + +async def list_subscription_events( + db: AsyncSession, + *, + limit: int, + offset: int, + event_types: Optional[Iterable[str]] = None, + user_id: Optional[int] = None, +) -> Tuple[list[SubscriptionEvent], int]: + base_query = select(SubscriptionEvent) + filters = [] + + if event_types: + filters.append(SubscriptionEvent.event_type.in_(set(event_types))) + if user_id: + filters.append(SubscriptionEvent.user_id == user_id) + + if filters: + base_query = base_query.where(and_(*filters)) + + total_query = base_query.with_only_columns(func.count()).order_by(None) + total = await db.scalar(total_query) or 0 + + result = await db.execute( + base_query.order_by(SubscriptionEvent.occurred_at.desc()) + .offset(offset) + .limit(limit) + ) + + return result.scalars().all(), int(total) diff --git a/app/database/models.py b/app/database/models.py index 09695965..bdf8e544 100644 --- a/app/database/models.py +++ b/app/database/models.py @@ -1078,6 +1078,30 @@ class SentNotification(Base): subscription = relationship("Subscription", backref="sent_notifications") +class SubscriptionEvent(Base): + __tablename__ = "subscription_events" + + id = Column(Integer, primary_key=True, index=True) + event_type = Column(String(50), nullable=False) + user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False) + subscription_id = Column( + Integer, ForeignKey("subscriptions.id", ondelete="SET NULL"), nullable=True + ) + transaction_id = Column( + Integer, ForeignKey("transactions.id", ondelete="SET NULL"), nullable=True + ) + amount_kopeks = Column(Integer, nullable=True) + currency = Column(String(16), nullable=True) + message = Column(Text, nullable=True) + occurred_at = Column(DateTime, nullable=False, default=func.now()) + extra = Column(JSON, nullable=True) + created_at = Column(DateTime, default=func.now()) + + user = relationship("User", backref="subscription_events") + subscription = relationship("Subscription", backref="subscription_events") + transaction = relationship("Transaction", backref="subscription_events") + + class DiscountOffer(Base): __tablename__ = "discount_offers" __table_args__ = ( diff --git a/app/database/universal_migration.py b/app/database/universal_migration.py index 50f05d5b..e0caeb11 100644 --- a/app/database/universal_migration.py +++ b/app/database/universal_migration.py @@ -2867,6 +2867,94 @@ async def create_subscription_conversions_table(): logger.error(f"Ошибка создания таблицы subscription_conversions: {e}") return False + +async def create_subscription_events_table(): + table_exists = await check_table_exists("subscription_events") + if table_exists: + logger.info("Таблица subscription_events уже существует") + return True + + try: + async with engine.begin() as conn: + db_type = await get_database_type() + + if db_type == "sqlite": + create_sql = """ + CREATE TABLE subscription_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event_type VARCHAR(50) NOT NULL, + user_id INTEGER NOT NULL, + subscription_id INTEGER NULL, + transaction_id INTEGER NULL, + amount_kopeks INTEGER NULL, + currency VARCHAR(16) NULL, + message TEXT NULL, + occurred_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + extra JSON NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, + FOREIGN KEY (subscription_id) REFERENCES subscriptions(id) ON DELETE SET NULL, + FOREIGN KEY (transaction_id) REFERENCES transactions(id) ON DELETE SET NULL + ); + + CREATE INDEX ix_subscription_events_event_type ON subscription_events(event_type); + CREATE INDEX ix_subscription_events_user_id ON subscription_events(user_id); + """ + + elif db_type == "postgresql": + create_sql = """ + CREATE TABLE subscription_events ( + id SERIAL PRIMARY KEY, + event_type VARCHAR(50) NOT NULL, + user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, + subscription_id INTEGER NULL REFERENCES subscriptions(id) ON DELETE SET NULL, + transaction_id INTEGER NULL REFERENCES transactions(id) ON DELETE SET NULL, + amount_kopeks INTEGER NULL, + currency VARCHAR(16) NULL, + message TEXT NULL, + occurred_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + extra JSON NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + + CREATE INDEX ix_subscription_events_event_type ON subscription_events(event_type); + CREATE INDEX ix_subscription_events_user_id ON subscription_events(user_id); + """ + + elif db_type == "mysql": + create_sql = """ + CREATE TABLE subscription_events ( + id INT AUTO_INCREMENT PRIMARY KEY, + event_type VARCHAR(50) NOT NULL, + user_id INT NOT NULL, + subscription_id INT NULL, + transaction_id INT NULL, + amount_kopeks INT NULL, + currency VARCHAR(16) NULL, + message TEXT NULL, + occurred_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + extra JSON NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, + FOREIGN KEY (subscription_id) REFERENCES subscriptions(id) ON DELETE SET NULL, + FOREIGN KEY (transaction_id) REFERENCES transactions(id) ON DELETE SET NULL + ); + + CREATE INDEX ix_subscription_events_event_type ON subscription_events(event_type); + CREATE INDEX ix_subscription_events_user_id ON subscription_events(user_id); + """ + else: + logger.error(f"Неподдерживаемый тип БД для создания таблицы subscription_events: {db_type}") + return False + + await conn.execute(text(create_sql)) + logger.info("✅ Таблица subscription_events успешно создана") + return True + + except Exception as e: + logger.error(f"Ошибка создания таблицы subscription_events: {e}") + return False + async def fix_subscription_duplicates_universal(): async with engine.begin() as conn: db_type = await get_database_type() @@ -4052,7 +4140,14 @@ async def run_universal_migration(): logger.info("✅ Таблица subscription_conversions готова") else: logger.warning("⚠️ Проблемы с таблицей subscription_conversions") - + + logger.info("=== СОЗДАНИЕ ТАБЛИЦЫ SUBSCRIPTION_EVENTS ===") + events_created = await create_subscription_events_table() + if events_created: + logger.info("✅ Таблица subscription_events готова") + else: + logger.warning("⚠️ Проблемы с таблицей subscription_events") + async with engine.begin() as conn: total_subs = await conn.execute(text("SELECT COUNT(*) FROM subscriptions")) unique_users = await conn.execute(text("SELECT COUNT(DISTINCT user_id) FROM subscriptions")) @@ -4089,6 +4184,7 @@ async def run_universal_migration(): logger.info("✅ CryptoBot таблица готова") logger.info("✅ Heleket таблица готова") logger.info("✅ Таблица конверсий подписок создана") + logger.info("✅ Таблица событий подписок создана") logger.info("✅ Таблица welcome_texts с полем is_enabled готова") logger.info("✅ Медиа поля в broadcast_history добавлены") logger.info("✅ Дубликаты подписок исправлены") @@ -4112,6 +4208,7 @@ async def check_migration_status(): "broadcast_history_media_fields": False, "subscription_duplicates": False, "subscription_conversions_table": False, + "subscription_events_table": False, "promo_groups_table": False, "server_promo_groups_table": False, "server_squads_trial_column": False, @@ -4145,6 +4242,7 @@ async def check_migration_status(): status["privacy_policies_table"] = await check_table_exists('privacy_policies') status["public_offers_table"] = await check_table_exists('public_offers') status["subscription_conversions_table"] = await check_table_exists('subscription_conversions') + status["subscription_events_table"] = await check_table_exists('subscription_events') status["promo_groups_table"] = await check_table_exists('promo_groups') status["server_promo_groups_table"] = await check_table_exists('server_squad_promo_groups') status["server_squads_trial_column"] = await check_column_exists('server_squads', 'is_trial_eligible') @@ -4200,6 +4298,7 @@ async def check_migration_status(): "welcome_texts_is_enabled_column": "Поле is_enabled в welcome_texts", "broadcast_history_media_fields": "Медиа поля в broadcast_history", "subscription_conversions_table": "Таблица конверсий подписок", + "subscription_events_table": "Таблица событий подписок", "subscription_duplicates": "Отсутствие дубликатов подписок", "promo_groups_table": "Таблица промо-групп", "server_promo_groups_table": "Связи серверов и промогрупп", diff --git a/app/webapi/app.py b/app/webapi/app.py index f922bcd7..b881bf03 100644 --- a/app/webapi/app.py +++ b/app/webapi/app.py @@ -22,6 +22,7 @@ from .routes import ( pages, remnawave, servers, + subscription_events, stats, subscriptions, tickets, @@ -113,6 +114,13 @@ OPENAPI_TAGS = [ "name": "pages", "description": "Управление контентом публичных страниц: оферта, политика, FAQ и правила.", }, + { + "name": "notifications", + "description": ( + "Получение и просмотр уведомлений о покупках, активациях и продлениях подписок " + "для административной панели." + ), + }, ] @@ -167,5 +175,10 @@ def create_web_api_app() -> FastAPI: app.include_router(miniapp.router, prefix="/miniapp", tags=["miniapp"]) app.include_router(polls.router, prefix="/polls", tags=["polls"]) app.include_router(logs.router, prefix="/logs", tags=["logs"]) + app.include_router( + subscription_events.router, + prefix="/notifications/subscriptions", + tags=["notifications"], + ) return app diff --git a/app/webapi/routes/__init__.py b/app/webapi/routes/__init__.py index a648a92c..b11bf586 100644 --- a/app/webapi/routes/__init__.py +++ b/app/webapi/routes/__init__.py @@ -10,6 +10,7 @@ from . import ( promo_groups, servers, remnawave, + subscription_events, stats, subscriptions, tickets, @@ -31,6 +32,7 @@ __all__ = [ "promo_groups", "servers", "remnawave", + "subscription_events", "stats", "subscriptions", "tickets", diff --git a/app/webapi/routes/subscription_events.py b/app/webapi/routes/subscription_events.py new file mode 100644 index 00000000..002512d1 --- /dev/null +++ b/app/webapi/routes/subscription_events.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +from typing import Any, Iterable, Optional + +from fastapi import APIRouter, Depends, HTTPException, Query, Security, status +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database.models import Subscription, SubscriptionEvent, Transaction, User +from app.database.crud.subscription_event import ( + create_subscription_event, + list_subscription_events, +) +from ..dependencies import get_db_session, require_api_token +from ..schemas.subscription_events import ( + SubscriptionEventCreate, + SubscriptionEventListResponse, + SubscriptionEventResponse, +) + +router = APIRouter() + + +async def _get_user_or_error(db: AsyncSession, user_id: int) -> User: + user = await db.get(User, user_id) + if not user: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found") + return user + + +async def _ensure_subscription_exists( + db: AsyncSession, subscription_id: Optional[int] +) -> None: + if not subscription_id: + return + + subscription = await db.get(Subscription, subscription_id) + if not subscription: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Subscription not found", + ) + + +async def _ensure_transaction_exists(db: AsyncSession, transaction_id: Optional[int]) -> None: + if not transaction_id: + return + + transaction_exists = await db.scalar( + select(Transaction.id).where(Transaction.id == transaction_id) + ) + if not transaction_exists: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Transaction not found", + ) + + +def _serialize_event(event: SubscriptionEvent) -> SubscriptionEventResponse: + return SubscriptionEventResponse( + id=event.id, + event_type=event.event_type, + user_id=event.user_id, + subscription_id=event.subscription_id, + transaction_id=event.transaction_id, + amount_kopeks=event.amount_kopeks, + currency=event.currency, + message=event.message, + occurred_at=event.occurred_at, + created_at=event.created_at, + extra=event.extra or {}, + ) + + +@router.post("", response_model=SubscriptionEventResponse, status_code=status.HTTP_201_CREATED) +async def receive_subscription_event( + payload: SubscriptionEventCreate, + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> SubscriptionEventResponse: + await _get_user_or_error(db, payload.user_id) + await _ensure_subscription_exists(db, payload.subscription_id) + await _ensure_transaction_exists(db, payload.transaction_id) + + event = await create_subscription_event( + db, + user_id=payload.user_id, + event_type=payload.event_type, + subscription_id=payload.subscription_id, + transaction_id=payload.transaction_id, + amount_kopeks=payload.amount_kopeks, + currency=payload.currency, + message=payload.message, + occurred_at=payload.occurred_at, + extra=payload.extra or None, + ) + + return _serialize_event(event) + + +@router.get("", response_model=SubscriptionEventListResponse) +async def list_subscription_event_logs( + _: Any = Security(require_api_token), + db: AsyncSession = Depends(get_db_session), + limit: int = Query(50, ge=1, le=200), + offset: int = Query(0, ge=0), + event_types: Optional[Iterable[str]] = Query(default=None, alias="event_type"), + user_id: Optional[int] = Query(default=None), +) -> SubscriptionEventListResponse: + events, total = await list_subscription_events( + db, + limit=limit, + offset=offset, + event_types=event_types, + user_id=user_id, + ) + + return SubscriptionEventListResponse( + items=[_serialize_event(event) for event in events], + total=total, + limit=limit, + offset=offset, + ) diff --git a/app/webapi/schemas/subscription_events.py b/app/webapi/schemas/subscription_events.py new file mode 100644 index 00000000..24e5b0b4 --- /dev/null +++ b/app/webapi/schemas/subscription_events.py @@ -0,0 +1,47 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any, Dict, Literal, Optional + +from pydantic import BaseModel, Field, field_validator + + +class SubscriptionEventCreate(BaseModel): + event_type: Literal["activation", "purchase", "renewal"] + user_id: int = Field(..., ge=1) + subscription_id: Optional[int] = Field(default=None, ge=1) + transaction_id: Optional[int] = Field(default=None, ge=1) + amount_kopeks: Optional[int] = Field(default=None, ge=0) + currency: Optional[str] = Field(default=None, min_length=1, max_length=16) + message: Optional[str] = Field(default=None, max_length=2000) + occurred_at: Optional[datetime] = None + extra: Dict[str, Any] = Field(default_factory=dict) + + @field_validator("message") + @classmethod + def _strip_message(cls, value: Optional[str]) -> Optional[str]: + if value is None: + return None + stripped = value.strip() + return stripped or None + + +class SubscriptionEventResponse(BaseModel): + id: int + event_type: str + user_id: int + subscription_id: Optional[int] = None + transaction_id: Optional[int] = None + amount_kopeks: Optional[int] = None + currency: Optional[str] = None + message: Optional[str] = None + occurred_at: datetime + created_at: datetime + extra: Dict[str, Any] = Field(default_factory=dict) + + +class SubscriptionEventListResponse(BaseModel): + items: list[SubscriptionEventResponse] + total: int + limit: int + offset: int diff --git a/migrations/alembic/versions/c2f9c3b5f5c4_add_subscription_events_table.py b/migrations/alembic/versions/c2f9c3b5f5c4_add_subscription_events_table.py new file mode 100644 index 00000000..0d460035 --- /dev/null +++ b/migrations/alembic/versions/c2f9c3b5f5c4_add_subscription_events_table.py @@ -0,0 +1,77 @@ +"""Add subscription_events table""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.engine.reflection import Inspector + + +revision: str = "c2f9c3b5f5c4" +down_revision: Union[str, None] = "9f0f2d5a1c7b" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +TABLE_NAME = "subscription_events" + + +def _table_exists(inspector: Inspector) -> bool: + return TABLE_NAME in inspector.get_table_names() + + +def upgrade() -> None: + bind = op.get_bind() + inspector = sa.inspect(bind) + + if _table_exists(inspector): + return + + op.create_table( + TABLE_NAME, + sa.Column("id", sa.Integer(), primary_key=True), + sa.Column("event_type", sa.String(length=50), nullable=False), + sa.Column( + "user_id", + sa.Integer(), + sa.ForeignKey("users.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column( + "subscription_id", + sa.Integer(), + sa.ForeignKey("subscriptions.id", ondelete="SET NULL"), + nullable=True, + ), + sa.Column( + "transaction_id", + sa.Integer(), + sa.ForeignKey("transactions.id", ondelete="SET NULL"), + nullable=True, + ), + sa.Column("amount_kopeks", sa.Integer(), nullable=True), + sa.Column("currency", sa.String(length=16), nullable=True), + sa.Column("message", sa.Text(), nullable=True), + sa.Column( + "occurred_at", sa.DateTime(), nullable=False, server_default=sa.func.now() + ), + sa.Column("extra", sa.JSON(), nullable=True), + sa.Column("created_at", sa.DateTime(), nullable=False, server_default=sa.func.now()), + ) + + op.create_index( + "ix_subscription_events_event_type", TABLE_NAME, ["event_type"] + ) + op.create_index("ix_subscription_events_user_id", TABLE_NAME, ["user_id"]) + + +def downgrade() -> None: + bind = op.get_bind() + inspector = sa.inspect(bind) + + if not _table_exists(inspector): + return + + op.drop_index("ix_subscription_events_user_id", table_name=TABLE_NAME) + op.drop_index("ix_subscription_events_event_type", table_name=TABLE_NAME) + op.drop_table(TABLE_NAME)