"""Token-protected HTTP API for user accounts, coin balances and statistics.

All state is loaded and stored through ``read`` / ``write``, which this module
does not define (presumably they come from the project-local ``db`` module).
Endpoints answer with plain values: ``'OK'``, ``'Error'``, ``'No_money'``,
``'Limit'``, ``'Not found'`` or the requested data.
"""

import threading
from datetime import datetime, timedelta
# Fix 3.3 + 0.15 = 3.4499999999999997
from decimal import Decimal as d
from math import isfinite
from random import randint
from statistics import median
from time import sleep
from typing import Optional
from uuid import uuid4

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# Presumably provides read() and write(), which are used throughout this file.
from db import *  # noqa: F401,F403


def fix_add(one, two):
    """Return ``one + two`` computed through ``Decimal`` to avoid float noise."""
    return float(d(str(one)) + d(str(two)))


def fix_sub(one, two):
    """Return ``one - two`` computed through ``Decimal`` to avoid float noise."""
    return float(d(str(one)) - d(str(two)))


def _parse_amount(raw):
    """Return ``abs(float(raw))``, or ``None`` if *raw* is not a finite number.

    Rejecting ``nan`` / ``inf`` keeps such values out of stored balances.
    """
    try:
        value = abs(float(raw))
    except (TypeError, ValueError):
        return None
    return value if isfinite(value) else None


def _decimal_places(value):
    """Return how many digits follow the decimal point in finite float *value*.

    Works for floats that print in exponent form (``1e-05``, ``1e+16``), where
    splitting ``str(value)`` on ``'.'`` has no fractional part to index.
    """
    exponent = d(str(value)).as_tuple().exponent
    return max(0, -exponent)


app = FastAPI()


def token_check(token):
    """Return True when *token* is listed under ``'tokens'`` in the main db."""
    db = read()
    return token in db['tokens']


############## STATS ##################
# Kept for backward compatibility: True while stat_run() is executing.
STAT_RUN = False
# Serialises stat_run() calls; replaces the former busy-wait on STAT_RUN.
_STAT_LOCK = threading.Lock()


def _refresh_stats(cdm_change, time2cdm_change):
    """Recompute today's entry in ``stat.json`` (caller holds ``_STAT_LOCK``)."""
    # Collect every balance.
    db = read()
    bals = [user['bal'] for user in db['id'].values()]
    nonzero = [x for x in bals if x != 0]

    now = datetime.today()
    date = now.strftime('%Y-%m-%d')
    yesterday = (now - timedelta(days=1)).strftime('%Y-%m-%d')

    stat = read('stat.json')
    if date in stat:
        stat[date]['gbal'] += cdm_change
    else:
        # First update of the day: the balances already include any change
        # written by the caller, so cdm_change is not added again.
        stat[date] = {'gbal': sum(bals), 'time2cdm': 0}
    stat[date]['time2cdm'] += time2cdm_change

    if yesterday in stat:
        gbal_y = stat[yesterday]['gbal']
        time2cdm_y = stat[yesterday]['time2cdm']
    else:
        gbal_y, time2cdm_y = 0, 0

    time2cdm = stat[date]['time2cdm']
    gbal = stat[date]['gbal']

    # Fill in the data. Aggregates over an empty list fall back to 0.
    stat[date] = {
        'gbal': round(gbal, 3),
        'average': round(sum(nonzero) / len(nonzero), 3) if nonzero else 0,
        'median': round(median(nonzero), 3) if nonzero else 0,
        'time2cdm': round(time2cdm, 3),
        'gbal_delta': round(gbal - gbal_y, 3),
        'time2cdm_delta': round(time2cdm - time2cdm_y, 3),
        'min': round(min(nonzero), 3) if nonzero else 0,
        'max': round(max(bals), 3) if bals else 0,
        'up_diamond': round(gbal - (gbal_y + time2cdm)),
    }
    write(stat, 'stat.json')


def stat_run(cdm_change, time2cdm_change=0):
    """Update today's statistics entry in ``stat.json``.

    Stored values: total balance, mean and median of non-zero balances,
    lowest non-zero and highest balance, day-over-day change of the total
    balance and of time2cdm, and growth attributed to diamonds (total growth
    minus time2cdm).

    Args:
        cdm_change: Amount by which the total balance changed.
        time2cdm_change: Amount to add to today's time2cdm counter.
    """
    global STAT_RUN
    # Protection against concurrent updates. The finally clause guarantees the
    # flag is cleared even if the update raises.
    with _STAT_LOCK:
        STAT_RUN = True
        try:
            _refresh_stats(cdm_change, time2cdm_change)
        finally:
            STAT_RUN = False


