Files
Egor 830e64afe0 Dev (#2899)
* fix: устранить MissingGreenlet в автоплатежах и починить traceback в логах

- subtract_user_balance: пишем promo_offer_log в отдельной сессии вместо rollback после commit, который экспайрил объекты основной сессии и ломал последующие обращения к subscription/user attrs
- monitoring_service._process_autopayments: перезагружаем subscription с eager-load user/tariff после списания, оборачиваем каждую итерацию в try/except + rollback, чтобы одна ошибка не валила весь батч
- logging_config: новый processor _auto_capture_exc_info автоматически подтягивает traceback из sys.exc_info() или error-kwarg → полный traceback в файле, консоли и Telegram без exc_info=True на каждом вызове
- logging_handler: дублирующая логика захвата exc_info в TelegramNotifierProcessor как резерв

* fix: устранить root cause MissingGreenlet в автоплатежах через refetch по id

Трейс показал: subscription.user падает на lazy-load → pool._checkout →
do_ping → await_ → MissingGreenlet. SQLAlchemy 2.0 async session не
поддерживает sync-lazy-load для relationships. Причина рассинхрона:
lock_user_for_pricing делает populate_existing=True + selectinload(
User.subscriptions).selectinload(Subscription.tariff), что разгружает
Subscription.user backref для сестринских подписок того же user.
Последующее обращение sub.user у другой подписки падает.

Фикс: захватываем (sub_id, user_id) пары ДО цикла, каждую итерацию
делаем fresh refetch через async select с eager load user+tariff+
promo_group. Никаких lazy access в горячем пути. В except используем
локально захваченные id вместо getattr(subscription, ...), чтобы
логирование не падало каскадом на expired объекте.

* fix: grant all available squads for unrestricted trials (#2897)

* feat: add WEBHOOK_IP to allow Telegram bypass DNS lookup for webhook (#2894)

* feat: add WEBHOOK_IP to allow Telegram bypass DNS lookup for webhook

* style: ruff format main.py

---------

Co-authored-by: Dmitry Lunin <br@slack.ru>

* fix: do not update first_name/last_name from OIDC claims (#2892)

Co-authored-by: Dmitry Lunin <br@slack.ru>

* fix: do not reset subscription_crypto_link when cryptoLink absent in webhook (#2891)

Co-authored-by: Dmitry Lunin <br@slack.ru>

* fix: FSM state loss on balance topup, PayPear confirmation_url, hidden trial tariff in renewal

- balance/platega: re-set FSM state after min/max validation errors,
  set state before pending_amount path, use balance_topup callback for back button
- balance/main: set FSM state and payment_method in handle_topup_amount_callback
  for all providers before routing, use balance_topup callback in validation errors
- payment/paypear: fix confirmation_url key (was 'url'), add fallback,
  store charged amount with commission for correct webhook amount comparison
- tariff_purchase: redirect to active tariff list when current tariff is
  inactive (hidden trial after promo code activation)
- cabinet/renewal: check tariff.is_active in both GET and POST endpoints
  to prevent hidden trial tariff periods from appearing

* fix: tariff switch pricing showing free for upgrades, admin duplicate subscription guard

- pricing_engine: use shortest period for daily rate comparison instead
  of period closest to remaining_days — fixes incorrect free/zero cost
  for upgrades when tariffs have different period sets
- pricing_engine: remove unused target_days parameter from
  get_tariff_daily_rate_fraction
- admin_users: add duplicate subscription check before create,
  change_tariff and activate actions to prevent UniqueViolationError
  on uq_subscriptions_user_tariff_active constraint
- admin_users: add IntegrityError fallback on create as TOCTOU safety net

* feat: tariff switch direction control, fix device pricing within tariff limit

Tariff switch direction:
- Add TARIFF_SWITCH_UPGRADE_ENABLED and TARIFF_SWITCH_DOWNGRADE_ENABLED
  settings to control allowed switch directions
- Guard all 10 entry points: instant switch (list, preview, confirm),
  legacy switch (list, select, confirm, daily confirm), cabinet (preview,
  execute), purchase-options API
- Filter tariff lists by allowed direction, show "unavailable" when
  both directions disabled
- Expose settings in cabinet purchase-options response for frontend

Device pricing fix:
- Devices within tariff.device_limit are now free when restoring
  (was charging for all devices regardless of tariff inclusion)
- Fix max(100, price) minimum enforcing 1 RUB even when
  chargeable_devices is 0
- Apply fix across all endpoints: bot handlers (confirm_change,
  execute_change, confirm_add), cabinet API (legacy purchase,
  modern purchase, get-price, save-cart), inline keyboard display

* fix: classic mode renewal resets device_limit to 1 via cart key mismatch

- Fix cart key mismatch: extend cart saved 'device_limit' but
  confirm_purchase read 'devices' key, falling back to DEFAULT=1.
  Now both keys are saved in both cart-save paths
- Fix confirm_purchase device resolution: use explicit is None checks
  instead of or-chain to avoid falsy-zero trap
- Fix return_to_saved_cart display: fall back to 'device_limit' and
  'traffic_limit_gb' keys when 'devices'/'traffic_gb' are absent
- Fix second cart-save path in _extend_existing_subscription with
  same dual-key pattern
- Fix RemnaWaveService import path in renewal service
- Add RESET_DEVICES_ON_RENEWAL setting: resets all connected devices
  (hwid) via RemnaWave API on each subscription renewal

* fix: menu layout schema icon limit, traffic_topup_enabled condition, shadowing imports

- Increase icon max_length from 10 to 100 in all three schemas
  (MenuButtonConfig, ButtonUpdateRequest, AddCustomButtonRequest)
  to support Telegram Custom Emoji IDs
- Add traffic_topup_enabled condition to ButtonConditions schema
- Remove shadowing local imports of MenuLayoutService in
  routes/menu_layout.py (top-level import already provides access)

* feat(tickets): multi-media message gallery (media_items JSONB)

- Add media_items JSONB column to TicketMessage model for multi-media
  gallery support (photos/videos/documents in one bubble)
- Add TicketMediaItem schema with type validation and shared
  _validate_media_bundle helper (max 10 items, legacy field compat)
- Update admin and user ticket handlers to store media_items and
  back-fill legacy media_type/media_file_id/media_caption from first
  item for backward compatibility
- Update _message_to_response in both admin and user routes to include
  media_items in API responses
- Allow empty message text when media is attached (message field now
  defaults to empty string with model validator ensuring text or media)
- Add migration 0061 with idempotent column check

Based on PR #2869 by @smediainfo — CI/CD workflow changes excluded
(hardcoded version strings would regress dynamic manifest reading)

* fix: ticket media_items review fixes

- Add if has_media else None guards in user-side ticket handlers
  (create_ticket, add_message) matching admin handler pattern
- Fix Telegram notification using resolved primary_file_id/primary_type
  instead of raw request fields for gallery messages
- Narrow except Exception to (TypeError, KeyError, ValueError) in
  _message_to_response with warning log for debugging
- Add media_items parameter to TicketCRUD.create_ticket and
  TicketCRUD.add_message for CRUD layer parity
- Add TicketMediaItemResponse and media_items field to webapi
  TicketMessageResponse to prevent data loss on read

* feat: landing page analytics goals and sticky pay button

- Add sticky_pay_button, analytics_view_enabled, analytics_view_goal,
  analytics_click_enabled, analytics_click_goal columns to LandingPage
- Add fields to CRUD updatable fields, admin create/update/detail
  schemas, create_landing() kwargs, _landing_to_detail() response
- Expose sticky_pay_button and analytics fields in public landing
  config response for frontend Yandex Metrika integration
- Add migration 0062 with idempotent column checks

Based on PR #2852 by @smediainfo — CI/CD workflow changes excluded
(hardcoded version strings would regress dynamic manifest reading)

* fix: validate analytics goal is set when analytics is enabled on landing

Prevent enabling analytics_view/click without providing the
corresponding goal identifier, which would result in empty
Yandex Metrika calls on the frontend.

* feat: Yandex Metrika offline conversions + S2S postbacks

- Add YandexClientIdMap model for user → yandex_cid mapping with
  upsert-safe CRUD (ON CONFLICT DO UPDATE)
- Add yandex_cid, subid, referrer columns to GuestPurchase
- Add yandex_offline_conv_service: Measurement Protocol integration
  with mc.yandex.ru/collect (registration, trial, purchase events),
  background task management, CID parsing from /start params
- Add s2s_postback_service: server-to-server affiliate postbacks
  with URL template placeholders and URL-safe encoding
- Add analytics offline conversion info to branding API (masked secret)
- Add POST /analytics/yandex-cid endpoint for cabinet CID capture
- Add 11 config settings (YANDEX_OFFLINE_CONV_*, S2S_POSTBACK_*)
- Add migration 0063 (yandex_client_id_map table + guest_purchases cols)
- Fix: mask measurement secret aggressively (show only last 4 chars)
- Fix: always replace {user_id} placeholder in S2S postback URLs
- Fix: use structlog kwargs instead of f-strings with LOG_PREFIX

Based on PR #2851 by @smediainfo — CI/CD workflow changes excluded

---------

Co-authored-by: c0mrade <killmy666@gmail.com>
Co-authored-by: Danila Yudin <danyayudin2012@gmail.com>
Co-authored-by: Dmitry V. Lunin <49199230+BlackRaincoat@users.noreply.github.com>
Co-authored-by: Dmitry Lunin <br@slack.ru>
2026-04-22 06:08:26 +03:00

491 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import UTC, datetime
import structlog
from sqlalchemy import and_, desc, func, or_, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database.models import SupportAuditLog, Ticket, TicketMessage, TicketStatus
logger = structlog.get_logger(__name__)
class TicketCRUD:
"""CRUD операции для работы с тикетами"""
@staticmethod
async def create_ticket(
db: AsyncSession,
user_id: int,
title: str,
message_text: str,
priority: str = 'normal',
*,
media_type: str | None = None,
media_file_id: str | None = None,
media_caption: str | None = None,
media_items: list[dict] | None = None,
) -> Ticket:
"""Создать новый тикет с первым сообщением"""
ticket = Ticket(user_id=user_id, title=title, status=TicketStatus.OPEN.value, priority=priority)
db.add(ticket)
await db.flush() # Получаем ID тикета
# Создаем первое сообщение
message = TicketMessage(
ticket_id=ticket.id,
user_id=user_id,
message_text=message_text,
is_from_admin=False,
has_media=bool(media_type and media_file_id) or bool(media_items),
media_type=media_type,
media_file_id=media_file_id,
media_caption=media_caption,
media_items=media_items,
)
db.add(message)
await db.commit()
await db.refresh(ticket)
# Отправляем событие о создании тикета
try:
from app.services.event_emitter import event_emitter
await event_emitter.emit(
'ticket.created',
{
'ticket_id': ticket.id,
'user_id': user_id,
'title': title,
'status': ticket.status,
'priority': priority,
'has_media': bool(media_type and media_file_id),
},
db=db,
)
except Exception as error:
logger.warning('Failed to emit ticket.created event', error=error)
return ticket
@staticmethod
async def get_ticket_by_id(
db: AsyncSession, ticket_id: int, load_messages: bool = True, load_user: bool = False
) -> Ticket | None:
"""Получить тикет по ID"""
query = select(Ticket).where(Ticket.id == ticket_id)
if load_user:
query = query.options(selectinload(Ticket.user))
if load_messages:
query = query.options(selectinload(Ticket.messages))
result = await db.execute(query)
return result.scalar_one_or_none()
@staticmethod
async def get_user_tickets(
db: AsyncSession, user_id: int, status: str | None = None, limit: int = 20, offset: int = 0
) -> list[Ticket]:
"""Получить тикеты пользователя"""
query = select(Ticket).where(Ticket.user_id == user_id)
if status:
query = query.where(Ticket.status == status)
query = query.order_by(desc(Ticket.updated_at)).offset(offset).limit(limit)
result = await db.execute(query)
return result.scalars().all()
@staticmethod
async def count_user_tickets_by_statuses(db: AsyncSession, user_id: int, statuses: list[str]) -> int:
"""Подсчитать количество тикетов пользователя по списку статусов"""
query = select(func.count()).select_from(Ticket).where(Ticket.user_id == user_id)
if statuses:
query = query.where(Ticket.status.in_(statuses))
result = await db.execute(query)
return int(result.scalar() or 0)
@staticmethod
async def get_user_tickets_by_statuses(
db: AsyncSession, user_id: int, statuses: list[str], limit: int = 20, offset: int = 0
) -> list[Ticket]:
"""Получить тикеты пользователя по списку статусов с пагинацией"""
query = (
select(Ticket)
.where(Ticket.user_id == user_id)
.order_by(desc(Ticket.updated_at))
.offset(offset)
.limit(limit)
)
if statuses:
query = query.where(Ticket.status.in_(statuses))
result = await db.execute(query)
return result.scalars().all()
@staticmethod
async def user_has_active_ticket(db: AsyncSession, user_id: int) -> bool:
"""Проверить, есть ли у пользователя активный (не закрытый) тикет"""
query = (
select(Ticket.id)
.where(Ticket.user_id == user_id, Ticket.status.in_([TicketStatus.OPEN.value, TicketStatus.ANSWERED.value]))
.limit(1)
)
result = await db.execute(query)
return result.scalar_one_or_none() is not None
@staticmethod
async def is_user_globally_blocked(db: AsyncSession, user_id: int) -> datetime | None:
"""Проверить, заблокирован ли пользователь для создания/ответов по любому тикету.
Возвращает дату окончания блокировки, если активна, или None.
"""
query = (
select(Ticket)
.where(
Ticket.user_id == user_id,
or_(Ticket.user_reply_block_permanent == True, Ticket.user_reply_block_until.isnot(None)),
)
.order_by(desc(Ticket.updated_at))
.limit(10)
)
result = await db.execute(query)
tickets = result.scalars().all()
if not tickets:
return None
# Если есть вечная блокировка в любом тикете — блок активен без срока
for t in tickets:
if t.user_reply_block_permanent:
return datetime.max.replace(tzinfo=UTC)
# Иначе ищем максимальный срок блокировки, если он в будущем
future_until = [t.user_reply_block_until for t in tickets if t.user_reply_block_until]
if not future_until:
return None
max_until = max(future_until)
return max_until if max_until > datetime.now(UTC) else None
@staticmethod
async def get_all_tickets(
db: AsyncSession, status: str | None = None, priority: str | None = None, limit: int = 50, offset: int = 0
) -> list[Ticket]:
"""Получить все тикеты (для админов)"""
query = select(Ticket).options(selectinload(Ticket.user))
conditions = []
if status:
conditions.append(Ticket.status == status)
if priority:
conditions.append(Ticket.priority == priority)
if conditions:
query = query.where(and_(*conditions))
query = query.order_by(desc(Ticket.updated_at)).offset(offset).limit(limit)
result = await db.execute(query)
return result.scalars().all()
@staticmethod
async def get_tickets_by_statuses(
db: AsyncSession, statuses: list[str], limit: int = 50, offset: int = 0
) -> list[Ticket]:
query = select(Ticket).options(selectinload(Ticket.user))
if statuses:
query = query.where(Ticket.status.in_(statuses))
query = query.order_by(desc(Ticket.updated_at)).offset(offset).limit(limit)
result = await db.execute(query)
return result.scalars().all()
@staticmethod
async def count_tickets(db: AsyncSession, status: str | None = None) -> int:
query = select(func.count()).select_from(Ticket)
if status:
query = query.where(Ticket.status == status)
result = await db.execute(query)
return int(result.scalar() or 0)
@staticmethod
async def count_tickets_by_statuses(db: AsyncSession, statuses: list[str]) -> int:
query = select(func.count()).select_from(Ticket)
if statuses:
query = query.where(Ticket.status.in_(statuses))
result = await db.execute(query)
return int(result.scalar() or 0)
@staticmethod
async def update_ticket_status(
db: AsyncSession, ticket_id: int, status: str, closed_at: datetime | None = None
) -> bool:
"""Обновить статус тикета"""
ticket = await TicketCRUD.get_ticket_by_id(db, ticket_id, load_messages=False)
if not ticket:
return False
ticket.status = status
ticket.updated_at = datetime.now(UTC)
if status == TicketStatus.CLOSED.value and closed_at:
ticket.closed_at = closed_at
await db.commit()
# Отправляем событие об изменении статуса тикета
try:
from app.services.event_emitter import event_emitter
await event_emitter.emit(
'ticket.status_changed',
{
'ticket_id': ticket_id,
'user_id': ticket.user_id,
'old_status': ticket.status, # На самом деле это уже новый статус, но для простоты оставим так
'new_status': status,
'closed_at': closed_at.isoformat() if closed_at else None,
},
db=db,
)
except Exception as error:
logger.warning('Failed to emit ticket.status_changed event', error=error)
return True
@staticmethod
async def set_user_reply_block(db: AsyncSession, ticket_id: int, permanent: bool, until: datetime | None) -> bool:
ticket = await TicketCRUD.get_ticket_by_id(db, ticket_id, load_messages=False)
if not ticket:
return False
ticket.user_reply_block_permanent = bool(permanent)
ticket.user_reply_block_until = until
ticket.updated_at = datetime.now(UTC)
await db.commit()
return True
@staticmethod
async def close_ticket(db: AsyncSession, ticket_id: int) -> bool:
"""Закрыть тикет"""
return await TicketCRUD.update_ticket_status(db, ticket_id, TicketStatus.CLOSED.value, datetime.now(UTC))
@staticmethod
async def close_all_open_tickets(
db: AsyncSession,
) -> list[int]:
"""Закрыть все открытые тикеты. Возвращает список идентификаторов закрытых тикетов."""
open_statuses = [TicketStatus.OPEN.value, TicketStatus.ANSWERED.value]
result = await db.execute(select(Ticket.id).where(Ticket.status.in_(open_statuses)))
ticket_ids = result.scalars().all()
if not ticket_ids:
return []
now = datetime.now(UTC)
await db.execute(
update(Ticket)
.where(Ticket.id.in_(ticket_ids))
.values(status=TicketStatus.CLOSED.value, closed_at=now, updated_at=now)
)
await db.commit()
return ticket_ids
@staticmethod
async def add_support_audit(
db: AsyncSession,
*,
actor_user_id: int | None,
actor_telegram_id: int,
is_moderator: bool,
action: str,
ticket_id: int | None = None,
target_user_id: int | None = None,
details: dict | None = None,
) -> None:
try:
log = SupportAuditLog(
actor_user_id=actor_user_id,
actor_telegram_id=actor_telegram_id,
is_moderator=bool(is_moderator),
action=action,
ticket_id=ticket_id,
target_user_id=target_user_id,
details=details or {},
)
db.add(log)
await db.commit()
except Exception:
await db.rollback()
# не мешаем основной логике
@staticmethod
async def list_support_audit(
db: AsyncSession,
*,
limit: int = 50,
offset: int = 0,
action: str | None = None,
) -> list[SupportAuditLog]:
from sqlalchemy import desc, select
query = select(SupportAuditLog).order_by(desc(SupportAuditLog.created_at))
if action:
query = query.where(SupportAuditLog.action == action)
result = await db.execute(query.offset(offset).limit(limit))
return result.scalars().all()
@staticmethod
async def count_support_audit(db: AsyncSession, action: str | None = None) -> int:
from sqlalchemy import func, select
query = select(func.count()).select_from(SupportAuditLog)
if action:
query = query.where(SupportAuditLog.action == action)
result = await db.execute(query)
return int(result.scalar() or 0)
@staticmethod
async def list_support_audit_actions(db: AsyncSession) -> list[str]:
from sqlalchemy import select
result = await db.execute(
select(SupportAuditLog.action)
.where(SupportAuditLog.action.isnot(None))
.distinct()
.order_by(SupportAuditLog.action)
)
return [row[0] for row in result.fetchall()]
@staticmethod
async def get_open_tickets_count(db: AsyncSession) -> int:
"""Получить количество открытых тикетов"""
query = select(Ticket).where(Ticket.status.in_([TicketStatus.OPEN.value, TicketStatus.ANSWERED.value]))
result = await db.execute(query)
return len(result.scalars().all())
class TicketMessageCRUD:
"""CRUD операции для работы с сообщениями тикетов"""
@staticmethod
async def add_message(
db: AsyncSession,
ticket_id: int,
user_id: int,
message_text: str,
is_from_admin: bool = False,
media_type: str | None = None,
media_file_id: str | None = None,
media_caption: str | None = None,
media_items: list[dict] | None = None,
) -> TicketMessage:
"""Добавить сообщение в тикет"""
message = TicketMessage(
ticket_id=ticket_id,
user_id=user_id,
message_text=message_text,
is_from_admin=is_from_admin,
has_media=bool(media_type and media_file_id) or bool(media_items),
media_type=media_type,
media_file_id=media_file_id,
media_caption=media_caption,
media_items=media_items,
)
db.add(message)
# Обновляем статус тикета
ticket = await TicketCRUD.get_ticket_by_id(db, ticket_id, load_messages=False)
if ticket:
# Если тикет закрыт, запрещаем изменение статуса при сообщении пользователя
if not is_from_admin and ticket.status == TicketStatus.CLOSED.value:
return message
if is_from_admin:
# Админ ответил - тикет отвечен
ticket.status = TicketStatus.ANSWERED.value
else:
# Пользователь ответил - тикет открыт
ticket.status = TicketStatus.OPEN.value
# Сбросить отметку последнего SLA-напоминания, чтобы снова напоминать от времени нового сообщения
try:
# если колонка существует в модели
if hasattr(ticket, 'last_sla_reminder_at'):
ticket.last_sla_reminder_at = None
except Exception:
pass
ticket.updated_at = datetime.now(UTC)
await db.commit()
await db.refresh(message)
# Отправляем событие о новом сообщении в тикете
try:
from app.services.event_emitter import event_emitter
await event_emitter.emit(
'ticket.message_added',
{
'ticket_id': ticket_id,
'message_id': message.id,
'user_id': user_id,
'is_from_admin': is_from_admin,
'message_text': message_text[:200], # Ограничиваем длину для события
'has_media': bool(media_type and media_file_id),
'status': ticket.status if ticket else None,
},
db=db,
)
except Exception as error:
logger.warning('Failed to emit ticket.message_added event', error=error)
return message
@staticmethod
async def get_ticket_messages(
db: AsyncSession, ticket_id: int, limit: int = 50, offset: int = 0
) -> list[TicketMessage]:
"""Получить сообщения тикета"""
query = (
select(TicketMessage)
.where(TicketMessage.ticket_id == ticket_id)
.order_by(TicketMessage.created_at)
.offset(offset)
.limit(limit)
)
result = await db.execute(query)
return result.scalars().all()
@staticmethod
async def get_first_message(db: AsyncSession, ticket_id: int) -> TicketMessage | None:
"""Получить первое сообщение в тикете"""
query = (
select(TicketMessage)
.where(TicketMessage.ticket_id == ticket_id)
.order_by(TicketMessage.created_at)
.limit(1)
)
result = await db.execute(query)
return result.scalar_one_or_none()
@staticmethod
async def get_last_message(db: AsyncSession, ticket_id: int) -> TicketMessage | None:
"""Получить последнее сообщение в тикете"""
query = (
select(TicketMessage)
.where(TicketMessage.ticket_id == ticket_id)
.order_by(desc(TicketMessage.created_at))
.limit(1)
)
result = await db.execute(query)
return result.scalar_one_or_none()