use crate::utils::context::Context; use crate::utils::ffmpeg::FFmpegPreset; use crate::utils::log::progress; use crate::utils::os::tempfile; use anyhow::{bail, Result}; use chrono::NaiveTime; use crunchyroll_rs::media::{Subtitle, VariantData, VariantSegment}; use crunchyroll_rs::Locale; use indicatif::{ProgressBar, ProgressFinish, ProgressStyle}; use log::{debug, LevelFilter}; use regex::Regex; use std::borrow::Borrow; use std::borrow::BorrowMut; use std::collections::BTreeMap; use std::io::Write; use std::path::Path; use std::process::{Command, Stdio}; use std::sync::{mpsc, Arc, Mutex}; use std::time::Duration; use tempfile::TempPath; use tokio::task::JoinSet; #[derive(Clone, Debug)] pub enum MergeBehavior { Video, Audio, Auto, } impl MergeBehavior { pub fn parse(s: &str) -> Result { Ok(match s.to_lowercase().as_str() { "video" => MergeBehavior::Video, "audio" => MergeBehavior::Audio, "auto" => MergeBehavior::Auto, _ => return Err(format!("'{}' is not a valid merge behavior", s)), }) } } #[derive(Clone, derive_setters::Setters)] pub struct DownloadBuilder { ffmpeg_preset: FFmpegPreset, default_subtitle: Option, output_format: Option, audio_sort: Option>, subtitle_sort: Option>, } impl DownloadBuilder { pub fn new() -> DownloadBuilder { Self { ffmpeg_preset: FFmpegPreset::default(), default_subtitle: None, output_format: None, audio_sort: None, subtitle_sort: None, } } pub fn build(self) -> Downloader { Downloader { ffmpeg_preset: self.ffmpeg_preset, default_subtitle: self.default_subtitle, output_format: self.output_format, audio_sort: self.audio_sort, subtitle_sort: self.subtitle_sort, formats: vec![], } } } struct FFmpegMeta { path: TempPath, language: Locale, title: String, } pub struct DownloadFormat { pub video: (VariantData, Locale), pub audios: Vec<(VariantData, Locale)>, pub subtitles: Vec, } pub struct Downloader { ffmpeg_preset: FFmpegPreset, default_subtitle: Option, output_format: Option, audio_sort: Option>, subtitle_sort: Option>, formats: Vec, } impl Downloader { pub fn add_format(&mut self, format: DownloadFormat) { self.formats.push(format); } pub async fn download(mut self, ctx: &Context, dst: &Path) -> Result<()> { if let Some(audio_sort_locales) = &self.audio_sort { self.formats.sort_by(|a, b| { audio_sort_locales .iter() .position(|l| l == &a.video.1) .cmp(&audio_sort_locales.iter().position(|l| l == &b.video.1)) }); } for format in self.formats.iter_mut() { if let Some(audio_sort_locales) = &self.audio_sort { format.audios.sort_by(|(_, a), (_, b)| { audio_sort_locales .iter() .position(|l| l == a) .cmp(&audio_sort_locales.iter().position(|l| l == b)) }) } if let Some(subtitle_sort) = &self.subtitle_sort { format.subtitles.sort_by(|a, b| { subtitle_sort .iter() .position(|l| l == &a.locale) .cmp(&subtitle_sort.iter().position(|l| l == &b.locale)) }) } } let mut videos = vec![]; let mut audios = vec![]; let mut subtitles = vec![]; for (i, format) in self.formats.iter().enumerate() { let fmt_space = format .audios .iter() .map(|(_, locale)| format!("Downloading {} audio", locale).len()) .max() .unwrap(); let video_path = self .download_video( ctx, &format.video.0, format!("{:<1$}", format!("Downloading video #{}", i + 1), fmt_space), ) .await?; for (variant_data, locale) in format.audios.iter() { let audio_path = self .download_audio( ctx, variant_data, format!("{:<1$}", format!("Downloading {} audio", locale), fmt_space), ) .await?; audios.push(FFmpegMeta { path: audio_path, language: locale.clone(), title: if i == 0 { locale.to_human_readable() } else { format!("{} [Video: #{}]", locale.to_human_readable(), i + 1) }, }) } let len = get_video_length(&video_path)?; for subtitle in format.subtitles.iter() { let subtitle_path = self.download_subtitle(subtitle.clone(), len).await?; subtitles.push(FFmpegMeta { path: subtitle_path, language: subtitle.locale.clone(), title: if i == 0 { subtitle.locale.to_human_readable() } else { format!( "{} [Video: #{}]", subtitle.locale.to_human_readable(), i + 1 ) }, }) } videos.push(FFmpegMeta { path: video_path, language: format.video.1.clone(), title: if self.formats.len() == 1 { "Default".to_string() } else { format!("#{}", i + 1) }, }); } let mut input = vec![]; let mut maps = vec![]; let mut metadata = vec![]; for (i, meta) in videos.iter().enumerate() { input.extend(["-i".to_string(), meta.path.to_string_lossy().to_string()]); maps.extend(["-map".to_string(), i.to_string()]); metadata.extend([ format!("-metadata:s:v:{}", i), format!("title={}", meta.title), ]); // the empty language metadata is created to avoid that metadata from the original track // is copied metadata.extend([format!("-metadata:s:v:{}", i), format!("language=")]) } for (i, meta) in audios.iter().enumerate() { input.extend(["-i".to_string(), meta.path.to_string_lossy().to_string()]); maps.extend(["-map".to_string(), (i + videos.len()).to_string()]); metadata.extend([ format!("-metadata:s:a:{}", i), format!("language={}", meta.language), ]); metadata.extend([ format!("-metadata:s:a:{}", i), format!("title={}", meta.title), ]); } for (i, meta) in subtitles.iter().enumerate() { input.extend(["-i".to_string(), meta.path.to_string_lossy().to_string()]); maps.extend([ "-map".to_string(), (i + videos.len() + audios.len()).to_string(), ]); metadata.extend([ format!("-metadata:s:s:{}", i), format!("language={}", meta.language), ]); metadata.extend([ format!("-metadata:s:s:{}", i), format!("title={}", meta.title), ]); } let (input_presets, mut output_presets) = self.ffmpeg_preset.into_input_output_args(); let mut command_args = vec!["-y".to_string(), "-hide_banner".to_string()]; command_args.extend(input_presets); command_args.extend(input); command_args.extend(maps); command_args.extend(metadata); // set default subtitle if let Some(default_subtitle) = self.default_subtitle { if let Some(position) = subtitles .iter() .position(|m| m.language == default_subtitle) { match dst.extension().unwrap_or_default().to_str().unwrap() { "mp4" => output_presets.extend([ "-movflags".to_string(), "faststart".to_string(), "-c:s".to_string(), "mov_text".to_string(), format!("-disposition:s:s:{}", position), "forced".to_string(), ]), "mkv" => output_presets.extend([ format!("-disposition:s:s:{}", position), "forced".to_string(), ]), _ => { // remove '-c:v copy' and '-c:a copy' from output presets as its causes issues with // burning subs into the video let mut last = String::new(); let mut remove_count = 0; for (i, s) in output_presets.clone().iter().enumerate() { if (last == "-c:v" || last == "-c:a") && s == "copy" { // remove last output_presets.remove(i - remove_count - 1); remove_count += 1; output_presets.remove(i - remove_count); remove_count += 1; } last = s.clone(); } output_presets.extend([ "-vf".to_string(), format!( "subtitles={}", subtitles.get(position).unwrap().path.to_str().unwrap() ), ]) } } } if let Some(position) = subtitles .iter() .position(|meta| meta.language == default_subtitle) { command_args.extend([ format!("-disposition:s:s:{}", position), "forced".to_string(), ]) } } command_args.extend(output_presets); if let Some(output_format) = self.output_format { command_args.extend(["-f".to_string(), output_format]); } command_args.push(dst.to_str().unwrap().to_string()); debug!("ffmpeg {}", command_args.join(" ")); // create parent directory if it does not exist if let Some(parent) = dst.parent() { if !parent.exists() { std::fs::create_dir_all(parent)? } } let progress_handler = progress!("Generating output file"); let ffmpeg = Command::new("ffmpeg") // pass ffmpeg stdout to real stdout only if output file is stdout .stdout(if dst.to_str().unwrap() == "-" { Stdio::inherit() } else { Stdio::null() }) .stderr(Stdio::piped()) .args(command_args) .output()?; if !ffmpeg.status.success() { bail!("{}", String::from_utf8_lossy(ffmpeg.stderr.as_slice())) } progress_handler.stop("Output file generated"); Ok(()) } async fn download_video( &self, ctx: &Context, variant_data: &VariantData, message: String, ) -> Result { let tempfile = tempfile(".mp4")?; let (mut file, path) = tempfile.into_parts(); download_segments(ctx, &mut file, Some(message), variant_data).await?; Ok(path) } async fn download_audio( &self, ctx: &Context, variant_data: &VariantData, message: String, ) -> Result { let tempfile = tempfile(".m4a")?; let (mut file, path) = tempfile.into_parts(); download_segments(ctx, &mut file, Some(message), variant_data).await?; Ok(path) } async fn download_subtitle( &self, subtitle: Subtitle, max_length: NaiveTime, ) -> Result { let tempfile = tempfile(".ass")?; let (mut file, path) = tempfile.into_parts(); let mut buf = vec![]; subtitle.write_to(&mut buf).await?; fix_subtitle_look_and_feel(&mut buf); fix_subtitle_length(&mut buf, max_length); file.write_all(buf.as_slice())?; Ok(path) } } pub async fn download_segments( ctx: &Context, writer: &mut impl Write, message: Option, variant_data: &VariantData, ) -> Result<()> { let segments = variant_data.segments().await?; let total_segments = segments.len(); let client = Arc::new(ctx.crunchy.client()); let count = Arc::new(Mutex::new(0)); let progress = if log::max_level() == LevelFilter::Info { let estimated_file_size = (variant_data.bandwidth / 8) * segments.iter().map(|s| s.length.as_secs()).sum::(); let progress = ProgressBar::new(estimated_file_size) .with_style( ProgressStyle::with_template( ":: {msg}{bytes:>10} {bytes_per_sec:>12} [{wide_bar}] {percent:>3}%", ) .unwrap() .progress_chars("##-"), ) .with_message(message.map(|m| m + " ").unwrap_or_default()) .with_finish(ProgressFinish::Abandon); Some(progress) } else { None }; let cpus = num_cpus::get(); let mut segs: Vec> = Vec::with_capacity(cpus); for _ in 0..cpus { segs.push(vec![]) } for (i, segment) in segments.clone().into_iter().enumerate() { segs[i - ((i / cpus) * cpus)].push(segment); } let (sender, receiver) = mpsc::channel(); let mut join_set: JoinSet> = JoinSet::new(); for num in 0..cpus { let thread_client = client.clone(); let thread_sender = sender.clone(); let thread_segments = segs.remove(0); let thread_count = count.clone(); join_set.spawn(async move { let after_download_sender = thread_sender.clone(); // the download process is encapsulated in its own function. this is done to easily // catch errors which get returned with `...?` and `bail!(...)` and that the thread // itself can report that an error has occurred let download = || async move { for (i, segment) in thread_segments.into_iter().enumerate() { let mut retry_count = 0; let mut buf = loop { let response = thread_client .get(&segment.url) .timeout(Duration::from_secs(60)) .send() .await?; match response.bytes().await { Ok(b) => break b.to_vec(), Err(e) => { if e.is_body() { if retry_count == 5 { bail!("Max retry count reached ({}), multiple errors occurred while receiving segment {}: {}", retry_count, num + (i * cpus), e) } debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), e, 5 - retry_count) } else { bail!("{}", e) } } } retry_count += 1; }; buf = VariantSegment::decrypt(buf.borrow_mut(), segment.key)?.to_vec(); let mut c = thread_count.lock().unwrap(); debug!( "Downloaded and decrypted segment [{}/{} {:.2}%] {}", num + (i * cpus) + 1, total_segments, ((*c + 1) as f64 / total_segments as f64) * 100f64, segment.url ); thread_sender.send((num as i32 + (i * cpus) as i32, buf))?; *c += 1; } Ok(()) }; let result = download().await; if result.is_err() { after_download_sender.send((-1 as i32, vec![]))?; } result }); } // drop the sender already here so it does not outlive all download threads which are the only // real consumers of it drop(sender); // this is the main loop which writes the data. it uses a BTreeMap as a buffer as the write // happens synchronized. the download consist of multiple segments. the map keys are representing // the segment number and the values the corresponding bytes let mut data_pos = 0; let mut buf: BTreeMap> = BTreeMap::new(); for (pos, bytes) in receiver.iter() { // if the position is lower than 0, an error occurred in the sending download thread if pos < 0 { break; } if let Some(p) = &progress { let progress_len = p.length().unwrap(); let estimated_segment_len = (variant_data.bandwidth / 8) * segments.get(pos as usize).unwrap().length.as_secs(); let bytes_len = bytes.len() as u64; p.set_length(progress_len - estimated_segment_len + bytes_len); p.inc(bytes_len) } // check if the currently sent bytes are the next in the buffer. if so, write them directly // to the target without first adding them to the buffer. // if not, add them to the buffer if data_pos == pos { writer.write_all(bytes.borrow())?; data_pos += 1; } else { buf.insert(pos, bytes); } // check if the buffer contains the next segment(s) while let Some(b) = buf.remove(&data_pos) { writer.write_all(b.borrow())?; data_pos += 1; } } // if any error has occurred while downloading it gets returned here while let Some(joined) = join_set.join_next().await { joined?? } // write the remaining buffer, if existent while let Some(b) = buf.remove(&data_pos) { writer.write_all(b.borrow())?; data_pos += 1; } if !buf.is_empty() { bail!( "Download buffer is not empty. Remaining segments: {}", buf.into_keys() .map(|k| k.to_string()) .collect::>() .join(", ") ) } Ok(()) } /// Add `ScaledBorderAndShadows: yes` to subtitles; without it they look very messy on some video /// players. See [crunchy-labs/crunchy-cli#66](https://github.com/crunchy-labs/crunchy-cli/issues/66) /// for more information. fn fix_subtitle_look_and_feel(raw: &mut Vec) { let mut script_info = false; let mut new = String::new(); for line in String::from_utf8_lossy(raw.as_slice()).split('\n') { if line.trim().starts_with('[') && script_info { new.push_str("ScaledBorderAndShadow: yes\n"); script_info = false } else if line.trim() == "[Script Info]" { script_info = true } new.push_str(line); new.push('\n') } *raw = new.into_bytes() } /// Get the length of a video. This is required because sometimes subtitles have an unnecessary entry /// long after the actual video ends with artificially extends the video length on some video players. /// To prevent this, the video length must be hard set. See /// [crunchy-labs/crunchy-cli#32](https://github.com/crunchy-labs/crunchy-cli/issues/32) for more /// information. pub fn get_video_length(path: &Path) -> Result { let video_length = Regex::new(r"Duration:\s(?P