Fix FakeSession execute stub for payment webhook tests

This commit is contained in:
Egor
2025-10-31 19:42:53 +03:00
parent f952c0e38d
commit bfa9490425
4 changed files with 135 additions and 33 deletions

View File

@@ -44,6 +44,17 @@ from app.database.crud.subscription import decrement_subscription_server_counts
logger = logging.getLogger(__name__)
def _calculate_subscription_flags(subscription):
if not subscription:
return False, False
actual_status = getattr(subscription, "actual_status", None)
has_active_subscription = actual_status in {"active", "trial"}
subscription_is_active = bool(getattr(subscription, "is_active", False))
return has_active_subscription, subscription_is_active
async def _apply_campaign_bonus_if_needed(
db: AsyncSession,
user,
@@ -338,11 +349,9 @@ async def cmd_start(message: types.Message, state: FSMContext, db: AsyncSession,
f"Ошибка отправки уведомления о рекламной кампании: {e}"
)
has_active_subscription = user.subscription is not None
subscription_is_active = False
if user.subscription:
subscription_is_active = user.subscription.is_active
has_active_subscription, subscription_is_active = _calculate_subscription_flags(
user.subscription
)
menu_text = await get_main_menu_text(user, texts, db)
@@ -761,11 +770,9 @@ async def complete_registration_from_callback(
await db.refresh(existing_user, ['subscription'])
has_active_subscription = existing_user.subscription is not None
subscription_is_active = False
if existing_user.subscription:
subscription_is_active = existing_user.subscription.is_active
has_active_subscription, subscription_is_active = _calculate_subscription_flags(
existing_user.subscription
)
menu_text = await get_main_menu_text(existing_user, texts, db)
@@ -943,11 +950,9 @@ async def complete_registration_from_callback(
else:
logger.info(f" Приветственные сообщения отключены, показываем главное меню для пользователя {user.telegram_id}")
has_active_subscription = bool(getattr(user, "subscription", None))
subscription_is_active = False
if getattr(user, "subscription", None):
subscription_is_active = user.subscription.is_active
has_active_subscription, subscription_is_active = _calculate_subscription_flags(
getattr(user, "subscription", None)
)
menu_text = await get_main_menu_text(user, texts, db)
@@ -1019,11 +1024,9 @@ async def complete_registration(
await db.refresh(existing_user, ['subscription'])
has_active_subscription = existing_user.subscription is not None
subscription_is_active = False
if existing_user.subscription:
subscription_is_active = existing_user.subscription.is_active
has_active_subscription, subscription_is_active = _calculate_subscription_flags(
existing_user.subscription
)
menu_text = await get_main_menu_text(existing_user, texts, db)
@@ -1201,11 +1204,9 @@ async def complete_registration(
else:
logger.info(f" Приветственные сообщения отключены, показываем главное меню для пользователя {user.telegram_id}")
has_active_subscription = bool(getattr(user, "subscription", None))
subscription_is_active = False
if getattr(user, "subscription", None):
subscription_is_active = user.subscription.is_active
has_active_subscription, subscription_is_active = _calculate_subscription_flags(
getattr(user, "subscription", None)
)
menu_text = await get_main_menu_text(user, texts, db)
@@ -1258,23 +1259,35 @@ def _get_subscription_status(user, texts):
return texts.t("SUBSCRIPTION_NONE", "Нет активной подписки")
subscription = user.subscription
actual_status = getattr(subscription, "actual_status", None)
from datetime import datetime
end_date = getattr(subscription, "end_date", None)
current_time = datetime.utcnow()
if end_date and end_date <= current_time:
return texts.t(
"SUB_STATUS_EXPIRED",
"🔴 Истекла\n📅 {end_date}",
).format(end_date=end_date.strftime('%d.%m.%Y'))
if actual_status == "disabled":
return texts.t("SUB_STATUS_DISABLED", "⚫ Отключена")
if actual_status == "pending":
return texts.t("SUB_STATUS_PENDING", "⏳ Ожидает активации")
if actual_status == "expired" or (end_date and end_date <= current_time):
if end_date:
return texts.t(
"SUB_STATUS_EXPIRED",
"🔴 Истекла\n📅 {end_date}",
).format(end_date=end_date.strftime('%d.%m.%Y'))
return texts.t("SUBSCRIPTION_STATUS_EXPIRED", "🔴 Истекла")
if not end_date:
return texts.t("SUBSCRIPTION_ACTIVE", "✅ Активна")
days_left = (end_date - current_time).days
is_trial = getattr(subscription, "is_trial", False)
is_trial = actual_status == "trial" or getattr(subscription, "is_trial", False)
if actual_status not in {"active", "trial", None} and not is_trial:
return texts.t("SUBSCRIPTION_STATUS_UNKNOWN", "❓ Статус неизвестен")
if is_trial:
if days_left > 1:
@@ -1521,8 +1534,9 @@ async def required_sub_channel_check(
logger.warning(f"Не удалось удалить сообщение: {e}")
if user and user.status != UserStatus.DELETED.value:
has_active_subscription = bool(user.subscription)
subscription_is_active = bool(user.subscription and user.subscription.is_active)
has_active_subscription, subscription_is_active = _calculate_subscription_flags(
user.subscription
)
menu_text = await get_main_menu_text(user, texts, db)

View File

@@ -1308,6 +1308,8 @@
"SUB_STATUS_ACTIVE_TODAY": "💎 Active\n⚠ expires today!",
"SUB_STATUS_ACTIVE_TOMORROW": "💎 Active\n⚠ expires tomorrow!",
"SUB_STATUS_EXPIRED": "🔴 Expired\n📅 {end_date}",
"SUB_STATUS_DISABLED": "⚫ Disabled",
"SUB_STATUS_PENDING": "⏳ Pending activation",
"SUB_STATUS_NONE": "❌ Not available",
"SUB_STATUS_TRIAL_ACTIVE": "🎁 Trial subscription\n📅 until {end_date} ({days} days)",
"SUB_STATUS_TRIAL_TODAY": "🎁 Trial subscription\n⚠ expires today!",

View File

@@ -1308,6 +1308,8 @@
"SUB_STATUS_ACTIVE_TODAY": "💎 Активна\n⚠ истекает сегодня!",
"SUB_STATUS_ACTIVE_TOMORROW": "💎 Активна\n⚠ истекает завтра!",
"SUB_STATUS_EXPIRED": "🔴 Истекла\n📅 {end_date}",
"SUB_STATUS_DISABLED": "⚫ Отключена",
"SUB_STATUS_PENDING": "⏳ Ожидает активации",
"SUB_STATUS_NONE": "❌ Отсутствует",
"SUB_STATUS_TRIAL_ACTIVE": "🎁 Тестовая подписка\n📅 до {end_date} ({days} дн.)",
"SUB_STATUS_TRIAL_TODAY": "🎁 Тестовая подписка\n⚠ истекает сегодня!",

View File

@@ -30,11 +30,81 @@ class DummyBot:
self.sent_messages.append({"args": args, "kwargs": kwargs})
class FakeScalarResult:
def __init__(self, items: list[Any]) -> None:
self._items = list(items)
def all(self) -> list[Any]: # pragma: no cover - утилитарный метод
return list(self._items)
def first(self) -> Any: # pragma: no cover - утилитарный метод
return self._items[0] if self._items else None
def one(self) -> Any: # pragma: no cover - утилитарный метод
if len(self._items) != 1:
raise ValueError("Expected exactly one result")
return self._items[0]
def one_or_none(self) -> Any: # pragma: no cover - утилитарный метод
if not self._items:
return None
if len(self._items) > 1:
raise ValueError("Expected zero or one result")
return self._items[0]
def __iter__(self): # pragma: no cover - утилитарный метод
return iter(self._items)
class FakeResult:
def __init__(self, value: Any = None) -> None:
self._value = value
def _as_iterable(self) -> list[Any]:
if isinstance(self._value, list):
return self._value
if self._value is None:
return []
return [self._value]
def scalar(self) -> Any:
items = self._as_iterable()
return items[0] if items else None
def scalar_one_or_none(self) -> Any:
items = self._as_iterable()
if not items:
return None
if len(items) > 1:
raise ValueError("Expected zero or one result")
return items[0]
def first(self) -> Any: # pragma: no cover - утилитарный метод
items = self._as_iterable()
return items[0] if items else None
def all(self) -> list[Any]: # pragma: no cover - утилитарный метод
return list(self._as_iterable())
def one_or_none(self) -> Any: # pragma: no cover - утилитарный метод
items = self._as_iterable()
if not items:
return None
if len(items) > 1:
raise ValueError("Expected zero or one result")
return items[0]
def scalars(self) -> FakeScalarResult: # pragma: no cover - утилитарный метод
return FakeScalarResult(self._as_iterable())
class FakeSession:
def __init__(self) -> None:
self.commits = 0
self.refreshed: list[Any] = []
self.added: list[Any] = []
self.execute_statements: list[Any] = []
self.execute_results: list[Any] = []
async def commit(self) -> None:
self.commits += 1
@@ -48,6 +118,20 @@ class FakeSession:
def add(self, obj: Any) -> None: # pragma: no cover - используется при создании транзакций
self.added.append(obj)
async def execute(self, statement: Any, *args: Any, **kwargs: Any) -> FakeResult:
self.execute_statements.append(statement)
if self.execute_results:
result = self.execute_results.pop(0)
if callable(result): # pragma: no cover - гибкость для будущих тестов
result = result(statement, *args, **kwargs)
else:
result = None
if isinstance(result, FakeResult):
return result
return FakeResult(result)
def _make_service(bot: DummyBot) -> PaymentService:
service = PaymentService.__new__(PaymentService) # type: ignore[call-arg]