@@ -2,6 +2,7 @@ use std::{borrow::Cow, future::Future, mem::replace, panic, pin::Pin};
22
33use  anyhow:: { anyhow,  Result } ; 
44use  auto_hash_map:: AutoSet ; 
5+ use  futures:: StreamExt ; 
56use  parking_lot:: Mutex ; 
67use  rustc_hash:: FxHashSet ; 
78use  tracing:: { Instrument ,  Span } ; 
@@ -17,6 +18,8 @@ use crate::{
1718    CollectiblesSource ,  NonLocalValue ,  ReadRef ,  ResolvedVc ,  TryJoinIterExt , 
1819} ; 
1920
21+ const  APPLY_EFFECTS_CONCURRENCY_LIMIT :  usize  = 1024 ; 
22+ 
2023/// A trait to emit a task effect as collectible. This trait only has one 
2124/// implementation, `EffectInstance` and no other implementation is allowed. 
2225/// The trait is private to this module so that no other implementation can be 
@@ -168,16 +171,23 @@ pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> {
168171    } 
169172    let  span = tracing:: info_span!( "apply effects" ,  count = effects. len( ) ) ; 
170173    async  move  { 
171-         effects
174+         let  effects =  effects
172175            . into_iter ( ) 
173176            . map ( async  |effect| { 
174177                let  Some ( effect)  = ResolvedVc :: try_downcast_type :: < EffectInstance > ( effect)  else  { 
175178                    panic ! ( "Effect must only be implemented by EffectInstance" ) ; 
176179                } ; 
177180                effect. await ?. apply ( ) . await 
178181            } ) 
179-             . try_join ( ) 
180-             . await ?; 
182+             // TODO remove this collect(), but rust was not happy with it... 
183+             . collect :: < Vec < _ > > ( ) ; 
184+         // Limit the concurrency of effects, 
185+         // run them all even if an error occurs, 
186+         // report the first error. 
187+         let  mut  results = futures:: stream:: iter ( effects) . buffered ( APPLY_EFFECTS_CONCURRENCY_LIMIT ) ; 
188+         while  let  Some ( result)  = results. next ( ) . await  { 
189+             result?; 
190+         } 
181191        Ok ( ( ) ) 
182192    } 
183193    . instrument ( span) 
@@ -251,11 +261,20 @@ impl Effects {
251261pub  async  fn  apply ( & self )  -> Result < ( ) >  { 
252262        let  span = tracing:: info_span!( "apply effects" ,  count = self . effects. len( ) ) ; 
253263        async  move  { 
254-             self . effects 
264+             let  effects = self 
265+                 . effects 
255266                . iter ( ) 
256267                . map ( async  |effect| effect. apply ( ) . await ) 
257-                 . try_join ( ) 
258-                 . await ?; 
268+                 // TODO remove this collect(), but rust was not happy with it... 
269+                 . collect :: < Vec < _ > > ( ) ; 
270+             // Limit the concurrency of effects, 
271+             // run them all even if an error occurs, 
272+             // report the first error. 
273+             let  mut  results =
274+                 futures:: stream:: iter ( effects) . buffered ( APPLY_EFFECTS_CONCURRENCY_LIMIT ) ; 
275+             while  let  Some ( result)  = results. next ( ) . await  { 
276+                 result?; 
277+             } 
259278            Ok ( ( ) ) 
260279        } 
261280        . instrument ( span) 
0 commit comments