From 20f796f603e6c151154e9fed8d49caf42a433b40 Mon Sep 17 00:00:00 2001 From: bytedream Date: Sun, 14 Jan 2024 20:36:00 +0100 Subject: [PATCH] Re-add download timeout --- crunchy-cli-core/src/archive/command.rs | 19 +++--- crunchy-cli-core/src/download/command.rs | 23 +++---- crunchy-cli-core/src/lib.rs | 79 ++++++++++++++---------- crunchy-cli-core/src/utils/context.rs | 4 ++ crunchy-cli-core/src/utils/download.rs | 41 +++++++++--- crunchy-cli-core/src/utils/rate_limit.rs | 1 + 6 files changed, 104 insertions(+), 63 deletions(-) diff --git a/crunchy-cli-core/src/archive/command.rs b/crunchy-cli-core/src/archive/command.rs index 17f21a1..331374a 100644 --- a/crunchy-cli-core/src/archive/command.rs +++ b/crunchy-cli-core/src/archive/command.rs @@ -204,15 +204,16 @@ impl Execute for Archive { single_format_collection.full_visual_output(); - let download_builder = DownloadBuilder::new(ctx.crunchy.client()) - .default_subtitle(self.default_subtitle.clone()) - .download_fonts(self.include_fonts) - .ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default()) - .ffmpeg_threads(self.ffmpeg_threads) - .output_format(Some("matroska".to_string())) - .audio_sort(Some(self.audio.clone())) - .subtitle_sort(Some(self.subtitle.clone())) - .threads(self.threads); + let download_builder = + DownloadBuilder::new(ctx.client.clone(), ctx.rate_limiter.clone()) + .default_subtitle(self.default_subtitle.clone()) + .download_fonts(self.include_fonts) + .ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default()) + .ffmpeg_threads(self.ffmpeg_threads) + .output_format(Some("matroska".to_string())) + .audio_sort(Some(self.audio.clone())) + .subtitle_sort(Some(self.subtitle.clone())) + .threads(self.threads); for single_formats in single_format_collection.into_iter() { let (download_formats, mut format) = get_format(&self, &single_formats).await?; diff --git a/crunchy-cli-core/src/download/command.rs b/crunchy-cli-core/src/download/command.rs index 30fea44..f72923f 100644 --- a/crunchy-cli-core/src/download/command.rs +++ b/crunchy-cli-core/src/download/command.rs @@ -216,17 +216,18 @@ impl Execute for Download { single_format_collection.full_visual_output(); - let download_builder = DownloadBuilder::new(ctx.crunchy.client()) - .default_subtitle(self.subtitle.clone()) - .force_hardsub(self.force_hardsub) - .output_format(if is_special_file(&self.output) || self.output == "-" { - Some("mpegts".to_string()) - } else { - None - }) - .ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default()) - .ffmpeg_threads(self.ffmpeg_threads) - .threads(self.threads); + let download_builder = + DownloadBuilder::new(ctx.client.clone(), ctx.rate_limiter.clone()) + .default_subtitle(self.subtitle.clone()) + .force_hardsub(self.force_hardsub) + .output_format(if is_special_file(&self.output) || self.output == "-" { + Some("mpegts".to_string()) + } else { + None + }) + .ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default()) + .ffmpeg_threads(self.ffmpeg_threads) + .threads(self.threads); for mut single_formats in single_format_collection.into_iter() { // the vec contains always only one item diff --git a/crunchy-cli-core/src/lib.rs b/crunchy-cli-core/src/lib.rs index f1d659a..2d37aaa 100644 --- a/crunchy-cli-core/src/lib.rs +++ b/crunchy-cli-core/src/lib.rs @@ -8,7 +8,7 @@ use crunchyroll_rs::crunchyroll::CrunchyrollBuilder; use crunchyroll_rs::error::Error; use crunchyroll_rs::{Crunchyroll, Locale}; use log::{debug, error, warn, LevelFilter}; -use reqwest::Proxy; +use reqwest::{Client, Proxy}; use std::{env, fs}; mod archive; @@ -235,11 +235,51 @@ async fn execute_executor(executor: impl Execute, ctx: Context) { } async fn create_ctx(cli: &mut Cli) -> Result { - let crunchy = crunchyroll_session(cli).await?; - Ok(Context { crunchy }) + let client = { + let mut builder = CrunchyrollBuilder::predefined_client_builder(); + if let Some(p) = &cli.proxy { + builder = builder.proxy(p.clone()) + } + if let Some(ua) = &cli.user_agent { + builder = builder.user_agent(ua) + } + + #[cfg(any(feature = "openssl-tls", feature = "openssl-tls-static"))] + let client = { + let mut builder = builder.use_native_tls().tls_built_in_root_certs(false); + + for certificate in rustls_native_certs::load_native_certs().unwrap() { + builder = builder.add_root_certificate( + reqwest::Certificate::from_der(certificate.0.as_slice()).unwrap(), + ) + } + + builder.build().unwrap() + }; + #[cfg(not(any(feature = "openssl-tls", feature = "openssl-tls-static")))] + let client = builder.build().unwrap(); + + client + }; + + let rate_limiter = cli + .speed_limit + .map(|l| RateLimiterService::new(l, client.clone())); + + let crunchy = crunchyroll_session(cli, client.clone(), rate_limiter.clone()).await?; + + Ok(Context { + crunchy, + client, + rate_limiter, + }) } -async fn crunchyroll_session(cli: &mut Cli) -> Result { +async fn crunchyroll_session( + cli: &mut Cli, + client: Client, + rate_limiter: Option, +) -> Result { let supported_langs = vec![ Locale::ar_ME, Locale::de_DE, @@ -273,33 +313,6 @@ async fn crunchyroll_session(cli: &mut Cli) -> Result { lang }; - let client = { - let mut builder = CrunchyrollBuilder::predefined_client_builder(); - if let Some(p) = &cli.proxy { - builder = builder.proxy(p.clone()) - } - if let Some(ua) = &cli.user_agent { - builder = builder.user_agent(ua) - } - - #[cfg(any(feature = "openssl-tls", feature = "openssl-tls-static"))] - let client = { - let mut builder = builder.use_native_tls().tls_built_in_root_certs(false); - - for certificate in rustls_native_certs::load_native_certs().unwrap() { - builder = builder.add_root_certificate( - reqwest::Certificate::from_der(certificate.0.as_slice()).unwrap(), - ) - } - - builder.build().unwrap() - }; - #[cfg(not(any(feature = "openssl-tls", feature = "openssl-tls-static")))] - let client = builder.build().unwrap(); - - client - }; - let mut builder = Crunchyroll::builder() .locale(locale) .client(client.clone()) @@ -308,8 +321,8 @@ async fn crunchyroll_session(cli: &mut Cli) -> Result { if let Command::Download(download) = &cli.command { builder = builder.preferred_audio_locale(download.audio.clone()) } - if let Some(speed_limit) = cli.speed_limit { - builder = builder.middleware(RateLimiterService::new(speed_limit, client)); + if let Some(rate_limiter) = rate_limiter { + builder = builder.middleware(rate_limiter) } let root_login_methods_count = cli.login_method.credentials.is_some() as u8 diff --git a/crunchy-cli-core/src/utils/context.rs b/crunchy-cli-core/src/utils/context.rs index f8df024..693174d 100644 --- a/crunchy-cli-core/src/utils/context.rs +++ b/crunchy-cli-core/src/utils/context.rs @@ -1,5 +1,9 @@ +use crate::utils::rate_limit::RateLimiterService; use crunchyroll_rs::Crunchyroll; +use reqwest::Client; pub struct Context { pub crunchy: Crunchyroll, + pub client: Client, + pub rate_limiter: Option, } diff --git a/crunchy-cli-core/src/utils/download.rs b/crunchy-cli-core/src/utils/download.rs index e5b0444..df32ad7 100644 --- a/crunchy-cli-core/src/utils/download.rs +++ b/crunchy-cli-core/src/utils/download.rs @@ -1,6 +1,7 @@ use crate::utils::ffmpeg::FFmpegPreset; use crate::utils::filter::real_dedup_vec; use crate::utils::os::{cache_dir, is_special_file, temp_directory, temp_named_pipe, tempfile}; +use crate::utils::rate_limit::RateLimiterService; use anyhow::{bail, Result}; use chrono::NaiveTime; use crunchyroll_rs::media::{Subtitle, VariantData, VariantSegment}; @@ -26,6 +27,7 @@ use tokio::sync::mpsc::unbounded_channel; use tokio::sync::Mutex; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; +use tower_service::Service; #[derive(Clone, Debug)] pub enum MergeBehavior { @@ -48,6 +50,7 @@ impl MergeBehavior { #[derive(Clone, derive_setters::Setters)] pub struct DownloadBuilder { client: Client, + rate_limiter: Option, ffmpeg_preset: FFmpegPreset, default_subtitle: Option, output_format: Option, @@ -60,9 +63,10 @@ pub struct DownloadBuilder { } impl DownloadBuilder { - pub fn new(client: Client) -> DownloadBuilder { + pub fn new(client: Client, rate_limiter: Option) -> DownloadBuilder { Self { client, + rate_limiter, ffmpeg_preset: FFmpegPreset::default(), default_subtitle: None, output_format: None, @@ -78,6 +82,7 @@ impl DownloadBuilder { pub fn build(self) -> Downloader { Downloader { client: self.client, + rate_limiter: self.rate_limiter, ffmpeg_preset: self.ffmpeg_preset, default_subtitle: self.default_subtitle, output_format: self.output_format, @@ -109,6 +114,7 @@ pub struct DownloadFormat { pub struct Downloader { client: Client, + rate_limiter: Option, ffmpeg_preset: FFmpegPreset, default_subtitle: Option, @@ -768,6 +774,8 @@ impl Downloader { for num in 0..cpus { let thread_sender = sender.clone(); let thread_segments = segs.remove(0); + let thread_client = self.client.clone(); + let mut thread_rate_limiter = self.rate_limiter.clone(); let thread_count = count.clone(); join_set.spawn(async move { let after_download_sender = thread_sender.clone(); @@ -778,21 +786,34 @@ impl Downloader { let download = || async move { for (i, segment) in thread_segments.into_iter().enumerate() { let mut retry_count = 0; - let buf = loop { - let mut buf = vec![]; - match segment.write_to(&mut buf).await { - Ok(_) => break buf, - Err(e) => { - if retry_count == 5 { - bail!("Max retry count reached ({}), multiple errors occurred while receiving segment {}: {}", retry_count, num + (i * cpus), e) - } - debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), e, 5 - retry_count) + let mut buf = loop { + let request = thread_client + .get(&segment.url) + .timeout(Duration::from_secs(60)); + let response = if let Some(rate_limiter) = &mut thread_rate_limiter { + rate_limiter.call(request.build()?).await.map_err(anyhow::Error::new) + } else { + request.send().await.map_err(anyhow::Error::new) + }; + + let err = match response { + Ok(r) => match r.bytes().await { + Ok(b) => break b.to_vec(), + Err(e) => anyhow::Error::new(e) } + Err(e) => e, + }; + + if retry_count == 5 { + bail!("Max retry count reached ({}), multiple errors occurred while receiving segment {}: {}", retry_count, num + (i * cpus), err) } + debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), err, 5 - retry_count); retry_count += 1; }; + buf = VariantSegment::decrypt(&mut buf, segment.key)?.to_vec(); + let mut c = thread_count.lock().await; debug!( "Downloaded and decrypted segment [{}/{} {:.2}%] {}", diff --git a/crunchy-cli-core/src/utils/rate_limit.rs b/crunchy-cli-core/src/utils/rate_limit.rs index 16b22b3..bca0aaa 100644 --- a/crunchy-cli-core/src/utils/rate_limit.rs +++ b/crunchy-cli-core/src/utils/rate_limit.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use tower_service::Service; +#[derive(Clone)] pub struct RateLimiterService { client: Arc, rate_limiter: Limiter,