generated from justuser-31/mrl_v1_license
211 lines
5.8 KiB
Python
211 lines
5.8 KiB
Python
import json
|
|
import os
|
|
import random
|
|
import string
|
|
|
|
import telebot
|
|
|
|
from call2api import create_account, delete_account
|
|
|
|
# Load configuration
CONFIG_FILE = "config.json"

# First run: write a template for the operator to fill in, then stop.
# Starting the bot with the placeholder values would be pointless, so the
# process still aborts with FileNotFoundError, but the message now says that
# a template was created and what to do next.
if not os.path.exists(CONFIG_FILE):
    with open(CONFIG_FILE, "w", encoding="utf-8") as f:
        f.write("""{
    "bot_token": "YOUR_BOT_TOKEN_HERE",
    "chat_id": 1111,
    "api_url": "http://localhost:8000",
    "api_token": "YOUR_API_TOKEN_HERE",
    "drive_quota": 100
}""")
    raise FileNotFoundError(
        f"Configuration file {CONFIG_FILE} not found; "
        "a template has been created - fill it in and restart"
    )

# Read the file as UTF-8 (the JSON default) instead of the locale encoding.
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
    config = json.load(f)
|
|
|
|
# Settings pulled out of the parsed configuration mapping.
# Only chat_id and drive_quota have fallbacks; the other keys yield None
# when they are absent from the file.
BOT_TOKEN = config.get("bot_token")
CHAT_ID = config.get("chat_id", 1111)
API_URL = config.get("api_url")
API_TOKEN = config.get("api_token")
DRIVE_QUOTA = config.get("drive_quota", 100)

