# simpliest_fs/database.py

# 111 lines
# 3.0 KiB
# Python

import hashlib
import os
import sqlite3
from datetime import datetime
from config import get_storage_root
# Module-level SQLite connection used by the helpers in this file.
# It is None until init_db() assigns a live connection to it.
DB_CONN = None
def init_db(db_path="fileserver.db"):
    """Open the SQLite database and create the tables if they are missing.

    The connection is stored in the module-level ``DB_CONN``. Calling this
    function again rebinds ``DB_CONN`` to a new connection; the previous
    connection is not closed here.

    Args:
        db_path: Database to open. Defaults to ``"fileserver.db"`` (the
            location that used to be hard-coded, relative to the working
            directory). Pass ``":memory:"`` for a throwaway database.
    """
    global DB_CONN
    # Disable sqlite3's same-thread check so the connection can be used from
    # threads other than the one that created it.
    DB_CONN = sqlite3.connect(db_path, check_same_thread=False)
    cursor = DB_CONN.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            quota_mb INTEGER NOT NULL,
            used_mb REAL DEFAULT 0.0
        )
    """)
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS sessions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL,
            session_id TEXT UNIQUE NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_access TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    DB_CONN.commit()
def hash_password(password):
    """Return the hex-encoded SHA-256 digest of ``password``.

    NOTE(review): this is a single, unsalted SHA-256 pass. Switching to a
    salted scheme would change every value this function returns, so the
    scheme is left as it is here.
    """
    digest = hashlib.sha256()
    digest.update(password.encode())
    return digest.hexdigest()
def get_user_quota(username):
    """Look up the quota and the recorded usage for ``username``.

    Returns:
        ``(quota_mb, used_mb)`` as stored in the ``users`` table, or
        ``(None, None)`` when no row matches ``username``.
    """
    query = "SELECT quota_mb, used_mb FROM users WHERE username = ?"
    record = DB_CONN.cursor().execute(query, (username,)).fetchone()
    if not record:
        return None, None
    quota_mb, used_mb = record
    return quota_mb, used_mb
def update_used_space(username, delta_mb):
    """Add ``delta_mb`` to the stored ``used_mb`` of ``username`` and commit.

    A negative ``delta_mb`` lowers the stored value. When no row matches
    ``username`` the statement changes nothing.
    """
    statement = "UPDATE users SET used_mb = used_mb + ? WHERE username = ?"
    params = (delta_mb, username)
    DB_CONN.cursor().execute(statement, params)
    DB_CONN.commit()
def is_session_valid(session_id, session_timeout):
    """Check a session and refresh its ``last_access`` if it is still live.

    Args:
        session_id: Value matched against ``sessions.session_id``.
        session_timeout: Longest allowed idle time, in seconds.

    Returns:
        The username stored with the session, or ``None`` when the session
        is unknown or has been idle for more than ``session_timeout``
        seconds. An expired session's row is deleted before returning.

    NOTE(review): idle time is measured with naive ``datetime.now()``
    (local time). A ``last_access`` filled in by the schema's
    CURRENT_TIMESTAMP default would be UTC -- confirm how rows are created.
    """
    db = DB_CONN.cursor()
    record = db.execute(
        "SELECT username, last_access FROM sessions WHERE session_id = ?", (session_id,)
    ).fetchone()
    if record is None:
        return None

    owner, stamp = record
    current = datetime.now()
    idle_seconds = (current - datetime.fromisoformat(stamp)).total_seconds()

    if idle_seconds <= session_timeout:
        # Still live: slide the idle window forward.
        db.execute(
            "UPDATE sessions SET last_access = ? WHERE session_id = ?",
            (current.isoformat(), session_id),
        )
        DB_CONN.commit()
        return owner

    # Expired: drop the row so the id cannot be reused.
    db.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
    DB_CONN.commit()
    return None
def create_session(username):
    """Create a session row for ``username`` and return its identifier.

    The identifier is 64 hex characters from the operating system's CSPRNG
    (``secrets.token_hex``). It used to be a SHA-256 of the username plus
    the current time, which anyone who knows the username and the
    approximate login time can reproduce, and which can collide for two
    logins in the same clock tick.

    ``last_access`` is written explicitly as a naive local-time ISO string,
    the same form ``is_session_valid`` writes and compares against. Leaving
    it to the column's CURRENT_TIMESTAMP default stored UTC instead, which
    skewed the first idle check by the local UTC offset. ``created_at``
    still uses the column default.

    Args:
        username: Name recorded as the owner of the new session.

    Returns:
        The newly generated session id.
    """
    # Function-scope import, mirroring how this function imported `time`.
    import secrets

    session_id = secrets.token_hex(32)
    cursor = DB_CONN.cursor()
    cursor.execute(
        "INSERT INTO sessions (username, session_id, last_access) VALUES (?, ?, ?)",
        (username, session_id, datetime.now().isoformat()),
    )
    DB_CONN.commit()
    return session_id
def delete_session(session_id):
    """Remove the ``sessions`` row matching ``session_id`` and commit.

    Deleting an id that is not present changes nothing.
    """
    removal = "DELETE FROM sessions WHERE session_id = ?"
    DB_CONN.cursor().execute(removal, (session_id,))
    DB_CONN.commit()
def get_user_used_space(username):
    """Measure how much disk space ``username``'s files occupy, in MiB.

    Walks ``<storage root>/<username>`` recursively and sums the sizes of
    all files found. Entries whose size cannot be read (for example a
    dangling symlink, or a file removed while the walk is in progress) are
    skipped instead of aborting the whole computation.

    Args:
        username: Name of the sub-directory under the storage root.

    Returns:
        Total size in MiB as a float; ``0.0`` when the directory does not
        exist.
    """
    user_dir = os.path.join(get_storage_root(), username)
    if not os.path.exists(user_dir):
        return 0.0

    total_bytes = 0
    for dirpath, _dirnames, filenames in os.walk(user_dir):
        for filename in filenames:
            try:
                total_bytes += os.path.getsize(os.path.join(dirpath, filename))
            except OSError:
                # Unreadable or vanished entry: leave it out of the total.
                continue
    return total_bytes / (1024 * 1024)