#######################
class User_in_db(BaseModel):
    token: str
    id: Optional[str] = None
    tg: Optional[str] = None
    ds: Optional[str] = None
    mine: Optional[str] = None
    nick: Optional[str] = None


@app.post('/api/user_in_db/')
def user_in_db(it: User_in_db):
    """Return the user id matching the first known identifier, else False.

    Identifiers are tried in the order id, tg, ds, mine, nick. Returns
    ``'Error'`` for an invalid token.
    """
    if not token_check(it.token):
        return 'Error'
    db = read()
    try:
        if it.id and it.id in db['id']:
            return it.id
        lookups = (('tg', it.tg), ('ds', it.ds), ('mine', it.mine), ('nick', it.nick))
        for section, key in lookups:
            if key and key in db[section]:
                return db[section][key]
    except (KeyError, TypeError):
        # A missing or malformed index section is treated as "not found".
        return False
    return False


def gen_id():
    """Return the lowest unused numeric id as a string, or ``'Full?'``."""
    db = read()
    for i in range(1, 100000):
        candidate = str(i)
        if candidate not in db['id']:
            return candidate
    return 'Full?'


class Add_user(BaseModel):
    token: str
    id: Optional[str] = None
    tg: Optional[str] = None
    ds: Optional[str] = None
    mine: Optional[str] = None
    nick: str
    passwd: str


@app.post('/api/add_user/')
def add_user(it: Add_user):
    """Create a user with a generated id and register its lookup indexes.

    The ``id`` field of the request is ignored; the id always comes from
    ``gen_id``. Returns ``'OK'``, or ``'Error'`` for an invalid token or when
    no free id is left.
    """
    if not token_check(it.token):
        return 'Error'
    user_id = gen_id()
    if user_id == 'Full?':
        # Do not create a user under the sentinel value.
        return 'Error'
    db = read()
    db['id'][user_id] = {
        'tg': it.tg,
        'ds': it.ds,
        'mine': it.mine,
        'nick': it.nick,
        'passwd': it.passwd,
        'bal': 0.0,
        'time2cdm': [0, datetime.today().strftime('%Y-%m-%d')],
    }
    db['nick'][it.nick] = user_id
    if it.tg:
        db['tg'][it.tg] = user_id
    if it.ds:
        db['ds'][it.ds] = user_id
    if it.mine:
        db['mine'][it.mine] = user_id
    write(db)
    return 'OK'


class Del_user(BaseModel):
    token: str
    id: str


@app.post('/api/del_user/')
def del_user(it: Del_user):
    """Delete a user record together with its lookup index entries."""
    if not token_check(it.token):
        return 'Error'
    db = read()
    user = db['id'][it.id]
    # pop() instead of del: a stale index must not block the deletion.
    db['nick'].pop(user['nick'], None)
    for section in ('tg', 'ds', 'mine'):
        if user[section]:
            db[section].pop(user[section], None)
    del db['id'][it.id]
    write(db)
    return 'OK'


class Add_coins(BaseModel):
    token: str
    id: str
    amount: str


@app.post('/api/add_coins/')
def add_coins(it: Add_coins):
    """Add ``abs(amount)`` to the user's balance and update the statistics."""
    amount = _parse_amount(it.amount)
    if amount is None or not token_check(it.token):
        return 'Error'
    db = read()
    db['id'][it.id]['bal'] = fix_add(db['id'][it.id]['bal'], amount)
    write(db)
    stat_run(amount)
    return 'OK'


class Del_coins(BaseModel):
    token: str
    id: str
    amount: str


@app.post('/api/del_coins/')
def del_coins(it: Del_coins):
    """Subtract ``abs(amount)`` from the balance if it is large enough.

    Returns ``'OK'``, or ``'Error'`` for an invalid token, an invalid amount
    or an insufficient balance.
    """
    amount = _parse_amount(it.amount)
    if amount is None or not token_check(it.token):
        return 'Error'
    db = read()
    if db['id'][it.id]['bal'] < amount:
        return 'Error'
    db['id'][it.id]['bal'] = fix_sub(db['id'][it.id]['bal'], amount)
    write(db)
    stat_run(amount * -1)
    return 'OK'


class Transfer_coins(BaseModel):
    token: str
    src_id: str
    dst_id: str
    amount: str


@app.post('/api/transfer_coins/')
def transfer_coins(it: Transfer_coins):
    """Move ``abs(amount)`` from ``src_id`` to ``dst_id``.

    Returns ``'OK'``; ``'No_money'`` when the source balance is too small or
    the amount has more than 3 decimal places; ``'Error'`` for an invalid
    token or amount.
    """
    if not token_check(it.token):
        return 'Error'
    # abs() inside the parser protects against negative numbers.
    amount = _parse_amount(it.amount)
    if amount is None:
        return 'Error'
    db = read()
    src_bal = db['id'][it.src_id]['bal']
    # Balance must cover the amount and decimal digits must be <= 3.
    if src_bal >= amount and _decimal_places(amount) <= 3:
        db['id'][it.src_id]['bal'] = fix_sub(db['id'][it.src_id]['bal'], amount)
        db['id'][it.dst_id]['bal'] = fix_add(db['id'][it.dst_id]['bal'], amount)
        write(db)
        return 'OK'
    return 'No_money'


