Merge remote-tracking branch 'origin/master' into feature/relative_sequence_number

# Conflicts:
#	crunchy-cli-core/src/utils/format.rs
This commit is contained in:
bytedream 2023-10-15 22:52:33 +02:00
commit 5d17bb1ac7
7 changed files with 253 additions and 196 deletions

11
Cargo.lock generated
View file

@ -401,7 +401,6 @@ dependencies = [
"regex", "regex",
"reqwest", "reqwest",
"rustls-native-certs", "rustls-native-certs",
"sanitize-filename",
"serde", "serde",
"serde_json", "serde_json",
"serde_plain", "serde_plain",
@ -1517,16 +1516,6 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741"
[[package]]
name = "sanitize-filename"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ed72fbaf78e6f2d41744923916966c4fbe3d7c74e3037a8ee482f1115572603"
dependencies = [
"lazy_static",
"regex",
]
[[package]] [[package]]
name = "schannel" name = "schannel"
version = "0.1.22" version = "0.1.22"

View file

@ -28,7 +28,6 @@ log = { version = "0.4", features = ["std"] }
num_cpus = "1.16" num_cpus = "1.16"
regex = "1.9" regex = "1.9"
reqwest = { version = "0.11", default-features = false, features = ["socks"] } reqwest = { version = "0.11", default-features = false, features = ["socks"] }
sanitize-filename = "0.5"
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_plain = "1.0" serde_plain = "1.0"

View file

@ -100,6 +100,10 @@ pub struct Archive {
#[arg(short, long, default_value_t = false)] #[arg(short, long, default_value_t = false)]
pub(crate) yes: bool, pub(crate) yes: bool,
#[arg(help = "The number of threads used to download")]
#[arg(short, long, default_value_t = num_cpus::get())]
pub(crate) threads: usize,
#[arg(help = "Crunchyroll series url(s)")] #[arg(help = "Crunchyroll series url(s)")]
#[arg(required = true)] #[arg(required = true)]
pub(crate) urls: Vec<String>, pub(crate) urls: Vec<String>,
@ -160,7 +164,8 @@ impl Execute for Archive {
.ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default()) .ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default())
.output_format(Some("matroska".to_string())) .output_format(Some("matroska".to_string()))
.audio_sort(Some(self.audio.clone())) .audio_sort(Some(self.audio.clone()))
.subtitle_sort(Some(self.subtitle.clone())); .subtitle_sort(Some(self.subtitle.clone()))
.threads(self.threads);
for single_formats in single_format_collection.into_iter() { for single_formats in single_format_collection.into_iter() {
let (download_formats, mut format) = get_format(&self, &single_formats).await?; let (download_formats, mut format) = get_format(&self, &single_formats).await?;
@ -170,7 +175,7 @@ impl Execute for Archive {
downloader.add_format(download_format) downloader.add_format(download_format)
} }
let formatted_path = format.format_path((&self.output).into(), true); let formatted_path = format.format_path((&self.output).into());
let (path, changed) = free_file(formatted_path.clone()); let (path, changed) = free_file(formatted_path.clone());
if changed && self.skip_existing { if changed && self.skip_existing {

View file

@ -82,6 +82,10 @@ pub struct Download {
#[arg(long, default_value_t = false)] #[arg(long, default_value_t = false)]
pub(crate) force_hardsub: bool, pub(crate) force_hardsub: bool,
#[arg(help = "The number of threads used to download")]
#[arg(short, long, default_value_t = num_cpus::get())]
pub(crate) threads: usize,
#[arg(help = "Url(s) to Crunchyroll episodes or series")] #[arg(help = "Url(s) to Crunchyroll episodes or series")]
#[arg(required = true)] #[arg(required = true)]
pub(crate) urls: Vec<String>, pub(crate) urls: Vec<String>,
@ -151,7 +155,8 @@ impl Execute for Download {
} else { } else {
None None
}) })
.ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default()); .ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default())
.threads(self.threads);
for mut single_formats in single_format_collection.into_iter() { for mut single_formats in single_format_collection.into_iter() {
// the vec contains always only one item // the vec contains always only one item
@ -162,7 +167,7 @@ impl Execute for Download {
let mut downloader = download_builder.clone().build(); let mut downloader = download_builder.clone().build();
downloader.add_format(download_format); downloader.add_format(download_format);
let formatted_path = format.format_path((&self.output).into(), true); let formatted_path = format.format_path((&self.output).into());
let (path, changed) = free_file(formatted_path.clone()); let (path, changed) = free_file(formatted_path.clone());
if changed && self.skip_existing { if changed && self.skip_existing {

View file

@ -50,6 +50,7 @@ pub struct DownloadBuilder {
audio_sort: Option<Vec<Locale>>, audio_sort: Option<Vec<Locale>>,
subtitle_sort: Option<Vec<Locale>>, subtitle_sort: Option<Vec<Locale>>,
force_hardsub: bool, force_hardsub: bool,
threads: usize,
} }
impl DownloadBuilder { impl DownloadBuilder {
@ -61,6 +62,7 @@ impl DownloadBuilder {
audio_sort: None, audio_sort: None,
subtitle_sort: None, subtitle_sort: None,
force_hardsub: false, force_hardsub: false,
threads: num_cpus::get(),
} }
} }
@ -73,6 +75,7 @@ impl DownloadBuilder {
subtitle_sort: self.subtitle_sort, subtitle_sort: self.subtitle_sort,
force_hardsub: self.force_hardsub, force_hardsub: self.force_hardsub,
threads: self.threads,
formats: vec![], formats: vec![],
} }
@ -99,6 +102,7 @@ pub struct Downloader {
subtitle_sort: Option<Vec<Locale>>, subtitle_sort: Option<Vec<Locale>>,
force_hardsub: bool, force_hardsub: bool,
threads: usize,
formats: Vec<DownloadFormat>, formats: Vec<DownloadFormat>,
} }
@ -502,7 +506,8 @@ impl Downloader {
let tempfile = tempfile(".mp4")?; let tempfile = tempfile(".mp4")?;
let (mut file, path) = tempfile.into_parts(); let (mut file, path) = tempfile.into_parts();
download_segments(ctx, &mut file, message, variant_data).await?; self.download_segments(ctx, &mut file, message, variant_data)
.await?;
Ok(path) Ok(path)
} }
@ -516,7 +521,8 @@ impl Downloader {
let tempfile = tempfile(".m4a")?; let tempfile = tempfile(".m4a")?;
let (mut file, path) = tempfile.into_parts(); let (mut file, path) = tempfile.into_parts();
download_segments(ctx, &mut file, message, variant_data).await?; self.download_segments(ctx, &mut file, message, variant_data)
.await?;
Ok(path) Ok(path)
} }
@ -537,188 +543,189 @@ impl Downloader {
Ok(path) Ok(path)
} }
}
pub async fn download_segments( async fn download_segments(
ctx: &Context, &self,
writer: &mut impl Write, ctx: &Context,
message: String, writer: &mut impl Write,
variant_data: &VariantData, message: String,
) -> Result<()> { variant_data: &VariantData,
let segments = variant_data.segments().await?; ) -> Result<()> {
let total_segments = segments.len(); let segments = variant_data.segments().await?;
let total_segments = segments.len();
let client = Arc::new(ctx.crunchy.client()); let client = Arc::new(ctx.crunchy.client());
let count = Arc::new(Mutex::new(0)); let count = Arc::new(Mutex::new(0));
let progress = if log::max_level() == LevelFilter::Info { let progress = if log::max_level() == LevelFilter::Info {
let estimated_file_size = estimate_variant_file_size(variant_data, &segments); let estimated_file_size = estimate_variant_file_size(variant_data, &segments);
let progress = ProgressBar::new(estimated_file_size) let progress = ProgressBar::new(estimated_file_size)
.with_style( .with_style(
ProgressStyle::with_template( ProgressStyle::with_template(
":: {msg} {bytes:>10} {bytes_per_sec:>12} [{wide_bar}] {percent:>3}%", ":: {msg} {bytes:>10} {bytes_per_sec:>12} [{wide_bar}] {percent:>3}%",
)
.unwrap()
.progress_chars("##-"),
) )
.unwrap() .with_message(message)
.progress_chars("##-"), .with_finish(ProgressFinish::Abandon);
) Some(progress)
.with_message(message) } else {
.with_finish(ProgressFinish::Abandon); None
Some(progress) };
} else {
None
};
let cpus = num_cpus::get(); let cpus = self.threads;
let mut segs: Vec<Vec<VariantSegment>> = Vec::with_capacity(cpus); let mut segs: Vec<Vec<VariantSegment>> = Vec::with_capacity(cpus);
for _ in 0..cpus { for _ in 0..cpus {
segs.push(vec![]) segs.push(vec![])
} }
for (i, segment) in segments.clone().into_iter().enumerate() { for (i, segment) in segments.clone().into_iter().enumerate() {
segs[i - ((i / cpus) * cpus)].push(segment); segs[i - ((i / cpus) * cpus)].push(segment);
} }
let (sender, mut receiver) = unbounded_channel(); let (sender, mut receiver) = unbounded_channel();
let mut join_set: JoinSet<Result<()>> = JoinSet::new(); let mut join_set: JoinSet<Result<()>> = JoinSet::new();
for num in 0..cpus { for num in 0..cpus {
let thread_client = client.clone(); let thread_client = client.clone();
let thread_sender = sender.clone(); let thread_sender = sender.clone();
let thread_segments = segs.remove(0); let thread_segments = segs.remove(0);
let thread_count = count.clone(); let thread_count = count.clone();
join_set.spawn(async move { join_set.spawn(async move {
let after_download_sender = thread_sender.clone(); let after_download_sender = thread_sender.clone();
// the download process is encapsulated in its own function. this is done to easily // the download process is encapsulated in its own function. this is done to easily
// catch errors which get returned with `...?` and `bail!(...)` and that the thread // catch errors which get returned with `...?` and `bail!(...)` and that the thread
// itself can report that an error has occurred // itself can report that an error has occurred
let download = || async move { let download = || async move {
for (i, segment) in thread_segments.into_iter().enumerate() { for (i, segment) in thread_segments.into_iter().enumerate() {
let mut retry_count = 0; let mut retry_count = 0;
let mut buf = loop { let mut buf = loop {
let request = thread_client let request = thread_client
.get(&segment.url) .get(&segment.url)
.timeout(Duration::from_secs(60)) .timeout(Duration::from_secs(60))
.send(); .send();
let response = match request.await { let response = match request.await {
Ok(r) => r, Ok(r) => r,
Err(e) => { Err(e) => {
if retry_count == 5 {
bail!("Max retry count reached ({}), multiple errors occurred while receiving segment {}: {}", retry_count, num + (i * cpus), e)
}
debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), e, 5 - retry_count);
continue
}
};
match response.bytes().await {
Ok(b) => break b.to_vec(),
Err(e) => {
if e.is_body() {
if retry_count == 5 { if retry_count == 5 {
bail!("Max retry count reached ({}), multiple errors occurred while receiving segment {}: {}", retry_count, num + (i * cpus), e) bail!("Max retry count reached ({}), multiple errors occurred while receiving segment {}: {}", retry_count, num + (i * cpus), e)
} }
debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), e, 5 - retry_count) debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), e, 5 - retry_count);
} else { continue
bail!("{}", e) }
};
match response.bytes().await {
Ok(b) => break b.to_vec(),
Err(e) => {
if e.is_body() {
if retry_count == 5 {
bail!("Max retry count reached ({}), multiple errors occurred while receiving segment {}: {}", retry_count, num + (i * cpus), e)
}
debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), e, 5 - retry_count)
} else {
bail!("{}", e)
}
} }
} }
}
retry_count += 1; retry_count += 1;
}; };
buf = VariantSegment::decrypt(buf.borrow_mut(), segment.key)?.to_vec(); buf = VariantSegment::decrypt(buf.borrow_mut(), segment.key)?.to_vec();
let mut c = thread_count.lock().await; let mut c = thread_count.lock().await;
debug!( debug!(
"Downloaded and decrypted segment [{}/{} {:.2}%] {}", "Downloaded and decrypted segment [{}/{} {:.2}%] {}",
num + (i * cpus) + 1, num + (i * cpus) + 1,
total_segments, total_segments,
((*c + 1) as f64 / total_segments as f64) * 100f64, ((*c + 1) as f64 / total_segments as f64) * 100f64,
segment.url segment.url
); );
thread_sender.send((num as i32 + (i * cpus) as i32, buf))?; thread_sender.send((num as i32 + (i * cpus) as i32, buf))?;
*c += 1; *c += 1;
}
Ok(())
};
let result = download().await;
if result.is_err() {
after_download_sender.send((-1 as i32, vec![]))?;
} }
Ok(())
};
result
});
}
// drop the sender already here so it does not outlive all download threads which are the only
// real consumers of it
drop(sender);
let result = download().await; // this is the main loop which writes the data. it uses a BTreeMap as a buffer as the write
if result.is_err() { // happens synchronized. the download consist of multiple segments. the map keys are representing
after_download_sender.send((-1 as i32, vec![]))?; // the segment number and the values the corresponding bytes
let mut data_pos = 0;
let mut buf: BTreeMap<i32, Vec<u8>> = BTreeMap::new();
while let Some((pos, bytes)) = receiver.recv().await {
// if the position is lower than 0, an error occurred in the sending download thread
if pos < 0 {
break;
} }
result if let Some(p) = &progress {
}); let progress_len = p.length().unwrap();
} let estimated_segment_len = (variant_data.bandwidth / 8)
// drop the sender already here so it does not outlive all download threads which are the only * segments.get(pos as usize).unwrap().length.as_secs();
// real consumers of it let bytes_len = bytes.len() as u64;
drop(sender);
// this is the main loop which writes the data. it uses a BTreeMap as a buffer as the write p.set_length(progress_len - estimated_segment_len + bytes_len);
// happens synchronized. the download consist of multiple segments. the map keys are representing p.inc(bytes_len)
// the segment number and the values the corresponding bytes }
let mut data_pos = 0;
let mut buf: BTreeMap<i32, Vec<u8>> = BTreeMap::new(); // check if the currently sent bytes are the next in the buffer. if so, write them directly
while let Some((pos, bytes)) = receiver.recv().await { // to the target without first adding them to the buffer.
// if the position is lower than 0, an error occurred in the sending download thread // if not, add them to the buffer
if pos < 0 { if data_pos == pos {
break; writer.write_all(bytes.borrow())?;
data_pos += 1;
} else {
buf.insert(pos, bytes);
}
// check if the buffer contains the next segment(s)
while let Some(b) = buf.remove(&data_pos) {
writer.write_all(b.borrow())?;
data_pos += 1;
}
} }
if let Some(p) = &progress { // if any error has occurred while downloading it gets returned here
let progress_len = p.length().unwrap(); while let Some(joined) = join_set.join_next().await {
let estimated_segment_len = joined??
(variant_data.bandwidth / 8) * segments.get(pos as usize).unwrap().length.as_secs();
let bytes_len = bytes.len() as u64;
p.set_length(progress_len - estimated_segment_len + bytes_len);
p.inc(bytes_len)
} }
// check if the currently sent bytes are the next in the buffer. if so, write them directly // write the remaining buffer, if existent
// to the target without first adding them to the buffer.
// if not, add them to the buffer
if data_pos == pos {
writer.write_all(bytes.borrow())?;
data_pos += 1;
} else {
buf.insert(pos, bytes);
}
// check if the buffer contains the next segment(s)
while let Some(b) = buf.remove(&data_pos) { while let Some(b) = buf.remove(&data_pos) {
writer.write_all(b.borrow())?; writer.write_all(b.borrow())?;
data_pos += 1; data_pos += 1;
} }
}
// if any error has occurred while downloading it gets returned here if !buf.is_empty() {
while let Some(joined) = join_set.join_next().await { bail!(
joined?? "Download buffer is not empty. Remaining segments: {}",
} buf.into_keys()
.map(|k| k.to_string())
.collect::<Vec<String>>()
.join(", ")
)
}
// write the remaining buffer, if existent Ok(())
while let Some(b) = buf.remove(&data_pos) {
writer.write_all(b.borrow())?;
data_pos += 1;
} }
if !buf.is_empty() {
bail!(
"Download buffer is not empty. Remaining segments: {}",
buf.into_keys()
.map(|k| k.to_string())
.collect::<Vec<String>>()
.join(", ")
)
}
Ok(())
} }
fn estimate_variant_file_size(variant_data: &VariantData, segments: &Vec<VariantSegment>) -> u64 { fn estimate_variant_file_size(variant_data: &VariantData, segments: &Vec<VariantSegment>) -> u64 {

View file

@ -1,6 +1,6 @@
use crate::utils::filter::real_dedup_vec; use crate::utils::filter::real_dedup_vec;
use crate::utils::log::tab_info; use crate::utils::log::tab_info;
use crate::utils::os::is_special_file; use crate::utils::os::{is_special_file, sanitize};
use anyhow::Result; use anyhow::Result;
use chrono::Duration; use chrono::Duration;
use crunchyroll_rs::media::{Resolution, Stream, Subtitle, VariantData}; use crunchyroll_rs::media::{Resolution, Stream, Subtitle, VariantData};
@ -376,54 +376,58 @@ impl Format {
} }
} }
/// Formats the given string if it has specific pattern in it. It's possible to sanitize it which /// Formats the given string if it has specific pattern in it. It also sanitizes the filename.
/// removes characters which can cause failures if the output string is used as a file name. pub fn format_path(&self, path: PathBuf) -> PathBuf {
pub fn format_path(&self, path: PathBuf, sanitize: bool) -> PathBuf { let mut path = sanitize(path.to_string_lossy(), false);
let path = path path = path
.to_string_lossy() .replace("{title}", &sanitize(&self.title, true))
.to_string()
.replace("{title}", &self.title)
.replace( .replace(
"{audio}", "{audio}",
&self &sanitize(
.locales self.locales
.iter() .iter()
.map(|(a, _)| a.to_string()) .map(|(a, _)| a.to_string())
.collect::<Vec<String>>() .collect::<Vec<String>>()
.join("|"), .join("|"),
true,
),
) )
.replace("{resolution}", &self.resolution.to_string()) .replace("{resolution}", &sanitize(self.resolution.to_string(), true))
.replace("{series_id}", &self.series_id) .replace("{series_id}", &sanitize(&self.series_id, true))
.replace("{series_name}", &self.series_name) .replace("{series_name}", &sanitize(&self.series_name, true))
.replace("{season_id}", &self.season_id) .replace("{season_id}", &sanitize(&self.season_id, true))
.replace("{season_name}", &self.season_title) .replace("{season_name}", &sanitize(&self.season_title, true))
.replace( .replace(
"{season_number}", "{season_number}",
&format!("{:0>2}", self.season_number.to_string()), &format!("{:0>2}", sanitize(self.season_number.to_string(), true)),
) )
.replace("{episode_id}", &self.episode_id) .replace("{episode_id}", &sanitize(&self.episode_id, true))
.replace( .replace(
"{episode_number}", "{episode_number}",
&format!("{:0>2}", self.episode_number.to_string()), &format!("{:0>2}", sanitize(&self.episode_number, true)),
) )
.replace( .replace(
"{relative_episode_number}", "{relative_episode_number}",
&self.relative_episode_number.unwrap_or_default().to_string(), &sanitize(
self.relative_episode_number.unwrap_or_default().to_string(),
true,
),
)
.replace(
"{sequence_number}",
&sanitize(self.sequence_number.to_string(), true),
) )
.replace("{sequence_number}", &self.sequence_number.to_string())
.replace( .replace(
"{relative_sequence_number}", "{relative_sequence_number}",
&self &sanitize(
.relative_sequence_number self.relative_sequence_number
.unwrap_or_default() .unwrap_or_default()
.to_string(), .to_string(),
true,
),
); );
if sanitize { PathBuf::from(path)
PathBuf::from(sanitize_filename::sanitize(path))
} else {
PathBuf::from(path)
}
} }
pub fn visual_output(&self, dst: &Path) { pub fn visual_output(&self, dst: &Path) {

View file

@ -1,4 +1,6 @@
use log::debug; use log::debug;
use regex::{Regex, RegexBuilder};
use std::borrow::Cow;
use std::io::ErrorKind; use std::io::ErrorKind;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
@ -78,3 +80,49 @@ pub fn free_file(mut path: PathBuf) -> (PathBuf, bool) {
pub fn is_special_file<P: AsRef<Path>>(path: P) -> bool { pub fn is_special_file<P: AsRef<Path>>(path: P) -> bool {
path.as_ref().exists() && !path.as_ref().is_file() && !path.as_ref().is_dir() path.as_ref().exists() && !path.as_ref().is_file() && !path.as_ref().is_dir()
} }
lazy_static::lazy_static! {
static ref ILLEGAL_RE: Regex = Regex::new(r#"[\?<>:\*\|":]"#).unwrap();
static ref CONTROL_RE: Regex = Regex::new(r"[\x00-\x1f\x80-\x9f]").unwrap();
static ref RESERVED_RE: Regex = Regex::new(r"^\.+$").unwrap();
static ref WINDOWS_RESERVED_RE: Regex = RegexBuilder::new(r"(?i)^(con|prn|aux|nul|com[0-9]|lpt[0-9])(\..*)?$")
.case_insensitive(true)
.build()
.unwrap();
static ref WINDOWS_TRAILING_RE: Regex = Regex::new(r"[\. ]+$").unwrap();
}
/// Sanitizes a filename with the option to include/exclude the path separator from sanitizing. This
/// is based of the implementation of the
/// [`sanitize-filename`](https://crates.io/crates/sanitize-filename) crate.
pub fn sanitize<S: AsRef<str>>(path: S, include_path_separator: bool) -> String {
let path = Cow::from(path.as_ref());
let path = ILLEGAL_RE.replace_all(&path, "");
let path = CONTROL_RE.replace_all(&path, "");
let path = RESERVED_RE.replace(&path, "");
let collect = |name: String| {
if name.len() > 255 {
name[..255].to_string()
} else {
name
}
};
if cfg!(windows) {
let path = WINDOWS_RESERVED_RE.replace(&path, "");
let path = WINDOWS_TRAILING_RE.replace(&path, "");
let mut path = path.to_string();
if include_path_separator {
path = path.replace(['\\', '/'], "");
}
collect(path)
} else {
let mut path = path.to_string();
if include_path_separator {
path = path.replace('/', "");
}
collect(path)
}
}