diff --git a/app/database/models.py b/app/database/models.py index 13cdfd0b..c2b300e3 100644 --- a/app/database/models.py +++ b/app/database/models.py @@ -611,6 +611,7 @@ class User(Base): promo_group = relationship("PromoGroup", back_populates="users") user_promo_groups = relationship("UserPromoGroup", back_populates="user", cascade="all, delete-orphan") poll_responses = relationship("PollResponse", back_populates="user") + last_pinned_message_id = Column(Integer, nullable=True) @property def balance_rubles(self) -> float: @@ -1553,6 +1554,23 @@ class WelcomeText(Base): creator = relationship("User", backref="created_welcome_texts") +class PinnedMessage(Base): + __tablename__ = "pinned_messages" + + id = Column(Integer, primary_key=True, index=True) + content = Column(Text, nullable=False, default="") + media_type = Column(String(32), nullable=True) + media_file_id = Column(String(255), nullable=True) + send_before_menu = Column(Boolean, nullable=False, server_default="1", default=True) + send_on_every_start = Column(Boolean, nullable=False, server_default="1", default=True) + is_active = Column(Boolean, default=True) + created_by = Column(Integer, ForeignKey("users.id", ondelete="SET NULL"), nullable=True) + created_at = Column(DateTime, default=func.now()) + updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) + + creator = relationship("User", backref="pinned_messages") + + class AdvertisingCampaign(Base): __tablename__ = "advertising_campaigns" diff --git a/app/database/universal_migration.py b/app/database/universal_migration.py index e418502e..e0d7e6c8 100644 --- a/app/database/universal_migration.py +++ b/app/database/universal_migration.py @@ -3014,6 +3014,157 @@ async def create_welcome_texts_table(): logger.error(f"Ошибка создания таблицы welcome_texts: {e}") return False + +async def create_pinned_messages_table(): + table_exists = await check_table_exists("pinned_messages") + if table_exists: + logger.info("Таблица pinned_messages уже существует") + return True + + try: + async with engine.begin() as conn: + db_type = await get_database_type() + + if db_type == "sqlite": + create_sql = """ + CREATE TABLE pinned_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + content TEXT NOT NULL DEFAULT '', + media_type VARCHAR(32) NULL, + media_file_id VARCHAR(255) NULL, + send_before_menu BOOLEAN NOT NULL DEFAULT 1, + send_on_every_start BOOLEAN NOT NULL DEFAULT 1, + is_active BOOLEAN DEFAULT 1, + created_by INTEGER NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL + ); + + CREATE INDEX IF NOT EXISTS ix_pinned_messages_active ON pinned_messages(is_active); + """ + + elif db_type == "postgresql": + create_sql = """ + CREATE TABLE pinned_messages ( + id SERIAL PRIMARY KEY, + content TEXT NOT NULL DEFAULT '', + media_type VARCHAR(32) NULL, + media_file_id VARCHAR(255) NULL, + send_before_menu BOOLEAN NOT NULL DEFAULT TRUE, + send_on_every_start BOOLEAN NOT NULL DEFAULT TRUE, + is_active BOOLEAN DEFAULT TRUE, + created_by INTEGER NULL REFERENCES users(id) ON DELETE SET NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + + CREATE INDEX IF NOT EXISTS ix_pinned_messages_active ON pinned_messages(is_active); + """ + + elif db_type == "mysql": + create_sql = """ + CREATE TABLE pinned_messages ( + id INT AUTO_INCREMENT PRIMARY KEY, + content TEXT NOT NULL DEFAULT '', + media_type VARCHAR(32) NULL, + media_file_id VARCHAR(255) NULL, + send_before_menu BOOLEAN NOT NULL DEFAULT TRUE, + send_on_every_start BOOLEAN NOT NULL DEFAULT TRUE, + is_active BOOLEAN DEFAULT TRUE, + created_by INT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL + ); + + CREATE INDEX ix_pinned_messages_active ON pinned_messages(is_active); + """ + + else: + logger.error(f"Неподдерживаемый тип БД для создания таблицы pinned_messages: {db_type}") + return False + + await conn.execute(text(create_sql)) + + logger.info("✅ Таблица pinned_messages успешно создана") + return True + + except Exception as e: + logger.error(f"Ошибка создания таблицы pinned_messages: {e}") + return False + + +async def ensure_pinned_message_media_columns(): + table_exists = await check_table_exists("pinned_messages") + if not table_exists: + logger.warning("⚠️ Таблица pinned_messages отсутствует — пропускаем обновление медиа полей") + return False + + try: + async with engine.begin() as conn: + db_type = await get_database_type() + + if not await check_column_exists("pinned_messages", "media_type"): + await conn.execute( + text("ALTER TABLE pinned_messages ADD COLUMN media_type VARCHAR(32)") + ) + + if not await check_column_exists("pinned_messages", "media_file_id"): + await conn.execute( + text("ALTER TABLE pinned_messages ADD COLUMN media_file_id VARCHAR(255)") + ) + + if not await check_column_exists("pinned_messages", "send_before_menu"): + default_value = "TRUE" if db_type != "sqlite" else "1" + await conn.execute( + text( + f"ALTER TABLE pinned_messages ADD COLUMN send_before_menu BOOLEAN NOT NULL DEFAULT {default_value}" + ) + ) + + if not await check_column_exists("pinned_messages", "send_on_every_start"): + default_value = "TRUE" if db_type != "sqlite" else "1" + await conn.execute( + text( + f"ALTER TABLE pinned_messages ADD COLUMN send_on_every_start BOOLEAN NOT NULL DEFAULT {default_value}" + ) + ) + + await conn.execute(text("UPDATE pinned_messages SET content = '' WHERE content IS NULL")) + + if db_type == "postgresql": + await conn.execute( + text("ALTER TABLE pinned_messages ALTER COLUMN content SET DEFAULT ''") + ) + elif db_type == "mysql": + await conn.execute( + text("ALTER TABLE pinned_messages MODIFY content TEXT NOT NULL DEFAULT ''") + ) + else: + logger.info("ℹ️ Пропускаем установку DEFAULT для content в SQLite") + + logger.info("✅ Медиа поля pinned_messages приведены в актуальное состояние") + return True + + except Exception as e: + logger.error(f"Ошибка обновления медиа полей pinned_messages: {e}") + return False + + +async def ensure_user_last_pinned_column(): + try: + async with engine.begin() as conn: + if not await check_column_exists("users", "last_pinned_message_id"): + await conn.execute( + text("ALTER TABLE users ADD COLUMN last_pinned_message_id INTEGER") + ) + logger.info("✅ Поле last_pinned_message_id у пользователей готово") + return True + except Exception as e: + logger.error(f"Ошибка добавления поля last_pinned_message_id: {e}") + return False + async def add_media_fields_to_broadcast_history(): logger.info("=== ДОБАВЛЕНИЕ ПОЛЕЙ МЕДИА В BROADCAST_HISTORY ===") @@ -4690,12 +4841,33 @@ async def run_universal_migration(): else: logger.warning("⚠️ Проблемы с таблицей user_messages") + logger.info("=== СОЗДАНИЕ ТАБЛИЦЫ PINNED_MESSAGES ===") + pinned_messages_created = await create_pinned_messages_table() + if pinned_messages_created: + logger.info("✅ Таблица pinned_messages готова") + else: + logger.warning("⚠️ Проблемы с таблицей pinned_messages") + logger.info("=== СОЗДАНИЕ/ОБНОВЛЕНИЕ ТАБЛИЦЫ WELCOME_TEXTS ===") welcome_texts_created = await create_welcome_texts_table() if welcome_texts_created: logger.info("✅ Таблица welcome_texts готова с полем is_enabled") else: logger.warning("⚠️ Проблемы с таблицей welcome_texts") + + logger.info("=== ОБНОВЛЕНИЕ СХЕМЫ PINNED_MESSAGES ===") + pinned_media_ready = await ensure_pinned_message_media_columns() + if pinned_media_ready: + logger.info("✅ Медиа поля для pinned_messages готовы") + else: + logger.warning("⚠️ Проблемы с медиа полями pinned_messages") + + logger.info("=== ДОБАВЛЕНИЕ СЛЕДА ОТПРАВКИ ЗАКРЕПА ДЛЯ ПОЛЬЗОВАТЕЛЕЙ ===") + last_pinned_ready = await ensure_user_last_pinned_column() + if last_pinned_ready: + logger.info("✅ Колонка last_pinned_message_id добавлена") + else: + logger.warning("⚠️ Не удалось обновить колонку last_pinned_message_id") logger.info("=== ДОБАВЛЕНИЕ МЕДИА ПОЛЕЙ В BROADCAST_HISTORY ===") media_fields_added = await add_media_fields_to_broadcast_history() @@ -4880,8 +5052,13 @@ async def check_migration_status(): "cryptobot_table": False, "heleket_table": False, "user_messages_table": False, + "pinned_messages_table": False, "welcome_texts_table": False, "welcome_texts_is_enabled_column": False, + "pinned_messages_media_columns": False, + "pinned_messages_position_column": False, + "pinned_messages_start_mode_column": False, + "users_last_pinned_column": False, "broadcast_history_media_fields": False, "subscription_duplicates": False, "subscription_conversions_table": False, @@ -4924,6 +5101,7 @@ async def check_migration_status(): status["cryptobot_table"] = await check_table_exists('cryptobot_payments') status["heleket_table"] = await check_table_exists('heleket_payments') status["user_messages_table"] = await check_table_exists('user_messages') + status["pinned_messages_table"] = await check_table_exists('pinned_messages') status["welcome_texts_table"] = await check_table_exists('welcome_texts') status["privacy_policies_table"] = await check_table_exists('privacy_policies') status["public_offers_table"] = await check_table_exists('public_offers') @@ -4969,6 +5147,25 @@ async def check_migration_status(): await check_column_exists('broadcast_history', 'media_caption') ) status["broadcast_history_media_fields"] = media_fields_exist + + pinned_media_columns_exist = ( + status["pinned_messages_table"] + and await check_column_exists('pinned_messages', 'media_type') + and await check_column_exists('pinned_messages', 'media_file_id') + ) + status["pinned_messages_media_columns"] = pinned_media_columns_exist + + status["pinned_messages_position_column"] = ( + status["pinned_messages_table"] + and await check_column_exists('pinned_messages', 'send_before_menu') + ) + + status["pinned_messages_start_mode_column"] = ( + status["pinned_messages_table"] + and await check_column_exists('pinned_messages', 'send_on_every_start') + ) + + status["users_last_pinned_column"] = await check_column_exists('users', 'last_pinned_message_id') async with engine.begin() as conn: duplicates_check = await conn.execute(text(""" @@ -4987,10 +5184,15 @@ async def check_migration_status(): "cryptobot_table": "Таблица CryptoBot payments", "heleket_table": "Таблица Heleket payments", "user_messages_table": "Таблица пользовательских сообщений", + "pinned_messages_table": "Таблица закреплённых сообщений", "welcome_texts_table": "Таблица приветственных текстов", "privacy_policies_table": "Таблица политик конфиденциальности", "public_offers_table": "Таблица публичных оферт", "welcome_texts_is_enabled_column": "Поле is_enabled в welcome_texts", + "pinned_messages_media_columns": "Медиа поля в pinned_messages", + "pinned_messages_position_column": "Позиция закрепа (до/после меню)", + "pinned_messages_start_mode_column": "Режим отправки закрепа при /start", + "users_last_pinned_column": "Колонка last_pinned_message_id у пользователей", "broadcast_history_media_fields": "Медиа поля в broadcast_history", "subscription_conversions_table": "Таблица конверсий подписок", "subscription_events_table": "Таблица событий подписок", diff --git a/app/handlers/admin/messages.py b/app/handlers/admin/messages.py index 3bf0210f..5579417a 100644 --- a/app/handlers/admin/messages.py +++ b/app/handlers/admin/messages.py @@ -1,3 +1,4 @@ +import html import logging import asyncio from datetime import datetime, timedelta @@ -25,13 +26,19 @@ from app.keyboards.admin import ( get_admin_pagination_keyboard, get_broadcast_media_keyboard, get_media_confirm_keyboard, get_updated_message_buttons_selector_keyboard_with_media, BROADCAST_BUTTON_ROWS, DEFAULT_BROADCAST_BUTTONS, - get_broadcast_button_config, get_broadcast_button_labels + get_broadcast_button_config, get_broadcast_button_labels, get_pinned_message_keyboard ) from app.localization.texts import get_texts from app.database.crud.user import get_users_list from app.database.crud.subscription import get_expiring_subscriptions from app.utils.decorators import admin_required, error_handler from app.utils.miniapp_buttons import build_miniapp_or_callback_button +from app.services.pinned_message_service import ( + broadcast_pinned_message, + get_active_pinned_message, + set_active_pinned_message, + unpin_active_pinned_message, +) logger = logging.getLogger(__name__) @@ -167,6 +174,302 @@ async def show_messages_menu( await callback.answer() +@admin_required +@error_handler +async def show_pinned_message_menu( + callback: types.CallbackQuery, + db_user: User, + db: AsyncSession, + state: FSMContext, +): + await state.clear() + pinned_message = await get_active_pinned_message(db) + + if pinned_message: + content_preview = html.escape(pinned_message.content or "") + last_updated = pinned_message.updated_at or pinned_message.created_at + timestamp_text = last_updated.strftime("%d.%m.%Y %H:%M") if last_updated else "—" + media_line = "" + if pinned_message.media_type: + media_label = "Фото" if pinned_message.media_type == "photo" else "Видео" + media_line = f"📎 Медиа: {media_label}\n" + position_line = ( + "⬆️ Отправлять перед меню" + if pinned_message.send_before_menu + else "⬇️ Отправлять после меню" + ) + start_mode_line = ( + "🔁 При каждом /start" + if pinned_message.send_on_every_start + else "🚫 Только один раз и при обновлении" + ) + body = ( + "📌 Закрепленное сообщение\n\n" + "📝 Текущий текст:\n" + f"{content_preview}\n\n" + f"{media_line}" + f"{position_line}\n" + f"{start_mode_line}\n" + f"🕒 Обновлено: {timestamp_text}" + ) + else: + body = ( + "📌 Закрепленное сообщение\n\n" + "Сообщение не задано. Отправьте новый текст, чтобы разослать и закрепить его у пользователей." + ) + + await callback.message.edit_text( + body, + reply_markup=get_pinned_message_keyboard( + db_user.language, + send_before_menu=getattr(pinned_message, "send_before_menu", True), + send_on_every_start=getattr(pinned_message, "send_on_every_start", True), + ), + parse_mode="HTML", + ) + await callback.answer() + + +@admin_required +@error_handler +async def prompt_pinned_message_update( + callback: types.CallbackQuery, + db_user: User, + state: FSMContext, +): + await state.set_state(AdminStates.editing_pinned_message) + await callback.message.edit_text( + "✏️ Новое закрепленное сообщение\n\n" + "Пришлите текст, фото или видео, которое нужно закрепить.\n" + "Бот отправит его всем активным пользователям, открепит старое и закрепит новое без уведомлений.", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [types.InlineKeyboardButton(text="❌ Отмена", callback_data="admin_pinned_message")] + ]), + parse_mode="HTML", + ) + await callback.answer() + + +@admin_required +@error_handler +async def toggle_pinned_message_position( + callback: types.CallbackQuery, + db_user: User, + db: AsyncSession, + state: FSMContext, +): + pinned_message = await get_active_pinned_message(db) + if not pinned_message: + await callback.answer("Сначала задайте закрепленное сообщение", show_alert=True) + return + + pinned_message.send_before_menu = not pinned_message.send_before_menu + pinned_message.updated_at = datetime.utcnow() + await db.commit() + + await show_pinned_message_menu(callback, db_user, db, state) + + +@admin_required +@error_handler +async def toggle_pinned_message_start_mode( + callback: types.CallbackQuery, + db_user: User, + db: AsyncSession, + state: FSMContext, +): + pinned_message = await get_active_pinned_message(db) + if not pinned_message: + await callback.answer("Сначала задайте закрепленное сообщение", show_alert=True) + return + + pinned_message.send_on_every_start = not pinned_message.send_on_every_start + pinned_message.updated_at = datetime.utcnow() + await db.commit() + + await show_pinned_message_menu(callback, db_user, db, state) + + +@admin_required +@error_handler +async def delete_pinned_message( + callback: types.CallbackQuery, + db_user: User, + db: AsyncSession, + state: FSMContext, +): + pinned_message = await get_active_pinned_message(db) + if not pinned_message: + await callback.answer("Закрепленное сообщение уже отсутствует", show_alert=True) + return + + await callback.message.edit_text( + "🗑️ Удаление закрепленного сообщения\n\n" + "Подождите, пока бот открепит сообщение у пользователей...", + parse_mode="HTML", + ) + + unpinned_count, failed_count, deleted = await unpin_active_pinned_message( + callback.bot, + db, + ) + + if not deleted: + await callback.message.edit_text( + "❌ Не удалось найти активное закрепленное сообщение для удаления", + reply_markup=get_admin_messages_keyboard(db_user.language), + parse_mode="HTML", + ) + await state.clear() + return + + total = unpinned_count + failed_count + await callback.message.edit_text( + "✅ Закрепленное сообщение удалено\n\n" + f"👥 Чатов обработано: {total}\n" + f"✅ Откреплено: {unpinned_count}\n" + f"⚠️ Ошибок: {failed_count}\n\n" + "Новое сообщение можно задать кнопкой \"Обновить\".", + reply_markup=get_admin_messages_keyboard(db_user.language), + parse_mode="HTML", + ) + await state.clear() + + +@admin_required +@error_handler +async def process_pinned_message_update( + message: types.Message, + db_user: User, + state: FSMContext, + db: AsyncSession, +): + texts = get_texts(db_user.language) + media_type: Optional[str] = None + media_file_id: Optional[str] = None + + if message.photo: + media_type = "photo" + media_file_id = message.photo[-1].file_id + elif message.video: + media_type = "video" + media_file_id = message.video.file_id + + pinned_text = message.html_text or message.caption_html or message.text or message.caption or "" + + if not pinned_text and not media_file_id: + await message.answer( + texts.t("ADMIN_PINNED_NO_CONTENT", "❌ Не удалось прочитать текст или медиа в сообщении, попробуйте снова.") + ) + return + + try: + pinned_message = await set_active_pinned_message( + db, + pinned_text, + db_user.id, + media_type=media_type, + media_file_id=media_file_id, + ) + except ValueError as validation_error: + await message.answer(f"❌ {validation_error}") + return + + # Сообщение сохранено, спрашиваем о рассылке + from app.keyboards.admin import get_pinned_broadcast_confirm_keyboard + from app.states import AdminStates + + await message.answer( + texts.t( + "ADMIN_PINNED_SAVED_ASK_BROADCAST", + "📌 Сообщение сохранено!\n\n" + "Выберите, как доставить сообщение пользователям:\n\n" + "• Разослать сейчас — отправит и закрепит у всех активных пользователей\n" + "• Только при /start — пользователи увидят при следующем запуске бота", + ), + reply_markup=get_pinned_broadcast_confirm_keyboard(db_user.language, pinned_message.id), + parse_mode="HTML", + ) + await state.set_state(AdminStates.confirming_pinned_broadcast) + + +@admin_required +@error_handler +async def handle_pinned_broadcast_now( + callback: types.CallbackQuery, + db_user: User, + state: FSMContext, + db: AsyncSession, +): + """Разослать закреплённое сообщение сейчас всем пользователям.""" + texts = get_texts(db_user.language) + + # Получаем ID сообщения из callback_data + pinned_message_id = int(callback.data.split(":")[1]) + + # Получаем сообщение из БД + from sqlalchemy import select + from app.database.models import PinnedMessage + + result = await db.execute( + select(PinnedMessage).where(PinnedMessage.id == pinned_message_id) + ) + pinned_message = result.scalar_one_or_none() + + if not pinned_message: + await callback.answer("❌ Сообщение не найдено", show_alert=True) + await state.clear() + return + + await callback.message.edit_text( + texts.t("ADMIN_PINNED_SAVING", "📌 Сообщение сохранено. Начинаю отправку и закрепление у пользователей..."), + parse_mode="HTML", + ) + + sent_count, failed_count = await broadcast_pinned_message( + callback.bot, + db, + pinned_message, + ) + + total = sent_count + failed_count + await callback.message.edit_text( + texts.t( + "ADMIN_PINNED_UPDATED", + "✅ Закрепленное сообщение обновлено\n\n" + "👥 Получателей: {total}\n" + "✅ Отправлено: {sent}\n" + "⚠️ Ошибок: {failed}", + ).format(total=total, sent=sent_count, failed=failed_count), + reply_markup=get_admin_messages_keyboard(db_user.language), + parse_mode="HTML", + ) + await state.clear() + + +@admin_required +@error_handler +async def handle_pinned_broadcast_skip( + callback: types.CallbackQuery, + db_user: User, + state: FSMContext, + db: AsyncSession, +): + """Пропустить рассылку — пользователи увидят при /start.""" + texts = get_texts(db_user.language) + + await callback.message.edit_text( + texts.t( + "ADMIN_PINNED_SAVED_NO_BROADCAST", + "✅ Закрепленное сообщение сохранено\n\n" + "Рассылка не выполнена. Пользователи увидят сообщение при следующем вводе /start.", + ), + reply_markup=get_admin_messages_keyboard(db_user.language), + parse_mode="HTML", + ) + await state.clear() + + @admin_required @error_handler async def show_broadcast_targets( @@ -1295,6 +1598,13 @@ def get_target_display_name(target: str) -> str: def register_handlers(dp: Dispatcher): dp.callback_query.register(show_messages_menu, F.data == "admin_messages") + dp.callback_query.register(show_pinned_message_menu, F.data == "admin_pinned_message") + dp.callback_query.register(toggle_pinned_message_position, F.data == "admin_pinned_message_position") + dp.callback_query.register(toggle_pinned_message_start_mode, F.data == "admin_pinned_message_start_mode") + dp.callback_query.register(delete_pinned_message, F.data == "admin_pinned_message_delete") + dp.callback_query.register(prompt_pinned_message_update, F.data == "admin_pinned_message_edit") + dp.callback_query.register(handle_pinned_broadcast_now, F.data.startswith("admin_pinned_broadcast_now:")) + dp.callback_query.register(handle_pinned_broadcast_skip, F.data.startswith("admin_pinned_broadcast_skip:")) dp.callback_query.register(show_broadcast_targets, F.data.in_(["admin_msg_all", "admin_msg_by_sub"])) dp.callback_query.register(select_broadcast_target, F.data.startswith("broadcast_")) dp.callback_query.register(confirm_broadcast, F.data == "admin_confirm_broadcast") @@ -1312,3 +1622,4 @@ def register_handlers(dp: Dispatcher): dp.callback_query.register(handle_change_media, F.data == "change_media") dp.message.register(process_broadcast_message, AdminStates.waiting_for_broadcast_message) dp.message.register(process_broadcast_media, AdminStates.waiting_for_broadcast_media) + dp.message.register(process_pinned_message_update, AdminStates.editing_pinned_message) diff --git a/app/handlers/common.py b/app/handlers/common.py index 3c8549f2..7d389163 100644 --- a/app/handlers/common.py +++ b/app/handlers/common.py @@ -67,7 +67,7 @@ async def handle_cancel( async def handle_unknown_message( message: types.Message, - db_user: User + db_user: User | None = None, ): texts = get_texts(db_user.language if db_user else "ru") @@ -126,6 +126,8 @@ def register_handlers(dp: Dispatcher): dp.message.register( handle_unknown_message, StateFilter(None), - F.successful_payment.is_(None) + F.successful_payment.is_(None), + F.text.is_not(None), + ~F.text.startswith("/"), ) \ No newline at end of file diff --git a/app/handlers/start.py b/app/handlers/start.py index f2e125d1..026ce92f 100644 --- a/app/handlers/start.py +++ b/app/handlers/start.py @@ -1,5 +1,6 @@ import logging from datetime import datetime +from typing import Optional from aiogram import Dispatcher, types, F, Bot from aiogram.enums import ChatMemberStatus from aiogram.exceptions import TelegramForbiddenError @@ -18,7 +19,7 @@ from app.database.crud.campaign import ( get_campaign_by_start_parameter, get_campaign_by_id, ) -from app.database.models import UserStatus, SubscriptionStatus +from app.database.models import PinnedMessage, SubscriptionStatus, UserStatus from app.keyboards.inline import ( get_rules_keyboard, get_privacy_policy_keyboard, @@ -36,6 +37,10 @@ from app.services.subscription_service import SubscriptionService from app.services.support_settings_service import SupportSettingsService from app.services.main_menu_button_service import MainMenuButtonService from app.services.privacy_policy_service import PrivacyPolicyService +from app.services.pinned_message_service import ( + deliver_pinned_message_to_user, + get_active_pinned_message, +) from app.utils.user_utils import generate_unique_referral_code from app.utils.promo_offer import ( build_promo_offer_hint, @@ -61,6 +66,22 @@ def _calculate_subscription_flags(subscription): return has_active_subscription, subscription_is_active +async def _send_pinned_message( + bot: Bot, + db: AsyncSession, + user, + pinned_message: Optional[PinnedMessage] = None, +) -> None: + try: + await deliver_pinned_message_to_user(bot, db, user, pinned_message) + except Exception as error: # noqa: BLE001 + logger.error( + "Не удалось отправить закрепленное сообщение пользователю %s: %s", + getattr(user, "telegram_id", "unknown"), + error, + ) + + async def _apply_campaign_bonus_if_needed( db: AsyncSession, user, @@ -404,6 +425,11 @@ async def cmd_start(message: types.Message, state: FSMContext, db: AsyncSession, user.subscription ) + pinned_message = await get_active_pinned_message(db) + + if pinned_message and pinned_message.send_before_menu: + await _send_pinned_message(message.bot, db, user, pinned_message) + menu_text = await get_main_menu_text(user, texts, db) is_admin = settings.is_admin(user.telegram_id) @@ -438,6 +464,9 @@ async def cmd_start(message: types.Message, state: FSMContext, db: AsyncSession, reply_markup=keyboard, parse_mode="HTML" ) + + if pinned_message and not pinned_message.send_before_menu: + await _send_pinned_message(message.bot, db, user, pinned_message) await state.clear() return @@ -1094,6 +1123,7 @@ async def complete_registration_from_callback( reply_markup=keyboard, parse_mode="HTML" ) + await _send_pinned_message(callback.bot, db, existing_user) except Exception as e: logger.error(f"Ошибка при показе главного меню существующему пользователю: {e}") await callback.message.answer( @@ -1232,6 +1262,7 @@ async def complete_registration_from_callback( reply_markup=get_post_registration_keyboard(user.language), ) logger.info(f"✅ Приветственное сообщение отправлено пользователю {user.telegram_id}") + await _send_pinned_message(callback.bot, db, user) except Exception as e: logger.error(f"Ошибка при отправке приветственного сообщения: {e}") else: @@ -1277,6 +1308,7 @@ async def complete_registration_from_callback( reply_markup=keyboard, parse_mode="HTML" ) + await _send_pinned_message(callback.bot, db, user) logger.info(f"✅ Главное меню показано пользователю {user.telegram_id}") except Exception as e: logger.error(f"Ошибка при показе главного меню: {e}") @@ -1374,6 +1406,7 @@ async def complete_registration( reply_markup=keyboard, parse_mode="HTML" ) + await _send_pinned_message(message.bot, db, existing_user) except Exception as e: logger.error(f"Ошибка при показе главного меню существующему пользователю: {e}") await message.answer( @@ -1535,6 +1568,7 @@ async def complete_registration( reply_markup=get_post_registration_keyboard(user.language), ) logger.info(f"✅ Приветственное сообщение отправлено пользователю {user.telegram_id}") + await _send_pinned_message(message.bot, db, user) except Exception as e: logger.error(f"Ошибка при отправке приветственного сообщения: {e}") else: @@ -1581,6 +1615,7 @@ async def complete_registration( parse_mode="HTML" ) logger.info(f"✅ Главное меню показано пользователю {user.telegram_id}") + await _send_pinned_message(message.bot, db, user) except Exception as e: logger.error(f"Ошибка при показе главного меню: {e}") await message.answer( @@ -1925,6 +1960,7 @@ async def required_sub_channel_check( reply_markup=keyboard, parse_mode="HTML", ) + await _send_pinned_message(bot, db, user) else: from app.keyboards.inline import get_rules_keyboard diff --git a/app/keyboards/admin.py b/app/keyboards/admin.py index 0ab2cd7f..3bd744cb 100644 --- a/app/keyboards/admin.py +++ b/app/keyboards/admin.py @@ -837,12 +837,91 @@ def get_admin_messages_keyboard(language: str = "ru") -> InlineKeyboardMarkup: callback_data="admin_msg_history" ) ], + [ + InlineKeyboardButton( + text=_t(texts, "ADMIN_PINNED_MESSAGE", "📌 Закрепленное сообщение"), + callback_data="admin_pinned_message", + ) + ], [ InlineKeyboardButton(text=texts.BACK, callback_data="admin_submenu_communications") ] ]) +def get_pinned_message_keyboard( + language: str = "ru", + send_before_menu: bool = True, + send_on_every_start: bool = True, +) -> InlineKeyboardMarkup: + texts = get_texts(language) + + position_label = ( + _t(texts, "ADMIN_PINNED_POSITION_BEFORE", "⬆️ Показать перед меню") + if send_before_menu + else _t(texts, "ADMIN_PINNED_POSITION_AFTER", "⬇️ Показать после меню") + ) + toggle_callback = "admin_pinned_message_position" + + start_mode_label = ( + _t(texts, "ADMIN_PINNED_START_EVERY_TIME", "🔁 Показать при каждом /start") + if send_on_every_start + else _t(texts, "ADMIN_PINNED_START_ONCE", "🚫 Показывать только один раз") + ) + start_mode_callback = "admin_pinned_message_start_mode" + + return InlineKeyboardMarkup(inline_keyboard=[ + [ + InlineKeyboardButton( + text=_t(texts, "ADMIN_PINNED_MESSAGE_UPDATE", "✏️ Обновить"), + callback_data="admin_pinned_message_edit", + ) + ], + [ + InlineKeyboardButton( + text=position_label, + callback_data=toggle_callback, + ) + ], + [ + InlineKeyboardButton( + text=start_mode_label, + callback_data=start_mode_callback, + ) + ], + [ + InlineKeyboardButton( + text=_t(texts, "ADMIN_PINNED_MESSAGE_DELETE", "🗑️ Удалить и отключить"), + callback_data="admin_pinned_message_delete", + ) + ], + [InlineKeyboardButton(text=texts.BACK, callback_data="admin_messages")], + ]) + + +def get_pinned_broadcast_confirm_keyboard( + language: str = "ru", + pinned_message_id: int = 0, +) -> InlineKeyboardMarkup: + """Клавиатура для выбора: разослать сейчас или только при /start.""" + texts = get_texts(language) + + return InlineKeyboardMarkup(inline_keyboard=[ + [ + InlineKeyboardButton( + text=_t(texts, "ADMIN_PINNED_BROADCAST_NOW", "📨 Разослать сейчас всем"), + callback_data=f"admin_pinned_broadcast_now:{pinned_message_id}", + ) + ], + [ + InlineKeyboardButton( + text=_t(texts, "ADMIN_PINNED_BROADCAST_ON_START", "⏳ Только при /start"), + callback_data=f"admin_pinned_broadcast_skip:{pinned_message_id}", + ) + ], + ]) + + def get_admin_monitoring_keyboard(language: str = "ru") -> InlineKeyboardMarkup: texts = get_texts(language) diff --git a/app/localization/locales/en.json b/app/localization/locales/en.json index 1c6bc309..a930438e 100644 --- a/app/localization/locales/en.json +++ b/app/localization/locales/en.json @@ -209,6 +209,20 @@ "ADMIN_MESSAGES_BY_CRITERIA": "🔍 By criteria", "ADMIN_MESSAGES_BY_SUBSCRIPTIONS": "🎯 By subscriptions", "ADMIN_MESSAGES_HISTORY": "📋 History", + "ADMIN_PINNED_MESSAGE": "📌 Pinned message", + "ADMIN_PINNED_MESSAGE_UPDATE": "✏️ Update", + "ADMIN_PINNED_MESSAGE_DELETE": "🗑️ Remove and disable", + "ADMIN_PINNED_POSITION_BEFORE": "⬆️ Send before menu", + "ADMIN_PINNED_POSITION_AFTER": "⬇️ Send after menu", + "ADMIN_PINNED_START_EVERY_TIME": "🔁 Show on every /start", + "ADMIN_PINNED_START_ONCE": "🚫 Show only once", + "ADMIN_PINNED_NO_CONTENT": "❌ Could not read text or media from the message, please try again.", + "ADMIN_PINNED_SAVING": "📌 Message saved. Starting broadcast and pinning for users...", + "ADMIN_PINNED_UPDATED": "✅ Pinned message updated\n\n👥 Recipients: {total}\n✅ Sent: {sent}\n⚠️ Errors: {failed}", + "ADMIN_PINNED_SAVED_ASK_BROADCAST": "📌 Message saved!\n\nChoose how to deliver the message to users:\n\n• Broadcast now — will send and pin for all active users\n• Only on /start — users will see it on next bot launch", + "ADMIN_PINNED_SAVED_NO_BROADCAST": "✅ Pinned message saved\n\nNo broadcast performed. Users will see the message on their next /start.", + "ADMIN_PINNED_BROADCAST_NOW": "📨 Broadcast now to all", + "ADMIN_PINNED_BROADCAST_ON_START": "⏳ Only on /start", "ADMIN_MONITORING": "🔍 Monitoring", "ADMIN_MONITORING_ALL_LOGS": "📋 All logs", "ADMIN_MONITORING_AUTOPAY_SETTINGS": "💳 Auto-pay settings", diff --git a/app/localization/locales/ru.json b/app/localization/locales/ru.json index 64d4729e..d0fe6571 100644 --- a/app/localization/locales/ru.json +++ b/app/localization/locales/ru.json @@ -212,6 +212,20 @@ "ADMIN_MESSAGES_BY_CRITERIA": "🔍 По критериям", "ADMIN_MESSAGES_BY_SUBSCRIPTIONS": "🎯 По подпискам", "ADMIN_MESSAGES_HISTORY": "📋 История", + "ADMIN_PINNED_MESSAGE": "📌 Закрепленное сообщение", + "ADMIN_PINNED_MESSAGE_UPDATE": "✏️ Обновить", + "ADMIN_PINNED_MESSAGE_DELETE": "🗑️ Удалить и отключить", + "ADMIN_PINNED_POSITION_BEFORE": "⬆️ Показать перед меню", + "ADMIN_PINNED_POSITION_AFTER": "⬇️ Показать после меню", + "ADMIN_PINNED_START_EVERY_TIME": "🔁 Показать при каждом /start", + "ADMIN_PINNED_START_ONCE": "🚫 Показывать только один раз", + "ADMIN_PINNED_NO_CONTENT": "❌ Не удалось прочитать текст или медиа в сообщении, попробуйте снова.", + "ADMIN_PINNED_SAVING": "📌 Сообщение сохранено. Начинаю отправку и закрепление у пользователей...", + "ADMIN_PINNED_UPDATED": "✅ Закрепленное сообщение обновлено\n\n👥 Получателей: {total}\n✅ Отправлено: {sent}\n⚠️ Ошибок: {failed}", + "ADMIN_PINNED_SAVED_ASK_BROADCAST": "📌 Сообщение сохранено!\n\nВыберите, как доставить сообщение пользователям:\n\n• Разослать сейчас — отправит и закрепит у всех активных пользователей\n• Только при /start — пользователи увидят при следующем запуске бота", + "ADMIN_PINNED_SAVED_NO_BROADCAST": "✅ Закрепленное сообщение сохранено\n\nРассылка не выполнена. Пользователи увидят сообщение при следующем вводе /start.", + "ADMIN_PINNED_BROADCAST_NOW": "📨 Разослать сейчас всем", + "ADMIN_PINNED_BROADCAST_ON_START": "⏳ Только при /start", "ADMIN_MONITORING": "🔍 Мониторинг", "ADMIN_MONITORING_ALL_LOGS": "📋 Все логи", "ADMIN_MONITORING_AUTOPAY_SETTINGS": "💳 Настройки автооплаты", diff --git a/app/localization/locales/ua.json b/app/localization/locales/ua.json index dadb5a5e..06b0aad0 100644 --- a/app/localization/locales/ua.json +++ b/app/localization/locales/ua.json @@ -138,8 +138,22 @@ "ADMIN_MESSAGES_ALL_USERS": "📨 Всім користувачам", "ADMIN_MESSAGES_BY_CRITERIA": "🔍 За критеріями", "ADMIN_MESSAGES_BY_SUBSCRIPTIONS": "🎯 За підписками", - "ADMIN_MESSAGES_HISTORY": "📋 Історія", - "ADMIN_MONITORING": "🔍 Моніторинг", + "ADMIN_MESSAGES_HISTORY": "📋 Історія", + "ADMIN_PINNED_MESSAGE": "📌 Закріплене повідомлення", + "ADMIN_PINNED_MESSAGE_UPDATE": "✏️ Оновити", + "ADMIN_PINNED_MESSAGE_DELETE": "🗑️ Видалити та вимкнути", + "ADMIN_PINNED_POSITION_BEFORE": "⬆️ Показати перед меню", + "ADMIN_PINNED_POSITION_AFTER": "⬇️ Показати після меню", + "ADMIN_PINNED_START_EVERY_TIME": "🔁 Показувати при кожному /start", + "ADMIN_PINNED_START_ONCE": "🚫 Показувати лише один раз", + "ADMIN_PINNED_NO_CONTENT": "❌ Не вдалося прочитати текст або медіа у повідомленні, спробуйте ще раз.", + "ADMIN_PINNED_SAVING": "📌 Повідомлення збережено. Починаю відправку та закріплення у користувачів...", + "ADMIN_PINNED_UPDATED": "✅ Закріплене повідомлення оновлено\n\n👥 Отримувачів: {total}\n✅ Надіслано: {sent}\n⚠️ Помилок: {failed}", + "ADMIN_PINNED_SAVED_ASK_BROADCAST": "📌 Повідомлення збережено!\n\nОберіть, як доставити повідомлення користувачам:\n\n• Розіслати зараз — відправить і закріпить у всіх активних користувачів\n• Тільки при /start — користувачі побачать при наступному запуску бота", + "ADMIN_PINNED_SAVED_NO_BROADCAST": "✅ Закріплене повідомлення збережено\n\nРозсилка не виконана. Користувачі побачать повідомлення при наступному введенні /start.", + "ADMIN_PINNED_BROADCAST_NOW": "📨 Розіслати зараз всім", + "ADMIN_PINNED_BROADCAST_ON_START": "⏳ Тільки при /start", + "ADMIN_MONITORING": "🔍 Моніторинг", "ADMIN_MONITORING_ALL_LOGS": "📋 Всі логи", "ADMIN_MONITORING_AUTOPAY_SETTINGS": "💳 Налаштування автооплати", "ADMIN_MONITORING_AUTO_CLEANUP": "🧹 Автоочищення логів", diff --git a/app/localization/locales/zh.json b/app/localization/locales/zh.json index 44124ed5..bbeb725e 100644 --- a/app/localization/locales/zh.json +++ b/app/localization/locales/zh.json @@ -138,6 +138,20 @@ "ADMIN_MESSAGES_BY_CRITERIA":"🔍按条件", "ADMIN_MESSAGES_BY_SUBSCRIPTIONS":"🎯按订阅", "ADMIN_MESSAGES_HISTORY":"📋历史记录", +"ADMIN_PINNED_MESSAGE":"📌置顶消息", +"ADMIN_PINNED_MESSAGE_UPDATE":"✏️更新", +"ADMIN_PINNED_MESSAGE_DELETE":"🗑️删除并停用", +"ADMIN_PINNED_POSITION_BEFORE":"⬆️菜单前发送", +"ADMIN_PINNED_POSITION_AFTER":"⬇️菜单后发送", +"ADMIN_PINNED_START_EVERY_TIME":"🔁 每次 /start 时发送", +"ADMIN_PINNED_START_ONCE":"🚫 仅发送一次", +"ADMIN_PINNED_NO_CONTENT":"❌ 无法读取消息中的文本或媒体,请重试。", +"ADMIN_PINNED_SAVING":"📌 消息已保存。开始向用户发送并置顶...", +"ADMIN_PINNED_UPDATED":"✅ 置顶消息已更新\n\n👥 收件人: {total}\n✅ 已发送: {sent}\n⚠️ 错误: {failed}", +"ADMIN_PINNED_SAVED_ASK_BROADCAST":"📌 消息已保存!\n\n选择如何向用户发送消息:\n\n• 立即广播 — 将发送并置顶给所有活跃用户\n• 仅在 /start 时 — 用户将在下次启动机器人时看到", +"ADMIN_PINNED_SAVED_NO_BROADCAST":"✅ 置顶消息已保存\n\n未执行广播。用户将在下次输入 /start 时看到消息。", +"ADMIN_PINNED_BROADCAST_NOW":"📨 立即广播给所有人", +"ADMIN_PINNED_BROADCAST_ON_START":"⏳ 仅在 /start 时", "ADMIN_MONITORING":"🔍监控", "ADMIN_MONITORING_ALL_LOGS":"📋所有日志", "ADMIN_MONITORING_AUTOPAY_SETTINGS":"💳自动支付设置", diff --git a/app/services/pinned_message_service.py b/app/services/pinned_message_service.py new file mode 100644 index 00000000..4a00f32f --- /dev/null +++ b/app/services/pinned_message_service.py @@ -0,0 +1,346 @@ +import asyncio +import logging +from datetime import datetime +from typing import Optional + +from aiogram import Bot +from aiogram.exceptions import ( + TelegramBadRequest, + TelegramForbiddenError, + TelegramRetryAfter, +) +from sqlalchemy import select, update +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database.crud.user import get_users_list +from app.database.database import AsyncSessionLocal +from app.database.models import PinnedMessage, User, UserStatus +from app.utils.validators import sanitize_html, validate_html_tags + +logger = logging.getLogger(__name__) + + +async def get_active_pinned_message(db: AsyncSession) -> Optional[PinnedMessage]: + result = await db.execute( + select(PinnedMessage) + .where(PinnedMessage.is_active.is_(True)) + .order_by(PinnedMessage.created_at.desc()) + .limit(1) + ) + return result.scalar_one_or_none() + + +async def set_active_pinned_message( + db: AsyncSession, + content: str, + created_by: Optional[int] = None, + media_type: Optional[str] = None, + media_file_id: Optional[str] = None, + send_before_menu: Optional[bool] = None, + send_on_every_start: Optional[bool] = None, +) -> PinnedMessage: + sanitized_content = sanitize_html(content or "") + is_valid, error_message = validate_html_tags(sanitized_content) + if not is_valid: + raise ValueError(error_message) + + if media_type not in {None, "photo", "video"}: + raise ValueError("Поддерживаются только фото или видео в закрепленном сообщении") + + if created_by is not None: + creator_id = await db.scalar(select(User.id).where(User.id == created_by)) + else: + creator_id = None + + previous_active = await get_active_pinned_message(db) + + await db.execute( + update(PinnedMessage) + .where(PinnedMessage.is_active.is_(True)) + .values(is_active=False) + ) + + pinned_message = PinnedMessage( + content=sanitized_content, + media_type=media_type, + media_file_id=media_file_id, + is_active=True, + created_by=creator_id, + send_before_menu=( + send_before_menu + if send_before_menu is not None + else getattr(previous_active, "send_before_menu", True) + ), + send_on_every_start=( + send_on_every_start + if send_on_every_start is not None + else getattr(previous_active, "send_on_every_start", True) + ), + ) + + db.add(pinned_message) + await db.commit() + await db.refresh(pinned_message) + + logger.info("Создано новое закрепленное сообщение #%s", pinned_message.id) + return pinned_message + + +async def deactivate_active_pinned_message(db: AsyncSession) -> Optional[PinnedMessage]: + pinned_message = await get_active_pinned_message(db) + if not pinned_message: + return None + + pinned_message.is_active = False + pinned_message.updated_at = datetime.utcnow() + await db.commit() + await db.refresh(pinned_message) + logger.info("Деактивировано закрепленное сообщение #%s", pinned_message.id) + return pinned_message + + +async def deliver_pinned_message_to_user( + bot: Bot, + db: AsyncSession, + user: User, + pinned_message: Optional[PinnedMessage] = None, +) -> bool: + pinned_message = pinned_message or await get_active_pinned_message(db) + if not pinned_message: + return False + + if not pinned_message.send_on_every_start: + last_pinned_id = getattr(user, "last_pinned_message_id", None) + if last_pinned_id == pinned_message.id: + return False + + success = await _send_and_pin_message(bot, user.telegram_id, pinned_message) + if success: + await _mark_pinned_delivery(user_id=getattr(user, "id", None), pinned_message_id=pinned_message.id) + return success + + +async def broadcast_pinned_message( + bot: Bot, + db: AsyncSession, + pinned_message: PinnedMessage, +) -> tuple[int, int]: + users: list[User] = [] + offset = 0 + batch_size = 5000 + + while True: + batch = await get_users_list( + db, + offset=offset, + limit=batch_size, + status=UserStatus.ACTIVE, + ) + + if not batch: + break + + users.extend(batch) + offset += batch_size + + sent_count = 0 + failed_count = 0 + semaphore = asyncio.Semaphore(3) + + async def send_to_user(user: User) -> None: + nonlocal sent_count, failed_count + async with semaphore: + for attempt in range(3): + try: + success = await _send_and_pin_message( + bot, + user.telegram_id, + pinned_message, + ) + if success: + sent_count += 1 + else: + failed_count += 1 + break + except TelegramRetryAfter as retry_error: + delay = min(retry_error.retry_after + 1, 30) + logger.warning( + "RetryAfter for user %s, waiting %s seconds", + user.telegram_id, + delay, + ) + await asyncio.sleep(delay) + except Exception as send_error: # noqa: BLE001 + logger.error( + "Ошибка отправки закрепленного сообщения пользователю %s: %s", + user.telegram_id, + send_error, + ) + failed_count += 1 + break + + for i in range(0, len(users), 30): + batch = users[i : i + 30] + tasks = [send_to_user(user) for user in batch] + await asyncio.gather(*tasks) + await asyncio.sleep(0.05) + + return sent_count, failed_count + + +async def unpin_active_pinned_message( + bot: Bot, + db: AsyncSession, +) -> tuple[int, int, bool]: + pinned_message = await deactivate_active_pinned_message(db) + if not pinned_message: + return 0, 0, False + + users: list[User] = [] + offset = 0 + batch_size = 5000 + + while True: + batch = await get_users_list( + db, + offset=offset, + limit=batch_size, + status=UserStatus.ACTIVE, + ) + + if not batch: + break + + users.extend(batch) + offset += batch_size + + unpinned_count = 0 + failed_count = 0 + semaphore = asyncio.Semaphore(5) + + async def unpin_for_user(user: User) -> None: + nonlocal unpinned_count, failed_count + async with semaphore: + try: + success = await _unpin_message_for_user(bot, user.telegram_id) + if success: + unpinned_count += 1 + else: + failed_count += 1 + except TelegramRetryAfter as retry_error: + delay = min(retry_error.retry_after + 1, 30) + logger.warning( + "RetryAfter while unpinning for user %s, waiting %s seconds", + user.telegram_id, + delay, + ) + await asyncio.sleep(delay) + await unpin_for_user(user) + except Exception as error: # noqa: BLE001 + logger.error( + "Ошибка открепления сообщения у пользователя %s: %s", + user.telegram_id, + error, + ) + failed_count += 1 + + for i in range(0, len(users), 40): + batch = users[i : i + 40] + tasks = [unpin_for_user(user) for user in batch] + await asyncio.gather(*tasks) + await asyncio.sleep(0.05) + + return unpinned_count, failed_count, True + + +async def _mark_pinned_delivery( + user_id: Optional[int], + pinned_message_id: int, +) -> None: + if not user_id: + return + + async with AsyncSessionLocal() as session: + await session.execute( + update(User) + .where(User.id == user_id) + .values( + last_pinned_message_id=pinned_message_id, + updated_at=datetime.utcnow(), + ) + ) + await session.commit() + + +async def _send_and_pin_message(bot: Bot, chat_id: int, pinned_message: PinnedMessage) -> bool: + try: + await bot.unpin_all_chat_messages(chat_id=chat_id) + except TelegramBadRequest: + pass + except TelegramForbiddenError: + return False + + try: + if pinned_message.media_type == "photo" and pinned_message.media_file_id: + sent_message = await bot.send_photo( + chat_id=chat_id, + photo=pinned_message.media_file_id, + caption=pinned_message.content or None, + parse_mode="HTML" if pinned_message.content else None, + disable_notification=True, + ) + elif pinned_message.media_type == "video" and pinned_message.media_file_id: + sent_message = await bot.send_video( + chat_id=chat_id, + video=pinned_message.media_file_id, + caption=pinned_message.content or None, + parse_mode="HTML" if pinned_message.content else None, + disable_notification=True, + ) + else: + sent_message = await bot.send_message( + chat_id=chat_id, + text=pinned_message.content, + parse_mode="HTML", + disable_web_page_preview=True, + disable_notification=True, + ) + await bot.pin_chat_message( + chat_id=chat_id, + message_id=sent_message.message_id, + disable_notification=True, + ) + return True + except TelegramForbiddenError: + return False + except TelegramBadRequest as error: + logger.warning( + "Некорректный запрос при отправке закрепленного сообщения в чат %s: %s", + chat_id, + error, + ) + except Exception as error: # noqa: BLE001 + logger.error( + "Не удалось отправить закрепленное сообщение пользователю %s: %s", + chat_id, + error, + ) + + return False + + +async def _unpin_message_for_user(bot: Bot, chat_id: int) -> bool: + try: + await bot.unpin_all_chat_messages(chat_id=chat_id) + return True + except TelegramForbiddenError: + return False + except TelegramBadRequest: + return False + except Exception as error: # noqa: BLE001 + logger.error( + "Не удалось открепить сообщение у пользователя %s: %s", + chat_id, + error, + ) + return False diff --git a/app/services/remnawave_service.py b/app/services/remnawave_service.py index ab644fb0..5c1b9457 100644 --- a/app/services/remnawave_service.py +++ b/app/services/remnawave_service.py @@ -1142,12 +1142,14 @@ class RemnaWaveService: async with self.get_api_client() as api: panel_users = [] start = 0 - size = 100 + size = 500 # Увеличен размер батча для ускорения загрузки while True: logger.info(f"📥 Загружаем пользователей: start={start}, size={size}") - - response = await api.get_all_users(start=start, size=size, enrich_happ_links=True) + + # enrich_happ_links=False - happ_crypto_link уже возвращается API в поле happ.cryptoLink + # Не делаем дополнительные HTTP-запросы для каждого пользователя + response = await api.get_all_users(start=start, size=size, enrich_happ_links=False) users_batch = response['users'] total_users = response['total'] @@ -1370,20 +1372,40 @@ class RemnaWaveService: processed_count = 0 cleanup_uuid_mutations: List[_UUIDMapMutation] = [] - for telegram_id, db_user in bot_users_by_telegram_id.items(): - if telegram_id not in panel_telegram_ids and hasattr(db_user, 'subscription') and db_user.subscription: + # Собираем список пользователей для деактивации + users_to_deactivate = [ + (telegram_id, db_user) + for telegram_id, db_user in bot_users_by_telegram_id.items() + if telegram_id not in panel_telegram_ids + and hasattr(db_user, 'subscription') + and db_user.subscription + ] + + if users_to_deactivate: + logger.info(f"📊 Найдено {len(users_to_deactivate)} пользователей для деактивации") + + # Используем один API клиент для всех операций сброса HWID + hwid_api_client = None + try: + hwid_api_client = self.get_api_client() + await hwid_api_client.__aenter__() + except Exception as api_init_error: + logger.warning(f"⚠️ Не удалось создать API клиент для сброса HWID: {api_init_error}") + hwid_api_client = None + + try: + for telegram_id, db_user in users_to_deactivate: cleanup_mutation: Optional[_UUIDMapMutation] = None try: logger.info(f"🗑️ Деактивация подписки пользователя {telegram_id} (нет в панели)") subscription = db_user.subscription - - if db_user.remnawave_uuid: + + if db_user.remnawave_uuid and hwid_api_client: try: - async with self.get_api_client() as api: - devices_reset = await api.reset_user_devices(db_user.remnawave_uuid) - if devices_reset: - logger.info(f"🔧 Сброшены HWID устройства для пользователя {telegram_id}") + devices_reset = await hwid_api_client.reset_user_devices(db_user.remnawave_uuid) + if devices_reset: + logger.info(f"🔧 Сброшены HWID устройства для пользователя {telegram_id}") except Exception as hwid_error: logger.error(f"❌ Ошибка сброса HWID устройств для {telegram_id}: {hwid_error}") @@ -1459,21 +1481,26 @@ class RemnaWaveService: cleanup_uuid_mutations.clear() stats["errors"] += batch_size break # Прерываем цикл при ошибке коммита - else: - # Увеличиваем счетчик для отслеживания прогресса - processed_count += 1 - # Коммитим оставшиеся изменения - try: - await db.commit() - cleanup_uuid_mutations.clear() - except Exception as final_commit_error: - logger.error(f"❌ Ошибка финального коммита при деактивации: {final_commit_error}") - await db.rollback() - for mutation in reversed(cleanup_uuid_mutations): - mutation.rollback() - cleanup_uuid_mutations.clear() - + # Коммитим оставшиеся изменения + try: + await db.commit() + cleanup_uuid_mutations.clear() + except Exception as final_commit_error: + logger.error(f"❌ Ошибка финального коммита при деактивации: {final_commit_error}") + await db.rollback() + for mutation in reversed(cleanup_uuid_mutations): + mutation.rollback() + cleanup_uuid_mutations.clear() + + finally: + # Закрываем API клиент + if hwid_api_client: + try: + await hwid_api_client.__aexit__(None, None, None) + except Exception: + pass + logger.info(f"🎯 Синхронизация завершена: создано {stats['created']}, обновлено {stats['updated']}, деактивировано {stats['deleted']}, ошибок {stats['errors']}") return stats @@ -1677,91 +1704,120 @@ class RemnaWaveService: try: stats = {"created": 0, "updated": 0, "errors": 0} - batch_size = 100 + batch_size = 500 # Увеличен для ускорения offset = 0 + concurrent_limit = 10 # Параллельные запросы к API async with self.get_api_client() as api: + semaphore = asyncio.Semaphore(concurrent_limit) + while True: users = await get_users_list(db, offset=offset, limit=batch_size) if not users: break - for user in users: - if not user.subscription: - continue + # Фильтруем пользователей с подписками и готовим данные + users_with_subscriptions = [u for u in users if u.subscription] - try: - subscription = user.subscription - hwid_limit = resolve_hwid_device_limit_for_payload(subscription) + if not users_with_subscriptions: + if len(users) < batch_size: + break + offset += batch_size + continue - expire_at = self._safe_expire_at_for_panel(subscription.end_date) - status = UserStatus.ACTIVE if subscription.is_active else UserStatus.DISABLED + # Подготавливаем задачи для параллельного выполнения + async def process_user(user): + async with semaphore: + try: + subscription = user.subscription + hwid_limit = resolve_hwid_device_limit_for_payload(subscription) + expire_at = self._safe_expire_at_for_panel(subscription.end_date) - username = settings.format_remnawave_username( - full_name=user.full_name, - username=user.username, - telegram_id=user.telegram_id, - ) + # Определяем статус для панели + is_subscription_active = ( + subscription.status in ( + SubscriptionStatus.ACTIVE.value, + SubscriptionStatus.TRIAL.value, + ) + and subscription.end_date > datetime.utcnow() + ) + status = UserStatus.ACTIVE if is_subscription_active else UserStatus.DISABLED - create_kwargs = dict( - username=username, - expire_at=expire_at, - status=status, - traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, - traffic_limit_strategy=TrafficLimitStrategy.MONTH, - telegram_id=user.telegram_id, - description=settings.format_remnawave_user_description( + username = settings.format_remnawave_username( full_name=user.full_name, username=user.username, - telegram_id=user.telegram_id - ), - active_internal_squads=subscription.connected_squads, - ) + telegram_id=user.telegram_id, + ) - if hwid_limit is not None: - create_kwargs['hwid_device_limit'] = hwid_limit - - if user.remnawave_uuid: - update_kwargs = dict( - uuid=user.remnawave_uuid, - status=status, + create_kwargs = dict( + username=username, expire_at=expire_at, - traffic_limit_bytes=create_kwargs['traffic_limit_bytes'], + status=status, + traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0, traffic_limit_strategy=TrafficLimitStrategy.MONTH, - description=create_kwargs['description'], + telegram_id=user.telegram_id, + description=settings.format_remnawave_user_description( + full_name=user.full_name, + username=user.username, + telegram_id=user.telegram_id + ), active_internal_squads=subscription.connected_squads, ) if hwid_limit is not None: - update_kwargs['hwid_device_limit'] = hwid_limit + create_kwargs['hwid_device_limit'] = hwid_limit - try: - await api.update_user(**update_kwargs) - stats["updated"] += 1 - except RemnaWaveAPIError as api_error: - if api_error.status_code == 404: - logger.warning( - "⚠️ Не найден пользователь %s в панели, создаем заново", - user.remnawave_uuid, - ) + if user.remnawave_uuid: + update_kwargs = dict( + uuid=user.remnawave_uuid, + status=status, + expire_at=expire_at, + traffic_limit_bytes=create_kwargs['traffic_limit_bytes'], + traffic_limit_strategy=TrafficLimitStrategy.MONTH, + description=create_kwargs['description'], + active_internal_squads=subscription.connected_squads, + ) - new_user = await api.create_user(**create_kwargs) - user.remnawave_uuid = new_user.uuid - subscription.remnawave_short_uuid = new_user.short_uuid - stats["created"] += 1 - else: - raise - else: - new_user = await api.create_user(**create_kwargs) + if hwid_limit is not None: + update_kwargs['hwid_device_limit'] = hwid_limit + try: + await api.update_user(**update_kwargs) + return ("updated", user, None) + except RemnaWaveAPIError as api_error: + if api_error.status_code == 404: + new_user = await api.create_user(**create_kwargs) + return ("created", user, new_user) + else: + raise + else: + new_user = await api.create_user(**create_kwargs) + return ("created", user, new_user) + + except Exception as e: + logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") + return ("error", user, None) + + # Выполняем параллельно + tasks = [process_user(user) for user in users_with_subscriptions] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Обрабатываем результаты + for result in results: + if isinstance(result, Exception): + stats["errors"] += 1 + continue + + action, user, new_user = result + if action == "created": + if new_user: user.remnawave_uuid = new_user.uuid - subscription.remnawave_short_uuid = new_user.short_uuid - - stats["created"] += 1 - - except Exception as e: - logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}") + user.subscription.remnawave_short_uuid = new_user.short_uuid + stats["created"] += 1 + elif action == "updated": + stats["updated"] += 1 + else: stats["errors"] += 1 try: @@ -1772,7 +1828,12 @@ class RemnaWaveService: commit_error, ) await db.rollback() - stats["errors"] += len(users) + stats["errors"] += len(users_with_subscriptions) + + logger.info( + f"📦 Обработано {offset + len(users)} пользователей: " + f"создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}" + ) if len(users) < batch_size: break @@ -1783,7 +1844,7 @@ class RemnaWaveService: f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}" ) return stats - + except Exception as e: logger.error(f"Ошибка синхронизации пользователей в панель: {e}") return {"created": 0, "updated": 0, "errors": 1} diff --git a/app/states.py b/app/states.py index 795a6d67..61676d9e 100644 --- a/app/states.py +++ b/app/states.py @@ -134,6 +134,8 @@ class AdminStates(StatesGroup): creating_server_country = State() editing_welcome_text = State() + editing_pinned_message = State() + confirming_pinned_broadcast = State() waiting_for_message_buttons = "waiting_for_message_buttons" editing_promo_offer_message = State() diff --git a/app/utils/validators.py b/app/utils/validators.py index a4294a7d..fe65c273 100644 --- a/app/utils/validators.py +++ b/app/utils/validators.py @@ -4,14 +4,17 @@ from datetime import datetime import html ALLOWED_HTML_TAGS = { - 'b', 'strong', - 'i', 'em', - 'u', 'ins', - 's', 'strike', 'del', - 'code', - 'pre', - 'a', - 'blockquote' + 'b', 'strong', # жирный + 'i', 'em', # курсив + 'u', 'ins', # подчёркнутый + 's', 'strike', 'del', # зачёркнутый + 'code', # моноширинный + 'pre', # блок кода + 'a', # ссылка + 'blockquote', # цитата + 'tg-spoiler', # спойлер + 'tg-emoji', # кастомный эмодзи + 'span', # для class="tg-spoiler" } SELF_CLOSING_TAGS = { @@ -276,14 +279,16 @@ def fix_html_tags(text: str) -> str: def get_html_help_text() -> str: return """Поддерживаемые HTML теги: -• <b>жирный</b> или <strong>жирный</strong> -• <i>курсив</i> или <em>курсив</em> -• <u>подчеркнутый</u> -• <s>зачеркнутый</s> +• <b>жирный</b> или <strong></strong> +• <i>курсив</i> или <em></em> +• <u>подчёркнутый</u> +• <s>зачёркнутый</s><code>моноширинный</code><pre>блок кода</pre><a href="url">ссылка</a><blockquote>цитата</blockquote> +• <tg-spoiler>спойлер</tg-spoiler> +• <tg-emoji emoji-id="123">😀</tg-emoji> ⚠️ Важные правила: • Каждый открывающий тег должен быть закрыт @@ -292,11 +297,9 @@ def get_html_help_text() -> str: ❌ Неправильно: <b>жирный <i>курсив</b></i> -<a href=google.com>ссылка</a> ✅ Правильно: -<b>жирный <i>курсив</i></b> -<a href="https://google.com">ссылка</a>""" +<b>жирный <i>курсив</i></b>""" def validate_rules_content(text: str) -> Tuple[bool, str, Optional[str]]: diff --git a/app/webapi/app.py b/app/webapi/app.py index e4d5d372..c3745a48 100644 --- a/app/webapi/app.py +++ b/app/webapi/app.py @@ -18,6 +18,7 @@ from .routes import ( menu_layout, miniapp, partners, + pinned_messages, polls, promocodes, promo_groups, @@ -145,6 +146,13 @@ OPENAPI_TAGS = [ "name": "contests", "description": "Управление конкурсами: реферальными и ежедневными играми/раундами.", }, + { + "name": "pinned-messages", + "description": ( + "Управление закреплёнными сообщениями: создание, обновление, рассылка и " + "настройка показа при /start." + ), + }, ] @@ -224,6 +232,11 @@ def create_web_api_app() -> FastAPI: app.include_router(partners.router, prefix="/partners", tags=["partners"]) app.include_router(polls.router, prefix="/polls", tags=["polls"]) app.include_router(logs.router, prefix="/logs", tags=["logs"]) + app.include_router( + pinned_messages.router, + prefix="/pinned-messages", + tags=["pinned-messages"], + ) app.include_router( subscription_events.router, prefix="/notifications/subscriptions", diff --git a/app/webapi/routes/__init__.py b/app/webapi/routes/__init__.py index e492a4f5..f3426197 100644 --- a/app/webapi/routes/__init__.py +++ b/app/webapi/routes/__init__.py @@ -5,6 +5,7 @@ from . import ( media, miniapp, partners, + pinned_messages, polls, promo_offers, user_messages, @@ -31,6 +32,7 @@ __all__ = [ "media", "miniapp", "partners", + "pinned_messages", "polls", "promo_offers", "user_messages", diff --git a/app/webapi/routes/pinned_messages.py b/app/webapi/routes/pinned_messages.py new file mode 100644 index 00000000..d4b53a05 --- /dev/null +++ b/app/webapi/routes/pinned_messages.py @@ -0,0 +1,377 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any, Optional + +from aiogram import Bot +from aiogram.client.default import DefaultBotProperties +from aiogram.enums import ParseMode +from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy import func, select, update +from sqlalchemy.ext.asyncio import AsyncSession + +from app.config import settings +from app.database.models import PinnedMessage +from app.services.pinned_message_service import ( + broadcast_pinned_message, + deactivate_active_pinned_message, + get_active_pinned_message, + set_active_pinned_message, + unpin_active_pinned_message, +) + +from ..dependencies import get_db_session, require_api_token +from ..schemas.pinned_messages import ( + PinnedMessageBroadcastResponse, + PinnedMessageCreateRequest, + PinnedMessageListResponse, + PinnedMessageResponse, + PinnedMessageSettingsRequest, + PinnedMessageUnpinResponse, + PinnedMessageUpdateRequest, +) + +router = APIRouter() + + +def _serialize_pinned_message(msg: PinnedMessage) -> PinnedMessageResponse: + return PinnedMessageResponse( + id=msg.id, + content=msg.content, + media_type=msg.media_type, + media_file_id=msg.media_file_id, + send_before_menu=msg.send_before_menu, + send_on_every_start=msg.send_on_every_start, + is_active=msg.is_active, + created_by=msg.created_by, + created_at=msg.created_at, + updated_at=msg.updated_at, + ) + + +def _get_bot() -> Bot: + """Создать экземпляр бота для API операций.""" + return Bot( + token=settings.BOT_TOKEN, + default=DefaultBotProperties(parse_mode=ParseMode.HTML), + ) + + +@router.get("", response_model=PinnedMessageListResponse) +async def list_pinned_messages( + limit: int = Query(20, ge=1, le=100), + offset: int = Query(0, ge=0), + active_only: bool = Query(False), + token: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> PinnedMessageListResponse: + """Получить список всех закреплённых сообщений.""" + query = select(PinnedMessage).order_by(PinnedMessage.created_at.desc()) + count_query = select(func.count(PinnedMessage.id)) + + if active_only: + query = query.where(PinnedMessage.is_active.is_(True)) + count_query = count_query.where(PinnedMessage.is_active.is_(True)) + + total = await db.scalar(count_query) or 0 + result = await db.execute(query.offset(offset).limit(limit)) + items = result.scalars().all() + + return PinnedMessageListResponse( + items=[_serialize_pinned_message(msg) for msg in items], + total=total, + limit=limit, + offset=offset, + ) + + +@router.get("/active", response_model=Optional[PinnedMessageResponse]) +async def get_active_message( + token: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> Optional[PinnedMessageResponse]: + """Получить текущее активное закреплённое сообщение.""" + msg = await get_active_pinned_message(db) + if not msg: + return None + return _serialize_pinned_message(msg) + + +@router.get("/{message_id}", response_model=PinnedMessageResponse) +async def get_pinned_message( + message_id: int, + token: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> PinnedMessageResponse: + """Получить закреплённое сообщение по ID.""" + result = await db.execute( + select(PinnedMessage).where(PinnedMessage.id == message_id) + ) + msg = result.scalar_one_or_none() + if not msg: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Pinned message not found") + return _serialize_pinned_message(msg) + + +@router.post("", response_model=PinnedMessageBroadcastResponse, status_code=status.HTTP_201_CREATED) +async def create_pinned_message( + payload: PinnedMessageCreateRequest, + broadcast: bool = Query(False, description="Разослать сообщение всем пользователям (по умолчанию False — только при /start)"), + token: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> PinnedMessageBroadcastResponse: + """ + Создать новое закреплённое сообщение. + + Автоматически деактивирует предыдущее активное сообщение. + - broadcast=False (по умолчанию): пользователи увидят при следующем /start + - broadcast=True: рассылает сообщение всем активным пользователям сразу + """ + content = payload.content.strip() + if not content and not payload.media: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "Either content or media must be provided" + ) + + media_type = payload.media.type if payload.media else None + media_file_id = payload.media.file_id if payload.media else None + + try: + msg = await set_active_pinned_message( + db=db, + content=content, + created_by=None, + media_type=media_type, + media_file_id=media_file_id, + send_before_menu=payload.send_before_menu, + send_on_every_start=payload.send_on_every_start, + ) + except ValueError as e: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) + + sent_count = 0 + failed_count = 0 + + if broadcast: + sent_count, failed_count = await broadcast_pinned_message(_get_bot(), db, msg) + + return PinnedMessageBroadcastResponse( + message=_serialize_pinned_message(msg), + sent_count=sent_count, + failed_count=failed_count, + ) + + +@router.patch("/{message_id}", response_model=PinnedMessageResponse) +async def update_pinned_message( + message_id: int, + payload: PinnedMessageUpdateRequest, + token: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> PinnedMessageResponse: + """ + Обновить закреплённое сообщение. + + Можно обновить контент, медиа и настройки показа. + Не делает рассылку — для рассылки используйте POST /pinned-messages/{id}/broadcast. + """ + result = await db.execute( + select(PinnedMessage).where(PinnedMessage.id == message_id) + ) + msg = result.scalar_one_or_none() + if not msg: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Pinned message not found") + + if payload.content is not None: + from app.utils.validators import sanitize_html, validate_html_tags + sanitized = sanitize_html(payload.content) + is_valid, error = validate_html_tags(sanitized) + if not is_valid: + raise HTTPException(status.HTTP_400_BAD_REQUEST, error) + msg.content = sanitized + + if payload.media is not None: + if payload.media.type not in ("photo", "video"): + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + "Only photo or video media types are supported" + ) + msg.media_type = payload.media.type + msg.media_file_id = payload.media.file_id + + if payload.send_before_menu is not None: + msg.send_before_menu = payload.send_before_menu + + if payload.send_on_every_start is not None: + msg.send_on_every_start = payload.send_on_every_start + + msg.updated_at = datetime.utcnow() + await db.commit() + await db.refresh(msg) + + return _serialize_pinned_message(msg) + + +@router.patch("/{message_id}/settings", response_model=PinnedMessageResponse) +async def update_pinned_message_settings( + message_id: int, + payload: PinnedMessageSettingsRequest, + token: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> PinnedMessageResponse: + """ + Обновить только настройки закреплённого сообщения. + + - send_before_menu: показывать до или после меню + - send_on_every_start: показывать при каждом /start или только один раз + """ + result = await db.execute( + select(PinnedMessage).where(PinnedMessage.id == message_id) + ) + msg = result.scalar_one_or_none() + if not msg: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Pinned message not found") + + if payload.send_before_menu is not None: + msg.send_before_menu = payload.send_before_menu + + if payload.send_on_every_start is not None: + msg.send_on_every_start = payload.send_on_every_start + + msg.updated_at = datetime.utcnow() + await db.commit() + await db.refresh(msg) + + return _serialize_pinned_message(msg) + + +@router.post("/{message_id}/activate", response_model=PinnedMessageBroadcastResponse) +async def activate_pinned_message( + message_id: int, + broadcast: bool = Query(False, description="Разослать сообщение всем пользователям (по умолчанию False — только при /start)"), + token: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> PinnedMessageBroadcastResponse: + """ + Активировать закреплённое сообщение. + + Деактивирует текущее активное сообщение и активирует указанное. + - broadcast=False (по умолчанию): пользователи увидят при следующем /start + - broadcast=True: рассылает сообщение всем активным пользователям сразу + """ + result = await db.execute( + select(PinnedMessage).where(PinnedMessage.id == message_id) + ) + msg = result.scalar_one_or_none() + if not msg: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Pinned message not found") + + # Деактивируем все активные + await db.execute( + update(PinnedMessage) + .where(PinnedMessage.is_active.is_(True)) + .values(is_active=False, updated_at=datetime.utcnow()) + ) + + # Активируем указанное + msg.is_active = True + msg.updated_at = datetime.utcnow() + await db.commit() + await db.refresh(msg) + + sent_count = 0 + failed_count = 0 + + if broadcast: + sent_count, failed_count = await broadcast_pinned_message(_get_bot(), db, msg) + + return PinnedMessageBroadcastResponse( + message=_serialize_pinned_message(msg), + sent_count=sent_count, + failed_count=failed_count, + ) + + +@router.post("/{message_id}/broadcast", response_model=PinnedMessageBroadcastResponse) +async def broadcast_message( + message_id: int, + token: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> PinnedMessageBroadcastResponse: + """ + Разослать закреплённое сообщение всем активным пользователям. + + Работает для любого сообщения, независимо от его статуса активности. + """ + result = await db.execute( + select(PinnedMessage).where(PinnedMessage.id == message_id) + ) + msg = result.scalar_one_or_none() + if not msg: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Pinned message not found") + + sent_count, failed_count = await broadcast_pinned_message(_get_bot(), db, msg) + + return PinnedMessageBroadcastResponse( + message=_serialize_pinned_message(msg), + sent_count=sent_count, + failed_count=failed_count, + ) + + +@router.post("/active/deactivate", response_model=Optional[PinnedMessageResponse]) +async def deactivate_active_message( + token: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> Optional[PinnedMessageResponse]: + """ + Деактивировать текущее активное закреплённое сообщение. + + Не удаляет сообщение и не открепляет у пользователей. + """ + msg = await deactivate_active_pinned_message(db) + if not msg: + return None + return _serialize_pinned_message(msg) + + +@router.post("/active/unpin", response_model=PinnedMessageUnpinResponse) +async def unpin_active_message( + token: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> PinnedMessageUnpinResponse: + """ + Открепить сообщение у всех пользователей и деактивировать. + + Удаляет закреплённое сообщение из чатов всех активных пользователей. + """ + unpinned_count, failed_count, was_active = await unpin_active_pinned_message(_get_bot(), db) + return PinnedMessageUnpinResponse( + unpinned_count=unpinned_count, + failed_count=failed_count, + was_active=was_active, + ) + + +@router.delete("/{message_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None) +async def delete_pinned_message( + message_id: int, + token: Any = Depends(require_api_token), + db: AsyncSession = Depends(get_db_session), +) -> None: + """ + Удалить закреплённое сообщение. + + Если сообщение активно, сначала будет деактивировано. + Не открепляет сообщение у пользователей — для этого используйте /active/unpin. + """ + result = await db.execute( + select(PinnedMessage).where(PinnedMessage.id == message_id) + ) + msg = result.scalar_one_or_none() + if not msg: + raise HTTPException(status.HTTP_404_NOT_FOUND, "Pinned message not found") + + await db.delete(msg) + await db.commit() diff --git a/app/webapi/schemas/pinned_messages.py b/app/webapi/schemas/pinned_messages.py new file mode 100644 index 00000000..db8f0dc3 --- /dev/null +++ b/app/webapi/schemas/pinned_messages.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel, Field + + +class PinnedMessageMedia(BaseModel): + type: str = Field(pattern=r"^(photo|video)$") + file_id: str + + +class PinnedMessageBase(BaseModel): + content: Optional[str] = Field(None, max_length=4000) + send_before_menu: bool = True + send_on_every_start: bool = True + + +class PinnedMessageCreateRequest(PinnedMessageBase): + content: str = Field(..., min_length=1, max_length=4000) + media: Optional[PinnedMessageMedia] = None + + +class PinnedMessageUpdateRequest(BaseModel): + content: Optional[str] = Field(None, max_length=4000) + send_before_menu: Optional[bool] = None + send_on_every_start: Optional[bool] = None + media: Optional[PinnedMessageMedia] = None + + +class PinnedMessageSettingsRequest(BaseModel): + send_before_menu: Optional[bool] = None + send_on_every_start: Optional[bool] = None + + +class PinnedMessageResponse(BaseModel): + id: int + content: Optional[str] + media_type: Optional[str] = None + media_file_id: Optional[str] = None + send_before_menu: bool + send_on_every_start: bool + is_active: bool + created_by: Optional[int] = None + created_at: datetime + updated_at: Optional[datetime] = None + + +class PinnedMessageBroadcastResponse(BaseModel): + message: PinnedMessageResponse + sent_count: int + failed_count: int + + +class PinnedMessageUnpinResponse(BaseModel): + unpinned_count: int + failed_count: int + was_active: bool + + +class PinnedMessageListResponse(BaseModel): + items: list[PinnedMessageResponse] + total: int + limit: int + offset: int diff --git a/migrations/alembic/versions/1b2e3d4f5a6b_add_pinned_start_mode_and_user_last_pin.py b/migrations/alembic/versions/1b2e3d4f5a6b_add_pinned_start_mode_and_user_last_pin.py new file mode 100644 index 00000000..f85d72dd --- /dev/null +++ b/migrations/alembic/versions/1b2e3d4f5a6b_add_pinned_start_mode_and_user_last_pin.py @@ -0,0 +1,57 @@ +"""add pinned start mode and user last pin + +Revision ID: 1b2e3d4f5a6b +Revises: 7a3c0b8f5b84 +Create Date: 2025-01-01 00:00:00.000000 +""" + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '1b2e3d4f5a6b' +down_revision = '7a3c0b8f5b84' +branch_labels = None +depends_on = None + + +def _table_exists(inspector: sa.Inspector, table_name: str) -> bool: + return table_name in inspector.get_table_names() + + +def _column_exists(inspector: sa.Inspector, table_name: str, column_name: str) -> bool: + if not _table_exists(inspector, table_name): + return False + columns = {col["name"] for col in inspector.get_columns(table_name)} + return column_name in columns + + +def upgrade(): + bind = op.get_bind() + inspector = sa.inspect(bind) + + if _table_exists(inspector, "pinned_messages"): + if not _column_exists(inspector, "pinned_messages", "send_on_every_start"): + op.add_column( + 'pinned_messages', + sa.Column('send_on_every_start', sa.Boolean(), nullable=False, server_default='1'), + ) + + if _table_exists(inspector, "users"): + if not _column_exists(inspector, "users", "last_pinned_message_id"): + op.add_column( + 'users', + sa.Column('last_pinned_message_id', sa.Integer(), nullable=True), + ) + + +def downgrade(): + bind = op.get_bind() + inspector = sa.inspect(bind) + + if _column_exists(inspector, "users", "last_pinned_message_id"): + op.drop_column('users', 'last_pinned_message_id') + + if _column_exists(inspector, "pinned_messages", "send_on_every_start"): + op.drop_column('pinned_messages', 'send_on_every_start') diff --git a/migrations/alembic/versions/5f2a3e099427_add_media_fields_to_pinned_messages.py b/migrations/alembic/versions/5f2a3e099427_add_media_fields_to_pinned_messages.py new file mode 100644 index 00000000..fdd05440 --- /dev/null +++ b/migrations/alembic/versions/5f2a3e099427_add_media_fields_to_pinned_messages.py @@ -0,0 +1,75 @@ +"""add media fields to pinned messages""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +revision: str = "5f2a3e099427" +down_revision: Union[str, None] = "c9c71d04f0a1" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +TABLE_NAME = "pinned_messages" + + +def _table_exists(inspector: sa.Inspector) -> bool: + return TABLE_NAME in inspector.get_table_names() + + +def _column_missing(inspector: sa.Inspector, column_name: str) -> bool: + columns = {column.get("name") for column in inspector.get_columns(TABLE_NAME)} + return column_name not in columns + + +def upgrade() -> None: + bind = op.get_bind() + inspector = sa.inspect(bind) + + if not _table_exists(inspector): + return + + if _column_missing(inspector, "media_type"): + op.add_column( + TABLE_NAME, + sa.Column("media_type", sa.String(length=32), nullable=True), + ) + + if _column_missing(inspector, "media_file_id"): + op.add_column( + TABLE_NAME, + sa.Column("media_file_id", sa.String(length=255), nullable=True), + ) + + # Ensure content has a default value for media-only messages + op.alter_column( + TABLE_NAME, + "content", + existing_type=sa.Text(), + nullable=False, + server_default="", + ) + + +def downgrade() -> None: + bind = op.get_bind() + inspector = sa.inspect(bind) + + if not _table_exists(inspector): + return + + if not _column_missing(inspector, "media_type"): + op.drop_column(TABLE_NAME, "media_type") + + if not _column_missing(inspector, "media_file_id"): + op.drop_column(TABLE_NAME, "media_file_id") + + op.alter_column( + TABLE_NAME, + "content", + existing_type=sa.Text(), + nullable=False, + server_default=None, + ) diff --git a/migrations/alembic/versions/7a3c0b8f5b84_add_send_before_menu_to_pinned_messages.py b/migrations/alembic/versions/7a3c0b8f5b84_add_send_before_menu_to_pinned_messages.py new file mode 100644 index 00000000..9234bb5a --- /dev/null +++ b/migrations/alembic/versions/7a3c0b8f5b84_add_send_before_menu_to_pinned_messages.py @@ -0,0 +1,59 @@ +"""add send_before_menu to pinned messages + +Revision ID: 7a3c0b8f5b84 +Revises: 5f2a3e099427 +Create Date: 2025-02-05 00:00:00.000000 +""" + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "7a3c0b8f5b84" +down_revision = "5f2a3e099427" +branch_labels = None +depends_on = None + + +TABLE_NAME = "pinned_messages" + + +def _table_exists(inspector: sa.Inspector) -> bool: + return TABLE_NAME in inspector.get_table_names() + + +def _column_exists(inspector: sa.Inspector, column_name: str) -> bool: + if not _table_exists(inspector): + return False + columns = {col["name"] for col in inspector.get_columns(TABLE_NAME)} + return column_name in columns + + +def upgrade() -> None: + bind = op.get_bind() + inspector = sa.inspect(bind) + + if not _table_exists(inspector): + return + + if _column_exists(inspector, "send_before_menu"): + return + + op.add_column( + TABLE_NAME, + sa.Column( + "send_before_menu", + sa.Boolean(), + nullable=False, + server_default=sa.text("1"), + ), + ) + + +def downgrade() -> None: + bind = op.get_bind() + inspector = sa.inspect(bind) + + if _column_exists(inspector, "send_before_menu"): + op.drop_column(TABLE_NAME, "send_before_menu") diff --git a/migrations/alembic/versions/c9c71d04f0a1_add_pinned_messages_table.py b/migrations/alembic/versions/c9c71d04f0a1_add_pinned_messages_table.py new file mode 100644 index 00000000..add5fe11 --- /dev/null +++ b/migrations/alembic/versions/c9c71d04f0a1_add_pinned_messages_table.py @@ -0,0 +1,45 @@ +"""add pinned messages table""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +revision: str = "c9c71d04f0a1" +down_revision: Union[str, None] = "e3c1e0b5b4a7" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +TABLE_NAME = "pinned_messages" + + +def _table_exists(inspector: sa.Inspector) -> bool: + return TABLE_NAME in inspector.get_table_names() + + +def upgrade() -> None: + bind = op.get_bind() + inspector = sa.inspect(bind) + + if _table_exists(inspector): + return + + op.create_table( + TABLE_NAME, + sa.Column("id", sa.Integer(), primary_key=True, index=True), + sa.Column("content", sa.Text(), nullable=False), + sa.Column("is_active", sa.Boolean(), default=True), + sa.Column("created_by", sa.Integer(), sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True), + sa.Column("created_at", sa.DateTime(), server_default=sa.func.now()), + sa.Column("updated_at", sa.DateTime(), server_default=sa.func.now(), onupdate=sa.func.now()), + ) + + +def downgrade() -> None: + bind = op.get_bind() + inspector = sa.inspect(bind) + + if _table_exists(inspector): + op.drop_table(TABLE_NAME)