import os
import logging
from json import loads
from json.decoder import JSONDecodeError
from glob import glob, escape
from subprocess import Popen, PIPE
from contextlib import contextmanager

from telegram import InlineKeyboardButton

from db import VidDatabase

logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    level=logging.INFO)
logger = logging.getLogger(__name__)


class BadLink(Exception):
    """Exception type for bad links (not raised anywhere in this module)."""


class Video:
    """A video link plus the youtube-dl format chosen for it.

    An instance is built either from a raw ``link`` (to list the available
    formats) or from a ``vid`` index previously stored in the database (to
    download the format the index refers to).
    """

    # Files bigger than this are split before being handed out by ``send``.
    # NOTE(review): the trailing 1023 looks like a typo for 1024; it is kept
    # because it only makes the threshold marginally stricter.
    _MAX_FILE_SIZE = 50 * 1024 * 1023
    # Chunk size passed to ``split -b``.
    _CHUNK_SIZE = '49M'

    def __init__(self, link=None, vid=None, init_keyboard=False):
        """Create a video from exactly one of ``link`` or ``vid``.

        Args:
            link: URL of the video; mutually exclusive with ``vid``.
            vid: database index returned by ``VidDatabase.insert_vid``;
                mutually exclusive with ``link``.
            init_keyboard: when true, query the available formats and build
                the inline keyboard immediately.

        Raises:
            Exception: if both or neither of ``link`` and ``vid`` are given,
                or if format discovery fails while ``init_keyboard`` is set.
        """
        self.db = VidDatabase(os.path.join(os.environ['CONF_FOLDER'], 'viddb.sqlite3'))
        if not self.db.is_valid:
            # Database file not present: create a new database.
            self.db.create()

        # Always defined so later methods can test it instead of tripping
        # over a missing attribute.
        self.file_name = None

        if vid is None and link is not None:
            self.link = link
        elif vid is not None and link is None:
            self.link, stored_code = self.db.select_vid(vid)
            # Stored as '<format_id>|<audio_only>' by add_format(); split on
            # the last '|' so the unpacking always yields two parts.
            self.code, self.audio_only = stored_code.rsplit('|', 1)
        else:
            raise Exception('what is going on?')

        if init_keyboard:
            self.formats = self.get_formats()
            self.keyboard = self.generate_keyboard()

    def get_formats(self):
        """Ask youtube-dl for the formats of ``self.link``.

        Every format is registered in the database through ``add_format``.

        Returns:
            dict mapping '<ext>,<format name>' to the database index of the
            stored (link, format code) pair.

        Raises:
            Exception: if youtube-dl reports an error or the link resolves to
                a playlist without entries.
        """
        formats = {}
        stdout, stderr = Popen(['youtube-dl', '-J', self.link],
                               stdout=PIPE, stderr=PIPE).communicate()
        if b'ERROR' in stderr:
            raise Exception('video URL not supported')

        video_info = loads(str(stdout, 'utf-8'))
        if video_info.get('_type', None) == 'playlist':
            # Only the first entry of a playlist is considered.
            entries = video_info.get('entries') or []
            if not entries:
                raise Exception('video URL not supported')
            video_info = entries[0]

        available = video_info.get('formats')
        if available is not None:
            for vid in available:
                self.add_format(formats, vid)
        else:
            # No format list: the info dict itself describes the only format.
            self.add_format(formats, video_info)

        logger.info('Formats: %s', formats)
        return formats

    def add_format(self, formats, vid):
        """Store one youtube-dl format in the database and in ``formats``.

        Args:
            formats: dict updated in place; receives
                '<ext>,<format name>' -> database index.
            vid: youtube-dl format dictionary (reads 'format_id', 'ext' and
                'format').
        """
        format_code = vid.get('format_id')
        extension = vid.get('ext')
        name = vid.get('format')
        key = '{},{}'.format(extension, name)
        # A missing 'format' entry must not break the substring test.
        audio_only = 'audio only' in (name or '')
        code = '{}|{}'.format(format_code, audio_only)
        formats[key] = self.db.insert_vid(self.link, code)

    def generate_keyboard(self):
        """Generate a list of InlineKeyboardButton rows, one per format."""
        return [
            [InlineKeyboardButton(label, callback_data='{}'.format(index))]
            for label, index in self.formats.items()
        ]

    def download(self):
        """Download ``self.link`` in the selected format with youtube-dl.

        The output is written under ``/bot/out`` and ``self.file_name`` is
        set from the youtube-dl log lines that announce the destination.
        """
        logger.info('Downloading %s', self.link)
        cmd = ['youtube-dl', '-o', '/bot/out/%(title)s-%(id)s.%(ext)s']

        # Work on a local copy: appending to self.code would stack another
        # '+bestaudio' on every repeated call.
        format_code = self.code
        if self.audio_only in [False, "False"]:
            format_code = format_code + '+bestaudio'
        cmd.extend(['-f', format_code, self.link])
        logger.info(cmd)

        stdout, _ = Popen(cmd, stdout=PIPE, stderr=PIPE).communicate()
        for line in str(stdout, 'utf-8').split('\n'):
            logger.info(line)
            if '[download] Destination:' in line:
                # Everything after '[download] Destination: '.
                self.file_name = line[24:]
            if '[ffmpeg] Merging formats into' in line:
                # Path is wrapped in quotes: drop the prefix and both quotes.
                self.file_name = line[31:-1]

    def check_dimension(self):
        """Split the downloaded file when it exceeds the size limit.

        Returns:
            list of paths starting with the downloaded file name (the file
            itself, or the pieces produced by ``split``), or ``None`` when no
            file name is known.
        """
        file_name = getattr(self, 'file_name', None)
        if file_name is None:
            logger.error('no downloaded file to check')
            return None

        if os.path.getsize(file_name) > self._MAX_FILE_SIZE:
            # Wait for split to finish; otherwise the source could be deleted
            # and the pieces globbed while split is still running.
            exit_code = Popen(['split', '-b', self._CHUNK_SIZE,
                               file_name, file_name]).wait()
            if exit_code == 0:
                os.remove(file_name)
            else:
                logger.error('split exited with status %s, keeping %s',
                             exit_code, file_name)
        return glob(escape(file_name) + '*')

    @contextmanager
    def send(self):
        """Yield the files to send and delete them when the block exits.

        Yields:
            the value returned by ``check_dimension`` (a list of paths, or
            ``None`` when there is nothing to send).
        """
        files = self.check_dimension()  # split if the file is too large
        try:
            yield files
        finally:
            # Remove the files even if the caller's block raised.
            for path in files or []:
                os.remove(path)