Merge pull request #1988 from BEDOLAGA-DEV/bedolaga-dtalt2

Add subscription events table to universal migration
This commit is contained in:
Egor
2025-11-23 04:37:53 +03:00
committed by GitHub
8 changed files with 456 additions and 1 deletions

View File

@@ -0,0 +1,70 @@
from __future__ import annotations
from datetime import datetime
from typing import Any, Dict, Iterable, Optional, Tuple
from sqlalchemy import and_, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.models import SubscriptionEvent
async def create_subscription_event(
db: AsyncSession,
*,
user_id: int,
event_type: str,
subscription_id: Optional[int] = None,
transaction_id: Optional[int] = None,
amount_kopeks: Optional[int] = None,
currency: Optional[str] = None,
message: Optional[str] = None,
occurred_at: Optional[datetime] = None,
extra: Optional[Dict[str, Any]] = None,
) -> SubscriptionEvent:
event = SubscriptionEvent(
user_id=user_id,
event_type=event_type,
subscription_id=subscription_id,
transaction_id=transaction_id,
amount_kopeks=amount_kopeks,
currency=currency,
message=message,
occurred_at=occurred_at or datetime.utcnow(),
extra=extra or None,
)
db.add(event)
await db.commit()
await db.refresh(event)
return event
async def list_subscription_events(
db: AsyncSession,
*,
limit: int,
offset: int,
event_types: Optional[Iterable[str]] = None,
user_id: Optional[int] = None,
) -> Tuple[list[SubscriptionEvent], int]:
base_query = select(SubscriptionEvent)
filters = []
if event_types:
filters.append(SubscriptionEvent.event_type.in_(set(event_types)))
if user_id:
filters.append(SubscriptionEvent.user_id == user_id)
if filters:
base_query = base_query.where(and_(*filters))
total_query = base_query.with_only_columns(func.count()).order_by(None)
total = await db.scalar(total_query) or 0
result = await db.execute(
base_query.order_by(SubscriptionEvent.occurred_at.desc())
.offset(offset)
.limit(limit)
)
return result.scalars().all(), int(total)

View File

