Show ffmpeg progress (#270)

This commit is contained in:
bytedream 2023-12-08 22:27:12 +01:00
parent 9ca3b79291
commit 9487dd3dbf
4 changed files with 176 additions and 32 deletions

1
Cargo.lock generated
View file

@ -397,6 +397,7 @@ dependencies = [
"indicatif",
"lazy_static",
"log",
"nix",
"num_cpus",
"regex",
"reqwest",

View file

@ -34,8 +34,11 @@ serde_plain = "1.0"
shlex = "1.2"
sys-locale = "0.3"
tempfile = "3.8"
tokio = { version = "1.34", features = ["macros", "rt-multi-thread", "time"] }
tokio = { version = "1.34", features = ["io-util", "macros", "net", "rt-multi-thread", "time"] }
rustls-native-certs = { version = "0.6", optional = true }
[target.'cfg(not(target_os = "windows"))'.dependencies]
nix = { version = "0.27", features = ["fs"] }
[build-dependencies]
chrono = "0.4"

View file

@ -1,16 +1,14 @@
use crate::utils::context::Context;
use crate::utils::ffmpeg::FFmpegPreset;
use crate::utils::log::progress;
use crate::utils::os::{is_special_file, temp_directory, tempfile};
use crate::utils::os::{is_special_file, temp_directory, temp_named_pipe, tempfile};
use anyhow::{bail, Result};
use chrono::NaiveTime;
use crunchyroll_rs::media::{Subtitle, VariantData, VariantSegment};
use crunchyroll_rs::Locale;
use indicatif::{ProgressBar, ProgressFinish, ProgressStyle};
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressFinish, ProgressStyle};
use log::{debug, warn, LevelFilter};
use regex::Regex;
use std::borrow::Borrow;
use std::borrow::BorrowMut;
use std::borrow::{Borrow, BorrowMut};
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::env;
@ -21,6 +19,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempPath;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
@ -177,15 +176,19 @@ impl Downloader {
let mut videos = vec![];
let mut audios = vec![];
let mut subtitles = vec![];
for (i, format) in self.formats.iter().enumerate() {
let fmt_space = format
.audios
let mut max_frames = 0f64;
let fmt_space = self
.formats
.iter()
.flat_map(|f| {
f.audios
.iter()
.map(|(_, locale)| format!("Downloading {} audio", locale).len())
})
.max()
.unwrap();
for (i, format) in self.formats.iter().enumerate() {
let video_path = self
.download_video(
ctx,
@ -232,7 +235,11 @@ impl Downloader {
None
};
let len = get_video_length(&video_path)?;
let (len, fps) = get_video_stats(&video_path)?;
let frames = len.signed_duration_since(NaiveTime::MIN).num_seconds() as f64 * fps;
if frames > max_frames {
max_frames = frames;
}
for (subtitle, not_cc) in format.subtitles.iter() {
if let Some(pb) = &progress_spinner {
let mut progress_message = pb.message();
@ -337,8 +344,14 @@ impl Downloader {
}
let (input_presets, mut output_presets) = self.ffmpeg_preset.into_input_output_args();
let fifo = temp_named_pipe()?;
let mut command_args = vec!["-y".to_string(), "-hide_banner".to_string()];
let mut command_args = vec![
"-y".to_string(),
"-hide_banner".to_string(),
"-vstats_file".to_string(),
fifo.name(),
];
command_args.extend(input_presets);
command_args.extend(input);
command_args.extend(maps);
@ -433,8 +446,6 @@ impl Downloader {
}
}
let progress_handler = progress!("Generating output file");
let ffmpeg = Command::new("ffmpeg")
// pass ffmpeg stdout to real stdout only if output file is stdout
.stdout(if dst.to_str().unwrap() == "-" {
@ -444,14 +455,26 @@ impl Downloader {
})
.stderr(Stdio::piped())
.args(command_args)
.output()?;
if !ffmpeg.status.success() {
bail!("{}", String::from_utf8_lossy(ffmpeg.stderr.as_slice()))
.spawn()?;
let ffmpeg_progress = tokio::spawn(async move {
ffmpeg_progress(
max_frames as u64,
fifo,
format!("{:<1$}", "Generating output file", fmt_space + 1),
)
.await
});
let result = ffmpeg.wait_with_output()?;
if !result.status.success() {
bail!("{}", String::from_utf8_lossy(result.stderr.as_slice()))
}
ffmpeg_progress.abort();
match ffmpeg_progress.await {
Ok(r) => Ok(r?),
Err(e) if e.is_cancelled() => Ok(()),
Err(e) => Err(anyhow::Error::from(e)),
}
progress_handler.stop("Output file generated");
Ok(())
}
async fn check_free_space(
@ -752,13 +775,10 @@ fn estimate_variant_file_size(variant_data: &VariantData, segments: &[VariantSeg
(variant_data.bandwidth / 8) * segments.iter().map(|s| s.length.as_secs()).sum::<u64>()
}
/// Get the length of a video. This is required because sometimes subtitles have an unnecessary entry
/// long after the actual video ends with artificially extends the video length on some video players.
/// To prevent this, the video length must be hard set. See
/// [crunchy-labs/crunchy-cli#32](https://github.com/crunchy-labs/crunchy-cli/issues/32) for more
/// information.
pub fn get_video_length(path: &Path) -> Result<NaiveTime> {
/// Get the length and fps of a video.
pub fn get_video_stats(path: &Path) -> Result<(NaiveTime, f64)> {
let video_length = Regex::new(r"Duration:\s(?P<time>\d+:\d+:\d+\.\d+),")?;
let video_fps = Regex::new(r"(?P<fps>[\d/.]+)\sfps")?;
let ffmpeg = Command::new("ffmpeg")
.stdout(Stdio::null())
@ -768,15 +788,26 @@ pub fn get_video_length(path: &Path) -> Result<NaiveTime> {
.args(["-i", path.to_str().unwrap()])
.output()?;
let ffmpeg_output = String::from_utf8(ffmpeg.stderr)?;
let caps = video_length.captures(ffmpeg_output.as_str()).map_or(
let length_caps = video_length.captures(ffmpeg_output.as_str()).map_or(
Err(anyhow::anyhow!(
"failed to get video length: {}",
ffmpeg_output
)),
Ok,
)?;
let fps_caps = video_fps.captures(ffmpeg_output.as_str()).map_or(
Err(anyhow::anyhow!(
"failed to get video fps: {}",
ffmpeg_output
)),
Ok,
)?;
Ok(NaiveTime::parse_from_str(caps.name("time").unwrap().as_str(), "%H:%M:%S%.f").unwrap())
Ok((
NaiveTime::parse_from_str(length_caps.name("time").unwrap().as_str(), "%H:%M:%S%.f")
.unwrap(),
fps_caps.name("fps").unwrap().as_str().parse().unwrap(),
))
}
/// Fix the subtitles in multiple ways as Crunchyroll sometimes delivers them malformed.
@ -875,3 +906,52 @@ fn fix_subtitles(raw: &mut Vec<u8>, max_length: NaiveTime) {
*raw = as_lines.join("\n").into_bytes()
}
async fn ffmpeg_progress<R: AsyncReadExt + Unpin>(
total_frames: u64,
stats: R,
message: String,
) -> Result<()> {
let current_frame = Regex::new(r"frame=\s+(?P<frame>\d+)")?;
let progress = if log::max_level() == LevelFilter::Info {
let progress = ProgressBar::new(total_frames)
.with_style(
ProgressStyle::with_template(":: {msg} [{wide_bar}] {percent:>3}%")
.unwrap()
.progress_chars("##-"),
)
.with_message(message)
.with_finish(ProgressFinish::Abandon);
progress.set_draw_target(ProgressDrawTarget::stdout());
progress.enable_steady_tick(Duration::from_millis(200));
Some(progress)
} else {
None
};
let reader = BufReader::new(stats);
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await? {
let frame: u64 = current_frame
.captures(line.as_str())
.unwrap()
.name("frame")
.unwrap()
.as_str()
.parse()?;
if let Some(p) = &progress {
p.set_position(frame)
}
debug!(
"Processed frame [{}/{} {:.2}%]",
frame,
total_frames,
(frame as f64 / total_frames as f64) * 100f64
)
}
Ok(())
}

View file

@ -3,9 +3,12 @@ use regex::{Regex, RegexBuilder};
use std::borrow::Cow;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::process::{Command, Stdio};
use std::task::{Context, Poll};
use std::{env, io};
use tempfile::{Builder, NamedTempFile};
use tokio::io::{AsyncRead, ReadBuf};
pub fn has_ffmpeg() -> bool {
if let Err(e) = Command::new("ffmpeg").stderr(Stdio::null()).spawn() {
@ -43,6 +46,63 @@ pub fn tempfile<S: AsRef<str>>(suffix: S) -> io::Result<NamedTempFile> {
Ok(tempfile)
}
pub struct TempNamedPipe {
name: String,
#[cfg(not(target_os = "windows"))]
reader: tokio::net::unix::pipe::Receiver,
#[cfg(target_os = "windows")]
reader: tokio::net::windows::named_pipe::NamedPipeServer,
}
impl TempNamedPipe {
pub fn name(&self) -> String {
self.name.clone()
}
}
impl AsyncRead for TempNamedPipe {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.reader).poll_read(cx, buf)
}
}
impl Drop for TempNamedPipe {
fn drop(&mut self) {
#[cfg(not(target_os = "windows"))]
let _ = nix::unistd::unlink(self.name.as_str());
}
}
pub fn temp_named_pipe() -> io::Result<TempNamedPipe> {
let (_, path) = tempfile("")?.keep()?;
let path = path.to_string_lossy().to_string();
let _ = std::fs::remove_file(path.clone());
#[cfg(not(target_os = "windows"))]
{
nix::unistd::mkfifo(path.as_str(), nix::sys::stat::Mode::S_IRWXU)?;
Ok(TempNamedPipe {
reader: tokio::net::unix::pipe::OpenOptions::new().open_receiver(&path)?,
name: path,
})
}
#[cfg(target_os = "windows")]
{
let path = format!(r"\\.\pipe\{}", &path);
Ok(TempNamedPipe {
reader: tokio::net::windows::named_pipe::ServerOptions::new().create(&path)?,
name: path,
})
}
}
/// Check if the given path exists and rename it until the new (renamed) file does not exist.
pub fn free_file(mut path: PathBuf) -> (PathBuf, bool) {
// do not rename it if it exists but is a special file