@@ -98,8 +98,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
9898  private  final  Aggregator  aggregator ;
9999  private  final  long  reportingInterval ;
100100  private  final  TimeUnit  reportingIntervalTimeUnit ;
101-   private  final  SharedCommunicationObjects  sharedCommunicationObjects ;
102-   private  volatile  DDAgentFeaturesDiscovery  features ;
101+   private  final  DDAgentFeaturesDiscovery  features ;
103102  private  final  HealthMetrics  healthMetrics ;
104103
105104  private  volatile  AgentTaskScheduler .Scheduled <?> cancellation ;
@@ -111,7 +110,7 @@ public ConflatingMetricsAggregator(
111110    this (
112111        config .getWellKnownTags (),
113112        config .getMetricsIgnoredResources (),
114-         sharedCommunicationObjects ,
113+         sharedCommunicationObjects . featuresDiscovery ( config ) ,
115114        healthMetrics ,
116115        new  OkHttpSink (
117116            sharedCommunicationObjects .okHttpClient ,
@@ -127,15 +126,15 @@ public ConflatingMetricsAggregator(
127126  ConflatingMetricsAggregator (
128127      WellKnownTags  wellKnownTags ,
129128      Set <String > ignoredResources ,
130-       SharedCommunicationObjects   sharedCommunicationObjects ,
129+       DDAgentFeaturesDiscovery   features ,
131130      HealthMetrics  healthMetric ,
132131      Sink  sink ,
133132      int  maxAggregates ,
134133      int  queueSize ) {
135134    this (
136135        wellKnownTags ,
137136        ignoredResources ,
138-         sharedCommunicationObjects ,
137+         features ,
139138        healthMetric ,
140139        sink ,
141140        maxAggregates ,
@@ -147,7 +146,7 @@ public ConflatingMetricsAggregator(
147146  ConflatingMetricsAggregator (
148147      WellKnownTags  wellKnownTags ,
149148      Set <String > ignoredResources ,
150-       SharedCommunicationObjects   sharedCommunicationObjects ,
149+       DDAgentFeaturesDiscovery   features ,
151150      HealthMetrics  healthMetric ,
152151      Sink  sink ,
153152      int  maxAggregates ,
@@ -156,7 +155,7 @@ public ConflatingMetricsAggregator(
156155      TimeUnit  timeUnit ) {
157156    this (
158157        ignoredResources ,
159-         sharedCommunicationObjects ,
158+         features ,
160159        healthMetric ,
161160        sink ,
162161        new  SerializingMetricWriter (wellKnownTags , sink ),
@@ -168,7 +167,7 @@ public ConflatingMetricsAggregator(
168167
169168  ConflatingMetricsAggregator (
170169      Set <String > ignoredResources ,
171-       SharedCommunicationObjects   sharedCommunicationObjects ,
170+       DDAgentFeaturesDiscovery   features ,
172171      HealthMetrics  healthMetric ,
173172      Sink  sink ,
174173      MetricWriter  metricWriter ,
@@ -181,7 +180,7 @@ public ConflatingMetricsAggregator(
181180    this .batchPool  = new  SpmcArrayQueue <>(maxAggregates );
182181    this .pending  = new  NonBlockingHashMap <>(maxAggregates  * 4  / 3 );
183182    this .keys  = new  NonBlockingHashMap <>();
184-     this .sharedCommunicationObjects  = sharedCommunicationObjects ;
183+     this .features  = features ;
185184    this .healthMetrics  = healthMetric ;
186185    this .sink  = sink ;
187186    this .aggregator  =
@@ -199,18 +198,6 @@ public ConflatingMetricsAggregator(
199198    this .reportingIntervalTimeUnit  = timeUnit ;
200199  }
201200
202-   private  DDAgentFeaturesDiscovery  featuresDiscovery () {
203-     DDAgentFeaturesDiscovery  ret  = features ;
204-     if  (ret  != null ) {
205-       return  ret ;
206-     }
207-     // no need to synchronise here since it's already done in sharedCommunicationObject. 
208-     // At worst, we'll assign multiple time the variable but it will be the same object 
209-     ret  = sharedCommunicationObjects .featuresDiscovery (Config .get ());
210-     features  = ret ;
211-     return  ret ;
212-   }
213- 
214201  @ Override 
215202  public  void  start () {
216203    sink .register (this );
@@ -226,6 +213,13 @@ public void start() {
226213    log .debug ("started metrics aggregator" );
227214  }
228215
216+   private  boolean  isMetricsEnabled () {
217+     if  (features .getMetricsEndpoint () == null ) {
218+       features .discoverIfOutdated ();
219+     }
220+     return  features .supportsMetrics ();
221+   }
222+ 
229223  @ Override 
230224  public  boolean  report () {
231225    boolean  published ;
@@ -242,7 +236,8 @@ public boolean report() {
242236
243237  @ Override 
244238  public  Future <Boolean > forceReport () {
245-     if  (!featuresDiscovery ().supportsMetrics ()) {
239+     // Ensure the feature is enabled 
240+     if  (!isMetricsEnabled ()) {
246241      return  CompletableFuture .completedFuture (false );
247242    }
248243    // Wait for the thread to start 
@@ -278,7 +273,6 @@ public Future<Boolean> forceReport() {
278273  public  boolean  publish (List <? extends  CoreSpan <?>> trace ) {
279274    boolean  forceKeep  = false ;
280275    int  counted  = 0 ;
281-     final  DDAgentFeaturesDiscovery  features  = featuresDiscovery ();
282276    if  (features .supportsMetrics ()) {
283277      for  (CoreSpan <?> span  : trace ) {
284278        boolean  isTopLevel  = span .isTopLevel ();
@@ -289,7 +283,7 @@ public boolean publish(List<? extends CoreSpan<?>> trace) {
289283            break ;
290284          }
291285          counted ++;
292-           forceKeep  |= publish (span , isTopLevel ,  features );
286+           forceKeep  |= publish (span , isTopLevel );
293287        }
294288      }
295289      healthMetrics .onClientStatTraceComputed (
@@ -311,7 +305,7 @@ private boolean spanKindEligible(CoreSpan<?> span) {
311305    return  spanKind  != null  && ELIGIBLE_SPAN_KINDS_FOR_METRICS .contains (spanKind .toString ());
312306  }
313307
314-   private  boolean  publish (CoreSpan <?> span , boolean  isTopLevel ,  DDAgentFeaturesDiscovery   features ) {
308+   private  boolean  publish (CoreSpan <?> span , boolean  isTopLevel ) {
315309    final  CharSequence  spanKind  = span .getTag (SPAN_KIND , "" );
316310    MetricKey  newKey  =
317311        new  MetricKey (
@@ -324,7 +318,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, DDAgentFeaturesDis
324318            span .getParentId () == 0 ,
325319            SPAN_KINDS .computeIfAbsent (
326320                spanKind , UTF8BytesString ::create ), // save repeated utf8 conversions 
327-             getPeerTags (span , spanKind .toString (),  features ));
321+             getPeerTags (span , spanKind .toString ()));
328322    boolean  isNewKey  = false ;
329323    MetricKey  key  = keys .putIfAbsent (newKey , newKey );
330324    if  (null  == key ) {
@@ -359,8 +353,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, DDAgentFeaturesDis
359353    return  isNewKey  || span .getError () > 0 ;
360354  }
361355
362-   private  List <UTF8BytesString > getPeerTags (
363-       CoreSpan <?> span , String  spanKind , DDAgentFeaturesDiscovery  features ) {
356+   private  List <UTF8BytesString > getPeerTags (CoreSpan <?> span , String  spanKind ) {
364357    if  (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION .contains (spanKind )) {
365358      List <UTF8BytesString > peerTags  = new  ArrayList <>();
366359      for  (String  peerTag  : features .peerTags ()) {
@@ -424,7 +417,8 @@ public void onEvent(EventType eventType, String message) {
424417    switch  (eventType ) {
425418      case  DOWNGRADED :
426419        log .debug ("Agent downgrade was detected" );
427-         AgentTaskScheduler .get ().execute (this ::disable );
420+         disable ();
421+         healthMetrics .onClientStatDowngraded ();
428422        break ;
429423      case  BAD_PAYLOAD :
430424        log .debug ("bad metrics payload sent to trace agent: {}" , message );
@@ -440,11 +434,9 @@ public void onEvent(EventType eventType, String message) {
440434  }
441435
442436  private  void  disable () {
443-     final  DDAgentFeaturesDiscovery  features  = featuresDiscovery ();
444437    features .discover ();
445438    if  (!features .supportsMetrics ()) {
446439      log .debug ("Disabling metric reporting because an agent downgrade was detected" );
447-       healthMetrics .onClientStatDowngraded ();
448440      this .pending .clear ();
449441      this .batchPool .clear ();
450442      this .inbox .clear ();
0 commit comments