import telebot import os import json from telebot import types,util global db, users, la ####### CREATE DB IF NOT EXIST if not os.path.exists('db.json'): db = {"token": "None"} js = json.dumps(db, indent=2) with open("db.json", "w") as outfile: outfile.write(js) print('Input token in "None" (db.json)') exit() if not os.path.exists('users.json'): users = {} js = json.dumps(users, indent=2) with open("users.json", "w") as outfile: outfile.write(js) if not os.path.exists('la.json'): la = {} js = json.dumps(la, indent=2) with open("la.json", "w") as outfile: outfile.write(js) ############WORK WITH DBs########## def read_db(): global db with open('db.json', 'r') as openfile: db = json.load(openfile) def write_db(): global db js = json.dumps(db, indent=2) with open("db.json", "w") as outfile: outfile.write(js) def read_users(): global users with open('users.json', 'r') as openfile: users = json.load(openfile) def write_users(): global users js = json.dumps(users, indent=2) with open("users.json", "w") as outfile: outfile.write(js) # LA - Low Admin. # Admin permissions in bot without admin rights. def read_la(): with open('la.json', 'r') as openfile: la = json.load(openfile) return la def write_la(la): js = json.dumps(la, indent=2) with open("la.json", "w") as outfile: outfile.write(js) ####################FAST HASH################# from xxhash import xxh32 # Generate fast hash def sha(text): text = str(text) return xxh32(text).hexdigest() ##################FUNCTIONS######## def get_admins(message): try: if bot.get_chat(message.chat.id).type == "private": return [] else: admins = bot.get_chat_administrators(chat_id=message.chat.id) true_admins = [] for i in admins: if i.status == "creator" or i.can_restrict_members == True: true_admins.append(i.user.id) return true_admins except Exception as e: catch_error(message, e) return None # Fix for anon admins, all anon (not premium) users == admins def is_anon(message): if message.from_user.username == "Channel_Bot" or message.from_user.username == "GroupAnonymousBot": if message.from_user.is_premium == None: return True else: return False # Return id from db/chat of user def get_target(message): try: global users spl = message.text.split() if ( len(spl) > 1 and spl[1][0] == '@' ) or ( len(spl) > 2 and spl[2][0] == '@' ): for i in spl: if i[0] == '@': username = i[1:] break read_users() if sha(username) in users: return users[sha(username)] else: return None else: target = message.reply_to_message.from_user.id if target not in get_admins(message): return target else: return None except: return None def get_name(message): try: text = message.text.split() # If message with @username if len(text) > 1 and text[1][0] == '@': return text[1] if len(text) > 2 and text[2][0] == '@': return text[2] # Reply to message else: return telebot.util.user_link(message.reply_to_message.from_user) except Exception as e: catch_error(message, e) # Get time for '/mute' # [time, time_in_sec, format] def get_time(message): formats = {'s':[1, 'секунд(ы)'], 'm':[60, 'минут(ы)'], 'h': [3600,'час(а)'], 'd': [86400,'день/дня']} text = message.text.split()[1:] ; time = None # Find format in text for i in text: if time: break for f in list(formats.keys()): if f in i: try: time = [i[:-1], int(i[:-1]) * formats[i[-1]][0] , formats[i[-1]][1] ] break except: pass return time def have_rights(message, set_la = False): la = read_la() if message.from_user.id in get_admins(message): return True elif is_anon(message): return True elif str(message.chat.id) in la and not set_la: if str(message.from_user.username) in la[str(message.chat.id)]: return True else: bot.reply_to(message, "Увы, но у вас нету прав.") def key_by_value(dictionary, key): for i in dictionary: if dictionary[i] == key: return i return None def analytic(message): global users read_users() if key_by_value(users, message.from_user.id) == message.from_user.username: pass elif message.from_user.username == "None": pass else: users[sha(message.from_user.username)] = message.from_user.id write_users() #############TOKEN INIT##### read_db() read_users() bot = telebot.TeleBot(db['token']) ##################COMMANDS####### @bot.message_handler(commands=['start', 'faq']) def send_welcome(message): bot.reply_to(message, """Колотушка работает 🔨 Что это такое? Это минимальный бот-модератор без слежки с открытым кодом. Код: https://gitea.gulyaipole.fun/justuser/just_moderator Список команд - /help """) @bot.message_handler(commands=['help']) def help(message): bot.reply_to(message, """ Список команд: /mute ⇁ Мут человека в ответ на сообщение /unmute ⇁ Снятие мута /kick ⇁ Кик /ban ⇁ Бан /unban ⇁ Снятие бана /setwelcome ⇁ Приветственное сообщение /welcome ⇁ Демонстрация текущего сообщения /lock ⇁ Блокировка чата (для обычных пользователей) /unlock ⇁ Снятие блокировки чата /chatid ⇁ Айди чата /del ⇁ Удаление сообщения /la - всё о режиме 'low admin' /secret ⇁ Функция для получения ссылки html на пользователя в лс. /html ⇁ Отправка текста сообщения в режиме формата html. /support ⇁ Написать разработчику. Прошу писать по делу. """) @bot.message_handler(commands=['mute']) def mute(message): try: if have_rights(message): target = get_target(message) time = get_time(message) if target: if time: bot.restrict_chat_member(message.chat.id, target, until_date = message.date + time[1]) answer = f"Пользователь { get_name(message) } был заглушён на {time[0]} {time[2]}." else: bot.restrict_chat_member(message.chat.id, target, until_date = message.date) answer = f"Пользователь { get_name(message) } был заглушен." try: bot.reply_to(message, answer, parse_mode='HTML') except: bot.reply_to(message, answer) else: catch_error(message, 'None', 'no_user') except Exception as e: catch_error(message, e) @bot.message_handler(commands=['unmute']) def unmute(message): try: if have_rights(message): target = get_target(message) if target: bot.restrict_chat_member(message.chat.id, target, can_send_messages=True, can_send_other_messages = True, until_date = message.date) bot.reply_to(message, f"""Пользователь { get_name(message) } снова имеет дар речи. """, parse_mode='HTML') else: catch_error(message, None, "no_user") except Exception as e: catch_error(message, e) @bot.message_handler(commands=['kick']) def kick(message): try: if have_rights(message): target = get_target(message) if target: bot.ban_chat_member(message.chat.id, target) bot.unban_chat_member(message.chat.id, target) bot.reply_to(message, f"""Пользователь { get_name(message) } был исключён. """, parse_mode='HTML') else: catch_error(message, None, "no_user") except Exception as e: catch_error(message, e) @bot.message_handler(commands=['ban']) def ban(message): try: if have_rights(message): target = get_target(message) if target: bot.ban_chat_member(message.chat.id, target) bot.reply_to(message, f"""Пользователь { get_name(message) } исключён и заблокирован. """, parse_mode='HTML') else: catch_error(message, None, "no_user") except Exception as e: catch_error(message, e) @bot.message_handler(commands=['unban']) def unban(message): try: if have_rights(message): target = get_target(message) if target: bot.unban_chat_member(message.chat.id, target) bot.reply_to(message, f"""Пользователь { get_name(message) } разблокирован. """, parse_mode='HTML') else: catch_error(message, None, "no_user") except Exception as e: catch_error(message, e) @bot.message_handler(commands=['setwelcome']) def setwelcome(message): try: if have_rights(message): global db db[str(message.chat.id)] = message.html_text[ message.text.find(" ") + 1 :] bot.reply_to(message, f"""Установлено новое приветственное сообщение: \n{db[str(message.chat.id)]}""", parse_mode='HTML') write_db() except Exception as e: catch_error(message, e) @bot.message_handler(commands=['welcome']) def welcome(message): try: global db read_db() bot.reply_to(message, f"""Приветственное сообщение: \n{db[str(message.chat.id)]}""", parse_mode='HTML') except Exception as e: catch_error(message, e) @bot.message_handler(commands=['lock']) def lock(message): try: if have_rights(message): bot.set_chat_permissions(message.chat.id, telebot.types.ChatPermissions(can_send_messages=False, can_send_other_messages = False, can_send_polls = False)) bot.reply_to(message, "Чат был заблокирован 🔒") else: bot.reply_to(message, "Увы, но у вас нету прав.") except Exception as e: catch_error(message, e) @bot.message_handler(commands=['unlock']) def unlock(message): try: if have_rights(message): bot.set_chat_permissions(message.chat.id, telebot.types.ChatPermissions(can_send_messages=True, can_send_other_messages = True, can_send_polls = True)) bot.reply_to(message, "Чат был разблокирован 🔓") except Exception as e: catch_error(message, e) @bot.message_handler(commands=['id']) def getid(message): try: if have_rights(message): bot.reply_to(message, "ID: " + telebot.formatting.hcode(str(get_target(message))), parse_mode="HTML" ) except Exception as e: catch_error(message, e) @bot.message_handler(commands=['del']) def delete(message): try: if have_rights(message): bot.delete_message(message.chat.id, message.reply_to_message.id) bot.delete_message(message.chat.id, message.id) except Exception as e: catch_error(message, e) ### EXPEREMENTAL @bot.message_handler(commands=['secret']) def secret(message): try: if have_rights(message): bot.send_message(message.from_user.id, telebot.util.user_link(message.reply_to_message.from_user)) bot.delete_message(message.chat.id, message.id) except Exception as e: catch_error(message, e) @bot.message_handler(commands=['html']) def html(message): try: text = ' '.join( message.text.split()[1:] ) bot.send_message(message.chat.id , text, parse_mode='HTML') bot.delete_message(message.chat.id, message.id) except Exception as e: catch_error(message, e) ### @bot.message_handler(commands=['chatid']) def chatid(message): try: if have_rights(message): bot.reply_to(message, "Айди чата: " + telebot.formatting.hcode( str(message.chat.id) ), parse_mode='HTML') else: bot.reply_to(message, "Увы, но у вас нету прав.") except Exception as e: catch_error(message, e) ############ LOW-ADMIN ############## def have_la(id): try: la = read_la() if id in la: return True else: la[id] = [] write_la(la) return True except Exception as e: catch_error(message, e) @bot.message_handler(commands=['la']) def la(message): bot.reply_to(message, f'''Доступные команды: {telebot.formatting.hcode("/la-list")} - список администраторов в режиме low-admin {telebot.formatting.hcode("/la-add @nick")} - добавить в администраторы {telebot.formatting.hcode("/la-del @nick")} - удалить из администраторов ''', parse_mode = 'HTML') @bot.message_handler(commands=['la-list']) def la_list(message): try: if have_rights(message, set_la=True): la = read_la() if have_la(str(message.chat.id)): s = "Список администраторов в режиме low-admin:\n" for i in la[str(message.chat.id)]: s = s + '\n@' + i bot.reply_to(message, s, parse_mode='HTML') except Exception as e: catch_error(message, e) @bot.message_handler(commands=['la-add']) def la_add(message): try: if have_rights(message, set_la=True): la = read_la() if have_la(message.chat.id): nick = message.text.split()[1][1:] print(la) la[str(message.chat.id)].append(nick) print(la) write_la(la) bot.reply_to(message, f"Пользователь @{nick} успешно добавлен в список администраторов.") except Exception as e: catch_error(message, e) @bot.message_handler(commands=['la-del']) def la_del(message, set_la=True): try: if have_rights(message, set_la=True): la = read_la() if have_la(message.chat.id): nick = message.text.split()[1][1:] if nick in la[str(message.chat.id)]: la[str(message.chat.id)].remove(nick) write_la(la) bot.reply_to(message, f"Пользователь @{nick} был исключён из списка администраторов.") except Exception as e: catch_error(message, e) #######################JOIN REQUEST ############# @bot.chat_join_request_handler() def join_request(message: telebot.types.ChatJoinRequest): try: bot.send_message(message.chat.id, f"""Поступила заявка на вступление от { telebot.util.user_link(message.from_user) } Принять: { telebot.formatting.hcode(f"/accept {message.from_user.id}") }""", parse_mode="HTML") except Exception as e: catch_error(message, e) @bot.message_handler(commands=['accept']) def accept_request(message): try: if have_rights(message): if len(message.text.split()) == 2: bot.approve_chat_join_request(message.chat.id, message.text.split()[1] ) bot.reply_to(message, "Заявка принята.") except Exception as e: catch_error(message, e) ######################SUPPORT######## @bot.message_handler(commands=['support']) def support(message): try: bot.reply_to(message, f"""Связь с аднимистратором в @just_anonchat_bot Адрес - {telebot.formatting.hcode(":justuser")}""", parse_mode="HTML") except Exception as e: catch_error(message, e) #####################WELCOME##### @bot.message_handler(content_types=["new_chat_members"]) def handler_new_member(message): try: global db read_db() bot.reply_to(message, db[str(message.chat.id)], parse_mode='HTML') except Exception as e: catch_error(message, e) ##############ANALYTIC######## @bot.message_handler() def catch_all_messages(message): analytic(message) # For what? # This add users to db for using command like: # /ban @username # Without reply to message. All usernames hashed. ##################CATCH ERRORS#### import logging import traceback from io import StringIO # For catch log to variable # Basic init global log_stream log_stream = StringIO() logging.basicConfig(stream=log_stream) # Known errors known_errs = { "A request to the Telegram API was unsuccessful. Error code: 400. Description: Bad Request: not enough rights to restrict/unrestrict chat member": "Увы, но у бота не хватает прав для этого." } # message error err_type ('no_user', ...) def catch_error(message, e, err_type = None): if not err_type: global log_stream, known_errs e = str(e) # Check error in known_errs print(e) if e in known_errs: bot.send_message(message.chat.id, known_errs[e]) else: logging.error(traceback.format_exc()) # Log error err = log_stream.getvalue() # Error to variable bot.send_message(message.chat.id, "Critical error (свяжитись через /support ) :\n\n" + telebot.formatting.hcode(err), parse_mode='HTML') log_stream.truncate(0) # Clear log_stream.seek(0) # Clear elif err_type == "no_user": bot.send_message(message.chat.id, "Не указан пользователь.") ##################MAIN THREAD##### ''' while True: try: bot.polling() except KeyboardInterrupt: exit() except: pass ''' bot.polling() #'''