generated from justuser-31/mrl_v1_license
Добавление телеграм бота для регистрации
This commit is contained in:
@@ -0,0 +1,203 @@
|
||||
import json
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
|
||||
import telebot
|
||||
|
||||
from call2api import create_account, delete_account
|
||||
|
||||
# Load configuration
|
||||
CONFIG_FILE = "config.json"
|
||||
if not os.path.exists(CONFIG_FILE):
|
||||
with open(CONFIG_FILE, "w") as f:
|
||||
f.write("""{
|
||||
"bot_token": "YOUR_BOT_TOKEN_HERE",
|
||||
"chat_id": 1111,
|
||||
"api_url": "http://localhost:8000",
|
||||
"api_token": "YOUR_API_TOKEN_HERE",
|
||||
"drive_quota": 100
|
||||
}""")
|
||||
raise FileNotFoundError(f"Configuration file {CONFIG_FILE} not found")
|
||||
|
||||
with open(CONFIG_FILE, "r") as f:
|
||||
config = json.load(f)
|
||||
|
||||
# Bot configuration
|
||||
BOT_TOKEN = config.get("bot_token")
|
||||
CHAT_ID = config.get("chat_id", 1111)
|
||||
API_URL = config.get("api_url")
|
||||
API_TOKEN = config.get("api_token")
|
||||
DRIVE_QUOTA = config.get("drive_quota", 100)
|
||||
|
||||
# Initialize bot
|
||||
bot = telebot.TeleBot(BOT_TOKEN)
|
||||
|
||||
|
||||
# Load user registrations
|
||||
def load_registrations():
|
||||
if os.path.exists("registrations.json"):
|
||||
with open("registrations.json", "r") as f:
|
||||
return json.load(f)
|
||||
return {}
|
||||
|
||||
|
||||
# Save user registrations
|
||||
def save_registrations(registrations):
|
||||
with open("registrations.json", "w") as f:
|
||||
json.dump(registrations, f, indent=2)
|
||||
|
||||
|
||||
# Generate random password
|
||||
def generate_password(length=20):
|
||||
characters = string.ascii_letters + string.digits
|
||||
return "".join(random.choice(characters) for _ in range(length))
|
||||
|
||||
|
||||
# Generate registration token
|
||||
def generate_registration_token(length=20):
|
||||
characters = string.ascii_letters + string.digits
|
||||
return "".join(random.choice(characters) for _ in range(length))
|
||||
|
||||
|
||||
# Store pending registrations
|
||||
global pending_registrations
|
||||
pending_registrations = {}
|
||||
|
||||
|
||||
@bot.message_handler(commands=["reg_sfs"])
|
||||
def register_user(message):
|
||||
user_id = str(message.from_user.id)
|
||||
chat_id = message.chat.id
|
||||
|
||||
# Check if this is private messages
|
||||
if message.chat.type == "private":
|
||||
complete_registration(message)
|
||||
return
|
||||
# Check if command was sent in the correct chat
|
||||
if chat_id != CHAT_ID:
|
||||
bot.reply_to(message, "Please use this command in the designated chat.")
|
||||
return
|
||||
|
||||
# Load current registrations
|
||||
registrations = load_registrations()
|
||||
|
||||
# Check if user is already registered
|
||||
if user_id in registrations:
|
||||
bot.reply_to(message, "You are already registered!")
|
||||
return
|
||||
|
||||
# Generate registration token and store it
|
||||
reg_token = generate_registration_token()
|
||||
global pending_registrations
|
||||
pending_registrations[user_id] = reg_token
|
||||
|
||||
# Send instruction to user
|
||||
instruction_msg = (
|
||||
f"To complete registration, type in bot's private messages:\n"
|
||||
f"`/reg_sfs {reg_token} YOUR_USERNAME`"
|
||||
)
|
||||
bot.reply_to(message, instruction_msg, parse_mode="Markdown")
|
||||
|
||||
|
||||
def complete_registration(message):
|
||||
global pending_registrations
|
||||
user_id = str(message.from_user.id)
|
||||
chat_type = message.chat.type
|
||||
|
||||
# Registration must happen in private chat
|
||||
if chat_type != "private":
|
||||
return
|
||||
|
||||
# Check if user has initiated registration
|
||||
if user_id not in pending_registrations:
|
||||
bot.reply_to(
|
||||
message, "Please start registration in the group chat first using /reg_sfs"
|
||||
)
|
||||
return
|
||||
|
||||
parts = message.text.split()
|
||||
if len(parts) != 3:
|
||||
bot.reply_to(
|
||||
message,
|
||||
"Invalid format. Use: `/reg_sfs TOKEN USERNAME`",
|
||||
parse_mode="Markdown",
|
||||
)
|
||||
return
|
||||
|
||||
_, provided_token, username = parts
|
||||
|
||||
# Verify token
|
||||
if pending_registrations[user_id] != provided_token:
|
||||
bot.reply_to(message, "Invalid registration token.")
|
||||
return
|
||||
|
||||
# Generate password
|
||||
password = generate_password()
|
||||
|
||||
if username == "YOUR_USERNAME":
|
||||
bot.reply_to(
|
||||
message,
|
||||
"You should use own username instead example `YOUR_USERNAME`.",
|
||||
parse_mode="Markdown",
|
||||
)
|
||||
return
|
||||
|
||||
# Create account via API
|
||||
result = create_account(API_URL, API_TOKEN, username, password, DRIVE_QUOTA)
|
||||
|
||||
if result["status_code"] == 200:
|
||||
# Save registration
|
||||
registrations = load_registrations()
|
||||
registrations[user_id] = username
|
||||
registrations[username] = user_id
|
||||
save_registrations(registrations)
|
||||
|
||||
# Remove pending registration
|
||||
del pending_registrations[user_id]
|
||||
|
||||
# Send success message
|
||||
success_msg = (
|
||||
f"Successful registration:\nUsername: `{username}`\nPassword: `{password}`"
|
||||
)
|
||||
bot.reply_to(message, success_msg, parse_mode="Markdown")
|
||||
else:
|
||||
bot.reply_to(
|
||||
message,
|
||||
f"Registration failed: <code>{result['message']}</code>",
|
||||
parse_mode="HTML",
|
||||
)
|
||||
|
||||
|
||||
@bot.message_handler(content_types=["left_chat_member"])
|
||||
def handle_user_leave(message):
|
||||
user_id = str(message.left_chat_member.id)
|
||||
|
||||
# Load registrations
|
||||
registrations = load_registrations()
|
||||
|
||||
# Check if user was registered
|
||||
if user_id in registrations:
|
||||
username = registrations[user_id]
|
||||
|
||||
# Delete account via API
|
||||
result = delete_account(API_URL, API_TOKEN, username)
|
||||
|
||||
# Remove registration
|
||||
del registrations[user_id]
|
||||
if username in registrations:
|
||||
del registrations[username]
|
||||
save_registrations(registrations)
|
||||
|
||||
# Notify in chat if deletion was successful
|
||||
if result["status_code"] == 200:
|
||||
bot.send_message(CHAT_ID, f"Account for {username} has been deleted.")
|
||||
else:
|
||||
bot.send_message(
|
||||
CHAT_ID, f"Failed to delete account for {username}: {result['message']}"
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("SFS Bot started...")
|
||||
bot.polling(none_stop=True)
|
||||
Reference in New Issue
Block a user