Use library for progress

This commit is contained in:
ByteDream 2022-12-22 14:45:56 +01:00
parent 86759557fe
commit 17fa045c32
9 changed files with 307 additions and 295 deletions

View file

@ -1,5 +1,5 @@
use crate::cli::log::tab_info;
use crate::cli::utils::{download_segments, FFmpegPreset, find_resolution};
use crate::cli::utils::{download_segments, find_resolution, FFmpegPreset};
use crate::utils::context::Context;
use crate::utils::format::{format_string, Format};
use crate::utils::log::progress;
@ -137,7 +137,9 @@ impl Execute for Archive {
bail!("File extension is not '.mkv'. Currently only matroska / '.mkv' files are supported")
}
let _ = FFmpegPreset::ffmpeg_presets(self.ffmpeg_preset.clone())?;
if self.ffmpeg_preset.len() == 1 && self.ffmpeg_preset.get(0).unwrap() == &FFmpegPreset::Nvidia {
if self.ffmpeg_preset.len() == 1
&& self.ffmpeg_preset.get(0).unwrap() == &FFmpegPreset::Nvidia
{
warn!("Skipping 'nvidia' hardware acceleration preset since no other codec preset was specified")
}
@ -148,20 +150,20 @@ impl Execute for Archive {
let mut parsed_urls = vec![];
for (i, url) in self.urls.iter().enumerate() {
let _progress_handler = progress!("Parsing url {}", i + 1);
let progress_handler = progress!("Parsing url {}", i + 1);
match parse_url(&ctx.crunchy, url.clone(), true).await {
Ok((media_collection, url_filter)) => {
parsed_urls.push((media_collection, url_filter));
info!("Parsed url {}", i + 1)
progress_handler.stop(format!("Parsed url {}", i + 1))
}
Err(e) => bail!("url {} could not be parsed: {}", url, e),
}
}
for (i, (media_collection, url_filter)) in parsed_urls.into_iter().enumerate() {
let progress_handler = progress!("Fetching series details");
let archive_formats = match media_collection {
MediaCollection::Series(series) => {
let _progress_handler = progress!("Fetching series details");
formats_from_series(&self, series, &url_filter).await?
}
MediaCollection::Season(_) => bail!("Archiving a season is not supported"),
@ -171,10 +173,13 @@ impl Execute for Archive {
};
if archive_formats.is_empty() {
info!("Skipping url {} (no matching episodes found)", i + 1);
progress_handler.stop(format!(
"Skipping url {} (no matching episodes found)",
i + 1
));
continue;
}
info!("Loaded series information for url {}", i + 1);
progress_handler.stop(format!("Loaded series information for url {}", i + 1));
if log::max_level() == log::Level::Debug {
let seasons = sort_formats_after_seasons(
@ -310,9 +315,9 @@ impl Execute for Archive {
))
}
let _progess_handler = progress!("Generating mkv");
let progess_handler = progress!("Generating mkv");
generate_mkv(&self, path, video_paths, audio_paths, subtitle_paths)?;
info!("Mkv generated")
progess_handler.stop("Mkv generated")
}
}
@ -349,7 +354,10 @@ async fn formats_from_series(
// remove all seasons with the wrong audio for the current iterated season number
seasons.retain(|s| {
s.metadata.season_number != season.first().unwrap().metadata.season_number
|| archive.locale.iter().any(|l| s.metadata.audio_locales.contains(l))
|| archive
.locale
.iter()
.any(|l| s.metadata.audio_locales.contains(l))
})
}
@ -358,7 +366,10 @@ async fn formats_from_series(
BTreeMap::new();
for season in series.seasons().await? {
if !url_filter.is_season_valid(season.metadata.season_number)
|| !archive.locale.iter().any(|l| season.metadata.audio_locales.contains(l))
|| !archive
.locale
.iter()
.any(|l| season.metadata.audio_locales.contains(l))
{
continue;
}

View file

@ -1,5 +1,5 @@
use crate::cli::log::tab_info;
use crate::cli::utils::{download_segments, FFmpegPreset, find_resolution};
use crate::cli::utils::{download_segments, find_resolution, FFmpegPreset};
use crate::utils::context::Context;
use crate::utils::format::{format_string, Format};
use crate::utils::log::progress;
@ -92,7 +92,9 @@ impl Execute for Download {
}
let _ = FFmpegPreset::ffmpeg_presets(self.ffmpeg_preset.clone())?;
if self.ffmpeg_preset.len() == 1 && self.ffmpeg_preset.get(0).unwrap() == &FFmpegPreset::Nvidia {
if self.ffmpeg_preset.len() == 1
&& self.ffmpeg_preset.get(0).unwrap() == &FFmpegPreset::Nvidia
{
warn!("Skipping 'nvidia' hardware acceleration preset since no other codec preset was specified")
}
@ -103,18 +105,18 @@ impl Execute for Download {
let mut parsed_urls = vec![];
for (i, url) in self.urls.iter().enumerate() {
let _progress_handler = progress!("Parsing url {}", i + 1);
let progress_handler = progress!("Parsing url {}", i + 1);
match parse_url(&ctx.crunchy, url.clone(), true).await {
Ok((media_collection, url_filter)) => {
parsed_urls.push((media_collection, url_filter));
info!("Parsed url {}", i + 1)
progress_handler.stop(format!("Parsed url {}", i + 1))
}
Err(e) => bail!("url {} could not be parsed: {}", url, e),
}
}
for (i, (media_collection, url_filter)) in parsed_urls.into_iter().enumerate() {
let _progress_handler = progress!("Fetching series details");
let progress_handler = progress!("Fetching series details");
let formats = match media_collection {
MediaCollection::Series(series) => {
debug!("Url {} is series ({})", i + 1, series.title);
@ -156,11 +158,10 @@ impl Execute for Download {
};
let Some(formats) = formats else {
info!("Skipping url {} (no matching episodes found)", i + 1);
progress_handler.stop(format!("Skipping url {} (no matching episodes found)", i + 1));
continue;
};
info!("Loaded series information for url {}", i + 1);
drop(_progress_handler);
progress_handler.stop(format!("Loaded series information for url {}", i + 1));
if log::max_level() == log::Level::Debug {
let seasons = sort_formats_after_seasons(formats.clone());
@ -231,7 +232,9 @@ impl Execute for Download {
tab_info!("Resolution: {}", format.stream.resolution);
tab_info!("FPS: {:.2}", format.stream.fps);
if path.extension().unwrap_or_default().to_string_lossy() != "ts" || !self.ffmpeg_preset.is_empty() {
if path.extension().unwrap_or_default().to_string_lossy() != "ts"
|| !self.ffmpeg_preset.is_empty()
{
download_ffmpeg(&ctx, &self, format.stream, path.as_path()).await?;
} else if path.to_str().unwrap() == "-" {
let mut stdout = std::io::stdout().lock();
@ -247,7 +250,12 @@ impl Execute for Download {
}
}
async fn download_ffmpeg(ctx: &Context, download: &Download, variant_data: VariantData, target: &Path) -> Result<()> {
async fn download_ffmpeg(
ctx: &Context,
download: &Download,
variant_data: VariantData,
target: &Path,
) -> Result<()> {
let (input_presets, output_presets) =
FFmpegPreset::ffmpeg_presets(download.ffmpeg_preset.clone())?;

View file

@ -1,106 +1,17 @@
use indicatif::{ProgressBar, ProgressStyle};
use log::{
set_boxed_logger, set_max_level, Level, LevelFilter, Log, Metadata, Record, SetLoggerError,
};
use std::io::{stdout, Write};
use std::sync::{mpsc, Mutex};
use std::sync::Mutex;
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
struct CliProgress {
handler: JoinHandle<()>,
sender: mpsc::SyncSender<(String, Level)>,
}
impl CliProgress {
fn new(record: &Record) -> Self {
let (tx, rx) = mpsc::sync_channel(1);
let init_message = format!("{}", record.args());
let init_level = record.level();
let handler = thread::spawn(move || {
#[cfg(not(windows))]
let ok = '✔';
#[cfg(windows)]
// windows does not support all unicode characters by default in their consoles, so
// we're using this (square root?) symbol instead. microsoft.
let ok = '√';
let states = ["-", "\\", "|", "/"];
let mut old_message = init_message.clone();
let mut latest_info_message = init_message;
let mut old_level = init_level;
for i in 0.. {
let (msg, level) = match rx.try_recv() {
Ok(payload) => payload,
Err(e) => match e {
mpsc::TryRecvError::Empty => (old_message.clone(), old_level),
mpsc::TryRecvError::Disconnected => break,
},
};
// clear last line
// prefix (2), space (1), state (1), space (1), message(n)
let _ = write!(stdout(), "\r {}", " ".repeat(old_message.len()));
if old_level != level || old_message != msg {
if old_level <= Level::Warn {
let _ = writeln!(stdout(), "\r:: • {}", old_message);
} else if old_level == Level::Info && level <= Level::Warn {
let _ = writeln!(stdout(), "\r:: → {}", old_message);
} else if level == Level::Info {
latest_info_message = msg.clone();
}
}
let _ = write!(
stdout(),
"\r:: {} {}",
states[i / 2 % states.len()],
if level == Level::Info {
&msg
} else {
&latest_info_message
}
);
let _ = stdout().flush();
old_message = msg;
old_level = level;
thread::sleep(Duration::from_millis(100));
}
// clear last line
// prefix (2), space (1), state (1), space (1), message(n)
let _ = write!(stdout(), "\r {}", " ".repeat(old_message.len()));
let _ = writeln!(stdout(), "\r:: {} {}", ok, old_message);
let _ = stdout().flush();
});
Self {
handler,
sender: tx,
}
}
fn send(&self, record: &Record) {
let _ = self
.sender
.send((format!("{}", record.args()), record.level()));
}
fn stop(self) {
drop(self.sender);
let _ = self.handler.join();
}
}
#[allow(clippy::type_complexity)]
pub struct CliLogger {
all: bool,
level: LevelFilter,
progress: Mutex<Option<CliProgress>>,
progress: Mutex<Option<ProgressBar>>,
}
impl Log for CliLogger {
@ -127,7 +38,7 @@ impl Log for CliLogger {
"progress_end" => self.progress(record, true),
_ => {
if self.progress.lock().unwrap().is_some() {
self.progress(record, false);
self.progress(record, false)
} else if record.level() > Level::Warn {
self.normal(record)
} else {
@ -182,13 +93,34 @@ impl CliLogger {
}
fn progress(&self, record: &Record, stop: bool) {
let mut progress_option = self.progress.lock().unwrap();
if stop && progress_option.is_some() {
progress_option.take().unwrap().stop()
} else if let Some(p) = &*progress_option {
p.send(record);
let mut progress = self.progress.lock().unwrap();
let msg = format!("{}", record.args());
if stop && progress.is_some() {
if msg.is_empty() {
progress.take().unwrap().finish()
} else {
progress.take().unwrap().finish_with_message(msg)
}
} else if let Some(p) = &*progress {
p.println(format!(":: → {}", msg))
} else {
*progress_option = Some(CliProgress::new(record))
#[cfg(not(windows))]
let finish_str = "";
#[cfg(windows)]
// windows does not support all unicode characters by default in their consoles, so
// we're using this (square root?) symbol instead. microsoft.
let finish_str = "";
let pb = ProgressBar::new_spinner();
pb.set_style(
ProgressStyle::with_template(":: {spinner} {msg}")
.unwrap()
.tick_strings(&["-", "\\", "|", "/", finish_str]),
);
pb.enable_steady_tick(Duration::from_millis(200));
pb.set_message(msg);
*progress = Some(pb)
}
}
}

View file

@ -1,13 +1,12 @@
use crate::utils::context::Context;
use anyhow::{bail, Result};
use crunchyroll_rs::media::{Resolution, VariantData, VariantSegment};
use indicatif::{ProgressBar, ProgressFinish, ProgressStyle};
use log::{debug, LevelFilter};
use std::borrow::{Borrow, BorrowMut};
use std::collections::BTreeMap;
use std::io;
use std::io::Write;
use std::sync::{mpsc, Arc, Mutex};
use std::time::Duration;
use tokio::task::JoinSet;
pub fn find_resolution(
@ -35,78 +34,25 @@ pub async fn download_segments(
let client = Arc::new(ctx.crunchy.client());
let count = Arc::new(Mutex::new(0));
let amount = Arc::new(Mutex::new(0));
// only print progress when log level is info
let output_handler = if log::max_level() == LevelFilter::Info {
let output_count = count.clone();
let output_amount = amount.clone();
Some(tokio::spawn(async move {
let sleep_time_ms = 100;
let iter_per_sec = 1000f64 / sleep_time_ms as f64;
let progress = if log::max_level() == LevelFilter::Info {
let estimated_file_size = (variant_data.bandwidth / 8)
* segments
.iter()
.map(|s| s.length.unwrap_or_default().as_secs())
.sum::<u64>();
let mut bytes_start = 0f64;
let mut speed = 0f64;
let mut percentage = 0f64;
while *output_count.lock().unwrap() < total_segments || percentage < 100f64 {
let tmp_amount = *output_amount.lock().unwrap() as f64;
let tmp_speed = (tmp_amount - bytes_start) / 1024f64 / 1024f64;
if *output_count.lock().unwrap() < 3 {
speed = tmp_speed;
} else {
let (old_speed_ratio, new_speed_ratio) = if iter_per_sec <= 1f64 {
(0f64, 1f64)
} else {
(1f64 - (1f64 / iter_per_sec), (1f64 / iter_per_sec))
};
// calculate the average download speed "smoother"
speed = (speed * old_speed_ratio) + (tmp_speed * new_speed_ratio);
}
percentage =
(*output_count.lock().unwrap() as f64 / total_segments as f64) * 100f64;
let size = terminal_size::terminal_size()
.unwrap_or((terminal_size::Width(60), terminal_size::Height(0)))
.0
.0 as usize;
// there is a offset of 1 "length" (idk how to describe it), so removing 1 from
// `progress_available` would fill the terminal width completely. on multiple
// systems there is a bug that printing until the end of the line causes a newline
// even though technically there shouldn't be one. on my tests, this only happens on
// windows and mac machines and (at the addressed environments) only with release
// builds. so maybe an unwanted optimization?
let progress_available = size
- if let Some(msg) = &message {
35 + msg.len()
} else {
34
};
let progress_done_count =
(progress_available as f64 * (percentage / 100f64)).ceil() as usize;
let progress_to_do_count = progress_available - progress_done_count;
let _ = write!(
io::stdout(),
"\r:: {}{:>5.1} MiB {:>5.2} MiB/s [{}{}] {:>3}%",
message.clone().map_or("".to_string(), |msg| msg + " "),
tmp_amount / 1024f64 / 1024f64,
speed * iter_per_sec,
"#".repeat(progress_done_count),
"-".repeat(progress_to_do_count),
percentage as usize
);
bytes_start = tmp_amount;
tokio::time::sleep(Duration::from_millis(sleep_time_ms)).await;
}
println!()
}))
let progress = ProgressBar::new(estimated_file_size)
.with_style(
ProgressStyle::with_template(
":: {msg}{bytes:>10} {bytes_per_sec:>12} [{wide_bar}] {percent:>3}%",
)
.unwrap()
.progress_chars("##-"),
)
.with_message(message.map(|m| m + " ").unwrap_or_default())
.with_finish(ProgressFinish::Abandon);
Some(progress)
} else {
None
};
@ -116,7 +62,7 @@ pub async fn download_segments(
for _ in 0..cpus {
segs.push(vec![])
}
for (i, segment) in segments.into_iter().enumerate() {
for (i, segment) in segments.clone().into_iter().enumerate() {
segs[i - ((i / cpus) * cpus)].push(segment);
}
@ -127,15 +73,12 @@ pub async fn download_segments(
let thread_client = client.clone();
let thread_sender = sender.clone();
let thread_segments = segs.remove(0);
let thread_amount = amount.clone();
let thread_count = count.clone();
join_set.spawn(async move {
for (i, segment) in thread_segments.into_iter().enumerate() {
let response = thread_client.get(&segment.url).send().await?;
let mut buf = response.bytes().await?.to_vec();
*thread_amount.lock().unwrap() += buf.len();
buf = VariantSegment::decrypt(buf.borrow_mut(), segment.key)?.to_vec();
debug!(
"Downloaded and decrypted segment {} ({})",
@ -155,13 +98,28 @@ pub async fn download_segments(
let mut buf: BTreeMap<usize, Vec<u8>> = BTreeMap::new();
loop {
// is always `Some` because `sender` does not get dropped when all threads are finished
let data = receiver.recv().unwrap();
let (pos, bytes) = receiver.recv().unwrap();
if data_pos == data.0 {
writer.write_all(data.1.borrow())?;
if let Some(p) = &progress {
let progress_len = p.length().unwrap();
let estimated_segment_len = (variant_data.bandwidth / 8)
* segments
.get(pos)
.unwrap()
.length
.unwrap_or_default()
.as_secs();
let bytes_len = bytes.len() as u64;
p.set_length(progress_len - estimated_segment_len + bytes_len);
p.inc(bytes_len)
}
if data_pos == pos {
writer.write_all(bytes.borrow())?;
data_pos += 1;
} else {
buf.insert(data.0, data.1);
buf.insert(pos, bytes);
}
while let Some(b) = buf.remove(&data_pos) {
writer.write_all(b.borrow())?;
@ -176,9 +134,6 @@ pub async fn download_segments(
while let Some(joined) = join_set.join_next().await {
joined??
}
if let Some(handler) = output_handler {
handler.await?
}
Ok(())
}
@ -200,7 +155,7 @@ impl ToString for FFmpegPreset {
&FFmpegPreset::H265 => "h265",
&FFmpegPreset::H264 => "h264",
}
.to_string()
.to_string()
}
}
@ -233,7 +188,9 @@ impl FFmpegPreset {
})
}
pub(crate) fn ffmpeg_presets(mut presets: Vec<FFmpegPreset>) -> Result<(Vec<String>, Vec<String>)> {
pub(crate) fn ffmpeg_presets(
mut presets: Vec<FFmpegPreset>,
) -> Result<(Vec<String>, Vec<String>)> {
fn preset_check_remove(presets: &mut Vec<FFmpegPreset>, preset: FFmpegPreset) -> bool {
if let Some(i) = presets.iter().position(|p| p == &preset) {
presets.remove(i);

View file

@ -6,7 +6,7 @@ use anyhow::bail;
use anyhow::Result;
use clap::{Parser, Subcommand};
use crunchyroll_rs::{Crunchyroll, Locale};
use log::{debug, error, info, LevelFilter};
use log::{debug, error, LevelFilter};
use std::{env, fs};
mod cli;
@ -196,7 +196,7 @@ async fn crunchyroll_session(cli: &Cli) -> Result<Crunchyroll> {
+ cli.login_method.etp_rt.is_some() as u8
+ cli.login_method.anonymous as u8;
let _progress_handler = progress!("Logging in");
let progress_handler = progress!("Logging in");
if login_methods_count == 0 {
if let Some(login_file_path) = cli::login::login_file_path() {
if login_file_path.exists() {
@ -232,7 +232,7 @@ async fn crunchyroll_session(cli: &Cli) -> Result<Crunchyroll> {
bail!("should never happen")
};
info!("Logged in");
progress_handler.stop("Logged in");
Ok(crunchy)
}

View file

@ -1,10 +1,21 @@
use log::info;
pub struct ProgressHandler;
pub struct ProgressHandler {
pub(crate) stopped: bool,
}
impl Drop for ProgressHandler {
fn drop(&mut self) {
info!(target: "progress_end", "")
if !self.stopped {
info!(target: "progress_end", "")
}
}
}
impl ProgressHandler {
pub(crate) fn stop<S: AsRef<str>>(mut self, msg: S) {
self.stopped = true;
info!(target: "progress_end", "{}", msg.as_ref())
}
}
@ -12,7 +23,7 @@ macro_rules! progress {
($($arg:tt)+) => {
{
log::info!(target: "progress", $($arg)+);
$crate::utils::log::ProgressHandler{}
$crate::utils::log::ProgressHandler{stopped: false}
}
}
}