From cb4b55d8afb23d4eb62a189fec3b430cc000eec1 Mon Sep 17 00:00:00 2001
From: Andrea Marziali
Date: Tue, 23 Sep 2025 08:55:24 +0200
Subject: [PATCH 1/2] Revert "Lazily obtain feature discovery when starting client metrics (#9548)"

This reverts commit 561eb5e7affa9c961722c915ec42f3f7b3534e18.
---
 .../ConflatingMetricsAggregatorBenchmark.java | 30 +++++------
 .../metrics/ConflatingMetricsAggregator.java  | 54 ++++++++-----------
 .../java/datadog/trace/core/CoreTracer.java   |  6 +--
 .../ConflatingMetricAggregatorTest.groovy     | 43 +++++++--------
 .../common/metrics/FootprintForkedTest.groovy |  5 +-
 .../metrics/MetricsReliabilityTest.groovy     |  8 ---
 6 files changed, 56 insertions(+), 90 deletions(-)

diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java
index 4f5911ebc69..3ae8a050f32 100644
--- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java
+++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java
@@ -4,7 +4,6 @@
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
-import datadog.communication.ddagent.SharedCommunicationObjects;
 import datadog.communication.monitor.Monitoring;
 import datadog.trace.api.WellKnownTags;
 import datadog.trace.core.CoreSpan;
@@ -33,25 +32,20 @@
 @OutputTimeUnit(MICROSECONDS)
 @Fork(value = 1)
 public class ConflatingMetricsAggregatorBenchmark {
-  private final SharedCommunicationObjects sco = new SharedCommunicationObjects();
-  private final ConflatingMetricsAggregator aggregator;
+  private final DDAgentFeaturesDiscovery featuresDiscovery =
+      new FixedAgentFeaturesDiscovery(
+          Collections.singleton("peer.hostname"), Collections.emptySet());
+  private final ConflatingMetricsAggregator aggregator =
+      new ConflatingMetricsAggregator(
+          new WellKnownTags("", "", "", "", "", ""),
+          Collections.emptySet(),
+          featuresDiscovery,
+          HealthMetrics.NO_OP,
+          new NullSink(),
+          2048,
+          2048);
   private final List<CoreSpan<?>> spans = generateTrace(64);
 
-  public ConflatingMetricsAggregatorBenchmark() {
-    sco.setFeaturesDiscovery(
-        new FixedAgentFeaturesDiscovery(
-            Collections.singleton("peer.hostname"), Collections.emptySet()));
-    aggregator =
-        new ConflatingMetricsAggregator(
-            new WellKnownTags("", "", "", "", "", ""),
-            Collections.emptySet(),
-            sco,
-            HealthMetrics.NO_OP,
-            new NullSink(),
-            2048,
-            2048);
-  }
-
   static List<CoreSpan<?>> generateTrace(int len) {
     final List<CoreSpan<?>> trace = new ArrayList<>();
     for (int i = 0; i < len; i++) {
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
index 04f60a78fa1..d2fd5e12a93 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
@@ -98,8 +98,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, EventListener
   private final Aggregator aggregator;
   private final long reportingInterval;
   private final TimeUnit reportingIntervalTimeUnit;
-  private final SharedCommunicationObjects sharedCommunicationObjects;
-  private volatile DDAgentFeaturesDiscovery features;
+  private final DDAgentFeaturesDiscovery features;
   private final HealthMetrics healthMetrics;
   private volatile AgentTaskScheduler.Scheduled cancellation;
@@ -111,7 +110,7 @@ public ConflatingMetricsAggregator(
     this(
         config.getWellKnownTags(),
         config.getMetricsIgnoredResources(),
-        sharedCommunicationObjects,
+        sharedCommunicationObjects.featuresDiscovery(config),
        healthMetrics,
         new OkHttpSink(
             sharedCommunicationObjects.okHttpClient,
@@ -127,7 +126,7 @@ public ConflatingMetricsAggregator(
   ConflatingMetricsAggregator(
       WellKnownTags wellKnownTags,
       Set<String> ignoredResources,
-      SharedCommunicationObjects sharedCommunicationObjects,
+      DDAgentFeaturesDiscovery features,
       HealthMetrics healthMetric,
       Sink sink,
       int maxAggregates,
@@ -135,7 +134,7 @@ public ConflatingMetricsAggregator(
     this(
         wellKnownTags,
         ignoredResources,
-        sharedCommunicationObjects,
+        features,
         healthMetric,
         sink,
         maxAggregates,
@@ -147,7 +146,7 @@ public ConflatingMetricsAggregator(
   ConflatingMetricsAggregator(
       WellKnownTags wellKnownTags,
      Set<String> ignoredResources,
-      SharedCommunicationObjects sharedCommunicationObjects,
+      DDAgentFeaturesDiscovery features,
       HealthMetrics healthMetric,
       Sink sink,
       int maxAggregates,
@@ -156,7 +155,7 @@ public ConflatingMetricsAggregator(
       TimeUnit timeUnit) {
     this(
         ignoredResources,
-        sharedCommunicationObjects,
+        features,
         healthMetric,
         sink,
         new SerializingMetricWriter(wellKnownTags, sink),
@@ -168,7 +167,7 @@ public ConflatingMetricsAggregator(
   ConflatingMetricsAggregator(
       Set<String> ignoredResources,
-      SharedCommunicationObjects sharedCommunicationObjects,
+      DDAgentFeaturesDiscovery features,
       HealthMetrics healthMetric,
       Sink sink,
       MetricWriter metricWriter,
@@ -181,7 +180,7 @@ public ConflatingMetricsAggregator(
     this.batchPool = new SpmcArrayQueue<>(maxAggregates);
     this.pending = new NonBlockingHashMap<>(maxAggregates * 4 / 3);
     this.keys = new NonBlockingHashMap<>();
-    this.sharedCommunicationObjects = sharedCommunicationObjects;
+    this.features = features;
     this.healthMetrics = healthMetric;
     this.sink = sink;
     this.aggregator =
@@ -199,18 +198,6 @@ public ConflatingMetricsAggregator(
     this.reportingIntervalTimeUnit = timeUnit;
   }
 
-  private DDAgentFeaturesDiscovery featuresDiscovery() {
-    DDAgentFeaturesDiscovery ret = features;
-    if (ret != null) {
-      return ret;
-    }
-    // no need to synchronise here since it's already done in sharedCommunicationObject.
- // At worst, we'll assign multiple time the variable but it will be the same object - ret = sharedCommunicationObjects.featuresDiscovery(Config.get()); - features = ret; - return ret; - } - @Override public void start() { sink.register(this); @@ -226,6 +213,13 @@ public void start() { log.debug("started metrics aggregator"); } + private boolean isMetricsEnabled() { + if (features.getMetricsEndpoint() == null) { + features.discoverIfOutdated(); + } + return features.supportsMetrics(); + } + @Override public boolean report() { boolean published; @@ -242,7 +236,8 @@ public boolean report() { @Override public Future forceReport() { - if (!featuresDiscovery().supportsMetrics()) { + // Ensure the feature is enabled + if (!isMetricsEnabled()) { return CompletableFuture.completedFuture(false); } // Wait for the thread to start @@ -278,7 +273,6 @@ public Future forceReport() { public boolean publish(List> trace) { boolean forceKeep = false; int counted = 0; - final DDAgentFeaturesDiscovery features = featuresDiscovery(); if (features.supportsMetrics()) { for (CoreSpan span : trace) { boolean isTopLevel = span.isTopLevel(); @@ -289,7 +283,7 @@ public boolean publish(List> trace) { break; } counted++; - forceKeep |= publish(span, isTopLevel, features); + forceKeep |= publish(span, isTopLevel); } } healthMetrics.onClientStatTraceComputed( @@ -311,7 +305,7 @@ private boolean spanKindEligible(CoreSpan span) { return spanKind != null && ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString()); } - private boolean publish(CoreSpan span, boolean isTopLevel, DDAgentFeaturesDiscovery features) { + private boolean publish(CoreSpan span, boolean isTopLevel) { final CharSequence spanKind = span.getTag(SPAN_KIND, ""); MetricKey newKey = new MetricKey( @@ -324,7 +318,7 @@ private boolean publish(CoreSpan span, boolean isTopLevel, DDAgentFeaturesDis span.getParentId() == 0, SPAN_KINDS.computeIfAbsent( spanKind, UTF8BytesString::create), // save repeated utf8 conversions - getPeerTags(span, spanKind.toString(), features)); + getPeerTags(span, spanKind.toString())); boolean isNewKey = false; MetricKey key = keys.putIfAbsent(newKey, newKey); if (null == key) { @@ -359,8 +353,7 @@ private boolean publish(CoreSpan span, boolean isTopLevel, DDAgentFeaturesDis return isNewKey || span.getError() > 0; } - private List getPeerTags( - CoreSpan span, String spanKind, DDAgentFeaturesDiscovery features) { + private List getPeerTags(CoreSpan span, String spanKind) { if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) { List peerTags = new ArrayList<>(); for (String peerTag : features.peerTags()) { @@ -424,7 +417,8 @@ public void onEvent(EventType eventType, String message) { switch (eventType) { case DOWNGRADED: log.debug("Agent downgrade was detected"); - AgentTaskScheduler.get().execute(this::disable); + disable(); + healthMetrics.onClientStatDowngraded(); break; case BAD_PAYLOAD: log.debug("bad metrics payload sent to trace agent: {}", message); @@ -440,11 +434,9 @@ public void onEvent(EventType eventType, String message) { } private void disable() { - final DDAgentFeaturesDiscovery features = featuresDiscovery(); features.discover(); if (!features.supportsMetrics()) { log.debug("Disabling metric reporting because an agent downgrade was detected"); - healthMetrics.onClientStatDowngraded(); this.pending.clear(); this.batchPool.clear(); this.inbox.clear(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index 
d9832549867..32c9fc0388b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -789,10 +789,8 @@ private CoreTracer( // Schedule the metrics aggregator to begin reporting after a random delay of 1 to 10 seconds // (using milliseconds granularity.) This avoids a fleet of traced applications starting at the // same time from sending metrics in sync. - sharedCommunicationObjects.whenReady( - () -> - AgentTaskScheduler.get() - .scheduleWithJitter(MetricsAggregator::start, metricsAggregator, 1, SECONDS)); + AgentTaskScheduler.get() + .scheduleWithJitter(MetricsAggregator::start, metricsAggregator, 1, SECONDS); if (dataStreamsMonitoring == null) { this.dataStreamsMonitoring = diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index 747cb33e7e3..52c1bb34de1 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -1,7 +1,6 @@ package datadog.trace.common.metrics import datadog.communication.ddagent.DDAgentFeaturesDiscovery -import datadog.communication.ddagent.SharedCommunicationObjects import datadog.trace.api.WellKnownTags import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString import datadog.trace.core.CoreSpan @@ -39,7 +38,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( wellKnownTags, empty, - sharedCommunicationObjects(features), + features, HealthMetrics.NO_OP, sink, 10, @@ -69,7 +68,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( wellKnownTags, [ignoredResourceName].toSet(), - sharedCommunicationObjects(features), + features, HealthMetrics.NO_OP, sink, 10, @@ -105,7 +104,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -147,7 +146,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -198,7 +197,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >>> [["country"], ["country", "georegion"],] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -257,7 +256,7 @@ class 
ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> ["peer.hostname", "_dd.base_service"] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -305,7 +304,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, sharedCommunicationObjects(features), HealthMetrics.NO_OP, + ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() @@ -354,7 +353,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) long duration = 100 List trace = [ new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, duration, HTTP_OK).setTag(SPAN_KIND, "baz"), @@ -419,7 +418,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) long duration = 100 aggregator.start() @@ -478,7 +477,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) long duration = 100 aggregator.start() @@ -568,7 +567,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) long duration = 100 aggregator.start() @@ -624,7 +623,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) long duration = 100 aggregator.start() @@ -671,7 
+670,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) long duration = 100 aggregator.start() @@ -710,7 +709,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) long duration = 100 aggregator.start() @@ -741,7 +740,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) aggregator.start() when: @@ -763,7 +762,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> false features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS) + features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS) final spans = [ new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK) ] @@ -795,7 +794,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) when: def async = CompletableFuture.supplyAsync(new Supplier() { @@ -828,7 +827,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -876,10 +875,4 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Thread.sleep(10) } } - - def sharedCommunicationObjects(features) { - def ret = new SharedCommunicationObjects() - ret.setFeaturesDiscovery(features) - ret - } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy index da5fe6a26c4..4a96460d604 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy +++ 
b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy @@ -1,7 +1,6 @@ package datadog.trace.common.metrics import datadog.communication.ddagent.DDAgentFeaturesDiscovery -import datadog.communication.ddagent.SharedCommunicationObjects import datadog.trace.api.WellKnownTags import datadog.trace.core.monitor.HealthMetrics import datadog.trace.test.util.DDSpecification @@ -27,16 +26,14 @@ class FootprintForkedTest extends DDSpecification { setup: CountDownLatch latch = new CountDownLatch(1) ValidatingSink sink = new ValidatingSink(latch) - SharedCommunicationObjects sco = new SharedCommunicationObjects() DDAgentFeaturesDiscovery features = Stub(DDAgentFeaturesDiscovery) { it.supportsMetrics() >> true it.peerTags() >> [] } - sco.setFeaturesDiscovery(features) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( new WellKnownTags("runtimeid","hostname", "env", "service", "version","language"), [].toSet() as Set, - sco, + features, HealthMetrics.NO_OP, sink, 1000, diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy index 26103f11bed..8c7c64a275b 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy @@ -12,8 +12,6 @@ import java.util.concurrent.CountDownLatch import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer -import java.util.concurrent.TimeUnit - class MetricsReliabilityTest extends DDCoreSpecification { static class State { @@ -89,7 +87,6 @@ class MetricsReliabilityTest extends DDCoreSpecification { when: "simulate an agent downgrade" - def discoveryState = featuresDiscovery.discoveryState state.reset(false, 404) tracer.startSpan("test", "test").finish() tracer.flush() @@ -97,11 +94,6 @@ class MetricsReliabilityTest extends DDCoreSpecification { then: "a discovery should have done - we do not support anymore stats calculation" state.latch.await() - // wait at least 5 seconds. the discovery is done asynchronously so we should wait for the internal state flip - def start = System.nanoTime() - while (discoveryState == featuresDiscovery.discoveryState && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5) { - Thread.yield() - } assert !featuresDiscovery.supportsMetrics() // 2 traces processed. 1 p0 dropped. 
2 requests and 1 downgrade no errors assertMetrics(healthMetrics, 2, 1, 2, 0, 1) From 73527244989dd49f8267e777f5a4e143c8672501 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Tue, 23 Sep 2025 09:31:07 +0200 Subject: [PATCH 2/2] Defer metrics aggregator classloading to save startup time --- .../benchmark/StaticEventLogger.java | 2 +- .../common/metrics/NoOpMetricsAggregator.java | 2 +- .../java/datadog/trace/core/CoreTracer.java | 31 +++++++++++++------ 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/benchmark/StaticEventLogger.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/benchmark/StaticEventLogger.java index d8e05b5e1cc..c61c3e0c8ac 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/benchmark/StaticEventLogger.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/benchmark/StaticEventLogger.java @@ -97,7 +97,7 @@ private static String getAgentVersion() { for (int c = reader.read(); c != -1; c = reader.read()) { sb.append((char) c); } - } catch (IOException e) { + } catch (Throwable ignored) { // swallow exception return null; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/NoOpMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/NoOpMetricsAggregator.java index 733d77d3fac..a15c008e109 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/NoOpMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/NoOpMetricsAggregator.java @@ -9,7 +9,7 @@ public final class NoOpMetricsAggregator implements MetricsAggregator { - static final NoOpMetricsAggregator INSTANCE = new NoOpMetricsAggregator(); + public static final NoOpMetricsAggregator INSTANCE = new NoOpMetricsAggregator(); @Override public void start() {} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index 32c9fc0388b..5f64f71cbf1 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -72,6 +72,7 @@ import datadog.trace.civisibility.interceptor.CiVisibilityTraceInterceptor; import datadog.trace.common.GitMetadataTraceInterceptor; import datadog.trace.common.metrics.MetricsAggregator; +import datadog.trace.common.metrics.NoOpMetricsAggregator; import datadog.trace.common.sampling.Sampler; import datadog.trace.common.sampling.SingleSpanSampler; import datadog.trace.common.sampling.SpanSamplingRules; @@ -182,7 +183,7 @@ public static CoreTracerBuilder builder() { /** Scope manager is in charge of managing the scopes from which spans are created */ final ContinuableScopeManager scopeManager; - final MetricsAggregator metricsAggregator; + volatile MetricsAggregator metricsAggregator; /** Initial static configuration associated with the tracer. */ final Config initialConfig; @@ -783,14 +784,26 @@ private CoreTracer( pendingTraceBuffer.start(); sharedCommunicationObjects.whenReady(this.writer::start); - - metricsAggregator = - createMetricsAggregator(config, sharedCommunicationObjects, this.healthMetrics); - // Schedule the metrics aggregator to begin reporting after a random delay of 1 to 10 seconds - // (using milliseconds granularity.) This avoids a fleet of traced applications starting at the - // same time from sending metrics in sync. 
-    AgentTaskScheduler.get()
-        .scheduleWithJitter(MetricsAggregator::start, metricsAggregator, 1, SECONDS);
+    // temporarily assign a no-op instance; the final one will be resolved once discovery is
+    // allowed
+    metricsAggregator = NoOpMetricsAggregator.INSTANCE;
+    final SharedCommunicationObjects sco = sharedCommunicationObjects;
+    // asynchronously create the aggregator to avoid triggering expensive classloading during
+    // tracer initialisation.
+    sharedCommunicationObjects.whenReady(
+        () ->
+            AgentTaskScheduler.get()
+                .execute(
+                    () -> {
+                      metricsAggregator = createMetricsAggregator(config, sco, this.healthMetrics);
+                      // Schedule the metrics aggregator to begin reporting after a random delay of
+                      // 1 to 10 seconds (using milliseconds granularity.)
+                      // This avoids a fleet of traced applications starting at the same time from
+                      // sending metrics in sync.
+                      AgentTaskScheduler.get()
+                          .scheduleWithJitter(
+                              MetricsAggregator::start, metricsAggregator, 1, SECONDS);
+                    }));
 
     if (dataStreamsMonitoring == null) {
       this.dataStreamsMonitoring =
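The second patch boils down to a deferred-initialization idiom: publish a cheap volatile no-op placeholder immediately, build the classloading-heavy aggregator on a background task once the shared communication objects are ready, then start it with a 1-10 second jitter. A minimal, self-contained Java sketch of that idiom follows; the class and member names (DeferredAggregatorExample, Aggregator, onTrace) are illustrative only and are not dd-trace-java APIs.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class DeferredAggregatorExample {

  interface Aggregator {
    void start();

    void publish(String trace);
  }

  /** Cheap placeholder used until the real aggregator has been built. */
  static final Aggregator NO_OP =
      new Aggregator() {
        @Override
        public void start() {}

        @Override
        public void publish(String trace) {} // silently dropped before the swap
      };

  // volatile: publishing threads must observe the swapped-in instance without locking
  private volatile Aggregator aggregator = NO_OP;

  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor(
          r -> {
            Thread t = new Thread(r, "metrics-start");
            t.setDaemon(true); // do not keep the JVM alive
            return t;
          });

  DeferredAggregatorExample() {
    // Constructor stays cheap: the heavyweight instance is created on a background thread,
    // standing in for the whenReady(...) + AgentTaskScheduler.execute(...) chain in the patch.
    CompletableFuture.runAsync(
        () -> {
          Aggregator real =
              new Aggregator() {
                @Override
                public void start() {
                  System.out.println("aggregator started");
                }

                @Override
                public void publish(String trace) {
                  System.out.println("aggregated " + trace);
                }
              };
          aggregator = real;
          // Random 1-10 s delay (millisecond granularity) before the first report, mirroring the
          // scheduleWithJitter call, so a fleet of processes does not flush metrics in sync.
          long jitterMs = 1_000 + ThreadLocalRandom.current().nextLong(9_000);
          scheduler.schedule(real::start, jitterMs, TimeUnit.MILLISECONDS);
        });
  }

  void onTrace(String trace) {
    aggregator.publish(trace); // no-op before the swap, real aggregation afterwards
  }

  public static void main(String[] args) throws InterruptedException {
    DeferredAggregatorExample tracer = new DeferredAggregatorExample();
    tracer.onTrace("early trace"); // usually hits the no-op placeholder
    Thread.sleep(200);
    tracer.onTrace("later trace"); // by now the real aggregator is normally in place
  }
}

The volatile field with a single writer gives the safety the patch relies on: callers observe either the no-op or the fully constructed aggregator, never a partially initialised one.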