diff --git a/nndownload/__init__.py b/nndownload/__init__.py
index 43f7d8c..36ed2e9 100644
--- a/nndownload/__init__.py
+++ b/nndownload/__init__.py
@@ -1,7 +1,11 @@
+"""Module entry for nndownload."""
+
from . import nndownload
def execute(*args):
+ """Pass arguments to be executed by nndownload."""
+
args_list = [e.strip() for e in args]
- nndownload._cmdl_opts = nndownload.cmdl_parser.parse_args(args_list)
+ nndownload._CMDL_OPTS = nndownload.cmdl_parser.parse_args(args_list)
nndownload.main()
diff --git a/nndownload/__main__.py b/nndownload/__main__.py
index 884acd5..3deb2f7 100644
--- a/nndownload/__main__.py
+++ b/nndownload/__main__.py
@@ -1,3 +1,6 @@
+"""CLI entry for nndownload."""
+
from .nndownload import cli
+
cli()
diff --git a/nndownload/ffmpeg_dl.py b/nndownload/ffmpeg_dl.py
index 206ea1d..a5821fc 100644
--- a/nndownload/ffmpeg_dl.py
+++ b/nndownload/ffmpeg_dl.py
@@ -1,3 +1,5 @@
+"""ffmpeg subprocess for merging DMS streams to output."""
+
import re
import subprocess
import warnings
@@ -13,18 +15,17 @@
class FfmpegDLException(Exception):
"""Raised when a download fails."""
- pass
class FfmpegDL:
"""Send input streams for download to an `ffmpeg` subprocess."""
- FF_GLOBAL_ARGS = [
+ FF_GLOBAL_ARGS = (
"-progress",
"-",
"-nostats",
"-y"
- ]
+ )
REGEX_TIME_GROUP = "([0-9]{2}:[0-9]{2}:[0-9]{2}[.[0-9]*]?)"
REGEX_OUT_TIME = re.compile(
@@ -33,20 +34,26 @@ class FfmpegDL:
@classmethod
def get_timedelta(cls, time_str: AnyStr, str_format: AnyStr = "%H:%M:%S.%f"):
+ """Return a timedelta for a given time string"""
+
t = datetime.strptime(time_str, str_format)
return timedelta(hours=t.hour, minutes=t.minute, seconds=t.second, microseconds=t.microsecond)
def __init__(self, streams: List, input_kwargs: List, output_path: AnyStr, output_kwargs: List, global_args: List = FF_GLOBAL_ARGS):
+ """Initialize a downloader to perform an ffmpeg conversion task."""
+
inputs = []
for stream in streams:
- input = ffmpeg.input(stream, **input_kwargs)
- inputs.append(input)
+ stream_input = ffmpeg.input(stream, **input_kwargs)
+ inputs.append(stream_input)
stream_spec = ffmpeg.output(*inputs, output_path, **output_kwargs).global_args(*global_args)
self.proc_args = ffmpeg._run.compile(stream_spec=stream_spec)
self.proc: subprocess.Popen = None
def load_subprocess(self):
+ """Open an ffmpeg subprocess."""
+
self.proc = subprocess.Popen(
args=self.proc_args,
stdin=subprocess.PIPE,
@@ -56,11 +63,14 @@ def load_subprocess(self):
)
def convert(self, name: AnyStr, duration: float):
+ """Perform an ffmpeg conversion while printing progress using tqdm."""
+
progress = tqdm_rich(desc=name, unit="sec", colour="green", total=duration)
self.load_subprocess()
stdout_line = None
+ prev_line = None
while True:
if self.proc.stdout is None:
continue
diff --git a/nndownload/hls_dl.py b/nndownload/hls_dl.py
index 112e60d..da87158 100644
--- a/nndownload/hls_dl.py
+++ b/nndownload/hls_dl.py
@@ -1,3 +1,5 @@
+"""Native HLS downloader for DMS streams."""
+
import re
from concurrent.futures import ThreadPoolExecutor
@@ -11,7 +13,6 @@
def download_hls(m3u8_url, filename, name, session, progress, threads):
"""Perform a native HLS download of a provided M3U8 manifest."""
- # TODO: Support proxies (#150)
from .nndownload import FormatNotAvailableException
diff --git a/nndownload/nndownload.py b/nndownload/nndownload.py
index 776eceb..ad7f649 100644
--- a/nndownload/nndownload.py
+++ b/nndownload/nndownload.py
@@ -1,6 +1,5 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
"""Download videos and process other links from Niconico (nicovideo.jp)."""
+
import argparse
import asyncio
import collections
@@ -169,9 +168,9 @@
logger = logging.getLogger(__name__)
-cmdl_usage = "%(prog)s [options] input"
-cmdl_version = __version__
-cmdl_parser = argparse.ArgumentParser(usage=cmdl_usage, conflict_handler="resolve")
+CMDL_USAGE = "%(prog)s [options] input"
+CMDL_VERSION = __version__
+cmdl_parser = argparse.ArgumentParser(usage=CMDL_USAGE, conflict_handler="resolve")
cmdl_parser.add_argument("-u", "--username", dest="username", metavar="EMAIL/TEL",
help="account email address or telephone number")
@@ -180,7 +179,7 @@
cmdl_parser.add_argument("-n", "--netrc", action="store_true", dest="netrc", help="use .netrc authentication")
cmdl_parser.add_argument("-q", "--quiet", action="store_true", dest="quiet", help="suppress output to console")
cmdl_parser.add_argument("-l", "--log", action="store_true", dest="log", help="log output to file")
-cmdl_parser.add_argument("-v", "--version", action="version", version=cmdl_version)
+cmdl_parser.add_argument("-v", "--version", action="version", version=CMDL_VERSION)
cmdl_parser.add_argument("input", action="store", nargs="*", help="URLs or files")
dl_group = cmdl_parser.add_argument_group("download options")
@@ -217,41 +216,30 @@
# Globals
-_start_time = _progress = 0
-_cmdl_opts = None
+_START_TIME = _PROGRESS = 0
+_CMDL_OPTS = None
class AuthenticationException(Exception):
"""Raised when logging in to Niconico failed."""
- pass
-
class ArgumentException(Exception):
"""Raised when reading the argument failed."""
- pass
-
class FormatNotSupportedException(Exception):
"""Raised when the response format is not supported."""
- pass
-
class FormatNotAvailableException(Exception):
"""Raised when the requested format is not available."""
- pass
-
class ParameterExtractionException(Exception):
"""Raised when parameters could not be successfully extracted."""
- pass
class ExistingDownloadEncounteredQuit(Exception):
"""Raised when an existing and complete download is encountered."""
- pass
class ListQualitiesQuit(Exception):
"""Raised when listing available qualities for a video."""
- pass
## Utility methods
@@ -259,7 +247,7 @@ class ListQualitiesQuit(Exception):
def configure_logger():
"""Initialize logger."""
- if _cmdl_opts.log:
+ if _CMDL_OPTS.log:
logger.setLevel(logging.INFO)
log_handler = logging.FileHandler(f"[{MODULE_NAME}] {time.strftime('%Y-%m-%d')}.log", encoding="utf-8")
formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s")
@@ -270,7 +258,7 @@ def configure_logger():
def log_exception(error: Exception):
"""Process exception for logger."""
- if _cmdl_opts.log:
+ if _CMDL_OPTS.log:
sys.stdout.write("{0}: {1}\n".format(type(error).__name__, str(error)))
sys.stdout.flush()
logger.exception("An exception was encountered:\n")
@@ -278,15 +266,15 @@ def log_exception(error: Exception):
output("{0}: {1}\n".format(type(error).__name__, str(error)), logging.ERROR, force=True)
-def output(string: AnyStr, level=logging.INFO, force: bool = False):
+def output(out_str: AnyStr, level=logging.INFO, force: bool = False):
"""Print status to console unless quiet flag is set."""
- global _cmdl_opts
- if _cmdl_opts.log:
- logger.log(level, string.strip("\n"))
+ global _CMDL_OPTS
+ if _CMDL_OPTS.log:
+ logger.log(level, out_str.strip("\n"))
- if not _cmdl_opts.quiet or force:
- sys.stdout.write(string)
+ if not _CMDL_OPTS.quiet or force:
+ sys.stdout.write(out_str)
sys.stdout.flush()
@@ -306,8 +294,8 @@ def format_value(value: int, custom_type: str = "B", use_bits: bool = False):
converted = float(value / base ** exponent)
return "{0:.2f}{1}{2}".format(converted, suffix, custom_type) if not use_bits else "{0}{1}{2}".format(converted, suffix, custom_type)
- except IndexError:
- raise IndexError("Could not format number of bytes")
+ except IndexError as exception:
+ raise IndexError("Could not format number of bytes") from exception
def calculate_speed(start, now, prog_bytes):
@@ -335,7 +323,7 @@ def sanitize_for_path(value: AnyStr, replace: AnyStr = ' '):
def create_filename(template_params: dict, is_comic: bool = False):
"""Create filename from document parameters."""
- filename_template = _cmdl_opts.output_path
+ filename_template = _CMDL_OPTS.output_path
if filename_template:
template_dict = dict(template_params)
@@ -436,7 +424,7 @@ def generic_dl_request(session: requests.Session, uri: AnyStr, filename: AnyStr,
def rewrite_file(filename: AnyStr, old_str: AnyStr, new_str: AnyStr):
"""Replace a string in a text file."""
- with open(filename, "r+") as file:
+ with open(filename, "r+", encoding="utf-8") as file:
raw = file.read()
new = raw.replace(old_str, new_str)
file.seek(0)
@@ -562,7 +550,6 @@ async def open_nama_websocket(
finally:
heartbeat.cancel()
- return
def reserve_timeshift(session: requests.Session, nama_id: AnyStr) -> AnyStr:
@@ -731,7 +718,7 @@ def download_manga_chapter(session, chapter_id):
template_params = collect_seiga_manga_parameters(session, chapter_document, template_params)
chapter_directory = create_filename(template_params, is_comic=True)
- if not _cmdl_opts.skip_media:
+ if not _CMDL_OPTS.skip_media:
output("Downloading {0} to \"{1}\"...\n".format(chapter_id, chapter_directory), logging.INFO)
images = chapter_document.select("img.lazyload")
@@ -761,13 +748,13 @@ def download_manga_chapter(session, chapter_id):
output("\n", logging.DEBUG)
output("Finished downloading {0} to \"{1}\".\n".format(chapter_id, chapter_directory), logging.INFO)
- if _cmdl_opts.dump_metadata:
+ if _CMDL_OPTS.dump_metadata:
metadata_path = os.path.join(chapter_directory, "metadata.json")
dump_metadata(metadata_path, template_params)
- if _cmdl_opts.download_thumbnail:
+ if _CMDL_OPTS.download_thumbnail:
thumb_filename = os.path.join(chapter_directory, "folder")
download_thumbnail(session, thumb_filename, template_params)
- if _cmdl_opts.download_comments:
+ if _CMDL_OPTS.download_comments:
output("Downloading comments for Seiga comics is not currently supported.\n", logging.WARNING)
@@ -799,7 +786,7 @@ def download_image(session, image_id):
filename = create_filename(template_params)
- if not _cmdl_opts.skip_media:
+ if not _CMDL_OPTS.skip_media:
output("Downloading {0} to \"{1}\"...\n".format(image_id, filename), logging.INFO)
source_image_request = session.get(template_params["url"], stream=True)
@@ -811,11 +798,11 @@ def download_image(session, image_id):
output("Finished donwloading {0} to \"{1}\".\n".format(image_id, filename), logging.INFO)
- if _cmdl_opts.dump_metadata:
+ if _CMDL_OPTS.dump_metadata:
dump_metadata(filename, template_params)
- if _cmdl_opts.download_thumbnail:
+ if _CMDL_OPTS.download_thumbnail:
download_thumbnail(session, filename, template_params, set_thumb_extension=True)
- if _cmdl_opts.download_comments:
+ if _CMDL_OPTS.download_comments:
output("Downloading comments for Seiga images is not currently supported.\n", logging.WARNING)
@@ -848,8 +835,8 @@ def request_seiga_user(session, user_id):
if total_ids == 0:
raise ParameterExtractionException("Failed to collect user images. Please verify that the user's images page is public")
- if _cmdl_opts.playlist_start:
- start_index = _cmdl_opts.playlist_start
+ if _CMDL_OPTS.playlist_start:
+ start_index = _CMDL_OPTS.playlist_start
if start_index >= len(illust_ids):
raise ArgumentException("Starting index exceeds length of the user's available images")
else:
@@ -896,8 +883,8 @@ def request_seiga_user_manga(session, user_id):
if total_ids == 0:
raise ParameterExtractionException("Failed to collect user images. Please verify that the user's manga page is public")
- if _cmdl_opts.playlist_start:
- start_index = _cmdl_opts.playlist_start
+ if _CMDL_OPTS.playlist_start:
+ start_index = _CMDL_OPTS.playlist_start
if start_index >= len(manga_ids):
raise ArgumentException("Starting index exceeds length of the user's available manga")
else:
@@ -949,7 +936,7 @@ def download_channel_article(session: requests.Session, article_id: AnyStr):
filename = create_filename(template_params)
- if not _cmdl_opts.skip_media:
+ if not _CMDL_OPTS.skip_media:
output("Downloading {0} to \"{1}\"...\n".format(article_id, filename), logging.INFO)
with open(filename, "w", encoding="utf-8") as article_file:
@@ -958,9 +945,9 @@ def download_channel_article(session: requests.Session, article_id: AnyStr):
"", "\n").replace("
", "\n### ").replace("
", "\n").replace("", "").replace(
"", "- ").replace("", "\n").strip()
article_file.write(pretty_article_text)
- if _cmdl_opts.dump_metadata:
+ if _CMDL_OPTS.dump_metadata:
dump_metadata(filename, template_params)
- if _cmdl_opts.download_comments:
+ if _CMDL_OPTS.download_comments:
output("Downloading article comments is not currently supported.\n", logging.WARNING)
output("Finished downloading {0} to \"{1}\".\n".format(article_id, filename), logging.INFO)
@@ -994,8 +981,8 @@ def request_channel(session: requests.Session, channel_slug: AnyStr):
raise ParameterExtractionException("Failed to collect channel videos. Please verify that the channel's videos page is public")
output("{} videos returned.\n".format(total_ids), logging.INFO)
- if _cmdl_opts.playlist_start:
- start_index = _cmdl_opts.playlist_start
+ if _CMDL_OPTS.playlist_start:
+ start_index = _CMDL_OPTS.playlist_start
if start_index >= len(video_ids):
raise ArgumentException("Starting index exceeds length of the channel's video playlist")
else:
@@ -1057,12 +1044,12 @@ def request_video(session: requests.Session, video_id: AnyStr):
raise FormatNotAvailableException("Could not retrieve video info from thumbnail API")
concat_cookies = {}
- if _cmdl_opts.download_english:
+ if _CMDL_OPTS.download_english:
concat_cookies = {**concat_cookies, **EN_COOKIE}
- elif _cmdl_opts.download_chinese:
+ elif _CMDL_OPTS.download_chinese:
concat_cookies = {**concat_cookies, **TW_COOKIE}
- if _cmdl_opts.download_english and _cmdl_opts.download_chinese:
+ if _CMDL_OPTS.download_english and _CMDL_OPTS.download_chinese:
output("Multiple language flags were specified. --english will be used as the fallback.\n", logging.INFO)
video_request = session.get(VIDEO_URL.format(video_id), cookies=concat_cookies)
@@ -1073,17 +1060,17 @@ def request_video(session: requests.Session, video_id: AnyStr):
filename = create_filename(template_params)
- if not _cmdl_opts.skip_media:
+ if not _CMDL_OPTS.skip_media:
continue_code = download_video_media(session, filename, template_params)
- if _cmdl_opts.break_on_existing and not continue_code:
+ if _CMDL_OPTS.break_on_existing and not continue_code:
raise ExistingDownloadEncounteredQuit("Exiting as an existing video was encountered")
- if _cmdl_opts.add_metadata:
+ if _CMDL_OPTS.add_metadata:
add_metadata_to_container(filename, template_params)
- if _cmdl_opts.dump_metadata:
+ if _CMDL_OPTS.dump_metadata:
dump_metadata(filename, template_params)
- if _cmdl_opts.download_thumbnail:
+ if _CMDL_OPTS.download_thumbnail:
download_thumbnail(session, filename, template_params)
- if _cmdl_opts.download_comments:
+ if _CMDL_OPTS.download_comments:
download_comments(session, filename, template_params)
@@ -1113,8 +1100,8 @@ def request_user(session: requests.Session, user_id: AnyStr):
for video in user_videos_json["data"]["items"]:
video_ids.append(video["id"])
- if _cmdl_opts.playlist_start:
- start_index = _cmdl_opts.playlist_start
+ if _CMDL_OPTS.playlist_start:
+ start_index = _CMDL_OPTS.playlist_start
if start_index >= len(video_ids):
raise ArgumentException("Starting index exceeds length of the user's video playlist")
else:
@@ -1141,8 +1128,8 @@ def request_mylist(session: requests.Session, mylist_id: AnyStr):
mylist_json = json.loads(mylist_request.text)
items = mylist_json["data"]["mylist"]["items"]
- if _cmdl_opts.playlist_start:
- start_index = _cmdl_opts.playlist_start
+ if _CMDL_OPTS.playlist_start:
+ start_index = _CMDL_OPTS.playlist_start
if start_index >= len(items):
raise ArgumentException("Starting index exceeds length of the mylist")
else:
@@ -1210,14 +1197,14 @@ def request_series(session: requests.Session, series_id: AnyStr):
def show_multithread_progress(video_len):
"""Track overall download progress across threads."""
- global _progress, _start_time
+ global _PROGRESS, _START_TIME
finished = False
while not finished:
- if _progress >= video_len:
+ if _PROGRESS >= video_len:
finished = True
- done = int(25 * _progress / video_len)
- percent = int(100 * _progress / video_len)
- speed_str = calculate_speed(_start_time, time.time(), _progress)
+ done = int(25 * _PROGRESS / video_len)
+ percent = int(100 * _PROGRESS / video_len)
+ speed_str = calculate_speed(_START_TIME, time.time(), _PROGRESS)
output("\r|{0}{1}| {2}/100 @ {3:9}/s".format("#" * done, " " * (25 - done), percent, speed_str), logging.DEBUG)
@@ -1227,8 +1214,8 @@ def update_multithread_progress(bytes_len):
lock = threading.Lock()
lock.acquire()
try:
- global _progress
- _progress += bytes_len
+ global _PROGRESS
+ _PROGRESS += bytes_len
finally:
lock.release()
@@ -1270,9 +1257,9 @@ def perform_ffmpeg_dl(video_id: AnyStr, filename: AnyStr, duration: float, strea
video_download.convert(name=video_id, duration=duration)
return True
except FfmpegDLException as error:
- raise FormatNotAvailableException(f"ffmpeg failed to download the video or audio stream with the following error: \"{error}\"")
- except Exception:
- raise FormatNotAvailableException("Failed to download video or audio stream")
+ raise FormatNotAvailableException(f"ffmpeg failed to download the video or audio stream with the following error: \"{error}\"") from error
+ except Exception as exception:
+ raise FormatNotAvailableException("Failed to download video or audio stream") from exception
def perform_native_hls_dl(session: requests.Session, filename: AnyStr, duration: float, m3u8_streams: List, threads: int = 1):
@@ -1294,6 +1281,9 @@ def perform_native_hls_dl(session: requests.Session, filename: AnyStr, duration:
for task in tasks:
task["thread"].join()
+ if not tasks:
+ raise ArgumentException("No HLS download tasks were received")
+
# Video and audio
if len(tasks) > 1:
stream_filenames = [task["filename"] for task in tasks]
@@ -1308,15 +1298,15 @@ def perform_native_hls_dl(session: requests.Session, filename: AnyStr, duration:
})
video_convert.convert(name='Merging audio and video', duration=duration)
except FfmpegDLException as error:
- raise FormatNotAvailableException(f"ffmpeg failed to download the video or audio stream with the following error: \"{error}\"")
- except Exception:
- raise FormatNotAvailableException("Failed to download video or audio stream")
+ raise FormatNotAvailableException(f"ffmpeg failed to download the video or audio stream with the following error: \"{error}\"") from error
+ except Exception as exception:
+ raise FormatNotAvailableException("Failed to download video or audio stream") from exception
for stream_filename in stream_filenames:
os.remove(stream_filename)
# Only audio or video
else:
- shutil.move(task[0]["filename"], filename)
+ shutil.move(tasks[0]["filename"], filename)
return True
@@ -1344,7 +1334,7 @@ def download_video_media(session: requests.Session, filename: AnyStr, template_p
for stream_type, name in [("dms_video_uri", "video"), ("dms_audio_uri", "audio")]:
if template_params.get(stream_type):
m3u8_streams.append((template_params[stream_type], name))
- continue_code = perform_native_hls_dl(session, filename, float(template_params["duration"]), m3u8_streams, _cmdl_opts.threads)
+ continue_code = perform_native_hls_dl(session, filename, float(template_params["duration"]), m3u8_streams, _CMDL_OPTS.threads)
os.rename(filename, complete_filename)
return continue_code
@@ -1353,16 +1343,16 @@ def download_video_media(session: requests.Session, filename: AnyStr, template_p
dl_stream.raise_for_status()
video_len = int(dl_stream.headers["content-length"])
- if _cmdl_opts.threads:
+ if _CMDL_OPTS.threads:
output("Multithreading is experimental and will overwrite any existing files. --break-on-existing will be ignored.\n", logging.WARNING)
- threads = int(_cmdl_opts.threads)
+ threads = int(_CMDL_OPTS.threads)
if threads <= 0:
raise ArgumentException("Thread number must be a positive integer")
# Track total bytes downloaded across threads
- global _progress
- _progress = 0
+ global _PROGRESS
+ _PROGRESS = 0
# Pad out file to full length
file = open(filename, "wb")
@@ -1372,8 +1362,8 @@ def download_video_media(session: requests.Session, filename: AnyStr, template_p
# Calculate ranges for threads and dispatch
part = math.ceil(video_len / threads)
- global _start_time
- _start_time = time.time()
+ global _START_TIME
+ _START_TIME = time.time()
for i in range(threads):
start = part * i
@@ -1415,11 +1405,11 @@ def download_video_media(session: requests.Session, filename: AnyStr, template_p
"Current byte position exceeds the length of the video to be downloaded. Check the integrity of the existing file and "
"use --force-high-quality to resume this download when the high quality source is available.\n"
)
- except MP4StreamInfoError: # Thrown if not a valid MP4 (FLV, SWF)
+ except MP4StreamInfoError as error: # Thrown if not a valid MP4 (FLV, SWF)
raise FormatNotAvailableException(
"Current byte position exceeds the length of the video to be downloaded. Check the integrity of the existing file and use "
"--force-high-quality to resume this download when the high quality source is available.\n"
- )
+ ) from error
# current_byte_pos == video_len
else:
@@ -1463,13 +1453,13 @@ def download_video_media(session: requests.Session, filename: AnyStr, template_p
with open(filename, file_condition) as file:
file.seek(dl)
- _start_time = time.time()
+ _START_TIME = time.time()
for block in stream_iterator:
dl += len(block)
file.write(block)
done = int(25 * dl / video_len)
percent = int(100 * dl / video_len)
- speed_str = calculate_speed(_start_time, time.time(), dl)
+ speed_str = calculate_speed(_START_TIME, time.time(), dl)
output("\r|{0}{1}| {2}/100 @ {3:9}/s".format("#" * done, " " * (25 - done), percent, speed_str), logging.DEBUG)
output("\n", logging.DEBUG)
@@ -1516,7 +1506,7 @@ def list_qualities(sources_type: str, sources: list, is_dms: bool):
def select_quality(template_params: dict, template_key: AnyStr, sources: list, quality="") -> List[AnyStr]:
"""Select the specified quality from a sources list on DMC and DMS videos."""
- if quality and _cmdl_opts.force_high_quality:
+ if quality and _CMDL_OPTS.force_high_quality:
output("-f/--force-high-quality was set. Ignoring specified quality...\n", logging.WARNING)
# Assumes qualities are in descending order
@@ -1526,9 +1516,9 @@ def select_quality(template_params: dict, template_key: AnyStr, sources: list, q
lq_available = lowest_quality["isAvailable"]
# quality = "highest"
- if not hq_available and (_cmdl_opts.force_high_quality or (quality and quality.lower() == "highest")):
+ if not hq_available and (_CMDL_OPTS.force_high_quality or (quality and quality.lower() == "highest")):
raise FormatNotAvailableException("Highest quality is not currently available")
- elif _cmdl_opts.force_high_quality or (quality and quality.lower() == "highest"):
+ elif _CMDL_OPTS.force_high_quality or (quality and quality.lower() == "highest"):
template_params[template_key] = highest_quality["id"]
return [template_params[template_key]]
@@ -1571,17 +1561,17 @@ def perform_api_request(session: requests.Session, document: BeautifulSoup) -> d
template_params = collect_video_parameters(session, template_params, params)
- if (_cmdl_opts.no_audio and _cmdl_opts.no_video):
+ if (_CMDL_OPTS.no_audio and _CMDL_OPTS.no_video):
output("--no-audio and --no-video were both specified. Treating this download as if --skip-media was set.\n", logging.WARNING)
- _cmdl_opts.skip_media = True
- if _cmdl_opts.skip_media and not _cmdl_opts.list_qualities:
+ _CMDL_OPTS.skip_media = True
+ if _CMDL_OPTS.skip_media and not _CMDL_OPTS.list_qualities:
return template_params
# Perform request to Dwango Media Service (DMS)
# Began rollout starting 2023-11-01 for select videos and users (https://blog.nicovideo.jp/niconews/205042.html)
# Videos longer than 30 minutes in HD (>720p) quality appear to be served this way exclusively
elif params["media"]["domand"]:
- if _cmdl_opts.list_qualities:
+ if _CMDL_OPTS.list_qualities:
list_qualities("video", params["media"]["domand"]["videos"], True)
list_qualities("audio", params["media"]["domand"]["audios"], True)
raise ListQualitiesQuit("Exiting after listing available qualities")
@@ -1594,13 +1584,13 @@ def perform_api_request(session: requests.Session, document: BeautifulSoup) -> d
template_params,
"video_quality",
params["media"]["domand"]["videos"],
- _cmdl_opts.video_quality
+ _CMDL_OPTS.video_quality
)
audio_sources = select_quality(
template_params,
"audio_quality",
params["media"]["domand"]["audios"],
- _cmdl_opts.audio_quality
+ _CMDL_OPTS.audio_quality
)
# Limited to one video and audio source
@@ -1623,9 +1613,9 @@ def perform_api_request(session: requests.Session, document: BeautifulSoup) -> d
output("Retrieved video manifest.\n", logging.INFO)
output("Collecting video media URIs...\n")
- if not _cmdl_opts.no_video:
+ if not _CMDL_OPTS.no_video:
template_params["dms_video_uri"] = get_stream_from_manifest(manifest_text)
- if not _cmdl_opts.no_audio:
+ if not _CMDL_OPTS.no_audio:
template_params["dms_audio_uri"] = get_media_from_manifest(manifest_text, "audio")
# Modify container when only one stream is specified
@@ -1638,7 +1628,7 @@ def perform_api_request(session: requests.Session, document: BeautifulSoup) -> d
# Perform request to Dwango Media Cluster (DMC)
elif params["media"]["delivery"]:
- if _cmdl_opts.list_qualities:
+ if _CMDL_OPTS.list_qualities:
list_qualities("video", params["media"]["delivery"]["movie"]["videos"], False)
list_qualities("audio", params["media"]["delivery"]["movie"]["audios"], False)
raise ListQualitiesQuit("Exiting after listing available qualities")
@@ -1655,13 +1645,13 @@ def perform_api_request(session: requests.Session, document: BeautifulSoup) -> d
template_params,
"video_quality",
params["media"]["delivery"]["movie"]["videos"],
- _cmdl_opts.video_quality
+ _CMDL_OPTS.video_quality
)
audio_sources = select_quality(
template_params,
"audio_quality",
params["media"]["delivery"]["movie"]["audios"],
- _cmdl_opts.audio_quality
+ _CMDL_OPTS.audio_quality
)
heartbeat_lifetime = params["media"]["delivery"]["movie"]["session"]["heartbeatLifetime"]
@@ -1939,14 +1929,14 @@ def login(username: str, password: str, session_cookie: str) -> requests.Session
session.headers.update({"User-Agent": f"{MODULE_NAME}/{__version__}"})
- if _cmdl_opts.proxy:
+ if _CMDL_OPTS.proxy:
proxies = {
- "http": _cmdl_opts.proxy,
- "https": _cmdl_opts.proxy
+ "http": _CMDL_OPTS.proxy,
+ "https": _CMDL_OPTS.proxy
}
session.proxies.update(proxies)
- if not _cmdl_opts.no_login:
+ if not _CMDL_OPTS.no_login:
if not session_cookie:
output("Logging in...\n", logging.INFO)
@@ -1958,7 +1948,7 @@ def login(username: str, password: str, session_cookie: str) -> requests.Session
login_request = session.post(LOGIN_URL, data=login_post)
login_request.raise_for_status()
- if ("message=cant_login" in login_request.url):
+ if "message=cant_login" in login_request.url:
raise AuthenticationException("Incorrect email/telephone or password. Please verify your login details")
otp_code_request = session.get(login_request.url)
@@ -2075,15 +2065,17 @@ def process_url_mo(session, url_mo: Match):
def main():
+ """Main entry"""
+
try:
configure_logger()
- account_username = _cmdl_opts.username
- account_password = _cmdl_opts.password
- session_cookie = _cmdl_opts.session_cookie
+ account_username = _CMDL_OPTS.username
+ account_password = _CMDL_OPTS.password
+ session_cookie = _CMDL_OPTS.session_cookie
- if _cmdl_opts.netrc:
- if _cmdl_opts.username or _cmdl_opts.password or _cmdl_opts.session_cookie:
+ if _CMDL_OPTS.netrc:
+ if _CMDL_OPTS.username or _CMDL_OPTS.password or _CMDL_OPTS.session_cookie:
output("Ignoring input credentials in favor of .netrc.\n", logging.WARNING)
account_credentials = netrc.netrc().authenticators(HOST)
@@ -2092,7 +2084,7 @@ def main():
account_password = account_credentials[2]
else:
raise netrc.NetrcParseError("No authenticator available for {0}".format(HOST))
- elif not _cmdl_opts.no_login:
+ elif not _CMDL_OPTS.no_login:
while not account_username and not account_password and not session_cookie:
account_username = input("Email/telephone: ")
if account_username and not account_password:
@@ -2106,7 +2098,7 @@ def main():
session = login(account_username, account_password, session_cookie)
- for arg_item in _cmdl_opts.input:
+ for arg_item in _CMDL_OPTS.input:
try:
# Test if input is a valid URL or file
url_mo = VALID_URL_RE.match(arg_item)
@@ -2121,7 +2113,7 @@ def main():
process_url_mo(session, url_mo)
except Exception as error:
- if len(_cmdl_opts.input) == 1:
+ if len(_CMDL_OPTS.input) == 1:
raise
else:
log_exception(error)
@@ -2135,10 +2127,12 @@ def main():
def cli():
- global _cmdl_opts
+ """CLI entry"""
+
+ global _CMDL_OPTS
try:
- _cmdl_opts = cmdl_parser.parse_args()
+ _CMDL_OPTS = cmdl_parser.parse_args()
main()
except KeyboardInterrupt:
output("Keyboard interrupt received. Exiting...\n", logging.INFO)
diff --git a/pylintrc b/pylintrc
new file mode 100644
index 0000000..24f828a
--- /dev/null
+++ b/pylintrc
@@ -0,0 +1,2 @@
+[MESSAGES CONTROL]
+disable=C0209, C0301, W0602, W0603, W1514
\ No newline at end of file