Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.xds;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.LongCounterMetricInstrument;
import io.grpc.LongGaugeMetricInstrument;
Expand All @@ -29,12 +30,11 @@
import io.grpc.xds.client.XdsClient.ResourceMetadata;
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
import io.grpc.xds.client.XdsClient.ServerConnectionCallback;
import io.grpc.xds.client.XdsClientImpl;
import io.grpc.xds.client.XdsClientMetricReporter;
import io.grpc.xds.client.XdsResourceType;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -90,7 +90,7 @@ final class XdsClientMetricReporterImpl implements XdsClientMetricReporter {
Arrays.asList("grpc.target", "grpc.xds.server"), Collections.emptyList(), false);
RESOURCES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.xds_client.resources",
"EXPERIMENTAL. Number of xDS resources.", "{resource}",
Arrays.asList("grpc.target", "grpc.xds.cache_state",
Arrays.asList("grpc.target", "grpc.xds.authority", "grpc.xds.cache_state",
"grpc.xds.resource_type"), Collections.emptyList(), false);
}

Expand Down Expand Up @@ -143,7 +143,7 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) {
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
getResourceMetadataCompleted.get(10, TimeUnit.SECONDS);

computeAndReportResourceCounts(metadataByType, callback);
computeAndReportResourceCounts(xdsClient, metadataByType, callback);

// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
Void unused = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS);
Expand All @@ -155,21 +155,30 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) {
}
}

private void computeAndReportResourceCounts(
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType,
MetricReporterCallback callback) {
private void computeAndReportResourceCounts(XdsClient xdsClient,
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType,
MetricReporterCallback callback) {
for (Map.Entry<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByTypeEntry :
metadataByType.entrySet()) {
XdsResourceType<?> type = metadataByTypeEntry.getKey();

Map<String, Long> resourceCountsByState = new HashMap<>();
List<String> authorities = new ArrayList<>();
for (ResourceMetadata metadata : metadataByTypeEntry.getValue().values()) {
String cacheState = cacheStateFromResourceStatus(metadata.getStatus(), metadata.isCached());
resourceCountsByState.compute(cacheState, (k, v) -> (v == null) ? 1 : v + 1);
}
for (String resourceName : metadataByTypeEntry.getValue().keySet()) {
authorities.add(xdsClient.getAuthority(type, resourceName));
}

resourceCountsByState.forEach((cacheState, count) ->
callback.reportResourceCountGauge(count, cacheState, type.typeUrl()));
Iterator<String> authorityIterator = authorities.iterator();
resourceCountsByState.forEach((cacheState, count) -> {
if (authorityIterator.hasNext()) {
String authority = authorityIterator.next();
callback.reportResourceCountGauge(authority, count, cacheState, type.typeUrl());
}
});
}
}

Expand Down Expand Up @@ -200,10 +209,11 @@ static final class MetricReporterCallback implements ServerConnectionCallback {
}

// TODO(dnvindhya): include the "authority" label once xds.authority is available.
void reportResourceCountGauge(long resourceCount, String cacheState,
void reportResourceCountGauge(String authority, long resourceCount, String cacheState,
String resourceType) {
recorder.recordLongGauge(RESOURCES_GAUGE, resourceCount,
Arrays.asList(target, cacheState, resourceType), Collections.emptyList());
Arrays.asList(target, authority == null ? "#old" : authority,
cacheState, resourceType), Collections.emptyList());
}

@Override
Expand Down
4 changes: 3 additions & 1 deletion xds/src/main/java/io/grpc/xds/client/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,9 @@ public Object getSecurityConfig() {
getSubscribedResourcesMetadataSnapshot() {
throw new UnsupportedOperationException();
}

public String getAuthority(XdsResourceType<?> resourceType, String resourceName) {
throw new UnsupportedOperationException();
}
/**
* Registers a data watcher for the given Xds resource.
*/
Expand Down
9 changes: 9 additions & 0 deletions xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ public void run() {
return future;
}

@Override
public String getAuthority(XdsResourceType<?> resourceType, String resourceName) {
Map<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry = resourceSubscribers.get(resourceType);
if (resourceEntry != null) {
return resourceEntry.get(resourceName).authority;
}
return null;
}

@Override
public Object getSecurityConfig() {
return securityConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public void metricGauges() {

// Verify that reportResourceCounts and reportServerConnections were called
// with the captured callback
callback.reportResourceCountGauge(10, "acked", resourceTypeUrl);
callback.reportResourceCountGauge("PotatoHead", 10, "acked", resourceTypeUrl);
inOrder.verify(mockBatchRecorder)
.recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L), any(),
any());
Expand All @@ -222,7 +222,7 @@ public void metricReporterCallback() {
eq(Lists.newArrayList()));

String cacheState = "requested";
callback.reportResourceCountGauge(10, cacheState, resourceTypeUrl);
callback.reportResourceCountGauge("BuzzLightyear", 10, cacheState, resourceTypeUrl);
verify(mockBatchRecorder, times(1)).recordLongGauge(
eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L),
eq(Arrays.asList(target, cacheState, resourceTypeUrl)),
Expand Down
Loading