Skip to content

Commit da3e3fe

Browse files
committed
add concurrency limit to effects
1 parent 1c1d2ac commit da3e3fe

File tree

1 file changed

+25
-6
lines changed

1 file changed

+25
-6
lines changed

turbopack/crates/turbo-tasks/src/effect.rs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::{borrow::Cow, future::Future, mem::replace, panic, pin::Pin};
22

33
use anyhow::{anyhow, Result};
44
use auto_hash_map::AutoSet;
5+
use futures::StreamExt;
56
use parking_lot::Mutex;
67
use rustc_hash::FxHashSet;
78
use tracing::{Instrument, Span};
@@ -17,6 +18,8 @@ use crate::{
1718
CollectiblesSource, NonLocalValue, ReadRef, ResolvedVc, TryJoinIterExt,
1819
};
1920

21+
const APPLY_EFFECTS_CONCURRENCY_LIMIT: usize = 1024;
22+
2023
/// A trait to emit a task effect as collectible. This trait only has one
2124
/// implementation, `EffectInstance` and no other implementation is allowed.
2225
/// The trait is private to this module so that no other implementation can be
@@ -168,16 +171,23 @@ pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> {
168171
}
169172
let span = tracing::info_span!("apply effects", count = effects.len());
170173
async move {
171-
effects
174+
let effects = effects
172175
.into_iter()
173176
.map(async |effect| {
174177
let Some(effect) = ResolvedVc::try_downcast_type::<EffectInstance>(effect) else {
175178
panic!("Effect must only be implemented by EffectInstance");
176179
};
177180
effect.await?.apply().await
178181
})
179-
.try_join()
180-
.await?;
182+
// TODO remove this collect(), but rust was not happy with it...
183+
.collect::<Vec<_>>();
184+
// Limit the concurrency of effects,
185+
// run them all even if an error occurs,
186+
// report the first error.
187+
let mut results = futures::stream::iter(effects).buffered(APPLY_EFFECTS_CONCURRENCY_LIMIT);
188+
while let Some(result) = results.next().await {
189+
result?;
190+
}
181191
Ok(())
182192
}
183193
.instrument(span)
@@ -251,11 +261,20 @@ impl Effects {
251261
pub async fn apply(&self) -> Result<()> {
252262
let span = tracing::info_span!("apply effects", count = self.effects.len());
253263
async move {
254-
self.effects
264+
let effects = self
265+
.effects
255266
.iter()
256267
.map(async |effect| effect.apply().await)
257-
.try_join()
258-
.await?;
268+
// TODO remove this collect(), but rust was not happy with it...
269+
.collect::<Vec<_>>();
270+
// Limit the concurrency of effects,
271+
// run them all even if an error occurs,
272+
// report the first error.
273+
let mut results =
274+
futures::stream::iter(effects).buffered(APPLY_EFFECTS_CONCURRENCY_LIMIT);
275+
while let Some(result) = results.next().await {
276+
result?;
277+
}
259278
Ok(())
260279
}
261280
.instrument(span)

0 commit comments

Comments
 (0)