Merge pull request #2196 from BEDOLAGA-DEV/dev5

Remnawave Api Update / Pinned Massages
This commit is contained in:
Egor
2025-12-22 15:18:18 +03:00
committed by GitHub
22 changed files with 1917 additions and 107 deletions

View File

@@ -611,6 +611,7 @@ class User(Base):
promo_group = relationship("PromoGroup", back_populates="users")
user_promo_groups = relationship("UserPromoGroup", back_populates="user", cascade="all, delete-orphan")
poll_responses = relationship("PollResponse", back_populates="user")
last_pinned_message_id = Column(Integer, nullable=True)
@property
def balance_rubles(self) -> float:
@@ -1553,6 +1554,23 @@ class WelcomeText(Base):
creator = relationship("User", backref="created_welcome_texts")
class PinnedMessage(Base):
__tablename__ = "pinned_messages"
id = Column(Integer, primary_key=True, index=True)
content = Column(Text, nullable=False, default="")
media_type = Column(String(32), nullable=True)
media_file_id = Column(String(255), nullable=True)
send_before_menu = Column(Boolean, nullable=False, server_default="1", default=True)
send_on_every_start = Column(Boolean, nullable=False, server_default="1", default=True)
is_active = Column(Boolean, default=True)
created_by = Column(Integer, ForeignKey("users.id", ondelete="SET NULL"), nullable=True)
created_at = Column(DateTime, default=func.now())
updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
creator = relationship("User", backref="pinned_messages")
class AdvertisingCampaign(Base):
__tablename__ = "advertising_campaigns"

View File

