fix(broadcast): resolve SQLAlchemy connection closed errors during long broadcasts

- Extract scalar values from ORM objects before long operations
- Create fresh DB sessions for persist operations with retry mechanism
- Replace ORM User objects with telegram_id integers in broadcast loops
- Update .gitignore to exclude Python cache, IDE files, and local configs

Fixes: InterfaceError "connection is closed" and MissingGreenlet errors
during mass message broadcasts
This commit is contained in:
Fringg
2026-02-05 05:42:31 +03:00
parent cf10eeda53
commit b8682adbbf
3 changed files with 258 additions and 135 deletions

115
.gitignore vendored
View File

@@ -1,49 +1,100 @@
# Игнорируем все файлы и папки по умолчанию
*
docker-compose.override.yml
# Исключения: разрешаем только нужные файлы
# ========== WHITELIST: разрешённые файлы ==========
# Конфигурация проекта
!.dockerignore
!.env.example
!install_bot.sh
!.gitignore
!.python-version
!Dockerfile
!app-config.json
!main.py
!requirements.txt
!docker-compose.yml
!docker-compose.local.yml
!Makefile
!pyproject.toml
!uv.lock
!.python-version
!docs/
!docs/**
!migrations/
!migrations/**
!requirements.txt
!alembic.ini
!app-config.json
# Документация
!README.md
!LICENSE
!CONTRIBUTING.md
!SECURITY.md
# Скрипты
!install_bot.sh
!main.py
# Статические файлы
!vpn_logo.png
# ========== WHITELIST: разрешённые папки ==========
# Разрешаем папку app/ и все её содержимое рекурсивно
!app/
!app/**
!tests/
!tests/**
# Дополнительно разрешаем README и лицензию (опционально)
!README.md
!LICENSE
# Разрешаем .gitignore чтобы он попал в репозиторий
!.gitignore
# Разрешаем .github/ (workflows, pre-commit и т.д.)
!migrations/
!migrations/**
!docs/
!docs/**
!assets/
!assets/**
!locales/
!locales/**
!.github/
!.github/**
# Разрешаем Makefile
!Makefile
# ========== BLACKLIST: игнорируемые внутри папок ==========
# Внутри разрешенных папок игнорируем служебные файлы
app/__pycache__/
app/**/__pycache__/
app/**/*.pyc
app/**/*.pyo
app/**/*.pyd
*.pyc
*.pyo
*.pyd
# Python
__pycache__/
**/__pycache__/
*.py[cod]
*$py.class
*.so
# Virtual environments
.venv/
venv/
ENV/
# IDE
.idea/
.vscode/
*.swp
*.swo
*~
# Build/dist
build/
dist/
*.egg-info/
.eggs/
# Testing/coverage
.coverage
htmlcov/
.pytest_cache/
.mypy_cache/
.ruff_cache/
# Local overrides (не коммитить!)
docker-compose.override.yml
.env
.env.local
.env.*.local
# Runtime data
logs/
data/
*.log
*.db
*.sqlite3
# OS files
.DS_Store
Thumbs.db

View File

