"""SimpliestFS: a minimal single-process HTTP file server.

Users are stored in a SQLite database, files live in one directory per user
under the configured storage root, and sessions are tracked with a cookie.

NOTE(review): the source this file was restored from had its whitespace
collapsed and the markup inside the HTML string literals stripped. The page
templates below keep every piece of visible text and re-add minimal markup
(links and forms are derived from the routes and form fields the handlers
read). Confirm them against the original templates if those are available.
"""

import hashlib
import hmac
import html
import mimetypes
import os
import secrets
import shutil
import sqlite3
import sys
import threading
import time
from datetime import datetime
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, quote, unquote, urlparse

# Default locations used when the caller does not pass an explicit path.
CONFIG_PATH = "config.yaml"
DB_PATH = "fileserver.db"

BYTES_PER_MB = 1024 * 1024

# Global config (filled in place by load_config so existing references stay valid)
CONFIG = {}

# Global session tracker
SESSIONS = {}
SESSION_LOCK = threading.Lock()

# Global file storage root
STORAGE_ROOT = ""

# Global DB connection
DB_CONN = None


def log(message):
    """Print a timestamped message unless logging is disabled in the config."""
    if CONFIG.get("logging", {}).get("enabled", True):
        print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {message}")


def load_config(path=CONFIG_PATH):
    """Load the YAML config into CONFIG and create the storage root.

    Raises:
        ValueError: if the file does not contain a mapping at the top level.
    """
    global STORAGE_ROOT
    # Imported here so the rest of the module stays importable (and its pure
    # helpers testable) without PyYAML; only config loading needs it.
    import yaml

    with open(path, "r", encoding="utf-8") as f:
        loaded = yaml.safe_load(f)
    if not isinstance(loaded, dict):
        raise ValueError(f"{path} must contain a YAML mapping at the top level")
    CONFIG.clear()
    CONFIG.update(loaded)
    STORAGE_ROOT = CONFIG["storage"]["root_dir"]
    os.makedirs(STORAGE_ROOT, exist_ok=True)


def init_db(db_path=DB_PATH):
    """Open the SQLite database at db_path and create the tables if missing."""
    global DB_CONN
    DB_CONN = sqlite3.connect(db_path, check_same_thread=False)
    cursor = DB_CONN.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            quota_mb INTEGER NOT NULL,
            used_mb REAL DEFAULT 0.0
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS sessions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL,
            session_id TEXT UNIQUE NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_access TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    DB_CONN.commit()


def hash_password(password):
    """Return the hex SHA-256 digest of password.

    NOTE(review): an unsalted, single-round SHA-256 is weak for password
    storage. It is kept unchanged so existing stored hashes keep verifying;
    moving to a salted KDF (e.g. hashlib.scrypt) needs a migration plan.
    """
    return hashlib.sha256(password.encode()).hexdigest()


def safe_path_component(name):
    """Return name if it is usable as a single path component, else None.

    Rejects empty names, "." and "..", anything containing a path separator,
    and control characters, so the result cannot escape the directory it is
    joined to.
    """
    if not name or name in (".", ".."):
        return None
    if "/" in name or "\\" in name:
        return None
    if any(ord(ch) < 32 or ord(ch) == 127 for ch in name):
        return None
    return name


def explore_url(username, filename=None):
    """Build the percent-encoded /explore URL for a user or one of their files."""
    url = "/explore/" + quote(username, safe="")
    if filename is not None:
        url += "/" + quote(filename, safe="")
    return url


def parse_quota(raw):
    """Parse a quota form value; return the int, or None if it is not ASCII digits."""
    # str.isdigit() alone also accepts characters such as superscripts that
    # int() cannot parse.
    if raw.isascii() and raw.isdigit():
        return int(raw)
    return None


def is_valid_api_token(token):
    """Check token against the configured API token in constant time.

    An empty or missing configured token never matches.
    """
    expected = CONFIG.get("security", {}).get("api_token")
    if not expected or not token:
        return False
    return hmac.compare_digest(str(token).encode("utf-8"), str(expected).encode("utf-8"))


