Skip to content

Commit 9d71fe4

Browse files
authored
Turbopack: add concurrency limit to effects (#78725)
### What? This avoids trying to apply too many effect in parallel.
1 parent 1c1d2ac commit 9d71fe4

File tree

1 file changed

+15
-12
lines changed

1 file changed

+15
-12
lines changed

turbopack/crates/turbo-tasks/src/effect.rs

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::{borrow::Cow, future::Future, mem::replace, panic, pin::Pin};
22

33
use anyhow::{anyhow, Result};
44
use auto_hash_map::AutoSet;
5+
use futures::{StreamExt, TryStreamExt};
56
use parking_lot::Mutex;
67
use rustc_hash::FxHashSet;
78
use tracing::{Instrument, Span};
@@ -17,6 +18,8 @@ use crate::{
1718
CollectiblesSource, NonLocalValue, ReadRef, ResolvedVc, TryJoinIterExt,
1819
};
1920

21+
const APPLY_EFFECTS_CONCURRENCY_LIMIT: usize = 1024;
22+
2023
/// A trait to emit a task effect as collectible. This trait only has one
2124
/// implementation, `EffectInstance` and no other implementation is allowed.
2225
/// The trait is private to this module so that no other implementation can be
@@ -168,17 +171,16 @@ pub async fn apply_effects(source: impl CollectiblesSource) -> Result<()> {
168171
}
169172
let span = tracing::info_span!("apply effects", count = effects.len());
170173
async move {
171-
effects
172-
.into_iter()
173-
.map(async |effect| {
174+
// Limit the concurrency of effects
175+
futures::stream::iter(effects)
176+
.map(Ok)
177+
.try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| {
174178
let Some(effect) = ResolvedVc::try_downcast_type::<EffectInstance>(effect) else {
175179
panic!("Effect must only be implemented by EffectInstance");
176180
};
177181
effect.await?.apply().await
178182
})
179-
.try_join()
180-
.await?;
181-
Ok(())
183+
.await
182184
}
183185
.instrument(span)
184186
.await
@@ -251,12 +253,13 @@ impl Effects {
251253
pub async fn apply(&self) -> Result<()> {
252254
let span = tracing::info_span!("apply effects", count = self.effects.len());
253255
async move {
254-
self.effects
255-
.iter()
256-
.map(async |effect| effect.apply().await)
257-
.try_join()
258-
.await?;
259-
Ok(())
256+
// Limit the concurrency of effects
257+
futures::stream::iter(self.effects.iter())
258+
.map(Ok)
259+
.try_for_each_concurrent(APPLY_EFFECTS_CONCURRENCY_LIMIT, async |effect| {
260+
effect.apply().await
261+
})
262+
.await
260263
}
261264
.instrument(span)
262265
.await

0 commit comments

Comments
 (0)