import hashlib
import hmac
import html
import mimetypes
import os
import secrets
import shutil
import sqlite3
import sys
import threading
import time
from datetime import datetime
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs, quote, unquote

import yaml
# Global config: parsed contents of config.yaml, populated by load_config().
CONFIG: dict = {}
# Global session tracker.
# NOTE(review): neither SESSIONS nor SESSION_LOCK is referenced anywhere else
# in the visible source; sessions are stored in the `sessions` DB table.
SESSIONS: dict = {}
SESSION_LOCK = threading.Lock()
# Global file storage root: set from CONFIG["storage"]["root_dir"] by load_config().
STORAGE_ROOT: str = ""
# Global DB connection: assigned by init_db(); None until then.
DB_CONN = None
# Load config
def load_config():
    """Load ``config.yaml`` into ``CONFIG`` and make sure the storage root exists.

    The file is read as UTF-8 regardless of the platform's default encoding.
    ``CONFIG`` and ``STORAGE_ROOT`` are only replaced once the file has been
    parsed into a mapping, so a broken file never leaves a half-initialised
    configuration behind.

    Raises:
        FileNotFoundError: ``config.yaml`` does not exist in the working directory.
        ValueError: the file is empty or its top level is not a mapping.
        KeyError: the ``storage.root_dir`` entry is missing.
    """
    global CONFIG, STORAGE_ROOT
    with open("config.yaml", "r", encoding="utf-8") as f:
        loaded = yaml.safe_load(f)
    # safe_load returns None for an empty document; fail with a clear message
    # instead of an opaque "NoneType is not subscriptable" later on.
    if not isinstance(loaded, dict):
        raise ValueError("config.yaml must contain a top-level mapping")
    CONFIG = loaded
    STORAGE_ROOT = CONFIG["storage"]["root_dir"]
    os.makedirs(STORAGE_ROOT, exist_ok=True)