def get_user_quota(username):
    """Return (quota_mb, used_mb) for username, or (None, None) if unknown."""
    cursor = DB_CONN.cursor()
    cursor.execute("SELECT quota_mb, used_mb FROM users WHERE username = ?", (username,))
    row = cursor.fetchone()
    if row:
        return row[0], row[1]
    return None, None


def update_used_space(username, delta_mb):
    """Add delta_mb (may be negative) to the stored usage of username."""
    cursor = DB_CONN.cursor()
    cursor.execute(
        "UPDATE users SET used_mb = used_mb + ? WHERE username = ?",
        (delta_mb, username),
    )
    DB_CONN.commit()


def session_expired(last_access, now, timeout):
    """Return True if last_access is older than timeout seconds before now.

    A timestamp that cannot be parsed is treated as expired.
    """
    try:
        return (now - datetime.fromisoformat(last_access)).total_seconds() > timeout
    except (TypeError, ValueError):
        return True


def is_session_valid(session_id):
    """Return the username owning session_id, or None if missing or expired.

    A valid session has its last_access refreshed; an expired one is deleted.
    """
    cursor = DB_CONN.cursor()
    cursor.execute(
        "SELECT username, last_access FROM sessions WHERE session_id = ?",
        (session_id,),
    )
    row = cursor.fetchone()
    if not row:
        return None
    username, last_access = row
    now = datetime.now()
    if session_expired(last_access, now, CONFIG["server"]["session_timeout"]):
        cursor.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
        DB_CONN.commit()
        return None
    cursor.execute(
        "UPDATE sessions SET last_access = ? WHERE session_id = ?",
        (now.isoformat(), session_id),
    )
    DB_CONN.commit()
    return username


def create_session(username):
    """Create and store a new session for username; return its id."""
    # Unpredictable id: a hash of username + time can be guessed.
    session_id = secrets.token_hex(32)
    # Write the timestamps explicitly. The column default CURRENT_TIMESTAMP is
    # UTC, while expiry checks compare against local datetime.now().
    now = datetime.now().isoformat()
    cursor = DB_CONN.cursor()
    cursor.execute(
        "INSERT INTO sessions (username, session_id, created_at, last_access) "
        "VALUES (?, ?, ?, ?)",
        (username, session_id, now, now),
    )
    DB_CONN.commit()
    return session_id


def delete_session(session_id):
    """Remove session_id from the sessions table."""
    cursor = DB_CONN.cursor()
    cursor.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
    DB_CONN.commit()


def get_user_files(username):
    """Return the sorted names of the regular files in the user's directory."""
    user_dir = os.path.join(STORAGE_ROOT, username)
    if not os.path.isdir(user_dir):
        return []
    return sorted(
        fname
        for fname in os.listdir(user_dir)
        if os.path.isfile(os.path.join(user_dir, fname))
    )


def get_user_used_space(username):
    """Return the total size in MB of everything under the user's directory."""
    user_dir = os.path.join(STORAGE_ROOT, username)
    if not os.path.exists(user_dir):
        return 0.0
    total = 0
    for dirpath, _dirnames, filenames in os.walk(user_dir):
        for fname in filenames:
            total += os.path.getsize(os.path.join(dirpath, fname))
    return total / BYTES_PER_MB


def can_upload(username, file_size_bytes):
    """Return True if adding file_size_bytes keeps username within quota."""
    quota_mb, used_mb = get_user_quota(username)
    if quota_mb is None:
        return False
    return (used_mb + file_size_bytes / BYTES_PER_MB) <= quota_mb


def get_user_dir(username):
    """Return the user's storage directory, creating it if needed."""
    user_dir = os.path.join(STORAGE_ROOT, username)
    os.makedirs(user_dir, exist_ok=True)
    return user_dir


