From 0a26083232e1e835f333d0b4039653f27560adfc Mon Sep 17 00:00:00 2001 From: bytedream Date: Sun, 10 Dec 2023 14:27:05 +0100 Subject: [PATCH] Fix ffmpeg progress not working with fast encoder --- Cargo.lock | 1 + crunchy-cli-core/Cargo.toml | 1 + crunchy-cli-core/src/utils/download.rs | 70 +++++++++++++++++--------- 3 files changed, 49 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 037a4ff..919ba0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -424,6 +424,7 @@ dependencies = [ "sys-locale", "tempfile", "tokio", + "tokio-util", "tower-service", ] diff --git a/crunchy-cli-core/Cargo.toml b/crunchy-cli-core/Cargo.toml index 173c3ac..245c6ea 100644 --- a/crunchy-cli-core/Cargo.toml +++ b/crunchy-cli-core/Cargo.toml @@ -38,6 +38,7 @@ shlex = "1.2" sys-locale = "0.3" tempfile = "3.8" tokio = { version = "1.34", features = ["io-util", "macros", "net", "rt-multi-thread", "time"] } +tokio-util = "0.7" tower-service = "0.3" rustls-native-certs = { version = "0.6", optional = true } diff --git a/crunchy-cli-core/src/utils/download.rs b/crunchy-cli-core/src/utils/download.rs index 5e9bbce..706b539 100644 --- a/crunchy-cli-core/src/utils/download.rs +++ b/crunchy-cli-core/src/utils/download.rs @@ -19,9 +19,11 @@ use std::sync::Arc; use std::time::Duration; use tempfile::TempPath; use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader}; +use tokio::select; use tokio::sync::mpsc::unbounded_channel; use tokio::sync::Mutex; use tokio::task::JoinSet; +use tokio_util::sync::CancellationToken; #[derive(Clone, Debug)] pub enum MergeBehavior { @@ -476,25 +478,25 @@ impl Downloader { .stderr(Stdio::piped()) .args(command_args) .spawn()?; + let ffmpeg_progress_cancel = CancellationToken::new(); + let ffmpeg_progress_cancellation_token = ffmpeg_progress_cancel.clone(); let ffmpeg_progress = tokio::spawn(async move { ffmpeg_progress( max_frames as u64, fifo, format!("{:<1$}", "Generating output file", fmt_space + 1), + ffmpeg_progress_cancellation_token, ) .await }); let result = ffmpeg.wait_with_output()?; if !result.status.success() { + ffmpeg_progress.abort(); bail!("{}", String::from_utf8_lossy(result.stderr.as_slice())) } - ffmpeg_progress.abort(); - match ffmpeg_progress.await { - Ok(r) => Ok(r?), - Err(e) if e.is_cancelled() => Ok(()), - Err(e) => Err(anyhow::Error::from(e)), - } + ffmpeg_progress_cancel.cancel(); + Ok(ffmpeg_progress.await??) } async fn check_free_space( @@ -905,6 +907,7 @@ async fn ffmpeg_progress( total_frames: u64, stats: R, message: String, + cancellation_token: CancellationToken, ) -> Result<()> { let current_frame = Regex::new(r"frame=\s+(?P\d+)")?; @@ -926,25 +929,46 @@ async fn ffmpeg_progress( let reader = BufReader::new(stats); let mut lines = reader.lines(); - while let Some(line) = lines.next_line().await? { - let frame: u64 = current_frame - .captures(line.as_str()) - .unwrap() - .name("frame") - .unwrap() - .as_str() - .parse()?; + loop { + select! { + // when gracefully canceling this future, set the progress to 100% (finished). sometimes + // ffmpeg is too fast or already finished when the reading process of 'stats' starts + // which causes the progress to be stuck at 0% + _ = cancellation_token.cancelled() => { + if let Some(p) = &progress { + p.set_position(total_frames) + } + debug!( + "Processed frame [{}/{} 100%]", + total_frames, + total_frames + ); + return Ok(()) + } + line = lines.next_line() => { + let Some(line) = line? else { + break + }; + let frame: u64 = current_frame + .captures(line.as_str()) + .unwrap() + .name("frame") + .unwrap() + .as_str() + .parse()?; - if let Some(p) = &progress { - p.set_position(frame) + if let Some(p) = &progress { + p.set_position(frame) + } + + debug!( + "Processed frame [{}/{} {:.2}%]", + frame, + total_frames, + (frame as f64 / total_frames as f64) * 100f64 + ) + } } - - debug!( - "Processed frame [{}/{} {:.2}%]", - frame, - total_frames, - (frame as f64 / total_frames as f64) * 100f64 - ) } Ok(())