# Initialize DB
def init_db():
    """Open ``fileserver.db`` and create the ``users`` and ``sessions`` tables.

    The connection is stored in the module-level ``DB_CONN`` and is opened with
    ``check_same_thread=False`` so that it can be used from more than one thread.
    Existing tables are left untouched.
    """
    global DB_CONN
    schema = (
        """
        CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT UNIQUE NOT NULL,
        password_hash TEXT NOT NULL,
        quota_mb INTEGER NOT NULL,
        used_mb REAL DEFAULT 0.0
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS sessions (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT NOT NULL,
        session_id TEXT UNIQUE NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        last_access TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
    )
    DB_CONN = sqlite3.connect("fileserver.db", check_same_thread=False)
    cur = DB_CONN.cursor()
    for ddl in schema:
        cur.execute(ddl)
    DB_CONN.commit()
# Utility: hash password
def hash_password(password):
    """Return the hexadecimal SHA-256 digest of *password* (encoded as UTF-8)."""
    digest = hashlib.sha256()
    digest.update(password.encode())
    return digest.hexdigest()
# Utility: get user quota
def get_user_quota(username):
    """Look up a user's quota and recorded usage.

    Returns:
        ``(quota_mb, used_mb)`` as stored in the ``users`` table, or
        ``(None, None)`` when no such user exists.
    """
    record = DB_CONN.execute(
        "SELECT quota_mb, used_mb FROM users WHERE username = ?", (username,)
    ).fetchone()
    if record is None:
        return None, None
    quota, used = record
    return quota, used
# Utility: update used space
def update_used_space(username, delta_mb):
    """Add *delta_mb* (may be negative) to the user's recorded usage.

    The stored value is clamped at zero: deletions are accounted for with
    floating-point megabytes, so rounding drift or a stale counter could
    otherwise leave a user with negative usage and an inflated effective quota.

    Args:
        username: account whose ``used_mb`` is adjusted.
        delta_mb: change in megabytes; positive for uploads, negative for deletes.
    """
    cursor = DB_CONN.cursor()
    cursor.execute(
        "UPDATE users SET used_mb = MAX(0.0, used_mb + ?) WHERE username = ?",
        (delta_mb, username),
    )
    DB_CONN.commit()
# Utility: check if session is valid
def is_session_valid(session_id):
    """Validate a session id and refresh its idle timer.

    Returns the owning username when the session exists and has been idle for
    no longer than ``CONFIG["server"]["session_timeout"]`` seconds; its
    ``last_access`` is then set to the current time. Unknown sessions return
    ``None``; expired sessions are deleted and also return ``None``.
    """
    cur = DB_CONN.cursor()
    cur.execute(
        "SELECT username, last_access FROM sessions WHERE session_id = ?",
        (session_id,),
    )
    record = cur.fetchone()
    if record is None:
        return None

    owner, last_seen = record
    current = datetime.now()
    limit = CONFIG["server"]["session_timeout"]
    idle_seconds = (current - datetime.fromisoformat(last_seen)).total_seconds()
    expired = idle_seconds > limit

    if expired:
        cur.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
    else:
        cur.execute(
            "UPDATE sessions SET last_access = ? WHERE session_id = ?",
            (current.isoformat(), session_id),
        )
    DB_CONN.commit()
    return None if expired else owner
# Utility: create session
def create_session(username):
    """Create and persist a new session for *username*.

    The identifier is 64 hex characters drawn from the OS CSPRNG. (It used to
    be ``sha256(username + time.time())``, which an attacker who knows the
    username and approximate login time can brute-force.)

    ``created_at`` and ``last_access`` are written explicitly as local-time
    ISO strings, the same clock and format the session validity check uses.
    Relying on the column default would store SQLite's UTC
    ``CURRENT_TIMESTAMP`` and skew the idle-time calculation by the local
    UTC offset.

    Returns:
        The new session id.
    """
    session_id = secrets.token_hex(32)
    now = datetime.now().isoformat()
    cursor = DB_CONN.cursor()
    cursor.execute(
        "INSERT INTO sessions (username, session_id, created_at, last_access) "
        "VALUES (?, ?, ?, ?)",
        (username, session_id, now, now),
    )
    DB_CONN.commit()
    return session_id
# Utility: delete session
def delete_session(session_id):
    """Remove the session row identified by *session_id*, if there is one."""
    DB_CONN.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
    DB_CONN.commit()
# Utility: get user files
def get_user_files(username):
    """Return the names of the regular files directly inside the user's directory.

    Sub-directories are skipped. A user without a storage directory yields an
    empty list. Order follows ``os.listdir``.
    """
    base = os.path.join(STORAGE_ROOT, username)
    if not os.path.exists(base):
        return []
    return [
        entry
        for entry in os.listdir(base)
        if os.path.isfile(os.path.join(base, entry))
    ]
# Utility: get total used space for user
def get_user_used_space(username):
    """Return the total size, in MB, of every file under the user's directory.

    The directory tree is walked recursively. A user without a storage
    directory uses ``0.0`` MB.
    """
    base = os.path.join(STORAGE_ROOT, username)
    if not os.path.exists(base):
        return 0.0
    size_bytes = sum(
        os.path.getsize(os.path.join(folder, name))
        for folder, _subdirs, names in os.walk(base)
        for name in names
    )
    return size_bytes / (1024 * 1024)  # MB
# Utility: check if user can upload
def can_upload(username, file_size_bytes):
    """Tell whether a file of *file_size_bytes* still fits in the user's quota.

    Returns ``False`` for unknown users; otherwise ``True`` exactly when the
    recorded usage plus the new file does not exceed the quota.
    """
    limit_mb, consumed_mb = get_user_quota(username)
    if limit_mb is not None:
        incoming_mb = file_size_bytes / (1024 * 1024)
        return (consumed_mb + incoming_mb) <= limit_mb
    return False
# Utility: get user directory
def get_user_dir(username):
    """Return the user's storage directory, creating it if it does not exist yet."""
    target = os.path.join(STORAGE_ROOT, username)
    os.makedirs(target, exist_ok=True)
    return target
# HTTP Request Handler
class FileServerHandler(BaseHTTPRequestHandler):
    """Serve the file-server UI, per-user storage and the token-protected admin API.

    GET routes render HTML pages or stream a stored file. POST routes handle
    login, upload, delete and the three account-management endpoints
    (``/create_account``, ``/set_quota``, ``/delete_account``). Sessions live in
    the ``sessions`` table and are carried in a ``session_id`` cookie.

    Names that come from the client (usernames, filenames, URL components) are
    HTML-escaped before being rendered, and every filesystem access goes
    through ``_resolve_user_path`` so a request can never address anything
    outside ``STORAGE_ROOT``.
    """

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------
    def log_message(self, format, *args):
        """Silence the default per-request logging to stderr."""

    def send_html(self, content, status=200):
        """Send *content* as a complete UTF-8 HTML response with *status*."""
        body = content.encode("utf-8")
        self.send_response(status)
        self.send_header("Content-type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def send_redirect(self, location, status=302):
        """Send an empty redirect response pointing at *location*."""
        self.send_response(status)
        self.send_header("Location", location)
        self.end_headers()

    @staticmethod
    def _page(title, body):
        """Wrap *body* (already-safe HTML) in a minimal HTML document."""
        return (
            "<!DOCTYPE html>\n"
            "<html>\n"
            f'<head><meta charset="utf-8"><title>{title}</title></head>\n'
            f"<body>\n{body}\n</body>\n"
            "</html>\n"
        )

    def _content_length(self):
        """Return the request's Content-Length, or 0 when absent or malformed."""
        try:
            return max(int(self.headers.get("Content-Length", 0)), 0)
        except ValueError:
            return 0

    def _read_form(self):
        """Read the request body and parse it as a URL-encoded form."""
        raw = self.rfile.read(self._content_length()).decode("utf-8", errors="replace")
        return parse_qs(raw)

    @staticmethod
    def _field(params, name):
        """Return the first value of form field *name*, or ``""`` if missing."""
        return params.get(name, [""])[0]

    def _get_session_id(self):
        """Return the value of the ``session_id`` cookie, or ``None``."""
        for cookie in self.headers.get("Cookie", "").split(";"):
            key, _, value = cookie.strip().partition("=")
            if key == "session_id":
                return value or None
        return None

    @staticmethod
    def _token_ok(token):
        """Compare *token* with the configured API token in constant time.

        An unset, empty or non-string configured token never matches, so a
        missing configuration value cannot be satisfied by an empty form field.
        """
        expected = CONFIG["security"]["api_token"]
        if not isinstance(expected, str) or not expected:
            return False
        return hmac.compare_digest(token.encode("utf-8"), expected.encode("utf-8"))

    @staticmethod
    def _parse_quota(raw):
        """Return *raw* as an int, or ``None`` when it is not a decimal number."""
        return int(raw) if raw.isdecimal() else None

    @staticmethod
    def _is_valid_username(username):
        """Reject names that are empty or could act as a path (``..``, separators)."""
        if not username or username in (".", ".."):
            return False
        return not any(ch in username for ch in ("/", "\\", "\x00"))

    @staticmethod
    def _sanitize_filename(filename):
        """Reduce a client-supplied filename to its final path component.

        Returns ``None`` when nothing usable remains (empty, ``.`` or ``..``).
        """
        name = os.path.basename((filename or "").replace("\\", "/")).strip()
        if name in ("", ".", "..") or "\x00" in name:
            return None
        return name

    @staticmethod
    def _resolve_user_path(username, filename=None):
        """Map *username* (and optionally *filename*) to a path inside STORAGE_ROOT.

        Symlinks are resolved before the containment check. Returns the
        absolute path, or ``None`` when the result would fall outside the
        user's own directory.
        """
        if not FileServerHandler._is_valid_username(username):
            return None
        try:
            root = os.path.realpath(STORAGE_ROOT)
            user_dir = os.path.realpath(os.path.join(root, username))
            if user_dir == root or os.path.commonpath([root, user_dir]) != root:
                return None
            if filename is None:
                return user_dir
            target = os.path.realpath(os.path.join(user_dir, filename))
            if target == user_dir or os.path.commonpath([user_dir, target]) != user_dir:
                return None
        except ValueError:
            # Raised for embedded NULs or paths on different drives.
            return None
        return target

    @staticmethod
    def _multipart_boundary(content_type):
        """Extract the boundary parameter from a multipart Content-Type header."""
        for param in content_type.split(";")[1:]:
            key, _, value = param.strip().partition("=")
            if key.lower() == "boundary":
                return value.strip('"') or None
        return None

    @staticmethod
    def _extract_upload(data, boundary):
        """Return ``(filename, payload)`` of the ``file`` part, or ``(None, None)``."""
        marker = b'Content-Disposition: form-data; name="file"; filename="'
        for part in data.split(b"--" + boundary.encode()):
            if marker not in part:
                continue
            header_end = part.find(b"\r\n\r\n")
            if header_end == -1:
                continue
            headers = part[:header_end].decode("utf-8", errors="ignore")
            start = headers.find('filename="') + len('filename="')
            end = headers.find('"', start)
            if end == -1:
                continue
            # The payload is followed by the CRLF that precedes the next boundary.
            return headers[start:end], part[header_end + 4:-2]
        return None, None

    def get_session_user(self):
        """Return the username of the current valid session, or ``None``."""
        session_id = self._get_session_id()
        if not session_id:
            return None
        return is_session_valid(session_id)

    # ------------------------------------------------------------------
    # HTML pages
    # ------------------------------------------------------------------
    def handle_main_page(self):
        """Render the landing page with links to the other pages."""
        ui = CONFIG["ui"]
        body = (
            f"<h1>{ui['title']}</h1>\n"
            f"<p>{ui['disclaimer']}</p>\n"
            f"<p>Contact: {ui['contact_email']}</p>\n"
            '<p><a href="/register">Register</a></p>\n'
            '<p><a href="/login">Login</a></p>\n'
            '<p><a href="/explore">Explore Files</a></p>\n'
            "<p>Note: All files are stored locally. No encryption. No external databases.</p>"
        )
        self.send_html(self._page(ui["title"], body))

    def handle_register_page(self):
        """Render the static registration information page."""
        body = (
            "<h1>Register</h1>\n"
            "<p>Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.</p>\n"
            '<p><a href="/">Back to Home</a></p>'
        )
        self.send_html(self._page(f"Register - {CONFIG['ui']['title']}", body))

    def handle_login_page(self):
        """Show the login form (GET) or authenticate and start a session (POST)."""
        if self.command != "POST":
            self.send_html(self.render_login_form())
            return

        params = self._read_form()
        username = self._field(params, "username")
        password = self._field(params, "password")

        cursor = DB_CONN.cursor()
        cursor.execute("SELECT password_hash FROM users WHERE username = ?", (username,))
        row = cursor.fetchone()
        authenticated = row is not None and hmac.compare_digest(
            str(row[0]).encode("utf-8"), hash_password(password).encode("utf-8")
        )
        if not authenticated:
            self.send_html(self.render_login_form("Invalid username or password"), 401)
            return

        session_id = create_session(username)
        # Stamp the session with local-time ISO strings, the format the
        # validity check parses, and commit so the row is actually persisted.
        now = datetime.now().isoformat()
        cursor.execute(
            "INSERT OR REPLACE INTO sessions (username, session_id, created_at, last_access) "
            "VALUES (?, ?, ?, ?)",
            (username, session_id, now, now),
        )
        DB_CONN.commit()

        self.send_response(302)
        self.send_header(
            "Set-Cookie", f"session_id={session_id}; Path=/; HttpOnly; SameSite=Lax"
        )
        self.send_header("Location", "/files")
        self.end_headers()

    def render_login_form(self, error=""):
        """Return the login page HTML, with *error* shown above the form if given."""
        error_html = f"<p>{html.escape(error)}</p>\n" if error else ""
        body = (
            "<h1>Login</h1>\n"
            f"{error_html}"
            '<form method="post" action="/login">\n'
            '<p><label>Username: <input type="text" name="username"></label></p>\n'
            '<p><label>Password: <input type="password" name="password"></label></p>\n'
            '<p><button type="submit">Login</button></p>\n'
            "</form>\n"
            '<p><a href="/">Back to Home</a></p>'
        )
        return self._page(f"Login - {CONFIG['ui']['title']}", body)

    def handle_files_page(self):
        """Render the logged-in user's file list with upload and delete controls."""
        username = self.get_session_user()
        if not username:
            self.send_redirect("/login")
            return

        # Re-sync the stored usage with what is actually on disk.
        used_mb = get_user_used_space(username)
        cursor = DB_CONN.cursor()
        cursor.execute("UPDATE users SET used_mb = ? WHERE username = ?", (used_mb, username))
        DB_CONN.commit()

        # Show the user's own quota; fall back to the configured default only
        # when the account row is missing.
        quota_mb, _ = get_user_quota(username)
        if quota_mb is None:
            quota_mb = CONFIG["storage"]["default_quota_mb"]

        items = []
        for fname in sorted(get_user_files(username)):
            download_url = f"/explore/{quote(username)}/{quote(fname)}"
            safe_name = html.escape(fname)
            items.append(
                f'<li><a href="{download_url}">{safe_name}</a> '
                '<form method="post" action="/delete" style="display:inline">'
                f'<input type="hidden" name="filename" value="{safe_name}">'
                '<button type="submit">Delete</button></form></li>'
            )
        if items:
            files_html = "<ul>\n" + "\n".join(items) + "\n</ul>"
        else:
            files_html = "<p>No files yet.</p>"

        body = (
            "<h1>My Files</h1>\n"
            f"<p>Quota: {quota_mb} MB | Used: {used_mb:.2f} MB</p>\n"
            '<form method="post" action="/upload" enctype="multipart/form-data">\n'
            '<input type="file" name="file"> <button type="submit">Upload</button>\n'
            "</form>\n"
            f"{files_html}\n"
            '<p><a href="/logout">Logout</a></p>'
        )
        self.send_html(self._page(f"My Files - {CONFIG['ui']['title']}", body))

    # ------------------------------------------------------------------
    # File operations
    # ------------------------------------------------------------------
    def handle_upload(self):
        """Store the uploaded ``file`` part in the logged-in user's directory."""
        username = self.get_session_user()
        if not username:
            self.send_redirect("/login")
            return
        if self.command != "POST":
            self.send_html("<p>Method not allowed</p>", 405)
            return

        content_type = self.headers.get("Content-Type", "")
        boundary = None
        if content_type.startswith("multipart/form-data"):
            boundary = self._multipart_boundary(content_type)
        if not boundary:
            self.send_html("<p>Invalid content type</p>", 400)
            return

        data = self.rfile.read(self._content_length())
        raw_name, file_data = self._extract_upload(data, boundary)
        # Keep only the final path component so "../x" or "/etc/x" cannot
        # escape the user's directory.
        filename = self._sanitize_filename(raw_name)
        filepath = self._resolve_user_path(username, filename) if filename else None
        if not filepath or not file_data:
            self.send_html("<p>File not found in request</p>", 400)
            return

        # When replacing an existing file only the size difference counts.
        previous_size = os.path.getsize(filepath) if os.path.isfile(filepath) else 0
        delta_bytes = len(file_data) - previous_size
        if not can_upload(username, max(delta_bytes, 0)):
            self.send_html("<p>Quota exceeded</p>", 403)
            return

        get_user_dir(username)
        with open(filepath, "wb") as f:
            f.write(file_data)
        update_used_space(username, delta_bytes / (1024 * 1024))
        self.send_redirect("/files")

    def handle_file_download(self, username, filename):
        """Stream ``<username>/<filename>`` to the client as an attachment."""
        filepath = self._resolve_user_path(username, filename)
        if filepath is None or not os.path.isfile(filepath):
            self.send_html("<p>File not found</p>", 404)
            return

        mime_type, _ = mimetypes.guess_type(filename)
        if not mime_type:
            mime_type = "application/octet-stream"

        # Header values must be latin-1 and must not contain quotes or line
        # breaks; degrade unusual names instead of failing the request.
        download_name = os.path.basename(filepath)
        for forbidden in ('"', "\r", "\n"):
            download_name = download_name.replace(forbidden, "")
        download_name = download_name.encode("latin-1", "replace").decode("latin-1")

        self.send_response(200)
        self.send_header("Content-Type", mime_type)
        self.send_header("Content-Length", str(os.path.getsize(filepath)))
        self.send_header("Content-Disposition", f'attachment; filename="{download_name}"')
        self.end_headers()
        with open(filepath, "rb") as f:
            # Copy in chunks rather than loading the whole file into memory.
            shutil.copyfileobj(f, self.wfile)

    def handle_delete(self):
        """Delete one of the logged-in user's files named by the ``filename`` field."""
        username = self.get_session_user()
        if not username:
            self.send_redirect("/login")
            return
        if self.command != "POST":
            self.send_html("<p>Method not allowed</p>", 405)
            return

        filename = self._sanitize_filename(self._field(self._read_form(), "filename"))
        if not filename:
            self.send_html("<p>Filename required</p>", 400)
            return

        filepath = self._resolve_user_path(username, filename)
        if filepath and os.path.isfile(filepath):
            file_size = os.path.getsize(filepath)
            os.remove(filepath)
            update_used_space(username, -file_size / (1024 * 1024))
        self.send_redirect("/files")

    def handle_logout(self):
        """End the current session, clear the cookie and go back to the home page."""
        session_id = self._get_session_id()
        if session_id:
            delete_session(session_id)
        self.send_response(302)
        self.send_header(
            "Set-Cookie", "session_id=; Path=/; Expires=Thu, 01 Jan 1970 00:00:00 GMT"
        )
        self.send_header("Location", "/")
        self.end_headers()

    # ------------------------------------------------------------------
    # Public browsing
    # ------------------------------------------------------------------
    def handle_explore(self):
        """List every user directory found under the storage root."""
        users = sorted(
            item
            for item in os.listdir(STORAGE_ROOT)
            if os.path.isdir(os.path.join(STORAGE_ROOT, item))
        )
        if users:
            rows = "\n".join(
                f'<li><a href="/explore/{quote(user)}">{html.escape(user)}</a></li>'
                for user in users
            )
            users_html = f"<ul>\n{rows}\n</ul>"
        else:
            users_html = "<p>No users yet.</p>"

        body = (
            "<h1>Explore Files</h1>\n"
            "<p>All users:</p>\n"
            f"{users_html}\n"
            '<p><a href="/">Back to Home</a></p>'
        )
        self.send_html(self._page(f"Explore Files - {CONFIG['ui']['title']}", body))

    def handle_explore_user(self, username):
        """List the files stored directly in *username*'s directory."""
        user_dir = self._resolve_user_path(username)
        if user_dir is None or not os.path.isdir(user_dir):
            self.send_html("<p>User not found</p>", 404)
            return

        files = sorted(
            fname
            for fname in os.listdir(user_dir)
            if os.path.isfile(os.path.join(user_dir, fname))
        )
        if files:
            rows = "\n".join(
                f'<li><a href="/explore/{quote(username)}/{quote(fname)}">'
                f"{html.escape(fname)}</a></li>"
                for fname in files
            )
            files_html = f"<ul>\n{rows}\n</ul>"
        else:
            files_html = "<p>No files.</p>"

        safe_user = html.escape(username)
        body = (
            f"<h1>{safe_user}'s Files</h1>\n"
            f"{files_html}\n"
            '<p><a href="/explore">Back to All Users</a></p>'
        )
        self.send_html(
            self._page(f"{safe_user}'s Files - {CONFIG['ui']['title']}", body)
        )

    # ------------------------------------------------------------------
    # Token-protected admin API
    # ------------------------------------------------------------------
    def handle_api_create_account(self):
        """Create a user from ``token``, ``username``, ``password``, ``drive_quota``."""
        if self.command != "POST":
            self.send_html("<p>Method not allowed</p>", 405)
            return
        params = self._read_form()
        username = self._field(params, "username")
        password = self._field(params, "password")
        quota_mb = self._parse_quota(self._field(params, "drive_quota"))

        if not self._token_ok(self._field(params, "token")):
            self.send_html("<p>Invalid token</p>", 403)
            return
        # The username becomes a directory name, so path-like names are refused.
        if not self._is_valid_username(username) or not password or quota_mb is None:
            self.send_html("<p>Invalid parameters</p>", 400)
            return
        if quota_mb <= 0:
            self.send_html("<p>Quota must be positive</p>", 400)
            return

        cursor = DB_CONN.cursor()
        try:
            cursor.execute(
                "INSERT INTO users (username, password_hash, quota_mb) VALUES (?, ?, ?)",
                (username, hash_password(password), quota_mb),
            )
        except sqlite3.IntegrityError:
            self.send_html("<p>Username already exists</p>", 409)
            return
        DB_CONN.commit()
        self.send_html("<p>Account created</p>")

    def handle_api_set_quota(self):
        """Change a user's quota from ``token``, ``username``, ``drive_quota``."""
        if self.command != "POST":
            self.send_html("<p>Method not allowed</p>", 405)
            return
        params = self._read_form()
        username = self._field(params, "username")
        quota_mb = self._parse_quota(self._field(params, "drive_quota"))

        if not self._token_ok(self._field(params, "token")):
            self.send_html("<p>Invalid token</p>", 403)
            return
        if not username or quota_mb is None:
            self.send_html("<p>Invalid parameters</p>", 400)
            return
        if quota_mb <= 0:
            self.send_html("<p>Quota must be positive</p>", 400)
            return

        cursor = DB_CONN.cursor()
        cursor.execute("UPDATE users SET quota_mb = ? WHERE username = ?", (quota_mb, username))
        if cursor.rowcount == 0:
            self.send_html("<p>User not found</p>", 404)
            return
        DB_CONN.commit()
        self.send_html("<p>Quota updated</p>")

    def handle_api_delete_account(self):
        """Delete a user, their stored files and their sessions."""
        if self.command != "POST":
            self.send_html("<p>Method not allowed</p>", 405)
            return
        params = self._read_form()
        username = self._field(params, "username")

        if not self._token_ok(self._field(params, "token")):
            self.send_html("<p>Invalid token</p>", 403)
            return
        if not username:
            self.send_html("<p>Username required</p>", 400)
            return

        cursor = DB_CONN.cursor()
        cursor.execute("SELECT id FROM users WHERE username = ?", (username,))
        if not cursor.fetchone():
            self.send_html("<p>User not found</p>", 404)
            return

        # Only remove a directory that provably lives inside the storage root;
        # rmtree also copes with sub-directories, which os.remove did not.
        user_dir = self._resolve_user_path(username)
        if user_dir and os.path.isdir(user_dir):
            shutil.rmtree(user_dir)

        # Drop the user's sessions too, so a deleted account cannot keep
        # browsing with a still-valid cookie.
        cursor.execute("DELETE FROM sessions WHERE username = ?", (username,))
        cursor.execute("DELETE FROM users WHERE username = ?", (username,))
        DB_CONN.commit()
        self.send_html("<p>Account deleted</p>")

    # ------------------------------------------------------------------
    # Routing
    # ------------------------------------------------------------------
    def do_GET(self):
        """Dispatch GET requests to the matching page or download handler."""
        path = urlparse(self.path).path
        routes = {
            "/": self.handle_main_page,
            "/register": self.handle_register_page,
            "/login": self.handle_login_page,
            "/files": self.handle_files_page,
            "/logout": self.handle_logout,
            "/explore": self.handle_explore,
        }
        handler = routes.get(path)
        if handler is not None:
            handler()
            return

        if path.startswith("/explore/") and not path.endswith("/"):
            # /explore/<user> lists a directory, /explore/<user>/<file> downloads.
            # Components are percent-decoded after splitting so an encoded
            # slash cannot change which part is the username.
            parts = path[len("/explore/"):].split("/", 1)
            username = unquote(parts[0])
            if len(parts) == 2:
                self.handle_file_download(username, unquote(parts[1]))
            else:
                self.handle_explore_user(username)
            return

        self.send_html("<p>Not Found</p>", 404)

    def do_POST(self):
        """Dispatch POST requests to the matching form or API handler."""
        path = urlparse(self.path).path
        routes = {
            "/login": self.handle_login_page,
            "/upload": self.handle_upload,
            "/delete": self.handle_delete,
            "/create_account": self.handle_api_create_account,
            "/set_quota": self.handle_api_set_quota,
            "/delete_account": self.handle_api_delete_account,
        }
        handler = routes.get(path)
        if handler is None:
            self.send_html("<p>Not Found</p>", 404)
            return
        handler()
# Session cleanup thread
def _purge_expired_sessions():
    """Delete every session idle for longer than the configured timeout.

    A row whose ``last_access`` cannot be parsed is removed as well, since such
    a session can never pass validation.

    Returns:
        The number of sessions removed.
    """
    now = datetime.now()
    session_timeout = CONFIG["server"]["session_timeout"]
    cursor = DB_CONN.cursor()
    cursor.execute("SELECT session_id, last_access FROM sessions")
    expired = []
    for session_id, last_access in cursor.fetchall():
        try:
            idle_seconds = (now - datetime.fromisoformat(last_access)).total_seconds()
        except (TypeError, ValueError):
            expired.append((session_id,))
            continue
        if idle_seconds > session_timeout:
            expired.append((session_id,))
    if expired:
        cursor.executemany("DELETE FROM sessions WHERE session_id = ?", expired)
        DB_CONN.commit()
    return len(expired)


def session_cleanup():
    """Run forever, purging expired sessions once a minute.

    Intended as the target of a daemon thread. A database error during one
    sweep is reported on stderr and the loop carries on. Previously the first
    such error ended the thread, and with it all further session expiry.
    """
    while True:
        time.sleep(60)  # Check every minute
        try:
            _purge_expired_sessions()
        except sqlite3.Error as exc:
            print(f"session cleanup failed: {exc}", file=sys.stderr)
def main():
    """Load the configuration, open the database and serve until interrupted.

    Starts the session-cleanup daemon thread, then blocks in
    ``serve_forever``. Ctrl-C stops the server quietly, and the listening
    socket is always released on the way out.
    """
    load_config()
    init_db()
    # Start session cleanup thread
    cleanup_thread = threading.Thread(target=session_cleanup, daemon=True)
    cleanup_thread.start()
    # Start server
    server_address = (CONFIG["server"]["host"], CONFIG["server"]["port"])
    httpd = HTTPServer(server_address, FileServerHandler)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # Normal way to stop a foreground server; no traceback needed.
        pass
    finally:
        httpd.server_close()


if __name__ == "__main__":
    main()