@@ -7,9 +7,7 @@ mod tests;
77use  std:: path:: Path ; 
88
99use  anyhow:: { Context ,  Result ,  anyhow,  bail} ; 
10- use  futures_util:: stream:: StreamExt ; 
11- use  std:: sync:: Arc ; 
12- use  tokio:: sync:: Semaphore ; 
10+ use  futures_util:: stream:: { FuturesUnordered ,  StreamExt } ; 
1311use  tracing:: { info,  warn} ; 
1412
1513use  crate :: dist:: component:: { Components ,  DirectoryPackage ,  Transaction } ; 
@@ -151,8 +149,6 @@ impl Manifestation {
151149        } 
152150
153151        // Download component packages and validate hashes 
154-         let  mut  things_to_install = Vec :: new ( ) ; 
155-         let  mut  things_downloaded = Vec :: new ( ) ; 
156152        let  components = update
157153            . components_urls_and_hashes ( new_manifest) 
158154            . map ( |res| { 
@@ -166,7 +162,6 @@ impl Manifestation {
166162            } ) 
167163            . collect :: < Result < Vec < _ > > > ( ) ?; 
168164
169-         let  components_len = components. len ( ) ; 
170165        const  DEFAULT_CONCURRENT_DOWNLOADS :  usize  = 2 ; 
171166        let  concurrent_downloads = download_cfg
172167            . process 
@@ -181,27 +176,6 @@ impl Manifestation {
181176            . and_then ( |s| s. parse ( ) . ok ( ) ) 
182177            . unwrap_or ( DEFAULT_MAX_RETRIES ) ; 
183178
184-         info ! ( "downloading component(s)" ) ; 
185-         let  semaphore = Arc :: new ( Semaphore :: new ( concurrent_downloads) ) ; 
186-         let  component_stream = tokio_stream:: iter ( components. into_iter ( ) ) . map ( |bin| { 
187-             let  sem = semaphore. clone ( ) ; 
188-             async  move  { 
189-                 let  _permit = sem. acquire ( ) . await . unwrap ( ) ; 
190-                 bin. download ( max_retries) . await 
191-             } 
192-         } ) ; 
193-         if  components_len > 0  { 
194-             let  results = component_stream
195-                 . buffered ( components_len) 
196-                 . collect :: < Vec < _ > > ( ) 
197-                 . await ; 
198-             for  result in  results { 
199-                 let  ( bin,  downloaded_file)  = result?; 
200-                 things_downloaded. push ( bin. binary . hash . clone ( ) ) ; 
201-                 things_to_install. push ( ( bin,  downloaded_file) ) ; 
202-             } 
203-         } 
204- 
205179        // Begin transaction 
206180        let  mut  tx = Transaction :: new ( prefix. clone ( ) ,  tmp_cx,  download_cfg. process ) ; 
207181
@@ -240,15 +214,38 @@ impl Manifestation {
240214            tx = self . uninstall_component ( component,  new_manifest,  tx,  download_cfg. process ) ?; 
241215        } 
242216
243-         // Install components 
244-         for  ( component_bin,  installer_file)  in  things_to_install { 
245-             tx = component_bin. install ( installer_file,  tx,  self ) ?; 
217+         let  mut  downloads = FuturesUnordered :: new ( ) ; 
218+         let  mut  component_iter = components. iter ( ) ; 
219+         let  mut  cleanup_downloads = vec ! [ ] ; 
220+         loop  { 
221+             if  downloads. is_empty ( )  && component_iter. len ( )  == 0  { 
222+                 break ; 
223+             } 
224+ 
225+             let  installable = match  downloads. next ( ) . await  { 
226+                 Some ( Ok ( ( bin,  downloaded) ) )  => Some ( ( bin,  downloaded) ) , 
227+                 Some ( Err ( e) )  => return  Err ( e) , 
228+                 None  => None , 
229+             } ; 
230+ 
231+             while  component_iter. len ( )  > 0  && downloads. len ( )  < concurrent_downloads { 
232+                 if  let  Some ( bin)  = component_iter. next ( )  { 
233+                     downloads. push ( bin. download ( max_retries) ) ; 
234+                 } 
235+             } 
236+ 
237+             if  let  Some ( ( bin,  downloaded) )  = installable { 
238+                 cleanup_downloads. push ( & bin. binary . hash ) ; 
239+                 tx = bin. install ( downloaded,  tx,  self ) ?; 
240+             } 
246241        } 
247242
248243        // Install new distribution manifest 
249244        let  new_manifest_str = new_manifest. clone ( ) . stringify ( ) ?; 
250245        tx. modify_file ( rel_installed_manifest_path) ?; 
251246        utils:: write_file ( "manifest" ,  & installed_manifest_path,  & new_manifest_str) ?; 
247+         download_cfg. clean ( & cleanup_downloads) ?; 
248+         drop ( downloads) ; 
252249
253250        // Write configuration. 
254251        // 
@@ -269,8 +266,6 @@ impl Manifestation {
269266        // End transaction 
270267        tx. commit ( ) ; 
271268
272-         download_cfg. clean ( & things_downloaded) ?; 
273- 
274269        Ok ( UpdateStatus :: Changed ) 
275270    } 
276271
@@ -684,7 +679,7 @@ struct ComponentBinary<'a> {
684679} 
685680
686681impl < ' a >  ComponentBinary < ' a >  { 
687-     async  fn  download ( self ,  max_retries :  usize )  -> Result < ( Self ,  File ) >  { 
682+     async  fn  download ( & self ,  max_retries :  usize )  -> Result < ( & Self ,  File ) >  { 
688683        use  tokio_retry:: { RetryIf ,  strategy:: FixedInterval } ; 
689684
690685        let  url = self . download_cfg . url ( & self . binary . url ) ?; 
0 commit comments