Skip to content

Commit 5a07913

Browse files
committed
1. xds: resolves data race
2. xds: considers authority as a separate dimension
1 parent 04584c1 commit 5a07913

File tree

4 files changed

+65
-94
lines changed

4 files changed

+65
-94
lines changed

xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,15 @@
3434
import java.util.Arrays;
3535
import java.util.Collections;
3636
import java.util.HashMap;
37+
import java.util.List;
3738
import java.util.Map;
3839
import java.util.concurrent.ExecutionException;
3940
import java.util.concurrent.Future;
4041
import java.util.concurrent.TimeUnit;
4142
import java.util.concurrent.TimeoutException;
4243
import java.util.logging.Level;
4344
import java.util.logging.Logger;
45+
import java.util.stream.Collectors;
4446
import javax.annotation.Nullable;
4547

4648
/**
@@ -143,13 +145,15 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) {
143145
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
144146
getResourceMetadataCompleted.get(10, TimeUnit.SECONDS);
145147

146-
ListenableFuture<Map<XdsResourceType<?>, Map<String, String>>>
147-
getResourceAuthorityCompleted = xdsClient.getSubscribedResourcesAuthoritySnapshot();
148+
List<String> resourceNames = metadataByType.values()
149+
.stream()
150+
.flatMap(innerMap -> innerMap.keySet().stream())
151+
.collect(Collectors.toList());
148152

149-
Map<XdsResourceType<?>, Map<String, String>> authorityByType =
150-
getResourceAuthorityCompleted.get(10, TimeUnit.SECONDS);
153+
Map<String, String> resourceNameToAuthority =
154+
xdsClient.getResourceNameToAuthorityMap(resourceNames);
151155

152-
computeAndReportResourceCounts(metadataByType, authorityByType, callback);
156+
computeAndReportResourceCounts(metadataByType, resourceNameToAuthority, callback);
153157

154158
// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
155159
Void unused = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS);
@@ -163,27 +167,37 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) {
163167

164168
private void computeAndReportResourceCounts(
165169
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType,
166-
Map<XdsResourceType<?>, Map<String, String>> authorityByType,
170+
Map<String, String> resourceNameToAuthority,
167171
MetricReporterCallback callback) {
168172
for (Map.Entry<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByTypeEntry :
169173
metadataByType.entrySet()) {
170174
XdsResourceType<?> type = metadataByTypeEntry.getKey();
175+
Map<String, ResourceMetadata> resources = metadataByTypeEntry.getValue();
171176

172-
Map<String, Long> resourceCountsByState = new HashMap<>();
173-
Map<String, String> authorityByState = new HashMap<>();
174-
for (Map.Entry<String, ResourceMetadata> metadataByName :
175-
metadataByTypeEntry.getValue().entrySet()) {
176-
String resourceName = metadataByName.getKey();
177-
ResourceMetadata metadata = metadataByName.getValue();
177+
Map<String, Map<String, Long>> resourceCountsByAuthorityAndState = new HashMap<>();
178+
for (Map.Entry<String, ResourceMetadata> resourceEntry : resources.entrySet()) {
179+
String resourceName = resourceEntry.getKey();
180+
ResourceMetadata metadata = resourceEntry.getValue();
181+
String authority = resourceNameToAuthority.getOrDefault(resourceName, "");
178182
String cacheState = cacheStateFromResourceStatus(metadata.getStatus(), metadata.isCached());
179-
resourceCountsByState.compute(cacheState, (k, v) -> (v == null) ? 1 : v + 1);
180-
authorityByState.put(cacheState, authorityByType.get(type).get(resourceName));
183+
resourceCountsByAuthorityAndState
184+
.computeIfAbsent(authority, k -> new HashMap<>())
185+
.merge(cacheState, 1L, Long::sum);
181186
}
182187

183-
resourceCountsByState.forEach((cacheState, count) -> {
184-
callback.reportResourceCountGauge(authorityByState.get(cacheState),
185-
count, cacheState, type.typeUrl());
186-
});
188+
// Report metrics
189+
for (Map.Entry<String, Map<String, Long>> authorityEntry
190+
: resourceCountsByAuthorityAndState.entrySet()) {
191+
String authority = authorityEntry.getKey();
192+
Map<String, Long> stateCounts = authorityEntry.getValue();
193+
194+
for (Map.Entry<String, Long> stateEntry : stateCounts.entrySet()) {
195+
String cacheState = stateEntry.getKey();
196+
Long count = stateEntry.getValue();
197+
198+
callback.reportResourceCountGauge(authority, count, cacheState, type.typeUrl());
199+
}
200+
}
187201
}
188202
}
189203

@@ -213,12 +227,11 @@ static final class MetricReporterCallback implements ServerConnectionCallback {
213227
this.target = target;
214228
}
215229

216-
// TODO(dnvindhya): include the "authority" label once xds.authority is available.
217230
void reportResourceCountGauge(String authority, long resourceCount, String cacheState,
218231
String resourceType) {
219232
recorder.recordLongGauge(RESOURCES_GAUGE, resourceCount,
220-
Arrays.asList(target, authority == null ? "#old" : authority,
221-
cacheState, resourceType), Collections.emptyList());
233+
Arrays.asList(target, authority.isEmpty() ? "#old" : authority,
234+
cacheState, resourceType), Collections.emptyList());
222235
}
223236

224237
@Override

xds/src/main/java/io/grpc/xds/client/XdsClient.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -319,19 +319,6 @@ public Object getSecurityConfig() {
319319
throw new UnsupportedOperationException();
320320
}
321321

322-
/**
323-
* Returns a {@link ListenableFuture} to the snapshot of the subscribed resources as
324-
* they are at the moment of the call.
325-
*
326-
* <p>The snapshot is a map from the "resource type" to
327-
* a map ("resource name": "authority").
328-
*/
329-
// Must be synchronized.
330-
public ListenableFuture<Map<XdsResourceType<?>, Map<String, String>>>
331-
getSubscribedResourcesAuthoritySnapshot() {
332-
throw new UnsupportedOperationException();
333-
}
334-
335322
/**
336323
* Registers a data watcher for the given Xds resource.
337324
*/
@@ -391,6 +378,13 @@ public Map<Bootstrapper.ServerInfo, LoadReportClient> getServerLrsClientMap() {
391378
throw new UnsupportedOperationException();
392379
}
393380

