from datetime import datetime
import asyncio
import time
from collections import defaultdict, deque
from typing import Dict, Deque

import pytz

from db import user_token_in_db_func

moscow_tz = pytz.timezone('Europe/Moscow')


async def log(session, user_token, message):
    """Append a timestamped message to the log text stored for a user token.

    The record is looked up with ``user_token_in_db_func``, the message is
    prefixed with the current Europe/Moscow time and appended to the
    record's ``logs`` text (trimmed by ``append_line_with_limit``), and the
    change is committed.

    Args:
        session: Database session; ``add`` is called on it and ``commit`` /
            ``refresh`` are awaited.
        user_token: Value handed to ``user_token_in_db_func`` to find the record.
        message: Text to append to the log.
    """
    # NOTE(review): assumes the lookup always returns a record; a ``None``
    # result would raise AttributeError below -- confirm against the helper.
    user_token_db = await user_token_in_db_func(session, user_token)

    # The timestamp sits inside a list literal within the f-string, so it is
    # rendered as the list repr, e.g. "['2024-01-01 12:00 MSK'] message".
    # NOTE(review): "[{...}]" was probably intended; kept unchanged so new
    # entries stay consistent with log lines that are already stored.
    log_message = f"{[datetime.now(moscow_tz).strftime('%Y-%m-%d %H:%M %Z')]} {message}"

    user_token_db.logs = await append_line_with_limit(user_token_db.logs, log_message)
    session.add(user_token_db)
    await session.commit()
    await session.refresh(user_token_db)


async def append_line_with_limit(text, new_line, max_lines = 5000) -> str:
    """Append a line to a text while keeping at most ``max_lines`` lines.

    When the limit is exceeded, the oldest lines are dropped until the
    result fits. The function performs no awaiting itself; it stays a
    coroutine because existing callers ``await`` it.

    Args:
        text: The existing text; ``None`` or an empty string is treated as
            having no lines.
        new_line: The line to append.
        max_lines: Maximum number of lines to keep.

    Returns:
        The updated text, with lines joined by ``"\\n"``.
    """
    lines = text.splitlines() if text else []
    lines.append(new_line)

    # Drop every line over the limit, not just one: the stored text may
    # already be too long (e.g. an earlier entry contained embedded newlines,
    # which only show up as separate lines on the next split), and removing a
    # single line per call would never bring it back under ``max_lines``.
    overflow = len(lines) - max_lines
    if overflow > 0:
        del lines[:overflow]

    return '\n'.join(lines)


class UserRateLimiter:
    """Sliding-window request counter keyed by IP address.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, max_requests: int = 62, window_seconds: float = 60):
        """Create a limiter.

        Args:
            max_requests: Number of requests allowed inside the window before
                an IP is reported as rate limited. The default of 62 is one
                request per second plus 2 of slack.
            window_seconds: Length of the sliding window in seconds.
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Request timestamps (``time.time()`` values) per IP, oldest first.
        # NOTE(review): entries for IPs that stop sending requests are never
        # removed, so this mapping grows with the number of distinct IPs.
        self.ip_requests: Dict[str, Deque[float]] = defaultdict(deque)
        # Serializes the prune/append/count sequence across coroutines.
        self.lock = asyncio.Lock()

    async def is_rate_limited(self, ip: str) -> bool:
        """Record a request from ``ip`` and report whether it is over the limit.

        Every call counts as a request, including calls that return ``True``.

        Args:
            ip: User's IP address.

        Returns:
            ``True`` if, counting this request, the IP has made more than
            ``max_requests`` requests (62 by default) within the last
            ``window_seconds`` seconds (60 by default); ``False`` otherwise.
        """
        async with self.lock:
            current_time = time.time()
            timestamps = self.ip_requests[ip]

            # Discard timestamps that have fallen out of the window.
            while timestamps and current_time - timestamps[0] > self.window_seconds:
                timestamps.popleft()

            # Count the current request before comparing against the limit.
            timestamps.append(current_time)
            return len(timestamps) > self.max_requests


# Global instance used by the module-level helper below
rate_limiter = UserRateLimiter()


async def is_rate_limited(ip: str) -> bool:
    """Record a request from ``ip`` on the global limiter and return its verdict."""
    return await rate_limiter.is_rate_limited(ip)