def _update_link(section, user_id, value):
    """Replace the user's *section* ('tg', 'ds' or 'mine') link with *value*.

    The literal string ``'None'`` clears the link.
    """
    db = read()
    current = db['id'][user_id].get(section)
    # Drop the old index entry if there is one; its absence is not an error.
    db[section].pop(current, None)
    if value == 'None':
        db['id'][user_id][section] = None
    else:
        db['id'][user_id][section] = value
        db[section][value] = user_id
    write(db)


class Update_tg(BaseModel):
    token: str
    id: str
    tg: str


@app.post('/api/update_tg/')
def update_tg(it: Update_tg):
    """Set the user's ``tg`` link; ``'None'`` removes it."""
    if not token_check(it.token):
        return 'Error'
    _update_link('tg', it.id, it.tg)
    return 'OK'


class Update_ds(BaseModel):
    token: str
    id: str
    ds: str


@app.post('/api/update_ds/')
def update_ds(it: Update_ds):
    """Set the user's ``ds`` link; ``'None'`` removes it."""
    if not token_check(it.token):
        return 'Error'
    _update_link('ds', it.id, it.ds)
    return 'OK'


class Update_mine(BaseModel):
    token: str
    id: str
    mine: str


@app.post('/api/update_mine/')
def update_mine(it: Update_mine):
    """Set the user's ``mine`` link; ``'None'`` removes it."""
    if not token_check(it.token):
        return 'Error'
    _update_link('mine', it.id, it.mine)
    return 'OK'


class Update_nick(BaseModel):
    token: str
    id: str
    nick: str


@app.post('/api/update_nick/')
def update_nick(it: Update_nick):
    """Change the user's nick and move the nick index entry."""
    if not token_check(it.token):
        return 'Error'
    db = read()
    db['nick'].pop(db['id'][it.id]['nick'], None)
    db['id'][it.id]['nick'] = it.nick
    db['nick'][it.nick] = it.id
    write(db)
    return 'OK'


class Update_passwd(BaseModel):
    token: str
    id: str
    passwd: str


@app.post('/api/update_passwd/')
def update_passwd(it: Update_passwd):
    """Store a new ``passwd`` value for the user."""
    if not token_check(it.token):
        return 'Error'
    db = read()
    db['id'][it.id]['passwd'] = it.passwd
    write(db)
    return 'OK'


class Add_time(BaseModel):
    token: str
    id: str
    time: str


@app.post('/api/add_time/')
def add_time(it: Add_time):
    """Convert *time* to coins at the ``time2cdm`` rate from ``conf.json``.

    Credits the user's balance, updates the user's daily time2cdm counter and
    records the amount in the global statistics.
    """
    time_value = int(it.time)
    if not token_check(it.token):
        return 'Error'
    course = read('conf.json')['time2cdm']
    amount = time_value * course

    # Top up the balance (Decimal-based, like the other balance updates).
    db = read()
    user = db['id'][it.id]
    user['bal'] = fix_add(user['bal'], amount)

    # Statistics, per user: the counter restarts when the stored date is old.
    date = datetime.today().strftime('%Y-%m-%d')
    counter = user['time2cdm']
    if counter[1] != date:
        counter[0] = amount
        counter[1] = date
    else:
        counter[0] += amount
    write(db)

    # Statistics, global: stat_run creates today's entry when it is missing
    # and keeps the total balance in step with the credited amount.
    stat_run(amount, time2cdm_change=amount)
    return 'OK'


class Check_bal(BaseModel):
    token: str
    id: str


@app.post('/api/check_bal/')
def check_bal(it: Check_bal):
    """Return the user's balance, resetting an outdated time2cdm counter."""
    if not token_check(it.token):
        return 'Error'
    stat_run(0)
    db = read()
    # If the time2cdm date has passed, reset the daily counter.
    date = datetime.today().strftime('%Y-%m-%d')
    counter = db['id'][it.id]['time2cdm']
    if counter[1] != date:
        counter[0] = 0
        counter[1] = date
        write(db)
    return db['id'][it.id]['bal']


class Get_nick(BaseModel):
    token: str
    id: str


@app.post('/api/get_nick/')
def get_nick(it: Get_nick):
    """Return the user's ``nick``."""
    if not token_check(it.token):
        return 'Error'
    return read()['id'][it.id]['nick']


