@@ -98,8 +98,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
9898 private final Aggregator aggregator ;
9999 private final long reportingInterval ;
100100 private final TimeUnit reportingIntervalTimeUnit ;
101- private final SharedCommunicationObjects sharedCommunicationObjects ;
102- private volatile DDAgentFeaturesDiscovery features ;
101+ private final DDAgentFeaturesDiscovery features ;
103102 private final HealthMetrics healthMetrics ;
104103
105104 private volatile AgentTaskScheduler .Scheduled <?> cancellation ;
@@ -111,7 +110,7 @@ public ConflatingMetricsAggregator(
111110 this (
112111 config .getWellKnownTags (),
113112 config .getMetricsIgnoredResources (),
114- sharedCommunicationObjects ,
113+ sharedCommunicationObjects . featuresDiscovery ( config ) ,
115114 healthMetrics ,
116115 new OkHttpSink (
117116 sharedCommunicationObjects .okHttpClient ,
@@ -127,15 +126,15 @@ public ConflatingMetricsAggregator(
127126 ConflatingMetricsAggregator (
128127 WellKnownTags wellKnownTags ,
129128 Set <String > ignoredResources ,
130- SharedCommunicationObjects sharedCommunicationObjects ,
129+ DDAgentFeaturesDiscovery features ,
131130 HealthMetrics healthMetric ,
132131 Sink sink ,
133132 int maxAggregates ,
134133 int queueSize ) {
135134 this (
136135 wellKnownTags ,
137136 ignoredResources ,
138- sharedCommunicationObjects ,
137+ features ,
139138 healthMetric ,
140139 sink ,
141140 maxAggregates ,
@@ -147,7 +146,7 @@ public ConflatingMetricsAggregator(
147146 ConflatingMetricsAggregator (
148147 WellKnownTags wellKnownTags ,
149148 Set <String > ignoredResources ,
150- SharedCommunicationObjects sharedCommunicationObjects ,
149+ DDAgentFeaturesDiscovery features ,
151150 HealthMetrics healthMetric ,
152151 Sink sink ,
153152 int maxAggregates ,
@@ -156,7 +155,7 @@ public ConflatingMetricsAggregator(
156155 TimeUnit timeUnit ) {
157156 this (
158157 ignoredResources ,
159- sharedCommunicationObjects ,
158+ features ,
160159 healthMetric ,
161160 sink ,
162161 new SerializingMetricWriter (wellKnownTags , sink ),
@@ -168,7 +167,7 @@ public ConflatingMetricsAggregator(
168167
169168 ConflatingMetricsAggregator (
170169 Set <String > ignoredResources ,
171- SharedCommunicationObjects sharedCommunicationObjects ,
170+ DDAgentFeaturesDiscovery features ,
172171 HealthMetrics healthMetric ,
173172 Sink sink ,
174173 MetricWriter metricWriter ,
@@ -181,7 +180,7 @@ public ConflatingMetricsAggregator(
181180 this .batchPool = new SpmcArrayQueue <>(maxAggregates );
182181 this .pending = new NonBlockingHashMap <>(maxAggregates * 4 / 3 );
183182 this .keys = new NonBlockingHashMap <>();
184- this .sharedCommunicationObjects = sharedCommunicationObjects ;
183+ this .features = features ;
185184 this .healthMetrics = healthMetric ;
186185 this .sink = sink ;
187186 this .aggregator =
@@ -199,18 +198,6 @@ public ConflatingMetricsAggregator(
199198 this .reportingIntervalTimeUnit = timeUnit ;
200199 }
201200
202- private DDAgentFeaturesDiscovery featuresDiscovery () {
203- DDAgentFeaturesDiscovery ret = features ;
204- if (ret != null ) {
205- return ret ;
206- }
207- // no need to synchronise here since it's already done in sharedCommunicationObject.
208- // At worst, we'll assign multiple time the variable but it will be the same object
209- ret = sharedCommunicationObjects .featuresDiscovery (Config .get ());
210- features = ret ;
211- return ret ;
212- }
213-
214201 @ Override
215202 public void start () {
216203 sink .register (this );
@@ -226,6 +213,13 @@ public void start() {
226213 log .debug ("started metrics aggregator" );
227214 }
228215
216+ private boolean isMetricsEnabled () {
217+ if (features .getMetricsEndpoint () == null ) {
218+ features .discoverIfOutdated ();
219+ }
220+ return features .supportsMetrics ();
221+ }
222+
229223 @ Override
230224 public boolean report () {
231225 boolean published ;
@@ -242,7 +236,8 @@ public boolean report() {
242236
243237 @ Override
244238 public Future <Boolean > forceReport () {
245- if (!featuresDiscovery ().supportsMetrics ()) {
239+ // Ensure the feature is enabled
240+ if (!isMetricsEnabled ()) {
246241 return CompletableFuture .completedFuture (false );
247242 }
248243 // Wait for the thread to start
@@ -278,7 +273,6 @@ public Future<Boolean> forceReport() {
278273 public boolean publish (List <? extends CoreSpan <?>> trace ) {
279274 boolean forceKeep = false ;
280275 int counted = 0 ;
281- final DDAgentFeaturesDiscovery features = featuresDiscovery ();
282276 if (features .supportsMetrics ()) {
283277 for (CoreSpan <?> span : trace ) {
284278 boolean isTopLevel = span .isTopLevel ();
@@ -289,7 +283,7 @@ public boolean publish(List<? extends CoreSpan<?>> trace) {
289283 break ;
290284 }
291285 counted ++;
292- forceKeep |= publish (span , isTopLevel , features );
286+ forceKeep |= publish (span , isTopLevel );
293287 }
294288 }
295289 healthMetrics .onClientStatTraceComputed (
@@ -311,7 +305,7 @@ private boolean spanKindEligible(CoreSpan<?> span) {
311305 return spanKind != null && ELIGIBLE_SPAN_KINDS_FOR_METRICS .contains (spanKind .toString ());
312306 }
313307
314- private boolean publish (CoreSpan <?> span , boolean isTopLevel , DDAgentFeaturesDiscovery features ) {
308+ private boolean publish (CoreSpan <?> span , boolean isTopLevel ) {
315309 final CharSequence spanKind = span .getTag (SPAN_KIND , "" );
316310 MetricKey newKey =
317311 new MetricKey (
@@ -324,7 +318,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, DDAgentFeaturesDis
324318 span .getParentId () == 0 ,
325319 SPAN_KINDS .computeIfAbsent (
326320 spanKind , UTF8BytesString ::create ), // save repeated utf8 conversions
327- getPeerTags (span , spanKind .toString (), features ));
321+ getPeerTags (span , spanKind .toString ()));
328322 boolean isNewKey = false ;
329323 MetricKey key = keys .putIfAbsent (newKey , newKey );
330324 if (null == key ) {
@@ -359,8 +353,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, DDAgentFeaturesDis
359353 return isNewKey || span .getError () > 0 ;
360354 }
361355
362- private List <UTF8BytesString > getPeerTags (
363- CoreSpan <?> span , String spanKind , DDAgentFeaturesDiscovery features ) {
356+ private List <UTF8BytesString > getPeerTags (CoreSpan <?> span , String spanKind ) {
364357 if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION .contains (spanKind )) {
365358 List <UTF8BytesString > peerTags = new ArrayList <>();
366359 for (String peerTag : features .peerTags ()) {
@@ -424,7 +417,8 @@ public void onEvent(EventType eventType, String message) {
424417 switch (eventType ) {
425418 case DOWNGRADED :
426419 log .debug ("Agent downgrade was detected" );
427- AgentTaskScheduler .get ().execute (this ::disable );
420+ disable ();
421+ healthMetrics .onClientStatDowngraded ();
428422 break ;
429423 case BAD_PAYLOAD :
430424 log .debug ("bad metrics payload sent to trace agent: {}" , message );
@@ -440,11 +434,9 @@ public void onEvent(EventType eventType, String message) {
440434 }
441435
442436 private void disable () {
443- final DDAgentFeaturesDiscovery features = featuresDiscovery ();
444437 features .discover ();
445438 if (!features .supportsMetrics ()) {
446439 log .debug ("Disabling metric reporting because an agent downgrade was detected" );
447- healthMetrics .onClientStatDowngraded ();
448440 this .pending .clear ();
449441 this .batchPool .clear ();
450442 this .inbox .clear ();
0 commit comments