diff --git a/.gitignore b/.gitignore index 6d0fec20..14d418df 100644 --- a/.gitignore +++ b/.gitignore @@ -1,49 +1,100 @@ # Игнорируем все файлы и папки по умолчанию * -docker-compose.override.yml -# Исключения: разрешаем только нужные файлы +# ========== WHITELIST: разрешённые файлы ========== + +# Конфигурация проекта !.dockerignore !.env.example -!install_bot.sh +!.gitignore +!.python-version !Dockerfile -!app-config.json -!main.py -!requirements.txt +!docker-compose.yml +!docker-compose.local.yml +!Makefile !pyproject.toml !uv.lock -!.python-version -!docs/ -!docs/** -!migrations/ -!migrations/** +!requirements.txt +!alembic.ini +!app-config.json + +# Документация +!README.md +!LICENSE +!CONTRIBUTING.md +!SECURITY.md + +# Скрипты +!install_bot.sh +!main.py + +# Статические файлы +!vpn_logo.png + +# ========== WHITELIST: разрешённые папки ========== -# Разрешаем папку app/ и все её содержимое рекурсивно !app/ !app/** !tests/ !tests/** - -# Дополнительно разрешаем README и лицензию (опционально) -!README.md -!LICENSE - -# Разрешаем .gitignore чтобы он попал в репозиторий -!.gitignore - -# Разрешаем .github/ (workflows, pre-commit и т.д.) +!migrations/ +!migrations/** +!docs/ +!docs/** +!assets/ +!assets/** +!locales/ +!locales/** !.github/ !.github/** -# Разрешаем Makefile -!Makefile +# ========== BLACKLIST: игнорируемые внутри папок ========== -# Внутри разрешенных папок игнорируем служебные файлы -app/__pycache__/ -app/**/__pycache__/ -app/**/*.pyc -app/**/*.pyo -app/**/*.pyd -*.pyc -*.pyo -*.pyd +# Python +__pycache__/ +**/__pycache__/ +*.py[cod] +*$py.class +*.so + +# Virtual environments +.venv/ +venv/ +ENV/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# Build/dist +build/ +dist/ +*.egg-info/ +.eggs/ + +# Testing/coverage +.coverage +htmlcov/ +.pytest_cache/ +.mypy_cache/ +.ruff_cache/ + +# Local overrides (не коммитить!) +docker-compose.override.yml +.env +.env.local +.env.*.local + +# Runtime data +logs/ +data/ +*.log +*.db +*.sqlite3 + +# OS files +.DS_Store +Thumbs.db diff --git a/app/handlers/admin/messages.py b/app/handlers/admin/messages.py index d281a7b4..aa0af353 100644 --- a/app/handlers/admin/messages.py +++ b/app/handlers/admin/messages.py @@ -127,58 +127,78 @@ def create_broadcast_keyboard(selected_buttons: list, language: str = 'ru') -> t async def _persist_broadcast_result( - db: AsyncSession, - broadcast_history: BroadcastHistory, + broadcast_id: int, sent_count: int, failed_count: int, status: str, ) -> None: - """Сохраняет результаты рассылки с повторной попыткой при обрыве соединения.""" + """ + Сохраняет результаты рассылки в НОВОЙ сессии. - # Сохраняем ID и время завершения в локальные переменные ДО операций с БД, - # чтобы избежать обращения к атрибутам отсоединенного объекта при потере соединения - broadcast_id = broadcast_history.id + ВАЖНО: Используем свежую сессию вместо переданной, потому что за время + долгой рассылки (минуты/часы) оригинальное соединение гарантированно + закроется по таймауту PostgreSQL (idle_in_transaction_session_timeout). + + Args: + broadcast_id: ID записи BroadcastHistory (не ORM-объект!) + sent_count: Количество успешно отправленных сообщений + failed_count: Количество неудачных отправок + status: Финальный статус рассылки ('completed', 'partial', 'failed') + """ completed_at = datetime.utcnow() + max_retries = 3 + retry_delay = 1.0 - broadcast_history.sent_count = sent_count - broadcast_history.failed_count = failed_count - broadcast_history.status = status - broadcast_history.completed_at = completed_at + for attempt in range(1, max_retries + 1): + try: + async with AsyncSessionLocal() as session: + broadcast_history = await session.get(BroadcastHistory, broadcast_id) + if not broadcast_history: + logger.critical( + 'Не удалось найти запись BroadcastHistory #%s для записи результатов', + broadcast_id, + ) + return - try: - await db.commit() - return - except InterfaceError as error: - logger.warning( - 'Соединение с БД потеряно при сохранении результатов рассылки, пробуем еще раз', - exc_info=error, - ) - await db.rollback() + broadcast_history.sent_count = sent_count + broadcast_history.failed_count = failed_count + broadcast_history.status = status + broadcast_history.completed_at = completed_at + await session.commit() - try: - async with AsyncSessionLocal() as retry_session: - retry_history = await retry_session.get(BroadcastHistory, broadcast_id) - if not retry_history: - logger.critical( - 'Не удалось найти запись BroadcastHistory #%s для повторной записи результатов', + logger.info( + 'Результаты рассылки сохранены (id=%s, sent=%d, failed=%d, status=%s)', broadcast_id, + sent_count, + failed_count, + status, ) return - retry_history.sent_count = sent_count - retry_history.failed_count = failed_count - retry_history.status = status - retry_history.completed_at = completed_at - await retry_session.commit() - logger.info( - 'Результаты рассылки успешно сохранены после повторного подключения к БД (id=%s)', - broadcast_id, + except InterfaceError as error: + logger.warning( + 'Ошибка соединения при сохранении результатов рассылки (попытка %d/%d): %s', + attempt, + max_retries, + error, ) - except Exception as retry_error: - logger.critical( - 'Не удалось сохранить результаты рассылки после восстановления подключения', - exc_info=retry_error, - ) + if attempt < max_retries: + await asyncio.sleep(retry_delay) + retry_delay *= 2 + else: + logger.critical( + 'Не удалось сохранить результаты рассылки после %d попыток (id=%s)', + max_retries, + broadcast_id, + ) + + except Exception as error: + logger.critical( + 'Неожиданная ошибка при сохранении результатов рассылки (id=%s)', + broadcast_id, + exc_info=error, + ) + return @admin_required @@ -1113,15 +1133,35 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state: media_file_id = data.get('media_file_id') media_caption = data.get('media_caption') + # ========================================================================= + # КРИТИЧНО: Извлекаем ВСЕ скалярные значения из ORM-объектов СЕЙЧАС, + # пока сессия активна. После начала рассылки соединение с БД может + # закрыться по таймауту, и любое обращение к атрибутам ORM вызовет: + # - MissingGreenlet (lazy loading вне async контекста) + # - InterfaceError (соединение закрыто) + # ========================================================================= + admin_id: int = db_user.id + admin_name: str = db_user.full_name # property, читает first_name/last_name + admin_telegram_id: int | None = db_user.telegram_id + admin_language: str = db_user.language + await safe_edit_or_send_text( callback, '📨 Начинаю рассылку...\n\n⏳ Это может занять несколько минут.', reply_markup=None, parse_mode='HTML' ) + # Загружаем пользователей и сразу извлекаем telegram_id в список + # чтобы не обращаться к ORM-объектам во время долгой рассылки if target.startswith('custom_'): - users = await get_custom_users(db, target.replace('custom_', '')) + users_orm = await get_custom_users(db, target.replace('custom_', '')) else: - users = await get_target_users(db, target) + users_orm = await get_target_users(db, target) + # Извлекаем только telegram_id - это всё что нужно для отправки + # Фильтруем None (email-only пользователи) + recipient_telegram_ids: list[int] = [user.telegram_id for user in users_orm if user.telegram_id is not None] + total_users_count = len(users_orm) + + # Создаём запись истории рассылки broadcast_history = BroadcastHistory( target_type=target, message_text=message_text, @@ -1129,21 +1169,29 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state: media_type=media_type, media_file_id=media_file_id, media_caption=media_caption, - total_count=len(users), + total_count=total_users_count, sent_count=0, failed_count=0, - admin_id=db_user.id, - admin_name=db_user.full_name, + admin_id=admin_id, + admin_name=admin_name, status='in_progress', ) db.add(broadcast_history) await db.commit() await db.refresh(broadcast_history) + # Сохраняем ID - это единственное что нам нужно после коммита + broadcast_id: int = broadcast_history.id + + # ========================================================================= + # С этого момента НЕ используем db сессию и ORM-объекты! + # Работаем только со скалярными значениями. + # ========================================================================= + sent_count = 0 failed_count = 0 - broadcast_keyboard = create_broadcast_keyboard(selected_buttons, db_user.language) + broadcast_keyboard = create_broadcast_keyboard(selected_buttons, admin_language) # Ограничение на количество одновременных отправок и базовая задержка между сообщениями, # чтобы избежать перегрузки бота и лимитов Telegram при больших рассылках @@ -1151,20 +1199,15 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state: per_message_delay = 0.05 semaphore = asyncio.Semaphore(max_concurrent_sends) - async def send_single_broadcast(user): - """Отправляет одно сообщение рассылки с семафором ограничения""" - # Skip email-only users (no telegram_id) - if not user.telegram_id: - logger.debug('Пропуск email-пользователя %s при рассылке', user.id) - return False, None - + async def send_single_broadcast(telegram_id: int) -> tuple[bool, int]: + """Отправляет одно сообщение рассылки с семафором ограничения.""" async with semaphore: for attempt in range(3): try: if has_media and media_file_id: if media_type == 'photo': await callback.bot.send_photo( - chat_id=user.telegram_id, + chat_id=telegram_id, photo=media_file_id, caption=message_text, parse_mode='HTML', @@ -1172,7 +1215,7 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state: ) elif media_type == 'video': await callback.bot.send_video( - chat_id=user.telegram_id, + chat_id=telegram_id, video=media_file_id, caption=message_text, parse_mode='HTML', @@ -1180,7 +1223,7 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state: ) elif media_type == 'document': await callback.bot.send_document( - chat_id=user.telegram_id, + chat_id=telegram_id, document=media_file_id, caption=message_text, parse_mode='HTML', @@ -1188,38 +1231,36 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state: ) else: await callback.bot.send_message( - chat_id=user.telegram_id, + chat_id=telegram_id, text=message_text, parse_mode='HTML', reply_markup=broadcast_keyboard, ) await asyncio.sleep(per_message_delay) - return True, user.telegram_id + return True, telegram_id except TelegramRetryAfter as e: retry_delay = min(e.retry_after + 1, 30) - logger.warning(f'Превышен лимит Telegram для {user.telegram_id}, ожидание {retry_delay} сек.') + logger.warning(f'Превышен лимит Telegram для {telegram_id}, ожидание {retry_delay} сек.') await asyncio.sleep(retry_delay) except TelegramForbiddenError: # Пользователь мог удалить бота или запретить сообщения - logger.info(f'Рассылка недоступна для пользователя {user.telegram_id}: Forbidden') - return False, user.telegram_id + logger.info(f'Рассылка недоступна для пользователя {telegram_id}: Forbidden') + return False, telegram_id except TelegramBadRequest as e: - logger.error(f'Некорректный запрос при рассылке пользователю {user.telegram_id}: {e}') - return False, user.telegram_id + logger.error(f'Некорректный запрос при рассылке пользователю {telegram_id}: {e}') + return False, telegram_id except Exception as e: - logger.error( - f'Ошибка отправки рассылки пользователю {user.telegram_id} (попытка {attempt + 1}/3): {e}' - ) + logger.error(f'Ошибка отправки рассылки пользователю {telegram_id} (попытка {attempt + 1}/3): {e}') await asyncio.sleep(0.5 * (attempt + 1)) - return False, user.telegram_id + return False, telegram_id # Отправляем сообщения пакетами для эффективности batch_size = 50 - for i in range(0, len(users), batch_size): - batch = users[i : i + batch_size] - tasks = [send_single_broadcast(user) for user in batch] + for i in range(0, len(recipient_telegram_ids), batch_size): + batch = recipient_telegram_ids[i : i + batch_size] + tasks = [send_single_broadcast(tid) for tid in batch] results = await asyncio.gather(*tasks, return_exceptions=True) for result in results: @@ -1235,10 +1276,16 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state: # Небольшая задержка между пакетами для снижения нагрузки на API await asyncio.sleep(0.25) + # Учитываем пропущенных email-only пользователей + skipped_email_users = total_users_count - len(recipient_telegram_ids) + if skipped_email_users > 0: + logger.info(f'Пропущено {skipped_email_users} email-only пользователей при рассылке') + status = 'completed' if failed_count == 0 else 'partial' + + # Сохраняем результат в НОВОЙ сессии (старая уже мертва) await _persist_broadcast_result( - db=db, - broadcast_history=broadcast_history, + broadcast_id=broadcast_id, sent_count=sent_count, failed_count=failed_count, status=status, @@ -1248,16 +1295,17 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state: if has_media: media_info = f'\n🖼️ Медиафайл: {media_type}' + # Используем заранее сохранённое имя админа result_text = f""" ✅ Рассылка завершена! 📊 Результат: - Отправлено: {sent_count} - Не доставлено: {failed_count} -- Всего пользователей: {len(users)} -- Успешность: {round(sent_count / len(users) * 100, 1) if users else 0}%{media_info} +- Всего пользователей: {total_users_count} +- Успешность: {round(sent_count / total_users_count * 100, 1) if total_users_count else 0}%{media_info} -Администратор: {db_user.full_name} +Администратор: {admin_name} """ try: @@ -1290,7 +1338,9 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state: raise await state.clear() - logger.info(f'Рассылка выполнена админом {db_user.telegram_id}: {sent_count}/{len(users)} (медиа: {has_media})') + logger.info( + f'Рассылка выполнена админом {admin_telegram_id}: {sent_count}/{total_users_count} (медиа: {has_media})' + ) async def get_target_users_count(db: AsyncSession, target: str) -> int: diff --git a/app/services/pinned_message_service.py b/app/services/pinned_message_service.py index 0acd93c0..e11e5d2c 100644 --- a/app/services/pinned_message_service.py +++ b/app/services/pinned_message_service.py @@ -123,7 +123,15 @@ async def broadcast_pinned_message( db: AsyncSession, pinned_message: PinnedMessage, ) -> tuple[int, int]: - users: list[User] = [] + """ + Рассылает закреплённое сообщение всем активным пользователям. + + ВАЖНО: Извлекаем telegram_id в список ДО начала долгой рассылки, + чтобы избежать обращения к ORM-объектам после истечения таймаута + соединения с БД. + """ + # Собираем telegram_id всех активных пользователей + recipient_telegram_ids: list[int] = [] offset = 0 batch_size = 5000 @@ -138,27 +146,26 @@ async def broadcast_pinned_message( if not batch: break - users.extend(batch) + # Извлекаем только telegram_id, фильтруем email-only пользователей + for user in batch: + if user.telegram_id is not None: + recipient_telegram_ids.append(user.telegram_id) + offset += batch_size sent_count = 0 failed_count = 0 semaphore = asyncio.Semaphore(3) - async def send_to_user(user: User) -> None: + async def send_to_telegram_id(telegram_id: int) -> None: nonlocal sent_count, failed_count - # Skip email-only users (no telegram_id) - if not user.telegram_id: - failed_count += 1 - return - async with semaphore: for attempt in range(3): try: success = await _send_and_pin_message( bot, - user.telegram_id, + telegram_id, pinned_message, ) if success: @@ -170,22 +177,22 @@ async def broadcast_pinned_message( delay = min(retry_error.retry_after + 1, 30) logger.warning( 'RetryAfter for user %s, waiting %s seconds', - user.telegram_id, + telegram_id, delay, ) await asyncio.sleep(delay) except Exception as send_error: logger.error( 'Ошибка отправки закрепленного сообщения пользователю %s: %s', - user.telegram_id, + telegram_id, send_error, ) failed_count += 1 break - for i in range(0, len(users), 30): - batch = users[i : i + 30] - tasks = [send_to_user(user) for user in batch] + for i in range(0, len(recipient_telegram_ids), 30): + batch = recipient_telegram_ids[i : i + 30] + tasks = [send_to_telegram_id(tid) for tid in batch] await asyncio.gather(*tasks) await asyncio.sleep(0.05) @@ -196,11 +203,19 @@ async def unpin_active_pinned_message( bot: Bot, db: AsyncSession, ) -> tuple[int, int, bool]: + """ + Открепляет активное сообщение у всех пользователей. + + ВАЖНО: Извлекаем telegram_id в список ДО начала долгой операции, + чтобы избежать обращения к ORM-объектам после истечения таймаута + соединения с БД. + """ pinned_message = await deactivate_active_pinned_message(db) if not pinned_message: return 0, 0, False - users: list[User] = [] + # Собираем telegram_id всех активных пользователей + recipient_telegram_ids: list[int] = [] offset = 0 batch_size = 5000 @@ -215,24 +230,23 @@ async def unpin_active_pinned_message( if not batch: break - users.extend(batch) + # Извлекаем только telegram_id, фильтруем email-only пользователей + for user in batch: + if user.telegram_id is not None: + recipient_telegram_ids.append(user.telegram_id) + offset += batch_size unpinned_count = 0 failed_count = 0 semaphore = asyncio.Semaphore(5) - async def unpin_for_user(user: User) -> None: + async def unpin_for_telegram_id(telegram_id: int) -> None: nonlocal unpinned_count, failed_count - # Skip email-only users (no telegram_id) - if not user.telegram_id: - failed_count += 1 - return - async with semaphore: try: - success = await _unpin_message_for_user(bot, user.telegram_id) + success = await _unpin_message_for_user(bot, telegram_id) if success: unpinned_count += 1 else: @@ -241,22 +255,30 @@ async def unpin_active_pinned_message( delay = min(retry_error.retry_after + 1, 30) logger.warning( 'RetryAfter while unpinning for user %s, waiting %s seconds', - user.telegram_id, + telegram_id, delay, ) await asyncio.sleep(delay) - await unpin_for_user(user) + # Повторная попытка после ожидания + try: + success = await _unpin_message_for_user(bot, telegram_id) + if success: + unpinned_count += 1 + else: + failed_count += 1 + except Exception: + failed_count += 1 except Exception as error: logger.error( 'Ошибка открепления сообщения у пользователя %s: %s', - user.telegram_id, + telegram_id, error, ) failed_count += 1 - for i in range(0, len(users), 40): - batch = users[i : i + 40] - tasks = [unpin_for_user(user) for user in batch] + for i in range(0, len(recipient_telegram_ids), 40): + batch = recipient_telegram_ids[i : i + 40] + tasks = [unpin_for_telegram_id(tid) for tid in batch] await asyncio.gather(*tasks) await asyncio.sleep(0.05)