558 lines
16 KiB
Python
558 lines
16 KiB
Python
import telebot
|
|
import os
|
|
import json
|
|
|
|
from telebot import types,util
|
|
|
|
global db, users, la
|
|
|
|
####### CREATE DB IF NOT EXIST
|
|
|
|
if not os.path.exists('db.json'):
|
|
db = {"token": "None"}
|
|
js = json.dumps(db, indent=2)
|
|
with open("db.json", "w") as outfile:
|
|
outfile.write(js)
|
|
|
|
print('Input token in "None" (db.json)')
|
|
exit()
|
|
|
|
if not os.path.exists('users.json'):
|
|
users = {}
|
|
js = json.dumps(users, indent=2)
|
|
with open("users.json", "w") as outfile:
|
|
outfile.write(js)
|
|
|
|
if not os.path.exists('la.json'):
|
|
la = {}
|
|
js = json.dumps(la, indent=2)
|
|
with open("la.json", "w") as outfile:
|
|
outfile.write(js)
|
|
############WORK WITH DBs##########
|
|
|
|
def read_db():
|
|
global db
|
|
with open('db.json', 'r') as openfile:
|
|
db = json.load(openfile)
|
|
def write_db():
|
|
global db
|
|
js = json.dumps(db, indent=2)
|
|
with open("db.json", "w") as outfile:
|
|
outfile.write(js)
|
|
|
|
|
|
def read_users():
|
|
global users
|
|
with open('users.json', 'r') as openfile:
|
|
users = json.load(openfile)
|
|
def write_users():
|
|
global users
|
|
js = json.dumps(users, indent=2)
|
|
with open("users.json", "w") as outfile:
|
|
outfile.write(js)
|
|
|
|
# LA - Low Admin.
|
|
# Admin permissions in bot without admin rights.
|
|
def read_la():
|
|
global la
|
|
with open('la.json', 'r') as openfile:
|
|
la = json.load(openfile)
|
|
def write_la():
|
|
global la
|
|
js = json.dumps(la, indent=2)
|
|
with open("la.json", "w") as outfile:
|
|
outfile.write(js)
|
|
|
|
####################FAST HASH#################
|
|
from xxhash import xxh32
|
|
|
|
# Generate fast hash
|
|
def sha(text):
|
|
text = str(text)
|
|
return xxh32(text).hexdigest()
|
|
|
|
##################FUNCTIONS########
|
|
|
|
def get_admins(message):
|
|
try:
|
|
if bot.get_chat(message.chat.id).type == "private":
|
|
return []
|
|
else:
|
|
admins = bot.get_chat_administrators(chat_id=message.chat.id)
|
|
true_admins = []
|
|
for i in admins:
|
|
if i.status == "creator" or i.can_restrict_members == True:
|
|
true_admins.append(i.user.id)
|
|
return true_admins
|
|
except:
|
|
catch_error(message)
|
|
return None
|
|
|
|
# Fix for anon admins, all anon (not premium) users == admins
|
|
def is_anon(message):
|
|
if message.from_user.username == "Channel_Bot" or message.from_user.username == "GroupAnonymousBot":
|
|
if message.from_user.is_premium == None:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def get_target(message):
|
|
# try:
|
|
if True:
|
|
global users
|
|
if len(message.text.split()) > 1 and message.text.split()[1][0] == "@":
|
|
username = message.text.split()[1][1:]
|
|
read_users()
|
|
if sha(username) in users:
|
|
return users[sha(username)]
|
|
else:
|
|
print(1)
|
|
return None
|
|
elif len(message.text.split()) > 1 and message.text.split()[2][0] == "@":
|
|
username = message.text.split()[2][1:]
|
|
read_users()
|
|
if sha(username) in users:
|
|
return users[sha(username)]
|
|
else:
|
|
print(2)
|
|
return None
|
|
else:
|
|
target = message.reply_to_message.from_user.id
|
|
if target not in get_admins(message):
|
|
return target
|
|
else:
|
|
print(3)
|
|
return None
|
|
# except:
|
|
# return None
|
|
|
|
def get_name(message):
|
|
try:
|
|
text = message.text.split()
|
|
|
|
# If message with @username
|
|
if len(text) > 1 and text[1][0] == '@':
|
|
return text[1]
|
|
if len(text) > 1 and text[2][0] == '@':
|
|
return text[2]
|
|
# Reply to message
|
|
else:
|
|
return telebot.util.user_link(message.reply_to_message.from_user)
|
|
except:
|
|
catch_error(message)
|
|
|
|
def have_rights(message, set_la = False):
|
|
global la ; read_la()
|
|
if message.from_user.id in get_admins(message):
|
|
return True
|
|
elif is_anon(message):
|
|
return True
|
|
elif str(message.chat.id) in la and not set_la:
|
|
if str(message.from_user.username) in la[str(message.chat.id)]:
|
|
return True
|
|
else:
|
|
bot.reply_to(message, "Увы, но у вас нету прав.")
|
|
|
|
def key_by_value(dictionary, key):
|
|
for i in dictionary:
|
|
if dictionary[i] == key:
|
|
return i
|
|
return None
|
|
|
|
def analytic(message):
|
|
global users
|
|
read_users()
|
|
|
|
if key_by_value(users, message.from_user.id) == message.from_user.username:
|
|
pass
|
|
elif message.from_user.username == "None":
|
|
pass
|
|
else:
|
|
users[sha(message.from_user.username)] = message.from_user.id
|
|
write_users()
|
|
|
|
|
|
#############TOKEN INIT#####
|
|
|
|
|
|
read_db()
|
|
read_users()
|
|
bot = telebot.TeleBot(db['token'])
|
|
|
|
##################COMMANDS#######
|
|
|
|
@bot.message_handler(commands=['start', 'faq'])
|
|
def send_welcome(message):
|
|
bot.reply_to(message, """Колотушка работает 🔨
|
|
|
|
Что это такое?
|
|
Это минимальный бот-модератор без слежки с открым кодом.
|
|
Код: https://gitea.gulyaipole.fun/justuser/just_moderator
|
|
|
|
Список команд - /help
|
|
""")
|
|
|
|
@bot.message_handler(commands=['help'])
|
|
def help(message):
|
|
bot.reply_to(message, """
|
|
Список команд:
|
|
|
|
/mute ⇁ Мут человека в ответ на сообщение
|
|
/unmute ⇁ Снятие мута
|
|
/kick ⇁ Кик
|
|
/ban ⇁ Бан
|
|
/unban ⇁ Снятие бана
|
|
/setwelcome ⇁ Приветственное сообщение
|
|
/welcome ⇁ Демонстрация текущего сообщения
|
|
/lock ⇁ Блокировка чата (для обычных пользователей)
|
|
/unlock ⇁ Снятие блокировки чата
|
|
/chatid ⇁ Айди чата
|
|
/del ⇁ Удаление сообщения
|
|
|
|
/secret ⇁ Функция для получения ссылки html на пользователя в лс.
|
|
/html <code> ⇁ Отправка текста сообщения в режиме формата html.
|
|
|
|
/support ⇁ Написать разработчику. Прошу писать по делу.
|
|
""")
|
|
|
|
@bot.message_handler(commands=['mute'])
|
|
def mute(message):
|
|
try:
|
|
if have_rights(message):
|
|
target = get_target(message)
|
|
print(target)
|
|
if target:
|
|
if len(message.text.split()) == 1:
|
|
bot.restrict_chat_member(message.chat.id, target, until_date = message.date)
|
|
bot.reply_to(message, f"""Пользователь { get_name(message) } был заглушен.""", parse_mode='HTML')
|
|
elif len(message.text.split()) == 2 and message.text.split()[1][0] == "@":
|
|
bot.restrict_chat_member(message.chat.id, target, until_date = message.date)
|
|
bot.reply_to(message, f"""Пользователь { get_name(message) } был заглушен.""")
|
|
else:
|
|
time = int(message.text.split()[1]) * 60
|
|
bot.restrict_chat_member(message.chat.id, target, until_date = message.date+time)
|
|
bot.reply_to(message, f"""Пользователь { get_name(message) } был заглушён на {time/60} минут(ы).""", parse_mode='HTML')
|
|
else:
|
|
catch_error(message, "no_user")
|
|
except:
|
|
catch_error(message)
|
|
|
|
|
|
@bot.message_handler(commands=['unmute'])
|
|
def unmute(message):
|
|
try:
|
|
if have_rights(message):
|
|
target = get_target(message)
|
|
if target:
|
|
bot.restrict_chat_member(message.chat.id, target, can_send_messages=True, can_send_other_messages = True, until_date = message.date)
|
|
bot.reply_to(message, f"""Пользователь { get_name(message) } снова имеет дар речи.
|
|
""", parse_mode='HTML')
|
|
else:
|
|
catch_error(message, "no_user")
|
|
else:
|
|
bot.reply_to(message, "Увы, но у вас нету прав.")
|
|
except:
|
|
catch_error(message)
|
|
|
|
|
|
@bot.message_handler(commands=['kick'])
|
|
def kick(message):
|
|
try:
|
|
if have_rights(message):
|
|
target = get_target(message)
|
|
if target:
|
|
bot.ban_chat_member(message.chat.id, target)
|
|
bot.unban_chat_member(message.chat.id, target)
|
|
|
|
bot.reply_to(message, f"""Пользователь { get_name(message) } был исключён.
|
|
""", parse_mode='HTML')
|
|
else:
|
|
catch_error(message, "no_user")
|
|
else:
|
|
bot.reply_to(message, "Увы, но у вас нету прав.")
|
|
except:
|
|
catch_error(message)
|
|
|
|
|
|
@bot.message_handler(commands=['ban'])
|
|
def ban(message):
|
|
try:
|
|
if have_rights(message):
|
|
target = get_target(message)
|
|
if target:
|
|
bot.ban_chat_member(message.chat.id, target)
|
|
bot.reply_to(message, f"""Пользователь { get_name(message) } исключён и заблокирован.
|
|
""", parse_mode='HTML')
|
|
else:
|
|
catch_error(message, "no_user")
|
|
else:
|
|
bot.reply_to(message, "Увы, но у вас нету прав.")
|
|
except:
|
|
catch_error(message)
|
|
|
|
@bot.message_handler(commands=['unban'])
|
|
def unban(message):
|
|
try:
|
|
if have_rights(message):
|
|
target = get_target(message)
|
|
if target:
|
|
bot.unban_chat_member(message.chat.id, target)
|
|
bot.reply_to(message, f"""Пользователь { get_name(message) } разблокирован.
|
|
""", parse_mode='HTML')
|
|
else:
|
|
catch_error(message, "no_user")
|
|
else:
|
|
bot.reply_to(message, "Увы, но у вас нету прав.")
|
|
except:
|
|
catch_error(message)
|
|
|
|
@bot.message_handler(commands=['setwelcome'])
|
|
def setwelcome(message):
|
|
try:
|
|
if have_rights(message):
|
|
global db
|
|
db[str(message.chat.id)] = message.html_text[ message.text.find(" ") + 1 :]
|
|
|
|
bot.reply_to(message, f"""Установлено новое приветственное сообщение:
|
|
\n{db[str(message.chat.id)]}""", parse_mode='HTML')
|
|
write_db()
|
|
else:
|
|
bot.reply_to(message, "Увы, но у вас нету прав.")
|
|
except:
|
|
catch_error(message)
|
|
|
|
@bot.message_handler(commands=['welcome'])
|
|
def welcome(message):
|
|
try:
|
|
global db
|
|
read_db()
|
|
bot.reply_to(message, f"""Приветственное сообщение:
|
|
\n{db[str(message.chat.id)]}""", parse_mode='HTML')
|
|
except:
|
|
catch_error(message)
|
|
|
|
@bot.message_handler(commands=['lock'])
|
|
def lock(message):
|
|
try:
|
|
if have_rights(message):
|
|
bot.set_chat_permissions(message.chat.id, telebot.types.ChatPermissions(can_send_messages=False, can_send_other_messages = False, can_send_polls = False))
|
|
bot.reply_to(message, "Чат был заблокирован 🔒")
|
|
else:
|
|
bot.reply_to(message, "Увы, но у вас нету прав.")
|
|
except:
|
|
catch_error(message)
|
|
|
|
@bot.message_handler(commands=['unlock'])
|
|
def unlock(message):
|
|
try:
|
|
if have_rights(message):
|
|
bot.set_chat_permissions(message.chat.id, telebot.types.ChatPermissions(can_send_messages=True, can_send_other_messages = True, can_send_polls = True))
|
|
bot.reply_to(message, "Чат был разблокирован 🔓")
|
|
else:
|
|
bot.reply_to(message, "Увы, но у вас нету прав.")
|
|
except:
|
|
catch_error(message)
|
|
|
|
@bot.message_handler(commands=['id'])
|
|
def getid(message):
|
|
try:
|
|
if have_rights(message):
|
|
bot.reply_to(message, "ID: " + telebot.formatting.hcode(str(get_target(message))), parse_mode="HTML" )
|
|
else:
|
|
bot.reply_to(message, "Увы, но у вас нету прав.")
|
|
except:
|
|
catch_error(message)
|
|
|
|
|
|
@bot.message_handler(commands=['del'])
|
|
def delete(message):
|
|
try:
|
|
if have_rights(message):
|
|
bot.delete_message(message.chat.id, message.reply_to_message.id)
|
|
bot.delete_message(message.chat.id, message.id)
|
|
else:
|
|
bot.reply_to(message, "Увы, но у вас нету прав.")
|
|
except:
|
|
catch_error(message)
|
|
|
|
|
|
@bot.message_handler(commands=['secret'])
|
|
def secret(message):
|
|
try:
|
|
if have_rights(message):
|
|
bot.send_message(message.from_user.id, telebot.util.user_link(message.reply_to_message.from_user))
|
|
bot.delete_message(message.chat.id, message.id)
|
|
else:
|
|
bot.reply_to(message, "Увы, но у вас нету прав.")
|
|
except:
|
|
catch_error(message)
|
|
|
|
@bot.message_handler(commands=['html'])
|
|
def html(message):
|
|
try:
|
|
text = ' '.join( message.text.split()[1:] )
|
|
bot.send_message(message.chat.id , text, parse_mode='HTML')
|
|
bot.delete_message(message.chat.id, message.id)
|
|
except:
|
|
catch_error(message)
|
|
|
|
@bot.message_handler(commands=['chatid'])
|
|
def chatid(message):
|
|
try:
|
|
if have_rights(message):
|
|
bot.reply_to(message, "Айди чата: " + telebot.formatting.hcode( str(message.chat.id) ), parse_mode='HTML')
|
|
else:
|
|
bot.reply_to(message, "Увы, но у вас нету прав.")
|
|
except:
|
|
catch_error(message)
|
|
|
|
############ LOW-ADMIN ##############
|
|
|
|
def have_la(id):
|
|
try:
|
|
global la ; read_la()
|
|
if id in la:
|
|
return True
|
|
else:
|
|
la[id] = []
|
|
write_la()
|
|
return True
|
|
except:
|
|
catch_error(message)
|
|
|
|
@bot.message_handler(commands=['la-list'])
|
|
def la_list(message):
|
|
try:
|
|
if have_rights(message, set_la=True):
|
|
global la ; read_la()
|
|
if have_la(str(message.chat.id)):
|
|
s = "Список администраторов в режиме low-admin:\n"
|
|
for i in la[str(message.chat.id)]:
|
|
s = s + '\n@' + i
|
|
bot.reply_to(message, s, parse_mode='HTML')
|
|
except:
|
|
catch_error(message)
|
|
|
|
@bot.message_handler(commands=['la-add'])
|
|
def la_add(message):
|
|
try:
|
|
if have_rights(message, set_la=True):
|
|
global la ; read_la()
|
|
if have_la(message.chat.id):
|
|
nick = message.text.split()[1][1:]
|
|
la[message.chat.id].append(nick)
|
|
write_la()
|
|
bot.reply_to(message, f"Пользователь @{nick} успешно добавлен в список администраторов.")
|
|
except:
|
|
catch_error(message)
|
|
|
|
@bot.message_handler(commands=['la-del'])
|
|
def la_del(message, set_la=True):
|
|
try:
|
|
if have_rights(message, set_la=True):
|
|
global la ; read_la()
|
|
if have_la(message.chat.id):
|
|
nick = message.text.split()[1][1:]
|
|
|
|
if nick in la[message.chat.id]:
|
|
del la[message.chat.id]
|
|
write_la()
|
|
bot.reply_to(message, f"Пользователь @{nick} был исключён из списка администраторов.")
|
|
except:
|
|
catch_error(message)
|
|
|
|
|
|
#######################JOIN REQUEST #############
|
|
|
|
|
|
@bot.chat_join_request_handler()
|
|
def join_request(message: telebot.types.ChatJoinRequest):
|
|
try:
|
|
bot.send_message(message.chat.id, f"""Поступила заявка на вступление от { telebot.util.user_link(message.from_user) }
|
|
Принять: { telebot.formatting.hcode(f"/accept {message.from_user.id}") }""", parse_mode="HTML")
|
|
except:
|
|
catch_error(message)
|
|
|
|
@bot.message_handler(commands=['accept'])
|
|
def accept_request(message):
|
|
try:
|
|
if have_rights(message):
|
|
if len(message.text.split()) == 2:
|
|
bot.approve_chat_join_request(message.chat.id, message.text.split()[1] )
|
|
bot.reply_to(message, "Заявка принята.")
|
|
except:
|
|
catch_error(message)
|
|
|
|
######################SUPPORT########
|
|
|
|
@bot.message_handler(commands=['support'])
|
|
def support(message):
|
|
try:
|
|
bot.reply_to(message, f"""Связь с аднимистратором в @just_anonchat_bot :
|
|
Адрес - {telebot.formatting.hcode("c:justuser")}""", parse_mode="HTML")
|
|
except:
|
|
catch_error(message)
|
|
|
|
#####################WELCOME#####
|
|
|
|
@bot.message_handler(content_types=["new_chat_members"])
|
|
def handler_new_member(message):
|
|
try:
|
|
global db
|
|
read_db()
|
|
bot.reply_to(message, db[str(message.chat.id)], parse_mode='HTML')
|
|
except:
|
|
catch_error(message)
|
|
|
|
##############ANALYTIC########
|
|
|
|
@bot.message_handler()
|
|
def catch_all_messages(message):
|
|
analytic(message)
|
|
|
|
# For what?
|
|
# This add users to db for using command like:
|
|
# /ban @username
|
|
# Without reply to message. All usernames hashed.
|
|
|
|
|
|
##################CATCH ERRORS####
|
|
|
|
import logging
|
|
import traceback
|
|
|
|
from io import StringIO # For catch log to variable
|
|
|
|
# Basic init
|
|
global log_stream
|
|
log_stream = StringIO()
|
|
logging.basicConfig(stream=log_stream)
|
|
|
|
def catch_error(message, err_type = None):
|
|
if not err_type:
|
|
global log_stream
|
|
|
|
logging.error(traceback.format_exc()) # Log error
|
|
err = log_stream.getvalue() # Error to variable
|
|
|
|
bot.send_message(message.chat.id, "Critical error (свяжитись через /support ) :\n\n" + telebot.formatting.hcode(err), parse_mode='HTML')
|
|
|
|
log_stream.truncate(0) # Clear
|
|
log_stream.seek(0) # Clear
|
|
elif err_type == "no_user":
|
|
bot.send_message(message.chat.id, "Не указан пользователь.")
|
|
|
|
|
|
##################MAIN THREAD#####
|
|
#'''
|
|
while True:
|
|
try:
|
|
bot.polling()
|
|
except KeyboardInterrupt:
|
|
exit()
|
|
except:
|
|
pass
|
|
'''
|
|
bot.polling(allowed_updates=telebot.util.update_types)
|
|
'''
|