381+
/**
382+
* Returns a map of resource names to the authority.
383+
*/
384+
public Map<String, String> getResourceNameToAuthorityMap(List<String> resourceNames) {
385+
throw new UnsupportedOperationException();
386+
}
387+
394388
/** Callback used to report a gauge metric value for server connections. */
395389
public interface ServerConnectionCallback {
396390
void reportServerConnectionGauge(boolean isConnected, String xdsServer);

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

Lines changed: 12 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.concurrent.Future;
5858
import java.util.concurrent.ScheduledExecutorService;
5959
import java.util.concurrent.TimeUnit;
60+
import java.util.function.Function;
6061
import java.util.stream.Collectors;
6162
import javax.annotation.Nullable;
6263

@@ -242,33 +243,6 @@ public void run() {
242243
return future;
243244
}
244245

245-
// As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic.
246-
// ResourceTypes that do not have subscribers does not show up in the snapshot keys.
247-
@Override
248-
public ListenableFuture<Map<XdsResourceType<?>, Map<String, String>>>
249-
getSubscribedResourcesAuthoritySnapshot() {
250-
final SettableFuture<Map<XdsResourceType<?>, Map<String, String>>> future =
251-
SettableFuture.create();
252-
syncContext.execute(new Runnable() {
253-
@Override
254-
public void run() {
255-
// A map from a "resource type" to a map ("resource name": "authority")
256-
ImmutableMap.Builder<XdsResourceType<?>, Map<String, String>> authoritySnapshot =
257-
ImmutableMap.builder();
258-
for (XdsResourceType<?> resourceType : resourceSubscribers.keySet()) {
259-
ImmutableMap.Builder<String, String> authorityMap = ImmutableMap.builder();
260-
for (Map.Entry<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry
261-
: resourceSubscribers.get(resourceType).entrySet()) {
262-
authorityMap.put(resourceEntry.getKey(), resourceEntry.getValue().authority);
263-
}
264-
authoritySnapshot.put(resourceType, authorityMap.buildOrThrow());
265-
}
266-
future.set(authoritySnapshot.buildOrThrow());
267-
}
268-
});
269-
return future;
270-
}
271-
272246
@Override
273247
public Object getSecurityConfig() {
274248
return securityConfig;
@@ -572,6 +546,17 @@ private String getAuthority(String resource) {
572546
return authority;
573547
}
574548

549+
@Override
550+
public Map<String, String> getResourceNameToAuthorityMap(List<String> resourceNames) {
551+
if (resourceNames == null || resourceNames.isEmpty()) {
552+
return Collections.emptyMap();
553+
}
554+
return resourceNames.stream()
555+
.collect(Collectors.toMap(
556+
Function.identity(),
557+
this::getAuthority));
558+
}
559+
575560
@Nullable
576561
private ImmutableList<ServerInfo> getServerInfos(String authority) {
577562
if (authority != null) {

xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java

Lines changed: 12 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.common.truth.Truth.assertThat;
2020
import static org.mockito.AdditionalAnswers.delegatesTo;
2121
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.anyList;
2223
import static org.mockito.ArgumentMatchers.argThat;
2324
import static org.mockito.ArgumentMatchers.eq;
2425
import static org.mockito.Mockito.inOrder;
@@ -129,8 +130,6 @@ public void setXdsClient_reportMetrics() throws Exception {
129130
future.set(null);
130131
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture(
131132
ImmutableMap.of()));
132-
when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot())
133-
.thenReturn(Futures.immediateFuture(ImmutableMap.of()));
134133
when(mockXdsClient.reportServerConnections(any(ServerConnectionCallback.class)))
135134
.thenReturn(future);
136135
reporter.setXdsClient(mockXdsClient);
@@ -152,8 +151,6 @@ public void setXdsClient_reportCallbackMetrics_resourceCountsFails() {
152151
future.set(null);
153152
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture(
154153
ImmutableMap.of()));
155-
when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot())
156-
.thenReturn(Futures.immediateFuture(ImmutableMap.of()));
157154

158155
// Create a future that will throw an exception
159156
SettableFuture<Void> serverConnectionsFeature = SettableFuture.create();
@@ -181,8 +178,6 @@ public void metricGauges() {
181178
future.set(null);
182179
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot()).thenReturn(Futures.immediateFuture(
183180
ImmutableMap.of()));
184-
when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot())
185-
.thenReturn(Futures.immediateFuture(ImmutableMap.of()));
186181
when(mockXdsClient.reportServerConnections(any(ServerConnectionCallback.class)))
187182
.thenReturn(future);
188183
reporter.setXdsClient(mockXdsClient);
@@ -238,7 +233,7 @@ public void metricReporterCallback() {
238233
@Test
239234
public void reportCallbackMetrics_computeAndReportResourceCounts() {
240235
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType = new HashMap<>();
241-
Map<XdsResourceType<?>, Map<String, String>> authorityByType = new HashMap<>();
236+
Map<String, String> resourceNameByAuthority = new HashMap<>();
242237
XdsResourceType<?> listenerResource = XdsListenerResource.getInstance();
243238
XdsResourceType<?> routeConfigResource = XdsRouteConfigureResource.getInstance();
244239
XdsResourceType<?> clusterResource = XdsClusterResource.getInstance();
@@ -248,44 +243,37 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() {
248243
long nanosLastUpdate = 1577923199_606042047L;
249244

250245
Map<String, ResourceMetadata> ldsResourceMetadataMap = new HashMap<>();
251-
Map<String, String> ldsAuthorityMap = new HashMap<>();
252246
ldsResourceMetadataMap.put("resource1",
253247
ResourceMetadata.newResourceMetadataRequested());
254-
ldsAuthorityMap.put("resource1", "authority1");
248+
resourceNameByAuthority.put("resource1", "authority1");
255249
ResourceMetadata ackedLdsResource = ResourceMetadata.newResourceMetadataAcked(rawListener, "42",
256250
nanosLastUpdate);
257251
ldsResourceMetadataMap.put("resource2", ackedLdsResource);
258-
ldsAuthorityMap.put("resource2", "authority2");
252+
resourceNameByAuthority.put("resource2", "authority2");
259253
ldsResourceMetadataMap.put("resource3",
260254
ResourceMetadata.newResourceMetadataAcked(rawListener, "43", nanosLastUpdate));
261-
ldsAuthorityMap.put("resource3", "authority3");
255+
resourceNameByAuthority.put("resource3", "authority2");
262256
ldsResourceMetadataMap.put("resource4",
263257
ResourceMetadata.newResourceMetadataNacked(ackedLdsResource, "44", nanosLastUpdate,
264258
"nacked after previous ack", true));
265-
ldsAuthorityMap.put("resource4", "authority4");
259+
resourceNameByAuthority.put("resource4", "authority4");
266260

267261
Map<String, ResourceMetadata> rdsResourceMetadataMap = new HashMap<>();
268-
Map<String, String> rdsAuthorityMap = new HashMap<>();
269262
ResourceMetadata requestedRdsResourceMetadata = ResourceMetadata.newResourceMetadataRequested();
270263
rdsResourceMetadataMap.put("resource5",
271264
ResourceMetadata.newResourceMetadataNacked(requestedRdsResourceMetadata, "24",
272265
nanosLastUpdate, "nacked after request", false));
273-
rdsAuthorityMap.put("resource5", "authority5");
266+
resourceNameByAuthority.put("resource5", "authority5");
274267
rdsResourceMetadataMap.put("resource6",
275268
ResourceMetadata.newResourceMetadataDoesNotExist());
276-
rdsAuthorityMap.put("resource6", "authority6");
269+
resourceNameByAuthority.put("resource6", "authority6");
277270

278271
Map<String, ResourceMetadata> cdsResourceMetadataMap = new HashMap<>();
279-
Map<String, String> cdsAuthorityMap = new HashMap<>();
280272
cdsResourceMetadataMap.put("resource7", ResourceMetadata.newResourceMetadataUnknown());
281-
cdsAuthorityMap.put("resource7", "authority7");
282273

283274
metadataByType.put(listenerResource, ldsResourceMetadataMap);
284-
authorityByType.put(listenerResource, ldsAuthorityMap);
285275
metadataByType.put(routeConfigResource, rdsResourceMetadataMap);
286-
authorityByType.put(routeConfigResource, rdsAuthorityMap);
287276
metadataByType.put(clusterResource, cdsResourceMetadataMap);
288-
authorityByType.put(clusterResource, cdsAuthorityMap);
289277

290278
SettableFuture<Void> reportServerConnectionsCompleted = SettableFuture.create();
291279
reportServerConnectionsCompleted.set(null);
@@ -297,10 +285,8 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() {
297285
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot())
298286
.thenReturn(getResourceMetadataCompleted);
299287

300-
ListenableFuture<Map<XdsResourceType<?>, Map<String, String>>>
301-
getResourceAuthorityCompleted = Futures.immediateFuture(authorityByType);
302-
when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot())
303-
.thenReturn(getResourceAuthorityCompleted);
288+
when(mockXdsClient.getResourceNameToAuthorityMap(anyList()))
289+
.thenReturn(resourceNameByAuthority);
304290

305291
reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient);
306292

@@ -310,7 +296,7 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() {
310296
"requested", listenerResource.typeUrl())), any());
311297
// LDS resources acked
312298
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
313-
eq(2L), eq(Arrays.asList(target, "authority3",
299+
eq(2L), eq(Arrays.asList(target, "authority2",
314300
"acked", listenerResource.typeUrl())), any());
315301
// LDS resource nacked but cached
316302
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
@@ -328,18 +314,16 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() {
328314

329315
// CDS resource unknown
330316
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
331-
eq(1L), eq(Arrays.asList(target, "authority7",
317+
eq(1L), eq(Arrays.asList(target, "#old",
332318
"unknown", clusterResource.typeUrl())), any());
333319
verifyNoMoreInteractions(mockBatchRecorder);
334320
}
335321

336322
@Test
337323
public void reportCallbackMetrics_computeAndReportResourceCounts_emptyResources() {
338324
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType = new HashMap<>();
339-
Map<XdsResourceType<?>, Map<String, String>> authorityByType = new HashMap<>();
340325
XdsResourceType<?> listenerResource = XdsListenerResource.getInstance();
341326
metadataByType.put(listenerResource, Collections.emptyMap());
342-
authorityByType.put(listenerResource, Collections.emptyMap());
343327

344328
SettableFuture<Void> reportServerConnectionsCompleted = SettableFuture.create();
345329
reportServerConnectionsCompleted.set(null);
@@ -351,11 +335,6 @@ public void reportCallbackMetrics_computeAndReportResourceCounts_emptyResources(
351335
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot())
352336
.thenReturn(getResourceMetadataCompleted);
353337

354-
ListenableFuture<Map<XdsResourceType<?>, Map<String, String>>>
355-
getAuthorityCompleted = Futures.immediateFuture(authorityByType);
356-
when(mockXdsClient.getSubscribedResourcesAuthoritySnapshot())
357-
.thenReturn(getAuthorityCompleted);
358-
359338
reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient);
360339

361340
// Verify that reportResourceCountGauge is never called

0 commit comments

Comments
 (0)