vpc_user/user_api/func.py

84 lines
2.6 KiB
Python

from datetime import datetime
import pytz
import asyncio
import time
from collections import defaultdict, deque
from typing import Dict, Deque
from db import user_token_in_db_func
# Timezone used when stamping log entries written by log().
moscow_tz = pytz.timezone('Europe/Moscow')
async def log(session, user_token, message):
    """
    Append a timestamped entry to the log text stored on a user-token record.

    The record is looked up with ``user_token_in_db_func``, the new entry is
    added to its ``logs`` field through ``append_line_with_limit`` (which
    bounds how many lines are kept), and the change is committed and the
    record refreshed.

    Args:
        session: Database session exposing ``add`` plus awaitable ``commit``
            and ``refresh`` (presumably a SQLAlchemy ``AsyncSession`` --
            TODO confirm).
        user_token: Token identifying the record; passed straight through to
            ``user_token_in_db_func``.
        message: Text of the log entry.
    """
    user_token_db = await user_token_in_db_func(session, user_token)
    # NOTE(review): if the lookup can return None for an unknown token, the
    # attribute access below raises AttributeError -- confirm against the helper.

    # Format the timestamp before interpolating it. The previous f-string
    # interpolated a one-element list, so every entry began with the list's
    # repr (['2024-01-01 12:00 MSK']) instead of a plain bracketed timestamp.
    timestamp = datetime.now(moscow_tz).strftime('%Y-%m-%d %H:%M %Z')
    log_message = f"[{timestamp}] {message}"

    user_token_db.logs = await append_line_with_limit(user_token_db.logs, log_message)
    session.add(user_token_db)
    await session.commit()
    await session.refresh(user_token_db)
async def append_line_with_limit(text, new_line: str, max_lines: int = 5000) -> str:
    """
    Append a line to newline-separated text, keeping at most ``max_lines`` lines.

    When the result would exceed the limit, the oldest lines (those at the
    start of the text) are dropped until exactly ``max_lines`` remain. This
    also trims text that was already over the limit before the call, rather
    than removing just a single line.

    The function is declared ``async`` although it never awaits, because
    existing callers ``await`` it.

    Args:
        text: Existing newline-separated text; ``None`` or an empty string is
            treated as having no lines.
        new_line: The line to append.
        max_lines: Maximum number of lines to keep. Zero or a negative value
            leaves no lines, so an empty string is returned.

    Returns:
        The updated text with lines joined by newline characters.
    """
    lines = text.splitlines() if text else []
    lines.append(new_line)

    # Drop every line beyond the limit in one step (oldest first). Computing
    # the excess explicitly avoids the `lines[-0:]` pitfall when max_lines is 0.
    excess = len(lines) - max_lines
    if excess > 0:
        del lines[:excess]

    return '\n'.join(lines)
class UserRateLimiter:
    """
    In-memory, per-IP sliding-window request counter for asyncio code.

    Every call to ``is_rate_limited`` records a request for the given IP
    (including calls that are reported as limited) and reports whether that
    IP has made more than ``max_requests`` requests within the last
    ``window_seconds`` seconds.
    """

    def __init__(self, max_requests: int = 30, window_seconds: float = 60.0,
                 clock=time.monotonic):
        """
        Args:
            max_requests: Number of requests allowed per window; the next
                request inside the window is reported as limited.
            window_seconds: Length of the sliding window in seconds.
            clock: Zero-argument callable returning the current time in
                seconds. Defaults to ``time.monotonic`` so wall-clock
                adjustments cannot stretch or shrink the window.
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock
        # Store request timestamps for each IP (oldest on the left).
        self.ip_requests: Dict[str, Deque[float]] = defaultdict(deque)
        self.lock = asyncio.Lock()
        # Time of the last sweep that evicted idle IPs.
        self._last_sweep = clock()

    def _evict_idle_ips(self, now: float) -> None:
        """Drop IPs with no request inside the window, at most once per window."""
        if now - self._last_sweep < self.window_seconds:
            return
        self._last_sweep = now
        # Collect first, delete afterwards: a dict must not change size while
        # it is being iterated.
        idle = [
            ip for ip, stamps in self.ip_requests.items()
            if not stamps or now - stamps[-1] > self.window_seconds
        ]
        for ip in idle:
            del self.ip_requests[ip]

    async def is_rate_limited(self, ip: str) -> bool:
        """
        Record a request from ``ip`` and check whether it exceeds the limit.

        Args:
            ip (str): User's IP address
        Returns:
            bool: True if the IP has made more than ``max_requests`` requests
            (30 by default) within the last ``window_seconds`` seconds
            (60 by default), False otherwise
        """
        async with self.lock:
            now = self._clock()

            # Without this sweep, IPs that stop sending requests would keep
            # their entry forever and the dict would grow without bound.
            self._evict_idle_ips(now)

            stamps = self.ip_requests[ip]
            # Remove timestamps that have fallen out of the window.
            while stamps and now - stamps[0] > self.window_seconds:
                stamps.popleft()

            # Add current request
            stamps.append(now)

            # Log the IP and request count. The wording matches the default
            # 60-second window and is kept unchanged for existing log readers.
            request_count = len(stamps)
            print(f"IP: {ip}, Requests in last minute: {request_count}")

            return request_count > self.max_requests
# Global instance: a single module-level limiter, created at import time and
# used by the is_rate_limited() function below. Its request history is held
# in memory on this object.
rate_limiter = UserRateLimiter()
async def is_rate_limited(ip: str) -> bool:
    """
    Check the given IP against the module-level ``rate_limiter`` instance.

    Args:
        ip (str): User's IP address
    Returns:
        bool: Whatever ``rate_limiter.is_rate_limited(ip)`` reports
    """
    limited = await rate_limiter.is_rate_limited(ip)
    return limited