From f7be2911cdafe066b91b7913cb9e6b8b2d511ce1 Mon Sep 17 00:00:00 2001 From: Egor Date: Mon, 22 Dec 2025 12:41:43 +0300 Subject: [PATCH] Add position control for pinned messages --- app/database/models.py | 16 ++ app/database/universal_migration.py | 187 ++++++++++++++ app/handlers/admin/messages.py | 162 +++++++++++- app/handlers/common.py | 2 +- app/handlers/start.py | 28 ++ app/keyboards/admin.py | 32 +++ app/localization/locales/en.json | 5 + app/localization/locales/ru.json | 5 + app/localization/locales/ua.json | 9 +- app/localization/locales/zh.json | 5 + app/services/pinned_message_service.py | 240 ++++++++++++++++++ app/states.py | 1 + ...dd_delivery_position_to_pinned_messages.py | 28 ++ ...427_add_media_fields_to_pinned_messages.py | 75 ++++++ .../c9c71d04f0a1_add_pinned_messages_table.py | 45 ++++ 15 files changed, 836 insertions(+), 4 deletions(-) create mode 100644 app/services/pinned_message_service.py create mode 100644 migrations/alembic/versions/0f3d1f5c2c0a_add_delivery_position_to_pinned_messages.py create mode 100644 migrations/alembic/versions/5f2a3e099427_add_media_fields_to_pinned_messages.py create mode 100644 migrations/alembic/versions/c9c71d04f0a1_add_pinned_messages_table.py diff --git a/app/database/models.py b/app/database/models.py index 13cdfd0b..8c246d20 100644 --- a/app/database/models.py +++ b/app/database/models.py @@ -1553,6 +1553,22 @@ class WelcomeText(Base): creator = relationship("User", backref="created_welcome_texts") +class PinnedMessage(Base): + __tablename__ = "pinned_messages" + + id = Column(Integer, primary_key=True, index=True) + content = Column(Text, nullable=False, default="") + media_type = Column(String(32), nullable=True) + media_file_id = Column(String(255), nullable=True) + delivery_position = Column(String(32), nullable=False, default="before_menu") + is_active = Column(Boolean, default=True) + created_by = Column(Integer, ForeignKey("users.id", ondelete="SET NULL"), nullable=True) + created_at = Column(DateTime, default=func.now()) + updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) + + creator = relationship("User", backref="pinned_messages") + + class AdvertisingCampaign(Base): __tablename__ = "advertising_campaigns" diff --git a/app/database/universal_migration.py b/app/database/universal_migration.py index e418502e..afea3fbc 100644 --- a/app/database/universal_migration.py +++ b/app/database/universal_migration.py @@ -3014,6 +3014,153 @@ async def create_welcome_texts_table(): logger.error(f"Ошибка создания таблицы welcome_texts: {e}") return False + +async def create_pinned_messages_table(): + table_exists = await check_table_exists("pinned_messages") + if table_exists: + logger.info("Таблица pinned_messages уже существует") + return True + + try: + async with engine.begin() as conn: + db_type = await get_database_type() + + if db_type == "sqlite": + create_sql = """ + CREATE TABLE pinned_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + content TEXT NOT NULL DEFAULT '', + media_type VARCHAR(32) NULL, + media_file_id VARCHAR(255) NULL, + delivery_position VARCHAR(32) NOT NULL DEFAULT 'before_menu', + is_active BOOLEAN DEFAULT 1, + created_by INTEGER NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL + ); + + CREATE INDEX IF NOT EXISTS ix_pinned_messages_active ON pinned_messages(is_active); + """ + + elif db_type == "postgresql": + create_sql = """ + CREATE TABLE pinned_messages ( + id SERIAL PRIMARY KEY, + content TEXT NOT NULL DEFAULT '', + media_type VARCHAR(32) NULL, + media_file_id VARCHAR(255) NULL, + delivery_position VARCHAR(32) NOT NULL DEFAULT 'before_menu', + is_active BOOLEAN DEFAULT TRUE, + created_by INTEGER NULL REFERENCES users(id) ON DELETE SET NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + + CREATE INDEX IF NOT EXISTS ix_pinned_messages_active ON pinned_messages(is_active); + """ + + elif db_type == "mysql": + create_sql = """ + CREATE TABLE pinned_messages ( + id INT AUTO_INCREMENT PRIMARY KEY, + content TEXT NOT NULL DEFAULT '', + media_type VARCHAR(32) NULL, + media_file_id VARCHAR(255) NULL, + delivery_position VARCHAR(32) NOT NULL DEFAULT 'before_menu', + is_active BOOLEAN DEFAULT TRUE, + created_by INT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL + ); + + CREATE INDEX ix_pinned_messages_active ON pinned_messages(is_active); + """ + + else: + logger.error(f"Неподдерживаемый тип БД для создания таблицы pinned_messages: {db_type}") + return False + + await conn.execute(text(create_sql)) + + logger.info("✅ Таблица pinned_messages успешно создана") + return True + + except Exception as e: + logger.error(f"Ошибка создания таблицы pinned_messages: {e}") + return False + + +async def ensure_pinned_message_media_columns(): + table_exists = await check_table_exists("pinned_messages") + if not table_exists: + logger.warning("⚠️ Таблица pinned_messages отсутствует — пропускаем обновление медиа полей") + return False + + try: + async with engine.begin() as conn: + db_type = await get_database_type() + + if not await check_column_exists("pinned_messages", "media_type"): + await conn.execute( + text("ALTER TABLE pinned_messages ADD COLUMN media_type VARCHAR(32)") + ) + + if not await check_column_exists("pinned_messages", "media_file_id"): + await conn.execute( + text("ALTER TABLE pinned_messages ADD COLUMN media_file_id VARCHAR(255)") + ) + + await conn.execute(text("UPDATE pinned_messages SET content = '' WHERE content IS NULL")) + + if db_type == "postgresql": + await conn.execute( + text("ALTER TABLE pinned_messages ALTER COLUMN content SET DEFAULT ''") + ) + elif db_type == "mysql": + await conn.execute( + text("ALTER TABLE pinned_messages MODIFY content TEXT NOT NULL DEFAULT ''") + ) + else: + logger.info("ℹ️ Пропускаем установку DEFAULT для content в SQLite") + + logger.info("✅ Медиа поля pinned_messages приведены в актуальное состояние") + return True + + except Exception as e: + logger.error(f"Ошибка обновления медиа полей pinned_messages: {e}") + return False + + +async def ensure_pinned_message_position_column(): + table_exists = await check_table_exists("pinned_messages") + if not table_exists: + logger.warning("⚠️ Таблица pinned_messages отсутствует — пропускаем обновление позиции") + return False + + try: + async with engine.begin() as conn: + if not await check_column_exists("pinned_messages", "delivery_position"): + await conn.execute( + text( + "ALTER TABLE pinned_messages ADD COLUMN delivery_position VARCHAR(32) " + "NOT NULL DEFAULT 'before_menu'" + ) + ) + await conn.execute( + text( + "UPDATE pinned_messages SET delivery_position = 'before_menu' " + "WHERE delivery_position IS NULL" + ) + ) + logger.info("✅ Позиция закрепленного сообщения приведена в актуальное состояние") + return True + + except Exception as e: + logger.error(f"Ошибка обновления позиции закрепленного сообщения: {e}") + return False + async def add_media_fields_to_broadcast_history(): logger.info("=== ДОБАВЛЕНИЕ ПОЛЕЙ МЕДИА В BROADCAST_HISTORY ===") @@ -4690,12 +4837,32 @@ async def run_universal_migration(): else: logger.warning("⚠️ Проблемы с таблицей user_messages") + logger.info("=== СОЗДАНИЕ ТАБЛИЦЫ PINNED_MESSAGES ===") + pinned_messages_created = await create_pinned_messages_table() + if pinned_messages_created: + logger.info("✅ Таблица pinned_messages готова") + else: + logger.warning("⚠️ Проблемы с таблицей pinned_messages") + logger.info("=== СОЗДАНИЕ/ОБНОВЛЕНИЕ ТАБЛИЦЫ WELCOME_TEXTS ===") welcome_texts_created = await create_welcome_texts_table() if welcome_texts_created: logger.info("✅ Таблица welcome_texts готова с полем is_enabled") else: logger.warning("⚠️ Проблемы с таблицей welcome_texts") + + logger.info("=== ОБНОВЛЕНИЕ СХЕМЫ PINNED_MESSAGES ===") + pinned_media_ready = await ensure_pinned_message_media_columns() + if pinned_media_ready: + logger.info("✅ Медиа поля для pinned_messages готовы") + else: + logger.warning("⚠️ Проблемы с медиа полями pinned_messages") + + pinned_position_ready = await ensure_pinned_message_position_column() + if pinned_position_ready: + logger.info("✅ Позиция закрепленного сообщения готова") + else: + logger.warning("⚠️ Проблемы с позицией закрепленного сообщения") logger.info("=== ДОБАВЛЕНИЕ МЕДИА ПОЛЕЙ В BROADCAST_HISTORY ===") media_fields_added = await add_media_fields_to_broadcast_history() @@ -4880,8 +5047,11 @@ async def check_migration_status(): "cryptobot_table": False, "heleket_table": False, "user_messages_table": False, + "pinned_messages_table": False, "welcome_texts_table": False, "welcome_texts_is_enabled_column": False, + "pinned_messages_media_columns": False, + "pinned_messages_position_column": False, "broadcast_history_media_fields": False, "subscription_duplicates": False, "subscription_conversions_table": False, @@ -4924,6 +5094,7 @@ async def check_migration_status(): status["cryptobot_table"] = await check_table_exists('cryptobot_payments') status["heleket_table"] = await check_table_exists('heleket_payments') status["user_messages_table"] = await check_table_exists('user_messages') + status["pinned_messages_table"] = await check_table_exists('pinned_messages') status["welcome_texts_table"] = await check_table_exists('welcome_texts') status["privacy_policies_table"] = await check_table_exists('privacy_policies') status["public_offers_table"] = await check_table_exists('public_offers') @@ -4969,6 +5140,19 @@ async def check_migration_status(): await check_column_exists('broadcast_history', 'media_caption') ) status["broadcast_history_media_fields"] = media_fields_exist + + pinned_media_columns_exist = ( + status["pinned_messages_table"] + and await check_column_exists('pinned_messages', 'media_type') + and await check_column_exists('pinned_messages', 'media_file_id') + ) + status["pinned_messages_media_columns"] = pinned_media_columns_exist + + pinned_position_exists = ( + status["pinned_messages_table"] + and await check_column_exists('pinned_messages', 'delivery_position') + ) + status["pinned_messages_position_column"] = pinned_position_exists async with engine.begin() as conn: duplicates_check = await conn.execute(text(""" @@ -4987,10 +5171,13 @@ async def check_migration_status(): "cryptobot_table": "Таблица CryptoBot payments", "heleket_table": "Таблица Heleket payments", "user_messages_table": "Таблица пользовательских сообщений", + "pinned_messages_table": "Таблица закреплённых сообщений", "welcome_texts_table": "Таблица приветственных текстов", "privacy_policies_table": "Таблица политик конфиденциальности", "public_offers_table": "Таблица публичных оферт", "welcome_texts_is_enabled_column": "Поле is_enabled в welcome_texts", + "pinned_messages_media_columns": "Медиа поля в pinned_messages", + "pinned_messages_position_column": "Поле позиции закреплённого сообщения", "broadcast_history_media_fields": "Медиа поля в broadcast_history", "subscription_conversions_table": "Таблица конверсий подписок", "subscription_events_table": "Таблица событий подписок", diff --git a/app/handlers/admin/messages.py b/app/handlers/admin/messages.py index 3bf0210f..7b7b05e4 100644 --- a/app/handlers/admin/messages.py +++ b/app/handlers/admin/messages.py @@ -1,3 +1,4 @@ +import html import logging import asyncio from datetime import datetime, timedelta @@ -25,13 +26,19 @@ from app.keyboards.admin import ( get_admin_pagination_keyboard, get_broadcast_media_keyboard, get_media_confirm_keyboard, get_updated_message_buttons_selector_keyboard_with_media, BROADCAST_BUTTON_ROWS, DEFAULT_BROADCAST_BUTTONS, - get_broadcast_button_config, get_broadcast_button_labels + get_broadcast_button_config, get_broadcast_button_labels, get_pinned_message_keyboard ) from app.localization.texts import get_texts from app.database.crud.user import get_users_list from app.database.crud.subscription import get_expiring_subscriptions from app.utils.decorators import admin_required, error_handler from app.utils.miniapp_buttons import build_miniapp_or_callback_button +from app.services.pinned_message_service import ( + broadcast_pinned_message, + get_active_pinned_message, + set_active_pinned_message, + update_active_pinned_position, +) logger = logging.getLogger(__name__) @@ -167,6 +174,155 @@ async def show_messages_menu( await callback.answer() +@admin_required +@error_handler +async def show_pinned_message_menu( + callback: types.CallbackQuery, + db_user: User, + db: AsyncSession, + state: FSMContext, +): + texts = get_texts(db_user.language) + await state.clear() + pinned_message = await get_active_pinned_message(db) + + if pinned_message: + content_preview = html.escape(pinned_message.content or "") + last_updated = pinned_message.updated_at or pinned_message.created_at + timestamp_text = last_updated.strftime("%d.%m.%Y %H:%M") if last_updated else "—" + media_line = "" + if pinned_message.media_type: + media_label = "Фото" if pinned_message.media_type == "photo" else "Видео" + media_line = f"📎 Медиа: {media_label}\n" + position_label = ( + texts.t("PINNED_POSITION_BEFORE_MENU", "До меню") + if (pinned_message.delivery_position or "before_menu") == "before_menu" + else texts.t("PINNED_POSITION_AFTER_MENU", "После меню") + ) + body = ( + "📌 Закрепленное сообщение\n\n" + "📝 Текущий текст:\n" + f"{content_preview}\n\n" + f"{media_line}" + f"📍 Позиция: {position_label}\n" + f"🕒 Обновлено: {timestamp_text}" + ) + else: + body = ( + "📌 Закрепленное сообщение\n\n" + "Сообщение не задано. Отправьте новый текст, чтобы разослать и закрепить его у пользователей." + ) + + await callback.message.edit_text( + body, + reply_markup=get_pinned_message_keyboard( + db_user.language, + position=pinned_message.delivery_position if pinned_message else None, + ), + parse_mode="HTML", + ) + await callback.answer() + + +@admin_required +@error_handler +async def prompt_pinned_message_update( + callback: types.CallbackQuery, + db_user: User, + state: FSMContext, +): + await state.set_state(AdminStates.editing_pinned_message) + await callback.message.edit_text( + "✏️ Новое закрепленное сообщение\n\n" + "Пришлите текст, фото или видео, которое нужно закрепить.\n" + "Бот отправит его всем активным пользователям, открепит старое и закрепит новое без уведомлений.", + reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[ + [types.InlineKeyboardButton(text="❌ Отмена", callback_data="admin_pinned_message")] + ]), + parse_mode="HTML", + ) + await callback.answer() + + +@admin_required +@error_handler +async def process_pinned_message_update( + message: types.Message, + db_user: User, + state: FSMContext, + db: AsyncSession, +): + media_type: Optional[str] = None + media_file_id: Optional[str] = None + + if message.photo: + media_type = "photo" + media_file_id = message.photo[-1].file_id + elif message.video: + media_type = "video" + media_file_id = message.video.file_id + + pinned_text = message.html_text or message.caption_html or message.text or message.caption or "" + + if not pinned_text and not media_file_id: + await message.answer("❌ Не удалось прочитать текст или медиа в сообщении, попробуйте снова.") + return + + try: + pinned_message = await set_active_pinned_message( + db, + pinned_text, + db_user.id, + media_type=media_type, + media_file_id=media_file_id, + ) + except ValueError as validation_error: + await message.answer(f"❌ {validation_error}") + return + + await message.answer( + "📌 Сообщение сохранено. Начинаю отправку и закрепление у пользователей...", + parse_mode="HTML", + ) + + sent_count, failed_count = await broadcast_pinned_message( + message.bot, + db, + pinned_message, + ) + + total = sent_count + failed_count + await message.answer( + "✅ Закрепленное сообщение обновлено\n\n" + f"👥 Получателей: {total}\n" + f"✅ Отправлено: {sent_count}\n" + f"⚠️ Ошибок: {failed_count}", + reply_markup=get_admin_messages_keyboard(db_user.language), + parse_mode="HTML", + ) + await state.clear() + + +@admin_required +@error_handler +async def toggle_pinned_message_position( + callback: types.CallbackQuery, + db_user: User, + db: AsyncSession, + state: FSMContext, +): + pinned_message = await get_active_pinned_message(db) + if not pinned_message: + await callback.answer("Сначала создайте закрепленное сообщение", show_alert=True) + return + + current_position = (pinned_message.delivery_position or "before_menu") + new_position = "after_menu" if current_position == "before_menu" else "before_menu" + + await update_active_pinned_position(db, new_position) + await show_pinned_message_menu(callback, db_user, db, state) + + @admin_required @error_handler async def show_broadcast_targets( @@ -1295,6 +1451,9 @@ def get_target_display_name(target: str) -> str: def register_handlers(dp: Dispatcher): dp.callback_query.register(show_messages_menu, F.data == "admin_messages") + dp.callback_query.register(show_pinned_message_menu, F.data == "admin_pinned_message") + dp.callback_query.register(toggle_pinned_message_position, F.data == "admin_pinned_message_position") + dp.callback_query.register(prompt_pinned_message_update, F.data == "admin_pinned_message_edit") dp.callback_query.register(show_broadcast_targets, F.data.in_(["admin_msg_all", "admin_msg_by_sub"])) dp.callback_query.register(select_broadcast_target, F.data.startswith("broadcast_")) dp.callback_query.register(confirm_broadcast, F.data == "admin_confirm_broadcast") @@ -1312,3 +1471,4 @@ def register_handlers(dp: Dispatcher): dp.callback_query.register(handle_change_media, F.data == "change_media") dp.message.register(process_broadcast_message, AdminStates.waiting_for_broadcast_message) dp.message.register(process_broadcast_media, AdminStates.waiting_for_broadcast_media) + dp.message.register(process_pinned_message_update, AdminStates.editing_pinned_message) diff --git a/app/handlers/common.py b/app/handlers/common.py index 3c8549f2..de88d57c 100644 --- a/app/handlers/common.py +++ b/app/handlers/common.py @@ -67,7 +67,7 @@ async def handle_cancel( async def handle_unknown_message( message: types.Message, - db_user: User + db_user: User | None = None ): texts = get_texts(db_user.language if db_user else "ru") diff --git a/app/handlers/start.py b/app/handlers/start.py index f2e125d1..d01f816e 100644 --- a/app/handlers/start.py +++ b/app/handlers/start.py @@ -36,6 +36,7 @@ from app.services.subscription_service import SubscriptionService from app.services.support_settings_service import SupportSettingsService from app.services.main_menu_button_service import MainMenuButtonService from app.services.privacy_policy_service import PrivacyPolicyService +from app.services.pinned_message_service import deliver_pinned_message_to_user from app.utils.user_utils import generate_unique_referral_code from app.utils.promo_offer import ( build_promo_offer_hint, @@ -61,6 +62,17 @@ def _calculate_subscription_flags(subscription): return has_active_subscription, subscription_is_active +async def _send_pinned_message(bot: Bot, db: AsyncSession, user, position_filter: str | None = None) -> None: + try: + await deliver_pinned_message_to_user(bot, db, user, position_filter=position_filter) + except Exception as error: # noqa: BLE001 + logger.error( + "Не удалось отправить закрепленное сообщение пользователю %s: %s", + getattr(user, "telegram_id", "unknown"), + error, + ) + + async def _apply_campaign_bonus_if_needed( db: AsyncSession, user, @@ -433,11 +445,13 @@ async def cmd_start(message: types.Message, state: FSMContext, db: AsyncSession, is_moderator=is_moderator, custom_buttons=custom_buttons, ) + await _send_pinned_message(message.bot, db, user, position_filter="before_menu") await message.answer( menu_text, reply_markup=keyboard, parse_mode="HTML" ) + await _send_pinned_message(message.bot, db, user, position_filter="after_menu") await state.clear() return @@ -1089,11 +1103,13 @@ async def complete_registration_from_callback( is_moderator=is_moderator, custom_buttons=custom_buttons, ) + await _send_pinned_message(callback.bot, db, existing_user, position_filter="before_menu") await callback.message.answer( menu_text, reply_markup=keyboard, parse_mode="HTML" ) + await _send_pinned_message(callback.bot, db, existing_user, position_filter="after_menu") except Exception as e: logger.error(f"Ошибка при показе главного меню существующему пользователю: {e}") await callback.message.answer( @@ -1232,6 +1248,8 @@ async def complete_registration_from_callback( reply_markup=get_post_registration_keyboard(user.language), ) logger.info(f"✅ Приветственное сообщение отправлено пользователю {user.telegram_id}") + await _send_pinned_message(callback.bot, db, user, position_filter="before_menu") + await _send_pinned_message(callback.bot, db, user, position_filter="after_menu") except Exception as e: logger.error(f"Ошибка при отправке приветственного сообщения: {e}") else: @@ -1272,11 +1290,13 @@ async def complete_registration_from_callback( is_moderator=is_moderator, custom_buttons=custom_buttons, ) + await _send_pinned_message(callback.bot, db, user, position_filter="before_menu") await callback.message.answer( menu_text, reply_markup=keyboard, parse_mode="HTML" ) + await _send_pinned_message(callback.bot, db, user, position_filter="after_menu") logger.info(f"✅ Главное меню показано пользователю {user.telegram_id}") except Exception as e: logger.error(f"Ошибка при показе главного меню: {e}") @@ -1369,11 +1389,13 @@ async def complete_registration( is_moderator=is_moderator, custom_buttons=custom_buttons, ) + await _send_pinned_message(message.bot, db, existing_user, position_filter="before_menu") await message.answer( menu_text, reply_markup=keyboard, parse_mode="HTML" ) + await _send_pinned_message(message.bot, db, existing_user, position_filter="after_menu") except Exception as e: logger.error(f"Ошибка при показе главного меню существующему пользователю: {e}") await message.answer( @@ -1535,6 +1557,8 @@ async def complete_registration( reply_markup=get_post_registration_keyboard(user.language), ) logger.info(f"✅ Приветственное сообщение отправлено пользователю {user.telegram_id}") + await _send_pinned_message(message.bot, db, user, position_filter="before_menu") + await _send_pinned_message(message.bot, db, user, position_filter="after_menu") except Exception as e: logger.error(f"Ошибка при отправке приветственного сообщения: {e}") else: @@ -1575,12 +1599,14 @@ async def complete_registration( is_moderator=is_moderator, custom_buttons=custom_buttons, ) + await _send_pinned_message(message.bot, db, user, position_filter="before_menu") await message.answer( menu_text, reply_markup=keyboard, parse_mode="HTML" ) logger.info(f"✅ Главное меню показано пользователю {user.telegram_id}") + await _send_pinned_message(message.bot, db, user, position_filter="after_menu") except Exception as e: logger.error(f"Ошибка при показе главного меню: {e}") await message.answer( @@ -1925,6 +1951,8 @@ async def required_sub_channel_check( reply_markup=keyboard, parse_mode="HTML", ) + await _send_pinned_message(bot, db, user, position_filter="before_menu") + await _send_pinned_message(bot, db, user, position_filter="after_menu") else: from app.keyboards.inline import get_rules_keyboard diff --git a/app/keyboards/admin.py b/app/keyboards/admin.py index 0ab2cd7f..aa71f912 100644 --- a/app/keyboards/admin.py +++ b/app/keyboards/admin.py @@ -837,12 +837,44 @@ def get_admin_messages_keyboard(language: str = "ru") -> InlineKeyboardMarkup: callback_data="admin_msg_history" ) ], + [ + InlineKeyboardButton( + text=_t(texts, "ADMIN_PINNED_MESSAGE", "📌 Закрепленное сообщение"), + callback_data="admin_pinned_message", + ) + ], [ InlineKeyboardButton(text=texts.BACK, callback_data="admin_submenu_communications") ] ]) +def get_pinned_message_keyboard(language: str = "ru", position: str | None = None) -> InlineKeyboardMarkup: + texts = get_texts(language) + + position_label = "" + if position: + before_label = texts.t("PINNED_POSITION_BEFORE_MENU", "До меню") + after_label = texts.t("PINNED_POSITION_AFTER_MENU", "После меню") + position_label = " (" + (before_label if position == "before_menu" else after_label) + ")" + + return InlineKeyboardMarkup(inline_keyboard=[ + [ + InlineKeyboardButton( + text=_t(texts, "ADMIN_PINNED_MESSAGE_UPDATE", "✏️ Обновить"), + callback_data="admin_pinned_message_edit", + ) + ], + [ + InlineKeyboardButton( + text=_t(texts, "ADMIN_PINNED_MESSAGE_POSITION", f"🔄 Позиция{position_label}"), + callback_data="admin_pinned_message_position", + ) + ], + [InlineKeyboardButton(text=texts.BACK, callback_data="admin_messages")], + ]) + + def get_admin_monitoring_keyboard(language: str = "ru") -> InlineKeyboardMarkup: texts = get_texts(language) diff --git a/app/localization/locales/en.json b/app/localization/locales/en.json index 1c6bc309..19aa1251 100644 --- a/app/localization/locales/en.json +++ b/app/localization/locales/en.json @@ -209,7 +209,12 @@ "ADMIN_MESSAGES_BY_CRITERIA": "🔍 By criteria", "ADMIN_MESSAGES_BY_SUBSCRIPTIONS": "🎯 By subscriptions", "ADMIN_MESSAGES_HISTORY": "📋 History", + "ADMIN_PINNED_MESSAGE": "📌 Pinned message", + "ADMIN_PINNED_MESSAGE_POSITION": "🔄 Position", + "ADMIN_PINNED_MESSAGE_UPDATE": "✏️ Update", "ADMIN_MONITORING": "🔍 Monitoring", + "PINNED_POSITION_BEFORE_MENU": "Before menu", + "PINNED_POSITION_AFTER_MENU": "After menu", "ADMIN_MONITORING_ALL_LOGS": "📋 All logs", "ADMIN_MONITORING_AUTOPAY_SETTINGS": "💳 Auto-pay settings", "ADMIN_MONITORING_AUTO_CLEANUP": "🧹 Auto-clean logs", diff --git a/app/localization/locales/ru.json b/app/localization/locales/ru.json index 64d4729e..28a3b171 100644 --- a/app/localization/locales/ru.json +++ b/app/localization/locales/ru.json @@ -212,7 +212,12 @@ "ADMIN_MESSAGES_BY_CRITERIA": "🔍 По критериям", "ADMIN_MESSAGES_BY_SUBSCRIPTIONS": "🎯 По подпискам", "ADMIN_MESSAGES_HISTORY": "📋 История", + "ADMIN_PINNED_MESSAGE": "📌 Закрепленное сообщение", + "ADMIN_PINNED_MESSAGE_POSITION": "🔄 Позиция", + "ADMIN_PINNED_MESSAGE_UPDATE": "✏️ Обновить", "ADMIN_MONITORING": "🔍 Мониторинг", + "PINNED_POSITION_BEFORE_MENU": "До меню", + "PINNED_POSITION_AFTER_MENU": "После меню", "ADMIN_MONITORING_ALL_LOGS": "📋 Все логи", "ADMIN_MONITORING_AUTOPAY_SETTINGS": "💳 Настройки автооплаты", "ADMIN_MONITORING_AUTO_CLEANUP": "🧹 Автоочистка логов", diff --git a/app/localization/locales/ua.json b/app/localization/locales/ua.json index dadb5a5e..90705ef7 100644 --- a/app/localization/locales/ua.json +++ b/app/localization/locales/ua.json @@ -138,8 +138,13 @@ "ADMIN_MESSAGES_ALL_USERS": "📨 Всім користувачам", "ADMIN_MESSAGES_BY_CRITERIA": "🔍 За критеріями", "ADMIN_MESSAGES_BY_SUBSCRIPTIONS": "🎯 За підписками", - "ADMIN_MESSAGES_HISTORY": "📋 Історія", - "ADMIN_MONITORING": "🔍 Моніторинг", + "ADMIN_MESSAGES_HISTORY": "📋 Історія", + "ADMIN_PINNED_MESSAGE": "📌 Закріплене повідомлення", + "ADMIN_PINNED_MESSAGE_POSITION": "🔄 Позиція", + "ADMIN_PINNED_MESSAGE_UPDATE": "✏️ Оновити", + "ADMIN_MONITORING": "🔍 Моніторинг", + "PINNED_POSITION_BEFORE_MENU": "До меню", + "PINNED_POSITION_AFTER_MENU": "Після меню", "ADMIN_MONITORING_ALL_LOGS": "📋 Всі логи", "ADMIN_MONITORING_AUTOPAY_SETTINGS": "💳 Налаштування автооплати", "ADMIN_MONITORING_AUTO_CLEANUP": "🧹 Автоочищення логів", diff --git a/app/localization/locales/zh.json b/app/localization/locales/zh.json index 44124ed5..6a8f7618 100644 --- a/app/localization/locales/zh.json +++ b/app/localization/locales/zh.json @@ -138,7 +138,12 @@ "ADMIN_MESSAGES_BY_CRITERIA":"🔍按条件", "ADMIN_MESSAGES_BY_SUBSCRIPTIONS":"🎯按订阅", "ADMIN_MESSAGES_HISTORY":"📋历史记录", +"ADMIN_PINNED_MESSAGE":"📌置顶消息", +"ADMIN_PINNED_MESSAGE_POSITION":"🔄位置", +"ADMIN_PINNED_MESSAGE_UPDATE":"✏️更新", "ADMIN_MONITORING":"🔍监控", +"PINNED_POSITION_BEFORE_MENU":"在菜单前", +"PINNED_POSITION_AFTER_MENU":"在菜单后", "ADMIN_MONITORING_ALL_LOGS":"📋所有日志", "ADMIN_MONITORING_AUTOPAY_SETTINGS":"💳自动支付设置", "ADMIN_MONITORING_AUTO_CLEANUP":"🧹自动清理日志", diff --git a/app/services/pinned_message_service.py b/app/services/pinned_message_service.py new file mode 100644 index 00000000..e89cea96 --- /dev/null +++ b/app/services/pinned_message_service.py @@ -0,0 +1,240 @@ +import asyncio +import logging +from typing import Optional + +from aiogram import Bot +from aiogram.exceptions import ( + TelegramBadRequest, + TelegramForbiddenError, + TelegramRetryAfter, +) +from sqlalchemy import select, update +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database.crud.user import get_users_list +from app.database.models import PinnedMessage, User, UserStatus +from app.utils.validators import sanitize_html, validate_html_tags + +logger = logging.getLogger(__name__) + + +VALID_POSITIONS = {"before_menu", "after_menu"} + + +async def get_active_pinned_message(db: AsyncSession) -> Optional[PinnedMessage]: + result = await db.execute( + select(PinnedMessage) + .where(PinnedMessage.is_active.is_(True)) + .order_by(PinnedMessage.created_at.desc()) + .limit(1) + ) + return result.scalar_one_or_none() + + +async def set_active_pinned_message( + db: AsyncSession, + content: str, + created_by: Optional[int] = None, + media_type: Optional[str] = None, + media_file_id: Optional[str] = None, + delivery_position: Optional[str] = None, +) -> PinnedMessage: + sanitized_content = sanitize_html(content or "") + is_valid, error_message = validate_html_tags(sanitized_content) + if not is_valid: + raise ValueError(error_message) + + if media_type not in {None, "photo", "video"}: + raise ValueError("Поддерживаются только фото или видео в закрепленном сообщении") + + if created_by is not None: + creator_id = await db.scalar(select(User.id).where(User.id == created_by)) + else: + creator_id = None + + previous_active = await get_active_pinned_message(db) + + await db.execute( + update(PinnedMessage) + .where(PinnedMessage.is_active.is_(True)) + .values(is_active=False) + ) + + resolved_position = delivery_position or getattr(previous_active, "delivery_position", None) or "before_menu" + if resolved_position not in VALID_POSITIONS: + resolved_position = "before_menu" + + pinned_message = PinnedMessage( + content=sanitized_content, + media_type=media_type, + media_file_id=media_file_id, + is_active=True, + created_by=creator_id, + delivery_position=resolved_position, + ) + + db.add(pinned_message) + await db.commit() + await db.refresh(pinned_message) + + logger.info("Создано новое закрепленное сообщение #%s", pinned_message.id) + return pinned_message + + +async def deliver_pinned_message_to_user( + bot: Bot, + db: AsyncSession, + user: User, + position_filter: Optional[str] = None, +) -> bool: + pinned_message = await get_active_pinned_message(db) + if not pinned_message: + return False + + position = getattr(pinned_message, "delivery_position", "before_menu") or "before_menu" + if position_filter and position != position_filter: + return False + + return await _send_and_pin_message(bot, user.telegram_id, pinned_message) + + +async def update_active_pinned_position(db: AsyncSession, position: str) -> Optional[PinnedMessage]: + if position not in VALID_POSITIONS: + raise ValueError("Недопустимая позиция закрепленного сообщения") + + pinned_message = await get_active_pinned_message(db) + if not pinned_message: + return None + + await db.execute( + update(PinnedMessage) + .where(PinnedMessage.id == pinned_message.id) + .values(delivery_position=position) + ) + await db.commit() + await db.refresh(pinned_message) + return pinned_message + + +async def broadcast_pinned_message( + bot: Bot, + db: AsyncSession, + pinned_message: PinnedMessage, +) -> tuple[int, int]: + users: list[User] = [] + offset = 0 + batch_size = 5000 + + while True: + batch = await get_users_list( + db, + offset=offset, + limit=batch_size, + status=UserStatus.ACTIVE, + ) + + if not batch: + break + + users.extend(batch) + offset += batch_size + + sent_count = 0 + failed_count = 0 + semaphore = asyncio.Semaphore(2) + + async def send_to_user(user: User) -> None: + nonlocal sent_count, failed_count + async with semaphore: + for attempt in range(3): + try: + success = await _send_and_pin_message( + bot, + user.telegram_id, + pinned_message, + ) + if success: + sent_count += 1 + else: + failed_count += 1 + break + except TelegramRetryAfter as retry_error: + delay = min(retry_error.retry_after + 1, 30) + logger.warning( + "RetryAfter for user %s, waiting %s seconds", + user.telegram_id, + delay, + ) + await asyncio.sleep(delay) + except Exception as send_error: # noqa: BLE001 + logger.error( + "Ошибка отправки закрепленного сообщения пользователю %s: %s", + user.telegram_id, + send_error, + ) + failed_count += 1 + break + + for i in range(0, len(users), 50): + batch = users[i : i + 50] + tasks = [send_to_user(user) for user in batch] + await asyncio.gather(*tasks) + await asyncio.sleep(0.1) + + return sent_count, failed_count + + +async def _send_and_pin_message(bot: Bot, chat_id: int, pinned_message: PinnedMessage) -> bool: + try: + await bot.unpin_all_chat_messages(chat_id=chat_id) + except TelegramBadRequest: + pass + except TelegramForbiddenError: + return False + + try: + if pinned_message.media_type == "photo" and pinned_message.media_file_id: + sent_message = await bot.send_photo( + chat_id=chat_id, + photo=pinned_message.media_file_id, + caption=pinned_message.content or None, + parse_mode="HTML" if pinned_message.content else None, + disable_notification=True, + ) + elif pinned_message.media_type == "video" and pinned_message.media_file_id: + sent_message = await bot.send_video( + chat_id=chat_id, + video=pinned_message.media_file_id, + caption=pinned_message.content or None, + parse_mode="HTML" if pinned_message.content else None, + disable_notification=True, + ) + else: + sent_message = await bot.send_message( + chat_id=chat_id, + text=pinned_message.content, + parse_mode="HTML", + disable_web_page_preview=True, + ) + await bot.pin_chat_message( + chat_id=chat_id, + message_id=sent_message.message_id, + disable_notification=True, + ) + return True + except TelegramForbiddenError: + return False + except TelegramBadRequest as error: + logger.warning( + "Некорректный запрос при отправке закрепленного сообщения в чат %s: %s", + chat_id, + error, + ) + except Exception as error: # noqa: BLE001 + logger.error( + "Не удалось отправить закрепленное сообщение пользователю %s: %s", + chat_id, + error, + ) + + return False diff --git a/app/states.py b/app/states.py index 795a6d67..ba8cfb0c 100644 --- a/app/states.py +++ b/app/states.py @@ -134,6 +134,7 @@ class AdminStates(StatesGroup): creating_server_country = State() editing_welcome_text = State() + editing_pinned_message = State() waiting_for_message_buttons = "waiting_for_message_buttons" editing_promo_offer_message = State() diff --git a/migrations/alembic/versions/0f3d1f5c2c0a_add_delivery_position_to_pinned_messages.py b/migrations/alembic/versions/0f3d1f5c2c0a_add_delivery_position_to_pinned_messages.py new file mode 100644 index 00000000..7779fdba --- /dev/null +++ b/migrations/alembic/versions/0f3d1f5c2c0a_add_delivery_position_to_pinned_messages.py @@ -0,0 +1,28 @@ +"""add delivery position to pinned messages + +Revision ID: 0f3d1f5c2c0a +Revises: 5f2a3e099427 +Create Date: 2025-01-01 00:00:00.000000 +""" + +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = '0f3d1f5c2c0a' +down_revision = '5f2a3e099427' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + 'pinned_messages', + sa.Column('delivery_position', sa.String(length=32), nullable=False, server_default='before_menu'), + ) + op.execute("UPDATE pinned_messages SET delivery_position = 'before_menu' WHERE delivery_position IS NULL") + op.alter_column('pinned_messages', 'delivery_position', server_default=None) + + +def downgrade() -> None: + op.drop_column('pinned_messages', 'delivery_position') diff --git a/migrations/alembic/versions/5f2a3e099427_add_media_fields_to_pinned_messages.py b/migrations/alembic/versions/5f2a3e099427_add_media_fields_to_pinned_messages.py new file mode 100644 index 00000000..fdd05440 --- /dev/null +++ b/migrations/alembic/versions/5f2a3e099427_add_media_fields_to_pinned_messages.py @@ -0,0 +1,75 @@ +"""add media fields to pinned messages""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +revision: str = "5f2a3e099427" +down_revision: Union[str, None] = "c9c71d04f0a1" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +TABLE_NAME = "pinned_messages" + + +def _table_exists(inspector: sa.Inspector) -> bool: + return TABLE_NAME in inspector.get_table_names() + + +def _column_missing(inspector: sa.Inspector, column_name: str) -> bool: + columns = {column.get("name") for column in inspector.get_columns(TABLE_NAME)} + return column_name not in columns + + +def upgrade() -> None: + bind = op.get_bind() + inspector = sa.inspect(bind) + + if not _table_exists(inspector): + return + + if _column_missing(inspector, "media_type"): + op.add_column( + TABLE_NAME, + sa.Column("media_type", sa.String(length=32), nullable=True), + ) + + if _column_missing(inspector, "media_file_id"): + op.add_column( + TABLE_NAME, + sa.Column("media_file_id", sa.String(length=255), nullable=True), + ) + + # Ensure content has a default value for media-only messages + op.alter_column( + TABLE_NAME, + "content", + existing_type=sa.Text(), + nullable=False, + server_default="", + ) + + +def downgrade() -> None: + bind = op.get_bind() + inspector = sa.inspect(bind) + + if not _table_exists(inspector): + return + + if not _column_missing(inspector, "media_type"): + op.drop_column(TABLE_NAME, "media_type") + + if not _column_missing(inspector, "media_file_id"): + op.drop_column(TABLE_NAME, "media_file_id") + + op.alter_column( + TABLE_NAME, + "content", + existing_type=sa.Text(), + nullable=False, + server_default=None, + ) diff --git a/migrations/alembic/versions/c9c71d04f0a1_add_pinned_messages_table.py b/migrations/alembic/versions/c9c71d04f0a1_add_pinned_messages_table.py new file mode 100644 index 00000000..add5fe11 --- /dev/null +++ b/migrations/alembic/versions/c9c71d04f0a1_add_pinned_messages_table.py @@ -0,0 +1,45 @@ +"""add pinned messages table""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +revision: str = "c9c71d04f0a1" +down_revision: Union[str, None] = "e3c1e0b5b4a7" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +TABLE_NAME = "pinned_messages" + + +def _table_exists(inspector: sa.Inspector) -> bool: + return TABLE_NAME in inspector.get_table_names() + + +def upgrade() -> None: + bind = op.get_bind() + inspector = sa.inspect(bind) + + if _table_exists(inspector): + return + + op.create_table( + TABLE_NAME, + sa.Column("id", sa.Integer(), primary_key=True, index=True), + sa.Column("content", sa.Text(), nullable=False), + sa.Column("is_active", sa.Boolean(), default=True), + sa.Column("created_by", sa.Integer(), sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True), + sa.Column("created_at", sa.DateTime(), server_default=sa.func.now()), + sa.Column("updated_at", sa.DateTime(), server_default=sa.func.now(), onupdate=sa.func.now()), + ) + + +def downgrade() -> None: + bind = op.get_bind() + inspector = sa.inspect(bind) + + if _table_exists(inspector): + op.drop_table(TABLE_NAME)