From bbb5a7876520384e2e7725d1adb6bb08faa9caa5 Mon Sep 17 00:00:00 2001 From: Valentine Briese Date: Sun, 15 Oct 2023 11:52:53 -0700 Subject: [PATCH 1/2] Add `--threads` (`-t`) option to downloading commands (#256) * Add `single-threaded` option to downloading commands * Replace `--single-threaded` boolean option with `--threads` optional `usize` option * Simplify `threads` field unwrapping * Make `--threads` `usize` with a default value --- crunchy-cli-core/src/archive/command.rs | 7 +- crunchy-cli-core/src/download/command.rs | 7 +- crunchy-cli-core/src/utils/download.rs | 301 ++++++++++++----------- 3 files changed, 166 insertions(+), 149 deletions(-) diff --git a/crunchy-cli-core/src/archive/command.rs b/crunchy-cli-core/src/archive/command.rs index 7ce1658..bb4794f 100644 --- a/crunchy-cli-core/src/archive/command.rs +++ b/crunchy-cli-core/src/archive/command.rs @@ -98,6 +98,10 @@ pub struct Archive { #[arg(short, long, default_value_t = false)] pub(crate) yes: bool, + #[arg(help = "The number of threads used to download")] + #[arg(short, long, default_value_t = num_cpus::get())] + pub(crate) threads: usize, + #[arg(help = "Crunchyroll series url(s)")] #[arg(required = true)] pub(crate) urls: Vec, @@ -158,7 +162,8 @@ impl Execute for Archive { .ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default()) .output_format(Some("matroska".to_string())) .audio_sort(Some(self.audio.clone())) - .subtitle_sort(Some(self.subtitle.clone())); + .subtitle_sort(Some(self.subtitle.clone())) + .threads(self.threads); for single_formats in single_format_collection.into_iter() { let (download_formats, mut format) = get_format(&self, &single_formats).await?; diff --git a/crunchy-cli-core/src/download/command.rs b/crunchy-cli-core/src/download/command.rs index baf27bf..760bf38 100644 --- a/crunchy-cli-core/src/download/command.rs +++ b/crunchy-cli-core/src/download/command.rs @@ -80,6 +80,10 @@ pub struct Download { #[arg(long, default_value_t = false)] pub(crate) force_hardsub: bool, + #[arg(help = "The number of threads used to download")] + #[arg(short, long, default_value_t = num_cpus::get())] + pub(crate) threads: usize, + #[arg(help = "Url(s) to Crunchyroll episodes or series")] #[arg(required = true)] pub(crate) urls: Vec, @@ -149,7 +153,8 @@ impl Execute for Download { } else { None }) - .ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default()); + .ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default()) + .threads(self.threads); for mut single_formats in single_format_collection.into_iter() { // the vec contains always only one item diff --git a/crunchy-cli-core/src/utils/download.rs b/crunchy-cli-core/src/utils/download.rs index 29f160a..ff9f240 100644 --- a/crunchy-cli-core/src/utils/download.rs +++ b/crunchy-cli-core/src/utils/download.rs @@ -50,6 +50,7 @@ pub struct DownloadBuilder { audio_sort: Option>, subtitle_sort: Option>, force_hardsub: bool, + threads: usize, } impl DownloadBuilder { @@ -61,6 +62,7 @@ impl DownloadBuilder { audio_sort: None, subtitle_sort: None, force_hardsub: false, + threads: num_cpus::get(), } } @@ -73,6 +75,7 @@ impl DownloadBuilder { subtitle_sort: self.subtitle_sort, force_hardsub: self.force_hardsub, + threads: self.threads, formats: vec![], } @@ -99,6 +102,7 @@ pub struct Downloader { subtitle_sort: Option>, force_hardsub: bool, + threads: usize, formats: Vec, } @@ -502,7 +506,8 @@ impl Downloader { let tempfile = tempfile(".mp4")?; let (mut file, path) = tempfile.into_parts(); - download_segments(ctx, &mut file, message, variant_data).await?; + self.download_segments(ctx, &mut file, message, variant_data) + .await?; Ok(path) } @@ -516,7 +521,8 @@ impl Downloader { let tempfile = tempfile(".m4a")?; let (mut file, path) = tempfile.into_parts(); - download_segments(ctx, &mut file, message, variant_data).await?; + self.download_segments(ctx, &mut file, message, variant_data) + .await?; Ok(path) } @@ -537,188 +543,189 @@ impl Downloader { Ok(path) } -} -pub async fn download_segments( - ctx: &Context, - writer: &mut impl Write, - message: String, - variant_data: &VariantData, -) -> Result<()> { - let segments = variant_data.segments().await?; - let total_segments = segments.len(); + async fn download_segments( + &self, + ctx: &Context, + writer: &mut impl Write, + message: String, + variant_data: &VariantData, + ) -> Result<()> { + let segments = variant_data.segments().await?; + let total_segments = segments.len(); - let client = Arc::new(ctx.crunchy.client()); - let count = Arc::new(Mutex::new(0)); + let client = Arc::new(ctx.crunchy.client()); + let count = Arc::new(Mutex::new(0)); - let progress = if log::max_level() == LevelFilter::Info { - let estimated_file_size = estimate_variant_file_size(variant_data, &segments); + let progress = if log::max_level() == LevelFilter::Info { + let estimated_file_size = estimate_variant_file_size(variant_data, &segments); - let progress = ProgressBar::new(estimated_file_size) - .with_style( - ProgressStyle::with_template( - ":: {msg} {bytes:>10} {bytes_per_sec:>12} [{wide_bar}] {percent:>3}%", + let progress = ProgressBar::new(estimated_file_size) + .with_style( + ProgressStyle::with_template( + ":: {msg} {bytes:>10} {bytes_per_sec:>12} [{wide_bar}] {percent:>3}%", + ) + .unwrap() + .progress_chars("##-"), ) - .unwrap() - .progress_chars("##-"), - ) - .with_message(message) - .with_finish(ProgressFinish::Abandon); - Some(progress) - } else { - None - }; + .with_message(message) + .with_finish(ProgressFinish::Abandon); + Some(progress) + } else { + None + }; - let cpus = num_cpus::get(); - let mut segs: Vec> = Vec::with_capacity(cpus); - for _ in 0..cpus { - segs.push(vec![]) - } - for (i, segment) in segments.clone().into_iter().enumerate() { - segs[i - ((i / cpus) * cpus)].push(segment); - } + let cpus = self.threads; + let mut segs: Vec> = Vec::with_capacity(cpus); + for _ in 0..cpus { + segs.push(vec![]) + } + for (i, segment) in segments.clone().into_iter().enumerate() { + segs[i - ((i / cpus) * cpus)].push(segment); + } - let (sender, mut receiver) = unbounded_channel(); + let (sender, mut receiver) = unbounded_channel(); - let mut join_set: JoinSet> = JoinSet::new(); - for num in 0..cpus { - let thread_client = client.clone(); - let thread_sender = sender.clone(); - let thread_segments = segs.remove(0); - let thread_count = count.clone(); - join_set.spawn(async move { - let after_download_sender = thread_sender.clone(); + let mut join_set: JoinSet> = JoinSet::new(); + for num in 0..cpus { + let thread_client = client.clone(); + let thread_sender = sender.clone(); + let thread_segments = segs.remove(0); + let thread_count = count.clone(); + join_set.spawn(async move { + let after_download_sender = thread_sender.clone(); - // the download process is encapsulated in its own function. this is done to easily - // catch errors which get returned with `...?` and `bail!(...)` and that the thread - // itself can report that an error has occurred - let download = || async move { - for (i, segment) in thread_segments.into_iter().enumerate() { - let mut retry_count = 0; - let mut buf = loop { - let request = thread_client - .get(&segment.url) - .timeout(Duration::from_secs(60)) - .send(); + // the download process is encapsulated in its own function. this is done to easily + // catch errors which get returned with `...?` and `bail!(...)` and that the thread + // itself can report that an error has occurred + let download = || async move { + for (i, segment) in thread_segments.into_iter().enumerate() { + let mut retry_count = 0; + let mut buf = loop { + let request = thread_client + .get(&segment.url) + .timeout(Duration::from_secs(60)) + .send(); - let response = match request.await { - Ok(r) => r, - Err(e) => { - if retry_count == 5 { - bail!("Max retry count reached ({}), multiple errors occurred while receiving segment {}: {}", retry_count, num + (i * cpus), e) - } - debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), e, 5 - retry_count); - continue - } - }; - - match response.bytes().await { - Ok(b) => break b.to_vec(), - Err(e) => { - if e.is_body() { + let response = match request.await { + Ok(r) => r, + Err(e) => { if retry_count == 5 { bail!("Max retry count reached ({}), multiple errors occurred while receiving segment {}: {}", retry_count, num + (i * cpus), e) } - debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), e, 5 - retry_count) - } else { - bail!("{}", e) + debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), e, 5 - retry_count); + continue + } + }; + + match response.bytes().await { + Ok(b) => break b.to_vec(), + Err(e) => { + if e.is_body() { + if retry_count == 5 { + bail!("Max retry count reached ({}), multiple errors occurred while receiving segment {}: {}", retry_count, num + (i * cpus), e) + } + debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), e, 5 - retry_count) + } else { + bail!("{}", e) + } } } - } - retry_count += 1; - }; + retry_count += 1; + }; - buf = VariantSegment::decrypt(buf.borrow_mut(), segment.key)?.to_vec(); + buf = VariantSegment::decrypt(buf.borrow_mut(), segment.key)?.to_vec(); - let mut c = thread_count.lock().await; - debug!( - "Downloaded and decrypted segment [{}/{} {:.2}%] {}", - num + (i * cpus) + 1, - total_segments, - ((*c + 1) as f64 / total_segments as f64) * 100f64, - segment.url - ); + let mut c = thread_count.lock().await; + debug!( + "Downloaded and decrypted segment [{}/{} {:.2}%] {}", + num + (i * cpus) + 1, + total_segments, + ((*c + 1) as f64 / total_segments as f64) * 100f64, + segment.url + ); - thread_sender.send((num as i32 + (i * cpus) as i32, buf))?; + thread_sender.send((num as i32 + (i * cpus) as i32, buf))?; - *c += 1; + *c += 1; + } + Ok(()) + }; + + + let result = download().await; + if result.is_err() { + after_download_sender.send((-1 as i32, vec![]))?; } - Ok(()) - }; + result + }); + } + // drop the sender already here so it does not outlive all download threads which are the only + // real consumers of it + drop(sender); - let result = download().await; - if result.is_err() { - after_download_sender.send((-1 as i32, vec![]))?; + // this is the main loop which writes the data. it uses a BTreeMap as a buffer as the write + // happens synchronized. the download consist of multiple segments. the map keys are representing + // the segment number and the values the corresponding bytes + let mut data_pos = 0; + let mut buf: BTreeMap> = BTreeMap::new(); + while let Some((pos, bytes)) = receiver.recv().await { + // if the position is lower than 0, an error occurred in the sending download thread + if pos < 0 { + break; } - result - }); - } - // drop the sender already here so it does not outlive all download threads which are the only - // real consumers of it - drop(sender); + if let Some(p) = &progress { + let progress_len = p.length().unwrap(); + let estimated_segment_len = (variant_data.bandwidth / 8) + * segments.get(pos as usize).unwrap().length.as_secs(); + let bytes_len = bytes.len() as u64; - // this is the main loop which writes the data. it uses a BTreeMap as a buffer as the write - // happens synchronized. the download consist of multiple segments. the map keys are representing - // the segment number and the values the corresponding bytes - let mut data_pos = 0; - let mut buf: BTreeMap> = BTreeMap::new(); - while let Some((pos, bytes)) = receiver.recv().await { - // if the position is lower than 0, an error occurred in the sending download thread - if pos < 0 { - break; + p.set_length(progress_len - estimated_segment_len + bytes_len); + p.inc(bytes_len) + } + + // check if the currently sent bytes are the next in the buffer. if so, write them directly + // to the target without first adding them to the buffer. + // if not, add them to the buffer + if data_pos == pos { + writer.write_all(bytes.borrow())?; + data_pos += 1; + } else { + buf.insert(pos, bytes); + } + // check if the buffer contains the next segment(s) + while let Some(b) = buf.remove(&data_pos) { + writer.write_all(b.borrow())?; + data_pos += 1; + } } - if let Some(p) = &progress { - let progress_len = p.length().unwrap(); - let estimated_segment_len = - (variant_data.bandwidth / 8) * segments.get(pos as usize).unwrap().length.as_secs(); - let bytes_len = bytes.len() as u64; - - p.set_length(progress_len - estimated_segment_len + bytes_len); - p.inc(bytes_len) + // if any error has occurred while downloading it gets returned here + while let Some(joined) = join_set.join_next().await { + joined?? } - // check if the currently sent bytes are the next in the buffer. if so, write them directly - // to the target without first adding them to the buffer. - // if not, add them to the buffer - if data_pos == pos { - writer.write_all(bytes.borrow())?; - data_pos += 1; - } else { - buf.insert(pos, bytes); - } - // check if the buffer contains the next segment(s) + // write the remaining buffer, if existent while let Some(b) = buf.remove(&data_pos) { writer.write_all(b.borrow())?; data_pos += 1; } - } - // if any error has occurred while downloading it gets returned here - while let Some(joined) = join_set.join_next().await { - joined?? - } + if !buf.is_empty() { + bail!( + "Download buffer is not empty. Remaining segments: {}", + buf.into_keys() + .map(|k| k.to_string()) + .collect::>() + .join(", ") + ) + } - // write the remaining buffer, if existent - while let Some(b) = buf.remove(&data_pos) { - writer.write_all(b.borrow())?; - data_pos += 1; + Ok(()) } - - if !buf.is_empty() { - bail!( - "Download buffer is not empty. Remaining segments: {}", - buf.into_keys() - .map(|k| k.to_string()) - .collect::>() - .join(", ") - ) - } - - Ok(()) } fn estimate_variant_file_size(variant_data: &VariantData, segments: &Vec) -> u64 { From 568bce00085cc301029d0b39c39cf15b97aa1d3d Mon Sep 17 00:00:00 2001 From: bytedream Date: Sun, 15 Oct 2023 22:39:53 +0200 Subject: [PATCH 2/2] Manually implement filename sanitizing to allow the usage of file separators --- Cargo.lock | 11 ----- crunchy-cli-core/Cargo.toml | 1 - crunchy-cli-core/src/archive/command.rs | 2 +- crunchy-cli-core/src/download/command.rs | 2 +- crunchy-cli-core/src/utils/format.rs | 55 ++++++++++++------------ crunchy-cli-core/src/utils/os.rs | 48 +++++++++++++++++++++ 6 files changed, 77 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 09cad3e..409a005 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -401,7 +401,6 @@ dependencies = [ "regex", "reqwest", "rustls-native-certs", - "sanitize-filename", "serde", "serde_json", "serde_plain", @@ -1517,16 +1516,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" -[[package]] -name = "sanitize-filename" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ed72fbaf78e6f2d41744923916966c4fbe3d7c74e3037a8ee482f1115572603" -dependencies = [ - "lazy_static", - "regex", -] - [[package]] name = "schannel" version = "0.1.22" diff --git a/crunchy-cli-core/Cargo.toml b/crunchy-cli-core/Cargo.toml index fc298f3..e38cb1a 100644 --- a/crunchy-cli-core/Cargo.toml +++ b/crunchy-cli-core/Cargo.toml @@ -28,7 +28,6 @@ log = { version = "0.4", features = ["std"] } num_cpus = "1.16" regex = "1.9" reqwest = { version = "0.11", default-features = false, features = ["socks"] } -sanitize-filename = "0.5" serde = "1.0" serde_json = "1.0" serde_plain = "1.0" diff --git a/crunchy-cli-core/src/archive/command.rs b/crunchy-cli-core/src/archive/command.rs index bb4794f..3f455ed 100644 --- a/crunchy-cli-core/src/archive/command.rs +++ b/crunchy-cli-core/src/archive/command.rs @@ -173,7 +173,7 @@ impl Execute for Archive { downloader.add_format(download_format) } - let formatted_path = format.format_path((&self.output).into(), true); + let formatted_path = format.format_path((&self.output).into()); let (path, changed) = free_file(formatted_path.clone()); if changed && self.skip_existing { diff --git a/crunchy-cli-core/src/download/command.rs b/crunchy-cli-core/src/download/command.rs index 760bf38..9c9ecb4 100644 --- a/crunchy-cli-core/src/download/command.rs +++ b/crunchy-cli-core/src/download/command.rs @@ -165,7 +165,7 @@ impl Execute for Download { let mut downloader = download_builder.clone().build(); downloader.add_format(download_format); - let formatted_path = format.format_path((&self.output).into(), true); + let formatted_path = format.format_path((&self.output).into()); let (path, changed) = free_file(formatted_path.clone()); if changed && self.skip_existing { diff --git a/crunchy-cli-core/src/utils/format.rs b/crunchy-cli-core/src/utils/format.rs index 618f7d5..f462263 100644 --- a/crunchy-cli-core/src/utils/format.rs +++ b/crunchy-cli-core/src/utils/format.rs @@ -1,6 +1,6 @@ use crate::utils::filter::real_dedup_vec; use crate::utils::log::tab_info; -use crate::utils::os::is_special_file; +use crate::utils::os::{is_special_file, sanitize}; use anyhow::Result; use chrono::Duration; use crunchyroll_rs::media::{Resolution, Stream, Subtitle, VariantData}; @@ -368,46 +368,45 @@ impl Format { } } - /// Formats the given string if it has specific pattern in it. It's possible to sanitize it which - /// removes characters which can cause failures if the output string is used as a file name. - pub fn format_path(&self, path: PathBuf, sanitize: bool) -> PathBuf { - let path = path - .to_string_lossy() - .to_string() - .replace("{title}", &self.title) + /// Formats the given string if it has specific pattern in it. It also sanitizes the filename. + pub fn format_path(&self, path: PathBuf) -> PathBuf { + let mut path = sanitize(path.to_string_lossy(), false); + path = path + .replace("{title}", &sanitize(&self.title, true)) .replace( "{audio}", - &self - .locales - .iter() - .map(|(a, _)| a.to_string()) - .collect::>() - .join("|"), + &sanitize( + self.locales + .iter() + .map(|(a, _)| a.to_string()) + .collect::>() + .join("|"), + true, + ), ) - .replace("{resolution}", &self.resolution.to_string()) - .replace("{series_id}", &self.series_id) - .replace("{series_name}", &self.series_name) - .replace("{season_id}", &self.season_id) - .replace("{season_name}", &self.season_title) + .replace("{resolution}", &sanitize(self.resolution.to_string(), true)) + .replace("{series_id}", &sanitize(&self.series_id, true)) + .replace("{series_name}", &sanitize(&self.series_name, true)) + .replace("{season_id}", &sanitize(&self.season_id, true)) + .replace("{season_name}", &sanitize(&self.season_title, true)) .replace( "{season_number}", - &format!("{:0>2}", self.season_number.to_string()), + &format!("{:0>2}", sanitize(self.season_number.to_string(), true)), ) - .replace("{episode_id}", &self.episode_id) + .replace("{episode_id}", &sanitize(&self.episode_id, true)) .replace( "{episode_number}", - &format!("{:0>2}", self.episode_number.to_string()), + &format!("{:0>2}", sanitize(&self.episode_number, true)), ) .replace( "{relative_episode_number}", - &self.relative_episode_number.unwrap_or_default().to_string(), + &sanitize( + self.relative_episode_number.unwrap_or_default().to_string(), + true, + ), ); - if sanitize { - PathBuf::from(sanitize_filename::sanitize(path)) - } else { - PathBuf::from(path) - } + PathBuf::from(path) } pub fn visual_output(&self, dst: &Path) { diff --git a/crunchy-cli-core/src/utils/os.rs b/crunchy-cli-core/src/utils/os.rs index f6d9ad0..1f76b90 100644 --- a/crunchy-cli-core/src/utils/os.rs +++ b/crunchy-cli-core/src/utils/os.rs @@ -1,4 +1,6 @@ use log::debug; +use regex::{Regex, RegexBuilder}; +use std::borrow::Cow; use std::io::ErrorKind; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; @@ -78,3 +80,49 @@ pub fn free_file(mut path: PathBuf) -> (PathBuf, bool) { pub fn is_special_file>(path: P) -> bool { path.as_ref().exists() && !path.as_ref().is_file() && !path.as_ref().is_dir() } + +lazy_static::lazy_static! { + static ref ILLEGAL_RE: Regex = Regex::new(r#"[\?<>:\*\|":]"#).unwrap(); + static ref CONTROL_RE: Regex = Regex::new(r"[\x00-\x1f\x80-\x9f]").unwrap(); + static ref RESERVED_RE: Regex = Regex::new(r"^\.+$").unwrap(); + static ref WINDOWS_RESERVED_RE: Regex = RegexBuilder::new(r"(?i)^(con|prn|aux|nul|com[0-9]|lpt[0-9])(\..*)?$") + .case_insensitive(true) + .build() + .unwrap(); + static ref WINDOWS_TRAILING_RE: Regex = Regex::new(r"[\. ]+$").unwrap(); +} + +/// Sanitizes a filename with the option to include/exclude the path separator from sanitizing. This +/// is based of the implementation of the +/// [`sanitize-filename`](https://crates.io/crates/sanitize-filename) crate. +pub fn sanitize>(path: S, include_path_separator: bool) -> String { + let path = Cow::from(path.as_ref()); + + let path = ILLEGAL_RE.replace_all(&path, ""); + let path = CONTROL_RE.replace_all(&path, ""); + let path = RESERVED_RE.replace(&path, ""); + + let collect = |name: String| { + if name.len() > 255 { + name[..255].to_string() + } else { + name + } + }; + + if cfg!(windows) { + let path = WINDOWS_RESERVED_RE.replace(&path, ""); + let path = WINDOWS_TRAILING_RE.replace(&path, ""); + let mut path = path.to_string(); + if include_path_separator { + path = path.replace(['\\', '/'], ""); + } + collect(path) + } else { + let mut path = path.to_string(); + if include_path_separator { + path = path.replace('/', ""); + } + collect(path) + } +}