@@ -2,6 +2,7 @@ use std::{borrow::Cow, future::Future, mem::replace, panic, pin::Pin};
22
33use anyhow:: { anyhow, Result } ;
44use auto_hash_map:: AutoSet ;
5+ use futures:: StreamExt ;
56use parking_lot:: Mutex ;
67use rustc_hash:: FxHashSet ;
78use tracing:: { Instrument , Span } ;
@@ -17,6 +18,8 @@ use crate::{
1718 CollectiblesSource , NonLocalValue , ReadRef , ResolvedVc , TryJoinIterExt ,
1819} ;
1920
21+ const APPLY_EFFECTS_CONCURRENCY_LIMIT : usize = 1024 ;
22+
2023/// A trait to emit a task effect as collectible. This trait only has one
2124/// implementation, `EffectInstance` and no other implementation is allowed.
2225/// The trait is private to this module so that no other implementation can be
@@ -168,16 +171,23 @@ pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> {
168171 }
169172 let span = tracing:: info_span!( "apply effects" , count = effects. len( ) ) ;
170173 async move {
171- effects
174+ let effects = effects
172175 . into_iter ( )
173176 . map ( async |effect| {
174177 let Some ( effect) = ResolvedVc :: try_downcast_type :: < EffectInstance > ( effect) else {
175178 panic ! ( "Effect must only be implemented by EffectInstance" ) ;
176179 } ;
177180 effect. await ?. apply ( ) . await
178181 } )
179- . try_join ( )
180- . await ?;
182+ // TODO remove this collect(), but rust was not happy with it...
183+ . collect :: < Vec < _ > > ( ) ;
184+ // Limit the concurrency of effects,
185+ // run them all even if an error occurs,
186+ // report the first error.
187+ let mut results = futures:: stream:: iter ( effects) . buffered ( APPLY_EFFECTS_CONCURRENCY_LIMIT ) ;
188+ while let Some ( result) = results. next ( ) . await {
189+ result?;
190+ }
181191 Ok ( ( ) )
182192 }
183193 . instrument ( span)
@@ -251,11 +261,20 @@ impl Effects {
251261 pub async fn apply ( & self ) -> Result < ( ) > {
252262 let span = tracing:: info_span!( "apply effects" , count = self . effects. len( ) ) ;
253263 async move {
254- self . effects
264+ let effects = self
265+ . effects
255266 . iter ( )
256267 . map ( async |effect| effect. apply ( ) . await )
257- . try_join ( )
258- . await ?;
268+ // TODO remove this collect(), but rust was not happy with it...
269+ . collect :: < Vec < _ > > ( ) ;
270+ // Limit the concurrency of effects,
271+ // run them all even if an error occurs,
272+ // report the first error.
273+ let mut results =
274+ futures:: stream:: iter ( effects) . buffered ( APPLY_EFFECTS_CONCURRENCY_LIMIT ) ;
275+ while let Some ( result) = results. next ( ) . await {
276+ result?;
277+ }
259278 Ok ( ( ) )
260279 }
261280 . instrument ( span)
0 commit comments