Добавление API для получения всех пользователей, доработка call2api

This commit is contained in:
justuser-31 2026-02-05 13:20:21 +03:00
parent 0adc4ea0e7
commit f06de54ee0
2 changed files with 105 additions and 3 deletions

View File

@ -1,5 +1,7 @@
import requests
from config import load_config
def create_account(
api_url: str, api_token: str, username: str, password: str, drive_quota: int
@ -77,6 +79,52 @@ def delete_account(api_url: str, api_token: str, username: str) -> dict:
return {"status_code": -1, "message": f"Request failed: {str(e)}"}
# print(set_quota("http://proxy.del.pw:50020", "test", "test", 1000))
# print(delete_account("http://127.0.0.1:8000", "test", "test"))
# print(create_account("http://127.0.0.1:8000", "test", "test", "test", 999))
def get_all_users(api_url: str, api_token: str) -> dict:
"""
Call the get all users API endpoint.
Args:
api_url: Base URL of the API server (e.g., 'http://localhost:8000')
api_token: Security token for API authentication
Returns:
Dictionary with 'status_code' and 'data' keys.
If successful, 'data' will contain a list of users.
If failed, 'data' will contain an error message.
"""
url = f"{api_url.rstrip('/')}/users"
try:
response = requests.get(url, params={"token": api_token})
if response.status_code == 200:
return {"status_code": response.status_code, "data": response.json()}
else:
return {"status_code": response.status_code, "data": response.text}
except requests.RequestException as e:
return {"status_code": -1, "data": f"Request failed: {str(e)}"}
except ValueError as e:
# Handle case where response isn't valid JSON
return {"status_code": -1, "data": f"Failed to parse response: {str(e)}"}
if __name__ == "__main__":
CONFIG = load_config()
# API_URL = "https://sfs.del.pw"
API_URL = "http://127.0.0.1:8000"
# print(set_quota(API_URL, CONFIG["security"]["api_token"], "sans", 10000))
# print(delete_account(API_URL, CONFIG["security"]["api_token"], "test"))
# print(create_account(API_URL, CONFIG["security"]["api_token"] , "test", "test", 999))
# print(get_all_users(API_URL, CONFIG["security"]["api_token"]))
users = get_all_users(API_URL, CONFIG["security"]["api_token"])["data"]
print(users)
print("-------------------------")
for user in users:
quota = user["quota_mb"]
quota += 100 # +100 MB to each user
print(
set_quota(API_URL, CONFIG["security"]["api_token"], user["username"], quota)
)
print("-------------------------")
print(get_all_users(API_URL, CONFIG["security"]["api_token"]))

View File

@ -1,3 +1,4 @@
import json
import mimetypes
import os
import sqlite3
@ -34,12 +35,27 @@ class FileServerHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
log(format % args)
def parse_params(self):
"""Parse query parameters from the URL"""
parsed_url = urlparse(self.path)
self.params = parse_qs(parsed_url.query)
# Convert lists to single values
for key, value in self.params.items():
if isinstance(value, list) and len(value) == 1:
self.params[key] = value[0]
def send_html(self, content, status=200):
self.send_response(status)
self.send_header("Content-type", "text/html; charset=utf-8")
self.end_headers()
self.wfile.write(content.encode("utf-8"))
def send_json(self, data, status=200):
self.send_response(status)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(data).encode("utf-8"))
def send_redirect(self, location, status=302):
self.send_response(status)
self.send_header("Location", location)
@ -600,6 +616,42 @@ class FileServerHandler(BaseHTTPRequestHandler):
database.DB_CONN.commit()
self.send_html("<h1>Account deleted</h1>")
def handle_api_get_all_users(self):
if self.command != "GET":
self.send_html("<h1>Method not allowed</h1>", 405)
return
# Parse query parameters
self.parse_params()
# Check for API token in query parameters or headers
token = None
if "token" in self.params:
token = self.params["token"]
else:
token = self.headers.get("Authorization", "").replace("Bearer ", "")
if token != CONFIG["security"]["api_token"]:
self.send_html("<h1>Invalid token</h1>", 403)
return
cursor = database.DB_CONN.cursor()
cursor.execute("SELECT id, username, quota_mb, used_mb FROM users")
rows = cursor.fetchall()
users = []
for row in rows:
users.append(
{
"id": row[0],
"username": row[1],
"quota_mb": row[2],
"used_mb": row[3],
}
)
self.send_json(users)
def handle_static_file(self):
if self.path.startswith("/static/"):
file_path = self.path[1:] # Remove leading slash
@ -666,6 +718,8 @@ class FileServerHandler(BaseHTTPRequestHandler):
self.handle_static_file()
elif path == "/favicon.ico":
self.handle_favicon()
elif path == "/users":
self.handle_api_get_all_users()
else:
self.send_html("<h1>Not Found</h1>", 404)