You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

587 lines
16 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import telebot
import os
import json
from telebot import types,util
global db, users, la
####### CREATE DB IF NOT EXIST
if not os.path.exists('db.json'):
db = {'token': 'None'}
js = json.dumps(db, indent=2)
with open('db.json', 'w') as outfile:
outfile.write(js)
print('Input token in "None" (db.json)')
exit()
if not os.path.exists('users.json'):
users = {}
js = json.dumps(users, indent=2)
with open('users.json', 'w') as outfile:
outfile.write(js)
if not os.path.exists('la.json'):
la = {}
js = json.dumps(la, indent=2)
with open('la.json', 'w') as outfile:
outfile.write(js)
############WORK WITH DBs##########
def read_db():
global db
with open('db.json', 'r') as openfile:
db = json.load(openfile)
def write_db():
global db
js = json.dumps(db, indent=2)
with open('db.json', 'w') as outfile:
outfile.write(js)
def read_users():
global users
with open('users.json', 'r') as openfile:
users = json.load(openfile)
def write_users():
global users
js = json.dumps(users, indent=2)
with open('users.json', 'w') as outfile:
outfile.write(js)
# LA - Low Admin.
# Admin permissions in bot without admin rights.
def read_la():
with open('la.json', 'r') as openfile:
la = json.load(openfile)
return la
def write_la(la):
js = json.dumps(la, indent=2)
with open('la.json', 'w') as outfile:
outfile.write(js)
####################FAST HASH#################
from xxhash import xxh32
# Generate fast hash
def sha(text):
text = str(text)
return xxh32(text).hexdigest()
##################FUNCTIONS########
def get_admins(message):
try:
if bot.get_chat(message.chat.id).type == 'private':
return []
else:
admins = bot.get_chat_administrators(chat_id=message.chat.id)
true_admins = []
for i in admins:
if i.status == 'creator' or i.can_restrict_members == True:
true_admins.append(i.user.id)
return true_admins
except Exception as e:
catch_error(message, e)
return None
# Fix for anon admins, all anon (not premium) users == admins
def is_anon(message):
if message.from_user.username == 'Channel_Bot' or message.from_user.username == 'GroupAnonymousBot':
if message.from_user.is_premium == None:
return True
else:
return False
# Return id from db/chat of user
def get_target(message):
try:
global users
spl = message.text.split()
if ( len(spl) > 1 and spl[1][0] == '@' ) or ( len(spl) > 2 and spl[2][0] == '@' ):
for i in spl:
if i[0] == '@':
username = i[1:]
break
read_users()
if sha(username) in users:
return users[sha(username)]
else:
return None
else:
target = message.reply_to_message.from_user.id
if target not in get_admins(message):
return target
else:
return None
except:
return None
def get_name(message):
try:
text = message.text.split()
# If message with @username
if len(text) > 1 and text[1][0] == '@':
return text[1]
if len(text) > 2 and text[2][0] == '@':
return text[2]
# Reply to message
else:
return telebot.util.user_link(message.reply_to_message.from_user)
except Exception as e:
catch_error(message, e)
# Get time for '/mute'
# [time, time_in_sec, format]
def get_time(message):
formats = {'s':[1, 'секунд(ы)'], 'm':[60, 'минут(ы)'], 'h': [3600,'час(а)'], 'd': [86400,'день/дня']}
text = message.text.split()[1:] ; time = None
# Find format in text
for i in text:
if time:
break
for f in list(formats.keys()):
if f in i:
try:
time = [i[:-1], int(i[:-1]) * formats[i[-1]][0] , formats[i[-1]][1] ]
break
except:
pass
return time
def have_rights(message, set_la = False):
la = read_la()
if message.from_user.id in get_admins(message):
return True
elif is_anon(message):
return True
elif str(message.chat.id) in la and not set_la:
if str(message.from_user.username) in la[str(message.chat.id)]:
return True
else:
bot.reply_to(message, 'Увы, но у вас нету прав.')
def key_by_value(dictionary, key):
for i in dictionary:
if dictionary[i] == key:
return i
return None
def analytic(message):
global users
read_users()
if key_by_value(users, message.from_user.id) == message.from_user.username:
pass
elif message.from_user.username == 'None':
pass
else:
users[sha(message.from_user.username)] = message.from_user.id
write_users()
#############TOKEN INIT#####
read_db()
read_users()
bot = telebot.TeleBot(db['token'])
##################COMMANDS#######
@bot.message_handler(commands=['start', 'faq'])
def send_welcome(message):
bot.reply_to(message, '''Колотушка работает 🔨
Что это такое?
Это минимальный бот-модератор без слежки с открытым кодом.
Код: https://gitea.gulyaipole.fun/justuser/just_moderator
Список команд - /help
''')
@bot.message_handler(commands=['help'])
def help(message):
bot.reply_to(message, '''
Список команд:
/mute ⇁ Мут человека в ответ на сообщение
/unmute ⇁ Снятие мута
/kick ⇁ Кик
/ban ⇁ Бан
/unban ⇁ Снятие бана
/setwelcome ⇁ Приветственное сообщение
/welcome ⇁ Демонстрация текущего сообщения
/lock ⇁ Блокировка чата (для обычных пользователей)
/unlock ⇁ Снятие блокировки чата
/chatid ⇁ Айди чата
/del ⇁ Удаление сообщения
/la - всё о режиме 'low admin'
/secret ⇁ Функция для получения ссылки html на пользователя в лс.
/html <code> ⇁ Отправка текста сообщения в режиме формата html.
/support ⇁ Написать разработчику. Прошу писать по делу.
''')
@bot.message_handler(commands=['mute'])
def mute(message):
try:
if have_rights(message):
target = get_target(message)
time = get_time(message)
if target:
if time:
bot.restrict_chat_member(message.chat.id, target, until_date = message.date + time[1])
answer = f'Пользователь { get_name(message) } был заглушён на {time[0]} {time[2]}.'
else:
bot.restrict_chat_member(message.chat.id, target, until_date = message.date)
answer = f'Пользователь { get_name(message) } был заглушен.'
try:
bot.reply_to(message, answer, parse_mode='HTML')
except:
bot.reply_to(message, answer)
else:
catch_error(message, 'None', 'no_user')
except Exception as e:
catch_error(message, e)
@bot.message_handler(commands=['unmute'])
def unmute(message):
try:
if have_rights(message):
target = get_target(message)
if target:
bot.restrict_chat_member(message.chat.id, target, can_send_messages=True
, can_send_other_messages = True, can_send_polls = True
, can_add_web_page_previews = True, until_date = message.date)
bot.reply_to(message, f'''Пользователь { get_name(message) } снова имеет дар речи.
''', parse_mode='HTML')
else:
catch_error(message, None, 'no_user')
except Exception as e:
catch_error(message, e)
@bot.message_handler(commands=['kick'])
def kick(message):
try:
if have_rights(message):
target = get_target(message)
if target:
bot.ban_chat_member(message.chat.id, target)
bot.unban_chat_member(message.chat.id, target)
bot.reply_to(message, f'''Пользователь { get_name(message) } был исключён.
''', parse_mode='HTML')
else:
catch_error(message, None, 'no_user')
except Exception as e:
catch_error(message, e)
@bot.message_handler(commands=['ban'])
def ban(message):
try:
if have_rights(message):
target = get_target(message)
if target:
bot.ban_chat_member(message.chat.id, target)
bot.reply_to(message, f'''Пользователь { get_name(message) } исключён и заблокирован.
''', parse_mode='HTML')
else:
catch_error(message, None, 'no_user')
except Exception as e:
catch_error(message, e)
@bot.message_handler(commands=['unban'])
def unban(message):
try:
if have_rights(message):
target = get_target(message)
if target:
bot.unban_chat_member(message.chat.id, target)
bot.reply_to(message, f'''Пользователь { get_name(message) } разблокирован.
''', parse_mode='HTML')
else:
catch_error(message, None, 'no_user')
except Exception as e:
catch_error(message, e)
@bot.message_handler(commands=['setwelcome'])
def setwelcome(message):
try:
if have_rights(message):
global db
db[str(message.chat.id)] = message.html_text[ message.text.find(' ') + 1 :]
bot.reply_to(message, f'''Установлено новое приветственное сообщение:
\n{db[str(message.chat.id)]}''', parse_mode='HTML')
write_db()
except Exception as e:
catch_error(message, e)
@bot.message_handler(commands=['welcome'])
def welcome(message):
try:
global db
read_db()
bot.reply_to(message, f'''Приветственное сообщение:
\n{db[str(message.chat.id)]}''', parse_mode='HTML')
except Exception as e:
catch_error(message, e)
@bot.message_handler(commands=['lock'])
def lock(message):
try:
if have_rights(message):
bot.set_chat_permissions(message.chat.id, telebot.types.ChatPermissions(can_send_messages=False, can_send_other_messages = False, can_send_polls = False))
bot.reply_to(message, 'Чат был заблокирован 🔒')
else:
bot.reply_to(message, 'Увы, но у вас нету прав.')
except Exception as e:
catch_error(message, e)
@bot.message_handler(commands=['unlock'])
def unlock(message):
try:
if have_rights(message):
bot.set_chat_permissions(message.chat.id, telebot.types.ChatPermissions(can_send_messages=True, can_send_other_messages = True, can_send_polls = True))
bot.reply_to(message, 'Чат был разблокирован 🔓')
except Exception as e:
catch_error(message, e)
@bot.message_handler(commands=['id'])
def getid(message):
try:
if have_rights(message):
bot.reply_to(message, 'ID: ' + telebot.formatting.hcode(str(get_target(message))), parse_mode='HTML' )
except Exception as e:
catch_error(message, e)
@bot.message_handler(commands=['del'])
def delete(message):
try:
if have_rights(message):
bot.delete_message(message.chat.id, message.reply_to_message.id)
bot.delete_message(message.chat.id, message.id)
except Exception as e:
catch_error(message, e)
### EXPEREMENTAL
@bot.message_handler(commands=['secret'])
def secret(message):
try:
if have_rights(message):
bot.send_message(message.from_user.id, telebot.util.user_link(message.reply_to_message.from_user))
bot.delete_message(message.chat.id, message.id)
except Exception as e:
catch_error(message, e)
@bot.message_handler(commands=['html'])
def html(message):
try:
text = ' '.join( message.text.split()[1:] )
bot.send_message(message.chat.id , text, parse_mode='HTML')
bot.delete_message(message.chat.id, message.id)
except Exception as e:
catch_error(message, e)
###
@bot.message_handler(commands=['chatid'])
def chatid(message):
try:
if have_rights(message):
bot.reply_to(message, 'Айди чата: ' + telebot.formatting.hcode( str(message.chat.id) ), parse_mode='HTML')
else:
bot.reply_to(message, 'Увы, но у вас нету прав.')
except Exception as e:
catch_error(message, e)
############ LOW-ADMIN ##############
def have_la(message):
try:
id = message.from_user.id
la = read_la()
if id in la:
return True
else:
la[id] = []
write_la(la)
return True
except Exception as e:
catch_error(message, e)
@bot.message_handler(commands=['la'])
def la(message):
bot.reply_to(message, f'''Доступные команды:
{telebot.formatting.hcode('/la-list')} - список администраторов в режиме low-admin
{telebot.formatting.hcode('/la-add @nick')} - добавить в администраторы
{telebot.formatting.hcode('/la-del @nick')} - удалить из администраторов
''', parse_mode = 'HTML')
@bot.message_handler(commands=['la-list'])
def la_list(message):
try:
if have_rights(message, set_la=True):
la = read_la()
if have_la(str(message.chat.id)):
s = 'Список администраторов в режиме low-admin:\n'
for i in la[str(message.chat.id)]:
s = s + '\n@' + i
bot.reply_to(message, s, parse_mode='HTML')
except Exception as e:
catch_error(message, e)
@bot.message_handler(commands=['la-add'])
def la_add(message):
try:
if have_rights(message, set_la=True):
la = read_la()
if have_la(message.chat.id):
nick = message.text.split()[1][1:]
print(la)
la[str(message.chat.id)].append(nick)
print(la)
write_la(la)
bot.reply_to(message, f'Пользователь @{nick} успешно добавлен в список администраторов.')
except Exception as e:
catch_error(message, e)
@bot.message_handler(commands=['la-del'])
def la_del(message, set_la=True):
try:
if have_rights(message, set_la=True):
la = read_la()
if have_la(message.chat.id):
nick = message.text.split()[1][1:]
if nick in la[str(message.chat.id)]:
la[str(message.chat.id)].remove(nick)
write_la(la)
bot.reply_to(message, f'Пользователь @{nick} был исключён из списка администраторов.')
except Exception as e:
catch_error(message, e)
#######################JOIN REQUEST #############
@bot.chat_join_request_handler()
def join_request(message: telebot.types.ChatJoinRequest):
try:
bot.send_message(message.chat.id, f'''Поступила заявка на вступление от { telebot.util.user_link(message.from_user) }
Принять: { telebot.formatting.hcode(f'/accept {message.from_user.id}') }''', parse_mode='HTML')
analytic(message)
except Exception as e:
catch_error(message, e)
@bot.message_handler(commands=['accept'])
def accept_request(message):
try:
if have_rights(message):
if len(message.text.split()) == 2:
bot.approve_chat_join_request(message.chat.id, message.text.split()[1] )
bot.reply_to(message, 'Заявка принята.')
except Exception as e:
catch_error(message, e)
######################SUPPORT########
@bot.message_handler(commands=['support'])
def support(message):
try:
bot.reply_to(message, f'''Связь с аднимистратором в @just_anonchat_bot
Адрес - {telebot.formatting.hcode(':justuser')}''', parse_mode='HTML')
except Exception as e:
catch_error(message, e)
#####################WELCOME#####
@bot.message_handler(content_types=['new_chat_members'])
def handler_new_member(message):
try:
global db
read_db()
bot.reply_to(message, db[str(message.chat.id)], parse_mode='HTML')
analytic(message)
except Exception as e:
catch_error(message, e)
##############ANALYTIC########
@bot.message_handler()
def catch_all_messages(message):
analytic(message)
# For what?
# This add users to db for using command like:
# /ban @username
# Without reply to message. All usernames hashed.
##################CATCH ERRORS####
import logging
import traceback
from io import StringIO # For catch log to variable
# Basic init
global log_stream
log_stream = StringIO()
logging.basicConfig(stream=log_stream)
# Known errors
known_errs = {
'A request to the Telegram API was unsuccessful. Error code: 400. Description: Bad Request: not enough rights to restrict/unrestrict chat member': 'Увы, но у бота не хватает прав для этого.'
}
# message error err_type ('no_user', ...)
def catch_error(message, e, err_type = None):
if not err_type:
global log_stream, known_errs
e = str(e)
# Check error in known_errs
print(e)
if e in known_errs:
bot.send_message(message.chat.id, known_errs[e])
else:
logging.error(traceback.format_exc()) # Log error
err = log_stream.getvalue() # Error to variable
bot.send_message(message.chat.id, 'Critical error (свяжитись через /support ) :\n\n' + telebot.formatting.hcode(err), parse_mode='HTML')
log_stream.truncate(0) # Clear
log_stream.seek(0) # Clear
elif err_type == 'no_user':
bot.send_message(message.chat.id, 'Не указан пользователь.')
##################MAIN THREAD#####
#'''
while True:
try:
bot.polling()
except KeyboardInterrupt:
exit()
except:
pass
'''
bot.polling()
'''