Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/dist/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tracing::{debug, warn};
use url::Url;

use crate::config::Cfg;
use crate::dist::DEFAULT_DIST_SERVER;
use crate::dist::temp;
use crate::download::{download_file, download_file_with_resume};
use crate::errors::RustupError;
Expand Down Expand Up @@ -116,7 +117,7 @@ impl<'a> DownloadCfg<'a> {
}
}

pub(crate) fn clean(&self, hashes: &[String]) -> Result<()> {
pub(crate) fn clean(&self, hashes: &[impl AsRef<Path>]) -> Result<()> {
for hash in hashes.iter() {
let used_file = self.download_dir.join(hash);
if self.download_dir.join(&used_file).exists() {
Expand Down Expand Up @@ -207,6 +208,15 @@ impl<'a> DownloadCfg<'a> {
retry_time: Mutex::new(None),
}
}

pub(crate) fn url(&self, url: &str) -> Result<Url> {
match &*self.tmp_cx.dist_server {
server if server != DEFAULT_DIST_SERVER => utils::parse_url(
&url.replace(DEFAULT_DIST_SERVER, self.tmp_cx.dist_server.as_str()),
),
_ => utils::parse_url(url),
}
}
}

/// Tracks download progress and displays information about it to a terminal.
Expand Down
101 changes: 40 additions & 61 deletions src/dist/manifestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ mod tests;
use std::path::Path;

use anyhow::{Context, Result, anyhow, bail};
use futures_util::stream::StreamExt;
use std::sync::Arc;
use tokio::sync::Semaphore;
use futures_util::stream::{FuturesUnordered, StreamExt};
use tracing::{info, warn};
use url::Url;

use crate::dist::component::{Components, DirectoryPackage, Transaction};
use crate::dist::config::Config;
Expand Down Expand Up @@ -151,23 +148,20 @@ impl Manifestation {
}
}

let altered = tmp_cx.dist_server != DEFAULT_DIST_SERVER;

// Download component packages and validate hashes
let mut things_to_install = Vec::new();
let mut things_downloaded = Vec::new();
let components = update
.components_urls_and_hashes(new_manifest)
.map(|res| {
res.map(|(component, binary)| ComponentBinary {
component,
binary,
status: download_cfg.status_for(component.short_name(new_manifest)),
manifest: new_manifest,
download_cfg,
})
})
.collect::<Result<Vec<_>>>()?;

let components_len = components.len();
const DEFAULT_CONCURRENT_DOWNLOADS: usize = 2;
let concurrent_downloads = download_cfg
.process
Expand All @@ -182,39 +176,6 @@ impl Manifestation {
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_MAX_RETRIES);

info!("downloading component(s)");
let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| {
let sem = semaphore.clone();
async move {
let _permit = sem.acquire().await.unwrap();
let url = if altered {
utils::parse_url(
&bin.binary
.url
.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()),
)?
} else {
utils::parse_url(&bin.binary.url)?
};

bin.download(&url, download_cfg, max_retries, new_manifest)
.await
.map(|downloaded| (bin, downloaded))
}
});
if components_len > 0 {
let results = component_stream
.buffered(components_len)
.collect::<Vec<_>>()
.await;
for result in results {
let (bin, downloaded_file) = result?;
things_downloaded.push(bin.binary.hash.clone());
things_to_install.push((bin, downloaded_file));
}
}

// Begin transaction
let mut tx = Transaction::new(prefix.clone(), tmp_cx, download_cfg.process);

Expand Down Expand Up @@ -253,15 +214,34 @@ impl Manifestation {
tx = self.uninstall_component(component, new_manifest, tx, download_cfg.process)?;
}

// Install components
for (component_bin, installer_file) in things_to_install {
tx = component_bin.install(installer_file, tx, new_manifest, self, download_cfg)?;
info!("downloading component(s)");
let mut downloads = FuturesUnordered::new();
let mut component_iter = components.iter();
let mut cleanup_downloads = vec![];
loop {
if downloads.is_empty() && component_iter.len() == 0 {
break;
}

let installable = downloads.next().await.transpose()?;
while component_iter.len() > 0 && downloads.len() < concurrent_downloads {
if let Some(bin) = component_iter.next() {
downloads.push(bin.download(max_retries));
}
}

if let Some((bin, downloaded)) = installable {
cleanup_downloads.push(&bin.binary.hash);
tx = bin.install(downloaded, tx, self)?;
}
}

// Install new distribution manifest
let new_manifest_str = new_manifest.clone().stringify()?;
tx.modify_file(rel_installed_manifest_path)?;
utils::write_file("manifest", &installed_manifest_path, &new_manifest_str)?;
download_cfg.clean(&cleanup_downloads)?;
drop(downloads);

// Write configuration.
//
Expand All @@ -282,8 +262,6 @@ impl Manifestation {
// End transaction
tx.commit();

download_cfg.clean(&things_downloaded)?;

Ok(UpdateStatus::Changed)
}

Expand Down Expand Up @@ -692,21 +670,21 @@ struct ComponentBinary<'a> {
component: &'a Component,
binary: &'a HashedBinary,
status: DownloadStatus,
manifest: &'a Manifest,
download_cfg: &'a DownloadCfg<'a>,
}

