from fastapi import FastAPI, HTTPException from pydantic import BaseModel from typing import Optional from datetime import datetime, timedelta from statistics import median from uuid import uuid4 from random import randint # Fix 3.3 + 0.15 = 3.4499999999999997 from decimal import Decimal as d def fix_add(one, two): return float(d(str(one)) + d(str(two))) def fix_sub(one, two): return float(d(str(one)) - d(str(two))) from db import * app = FastAPI() def token_check(token): db = read() if token in db['tokens']: return True else: return False def stat_run(cdm_change): # Общий баланс, среднее значение, медиана # Низшее значение, высшее значение # Изменение баланса и time2cdm # Прирост за счёт алмазов, за счёт time2cdm # Получаем все балансы db = read() bals = [] for id in db['id']: bals.append(db['id'][id]['bal']) date = datetime.today().strftime('%Y-%m-%d') stat = read('stat.json') if date in stat: stat[date]['gbal'] += cdm_change else: stat[date] = {'gbal': sum(bals), 'time2cdm': 0} stats = {'gbal': 0, 'average': 0, 'median': 0, 'time2cdm': 0, 'gbal_delta': '0', 'time2cdm_delta': '', 'min': 0, 'max': 0, 'up_diamond': 0} date = datetime.today().strftime('%Y-%m-%d') yesterday = (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d') if yesterday in stat: gbal_y = stat[yesterday]['gbal'] time2cdm_y = stat[yesterday]['time2cdm'] else: gbal_y, time2cdm_y = 0, 0 time2cdm = stat[date]['time2cdm'] gbal = stat[date]['gbal'] # Заполняем данные stats['gbal'] = round(gbal, 3) stats['average'] = round(sum([x for x in bals if x != 0])/len([x for x in bals if x != 0]), 3) stats['median'] = round(median([x for x in bals if x != 0]), 3) stats['time2cdm'] = round(time2cdm, 3) stats['time2cdm_delta'] = round(time2cdm-time2cdm_y, 3) stats['gbal_delta'] = round(gbal-gbal_y, 3) stats['min'] = round(min([x for x in bals if x != 0]), 3) stats['max'] = round(max(bals), 3) stats['up_diamond'] = round(gbal - (gbal_y + time2cdm), 3) stat[date] = stats write(stat, 'stat.json') class User_in_db(BaseModel): token: str id: str = None tg: str = None ds: str = None mine: str = None nick: str = None @app.post('/api/user_in_db/') def user_in_db(it: User_in_db): token, id, tg, ds, mine, nick = it.token, it.id, it.tg, it.ds, it.mine, it.nick if token_check(token): db = read() try: if id and id in db['id']: return id elif tg and tg in db['tg']: return db['tg'][tg] elif ds and ds in db['ds']: return db['ds'][ds] elif mine and mine in db['mine']: return db['mine'][mine] elif nick and nick in db['nick']: return db['nick'][nick] else: return False except: return False else: return 'Error' def gen_id(): db = read() for i in range(1,100000): check = str(i) if check not in db['id']: return str(i) return 'Full?' class Add_user(BaseModel): token: str id: str = None tg: str = None ds: str = None mine: str = None nick: str passwd: str @app.post('/api/add_user/') def add_user(it: Add_user): token, id, tg, ds, mine, nick, passwd = it.token, it.id, it.tg, it.ds, it.mine, it.nick, it.passwd id = gen_id() if token_check(token): db = read() db['id'][id] = {'tg': tg, 'ds': ds, 'mine': mine, 'nick': nick, 'passwd': passwd , 'bal': 0.0, 'time2cdm': [0, datetime.today().strftime('%Y-%m-%d')]} db['nick'][nick] = id if tg: db['tg'][tg] = id if ds: db['ds'][ds] = id if mine: db['mine'][mine] = id write(db) return 'OK' else: return 'Error' class Del_user(BaseModel): token: str id: str @app.post('/api/del_user/') def del_user(it: Del_user): token, id = it.token, it.id if token_check(token): db = read() tg, ds, mine, nick = db['id'][id]['tg'], db['id'][id]['ds'], db['id'][id]['mine'], db['id'][id]['nick'] del db['nick'][nick] if tg: del db['tg'][tg] if ds: del db['ds'][ds] if mine: del db['mine'][mine] del db['id'][id] write(db) return 'OK' else: return 'Error' class Add_coins(BaseModel): token: str id: str amount: str @app.post('/api/add_coins/') def add_coins(it: Add_coins): token, id, amount = it.token, it.id, abs(float(it.amount)) if token_check(token): db = read() db['id'][id]['bal'] = fix_add(db['id'][id]['bal'], amount) write(db) stat_run(amount) return 'OK' else: return 'Error' class Del_coins(BaseModel): token: str id: str amount: str @app.post('/api/del_coins/') def del_coins(it: Del_coins): token, id, amount = it.token, it.id, abs(float(it.amount)) if token_check(token): db = read() if db['id'][id]['bal'] >= amount: db['id'][id]['bal'] = fix_sub(db['id'][id]['bal'], amount) write(db) stat_run(amount*-1) return 'OK' else: return 'Error' else: return 'Error' class Transfer_coins(BaseModel): token: str src_id: str dst_id: str amount: str @app.post('/api/transfer_coins/') def transfer_coins(it: Transfer_coins): token, src_id, dst_id, amount = it.token, it.src_id, it.dst_id, float(it.amount) if token_check(token): db = read() amount = abs(amount) # Защита от отриц. чисел src_bal = db['id'][src_id]['bal'] # Больше баланса и количество цифр после запятой <= 3 if src_bal >= amount and len(str(amount).split('.')[1]) <= 3: # and amount > 0.0001: db['id'][src_id]['bal'] = fix_sub(db['id'][src_id]['bal'], amount) db['id'][dst_id]['bal'] = fix_add(db['id'][dst_id]['bal'], amount) write(db) return 'OK' else: return 'No_money' else: return 'Error' class Update_tg(BaseModel): token: str id: str tg: str @app.post('/api/update_tg/') def update_tg(it: Update_tg): token, id, tg = it.token, it.id, it.tg if token_check(token): db = read() cur_tg = db['id'][id]['tg'] try: del db['tg'][cur_tg] except: pass if tg == 'None': db['id'][id]['tg'] = None else: db['id'][id]['tg'] = tg db['tg'][tg] = id write(db) return 'OK' else: return 'Error' class Update_ds(BaseModel): token: str id: str ds: str @app.post('/api/update_ds/') def update_ds(it: Update_ds): token, id, ds = it.token, it.id, it.ds if token_check(token): db = read() cur_ds = db['id'][id]['ds'] try: del db['ds'][cur_ds] except: pass if ds == 'None': db['id'][id]['ds'] = None else: db['id'][id]['ds'] = ds db['ds'][ds] = id write(db) return 'OK' else: return 'Error' class Update_mine(BaseModel): token: str id: str mine: str @app.post('/api/update_mine/') def update_mine(it: Update_mine): token, id, mine = it.token, it.id, it.mine if token_check(token): db = read() try: cur_mine = db['id'][id]['mine'] del db['mine'][cur_mine] except: pass if mine == 'None': db['id'][id]['mine'] = None else: db['id'][id]['mine'] = mine db['mine'][mine] = id write(db) return 'OK' else: return 'Error' class Update_nick(BaseModel): token: str id: str nick: str @app.post('/api/update_nick/') def update_nick(it: Update_nick): token, id, nick = it.token, it.id, it.nick if token_check(token): db = read() cur_nick = db['id'][id]['nick'] del db['nick'][cur_nick] db['id'][id]['nick'] = nick db['nick'][nick] = id write(db) return 'OK' else: return 'Error' class Update_passwd(BaseModel): token: str id: str passwd: str @app.post('/api/update_passwd/') def update_passwd(it: Update_passwd): token, id, passwd = it.token, it.id, it.passwd if token_check(token): db = read() db['id'][id]['passwd'] = passwd write(db) return 'OK' else: return 'Error' class Add_time(BaseModel): token: str id: str time: str @app.post('/api/add_time/') def add_time(it: Add_time): token, id, time = it.token, it.id, int(it.time) if token_check(token): stat_run(0) course = read('conf.json')['time2cdm'] amount = time*course # Пополнение баланса db = read() db['id'][id]['bal'] += amount # Статистика date = datetime.today().strftime('%Y-%m-%d') # Для пользователя t2c_date = db['id'][id]['time2cdm'][1] if t2c_date != date: db['id'][id]['time2cdm'][0] = amount db['id'][id]['time2cdm'][1] = date else: db['id'][id]['time2cdm'][0] += amount write(db) # Глобально stat = read('stat.json') stat[date]['time2cdm'] += amount write(stat, 'stat.json') return 'OK' else: return 'Error' class Check_bal(BaseModel): token: str id: str @app.post('/api/check_bal/') def check_bal(it: Check_bal): token, id = it.token, it.id if token_check(token): stat_run(0) db = read() # Если дата time2cdm прошла - обнулить баланс date = datetime.today().strftime('%Y-%m-%d') t2c_date = db['id'][id]['time2cdm'][1] if t2c_date != date: db['id'][id]['time2cdm'][0] = 0 db['id'][id]['time2cdm'][1] = date write(db) return db['id'][id]['bal'] else: return 'Error' class Get_nick(BaseModel): token: str id: str @app.post('/api/get_nick/') def get_nick(it: Get_nick): token, id = it.token, it.id if token_check(token): db = read() return db['id'][id]['nick'] else: return 'Error' class Get_tg(BaseModel): token: str id: str @app.post('/api/get_tg/') def get_tg(it: Get_tg): token, id = it.token, it.id if token_check(token): db = read() return db['id'][id]['tg'] else: return 'Error' class Get_ds(BaseModel): token: str id: str @app.post('/api/get_ds/') def get_ds(it: Get_ds): token, id = it.token, it.id if token_check(token): db = read() return db['id'][id]['ds'] else: return 'Error' class Get_mine(BaseModel): token: str id: str @app.post('/api/get_mine/') def get_mine(it: Get_mine): token, id = it.token, it.id if token_check(token): db = read() return db['id'][id]['mine'] else: return 'Error' class Get_passwd(BaseModel): token: str id: str @app.post('/api/get_passwd/') def get_passwd(it: Get_passwd): token, id = it.token, it.id if token_check(token): db = read() return db['id'][id]['passwd'] else: return 'Error' class Get_time2cdm(BaseModel): token: str id: str @app.post('/api/get_time2cdm/') def get_time(it: Get_time2cdm): token, id = it.token, it.id if token_check(token): db = read() return db['id'][id]['time2cdm'][0] else: return 'Error' class Get_stat(BaseModel): token: str date: Optional[str] = datetime.today().strftime('%Y-%m-%d') @app.post('/api/get_stat/') def get_stat(it: Get_stat): token, date = it.token, it.date if token_check(token): stat_run(0) db = read('stat.json') #date = datetime.today().strftime('%Y-%m-%d') if date not in db: return 'Not found' stats = db[date] return stats else: return 'Error' ############# USER API ################ class Gen_token(BaseModel): token: str id: str @app.post('/api/gen_token/') def gen_token(it: Gen_token): token, id = it.token, it.id if token_check(token): user_token = str(uuid4()) user_api = read('user_api.json') user_api['tokens'][id] = user_token write(user_api, 'user_api.json') return user_token else: return 'Error' class List_fp(BaseModel): token: str id: str @app.post('/api/list_fp/') def list_fp(it: List_fp): token, id = it.token, it.id if token_check(token): user_api = read('user_api.json') if id not in user_api['fp']: user_api['fp'][id] = [] write(user_api, 'user_api.json') return user_api['fp'][id] else: return 'Error' def gen_fp_id(user_api): ok = False while not ok: ok = True fp_id = str(randint(5236, 645862)) if fp_id in user_api['fp']: ok = False return fp_id class Gen_fp(BaseModel): token: str id: str amount: str @app.post('/api/gen_fp/') def gen_fp(it: Gen_fp): token, id, amount = it.token, it.id, it.amount if token_check(token): user_api = read('user_api.json') if id not in user_api['fp']: user_api['fp'][id] = [] write(user_api, 'user_api.json') if len(user_api['fp'][id]) >= 5: return 'Limit' fp_id = gen_fp_id(user_api) user_api['fp'][fp_id] = [id, amount] user_api['fp'][id].append(fp_id) write(user_api, 'user_api.json') return fp_id else: return 'Error' class Del_fp(BaseModel): token: str fp_id: str @app.post('/api/del_fp/') def del_fp(it: Del_fp): token, fp_id = it.token, it.fp_id if token_check(token): user_api = read('user_api.json') id = user_api['fp'][fp_id][0] del user_api['fp'][fp_id] user_api['fp'][id].remove(fp_id) write(user_api, 'user_api.json') return 'OK' else: return 'Error' if __name__ == '__main__': import uvicorn uvicorn.run(app, host='0.0.0.0', port=7001)