import hashlib
import os
import secrets
import sqlite3
from datetime import datetime, timezone

from config import get_storage_root

# Module-wide SQLite connection; stays None until init_db() has been called.
DB_CONN = None


def init_db():
    """Open ``fileserver.db`` and create the ``users``/``sessions`` tables.

    The connection is stored in the module-level ``DB_CONN`` and is opened
    with ``check_same_thread=False``. Both tables are created only if they do
    not exist yet, so calling this against an existing database is harmless.
    """
    global DB_CONN
    DB_CONN = sqlite3.connect("fileserver.db", check_same_thread=False)
    cursor = DB_CONN.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            quota_mb INTEGER NOT NULL,
            used_mb REAL DEFAULT 0.0
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS sessions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL,
            session_id TEXT UNIQUE NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_access TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    DB_CONN.commit()


def hash_password(password):
    """Return the hex-encoded SHA-256 digest of *password*.

    NOTE(review): this is an unsalted, single-round SHA-256, which is weak
    for password storage. The output is left unchanged here because values
    already stored in ``users.password_hash`` presumably depend on it
    (verify against the callers); migrating to ``hashlib.scrypt`` or
    ``hashlib.pbkdf2_hmac`` with a per-user salt needs a data migration.
    """
    return hashlib.sha256(password.encode()).hexdigest()


def get_user_quota(username):
    """Return ``(quota_mb, used_mb)`` for *username*.

    Returns ``(None, None)`` when no row in ``users`` has that username.
    """
    cursor = DB_CONN.cursor()
    cursor.execute(
        "SELECT quota_mb, used_mb FROM users WHERE username = ?",
        (username,),
    )
    row = cursor.fetchone()
    if row:
        return row[0], row[1]
    return None, None


def update_used_space(username, delta_mb):
    """Add *delta_mb* (may be negative) to the user's ``used_mb`` and commit."""
    cursor = DB_CONN.cursor()
    cursor.execute(
        "UPDATE users SET used_mb = used_mb + ? WHERE username = ?",
        (delta_mb, username),
    )
    DB_CONN.commit()


def _utc_now():
    """Return the current time as a naive datetime expressed in UTC."""
    return datetime.now(timezone.utc).replace(tzinfo=None)


def is_session_valid(session_id, session_timeout):
    """Validate a session and refresh its ``last_access`` timestamp.

    Args:
        session_id: Value of ``sessions.session_id`` to look up.
        session_timeout: Maximum allowed idle time in seconds.

    Returns:
        The session's username when the session exists and has been idle for
        at most *session_timeout* seconds; otherwise ``None``. An expired
        session row is deleted before returning.
    """
    cursor = DB_CONN.cursor()
    cursor.execute(
        "SELECT username, last_access FROM sessions WHERE session_id = ?",
        (session_id,),
    )
    row = cursor.fetchone()
    if not row:
        return None

    username, last_access = row
    # A new row gets last_access from SQLite's CURRENT_TIMESTAMP default,
    # which is UTC. Comparing it with local time skewed the idle time by the
    # host's UTC offset, so all timestamps are handled as naive UTC instead.
    # NOTE(review): rows refreshed by the previous local-time code are read
    # as UTC once and correct themselves on their next refresh.
    now = _utc_now()
    idle_seconds = (now - datetime.fromisoformat(last_access)).total_seconds()
    if idle_seconds > session_timeout:
        cursor.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
        DB_CONN.commit()
        return None

    cursor.execute(
        "UPDATE sessions SET last_access = ? WHERE session_id = ?",
        (now.isoformat(), session_id),
    )
    DB_CONN.commit()
    return username


def create_session(username):
    """Create a session row for *username* and return the new session id.

    The id is 64 hexadecimal characters from the ``secrets`` module. The
    previous ``sha256(username + time.time())`` scheme had the same shape but
    could be guessed by anyone who knew the username and the approximate
    login time.
    """
    session_id = secrets.token_hex(32)
    cursor = DB_CONN.cursor()
    cursor.execute(
        "INSERT INTO sessions (username, session_id) VALUES (?, ?)",
        (username, session_id),
    )
    DB_CONN.commit()
    return session_id


def delete_session(session_id):
    """Delete the session row with the given *session_id*, if any, and commit."""
    cursor = DB_CONN.cursor()
    cursor.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
    DB_CONN.commit()


def get_user_used_space(username):
    """Return the total size, in MiB, of the files under the user's directory.

    The directory is ``<storage root>/<username>``. A missing directory
    yields ``0.0``, because ``os.walk`` produces nothing for a path it cannot
    list.
    """
    user_dir = os.path.join(get_storage_root(), username)
    total_bytes = 0
    for dirpath, _dirnames, filenames in os.walk(user_dir):
        for filename in filenames:
            try:
                total_bytes += os.path.getsize(os.path.join(dirpath, filename))
            except OSError:
                # The file was removed during the walk or is a dangling
                # symlink; skip it instead of failing the whole computation.
                continue
    return total_bytes / (1024 * 1024)