Re-add download timeout

This commit is contained in:
bytedream 2024-01-14 20:36:00 +01:00
parent f3faa5bf94
commit 20f796f603
6 changed files with 104 additions and 63 deletions

View file

@ -204,7 +204,8 @@ impl Execute for Archive {
single_format_collection.full_visual_output(); single_format_collection.full_visual_output();
let download_builder = DownloadBuilder::new(ctx.crunchy.client()) let download_builder =
DownloadBuilder::new(ctx.client.clone(), ctx.rate_limiter.clone())
.default_subtitle(self.default_subtitle.clone()) .default_subtitle(self.default_subtitle.clone())
.download_fonts(self.include_fonts) .download_fonts(self.include_fonts)
.ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default()) .ffmpeg_preset(self.ffmpeg_preset.clone().unwrap_or_default())

View file

@ -216,7 +216,8 @@ impl Execute for Download {
single_format_collection.full_visual_output(); single_format_collection.full_visual_output();
let download_builder = DownloadBuilder::new(ctx.crunchy.client()) let download_builder =
DownloadBuilder::new(ctx.client.clone(), ctx.rate_limiter.clone())
.default_subtitle(self.subtitle.clone()) .default_subtitle(self.subtitle.clone())
.force_hardsub(self.force_hardsub) .force_hardsub(self.force_hardsub)
.output_format(if is_special_file(&self.output) || self.output == "-" { .output_format(if is_special_file(&self.output) || self.output == "-" {

View file

@ -8,7 +8,7 @@ use crunchyroll_rs::crunchyroll::CrunchyrollBuilder;
use crunchyroll_rs::error::Error; use crunchyroll_rs::error::Error;
use crunchyroll_rs::{Crunchyroll, Locale}; use crunchyroll_rs::{Crunchyroll, Locale};
use log::{debug, error, warn, LevelFilter}; use log::{debug, error, warn, LevelFilter};
use reqwest::Proxy; use reqwest::{Client, Proxy};
use std::{env, fs}; use std::{env, fs};
mod archive; mod archive;
@ -235,11 +235,51 @@ async fn execute_executor(executor: impl Execute, ctx: Context) {
} }
async fn create_ctx(cli: &mut Cli) -> Result<Context> { async fn create_ctx(cli: &mut Cli) -> Result<Context> {
let crunchy = crunchyroll_session(cli).await?; let client = {
Ok(Context { crunchy }) let mut builder = CrunchyrollBuilder::predefined_client_builder();
if let Some(p) = &cli.proxy {
builder = builder.proxy(p.clone())
}
if let Some(ua) = &cli.user_agent {
builder = builder.user_agent(ua)
} }
async fn crunchyroll_session(cli: &mut Cli) -> Result<Crunchyroll> { #[cfg(any(feature = "openssl-tls", feature = "openssl-tls-static"))]
let client = {
let mut builder = builder.use_native_tls().tls_built_in_root_certs(false);
for certificate in rustls_native_certs::load_native_certs().unwrap() {
builder = builder.add_root_certificate(
reqwest::Certificate::from_der(certificate.0.as_slice()).unwrap(),
)
}
builder.build().unwrap()
};
#[cfg(not(any(feature = "openssl-tls", feature = "openssl-tls-static")))]
let client = builder.build().unwrap();
client
};
let rate_limiter = cli
.speed_limit
.map(|l| RateLimiterService::new(l, client.clone()));
let crunchy = crunchyroll_session(cli, client.clone(), rate_limiter.clone()).await?;
Ok(Context {
crunchy,
client,
rate_limiter,
})
}
async fn crunchyroll_session(
cli: &mut Cli,
client: Client,
rate_limiter: Option<RateLimiterService>,
) -> Result<Crunchyroll> {
let supported_langs = vec![ let supported_langs = vec![
Locale::ar_ME, Locale::ar_ME,
Locale::de_DE, Locale::de_DE,
@ -273,33 +313,6 @@ async fn crunchyroll_session(cli: &mut Cli) -> Result<Crunchyroll> {
lang lang
}; };
let client = {
let mut builder = CrunchyrollBuilder::predefined_client_builder();
if let Some(p) = &cli.proxy {
builder = builder.proxy(p.clone())
}
if let Some(ua) = &cli.user_agent {
builder = builder.user_agent(ua)
}
#[cfg(any(feature = "openssl-tls", feature = "openssl-tls-static"))]
let client = {
let mut builder = builder.use_native_tls().tls_built_in_root_certs(false);
for certificate in rustls_native_certs::load_native_certs().unwrap() {
builder = builder.add_root_certificate(
reqwest::Certificate::from_der(certificate.0.as_slice()).unwrap(),
)
}
builder.build().unwrap()
};
#[cfg(not(any(feature = "openssl-tls", feature = "openssl-tls-static")))]
let client = builder.build().unwrap();
client
};
let mut builder = Crunchyroll::builder() let mut builder = Crunchyroll::builder()
.locale(locale) .locale(locale)
.client(client.clone()) .client(client.clone())
@ -308,8 +321,8 @@ async fn crunchyroll_session(cli: &mut Cli) -> Result<Crunchyroll> {
if let Command::Download(download) = &cli.command { if let Command::Download(download) = &cli.command {
builder = builder.preferred_audio_locale(download.audio.clone()) builder = builder.preferred_audio_locale(download.audio.clone())
} }
if let Some(speed_limit) = cli.speed_limit { if let Some(rate_limiter) = rate_limiter {
builder = builder.middleware(RateLimiterService::new(speed_limit, client)); builder = builder.middleware(rate_limiter)
} }
let root_login_methods_count = cli.login_method.credentials.is_some() as u8 let root_login_methods_count = cli.login_method.credentials.is_some() as u8

View file

@ -1,5 +1,9 @@
use crate::utils::rate_limit::RateLimiterService;
use crunchyroll_rs::Crunchyroll; use crunchyroll_rs::Crunchyroll;
use reqwest::Client;
pub struct Context { pub struct Context {
pub crunchy: Crunchyroll, pub crunchy: Crunchyroll,
pub client: Client,
pub rate_limiter: Option<RateLimiterService>,
} }

View file

@ -1,6 +1,7 @@
use crate::utils::ffmpeg::FFmpegPreset; use crate::utils::ffmpeg::FFmpegPreset;
use crate::utils::filter::real_dedup_vec; use crate::utils::filter::real_dedup_vec;
use crate::utils::os::{cache_dir, is_special_file, temp_directory, temp_named_pipe, tempfile}; use crate::utils::os::{cache_dir, is_special_file, temp_directory, temp_named_pipe, tempfile};
use crate::utils::rate_limit::RateLimiterService;
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use chrono::NaiveTime; use chrono::NaiveTime;
use crunchyroll_rs::media::{Subtitle, VariantData, VariantSegment}; use crunchyroll_rs::media::{Subtitle, VariantData, VariantSegment};
@ -26,6 +27,7 @@ use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::task::JoinSet; use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tower_service::Service;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum MergeBehavior { pub enum MergeBehavior {
@ -48,6 +50,7 @@ impl MergeBehavior {
#[derive(Clone, derive_setters::Setters)] #[derive(Clone, derive_setters::Setters)]
pub struct DownloadBuilder { pub struct DownloadBuilder {
client: Client, client: Client,
rate_limiter: Option<RateLimiterService>,
ffmpeg_preset: FFmpegPreset, ffmpeg_preset: FFmpegPreset,
default_subtitle: Option<Locale>, default_subtitle: Option<Locale>,
output_format: Option<String>, output_format: Option<String>,
@ -60,9 +63,10 @@ pub struct DownloadBuilder {
} }
impl DownloadBuilder { impl DownloadBuilder {
pub fn new(client: Client) -> DownloadBuilder { pub fn new(client: Client, rate_limiter: Option<RateLimiterService>) -> DownloadBuilder {
Self { Self {
client, client,
rate_limiter,
ffmpeg_preset: FFmpegPreset::default(), ffmpeg_preset: FFmpegPreset::default(),
default_subtitle: None, default_subtitle: None,
output_format: None, output_format: None,
@ -78,6 +82,7 @@ impl DownloadBuilder {
pub fn build(self) -> Downloader { pub fn build(self) -> Downloader {
Downloader { Downloader {
client: self.client, client: self.client,
rate_limiter: self.rate_limiter,
ffmpeg_preset: self.ffmpeg_preset, ffmpeg_preset: self.ffmpeg_preset,
default_subtitle: self.default_subtitle, default_subtitle: self.default_subtitle,
output_format: self.output_format, output_format: self.output_format,
@ -109,6 +114,7 @@ pub struct DownloadFormat {
pub struct Downloader { pub struct Downloader {
client: Client, client: Client,
rate_limiter: Option<RateLimiterService>,
ffmpeg_preset: FFmpegPreset, ffmpeg_preset: FFmpegPreset,
default_subtitle: Option<Locale>, default_subtitle: Option<Locale>,
@ -768,6 +774,8 @@ impl Downloader {
for num in 0..cpus { for num in 0..cpus {
let thread_sender = sender.clone(); let thread_sender = sender.clone();
let thread_segments = segs.remove(0); let thread_segments = segs.remove(0);
let thread_client = self.client.clone();
let mut thread_rate_limiter = self.rate_limiter.clone();
let thread_count = count.clone(); let thread_count = count.clone();
join_set.spawn(async move { join_set.spawn(async move {
let after_download_sender = thread_sender.clone(); let after_download_sender = thread_sender.clone();
@ -778,21 +786,34 @@ impl Downloader {
let download = || async move { let download = || async move {
for (i, segment) in thread_segments.into_iter().enumerate() { for (i, segment) in thread_segments.into_iter().enumerate() {
let mut retry_count = 0; let mut retry_count = 0;
let buf = loop { let mut buf = loop {
let mut buf = vec![]; let request = thread_client
match segment.write_to(&mut buf).await { .get(&segment.url)
Ok(_) => break buf, .timeout(Duration::from_secs(60));
Err(e) => { let response = if let Some(rate_limiter) = &mut thread_rate_limiter {
rate_limiter.call(request.build()?).await.map_err(anyhow::Error::new)
} else {
request.send().await.map_err(anyhow::Error::new)
};
let err = match response {
Ok(r) => match r.bytes().await {
Ok(b) => break b.to_vec(),
Err(e) => anyhow::Error::new(e)
}
Err(e) => e,
};
if retry_count == 5 { if retry_count == 5 {
bail!("Max retry count reached ({}), multiple errors occurred while receiving segment {}: {}", retry_count, num + (i * cpus), e) bail!("Max retry count reached ({}), multiple errors occurred while receiving segment {}: {}", retry_count, num + (i * cpus), err)
}
debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), e, 5 - retry_count)
}
} }
debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), err, 5 - retry_count);
retry_count += 1; retry_count += 1;
}; };
buf = VariantSegment::decrypt(&mut buf, segment.key)?.to_vec();
let mut c = thread_count.lock().await; let mut c = thread_count.lock().await;
debug!( debug!(
"Downloaded and decrypted segment [{}/{} {:.2}%] {}", "Downloaded and decrypted segment [{}/{} {:.2}%] {}",

View file

@ -9,6 +9,7 @@ use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tower_service::Service; use tower_service::Service;
#[derive(Clone)]
pub struct RateLimiterService { pub struct RateLimiterService {
client: Arc<Client>, client: Arc<Client>,
rate_limiter: Limiter, rate_limiter: Limiter,