Fix download threads to properly return errors

This commit is contained in:
ByteDream 2023-01-03 01:28:42 +01:00
parent 3c3b7b6566
commit b365bda5dc

View file

@ -74,8 +74,13 @@ pub async fn download_segments(
let thread_client = client.clone();
let thread_sender = sender.clone();
let thread_segments = segs.remove(0);
let thread_count = count.clone();
join_set.spawn(async move {
let after_download_sender = thread_sender.clone();
// the download process is encapsulated in its own function. this is done to easily
// catch errors which get returned with `...?` and `bail!(...)` and that the thread
// itself can report that an error has occured
let download = || async move {
for (i, segment) in thread_segments.into_iter().enumerate() {
let mut retry_count = 0;
let mut buf = loop {
@ -83,19 +88,18 @@ pub async fn download_segments(
.get(&segment.url)
.timeout(Duration::from_secs(60))
.send()
.await
.unwrap();
.await?;
match response.bytes().await {
Ok(b) => break b.to_vec(),
Err(e) => {
if e.is_body() {
if retry_count == 5 {
panic!("Max retry count reached ({}), multiple errors occured while receiving segment {}: {}", retry_count, num + (i * cpus), e)
bail!("Max retry count reached ({}), multiple errors occured while receiving segment {}: {}", retry_count, num + (i * cpus), e)
}
debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), e, 5 - retry_count)
} else {
panic!("{}", e)
bail!("{}", e)
}
}
}
@ -109,12 +113,17 @@ pub async fn download_segments(
num + (i * cpus),
segment.url
);
thread_sender.send((num + (i * cpus), buf))?;
thread_sender.send((num as i32 + (i * cpus) as i32, buf))?;
}
Ok(())
};
*thread_count.lock().unwrap() += 1;
let result = download().await;
if result.is_err() {
after_download_sender.send((-1 as i32, vec![]))?;
}
Ok(())
result
});
}
// drop the sender already here so it does not outlive all (download) threads which are the only
@ -124,14 +133,19 @@ pub async fn download_segments(
// this is the main loop which writes the data. it uses a BTreeMap as a buffer as the write
// happens synchronized. the download consist of multiple segments. the map keys are representing
// the segment number and the values the corresponding bytes
let mut data_pos = 0usize;
let mut buf: BTreeMap<usize, Vec<u8>> = BTreeMap::new();
let mut data_pos = 0;
let mut buf: BTreeMap<i32, Vec<u8>> = BTreeMap::new();
for (pos, bytes) in receiver.iter() {
// if the position is lower than 0, an error occured in the sending download thread
if pos < 0 {
break
}
if let Some(p) = &progress {
let progress_len = p.length().unwrap();
let estimated_segment_len = (variant_data.bandwidth / 8)
* segments
.get(pos)
.get(pos as usize)
.unwrap()
.length
.unwrap_or_default()
@ -158,18 +172,17 @@ pub async fn download_segments(
}
}
// if any error has occured while downloading it gets returned here
while let Some(joined) = join_set.join_next().await {
joined??
}
// write the remaining buffer, if existent
while let Some(b) = buf.remove(&data_pos) {
writer.write_all(b.borrow())?;
data_pos += 1;
}
// if any error has occured while downloading it gets returned here. maybe a little late, if one
// out of, for example 12, threads has the error
while let Some(joined) = join_set.join_next().await {
joined??
}
if !buf.is_empty() {
bail!(
"Download buffer is not empty. Remaining segments: {}",