simpliest_fs/telegram_bot/sfs_bot.py

204 lines
5.6 KiB
Python

import json
import os
import random
import string
import telebot
from call2api import create_account, delete_account
# Load configuration
CONFIG_FILE = "config.json"
if not os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, "w") as f:
f.write("""{
"bot_token": "YOUR_BOT_TOKEN_HERE",
"chat_id": 1111,
"api_url": "http://localhost:8000",
"api_token": "YOUR_API_TOKEN_HERE",
"drive_quota": 100
}""")
raise FileNotFoundError(f"Configuration file {CONFIG_FILE} not found")
with open(CONFIG_FILE, "r") as f:
config = json.load(f)
# Bot configuration
BOT_TOKEN = config.get("bot_token")
CHAT_ID = config.get("chat_id", 1111)
API_URL = config.get("api_url")
API_TOKEN = config.get("api_token")
DRIVE_QUOTA = config.get("drive_quota", 100)
# Initialize bot
bot = telebot.TeleBot(BOT_TOKEN)
# Load user registrations
def load_registrations():
if os.path.exists("registrations.json"):
with open("registrations.json", "r") as f:
return json.load(f)
return {}
# Save user registrations
def save_registrations(registrations):
with open("registrations.json", "w") as f:
json.dump(registrations, f, indent=2)
# Generate random password
def generate_password(length=20):
characters = string.ascii_letters + string.digits
return "".join(random.choice(characters) for _ in range(length))
# Generate registration token
def generate_registration_token(length=20):
characters = string.ascii_letters + string.digits
return "".join(random.choice(characters) for _ in range(length))
# Store pending registrations
global pending_registrations
pending_registrations = {}
@bot.message_handler(commands=["reg_sfs"])
def register_user(message):
user_id = str(message.from_user.id)
chat_id = message.chat.id
# Check if this is private messages
if message.chat.type == "private":
complete_registration(message)
return
# Check if command was sent in the correct chat
if chat_id != CHAT_ID:
bot.reply_to(message, "Please use this command in the designated chat.")
return
# Load current registrations
registrations = load_registrations()
# Check if user is already registered
if user_id in registrations:
bot.reply_to(message, "You are already registered!")
return
# Generate registration token and store it
reg_token = generate_registration_token()
global pending_registrations
pending_registrations[user_id] = reg_token
# Send instruction to user
instruction_msg = (
f"To complete registration, type in bot's private messages:\n"
f"`/reg_sfs {reg_token} YOUR_USERNAME`"
)
bot.reply_to(message, instruction_msg, parse_mode="Markdown")
def complete_registration(message):
global pending_registrations
user_id = str(message.from_user.id)
chat_type = message.chat.type
# Registration must happen in private chat
if chat_type != "private":
return
# Check if user has initiated registration
if user_id not in pending_registrations:
bot.reply_to(
message, "Please start registration in the group chat first using /reg_sfs"
)
return
parts = message.text.split()
if len(parts) != 3:
bot.reply_to(
message,
"Invalid format. Use: `/reg_sfs TOKEN USERNAME`",
parse_mode="Markdown",
)
return
_, provided_token, username = parts
# Verify token
if pending_registrations[user_id] != provided_token:
bot.reply_to(message, "Invalid registration token.")
return
# Generate password
password = generate_password()
if username == "YOUR_USERNAME":
bot.reply_to(
message,
"You should use own username instead example `YOUR_USERNAME`.",
parse_mode="Markdown",
)
return
# Create account via API
result = create_account(API_URL, API_TOKEN, username, password, DRIVE_QUOTA)
if result["status_code"] == 200:
# Save registration
registrations = load_registrations()
registrations[user_id] = username
registrations[username] = user_id
save_registrations(registrations)
# Remove pending registration
del pending_registrations[user_id]
# Send success message
success_msg = (
f"Successful registration:\nUsername: `{username}`\nPassword: `{password}`"
)
bot.reply_to(message, success_msg, parse_mode="Markdown")
else:
bot.reply_to(
message,
f"Registration failed: <code>{result['message']}</code>",
parse_mode="HTML",
)
@bot.message_handler(content_types=["left_chat_member"])
def handle_user_leave(message):
user_id = str(message.left_chat_member.id)
# Load registrations
registrations = load_registrations()
# Check if user was registered
if user_id in registrations:
username = registrations[user_id]
# Delete account via API
result = delete_account(API_URL, API_TOKEN, username)
# Remove registration
del registrations[user_id]
if username in registrations:
del registrations[username]
save_registrations(registrations)
# Notify in chat if deletion was successful
if result["status_code"] == 200:
bot.send_message(CHAT_ID, f"Account for {username} has been deleted.")
else:
bot.send_message(
CHAT_ID, f"Failed to delete account for {username}: {result['message']}"
)
if __name__ == "__main__":
print("SFS Bot started...")
bot.polling(none_stop=True)