from flask import Flask, request, redirect, session, Response, jsonify
import json, os, random, time, uuid, re, html, base64, hashlib, hmac, struct
from datetime import datetime
from functools import wraps
from urllib.parse import quote
app = Flask(__name__)
app.secret_key = "bank_secret_key_2025"
app.config['SESSION_PERMANENT'] = True
app.config['PERMANENT_SESSION_LIFETIME'] = 3600
USERS_FILE = "users.json"
HISTORY_FILE = "history.json"
SHOP_FILE = "shop.json"
PAYMENTS_FILE = "payments.json" # Новый файл для платежей
JOURNAL_FILE = "journal.json"
CLASSES_FILE = "classes.json"
SUBJECTS_FILE = "subjects.json"
PARENT_LINKS_FILE = "parent_links.json"
CRYPTO_FILE = "crypto.json"
SERKRIPTO_NAME = "SerKripto"
SERKRIPTO_SYMBOL = "SERK"
ROLE_ADMIN = "admin"
ROLE_MANAGER = "manager"
ROLE_TEACHER = "teacher"
ROLE_PARENT = "parent"
ROLE_STUDENT = "student"
ROLE_CLIENT = "client"
VALID_ROLES = {
ROLE_ADMIN,
ROLE_MANAGER,
ROLE_TEACHER,
ROLE_PARENT,
ROLE_STUDENT,
ROLE_CLIENT
}
FINANCE_ROLES = (ROLE_ADMIN, ROLE_MANAGER, ROLE_TEACHER, ROLE_PARENT, ROLE_STUDENT, ROLE_CLIENT)
CREDIT_ROLES = (ROLE_ADMIN, ROLE_MANAGER, ROLE_TEACHER, ROLE_PARENT, ROLE_CLIENT)
CRYPTO_ROLES = (ROLE_ADMIN, ROLE_MANAGER, ROLE_TEACHER, ROLE_PARENT, ROLE_CLIENT)
SHOP_SELL_ROLES = (ROLE_ADMIN, ROLE_MANAGER, ROLE_TEACHER, ROLE_PARENT, ROLE_CLIENT)
JOURNAL_ROLES = (ROLE_ADMIN, ROLE_TEACHER)
TOTP_DIGITS = 6
TOTP_PERIOD_SEC = 30
TOTP_DRIFT_WINDOWS = 1
PENDING_2FA_TTL_SEC = 300
def normalize_role(role_value):
role = str(role_value or ROLE_CLIENT).strip().lower()
if role not in VALID_ROLES:
return ROLE_CLIENT
return role
def parse_bool(value):
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return value != 0
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "on"}
return False
def normalize_totp_secret(secret_value):
cleaned = re.sub(r"[^A-Z2-7]", "", str(secret_value or "").upper())
return cleaned
def generate_totp_secret(length=32):
alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"
return "".join(random.choice(alphabet) for _ in range(max(16, int(length))))
def generate_totp_code(secret, for_timestamp=None):
normalized_secret = normalize_totp_secret(secret)
if not normalized_secret:
return None
pad_len = (8 - (len(normalized_secret) % 8)) % 8
padded = normalized_secret + ("=" * pad_len)
try:
key = base64.b32decode(padded, casefold=True)
except Exception:
return None
timestamp = int(time.time()) if for_timestamp is None else int(for_timestamp)
counter = int(timestamp // TOTP_PERIOD_SEC)
counter_bytes = struct.pack(">Q", counter)
digest = hmac.new(key, counter_bytes, hashlib.sha1).digest()
offset = digest[-1] & 0x0F
binary = (
((digest[offset] & 0x7F) << 24)
| ((digest[offset + 1] & 0xFF) << 16)
| ((digest[offset + 2] & 0xFF) << 8)
| (digest[offset + 3] & 0xFF)
)
return str(binary % (10 ** TOTP_DIGITS)).zfill(TOTP_DIGITS)
def verify_totp_code(secret, raw_code):
code = re.sub(r"\D", "", str(raw_code or ""))
if len(code) != TOTP_DIGITS:
return False
now = int(time.time())
for offset in range(-TOTP_DRIFT_WINDOWS, TOTP_DRIFT_WINDOWS + 1):
check_time = now + (offset * TOTP_PERIOD_SEC)
expected = generate_totp_code(secret, check_time)
if expected and code == expected:
return True
return False
def get_totp_uri(username, secret):
issuer = quote("Bank School")
account = quote(str(username or "user"))
return f"otpauth://totp/{issuer}:{account}?secret={secret}&issuer={issuer}&digits={TOTP_DIGITS}&period={TOTP_PERIOD_SEC}"
def clear_pending_2fa_session():
session.pop("pending_2fa_user", None)
session.pop("pending_2fa_started", None)
def is_pending_2fa_alive():
started = int(session.get("pending_2fa_started", 0) or 0)
if started <= 0:
return False
return (int(time.time()) - started) <= PENDING_2FA_TTL_SEC
def get_user_role(user_data):
if not isinstance(user_data, dict):
return ROLE_CLIENT
return normalize_role(user_data.get("role", ROLE_CLIENT))
def has_role(user_data, allowed_roles):
role = get_user_role(user_data)
normalized_allowed = {normalize_role(r) for r in allowed_roles}
return role in normalized_allowed
def default_admin_user():
return {
"password": "CreateSergeyPass",
"balance": 0,
"deposit": 0,
"credit": 0,
"serkripto": 0.0,
"role": "admin",
"full_name": "Администратор Системы",
"email": "kompania.bank@gmail.com",
"api_key": "Sergey_API_KEY_1",
"twofa_enabled": False,
"twofa_secret": "",
}
# ---------- ИНИЦИАЛИЗАЦИЯ ФАЙЛОВ ----------
def init_files():
"""Инициализация файлов (без удаления существующих)"""
# Создаем файлы если они не существуют
files_to_init = [
(USERS_FILE, {"Sergey": default_admin_user()}),
(HISTORY_FILE, []),
(SHOP_FILE, {"items": [], "ads": []}),
(PAYMENTS_FILE, []), # Инициализация файла платежей
(JOURNAL_FILE, {}),
(CLASSES_FILE, {}),
(SUBJECTS_FILE, []),
(PARENT_LINKS_FILE, {}),
(CRYPTO_FILE, {
"name": SERKRIPTO_NAME,
"symbol": SERKRIPTO_SYMBOL,
"price": 100.0,
"min_price": 1.0,
"max_price": 1000000.0,
"price_step_buy": 0.012,
"price_step_sell": 0.01,
"idle_decay_rate": 0.0015,
"idle_decay_interval_sec": 30,
"last_buy_at": int(time.time()),
"last_update": int(time.time()),
"stats": {
"buy_ops": 0,
"sell_ops": 0,
"buy_volume": 0.0,
"sell_volume": 0.0
}
})
]
for filename, default_data in files_to_init:
if not os.path.exists(filename):
with open(filename, "w", encoding='utf-8') as f:
json.dump(default_data, f, indent=4, ensure_ascii=False)
# Инициализируем файлы при запуске
init_files()
# Совместимые no-op хуки уведомлений
def ws_emit_system(event_name, payload=None):
return None
def ws_emit_user(username, event_name, payload=None):
return None
def ws_emit_balance_update(username):
return None
# ---------- ФУНКЦИИ ДЛЯ РАБОТЫ С ДАННЫМИ ----------
def load_users():
try:
with open(USERS_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
# Убеждаемся, что данные в правильном формате
if not isinstance(data, dict):
print("Warning: users.json is not a dictionary, resetting to default")
data = {"admin": default_admin_user()}
save_users(data)
return data
# Проверяем, что все значения являются словарями
cleaned_data = {}
for username, user_data in data.items():
if isinstance(user_data, dict):
user_data["role"] = get_user_role(user_data)
try:
user_data["serkripto"] = round(float(user_data.get("serkripto", 0.0)), 6)
except:
user_data["serkripto"] = 0.0
user_data["twofa_secret"] = normalize_totp_secret(user_data.get("twofa_secret", ""))
user_data["twofa_enabled"] = parse_bool(user_data.get("twofa_enabled", False))
if not user_data["twofa_secret"]:
user_data["twofa_enabled"] = False
cleaned_data[username] = user_data
else:
print(f"Warning: User {username} data is not a dict, resetting")
if username == "admin":
cleaned_data[username] = default_admin_user()
else:
cleaned_data[username] = {
"password": "reset123",
"balance": 1000,
"deposit": 0,
"credit": 0,
"serkripto": 0.0,
"role": "client",
"full_name": "Пользователь",
"email": "user@example.com",
"api_key": None,
"twofa_enabled": False,
"twofa_secret": "",
}
# Гарантируем наличие рабочего admin даже после ручной порчи users.json
admin_defaults = default_admin_user()
admin_user = cleaned_data.get("admin")
if not isinstance(admin_user, dict):
cleaned_data["admin"] = admin_defaults
else:
for key, value in admin_defaults.items():
admin_user.setdefault(key, value)
admin_user["role"] = ROLE_ADMIN
if not admin_user.get("password"):
admin_user["password"] = admin_defaults["password"]
if cleaned_data != data:
save_users(cleaned_data)
return cleaned_data
except Exception as e:
print(f"Error loading users: {e}")
data = {"admin": default_admin_user()}
save_users(data)
return data
def save_users(data):
with open(USERS_FILE, "w", encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
def load_history():
try:
with open(HISTORY_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
except:
return []
def save_history(data):
with open(HISTORY_FILE, "w", encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
# ---------- ЭЛЕКТРОННЫЙ ДНЕВНИК ----------
def load_journal():
try:
with open(JOURNAL_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
return data if isinstance(data, dict) else {}
except:
return {}
def save_journal(data):
with open(JOURNAL_FILE, "w", encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
def load_classes():
try:
with open(CLASSES_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
return data if isinstance(data, dict) else {}
except:
return {}
def save_classes(data):
with open(CLASSES_FILE, "w", encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
def load_subjects():
try:
with open(SUBJECTS_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
return data if isinstance(data, list) else []
except:
return []
def save_subjects(data):
with open(SUBJECTS_FILE, "w", encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
def load_parent_links():
try:
with open(PARENT_LINKS_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
return data if isinstance(data, dict) else {}
except:
return {}
def save_parent_links(data):
with open(PARENT_LINKS_FILE, "w", encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
def get_parent_children(parent_username):
links = load_parent_links()
children = links.get(parent_username, [])
if not isinstance(children, list):
return []
unique_children = []
for child in children:
child_name = str(child).strip()
if child_name and child_name not in unique_children:
unique_children.append(child_name)
return unique_children
def calculate_money_for_grade(grade):
grade = int(grade)
mapping = {
5: 20,
4: 10,
3: -10,
2: -20
}
return mapping.get(grade, 0)
# ---------- ФУНКЦИИ ДЛЯ ПЛАТЕЖЕЙ ----------
def load_payments():
"""Загрузка платежей"""
try:
with open(PAYMENTS_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
except:
return []
def save_payments(data):
"""Сохранение платежей"""
with open(PAYMENTS_FILE, "w", encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
def get_payment_by_id(payment_id):
payments = load_payments()
for payment in payments:
if payment.get("id") == payment_id:
return payment
return None
def sanitize_payment_output(payment):
if not isinstance(payment, dict):
return payment
safe_payment = payment.copy()
balances = safe_payment.get("balances")
if isinstance(balances, dict):
safe_payment["balances"] = {
"payer_before": balances.get("payer_before"),
"payer_after": balances.get("payer_after")
}
return safe_payment
def ws_emit_payment_event(event_name, payment, extra_payload=None):
if not isinstance(payment, dict):
return
payload = {
"id": payment.get("id"),
"status": payment.get("status"),
"payer": payment.get("payer"),
"receiver": payment.get("receiver"),
"amount": payment.get("amount"),
"allow_any_payer": bool(payment.get("allow_any_payer", False)),
"created_by": payment.get("created_by")
}
if isinstance(extra_payload, dict):
payload.update(extra_payload)
ws_emit_system(event_name, payload)
notified_users = set()
for username in [payment.get("payer"), payment.get("receiver"), payment.get("created_by")]:
username = str(username or "").strip()
if not username or username == "*" or username in notified_users:
continue
ws_emit_user(username, event_name, payload)
notified_users.add(username)
def ws_emit_crypto_market_event(market, reason="update"):
if not isinstance(market, dict):
return
payload = {
"reason": reason,
"coin": market.get("name", SERKRIPTO_NAME),
"symbol": market.get("symbol", SERKRIPTO_SYMBOL),
"price_rub": round(float(market.get("price", 0.0)), 4),
"min_price_rub": round(float(market.get("min_price", 1.0)), 4),
"max_price_rub": round(float(market.get("max_price", 1000000.0)), 4),
"time": int(time.time())
}
ws_emit_system("crypto.market", payload)
def create_payment(payer, receiver, amount, description="", external_id="", allow_any_payer=False, created_by=None):
"""Создание платежа"""
payments = load_payments()
payment_id = str(uuid.uuid4())[:12]
safe_payer = "*" if allow_any_payer else payer
payment_data = {
"id": payment_id,
"payer": safe_payer,
"receiver": receiver,
"amount": int(amount),
"description": description,
"external_id": external_id,
"allow_any_payer": bool(allow_any_payer),
"created_by": created_by if created_by else payer,
"created": int(time.time()),
"status": "pending", # pending, completed, failed, cancelled
"completed_at": None
}
payments.append(payment_data)
save_payments(payments)
ws_emit_payment_event("payment.created", payment_data)
return payment_id, payment_data
def process_payment(payment_id, payer_username=None):
"""Обработка платежа"""
payments = load_payments()
users = load_users()
payment = None
for p in payments:
if p["id"] == payment_id:
payment = p
break
if not payment:
return False, "Платеж не найден"
if payment["status"] != "pending":
return False, f"Платеж уже обработан (статус: {payment['status']})"
receiver = payment["receiver"]
amount = payment["amount"]
allow_any_payer = bool(payment.get("allow_any_payer", False))
if allow_any_payer:
if not payer_username:
return False, "Для этого платежа нужен логин плательщика"
payer = payer_username
else:
payer = payment["payer"]
if payer_username and payer != payer_username:
return False, "Неверный плательщик"
# Проверяем существование пользователей
if payer not in users:
return False, "Плательщик не найден"
if receiver not in users:
return False, "Получатель не найден"
if payer == receiver:
return False, "Нельзя оплатить самому себе"
# Проверяем баланс плательщика
if users[payer]["balance"] < amount:
return False, "Недостаточно средств"
payer_balance_before = users[payer]["balance"]
receiver_balance_before = users[receiver]["balance"]
# Выполняем перевод
users[payer]["balance"] -= amount
users[receiver]["balance"] += amount
save_users(users)
payer_balance_after = users[payer]["balance"]
receiver_balance_after = users[receiver]["balance"]
# Обновляем статус платежа
for p in payments:
if p["id"] == payment_id:
if allow_any_payer:
p["payer"] = payer
p["status"] = "completed"
p["completed_at"] = int(time.time())
p["balances"] = {
"payer_before": payer_balance_before,
"payer_after": payer_balance_after,
"receiver_before": receiver_balance_before,
"receiver_after": receiver_balance_after
}
break
save_payments(payments)
# Добавляем в историю
add_history(payer, "payment_sent", amount, f"Платеж #{payment_id} для {receiver}")
add_history(receiver, "payment_received", amount, f"Платеж #{payment_id} от {payer}")
ws_emit_payment_event("payment.completed", payment, {
"payer_before": payer_balance_before,
"payer_after": payer_balance_after
})
return True, "Платеж успешно выполнен"
def cancel_payment(payment_id):
"""Отмена платежа"""
payments = load_payments()
for payment in payments:
if payment["id"] == payment_id and payment["status"] == "pending":
payment["status"] = "cancelled"
save_payments(payments)
ws_emit_payment_event("payment.cancelled", payment)
return True, "Платеж отменен"
return False, "Платеж не найден или уже обработан"
# ---------- ФУНКЦИИ ДЛЯ МАГАЗИНА ----------
def load_shop():
"""Загрузка данных магазина"""
try:
with open(SHOP_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
# Обеспечиваем наличие всех ключей
if "items" not in data:
data["items"] = []
if "ads" not in data:
data["ads"] = []
return data
except:
return {"items": [], "ads": []}
def save_shop(data):
"""Сохранение данных магазина"""
with open(SHOP_FILE, "w", encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
def create_shop_item(seller_username, title, description, price, category="other"):
"""Создание товара в магазине"""
shop_data = load_shop()
item_id = str(uuid.uuid4())[:8]
new_item = {
"id": item_id,
"seller": seller_username,
"title": title,
"description": description,
"price": int(price),
"category": category,
"created": int(time.time()),
"status": "available",
"buyer": None
}
shop_data["items"].append(new_item)
save_shop(shop_data)
# Добавляем в историю
add_history(seller_username, "create_shop_item", 0, f"Товар: {title}")
ws_emit_system("shop.item_created", {
"item_id": item_id,
"seller": seller_username,
"title": title,
"price": int(price),
"category": category
})
ws_emit_user(seller_username, "shop.item_created", {
"item_id": item_id,
"title": title,
"price": int(price)
})
return item_id
def create_advertisement(author, title, content, contact_info):
"""Создание объявления"""
shop_data = load_shop()
ad_id = str(uuid.uuid4())[:8]
new_ad = {
"id": ad_id,
"author": author,
"title": title,
"content": content,
"contact_info": contact_info,
"created": int(time.time()),
"status": "active"
}
shop_data["ads"].append(new_ad)
save_shop(shop_data)
# Добавляем в историю
add_history(author, "create_advertisement", 0, f"Объявление: {title}")
ws_emit_system("shop.ad_created", {
"ad_id": ad_id,
"author": author,
"title": title
})
ws_emit_user(author, "shop.ad_created", {
"ad_id": ad_id,
"title": title
})
return ad_id
def buy_item(item_id, buyer_username):
"""Покупка товара"""
shop_data = load_shop()
users = load_users()
# Находим товар
item_to_buy = None
for item in shop_data["items"]:
if item["id"] == item_id and item["status"] == "available":
item_to_buy = item
break
if not item_to_buy:
return False, "Товар не найден или уже продан"
seller = item_to_buy["seller"]
price = item_to_buy["price"]
# Проверяем, есть ли у покупателя достаточно средств
if buyer_username not in users:
return False, "Покупатель не найден"
if users[buyer_username]["balance"] < price:
return False, "Недостаточно средств"
# Проверяем, что покупатель не покупает у себя
if buyer_username == seller:
return False, "Нельзя купить собственный товар"
# Проверяем, существует ли продавец
if seller not in users:
return False, "Продавец не найден"
# Выполняем транзакцию
users[buyer_username]["balance"] -= price
users[seller]["balance"] += price
save_users(users)
# Обновляем статус товара
for item in shop_data["items"]:
if item["id"] == item_id:
item["status"] = "sold"
item["buyer"] = buyer_username
item["sold_date"] = int(time.time())
break
save_shop(shop_data)
# Добавляем в историю
add_history(buyer_username, "buy_item", price, f"Товар: {item_to_buy['title']} от {seller}")
add_history(seller, "sell_item", price, f"Товар: {item_to_buy['title']} покупателю {buyer_username}")
ws_emit_system("shop.item_sold", {
"item_id": item_id,
"title": item_to_buy["title"],
"seller": seller,
"buyer": buyer_username,
"price": price
})
ws_emit_user(buyer_username, "shop.item_sold", {
"item_id": item_id,
"title": item_to_buy["title"],
"price": price,
"role": "buyer"
})
ws_emit_user(seller, "shop.item_sold", {
"item_id": item_id,
"title": item_to_buy["title"],
"price": price,
"role": "seller"
})
return True, "Покупка успешно завершена"
def default_crypto_market():
now = int(time.time())
return {
"name": SERKRIPTO_NAME,
"symbol": SERKRIPTO_SYMBOL,
"price": 100.0,
"min_price": 1.0,
"max_price": 1000000.0,
"price_step_buy": 0.012,
"price_step_sell": 0.01,
"idle_decay_rate": 0.0015,
"idle_decay_interval_sec": 30,
"last_buy_at": now,
"last_update": now,
"stats": {
"buy_ops": 0,
"sell_ops": 0,
"buy_volume": 0.0,
"sell_volume": 0.0
}
}
def normalize_crypto_market(data):
defaults = default_crypto_market()
market = {}
if not isinstance(data, dict):
data = {}
market["name"] = str(data.get("name", defaults["name"]) or defaults["name"])
market["symbol"] = str(data.get("symbol", defaults["symbol"]) or defaults["symbol"]).upper()
def to_float(value, fallback):
try:
return float(value)
except:
return float(fallback)
def to_int(value, fallback):
try:
return int(value)
except:
return int(fallback)
market["min_price"] = max(0.01, to_float(data.get("min_price"), defaults["min_price"]))
market["max_price"] = max(market["min_price"], to_float(data.get("max_price"), defaults["max_price"]))
market["price"] = to_float(data.get("price"), defaults["price"])
market["price"] = min(market["max_price"], max(market["min_price"], market["price"]))
market["price"] = round(market["price"], 4)
market["price_step_buy"] = min(0.5, max(0.0, to_float(data.get("price_step_buy"), defaults["price_step_buy"])))
market["price_step_sell"] = min(0.5, max(0.0, to_float(data.get("price_step_sell"), defaults["price_step_sell"])))
market["idle_decay_rate"] = min(0.2, max(0.0, to_float(data.get("idle_decay_rate"), defaults["idle_decay_rate"])))
market["idle_decay_interval_sec"] = max(5, to_int(data.get("idle_decay_interval_sec"), defaults["idle_decay_interval_sec"]))
market["last_buy_at"] = to_int(data.get("last_buy_at"), defaults["last_buy_at"])
market["last_update"] = to_int(data.get("last_update"), defaults["last_update"])
stats = data.get("stats", {})
if not isinstance(stats, dict):
stats = {}
market["stats"] = {
"buy_ops": max(0, to_int(stats.get("buy_ops"), 0)),
"sell_ops": max(0, to_int(stats.get("sell_ops"), 0)),
"buy_volume": max(0.0, round(to_float(stats.get("buy_volume"), 0.0), 6)),
"sell_volume": max(0.0, round(to_float(stats.get("sell_volume"), 0.0), 6))
}
return market
def load_crypto_market():
need_save = False
raw = None
try:
with open(CRYPTO_FILE, 'r', encoding='utf-8') as f:
raw = json.load(f)
except:
raw = default_crypto_market()
need_save = True
market = normalize_crypto_market(raw)
if raw != market:
need_save = True
if need_save:
save_crypto_market(market)
return market
def save_crypto_market(data):
with open(CRYPTO_FILE, "w", encoding='utf-8') as f:
json.dump(normalize_crypto_market(data), f, indent=4, ensure_ascii=False)
def apply_serkripto_idle_decay(market):
market = normalize_crypto_market(market)
now = int(time.time())
last_update = int(market.get("last_update", now))
interval = int(market.get("idle_decay_interval_sec", 30))
if now <= last_update:
return market, False
if now - int(market.get("last_buy_at", 0)) < interval:
market["last_update"] = now
return market, False
steps = (now - last_update) // interval
if steps <= 0:
return market, False
price_before = float(market.get("price", 100.0))
decay_rate = float(market.get("idle_decay_rate", 0.0015))
min_price = float(market.get("min_price", 1.0))
decayed_price = max(min_price, price_before * ((1 - decay_rate) ** steps))
market["price"] = round(decayed_price, 4)
market["last_update"] = now
changed = abs(price_before - market["price"]) > 1e-9
return market, changed
def get_user_serkripto_amount(user_data):
if not isinstance(user_data, dict):
return 0.0
try:
return round(max(0.0, float(user_data.get("serkripto", 0.0))), 6)
except:
return 0.0
def buy_serkripto(username, rub_amount):
try:
amount = int(rub_amount)
except:
return False, "invalid_amount"
if amount <= 0:
return False, "invalid_amount"
users = load_users()
user_data = users.get(username)
if not isinstance(user_data, dict):
return False, "user_not_found"
market = load_crypto_market()
market, decayed = apply_serkripto_idle_decay(market)
if decayed:
save_crypto_market(market)
ws_emit_crypto_market_event(market, "idle_decay")
price_before = float(market.get("price", 0.0))
if price_before <= 0:
return False, "market_unavailable"
if int(user_data.get("balance", 0)) < amount:
return False, "insufficient_funds"
quantity = round(amount / price_before, 6)
if quantity <= 0:
return False, "amount_too_small"
user_data["balance"] = int(user_data.get("balance", 0)) - amount
user_data["serkripto"] = round(get_user_serkripto_amount(user_data) + quantity, 6)
save_users(users)
growth = min(0.25, quantity * float(market.get("price_step_buy", 0.012)))
max_price = float(market.get("max_price", 1000000.0))
price_after = min(max_price, price_before * (1 + growth))
now = int(time.time())
market["price"] = round(price_after, 4)
market["last_buy_at"] = now
market["last_update"] = now
market["stats"]["buy_ops"] = int(market["stats"].get("buy_ops", 0)) + 1
market["stats"]["buy_volume"] = round(float(market["stats"].get("buy_volume", 0.0)) + quantity, 6)
save_crypto_market(market)
ws_emit_crypto_market_event(market, "buy")
add_history(username, "crypto_buy", amount, f"{SERKRIPTO_NAME} +{quantity:.6f} по {price_before:.4f} ₽")
ws_emit_balance_update(username)
ws_emit_user(username, "crypto.trade", {
"side": "buy",
"coin": SERKRIPTO_NAME,
"quantity": quantity,
"amount_rub": amount,
"price_before": round(price_before, 4),
"price_after": round(market["price"], 4)
})
ws_emit_system("crypto.trade", {
"side": "buy",
"user": username,
"coin": SERKRIPTO_NAME,
"quantity": quantity,
"price": round(market["price"], 4)
})
return True, {
"coin": SERKRIPTO_NAME,
"symbol": market.get("symbol", SERKRIPTO_SYMBOL),
"quantity": quantity,
"amount_rub": amount,
"price_before": round(price_before, 4),
"price_after": round(float(market.get("price", price_before)), 4),
"balance_rub": int(user_data.get("balance", 0)),
"wallet_quantity": get_user_serkripto_amount(user_data)
}
def sell_serkripto(username, quantity):
try:
qty = round(float(quantity), 6)
except:
return False, "invalid_quantity"
if qty <= 0:
return False, "invalid_quantity"
users = load_users()
user_data = users.get(username)
if not isinstance(user_data, dict):
return False, "user_not_found"
wallet_qty = get_user_serkripto_amount(user_data)
if wallet_qty + 1e-9 < qty:
return False, "insufficient_crypto"
market = load_crypto_market()
market, decayed = apply_serkripto_idle_decay(market)
if decayed:
save_crypto_market(market)
ws_emit_crypto_market_event(market, "idle_decay")
price_before = float(market.get("price", 0.0))
if price_before <= 0:
return False, "market_unavailable"
payout = int(round(qty * price_before))
if payout <= 0:
return False, "amount_too_small"
user_data["serkripto"] = round(max(0.0, wallet_qty - qty), 6)
user_data["balance"] = int(user_data.get("balance", 0)) + payout
save_users(users)
drop = min(0.25, qty * float(market.get("price_step_sell", 0.01)))
min_price = float(market.get("min_price", 1.0))
price_after = max(min_price, price_before * (1 - drop))
now = int(time.time())
market["price"] = round(price_after, 4)
market["last_update"] = now
market["stats"]["sell_ops"] = int(market["stats"].get("sell_ops", 0)) + 1
market["stats"]["sell_volume"] = round(float(market["stats"].get("sell_volume", 0.0)) + qty, 6)
save_crypto_market(market)
ws_emit_crypto_market_event(market, "sell")
add_history(username, "crypto_sell", payout, f"{SERKRIPTO_NAME} -{qty:.6f} по {price_before:.4f} ₽")
ws_emit_balance_update(username)
ws_emit_user(username, "crypto.trade", {
"side": "sell",
"coin": SERKRIPTO_NAME,
"quantity": qty,
"amount_rub": payout,
"price_before": round(price_before, 4),
"price_after": round(market["price"], 4)
})
ws_emit_system("crypto.trade", {
"side": "sell",
"user": username,
"coin": SERKRIPTO_NAME,
"quantity": qty,
"price": round(market["price"], 4)
})
return True, {
"coin": SERKRIPTO_NAME,
"symbol": market.get("symbol", SERKRIPTO_SYMBOL),
"quantity": qty,
"amount_rub": payout,
"price_before": round(price_before, 4),
"price_after": round(float(market.get("price", price_before)), 4),
"balance_rub": int(user_data.get("balance", 0)),
"wallet_quantity": get_user_serkripto_amount(user_data)
}
# ---------- API КЛЮЧИ ----------
def generate_api_key():
"""Генерация API ключа"""
return f"bank_api_{uuid.uuid4().hex[:16]}"
def get_user_by_api_key(api_key):
"""Получение пользователя по API ключу - ИСПРАВЛЕННАЯ ВЕРСИЯ"""
if not api_key:
return None, None
users = load_users()
for username, user_data in users.items():
# Проверяем, что user_data является словарем
if isinstance(user_data, dict) and user_data.get('api_key') == api_key:
return username, user_data
return None, None
# ---------- БАНКОВСКИЕ ОПЕРАЦИИ ----------
def daily_update():
"""Ежедневное обновление депозитов и кредитов"""
users = load_users()
updated = False
updated_users = []
for user_id, u in users.items():
if isinstance(u, dict): # Проверяем, что это словарь
if u.get('deposit', 0) > 0:
u['deposit'] = int(u['deposit'] * 1.01)
updated = True
updated_users.append(user_id)
if u.get('credit', 0) > 0:
u['credit'] = int(u['credit'] * 1.05)
updated = True
if user_id not in updated_users:
updated_users.append(user_id)
if updated:
save_users(users)
ws_emit_system("bank.daily_update", {"users": updated_users})
for username in updated_users:
ws_emit_balance_update(username)
def add_history(user, action, amount, target=None):
"""Добавление записи в историю"""
hist = load_history()
history_item = {
"time": int(time.time()),
"user": user,
"action": action,
"amount": amount,
"target": target,
"id": str(uuid.uuid4())[:8]
}
hist.append(history_item)
save_history(hist)
ws_emit_user(user, "history.new", history_item)
ws_emit_system("history.new", {
"user": user,
"action": action,
"amount": amount,
"target": target
})
ws_emit_balance_update(user)
# ---------- ДЕКОРАТОРЫ ----------
def login_required(f):
"""Декоратор для проверки авторизации"""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user' not in session:
return redirect('/')
return f(*args, **kwargs)
return decorated_function
def web_roles_required(*allowed_roles):
"""Проверка ролей для WEB-маршрутов (требует активную сессию)"""
normalized_allowed = {normalize_role(r) for r in allowed_roles}
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user' not in session:
return redirect('/')
users = load_users()
user_data = users.get(session['user'])
if not isinstance(user_data, dict):
return redirect('/')
user_role = get_user_role(user_data)
if user_role not in normalized_allowed:
return redirect('/dashboard')
return f(*args, **kwargs)
return decorated_function
return decorator
def admin_required(f):
"""Декоратор для проверки прав администратора"""
return web_roles_required(ROLE_ADMIN)(f)
# ИСПРАВЛЕННЫЙ ДЕКОРАТОР API
def api_key_required(f):
"""Декоратор для проверки API ключа - ИСПРАВЛЕННАЯ ВЕРСИЯ"""
@wraps(f)
def decorated_function(*args, **kwargs):
api_key = None
# 1. Проверяем заголовок X-API-Key
if 'X-API-Key' in request.headers:
api_key = request.headers.get('X-API-Key')
# 2. Проверяем параметр запроса api_key
if not api_key and request.args.get('api_key'):
api_key = request.args.get('api_key')
# 3. Проверяем JSON тело
if not api_key and request.is_json:
try:
data = request.get_json()
if data and 'api_key' in data:
api_key = data.get('api_key')
except:
pass
# 4. Проверяем форму
if not api_key and request.form.get('api_key'):
api_key = request.form.get('api_key')
if not api_key:
return jsonify({'success': False, 'error': 'API key required'}), 401
# Получаем пользователя по API ключу
username, user_data = get_user_by_api_key(api_key)
if not username:
return jsonify({'success': False, 'error': 'Invalid API key'}), 401
# Сохраняем данные пользователя в объекте request
request.username = username
request.user_data = user_data
request.user_role = get_user_role(user_data)
return f(*args, **kwargs)
return decorated_function
def api_roles_required(*allowed_roles):
"""Проверка ролей для API маршрутов (использовать после api_key_required)"""
normalized_allowed = {normalize_role(r) for r in allowed_roles}
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
user_role = getattr(request, "user_role", get_user_role(getattr(request, "user_data", {})))
if user_role not in normalized_allowed:
return jsonify({
"success": False,
"error": "Access denied for your role",
"role": user_role
}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
# ---------- HTML ШАБЛОНЫ ----------
def render_login(error_message="", twofa_mode=False, username_hint=""):
safe_error = html.escape(str(error_message or "").strip())
alert_html = f'
{safe_error}
' if safe_error else ""
safe_username_hint = html.escape(str(username_hint or "").strip())
security_notice_html = '
Важно: Мы не гарантируем полную безопасность сервиса. Все пароли хранятся в открытом виде, в базе данных, всем этим заведует один человек, поэтому я не гарантирую полную безопасность. Если понадобится восстановление, пишите на почту kompania.bank@gmail.com.
'
subtitle = "Вход в систему"
auth_form_html = '''
'''
if twofa_mode:
subtitle = f'Подтверждение входа: {safe_username_hint or "пользователь"}'
auth_form_html = '''
'''
page = '''
Банк "Школьный" | Вход в систему
'''
def render_dashboard(user_data, history_data, shop_data=None, crypto_market=None, crypto_flash=None, active_tab="services"):
"""Генерация HTML личного кабинета"""
current_role = get_user_role(user_data)
can_take_credit = current_role in CREDIT_ROLES
can_trade_crypto = current_role in CRYPTO_ROLES
allowed_tabs = {"services", "shop", "history", "api"}
if can_trade_crypto:
allowed_tabs.add("crypto")
if active_tab not in allowed_tabs:
active_tab = "services"
if not isinstance(crypto_market, dict):
crypto_market = load_crypto_market()
crypto_market, decayed = apply_serkripto_idle_decay(crypto_market)
if decayed:
save_crypto_market(crypto_market)
ws_emit_crypto_market_event(crypto_market, "idle_decay")
crypto_price = round(float(crypto_market.get("price", 0.0)), 4)
crypto_user_amount = get_user_serkripto_amount(user_data)
crypto_portfolio_value = round(crypto_user_amount * crypto_price, 2)
crypto_stats = crypto_market.get("stats", {})
if not isinstance(crypto_stats, dict):
crypto_stats = {}
crypto_buy_ops = int(crypto_stats.get("buy_ops", 0))
crypto_sell_ops = int(crypto_stats.get("sell_ops", 0))
crypto_buy_volume = round(float(crypto_stats.get("buy_volume", 0.0)), 6)
crypto_sell_volume = round(float(crypto_stats.get("sell_volume", 0.0)), 6)
decay_rate_pct = round(float(crypto_market.get("idle_decay_rate", 0.0)) * 100, 3)
decay_interval = int(crypto_market.get("idle_decay_interval_sec", 30))
net_flow = round(crypto_buy_volume - crypto_sell_volume, 6)
max_buy_amount = max(0, int(user_data.get("balance", 0))) if isinstance(user_data, dict) else 0
max_sell_quantity = max(0.0, crypto_user_amount)
crypto_flash_html = ""
if isinstance(crypto_flash, dict) and crypto_flash.get("text"):
flash_class = crypto_flash.get("class", "info")
flash_text = str(crypto_flash.get("text"))
crypto_flash_html = f'
{flash_text}
'
# Форматирование времени истории
for h in history_data:
h['formatted_time'] = datetime.fromtimestamp(h['time']).strftime('%d.%m.%Y %H:%M')
# HTML для истории
history_html = ""
for h in history_data[-10:][::-1]:
icon = "fa-history"
color = "text-primary"
if "deposit" in h['action']:
icon = "fa-piggy-bank"
color = "text-info"
elif "credit" in h['action']:
icon = "fa-credit-card"
color = "text-warning"
elif "transfer" in h['action']:
icon = "fa-exchange-alt"
color = "text-primary"
elif "card" in h['action']:
icon = "fa-credit-card"
color = "text-success"
elif "api" in h['action']:
icon = "fa-key"
color = "text-secondary"
elif "shop" in h['action'] or "item" in h['action'] or "advertisement" in h['action']:
icon = "fa-shopping-cart"
color = "text-success"
elif "payment" in h['action']:
icon = "fa-credit-card"
color = "text-success"
history_html += f'''
{h['formatted_time']}
{h['user']}
{h['action']}
{h['amount']:,} ₽
{h.get('target', '-')}
'''
# HTML для API ключа
api_key_html = ""
if user_data.get('api_key'):
api_key_html = f'''
Ваш API ключ
Используйте этот ключ для доступа к API
'''
else:
api_key_html = '''
API ключ не создан
'''
# HTML для магазина
shop_tab_content = ""
if shop_data:
# Товары
shop_items_html = ""
available_items = [item for item in shop_data["items"] if item["status"] == "available"]
if available_items:
for item in available_items[-10:][::-1]: # Последние 10 товаров
created_date = datetime.fromtimestamp(item["created"]).strftime('%d.%m.%Y')
shop_items_html += f'''
В магазине пока нет товаров. Будьте первым, кто добавит товар!
'''
# Объявления
ads_html = ""
active_ads = [ad for ad in shop_data["ads"] if ad["status"] == "active"]
if active_ads:
for ad in active_ads[-10:][::-1]: # Последние 10 объявлений
created_date = datetime.fromtimestamp(ad["created"]).strftime('%d.%m.%Y')
ads_html += f'''
# Сделать перевод
data = {{"target": "получатель", "amount": 100}}
response = requests.post(f"{{base_url}}/transfer", headers=headers, json=data)
print(response.json())
cURL:
# Получить баланс
curl -H "X-API-Key: ваш_api_ключ" http://ваш-домен/api/v1/balance
# Сделать перевод
curl -X POST -H "X-API-Key: ваш_api_ключ" \\
-H "Content-Type: application/json" \\
-d '{{"target": "получатель", "amount": 100}}' \\
http://ваш-домен/api/v1/transfer
"
created_block = ""
if created_payment and isinstance(created_payment, dict):
payment_url = f"{request.host_url.rstrip('/')}/pay/{created_payment.get('id')}"
created_block = f"""
Ссылка создана
ID платежа: {created_payment.get('id')}
{payment_url}
"""
api_key_hint = ""
if api_key:
api_key_hint = f"""
Ваш API ключ:{api_key}
"""
else:
api_key_hint = """
API ключ не создан. Сначала сгенерируйте ключ в личном кабинете.
"""
error_block = ""
if error_message:
error_block = f'
{error_message}
'
return f'''
Оплата | Банковская система
Платежная система
Создавайте ссылки и принимайте оплату
{error_block}
{created_block}
Создать ссылку для оплаты
API для оплаты
{api_key_hint}
POST /api/v1/payment/create
Header: X-API-Key: ваш_ключ
Body: {{"receiver":"{current_user or 'username'}","amount":100,"description":"Оплата","allow_any_payer":true}}
GET /api/v1/payment/status/payment_id
Header: X-API-Key: ваш_ключ
Последние платежи
ID
Сумма
Получатель
Плательщик
Статус
Дата
Ссылка
{payment_rows}
'''
def render_pay_page(payment_id):
"""Страница оплаты для клиента"""
payments = load_payments()
error_code = request.args.get("error", "").strip()
error_map = {
"invalid_credentials": "Неверный логин или пароль",
"payment_not_found": "Платеж не найден",
"wrong_payer": "Вы не являетесь плательщиком по этому платежу",
"insufficient_funds": "Недостаточно средств на счете плательщика",
"payer_not_found": "Плательщик не найден",
"receiver_not_found": "Получатель не найден",
"role_denied": "Роль пользователя не имеет доступа к оплате",
"same_user": "Нельзя оплатить самому себе",
"payment_failed": "Ошибка при обработке платежа"
}
error_message = error_map.get(error_code, "")
payment = None
for p in payments:
if p["id"] == payment_id:
payment = p
break
if not payment:
return '''
Платеж не найден
Платеж не найден
Проверьте правильность ссылки или обратитесь к отправителю.
"""
def render_parent_panel(parent_username):
users = load_users()
if parent_username not in users:
return "Родитель не найден", 404
children = get_parent_children(parent_username)
class_map = load_classes()
journal = load_journal()
children_rows = ""
for child in children:
if child not in users:
continue
stats = get_student_stats(child)
class_name = get_student_class(child)
net = stats["finance_delta"]
net_text = f"+{net}" if net >= 0 else str(net)
net_class = "text-success" if net >= 0 else "text-danger"
children_rows += f"""
"
# Кандидаты для быстрого добавления ребенка
linked = set(children)
student_candidates = []
for username, user_data in users.items():
if not isinstance(user_data, dict):
continue
role = get_user_role(user_data)
in_classes = any(username in students for students in class_map.values())
in_journal = username in journal
if role == ROLE_STUDENT or in_classes or in_journal:
if username not in linked and username != parent_username:
student_candidates.append(username)
student_candidates = sorted(set(student_candidates))
options = ''
for student_name in student_candidates:
options += f''
if not student_candidates:
options = ''
return f"""
Панель родителя
"""
def render_manager_panel(manager_username):
users = load_users()
payments = load_payments()
journal = load_journal()
classes = load_classes()
history = load_history()
shop_data = load_shop()
valid_users = {u: d for u, d in users.items() if isinstance(d, dict)}
role_counts = {role: 0 for role in sorted(VALID_ROLES)}
total_balance = 0
for _, user_data in valid_users.items():
role_counts[get_user_role(user_data)] += 1
total_balance += int(user_data.get("balance", 0))
payment_total = len(payments)
payment_pending = len([p for p in payments if p.get("status") == "pending"])
payment_completed = len([p for p in payments if p.get("status") == "completed"])
recent_payments_rows = ""
for payment in sorted(payments, key=lambda x: x.get("created", 0), reverse=True)[:20]:
created = datetime.fromtimestamp(payment.get("created", 0)).strftime("%d.%m.%Y %H:%M")
recent_payments_rows += f"""
{payment.get('id')}
{payment.get('payer')}
{payment.get('receiver')}
{payment.get('amount')}
{payment.get('status')}
{created}
"""
if not recent_payments_rows:
recent_payments_rows = "
Нет платежей
"
students_set = set(journal.keys())
for _, students in classes.items():
students_set.update(students)
for username, user_data in valid_users.items():
if get_user_role(user_data) == ROLE_STUDENT:
students_set.add(username)
top_students = []
for student in students_set:
stats = get_student_stats(student)
top_students.append({
"student": student,
"avg": stats["avg"],
"grades": stats["grade_count"],
"finance": stats["finance_delta"],
"class_name": get_student_class(student)
})
top_students.sort(key=lambda x: (x["avg"], x["grades"], x["finance"]), reverse=True)
top_students_rows = ""
for i, item in enumerate(top_students[:10], start=1):
top_students_rows += f"""
"
class_rows = ""
for class_name, students in sorted(classes.items()):
student_chips = ", ".join([f"{s}" for s in students]) if students else "—"
class_rows += f"
{class_name}
{student_chips}
"
if not class_rows:
class_rows = "
Классы не добавлены
"
subjects_text = "".join([f"{s}" for s in subjects]) if subjects else "Предметы не добавлены"
student_candidates = set(journal.keys())
for class_students in classes.values():
student_candidates.update(class_students)
for username, user_data in users.items():
if not isinstance(user_data, dict):
continue
role = user_data.get("role", "client")
if role in [ROLE_ADMIN, ROLE_TEACHER, ROLE_MANAGER, ROLE_PARENT]:
continue
student_candidates.add(username)
grade_students = sorted(student_candidates)
subject_candidates = set(subjects)
for student_subjects in journal.values():
if isinstance(student_subjects, dict):
subject_candidates.update(student_subjects.keys())
grade_subjects = sorted(subject_candidates)
if grade_students:
student_options = '' + "".join(
[f'' for student in grade_students]
)
else:
student_options = ''
if grade_subjects:
subject_options = '' + "".join(
[f'' for subject in grade_subjects]
)
else:
subject_options = ''
grade_options = """
"""
grade_submit_disabled = "disabled" if not grade_students or not grade_subjects else ""
return f"""
Учительская панель