@@ -2,6 +2,7 @@ use std::{borrow::Cow, future::Future, mem::replace, panic, pin::Pin};
2
2
3
3
use anyhow:: { anyhow, Result } ;
4
4
use auto_hash_map:: AutoSet ;
5
+ use futures:: StreamExt ;
5
6
use parking_lot:: Mutex ;
6
7
use rustc_hash:: FxHashSet ;
7
8
use tracing:: { Instrument , Span } ;
@@ -17,6 +18,8 @@ use crate::{
17
18
CollectiblesSource , NonLocalValue , ReadRef , ResolvedVc , TryJoinIterExt ,
18
19
} ;
19
20
21
+ const APPLY_EFFECTS_CONCURRENCY_LIMIT : usize = 1024 ;
22
+
20
23
/// A trait to emit a task effect as collectible. This trait only has one
21
24
/// implementation, `EffectInstance` and no other implementation is allowed.
22
25
/// The trait is private to this module so that no other implementation can be
@@ -168,16 +171,23 @@ pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> {
168
171
}
169
172
let span = tracing:: info_span!( "apply effects" , count = effects. len( ) ) ;
170
173
async move {
171
- effects
174
+ let effects = effects
172
175
. into_iter ( )
173
176
. map ( async |effect| {
174
177
let Some ( effect) = ResolvedVc :: try_downcast_type :: < EffectInstance > ( effect) else {
175
178
panic ! ( "Effect must only be implemented by EffectInstance" ) ;
176
179
} ;
177
180
effect. await ?. apply ( ) . await
178
181
} )
179
- . try_join ( )
180
- . await ?;
182
+ // TODO remove this collect(), but rust was not happy with it...
183
+ . collect :: < Vec < _ > > ( ) ;
184
+ // Limit the concurrency of effects,
185
+ // run them all even if an error occurs,
186
+ // report the first error.
187
+ let mut results = futures:: stream:: iter ( effects) . buffered ( APPLY_EFFECTS_CONCURRENCY_LIMIT ) ;
188
+ while let Some ( result) = results. next ( ) . await {
189
+ result?;
190
+ }
181
191
Ok ( ( ) )
182
192
}
183
193
. instrument ( span)
@@ -251,11 +261,20 @@ impl Effects {
251
261
pub async fn apply ( & self ) -> Result < ( ) > {
252
262
let span = tracing:: info_span!( "apply effects" , count = self . effects. len( ) ) ;
253
263
async move {
254
- self . effects
264
+ let effects = self
265
+ . effects
255
266
. iter ( )
256
267
. map ( async |effect| effect. apply ( ) . await )
257
- . try_join ( )
258
- . await ?;
268
+ // TODO remove this collect(), but rust was not happy with it...
269
+ . collect :: < Vec < _ > > ( ) ;
270
+ // Limit the concurrency of effects,
271
+ // run them all even if an error occurs,
272
+ // report the first error.
273
+ let mut results =
274
+ futures:: stream:: iter ( effects) . buffered ( APPLY_EFFECTS_CONCURRENCY_LIMIT ) ;
275
+ while let Some ( result) = results. next ( ) . await {
276
+ result?;
277
+ }
259
278
Ok ( ( ) )
260
279
}
261
280
. instrument ( span)
0 commit comments