def parse_multipart_file(content_type, data):
    """Extract the uploaded file from a multipart/form-data body.

    Returns:
        (filename, payload) for the part named "file", or (None, None) when
        the boundary or the part is missing. payload may be b"" for an empty
        file.
    """
    _, found, boundary = content_type.partition("boundary=")
    boundary = boundary.split(";", 1)[0].strip().strip('"')
    if not found or not boundary:
        return None, None
    wanted = b'Content-Disposition: form-data; name="file"; filename="'
    for part in data.split(b"--" + boundary.encode("utf-8")):
        header_end = part.find(b"\r\n\r\n")
        if header_end == -1 or wanted not in part[:header_end]:
            continue
        headers = part[:header_end].decode("utf-8", errors="ignore")
        name_start = headers.find('filename="') + len('filename="')
        name_end = headers.find('"', name_start)
        if name_end == -1:
            continue
        payload = part[header_end + 4:]
        if payload.endswith(b"\r\n"):
            payload = payload[:-2]  # line break that precedes the next boundary
        return headers[name_start:name_end], payload
    return None, None


class FileServerHandler(BaseHTTPRequestHandler):
    """Request handler implementing the SimpliestFS pages and admin API."""

    def log_message(self, format, *args):
        """Route http.server's request logging through log() instead of stderr."""
        log(format % args)

    # -- response helpers -------------------------------------------------

    def send_html(self, content, status=200):
        """Send content as a UTF-8 HTML response with the given status."""
        payload = content.encode("utf-8")
        self.send_response(status)
        self.send_header("Content-type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def send_message(self, message, status=200):
        """Send a one-line status page. message must be trusted text."""
        self.send_html(f"<h1>{message}</h1>", status)

    def send_redirect(self, location, status=302):
        """Send an empty redirect response to location."""
        self.send_response(status)
        self.send_header("Location", location)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def render_page(self, title, body):
        """Wrap body in the common page skeleton. title and body are raw HTML."""
        return (
            "<!DOCTYPE html>\n"
            "<html>\n<head>\n"
            '<meta charset="utf-8">\n'
            f"<title>{title}</title>\n"
            "</head>\n<body>\n"
            "<h1>SimpliestFS</h1>\n"
            f"{body}\n"
            "</body>\n</html>\n"
        )

    # -- request helpers --------------------------------------------------

    def read_body(self):
        """Read the request body; a missing or malformed length reads nothing."""
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            length = 0
        return self.rfile.read(length) if length > 0 else b""

    def read_form(self):
        """Parse a urlencoded request body into a parse_qs dictionary."""
        return parse_qs(self.read_body().decode("utf-8", errors="replace"))

    @staticmethod
    def form_value(params, key):
        """Return the first value for key in params, or "" when absent."""
        return params.get(key, [""])[0]

    def get_session_id(self):
        """Return the session_id cookie value, or None if it is not set."""
        for cookie in self.headers.get("Cookie", "").split(";"):
            cookie = cookie.strip()
            if cookie.startswith("session_id="):
                return cookie.split("=", 1)[1] or None
        return None

    def get_session_user(self):
        """Return the username of the current valid session, or None."""
        session_id = self.get_session_id()
        if not session_id:
            return None
        return is_session_valid(session_id)

    def require_post(self):
        """Return True for POST requests; otherwise answer 405 and return False."""
        if self.command == "POST":
            return True
        self.send_message("Method not allowed", 405)
        return False

    # -- pages ------------------------------------------------------------

    def handle_main_page(self):
        """Serve the landing page."""
        # The ui.* values come from the operator's config file and are
        # interpolated as-is, as in the original.
        ui = CONFIG["ui"]
        body = (
            f"<p>EN: {ui['disclaimer']}</p>\n"
            f"<p>RU: {ui['disclaimer_ru']}</p>\n"
            f"<p>Contact: {ui['contact_email']}</p>\n"
            "<p>Contact (prefered): @justuser_31</p>\n"
            '<p><a href="/register">Register</a> '
            '<a href="/login">Login</a> '
            '<a href="/explore">Explore Files</a></p>\n'
            "<p>Note: NO ENCRYPTION, no external databases. As simple as possible.</p>"
        )
        self.send_html(self.render_page("SimpliestFS", body))

    def handle_register_page(self):
        """Serve the (placeholder) registration page."""
        body = (
            "<h2>Register</h2>\n"
            "<p>Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do "
            "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim "
            "ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
            "aliquip ex ea commodo consequat. Duis aute irure dolor in "
            "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
            "pariatur. Excepteur sint occaecat cupidatat non proident, sunt in "
            "culpa qui officia deserunt mollit anim id est laborum.</p>\n"
            '<p><a href="/">Back to Home</a></p>'
        )
        self.send_html(self.render_page("Register - SimpliestFS", body))

    def handle_login_page(self):
        """Handle GET /login (show the form) and POST /login (authenticate)."""
        if self.command == "POST":
            params = self.read_form()
            username = self.form_value(params, "username")
            password = self.form_value(params, "password")
            cursor = DB_CONN.cursor()
            cursor.execute(
                "SELECT password_hash FROM users WHERE username = ?", (username,)
            )
            row = cursor.fetchone()
            if row and hmac.compare_digest(str(row[0]), hash_password(password)):
                session_id = create_session(username)
                self.send_response(302)
                self.send_header(
                    "Set-Cookie",
                    f"session_id={session_id}; Path=/; HttpOnly; SameSite=Lax",
                )
                self.send_header("Location", "/files")
                self.send_header("Content-Length", "0")
                self.end_headers()
                return
            self.send_html(self.render_login_form("Invalid username or password"), 401)
            return

        # Already logged in: go straight to the file list.
        if self.get_session_user():
            self.send_redirect("/files")
            return
        self.send_html(self.render_login_form())

    def render_login_form(self, error=""):
        """Return the login page HTML, with an optional error message."""
        # NOTE(review): form fields reconstructed from what POST /login reads.
        error_html = f"<p>{html.escape(error)}</p>\n" if error else ""
        body = (
            "<h2>Login</h2>\n"
            f"{error_html}"
            '<form method="post" action="/login">\n'
            '<input type="text" name="username" placeholder="Username">\n'
            '<input type="password" name="password" placeholder="Password">\n'
            '<input type="submit" value="Login">\n'
            "</form>\n"
            '<p><a href="/">Back to Home</a></p>'
        )
        return self.render_page("Login - SimpliestFS", body)

    def handle_files_page(self):
        """Serve the logged-in user's file list with upload and delete forms."""
        username = self.get_session_user()
        if not username:
            self.send_redirect("/login")
            return

        # Re-sync the stored usage with what is actually on disk.
        used_mb = get_user_used_space(username)
        cursor = DB_CONN.cursor()
        cursor.execute(
            "UPDATE users SET used_mb = ? WHERE username = ?", (used_mb, username)
        )
        DB_CONN.commit()

        # Show this user's own quota; fall back to the default only when the
        # user row is gone.
        quota_mb, _ = get_user_quota(username)
        if quota_mb is None:
            quota_mb = CONFIG["storage"]["default_quota_mb"]

        # NOTE(review): upload/delete forms reconstructed from the fields that
        # POST /upload and POST /delete read.
        rows = []
        for fname in get_user_files(username):
            label = html.escape(fname)
            rows.append(
                f'<li><a href="{explore_url(username, fname)}">{label}</a> '
                '<form method="post" action="/delete" style="display:inline">'
                f'<input type="hidden" name="filename" value="{label}">'
                '<input type="submit" value="Delete"></form></li>'
            )
        if rows:
            files_html = "<ul>\n" + "\n".join(rows) + "\n</ul>"
        else:
            files_html = "<p>No files yet.</p>"

        body = (
            "<h2>My Files</h2>\n"
            f"<p>Quota: {quota_mb} MB | Used: {used_mb:.2f} MB</p>\n"
            '<form method="post" action="/upload" enctype="multipart/form-data">\n'
            '<input type="file" name="file">\n'
            '<input type="submit" value="Upload">\n'
            "</form>\n"
            f"{files_html}\n"
            '<p><a href="/logout">Logout</a> <a href="/">Back to Home</a></p>'
        )
        self.send_html(self.render_page("My Files - SimpliestFS", body))

    def handle_upload(self):
        """Store an uploaded file in the logged-in user's directory."""
        username = self.get_session_user()
        if not username:
            self.send_redirect("/login")
            return
        if not self.require_post():
            return

        content_type = self.headers.get("Content-Type", "")
        if not content_type.startswith("multipart/form-data"):
            self.send_message("Invalid content type", 400)
            return

        filename, file_data = parse_multipart_file(content_type, self.read_body())
        # An empty payload (b"") is a valid empty file; only None means absent.
        if not filename or file_data is None:
            self.send_message("File not found in request", 400)
            return

        # Keep only the last path component of the client-supplied name so it
        # cannot point outside the user's directory.
        filename = safe_path_component(filename.replace("\\", "/").rsplit("/", 1)[-1])
        if filename is None:
            self.send_message("Invalid filename", 400)
            return

        user_dir = get_user_dir(username)
        filepath = os.path.join(user_dir, filename)
        # Replacing an existing file only costs the size difference.
        previous_size = os.path.getsize(filepath) if os.path.isfile(filepath) else 0
        delta_bytes = len(file_data) - previous_size
        if not can_upload(username, delta_bytes):
            self.send_message("Quota exceeded", 403)
            return

        with open(filepath, "wb") as f:
            f.write(file_data)
        update_used_space(username, delta_bytes / BYTES_PER_MB)
        self.send_redirect("/files")

    def handle_file_download(self, username, filename):
        """Stream a user's file to the client as an attachment."""
        safe_user = safe_path_component(username)
        safe_file = safe_path_component(filename)
        if safe_user is None or safe_file is None:
            self.send_message("File not found", 404)
            return
        filepath = os.path.join(STORAGE_ROOT, safe_user, safe_file)
        if not os.path.isfile(filepath):
            self.send_message("File not found", 404)
            return

        mime_type, _ = mimetypes.guess_type(safe_file)
        if not mime_type:
            mime_type = "application/octet-stream"

        # ASCII fallback plus RFC 5987 form: header values must be latin-1 and
        # must not contain an unescaped quote.
        fallback = "".join(
            ch if ch.isascii() and ch not in '"\\' else "_" for ch in safe_file
        )
        disposition = (
            f'attachment; filename="{fallback}"; '
            f"filename*=UTF-8''{quote(safe_file, safe='')}"
        )

        self.send_response(200)
        self.send_header("Content-Type", mime_type)
        self.send_header("Content-Length", str(os.path.getsize(filepath)))
        self.send_header("Content-Disposition", disposition)
        self.end_headers()
        with open(filepath, "rb") as f:
            shutil.copyfileobj(f, self.wfile)  # stream rather than load in memory

    def handle_delete(self):
        """Delete one of the logged-in user's files."""
        username = self.get_session_user()
        if not username:
            self.send_redirect("/login")
            return
        if not self.require_post():
            return

        requested = self.form_value(self.read_form(), "filename")
        if not requested:
            self.send_message("Filename required", 400)
            return
        filename = safe_path_component(requested)
        if filename is None:
            self.send_message("Invalid filename", 400)
            return

        filepath = os.path.join(get_user_dir(username), filename)
        if os.path.isfile(filepath):
            file_size = os.path.getsize(filepath)
            os.remove(filepath)
            update_used_space(username, -file_size / BYTES_PER_MB)
        self.send_redirect("/files")

    def handle_logout(self):
        """Drop the current session and clear the session cookie."""
        session_id = self.get_session_id()
        if session_id:
            delete_session(session_id)
        self.send_response(302)
        self.send_header(
            "Set-Cookie",
            "session_id=; Path=/; Expires=Thu, 01 Jan 1970 00:00:00 GMT",
        )
        self.send_header("Location", "/")
        self.send_header("Content-Length", "0")
        self.end_headers()

    def handle_explore(self):
        """List every user directory under the storage root."""
        users = sorted(
            item
            for item in os.listdir(STORAGE_ROOT)
            if os.path.isdir(os.path.join(STORAGE_ROOT, item))
        )
        if users:
            users_html = "<ul>\n" + "\n".join(
                f'<li><a href="{explore_url(user)}">{html.escape(user)}</a></li>'
                for user in users
            ) + "\n</ul>"
        else:
            users_html = "<p>No users yet.</p>"
        body = (
            "<h2>Explore Files</h2>\n"
            "<p>All users:</p>\n"
            f"{users_html}\n"
            '<p><a href="/">Back to Home</a></p>'
        )
        self.send_html(self.render_page("Explore Files - SimpliestFS", body))

    def handle_explore_user(self, username):
        """List the files of one user with download links."""
        safe_user = safe_path_component(username)
        if safe_user is None or not os.path.isdir(os.path.join(STORAGE_ROOT, safe_user)):
            self.send_message("User not found", 404)
            return

        files = get_user_files(safe_user)
        if files:
            files_html = "<ul>\n" + "\n".join(
                f'<li><a href="{explore_url(safe_user, fname)}">'
                f"{html.escape(fname)}</a></li>"
                for fname in files
            ) + "\n</ul>"
        else:
            files_html = "<p>No files.</p>"

        shown_name = html.escape(safe_user)
        body = (
            f"<h2>{shown_name}'s Files</h2>\n"
            f"{files_html}\n"
            '<p><a href="/explore">Back to All Users</a></p>'
        )
        self.send_html(self.render_page(f"{shown_name}'s Files - SimpliestFS", body))

    # -- admin API --------------------------------------------------------

    def handle_api_create_account(self):
        """POST /create_account: create a user (token, username, password, drive_quota)."""
        if not self.require_post():
            return
        params = self.read_form()
        username = self.form_value(params, "username")
        password = self.form_value(params, "password")
        quota_mb = parse_quota(self.form_value(params, "drive_quota"))

        if not is_valid_api_token(self.form_value(params, "token")):
            self.send_message("Invalid token", 403)
            return
        # The username becomes a directory name, so it must be a safe component.
        if safe_path_component(username) is None or not password or quota_mb is None:
            self.send_message("Invalid parameters", 400)
            return
        if quota_mb <= 0:
            self.send_message("Quota must be positive", 400)
            return

        cursor = DB_CONN.cursor()
        try:
            cursor.execute(
                "INSERT INTO users (username, password_hash, quota_mb) VALUES (?, ?, ?)",
                (username, hash_password(password), quota_mb),
            )
        except sqlite3.IntegrityError:
            self.send_message("Username already exists", 409)
            return
        DB_CONN.commit()
        self.send_message("Account created")

    def handle_api_set_quota(self):
        """POST /set_quota: change a user's quota (token, username, drive_quota)."""
        if not self.require_post():
            return
        params = self.read_form()
        username = self.form_value(params, "username")
        quota_mb = parse_quota(self.form_value(params, "drive_quota"))

        if not is_valid_api_token(self.form_value(params, "token")):
            self.send_message("Invalid token", 403)
            return
        if not username or quota_mb is None:
            self.send_message("Invalid parameters", 400)
            return
        if quota_mb <= 0:
            self.send_message("Quota must be positive", 400)
            return

        cursor = DB_CONN.cursor()
        cursor.execute(
            "UPDATE users SET quota_mb = ? WHERE username = ?", (quota_mb, username)
        )
        if cursor.rowcount == 0:
            self.send_message("User not found", 404)
            return
        DB_CONN.commit()
        self.send_message("Quota updated")

    def handle_api_delete_account(self):
        """POST /delete_account: remove a user, their files and sessions (token, username)."""
        if not self.require_post():
            return
        params = self.read_form()
        username = self.form_value(params, "username")

        if not is_valid_api_token(self.form_value(params, "token")):
            self.send_message("Invalid token", 403)
            return
        if not username:
            self.send_message("Username required", 400)
            return

        cursor = DB_CONN.cursor()
        cursor.execute("SELECT id FROM users WHERE username = ?", (username,))
        if not cursor.fetchone():
            self.send_message("User not found", 404)
            return

        # Only touch the filesystem for names that cannot escape the storage
        # root; a stored name such as ".." must never reach rmtree.
        if safe_path_component(username) is not None:
            user_dir = os.path.join(STORAGE_ROOT, username)
            if os.path.isdir(user_dir):
                shutil.rmtree(user_dir)

        cursor.execute("DELETE FROM sessions WHERE username = ?", (username,))
        cursor.execute("DELETE FROM users WHERE username = ?", (username,))
        DB_CONN.commit()
        self.send_message("Account deleted")

    # -- routing ----------------------------------------------------------

    def do_GET(self):
        """Dispatch GET requests to the page handlers."""
        path = urlparse(self.path).path
        if path == "/":
            self.handle_main_page()
        elif path == "/register":
            self.handle_register_page()
        elif path == "/login":
            self.handle_login_page()
        elif path == "/files":
            self.handle_files_page()
        elif path == "/logout":
            self.handle_logout()
        elif path == "/explore":
            self.handle_explore()
        elif path.startswith("/explore/") and not path.endswith("/"):
            # /explore/<user> lists a user; /explore/<user>/<file> downloads.
            # Segments are percent-decoded only after splitting, so an encoded
            # "/" stays inside its segment and is rejected by the handlers.
            parts = [unquote(p) for p in path[len("/explore/"):].split("/", 1)]
            if len(parts) == 2:
                self.handle_file_download(parts[0], parts[1])
            else:
                self.handle_explore_user(parts[0])
        else:
            self.send_message("Not Found", 404)

    def do_POST(self):
        """Dispatch POST requests to the form and API handlers."""
        routes = {
            "/login": self.handle_login_page,
            "/upload": self.handle_upload,
            "/delete": self.handle_delete,
            "/create_account": self.handle_api_create_account,
            "/set_quota": self.handle_api_set_quota,
            "/delete_account": self.handle_api_delete_account,
        }
        handler = routes.get(urlparse(self.path).path)
        if handler is None:
            self.send_message("Not Found", 404)
            return
        handler()


def purge_expired_sessions():
    """Delete every expired session; return how many were removed."""
    now = datetime.now()
    timeout = CONFIG["server"]["session_timeout"]
    cursor = DB_CONN.cursor()
    cursor.execute("SELECT session_id, last_access FROM sessions")
    expired = [
        (session_id,)
        for session_id, last_access in cursor.fetchall()
        if session_expired(last_access, now, timeout)
    ]
    if expired:
        cursor.executemany("DELETE FROM sessions WHERE session_id = ?", expired)
        DB_CONN.commit()
    return len(expired)


def session_cleanup():
    """Background loop: purge expired sessions once a minute."""
    while True:
        time.sleep(60)  # Check every minute
        try:
            purge_expired_sessions()
        except (sqlite3.Error, KeyError) as exc:
            # Keep the thread alive; one failed pass must not end cleanup.
            log(f"Session cleanup failed: {exc}")


def main():
    """Load the config, open the database and serve until interrupted."""
    load_config()
    init_db()
    log("Starting SimpliestFS server...")

    cleanup_thread = threading.Thread(target=session_cleanup, daemon=True)
    cleanup_thread.start()

    server_address = (CONFIG["server"]["host"], CONFIG["server"]["port"])
    httpd = HTTPServer(server_address, FileServerHandler)
    log(f"SimpliestFS server running on {CONFIG['server']['host']}:{CONFIG['server']['port']}")
    httpd.serve_forever()


if __name__ == "__main__":
    main()