@@ -3014,6 +3014,157 @@ async def create_welcome_texts_table():
logger.error(f"Ошибка создания таблицы welcome_texts: {e}")
return False
async def create_pinned_messages_table():
table_exists = await check_table_exists("pinned_messages")
if table_exists:
logger.info("Таблица pinned_messages уже существует")
return True
try:
async with engine.begin() as conn:
db_type = await get_database_type()
if db_type == "sqlite":
create_sql = """
CREATE TABLE pinned_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL DEFAULT '',
media_type VARCHAR(32) NULL,
media_file_id VARCHAR(255) NULL,
send_before_menu BOOLEAN NOT NULL DEFAULT 1,
send_on_every_start BOOLEAN NOT NULL DEFAULT 1,
is_active BOOLEAN DEFAULT 1,
created_by INTEGER NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS ix_pinned_messages_active ON pinned_messages(is_active);
"""
elif db_type == "postgresql":
create_sql = """
CREATE TABLE pinned_messages (
id SERIAL PRIMARY KEY,
content TEXT NOT NULL DEFAULT '',
media_type VARCHAR(32) NULL,
media_file_id VARCHAR(255) NULL,
send_before_menu BOOLEAN NOT NULL DEFAULT TRUE,
send_on_every_start BOOLEAN NOT NULL DEFAULT TRUE,
is_active BOOLEAN DEFAULT TRUE,
created_by INTEGER NULL REFERENCES users(id) ON DELETE SET NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS ix_pinned_messages_active ON pinned_messages(is_active);
"""
elif db_type == "mysql":
create_sql = """
CREATE TABLE pinned_messages (
id INT AUTO_INCREMENT PRIMARY KEY,
content TEXT NOT NULL DEFAULT '',
media_type VARCHAR(32) NULL,
media_file_id VARCHAR(255) NULL,
send_before_menu BOOLEAN NOT NULL DEFAULT TRUE,
send_on_every_start BOOLEAN NOT NULL DEFAULT TRUE,
is_active BOOLEAN DEFAULT TRUE,
created_by INT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (created_by) REFERENCES users(id) ON DELETE SET NULL
);
CREATE INDEX ix_pinned_messages_active ON pinned_messages(is_active);
"""
else:
logger.error(f"Неподдерживаемый тип БД для создания таблицы pinned_messages: {db_type}")
return False
await conn.execute(text(create_sql))
logger.info("✅ Таблица pinned_messages успешно создана")
return True
except Exception as e:
logger.error(f"Ошибка создания таблицы pinned_messages: {e}")
return False
async def ensure_pinned_message_media_columns():
table_exists = await check_table_exists("pinned_messages")
if not table_exists:
logger.warning("⚠️ Таблица pinned_messages отсутствует — пропускаем обновление медиа полей")
return False
try:
async with engine.begin() as conn:
db_type = await get_database_type()
if not await check_column_exists("pinned_messages", "media_type"):
await conn.execute(
text("ALTER TABLE pinned_messages ADD COLUMN media_type VARCHAR(32)")
)
if not await check_column_exists("pinned_messages", "media_file_id"):
await conn.execute(
text("ALTER TABLE pinned_messages ADD COLUMN media_file_id VARCHAR(255)")
)
if not await check_column_exists("pinned_messages", "send_before_menu"):
default_value = "TRUE" if db_type != "sqlite" else "1"
await conn.execute(
text(
f"ALTER TABLE pinned_messages ADD COLUMN send_before_menu BOOLEAN NOT NULL DEFAULT {default_value}"
)
)
if not await check_column_exists("pinned_messages", "send_on_every_start"):
default_value = "TRUE" if db_type != "sqlite" else "1"
await conn.execute(
text(
f"ALTER TABLE pinned_messages ADD COLUMN send_on_every_start BOOLEAN NOT NULL DEFAULT {default_value}"
)
)
await conn.execute(text("UPDATE pinned_messages SET content = '' WHERE content IS NULL"))
if db_type == "postgresql":
await conn.execute(
text("ALTER TABLE pinned_messages ALTER COLUMN content SET DEFAULT ''")
)
elif db_type == "mysql":
await conn.execute(
text("ALTER TABLE pinned_messages MODIFY content TEXT NOT NULL DEFAULT ''")
)
else:
logger.info(" Пропускаем установку DEFAULT для content в SQLite")
logger.info("✅ Медиа поля pinned_messages приведены в актуальное состояние")
return True
except Exception as e:
logger.error(f"Ошибка обновления медиа полей pinned_messages: {e}")
return False
async def ensure_user_last_pinned_column():
try:
async with engine.begin() as conn:
if not await check_column_exists("users", "last_pinned_message_id"):
await conn.execute(
text("ALTER TABLE users ADD COLUMN last_pinned_message_id INTEGER")
)
logger.info("✅ Поле last_pinned_message_id у пользователей готово")
return True
except Exception as e:
logger.error(f"Ошибка добавления поля last_pinned_message_id: {e}")
return False
async def add_media_fields_to_broadcast_history():
logger.info("=== ДОБАВЛЕНИЕ ПОЛЕЙ МЕДИА В BROADCAST_HISTORY ===")
@@ -4690,12 +4841,33 @@ async def run_universal_migration():
else:
logger.warning("⚠️ Проблемы с таблицей user_messages")
logger.info("=== СОЗДАНИЕ ТАБЛИЦЫ PINNED_MESSAGES ===")
pinned_messages_created = await create_pinned_messages_table()
if pinned_messages_created:
logger.info("✅ Таблица pinned_messages готова")
else:
logger.warning("⚠️ Проблемы с таблицей pinned_messages")
logger.info("=== СОЗДАНИЕ/ОБНОВЛЕНИЕ ТАБЛИЦЫ WELCOME_TEXTS ===")
welcome_texts_created = await create_welcome_texts_table()
if welcome_texts_created:
logger.info("✅ Таблица welcome_texts готова с полем is_enabled")
else:
logger.warning("⚠️ Проблемы с таблицей welcome_texts")
logger.info("=== ОБНОВЛЕНИЕ СХЕМЫ PINNED_MESSAGES ===")
pinned_media_ready = await ensure_pinned_message_media_columns()
if pinned_media_ready:
logger.info("✅ Медиа поля для pinned_messages готовы")
else:
logger.warning("⚠️ Проблемы с медиа полями pinned_messages")
logger.info("=== ДОБАВЛЕНИЕ СЛЕДА ОТПРАВКИ ЗАКРЕПА ДЛЯ ПОЛЬЗОВАТЕЛЕЙ ===")
last_pinned_ready = await ensure_user_last_pinned_column()
if last_pinned_ready:
logger.info("✅ Колонка last_pinned_message_id добавлена")
else:
logger.warning("⚠️ Не удалось обновить колонку last_pinned_message_id")
logger.info("=== ДОБАВЛЕНИЕ МЕДИА ПОЛЕЙ В BROADCAST_HISTORY ===")
media_fields_added = await add_media_fields_to_broadcast_history()
@@ -4880,8 +5052,13 @@ async def check_migration_status():
"cryptobot_table": False,
"heleket_table": False,
"user_messages_table": False,
"pinned_messages_table": False,
"welcome_texts_table": False,
"welcome_texts_is_enabled_column": False,
"pinned_messages_media_columns": False,
"pinned_messages_position_column": False,
"pinned_messages_start_mode_column": False,
"users_last_pinned_column": False,
"broadcast_history_media_fields": False,
"subscription_duplicates": False,
"subscription_conversions_table": False,
@@ -4924,6 +5101,7 @@ async def check_migration_status():
status["cryptobot_table"] = await check_table_exists('cryptobot_payments')
status["heleket_table"] = await check_table_exists('heleket_payments')
status["user_messages_table"] = await check_table_exists('user_messages')
status["pinned_messages_table"] = await check_table_exists('pinned_messages')
status["welcome_texts_table"] = await check_table_exists('welcome_texts')
status["privacy_policies_table"] = await check_table_exists('privacy_policies')
status["public_offers_table"] = await check_table_exists('public_offers')
@@ -4969,6 +5147,25 @@ async def check_migration_status():
await check_column_exists('broadcast_history', 'media_caption')
)
status["broadcast_history_media_fields"] = media_fields_exist
pinned_media_columns_exist = (
status["pinned_messages_table"]
and await check_column_exists('pinned_messages', 'media_type')
and await check_column_exists('pinned_messages', 'media_file_id')
)
status["pinned_messages_media_columns"] = pinned_media_columns_exist
status["pinned_messages_position_column"] = (
status["pinned_messages_table"]
and await check_column_exists('pinned_messages', 'send_before_menu')
)
status["pinned_messages_start_mode_column"] = (
status["pinned_messages_table"]
and await check_column_exists('pinned_messages', 'send_on_every_start')
)
status["users_last_pinned_column"] = await check_column_exists('users', 'last_pinned_message_id')
async with engine.begin() as conn:
duplicates_check = await conn.execute(text("""
@@ -4987,10 +5184,15 @@ async def check_migration_status():
"cryptobot_table": "Таблица CryptoBot payments",
"heleket_table": "Таблица Heleket payments",
"user_messages_table": "Таблица пользовательских сообщений",
"pinned_messages_table": "Таблица закреплённых сообщений",
"welcome_texts_table": "Таблица приветственных текстов",
"privacy_policies_table": "Таблица политик конфиденциальности",
"public_offers_table": "Таблица публичных оферт",
"welcome_texts_is_enabled_column": "Поле is_enabled в welcome_texts",
"pinned_messages_media_columns": "Медиа поля в pinned_messages",
"pinned_messages_position_column": "Позиция закрепа (до/после меню)",
"pinned_messages_start_mode_column": "Режим отправки закрепа при /start",
"users_last_pinned_column": "Колонка last_pinned_message_id у пользователей",
"broadcast_history_media_fields": "Медиа поля в broadcast_history",
"subscription_conversions_table": "Таблица конверсий подписок",
"subscription_events_table": "Таблица событий подписок",

View File

@@ -1,3 +1,4 @@
import html
import logging
import asyncio
from datetime import datetime, timedelta
@@ -25,13 +26,19 @@ from app.keyboards.admin import (
get_admin_pagination_keyboard, get_broadcast_media_keyboard,
get_media_confirm_keyboard, get_updated_message_buttons_selector_keyboard_with_media,
BROADCAST_BUTTON_ROWS, DEFAULT_BROADCAST_BUTTONS,
get_broadcast_button_config, get_broadcast_button_labels
get_broadcast_button_config, get_broadcast_button_labels, get_pinned_message_keyboard
)
from app.localization.texts import get_texts
from app.database.crud.user import get_users_list
from app.database.crud.subscription import get_expiring_subscriptions
from app.utils.decorators import admin_required, error_handler
from app.utils.miniapp_buttons import build_miniapp_or_callback_button
from app.services.pinned_message_service import (
broadcast_pinned_message,
get_active_pinned_message,
set_active_pinned_message,
unpin_active_pinned_message,
)
logger = logging.getLogger(__name__)
@@ -167,6 +174,302 @@ async def show_messages_menu(
await callback.answer()
@admin_required
@error_handler
async def show_pinned_message_menu(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext,
):
await state.clear()
pinned_message = await get_active_pinned_message(db)
if pinned_message:
content_preview = html.escape(pinned_message.content or "")
last_updated = pinned_message.updated_at or pinned_message.created_at
timestamp_text = last_updated.strftime("%d.%m.%Y %H:%M") if last_updated else ""
media_line = ""
if pinned_message.media_type:
media_label = "Фото" if pinned_message.media_type == "photo" else "Видео"
media_line = f"📎 Медиа: {media_label}\n"
position_line = (
"⬆️ Отправлять перед меню"
if pinned_message.send_before_menu
else "⬇️ Отправлять после меню"
)
start_mode_line = (
"🔁 При каждом /start"
if pinned_message.send_on_every_start
else "🚫 Только один раз и при обновлении"
)
body = (
"📌 <b>Закрепленное сообщение</b>\n\n"
"📝 Текущий текст:\n"
f"<code>{content_preview}</code>\n\n"
f"{media_line}"
f"{position_line}\n"
f"{start_mode_line}\n"
f"🕒 Обновлено: {timestamp_text}"
)
else:
body = (
"📌 <b>Закрепленное сообщение</b>\n\n"
"Сообщение не задано. Отправьте новый текст, чтобы разослать и закрепить его у пользователей."
)
await callback.message.edit_text(
body,
reply_markup=get_pinned_message_keyboard(
db_user.language,
send_before_menu=getattr(pinned_message, "send_before_menu", True),
send_on_every_start=getattr(pinned_message, "send_on_every_start", True),
),
parse_mode="HTML",
)
await callback.answer()
@admin_required
@error_handler
async def prompt_pinned_message_update(
callback: types.CallbackQuery,
db_user: User,
state: FSMContext,
):
await state.set_state(AdminStates.editing_pinned_message)
await callback.message.edit_text(
"✏️ <b>Новое закрепленное сообщение</b>\n\n"
"Пришлите текст, фото или видео, которое нужно закрепить.\n"
"Бот отправит его всем активным пользователям, открепит старое и закрепит новое без уведомлений.",
reply_markup=types.InlineKeyboardMarkup(inline_keyboard=[
[types.InlineKeyboardButton(text="❌ Отмена", callback_data="admin_pinned_message")]
]),
parse_mode="HTML",
)
await callback.answer()
@admin_required
@error_handler
async def toggle_pinned_message_position(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext,
):
pinned_message = await get_active_pinned_message(db)
if not pinned_message:
await callback.answer("Сначала задайте закрепленное сообщение", show_alert=True)
return
pinned_message.send_before_menu = not pinned_message.send_before_menu
pinned_message.updated_at = datetime.utcnow()
await db.commit()
await show_pinned_message_menu(callback, db_user, db, state)
@admin_required
@error_handler
async def toggle_pinned_message_start_mode(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext,
):
pinned_message = await get_active_pinned_message(db)
if not pinned_message:
await callback.answer("Сначала задайте закрепленное сообщение", show_alert=True)
return
pinned_message.send_on_every_start = not pinned_message.send_on_every_start
pinned_message.updated_at = datetime.utcnow()
await db.commit()
await show_pinned_message_menu(callback, db_user, db, state)
@admin_required
@error_handler
async def delete_pinned_message(
callback: types.CallbackQuery,
db_user: User,
db: AsyncSession,
state: FSMContext,
):
pinned_message = await get_active_pinned_message(db)
if not pinned_message:
await callback.answer("Закрепленное сообщение уже отсутствует", show_alert=True)
return
await callback.message.edit_text(
"🗑️ <b>Удаление закрепленного сообщения</b>\n\n"
"Подождите, пока бот открепит сообщение у пользователей...",
parse_mode="HTML",
)
unpinned_count, failed_count, deleted = await unpin_active_pinned_message(
callback.bot,
db,
)
if not deleted:
await callback.message.edit_text(
"Не удалось найти активное закрепленное сообщение для удаления",
reply_markup=get_admin_messages_keyboard(db_user.language),
parse_mode="HTML",
)
await state.clear()
return
total = unpinned_count + failed_count
await callback.message.edit_text(
"✅ <b>Закрепленное сообщение удалено</b>\n\n"
f"👥 Чатов обработано: {total}\n"
f"✅ Откреплено: {unpinned_count}\n"
f"⚠️ Ошибок: {failed_count}\n\n"
"Новое сообщение можно задать кнопкой \"Обновить\".",
reply_markup=get_admin_messages_keyboard(db_user.language),
parse_mode="HTML",
)
await state.clear()
@admin_required
@error_handler
async def process_pinned_message_update(
message: types.Message,
db_user: User,
state: FSMContext,
db: AsyncSession,
):
texts = get_texts(db_user.language)
media_type: Optional[str] = None
media_file_id: Optional[str] = None
if message.photo:
media_type = "photo"
media_file_id = message.photo[-1].file_id
elif message.video:
media_type = "video"
media_file_id = message.video.file_id
pinned_text = message.html_text or message.caption_html or message.text or message.caption or ""
if not pinned_text and not media_file_id:
await message.answer(
texts.t("ADMIN_PINNED_NO_CONTENT", "Не удалось прочитать текст или медиа в сообщении, попробуйте снова.")
)
return
try:
pinned_message = await set_active_pinned_message(
db,
pinned_text,
db_user.id,
media_type=media_type,
media_file_id=media_file_id,
)
except ValueError as validation_error:
await message.answer(f"{validation_error}")
return
# Сообщение сохранено, спрашиваем о рассылке
from app.keyboards.admin import get_pinned_broadcast_confirm_keyboard
from app.states import AdminStates
await message.answer(
texts.t(
"ADMIN_PINNED_SAVED_ASK_BROADCAST",
"📌 <b>Сообщение сохранено!</b>\n\n"
"Выберите, как доставить сообщение пользователям:\n\n"
"• <b>Разослать сейчас</b> — отправит и закрепит у всех активных пользователей\n"
"• <b>Только при /start</b> — пользователи увидят при следующем запуске бота",
),
reply_markup=get_pinned_broadcast_confirm_keyboard(db_user.language, pinned_message.id),
parse_mode="HTML",
)
await state.set_state(AdminStates.confirming_pinned_broadcast)
@admin_required
@error_handler
async def handle_pinned_broadcast_now(
callback: types.CallbackQuery,
db_user: User,
state: FSMContext,
db: AsyncSession,
):
"""Разослать закреплённое сообщение сейчас всем пользователям."""
texts = get_texts(db_user.language)
# Получаем ID сообщения из callback_data
pinned_message_id = int(callback.data.split(":")[1])
# Получаем сообщение из БД
from sqlalchemy import select
from app.database.models import PinnedMessage
result = await db.execute(
select(PinnedMessage).where(PinnedMessage.id == pinned_message_id)
)
pinned_message = result.scalar_one_or_none()
if not pinned_message:
await callback.answer("❌ Сообщение не найдено", show_alert=True)
await state.clear()
return
await callback.message.edit_text(
texts.t("ADMIN_PINNED_SAVING", "📌 Сообщение сохранено. Начинаю отправку и закрепление у пользователей..."),
parse_mode="HTML",
)
sent_count, failed_count = await broadcast_pinned_message(
callback.bot,
db,
pinned_message,
)
total = sent_count + failed_count
await callback.message.edit_text(
texts.t(
"ADMIN_PINNED_UPDATED",
"✅ <b>Закрепленное сообщение обновлено</b>\n\n"
"👥 Получателей: {total}\n"
"✅ Отправлено: {sent}\n"
"⚠️ Ошибок: {failed}",
).format(total=total, sent=sent_count, failed=failed_count),
reply_markup=get_admin_messages_keyboard(db_user.language),
parse_mode="HTML",
)
await state.clear()
@admin_required
@error_handler
async def handle_pinned_broadcast_skip(
callback: types.CallbackQuery,
db_user: User,
state: FSMContext,
db: AsyncSession,
):
"""Пропустить рассылку — пользователи увидят при /start."""
texts = get_texts(db_user.language)
await callback.message.edit_text(
texts.t(
"ADMIN_PINNED_SAVED_NO_BROADCAST",
"✅ <b>Закрепленное сообщение сохранено</b>\n\n"
"Рассылка не выполнена. Пользователи увидят сообщение при следующем вводе /start.",
),
reply_markup=get_admin_messages_keyboard(db_user.language),
parse_mode="HTML",
)
await state.clear()
@admin_required
@error_handler
async def show_broadcast_targets(
@@ -1295,6 +1598,13 @@ def get_target_display_name(target: str) -> str:
def register_handlers(dp: Dispatcher):
dp.callback_query.register(show_messages_menu, F.data == "admin_messages")
dp.callback_query.register(show_pinned_message_menu, F.data == "admin_pinned_message")
dp.callback_query.register(toggle_pinned_message_position, F.data == "admin_pinned_message_position")
dp.callback_query.register(toggle_pinned_message_start_mode, F.data == "admin_pinned_message_start_mode")
dp.callback_query.register(delete_pinned_message, F.data == "admin_pinned_message_delete")
dp.callback_query.register(prompt_pinned_message_update, F.data == "admin_pinned_message_edit")
dp.callback_query.register(handle_pinned_broadcast_now, F.data.startswith("admin_pinned_broadcast_now:"))
dp.callback_query.register(handle_pinned_broadcast_skip, F.data.startswith("admin_pinned_broadcast_skip:"))
dp.callback_query.register(show_broadcast_targets, F.data.in_(["admin_msg_all", "admin_msg_by_sub"]))
dp.callback_query.register(select_broadcast_target, F.data.startswith("broadcast_"))
dp.callback_query.register(confirm_broadcast, F.data == "admin_confirm_broadcast")
@@ -1312,3 +1622,4 @@ def register_handlers(dp: Dispatcher):
dp.callback_query.register(handle_change_media, F.data == "change_media")
dp.message.register(process_broadcast_message, AdminStates.waiting_for_broadcast_message)
dp.message.register(process_broadcast_media, AdminStates.waiting_for_broadcast_media)
dp.message.register(process_pinned_message_update, AdminStates.editing_pinned_message)

View File

@@ -67,7 +67,7 @@ async def handle_cancel(
async def handle_unknown_message(
message: types.Message,
db_user: User
db_user: User | None = None,
):
texts = get_texts(db_user.language if db_user else "ru")
@@ -126,6 +126,8 @@ def register_handlers(dp: Dispatcher):
dp.message.register(
handle_unknown_message,
StateFilter(None),
F.successful_payment.is_(None)
F.successful_payment.is_(None),
F.text.is_not(None),
~F.text.startswith("/"),
)

View File

@@ -1,5 +1,6 @@
import logging
from datetime import datetime
from typing import Optional
from aiogram import Dispatcher, types, F, Bot
from aiogram.enums import ChatMemberStatus
from aiogram.exceptions import TelegramForbiddenError
@@ -18,7 +19,7 @@ from app.database.crud.campaign import (
get_campaign_by_start_parameter,
get_campaign_by_id,
)
from app.database.models import UserStatus, SubscriptionStatus
from app.database.models import PinnedMessage, SubscriptionStatus, UserStatus
from app.keyboards.inline import (
get_rules_keyboard,
get_privacy_policy_keyboard,
@@ -36,6 +37,10 @@ from app.services.subscription_service import SubscriptionService
from app.services.support_settings_service import SupportSettingsService
from app.services.main_menu_button_service import MainMenuButtonService
from app.services.privacy_policy_service import PrivacyPolicyService
from app.services.pinned_message_service import (
deliver_pinned_message_to_user,
get_active_pinned_message,
)
from app.utils.user_utils import generate_unique_referral_code
from app.utils.promo_offer import (
build_promo_offer_hint,
@@ -61,6 +66,22 @@ def _calculate_subscription_flags(subscription):
return has_active_subscription, subscription_is_active
async def _send_pinned_message(
bot: Bot,
db: AsyncSession,
user,
pinned_message: Optional[PinnedMessage] = None,
) -> None:
try:
await deliver_pinned_message_to_user(bot, db, user, pinned_message)
except Exception as error: # noqa: BLE001
logger.error(
"Не удалось отправить закрепленное сообщение пользователю %s: %s",
getattr(user, "telegram_id", "unknown"),
error,
)
async def _apply_campaign_bonus_if_needed(
db: AsyncSession,
user,
@@ -404,6 +425,11 @@ async def cmd_start(message: types.Message, state: FSMContext, db: AsyncSession,
user.subscription
)
pinned_message = await get_active_pinned_message(db)
if pinned_message and pinned_message.send_before_menu:
await _send_pinned_message(message.bot, db, user, pinned_message)
menu_text = await get_main_menu_text(user, texts, db)
is_admin = settings.is_admin(user.telegram_id)
@@ -438,6 +464,9 @@ async def cmd_start(message: types.Message, state: FSMContext, db: AsyncSession,
reply_markup=keyboard,
parse_mode="HTML"
)
if pinned_message and not pinned_message.send_before_menu:
await _send_pinned_message(message.bot, db, user, pinned_message)
await state.clear()
return
@@ -1094,6 +1123,7 @@ async def complete_registration_from_callback(
reply_markup=keyboard,
parse_mode="HTML"
)
await _send_pinned_message(callback.bot, db, existing_user)
except Exception as e:
logger.error(f"Ошибка при показе главного меню существующему пользователю: {e}")
await callback.message.answer(
@@ -1232,6 +1262,7 @@ async def complete_registration_from_callback(
reply_markup=get_post_registration_keyboard(user.language),
)
logger.info(f"✅ Приветственное сообщение отправлено пользователю {user.telegram_id}")
await _send_pinned_message(callback.bot, db, user)
except Exception as e:
logger.error(f"Ошибка при отправке приветственного сообщения: {e}")
else:
@@ -1277,6 +1308,7 @@ async def complete_registration_from_callback(
reply_markup=keyboard,
parse_mode="HTML"
)
await _send_pinned_message(callback.bot, db, user)
logger.info(f"✅ Главное меню показано пользователю {user.telegram_id}")
except Exception as e:
logger.error(f"Ошибка при показе главного меню: {e}")
@@ -1374,6 +1406,7 @@ async def complete_registration(
reply_markup=keyboard,
parse_mode="HTML"
)
await _send_pinned_message(message.bot, db, existing_user)
except Exception as e:
logger.error(f"Ошибка при показе главного меню существующему пользователю: {e}")
await message.answer(
@@ -1535,6 +1568,7 @@ async def complete_registration(
reply_markup=get_post_registration_keyboard(user.language),
)
logger.info(f"✅ Приветственное сообщение отправлено пользователю {user.telegram_id}")
await _send_pinned_message(message.bot, db, user)
except Exception as e:
logger.error(f"Ошибка при отправке приветственного сообщения: {e}")
else:
@@ -1581,6 +1615,7 @@ async def complete_registration(
parse_mode="HTML"
)
logger.info(f"✅ Главное меню показано пользователю {user.telegram_id}")
await _send_pinned_message(message.bot, db, user)
except Exception as e:
logger.error(f"Ошибка при показе главного меню: {e}")
await message.answer(
@@ -1925,6 +1960,7 @@ async def required_sub_channel_check(
reply_markup=keyboard,
parse_mode="HTML",
)
await _send_pinned_message(bot, db, user)
else:
from app.keyboards.inline import get_rules_keyboard

View File

@@ -837,12 +837,91 @@ def get_admin_messages_keyboard(language: str = "ru") -> InlineKeyboardMarkup:
callback_data="admin_msg_history"
)
],
[
InlineKeyboardButton(
text=_t(texts, "ADMIN_PINNED_MESSAGE", "📌 Закрепленное сообщение"),
callback_data="admin_pinned_message",
)
],
[
InlineKeyboardButton(text=texts.BACK, callback_data="admin_submenu_communications")
]
])
def get_pinned_message_keyboard(
language: str = "ru",
send_before_menu: bool = True,
send_on_every_start: bool = True,
) -> InlineKeyboardMarkup:
texts = get_texts(language)
position_label = (
_t(texts, "ADMIN_PINNED_POSITION_BEFORE", "⬆️ Показать перед меню")
if send_before_menu
else _t(texts, "ADMIN_PINNED_POSITION_AFTER", "⬇️ Показать после меню")
)
toggle_callback = "admin_pinned_message_position"
start_mode_label = (
_t(texts, "ADMIN_PINNED_START_EVERY_TIME", "🔁 Показать при каждом /start")
if send_on_every_start
else _t(texts, "ADMIN_PINNED_START_ONCE", "🚫 Показывать только один раз")
)
start_mode_callback = "admin_pinned_message_start_mode"
return InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(
text=_t(texts, "ADMIN_PINNED_MESSAGE_UPDATE", "✏️ Обновить"),
callback_data="admin_pinned_message_edit",
)
],
[
InlineKeyboardButton(
text=position_label,
callback_data=toggle_callback,
)
],
[
InlineKeyboardButton(
text=start_mode_label,
callback_data=start_mode_callback,
)
],
[
InlineKeyboardButton(
text=_t(texts, "ADMIN_PINNED_MESSAGE_DELETE", "🗑️ Удалить и отключить"),
callback_data="admin_pinned_message_delete",
)
],
[InlineKeyboardButton(text=texts.BACK, callback_data="admin_messages")],
])
def get_pinned_broadcast_confirm_keyboard(
language: str = "ru",
pinned_message_id: int = 0,
) -> InlineKeyboardMarkup:
"""Клавиатура для выбора: разослать сейчас или только при /start."""
texts = get_texts(language)
return InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(
text=_t(texts, "ADMIN_PINNED_BROADCAST_NOW", "📨 Разослать сейчас всем"),
callback_data=f"admin_pinned_broadcast_now:{pinned_message_id}",
)
],
[
InlineKeyboardButton(
text=_t(texts, "ADMIN_PINNED_BROADCAST_ON_START", "⏳ Только при /start"),
callback_data=f"admin_pinned_broadcast_skip:{pinned_message_id}",
)
],
])
def get_admin_monitoring_keyboard(language: str = "ru") -> InlineKeyboardMarkup:
texts = get_texts(language)

View File

@@ -209,6 +209,20 @@
"ADMIN_MESSAGES_BY_CRITERIA": "🔍 By criteria",
"ADMIN_MESSAGES_BY_SUBSCRIPTIONS": "🎯 By subscriptions",
"ADMIN_MESSAGES_HISTORY": "📋 History",
"ADMIN_PINNED_MESSAGE": "📌 Pinned message",
"ADMIN_PINNED_MESSAGE_UPDATE": "✏️ Update",
"ADMIN_PINNED_MESSAGE_DELETE": "🗑️ Remove and disable",
"ADMIN_PINNED_POSITION_BEFORE": "⬆️ Send before menu",
"ADMIN_PINNED_POSITION_AFTER": "⬇️ Send after menu",
"ADMIN_PINNED_START_EVERY_TIME": "🔁 Show on every /start",
"ADMIN_PINNED_START_ONCE": "🚫 Show only once",
"ADMIN_PINNED_NO_CONTENT": "❌ Could not read text or media from the message, please try again.",
"ADMIN_PINNED_SAVING": "📌 Message saved. Starting broadcast and pinning for users...",
"ADMIN_PINNED_UPDATED": "✅ <b>Pinned message updated</b>\n\n👥 Recipients: {total}\n✅ Sent: {sent}\n⚠ Errors: {failed}",
"ADMIN_PINNED_SAVED_ASK_BROADCAST": "📌 <b>Message saved!</b>\n\nChoose how to deliver the message to users:\n\n• <b>Broadcast now</b> — will send and pin for all active users\n• <b>Only on /start</b> — users will see it on next bot launch",
"ADMIN_PINNED_SAVED_NO_BROADCAST": "✅ <b>Pinned message saved</b>\n\nNo broadcast performed. Users will see the message on their next /start.",
"ADMIN_PINNED_BROADCAST_NOW": "📨 Broadcast now to all",
"ADMIN_PINNED_BROADCAST_ON_START": "⏳ Only on /start",
"ADMIN_MONITORING": "🔍 Monitoring",
"ADMIN_MONITORING_ALL_LOGS": "📋 All logs",
"ADMIN_MONITORING_AUTOPAY_SETTINGS": "💳 Auto-pay settings",

View File

@@ -212,6 +212,20 @@
"ADMIN_MESSAGES_BY_CRITERIA": "🔍 По критериям",
"ADMIN_MESSAGES_BY_SUBSCRIPTIONS": "🎯 По подпискам",
"ADMIN_MESSAGES_HISTORY": "📋 История",
"ADMIN_PINNED_MESSAGE": "📌 Закрепленное сообщение",
"ADMIN_PINNED_MESSAGE_UPDATE": "✏️ Обновить",
"ADMIN_PINNED_MESSAGE_DELETE": "🗑️ Удалить и отключить",
"ADMIN_PINNED_POSITION_BEFORE": "⬆️ Показать перед меню",
"ADMIN_PINNED_POSITION_AFTER": "⬇️ Показать после меню",
"ADMIN_PINNED_START_EVERY_TIME": "🔁 Показать при каждом /start",
"ADMIN_PINNED_START_ONCE": "🚫 Показывать только один раз",
"ADMIN_PINNED_NO_CONTENT": "❌ Не удалось прочитать текст или медиа в сообщении, попробуйте снова.",
"ADMIN_PINNED_SAVING": "📌 Сообщение сохранено. Начинаю отправку и закрепление у пользователей...",
"ADMIN_PINNED_UPDATED": "✅ <b>Закрепленное сообщение обновлено</b>\n\n👥 Получателей: {total}\n✅ Отправлено: {sent}\n⚠ Ошибок: {failed}",
"ADMIN_PINNED_SAVED_ASK_BROADCAST": "📌 <b>Сообщение сохранено!</b>\n\nВыберите, как доставить сообщение пользователям:\n\n• <b>Разослать сейчас</b> — отправит и закрепит у всех активных пользователей\n• <b>Только при /start</b> — пользователи увидят при следующем запуске бота",
"ADMIN_PINNED_SAVED_NO_BROADCAST": "✅ <b>Закрепленное сообщение сохранено</b>\n\nРассылка не выполнена. Пользователи увидят сообщение при следующем вводе /start.",
"ADMIN_PINNED_BROADCAST_NOW": "📨 Разослать сейчас всем",
"ADMIN_PINNED_BROADCAST_ON_START": "⏳ Только при /start",
"ADMIN_MONITORING": "🔍 Мониторинг",
"ADMIN_MONITORING_ALL_LOGS": "📋 Все логи",
"ADMIN_MONITORING_AUTOPAY_SETTINGS": "💳 Настройки автооплаты",

View File

@@ -138,8 +138,22 @@
"ADMIN_MESSAGES_ALL_USERS": "📨 Всім користувачам",
"ADMIN_MESSAGES_BY_CRITERIA": "🔍 За критеріями",
"ADMIN_MESSAGES_BY_SUBSCRIPTIONS": "🎯 За підписками",
"ADMIN_MESSAGES_HISTORY": "📋 Історія",
"ADMIN_MONITORING": "🔍 Моніторинг",
"ADMIN_MESSAGES_HISTORY": "📋 Історія",
"ADMIN_PINNED_MESSAGE": "📌 Закріплене повідомлення",
"ADMIN_PINNED_MESSAGE_UPDATE": "✏️ Оновити",
"ADMIN_PINNED_MESSAGE_DELETE": "🗑️ Видалити та вимкнути",
"ADMIN_PINNED_POSITION_BEFORE": "⬆️ Показати перед меню",
"ADMIN_PINNED_POSITION_AFTER": "⬇️ Показати після меню",
"ADMIN_PINNED_START_EVERY_TIME": "🔁 Показувати при кожному /start",
"ADMIN_PINNED_START_ONCE": "🚫 Показувати лише один раз",
"ADMIN_PINNED_NO_CONTENT": "❌ Не вдалося прочитати текст або медіа у повідомленні, спробуйте ще раз.",
"ADMIN_PINNED_SAVING": "📌 Повідомлення збережено. Починаю відправку та закріплення у користувачів...",
"ADMIN_PINNED_UPDATED": "✅ <b>Закріплене повідомлення оновлено</b>\n\n👥 Отримувачів: {total}\n✅ Надіслано: {sent}\n⚠ Помилок: {failed}",
"ADMIN_PINNED_SAVED_ASK_BROADCAST": "📌 <b>Повідомлення збережено!</b>\n\nОберіть, як доставити повідомлення користувачам:\n\n• <b>Розіслати зараз</b> — відправить і закріпить у всіх активних користувачів\n• <b>Тільки при /start</b> — користувачі побачать при наступному запуску бота",
"ADMIN_PINNED_SAVED_NO_BROADCAST": "✅ <b>Закріплене повідомлення збережено</b>\n\nРозсилка не виконана. Користувачі побачать повідомлення при наступному введенні /start.",
"ADMIN_PINNED_BROADCAST_NOW": "📨 Розіслати зараз всім",
"ADMIN_PINNED_BROADCAST_ON_START": "⏳ Тільки при /start",
"ADMIN_MONITORING": "🔍 Моніторинг",
"ADMIN_MONITORING_ALL_LOGS": "📋 Всі логи",
"ADMIN_MONITORING_AUTOPAY_SETTINGS": "💳 Налаштування автооплати",
"ADMIN_MONITORING_AUTO_CLEANUP": "🧹 Автоочищення логів",

View File

@@ -138,6 +138,20 @@
"ADMIN_MESSAGES_BY_CRITERIA":"🔍按条件",
"ADMIN_MESSAGES_BY_SUBSCRIPTIONS":"🎯按订阅",
"ADMIN_MESSAGES_HISTORY":"📋历史记录",
"ADMIN_PINNED_MESSAGE":"📌置顶消息",
"ADMIN_PINNED_MESSAGE_UPDATE":"✏️更新",
"ADMIN_PINNED_MESSAGE_DELETE":"🗑️删除并停用",
"ADMIN_PINNED_POSITION_BEFORE":"⬆️菜单前发送",
"ADMIN_PINNED_POSITION_AFTER":"⬇️菜单后发送",
"ADMIN_PINNED_START_EVERY_TIME":"🔁 每次 /start 时发送",
"ADMIN_PINNED_START_ONCE":"🚫 仅发送一次",
"ADMIN_PINNED_NO_CONTENT":"❌ 无法读取消息中的文本或媒体,请重试。",
"ADMIN_PINNED_SAVING":"📌 消息已保存。开始向用户发送并置顶...",
"ADMIN_PINNED_UPDATED":"✅ <b>置顶消息已更新</b>\n\n👥 收件人: {total}\n✅ 已发送: {sent}\n⚠ 错误: {failed}",
"ADMIN_PINNED_SAVED_ASK_BROADCAST":"📌 <b>消息已保存!</b>\n\n选择如何向用户发送消息\n\n• <b>立即广播</b> — 将发送并置顶给所有活跃用户\n• <b>仅在 /start 时</b> — 用户将在下次启动机器人时看到",
"ADMIN_PINNED_SAVED_NO_BROADCAST":"✅ <b>置顶消息已保存</b>\n\n未执行广播。用户将在下次输入 /start 时看到消息。",
"ADMIN_PINNED_BROADCAST_NOW":"📨 立即广播给所有人",
"ADMIN_PINNED_BROADCAST_ON_START":"⏳ 仅在 /start 时",
"ADMIN_MONITORING":"🔍监控",
"ADMIN_MONITORING_ALL_LOGS":"📋所有日志",
"ADMIN_MONITORING_AUTOPAY_SETTINGS":"💳自动支付设置",

View File

@@ -0,0 +1,346 @@
import asyncio
import logging
from datetime import datetime
from typing import Optional
from aiogram import Bot
from aiogram.exceptions import (
TelegramBadRequest,
TelegramForbiddenError,
TelegramRetryAfter,
)
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.database.crud.user import get_users_list
from app.database.database import AsyncSessionLocal
from app.database.models import PinnedMessage, User, UserStatus
from app.utils.validators import sanitize_html, validate_html_tags
logger = logging.getLogger(__name__)
async def get_active_pinned_message(db: AsyncSession) -> Optional[PinnedMessage]:
result = await db.execute(
select(PinnedMessage)
.where(PinnedMessage.is_active.is_(True))
.order_by(PinnedMessage.created_at.desc())
.limit(1)
)
return result.scalar_one_or_none()
async def set_active_pinned_message(
db: AsyncSession,
content: str,
created_by: Optional[int] = None,
media_type: Optional[str] = None,
media_file_id: Optional[str] = None,
send_before_menu: Optional[bool] = None,
send_on_every_start: Optional[bool] = None,
) -> PinnedMessage:
sanitized_content = sanitize_html(content or "")
is_valid, error_message = validate_html_tags(sanitized_content)
if not is_valid:
raise ValueError(error_message)
if media_type not in {None, "photo", "video"}:
raise ValueError("Поддерживаются только фото или видео в закрепленном сообщении")
if created_by is not None:
creator_id = await db.scalar(select(User.id).where(User.id == created_by))
else:
creator_id = None
previous_active = await get_active_pinned_message(db)
await db.execute(
update(PinnedMessage)
.where(PinnedMessage.is_active.is_(True))
.values(is_active=False)
)
pinned_message = PinnedMessage(
content=sanitized_content,
media_type=media_type,
media_file_id=media_file_id,
is_active=True,
created_by=creator_id,
send_before_menu=(
send_before_menu
if send_before_menu is not None
else getattr(previous_active, "send_before_menu", True)
),
send_on_every_start=(
send_on_every_start
if send_on_every_start is not None
else getattr(previous_active, "send_on_every_start", True)
),
)
db.add(pinned_message)
await db.commit()
await db.refresh(pinned_message)
logger.info("Создано новое закрепленное сообщение #%s", pinned_message.id)
return pinned_message
async def deactivate_active_pinned_message(db: AsyncSession) -> Optional[PinnedMessage]:
pinned_message = await get_active_pinned_message(db)
if not pinned_message:
return None
pinned_message.is_active = False
pinned_message.updated_at = datetime.utcnow()
await db.commit()
await db.refresh(pinned_message)
logger.info("Деактивировано закрепленное сообщение #%s", pinned_message.id)
return pinned_message
async def deliver_pinned_message_to_user(
bot: Bot,
db: AsyncSession,
user: User,
pinned_message: Optional[PinnedMessage] = None,
) -> bool:
pinned_message = pinned_message or await get_active_pinned_message(db)
if not pinned_message:
return False
if not pinned_message.send_on_every_start:
last_pinned_id = getattr(user, "last_pinned_message_id", None)
if last_pinned_id == pinned_message.id:
return False
success = await _send_and_pin_message(bot, user.telegram_id, pinned_message)
if success:
await _mark_pinned_delivery(user_id=getattr(user, "id", None), pinned_message_id=pinned_message.id)
return success
async def broadcast_pinned_message(
bot: Bot,
db: AsyncSession,
pinned_message: PinnedMessage,
) -> tuple[int, int]:
users: list[User] = []
offset = 0
batch_size = 5000
while True:
batch = await get_users_list(
db,
offset=offset,
limit=batch_size,
status=UserStatus.ACTIVE,
)
if not batch:
break
users.extend(batch)
offset += batch_size
sent_count = 0
failed_count = 0
semaphore = asyncio.Semaphore(3)
async def send_to_user(user: User) -> None:
nonlocal sent_count, failed_count
async with semaphore:
for attempt in range(3):
try:
success = await _send_and_pin_message(
bot,
user.telegram_id,
pinned_message,
)
if success:
sent_count += 1
else:
failed_count += 1
break
except TelegramRetryAfter as retry_error:
delay = min(retry_error.retry_after + 1, 30)
logger.warning(
"RetryAfter for user %s, waiting %s seconds",
user.telegram_id,
delay,
)
await asyncio.sleep(delay)
except Exception as send_error: # noqa: BLE001
logger.error(
"Ошибка отправки закрепленного сообщения пользователю %s: %s",
user.telegram_id,
send_error,
)
failed_count += 1
break
for i in range(0, len(users), 30):
batch = users[i : i + 30]
tasks = [send_to_user(user) for user in batch]
await asyncio.gather(*tasks)
await asyncio.sleep(0.05)
return sent_count, failed_count
async def unpin_active_pinned_message(
bot: Bot,
db: AsyncSession,
) -> tuple[int, int, bool]:
pinned_message = await deactivate_active_pinned_message(db)
if not pinned_message:
return 0, 0, False
users: list[User] = []
offset = 0
batch_size = 5000
while True:
batch = await get_users_list(
db,
offset=offset,
limit=batch_size,
status=UserStatus.ACTIVE,
)
if not batch:
break
users.extend(batch)
offset += batch_size
unpinned_count = 0
failed_count = 0
semaphore = asyncio.Semaphore(5)
async def unpin_for_user(user: User) -> None:
nonlocal unpinned_count, failed_count
async with semaphore:
try:
success = await _unpin_message_for_user(bot, user.telegram_id)
if success:
unpinned_count += 1
else:
failed_count += 1
except TelegramRetryAfter as retry_error:
delay = min(retry_error.retry_after + 1, 30)
logger.warning(
"RetryAfter while unpinning for user %s, waiting %s seconds",
user.telegram_id,
delay,
)
await asyncio.sleep(delay)
await unpin_for_user(user)
except Exception as error: # noqa: BLE001
logger.error(
"Ошибка открепления сообщения у пользователя %s: %s",
user.telegram_id,
error,
)
failed_count += 1
for i in range(0, len(users), 40):
batch = users[i : i + 40]
tasks = [unpin_for_user(user) for user in batch]
await asyncio.gather(*tasks)
await asyncio.sleep(0.05)
return unpinned_count, failed_count, True
async def _mark_pinned_delivery(
user_id: Optional[int],
pinned_message_id: int,
) -> None:
if not user_id:
return
async with AsyncSessionLocal() as session:
await session.execute(
update(User)
.where(User.id == user_id)
.values(
last_pinned_message_id=pinned_message_id,
updated_at=datetime.utcnow(),
)
)
await session.commit()
async def _send_and_pin_message(bot: Bot, chat_id: int, pinned_message: PinnedMessage) -> bool:
try:
await bot.unpin_all_chat_messages(chat_id=chat_id)
except TelegramBadRequest:
pass
except TelegramForbiddenError:
return False
try:
if pinned_message.media_type == "photo" and pinned_message.media_file_id:
sent_message = await bot.send_photo(
chat_id=chat_id,
photo=pinned_message.media_file_id,
caption=pinned_message.content or None,
parse_mode="HTML" if pinned_message.content else None,
disable_notification=True,
)
elif pinned_message.media_type == "video" and pinned_message.media_file_id:
sent_message = await bot.send_video(
chat_id=chat_id,
video=pinned_message.media_file_id,
caption=pinned_message.content or None,
parse_mode="HTML" if pinned_message.content else None,
disable_notification=True,
)
else:
sent_message = await bot.send_message(
chat_id=chat_id,
text=pinned_message.content,
parse_mode="HTML",
disable_web_page_preview=True,
disable_notification=True,
)
await bot.pin_chat_message(
chat_id=chat_id,
message_id=sent_message.message_id,
disable_notification=True,
)
return True
except TelegramForbiddenError:
return False
except TelegramBadRequest as error:
logger.warning(
"Некорректный запрос при отправке закрепленного сообщения в чат %s: %s",
chat_id,
error,
)
except Exception as error: # noqa: BLE001
logger.error(
"Не удалось отправить закрепленное сообщение пользователю %s: %s",
chat_id,
error,
)
return False
async def _unpin_message_for_user(bot: Bot, chat_id: int) -> bool:
try:
await bot.unpin_all_chat_messages(chat_id=chat_id)
return True
except TelegramForbiddenError:
return False
except TelegramBadRequest:
return False
except Exception as error: # noqa: BLE001
logger.error(
"Не удалось открепить сообщение у пользователя %s: %s",
chat_id,
error,
)
return False

View File

@@ -1142,12 +1142,14 @@ class RemnaWaveService:
async with self.get_api_client() as api:
panel_users = []
start = 0
size = 100
size = 500 # Увеличен размер батча для ускорения загрузки
while True:
logger.info(f"📥 Загружаем пользователей: start={start}, size={size}")
response = await api.get_all_users(start=start, size=size, enrich_happ_links=True)
# enrich_happ_links=False - happ_crypto_link уже возвращается API в поле happ.cryptoLink
# Не делаем дополнительные HTTP-запросы для каждого пользователя
response = await api.get_all_users(start=start, size=size, enrich_happ_links=False)
users_batch = response['users']
total_users = response['total']
@@ -1370,20 +1372,40 @@ class RemnaWaveService:
processed_count = 0
cleanup_uuid_mutations: List[_UUIDMapMutation] = []
for telegram_id, db_user in bot_users_by_telegram_id.items():
if telegram_id not in panel_telegram_ids and hasattr(db_user, 'subscription') and db_user.subscription:
# Собираем список пользователей для деактивации
users_to_deactivate = [
(telegram_id, db_user)
for telegram_id, db_user in bot_users_by_telegram_id.items()
if telegram_id not in panel_telegram_ids
and hasattr(db_user, 'subscription')
and db_user.subscription
]
if users_to_deactivate:
logger.info(f"📊 Найдено {len(users_to_deactivate)} пользователей для деактивации")
# Используем один API клиент для всех операций сброса HWID
hwid_api_client = None
try:
hwid_api_client = self.get_api_client()
await hwid_api_client.__aenter__()
except Exception as api_init_error:
logger.warning(f"⚠️ Не удалось создать API клиент для сброса HWID: {api_init_error}")
hwid_api_client = None
try:
for telegram_id, db_user in users_to_deactivate:
cleanup_mutation: Optional[_UUIDMapMutation] = None
try:
logger.info(f"🗑️ Деактивация подписки пользователя {telegram_id} (нет в панели)")
subscription = db_user.subscription
if db_user.remnawave_uuid:
if db_user.remnawave_uuid and hwid_api_client:
try:
async with self.get_api_client() as api:
devices_reset = await api.reset_user_devices(db_user.remnawave_uuid)
if devices_reset:
logger.info(f"🔧 Сброшены HWID устройства для пользователя {telegram_id}")
devices_reset = await hwid_api_client.reset_user_devices(db_user.remnawave_uuid)
if devices_reset:
logger.info(f"🔧 Сброшены HWID устройства для пользователя {telegram_id}")
except Exception as hwid_error:
logger.error(f"❌ Ошибка сброса HWID устройств для {telegram_id}: {hwid_error}")
@@ -1459,21 +1481,26 @@ class RemnaWaveService:
cleanup_uuid_mutations.clear()
stats["errors"] += batch_size
break # Прерываем цикл при ошибке коммита
else:
# Увеличиваем счетчик для отслеживания прогресса
processed_count += 1
# Коммитим оставшиеся изменения
try:
await db.commit()
cleanup_uuid_mutations.clear()
except Exception as final_commit_error:
logger.error(f"❌ Ошибка финального коммита при деактивации: {final_commit_error}")
await db.rollback()
for mutation in reversed(cleanup_uuid_mutations):
mutation.rollback()
cleanup_uuid_mutations.clear()
# Коммитим оставшиеся изменения
try:
await db.commit()
cleanup_uuid_mutations.clear()
except Exception as final_commit_error:
logger.error(f"❌ Ошибка финального коммита при деактивации: {final_commit_error}")
await db.rollback()
for mutation in reversed(cleanup_uuid_mutations):
mutation.rollback()
cleanup_uuid_mutations.clear()
finally:
# Закрываем API клиент
if hwid_api_client:
try:
await hwid_api_client.__aexit__(None, None, None)
except Exception:
pass
logger.info(f"🎯 Синхронизация завершена: создано {stats['created']}, обновлено {stats['updated']}, деактивировано {stats['deleted']}, ошибок {stats['errors']}")
return stats
@@ -1677,91 +1704,120 @@ class RemnaWaveService:
try:
stats = {"created": 0, "updated": 0, "errors": 0}
batch_size = 100
batch_size = 500 # Увеличен для ускорения
offset = 0
concurrent_limit = 10 # Параллельные запросы к API
async with self.get_api_client() as api:
semaphore = asyncio.Semaphore(concurrent_limit)
while True:
users = await get_users_list(db, offset=offset, limit=batch_size)
if not users:
break
for user in users:
if not user.subscription:
continue
# Фильтруем пользователей с подписками и готовим данные
users_with_subscriptions = [u for u in users if u.subscription]
try:
subscription = user.subscription
hwid_limit = resolve_hwid_device_limit_for_payload(subscription)
if not users_with_subscriptions:
if len(users) < batch_size:
break
offset += batch_size
continue
expire_at = self._safe_expire_at_for_panel(subscription.end_date)
status = UserStatus.ACTIVE if subscription.is_active else UserStatus.DISABLED
# Подготавливаем задачи для параллельного выполнения
async def process_user(user):
async with semaphore:
try:
subscription = user.subscription
hwid_limit = resolve_hwid_device_limit_for_payload(subscription)
expire_at = self._safe_expire_at_for_panel(subscription.end_date)
username = settings.format_remnawave_username(
full_name=user.full_name,
username=user.username,
telegram_id=user.telegram_id,
)
# Определяем статус для панели
is_subscription_active = (
subscription.status in (
SubscriptionStatus.ACTIVE.value,
SubscriptionStatus.TRIAL.value,
)
and subscription.end_date > datetime.utcnow()
)
status = UserStatus.ACTIVE if is_subscription_active else UserStatus.DISABLED
create_kwargs = dict(
username=username,
expire_at=expire_at,
status=status,
traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0,
traffic_limit_strategy=TrafficLimitStrategy.MONTH,
telegram_id=user.telegram_id,
description=settings.format_remnawave_user_description(
username = settings.format_remnawave_username(
full_name=user.full_name,
username=user.username,
telegram_id=user.telegram_id
),
active_internal_squads=subscription.connected_squads,
)
telegram_id=user.telegram_id,
)
if hwid_limit is not None:
create_kwargs['hwid_device_limit'] = hwid_limit
if user.remnawave_uuid:
update_kwargs = dict(
uuid=user.remnawave_uuid,
status=status,
create_kwargs = dict(
username=username,
expire_at=expire_at,
traffic_limit_bytes=create_kwargs['traffic_limit_bytes'],
status=status,
traffic_limit_bytes=subscription.traffic_limit_gb * (1024**3) if subscription.traffic_limit_gb > 0 else 0,
traffic_limit_strategy=TrafficLimitStrategy.MONTH,
description=create_kwargs['description'],
telegram_id=user.telegram_id,
description=settings.format_remnawave_user_description(
full_name=user.full_name,
username=user.username,
telegram_id=user.telegram_id
),
active_internal_squads=subscription.connected_squads,
)
if hwid_limit is not None:
update_kwargs['hwid_device_limit'] = hwid_limit
create_kwargs['hwid_device_limit'] = hwid_limit
try:
await api.update_user(**update_kwargs)
stats["updated"] += 1
except RemnaWaveAPIError as api_error:
if api_error.status_code == 404:
logger.warning(
"⚠️ Не найден пользователь %s в панели, создаем заново",
user.remnawave_uuid,
)
if user.remnawave_uuid:
update_kwargs = dict(
uuid=user.remnawave_uuid,
status=status,
expire_at=expire_at,
traffic_limit_bytes=create_kwargs['traffic_limit_bytes'],
traffic_limit_strategy=TrafficLimitStrategy.MONTH,
description=create_kwargs['description'],
active_internal_squads=subscription.connected_squads,
)
new_user = await api.create_user(**create_kwargs)
user.remnawave_uuid = new_user.uuid
subscription.remnawave_short_uuid = new_user.short_uuid
stats["created"] += 1
else:
raise
else:
new_user = await api.create_user(**create_kwargs)
if hwid_limit is not None:
update_kwargs['hwid_device_limit'] = hwid_limit
try:
await api.update_user(**update_kwargs)
return ("updated", user, None)
except RemnaWaveAPIError as api_error:
if api_error.status_code == 404:
new_user = await api.create_user(**create_kwargs)
return ("created", user, new_user)
else:
raise
else:
new_user = await api.create_user(**create_kwargs)
return ("created", user, new_user)
except Exception as e:
logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}")
return ("error", user, None)
# Выполняем параллельно
tasks = [process_user(user) for user in users_with_subscriptions]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Обрабатываем результаты
for result in results:
if isinstance(result, Exception):
stats["errors"] += 1
continue
action, user, new_user = result
if action == "created":
if new_user:
user.remnawave_uuid = new_user.uuid
subscription.remnawave_short_uuid = new_user.short_uuid
stats["created"] += 1
except Exception as e:
logger.error(f"Ошибка синхронизации пользователя {user.telegram_id} в панель: {e}")
user.subscription.remnawave_short_uuid = new_user.short_uuid
stats["created"] += 1
elif action == "updated":
stats["updated"] += 1
else:
stats["errors"] += 1
try:
@@ -1772,7 +1828,12 @@ class RemnaWaveService:
commit_error,
)
await db.rollback()
stats["errors"] += len(users)
stats["errors"] += len(users_with_subscriptions)
logger.info(
f"📦 Обработано {offset + len(users)} пользователей: "
f"создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}"
)
if len(users) < batch_size:
break
@@ -1783,7 +1844,7 @@ class RemnaWaveService:
f"✅ Синхронизация в панель завершена: создано {stats['created']}, обновлено {stats['updated']}, ошибок {stats['errors']}"
)
return stats
except Exception as e:
logger.error(f"Ошибка синхронизации пользователей в панель: {e}")
return {"created": 0, "updated": 0, "errors": 1}

View File

@@ -134,6 +134,8 @@ class AdminStates(StatesGroup):
creating_server_country = State()
editing_welcome_text = State()
editing_pinned_message = State()
confirming_pinned_broadcast = State()
waiting_for_message_buttons = "waiting_for_message_buttons"
editing_promo_offer_message = State()

View File

@@ -4,14 +4,17 @@ from datetime import datetime
import html
ALLOWED_HTML_TAGS = {
'b', 'strong',
'i', 'em',
'u', 'ins',
's', 'strike', 'del',
'code',
'pre',
'a',
'blockquote'
'b', 'strong', # жирный
'i', 'em', # курсив
'u', 'ins', # подчёркнутый
's', 'strike', 'del', # зачёркнутый
'code', # моноширинный
'pre', # блок кода
'a', # ссылка
'blockquote', # цитата
'tg-spoiler', # спойлер
'tg-emoji', # кастомный эмодзи
'span', # для class="tg-spoiler"
}
SELF_CLOSING_TAGS = {
@@ -276,14 +279,16 @@ def fix_html_tags(text: str) -> str:
def get_html_help_text() -> str:
return """<b>Поддерживаемые HTML теги:</b>
• <code>&lt;b&gt;жирный&lt;/b&gt;</code> или <code>&lt;strong&gt;жирный&lt;/strong&gt;</code>
• <code>&lt;i&gt;курсив&lt;/i&gt;</code> или <code>&lt;em&gt;курсив&lt;/em&gt;</code>
• <code>&lt;u&gt;подчеркнутый&lt;/u&gt;</code>
• <code>&lt;s&gt;зачеркнутый&lt;/s&gt;</code>
• <code>&lt;b&gt;жирный&lt;/b&gt;</code> или <code>&lt;strong&gt;&lt;/strong&gt;</code>
• <code>&lt;i&gt;курсив&lt;/i&gt;</code> или <code>&lt;em&gt;&lt;/em&gt;</code>
• <code>&lt;u&gt;подчёркнутый&lt;/u&gt;</code>
• <code>&lt;s&gt;зачёркнутый&lt;/s&gt;</code>
• <code>&lt;code&gt;моноширинный&lt;/code&gt;</code>
• <code>&lt;pre&gt;блок кода&lt;/pre&gt;</code>
• <code>&lt;a href="url"&gt;ссылка&lt;/a&gt;</code>
• <code>&lt;blockquote&gt;цитата&lt;/blockquote&gt;</code>
• <code>&lt;tg-spoiler&gt;спойлер&lt;/tg-spoiler&gt;</code>
• <code>&lt;tg-emoji emoji-id="123"&gt;😀&lt;/tg-emoji&gt;</code>
<b>⚠️ Важные правила:</b>
• Каждый открывающий тег должен быть закрыт
@@ -292,11 +297,9 @@ def get_html_help_text() -> str:
<b>❌ Неправильно:</b>
<code>&lt;b&gt;жирный &lt;i&gt;курсив&lt;/b&gt;&lt;/i&gt;</code>
<code>&lt;a href=google.com&gt;ссылка&lt;/a&gt;</code>
<b>✅ Правильно:</b>
<code>&lt;b&gt;жирный &lt;i&gt;курсив&lt;/i&gt;&lt;/b&gt;</code>
<code>&lt;a href="https://google.com"&gt;ссылка&lt;/a&gt;</code>"""
<code>&lt;b&gt;жирный &lt;i&gt;курсив&lt;/i&gt;&lt;/b&gt;</code>"""
def validate_rules_content(text: str) -> Tuple[bool, str, Optional[str]]:

View File

@@ -18,6 +18,7 @@ from .routes import (
menu_layout,
miniapp,
partners,
pinned_messages,
polls,
promocodes,
promo_groups,
@@ -145,6 +146,13 @@ OPENAPI_TAGS = [
"name": "contests",
"description": "Управление конкурсами: реферальными и ежедневными играми/раундами.",
},
{
"name": "pinned-messages",
"description": (
"Управление закреплёнными сообщениями: создание, обновление, рассылка и "
"настройка показа при /start."
),
},
]
@@ -224,6 +232,11 @@ def create_web_api_app() -> FastAPI:
app.include_router(partners.router, prefix="/partners", tags=["partners"])
app.include_router(polls.router, prefix="/polls", tags=["polls"])
app.include_router(logs.router, prefix="/logs", tags=["logs"])
app.include_router(
pinned_messages.router,
prefix="/pinned-messages",
tags=["pinned-messages"],
)
app.include_router(
subscription_events.router,
prefix="/notifications/subscriptions",

View File

@@ -5,6 +5,7 @@ from . import (
media,
miniapp,
partners,
pinned_messages,
polls,
promo_offers,
user_messages,
@@ -31,6 +32,7 @@ __all__ = [
"media",
"miniapp",
"partners",
"pinned_messages",
"polls",
"promo_offers",
"user_messages",

View File

@@ -0,0 +1,377 @@
from __future__ import annotations
from datetime import datetime
from typing import Any, Optional
from aiogram import Bot
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import func, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database.models import PinnedMessage
from app.services.pinned_message_service import (
broadcast_pinned_message,
deactivate_active_pinned_message,
get_active_pinned_message,
set_active_pinned_message,
unpin_active_pinned_message,
)
from ..dependencies import get_db_session, require_api_token
from ..schemas.pinned_messages import (
PinnedMessageBroadcastResponse,
PinnedMessageCreateRequest,
PinnedMessageListResponse,
PinnedMessageResponse,
PinnedMessageSettingsRequest,
PinnedMessageUnpinResponse,
PinnedMessageUpdateRequest,
)
router = APIRouter()
def _serialize_pinned_message(msg: PinnedMessage) -> PinnedMessageResponse:
return PinnedMessageResponse(
id=msg.id,
content=msg.content,
media_type=msg.media_type,
media_file_id=msg.media_file_id,
send_before_menu=msg.send_before_menu,
send_on_every_start=msg.send_on_every_start,
is_active=msg.is_active,
created_by=msg.created_by,
created_at=msg.created_at,
updated_at=msg.updated_at,
)
def _get_bot() -> Bot:
"""Создать экземпляр бота для API операций."""
return Bot(
token=settings.BOT_TOKEN,
default=DefaultBotProperties(parse_mode=ParseMode.HTML),
)
@router.get("", response_model=PinnedMessageListResponse)
async def list_pinned_messages(
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
active_only: bool = Query(False),
token: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> PinnedMessageListResponse:
"""Получить список всех закреплённых сообщений."""
query = select(PinnedMessage).order_by(PinnedMessage.created_at.desc())
count_query = select(func.count(PinnedMessage.id))
if active_only:
query = query.where(PinnedMessage.is_active.is_(True))
count_query = count_query.where(PinnedMessage.is_active.is_(True))
total = await db.scalar(count_query) or 0
result = await db.execute(query.offset(offset).limit(limit))
items = result.scalars().all()
return PinnedMessageListResponse(
items=[_serialize_pinned_message(msg) for msg in items],
total=total,
limit=limit,
offset=offset,
)
@router.get("/active", response_model=Optional[PinnedMessageResponse])
async def get_active_message(
token: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> Optional[PinnedMessageResponse]:
"""Получить текущее активное закреплённое сообщение."""
msg = await get_active_pinned_message(db)
if not msg:
return None
return _serialize_pinned_message(msg)
@router.get("/{message_id}", response_model=PinnedMessageResponse)
async def get_pinned_message(
message_id: int,
token: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> PinnedMessageResponse:
"""Получить закреплённое сообщение по ID."""
result = await db.execute(
select(PinnedMessage).where(PinnedMessage.id == message_id)
)
msg = result.scalar_one_or_none()
if not msg:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Pinned message not found")
return _serialize_pinned_message(msg)
@router.post("", response_model=PinnedMessageBroadcastResponse, status_code=status.HTTP_201_CREATED)
async def create_pinned_message(
payload: PinnedMessageCreateRequest,
broadcast: bool = Query(False, description="Разослать сообщение всем пользователям (по умолчанию False — только при /start)"),
token: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> PinnedMessageBroadcastResponse:
"""
Создать новое закреплённое сообщение.
Автоматически деактивирует предыдущее активное сообщение.
- broadcast=False (по умолчанию): пользователи увидят при следующем /start
- broadcast=True: рассылает сообщение всем активным пользователям сразу
"""
content = payload.content.strip()
if not content and not payload.media:
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
"Either content or media must be provided"
)
media_type = payload.media.type if payload.media else None
media_file_id = payload.media.file_id if payload.media else None
try:
msg = await set_active_pinned_message(
db=db,
content=content,
created_by=None,
media_type=media_type,
media_file_id=media_file_id,
send_before_menu=payload.send_before_menu,
send_on_every_start=payload.send_on_every_start,
)
except ValueError as e:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e))
sent_count = 0
failed_count = 0
if broadcast:
sent_count, failed_count = await broadcast_pinned_message(_get_bot(), db, msg)
return PinnedMessageBroadcastResponse(
message=_serialize_pinned_message(msg),
sent_count=sent_count,
failed_count=failed_count,
)
@router.patch("/{message_id}", response_model=PinnedMessageResponse)
async def update_pinned_message(
message_id: int,
payload: PinnedMessageUpdateRequest,
token: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> PinnedMessageResponse:
"""
Обновить закреплённое сообщение.
Можно обновить контент, медиа и настройки показа.
Не делает рассылку — для рассылки используйте POST /pinned-messages/{id}/broadcast.
"""
result = await db.execute(
select(PinnedMessage).where(PinnedMessage.id == message_id)
)
msg = result.scalar_one_or_none()
if not msg:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Pinned message not found")
if payload.content is not None:
from app.utils.validators import sanitize_html, validate_html_tags
sanitized = sanitize_html(payload.content)
is_valid, error = validate_html_tags(sanitized)
if not is_valid:
raise HTTPException(status.HTTP_400_BAD_REQUEST, error)
msg.content = sanitized
if payload.media is not None:
if payload.media.type not in ("photo", "video"):
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
"Only photo or video media types are supported"
)
msg.media_type = payload.media.type
msg.media_file_id = payload.media.file_id
if payload.send_before_menu is not None:
msg.send_before_menu = payload.send_before_menu
if payload.send_on_every_start is not None:
msg.send_on_every_start = payload.send_on_every_start
msg.updated_at = datetime.utcnow()
await db.commit()
await db.refresh(msg)
return _serialize_pinned_message(msg)
@router.patch("/{message_id}/settings", response_model=PinnedMessageResponse)
async def update_pinned_message_settings(
message_id: int,
payload: PinnedMessageSettingsRequest,
token: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> PinnedMessageResponse:
"""
Обновить только настройки закреплённого сообщения.
- send_before_menu: показывать до или после меню
- send_on_every_start: показывать при каждом /start или только один раз
"""
result = await db.execute(
select(PinnedMessage).where(PinnedMessage.id == message_id)
)
msg = result.scalar_one_or_none()
if not msg:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Pinned message not found")
if payload.send_before_menu is not None:
msg.send_before_menu = payload.send_before_menu
if payload.send_on_every_start is not None:
msg.send_on_every_start = payload.send_on_every_start
msg.updated_at = datetime.utcnow()
await db.commit()
await db.refresh(msg)
return _serialize_pinned_message(msg)
@router.post("/{message_id}/activate", response_model=PinnedMessageBroadcastResponse)
async def activate_pinned_message(
message_id: int,
broadcast: bool = Query(False, description="Разослать сообщение всем пользователям (по умолчанию False — только при /start)"),
token: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> PinnedMessageBroadcastResponse:
"""
Активировать закреплённое сообщение.
Деактивирует текущее активное сообщение и активирует указанное.
- broadcast=False (по умолчанию): пользователи увидят при следующем /start
- broadcast=True: рассылает сообщение всем активным пользователям сразу
"""
result = await db.execute(
select(PinnedMessage).where(PinnedMessage.id == message_id)
)
msg = result.scalar_one_or_none()
if not msg:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Pinned message not found")
# Деактивируем все активные
await db.execute(
update(PinnedMessage)
.where(PinnedMessage.is_active.is_(True))
.values(is_active=False, updated_at=datetime.utcnow())
)
# Активируем указанное
msg.is_active = True
msg.updated_at = datetime.utcnow()
await db.commit()
await db.refresh(msg)
sent_count = 0
failed_count = 0
if broadcast:
sent_count, failed_count = await broadcast_pinned_message(_get_bot(), db, msg)
return PinnedMessageBroadcastResponse(
message=_serialize_pinned_message(msg),
sent_count=sent_count,
failed_count=failed_count,
)
@router.post("/{message_id}/broadcast", response_model=PinnedMessageBroadcastResponse)
async def broadcast_message(
message_id: int,
token: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> PinnedMessageBroadcastResponse:
"""
Разослать закреплённое сообщение всем активным пользователям.
Работает для любого сообщения, независимо от его статуса активности.
"""
result = await db.execute(
select(PinnedMessage).where(PinnedMessage.id == message_id)
)
msg = result.scalar_one_or_none()
if not msg:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Pinned message not found")
sent_count, failed_count = await broadcast_pinned_message(_get_bot(), db, msg)
return PinnedMessageBroadcastResponse(
message=_serialize_pinned_message(msg),
sent_count=sent_count,
failed_count=failed_count,
)
@router.post("/active/deactivate", response_model=Optional[PinnedMessageResponse])
async def deactivate_active_message(
token: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> Optional[PinnedMessageResponse]:
"""
Деактивировать текущее активное закреплённое сообщение.
Не удаляет сообщение и не открепляет у пользователей.
"""
msg = await deactivate_active_pinned_message(db)
if not msg:
return None
return _serialize_pinned_message(msg)
@router.post("/active/unpin", response_model=PinnedMessageUnpinResponse)
async def unpin_active_message(
token: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> PinnedMessageUnpinResponse:
"""
Открепить сообщение у всех пользователей и деактивировать.
Удаляет закреплённое сообщение из чатов всех активных пользователей.
"""
unpinned_count, failed_count, was_active = await unpin_active_pinned_message(_get_bot(), db)
return PinnedMessageUnpinResponse(
unpinned_count=unpinned_count,
failed_count=failed_count,
was_active=was_active,
)
@router.delete("/{message_id}", status_code=status.HTTP_204_NO_CONTENT, response_model=None)
async def delete_pinned_message(
message_id: int,
token: Any = Depends(require_api_token),
db: AsyncSession = Depends(get_db_session),
) -> None:
"""
Удалить закреплённое сообщение.
Если сообщение активно, сначала будет деактивировано.
Не открепляет сообщение у пользователей — для этого используйте /active/unpin.
"""
result = await db.execute(
select(PinnedMessage).where(PinnedMessage.id == message_id)
)
msg = result.scalar_one_or_none()
if not msg:
raise HTTPException(status.HTTP_404_NOT_FOUND, "Pinned message not found")
await db.delete(msg)
await db.commit()

View File

@@ -0,0 +1,66 @@
from __future__ import annotations
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
class PinnedMessageMedia(BaseModel):
type: str = Field(pattern=r"^(photo|video)$")
file_id: str
class PinnedMessageBase(BaseModel):
content: Optional[str] = Field(None, max_length=4000)
send_before_menu: bool = True
send_on_every_start: bool = True
class PinnedMessageCreateRequest(PinnedMessageBase):
content: str = Field(..., min_length=1, max_length=4000)
media: Optional[PinnedMessageMedia] = None
class PinnedMessageUpdateRequest(BaseModel):
content: Optional[str] = Field(None, max_length=4000)
send_before_menu: Optional[bool] = None
send_on_every_start: Optional[bool] = None
media: Optional[PinnedMessageMedia] = None
class PinnedMessageSettingsRequest(BaseModel):
send_before_menu: Optional[bool] = None
send_on_every_start: Optional[bool] = None
class PinnedMessageResponse(BaseModel):
id: int
content: Optional[str]
media_type: Optional[str] = None
media_file_id: Optional[str] = None
send_before_menu: bool
send_on_every_start: bool
is_active: bool
created_by: Optional[int] = None
created_at: datetime
updated_at: Optional[datetime] = None
class PinnedMessageBroadcastResponse(BaseModel):
message: PinnedMessageResponse
sent_count: int
failed_count: int
class PinnedMessageUnpinResponse(BaseModel):
unpinned_count: int
failed_count: int
was_active: bool
class PinnedMessageListResponse(BaseModel):
items: list[PinnedMessageResponse]
total: int
limit: int
offset: int

View File

@@ -0,0 +1,57 @@
"""add pinned start mode and user last pin
Revision ID: 1b2e3d4f5a6b
Revises: 7a3c0b8f5b84
Create Date: 2025-01-01 00:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '1b2e3d4f5a6b'
down_revision = '7a3c0b8f5b84'
branch_labels = None
depends_on = None
def _table_exists(inspector: sa.Inspector, table_name: str) -> bool:
return table_name in inspector.get_table_names()
def _column_exists(inspector: sa.Inspector, table_name: str, column_name: str) -> bool:
if not _table_exists(inspector, table_name):
return False
columns = {col["name"] for col in inspector.get_columns(table_name)}
return column_name in columns
def upgrade():
bind = op.get_bind()
inspector = sa.inspect(bind)
if _table_exists(inspector, "pinned_messages"):
if not _column_exists(inspector, "pinned_messages", "send_on_every_start"):
op.add_column(
'pinned_messages',
sa.Column('send_on_every_start', sa.Boolean(), nullable=False, server_default='1'),
)
if _table_exists(inspector, "users"):
if not _column_exists(inspector, "users", "last_pinned_message_id"):
op.add_column(
'users',
sa.Column('last_pinned_message_id', sa.Integer(), nullable=True),
)
def downgrade():
bind = op.get_bind()
inspector = sa.inspect(bind)
if _column_exists(inspector, "users", "last_pinned_message_id"):
op.drop_column('users', 'last_pinned_message_id')
if _column_exists(inspector, "pinned_messages", "send_on_every_start"):
op.drop_column('pinned_messages', 'send_on_every_start')

View File

@@ -0,0 +1,75 @@
"""add media fields to pinned messages"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = "5f2a3e099427"
down_revision: Union[str, None] = "c9c71d04f0a1"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
TABLE_NAME = "pinned_messages"
def _table_exists(inspector: sa.Inspector) -> bool:
return TABLE_NAME in inspector.get_table_names()
def _column_missing(inspector: sa.Inspector, column_name: str) -> bool:
columns = {column.get("name") for column in inspector.get_columns(TABLE_NAME)}
return column_name not in columns
def upgrade() -> None:
bind = op.get_bind()
inspector = sa.inspect(bind)
if not _table_exists(inspector):
return
if _column_missing(inspector, "media_type"):
op.add_column(
TABLE_NAME,
sa.Column("media_type", sa.String(length=32), nullable=True),
)
if _column_missing(inspector, "media_file_id"):
op.add_column(
TABLE_NAME,
sa.Column("media_file_id", sa.String(length=255), nullable=True),
)
# Ensure content has a default value for media-only messages
op.alter_column(
TABLE_NAME,
"content",
existing_type=sa.Text(),
nullable=False,
server_default="",
)
def downgrade() -> None:
bind = op.get_bind()
inspector = sa.inspect(bind)
if not _table_exists(inspector):
return
if not _column_missing(inspector, "media_type"):
op.drop_column(TABLE_NAME, "media_type")
if not _column_missing(inspector, "media_file_id"):
op.drop_column(TABLE_NAME, "media_file_id")
op.alter_column(
TABLE_NAME,
"content",
existing_type=sa.Text(),
nullable=False,
server_default=None,
)

View File

@@ -0,0 +1,59 @@
"""add send_before_menu to pinned messages
Revision ID: 7a3c0b8f5b84
Revises: 5f2a3e099427
Create Date: 2025-02-05 00:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "7a3c0b8f5b84"
down_revision = "5f2a3e099427"
branch_labels = None
depends_on = None
TABLE_NAME = "pinned_messages"
def _table_exists(inspector: sa.Inspector) -> bool:
return TABLE_NAME in inspector.get_table_names()
def _column_exists(inspector: sa.Inspector, column_name: str) -> bool:
if not _table_exists(inspector):
return False
columns = {col["name"] for col in inspector.get_columns(TABLE_NAME)}
return column_name in columns
def upgrade() -> None:
bind = op.get_bind()
inspector = sa.inspect(bind)
if not _table_exists(inspector):
return
if _column_exists(inspector, "send_before_menu"):
return
op.add_column(
TABLE_NAME,
sa.Column(
"send_before_menu",
sa.Boolean(),
nullable=False,
server_default=sa.text("1"),
),
)
def downgrade() -> None:
bind = op.get_bind()
inspector = sa.inspect(bind)
if _column_exists(inspector, "send_before_menu"):
op.drop_column(TABLE_NAME, "send_before_menu")

View File

@@ -0,0 +1,45 @@
"""add pinned messages table"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = "c9c71d04f0a1"
down_revision: Union[str, None] = "e3c1e0b5b4a7"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
TABLE_NAME = "pinned_messages"
def _table_exists(inspector: sa.Inspector) -> bool:
return TABLE_NAME in inspector.get_table_names()
def upgrade() -> None:
bind = op.get_bind()
inspector = sa.inspect(bind)
if _table_exists(inspector):
return
op.create_table(
TABLE_NAME,
sa.Column("id", sa.Integer(), primary_key=True, index=True),
sa.Column("content", sa.Text(), nullable=False),
sa.Column("is_active", sa.Boolean(), default=True),
sa.Column("created_by", sa.Integer(), sa.ForeignKey("users.id", ondelete="SET NULL"), nullable=True),
sa.Column("created_at", sa.DateTime(), server_default=sa.func.now()),
sa.Column("updated_at", sa.DateTime(), server_default=sa.func.now(), onupdate=sa.func.now()),
)
def downgrade() -> None:
bind = op.get_bind()
inspector = sa.inspect(bind)
if _table_exists(inspector):
op.drop_table(TABLE_NAME)