"""Telegram bot that downloads audio from YouTube Music links using yt-dlp."""

import json
import os
import logging
import telebot
from telebot import types
# NOTE: the star import is deliberately kept ahead of the stdlib imports below
# (same relative order as before) so that nothing it exports can shadow them.
from db import *
import subprocess
import time
from threading import Lock
import tempfile

# Configure logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Bot token from @BotFather
# `read` is not defined in this file; presumably it comes from `from db import *`.
CONFIG = read()
BOT_TOKEN = CONFIG['api_token']

bot = telebot.TeleBot(BOT_TOKEN)

# Substring that identifies a YouTube Music track link inside a message.
YT_MUSIC_MARKER = 'music.youtube.com/watch?v='

# Global lock to enforce delay between operations
operation_lock = Lock()


def delayed_operation(seconds=1):
    """Enforce a delay (1 second by default) between any two operations.

    The sleep happens while holding ``operation_lock``, so concurrent callers
    are serialized and each one waits its own ``seconds`` in turn.
    """
    with operation_lock:
        time.sleep(seconds)


def _has_youtube_music_link(message):
    """Return True when the message has text containing a YouTube Music link."""
    # Guard against messages without text so the filter never raises.
    return bool(message.text) and YT_MUSIC_MARKER in message.text


def _extract_urls(text):
    """Return every whitespace-separated word of *text* that holds a link."""
    return [word for word in (text or "").split() if YT_MUSIC_MARKER in word]


def _delete_progress_message(progress_msg):
    """Best-effort removal of the temporary progress message."""
    try:
        bot.delete_message(progress_msg.chat.id, progress_msg.message_id)
    except Exception as e:
        # Cleanup must never mask the real outcome of the request.
        logger.warning("Could not delete progress message: %s", e)


def _download_and_send(message, url):
    """Fetch metadata for *url*, download it as MP3 and send it to the chat.

    Expected failures (yt-dlp error, no file produced) are reported to the
    user and end this URL only; unexpected errors propagate to the caller.
    The progress message is removed on every path.
    """
    user = message.from_user

    # Notify user that download has started
    progress_msg = bot.reply_to(message, "🎵 Fetching music metadata...")
    try:
        # Delay before fetching metadata
        delayed_operation()

        # Step 1: Get metadata using yt-dlp in simulate mode.
        # "--" ends option parsing so a user-supplied word starting with "-"
        # cannot be interpreted as a yt-dlp option; "--no-playlist" keeps the
        # output to a single JSON document when the link carries a list id.
        result = subprocess.run(
            [
                "yt-dlp",
                "--print-json",
                "--skip-download",
                "--restrict-filenames",
                "--no-playlist",
                "--",
                url,
            ],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            logger.error("yt-dlp metadata fetch error: %s", result.stderr)
            bot.reply_to(message, "❌ Failed to fetch music info. Please try again.")
            return

        metadata = json.loads(result.stdout)
        # `or` also covers keys that are present but null in the JSON.
        title = metadata.get("title") or "Unknown Title"
        uploader = metadata.get("uploader") or "Unknown Artist"

        # Update message
        bot.edit_message_text(
            chat_id=progress_msg.chat.id,
            message_id=progress_msg.message_id,
            text="🎵 Downloading your music...",
        )

        # Delay before downloading
        delayed_operation()

        # Step 2: Download the audio into a private directory so that files
        # of other users / concurrent requests can never be picked up, and so
        # that the download is removed even when sending fails.
        with tempfile.TemporaryDirectory(prefix=f"ytmusic_{user.id}_") as workdir:
            output_template = os.path.join(workdir, f"{user.id}_%(title)s.%(ext)s")
            download_result = subprocess.run(
                [
                    "yt-dlp",
                    "-x",
                    "--audio-format", "mp3",
                    "--audio-quality", "0",
                    "--output", output_template,
                    "--restrict-filenames",
                    "--no-playlist",
                    "--",
                    url,
                ],
                capture_output=True,
                text=True,
            )
            if download_result.returncode != 0:
                logger.error("yt-dlp download error: %s", download_result.stderr)
                bot.reply_to(
                    message,
                    "❌ Failed to download audio. Please check the link and try again.",
                )
                return

            # Step 3: Find the downloaded file
            downloaded_files = sorted(
                name for name in os.listdir(workdir) if name.endswith('.mp3')
            )
            if not downloaded_files:
                bot.reply_to(message, "❌ Failed to download audio. Please try again.")
                return
            audio_file = os.path.join(workdir, downloaded_files[0])

            # Step 4: Send audio with metadata as caption
            with open(audio_file, 'rb') as audio:
                bot.send_audio(
                    chat_id=message.chat.id,
                    audio=audio,
                    caption=f"🎧 {title}\n👤 {uploader}",
                    title=title,
                    performer=uploader,
                    reply_to_message_id=message.message_id,
                )

        logger.info("Sent audio to user %s (@%s)", user.first_name, user.username)
    finally:
        # Delete progress message (on success and on failure alike)
        _delete_progress_message(progress_msg)


@bot.message_handler(func=_has_youtube_music_link)
def handle_youtube_music_link(message):
    """Handle YouTube Music links in messages.

    Every link found in the message text is processed independently: a
    failure on one link is reported to the user and does not stop the
    remaining links from being handled.
    """
    # Extract all YouTube Music URLs from the message
    for url in _extract_urls(message.text):
        try:
            _download_and_send(message, url)
        except Exception:
            # Top-level boundary for one link: log with traceback, tell the user.
            logger.exception("Error processing request")
            bot.reply_to(
                message,
                "❌ An error occurred while processing your request. "
                "Please try again later.",
            )


@bot.message_handler(commands=['start', 'help'])
def send_welcome(message):
    """Send welcome message"""
    bot.reply_to(
        message,
        "🎵 YouTube Music Downloader Bot\n\n"
        "Send me a link from music.youtube.com and I'll download the audio for you!\n\n"
        "Example: https://music.youtube.com/watch?v=dQw4w9WgXcQ\n\n"
        "Source code: https://gitea.del.pw/justuser-31/just_ytmusic_downloader",
        disable_web_page_preview=True,
    )


# # Handler for when bot is added to a group
# @bot.my_chat_member_handler()
# def my_chat_member_handler(message: types.ChatMemberUpdated):
#     """
#     Triggered when the bot's membership status changes in a chat.
#     If added to a group but is not admin, notify that it needs admin rights.
#     """
#     new_status = message.new_chat_member.status
#     chat_type = message.chat.type
#
#     # Only respond if added to a group or supergroup
#     if chat_type in ["group", "supergroup"] and new_status == "member":
#         # Check if bot is admin
#         try:
#             bot_member = bot.get_chat_member(message.chat.id, bot.get_me().id)
#             if not bot_member.status == "administrator":
#                 bot.send_message(
#                     chat_id=message.chat.id,
#                     text="⚠️ This bot must be an administrator in the group to work properly.",
#                     disable_notification=True
#                 )
#         except Exception as e:
#             logger.warning(f"Could not check admin status: {e}")


if __name__ == '__main__':
    logger.info("Bot is running...")
    bot.infinity_polling()