import hmac
import html
import mimetypes
import os
import shutil
import sqlite3
from datetime import datetime
from http.server import BaseHTTPRequestHandler
from urllib.parse import parse_qs, unquote, urlparse

import database
from config import get_storage_root, load_config
from database import (
    create_session,
    delete_session,
    get_user_used_space,
    hash_password,
    init_db,
    is_session_valid,
    update_used_space,
)
from utils import can_upload, get_user_dir, get_user_files
# Application configuration, loaded once when this module is imported and read
# by the logging helper and the request handlers below. A `global` statement
# has no effect at module level, so the name is simply assigned.
CONFIG = load_config()
def log(message):
    """Print *message* to stdout prefixed with the current local timestamp.

    Nothing is printed when ``CONFIG["logging"]["enabled"]`` is falsy; a
    missing ``logging`` section or ``enabled`` key counts as enabled.
    """
    logging_settings = CONFIG.get("logging", {})
    if not logging_settings.get("enabled", True):
        return
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{stamp}] {message}")
class FileServerHandler(BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
    """Forward every constructor argument unchanged to the base request handler."""
    super(FileServerHandler, self).__init__(*args, **kwargs)
def log_message(self, format, *args):
    """Send the base handler's printf-style log lines through the module's ``log``.

    ``format`` keeps the name used by ``BaseHTTPRequestHandler.log_message``
    so the override stays signature-compatible.
    """
    rendered = format % args
    log(rendered)
def send_html(self, content, status=200):
    """Send *content* as a complete UTF-8 ``text/html`` response.

    Args:
        content: Response body as text.
        status: HTTP status code for the response line (default 200).
    """
    self.send_response(code=status)
    self.send_header(keyword="Content-type", value="text/html; charset=utf-8")
    self.end_headers()
    body = content.encode("utf-8")
    self.wfile.write(body)
def send_redirect(self, location, status=302):
    """Send a body-less redirect response pointing the client at *location*.

    Args:
        location: Value for the ``Location`` header.
        status: HTTP status code to use (default 302).
    """
    self.send_response(code=status)
    self.send_header(keyword="Location", value=location)
    self.end_headers()
def get_session_user(self):
    """Resolve the request's ``session_id`` cookie to a user.

    Returns:
        ``None`` when the request carries no non-empty ``session_id`` cookie;
        otherwise whatever ``is_session_valid`` returns for that id and the
        configured session timeout (callers in this class treat a truthy
        result as the username).
    """
    cookie_header = self.headers.get("Cookie", "")
    stripped_cookies = (piece.strip() for piece in cookie_header.split(";"))
    # Only the first cookie named session_id is considered.
    session_id = next(
        (
            crumb.split("=", 1)[1]
            for crumb in stripped_cookies
            if crumb.startswith("session_id=")
        ),
        None,
    )
    if not session_id:
        return None
    timeout = CONFIG["server"]["session_timeout"]
    return is_session_valid(session_id, timeout)
def handle_main_page(self):
    """Send the landing page.

    The disclaimers and the contact e-mail are read from ``CONFIG["ui"]``;
    everything else is fixed text.
    """
    ui = CONFIG["ui"]
    page_lines = [
        "",
        "SimpliestFS",
        f"EN: {ui['disclaimer']}",
        f"RU: {ui['disclaimer_ru']}",
        f"Contact: {ui['contact_email']}",
        "Contact (prefered): @justuser_31",
        "Register",
        "Login",
        "Explore Files",
        "Note: NO ENCRYPTION, no external databases. As simple as possible.",
        "",
    ]
    # The leading and trailing empty entries reproduce the newlines that
    # open and close the page text.
    self.send_html("\n".join(page_lines))
def handle_register_page(self):
    """Send the registration page, whose body text is ``CONFIG["ui"]["register_info"]``."""
    register_info = CONFIG["ui"]["register_info"]
    page = f"\nRegister - SimpliestFS\nRegister\n{register_info}\nBack to Home\n"
    self.send_html(page)
def handle_login_page(self):
    """Serve the login form (GET) or authenticate submitted credentials (POST).

    POST: reads ``username`` and ``password`` from the urlencoded body and
    compares ``hash_password(password)`` with the stored ``password_hash``.
    On success a session is created, recorded in the ``sessions`` table, and
    the client is redirected to ``/files`` with a ``session_id`` cookie. On
    failure the form is re-sent with status 401.

    GET: redirects already-authenticated clients to ``/files``; everyone
    else gets the empty login form.
    """
    # Compare the parsed path (as do_POST does) so a query string on the
    # URL does not push a POST into the GET branch below.
    request_path = urlparse(self.path).path
    if request_path == "/login" and self.command == "POST":
        try:
            content_length = max(int(self.headers.get("Content-Length", 0)), 0)
        except ValueError:
            # A malformed header is treated as an empty body.
            content_length = 0
        post_data = self.rfile.read(content_length).decode("utf-8", errors="replace")
        params = parse_qs(post_data)
        username = params.get("username", [""])[0]
        password = params.get("password", [""])[0]

        cursor = database.DB_CONN.cursor()
        cursor.execute(
            "SELECT password_hash FROM users WHERE username = ?", (username,)
        )
        row = cursor.fetchone()
        if row and row[0] == hash_password(password):
            session_id = create_session(username)
            # One timestamp for both columns so they agree exactly.
            now = datetime.now()
            # NOTE(review): create_session may already persist the session;
            # confirm whether this INSERT OR REPLACE is still required.
            cursor.execute(
                """
INSERT OR REPLACE INTO sessions (username, session_id, created_at, last_access)
VALUES (?, ?, ?, ?)
""",
                (username, session_id, now, now),
            )
            # Without a commit the row stays in an open transaction.
            database.DB_CONN.commit()
            self.send_response(302)
            self.send_header(
                "Set-Cookie", f"session_id={session_id}; Path=/; HttpOnly"
            )
            self.send_header("Location", "/files")
            self.end_headers()
            return

        content = self.render_login_form("Invalid username or password")
        self.send_html(content, 401)
        return

    if self.get_session_user():
        self.send_redirect("/files")
        return
    self.send_html(self.render_login_form())
def render_login_form(self, error=""):
    """Return the login page text, with *error* on its own line when given.

    Args:
        error: Message to show; a falsy value omits the error line.
    """
    error_block = f"{error}\n" if error else ""
    return f"\nLogin - SimpliestFS\nLogin\n{error_block}\nBack to Home\n"
def handle_files_page(self):
    """Send the logged-in user's file page with their quota and current usage.

    Unauthenticated requests are redirected to ``/login``. The usage figure
    from ``get_user_used_space`` is written back to ``users.used_mb`` before
    the page is rendered.
    """
    username = self.get_session_user()
    if not username:
        self.send_redirect("/login")
        return

    used_mb = get_user_used_space(username)
    cursor = database.DB_CONN.cursor()
    cursor.execute(
        "UPDATE users SET used_mb = ? WHERE username = ?", (used_mb, username)
    )
    database.DB_CONN.commit()

    # Show the quota stored for this user (the API handlers maintain
    # users.quota_mb); fall back to the configured default only when the
    # row or the value is missing.
    cursor.execute("SELECT quota_mb FROM users WHERE username = ?", (username,))
    quota_row = cursor.fetchone()
    if quota_row and quota_row[0] is not None:
        quota_mb = quota_row[0]
    else:
        quota_mb = CONFIG["storage"]["default_quota_mb"]

    files = get_user_files(username)
    files_html = ""
    for fname in files:
        # NOTE(review): download_url is built but not interpolated into the
        # per-file fragment as it appears here; confirm no markup went missing.
        download_url = f"/explore/{username}/{fname}"
        files_html += "\n"

    listing = files_html if files_html else "No files yet.\n"
    content = (
        "\nMy Files - SimpliestFS\nMy Files\n"
        f"Quota: {quota_mb} MB | Used: {used_mb:.2f} MB\n"
        f"{listing}\nLogout\nBack to Home\n"
    )
    self.send_html(content)
def handle_upload(self):
    """Store the file from a ``multipart/form-data`` POST in the user's directory.

    Responses:
        302 to ``/login`` when the request has no valid session.
        405 when the method is not POST.
        400 when the content type, boundary, or ``file`` part is missing or
            unusable (including an empty payload or an unsafe filename).
        403 when ``can_upload`` refuses the size.
        302 to ``/files`` after the file is written.
    """
    username = self.get_session_user()
    if not username:
        self.send_redirect("/login")
        return
    if self.command != "POST":
        self.send_html("Method not allowed\n", 405)
        return

    content_type = self.headers.get("Content-Type", "")
    if not content_type.startswith("multipart/form-data"):
        self.send_html("Invalid content type\n", 400)
        return
    # The boundary parameter may be absent, quoted, or followed by others.
    _, found, boundary = content_type.partition("boundary=")
    boundary = boundary.split(";", 1)[0].strip().strip('"')
    if not found or not boundary:
        self.send_html("Invalid content type\n", 400)
        return

    try:
        content_length = max(int(self.headers.get("Content-Length", 0)), 0)
    except ValueError:
        # A malformed header is treated as an empty body.
        content_length = 0
    data = self.rfile.read(content_length)

    file_data = None
    filename = None
    marker = 'Content-Disposition: form-data; name="file"; filename="'
    for part in data.split(b"--" + boundary.encode()):
        header_end = part.find(b"\r\n\r\n")
        if header_end == -1:
            continue
        headers = part[:header_end].decode("utf-8", errors="ignore")
        # Look for the marker in the part headers only, so another field's
        # payload that happens to contain the text is not mistaken for it.
        if marker not in headers:
            continue
        filename_start = headers.find('filename="') + len('filename="')
        filename_end = headers.find('"', filename_start)
        filename = headers[filename_start:filename_end]
        # Drop the blank line after the headers and the CRLF that precedes
        # the next boundary.
        file_data = part[header_end + 4 : -2]
        break

    if filename:
        # The name is client-controlled: keep only its final component so
        # "../" sequences or absolute paths cannot escape the user directory.
        filename = os.path.basename(filename.replace("\\", "/"))
    if (
        not filename
        or filename in (".", "..")
        or "\x00" in filename
        or not file_data
    ):
        self.send_html("File not found in request\n", 400)
        return

    file_size = len(file_data)
    if not can_upload(username, file_size):
        self.send_html("Quota exceeded\n", 403)
        return

    user_dir = get_user_dir(username)
    filepath = os.path.join(user_dir, filename)
    # When an existing file is overwritten, only the size difference should
    # be added to the recorded usage.
    replaced_size = os.path.getsize(filepath) if os.path.isfile(filepath) else 0
    with open(filepath, "wb") as f:
        f.write(file_data)
    # update_used_space is called with a delta in megabytes here and in
    # handle_delete.
    update_used_space(username, (file_size - replaced_size) / (1024 * 1024))
    self.send_redirect("/files")
def handle_file_download(self, username, filename):
    """Stream a stored file to the client as an attachment.

    Both arguments come from the request URL, so the resolved path must
    stay inside ``<storage root>/<username>``; anything else is answered
    with 404, exactly like a missing file.

    Args:
        username: Owner directory name (a single path component).
        filename: Path of the file relative to the owner's directory.
    """
    unsafe_user = username in ("", ".", "..") or any(
        ch in username for ch in "/\\\x00"
    )
    if unsafe_user or "\x00" in filename:
        self.send_html("File not found\n", 404)
        return

    user_dir = os.path.realpath(os.path.join(get_storage_root(), username))
    filepath = os.path.realpath(os.path.join(user_dir, filename))
    # realpath collapses ".." and symlinks, so a simple prefix test tells
    # whether the target still lives under the user's directory.
    if not filepath.startswith(user_dir + os.sep) or not os.path.isfile(filepath):
        self.send_html("File not found\n", 404)
        return

    mime_type, _ = mimetypes.guess_type(filename)
    if not mime_type:
        mime_type = "application/octet-stream"

    # Strip characters that would break out of the quoted header value, and
    # keep the value encodable (headers are sent as latin-1).
    download_name = os.path.basename(filename)
    for forbidden in ('"', "\r", "\n"):
        download_name = download_name.replace(forbidden, "")
    download_name = download_name.encode("latin-1", "replace").decode("latin-1")

    try:
        source = open(filepath, "rb")
    except OSError:
        # The file vanished or is unreadable; fail before any 200 is sent.
        self.send_html("File not found\n", 404)
        return
    with source:
        self.send_response(200)
        self.send_header("Content-Type", mime_type)
        self.send_header("Content-Length", str(os.fstat(source.fileno()).st_size))
        self.send_header(
            "Content-Disposition", f'attachment; filename="{download_name}"'
        )
        self.end_headers()
        # Copy in chunks instead of loading the whole file into memory.
        shutil.copyfileobj(source, self.wfile)
def handle_delete(self):
    """Delete one of the logged-in user's files named by the ``filename`` field.

    Responses:
        302 to ``/login`` when the request has no valid session.
        405 when the method is not POST.
        400 when ``filename`` is missing or is not a plain file name.
        302 to ``/files`` otherwise, whether or not the file existed.
    """
    username = self.get_session_user()
    if not username:
        self.send_redirect("/login")
        return
    if self.command != "POST":
        self.send_html("Method not allowed\n", 405)
        return

    try:
        content_length = max(int(self.headers.get("Content-Length", 0)), 0)
    except ValueError:
        # A malformed header is treated as an empty body.
        content_length = 0
    post_data = self.rfile.read(content_length).decode("utf-8", errors="replace")
    filename = parse_qs(post_data).get("filename", [""])[0]
    if not filename:
        self.send_html("Filename required\n", 400)
        return

    # The name is client-controlled: accept only a bare file name so the
    # request cannot reach files outside the user's directory.
    bare_name = os.path.basename(filename.replace("\\", "/"))
    if bare_name != filename or bare_name in (".", "..") or "\x00" in bare_name:
        self.send_html("Invalid filename\n", 400)
        return

    filepath = os.path.join(get_user_dir(username), bare_name)
    # isfile (not exists) so a directory is never handed to os.remove.
    if os.path.isfile(filepath):
        file_size = os.path.getsize(filepath)
        os.remove(filepath)
        update_used_space(username, -file_size / (1024 * 1024))
    self.send_redirect("/files")
def handle_logout(self):
    """Drop the session named by the ``session_id`` cookie and redirect to ``/``.

    The response always expires the cookie on the client, whether or not a
    session id was present in the request.
    """
    cookie_header = self.headers.get("Cookie", "")
    session_id = None
    for raw_cookie in cookie_header.split(";"):
        name, separator, value = raw_cookie.strip().partition("=")
        if separator and name == "session_id":
            # Only the first session_id cookie is considered.
            session_id = value
            break

    if session_id:
        delete_session(session_id)

    expired_cookie = "session_id=; Path=/; Expires=Thu, 01 Jan 1970 00:00:00 GMT"
    self.send_response(302)
    self.send_header("Set-Cookie", expired_cookie)
    self.send_header("Location", "/")
    self.end_headers()
def handle_explore(self):
    """Send the public page built from the directories found in the storage root.

    Every sub-directory of ``get_storage_root()`` counts as one user.
    """
    storage_root = get_storage_root()
    users = [
        entry
        for entry in os.listdir(storage_root)
        if os.path.isdir(os.path.join(storage_root, entry))
    ]
    # Each user contributes one newline-only fragment, matching the page
    # text as it appears in this file.
    users_html = "\n" * len(users)
    listing = users_html if users_html else "No users yet.\n"
    page_lines = [
        "",
        "Explore Files - SimpliestFS",
        "Explore Files",
        "All users:",
        listing,
        "Back to Home",
        "",
    ]
    self.send_html("\n".join(page_lines))
def handle_explore_user(self, username):
    """Send the public file page for *username*.

    Answers 404 when *username* is not a plain directory name or no such
    directory exists under the storage root.

    Args:
        username: Directory name taken from the request URL.
    """
    # The name comes from the URL: refuse anything that is not a single
    # path component so "." / ".." cannot expose other directories.
    unsafe_user = username in ("", ".", "..") or any(
        ch in username for ch in "/\\\x00"
    )
    user_dir = os.path.join(get_storage_root(), username)
    # isdir (not exists) so a plain file is never passed to os.listdir.
    if unsafe_user or not os.path.isdir(user_dir):
        self.send_html("User not found\n", 404)
        return

    files = [
        fname
        for fname in os.listdir(user_dir)
        if os.path.isfile(os.path.join(user_dir, fname))
    ]
    files_html = ""
    for fname in files:
        # NOTE(review): download_url is built but not interpolated into the
        # per-file fragment as it appears here; confirm no markup went missing.
        download_url = f"/explore/{username}/{fname}"
        files_html += "\n"

    # The name is echoed into an HTML response, so escape it.
    shown_name = html.escape(username)
    listing = files_html if files_html else "No files.\n"
    content = (
        f"\n{shown_name}'s Files - SimpliestFS\n"
        f"{shown_name}'s Files\n"
        f"{listing}\nBack to All Users\n"
    )
    self.send_html(content)
def handle_api_create_account(self):
    """Create a user account from a token-authenticated urlencoded POST.

    Form fields: ``token``, ``username``, ``password``, ``drive_quota``
    (a positive whole number, stored in ``users.quota_mb``).

    Responses:
        405 when the method is not POST.
        403 when ``token`` does not match ``CONFIG["security"]["api_token"]``.
        400 when a field is missing or invalid, or the quota is not positive.
        409 when the username already exists.
        200 when the account was created.
    """
    if self.command != "POST":
        self.send_html("Method not allowed\n", 405)
        return

    try:
        content_length = max(int(self.headers.get("Content-Length", 0)), 0)
    except ValueError:
        # A malformed header is treated as an empty body.
        content_length = 0
    post_data = self.rfile.read(content_length).decode("utf-8", errors="replace")
    params = parse_qs(post_data)
    token = params.get("token", [""])[0]
    username = params.get("username", [""])[0]
    password = params.get("password", [""])[0]
    quota_mb = params.get("drive_quota", [""])[0]

    # Constant-time comparison so response timing does not leak how much
    # of the token matched. A non-string configured token never matches.
    expected_token = CONFIG["security"]["api_token"]
    token_ok = isinstance(expected_token, str) and hmac.compare_digest(
        token.encode("utf-8"), expected_token.encode("utf-8")
    )
    if not token_ok:
        self.send_html("Invalid token\n", 403)
        return

    # Usernames double as directory names under the storage root, so they
    # must be a single, ordinary path component.
    unsafe_name = username in (".", "..") or any(
        ch in username for ch in "/\\\x00"
    )
    # isdecimal() accepts only characters int() can parse; isdigit() also
    # accepts ones such as superscripts that make int() raise.
    if not username or not password or unsafe_name or not quota_mb.isdecimal():
        self.send_html("Invalid parameters\n", 400)
        return
    quota_value = int(quota_mb)
    if quota_value <= 0:
        self.send_html("Quota must be positive\n", 400)
        return

    cursor = database.DB_CONN.cursor()
    try:
        cursor.execute(
            "INSERT INTO users (username, password_hash, quota_mb) VALUES (?, ?, ?)",
            (username, hash_password(password), quota_value),
        )
    except sqlite3.IntegrityError:
        self.send_html("Username already exists\n", 409)
        return
    # Commit through the database module: a bare DB_CONN is not a name in
    # this file and raised NameError, leaving the insert uncommitted.
    database.DB_CONN.commit()
    self.send_html("Account created\n")
def handle_api_set_quota(self):
    """Change a user's quota from a token-authenticated urlencoded POST.

    Form fields: ``token``, ``username``, ``drive_quota`` (a positive whole
    number, stored in ``users.quota_mb``).

    Responses:
        405 when the method is not POST.
        403 when ``token`` does not match ``CONFIG["security"]["api_token"]``.
        400 when a field is missing or invalid, or the quota is not positive.
        404 when no such user exists.
        200 when the quota was updated.
    """
    if self.command != "POST":
        self.send_html("Method not allowed\n", 405)
        return

    try:
        content_length = max(int(self.headers.get("Content-Length", 0)), 0)
    except ValueError:
        # A malformed header is treated as an empty body.
        content_length = 0
    post_data = self.rfile.read(content_length).decode("utf-8", errors="replace")
    params = parse_qs(post_data)
    token = params.get("token", [""])[0]
    username = params.get("username", [""])[0]
    quota_mb = params.get("drive_quota", [""])[0]

    # Constant-time comparison so response timing does not leak how much
    # of the token matched. A non-string configured token never matches.
    expected_token = CONFIG["security"]["api_token"]
    token_ok = isinstance(expected_token, str) and hmac.compare_digest(
        token.encode("utf-8"), expected_token.encode("utf-8")
    )
    if not token_ok:
        self.send_html("Invalid token\n", 403)
        return

    # isdecimal() accepts only characters int() can parse; isdigit() also
    # accepts ones such as superscripts that make int() raise.
    if not username or not quota_mb.isdecimal():
        self.send_html("Invalid parameters\n", 400)
        return
    quota_value = int(quota_mb)
    if quota_value <= 0:
        self.send_html("Quota must be positive\n", 400)
        return

    cursor = database.DB_CONN.cursor()
    cursor.execute(
        "UPDATE users SET quota_mb = ? WHERE username = ?", (quota_value, username)
    )
    if cursor.rowcount == 0:
        self.send_html("User not found\n", 404)
        return
    database.DB_CONN.commit()
    self.send_html("Quota updated\n")
def handle_api_delete_account(self):
    """Delete a user, their stored files and their sessions (token-authenticated POST).

    Form fields: ``token``, ``username``.

    Responses:
        405 when the method is not POST.
        403 when ``token`` does not match ``CONFIG["security"]["api_token"]``.
        400 when ``username`` is missing.
        404 when no such user exists.
        200 when the account was deleted.
    """
    if self.command != "POST":
        self.send_html("Method not allowed\n", 405)
        return

    try:
        content_length = max(int(self.headers.get("Content-Length", 0)), 0)
    except ValueError:
        # A malformed header is treated as an empty body.
        content_length = 0
    post_data = self.rfile.read(content_length).decode("utf-8", errors="replace")
    params = parse_qs(post_data)
    token = params.get("token", [""])[0]
    username = params.get("username", [""])[0]

    # Constant-time comparison so response timing does not leak how much
    # of the token matched. A non-string configured token never matches.
    expected_token = CONFIG["security"]["api_token"]
    token_ok = isinstance(expected_token, str) and hmac.compare_digest(
        token.encode("utf-8"), expected_token.encode("utf-8")
    )
    if not token_ok:
        self.send_html("Invalid token\n", 403)
        return
    if not username:
        self.send_html("Username required\n", 400)
        return

    cursor = database.DB_CONN.cursor()
    cursor.execute("SELECT id FROM users WHERE username = ?", (username,))
    if not cursor.fetchone():
        self.send_html("User not found\n", 404)
        return

    # Only remove a directory that is a direct child of the storage root;
    # a name such as ".." must never direct the deletion elsewhere.
    unsafe_name = username in (".", "..") or any(
        ch in username for ch in "/\\\x00"
    )
    user_dir = os.path.join(get_storage_root(), username)
    if not unsafe_name and os.path.isdir(user_dir):
        # rmtree also removes nested directories, which a per-entry
        # os.remove cannot handle.
        shutil.rmtree(user_dir)

    # Drop the user's sessions as well so a leftover cookie does not keep
    # referring to a deleted account.
    cursor.execute("DELETE FROM sessions WHERE username = ?", (username,))
    cursor.execute("DELETE FROM users WHERE username = ?", (username,))
    database.DB_CONN.commit()
    self.send_html("Account deleted\n")
def do_GET(self):
    """Route a GET request to the matching page handler by URL path.

    ``/explore/<user>`` shows a user's files and ``/explore/<user>/<file>``
    downloads one; both components are percent-decoded before use. Any
    unknown path, and any ``/explore/`` path ending in ``/``, gets a 404.
    """
    path = urlparse(self.path).path
    if path == "/":
        self.handle_main_page()
    elif path == "/register":
        self.handle_register_page()
    elif path == "/login":
        self.handle_login_page()
    elif path == "/files":
        self.handle_files_page()
    elif path == "/logout":
        self.handle_logout()
    elif path == "/explore":
        self.handle_explore()
    elif path.startswith("/explore/") and not path.endswith("/"):
        # Split the raw path first, then decode each piece, so names with
        # spaces or non-ASCII characters resolve and an encoded "/" cannot
        # add path components to the username.
        parts = [unquote(piece) for piece in path[len("/explore/") :].split("/", 1)]
        username = parts[0]
        if username in ("", ".", "..") or any(ch in username for ch in "/\\\x00"):
            # Not a plain directory name: never hand it to the handlers.
            self.send_html("Not Found\n", 404)
        elif len(parts) == 2:
            self.handle_file_download(username, parts[1])
        else:
            self.handle_explore_user(username)
    else:
        self.send_html("Not Found\n", 404)
def do_POST(self):
    """Route a POST request to the matching handler by URL path; unknown paths get a 404."""
    routes = {
        "/login": self.handle_login_page,
        "/upload": self.handle_upload,
        "/delete": self.handle_delete,
        "/create_account": self.handle_api_create_account,
        "/set_quota": self.handle_api_set_quota,
        "/delete_account": self.handle_api_delete_account,
    }
    handler = routes.get(urlparse(self.path).path)
    if handler is None:
        self.send_html("Not Found\n", 404)
        return
    handler()