123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- import os
- import logging
- from json import loads
- from json.decoder import JSONDecodeError
- from glob import glob, escape
- from subprocess import Popen, PIPE
- from contextlib import contextmanager
- from telegram import InlineKeyboardButton
- from db import VidDatabase
# Configure the root logger once at import time so every message emitted by
# this module carries a timestamp, the logger name and the severity.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class BadLink(Exception):
    """Error type for a link that cannot be handled."""
class Video:
    """A video handled through ``youtube-dl`` and the format database.

    An instance is built either from a ``link`` (to list the available
    formats) or from a ``vid`` database index (to download one previously
    listed format).

    Attributes:
        db: ``VidDatabase`` stored under ``$CONF_FOLDER/viddb.sqlite3``.
        link: URL of the video.
        code: youtube-dl format id (only set when built from ``vid``).
        audio_only: ``'True'``/``'False'`` string read back from the
            database (only set when built from ``vid``).
        file_name: path of the downloaded file, or ``None`` before a
            successful ``download()``.
        formats: mapping ``'<ext>,<format name>' -> database index`` (only
            set when ``init_keyboard`` is true).
        keyboard: rows of ``InlineKeyboardButton`` (only set when
            ``init_keyboard`` is true).
    """

    # Output template handed to ``youtube-dl -o``.
    _OUTPUT_TEMPLATE = '/bot/out/%(title)s-%(id)s.%(ext)s'
    # Files bigger than this are split into chunks before being sent.
    # NOTE(review): 1023 looks like a typo for 1024; it only makes the
    # threshold slightly stricter than 50 MiB, so the value is kept as is.
    _MAX_FILE_BYTES = 50 * 1024 * 1023
    # Chunk size passed to ``split -b``.
    _CHUNK_SIZE = '49M'
    # Prefixes of the youtube-dl output lines that announce the output file.
    _DESTINATION_MARKER = '[download] Destination: '
    _MERGE_MARKER = '[ffmpeg] Merging formats into "'

    def __init__(self, link=None, vid=None, init_keyboard=False):
        """Create a video from exactly one of ``link`` or ``vid``.

        Args:
            link: URL of the video; mutually exclusive with ``vid``.
            vid: database index returned earlier by ``insert_vid``;
                mutually exclusive with ``link``.
            init_keyboard: when true, query the available formats and
                build the inline keyboard immediately.

        Raises:
            KeyError: if the ``CONF_FOLDER`` environment variable is unset.
            Exception: if both or neither of ``link`` and ``vid`` are given,
                or if listing the formats fails.
        """
        self.db = VidDatabase(os.path.join(os.environ['CONF_FOLDER'], 'viddb.sqlite3'))
        if not self.db.is_valid:
            # Database file not present: create a new database.
            self.db.create()

        # Always defined, so later steps can test it instead of relying on
        # an AttributeError when nothing has been downloaded yet.
        self.file_name = None

        if vid is None and link is not None:
            self.link = link
        elif vid is not None and link is None:
            self.link, stored_code = self.db.select_vid(vid)
            # Stored as '<format id>|<audio only flag>'; split on the last
            # separator so a '|' inside the format id cannot break parsing.
            self.code, self.audio_only = stored_code.rsplit('|', 1)
        else:
            raise Exception('what is going on?')

        if init_keyboard:
            self.formats = self.get_formats()
            self.keyboard = self.generate_keyboard()

    def get_formats(self):
        """Ask youtube-dl for the formats of ``self.link`` and register them.

        Returns:
            dict mapping ``'<ext>,<format name>'`` to the database index
            returned by ``insert_vid`` for that format.

        Raises:
            Exception: if youtube-dl reports an error for the link.
        """
        # '--' ends option parsing, so a link starting with '-' is treated
        # as a URL and never as a youtube-dl option.
        stdout, stderr = Popen(
            ['youtube-dl', '-J', '--', self.link], stdout=PIPE, stderr=PIPE
        ).communicate()
        if b'ERROR' in stderr:
            raise Exception('video URL not supported')

        video_info = loads(str(stdout, 'utf-8'))
        if video_info.get('_type') == 'playlist':
            # Only the first playlist entry is considered.
            video_info = video_info.get('entries')[0]

        formats = {}
        available = video_info.get('formats')
        if available is not None:
            for entry in available:
                self.add_format(formats, entry)
        else:
            # No format list: the info dict itself describes the only format.
            self.add_format(formats, video_info)
        logger.info('Formats: {}'.format(formats))
        return formats

    def add_format(self, formats, vid):
        """Store one format in the database and record its index.

        Args:
            formats: dict updated in place with
                ``'<ext>,<format name>' -> database index``.
            vid: youtube-dl format dict; ``format_id``, ``ext`` and
                ``format`` are read from it.
        """
        format_code = vid.get('format_id')
        extension = vid.get('ext')
        name = vid.get('format')
        key = '{},{}'.format(extension, name)
        # A missing format name must not crash the substring test.
        is_audio_only = 'audio only' in (name or '')
        code = '{}|{}'.format(format_code, is_audio_only)
        formats[key] = self.db.insert_vid(self.link, code)

    def generate_keyboard(self):
        """Generate a list of InlineKeyboardButton rows, one per format."""
        return [
            [InlineKeyboardButton(label, callback_data='{}'.format(index))]
            for label, index in self.formats.items()
        ]

    @staticmethod
    def _file_name_from_line(line):
        """Return the output path announced by one youtube-dl line, or None."""
        line = line.rstrip('\r')
        if Video._MERGE_MARKER in line:
            merged = line.partition(Video._MERGE_MARKER)[2]
            # The merged path is wrapped in double quotes; drop the closing one.
            return merged[:-1] if merged.endswith('"') else merged
        if Video._DESTINATION_MARKER in line:
            return line.partition(Video._DESTINATION_MARKER)[2]
        return None

    def download(self):
        """Download ``self.link`` in format ``self.code`` with youtube-dl.

        Sets ``self.file_name`` to the last output path announced by
        youtube-dl; it is left unchanged when no such line is printed.
        """
        logger.info('Downloading {}'.format(self.link))
        # Build the selector locally: mutating self.code would append
        # '+bestaudio' once more on every call.
        format_selector = self.code
        if self.audio_only in (False, 'False'):
            format_selector = format_selector + '+bestaudio'
        # '--' keeps a link starting with '-' from being read as an option.
        cmd = ['youtube-dl', '-o', self._OUTPUT_TEMPLATE,
               '-f', format_selector, '--', self.link]
        logger.info(cmd)
        stdout, _ = Popen(cmd, stdout=PIPE, stderr=PIPE).communicate()
        for line in str(stdout, 'utf-8').split('\n'):
            logger.info(line)
            announced = self._file_name_from_line(line)
            if announced is not None:
                self.file_name = announced  # name of the file

    def check_dimension(self):
        """Split the downloaded file when it is too big and list the parts.

        Returns:
            list of paths matching ``<file_name>*`` (the file itself, or its
            chunks after a split), or ``None`` when there is no downloaded
            file or the split failed.
        """
        file_name = getattr(self, 'file_name', None)
        if file_name is None:
            logger.error('no downloaded file to check')
            return None

        if os.path.getsize(file_name) > self._MAX_FILE_BYTES:
            # Wait for split to finish: removing the source or globbing the
            # chunks while it is still running yields an incomplete list.
            status = Popen(['split', '-b', self._CHUNK_SIZE, file_name, file_name]).wait()
            if status != 0:
                logger.error('split failed with exit status {}'.format(status))
                return None
            os.remove(file_name)
        return glob(escape(file_name) + '*')

    @contextmanager
    def send(self):
        """Yield the files to send and delete them when the block exits.

        Yields:
            The value of ``check_dimension()``: a list of paths or ``None``.
        """
        files = self.check_dimension()  # split if the file is too big
        try:
            yield files
        finally:
            # Clean up even when the caller's block raised.
            for path in files or ():
                try:
                    os.remove(path)
                except FileNotFoundError:
                    # Already gone; nothing left to clean for this path.
                    pass
|