@@ -1078,6 +1078,30 @@ class SentNotification(Base):
subscription = relationship("Subscription", backref="sent_notifications")
class SubscriptionEvent(Base):
__tablename__ = "subscription_events"
id = Column(Integer, primary_key=True, index=True)
event_type = Column(String(50), nullable=False)
user_id = Column(Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False)
subscription_id = Column(
Integer, ForeignKey("subscriptions.id", ondelete="SET NULL"), nullable=True
)
transaction_id = Column(
Integer, ForeignKey("transactions.id", ondelete="SET NULL"), nullable=True
)
amount_kopeks = Column(Integer, nullable=True)
currency = Column(String(16), nullable=True)
message = Column(Text, nullable=True)
occurred_at = Column(DateTime, nullable=False, default=func.now())
extra = Column(JSON, nullable=True)
created_at = Column(DateTime, default=func.now())
user = relationship("User", backref="subscription_events")
subscription = relationship("Subscription", backref="subscription_events")
transaction = relationship("Transaction", backref="subscription_events")
class DiscountOffer(Base):
__tablename__ = "discount_offers"
__table_args__ = (

View File

@@ -2867,6 +2867,94 @@ async def create_subscription_conversions_table():
logger.error(f"Ошибка создания таблицы subscription_conversions: {e}")
return False
async def create_subscription_events_table():
table_exists = await check_table_exists("subscription_events")
if table_exists:
logger.info("Таблица subscription_events уже существует")
return True
try:
async with engine.begin() as conn:
db_type = await get_database_type()
if db_type == "sqlite":
create_sql = """
CREATE TABLE subscription_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_type VARCHAR(50) NOT NULL,
user_id INTEGER NOT NULL,
subscription_id INTEGER NULL,
transaction_id INTEGER NULL,
amount_kopeks INTEGER NULL,
currency VARCHAR(16) NULL,
message TEXT NULL,
occurred_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
extra JSON NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (subscription_id) REFERENCES subscriptions(id) ON DELETE SET NULL,
FOREIGN KEY (transaction_id) REFERENCES transactions(id) ON DELETE SET NULL
);
CREATE INDEX ix_subscription_events_event_type ON subscription_events(event_type);
CREATE INDEX ix_subscription_events_user_id ON subscription_events(user_id);
"""
elif db_type == "postgresql":
create_sql = """
CREATE TABLE subscription_events (
id SERIAL PRIMARY KEY,
event_type VARCHAR(50) NOT NULL,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
subscription_id INTEGER NULL REFERENCES subscriptions(id) ON DELETE SET NULL,
transaction_id INTEGER NULL REFERENCES transactions(id) ON DELETE SET NULL,
amount_kopeks INTEGER NULL,
currency VARCHAR(16) NULL,
message TEXT NULL,
occurred_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
extra JSON NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX ix_subscription_events_event_type ON subscription_events(event_type);
CREATE INDEX ix_subscription_events_user_id ON subscription_events(user_id);
"""
elif db_type == "mysql":
create_sql = """
CREATE TABLE subscription_events (
id INT AUTO_INCREMENT PRIMARY KEY,
event_type VARCHAR(50) NOT NULL,
user_id INT NOT NULL,
subscription_id INT NULL,
transaction_id INT NULL,
amount_kopeks INT NULL,
currency VARCHAR(16) NULL,
message TEXT NULL,
occurred_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
extra JSON NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (subscription_id) REFERENCES subscriptions(id) ON DELETE SET NULL,
FOREIGN KEY (transaction_id) REFERENCES transactions(id) ON DELETE SET NULL
);
CREATE INDEX ix_subscription_events_event_type ON subscription_events(event_type);
CREATE INDEX ix_subscription_events_user_id ON subscription_events(user_id);
"""
else:
logger.error(f"Неподдерживаемый тип БД для создания таблицы subscription_events: {db_type}")
return False
await conn.execute(text(create_sql))
logger.info("✅ Таблица subscription_events успешно создана")
return True
except Exception as e:
logger.error(f"Ошибка создания таблицы subscription_events: {e}")
return False
async def fix_subscription_duplicates_universal():
async with engine.begin() as conn:
db_type = await get_database_type()
@@ -4052,7 +4140,14 @@ async def run_universal_migration():
logger.info("✅ Таблица subscription_conversions готова")
else:
logger.warning("⚠️ Проблемы с таблицей subscription_conversions")
logger.info("=== СОЗДАНИЕ ТАБЛИЦЫ SUBSCRIPTION_EVENTS ===")
events_created = await create_subscription_events_table()
if events_created:
logger.info("✅ Таблица subscription_events готова")
else:
logger.warning("⚠️ Проблемы с таблицей subscription_events")
async with engine.begin() as conn:
total_subs = await conn.execute(text("SELECT COUNT(*) FROM subscriptions"))
unique_users = await conn.execute(text("SELECT COUNT(DISTINCT user_id) FROM subscriptions"))
@@ -4089,6 +4184,7 @@ async def run_universal_migration():
logger.info("✅ CryptoBot таблица готова")
logger.info("✅ Heleket таблица готова")
logger.info("✅ Таблица конверсий подписок создана")
logger.info("✅ Таблица событий подписок создана")
logger.info("✅ Таблица welcome_texts с полем is_enabled готова")
logger.info("✅ Медиа поля в broadcast_history добавлены")
logger.info("✅ Дубликаты подписок исправлены")
@@ -4112,6 +4208,7 @@ async def check_migration_status():
"broadcast_history_media_fields": False,
"subscription_duplicates": False,
"subscription_conversions_table": False,
"subscription_events_table": False,
"promo_groups_table": False,
"server_promo_groups_table": False,
"server_squads_trial_column": False,
@@ -4145,6 +4242,7 @@ async def check_migration_status():
status["privacy_policies_table"] = await check_table_exists('privacy_policies')
status["public_offers_table"] = await check_table_exists('public_offers')
status["subscription_conversions_table"] = await check_table_exists('subscription_conversions')
status["subscription_events_table"] = await check_table_exists('subscription_events')
status["promo_groups_table"] = await check_table_exists('promo_groups')
status["server_promo_groups_table"] = await check_table_exists('server_squad_promo_groups')
status["server_squads_trial_column"] = await check_column_exists('server_squads', 'is_trial_eligible')
@@ -4200,6 +4298,7 @@ async def check_migration_status():
"welcome_texts_is_enabled_column": "Поле is_enabled в welcome_texts",
"broadcast_history_media_fields": "Медиа поля в broadcast_history",
"subscription_conversions_table": "Таблица конверсий подписок",
"subscription_events_table": "Таблица событий подписок",
"subscription_duplicates": "Отсутствие дубликатов подписок",
"promo_groups_table": "Таблица промо-групп",
"server_promo_groups_table": "Связи серверов и промогрупп",

View File

@@ -22,6 +22,7 @@ from .routes import (
pages,
remnawave,
servers,
subscription_events,
stats,
subscriptions,
tickets,
@@ -113,6 +114,13 @@ OPENAPI_TAGS = [
"name": "pages",
"description": "Управление контентом публичных страниц: оферта, политика, FAQ и правила.",
},
{
"name": "notifications",
"description": (
"Получение и просмотр уведомлений о покупках, активациях и продлениях подписок "
"для административной панели."
),
},
]
@@ -167,5 +175,10 @@ def create_web_api_app() -> FastAPI:
app.include_router(miniapp.router, prefix="/miniapp", tags=["miniapp"])
app.include_router(polls.router, prefix="/polls", tags=["polls"])
app.include_router(logs.router, prefix="/logs", tags=["logs"])
app.include_router(
subscription_events.router,
prefix="/notifications/subscriptions",
tags=["notifications"],
)
return app

View File

@@ -10,6 +10,7 @@ from . import (
promo_groups,
servers,
remnawave,
subscription_events,
stats,
subscriptions,
tickets,
@@ -31,6 +32,7 @@ __all__ = [
"promo_groups",
"servers",
"remnawave",
"subscription_events",
"stats",
"subscriptions",
"tickets",

View File

@@ -0,0 +1,123 @@
from __future__ import annotations
from typing import Any, Iterable, Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Security, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.models import Subscription, SubscriptionEvent, Transaction, User
from app.database.crud.subscription_event import (
create_subscription_event,
list_subscription_events,
)
from ..dependencies import get_db_session, require_api_token
from ..schemas.subscription_events import (
SubscriptionEventCreate,
SubscriptionEventListResponse,
SubscriptionEventResponse,
)
router = APIRouter()
async def _get_user_or_error(db: AsyncSession, user_id: int) -> User:
user = await db.get(User, user_id)
if not user:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
return user
async def _ensure_subscription_exists(
db: AsyncSession, subscription_id: Optional[int]
) -> None:
if not subscription_id:
return
subscription = await db.get(Subscription, subscription_id)
if not subscription:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Subscription not found",
)
async def _ensure_transaction_exists(db: AsyncSession, transaction_id: Optional[int]) -> None:
if not transaction_id:
return
transaction_exists = await db.scalar(
select(Transaction.id).where(Transaction.id == transaction_id)
)
if not transaction_exists:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Transaction not found",
)
def _serialize_event(event: SubscriptionEvent) -> SubscriptionEventResponse:
return SubscriptionEventResponse(
id=event.id,
event_type=event.event_type,
user_id=event.user_id,
subscription_id=event.subscription_id,
transaction_id=event.transaction_id,
amount_kopeks=event.amount_kopeks,
currency=event.currency,
message=event.message,
occurred_at=event.occurred_at,
created_at=event.created_at,
extra=event.extra or {},
)
@router.post("", response_model=SubscriptionEventResponse, status_code=status.HTTP_201_CREATED)
async def receive_subscription_event(
payload: SubscriptionEventCreate,
_: Any = Security(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> SubscriptionEventResponse:
await _get_user_or_error(db, payload.user_id)
await _ensure_subscription_exists(db, payload.subscription_id)
await _ensure_transaction_exists(db, payload.transaction_id)
event = await create_subscription_event(
db,
user_id=payload.user_id,
event_type=payload.event_type,
subscription_id=payload.subscription_id,
transaction_id=payload.transaction_id,
amount_kopeks=payload.amount_kopeks,
currency=payload.currency,
message=payload.message,
occurred_at=payload.occurred_at,
extra=payload.extra or None,
)
return _serialize_event(event)
@router.get("", response_model=SubscriptionEventListResponse)
async def list_subscription_event_logs(
_: Any = Security(require_api_token),
db: AsyncSession = Depends(get_db_session),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
event_types: Optional[Iterable[str]] = Query(default=None, alias="event_type"),
user_id: Optional[int] = Query(default=None),
) -> SubscriptionEventListResponse:
events, total = await list_subscription_events(
db,
limit=limit,
offset=offset,
event_types=event_types,
user_id=user_id,
)
return SubscriptionEventListResponse(
items=[_serialize_event(event) for event in events],
total=total,
limit=limit,
offset=offset,
)

View File

@@ -0,0 +1,47 @@
from __future__ import annotations
from datetime import datetime
from typing import Any, Dict, Literal, Optional
from pydantic import BaseModel, Field, field_validator
class SubscriptionEventCreate(BaseModel):
event_type: Literal["activation", "purchase", "renewal"]
user_id: int = Field(..., ge=1)
subscription_id: Optional[int] = Field(default=None, ge=1)
transaction_id: Optional[int] = Field(default=None, ge=1)
amount_kopeks: Optional[int] = Field(default=None, ge=0)
currency: Optional[str] = Field(default=None, min_length=1, max_length=16)
message: Optional[str] = Field(default=None, max_length=2000)
occurred_at: Optional[datetime] = None
extra: Dict[str, Any] = Field(default_factory=dict)
@field_validator("message")
@classmethod
def _strip_message(cls, value: Optional[str]) -> Optional[str]:
if value is None:
return None
stripped = value.strip()
return stripped or None
class SubscriptionEventResponse(BaseModel):
id: int
event_type: str
user_id: int
subscription_id: Optional[int] = None
transaction_id: Optional[int] = None
amount_kopeks: Optional[int] = None
currency: Optional[str] = None
message: Optional[str] = None
occurred_at: datetime
created_at: datetime
extra: Dict[str, Any] = Field(default_factory=dict)
class SubscriptionEventListResponse(BaseModel):
items: list[SubscriptionEventResponse]
total: int
limit: int
offset: int

View File

@@ -0,0 +1,77 @@
"""Add subscription_events table"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.engine.reflection import Inspector
revision: str = "c2f9c3b5f5c4"
down_revision: Union[str, None] = "9f0f2d5a1c7b"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
TABLE_NAME = "subscription_events"
def _table_exists(inspector: Inspector) -> bool:
return TABLE_NAME in inspector.get_table_names()
def upgrade() -> None:
bind = op.get_bind()
inspector = sa.inspect(bind)
if _table_exists(inspector):
return
op.create_table(
TABLE_NAME,
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("event_type", sa.String(length=50), nullable=False),
sa.Column(
"user_id",
sa.Integer(),
sa.ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"subscription_id",
sa.Integer(),
sa.ForeignKey("subscriptions.id", ondelete="SET NULL"),
nullable=True,
),
sa.Column(
"transaction_id",
sa.Integer(),
sa.ForeignKey("transactions.id", ondelete="SET NULL"),
nullable=True,
),
sa.Column("amount_kopeks", sa.Integer(), nullable=True),
sa.Column("currency", sa.String(length=16), nullable=True),
sa.Column("message", sa.Text(), nullable=True),
sa.Column(
"occurred_at", sa.DateTime(), nullable=False, server_default=sa.func.now()
),
sa.Column("extra", sa.JSON(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False, server_default=sa.func.now()),
)
op.create_index(
"ix_subscription_events_event_type", TABLE_NAME, ["event_type"]
)
op.create_index("ix_subscription_events_user_id", TABLE_NAME, ["user_id"])
def downgrade() -> None:
bind = op.get_bind()
inspector = sa.inspect(bind)
if not _table_exists(inspector):
return
op.drop_index("ix_subscription_events_user_id", table_name=TABLE_NAME)
op.drop_index("ix_subscription_events_event_type", table_name=TABLE_NAME)
op.drop_table(TABLE_NAME)