diff --git a/src/cli/self_update.rs b/src/cli/self_update.rs index 6c334d94af..eec372ca74 100644 --- a/src/cli/self_update.rs +++ b/src/cli/self_update.rs @@ -1275,14 +1275,7 @@ pub(crate) async fn prepare_update(dl_cfg: &DownloadCfg<'_>) -> Result) -> Result(&release_toml_str) .context("unable to parse rustup release file")?; diff --git a/src/cli/self_update/windows.rs b/src/cli/self_update/windows.rs index 499bd1225e..f7ad86fb1a 100644 --- a/src/cli/self_update/windows.rs +++ b/src/cli/self_update/windows.rs @@ -277,7 +277,7 @@ pub(crate) async fn try_install_msvc( &visual_studio_url, &visual_studio, None, - &dl_cfg.notifier, + None, dl_cfg.process, ) .await?; diff --git a/src/diskio/mod.rs b/src/diskio/mod.rs index 902813894d..4372dbad99 100644 --- a/src/diskio/mod.rs +++ b/src/diskio/mod.rs @@ -66,7 +66,6 @@ use std::{fmt::Debug, fs::OpenOptions}; use anyhow::Result; -use crate::dist::download::Notifier; use crate::process::Process; /// Carries the implementation specific data for complete file transfers into the executor. @@ -443,13 +442,12 @@ pub(crate) fn create_dir>(path: P) -> io::Result<()> { /// Get the executor for disk IO. pub(crate) fn get_executor<'a>( - notifier: Option<&'a Notifier>, ram_budget: usize, process: &Process, ) -> anyhow::Result> { // If this gets lots of use, consider exposing via the config file. Ok(match process.io_thread_count()? { 0 | 1 => Box::new(immediate::ImmediateUnpacker::new()), - n => Box::new(threaded::Threaded::new(notifier, n, ram_budget)), + n => Box::new(threaded::Threaded::new(n, ram_budget)), }) } diff --git a/src/diskio/test.rs b/src/diskio/test.rs index f0a9298597..738d39e27b 100644 --- a/src/diskio/test.rs +++ b/src/diskio/test.rs @@ -24,7 +24,7 @@ fn test_incremental_file(io_threads: &str) -> Result<()> { let mut written = 0; let mut file_finished = false; - let mut io_executor: Box = get_executor(None, 32 * 1024 * 1024, &tp.process)?; + let mut io_executor: Box = get_executor(32 * 1024 * 1024, &tp.process)?; let (item, mut sender) = Item::write_file_segmented( work_dir.path().join("scratch"), 0o666, @@ -90,7 +90,7 @@ fn test_complete_file(io_threads: &str) -> Result<()> { vars.insert("RUSTUP_IO_THREADS".to_string(), io_threads.to_string()); let tp = TestProcess::with_vars(vars); - let mut io_executor: Box = get_executor(None, 32 * 1024 * 1024, &tp.process)?; + let mut io_executor: Box = get_executor(32 * 1024 * 1024, &tp.process)?; let mut chunk = io_executor.get_buffer(10); chunk.extend(b"0123456789"); assert_eq!(chunk.len(), 10); diff --git a/src/diskio/threaded.rs b/src/diskio/threaded.rs index e137be2108..313c06af93 100644 --- a/src/diskio/threaded.rs +++ b/src/diskio/threaded.rs @@ -15,7 +15,6 @@ use sharded_slab::pool::{OwnedRef, OwnedRefMut}; use tracing::debug; use super::{CompletedIo, Executor, Item, perform}; -use crate::dist::download::{Notification, Notifier}; #[derive(Copy, Clone, Debug, Enum)] pub(crate) enum Bucket { @@ -96,23 +95,18 @@ impl fmt::Debug for Pool { } } -pub(crate) struct Threaded<'a> { +pub(crate) struct Threaded { n_files: Arc, pool: threadpool::ThreadPool, - notifier: Option<&'a Notifier>, rx: Receiver, tx: Sender, vec_pools: EnumMap, ram_budget: usize, } -impl<'a> Threaded<'a> { +impl Threaded { /// Construct a new Threaded executor. - pub(crate) fn new( - notify_handler: Option<&'a Notifier>, - thread_count: usize, - ram_budget: usize, - ) -> Self { + pub(crate) fn new(thread_count: usize, ram_budget: usize) -> Self { // Defaults to hardware thread count threads; this is suitable for // our needs as IO bound operations tend to show up as write latencies // rather than close latencies, so we don't need to look at @@ -168,7 +162,6 @@ impl<'a> Threaded<'a> { Self { n_files: Arc::new(AtomicUsize::new(0)), pool, - notifier: notify_handler, rx, tx, vec_pools, @@ -233,7 +226,7 @@ impl<'a> Threaded<'a> { } } -impl Executor for Threaded<'_> { +impl Executor for Threaded { fn dispatch(&self, item: Item) -> Box + '_> { // Yield any completed work before accepting new work - keep memory // pressure under control @@ -260,18 +253,11 @@ impl Executor for Threaded<'_> { // items, and the download tracker's progress is confounded with // actual handling of data today, we synthesis a data buffer and // pretend to have bytes to deliver. - let mut prev_files = self.n_files.load(Ordering::Relaxed); - if let Some(notifier) = self.notifier { - notifier.handle(Notification::DownloadFinished(None)); - notifier.handle(Notification::DownloadContentLengthReceived( - prev_files as u64, - None, - )); - } + let prev_files = self.n_files.load(Ordering::Relaxed); if prev_files > 50 { debug!("{prev_files} deferred IO operations"); } - let buf: Vec = vec![0; prev_files]; + // Cheap wrap-around correctness check - we have 20k files, more than // 32K means we subtracted from 0 somewhere. assert!(32767 > prev_files); @@ -279,20 +265,10 @@ impl Executor for Threaded<'_> { while current_files != 0 { use std::thread::sleep; sleep(std::time::Duration::from_millis(100)); - prev_files = current_files; current_files = self.n_files.load(Ordering::Relaxed); - let step_count = prev_files - current_files; - if let Some(notifier) = self.notifier { - notifier.handle(Notification::DownloadDataReceived( - &buf[0..step_count], - None, - )); - } } self.pool.join(); - if let Some(notifier) = self.notifier { - notifier.handle(Notification::DownloadFinished(None)); - } + // close the feedback channel so that blocking reads on it can // complete. send is atomic, and we know the threads completed from the // pool join, so this is race-free. It is possible that try_iter is safe @@ -352,19 +328,19 @@ impl Executor for Threaded<'_> { } } -impl Drop for Threaded<'_> { +impl Drop for Threaded { fn drop(&mut self) { // We are not permitted to fail - consume but do not handle the items. self.join().for_each(drop); } } -struct JoinIterator<'a, 'b> { - executor: &'a Threaded<'b>, +struct JoinIterator<'a> { + executor: &'a Threaded, consume_sentinel: bool, } -impl JoinIterator<'_, '_> { +impl JoinIterator<'_> { fn inner>(&self, mut iter: T) -> Option { loop { let task_o = iter.next(); @@ -388,7 +364,7 @@ impl JoinIterator<'_, '_> { } } -impl Iterator for JoinIterator<'_, '_> { +impl Iterator for JoinIterator<'_> { type Item = CompletedIo; fn next(&mut self) -> Option { @@ -400,12 +376,12 @@ impl Iterator for JoinIterator<'_, '_> { } } -struct SubmitIterator<'a, 'b> { - executor: &'a Threaded<'b>, +struct SubmitIterator<'a> { + executor: &'a Threaded, item: Cell>, } -impl Iterator for SubmitIterator<'_, '_> { +impl Iterator for SubmitIterator<'_> { type Item = CompletedIo; fn next(&mut self) -> Option { diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index 75db01e742..d4fb5b3547 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -16,10 +16,9 @@ use tracing::{error, trace, warn}; use crate::diskio::{CompletedIo, Executor, FileBuffer, IO_CHUNK_SIZE, Item, Kind, get_executor}; use crate::dist::component::components::*; use crate::dist::component::transaction::*; -use crate::dist::download::Notifier; +use crate::dist::download::DownloadCfg; use crate::dist::temp; use crate::errors::*; -use crate::process::Process; use crate::utils; use crate::utils::units::Size; @@ -143,13 +142,13 @@ impl Package for DirectoryPackage { pub(crate) struct TarPackage(DirectoryPackage, temp::Dir); impl TarPackage { - pub(crate) fn new(stream: R, cx: &PackageContext<'_>) -> Result { - let temp_dir = cx.tmp_cx.new_directory()?; + pub(crate) fn new(stream: R, dl_cfg: &DownloadCfg<'_>) -> Result { + let temp_dir = dl_cfg.tmp_cx.new_directory()?; let mut archive = tar::Archive::new(stream); // The rust-installer packages unpack to a directory called // $pkgname-$version-$target. Skip that directory when // unpacking. - unpack_without_first_dir(&mut archive, &temp_dir, cx) + unpack_without_first_dir(&mut archive, &temp_dir, dl_cfg) .context("failed to extract package")?; Ok(TarPackage( @@ -163,7 +162,7 @@ impl TarPackage { fn unpack_ram( io_chunk_size: usize, effective_max_ram: Option, - cx: &PackageContext<'_>, + dl_cfg: &DownloadCfg<'_>, ) -> usize { const RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS: usize = 200 * 1024 * 1024; let minimum_ram = io_chunk_size * 2; @@ -177,7 +176,7 @@ fn unpack_ram( // Rustup does not know how much RAM the machine has: use the minimum minimum_ram }; - let unpack_ram = match cx + let unpack_ram = match dl_cfg .process .var("RUSTUP_UNPACK_RAM") .ok() @@ -289,7 +288,7 @@ enum DirStatus { fn unpack_without_first_dir( archive: &mut tar::Archive, path: &Path, - cx: &PackageContext<'_>, + dl_cfg: &DownloadCfg<'_>, ) -> Result<()> { let entries = archive.entries()?; let effective_max_ram = match effective_limits::memory_limit() { @@ -299,8 +298,8 @@ fn unpack_without_first_dir( None } }; - let unpack_ram = unpack_ram(IO_CHUNK_SIZE, effective_max_ram, cx); - let mut io_executor: Box = get_executor(cx.notifier, unpack_ram, cx.process)?; + let unpack_ram = unpack_ram(IO_CHUNK_SIZE, effective_max_ram, dl_cfg); + let mut io_executor: Box = get_executor(unpack_ram, dl_cfg.process)?; let mut directories: HashMap = HashMap::new(); // Path is presumed to exist. Call it a precondition. @@ -461,7 +460,7 @@ fn unpack_without_first_dir( // Complain about this so we can see if these exist. use std::io::Write as _; writeln!( - cx.process.stderr().lock(), + dl_cfg.process.stderr().lock(), "Unexpected: missing parent '{}' for '{}'", parent.display(), entry.path()?.display() @@ -552,9 +551,9 @@ impl Package for TarPackage { pub(crate) struct TarGzPackage(TarPackage); impl TarGzPackage { - pub(crate) fn new(stream: R, cx: &PackageContext<'_>) -> Result { + pub(crate) fn new(stream: R, dl_cfg: &DownloadCfg<'_>) -> Result { let stream = flate2::read::GzDecoder::new(stream); - Ok(TarGzPackage(TarPackage::new(stream, cx)?)) + Ok(TarGzPackage(TarPackage::new(stream, dl_cfg)?)) } } @@ -580,9 +579,9 @@ impl Package for TarGzPackage { pub(crate) struct TarXzPackage(TarPackage); impl TarXzPackage { - pub(crate) fn new(stream: R, cx: &PackageContext<'_>) -> Result { + pub(crate) fn new(stream: R, dl_cfg: &DownloadCfg<'_>) -> Result { let stream = xz2::read::XzDecoder::new(stream); - Ok(TarXzPackage(TarPackage::new(stream, cx)?)) + Ok(TarXzPackage(TarPackage::new(stream, dl_cfg)?)) } } @@ -608,9 +607,9 @@ impl Package for TarXzPackage { pub(crate) struct TarZStdPackage(TarPackage); impl TarZStdPackage { - pub(crate) fn new(stream: R, cx: &PackageContext<'_>) -> Result { + pub(crate) fn new(stream: R, dl_cfg: &DownloadCfg<'_>) -> Result { let stream = zstd::stream::read::Decoder::new(stream)?; - Ok(TarZStdPackage(TarPackage::new(stream, cx)?)) + Ok(TarZStdPackage(TarPackage::new(stream, dl_cfg)?)) } } @@ -631,9 +630,3 @@ impl Package for TarZStdPackage { self.0.components() } } - -pub(crate) struct PackageContext<'a> { - pub(crate) tmp_cx: &'a temp::Context, - pub(crate) notifier: Option<&'a Notifier>, - pub(crate) process: &'a Process, -} diff --git a/src/dist/download.rs b/src/dist/download.rs index e6061c3249..dabf990311 100644 --- a/src/dist/download.rs +++ b/src/dist/download.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::borrow::Cow; use std::fs; use std::ops; use std::path::{Path, PathBuf}; @@ -23,7 +23,7 @@ const UPDATE_HASH_LEN: usize = 20; pub struct DownloadCfg<'a> { pub tmp_cx: &'a temp::Context, pub download_dir: &'a PathBuf, - pub(crate) notifier: Notifier, + pub(super) tracker: DownloadTracker, pub process: &'a Process, } @@ -33,7 +33,7 @@ impl<'a> DownloadCfg<'a> { DownloadCfg { tmp_cx: &cfg.tmp_cx, download_dir: &cfg.download_dir, - notifier: Notifier::new(cfg.quiet, cfg.process), + tracker: DownloadTracker::new(!cfg.quiet, cfg.process), process: cfg.process, } } @@ -42,12 +42,17 @@ impl<'a> DownloadCfg<'a> { /// Partial downloads are stored in `self.download_dir`, keyed by hash. If the /// target file already exists, then the hash is checked and it is returned /// immediately without re-downloading. - pub(crate) async fn download(&self, url: &Url, hash: &str) -> Result { + pub(crate) async fn download( + &self, + url: &Url, + hash: &str, + status: &DownloadStatus, + ) -> Result { utils::ensure_dir_exists("Download Directory", self.download_dir)?; let target_file = self.download_dir.join(Path::new(hash)); if target_file.exists() { - let cached_result = file_hash(&target_file, &self.notifier)?; + let cached_result = file_hash(&target_file)?; if hash == cached_result { debug!("reusing previously downloaded file"); debug!(url = url.as_ref(), "checksum passed"); @@ -76,7 +81,7 @@ impl<'a> DownloadCfg<'a> { &partial_file_path, Some(&mut hasher), true, - &self.notifier, + Some(status), self.process, ) .await @@ -125,7 +130,7 @@ impl<'a> DownloadCfg<'a> { let hash_url = utils::parse_url(&(url.to_owned() + ".sha256"))?; let hash_file = self.tmp_cx.new_file()?; - download_file(&hash_url, &hash_file, None, &self.notifier, self.process).await?; + download_file(&hash_url, &hash_file, None, None, self.process).await?; utils::read_file("hash", &hash_file).map(|s| s[0..64].to_owned()) } @@ -139,6 +144,7 @@ impl<'a> DownloadCfg<'a> { &self, url_str: &str, update_hash: Option<&Path>, + status: Option<&DownloadStatus>, ext: &str, ) -> Result> { let hash = self.download_hash(url_str).await?; @@ -166,7 +172,7 @@ impl<'a> DownloadCfg<'a> { let file = self.tmp_cx.new_file_with_ext("", ext)?; let mut hasher = Sha256::new(); - download_file(&url, &file, Some(&mut hasher), &self.notifier, self.process).await?; + download_file(&url, &file, Some(&mut hasher), status, self.process).await?; let actual_hash = format!("{:x}", hasher.finalize()); if hash != actual_hash { @@ -183,22 +189,24 @@ impl<'a> DownloadCfg<'a> { Ok(Some((file, partial_hash))) } -} -pub(crate) struct Notifier { - tracker: Mutex, -} + pub(crate) fn status_for(&self, component: impl Into>) -> DownloadStatus { + let progress = ProgressBar::hidden(); + progress.set_style( + ProgressStyle::with_template( + "{msg:>12.bold} [{bar:40}] {bytes}/{total_bytes} ({bytes_per_sec}, ETA: {eta})", + ) + .unwrap() + .progress_chars("## "), + ); + progress.set_message(component); + self.tracker.multi_progress_bars.add(progress.clone()); -impl Notifier { - pub(crate) fn new(quiet: bool, process: &Process) -> Self { - Self { - tracker: Mutex::new(DownloadTracker::new(!quiet, process)), + DownloadStatus { + progress, + retry_time: Mutex::new(None), } } - - pub(crate) fn handle(&self, n: Notification<'_>) { - self.tracker.lock().unwrap().handle_notification(&n); - } } /// Tracks download progress and displays information about it to a terminal. @@ -208,13 +216,6 @@ impl Notifier { pub(crate) struct DownloadTracker { /// MultiProgress bar for the downloads. multi_progress_bars: MultiProgress, - /// Mapping of URLs being downloaded to their corresponding progress bars. - /// The `Option` represents the instant where the download is being retried, - /// allowing us delay the reappearance of the progress bar so that the user can see - /// the message "retrying download" for at least a second. - /// Without it, the progress bar would reappear immediately, not allowing the user to - /// correctly see the message, before the progress bar starts again. - file_progress_bars: HashMap)>, } impl DownloadTracker { @@ -228,74 +229,36 @@ impl DownloadTracker { Self { multi_progress_bars, - file_progress_bars: HashMap::new(), - } - } - - pub(crate) fn handle_notification(&mut self, n: &Notification<'_>) { - match *n { - Notification::DownloadContentLengthReceived(content_len, url) => { - if let Some(url) = url { - self.content_length_received(content_len, url); - } - } - Notification::DownloadDataReceived(data, url) => { - if let Some(url) = url { - self.data_received(data.len(), url); - } - } - Notification::DownloadFinished(url) => { - if let Some(url) = url { - self.download_finished(url); - } - } - Notification::DownloadFailed(url) => { - self.download_failed(url); - debug!("download failed"); - } - Notification::DownloadingComponent(component, url) => { - self.create_progress_bar(component.to_owned(), url.to_owned()); - } - Notification::RetryingDownload(url) => { - self.retrying_download(url); - } } } +} - /// Creates a new ProgressBar for the given component. - pub(crate) fn create_progress_bar(&mut self, component: String, url: String) { - let pb = ProgressBar::hidden(); - pb.set_style( - ProgressStyle::with_template( - "{msg:>12.bold} [{bar:40}] {bytes}/{total_bytes} ({bytes_per_sec}, ETA: {eta})", - ) - .unwrap() - .progress_chars("## "), - ); - pb.set_message(component); - self.multi_progress_bars.add(pb.clone()); - self.file_progress_bars.insert(url, (pb, None)); - } +pub(crate) struct DownloadStatus { + progress: ProgressBar, + /// The instant where the download is being retried. + /// + /// Allows us to delay the reappearance of the progress bar so that the user can see + /// the message "retrying download" for at least a second. Without it, the progress + /// bar would reappear immediately, not allowing the user to correctly see the message, + /// before the progress bar starts again. + retry_time: Mutex>, +} - /// Sets the length for a new ProgressBar and gives it a style. - pub(crate) fn content_length_received(&mut self, content_len: u64, url: &str) { - if let Some((pb, _)) = self.file_progress_bars.get(url) { - pb.reset(); - pb.set_length(content_len); - } +impl DownloadStatus { + pub(crate) fn received_length(&self, len: u64) { + self.progress.reset(); + self.progress.set_length(len); } - /// Notifies self that data of size `len` has been received. - pub(crate) fn data_received(&mut self, len: usize, url: &str) { - let Some((pb, retry_time)) = self.file_progress_bars.get_mut(url) else { - return; - }; - pb.inc(len as u64); + pub(crate) fn received_data(&self, len: usize) { + self.progress.inc(len as u64); + let mut retry_time = self.retry_time.lock().unwrap(); if !retry_time.is_some_and(|instant| instant.elapsed() > Duration::from_secs(1)) { return; } + *retry_time = None; - pb.set_style( + self.progress.set_style( ProgressStyle::with_template( "{msg:>12.bold} [{bar:40}] {bytes}/{total_bytes} ({bytes_per_sec}, ETA: {eta})", ) @@ -304,59 +267,32 @@ impl DownloadTracker { ); } - /// Notifies self that the download has finished. - pub(crate) fn download_finished(&mut self, url: &str) { - let Some((pb, _)) = self.file_progress_bars.get(url) else { - return; - }; - pb.set_style( + pub(crate) fn finished(&self) { + self.progress.set_style( ProgressStyle::with_template("{msg:>12.bold} downloaded {total_bytes} in {elapsed}") .unwrap(), ); - pb.finish(); + self.progress.finish(); } - /// Notifies self that the download has failed. - pub(crate) fn download_failed(&mut self, url: &str) { - let Some((pb, _)) = self.file_progress_bars.get(url) else { - return; - }; - pb.set_style( + pub(crate) fn failed(&self) { + self.progress.set_style( ProgressStyle::with_template("{msg:>12.bold} download failed after {elapsed}") .unwrap(), ); - pb.finish(); + self.progress.finish(); } - /// Notifies self that the download is being retried. - pub(crate) fn retrying_download(&mut self, url: &str) { - let Some((pb, retry_time)) = self.file_progress_bars.get_mut(url) else { - return; - }; - *retry_time = Some(Instant::now()); - pb.set_style(ProgressStyle::with_template("{msg:>12.bold} retrying download").unwrap()); + pub(crate) fn retrying(&self) { + *self.retry_time.lock().unwrap() = Some(Instant::now()); + self.progress + .set_style(ProgressStyle::with_template("{msg:>12.bold} retrying download").unwrap()); } } -#[derive(Debug)] -pub(crate) enum Notification<'a> { - /// The URL of the download is passed as the last argument, to allow us to track concurrent downloads. - DownloadingComponent(&'a str, &'a str), - RetryingDownload(&'a str), - /// Received the Content-Length of the to-be downloaded data with - /// the respective URL of the download (for tracking concurrent downloads). - DownloadContentLengthReceived(u64, Option<&'a str>), - /// Received some data. - DownloadDataReceived(&'a [u8], Option<&'a str>), - /// Download has finished. - DownloadFinished(Option<&'a str>), - /// Download has failed. - DownloadFailed(&'a str), -} - -fn file_hash(path: &Path, notifier: &Notifier) -> Result { +fn file_hash(path: &Path) -> Result { let mut hasher = Sha256::new(); - let mut downloaded = utils::FileReaderWithProgress::new_file(path, notifier)?; + let mut downloaded = utils::FileReaderWithProgress::new_file(path)?; use std::io::Read; let mut buf = vec![0; 32768]; while let Ok(n) = downloaded.read(&mut buf) { diff --git a/src/dist/manifestation.rs b/src/dist/manifestation.rs index 00c34b0984..966f67a161 100644 --- a/src/dist/manifestation.rs +++ b/src/dist/manifestation.rs @@ -14,10 +14,10 @@ use tracing::{info, warn}; use url::Url; use crate::dist::component::{ - Components, Package, PackageContext, TarGzPackage, TarXzPackage, TarZStdPackage, Transaction, + Components, Package, TarGzPackage, TarXzPackage, TarZStdPackage, Transaction, }; use crate::dist::config::Config; -use crate::dist::download::{DownloadCfg, File, Notification}; +use crate::dist::download::{DownloadCfg, DownloadStatus, File}; use crate::dist::manifest::{Component, CompressionKind, HashedBinary, Manifest, TargetedPackage}; use crate::dist::prefix::InstallPrefix; #[cfg(test)] @@ -158,9 +158,18 @@ impl Manifestation { // Download component packages and validate hashes let mut things_to_install = Vec::new(); let mut things_downloaded = Vec::new(); - let components = update.components_urls_and_hashes(new_manifest)?; - let components_len = components.len(); + let components = update + .components_urls_and_hashes(new_manifest) + .map(|res| { + res.map(|(component, binary)| ComponentBinary { + component, + binary, + status: download_cfg.status_for(component.short_name(new_manifest)), + }) + }) + .collect::>>()?; + let components_len = components.len(); const DEFAULT_CONCURRENT_DOWNLOADS: usize = 2; let concurrent_downloads = download_cfg .process @@ -176,15 +185,6 @@ impl Manifestation { .unwrap_or(DEFAULT_MAX_RETRIES); info!("downloading component(s)"); - for bin in &components { - download_cfg - .notifier - .handle(Notification::DownloadingComponent( - &bin.component.short_name(new_manifest), - &bin.binary.url, - )); - } - let semaphore = Arc::new(Semaphore::new(concurrent_downloads)); let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| { let sem = semaphore.clone(); @@ -277,18 +277,11 @@ impl Manifestation { } } - let cx = PackageContext { - tmp_cx, - notifier: Some(&download_cfg.notifier), - process: download_cfg.process, - }; - - let reader = - utils::FileReaderWithProgress::new_file(&installer_file, &download_cfg.notifier)?; + let reader = utils::FileReaderWithProgress::new_file(&installer_file)?; let package = match format { - CompressionKind::GZip => &TarGzPackage::new(reader, &cx)? as &dyn Package, - CompressionKind::XZ => &TarXzPackage::new(reader, &cx)?, - CompressionKind::ZStd => &TarZStdPackage::new(reader, &cx)?, + CompressionKind::GZip => &TarGzPackage::new(reader, download_cfg)? as &dyn Package, + CompressionKind::XZ => &TarXzPackage::new(reader, download_cfg)?, + CompressionKind::ZStd => &TarZStdPackage::new(reader, download_cfg)?, }; // If the package doesn't contain the component that the @@ -449,12 +442,9 @@ impl Manifestation { .unwrap() .replace(DEFAULT_DIST_SERVER, dl_cfg.tmp_cx.dist_server.as_str()); - dl_cfg - .notifier - .handle(Notification::DownloadingComponent("rust", &url)); - + let status = dl_cfg.status_for("rust"); let dl = dl_cfg - .download_and_check(&url, update_hash, ".tar.gz") + .download_and_check(&url, update_hash, Some(&status), ".tar.gz") .await?; if dl.is_none() { return Ok(None); @@ -474,14 +464,8 @@ impl Manifestation { } // Install all the components in the installer - let reader = utils::FileReaderWithProgress::new_file(&installer_file, &dl_cfg.notifier)?; - let cx = PackageContext { - tmp_cx: dl_cfg.tmp_cx, - notifier: Some(&dl_cfg.notifier), - process: dl_cfg.process, - }; - - let package: &dyn Package = &TarGzPackage::new(reader, &cx)?; + let reader = utils::FileReaderWithProgress::new_file(&installer_file)?; + let package: &dyn Package = &TarGzPackage::new(reader, dl_cfg)?; for component in package.components() { tx = package.install(&self.installation, &component, None, tx)?; } @@ -716,31 +700,33 @@ impl Update { fn components_urls_and_hashes<'a>( &'a self, new_manifest: &'a Manifest, - ) -> Result>> { - let mut components_urls_and_hashes = Vec::new(); - for component in &self.components_to_install { - let package = new_manifest.get_package(component.short_name_in_manifest())?; - let target_package = package.get_target(component.target.as_ref())?; + ) -> impl Iterator> + 'a { + self.components_to_install.iter().filter_map(|component| { + let package = match new_manifest.get_package(component.short_name_in_manifest()) { + Ok(p) => p, + Err(e) => return Some(Err(e)), + }; + + let target_package = match package.get_target(component.target.as_ref()) { + Ok(tp) => tp, + Err(e) => return Some(Err(e)), + }; - if target_package.bins.is_empty() { + match target_package.bins.is_empty() { // This package is not available, no files to download. - continue; + true => None, + // We prefer the first format in the list, since the parsing of the + // manifest leaves us with the files/hash pairs in preference order. + false => Some(Ok((component, &target_package.bins[0]))), } - // We prefer the first format in the list, since the parsing of the - // manifest leaves us with the files/hash pairs in preference order. - components_urls_and_hashes.push(ComponentBinary { - component, - binary: &target_package.bins[0], - }); - } - - Ok(components_urls_and_hashes) + }) } } struct ComponentBinary<'a> { component: &'a Component, binary: &'a HashedBinary, + status: DownloadStatus, } impl<'a> ComponentBinary<'a> { @@ -755,15 +741,13 @@ impl<'a> ComponentBinary<'a> { let downloaded_file = RetryIf::spawn( FixedInterval::from_millis(0).take(max_retries), - || download_cfg.download(url, &self.binary.hash), + || download_cfg.download(url, &self.binary.hash, &self.status), |e: &anyhow::Error| { // retry only known retriable cases match e.downcast_ref::() { Some(RustupError::BrokenPartialFile) | Some(RustupError::DownloadingFile { .. }) => { - download_cfg - .notifier - .handle(Notification::RetryingDownload(url.as_str())); + self.status.retrying(); true } _ => false, diff --git a/src/dist/manifestation/tests.rs b/src/dist/manifestation/tests.rs index 39ad76ee34..376fc4c97e 100644 --- a/src/dist/manifestation/tests.rs +++ b/src/dist/manifestation/tests.rs @@ -16,7 +16,7 @@ use url::Url; use crate::{ dist::{ DEFAULT_DIST_SERVER, Profile, TargetTriple, ToolchainDesc, - download::{DownloadCfg, Notifier}, + download::{DownloadCfg, DownloadTracker}, manifest::{Component, Manifest}, manifestation::{Changes, Manifestation, UpdateStatus}, prefix::InstallPrefix, @@ -482,21 +482,14 @@ impl TestContext { let dl_cfg = DownloadCfg { tmp_cx: &self.tmp_cx, download_dir: &self.download_dir, - notifier: Notifier::new(false, &self.tp.process), + tracker: DownloadTracker::new(false, &self.tp.process), process: &self.tp.process, }; // Download the dist manifest and place it into the installation prefix let manifest_url = make_manifest_url(&self.url, &self.toolchain)?; let manifest_file = self.tmp_cx.new_file()?; - download_file( - &manifest_url, - &manifest_file, - None, - &dl_cfg.notifier, - dl_cfg.process, - ) - .await?; + download_file(&manifest_url, &manifest_file, None, None, dl_cfg.process).await?; let manifest_str = utils::read_file("manifest", &manifest_file)?; let manifest = Manifest::parse(&manifest_str)?; diff --git a/src/dist/mod.rs b/src/dist/mod.rs index 0f956a6c94..36b633b68e 100644 --- a/src/dist/mod.rs +++ b/src/dist/mod.rs @@ -1236,7 +1236,7 @@ pub(crate) async fn dl_v2_manifest( ) -> Result> { let manifest_url = toolchain.manifest_v2_url(dist_root, download.process); match download - .download_and_check(&manifest_url, update_hash, ".toml") + .download_and_check(&manifest_url, update_hash, None, ".toml") .await { Ok(manifest_dl) => { @@ -1293,7 +1293,9 @@ async fn dl_v1_manifest( } let manifest_url = toolchain.manifest_v1_url(dist_root, download.process); - let manifest_dl = download.download_and_check(&manifest_url, None, "").await?; + let manifest_dl = download + .download_and_check(&manifest_url, None, None, "") + .await?; let (manifest_file, _) = manifest_dl.unwrap(); let manifest_str = utils::read_file("manifest", &manifest_file)?; let urls = manifest_str diff --git a/src/download/mod.rs b/src/download/mod.rs index a836927865..cc60e64ef7 100644 --- a/src/download/mod.rs +++ b/src/download/mod.rs @@ -26,11 +26,7 @@ use tracing::info; use tracing::warn; use url::Url; -use crate::{ - dist::download::{Notification, Notifier}, - errors::RustupError, - process::Process, -}; +use crate::{dist::download::DownloadStatus, errors::RustupError, process::Process}; #[cfg(test)] mod tests; @@ -39,10 +35,10 @@ pub(crate) async fn download_file( url: &Url, path: &Path, hasher: Option<&mut Sha256>, - notifier: &Notifier, + status: Option<&DownloadStatus>, process: &Process, ) -> anyhow::Result<()> { - download_file_with_resume(url, path, hasher, false, notifier, process).await + download_file_with_resume(url, path, hasher, false, status, process).await } pub(crate) async fn download_file_with_resume( @@ -50,11 +46,11 @@ pub(crate) async fn download_file_with_resume( path: &Path, hasher: Option<&mut Sha256>, resume_from_partial: bool, - notifier: &Notifier, + status: Option<&DownloadStatus>, process: &Process, ) -> anyhow::Result<()> { use crate::download::DownloadError as DEK; - match download_file_(url, path, hasher, resume_from_partial, notifier, process).await { + match download_file_(url, path, hasher, resume_from_partial, status, process).await { Ok(_) => Ok(()), Err(e) => { if e.downcast_ref::().is_some() { @@ -89,7 +85,7 @@ async fn download_file_( path: &Path, hasher: Option<&mut Sha256>, resume_from_partial: bool, - notifier: &Notifier, + status: Option<&DownloadStatus>, process: &Process, ) -> anyhow::Result<()> { #[cfg(any(feature = "reqwest-rustls-tls", feature = "reqwest-native-tls"))] @@ -111,13 +107,14 @@ async fn download_file_( match msg { Event::DownloadContentLengthReceived(len) => { - notifier.handle(Notification::DownloadContentLengthReceived( - len, - Some(url.as_str()), - )); + if let Some(status) = status { + status.received_length(len) + } } Event::DownloadDataReceived(data) => { - notifier.handle(Notification::DownloadDataReceived(data, Some(url.as_str()))); + if let Some(status) = status { + status.received_data(data.len()) + } } Event::ResumingPartialDownload => debug!("resuming partial download"), } @@ -219,10 +216,12 @@ async fn download_file_( .await; // The notification should only be sent if the download was successful (i.e. didn't timeout) - notifier.handle(match &res { - Ok(_) => Notification::DownloadFinished(Some(url.as_str())), - Err(_) => Notification::DownloadFailed(url.as_str()), - }); + if let Some(status) = status { + match &res { + Ok(_) => status.finished(), + Err(_) => status.failed(), + }; + } res } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 541a872e3b..94b61a937a 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -14,7 +14,6 @@ use retry::{OperationResult, retry}; use tracing::{debug, info, warn}; use url::Url; -use crate::dist::download::{Notification, Notifier}; use crate::errors::*; use crate::process::Process; @@ -445,15 +444,13 @@ pub(crate) fn delete_dir_contents_following_links(dir_path: &Path) { } } -pub(crate) struct FileReaderWithProgress<'a> { +pub(crate) struct FileReaderWithProgress { fh: io::BufReader, - notifier: &'a Notifier, nbytes: u64, - flen: u64, } -impl<'a> FileReaderWithProgress<'a> { - pub(crate) fn new_file(path: &Path, notifier: &'a Notifier) -> Result { +impl FileReaderWithProgress { + pub(crate) fn new_file(path: &Path) -> Result { let fh = match File::open(path) { Ok(fh) => fh, Err(_) => { @@ -465,32 +462,18 @@ impl<'a> FileReaderWithProgress<'a> { }; // Inform the tracker of the file size - let flen = fh.metadata()?.len(); - notifier.handle(Notification::DownloadContentLengthReceived(flen, None)); - - let fh = BufReader::with_capacity(8 * 1024 * 1024, fh); - Ok(FileReaderWithProgress { - fh, - notifier, + fh: BufReader::with_capacity(8 * 1024 * 1024, fh), nbytes: 0, - flen, }) } } -impl io::Read for FileReaderWithProgress<'_> { +impl io::Read for FileReaderWithProgress { fn read(&mut self, buf: &mut [u8]) -> io::Result { match self.fh.read(buf) { Ok(nbytes) => { self.nbytes += nbytes as u64; - if nbytes != 0 { - self.notifier - .handle(Notification::DownloadDataReceived(&buf[0..nbytes], None)); - } - if (nbytes == 0) || (self.flen == self.nbytes) { - self.notifier.handle(Notification::DownloadFinished(None)); - } Ok(nbytes) } Err(e) => Err(e),