"""HTTP request handler for the SimpliestFS file server.

Defines the module-level ``CONFIG`` mapping, the ``log`` helper and
``FileServerHandler``, which serves the HTML pages, per-user file
upload/download/delete, and the token-protected account management endpoints.
"""

import hmac
import html
import mimetypes
import os
import shutil
import sqlite3
from datetime import datetime
from http.server import BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse

import database
from config import get_storage_root, load_config
from database import (
    create_session,
    delete_session,
    get_user_used_space,
    hash_password,
    init_db,
    is_session_valid,
    update_used_space,
)
from utils import can_upload, get_user_dir, get_user_files

CONFIG = load_config()

_BYTES_PER_MB = 1024 * 1024


def log(message):
    """Print *message* prefixed with a timestamp unless logging is disabled.

    Logging is considered enabled when ``CONFIG["logging"]["enabled"]`` is
    missing or truthy.
    """
    if CONFIG.get("logging", {}).get("enabled", True):
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {message}")


def _safe_join(base, *parts):
    """Join *parts* onto *base* and return the resolved path.

    Returns ``None`` when the resolved path is *base* itself or lies outside
    it (``..`` components, absolute paths, symlinks pointing elsewhere), or
    when the path cannot be resolved at all (e.g. an embedded NUL byte).
    """
    try:
        base_real = os.path.realpath(base)
        candidate = os.path.realpath(os.path.join(base_real, *parts))
        inside = os.path.commonpath([base_real, candidate]) == base_real
    except ValueError:
        # Embedded NUL bytes, or paths on different drives on Windows.
        return None
    if not inside or candidate == base_real:
        return None
    return candidate


class FileServerHandler(BaseHTTPRequestHandler):
    """Request handler implementing the SimpliestFS pages and API endpoints."""

    # ------------------------------------------------------------------
    # Low-level response / request helpers
    # ------------------------------------------------------------------

    def log_message(self, format, *args):
        """Route the base handler's access log through the module ``log``."""
        log(format % args)

    def send_html(self, content, status=200):
        """Send *content* as a UTF-8 ``text/html`` response with *status*."""
        self.send_response(status)
        self.send_header("Content-type", "text/html; charset=utf-8")
        self.end_headers()
        self.wfile.write(content.encode("utf-8"))

    def send_redirect(self, location, status=302):
        """Send an empty redirect response pointing at *location*."""
        self.send_response(status)
        self.send_header("Location", location)
        self.end_headers()

    def _send_message(self, text, status=200):
        """Send a one-line message page (the text padded with blank lines)."""
        self.send_html(f"\n\n{text}\n\n", status)

    def _read_body(self):
        """Return the raw request body according to ``Content-Length``.

        A missing, malformed or non-positive ``Content-Length`` yields an
        empty body instead of raising or blocking on the socket.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            length = 0
        return self.rfile.read(length) if length > 0 else b""

    def _read_form(self):
        """Parse the request body as URL-encoded form data (``parse_qs``)."""
        return parse_qs(self._read_body().decode("utf-8", errors="replace"))

    def _get_session_id(self):
        """Return the value of the first ``session_id`` cookie, or ``None``."""
        for cookie in self.headers.get("Cookie", "").split(";"):
            name, sep, value = cookie.strip().partition("=")
            if sep and name == "session_id":
                return value or None
        return None

    @staticmethod
    def _token_ok(token):
        """Check *token* against the configured API token in constant time."""
        expected = CONFIG["security"]["api_token"]
        if not isinstance(expected, str):
            # A non-string configured token can never equal a form value.
            return False
        return hmac.compare_digest(
            token.encode("utf-8"), expected.encode("utf-8")
        )

    def get_session_user(self):
        """Return the result of validating the request's session cookie.

        Returns ``None`` when no session cookie is present; otherwise the
        value of ``is_session_valid`` (callers treat it as the username, or a
        falsy value for an invalid/expired session).
        """
        session_id = self._get_session_id()
        if not session_id:
            return None
        return is_session_valid(session_id, CONFIG["server"]["session_timeout"])

    # ------------------------------------------------------------------
    # Pages
    # ------------------------------------------------------------------

    def handle_main_page(self):
        """Render the landing page with the configured disclaimers."""
        content = f""" SimpliestFS

