mirror of
https://github.com/crunchy-labs/crunchy-cli.git
synced 2026-01-21 04:02:00 -06:00
Fix ffmpeg progress not working with fast encoder
This commit is contained in:
parent
8613ea80cc
commit
0a26083232
3 changed files with 49 additions and 23 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -424,6 +424,7 @@ dependencies = [
|
||||||
"sys-locale",
|
"sys-locale",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"tokio-util",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,7 @@ shlex = "1.2"
|
||||||
sys-locale = "0.3"
|
sys-locale = "0.3"
|
||||||
tempfile = "3.8"
|
tempfile = "3.8"
|
||||||
tokio = { version = "1.34", features = ["io-util", "macros", "net", "rt-multi-thread", "time"] }
|
tokio = { version = "1.34", features = ["io-util", "macros", "net", "rt-multi-thread", "time"] }
|
||||||
|
tokio-util = "0.7"
|
||||||
tower-service = "0.3"
|
tower-service = "0.3"
|
||||||
rustls-native-certs = { version = "0.6", optional = true }
|
rustls-native-certs = { version = "0.6", optional = true }
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,9 +19,11 @@ use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tempfile::TempPath;
|
use tempfile::TempPath;
|
||||||
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
|
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
|
||||||
|
use tokio::select;
|
||||||
use tokio::sync::mpsc::unbounded_channel;
|
use tokio::sync::mpsc::unbounded_channel;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tokio::task::JoinSet;
|
use tokio::task::JoinSet;
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub enum MergeBehavior {
|
pub enum MergeBehavior {
|
||||||
|
|
@ -476,25 +478,25 @@ impl Downloader {
|
||||||
.stderr(Stdio::piped())
|
.stderr(Stdio::piped())
|
||||||
.args(command_args)
|
.args(command_args)
|
||||||
.spawn()?;
|
.spawn()?;
|
||||||
|
let ffmpeg_progress_cancel = CancellationToken::new();
|
||||||
|
let ffmpeg_progress_cancellation_token = ffmpeg_progress_cancel.clone();
|
||||||
let ffmpeg_progress = tokio::spawn(async move {
|
let ffmpeg_progress = tokio::spawn(async move {
|
||||||
ffmpeg_progress(
|
ffmpeg_progress(
|
||||||
max_frames as u64,
|
max_frames as u64,
|
||||||
fifo,
|
fifo,
|
||||||
format!("{:<1$}", "Generating output file", fmt_space + 1),
|
format!("{:<1$}", "Generating output file", fmt_space + 1),
|
||||||
|
ffmpeg_progress_cancellation_token,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
});
|
});
|
||||||
|
|
||||||
let result = ffmpeg.wait_with_output()?;
|
let result = ffmpeg.wait_with_output()?;
|
||||||
if !result.status.success() {
|
if !result.status.success() {
|
||||||
|
ffmpeg_progress.abort();
|
||||||
bail!("{}", String::from_utf8_lossy(result.stderr.as_slice()))
|
bail!("{}", String::from_utf8_lossy(result.stderr.as_slice()))
|
||||||
}
|
}
|
||||||
ffmpeg_progress.abort();
|
ffmpeg_progress_cancel.cancel();
|
||||||
match ffmpeg_progress.await {
|
Ok(ffmpeg_progress.await??)
|
||||||
Ok(r) => Ok(r?),
|
|
||||||
Err(e) if e.is_cancelled() => Ok(()),
|
|
||||||
Err(e) => Err(anyhow::Error::from(e)),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn check_free_space(
|
async fn check_free_space(
|
||||||
|
|
@ -905,6 +907,7 @@ async fn ffmpeg_progress<R: AsyncReadExt + Unpin>(
|
||||||
total_frames: u64,
|
total_frames: u64,
|
||||||
stats: R,
|
stats: R,
|
||||||
message: String,
|
message: String,
|
||||||
|
cancellation_token: CancellationToken,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let current_frame = Regex::new(r"frame=\s+(?P<frame>\d+)")?;
|
let current_frame = Regex::new(r"frame=\s+(?P<frame>\d+)")?;
|
||||||
|
|
||||||
|
|
@ -926,25 +929,46 @@ async fn ffmpeg_progress<R: AsyncReadExt + Unpin>(
|
||||||
|
|
||||||
let reader = BufReader::new(stats);
|
let reader = BufReader::new(stats);
|
||||||
let mut lines = reader.lines();
|
let mut lines = reader.lines();
|
||||||
while let Some(line) = lines.next_line().await? {
|
loop {
|
||||||
let frame: u64 = current_frame
|
select! {
|
||||||
.captures(line.as_str())
|
// when gracefully canceling this future, set the progress to 100% (finished). sometimes
|
||||||
.unwrap()
|
// ffmpeg is too fast or already finished when the reading process of 'stats' starts
|
||||||
.name("frame")
|
// which causes the progress to be stuck at 0%
|
||||||
.unwrap()
|
_ = cancellation_token.cancelled() => {
|
||||||
.as_str()
|
if let Some(p) = &progress {
|
||||||
.parse()?;
|
p.set_position(total_frames)
|
||||||
|
}
|
||||||
|
debug!(
|
||||||
|
"Processed frame [{}/{} 100%]",
|
||||||
|
total_frames,
|
||||||
|
total_frames
|
||||||
|
);
|
||||||
|
return Ok(())
|
||||||
|
}
|
||||||
|
line = lines.next_line() => {
|
||||||
|
let Some(line) = line? else {
|
||||||
|
break
|
||||||
|
};
|
||||||
|
let frame: u64 = current_frame
|
||||||
|
.captures(line.as_str())
|
||||||
|
.unwrap()
|
||||||
|
.name("frame")
|
||||||
|
.unwrap()
|
||||||
|
.as_str()
|
||||||
|
.parse()?;
|
||||||
|
|
||||||
if let Some(p) = &progress {
|
if let Some(p) = &progress {
|
||||||
p.set_position(frame)
|
p.set_position(frame)
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"Processed frame [{}/{} {:.2}%]",
|
||||||
|
frame,
|
||||||
|
total_frames,
|
||||||
|
(frame as f64 / total_frames as f64) * 100f64
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!(
|
|
||||||
"Processed frame [{}/{} {:.2}%]",
|
|
||||||
frame,
|
|
||||||
total_frames,
|
|
||||||
(frame as f64 / total_frames as f64) * 100f64
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue