diff --git a/src/dist/download.rs b/src/dist/download.rs index 31e89cef74..e8989a3270 100644 --- a/src/dist/download.rs +++ b/src/dist/download.rs @@ -12,6 +12,7 @@ use tracing::{debug, warn}; use url::Url; use crate::config::Cfg; +use crate::dist::DEFAULT_DIST_SERVER; use crate::dist::temp; use crate::download::{download_file, download_file_with_resume}; use crate::errors::RustupError; @@ -116,7 +117,7 @@ impl<'a> DownloadCfg<'a> { } } - pub(crate) fn clean(&self, hashes: &[String]) -> Result<()> { + pub(crate) fn clean(&self, hashes: &[impl AsRef]) -> Result<()> { for hash in hashes.iter() { let used_file = self.download_dir.join(hash); if self.download_dir.join(&used_file).exists() { @@ -207,6 +208,15 @@ impl<'a> DownloadCfg<'a> { retry_time: Mutex::new(None), } } + + pub(crate) fn url(&self, url: &str) -> Result { + match &*self.tmp_cx.dist_server { + server if server != DEFAULT_DIST_SERVER => utils::parse_url( + &url.replace(DEFAULT_DIST_SERVER, self.tmp_cx.dist_server.as_str()), + ), + _ => utils::parse_url(url), + } + } } /// Tracks download progress and displays information about it to a terminal. diff --git a/src/dist/manifestation.rs b/src/dist/manifestation.rs index 8003427627..85b839bd5c 100644 --- a/src/dist/manifestation.rs +++ b/src/dist/manifestation.rs @@ -7,11 +7,8 @@ mod tests; use std::path::Path; use anyhow::{Context, Result, anyhow, bail}; -use futures_util::stream::StreamExt; -use std::sync::Arc; -use tokio::sync::Semaphore; +use futures_util::stream::{FuturesUnordered, StreamExt}; use tracing::{info, warn}; -use url::Url; use crate::dist::component::{Components, DirectoryPackage, Transaction}; use crate::dist::config::Config; @@ -151,11 +148,7 @@ impl Manifestation { } } - let altered = tmp_cx.dist_server != DEFAULT_DIST_SERVER; - // Download component packages and validate hashes - let mut things_to_install = Vec::new(); - let mut things_downloaded = Vec::new(); let components = update .components_urls_and_hashes(new_manifest) .map(|res| { @@ -163,11 +156,12 @@ impl Manifestation { component, binary, status: download_cfg.status_for(component.short_name(new_manifest)), + manifest: new_manifest, + download_cfg, }) }) .collect::>>()?; - let components_len = components.len(); const DEFAULT_CONCURRENT_DOWNLOADS: usize = 2; let concurrent_downloads = download_cfg .process @@ -182,39 +176,6 @@ impl Manifestation { .and_then(|s| s.parse().ok()) .unwrap_or(DEFAULT_MAX_RETRIES); - info!("downloading component(s)"); - let semaphore = Arc::new(Semaphore::new(concurrent_downloads)); - let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| { - let sem = semaphore.clone(); - async move { - let _permit = sem.acquire().await.unwrap(); - let url = if altered { - utils::parse_url( - &bin.binary - .url - .replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()), - )? - } else { - utils::parse_url(&bin.binary.url)? - }; - - bin.download(&url, download_cfg, max_retries, new_manifest) - .await - .map(|downloaded| (bin, downloaded)) - } - }); - if components_len > 0 { - let results = component_stream - .buffered(components_len) - .collect::>() - .await; - for result in results { - let (bin, downloaded_file) = result?; - things_downloaded.push(bin.binary.hash.clone()); - things_to_install.push((bin, downloaded_file)); - } - } - // Begin transaction let mut tx = Transaction::new(prefix.clone(), tmp_cx, download_cfg.process); @@ -253,15 +214,34 @@ impl Manifestation { tx = self.uninstall_component(component, new_manifest, tx, download_cfg.process)?; } - // Install components - for (component_bin, installer_file) in things_to_install { - tx = component_bin.install(installer_file, tx, new_manifest, self, download_cfg)?; + info!("downloading component(s)"); + let mut downloads = FuturesUnordered::new(); + let mut component_iter = components.iter(); + let mut cleanup_downloads = vec![]; + loop { + if downloads.is_empty() && component_iter.len() == 0 { + break; + } + + let installable = downloads.next().await.transpose()?; + while component_iter.len() > 0 && downloads.len() < concurrent_downloads { + if let Some(bin) = component_iter.next() { + downloads.push(bin.download(max_retries)); + } + } + + if let Some((bin, downloaded)) = installable { + cleanup_downloads.push(&bin.binary.hash); + tx = bin.install(downloaded, tx, self)?; + } } // Install new distribution manifest let new_manifest_str = new_manifest.clone().stringify()?; tx.modify_file(rel_installed_manifest_path)?; utils::write_file("manifest", &installed_manifest_path, &new_manifest_str)?; + download_cfg.clean(&cleanup_downloads)?; + drop(downloads); // Write configuration. // @@ -282,8 +262,6 @@ impl Manifestation { // End transaction tx.commit(); - download_cfg.clean(&things_downloaded)?; - Ok(UpdateStatus::Changed) } @@ -692,21 +670,21 @@ struct ComponentBinary<'a> { component: &'a Component, binary: &'a HashedBinary, status: DownloadStatus, + manifest: &'a Manifest, + download_cfg: &'a DownloadCfg<'a>, } impl<'a> ComponentBinary<'a> { - async fn download( - &self, - url: &Url, - download_cfg: &DownloadCfg<'_>, - max_retries: usize, - new_manifest: &Manifest, - ) -> Result { + async fn download(&self, max_retries: usize) -> Result<(&Self, File)> { use tokio_retry::{RetryIf, strategy::FixedInterval}; + let url = self.download_cfg.url(&self.binary.url)?; let downloaded_file = RetryIf::spawn( FixedInterval::from_millis(0).take(max_retries), - || download_cfg.download(url, &self.binary.hash, &self.status), + || { + self.download_cfg + .download(&url, &self.binary.hash, &self.status) + }, |e: &anyhow::Error| { // retry only known retriable cases match e.downcast_ref::() { @@ -720,18 +698,18 @@ impl<'a> ComponentBinary<'a> { }, ) .await - .with_context(|| RustupError::ComponentDownloadFailed(self.component.name(new_manifest)))?; + .with_context(|| { + RustupError::ComponentDownloadFailed(self.component.name(self.manifest)) + })?; - Ok(downloaded_file) + Ok((self, downloaded_file)) } fn install<'t>( &self, installer_file: File, tx: Transaction<'t>, - new_manifest: &Manifest, manifestation: &Manifestation, - download_cfg: &DownloadCfg<'_>, ) -> Result> { // For historical reasons, the rust-installer component // names are not the same as the dist manifest component @@ -740,12 +718,13 @@ impl<'a> ComponentBinary<'a> { let component = self.component; let pkg_name = component.name_in_manifest(); let short_pkg_name = component.short_name_in_manifest(); - let short_name = component.short_name(new_manifest); + let short_name = component.short_name(self.manifest); self.status.installing(); let reader = utils::FileReaderWithProgress::new_file(&installer_file)?; - let package = DirectoryPackage::compressed(reader, self.binary.compression, download_cfg)?; + let package = + DirectoryPackage::compressed(reader, self.binary.compression, self.download_cfg)?; // If the package doesn't contain the component that the // manifest says it does then somebody must be playing a joke on us. diff --git a/tests/suite/cli_rustup.rs b/tests/suite/cli_rustup.rs index 09303bda60..708e3757ec 100644 --- a/tests/suite/cli_rustup.rs +++ b/tests/suite/cli_rustup.rs @@ -35,7 +35,7 @@ async fn rustup_stable() { .with_stderr(snapbox::str![[r#" info: syncing channel updates for stable-[HOST_TRIPLE] info: latest update on 2015-01-02 for version 1.1.0 (hash-stable-1.1.0) -info: downloading component[..] +info: removing previous version of component cargo ... info: cleaning up downloads & tmp directories @@ -131,15 +131,15 @@ async fn rustup_all_channels() { .with_stderr(snapbox::str![[r#" info: syncing channel updates for stable-[HOST_TRIPLE] info: latest update on 2015-01-02 for version 1.1.0 (hash-stable-1.1.0) -info: downloading component[..] +info: removing previous version of component cargo ... info: syncing channel updates for beta-[HOST_TRIPLE] info: latest update on 2015-01-02 for version 1.2.0 (hash-beta-1.2.0) -info: downloading component[..] +info: removing previous version of component cargo ... info: syncing channel updates for nightly-[HOST_TRIPLE] info: latest update on 2015-01-02 for version 1.3.0 (hash-nightly-2) -info: downloading component[..] +info: removing previous version of component cargo ... info: cleaning up downloads & tmp directories @@ -208,12 +208,12 @@ async fn rustup_some_channels_up_to_date() { .with_stderr(snapbox::str![[r#" info: syncing channel updates for stable-[HOST_TRIPLE] info: latest update on 2015-01-02 for version 1.1.0 (hash-stable-1.1.0) -info: downloading component[..] +info: removing previous version of component cargo ... info: syncing channel updates for beta-[HOST_TRIPLE] info: syncing channel updates for nightly-[HOST_TRIPLE] info: latest update on 2015-01-02 for version 1.3.0 (hash-nightly-2) -info: downloading component[..] +info: removing previous version of component cargo ... info: cleaning up downloads & tmp directories diff --git a/tests/suite/cli_rustup_ui/rustup_update_updated.stderr.term.svg b/tests/suite/cli_rustup_ui/rustup_update_updated.stderr.term.svg index a0006f8e71..7e73169d03 100644 --- a/tests/suite/cli_rustup_ui/rustup_update_updated.stderr.term.svg +++ b/tests/suite/cli_rustup_ui/rustup_update_updated.stderr.term.svg @@ -21,15 +21,15 @@ info: latest update on 2015-01-02 for version 1.1.0 (hash-stable-1.1.0) - info: downloading component(s) + info: removing previous version of component cargo - info: removing previous version of component cargo + info: removing previous version of component rust-docs - info: removing previous version of component rust-docs + info: removing previous version of component rust-std - info: removing previous version of component rust-std + info: removing previous version of component rustc - info: removing previous version of component rustc + info: downloading component(s) info: cleaning up downloads & tmp directories