Skip to content

Commit b2a55bf

Browse files
fix(iroh): improve reusing of QAD reports (#3512)
## Description This splits v4 vs v6 QAD report reuse between runs, to reduce traffic and overall work
1 parent b29c158 commit b2a55bf

File tree

2 files changed

+167
-107
lines changed

2 files changed

+167
-107
lines changed

iroh/src/net_report.rs

Lines changed: 161 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -87,19 +87,8 @@ pub use self::{
8787
report::{RelayLatencies, Report},
8888
reportgen::QuicConfig,
8989
};
90-
use crate::util::MaybeFuture;
9190

9291
const FULL_REPORT_INTERVAL: Duration = Duration::from_secs(5 * 60);
93-
94-
/// The maximum latency of all nodes, if none are found yet.
95-
///
96-
/// Normally the max latency of all nodes is computed, but if we don't yet know any nodes
97-
/// latencies we return this as default. This is the value of the initial QAD probe
98-
/// delays. It is only used as time to wait for further latencies to arrive, which *should*
99-
/// never happen unless there already is at least one latency. Yet here we are, defining a
100-
/// default which will never be used.
101-
const DEFAULT_MAX_LATENCY: Duration = Duration::from_millis(100);
102-
10392
const ENOUGH_NODES: usize = 3;
10493

10594
/// Client to run net_reports.
@@ -141,25 +130,26 @@ impl QadConns {
141130
}
142131
}
143132

