from fastapi import FastAPI, HTTPException from pydantic import BaseModel from datetime import datetime, timedelta from statistics import median from uuid import uuid4 # Fix 3.3 + 0.15 = 3.4499999999999997 from decimal import Decimal as d def fix_add(one, two): return float(d(str(one)) + d(str(two))) def fix_sub(one, two): return float(d(str(one)) - d(str(two))) from db import * app = FastAPI() def token_check(token): db = read() if token in db['tokens']: return True else: return False def stat_run(cdm_change): # Общий баланс, среднее значение, медиана # Низшее значение, высшее значение # Изменение баланса и time2cdm # Прирост за счёт алмазов, за счёт time2cdm # Получаем все балансы db = read() bals = [] for id in db['id']: bals.append(db['id'][id]['bal']) date = datetime.today().strftime('%Y-%m-%d') stat = read('stat.json') if date in stat: stat[date]['gbal'] += cdm_change else: stat[date] = {'gbal': sum(bals), 'time2cdm': 0} stats = {'gbal': 0, 'average': 0, 'median': 0, 'time2cdm': 0, 'gbal_delta': '0', 'time2cdm_delta': '', 'min': 0, 'max': 0, 'up_diamond': 0} date = datetime.today().strftime('%Y-%m-%d') yesterday = (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d') if yesterday in stat: gbal_y = stat[yesterday]['gbal'] time2cdm_y = stat[yesterday]['time2cdm'] else: gbal_y, time2cdm_y = 0, 0 time2cdm = stat[date]['time2cdm'] gbal = stat[date]['gbal'] # Заполняем данные stats['gbal'] = round(gbal, 3) stats['average'] = round(sum([x for x in bals if x != 0])/len([x for x in bals if x != 0]), 3) stats['median'] = round(median([x for x in bals if x != 0]), 3) stats['time2cdm'] = round(time2cdm, 3) stats['time2cdm_delta'] = round(time2cdm-time2cdm_y, 3) stats['gbal_delta'] = round(gbal-gbal_y, 3) stats['min'] = round(min([x for x in bals if x != 0]), 3) stats['max'] = round(max(bals), 3) stats['up_diamond'] = round(gbal - (gbal_y + time2cdm), 3) stat[date] = stats write(stat, 'stat.json') class User_in_db(BaseModel): token: str id: str = None tg: str = None ds: str = None mine: str = None nick: str = None @app.post('/api/user_in_db/') def user_in_db(it: User_in_db): token, id, tg, ds, mine, nick = it.token, it.id, it.tg, it.ds, it.mine, it.nick if token_check(token): db = read() try: if id and id in db['id']: return id elif tg and tg in db['tg']: return db['tg'][tg] elif ds and ds in db['ds']: return db['ds'][ds] elif mine and mine in db['mine']: return db['mine'][mine] elif nick and nick in db['nick']: return db['nick'][nick] else: return False except: return False else: return 'Error' def gen_id(): db = read() for i in range(1,100000): check = str(i) if check not in db['id']: return str(i) return 'Full?' class User_add(BaseModel): token: str id: str = None tg: str = None ds: str = None mine: str = None nick: str passwd: str @app.post('/api/user_add/') def user_add(it: User_add): token, id, tg, ds, mine, nick, passwd = it.token, it.id, it.tg, it.ds, it.mine, it.nick, it.passwd id = gen_id() if token_check(token): db = read() db['id'][id] = {'tg': tg, 'ds': ds, 'mine': mine, 'nick': nick, 'passwd': passwd , 'bal': 0.0, 'time2cdm': [0, datetime.today().strftime('%Y-%m-%d')]} db['nick'][nick] = id if tg: db['tg'][tg] = id if ds: db['ds'][ds] = id if mine: db['mine'][mine] = id write(db) return 'OK' else: return 'Error' class User_del(BaseModel): token: str id: str @app.post('/api/user_del/') def user_del(it: User_del): token, id = it.token, it.id if token_check(token): db = read() tg, ds, mine, nick = db['id'][id]['tg'], db['id'][id]['ds'], db['id'][id]['mine'], db['id'][id]['nick'] del db['nick'][nick] if tg: del db['tg'][tg] if ds: del db['ds'][ds] if mine: del db['mine'][mine] del db['id'][id] write(db) return 'OK' else: return 'Error' class Coins_add(BaseModel): token: str id: str amount: str @app.post('/api/coins_add/') def coins_add(it: Coins_add): token, id, amount = it.token, it.id, abs(float(it.amount)) if token_check(token): db = read() db['id'][id]['bal'] = fix_add(db['id'][id]['bal'], amount) write(db) stat_run(amount) return 'OK' else: return 'Error' class Coins_del(BaseModel): token: str id: str amount: str @app.post('/api/coins_del/') def coins_del(it: Coins_del): token, id, amount = it.token, it.id, abs(float(it.amount)) if token_check(token): db = read() if db['id'][id]['bal'] >= amount: db['id'][id]['bal'] = fix_sub(db['id'][id]['bal'], amount) write(db) stat_run(amount*-1) return 'OK' else: return 'Error' else: return 'Error' class Coins_transfer(BaseModel): token: str src_id: str dst_id: str amount: str @app.post('/api/coins_transfer/') def coins_transfer(it: Coins_transfer): token, src_id, dst_id, amount = it.token, it.src_id, it.dst_id, float(it.amount) if token_check(token): db = read() amount = abs(amount) # Защита от отриц. чисел src_bal = db['id'][src_id]['bal'] # Больше баланса и количество цифр после запятой <= 3 if src_bal >= amount and len(str(amount).split('.')[1]) <= 3: # and amount > 0.0001: db['id'][src_id]['bal'] = fix_sub(db['id'][src_id]['bal'], amount) db['id'][dst_id]['bal'] = fix_add(db['id'][dst_id]['bal'], amount) write(db) return 'OK' else: return 'No_money' else: return 'Error' class Update_tg(BaseModel): token: str id: str tg: str @app.post('/api/update_tg/') def update_tg(it: Update_tg): token, id, tg = it.token, it.id, it.tg if token_check(token): db = read() cur_tg = db['id'][id]['tg'] try: del db['tg'][cur_tg] except: pass if tg == 'None': db['id'][id]['tg'] = None else: db['id'][id]['tg'] = tg db['tg'][tg] = id write(db) return 'OK' else: return 'Error' class Update_ds(BaseModel): token: str id: str ds: str @app.post('/api/update_ds/') def update_ds(it: Update_ds): token, id, ds = it.token, it.id, it.ds if token_check(token): db = read() cur_ds = db['id'][id]['ds'] try: del db['ds'][cur_ds] except: pass if ds == 'None': db['id'][id]['ds'] = None else: db['id'][id]['ds'] = ds db['ds'][ds] = id write(db) return 'OK' else: return 'Error' class Update_mine(BaseModel): token: str id: str mine: str @app.post('/api/update_mine/') def update_mine(it: Update_mine): token, id, mine = it.token, it.id, it.mine if token_check(token): db = read() try: cur_mine = db['id'][id]['mine'] del db['mine'][cur_mine] except: pass if mine == 'None': db['id'][id]['mine'] = None else: db['id'][id]['mine'] = mine db['mine'][mine] = id write(db) return 'OK' else: return 'Error' class Update_nick(BaseModel): token: str id: str nick: str @app.post('/api/update_nick/') def update_nick(it: Update_nick): token, id, nick = it.token, it.id, it.nick if token_check(token): db = read() cur_nick = db['id'][id]['nick'] del db['nick'][cur_nick] db['id'][id]['nick'] = nick db['nick'][nick] = id write(db) return 'OK' else: return 'Error' class Update_passwd(BaseModel): token: str id: str passwd: str @app.post('/api/update_passwd/') def update_passwd(it: Update_passwd): token, id, passwd = it.token, it.id, it.passwd if token_check(token): db = read() db['id'][id]['passwd'] = passwd write(db) return 'OK' else: return 'Error' class Add_time(BaseModel): token: str id: str time: str @app.post('/api/add_time/') def add_time(it: Add_time): token, id, time = it.token, it.id, int(it.time) if token_check(token): stat_run(0) course = read('conf.json')['time2cdm'] amount = time*course # Пополнение баланса db = read() db['id'][id]['bal'] += amount # Статистика date = datetime.today().strftime('%Y-%m-%d') # Для пользователя t2c_date = db['id'][id]['time2cdm'][1] if t2c_date != date: db['id'][id]['time2cdm'][0] = amount db['id'][id]['time2cdm'][1] = date else: db['id'][id]['time2cdm'][0] += amount write(db) # Глобально stat = read('stat.json') stat[date]['time2cdm'] += amount write(stat, 'stat.json') return 'OK' else: return 'Error' class Check_bal(BaseModel): token: str id: str @app.post('/api/check_bal/') def check_bal(it: Check_bal): token, id = it.token, it.id if token_check(token): stat_run(0) db = read() # Если дата time2cdm прошла - обнулить баланс date = datetime.today().strftime('%Y-%m-%d') t2c_date = db['id'][id]['time2cdm'][1] if t2c_date != date: db['id'][id]['time2cdm'][0] = 0 db['id'][id]['time2cdm'][1] = date write(db) return db['id'][id]['bal'] else: return 'Error' class Get_nick(BaseModel): token: str id: str @app.post('/api/get_nick/') def get_nick(it: Get_nick): token, id = it.token, it.id if token_check(token): db = read() return db['id'][id]['nick'] else: return 'Error' class Get_tg(BaseModel): token: str id: str @app.post('/api/get_tg/') def get_tg(it: Get_tg): token, id = it.token, it.id if token_check(token): db = read() return db['id'][id]['tg'] else: return 'Error' class Get_ds(BaseModel): token: str id: str @app.post('/api/get_ds/') def get_ds(it: Get_ds): token, id = it.token, it.id if token_check(token): db = read() return db['id'][id]['ds'] else: return 'Error' class Get_mine(BaseModel): token: str id: str @app.post('/api/get_mine/') def get_mine(it: Get_mine): token, id = it.token, it.id if token_check(token): db = read() return db['id'][id]['mine'] else: return 'Error' class Get_passwd(BaseModel): token: str id: str @app.post('/api/get_passwd/') def get_passwd(it: Get_passwd): token, id = it.token, it.id if token_check(token): db = read() return db['id'][id]['passwd'] else: return 'Error' class Get_time2cdm(BaseModel): token: str id: str @app.post('/api/get_time2cdm/') def get_time(it: Get_time2cdm): token, id = it.token, it.id if token_check(token): db = read() return db['id'][id]['time2cdm'][0] else: return 'Error' class Get_stat(BaseModel): token: str @app.post('/api/get_stat/') def get_stat(it: Get_stat): token = it.token if token_check(token): stat_run(0) db = read('stat.json') date = datetime.today().strftime('%Y-%m-%d') stats = db[date] return stats else: return 'Error' class Token_gen(BaseModel): token: str id: str @app.post('/api/token_gen/') def token_gen(it: Token_gen): token, id = it.token, it.id if token_check(token): user_token = str(uuid4()) user_api = read('user_api.json') user_api['tokens'][id] = user_token write(user_api, 'user_api.json') return user_token else: return 'Error' if __name__ == '__main__': import uvicorn uvicorn.run(app, host='0.0.0.0', port=7001)