simpliest_fs/telegram_bot/call2api.py

83 lines
2.7 KiB
Python

import requests
def create_account(
    api_url: str,
    api_token: str,
    username: str,
    password: str,
    drive_quota: int,
    timeout: float = 30.0,
) -> dict:
    """
    Call the create account API endpoint.

    Sends a form-encoded POST to ``<api_url>/create_account``.

    Args:
        api_url: Base URL of the API server (e.g., 'http://localhost:8000').
            Trailing slashes are stripped before the endpoint is appended.
        api_token: Security token for API authentication
        username: Username for the new account
        password: Password for the new account
        drive_quota: Storage quota in MB (sent as a string form field)
        timeout: Seconds to wait for the server before giving up. Passed
            straight to ``requests.post``; a timeout is reported like any
            other request failure.

    Returns:
        Dictionary with 'status_code' and 'message' keys. When the server
        answers, these are the HTTP status code and the response body text.
        When the request itself fails (connection error, timeout, ...),
        'status_code' is -1 and 'message' starts with "Request failed: ".
    """
    url = f"{api_url.rstrip('/')}/create_account"
    data = {
        "token": api_token,
        "username": username,
        "password": password,
        "drive_quota": str(drive_quota),
    }
    try:
        # requests never times out on its own; without an explicit timeout
        # an unresponsive server would block the caller forever.
        response = requests.post(url, data=data, timeout=timeout)
    except requests.RequestException as e:
        return {"status_code": -1, "message": f"Request failed: {e}"}
    else:
        return {"status_code": response.status_code, "message": response.text}
def set_quota(
    api_url: str,
    api_token: str,
    username: str,
    drive_quota: int,
    timeout: float = 30.0,
) -> dict:
    """
    Call the set quota API endpoint.

    Sends a form-encoded POST to ``<api_url>/set_quota``.

    Args:
        api_url: Base URL of the API server (e.g., 'http://localhost:8000').
            Trailing slashes are stripped before the endpoint is appended.
        api_token: Security token for API authentication
        username: Username whose quota should be updated
        drive_quota: New storage quota in MB (sent as a string form field)
        timeout: Seconds to wait for the server before giving up. Passed
            straight to ``requests.post``; a timeout is reported like any
            other request failure.

    Returns:
        Dictionary with 'status_code' and 'message' keys. When the server
        answers, these are the HTTP status code and the response body text.
        When the request itself fails (connection error, timeout, ...),
        'status_code' is -1 and 'message' starts with "Request failed: ".
    """
    url = f"{api_url.rstrip('/')}/set_quota"
    data = {
        "token": api_token,
        "username": username,
        "drive_quota": str(drive_quota),
    }
    try:
        # requests never times out on its own; without an explicit timeout
        # an unresponsive server would block the caller forever.
        response = requests.post(url, data=data, timeout=timeout)
    except requests.RequestException as e:
        return {"status_code": -1, "message": f"Request failed: {e}"}
    else:
        return {"status_code": response.status_code, "message": response.text}
def delete_account(
    api_url: str,
    api_token: str,
    username: str,
    timeout: float = 30.0,
) -> dict:
    """
    Call the delete account API endpoint.

    Sends a form-encoded POST to ``<api_url>/delete_account``.

    Args:
        api_url: Base URL of the API server (e.g., 'http://localhost:8000').
            Trailing slashes are stripped before the endpoint is appended.
        api_token: Security token for API authentication
        username: Username of the account to delete
        timeout: Seconds to wait for the server before giving up. Passed
            straight to ``requests.post``; a timeout is reported like any
            other request failure.

    Returns:
        Dictionary with 'status_code' and 'message' keys. When the server
        answers, these are the HTTP status code and the response body text.
        When the request itself fails (connection error, timeout, ...),
        'status_code' is -1 and 'message' starts with "Request failed: ".
    """
    url = f"{api_url.rstrip('/')}/delete_account"
    data = {"token": api_token, "username": username}
    try:
        # requests never times out on its own; without an explicit timeout
        # an unresponsive server would block the caller forever.
        response = requests.post(url, data=data, timeout=timeout)
    except requests.RequestException as e:
        return {"status_code": -1, "message": f"Request failed: {e}"}
    else:
        return {"status_code": response.status_code, "message": response.text}
# print(set_quota("http://proxy.del.pw:50020", "test", "test", 1000))
# print(delete_account("http://127.0.0.1:8000", "test", "test"))
# print(create_account("http://127.0.0.1:8000", "test", "test", "test", 999))