telegram-video-downloader/src/vid_utils.py
2021-07-04 22:30:17 +02:00

115 lines
3.8 KiB
Python

import os
import logging
from json import loads
from json.decoder import JSONDecodeError
from glob import glob, escape
from subprocess import Popen, PIPE
from contextlib import contextmanager
from telegram import InlineKeyboardButton
from db import VidDatabase
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
class BadLink(Exception):
pass
class Video:
def __init__(self, link=None, vid=None, init_keyboard=False):
self.db = VidDatabase(os.path.join(os.environ['CONF_FOLDER'], 'viddb.sqlite3'))
if not self.db.is_valid:
# Database file not present
# Create a new database
self.db.create()
if vid is None and link is not None:
self.link = link
self.file_name = None
elif vid is not None and link is None:
self.link, self.code = self.db.select_vid(vid)
self.code, self.audio_only = self.code.split('|')
else:
raise Exception('what is going on?')
if init_keyboard:
try:
self.formats = self.get_formats()
self.keyboard = self.generate_keyboard()
except Exception:
raise
def get_formats(self):
formats = {}
p = Popen(['youtube-dl', '-J', self.link], stdout=PIPE, stderr=PIPE).communicate()
if b'ERROR' in p[1]:
raise Exception('video URL not supported')
video_info = loads(str(p[0], 'utf-8'))
if video_info.get('_type', None) == 'playlist':
video_info = video_info.get('entries')[0]
if video_info.get('formats') is not None:
for vid in video_info.get('formats'):
self.add_format(formats, vid)
else:
self.add_format(formats, video_info)
logger.info('Formats: {}'.format(formats))
return formats
def add_format(self, formats, vid):
format_code = vid.get('format_id')
extension = vid.get('ext')
name = vid.get('format')
key = '{},{}'.format(extension, name)
code = '{}|{}'.format(format_code, 'audio only' in name)
index = self.db.insert_vid(self.link, code)
formats[key] = index
def generate_keyboard(self):
''' Generate a list of InlineKeyboardButton of resolutions '''
kb = []
for key in self.formats.keys():
cb = '{}'.format(self.formats[key])
kb.append([InlineKeyboardButton(key, callback_data=cb)])
return kb
def download(self):
logger.info('Downloading {}'.format(self.link))
cmd = ['youtube-dl', '-o', '/bot/out/%(title)s-%(id)s.%(ext)s']
if self.audio_only in [False, "False"]:
self.code = self.code + '+bestaudio'
cmd.extend(['-f', self.code, self.link])
logger.info(cmd)
p = Popen(cmd, stdout=PIPE, stderr=PIPE).communicate()
for line in str(p[0], 'utf-8').split('\n'):
logger.info(line)
if '[download] Destination:' in line:
self.file_name = line[24:] # name of the file
if '[ffmpeg] Merging formats into' in line:
self.file_name = line[31:-1] # name of the file
def check_dimension(self):
try:
if os.path.getsize(self.file_name) > 50 * 1024 * 1023:
Popen(['split', '-b', '49M', self.file_name, self.file_name])
os.remove(self.file_name)
return glob(escape(self.file_name) + '*')
except AttributeError as e:
logger.error(e)
return None
@contextmanager
def send(self):
files = self.check_dimension() # split if size >= 50MB
yield files
for f in files: # removing old files
os.remove(f)