Skip to content

Commit 83c6846

Browse files
committed
Lazily obtain feature discovery when starting client metrics
1 parent 9fc16bb commit 83c6846

File tree

5 files changed

+90
-69
lines changed

5 files changed

+90
-69
lines changed

dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static java.util.concurrent.TimeUnit.SECONDS;
55

66
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
7+
import datadog.communication.ddagent.SharedCommunicationObjects;
78
import datadog.communication.monitor.Monitoring;
89
import datadog.trace.api.WellKnownTags;
910
import datadog.trace.core.CoreSpan;
@@ -32,20 +33,25 @@
3233
@OutputTimeUnit(MICROSECONDS)
3334
@Fork(value = 1)
3435
public class ConflatingMetricsAggregatorBenchmark {
35-
private final DDAgentFeaturesDiscovery featuresDiscovery =
36-
new FixedAgentFeaturesDiscovery(
37-
Collections.singleton("peer.hostname"), Collections.emptySet());
38-
private final ConflatingMetricsAggregator aggregator =
39-
new ConflatingMetricsAggregator(
40-
new WellKnownTags("", "", "", "", "", ""),
41-
Collections.emptySet(),
42-
featuresDiscovery,
43-
HealthMetrics.NO_OP,
44-
new NullSink(),
45-
2048,
46-
2048);
36+
private final SharedCommunicationObjects sco = new SharedCommunicationObjects();
37+
private final ConflatingMetricsAggregator aggregator;
4738
private final List<CoreSpan<?>> spans = generateTrace(64);
4839

40+
public ConflatingMetricsAggregatorBenchmark() {
41+
sco.setFeaturesDiscovery(
42+
new FixedAgentFeaturesDiscovery(
43+
Collections.singleton("peer.hostname"), Collections.emptySet()));
44+
aggregator =
45+
new ConflatingMetricsAggregator(
46+
new WellKnownTags("", "", "", "", "", ""),
47+
Collections.emptySet(),
48+
sco,
49+
HealthMetrics.NO_OP,
50+
new NullSink(),
51+
2048,
52+
2048);
53+
}
54+
4955
static List<CoreSpan<?>> generateTrace(int len) {
5056
final List<CoreSpan<?>> trace = new ArrayList<>();
5157
for (int i = 0; i < len; i++) {

dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
9898
private final Aggregator aggregator;
9999
private final long reportingInterval;
100100
private final TimeUnit reportingIntervalTimeUnit;
101-
private final DDAgentFeaturesDiscovery features;
101+
private final SharedCommunicationObjects sharedCommunicationObjects;
102+
private DDAgentFeaturesDiscovery features;
102103
private final HealthMetrics healthMetrics;
103104

104105
private volatile AgentTaskScheduler.Scheduled<?> cancellation;
@@ -110,7 +111,7 @@ public ConflatingMetricsAggregator(
110111
this(
111112
config.getWellKnownTags(),
112113
config.getMetricsIgnoredResources(),
113-
sharedCommunicationObjects.featuresDiscovery(config),
114+
sharedCommunicationObjects,
114115
healthMetrics,
115116
new OkHttpSink(
116117
sharedCommunicationObjects.okHttpClient,
@@ -126,15 +127,15 @@ public ConflatingMetricsAggregator(
126127
ConflatingMetricsAggregator(
127128
WellKnownTags wellKnownTags,
128129
Set<String> ignoredResources,
129-
DDAgentFeaturesDiscovery features,
130+
SharedCommunicationObjects sharedCommunicationObjects,
130131
HealthMetrics healthMetric,
131132
Sink sink,
132133
int maxAggregates,
133134
int queueSize) {
134135
this(
135136
wellKnownTags,
136137
ignoredResources,
137-
features,
138+
sharedCommunicationObjects,
138139
healthMetric,
139140
sink,
140141
maxAggregates,
@@ -146,7 +147,7 @@ public ConflatingMetricsAggregator(
146147
ConflatingMetricsAggregator(
147148
WellKnownTags wellKnownTags,
148149
Set<String> ignoredResources,
149-
DDAgentFeaturesDiscovery features,
150+
SharedCommunicationObjects sharedCommunicationObjects,
150151
HealthMetrics healthMetric,
151152
Sink sink,
152153
int maxAggregates,
@@ -155,7 +156,7 @@ public ConflatingMetricsAggregator(
155156
TimeUnit timeUnit) {
156157
this(
157158
ignoredResources,
158-
features,
159+
sharedCommunicationObjects,
159160
healthMetric,
160161
sink,
161162
new SerializingMetricWriter(wellKnownTags, sink),
@@ -167,7 +168,7 @@ public ConflatingMetricsAggregator(
167168

168169
ConflatingMetricsAggregator(
169170
Set<String> ignoredResources,
170-
DDAgentFeaturesDiscovery features,
171+
SharedCommunicationObjects sharedCommunicationObjects,
171172
HealthMetrics healthMetric,
172173
Sink sink,
173174
MetricWriter metricWriter,
@@ -180,7 +181,7 @@ public ConflatingMetricsAggregator(
180181
this.batchPool = new SpmcArrayQueue<>(maxAggregates);
181182
this.pending = new NonBlockingHashMap<>(maxAggregates * 4 / 3);
182183
this.keys = new NonBlockingHashMap<>();
183-
this.features = features;
184+
this.sharedCommunicationObjects = sharedCommunicationObjects;
184185
this.healthMetrics = healthMetric;
185186
this.sink = sink;
186187
this.aggregator =
@@ -200,6 +201,7 @@ public ConflatingMetricsAggregator(
200201

201202
@Override
202203
public void start() {
204+
features = sharedCommunicationObjects.featuresDiscovery(Config.get());
203205
sink.register(this);
204206
thread.start();
205207
cancellation =
@@ -214,10 +216,13 @@ public void start() {
214216
}
215217

216218
private boolean isMetricsEnabled() {
217-
if (features.getMetricsEndpoint() == null) {
218-
features.discoverIfOutdated();
219+
if (features != null) {
220+
if (features.getMetricsEndpoint() == null) {
221+
features.discoverIfOutdated();
222+
}
223+
return features.supportsMetrics();
219224
}
220-
return features.supportsMetrics();
225+
return false;
221226
}
222227

223228
@Override
@@ -273,7 +278,7 @@ public Future<Boolean> forceReport() {
273278
public boolean publish(List<? extends CoreSpan<?>> trace) {
274279
boolean forceKeep = false;
275280
int counted = 0;
276-
if (features.supportsMetrics()) {
281+
if (features != null && features.supportsMetrics()) {
277282
for (CoreSpan<?> span : trace) {
278283
boolean isTopLevel = span.isTopLevel();
279284
if (shouldComputeMetric(span)) {
@@ -354,31 +359,34 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
354359
}
355360

356361
private List<UTF8BytesString> getPeerTags(CoreSpan<?> span, String spanKind) {
357-
if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) {
358-
List<UTF8BytesString> peerTags = new ArrayList<>();
359-
for (String peerTag : features.peerTags()) {
360-
Object value = span.getTag(peerTag);
361-
if (value != null) {
362+
if (features != null) {
363+
if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) {
364+
List<UTF8BytesString> peerTags = new ArrayList<>();
365+
for (String peerTag : features.peerTags()) {
366+
Object value = span.getTag(peerTag);
367+
if (value != null) {
368+
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
369+
cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER);
370+
peerTags.add(
371+
cacheAndCreator
372+
.getLeft()
373+
.computeIfAbsent(value.toString(), cacheAndCreator.getRight()));
374+
}
375+
}
376+
return peerTags;
377+
} else if (SPAN_KIND_INTERNAL.equals(spanKind)) {
378+
// in this case only the base service should be aggregated if present
379+
final Object baseService = span.getTag(BASE_SERVICE);
380+
if (baseService != null) {
362381
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
363-
cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER);
364-
peerTags.add(
382+
cacheAndCreator =
383+
PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER);
384+
return Collections.singletonList(
365385
cacheAndCreator
366386
.getLeft()
367-
.computeIfAbsent(value.toString(), cacheAndCreator.getRight()));
387+
.computeIfAbsent(baseService.toString(), cacheAndCreator.getRight()));
368388
}
369389
}
370-
return peerTags;
371-
} else if (SPAN_KIND_INTERNAL.equals(spanKind)) {
372-
// in this case only the base service should be aggregated if present
373-
final Object baseService = span.getTag(BASE_SERVICE);
374-
if (baseService != null) {
375-
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
376-
cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER);
377-
return Collections.singletonList(
378-
cacheAndCreator
379-
.getLeft()
380-
.computeIfAbsent(baseService.toString(), cacheAndCreator.getRight()));
381-
}
382390
}
383391
return Collections.emptyList();
384392
}
@@ -434,6 +442,7 @@ public void onEvent(EventType eventType, String message) {
434442
}
435443

436444
private void disable() {
445+
// note: disable is called only if started so we're not nullchecking before accessing features
437446
features.discover();
438447
if (!features.supportsMetrics()) {
439448
log.debug("Disabling metric reporting because an agent downgrade was detected");

dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@
100100
import datadog.trace.core.traceinterceptor.LatencyTraceInterceptor;
101101
import datadog.trace.lambda.LambdaHandler;
102102
import datadog.trace.relocate.api.RatelimitedLogger;
103-
import datadog.trace.util.AgentTaskScheduler;
104103
import java.io.IOException;
105104
import java.lang.ref.WeakReference;
106105
import java.math.BigInteger;
@@ -790,11 +789,8 @@ private CoreTracer(
790789

791790
metricsAggregator =
792791
createMetricsAggregator(config, sharedCommunicationObjects, this.healthMetrics);
793-
// Schedule the metrics aggregator to begin reporting after a random delay of 1 to 10 seconds
794-
// (using milliseconds granularity.) This avoids a fleet of traced applications starting at the
795-
// same time from sending metrics in sync.
796-
AgentTaskScheduler.get()
797-
.scheduleWithJitter(MetricsAggregator::start, metricsAggregator, 1, SECONDS);
792+
// the jitter is brought implicitly by the callback that can vary
793+
sharedCommunicationObjects.whenReady(metricsAggregator::start);
798794

799795
if (dataStreamsMonitoring == null) {
800796
this.dataStreamsMonitoring =

0 commit comments

Comments
 (0)