@@ -98,7 +98,8 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
9898 private final Aggregator aggregator ;
9999 private final long reportingInterval ;
100100 private final TimeUnit reportingIntervalTimeUnit ;
101- private final DDAgentFeaturesDiscovery features ;
101+ private final SharedCommunicationObjects sharedCommunicationObjects ;
102+ private volatile DDAgentFeaturesDiscovery features ;
102103 private final HealthMetrics healthMetrics ;
103104
104105 private volatile AgentTaskScheduler .Scheduled <?> cancellation ;
@@ -110,7 +111,7 @@ public ConflatingMetricsAggregator(
110111 this (
111112 config .getWellKnownTags (),
112113 config .getMetricsIgnoredResources (),
113- sharedCommunicationObjects . featuresDiscovery ( config ) ,
114+ sharedCommunicationObjects ,
114115 healthMetrics ,
115116 new OkHttpSink (
116117 sharedCommunicationObjects .okHttpClient ,
@@ -126,15 +127,15 @@ public ConflatingMetricsAggregator(
126127 ConflatingMetricsAggregator (
127128 WellKnownTags wellKnownTags ,
128129 Set <String > ignoredResources ,
129- DDAgentFeaturesDiscovery features ,
130+ SharedCommunicationObjects sharedCommunicationObjects ,
130131 HealthMetrics healthMetric ,
131132 Sink sink ,
132133 int maxAggregates ,
133134 int queueSize ) {
134135 this (
135136 wellKnownTags ,
136137 ignoredResources ,
137- features ,
138+ sharedCommunicationObjects ,
138139 healthMetric ,
139140 sink ,
140141 maxAggregates ,
@@ -146,7 +147,7 @@ public ConflatingMetricsAggregator(
146147 ConflatingMetricsAggregator (
147148 WellKnownTags wellKnownTags ,
148149 Set <String > ignoredResources ,
149- DDAgentFeaturesDiscovery features ,
150+ SharedCommunicationObjects sharedCommunicationObjects ,
150151 HealthMetrics healthMetric ,
151152 Sink sink ,
152153 int maxAggregates ,
@@ -155,7 +156,7 @@ public ConflatingMetricsAggregator(
155156 TimeUnit timeUnit ) {
156157 this (
157158 ignoredResources ,
158- features ,
159+ sharedCommunicationObjects ,
159160 healthMetric ,
160161 sink ,
161162 new SerializingMetricWriter (wellKnownTags , sink ),
@@ -167,7 +168,7 @@ public ConflatingMetricsAggregator(
167168
168169 ConflatingMetricsAggregator (
169170 Set <String > ignoredResources ,
170- DDAgentFeaturesDiscovery features ,
171+ SharedCommunicationObjects sharedCommunicationObjects ,
171172 HealthMetrics healthMetric ,
172173 Sink sink ,
173174 MetricWriter metricWriter ,
@@ -180,7 +181,7 @@ public ConflatingMetricsAggregator(
180181 this .batchPool = new SpmcArrayQueue <>(maxAggregates );
181182 this .pending = new NonBlockingHashMap <>(maxAggregates * 4 / 3 );
182183 this .keys = new NonBlockingHashMap <>();
183- this .features = features ;
184+ this .sharedCommunicationObjects = sharedCommunicationObjects ;
184185 this .healthMetrics = healthMetric ;
185186 this .sink = sink ;
186187 this .aggregator =
@@ -198,6 +199,18 @@ public ConflatingMetricsAggregator(
198199 this .reportingIntervalTimeUnit = timeUnit ;
199200 }
200201
202+ private DDAgentFeaturesDiscovery featuresDiscovery () {
203+ DDAgentFeaturesDiscovery ret = features ;
204+ if (ret != null ) {
205+ return ret ;
206+ }
207+ // no need to synchronise here since it's already done in sharedCommunicationObject.
208+ // At worst, we'll assign multiple time the variable but it will be the same object
209+ ret = sharedCommunicationObjects .featuresDiscovery (Config .get ());
210+ features = ret ;
211+ return ret ;
212+ }
213+
201214 @ Override
202215 public void start () {
203216 sink .register (this );
@@ -213,13 +226,6 @@ public void start() {
213226 log .debug ("started metrics aggregator" );
214227 }
215228
216- private boolean isMetricsEnabled () {
217- if (features .getMetricsEndpoint () == null ) {
218- features .discoverIfOutdated ();
219- }
220- return features .supportsMetrics ();
221- }
222-
223229 @ Override
224230 public boolean report () {
225231 boolean published ;
@@ -236,8 +242,7 @@ public boolean report() {
236242
237243 @ Override
238244 public Future <Boolean > forceReport () {
239- // Ensure the feature is enabled
240- if (!isMetricsEnabled ()) {
245+ if (!featuresDiscovery ().supportsMetrics ()) {
241246 return CompletableFuture .completedFuture (false );
242247 }
243248 // Wait for the thread to start
@@ -273,6 +278,7 @@ public Future<Boolean> forceReport() {
273278 public boolean publish (List <? extends CoreSpan <?>> trace ) {
274279 boolean forceKeep = false ;
275280 int counted = 0 ;
281+ final DDAgentFeaturesDiscovery features = featuresDiscovery ();
276282 if (features .supportsMetrics ()) {
277283 for (CoreSpan <?> span : trace ) {
278284 boolean isTopLevel = span .isTopLevel ();
@@ -283,7 +289,7 @@ public boolean publish(List<? extends CoreSpan<?>> trace) {
283289 break ;
284290 }
285291 counted ++;
286- forceKeep |= publish (span , isTopLevel );
292+ forceKeep |= publish (span , isTopLevel , features );
287293 }
288294 }
289295 healthMetrics .onClientStatTraceComputed (
@@ -305,7 +311,7 @@ private boolean spanKindEligible(CoreSpan<?> span) {
305311 return spanKind != null && ELIGIBLE_SPAN_KINDS_FOR_METRICS .contains (spanKind .toString ());
306312 }
307313
308- private boolean publish (CoreSpan <?> span , boolean isTopLevel ) {
314+ private boolean publish (CoreSpan <?> span , boolean isTopLevel , DDAgentFeaturesDiscovery features ) {
309315 final CharSequence spanKind = span .getTag (SPAN_KIND , "" );
310316 MetricKey newKey =
311317 new MetricKey (
@@ -318,7 +324,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
318324 span .getParentId () == 0 ,
319325 SPAN_KINDS .computeIfAbsent (
320326 spanKind , UTF8BytesString ::create ), // save repeated utf8 conversions
321- getPeerTags (span , spanKind .toString ()));
327+ getPeerTags (span , spanKind .toString (), features ));
322328 boolean isNewKey = false ;
323329 MetricKey key = keys .putIfAbsent (newKey , newKey );
324330 if (null == key ) {
@@ -353,7 +359,8 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
353359 return isNewKey || span .getError () > 0 ;
354360 }
355361
356- private List <UTF8BytesString > getPeerTags (CoreSpan <?> span , String spanKind ) {
362+ private List <UTF8BytesString > getPeerTags (
363+ CoreSpan <?> span , String spanKind , DDAgentFeaturesDiscovery features ) {
357364 if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION .contains (spanKind )) {
358365 List <UTF8BytesString > peerTags = new ArrayList <>();
359366 for (String peerTag : features .peerTags ()) {
@@ -417,8 +424,7 @@ public void onEvent(EventType eventType, String message) {
417424 switch (eventType ) {
418425 case DOWNGRADED :
419426 log .debug ("Agent downgrade was detected" );
420- disable ();
421- healthMetrics .onClientStatDowngraded ();
427+ AgentTaskScheduler .get ().execute (this ::disable );
422428 break ;
423429 case BAD_PAYLOAD :
424430 log .debug ("bad metrics payload sent to trace agent: {}" , message );
@@ -434,9 +440,11 @@ public void onEvent(EventType eventType, String message) {
434440 }
435441
436442 private void disable () {
443+ final DDAgentFeaturesDiscovery features = featuresDiscovery ();
437444 features .discover ();
438445 if (!features .supportsMetrics ()) {
439446 log .debug ("Disabling metric reporting because an agent downgrade was detected" );
447+ healthMetrics .onClientStatDowngraded ();
440448 this .pending .clear ();
441449 this .batchPool .clear ();
442450 this .inbox .clear ();
0 commit comments