181 lines
5.5 KiB
Python
181 lines
5.5 KiB
Python
from fastapi import FastAPI, Request, HTTPException
|
|
from pydantic import BaseModel
|
|
from typing import Optional
|
|
from time import time
|
|
|
|
# Отключение логирования для уменьшения нагрузки
|
|
import logging
|
|
#logging.disable(logging.CRITICAL)
|
|
|
|
# Fix 3.3 + 0.15 = 3.4499999999999997
|
|
from decimal import Decimal as d
|
|
def fix_add(one, two):
|
|
return float(d(str(one)) + d(str(two)))
|
|
def fix_sub(one, two):
|
|
return float(d(str(one)) - d(str(two)))
|
|
|
|
from db import *
|
|
from call2api import *
|
|
|
|
app = FastAPI()
|
|
API_TOKEN = read('conf.json')['api_token']
|
|
|
|
def check_token(nick, token):
|
|
db = read('user_api.json')
|
|
id = user_in_db(API_TOKEN, nick=nick)
|
|
if id != 'false' and token == db['tokens'][id]:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
# Анти-DDoS
|
|
# Случайные тексты
|
|
from faker import Faker
|
|
from random import choice, randint
|
|
FAKE_TEXTS = [Faker().text(max_nb_chars=100) for _ in range(100)]
|
|
# Проверка на частоту обращений
|
|
LAST_REQUESTS = {}
|
|
def too_fast(request):
|
|
ip = request.client.host
|
|
now = time()
|
|
if ip in LAST_REQUESTS and (now - LAST_REQUESTS[ip]) < 0.1:
|
|
return True
|
|
LAST_REQUESTS[ip] = time()
|
|
return False
|
|
|
|
class Check_token_user(BaseModel):
|
|
nick: str
|
|
token: str
|
|
@app.post('/api/check_token/')
|
|
def check_token_user(request: Request, it: Check_token_user):
|
|
if too_fast(request):
|
|
raise HTTPException(status_code=randint(100,999), detail=f"{choice(FAKE_TEXTS)}")
|
|
nick, token = it.nick, it.token
|
|
if check_token(nick, token):
|
|
return 'OK'
|
|
else:
|
|
return 'Error'
|
|
|
|
class Check_bal_user(BaseModel):
|
|
nick: str
|
|
token: str
|
|
@app.post('/api/check_bal/')
|
|
def check_bal_user(request: Request, it: Check_bal_user):
|
|
if too_fast(request):
|
|
raise HTTPException(status_code=randint(100,999), detail=f"{choice(FAKE_TEXTS)}")
|
|
nick, token = it.nick, it.token
|
|
if check_token(nick, token):
|
|
id = user_in_db(API_TOKEN, nick=nick)
|
|
return check_bal(API_TOKEN, id)
|
|
else:
|
|
return 'Error'
|
|
|
|
class Get_time2cdm_user(BaseModel):
|
|
nick: str
|
|
token: str
|
|
@app.post('/api/get_time2cdm/')
|
|
def get_time_user(request: Request, it: Get_time2cdm_user):
|
|
if too_fast(request):
|
|
raise HTTPException(status_code=randint(100,999), detail=f"{choice(FAKE_TEXTS)}")
|
|
nick, token = it.nick, it.token
|
|
if check_token(nick, token):
|
|
id = user_in_db(API_TOKEN, nick=nick)
|
|
return get_time2cdm(API_TOKEN, id)
|
|
else:
|
|
return 'Error'
|
|
|
|
class Get_stat_user(BaseModel):
|
|
nick: str
|
|
token: str
|
|
date: Optional[str] = None
|
|
@app.post('/api/get_stat/')
|
|
def get_stat_user(request: Request, it: Get_stat_user):
|
|
if too_fast(request):
|
|
raise HTTPException(status_code=randint(100,999), detail=f"{choice(FAKE_TEXTS)}")
|
|
nick, token, date = it.nick, it.token, it.date
|
|
if check_token(nick, token):
|
|
if date != None:
|
|
return get_stat(API_TOKEN, date)
|
|
else:
|
|
return get_stat(API_TOKEN)
|
|
else:
|
|
return 'Error'
|
|
|
|
class Transfer_coins_user(BaseModel):
|
|
nick: str
|
|
token: str
|
|
dst_nick: str
|
|
amount: str
|
|
@app.post('/api/transfer_coins/')
|
|
def transfer_coins_user(request: Request, it: Transfer_coins_user):
|
|
if too_fast(request):
|
|
raise HTTPException(status_code=randint(100,999), detail=f"{choice(FAKE_TEXTS)}")
|
|
nick, token, dst_nick, amount = it.nick, it.token, it.dst_nick, str(float(it.amount))
|
|
if check_token(nick, token):
|
|
id = user_in_db(API_TOKEN, nick=nick)
|
|
dst_id = user_in_db(API_TOKEN, nick=dst_nick)
|
|
if dst_id == 'false':
|
|
return 'Error'
|
|
if transfer_coins(API_TOKEN, id, dst_id, amount) == 'OK':
|
|
tg_dst = get_tg(API_TOKEN, dst_id)
|
|
if tg_dst != 'null':
|
|
transfer_callback('http://127.0.0.1:7002/', API_TOKEN, nick, dst_nick, amount)
|
|
return 'OK'
|
|
else:
|
|
return 'Error'
|
|
|
|
class Gen_fp_user(BaseModel):
|
|
nick: str
|
|
token: str
|
|
amount: str
|
|
chat_id: str
|
|
@app.post('/api/gen_fp/')
|
|
def gen_fp_user(request: Request, it: Gen_fp_user):
|
|
if too_fast(request):
|
|
raise HTTPException(status_code=randint(100,999), detail=f"{choice(FAKE_TEXTS)}")
|
|
nick, token, amount, chat_id = it.nick, it.token, it.amount, it.chat_id
|
|
if check_token(nick, token):
|
|
id = user_in_db(API_TOKEN, nick=nick)
|
|
fp_id = gen_fp(API_TOKEN, id, amount)
|
|
if fp_id == 'Error':
|
|
return 'Error'
|
|
elif fp_id == 'Limit':
|
|
return 'Limit'
|
|
else:
|
|
return gen_fp_mess(token, nick, amount, chat_id, fp_id)
|
|
|
|
else:
|
|
return 'Error'
|
|
|
|
class List_fp_user(BaseModel):
|
|
nick: str
|
|
token: str
|
|
@app.post('/api/list_fp/')
|
|
def list_fp_user(request: Request, it: List_fp_user):
|
|
if too_fast(request):
|
|
raise HTTPException(status_code=randint(100,999), detail=f"{choice(FAKE_TEXTS)}")
|
|
nick, token = it.nick, it.token
|
|
if check_token(nick, token):
|
|
id = user_in_db(API_TOKEN, nick=nick)
|
|
return list_fp(API_TOKEN, id)
|
|
else:
|
|
return 'Error'
|
|
|
|
class Del_fp_user(BaseModel):
|
|
nick: str
|
|
token: str
|
|
fp_id: str
|
|
@app.post('/api/del_fp_user/')
|
|
def del_fp_user(request: Request, it: Del_fp_user):
|
|
if too_fast(request):
|
|
raise HTTPException(status_code=randint(100,999), detail=f"{choice(FAKE_TEXTS)}")
|
|
nick, token, fp_id = it.nick, it.token, it.fp_id
|
|
if check_token(nick, token):
|
|
return del_fp(API_TOKEN, fp_id)
|
|
else:
|
|
return 'Error'
|
|
|
|
if __name__ == '__main__':
|
|
import uvicorn
|
|
uvicorn.run(app, host='0.0.0.0', port=7010)
|