CryptoDM/api.py

567 lines
14 KiB
Python

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from datetime import datetime, timedelta
from statistics import median
from uuid import uuid4
from random import randint
from time import sleep
# Fix 3.3 + 0.15 = 3.4499999999999997
from decimal import Decimal as d
def fix_add(one, two):
return float(d(str(one)) + d(str(two)))
def fix_sub(one, two):
return float(d(str(one)) - d(str(two)))
from db import *
app = FastAPI()
def token_check(token):
db = read()
if token in db['tokens']:
return True
else:
return False
############## STATS ##################
STAT_RUN = False
def stat_run(cdm_change):
# Общий баланс, среднее значение, медиана
# Низшее значение, высшее значение
# Изменение баланса и time2cdm
# Прирост за счёт алмазов, за счёт time2cdm
# Защита от конфликтов
global STAT_RUN
while STAT_RUN:
sleep(1)
STAT_RUN = True
# Получаем все балансы
db = read()
bals = []
for id in db['id']:
bals.append(db['id'][id]['bal'])
date = datetime.today().strftime('%Y-%m-%d')
stat = read('stat.json')
if date in stat:
stat[date]['gbal'] += cdm_change
else:
stat[date] = {'gbal': sum(bals), 'time2cdm': 0}
stats = {'gbal': 0, 'average': 0, 'median': 0, 'time2cdm': 0,
'gbal_delta': '0', 'time2cdm_delta': '',
'min': 0, 'max': 0,
'up_diamond': 0}
date = datetime.today().strftime('%Y-%m-%d')
yesterday = (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d')
if yesterday in stat:
gbal_y = stat[yesterday]['gbal']
time2cdm_y = stat[yesterday]['time2cdm']
else:
gbal_y, time2cdm_y = 0, 0
time2cdm = stat[date]['time2cdm']
gbal = stat[date]['gbal']
# Заполняем данные
stats['gbal'] = round(gbal, 3)
stats['average'] = round(sum([x for x in bals if x != 0])/len([x for x in bals if x != 0]), 3)
stats['median'] = round(median([x for x in bals if x != 0]), 3)
stats['time2cdm'] = round(time2cdm, 3)
stats['time2cdm_delta'] = round(time2cdm-time2cdm_y, 3)
stats['gbal_delta'] = round(gbal-gbal_y, 3)
stats['min'] = round(min([x for x in bals if x != 0]), 3)
stats['max'] = round(max(bals), 3)
stats['up_diamond'] = round(gbal - (gbal_y + time2cdm))
stat[date] = stats
write(stat, 'stat.json')
STAT_RUN = False
#######################
class User_in_db(BaseModel):
token: str
id: str = None
tg: str = None
ds: str = None
mine: str = None
nick: str = None
@app.post('/api/user_in_db/')
def user_in_db(it: User_in_db):
token, id, tg, ds, mine, nick = it.token, it.id, it.tg, it.ds, it.mine, it.nick
if token_check(token):
db = read()
try:
if id and id in db['id']:
return id
elif tg and tg in db['tg']:
return db['tg'][tg]
elif ds and ds in db['ds']:
return db['ds'][ds]
elif mine and mine in db['mine']:
return db['mine'][mine]
elif nick and nick in db['nick']:
return db['nick'][nick]
else:
return False
except:
return False
else:
return 'Error'
def gen_id():
db = read()
for i in range(1,100000):
check = str(i)
if check not in db['id']:
return str(i)
return 'Full?'
class Add_user(BaseModel):
token: str
id: str = None
tg: str = None
ds: str = None
mine: str = None
nick: str
passwd: str
@app.post('/api/add_user/')
def add_user(it: Add_user):
token, id, tg, ds, mine, nick, passwd = it.token, it.id, it.tg, it.ds, it.mine, it.nick, it.passwd
id = gen_id()
if token_check(token):
db = read()
db['id'][id] = {'tg': tg, 'ds': ds, 'mine': mine, 'nick': nick, 'passwd': passwd
, 'bal': 0.0, 'time2cdm': [0, datetime.today().strftime('%Y-%m-%d')]}
db['nick'][nick] = id
if tg:
db['tg'][tg] = id
if ds:
db['ds'][ds] = id
if mine:
db['mine'][mine] = id
write(db)
return 'OK'
else:
return 'Error'
class Del_user(BaseModel):
token: str
id: str
@app.post('/api/del_user/')
def del_user(it: Del_user):
token, id = it.token, it.id
if token_check(token):
db = read()
tg, ds, mine, nick = db['id'][id]['tg'], db['id'][id]['ds'], db['id'][id]['mine'], db['id'][id]['nick']
del db['nick'][nick]
if tg:
del db['tg'][tg]
if ds:
del db['ds'][ds]
if mine:
del db['mine'][mine]
del db['id'][id]
write(db)
return 'OK'
else:
return 'Error'
class Add_coins(BaseModel):
token: str
id: str
amount: str
@app.post('/api/add_coins/')
def add_coins(it: Add_coins):
token, id, amount = it.token, it.id, abs(float(it.amount))
if token_check(token):
db = read()
db['id'][id]['bal'] = fix_add(db['id'][id]['bal'], amount)
write(db)
stat_run(amount)
return 'OK'
else:
return 'Error'
class Del_coins(BaseModel):
token: str
id: str
amount: str
@app.post('/api/del_coins/')
def del_coins(it: Del_coins):
token, id, amount = it.token, it.id, abs(float(it.amount))
if token_check(token):
db = read()
if db['id'][id]['bal'] >= amount:
db['id'][id]['bal'] = fix_sub(db['id'][id]['bal'], amount)
write(db)
stat_run(amount*-1)
return 'OK'
else:
return 'Error'
else:
return 'Error'
class Transfer_coins(BaseModel):
token: str
src_id: str
dst_id: str
amount: str
@app.post('/api/transfer_coins/')
def transfer_coins(it: Transfer_coins):
token, src_id, dst_id, amount = it.token, it.src_id, it.dst_id, float(it.amount)
if token_check(token):
db = read()
amount = abs(amount) # Защита от отриц. чисел
src_bal = db['id'][src_id]['bal']
# Больше баланса и количество цифр после запятой <= 3
if src_bal >= amount and len(str(amount).split('.')[1]) <= 3: # and amount > 0.0001:
db['id'][src_id]['bal'] = fix_sub(db['id'][src_id]['bal'], amount)
db['id'][dst_id]['bal'] = fix_add(db['id'][dst_id]['bal'], amount)
write(db)
return 'OK'
else:
return 'No_money'
else:
return 'Error'
class Update_tg(BaseModel):
token: str
id: str
tg: str
@app.post('/api/update_tg/')
def update_tg(it: Update_tg):
token, id, tg = it.token, it.id, it.tg
if token_check(token):
db = read()
cur_tg = db['id'][id]['tg']
try:
del db['tg'][cur_tg]
except:
pass
if tg == 'None':
db['id'][id]['tg'] = None
else:
db['id'][id]['tg'] = tg
db['tg'][tg] = id
write(db)
return 'OK'
else:
return 'Error'
class Update_ds(BaseModel):
token: str
id: str
ds: str
@app.post('/api/update_ds/')
def update_ds(it: Update_ds):
token, id, ds = it.token, it.id, it.ds
if token_check(token):
db = read()
cur_ds = db['id'][id]['ds']
try:
del db['ds'][cur_ds]
except:
pass
if ds == 'None':
db['id'][id]['ds'] = None
else:
db['id'][id]['ds'] = ds
db['ds'][ds] = id
write(db)
return 'OK'
else:
return 'Error'
class Update_mine(BaseModel):
token: str
id: str
mine: str
@app.post('/api/update_mine/')
def update_mine(it: Update_mine):
token, id, mine = it.token, it.id, it.mine
if token_check(token):
db = read()
try:
cur_mine = db['id'][id]['mine']
del db['mine'][cur_mine]
except:
pass
if mine == 'None':
db['id'][id]['mine'] = None
else:
db['id'][id]['mine'] = mine
db['mine'][mine] = id
write(db)
return 'OK'
else:
return 'Error'
class Update_nick(BaseModel):
token: str
id: str
nick: str
@app.post('/api/update_nick/')
def update_nick(it: Update_nick):
token, id, nick = it.token, it.id, it.nick
if token_check(token):
db = read()
cur_nick = db['id'][id]['nick']
del db['nick'][cur_nick]
db['id'][id]['nick'] = nick
db['nick'][nick] = id
write(db)
return 'OK'
else:
return 'Error'
class Update_passwd(BaseModel):
token: str
id: str
passwd: str
@app.post('/api/update_passwd/')
def update_passwd(it: Update_passwd):
token, id, passwd = it.token, it.id, it.passwd
if token_check(token):
db = read()
db['id'][id]['passwd'] = passwd
write(db)
return 'OK'
else:
return 'Error'
class Add_time(BaseModel):
token: str
id: str
time: str
@app.post('/api/add_time/')
def add_time(it: Add_time):
token, id, time = it.token, it.id, int(it.time)
if token_check(token):
course = read('conf.json')['time2cdm']
amount = time*course
# Пополнение баланса
db = read()
db['id'][id]['bal'] += amount
# Статистика
date = datetime.today().strftime('%Y-%m-%d')
# Для пользователя
t2c_date = db['id'][id]['time2cdm'][1]
if t2c_date != date:
db['id'][id]['time2cdm'][0] = amount
db['id'][id]['time2cdm'][1] = date
else:
db['id'][id]['time2cdm'][0] += amount
write(db)
# Глобально
stat = read('stat.json')
stat[date]['time2cdm'] += amount
write(stat, 'stat.json')
stat_run(0)
return 'OK'
else:
return 'Error'
class Check_bal(BaseModel):
token: str
id: str
@app.post('/api/check_bal/')
def check_bal(it: Check_bal):
token, id = it.token, it.id
if token_check(token):
stat_run(0)
db = read()
# Если дата time2cdm прошла - обнулить баланс
date = datetime.today().strftime('%Y-%m-%d')
t2c_date = db['id'][id]['time2cdm'][1]
if t2c_date != date:
db['id'][id]['time2cdm'][0] = 0
db['id'][id]['time2cdm'][1] = date
write(db)
return db['id'][id]['bal']
else:
return 'Error'
class Get_nick(BaseModel):
token: str
id: str
@app.post('/api/get_nick/')
def get_nick(it: Get_nick):
token, id = it.token, it.id
if token_check(token):
db = read()
return db['id'][id]['nick']
else:
return 'Error'
class Get_tg(BaseModel):
token: str
id: str
@app.post('/api/get_tg/')
def get_tg(it: Get_tg):
token, id = it.token, it.id
if token_check(token):
db = read()
return db['id'][id]['tg']
else:
return 'Error'
class Get_ds(BaseModel):
token: str
id: str
@app.post('/api/get_ds/')
def get_ds(it: Get_ds):
token, id = it.token, it.id
if token_check(token):
db = read()
return db['id'][id]['ds']
else:
return 'Error'
class Get_mine(BaseModel):
token: str
id: str
@app.post('/api/get_mine/')
def get_mine(it: Get_mine):
token, id = it.token, it.id
if token_check(token):
db = read()
return db['id'][id]['mine']
else:
return 'Error'
class Get_passwd(BaseModel):
token: str
id: str
@app.post('/api/get_passwd/')
def get_passwd(it: Get_passwd):
token, id = it.token, it.id
if token_check(token):
db = read()
return db['id'][id]['passwd']
else:
return 'Error'
class Get_time2cdm(BaseModel):
token: str
id: str
@app.post('/api/get_time2cdm/')
def get_time(it: Get_time2cdm):
token, id = it.token, it.id
if token_check(token):
db = read()
return db['id'][id]['time2cdm'][0]
else:
return 'Error'
class Get_stat(BaseModel):
token: str
date: Optional[str] = None
@app.post('/api/get_stat/')
def get_stat(it: Get_stat):
token, date = it.token, it.date
if not date:
date = datetime.today().strftime('%Y-%m-%d')
if token_check(token):
stat_run(0)
db = read('stat.json')
#date = datetime.today().strftime('%Y-%m-%d')
if date not in db:
return 'Not found'
stats = db[date]
return stats
else:
return 'Error'
############# USER API ################
class Gen_token(BaseModel):
token: str
id: str
@app.post('/api/gen_token/')
def gen_token(it: Gen_token):
token, id = it.token, it.id
if token_check(token):
user_token = str(uuid4())
user_api = read('user_api.json')
user_api['tokens'][id] = user_token
write(user_api, 'user_api.json')
return user_token
else:
return 'Error'
class List_fp(BaseModel):
token: str
id: str
@app.post('/api/list_fp/')
def list_fp(it: List_fp):
token, id = it.token, it.id
if token_check(token):
user_api = read('user_api.json')
if id not in user_api['fp']:
user_api['fp'][id] = []
write(user_api, 'user_api.json')
return user_api['fp'][id]
else:
return 'Error'
def gen_fp_id(user_api):
ok = False
while not ok:
ok = True
fp_id = str(randint(5236, 645862))
if fp_id in user_api['fp']:
ok = False
return fp_id
class Gen_fp(BaseModel):
token: str
id: str
amount: str
@app.post('/api/gen_fp/')
def gen_fp(it: Gen_fp):
token, id, amount = it.token, it.id, it.amount
if token_check(token):
user_api = read('user_api.json')
if id not in user_api['fp']:
user_api['fp'][id] = []
write(user_api, 'user_api.json')
if len(user_api['fp'][id]) >= 5:
return 'Limit'
fp_id = gen_fp_id(user_api)
user_api['fp'][fp_id] = [id, amount]
user_api['fp'][id].append(fp_id)
write(user_api, 'user_api.json')
return fp_id
else:
return 'Error'
class Del_fp(BaseModel):
token: str
fp_id: str
@app.post('/api/del_fp/')
def del_fp(it: Del_fp):
token, fp_id = it.token, it.fp_id
if token_check(token):
user_api = read('user_api.json')
id = user_api['fp'][fp_id][0]
del user_api['fp'][fp_id]
user_api['fp'][id].remove(fp_id)
write(user_api, 'user_api.json')
return 'OK'
else:
return 'Error'
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host='0.0.0.0', port=7001)