diff --git a/crunchy-cli-core/src/cli/utils.rs b/crunchy-cli-core/src/cli/utils.rs index 6045ce4..1f6bf40 100644 --- a/crunchy-cli-core/src/cli/utils.rs +++ b/crunchy-cli-core/src/cli/utils.rs @@ -93,13 +93,16 @@ pub async fn download_segments( Ok(()) }); } + // drop the sender already here so it does not outlive all (download) threads which are the only + // real consumers of it + drop(sender); + // this is the main loop which writes the data. it uses a BTreeMap as a buffer as the write + // happens synchronized. the download consist of multiple segments. the map keys are representing + // the segment number and the values the corresponding bytes let mut data_pos = 0usize; let mut buf: BTreeMap> = BTreeMap::new(); - loop { - // is always `Some` because `sender` does not get dropped when all threads are finished - let (pos, bytes) = receiver.recv().unwrap(); - + for (pos, bytes) in receiver.iter() { if let Some(p) = &progress { let progress_len = p.length().unwrap(); let estimated_segment_len = (variant_data.bandwidth / 8) @@ -115,26 +118,44 @@ pub async fn download_segments( p.inc(bytes_len) } + // check if the currently sent bytes are the next in the buffer. if so, write them directly + // to the target without first adding them to the buffer. + // if not, add them to the buffer if data_pos == pos { writer.write_all(bytes.borrow())?; data_pos += 1; } else { buf.insert(pos, bytes); } + // check if the buffer contains the next segment(s) while let Some(b) = buf.remove(&data_pos) { writer.write_all(b.borrow())?; data_pos += 1; } - - if *count.lock().unwrap() >= total_segments && buf.is_empty() { - break; - } } + // write the remaining buffer, if existent + while let Some(b) = buf.remove(&data_pos) { + writer.write_all(b.borrow())?; + data_pos += 1; + } + + // if any error has occured while downloading it gets returned here. maybe a little late, if one + // out of, for example 12, threads has the error while let Some(joined) = join_set.join_next().await { joined?? } + if !buf.is_empty() { + bail!( + "Download buffer is not empty. Remaining segments: {}", + buf.into_keys() + .map(|k| k.to_string()) + .collect::>() + .join(", ") + ) + } + Ok(()) }