144-
fn current(&self) -> Vec<ProbeReport> {
145-
let mut reports = Vec::new();
133+
fn current_v4(&self) -> Option<ProbeReport> {
146134
if let Some((_, ref conn)) = self.v4 {
147135
if let Some(mut r) = conn.observer.get() {
148136
// grab latest rtt
149137
r.latency = conn.conn.rtt();
150-
reports.push(ProbeReport::QadIpv4(r));
138+
return Some(ProbeReport::QadIpv4(r));
151139
}
152140
}
141+
None
142+
}
153143

144+
fn current_v6(&self) -> Option<ProbeReport> {
154145
if let Some((_, ref conn)) = self.v6 {
155146
if let Some(mut r) = conn.observer.get() {
156147
// grab latest rtt
157148
r.latency = conn.conn.rtt();
158-
reports.push(ProbeReport::QadIpv6(r));
149+
return Some(ProbeReport::QadIpv6(r));
159150
}
160151
}
161-
162-
reports
152+
None
163153
}
164154

165155
fn watch_v4(&self) -> impl n0_future::Stream<Item = Option<QadProbeReport>> + Unpin + use<> {
@@ -282,7 +272,8 @@ impl Client {
282272
}
283273
self.metrics.reports.inc();
284274

285-
let enough_relays = std::cmp::min(self.relay_map.len(), ENOUGH_NODES);
275+
let num_relays = self.relay_map.len();
276+
let enough_relays = std::cmp::min(num_relays, ENOUGH_NODES);
286277
#[cfg(wasm_browser)]
287278
let if_state = IfStateDetails::default();
288279
#[cfg(not(wasm_browser))]
@@ -315,71 +306,69 @@ impl Client {
315306
report.update(&r);
316307
}
317308

318-
let mut timeout_fut = std::pin::pin!(MaybeFuture::default());
319-
320-
#[cfg(not(wasm_browser))]
321-
let mut qad_v4_stream = self.qad_conns.watch_v4();
322-
#[cfg(wasm_browser)]
323-
let mut qad_v4_stream = n0_future::stream::empty::<Option<()>>();
324-
#[cfg(not(wasm_browser))]
325-
let mut qad_v6_stream = self.qad_conns.watch_v6();
326-
#[cfg(wasm_browser)]
327-
let mut qad_v6_stream = n0_future::stream::empty::<Option<()>>();
328-
329-
loop {
330-
tokio::select! {
331-
biased;
309+
if self.have_enough_reports(&if_state, do_full, num_relays, &report) {
310+
// check if we already have enough probes immediately after QAD returns
311+
trace!("have enough probe reports, aborting further probes");
312+
// shuts down the probes
313+
drop(actor);
314+
} else {
315+
#[cfg(not(wasm_browser))]
316+
let mut qad_v4_stream = self.qad_conns.watch_v4();
317+
#[cfg(wasm_browser)]
318+
let mut qad_v4_stream = n0_future::stream::empty::<Option<()>>();
319+
#[cfg(not(wasm_browser))]
320+
let mut qad_v6_stream = self.qad_conns.watch_v6();
321+
#[cfg(wasm_browser)]
322+
let mut qad_v6_stream = n0_future::stream::empty::<Option<()>>();
332323

333-
_ = &mut timeout_fut, if timeout_fut.is_some() => {
334-
trace!("timeout done, shutting down");
335-
drop(actor); // shuts down the probes
336-
break;
337-
}
324+
loop {
325+
tokio::select! {
326+
biased;
338327

339-
Some(Some(r)) = qad_v4_stream.next() => {
340-
#[cfg(not(wasm_browser))]
341-
{
342-
trace!(?r, "new report from QAD V4");
343-
report.update(&ProbeReport::QadIpv4(r));
328+
Some(Some(r)) = qad_v4_stream.next() => {
329+
#[cfg(not(wasm_browser))]
330+
{
331+
trace!(?r, "new report from QAD V4");
332+
report.update(&ProbeReport::QadIpv4(r));
333+
}
344334
}
345-
}
346335

347-
Some(Some(r)) = qad_v6_stream.next() => {
348-
#[cfg(not(wasm_browser))]
349-
{
350-
trace!(?r, "new report from QAD V6");
351-
report.update(&ProbeReport::QadIpv6(r));
336+
Some(Some(r)) = qad_v6_stream.next() => {
337+
#[cfg(not(wasm_browser))]
338+
{
339+
trace!(?r, "new report from QAD V6");
340+
report.update(&ProbeReport::QadIpv6(r));
341+
}
352342
}
353-
}
354343

355-
maybe_probe = probe_rx.recv() => {
356-
let Some(probe_res) = maybe_probe else {
357-
break;
358-
};
359-
trace!(?probe_res, "handling probe");
360-
match probe_res {
361-
ProbeFinished::Regular(probe) => match probe {
362-
Ok(probe) => {
363-
report.update(&probe);
364-
if timeout_fut.is_none() {
365-
if let Some(timeout) = self.have_enough_reports(enough_relays, &report) {
366-
timeout_fut.as_mut().set_future(time::sleep(timeout));
344+
maybe_probe = probe_rx.recv() => {
345+
let Some(probe_res) = maybe_probe else {
346+
break;
347+
};
348+
match probe_res {
349+
ProbeFinished::Regular(probe) => match probe {
350+
Ok(probe) => {
351+
report.update(&probe);
352+
if self.have_enough_reports(&if_state, do_full, num_relays, &report) {
353+
trace!("have enough probe reports, aborting further probes");
354+
// shuts down the probes
355+
drop(actor);
356+
break;
367357
}
368358
}
359+
Err(err) => {
360+
trace!("probe failed: {:?}", err);
361+
}
362+
},
363+
#[cfg(not(wasm_browser))]
364+
ProbeFinished::CaptivePortal(portal) => {
365+
report.captive_portal = portal;
369366
}
370-
Err(err) => {
371-
trace!("probe errored: {:?}", err);
372-
}
373-
},
374-
#[cfg(not(wasm_browser))]
375-
ProbeFinished::CaptivePortal(portal) => {
376-
report.captive_portal = portal;
377367
}
378368
}
379369
}
380370
}
381371
}
382-
383372
self.add_report_history_and_set_preferred_relay(&mut report);
384373
debug!(
385374
?report,
@@ -424,9 +413,23 @@ impl Client {
424413
self.qad_conns.v6.take();
425414
}
426415
}
427-
if self.qad_conns.v4.is_some() && self.qad_conns.v6.is_some() == if_state.have_v6 {
428-
trace!("not spawning QAD, already have probes");
429-
return self.qad_conns.current();
416+
417+
let v4_report = self.qad_conns.current_v4();
418+
let v6_report = self.qad_conns.current_v6();
419+
let needs_v4_probe = v4_report.is_none();
420+
let needs_v6_probe = v6_report.is_some() != if_state.have_v6;
421+
422+
let mut reports = Vec::new();
423+
424+
if let Some(report) = v4_report {
425+
reports.push(report);
426+
}
427+
if let Some(report) = v6_report {
428+
reports.push(report);
429+
}
430+
431+
if !needs_v4_probe && !needs_v6_probe {
432+
return reports;
430433
}
431434

432435
// TODO: randomize choice?
@@ -438,7 +441,7 @@ impl Client {
438441
let cancel_v6 = CancellationToken::new();
439442

440443
for relay_node in self.relay_map.nodes().take(MAX_RELAYS) {
441-
if if_state.have_v4 {
444+
if if_state.have_v4 && needs_v4_probe {
442445
debug!(?relay_node.url, "v4 QAD probe");
443446
let ip_mapped_addrs = self.socket_state.ip_mapped_addrs.clone();
444447
let relay_node = relay_node.clone();
@@ -456,7 +459,7 @@ impl Client {
456459
);
457460
}
458461

459-
if if_state.have_v6 {
462+
if if_state.have_v6 && needs_v6_probe {
460463
debug!(?relay_node.url, "v6 QAD probe");
461464
let ip_mapped_addrs = self.socket_state.ip_mapped_addrs.clone();
462465
let relay_node = relay_node.clone();
@@ -475,8 +478,6 @@ impl Client {
475478
}
476479
}
477480

478-
let mut reports = Vec::new();
479-
480481
// We set _pending to true if at least one report was started for each category.
481482
// If we did not start any report for either category, _pending is set to false right away
482483
// (it "completed" in the sense that nothing will ever run). If we did start at least one report,
@@ -576,32 +577,91 @@ impl Client {
576577
reports
577578
}
578579

579-
fn have_enough_reports(&self, enough_relays: usize, report: &Report) -> Option<Duration> {
580-
// Once we've heard from enough relay servers (3), start a timer to give up on the other
581-
// probes. The timer's duration is a function of whether this is our initial full
582-
// probe or an incremental one. For incremental ones, wait for the duration of the
583-
// slowest relay. For initial ones, double that.
584-
let latencies: Vec<Duration> = report.relay_latency.iter().map(|(_, l)| l).collect();
585-
let have_enough_latencies = latencies.len() >= enough_relays;
586-
587-
if have_enough_latencies {
588-
let timeout = match self.reports.last.is_some() {
589-
true => Duration::from_secs(0),
590-
false => latencies
591-
.iter()
592-
.max()
593-
.copied()
594-
.unwrap_or(DEFAULT_MAX_LATENCY),
595-
};
596-
debug!(
597-
reports=latencies.len(),
598-
delay=?timeout,
599-
"Have enough probe reports, aborting further probes soon",
600-
);
580+
/// Check if we have enough information to consider the current report "good enough".
581+
fn have_enough_reports(
582+
&self,
583+
state: &IfStateDetails,
584+
do_full: bool,
585+
num_relays: usize,
586+
report: &Report,
587+
) -> bool {
588+
#[cfg_attr(wasm_browser, allow(unused_mut))]
589+
let mut num_ipv4 = 0;
590+
#[cfg_attr(wasm_browser, allow(unused_mut))]
591+
let mut num_ipv6 = 0;
592+
let mut num_https = 0;
593+
for (typ, _, _) in report.relay_latency.iter() {
594+
match typ {
595+
#[cfg(not(wasm_browser))]
596+
Probe::QadIpv4 => {
597+
num_ipv4 += 1;
598+
}
599+
#[cfg(not(wasm_browser))]
600+
Probe::QadIpv6 => {
601+
num_ipv6 += 1;
602+
}
603+
Probe::Https => {
604+
num_https += 1;
605+
}
606+
}
607+
}
601608

602-
Some(timeout)
609+
if do_full {
610+
// Full report, require more probes
611+
match (state.have_v4, state.have_v6) {
612+
(true, true) => {
613+
// Both IPv4 and IPv6 are expected to be available
614+
if num_ipv4 >= 2 && num_ipv6 >= 1 || num_ipv6 >= 2 && num_ipv4 >= 1 {
615+
return true;
616+
}
617+
}
618+
(true, false) => {
619+
// Just Ipv4 is expected
620+
if num_ipv4 >= 2 {
621+
return true;
622+
}
623+
}
624+
(false, true) => {
625+
// Just Ipv6 is expected
626+
if num_ipv6 >= 2 {
627+
return true;
628+
}
629+
}
630+
(false, false) => {}
631+
}
632+
if num_https >= num_relays {
633+
// If we have at least one https probe per relay, we are happy
634+
return true;
635+
}
636+
false
603637
} else {
604-
None
638+
// Incremental reports, here the requirements are reduced further
639+
match (state.have_v4, state.have_v6) {
640+
(true, true) => {
641+
// Both IPv4 and IPv6 are expected to be available
642+
if num_ipv4 >= 1 && num_ipv6 >= 1 {
643+
return true;
644+
}
645+
}
646+
(true, false) => {
647+
// Just Ipv4 is expected
648+
if num_ipv4 >= 1 {
649+
return true;
650+
}
651+
}
652+
(false, true) => {
653+
// Just Ipv6 is expected
654+
if num_ipv6 >= 1 {
655+
return true;
656+
}
657+
}
658+
(false, false) => {}
659+
}
660+
if num_https >= num_relays {
661+
// If we have at least one https probe per relay, we are happy
662+
return true;
663+
}
664+
false
605665
}
606666
}
607667

@@ -653,7 +713,7 @@ impl Client {
653713
let mut best_any = Duration::default();
654714
let mut old_relay_cur_latency = Duration::default();
655715
{
656-
for (url, duration) in r.relay_latency.iter() {
716+
for (_, url, duration) in r.relay_latency.iter() {
657717
if Some(url) == prev_relay.as_ref() {
658718
old_relay_cur_latency = duration;
659719
}

iroh/src/net_report/report.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -172,18 +172,18 @@ impl RelayLatencies {
172172

173173
/// Returns an iterator over all the relays and their latencies.
174174
#[cfg(not(wasm_browser))]
175-
pub fn iter(&self) -> impl Iterator<Item = (&'_ RelayUrl, Duration)> + '_ {
175+
pub fn iter(&self) -> impl Iterator<Item = (Probe, &'_ RelayUrl, Duration)> + '_ {
176176
self.https
177177
.iter()
178-
.chain(self.ipv4.iter())
179-
.chain(self.ipv6.iter())
180-
.map(|(k, v)| (k, *v))
178+
.map(|(url, l)| (Probe::Https, url, *l))
179+
.chain(self.ipv4.iter().map(|(url, l)| (Probe::QadIpv4, url, *l)))
180+
.chain(self.ipv6.iter().map(|(url, l)| (Probe::QadIpv6, url, *l)))
181181
}
182182

183183
/// Returns an iterator over all the relays and their latencies.
184184
#[cfg(wasm_browser)]
185-
pub fn iter(&self) -> impl Iterator<Item = (&'_ RelayUrl, Duration)> + '_ {
186-
self.https.iter().map(|(k, v)| (k, *v))
185+
pub fn iter(&self) -> impl Iterator<Item = (Probe, &'_ RelayUrl, Duration)> + '_ {
186+
self.https.iter().map(|(k, v)| (Probe::Https, k, *v))
187187
}
188188

189189
#[cfg(not(wasm_browser))]

0 commit comments

Comments
 (0)