class Get_tg(BaseModel):
    token: str
    id: str


@app.post('/api/get_tg/')
def get_tg(it: Get_tg):
    """Return the user's ``tg`` link."""
    if not token_check(it.token):
        return 'Error'
    return read()['id'][it.id]['tg']


class Get_ds(BaseModel):
    token: str
    id: str


@app.post('/api/get_ds/')
def get_ds(it: Get_ds):
    """Return the user's ``ds`` link."""
    if not token_check(it.token):
        return 'Error'
    return read()['id'][it.id]['ds']


class Get_mine(BaseModel):
    token: str
    id: str


@app.post('/api/get_mine/')
def get_mine(it: Get_mine):
    """Return the user's ``mine`` link."""
    if not token_check(it.token):
        return 'Error'
    return read()['id'][it.id]['mine']


class Get_passwd(BaseModel):
    token: str
    id: str


@app.post('/api/get_passwd/')
def get_passwd(it: Get_passwd):
    """Return the user's stored ``passwd`` value."""
    if not token_check(it.token):
        return 'Error'
    return read()['id'][it.id]['passwd']


class Get_time2cdm(BaseModel):
    token: str
    id: str


@app.post('/api/get_time2cdm/')
def get_time(it: Get_time2cdm):
    """Return the user's time2cdm counter value."""
    if not token_check(it.token):
        return 'Error'
    return read()['id'][it.id]['time2cdm'][0]


class Get_stat(BaseModel):
    token: str
    date: Optional[str] = None


@app.post('/api/get_stat/')
def get_stat(it: Get_stat):
    """Return the statistics entry for *date* (default: today).

    Returns ``'Not found'`` when ``stat.json`` has no entry for that date.
    """
    date = it.date or datetime.today().strftime('%Y-%m-%d')
    if not token_check(it.token):
        return 'Error'
    stat_run(0)
    stat = read('stat.json')
    if date not in stat:
        return 'Not found'
    return stat[date]


############# USER API ################
class Gen_token(BaseModel):
    token: str
    id: str


@app.post('/api/gen_token/')
def gen_token(it: Gen_token):
    """Generate, store and return a new UUID4 token for the user."""
    if not token_check(it.token):
        return 'Error'
    user_token = str(uuid4())
    user_api = read('user_api.json')
    user_api['tokens'][it.id] = user_token
    write(user_api, 'user_api.json')
    return user_token


class List_fp(BaseModel):
    token: str
    id: str


@app.post('/api/list_fp/')
def list_fp(it: List_fp):
    """Return the list stored under the user's id in ``user_api['fp']``.

    An empty list is created and saved when the user has none yet.
    """
    if not token_check(it.token):
        return 'Error'
    user_api = read('user_api.json')
    if it.id not in user_api['fp']:
        user_api['fp'][it.id] = []
        write(user_api, 'user_api.json')
    return user_api['fp'][it.id]


def gen_fp_id(user_api):
    """Return a random numeric string that is not yet a key of ``user_api['fp']``.

    NOTE(review): ``user_api['fp']`` is keyed by both user ids and these
    generated ids, so a generated id can still collide with a user id that has
    no entry yet - confirm whether the two should live in separate sections.
    """
    while True:
        fp_id = str(randint(5236, 645862))
        if fp_id not in user_api['fp']:
            return fp_id


class Gen_fp(BaseModel):
    token: str
    id: str
    amount: str


@app.post('/api/gen_fp/')
def gen_fp(it: Gen_fp):
    """Create an ``[id, amount]`` entry under a new generated id and return it.

    Returns ``'Limit'`` when the user already has 5 or more entries and
    ``'Error'`` for an invalid token.
    """
    if not token_check(it.token):
        return 'Error'
    user_api = read('user_api.json')
    owned = user_api['fp'].setdefault(it.id, [])
    if len(owned) >= 5:
        return 'Limit'
    fp_id = gen_fp_id(user_api)
    user_api['fp'][fp_id] = [it.id, it.amount]
    owned.append(fp_id)
    # Single write: it also persists a list newly created by setdefault().
    write(user_api, 'user_api.json')
    return fp_id


class Del_fp(BaseModel):
    token: str
    fp_id: str


@app.post('/api/del_fp/')
def del_fp(it: Del_fp):
    """Delete the entry *fp_id* and remove it from its owner's list."""
    if not token_check(it.token):
        return 'Error'
    user_api = read('user_api.json')
    owner_id = user_api['fp'][it.fp_id][0]
    del user_api['fp'][it.fp_id]
    user_api['fp'][owner_id].remove(it.fp_id)
    write(user_api, 'user_api.json')
    return 'OK'


if __name__ == '__main__':
    import uvicorn

    uvicorn.run(app, host='0.0.0.0', port=7001)