-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
17 changed files
with
4,576 additions
and
0 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,200 @@ | ||
__author__ = "Ernesto" | ||
__email__ = "ernestondieki12@gmail.com" | ||
|
||
|
||
from os import rename | ||
from pytube import YouTube | ||
from threading import Thread | ||
from os.path import splitext, join | ||
from YT.ffmpeg import to_mp3, add_audio, safe_delete | ||
|
||
|
||
def cvt_bytes(num: float) -> str: | ||
""" format bytes to respective units for presentation (max GB) """ | ||
try: | ||
if num >= 1073741824: | ||
return f"{round(num / 1073741824, 2):,} GB" | ||
elif num >= 1048576: | ||
return f"{round(num / 1048576, 2):,} MB" | ||
elif num >= 1024: | ||
return f"{round(num / 1024, 2):,} KB" | ||
else: | ||
return f"{num} Bytes" | ||
except Exception: | ||
return "0 Bytes" | ||
|
||
|
||
class Base(): | ||
""" Pytube YouTube object, yt_obj """ | ||
|
||
def __init__(self, yt: YouTube, display=None, f_type=""): | ||
self.display = display | ||
self.yt_obj = yt | ||
self.yt_obj.register_on_complete_callback(self.on_complete) | ||
self.yt_obj.register_on_progress_callback(self.on_progress) | ||
|
||
self.stream = None | ||
self.file_path = "" | ||
self.display_text = "" | ||
self._type = f_type | ||
|
||
@property | ||
def thumbnail(self) -> str: | ||
""" thumbnail link """ | ||
return self.yt_obj.thumbnail_url | ||
|
||
def _download(self, file_dir, prefix=None): | ||
""" file_dir: folder to save download """ | ||
file_path = self.stream.download(output_path=file_dir, | ||
max_retries=10, | ||
filename_prefix=prefix, | ||
) | ||
return file_path | ||
|
||
def update_disp(self, display): | ||
""" update the label for displaying output """ | ||
self.display = display | ||
# update text right away | ||
self.update_text(self.display_text) | ||
|
||
def update_text(self, txt: str): | ||
""" update display text """ | ||
try: | ||
self.display_text = txt | ||
self.display.configure(text=self.display_text) | ||
except Exception: | ||
pass | ||
|
||
# obsolete since ThreadPoolExcutor was introduced | ||
def download(self, path, prefix=None): | ||
""" Download to `path` folder """ | ||
thread = Thread(target=self._download, | ||
args=(path, ), kwargs={"prefix": prefix}, | ||
daemon=True, | ||
) | ||
thread.start() | ||
|
||
def on_complete(self, stream, filepath): | ||
self.file_path = filepath | ||
|
||
def on_progress(self, stream, chunk, bytes_rem): | ||
# stream, chunk, bytes_remaining | ||
filesize = stream.filesize | ||
p_done = ((filesize - bytes_rem) / filesize) * 100 | ||
|
||
self.update_text(f"({self._type}) {self.filename[:20]}... {round(p_done, 2)}% of {cvt_bytes(filesize)}") | ||
|
||
|
||
class YTAudio(Base): | ||
""" | ||
Pytube audio stream of `itag` | ||
`disp` is of a tkinter Label inst | ||
""" | ||
|
||
def __init__(self, yt: YouTube, itag: int, disp=None, _type="Audio"): | ||
|
||
super().__init__(yt, display=disp, f_type=_type) | ||
self.stream = self.yt_obj.streams.get_by_itag(itag) | ||
self.filename = self.stream.default_filename | ||
self.done = False | ||
|
||
def _download(self, file_dir, prefix=None, preset=None): | ||
""" override _download method """ | ||
try: | ||
spd = file_dir.split("\\") | ||
spd = f"\\{spd[-2]}\\{spd[-1]}" if len(spd) > 1 else f"{spd}\\" | ||
# over-write if file exists | ||
self.update_text(f"Downloading to '...{spd}'") | ||
|
||
# download | ||
file_path = super()._download(file_dir, prefix=prefix) | ||
|
||
path, file_ext = splitext(file_path) | ||
|
||
if file_ext.lower() != ".mp3": | ||
self.update_text("Converting to mp3...") | ||
|
||
to_mp3(file_path, self.thumbnail, preset=preset) | ||
safe_delete(file_path) | ||
|
||
self.update_text(f"Saved to '...{spd}'") | ||
|
||
except Exception: | ||
self.update_text("Error saving the audio. Try again.") | ||
self.done = True | ||
|
||
def __repr__(self): | ||
return f"YTAudio, {self.stream.abr}" | ||
|
||
|
||
class YTVideo(Base): | ||
""" | ||
Pytube video stream of `itag` | ||
`disp` is of a tkinter Label inst | ||
""" | ||
|
||
def __init__(self, yt: YouTube, itag: int, disp=None, _type="Video"): | ||
super().__init__(yt, display=disp, f_type=_type) | ||
|
||
self.done = False | ||
|
||
get_audio = yt.streams.get_audio_only | ||
# get by itag | ||
self.stream = self.yt_obj.streams.get_by_itag(itag) | ||
self.filename = self.stream.default_filename | ||
self.audio = get_audio(subtype="webm") or get_audio() | ||
|
||
def _download(self, file_dir: str, prefix: str = None, preset=None): | ||
""" | ||
override _download method | ||
file_dir: folder to download to | ||
""" | ||
|
||
audio_path = None | ||
output = join(file_dir, f"{splitext(self.filename)[0]}.mp4") | ||
spd = file_dir.split("\\") | ||
spd = f"\\{spd[-2]}\\{spd[-1]}" if len(spd) > 1 else f"{spd}\\" | ||
# over-write if file exists | ||
# notify starting | ||
self.update_text(f"Downloading to '...{spd}'") | ||
try: | ||
|
||
if self.stream.is_adaptive: | ||
# download audio in a different thread | ||
self.thread_audio_download(self.audio.download, | ||
file_dir, | ||
filename_prefix="Audio_", | ||
max_retries=10) | ||
audio_path = join(file_dir, f"Audio_{self.audio.default_filename}") | ||
|
||
# download video | ||
self.full_path = super()._download(file_dir, prefix=prefix) | ||
|
||
if (audio_path and self.full_path): | ||
|
||
self.update_text("Combining video and audio...") | ||
add_audio(self.full_path, audio_path, output, remove_src=True, preset=preset) | ||
|
||
elif audio_path is None: | ||
# strip 'Video_' prefix | ||
rename(self.full_path, output) | ||
|
||
self.full_path = output | ||
self.update_text(f"Saved to '...{spd}'") | ||
|
||
except Exception: | ||
self.update_text("Error saving the video. Try again.") | ||
self.done = True | ||
|
||
def thread_audio_download(self, func, *args, **kwargs): | ||
""" download audio in separate thread """ | ||
thread = Thread(target=func, args=args, kwargs=kwargs, daemon=True) | ||
thread.start() | ||
|
||
def __repr__(self): | ||
return f"YTVideo, {self.stream.resolution}" | ||
|
||
|
||
def get_yt(link: str): | ||
""" return pytube YouTube instance """ | ||
return YouTube(link) |
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
__author__ = "Ernesto" | ||
__email__ = "ernestondieki12@gmail.com" | ||
|
||
# option placement matters | ||
# ffmpeg [global options] [input options] -i input [output options] output | ||
|
||
from subprocess import run, STARTUPINFO, STARTF_USESHOWWINDOW | ||
from os.path import join, abspath, dirname, exists, splitext | ||
from os import remove, mkdir | ||
import sys | ||
|
||
|
||
BASE_DIR = dirname(abspath(__file__)) | ||
|
||
|
||
def r_path(relpath): | ||
""" | ||
Get absolute path | ||
""" | ||
|
||
base_path = getattr(sys, "_MEIPASS", BASE_DIR) | ||
return join(base_path, relpath) | ||
|
||
|
||
YT_DIR = r_path("YT") | ||
YT_DIR = YT_DIR if exists(YT_DIR) else BASE_DIR # else is true when running Lazy_Selector as a file, not app | ||
|
||
DATA_DIR = join(YT_DIR, "data") | ||
|
||
FFMPEG_DIR = join(YT_DIR, "ffmpeg") | ||
|
||
# PATH = join(FFMPEG_DIR, "bin", "ffmpeg.exe") # uncomment this if you have ffmpeg.exe put in bin folder | ||
PATH = "ffmpeg" # if you have ffmpeg path set on environment variables | ||
|
||
FFMPEG_OUT_FILE = join(DATA_DIR, "ffmpeg_errors.txt") | ||
|
||
NO_WIN = STARTUPINFO() | ||
NO_WIN.dwFlags |= STARTF_USESHOWWINDOW | ||
|
||
|
||
def safe_delete(filename: str): | ||
""" skip exceptions that may occur when deleting """ | ||
try: | ||
remove(filename) | ||
except Exception: | ||
pass | ||
|
||
|
||
def ffmpeg_process(cmd: list): | ||
""" blocking function; run cmd using subprocess """ | ||
|
||
try: | ||
# over-write with the most recent errors | ||
with open(FFMPEG_OUT_FILE, "wb") as error_file: | ||
|
||
run(cmd, | ||
stderr=error_file, | ||
check=True, | ||
startupinfo=NO_WIN, | ||
) | ||
except FileNotFoundError: | ||
mkdir(DATA_DIR) | ||
ffmpeg_process(cmd) | ||
|
||
|
||
def to_mp3(filename, cover, fmt="mp3", preset=None): | ||
""" convert audio `filename` format to `fmt` """ | ||
name, ext = splitext(filename) | ||
|
||
if ext.strip(".") != fmt: | ||
output = f"{name}.{fmt}" | ||
# ffmpeg [global options] [input options] -i input [output options] output | ||
cmd = [ | ||
PATH, | ||
"-y", # overwrite output | ||
# "-cpu-used", "0", | ||
"-threads", "2", | ||
"-i", filename, | ||
"-i", cover, | ||
"-v", "error", | ||
"-map", "0:0", | ||
"-map", "1:0", | ||
"-id3v2_version", "3", | ||
"-metadata:s:v", "title='Album cover'", | ||
"-metadata:s:v", "comment='Cover (front)'", | ||
"-preset", preset or "medium", | ||
output, | ||
] | ||
|
||
ffmpeg_process(cmd) | ||
|
||
|
||
def add_audio(video, audio, output, remove_src=False, preset=None): | ||
""" add audio to video (not mixing) """ | ||
|
||
# ffmpeg [global options] [input options] -i input [output options] output | ||
cmd = [ | ||
PATH, | ||
"-y", # overwrite if output exists | ||
"-i", video, "-i", audio, | ||
"-v", "error", # log errors, we're interested in them | ||
"-map", "0:v:0", # grab video only (track 0) from index 0 (video input) | ||
"-map", "1:a:0", # grab track 0 from index 1 (audio input) | ||
"-c:v", "copy", # copy video data, no re-encoding | ||
"-c:a", "aac", # use aac codec for audio | ||
"-preset", preset or "medium", | ||
output, | ||
] | ||
|
||
ffmpeg_process(cmd) | ||
|
||
if remove_src: | ||
safe_delete(audio) | ||
safe_delete(video) | ||
|
||
|
||
def reduce_vid_size(filename): | ||
""" use crf, slower preset for small file size mp4 vids """ | ||
name, ext = splitext(filename) | ||
|
||
# ffmpeg [global options] [input options] -i input [output options] output | ||
cmd = [ | ||
PATH, | ||
"-i", filename, | ||
"-vcodec", "libx265", | ||
"-crf", "28", | ||
"-preset", "slower", | ||
f"{name}_c{ext}" | ||
] | ||
|
||
ffmpeg_process(cmd) | ||
|
||
|
||
def ffmpeg_video_download(video, audio, output): | ||
""" download youtube vids using stream links """ | ||
# ffmpeg -i "YOUR URL TO DOWNLOAD VIDEO FROM" -c:v libx264 -preset slow -crf 22 "saveas.mp4" | ||
# hw | ||
# ffmpeg -i "URL" -preset medium -c:v hevc_nvenc -rc constqp -qp 31 -c:a aac -b:a 64k -ac 1 “name_output.mp4” |
Oops, something went wrong.