You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

544 lines
15 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import telebot
import os
import json
from telebot import types,util
global db, users, la
####### CREATE DB IF NOT EXIST
if not os.path.exists('db.json'):
db = {"token": "None"}
js = json.dumps(db, indent=2)
with open("db.json", "w") as outfile:
outfile.write(js)
print('Input token in "None" (db.json)')
exit()
if not os.path.exists('users.json'):
users = {}
js = json.dumps(users, indent=2)
with open("users.json", "w") as outfile:
outfile.write(js)
if not os.path.exists('la.json'):
la = {}
js = json.dumps(la, indent=2)
with open("la.json", "w") as outfile:
outfile.write(js)
############WORK WITH DBs##########
def read_db():
global db
with open('db.json', 'r') as openfile:
db = json.load(openfile)
def write_db():
global db
js = json.dumps(db, indent=2)
with open("db.json", "w") as outfile:
outfile.write(js)
def read_users():
global users
with open('users.json', 'r') as openfile:
users = json.load(openfile)
def write_users():
global users
js = json.dumps(users, indent=2)
with open("users.json", "w") as outfile:
outfile.write(js)
# LA - Low Admin.
# Admin permissions in bot without admin rights.
def read_la():
global la
with open('la.json', 'r') as openfile:
la = json.load(openfile)
def write_la():
global la
js = json.dumps(la, indent=2)
with open("la.json", "w") as outfile:
outfile.write(js)
####################FAST HASH#################
from xxhash import xxh32
# Generate fast hash
def sha(text):
text = str(text)
return xxh32(text).hexdigest()
##################FUNCTIONS########
def get_admins(message):
try:
if bot.get_chat(message.chat.id).type == "private":
return []
else:
admins = bot.get_chat_administrators(chat_id=message.chat.id)
true_admins = []
for i in admins:
if i.status == "creator" or i.can_restrict_members == True:
true_admins.append(i.user.id)
return true_admins
except:
catch_error(message)
return None
# Fix for anon admins, all anon (not premium) users == admins
def is_anon(message):
if message.from_user.username == "Channel_Bot" or message.from_user.username == "GroupAnonymousBot":
if message.from_user.is_premium == None:
return True
else:
return False
def get_target(message):
try:
if len(message.text.split()) > 1 and message.text.split()[1][0] == "@":
username = message.text.split()[1][1:]
global users
read_users()
if sha(username) in users:
return users[sha(username)]
else:
return None
else:
target = message.reply_to_message.from_user.id
if target not in get_admins(message):
return target
else:
return None
except:
return None
def get_name(message):
try:
text = message.text.split()
# If message with @username
if len(text) > 1 and text[1][0] == '@':
return text[1]
# Reply to message
else:
return telebot.util.user_link(message.reply_to_message.from_user)
except:
catch_error(message)
def have_rights(message, set_la = False):
global la ; read_la()
if message.from_user.id in get_admins(message):
return True
elif is_anon(message):
return True
elif str(message.chat.id) in la and not set_la:
if str(message.from_user.username) in la[str(message.chat.id)]:
return True
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
def key_by_value(dictionary, key):
for i in dictionary:
if dictionary[i] == key:
return i
return None
def analytic(message):
global users
read_users()
if key_by_value(users, message.from_user.id) == message.from_user.username:
pass
elif message.from_user.username == "None":
pass
else:
users[sha(message.from_user.username)] = message.from_user.id
write_users()
#############TOKEN INIT#####
read_db()
read_users()
bot = telebot.TeleBot(db['token'])
##################COMMANDS#######
@bot.message_handler(commands=['start', 'faq'])
def send_welcome(message):
bot.reply_to(message, """Колотушка работает 🔨
Что это такое?
Это минимальный бот-модератор без слежки с открым кодом.
Код: https://gitea.gulyaipole.fun/justuser/just_moderator
Список команд - /help
""")
@bot.message_handler(commands=['help'])
def help(message):
bot.reply_to(message, """
Список команд:
/mute ⇁ Мут человека в ответ на сообщение
/unmute ⇁ Снятие мута
/kick ⇁ Кик
/ban ⇁ Бан
/unban ⇁ Снятие бана
/setwelcome ⇁ Приветственное сообщение
/welcome ⇁ Демонстрация текущего сообщения
/lock ⇁ Блокировка чата (для обычных пользователей)
/unlock ⇁ Снятие блокировки чата
/chatid ⇁ Айди чата
/del ⇁ Удаление сообщения
/secret ⇁ Функция для получения ссылки html на пользователя в лс.
/html <code> ⇁ Отправка текста сообщения в режиме формата html.
/support ⇁ Написать разработчику. Прошу писать по делу.
""")
@bot.message_handler(commands=['mute'])
def mute(message):
try:
if have_rights(message):
target = get_target(message)
if target:
if len(message.text.split()) == 1:
bot.restrict_chat_member(message.chat.id, target, until_date = message.date)
bot.reply_to(message, f"""Пользователь { get_name(message) } был заглушен.""", parse_mode='HTML')
elif len(message.text.split()) == 2 and message.text.split()[1][0] == "@":
bot.restrict_chat_member(message.chat.id, target, until_date = message.date)
bot.reply_to(message, f"""Пользователь { get_name(message) } был заглушен.""")
else:
time = int(message.text.split()[1]) * 60
bot.restrict_chat_member(message.chat.id, target, until_date = message.date+time)
bot.reply_to(message, f"""Пользователь { get_name(message) } был заглушён на {time/60} минут(ы).""", parse_mode='HTML')
else:
catch_error(message, "no_user")
except:
catch_error(message)
@bot.message_handler(commands=['unmute'])
def unmute(message):
try:
if have_rights(message):
target = get_target(message)
if target:
bot.restrict_chat_member(message.chat.id, target, can_send_messages=True, can_send_other_messages = True, until_date = message.date)
bot.reply_to(message, f"""Пользователь { get_name(message) } снова имеет дар речи.
""", parse_mode='HTML')
else:
catch_error(message, "no_user")
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['kick'])
def kick(message):
try:
if have_rights(message):
target = get_target(message)
if target:
bot.ban_chat_member(message.chat.id, target)
bot.unban_chat_member(message.chat.id, target)
bot.reply_to(message, f"""Пользователь { get_name(message) } был исключён.
""", parse_mode='HTML')
else:
catch_error(message, "no_user")
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['ban'])
def ban(message):
try:
if have_rights(message):
target = get_target(message)
if target:
bot.ban_chat_member(message.chat.id, target)
bot.reply_to(message, f"""Пользователь { get_name(message) } исключён и заблокирован.
""", parse_mode='HTML')
else:
catch_error(message, "no_user")
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['unban'])
def unban(message):
try:
if have_rights(message):
target = get_target(message)
if target:
bot.unban_chat_member(message.chat.id, target)
bot.reply_to(message, f"""Пользователь { get_name(message) } разблокирован.
""", parse_mode='HTML')
else:
catch_error(message, "no_user")
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['setwelcome'])
def setwelcome(message):
try:
if have_rights(message):
global db
db[str(message.chat.id)] = message.html_text[ message.text.find(" ") + 1 :]
bot.reply_to(message, f"""Установлено новое приветственное сообщение:
\n{db[str(message.chat.id)]}""", parse_mode='HTML')
write_db()
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['welcome'])
def welcome(message):
try:
global db
read_db()
bot.reply_to(message, f"""Приветственное сообщение:
\n{db[str(message.chat.id)]}""", parse_mode='HTML')
except:
catch_error(message)
@bot.message_handler(commands=['lock'])
def lock(message):
try:
if have_rights(message):
bot.set_chat_permissions(message.chat.id, telebot.types.ChatPermissions(can_send_messages=False, can_send_other_messages = False, can_send_polls = False))
bot.reply_to(message, "Чат был заблокирован 🔒")
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['unlock'])
def unlock(message):
try:
if have_rights(message):
bot.set_chat_permissions(message.chat.id, telebot.types.ChatPermissions(can_send_messages=True, can_send_other_messages = True, can_send_polls = True))
bot.reply_to(message, "Чат был разблокирован 🔓")
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['id'])
def getid(message):
try:
if have_rights(message):
bot.reply_to(message, "ID: " + telebot.formatting.hcode(str(get_target(message))), parse_mode="HTML" )
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['del'])
def secret(message):
try:
if have_rights(message):
bot.delete_message(message.chat.id, message.reply_to_message.id)
bot.delete_message(message.chat.id, message.id)
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['secret'])
def secret(message):
try:
if have_rights(message):
bot.send_message(message.from_user.id, telebot.util.user_link(message.reply_to_message.from_user))
bot.delete_message(message.chat.id, message.id)
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['html'])
def html(message):
try:
text = ' '.join( message.text.split()[1:] )
bot.send_message(message.chat.id , text, parse_mode='HTML')
bot.delete_message(message.chat.id, message.id)
except:
catch_error(message)
@bot.message_handler(commands=['chatid'])
def chatid(message):
try:
if have_rights(message):
bot.reply_to(message, "Айди чата: " + telebot.formatting.hcode( str(message.chat.id) ), parse_mode='HTML')
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
############ LOW-ADMIN ##############
def have_la(id):
try:
global la ; read_la()
if id in la:
return True
else:
la[id] = []
write_la()
return True
except:
catch_error(message)
@bot.message_handler(commands=['la-list'])
def la_list(message):
try:
if have_rights(message, set_la=True):
global la ; read_la()
if have_la(str(message.chat.id)):
s = "Список администраторов в режиме low-admin:\n"
for i in la[str(message.chat.id)]:
s = s + '\n@' + i
bot.reply_to(message, s, parse_mode='HTML')
except:
catch_error(message)
@bot.message_handler(commands=['la-add'])
def la_add(message):
try:
if have_rights(message, set_la=True):
global la ; read_la()
if have_la(message.chat.id):
nick = message.text.split()[1][1:]
la[message.chat.id].append(nick)
write_la()
bot.reply_to(message, f"Пользователь @{nick} успешно добавлен в список администраторов.")
except:
catch_error(message)
@bot.message_handler(commands=['la-del'])
def la_del(message, set_la=True):
try:
if have_rights(message, set_la=True):
global la ; read_la()
if have_la(message.chat.id):
nick = message.text.split()[1][1:]
if nick in la[message.chat.id]:
del la[message.chat.id]
write_la()
bot.reply_to(message, f"Пользователь @{nick} был исключён из списка администраторов.")
except:
catch_error(message)
#######################JOIN REQUEST #############
@bot.chat_join_request_handler()
def join_request(message: telebot.types.ChatJoinRequest):
try:
bot.send_message(message.chat.id, f"""Поступила заявка на вступление от { telebot.util.user_link(message.from_user) }
Принять: { telebot.formatting.hcode(f"/accept {message.from_user.id}") }""", parse_mode="HTML")
except:
catch_error(message)
@bot.message_handler(commands=['accept'])
def accept_request(message):
try:
if have_rights(message):
if len(message.text.split()) == 2:
bot.approve_chat_join_request(message.chat.id, message.text.split()[1] )
bot.reply_to(message, "Заявка принята.")
except:
catch_error(message)
######################SUPPORT########
@bot.message_handler(commands=['support'])
def support(message):
try:
bot.reply_to(message, f"""Связь с аднимистратором в @just_anonchat_bot :
Адрес - {telebot.formatting.hcode("c:justuser")}""", parse_mode="HTML")
except:
catch_error(message)
#####################WELCOME#####
@bot.message_handler(content_types=["new_chat_members"])
def handler_new_member(message):
try:
global db
read_db()
bot.reply_to(message, db[str(message.chat.id)], parse_mode='HTML')
except:
catch_error(message)
##############ANALYTIC########
@bot.message_handler()
def catch_all_messages(message):
analytic(message)
# For what?
# This add users to db for using command like:
# /ban @username
# Without reply to message. All usernames hashed.
##################CATCH ERRORS####
import logging
import traceback
from io import StringIO # For catch log to variable
# Basic init
global log_stream
log_stream = StringIO()
logging.basicConfig(stream=log_stream)
def catch_error(message, err_type = None):
if not err_type:
global log_stream
logging.error(traceback.format_exc()) # Log error
err = log_stream.getvalue() # Error to variable
bot.send_message(message.chat.id, "Critical error (свяжитись через /support ) :\n\n" + telebot.formatting.hcode(err), parse_mode='HTML')
log_stream.truncate(0) # Clear
log_stream.seek(0) # Clear
elif err_type == "no_user":
bot.send_message(message.chat.id, "Не указан пользователь.")
##################MAIN THREAD#####
#'''
while True:
try:
bot.polling()
except KeyboardInterrupt:
exit()
except:
pass
'''
bot.polling(allowed_updates=telebot.util.update_types)
'''