@@ -2,7 +2,7 @@ use std::{borrow::Cow, future::Future, mem::replace, panic, pin::Pin};
22
33use anyhow:: { anyhow, Result } ;
44use auto_hash_map:: AutoSet ;
5- use futures:: StreamExt ;
5+ use futures:: { StreamExt , TryStreamExt } ;
66use parking_lot:: Mutex ;
77use rustc_hash:: FxHashSet ;
88use tracing:: { Instrument , Span } ;
@@ -171,24 +171,16 @@ pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> {
171171 }
172172 let span = tracing:: info_span!( "apply effects" , count = effects. len( ) ) ;
173173 async move {
174- let effects = effects
175- . into_iter ( )
176- . map ( async |effect| {
174+ // Limit the concurrency of effects
175+ futures:: stream:: iter ( effects)
176+ . map ( Ok )
177+ . try_for_each_concurrent ( APPLY_EFFECTS_CONCURRENCY_LIMIT , async |effect| {
177178 let Some ( effect) = ResolvedVc :: try_downcast_type :: < EffectInstance > ( effect) else {
178179 panic ! ( "Effect must only be implemented by EffectInstance" ) ;
179180 } ;
180181 effect. await ?. apply ( ) . await
181182 } )
182- // TODO remove this collect(), but rust was not happy with it...
183- . collect :: < Vec < _ > > ( ) ;
184- // Limit the concurrency of effects,
185- // run them all even if an error occurs,
186- // report the first error.
187- let mut results = futures:: stream:: iter ( effects) . buffered ( APPLY_EFFECTS_CONCURRENCY_LIMIT ) ;
188- while let Some ( result) = results. next ( ) . await {
189- result?;
190- }
191- Ok ( ( ) )
183+ . await
192184 }
193185 . instrument ( span)
194186 . await
@@ -261,21 +253,13 @@ impl Effects {
261253 pub async fn apply ( & self ) -> Result < ( ) > {
262254 let span = tracing:: info_span!( "apply effects" , count = self . effects. len( ) ) ;
263255 async move {
264- let effects = self
265- . effects
266- . iter ( )
267- . map ( async |effect| effect. apply ( ) . await )
268- // TODO remove this collect(), but rust was not happy with it...
269- . collect :: < Vec < _ > > ( ) ;
270- // Limit the concurrency of effects,
271- // run them all even if an error occurs,
272- // report the first error.
273- let mut results =
274- futures:: stream:: iter ( effects) . buffered ( APPLY_EFFECTS_CONCURRENCY_LIMIT ) ;
275- while let Some ( result) = results. next ( ) . await {
276- result?;
277- }
278- Ok ( ( ) )
256+ // Limit the concurrency of effects
257+ futures:: stream:: iter ( self . effects . iter ( ) )
258+ . map ( Ok )
259+ . try_for_each_concurrent ( APPLY_EFFECTS_CONCURRENCY_LIMIT , async |effect| {
260+ effect. apply ( ) . await
261+ } )
262+ . await
279263 }
280264 . instrument ( span)
281265 . await
0 commit comments