SimpliestFS

EN: {CONFIG["ui"]["disclaimer"]}

RU: {CONFIG["ui"]["disclaimer_ru"]}

Contact: {CONFIG["ui"]["contact_email"]}

Contact (prefered): @justuser_31

Register Login Explore Files

Note: NO ENCRYPTION, no external databases. As simple as possible.
"""
        self.send_html(content)

    def handle_register_page(self):
        """Render the registration information page."""
        content = f""" Register - SimpliestFS

SimpliestFS

Register

{CONFIG["ui"]["register_info"]}

Back to Home

"""
        self.send_html(content)

    def handle_login_page(self):
        """Serve the login form (GET) or process a login attempt (POST).

        A successful POST stores the session, sets the ``session_id`` cookie
        and redirects to ``/files``; a failed one re-renders the form with
        status 401. On GET, an already-authenticated existing user is
        redirected straight to ``/files``.
        """
        cursor = database.DB_CONN.cursor()

        # Compare the parsed path so a query string does not bypass the branch.
        if self.command == "POST" and urlparse(self.path).path == "/login":
            params = self._read_form()
            username = params.get("username", [""])[0]
            password = params.get("password", [""])[0]

            cursor.execute(
                "SELECT password_hash FROM users WHERE username = ?", (username,)
            )
            row = cursor.fetchone()

            if row and row[0] == hash_password(password):
                session_id = create_session(username)
                now = datetime.now()
                cursor.execute(
                    "INSERT OR REPLACE INTO sessions "
                    "(username, session_id, created_at, last_access) "
                    "VALUES (?, ?, ?, ?)",
                    (username, session_id, now, now),
                )
                # Persist the session row; it was previously left uncommitted.
                database.DB_CONN.commit()

                self.send_response(302)
                self.send_header(
                    "Set-Cookie", f"session_id={session_id}; Path=/; HttpOnly"
                )
                self.send_header("Location", "/files")
                self.end_headers()
                return

            self.send_html(
                self.render_login_form("Invalid username or password"), 401
            )
            return

        # Skip the form when the visitor already has a valid session ...
        username = self.get_session_user()
        if username:
            # ... and the account behind that session still exists.
            cursor.execute(
                "SELECT username FROM users WHERE username = ?", (username,)
            )
            if cursor.fetchone():
                self.send_redirect("/files")
                return

        self.send_html(self.render_login_form())

    def render_login_form(self, error=""):
        """Return the login page, including *error* (escaped) when given."""
        error_block = f"\n\n{html.escape(error)}\n\n" if error else ""
        return f""" Login - SimpliestFS

SimpliestFS

Login

{error_block}

Back to Home

"""

    def handle_files_page(self):
        """Render the logged-in user's file list and quota usage.

        Redirects to ``/login`` when there is no valid session or the user
        row no longer exists. Also writes the current usage reported by
        ``get_user_used_space`` back to ``users.used_mb``.
        """
        username = self.get_session_user()
        if not username:
            self.send_redirect("/login")
            return

        used_mb = get_user_used_space(username)

        cursor = database.DB_CONN.cursor()
        cursor.execute("SELECT quota_mb FROM users WHERE username = ?", (username,))
        quota_row = cursor.fetchone()
        if not quota_row:
            # The session refers to an account that is gone.
            self.send_redirect("/login")
            return
        available_mb = quota_row[0]

        cursor.execute(
            "UPDATE users SET used_mb = ? WHERE username = ?", (used_mb, username)
        )
        database.DB_CONN.commit()

        # File names are user-controlled; escape them for the HTML response.
        files_html = "".join(
            f"\n{html.escape(fname)}\n" for fname in get_user_files(username)
        )
        listing = files_html or "\n\nNo files yet.\n\n"

        content = f""" My Files - SimpliestFS

