mirror of
https://github.com/BEDOLAGA-DEV/remnawave-bedolaga-telegram-bot.git
synced 2026-02-28 23:35:59 +00:00
- Migrate 660+ datetime.utcnow() across 153 files to datetime.now(UTC) - Migrate 30+ datetime.now() without UTC to datetime.now(UTC) - Convert all 170 DateTime columns to DateTime(timezone=True) - Add migrate_datetime_to_timestamptz() in universal_migration with SET LOCAL timezone='UTC' safety - Remove 70+ .replace(tzinfo=None) workarounds - Fix utcfromtimestamp → fromtimestamp(..., tz=UTC) - Fix fromtimestamp() without tz= (system_logs, backup_service, referral_diagnostics) - Fix fromisoformat/isoparse to ensure aware output (platega, yookassa, wata, miniapp, nalogo) - Fix strptime() to add .replace(tzinfo=UTC) (backup_service, referral_diagnostics) - Fix datetime.combine() to include tzinfo=UTC (remnawave_sync, traffic_monitoring) - Fix datetime.max/datetime.min sentinels with .replace(tzinfo=UTC) - Rename panel_datetime_to_naive_utc → panel_datetime_to_utc - Remove DTZ003 from ruff ignore list
100 lines
3.1 KiB
Python
100 lines
3.1 KiB
Python
from __future__ import annotations
|
|
|
|
import secrets
|
|
from datetime import UTC, datetime
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.config import settings
|
|
from app.database.crud import web_api_token as crud
|
|
from app.database.models import WebApiToken
|
|
from app.database.universal_migration import ensure_default_web_api_token
|
|
from app.utils.security import generate_api_token, hash_api_token
|
|
|
|
|
|
class WebApiTokenService:
|
|
"""Сервис для управления токенами административного веб-API."""
|
|
|
|
def __init__(self):
|
|
self.algorithm = settings.WEB_API_TOKEN_HASH_ALGORITHM or 'sha256'
|
|
|
|
def hash_token(self, token: str) -> str:
|
|
return hash_api_token(token, self.algorithm) # type: ignore[arg-type]
|
|
|
|
async def authenticate(
|
|
self,
|
|
db: AsyncSession,
|
|
token_value: str,
|
|
*,
|
|
remote_ip: str | None = None,
|
|
) -> WebApiToken | None:
|
|
normalized_value = token_value.strip()
|
|
if not normalized_value:
|
|
return None
|
|
|
|
async def _load_token(value: str) -> WebApiToken | None:
|
|
token_hash = self.hash_token(value)
|
|
return await crud.get_token_by_hash(db, token_hash)
|
|
|
|
token = await _load_token(normalized_value)
|
|
|
|
if not token:
|
|
default_token = (settings.WEB_API_DEFAULT_TOKEN or '').strip()
|
|
if default_token and secrets.compare_digest(default_token, normalized_value):
|
|
await ensure_default_web_api_token()
|
|
token = await _load_token(default_token)
|
|
|
|
if not token or not token.is_active:
|
|
return None
|
|
|
|
if token.expires_at and token.expires_at < datetime.now(UTC):
|
|
return None
|
|
|
|
token.last_used_at = datetime.now(UTC)
|
|
if remote_ip:
|
|
token.last_used_ip = remote_ip
|
|
await db.flush()
|
|
return token
|
|
|
|
async def create_token(
|
|
self,
|
|
db: AsyncSession,
|
|
*,
|
|
name: str,
|
|
description: str | None = None,
|
|
expires_at: datetime | None = None,
|
|
created_by: str | None = None,
|
|
token_value: str | None = None,
|
|
) -> tuple[str, WebApiToken]:
|
|
plain_token = token_value or generate_api_token()
|
|
token_hash = self.hash_token(plain_token)
|
|
|
|
token = await crud.create_token(
|
|
db,
|
|
name=name,
|
|
token_hash=token_hash,
|
|
token_prefix=plain_token[:12],
|
|
description=description,
|
|
expires_at=expires_at,
|
|
created_by=created_by,
|
|
)
|
|
|
|
return plain_token, token
|
|
|
|
async def revoke_token(self, db: AsyncSession, token: WebApiToken) -> WebApiToken:
|
|
token.is_active = False
|
|
token.updated_at = datetime.now(UTC)
|
|
await db.flush()
|
|
await db.refresh(token)
|
|
return token
|
|
|
|
async def activate_token(self, db: AsyncSession, token: WebApiToken) -> WebApiToken:
|
|
token.is_active = True
|
|
token.updated_at = datetime.now(UTC)
|
|
await db.flush()
|
|
await db.refresh(token)
|
|
return token
|
|
|
|
|
|
# Module-level shared instance of the service. It is created at import time,
# so settings.WEB_API_TOKEN_HASH_ALGORITHM is read once when this module loads.
web_api_token_service = WebApiTokenService()
|