You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

508 lines
15 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import telebot
import os
import json
from telebot import types,util
global db, users, blocks
####### CREATE DB IF NOT EXIST
if not os.path.exists('db.json'):
db = {"token": "None"}
js = json.dumps(db, indent=2)
with open("db.json", "w") as outfile:
outfile.write(js)
print('Input token in "None" (db.json)')
exit()
if not os.path.exists('users.json'):
users = {}
js = json.dumps(users, indent=2)
with open("users.json", "w") as outfile:
outfile.write(js)
if not os.path.exists('blocks.json'):
blocks = []
js = json.dumps(blocks, indent=2)
with open("blocks.json", "w") as outfile:
outfile.write(js)
############WORK WITH DBs##########
def read_db():
global db
with open('db.json', 'r') as openfile:
db = json.load(openfile)
def write_db():
global db
js = json.dumps(db, indent=2)
with open("db.json", "w") as outfile:
outfile.write(js)
def read_users():
global users
with open('users.json', 'r') as openfile:
users = json.load(openfile)
def write_users():
global users
js = json.dumps(users, indent=2)
with open("users.json", "w") as outfile:
outfile.write(js)
def read_blocks():
global blocks
with open('blocks.json', 'r') as openfile:
blocks = json.load(openfile)
def write_blocks():
global blocks
js = json.dumps(blocks, indent=2)
with open("blocks.json", "w") as outfile:
outfile.write(js)
####################FAST HASH#################
from xxhash import xxh32
# Generate fast hash
def sha(text):
text = str(text)
return xxh32(text).hexdigest()
##################FUNCTIONS########
def get_admins(message):
try:
if bot.get_chat(message.chat.id).type == "private":
return []
else:
admins = bot.get_chat_administrators(chat_id=message.chat.id)
true_admins = []
for i in admins:
if i.status == "creator" or i.can_restrict_members == True:
true_admins.append(i.user.id)
return true_admins
except:
catch_error(message)
return None
# Fix for anon admins, all anon (not premium) users == admins
def is_anon(message):
if message.from_user.username == "Channel_Bot" and message.from_user.is_premium == None:
return True
else:
return False
def get_target(message):
try:
# if True:
if len(message.text.split()) > 1 and message.text.split()[1][0] == "@":
username = message.text.split()[1][1:]
global users
if sha(username) in users:
return users[sha(username)]
else:
return None
else:
target = message.reply_to_message.from_user.id
if target not in get_admins(message):
return target
else:
return None
except:
return None
def get_name(message):
try:
text = message.text.split()
# If message with @username
if len(text) > 1 and text[1][0] == '@':
return text[1]
# Reply to message
else:
return telebot.util.user_link(message.reply_to_message.from_user)
except:
catch_error(message)
#return telebot.util.user_link(message.reply_to_message.from_user)
def key_by_value(dictionary, key):
for i in dictionary:
if dictionary[i] == key:
return i
return None
def analytic(message):
global users
read_users()
if key_by_value(users, message.from_user.id) == message.from_user.username:
pass
elif message.from_user.username == "None":
pass
else:
users[sha(message.from_user.username)] = message.from_user.id
write_users()
#############TOKEN INIT#####
read_db()
read_users()
bot = telebot.TeleBot(db['token'])
##################COMMANDS#######
@bot.message_handler(commands=['start', 'faq'])
def send_welcome(message):
bot.reply_to(message, """Колотушка работает 🔨
Что это такое?
Это минимальный бот-модератор без слежки с открым кодом.
Код: https://gitea.gulyaipole.fun/justuser/just_moderator
Список команд - /help
""")
@bot.message_handler(commands=['help'])
def help(message):
bot.reply_to(message, """
Список команд:
/mute ⇁ Мут человека в ответ на сообщение
/unmute ⇁ Снятие мута
/kick ⇁ Кик
/ban ⇁ Бан
/unban ⇁ Снятие бана
/setwelcome ⇁ Приветственное сообщение
/welcome ⇁ Демонстрация текущего сообщения
/lock ⇁ Блокировка чата (для обычных пользователей)
/unlock ⇁ Снятие блокировки чата
/chatid ⇁ Айди чата
/secret ⇁ Функция для получения ссылки html на пользователя в лс.
/html <code> ⇁ Отправка текста сообщения в режиме формата html.
/support Помогите :( ⇁ Написать разработчику. Прошу писать по делу.
""")
@bot.message_handler(commands=['mute'])
def mute(message):
try:
if message.from_user.id in get_admins(message) or is_anon(message):
target = get_target(message)
if target:
if len(message.text.split()) == 1:
bot.restrict_chat_member(message.chat.id, target, until_date = message.date)
bot.reply_to(message, f"""Пользователь { get_name(message) } был заглушен.""", parse_mode='HTML')
elif len(message.text.split()) == 2 and message.text.split()[1][0] == "@":
bot.restrict_chat_member(message.chat.id, target, until_date = message.date)
bot.reply_to(message, f"""Пользователь { get_name(message) } был заглушен.""")
else:
time = int(message.text.split()[1]) * 60
bot.restrict_chat_member(message.chat.id, target, until_date = message.date+time)
bot.reply_to(message, f"""Пользователь { get_name(message) } был заглушён на {time/60} минут(ы).""", parse_mode='HTML')
else:
catch_error(message, "no_user")
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['unmute'])
def unmute(message):
try:
if message.from_user.id in get_admins(message) or is_anon(message):
target = get_target(message)
if target:
bot.restrict_chat_member(message.chat.id, target, can_send_messages=True, can_send_other_messages = True, until_date = message.date)
bot.reply_to(message, f"""Пользователь { get_name(message) } снова имеет дар речи.
""", parse_mode='HTML')
else:
catch_error(message, "no_user")
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['kick'])
def kick(message):
try:
if message.from_user.id in get_admins(message) or is_anon(message):
target = get_target(message)
if target:
bot.ban_chat_member(message.chat.id, target)
bot.unban_chat_member(message.chat.id, target)
bot.reply_to(message, f"""Пользователь { get_name(message) } был исключён.
""", parse_mode='HTML')
else:
catch_error(message, "no_user")
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['ban'])
def ban(message):
try:
if message.from_user.id in get_admins(message) or is_anon(message):
target = get_target(message)
if target:
bot.ban_chat_member(message.chat.id, target)
bot.reply_to(message, f"""Пользователь { get_name(message) } исключён и заблокирован.
""", parse_mode='HTML')
else:
catch_error(message, "no_user")
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['unban'])
def unban(message):
try:
if message.from_user.id in get_admins(message) or is_anon(message):
target = get_target(message)
if target:
bot.unban_chat_member(message.chat.id, target)
bot.reply_to(message, f"""Пользователь { get_name(message) } разблокирован.
""", parse_mode='HTML')
else:
catch_error(message, "no_user")
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['setwelcome'])
def setwelcome(message):
try:
if message.from_user.id in get_admins(message) or is_anon(message):
global db
db[str(message.chat.id)] = message.html_text[ message.text.find(" ") + 1 :]
bot.reply_to(message, f"""Установлено новое приветственное сообщение:
\n{db[str(message.chat.id)]}""", parse_mode='HTML')
write_db()
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['welcome'])
def welcome(message):
try:
global db
read_db()
bot.reply_to(message, f"""Приветственное сообщение:
\n{db[str(message.chat.id)]}""", parse_mode='HTML')
except:
catch_error(message)
@bot.message_handler(commands=['lock'])
def lock(message):
try:
if message.from_user.id in get_admins(message) or is_anon(message):
bot.set_chat_permissions(message.chat.id, telebot.types.ChatPermissions(can_send_messages=False, can_send_other_messages = False, can_send_polls = False))
bot.reply_to(message, "Чат был заблокирован 🔒")
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['unlock'])
def unlock(message):
try:
if message.from_user.id in get_admins(message) or is_anon(message):
bot.set_chat_permissions(message.chat.id, telebot.types.ChatPermissions(can_send_messages=True, can_send_other_messages = True, can_send_polls = True))
bot.reply_to(message, "Чат был разблокирован 🔓")
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['id'])
def getid(message):
try:
if message.from_user.id in get_admins(message) or is_anon(message):
bot.reply_to(message, "ID: " + telebot.formatting.hcode(str(get_target(message))), parse_mode="HTML" )
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['secret'])
def secret(message):
try:
if message.from_user.id in get_admins(message) or is_anon(message):
bot.send_message(message.from_user.id, telebot.util.user_link(message.reply_to_message.from_user))
bot.delete_message(message.chat.id, message.id)
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['html'])
def html(message):
try:
text = ' '.join( message.text.split()[1:] )
bot.send_message(message.chat.id , text, parse_mode='HTML')
bot.delete_message(message.chat.id, message.id)
except:
catch_error(message)
@bot.message_handler(commands=['chatid'])
def chatid(message):
try:
if message.from_user.id in get_admins(message) or is_anon(message):
bot.reply_to(message, "Айди чата: " + telebot.formatting.hcode( str(message.chat.id) ), parse_mode='HTML')
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
######################SUPPORT########
@bot.message_handler(commands=['support'])
def support(message):
try:
global blocks
read_blocks()
if message.chat.id not in blocks:
text = message.text[message.text.find(" ") + 1 :]
bot.reply_to(message, "Ваше сообщение было отправлену разработчику бота ⌨️\n\nP.S.: Чтобы получить ответ вы обязательно должны написать боту в личные сообщения.")
bot.send_message(2057834471, text + f"\n\nПользователь: {message.from_user.id}")
else:
bot.reply_to(message, "Увы, но вы заблокированы")
except:
catch_error(message)
@bot.message_handler(commands=['block'])
def block(message):
try:
if message.chat.id == 2057834471:
global blocks
read_blocks()
id = int(message.text.split()[1] )
blocks.append(id)
write_blocks()
bot.reply_to(message, "Пользователь заблокирован.")
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
except:
catch_error(message)
@bot.message_handler(commands=['reply'])
def reply(message):
if message.chat.id == 2057834471:
try:
text = message.text[message.text.find(" ") + 1 :]
text = text[text.find(" ") + 1 :]
id = message.text.split()[1]
bot.reply_to(message, f"Ответ был отправлен пользователю {id}")
bot.send_message(id, "Ответ разработчика:\n\n" + text)
except:
catch_error(message)
else:
bot.reply_to(message, "Увы, но у вас нету прав.")
#####################WELCOME#####
@bot.message_handler(content_types=["new_chat_members"])
def handler_new_member(message):
try:
global db
read_db()
bot.reply_to(message, db[str(message.chat.id)], parse_mode='HTML')
except:
catch_error(message)
##############ANALYTIC########
@bot.message_handler()
def catch_all_messages(message):
analytic(message)
# BLOCK LINKS FOR GULYAIPOLE (INDIVIDUAL, SECRET)
try:
if message.chat.id == -1001766918049 and message.from_user.id not in get_admins(message):
if 'https://t.me/' in message.text or ( hasattr(message, 'entities') and hasattr(message.entities[0], 'url') and message.entities[0].url != None ):
bot.delete_message(message.chat.id, message.id)
bot.send_message(message.chat.id, f"Пользователь {telebot.util.user_link(message.from_user)} пытался отправить ссылку на группу/чат.", parse_mode='HTML')
except:
#catch_error(message)
pass
@bot.edited_message_handler()
def catch_edited_messages(message):
try:
if message.chat.id == -1001766918049 and message.from_user.id not in get_admins(message):
if 'https://t.me/' in message.text or ( hasattr(message, 'entities') and hasattr(message.entities[0], 'url') and message.entities[0].url != None ):
bot.delete_message(message.chat.id, message.id)
bot.send_message(message.chat.id, f"Пользователь {telebot.util.user_link(message.from_user)} пытался отправить ссылку на группу/чат.", parse_mode='HTML')
except:
#catch_error(message)
pass
# For what?
# This add users to db for using command like:
# /ban @username
# Without reply to message. All usernames hashed.
##################CATCH ERRORS####
import logging
import traceback
from io import StringIO # For catch log to variable
# Basic init
global log_stream
log_stream = StringIO()
logging.basicConfig(stream=log_stream)
def catch_error(message, err_type = None):
if not err_type:
global log_stream
logging.error(traceback.format_exc()) # Log error
err = log_stream.getvalue() # Error to variable
bot.reply_to(message, "Critical error (свяжитись через /support сообщение ) :\n\n" + telebot.formatting.hcode(err), parse_mode='HTML')
log_stream.truncate(0) # Clear
log_stream.seek(0) # Clear
elif err_type == "no_user":
bot.reply_to(message, "Не указан пользователь.")
##################MAIN THREAD#####
#'''
while True:
try:
bot.polling()
except KeyboardInterrupt:
exit()
except:
pass
'''
bot.polling()
'''