SimpliestFS

My Files

Quota: {available_mb} MB | Used: {used_mb:.2f} MB

{listing}

Logout Back to Home

"""
        self.send_html(content)

    # ------------------------------------------------------------------
    # File operations
    # ------------------------------------------------------------------

    def handle_upload(self):
        """Store the ``file`` part of a multipart POST in the user's directory.

        Responds 405 for non-POST, 400 for a malformed request or unusable
        filename, 403 when ``can_upload`` refuses the size, and redirects to
        ``/files`` on success.
        """
        username = self.get_session_user()
        if not username:
            self.send_redirect("/login")
            return

        if self.command != "POST":
            self._send_message("Method not allowed", 405)
            return

        content_type = self.headers.get("Content-Type", "")
        if not content_type.startswith("multipart/form-data"):
            self._send_message("Invalid content type", 400)
            return

        # Extract the boundary parameter; it may be quoted and may be
        # followed by further parameters.
        _, found, boundary = content_type.partition("boundary=")
        boundary = boundary.split(";", 1)[0].strip().strip('"')
        if not found or not boundary:
            self._send_message("Invalid content type", 400)
            return

        data = self._read_body()

        file_data = None
        filename = None
        marker = b'Content-Disposition: form-data; name="file"; filename="'
        for part in data.split(b"--" + boundary.encode()):
            if marker not in part:
                continue
            header_end = part.find(b"\r\n\r\n")
            if header_end == -1:
                continue
            headers = part[:header_end].decode("utf-8", errors="ignore")
            name_start = headers.find('filename="') + len('filename="')
            name_end = headers.find('"', name_start)
            filename = headers[name_start:name_end]
            # Drop the CRLF that precedes the next boundary marker.
            file_data = part[header_end + 4 : -2]
            break

        if not filename or not file_data:
            self._send_message("File not found in request", 400)
            return

        file_size = len(file_data)
        if not can_upload(username, file_size):
            self._send_message("Quota exceeded", 403)
            return

        # Keep only the final path component (clients may send a full path)
        # and make sure the target cannot escape the user's directory.
        safe_name = os.path.basename(filename.replace("\\", "/"))
        filepath = _safe_join(get_user_dir(username), safe_name)
        if filepath is None:
            self._send_message("Invalid filename", 400)
            return

        # When replacing an existing file, account only for the size change.
        previous_size = os.path.getsize(filepath) if os.path.isfile(filepath) else 0

        with open(filepath, "wb") as f:
            f.write(file_data)

        update_used_space(username, (file_size - previous_size) / _BYTES_PER_MB)
        self.send_redirect("/files")

    def handle_file_download(self, username, filename):
        """Stream *filename* from *username*'s storage directory as an attachment.

        Responds 404 when the file does not exist or the requested path would
        resolve outside the storage root.
        """
        user_dir = _safe_join(get_storage_root(), username)
        filepath = _safe_join(user_dir, filename) if user_dir else None

        if not filepath or not os.path.isfile(filepath):
            self._send_message("File not found", 404)
            return

        mime_type, _ = mimetypes.guess_type(filename)
        if not mime_type:
            mime_type = "application/octet-stream"

        with open(filepath, "rb") as f:
            self.send_response(200)
            self.send_header("Content-Type", mime_type)
            self.send_header(
                "Content-Disposition", f'attachment; filename="{filename}"'
            )
            self.send_header("Content-Length", str(os.fstat(f.fileno()).st_size))
            self.end_headers()
            # Stream in chunks rather than loading the whole file into memory.
            shutil.copyfileobj(f, self.wfile)

    def handle_delete(self):
        """Delete the file named by the ``filename`` form field for the user.

        Responds 405 for non-POST and 400 for a missing or unsafe filename;
        otherwise removes the file if present and redirects to ``/files``.
        """
        username = self.get_session_user()
        if not username:
            self.send_redirect("/login")
            return

        if self.command != "POST":
            self._send_message("Method not allowed", 405)
            return

        filename = self._read_form().get("filename", [""])[0]
        if not filename:
            self._send_message("Filename required", 400)
            return

        filepath = _safe_join(get_user_dir(username), filename)
        if filepath is None:
            self._send_message("Invalid filename", 400)
            return

        if os.path.isfile(filepath):
            file_size = os.path.getsize(filepath)
            os.remove(filepath)
            update_used_space(username, -file_size / _BYTES_PER_MB)

        self.send_redirect("/files")

    def handle_logout(self):
        """Delete the current session, expire the cookie and redirect home."""
        session_id = self._get_session_id()
        if session_id:
            delete_session(session_id)

        self.send_response(302)
        self.send_header(
            "Set-Cookie", "session_id=; Path=/; Expires=Thu, 01 Jan 1970 00:00:00 GMT"
        )
        self.send_header("Location", "/")
        self.end_headers()

    # ------------------------------------------------------------------
    # Public exploration
    # ------------------------------------------------------------------

    def handle_explore(self):
        """List every directory found directly under the storage root."""
        root = get_storage_root()
        try:
            entries = os.listdir(root)
        except FileNotFoundError:
            # No storage directory yet means no users to show.
            entries = []

        users_html = "".join(
            f"\n{html.escape(user)}\n"
            for user in entries
            if os.path.isdir(os.path.join(root, user))
        )
        listing = users_html or "\n\nNo users yet.\n\n"

        content = f""" Explore Files - SimpliestFS

