Skip to content

Commit 5e23f1a

Browse files
jjbayerloewenheim
andauthored
feat(store): Scaffolding for the new attachment upload service (#5341)
Co-authored-by: Sebastian Zivota <[email protected]>
1 parent f9df94b commit 5e23f1a

File tree

7 files changed

+353
-3
lines changed

7 files changed

+353
-3
lines changed

relay-config/src/config.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1173,6 +1173,9 @@ pub struct Processing {
11731173
/// Maximum rate limit to report to clients.
11741174
#[serde(default = "default_max_rate_limit")]
11751175
pub max_rate_limit: Option<u32>,
1176+
/// Configuration for attachment uploads.
1177+
#[serde(default)]
1178+
pub upload: UploadServiceConfig,
11761179
}
11771180

11781181
impl Default for Processing {
@@ -1191,6 +1194,7 @@ impl Default for Processing {
11911194
attachment_chunk_size: default_chunk_size(),
11921195
projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
11931196
max_rate_limit: default_max_rate_limit(),
1197+
upload: UploadServiceConfig::default(),
11941198
}
11951199
}
11961200
}
@@ -1239,6 +1243,26 @@ impl Default for OutcomeAggregatorConfig {
12391243
}
12401244
}
12411245

1246+
/// Configuration values for attachment uploads.
1247+
#[derive(Serialize, Deserialize, Debug)]
1248+
#[serde(default)]
1249+
pub struct UploadServiceConfig {
1250+
/// Maximum concurrency of uploads.
1251+
pub max_concurrent_requests: usize,
1252+
1253+
/// Maximum duration of an attachment upload in seconds. Uploads that take longer are discarded.
1254+
pub timeout: u64,
1255+
}
1256+
1257+
impl Default for UploadServiceConfig {
1258+
fn default() -> Self {
1259+
Self {
1260+
max_concurrent_requests: 100,
1261+
timeout: 60,
1262+
}
1263+
}
1264+
}
1265+
12421266
/// Determines how to emit outcomes.
12431267
/// For compatibility reasons, this can either be true, false or AsClientReports
12441268
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@@ -2493,6 +2517,11 @@ impl Config {
24932517
&self.values.processing.topics.unused
24942518
}
24952519

2520+
/// Configuration of the attachment upload service.
2521+
pub fn upload(&self) -> &UploadServiceConfig {
2522+
&self.values.processing.upload
2523+
}
2524+
24962525
/// Redis servers to connect to for project configs, cardinality limits,
24972526
/// rate limiting, and metrics metadata.
24982527
pub fn redis(&self) -> Option<RedisConfigsRef<'_>> {

relay-server/src/service.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ use crate::services::relays::{RelayCache, RelayCacheService};
2525
use crate::services::stats::RelayStats;
2626
#[cfg(feature = "processing")]
2727
use crate::services::store::{StoreService, StoreServicePool};
28+
#[cfg(feature = "processing")]
29+
use crate::services::upload::UploadService;
2830
use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
2931
use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
3032
#[cfg(feature = "processing")]
@@ -222,12 +224,14 @@ impl ServiceState {
222224
let store = config
223225
.processing_enabled()
224226
.then(|| {
227+
let upload = services.start(UploadService::new(config.upload()));
225228
StoreService::create(
226229
store_pool.clone(),
227230
config.clone(),
228231
global_config_handle.clone(),
229232
outcome_aggregator.clone(),
230233
metric_outcomes.clone(),
234+
upload,
231235
)
232236
.map(|s| services.start(s))
233237
})

relay-server/src/services/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
//! Controller::run(|| Server::start())
2828
//! .expect("failed to start relay");
2929
//! ```
30+
pub mod autoscaling;
3031
pub mod buffer;
3132
pub mod cogs;
3233
pub mod global_config;
@@ -42,8 +43,9 @@ pub mod proxy_processor;
4243
pub mod relays;
4344
pub mod server;
4445
pub mod stats;
45-
pub mod upstream;
46-
47-
pub mod autoscaling;
4846
#[cfg(feature = "processing")]
4947
pub mod store;
48+
#[cfg(feature = "processing")]
49+
#[expect(unused)]
50+
pub mod upload;
51+
pub mod upstream;

relay-server/src/services/store.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use crate::service::ServiceError;
4040
use crate::services::global_config::GlobalConfigHandle;
4141
use crate::services::outcome::{DiscardItemType, DiscardReason, Outcome, TrackOutcome};
4242
use crate::services::processor::Processed;
43+
use crate::services::upload::Upload;
4344
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
4445
use crate::utils::{self, FormDataIter};
4546

@@ -215,6 +216,8 @@ pub struct StoreService {
215216
global_config: GlobalConfigHandle,
216217
outcome_aggregator: Addr<TrackOutcome>,
217218
metric_outcomes: MetricOutcomes,
219+
#[expect(unused)]
220+
upload: Addr<Upload>,
218221
producer: Producer,
219222
}
220223

@@ -225,6 +228,7 @@ impl StoreService {
225228
global_config: GlobalConfigHandle,
226229
outcome_aggregator: Addr<TrackOutcome>,
227230
metric_outcomes: MetricOutcomes,
231+
upload: Addr<Upload>,
228232
) -> anyhow::Result<Self> {
229233
let producer = Producer::create(&config)?;
230234
Ok(Self {
@@ -233,6 +237,7 @@ impl StoreService {
233237
global_config,
234238
outcome_aggregator,
235239
metric_outcomes,
240+
upload,
236241
producer,
237242
})
238243
}

0 commit comments

Comments
 (0)