diff --git a/app/database/models.py b/app/database/models.py
index 13cdfd0b..8c246d20 100644
--- a/app/database/models.py
+++ b/app/database/models.py
@@ -1553,6 +1553,22 @@ class WelcomeText(Base):
creator = relationship("User", backref="created_welcome_texts")
+class PinnedMessage(Base):
+ __tablename__ = "pinned_messages"
+
+ id = Column(Integer, primary_key=True, index=True)
+ content = Column(Text, nullable=False, default="")
+ media_type = Column(String(32), nullable=True)
+ media_file_id = Column(String(255), nullable=True)
+ delivery_position = Column(String(32), nullable=False, default="before_menu")
+ is_active = Column(Boolean, default=True)
+ created_by = Column(Integer, ForeignKey("users.id", ondelete="SET NULL"), nullable=True)
+ created_at = Column(DateTime, default=func.now())
+ updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
+
+ creator = relationship("User", backref="pinned_messages")
+
+
class AdvertisingCampaign(Base):
__tablename__ = "advertising_campaigns"
diff --git a/app/database/universal_migration.py b/app/database/universal_migration.py
index e418502e..afea3fbc 100644
--- a/app/database/universal_migration.py
+++ b/app/database/universal_migration.py
@@ -3014,6 +3014,153 @@ async def create_welcome_texts_table():
logger.error(f"Ошибка создания таблицы welcome_texts: {e}")
return False
+
+async def create_pinned_messages_table():
+ table_exists = await check_table_exists("pinned_messages")
+ if table_exists:
+ logger.info("Таблица pinned_messages уже существует")
+ return True
+
+ try:
+ async with engine.begin() as conn:
+ db_type = await get_database_type()
+
+ if db_type == "sqlite":
+ create_sql = """
+ CREATE TABLE pinned_messages (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ content TEXT NOT NULL DEFAULT '',
+ media_type VARCHAR(32) NULL,
+ media_file_id VARCHAR(255) NULL,
+ delivery_position VARCHAR(32) NOT NULL DEFAULT 'before_menu',
+ is_active BOOLEAN DEFAULT 1,
+ created_by INTEGER NULL,
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+ updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+ FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL
+ );
+
+ CREATE INDEX IF NOT EXISTS ix_pinned_messages_active ON pinned_messages(is_active);
+ """
+
+ elif db_type == "postgresql":
+ create_sql = """
+ CREATE TABLE pinned_messages (
+ id SERIAL PRIMARY KEY,
+ content TEXT NOT NULL DEFAULT '',
+ media_type VARCHAR(32) NULL,
+ media_file_id VARCHAR(255) NULL,
+ delivery_position VARCHAR(32) NOT NULL DEFAULT 'before_menu',
+ is_active BOOLEAN DEFAULT TRUE,
+ created_by INTEGER NULL REFERENCES users(id) ON DELETE SET NULL,
+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+ );
+
+ CREATE INDEX IF NOT EXISTS ix_pinned_messages_active ON pinned_messages(is_active);
+ """
+
+ elif db_type == "mysql":
+ create_sql = """
+ CREATE TABLE pinned_messages (
+ id INT AUTO_INCREMENT PRIMARY KEY,
+ content TEXT NOT NULL DEFAULT '',
+ media_type VARCHAR(32) NULL,
+ media_file_id VARCHAR(255) NULL,
+ delivery_position VARCHAR(32) NOT NULL DEFAULT 'before_menu',
+ is_active BOOLEAN DEFAULT TRUE,
+ created_by INT NULL,
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+ updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+ FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL
+ );
+
+ CREATE INDEX ix_pinned_messages_active ON pinned_messages(is_active);
+ """
+
+ else:
+ logger.error(f"Неподдерживаемый тип БД для создания таблицы pinned_messages: {db_type}")
+ return False
+
+ await conn.execute(text(create_sql))
+
+ logger.info("✅ Таблица pinned_messages успешно создана")
+ return True
+
+ except Exception as e:
+ logger.error(f"Ошибка создания таблицы pinned_messages: {e}")
+ return False
+
+
+async def ensure_pinned_message_media_columns():
+ table_exists = await check_table_exists("pinned_messages")
+ if not table_exists:
+ logger.warning("⚠️ Таблица pinned_messages отсутствует — пропускаем обновление медиа полей")
+ return False
+
+ try:
+ async with engine.begin() as conn:
+ db_type = await get_database_type()
+
+ if not await check_column_exists("pinned_messages", "media_type"):
+ await conn.execute(
+ text("ALTER TABLE pinned_messages ADD COLUMN media_type VARCHAR(32)")
+ )
+
+ if not await check_column_exists("pinned_messages", "media_file_id"):
+ await conn.execute(
+ text("ALTER TABLE pinned_messages ADD COLUMN media_file_id VARCHAR(255)")
+ )
+
+ await conn.execute(text("UPDATE pinned_messages SET content = '' WHERE content IS NULL"))
+
+ if db_type == "postgresql":
+ await conn.execute(
+ text("ALTER TABLE pinned_messages ALTER COLUMN content SET DEFAULT ''")
+ )
+ elif db_type == "mysql":
+ await conn.execute(
+ text("ALTER TABLE pinned_messages MODIFY content TEXT NOT NULL DEFAULT ''")
+ )
+ else:
+ logger.info("ℹ️ Пропускаем установку DEFAULT для content в SQLite")
+
+ logger.info("✅ Медиа поля pinned_messages приведены в актуальное состояние")
+ return True
+
+ except Exception as e:
+ logger.error(f"Ошибка обновления медиа полей pinned_messages: {e}")
+ return False
+
+
+async def ensure_pinned_message_position_column():
+ table_exists = await check_table_exists("pinned_messages")
+ if not table_exists:
+ logger.warning("⚠️ Таблица pinned_messages отсутствует — пропускаем обновление позиции")
+ return False
+
+ try:
+ async with engine.begin() as conn:
+ if not await check_column_exists("pinned_messages", "delivery_position"):
+ await conn.execute(
+ text(
+ "ALTER TABLE pinned_messages ADD COLUMN delivery_position VARCHAR(32) "
+ "NOT NULL DEFAULT 'before_menu'"
+ )
+ )
+ await conn.execute(
+ text(
+ "UPDATE pinned_messages SET delivery_position = 'before_menu' "
+ "WHERE delivery_position IS NULL"
+ )
+ )
+ logger.info("✅ Позиция закрепленного сообщения приведена в актуальное состояние")
+ return True
+
+ except Exception as e:
+ logger.error(f"Ошибка обновления позиции закрепленного сообщения: {e}")
+ return False
+
async def add_media_fields_to_broadcast_history():
logger.info("=== ДОБАВЛЕНИЕ ПОЛЕЙ МЕДИА В BROADCAST_HISTORY ===")
@@ -4690,12 +4837,32 @@ async def run_universal_migration():
else:
logger.warning("⚠️ Проблемы с таблицей user_messages")
+ logger.info("=== СОЗДАНИЕ ТАБЛИЦЫ PINNED_MESSAGES ===")
+ pinned_messages_created = await create_pinned_messages_table()
+ if pinned_messages_created:
+ logger.info("✅ Таблица pinned_messages готова")
+ else:
+ logger.warning("⚠️ Проблемы с таблицей pinned_messages")
+
logger.info("=== СОЗДАНИЕ/ОБНОВЛЕНИЕ ТАБЛИЦЫ WELCOME_TEXTS ===")
welcome_texts_created = await create_welcome_texts_table()
if welcome_texts_created:
logger.info("✅ Таблица welcome_texts готова с полем is_enabled")
else:
logger.warning("⚠️ Проблемы с таблицей welcome_texts")
+
+ logger.info("=== ОБНОВЛЕНИЕ СХЕМЫ PINNED_MESSAGES ===")
+ pinned_media_ready = await ensure_pinned_message_media_columns()
+ if pinned_media_ready:
+ logger.info("✅ Медиа поля для pinned_messages готовы")
+ else:
+ logger.warning("⚠️ Проблемы с медиа полями pinned_messages")
+
+ pinned_position_ready = await ensure_pinned_message_position_column()
+ if pinned_position_ready:
+ logger.info("✅ Позиция закрепленного сообщения готова")
+ else:
+ logger.warning("⚠️ Проблемы с позицией закрепленного сообщения")
logger.info("=== ДОБАВЛЕНИЕ МЕДИА ПОЛЕЙ В BROADCAST_HISTORY ===")
media_fields_added = await add_media_fields_to_broadcast_history()
@@ -4880,8 +5047,11 @@ async def check_migration_status():
"cryptobot_table": False,
"heleket_table": False,
"user_messages_table": False,
+ "pinned_messages_table": False,
"welcome_texts_table": False,
"welcome_texts_is_enabled_column": False,
+ "pinned_messages_media_columns": False,
+ "pinned_messages_position_column": False,
"broadcast_history_media_fields": False,
"subscription_duplicates": False,
"subscription_conversions_table": False,
@@ -4924,6 +5094,7 @@ async def check_migration_status():
status["cryptobot_table"] = await check_table_exists('cryptobot_payments')
status["heleket_table"] = await check_table_exists('heleket_payments')
status["user_messages_table"] = await check_table_exists('user_messages')
+ status["pinned_messages_table"] = await check_table_exists('pinned_messages')
status["welcome_texts_table"] = await check_table_exists('welcome_texts')
status["privacy_policies_table"] = await check_table_exists('privacy_policies')
status["public_offers_table"] = await check_table_exists('public_offers')
@@ -4969,6 +5140,19 @@ async def check_migration_status():
await check_column_exists('broadcast_history', 'media_caption')
)
status["broadcast_history_media_fields"] = media_fields_exist
+
+ pinned_media_columns_exist = (
+ status["pinned_messages_table"]
+ and await check_column_exists('pinned_messages', 'media_type')
+ and await check_column_exists('pinned_messages', 'media_file_id')
+ )
+ status["pinned_messages_media_columns"] = pinned_media_columns_exist
+
+ pinned_position_exists = (
+ status["pinned_messages_table"]
+ and await check_column_exists('pinned_messages', 'delivery_position')
+ )
+ status["pinned_messages_position_column"] = pinned_position_exists
async with engine.begin() as conn:
duplicates_check = await conn.execute(text("""
@@ -4987,10 +5171,13 @@ async def check_migration_status():
"cryptobot_table": "Таблица CryptoBot payments",
"heleket_table": "Таблица Heleket payments",
"user_messages_table": "Таблица пользовательских сообщений",
+ "pinned_messages_table": "Таблица закреплённых сообщений",
"welcome_texts_table": "Таблица приветственных текстов",
"privacy_policies_table": "Таблица политик конфиденциальности",
"public_offers_table": "Таблица публичных оферт",
"welcome_texts_is_enabled_column": "Поле is_enabled в welcome_texts",
+ "pinned_messages_media_columns": "Медиа поля в pinned_messages",
+ "pinned_messages_position_column": "Поле позиции закреплённого сообщения",
"broadcast_history_media_fields": "Медиа поля в broadcast_history",
"subscription_conversions_table": "Таблица конверсий подписок",
"subscription_events_table": "Таблица событий подписок",
diff --git a/app/handlers/admin/messages.py b/app/handlers/admin/messages.py
index 3bf0210f..7b7b05e4 100644
--- a/app/handlers/admin/messages.py
+++ b/app/handlers/admin/messages.py
@@ -1,3 +1,4 @@
+import html
import logging
import asyncio
from datetime import datetime, timedelta
@@ -25,13 +26,19 @@ from app.keyboards.admin import (
get_admin_pagination_keyboard, get_broadcast_media_keyboard,
get_media_confirm_keyboard, get_updated_message_buttons_selector_keyboard_with_media,
BROADCAST_BUTTON_ROWS, DEFAULT_BROADCAST_BUTTONS,
- get_broadcast_button_config, get_broadcast_button_labels
+ get_broadcast_button_config, get_broadcast_button_labels, get_pinned_message_keyboard
)
from app.localization.texts import get_texts
from app.database.crud.user import get_users_list
from app.database.crud.subscription import get_expiring_subscriptions
from app.utils.decorators import admin_required, error_handler
from app.utils.miniapp_buttons import build_miniapp_or_callback_button
+from app.services.pinned_message_service import (
+ broadcast_pinned_message,
+ get_active_pinned_message,
+ set_active_pinned_message,
+ update_active_pinned_position,
+)
logger = logging.getLogger(__name__)
@@ -167,6 +174,155 @@ async def show_messages_menu(
await callback.answer()
+@admin_required
+@error_handler
+async def show_pinned_message_menu(
+ callback: types.CallbackQuery,
+ db_user: User,
+ db: AsyncSession,
+ state: FSMContext,
+):
+ texts = get_texts(db_user.language)
+ await state.clear()
+ pinned_message = await get_active_pinned_message(db)
+
+ if pinned_message:
+ content_preview = html.escape(pinned_message.content or "")
+ last_updated = pinned_message.updated_at or pinned_message.created_at
+ timestamp_text = last_updated.strftime("%d.%m.%Y %H:%M") if last_updated else "—"
+ media_line = ""
+ if pinned_message.media_type:
+ media_label = "Фото" if pinned_message.media_type == "photo" else "Видео"
+ media_line = f"📎 Медиа: {media_label}\n"
+ position_label = (
+ texts.t("PINNED_POSITION_BEFORE_MENU", "До меню")
+ if (pinned_message.delivery_position or "before_menu") == "before_menu"
+ else texts.t("PINNED_POSITION_AFTER_MENU", "После меню")
+ )
+ body = (
+ "📌 Закрепленное сообщение\n\n"
+ "📝 Текущий текст:\n"
+ f"{content_preview}\n\n"
+ f"{media_line}"
+ f"📍 Позиция: {position_label}\n"
+ f"🕒 Обновлено: {timestamp_text}"
+ )
+ else:
+ body = (
+ "📌 Закрепленное сообщение\n\n"
+ "Сообщение не задано. Отправьте новый текст, чтобы разослать и закрепить его у пользователей."
+ )
+
+ await callback.message.edit_text(
+ body,
+ reply_markup=get_pinned_message_keyboard(
+ db_user.language,
+ position=pinned_message.delivery_position if pinned_message else None,
+ ),
+ parse_mode="HTML",
+ )
+ await callback.answer()
+
+
+@admin_required
+@error_handler
+async def prompt_pinned_message_update(
+ callback: types.CallbackQuery,
+ db_user: User,
+ state: FSMContext,
+):
+ await state.set_state(AdminStates.editing_pinned_message)
+ await callback.message.edit_text(
+ "✏️ Новое закрепленное сообщение\n\n"
+ "Пришлите текст, фото или видео, которое нужно закрепить.\n"
+ "Бот отправит его всем активным пользователям, открепит старое и закрепит новое без уведомлений.",
+ reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
+ [types.InlineKeyboardButton(text="❌ Отмена", callback_data="admin_pinned_message")]
+ ]),
+ parse_mode="HTML",
+ )
+ await callback.answer()
+
+
+@admin_required
+@error_handler
+async def process_pinned_message_update(
+ message: types.Message,
+ db_user: User,
+ state: FSMContext,
+ db: AsyncSession,
+):
+ media_type: Optional[str] = None
+ media_file_id: Optional[str] = None
+
+ if message.photo:
+ media_type = "photo"
+ media_file_id = message.photo[-1].file_id
+ elif message.video:
+ media_type = "video"
+ media_file_id = message.video.file_id
+
+ pinned_text = message.html_text or message.caption_html or message.text or message.caption or ""
+
+ if not pinned_text and not media_file_id:
+ await message.answer("❌ Не удалось прочитать текст или медиа в сообщении, попробуйте снова.")
+ return
+
+ try:
+ pinned_message = await set_active_pinned_message(
+ db,
+ pinned_text,
+ db_user.id,
+ media_type=media_type,
+ media_file_id=media_file_id,
+ )
+ except ValueError as validation_error:
+ await message.answer(f"❌ {validation_error}")
+ return
+
+ await message.answer(
+ "📌 Сообщение сохранено. Начинаю отправку и закрепление у пользователей...",
+ parse_mode="HTML",
+ )
+
+ sent_count, failed_count = await broadcast_pinned_message(
+ message.bot,
+ db,
+ pinned_message,
+ )
+
+ total = sent_count + failed_count
+ await message.answer(
+ "✅ Закрепленное сообщение обновлено\n\n"
+ f"👥 Получателей: {total}\n"
+ f"✅ Отправлено: {sent_count}\n"
+ f"⚠️ Ошибок: {failed_count}",
+ reply_markup=get_admin_messages_keyboard(db_user.language),
+ parse_mode="HTML",
+ )
+ await state.clear()
+
+
+@admin_required
+@error_handler
+async def toggle_pinned_message_position(
+ callback: types.CallbackQuery,
+ db_user: User,
+ db: AsyncSession,
+ state: FSMContext,
+):
+ pinned_message = await get_active_pinned_message(db)
+ if not pinned_message:
+ await callback.answer("Сначала создайте закрепленное сообщение", show_alert=True)
+ return
+
+ current_position = (pinned_message.delivery_position or "before_menu")
+ new_position = "after_menu" if current_position == "before_menu" else "before_menu"
+
+ await update_active_pinned_position(db, new_position)
+ await show_pinned_message_menu(callback, db_user, db, state)
+
+
@admin_required
@error_handler
async def show_broadcast_targets(
@@ -1295,6 +1451,9 @@ def get_target_display_name(target: str) -> str:
def register_handlers(dp: Dispatcher):
dp.callback_query.register(show_messages_menu, F.data == "admin_messages")
+ dp.callback_query.register(show_pinned_message_menu, F.data == "admin_pinned_message")
+ dp.callback_query.register(toggle_pinned_message_position, F.data == "admin_pinned_message_position")
+ dp.callback_query.register(prompt_pinned_message_update, F.data == "admin_pinned_message_edit")
dp.callback_query.register(show_broadcast_targets, F.data.in_(["admin_msg_all", "admin_msg_by_sub"]))
dp.callback_query.register(select_broadcast_target, F.data.startswith("broadcast_"))
dp.callback_query.register(confirm_broadcast, F.data == "admin_confirm_broadcast")
@@ -1312,3 +1471,4 @@ def register_handlers(dp: Dispatcher):
dp.callback_query.register(handle_change_media, F.data == "change_media")
dp.message.register(process_broadcast_message, AdminStates.waiting_for_broadcast_message)
dp.message.register(process_broadcast_media, AdminStates.waiting_for_broadcast_media)
+ dp.message.register(process_pinned_message_update, AdminStates.editing_pinned_message)
diff --git a/app/handlers/common.py b/app/handlers/common.py
index 3c8549f2..de88d57c 100644
--- a/app/handlers/common.py
+++ b/app/handlers/common.py
@@ -67,7 +67,7 @@ async def handle_cancel(
async def handle_unknown_message(
message: types.Message,
- db_user: User
+ db_user: User | None = None
):
texts = get_texts(db_user.language if db_user else "ru")
diff --git a/app/handlers/start.py b/app/handlers/start.py
index f2e125d1..d01f816e 100644
--- a/app/handlers/start.py
+++ b/app/handlers/start.py
@@ -36,6 +36,7 @@ from app.services.subscription_service import SubscriptionService
from app.services.support_settings_service import SupportSettingsService
from app.services.main_menu_button_service import MainMenuButtonService
from app.services.privacy_policy_service import PrivacyPolicyService
+from app.services.pinned_message_service import deliver_pinned_message_to_user
from app.utils.user_utils import generate_unique_referral_code
from app.utils.promo_offer import (
build_promo_offer_hint,
@@ -61,6 +62,17 @@ def _calculate_subscription_flags(subscription):
return has_active_subscription, subscription_is_active
+async def _send_pinned_message(bot: Bot, db: AsyncSession, user, position_filter: str | None = None) -> None:
+ try:
+ await deliver_pinned_message_to_user(bot, db, user, position_filter=position_filter)
+ except Exception as error: # noqa: BLE001
+ logger.error(
+ "Не удалось отправить закрепленное сообщение пользователю %s: %s",
+ getattr(user, "telegram_id", "unknown"),
+ error,
+ )
+
+
async def _apply_campaign_bonus_if_needed(
db: AsyncSession,
user,
@@ -433,11 +445,13 @@ async def cmd_start(message: types.Message, state: FSMContext, db: AsyncSession,
is_moderator=is_moderator,
custom_buttons=custom_buttons,
)
+ await _send_pinned_message(message.bot, db, user, position_filter="before_menu")
await message.answer(
menu_text,
reply_markup=keyboard,
parse_mode="HTML"
)
+ await _send_pinned_message(message.bot, db, user, position_filter="after_menu")
await state.clear()
return
@@ -1089,11 +1103,13 @@ async def complete_registration_from_callback(
is_moderator=is_moderator,
custom_buttons=custom_buttons,
)
+ await _send_pinned_message(callback.bot, db, existing_user, position_filter="before_menu")
await callback.message.answer(
menu_text,
reply_markup=keyboard,
parse_mode="HTML"
)
+ await _send_pinned_message(callback.bot, db, existing_user, position_filter="after_menu")
except Exception as e:
logger.error(f"Ошибка при показе главного меню существующему пользователю: {e}")
await callback.message.answer(
@@ -1232,6 +1248,8 @@ async def complete_registration_from_callback(
reply_markup=get_post_registration_keyboard(user.language),
)
logger.info(f"✅ Приветственное сообщение отправлено пользователю {user.telegram_id}")
+ await _send_pinned_message(callback.bot, db, user, position_filter="before_menu")
+ await _send_pinned_message(callback.bot, db, user, position_filter="after_menu")
except Exception as e:
logger.error(f"Ошибка при отправке приветственного сообщения: {e}")
else:
@@ -1272,11 +1290,13 @@ async def complete_registration_from_callback(
is_moderator=is_moderator,
custom_buttons=custom_buttons,
)
+ await _send_pinned_message(callback.bot, db, user, position_filter="before_menu")
await callback.message.answer(
menu_text,
reply_markup=keyboard,
parse_mode="HTML"
)
+ await _send_pinned_message(callback.bot, db, user, position_filter="after_menu")
logger.info(f"✅ Главное меню показано пользователю {user.telegram_id}")
except Exception as e:
logger.error(f"Ошибка при показе главного меню: {e}")
@@ -1369,11 +1389,13 @@ async def complete_registration(
is_moderator=is_moderator,
custom_buttons=custom_buttons,
)
+ await _send_pinned_message(message.bot, db, existing_user, position_filter="before_menu")
await message.answer(
menu_text,
reply_markup=keyboard,
parse_mode="HTML"
)
+ await _send_pinned_message(message.bot, db, existing_user, position_filter="after_menu")
except Exception as e:
logger.error(f"Ошибка при показе главного меню существующему пользователю: {e}")
await message.answer(
@@ -1535,6 +1557,8 @@ async def complete_registration(
reply_markup=get_post_registration_keyboard(user.language),
)
logger.info(f"✅ Приветственное сообщение отправлено пользователю {user.telegram_id}")
+ await _send_pinned_message(message.bot, db, user, position_filter="before_menu")
+ await _send_pinned_message(message.bot, db, user, position_filter="after_menu")
except Exception as e:
logger.error(f"Ошибка при отправке приветственного сообщения: {e}")
else:
@@ -1575,12 +1599,14 @@ async def complete_registration(
is_moderator=is_moderator,
custom_buttons=custom_buttons,
)
+ await _send_pinned_message(message.bot, db, user, position_filter="before_menu")
await message.answer(
menu_text,
reply_markup=keyboard,
parse_mode="HTML"
)
logger.info(f"✅ Главное меню показано пользователю {user.telegram_id}")
+ await _send_pinned_message(message.bot, db, user, position_filter="after_menu")
except Exception as e:
logger.error(f"Ошибка при показе главного меню: {e}")
await message.answer(
@@ -1925,6 +1951,8 @@ async def required_sub_channel_check(
reply_markup=keyboard,
parse_mode="HTML",
)
+ await _send_pinned_message(bot, db, user, position_filter="before_menu")
+ await _send_pinned_message(bot, db, user, position_filter="after_menu")
else:
from app.keyboards.inline import get_rules_keyboard
diff --git a/app/keyboards/admin.py b/app/keyboards/admin.py
index 0ab2cd7f..aa71f912 100644
--- a/app/keyboards/admin.py
+++ b/app/keyboards/admin.py
@@ -837,12 +837,44 @@ def get_admin_messages_keyboard(language: str = "ru") -> InlineKeyboardMarkup:
callback_data="admin_msg_history"
)
],
+ [
+ InlineKeyboardButton(
+ text=_t(texts, "ADMIN_PINNED_MESSAGE", "📌 Закрепленное сообщение"),
+ callback_data="admin_pinned_message",
+ )
+ ],
[
InlineKeyboardButton(text=texts.BACK, callback_data="admin_submenu_communications")
]
])
+def get_pinned_message_keyboard(language: str = "ru", position: str | None = None) -> InlineKeyboardMarkup:
+ texts = get_texts(language)
+
+ position_label = ""
+ if position:
+ before_label = texts.t("PINNED_POSITION_BEFORE_MENU", "До меню")
+ after_label = texts.t("PINNED_POSITION_AFTER_MENU", "После меню")
+ position_label = " (" + (before_label if position == "before_menu" else after_label) + ")"
+
+ return InlineKeyboardMarkup(inline_keyboard=[
+ [
+ InlineKeyboardButton(
+ text=_t(texts, "ADMIN_PINNED_MESSAGE_UPDATE", "✏️ Обновить"),
+ callback_data="admin_pinned_message_edit",
+ )
+ ],
+ [
+ InlineKeyboardButton(
+ text=_t(texts, "ADMIN_PINNED_MESSAGE_POSITION", f"🔄 Позиция{position_label}"),
+ callback_data="admin_pinned_message_position",
+ )
+ ],
+ [InlineKeyboardButton(text=texts.BACK, callback_data="admin_messages")],
+ ])
+
+
def get_admin_monitoring_keyboard(language: str = "ru") -> InlineKeyboardMarkup:
texts = get_texts(language)
diff --git a/app/localization/locales/en.json b/app/localization/locales/en.json
index 1c6bc309..19aa1251 100644
--- a/app/localization/locales/en.json
+++ b/app/localization/locales/en.json
@@ -209,7 +209,12 @@
"ADMIN_MESSAGES_BY_CRITERIA": "🔍 By criteria",
"ADMIN_MESSAGES_BY_SUBSCRIPTIONS": "🎯 By subscriptions",
"ADMIN_MESSAGES_HISTORY": "📋 History",
+ "ADMIN_PINNED_MESSAGE": "📌 Pinned message",
+ "ADMIN_PINNED_MESSAGE_POSITION": "🔄 Position",
+ "ADMIN_PINNED_MESSAGE_UPDATE": "✏️ Update",
"ADMIN_MONITORING": "🔍 Monitoring",
+ "PINNED_POSITION_BEFORE_MENU": "Before menu",
+ "PINNED_POSITION_AFTER_MENU": "After menu",
"ADMIN_MONITORING_ALL_LOGS": "📋 All logs",
"ADMIN_MONITORING_AUTOPAY_SETTINGS": "💳 Auto-pay settings",
"ADMIN_MONITORING_AUTO_CLEANUP": "🧹 Auto-clean logs",
diff --git a/app/localization/locales/ru.json b/app/localization/locales/ru.json
index 64d4729e..28a3b171 100644
--- a/app/localization/locales/ru.json
+++ b/app/localization/locales/ru.json
@@ -212,7 +212,12 @@
"ADMIN_MESSAGES_BY_CRITERIA": "🔍 По критериям",
"ADMIN_MESSAGES_BY_SUBSCRIPTIONS": "🎯 По подпискам",
"ADMIN_MESSAGES_HISTORY": "📋 История",
+ "ADMIN_PINNED_MESSAGE": "📌 Закрепленное сообщение",
+ "ADMIN_PINNED_MESSAGE_POSITION": "🔄 Позиция",
+ "ADMIN_PINNED_MESSAGE_UPDATE": "✏️ Обновить",
"ADMIN_MONITORING": "🔍 Мониторинг",
+ "PINNED_POSITION_BEFORE_MENU": "До меню",
+ "PINNED_POSITION_AFTER_MENU": "После меню",
"ADMIN_MONITORING_ALL_LOGS": "📋 Все логи",
"ADMIN_MONITORING_AUTOPAY_SETTINGS": "💳 Настройки автооплаты",
"ADMIN_MONITORING_AUTO_CLEANUP": "🧹 Автоочистка логов",
diff --git a/app/localization/locales/ua.json b/app/localization/locales/ua.json
index dadb5a5e..90705ef7 100644
--- a/app/localization/locales/ua.json
+++ b/app/localization/locales/ua.json
@@ -138,8 +138,13 @@
"ADMIN_MESSAGES_ALL_USERS": "📨 Всім користувачам",
"ADMIN_MESSAGES_BY_CRITERIA": "🔍 За критеріями",
"ADMIN_MESSAGES_BY_SUBSCRIPTIONS": "🎯 За підписками",
- "ADMIN_MESSAGES_HISTORY": "📋 Історія",
- "ADMIN_MONITORING": "🔍 Моніторинг",
+ "ADMIN_MESSAGES_HISTORY": "📋 Історія",
+ "ADMIN_PINNED_MESSAGE": "📌 Закріплене повідомлення",
+ "ADMIN_PINNED_MESSAGE_POSITION": "🔄 Позиція",
+ "ADMIN_PINNED_MESSAGE_UPDATE": "✏️ Оновити",
+ "ADMIN_MONITORING": "🔍 Моніторинг",
+ "PINNED_POSITION_BEFORE_MENU": "До меню",
+ "PINNED_POSITION_AFTER_MENU": "Після меню",
"ADMIN_MONITORING_ALL_LOGS": "📋 Всі логи",
"ADMIN_MONITORING_AUTOPAY_SETTINGS": "💳 Налаштування автооплати",
"ADMIN_MONITORING_AUTO_CLEANUP": "🧹 Автоочищення логів",
diff --git a/app/localization/locales/zh.json b/app/localization/locales/zh.json
index 44124ed5..6a8f7618 100644
--- a/app/localization/locales/zh.json
+++ b/app/localization/locales/zh.json
@@ -138,7 +138,12 @@
"ADMIN_MESSAGES_BY_CRITERIA":"🔍按条件",
"ADMIN_MESSAGES_BY_SUBSCRIPTIONS":"🎯按订阅",
"ADMIN_MESSAGES_HISTORY":"📋历史记录",
+"ADMIN_PINNED_MESSAGE":"📌置顶消息",
+"ADMIN_PINNED_MESSAGE_POSITION":"🔄位置",
+"ADMIN_PINNED_MESSAGE_UPDATE":"✏️更新",
"ADMIN_MONITORING":"🔍监控",
+"PINNED_POSITION_BEFORE_MENU":"在菜单前",
+"PINNED_POSITION_AFTER_MENU":"在菜单后",
"ADMIN_MONITORING_ALL_LOGS":"📋所有日志",
"ADMIN_MONITORING_AUTOPAY_SETTINGS":"💳自动支付设置",
"ADMIN_MONITORING_AUTO_CLEANUP":"🧹自动清理日志",
diff --git a/app/services/pinned_message_service.py b/app/services/pinned_message_service.py
new file mode 100644
index 00000000..e89cea96
--- /dev/null
+++ b/app/services/pinned_message_service.py
@@ -0,0 +1,240 @@
+import asyncio
+import logging
+from typing import Optional
+
+from aiogram import Bot
+from aiogram.exceptions import (
+ TelegramBadRequest,
+ TelegramForbiddenError,
+ TelegramRetryAfter,
+)
+from sqlalchemy import select, update
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.database.crud.user import get_users_list
+from app.database.models import PinnedMessage, User, UserStatus
+from app.utils.validators import sanitize_html, validate_html_tags
+
+logger = logging.getLogger(__name__)
+
+
+VALID_POSITIONS = {"before_menu", "after_menu"}
+
+
+async def get_active_pinned_message(db: AsyncSession) -> Optional[PinnedMessage]:
+ result = await db.execute(
+ select(PinnedMessage)
+ .where(PinnedMessage.is_active.is_(True))
+ .order_by(PinnedMessage.created_at.desc())
+ .limit(1)
+ )
+ return result.scalar_one_or_none()
+
+
+async def set_active_pinned_message(
+ db: AsyncSession,
+ content: str,
+ created_by: Optional[int] = None,
+ media_type: Optional[str] = None,
+ media_file_id: Optional[str] = None,
+ delivery_position: Optional[str] = None,
+) -> PinnedMessage:
+ sanitized_content = sanitize_html(content or "")
+ is_valid, error_message = validate_html_tags(sanitized_content)
+ if not is_valid:
+ raise ValueError(error_message)
+
+ if media_type not in {None, "photo", "video"}:
+ raise ValueError("Поддерживаются только фото или видео в закрепленном сообщении")
+
+ if created_by is not None:
+ creator_id = await db.scalar(select(User.id).where(User.id == created_by))
+ else:
+ creator_id = None
+
+ previous_active = await get_active_pinned_message(db)
+
+ await db.execute(
+ update(PinnedMessage)
+ .where(PinnedMessage.is_active.is_(True))
+ .values(is_active=False)
+ )
+
+ resolved_position = delivery_position or getattr(previous_active, "delivery_position", None) or "before_menu"
+ if resolved_position not in VALID_POSITIONS:
+ resolved_position = "before_menu"
+
+ pinned_message = PinnedMessage(
+ content=sanitized_content,
+ media_type=media_type,
+ media_file_id=media_file_id,
+ is_active=True,
+ created_by=creator_id,
+ delivery_position=resolved_position,
+ )
+
+ db.add(pinned_message)
+ await db.commit()
+ await db.refresh(pinned_message)
+
+ logger.info("Создано новое закрепленное сообщение #%s", pinned_message.id)
+ return pinned_message
+
+
+async def deliver_pinned_message_to_user(
+ bot: Bot,
+ db: AsyncSession,
+ user: User,
+ position_filter: Optional[str] = None,
+) -> bool:
+ pinned_message = await get_active_pinned_message(db)
+ if not pinned_message:
+ return False
+
+ position = getattr(pinned_message, "delivery_position", "before_menu") or "before_menu"
+ if position_filter and position != position_filter:
+ return False
+
+ return await _send_and_pin_message(bot, user.telegram_id, pinned_message)
+
+
+async def update_active_pinned_position(db: AsyncSession, position: str) -> Optional[PinnedMessage]:
+ if position not in VALID_POSITIONS:
+ raise ValueError("Недопустимая позиция закрепленного сообщения")
+
+ pinned_message = await get_active_pinned_message(db)
+ if not pinned_message:
+ return None
+
+ await db.execute(
+ update(PinnedMessage)
+ .where(PinnedMessage.id == pinned_message.id)
+ .values(delivery_position=position)
+ )
+ await db.commit()
+ await db.refresh(pinned_message)
+ return pinned_message
+
+
+async def broadcast_pinned_message(
+ bot: Bot,
+ db: AsyncSession,
+ pinned_message: PinnedMessage,
+) -> tuple[int, int]:
+ users: list[User] = []
+ offset = 0
+ batch_size = 5000
+
+ while True:
+ batch = await get_users_list(
+ db,
+ offset=offset,
+ limit=batch_size,
+ status=UserStatus.ACTIVE,
+ )
+
+ if not batch:
+ break
+
+ users.extend(batch)
+ offset += batch_size
+
+ sent_count = 0
+ failed_count = 0
+ semaphore = asyncio.Semaphore(2)
+
+ async def send_to_user(user: User) -> None:
+ nonlocal sent_count, failed_count
+ async with semaphore:
+ for attempt in range(3):
+ try:
+ success = await _send_and_pin_message(
+ bot,
+ user.telegram_id,
+ pinned_message,
+ )
+ if success:
+ sent_count += 1
+ else:
+ failed_count += 1
+ break
+ except TelegramRetryAfter as retry_error:
+ delay = min(retry_error.retry_after + 1, 30)
+ logger.warning(
+ "RetryAfter for user %s, waiting %s seconds",
+ user.telegram_id,
+ delay,
+ )
+ await asyncio.sleep(delay)
+ except Exception as send_error: # noqa: BLE001
+ logger.error(
+ "Ошибка отправки закрепленного сообщения пользователю %s: %s",
+ user.telegram_id,
+ send_error,
+ )
+ failed_count += 1
+ break
+
+ for i in range(0, len(users), 50):
+ batch = users[i : i + 50]
+ tasks = [send_to_user(user) for user in batch]
+ await asyncio.gather(*tasks)
+ await asyncio.sleep(0.1)
+
+ return sent_count, failed_count
+
+
+async def _send_and_pin_message(bot: Bot, chat_id: int, pinned_message: PinnedMessage) -> bool:
+ try:
+ await bot.unpin_all_chat_messages(chat_id=chat_id)
+ except TelegramBadRequest:
+ pass
+ except TelegramForbiddenError:
+ return False
+
+ try:
+ if pinned_message.media_type == "photo" and pinned_message.media_file_id:
+ sent_message = await bot.send_photo(
+ chat_id=chat_id,
+ photo=pinned_message.media_file_id,
+ caption=pinned_message.content or None,
+ parse_mode="HTML" if pinned_message.content else None,
+ disable_notification=True,
+ )
+ elif pinned_message.media_type == "video" and pinned_message.media_file_id:
+ sent_message = await bot.send_video(
+ chat_id=chat_id,
+ video=pinned_message.media_file_id,
+ caption=pinned_message.content or None,
+ parse_mode="HTML" if pinned_message.content else None,
+ disable_notification=True,
+ )
+ else:
+ sent_message = await bot.send_message(
+ chat_id=chat_id,
+ text=pinned_message.content,
+ parse_mode="HTML",
+ disable_web_page_preview=True,
+ )
+ await bot.pin_chat_message(
+ chat_id=chat_id,
+ message_id=sent_message.message_id,
+ disable_notification=True,
+ )
+ return True
+ except TelegramForbiddenError:
+ return False
+ except TelegramBadRequest as error:
+ logger.warning(
+ "Некорректный запрос при отправке закрепленного сообщения в чат %s: %s",
+ chat_id,
+ error,
+ )
+ except Exception as error: # noqa: BLE001
+ logger.error(
+ "Не удалось отправить закрепленное сообщение пользователю %s: %s",
+ chat_id,
+ error,
+ )
+
+ return False
diff --git a/app/states.py b/app/states.py
index 795a6d67..ba8cfb0c 100644
--- a/app/states.py
+++ b/app/states.py
@@ -134,6 +134,7 @@ class AdminStates(StatesGroup):
creating_server_country = State()
editing_welcome_text = State()
+ editing_pinned_message = State()
waiting_for_message_buttons = "waiting_for_message_buttons"
editing_promo_offer_message = State()
diff --git a/migrations/alembic/versions/0f3d1f5c2c0a_add_delivery_position_to_pinned_messages.py b/migrations/alembic/versions/0f3d1f5c2c0a_add_delivery_position_to_pinned_messages.py
new file mode 100644
index 00000000..7779fdba
--- /dev/null
+++ b/migrations/alembic/versions/0f3d1f5c2c0a_add_delivery_position_to_pinned_messages.py
@@ -0,0 +1,28 @@
+"""add delivery position to pinned messages
+
+Revision ID: 0f3d1f5c2c0a
+Revises: 5f2a3e099427
+Create Date: 2025-01-01 00:00:00.000000
+"""
+
+from alembic import op
+import sqlalchemy as sa
+
+# revision identifiers, used by Alembic.
+revision = '0f3d1f5c2c0a'
+down_revision = '5f2a3e099427'
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+ op.add_column(
+ 'pinned_messages',
+ sa.Column('delivery_position', sa.String(length=32), nullable=False, server_default='before_menu'),
+ )
+ op.execute("UPDATE pinned_messages SET delivery_position = 'before_menu' WHERE delivery_position IS NULL")
+ op.alter_column('pinned_messages', 'delivery_position', server_default=None)
+
+
+def downgrade() -> None:
+ op.drop_column('pinned_messages', 'delivery_position')
diff --git a/migrations/alembic/versions/5f2a3e099427_add_media_fields_to_pinned_messages.py b/migrations/alembic/versions/5f2a3e099427_add_media_fields_to_pinned_messages.py
new file mode 100644
index 00000000..fdd05440
--- /dev/null
+++ b/migrations/alembic/versions/5f2a3e099427_add_media_fields_to_pinned_messages.py
@@ -0,0 +1,75 @@
+"""add media fields to pinned messages"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+revision: str = "5f2a3e099427"
+down_revision: Union[str, None] = "c9c71d04f0a1"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+TABLE_NAME = "pinned_messages"
+
+
+def _table_exists(inspector: sa.Inspector) -> bool:
+ return TABLE_NAME in inspector.get_table_names()
+
+
+def _column_missing(inspector: sa.Inspector, column_name: str) -> bool:
+ columns = {column.get("name") for column in inspector.get_columns(TABLE_NAME)}
+ return column_name not in columns
+
+
+def upgrade() -> None:
+ bind = op.get_bind()
+ inspector = sa.inspect(bind)
+
+ if not _table_exists(inspector):
+ return
+
+ if _column_missing(inspector, "media_type"):
+ op.add_column(
+ TABLE_NAME,
+ sa.Column("media_type", sa.String(length=32), nullable=True),
+ )
+
+ if _column_missing(inspector, "media_file_id"):
+ op.add_column(
+ TABLE_NAME,
+ sa.Column("media_file_id", sa.String(length=255), nullable=True),
+ )
+
+ # Ensure content has a default value for media-only messages
+ op.alter_column(
+ TABLE_NAME,
+ "content",
+ existing_type=sa.Text(),
+ nullable=False,
+ server_default="",
+ )
+
+
+def downgrade() -> None:
+ bind = op.get_bind()
+ inspector = sa.inspect(bind)
+
+ if not _table_exists(inspector):
+ return
+
+ if not _column_missing(inspector, "media_type"):
+ op.drop_column(TABLE_NAME, "media_type")
+
+ if not _column_missing(inspector, "media_file_id"):
+ op.drop_column(TABLE_NAME, "media_file_id")
+
+ op.alter_column(
+ TABLE_NAME,
+ "content",
+ existing_type=sa.Text(),
+ nullable=False,
+ server_default=None,
+ )
diff --git a/migrations/alembic/versions/c9c71d04f0a1_add_pinned_messages_table.py b/migrations/alembic/versions/c9c71d04f0a1_add_pinned_messages_table.py
new file mode 100644
index 00000000..add5fe11
--- /dev/null
+++ b/migrations/alembic/versions/c9c71d04f0a1_add_pinned_messages_table.py
@@ -0,0 +1,45 @@
+"""add pinned messages table"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+revision: str = "c9c71d04f0a1"
+down_revision: Union[str, None] = "e3c1e0b5b4a7"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+TABLE_NAME = "pinned_messages"
+
+
+def _table_exists(inspector: sa.Inspector) -> bool:
+ return TABLE_NAME in inspector.get_table_names()
+
+
+def upgrade() -> None:
+ bind = op.get_bind()
+ inspector = sa.inspect(bind)
+
+ if _table_exists(inspector):
+ return
+
+ op.create_table(
+ TABLE_NAME,
+ sa.Column("id", sa.Integer(), primary_key=True, index=True),
+ sa.Column("content", sa.Text(), nullable=False),
+ sa.Column("is_active", sa.Boolean(), default=True),
+ sa.Column("created_by", sa.Integer(), sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True),
+ sa.Column("created_at", sa.DateTime(), server_default=sa.func.now()),
+ sa.Column("updated_at", sa.DateTime(), server_default=sa.func.now(), onupdate=sa.func.now()),
+ )
+
+
+def downgrade() -> None:
+ bind = op.get_bind()
+ inspector = sa.inspect(bind)
+
+ if _table_exists(inspector):
+ op.drop_table(TABLE_NAME)