Slightly change download process to be more verbose in error situations

This commit is contained in:
ByteDream 2022-12-28 02:18:17 +01:00
parent 9e0edda7c2
commit c5940a240c

View file

@ -93,13 +93,16 @@ pub async fn download_segments(
Ok(()) Ok(())
}); });
} }
// drop the sender already here so it does not outlive all (download) threads which are the only
// real consumers of it
drop(sender);
// this is the main loop which writes the data. it uses a BTreeMap as a buffer as the write
// happens synchronized. the download consist of multiple segments. the map keys are representing
// the segment number and the values the corresponding bytes
let mut data_pos = 0usize; let mut data_pos = 0usize;
let mut buf: BTreeMap<usize, Vec<u8>> = BTreeMap::new(); let mut buf: BTreeMap<usize, Vec<u8>> = BTreeMap::new();
loop { for (pos, bytes) in receiver.iter() {
// is always `Some` because `sender` does not get dropped when all threads are finished
let (pos, bytes) = receiver.recv().unwrap();
if let Some(p) = &progress { if let Some(p) = &progress {
let progress_len = p.length().unwrap(); let progress_len = p.length().unwrap();
let estimated_segment_len = (variant_data.bandwidth / 8) let estimated_segment_len = (variant_data.bandwidth / 8)
@ -115,26 +118,44 @@ pub async fn download_segments(
p.inc(bytes_len) p.inc(bytes_len)
} }
// check if the currently sent bytes are the next in the buffer. if so, write them directly
// to the target without first adding them to the buffer.
// if not, add them to the buffer
if data_pos == pos { if data_pos == pos {
writer.write_all(bytes.borrow())?; writer.write_all(bytes.borrow())?;
data_pos += 1; data_pos += 1;
} else { } else {
buf.insert(pos, bytes); buf.insert(pos, bytes);
} }
// check if the buffer contains the next segment(s)
while let Some(b) = buf.remove(&data_pos) {
writer.write_all(b.borrow())?;
data_pos += 1;
}
}
// write the remaining buffer, if existent
while let Some(b) = buf.remove(&data_pos) { while let Some(b) = buf.remove(&data_pos) {
writer.write_all(b.borrow())?; writer.write_all(b.borrow())?;
data_pos += 1; data_pos += 1;
} }
if *count.lock().unwrap() >= total_segments && buf.is_empty() { // if any error has occured while downloading it gets returned here. maybe a little late, if one
break; // out of, for example 12, threads has the error
}
}
while let Some(joined) = join_set.join_next().await { while let Some(joined) = join_set.join_next().await {
joined?? joined??
} }
if !buf.is_empty() {
bail!(
"Download buffer is not empty. Remaining segments: {}",
buf.into_keys()
.map(|k| k.to_string())
.collect::<Vec<String>>()
.join(", ")
)
}
Ok(()) Ok(())
} }