generated from justuser-31/mrl_v1_license
609 lines
21 KiB
Python
609 lines
21 KiB
Python
import mimetypes
|
|
import os
|
|
import sqlite3
|
|
from datetime import datetime
|
|
from http.server import BaseHTTPRequestHandler
|
|
from urllib.parse import parse_qs, urlparse
|
|
|
|
import database
|
|
from config import get_storage_root, load_config
|
|
from database import (
|
|
create_session,
|
|
delete_session,
|
|
get_user_used_space,
|
|
hash_password,
|
|
init_db,
|
|
is_session_valid,
|
|
update_used_space,
|
|
)
|
|
from utils import can_upload, get_user_dir, get_user_files
|
|
|
|
global CONFIG
|
|
CONFIG = load_config()
|
|
|
|
|
|
def log(message):
|
|
if CONFIG.get("logging", {}).get("enabled", True):
|
|
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {message}")
|
|
|
|
|
|
class FileServerHandler(BaseHTTPRequestHandler):
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
|
|
def log_message(self, format, *args):
|
|
log(format % args)
|
|
|
|
def send_html(self, content, status=200):
|
|
self.send_response(status)
|
|
self.send_header("Content-type", "text/html; charset=utf-8")
|
|
self.end_headers()
|
|
self.wfile.write(content.encode("utf-8"))
|
|
|
|
def send_redirect(self, location, status=302):
|
|
self.send_response(status)
|
|
self.send_header("Location", location)
|
|
self.end_headers()
|
|
|
|
def get_session_user(self):
|
|
cookies = self.headers.get("Cookie", "")
|
|
session_id = None
|
|
for cookie in cookies.split(";"):
|
|
if cookie.strip().startswith("session_id="):
|
|
session_id = cookie.strip().split("=", 1)[1]
|
|
break
|
|
if not session_id:
|
|
return None
|
|
return is_session_valid(session_id, CONFIG["server"]["session_timeout"])
|
|
|
|
def handle_main_page(self):
|
|
content = f"""
|
|
<!DOCTYPE html>
|
|
<html>
|
|
<head>
|
|
<title>SimpliestFS</title>
|
|
<style>
|
|
body {{ font-family: sans-serif; margin: 40px; }}
|
|
.btn {{ display: inline-block; padding: 10px 20px; margin: 10px; background: #eee; border: 1px solid #ccc; text-decoration: none; color: #333; }}
|
|
.btn:hover {{ background: #ddd; }}
|
|
.disclaimer {{ margin-top: 30px; font-size: 0.9em; color: #666; }}
|
|
h1 a {{ text-decoration: none; color: inherit; }}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<h1><a href="/">SimpliestFS</a></h1>
|
|
<p>EN: {CONFIG["ui"]["disclaimer"]}</p>
|
|
<p>RU: {CONFIG["ui"]["disclaimer_ru"]}</p>
|
|
<p>Contact: <a href="mailto:{CONFIG["ui"]["contact_email"]}">{CONFIG["ui"]["contact_email"]}</a></p>
|
|
<p>Contact (prefered): <a href="https://t.me/justuser_31">@justuser_31</a></p>
|
|
<p>
|
|
<a class="btn" href="/register">Register</a>
|
|
<a class="btn" href="/login">Login</a>
|
|
<a class="btn" href="/explore">Explore Files</a>
|
|
</p>
|
|
<div class="disclaimer">
|
|
Note: NO ENCRYPTION, no external databases. As simple as possible.
|
|
</div>
|
|
</body>
|
|
</html>
|
|
"""
|
|
self.send_html(content)
|
|
|
|
def handle_register_page(self):
|
|
content = f"""
|
|
<!DOCTYPE html>
|
|
<html>
|
|
<head>
|
|
<title>Register - SimpliestFS</title>
|
|
<style>
|
|
body {{ font-family: sans-serif; margin: 40px; }}
|
|
.back-btn {{ display: inline-block; padding: 5px 10px; background: #eee; border: 1px solid #ccc; text-decoration: none; color: #333; }}
|
|
h1 a {{ text-decoration: none; color: inherit; }}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<h1><a href="/">SimpliestFS</a></h1>
|
|
<h2>Register</h2>
|
|
<p>{CONFIG["ui"]["register_info"]}</p>
|
|
<p><a class="back-btn" href="/">Back to Home</a></p>
|
|
</body>
|
|
</html>
|
|
"""
|
|
self.send_html(content)
|
|
|
|
def handle_login_page(self):
|
|
cursor = database.DB_CONN.cursor()
|
|
if self.path == "/login" and self.command == "POST":
|
|
content_length = int(self.headers.get("Content-Length", 0))
|
|
post_data = self.rfile.read(content_length).decode("utf-8")
|
|
params = parse_qs(post_data)
|
|
username = params.get("username", [""])[0]
|
|
password = params.get("password", [""])[0]
|
|
|
|
cursor.execute(
|
|
"SELECT password_hash FROM users WHERE username = ?", (username,)
|
|
)
|
|
row = cursor.fetchone()
|
|
if row and row[0] == hash_password(password):
|
|
session_id = create_session(username)
|
|
cursor.execute(
|
|
"""
|
|
INSERT OR REPLACE INTO sessions (username, session_id, created_at, last_access)
|
|
VALUES (?, ?, ?, ?)
|
|
""",
|
|
(username, session_id, datetime.now(), datetime.now()),
|
|
)
|
|
self.send_response(302)
|
|
self.send_header(
|
|
"Set-Cookie", f"session_id={session_id}; Path=/; HttpOnly"
|
|
)
|
|
self.send_header("Location", "/files")
|
|
self.end_headers()
|
|
return
|
|
else:
|
|
error = "Invalid username or password"
|
|
content = self.render_login_form(error)
|
|
self.send_html(content, 401)
|
|
return
|
|
|
|
# Check if user already authorized
|
|
username = self.get_session_user()
|
|
if username:
|
|
# Check if user really exist
|
|
cursor.execute("SELECT username FROM users WHERE username = ?", (username,))
|
|
username_row = cursor.fetchone()
|
|
if username_row:
|
|
self.send_redirect("/files")
|
|
return
|
|
|
|
content = self.render_login_form()
|
|
self.send_html(content)
|
|
|
|
def render_login_form(self, error=""):
|
|
content = f"""
|
|
<!DOCTYPE html>
|
|
<html>
|
|
<head>
|
|
<title>Login - SimpliestFS</title>
|
|
<style>
|
|
body {{ font-family: sans-serif; margin: 40px; }}
|
|
form {{ max-width: 300px; }}
|
|
input, button {{ width: 100%; padding: 8px; margin: 5px 0; }}
|
|
.error {{ color: red; }}
|
|
.back-btn {{ display: inline-block; padding: 5px 10px; background: #eee; border: 1px solid #ccc; text-decoration: none; color: #333; }}
|
|
h1 a {{ text-decoration: none; color: inherit; }}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<h1><a href="/">SimpliestFS</a></h1>
|
|
<h2>Login</h2>
|
|
{f'<p class="error">{error}</p>' if error else ""}
|
|
<form method="POST">
|
|
<input type="text" name="username" placeholder="Username" required>
|
|
<input type="password" name="password" placeholder="Password" required>
|
|
<button type="submit">Login</button>
|
|
</form>
|
|
<p><a class="back-btn" href="/">Back to Home</a></p>
|
|
</body>
|
|
</html>
|
|
"""
|
|
return content
|
|
|
|
def handle_files_page(self):
|
|
username = self.get_session_user()
|
|
if not username:
|
|
self.send_redirect("/login")
|
|
return
|
|
|
|
used_mb = get_user_used_space(username)
|
|
cursor = database.DB_CONN.cursor()
|
|
cursor.execute("SELECT quota_mb FROM users WHERE username = ?", (username,))
|
|
available_mb_row = cursor.fetchone()
|
|
# If user not authorized
|
|
if not available_mb_row:
|
|
self.send_redirect("/login")
|
|
return
|
|
available_mb = available_mb_row[0]
|
|
cursor.execute(
|
|
"UPDATE users SET used_mb = ? WHERE username = ?", (used_mb, username)
|
|
)
|
|
database.DB_CONN.commit()
|
|
|
|
files = get_user_files(username)
|
|
|
|
files_html = ""
|
|
for fname in files:
|
|
download_url = f"/explore/{username}/{fname}"
|
|
files_html += f"""
|
|
<div style="margin: 5px 0; display: flex; align-items: center;">
|
|
<span style="flex: 1;">
|
|
<a href="{download_url}">{fname}</a>
|
|
</span>
|
|
<form method="POST" action="/delete" style="display: inline;">
|
|
<input type="hidden" name="filename" value="{fname}">
|
|
<button type="submit" style="padding: 2px 8px; margin-left: 5px;">Delete</button>
|
|
</form>
|
|
</div>
|
|
"""
|
|
|
|
content = f"""
|
|
<!DOCTYPE html>
|
|
<html>
|
|
<head>
|
|
<title>My Files - SimpliestFS</title>
|
|
<style>
|
|
body {{ font-family: sans-serif; margin: 40px; }}
|
|
.upload-btn {{ display: inline-block; padding: 5px 10px; background: #eee; border: 1px solid #ccc; text-decoration: none; color: #333; }}
|
|
.back-btn {{ display: inline-block; padding: 5px 10px; background: #eee; border: 1px solid #ccc; text-decoration: none; color: #333; }}
|
|
.quota-info {{ margin: 20px 0; font-size: 0.9em; color: #666; }}
|
|
h1 a {{ text-decoration: none; color: inherit; }}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<h1><a href="/">SimpliestFS</a></h1>
|
|
<h2>My Files</h2>
|
|
<div class="quota-info">
|
|
Quota: {available_mb} MB | Used: {used_mb:.2f} MB
|
|
</div>
|
|
<form method="POST" enctype="multipart/form-data" action="/upload">
|
|
<input type="file" name="file" required>
|
|
<button type="submit">Upload</button>
|
|
</form>
|
|
<hr>
|
|
{files_html if files_html else "<p>No files yet.</p>"}
|
|
<p>
|
|
<a class="back-btn" href="/logout">Logout</a>
|
|
<a class="back-btn" href="/">Back to Home</a>
|
|
</p>
|
|
</body>
|
|
</html>
|
|
"""
|
|
self.send_html(content)
|
|
|
|
def handle_upload(self):
|
|
username = self.get_session_user()
|
|
if not username:
|
|
self.send_redirect("/login")
|
|
return
|
|
|
|
if self.command != "POST":
|
|
self.send_html("<h1>Method not allowed</h1>", 405)
|
|
return
|
|
|
|
content_type = self.headers.get("Content-Type", "")
|
|
if not content_type.startswith("multipart/form-data"):
|
|
self.send_html("<h1>Invalid content type</h1>", 400)
|
|
return
|
|
|
|
boundary = content_type.split("boundary=")[1]
|
|
content_length = int(self.headers.get("Content-Length", 0))
|
|
data = self.rfile.read(content_length)
|
|
|
|
parts = data.split(b"--" + boundary.encode())
|
|
file_data = None
|
|
filename = None
|
|
for part in parts:
|
|
if b'Content-Disposition: form-data; name="file"; filename="' in part:
|
|
header_end = part.find(b"\r\n\r\n")
|
|
if header_end == -1:
|
|
continue
|
|
headers = part[:header_end].decode("utf-8", errors="ignore")
|
|
filename_start = headers.find('filename="') + 10
|
|
filename_end = headers.find('"', filename_start)
|
|
filename = headers[filename_start:filename_end]
|
|
file_data = part[header_end + 4 : -2]
|
|
break
|
|
|
|
if not filename or not file_data:
|
|
self.send_html("<h1>File not found in request</h1>", 400)
|
|
return
|
|
|
|
file_size = len(file_data)
|
|
if not can_upload(username, file_size):
|
|
self.send_html("<h1>Quota exceeded</h1>", 403)
|
|
return
|
|
|
|
user_dir = get_user_dir(username)
|
|
filepath = os.path.join(user_dir, filename)
|
|
with open(filepath, "wb") as f:
|
|
f.write(file_data)
|
|
|
|
update_used_space(username, file_size / (1024 * 1024))
|
|
self.send_redirect("/files")
|
|
|
|
def handle_file_download(self, username, filename):
|
|
user_dir = os.path.join(get_storage_root(), username)
|
|
filepath = os.path.join(user_dir, filename)
|
|
|
|
if not os.path.exists(filepath) or not os.path.isfile(filepath):
|
|
self.send_html("<h1>File not found</h1>", 404)
|
|
return
|
|
|
|
mime_type, _ = mimetypes.guess_type(filename)
|
|
if not mime_type:
|
|
mime_type = "application/octet-stream"
|
|
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", mime_type)
|
|
self.send_header("Content-Disposition", f'attachment; filename="{filename}"')
|
|
self.end_headers()
|
|
|
|
with open(filepath, "rb") as f:
|
|
self.wfile.write(f.read())
|
|
|
|
def handle_delete(self):
|
|
username = self.get_session_user()
|
|
if not username:
|
|
self.send_redirect("/login")
|
|
return
|
|
|
|
if self.command != "POST":
|
|
self.send_html("<h1>Method not allowed</h1>", 405)
|
|
return
|
|
|
|
content_length = int(self.headers.get("Content-Length", 0))
|
|
post_data = self.rfile.read(content_length).decode("utf-8")
|
|
params = parse_qs(post_data)
|
|
filename = params.get("filename", [""])[0]
|
|
|
|
if not filename:
|
|
self.send_html("<h1>Filename required</h1>", 400)
|
|
return
|
|
|
|
user_dir = get_user_dir(username)
|
|
filepath = os.path.join(user_dir, filename)
|
|
if os.path.exists(filepath):
|
|
file_size = os.path.getsize(filepath)
|
|
os.remove(filepath)
|
|
update_used_space(username, -file_size / (1024 * 1024))
|
|
|
|
self.send_redirect("/files")
|
|
|
|
def handle_logout(self):
|
|
cookies = self.headers.get("Cookie", "")
|
|
session_id = None
|
|
for cookie in cookies.split(";"):
|
|
if cookie.strip().startswith("session_id="):
|
|
session_id = cookie.strip().split("=", 1)[1]
|
|
break
|
|
if session_id:
|
|
delete_session(session_id)
|
|
self.send_response(302)
|
|
self.send_header(
|
|
"Set-Cookie", "session_id=; Path=/; Expires=Thu, 01 Jan 1970 00:00:00 GMT"
|
|
)
|
|
self.send_header("Location", "/")
|
|
self.end_headers()
|
|
|
|
def handle_explore(self):
|
|
users = []
|
|
for item in os.listdir(get_storage_root()):
|
|
if os.path.isdir(os.path.join(get_storage_root(), item)):
|
|
users.append(item)
|
|
|
|
users_html = ""
|
|
for user in users:
|
|
users_html += f"""
|
|
<div style="margin: 5px 0;">
|
|
<a href="/explore/{user}">{user}</a>
|
|
</div>
|
|
"""
|
|
|
|
content = f"""
|
|
<!DOCTYPE html>
|
|
<html>
|
|
<head>
|
|
<title>Explore Files - SimpliestFS</title>
|
|
<style>
|
|
body {{ font-family: sans-serif; margin: 40px; }}
|
|
.back-btn {{ display: inline-block; padding: 5px 10px; background: #eee; border: 1px solid #ccc; text-decoration: none; color: #333; }}
|
|
h1 a {{ text-decoration: none; color: inherit; }}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<h1><a href="/">SimpliestFS</a></h1>
|
|
<h2>Explore Files</h2>
|
|
<p>All users:</p>
|
|
{users_html if users_html else "<p>No users yet.</p>"}
|
|
<p><a class="back-btn" href="/">Back to Home</a></p>
|
|
</body>
|
|
</html>
|
|
"""
|
|
self.send_html(content)
|
|
|
|
def handle_explore_user(self, username):
|
|
user_dir = os.path.join(get_storage_root(), username)
|
|
if not os.path.exists(user_dir):
|
|
self.send_html("<h1>User not found</h1>", 404)
|
|
return
|
|
|
|
files = []
|
|
for fname in os.listdir(user_dir):
|
|
fpath = os.path.join(user_dir, fname)
|
|
if os.path.isfile(fpath):
|
|
files.append(fname)
|
|
|
|
files_html = ""
|
|
for fname in files:
|
|
download_url = f"/explore/{username}/{fname}"
|
|
files_html += f"""
|
|
<div style="margin: 5px 0;">
|
|
<a href="{download_url}">{fname}</a>
|
|
</div>
|
|
"""
|
|
|
|
content = f"""
|
|
<!DOCTYPE html>
|
|
<html>
|
|
<head>
|
|
<title>{username}'s Files - SimpliestFS</title>
|
|
<style>
|
|
body {{ font-family: sans-serif; margin: 40px; }}
|
|
.back-btn {{ display: inline-block; padding: 5px 10px; background: #eee; border: 1px solid #ccc; text-decoration: none; color: #333; }}
|
|
h1 a {{ text-decoration: none; color: inherit; }}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<h1><a href="/">SimpliestFS</a></h1>
|
|
<h2>{username}'s Files</h2>
|
|
{files_html if files_html else "<p>No files.</p>"}
|
|
<p><a class="back-btn" href="/explore">Back to All Users</a></p>
|
|
</body>
|
|
</html>
|
|
"""
|
|
self.send_html(content)
|
|
|
|
def handle_api_create_account(self):
|
|
if self.command != "POST":
|
|
self.send_html("<h1>Method not allowed</h1>", 405)
|
|
return
|
|
|
|
content_length = int(self.headers.get("Content-Length", 0))
|
|
post_data = self.rfile.read(content_length).decode("utf-8")
|
|
params = parse_qs(post_data)
|
|
token = params.get("token", [""])[0]
|
|
username = params.get("username", [""])[0]
|
|
password = params.get("password", [""])[0]
|
|
quota_mb = params.get("drive_quota", [""])[0]
|
|
|
|
if token != CONFIG["security"]["api_token"]:
|
|
self.send_html("<h1>Invalid token</h1>", 403)
|
|
return
|
|
|
|
if not username or not password or not quota_mb.isdigit():
|
|
self.send_html("<h1>Invalid parameters</h1>", 400)
|
|
return
|
|
|
|
quota_mb = int(quota_mb)
|
|
if quota_mb <= 0:
|
|
self.send_html("<h1>Quota must be positive</h1>", 400)
|
|
return
|
|
|
|
cursor = database.DB_CONN.cursor()
|
|
try:
|
|
cursor.execute(
|
|
"INSERT INTO users (username, password_hash, quota_mb) VALUES (?, ?, ?)",
|
|
(username, hash_password(password), quota_mb),
|
|
)
|
|
database.DB_CONN.commit()
|
|
self.send_html("<h1>Account created</h1>")
|
|
except sqlite3.IntegrityError:
|
|
self.send_html("<h1>Username already exists</h1>", 409)
|
|
|
|
def handle_api_set_quota(self):
|
|
if self.command != "POST":
|
|
self.send_html("<h1>Method not allowed</h1>", 405)
|
|
return
|
|
|
|
content_length = int(self.headers.get("Content-Length", 0))
|
|
post_data = self.rfile.read(content_length).decode("utf-8")
|
|
params = parse_qs(post_data)
|
|
token = params.get("token", [""])[0]
|
|
username = params.get("username", [""])[0]
|
|
quota_mb = params.get("drive_quota", [""])[0]
|
|
|
|
if token != CONFIG["security"]["api_token"]:
|
|
self.send_html("<h1>Invalid token</h1>", 403)
|
|
return
|
|
|
|
if not username or not quota_mb.isdigit():
|
|
self.send_html("<h1>Invalid parameters</h1>", 400)
|
|
return
|
|
|
|
quota_mb = int(quota_mb)
|
|
if quota_mb <= 0:
|
|
self.send_html("<h1>Quota must be positive</h1>", 400)
|
|
return
|
|
|
|
cursor = database.DB_CONN.cursor()
|
|
cursor.execute(
|
|
"UPDATE users SET quota_mb = ? WHERE username = ?", (quota_mb, username)
|
|
)
|
|
if cursor.rowcount == 0:
|
|
self.send_html("<h1>User not found</h1>", 404)
|
|
else:
|
|
database.DB_CONN.commit()
|
|
self.send_html("<h1>Quota updated</h1>")
|
|
|
|
def handle_api_delete_account(self):
|
|
if self.command != "POST":
|
|
self.send_html("<h1>Method not allowed</h1>", 405)
|
|
return
|
|
|
|
content_length = int(self.headers.get("Content-Length", 0))
|
|
post_data = self.rfile.read(content_length).decode("utf-8")
|
|
params = parse_qs(post_data)
|
|
token = params.get("token", [""])[0]
|
|
username = params.get("username", [""])[0]
|
|
|
|
if token != CONFIG["security"]["api_token"]:
|
|
self.send_html("<h1>Invalid token</h1>", 403)
|
|
return
|
|
|
|
if not username:
|
|
self.send_html("<h1>Username required</h1>", 400)
|
|
return
|
|
|
|
cursor = database.DB_CONN.cursor()
|
|
cursor.execute("SELECT id FROM users WHERE username = ?", (username,))
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
self.send_html("<h1>User not found</h1>", 404)
|
|
return
|
|
|
|
user_dir = os.path.join(get_storage_root(), username)
|
|
if os.path.exists(user_dir):
|
|
for fname in os.listdir(user_dir):
|
|
os.remove(os.path.join(user_dir, fname))
|
|
os.rmdir(user_dir)
|
|
|
|
cursor.execute("DELETE FROM users WHERE username = ?", (username,))
|
|
database.DB_CONN.commit()
|
|
self.send_html("<h1>Account deleted</h1>")
|
|
|
|
def do_GET(self):
|
|
parsed_path = urlparse(self.path)
|
|
path = parsed_path.path
|
|
|
|
if path == "/":
|
|
self.handle_main_page()
|
|
elif path == "/register":
|
|
self.handle_register_page()
|
|
elif path == "/login":
|
|
self.handle_login_page()
|
|
elif path == "/files":
|
|
self.handle_files_page()
|
|
elif path == "/logout":
|
|
self.handle_logout()
|
|
elif path == "/explore":
|
|
self.handle_explore()
|
|
elif path.startswith("/explore/") and not path.endswith("/"):
|
|
parts = path[len("/explore/") :].split("/", 1)
|
|
if len(parts) == 2:
|
|
username, filename = parts
|
|
self.handle_file_download(username, filename)
|
|
else:
|
|
username = parts[0]
|
|
self.handle_explore_user(username)
|
|
else:
|
|
self.send_html("<h1>Not Found</h1>", 404)
|
|
|
|
def do_POST(self):
|
|
parsed_path = urlparse(self.path)
|
|
path = parsed_path.path
|
|
|
|
if path == "/login":
|
|
self.handle_login_page()
|
|
elif path == "/upload":
|
|
self.handle_upload()
|
|
elif path == "/delete":
|
|
self.handle_delete()
|
|
elif path == "/create_account":
|
|
self.handle_api_create_account()
|
|
elif path == "/set_quota":
|
|
self.handle_api_set_quota()
|
|
elif path == "/delete_account":
|
|
self.handle_api_delete_account()
|
|
else:
|
|
self.send_html("<h1>Not Found</h1>", 404)
|