# Bot instance that the message handlers below register themselves on.
bot = telebot.TeleBot(BOT_TOKEN)
|
|
|
|
|
|
# Load user registrations
def load_registrations():
    """Return the registrations stored in ``registrations.json``.

    Returns:
        The decoded JSON content of the file, or an empty dict when the
        file does not exist.
    """
    # EAFP: open directly instead of checking for existence first, so the
    # file cannot vanish between the check and the open call.
    try:
        with open("registrations.json", "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}
|
|
|
|
|
|
# Save user registrations
def save_registrations(registrations):
    """Persist ``registrations`` to ``registrations.json`` as indented JSON.

    Args:
        registrations: JSON-serializable mapping to store.

    Raises:
        TypeError: if ``registrations`` contains a value JSON cannot encode;
            the existing file is left untouched in that case.
    """
    # Serialize before touching the disk: a non-serializable value must not
    # truncate the data that is already stored.
    payload = json.dumps(registrations, indent=2)

    # Write to a sibling temp file and rename it over the target, so a crash
    # mid-write can never leave a half-written registrations file behind.
    tmp_path = "registrations.json.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(payload)
    os.replace(tmp_path, "registrations.json")
|
|
|
|
|
|
# Generate random password
def generate_password(length=20):
    """Return a random password made of ASCII letters and digits.

    Args:
        length: number of characters to generate (default 20).

    Returns:
        A string of exactly ``length`` characters; empty when ``length`` is 0.
    """
    # The ``random`` module is a predictable PRNG and must not be used for
    # credentials; ``secrets`` draws from the operating system's CSPRNG.
    import secrets

    characters = string.ascii_letters + string.digits
    return "".join(secrets.choice(characters) for _ in range(length))
|
|
|
|
|
|
# Generate registration token
def generate_registration_token(length=20):
    """Return a random registration token made of ASCII letters and digits.

    Args:
        length: number of characters to generate (default 20).

    Returns:
        A string of exactly ``length`` characters; empty when ``length`` is 0.
    """
    # The token authenticates the second registration step, so it has to be
    # unpredictable: use the OS CSPRNG via ``secrets`` instead of ``random``.
    import secrets

    characters = string.ascii_letters + string.digits
    return "".join(secrets.choice(characters) for _ in range(length))
|
|
|
|
|
|
# Store pending registrations: maps a Telegram user id (as a string) to the
# registration token issued to that user. Kept in memory only, so pending
# registrations are lost when the process restarts.
# No ``global`` statement is needed here: at module scope it has no effect.
pending_registrations = {}
|
|
|
|
|
|
@bot.message_handler(commands=["reg_sfs"])
def register_user(message):
    """Handle the ``/reg_sfs`` command.

    In a private chat the message is handed to ``complete_registration``.
    In the chat whose id equals ``CHAT_ID`` a registration token is issued
    to the sender and stored in ``pending_registrations``; any other chat
    gets a hint to use the designated chat.

    Args:
        message: the incoming Telegram message.
    """
    sender_id = str(message.from_user.id)
    origin_chat = message.chat.id

    # Private chats carry the second step of the flow.
    if message.chat.type == "private":
        complete_registration(message)
        return

    # The first step is only accepted in the configured chat.
    if origin_chat != CHAT_ID:
        bot.reply_to(message, "Please use this command in the designated chat.")
        return

    # Nothing to do for somebody who already has an entry on disk.
    if sender_id in load_registrations():
        bot.reply_to(message, "You are already registered!")
        return

    # Issue a fresh token; a repeated request overwrites the previous one.
    token = generate_registration_token()
    pending_registrations[sender_id] = token

    # Tell the user how to finish the registration in a private chat.
    bot.reply_to(
        message,
        "To complete registration, type in bot's private messages:\n"
        f"`/reg_sfs {token} YOUR_USERNAME`",
        parse_mode="Markdown",
    )
|
|
|
|
|
|
def complete_registration(message):
    """Finish a registration that was started in the group chat.

    Expects a private message of the form ``/reg_sfs TOKEN USERNAME`` from a
    user who has an entry in ``pending_registrations``. When the token
    matches, an account is created through ``create_account`` with a freshly
    generated password; on success the user id <-> username mapping is saved,
    the pending entry is removed, and the credentials are sent to the user.
    On failure the pending entry is kept so the user can retry.

    Args:
        message: the incoming Telegram message.
    """
    import html

    user_id = str(message.from_user.id)

    # Registration must happen in private chat
    if message.chat.type != "private":
        return

    # Check if user has initiated registration
    if user_id not in pending_registrations:
        bot.reply_to(
            message, "Please start registration in the group chat first using /reg_sfs"
        )
        return

    parts = message.text.split()
    if len(parts) != 3:
        bot.reply_to(
            message,
            "Invalid format. Use: `/reg_sfs TOKEN USERNAME`",
            parse_mode="Markdown",
        )
        return

    _, provided_token, username = parts

    # Verify token
    if pending_registrations[user_id] != provided_token:
        bot.reply_to(message, "Invalid registration token.")
        return

    # The user copied the example verbatim instead of choosing a name.
    if username == "YOUR_USERNAME":
        bot.reply_to(
            message,
            "You should use own username instead example `YOUR_USERNAME`.",
            parse_mode="Markdown",
        )
        return

    # Generate password and create the account via API
    password = generate_password()
    result = create_account(API_URL, API_TOKEN, username, password, DRIVE_QUOTA)

    if result["status_code"] != 200:
        # The error text comes from the API and may contain <, > or &;
        # unescaped, Telegram would reject the whole HTML message.
        reason = html.escape(str(result["message"]), quote=False)
        bot.reply_to(
            message,
            f"Registration failed: <code>{reason}</code>",
            parse_mode="HTML",
        )
        return

    # Save registration (both directions share one mapping)
    registrations = load_registrations()
    registrations[user_id] = username
    registrations[username] = user_id
    save_registrations(registrations)

    # Remove pending registration
    del pending_registrations[user_id]

    # Send success message. The username is user input: in Markdown a stray
    # backtick makes Telegram refuse the message, and the password would be
    # lost after the account already exists. HTML with escaping is safe for
    # arbitrary text and renders the same way.
    success_msg = (
        "Successful registration:\n"
        f"Username: <code>{html.escape(username, quote=False)}</code>\n"
        f"Password: <code>{html.escape(password, quote=False)}</code>"
    )
    bot.reply_to(message, success_msg, parse_mode="HTML")
|
|
|
|
|
|
@bot.message_handler(content_types=["left_chat_member"])
def handle_user_leave(message):
    """Delete the account of a registered user who leaves the designated chat.

    Args:
        message: the Telegram service message announcing the departure.
    """
    # Registration is only offered in CHAT_ID, so only departures from that
    # chat may revoke an account. Without this check a user leaving any
    # other group the bot is in would lose their account.
    if message.chat.id != CHAT_ID:
        return

    user_id = str(message.left_chat_member.id)

    # A token issued before the user left must not remain usable afterwards.
    pending_registrations.pop(user_id, None)

    # Load registrations
    registrations = load_registrations()

    # Nothing to do for users who never registered
    if user_id not in registrations:
        return

    username = registrations[user_id]

    # Delete account via API
    result = delete_account(API_URL, API_TOKEN, username)

    # Remove both directions of the mapping. This happens whether or not
    # the API call succeeded; a failure is only reported to the chat.
    del registrations[user_id]
    registrations.pop(username, None)
    save_registrations(registrations)

    # Report the outcome in the designated chat
    if result["status_code"] == 200:
        bot.send_message(CHAT_ID, f"Account for {username} has been deleted.")
    else:
        bot.send_message(
            CHAT_ID, f"Failed to delete account for {username}: {result['message']}"
        )
|
|
|
|
|
|
from time import sleep

# Entry point: keep the bot polling and restart it after a failure.
if __name__ == "__main__":
    while True:
        try:
            print("SFS Bot started...")
            bot.polling(none_stop=True)
        except KeyboardInterrupt:
            # Ctrl+C is the normal way to stop the bot. Raise SystemExit
            # directly rather than calling ``exit``, which the ``site``
            # module provides and which is not guaranteed to exist.
            raise SystemExit(0) from None
        except Exception as exc:
            # Catch only Exception so SystemExit still terminates the
            # process, and report the error instead of hiding it before
            # polling restarts after a short pause.
            print(f"Polling failed, restarting: {exc!r}")
            sleep(0.5)
|