From b365bda5dca57754a383bdb3de0484f600c9697c Mon Sep 17 00:00:00 2001 From: ByteDream Date: Tue, 3 Jan 2023 01:28:42 +0100 Subject: [PATCH] Fix download threads to properly return errors --- crunchy-cli-core/src/cli/utils.rs | 93 ++++++++++++++++++------------- 1 file changed, 53 insertions(+), 40 deletions(-) diff --git a/crunchy-cli-core/src/cli/utils.rs b/crunchy-cli-core/src/cli/utils.rs index 841dbef..99d4f67 100644 --- a/crunchy-cli-core/src/cli/utils.rs +++ b/crunchy-cli-core/src/cli/utils.rs @@ -74,47 +74,56 @@ pub async fn download_segments( let thread_client = client.clone(); let thread_sender = sender.clone(); let thread_segments = segs.remove(0); - let thread_count = count.clone(); join_set.spawn(async move { - for (i, segment) in thread_segments.into_iter().enumerate() { - let mut retry_count = 0; - let mut buf = loop { - let response = thread_client - .get(&segment.url) - .timeout(Duration::from_secs(60)) - .send() - .await - .unwrap(); + let after_download_sender = thread_sender.clone(); - match response.bytes().await { - Ok(b) => break b.to_vec(), - Err(e) => { - if e.is_body() { - if retry_count == 5 { - panic!("Max retry count reached ({}), multiple errors occured while receiving segment {}: {}", retry_count, num + (i * cpus), e) + // the download process is encapsulated in its own function. this is done to easily + // catch errors which get returned with `...?` and `bail!(...)` and that the thread + // itself can report that an error has occured + let download = || async move { + for (i, segment) in thread_segments.into_iter().enumerate() { + let mut retry_count = 0; + let mut buf = loop { + let response = thread_client + .get(&segment.url) + .timeout(Duration::from_secs(60)) + .send() + .await?; + + match response.bytes().await { + Ok(b) => break b.to_vec(), + Err(e) => { + if e.is_body() { + if retry_count == 5 { + bail!("Max retry count reached ({}), multiple errors occured while receiving segment {}: {}", retry_count, num + (i * cpus), e) + } + debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), e, 5 - retry_count) + } else { + bail!("{}", e) } - debug!("Failed to download segment {} ({}). Retrying, {} out of 5 retries left", num + (i * cpus), e, 5 - retry_count) - } else { - panic!("{}", e) } } - } - retry_count += 1; - }; + retry_count += 1; + }; - buf = VariantSegment::decrypt(buf.borrow_mut(), segment.key)?.to_vec(); - debug!( - "Downloaded and decrypted segment {} ({})", - num + (i * cpus), - segment.url - ); - thread_sender.send((num + (i * cpus), buf))?; + buf = VariantSegment::decrypt(buf.borrow_mut(), segment.key)?.to_vec(); + debug!( + "Downloaded and decrypted segment {} ({})", + num + (i * cpus), + segment.url + ); + thread_sender.send((num as i32 + (i * cpus) as i32, buf))?; + } + Ok(()) + }; - *thread_count.lock().unwrap() += 1; + let result = download().await; + if result.is_err() { + after_download_sender.send((-1 as i32, vec![]))?; } - Ok(()) + result }); } // drop the sender already here so it does not outlive all (download) threads which are the only @@ -124,14 +133,19 @@ pub async fn download_segments( // this is the main loop which writes the data. it uses a BTreeMap as a buffer as the write // happens synchronized. the download consist of multiple segments. the map keys are representing // the segment number and the values the corresponding bytes - let mut data_pos = 0usize; - let mut buf: BTreeMap> = BTreeMap::new(); + let mut data_pos = 0; + let mut buf: BTreeMap> = BTreeMap::new(); for (pos, bytes) in receiver.iter() { + // if the position is lower than 0, an error occured in the sending download thread + if pos < 0 { + break + } + if let Some(p) = &progress { let progress_len = p.length().unwrap(); let estimated_segment_len = (variant_data.bandwidth / 8) * segments - .get(pos) + .get(pos as usize) .unwrap() .length .unwrap_or_default() @@ -158,18 +172,17 @@ pub async fn download_segments( } } + // if any error has occured while downloading it gets returned here + while let Some(joined) = join_set.join_next().await { + joined?? + } + // write the remaining buffer, if existent while let Some(b) = buf.remove(&data_pos) { writer.write_all(b.borrow())?; data_pos += 1; } - // if any error has occured while downloading it gets returned here. maybe a little late, if one - // out of, for example 12, threads has the error - while let Some(joined) = join_set.join_next().await { - joined?? - } - if !buf.is_empty() { bail!( "Download buffer is not empty. Remaining segments: {}",