@@ -127,58 +127,78 @@ def create_broadcast_keyboard(selected_buttons: list, language: str = 'ru') -> t
async def _persist_broadcast_result(
db: AsyncSession,
broadcast_history: BroadcastHistory,
broadcast_id: int,
sent_count: int,
failed_count: int,
status: str,
) -> None:
"""Сохраняет результаты рассылки с повторной попыткой при обрыве соединения."""
"""
Сохраняет результаты рассылки в НОВОЙ сессии.
# Сохраняем ID и время завершения в локальные переменные ДО операций с БД,
# чтобы избежать обращения к атрибутам отсоединенного объекта при потере соединения
broadcast_id = broadcast_history.id
ВАЖНО: Используем свежую сессию вместо переданной, потому что за время
долгой рассылки (минуты/часы) оригинальное соединение гарантированно
закроется по таймауту PostgreSQL (idle_in_transaction_session_timeout).
Args:
broadcast_id: ID записи BroadcastHistory (не ORM-объект!)
sent_count: Количество успешно отправленных сообщений
failed_count: Количество неудачных отправок
status: Финальный статус рассылки ('completed', 'partial', 'failed')
"""
completed_at = datetime.utcnow()
max_retries = 3
retry_delay = 1.0
broadcast_history.sent_count = sent_count
broadcast_history.failed_count = failed_count
broadcast_history.status = status
broadcast_history.completed_at = completed_at
for attempt in range(1, max_retries + 1):
try:
async with AsyncSessionLocal() as session:
broadcast_history = await session.get(BroadcastHistory, broadcast_id)
if not broadcast_history:
logger.critical(
'Не удалось найти запись BroadcastHistory #%s для записи результатов',
broadcast_id,
)
return
try:
await db.commit()
return
except InterfaceError as error:
logger.warning(
'Соединение с БД потеряно при сохранении результатов рассылки, пробуем еще раз',
exc_info=error,
)
await db.rollback()
broadcast_history.sent_count = sent_count
broadcast_history.failed_count = failed_count
broadcast_history.status = status
broadcast_history.completed_at = completed_at
await session.commit()
try:
async with AsyncSessionLocal() as retry_session:
retry_history = await retry_session.get(BroadcastHistory, broadcast_id)
if not retry_history:
logger.critical(
'Не удалось найти запись BroadcastHistory #%s для повторной записи результатов',
logger.info(
'Результаты рассылки сохранены (id=%s, sent=%d, failed=%d, status=%s)',
broadcast_id,
sent_count,
failed_count,
status,
)
return
retry_history.sent_count = sent_count
retry_history.failed_count = failed_count
retry_history.status = status
retry_history.completed_at = completed_at
await retry_session.commit()
logger.info(
'Результаты рассылки успешно сохранены после повторного подключения к БД (id=%s)',
broadcast_id,
except InterfaceError as error:
logger.warning(
'Ошибка соединения при сохранении результатов рассылки (попытка %d/%d): %s',
attempt,
max_retries,
error,
)
except Exception as retry_error:
logger.critical(
'Не удалось сохранить результаты рассылки после восстановления подключения',
exc_info=retry_error,
)
if attempt < max_retries:
await asyncio.sleep(retry_delay)
retry_delay *= 2
else:
logger.critical(
'Не удалось сохранить результаты рассылки после %d попыток (id=%s)',
max_retries,
broadcast_id,
)
except Exception as error:
logger.critical(
'Неожиданная ошибка при сохранении результатов рассылки (id=%s)',
broadcast_id,
exc_info=error,
)
return
@admin_required
@@ -1113,15 +1133,35 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state:
media_file_id = data.get('media_file_id')
media_caption = data.get('media_caption')
# =========================================================================
# КРИТИЧНО: Извлекаем ВСЕ скалярные значения из ORM-объектов СЕЙЧАС,
# пока сессия активна. После начала рассылки соединение с БД может
# закрыться по таймауту, и любое обращение к атрибутам ORM вызовет:
# - MissingGreenlet (lazy loading вне async контекста)
# - InterfaceError (соединение закрыто)
# =========================================================================
admin_id: int = db_user.id
admin_name: str = db_user.full_name # property, читает first_name/last_name
admin_telegram_id: int | None = db_user.telegram_id
admin_language: str = db_user.language
await safe_edit_or_send_text(
callback, '📨 Начинаю рассылку...\n\n⏳ Это может занять несколько минут.', reply_markup=None, parse_mode='HTML'
)
# Загружаем пользователей и сразу извлекаем telegram_id в список
# чтобы не обращаться к ORM-объектам во время долгой рассылки
if target.startswith('custom_'):
users = await get_custom_users(db, target.replace('custom_', ''))
users_orm = await get_custom_users(db, target.replace('custom_', ''))
else:
users = await get_target_users(db, target)
users_orm = await get_target_users(db, target)
# Извлекаем только telegram_id - это всё что нужно для отправки
# Фильтруем None (email-only пользователи)
recipient_telegram_ids: list[int] = [user.telegram_id for user in users_orm if user.telegram_id is not None]
total_users_count = len(users_orm)
# Создаём запись истории рассылки
broadcast_history = BroadcastHistory(
target_type=target,
message_text=message_text,
@@ -1129,21 +1169,29 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state:
media_type=media_type,
media_file_id=media_file_id,
media_caption=media_caption,
total_count=len(users),
total_count=total_users_count,
sent_count=0,
failed_count=0,
admin_id=db_user.id,
admin_name=db_user.full_name,
admin_id=admin_id,
admin_name=admin_name,
status='in_progress',
)
db.add(broadcast_history)
await db.commit()
await db.refresh(broadcast_history)
# Сохраняем ID - это единственное что нам нужно после коммита
broadcast_id: int = broadcast_history.id
# =========================================================================
# С этого момента НЕ используем db сессию и ORM-объекты!
# Работаем только со скалярными значениями.
# =========================================================================
sent_count = 0
failed_count = 0
broadcast_keyboard = create_broadcast_keyboard(selected_buttons, db_user.language)
broadcast_keyboard = create_broadcast_keyboard(selected_buttons, admin_language)
# Ограничение на количество одновременных отправок и базовая задержка между сообщениями,
# чтобы избежать перегрузки бота и лимитов Telegram при больших рассылках
@@ -1151,20 +1199,15 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state:
per_message_delay = 0.05
semaphore = asyncio.Semaphore(max_concurrent_sends)
async def send_single_broadcast(user):
"""Отправляет одно сообщение рассылки с семафором ограничения"""
# Skip email-only users (no telegram_id)
if not user.telegram_id:
logger.debug('Пропуск email-пользователя %s при рассылке', user.id)
return False, None
async def send_single_broadcast(telegram_id: int) -> tuple[bool, int]:
"""Отправляет одно сообщение рассылки с семафором ограничения."""
async with semaphore:
for attempt in range(3):
try:
if has_media and media_file_id:
if media_type == 'photo':
await callback.bot.send_photo(
chat_id=user.telegram_id,
chat_id=telegram_id,
photo=media_file_id,
caption=message_text,
parse_mode='HTML',
@@ -1172,7 +1215,7 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state:
)
elif media_type == 'video':
await callback.bot.send_video(
chat_id=user.telegram_id,
chat_id=telegram_id,
video=media_file_id,
caption=message_text,
parse_mode='HTML',
@@ -1180,7 +1223,7 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state:
)
elif media_type == 'document':
await callback.bot.send_document(
chat_id=user.telegram_id,
chat_id=telegram_id,
document=media_file_id,
caption=message_text,
parse_mode='HTML',
@@ -1188,38 +1231,36 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state:
)
else:
await callback.bot.send_message(
chat_id=user.telegram_id,
chat_id=telegram_id,
text=message_text,
parse_mode='HTML',
reply_markup=broadcast_keyboard,
)
await asyncio.sleep(per_message_delay)
return True, user.telegram_id
return True, telegram_id
except TelegramRetryAfter as e:
retry_delay = min(e.retry_after + 1, 30)
logger.warning(f'Превышен лимит Telegram для {user.telegram_id}, ожидание {retry_delay} сек.')
logger.warning(f'Превышен лимит Telegram для {telegram_id}, ожидание {retry_delay} сек.')
await asyncio.sleep(retry_delay)
except TelegramForbiddenError:
# Пользователь мог удалить бота или запретить сообщения
logger.info(f'Рассылка недоступна для пользователя {user.telegram_id}: Forbidden')
return False, user.telegram_id
logger.info(f'Рассылка недоступна для пользователя {telegram_id}: Forbidden')
return False, telegram_id
except TelegramBadRequest as e:
logger.error(f'Некорректный запрос при рассылке пользователю {user.telegram_id}: {e}')
return False, user.telegram_id
logger.error(f'Некорректный запрос при рассылке пользователю {telegram_id}: {e}')
return False, telegram_id
except Exception as e:
logger.error(
f'Ошибка отправки рассылки пользователю {user.telegram_id} (попытка {attempt + 1}/3): {e}'
)
logger.error(f'Ошибка отправки рассылки пользователю {telegram_id} (попытка {attempt + 1}/3): {e}')
await asyncio.sleep(0.5 * (attempt + 1))
return False, user.telegram_id
return False, telegram_id
# Отправляем сообщения пакетами для эффективности
batch_size = 50
for i in range(0, len(users), batch_size):
batch = users[i : i + batch_size]
tasks = [send_single_broadcast(user) for user in batch]
for i in range(0, len(recipient_telegram_ids), batch_size):
batch = recipient_telegram_ids[i : i + batch_size]
tasks = [send_single_broadcast(tid) for tid in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
@@ -1235,10 +1276,16 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state:
# Небольшая задержка между пакетами для снижения нагрузки на API
await asyncio.sleep(0.25)
# Учитываем пропущенных email-only пользователей
skipped_email_users = total_users_count - len(recipient_telegram_ids)
if skipped_email_users > 0:
logger.info(f'Пропущено {skipped_email_users} email-only пользователей при рассылке')
status = 'completed' if failed_count == 0 else 'partial'
# Сохраняем результат в НОВОЙ сессии (старая уже мертва)
await _persist_broadcast_result(
db=db,
broadcast_history=broadcast_history,
broadcast_id=broadcast_id,
sent_count=sent_count,
failed_count=failed_count,
status=status,
@@ -1248,16 +1295,17 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state:
if has_media:
media_info = f'\n🖼️ <b>Медиафайл:</b> {media_type}'
# Используем заранее сохранённое имя админа
result_text = f"""
✅ <b>Рассылка завершена!</b>
📊 <b>Результат:</b>
- Отправлено: {sent_count}
- Не доставлено: {failed_count}
- Всего пользователей: {len(users)}
- Успешность: {round(sent_count / len(users) * 100, 1) if users else 0}%{media_info}
- Всего пользователей: {total_users_count}
- Успешность: {round(sent_count / total_users_count * 100, 1) if total_users_count else 0}%{media_info}
<b>Администратор:</b> {db_user.full_name}
<b>Администратор:</b> {admin_name}
"""
try:
@@ -1290,7 +1338,9 @@ async def confirm_broadcast(callback: types.CallbackQuery, db_user: User, state:
raise
await state.clear()
logger.info(f'Рассылка выполнена админом {db_user.telegram_id}: {sent_count}/{len(users)} (медиа: {has_media})')
logger.info(
f'Рассылка выполнена админом {admin_telegram_id}: {sent_count}/{total_users_count} (медиа: {has_media})'
)
async def get_target_users_count(db: AsyncSession, target: str) -> int:

View File

@@ -123,7 +123,15 @@ async def broadcast_pinned_message(
db: AsyncSession,
pinned_message: PinnedMessage,
) -> tuple[int, int]:
users: list[User] = []
"""
Рассылает закреплённое сообщение всем активным пользователям.
ВАЖНО: Извлекаем telegram_id в список ДО начала долгой рассылки,
чтобы избежать обращения к ORM-объектам после истечения таймаута
соединения с БД.
"""
# Собираем telegram_id всех активных пользователей
recipient_telegram_ids: list[int] = []
offset = 0
batch_size = 5000
@@ -138,27 +146,26 @@ async def broadcast_pinned_message(
if not batch:
break
users.extend(batch)
# Извлекаем только telegram_id, фильтруем email-only пользователей
for user in batch:
if user.telegram_id is not None:
recipient_telegram_ids.append(user.telegram_id)
offset += batch_size
sent_count = 0
failed_count = 0
semaphore = asyncio.Semaphore(3)
async def send_to_user(user: User) -> None:
async def send_to_telegram_id(telegram_id: int) -> None:
nonlocal sent_count, failed_count
# Skip email-only users (no telegram_id)
if not user.telegram_id:
failed_count += 1
return
async with semaphore:
for attempt in range(3):
try:
success = await _send_and_pin_message(
bot,
user.telegram_id,
telegram_id,
pinned_message,
)
if success:
@@ -170,22 +177,22 @@ async def broadcast_pinned_message(
delay = min(retry_error.retry_after + 1, 30)
logger.warning(
'RetryAfter for user %s, waiting %s seconds',
user.telegram_id,
telegram_id,
delay,
)
await asyncio.sleep(delay)
except Exception as send_error:
logger.error(
'Ошибка отправки закрепленного сообщения пользователю %s: %s',
user.telegram_id,
telegram_id,
send_error,
)
failed_count += 1
break
for i in range(0, len(users), 30):
batch = users[i : i + 30]
tasks = [send_to_user(user) for user in batch]
for i in range(0, len(recipient_telegram_ids), 30):
batch = recipient_telegram_ids[i : i + 30]
tasks = [send_to_telegram_id(tid) for tid in batch]
await asyncio.gather(*tasks)
await asyncio.sleep(0.05)
@@ -196,11 +203,19 @@ async def unpin_active_pinned_message(
bot: Bot,
db: AsyncSession,
) -> tuple[int, int, bool]:
"""
Открепляет активное сообщение у всех пользователей.
ВАЖНО: Извлекаем telegram_id в список ДО начала долгой операции,
чтобы избежать обращения к ORM-объектам после истечения таймаута
соединения с БД.
"""
pinned_message = await deactivate_active_pinned_message(db)
if not pinned_message:
return 0, 0, False
users: list[User] = []
# Собираем telegram_id всех активных пользователей
recipient_telegram_ids: list[int] = []
offset = 0
batch_size = 5000
@@ -215,24 +230,23 @@ async def unpin_active_pinned_message(
if not batch:
break
users.extend(batch)
# Извлекаем только telegram_id, фильтруем email-only пользователей
for user in batch:
if user.telegram_id is not None:
recipient_telegram_ids.append(user.telegram_id)
offset += batch_size
unpinned_count = 0
failed_count = 0
semaphore = asyncio.Semaphore(5)
async def unpin_for_user(user: User) -> None:
async def unpin_for_telegram_id(telegram_id: int) -> None:
nonlocal unpinned_count, failed_count
# Skip email-only users (no telegram_id)
if not user.telegram_id:
failed_count += 1
return
async with semaphore:
try:
success = await _unpin_message_for_user(bot, user.telegram_id)
success = await _unpin_message_for_user(bot, telegram_id)
if success:
unpinned_count += 1
else:
@@ -241,22 +255,30 @@ async def unpin_active_pinned_message(
delay = min(retry_error.retry_after + 1, 30)
logger.warning(
'RetryAfter while unpinning for user %s, waiting %s seconds',
user.telegram_id,
telegram_id,
delay,
)
await asyncio.sleep(delay)
await unpin_for_user(user)
# Повторная попытка после ожидания
try:
success = await _unpin_message_for_user(bot, telegram_id)
if success:
unpinned_count += 1
else:
failed_count += 1
except Exception:
failed_count += 1
except Exception as error:
logger.error(
'Ошибка открепления сообщения у пользователя %s: %s',
user.telegram_id,
telegram_id,
error,
)
failed_count += 1
for i in range(0, len(users), 40):
batch = users[i : i + 40]
tasks = [unpin_for_user(user) for user in batch]
for i in range(0, len(recipient_telegram_ids), 40):
batch = recipient_telegram_ids[i : i + 40]
tasks = [unpin_for_telegram_id(tid) for tid in batch]
await asyncio.gather(*tasks)
await asyncio.sleep(0.05)