# pxl_oboard/server.py
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs
from io import BytesIO
from PIL import Image
import numpy as np
import time
# Easy debug
from icecream import ic
ic.disable() # Turn off
# Limit for requests: monotonic timestamp that RequestHandler.do_POST compares
# against, and overwrites, every time it accepts a POST.
# `cur_time` is kept as a module-level name alongside LTIME, as before.
LTIME = cur_time = time.monotonic()
# Key by value
def kbv(dict_, value):
    """Return the first key of *dict_* whose value equals *value*.

    Keys are examined in the mapping's iteration order. ``None`` is returned
    when no value matches.
    """
    matching_keys = (key for key in dict_ if dict_[key] == value)
    return next(matching_keys, None)
# Work with list-like objects
from listwork import *
# Work with tokens
import json
global tokens
def tokens_load():
    """Load and return the parsed contents of ``tokens.json``.

    The path is relative to the current working directory. The file is read
    as UTF-8 so the result does not depend on the platform's locale encoding.

    Raises:
        FileNotFoundError: if ``tokens.json`` does not exist.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    with open('tokens.json', encoding='utf-8') as token_file:
        return json.load(token_file)
# (Re)generate tokens
def tokens_regen():
    """Regenerate the admin token and ten premium tokens in ``tokens.json``.

    Every token is the SHA-256 hex digest of a fresh random nonce concatenated
    with the file's ``secret`` entry. All other entries of the file are
    written back unchanged.

    Raises:
        KeyError: if ``tokens.json`` has no ``secret`` entry.
    """
    import hashlib
    import secrets

    tokens = tokens_load()
    secret = tokens["secret"]

    def new_token():
        # The nonce comes from the OS CSPRNG. The previous 4-digit
        # random.randint nonce allowed only 4101 distinct tokens per secret,
        # so tokens could collide with each other and were cheap to enumerate.
        nonce = secrets.token_hex(16)
        return hashlib.sha256((nonce + secret).encode()).hexdigest()

    tokens["admin"] = new_token()
    tokens["premium"] = [new_token() for _ in range(10)]

    with open("tokens.json", "w", encoding="utf-8") as outfile:
        outfile.write(json.dumps(tokens, indent=2))
# Uncomment to regen
#tokens_regen()
class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler for a pixel board persisted in ``matrix.npy``.

    GET returns the board as a PNG image, or the packed colours of the
    requested pixels when the request carries a ``get_color`` parameter.
    POST paints the pixels described by the ``main`` form field and then
    redirects to ``/``.
    """

    # Shape (first axis, second axis) of the board created when no
    # matrix.npy exists yet; a third axis of 3 colour channels is appended.
    MATRIX_SIZE = (720, 1280)
    # Two POSTs closer together than this many seconds: the second gets 429.
    MIN_POST_INTERVAL = 0.0001
    # Longest packed ``main`` payload (in characters) accepted without a token.
    FREE_PAYLOAD_LIMIT = 6700
    # Premium tokens may send payloads strictly shorter than this.
    PREMIUM_PAYLOAD_LIMIT = 13400

    def do_GET(self):
        """Serve pixel colours (``get_color`` present) or the board as PNG.

        Responds 400 when the ``get_color`` payload cannot be resolved to
        pixels of the board.
        """
        params = parse_qs(self.path[1:])
        ic(params)
        if 'get_color' in params:
            packed = params['get_color'][0]
            ic(len(packed))
            matrix = self.get_matrix()
            try:
                # Each unpacked entry is indexed as (first axis, second axis).
                colors = [matrix[point[0]][point[1]] for point in unpack(packed)]
            except (IndexError, TypeError, ValueError):
                self.send_error(400, 'Bad Request')
                return
            colors = pack(colors)
            ic(colors)
            self.send_response(200)
            self.send_header('Content-type', 'text/plain')
            self.end_headers()
            self.wfile.write(f'{colors}'.encode())
        else:
            # Load the board before committing to a 200 status so that a
            # load failure is not reported as a successful, empty image.
            matrix = np.flip(self.get_matrix(), axis=0)  # Fix flip on server side
            self.send_response(200)
            self.send_header('Content-type', 'image/png')
            self.end_headers()
            self.send_image(matrix)

    def do_POST(self):
        """Paint the pixels described by the ``main`` form field.

        Responses:
            302 to ``/`` after the board has been updated and saved.
            400 when the body, ``main`` field or pixel data is malformed.
            403 when the payload exceeds the free limit without a valid token.
            429 when the request arrives too soon after the previous POST.
        """
        global LTIME
        now = time.monotonic()
        if now - LTIME <= self.MIN_POST_INTERVAL:
            # send_error() already emits the complete 429 response.
            self.send_error(429, 'Too Many Requests')
            return
        LTIME = now

        try:
            content_length = int(self.headers.get('Content-Length', 0))
            if content_length < 0:
                raise ValueError('negative Content-Length')
            fields = parse_qs(self.rfile.read(content_length).decode('utf-8'))
        except (TypeError, ValueError):
            # Covers a non-numeric length and a body that is not UTF-8.
            self.send_error(400, 'Bad Request')
            return
        ic(fields)

        if 'main' not in fields:
            self.send_error(400, 'Bad Request')
            return
        params = fields['main'][0]
        # The token is only needed for large payloads, so it may be absent.
        token = fields.get('token', [''])[0]

        # Set limit pixels for 1 response
        ic(len(params))
        if len(params) > self.FREE_PAYLOAD_LIMIT:
            if not self._upload_allowed(len(params), token, tokens_load()):
                self.send_error(403, 'Forbidden')
                return

        matrix = self.get_matrix()
        try:
            for entry in unpack(params):
                # The first two values of an entry are swapped before
                # indexing: Fix (y, x) -> (x, y) on server side.
                # The remaining three values are the colour channels.
                matrix[entry[1], entry[0]] = [entry[2], entry[3], entry[4]]
        except (IndexError, TypeError, ValueError, OverflowError):
            # Nothing has been saved yet, so the stored board is untouched.
            self.send_error(400, 'Bad Request')
            return
        self.save_matrix(matrix)

        self.send_response(302)
        self.send_header('Location', '/')
        self.end_headers()

    @classmethod
    def _upload_allowed(cls, size, token, tokens):
        """Return True when a payload of *size* characters may be painted.

        Payloads up to FREE_PAYLOAD_LIMIT need no token. Larger payloads need
        the admin token (any size) or a premium token (sizes strictly below
        PREMIUM_PAYLOAD_LIMIT). Whole tokens are compared, in constant time.
        """
        import hmac

        if size <= cls.FREE_PAYLOAD_LIMIT:
            return True
        if not token:
            return False
        supplied = token.encode('utf-8')

        def matches(expected):
            # Compare as bytes: compare_digest rejects non-ASCII str input.
            return isinstance(expected, str) and hmac.compare_digest(
                supplied, expected.encode('utf-8')
            )

        if matches(tokens.get('admin')):
            return True
        if size < cls.PREMIUM_PAYLOAD_LIMIT:
            return any(matches(candidate) for candidate in tokens.get('premium', []))
        return False

    def get_matrix(self):
        """Return the board array, creating and saving a white one if absent."""
        try:
            with open('matrix.npy', 'rb') as f:
                matrix = np.load(f)
        except FileNotFoundError:
            matrix = np.full(shape=(*self.MATRIX_SIZE, 3), fill_value=255, dtype=np.uint8)
            self.save_matrix(matrix)
        return matrix

    def save_matrix(self, matrix):
        """Write *matrix* to ``matrix.npy`` in the current working directory."""
        with open('matrix.npy', 'wb') as f:
            np.save(f, matrix)

    def send_image(self, matrix):
        """Encode *matrix* as PNG and write it to the response body."""
        image = Image.fromarray(matrix)
        buffer = BytesIO()
        image.save(buffer, format='PNG')
        self.wfile.write(buffer.getvalue())
def run():
    """Serve RequestHandler on 127.0.0.1:3333 until the server stops.

    The server is used as a context manager so that its listening socket is
    closed when serve_forever() exits, including when it exits by exception.
    """
    with HTTPServer(('127.0.0.1', 3333), RequestHandler) as server:
        server.serve_forever()
# Keep the server alive: start it again whenever run() stops.
while True:
    try:
        run()
    except KeyboardInterrupt:
        # Ctrl+C is the intended way to stop the server.
        raise SystemExit(0)
    except Exception as exc:
        # Report the failure instead of hiding it, and pause so a persistent
        # error (for example a port that is already in use) does not turn
        # into a silent busy loop.
        print(f'Server stopped unexpectedly: {exc!r}; restarting in 1 second')
        time.sleep(1)
    else:
        # Only reached if serve_forever() returned without raising.
        print(1)