Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -90,7 +92,7 @@ final class XdsClientMetricReporterImpl implements XdsClientMetricReporter {
Arrays.asList("grpc.target", "grpc.xds.server"), Collections.emptyList(), false);
RESOURCES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.xds_client.resources",
"EXPERIMENTAL. Number of xDS resources.", "{resource}",
Arrays.asList("grpc.target", "grpc.xds.cache_state",
Arrays.asList("grpc.target", "grpc.xds.authority", "grpc.xds.cache_state",
"grpc.xds.resource_type"), Collections.emptyList(), false);
}

Expand Down Expand Up @@ -143,7 +145,15 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) {
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
getResourceMetadataCompleted.get(10, TimeUnit.SECONDS);

computeAndReportResourceCounts(metadataByType, callback);
List<String> resourceNames = metadataByType.values()
.stream()
.flatMap(innerMap -> innerMap.keySet().stream())
.collect(Collectors.toList());

Map<String, String> resourceNameToAuthority =
xdsClient.getResourceNameToAuthorityMap(resourceNames);

computeAndReportResourceCounts(metadataByType, resourceNameToAuthority, callback);

// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
Void unused = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS);
Expand All @@ -157,19 +167,37 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) {

private void computeAndReportResourceCounts(
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType,
Map<String, String> resourceNameToAuthority,
MetricReporterCallback callback) {
for (Map.Entry<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByTypeEntry :
metadataByType.entrySet()) {
XdsResourceType<?> type = metadataByTypeEntry.getKey();
Map<String, ResourceMetadata> resources = metadataByTypeEntry.getValue();

Map<String, Long> resourceCountsByState = new HashMap<>();
for (ResourceMetadata metadata : metadataByTypeEntry.getValue().values()) {
Map<String, Map<String, Long>> resourceCountsByAuthorityAndState = new HashMap<>();
for (Map.Entry<String, ResourceMetadata> resourceEntry : resources.entrySet()) {
String resourceName = resourceEntry.getKey();
ResourceMetadata metadata = resourceEntry.getValue();
String authority = resourceNameToAuthority.getOrDefault(resourceName, "");
String cacheState = cacheStateFromResourceStatus(metadata.getStatus(), metadata.isCached());
resourceCountsByState.compute(cacheState, (k, v) -> (v == null) ? 1 : v + 1);
resourceCountsByAuthorityAndState
.computeIfAbsent(authority, k -> new HashMap<>())
.merge(cacheState, 1L, Long::sum);
}

resourceCountsByState.forEach((cacheState, count) ->
callback.reportResourceCountGauge(count, cacheState, type.typeUrl()));
// Report metrics
for (Map.Entry<String, Map<String, Long>> authorityEntry
: resourceCountsByAuthorityAndState.entrySet()) {
String authority = authorityEntry.getKey();
Map<String, Long> stateCounts = authorityEntry.getValue();

for (Map.Entry<String, Long> stateEntry : stateCounts.entrySet()) {
String cacheState = stateEntry.getKey();
Long count = stateEntry.getValue();

callback.reportResourceCountGauge(authority, count, cacheState, type.typeUrl());
}
}
}
}

Expand Down Expand Up @@ -199,11 +227,11 @@ static final class MetricReporterCallback implements ServerConnectionCallback {
this.target = target;
}

// TODO(dnvindhya): include the "authority" label once xds.authority is available.
void reportResourceCountGauge(long resourceCount, String cacheState,
void reportResourceCountGauge(String authority, long resourceCount, String cacheState,
String resourceType) {
recorder.recordLongGauge(RESOURCES_GAUGE, resourceCount,
Arrays.asList(target, cacheState, resourceType), Collections.emptyList());
Arrays.asList(target, authority == null || authority.isEmpty() ? "#old" : authority,
cacheState, resourceType), Collections.emptyList());
}

@Override
Expand Down
7 changes: 7 additions & 0 deletions xds/src/main/java/io/grpc/xds/client/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,13 @@
throw new UnsupportedOperationException();
}

/**
* Returns a map of resource names to the authority.
*/
public Map<String, String> getResourceNameToAuthorityMap(List<String> resourceNames) {
throw new UnsupportedOperationException();

Check warning on line 385 in xds/src/main/java/io/grpc/xds/client/XdsClient.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClient.java#L385

Added line #L385 was not covered by tests
}

/** Callback used to report a gauge metric value for server connections. */
public interface ServerConnectionCallback {
void reportServerConnectionGauge(boolean isConnected, String xdsServer);
Expand Down
12 changes: 12 additions & 0 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -545,6 +546,17 @@
return authority;
}

@Override
public Map<String, String> getResourceNameToAuthorityMap(List<String> resourceNames) {
if (resourceNames == null || resourceNames.isEmpty()) {
return Collections.emptyMap();

Check warning on line 552 in xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java#L552

Added line #L552 was not covered by tests
}
return resourceNames.stream()
.collect(Collectors.toMap(
Function.identity(),

Check warning on line 556 in xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java#L554-L556

Added lines #L554 - L556 were not covered by tests
this::getAuthority));
}

@Nullable
private ImmutableList<ServerInfo> getServerInfos(String authority) {
if (authority != null) {
Expand Down
37 changes: 27 additions & 10 deletions xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
Expand Down Expand Up @@ -76,6 +77,7 @@
public class XdsClientMetricReporterImplTest {

private static final String target = "test-target";
private static final String authority = "test-authority";
private static final String server = "trafficdirector.googleapis.com";
private static final String resourceTypeUrl =
"resourceTypeUrl.googleapis.com/envoy.config.cluster.v3.Cluster";
Expand All @@ -101,7 +103,6 @@ public void setUp() {

@Test
public void reportResourceUpdates() {
// TODO(dnvindhya): add the "authority" label once available.
reporter.reportResourceUpdates(10, 5, server, resourceTypeUrl);
verify(mockMetricRecorder).addLongCounter(
eqMetricInstrumentName("grpc.xds_client.resource_updates_valid"), eq((long) 10),
Expand Down Expand Up @@ -199,7 +200,7 @@ public void metricGauges() {

// Verify that reportResourceCounts and reportServerConnections were called
// with the captured callback
callback.reportResourceCountGauge(10, "acked", resourceTypeUrl);
callback.reportResourceCountGauge("PotatoHead", 10, "acked", resourceTypeUrl);
inOrder.verify(mockBatchRecorder)
.recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L), any(),
any());
Expand All @@ -222,16 +223,17 @@ public void metricReporterCallback() {
eq(Lists.newArrayList()));

String cacheState = "requested";
callback.reportResourceCountGauge(10, cacheState, resourceTypeUrl);
callback.reportResourceCountGauge(authority, 10, cacheState, resourceTypeUrl);
verify(mockBatchRecorder, times(1)).recordLongGauge(
eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L),
eq(Arrays.asList(target, cacheState, resourceTypeUrl)),
eq(Arrays.asList(target, authority, cacheState, resourceTypeUrl)),
eq(Collections.emptyList()));
}

@Test
public void reportCallbackMetrics_computeAndReportResourceCounts() {
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType = new HashMap<>();
Map<String, String> resourceNameByAuthority = new HashMap<>();
XdsResourceType<?> listenerResource = XdsListenerResource.getInstance();
XdsResourceType<?> routeConfigResource = XdsRouteConfigureResource.getInstance();
XdsResourceType<?> clusterResource = XdsClusterResource.getInstance();
Expand All @@ -243,22 +245,28 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() {
Map<String, ResourceMetadata> ldsResourceMetadataMap = new HashMap<>();
ldsResourceMetadataMap.put("resource1",
ResourceMetadata.newResourceMetadataRequested());
resourceNameByAuthority.put("resource1", "authority1");
ResourceMetadata ackedLdsResource = ResourceMetadata.newResourceMetadataAcked(rawListener, "42",
nanosLastUpdate);
ldsResourceMetadataMap.put("resource2", ackedLdsResource);
resourceNameByAuthority.put("resource2", "authority2");
ldsResourceMetadataMap.put("resource3",
ResourceMetadata.newResourceMetadataAcked(rawListener, "43", nanosLastUpdate));
resourceNameByAuthority.put("resource3", "authority2");
ldsResourceMetadataMap.put("resource4",
ResourceMetadata.newResourceMetadataNacked(ackedLdsResource, "44", nanosLastUpdate,
"nacked after previous ack", true));
resourceNameByAuthority.put("resource4", "authority4");

Map<String, ResourceMetadata> rdsResourceMetadataMap = new HashMap<>();
ResourceMetadata requestedRdsResourceMetadata = ResourceMetadata.newResourceMetadataRequested();
rdsResourceMetadataMap.put("resource5",
ResourceMetadata.newResourceMetadataNacked(requestedRdsResourceMetadata, "24",
nanosLastUpdate, "nacked after request", false));
resourceNameByAuthority.put("resource5", "authority5");
rdsResourceMetadataMap.put("resource6",
ResourceMetadata.newResourceMetadataDoesNotExist());
resourceNameByAuthority.put("resource6", "authority6");

Map<String, ResourceMetadata> cdsResourceMetadataMap = new HashMap<>();
cdsResourceMetadataMap.put("resource7", ResourceMetadata.newResourceMetadataUnknown());
Expand All @@ -277,28 +285,37 @@ public void reportCallbackMetrics_computeAndReportResourceCounts() {
when(mockXdsClient.getSubscribedResourcesMetadataSnapshot())
.thenReturn(getResourceMetadataCompleted);

when(mockXdsClient.getResourceNameToAuthorityMap(anyList()))
.thenReturn(resourceNameByAuthority);

reporter.reportCallbackMetrics(mockBatchRecorder, mockXdsClient);

// LDS resource requested
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(1L), eq(Arrays.asList(target, "requested", listenerResource.typeUrl())), any());
eq(1L), eq(Arrays.asList(target, "authority1",
"requested", listenerResource.typeUrl())), any());
// LDS resources acked
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(2L), eq(Arrays.asList(target, "acked", listenerResource.typeUrl())), any());
eq(2L), eq(Arrays.asList(target, "authority2",
"acked", listenerResource.typeUrl())), any());
// LDS resource nacked but cached
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(1L), eq(Arrays.asList(target, "nacked_but_cached", listenerResource.typeUrl())), any());
eq(1L), eq(Arrays.asList(target, "authority4",
"nacked_but_cached", listenerResource.typeUrl())), any());

// RDS resource nacked
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(1L), eq(Arrays.asList(target, "nacked", routeConfigResource.typeUrl())), any());
eq(1L), eq(Arrays.asList(target, "authority5",
"nacked", routeConfigResource.typeUrl())), any());
// RDS resource does not exist
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(1L), eq(Arrays.asList(target, "does_not_exist", routeConfigResource.typeUrl())), any());
eq(1L), eq(Arrays.asList(target, "authority6",
"does_not_exist", routeConfigResource.typeUrl())), any());

// CDS resource unknown
verify(mockBatchRecorder).recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"),
eq(1L), eq(Arrays.asList(target, "unknown", clusterResource.typeUrl())), any());
eq(1L), eq(Arrays.asList(target, "#old",
"unknown", clusterResource.typeUrl())), any());
verifyNoMoreInteractions(mockBatchRecorder);
}

Expand Down