Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 2 additions & 16 deletions src/cli/self_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1275,14 +1275,7 @@ pub(crate) async fn prepare_update(dl_cfg: &DownloadCfg<'_>) -> Result<Option<Pa

// Download new version
info!("downloading self-update (new version: {available_version})");
download_file(
&download_url,
&setup_path,
None,
&dl_cfg.notifier,
dl_cfg.process,
)
.await?;
download_file(&download_url, &setup_path, None, None, dl_cfg.process).await?;

// Mark as executable
utils::make_executable(&setup_path)?;
Expand All @@ -1301,14 +1294,7 @@ async fn get_available_rustup_version(dl_cfg: &DownloadCfg<'_>) -> Result<String
let release_file_url = format!("{update_root}/release-stable.toml");
let release_file_url = utils::parse_url(&release_file_url)?;
let release_file = tempdir.path().join("release-stable.toml");
download_file(
&release_file_url,
&release_file,
None,
&dl_cfg.notifier,
dl_cfg.process,
)
.await?;
download_file(&release_file_url, &release_file, None, None, dl_cfg.process).await?;
let release_toml_str = utils::read_file("rustup release", &release_file)?;
let release_toml = toml::from_str::<RustupManifest>(&release_toml_str)
.context("unable to parse rustup release file")?;
Expand Down
2 changes: 1 addition & 1 deletion src/cli/self_update/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ pub(crate) async fn try_install_msvc(
&visual_studio_url,
&visual_studio,
None,
&dl_cfg.notifier,
None,
dl_cfg.process,
)
.await?;
Expand Down
4 changes: 1 addition & 3 deletions src/diskio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ use std::{fmt::Debug, fs::OpenOptions};

use anyhow::Result;

use crate::dist::download::Notifier;
use crate::process::Process;

/// Carries the implementation specific data for complete file transfers into the executor.
Expand Down Expand Up @@ -443,13 +442,12 @@ pub(crate) fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {

/// Get the executor for disk IO.
pub(crate) fn get_executor<'a>(
notifier: Option<&'a Notifier>,
ram_budget: usize,
process: &Process,
) -> anyhow::Result<Box<dyn Executor + 'a>> {
// If this gets lots of use, consider exposing via the config file.
Ok(match process.io_thread_count()? {
0 | 1 => Box::new(immediate::ImmediateUnpacker::new()),
n => Box::new(threaded::Threaded::new(notifier, n, ram_budget)),
n => Box::new(threaded::Threaded::new(n, ram_budget)),
})
}
4 changes: 2 additions & 2 deletions src/diskio/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn test_incremental_file(io_threads: &str) -> Result<()> {

let mut written = 0;
let mut file_finished = false;
let mut io_executor: Box<dyn Executor> = get_executor(None, 32 * 1024 * 1024, &tp.process)?;
let mut io_executor: Box<dyn Executor> = get_executor(32 * 1024 * 1024, &tp.process)?;
let (item, mut sender) = Item::write_file_segmented(
work_dir.path().join("scratch"),
0o666,
Expand Down Expand Up @@ -90,7 +90,7 @@ fn test_complete_file(io_threads: &str) -> Result<()> {
vars.insert("RUSTUP_IO_THREADS".to_string(), io_threads.to_string());
let tp = TestProcess::with_vars(vars);

let mut io_executor: Box<dyn Executor> = get_executor(None, 32 * 1024 * 1024, &tp.process)?;
let mut io_executor: Box<dyn Executor> = get_executor(32 * 1024 * 1024, &tp.process)?;
let mut chunk = io_executor.get_buffer(10);
chunk.extend(b"0123456789");
assert_eq!(chunk.len(), 10);
Expand Down
54 changes: 15 additions & 39 deletions src/diskio/threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use sharded_slab::pool::{OwnedRef, OwnedRefMut};
use tracing::debug;

use super::{CompletedIo, Executor, Item, perform};
use crate::dist::download::{Notification, Notifier};

#[derive(Copy, Clone, Debug, Enum)]
pub(crate) enum Bucket {
Expand Down Expand Up @@ -96,23 +95,18 @@ impl fmt::Debug for Pool {
}
}

pub(crate) struct Threaded<'a> {
pub(crate) struct Threaded {
n_files: Arc<AtomicUsize>,
pool: threadpool::ThreadPool,
notifier: Option<&'a Notifier>,
rx: Receiver<Task>,
tx: Sender<Task>,
vec_pools: EnumMap<Bucket, Pool>,
ram_budget: usize,
}

impl<'a> Threaded<'a> {
impl Threaded {
/// Construct a new Threaded executor.
pub(crate) fn new(
notify_handler: Option<&'a Notifier>,
thread_count: usize,
ram_budget: usize,
) -> Self {
pub(crate) fn new(thread_count: usize, ram_budget: usize) -> Self {
// Defaults to hardware thread count threads; this is suitable for
// our needs as IO bound operations tend to show up as write latencies
// rather than close latencies, so we don't need to look at
Expand Down Expand Up @@ -168,7 +162,6 @@ impl<'a> Threaded<'a> {
Self {
n_files: Arc::new(AtomicUsize::new(0)),
pool,
notifier: notify_handler,
rx,
tx,
vec_pools,
Expand Down Expand Up @@ -233,7 +226,7 @@ impl<'a> Threaded<'a> {
}
}

impl Executor for Threaded<'_> {
impl Executor for Threaded {
fn dispatch(&self, item: Item) -> Box<dyn Iterator<Item = CompletedIo> + '_> {
// Yield any completed work before accepting new work - keep memory
// pressure under control
Expand All @@ -260,39 +253,22 @@ impl Executor for Threaded<'_> {
// items, and the download tracker's progress is confounded with
// actual handling of data today, we synthesis a data buffer and
// pretend to have bytes to deliver.
let mut prev_files = self.n_files.load(Ordering::Relaxed);
if let Some(notifier) = self.notifier {
notifier.handle(Notification::DownloadFinished(None));
notifier.handle(Notification::DownloadContentLengthReceived(
prev_files as u64,
None,
));
}
let prev_files = self.n_files.load(Ordering::Relaxed);
if prev_files > 50 {
debug!("{prev_files} deferred IO operations");
}
let buf: Vec<u8> = vec![0; prev_files];

// Cheap wrap-around correctness check - we have 20k files, more than
// 32K means we subtracted from 0 somewhere.
assert!(32767 > prev_files);
let mut current_files = prev_files;
while current_files != 0 {
use std::thread::sleep;
sleep(std::time::Duration::from_millis(100));
prev_files = current_files;
current_files = self.n_files.load(Ordering::Relaxed);
let step_count = prev_files - current_files;
if let Some(notifier) = self.notifier {
notifier.handle(Notification::DownloadDataReceived(
&buf[0..step_count],
None,
));
}
}
self.pool.join();
if let Some(notifier) = self.notifier {
notifier.handle(Notification::DownloadFinished(None));
}

// close the feedback channel so that blocking reads on it can
// complete. send is atomic, and we know the threads completed from the
// pool join, so this is race-free. It is possible that try_iter is safe
Expand Down Expand Up @@ -352,19 +328,19 @@ impl Executor for Threaded<'_> {
}
}

impl Drop for Threaded<'_> {
impl Drop for Threaded {
fn drop(&mut self) {
// We are not permitted to fail - consume but do not handle the items.
self.join().for_each(drop);
}
}

struct JoinIterator<'a, 'b> {
executor: &'a Threaded<'b>,
struct JoinIterator<'a> {
executor: &'a Threaded,
consume_sentinel: bool,
}

impl JoinIterator<'_, '_> {
impl JoinIterator<'_> {
fn inner<T: Iterator<Item = Task>>(&self, mut iter: T) -> Option<CompletedIo> {
loop {
let task_o = iter.next();
Expand All @@ -388,7 +364,7 @@ impl JoinIterator<'_, '_> {
}
}

impl Iterator for JoinIterator<'_, '_> {
impl Iterator for JoinIterator<'_> {
type Item = CompletedIo;

fn next(&mut self) -> Option<CompletedIo> {
Expand All @@ -400,12 +376,12 @@ impl Iterator for JoinIterator<'_, '_> {
}
}

struct SubmitIterator<'a, 'b> {
executor: &'a Threaded<'b>,
struct SubmitIterator<'a> {
executor: &'a Threaded,
item: Cell<Option<Item>>,
}

impl Iterator for SubmitIterator<'_, '_> {
impl Iterator for SubmitIterator<'_> {
type Item = CompletedIo;

fn next(&mut self) -> Option<CompletedIo> {
Expand Down
39 changes: 16 additions & 23 deletions src/dist/component/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ use tracing::{error, trace, warn};
use crate::diskio::{CompletedIo, Executor, FileBuffer, IO_CHUNK_SIZE, Item, Kind, get_executor};
use crate::dist::component::components::*;
use crate::dist::component::transaction::*;
use crate::dist::download::Notifier;
use crate::dist::download::DownloadCfg;
use crate::dist::temp;
use crate::errors::*;
use crate::process::Process;
use crate::utils;
use crate::utils::units::Size;

Expand Down Expand Up @@ -143,13 +142,13 @@ impl Package for DirectoryPackage {
pub(crate) struct TarPackage(DirectoryPackage, temp::Dir);

impl TarPackage {
pub(crate) fn new<R: Read>(stream: R, cx: &PackageContext<'_>) -> Result<Self> {
let temp_dir = cx.tmp_cx.new_directory()?;
pub(crate) fn new<R: Read>(stream: R, dl_cfg: &DownloadCfg<'_>) -> Result<Self> {
let temp_dir = dl_cfg.tmp_cx.new_directory()?;
let mut archive = tar::Archive::new(stream);
// The rust-installer packages unpack to a directory called
// $pkgname-$version-$target. Skip that directory when
// unpacking.
unpack_without_first_dir(&mut archive, &temp_dir, cx)
unpack_without_first_dir(&mut archive, &temp_dir, dl_cfg)
.context("failed to extract package")?;

Ok(TarPackage(
Expand All @@ -163,7 +162,7 @@ impl TarPackage {
fn unpack_ram(
io_chunk_size: usize,
effective_max_ram: Option<usize>,
cx: &PackageContext<'_>,
dl_cfg: &DownloadCfg<'_>,
) -> usize {
const RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS: usize = 200 * 1024 * 1024;
let minimum_ram = io_chunk_size * 2;
Expand All @@ -177,7 +176,7 @@ fn unpack_ram(
// Rustup does not know how much RAM the machine has: use the minimum
minimum_ram
};
let unpack_ram = match cx
let unpack_ram = match dl_cfg
.process
.var("RUSTUP_UNPACK_RAM")
.ok()
Expand Down Expand Up @@ -289,7 +288,7 @@ enum DirStatus {
fn unpack_without_first_dir<R: Read>(
archive: &mut tar::Archive<R>,
path: &Path,
cx: &PackageContext<'_>,
dl_cfg: &DownloadCfg<'_>,
) -> Result<()> {
let entries = archive.entries()?;
let effective_max_ram = match effective_limits::memory_limit() {
Expand All @@ -299,8 +298,8 @@ fn unpack_without_first_dir<R: Read>(
None
}
};
let unpack_ram = unpack_ram(IO_CHUNK_SIZE, effective_max_ram, cx);
let mut io_executor: Box<dyn Executor> = get_executor(cx.notifier, unpack_ram, cx.process)?;
let unpack_ram = unpack_ram(IO_CHUNK_SIZE, effective_max_ram, dl_cfg);
let mut io_executor: Box<dyn Executor> = get_executor(unpack_ram, dl_cfg.process)?;

let mut directories: HashMap<PathBuf, DirStatus> = HashMap::new();
// Path is presumed to exist. Call it a precondition.
Expand Down Expand Up @@ -461,7 +460,7 @@ fn unpack_without_first_dir<R: Read>(
// Complain about this so we can see if these exist.
use std::io::Write as _;
writeln!(
cx.process.stderr().lock(),
dl_cfg.process.stderr().lock(),
"Unexpected: missing parent '{}' for '{}'",
parent.display(),
entry.path()?.display()
Expand Down Expand Up @@ -552,9 +551,9 @@ impl Package for TarPackage {
pub(crate) struct TarGzPackage(TarPackage);

impl TarGzPackage {
pub(crate) fn new<R: Read>(stream: R, cx: &PackageContext<'_>) -> Result<Self> {
pub(crate) fn new<R: Read>(stream: R, dl_cfg: &DownloadCfg<'_>) -> Result<Self> {
let stream = flate2::read::GzDecoder::new(stream);
Ok(TarGzPackage(TarPackage::new(stream, cx)?))
Ok(TarGzPackage(TarPackage::new(stream, dl_cfg)?))
}
}

Expand All @@ -580,9 +579,9 @@ impl Package for TarGzPackage {
pub(crate) struct TarXzPackage(TarPackage);

impl TarXzPackage {
pub(crate) fn new<R: Read>(stream: R, cx: &PackageContext<'_>) -> Result<Self> {
pub(crate) fn new<R: Read>(stream: R, dl_cfg: &DownloadCfg<'_>) -> Result<Self> {
let stream = xz2::read::XzDecoder::new(stream);
Ok(TarXzPackage(TarPackage::new(stream, cx)?))
Ok(TarXzPackage(TarPackage::new(stream, dl_cfg)?))
}
}

Expand All @@ -608,9 +607,9 @@ impl Package for TarXzPackage {
pub(crate) struct TarZStdPackage(TarPackage);

impl TarZStdPackage {
pub(crate) fn new<R: Read>(stream: R, cx: &PackageContext<'_>) -> Result<Self> {
pub(crate) fn new<R: Read>(stream: R, dl_cfg: &DownloadCfg<'_>) -> Result<Self> {
let stream = zstd::stream::read::Decoder::new(stream)?;
Ok(TarZStdPackage(TarPackage::new(stream, cx)?))
Ok(TarZStdPackage(TarPackage::new(stream, dl_cfg)?))
}
}

Expand All @@ -631,9 +630,3 @@ impl Package for TarZStdPackage {
self.0.components()
}
}

pub(crate) struct PackageContext<'a> {
pub(crate) tmp_cx: &'a temp::Context,
pub(crate) notifier: Option<&'a Notifier>,
pub(crate) process: &'a Process,
}
Loading