impl<'a> ComponentBinary<'a> {
async fn download(
&self,
url: &Url,
download_cfg: &DownloadCfg<'_>,
max_retries: usize,
new_manifest: &Manifest,
) -> Result<File> {
async fn download(&self, max_retries: usize) -> Result<(&Self, File)> {
use tokio_retry::{RetryIf, strategy::FixedInterval};

let url = self.download_cfg.url(&self.binary.url)?;
let downloaded_file = RetryIf::spawn(
FixedInterval::from_millis(0).take(max_retries),
|| download_cfg.download(url, &self.binary.hash, &self.status),
|| {
self.download_cfg
.download(&url, &self.binary.hash, &self.status)
},
|e: &anyhow::Error| {
// retry only known retriable cases
match e.downcast_ref::<RustupError>() {
Expand All @@ -720,18 +698,18 @@ impl<'a> ComponentBinary<'a> {
},
)
.await
.with_context(|| RustupError::ComponentDownloadFailed(self.component.name(new_manifest)))?;
.with_context(|| {
RustupError::ComponentDownloadFailed(self.component.name(self.manifest))
})?;

Ok(downloaded_file)
Ok((self, downloaded_file))
}

fn install<'t>(
&self,
installer_file: File,
tx: Transaction<'t>,
new_manifest: &Manifest,
manifestation: &Manifestation,
download_cfg: &DownloadCfg<'_>,
) -> Result<Transaction<'t>> {
// For historical reasons, the rust-installer component
// names are not the same as the dist manifest component
Expand All @@ -740,12 +718,13 @@ impl<'a> ComponentBinary<'a> {
let component = self.component;
let pkg_name = component.name_in_manifest();
let short_pkg_name = component.short_name_in_manifest();
let short_name = component.short_name(new_manifest);
let short_name = component.short_name(self.manifest);

self.status.installing();

let reader = utils::FileReaderWithProgress::new_file(&installer_file)?;
let package = DirectoryPackage::compressed(reader, self.binary.compression, download_cfg)?;
let package =
DirectoryPackage::compressed(reader, self.binary.compression, self.download_cfg)?;

// If the package doesn't contain the component that the
// manifest says it does then somebody must be playing a joke on us.
Expand Down
12 changes: 6 additions & 6 deletions tests/suite/cli_rustup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn rustup_stable() {
.with_stderr(snapbox::str![[r#"
info: syncing channel updates for stable-[HOST_TRIPLE]
info: latest update on 2015-01-02 for version 1.1.0 (hash-stable-1.1.0)
info: downloading component[..]
info: removing previous version of component cargo
...
info: cleaning up downloads & tmp directories

Expand Down Expand Up @@ -131,15 +131,15 @@ async fn rustup_all_channels() {
.with_stderr(snapbox::str![[r#"
info: syncing channel updates for stable-[HOST_TRIPLE]
info: latest update on 2015-01-02 for version 1.1.0 (hash-stable-1.1.0)
info: downloading component[..]
info: removing previous version of component cargo
...
info: syncing channel updates for beta-[HOST_TRIPLE]
info: latest update on 2015-01-02 for version 1.2.0 (hash-beta-1.2.0)
info: downloading component[..]
info: removing previous version of component cargo
...
info: syncing channel updates for nightly-[HOST_TRIPLE]
info: latest update on 2015-01-02 for version 1.3.0 (hash-nightly-2)
info: downloading component[..]
info: removing previous version of component cargo
...
info: cleaning up downloads & tmp directories

Expand Down Expand Up @@ -208,12 +208,12 @@ async fn rustup_some_channels_up_to_date() {
.with_stderr(snapbox::str![[r#"
info: syncing channel updates for stable-[HOST_TRIPLE]
info: latest update on 2015-01-02 for version 1.1.0 (hash-stable-1.1.0)
info: downloading component[..]
info: removing previous version of component cargo
...
info: syncing channel updates for beta-[HOST_TRIPLE]
info: syncing channel updates for nightly-[HOST_TRIPLE]
info: latest update on 2015-01-02 for version 1.3.0 (hash-nightly-2)
info: downloading component[..]
info: removing previous version of component cargo
...
info: cleaning up downloads & tmp directories

Expand Down
10 changes: 5 additions & 5 deletions tests/suite/cli_rustup_ui/rustup_update_updated.stderr.term.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.