Skip to content

Commit 09fd601

Browse files
committed
Defer discovery as much as possible
1 parent 2076010 commit 09fd601

File tree

2 files changed

+30
-15
lines changed

2 files changed

+30
-15
lines changed

dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,20 @@ public ConflatingMetricsAggregator(
199199
this.reportingIntervalTimeUnit = timeUnit;
200200
}
201201

202+
private DDAgentFeaturesDiscovery getFeatures() {
203+
DDAgentFeaturesDiscovery ret = features;
204+
if (ret == null) {
205+
return ret;
206+
}
207+
ret = sharedCommunicationObjects.featuresDiscovery(Config.get());
208+
features = ret;
209+
return ret;
210+
}
211+
202212
@Override
203213
public void start() {
204-
features = sharedCommunicationObjects.featuresDiscovery(Config.get());
214+
AgentTaskScheduler.get()
215+
.execute(() -> features = sharedCommunicationObjects.featuresDiscovery(Config.get()));
205216
sink.register(this);
206217
thread.start();
207218
cancellation =
@@ -216,13 +227,7 @@ public void start() {
216227
}
217228

218229
private boolean isMetricsEnabled() {
219-
if (features != null) {
220-
if (features.getMetricsEndpoint() == null) {
221-
features.discoverIfOutdated();
222-
}
223-
return features.supportsMetrics();
224-
}
225-
return false;
230+
return getFeatures().supportsMetrics();
226231
}
227232

228233
@Override
@@ -278,6 +283,7 @@ public Future<Boolean> forceReport() {
278283
public boolean publish(List<? extends CoreSpan<?>> trace) {
279284
boolean forceKeep = false;
280285
int counted = 0;
286+
final DDAgentFeaturesDiscovery features = getFeatures();
281287
if (features != null && features.supportsMetrics()) {
282288
for (CoreSpan<?> span : trace) {
283289
boolean isTopLevel = span.isTopLevel();
@@ -288,7 +294,7 @@ public boolean publish(List<? extends CoreSpan<?>> trace) {
288294
break;
289295
}
290296
counted++;
291-
forceKeep |= publish(span, isTopLevel);
297+
forceKeep |= publish(span, isTopLevel, features);
292298
}
293299
}
294300
healthMetrics.onClientStatTraceComputed(
@@ -310,7 +316,7 @@ private boolean spanKindEligible(CoreSpan<?> span) {
310316
return spanKind != null && ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString());
311317
}
312318

313-
private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
319+
private boolean publish(CoreSpan<?> span, boolean isTopLevel, DDAgentFeaturesDiscovery features) {
314320
final CharSequence spanKind = span.getTag(SPAN_KIND, "");
315321
MetricKey newKey =
316322
new MetricKey(
@@ -323,7 +329,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
323329
span.getParentId() == 0,
324330
SPAN_KINDS.computeIfAbsent(
325331
spanKind, UTF8BytesString::create), // save repeated utf8 conversions
326-
getPeerTags(span, spanKind.toString()));
332+
getPeerTags(span, spanKind.toString(), features));
327333
boolean isNewKey = false;
328334
MetricKey key = keys.putIfAbsent(newKey, newKey);
329335
if (null == key) {
@@ -358,7 +364,8 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
358364
return isNewKey || span.getError() > 0;
359365
}
360366

361-
private List<UTF8BytesString> getPeerTags(CoreSpan<?> span, String spanKind) {
367+
private List<UTF8BytesString> getPeerTags(
368+
CoreSpan<?> span, String spanKind, DDAgentFeaturesDiscovery features) {
362369
if (features != null) {
363370
if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) {
364371
List<UTF8BytesString> peerTags = new ArrayList<>();
@@ -425,8 +432,7 @@ public void onEvent(EventType eventType, String message) {
425432
switch (eventType) {
426433
case DOWNGRADED:
427434
log.debug("Agent downgrade was detected");
428-
disable();
429-
healthMetrics.onClientStatDowngraded();
435+
AgentTaskScheduler.get().execute(this::disable);
430436
break;
431437
case BAD_PAYLOAD:
432438
log.debug("bad metrics payload sent to trace agent: {}", message);
@@ -442,10 +448,11 @@ public void onEvent(EventType eventType, String message) {
442448
}
443449

444450
private void disable() {
445-
// note: disable is called only if started so we're not nullchecking before accessing features
451+
final DDAgentFeaturesDiscovery features = getFeatures();
446452
features.discover();
447453
if (!features.supportsMetrics()) {
448454
log.debug("Disabling metric reporting because an agent downgrade was detected");
455+
healthMetrics.onClientStatDowngraded();
449456
this.pending.clear();
450457
this.batchPool.clear();
451458
this.inbox.clear();

dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import java.util.concurrent.CountDownLatch
1212

1313
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
1414

15+
import java.util.concurrent.TimeUnit
16+
1517
class MetricsReliabilityTest extends DDCoreSpecification {
1618

1719
static class State {
@@ -87,13 +89,19 @@ class MetricsReliabilityTest extends DDCoreSpecification {
8789

8890

8991
when: "simulate an agent downgrade"
92+
def discoveryState = featuresDiscovery.discoveryState
9093
state.reset(false, 404)
9194
tracer.startSpan("test", "test").finish()
9295
tracer.flush()
9396
tracer.flushMetrics()
9497

9598
then: "a discovery should have done - we do not support anymore stats calculation"
9699
state.latch.await()
100+
// wait at least 5 seconds. the discovery is done asynchronously so we should wait for the internal state flip
101+
def start = System.nanoTime()
102+
while (discoveryState == featuresDiscovery.discoveryState && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5) {
103+
Thread.yield()
104+
}
97105
assert !featuresDiscovery.supportsMetrics()
98106
// 2 traces processed. 1 p0 dropped. 2 requests and 1 downgrade no errors
99107
assertMetrics(healthMetrics, 2, 1, 2, 0, 1)

0 commit comments

Comments
 (0)