Skip to content

Commit ae02fdb

Browse files
committed
Defer discovery as much as possible
1 parent ed9c2f2 commit ae02fdb

File tree

2 files changed

+29
-15
lines changed

2 files changed

+29
-15
lines changed

dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,16 @@ public ConflatingMetricsAggregator(
199199
this.reportingIntervalTimeUnit = timeUnit;
200200
}
201201

202+
private DDAgentFeaturesDiscovery getFeatures() {
203+
DDAgentFeaturesDiscovery ret = features;
204+
if (ret == null) {
205+
return ret;
206+
}
207+
ret = sharedCommunicationObjects.featuresDiscovery(Config.get());
208+
features = ret;
209+
return ret;
210+
}
211+
202212
@Override
203213
public void start() {
204214
AgentTaskScheduler.get()
@@ -217,13 +227,7 @@ public void start() {
217227
}
218228

219229
private boolean isMetricsEnabled() {
220-
if (features != null) {
221-
if (features.getMetricsEndpoint() == null) {
222-
features.discoverIfOutdated();
223-
}
224-
return features.supportsMetrics();
225-
}
226-
return false;
230+
return getFeatures().supportsMetrics();
227231
}
228232

229233
@Override
@@ -279,6 +283,7 @@ public Future<Boolean> forceReport() {
279283
public boolean publish(List<? extends CoreSpan<?>> trace) {
280284
boolean forceKeep = false;
281285
int counted = 0;
286+
final DDAgentFeaturesDiscovery features = getFeatures();
282287
if (features != null && features.supportsMetrics()) {
283288
for (CoreSpan<?> span : trace) {
284289
boolean isTopLevel = span.isTopLevel();
@@ -289,7 +294,7 @@ public boolean publish(List<? extends CoreSpan<?>> trace) {
289294
break;
290295
}
291296
counted++;
292-
forceKeep |= publish(span, isTopLevel);
297+
forceKeep |= publish(span, isTopLevel, features);
293298
}
294299
}
295300
healthMetrics.onClientStatTraceComputed(
@@ -311,7 +316,7 @@ private boolean spanKindEligible(CoreSpan<?> span) {
311316
return spanKind != null && ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString());
312317
}
313318

314-
private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
319+
private boolean publish(CoreSpan<?> span, boolean isTopLevel, DDAgentFeaturesDiscovery features) {
315320
final CharSequence spanKind = span.getTag(SPAN_KIND, "");
316321
MetricKey newKey =
317322
new MetricKey(
@@ -324,7 +329,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
324329
span.getParentId() == 0,
325330
SPAN_KINDS.computeIfAbsent(
326331
spanKind, UTF8BytesString::create), // save repeated utf8 conversions
327-
getPeerTags(span, spanKind.toString()));
332+
getPeerTags(span, spanKind.toString(), features));
328333
boolean isNewKey = false;
329334
MetricKey key = keys.putIfAbsent(newKey, newKey);
330335
if (null == key) {
@@ -359,7 +364,8 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
359364
return isNewKey || span.getError() > 0;
360365
}
361366

362-
private List<UTF8BytesString> getPeerTags(CoreSpan<?> span, String spanKind) {
367+
private List<UTF8BytesString> getPeerTags(
368+
CoreSpan<?> span, String spanKind, DDAgentFeaturesDiscovery features) {
363369
if (features != null) {
364370
if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) {
365371
List<UTF8BytesString> peerTags = new ArrayList<>();
@@ -426,8 +432,7 @@ public void onEvent(EventType eventType, String message) {
426432
switch (eventType) {
427433
case DOWNGRADED:
428434
log.debug("Agent downgrade was detected");
429-
disable();
430-
healthMetrics.onClientStatDowngraded();
435+
AgentTaskScheduler.get().execute(this::disable);
431436
break;
432437
case BAD_PAYLOAD:
433438
log.debug("bad metrics payload sent to trace agent: {}", message);
@@ -443,10 +448,11 @@ public void onEvent(EventType eventType, String message) {
443448
}
444449

445450
private void disable() {
446-
// note: disable is called only if started so we're not nullchecking before accessing features
447-
features.discoverIfOutdated();
451+
final DDAgentFeaturesDiscovery features = getFeatures();
452+
features.discover();
448453
if (!features.supportsMetrics()) {
449454
log.debug("Disabling metric reporting because an agent downgrade was detected");
455+
healthMetrics.onClientStatDowngraded();
450456
this.pending.clear();
451457
this.batchPool.clear();
452458
this.inbox.clear();

dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import java.util.concurrent.CountDownLatch
1212

1313
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
1414

15+
import java.util.concurrent.TimeUnit
16+
1517
class MetricsReliabilityTest extends DDCoreSpecification {
1618

1719
static class State {
@@ -87,13 +89,19 @@ class MetricsReliabilityTest extends DDCoreSpecification {
8789

8890

8991
when: "simulate an agent downgrade"
92+
def discoveryState = featuresDiscovery.discoveryState
9093
state.reset(false, 404)
9194
tracer.startSpan("test", "test").finish()
9295
tracer.flush()
9396
tracer.flushMetrics()
9497

9598
then: "a discovery should have done - we do not support anymore stats calculation"
9699
state.latch.await()
100+
// wait up to 5 seconds: the discovery is done asynchronously, so we must wait for the internal state to flip
101+
def start = System.nanoTime()
102+
while (discoveryState == featuresDiscovery.discoveryState && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5) {
103+
Thread.yield()
104+
}
97105
assert !featuresDiscovery.supportsMetrics()
98106
// 2 traces processed. 1 p0 dropped. 2 requests and 1 downgrade no errors
99107
assertMetrics(healthMetrics, 2, 1, 2, 0, 1)

0 commit comments

Comments
 (0)