Skip to content

Commit 6b8ed9c

Browse files
alexbrtfrigus02
andcommitted
feat: implement retries (#103)
Azure Application Insights frequently times out, returns HTTP 503 errors, or only accepts partial payloads. This adds transparent retries. Users of this crate won't have to configure anything. Using `backon` crate, because it seems small and easy to use. Design decisions: - Don't expose implementation. Add default retries that should work for everyone: - min_delay = 500ms - max_delay = 5s - with jitter (to avoid thundering herds when multiple exports run in parallel) We wanted no max_times or total delay, because the BatchSpanProcessor already provides a `max_export_timeout` option. This automatically ensures that the exporter will stop. However, it's only available in the experimental async version of the BatchSpanProcessor. And it's missing in the SimpleSpanProcessor and metric and logs processors. Therefore this sets a total delay of 35s. - Add a `with_retry_notify` option. Can be used to log retries and thereby debug exporter flakyness. - Use `futures-timer` crate to provide sleep functionality during retries. Ideally we could use opentelemetry_sdk::runtime::Runtime::delay for that. But that's behind an experimental feature and we the SDK doesn't provide the exporters with a runtime. We'd have to add a configuration to the exporter where users would need to specify their choosen runtime, which then has to fit to the rest of their setup, otherwise they see runtime errors. E.g. tokio-sleep fails if not executed in the context of a Tokio reactor with: there is no reactor running, must be called from the context of a Tokio 1.x runtime This error would also only happen if a retry has to happen, which means when a call to AppInsights fails. This would make it very hard to debug. We're using futures-timer instead, which works with all runtimes, including tokio and futures-executor. The OpenTelemetry SDK uses the latter for most default processors right now. --------- Co-authored-by: Jan Kuehle <[email protected]>
1 parent c4cbd2b commit 6b8ed9c

File tree

7 files changed

+416
-75
lines changed

7 files changed

+416
-75
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ reqwest-client-rustls = ["opentelemetry-http/reqwest", "reqwest/rustls-tls"]
4747

4848
[dependencies]
4949
async-trait = "0.1"
50+
backon = { version = "1.5", default-features = false, features = ["futures-timer-sleep"] }
5051
bytes = "1"
5152
chrono = "0.4"
5253
flate2 = "1"

src/lib.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,13 @@ use opentelemetry_sdk::ExportError;
372372
use opentelemetry_sdk::Resource;
373373
#[cfg(feature = "live-metrics")]
374374
pub use quick_pulse::LiveMetricsSpanProcessor;
375-
use std::{convert::TryInto, error::Error as StdError, fmt::Debug, sync::Arc};
375+
use std::{
376+
convert::TryInto,
377+
error::Error as StdError,
378+
fmt::Debug,
379+
sync::{Arc, Mutex},
380+
time::Duration,
381+
};
376382
#[cfg(feature = "live-metrics")]
377383
use uploader_quick_pulse::PostOrPing;
378384

@@ -386,6 +392,7 @@ pub struct Exporter<C> {
386392
#[cfg(feature = "live-metrics")]
387393
live_ping_endpoint: http::Uri,
388394
instrumentation_key: String,
395+
retry_notify: Option<Arc<Mutex<dyn FnMut(&Error, Duration) + Send + 'static>>>,
389396
#[cfg(feature = "trace")]
390397
sample_rate: f64,
391398
#[cfg(any(feature = "trace", feature = "logs"))]
@@ -436,6 +443,7 @@ impl<C> Exporter<C> {
436443
&instrumentation_key,
437444
),
438445
instrumentation_key,
446+
retry_notify: None,
439447
#[cfg(feature = "trace")]
440448
sample_rate: 100.0,
441449
#[cfg(any(feature = "trace", feature = "logs"))]
@@ -475,6 +483,7 @@ impl<C> Exporter<C> {
475483
&connection_string.instrumentation_key,
476484
),
477485
instrumentation_key: connection_string.instrumentation_key,
486+
retry_notify: None,
478487
#[cfg(feature = "trace")]
479488
sample_rate: 100.0,
480489
#[cfg(any(feature = "trace", feature = "logs"))]
@@ -484,6 +493,16 @@ impl<C> Exporter<C> {
484493
})
485494
}
486495

496+
/// Set a retry notification function that is called when a request to upload telemetry to
497+
/// Application Insights failed and will be retried.
498+
pub fn with_retry_notify<N>(mut self, retry_notify: N) -> Self
499+
where
500+
N: FnMut(&Error, Duration) + Send + 'static,
501+
{
502+
self.retry_notify = Some(Arc::new(Mutex::new(retry_notify)));
503+
self
504+
}
505+
487506
/// Set endpoint used to ingest telemetry. This should consist of scheme and authrity. The
488507
/// exporter will call `/v2/track` on the specified endpoint.
489508
///

src/logs.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,14 @@ where
8383
.collect();
8484

8585
async move {
86-
crate::uploader::send(client.as_ref(), endpoint.as_ref(), envelopes)
87-
.await
88-
.map_err(Into::into)
86+
crate::uploader::send(
87+
client.as_ref(),
88+
endpoint.as_ref(),
89+
envelopes,
90+
self.retry_notify.clone(),
91+
)
92+
.await
93+
.map_err(Into::into)
8994
}
9095
}
9196

src/metrics.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,14 @@ where
7070
}
7171

7272
async move {
73-
crate::uploader::send(client.as_ref(), endpoint.as_ref(), envelopes)
74-
.await
75-
.map_err(Into::into)
73+
crate::uploader::send(
74+
client.as_ref(),
75+
endpoint.as_ref(),
76+
envelopes,
77+
self.retry_notify.clone(),
78+
)
79+
.await
80+
.map_err(Into::into)
7681
}
7782
}
7883

src/trace.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,14 @@ where
145145
.flat_map(|span| self.create_envelopes_for_span(span, &self.resource))
146146
.collect();
147147

148-
crate::uploader::send(client.as_ref(), endpoint.as_ref(), envelopes)
149-
.await
150-
.map_err(Into::into)
148+
crate::uploader::send(
149+
client.as_ref(),
150+
endpoint.as_ref(),
151+
envelopes,
152+
self.retry_notify.clone(),
153+
)
154+
.await
155+
.map_err(Into::into)
151156
}
152157

153158
fn set_resource(&mut self, resource: &Resource) {

0 commit comments

Comments
 (0)