SimpliestFS

Explore Files

All users:

{listing}

Back to Home

"""
        self.send_html(content)

    def handle_explore_user(self, username):
        """List the regular files in *username*'s storage directory.

        Responds 404 when the directory does not exist or *username* would
        resolve outside the storage root.
        """
        user_dir = _safe_join(get_storage_root(), username)
        if not user_dir or not os.path.isdir(user_dir):
            self._send_message("User not found", 404)
            return

        files_html = "".join(
            f"\n{html.escape(fname)}\n"
            for fname in os.listdir(user_dir)
            if os.path.isfile(os.path.join(user_dir, fname))
        )
        listing = files_html or "\n\nNo files.\n\n"
        # The name comes from the URL; escape it before echoing it back.
        shown_name = html.escape(username)

        content = f""" {shown_name}'s Files - SimpliestFS

SimpliestFS

{shown_name}'s Files

{listing}

Back to All Users

"""
        self.send_html(content)

    # ------------------------------------------------------------------
    # Token-protected account API
    # ------------------------------------------------------------------

    def handle_api_create_account(self):
        """Create a user from ``token``, ``username``, ``password``, ``drive_quota``.

        Responds 405 for non-POST, 403 for a bad token, 400 for invalid
        parameters or a non-positive quota, and 409 when the username is
        already taken.
        """
        if self.command != "POST":
            self._send_message("Method not allowed", 405)
            return

        params = self._read_form()
        token = params.get("token", [""])[0]
        username = params.get("username", [""])[0]
        password = params.get("password", [""])[0]
        quota_mb = params.get("drive_quota", [""])[0]

        if not self._token_ok(token):
            self._send_message("Invalid token", 403)
            return

        # isdecimal() only accepts characters that int() can actually parse.
        if not username or not password or not quota_mb.isdecimal():
            self._send_message("Invalid parameters", 400)
            return

        quota_mb = int(quota_mb)
        if quota_mb <= 0:
            self._send_message("Quota must be positive", 400)
            return

        cursor = database.DB_CONN.cursor()
        try:
            cursor.execute(
                "INSERT INTO users (username, password_hash, quota_mb) VALUES (?, ?, ?)",
                (username, hash_password(password), quota_mb),
            )
        except sqlite3.IntegrityError:
            self._send_message("Username already exists", 409)
            return

        # The connection lives on the ``database`` module; a bare ``DB_CONN``
        # is not defined in this file.
        database.DB_CONN.commit()
        self._send_message("Account created")

    def handle_api_set_quota(self):
        """Update a user's quota from ``token``, ``username``, ``drive_quota``.

        Responds 405 for non-POST, 403 for a bad token, 400 for invalid
        parameters or a non-positive quota, and 404 for an unknown user.
        """
        if self.command != "POST":
            self._send_message("Method not allowed", 405)
            return

        params = self._read_form()
        token = params.get("token", [""])[0]
        username = params.get("username", [""])[0]
        quota_mb = params.get("drive_quota", [""])[0]

        if not self._token_ok(token):
            self._send_message("Invalid token", 403)
            return

        if not username or not quota_mb.isdecimal():
            self._send_message("Invalid parameters", 400)
            return

        quota_mb = int(quota_mb)
        if quota_mb <= 0:
            self._send_message("Quota must be positive", 400)
            return

        cursor = database.DB_CONN.cursor()
        cursor.execute(
            "UPDATE users SET quota_mb = ? WHERE username = ?", (quota_mb, username)
        )
        if cursor.rowcount == 0:
            self._send_message("User not found", 404)
            return

        database.DB_CONN.commit()
        self._send_message("Quota updated")

    def handle_api_delete_account(self):
        """Delete a user row and that user's storage directory.

        Responds 405 for non-POST, 403 for a bad token, 400 when the username
        is missing, and 404 for an unknown user.
        """
        if self.command != "POST":
            self._send_message("Method not allowed", 405)
            return

        params = self._read_form()
        token = params.get("token", [""])[0]
        username = params.get("username", [""])[0]

        if not self._token_ok(token):
            self._send_message("Invalid token", 403)
            return

        if not username:
            self._send_message("Username required", 400)
            return

        cursor = database.DB_CONN.cursor()
        cursor.execute("SELECT id FROM users WHERE username = ?", (username,))
        if not cursor.fetchone():
            self._send_message("User not found", 404)
            return

        # Only ever remove a directory that is strictly inside the storage root.
        user_dir = _safe_join(get_storage_root(), username)
        if user_dir and os.path.isdir(user_dir):
            for fname in os.listdir(user_dir):
                os.remove(os.path.join(user_dir, fname))
            os.rmdir(user_dir)

        cursor.execute("DELETE FROM users WHERE username = ?", (username,))
        database.DB_CONN.commit()
        self._send_message("Account deleted")

    # ------------------------------------------------------------------
    # Routing
    # ------------------------------------------------------------------

    def do_GET(self):
        """Dispatch GET requests by path; unknown paths get a 404 page."""
        path = urlparse(self.path).path

        routes = {
            "/": self.handle_main_page,
            "/register": self.handle_register_page,
            "/login": self.handle_login_page,
            "/files": self.handle_files_page,
            "/logout": self.handle_logout,
            "/explore": self.handle_explore,
        }
        handler = routes.get(path)
        if handler is not None:
            handler()
            return

        if path.startswith("/explore/") and not path.endswith("/"):
            # "/explore/<user>" lists files; "/explore/<user>/<file>" downloads.
            username, has_file, filename = path[len("/explore/") :].partition("/")
            if has_file:
                self.handle_file_download(username, filename)
            else:
                self.handle_explore_user(username)
            return

        self._send_message("Not Found", 404)

    def do_POST(self):
        """Dispatch POST requests by path; unknown paths get a 404 page."""
        path = urlparse(self.path).path

        routes = {
            "/login": self.handle_login_page,
            "/upload": self.handle_upload,
            "/delete": self.handle_delete,
            "/create_account": self.handle_api_create_account,
            "/set_quota": self.handle_api_set_quota,
            "/delete_account": self.handle_api_delete_account,
        }
        handler = routes.get(path)
        if handler is None:
            self._send_message("Not Found", 404)
            return
        handler()