You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
pxl_oboard/server.py

128 lines
2.7 KiB

from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs
from io import BytesIO
from PIL import Image
import numpy as np
import time
# Easy debug
from icecream import ic
# ic.disable() # Turn off
# Request rate limiting: LTIME holds the monotonic timestamp of the last
# accepted POST request (do_POST reads and updates it through ``global``).
# cur_time is kept as a module-level alias so existing references keep working.
# The former ``globals().update(locals())`` was dropped: at module scope
# locals() *is* globals(), so the call never had any effect.
LTIME = cur_time = time.monotonic()
# Reverse lookup: find a key by its value
def kbv(dict_, value):
    """Return the first key of *dict_* whose value equals *value*.

    Keys are visited in the container's own iteration order and each value is
    fetched with ``dict_[key]``.  ``None`` is returned when nothing matches.
    """
    matching_keys = (key for key in dict_ if dict_[key] == value)
    return next(matching_keys, None)
# Work with list-like objects
from listwork import *
class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler for a shared pixel board persisted in a ``.npy`` file.

    * ``GET`` with a ``get_color`` parameter replies with the packed colours
      of the requested pixels (``text/plain``).
    * Any other ``GET`` replies with the whole board rendered as a PNG.
    * ``POST`` applies the packed pixel updates found in the ``main`` form
      field, saves the board and redirects to ``/``.

    Packing/unpacking is delegated to ``pack``/``unpack`` from ``listwork``.
    """

    # Board dimensions; a trailing axis of length 3 (colour) is appended.
    MATRIX_SIZE = (720, 1280)
    # File the board is loaded from / saved to (relative to the working dir).
    MATRIX_PATH = 'matrix.npy'
    # Longest accepted ``main`` payload, in characters.
    MAX_PAYLOAD_CHARS = 5685
    # Minimum number of seconds between two accepted POST requests.
    MIN_POST_INTERVAL = 0.0001
    # Errors that indicate malformed client data while decoding or indexing.
    # NOTE(review): the exact exceptions raised by ``unpack`` are not visible
    # here; extend this tuple if it raises something else on bad input.
    _BAD_INPUT_ERRORS = (IndexError, KeyError, TypeError, ValueError,
                         OverflowError)

    def do_GET(self):
        """Serve either the colours of selected pixels or the board image."""
        # Existing clients send "/get_color=..." (no "?"); the conventional
        # "/?get_color=..." form is accepted as well.
        params = parse_qs(self.path[1:].lstrip('?'))
        ic(params)
        if 'get_color' in params:
            self._send_colors(params['get_color'][0])
        else:
            self._send_board()

    def _send_colors(self, packed):
        """Reply with the packed colours of the pixels listed in *packed*."""
        ic(len(packed))
        matrix = self.get_matrix()
        try:
            # NOTE(review): pixels are read as matrix[p[0]][p[1]] while
            # do_POST writes matrix[p[1], p[0]]; confirm that clients account
            # for the swapped index order.
            colors = [matrix[point[0]][point[1]] for point in unpack(packed)]
        except self._BAD_INPUT_ERRORS:
            # Malformed or out-of-range coordinates: answer explicitly
            # instead of letting the handler die with a traceback.
            self.send_error(400, 'Bad Request')
            return
        colors = pack(colors)
        ic(colors)
        self.send_response(200)
        self.send_header('Content-type', 'text/plain')
        self.end_headers()
        self.wfile.write(f'{colors}'.encode())

    def _send_board(self):
        """Reply with the whole board as a PNG image."""
        # Load the board before emitting headers so that a failure cannot
        # leave a half-sent 200 response behind.
        matrix = np.flip(self.get_matrix(), axis=0)  # Fix flip on server side
        self.send_response(200)
        self.send_header('Content-type', 'image/png')
        self.end_headers()
        self.send_image(matrix)

    def do_POST(self):
        """Apply the pixel updates from the ``main`` field and redirect to /.

        Responds with 429 when requests arrive faster than
        ``MIN_POST_INTERVAL``, 411/400 for a missing or invalid
        ``Content-Length``, 400 for a malformed body or pixel data, and 413
        when the payload exceeds ``MAX_PAYLOAD_CHARS``.  Nothing is saved
        unless every update in the batch was applied.
        """
        global LTIME
        cur_time = time.monotonic()
        # The limit is global (shared by all clients), not per connection.
        if cur_time - LTIME <= self.MIN_POST_INTERVAL:
            # send_error() already writes the complete response.
            self.send_error(429, 'Too Many Requests')
            return
        LTIME = cur_time

        raw_length = self.headers['Content-Length']
        if raw_length is None:
            self.send_error(411, 'Length Required')
            return
        try:
            content_length = int(raw_length)
        except ValueError:
            content_length = -1
        if content_length < 0:
            # A negative length would make rfile.read() block until EOF.
            self.send_error(400, 'Bad Request')
            return

        body = self.rfile.read(content_length)
        try:
            params = parse_qs(body.decode('utf-8'))["main"][0]
        except (ValueError, KeyError):
            # Undecodable body or missing "main" field.
            self.send_error(400, 'Bad Request')
            return
        ic(params)
        # Set limit pixels for 1 response
        ic(len(params))
        if len(params) > self.MAX_PAYLOAD_CHARS:
            self.send_error(413, 'Payload Too Large')
            return

        matrix = self.get_matrix()
        try:
            for item in unpack(params):
                # Fix (y, x) -> (x, y) on server side: item[1] indexes the
                # first board axis, item[0] the second; item[2:5] is the
                # colour triple.
                matrix[item[1], item[0]] = [item[2], item[3], item[4]]
        except self._BAD_INPUT_ERRORS:
            # The modified copy is discarded, so a bad batch changes nothing.
            self.send_error(400, 'Bad Request')
            return
        self.save_matrix(matrix)
        self.send_response(302)
        self.send_header('Location', '/')
        self.end_headers()

    def get_matrix(self):
        """Load the board from disk, creating an all-255 board if absent.

        Returns:
            The board as a ``uint8`` array of shape ``(*MATRIX_SIZE, 3)``
            when freshly created; otherwise whatever array the file holds.
        """
        try:
            with open(self.MATRIX_PATH, 'rb') as f:
                matrix = np.load(f)
        except FileNotFoundError:
            matrix = np.full(shape=(*self.MATRIX_SIZE, 3), fill_value=255,
                             dtype=np.uint8)
            self.save_matrix(matrix)
        return matrix

    def save_matrix(self, matrix):
        """Write *matrix* to the board file in NumPy ``.npy`` format."""
        with open(self.MATRIX_PATH, 'wb') as f:
            np.save(f, matrix)

    def send_image(self, matrix):
        """Encode *matrix* as PNG and write it to the response body."""
        image = Image.fromarray(matrix)
        buffer = BytesIO()
        image.save(buffer, format='PNG')
        self.wfile.write(buffer.getvalue())
def run(host='127.0.0.1', port=3333):
    """Serve the pixel board on *host*:*port* until the server stops.

    The listening socket is closed when serving ends, whether normally or
    through an exception, so that a later restart can bind the port again.

    Args:
        host: Interface to bind to (defaults to the loopback address).
        port: TCP port to listen on.
    """
    with HTTPServer((host, port), RequestHandler) as server:
        server.serve_forever()
# Keep the server alive: restart it after an unexpected failure and stop
# cleanly on Ctrl+C.
while True:
    try:
        run()
        print(1)
    except KeyboardInterrupt:
        raise SystemExit(0)
    except Exception as error:
        # Report the failure instead of silently swallowing it, and pause so
        # a persistent error (e.g. the port being in use) cannot spin the CPU.
        print(f'server stopped unexpectedly: {error!r}; restarting')
        time.sleep(1)