From 00944c34c83e2a34a1b6059d4b21b40c7f0cd728 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Mon, 27 Oct 2025 15:33:51 +0100 Subject: [PATCH 1/4] dist: move URL alteration logic into DownloadCfg method --- src/dist/download.rs | 10 ++++++++++ src/dist/manifestation.rs | 19 +++---------------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/dist/download.rs b/src/dist/download.rs index 31e89cef74..defec169d1 100644 --- a/src/dist/download.rs +++ b/src/dist/download.rs @@ -12,6 +12,7 @@ use tracing::{debug, warn}; use url::Url; use crate::config::Cfg; +use crate::dist::DEFAULT_DIST_SERVER; use crate::dist::temp; use crate::download::{download_file, download_file_with_resume}; use crate::errors::RustupError; @@ -207,6 +208,15 @@ impl<'a> DownloadCfg<'a> { retry_time: Mutex::new(None), } } + + pub(crate) fn url(&self, url: &str) -> Result { + match &*self.tmp_cx.dist_server { + server if server != DEFAULT_DIST_SERVER => utils::parse_url( + &url.replace(DEFAULT_DIST_SERVER, self.tmp_cx.dist_server.as_str()), + ), + _ => utils::parse_url(url), + } + } } /// Tracks download progress and displays information about it to a terminal. diff --git a/src/dist/manifestation.rs b/src/dist/manifestation.rs index 8003427627..9548d05fcf 100644 --- a/src/dist/manifestation.rs +++ b/src/dist/manifestation.rs @@ -11,7 +11,6 @@ use futures_util::stream::StreamExt; use std::sync::Arc; use tokio::sync::Semaphore; use tracing::{info, warn}; -use url::Url; use crate::dist::component::{Components, DirectoryPackage, Transaction}; use crate::dist::config::Config; @@ -151,8 +150,6 @@ impl Manifestation { } } - let altered = tmp_cx.dist_server != DEFAULT_DIST_SERVER; - // Download component packages and validate hashes let mut things_to_install = Vec::new(); let mut things_downloaded = Vec::new(); @@ -188,17 +185,7 @@ impl Manifestation { let sem = semaphore.clone(); async move { let _permit = sem.acquire().await.unwrap(); - let url = if altered { - utils::parse_url( - &bin.binary - .url - .replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()), - )? - } else { - utils::parse_url(&bin.binary.url)? - }; - - bin.download(&url, download_cfg, max_retries, new_manifest) + bin.download(download_cfg, max_retries, new_manifest) .await .map(|downloaded| (bin, downloaded)) } @@ -697,16 +684,16 @@ struct ComponentBinary<'a> { impl<'a> ComponentBinary<'a> { async fn download( &self, - url: &Url, download_cfg: &DownloadCfg<'_>, max_retries: usize, new_manifest: &Manifest, ) -> Result { use tokio_retry::{RetryIf, strategy::FixedInterval}; + let url = download_cfg.url(&self.binary.url)?; let downloaded_file = RetryIf::spawn( FixedInterval::from_millis(0).take(max_retries), - || download_cfg.download(url, &self.binary.hash, &self.status), + || download_cfg.download(&url, &self.binary.hash, &self.status), |e: &anyhow::Error| { // retry only known retriable cases match e.downcast_ref::() { From 47a20367ff65d2b51ad6f3da73605bec280892f0 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Tue, 28 Oct 2025 12:18:06 +0100 Subject: [PATCH 2/4] dist: yield self when download is complete --- src/dist/manifestation.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/dist/manifestation.rs b/src/dist/manifestation.rs index 9548d05fcf..f8040f04b2 100644 --- a/src/dist/manifestation.rs +++ b/src/dist/manifestation.rs @@ -185,9 +185,7 @@ impl Manifestation { let sem = semaphore.clone(); async move { let _permit = sem.acquire().await.unwrap(); - bin.download(download_cfg, max_retries, new_manifest) - .await - .map(|downloaded| (bin, downloaded)) + bin.download(download_cfg, max_retries, new_manifest).await } }); if components_len > 0 { @@ -683,11 +681,11 @@ struct ComponentBinary<'a> { impl<'a> ComponentBinary<'a> { async fn download( - &self, + self, download_cfg: &DownloadCfg<'_>, max_retries: usize, new_manifest: &Manifest, - ) -> Result { + ) -> Result<(Self, File)> { use tokio_retry::{RetryIf, strategy::FixedInterval}; let url = download_cfg.url(&self.binary.url)?; @@ -709,7 +707,7 @@ impl<'a> ComponentBinary<'a> { .await .with_context(|| RustupError::ComponentDownloadFailed(self.component.name(new_manifest)))?; - Ok(downloaded_file) + Ok((self, downloaded_file)) } fn install<'t>( From 6c7b9f1b497bba2ef1f7ee833fee05a6bfb0313c Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Tue, 28 Oct 2025 12:20:45 +0100 Subject: [PATCH 3/4] dist: store more context in ComponentBinary --- src/dist/manifestation.rs | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/dist/manifestation.rs b/src/dist/manifestation.rs index f8040f04b2..2ad44fa346 100644 --- a/src/dist/manifestation.rs +++ b/src/dist/manifestation.rs @@ -160,6 +160,8 @@ impl Manifestation { component, binary, status: download_cfg.status_for(component.short_name(new_manifest)), + manifest: new_manifest, + download_cfg, }) }) .collect::>>()?; @@ -185,7 +187,7 @@ impl Manifestation { let sem = semaphore.clone(); async move { let _permit = sem.acquire().await.unwrap(); - bin.download(download_cfg, max_retries, new_manifest).await + bin.download(max_retries).await } }); if components_len > 0 { @@ -240,7 +242,7 @@ impl Manifestation { // Install components for (component_bin, installer_file) in things_to_install { - tx = component_bin.install(installer_file, tx, new_manifest, self, download_cfg)?; + tx = component_bin.install(installer_file, tx, self)?; } // Install new distribution manifest @@ -677,21 +679,21 @@ struct ComponentBinary<'a> { component: &'a Component, binary: &'a HashedBinary, status: DownloadStatus, + manifest: &'a Manifest, + download_cfg: &'a DownloadCfg<'a>, } impl<'a> ComponentBinary<'a> { - async fn download( - self, - download_cfg: &DownloadCfg<'_>, - max_retries: usize, - new_manifest: &Manifest, - ) -> Result<(Self, File)> { + async fn download(self, max_retries: usize) -> Result<(Self, File)> { use tokio_retry::{RetryIf, strategy::FixedInterval}; - let url = download_cfg.url(&self.binary.url)?; + let url = self.download_cfg.url(&self.binary.url)?; let downloaded_file = RetryIf::spawn( FixedInterval::from_millis(0).take(max_retries), - || download_cfg.download(&url, &self.binary.hash, &self.status), + || { + self.download_cfg + .download(&url, &self.binary.hash, &self.status) + }, |e: &anyhow::Error| { // retry only known retriable cases match e.downcast_ref::() { @@ -705,7 +707,9 @@ impl<'a> ComponentBinary<'a> { }, ) .await - .with_context(|| RustupError::ComponentDownloadFailed(self.component.name(new_manifest)))?; + .with_context(|| { + RustupError::ComponentDownloadFailed(self.component.name(self.manifest)) + })?; Ok((self, downloaded_file)) } @@ -714,9 +718,7 @@ impl<'a> ComponentBinary<'a> { &self, installer_file: File, tx: Transaction<'t>, - new_manifest: &Manifest, manifestation: &Manifestation, - download_cfg: &DownloadCfg<'_>, ) -> Result> { // For historical reasons, the rust-installer component // names are not the same as the dist manifest component @@ -725,12 +727,13 @@ impl<'a> ComponentBinary<'a> { let component = self.component; let pkg_name = component.name_in_manifest(); let short_pkg_name = component.short_name_in_manifest(); - let short_name = component.short_name(new_manifest); + let short_name = component.short_name(self.manifest); self.status.installing(); let reader = utils::FileReaderWithProgress::new_file(&installer_file)?; - let package = DirectoryPackage::compressed(reader, self.binary.compression, download_cfg)?; + let package = + DirectoryPackage::compressed(reader, self.binary.compression, self.download_cfg)?; // If the package doesn't contain the component that the // manifest says it does then somebody must be playing a joke on us. From fd53671c0277d23e3072a74e616776bce511655c Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Mon, 27 Oct 2025 15:50:35 +0100 Subject: [PATCH 4/4] dist: install while downloading --- src/dist/download.rs | 2 +- src/dist/manifestation.rs | 57 ++++++++----------- tests/suite/cli_rustup.rs | 12 ++-- .../rustup_update_updated.stderr.term.svg | 10 ++-- 4 files changed, 36 insertions(+), 45 deletions(-) diff --git a/src/dist/download.rs b/src/dist/download.rs index defec169d1..e8989a3270 100644 --- a/src/dist/download.rs +++ b/src/dist/download.rs @@ -117,7 +117,7 @@ impl<'a> DownloadCfg<'a> { } } - pub(crate) fn clean(&self, hashes: &[String]) -> Result<()> { + pub(crate) fn clean(&self, hashes: &[impl AsRef]) -> Result<()> { for hash in hashes.iter() { let used_file = self.download_dir.join(hash); if self.download_dir.join(&used_file).exists() { diff --git a/src/dist/manifestation.rs b/src/dist/manifestation.rs index 2ad44fa346..85b839bd5c 100644 --- a/src/dist/manifestation.rs +++ b/src/dist/manifestation.rs @@ -7,9 +7,7 @@ mod tests; use std::path::Path; use anyhow::{Context, Result, anyhow, bail}; -use futures_util::stream::StreamExt; -use std::sync::Arc; -use tokio::sync::Semaphore; +use futures_util::stream::{FuturesUnordered, StreamExt}; use tracing::{info, warn}; use crate::dist::component::{Components, DirectoryPackage, Transaction}; @@ -151,8 +149,6 @@ impl Manifestation { } // Download component packages and validate hashes - let mut things_to_install = Vec::new(); - let mut things_downloaded = Vec::new(); let components = update .components_urls_and_hashes(new_manifest) .map(|res| { @@ -166,7 +162,6 @@ impl Manifestation { }) .collect::>>()?; - let components_len = components.len(); const DEFAULT_CONCURRENT_DOWNLOADS: usize = 2; let concurrent_downloads = download_cfg .process @@ -181,27 +176,6 @@ impl Manifestation { .and_then(|s| s.parse().ok()) .unwrap_or(DEFAULT_MAX_RETRIES); - info!("downloading component(s)"); - let semaphore = Arc::new(Semaphore::new(concurrent_downloads)); - let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| { - let sem = semaphore.clone(); - async move { - let _permit = sem.acquire().await.unwrap(); - bin.download(max_retries).await - } - }); - if components_len > 0 { - let results = component_stream - .buffered(components_len) - .collect::>() - .await; - for result in results { - let (bin, downloaded_file) = result?; - things_downloaded.push(bin.binary.hash.clone()); - things_to_install.push((bin, downloaded_file)); - } - } - // Begin transaction let mut tx = Transaction::new(prefix.clone(), tmp_cx, download_cfg.process); @@ -240,15 +214,34 @@ impl Manifestation { tx = self.uninstall_component(component, new_manifest, tx, download_cfg.process)?; } - // Install components - for (component_bin, installer_file) in things_to_install { - tx = component_bin.install(installer_file, tx, self)?; + info!("downloading component(s)"); + let mut downloads = FuturesUnordered::new(); + let mut component_iter = components.iter(); + let mut cleanup_downloads = vec![]; + loop { + if downloads.is_empty() && component_iter.len() == 0 { + break; + } + + let installable = downloads.next().await.transpose()?; + while component_iter.len() > 0 && downloads.len() < concurrent_downloads { + if let Some(bin) = component_iter.next() { + downloads.push(bin.download(max_retries)); + } + } + + if let Some((bin, downloaded)) = installable { + cleanup_downloads.push(&bin.binary.hash); + tx = bin.install(downloaded, tx, self)?; + } } // Install new distribution manifest let new_manifest_str = new_manifest.clone().stringify()?; tx.modify_file(rel_installed_manifest_path)?; utils::write_file("manifest", &installed_manifest_path, &new_manifest_str)?; + download_cfg.clean(&cleanup_downloads)?; + drop(downloads); // Write configuration. // @@ -269,8 +262,6 @@ impl Manifestation { // End transaction tx.commit(); - download_cfg.clean(&things_downloaded)?; - Ok(UpdateStatus::Changed) } @@ -684,7 +675,7 @@ struct ComponentBinary<'a> { } impl<'a> ComponentBinary<'a> { - async fn download(self, max_retries: usize) -> Result<(Self, File)> { + async fn download(&self, max_retries: usize) -> Result<(&Self, File)> { use tokio_retry::{RetryIf, strategy::FixedInterval}; let url = self.download_cfg.url(&self.binary.url)?; diff --git a/tests/suite/cli_rustup.rs b/tests/suite/cli_rustup.rs index 09303bda60..708e3757ec 100644 --- a/tests/suite/cli_rustup.rs +++ b/tests/suite/cli_rustup.rs @@ -35,7 +35,7 @@ async fn rustup_stable() { .with_stderr(snapbox::str![[r#" info: syncing channel updates for stable-[HOST_TRIPLE] info: latest update on 2015-01-02 for version 1.1.0 (hash-stable-1.1.0) -info: downloading component[..] +info: removing previous version of component cargo ... info: cleaning up downloads & tmp directories @@ -131,15 +131,15 @@ async fn rustup_all_channels() { .with_stderr(snapbox::str![[r#" info: syncing channel updates for stable-[HOST_TRIPLE] info: latest update on 2015-01-02 for version 1.1.0 (hash-stable-1.1.0) -info: downloading component[..] +info: removing previous version of component cargo ... info: syncing channel updates for beta-[HOST_TRIPLE] info: latest update on 2015-01-02 for version 1.2.0 (hash-beta-1.2.0) -info: downloading component[..] +info: removing previous version of component cargo ... info: syncing channel updates for nightly-[HOST_TRIPLE] info: latest update on 2015-01-02 for version 1.3.0 (hash-nightly-2) -info: downloading component[..] +info: removing previous version of component cargo ... info: cleaning up downloads & tmp directories @@ -208,12 +208,12 @@ async fn rustup_some_channels_up_to_date() { .with_stderr(snapbox::str![[r#" info: syncing channel updates for stable-[HOST_TRIPLE] info: latest update on 2015-01-02 for version 1.1.0 (hash-stable-1.1.0) -info: downloading component[..] +info: removing previous version of component cargo ... info: syncing channel updates for beta-[HOST_TRIPLE] info: syncing channel updates for nightly-[HOST_TRIPLE] info: latest update on 2015-01-02 for version 1.3.0 (hash-nightly-2) -info: downloading component[..] +info: removing previous version of component cargo ... info: cleaning up downloads & tmp directories diff --git a/tests/suite/cli_rustup_ui/rustup_update_updated.stderr.term.svg b/tests/suite/cli_rustup_ui/rustup_update_updated.stderr.term.svg index a0006f8e71..7e73169d03 100644 --- a/tests/suite/cli_rustup_ui/rustup_update_updated.stderr.term.svg +++ b/tests/suite/cli_rustup_ui/rustup_update_updated.stderr.term.svg @@ -21,15 +21,15 @@ info: latest update on 2015-01-02 for version 1.1.0 (hash-stable-1.1.0) - info: downloading component(s) + info: removing previous version of component cargo - info: removing previous version of component cargo + info: removing previous version of component rust-docs - info: removing previous version of component rust-docs + info: removing previous version of component rust-std - info: removing previous version of component rust-std + info: removing previous version of component rustc - info: removing previous version of component rustc + info: downloading component(s) info: cleaning up downloads & tmp directories